splitmuxsink: Don't assert on the input side if no GOP is available when shutting...
[platform/upstream/gstreamer.git] / subprojects / gst-plugins-good / gst / multifile / gstsplitmuxsink.c
1 /* GStreamer Muxer bin that splits output stream by size/time
2  * Copyright (C) <2014-2019> Jan Schmidt <jan@centricular.com>
3  *
4  * This library is free software; you can redistribute it and/or
5  * modify it under the terms of the GNU Library General Public
6  * License as published by the Free Software Foundation; either
7  * version 2 of the License, or (at your option) any later version.
8  *
9  * This library is distributed in the hope that it will be useful,
10  * but WITHOUT ANY WARRANTY; without even the implied warranty of
11  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12  * Library General Public License for more details.
13  *
14  * You should have received a copy of the GNU Library General Public
15  * License along with this library; if not, write to the
16  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
17  * Boston, MA 02110-1301, USA.
18  */
19
20 /**
21  * SECTION:element-splitmuxsink
22  * @title: splitmuxsink
23  * @short_description: Muxer wrapper for splitting output stream by size or time
24  *
25  * This element wraps a muxer and a sink, and starts a new file when the mux
26  * contents are about to cross a threshold of maximum size of maximum time,
27  * splitting at video keyframe boundaries. Exactly one input video stream
28  * can be muxed, with as many accompanying audio and subtitle streams as
29  * desired.
30  *
31  * By default, it uses mp4mux and filesink, but they can be changed via
32  * the 'muxer' and 'sink' properties.
33  *
34  * The minimum file size is 1 GOP, however - so limits may be overrun if the
35  * distance between any 2 keyframes is larger than the limits.
36  *
37  * If a video stream is available, the splitting process is driven by the video
38  * stream contents, and the video stream must contain closed GOPs for the output
39  * file parts to be played individually correctly. In the absence of a video
40  * stream, the first available stream is used as reference for synchronization.
41  *
42  * In the async-finalize mode, when the threshold is crossed, the old muxer
43  * and sink is disconnected from the pipeline and left to finish the file
44  * asynchronously, and a new muxer and sink is created to continue with the
45  * next fragment. For that reason, instead of muxer and sink objects, the
46  * muxer-factory and sink-factory properties are used to construct the new
47  * objects, together with muxer-properties and sink-properties.
48  *
49  * ## Example pipelines
50  * |[
51  * gst-launch-1.0 -e v4l2src num-buffers=500 ! video/x-raw,width=320,height=240 ! videoconvert ! queue ! timeoverlay ! x264enc key-int-max=10 ! h264parse ! splitmuxsink location=video%02d.mov max-size-time=10000000000 max-size-bytes=1000000
52  * ]|
53  * Records a video stream captured from a v4l2 device and muxes it into
54  * ISO mp4 files, splitting as needed to limit size/duration to 10 seconds
55  * and 1MB maximum size.
56  *
57  * |[
58  * gst-launch-1.0 -e v4l2src num-buffers=500 ! video/x-raw,width=320,height=240 ! videoconvert ! queue ! timeoverlay ! x264enc key-int-max=10 ! h264parse ! splitmuxsink location=video%02d.mkv max-size-time=10000000000 muxer-factory=matroskamux muxer-properties="properties,streamable=true"
59  * ]|
60  * Records a video stream captured from a v4l2 device and muxer it into
61  * streamable Matroska files, splitting as needed to limit size/duration to 10
62  * seconds. Each file will finalize asynchronously.
63  *
64  * |[
65  * gst-launch-1.0 videotestsrc num-buffers=10 ! jpegenc ! .video splitmuxsink muxer=qtmux muxer-pad-map=x-pad-map,video=video_1 location=test%05d.mp4 -v
66  * ]|
67  * Records 10 frames to an mp4 file, using a muxer-pad-map to make explicit mappings between the splitmuxsink sink pad and the corresponding muxer pad
68  * it will deliver to.
69  */
70
71 #ifdef HAVE_CONFIG_H
72 #include "config.h"
73 #endif
74
75 #include <string.h>
76 #include <glib/gstdio.h>
77 #include <gst/video/video.h>
78 #include "gstsplitmuxsink.h"
79
80 GST_DEBUG_CATEGORY_STATIC (splitmux_debug);
81 #define GST_CAT_DEFAULT splitmux_debug
82
83 #define GST_SPLITMUX_STATE_LOCK(s) g_mutex_lock(&(s)->state_lock)
84 #define GST_SPLITMUX_STATE_UNLOCK(s) g_mutex_unlock(&(s)->state_lock)
85
86 #define GST_SPLITMUX_LOCK(s) g_mutex_lock(&(s)->lock)
87 #define GST_SPLITMUX_UNLOCK(s) g_mutex_unlock(&(s)->lock)
88 #define GST_SPLITMUX_WAIT_INPUT(s) g_cond_wait (&(s)->input_cond, &(s)->lock)
89 #define GST_SPLITMUX_BROADCAST_INPUT(s) g_cond_broadcast (&(s)->input_cond)
90
91 #define GST_SPLITMUX_WAIT_OUTPUT(s) g_cond_wait (&(s)->output_cond, &(s)->lock)
92 #define GST_SPLITMUX_BROADCAST_OUTPUT(s) g_cond_broadcast (&(s)->output_cond)
93
94 static void split_now (GstSplitMuxSink * splitmux);
95 static void split_after (GstSplitMuxSink * splitmux);
96 static void split_at_running_time (GstSplitMuxSink * splitmux,
97     GstClockTime split_time);
98
99 enum
100 {
101   PROP_0,
102   PROP_LOCATION,
103   PROP_START_INDEX,
104   PROP_MAX_SIZE_TIME,
105   PROP_MAX_SIZE_BYTES,
106   PROP_MAX_SIZE_TIMECODE,
107   PROP_SEND_KEYFRAME_REQUESTS,
108   PROP_MAX_FILES,
109   PROP_MUXER_OVERHEAD,
110   PROP_USE_ROBUST_MUXING,
111   PROP_ALIGNMENT_THRESHOLD,
112   PROP_MUXER,
113   PROP_SINK,
114   PROP_RESET_MUXER,
115   PROP_ASYNC_FINALIZE,
116   PROP_MUXER_FACTORY,
117   PROP_MUXER_PRESET,
118   PROP_MUXER_PROPERTIES,
119   PROP_SINK_FACTORY,
120   PROP_SINK_PRESET,
121   PROP_SINK_PROPERTIES,
122   PROP_MUXERPAD_MAP
123 };
124
125 #define DEFAULT_MAX_SIZE_TIME       0
126 #define DEFAULT_MAX_SIZE_BYTES      0
127 #define DEFAULT_MAX_FILES           0
128 #define DEFAULT_MUXER_OVERHEAD      0.02
129 #define DEFAULT_SEND_KEYFRAME_REQUESTS FALSE
130 #define DEFAULT_ALIGNMENT_THRESHOLD 0
131 #define DEFAULT_MUXER "mp4mux"
132 #define DEFAULT_SINK "filesink"
133 #define DEFAULT_USE_ROBUST_MUXING FALSE
134 #define DEFAULT_RESET_MUXER TRUE
135 #define DEFAULT_ASYNC_FINALIZE FALSE
136 #define DEFAULT_START_INDEX 0
137
138 typedef struct _AsyncEosHelper
139 {
140   MqStreamCtx *ctx;
141   GstPad *pad;
142 } AsyncEosHelper;
143
144 enum
145 {
146   SIGNAL_FORMAT_LOCATION,
147   SIGNAL_FORMAT_LOCATION_FULL,
148   SIGNAL_SPLIT_NOW,
149   SIGNAL_SPLIT_AFTER,
150   SIGNAL_SPLIT_AT_RUNNING_TIME,
151   SIGNAL_MUXER_ADDED,
152   SIGNAL_SINK_ADDED,
153   SIGNAL_LAST
154 };
155
156 static guint signals[SIGNAL_LAST];
157
158 static GstStaticPadTemplate video_sink_template =
159 GST_STATIC_PAD_TEMPLATE ("video",
160     GST_PAD_SINK,
161     GST_PAD_REQUEST,
162     GST_STATIC_CAPS_ANY);
163 static GstStaticPadTemplate video_aux_sink_template =
164 GST_STATIC_PAD_TEMPLATE ("video_aux_%u",
165     GST_PAD_SINK,
166     GST_PAD_REQUEST,
167     GST_STATIC_CAPS_ANY);
168 static GstStaticPadTemplate audio_sink_template =
169 GST_STATIC_PAD_TEMPLATE ("audio_%u",
170     GST_PAD_SINK,
171     GST_PAD_REQUEST,
172     GST_STATIC_CAPS_ANY);
173 static GstStaticPadTemplate subtitle_sink_template =
174 GST_STATIC_PAD_TEMPLATE ("subtitle_%u",
175     GST_PAD_SINK,
176     GST_PAD_REQUEST,
177     GST_STATIC_CAPS_ANY);
178 static GstStaticPadTemplate caption_sink_template =
179 GST_STATIC_PAD_TEMPLATE ("caption_%u",
180     GST_PAD_SINK,
181     GST_PAD_REQUEST,
182     GST_STATIC_CAPS_ANY);
183
184 static GQuark PAD_CONTEXT;
185 static GQuark EOS_FROM_US;
186 static GQuark RUNNING_TIME;
187 /* EOS_FROM_US is only valid in async-finalize mode. We need to know whether
188  * to forward an incoming EOS message, but we cannot rely on the state of the
189  * splitmux anymore, so we set this qdata on the sink instead.
190  * The muxer and sink must be destroyed after both of these things have
191  * finished:
192  * 1) The EOS message has been sent when the fragment is ending
193  * 2) The muxer has been unlinked and relinked
194  * Therefore, EOS_FROM_US can have these two values:
195  * 0: EOS was not requested from us. Forward the message. The muxer and the
196  * sink will be destroyed together with the rest of the bin.
197  * 1: EOS was requested from us, but the other of the two tasks hasn't
198  * finished. Set EOS_FROM_US to 2 and do your stuff.
199  * 2: EOS was requested from us and the other of the two tasks has finished.
200  * Now we can destroy the muxer and the sink.
201  */
202
203 static void
204 _do_init (void)
205 {
206   PAD_CONTEXT = g_quark_from_static_string ("pad-context");
207   EOS_FROM_US = g_quark_from_static_string ("eos-from-us");
208   RUNNING_TIME = g_quark_from_static_string ("running-time");
209   GST_DEBUG_CATEGORY_INIT (splitmux_debug, "splitmuxsink", 0,
210       "Split File Muxing Sink");
211 }
212
213 #define gst_splitmux_sink_parent_class parent_class
214 G_DEFINE_TYPE_EXTENDED (GstSplitMuxSink, gst_splitmux_sink, GST_TYPE_BIN, 0,
215     _do_init ());
216 GST_ELEMENT_REGISTER_DEFINE (splitmuxsink, "splitmuxsink", GST_RANK_NONE,
217     GST_TYPE_SPLITMUX_SINK);
218
219 static gboolean create_muxer (GstSplitMuxSink * splitmux);
220 static gboolean create_sink (GstSplitMuxSink * splitmux);
221 static void gst_splitmux_sink_set_property (GObject * object, guint prop_id,
222     const GValue * value, GParamSpec * pspec);
223 static void gst_splitmux_sink_get_property (GObject * object, guint prop_id,
224     GValue * value, GParamSpec * pspec);
225 static void gst_splitmux_sink_dispose (GObject * object);
226 static void gst_splitmux_sink_finalize (GObject * object);
227
228 static GstPad *gst_splitmux_sink_request_new_pad (GstElement * element,
229     GstPadTemplate * templ, const gchar * name, const GstCaps * caps);
230 static void gst_splitmux_sink_release_pad (GstElement * element, GstPad * pad);
231
232 static GstStateChangeReturn gst_splitmux_sink_change_state (GstElement *
233     element, GstStateChange transition);
234
235 static void bus_handler (GstBin * bin, GstMessage * msg);
236 static void set_next_filename (GstSplitMuxSink * splitmux, MqStreamCtx * ctx);
237 static GstFlowReturn start_next_fragment (GstSplitMuxSink * splitmux,
238     MqStreamCtx * ctx);
239 static void mq_stream_ctx_free (MqStreamCtx * ctx);
240 static void grow_blocked_queues (GstSplitMuxSink * splitmux);
241
242 static void gst_splitmux_sink_ensure_max_files (GstSplitMuxSink * splitmux);
243 static GstElement *create_element (GstSplitMuxSink * splitmux,
244     const gchar * factory, const gchar * name, gboolean locked);
245
246 static void do_async_done (GstSplitMuxSink * splitmux);
247
248 static GstClockTime calculate_next_max_timecode (GstSplitMuxSink * splitmux,
249     const GstVideoTimeCode * cur_tc, GstClockTime running_time,
250     GstVideoTimeCode ** next_tc);
251
252 static MqStreamBuf *
253 mq_stream_buf_new (void)
254 {
255   return g_slice_new0 (MqStreamBuf);
256 }
257
258 static void
259 mq_stream_buf_free (MqStreamBuf * data)
260 {
261   g_slice_free (MqStreamBuf, data);
262 }
263
264 static SplitMuxOutputCommand *
265 out_cmd_buf_new (void)
266 {
267   return g_slice_new0 (SplitMuxOutputCommand);
268 }
269
270 static void
271 out_cmd_buf_free (SplitMuxOutputCommand * data)
272 {
273   g_slice_free (SplitMuxOutputCommand, data);
274 }
275
276 static void
277 input_gop_free (InputGop * gop)
278 {
279   g_clear_pointer (&gop->start_tc, gst_video_time_code_free);
280   g_slice_free (InputGop, gop);
281 }
282
283 static void
284 gst_splitmux_sink_class_init (GstSplitMuxSinkClass * klass)
285 {
286   GObjectClass *gobject_class = (GObjectClass *) klass;
287   GstElementClass *gstelement_class = (GstElementClass *) klass;
288   GstBinClass *gstbin_class = (GstBinClass *) klass;
289
290   gobject_class->set_property = gst_splitmux_sink_set_property;
291   gobject_class->get_property = gst_splitmux_sink_get_property;
292   gobject_class->dispose = gst_splitmux_sink_dispose;
293   gobject_class->finalize = gst_splitmux_sink_finalize;
294
295   gst_element_class_set_static_metadata (gstelement_class,
296       "Split Muxing Bin", "Generic/Bin/Muxer",
297       "Convenience bin that muxes incoming streams into multiple time/size limited files",
298       "Jan Schmidt <jan@centricular.com>");
299
300   gst_element_class_add_static_pad_template (gstelement_class,
301       &video_sink_template);
302   gst_element_class_add_static_pad_template (gstelement_class,
303       &video_aux_sink_template);
304   gst_element_class_add_static_pad_template (gstelement_class,
305       &audio_sink_template);
306   gst_element_class_add_static_pad_template (gstelement_class,
307       &subtitle_sink_template);
308   gst_element_class_add_static_pad_template (gstelement_class,
309       &caption_sink_template);
310
311   gstelement_class->change_state =
312       GST_DEBUG_FUNCPTR (gst_splitmux_sink_change_state);
313   gstelement_class->request_new_pad =
314       GST_DEBUG_FUNCPTR (gst_splitmux_sink_request_new_pad);
315   gstelement_class->release_pad =
316       GST_DEBUG_FUNCPTR (gst_splitmux_sink_release_pad);
317
318   gstbin_class->handle_message = bus_handler;
319
320   g_object_class_install_property (gobject_class, PROP_LOCATION,
321       g_param_spec_string ("location", "File Output Pattern",
322           "Format string pattern for the location of the files to write (e.g. video%05d.mp4)",
323           NULL, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
324   g_object_class_install_property (gobject_class, PROP_MUXER_OVERHEAD,
325       g_param_spec_double ("mux-overhead", "Muxing Overhead",
326           "Extra size overhead of muxing (0.02 = 2%)", 0.0, 1.0,
327           DEFAULT_MUXER_OVERHEAD,
328           G_PARAM_READWRITE | GST_PARAM_CONTROLLABLE | G_PARAM_STATIC_STRINGS));
329
330   g_object_class_install_property (gobject_class, PROP_MAX_SIZE_TIME,
331       g_param_spec_uint64 ("max-size-time", "Max. size (ns)",
332           "Max. amount of time per file (in ns, 0=disable)", 0, G_MAXUINT64,
333           DEFAULT_MAX_SIZE_TIME,
334           G_PARAM_READWRITE | GST_PARAM_MUTABLE_READY |
335           G_PARAM_STATIC_STRINGS));
336   g_object_class_install_property (gobject_class, PROP_MAX_SIZE_BYTES,
337       g_param_spec_uint64 ("max-size-bytes", "Max. size bytes",
338           "Max. amount of data per file (in bytes, 0=disable)", 0, G_MAXUINT64,
339           DEFAULT_MAX_SIZE_BYTES,
340           G_PARAM_READWRITE | GST_PARAM_MUTABLE_READY |
341           G_PARAM_STATIC_STRINGS));
342   g_object_class_install_property (gobject_class, PROP_MAX_SIZE_TIMECODE,
343       g_param_spec_string ("max-size-timecode", "Maximum timecode difference",
344           "Maximum difference in timecode between first and last frame. "
345           "Separator is assumed to be \":\" everywhere (e.g. 01:00:00:00). "
346           "Will only be effective if a timecode track is present.", NULL,
347           G_PARAM_READWRITE | GST_PARAM_MUTABLE_READY |
348           G_PARAM_STATIC_STRINGS));
349   g_object_class_install_property (gobject_class, PROP_SEND_KEYFRAME_REQUESTS,
350       g_param_spec_boolean ("send-keyframe-requests",
351           "Request keyframes at max-size-time",
352           "Request a keyframe every max-size-time ns to try splitting at that point. "
353           "Needs max-size-bytes to be 0 in order to be effective.",
354           DEFAULT_SEND_KEYFRAME_REQUESTS,
355           G_PARAM_READWRITE | GST_PARAM_MUTABLE_READY |
356           G_PARAM_STATIC_STRINGS));
357   g_object_class_install_property (gobject_class, PROP_MAX_FILES,
358       g_param_spec_uint ("max-files", "Max files",
359           "Maximum number of files to keep on disk. Once the maximum is reached,"
360           "old files start to be deleted to make room for new ones.", 0,
361           G_MAXUINT, DEFAULT_MAX_FILES,
362           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
363   g_object_class_install_property (gobject_class, PROP_ALIGNMENT_THRESHOLD,
364       g_param_spec_uint64 ("alignment-threshold", "Alignment threshold (ns)",
365           "Allow non-reference streams to be that many ns before the reference"
366           " stream", 0, G_MAXUINT64, DEFAULT_ALIGNMENT_THRESHOLD,
367           G_PARAM_READWRITE | GST_PARAM_MUTABLE_READY |
368           G_PARAM_STATIC_STRINGS));
369
370   g_object_class_install_property (gobject_class, PROP_MUXER,
371       g_param_spec_object ("muxer", "Muxer",
372           "The muxer element to use (NULL = default mp4mux). "
373           "Valid only for async-finalize = FALSE",
374           GST_TYPE_ELEMENT, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
375   g_object_class_install_property (gobject_class, PROP_SINK,
376       g_param_spec_object ("sink", "Sink",
377           "The sink element (or element chain) to use (NULL = default filesink). "
378           "Valid only for async-finalize = FALSE",
379           GST_TYPE_ELEMENT, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
380
381   g_object_class_install_property (gobject_class, PROP_USE_ROBUST_MUXING,
382       g_param_spec_boolean ("use-robust-muxing",
383           "Support robust-muxing mode of some muxers",
384           "Check if muxers support robust muxing via the reserved-max-duration and "
385           "reserved-duration-remaining properties and use them if so. "
386           "(Only present on qtmux and mp4mux for now). splitmuxsink may then also "
387           " create new fragments if the reserved header space is about to overflow. "
388           "Note that for mp4mux and qtmux, reserved-moov-update-period must be set "
389           "manually by the app to a non-zero value for robust muxing to have an effect.",
390           DEFAULT_USE_ROBUST_MUXING,
391           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
392
393   g_object_class_install_property (gobject_class, PROP_RESET_MUXER,
394       g_param_spec_boolean ("reset-muxer",
395           "Reset Muxer",
396           "Reset the muxer after each segment. Disabling this will not work for most muxers.",
397           DEFAULT_RESET_MUXER, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
398
399   g_object_class_install_property (gobject_class, PROP_ASYNC_FINALIZE,
400       g_param_spec_boolean ("async-finalize",
401           "Finalize fragments asynchronously",
402           "Finalize each fragment asynchronously and start a new one",
403           DEFAULT_ASYNC_FINALIZE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
404   g_object_class_install_property (gobject_class, PROP_MUXER_FACTORY,
405       g_param_spec_string ("muxer-factory", "Muxer factory",
406           "The muxer element factory to use (default = mp4mux). "
407           "Valid only for async-finalize = TRUE",
408           "mp4mux", G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
409   /**
410    * GstSplitMuxSink:muxer-preset
411    *
412    * An optional #GstPreset name to use for the muxer. This only has an effect
413    * in `async-finalize=TRUE` mode.
414    *
415    * Since: 1.18
416    */
417   g_object_class_install_property (gobject_class, PROP_MUXER_PRESET,
418       g_param_spec_string ("muxer-preset", "Muxer preset",
419           "The muxer preset to use. "
420           "Valid only for async-finalize = TRUE",
421           NULL, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
422   g_object_class_install_property (gobject_class, PROP_MUXER_PROPERTIES,
423       g_param_spec_boxed ("muxer-properties", "Muxer properties",
424           "The muxer element properties to use. "
425           "Example: {properties,boolean-prop=true,string-prop=\"hi\"}. "
426           "Valid only for async-finalize = TRUE",
427           GST_TYPE_STRUCTURE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
428   g_object_class_install_property (gobject_class, PROP_SINK_FACTORY,
429       g_param_spec_string ("sink-factory", "Sink factory",
430           "The sink element factory to use (default = filesink). "
431           "Valid only for async-finalize = TRUE",
432           "filesink", G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
433   /**
434    * GstSplitMuxSink:sink-preset
435    *
436    * An optional #GstPreset name to use for the sink. This only has an effect
437    * in `async-finalize=TRUE` mode.
438    *
439    * Since: 1.18
440    */
441   g_object_class_install_property (gobject_class, PROP_SINK_PRESET,
442       g_param_spec_string ("sink-preset", "Sink preset",
443           "The sink preset to use. "
444           "Valid only for async-finalize = TRUE",
445           NULL, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
446   g_object_class_install_property (gobject_class, PROP_SINK_PROPERTIES,
447       g_param_spec_boxed ("sink-properties", "Sink properties",
448           "The sink element properties to use. "
449           "Example: {properties,boolean-prop=true,string-prop=\"hi\"}. "
450           "Valid only for async-finalize = TRUE",
451           GST_TYPE_STRUCTURE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
452   g_object_class_install_property (gobject_class, PROP_START_INDEX,
453       g_param_spec_int ("start-index", "Start Index",
454           "Start value of fragment index.",
455           0, G_MAXINT, DEFAULT_START_INDEX,
456           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
457
458   /**
459    * GstSplitMuxSink::muxer-pad-map
460    *
461    * An optional GstStructure that provides a map from splitmuxsink sinkpad
462    * names to muxer pad names they should feed. Splitmuxsink has some default
463    * mapping behaviour to link video to video pads and audio to audio pads
464    * that usually works fine. This property is useful if you need to ensure
465    * a particular mapping to muxed streams.
466    *
467    * The GstStructure contains string fields like so:
468    *   splitmuxsink muxer-pad-map=x-pad-map,video=video_1
469    *
470    * Since: 1.18
471    */
472   g_object_class_install_property (G_OBJECT_CLASS (klass), PROP_MUXERPAD_MAP,
473       g_param_spec_boxed ("muxer-pad-map", "Muxer pad map",
474           "A GstStructure specifies the mapping from splitmuxsink sink pads to muxer pads",
475           GST_TYPE_STRUCTURE,
476           (GParamFlags) (G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)));
477
478   /**
479    * GstSplitMuxSink::format-location:
480    * @splitmux: the #GstSplitMuxSink
481    * @fragment_id: the sequence number of the file to be created
482    *
483    * Returns: the location to be used for the next output file. This must be
484    *    a newly-allocated string which will be freed with g_free() by the
485    *    splitmuxsink element when it no longer needs it, so use g_strdup() or
486    *    g_strdup_printf() or similar functions to allocate it.
487    */
488   signals[SIGNAL_FORMAT_LOCATION] =
489       g_signal_new ("format-location", G_TYPE_FROM_CLASS (klass),
490       G_SIGNAL_RUN_LAST, 0, NULL, NULL, NULL, G_TYPE_STRING, 1, G_TYPE_UINT);
491
492   /**
493    * GstSplitMuxSink::format-location-full:
494    * @splitmux: the #GstSplitMuxSink
495    * @fragment_id: the sequence number of the file to be created
496    * @first_sample: A #GstSample containing the first buffer
497    *   from the reference stream in the new file
498    *
499    * Returns: the location to be used for the next output file. This must be
500    *    a newly-allocated string which will be freed with g_free() by the
501    *    splitmuxsink element when it no longer needs it, so use g_strdup() or
502    *    g_strdup_printf() or similar functions to allocate it.
503    *
504    * Since: 1.12
505    */
506   signals[SIGNAL_FORMAT_LOCATION_FULL] =
507       g_signal_new ("format-location-full", G_TYPE_FROM_CLASS (klass),
508       G_SIGNAL_RUN_LAST, 0, NULL, NULL, NULL, G_TYPE_STRING, 2, G_TYPE_UINT,
509       GST_TYPE_SAMPLE);
510
511   /**
512    * GstSplitMuxSink::split-now:
513    * @splitmux: the #GstSplitMuxSink
514    *
515    * When called by the user, this action signal splits the video file (and begins a new one) immediately.
516    * The current GOP will be output to the new file.
517    *
518    * Since: 1.14
519    */
520   signals[SIGNAL_SPLIT_NOW] =
521       g_signal_new ("split-now", G_TYPE_FROM_CLASS (klass),
522       G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION,
523       G_STRUCT_OFFSET (GstSplitMuxSinkClass, split_now), NULL, NULL, NULL,
524       G_TYPE_NONE, 0);
525
526   /**
527    * GstSplitMuxSink::split-after:
528    * @splitmux: the #GstSplitMuxSink
529    *
530    * When called by the user, this action signal splits the video file (and begins a new one) immediately.
531    * Unlike the 'split-now' signal, with 'split-after', the current GOP will be output to the old file.
532    *
533    * Since: 1.16
534    */
535   signals[SIGNAL_SPLIT_AFTER] =
536       g_signal_new ("split-after", G_TYPE_FROM_CLASS (klass),
537       G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION,
538       G_STRUCT_OFFSET (GstSplitMuxSinkClass, split_after), NULL, NULL, NULL,
539       G_TYPE_NONE, 0);
540
541   /**
542    * GstSplitMuxSink::split-at-running-time:
543    * @splitmux: the #GstSplitMuxSink
544    *
545    * When called by the user, this action signal splits the video file (and
546    * begins a new one) as soon as the given running time is reached. If this
547    * action signal is called multiple times, running times are queued up and
548    * processed in the order they were given.
549    *
550    * Note that this is prone to race conditions, where said running time is
551    * reached and surpassed before we had a chance to split. The file will
552    * still split immediately, but in order to make sure that the split doesn't
553    * happen too late, it is recommended to call this action signal from
554    * something that will prevent further buffers from flowing into
555    * splitmuxsink before the split is completed, such as a pad probe before
556    * splitmuxsink.
557    *
558    *
559    * Since: 1.16
560    */
561   signals[SIGNAL_SPLIT_AT_RUNNING_TIME] =
562       g_signal_new ("split-at-running-time", G_TYPE_FROM_CLASS (klass),
563       G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION,
564       G_STRUCT_OFFSET (GstSplitMuxSinkClass, split_at_running_time), NULL, NULL,
565       NULL, G_TYPE_NONE, 1, G_TYPE_UINT64);
566
567   /**
568    * GstSplitMuxSink::muxer-added:
569    * @splitmux: the #GstSplitMuxSink
570    * @muxer: the newly added muxer element
571    *
572    * Since: 1.14
573    */
574   signals[SIGNAL_MUXER_ADDED] =
575       g_signal_new ("muxer-added", G_TYPE_FROM_CLASS (klass),
576       G_SIGNAL_RUN_LAST, 0, NULL, NULL, NULL, G_TYPE_NONE, 1, GST_TYPE_ELEMENT);
577
578   /**
579    * GstSplitMuxSink::sink-added:
580    * @splitmux: the #GstSplitMuxSink
581    * @sink: the newly added sink element
582    *
583    * Since: 1.14
584    */
585   signals[SIGNAL_SINK_ADDED] =
586       g_signal_new ("sink-added", G_TYPE_FROM_CLASS (klass),
587       G_SIGNAL_RUN_LAST, 0, NULL, NULL, NULL, G_TYPE_NONE, 1, GST_TYPE_ELEMENT);
588
589   klass->split_now = split_now;
590   klass->split_after = split_after;
591   klass->split_at_running_time = split_at_running_time;
592 }
593
594 static void
595 gst_splitmux_sink_init (GstSplitMuxSink * splitmux)
596 {
597   g_mutex_init (&splitmux->lock);
598   g_mutex_init (&splitmux->state_lock);
599   g_cond_init (&splitmux->input_cond);
600   g_cond_init (&splitmux->output_cond);
601   g_queue_init (&splitmux->out_cmd_q);
602
603   splitmux->mux_overhead = DEFAULT_MUXER_OVERHEAD;
604   splitmux->threshold_time = DEFAULT_MAX_SIZE_TIME;
605   splitmux->threshold_bytes = DEFAULT_MAX_SIZE_BYTES;
606   splitmux->max_files = DEFAULT_MAX_FILES;
607   splitmux->send_keyframe_requests = DEFAULT_SEND_KEYFRAME_REQUESTS;
608   splitmux->alignment_threshold = DEFAULT_ALIGNMENT_THRESHOLD;
609   splitmux->use_robust_muxing = DEFAULT_USE_ROBUST_MUXING;
610   splitmux->reset_muxer = DEFAULT_RESET_MUXER;
611
612   splitmux->threshold_timecode_str = NULL;
613
614   splitmux->async_finalize = DEFAULT_ASYNC_FINALIZE;
615   splitmux->muxer_factory = g_strdup (DEFAULT_MUXER);
616   splitmux->muxer_properties = NULL;
617   splitmux->sink_factory = g_strdup (DEFAULT_SINK);
618   splitmux->sink_properties = NULL;
619
620   GST_OBJECT_FLAG_SET (splitmux, GST_ELEMENT_FLAG_SINK);
621   splitmux->split_requested = FALSE;
622   splitmux->do_split_next_gop = FALSE;
623   splitmux->times_to_split = gst_queue_array_new_for_struct (8, 8);
624   splitmux->next_fku_time = GST_CLOCK_TIME_NONE;
625
626   g_queue_init (&splitmux->pending_input_gops);
627 }
628
629 static void
630 gst_splitmux_reset_elements (GstSplitMuxSink * splitmux)
631 {
632   if (splitmux->muxer) {
633     gst_element_set_locked_state (splitmux->muxer, TRUE);
634     gst_element_set_state (splitmux->muxer, GST_STATE_NULL);
635     gst_bin_remove (GST_BIN (splitmux), splitmux->muxer);
636   }
637   if (splitmux->active_sink) {
638     gst_element_set_locked_state (splitmux->active_sink, TRUE);
639     gst_element_set_state (splitmux->active_sink, GST_STATE_NULL);
640     gst_bin_remove (GST_BIN (splitmux), splitmux->active_sink);
641   }
642
643   splitmux->sink = splitmux->active_sink = splitmux->muxer = NULL;
644 }
645
646 static void
647 gst_splitmux_sink_dispose (GObject * object)
648 {
649   GstSplitMuxSink *splitmux = GST_SPLITMUX_SINK (object);
650
651   /* Calling parent dispose invalidates all child pointers */
652   splitmux->sink = splitmux->active_sink = splitmux->muxer = NULL;
653
654   G_OBJECT_CLASS (parent_class)->dispose (object);
655 }
656
657 static void
658 gst_splitmux_sink_finalize (GObject * object)
659 {
660   GstSplitMuxSink *splitmux = GST_SPLITMUX_SINK (object);
661
662   g_cond_clear (&splitmux->input_cond);
663   g_cond_clear (&splitmux->output_cond);
664   g_mutex_clear (&splitmux->lock);
665   g_mutex_clear (&splitmux->state_lock);
666   g_queue_foreach (&splitmux->out_cmd_q, (GFunc) out_cmd_buf_free, NULL);
667   g_queue_clear (&splitmux->out_cmd_q);
668   g_queue_foreach (&splitmux->pending_input_gops, (GFunc) input_gop_free, NULL);
669   g_queue_clear (&splitmux->pending_input_gops);
670
671   g_clear_pointer (&splitmux->fragment_start_tc, gst_video_time_code_free);
672
673   if (splitmux->muxerpad_map)
674     gst_structure_free (splitmux->muxerpad_map);
675
676   if (splitmux->provided_sink)
677     gst_object_unref (splitmux->provided_sink);
678   if (splitmux->provided_muxer)
679     gst_object_unref (splitmux->provided_muxer);
680
681   if (splitmux->muxer_factory)
682     g_free (splitmux->muxer_factory);
683   if (splitmux->muxer_preset)
684     g_free (splitmux->muxer_preset);
685   if (splitmux->muxer_properties)
686     gst_structure_free (splitmux->muxer_properties);
687   if (splitmux->sink_factory)
688     g_free (splitmux->sink_factory);
689   if (splitmux->sink_preset)
690     g_free (splitmux->sink_preset);
691   if (splitmux->sink_properties)
692     gst_structure_free (splitmux->sink_properties);
693
694   if (splitmux->threshold_timecode_str)
695     g_free (splitmux->threshold_timecode_str);
696   if (splitmux->tc_interval)
697     gst_video_time_code_interval_free (splitmux->tc_interval);
698
699   if (splitmux->times_to_split)
700     gst_queue_array_free (splitmux->times_to_split);
701
702   g_free (splitmux->location);
703
704   /* Make sure to free any un-released contexts. There should not be any,
705    * because the dispose will have freed all request pads though */
706   g_list_foreach (splitmux->contexts, (GFunc) mq_stream_ctx_free, NULL);
707   g_list_free (splitmux->contexts);
708
709   G_OBJECT_CLASS (parent_class)->finalize (object);
710 }
711
712 /*
713  * Set any time threshold to the muxer, if it has
714  * reserved-max-duration and reserved-duration-remaining
715  * properties. Called when creating/claiming the muxer
716  * in create_elements() */
717 static void
718 update_muxer_properties (GstSplitMuxSink * sink)
719 {
720   GObjectClass *klass;
721   GstClockTime threshold_time;
722
723   sink->muxer_has_reserved_props = FALSE;
724   if (sink->muxer == NULL)
725     return;
726   klass = G_OBJECT_GET_CLASS (sink->muxer);
727   if (g_object_class_find_property (klass, "reserved-max-duration") == NULL)
728     return;
729   if (g_object_class_find_property (klass,
730           "reserved-duration-remaining") == NULL)
731     return;
732   sink->muxer_has_reserved_props = TRUE;
733
734   GST_LOG_OBJECT (sink, "Setting muxer reserved time to %" GST_TIME_FORMAT,
735       GST_TIME_ARGS (sink->threshold_time));
736   GST_OBJECT_LOCK (sink);
737   threshold_time = sink->threshold_time;
738   GST_OBJECT_UNLOCK (sink);
739
740   if (threshold_time > 0) {
741     /* Tell the muxer how much space to reserve */
742     GstClockTime muxer_threshold = threshold_time;
743     g_object_set (sink->muxer, "reserved-max-duration", muxer_threshold, NULL);
744   }
745 }
746
747 static void
748 gst_splitmux_sink_set_property (GObject * object, guint prop_id,
749     const GValue * value, GParamSpec * pspec)
750 {
751   GstSplitMuxSink *splitmux = GST_SPLITMUX_SINK (object);
752
753   switch (prop_id) {
754     case PROP_LOCATION:{
755       GST_OBJECT_LOCK (splitmux);
756       g_free (splitmux->location);
757       splitmux->location = g_value_dup_string (value);
758       GST_OBJECT_UNLOCK (splitmux);
759       break;
760     }
761     case PROP_START_INDEX:
762       GST_OBJECT_LOCK (splitmux);
763       splitmux->start_index = g_value_get_int (value);
764       GST_OBJECT_UNLOCK (splitmux);
765       break;
766     case PROP_MAX_SIZE_BYTES:
767       GST_OBJECT_LOCK (splitmux);
768       splitmux->threshold_bytes = g_value_get_uint64 (value);
769       GST_OBJECT_UNLOCK (splitmux);
770       break;
771     case PROP_MAX_SIZE_TIME:
772       GST_OBJECT_LOCK (splitmux);
773       splitmux->threshold_time = g_value_get_uint64 (value);
774       GST_OBJECT_UNLOCK (splitmux);
775       break;
776     case PROP_MAX_SIZE_TIMECODE:
777       GST_OBJECT_LOCK (splitmux);
778       g_free (splitmux->threshold_timecode_str);
779       /* will be calculated later */
780       g_clear_pointer (&splitmux->tc_interval,
781           gst_video_time_code_interval_free);
782
783       splitmux->threshold_timecode_str = g_value_dup_string (value);
784       if (splitmux->threshold_timecode_str) {
785         splitmux->tc_interval =
786             gst_video_time_code_interval_new_from_string
787             (splitmux->threshold_timecode_str);
788         if (!splitmux->tc_interval) {
789           g_warning ("Wrong timecode string %s",
790               splitmux->threshold_timecode_str);
791           g_free (splitmux->threshold_timecode_str);
792           splitmux->threshold_timecode_str = NULL;
793         }
794       }
795       splitmux->next_fragment_start_tc_time = GST_CLOCK_TIME_NONE;
796       if (splitmux->tc_interval && splitmux->fragment_start_tc) {
797         splitmux->next_fragment_start_tc_time =
798             calculate_next_max_timecode (splitmux, splitmux->fragment_start_tc,
799             splitmux->fragment_start_time, NULL);
800       }
801       GST_OBJECT_UNLOCK (splitmux);
802       break;
803     case PROP_SEND_KEYFRAME_REQUESTS:
804       GST_OBJECT_LOCK (splitmux);
805       splitmux->send_keyframe_requests = g_value_get_boolean (value);
806       GST_OBJECT_UNLOCK (splitmux);
807       break;
808     case PROP_MAX_FILES:
809       GST_OBJECT_LOCK (splitmux);
810       splitmux->max_files = g_value_get_uint (value);
811       GST_OBJECT_UNLOCK (splitmux);
812       break;
813     case PROP_MUXER_OVERHEAD:
814       GST_OBJECT_LOCK (splitmux);
815       splitmux->mux_overhead = g_value_get_double (value);
816       GST_OBJECT_UNLOCK (splitmux);
817       break;
818     case PROP_USE_ROBUST_MUXING:
819       GST_OBJECT_LOCK (splitmux);
820       splitmux->use_robust_muxing = g_value_get_boolean (value);
821       GST_OBJECT_UNLOCK (splitmux);
822       if (splitmux->use_robust_muxing)
823         update_muxer_properties (splitmux);
824       break;
825     case PROP_ALIGNMENT_THRESHOLD:
826       GST_OBJECT_LOCK (splitmux);
827       splitmux->alignment_threshold = g_value_get_uint64 (value);
828       GST_OBJECT_UNLOCK (splitmux);
829       break;
830     case PROP_SINK:
831       GST_OBJECT_LOCK (splitmux);
832       gst_clear_object (&splitmux->provided_sink);
833       splitmux->provided_sink = g_value_get_object (value);
834       if (splitmux->provided_sink)
835         gst_object_ref_sink (splitmux->provided_sink);
836       GST_OBJECT_UNLOCK (splitmux);
837       break;
838     case PROP_MUXER:
839       GST_OBJECT_LOCK (splitmux);
840       gst_clear_object (&splitmux->provided_muxer);
841       splitmux->provided_muxer = g_value_get_object (value);
842       if (splitmux->provided_muxer)
843         gst_object_ref_sink (splitmux->provided_muxer);
844       GST_OBJECT_UNLOCK (splitmux);
845       break;
846     case PROP_RESET_MUXER:
847       GST_OBJECT_LOCK (splitmux);
848       splitmux->reset_muxer = g_value_get_boolean (value);
849       GST_OBJECT_UNLOCK (splitmux);
850       break;
851     case PROP_ASYNC_FINALIZE:
852       GST_OBJECT_LOCK (splitmux);
853       splitmux->async_finalize = g_value_get_boolean (value);
854       GST_OBJECT_UNLOCK (splitmux);
855       break;
856     case PROP_MUXER_FACTORY:
857       GST_OBJECT_LOCK (splitmux);
858       if (splitmux->muxer_factory)
859         g_free (splitmux->muxer_factory);
860       splitmux->muxer_factory = g_value_dup_string (value);
861       GST_OBJECT_UNLOCK (splitmux);
862       break;
863     case PROP_MUXER_PRESET:
864       GST_OBJECT_LOCK (splitmux);
865       if (splitmux->muxer_preset)
866         g_free (splitmux->muxer_preset);
867       splitmux->muxer_preset = g_value_dup_string (value);
868       GST_OBJECT_UNLOCK (splitmux);
869       break;
870     case PROP_MUXER_PROPERTIES:
871       GST_OBJECT_LOCK (splitmux);
872       if (splitmux->muxer_properties)
873         gst_structure_free (splitmux->muxer_properties);
874       if (gst_value_get_structure (value))
875         splitmux->muxer_properties =
876             gst_structure_copy (gst_value_get_structure (value));
877       else
878         splitmux->muxer_properties = NULL;
879       GST_OBJECT_UNLOCK (splitmux);
880       break;
881     case PROP_SINK_FACTORY:
882       GST_OBJECT_LOCK (splitmux);
883       if (splitmux->sink_factory)
884         g_free (splitmux->sink_factory);
885       splitmux->sink_factory = g_value_dup_string (value);
886       GST_OBJECT_UNLOCK (splitmux);
887       break;
888     case PROP_SINK_PRESET:
889       GST_OBJECT_LOCK (splitmux);
890       if (splitmux->sink_preset)
891         g_free (splitmux->sink_preset);
892       splitmux->sink_preset = g_value_dup_string (value);
893       GST_OBJECT_UNLOCK (splitmux);
894       break;
895     case PROP_SINK_PROPERTIES:
896       GST_OBJECT_LOCK (splitmux);
897       if (splitmux->sink_properties)
898         gst_structure_free (splitmux->sink_properties);
899       if (gst_value_get_structure (value))
900         splitmux->sink_properties =
901             gst_structure_copy (gst_value_get_structure (value));
902       else
903         splitmux->sink_properties = NULL;
904       GST_OBJECT_UNLOCK (splitmux);
905       break;
906     case PROP_MUXERPAD_MAP:
907     {
908       const GstStructure *s = gst_value_get_structure (value);
909       GST_SPLITMUX_LOCK (splitmux);
910       if (splitmux->muxerpad_map) {
911         gst_structure_free (splitmux->muxerpad_map);
912       }
913       if (s)
914         splitmux->muxerpad_map = gst_structure_copy (s);
915       else
916         splitmux->muxerpad_map = NULL;
917       GST_SPLITMUX_UNLOCK (splitmux);
918       break;
919     }
920     default:
921       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
922       break;
923   }
924 }
925
926 static void
927 gst_splitmux_sink_get_property (GObject * object, guint prop_id,
928     GValue * value, GParamSpec * pspec)
929 {
930   GstSplitMuxSink *splitmux = GST_SPLITMUX_SINK (object);
931
932   switch (prop_id) {
933     case PROP_LOCATION:
934       GST_OBJECT_LOCK (splitmux);
935       g_value_set_string (value, splitmux->location);
936       GST_OBJECT_UNLOCK (splitmux);
937       break;
938     case PROP_START_INDEX:
939       GST_OBJECT_LOCK (splitmux);
940       g_value_set_int (value, splitmux->start_index);
941       GST_OBJECT_UNLOCK (splitmux);
942       break;
943     case PROP_MAX_SIZE_BYTES:
944       GST_OBJECT_LOCK (splitmux);
945       g_value_set_uint64 (value, splitmux->threshold_bytes);
946       GST_OBJECT_UNLOCK (splitmux);
947       break;
948     case PROP_MAX_SIZE_TIME:
949       GST_OBJECT_LOCK (splitmux);
950       g_value_set_uint64 (value, splitmux->threshold_time);
951       GST_OBJECT_UNLOCK (splitmux);
952       break;
953     case PROP_MAX_SIZE_TIMECODE:
954       GST_OBJECT_LOCK (splitmux);
955       g_value_set_string (value, splitmux->threshold_timecode_str);
956       GST_OBJECT_UNLOCK (splitmux);
957       break;
958     case PROP_SEND_KEYFRAME_REQUESTS:
959       GST_OBJECT_LOCK (splitmux);
960       g_value_set_boolean (value, splitmux->send_keyframe_requests);
961       GST_OBJECT_UNLOCK (splitmux);
962       break;
963     case PROP_MAX_FILES:
964       GST_OBJECT_LOCK (splitmux);
965       g_value_set_uint (value, splitmux->max_files);
966       GST_OBJECT_UNLOCK (splitmux);
967       break;
968     case PROP_MUXER_OVERHEAD:
969       GST_OBJECT_LOCK (splitmux);
970       g_value_set_double (value, splitmux->mux_overhead);
971       GST_OBJECT_UNLOCK (splitmux);
972       break;
973     case PROP_USE_ROBUST_MUXING:
974       GST_OBJECT_LOCK (splitmux);
975       g_value_set_boolean (value, splitmux->use_robust_muxing);
976       GST_OBJECT_UNLOCK (splitmux);
977       break;
978     case PROP_ALIGNMENT_THRESHOLD:
979       GST_OBJECT_LOCK (splitmux);
980       g_value_set_uint64 (value, splitmux->alignment_threshold);
981       GST_OBJECT_UNLOCK (splitmux);
982       break;
983     case PROP_SINK:
984       GST_OBJECT_LOCK (splitmux);
985       g_value_set_object (value, splitmux->provided_sink);
986       GST_OBJECT_UNLOCK (splitmux);
987       break;
988     case PROP_MUXER:
989       GST_OBJECT_LOCK (splitmux);
990       g_value_set_object (value, splitmux->provided_muxer);
991       GST_OBJECT_UNLOCK (splitmux);
992       break;
993     case PROP_RESET_MUXER:
994       GST_OBJECT_LOCK (splitmux);
995       g_value_set_boolean (value, splitmux->reset_muxer);
996       GST_OBJECT_UNLOCK (splitmux);
997       break;
998     case PROP_ASYNC_FINALIZE:
999       GST_OBJECT_LOCK (splitmux);
1000       g_value_set_boolean (value, splitmux->async_finalize);
1001       GST_OBJECT_UNLOCK (splitmux);
1002       break;
1003     case PROP_MUXER_FACTORY:
1004       GST_OBJECT_LOCK (splitmux);
1005       g_value_set_string (value, splitmux->muxer_factory);
1006       GST_OBJECT_UNLOCK (splitmux);
1007       break;
1008     case PROP_MUXER_PRESET:
1009       GST_OBJECT_LOCK (splitmux);
1010       g_value_set_string (value, splitmux->muxer_preset);
1011       GST_OBJECT_UNLOCK (splitmux);
1012       break;
1013     case PROP_MUXER_PROPERTIES:
1014       GST_OBJECT_LOCK (splitmux);
1015       gst_value_set_structure (value, splitmux->muxer_properties);
1016       GST_OBJECT_UNLOCK (splitmux);
1017       break;
1018     case PROP_SINK_FACTORY:
1019       GST_OBJECT_LOCK (splitmux);
1020       g_value_set_string (value, splitmux->sink_factory);
1021       GST_OBJECT_UNLOCK (splitmux);
1022       break;
1023     case PROP_SINK_PRESET:
1024       GST_OBJECT_LOCK (splitmux);
1025       g_value_set_string (value, splitmux->sink_preset);
1026       GST_OBJECT_UNLOCK (splitmux);
1027       break;
1028     case PROP_SINK_PROPERTIES:
1029       GST_OBJECT_LOCK (splitmux);
1030       gst_value_set_structure (value, splitmux->sink_properties);
1031       GST_OBJECT_UNLOCK (splitmux);
1032       break;
1033     case PROP_MUXERPAD_MAP:
1034       GST_SPLITMUX_LOCK (splitmux);
1035       gst_value_set_structure (value, splitmux->muxerpad_map);
1036       GST_SPLITMUX_UNLOCK (splitmux);
1037       break;
1038     default:
1039       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
1040       break;
1041   }
1042 }
1043
1044 /* Convenience function */
1045 static inline GstClockTimeDiff
1046 my_segment_to_running_time (GstSegment * segment, GstClockTime val)
1047 {
1048   GstClockTimeDiff res = GST_CLOCK_STIME_NONE;
1049
1050   if (GST_CLOCK_TIME_IS_VALID (val)) {
1051     gboolean sign =
1052         gst_segment_to_running_time_full (segment, GST_FORMAT_TIME, val, &val);
1053     if (sign > 0)
1054       res = val;
1055     else if (sign < 0)
1056       res = -val;
1057   }
1058   return res;
1059 }
1060
1061 static void
1062 mq_stream_ctx_reset (MqStreamCtx * ctx)
1063 {
1064   gst_segment_init (&ctx->in_segment, GST_FORMAT_UNDEFINED);
1065   gst_segment_init (&ctx->out_segment, GST_FORMAT_UNDEFINED);
1066   ctx->in_running_time = ctx->out_running_time = GST_CLOCK_STIME_NONE;
1067   g_queue_foreach (&ctx->queued_bufs, (GFunc) mq_stream_buf_free, NULL);
1068   g_queue_clear (&ctx->queued_bufs);
1069 }
1070
1071 static MqStreamCtx *
1072 mq_stream_ctx_new (GstSplitMuxSink * splitmux)
1073 {
1074   MqStreamCtx *ctx;
1075
1076   ctx = g_new0 (MqStreamCtx, 1);
1077   ctx->splitmux = splitmux;
1078   g_queue_init (&ctx->queued_bufs);
1079   mq_stream_ctx_reset (ctx);
1080
1081   return ctx;
1082 }
1083
1084 static void
1085 mq_stream_ctx_free (MqStreamCtx * ctx)
1086 {
1087   if (ctx->q) {
1088     GstObject *parent = gst_object_get_parent (GST_OBJECT (ctx->q));
1089
1090     g_signal_handler_disconnect (ctx->q, ctx->q_overrun_id);
1091
1092     if (parent == GST_OBJECT_CAST (ctx->splitmux)) {
1093       gst_element_set_locked_state (ctx->q, TRUE);
1094       gst_element_set_state (ctx->q, GST_STATE_NULL);
1095       gst_bin_remove (GST_BIN (ctx->splitmux), ctx->q);
1096       gst_object_unref (parent);
1097     }
1098     gst_object_unref (ctx->q);
1099   }
1100   gst_object_unref (ctx->sinkpad);
1101   gst_object_unref (ctx->srcpad);
1102   g_queue_foreach (&ctx->queued_bufs, (GFunc) mq_stream_buf_free, NULL);
1103   g_queue_clear (&ctx->queued_bufs);
1104   g_free (ctx);
1105 }
1106
1107 static void
1108 send_fragment_opened_closed_msg (GstSplitMuxSink * splitmux, gboolean opened,
1109     GstElement * sink)
1110 {
1111   gchar *location = NULL;
1112   GstMessage *msg;
1113   const gchar *msg_name = opened ?
1114       "splitmuxsink-fragment-opened" : "splitmuxsink-fragment-closed";
1115   GstClockTime running_time = splitmux->reference_ctx->out_running_time;
1116
1117   if (!opened) {
1118     GstClockTime *rtime = g_object_get_qdata (G_OBJECT (sink), RUNNING_TIME);
1119     if (rtime)
1120       running_time = *rtime;
1121   }
1122
1123   if (g_object_class_find_property (G_OBJECT_GET_CLASS (sink),
1124           "location") != NULL)
1125     g_object_get (sink, "location", &location, NULL);
1126
1127   GST_DEBUG_OBJECT (splitmux,
1128       "Sending %s message. Running time %" GST_TIME_FORMAT " location %s",
1129       msg_name, GST_TIME_ARGS (running_time), GST_STR_NULL (location));
1130
1131   /* If it's in the middle of a teardown, the reference_ctc might have become
1132    * NULL */
1133   if (splitmux->reference_ctx) {
1134     msg = gst_message_new_element (GST_OBJECT (splitmux),
1135         gst_structure_new (msg_name,
1136             "location", G_TYPE_STRING, location,
1137             "running-time", GST_TYPE_CLOCK_TIME, running_time,
1138             "sink", GST_TYPE_ELEMENT, sink, NULL));
1139     gst_element_post_message (GST_ELEMENT_CAST (splitmux), msg);
1140   }
1141
1142   g_free (location);
1143 }
1144
1145 static void
1146 send_eos_async (GstSplitMuxSink * splitmux, AsyncEosHelper * helper)
1147 {
1148   GstEvent *eos;
1149   GstPad *pad;
1150   MqStreamCtx *ctx;
1151
1152   eos = gst_event_new_eos ();
1153   pad = helper->pad;
1154   ctx = helper->ctx;
1155
1156   GST_SPLITMUX_LOCK (splitmux);
1157   if (!pad)
1158     pad = gst_pad_get_peer (ctx->srcpad);
1159   GST_SPLITMUX_UNLOCK (splitmux);
1160
1161   gst_pad_send_event (pad, eos);
1162   GST_INFO_OBJECT (splitmux, "Sent async EOS on %" GST_PTR_FORMAT, pad);
1163
1164   gst_object_unref (pad);
1165   g_free (helper);
1166 }
1167
1168 /* Called with lock held, drops the lock to send EOS to the
1169  * pad
1170  */
1171 static void
1172 send_eos (GstSplitMuxSink * splitmux, MqStreamCtx * ctx)
1173 {
1174   GstEvent *eos;
1175   GstPad *pad;
1176
1177   eos = gst_event_new_eos ();
1178   pad = gst_pad_get_peer (ctx->srcpad);
1179
1180   ctx->out_eos = TRUE;
1181
1182   GST_INFO_OBJECT (splitmux, "Sending EOS on %" GST_PTR_FORMAT, pad);
1183   GST_SPLITMUX_UNLOCK (splitmux);
1184   gst_pad_send_event (pad, eos);
1185   GST_SPLITMUX_LOCK (splitmux);
1186
1187   gst_object_unref (pad);
1188 }
1189
1190 /* Called with lock held. Schedules an EOS event to the ctx pad
1191  * to happen in another thread */
1192 static void
1193 eos_context_async (MqStreamCtx * ctx, GstSplitMuxSink * splitmux)
1194 {
1195   AsyncEosHelper *helper = g_new0 (AsyncEosHelper, 1);
1196   GstPad *srcpad, *sinkpad;
1197
1198   srcpad = ctx->srcpad;
1199   sinkpad = gst_pad_get_peer (srcpad);
1200
1201   helper->ctx = ctx;
1202   helper->pad = sinkpad;        /* Takes the reference */
1203
1204   ctx->out_eos_async_done = TRUE;
1205
1206   /* There used to be a bug here, where we had to explicitly remove
1207    * the SINK flag so that GstBin would ignore it for EOS purposes.
1208    * That fixed a race where if splitmuxsink really reaches EOS
1209    * before an asynchronous background element has finished, then
1210    * the bin wouldn't actually send EOS to the pipeline. Even after
1211    * finishing and removing the old element, the bin didn't re-check
1212    * EOS status on removing a SINK element. That bug was fixed
1213    * in core. */
1214   GST_DEBUG_OBJECT (splitmux, "scheduled EOS to pad %" GST_PTR_FORMAT " ctx %p",
1215       sinkpad, ctx);
1216
1217   g_assert_nonnull (helper->pad);
1218   gst_element_call_async (GST_ELEMENT (splitmux),
1219       (GstElementCallAsyncFunc) send_eos_async, helper, NULL);
1220 }
1221
1222 /* Called with lock held. TRUE iff all contexts have a
1223  * pending (or delivered) async eos event */
1224 static gboolean
1225 all_contexts_are_async_eos (GstSplitMuxSink * splitmux)
1226 {
1227   gboolean ret = TRUE;
1228   GList *item;
1229
1230   for (item = splitmux->contexts; item; item = item->next) {
1231     MqStreamCtx *ctx = item->data;
1232     ret &= ctx->out_eos_async_done;
1233   }
1234   return ret;
1235 }
1236
1237 /* Called with splitmux lock held to check if this output
1238  * context needs to sleep to wait for the release of the
1239  * next GOP, or to send EOS to close out the current file
1240  */
1241 static GstFlowReturn
1242 complete_or_wait_on_out (GstSplitMuxSink * splitmux, MqStreamCtx * ctx)
1243 {
1244   if (ctx->caps_change)
1245     return GST_FLOW_OK;
1246
1247   do {
1248     /* When first starting up, the reference stream has to output
1249      * the first buffer to prepare the muxer and sink */
1250     gboolean can_output = (ctx->is_reference || splitmux->ready_for_output);
1251     GstClockTimeDiff my_max_out_running_time = splitmux->max_out_running_time;
1252
1253     if (my_max_out_running_time != GST_CLOCK_STIME_NONE
1254         && my_max_out_running_time != G_MAXINT64) {
1255       my_max_out_running_time -= splitmux->alignment_threshold;
1256       GST_LOG_OBJECT (ctx->srcpad,
1257           "Max out running time currently %" GST_STIME_FORMAT
1258           ", with threshold applied it is %" GST_STIME_FORMAT,
1259           GST_STIME_ARGS (splitmux->max_out_running_time),
1260           GST_STIME_ARGS (my_max_out_running_time));
1261     }
1262
1263     if (ctx->flushing
1264         || splitmux->output_state == SPLITMUX_OUTPUT_STATE_STOPPED)
1265       return GST_FLOW_FLUSHING;
1266
1267     GST_LOG_OBJECT (ctx->srcpad,
1268         "Checking running time %" GST_STIME_FORMAT " against max %"
1269         GST_STIME_FORMAT, GST_STIME_ARGS (ctx->out_running_time),
1270         GST_STIME_ARGS (my_max_out_running_time));
1271
1272     if (can_output) {
1273       if (splitmux->max_out_running_time != GST_CLOCK_STIME_NONE &&
1274           ctx->out_running_time < my_max_out_running_time) {
1275         return GST_FLOW_OK;
1276       }
1277
1278       switch (splitmux->output_state) {
1279         case SPLITMUX_OUTPUT_STATE_OUTPUT_GOP:
1280           /* We only get here if we've finished outputting a GOP and need to know
1281            * what to do next */
1282           splitmux->output_state = SPLITMUX_OUTPUT_STATE_AWAITING_COMMAND;
1283           GST_SPLITMUX_BROADCAST_OUTPUT (splitmux);
1284           continue;
1285
1286         case SPLITMUX_OUTPUT_STATE_ENDING_FILE:
1287         case SPLITMUX_OUTPUT_STATE_ENDING_STREAM:
1288           /* We've reached the max out running_time to get here, so end this file now */
1289           if (ctx->out_eos == FALSE) {
1290             if (splitmux->async_finalize) {
1291               /* We must set EOS asynchronously at this point. We cannot defer
1292                * it, because we need all contexts to wake up, for the
1293                * reference context to eventually give us something at
1294                * START_NEXT_FILE. Otherwise, collectpads might choose another
1295                * context to give us the first buffer, and format-location-full
1296                * will not contain a valid sample. */
1297               g_object_set_qdata ((GObject *) splitmux->sink, EOS_FROM_US,
1298                   GINT_TO_POINTER (1));
1299               eos_context_async (ctx, splitmux);
1300               if (all_contexts_are_async_eos (splitmux)) {
1301                 GST_INFO_OBJECT (splitmux,
1302                     "All contexts are async_eos. Moving to the next file.");
1303                 /* We can start the next file once we've asked each pad to go EOS */
1304                 splitmux->output_state = SPLITMUX_OUTPUT_STATE_START_NEXT_FILE;
1305                 GST_SPLITMUX_BROADCAST_OUTPUT (splitmux);
1306                 continue;
1307               }
1308             } else {
1309               send_eos (splitmux, ctx);
1310               continue;
1311             }
1312           } else {
1313             GST_INFO_OBJECT (splitmux,
1314                 "At end-of-file state, but context %p is already EOS", ctx);
1315           }
1316           break;
1317         case SPLITMUX_OUTPUT_STATE_START_NEXT_FILE:
1318           if (ctx->is_reference) {
1319             GstFlowReturn ret = GST_FLOW_OK;
1320
1321             /* Special handling on the reference ctx to start new fragments
1322              * and collect commands from the command queue */
1323             /* drops the splitmux lock briefly: */
1324             /* We must have reference ctx in order for format-location-full to
1325              * have a sample */
1326             ret = start_next_fragment (splitmux, ctx);
1327             if (ret != GST_FLOW_OK)
1328               return ret;
1329
1330             continue;
1331           }
1332           break;
1333         case SPLITMUX_OUTPUT_STATE_AWAITING_COMMAND:{
1334           do {
1335             SplitMuxOutputCommand *cmd =
1336                 g_queue_pop_tail (&splitmux->out_cmd_q);
1337             if (cmd != NULL) {
1338               /* If we pop the last command, we need to make our queues bigger */
1339               if (g_queue_get_length (&splitmux->out_cmd_q) == 0)
1340                 grow_blocked_queues (splitmux);
1341
1342               if (cmd->start_new_fragment) {
1343                 if (splitmux->muxed_out_bytes > 0) {
1344                   GST_DEBUG_OBJECT (splitmux, "Got cmd to start new fragment");
1345                   splitmux->output_state = SPLITMUX_OUTPUT_STATE_ENDING_FILE;
1346                 } else {
1347                   GST_DEBUG_OBJECT (splitmux,
1348                       "Got cmd to start new fragment, but fragment is empty - ignoring.");
1349                 }
1350               } else {
1351                 GST_DEBUG_OBJECT (splitmux,
1352                     "Got new output cmd for time %" GST_STIME_FORMAT,
1353                     GST_STIME_ARGS (cmd->max_output_ts));
1354
1355                 /* Extend the output range immediately */
1356                 if (splitmux->max_out_running_time == GST_CLOCK_STIME_NONE
1357                     || cmd->max_output_ts > splitmux->max_out_running_time)
1358                   splitmux->max_out_running_time = cmd->max_output_ts;
1359                 GST_DEBUG_OBJECT (splitmux,
1360                     "Max out running time now %" GST_STIME_FORMAT,
1361                     GST_STIME_ARGS (splitmux->max_out_running_time));
1362                 splitmux->output_state = SPLITMUX_OUTPUT_STATE_OUTPUT_GOP;
1363               }
1364               GST_SPLITMUX_BROADCAST_OUTPUT (splitmux);
1365
1366               out_cmd_buf_free (cmd);
1367               break;
1368             } else {
1369               GST_SPLITMUX_WAIT_OUTPUT (splitmux);
1370             }
1371           } while (!ctx->flushing && splitmux->output_state ==
1372               SPLITMUX_OUTPUT_STATE_AWAITING_COMMAND);
1373           /* loop and re-check the state */
1374           continue;
1375         }
1376         case SPLITMUX_OUTPUT_STATE_STOPPED:
1377           return GST_FLOW_FLUSHING;
1378       }
1379     } else {
1380       GST_LOG_OBJECT (ctx->srcpad, "Not yet ready for output");
1381     }
1382
1383     GST_INFO_OBJECT (ctx->srcpad,
1384         "Sleeping for running time %"
1385         GST_STIME_FORMAT " (max %" GST_STIME_FORMAT ") or state change.",
1386         GST_STIME_ARGS (ctx->out_running_time),
1387         GST_STIME_ARGS (splitmux->max_out_running_time));
1388     GST_SPLITMUX_WAIT_OUTPUT (splitmux);
1389     GST_INFO_OBJECT (ctx->srcpad,
1390         "Woken for new max running time %" GST_STIME_FORMAT,
1391         GST_STIME_ARGS (splitmux->max_out_running_time));
1392   }
1393   while (1);
1394
1395   return GST_FLOW_OK;
1396 }
1397
1398 static GstClockTime
1399 calculate_next_max_timecode (GstSplitMuxSink * splitmux,
1400     const GstVideoTimeCode * cur_tc, GstClockTime running_time,
1401     GstVideoTimeCode ** next_tc)
1402 {
1403   GstVideoTimeCode *target_tc;
1404   GstClockTime cur_tc_time, target_tc_time, next_max_tc_time;
1405
1406   if (cur_tc == NULL || splitmux->tc_interval == NULL)
1407     return GST_CLOCK_TIME_NONE;
1408
1409   target_tc = gst_video_time_code_add_interval (cur_tc, splitmux->tc_interval);
1410   if (!target_tc) {
1411     GST_ELEMENT_ERROR (splitmux,
1412         STREAM, FAILED, (NULL), ("Couldn't calculate target timecode"));
1413     return GST_CLOCK_TIME_NONE;
1414   }
1415
1416   /* Convert to ns */
1417   target_tc_time = gst_video_time_code_nsec_since_daily_jam (target_tc);
1418   cur_tc_time = gst_video_time_code_nsec_since_daily_jam (cur_tc);
1419
1420   /* Add running_time, accounting for wraparound. */
1421   if (target_tc_time >= cur_tc_time) {
1422     next_max_tc_time = target_tc_time - cur_tc_time + running_time;
1423   } else {
1424     GstClockTime day_in_ns = 24 * 60 * 60 * GST_SECOND;
1425
1426     if ((cur_tc->config.flags & GST_VIDEO_TIME_CODE_FLAGS_DROP_FRAME) &&
1427         (cur_tc->config.fps_d == 1001)) {
1428       /* Checking fps_d is probably unneeded, but better safe than sorry
1429        * (e.g. someone accidentally set a flag) */
1430       GstVideoTimeCode *tc_for_offset;
1431
1432       /* Here, the duration of the 24:00:00;00 timecode isn't exactly one day,
1433        * but slightly less. Calculate that duration from a fake timecode. The
1434        * problem is that 24:00:00;00 isn't a valid timecode, so the workaround
1435        * is to add one frame to 23:59:59;29 */
1436       tc_for_offset =
1437           gst_video_time_code_new (cur_tc->config.fps_n, cur_tc->config.fps_d,
1438           NULL, cur_tc->config.flags, 23, 59, 59,
1439           cur_tc->config.fps_n / cur_tc->config.fps_d, 0);
1440       day_in_ns =
1441           gst_video_time_code_nsec_since_daily_jam (tc_for_offset) +
1442           gst_util_uint64_scale (GST_SECOND, cur_tc->config.fps_d,
1443           cur_tc->config.fps_n);
1444       gst_video_time_code_free (tc_for_offset);
1445     }
1446     next_max_tc_time = day_in_ns - cur_tc_time + target_tc_time + running_time;
1447   }
1448
1449 #ifndef GST_DISABLE_GST_DEBUG
1450   {
1451     gchar *next_max_tc_str, *cur_tc_str;
1452
1453     cur_tc_str = gst_video_time_code_to_string (cur_tc);
1454     next_max_tc_str = gst_video_time_code_to_string (target_tc);
1455
1456     GST_INFO_OBJECT (splitmux, "Next max timecode %s time: %" GST_TIME_FORMAT
1457         " from ref timecode %s time: %" GST_TIME_FORMAT,
1458         next_max_tc_str,
1459         GST_TIME_ARGS (next_max_tc_time),
1460         cur_tc_str, GST_TIME_ARGS (cur_tc_time));
1461
1462     g_free (next_max_tc_str);
1463     g_free (cur_tc_str);
1464   }
1465 #endif
1466
1467   if (next_tc)
1468     *next_tc = target_tc;
1469   else
1470     gst_video_time_code_free (target_tc);
1471
1472   return next_max_tc_time;
1473 }
1474
1475 static gboolean
1476 request_next_keyframe (GstSplitMuxSink * splitmux, GstBuffer * buffer,
1477     GstClockTimeDiff running_time_dts)
1478 {
1479   GstEvent *ev;
1480   GstClockTime target_time;
1481   gboolean timecode_based = FALSE;
1482   GstClockTime max_tc_time = GST_CLOCK_TIME_NONE;
1483   GstClockTime next_max_tc_time = GST_CLOCK_TIME_NONE;
1484   GstClockTime next_fku_time = GST_CLOCK_TIME_NONE;
1485   GstClockTime tc_rounding_error = 5 * GST_USECOND;
1486   InputGop *newest_gop = NULL;
1487   GList *l;
1488
1489   if (!splitmux->send_keyframe_requests)
1490     return TRUE;
1491
1492   /* Find the newest GOP where we passed in DTS the start PTS */
1493   for (l = splitmux->pending_input_gops.tail; l; l = l->prev) {
1494     InputGop *tmp = l->data;
1495
1496     GST_TRACE_OBJECT (splitmux,
1497         "Having pending input GOP with start PTS %" GST_STIME_FORMAT
1498         " and start time %" GST_STIME_FORMAT,
1499         GST_STIME_ARGS (tmp->start_time_pts), GST_STIME_ARGS (tmp->start_time));
1500
1501     if (tmp->sent_fku) {
1502       GST_DEBUG_OBJECT (splitmux,
1503           "Already checked for a keyframe request for this GOP");
1504       return TRUE;
1505     }
1506
1507     if (running_time_dts == GST_CLOCK_STIME_NONE ||
1508         tmp->start_time_pts == GST_CLOCK_STIME_NONE ||
1509         running_time_dts >= tmp->start_time_pts) {
1510       GST_DEBUG_OBJECT (splitmux,
1511           "Using GOP with start PTS %" GST_STIME_FORMAT " and start time %"
1512           GST_STIME_FORMAT, GST_STIME_ARGS (tmp->start_time_pts),
1513           GST_STIME_ARGS (tmp->start_time));
1514       newest_gop = tmp;
1515       break;
1516     }
1517   }
1518
1519   if (!newest_gop) {
1520     GST_DEBUG_OBJECT (splitmux, "Have no complete enough pending input GOP");
1521     return TRUE;
1522   }
1523
1524   if (splitmux->tc_interval) {
1525     if (newest_gop->start_tc
1526         && gst_video_time_code_is_valid (newest_gop->start_tc)) {
1527       GstVideoTimeCode *next_tc = NULL;
1528       max_tc_time =
1529           calculate_next_max_timecode (splitmux, newest_gop->start_tc,
1530           newest_gop->start_time, &next_tc);
1531
1532       /* calculate the next expected keyframe time to prevent too early fku
1533        * event */
1534       if (GST_CLOCK_TIME_IS_VALID (max_tc_time) && next_tc) {
1535         next_max_tc_time =
1536             calculate_next_max_timecode (splitmux, next_tc, max_tc_time, NULL);
1537       }
1538       if (next_tc)
1539         gst_video_time_code_free (next_tc);
1540
1541       timecode_based = GST_CLOCK_TIME_IS_VALID (max_tc_time) &&
1542           GST_CLOCK_TIME_IS_VALID (next_max_tc_time);
1543     } else {
1544       /* This can happen in the presence of GAP events that trigger
1545        * a new fragment start */
1546       GST_WARNING_OBJECT (splitmux,
1547           "No buffer available to calculate next timecode");
1548     }
1549   }
1550
1551   if ((splitmux->threshold_time == 0 && !timecode_based)
1552       || splitmux->threshold_bytes != 0)
1553     return TRUE;
1554
1555   if (timecode_based) {
1556     /* We might have rounding errors: aim slightly earlier */
1557     if (max_tc_time >= tc_rounding_error) {
1558       target_time = max_tc_time - tc_rounding_error;
1559     } else {
1560       /* unreliable target time */
1561       GST_DEBUG_OBJECT (splitmux, "tc time %" GST_TIME_FORMAT
1562           " is smaller than allowed rounding error, set it to zero",
1563           GST_TIME_ARGS (max_tc_time));
1564       target_time = 0;
1565     }
1566
1567     if (next_max_tc_time >= tc_rounding_error) {
1568       next_fku_time = next_max_tc_time - tc_rounding_error;
1569     } else {
1570       /* unreliable target time */
1571       GST_DEBUG_OBJECT (splitmux, "next tc time %" GST_TIME_FORMAT
1572           " is smaller than allowed rounding error, set it to zero",
1573           GST_TIME_ARGS (next_max_tc_time));
1574       next_fku_time = 0;
1575     }
1576   } else {
1577     target_time = newest_gop->start_time + splitmux->threshold_time;
1578   }
1579
1580   if (GST_CLOCK_TIME_IS_VALID (splitmux->next_fku_time)) {
1581     GstClockTime allowed_time = splitmux->next_fku_time;
1582
1583     if (timecode_based) {
1584       if (allowed_time >= tc_rounding_error) {
1585         allowed_time -= tc_rounding_error;
1586       } else {
1587         /* unreliable next force key unit time */
1588         GST_DEBUG_OBJECT (splitmux, "expected next force key unit time %"
1589             GST_TIME_FORMAT
1590             " is smaller than allowed rounding error, set it to zero",
1591             GST_TIME_ARGS (splitmux->next_fku_time));
1592         allowed_time = 0;
1593       }
1594     }
1595
1596     if (target_time < allowed_time) {
1597       GST_LOG_OBJECT (splitmux, "Target time %" GST_TIME_FORMAT
1598           " is smaller than expected next keyframe time %" GST_TIME_FORMAT
1599           ", rounding error compensated next keyframe time %" GST_TIME_FORMAT,
1600           GST_TIME_ARGS (target_time),
1601           GST_TIME_ARGS (splitmux->next_fku_time),
1602           GST_TIME_ARGS (allowed_time));
1603
1604       return TRUE;
1605     } else if (allowed_time != splitmux->next_fku_time &&
1606         target_time < splitmux->next_fku_time) {
1607       GST_DEBUG_OBJECT (splitmux, "Target time %" GST_TIME_FORMAT
1608           " is smaller than expected next keyframe time %" GST_TIME_FORMAT
1609           ", but the difference is smaller than allowed rounding error",
1610           GST_TIME_ARGS (target_time), GST_TIME_ARGS (splitmux->next_fku_time));
1611     }
1612   }
1613
1614   if (!timecode_based) {
1615     next_fku_time = target_time + splitmux->threshold_time;
1616   }
1617
1618   GST_INFO_OBJECT (splitmux, "Requesting keyframe at %" GST_TIME_FORMAT
1619       ", the next expected keyframe request time is %" GST_TIME_FORMAT,
1620       GST_TIME_ARGS (target_time), GST_TIME_ARGS (next_fku_time));
1621
1622   newest_gop->sent_fku = TRUE;
1623
1624   splitmux->next_fku_time = next_fku_time;
1625   ev = gst_video_event_new_upstream_force_key_unit (target_time, TRUE, 0);
1626
1627   return gst_pad_push_event (splitmux->reference_ctx->sinkpad, ev);
1628 }
1629
1630 static GstPadProbeReturn
1631 handle_mq_output (GstPad * pad, GstPadProbeInfo * info, MqStreamCtx * ctx)
1632 {
1633   GstSplitMuxSink *splitmux = ctx->splitmux;
1634   MqStreamBuf *buf_info = NULL;
1635   GstFlowReturn ret = GST_FLOW_OK;
1636
1637   GST_LOG_OBJECT (pad, "Fired probe type 0x%x", info->type);
1638
1639   /* FIXME: Handle buffer lists, until then make it clear they won't work */
1640   if (info->type & GST_PAD_PROBE_TYPE_BUFFER_LIST) {
1641     g_warning ("Buffer list handling not implemented");
1642     return GST_PAD_PROBE_DROP;
1643   }
1644   if (info->type & GST_PAD_PROBE_TYPE_EVENT_DOWNSTREAM ||
1645       info->type & GST_PAD_PROBE_TYPE_EVENT_FLUSH) {
1646     GstEvent *event = gst_pad_probe_info_get_event (info);
1647     gboolean locked = FALSE, wait = !ctx->is_reference;
1648
1649     GST_LOG_OBJECT (pad, "Event %" GST_PTR_FORMAT, event);
1650
1651     switch (GST_EVENT_TYPE (event)) {
1652       case GST_EVENT_SEGMENT:
1653         gst_event_copy_segment (event, &ctx->out_segment);
1654         break;
1655       case GST_EVENT_FLUSH_STOP:
1656         GST_SPLITMUX_LOCK (splitmux);
1657         locked = TRUE;
1658         gst_segment_init (&ctx->out_segment, GST_FORMAT_UNDEFINED);
1659         g_queue_foreach (&ctx->queued_bufs, (GFunc) mq_stream_buf_free, NULL);
1660         g_queue_clear (&ctx->queued_bufs);
1661         g_queue_clear (&ctx->queued_bufs);
1662         /* If this is the reference context, we just threw away any queued keyframes */
1663         if (ctx->is_reference)
1664           splitmux->queued_keyframes = 0;
1665         ctx->flushing = FALSE;
1666         wait = FALSE;
1667         break;
1668       case GST_EVENT_FLUSH_START:
1669         GST_SPLITMUX_LOCK (splitmux);
1670         locked = TRUE;
1671         GST_LOG_OBJECT (pad, "Flush start");
1672         ctx->flushing = TRUE;
1673         GST_SPLITMUX_BROADCAST_INPUT (splitmux);
1674         GST_SPLITMUX_BROADCAST_OUTPUT (splitmux);
1675         break;
1676       case GST_EVENT_EOS:
1677         GST_SPLITMUX_LOCK (splitmux);
1678         locked = TRUE;
1679         if (splitmux->output_state == SPLITMUX_OUTPUT_STATE_STOPPED)
1680           goto beach;
1681         ctx->out_eos = TRUE;
1682
1683         if (ctx == splitmux->reference_ctx) {
1684           splitmux->output_state = SPLITMUX_OUTPUT_STATE_ENDING_STREAM;
1685           GST_SPLITMUX_BROADCAST_OUTPUT (splitmux);
1686         }
1687
1688         GST_INFO_OBJECT (splitmux,
1689             "Have EOS event at pad %" GST_PTR_FORMAT " ctx %p", pad, ctx);
1690         break;
1691       case GST_EVENT_GAP:{
1692         GstClockTime gap_ts;
1693         GstClockTimeDiff rtime;
1694
1695         gst_event_parse_gap (event, &gap_ts, NULL);
1696         if (gap_ts == GST_CLOCK_TIME_NONE)
1697           break;
1698
1699         GST_SPLITMUX_LOCK (splitmux);
1700         locked = TRUE;
1701
1702         if (splitmux->output_state == SPLITMUX_OUTPUT_STATE_STOPPED)
1703           goto beach;
1704
1705         /* When we get a gap event on the
1706          * reference stream and we're trying to open a
1707          * new file, we need to store it until we get
1708          * the buffer afterwards
1709          */
1710         if (ctx->is_reference &&
1711             (splitmux->output_state != SPLITMUX_OUTPUT_STATE_OUTPUT_GOP)) {
1712           GST_DEBUG_OBJECT (pad, "Storing GAP event until buffer arrives");
1713           gst_event_replace (&ctx->pending_gap, event);
1714           GST_SPLITMUX_UNLOCK (splitmux);
1715           return GST_PAD_PROBE_HANDLED;
1716         }
1717
1718         rtime = my_segment_to_running_time (&ctx->out_segment, gap_ts);
1719
1720         GST_LOG_OBJECT (pad, "Have GAP w/ ts %" GST_STIME_FORMAT,
1721             GST_STIME_ARGS (rtime));
1722
1723         if (rtime != GST_CLOCK_STIME_NONE) {
1724           ctx->out_running_time = rtime;
1725           complete_or_wait_on_out (splitmux, ctx);
1726         }
1727         break;
1728       }
1729       case GST_EVENT_CUSTOM_DOWNSTREAM:{
1730         const GstStructure *s;
1731         GstClockTimeDiff ts = 0;
1732
1733         s = gst_event_get_structure (event);
1734         if (!gst_structure_has_name (s, "splitmuxsink-unblock"))
1735           break;
1736
1737         gst_structure_get_int64 (s, "timestamp", &ts);
1738
1739         GST_SPLITMUX_LOCK (splitmux);
1740         locked = TRUE;
1741
1742         if (splitmux->output_state == SPLITMUX_OUTPUT_STATE_STOPPED)
1743           goto beach;
1744         ctx->out_running_time = ts;
1745         if (!ctx->is_reference)
1746           ret = complete_or_wait_on_out (splitmux, ctx);
1747         GST_SPLITMUX_UNLOCK (splitmux);
1748         GST_PAD_PROBE_INFO_FLOW_RETURN (info) = ret;
1749         return GST_PAD_PROBE_DROP;
1750       }
1751       case GST_EVENT_CAPS:{
1752         GstPad *peer;
1753
1754         if (!ctx->is_reference)
1755           break;
1756
1757         peer = gst_pad_get_peer (pad);
1758         if (peer) {
1759           gboolean ok = gst_pad_send_event (peer, gst_event_ref (event));
1760
1761           gst_object_unref (peer);
1762
1763           if (ok)
1764             break;
1765
1766         } else {
1767           break;
1768         }
1769         /* This is in the case the muxer doesn't allow this change of caps */
1770         GST_SPLITMUX_LOCK (splitmux);
1771         locked = TRUE;
1772         ctx->caps_change = TRUE;
1773
1774         if (splitmux->output_state != SPLITMUX_OUTPUT_STATE_START_NEXT_FILE) {
1775           GST_DEBUG_OBJECT (splitmux,
1776               "New caps were not accepted. Switching output file");
1777           if (ctx->out_eos == FALSE) {
1778             splitmux->output_state = SPLITMUX_OUTPUT_STATE_ENDING_FILE;
1779             GST_SPLITMUX_BROADCAST_OUTPUT (splitmux);
1780           }
1781         }
1782
1783         /* Lets it fall through, if it fails again, then the muxer just can't
1784          * support this format, but at least we have a closed file.
1785          */
1786         break;
1787       }
1788       default:
1789         break;
1790     }
1791
1792     /* We need to make sure events aren't passed
1793      * until the muxer / sink are ready for it */
1794     if (!locked)
1795       GST_SPLITMUX_LOCK (splitmux);
1796     if (wait)
1797       ret = complete_or_wait_on_out (splitmux, ctx);
1798     GST_SPLITMUX_UNLOCK (splitmux);
1799
1800     /* Don't try to forward sticky events before the next buffer is there
1801      * because it would cause a new file to be created without the first
1802      * buffer being available.
1803      */
1804     GST_PAD_PROBE_INFO_FLOW_RETURN (info) = ret;
1805     if (ctx->caps_change && GST_EVENT_IS_STICKY (event)) {
1806       gst_event_unref (event);
1807       return GST_PAD_PROBE_HANDLED;
1808     } else {
1809       return GST_PAD_PROBE_PASS;
1810     }
1811   }
1812
1813   /* Allow everything through until the configured next stopping point */
1814   GST_SPLITMUX_LOCK (splitmux);
1815
1816   buf_info = g_queue_pop_tail (&ctx->queued_bufs);
1817   if (buf_info == NULL) {
1818     /* Can only happen due to a poorly timed flush */
1819     ret = GST_FLOW_FLUSHING;
1820     goto beach;
1821   }
1822
1823   /* If we have popped a keyframe, decrement the queued_gop count */
1824   if (buf_info->keyframe && splitmux->queued_keyframes > 0 && ctx->is_reference)
1825     splitmux->queued_keyframes--;
1826
1827   ctx->out_running_time = buf_info->run_ts;
1828   ctx->cur_out_buffer = gst_pad_probe_info_get_buffer (info);
1829
1830   GST_LOG_OBJECT (splitmux,
1831       "Pad %" GST_PTR_FORMAT " buffer with run TS %" GST_STIME_FORMAT
1832       " size %" G_GUINT64_FORMAT,
1833       pad, GST_STIME_ARGS (ctx->out_running_time), buf_info->buf_size);
1834
1835   ctx->caps_change = FALSE;
1836
1837   ret = complete_or_wait_on_out (splitmux, ctx);
1838
1839   splitmux->muxed_out_bytes += buf_info->buf_size;
1840
1841 #ifndef GST_DISABLE_GST_DEBUG
1842   {
1843     GstBuffer *buf = gst_pad_probe_info_get_buffer (info);
1844     GST_LOG_OBJECT (pad, "Returning to pass buffer %" GST_PTR_FORMAT
1845         " run ts %" GST_STIME_FORMAT, buf,
1846         GST_STIME_ARGS (ctx->out_running_time));
1847   }
1848 #endif
1849
1850   ctx->cur_out_buffer = NULL;
1851   GST_SPLITMUX_UNLOCK (splitmux);
1852
1853   /* pending_gap is protected by the STREAM lock */
1854   if (ctx->pending_gap) {
1855     /* If we previously stored a gap event, send it now */
1856     GstPad *peer = gst_pad_get_peer (ctx->srcpad);
1857
1858     GST_DEBUG_OBJECT (splitmux,
1859         "Pad %" GST_PTR_FORMAT " sending pending GAP event", ctx->srcpad);
1860
1861     gst_pad_send_event (peer, ctx->pending_gap);
1862     ctx->pending_gap = NULL;
1863
1864     gst_object_unref (peer);
1865   }
1866
1867   mq_stream_buf_free (buf_info);
1868
1869   GST_PAD_PROBE_INFO_FLOW_RETURN (info) = ret;
1870   return GST_PAD_PROBE_PASS;
1871
1872 beach:
1873   GST_SPLITMUX_UNLOCK (splitmux);
1874   GST_PAD_PROBE_INFO_FLOW_RETURN (info) = ret;
1875   return GST_PAD_PROBE_DROP;
1876 }
1877
1878 static gboolean
1879 resend_sticky (GstPad * pad, GstEvent ** event, GstPad * peer)
1880 {
1881   return gst_pad_send_event (peer, gst_event_ref (*event));
1882 }
1883
1884 static void
1885 unlock_context (MqStreamCtx * ctx, GstSplitMuxSink * splitmux)
1886 {
1887   if (ctx->fragment_block_id > 0) {
1888     gst_pad_remove_probe (ctx->srcpad, ctx->fragment_block_id);
1889     ctx->fragment_block_id = 0;
1890   }
1891 }
1892
1893 static void
1894 restart_context (MqStreamCtx * ctx, GstSplitMuxSink * splitmux)
1895 {
1896   GstPad *peer = gst_pad_get_peer (ctx->srcpad);
1897
1898   gst_pad_sticky_events_foreach (ctx->srcpad,
1899       (GstPadStickyEventsForeachFunction) (resend_sticky), peer);
1900
1901   /* Clear EOS flag if not actually EOS */
1902   ctx->out_eos = GST_PAD_IS_EOS (ctx->srcpad);
1903   ctx->out_eos_async_done = ctx->out_eos;
1904
1905   gst_object_unref (peer);
1906 }
1907
1908 static void
1909 relink_context (MqStreamCtx * ctx, GstSplitMuxSink * splitmux)
1910 {
1911   GstPad *sinkpad, *srcpad, *newpad;
1912   GstPadTemplate *templ;
1913
1914   srcpad = ctx->srcpad;
1915   sinkpad = gst_pad_get_peer (srcpad);
1916
1917   templ = sinkpad->padtemplate;
1918   newpad =
1919       gst_element_request_pad (splitmux->muxer, templ,
1920       GST_PAD_NAME (sinkpad), NULL);
1921
1922   GST_DEBUG_OBJECT (splitmux, "Relinking ctx %p to pad %" GST_PTR_FORMAT, ctx,
1923       newpad);
1924   if (!gst_pad_unlink (srcpad, sinkpad)) {
1925     gst_object_unref (sinkpad);
1926     goto fail;
1927   }
1928   if (gst_pad_link_full (srcpad, newpad,
1929           GST_PAD_LINK_CHECK_NO_RECONFIGURE) != GST_PAD_LINK_OK) {
1930     gst_element_release_request_pad (splitmux->muxer, newpad);
1931     gst_object_unref (sinkpad);
1932     gst_object_unref (newpad);
1933     goto fail;
1934   }
1935   gst_object_unref (newpad);
1936   gst_object_unref (sinkpad);
1937   return;
1938
1939 fail:
1940   GST_ELEMENT_ERROR (splitmux, RESOURCE, SETTINGS,
1941       ("Could not create the new muxer/sink"), NULL);
1942 }
1943
1944 static GstPadProbeReturn
1945 _block_pad (GstPad * pad, GstPadProbeInfo * info, gpointer user_data)
1946 {
1947   return GST_PAD_PROBE_OK;
1948 }
1949
1950 static void
1951 block_context (MqStreamCtx * ctx, GstSplitMuxSink * splitmux)
1952 {
1953   ctx->fragment_block_id =
1954       gst_pad_add_probe (ctx->srcpad, GST_PAD_PROBE_TYPE_BLOCK, _block_pad,
1955       NULL, NULL);
1956 }
1957
1958 static gboolean
1959 _set_property_from_structure (GQuark field_id, const GValue * value,
1960     gpointer user_data)
1961 {
1962   const gchar *property_name = g_quark_to_string (field_id);
1963   GObject *element = G_OBJECT (user_data);
1964
1965   g_object_set_property (element, property_name, value);
1966
1967   return TRUE;
1968 }
1969
1970 static void
1971 _lock_and_set_to_null (GstElement * element, GstSplitMuxSink * splitmux)
1972 {
1973   gst_element_set_locked_state (element, TRUE);
1974   gst_element_set_state (element, GST_STATE_NULL);
1975   GST_LOG_OBJECT (splitmux, "Removing old element %" GST_PTR_FORMAT, element);
1976   gst_bin_remove (GST_BIN (splitmux), element);
1977 }
1978
1979
1980 static void
1981 _send_event (const GValue * value, gpointer user_data)
1982 {
1983   GstPad *pad = g_value_get_object (value);
1984   GstEvent *ev = user_data;
1985
1986   gst_pad_send_event (pad, gst_event_ref (ev));
1987 }
1988
1989 /* Called with lock held when a fragment
1990  * reaches EOS and it is time to restart
1991  * a new fragment
1992  */
1993 static GstFlowReturn
1994 start_next_fragment (GstSplitMuxSink * splitmux, MqStreamCtx * ctx)
1995 {
1996   GstElement *muxer, *sink;
1997
1998   g_assert (ctx->is_reference);
1999
2000   /* 1 change to new file */
2001   splitmux->switching_fragment = TRUE;
2002
2003   /* We need to drop the splitmux lock to acquire the state lock
2004    * here and ensure there's no racy state change going on elsewhere */
2005   muxer = gst_object_ref (splitmux->muxer);
2006   sink = gst_object_ref (splitmux->active_sink);
2007
2008   GST_SPLITMUX_UNLOCK (splitmux);
2009   GST_SPLITMUX_STATE_LOCK (splitmux);
2010
2011   if (splitmux->shutdown) {
2012     GST_DEBUG_OBJECT (splitmux,
2013         "Shutdown requested. Aborting fragment switch.");
2014     GST_SPLITMUX_LOCK (splitmux);
2015     GST_SPLITMUX_STATE_UNLOCK (splitmux);
2016     gst_object_unref (muxer);
2017     gst_object_unref (sink);
2018     return GST_FLOW_FLUSHING;
2019   }
2020
2021   if (splitmux->async_finalize) {
2022     if (splitmux->muxed_out_bytes > 0
2023         || splitmux->fragment_id != splitmux->start_index) {
2024       gchar *newname;
2025       GstElement *new_sink, *new_muxer;
2026
2027       GST_DEBUG_OBJECT (splitmux, "Starting fragment %u",
2028           splitmux->fragment_id);
2029       g_list_foreach (splitmux->contexts, (GFunc) block_context, splitmux);
2030       newname = g_strdup_printf ("sink_%u", splitmux->fragment_id);
2031       GST_SPLITMUX_LOCK (splitmux);
2032       if ((splitmux->sink =
2033               create_element (splitmux, splitmux->sink_factory, newname,
2034                   TRUE)) == NULL)
2035         goto fail;
2036       if (splitmux->sink_preset && GST_IS_PRESET (splitmux->sink))
2037         gst_preset_load_preset (GST_PRESET (splitmux->sink),
2038             splitmux->sink_preset);
2039       if (splitmux->sink_properties)
2040         gst_structure_foreach (splitmux->sink_properties,
2041             _set_property_from_structure, splitmux->sink);
2042       splitmux->active_sink = splitmux->sink;
2043       g_signal_emit (splitmux, signals[SIGNAL_SINK_ADDED], 0, splitmux->sink);
2044       g_free (newname);
2045       newname = g_strdup_printf ("muxer_%u", splitmux->fragment_id);
2046       if ((splitmux->muxer =
2047               create_element (splitmux, splitmux->muxer_factory, newname,
2048                   TRUE)) == NULL)
2049         goto fail;
2050       if (g_object_class_find_property (G_OBJECT_GET_CLASS (splitmux->sink),
2051               "async") != NULL) {
2052         /* async child elements are causing state change races and weird
2053          * failures, so let's try and turn that off */
2054         g_object_set (splitmux->sink, "async", FALSE, NULL);
2055       }
2056       if (splitmux->muxer_preset && GST_IS_PRESET (splitmux->muxer))
2057         gst_preset_load_preset (GST_PRESET (splitmux->muxer),
2058             splitmux->muxer_preset);
2059       if (splitmux->muxer_properties)
2060         gst_structure_foreach (splitmux->muxer_properties,
2061             _set_property_from_structure, splitmux->muxer);
2062       g_signal_emit (splitmux, signals[SIGNAL_MUXER_ADDED], 0, splitmux->muxer);
2063       g_free (newname);
2064       new_sink = splitmux->sink;
2065       new_muxer = splitmux->muxer;
2066       GST_SPLITMUX_UNLOCK (splitmux);
2067       g_list_foreach (splitmux->contexts, (GFunc) relink_context, splitmux);
2068       gst_element_link (new_muxer, new_sink);
2069
2070       if (g_object_get_qdata ((GObject *) sink, EOS_FROM_US)) {
2071         if (GPOINTER_TO_INT (g_object_get_qdata ((GObject *) sink,
2072                     EOS_FROM_US)) == 2) {
2073           _lock_and_set_to_null (muxer, splitmux);
2074           _lock_and_set_to_null (sink, splitmux);
2075         } else {
2076           g_object_set_qdata ((GObject *) sink, EOS_FROM_US,
2077               GINT_TO_POINTER (2));
2078         }
2079       }
2080       gst_object_unref (muxer);
2081       gst_object_unref (sink);
2082       muxer = new_muxer;
2083       sink = new_sink;
2084       gst_object_ref (muxer);
2085       gst_object_ref (sink);
2086     }
2087   } else {
2088
2089     gst_element_set_locked_state (muxer, TRUE);
2090     gst_element_set_locked_state (sink, TRUE);
2091     gst_element_set_state (sink, GST_STATE_NULL);
2092
2093     if (splitmux->reset_muxer) {
2094       gst_element_set_state (muxer, GST_STATE_NULL);
2095     } else {
2096       GstIterator *it = gst_element_iterate_sink_pads (muxer);
2097       GstEvent *ev;
2098       guint32 seqnum;
2099
2100       ev = gst_event_new_flush_start ();
2101       seqnum = gst_event_get_seqnum (ev);
2102       while (gst_iterator_foreach (it, _send_event, ev) == GST_ITERATOR_RESYNC);
2103       gst_event_unref (ev);
2104
2105       gst_iterator_resync (it);
2106
2107       ev = gst_event_new_flush_stop (TRUE);
2108       gst_event_set_seqnum (ev, seqnum);
2109       while (gst_iterator_foreach (it, _send_event, ev) == GST_ITERATOR_RESYNC);
2110       gst_event_unref (ev);
2111
2112       gst_iterator_free (it);
2113     }
2114   }
2115
2116   GST_SPLITMUX_LOCK (splitmux);
2117   set_next_filename (splitmux, ctx);
2118   splitmux->muxed_out_bytes = 0;
2119   GST_SPLITMUX_UNLOCK (splitmux);
2120
2121   if (gst_element_set_state (sink,
2122           GST_STATE_TARGET (splitmux)) == GST_STATE_CHANGE_FAILURE) {
2123     gst_element_set_state (sink, GST_STATE_NULL);
2124     gst_element_set_locked_state (muxer, FALSE);
2125     gst_element_set_locked_state (sink, FALSE);
2126
2127     goto fail_output;
2128   }
2129
2130   if (gst_element_set_state (muxer,
2131           GST_STATE_TARGET (splitmux)) == GST_STATE_CHANGE_FAILURE) {
2132     gst_element_set_state (muxer, GST_STATE_NULL);
2133     gst_element_set_state (sink, GST_STATE_NULL);
2134     gst_element_set_locked_state (muxer, FALSE);
2135     gst_element_set_locked_state (sink, FALSE);
2136     goto fail_muxer;
2137   }
2138
2139   gst_element_set_locked_state (muxer, FALSE);
2140   gst_element_set_locked_state (sink, FALSE);
2141
2142   gst_object_unref (sink);
2143   gst_object_unref (muxer);
2144
2145   GST_SPLITMUX_LOCK (splitmux);
2146   GST_SPLITMUX_STATE_UNLOCK (splitmux);
2147   splitmux->switching_fragment = FALSE;
2148   do_async_done (splitmux);
2149
2150   splitmux->ready_for_output = TRUE;
2151
2152   g_list_foreach (splitmux->contexts, (GFunc) unlock_context, splitmux);
2153   g_list_foreach (splitmux->contexts, (GFunc) restart_context, splitmux);
2154
2155   send_fragment_opened_closed_msg (splitmux, TRUE, sink);
2156
2157   /* FIXME: Is this always the correct next state? */
2158   GST_LOG_OBJECT (splitmux, "Resetting state to AWAITING_COMMAND");
2159   splitmux->output_state = SPLITMUX_OUTPUT_STATE_AWAITING_COMMAND;
2160   GST_SPLITMUX_BROADCAST_OUTPUT (splitmux);
2161   return GST_FLOW_OK;
2162
2163 fail:
2164   gst_object_unref (sink);
2165   gst_object_unref (muxer);
2166
2167   GST_SPLITMUX_LOCK (splitmux);
2168   GST_SPLITMUX_STATE_UNLOCK (splitmux);
2169   GST_ELEMENT_ERROR (splitmux, RESOURCE, SETTINGS,
2170       ("Could not create the new muxer/sink"), NULL);
2171   return GST_FLOW_ERROR;
2172
2173 fail_output:
2174   GST_ELEMENT_ERROR (splitmux, RESOURCE, SETTINGS,
2175       ("Could not start new output sink"), NULL);
2176   gst_object_unref (sink);
2177   gst_object_unref (muxer);
2178
2179   GST_SPLITMUX_LOCK (splitmux);
2180   GST_SPLITMUX_STATE_UNLOCK (splitmux);
2181   splitmux->switching_fragment = FALSE;
2182   return GST_FLOW_ERROR;
2183
2184 fail_muxer:
2185   GST_ELEMENT_ERROR (splitmux, RESOURCE, SETTINGS,
2186       ("Could not start new muxer"), NULL);
2187   gst_object_unref (sink);
2188   gst_object_unref (muxer);
2189
2190   GST_SPLITMUX_LOCK (splitmux);
2191   GST_SPLITMUX_STATE_UNLOCK (splitmux);
2192   splitmux->switching_fragment = FALSE;
2193   return GST_FLOW_ERROR;
2194 }
2195
2196 static void
2197 bus_handler (GstBin * bin, GstMessage * message)
2198 {
2199   GstSplitMuxSink *splitmux = GST_SPLITMUX_SINK (bin);
2200
2201   switch (GST_MESSAGE_TYPE (message)) {
2202     case GST_MESSAGE_EOS:{
2203       /* If the state is draining out the current file, drop this EOS */
2204       GstElement *sink;
2205
2206       sink = GST_ELEMENT (GST_MESSAGE_SRC (message));
2207       GST_SPLITMUX_LOCK (splitmux);
2208
2209       send_fragment_opened_closed_msg (splitmux, FALSE, sink);
2210
2211       if (splitmux->async_finalize) {
2212
2213         if (g_object_get_qdata ((GObject *) sink, EOS_FROM_US)) {
2214           if (GPOINTER_TO_INT (g_object_get_qdata ((GObject *) sink,
2215                       EOS_FROM_US)) == 2) {
2216             GstElement *muxer;
2217             GstPad *sinksink, *muxersrc;
2218
2219             sinksink = gst_element_get_static_pad (sink, "sink");
2220             muxersrc = gst_pad_get_peer (sinksink);
2221             muxer = gst_pad_get_parent_element (muxersrc);
2222             gst_object_unref (sinksink);
2223             gst_object_unref (muxersrc);
2224
2225             gst_element_call_async (muxer,
2226                 (GstElementCallAsyncFunc) _lock_and_set_to_null,
2227                 gst_object_ref (splitmux), gst_object_unref);
2228             gst_element_call_async (sink,
2229                 (GstElementCallAsyncFunc) _lock_and_set_to_null,
2230                 gst_object_ref (splitmux), gst_object_unref);
2231             gst_object_unref (muxer);
2232           } else {
2233             g_object_set_qdata ((GObject *) sink, EOS_FROM_US,
2234                 GINT_TO_POINTER (2));
2235           }
2236           GST_DEBUG_OBJECT (splitmux,
2237               "Caught async EOS from previous muxer+sink. Dropping.");
2238           /* We forward the EOS so that it gets aggregated as normal. If the sink
2239            * finishes and is removed before the end, it will be de-aggregated */
2240           gst_message_unref (message);
2241           GST_SPLITMUX_UNLOCK (splitmux);
2242           return;
2243         }
2244       } else if (splitmux->output_state == SPLITMUX_OUTPUT_STATE_ENDING_STREAM) {
2245         GST_DEBUG_OBJECT (splitmux,
2246             "Passing EOS message. Output state %d max_out_running_time %"
2247             GST_STIME_FORMAT, splitmux->output_state,
2248             GST_STIME_ARGS (splitmux->max_out_running_time));
2249       } else {
2250         GST_DEBUG_OBJECT (splitmux, "Caught EOS at end of fragment, dropping");
2251         splitmux->output_state = SPLITMUX_OUTPUT_STATE_START_NEXT_FILE;
2252         GST_SPLITMUX_BROADCAST_OUTPUT (splitmux);
2253
2254         gst_message_unref (message);
2255         GST_SPLITMUX_UNLOCK (splitmux);
2256         return;
2257       }
2258       GST_SPLITMUX_UNLOCK (splitmux);
2259       break;
2260     }
2261     case GST_MESSAGE_ASYNC_START:
2262     case GST_MESSAGE_ASYNC_DONE:
2263       /* Ignore state changes from our children while switching */
2264       GST_SPLITMUX_LOCK (splitmux);
2265       if (splitmux->switching_fragment) {
2266         if (GST_MESSAGE_SRC (message) == (GstObject *) splitmux->active_sink
2267             || GST_MESSAGE_SRC (message) == (GstObject *) splitmux->muxer) {
2268           GST_LOG_OBJECT (splitmux,
2269               "Ignoring state change from child %" GST_PTR_FORMAT
2270               " while switching", GST_MESSAGE_SRC (message));
2271           gst_message_unref (message);
2272           GST_SPLITMUX_UNLOCK (splitmux);
2273           return;
2274         }
2275       }
2276       GST_SPLITMUX_UNLOCK (splitmux);
2277       break;
2278     case GST_MESSAGE_WARNING:
2279     {
2280       GError *gerror = NULL;
2281
2282       gst_message_parse_warning (message, &gerror, NULL);
2283
2284       if (g_error_matches (gerror, GST_STREAM_ERROR, GST_STREAM_ERROR_FORMAT)) {
2285         GList *item;
2286         gboolean caps_change = FALSE;
2287
2288         GST_SPLITMUX_LOCK (splitmux);
2289
2290         for (item = splitmux->contexts; item; item = item->next) {
2291           MqStreamCtx *ctx = item->data;
2292
2293           if (ctx->caps_change) {
2294             caps_change = TRUE;
2295             break;
2296           }
2297         }
2298
2299         GST_SPLITMUX_UNLOCK (splitmux);
2300
2301         if (caps_change) {
2302           GST_LOG_OBJECT (splitmux,
2303               "Ignoring warning change from child %" GST_PTR_FORMAT
2304               " while switching caps", GST_MESSAGE_SRC (message));
2305           gst_message_unref (message);
2306           return;
2307         }
2308       }
2309       break;
2310     }
2311     default:
2312       break;
2313   }
2314
2315   GST_BIN_CLASS (parent_class)->handle_message (bin, message);
2316 }
2317
2318 static void
2319 ctx_set_unblock (MqStreamCtx * ctx)
2320 {
2321   ctx->need_unblock = TRUE;
2322 }
2323
2324 static gboolean
2325 need_new_fragment (GstSplitMuxSink * splitmux,
2326     GstClockTime queued_time, GstClockTime queued_gop_time,
2327     guint64 queued_bytes)
2328 {
2329   guint64 thresh_bytes;
2330   GstClockTime thresh_time;
2331   gboolean check_robust_muxing;
2332   GstClockTime time_to_split = GST_CLOCK_TIME_NONE;
2333   GstClockTime *ptr_to_time;
2334   const InputGop *gop, *next_gop;
2335
2336   GST_OBJECT_LOCK (splitmux);
2337   thresh_bytes = splitmux->threshold_bytes;
2338   thresh_time = splitmux->threshold_time;
2339   ptr_to_time = (GstClockTime *)
2340       gst_queue_array_peek_head_struct (splitmux->times_to_split);
2341   if (ptr_to_time)
2342     time_to_split = *ptr_to_time;
2343   check_robust_muxing = splitmux->use_robust_muxing
2344       && splitmux->muxer_has_reserved_props;
2345   GST_OBJECT_UNLOCK (splitmux);
2346
2347   /* Have we muxed at least one thing from the reference
2348    * stream into the file? If not, no other streams can have
2349    * either */
2350   if (splitmux->fragment_reference_bytes <= 0) {
2351     GST_TRACE_OBJECT (splitmux,
2352         "Not ready to split - nothing muxed on the reference stream");
2353     return FALSE;
2354   }
2355
2356   /* User told us to split now */
2357   if (g_atomic_int_get (&(splitmux->do_split_next_gop)) == TRUE) {
2358     GST_TRACE_OBJECT (splitmux, "Forcing because split_next_gop is set");
2359     return TRUE;
2360   }
2361
2362   gop = g_queue_peek_head (&splitmux->pending_input_gops);
2363   /* We need a full GOP queued up at this point */
2364   g_assert (gop != NULL);
2365   next_gop = g_queue_peek_nth (&splitmux->pending_input_gops, 1);
2366   /* And the beginning of the next GOP or otherwise EOS */
2367
2368   /* User told us to split at this running time */
2369   if (gop->start_time >= time_to_split) {
2370     GST_OBJECT_LOCK (splitmux);
2371     /* Dequeue running time */
2372     gst_queue_array_pop_head_struct (splitmux->times_to_split);
2373     /* Empty any running times after this that are past now */
2374     ptr_to_time = gst_queue_array_peek_head_struct (splitmux->times_to_split);
2375     while (ptr_to_time) {
2376       time_to_split = *ptr_to_time;
2377       if (gop->start_time < time_to_split) {
2378         break;
2379       }
2380       gst_queue_array_pop_head_struct (splitmux->times_to_split);
2381       ptr_to_time = gst_queue_array_peek_head_struct (splitmux->times_to_split);
2382     }
2383     GST_TRACE_OBJECT (splitmux,
2384         "GOP start time %" GST_STIME_FORMAT " is after requested split point %"
2385         GST_STIME_FORMAT, GST_STIME_ARGS (gop->start_time),
2386         GST_STIME_ARGS (time_to_split));
2387     GST_OBJECT_UNLOCK (splitmux);
2388     return TRUE;
2389   }
2390
2391   if (thresh_bytes > 0 && queued_bytes > thresh_bytes) {
2392     GST_TRACE_OBJECT (splitmux,
2393         "queued bytes %" G_GUINT64_FORMAT " overruns byte limit", queued_bytes);
2394     return TRUE;                /* Would overrun byte limit */
2395   }
2396
2397   if (thresh_time > 0 && queued_time > thresh_time) {
2398     GST_TRACE_OBJECT (splitmux,
2399         "queued time %" GST_STIME_FORMAT " overruns time limit",
2400         GST_STIME_ARGS (queued_time));
2401     return TRUE;                /* Would overrun time limit */
2402   }
2403
2404   if (splitmux->tc_interval) {
2405     GstClockTime next_gop_start_time =
2406         next_gop ? next_gop->start_time : splitmux->max_in_running_time;
2407
2408     if (GST_CLOCK_TIME_IS_VALID (splitmux->next_fragment_start_tc_time) &&
2409         GST_CLOCK_STIME_IS_VALID (next_gop_start_time) &&
2410         next_gop_start_time >
2411         splitmux->next_fragment_start_tc_time + 5 * GST_USECOND) {
2412       GST_TRACE_OBJECT (splitmux,
2413           "in running time %" GST_STIME_FORMAT " overruns time limit %"
2414           GST_TIME_FORMAT, GST_STIME_ARGS (next_gop_start_time),
2415           GST_TIME_ARGS (splitmux->next_fragment_start_tc_time));
2416       return TRUE;
2417     }
2418   }
2419
2420   if (check_robust_muxing) {
2421     GstClockTime mux_reserved_remain;
2422
2423     g_object_get (splitmux->muxer,
2424         "reserved-duration-remaining", &mux_reserved_remain, NULL);
2425
2426     GST_LOG_OBJECT (splitmux,
2427         "Muxer robust muxing report - %" G_GUINT64_FORMAT
2428         " remaining. New GOP would enqueue %" G_GUINT64_FORMAT,
2429         mux_reserved_remain, queued_gop_time);
2430
2431     if (queued_gop_time >= mux_reserved_remain) {
2432       GST_INFO_OBJECT (splitmux,
2433           "File is about to run out of header room - %" G_GUINT64_FORMAT
2434           " remaining. New GOP would enqueue %" G_GUINT64_FORMAT
2435           ". Switching to new file", mux_reserved_remain, queued_gop_time);
2436       return TRUE;
2437     }
2438   }
2439
2440   /* Continue and mux this GOP */
2441   return FALSE;
2442 }
2443
2444 /* probably we want to add this API? */
2445 static void
2446 video_time_code_replace (GstVideoTimeCode ** old_tc, GstVideoTimeCode * new_tc)
2447 {
2448   GstVideoTimeCode *timecode = NULL;
2449
2450   g_return_if_fail (old_tc != NULL);
2451
2452   if (*old_tc == new_tc)
2453     return;
2454
2455   if (new_tc)
2456     timecode = gst_video_time_code_copy (new_tc);
2457
2458   if (*old_tc)
2459     gst_video_time_code_free (*old_tc);
2460
2461   *old_tc = timecode;
2462 }
2463
2464 /* Called with splitmux lock held */
2465 /* Called when entering ProcessingCompleteGop state
2466  * Assess if mq contents overflowed the current file
2467  *   -> If yes, need to switch to new file
2468  *   -> if no, set max_out_running_time to let this GOP in and
2469  *      go to COLLECTING_GOP_START state
2470  */
2471 static void
2472 handle_gathered_gop (GstSplitMuxSink * splitmux, const InputGop * gop,
2473     GstClockTimeDiff next_gop_start_time, GstClockTimeDiff max_out_running_time)
2474 {
2475   guint64 queued_bytes;
2476   GstClockTimeDiff queued_time = 0;
2477   GstClockTimeDiff queued_gop_time = 0;
2478   SplitMuxOutputCommand *cmd;
2479
2480   /* Assess if the multiqueue contents overflowed the current file */
2481   /* When considering if a newly gathered GOP overflows
2482    * the time limit for the file, only consider the running time of the
2483    * reference stream. Other streams might have run ahead a little bit,
2484    * but extra pieces won't be released to the muxer beyond the reference
2485    * stream cut-off anyway - so it forms the limit. */
2486   queued_bytes = splitmux->fragment_total_bytes + gop->total_bytes;
2487   queued_time = next_gop_start_time;
2488   /* queued_gop_time tracks how much unwritten data there is waiting to
2489    * be written to this fragment including this GOP */
2490   if (splitmux->reference_ctx->out_running_time != GST_CLOCK_STIME_NONE)
2491     queued_gop_time = queued_time - splitmux->reference_ctx->out_running_time;
2492   else
2493     queued_gop_time = queued_time - gop->start_time;
2494
2495   GST_LOG_OBJECT (splitmux, " queued_bytes %" G_GUINT64_FORMAT, queued_bytes);
2496   GST_LOG_OBJECT (splitmux, "mq at TS %" GST_STIME_FORMAT
2497       " bytes %" G_GUINT64_FORMAT " in next gop start time %" GST_STIME_FORMAT
2498       " gop start time %" GST_STIME_FORMAT,
2499       GST_STIME_ARGS (queued_time), queued_bytes,
2500       GST_STIME_ARGS (next_gop_start_time), GST_STIME_ARGS (gop->start_time));
2501
2502   if (queued_gop_time < 0)
2503     goto error_gop_duration;
2504
2505   if (queued_time < splitmux->fragment_start_time)
2506     goto error_queued_time;
2507
2508   queued_time -= splitmux->fragment_start_time;
2509   if (queued_time < queued_gop_time)
2510     queued_gop_time = queued_time;
2511
2512   /* Expand queued bytes estimate by muxer overhead */
2513   queued_bytes += (queued_bytes * splitmux->mux_overhead);
2514
2515   /* Check for overrun - have we output at least one byte and overrun
2516    * either threshold? */
2517   if (need_new_fragment (splitmux, queued_time, queued_gop_time, queued_bytes)) {
2518     if (splitmux->async_finalize) {
2519       GstClockTime *sink_running_time = g_new (GstClockTime, 1);
2520       *sink_running_time = splitmux->reference_ctx->out_running_time;
2521       g_object_set_qdata_full (G_OBJECT (splitmux->sink),
2522           RUNNING_TIME, sink_running_time, g_free);
2523     }
2524     g_atomic_int_set (&(splitmux->do_split_next_gop), FALSE);
2525     /* Tell the output side to start a new fragment */
2526     GST_INFO_OBJECT (splitmux,
2527         "This GOP (dur %" GST_STIME_FORMAT
2528         ") would overflow the fragment, Sending start_new_fragment cmd",
2529         GST_STIME_ARGS (queued_gop_time));
2530     cmd = out_cmd_buf_new ();
2531     cmd->start_new_fragment = TRUE;
2532     g_queue_push_head (&splitmux->out_cmd_q, cmd);
2533     GST_SPLITMUX_BROADCAST_OUTPUT (splitmux);
2534
2535     splitmux->fragment_start_time = gop->start_time;
2536     splitmux->fragment_start_time_pts = gop->start_time_pts;
2537     splitmux->fragment_total_bytes = 0;
2538     splitmux->fragment_reference_bytes = 0;
2539
2540     video_time_code_replace (&splitmux->fragment_start_tc, gop->start_tc);
2541     splitmux->next_fragment_start_tc_time =
2542         calculate_next_max_timecode (splitmux, splitmux->fragment_start_tc,
2543         splitmux->fragment_start_time, NULL);
2544     if (!GST_CLOCK_TIME_IS_VALID (splitmux->next_fragment_start_tc_time)) {
2545       GST_WARNING_OBJECT (splitmux,
2546           "Couldn't calculate next fragment start time for timecode mode");
2547     }
2548   }
2549
2550   /* And set up to collect the next GOP */
2551   if (max_out_running_time != G_MAXINT64) {
2552     splitmux->input_state = SPLITMUX_INPUT_STATE_COLLECTING_GOP_START;
2553   } else {
2554     /* This is probably already the current state, but just in case: */
2555     splitmux->input_state = SPLITMUX_INPUT_STATE_FINISHING_UP;
2556   }
2557
2558   /* And wake all input contexts to send a wake-up event */
2559   g_list_foreach (splitmux->contexts, (GFunc) ctx_set_unblock, NULL);
2560   GST_SPLITMUX_BROADCAST_INPUT (splitmux);
2561
2562   /* Now either way - either there was no overflow, or we requested a new fragment: release this GOP */
2563   splitmux->fragment_total_bytes += gop->total_bytes;
2564   splitmux->fragment_reference_bytes += gop->reference_bytes;
2565
2566   if (gop->total_bytes > 0) {
2567     GST_LOG_OBJECT (splitmux,
2568         "Releasing GOP to output. Bytes in fragment now %" G_GUINT64_FORMAT
2569         " time %" GST_STIME_FORMAT,
2570         splitmux->fragment_total_bytes, GST_STIME_ARGS (queued_time));
2571
2572     /* Send this GOP to the output command queue */
2573     cmd = out_cmd_buf_new ();
2574     cmd->start_new_fragment = FALSE;
2575     cmd->max_output_ts = max_out_running_time;
2576     GST_LOG_OBJECT (splitmux, "Sending GOP cmd to output for TS %"
2577         GST_STIME_FORMAT, GST_STIME_ARGS (max_out_running_time));
2578     g_queue_push_head (&splitmux->out_cmd_q, cmd);
2579
2580     GST_SPLITMUX_BROADCAST_OUTPUT (splitmux);
2581   }
2582
2583   return;
2584
2585 error_gop_duration:
2586   GST_ELEMENT_ERROR (splitmux,
2587       STREAM, FAILED, ("Timestamping error on input streams"),
2588       ("Queued GOP time is negative %" GST_STIME_FORMAT,
2589           GST_STIME_ARGS (queued_gop_time)));
2590   return;
2591 error_queued_time:
2592   GST_ELEMENT_ERROR (splitmux,
2593       STREAM, FAILED, ("Timestamping error on input streams"),
2594       ("Queued time is negative. Input went backwards. queued_time - %"
2595           GST_STIME_FORMAT, GST_STIME_ARGS (queued_time)));
2596   return;
2597 }
2598
2599 /* Called with splitmux lock held */
2600 /* Called from each input pad when it is has all the pieces
2601  * for a GOP or EOS, starting with the reference pad which has set the
2602  * splitmux->max_in_running_time
2603  */
2604 static void
2605 check_completed_gop (GstSplitMuxSink * splitmux, MqStreamCtx * ctx)
2606 {
2607   GList *cur;
2608   GstEvent *event;
2609
2610   /* On ENDING_FILE, the reference stream sends a command to start a new
2611    * fragment, then releases the GOP for output in the new fragment.
2612    *  If some streams received no buffer during the last GOP that overran,
2613    * because its next buffer has a timestamp bigger than
2614    * ctx->max_in_running_time, its queue is empty. In that case the only
2615    * way to wakeup the output thread is by injecting an event in the
2616    * queue. This usually happen with subtitle streams.
2617    * See https://bugzilla.gnome.org/show_bug.cgi?id=763711. */
2618   if (ctx->need_unblock) {
2619     GST_LOG_OBJECT (ctx->sinkpad, "Sending splitmuxsink-unblock event");
2620     event = gst_event_new_custom (GST_EVENT_CUSTOM_DOWNSTREAM |
2621         GST_EVENT_TYPE_SERIALIZED,
2622         gst_structure_new ("splitmuxsink-unblock", "timestamp",
2623             G_TYPE_INT64, splitmux->max_in_running_time, NULL));
2624
2625     GST_SPLITMUX_UNLOCK (splitmux);
2626     gst_pad_send_event (ctx->sinkpad, event);
2627     GST_SPLITMUX_LOCK (splitmux);
2628
2629     ctx->need_unblock = FALSE;
2630     GST_SPLITMUX_BROADCAST_INPUT (splitmux);
2631     /* state may have changed while we were unlocked. Loop again if so */
2632     if (splitmux->input_state != SPLITMUX_INPUT_STATE_WAITING_GOP_COLLECT)
2633       return;
2634   }
2635
2636   do {
2637     GstClockTimeDiff next_gop_start = GST_CLOCK_STIME_NONE;
2638
2639     if (splitmux->input_state == SPLITMUX_INPUT_STATE_WAITING_GOP_COLLECT) {
2640       GstClockTimeDiff max_out_running_time;
2641       gboolean ready = TRUE;
2642       InputGop *gop;
2643       const InputGop *next_gop;
2644
2645       gop = g_queue_peek_head (&splitmux->pending_input_gops);
2646       next_gop = g_queue_peek_nth (&splitmux->pending_input_gops, 1);
2647
2648       /* If we have no GOP or no next GOP here then the reference context is
2649        * at EOS, otherwise use the start time of the next GOP if we're far
2650        * enough in the GOP to know it */
2651       if (gop && next_gop) {
2652         if (!splitmux->reference_ctx->in_eos
2653             && splitmux->max_in_running_time_dts != GST_CLOCK_STIME_NONE
2654             && splitmux->max_in_running_time_dts < next_gop->start_time_pts) {
2655           GST_LOG_OBJECT (splitmux,
2656               "No further GOPs finished collecting, waiting until current DTS %"
2657               GST_STIME_FORMAT " has passed next GOP start PTS %"
2658               GST_STIME_FORMAT,
2659               GST_STIME_ARGS (splitmux->max_in_running_time_dts),
2660               GST_STIME_ARGS (next_gop->start_time_pts));
2661           break;
2662         }
2663
2664         GST_LOG_OBJECT (splitmux,
2665             "Finished collecting GOP with start time %" GST_STIME_FORMAT
2666             ", next GOP start time %" GST_STIME_FORMAT,
2667             GST_STIME_ARGS (gop->start_time),
2668             GST_STIME_ARGS (next_gop->start_time));
2669         next_gop_start = next_gop->start_time;
2670         max_out_running_time =
2671             splitmux->reference_ctx->in_eos ? G_MAXINT64 : next_gop->start_time;
2672       } else if (!next_gop) {
2673         GST_LOG_OBJECT (splitmux, "Reference context is EOS");
2674         next_gop_start = splitmux->max_in_running_time;
2675         max_out_running_time = G_MAXINT64;
2676       } else if (!gop) {
2677         GST_LOG_OBJECT (splitmux, "No further GOPs finished collecting");
2678         break;
2679       } else {
2680         g_assert_not_reached ();
2681       }
2682
2683       g_assert (gop != NULL);
2684
2685       /* Iterate each pad, and check that the input running time is at least
2686        * up to the start running time of the next GOP or EOS, and if so handle
2687        * the collected GOP */
2688       GST_LOG_OBJECT (splitmux, "Checking GOP collected, next GOP start %"
2689           GST_STIME_FORMAT " ctx %p", GST_STIME_ARGS (next_gop_start), ctx);
2690       for (cur = g_list_first (splitmux->contexts); cur != NULL;
2691           cur = g_list_next (cur)) {
2692         MqStreamCtx *tmpctx = (MqStreamCtx *) (cur->data);
2693
2694         GST_LOG_OBJECT (splitmux,
2695             "Context %p sink pad %" GST_PTR_FORMAT " @ TS %" GST_STIME_FORMAT
2696             " EOS %d", tmpctx, tmpctx->sinkpad,
2697             GST_STIME_ARGS (tmpctx->in_running_time), tmpctx->in_eos);
2698
2699         if (next_gop_start != GST_CLOCK_STIME_NONE &&
2700             tmpctx->in_running_time < next_gop_start && !tmpctx->in_eos) {
2701           GST_LOG_OBJECT (splitmux,
2702               "Context %p sink pad %" GST_PTR_FORMAT " not ready. We'll sleep",
2703               tmpctx, tmpctx->sinkpad);
2704           ready = FALSE;
2705           break;
2706         }
2707       }
2708       if (ready) {
2709         GST_DEBUG_OBJECT (splitmux,
2710             "Collected GOP is complete. Processing (ctx %p)", ctx);
2711         /* All pads have a complete GOP, release it into the multiqueue */
2712         handle_gathered_gop (splitmux, gop, next_gop_start,
2713             max_out_running_time);
2714
2715         g_queue_pop_head (&splitmux->pending_input_gops);
2716         input_gop_free (gop);
2717
2718         /* The user has requested a split, we can split now that the previous GOP
2719          * has been collected to the correct location */
2720         if (g_atomic_int_compare_and_exchange (&(splitmux->split_requested),
2721                 TRUE, FALSE)) {
2722           g_atomic_int_set (&(splitmux->do_split_next_gop), TRUE);
2723         }
2724       }
2725     }
2726
2727     /* If upstream reached EOS we are not expecting more data, no need to wait
2728      * here. */
2729     if (ctx->in_eos)
2730       return;
2731
2732     if (splitmux->input_state == SPLITMUX_INPUT_STATE_WAITING_GOP_COLLECT &&
2733         !ctx->flushing &&
2734         ctx->in_running_time >= next_gop_start &&
2735         next_gop_start != GST_CLOCK_STIME_NONE) {
2736       /* Some pad is not yet ready, or GOP is being pushed
2737        * either way, sleep and wait to get woken */
2738       GST_LOG_OBJECT (splitmux, "Sleeping for GOP collection (ctx %p)", ctx);
2739       GST_SPLITMUX_WAIT_INPUT (splitmux);
2740       GST_LOG_OBJECT (splitmux, "Done waiting for complete GOP (ctx %p)", ctx);
2741     } else {
2742       /* This pad is not ready or the state changed - break out and get another
2743        * buffer / event */
2744       break;
2745     }
2746   } while (splitmux->input_state == SPLITMUX_INPUT_STATE_WAITING_GOP_COLLECT);
2747 }
2748
2749 static GstPadProbeReturn
2750 handle_mq_input (GstPad * pad, GstPadProbeInfo * info, MqStreamCtx * ctx)
2751 {
2752   GstSplitMuxSink *splitmux = ctx->splitmux;
2753   GstFlowReturn ret = GST_FLOW_OK;
2754   GstBuffer *buf;
2755   MqStreamBuf *buf_info = NULL;
2756   GstClockTime ts, pts, dts;
2757   GstClockTimeDiff running_time, running_time_pts, running_time_dts;
2758   gboolean loop_again;
2759   gboolean keyframe = FALSE;
2760
2761   GST_LOG_OBJECT (pad, "Fired probe type 0x%x", info->type);
2762
2763   /* FIXME: Handle buffer lists, until then make it clear they won't work */
2764   if (info->type & GST_PAD_PROBE_TYPE_BUFFER_LIST) {
2765     g_warning ("Buffer list handling not implemented");
2766     return GST_PAD_PROBE_DROP;
2767   }
2768   if (info->type & GST_PAD_PROBE_TYPE_EVENT_DOWNSTREAM ||
2769       info->type & GST_PAD_PROBE_TYPE_EVENT_FLUSH) {
2770     GstEvent *event = gst_pad_probe_info_get_event (info);
2771
2772     GST_LOG_OBJECT (pad, "Event %" GST_PTR_FORMAT, event);
2773
2774     switch (GST_EVENT_TYPE (event)) {
2775       case GST_EVENT_SEGMENT:
2776         gst_event_copy_segment (event, &ctx->in_segment);
2777         break;
2778       case GST_EVENT_FLUSH_STOP:
2779         GST_SPLITMUX_LOCK (splitmux);
2780         gst_segment_init (&ctx->in_segment, GST_FORMAT_UNDEFINED);
2781         ctx->in_eos = FALSE;
2782         ctx->in_running_time = GST_CLOCK_STIME_NONE;
2783         GST_SPLITMUX_UNLOCK (splitmux);
2784         break;
2785       case GST_EVENT_EOS:
2786         GST_SPLITMUX_LOCK (splitmux);
2787         ctx->in_eos = TRUE;
2788
2789         if (splitmux->input_state == SPLITMUX_INPUT_STATE_STOPPED) {
2790           ret = GST_FLOW_FLUSHING;
2791           goto beach;
2792         }
2793
2794         if (ctx->is_reference) {
2795           GST_INFO_OBJECT (splitmux, "Got Reference EOS. Finishing up");
2796           /* check_completed_gop will act as if this is a new keyframe with infinite timestamp */
2797           splitmux->input_state = SPLITMUX_INPUT_STATE_WAITING_GOP_COLLECT;
2798           /* Wake up other input pads to collect this GOP */
2799           GST_SPLITMUX_BROADCAST_INPUT (splitmux);
2800           check_completed_gop (splitmux, ctx);
2801         } else if (splitmux->input_state ==
2802             SPLITMUX_INPUT_STATE_WAITING_GOP_COLLECT) {
2803           /* If we are waiting for a GOP to be completed (ie, for aux
2804            * pads to catch up), then this pad is complete, so check
2805            * if the whole GOP is.
2806            */
2807           check_completed_gop (splitmux, ctx);
2808         }
2809         GST_SPLITMUX_UNLOCK (splitmux);
2810         break;
2811       case GST_EVENT_GAP:{
2812         GstClockTime gap_ts;
2813         GstClockTimeDiff rtime;
2814
2815         gst_event_parse_gap (event, &gap_ts, NULL);
2816         if (gap_ts == GST_CLOCK_TIME_NONE)
2817           break;
2818
2819         GST_SPLITMUX_LOCK (splitmux);
2820
2821         if (splitmux->input_state == SPLITMUX_INPUT_STATE_STOPPED) {
2822           ret = GST_FLOW_FLUSHING;
2823           goto beach;
2824         }
2825         rtime = my_segment_to_running_time (&ctx->in_segment, gap_ts);
2826
2827         GST_LOG_OBJECT (pad, "Have GAP w/ ts %" GST_STIME_FORMAT,
2828             GST_STIME_ARGS (rtime));
2829
2830         if (ctx->is_reference && GST_CLOCK_STIME_IS_VALID (rtime)) {
2831           /* If this GAP event happens before the first fragment then
2832            * initialize the fragment start time here. */
2833           if (!GST_CLOCK_STIME_IS_VALID (splitmux->fragment_start_time)) {
2834             splitmux->fragment_start_time = rtime;
2835             GST_LOG_OBJECT (splitmux,
2836                 "Fragment start time now %" GST_STIME_FORMAT,
2837                 GST_STIME_ARGS (splitmux->fragment_start_time));
2838
2839             /* Also take this as the first start time when starting up,
2840              * so that we start counting overflow from the first frame */
2841             if (!GST_CLOCK_STIME_IS_VALID (splitmux->max_in_running_time))
2842               splitmux->max_in_running_time = rtime;
2843             if (!GST_CLOCK_STIME_IS_VALID (splitmux->max_in_running_time_dts))
2844               splitmux->max_in_running_time_dts = rtime;
2845           }
2846
2847           /* Similarly take it as fragment start PTS and GOP start time if
2848            * these are not set */
2849           if (!GST_CLOCK_STIME_IS_VALID (splitmux->fragment_start_time_pts))
2850             splitmux->fragment_start_time_pts = rtime;
2851
2852           if (g_queue_is_empty (&splitmux->pending_input_gops)) {
2853             InputGop *gop = g_slice_new0 (InputGop);
2854
2855             gop->from_gap = TRUE;
2856             gop->start_time = rtime;
2857             gop->start_time_pts = rtime;
2858
2859             g_queue_push_tail (&splitmux->pending_input_gops, gop);
2860           }
2861         }
2862
2863         GST_SPLITMUX_UNLOCK (splitmux);
2864         break;
2865       }
2866       default:
2867         break;
2868     }
2869     return GST_PAD_PROBE_PASS;
2870   } else if (info->type & GST_PAD_PROBE_TYPE_QUERY_DOWNSTREAM) {
2871     switch (GST_QUERY_TYPE (GST_QUERY (info->data))) {
2872       case GST_QUERY_ALLOCATION:
2873         return GST_PAD_PROBE_DROP;
2874       default:
2875         return GST_PAD_PROBE_PASS;
2876     }
2877   }
2878
2879   buf = gst_pad_probe_info_get_buffer (info);
2880   buf_info = mq_stream_buf_new ();
2881
2882   pts = GST_BUFFER_PTS (buf);
2883   dts = GST_BUFFER_DTS (buf);
2884   if (GST_BUFFER_PTS_IS_VALID (buf))
2885     ts = GST_BUFFER_PTS (buf);
2886   else
2887     ts = GST_BUFFER_DTS (buf);
2888
2889   GST_LOG_OBJECT (pad,
2890       "Buffer TS is %" GST_TIME_FORMAT " (PTS %" GST_TIME_FORMAT ", DTS %"
2891       GST_TIME_FORMAT ")", GST_TIME_ARGS (ts), GST_TIME_ARGS (pts),
2892       GST_TIME_ARGS (dts));
2893
2894   GST_SPLITMUX_LOCK (splitmux);
2895
2896   if (splitmux->input_state == SPLITMUX_INPUT_STATE_STOPPED) {
2897     ret = GST_FLOW_FLUSHING;
2898     goto beach;
2899   }
2900
2901   /* If this buffer has a timestamp, advance the input timestamp of the
2902    * stream */
2903   if (GST_CLOCK_TIME_IS_VALID (ts)) {
2904     running_time = my_segment_to_running_time (&ctx->in_segment, ts);
2905
2906     GST_LOG_OBJECT (pad, "Buffer running TS is %" GST_STIME_FORMAT,
2907         GST_STIME_ARGS (running_time));
2908
2909     /* in running time is always the maximum PTS (or DTS) that was observed so far */
2910     if (GST_CLOCK_STIME_IS_VALID (running_time)
2911         && running_time > ctx->in_running_time)
2912       ctx->in_running_time = running_time;
2913   } else {
2914     running_time = ctx->in_running_time;
2915   }
2916
2917   if (GST_CLOCK_TIME_IS_VALID (pts))
2918     running_time_pts = my_segment_to_running_time (&ctx->in_segment, pts);
2919   else
2920     running_time_pts = GST_CLOCK_STIME_NONE;
2921
2922   if (GST_CLOCK_TIME_IS_VALID (dts))
2923     running_time_dts = my_segment_to_running_time (&ctx->in_segment, dts);
2924   else
2925     running_time_dts = GST_CLOCK_STIME_NONE;
2926
2927   /* Try to make sure we have a valid running time */
2928   if (!GST_CLOCK_STIME_IS_VALID (ctx->in_running_time)) {
2929     ctx->in_running_time =
2930         my_segment_to_running_time (&ctx->in_segment, ctx->in_segment.start);
2931   }
2932
2933   GST_LOG_OBJECT (pad, "in running time now %" GST_STIME_FORMAT,
2934       GST_STIME_ARGS (ctx->in_running_time));
2935
2936   buf_info->run_ts = ctx->in_running_time;
2937   buf_info->buf_size = gst_buffer_get_size (buf);
2938   buf_info->duration = GST_BUFFER_DURATION (buf);
2939
2940   if (ctx->is_reference) {
2941     InputGop *gop = NULL;
2942     GstVideoTimeCodeMeta *tc_meta = gst_buffer_get_video_time_code_meta (buf);
2943
2944     /* initialize fragment_start_time if it was not set yet (i.e. for the
2945      * first fragment), or otherwise set it to the minimum observed time */
2946     if (!GST_CLOCK_STIME_IS_VALID (splitmux->fragment_start_time)
2947         || splitmux->fragment_start_time > running_time) {
2948       if (!GST_CLOCK_STIME_IS_VALID (splitmux->fragment_start_time))
2949         splitmux->fragment_start_time_pts = running_time_pts;
2950       splitmux->fragment_start_time = running_time;
2951
2952       GST_LOG_OBJECT (splitmux,
2953           "Fragment start time now %" GST_STIME_FORMAT " (initial PTS %"
2954           GST_STIME_FORMAT ")", GST_STIME_ARGS (splitmux->fragment_start_time),
2955           GST_STIME_ARGS (splitmux->fragment_start_time_pts));
2956
2957       /* Also take this as the first start time when starting up,
2958        * so that we start counting overflow from the first frame */
2959       if (!GST_CLOCK_STIME_IS_VALID (splitmux->max_in_running_time)
2960           || splitmux->max_in_running_time < splitmux->fragment_start_time)
2961         splitmux->max_in_running_time = splitmux->fragment_start_time;
2962
2963       if (!GST_CLOCK_STIME_IS_VALID (splitmux->max_in_running_time_dts))
2964         splitmux->max_in_running_time_dts = running_time_dts;
2965
2966       if (tc_meta) {
2967         video_time_code_replace (&splitmux->fragment_start_tc, &tc_meta->tc);
2968
2969         splitmux->next_fragment_start_tc_time =
2970             calculate_next_max_timecode (splitmux, &tc_meta->tc,
2971             running_time, NULL);
2972
2973 #ifndef GST_DISABLE_GST_DEBUG
2974         {
2975           gchar *tc_str;
2976
2977           tc_str = gst_video_time_code_to_string (&tc_meta->tc);
2978           GST_DEBUG_OBJECT (splitmux,
2979               "Initialize fragment start timecode %s, next fragment start timecode time %"
2980               GST_TIME_FORMAT, tc_str,
2981               GST_TIME_ARGS (splitmux->next_fragment_start_tc_time));
2982           g_free (tc_str);
2983         }
2984 #endif
2985       }
2986     }
2987
2988
2989     /* First check if we're at the very first GOP and the tracking was created
2990      * from a GAP event. In that case don't start a new GOP on keyframes but
2991      * just updated it as needed */
2992     gop = g_queue_peek_tail (&splitmux->pending_input_gops);
2993
2994     if (!gop || (!gop->from_gap
2995             && !GST_BUFFER_FLAG_IS_SET (buf, GST_BUFFER_FLAG_DELTA_UNIT))) {
2996       gop = g_slice_new0 (InputGop);
2997
2998       gop->start_time = running_time;
2999       gop->start_time_pts = running_time_pts;
3000
3001       GST_LOG_OBJECT (splitmux,
3002           "Next GOP start time now %" GST_STIME_FORMAT " (initial PTS %"
3003           GST_STIME_FORMAT ")", GST_STIME_ARGS (gop->start_time),
3004           GST_STIME_ARGS (gop->start_time_pts));
3005
3006       if (tc_meta) {
3007         video_time_code_replace (&gop->start_tc, &tc_meta->tc);
3008
3009 #ifndef GST_DISABLE_GST_DEBUG
3010         {
3011           gchar *tc_str;
3012
3013           tc_str = gst_video_time_code_to_string (&tc_meta->tc);
3014           GST_DEBUG_OBJECT (splitmux, "Next GOP start timecode %s", tc_str);
3015           g_free (tc_str);
3016         }
3017 #endif
3018       }
3019
3020       g_queue_push_tail (&splitmux->pending_input_gops, gop);
3021     } else {
3022       gop->from_gap = FALSE;
3023
3024       if (!GST_CLOCK_STIME_IS_VALID (gop->start_time)
3025           || gop->start_time > running_time) {
3026         gop->start_time = running_time;
3027
3028         GST_LOG_OBJECT (splitmux,
3029             "GOP start time updated now %" GST_STIME_FORMAT " (initial PTS %"
3030             GST_STIME_FORMAT ")", GST_STIME_ARGS (gop->start_time),
3031             GST_STIME_ARGS (gop->start_time_pts));
3032
3033         if (tc_meta) {
3034           video_time_code_replace (&gop->start_tc, &tc_meta->tc);
3035
3036 #ifndef GST_DISABLE_GST_DEBUG
3037           {
3038             gchar *tc_str;
3039
3040             tc_str = gst_video_time_code_to_string (&tc_meta->tc);
3041             GST_DEBUG_OBJECT (splitmux, "Next GOP start timecode updated %s",
3042                 tc_str);
3043             g_free (tc_str);
3044           }
3045 #endif
3046         }
3047       }
3048     }
3049
3050     /* Check whether we need to request next keyframe depending on
3051      * current running time */
3052     if (request_next_keyframe (splitmux, buf, running_time_dts) == FALSE) {
3053       GST_WARNING_OBJECT (splitmux,
3054           "Could not request a keyframe. Files may not split at the exact location they should");
3055     }
3056   }
3057
3058   {
3059     InputGop *gop = g_queue_peek_tail (&splitmux->pending_input_gops);
3060
3061     if (gop) {
3062       GST_DEBUG_OBJECT (pad, "Buf TS %" GST_STIME_FORMAT
3063           " total GOP bytes %" G_GUINT64_FORMAT ", total next GOP bytes %"
3064           G_GUINT64_FORMAT, GST_STIME_ARGS (buf_info->run_ts),
3065           gop->total_bytes, gop->total_bytes);
3066     }
3067   }
3068
3069   loop_again = TRUE;
3070   do {
3071     if (ctx->flushing)
3072       break;
3073
3074     switch (splitmux->input_state) {
3075       case SPLITMUX_INPUT_STATE_COLLECTING_GOP_START:
3076         if (ctx->is_releasing) {
3077           /* The pad belonging to this context is being released */
3078           GST_WARNING_OBJECT (pad, "Pad is being released while the muxer is "
3079               "running. Data might not drain correctly");
3080           loop_again = FALSE;
3081         } else if (ctx->is_reference) {
3082           const InputGop *gop, *next_gop;
3083
3084           /* This is the reference context. If it's a keyframe,
3085            * it marks the start of a new GOP and we should wait in
3086            * check_completed_gop before continuing, but either way
3087            * (keyframe or no, we'll pass this buffer through after
3088            * so set loop_again to FALSE */
3089           loop_again = FALSE;
3090
3091           gop = g_queue_peek_head (&splitmux->pending_input_gops);
3092           g_assert (gop != NULL);
3093           next_gop = g_queue_peek_nth (&splitmux->pending_input_gops, 1);
3094
3095           if (ctx->in_running_time > splitmux->max_in_running_time)
3096             splitmux->max_in_running_time = ctx->in_running_time;
3097           if (running_time_dts > splitmux->max_in_running_time_dts)
3098             splitmux->max_in_running_time_dts = running_time_dts;
3099
3100           GST_LOG_OBJECT (splitmux,
3101               "Max in running time now %" GST_STIME_FORMAT ", DTS %"
3102               GST_STIME_FORMAT, GST_STIME_ARGS (splitmux->max_in_running_time),
3103               GST_STIME_ARGS (splitmux->max_in_running_time_dts));
3104
3105           if (!next_gop) {
3106             GST_DEBUG_OBJECT (pad, "Waiting for end of GOP");
3107             /* Allow other input pads to catch up to here too */
3108             GST_SPLITMUX_BROADCAST_INPUT (splitmux);
3109             break;
3110           }
3111
3112           if (!GST_BUFFER_FLAG_IS_SET (buf, GST_BUFFER_FLAG_DELTA_UNIT)) {
3113             GST_INFO_OBJECT (pad,
3114                 "Have keyframe with running time %" GST_STIME_FORMAT,
3115                 GST_STIME_ARGS (ctx->in_running_time));
3116             keyframe = TRUE;
3117           }
3118
3119           if (running_time_dts != GST_CLOCK_STIME_NONE
3120               && running_time_dts < next_gop->start_time_pts) {
3121             GST_DEBUG_OBJECT (splitmux,
3122                 "Waiting until DTS (%" GST_STIME_FORMAT
3123                 ") has passed next GOP start PTS (%" GST_STIME_FORMAT ")",
3124                 GST_STIME_ARGS (running_time_dts),
3125                 GST_STIME_ARGS (next_gop->start_time_pts));
3126             /* Allow other input pads to catch up to here too */
3127             GST_SPLITMUX_BROADCAST_INPUT (splitmux);
3128             break;
3129           }
3130
3131           splitmux->input_state = SPLITMUX_INPUT_STATE_WAITING_GOP_COLLECT;
3132           /* Wake up other input pads to collect this GOP */
3133           GST_SPLITMUX_BROADCAST_INPUT (splitmux);
3134           check_completed_gop (splitmux, ctx);
3135         } else {
3136           /* Pass this buffer if the reference ctx is far enough ahead */
3137           if (ctx->in_running_time < splitmux->max_in_running_time) {
3138             loop_again = FALSE;
3139             break;
3140           }
3141
3142           /* We're still waiting for a keyframe on the reference pad, sleep */
3143           GST_LOG_OBJECT (pad, "Sleeping for GOP start");
3144           GST_SPLITMUX_WAIT_INPUT (splitmux);
3145           GST_LOG_OBJECT (pad,
3146               "Done sleeping for GOP start input state now %d",
3147               splitmux->input_state);
3148         }
3149         break;
3150       case SPLITMUX_INPUT_STATE_WAITING_GOP_COLLECT:{
3151         /* We're collecting a GOP, this is only ever called for non-reference
3152          * contexts as the reference context would be waiting inside
3153          * check_completed_gop() */
3154
3155         g_assert (!ctx->is_reference);
3156
3157         /* If we overran the target timestamp, it might be time to process
3158          * the GOP, otherwise bail out for more data. */
3159         GST_LOG_OBJECT (pad,
3160             "Checking TS %" GST_STIME_FORMAT " against max %"
3161             GST_STIME_FORMAT, GST_STIME_ARGS (ctx->in_running_time),
3162             GST_STIME_ARGS (splitmux->max_in_running_time));
3163
3164         if (ctx->in_running_time < splitmux->max_in_running_time) {
3165           loop_again = FALSE;
3166           break;
3167         }
3168
3169         GST_LOG_OBJECT (pad,
3170             "Collected last packet of GOP. Checking other pads");
3171         check_completed_gop (splitmux, ctx);
3172         break;
3173       }
3174       case SPLITMUX_INPUT_STATE_FINISHING_UP:
3175         loop_again = FALSE;
3176         break;
3177       default:
3178         loop_again = FALSE;
3179         break;
3180     }
3181   }
3182   while (loop_again);
3183
3184   if (keyframe && ctx->is_reference)
3185     splitmux->queued_keyframes++;
3186   buf_info->keyframe = keyframe;
3187
3188   /* Update total input byte counter for overflow detect unless we're after
3189    * EOS now */
3190   if (splitmux->input_state != SPLITMUX_INPUT_STATE_FINISHING_UP
3191       && splitmux->input_state != SPLITMUX_INPUT_STATE_STOPPED) {
3192     InputGop *gop = g_queue_peek_tail (&splitmux->pending_input_gops);
3193
3194     /* We must have a GOP at this point */
3195     g_assert (gop != NULL);
3196
3197     gop->total_bytes += buf_info->buf_size;
3198     if (ctx->is_reference) {
3199       gop->reference_bytes += buf_info->buf_size;
3200     }
3201   }
3202
3203   /* Now add this buffer to the queue just before returning */
3204   g_queue_push_head (&ctx->queued_bufs, buf_info);
3205
3206   GST_LOG_OBJECT (pad, "Returning to queue buffer %" GST_PTR_FORMAT
3207       " run ts %" GST_STIME_FORMAT, buf, GST_STIME_ARGS (ctx->in_running_time));
3208
3209   GST_SPLITMUX_UNLOCK (splitmux);
3210   GST_PAD_PROBE_INFO_FLOW_RETURN (info) = ret;
3211   return GST_PAD_PROBE_PASS;
3212
3213 beach:
3214   GST_SPLITMUX_UNLOCK (splitmux);
3215   if (buf_info)
3216     mq_stream_buf_free (buf_info);
3217   GST_PAD_PROBE_INFO_FLOW_RETURN (info) = ret;
3218   return GST_PAD_PROBE_PASS;
3219 }
3220
3221 static void
3222 grow_blocked_queues (GstSplitMuxSink * splitmux)
3223 {
3224   GList *cur;
3225
3226   /* Scan other queues for full-ness and grow them */
3227   for (cur = g_list_first (splitmux->contexts);
3228       cur != NULL; cur = g_list_next (cur)) {
3229     MqStreamCtx *tmpctx = (MqStreamCtx *) (cur->data);
3230     guint cur_limit;
3231     guint cur_len = g_queue_get_length (&tmpctx->queued_bufs);
3232
3233     g_object_get (tmpctx->q, "max-size-buffers", &cur_limit, NULL);
3234     GST_LOG_OBJECT (tmpctx->q, "Queue len %u", cur_len);
3235
3236     if (cur_len >= cur_limit) {
3237       cur_limit = cur_len + 1;
3238       GST_DEBUG_OBJECT (tmpctx->q,
3239           "Queue overflowed and needs enlarging. Growing to %u buffers",
3240           cur_limit);
3241       g_object_set (tmpctx->q, "max-size-buffers", cur_limit, NULL);
3242     }
3243   }
3244 }
3245
3246 static void
3247 handle_q_underrun (GstElement * q, gpointer user_data)
3248 {
3249   MqStreamCtx *ctx = (MqStreamCtx *) (user_data);
3250   GstSplitMuxSink *splitmux = ctx->splitmux;
3251
3252   GST_SPLITMUX_LOCK (splitmux);
3253   GST_DEBUG_OBJECT (q,
3254       "Queue reported underrun with %d keyframes and %d cmds enqueued",
3255       splitmux->queued_keyframes, g_queue_get_length (&splitmux->out_cmd_q));
3256   grow_blocked_queues (splitmux);
3257   GST_SPLITMUX_UNLOCK (splitmux);
3258 }
3259
3260 static void
3261 handle_q_overrun (GstElement * q, gpointer user_data)
3262 {
3263   MqStreamCtx *ctx = (MqStreamCtx *) (user_data);
3264   GstSplitMuxSink *splitmux = ctx->splitmux;
3265   gboolean allow_grow = FALSE;
3266
3267   GST_SPLITMUX_LOCK (splitmux);
3268   GST_DEBUG_OBJECT (q,
3269       "Queue reported overrun with %d keyframes and %d cmds enqueued",
3270       splitmux->queued_keyframes, g_queue_get_length (&splitmux->out_cmd_q));
3271
3272   if (splitmux->queued_keyframes < 2) {
3273     /* Less than a full GOP queued, grow the queue */
3274     allow_grow = TRUE;
3275   } else if (g_queue_get_length (&splitmux->out_cmd_q) < 1) {
3276     allow_grow = TRUE;
3277   } else {
3278     /* If another queue is starved, grow */
3279     GList *cur;
3280     for (cur = g_list_first (splitmux->contexts);
3281         cur != NULL; cur = g_list_next (cur)) {
3282       MqStreamCtx *tmpctx = (MqStreamCtx *) (cur->data);
3283       if (tmpctx != ctx && g_queue_get_length (&tmpctx->queued_bufs) < 1) {
3284         allow_grow = TRUE;
3285       }
3286     }
3287   }
3288   GST_SPLITMUX_UNLOCK (splitmux);
3289
3290   if (allow_grow) {
3291     guint cur_limit;
3292
3293     g_object_get (q, "max-size-buffers", &cur_limit, NULL);
3294     cur_limit++;
3295
3296     GST_DEBUG_OBJECT (q,
3297         "Queue overflowed and needs enlarging. Growing to %u buffers",
3298         cur_limit);
3299
3300     g_object_set (q, "max-size-buffers", cur_limit, NULL);
3301   }
3302 }
3303
3304 /* Called with SPLITMUX lock held */
3305 static const gchar *
3306 lookup_muxer_pad (GstSplitMuxSink * splitmux, const gchar * sinkpad_name)
3307 {
3308   const gchar *ret = NULL;
3309
3310   if (splitmux->muxerpad_map == NULL)
3311     return NULL;
3312
3313   if (sinkpad_name == NULL) {
3314     GST_WARNING_OBJECT (splitmux,
3315         "Can't look up request pad in pad map without providing a pad name");
3316     return NULL;
3317   }
3318
3319   ret = gst_structure_get_string (splitmux->muxerpad_map, sinkpad_name);
3320   if (ret) {
3321     GST_INFO_OBJECT (splitmux, "Sink pad %s maps to muxer pad %s", sinkpad_name,
3322         ret);
3323     return g_strdup (ret);
3324   }
3325
3326   return NULL;
3327 }
3328
3329 static GstPad *
3330 gst_splitmux_sink_request_new_pad (GstElement * element,
3331     GstPadTemplate * templ, const gchar * name, const GstCaps * caps)
3332 {
3333   GstSplitMuxSink *splitmux = (GstSplitMuxSink *) element;
3334   GstPadTemplate *mux_template = NULL;
3335   GstPad *ret = NULL, *muxpad = NULL;
3336   GstElement *q;
3337   GstPad *q_sink = NULL, *q_src = NULL;
3338   gchar *gname, *qname;
3339   gboolean is_primary_video = FALSE, is_video = FALSE,
3340       muxer_is_requestpad = FALSE;
3341   MqStreamCtx *ctx;
3342   const gchar *muxer_padname = NULL;
3343
3344   GST_DEBUG_OBJECT (splitmux, "templ:%s, name:%s", templ->name_template, name);
3345
3346   GST_SPLITMUX_LOCK (splitmux);
3347   if (!create_muxer (splitmux))
3348     goto fail;
3349   g_signal_emit (splitmux, signals[SIGNAL_MUXER_ADDED], 0, splitmux->muxer);
3350
3351   if (g_str_equal (templ->name_template, "video") ||
3352       g_str_has_prefix (templ->name_template, "video_aux_")) {
3353     is_primary_video = g_str_equal (templ->name_template, "video");
3354     if (is_primary_video && splitmux->have_video)
3355       goto already_have_video;
3356     is_video = TRUE;
3357   }
3358
3359   /* See if there's a pad map and it lists this pad */
3360   muxer_padname = lookup_muxer_pad (splitmux, name);
3361
3362   if (muxer_padname == NULL) {
3363     if (is_video) {
3364       /* FIXME: Look for a pad template with matching caps, rather than by name */
3365       GST_DEBUG_OBJECT (element,
3366           "searching for pad-template with name 'video_%%u'");
3367       mux_template =
3368           gst_element_class_get_pad_template (GST_ELEMENT_GET_CLASS
3369           (splitmux->muxer), "video_%u");
3370
3371       /* Fallback to find sink pad templates named 'video' (flvmux) */
3372       if (!mux_template) {
3373         GST_DEBUG_OBJECT (element,
3374             "searching for pad-template with name 'video'");
3375         mux_template =
3376             gst_element_class_get_pad_template (GST_ELEMENT_GET_CLASS
3377             (splitmux->muxer), "video");
3378       }
3379       name = NULL;
3380     } else {
3381       GST_DEBUG_OBJECT (element, "searching for pad-template with name '%s'",
3382           templ->name_template);
3383       mux_template =
3384           gst_element_class_get_pad_template (GST_ELEMENT_GET_CLASS
3385           (splitmux->muxer), templ->name_template);
3386
3387       /* Fallback to find sink pad templates named 'audio' (flvmux) */
3388       if (!mux_template && g_str_has_prefix (templ->name_template, "audio_")) {
3389         GST_DEBUG_OBJECT (element,
3390             "searching for pad-template with name 'audio'");
3391         mux_template =
3392             gst_element_class_get_pad_template (GST_ELEMENT_GET_CLASS
3393             (splitmux->muxer), "audio");
3394         name = NULL;
3395       }
3396     }
3397
3398     if (mux_template == NULL) {
3399       GST_DEBUG_OBJECT (element,
3400           "searching for pad-template with name 'sink_%%d'");
3401       mux_template =
3402           gst_element_class_get_pad_template (GST_ELEMENT_GET_CLASS
3403           (splitmux->muxer), "sink_%d");
3404       name = NULL;
3405     }
3406     if (mux_template == NULL) {
3407       GST_DEBUG_OBJECT (element, "searching for pad-template with name 'sink'");
3408       mux_template =
3409           gst_element_class_get_pad_template (GST_ELEMENT_GET_CLASS
3410           (splitmux->muxer), "sink");
3411       name = NULL;
3412     }
3413
3414     if (mux_template == NULL) {
3415       GST_ERROR_OBJECT (element,
3416           "unable to find a suitable sink pad-template on the muxer");
3417       goto fail;
3418     }
3419     GST_DEBUG_OBJECT (element, "found sink pad-template '%s' on the muxer",
3420         mux_template->name_template);
3421
3422     if (mux_template->presence == GST_PAD_REQUEST) {
3423       GST_DEBUG_OBJECT (element, "requesting pad from pad-template");
3424
3425       muxpad =
3426           gst_element_request_pad (splitmux->muxer, mux_template, name, caps);
3427       muxer_is_requestpad = TRUE;
3428     } else if (mux_template->presence == GST_PAD_ALWAYS) {
3429       GST_DEBUG_OBJECT (element, "accessing always pad from pad-template");
3430
3431       muxpad =
3432           gst_element_get_static_pad (splitmux->muxer,
3433           mux_template->name_template);
3434     } else {
3435       GST_ERROR_OBJECT (element,
3436           "unexpected pad presence %d", mux_template->presence);
3437       goto fail;
3438     }
3439   } else {
3440     /* Have a muxer pad name */
3441     if (!(muxpad = gst_element_get_static_pad (splitmux->muxer, muxer_padname))) {
3442       if ((muxpad =
3443               gst_element_request_pad_simple (splitmux->muxer, muxer_padname)))
3444         muxer_is_requestpad = TRUE;
3445     }
3446     g_free ((gchar *) muxer_padname);
3447     muxer_padname = NULL;
3448   }
3449
3450   /* One way or another, we must have a muxer pad by now */
3451   if (muxpad == NULL)
3452     goto fail;
3453
3454   if (is_primary_video)
3455     gname = g_strdup ("video");
3456   else if (name == NULL)
3457     gname = gst_pad_get_name (muxpad);
3458   else
3459     gname = g_strdup (name);
3460
3461   qname = g_strdup_printf ("queue_%s", gname);
3462   if ((q = create_element (splitmux, "queue", qname, FALSE)) == NULL) {
3463     g_free (qname);
3464     goto fail;
3465   }
3466   g_free (qname);
3467
3468   gst_element_set_state (q, GST_STATE_TARGET (splitmux));
3469
3470   g_object_set (q, "max-size-bytes", 0, "max-size-time", (guint64) (0),
3471       "max-size-buffers", 5, NULL);
3472
3473   q_sink = gst_element_get_static_pad (q, "sink");
3474   q_src = gst_element_get_static_pad (q, "src");
3475
3476   if (gst_pad_link (q_src, muxpad) != GST_PAD_LINK_OK) {
3477     if (muxer_is_requestpad)
3478       gst_element_release_request_pad (splitmux->muxer, muxpad);
3479     gst_object_unref (GST_OBJECT (muxpad));
3480     goto fail;
3481   }
3482
3483   gst_object_unref (GST_OBJECT (muxpad));
3484
3485   ctx = mq_stream_ctx_new (splitmux);
3486   /* Context holds a ref: */
3487   ctx->q = gst_object_ref (q);
3488   ctx->srcpad = q_src;
3489   ctx->sinkpad = q_sink;
3490   ctx->q_overrun_id =
3491       g_signal_connect (q, "overrun", (GCallback) handle_q_overrun, ctx);
3492   g_signal_connect (q, "underrun", (GCallback) handle_q_underrun, ctx);
3493
3494   ctx->src_pad_block_id =
3495       gst_pad_add_probe (q_src,
3496       GST_PAD_PROBE_TYPE_DATA_DOWNSTREAM | GST_PAD_PROBE_TYPE_EVENT_FLUSH,
3497       (GstPadProbeCallback) handle_mq_output, ctx, NULL);
3498   if (is_primary_video && splitmux->reference_ctx != NULL) {
3499     splitmux->reference_ctx->is_reference = FALSE;
3500     splitmux->reference_ctx = NULL;
3501   }
3502   if (splitmux->reference_ctx == NULL) {
3503     splitmux->reference_ctx = ctx;
3504     ctx->is_reference = TRUE;
3505   }
3506
3507   ret = gst_ghost_pad_new_from_template (gname, q_sink, templ);
3508   g_object_set_qdata ((GObject *) (ret), PAD_CONTEXT, ctx);
3509
3510   ctx->sink_pad_block_id =
3511       gst_pad_add_probe (q_sink,
3512       GST_PAD_PROBE_TYPE_DATA_DOWNSTREAM | GST_PAD_PROBE_TYPE_EVENT_FLUSH |
3513       GST_PAD_PROBE_TYPE_QUERY_DOWNSTREAM,
3514       (GstPadProbeCallback) handle_mq_input, ctx, NULL);
3515
3516   GST_DEBUG_OBJECT (splitmux, "splitmuxsink pad %" GST_PTR_FORMAT
3517       " feeds queue pad %" GST_PTR_FORMAT, ret, q_sink);
3518
3519   splitmux->contexts = g_list_append (splitmux->contexts, ctx);
3520
3521   g_free (gname);
3522
3523   if (is_primary_video)
3524     splitmux->have_video = TRUE;
3525
3526   gst_pad_set_active (ret, TRUE);
3527   gst_element_add_pad (GST_ELEMENT (splitmux), ret);
3528
3529   GST_SPLITMUX_UNLOCK (splitmux);
3530
3531   return ret;
3532 fail:
3533   GST_SPLITMUX_UNLOCK (splitmux);
3534
3535   if (q_sink)
3536     gst_object_unref (q_sink);
3537   if (q_src)
3538     gst_object_unref (q_src);
3539   return NULL;
3540 already_have_video:
3541   GST_DEBUG_OBJECT (splitmux, "video sink pad already requested");
3542   GST_SPLITMUX_UNLOCK (splitmux);
3543   return NULL;
3544 }
3545
3546 static void
3547 gst_splitmux_sink_release_pad (GstElement * element, GstPad * pad)
3548 {
3549   GstSplitMuxSink *splitmux = (GstSplitMuxSink *) element;
3550   GstPad *muxpad = NULL;
3551   MqStreamCtx *ctx =
3552       (MqStreamCtx *) (g_object_get_qdata ((GObject *) (pad), PAD_CONTEXT));
3553
3554   GST_SPLITMUX_LOCK (splitmux);
3555
3556   if (splitmux->muxer == NULL)
3557     goto fail;                  /* Elements don't exist yet - nothing to release */
3558
3559   GST_INFO_OBJECT (pad, "releasing request pad");
3560
3561   muxpad = gst_pad_get_peer (ctx->srcpad);
3562
3563   /* Remove the context from our consideration */
3564   splitmux->contexts = g_list_remove (splitmux->contexts, ctx);
3565
3566   GST_SPLITMUX_UNLOCK (splitmux);
3567
3568   if (ctx->sink_pad_block_id) {
3569     gst_pad_remove_probe (ctx->sinkpad, ctx->sink_pad_block_id);
3570     gst_pad_send_event (ctx->sinkpad, gst_event_new_flush_start ());
3571   }
3572
3573   if (ctx->src_pad_block_id)
3574     gst_pad_remove_probe (ctx->srcpad, ctx->src_pad_block_id);
3575
3576   GST_SPLITMUX_LOCK (splitmux);
3577
3578   ctx->is_releasing = TRUE;
3579   GST_SPLITMUX_BROADCAST_INPUT (splitmux);
3580
3581   /* Can release the context now */
3582   mq_stream_ctx_free (ctx);
3583   if (ctx == splitmux->reference_ctx)
3584     splitmux->reference_ctx = NULL;
3585
3586   /* Release and free the muxer input */
3587   if (muxpad) {
3588     gst_element_release_request_pad (splitmux->muxer, muxpad);
3589     gst_object_unref (muxpad);
3590   }
3591
3592   if (GST_PAD_PAD_TEMPLATE (pad) &&
3593       g_str_equal (GST_PAD_TEMPLATE_NAME_TEMPLATE (GST_PAD_PAD_TEMPLATE
3594               (pad)), "video"))
3595     splitmux->have_video = FALSE;
3596
3597   gst_element_remove_pad (element, pad);
3598
3599   /* Reset the internal elements only after all request pads are released */
3600   if (splitmux->contexts == NULL)
3601     gst_splitmux_reset_elements (splitmux);
3602
3603   /* Wake up other input streams to check if the completion conditions have
3604    * changed */
3605   GST_SPLITMUX_BROADCAST_INPUT (splitmux);
3606
3607 fail:
3608   GST_SPLITMUX_UNLOCK (splitmux);
3609 }
3610
3611 static GstElement *
3612 create_element (GstSplitMuxSink * splitmux,
3613     const gchar * factory, const gchar * name, gboolean locked)
3614 {
3615   GstElement *ret = gst_element_factory_make (factory, name);
3616   if (ret == NULL) {
3617     g_warning ("Failed to create %s - splitmuxsink will not work", name);
3618     return NULL;
3619   }
3620
3621   if (locked) {
3622     /* Ensure the sink starts in locked state and NULL - it will be changed
3623      * by the filename setting code */
3624     gst_element_set_locked_state (ret, TRUE);
3625     gst_element_set_state (ret, GST_STATE_NULL);
3626   }
3627
3628   if (!gst_bin_add (GST_BIN (splitmux), ret)) {
3629     g_warning ("Could not add %s element - splitmuxsink will not work", name);
3630     gst_object_unref (ret);
3631     return NULL;
3632   }
3633
3634   return ret;
3635 }
3636
3637 static gboolean
3638 create_muxer (GstSplitMuxSink * splitmux)
3639 {
3640   /* Create internal elements */
3641   if (splitmux->muxer == NULL) {
3642     GstElement *provided_muxer = NULL;
3643
3644     GST_OBJECT_LOCK (splitmux);
3645     if (splitmux->provided_muxer != NULL)
3646       provided_muxer = gst_object_ref (splitmux->provided_muxer);
3647     GST_OBJECT_UNLOCK (splitmux);
3648
3649     if ((!splitmux->async_finalize && provided_muxer == NULL) ||
3650         (splitmux->async_finalize && splitmux->muxer_factory == NULL)) {
3651       if ((splitmux->muxer =
3652               create_element (splitmux,
3653                   splitmux->muxer_factory ? splitmux->
3654                   muxer_factory : DEFAULT_MUXER, "muxer", FALSE)) == NULL)
3655         goto fail;
3656     } else if (splitmux->async_finalize) {
3657       if ((splitmux->muxer =
3658               create_element (splitmux, splitmux->muxer_factory, "muxer",
3659                   FALSE)) == NULL)
3660         goto fail;
3661       if (splitmux->muxer_preset && GST_IS_PRESET (splitmux->muxer))
3662         gst_preset_load_preset (GST_PRESET (splitmux->muxer),
3663             splitmux->muxer_preset);
3664       if (splitmux->muxer_properties)
3665         gst_structure_foreach (splitmux->muxer_properties,
3666             _set_property_from_structure, splitmux->muxer);
3667     } else {
3668       /* Ensure it's not in locked state (we might be reusing an old element) */
3669       gst_element_set_locked_state (provided_muxer, FALSE);
3670       if (!gst_bin_add (GST_BIN (splitmux), provided_muxer)) {
3671         g_warning ("Could not add muxer element - splitmuxsink will not work");
3672         gst_object_unref (provided_muxer);
3673         goto fail;
3674       }
3675
3676       splitmux->muxer = provided_muxer;
3677       gst_object_unref (provided_muxer);
3678     }
3679
3680     if (splitmux->use_robust_muxing) {
3681       update_muxer_properties (splitmux);
3682     }
3683   }
3684
3685   return TRUE;
3686 fail:
3687   return FALSE;
3688 }
3689
3690 static GstElement *
3691 find_sink (GstElement * e)
3692 {
3693   GstElement *res = NULL;
3694   GstIterator *iter;
3695   gboolean done = FALSE;
3696   GValue data = { 0, };
3697
3698   if (!GST_IS_BIN (e))
3699     return e;
3700
3701   if (g_object_class_find_property (G_OBJECT_GET_CLASS (e), "location") != NULL)
3702     return e;
3703
3704   iter = gst_bin_iterate_sinks (GST_BIN (e));
3705   while (!done) {
3706     switch (gst_iterator_next (iter, &data)) {
3707       case GST_ITERATOR_OK:
3708       {
3709         GstElement *child = g_value_get_object (&data);
3710         if (g_object_class_find_property (G_OBJECT_GET_CLASS (child),
3711                 "location") != NULL) {
3712           res = child;
3713           done = TRUE;
3714         }
3715         g_value_reset (&data);
3716         break;
3717       }
3718       case GST_ITERATOR_RESYNC:
3719         gst_iterator_resync (iter);
3720         break;
3721       case GST_ITERATOR_DONE:
3722         done = TRUE;
3723         break;
3724       case GST_ITERATOR_ERROR:
3725         g_assert_not_reached ();
3726         break;
3727     }
3728   }
3729   g_value_unset (&data);
3730   gst_iterator_free (iter);
3731
3732   return res;
3733 }
3734
3735 static gboolean
3736 create_sink (GstSplitMuxSink * splitmux)
3737 {
3738   GstElement *provided_sink = NULL;
3739
3740   if (splitmux->active_sink == NULL) {
3741
3742     GST_OBJECT_LOCK (splitmux);
3743     if (splitmux->provided_sink != NULL)
3744       provided_sink = gst_object_ref (splitmux->provided_sink);
3745     GST_OBJECT_UNLOCK (splitmux);
3746
3747     if ((!splitmux->async_finalize && provided_sink == NULL) ||
3748         (splitmux->async_finalize && splitmux->sink_factory == NULL)) {
3749       if ((splitmux->sink =
3750               create_element (splitmux, DEFAULT_SINK, "sink", TRUE)) == NULL)
3751         goto fail;
3752       splitmux->active_sink = splitmux->sink;
3753     } else if (splitmux->async_finalize) {
3754       if ((splitmux->sink =
3755               create_element (splitmux, splitmux->sink_factory, "sink",
3756                   TRUE)) == NULL)
3757         goto fail;
3758       if (splitmux->sink_preset && GST_IS_PRESET (splitmux->sink))
3759         gst_preset_load_preset (GST_PRESET (splitmux->sink),
3760             splitmux->sink_preset);
3761       if (splitmux->sink_properties)
3762         gst_structure_foreach (splitmux->sink_properties,
3763             _set_property_from_structure, splitmux->sink);
3764       splitmux->active_sink = splitmux->sink;
3765     } else {
3766       /* Ensure the sink starts in locked state and NULL - it will be changed
3767        * by the filename setting code */
3768       gst_element_set_locked_state (provided_sink, TRUE);
3769       gst_element_set_state (provided_sink, GST_STATE_NULL);
3770       if (!gst_bin_add (GST_BIN (splitmux), provided_sink)) {
3771         g_warning ("Could not add sink elements - splitmuxsink will not work");
3772         gst_object_unref (provided_sink);
3773         goto fail;
3774       }
3775
3776       splitmux->active_sink = provided_sink;
3777
3778       /* The bin holds a ref now, we can drop our tmp ref */
3779       gst_object_unref (provided_sink);
3780
3781       /* Find the sink element */
3782       splitmux->sink = find_sink (splitmux->active_sink);
3783       if (splitmux->sink == NULL) {
3784         g_warning
3785             ("Could not locate sink element in provided sink - splitmuxsink will not work");
3786         goto fail;
3787       }
3788     }
3789
3790 #if 1
3791     if (g_object_class_find_property (G_OBJECT_GET_CLASS (splitmux->sink),
3792             "async") != NULL) {
3793       /* async child elements are causing state change races and weird
3794        * failures, so let's try and turn that off */
3795       g_object_set (splitmux->sink, "async", FALSE, NULL);
3796     }
3797 #endif
3798
3799     if (!gst_element_link (splitmux->muxer, splitmux->active_sink)) {
3800       g_warning ("Failed to link muxer and sink- splitmuxsink will not work");
3801       goto fail;
3802     }
3803   }
3804
3805   return TRUE;
3806 fail:
3807   return FALSE;
3808 }
3809
3810 #ifdef __GNUC__
3811 #pragma GCC diagnostic ignored "-Wformat-nonliteral"
3812 #endif
3813 static void
3814 set_next_filename (GstSplitMuxSink * splitmux, MqStreamCtx * ctx)
3815 {
3816   gchar *fname = NULL;
3817   GstSample *sample;
3818   GstCaps *caps;
3819
3820   gst_splitmux_sink_ensure_max_files (splitmux);
3821
3822   if (ctx->cur_out_buffer == NULL) {
3823     GST_WARNING_OBJECT (splitmux, "Starting next file without buffer");
3824   }
3825
3826   caps = gst_pad_get_current_caps (ctx->srcpad);
3827   sample = gst_sample_new (ctx->cur_out_buffer, caps, &ctx->out_segment, NULL);
3828   g_signal_emit (splitmux, signals[SIGNAL_FORMAT_LOCATION_FULL], 0,
3829       splitmux->fragment_id, sample, &fname);
3830   gst_sample_unref (sample);
3831   if (caps)
3832     gst_caps_unref (caps);
3833
3834   if (fname == NULL) {
3835     /* Fallback to the old signal if the new one returned nothing */
3836     g_signal_emit (splitmux, signals[SIGNAL_FORMAT_LOCATION], 0,
3837         splitmux->fragment_id, &fname);
3838   }
3839
3840   if (!fname)
3841     fname = splitmux->location ?
3842         g_strdup_printf (splitmux->location, splitmux->fragment_id) : NULL;
3843
3844   if (fname) {
3845     GST_INFO_OBJECT (splitmux, "Setting file to %s", fname);
3846     if (g_object_class_find_property (G_OBJECT_GET_CLASS (splitmux->sink),
3847             "location") != NULL)
3848       g_object_set (splitmux->sink, "location", fname, NULL);
3849     g_free (fname);
3850   }
3851
3852   splitmux->fragment_id++;
3853 }
3854
3855 /* called with GST_SPLITMUX_LOCK */
3856 static void
3857 do_async_start (GstSplitMuxSink * splitmux)
3858 {
3859   GstMessage *message;
3860
3861   if (!splitmux->need_async_start) {
3862     GST_INFO_OBJECT (splitmux, "no async_start needed");
3863     return;
3864   }
3865
3866   splitmux->async_pending = TRUE;
3867
3868   GST_INFO_OBJECT (splitmux, "Sending async_start message");
3869   message = gst_message_new_async_start (GST_OBJECT_CAST (splitmux));
3870
3871   GST_SPLITMUX_UNLOCK (splitmux);
3872   GST_BIN_CLASS (parent_class)->handle_message (GST_BIN_CAST
3873       (splitmux), message);
3874   GST_SPLITMUX_LOCK (splitmux);
3875 }
3876
3877 /* called with GST_SPLITMUX_LOCK */
3878 static void
3879 do_async_done (GstSplitMuxSink * splitmux)
3880 {
3881   GstMessage *message;
3882
3883   if (splitmux->async_pending) {
3884     GST_INFO_OBJECT (splitmux, "Sending async_done message");
3885     splitmux->async_pending = FALSE;
3886     GST_SPLITMUX_UNLOCK (splitmux);
3887
3888     message =
3889         gst_message_new_async_done (GST_OBJECT_CAST (splitmux),
3890         GST_CLOCK_TIME_NONE);
3891     GST_BIN_CLASS (parent_class)->handle_message (GST_BIN_CAST
3892         (splitmux), message);
3893     GST_SPLITMUX_LOCK (splitmux);
3894   }
3895
3896   splitmux->need_async_start = FALSE;
3897 }
3898
3899 static void
3900 gst_splitmux_sink_reset (GstSplitMuxSink * splitmux)
3901 {
3902   splitmux->max_in_running_time = GST_CLOCK_STIME_NONE;
3903   splitmux->max_in_running_time_dts = GST_CLOCK_STIME_NONE;
3904
3905   splitmux->fragment_start_time = GST_CLOCK_STIME_NONE;
3906   splitmux->fragment_start_time_pts = GST_CLOCK_STIME_NONE;
3907   g_clear_pointer (&splitmux->fragment_start_tc, gst_video_time_code_free);
3908
3909   g_queue_foreach (&splitmux->pending_input_gops, (GFunc) input_gop_free, NULL);
3910   g_queue_clear (&splitmux->pending_input_gops);
3911
3912   splitmux->max_out_running_time = GST_CLOCK_STIME_NONE;
3913   splitmux->fragment_total_bytes = 0;
3914   splitmux->fragment_reference_bytes = 0;
3915   splitmux->muxed_out_bytes = 0;
3916   splitmux->ready_for_output = FALSE;
3917
3918   g_atomic_int_set (&(splitmux->split_requested), FALSE);
3919   g_atomic_int_set (&(splitmux->do_split_next_gop), FALSE);
3920
3921   splitmux->next_fku_time = GST_CLOCK_TIME_NONE;
3922   gst_queue_array_clear (splitmux->times_to_split);
3923
3924   g_list_foreach (splitmux->contexts, (GFunc) mq_stream_ctx_reset, NULL);
3925   splitmux->queued_keyframes = 0;
3926
3927   g_queue_foreach (&splitmux->out_cmd_q, (GFunc) out_cmd_buf_free, NULL);
3928   g_queue_clear (&splitmux->out_cmd_q);
3929 }
3930
3931 static GstStateChangeReturn
3932 gst_splitmux_sink_change_state (GstElement * element, GstStateChange transition)
3933 {
3934   GstStateChangeReturn ret;
3935   GstSplitMuxSink *splitmux = (GstSplitMuxSink *) element;
3936
3937   switch (transition) {
3938     case GST_STATE_CHANGE_NULL_TO_READY:{
3939       GST_SPLITMUX_LOCK (splitmux);
3940       if (!create_muxer (splitmux) || !create_sink (splitmux)) {
3941         ret = GST_STATE_CHANGE_FAILURE;
3942         GST_SPLITMUX_UNLOCK (splitmux);
3943         goto beach;
3944       }
3945       g_signal_emit (splitmux, signals[SIGNAL_MUXER_ADDED], 0, splitmux->muxer);
3946       g_signal_emit (splitmux, signals[SIGNAL_SINK_ADDED], 0, splitmux->sink);
3947       GST_SPLITMUX_UNLOCK (splitmux);
3948       splitmux->fragment_id = splitmux->start_index;
3949       break;
3950     }
3951     case GST_STATE_CHANGE_READY_TO_PAUSED:{
3952       GST_SPLITMUX_LOCK (splitmux);
3953       /* Make sure contexts and tracking times are cleared, in case we're being reused */
3954       gst_splitmux_sink_reset (splitmux);
3955       /* Start by collecting one input on each pad */
3956       splitmux->input_state = SPLITMUX_INPUT_STATE_COLLECTING_GOP_START;
3957       splitmux->output_state = SPLITMUX_OUTPUT_STATE_START_NEXT_FILE;
3958
3959       GST_SPLITMUX_UNLOCK (splitmux);
3960
3961       GST_SPLITMUX_STATE_LOCK (splitmux);
3962       splitmux->shutdown = FALSE;
3963       GST_SPLITMUX_STATE_UNLOCK (splitmux);
3964       break;
3965     }
3966     case GST_STATE_CHANGE_PAUSED_TO_READY:
3967     case GST_STATE_CHANGE_READY_TO_READY:
3968       g_atomic_int_set (&(splitmux->split_requested), FALSE);
3969       g_atomic_int_set (&(splitmux->do_split_next_gop), FALSE);
3970       /* Fall through */
3971     case GST_STATE_CHANGE_READY_TO_NULL:
3972       GST_SPLITMUX_STATE_LOCK (splitmux);
3973       splitmux->shutdown = TRUE;
3974       GST_SPLITMUX_STATE_UNLOCK (splitmux);
3975
3976       GST_SPLITMUX_LOCK (splitmux);
3977       gst_splitmux_sink_reset (splitmux);
3978       splitmux->output_state = SPLITMUX_OUTPUT_STATE_STOPPED;
3979       splitmux->input_state = SPLITMUX_INPUT_STATE_STOPPED;
3980       /* Wake up any blocked threads */
3981       GST_LOG_OBJECT (splitmux,
3982           "State change -> NULL or READY. Waking threads");
3983       GST_SPLITMUX_BROADCAST_INPUT (splitmux);
3984       GST_SPLITMUX_BROADCAST_OUTPUT (splitmux);
3985       GST_SPLITMUX_UNLOCK (splitmux);
3986       break;
3987     default:
3988       break;
3989   }
3990
3991   ret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
3992   if (ret == GST_STATE_CHANGE_FAILURE)
3993     goto beach;
3994
3995   switch (transition) {
3996     case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
3997       splitmux->need_async_start = TRUE;
3998       break;
3999     case GST_STATE_CHANGE_READY_TO_PAUSED:{
4000       /* Change state async, because our child sink might not
4001        * be ready to do that for us yet if it's state is still locked */
4002
4003       splitmux->need_async_start = TRUE;
4004       /* we want to go async to PAUSED until we managed to configure and add the
4005        * sink */
4006       GST_SPLITMUX_LOCK (splitmux);
4007       do_async_start (splitmux);
4008       GST_SPLITMUX_UNLOCK (splitmux);
4009       ret = GST_STATE_CHANGE_ASYNC;
4010       break;
4011     }
4012     case GST_STATE_CHANGE_READY_TO_NULL:
4013       GST_SPLITMUX_LOCK (splitmux);
4014       splitmux->fragment_id = 0;
4015       /* Reset internal elements only if no pad contexts are using them */
4016       if (splitmux->contexts == NULL)
4017         gst_splitmux_reset_elements (splitmux);
4018       do_async_done (splitmux);
4019       GST_SPLITMUX_UNLOCK (splitmux);
4020       break;
4021     default:
4022       break;
4023   }
4024
4025   return ret;
4026
4027 beach:
4028   if (transition == GST_STATE_CHANGE_NULL_TO_READY) {
4029     /* Cleanup elements on failed transition out of NULL */
4030     gst_splitmux_reset_elements (splitmux);
4031     GST_SPLITMUX_LOCK (splitmux);
4032     do_async_done (splitmux);
4033     GST_SPLITMUX_UNLOCK (splitmux);
4034   }
4035   if (transition == GST_STATE_CHANGE_READY_TO_READY) {
4036     /* READY to READY transition only happens when we're already
4037      * in READY state, but a child element is in NULL, which
4038      * happens when there's an error changing the state of the sink.
4039      * We need to make sure not to fail the state transition, or
4040      * the core won't transition us back to NULL successfully */
4041     ret = GST_STATE_CHANGE_SUCCESS;
4042   }
4043   return ret;
4044 }
4045
4046 static void
4047 gst_splitmux_sink_ensure_max_files (GstSplitMuxSink * splitmux)
4048 {
4049   if (splitmux->max_files && splitmux->fragment_id >= splitmux->max_files) {
4050     splitmux->fragment_id = 0;
4051   }
4052 }
4053
4054 static void
4055 split_now (GstSplitMuxSink * splitmux)
4056 {
4057   g_atomic_int_set (&(splitmux->do_split_next_gop), TRUE);
4058 }
4059
4060 static void
4061 split_after (GstSplitMuxSink * splitmux)
4062 {
4063   g_atomic_int_set (&(splitmux->split_requested), TRUE);
4064 }
4065
4066 static void
4067 split_at_running_time (GstSplitMuxSink * splitmux, GstClockTime split_time)
4068 {
4069   gboolean send_keyframe_requests;
4070
4071   GST_SPLITMUX_LOCK (splitmux);
4072   gst_queue_array_push_tail_struct (splitmux->times_to_split, &split_time);
4073   send_keyframe_requests = splitmux->send_keyframe_requests;
4074   GST_SPLITMUX_UNLOCK (splitmux);
4075
4076   if (send_keyframe_requests) {
4077     GstEvent *ev =
4078         gst_video_event_new_upstream_force_key_unit (split_time, TRUE, 0);
4079     GST_INFO_OBJECT (splitmux, "Requesting next keyframe at %" GST_TIME_FORMAT,
4080         GST_TIME_ARGS (split_time));
4081     if (!gst_pad_push_event (splitmux->reference_ctx->sinkpad, ev)) {
4082       GST_WARNING_OBJECT (splitmux,
4083           "Could not request keyframe at %" GST_TIME_FORMAT,
4084           GST_TIME_ARGS (split_time));
4085     }
4086   }
4087 }