splitmuxsink: Only update max in/out running time if it's actually bigger
[platform/upstream/gstreamer.git] / subprojects / gst-plugins-good / gst / multifile / gstsplitmuxsink.c
1 /* GStreamer Muxer bin that splits output stream by size/time
2  * Copyright (C) <2014-2019> Jan Schmidt <jan@centricular.com>
3  *
4  * This library is free software; you can redistribute it and/or
5  * modify it under the terms of the GNU Library General Public
6  * License as published by the Free Software Foundation; either
7  * version 2 of the License, or (at your option) any later version.
8  *
9  * This library is distributed in the hope that it will be useful,
10  * but WITHOUT ANY WARRANTY; without even the implied warranty of
11  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12  * Library General Public License for more details.
13  *
14  * You should have received a copy of the GNU Library General Public
15  * License along with this library; if not, write to the
16  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
17  * Boston, MA 02110-1301, USA.
18  */
19
20 /**
21  * SECTION:element-splitmuxsink
22  * @title: splitmuxsink
23  * @short_description: Muxer wrapper for splitting output stream by size or time
24  *
25  * This element wraps a muxer and a sink, and starts a new file when the mux
26  * contents are about to cross a threshold of maximum size of maximum time,
27  * splitting at video keyframe boundaries. Exactly one input video stream
28  * can be muxed, with as many accompanying audio and subtitle streams as
29  * desired.
30  *
31  * By default, it uses mp4mux and filesink, but they can be changed via
32  * the 'muxer' and 'sink' properties.
33  *
34  * The minimum file size is 1 GOP, however - so limits may be overrun if the
35  * distance between any 2 keyframes is larger than the limits.
36  *
37  * If a video stream is available, the splitting process is driven by the video
38  * stream contents, and the video stream must contain closed GOPs for the output
39  * file parts to be played individually correctly. In the absence of a video
40  * stream, the first available stream is used as reference for synchronization.
41  *
42  * In the async-finalize mode, when the threshold is crossed, the old muxer
43  * and sink is disconnected from the pipeline and left to finish the file
44  * asynchronously, and a new muxer and sink is created to continue with the
45  * next fragment. For that reason, instead of muxer and sink objects, the
46  * muxer-factory and sink-factory properties are used to construct the new
47  * objects, together with muxer-properties and sink-properties.
48  *
49  * ## Example pipelines
50  * |[
51  * gst-launch-1.0 -e v4l2src num-buffers=500 ! video/x-raw,width=320,height=240 ! videoconvert ! queue ! timeoverlay ! x264enc key-int-max=10 ! h264parse ! splitmuxsink location=video%02d.mov max-size-time=10000000000 max-size-bytes=1000000
52  * ]|
53  * Records a video stream captured from a v4l2 device and muxes it into
54  * ISO mp4 files, splitting as needed to limit size/duration to 10 seconds
55  * and 1MB maximum size.
56  *
57  * |[
58  * gst-launch-1.0 -e v4l2src num-buffers=500 ! video/x-raw,width=320,height=240 ! videoconvert ! queue ! timeoverlay ! x264enc key-int-max=10 ! h264parse ! splitmuxsink location=video%02d.mkv max-size-time=10000000000 muxer-factory=matroskamux muxer-properties="properties,streamable=true"
59  * ]|
60  * Records a video stream captured from a v4l2 device and muxer it into
61  * streamable Matroska files, splitting as needed to limit size/duration to 10
62  * seconds. Each file will finalize asynchronously.
63  *
64  * |[
65  * gst-launch-1.0 videotestsrc num-buffers=10 ! jpegenc ! .video splitmuxsink muxer=qtmux muxer-pad-map=x-pad-map,video=video_1 location=test%05d.mp4 -v
66  * ]|
67  * Records 10 frames to an mp4 file, using a muxer-pad-map to make explicit mappings between the splitmuxsink sink pad and the corresponding muxer pad
68  * it will deliver to.
69  */
70
71 #ifdef HAVE_CONFIG_H
72 #include "config.h"
73 #endif
74
75 #include <string.h>
76 #include <glib/gstdio.h>
77 #include <gst/video/video.h>
78 #include "gstsplitmuxsink.h"
79
80 GST_DEBUG_CATEGORY_STATIC (splitmux_debug);
81 #define GST_CAT_DEFAULT splitmux_debug
82
83 #define GST_SPLITMUX_STATE_LOCK(s) g_mutex_lock(&(s)->state_lock)
84 #define GST_SPLITMUX_STATE_UNLOCK(s) g_mutex_unlock(&(s)->state_lock)
85
86 #define GST_SPLITMUX_LOCK(s) g_mutex_lock(&(s)->lock)
87 #define GST_SPLITMUX_UNLOCK(s) g_mutex_unlock(&(s)->lock)
88 #define GST_SPLITMUX_WAIT_INPUT(s) g_cond_wait (&(s)->input_cond, &(s)->lock)
89 #define GST_SPLITMUX_BROADCAST_INPUT(s) g_cond_broadcast (&(s)->input_cond)
90
91 #define GST_SPLITMUX_WAIT_OUTPUT(s) g_cond_wait (&(s)->output_cond, &(s)->lock)
92 #define GST_SPLITMUX_BROADCAST_OUTPUT(s) g_cond_broadcast (&(s)->output_cond)
93
94 static void split_now (GstSplitMuxSink * splitmux);
95 static void split_after (GstSplitMuxSink * splitmux);
96 static void split_at_running_time (GstSplitMuxSink * splitmux,
97     GstClockTime split_time);
98
99 enum
100 {
101   PROP_0,
102   PROP_LOCATION,
103   PROP_START_INDEX,
104   PROP_MAX_SIZE_TIME,
105   PROP_MAX_SIZE_BYTES,
106   PROP_MAX_SIZE_TIMECODE,
107   PROP_SEND_KEYFRAME_REQUESTS,
108   PROP_MAX_FILES,
109   PROP_MUXER_OVERHEAD,
110   PROP_USE_ROBUST_MUXING,
111   PROP_ALIGNMENT_THRESHOLD,
112   PROP_MUXER,
113   PROP_SINK,
114   PROP_RESET_MUXER,
115   PROP_ASYNC_FINALIZE,
116   PROP_MUXER_FACTORY,
117   PROP_MUXER_PRESET,
118   PROP_MUXER_PROPERTIES,
119   PROP_SINK_FACTORY,
120   PROP_SINK_PRESET,
121   PROP_SINK_PROPERTIES,
122   PROP_MUXERPAD_MAP
123 };
124
125 #define DEFAULT_MAX_SIZE_TIME       0
126 #define DEFAULT_MAX_SIZE_BYTES      0
127 #define DEFAULT_MAX_FILES           0
128 #define DEFAULT_MUXER_OVERHEAD      0.02
129 #define DEFAULT_SEND_KEYFRAME_REQUESTS FALSE
130 #define DEFAULT_ALIGNMENT_THRESHOLD 0
131 #define DEFAULT_MUXER "mp4mux"
132 #define DEFAULT_SINK "filesink"
133 #define DEFAULT_USE_ROBUST_MUXING FALSE
134 #define DEFAULT_RESET_MUXER TRUE
135 #define DEFAULT_ASYNC_FINALIZE FALSE
136 #define DEFAULT_START_INDEX 0
137
138 typedef struct _AsyncEosHelper
139 {
140   MqStreamCtx *ctx;
141   GstPad *pad;
142 } AsyncEosHelper;
143
144 enum
145 {
146   SIGNAL_FORMAT_LOCATION,
147   SIGNAL_FORMAT_LOCATION_FULL,
148   SIGNAL_SPLIT_NOW,
149   SIGNAL_SPLIT_AFTER,
150   SIGNAL_SPLIT_AT_RUNNING_TIME,
151   SIGNAL_MUXER_ADDED,
152   SIGNAL_SINK_ADDED,
153   SIGNAL_LAST
154 };
155
156 static guint signals[SIGNAL_LAST];
157
158 static GstStaticPadTemplate video_sink_template =
159 GST_STATIC_PAD_TEMPLATE ("video",
160     GST_PAD_SINK,
161     GST_PAD_REQUEST,
162     GST_STATIC_CAPS_ANY);
163 static GstStaticPadTemplate video_aux_sink_template =
164 GST_STATIC_PAD_TEMPLATE ("video_aux_%u",
165     GST_PAD_SINK,
166     GST_PAD_REQUEST,
167     GST_STATIC_CAPS_ANY);
168 static GstStaticPadTemplate audio_sink_template =
169 GST_STATIC_PAD_TEMPLATE ("audio_%u",
170     GST_PAD_SINK,
171     GST_PAD_REQUEST,
172     GST_STATIC_CAPS_ANY);
173 static GstStaticPadTemplate subtitle_sink_template =
174 GST_STATIC_PAD_TEMPLATE ("subtitle_%u",
175     GST_PAD_SINK,
176     GST_PAD_REQUEST,
177     GST_STATIC_CAPS_ANY);
178 static GstStaticPadTemplate caption_sink_template =
179 GST_STATIC_PAD_TEMPLATE ("caption_%u",
180     GST_PAD_SINK,
181     GST_PAD_REQUEST,
182     GST_STATIC_CAPS_ANY);
183
184 static GQuark PAD_CONTEXT;
185 static GQuark EOS_FROM_US;
186 static GQuark RUNNING_TIME;
187 /* EOS_FROM_US is only valid in async-finalize mode. We need to know whether
188  * to forward an incoming EOS message, but we cannot rely on the state of the
189  * splitmux anymore, so we set this qdata on the sink instead.
190  * The muxer and sink must be destroyed after both of these things have
191  * finished:
192  * 1) The EOS message has been sent when the fragment is ending
193  * 2) The muxer has been unlinked and relinked
194  * Therefore, EOS_FROM_US can have these two values:
195  * 0: EOS was not requested from us. Forward the message. The muxer and the
196  * sink will be destroyed together with the rest of the bin.
197  * 1: EOS was requested from us, but the other of the two tasks hasn't
198  * finished. Set EOS_FROM_US to 2 and do your stuff.
199  * 2: EOS was requested from us and the other of the two tasks has finished.
200  * Now we can destroy the muxer and the sink.
201  */
202
203 static void
204 _do_init (void)
205 {
206   PAD_CONTEXT = g_quark_from_static_string ("pad-context");
207   EOS_FROM_US = g_quark_from_static_string ("eos-from-us");
208   RUNNING_TIME = g_quark_from_static_string ("running-time");
209   GST_DEBUG_CATEGORY_INIT (splitmux_debug, "splitmuxsink", 0,
210       "Split File Muxing Sink");
211 }
212
213 #define gst_splitmux_sink_parent_class parent_class
214 G_DEFINE_TYPE_EXTENDED (GstSplitMuxSink, gst_splitmux_sink, GST_TYPE_BIN, 0,
215     _do_init ());
216 GST_ELEMENT_REGISTER_DEFINE (splitmuxsink, "splitmuxsink", GST_RANK_NONE,
217     GST_TYPE_SPLITMUX_SINK);
218
219 static gboolean create_muxer (GstSplitMuxSink * splitmux);
220 static gboolean create_sink (GstSplitMuxSink * splitmux);
221 static void gst_splitmux_sink_set_property (GObject * object, guint prop_id,
222     const GValue * value, GParamSpec * pspec);
223 static void gst_splitmux_sink_get_property (GObject * object, guint prop_id,
224     GValue * value, GParamSpec * pspec);
225 static void gst_splitmux_sink_dispose (GObject * object);
226 static void gst_splitmux_sink_finalize (GObject * object);
227
228 static GstPad *gst_splitmux_sink_request_new_pad (GstElement * element,
229     GstPadTemplate * templ, const gchar * name, const GstCaps * caps);
230 static void gst_splitmux_sink_release_pad (GstElement * element, GstPad * pad);
231
232 static GstStateChangeReturn gst_splitmux_sink_change_state (GstElement *
233     element, GstStateChange transition);
234
235 static void bus_handler (GstBin * bin, GstMessage * msg);
236 static void set_next_filename (GstSplitMuxSink * splitmux, MqStreamCtx * ctx);
237 static GstFlowReturn start_next_fragment (GstSplitMuxSink * splitmux,
238     MqStreamCtx * ctx);
239 static void mq_stream_ctx_free (MqStreamCtx * ctx);
240 static void grow_blocked_queues (GstSplitMuxSink * splitmux);
241
242 static void gst_splitmux_sink_ensure_max_files (GstSplitMuxSink * splitmux);
243 static GstElement *create_element (GstSplitMuxSink * splitmux,
244     const gchar * factory, const gchar * name, gboolean locked);
245
246 static void do_async_done (GstSplitMuxSink * splitmux);
247 static void gst_splitmux_reset_timecode (GstSplitMuxSink * splitmux);
248
249 static MqStreamBuf *
250 mq_stream_buf_new (void)
251 {
252   return g_slice_new0 (MqStreamBuf);
253 }
254
255 static void
256 mq_stream_buf_free (MqStreamBuf * data)
257 {
258   g_slice_free (MqStreamBuf, data);
259 }
260
261 static SplitMuxOutputCommand *
262 out_cmd_buf_new (void)
263 {
264   return g_slice_new0 (SplitMuxOutputCommand);
265 }
266
267 static void
268 out_cmd_buf_free (SplitMuxOutputCommand * data)
269 {
270   g_slice_free (SplitMuxOutputCommand, data);
271 }
272
273 static void
274 gst_splitmux_sink_class_init (GstSplitMuxSinkClass * klass)
275 {
276   GObjectClass *gobject_class = (GObjectClass *) klass;
277   GstElementClass *gstelement_class = (GstElementClass *) klass;
278   GstBinClass *gstbin_class = (GstBinClass *) klass;
279
280   gobject_class->set_property = gst_splitmux_sink_set_property;
281   gobject_class->get_property = gst_splitmux_sink_get_property;
282   gobject_class->dispose = gst_splitmux_sink_dispose;
283   gobject_class->finalize = gst_splitmux_sink_finalize;
284
285   gst_element_class_set_static_metadata (gstelement_class,
286       "Split Muxing Bin", "Generic/Bin/Muxer",
287       "Convenience bin that muxes incoming streams into multiple time/size limited files",
288       "Jan Schmidt <jan@centricular.com>");
289
290   gst_element_class_add_static_pad_template (gstelement_class,
291       &video_sink_template);
292   gst_element_class_add_static_pad_template (gstelement_class,
293       &video_aux_sink_template);
294   gst_element_class_add_static_pad_template (gstelement_class,
295       &audio_sink_template);
296   gst_element_class_add_static_pad_template (gstelement_class,
297       &subtitle_sink_template);
298   gst_element_class_add_static_pad_template (gstelement_class,
299       &caption_sink_template);
300
301   gstelement_class->change_state =
302       GST_DEBUG_FUNCPTR (gst_splitmux_sink_change_state);
303   gstelement_class->request_new_pad =
304       GST_DEBUG_FUNCPTR (gst_splitmux_sink_request_new_pad);
305   gstelement_class->release_pad =
306       GST_DEBUG_FUNCPTR (gst_splitmux_sink_release_pad);
307
308   gstbin_class->handle_message = bus_handler;
309
310   g_object_class_install_property (gobject_class, PROP_LOCATION,
311       g_param_spec_string ("location", "File Output Pattern",
312           "Format string pattern for the location of the files to write (e.g. video%05d.mp4)",
313           NULL, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
314   g_object_class_install_property (gobject_class, PROP_MUXER_OVERHEAD,
315       g_param_spec_double ("mux-overhead", "Muxing Overhead",
316           "Extra size overhead of muxing (0.02 = 2%)", 0.0, 1.0,
317           DEFAULT_MUXER_OVERHEAD,
318           G_PARAM_READWRITE | GST_PARAM_CONTROLLABLE | G_PARAM_STATIC_STRINGS));
319
320   g_object_class_install_property (gobject_class, PROP_MAX_SIZE_TIME,
321       g_param_spec_uint64 ("max-size-time", "Max. size (ns)",
322           "Max. amount of time per file (in ns, 0=disable)", 0, G_MAXUINT64,
323           DEFAULT_MAX_SIZE_TIME,
324           G_PARAM_READWRITE | GST_PARAM_MUTABLE_READY |
325           G_PARAM_STATIC_STRINGS));
326   g_object_class_install_property (gobject_class, PROP_MAX_SIZE_BYTES,
327       g_param_spec_uint64 ("max-size-bytes", "Max. size bytes",
328           "Max. amount of data per file (in bytes, 0=disable)", 0, G_MAXUINT64,
329           DEFAULT_MAX_SIZE_BYTES,
330           G_PARAM_READWRITE | GST_PARAM_MUTABLE_READY |
331           G_PARAM_STATIC_STRINGS));
332   g_object_class_install_property (gobject_class, PROP_MAX_SIZE_TIMECODE,
333       g_param_spec_string ("max-size-timecode", "Maximum timecode difference",
334           "Maximum difference in timecode between first and last frame. "
335           "Separator is assumed to be \":\" everywhere (e.g. 01:00:00:00). "
336           "Will only be effective if a timecode track is present.", NULL,
337           G_PARAM_READWRITE | GST_PARAM_MUTABLE_READY |
338           G_PARAM_STATIC_STRINGS));
339   g_object_class_install_property (gobject_class, PROP_SEND_KEYFRAME_REQUESTS,
340       g_param_spec_boolean ("send-keyframe-requests",
341           "Request keyframes at max-size-time",
342           "Request a keyframe every max-size-time ns to try splitting at that point. "
343           "Needs max-size-bytes to be 0 in order to be effective.",
344           DEFAULT_SEND_KEYFRAME_REQUESTS,
345           G_PARAM_READWRITE | GST_PARAM_MUTABLE_READY |
346           G_PARAM_STATIC_STRINGS));
347   g_object_class_install_property (gobject_class, PROP_MAX_FILES,
348       g_param_spec_uint ("max-files", "Max files",
349           "Maximum number of files to keep on disk. Once the maximum is reached,"
350           "old files start to be deleted to make room for new ones.", 0,
351           G_MAXUINT, DEFAULT_MAX_FILES,
352           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
353   g_object_class_install_property (gobject_class, PROP_ALIGNMENT_THRESHOLD,
354       g_param_spec_uint64 ("alignment-threshold", "Alignment threshold (ns)",
355           "Allow non-reference streams to be that many ns before the reference"
356           " stream", 0, G_MAXUINT64, DEFAULT_ALIGNMENT_THRESHOLD,
357           G_PARAM_READWRITE | GST_PARAM_MUTABLE_READY |
358           G_PARAM_STATIC_STRINGS));
359
360   g_object_class_install_property (gobject_class, PROP_MUXER,
361       g_param_spec_object ("muxer", "Muxer",
362           "The muxer element to use (NULL = default mp4mux). "
363           "Valid only for async-finalize = FALSE",
364           GST_TYPE_ELEMENT, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
365   g_object_class_install_property (gobject_class, PROP_SINK,
366       g_param_spec_object ("sink", "Sink",
367           "The sink element (or element chain) to use (NULL = default filesink). "
368           "Valid only for async-finalize = FALSE",
369           GST_TYPE_ELEMENT, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
370
371   g_object_class_install_property (gobject_class, PROP_USE_ROBUST_MUXING,
372       g_param_spec_boolean ("use-robust-muxing",
373           "Support robust-muxing mode of some muxers",
374           "Check if muxers support robust muxing via the reserved-max-duration and "
375           "reserved-duration-remaining properties and use them if so. "
376           "(Only present on qtmux and mp4mux for now). splitmuxsink may then also "
377           " create new fragments if the reserved header space is about to overflow. "
378           "Note that for mp4mux and qtmux, reserved-moov-update-period must be set "
379           "manually by the app to a non-zero value for robust muxing to have an effect.",
380           DEFAULT_USE_ROBUST_MUXING,
381           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
382
383   g_object_class_install_property (gobject_class, PROP_RESET_MUXER,
384       g_param_spec_boolean ("reset-muxer",
385           "Reset Muxer",
386           "Reset the muxer after each segment. Disabling this will not work for most muxers.",
387           DEFAULT_RESET_MUXER, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
388
389   g_object_class_install_property (gobject_class, PROP_ASYNC_FINALIZE,
390       g_param_spec_boolean ("async-finalize",
391           "Finalize fragments asynchronously",
392           "Finalize each fragment asynchronously and start a new one",
393           DEFAULT_ASYNC_FINALIZE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
394   g_object_class_install_property (gobject_class, PROP_MUXER_FACTORY,
395       g_param_spec_string ("muxer-factory", "Muxer factory",
396           "The muxer element factory to use (default = mp4mux). "
397           "Valid only for async-finalize = TRUE",
398           "mp4mux", G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
399   /**
400    * GstSplitMuxSink:muxer-preset
401    *
402    * An optional #GstPreset name to use for the muxer. This only has an effect
403    * in `async-finalize=TRUE` mode.
404    *
405    * Since: 1.18
406    */
407   g_object_class_install_property (gobject_class, PROP_MUXER_PRESET,
408       g_param_spec_string ("muxer-preset", "Muxer preset",
409           "The muxer preset to use. "
410           "Valid only for async-finalize = TRUE",
411           NULL, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
412   g_object_class_install_property (gobject_class, PROP_MUXER_PROPERTIES,
413       g_param_spec_boxed ("muxer-properties", "Muxer properties",
414           "The muxer element properties to use. "
415           "Example: {properties,boolean-prop=true,string-prop=\"hi\"}. "
416           "Valid only for async-finalize = TRUE",
417           GST_TYPE_STRUCTURE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
418   g_object_class_install_property (gobject_class, PROP_SINK_FACTORY,
419       g_param_spec_string ("sink-factory", "Sink factory",
420           "The sink element factory to use (default = filesink). "
421           "Valid only for async-finalize = TRUE",
422           "filesink", G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
423   /**
424    * GstSplitMuxSink:sink-preset
425    *
426    * An optional #GstPreset name to use for the sink. This only has an effect
427    * in `async-finalize=TRUE` mode.
428    *
429    * Since: 1.18
430    */
431   g_object_class_install_property (gobject_class, PROP_SINK_PRESET,
432       g_param_spec_string ("sink-preset", "Sink preset",
433           "The sink preset to use. "
434           "Valid only for async-finalize = TRUE",
435           NULL, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
436   g_object_class_install_property (gobject_class, PROP_SINK_PROPERTIES,
437       g_param_spec_boxed ("sink-properties", "Sink properties",
438           "The sink element properties to use. "
439           "Example: {properties,boolean-prop=true,string-prop=\"hi\"}. "
440           "Valid only for async-finalize = TRUE",
441           GST_TYPE_STRUCTURE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
442   g_object_class_install_property (gobject_class, PROP_START_INDEX,
443       g_param_spec_int ("start-index", "Start Index",
444           "Start value of fragment index.",
445           0, G_MAXINT, DEFAULT_START_INDEX,
446           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
447
448   /**
449    * GstSplitMuxSink::muxer-pad-map
450    *
451    * An optional GstStructure that provides a map from splitmuxsink sinkpad
452    * names to muxer pad names they should feed. Splitmuxsink has some default
453    * mapping behaviour to link video to video pads and audio to audio pads
454    * that usually works fine. This property is useful if you need to ensure
455    * a particular mapping to muxed streams.
456    *
457    * The GstStructure contains string fields like so:
458    *   splitmuxsink muxer-pad-map=x-pad-map,video=video_1
459    *
460    * Since: 1.18
461    */
462   g_object_class_install_property (G_OBJECT_CLASS (klass), PROP_MUXERPAD_MAP,
463       g_param_spec_boxed ("muxer-pad-map", "Muxer pad map",
464           "A GstStructure specifies the mapping from splitmuxsink sink pads to muxer pads",
465           GST_TYPE_STRUCTURE,
466           (GParamFlags) (G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)));
467
468   /**
469    * GstSplitMuxSink::format-location:
470    * @splitmux: the #GstSplitMuxSink
471    * @fragment_id: the sequence number of the file to be created
472    *
473    * Returns: the location to be used for the next output file. This must be
474    *    a newly-allocated string which will be freed with g_free() by the
475    *    splitmuxsink element when it no longer needs it, so use g_strdup() or
476    *    g_strdup_printf() or similar functions to allocate it.
477    */
478   signals[SIGNAL_FORMAT_LOCATION] =
479       g_signal_new ("format-location", G_TYPE_FROM_CLASS (klass),
480       G_SIGNAL_RUN_LAST, 0, NULL, NULL, NULL, G_TYPE_STRING, 1, G_TYPE_UINT);
481
482   /**
483    * GstSplitMuxSink::format-location-full:
484    * @splitmux: the #GstSplitMuxSink
485    * @fragment_id: the sequence number of the file to be created
486    * @first_sample: A #GstSample containing the first buffer
487    *   from the reference stream in the new file
488    *
489    * Returns: the location to be used for the next output file. This must be
490    *    a newly-allocated string which will be freed with g_free() by the
491    *    splitmuxsink element when it no longer needs it, so use g_strdup() or
492    *    g_strdup_printf() or similar functions to allocate it.
493    *
494    * Since: 1.12
495    */
496   signals[SIGNAL_FORMAT_LOCATION_FULL] =
497       g_signal_new ("format-location-full", G_TYPE_FROM_CLASS (klass),
498       G_SIGNAL_RUN_LAST, 0, NULL, NULL, NULL, G_TYPE_STRING, 2, G_TYPE_UINT,
499       GST_TYPE_SAMPLE);
500
501   /**
502    * GstSplitMuxSink::split-now:
503    * @splitmux: the #GstSplitMuxSink
504    *
505    * When called by the user, this action signal splits the video file (and begins a new one) immediately.
506    * The current GOP will be output to the new file.
507    *
508    * Since: 1.14
509    */
510   signals[SIGNAL_SPLIT_NOW] =
511       g_signal_new ("split-now", G_TYPE_FROM_CLASS (klass),
512       G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION,
513       G_STRUCT_OFFSET (GstSplitMuxSinkClass, split_now), NULL, NULL, NULL,
514       G_TYPE_NONE, 0);
515
516   /**
517    * GstSplitMuxSink::split-after:
518    * @splitmux: the #GstSplitMuxSink
519    *
520    * When called by the user, this action signal splits the video file (and begins a new one) immediately.
521    * Unlike the 'split-now' signal, with 'split-after', the current GOP will be output to the old file.
522    *
523    * Since: 1.16
524    */
525   signals[SIGNAL_SPLIT_AFTER] =
526       g_signal_new ("split-after", G_TYPE_FROM_CLASS (klass),
527       G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION,
528       G_STRUCT_OFFSET (GstSplitMuxSinkClass, split_after), NULL, NULL, NULL,
529       G_TYPE_NONE, 0);
530
531   /**
532    * GstSplitMuxSink::split-at-running-time:
533    * @splitmux: the #GstSplitMuxSink
534    *
535    * When called by the user, this action signal splits the video file (and
536    * begins a new one) as soon as the given running time is reached. If this
537    * action signal is called multiple times, running times are queued up and
538    * processed in the order they were given.
539    *
540    * Note that this is prone to race conditions, where said running time is
541    * reached and surpassed before we had a chance to split. The file will
542    * still split immediately, but in order to make sure that the split doesn't
543    * happen too late, it is recommended to call this action signal from
544    * something that will prevent further buffers from flowing into
545    * splitmuxsink before the split is completed, such as a pad probe before
546    * splitmuxsink.
547    *
548    *
549    * Since: 1.16
550    */
551   signals[SIGNAL_SPLIT_AT_RUNNING_TIME] =
552       g_signal_new ("split-at-running-time", G_TYPE_FROM_CLASS (klass),
553       G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION,
554       G_STRUCT_OFFSET (GstSplitMuxSinkClass, split_at_running_time), NULL, NULL,
555       NULL, G_TYPE_NONE, 1, G_TYPE_UINT64);
556
557   /**
558    * GstSplitMuxSink::muxer-added:
559    * @splitmux: the #GstSplitMuxSink
560    * @muxer: the newly added muxer element
561    *
562    * Since: 1.14
563    */
564   signals[SIGNAL_MUXER_ADDED] =
565       g_signal_new ("muxer-added", G_TYPE_FROM_CLASS (klass),
566       G_SIGNAL_RUN_LAST, 0, NULL, NULL, NULL, G_TYPE_NONE, 1, GST_TYPE_ELEMENT);
567
568   /**
569    * GstSplitMuxSink::sink-added:
570    * @splitmux: the #GstSplitMuxSink
571    * @sink: the newly added sink element
572    *
573    * Since: 1.14
574    */
575   signals[SIGNAL_SINK_ADDED] =
576       g_signal_new ("sink-added", G_TYPE_FROM_CLASS (klass),
577       G_SIGNAL_RUN_LAST, 0, NULL, NULL, NULL, G_TYPE_NONE, 1, GST_TYPE_ELEMENT);
578
579   klass->split_now = split_now;
580   klass->split_after = split_after;
581   klass->split_at_running_time = split_at_running_time;
582 }
583
584 static void
585 gst_splitmux_sink_init (GstSplitMuxSink * splitmux)
586 {
587   g_mutex_init (&splitmux->lock);
588   g_mutex_init (&splitmux->state_lock);
589   g_cond_init (&splitmux->input_cond);
590   g_cond_init (&splitmux->output_cond);
591   g_queue_init (&splitmux->out_cmd_q);
592
593   splitmux->mux_overhead = DEFAULT_MUXER_OVERHEAD;
594   splitmux->threshold_time = DEFAULT_MAX_SIZE_TIME;
595   splitmux->threshold_bytes = DEFAULT_MAX_SIZE_BYTES;
596   splitmux->max_files = DEFAULT_MAX_FILES;
597   splitmux->send_keyframe_requests = DEFAULT_SEND_KEYFRAME_REQUESTS;
598   splitmux->alignment_threshold = DEFAULT_ALIGNMENT_THRESHOLD;
599   splitmux->use_robust_muxing = DEFAULT_USE_ROBUST_MUXING;
600   splitmux->reset_muxer = DEFAULT_RESET_MUXER;
601
602   splitmux->threshold_timecode_str = NULL;
603   gst_splitmux_reset_timecode (splitmux);
604
605   splitmux->async_finalize = DEFAULT_ASYNC_FINALIZE;
606   splitmux->muxer_factory = g_strdup (DEFAULT_MUXER);
607   splitmux->muxer_properties = NULL;
608   splitmux->sink_factory = g_strdup (DEFAULT_SINK);
609   splitmux->sink_properties = NULL;
610
611   GST_OBJECT_FLAG_SET (splitmux, GST_ELEMENT_FLAG_SINK);
612   splitmux->split_requested = FALSE;
613   splitmux->do_split_next_gop = FALSE;
614   splitmux->times_to_split = gst_queue_array_new_for_struct (8, 8);
615   splitmux->next_fku_time = GST_CLOCK_TIME_NONE;
616 }
617
618 static void
619 gst_splitmux_reset_elements (GstSplitMuxSink * splitmux)
620 {
621   if (splitmux->muxer) {
622     gst_element_set_locked_state (splitmux->muxer, TRUE);
623     gst_element_set_state (splitmux->muxer, GST_STATE_NULL);
624     gst_bin_remove (GST_BIN (splitmux), splitmux->muxer);
625   }
626   if (splitmux->active_sink) {
627     gst_element_set_locked_state (splitmux->active_sink, TRUE);
628     gst_element_set_state (splitmux->active_sink, GST_STATE_NULL);
629     gst_bin_remove (GST_BIN (splitmux), splitmux->active_sink);
630   }
631
632   splitmux->sink = splitmux->active_sink = splitmux->muxer = NULL;
633 }
634
635 static void
636 gst_splitmux_reset_timecode (GstSplitMuxSink * splitmux)
637 {
638   g_clear_pointer (&splitmux->in_tc, gst_video_time_code_free);
639   g_clear_pointer (&splitmux->fragment_start_tc, gst_video_time_code_free);
640   g_clear_pointer (&splitmux->gop_start_tc, gst_video_time_code_free);
641   splitmux->next_fragment_start_tc_time = GST_CLOCK_TIME_NONE;
642 }
643
644 static void
645 gst_splitmux_sink_dispose (GObject * object)
646 {
647   GstSplitMuxSink *splitmux = GST_SPLITMUX_SINK (object);
648
649   /* Calling parent dispose invalidates all child pointers */
650   splitmux->sink = splitmux->active_sink = splitmux->muxer = NULL;
651
652   G_OBJECT_CLASS (parent_class)->dispose (object);
653 }
654
655 static void
656 gst_splitmux_sink_finalize (GObject * object)
657 {
658   GstSplitMuxSink *splitmux = GST_SPLITMUX_SINK (object);
659   g_cond_clear (&splitmux->input_cond);
660   g_cond_clear (&splitmux->output_cond);
661   g_mutex_clear (&splitmux->lock);
662   g_mutex_clear (&splitmux->state_lock);
663   g_queue_foreach (&splitmux->out_cmd_q, (GFunc) out_cmd_buf_free, NULL);
664   g_queue_clear (&splitmux->out_cmd_q);
665
666   if (splitmux->muxerpad_map)
667     gst_structure_free (splitmux->muxerpad_map);
668
669   if (splitmux->provided_sink)
670     gst_object_unref (splitmux->provided_sink);
671   if (splitmux->provided_muxer)
672     gst_object_unref (splitmux->provided_muxer);
673
674   if (splitmux->muxer_factory)
675     g_free (splitmux->muxer_factory);
676   if (splitmux->muxer_preset)
677     g_free (splitmux->muxer_preset);
678   if (splitmux->muxer_properties)
679     gst_structure_free (splitmux->muxer_properties);
680   if (splitmux->sink_factory)
681     g_free (splitmux->sink_factory);
682   if (splitmux->sink_preset)
683     g_free (splitmux->sink_preset);
684   if (splitmux->sink_properties)
685     gst_structure_free (splitmux->sink_properties);
686
687   if (splitmux->threshold_timecode_str)
688     g_free (splitmux->threshold_timecode_str);
689   if (splitmux->tc_interval)
690     gst_video_time_code_interval_free (splitmux->tc_interval);
691
692   if (splitmux->times_to_split)
693     gst_queue_array_free (splitmux->times_to_split);
694
695   g_free (splitmux->location);
696
697   /* Make sure to free any un-released contexts. There should not be any,
698    * because the dispose will have freed all request pads though */
699   g_list_foreach (splitmux->contexts, (GFunc) mq_stream_ctx_free, NULL);
700   g_list_free (splitmux->contexts);
701   gst_splitmux_reset_timecode (splitmux);
702
703   G_OBJECT_CLASS (parent_class)->finalize (object);
704 }
705
706 /*
707  * Set any time threshold to the muxer, if it has
708  * reserved-max-duration and reserved-duration-remaining
709  * properties. Called when creating/claiming the muxer
710  * in create_elements() */
711 static void
712 update_muxer_properties (GstSplitMuxSink * sink)
713 {
714   GObjectClass *klass;
715   GstClockTime threshold_time;
716
717   sink->muxer_has_reserved_props = FALSE;
718   if (sink->muxer == NULL)
719     return;
720   klass = G_OBJECT_GET_CLASS (sink->muxer);
721   if (g_object_class_find_property (klass, "reserved-max-duration") == NULL)
722     return;
723   if (g_object_class_find_property (klass,
724           "reserved-duration-remaining") == NULL)
725     return;
726   sink->muxer_has_reserved_props = TRUE;
727
728   GST_LOG_OBJECT (sink, "Setting muxer reserved time to %" GST_TIME_FORMAT,
729       GST_TIME_ARGS (sink->threshold_time));
730   GST_OBJECT_LOCK (sink);
731   threshold_time = sink->threshold_time;
732   GST_OBJECT_UNLOCK (sink);
733
734   if (threshold_time > 0) {
735     /* Tell the muxer how much space to reserve */
736     GstClockTime muxer_threshold = threshold_time;
737     g_object_set (sink->muxer, "reserved-max-duration", muxer_threshold, NULL);
738   }
739 }
740
741 static void
742 gst_splitmux_sink_set_property (GObject * object, guint prop_id,
743     const GValue * value, GParamSpec * pspec)
744 {
745   GstSplitMuxSink *splitmux = GST_SPLITMUX_SINK (object);
746
747   switch (prop_id) {
748     case PROP_LOCATION:{
749       GST_OBJECT_LOCK (splitmux);
750       g_free (splitmux->location);
751       splitmux->location = g_value_dup_string (value);
752       GST_OBJECT_UNLOCK (splitmux);
753       break;
754     }
755     case PROP_START_INDEX:
756       GST_OBJECT_LOCK (splitmux);
757       splitmux->start_index = g_value_get_int (value);
758       GST_OBJECT_UNLOCK (splitmux);
759       break;
760     case PROP_MAX_SIZE_BYTES:
761       GST_OBJECT_LOCK (splitmux);
762       splitmux->threshold_bytes = g_value_get_uint64 (value);
763       GST_OBJECT_UNLOCK (splitmux);
764       break;
765     case PROP_MAX_SIZE_TIME:
766       GST_OBJECT_LOCK (splitmux);
767       splitmux->threshold_time = g_value_get_uint64 (value);
768       GST_OBJECT_UNLOCK (splitmux);
769       break;
770     case PROP_MAX_SIZE_TIMECODE:
771       GST_OBJECT_LOCK (splitmux);
772       g_free (splitmux->threshold_timecode_str);
773       /* will be calculated later */
774       g_clear_pointer (&splitmux->tc_interval,
775           gst_video_time_code_interval_free);
776       gst_splitmux_reset_timecode (splitmux);
777
778       splitmux->threshold_timecode_str = g_value_dup_string (value);
779       if (splitmux->threshold_timecode_str) {
780         splitmux->tc_interval =
781             gst_video_time_code_interval_new_from_string
782             (splitmux->threshold_timecode_str);
783         if (!splitmux->tc_interval) {
784           g_warning ("Wrong timecode string %s",
785               splitmux->threshold_timecode_str);
786           g_free (splitmux->threshold_timecode_str);
787           splitmux->threshold_timecode_str = NULL;
788         }
789       }
790       GST_OBJECT_UNLOCK (splitmux);
791       break;
792     case PROP_SEND_KEYFRAME_REQUESTS:
793       GST_OBJECT_LOCK (splitmux);
794       splitmux->send_keyframe_requests = g_value_get_boolean (value);
795       GST_OBJECT_UNLOCK (splitmux);
796       break;
797     case PROP_MAX_FILES:
798       GST_OBJECT_LOCK (splitmux);
799       splitmux->max_files = g_value_get_uint (value);
800       GST_OBJECT_UNLOCK (splitmux);
801       break;
802     case PROP_MUXER_OVERHEAD:
803       GST_OBJECT_LOCK (splitmux);
804       splitmux->mux_overhead = g_value_get_double (value);
805       GST_OBJECT_UNLOCK (splitmux);
806       break;
807     case PROP_USE_ROBUST_MUXING:
808       GST_OBJECT_LOCK (splitmux);
809       splitmux->use_robust_muxing = g_value_get_boolean (value);
810       GST_OBJECT_UNLOCK (splitmux);
811       if (splitmux->use_robust_muxing)
812         update_muxer_properties (splitmux);
813       break;
814     case PROP_ALIGNMENT_THRESHOLD:
815       GST_OBJECT_LOCK (splitmux);
816       splitmux->alignment_threshold = g_value_get_uint64 (value);
817       GST_OBJECT_UNLOCK (splitmux);
818       break;
819     case PROP_SINK:
820       GST_OBJECT_LOCK (splitmux);
821       gst_clear_object (&splitmux->provided_sink);
822       splitmux->provided_sink = g_value_get_object (value);
823       if (splitmux->provided_sink)
824         gst_object_ref_sink (splitmux->provided_sink);
825       GST_OBJECT_UNLOCK (splitmux);
826       break;
827     case PROP_MUXER:
828       GST_OBJECT_LOCK (splitmux);
829       gst_clear_object (&splitmux->provided_muxer);
830       splitmux->provided_muxer = g_value_get_object (value);
831       if (splitmux->provided_muxer)
832         gst_object_ref_sink (splitmux->provided_muxer);
833       GST_OBJECT_UNLOCK (splitmux);
834       break;
835     case PROP_RESET_MUXER:
836       GST_OBJECT_LOCK (splitmux);
837       splitmux->reset_muxer = g_value_get_boolean (value);
838       GST_OBJECT_UNLOCK (splitmux);
839       break;
840     case PROP_ASYNC_FINALIZE:
841       GST_OBJECT_LOCK (splitmux);
842       splitmux->async_finalize = g_value_get_boolean (value);
843       GST_OBJECT_UNLOCK (splitmux);
844       break;
845     case PROP_MUXER_FACTORY:
846       GST_OBJECT_LOCK (splitmux);
847       if (splitmux->muxer_factory)
848         g_free (splitmux->muxer_factory);
849       splitmux->muxer_factory = g_value_dup_string (value);
850       GST_OBJECT_UNLOCK (splitmux);
851       break;
852     case PROP_MUXER_PRESET:
853       GST_OBJECT_LOCK (splitmux);
854       if (splitmux->muxer_preset)
855         g_free (splitmux->muxer_preset);
856       splitmux->muxer_preset = g_value_dup_string (value);
857       GST_OBJECT_UNLOCK (splitmux);
858       break;
859     case PROP_MUXER_PROPERTIES:
860       GST_OBJECT_LOCK (splitmux);
861       if (splitmux->muxer_properties)
862         gst_structure_free (splitmux->muxer_properties);
863       if (gst_value_get_structure (value))
864         splitmux->muxer_properties =
865             gst_structure_copy (gst_value_get_structure (value));
866       else
867         splitmux->muxer_properties = NULL;
868       GST_OBJECT_UNLOCK (splitmux);
869       break;
870     case PROP_SINK_FACTORY:
871       GST_OBJECT_LOCK (splitmux);
872       if (splitmux->sink_factory)
873         g_free (splitmux->sink_factory);
874       splitmux->sink_factory = g_value_dup_string (value);
875       GST_OBJECT_UNLOCK (splitmux);
876       break;
877     case PROP_SINK_PRESET:
878       GST_OBJECT_LOCK (splitmux);
879       if (splitmux->sink_preset)
880         g_free (splitmux->sink_preset);
881       splitmux->sink_preset = g_value_dup_string (value);
882       GST_OBJECT_UNLOCK (splitmux);
883       break;
884     case PROP_SINK_PROPERTIES:
885       GST_OBJECT_LOCK (splitmux);
886       if (splitmux->sink_properties)
887         gst_structure_free (splitmux->sink_properties);
888       if (gst_value_get_structure (value))
889         splitmux->sink_properties =
890             gst_structure_copy (gst_value_get_structure (value));
891       else
892         splitmux->sink_properties = NULL;
893       GST_OBJECT_UNLOCK (splitmux);
894       break;
895     case PROP_MUXERPAD_MAP:
896     {
897       const GstStructure *s = gst_value_get_structure (value);
898       GST_SPLITMUX_LOCK (splitmux);
899       if (splitmux->muxerpad_map) {
900         gst_structure_free (splitmux->muxerpad_map);
901       }
902       if (s)
903         splitmux->muxerpad_map = gst_structure_copy (s);
904       else
905         splitmux->muxerpad_map = NULL;
906       GST_SPLITMUX_UNLOCK (splitmux);
907       break;
908     }
909     default:
910       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
911       break;
912   }
913 }
914
915 static void
916 gst_splitmux_sink_get_property (GObject * object, guint prop_id,
917     GValue * value, GParamSpec * pspec)
918 {
919   GstSplitMuxSink *splitmux = GST_SPLITMUX_SINK (object);
920
921   switch (prop_id) {
922     case PROP_LOCATION:
923       GST_OBJECT_LOCK (splitmux);
924       g_value_set_string (value, splitmux->location);
925       GST_OBJECT_UNLOCK (splitmux);
926       break;
927     case PROP_START_INDEX:
928       GST_OBJECT_LOCK (splitmux);
929       g_value_set_int (value, splitmux->start_index);
930       GST_OBJECT_UNLOCK (splitmux);
931       break;
932     case PROP_MAX_SIZE_BYTES:
933       GST_OBJECT_LOCK (splitmux);
934       g_value_set_uint64 (value, splitmux->threshold_bytes);
935       GST_OBJECT_UNLOCK (splitmux);
936       break;
937     case PROP_MAX_SIZE_TIME:
938       GST_OBJECT_LOCK (splitmux);
939       g_value_set_uint64 (value, splitmux->threshold_time);
940       GST_OBJECT_UNLOCK (splitmux);
941       break;
942     case PROP_MAX_SIZE_TIMECODE:
943       GST_OBJECT_LOCK (splitmux);
944       g_value_set_string (value, splitmux->threshold_timecode_str);
945       GST_OBJECT_UNLOCK (splitmux);
946       break;
947     case PROP_SEND_KEYFRAME_REQUESTS:
948       GST_OBJECT_LOCK (splitmux);
949       g_value_set_boolean (value, splitmux->send_keyframe_requests);
950       GST_OBJECT_UNLOCK (splitmux);
951       break;
952     case PROP_MAX_FILES:
953       GST_OBJECT_LOCK (splitmux);
954       g_value_set_uint (value, splitmux->max_files);
955       GST_OBJECT_UNLOCK (splitmux);
956       break;
957     case PROP_MUXER_OVERHEAD:
958       GST_OBJECT_LOCK (splitmux);
959       g_value_set_double (value, splitmux->mux_overhead);
960       GST_OBJECT_UNLOCK (splitmux);
961       break;
962     case PROP_USE_ROBUST_MUXING:
963       GST_OBJECT_LOCK (splitmux);
964       g_value_set_boolean (value, splitmux->use_robust_muxing);
965       GST_OBJECT_UNLOCK (splitmux);
966       break;
967     case PROP_ALIGNMENT_THRESHOLD:
968       GST_OBJECT_LOCK (splitmux);
969       g_value_set_uint64 (value, splitmux->alignment_threshold);
970       GST_OBJECT_UNLOCK (splitmux);
971       break;
972     case PROP_SINK:
973       GST_OBJECT_LOCK (splitmux);
974       g_value_set_object (value, splitmux->provided_sink);
975       GST_OBJECT_UNLOCK (splitmux);
976       break;
977     case PROP_MUXER:
978       GST_OBJECT_LOCK (splitmux);
979       g_value_set_object (value, splitmux->provided_muxer);
980       GST_OBJECT_UNLOCK (splitmux);
981       break;
982     case PROP_RESET_MUXER:
983       GST_OBJECT_LOCK (splitmux);
984       g_value_set_boolean (value, splitmux->reset_muxer);
985       GST_OBJECT_UNLOCK (splitmux);
986       break;
987     case PROP_ASYNC_FINALIZE:
988       GST_OBJECT_LOCK (splitmux);
989       g_value_set_boolean (value, splitmux->async_finalize);
990       GST_OBJECT_UNLOCK (splitmux);
991       break;
992     case PROP_MUXER_FACTORY:
993       GST_OBJECT_LOCK (splitmux);
994       g_value_set_string (value, splitmux->muxer_factory);
995       GST_OBJECT_UNLOCK (splitmux);
996       break;
997     case PROP_MUXER_PRESET:
998       GST_OBJECT_LOCK (splitmux);
999       g_value_set_string (value, splitmux->muxer_preset);
1000       GST_OBJECT_UNLOCK (splitmux);
1001       break;
1002     case PROP_MUXER_PROPERTIES:
1003       GST_OBJECT_LOCK (splitmux);
1004       gst_value_set_structure (value, splitmux->muxer_properties);
1005       GST_OBJECT_UNLOCK (splitmux);
1006       break;
1007     case PROP_SINK_FACTORY:
1008       GST_OBJECT_LOCK (splitmux);
1009       g_value_set_string (value, splitmux->sink_factory);
1010       GST_OBJECT_UNLOCK (splitmux);
1011       break;
1012     case PROP_SINK_PRESET:
1013       GST_OBJECT_LOCK (splitmux);
1014       g_value_set_string (value, splitmux->sink_preset);
1015       GST_OBJECT_UNLOCK (splitmux);
1016       break;
1017     case PROP_SINK_PROPERTIES:
1018       GST_OBJECT_LOCK (splitmux);
1019       gst_value_set_structure (value, splitmux->sink_properties);
1020       GST_OBJECT_UNLOCK (splitmux);
1021       break;
1022     case PROP_MUXERPAD_MAP:
1023       GST_SPLITMUX_LOCK (splitmux);
1024       gst_value_set_structure (value, splitmux->muxerpad_map);
1025       GST_SPLITMUX_UNLOCK (splitmux);
1026       break;
1027     default:
1028       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
1029       break;
1030   }
1031 }
1032
1033 /* Convenience function */
1034 static inline GstClockTimeDiff
1035 my_segment_to_running_time (GstSegment * segment, GstClockTime val)
1036 {
1037   GstClockTimeDiff res = GST_CLOCK_STIME_NONE;
1038
1039   if (GST_CLOCK_TIME_IS_VALID (val)) {
1040     gboolean sign =
1041         gst_segment_to_running_time_full (segment, GST_FORMAT_TIME, val, &val);
1042     if (sign > 0)
1043       res = val;
1044     else if (sign < 0)
1045       res = -val;
1046   }
1047   return res;
1048 }
1049
1050 static void
1051 mq_stream_ctx_reset (MqStreamCtx * ctx)
1052 {
1053   gst_segment_init (&ctx->in_segment, GST_FORMAT_UNDEFINED);
1054   gst_segment_init (&ctx->out_segment, GST_FORMAT_UNDEFINED);
1055   ctx->in_running_time = ctx->out_running_time = GST_CLOCK_STIME_NONE;
1056   g_queue_foreach (&ctx->queued_bufs, (GFunc) mq_stream_buf_free, NULL);
1057   g_queue_clear (&ctx->queued_bufs);
1058 }
1059
1060 static MqStreamCtx *
1061 mq_stream_ctx_new (GstSplitMuxSink * splitmux)
1062 {
1063   MqStreamCtx *ctx;
1064
1065   ctx = g_new0 (MqStreamCtx, 1);
1066   ctx->splitmux = splitmux;
1067   g_queue_init (&ctx->queued_bufs);
1068   mq_stream_ctx_reset (ctx);
1069
1070   return ctx;
1071 }
1072
1073 static void
1074 mq_stream_ctx_free (MqStreamCtx * ctx)
1075 {
1076   if (ctx->q) {
1077     GstObject *parent = gst_object_get_parent (GST_OBJECT (ctx->q));
1078
1079     g_signal_handler_disconnect (ctx->q, ctx->q_overrun_id);
1080
1081     if (parent == GST_OBJECT_CAST (ctx->splitmux)) {
1082       gst_element_set_locked_state (ctx->q, TRUE);
1083       gst_element_set_state (ctx->q, GST_STATE_NULL);
1084       gst_bin_remove (GST_BIN (ctx->splitmux), ctx->q);
1085       gst_object_unref (parent);
1086     }
1087     gst_object_unref (ctx->q);
1088   }
1089   gst_object_unref (ctx->sinkpad);
1090   gst_object_unref (ctx->srcpad);
1091   g_queue_foreach (&ctx->queued_bufs, (GFunc) mq_stream_buf_free, NULL);
1092   g_queue_clear (&ctx->queued_bufs);
1093   g_free (ctx);
1094 }
1095
1096 static void
1097 send_fragment_opened_closed_msg (GstSplitMuxSink * splitmux, gboolean opened,
1098     GstElement * sink)
1099 {
1100   gchar *location = NULL;
1101   GstMessage *msg;
1102   const gchar *msg_name = opened ?
1103       "splitmuxsink-fragment-opened" : "splitmuxsink-fragment-closed";
1104   GstClockTime running_time = splitmux->reference_ctx->out_running_time;
1105
1106   if (!opened) {
1107     GstClockTime *rtime = g_object_get_qdata (G_OBJECT (sink), RUNNING_TIME);
1108     if (rtime)
1109       running_time = *rtime;
1110   }
1111
1112   if (g_object_class_find_property (G_OBJECT_GET_CLASS (sink),
1113           "location") != NULL)
1114     g_object_get (sink, "location", &location, NULL);
1115
1116   GST_DEBUG_OBJECT (splitmux,
1117       "Sending %s message. Running time %" GST_TIME_FORMAT " location %s",
1118       msg_name, GST_TIME_ARGS (running_time), GST_STR_NULL (location));
1119
1120   /* If it's in the middle of a teardown, the reference_ctc might have become
1121    * NULL */
1122   if (splitmux->reference_ctx) {
1123     msg = gst_message_new_element (GST_OBJECT (splitmux),
1124         gst_structure_new (msg_name,
1125             "location", G_TYPE_STRING, location,
1126             "running-time", GST_TYPE_CLOCK_TIME, running_time,
1127             "sink", GST_TYPE_ELEMENT, sink, NULL));
1128     gst_element_post_message (GST_ELEMENT_CAST (splitmux), msg);
1129   }
1130
1131   g_free (location);
1132 }
1133
1134 static void
1135 send_eos_async (GstSplitMuxSink * splitmux, AsyncEosHelper * helper)
1136 {
1137   GstEvent *eos;
1138   GstPad *pad;
1139   MqStreamCtx *ctx;
1140
1141   eos = gst_event_new_eos ();
1142   pad = helper->pad;
1143   ctx = helper->ctx;
1144
1145   GST_SPLITMUX_LOCK (splitmux);
1146   if (!pad)
1147     pad = gst_pad_get_peer (ctx->srcpad);
1148   GST_SPLITMUX_UNLOCK (splitmux);
1149
1150   gst_pad_send_event (pad, eos);
1151   GST_INFO_OBJECT (splitmux, "Sent async EOS on %" GST_PTR_FORMAT, pad);
1152
1153   gst_object_unref (pad);
1154   g_free (helper);
1155 }
1156
1157 /* Called with lock held, drops the lock to send EOS to the
1158  * pad
1159  */
1160 static void
1161 send_eos (GstSplitMuxSink * splitmux, MqStreamCtx * ctx)
1162 {
1163   GstEvent *eos;
1164   GstPad *pad;
1165
1166   eos = gst_event_new_eos ();
1167   pad = gst_pad_get_peer (ctx->srcpad);
1168
1169   ctx->out_eos = TRUE;
1170
1171   GST_INFO_OBJECT (splitmux, "Sending EOS on %" GST_PTR_FORMAT, pad);
1172   GST_SPLITMUX_UNLOCK (splitmux);
1173   gst_pad_send_event (pad, eos);
1174   GST_SPLITMUX_LOCK (splitmux);
1175
1176   gst_object_unref (pad);
1177 }
1178
1179 /* Called with lock held. Schedules an EOS event to the ctx pad
1180  * to happen in another thread */
1181 static void
1182 eos_context_async (MqStreamCtx * ctx, GstSplitMuxSink * splitmux)
1183 {
1184   AsyncEosHelper *helper = g_new0 (AsyncEosHelper, 1);
1185   GstPad *srcpad, *sinkpad;
1186
1187   srcpad = ctx->srcpad;
1188   sinkpad = gst_pad_get_peer (srcpad);
1189
1190   helper->ctx = ctx;
1191   helper->pad = sinkpad;        /* Takes the reference */
1192
1193   ctx->out_eos_async_done = TRUE;
1194
1195   /* There used to be a bug here, where we had to explicitly remove
1196    * the SINK flag so that GstBin would ignore it for EOS purposes.
1197    * That fixed a race where if splitmuxsink really reaches EOS
1198    * before an asynchronous background element has finished, then
1199    * the bin wouldn't actually send EOS to the pipeline. Even after
1200    * finishing and removing the old element, the bin didn't re-check
1201    * EOS status on removing a SINK element. That bug was fixed
1202    * in core. */
1203   GST_DEBUG_OBJECT (splitmux, "scheduled EOS to pad %" GST_PTR_FORMAT " ctx %p",
1204       sinkpad, ctx);
1205
1206   g_assert_nonnull (helper->pad);
1207   gst_element_call_async (GST_ELEMENT (splitmux),
1208       (GstElementCallAsyncFunc) send_eos_async, helper, NULL);
1209 }
1210
1211 /* Called with lock held. TRUE iff all contexts have a
1212  * pending (or delivered) async eos event */
1213 static gboolean
1214 all_contexts_are_async_eos (GstSplitMuxSink * splitmux)
1215 {
1216   gboolean ret = TRUE;
1217   GList *item;
1218
1219   for (item = splitmux->contexts; item; item = item->next) {
1220     MqStreamCtx *ctx = item->data;
1221     ret &= ctx->out_eos_async_done;
1222   }
1223   return ret;
1224 }
1225
1226 /* Called with splitmux lock held to check if this output
1227  * context needs to sleep to wait for the release of the
1228  * next GOP, or to send EOS to close out the current file
1229  */
1230 static GstFlowReturn
1231 complete_or_wait_on_out (GstSplitMuxSink * splitmux, MqStreamCtx * ctx)
1232 {
1233   if (ctx->caps_change)
1234     return GST_FLOW_OK;
1235
1236   do {
1237     /* When first starting up, the reference stream has to output
1238      * the first buffer to prepare the muxer and sink */
1239     gboolean can_output = (ctx->is_reference || splitmux->ready_for_output);
1240     GstClockTimeDiff my_max_out_running_time = splitmux->max_out_running_time;
1241
1242     if (!(splitmux->max_out_running_time == 0 ||
1243             splitmux->max_out_running_time == GST_CLOCK_STIME_NONE ||
1244             splitmux->alignment_threshold == 0 ||
1245             splitmux->max_out_running_time < splitmux->alignment_threshold)) {
1246       my_max_out_running_time -= splitmux->alignment_threshold;
1247       GST_LOG_OBJECT (ctx->srcpad,
1248           "Max out running time currently %" GST_STIME_FORMAT
1249           ", with threshold applied it is %" GST_STIME_FORMAT,
1250           GST_STIME_ARGS (splitmux->max_out_running_time),
1251           GST_STIME_ARGS (my_max_out_running_time));
1252     }
1253
1254     if (ctx->flushing
1255         || splitmux->output_state == SPLITMUX_OUTPUT_STATE_STOPPED)
1256       return GST_FLOW_FLUSHING;
1257
1258     GST_LOG_OBJECT (ctx->srcpad,
1259         "Checking running time %" GST_STIME_FORMAT " against max %"
1260         GST_STIME_FORMAT, GST_STIME_ARGS (ctx->out_running_time),
1261         GST_STIME_ARGS (my_max_out_running_time));
1262
1263     if (can_output) {
1264       if (splitmux->max_out_running_time == GST_CLOCK_STIME_NONE ||
1265           ctx->out_running_time < my_max_out_running_time) {
1266         return GST_FLOW_OK;
1267       }
1268
1269       switch (splitmux->output_state) {
1270         case SPLITMUX_OUTPUT_STATE_OUTPUT_GOP:
1271           /* We only get here if we've finished outputting a GOP and need to know
1272            * what to do next */
1273           splitmux->output_state = SPLITMUX_OUTPUT_STATE_AWAITING_COMMAND;
1274           GST_SPLITMUX_BROADCAST_OUTPUT (splitmux);
1275           continue;
1276
1277         case SPLITMUX_OUTPUT_STATE_ENDING_FILE:
1278         case SPLITMUX_OUTPUT_STATE_ENDING_STREAM:
1279           /* We've reached the max out running_time to get here, so end this file now */
1280           if (ctx->out_eos == FALSE) {
1281             if (splitmux->async_finalize) {
1282               /* We must set EOS asynchronously at this point. We cannot defer
1283                * it, because we need all contexts to wake up, for the
1284                * reference context to eventually give us something at
1285                * START_NEXT_FILE. Otherwise, collectpads might choose another
1286                * context to give us the first buffer, and format-location-full
1287                * will not contain a valid sample. */
1288               g_object_set_qdata ((GObject *) splitmux->sink, EOS_FROM_US,
1289                   GINT_TO_POINTER (1));
1290               eos_context_async (ctx, splitmux);
1291               if (all_contexts_are_async_eos (splitmux)) {
1292                 GST_INFO_OBJECT (splitmux,
1293                     "All contexts are async_eos. Moving to the next file.");
1294                 /* We can start the next file once we've asked each pad to go EOS */
1295                 splitmux->output_state = SPLITMUX_OUTPUT_STATE_START_NEXT_FILE;
1296                 GST_SPLITMUX_BROADCAST_OUTPUT (splitmux);
1297                 continue;
1298               }
1299             } else {
1300               send_eos (splitmux, ctx);
1301               continue;
1302             }
1303           } else {
1304             GST_INFO_OBJECT (splitmux,
1305                 "At end-of-file state, but context %p is already EOS", ctx);
1306           }
1307           break;
1308         case SPLITMUX_OUTPUT_STATE_START_NEXT_FILE:
1309           if (ctx->is_reference) {
1310             GstFlowReturn ret = GST_FLOW_OK;
1311
1312             /* Special handling on the reference ctx to start new fragments
1313              * and collect commands from the command queue */
1314             /* drops the splitmux lock briefly: */
1315             /* We must have reference ctx in order for format-location-full to
1316              * have a sample */
1317             ret = start_next_fragment (splitmux, ctx);
1318             if (ret != GST_FLOW_OK)
1319               return ret;
1320
1321             continue;
1322           }
1323           break;
1324         case SPLITMUX_OUTPUT_STATE_AWAITING_COMMAND:{
1325           do {
1326             SplitMuxOutputCommand *cmd =
1327                 g_queue_pop_tail (&splitmux->out_cmd_q);
1328             if (cmd != NULL) {
1329               /* If we pop the last command, we need to make our queues bigger */
1330               if (g_queue_get_length (&splitmux->out_cmd_q) == 0)
1331                 grow_blocked_queues (splitmux);
1332
1333               if (cmd->start_new_fragment) {
1334                 if (splitmux->muxed_out_bytes > 0) {
1335                   GST_DEBUG_OBJECT (splitmux, "Got cmd to start new fragment");
1336                   splitmux->output_state = SPLITMUX_OUTPUT_STATE_ENDING_FILE;
1337                 } else {
1338                   GST_DEBUG_OBJECT (splitmux,
1339                       "Got cmd to start new fragment, but fragment is empty - ignoring.");
1340                 }
1341               } else {
1342                 GST_DEBUG_OBJECT (splitmux,
1343                     "Got new output cmd for time %" GST_STIME_FORMAT,
1344                     GST_STIME_ARGS (cmd->max_output_ts));
1345
1346                 /* Extend the output range immediately */
1347                 if (splitmux->max_out_running_time == GST_CLOCK_STIME_NONE
1348                     || cmd->max_output_ts > splitmux->max_out_running_time)
1349                   splitmux->max_out_running_time = cmd->max_output_ts;
1350                 GST_DEBUG_OBJECT (splitmux,
1351                     "Max out running time now %" GST_STIME_FORMAT,
1352                     GST_STIME_ARGS (splitmux->max_out_running_time));
1353                 splitmux->output_state = SPLITMUX_OUTPUT_STATE_OUTPUT_GOP;
1354               }
1355               GST_SPLITMUX_BROADCAST_OUTPUT (splitmux);
1356
1357               out_cmd_buf_free (cmd);
1358               break;
1359             } else {
1360               GST_SPLITMUX_WAIT_OUTPUT (splitmux);
1361             }
1362           } while (!ctx->flushing && splitmux->output_state ==
1363               SPLITMUX_OUTPUT_STATE_AWAITING_COMMAND);
1364           /* loop and re-check the state */
1365           continue;
1366         }
1367         case SPLITMUX_OUTPUT_STATE_STOPPED:
1368           return GST_FLOW_FLUSHING;
1369       }
1370     } else {
1371       GST_LOG_OBJECT (ctx->srcpad, "Not yet ready for output");
1372     }
1373
1374     GST_INFO_OBJECT (ctx->srcpad,
1375         "Sleeping for running time %"
1376         GST_STIME_FORMAT " (max %" GST_STIME_FORMAT ") or state change.",
1377         GST_STIME_ARGS (ctx->out_running_time),
1378         GST_STIME_ARGS (splitmux->max_out_running_time));
1379     GST_SPLITMUX_WAIT_OUTPUT (splitmux);
1380     GST_INFO_OBJECT (ctx->srcpad,
1381         "Woken for new max running time %" GST_STIME_FORMAT,
1382         GST_STIME_ARGS (splitmux->max_out_running_time));
1383   }
1384   while (1);
1385
1386   return GST_FLOW_OK;
1387 }
1388
1389 static GstClockTime
1390 calculate_next_max_timecode (GstSplitMuxSink * splitmux,
1391     const GstVideoTimeCode * cur_tc, GstClockTime running_time,
1392     GstVideoTimeCode ** next_tc)
1393 {
1394   GstVideoTimeCode *target_tc;
1395   GstClockTime cur_tc_time, target_tc_time, next_max_tc_time;
1396
1397   if (cur_tc == NULL || splitmux->tc_interval == NULL)
1398     return GST_CLOCK_TIME_NONE;
1399
1400   target_tc = gst_video_time_code_add_interval (cur_tc, splitmux->tc_interval);
1401   if (!target_tc) {
1402     GST_ELEMENT_ERROR (splitmux,
1403         STREAM, FAILED, (NULL), ("Couldn't calculate target timecode"));
1404     return GST_CLOCK_TIME_NONE;
1405   }
1406
1407   /* Convert to ns */
1408   target_tc_time = gst_video_time_code_nsec_since_daily_jam (target_tc);
1409   cur_tc_time = gst_video_time_code_nsec_since_daily_jam (cur_tc);
1410
1411   /* Add running_time, accounting for wraparound. */
1412   if (target_tc_time >= cur_tc_time) {
1413     next_max_tc_time = target_tc_time - cur_tc_time + running_time;
1414   } else {
1415     GstClockTime day_in_ns = 24 * 60 * 60 * GST_SECOND;
1416
1417     if ((cur_tc->config.flags & GST_VIDEO_TIME_CODE_FLAGS_DROP_FRAME) &&
1418         (cur_tc->config.fps_d == 1001)) {
1419       /* Checking fps_d is probably unneeded, but better safe than sorry
1420        * (e.g. someone accidentally set a flag) */
1421       GstVideoTimeCode *tc_for_offset;
1422
1423       /* Here, the duration of the 24:00:00;00 timecode isn't exactly one day,
1424        * but slightly less. Calculate that duration from a fake timecode. The
1425        * problem is that 24:00:00;00 isn't a valid timecode, so the workaround
1426        * is to add one frame to 23:59:59;29 */
1427       tc_for_offset =
1428           gst_video_time_code_new (cur_tc->config.fps_n, cur_tc->config.fps_d,
1429           NULL, cur_tc->config.flags, 23, 59, 59,
1430           cur_tc->config.fps_n / cur_tc->config.fps_d, 0);
1431       day_in_ns =
1432           gst_video_time_code_nsec_since_daily_jam (tc_for_offset) +
1433           gst_util_uint64_scale (GST_SECOND, cur_tc->config.fps_d,
1434           cur_tc->config.fps_n);
1435       gst_video_time_code_free (tc_for_offset);
1436     }
1437     next_max_tc_time = day_in_ns - cur_tc_time + target_tc_time + running_time;
1438   }
1439
1440   GST_INFO_OBJECT (splitmux, "Next max TC time: %" GST_TIME_FORMAT
1441       " from ref TC: %" GST_TIME_FORMAT, GST_TIME_ARGS (next_max_tc_time),
1442       GST_TIME_ARGS (cur_tc_time));
1443   if (next_tc)
1444     *next_tc = target_tc;
1445   else
1446     gst_video_time_code_free (target_tc);
1447
1448   return next_max_tc_time;
1449 }
1450
1451 static gboolean
1452 request_next_keyframe (GstSplitMuxSink * splitmux, GstBuffer * buffer,
1453     GstClockTime running_time)
1454 {
1455   GstEvent *ev;
1456   GstClockTime target_time;
1457   gboolean timecode_based = FALSE;
1458   GstClockTime max_tc_time = GST_CLOCK_TIME_NONE;
1459   GstClockTime next_max_tc_time = GST_CLOCK_TIME_NONE;
1460   GstClockTime next_fku_time = GST_CLOCK_TIME_NONE;
1461   GstClockTime tc_rounding_error = 5 * GST_USECOND;
1462
1463   if (!splitmux->send_keyframe_requests)
1464     return TRUE;
1465
1466   if (splitmux->tc_interval) {
1467     if (splitmux->in_tc && gst_video_time_code_is_valid (splitmux->in_tc)) {
1468       GstVideoTimeCode *next_tc = NULL;
1469       max_tc_time =
1470           calculate_next_max_timecode (splitmux, splitmux->in_tc,
1471           running_time, &next_tc);
1472
1473       /* calculate the next expected keyframe time to prevent too early fku
1474        * event */
1475       if (GST_CLOCK_TIME_IS_VALID (max_tc_time) && next_tc) {
1476         next_max_tc_time =
1477             calculate_next_max_timecode (splitmux, next_tc, max_tc_time, NULL);
1478       }
1479       if (next_tc)
1480         gst_video_time_code_free (next_tc);
1481
1482       timecode_based = GST_CLOCK_TIME_IS_VALID (max_tc_time) &&
1483           GST_CLOCK_TIME_IS_VALID (next_max_tc_time);
1484     } else {
1485       /* This can happen in the presence of GAP events that trigger
1486        * a new fragment start */
1487       GST_WARNING_OBJECT (splitmux,
1488           "No buffer available to calculate next timecode");
1489     }
1490   }
1491
1492   if ((splitmux->threshold_time == 0 && !timecode_based)
1493       || splitmux->threshold_bytes != 0)
1494     return TRUE;
1495
1496   if (timecode_based) {
1497     /* We might have rounding errors: aim slightly earlier */
1498     if (max_tc_time >= tc_rounding_error) {
1499       target_time = max_tc_time - tc_rounding_error;
1500     } else {
1501       /* unreliable target time */
1502       GST_DEBUG_OBJECT (splitmux, "tc time %" GST_TIME_FORMAT
1503           " is smaller than allowed rounding error, set it to zero",
1504           GST_TIME_ARGS (max_tc_time));
1505       target_time = 0;
1506     }
1507
1508     if (next_max_tc_time >= tc_rounding_error) {
1509       next_fku_time = next_max_tc_time - tc_rounding_error;
1510     } else {
1511       /* unreliable target time */
1512       GST_DEBUG_OBJECT (splitmux, "next tc time %" GST_TIME_FORMAT
1513           " is smaller than allowed rounding error, set it to zero",
1514           GST_TIME_ARGS (next_max_tc_time));
1515       next_fku_time = 0;
1516     }
1517   } else {
1518     target_time = running_time + splitmux->threshold_time;
1519   }
1520
1521   if (GST_CLOCK_TIME_IS_VALID (splitmux->next_fku_time)) {
1522     GstClockTime allowed_time = splitmux->next_fku_time;
1523
1524     if (timecode_based) {
1525       if (allowed_time >= tc_rounding_error) {
1526         allowed_time -= tc_rounding_error;
1527       } else {
1528         /* unreliable next force key unit time */
1529         GST_DEBUG_OBJECT (splitmux, "expected next force key unit time %"
1530             GST_TIME_FORMAT
1531             " is smaller than allowed rounding error, set it to zero",
1532             GST_TIME_ARGS (splitmux->next_fku_time));
1533         allowed_time = 0;
1534       }
1535     }
1536
1537     if (target_time < allowed_time) {
1538       GST_LOG_OBJECT (splitmux, "Target time %" GST_TIME_FORMAT
1539           " is smaller than expected next keyframe time %" GST_TIME_FORMAT
1540           ", rounding error compensated next keyframe time %" GST_TIME_FORMAT,
1541           GST_TIME_ARGS (target_time),
1542           GST_TIME_ARGS (splitmux->next_fku_time),
1543           GST_TIME_ARGS (allowed_time));
1544
1545       return TRUE;
1546     } else if (allowed_time != splitmux->next_fku_time &&
1547         target_time < splitmux->next_fku_time) {
1548       GST_DEBUG_OBJECT (splitmux, "Target time %" GST_TIME_FORMAT
1549           " is smaller than expected next keyframe time %" GST_TIME_FORMAT
1550           ", but the difference is smaller than allowed rounding error",
1551           GST_TIME_ARGS (target_time), GST_TIME_ARGS (splitmux->next_fku_time));
1552     }
1553   }
1554
1555   if (!timecode_based) {
1556     next_fku_time = target_time + splitmux->threshold_time;
1557   }
1558
1559   splitmux->next_fku_time = next_fku_time;
1560
1561   ev = gst_video_event_new_upstream_force_key_unit (target_time, TRUE, 0);
1562   GST_INFO_OBJECT (splitmux, "Requesting keyframe at %" GST_TIME_FORMAT
1563       ", the next expected keyframe is %" GST_TIME_FORMAT,
1564       GST_TIME_ARGS (target_time), GST_TIME_ARGS (next_fku_time));
1565   return gst_pad_push_event (splitmux->reference_ctx->sinkpad, ev);
1566 }
1567
1568 static GstPadProbeReturn
1569 handle_mq_output (GstPad * pad, GstPadProbeInfo * info, MqStreamCtx * ctx)
1570 {
1571   GstSplitMuxSink *splitmux = ctx->splitmux;
1572   MqStreamBuf *buf_info = NULL;
1573   GstFlowReturn ret = GST_FLOW_OK;
1574
1575   GST_LOG_OBJECT (pad, "Fired probe type 0x%x", info->type);
1576
1577   /* FIXME: Handle buffer lists, until then make it clear they won't work */
1578   if (info->type & GST_PAD_PROBE_TYPE_BUFFER_LIST) {
1579     g_warning ("Buffer list handling not implemented");
1580     return GST_PAD_PROBE_DROP;
1581   }
1582   if (info->type & GST_PAD_PROBE_TYPE_EVENT_DOWNSTREAM ||
1583       info->type & GST_PAD_PROBE_TYPE_EVENT_FLUSH) {
1584     GstEvent *event = gst_pad_probe_info_get_event (info);
1585     gboolean locked = FALSE, wait = !ctx->is_reference;
1586
1587     GST_LOG_OBJECT (pad, "Event %" GST_PTR_FORMAT, event);
1588
1589     switch (GST_EVENT_TYPE (event)) {
1590       case GST_EVENT_SEGMENT:
1591         gst_event_copy_segment (event, &ctx->out_segment);
1592         break;
1593       case GST_EVENT_FLUSH_STOP:
1594         GST_SPLITMUX_LOCK (splitmux);
1595         locked = TRUE;
1596         gst_segment_init (&ctx->out_segment, GST_FORMAT_UNDEFINED);
1597         g_queue_foreach (&ctx->queued_bufs, (GFunc) mq_stream_buf_free, NULL);
1598         g_queue_clear (&ctx->queued_bufs);
1599         g_queue_clear (&ctx->queued_bufs);
1600         /* If this is the reference context, we just threw away any queued keyframes */
1601         if (ctx->is_reference)
1602           splitmux->queued_keyframes = 0;
1603         ctx->flushing = FALSE;
1604         wait = FALSE;
1605         break;
1606       case GST_EVENT_FLUSH_START:
1607         GST_SPLITMUX_LOCK (splitmux);
1608         locked = TRUE;
1609         GST_LOG_OBJECT (pad, "Flush start");
1610         ctx->flushing = TRUE;
1611         GST_SPLITMUX_BROADCAST_INPUT (splitmux);
1612         GST_SPLITMUX_BROADCAST_OUTPUT (splitmux);
1613         break;
1614       case GST_EVENT_EOS:
1615         GST_SPLITMUX_LOCK (splitmux);
1616         locked = TRUE;
1617         if (splitmux->output_state == SPLITMUX_OUTPUT_STATE_STOPPED)
1618           goto beach;
1619         ctx->out_eos = TRUE;
1620
1621         if (ctx == splitmux->reference_ctx) {
1622           splitmux->output_state = SPLITMUX_OUTPUT_STATE_ENDING_STREAM;
1623           GST_SPLITMUX_BROADCAST_OUTPUT (splitmux);
1624         }
1625
1626         GST_INFO_OBJECT (splitmux,
1627             "Have EOS event at pad %" GST_PTR_FORMAT " ctx %p", pad, ctx);
1628         break;
1629       case GST_EVENT_GAP:{
1630         GstClockTime gap_ts;
1631         GstClockTimeDiff rtime;
1632
1633         gst_event_parse_gap (event, &gap_ts, NULL);
1634         if (gap_ts == GST_CLOCK_TIME_NONE)
1635           break;
1636
1637         GST_SPLITMUX_LOCK (splitmux);
1638         locked = TRUE;
1639
1640         if (splitmux->output_state == SPLITMUX_OUTPUT_STATE_STOPPED)
1641           goto beach;
1642
1643         /* When we get a gap event on the
1644          * reference stream and we're trying to open a
1645          * new file, we need to store it until we get
1646          * the buffer afterwards
1647          */
1648         if (ctx->is_reference &&
1649             (splitmux->output_state != SPLITMUX_OUTPUT_STATE_OUTPUT_GOP)) {
1650           GST_DEBUG_OBJECT (pad, "Storing GAP event until buffer arrives");
1651           gst_event_replace (&ctx->pending_gap, event);
1652           GST_SPLITMUX_UNLOCK (splitmux);
1653           return GST_PAD_PROBE_HANDLED;
1654         }
1655
1656         rtime = my_segment_to_running_time (&ctx->out_segment, gap_ts);
1657
1658         GST_LOG_OBJECT (pad, "Have GAP w/ ts %" GST_STIME_FORMAT,
1659             GST_STIME_ARGS (rtime));
1660
1661         if (rtime != GST_CLOCK_STIME_NONE) {
1662           ctx->out_running_time = rtime;
1663           complete_or_wait_on_out (splitmux, ctx);
1664         }
1665         break;
1666       }
1667       case GST_EVENT_CUSTOM_DOWNSTREAM:{
1668         const GstStructure *s;
1669         GstClockTimeDiff ts = 0;
1670
1671         s = gst_event_get_structure (event);
1672         if (!gst_structure_has_name (s, "splitmuxsink-unblock"))
1673           break;
1674
1675         gst_structure_get_int64 (s, "timestamp", &ts);
1676
1677         GST_SPLITMUX_LOCK (splitmux);
1678         locked = TRUE;
1679
1680         if (splitmux->output_state == SPLITMUX_OUTPUT_STATE_STOPPED)
1681           goto beach;
1682         ctx->out_running_time = ts;
1683         if (!ctx->is_reference)
1684           ret = complete_or_wait_on_out (splitmux, ctx);
1685         GST_SPLITMUX_UNLOCK (splitmux);
1686         GST_PAD_PROBE_INFO_FLOW_RETURN (info) = ret;
1687         return GST_PAD_PROBE_DROP;
1688       }
1689       case GST_EVENT_CAPS:{
1690         GstPad *peer;
1691
1692         if (!ctx->is_reference)
1693           break;
1694
1695         peer = gst_pad_get_peer (pad);
1696         if (peer) {
1697           gboolean ok = gst_pad_send_event (peer, gst_event_ref (event));
1698
1699           gst_object_unref (peer);
1700
1701           if (ok)
1702             break;
1703
1704         } else {
1705           break;
1706         }
1707         /* This is in the case the muxer doesn't allow this change of caps */
1708         GST_SPLITMUX_LOCK (splitmux);
1709         locked = TRUE;
1710         ctx->caps_change = TRUE;
1711
1712         if (splitmux->output_state != SPLITMUX_OUTPUT_STATE_START_NEXT_FILE) {
1713           GST_DEBUG_OBJECT (splitmux,
1714               "New caps were not accepted. Switching output file");
1715           if (ctx->out_eos == FALSE) {
1716             splitmux->output_state = SPLITMUX_OUTPUT_STATE_ENDING_FILE;
1717             GST_SPLITMUX_BROADCAST_OUTPUT (splitmux);
1718           }
1719         }
1720
1721         /* Lets it fall through, if it fails again, then the muxer just can't
1722          * support this format, but at least we have a closed file.
1723          */
1724         break;
1725       }
1726       default:
1727         break;
1728     }
1729
1730     /* We need to make sure events aren't passed
1731      * until the muxer / sink are ready for it */
1732     if (!locked)
1733       GST_SPLITMUX_LOCK (splitmux);
1734     if (wait)
1735       ret = complete_or_wait_on_out (splitmux, ctx);
1736     GST_SPLITMUX_UNLOCK (splitmux);
1737
1738     /* Don't try to forward sticky events before the next buffer is there
1739      * because it would cause a new file to be created without the first
1740      * buffer being available.
1741      */
1742     GST_PAD_PROBE_INFO_FLOW_RETURN (info) = ret;
1743     if (ctx->caps_change && GST_EVENT_IS_STICKY (event)) {
1744       gst_event_unref (event);
1745       return GST_PAD_PROBE_HANDLED;
1746     } else {
1747       return GST_PAD_PROBE_PASS;
1748     }
1749   }
1750
1751   /* Allow everything through until the configured next stopping point */
1752   GST_SPLITMUX_LOCK (splitmux);
1753
1754   buf_info = g_queue_pop_tail (&ctx->queued_bufs);
1755   if (buf_info == NULL) {
1756     /* Can only happen due to a poorly timed flush */
1757     ret = GST_FLOW_FLUSHING;
1758     goto beach;
1759   }
1760
1761   /* If we have popped a keyframe, decrement the queued_gop count */
1762   if (buf_info->keyframe && splitmux->queued_keyframes > 0 && ctx->is_reference)
1763     splitmux->queued_keyframes--;
1764
1765   ctx->out_running_time = buf_info->run_ts;
1766   ctx->cur_out_buffer = gst_pad_probe_info_get_buffer (info);
1767
1768   GST_LOG_OBJECT (splitmux,
1769       "Pad %" GST_PTR_FORMAT " buffer with run TS %" GST_STIME_FORMAT
1770       " size %" G_GUINT64_FORMAT,
1771       pad, GST_STIME_ARGS (ctx->out_running_time), buf_info->buf_size);
1772
1773   ctx->caps_change = FALSE;
1774
1775   ret = complete_or_wait_on_out (splitmux, ctx);
1776
1777   splitmux->muxed_out_bytes += buf_info->buf_size;
1778
1779 #ifndef GST_DISABLE_GST_DEBUG
1780   {
1781     GstBuffer *buf = gst_pad_probe_info_get_buffer (info);
1782     GST_LOG_OBJECT (pad, "Returning to pass buffer %" GST_PTR_FORMAT
1783         " run ts %" GST_STIME_FORMAT, buf,
1784         GST_STIME_ARGS (ctx->out_running_time));
1785   }
1786 #endif
1787
1788   ctx->cur_out_buffer = NULL;
1789   GST_SPLITMUX_UNLOCK (splitmux);
1790
1791   /* pending_gap is protected by the STREAM lock */
1792   if (ctx->pending_gap) {
1793     /* If we previously stored a gap event, send it now */
1794     GstPad *peer = gst_pad_get_peer (ctx->srcpad);
1795
1796     GST_DEBUG_OBJECT (splitmux,
1797         "Pad %" GST_PTR_FORMAT " sending pending GAP event", ctx->srcpad);
1798
1799     gst_pad_send_event (peer, ctx->pending_gap);
1800     ctx->pending_gap = NULL;
1801
1802     gst_object_unref (peer);
1803   }
1804
1805   mq_stream_buf_free (buf_info);
1806
1807   GST_PAD_PROBE_INFO_FLOW_RETURN (info) = ret;
1808   return GST_PAD_PROBE_PASS;
1809
1810 beach:
1811   GST_SPLITMUX_UNLOCK (splitmux);
1812   GST_PAD_PROBE_INFO_FLOW_RETURN (info) = ret;
1813   return GST_PAD_PROBE_DROP;
1814 }
1815
1816 static gboolean
1817 resend_sticky (GstPad * pad, GstEvent ** event, GstPad * peer)
1818 {
1819   return gst_pad_send_event (peer, gst_event_ref (*event));
1820 }
1821
1822 static void
1823 unlock_context (MqStreamCtx * ctx, GstSplitMuxSink * splitmux)
1824 {
1825   if (ctx->fragment_block_id > 0) {
1826     gst_pad_remove_probe (ctx->srcpad, ctx->fragment_block_id);
1827     ctx->fragment_block_id = 0;
1828   }
1829 }
1830
1831 static void
1832 restart_context (MqStreamCtx * ctx, GstSplitMuxSink * splitmux)
1833 {
1834   GstPad *peer = gst_pad_get_peer (ctx->srcpad);
1835
1836   gst_pad_sticky_events_foreach (ctx->srcpad,
1837       (GstPadStickyEventsForeachFunction) (resend_sticky), peer);
1838
1839   /* Clear EOS flag if not actually EOS */
1840   ctx->out_eos = GST_PAD_IS_EOS (ctx->srcpad);
1841   ctx->out_eos_async_done = ctx->out_eos;
1842
1843   gst_object_unref (peer);
1844 }
1845
1846 static void
1847 relink_context (MqStreamCtx * ctx, GstSplitMuxSink * splitmux)
1848 {
1849   GstPad *sinkpad, *srcpad, *newpad;
1850   GstPadTemplate *templ;
1851
1852   srcpad = ctx->srcpad;
1853   sinkpad = gst_pad_get_peer (srcpad);
1854
1855   templ = sinkpad->padtemplate;
1856   newpad =
1857       gst_element_request_pad (splitmux->muxer, templ,
1858       GST_PAD_NAME (sinkpad), NULL);
1859
1860   GST_DEBUG_OBJECT (splitmux, "Relinking ctx %p to pad %" GST_PTR_FORMAT, ctx,
1861       newpad);
1862   if (!gst_pad_unlink (srcpad, sinkpad)) {
1863     gst_object_unref (sinkpad);
1864     goto fail;
1865   }
1866   if (gst_pad_link_full (srcpad, newpad,
1867           GST_PAD_LINK_CHECK_NO_RECONFIGURE) != GST_PAD_LINK_OK) {
1868     gst_element_release_request_pad (splitmux->muxer, newpad);
1869     gst_object_unref (sinkpad);
1870     gst_object_unref (newpad);
1871     goto fail;
1872   }
1873   gst_object_unref (newpad);
1874   gst_object_unref (sinkpad);
1875   return;
1876
1877 fail:
1878   GST_ELEMENT_ERROR (splitmux, RESOURCE, SETTINGS,
1879       ("Could not create the new muxer/sink"), NULL);
1880 }
1881
1882 static GstPadProbeReturn
1883 _block_pad (GstPad * pad, GstPadProbeInfo * info, gpointer user_data)
1884 {
1885   return GST_PAD_PROBE_OK;
1886 }
1887
1888 static void
1889 block_context (MqStreamCtx * ctx, GstSplitMuxSink * splitmux)
1890 {
1891   ctx->fragment_block_id =
1892       gst_pad_add_probe (ctx->srcpad, GST_PAD_PROBE_TYPE_BLOCK, _block_pad,
1893       NULL, NULL);
1894 }
1895
1896 static gboolean
1897 _set_property_from_structure (GQuark field_id, const GValue * value,
1898     gpointer user_data)
1899 {
1900   const gchar *property_name = g_quark_to_string (field_id);
1901   GObject *element = G_OBJECT (user_data);
1902
1903   g_object_set_property (element, property_name, value);
1904
1905   return TRUE;
1906 }
1907
1908 static void
1909 _lock_and_set_to_null (GstElement * element, GstSplitMuxSink * splitmux)
1910 {
1911   gst_element_set_locked_state (element, TRUE);
1912   gst_element_set_state (element, GST_STATE_NULL);
1913   GST_LOG_OBJECT (splitmux, "Removing old element %" GST_PTR_FORMAT, element);
1914   gst_bin_remove (GST_BIN (splitmux), element);
1915 }
1916
1917
1918 static void
1919 _send_event (const GValue * value, gpointer user_data)
1920 {
1921   GstPad *pad = g_value_get_object (value);
1922   GstEvent *ev = user_data;
1923
1924   gst_pad_send_event (pad, gst_event_ref (ev));
1925 }
1926
1927 /* Called with lock held when a fragment
1928  * reaches EOS and it is time to restart
1929  * a new fragment
1930  */
1931 static GstFlowReturn
1932 start_next_fragment (GstSplitMuxSink * splitmux, MqStreamCtx * ctx)
1933 {
1934   GstElement *muxer, *sink;
1935
1936   g_assert (ctx->is_reference);
1937
1938   /* 1 change to new file */
1939   splitmux->switching_fragment = TRUE;
1940
1941   /* We need to drop the splitmux lock to acquire the state lock
1942    * here and ensure there's no racy state change going on elsewhere */
1943   muxer = gst_object_ref (splitmux->muxer);
1944   sink = gst_object_ref (splitmux->active_sink);
1945
1946   GST_SPLITMUX_UNLOCK (splitmux);
1947   GST_SPLITMUX_STATE_LOCK (splitmux);
1948
1949   if (splitmux->shutdown) {
1950     GST_DEBUG_OBJECT (splitmux,
1951         "Shutdown requested. Aborting fragment switch.");
1952     GST_SPLITMUX_LOCK (splitmux);
1953     GST_SPLITMUX_STATE_UNLOCK (splitmux);
1954     gst_object_unref (muxer);
1955     gst_object_unref (sink);
1956     return GST_FLOW_FLUSHING;
1957   }
1958
1959   if (splitmux->async_finalize) {
1960     if (splitmux->muxed_out_bytes > 0
1961         || splitmux->fragment_id != splitmux->start_index) {
1962       gchar *newname;
1963       GstElement *new_sink, *new_muxer;
1964
1965       GST_DEBUG_OBJECT (splitmux, "Starting fragment %u",
1966           splitmux->fragment_id);
1967       g_list_foreach (splitmux->contexts, (GFunc) block_context, splitmux);
1968       newname = g_strdup_printf ("sink_%u", splitmux->fragment_id);
1969       GST_SPLITMUX_LOCK (splitmux);
1970       if ((splitmux->sink =
1971               create_element (splitmux, splitmux->sink_factory, newname,
1972                   TRUE)) == NULL)
1973         goto fail;
1974       if (splitmux->sink_preset && GST_IS_PRESET (splitmux->sink))
1975         gst_preset_load_preset (GST_PRESET (splitmux->sink),
1976             splitmux->sink_preset);
1977       if (splitmux->sink_properties)
1978         gst_structure_foreach (splitmux->sink_properties,
1979             _set_property_from_structure, splitmux->sink);
1980       splitmux->active_sink = splitmux->sink;
1981       g_signal_emit (splitmux, signals[SIGNAL_SINK_ADDED], 0, splitmux->sink);
1982       g_free (newname);
1983       newname = g_strdup_printf ("muxer_%u", splitmux->fragment_id);
1984       if ((splitmux->muxer =
1985               create_element (splitmux, splitmux->muxer_factory, newname,
1986                   TRUE)) == NULL)
1987         goto fail;
1988       if (g_object_class_find_property (G_OBJECT_GET_CLASS (splitmux->sink),
1989               "async") != NULL) {
1990         /* async child elements are causing state change races and weird
1991          * failures, so let's try and turn that off */
1992         g_object_set (splitmux->sink, "async", FALSE, NULL);
1993       }
1994       if (splitmux->muxer_preset && GST_IS_PRESET (splitmux->muxer))
1995         gst_preset_load_preset (GST_PRESET (splitmux->muxer),
1996             splitmux->muxer_preset);
1997       if (splitmux->muxer_properties)
1998         gst_structure_foreach (splitmux->muxer_properties,
1999             _set_property_from_structure, splitmux->muxer);
2000       g_signal_emit (splitmux, signals[SIGNAL_MUXER_ADDED], 0, splitmux->muxer);
2001       g_free (newname);
2002       new_sink = splitmux->sink;
2003       new_muxer = splitmux->muxer;
2004       GST_SPLITMUX_UNLOCK (splitmux);
2005       g_list_foreach (splitmux->contexts, (GFunc) relink_context, splitmux);
2006       gst_element_link (new_muxer, new_sink);
2007
2008       if (g_object_get_qdata ((GObject *) sink, EOS_FROM_US)) {
2009         if (GPOINTER_TO_INT (g_object_get_qdata ((GObject *) sink,
2010                     EOS_FROM_US)) == 2) {
2011           _lock_and_set_to_null (muxer, splitmux);
2012           _lock_and_set_to_null (sink, splitmux);
2013         } else {
2014           g_object_set_qdata ((GObject *) sink, EOS_FROM_US,
2015               GINT_TO_POINTER (2));
2016         }
2017       }
2018       gst_object_unref (muxer);
2019       gst_object_unref (sink);
2020       muxer = new_muxer;
2021       sink = new_sink;
2022       gst_object_ref (muxer);
2023       gst_object_ref (sink);
2024     }
2025   } else {
2026
2027     gst_element_set_locked_state (muxer, TRUE);
2028     gst_element_set_locked_state (sink, TRUE);
2029     gst_element_set_state (sink, GST_STATE_NULL);
2030
2031     if (splitmux->reset_muxer) {
2032       gst_element_set_state (muxer, GST_STATE_NULL);
2033     } else {
2034       GstIterator *it = gst_element_iterate_sink_pads (muxer);
2035       GstEvent *ev;
2036       guint32 seqnum;
2037
2038       ev = gst_event_new_flush_start ();
2039       seqnum = gst_event_get_seqnum (ev);
2040       while (gst_iterator_foreach (it, _send_event, ev) == GST_ITERATOR_RESYNC);
2041       gst_event_unref (ev);
2042
2043       gst_iterator_resync (it);
2044
2045       ev = gst_event_new_flush_stop (TRUE);
2046       gst_event_set_seqnum (ev, seqnum);
2047       while (gst_iterator_foreach (it, _send_event, ev) == GST_ITERATOR_RESYNC);
2048       gst_event_unref (ev);
2049
2050       gst_iterator_free (it);
2051     }
2052   }
2053
2054   GST_SPLITMUX_LOCK (splitmux);
2055   set_next_filename (splitmux, ctx);
2056   splitmux->muxed_out_bytes = 0;
2057   GST_SPLITMUX_UNLOCK (splitmux);
2058
2059   if (gst_element_set_state (sink,
2060           GST_STATE_TARGET (splitmux)) == GST_STATE_CHANGE_FAILURE) {
2061     gst_element_set_state (sink, GST_STATE_NULL);
2062     gst_element_set_locked_state (muxer, FALSE);
2063     gst_element_set_locked_state (sink, FALSE);
2064
2065     goto fail_output;
2066   }
2067
2068   if (gst_element_set_state (muxer,
2069           GST_STATE_TARGET (splitmux)) == GST_STATE_CHANGE_FAILURE) {
2070     gst_element_set_state (muxer, GST_STATE_NULL);
2071     gst_element_set_state (sink, GST_STATE_NULL);
2072     gst_element_set_locked_state (muxer, FALSE);
2073     gst_element_set_locked_state (sink, FALSE);
2074     goto fail_muxer;
2075   }
2076
2077   gst_element_set_locked_state (muxer, FALSE);
2078   gst_element_set_locked_state (sink, FALSE);
2079
2080   gst_object_unref (sink);
2081   gst_object_unref (muxer);
2082
2083   GST_SPLITMUX_LOCK (splitmux);
2084   GST_SPLITMUX_STATE_UNLOCK (splitmux);
2085   splitmux->switching_fragment = FALSE;
2086   do_async_done (splitmux);
2087
2088   splitmux->ready_for_output = TRUE;
2089
2090   g_list_foreach (splitmux->contexts, (GFunc) unlock_context, splitmux);
2091   g_list_foreach (splitmux->contexts, (GFunc) restart_context, splitmux);
2092
2093   send_fragment_opened_closed_msg (splitmux, TRUE, sink);
2094
2095   /* FIXME: Is this always the correct next state? */
2096   GST_LOG_OBJECT (splitmux, "Resetting state to AWAITING_COMMAND");
2097   splitmux->output_state = SPLITMUX_OUTPUT_STATE_AWAITING_COMMAND;
2098   GST_SPLITMUX_BROADCAST_OUTPUT (splitmux);
2099   return GST_FLOW_OK;
2100
2101 fail:
2102   gst_object_unref (sink);
2103   gst_object_unref (muxer);
2104
2105   GST_SPLITMUX_LOCK (splitmux);
2106   GST_SPLITMUX_STATE_UNLOCK (splitmux);
2107   GST_ELEMENT_ERROR (splitmux, RESOURCE, SETTINGS,
2108       ("Could not create the new muxer/sink"), NULL);
2109   return GST_FLOW_ERROR;
2110
2111 fail_output:
2112   GST_ELEMENT_ERROR (splitmux, RESOURCE, SETTINGS,
2113       ("Could not start new output sink"), NULL);
2114   gst_object_unref (sink);
2115   gst_object_unref (muxer);
2116
2117   GST_SPLITMUX_LOCK (splitmux);
2118   GST_SPLITMUX_STATE_UNLOCK (splitmux);
2119   splitmux->switching_fragment = FALSE;
2120   return GST_FLOW_ERROR;
2121
2122 fail_muxer:
2123   GST_ELEMENT_ERROR (splitmux, RESOURCE, SETTINGS,
2124       ("Could not start new muxer"), NULL);
2125   gst_object_unref (sink);
2126   gst_object_unref (muxer);
2127
2128   GST_SPLITMUX_LOCK (splitmux);
2129   GST_SPLITMUX_STATE_UNLOCK (splitmux);
2130   splitmux->switching_fragment = FALSE;
2131   return GST_FLOW_ERROR;
2132 }
2133
2134 static void
2135 bus_handler (GstBin * bin, GstMessage * message)
2136 {
2137   GstSplitMuxSink *splitmux = GST_SPLITMUX_SINK (bin);
2138
2139   switch (GST_MESSAGE_TYPE (message)) {
2140     case GST_MESSAGE_EOS:{
2141       /* If the state is draining out the current file, drop this EOS */
2142       GstElement *sink;
2143
2144       sink = GST_ELEMENT (GST_MESSAGE_SRC (message));
2145       GST_SPLITMUX_LOCK (splitmux);
2146
2147       send_fragment_opened_closed_msg (splitmux, FALSE, sink);
2148
2149       if (splitmux->async_finalize) {
2150
2151         if (g_object_get_qdata ((GObject *) sink, EOS_FROM_US)) {
2152           if (GPOINTER_TO_INT (g_object_get_qdata ((GObject *) sink,
2153                       EOS_FROM_US)) == 2) {
2154             GstElement *muxer;
2155             GstPad *sinksink, *muxersrc;
2156
2157             sinksink = gst_element_get_static_pad (sink, "sink");
2158             muxersrc = gst_pad_get_peer (sinksink);
2159             muxer = gst_pad_get_parent_element (muxersrc);
2160             gst_object_unref (sinksink);
2161             gst_object_unref (muxersrc);
2162
2163             gst_element_call_async (muxer,
2164                 (GstElementCallAsyncFunc) _lock_and_set_to_null,
2165                 gst_object_ref (splitmux), gst_object_unref);
2166             gst_element_call_async (sink,
2167                 (GstElementCallAsyncFunc) _lock_and_set_to_null,
2168                 gst_object_ref (splitmux), gst_object_unref);
2169             gst_object_unref (muxer);
2170           } else {
2171             g_object_set_qdata ((GObject *) sink, EOS_FROM_US,
2172                 GINT_TO_POINTER (2));
2173           }
2174           GST_DEBUG_OBJECT (splitmux,
2175               "Caught async EOS from previous muxer+sink. Dropping.");
2176           /* We forward the EOS so that it gets aggregated as normal. If the sink
2177            * finishes and is removed before the end, it will be de-aggregated */
2178           gst_message_unref (message);
2179           GST_SPLITMUX_UNLOCK (splitmux);
2180           return;
2181         }
2182       } else if (splitmux->output_state == SPLITMUX_OUTPUT_STATE_ENDING_STREAM) {
2183         GST_DEBUG_OBJECT (splitmux,
2184             "Passing EOS message. Output state %d max_out_running_time %"
2185             GST_STIME_FORMAT, splitmux->output_state,
2186             GST_STIME_ARGS (splitmux->max_out_running_time));
2187       } else {
2188         GST_DEBUG_OBJECT (splitmux, "Caught EOS at end of fragment, dropping");
2189         splitmux->output_state = SPLITMUX_OUTPUT_STATE_START_NEXT_FILE;
2190         GST_SPLITMUX_BROADCAST_OUTPUT (splitmux);
2191
2192         gst_message_unref (message);
2193         GST_SPLITMUX_UNLOCK (splitmux);
2194         return;
2195       }
2196       GST_SPLITMUX_UNLOCK (splitmux);
2197       break;
2198     }
2199     case GST_MESSAGE_ASYNC_START:
2200     case GST_MESSAGE_ASYNC_DONE:
2201       /* Ignore state changes from our children while switching */
2202       GST_SPLITMUX_LOCK (splitmux);
2203       if (splitmux->switching_fragment) {
2204         if (GST_MESSAGE_SRC (message) == (GstObject *) splitmux->active_sink
2205             || GST_MESSAGE_SRC (message) == (GstObject *) splitmux->muxer) {
2206           GST_LOG_OBJECT (splitmux,
2207               "Ignoring state change from child %" GST_PTR_FORMAT
2208               " while switching", GST_MESSAGE_SRC (message));
2209           gst_message_unref (message);
2210           GST_SPLITMUX_UNLOCK (splitmux);
2211           return;
2212         }
2213       }
2214       GST_SPLITMUX_UNLOCK (splitmux);
2215       break;
2216     case GST_MESSAGE_WARNING:
2217     {
2218       GError *gerror = NULL;
2219
2220       gst_message_parse_warning (message, &gerror, NULL);
2221
2222       if (g_error_matches (gerror, GST_STREAM_ERROR, GST_STREAM_ERROR_FORMAT)) {
2223         GList *item;
2224         gboolean caps_change = FALSE;
2225
2226         GST_SPLITMUX_LOCK (splitmux);
2227
2228         for (item = splitmux->contexts; item; item = item->next) {
2229           MqStreamCtx *ctx = item->data;
2230
2231           if (ctx->caps_change) {
2232             caps_change = TRUE;
2233             break;
2234           }
2235         }
2236
2237         GST_SPLITMUX_UNLOCK (splitmux);
2238
2239         if (caps_change) {
2240           GST_LOG_OBJECT (splitmux,
2241               "Ignoring warning change from child %" GST_PTR_FORMAT
2242               " while switching caps", GST_MESSAGE_SRC (message));
2243           gst_message_unref (message);
2244           return;
2245         }
2246       }
2247       break;
2248     }
2249     default:
2250       break;
2251   }
2252
2253   GST_BIN_CLASS (parent_class)->handle_message (bin, message);
2254 }
2255
2256 static void
2257 ctx_set_unblock (MqStreamCtx * ctx)
2258 {
2259   ctx->need_unblock = TRUE;
2260 }
2261
2262 static gboolean
2263 need_new_fragment (GstSplitMuxSink * splitmux,
2264     GstClockTime queued_time, GstClockTime queued_gop_time,
2265     guint64 queued_bytes)
2266 {
2267   guint64 thresh_bytes;
2268   GstClockTime thresh_time;
2269   gboolean check_robust_muxing;
2270   GstClockTime time_to_split = GST_CLOCK_TIME_NONE;
2271   GstClockTime *ptr_to_time;
2272
2273   GST_OBJECT_LOCK (splitmux);
2274   thresh_bytes = splitmux->threshold_bytes;
2275   thresh_time = splitmux->threshold_time;
2276   ptr_to_time = (GstClockTime *)
2277       gst_queue_array_peek_head_struct (splitmux->times_to_split);
2278   if (ptr_to_time)
2279     time_to_split = *ptr_to_time;
2280   check_robust_muxing = splitmux->use_robust_muxing
2281       && splitmux->muxer_has_reserved_props;
2282   GST_OBJECT_UNLOCK (splitmux);
2283
2284   /* Have we muxed at least one thing from the reference
2285    * stream into the file? If not, no other streams can have
2286    * either */
2287   if (splitmux->fragment_reference_bytes <= 0) {
2288     GST_TRACE_OBJECT (splitmux,
2289         "Not ready to split - nothing muxed on the reference stream");
2290     return FALSE;
2291   }
2292
2293   /* User told us to split now */
2294   if (g_atomic_int_get (&(splitmux->do_split_next_gop)) == TRUE) {
2295     GST_TRACE_OBJECT (splitmux, "Forcing because split_next_gop is set");
2296     return TRUE;
2297   }
2298
2299   /* User told us to split at this running time */
2300   if (splitmux->gop_start_time >= time_to_split) {
2301     GST_OBJECT_LOCK (splitmux);
2302     /* Dequeue running time */
2303     gst_queue_array_pop_head_struct (splitmux->times_to_split);
2304     /* Empty any running times after this that are past now */
2305     ptr_to_time = gst_queue_array_peek_head_struct (splitmux->times_to_split);
2306     while (ptr_to_time) {
2307       time_to_split = *ptr_to_time;
2308       if (splitmux->gop_start_time < time_to_split) {
2309         break;
2310       }
2311       gst_queue_array_pop_head_struct (splitmux->times_to_split);
2312       ptr_to_time = gst_queue_array_peek_head_struct (splitmux->times_to_split);
2313     }
2314     GST_TRACE_OBJECT (splitmux,
2315         "GOP start time %" GST_STIME_FORMAT " is after requested split point %"
2316         GST_STIME_FORMAT, GST_STIME_ARGS (splitmux->gop_start_time),
2317         GST_STIME_ARGS (time_to_split));
2318     GST_OBJECT_UNLOCK (splitmux);
2319     return TRUE;
2320   }
2321
2322   if (thresh_bytes > 0 && queued_bytes > thresh_bytes) {
2323     GST_TRACE_OBJECT (splitmux,
2324         "queued bytes %" G_GUINT64_FORMAT " overruns byte limit", queued_bytes);
2325     return TRUE;                /* Would overrun byte limit */
2326   }
2327
2328   if (thresh_time > 0 && queued_time > thresh_time) {
2329     GST_TRACE_OBJECT (splitmux,
2330         "queued time %" GST_STIME_FORMAT " overruns time limit",
2331         GST_STIME_ARGS (queued_time));
2332     return TRUE;                /* Would overrun time limit */
2333   }
2334
2335   if (splitmux->tc_interval &&
2336       GST_CLOCK_TIME_IS_VALID (splitmux->next_fragment_start_tc_time) &&
2337       splitmux->reference_ctx->in_running_time >
2338       splitmux->next_fragment_start_tc_time + 5 * GST_USECOND) {
2339     GST_TRACE_OBJECT (splitmux,
2340         "in running time %" GST_STIME_FORMAT " overruns time limit %"
2341         GST_TIME_FORMAT,
2342         GST_STIME_ARGS (splitmux->reference_ctx->in_running_time),
2343         GST_TIME_ARGS (splitmux->next_fragment_start_tc_time));
2344     return TRUE;
2345   }
2346
2347   if (check_robust_muxing) {
2348     GstClockTime mux_reserved_remain;
2349
2350     g_object_get (splitmux->muxer,
2351         "reserved-duration-remaining", &mux_reserved_remain, NULL);
2352
2353     GST_LOG_OBJECT (splitmux,
2354         "Muxer robust muxing report - %" G_GUINT64_FORMAT
2355         " remaining. New GOP would enqueue %" G_GUINT64_FORMAT,
2356         mux_reserved_remain, queued_gop_time);
2357
2358     if (queued_gop_time >= mux_reserved_remain) {
2359       GST_INFO_OBJECT (splitmux,
2360           "File is about to run out of header room - %" G_GUINT64_FORMAT
2361           " remaining. New GOP would enqueue %" G_GUINT64_FORMAT
2362           ". Switching to new file", mux_reserved_remain, queued_gop_time);
2363       return TRUE;
2364     }
2365   }
2366
2367   /* Continue and mux this GOP */
2368   return FALSE;
2369 }
2370
2371 /* probably we want to add this API? */
2372 static void
2373 video_time_code_replace (GstVideoTimeCode ** old_tc, GstVideoTimeCode * new_tc)
2374 {
2375   GstVideoTimeCode *timecode = NULL;
2376
2377   g_return_if_fail (old_tc != NULL);
2378
2379   if (*old_tc == new_tc)
2380     return;
2381
2382   if (new_tc)
2383     timecode = gst_video_time_code_copy (new_tc);
2384
2385   if (*old_tc)
2386     gst_video_time_code_free (*old_tc);
2387
2388   *old_tc = timecode;
2389 }
2390
2391 /* Called with splitmux lock held */
2392 /* Called when entering ProcessingCompleteGop state
2393  * Assess if mq contents overflowed the current file
2394  *   -> If yes, need to switch to new file
2395  *   -> if no, set max_out_running_time to let this GOP in and
2396  *      go to COLLECTING_GOP_START state
2397  */
2398 static void
2399 handle_gathered_gop (GstSplitMuxSink * splitmux)
2400 {
2401   guint64 queued_bytes;
2402   GstClockTimeDiff queued_time = 0;
2403   GstClockTimeDiff queued_gop_time = 0;
2404   GstClockTimeDiff new_out_ts = splitmux->reference_ctx->in_running_time;
2405   SplitMuxOutputCommand *cmd;
2406
2407   /* Assess if the multiqueue contents overflowed the current file */
2408   /* When considering if a newly gathered GOP overflows
2409    * the time limit for the file, only consider the running time of the
2410    * reference stream. Other streams might have run ahead a little bit,
2411    * but extra pieces won't be released to the muxer beyond the reference
2412    * stream cut-off anyway - so it forms the limit. */
2413   queued_bytes = splitmux->fragment_total_bytes + splitmux->gop_total_bytes;
2414   queued_time = splitmux->reference_ctx->in_running_time;
2415   /* queued_gop_time tracks how much unwritten data there is waiting to
2416    * be written to this fragment including this GOP */
2417   if (splitmux->reference_ctx->out_running_time != GST_CLOCK_STIME_NONE)
2418     queued_gop_time =
2419         splitmux->reference_ctx->in_running_time -
2420         splitmux->reference_ctx->out_running_time;
2421   else
2422     queued_gop_time =
2423         splitmux->reference_ctx->in_running_time - splitmux->gop_start_time;
2424
2425   GST_LOG_OBJECT (splitmux, " queued_bytes %" G_GUINT64_FORMAT, queued_bytes);
2426   GST_LOG_OBJECT (splitmux, "mq at TS %" GST_STIME_FORMAT
2427       " bytes %" G_GUINT64_FORMAT " in running time %" GST_STIME_FORMAT
2428       " gop start time %" GST_STIME_FORMAT,
2429       GST_STIME_ARGS (queued_time), queued_bytes,
2430       GST_STIME_ARGS (splitmux->reference_ctx->in_running_time),
2431       GST_STIME_ARGS (splitmux->gop_start_time));
2432
2433   if (queued_gop_time < 0)
2434     goto error_gop_duration;
2435
2436   if (queued_time < splitmux->fragment_start_time)
2437     goto error_queued_time;
2438
2439   queued_time -= splitmux->fragment_start_time;
2440   if (queued_time < queued_gop_time)
2441     queued_gop_time = queued_time;
2442
2443   /* Expand queued bytes estimate by muxer overhead */
2444   queued_bytes += (queued_bytes * splitmux->mux_overhead);
2445
2446   /* Check for overrun - have we output at least one byte and overrun
2447    * either threshold? */
2448   if (need_new_fragment (splitmux, queued_time, queued_gop_time, queued_bytes)) {
2449     if (splitmux->async_finalize) {
2450       GstClockTime *sink_running_time = g_new (GstClockTime, 1);
2451       *sink_running_time = splitmux->reference_ctx->out_running_time;
2452       g_object_set_qdata_full (G_OBJECT (splitmux->sink),
2453           RUNNING_TIME, sink_running_time, g_free);
2454     }
2455     g_atomic_int_set (&(splitmux->do_split_next_gop), FALSE);
2456     /* Tell the output side to start a new fragment */
2457     GST_INFO_OBJECT (splitmux,
2458         "This GOP (dur %" GST_STIME_FORMAT
2459         ") would overflow the fragment, Sending start_new_fragment cmd",
2460         GST_STIME_ARGS (splitmux->reference_ctx->in_running_time -
2461             splitmux->gop_start_time));
2462     cmd = out_cmd_buf_new ();
2463     cmd->start_new_fragment = TRUE;
2464     g_queue_push_head (&splitmux->out_cmd_q, cmd);
2465     GST_SPLITMUX_BROADCAST_OUTPUT (splitmux);
2466
2467     new_out_ts = splitmux->reference_ctx->in_running_time;
2468     splitmux->fragment_start_time = splitmux->gop_start_time;
2469     splitmux->fragment_total_bytes = 0;
2470     splitmux->fragment_reference_bytes = 0;
2471
2472     if (splitmux->tc_interval) {
2473       video_time_code_replace (&splitmux->fragment_start_tc,
2474           splitmux->gop_start_tc);
2475       splitmux->next_fragment_start_tc_time =
2476           calculate_next_max_timecode (splitmux, splitmux->fragment_start_tc,
2477           splitmux->fragment_start_time, NULL);
2478       if (!GST_CLOCK_TIME_IS_VALID (splitmux->next_fragment_start_tc_time)) {
2479         GST_WARNING_OBJECT (splitmux,
2480             "Couldn't calculate next fragment start time for timecode mode");
2481         /* shouldn't happen, but reset all and try again with next buffers */
2482         gst_splitmux_reset_timecode (splitmux);
2483       }
2484     }
2485   }
2486
2487   /* And set up to collect the next GOP */
2488   if (!splitmux->reference_ctx->in_eos) {
2489     splitmux->input_state = SPLITMUX_INPUT_STATE_COLLECTING_GOP_START;
2490     splitmux->gop_start_time = new_out_ts;
2491     if (splitmux->tc_interval)
2492       video_time_code_replace (&splitmux->gop_start_tc, splitmux->in_tc);
2493   } else {
2494     /* This is probably already the current state, but just in case: */
2495     splitmux->input_state = SPLITMUX_INPUT_STATE_FINISHING_UP;
2496     new_out_ts = GST_CLOCK_STIME_NONE;  /* EOS runs until forever */
2497   }
2498
2499   /* And wake all input contexts to send a wake-up event */
2500   g_list_foreach (splitmux->contexts, (GFunc) ctx_set_unblock, NULL);
2501   GST_SPLITMUX_BROADCAST_INPUT (splitmux);
2502
2503   /* Now either way - either there was no overflow, or we requested a new fragment: release this GOP */
2504   splitmux->fragment_total_bytes += splitmux->gop_total_bytes;
2505   splitmux->fragment_reference_bytes += splitmux->gop_reference_bytes;
2506
2507   if (splitmux->gop_total_bytes > 0) {
2508     GST_LOG_OBJECT (splitmux,
2509         "Releasing GOP to output. Bytes in fragment now %" G_GUINT64_FORMAT
2510         " time %" GST_STIME_FORMAT,
2511         splitmux->fragment_total_bytes, GST_STIME_ARGS (queued_time));
2512
2513     /* Send this GOP to the output command queue */
2514     cmd = out_cmd_buf_new ();
2515     cmd->start_new_fragment = FALSE;
2516     cmd->max_output_ts = new_out_ts;
2517     GST_LOG_OBJECT (splitmux, "Sending GOP cmd to output for TS %"
2518         GST_STIME_FORMAT, GST_STIME_ARGS (new_out_ts));
2519     g_queue_push_head (&splitmux->out_cmd_q, cmd);
2520
2521     GST_SPLITMUX_BROADCAST_OUTPUT (splitmux);
2522   }
2523
2524   splitmux->gop_total_bytes = 0;
2525   splitmux->gop_reference_bytes = 0;
2526   return;
2527
2528 error_gop_duration:
2529   GST_ELEMENT_ERROR (splitmux,
2530       STREAM, FAILED, ("Timestamping error on input streams"),
2531       ("Queued GOP time is negative %" GST_STIME_FORMAT,
2532           GST_STIME_ARGS (queued_gop_time)));
2533   return;
2534 error_queued_time:
2535   GST_ELEMENT_ERROR (splitmux,
2536       STREAM, FAILED, ("Timestamping error on input streams"),
2537       ("Queued time is negative. Input went backwards. queued_time - %"
2538           GST_STIME_FORMAT, GST_STIME_ARGS (queued_time)));
2539   return;
2540 }
2541
2542 /* Called with splitmux lock held */
2543 /* Called from each input pad when it is has all the pieces
2544  * for a GOP or EOS, starting with the reference pad which has set the
2545  * splitmux->max_in_running_time
2546  */
2547 static void
2548 check_completed_gop (GstSplitMuxSink * splitmux, MqStreamCtx * ctx)
2549 {
2550   GList *cur;
2551   GstEvent *event;
2552
2553   /* On ENDING_FILE, the reference stream sends a command to start a new
2554    * fragment, then releases the GOP for output in the new fragment.
2555    *  If some streams received no buffer during the last GOP that overran,
2556    * because its next buffer has a timestamp bigger than
2557    * ctx->max_in_running_time, its queue is empty. In that case the only
2558    * way to wakeup the output thread is by injecting an event in the
2559    * queue. This usually happen with subtitle streams.
2560    * See https://bugzilla.gnome.org/show_bug.cgi?id=763711. */
2561   if (ctx->need_unblock) {
2562     GST_LOG_OBJECT (ctx->sinkpad, "Sending splitmuxsink-unblock event");
2563     event = gst_event_new_custom (GST_EVENT_CUSTOM_DOWNSTREAM |
2564         GST_EVENT_TYPE_SERIALIZED,
2565         gst_structure_new ("splitmuxsink-unblock", "timestamp",
2566             G_TYPE_INT64, splitmux->max_in_running_time, NULL));
2567
2568     GST_SPLITMUX_UNLOCK (splitmux);
2569     gst_pad_send_event (ctx->sinkpad, event);
2570     GST_SPLITMUX_LOCK (splitmux);
2571
2572     ctx->need_unblock = FALSE;
2573     GST_SPLITMUX_BROADCAST_INPUT (splitmux);
2574     /* state may have changed while we were unlocked. Loop again if so */
2575     if (splitmux->input_state != SPLITMUX_INPUT_STATE_WAITING_GOP_COLLECT)
2576       return;
2577   }
2578
2579   do {
2580     if (splitmux->input_state == SPLITMUX_INPUT_STATE_WAITING_GOP_COLLECT) {
2581       gboolean ready = TRUE;
2582
2583       /* Iterate each pad, and check that the input running time is at least
2584        * up to the reference running time, and if so handle the collected GOP */
2585       GST_LOG_OBJECT (splitmux, "Checking GOP collected, Max in running time %"
2586           GST_STIME_FORMAT " ctx %p",
2587           GST_STIME_ARGS (splitmux->max_in_running_time), ctx);
2588       for (cur = g_list_first (splitmux->contexts); cur != NULL;
2589           cur = g_list_next (cur)) {
2590         MqStreamCtx *tmpctx = (MqStreamCtx *) (cur->data);
2591
2592         GST_LOG_OBJECT (splitmux,
2593             "Context %p sink pad %" GST_PTR_FORMAT " @ TS %" GST_STIME_FORMAT
2594             " EOS %d", tmpctx, tmpctx->sinkpad,
2595             GST_STIME_ARGS (tmpctx->in_running_time), tmpctx->in_eos);
2596
2597         if (splitmux->max_in_running_time != GST_CLOCK_STIME_NONE &&
2598             tmpctx->in_running_time < splitmux->max_in_running_time &&
2599             !tmpctx->in_eos) {
2600           GST_LOG_OBJECT (splitmux,
2601               "Context %p sink pad %" GST_PTR_FORMAT " not ready. We'll sleep",
2602               tmpctx, tmpctx->sinkpad);
2603           ready = FALSE;
2604           break;
2605         }
2606       }
2607       if (ready) {
2608         GST_DEBUG_OBJECT (splitmux,
2609             "Collected GOP is complete. Processing (ctx %p)", ctx);
2610         /* All pads have a complete GOP, release it into the multiqueue */
2611         handle_gathered_gop (splitmux);
2612
2613         /* The user has requested a split, we can split now that the previous GOP
2614          * has been collected to the correct location */
2615         if (g_atomic_int_compare_and_exchange (&(splitmux->split_requested),
2616                 TRUE, FALSE)) {
2617           g_atomic_int_set (&(splitmux->do_split_next_gop), TRUE);
2618         }
2619       }
2620     }
2621
2622     /* If upstream reached EOS we are not expecting more data, no need to wait
2623      * here. */
2624     if (ctx->in_eos)
2625       return;
2626
2627     if (splitmux->input_state == SPLITMUX_INPUT_STATE_WAITING_GOP_COLLECT &&
2628         !ctx->flushing &&
2629         (ctx->in_running_time >= splitmux->max_in_running_time) &&
2630         (splitmux->max_in_running_time != GST_CLOCK_STIME_NONE)) {
2631       /* Some pad is not yet ready, or GOP is being pushed
2632        * either way, sleep and wait to get woken */
2633       GST_LOG_OBJECT (splitmux, "Sleeping for GOP collection (ctx %p)", ctx);
2634       GST_SPLITMUX_WAIT_INPUT (splitmux);
2635       GST_LOG_OBJECT (splitmux, "Done waiting for complete GOP (ctx %p)", ctx);
2636     } else {
2637       /* This pad is not ready or the state changed - break out and get another
2638        * buffer / event */
2639       break;
2640     }
2641   } while (splitmux->input_state == SPLITMUX_INPUT_STATE_WAITING_GOP_COLLECT);
2642 }
2643
2644 static GstPadProbeReturn
2645 handle_mq_input (GstPad * pad, GstPadProbeInfo * info, MqStreamCtx * ctx)
2646 {
2647   GstSplitMuxSink *splitmux = ctx->splitmux;
2648   GstFlowReturn ret = GST_FLOW_OK;
2649   GstBuffer *buf;
2650   MqStreamBuf *buf_info = NULL;
2651   GstClockTime ts;
2652   gboolean loop_again;
2653   gboolean keyframe = FALSE;
2654
2655   GST_LOG_OBJECT (pad, "Fired probe type 0x%x", info->type);
2656
2657   /* FIXME: Handle buffer lists, until then make it clear they won't work */
2658   if (info->type & GST_PAD_PROBE_TYPE_BUFFER_LIST) {
2659     g_warning ("Buffer list handling not implemented");
2660     return GST_PAD_PROBE_DROP;
2661   }
2662   if (info->type & GST_PAD_PROBE_TYPE_EVENT_DOWNSTREAM ||
2663       info->type & GST_PAD_PROBE_TYPE_EVENT_FLUSH) {
2664     GstEvent *event = gst_pad_probe_info_get_event (info);
2665
2666     GST_LOG_OBJECT (pad, "Event %" GST_PTR_FORMAT, event);
2667
2668     switch (GST_EVENT_TYPE (event)) {
2669       case GST_EVENT_SEGMENT:
2670         gst_event_copy_segment (event, &ctx->in_segment);
2671         break;
2672       case GST_EVENT_FLUSH_STOP:
2673         GST_SPLITMUX_LOCK (splitmux);
2674         gst_segment_init (&ctx->in_segment, GST_FORMAT_UNDEFINED);
2675         ctx->in_eos = FALSE;
2676         ctx->in_running_time = GST_CLOCK_STIME_NONE;
2677         GST_SPLITMUX_UNLOCK (splitmux);
2678         break;
2679       case GST_EVENT_EOS:
2680         GST_SPLITMUX_LOCK (splitmux);
2681         ctx->in_eos = TRUE;
2682
2683         if (splitmux->input_state == SPLITMUX_INPUT_STATE_STOPPED) {
2684           ret = GST_FLOW_FLUSHING;
2685           goto beach;
2686         }
2687
2688         if (ctx->is_reference) {
2689           GST_INFO_OBJECT (splitmux, "Got Reference EOS. Finishing up");
2690           /* check_completed_gop will act as if this is a new keyframe with infinite timestamp */
2691           splitmux->input_state = SPLITMUX_INPUT_STATE_WAITING_GOP_COLLECT;
2692           /* Wake up other input pads to collect this GOP */
2693           GST_SPLITMUX_BROADCAST_INPUT (splitmux);
2694           check_completed_gop (splitmux, ctx);
2695         } else if (splitmux->input_state ==
2696             SPLITMUX_INPUT_STATE_WAITING_GOP_COLLECT) {
2697           /* If we are waiting for a GOP to be completed (ie, for aux
2698            * pads to catch up), then this pad is complete, so check
2699            * if the whole GOP is.
2700            */
2701           check_completed_gop (splitmux, ctx);
2702         }
2703         GST_SPLITMUX_UNLOCK (splitmux);
2704         break;
2705       case GST_EVENT_GAP:{
2706         GstClockTime gap_ts;
2707         GstClockTimeDiff rtime;
2708
2709         gst_event_parse_gap (event, &gap_ts, NULL);
2710         if (gap_ts == GST_CLOCK_TIME_NONE)
2711           break;
2712
2713         GST_SPLITMUX_LOCK (splitmux);
2714
2715         if (splitmux->input_state == SPLITMUX_INPUT_STATE_STOPPED) {
2716           ret = GST_FLOW_FLUSHING;
2717           goto beach;
2718         }
2719         rtime = my_segment_to_running_time (&ctx->in_segment, gap_ts);
2720
2721         GST_LOG_OBJECT (pad, "Have GAP w/ ts %" GST_STIME_FORMAT,
2722             GST_STIME_ARGS (rtime));
2723
2724         if (ctx->is_reference
2725             && splitmux->fragment_start_time == GST_CLOCK_STIME_NONE) {
2726           splitmux->gop_start_time = splitmux->fragment_start_time = rtime;
2727           GST_LOG_OBJECT (splitmux, "Mux start time now %" GST_STIME_FORMAT,
2728               GST_STIME_ARGS (splitmux->fragment_start_time));
2729           /* Also take this as the first start time when starting up,
2730            * so that we start counting overflow from the first frame */
2731           if (!GST_CLOCK_STIME_IS_VALID (splitmux->max_in_running_time))
2732             splitmux->max_in_running_time = splitmux->fragment_start_time;
2733         }
2734
2735         GST_SPLITMUX_UNLOCK (splitmux);
2736         break;
2737       }
2738       default:
2739         break;
2740     }
2741     return GST_PAD_PROBE_PASS;
2742   } else if (info->type & GST_PAD_PROBE_TYPE_QUERY_DOWNSTREAM) {
2743     switch (GST_QUERY_TYPE (GST_QUERY (info->data))) {
2744       case GST_QUERY_ALLOCATION:
2745         return GST_PAD_PROBE_DROP;
2746       default:
2747         return GST_PAD_PROBE_PASS;
2748     }
2749   }
2750
2751   buf = gst_pad_probe_info_get_buffer (info);
2752   buf_info = mq_stream_buf_new ();
2753
2754   if (GST_BUFFER_PTS_IS_VALID (buf))
2755     ts = GST_BUFFER_PTS (buf);
2756   else
2757     ts = GST_BUFFER_DTS (buf);
2758
2759   GST_LOG_OBJECT (pad, "Buffer TS is %" GST_TIME_FORMAT, GST_TIME_ARGS (ts));
2760
2761   GST_SPLITMUX_LOCK (splitmux);
2762
2763   if (splitmux->input_state == SPLITMUX_INPUT_STATE_STOPPED) {
2764     ret = GST_FLOW_FLUSHING;
2765     goto beach;
2766   }
2767
2768   /* If this buffer has a timestamp, advance the input timestamp of the
2769    * stream */
2770   if (GST_CLOCK_TIME_IS_VALID (ts)) {
2771     GstClockTimeDiff running_time =
2772         my_segment_to_running_time (&ctx->in_segment, ts);
2773
2774     GST_LOG_OBJECT (pad, "Buffer running TS is %" GST_STIME_FORMAT,
2775         GST_STIME_ARGS (running_time));
2776
2777     if (GST_CLOCK_STIME_IS_VALID (running_time)
2778         && running_time > ctx->in_running_time)
2779       ctx->in_running_time = running_time;
2780   }
2781
2782   /* Try to make sure we have a valid running time */
2783   if (!GST_CLOCK_STIME_IS_VALID (ctx->in_running_time)) {
2784     ctx->in_running_time =
2785         my_segment_to_running_time (&ctx->in_segment, ctx->in_segment.start);
2786   }
2787
2788   GST_LOG_OBJECT (pad, "in running time now %" GST_STIME_FORMAT,
2789       GST_STIME_ARGS (ctx->in_running_time));
2790
2791   buf_info->run_ts = ctx->in_running_time;
2792   buf_info->buf_size = gst_buffer_get_size (buf);
2793   buf_info->duration = GST_BUFFER_DURATION (buf);
2794
2795   if (ctx->is_reference) {
2796     /* initialize fragment_start_time */
2797     if (splitmux->fragment_start_time == GST_CLOCK_STIME_NONE) {
2798       splitmux->gop_start_time = splitmux->fragment_start_time =
2799           buf_info->run_ts;
2800       GST_LOG_OBJECT (splitmux, "Mux start time now %" GST_STIME_FORMAT,
2801           GST_STIME_ARGS (splitmux->fragment_start_time));
2802
2803       /* Also take this as the first start time when starting up,
2804        * so that we start counting overflow from the first frame */
2805       if (!GST_CLOCK_STIME_IS_VALID (splitmux->max_in_running_time))
2806         splitmux->max_in_running_time = splitmux->fragment_start_time;
2807     }
2808
2809     if (splitmux->tc_interval) {
2810       GstVideoTimeCodeMeta *tc_meta = gst_buffer_get_video_time_code_meta (buf);
2811       if (tc_meta) {
2812         video_time_code_replace (&splitmux->in_tc, &tc_meta->tc);
2813
2814         if (!splitmux->fragment_start_tc) {
2815           /* also initialize fragment_start_tc */
2816           video_time_code_replace (&splitmux->gop_start_tc, &tc_meta->tc);
2817           video_time_code_replace (&splitmux->fragment_start_tc, &tc_meta->tc);
2818
2819           splitmux->next_fragment_start_tc_time =
2820               calculate_next_max_timecode (splitmux, splitmux->in_tc,
2821               ctx->in_running_time, NULL);
2822           GST_DEBUG_OBJECT (splitmux, "Initialize next fragment start tc time %"
2823               GST_TIME_FORMAT,
2824               GST_TIME_ARGS (splitmux->next_fragment_start_tc_time));
2825         }
2826       }
2827     }
2828
2829     /* Check whether we need to request next keyframe depending on
2830      * current running time */
2831     if (!GST_BUFFER_FLAG_IS_SET (buf, GST_BUFFER_FLAG_DELTA_UNIT) &&
2832         request_next_keyframe (splitmux, buf, ctx->in_running_time) == FALSE) {
2833       GST_WARNING_OBJECT (splitmux,
2834           "Could not request a keyframe. Files may not split at the exact location they should");
2835     }
2836   }
2837
2838   GST_DEBUG_OBJECT (pad, "Buf TS %" GST_STIME_FORMAT
2839       " total GOP bytes %" G_GUINT64_FORMAT,
2840       GST_STIME_ARGS (buf_info->run_ts), splitmux->gop_total_bytes);
2841
2842   loop_again = TRUE;
2843   do {
2844     if (ctx->flushing)
2845       break;
2846
2847     switch (splitmux->input_state) {
2848       case SPLITMUX_INPUT_STATE_COLLECTING_GOP_START:
2849         if (ctx->is_releasing) {
2850           /* The pad belonging to this context is being released */
2851           GST_WARNING_OBJECT (pad, "Pad is being released while the muxer is "
2852               "running. Data might not drain correctly");
2853           loop_again = FALSE;
2854         } else if (ctx->is_reference) {
2855           /* This is the reference context. If it's a keyframe,
2856            * it marks the start of a new GOP and we should wait in
2857            * check_completed_gop before continuing, but either way
2858            * (keyframe or no, we'll pass this buffer through after
2859            * so set loop_again to FALSE */
2860           loop_again = FALSE;
2861
2862           if (GST_BUFFER_FLAG_IS_SET (buf, GST_BUFFER_FLAG_DELTA_UNIT)) {
2863             /* Allow other input pads to catch up to here too */
2864             if (ctx->in_running_time > splitmux->max_in_running_time)
2865               splitmux->max_in_running_time = ctx->in_running_time;
2866             GST_LOG_OBJECT (splitmux,
2867                 "Max in running time now %" GST_TIME_FORMAT,
2868                 GST_TIME_ARGS (splitmux->max_in_running_time));
2869             GST_SPLITMUX_BROADCAST_INPUT (splitmux);
2870             break;
2871           }
2872           GST_INFO_OBJECT (pad,
2873               "Have keyframe with running time %" GST_STIME_FORMAT,
2874               GST_STIME_ARGS (ctx->in_running_time));
2875           keyframe = TRUE;
2876           splitmux->input_state = SPLITMUX_INPUT_STATE_WAITING_GOP_COLLECT;
2877           if (ctx->in_running_time > splitmux->max_in_running_time)
2878             splitmux->max_in_running_time = ctx->in_running_time;
2879           GST_LOG_OBJECT (splitmux, "Max in running time now %" GST_TIME_FORMAT,
2880               GST_TIME_ARGS (splitmux->max_in_running_time));
2881           /* Wake up other input pads to collect this GOP */
2882           GST_SPLITMUX_BROADCAST_INPUT (splitmux);
2883           check_completed_gop (splitmux, ctx);
2884         } else {
2885           /* Pass this buffer if the reference ctx is far enough ahead */
2886           if (ctx->in_running_time < splitmux->max_in_running_time) {
2887             loop_again = FALSE;
2888             break;
2889           }
2890
2891           /* We're still waiting for a keyframe on the reference pad, sleep */
2892           GST_LOG_OBJECT (pad, "Sleeping for GOP start");
2893           GST_SPLITMUX_WAIT_INPUT (splitmux);
2894           GST_LOG_OBJECT (pad,
2895               "Done sleeping for GOP start input state now %d",
2896               splitmux->input_state);
2897         }
2898         break;
2899       case SPLITMUX_INPUT_STATE_WAITING_GOP_COLLECT:{
2900         /* We're collecting a GOP. If this is the reference context,
2901          * we need to check if this is a keyframe that marks the start
2902          * of the next GOP. If it is, it marks the end of the GOP we're
2903          * collecting, so sleep and wait until all the other pads also
2904          * reach that timestamp - at which point, we have an entire GOP
2905          * and either go to ENDING_FILE or release this GOP to the muxer and
2906          * go back to COLLECT_GOP_START. */
2907
2908         /* If we overran the target timestamp, it might be time to process
2909          * the GOP, otherwise bail out for more data
2910          */
2911         GST_LOG_OBJECT (pad,
2912             "Checking TS %" GST_STIME_FORMAT " against max %"
2913             GST_STIME_FORMAT, GST_STIME_ARGS (ctx->in_running_time),
2914             GST_STIME_ARGS (splitmux->max_in_running_time));
2915
2916         if (ctx->in_running_time < splitmux->max_in_running_time) {
2917           loop_again = FALSE;
2918           break;
2919         }
2920
2921         GST_LOG_OBJECT (pad,
2922             "Collected last packet of GOP. Checking other pads");
2923         check_completed_gop (splitmux, ctx);
2924         break;
2925       }
2926       case SPLITMUX_INPUT_STATE_FINISHING_UP:
2927         loop_again = FALSE;
2928         break;
2929       default:
2930         loop_again = FALSE;
2931         break;
2932     }
2933   }
2934   while (loop_again);
2935
2936   if (keyframe && ctx->is_reference)
2937     splitmux->queued_keyframes++;
2938   buf_info->keyframe = keyframe;
2939
2940   /* Update total input byte counter for overflow detect */
2941   splitmux->gop_total_bytes += buf_info->buf_size;
2942   if (ctx->is_reference) {
2943     splitmux->gop_reference_bytes += buf_info->buf_size;
2944   }
2945
2946   /* Now add this buffer to the queue just before returning */
2947   g_queue_push_head (&ctx->queued_bufs, buf_info);
2948
2949   GST_LOG_OBJECT (pad, "Returning to queue buffer %" GST_PTR_FORMAT
2950       " run ts %" GST_STIME_FORMAT, buf, GST_STIME_ARGS (ctx->in_running_time));
2951
2952   GST_SPLITMUX_UNLOCK (splitmux);
2953   GST_PAD_PROBE_INFO_FLOW_RETURN (info) = ret;
2954   return GST_PAD_PROBE_PASS;
2955
2956 beach:
2957   GST_SPLITMUX_UNLOCK (splitmux);
2958   if (buf_info)
2959     mq_stream_buf_free (buf_info);
2960   GST_PAD_PROBE_INFO_FLOW_RETURN (info) = ret;
2961   return GST_PAD_PROBE_PASS;
2962 }
2963
2964 static void
2965 grow_blocked_queues (GstSplitMuxSink * splitmux)
2966 {
2967   GList *cur;
2968
2969   /* Scan other queues for full-ness and grow them */
2970   for (cur = g_list_first (splitmux->contexts);
2971       cur != NULL; cur = g_list_next (cur)) {
2972     MqStreamCtx *tmpctx = (MqStreamCtx *) (cur->data);
2973     guint cur_limit;
2974     guint cur_len = g_queue_get_length (&tmpctx->queued_bufs);
2975
2976     g_object_get (tmpctx->q, "max-size-buffers", &cur_limit, NULL);
2977     GST_LOG_OBJECT (tmpctx->q, "Queue len %u", cur_len);
2978
2979     if (cur_len >= cur_limit) {
2980       cur_limit = cur_len + 1;
2981       GST_DEBUG_OBJECT (tmpctx->q,
2982           "Queue overflowed and needs enlarging. Growing to %u buffers",
2983           cur_limit);
2984       g_object_set (tmpctx->q, "max-size-buffers", cur_limit, NULL);
2985     }
2986   }
2987 }
2988
2989 static void
2990 handle_q_underrun (GstElement * q, gpointer user_data)
2991 {
2992   MqStreamCtx *ctx = (MqStreamCtx *) (user_data);
2993   GstSplitMuxSink *splitmux = ctx->splitmux;
2994
2995   GST_SPLITMUX_LOCK (splitmux);
2996   GST_DEBUG_OBJECT (q,
2997       "Queue reported underrun with %d keyframes and %d cmds enqueued",
2998       splitmux->queued_keyframes, g_queue_get_length (&splitmux->out_cmd_q));
2999   grow_blocked_queues (splitmux);
3000   GST_SPLITMUX_UNLOCK (splitmux);
3001 }
3002
3003 static void
3004 handle_q_overrun (GstElement * q, gpointer user_data)
3005 {
3006   MqStreamCtx *ctx = (MqStreamCtx *) (user_data);
3007   GstSplitMuxSink *splitmux = ctx->splitmux;
3008   gboolean allow_grow = FALSE;
3009
3010   GST_SPLITMUX_LOCK (splitmux);
3011   GST_DEBUG_OBJECT (q,
3012       "Queue reported overrun with %d keyframes and %d cmds enqueued",
3013       splitmux->queued_keyframes, g_queue_get_length (&splitmux->out_cmd_q));
3014
3015   if (splitmux->queued_keyframes < 2) {
3016     /* Less than a full GOP queued, grow the queue */
3017     allow_grow = TRUE;
3018   } else if (g_queue_get_length (&splitmux->out_cmd_q) < 1) {
3019     allow_grow = TRUE;
3020   } else {
3021     /* If another queue is starved, grow */
3022     GList *cur;
3023     for (cur = g_list_first (splitmux->contexts);
3024         cur != NULL; cur = g_list_next (cur)) {
3025       MqStreamCtx *tmpctx = (MqStreamCtx *) (cur->data);
3026       if (tmpctx != ctx && g_queue_get_length (&tmpctx->queued_bufs) < 1) {
3027         allow_grow = TRUE;
3028       }
3029     }
3030   }
3031   GST_SPLITMUX_UNLOCK (splitmux);
3032
3033   if (allow_grow) {
3034     guint cur_limit;
3035
3036     g_object_get (q, "max-size-buffers", &cur_limit, NULL);
3037     cur_limit++;
3038
3039     GST_DEBUG_OBJECT (q,
3040         "Queue overflowed and needs enlarging. Growing to %u buffers",
3041         cur_limit);
3042
3043     g_object_set (q, "max-size-buffers", cur_limit, NULL);
3044   }
3045 }
3046
3047 /* Called with SPLITMUX lock held */
3048 static const gchar *
3049 lookup_muxer_pad (GstSplitMuxSink * splitmux, const gchar * sinkpad_name)
3050 {
3051   const gchar *ret = NULL;
3052
3053   if (splitmux->muxerpad_map == NULL)
3054     return NULL;
3055
3056   if (sinkpad_name == NULL) {
3057     GST_WARNING_OBJECT (splitmux,
3058         "Can't look up request pad in pad map without providing a pad name");
3059     return NULL;
3060   }
3061
3062   ret = gst_structure_get_string (splitmux->muxerpad_map, sinkpad_name);
3063   if (ret) {
3064     GST_INFO_OBJECT (splitmux, "Sink pad %s maps to muxer pad %s", sinkpad_name,
3065         ret);
3066     return g_strdup (ret);
3067   }
3068
3069   return NULL;
3070 }
3071
3072 static GstPad *
3073 gst_splitmux_sink_request_new_pad (GstElement * element,
3074     GstPadTemplate * templ, const gchar * name, const GstCaps * caps)
3075 {
3076   GstSplitMuxSink *splitmux = (GstSplitMuxSink *) element;
3077   GstPadTemplate *mux_template = NULL;
3078   GstPad *ret = NULL, *muxpad = NULL;
3079   GstElement *q;
3080   GstPad *q_sink = NULL, *q_src = NULL;
3081   gchar *gname, *qname;
3082   gboolean is_primary_video = FALSE, is_video = FALSE,
3083       muxer_is_requestpad = FALSE;
3084   MqStreamCtx *ctx;
3085   const gchar *muxer_padname = NULL;
3086
3087   GST_DEBUG_OBJECT (splitmux, "templ:%s, name:%s", templ->name_template, name);
3088
3089   GST_SPLITMUX_LOCK (splitmux);
3090   if (!create_muxer (splitmux))
3091     goto fail;
3092   g_signal_emit (splitmux, signals[SIGNAL_MUXER_ADDED], 0, splitmux->muxer);
3093
3094   if (g_str_equal (templ->name_template, "video") ||
3095       g_str_has_prefix (templ->name_template, "video_aux_")) {
3096     is_primary_video = g_str_equal (templ->name_template, "video");
3097     if (is_primary_video && splitmux->have_video)
3098       goto already_have_video;
3099     is_video = TRUE;
3100   }
3101
3102   /* See if there's a pad map and it lists this pad */
3103   muxer_padname = lookup_muxer_pad (splitmux, name);
3104
3105   if (muxer_padname == NULL) {
3106     if (is_video) {
3107       /* FIXME: Look for a pad template with matching caps, rather than by name */
3108       GST_DEBUG_OBJECT (element,
3109           "searching for pad-template with name 'video_%%u'");
3110       mux_template =
3111           gst_element_class_get_pad_template (GST_ELEMENT_GET_CLASS
3112           (splitmux->muxer), "video_%u");
3113
3114       /* Fallback to find sink pad templates named 'video' (flvmux) */
3115       if (!mux_template) {
3116         GST_DEBUG_OBJECT (element,
3117             "searching for pad-template with name 'video'");
3118         mux_template =
3119             gst_element_class_get_pad_template (GST_ELEMENT_GET_CLASS
3120             (splitmux->muxer), "video");
3121       }
3122       name = NULL;
3123     } else {
3124       GST_DEBUG_OBJECT (element, "searching for pad-template with name '%s'",
3125           templ->name_template);
3126       mux_template =
3127           gst_element_class_get_pad_template (GST_ELEMENT_GET_CLASS
3128           (splitmux->muxer), templ->name_template);
3129
3130       /* Fallback to find sink pad templates named 'audio' (flvmux) */
3131       if (!mux_template && g_str_has_prefix (templ->name_template, "audio_")) {
3132         GST_DEBUG_OBJECT (element,
3133             "searching for pad-template with name 'audio'");
3134         mux_template =
3135             gst_element_class_get_pad_template (GST_ELEMENT_GET_CLASS
3136             (splitmux->muxer), "audio");
3137         name = NULL;
3138       }
3139     }
3140
3141     if (mux_template == NULL) {
3142       GST_DEBUG_OBJECT (element,
3143           "searching for pad-template with name 'sink_%%d'");
3144       mux_template =
3145           gst_element_class_get_pad_template (GST_ELEMENT_GET_CLASS
3146           (splitmux->muxer), "sink_%d");
3147       name = NULL;
3148     }
3149     if (mux_template == NULL) {
3150       GST_DEBUG_OBJECT (element, "searching for pad-template with name 'sink'");
3151       mux_template =
3152           gst_element_class_get_pad_template (GST_ELEMENT_GET_CLASS
3153           (splitmux->muxer), "sink");
3154       name = NULL;
3155     }
3156
3157     if (mux_template == NULL) {
3158       GST_ERROR_OBJECT (element,
3159           "unable to find a suitable sink pad-template on the muxer");
3160       goto fail;
3161     }
3162     GST_DEBUG_OBJECT (element, "found sink pad-template '%s' on the muxer",
3163         mux_template->name_template);
3164
3165     if (mux_template->presence == GST_PAD_REQUEST) {
3166       GST_DEBUG_OBJECT (element, "requesting pad from pad-template");
3167
3168       muxpad =
3169           gst_element_request_pad (splitmux->muxer, mux_template, name, caps);
3170       muxer_is_requestpad = TRUE;
3171     } else if (mux_template->presence == GST_PAD_ALWAYS) {
3172       GST_DEBUG_OBJECT (element, "accessing always pad from pad-template");
3173
3174       muxpad =
3175           gst_element_get_static_pad (splitmux->muxer,
3176           mux_template->name_template);
3177     } else {
3178       GST_ERROR_OBJECT (element,
3179           "unexpected pad presence %d", mux_template->presence);
3180       goto fail;
3181     }
3182   } else {
3183     /* Have a muxer pad name */
3184     if (!(muxpad = gst_element_get_static_pad (splitmux->muxer, muxer_padname))) {
3185       if ((muxpad =
3186               gst_element_request_pad_simple (splitmux->muxer, muxer_padname)))
3187         muxer_is_requestpad = TRUE;
3188     }
3189     g_free ((gchar *) muxer_padname);
3190     muxer_padname = NULL;
3191   }
3192
3193   /* One way or another, we must have a muxer pad by now */
3194   if (muxpad == NULL)
3195     goto fail;
3196
3197   if (is_primary_video)
3198     gname = g_strdup ("video");
3199   else if (name == NULL)
3200     gname = gst_pad_get_name (muxpad);
3201   else
3202     gname = g_strdup (name);
3203
3204   qname = g_strdup_printf ("queue_%s", gname);
3205   if ((q = create_element (splitmux, "queue", qname, FALSE)) == NULL) {
3206     g_free (qname);
3207     goto fail;
3208   }
3209   g_free (qname);
3210
3211   gst_element_set_state (q, GST_STATE_TARGET (splitmux));
3212
3213   g_object_set (q, "max-size-bytes", 0, "max-size-time", (guint64) (0),
3214       "max-size-buffers", 5, NULL);
3215
3216   q_sink = gst_element_get_static_pad (q, "sink");
3217   q_src = gst_element_get_static_pad (q, "src");
3218
3219   if (gst_pad_link (q_src, muxpad) != GST_PAD_LINK_OK) {
3220     if (muxer_is_requestpad)
3221       gst_element_release_request_pad (splitmux->muxer, muxpad);
3222     gst_object_unref (GST_OBJECT (muxpad));
3223     goto fail;
3224   }
3225
3226   gst_object_unref (GST_OBJECT (muxpad));
3227
3228   ctx = mq_stream_ctx_new (splitmux);
3229   /* Context holds a ref: */
3230   ctx->q = gst_object_ref (q);
3231   ctx->srcpad = q_src;
3232   ctx->sinkpad = q_sink;
3233   ctx->q_overrun_id =
3234       g_signal_connect (q, "overrun", (GCallback) handle_q_overrun, ctx);
3235   g_signal_connect (q, "underrun", (GCallback) handle_q_underrun, ctx);
3236
3237   ctx->src_pad_block_id =
3238       gst_pad_add_probe (q_src,
3239       GST_PAD_PROBE_TYPE_DATA_DOWNSTREAM | GST_PAD_PROBE_TYPE_EVENT_FLUSH,
3240       (GstPadProbeCallback) handle_mq_output, ctx, NULL);
3241   if (is_primary_video && splitmux->reference_ctx != NULL) {
3242     splitmux->reference_ctx->is_reference = FALSE;
3243     splitmux->reference_ctx = NULL;
3244   }
3245   if (splitmux->reference_ctx == NULL) {
3246     splitmux->reference_ctx = ctx;
3247     ctx->is_reference = TRUE;
3248   }
3249
3250   ret = gst_ghost_pad_new_from_template (gname, q_sink, templ);
3251   g_object_set_qdata ((GObject *) (ret), PAD_CONTEXT, ctx);
3252
3253   ctx->sink_pad_block_id =
3254       gst_pad_add_probe (q_sink,
3255       GST_PAD_PROBE_TYPE_DATA_DOWNSTREAM | GST_PAD_PROBE_TYPE_EVENT_FLUSH |
3256       GST_PAD_PROBE_TYPE_QUERY_DOWNSTREAM,
3257       (GstPadProbeCallback) handle_mq_input, ctx, NULL);
3258
3259   GST_DEBUG_OBJECT (splitmux, "splitmuxsink pad %" GST_PTR_FORMAT
3260       " feeds queue pad %" GST_PTR_FORMAT, ret, q_sink);
3261
3262   splitmux->contexts = g_list_append (splitmux->contexts, ctx);
3263
3264   g_free (gname);
3265
3266   if (is_primary_video)
3267     splitmux->have_video = TRUE;
3268
3269   gst_pad_set_active (ret, TRUE);
3270   gst_element_add_pad (GST_ELEMENT (splitmux), ret);
3271
3272   GST_SPLITMUX_UNLOCK (splitmux);
3273
3274   return ret;
3275 fail:
3276   GST_SPLITMUX_UNLOCK (splitmux);
3277
3278   if (q_sink)
3279     gst_object_unref (q_sink);
3280   if (q_src)
3281     gst_object_unref (q_src);
3282   return NULL;
3283 already_have_video:
3284   GST_DEBUG_OBJECT (splitmux, "video sink pad already requested");
3285   GST_SPLITMUX_UNLOCK (splitmux);
3286   return NULL;
3287 }
3288
3289 static void
3290 gst_splitmux_sink_release_pad (GstElement * element, GstPad * pad)
3291 {
3292   GstSplitMuxSink *splitmux = (GstSplitMuxSink *) element;
3293   GstPad *muxpad = NULL;
3294   MqStreamCtx *ctx =
3295       (MqStreamCtx *) (g_object_get_qdata ((GObject *) (pad), PAD_CONTEXT));
3296
3297   GST_SPLITMUX_LOCK (splitmux);
3298
3299   if (splitmux->muxer == NULL)
3300     goto fail;                  /* Elements don't exist yet - nothing to release */
3301
3302   GST_INFO_OBJECT (pad, "releasing request pad");
3303
3304   muxpad = gst_pad_get_peer (ctx->srcpad);
3305
3306   /* Remove the context from our consideration */
3307   splitmux->contexts = g_list_remove (splitmux->contexts, ctx);
3308
3309   GST_SPLITMUX_UNLOCK (splitmux);
3310
3311   if (ctx->sink_pad_block_id) {
3312     gst_pad_remove_probe (ctx->sinkpad, ctx->sink_pad_block_id);
3313     gst_pad_send_event (ctx->sinkpad, gst_event_new_flush_start ());
3314   }
3315
3316   if (ctx->src_pad_block_id)
3317     gst_pad_remove_probe (ctx->srcpad, ctx->src_pad_block_id);
3318
3319   GST_SPLITMUX_LOCK (splitmux);
3320
3321   ctx->is_releasing = TRUE;
3322   GST_SPLITMUX_BROADCAST_INPUT (splitmux);
3323
3324   /* Can release the context now */
3325   mq_stream_ctx_free (ctx);
3326   if (ctx == splitmux->reference_ctx)
3327     splitmux->reference_ctx = NULL;
3328
3329   /* Release and free the muxer input */
3330   if (muxpad) {
3331     gst_element_release_request_pad (splitmux->muxer, muxpad);
3332     gst_object_unref (muxpad);
3333   }
3334
3335   if (GST_PAD_PAD_TEMPLATE (pad) &&
3336       g_str_equal (GST_PAD_TEMPLATE_NAME_TEMPLATE (GST_PAD_PAD_TEMPLATE
3337               (pad)), "video"))
3338     splitmux->have_video = FALSE;
3339
3340   gst_element_remove_pad (element, pad);
3341
3342   /* Reset the internal elements only after all request pads are released */
3343   if (splitmux->contexts == NULL)
3344     gst_splitmux_reset_elements (splitmux);
3345
3346   /* Wake up other input streams to check if the completion conditions have
3347    * changed */
3348   GST_SPLITMUX_BROADCAST_INPUT (splitmux);
3349
3350 fail:
3351   GST_SPLITMUX_UNLOCK (splitmux);
3352 }
3353
3354 static GstElement *
3355 create_element (GstSplitMuxSink * splitmux,
3356     const gchar * factory, const gchar * name, gboolean locked)
3357 {
3358   GstElement *ret = gst_element_factory_make (factory, name);
3359   if (ret == NULL) {
3360     g_warning ("Failed to create %s - splitmuxsink will not work", name);
3361     return NULL;
3362   }
3363
3364   if (locked) {
3365     /* Ensure the sink starts in locked state and NULL - it will be changed
3366      * by the filename setting code */
3367     gst_element_set_locked_state (ret, TRUE);
3368     gst_element_set_state (ret, GST_STATE_NULL);
3369   }
3370
3371   if (!gst_bin_add (GST_BIN (splitmux), ret)) {
3372     g_warning ("Could not add %s element - splitmuxsink will not work", name);
3373     gst_object_unref (ret);
3374     return NULL;
3375   }
3376
3377   return ret;
3378 }
3379
3380 static gboolean
3381 create_muxer (GstSplitMuxSink * splitmux)
3382 {
3383   /* Create internal elements */
3384   if (splitmux->muxer == NULL) {
3385     GstElement *provided_muxer = NULL;
3386
3387     GST_OBJECT_LOCK (splitmux);
3388     if (splitmux->provided_muxer != NULL)
3389       provided_muxer = gst_object_ref (splitmux->provided_muxer);
3390     GST_OBJECT_UNLOCK (splitmux);
3391
3392     if ((!splitmux->async_finalize && provided_muxer == NULL) ||
3393         (splitmux->async_finalize && splitmux->muxer_factory == NULL)) {
3394       if ((splitmux->muxer =
3395               create_element (splitmux,
3396                   splitmux->muxer_factory ? splitmux->
3397                   muxer_factory : DEFAULT_MUXER, "muxer", FALSE)) == NULL)
3398         goto fail;
3399     } else if (splitmux->async_finalize) {
3400       if ((splitmux->muxer =
3401               create_element (splitmux, splitmux->muxer_factory, "muxer",
3402                   FALSE)) == NULL)
3403         goto fail;
3404       if (splitmux->muxer_preset && GST_IS_PRESET (splitmux->muxer))
3405         gst_preset_load_preset (GST_PRESET (splitmux->muxer),
3406             splitmux->muxer_preset);
3407       if (splitmux->muxer_properties)
3408         gst_structure_foreach (splitmux->muxer_properties,
3409             _set_property_from_structure, splitmux->muxer);
3410     } else {
3411       /* Ensure it's not in locked state (we might be reusing an old element) */
3412       gst_element_set_locked_state (provided_muxer, FALSE);
3413       if (!gst_bin_add (GST_BIN (splitmux), provided_muxer)) {
3414         g_warning ("Could not add muxer element - splitmuxsink will not work");
3415         gst_object_unref (provided_muxer);
3416         goto fail;
3417       }
3418
3419       splitmux->muxer = provided_muxer;
3420       gst_object_unref (provided_muxer);
3421     }
3422
3423     if (splitmux->use_robust_muxing) {
3424       update_muxer_properties (splitmux);
3425     }
3426   }
3427
3428   return TRUE;
3429 fail:
3430   return FALSE;
3431 }
3432
3433 static GstElement *
3434 find_sink (GstElement * e)
3435 {
3436   GstElement *res = NULL;
3437   GstIterator *iter;
3438   gboolean done = FALSE;
3439   GValue data = { 0, };
3440
3441   if (!GST_IS_BIN (e))
3442     return e;
3443
3444   if (g_object_class_find_property (G_OBJECT_GET_CLASS (e), "location") != NULL)
3445     return e;
3446
3447   iter = gst_bin_iterate_sinks (GST_BIN (e));
3448   while (!done) {
3449     switch (gst_iterator_next (iter, &data)) {
3450       case GST_ITERATOR_OK:
3451       {
3452         GstElement *child = g_value_get_object (&data);
3453         if (g_object_class_find_property (G_OBJECT_GET_CLASS (child),
3454                 "location") != NULL) {
3455           res = child;
3456           done = TRUE;
3457         }
3458         g_value_reset (&data);
3459         break;
3460       }
3461       case GST_ITERATOR_RESYNC:
3462         gst_iterator_resync (iter);
3463         break;
3464       case GST_ITERATOR_DONE:
3465         done = TRUE;
3466         break;
3467       case GST_ITERATOR_ERROR:
3468         g_assert_not_reached ();
3469         break;
3470     }
3471   }
3472   g_value_unset (&data);
3473   gst_iterator_free (iter);
3474
3475   return res;
3476 }
3477
3478 static gboolean
3479 create_sink (GstSplitMuxSink * splitmux)
3480 {
3481   GstElement *provided_sink = NULL;
3482
3483   if (splitmux->active_sink == NULL) {
3484
3485     GST_OBJECT_LOCK (splitmux);
3486     if (splitmux->provided_sink != NULL)
3487       provided_sink = gst_object_ref (splitmux->provided_sink);
3488     GST_OBJECT_UNLOCK (splitmux);
3489
3490     if ((!splitmux->async_finalize && provided_sink == NULL) ||
3491         (splitmux->async_finalize && splitmux->sink_factory == NULL)) {
3492       if ((splitmux->sink =
3493               create_element (splitmux, DEFAULT_SINK, "sink", TRUE)) == NULL)
3494         goto fail;
3495       splitmux->active_sink = splitmux->sink;
3496     } else if (splitmux->async_finalize) {
3497       if ((splitmux->sink =
3498               create_element (splitmux, splitmux->sink_factory, "sink",
3499                   TRUE)) == NULL)
3500         goto fail;
3501       if (splitmux->sink_preset && GST_IS_PRESET (splitmux->sink))
3502         gst_preset_load_preset (GST_PRESET (splitmux->sink),
3503             splitmux->sink_preset);
3504       if (splitmux->sink_properties)
3505         gst_structure_foreach (splitmux->sink_properties,
3506             _set_property_from_structure, splitmux->sink);
3507       splitmux->active_sink = splitmux->sink;
3508     } else {
3509       /* Ensure the sink starts in locked state and NULL - it will be changed
3510        * by the filename setting code */
3511       gst_element_set_locked_state (provided_sink, TRUE);
3512       gst_element_set_state (provided_sink, GST_STATE_NULL);
3513       if (!gst_bin_add (GST_BIN (splitmux), provided_sink)) {
3514         g_warning ("Could not add sink elements - splitmuxsink will not work");
3515         gst_object_unref (provided_sink);
3516         goto fail;
3517       }
3518
3519       splitmux->active_sink = provided_sink;
3520
3521       /* The bin holds a ref now, we can drop our tmp ref */
3522       gst_object_unref (provided_sink);
3523
3524       /* Find the sink element */
3525       splitmux->sink = find_sink (splitmux->active_sink);
3526       if (splitmux->sink == NULL) {
3527         g_warning
3528             ("Could not locate sink element in provided sink - splitmuxsink will not work");
3529         goto fail;
3530       }
3531     }
3532
3533 #if 1
3534     if (g_object_class_find_property (G_OBJECT_GET_CLASS (splitmux->sink),
3535             "async") != NULL) {
3536       /* async child elements are causing state change races and weird
3537        * failures, so let's try and turn that off */
3538       g_object_set (splitmux->sink, "async", FALSE, NULL);
3539     }
3540 #endif
3541
3542     if (!gst_element_link (splitmux->muxer, splitmux->active_sink)) {
3543       g_warning ("Failed to link muxer and sink- splitmuxsink will not work");
3544       goto fail;
3545     }
3546   }
3547
3548   return TRUE;
3549 fail:
3550   return FALSE;
3551 }
3552
3553 #ifdef __GNUC__
3554 #pragma GCC diagnostic ignored "-Wformat-nonliteral"
3555 #endif
3556 static void
3557 set_next_filename (GstSplitMuxSink * splitmux, MqStreamCtx * ctx)
3558 {
3559   gchar *fname = NULL;
3560   GstSample *sample;
3561   GstCaps *caps;
3562
3563   gst_splitmux_sink_ensure_max_files (splitmux);
3564
3565   if (ctx->cur_out_buffer == NULL) {
3566     GST_WARNING_OBJECT (splitmux, "Starting next file without buffer");
3567   }
3568
3569   caps = gst_pad_get_current_caps (ctx->srcpad);
3570   sample = gst_sample_new (ctx->cur_out_buffer, caps, &ctx->out_segment, NULL);
3571   g_signal_emit (splitmux, signals[SIGNAL_FORMAT_LOCATION_FULL], 0,
3572       splitmux->fragment_id, sample, &fname);
3573   gst_sample_unref (sample);
3574   if (caps)
3575     gst_caps_unref (caps);
3576
3577   if (fname == NULL) {
3578     /* Fallback to the old signal if the new one returned nothing */
3579     g_signal_emit (splitmux, signals[SIGNAL_FORMAT_LOCATION], 0,
3580         splitmux->fragment_id, &fname);
3581   }
3582
3583   if (!fname)
3584     fname = splitmux->location ?
3585         g_strdup_printf (splitmux->location, splitmux->fragment_id) : NULL;
3586
3587   if (fname) {
3588     GST_INFO_OBJECT (splitmux, "Setting file to %s", fname);
3589     if (g_object_class_find_property (G_OBJECT_GET_CLASS (splitmux->sink),
3590             "location") != NULL)
3591       g_object_set (splitmux->sink, "location", fname, NULL);
3592     g_free (fname);
3593   }
3594
3595   splitmux->fragment_id++;
3596 }
3597
3598 /* called with GST_SPLITMUX_LOCK */
3599 static void
3600 do_async_start (GstSplitMuxSink * splitmux)
3601 {
3602   GstMessage *message;
3603
3604   if (!splitmux->need_async_start) {
3605     GST_INFO_OBJECT (splitmux, "no async_start needed");
3606     return;
3607   }
3608
3609   splitmux->async_pending = TRUE;
3610
3611   GST_INFO_OBJECT (splitmux, "Sending async_start message");
3612   message = gst_message_new_async_start (GST_OBJECT_CAST (splitmux));
3613
3614   GST_SPLITMUX_UNLOCK (splitmux);
3615   GST_BIN_CLASS (parent_class)->handle_message (GST_BIN_CAST
3616       (splitmux), message);
3617   GST_SPLITMUX_LOCK (splitmux);
3618 }
3619
3620 /* called with GST_SPLITMUX_LOCK */
3621 static void
3622 do_async_done (GstSplitMuxSink * splitmux)
3623 {
3624   GstMessage *message;
3625
3626   if (splitmux->async_pending) {
3627     GST_INFO_OBJECT (splitmux, "Sending async_done message");
3628     splitmux->async_pending = FALSE;
3629     GST_SPLITMUX_UNLOCK (splitmux);
3630
3631     message =
3632         gst_message_new_async_done (GST_OBJECT_CAST (splitmux),
3633         GST_CLOCK_TIME_NONE);
3634     GST_BIN_CLASS (parent_class)->handle_message (GST_BIN_CAST
3635         (splitmux), message);
3636     GST_SPLITMUX_LOCK (splitmux);
3637   }
3638
3639   splitmux->need_async_start = FALSE;
3640 }
3641
3642 static void
3643 gst_splitmux_sink_reset (GstSplitMuxSink * splitmux)
3644 {
3645   splitmux->max_in_running_time = GST_CLOCK_STIME_NONE;
3646   splitmux->gop_start_time = splitmux->fragment_start_time =
3647       GST_CLOCK_STIME_NONE;
3648   splitmux->max_out_running_time = 0;
3649   splitmux->fragment_total_bytes = 0;
3650   splitmux->fragment_reference_bytes = 0;
3651   splitmux->gop_total_bytes = 0;
3652   splitmux->gop_reference_bytes = 0;
3653   splitmux->muxed_out_bytes = 0;
3654   splitmux->ready_for_output = FALSE;
3655
3656   g_atomic_int_set (&(splitmux->split_requested), FALSE);
3657   g_atomic_int_set (&(splitmux->do_split_next_gop), FALSE);
3658
3659   splitmux->next_fku_time = GST_CLOCK_TIME_NONE;
3660   gst_queue_array_clear (splitmux->times_to_split);
3661
3662   g_list_foreach (splitmux->contexts, (GFunc) mq_stream_ctx_reset, NULL);
3663   splitmux->queued_keyframes = 0;
3664
3665   g_queue_foreach (&splitmux->out_cmd_q, (GFunc) out_cmd_buf_free, NULL);
3666   g_queue_clear (&splitmux->out_cmd_q);
3667 }
3668
3669 static GstStateChangeReturn
3670 gst_splitmux_sink_change_state (GstElement * element, GstStateChange transition)
3671 {
3672   GstStateChangeReturn ret;
3673   GstSplitMuxSink *splitmux = (GstSplitMuxSink *) element;
3674
3675   switch (transition) {
3676     case GST_STATE_CHANGE_NULL_TO_READY:{
3677       GST_SPLITMUX_LOCK (splitmux);
3678       if (!create_muxer (splitmux) || !create_sink (splitmux)) {
3679         ret = GST_STATE_CHANGE_FAILURE;
3680         GST_SPLITMUX_UNLOCK (splitmux);
3681         goto beach;
3682       }
3683       g_signal_emit (splitmux, signals[SIGNAL_MUXER_ADDED], 0, splitmux->muxer);
3684       g_signal_emit (splitmux, signals[SIGNAL_SINK_ADDED], 0, splitmux->sink);
3685       GST_SPLITMUX_UNLOCK (splitmux);
3686       splitmux->fragment_id = splitmux->start_index;
3687       break;
3688     }
3689     case GST_STATE_CHANGE_READY_TO_PAUSED:{
3690       GST_SPLITMUX_LOCK (splitmux);
3691       /* Make sure contexts and tracking times are cleared, in case we're being reused */
3692       gst_splitmux_sink_reset (splitmux);
3693       /* Start by collecting one input on each pad */
3694       splitmux->input_state = SPLITMUX_INPUT_STATE_COLLECTING_GOP_START;
3695       splitmux->output_state = SPLITMUX_OUTPUT_STATE_START_NEXT_FILE;
3696
3697       GST_SPLITMUX_UNLOCK (splitmux);
3698
3699       GST_SPLITMUX_STATE_LOCK (splitmux);
3700       splitmux->shutdown = FALSE;
3701       GST_SPLITMUX_STATE_UNLOCK (splitmux);
3702       break;
3703     }
3704     case GST_STATE_CHANGE_PAUSED_TO_READY:
3705     case GST_STATE_CHANGE_READY_TO_READY:
3706       g_atomic_int_set (&(splitmux->split_requested), FALSE);
3707       g_atomic_int_set (&(splitmux->do_split_next_gop), FALSE);
3708       /* Fall through */
3709     case GST_STATE_CHANGE_READY_TO_NULL:
3710       GST_SPLITMUX_STATE_LOCK (splitmux);
3711       splitmux->shutdown = TRUE;
3712       GST_SPLITMUX_STATE_UNLOCK (splitmux);
3713
3714       GST_SPLITMUX_LOCK (splitmux);
3715       gst_splitmux_sink_reset (splitmux);
3716       splitmux->output_state = SPLITMUX_OUTPUT_STATE_STOPPED;
3717       splitmux->input_state = SPLITMUX_INPUT_STATE_STOPPED;
3718       /* Wake up any blocked threads */
3719       GST_LOG_OBJECT (splitmux,
3720           "State change -> NULL or READY. Waking threads");
3721       GST_SPLITMUX_BROADCAST_INPUT (splitmux);
3722       GST_SPLITMUX_BROADCAST_OUTPUT (splitmux);
3723       GST_SPLITMUX_UNLOCK (splitmux);
3724       break;
3725     default:
3726       break;
3727   }
3728
3729   ret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
3730   if (ret == GST_STATE_CHANGE_FAILURE)
3731     goto beach;
3732
3733   switch (transition) {
3734     case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
3735       splitmux->need_async_start = TRUE;
3736       break;
3737     case GST_STATE_CHANGE_READY_TO_PAUSED:{
3738       /* Change state async, because our child sink might not
3739        * be ready to do that for us yet if it's state is still locked */
3740
3741       splitmux->need_async_start = TRUE;
3742       /* we want to go async to PAUSED until we managed to configure and add the
3743        * sink */
3744       GST_SPLITMUX_LOCK (splitmux);
3745       do_async_start (splitmux);
3746       GST_SPLITMUX_UNLOCK (splitmux);
3747       ret = GST_STATE_CHANGE_ASYNC;
3748       break;
3749     }
3750     case GST_STATE_CHANGE_READY_TO_NULL:
3751       GST_SPLITMUX_LOCK (splitmux);
3752       splitmux->fragment_id = 0;
3753       /* Reset internal elements only if no pad contexts are using them */
3754       if (splitmux->contexts == NULL)
3755         gst_splitmux_reset_elements (splitmux);
3756       do_async_done (splitmux);
3757       GST_SPLITMUX_UNLOCK (splitmux);
3758       break;
3759     default:
3760       break;
3761   }
3762
3763   return ret;
3764
3765 beach:
3766   if (transition == GST_STATE_CHANGE_NULL_TO_READY) {
3767     /* Cleanup elements on failed transition out of NULL */
3768     gst_splitmux_reset_elements (splitmux);
3769     GST_SPLITMUX_LOCK (splitmux);
3770     do_async_done (splitmux);
3771     GST_SPLITMUX_UNLOCK (splitmux);
3772   }
3773   if (transition == GST_STATE_CHANGE_READY_TO_READY) {
3774     /* READY to READY transition only happens when we're already
3775      * in READY state, but a child element is in NULL, which
3776      * happens when there's an error changing the state of the sink.
3777      * We need to make sure not to fail the state transition, or
3778      * the core won't transition us back to NULL successfully */
3779     ret = GST_STATE_CHANGE_SUCCESS;
3780   }
3781   return ret;
3782 }
3783
3784 static void
3785 gst_splitmux_sink_ensure_max_files (GstSplitMuxSink * splitmux)
3786 {
3787   if (splitmux->max_files && splitmux->fragment_id >= splitmux->max_files) {
3788     splitmux->fragment_id = 0;
3789   }
3790 }
3791
3792 static void
3793 split_now (GstSplitMuxSink * splitmux)
3794 {
3795   g_atomic_int_set (&(splitmux->do_split_next_gop), TRUE);
3796 }
3797
3798 static void
3799 split_after (GstSplitMuxSink * splitmux)
3800 {
3801   g_atomic_int_set (&(splitmux->split_requested), TRUE);
3802 }
3803
3804 static void
3805 split_at_running_time (GstSplitMuxSink * splitmux, GstClockTime split_time)
3806 {
3807   gboolean send_keyframe_requests;
3808
3809   GST_SPLITMUX_LOCK (splitmux);
3810   gst_queue_array_push_tail_struct (splitmux->times_to_split, &split_time);
3811   send_keyframe_requests = splitmux->send_keyframe_requests;
3812   GST_SPLITMUX_UNLOCK (splitmux);
3813
3814   if (send_keyframe_requests) {
3815     GstEvent *ev =
3816         gst_video_event_new_upstream_force_key_unit (split_time, TRUE, 0);
3817     GST_INFO_OBJECT (splitmux, "Requesting next keyframe at %" GST_TIME_FORMAT,
3818         GST_TIME_ARGS (split_time));
3819     if (!gst_pad_push_event (splitmux->reference_ctx->sinkpad, ev)) {
3820       GST_WARNING_OBJECT (splitmux,
3821           "Could not request keyframe at %" GST_TIME_FORMAT,
3822           GST_TIME_ARGS (split_time));
3823     }
3824   }
3825 }