splitmuxsink: Only count keyframes for the reference context, consistently
[platform/upstream/gstreamer.git] / subprojects / gst-plugins-good / gst / multifile / gstsplitmuxsink.c
1 /* GStreamer Muxer bin that splits output stream by size/time
2  * Copyright (C) <2014-2019> Jan Schmidt <jan@centricular.com>
3  *
4  * This library is free software; you can redistribute it and/or
5  * modify it under the terms of the GNU Library General Public
6  * License as published by the Free Software Foundation; either
7  * version 2 of the License, or (at your option) any later version.
8  *
9  * This library is distributed in the hope that it will be useful,
10  * but WITHOUT ANY WARRANTY; without even the implied warranty of
11  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12  * Library General Public License for more details.
13  *
14  * You should have received a copy of the GNU Library General Public
15  * License along with this library; if not, write to the
16  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
17  * Boston, MA 02110-1301, USA.
18  */
19
20 /**
21  * SECTION:element-splitmuxsink
22  * @title: splitmuxsink
23  * @short_description: Muxer wrapper for splitting output stream by size or time
24  *
25  * This element wraps a muxer and a sink, and starts a new file when the mux
26  * contents are about to cross a threshold of maximum size of maximum time,
27  * splitting at video keyframe boundaries. Exactly one input video stream
28  * can be muxed, with as many accompanying audio and subtitle streams as
29  * desired.
30  *
31  * By default, it uses mp4mux and filesink, but they can be changed via
32  * the 'muxer' and 'sink' properties.
33  *
34  * The minimum file size is 1 GOP, however - so limits may be overrun if the
35  * distance between any 2 keyframes is larger than the limits.
36  *
37  * If a video stream is available, the splitting process is driven by the video
38  * stream contents, and the video stream must contain closed GOPs for the output
39  * file parts to be played individually correctly. In the absence of a video
40  * stream, the first available stream is used as reference for synchronization.
41  *
42  * In the async-finalize mode, when the threshold is crossed, the old muxer
43  * and sink is disconnected from the pipeline and left to finish the file
44  * asynchronously, and a new muxer and sink is created to continue with the
45  * next fragment. For that reason, instead of muxer and sink objects, the
46  * muxer-factory and sink-factory properties are used to construct the new
47  * objects, together with muxer-properties and sink-properties.
48  *
49  * ## Example pipelines
50  * |[
51  * gst-launch-1.0 -e v4l2src num-buffers=500 ! video/x-raw,width=320,height=240 ! videoconvert ! queue ! timeoverlay ! x264enc key-int-max=10 ! h264parse ! splitmuxsink location=video%02d.mov max-size-time=10000000000 max-size-bytes=1000000
52  * ]|
53  * Records a video stream captured from a v4l2 device and muxes it into
54  * ISO mp4 files, splitting as needed to limit size/duration to 10 seconds
55  * and 1MB maximum size.
56  *
57  * |[
58  * gst-launch-1.0 -e v4l2src num-buffers=500 ! video/x-raw,width=320,height=240 ! videoconvert ! queue ! timeoverlay ! x264enc key-int-max=10 ! h264parse ! splitmuxsink location=video%02d.mkv max-size-time=10000000000 muxer-factory=matroskamux muxer-properties="properties,streamable=true"
59  * ]|
60  * Records a video stream captured from a v4l2 device and muxer it into
61  * streamable Matroska files, splitting as needed to limit size/duration to 10
62  * seconds. Each file will finalize asynchronously.
63  *
64  * |[
65  * gst-launch-1.0 videotestsrc num-buffers=10 ! jpegenc ! .video splitmuxsink muxer=qtmux muxer-pad-map=x-pad-map,video=video_1 location=test%05d.mp4 -v
66  * ]|
67  * Records 10 frames to an mp4 file, using a muxer-pad-map to make explicit mappings between the splitmuxsink sink pad and the corresponding muxer pad
68  * it will deliver to.
69  */
70
71 #ifdef HAVE_CONFIG_H
72 #include "config.h"
73 #endif
74
75 #include <string.h>
76 #include <glib/gstdio.h>
77 #include <gst/video/video.h>
78 #include "gstsplitmuxsink.h"
79
80 GST_DEBUG_CATEGORY_STATIC (splitmux_debug);
81 #define GST_CAT_DEFAULT splitmux_debug
82
83 #define GST_SPLITMUX_STATE_LOCK(s) g_mutex_lock(&(s)->state_lock)
84 #define GST_SPLITMUX_STATE_UNLOCK(s) g_mutex_unlock(&(s)->state_lock)
85
86 #define GST_SPLITMUX_LOCK(s) g_mutex_lock(&(s)->lock)
87 #define GST_SPLITMUX_UNLOCK(s) g_mutex_unlock(&(s)->lock)
88 #define GST_SPLITMUX_WAIT_INPUT(s) g_cond_wait (&(s)->input_cond, &(s)->lock)
89 #define GST_SPLITMUX_BROADCAST_INPUT(s) g_cond_broadcast (&(s)->input_cond)
90
91 #define GST_SPLITMUX_WAIT_OUTPUT(s) g_cond_wait (&(s)->output_cond, &(s)->lock)
92 #define GST_SPLITMUX_BROADCAST_OUTPUT(s) g_cond_broadcast (&(s)->output_cond)
93
94 static void split_now (GstSplitMuxSink * splitmux);
95 static void split_after (GstSplitMuxSink * splitmux);
96 static void split_at_running_time (GstSplitMuxSink * splitmux,
97     GstClockTime split_time);
98
99 enum
100 {
101   PROP_0,
102   PROP_LOCATION,
103   PROP_START_INDEX,
104   PROP_MAX_SIZE_TIME,
105   PROP_MAX_SIZE_BYTES,
106   PROP_MAX_SIZE_TIMECODE,
107   PROP_SEND_KEYFRAME_REQUESTS,
108   PROP_MAX_FILES,
109   PROP_MUXER_OVERHEAD,
110   PROP_USE_ROBUST_MUXING,
111   PROP_ALIGNMENT_THRESHOLD,
112   PROP_MUXER,
113   PROP_SINK,
114   PROP_RESET_MUXER,
115   PROP_ASYNC_FINALIZE,
116   PROP_MUXER_FACTORY,
117   PROP_MUXER_PRESET,
118   PROP_MUXER_PROPERTIES,
119   PROP_SINK_FACTORY,
120   PROP_SINK_PRESET,
121   PROP_SINK_PROPERTIES,
122   PROP_MUXERPAD_MAP
123 };
124
125 #define DEFAULT_MAX_SIZE_TIME       0
126 #define DEFAULT_MAX_SIZE_BYTES      0
127 #define DEFAULT_MAX_FILES           0
128 #define DEFAULT_MUXER_OVERHEAD      0.02
129 #define DEFAULT_SEND_KEYFRAME_REQUESTS FALSE
130 #define DEFAULT_ALIGNMENT_THRESHOLD 0
131 #define DEFAULT_MUXER "mp4mux"
132 #define DEFAULT_SINK "filesink"
133 #define DEFAULT_USE_ROBUST_MUXING FALSE
134 #define DEFAULT_RESET_MUXER TRUE
135 #define DEFAULT_ASYNC_FINALIZE FALSE
136 #define DEFAULT_START_INDEX 0
137
138 typedef struct _AsyncEosHelper
139 {
140   MqStreamCtx *ctx;
141   GstPad *pad;
142 } AsyncEosHelper;
143
144 enum
145 {
146   SIGNAL_FORMAT_LOCATION,
147   SIGNAL_FORMAT_LOCATION_FULL,
148   SIGNAL_SPLIT_NOW,
149   SIGNAL_SPLIT_AFTER,
150   SIGNAL_SPLIT_AT_RUNNING_TIME,
151   SIGNAL_MUXER_ADDED,
152   SIGNAL_SINK_ADDED,
153   SIGNAL_LAST
154 };
155
156 static guint signals[SIGNAL_LAST];
157
158 static GstStaticPadTemplate video_sink_template =
159 GST_STATIC_PAD_TEMPLATE ("video",
160     GST_PAD_SINK,
161     GST_PAD_REQUEST,
162     GST_STATIC_CAPS_ANY);
163 static GstStaticPadTemplate video_aux_sink_template =
164 GST_STATIC_PAD_TEMPLATE ("video_aux_%u",
165     GST_PAD_SINK,
166     GST_PAD_REQUEST,
167     GST_STATIC_CAPS_ANY);
168 static GstStaticPadTemplate audio_sink_template =
169 GST_STATIC_PAD_TEMPLATE ("audio_%u",
170     GST_PAD_SINK,
171     GST_PAD_REQUEST,
172     GST_STATIC_CAPS_ANY);
173 static GstStaticPadTemplate subtitle_sink_template =
174 GST_STATIC_PAD_TEMPLATE ("subtitle_%u",
175     GST_PAD_SINK,
176     GST_PAD_REQUEST,
177     GST_STATIC_CAPS_ANY);
178 static GstStaticPadTemplate caption_sink_template =
179 GST_STATIC_PAD_TEMPLATE ("caption_%u",
180     GST_PAD_SINK,
181     GST_PAD_REQUEST,
182     GST_STATIC_CAPS_ANY);
183
184 static GQuark PAD_CONTEXT;
185 static GQuark EOS_FROM_US;
186 static GQuark RUNNING_TIME;
187 /* EOS_FROM_US is only valid in async-finalize mode. We need to know whether
188  * to forward an incoming EOS message, but we cannot rely on the state of the
189  * splitmux anymore, so we set this qdata on the sink instead.
190  * The muxer and sink must be destroyed after both of these things have
191  * finished:
192  * 1) The EOS message has been sent when the fragment is ending
193  * 2) The muxer has been unlinked and relinked
194  * Therefore, EOS_FROM_US can have these two values:
195  * 0: EOS was not requested from us. Forward the message. The muxer and the
196  * sink will be destroyed together with the rest of the bin.
197  * 1: EOS was requested from us, but the other of the two tasks hasn't
198  * finished. Set EOS_FROM_US to 2 and do your stuff.
199  * 2: EOS was requested from us and the other of the two tasks has finished.
200  * Now we can destroy the muxer and the sink.
201  */
202
203 static void
204 _do_init (void)
205 {
206   PAD_CONTEXT = g_quark_from_static_string ("pad-context");
207   EOS_FROM_US = g_quark_from_static_string ("eos-from-us");
208   RUNNING_TIME = g_quark_from_static_string ("running-time");
209   GST_DEBUG_CATEGORY_INIT (splitmux_debug, "splitmuxsink", 0,
210       "Split File Muxing Sink");
211 }
212
213 #define gst_splitmux_sink_parent_class parent_class
214 G_DEFINE_TYPE_EXTENDED (GstSplitMuxSink, gst_splitmux_sink, GST_TYPE_BIN, 0,
215     _do_init ());
216 GST_ELEMENT_REGISTER_DEFINE (splitmuxsink, "splitmuxsink", GST_RANK_NONE,
217     GST_TYPE_SPLITMUX_SINK);
218
219 static gboolean create_muxer (GstSplitMuxSink * splitmux);
220 static gboolean create_sink (GstSplitMuxSink * splitmux);
221 static void gst_splitmux_sink_set_property (GObject * object, guint prop_id,
222     const GValue * value, GParamSpec * pspec);
223 static void gst_splitmux_sink_get_property (GObject * object, guint prop_id,
224     GValue * value, GParamSpec * pspec);
225 static void gst_splitmux_sink_dispose (GObject * object);
226 static void gst_splitmux_sink_finalize (GObject * object);
227
228 static GstPad *gst_splitmux_sink_request_new_pad (GstElement * element,
229     GstPadTemplate * templ, const gchar * name, const GstCaps * caps);
230 static void gst_splitmux_sink_release_pad (GstElement * element, GstPad * pad);
231
232 static GstStateChangeReturn gst_splitmux_sink_change_state (GstElement *
233     element, GstStateChange transition);
234
235 static void bus_handler (GstBin * bin, GstMessage * msg);
236 static void set_next_filename (GstSplitMuxSink * splitmux, MqStreamCtx * ctx);
237 static GstFlowReturn start_next_fragment (GstSplitMuxSink * splitmux,
238     MqStreamCtx * ctx);
239 static void mq_stream_ctx_free (MqStreamCtx * ctx);
240 static void grow_blocked_queues (GstSplitMuxSink * splitmux);
241
242 static void gst_splitmux_sink_ensure_max_files (GstSplitMuxSink * splitmux);
243 static GstElement *create_element (GstSplitMuxSink * splitmux,
244     const gchar * factory, const gchar * name, gboolean locked);
245
246 static void do_async_done (GstSplitMuxSink * splitmux);
247 static void gst_splitmux_reset_timecode (GstSplitMuxSink * splitmux);
248
249 static MqStreamBuf *
250 mq_stream_buf_new (void)
251 {
252   return g_slice_new0 (MqStreamBuf);
253 }
254
255 static void
256 mq_stream_buf_free (MqStreamBuf * data)
257 {
258   g_slice_free (MqStreamBuf, data);
259 }
260
261 static SplitMuxOutputCommand *
262 out_cmd_buf_new (void)
263 {
264   return g_slice_new0 (SplitMuxOutputCommand);
265 }
266
267 static void
268 out_cmd_buf_free (SplitMuxOutputCommand * data)
269 {
270   g_slice_free (SplitMuxOutputCommand, data);
271 }
272
273 static void
274 gst_splitmux_sink_class_init (GstSplitMuxSinkClass * klass)
275 {
276   GObjectClass *gobject_class = (GObjectClass *) klass;
277   GstElementClass *gstelement_class = (GstElementClass *) klass;
278   GstBinClass *gstbin_class = (GstBinClass *) klass;
279
280   gobject_class->set_property = gst_splitmux_sink_set_property;
281   gobject_class->get_property = gst_splitmux_sink_get_property;
282   gobject_class->dispose = gst_splitmux_sink_dispose;
283   gobject_class->finalize = gst_splitmux_sink_finalize;
284
285   gst_element_class_set_static_metadata (gstelement_class,
286       "Split Muxing Bin", "Generic/Bin/Muxer",
287       "Convenience bin that muxes incoming streams into multiple time/size limited files",
288       "Jan Schmidt <jan@centricular.com>");
289
290   gst_element_class_add_static_pad_template (gstelement_class,
291       &video_sink_template);
292   gst_element_class_add_static_pad_template (gstelement_class,
293       &video_aux_sink_template);
294   gst_element_class_add_static_pad_template (gstelement_class,
295       &audio_sink_template);
296   gst_element_class_add_static_pad_template (gstelement_class,
297       &subtitle_sink_template);
298   gst_element_class_add_static_pad_template (gstelement_class,
299       &caption_sink_template);
300
301   gstelement_class->change_state =
302       GST_DEBUG_FUNCPTR (gst_splitmux_sink_change_state);
303   gstelement_class->request_new_pad =
304       GST_DEBUG_FUNCPTR (gst_splitmux_sink_request_new_pad);
305   gstelement_class->release_pad =
306       GST_DEBUG_FUNCPTR (gst_splitmux_sink_release_pad);
307
308   gstbin_class->handle_message = bus_handler;
309
310   g_object_class_install_property (gobject_class, PROP_LOCATION,
311       g_param_spec_string ("location", "File Output Pattern",
312           "Format string pattern for the location of the files to write (e.g. video%05d.mp4)",
313           NULL, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
314   g_object_class_install_property (gobject_class, PROP_MUXER_OVERHEAD,
315       g_param_spec_double ("mux-overhead", "Muxing Overhead",
316           "Extra size overhead of muxing (0.02 = 2%)", 0.0, 1.0,
317           DEFAULT_MUXER_OVERHEAD,
318           G_PARAM_READWRITE | GST_PARAM_CONTROLLABLE | G_PARAM_STATIC_STRINGS));
319
320   g_object_class_install_property (gobject_class, PROP_MAX_SIZE_TIME,
321       g_param_spec_uint64 ("max-size-time", "Max. size (ns)",
322           "Max. amount of time per file (in ns, 0=disable)", 0, G_MAXUINT64,
323           DEFAULT_MAX_SIZE_TIME,
324           G_PARAM_READWRITE | GST_PARAM_MUTABLE_READY |
325           G_PARAM_STATIC_STRINGS));
326   g_object_class_install_property (gobject_class, PROP_MAX_SIZE_BYTES,
327       g_param_spec_uint64 ("max-size-bytes", "Max. size bytes",
328           "Max. amount of data per file (in bytes, 0=disable)", 0, G_MAXUINT64,
329           DEFAULT_MAX_SIZE_BYTES,
330           G_PARAM_READWRITE | GST_PARAM_MUTABLE_READY |
331           G_PARAM_STATIC_STRINGS));
332   g_object_class_install_property (gobject_class, PROP_MAX_SIZE_TIMECODE,
333       g_param_spec_string ("max-size-timecode", "Maximum timecode difference",
334           "Maximum difference in timecode between first and last frame. "
335           "Separator is assumed to be \":\" everywhere (e.g. 01:00:00:00). "
336           "Will only be effective if a timecode track is present.", NULL,
337           G_PARAM_READWRITE | GST_PARAM_MUTABLE_READY |
338           G_PARAM_STATIC_STRINGS));
339   g_object_class_install_property (gobject_class, PROP_SEND_KEYFRAME_REQUESTS,
340       g_param_spec_boolean ("send-keyframe-requests",
341           "Request keyframes at max-size-time",
342           "Request a keyframe every max-size-time ns to try splitting at that point. "
343           "Needs max-size-bytes to be 0 in order to be effective.",
344           DEFAULT_SEND_KEYFRAME_REQUESTS,
345           G_PARAM_READWRITE | GST_PARAM_MUTABLE_READY |
346           G_PARAM_STATIC_STRINGS));
347   g_object_class_install_property (gobject_class, PROP_MAX_FILES,
348       g_param_spec_uint ("max-files", "Max files",
349           "Maximum number of files to keep on disk. Once the maximum is reached,"
350           "old files start to be deleted to make room for new ones.", 0,
351           G_MAXUINT, DEFAULT_MAX_FILES,
352           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
353   g_object_class_install_property (gobject_class, PROP_ALIGNMENT_THRESHOLD,
354       g_param_spec_uint64 ("alignment-threshold", "Alignment threshold (ns)",
355           "Allow non-reference streams to be that many ns before the reference"
356           " stream", 0, G_MAXUINT64, DEFAULT_ALIGNMENT_THRESHOLD,
357           G_PARAM_READWRITE | GST_PARAM_MUTABLE_READY |
358           G_PARAM_STATIC_STRINGS));
359
360   g_object_class_install_property (gobject_class, PROP_MUXER,
361       g_param_spec_object ("muxer", "Muxer",
362           "The muxer element to use (NULL = default mp4mux). "
363           "Valid only for async-finalize = FALSE",
364           GST_TYPE_ELEMENT, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
365   g_object_class_install_property (gobject_class, PROP_SINK,
366       g_param_spec_object ("sink", "Sink",
367           "The sink element (or element chain) to use (NULL = default filesink). "
368           "Valid only for async-finalize = FALSE",
369           GST_TYPE_ELEMENT, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
370
371   g_object_class_install_property (gobject_class, PROP_USE_ROBUST_MUXING,
372       g_param_spec_boolean ("use-robust-muxing",
373           "Support robust-muxing mode of some muxers",
374           "Check if muxers support robust muxing via the reserved-max-duration and "
375           "reserved-duration-remaining properties and use them if so. "
376           "(Only present on qtmux and mp4mux for now). splitmuxsink may then also "
377           " create new fragments if the reserved header space is about to overflow. "
378           "Note that for mp4mux and qtmux, reserved-moov-update-period must be set "
379           "manually by the app to a non-zero value for robust muxing to have an effect.",
380           DEFAULT_USE_ROBUST_MUXING,
381           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
382
383   g_object_class_install_property (gobject_class, PROP_RESET_MUXER,
384       g_param_spec_boolean ("reset-muxer",
385           "Reset Muxer",
386           "Reset the muxer after each segment. Disabling this will not work for most muxers.",
387           DEFAULT_RESET_MUXER, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
388
389   g_object_class_install_property (gobject_class, PROP_ASYNC_FINALIZE,
390       g_param_spec_boolean ("async-finalize",
391           "Finalize fragments asynchronously",
392           "Finalize each fragment asynchronously and start a new one",
393           DEFAULT_ASYNC_FINALIZE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
394   g_object_class_install_property (gobject_class, PROP_MUXER_FACTORY,
395       g_param_spec_string ("muxer-factory", "Muxer factory",
396           "The muxer element factory to use (default = mp4mux). "
397           "Valid only for async-finalize = TRUE",
398           "mp4mux", G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
399   /**
400    * GstSplitMuxSink:muxer-preset
401    *
402    * An optional #GstPreset name to use for the muxer. This only has an effect
403    * in `async-finalize=TRUE` mode.
404    *
405    * Since: 1.18
406    */
407   g_object_class_install_property (gobject_class, PROP_MUXER_PRESET,
408       g_param_spec_string ("muxer-preset", "Muxer preset",
409           "The muxer preset to use. "
410           "Valid only for async-finalize = TRUE",
411           NULL, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
412   g_object_class_install_property (gobject_class, PROP_MUXER_PROPERTIES,
413       g_param_spec_boxed ("muxer-properties", "Muxer properties",
414           "The muxer element properties to use. "
415           "Example: {properties,boolean-prop=true,string-prop=\"hi\"}. "
416           "Valid only for async-finalize = TRUE",
417           GST_TYPE_STRUCTURE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
418   g_object_class_install_property (gobject_class, PROP_SINK_FACTORY,
419       g_param_spec_string ("sink-factory", "Sink factory",
420           "The sink element factory to use (default = filesink). "
421           "Valid only for async-finalize = TRUE",
422           "filesink", G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
423   /**
424    * GstSplitMuxSink:sink-preset
425    *
426    * An optional #GstPreset name to use for the sink. This only has an effect
427    * in `async-finalize=TRUE` mode.
428    *
429    * Since: 1.18
430    */
431   g_object_class_install_property (gobject_class, PROP_SINK_PRESET,
432       g_param_spec_string ("sink-preset", "Sink preset",
433           "The sink preset to use. "
434           "Valid only for async-finalize = TRUE",
435           NULL, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
436   g_object_class_install_property (gobject_class, PROP_SINK_PROPERTIES,
437       g_param_spec_boxed ("sink-properties", "Sink properties",
438           "The sink element properties to use. "
439           "Example: {properties,boolean-prop=true,string-prop=\"hi\"}. "
440           "Valid only for async-finalize = TRUE",
441           GST_TYPE_STRUCTURE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
442   g_object_class_install_property (gobject_class, PROP_START_INDEX,
443       g_param_spec_int ("start-index", "Start Index",
444           "Start value of fragment index.",
445           0, G_MAXINT, DEFAULT_START_INDEX,
446           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
447
448   /**
449    * GstSplitMuxSink::muxer-pad-map
450    *
451    * An optional GstStructure that provides a map from splitmuxsink sinkpad
452    * names to muxer pad names they should feed. Splitmuxsink has some default
453    * mapping behaviour to link video to video pads and audio to audio pads
454    * that usually works fine. This property is useful if you need to ensure
455    * a particular mapping to muxed streams.
456    *
457    * The GstStructure contains string fields like so:
458    *   splitmuxsink muxer-pad-map=x-pad-map,video=video_1
459    *
460    * Since: 1.18
461    */
462   g_object_class_install_property (G_OBJECT_CLASS (klass), PROP_MUXERPAD_MAP,
463       g_param_spec_boxed ("muxer-pad-map", "Muxer pad map",
464           "A GstStructure specifies the mapping from splitmuxsink sink pads to muxer pads",
465           GST_TYPE_STRUCTURE,
466           (GParamFlags) (G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)));
467
468   /**
469    * GstSplitMuxSink::format-location:
470    * @splitmux: the #GstSplitMuxSink
471    * @fragment_id: the sequence number of the file to be created
472    *
473    * Returns: the location to be used for the next output file. This must be
474    *    a newly-allocated string which will be freed with g_free() by the
475    *    splitmuxsink element when it no longer needs it, so use g_strdup() or
476    *    g_strdup_printf() or similar functions to allocate it.
477    */
478   signals[SIGNAL_FORMAT_LOCATION] =
479       g_signal_new ("format-location", G_TYPE_FROM_CLASS (klass),
480       G_SIGNAL_RUN_LAST, 0, NULL, NULL, NULL, G_TYPE_STRING, 1, G_TYPE_UINT);
481
482   /**
483    * GstSplitMuxSink::format-location-full:
484    * @splitmux: the #GstSplitMuxSink
485    * @fragment_id: the sequence number of the file to be created
486    * @first_sample: A #GstSample containing the first buffer
487    *   from the reference stream in the new file
488    *
489    * Returns: the location to be used for the next output file. This must be
490    *    a newly-allocated string which will be freed with g_free() by the
491    *    splitmuxsink element when it no longer needs it, so use g_strdup() or
492    *    g_strdup_printf() or similar functions to allocate it.
493    *
494    * Since: 1.12
495    */
496   signals[SIGNAL_FORMAT_LOCATION_FULL] =
497       g_signal_new ("format-location-full", G_TYPE_FROM_CLASS (klass),
498       G_SIGNAL_RUN_LAST, 0, NULL, NULL, NULL, G_TYPE_STRING, 2, G_TYPE_UINT,
499       GST_TYPE_SAMPLE);
500
501   /**
502    * GstSplitMuxSink::split-now:
503    * @splitmux: the #GstSplitMuxSink
504    *
505    * When called by the user, this action signal splits the video file (and begins a new one) immediately.
506    * The current GOP will be output to the new file.
507    *
508    * Since: 1.14
509    */
510   signals[SIGNAL_SPLIT_NOW] =
511       g_signal_new ("split-now", G_TYPE_FROM_CLASS (klass),
512       G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION,
513       G_STRUCT_OFFSET (GstSplitMuxSinkClass, split_now), NULL, NULL, NULL,
514       G_TYPE_NONE, 0);
515
516   /**
517    * GstSplitMuxSink::split-after:
518    * @splitmux: the #GstSplitMuxSink
519    *
520    * When called by the user, this action signal splits the video file (and begins a new one) immediately.
521    * Unlike the 'split-now' signal, with 'split-after', the current GOP will be output to the old file.
522    *
523    * Since: 1.16
524    */
525   signals[SIGNAL_SPLIT_AFTER] =
526       g_signal_new ("split-after", G_TYPE_FROM_CLASS (klass),
527       G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION,
528       G_STRUCT_OFFSET (GstSplitMuxSinkClass, split_after), NULL, NULL, NULL,
529       G_TYPE_NONE, 0);
530
531   /**
532    * GstSplitMuxSink::split-at-running-time:
533    * @splitmux: the #GstSplitMuxSink
534    *
535    * When called by the user, this action signal splits the video file (and
536    * begins a new one) as soon as the given running time is reached. If this
537    * action signal is called multiple times, running times are queued up and
538    * processed in the order they were given.
539    *
540    * Note that this is prone to race conditions, where said running time is
541    * reached and surpassed before we had a chance to split. The file will
542    * still split immediately, but in order to make sure that the split doesn't
543    * happen too late, it is recommended to call this action signal from
544    * something that will prevent further buffers from flowing into
545    * splitmuxsink before the split is completed, such as a pad probe before
546    * splitmuxsink.
547    *
548    *
549    * Since: 1.16
550    */
551   signals[SIGNAL_SPLIT_AT_RUNNING_TIME] =
552       g_signal_new ("split-at-running-time", G_TYPE_FROM_CLASS (klass),
553       G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION,
554       G_STRUCT_OFFSET (GstSplitMuxSinkClass, split_at_running_time), NULL, NULL,
555       NULL, G_TYPE_NONE, 1, G_TYPE_UINT64);
556
557   /**
558    * GstSplitMuxSink::muxer-added:
559    * @splitmux: the #GstSplitMuxSink
560    * @muxer: the newly added muxer element
561    *
562    * Since: 1.14
563    */
564   signals[SIGNAL_MUXER_ADDED] =
565       g_signal_new ("muxer-added", G_TYPE_FROM_CLASS (klass),
566       G_SIGNAL_RUN_LAST, 0, NULL, NULL, NULL, G_TYPE_NONE, 1, GST_TYPE_ELEMENT);
567
568   /**
569    * GstSplitMuxSink::sink-added:
570    * @splitmux: the #GstSplitMuxSink
571    * @sink: the newly added sink element
572    *
573    * Since: 1.14
574    */
575   signals[SIGNAL_SINK_ADDED] =
576       g_signal_new ("sink-added", G_TYPE_FROM_CLASS (klass),
577       G_SIGNAL_RUN_LAST, 0, NULL, NULL, NULL, G_TYPE_NONE, 1, GST_TYPE_ELEMENT);
578
579   klass->split_now = split_now;
580   klass->split_after = split_after;
581   klass->split_at_running_time = split_at_running_time;
582 }
583
584 static void
585 gst_splitmux_sink_init (GstSplitMuxSink * splitmux)
586 {
587   g_mutex_init (&splitmux->lock);
588   g_mutex_init (&splitmux->state_lock);
589   g_cond_init (&splitmux->input_cond);
590   g_cond_init (&splitmux->output_cond);
591   g_queue_init (&splitmux->out_cmd_q);
592
593   splitmux->mux_overhead = DEFAULT_MUXER_OVERHEAD;
594   splitmux->threshold_time = DEFAULT_MAX_SIZE_TIME;
595   splitmux->threshold_bytes = DEFAULT_MAX_SIZE_BYTES;
596   splitmux->max_files = DEFAULT_MAX_FILES;
597   splitmux->send_keyframe_requests = DEFAULT_SEND_KEYFRAME_REQUESTS;
598   splitmux->alignment_threshold = DEFAULT_ALIGNMENT_THRESHOLD;
599   splitmux->use_robust_muxing = DEFAULT_USE_ROBUST_MUXING;
600   splitmux->reset_muxer = DEFAULT_RESET_MUXER;
601
602   splitmux->threshold_timecode_str = NULL;
603   gst_splitmux_reset_timecode (splitmux);
604
605   splitmux->async_finalize = DEFAULT_ASYNC_FINALIZE;
606   splitmux->muxer_factory = g_strdup (DEFAULT_MUXER);
607   splitmux->muxer_properties = NULL;
608   splitmux->sink_factory = g_strdup (DEFAULT_SINK);
609   splitmux->sink_properties = NULL;
610
611   GST_OBJECT_FLAG_SET (splitmux, GST_ELEMENT_FLAG_SINK);
612   splitmux->split_requested = FALSE;
613   splitmux->do_split_next_gop = FALSE;
614   splitmux->times_to_split = gst_queue_array_new_for_struct (8, 8);
615   splitmux->next_fku_time = GST_CLOCK_TIME_NONE;
616 }
617
618 static void
619 gst_splitmux_reset_elements (GstSplitMuxSink * splitmux)
620 {
621   if (splitmux->muxer) {
622     gst_element_set_locked_state (splitmux->muxer, TRUE);
623     gst_element_set_state (splitmux->muxer, GST_STATE_NULL);
624     gst_bin_remove (GST_BIN (splitmux), splitmux->muxer);
625   }
626   if (splitmux->active_sink) {
627     gst_element_set_locked_state (splitmux->active_sink, TRUE);
628     gst_element_set_state (splitmux->active_sink, GST_STATE_NULL);
629     gst_bin_remove (GST_BIN (splitmux), splitmux->active_sink);
630   }
631
632   splitmux->sink = splitmux->active_sink = splitmux->muxer = NULL;
633 }
634
635 static void
636 gst_splitmux_reset_timecode (GstSplitMuxSink * splitmux)
637 {
638   g_clear_pointer (&splitmux->in_tc, gst_video_time_code_free);
639   g_clear_pointer (&splitmux->fragment_start_tc, gst_video_time_code_free);
640   g_clear_pointer (&splitmux->gop_start_tc, gst_video_time_code_free);
641   splitmux->next_fragment_start_tc_time = GST_CLOCK_TIME_NONE;
642 }
643
644 static void
645 gst_splitmux_sink_dispose (GObject * object)
646 {
647   GstSplitMuxSink *splitmux = GST_SPLITMUX_SINK (object);
648
649   /* Calling parent dispose invalidates all child pointers */
650   splitmux->sink = splitmux->active_sink = splitmux->muxer = NULL;
651
652   G_OBJECT_CLASS (parent_class)->dispose (object);
653 }
654
655 static void
656 gst_splitmux_sink_finalize (GObject * object)
657 {
658   GstSplitMuxSink *splitmux = GST_SPLITMUX_SINK (object);
659   g_cond_clear (&splitmux->input_cond);
660   g_cond_clear (&splitmux->output_cond);
661   g_mutex_clear (&splitmux->lock);
662   g_mutex_clear (&splitmux->state_lock);
663   g_queue_foreach (&splitmux->out_cmd_q, (GFunc) out_cmd_buf_free, NULL);
664   g_queue_clear (&splitmux->out_cmd_q);
665
666   if (splitmux->muxerpad_map)
667     gst_structure_free (splitmux->muxerpad_map);
668
669   if (splitmux->provided_sink)
670     gst_object_unref (splitmux->provided_sink);
671   if (splitmux->provided_muxer)
672     gst_object_unref (splitmux->provided_muxer);
673
674   if (splitmux->muxer_factory)
675     g_free (splitmux->muxer_factory);
676   if (splitmux->muxer_preset)
677     g_free (splitmux->muxer_preset);
678   if (splitmux->muxer_properties)
679     gst_structure_free (splitmux->muxer_properties);
680   if (splitmux->sink_factory)
681     g_free (splitmux->sink_factory);
682   if (splitmux->sink_preset)
683     g_free (splitmux->sink_preset);
684   if (splitmux->sink_properties)
685     gst_structure_free (splitmux->sink_properties);
686
687   if (splitmux->threshold_timecode_str)
688     g_free (splitmux->threshold_timecode_str);
689   if (splitmux->tc_interval)
690     gst_video_time_code_interval_free (splitmux->tc_interval);
691
692   if (splitmux->times_to_split)
693     gst_queue_array_free (splitmux->times_to_split);
694
695   g_free (splitmux->location);
696
697   /* Make sure to free any un-released contexts. There should not be any,
698    * because the dispose will have freed all request pads though */
699   g_list_foreach (splitmux->contexts, (GFunc) mq_stream_ctx_free, NULL);
700   g_list_free (splitmux->contexts);
701   gst_splitmux_reset_timecode (splitmux);
702
703   G_OBJECT_CLASS (parent_class)->finalize (object);
704 }
705
706 /*
707  * Set any time threshold to the muxer, if it has
708  * reserved-max-duration and reserved-duration-remaining
709  * properties. Called when creating/claiming the muxer
710  * in create_elements() */
711 static void
712 update_muxer_properties (GstSplitMuxSink * sink)
713 {
714   GObjectClass *klass;
715   GstClockTime threshold_time;
716
717   sink->muxer_has_reserved_props = FALSE;
718   if (sink->muxer == NULL)
719     return;
720   klass = G_OBJECT_GET_CLASS (sink->muxer);
721   if (g_object_class_find_property (klass, "reserved-max-duration") == NULL)
722     return;
723   if (g_object_class_find_property (klass,
724           "reserved-duration-remaining") == NULL)
725     return;
726   sink->muxer_has_reserved_props = TRUE;
727
728   GST_LOG_OBJECT (sink, "Setting muxer reserved time to %" GST_TIME_FORMAT,
729       GST_TIME_ARGS (sink->threshold_time));
730   GST_OBJECT_LOCK (sink);
731   threshold_time = sink->threshold_time;
732   GST_OBJECT_UNLOCK (sink);
733
734   if (threshold_time > 0) {
735     /* Tell the muxer how much space to reserve */
736     GstClockTime muxer_threshold = threshold_time;
737     g_object_set (sink->muxer, "reserved-max-duration", muxer_threshold, NULL);
738   }
739 }
740
741 static void
742 gst_splitmux_sink_set_property (GObject * object, guint prop_id,
743     const GValue * value, GParamSpec * pspec)
744 {
745   GstSplitMuxSink *splitmux = GST_SPLITMUX_SINK (object);
746
747   switch (prop_id) {
748     case PROP_LOCATION:{
749       GST_OBJECT_LOCK (splitmux);
750       g_free (splitmux->location);
751       splitmux->location = g_value_dup_string (value);
752       GST_OBJECT_UNLOCK (splitmux);
753       break;
754     }
755     case PROP_START_INDEX:
756       GST_OBJECT_LOCK (splitmux);
757       splitmux->start_index = g_value_get_int (value);
758       GST_OBJECT_UNLOCK (splitmux);
759       break;
760     case PROP_MAX_SIZE_BYTES:
761       GST_OBJECT_LOCK (splitmux);
762       splitmux->threshold_bytes = g_value_get_uint64 (value);
763       GST_OBJECT_UNLOCK (splitmux);
764       break;
765     case PROP_MAX_SIZE_TIME:
766       GST_OBJECT_LOCK (splitmux);
767       splitmux->threshold_time = g_value_get_uint64 (value);
768       GST_OBJECT_UNLOCK (splitmux);
769       break;
770     case PROP_MAX_SIZE_TIMECODE:
771       GST_OBJECT_LOCK (splitmux);
772       g_free (splitmux->threshold_timecode_str);
773       /* will be calculated later */
774       g_clear_pointer (&splitmux->tc_interval,
775           gst_video_time_code_interval_free);
776       gst_splitmux_reset_timecode (splitmux);
777
778       splitmux->threshold_timecode_str = g_value_dup_string (value);
779       if (splitmux->threshold_timecode_str) {
780         splitmux->tc_interval =
781             gst_video_time_code_interval_new_from_string
782             (splitmux->threshold_timecode_str);
783         if (!splitmux->tc_interval) {
784           g_warning ("Wrong timecode string %s",
785               splitmux->threshold_timecode_str);
786           g_free (splitmux->threshold_timecode_str);
787           splitmux->threshold_timecode_str = NULL;
788         }
789       }
790       GST_OBJECT_UNLOCK (splitmux);
791       break;
792     case PROP_SEND_KEYFRAME_REQUESTS:
793       GST_OBJECT_LOCK (splitmux);
794       splitmux->send_keyframe_requests = g_value_get_boolean (value);
795       GST_OBJECT_UNLOCK (splitmux);
796       break;
797     case PROP_MAX_FILES:
798       GST_OBJECT_LOCK (splitmux);
799       splitmux->max_files = g_value_get_uint (value);
800       GST_OBJECT_UNLOCK (splitmux);
801       break;
802     case PROP_MUXER_OVERHEAD:
803       GST_OBJECT_LOCK (splitmux);
804       splitmux->mux_overhead = g_value_get_double (value);
805       GST_OBJECT_UNLOCK (splitmux);
806       break;
807     case PROP_USE_ROBUST_MUXING:
808       GST_OBJECT_LOCK (splitmux);
809       splitmux->use_robust_muxing = g_value_get_boolean (value);
810       GST_OBJECT_UNLOCK (splitmux);
811       if (splitmux->use_robust_muxing)
812         update_muxer_properties (splitmux);
813       break;
814     case PROP_ALIGNMENT_THRESHOLD:
815       GST_OBJECT_LOCK (splitmux);
816       splitmux->alignment_threshold = g_value_get_uint64 (value);
817       GST_OBJECT_UNLOCK (splitmux);
818       break;
819     case PROP_SINK:
820       GST_OBJECT_LOCK (splitmux);
821       gst_clear_object (&splitmux->provided_sink);
822       splitmux->provided_sink = g_value_get_object (value);
823       if (splitmux->provided_sink)
824         gst_object_ref_sink (splitmux->provided_sink);
825       GST_OBJECT_UNLOCK (splitmux);
826       break;
827     case PROP_MUXER:
828       GST_OBJECT_LOCK (splitmux);
829       gst_clear_object (&splitmux->provided_muxer);
830       splitmux->provided_muxer = g_value_get_object (value);
831       if (splitmux->provided_muxer)
832         gst_object_ref_sink (splitmux->provided_muxer);
833       GST_OBJECT_UNLOCK (splitmux);
834       break;
835     case PROP_RESET_MUXER:
836       GST_OBJECT_LOCK (splitmux);
837       splitmux->reset_muxer = g_value_get_boolean (value);
838       GST_OBJECT_UNLOCK (splitmux);
839       break;
840     case PROP_ASYNC_FINALIZE:
841       GST_OBJECT_LOCK (splitmux);
842       splitmux->async_finalize = g_value_get_boolean (value);
843       GST_OBJECT_UNLOCK (splitmux);
844       break;
845     case PROP_MUXER_FACTORY:
846       GST_OBJECT_LOCK (splitmux);
847       if (splitmux->muxer_factory)
848         g_free (splitmux->muxer_factory);
849       splitmux->muxer_factory = g_value_dup_string (value);
850       GST_OBJECT_UNLOCK (splitmux);
851       break;
852     case PROP_MUXER_PRESET:
853       GST_OBJECT_LOCK (splitmux);
854       if (splitmux->muxer_preset)
855         g_free (splitmux->muxer_preset);
856       splitmux->muxer_preset = g_value_dup_string (value);
857       GST_OBJECT_UNLOCK (splitmux);
858       break;
859     case PROP_MUXER_PROPERTIES:
860       GST_OBJECT_LOCK (splitmux);
861       if (splitmux->muxer_properties)
862         gst_structure_free (splitmux->muxer_properties);
863       if (gst_value_get_structure (value))
864         splitmux->muxer_properties =
865             gst_structure_copy (gst_value_get_structure (value));
866       else
867         splitmux->muxer_properties = NULL;
868       GST_OBJECT_UNLOCK (splitmux);
869       break;
870     case PROP_SINK_FACTORY:
871       GST_OBJECT_LOCK (splitmux);
872       if (splitmux->sink_factory)
873         g_free (splitmux->sink_factory);
874       splitmux->sink_factory = g_value_dup_string (value);
875       GST_OBJECT_UNLOCK (splitmux);
876       break;
877     case PROP_SINK_PRESET:
878       GST_OBJECT_LOCK (splitmux);
879       if (splitmux->sink_preset)
880         g_free (splitmux->sink_preset);
881       splitmux->sink_preset = g_value_dup_string (value);
882       GST_OBJECT_UNLOCK (splitmux);
883       break;
884     case PROP_SINK_PROPERTIES:
885       GST_OBJECT_LOCK (splitmux);
886       if (splitmux->sink_properties)
887         gst_structure_free (splitmux->sink_properties);
888       if (gst_value_get_structure (value))
889         splitmux->sink_properties =
890             gst_structure_copy (gst_value_get_structure (value));
891       else
892         splitmux->sink_properties = NULL;
893       GST_OBJECT_UNLOCK (splitmux);
894       break;
895     case PROP_MUXERPAD_MAP:
896     {
897       const GstStructure *s = gst_value_get_structure (value);
898       GST_SPLITMUX_LOCK (splitmux);
899       if (splitmux->muxerpad_map) {
900         gst_structure_free (splitmux->muxerpad_map);
901       }
902       if (s)
903         splitmux->muxerpad_map = gst_structure_copy (s);
904       else
905         splitmux->muxerpad_map = NULL;
906       GST_SPLITMUX_UNLOCK (splitmux);
907       break;
908     }
909     default:
910       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
911       break;
912   }
913 }
914
915 static void
916 gst_splitmux_sink_get_property (GObject * object, guint prop_id,
917     GValue * value, GParamSpec * pspec)
918 {
919   GstSplitMuxSink *splitmux = GST_SPLITMUX_SINK (object);
920
921   switch (prop_id) {
922     case PROP_LOCATION:
923       GST_OBJECT_LOCK (splitmux);
924       g_value_set_string (value, splitmux->location);
925       GST_OBJECT_UNLOCK (splitmux);
926       break;
927     case PROP_START_INDEX:
928       GST_OBJECT_LOCK (splitmux);
929       g_value_set_int (value, splitmux->start_index);
930       GST_OBJECT_UNLOCK (splitmux);
931       break;
932     case PROP_MAX_SIZE_BYTES:
933       GST_OBJECT_LOCK (splitmux);
934       g_value_set_uint64 (value, splitmux->threshold_bytes);
935       GST_OBJECT_UNLOCK (splitmux);
936       break;
937     case PROP_MAX_SIZE_TIME:
938       GST_OBJECT_LOCK (splitmux);
939       g_value_set_uint64 (value, splitmux->threshold_time);
940       GST_OBJECT_UNLOCK (splitmux);
941       break;
942     case PROP_MAX_SIZE_TIMECODE:
943       GST_OBJECT_LOCK (splitmux);
944       g_value_set_string (value, splitmux->threshold_timecode_str);
945       GST_OBJECT_UNLOCK (splitmux);
946       break;
947     case PROP_SEND_KEYFRAME_REQUESTS:
948       GST_OBJECT_LOCK (splitmux);
949       g_value_set_boolean (value, splitmux->send_keyframe_requests);
950       GST_OBJECT_UNLOCK (splitmux);
951       break;
952     case PROP_MAX_FILES:
953       GST_OBJECT_LOCK (splitmux);
954       g_value_set_uint (value, splitmux->max_files);
955       GST_OBJECT_UNLOCK (splitmux);
956       break;
957     case PROP_MUXER_OVERHEAD:
958       GST_OBJECT_LOCK (splitmux);
959       g_value_set_double (value, splitmux->mux_overhead);
960       GST_OBJECT_UNLOCK (splitmux);
961       break;
962     case PROP_USE_ROBUST_MUXING:
963       GST_OBJECT_LOCK (splitmux);
964       g_value_set_boolean (value, splitmux->use_robust_muxing);
965       GST_OBJECT_UNLOCK (splitmux);
966       break;
967     case PROP_ALIGNMENT_THRESHOLD:
968       GST_OBJECT_LOCK (splitmux);
969       g_value_set_uint64 (value, splitmux->alignment_threshold);
970       GST_OBJECT_UNLOCK (splitmux);
971       break;
972     case PROP_SINK:
973       GST_OBJECT_LOCK (splitmux);
974       g_value_set_object (value, splitmux->provided_sink);
975       GST_OBJECT_UNLOCK (splitmux);
976       break;
977     case PROP_MUXER:
978       GST_OBJECT_LOCK (splitmux);
979       g_value_set_object (value, splitmux->provided_muxer);
980       GST_OBJECT_UNLOCK (splitmux);
981       break;
982     case PROP_RESET_MUXER:
983       GST_OBJECT_LOCK (splitmux);
984       g_value_set_boolean (value, splitmux->reset_muxer);
985       GST_OBJECT_UNLOCK (splitmux);
986       break;
987     case PROP_ASYNC_FINALIZE:
988       GST_OBJECT_LOCK (splitmux);
989       g_value_set_boolean (value, splitmux->async_finalize);
990       GST_OBJECT_UNLOCK (splitmux);
991       break;
992     case PROP_MUXER_FACTORY:
993       GST_OBJECT_LOCK (splitmux);
994       g_value_set_string (value, splitmux->muxer_factory);
995       GST_OBJECT_UNLOCK (splitmux);
996       break;
997     case PROP_MUXER_PRESET:
998       GST_OBJECT_LOCK (splitmux);
999       g_value_set_string (value, splitmux->muxer_preset);
1000       GST_OBJECT_UNLOCK (splitmux);
1001       break;
1002     case PROP_MUXER_PROPERTIES:
1003       GST_OBJECT_LOCK (splitmux);
1004       gst_value_set_structure (value, splitmux->muxer_properties);
1005       GST_OBJECT_UNLOCK (splitmux);
1006       break;
1007     case PROP_SINK_FACTORY:
1008       GST_OBJECT_LOCK (splitmux);
1009       g_value_set_string (value, splitmux->sink_factory);
1010       GST_OBJECT_UNLOCK (splitmux);
1011       break;
1012     case PROP_SINK_PRESET:
1013       GST_OBJECT_LOCK (splitmux);
1014       g_value_set_string (value, splitmux->sink_preset);
1015       GST_OBJECT_UNLOCK (splitmux);
1016       break;
1017     case PROP_SINK_PROPERTIES:
1018       GST_OBJECT_LOCK (splitmux);
1019       gst_value_set_structure (value, splitmux->sink_properties);
1020       GST_OBJECT_UNLOCK (splitmux);
1021       break;
1022     case PROP_MUXERPAD_MAP:
1023       GST_SPLITMUX_LOCK (splitmux);
1024       gst_value_set_structure (value, splitmux->muxerpad_map);
1025       GST_SPLITMUX_UNLOCK (splitmux);
1026       break;
1027     default:
1028       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
1029       break;
1030   }
1031 }
1032
1033 /* Convenience function */
1034 static inline GstClockTimeDiff
1035 my_segment_to_running_time (GstSegment * segment, GstClockTime val)
1036 {
1037   GstClockTimeDiff res = GST_CLOCK_STIME_NONE;
1038
1039   if (GST_CLOCK_TIME_IS_VALID (val)) {
1040     gboolean sign =
1041         gst_segment_to_running_time_full (segment, GST_FORMAT_TIME, val, &val);
1042     if (sign > 0)
1043       res = val;
1044     else if (sign < 0)
1045       res = -val;
1046   }
1047   return res;
1048 }
1049
1050 static void
1051 mq_stream_ctx_reset (MqStreamCtx * ctx)
1052 {
1053   gst_segment_init (&ctx->in_segment, GST_FORMAT_UNDEFINED);
1054   gst_segment_init (&ctx->out_segment, GST_FORMAT_UNDEFINED);
1055   ctx->in_running_time = ctx->out_running_time = GST_CLOCK_STIME_NONE;
1056   g_queue_foreach (&ctx->queued_bufs, (GFunc) mq_stream_buf_free, NULL);
1057   g_queue_clear (&ctx->queued_bufs);
1058 }
1059
1060 static MqStreamCtx *
1061 mq_stream_ctx_new (GstSplitMuxSink * splitmux)
1062 {
1063   MqStreamCtx *ctx;
1064
1065   ctx = g_new0 (MqStreamCtx, 1);
1066   ctx->splitmux = splitmux;
1067   g_queue_init (&ctx->queued_bufs);
1068   mq_stream_ctx_reset (ctx);
1069
1070   return ctx;
1071 }
1072
1073 static void
1074 mq_stream_ctx_free (MqStreamCtx * ctx)
1075 {
1076   if (ctx->q) {
1077     GstObject *parent = gst_object_get_parent (GST_OBJECT (ctx->q));
1078
1079     g_signal_handler_disconnect (ctx->q, ctx->q_overrun_id);
1080
1081     if (parent == GST_OBJECT_CAST (ctx->splitmux)) {
1082       gst_element_set_locked_state (ctx->q, TRUE);
1083       gst_element_set_state (ctx->q, GST_STATE_NULL);
1084       gst_bin_remove (GST_BIN (ctx->splitmux), ctx->q);
1085       gst_object_unref (parent);
1086     }
1087     gst_object_unref (ctx->q);
1088   }
1089   gst_object_unref (ctx->sinkpad);
1090   gst_object_unref (ctx->srcpad);
1091   g_queue_foreach (&ctx->queued_bufs, (GFunc) mq_stream_buf_free, NULL);
1092   g_queue_clear (&ctx->queued_bufs);
1093   g_free (ctx);
1094 }
1095
1096 static void
1097 send_fragment_opened_closed_msg (GstSplitMuxSink * splitmux, gboolean opened,
1098     GstElement * sink)
1099 {
1100   gchar *location = NULL;
1101   GstMessage *msg;
1102   const gchar *msg_name = opened ?
1103       "splitmuxsink-fragment-opened" : "splitmuxsink-fragment-closed";
1104   GstClockTime running_time = splitmux->reference_ctx->out_running_time;
1105
1106   if (!opened) {
1107     GstClockTime *rtime = g_object_get_qdata (G_OBJECT (sink), RUNNING_TIME);
1108     if (rtime)
1109       running_time = *rtime;
1110   }
1111
1112   if (g_object_class_find_property (G_OBJECT_GET_CLASS (sink),
1113           "location") != NULL)
1114     g_object_get (sink, "location", &location, NULL);
1115
1116   GST_DEBUG_OBJECT (splitmux,
1117       "Sending %s message. Running time %" GST_TIME_FORMAT " location %s",
1118       msg_name, GST_TIME_ARGS (running_time), GST_STR_NULL (location));
1119
1120   /* If it's in the middle of a teardown, the reference_ctc might have become
1121    * NULL */
1122   if (splitmux->reference_ctx) {
1123     msg = gst_message_new_element (GST_OBJECT (splitmux),
1124         gst_structure_new (msg_name,
1125             "location", G_TYPE_STRING, location,
1126             "running-time", GST_TYPE_CLOCK_TIME, running_time,
1127             "sink", GST_TYPE_ELEMENT, sink, NULL));
1128     gst_element_post_message (GST_ELEMENT_CAST (splitmux), msg);
1129   }
1130
1131   g_free (location);
1132 }
1133
1134 static void
1135 send_eos_async (GstSplitMuxSink * splitmux, AsyncEosHelper * helper)
1136 {
1137   GstEvent *eos;
1138   GstPad *pad;
1139   MqStreamCtx *ctx;
1140
1141   eos = gst_event_new_eos ();
1142   pad = helper->pad;
1143   ctx = helper->ctx;
1144
1145   GST_SPLITMUX_LOCK (splitmux);
1146   if (!pad)
1147     pad = gst_pad_get_peer (ctx->srcpad);
1148   GST_SPLITMUX_UNLOCK (splitmux);
1149
1150   gst_pad_send_event (pad, eos);
1151   GST_INFO_OBJECT (splitmux, "Sent async EOS on %" GST_PTR_FORMAT, pad);
1152
1153   gst_object_unref (pad);
1154   g_free (helper);
1155 }
1156
1157 /* Called with lock held, drops the lock to send EOS to the
1158  * pad
1159  */
1160 static void
1161 send_eos (GstSplitMuxSink * splitmux, MqStreamCtx * ctx)
1162 {
1163   GstEvent *eos;
1164   GstPad *pad;
1165
1166   eos = gst_event_new_eos ();
1167   pad = gst_pad_get_peer (ctx->srcpad);
1168
1169   ctx->out_eos = TRUE;
1170
1171   GST_INFO_OBJECT (splitmux, "Sending EOS on %" GST_PTR_FORMAT, pad);
1172   GST_SPLITMUX_UNLOCK (splitmux);
1173   gst_pad_send_event (pad, eos);
1174   GST_SPLITMUX_LOCK (splitmux);
1175
1176   gst_object_unref (pad);
1177 }
1178
1179 /* Called with lock held. Schedules an EOS event to the ctx pad
1180  * to happen in another thread */
1181 static void
1182 eos_context_async (MqStreamCtx * ctx, GstSplitMuxSink * splitmux)
1183 {
1184   AsyncEosHelper *helper = g_new0 (AsyncEosHelper, 1);
1185   GstPad *srcpad, *sinkpad;
1186
1187   srcpad = ctx->srcpad;
1188   sinkpad = gst_pad_get_peer (srcpad);
1189
1190   helper->ctx = ctx;
1191   helper->pad = sinkpad;        /* Takes the reference */
1192
1193   ctx->out_eos_async_done = TRUE;
1194
1195   /* There used to be a bug here, where we had to explicitly remove
1196    * the SINK flag so that GstBin would ignore it for EOS purposes.
1197    * That fixed a race where if splitmuxsink really reaches EOS
1198    * before an asynchronous background element has finished, then
1199    * the bin wouldn't actually send EOS to the pipeline. Even after
1200    * finishing and removing the old element, the bin didn't re-check
1201    * EOS status on removing a SINK element. That bug was fixed
1202    * in core. */
1203   GST_DEBUG_OBJECT (splitmux, "scheduled EOS to pad %" GST_PTR_FORMAT " ctx %p",
1204       sinkpad, ctx);
1205
1206   g_assert_nonnull (helper->pad);
1207   gst_element_call_async (GST_ELEMENT (splitmux),
1208       (GstElementCallAsyncFunc) send_eos_async, helper, NULL);
1209 }
1210
1211 /* Called with lock held. TRUE iff all contexts have a
1212  * pending (or delivered) async eos event */
1213 static gboolean
1214 all_contexts_are_async_eos (GstSplitMuxSink * splitmux)
1215 {
1216   gboolean ret = TRUE;
1217   GList *item;
1218
1219   for (item = splitmux->contexts; item; item = item->next) {
1220     MqStreamCtx *ctx = item->data;
1221     ret &= ctx->out_eos_async_done;
1222   }
1223   return ret;
1224 }
1225
1226 /* Called with splitmux lock held to check if this output
1227  * context needs to sleep to wait for the release of the
1228  * next GOP, or to send EOS to close out the current file
1229  */
1230 static GstFlowReturn
1231 complete_or_wait_on_out (GstSplitMuxSink * splitmux, MqStreamCtx * ctx)
1232 {
1233   if (ctx->caps_change)
1234     return GST_FLOW_OK;
1235
1236   do {
1237     /* When first starting up, the reference stream has to output
1238      * the first buffer to prepare the muxer and sink */
1239     gboolean can_output = (ctx->is_reference || splitmux->ready_for_output);
1240     GstClockTimeDiff my_max_out_running_time = splitmux->max_out_running_time;
1241
1242     if (!(splitmux->max_out_running_time == 0 ||
1243             splitmux->max_out_running_time == GST_CLOCK_STIME_NONE ||
1244             splitmux->alignment_threshold == 0 ||
1245             splitmux->max_out_running_time < splitmux->alignment_threshold)) {
1246       my_max_out_running_time -= splitmux->alignment_threshold;
1247       GST_LOG_OBJECT (ctx->srcpad,
1248           "Max out running time currently %" GST_STIME_FORMAT
1249           ", with threshold applied it is %" GST_STIME_FORMAT,
1250           GST_STIME_ARGS (splitmux->max_out_running_time),
1251           GST_STIME_ARGS (my_max_out_running_time));
1252     }
1253
1254     if (ctx->flushing
1255         || splitmux->output_state == SPLITMUX_OUTPUT_STATE_STOPPED)
1256       return GST_FLOW_FLUSHING;
1257
1258     GST_LOG_OBJECT (ctx->srcpad,
1259         "Checking running time %" GST_STIME_FORMAT " against max %"
1260         GST_STIME_FORMAT, GST_STIME_ARGS (ctx->out_running_time),
1261         GST_STIME_ARGS (my_max_out_running_time));
1262
1263     if (can_output) {
1264       if (splitmux->max_out_running_time == GST_CLOCK_STIME_NONE ||
1265           ctx->out_running_time < my_max_out_running_time) {
1266         return GST_FLOW_OK;
1267       }
1268
1269       switch (splitmux->output_state) {
1270         case SPLITMUX_OUTPUT_STATE_OUTPUT_GOP:
1271           /* We only get here if we've finished outputting a GOP and need to know
1272            * what to do next */
1273           splitmux->output_state = SPLITMUX_OUTPUT_STATE_AWAITING_COMMAND;
1274           GST_SPLITMUX_BROADCAST_OUTPUT (splitmux);
1275           continue;
1276
1277         case SPLITMUX_OUTPUT_STATE_ENDING_FILE:
1278         case SPLITMUX_OUTPUT_STATE_ENDING_STREAM:
1279           /* We've reached the max out running_time to get here, so end this file now */
1280           if (ctx->out_eos == FALSE) {
1281             if (splitmux->async_finalize) {
1282               /* We must set EOS asynchronously at this point. We cannot defer
1283                * it, because we need all contexts to wake up, for the
1284                * reference context to eventually give us something at
1285                * START_NEXT_FILE. Otherwise, collectpads might choose another
1286                * context to give us the first buffer, and format-location-full
1287                * will not contain a valid sample. */
1288               g_object_set_qdata ((GObject *) splitmux->sink, EOS_FROM_US,
1289                   GINT_TO_POINTER (1));
1290               eos_context_async (ctx, splitmux);
1291               if (all_contexts_are_async_eos (splitmux)) {
1292                 GST_INFO_OBJECT (splitmux,
1293                     "All contexts are async_eos. Moving to the next file.");
1294                 /* We can start the next file once we've asked each pad to go EOS */
1295                 splitmux->output_state = SPLITMUX_OUTPUT_STATE_START_NEXT_FILE;
1296                 GST_SPLITMUX_BROADCAST_OUTPUT (splitmux);
1297                 continue;
1298               }
1299             } else {
1300               send_eos (splitmux, ctx);
1301               continue;
1302             }
1303           } else {
1304             GST_INFO_OBJECT (splitmux,
1305                 "At end-of-file state, but context %p is already EOS", ctx);
1306           }
1307           break;
1308         case SPLITMUX_OUTPUT_STATE_START_NEXT_FILE:
1309           if (ctx->is_reference) {
1310             GstFlowReturn ret = GST_FLOW_OK;
1311
1312             /* Special handling on the reference ctx to start new fragments
1313              * and collect commands from the command queue */
1314             /* drops the splitmux lock briefly: */
1315             /* We must have reference ctx in order for format-location-full to
1316              * have a sample */
1317             ret = start_next_fragment (splitmux, ctx);
1318             if (ret != GST_FLOW_OK)
1319               return ret;
1320
1321             continue;
1322           }
1323           break;
1324         case SPLITMUX_OUTPUT_STATE_AWAITING_COMMAND:{
1325           do {
1326             SplitMuxOutputCommand *cmd =
1327                 g_queue_pop_tail (&splitmux->out_cmd_q);
1328             if (cmd != NULL) {
1329               /* If we pop the last command, we need to make our queues bigger */
1330               if (g_queue_get_length (&splitmux->out_cmd_q) == 0)
1331                 grow_blocked_queues (splitmux);
1332
1333               if (cmd->start_new_fragment) {
1334                 if (splitmux->muxed_out_bytes > 0) {
1335                   GST_DEBUG_OBJECT (splitmux, "Got cmd to start new fragment");
1336                   splitmux->output_state = SPLITMUX_OUTPUT_STATE_ENDING_FILE;
1337                 } else {
1338                   GST_DEBUG_OBJECT (splitmux,
1339                       "Got cmd to start new fragment, but fragment is empty - ignoring.");
1340                 }
1341               } else {
1342                 GST_DEBUG_OBJECT (splitmux,
1343                     "Got new output cmd for time %" GST_STIME_FORMAT,
1344                     GST_STIME_ARGS (cmd->max_output_ts));
1345
1346                 /* Extend the output range immediately */
1347                 splitmux->max_out_running_time = cmd->max_output_ts;
1348                 splitmux->output_state = SPLITMUX_OUTPUT_STATE_OUTPUT_GOP;
1349               }
1350               GST_SPLITMUX_BROADCAST_OUTPUT (splitmux);
1351
1352               out_cmd_buf_free (cmd);
1353               break;
1354             } else {
1355               GST_SPLITMUX_WAIT_OUTPUT (splitmux);
1356             }
1357           } while (!ctx->flushing && splitmux->output_state ==
1358               SPLITMUX_OUTPUT_STATE_AWAITING_COMMAND);
1359           /* loop and re-check the state */
1360           continue;
1361         }
1362         case SPLITMUX_OUTPUT_STATE_STOPPED:
1363           return GST_FLOW_FLUSHING;
1364       }
1365     } else {
1366       GST_LOG_OBJECT (ctx->srcpad, "Not yet ready for output");
1367     }
1368
1369     GST_INFO_OBJECT (ctx->srcpad,
1370         "Sleeping for running time %"
1371         GST_STIME_FORMAT " (max %" GST_STIME_FORMAT ") or state change.",
1372         GST_STIME_ARGS (ctx->out_running_time),
1373         GST_STIME_ARGS (splitmux->max_out_running_time));
1374     GST_SPLITMUX_WAIT_OUTPUT (splitmux);
1375     GST_INFO_OBJECT (ctx->srcpad,
1376         "Woken for new max running time %" GST_STIME_FORMAT,
1377         GST_STIME_ARGS (splitmux->max_out_running_time));
1378   }
1379   while (1);
1380
1381   return GST_FLOW_OK;
1382 }
1383
1384 static GstClockTime
1385 calculate_next_max_timecode (GstSplitMuxSink * splitmux,
1386     const GstVideoTimeCode * cur_tc, GstClockTime running_time,
1387     GstVideoTimeCode ** next_tc)
1388 {
1389   GstVideoTimeCode *target_tc;
1390   GstClockTime cur_tc_time, target_tc_time, next_max_tc_time;
1391
1392   if (cur_tc == NULL || splitmux->tc_interval == NULL)
1393     return GST_CLOCK_TIME_NONE;
1394
1395   target_tc = gst_video_time_code_add_interval (cur_tc, splitmux->tc_interval);
1396   if (!target_tc) {
1397     GST_ELEMENT_ERROR (splitmux,
1398         STREAM, FAILED, (NULL), ("Couldn't calculate target timecode"));
1399     return GST_CLOCK_TIME_NONE;
1400   }
1401
1402   /* Convert to ns */
1403   target_tc_time = gst_video_time_code_nsec_since_daily_jam (target_tc);
1404   cur_tc_time = gst_video_time_code_nsec_since_daily_jam (cur_tc);
1405
1406   /* Add running_time, accounting for wraparound. */
1407   if (target_tc_time >= cur_tc_time) {
1408     next_max_tc_time = target_tc_time - cur_tc_time + running_time;
1409   } else {
1410     GstClockTime day_in_ns = 24 * 60 * 60 * GST_SECOND;
1411
1412     if ((cur_tc->config.flags & GST_VIDEO_TIME_CODE_FLAGS_DROP_FRAME) &&
1413         (cur_tc->config.fps_d == 1001)) {
1414       /* Checking fps_d is probably unneeded, but better safe than sorry
1415        * (e.g. someone accidentally set a flag) */
1416       GstVideoTimeCode *tc_for_offset;
1417
1418       /* Here, the duration of the 24:00:00;00 timecode isn't exactly one day,
1419        * but slightly less. Calculate that duration from a fake timecode. The
1420        * problem is that 24:00:00;00 isn't a valid timecode, so the workaround
1421        * is to add one frame to 23:59:59;29 */
1422       tc_for_offset =
1423           gst_video_time_code_new (cur_tc->config.fps_n, cur_tc->config.fps_d,
1424           NULL, cur_tc->config.flags, 23, 59, 59,
1425           cur_tc->config.fps_n / cur_tc->config.fps_d, 0);
1426       day_in_ns =
1427           gst_video_time_code_nsec_since_daily_jam (tc_for_offset) +
1428           gst_util_uint64_scale (GST_SECOND, cur_tc->config.fps_d,
1429           cur_tc->config.fps_n);
1430       gst_video_time_code_free (tc_for_offset);
1431     }
1432     next_max_tc_time = day_in_ns - cur_tc_time + target_tc_time + running_time;
1433   }
1434
1435   GST_INFO_OBJECT (splitmux, "Next max TC time: %" GST_TIME_FORMAT
1436       " from ref TC: %" GST_TIME_FORMAT, GST_TIME_ARGS (next_max_tc_time),
1437       GST_TIME_ARGS (cur_tc_time));
1438   if (next_tc)
1439     *next_tc = target_tc;
1440   else
1441     gst_video_time_code_free (target_tc);
1442
1443   return next_max_tc_time;
1444 }
1445
1446 static gboolean
1447 request_next_keyframe (GstSplitMuxSink * splitmux, GstBuffer * buffer,
1448     GstClockTime running_time)
1449 {
1450   GstEvent *ev;
1451   GstClockTime target_time;
1452   gboolean timecode_based = FALSE;
1453   GstClockTime max_tc_time = GST_CLOCK_TIME_NONE;
1454   GstClockTime next_max_tc_time = GST_CLOCK_TIME_NONE;
1455   GstClockTime next_fku_time = GST_CLOCK_TIME_NONE;
1456   GstClockTime tc_rounding_error = 5 * GST_USECOND;
1457
1458   if (!splitmux->send_keyframe_requests)
1459     return TRUE;
1460
1461   if (splitmux->tc_interval) {
1462     if (splitmux->in_tc && gst_video_time_code_is_valid (splitmux->in_tc)) {
1463       GstVideoTimeCode *next_tc = NULL;
1464       max_tc_time =
1465           calculate_next_max_timecode (splitmux, splitmux->in_tc,
1466           running_time, &next_tc);
1467
1468       /* calculate the next expected keyframe time to prevent too early fku
1469        * event */
1470       if (GST_CLOCK_TIME_IS_VALID (max_tc_time) && next_tc) {
1471         next_max_tc_time =
1472             calculate_next_max_timecode (splitmux, next_tc, max_tc_time, NULL);
1473       }
1474       if (next_tc)
1475         gst_video_time_code_free (next_tc);
1476
1477       timecode_based = GST_CLOCK_TIME_IS_VALID (max_tc_time) &&
1478           GST_CLOCK_TIME_IS_VALID (next_max_tc_time);
1479     } else {
1480       /* This can happen in the presence of GAP events that trigger
1481        * a new fragment start */
1482       GST_WARNING_OBJECT (splitmux,
1483           "No buffer available to calculate next timecode");
1484     }
1485   }
1486
1487   if ((splitmux->threshold_time == 0 && !timecode_based)
1488       || splitmux->threshold_bytes != 0)
1489     return TRUE;
1490
1491   if (timecode_based) {
1492     /* We might have rounding errors: aim slightly earlier */
1493     if (max_tc_time >= tc_rounding_error) {
1494       target_time = max_tc_time - tc_rounding_error;
1495     } else {
1496       /* unreliable target time */
1497       GST_DEBUG_OBJECT (splitmux, "tc time %" GST_TIME_FORMAT
1498           " is smaller than allowed rounding error, set it to zero",
1499           GST_TIME_ARGS (max_tc_time));
1500       target_time = 0;
1501     }
1502
1503     if (next_max_tc_time >= tc_rounding_error) {
1504       next_fku_time = next_max_tc_time - tc_rounding_error;
1505     } else {
1506       /* unreliable target time */
1507       GST_DEBUG_OBJECT (splitmux, "next tc time %" GST_TIME_FORMAT
1508           " is smaller than allowed rounding error, set it to zero",
1509           GST_TIME_ARGS (next_max_tc_time));
1510       next_fku_time = 0;
1511     }
1512   } else {
1513     target_time = running_time + splitmux->threshold_time;
1514   }
1515
1516   if (GST_CLOCK_TIME_IS_VALID (splitmux->next_fku_time)) {
1517     GstClockTime allowed_time = splitmux->next_fku_time;
1518
1519     if (timecode_based) {
1520       if (allowed_time >= tc_rounding_error) {
1521         allowed_time -= tc_rounding_error;
1522       } else {
1523         /* unreliable next force key unit time */
1524         GST_DEBUG_OBJECT (splitmux, "expected next force key unit time %"
1525             GST_TIME_FORMAT
1526             " is smaller than allowed rounding error, set it to zero",
1527             GST_TIME_ARGS (splitmux->next_fku_time));
1528         allowed_time = 0;
1529       }
1530     }
1531
1532     if (target_time < allowed_time) {
1533       GST_LOG_OBJECT (splitmux, "Target time %" GST_TIME_FORMAT
1534           " is smaller than expected next keyframe time %" GST_TIME_FORMAT
1535           ", rounding error compensated next keyframe time %" GST_TIME_FORMAT,
1536           GST_TIME_ARGS (target_time),
1537           GST_TIME_ARGS (splitmux->next_fku_time),
1538           GST_TIME_ARGS (allowed_time));
1539
1540       return TRUE;
1541     } else if (allowed_time != splitmux->next_fku_time &&
1542         target_time < splitmux->next_fku_time) {
1543       GST_DEBUG_OBJECT (splitmux, "Target time %" GST_TIME_FORMAT
1544           " is smaller than expected next keyframe time %" GST_TIME_FORMAT
1545           ", but the difference is smaller than allowed rounding error",
1546           GST_TIME_ARGS (target_time), GST_TIME_ARGS (splitmux->next_fku_time));
1547     }
1548   }
1549
1550   if (!timecode_based) {
1551     next_fku_time = target_time + splitmux->threshold_time;
1552   }
1553
1554   splitmux->next_fku_time = next_fku_time;
1555
1556   ev = gst_video_event_new_upstream_force_key_unit (target_time, TRUE, 0);
1557   GST_INFO_OBJECT (splitmux, "Requesting keyframe at %" GST_TIME_FORMAT
1558       ", the next expected keyframe is %" GST_TIME_FORMAT,
1559       GST_TIME_ARGS (target_time), GST_TIME_ARGS (next_fku_time));
1560   return gst_pad_push_event (splitmux->reference_ctx->sinkpad, ev);
1561 }
1562
1563 static GstPadProbeReturn
1564 handle_mq_output (GstPad * pad, GstPadProbeInfo * info, MqStreamCtx * ctx)
1565 {
1566   GstSplitMuxSink *splitmux = ctx->splitmux;
1567   MqStreamBuf *buf_info = NULL;
1568   GstFlowReturn ret = GST_FLOW_OK;
1569
1570   GST_LOG_OBJECT (pad, "Fired probe type 0x%x", info->type);
1571
1572   /* FIXME: Handle buffer lists, until then make it clear they won't work */
1573   if (info->type & GST_PAD_PROBE_TYPE_BUFFER_LIST) {
1574     g_warning ("Buffer list handling not implemented");
1575     return GST_PAD_PROBE_DROP;
1576   }
1577   if (info->type & GST_PAD_PROBE_TYPE_EVENT_DOWNSTREAM ||
1578       info->type & GST_PAD_PROBE_TYPE_EVENT_FLUSH) {
1579     GstEvent *event = gst_pad_probe_info_get_event (info);
1580     gboolean locked = FALSE, wait = !ctx->is_reference;
1581
1582     GST_LOG_OBJECT (pad, "Event %" GST_PTR_FORMAT, event);
1583
1584     switch (GST_EVENT_TYPE (event)) {
1585       case GST_EVENT_SEGMENT:
1586         gst_event_copy_segment (event, &ctx->out_segment);
1587         break;
1588       case GST_EVENT_FLUSH_STOP:
1589         GST_SPLITMUX_LOCK (splitmux);
1590         locked = TRUE;
1591         gst_segment_init (&ctx->out_segment, GST_FORMAT_UNDEFINED);
1592         g_queue_foreach (&ctx->queued_bufs, (GFunc) mq_stream_buf_free, NULL);
1593         g_queue_clear (&ctx->queued_bufs);
1594         g_queue_clear (&ctx->queued_bufs);
1595         /* If this is the reference context, we just threw away any queued keyframes */
1596         if (ctx->is_reference)
1597           splitmux->queued_keyframes = 0;
1598         ctx->flushing = FALSE;
1599         wait = FALSE;
1600         break;
1601       case GST_EVENT_FLUSH_START:
1602         GST_SPLITMUX_LOCK (splitmux);
1603         locked = TRUE;
1604         GST_LOG_OBJECT (pad, "Flush start");
1605         ctx->flushing = TRUE;
1606         GST_SPLITMUX_BROADCAST_INPUT (splitmux);
1607         GST_SPLITMUX_BROADCAST_OUTPUT (splitmux);
1608         break;
1609       case GST_EVENT_EOS:
1610         GST_SPLITMUX_LOCK (splitmux);
1611         locked = TRUE;
1612         if (splitmux->output_state == SPLITMUX_OUTPUT_STATE_STOPPED)
1613           goto beach;
1614         ctx->out_eos = TRUE;
1615
1616         if (ctx == splitmux->reference_ctx) {
1617           splitmux->output_state = SPLITMUX_OUTPUT_STATE_ENDING_STREAM;
1618           GST_SPLITMUX_BROADCAST_OUTPUT (splitmux);
1619         }
1620
1621         GST_INFO_OBJECT (splitmux,
1622             "Have EOS event at pad %" GST_PTR_FORMAT " ctx %p", pad, ctx);
1623         break;
1624       case GST_EVENT_GAP:{
1625         GstClockTime gap_ts;
1626         GstClockTimeDiff rtime;
1627
1628         gst_event_parse_gap (event, &gap_ts, NULL);
1629         if (gap_ts == GST_CLOCK_TIME_NONE)
1630           break;
1631
1632         GST_SPLITMUX_LOCK (splitmux);
1633         locked = TRUE;
1634
1635         if (splitmux->output_state == SPLITMUX_OUTPUT_STATE_STOPPED)
1636           goto beach;
1637
1638         /* When we get a gap event on the
1639          * reference stream and we're trying to open a
1640          * new file, we need to store it until we get
1641          * the buffer afterwards
1642          */
1643         if (ctx->is_reference &&
1644             (splitmux->output_state != SPLITMUX_OUTPUT_STATE_OUTPUT_GOP)) {
1645           GST_DEBUG_OBJECT (pad, "Storing GAP event until buffer arrives");
1646           gst_event_replace (&ctx->pending_gap, event);
1647           GST_SPLITMUX_UNLOCK (splitmux);
1648           return GST_PAD_PROBE_HANDLED;
1649         }
1650
1651         rtime = my_segment_to_running_time (&ctx->out_segment, gap_ts);
1652
1653         GST_LOG_OBJECT (pad, "Have GAP w/ ts %" GST_STIME_FORMAT,
1654             GST_STIME_ARGS (rtime));
1655
1656         if (rtime != GST_CLOCK_STIME_NONE) {
1657           ctx->out_running_time = rtime;
1658           complete_or_wait_on_out (splitmux, ctx);
1659         }
1660         break;
1661       }
1662       case GST_EVENT_CUSTOM_DOWNSTREAM:{
1663         const GstStructure *s;
1664         GstClockTimeDiff ts = 0;
1665
1666         s = gst_event_get_structure (event);
1667         if (!gst_structure_has_name (s, "splitmuxsink-unblock"))
1668           break;
1669
1670         gst_structure_get_int64 (s, "timestamp", &ts);
1671
1672         GST_SPLITMUX_LOCK (splitmux);
1673         locked = TRUE;
1674
1675         if (splitmux->output_state == SPLITMUX_OUTPUT_STATE_STOPPED)
1676           goto beach;
1677         ctx->out_running_time = ts;
1678         if (!ctx->is_reference)
1679           ret = complete_or_wait_on_out (splitmux, ctx);
1680         GST_SPLITMUX_UNLOCK (splitmux);
1681         GST_PAD_PROBE_INFO_FLOW_RETURN (info) = ret;
1682         return GST_PAD_PROBE_DROP;
1683       }
1684       case GST_EVENT_CAPS:{
1685         GstPad *peer;
1686
1687         if (!ctx->is_reference)
1688           break;
1689
1690         peer = gst_pad_get_peer (pad);
1691         if (peer) {
1692           gboolean ok = gst_pad_send_event (peer, gst_event_ref (event));
1693
1694           gst_object_unref (peer);
1695
1696           if (ok)
1697             break;
1698
1699         } else {
1700           break;
1701         }
1702         /* This is in the case the muxer doesn't allow this change of caps */
1703         GST_SPLITMUX_LOCK (splitmux);
1704         locked = TRUE;
1705         ctx->caps_change = TRUE;
1706
1707         if (splitmux->output_state != SPLITMUX_OUTPUT_STATE_START_NEXT_FILE) {
1708           GST_DEBUG_OBJECT (splitmux,
1709               "New caps were not accepted. Switching output file");
1710           if (ctx->out_eos == FALSE) {
1711             splitmux->output_state = SPLITMUX_OUTPUT_STATE_ENDING_FILE;
1712             GST_SPLITMUX_BROADCAST_OUTPUT (splitmux);
1713           }
1714         }
1715
1716         /* Lets it fall through, if it fails again, then the muxer just can't
1717          * support this format, but at least we have a closed file.
1718          */
1719         break;
1720       }
1721       default:
1722         break;
1723     }
1724
1725     /* We need to make sure events aren't passed
1726      * until the muxer / sink are ready for it */
1727     if (!locked)
1728       GST_SPLITMUX_LOCK (splitmux);
1729     if (wait)
1730       ret = complete_or_wait_on_out (splitmux, ctx);
1731     GST_SPLITMUX_UNLOCK (splitmux);
1732
1733     /* Don't try to forward sticky events before the next buffer is there
1734      * because it would cause a new file to be created without the first
1735      * buffer being available.
1736      */
1737     GST_PAD_PROBE_INFO_FLOW_RETURN (info) = ret;
1738     if (ctx->caps_change && GST_EVENT_IS_STICKY (event)) {
1739       gst_event_unref (event);
1740       return GST_PAD_PROBE_HANDLED;
1741     } else {
1742       return GST_PAD_PROBE_PASS;
1743     }
1744   }
1745
1746   /* Allow everything through until the configured next stopping point */
1747   GST_SPLITMUX_LOCK (splitmux);
1748
1749   buf_info = g_queue_pop_tail (&ctx->queued_bufs);
1750   if (buf_info == NULL) {
1751     /* Can only happen due to a poorly timed flush */
1752     ret = GST_FLOW_FLUSHING;
1753     goto beach;
1754   }
1755
1756   /* If we have popped a keyframe, decrement the queued_gop count */
1757   if (buf_info->keyframe && splitmux->queued_keyframes > 0 && ctx->is_reference)
1758     splitmux->queued_keyframes--;
1759
1760   ctx->out_running_time = buf_info->run_ts;
1761   ctx->cur_out_buffer = gst_pad_probe_info_get_buffer (info);
1762
1763   GST_LOG_OBJECT (splitmux,
1764       "Pad %" GST_PTR_FORMAT " buffer with run TS %" GST_STIME_FORMAT
1765       " size %" G_GUINT64_FORMAT,
1766       pad, GST_STIME_ARGS (ctx->out_running_time), buf_info->buf_size);
1767
1768   ctx->caps_change = FALSE;
1769
1770   ret = complete_or_wait_on_out (splitmux, ctx);
1771
1772   splitmux->muxed_out_bytes += buf_info->buf_size;
1773
1774 #ifndef GST_DISABLE_GST_DEBUG
1775   {
1776     GstBuffer *buf = gst_pad_probe_info_get_buffer (info);
1777     GST_LOG_OBJECT (pad, "Returning to pass buffer %" GST_PTR_FORMAT
1778         " run ts %" GST_STIME_FORMAT, buf,
1779         GST_STIME_ARGS (ctx->out_running_time));
1780   }
1781 #endif
1782
1783   ctx->cur_out_buffer = NULL;
1784   GST_SPLITMUX_UNLOCK (splitmux);
1785
1786   /* pending_gap is protected by the STREAM lock */
1787   if (ctx->pending_gap) {
1788     /* If we previously stored a gap event, send it now */
1789     GstPad *peer = gst_pad_get_peer (ctx->srcpad);
1790
1791     GST_DEBUG_OBJECT (splitmux,
1792         "Pad %" GST_PTR_FORMAT " sending pending GAP event", ctx->srcpad);
1793
1794     gst_pad_send_event (peer, ctx->pending_gap);
1795     ctx->pending_gap = NULL;
1796
1797     gst_object_unref (peer);
1798   }
1799
1800   mq_stream_buf_free (buf_info);
1801
1802   GST_PAD_PROBE_INFO_FLOW_RETURN (info) = ret;
1803   return GST_PAD_PROBE_PASS;
1804
1805 beach:
1806   GST_SPLITMUX_UNLOCK (splitmux);
1807   GST_PAD_PROBE_INFO_FLOW_RETURN (info) = ret;
1808   return GST_PAD_PROBE_DROP;
1809 }
1810
1811 static gboolean
1812 resend_sticky (GstPad * pad, GstEvent ** event, GstPad * peer)
1813 {
1814   return gst_pad_send_event (peer, gst_event_ref (*event));
1815 }
1816
1817 static void
1818 unlock_context (MqStreamCtx * ctx, GstSplitMuxSink * splitmux)
1819 {
1820   if (ctx->fragment_block_id > 0) {
1821     gst_pad_remove_probe (ctx->srcpad, ctx->fragment_block_id);
1822     ctx->fragment_block_id = 0;
1823   }
1824 }
1825
1826 static void
1827 restart_context (MqStreamCtx * ctx, GstSplitMuxSink * splitmux)
1828 {
1829   GstPad *peer = gst_pad_get_peer (ctx->srcpad);
1830
1831   gst_pad_sticky_events_foreach (ctx->srcpad,
1832       (GstPadStickyEventsForeachFunction) (resend_sticky), peer);
1833
1834   /* Clear EOS flag if not actually EOS */
1835   ctx->out_eos = GST_PAD_IS_EOS (ctx->srcpad);
1836   ctx->out_eos_async_done = ctx->out_eos;
1837
1838   gst_object_unref (peer);
1839 }
1840
1841 static void
1842 relink_context (MqStreamCtx * ctx, GstSplitMuxSink * splitmux)
1843 {
1844   GstPad *sinkpad, *srcpad, *newpad;
1845   GstPadTemplate *templ;
1846
1847   srcpad = ctx->srcpad;
1848   sinkpad = gst_pad_get_peer (srcpad);
1849
1850   templ = sinkpad->padtemplate;
1851   newpad =
1852       gst_element_request_pad (splitmux->muxer, templ,
1853       GST_PAD_NAME (sinkpad), NULL);
1854
1855   GST_DEBUG_OBJECT (splitmux, "Relinking ctx %p to pad %" GST_PTR_FORMAT, ctx,
1856       newpad);
1857   if (!gst_pad_unlink (srcpad, sinkpad)) {
1858     gst_object_unref (sinkpad);
1859     goto fail;
1860   }
1861   if (gst_pad_link_full (srcpad, newpad,
1862           GST_PAD_LINK_CHECK_NO_RECONFIGURE) != GST_PAD_LINK_OK) {
1863     gst_element_release_request_pad (splitmux->muxer, newpad);
1864     gst_object_unref (sinkpad);
1865     gst_object_unref (newpad);
1866     goto fail;
1867   }
1868   gst_object_unref (newpad);
1869   gst_object_unref (sinkpad);
1870   return;
1871
1872 fail:
1873   GST_ELEMENT_ERROR (splitmux, RESOURCE, SETTINGS,
1874       ("Could not create the new muxer/sink"), NULL);
1875 }
1876
1877 static GstPadProbeReturn
1878 _block_pad (GstPad * pad, GstPadProbeInfo * info, gpointer user_data)
1879 {
1880   return GST_PAD_PROBE_OK;
1881 }
1882
1883 static void
1884 block_context (MqStreamCtx * ctx, GstSplitMuxSink * splitmux)
1885 {
1886   ctx->fragment_block_id =
1887       gst_pad_add_probe (ctx->srcpad, GST_PAD_PROBE_TYPE_BLOCK, _block_pad,
1888       NULL, NULL);
1889 }
1890
1891 static gboolean
1892 _set_property_from_structure (GQuark field_id, const GValue * value,
1893     gpointer user_data)
1894 {
1895   const gchar *property_name = g_quark_to_string (field_id);
1896   GObject *element = G_OBJECT (user_data);
1897
1898   g_object_set_property (element, property_name, value);
1899
1900   return TRUE;
1901 }
1902
1903 static void
1904 _lock_and_set_to_null (GstElement * element, GstSplitMuxSink * splitmux)
1905 {
1906   gst_element_set_locked_state (element, TRUE);
1907   gst_element_set_state (element, GST_STATE_NULL);
1908   GST_LOG_OBJECT (splitmux, "Removing old element %" GST_PTR_FORMAT, element);
1909   gst_bin_remove (GST_BIN (splitmux), element);
1910 }
1911
1912
1913 static void
1914 _send_event (const GValue * value, gpointer user_data)
1915 {
1916   GstPad *pad = g_value_get_object (value);
1917   GstEvent *ev = user_data;
1918
1919   gst_pad_send_event (pad, gst_event_ref (ev));
1920 }
1921
1922 /* Called with lock held when a fragment
1923  * reaches EOS and it is time to restart
1924  * a new fragment
1925  */
1926 static GstFlowReturn
1927 start_next_fragment (GstSplitMuxSink * splitmux, MqStreamCtx * ctx)
1928 {
1929   GstElement *muxer, *sink;
1930
1931   g_assert (ctx->is_reference);
1932
1933   /* 1 change to new file */
1934   splitmux->switching_fragment = TRUE;
1935
1936   /* We need to drop the splitmux lock to acquire the state lock
1937    * here and ensure there's no racy state change going on elsewhere */
1938   muxer = gst_object_ref (splitmux->muxer);
1939   sink = gst_object_ref (splitmux->active_sink);
1940
1941   GST_SPLITMUX_UNLOCK (splitmux);
1942   GST_SPLITMUX_STATE_LOCK (splitmux);
1943
1944   if (splitmux->shutdown) {
1945     GST_DEBUG_OBJECT (splitmux,
1946         "Shutdown requested. Aborting fragment switch.");
1947     GST_SPLITMUX_LOCK (splitmux);
1948     GST_SPLITMUX_STATE_UNLOCK (splitmux);
1949     gst_object_unref (muxer);
1950     gst_object_unref (sink);
1951     return GST_FLOW_FLUSHING;
1952   }
1953
1954   if (splitmux->async_finalize) {
1955     if (splitmux->muxed_out_bytes > 0
1956         || splitmux->fragment_id != splitmux->start_index) {
1957       gchar *newname;
1958       GstElement *new_sink, *new_muxer;
1959
1960       GST_DEBUG_OBJECT (splitmux, "Starting fragment %u",
1961           splitmux->fragment_id);
1962       g_list_foreach (splitmux->contexts, (GFunc) block_context, splitmux);
1963       newname = g_strdup_printf ("sink_%u", splitmux->fragment_id);
1964       GST_SPLITMUX_LOCK (splitmux);
1965       if ((splitmux->sink =
1966               create_element (splitmux, splitmux->sink_factory, newname,
1967                   TRUE)) == NULL)
1968         goto fail;
1969       if (splitmux->sink_preset && GST_IS_PRESET (splitmux->sink))
1970         gst_preset_load_preset (GST_PRESET (splitmux->sink),
1971             splitmux->sink_preset);
1972       if (splitmux->sink_properties)
1973         gst_structure_foreach (splitmux->sink_properties,
1974             _set_property_from_structure, splitmux->sink);
1975       splitmux->active_sink = splitmux->sink;
1976       g_signal_emit (splitmux, signals[SIGNAL_SINK_ADDED], 0, splitmux->sink);
1977       g_free (newname);
1978       newname = g_strdup_printf ("muxer_%u", splitmux->fragment_id);
1979       if ((splitmux->muxer =
1980               create_element (splitmux, splitmux->muxer_factory, newname,
1981                   TRUE)) == NULL)
1982         goto fail;
1983       if (g_object_class_find_property (G_OBJECT_GET_CLASS (splitmux->sink),
1984               "async") != NULL) {
1985         /* async child elements are causing state change races and weird
1986          * failures, so let's try and turn that off */
1987         g_object_set (splitmux->sink, "async", FALSE, NULL);
1988       }
1989       if (splitmux->muxer_preset && GST_IS_PRESET (splitmux->muxer))
1990         gst_preset_load_preset (GST_PRESET (splitmux->muxer),
1991             splitmux->muxer_preset);
1992       if (splitmux->muxer_properties)
1993         gst_structure_foreach (splitmux->muxer_properties,
1994             _set_property_from_structure, splitmux->muxer);
1995       g_signal_emit (splitmux, signals[SIGNAL_MUXER_ADDED], 0, splitmux->muxer);
1996       g_free (newname);
1997       new_sink = splitmux->sink;
1998       new_muxer = splitmux->muxer;
1999       GST_SPLITMUX_UNLOCK (splitmux);
2000       g_list_foreach (splitmux->contexts, (GFunc) relink_context, splitmux);
2001       gst_element_link (new_muxer, new_sink);
2002
2003       if (g_object_get_qdata ((GObject *) sink, EOS_FROM_US)) {
2004         if (GPOINTER_TO_INT (g_object_get_qdata ((GObject *) sink,
2005                     EOS_FROM_US)) == 2) {
2006           _lock_and_set_to_null (muxer, splitmux);
2007           _lock_and_set_to_null (sink, splitmux);
2008         } else {
2009           g_object_set_qdata ((GObject *) sink, EOS_FROM_US,
2010               GINT_TO_POINTER (2));
2011         }
2012       }
2013       gst_object_unref (muxer);
2014       gst_object_unref (sink);
2015       muxer = new_muxer;
2016       sink = new_sink;
2017       gst_object_ref (muxer);
2018       gst_object_ref (sink);
2019     }
2020   } else {
2021
2022     gst_element_set_locked_state (muxer, TRUE);
2023     gst_element_set_locked_state (sink, TRUE);
2024     gst_element_set_state (sink, GST_STATE_NULL);
2025
2026     if (splitmux->reset_muxer) {
2027       gst_element_set_state (muxer, GST_STATE_NULL);
2028     } else {
2029       GstIterator *it = gst_element_iterate_sink_pads (muxer);
2030       GstEvent *ev;
2031       guint32 seqnum;
2032
2033       ev = gst_event_new_flush_start ();
2034       seqnum = gst_event_get_seqnum (ev);
2035       while (gst_iterator_foreach (it, _send_event, ev) == GST_ITERATOR_RESYNC);
2036       gst_event_unref (ev);
2037
2038       gst_iterator_resync (it);
2039
2040       ev = gst_event_new_flush_stop (TRUE);
2041       gst_event_set_seqnum (ev, seqnum);
2042       while (gst_iterator_foreach (it, _send_event, ev) == GST_ITERATOR_RESYNC);
2043       gst_event_unref (ev);
2044
2045       gst_iterator_free (it);
2046     }
2047   }
2048
2049   GST_SPLITMUX_LOCK (splitmux);
2050   set_next_filename (splitmux, ctx);
2051   splitmux->muxed_out_bytes = 0;
2052   GST_SPLITMUX_UNLOCK (splitmux);
2053
2054   if (gst_element_set_state (sink,
2055           GST_STATE_TARGET (splitmux)) == GST_STATE_CHANGE_FAILURE) {
2056     gst_element_set_state (sink, GST_STATE_NULL);
2057     gst_element_set_locked_state (muxer, FALSE);
2058     gst_element_set_locked_state (sink, FALSE);
2059
2060     goto fail_output;
2061   }
2062
2063   if (gst_element_set_state (muxer,
2064           GST_STATE_TARGET (splitmux)) == GST_STATE_CHANGE_FAILURE) {
2065     gst_element_set_state (muxer, GST_STATE_NULL);
2066     gst_element_set_state (sink, GST_STATE_NULL);
2067     gst_element_set_locked_state (muxer, FALSE);
2068     gst_element_set_locked_state (sink, FALSE);
2069     goto fail_muxer;
2070   }
2071
2072   gst_element_set_locked_state (muxer, FALSE);
2073   gst_element_set_locked_state (sink, FALSE);
2074
2075   gst_object_unref (sink);
2076   gst_object_unref (muxer);
2077
2078   GST_SPLITMUX_LOCK (splitmux);
2079   GST_SPLITMUX_STATE_UNLOCK (splitmux);
2080   splitmux->switching_fragment = FALSE;
2081   do_async_done (splitmux);
2082
2083   splitmux->ready_for_output = TRUE;
2084
2085   g_list_foreach (splitmux->contexts, (GFunc) unlock_context, splitmux);
2086   g_list_foreach (splitmux->contexts, (GFunc) restart_context, splitmux);
2087
2088   send_fragment_opened_closed_msg (splitmux, TRUE, sink);
2089
2090   /* FIXME: Is this always the correct next state? */
2091   GST_LOG_OBJECT (splitmux, "Resetting state to AWAITING_COMMAND");
2092   splitmux->output_state = SPLITMUX_OUTPUT_STATE_AWAITING_COMMAND;
2093   GST_SPLITMUX_BROADCAST_OUTPUT (splitmux);
2094   return GST_FLOW_OK;
2095
2096 fail:
2097   gst_object_unref (sink);
2098   gst_object_unref (muxer);
2099
2100   GST_SPLITMUX_LOCK (splitmux);
2101   GST_SPLITMUX_STATE_UNLOCK (splitmux);
2102   GST_ELEMENT_ERROR (splitmux, RESOURCE, SETTINGS,
2103       ("Could not create the new muxer/sink"), NULL);
2104   return GST_FLOW_ERROR;
2105
2106 fail_output:
2107   GST_ELEMENT_ERROR (splitmux, RESOURCE, SETTINGS,
2108       ("Could not start new output sink"), NULL);
2109   gst_object_unref (sink);
2110   gst_object_unref (muxer);
2111
2112   GST_SPLITMUX_LOCK (splitmux);
2113   GST_SPLITMUX_STATE_UNLOCK (splitmux);
2114   splitmux->switching_fragment = FALSE;
2115   return GST_FLOW_ERROR;
2116
2117 fail_muxer:
2118   GST_ELEMENT_ERROR (splitmux, RESOURCE, SETTINGS,
2119       ("Could not start new muxer"), NULL);
2120   gst_object_unref (sink);
2121   gst_object_unref (muxer);
2122
2123   GST_SPLITMUX_LOCK (splitmux);
2124   GST_SPLITMUX_STATE_UNLOCK (splitmux);
2125   splitmux->switching_fragment = FALSE;
2126   return GST_FLOW_ERROR;
2127 }
2128
2129 static void
2130 bus_handler (GstBin * bin, GstMessage * message)
2131 {
2132   GstSplitMuxSink *splitmux = GST_SPLITMUX_SINK (bin);
2133
2134   switch (GST_MESSAGE_TYPE (message)) {
2135     case GST_MESSAGE_EOS:{
2136       /* If the state is draining out the current file, drop this EOS */
2137       GstElement *sink;
2138
2139       sink = GST_ELEMENT (GST_MESSAGE_SRC (message));
2140       GST_SPLITMUX_LOCK (splitmux);
2141
2142       send_fragment_opened_closed_msg (splitmux, FALSE, sink);
2143
2144       if (splitmux->async_finalize) {
2145
2146         if (g_object_get_qdata ((GObject *) sink, EOS_FROM_US)) {
2147           if (GPOINTER_TO_INT (g_object_get_qdata ((GObject *) sink,
2148                       EOS_FROM_US)) == 2) {
2149             GstElement *muxer;
2150             GstPad *sinksink, *muxersrc;
2151
2152             sinksink = gst_element_get_static_pad (sink, "sink");
2153             muxersrc = gst_pad_get_peer (sinksink);
2154             muxer = gst_pad_get_parent_element (muxersrc);
2155             gst_object_unref (sinksink);
2156             gst_object_unref (muxersrc);
2157
2158             gst_element_call_async (muxer,
2159                 (GstElementCallAsyncFunc) _lock_and_set_to_null,
2160                 gst_object_ref (splitmux), gst_object_unref);
2161             gst_element_call_async (sink,
2162                 (GstElementCallAsyncFunc) _lock_and_set_to_null,
2163                 gst_object_ref (splitmux), gst_object_unref);
2164             gst_object_unref (muxer);
2165           } else {
2166             g_object_set_qdata ((GObject *) sink, EOS_FROM_US,
2167                 GINT_TO_POINTER (2));
2168           }
2169           GST_DEBUG_OBJECT (splitmux,
2170               "Caught async EOS from previous muxer+sink. Dropping.");
2171           /* We forward the EOS so that it gets aggregated as normal. If the sink
2172            * finishes and is removed before the end, it will be de-aggregated */
2173           gst_message_unref (message);
2174           GST_SPLITMUX_UNLOCK (splitmux);
2175           return;
2176         }
2177       } else if (splitmux->output_state == SPLITMUX_OUTPUT_STATE_ENDING_STREAM) {
2178         GST_DEBUG_OBJECT (splitmux,
2179             "Passing EOS message. Output state %d max_out_running_time %"
2180             GST_STIME_FORMAT, splitmux->output_state,
2181             GST_STIME_ARGS (splitmux->max_out_running_time));
2182       } else {
2183         GST_DEBUG_OBJECT (splitmux, "Caught EOS at end of fragment, dropping");
2184         splitmux->output_state = SPLITMUX_OUTPUT_STATE_START_NEXT_FILE;
2185         GST_SPLITMUX_BROADCAST_OUTPUT (splitmux);
2186
2187         gst_message_unref (message);
2188         GST_SPLITMUX_UNLOCK (splitmux);
2189         return;
2190       }
2191       GST_SPLITMUX_UNLOCK (splitmux);
2192       break;
2193     }
2194     case GST_MESSAGE_ASYNC_START:
2195     case GST_MESSAGE_ASYNC_DONE:
2196       /* Ignore state changes from our children while switching */
2197       GST_SPLITMUX_LOCK (splitmux);
2198       if (splitmux->switching_fragment) {
2199         if (GST_MESSAGE_SRC (message) == (GstObject *) splitmux->active_sink
2200             || GST_MESSAGE_SRC (message) == (GstObject *) splitmux->muxer) {
2201           GST_LOG_OBJECT (splitmux,
2202               "Ignoring state change from child %" GST_PTR_FORMAT
2203               " while switching", GST_MESSAGE_SRC (message));
2204           gst_message_unref (message);
2205           GST_SPLITMUX_UNLOCK (splitmux);
2206           return;
2207         }
2208       }
2209       GST_SPLITMUX_UNLOCK (splitmux);
2210       break;
2211     case GST_MESSAGE_WARNING:
2212     {
2213       GError *gerror = NULL;
2214
2215       gst_message_parse_warning (message, &gerror, NULL);
2216
2217       if (g_error_matches (gerror, GST_STREAM_ERROR, GST_STREAM_ERROR_FORMAT)) {
2218         GList *item;
2219         gboolean caps_change = FALSE;
2220
2221         GST_SPLITMUX_LOCK (splitmux);
2222
2223         for (item = splitmux->contexts; item; item = item->next) {
2224           MqStreamCtx *ctx = item->data;
2225
2226           if (ctx->caps_change) {
2227             caps_change = TRUE;
2228             break;
2229           }
2230         }
2231
2232         GST_SPLITMUX_UNLOCK (splitmux);
2233
2234         if (caps_change) {
2235           GST_LOG_OBJECT (splitmux,
2236               "Ignoring warning change from child %" GST_PTR_FORMAT
2237               " while switching caps", GST_MESSAGE_SRC (message));
2238           gst_message_unref (message);
2239           return;
2240         }
2241       }
2242       break;
2243     }
2244     default:
2245       break;
2246   }
2247
2248   GST_BIN_CLASS (parent_class)->handle_message (bin, message);
2249 }
2250
2251 static void
2252 ctx_set_unblock (MqStreamCtx * ctx)
2253 {
2254   ctx->need_unblock = TRUE;
2255 }
2256
2257 static gboolean
2258 need_new_fragment (GstSplitMuxSink * splitmux,
2259     GstClockTime queued_time, GstClockTime queued_gop_time,
2260     guint64 queued_bytes)
2261 {
2262   guint64 thresh_bytes;
2263   GstClockTime thresh_time;
2264   gboolean check_robust_muxing;
2265   GstClockTime time_to_split = GST_CLOCK_TIME_NONE;
2266   GstClockTime *ptr_to_time;
2267
2268   GST_OBJECT_LOCK (splitmux);
2269   thresh_bytes = splitmux->threshold_bytes;
2270   thresh_time = splitmux->threshold_time;
2271   ptr_to_time = (GstClockTime *)
2272       gst_queue_array_peek_head_struct (splitmux->times_to_split);
2273   if (ptr_to_time)
2274     time_to_split = *ptr_to_time;
2275   check_robust_muxing = splitmux->use_robust_muxing
2276       && splitmux->muxer_has_reserved_props;
2277   GST_OBJECT_UNLOCK (splitmux);
2278
2279   /* Have we muxed at least one thing from the reference
2280    * stream into the file? If not, no other streams can have
2281    * either */
2282   if (splitmux->fragment_reference_bytes <= 0) {
2283     GST_TRACE_OBJECT (splitmux,
2284         "Not ready to split - nothing muxed on the reference stream");
2285     return FALSE;
2286   }
2287
2288   /* User told us to split now */
2289   if (g_atomic_int_get (&(splitmux->do_split_next_gop)) == TRUE) {
2290     GST_TRACE_OBJECT (splitmux, "Forcing because split_next_gop is set");
2291     return TRUE;
2292   }
2293
2294   /* User told us to split at this running time */
2295   if (splitmux->gop_start_time >= time_to_split) {
2296     GST_OBJECT_LOCK (splitmux);
2297     /* Dequeue running time */
2298     gst_queue_array_pop_head_struct (splitmux->times_to_split);
2299     /* Empty any running times after this that are past now */
2300     ptr_to_time = gst_queue_array_peek_head_struct (splitmux->times_to_split);
2301     while (ptr_to_time) {
2302       time_to_split = *ptr_to_time;
2303       if (splitmux->gop_start_time < time_to_split) {
2304         break;
2305       }
2306       gst_queue_array_pop_head_struct (splitmux->times_to_split);
2307       ptr_to_time = gst_queue_array_peek_head_struct (splitmux->times_to_split);
2308     }
2309     GST_TRACE_OBJECT (splitmux,
2310         "GOP start time %" GST_STIME_FORMAT " is after requested split point %"
2311         GST_STIME_FORMAT, GST_STIME_ARGS (splitmux->gop_start_time),
2312         GST_STIME_ARGS (time_to_split));
2313     GST_OBJECT_UNLOCK (splitmux);
2314     return TRUE;
2315   }
2316
2317   if (thresh_bytes > 0 && queued_bytes > thresh_bytes) {
2318     GST_TRACE_OBJECT (splitmux,
2319         "queued bytes %" G_GUINT64_FORMAT " overruns byte limit", queued_bytes);
2320     return TRUE;                /* Would overrun byte limit */
2321   }
2322
2323   if (thresh_time > 0 && queued_time > thresh_time) {
2324     GST_TRACE_OBJECT (splitmux,
2325         "queued time %" GST_STIME_FORMAT " overruns time limit",
2326         GST_STIME_ARGS (queued_time));
2327     return TRUE;                /* Would overrun time limit */
2328   }
2329
2330   if (splitmux->tc_interval &&
2331       GST_CLOCK_TIME_IS_VALID (splitmux->next_fragment_start_tc_time) &&
2332       splitmux->reference_ctx->in_running_time >
2333       splitmux->next_fragment_start_tc_time + 5 * GST_USECOND) {
2334     GST_TRACE_OBJECT (splitmux,
2335         "in running time %" GST_STIME_FORMAT " overruns time limit %"
2336         GST_TIME_FORMAT,
2337         GST_STIME_ARGS (splitmux->reference_ctx->in_running_time),
2338         GST_TIME_ARGS (splitmux->next_fragment_start_tc_time));
2339     return TRUE;
2340   }
2341
2342   if (check_robust_muxing) {
2343     GstClockTime mux_reserved_remain;
2344
2345     g_object_get (splitmux->muxer,
2346         "reserved-duration-remaining", &mux_reserved_remain, NULL);
2347
2348     GST_LOG_OBJECT (splitmux,
2349         "Muxer robust muxing report - %" G_GUINT64_FORMAT
2350         " remaining. New GOP would enqueue %" G_GUINT64_FORMAT,
2351         mux_reserved_remain, queued_gop_time);
2352
2353     if (queued_gop_time >= mux_reserved_remain) {
2354       GST_INFO_OBJECT (splitmux,
2355           "File is about to run out of header room - %" G_GUINT64_FORMAT
2356           " remaining. New GOP would enqueue %" G_GUINT64_FORMAT
2357           ". Switching to new file", mux_reserved_remain, queued_gop_time);
2358       return TRUE;
2359     }
2360   }
2361
2362   /* Continue and mux this GOP */
2363   return FALSE;
2364 }
2365
2366 /* probably we want to add this API? */
2367 static void
2368 video_time_code_replace (GstVideoTimeCode ** old_tc, GstVideoTimeCode * new_tc)
2369 {
2370   GstVideoTimeCode *timecode = NULL;
2371
2372   g_return_if_fail (old_tc != NULL);
2373
2374   if (*old_tc == new_tc)
2375     return;
2376
2377   if (new_tc)
2378     timecode = gst_video_time_code_copy (new_tc);
2379
2380   if (*old_tc)
2381     gst_video_time_code_free (*old_tc);
2382
2383   *old_tc = timecode;
2384 }
2385
2386 /* Called with splitmux lock held */
2387 /* Called when entering ProcessingCompleteGop state
2388  * Assess if mq contents overflowed the current file
2389  *   -> If yes, need to switch to new file
2390  *   -> if no, set max_out_running_time to let this GOP in and
2391  *      go to COLLECTING_GOP_START state
2392  */
2393 static void
2394 handle_gathered_gop (GstSplitMuxSink * splitmux)
2395 {
2396   guint64 queued_bytes;
2397   GstClockTimeDiff queued_time = 0;
2398   GstClockTimeDiff queued_gop_time = 0;
2399   GstClockTimeDiff new_out_ts = splitmux->reference_ctx->in_running_time;
2400   SplitMuxOutputCommand *cmd;
2401
2402   /* Assess if the multiqueue contents overflowed the current file */
2403   /* When considering if a newly gathered GOP overflows
2404    * the time limit for the file, only consider the running time of the
2405    * reference stream. Other streams might have run ahead a little bit,
2406    * but extra pieces won't be released to the muxer beyond the reference
2407    * stream cut-off anyway - so it forms the limit. */
2408   queued_bytes = splitmux->fragment_total_bytes + splitmux->gop_total_bytes;
2409   queued_time = splitmux->reference_ctx->in_running_time;
2410   /* queued_gop_time tracks how much unwritten data there is waiting to
2411    * be written to this fragment including this GOP */
2412   if (splitmux->reference_ctx->out_running_time != GST_CLOCK_STIME_NONE)
2413     queued_gop_time =
2414         splitmux->reference_ctx->in_running_time -
2415         splitmux->reference_ctx->out_running_time;
2416   else
2417     queued_gop_time =
2418         splitmux->reference_ctx->in_running_time - splitmux->gop_start_time;
2419
2420   GST_LOG_OBJECT (splitmux, " queued_bytes %" G_GUINT64_FORMAT, queued_bytes);
2421   GST_LOG_OBJECT (splitmux, "mq at TS %" GST_STIME_FORMAT
2422       " bytes %" G_GUINT64_FORMAT " in running time %" GST_STIME_FORMAT
2423       " gop start time %" GST_STIME_FORMAT,
2424       GST_STIME_ARGS (queued_time), queued_bytes,
2425       GST_STIME_ARGS (splitmux->reference_ctx->in_running_time),
2426       GST_STIME_ARGS (splitmux->gop_start_time));
2427
2428   if (queued_gop_time < 0)
2429     goto error_gop_duration;
2430
2431   if (queued_time < splitmux->fragment_start_time)
2432     goto error_queued_time;
2433
2434   queued_time -= splitmux->fragment_start_time;
2435   if (queued_time < queued_gop_time)
2436     queued_gop_time = queued_time;
2437
2438   /* Expand queued bytes estimate by muxer overhead */
2439   queued_bytes += (queued_bytes * splitmux->mux_overhead);
2440
2441   /* Check for overrun - have we output at least one byte and overrun
2442    * either threshold? */
2443   if (need_new_fragment (splitmux, queued_time, queued_gop_time, queued_bytes)) {
2444     if (splitmux->async_finalize) {
2445       GstClockTime *sink_running_time = g_new (GstClockTime, 1);
2446       *sink_running_time = splitmux->reference_ctx->out_running_time;
2447       g_object_set_qdata_full (G_OBJECT (splitmux->sink),
2448           RUNNING_TIME, sink_running_time, g_free);
2449     }
2450     g_atomic_int_set (&(splitmux->do_split_next_gop), FALSE);
2451     /* Tell the output side to start a new fragment */
2452     GST_INFO_OBJECT (splitmux,
2453         "This GOP (dur %" GST_STIME_FORMAT
2454         ") would overflow the fragment, Sending start_new_fragment cmd",
2455         GST_STIME_ARGS (splitmux->reference_ctx->in_running_time -
2456             splitmux->gop_start_time));
2457     cmd = out_cmd_buf_new ();
2458     cmd->start_new_fragment = TRUE;
2459     g_queue_push_head (&splitmux->out_cmd_q, cmd);
2460     GST_SPLITMUX_BROADCAST_OUTPUT (splitmux);
2461
2462     new_out_ts = splitmux->reference_ctx->in_running_time;
2463     splitmux->fragment_start_time = splitmux->gop_start_time;
2464     splitmux->fragment_total_bytes = 0;
2465     splitmux->fragment_reference_bytes = 0;
2466
2467     if (splitmux->tc_interval) {
2468       video_time_code_replace (&splitmux->fragment_start_tc,
2469           splitmux->gop_start_tc);
2470       splitmux->next_fragment_start_tc_time =
2471           calculate_next_max_timecode (splitmux, splitmux->fragment_start_tc,
2472           splitmux->fragment_start_time, NULL);
2473       if (!GST_CLOCK_TIME_IS_VALID (splitmux->next_fragment_start_tc_time)) {
2474         GST_WARNING_OBJECT (splitmux,
2475             "Couldn't calculate next fragment start time for timecode mode");
2476         /* shouldn't happen, but reset all and try again with next buffers */
2477         gst_splitmux_reset_timecode (splitmux);
2478       }
2479     }
2480   }
2481
2482   /* And set up to collect the next GOP */
2483   if (!splitmux->reference_ctx->in_eos) {
2484     splitmux->input_state = SPLITMUX_INPUT_STATE_COLLECTING_GOP_START;
2485     splitmux->gop_start_time = new_out_ts;
2486     if (splitmux->tc_interval)
2487       video_time_code_replace (&splitmux->gop_start_tc, splitmux->in_tc);
2488   } else {
2489     /* This is probably already the current state, but just in case: */
2490     splitmux->input_state = SPLITMUX_INPUT_STATE_FINISHING_UP;
2491     new_out_ts = GST_CLOCK_STIME_NONE;  /* EOS runs until forever */
2492   }
2493
2494   /* And wake all input contexts to send a wake-up event */
2495   g_list_foreach (splitmux->contexts, (GFunc) ctx_set_unblock, NULL);
2496   GST_SPLITMUX_BROADCAST_INPUT (splitmux);
2497
2498   /* Now either way - either there was no overflow, or we requested a new fragment: release this GOP */
2499   splitmux->fragment_total_bytes += splitmux->gop_total_bytes;
2500   splitmux->fragment_reference_bytes += splitmux->gop_reference_bytes;
2501
2502   if (splitmux->gop_total_bytes > 0) {
2503     GST_LOG_OBJECT (splitmux,
2504         "Releasing GOP to output. Bytes in fragment now %" G_GUINT64_FORMAT
2505         " time %" GST_STIME_FORMAT,
2506         splitmux->fragment_total_bytes, GST_STIME_ARGS (queued_time));
2507
2508     /* Send this GOP to the output command queue */
2509     cmd = out_cmd_buf_new ();
2510     cmd->start_new_fragment = FALSE;
2511     cmd->max_output_ts = new_out_ts;
2512     GST_LOG_OBJECT (splitmux, "Sending GOP cmd to output for TS %"
2513         GST_STIME_FORMAT, GST_STIME_ARGS (new_out_ts));
2514     g_queue_push_head (&splitmux->out_cmd_q, cmd);
2515
2516     GST_SPLITMUX_BROADCAST_OUTPUT (splitmux);
2517   }
2518
2519   splitmux->gop_total_bytes = 0;
2520   splitmux->gop_reference_bytes = 0;
2521   return;
2522
2523 error_gop_duration:
2524   GST_ELEMENT_ERROR (splitmux,
2525       STREAM, FAILED, ("Timestamping error on input streams"),
2526       ("Queued GOP time is negative %" GST_STIME_FORMAT,
2527           GST_STIME_ARGS (queued_gop_time)));
2528   return;
2529 error_queued_time:
2530   GST_ELEMENT_ERROR (splitmux,
2531       STREAM, FAILED, ("Timestamping error on input streams"),
2532       ("Queued time is negative. Input went backwards. queued_time - %"
2533           GST_STIME_FORMAT, GST_STIME_ARGS (queued_time)));
2534   return;
2535 }
2536
2537 /* Called with splitmux lock held */
2538 /* Called from each input pad when it is has all the pieces
2539  * for a GOP or EOS, starting with the reference pad which has set the
2540  * splitmux->max_in_running_time
2541  */
2542 static void
2543 check_completed_gop (GstSplitMuxSink * splitmux, MqStreamCtx * ctx)
2544 {
2545   GList *cur;
2546   GstEvent *event;
2547
2548   /* On ENDING_FILE, the reference stream sends a command to start a new
2549    * fragment, then releases the GOP for output in the new fragment.
2550    *  If some streams received no buffer during the last GOP that overran,
2551    * because its next buffer has a timestamp bigger than
2552    * ctx->max_in_running_time, its queue is empty. In that case the only
2553    * way to wakeup the output thread is by injecting an event in the
2554    * queue. This usually happen with subtitle streams.
2555    * See https://bugzilla.gnome.org/show_bug.cgi?id=763711. */
2556   if (ctx->need_unblock) {
2557     GST_LOG_OBJECT (ctx->sinkpad, "Sending splitmuxsink-unblock event");
2558     event = gst_event_new_custom (GST_EVENT_CUSTOM_DOWNSTREAM |
2559         GST_EVENT_TYPE_SERIALIZED,
2560         gst_structure_new ("splitmuxsink-unblock", "timestamp",
2561             G_TYPE_INT64, splitmux->max_in_running_time, NULL));
2562
2563     GST_SPLITMUX_UNLOCK (splitmux);
2564     gst_pad_send_event (ctx->sinkpad, event);
2565     GST_SPLITMUX_LOCK (splitmux);
2566
2567     ctx->need_unblock = FALSE;
2568     GST_SPLITMUX_BROADCAST_INPUT (splitmux);
2569     /* state may have changed while we were unlocked. Loop again if so */
2570     if (splitmux->input_state != SPLITMUX_INPUT_STATE_WAITING_GOP_COLLECT)
2571       return;
2572   }
2573
2574   do {
2575     if (splitmux->input_state == SPLITMUX_INPUT_STATE_WAITING_GOP_COLLECT) {
2576       gboolean ready = TRUE;
2577
2578       /* Iterate each pad, and check that the input running time is at least
2579        * up to the reference running time, and if so handle the collected GOP */
2580       GST_LOG_OBJECT (splitmux, "Checking GOP collected, Max in running time %"
2581           GST_STIME_FORMAT " ctx %p",
2582           GST_STIME_ARGS (splitmux->max_in_running_time), ctx);
2583       for (cur = g_list_first (splitmux->contexts); cur != NULL;
2584           cur = g_list_next (cur)) {
2585         MqStreamCtx *tmpctx = (MqStreamCtx *) (cur->data);
2586
2587         GST_LOG_OBJECT (splitmux,
2588             "Context %p sink pad %" GST_PTR_FORMAT " @ TS %" GST_STIME_FORMAT
2589             " EOS %d", tmpctx, tmpctx->sinkpad,
2590             GST_STIME_ARGS (tmpctx->in_running_time), tmpctx->in_eos);
2591
2592         if (splitmux->max_in_running_time != GST_CLOCK_STIME_NONE &&
2593             tmpctx->in_running_time < splitmux->max_in_running_time &&
2594             !tmpctx->in_eos) {
2595           GST_LOG_OBJECT (splitmux,
2596               "Context %p sink pad %" GST_PTR_FORMAT " not ready. We'll sleep",
2597               tmpctx, tmpctx->sinkpad);
2598           ready = FALSE;
2599           break;
2600         }
2601       }
2602       if (ready) {
2603         GST_DEBUG_OBJECT (splitmux,
2604             "Collected GOP is complete. Processing (ctx %p)", ctx);
2605         /* All pads have a complete GOP, release it into the multiqueue */
2606         handle_gathered_gop (splitmux);
2607
2608         /* The user has requested a split, we can split now that the previous GOP
2609          * has been collected to the correct location */
2610         if (g_atomic_int_compare_and_exchange (&(splitmux->split_requested),
2611                 TRUE, FALSE)) {
2612           g_atomic_int_set (&(splitmux->do_split_next_gop), TRUE);
2613         }
2614       }
2615     }
2616
2617     /* If upstream reached EOS we are not expecting more data, no need to wait
2618      * here. */
2619     if (ctx->in_eos)
2620       return;
2621
2622     if (splitmux->input_state == SPLITMUX_INPUT_STATE_WAITING_GOP_COLLECT &&
2623         !ctx->flushing &&
2624         (ctx->in_running_time >= splitmux->max_in_running_time) &&
2625         (splitmux->max_in_running_time != GST_CLOCK_STIME_NONE)) {
2626       /* Some pad is not yet ready, or GOP is being pushed
2627        * either way, sleep and wait to get woken */
2628       GST_LOG_OBJECT (splitmux, "Sleeping for GOP collection (ctx %p)", ctx);
2629       GST_SPLITMUX_WAIT_INPUT (splitmux);
2630       GST_LOG_OBJECT (splitmux, "Done waiting for complete GOP (ctx %p)", ctx);
2631     } else {
2632       /* This pad is not ready or the state changed - break out and get another
2633        * buffer / event */
2634       break;
2635     }
2636   } while (splitmux->input_state == SPLITMUX_INPUT_STATE_WAITING_GOP_COLLECT);
2637 }
2638
2639 static GstPadProbeReturn
2640 handle_mq_input (GstPad * pad, GstPadProbeInfo * info, MqStreamCtx * ctx)
2641 {
2642   GstSplitMuxSink *splitmux = ctx->splitmux;
2643   GstFlowReturn ret = GST_FLOW_OK;
2644   GstBuffer *buf;
2645   MqStreamBuf *buf_info = NULL;
2646   GstClockTime ts;
2647   gboolean loop_again;
2648   gboolean keyframe = FALSE;
2649
2650   GST_LOG_OBJECT (pad, "Fired probe type 0x%x", info->type);
2651
2652   /* FIXME: Handle buffer lists, until then make it clear they won't work */
2653   if (info->type & GST_PAD_PROBE_TYPE_BUFFER_LIST) {
2654     g_warning ("Buffer list handling not implemented");
2655     return GST_PAD_PROBE_DROP;
2656   }
2657   if (info->type & GST_PAD_PROBE_TYPE_EVENT_DOWNSTREAM ||
2658       info->type & GST_PAD_PROBE_TYPE_EVENT_FLUSH) {
2659     GstEvent *event = gst_pad_probe_info_get_event (info);
2660
2661     GST_LOG_OBJECT (pad, "Event %" GST_PTR_FORMAT, event);
2662
2663     switch (GST_EVENT_TYPE (event)) {
2664       case GST_EVENT_SEGMENT:
2665         gst_event_copy_segment (event, &ctx->in_segment);
2666         break;
2667       case GST_EVENT_FLUSH_STOP:
2668         GST_SPLITMUX_LOCK (splitmux);
2669         gst_segment_init (&ctx->in_segment, GST_FORMAT_UNDEFINED);
2670         ctx->in_eos = FALSE;
2671         ctx->in_running_time = GST_CLOCK_STIME_NONE;
2672         GST_SPLITMUX_UNLOCK (splitmux);
2673         break;
2674       case GST_EVENT_EOS:
2675         GST_SPLITMUX_LOCK (splitmux);
2676         ctx->in_eos = TRUE;
2677
2678         if (splitmux->input_state == SPLITMUX_INPUT_STATE_STOPPED) {
2679           ret = GST_FLOW_FLUSHING;
2680           goto beach;
2681         }
2682
2683         if (ctx->is_reference) {
2684           GST_INFO_OBJECT (splitmux, "Got Reference EOS. Finishing up");
2685           /* check_completed_gop will act as if this is a new keyframe with infinite timestamp */
2686           splitmux->input_state = SPLITMUX_INPUT_STATE_WAITING_GOP_COLLECT;
2687           /* Wake up other input pads to collect this GOP */
2688           GST_SPLITMUX_BROADCAST_INPUT (splitmux);
2689           check_completed_gop (splitmux, ctx);
2690         } else if (splitmux->input_state ==
2691             SPLITMUX_INPUT_STATE_WAITING_GOP_COLLECT) {
2692           /* If we are waiting for a GOP to be completed (ie, for aux
2693            * pads to catch up), then this pad is complete, so check
2694            * if the whole GOP is.
2695            */
2696           check_completed_gop (splitmux, ctx);
2697         }
2698         GST_SPLITMUX_UNLOCK (splitmux);
2699         break;
2700       case GST_EVENT_GAP:{
2701         GstClockTime gap_ts;
2702         GstClockTimeDiff rtime;
2703
2704         gst_event_parse_gap (event, &gap_ts, NULL);
2705         if (gap_ts == GST_CLOCK_TIME_NONE)
2706           break;
2707
2708         GST_SPLITMUX_LOCK (splitmux);
2709
2710         if (splitmux->input_state == SPLITMUX_INPUT_STATE_STOPPED) {
2711           ret = GST_FLOW_FLUSHING;
2712           goto beach;
2713         }
2714         rtime = my_segment_to_running_time (&ctx->in_segment, gap_ts);
2715
2716         GST_LOG_OBJECT (pad, "Have GAP w/ ts %" GST_STIME_FORMAT,
2717             GST_STIME_ARGS (rtime));
2718
2719         if (ctx->is_reference
2720             && splitmux->fragment_start_time == GST_CLOCK_STIME_NONE) {
2721           splitmux->gop_start_time = splitmux->fragment_start_time = rtime;
2722           GST_LOG_OBJECT (splitmux, "Mux start time now %" GST_STIME_FORMAT,
2723               GST_STIME_ARGS (splitmux->fragment_start_time));
2724           /* Also take this as the first start time when starting up,
2725            * so that we start counting overflow from the first frame */
2726           if (!GST_CLOCK_STIME_IS_VALID (splitmux->max_in_running_time))
2727             splitmux->max_in_running_time = splitmux->fragment_start_time;
2728         }
2729
2730         GST_SPLITMUX_UNLOCK (splitmux);
2731         break;
2732       }
2733       default:
2734         break;
2735     }
2736     return GST_PAD_PROBE_PASS;
2737   } else if (info->type & GST_PAD_PROBE_TYPE_QUERY_DOWNSTREAM) {
2738     switch (GST_QUERY_TYPE (GST_QUERY (info->data))) {
2739       case GST_QUERY_ALLOCATION:
2740         return GST_PAD_PROBE_DROP;
2741       default:
2742         return GST_PAD_PROBE_PASS;
2743     }
2744   }
2745
2746   buf = gst_pad_probe_info_get_buffer (info);
2747   buf_info = mq_stream_buf_new ();
2748
2749   if (GST_BUFFER_PTS_IS_VALID (buf))
2750     ts = GST_BUFFER_PTS (buf);
2751   else
2752     ts = GST_BUFFER_DTS (buf);
2753
2754   GST_LOG_OBJECT (pad, "Buffer TS is %" GST_TIME_FORMAT, GST_TIME_ARGS (ts));
2755
2756   GST_SPLITMUX_LOCK (splitmux);
2757
2758   if (splitmux->input_state == SPLITMUX_INPUT_STATE_STOPPED) {
2759     ret = GST_FLOW_FLUSHING;
2760     goto beach;
2761   }
2762
2763   /* If this buffer has a timestamp, advance the input timestamp of the
2764    * stream */
2765   if (GST_CLOCK_TIME_IS_VALID (ts)) {
2766     GstClockTimeDiff running_time =
2767         my_segment_to_running_time (&ctx->in_segment, ts);
2768
2769     GST_LOG_OBJECT (pad, "Buffer running TS is %" GST_STIME_FORMAT,
2770         GST_STIME_ARGS (running_time));
2771
2772     if (GST_CLOCK_STIME_IS_VALID (running_time)
2773         && running_time > ctx->in_running_time)
2774       ctx->in_running_time = running_time;
2775   }
2776
2777   /* Try to make sure we have a valid running time */
2778   if (!GST_CLOCK_STIME_IS_VALID (ctx->in_running_time)) {
2779     ctx->in_running_time =
2780         my_segment_to_running_time (&ctx->in_segment, ctx->in_segment.start);
2781   }
2782
2783   GST_LOG_OBJECT (pad, "in running time now %" GST_STIME_FORMAT,
2784       GST_STIME_ARGS (ctx->in_running_time));
2785
2786   buf_info->run_ts = ctx->in_running_time;
2787   buf_info->buf_size = gst_buffer_get_size (buf);
2788   buf_info->duration = GST_BUFFER_DURATION (buf);
2789
2790   if (ctx->is_reference) {
2791     /* initialize fragment_start_time */
2792     if (splitmux->fragment_start_time == GST_CLOCK_STIME_NONE) {
2793       splitmux->gop_start_time = splitmux->fragment_start_time =
2794           buf_info->run_ts;
2795       GST_LOG_OBJECT (splitmux, "Mux start time now %" GST_STIME_FORMAT,
2796           GST_STIME_ARGS (splitmux->fragment_start_time));
2797
2798       /* Also take this as the first start time when starting up,
2799        * so that we start counting overflow from the first frame */
2800       if (!GST_CLOCK_STIME_IS_VALID (splitmux->max_in_running_time))
2801         splitmux->max_in_running_time = splitmux->fragment_start_time;
2802     }
2803
2804     if (splitmux->tc_interval) {
2805       GstVideoTimeCodeMeta *tc_meta = gst_buffer_get_video_time_code_meta (buf);
2806       if (tc_meta) {
2807         video_time_code_replace (&splitmux->in_tc, &tc_meta->tc);
2808
2809         if (!splitmux->fragment_start_tc) {
2810           /* also initialize fragment_start_tc */
2811           video_time_code_replace (&splitmux->gop_start_tc, &tc_meta->tc);
2812           video_time_code_replace (&splitmux->fragment_start_tc, &tc_meta->tc);
2813
2814           splitmux->next_fragment_start_tc_time =
2815               calculate_next_max_timecode (splitmux, splitmux->in_tc,
2816               ctx->in_running_time, NULL);
2817           GST_DEBUG_OBJECT (splitmux, "Initialize next fragment start tc time %"
2818               GST_TIME_FORMAT,
2819               GST_TIME_ARGS (splitmux->next_fragment_start_tc_time));
2820         }
2821       }
2822     }
2823
2824     /* Check whether we need to request next keyframe depending on
2825      * current running time */
2826     if (!GST_BUFFER_FLAG_IS_SET (buf, GST_BUFFER_FLAG_DELTA_UNIT) &&
2827         request_next_keyframe (splitmux, buf, ctx->in_running_time) == FALSE) {
2828       GST_WARNING_OBJECT (splitmux,
2829           "Could not request a keyframe. Files may not split at the exact location they should");
2830     }
2831   }
2832
2833   GST_DEBUG_OBJECT (pad, "Buf TS %" GST_STIME_FORMAT
2834       " total GOP bytes %" G_GUINT64_FORMAT,
2835       GST_STIME_ARGS (buf_info->run_ts), splitmux->gop_total_bytes);
2836
2837   loop_again = TRUE;
2838   do {
2839     if (ctx->flushing)
2840       break;
2841
2842     switch (splitmux->input_state) {
2843       case SPLITMUX_INPUT_STATE_COLLECTING_GOP_START:
2844         if (ctx->is_releasing) {
2845           /* The pad belonging to this context is being released */
2846           GST_WARNING_OBJECT (pad, "Pad is being released while the muxer is "
2847               "running. Data might not drain correctly");
2848           loop_again = FALSE;
2849         } else if (ctx->is_reference) {
2850           /* This is the reference context. If it's a keyframe,
2851            * it marks the start of a new GOP and we should wait in
2852            * check_completed_gop before continuing, but either way
2853            * (keyframe or no, we'll pass this buffer through after
2854            * so set loop_again to FALSE */
2855           loop_again = FALSE;
2856
2857           if (GST_BUFFER_FLAG_IS_SET (buf, GST_BUFFER_FLAG_DELTA_UNIT)) {
2858             /* Allow other input pads to catch up to here too */
2859             splitmux->max_in_running_time = ctx->in_running_time;
2860             GST_LOG_OBJECT (splitmux,
2861                 "Max in running time now %" GST_TIME_FORMAT,
2862                 GST_TIME_ARGS (splitmux->max_in_running_time));
2863             GST_SPLITMUX_BROADCAST_INPUT (splitmux);
2864             break;
2865           }
2866           GST_INFO_OBJECT (pad,
2867               "Have keyframe with running time %" GST_STIME_FORMAT,
2868               GST_STIME_ARGS (ctx->in_running_time));
2869           keyframe = TRUE;
2870           splitmux->input_state = SPLITMUX_INPUT_STATE_WAITING_GOP_COLLECT;
2871           splitmux->max_in_running_time = ctx->in_running_time;
2872           GST_LOG_OBJECT (splitmux, "Max in running time now %" GST_TIME_FORMAT,
2873               GST_TIME_ARGS (splitmux->max_in_running_time));
2874           /* Wake up other input pads to collect this GOP */
2875           GST_SPLITMUX_BROADCAST_INPUT (splitmux);
2876           check_completed_gop (splitmux, ctx);
2877         } else {
2878           /* Pass this buffer if the reference ctx is far enough ahead */
2879           if (ctx->in_running_time < splitmux->max_in_running_time) {
2880             loop_again = FALSE;
2881             break;
2882           }
2883
2884           /* We're still waiting for a keyframe on the reference pad, sleep */
2885           GST_LOG_OBJECT (pad, "Sleeping for GOP start");
2886           GST_SPLITMUX_WAIT_INPUT (splitmux);
2887           GST_LOG_OBJECT (pad,
2888               "Done sleeping for GOP start input state now %d",
2889               splitmux->input_state);
2890         }
2891         break;
2892       case SPLITMUX_INPUT_STATE_WAITING_GOP_COLLECT:{
2893         /* We're collecting a GOP. If this is the reference context,
2894          * we need to check if this is a keyframe that marks the start
2895          * of the next GOP. If it is, it marks the end of the GOP we're
2896          * collecting, so sleep and wait until all the other pads also
2897          * reach that timestamp - at which point, we have an entire GOP
2898          * and either go to ENDING_FILE or release this GOP to the muxer and
2899          * go back to COLLECT_GOP_START. */
2900
2901         /* If we overran the target timestamp, it might be time to process
2902          * the GOP, otherwise bail out for more data
2903          */
2904         GST_LOG_OBJECT (pad,
2905             "Checking TS %" GST_STIME_FORMAT " against max %"
2906             GST_STIME_FORMAT, GST_STIME_ARGS (ctx->in_running_time),
2907             GST_STIME_ARGS (splitmux->max_in_running_time));
2908
2909         if (ctx->in_running_time < splitmux->max_in_running_time) {
2910           loop_again = FALSE;
2911           break;
2912         }
2913
2914         GST_LOG_OBJECT (pad,
2915             "Collected last packet of GOP. Checking other pads");
2916         check_completed_gop (splitmux, ctx);
2917         break;
2918       }
2919       case SPLITMUX_INPUT_STATE_FINISHING_UP:
2920         loop_again = FALSE;
2921         break;
2922       default:
2923         loop_again = FALSE;
2924         break;
2925     }
2926   }
2927   while (loop_again);
2928
2929   if (keyframe && ctx->is_reference)
2930     splitmux->queued_keyframes++;
2931   buf_info->keyframe = keyframe;
2932
2933   /* Update total input byte counter for overflow detect */
2934   splitmux->gop_total_bytes += buf_info->buf_size;
2935   if (ctx->is_reference) {
2936     splitmux->gop_reference_bytes += buf_info->buf_size;
2937   }
2938
2939   /* Now add this buffer to the queue just before returning */
2940   g_queue_push_head (&ctx->queued_bufs, buf_info);
2941
2942   GST_LOG_OBJECT (pad, "Returning to queue buffer %" GST_PTR_FORMAT
2943       " run ts %" GST_STIME_FORMAT, buf, GST_STIME_ARGS (ctx->in_running_time));
2944
2945   GST_SPLITMUX_UNLOCK (splitmux);
2946   GST_PAD_PROBE_INFO_FLOW_RETURN (info) = ret;
2947   return GST_PAD_PROBE_PASS;
2948
2949 beach:
2950   GST_SPLITMUX_UNLOCK (splitmux);
2951   if (buf_info)
2952     mq_stream_buf_free (buf_info);
2953   GST_PAD_PROBE_INFO_FLOW_RETURN (info) = ret;
2954   return GST_PAD_PROBE_PASS;
2955 }
2956
2957 static void
2958 grow_blocked_queues (GstSplitMuxSink * splitmux)
2959 {
2960   GList *cur;
2961
2962   /* Scan other queues for full-ness and grow them */
2963   for (cur = g_list_first (splitmux->contexts);
2964       cur != NULL; cur = g_list_next (cur)) {
2965     MqStreamCtx *tmpctx = (MqStreamCtx *) (cur->data);
2966     guint cur_limit;
2967     guint cur_len = g_queue_get_length (&tmpctx->queued_bufs);
2968
2969     g_object_get (tmpctx->q, "max-size-buffers", &cur_limit, NULL);
2970     GST_LOG_OBJECT (tmpctx->q, "Queue len %u", cur_len);
2971
2972     if (cur_len >= cur_limit) {
2973       cur_limit = cur_len + 1;
2974       GST_DEBUG_OBJECT (tmpctx->q,
2975           "Queue overflowed and needs enlarging. Growing to %u buffers",
2976           cur_limit);
2977       g_object_set (tmpctx->q, "max-size-buffers", cur_limit, NULL);
2978     }
2979   }
2980 }
2981
2982 static void
2983 handle_q_underrun (GstElement * q, gpointer user_data)
2984 {
2985   MqStreamCtx *ctx = (MqStreamCtx *) (user_data);
2986   GstSplitMuxSink *splitmux = ctx->splitmux;
2987
2988   GST_SPLITMUX_LOCK (splitmux);
2989   GST_DEBUG_OBJECT (q,
2990       "Queue reported underrun with %d keyframes and %d cmds enqueued",
2991       splitmux->queued_keyframes, g_queue_get_length (&splitmux->out_cmd_q));
2992   grow_blocked_queues (splitmux);
2993   GST_SPLITMUX_UNLOCK (splitmux);
2994 }
2995
2996 static void
2997 handle_q_overrun (GstElement * q, gpointer user_data)
2998 {
2999   MqStreamCtx *ctx = (MqStreamCtx *) (user_data);
3000   GstSplitMuxSink *splitmux = ctx->splitmux;
3001   gboolean allow_grow = FALSE;
3002
3003   GST_SPLITMUX_LOCK (splitmux);
3004   GST_DEBUG_OBJECT (q,
3005       "Queue reported overrun with %d keyframes and %d cmds enqueued",
3006       splitmux->queued_keyframes, g_queue_get_length (&splitmux->out_cmd_q));
3007
3008   if (splitmux->queued_keyframes < 2) {
3009     /* Less than a full GOP queued, grow the queue */
3010     allow_grow = TRUE;
3011   } else if (g_queue_get_length (&splitmux->out_cmd_q) < 1) {
3012     allow_grow = TRUE;
3013   } else {
3014     /* If another queue is starved, grow */
3015     GList *cur;
3016     for (cur = g_list_first (splitmux->contexts);
3017         cur != NULL; cur = g_list_next (cur)) {
3018       MqStreamCtx *tmpctx = (MqStreamCtx *) (cur->data);
3019       if (tmpctx != ctx && g_queue_get_length (&tmpctx->queued_bufs) < 1) {
3020         allow_grow = TRUE;
3021       }
3022     }
3023   }
3024   GST_SPLITMUX_UNLOCK (splitmux);
3025
3026   if (allow_grow) {
3027     guint cur_limit;
3028
3029     g_object_get (q, "max-size-buffers", &cur_limit, NULL);
3030     cur_limit++;
3031
3032     GST_DEBUG_OBJECT (q,
3033         "Queue overflowed and needs enlarging. Growing to %u buffers",
3034         cur_limit);
3035
3036     g_object_set (q, "max-size-buffers", cur_limit, NULL);
3037   }
3038 }
3039
3040 /* Called with SPLITMUX lock held */
3041 static const gchar *
3042 lookup_muxer_pad (GstSplitMuxSink * splitmux, const gchar * sinkpad_name)
3043 {
3044   const gchar *ret = NULL;
3045
3046   if (splitmux->muxerpad_map == NULL)
3047     return NULL;
3048
3049   if (sinkpad_name == NULL) {
3050     GST_WARNING_OBJECT (splitmux,
3051         "Can't look up request pad in pad map without providing a pad name");
3052     return NULL;
3053   }
3054
3055   ret = gst_structure_get_string (splitmux->muxerpad_map, sinkpad_name);
3056   if (ret) {
3057     GST_INFO_OBJECT (splitmux, "Sink pad %s maps to muxer pad %s", sinkpad_name,
3058         ret);
3059     return g_strdup (ret);
3060   }
3061
3062   return NULL;
3063 }
3064
3065 static GstPad *
3066 gst_splitmux_sink_request_new_pad (GstElement * element,
3067     GstPadTemplate * templ, const gchar * name, const GstCaps * caps)
3068 {
3069   GstSplitMuxSink *splitmux = (GstSplitMuxSink *) element;
3070   GstPadTemplate *mux_template = NULL;
3071   GstPad *ret = NULL, *muxpad = NULL;
3072   GstElement *q;
3073   GstPad *q_sink = NULL, *q_src = NULL;
3074   gchar *gname, *qname;
3075   gboolean is_primary_video = FALSE, is_video = FALSE,
3076       muxer_is_requestpad = FALSE;
3077   MqStreamCtx *ctx;
3078   const gchar *muxer_padname = NULL;
3079
3080   GST_DEBUG_OBJECT (splitmux, "templ:%s, name:%s", templ->name_template, name);
3081
3082   GST_SPLITMUX_LOCK (splitmux);
3083   if (!create_muxer (splitmux))
3084     goto fail;
3085   g_signal_emit (splitmux, signals[SIGNAL_MUXER_ADDED], 0, splitmux->muxer);
3086
3087   if (g_str_equal (templ->name_template, "video") ||
3088       g_str_has_prefix (templ->name_template, "video_aux_")) {
3089     is_primary_video = g_str_equal (templ->name_template, "video");
3090     if (is_primary_video && splitmux->have_video)
3091       goto already_have_video;
3092     is_video = TRUE;
3093   }
3094
3095   /* See if there's a pad map and it lists this pad */
3096   muxer_padname = lookup_muxer_pad (splitmux, name);
3097
3098   if (muxer_padname == NULL) {
3099     if (is_video) {
3100       /* FIXME: Look for a pad template with matching caps, rather than by name */
3101       GST_DEBUG_OBJECT (element,
3102           "searching for pad-template with name 'video_%%u'");
3103       mux_template =
3104           gst_element_class_get_pad_template (GST_ELEMENT_GET_CLASS
3105           (splitmux->muxer), "video_%u");
3106
3107       /* Fallback to find sink pad templates named 'video' (flvmux) */
3108       if (!mux_template) {
3109         GST_DEBUG_OBJECT (element,
3110             "searching for pad-template with name 'video'");
3111         mux_template =
3112             gst_element_class_get_pad_template (GST_ELEMENT_GET_CLASS
3113             (splitmux->muxer), "video");
3114       }
3115       name = NULL;
3116     } else {
3117       GST_DEBUG_OBJECT (element, "searching for pad-template with name '%s'",
3118           templ->name_template);
3119       mux_template =
3120           gst_element_class_get_pad_template (GST_ELEMENT_GET_CLASS
3121           (splitmux->muxer), templ->name_template);
3122
3123       /* Fallback to find sink pad templates named 'audio' (flvmux) */
3124       if (!mux_template && g_str_has_prefix (templ->name_template, "audio_")) {
3125         GST_DEBUG_OBJECT (element,
3126             "searching for pad-template with name 'audio'");
3127         mux_template =
3128             gst_element_class_get_pad_template (GST_ELEMENT_GET_CLASS
3129             (splitmux->muxer), "audio");
3130         name = NULL;
3131       }
3132     }
3133
3134     if (mux_template == NULL) {
3135       GST_DEBUG_OBJECT (element,
3136           "searching for pad-template with name 'sink_%%d'");
3137       mux_template =
3138           gst_element_class_get_pad_template (GST_ELEMENT_GET_CLASS
3139           (splitmux->muxer), "sink_%d");
3140       name = NULL;
3141     }
3142     if (mux_template == NULL) {
3143       GST_DEBUG_OBJECT (element, "searching for pad-template with name 'sink'");
3144       mux_template =
3145           gst_element_class_get_pad_template (GST_ELEMENT_GET_CLASS
3146           (splitmux->muxer), "sink");
3147       name = NULL;
3148     }
3149
3150     if (mux_template == NULL) {
3151       GST_ERROR_OBJECT (element,
3152           "unable to find a suitable sink pad-template on the muxer");
3153       goto fail;
3154     }
3155     GST_DEBUG_OBJECT (element, "found sink pad-template '%s' on the muxer",
3156         mux_template->name_template);
3157
3158     if (mux_template->presence == GST_PAD_REQUEST) {
3159       GST_DEBUG_OBJECT (element, "requesting pad from pad-template");
3160
3161       muxpad =
3162           gst_element_request_pad (splitmux->muxer, mux_template, name, caps);
3163       muxer_is_requestpad = TRUE;
3164     } else if (mux_template->presence == GST_PAD_ALWAYS) {
3165       GST_DEBUG_OBJECT (element, "accessing always pad from pad-template");
3166
3167       muxpad =
3168           gst_element_get_static_pad (splitmux->muxer,
3169           mux_template->name_template);
3170     } else {
3171       GST_ERROR_OBJECT (element,
3172           "unexpected pad presence %d", mux_template->presence);
3173       goto fail;
3174     }
3175   } else {
3176     /* Have a muxer pad name */
3177     if (!(muxpad = gst_element_get_static_pad (splitmux->muxer, muxer_padname))) {
3178       if ((muxpad =
3179               gst_element_request_pad_simple (splitmux->muxer, muxer_padname)))
3180         muxer_is_requestpad = TRUE;
3181     }
3182     g_free ((gchar *) muxer_padname);
3183     muxer_padname = NULL;
3184   }
3185
3186   /* One way or another, we must have a muxer pad by now */
3187   if (muxpad == NULL)
3188     goto fail;
3189
3190   if (is_primary_video)
3191     gname = g_strdup ("video");
3192   else if (name == NULL)
3193     gname = gst_pad_get_name (muxpad);
3194   else
3195     gname = g_strdup (name);
3196
3197   qname = g_strdup_printf ("queue_%s", gname);
3198   if ((q = create_element (splitmux, "queue", qname, FALSE)) == NULL) {
3199     g_free (qname);
3200     goto fail;
3201   }
3202   g_free (qname);
3203
3204   gst_element_set_state (q, GST_STATE_TARGET (splitmux));
3205
3206   g_object_set (q, "max-size-bytes", 0, "max-size-time", (guint64) (0),
3207       "max-size-buffers", 5, NULL);
3208
3209   q_sink = gst_element_get_static_pad (q, "sink");
3210   q_src = gst_element_get_static_pad (q, "src");
3211
3212   if (gst_pad_link (q_src, muxpad) != GST_PAD_LINK_OK) {
3213     if (muxer_is_requestpad)
3214       gst_element_release_request_pad (splitmux->muxer, muxpad);
3215     gst_object_unref (GST_OBJECT (muxpad));
3216     goto fail;
3217   }
3218
3219   gst_object_unref (GST_OBJECT (muxpad));
3220
3221   ctx = mq_stream_ctx_new (splitmux);
3222   /* Context holds a ref: */
3223   ctx->q = gst_object_ref (q);
3224   ctx->srcpad = q_src;
3225   ctx->sinkpad = q_sink;
3226   ctx->q_overrun_id =
3227       g_signal_connect (q, "overrun", (GCallback) handle_q_overrun, ctx);
3228   g_signal_connect (q, "underrun", (GCallback) handle_q_underrun, ctx);
3229
3230   ctx->src_pad_block_id =
3231       gst_pad_add_probe (q_src,
3232       GST_PAD_PROBE_TYPE_DATA_DOWNSTREAM | GST_PAD_PROBE_TYPE_EVENT_FLUSH,
3233       (GstPadProbeCallback) handle_mq_output, ctx, NULL);
3234   if (is_primary_video && splitmux->reference_ctx != NULL) {
3235     splitmux->reference_ctx->is_reference = FALSE;
3236     splitmux->reference_ctx = NULL;
3237   }
3238   if (splitmux->reference_ctx == NULL) {
3239     splitmux->reference_ctx = ctx;
3240     ctx->is_reference = TRUE;
3241   }
3242
3243   ret = gst_ghost_pad_new_from_template (gname, q_sink, templ);
3244   g_object_set_qdata ((GObject *) (ret), PAD_CONTEXT, ctx);
3245
3246   ctx->sink_pad_block_id =
3247       gst_pad_add_probe (q_sink,
3248       GST_PAD_PROBE_TYPE_DATA_DOWNSTREAM | GST_PAD_PROBE_TYPE_EVENT_FLUSH |
3249       GST_PAD_PROBE_TYPE_QUERY_DOWNSTREAM,
3250       (GstPadProbeCallback) handle_mq_input, ctx, NULL);
3251
3252   GST_DEBUG_OBJECT (splitmux, "splitmuxsink pad %" GST_PTR_FORMAT
3253       " feeds queue pad %" GST_PTR_FORMAT, ret, q_sink);
3254
3255   splitmux->contexts = g_list_append (splitmux->contexts, ctx);
3256
3257   g_free (gname);
3258
3259   if (is_primary_video)
3260     splitmux->have_video = TRUE;
3261
3262   gst_pad_set_active (ret, TRUE);
3263   gst_element_add_pad (GST_ELEMENT (splitmux), ret);
3264
3265   GST_SPLITMUX_UNLOCK (splitmux);
3266
3267   return ret;
3268 fail:
3269   GST_SPLITMUX_UNLOCK (splitmux);
3270
3271   if (q_sink)
3272     gst_object_unref (q_sink);
3273   if (q_src)
3274     gst_object_unref (q_src);
3275   return NULL;
3276 already_have_video:
3277   GST_DEBUG_OBJECT (splitmux, "video sink pad already requested");
3278   GST_SPLITMUX_UNLOCK (splitmux);
3279   return NULL;
3280 }
3281
3282 static void
3283 gst_splitmux_sink_release_pad (GstElement * element, GstPad * pad)
3284 {
3285   GstSplitMuxSink *splitmux = (GstSplitMuxSink *) element;
3286   GstPad *muxpad = NULL;
3287   MqStreamCtx *ctx =
3288       (MqStreamCtx *) (g_object_get_qdata ((GObject *) (pad), PAD_CONTEXT));
3289
3290   GST_SPLITMUX_LOCK (splitmux);
3291
3292   if (splitmux->muxer == NULL)
3293     goto fail;                  /* Elements don't exist yet - nothing to release */
3294
3295   GST_INFO_OBJECT (pad, "releasing request pad");
3296
3297   muxpad = gst_pad_get_peer (ctx->srcpad);
3298
3299   /* Remove the context from our consideration */
3300   splitmux->contexts = g_list_remove (splitmux->contexts, ctx);
3301
3302   GST_SPLITMUX_UNLOCK (splitmux);
3303
3304   if (ctx->sink_pad_block_id) {
3305     gst_pad_remove_probe (ctx->sinkpad, ctx->sink_pad_block_id);
3306     gst_pad_send_event (ctx->sinkpad, gst_event_new_flush_start ());
3307   }
3308
3309   if (ctx->src_pad_block_id)
3310     gst_pad_remove_probe (ctx->srcpad, ctx->src_pad_block_id);
3311
3312   GST_SPLITMUX_LOCK (splitmux);
3313
3314   ctx->is_releasing = TRUE;
3315   GST_SPLITMUX_BROADCAST_INPUT (splitmux);
3316
3317   /* Can release the context now */
3318   mq_stream_ctx_free (ctx);
3319   if (ctx == splitmux->reference_ctx)
3320     splitmux->reference_ctx = NULL;
3321
3322   /* Release and free the muxer input */
3323   if (muxpad) {
3324     gst_element_release_request_pad (splitmux->muxer, muxpad);
3325     gst_object_unref (muxpad);
3326   }
3327
3328   if (GST_PAD_PAD_TEMPLATE (pad) &&
3329       g_str_equal (GST_PAD_TEMPLATE_NAME_TEMPLATE (GST_PAD_PAD_TEMPLATE
3330               (pad)), "video"))
3331     splitmux->have_video = FALSE;
3332
3333   gst_element_remove_pad (element, pad);
3334
3335   /* Reset the internal elements only after all request pads are released */
3336   if (splitmux->contexts == NULL)
3337     gst_splitmux_reset_elements (splitmux);
3338
3339   /* Wake up other input streams to check if the completion conditions have
3340    * changed */
3341   GST_SPLITMUX_BROADCAST_INPUT (splitmux);
3342
3343 fail:
3344   GST_SPLITMUX_UNLOCK (splitmux);
3345 }
3346
3347 static GstElement *
3348 create_element (GstSplitMuxSink * splitmux,
3349     const gchar * factory, const gchar * name, gboolean locked)
3350 {
3351   GstElement *ret = gst_element_factory_make (factory, name);
3352   if (ret == NULL) {
3353     g_warning ("Failed to create %s - splitmuxsink will not work", name);
3354     return NULL;
3355   }
3356
3357   if (locked) {
3358     /* Ensure the sink starts in locked state and NULL - it will be changed
3359      * by the filename setting code */
3360     gst_element_set_locked_state (ret, TRUE);
3361     gst_element_set_state (ret, GST_STATE_NULL);
3362   }
3363
3364   if (!gst_bin_add (GST_BIN (splitmux), ret)) {
3365     g_warning ("Could not add %s element - splitmuxsink will not work", name);
3366     gst_object_unref (ret);
3367     return NULL;
3368   }
3369
3370   return ret;
3371 }
3372
3373 static gboolean
3374 create_muxer (GstSplitMuxSink * splitmux)
3375 {
3376   /* Create internal elements */
3377   if (splitmux->muxer == NULL) {
3378     GstElement *provided_muxer = NULL;
3379
3380     GST_OBJECT_LOCK (splitmux);
3381     if (splitmux->provided_muxer != NULL)
3382       provided_muxer = gst_object_ref (splitmux->provided_muxer);
3383     GST_OBJECT_UNLOCK (splitmux);
3384
3385     if ((!splitmux->async_finalize && provided_muxer == NULL) ||
3386         (splitmux->async_finalize && splitmux->muxer_factory == NULL)) {
3387       if ((splitmux->muxer =
3388               create_element (splitmux,
3389                   splitmux->muxer_factory ? splitmux->
3390                   muxer_factory : DEFAULT_MUXER, "muxer", FALSE)) == NULL)
3391         goto fail;
3392     } else if (splitmux->async_finalize) {
3393       if ((splitmux->muxer =
3394               create_element (splitmux, splitmux->muxer_factory, "muxer",
3395                   FALSE)) == NULL)
3396         goto fail;
3397       if (splitmux->muxer_preset && GST_IS_PRESET (splitmux->muxer))
3398         gst_preset_load_preset (GST_PRESET (splitmux->muxer),
3399             splitmux->muxer_preset);
3400       if (splitmux->muxer_properties)
3401         gst_structure_foreach (splitmux->muxer_properties,
3402             _set_property_from_structure, splitmux->muxer);
3403     } else {
3404       /* Ensure it's not in locked state (we might be reusing an old element) */
3405       gst_element_set_locked_state (provided_muxer, FALSE);
3406       if (!gst_bin_add (GST_BIN (splitmux), provided_muxer)) {
3407         g_warning ("Could not add muxer element - splitmuxsink will not work");
3408         gst_object_unref (provided_muxer);
3409         goto fail;
3410       }
3411
3412       splitmux->muxer = provided_muxer;
3413       gst_object_unref (provided_muxer);
3414     }
3415
3416     if (splitmux->use_robust_muxing) {
3417       update_muxer_properties (splitmux);
3418     }
3419   }
3420
3421   return TRUE;
3422 fail:
3423   return FALSE;
3424 }
3425
3426 static GstElement *
3427 find_sink (GstElement * e)
3428 {
3429   GstElement *res = NULL;
3430   GstIterator *iter;
3431   gboolean done = FALSE;
3432   GValue data = { 0, };
3433
3434   if (!GST_IS_BIN (e))
3435     return e;
3436
3437   if (g_object_class_find_property (G_OBJECT_GET_CLASS (e), "location") != NULL)
3438     return e;
3439
3440   iter = gst_bin_iterate_sinks (GST_BIN (e));
3441   while (!done) {
3442     switch (gst_iterator_next (iter, &data)) {
3443       case GST_ITERATOR_OK:
3444       {
3445         GstElement *child = g_value_get_object (&data);
3446         if (g_object_class_find_property (G_OBJECT_GET_CLASS (child),
3447                 "location") != NULL) {
3448           res = child;
3449           done = TRUE;
3450         }
3451         g_value_reset (&data);
3452         break;
3453       }
3454       case GST_ITERATOR_RESYNC:
3455         gst_iterator_resync (iter);
3456         break;
3457       case GST_ITERATOR_DONE:
3458         done = TRUE;
3459         break;
3460       case GST_ITERATOR_ERROR:
3461         g_assert_not_reached ();
3462         break;
3463     }
3464   }
3465   g_value_unset (&data);
3466   gst_iterator_free (iter);
3467
3468   return res;
3469 }
3470
3471 static gboolean
3472 create_sink (GstSplitMuxSink * splitmux)
3473 {
3474   GstElement *provided_sink = NULL;
3475
3476   if (splitmux->active_sink == NULL) {
3477
3478     GST_OBJECT_LOCK (splitmux);
3479     if (splitmux->provided_sink != NULL)
3480       provided_sink = gst_object_ref (splitmux->provided_sink);
3481     GST_OBJECT_UNLOCK (splitmux);
3482
3483     if ((!splitmux->async_finalize && provided_sink == NULL) ||
3484         (splitmux->async_finalize && splitmux->sink_factory == NULL)) {
3485       if ((splitmux->sink =
3486               create_element (splitmux, DEFAULT_SINK, "sink", TRUE)) == NULL)
3487         goto fail;
3488       splitmux->active_sink = splitmux->sink;
3489     } else if (splitmux->async_finalize) {
3490       if ((splitmux->sink =
3491               create_element (splitmux, splitmux->sink_factory, "sink",
3492                   TRUE)) == NULL)
3493         goto fail;
3494       if (splitmux->sink_preset && GST_IS_PRESET (splitmux->sink))
3495         gst_preset_load_preset (GST_PRESET (splitmux->sink),
3496             splitmux->sink_preset);
3497       if (splitmux->sink_properties)
3498         gst_structure_foreach (splitmux->sink_properties,
3499             _set_property_from_structure, splitmux->sink);
3500       splitmux->active_sink = splitmux->sink;
3501     } else {
3502       /* Ensure the sink starts in locked state and NULL - it will be changed
3503        * by the filename setting code */
3504       gst_element_set_locked_state (provided_sink, TRUE);
3505       gst_element_set_state (provided_sink, GST_STATE_NULL);
3506       if (!gst_bin_add (GST_BIN (splitmux), provided_sink)) {
3507         g_warning ("Could not add sink elements - splitmuxsink will not work");
3508         gst_object_unref (provided_sink);
3509         goto fail;
3510       }
3511
3512       splitmux->active_sink = provided_sink;
3513
3514       /* The bin holds a ref now, we can drop our tmp ref */
3515       gst_object_unref (provided_sink);
3516
3517       /* Find the sink element */
3518       splitmux->sink = find_sink (splitmux->active_sink);
3519       if (splitmux->sink == NULL) {
3520         g_warning
3521             ("Could not locate sink element in provided sink - splitmuxsink will not work");
3522         goto fail;
3523       }
3524     }
3525
3526 #if 1
3527     if (g_object_class_find_property (G_OBJECT_GET_CLASS (splitmux->sink),
3528             "async") != NULL) {
3529       /* async child elements are causing state change races and weird
3530        * failures, so let's try and turn that off */
3531       g_object_set (splitmux->sink, "async", FALSE, NULL);
3532     }
3533 #endif
3534
3535     if (!gst_element_link (splitmux->muxer, splitmux->active_sink)) {
3536       g_warning ("Failed to link muxer and sink- splitmuxsink will not work");
3537       goto fail;
3538     }
3539   }
3540
3541   return TRUE;
3542 fail:
3543   return FALSE;
3544 }
3545
3546 #ifdef __GNUC__
3547 #pragma GCC diagnostic ignored "-Wformat-nonliteral"
3548 #endif
3549 static void
3550 set_next_filename (GstSplitMuxSink * splitmux, MqStreamCtx * ctx)
3551 {
3552   gchar *fname = NULL;
3553   GstSample *sample;
3554   GstCaps *caps;
3555
3556   gst_splitmux_sink_ensure_max_files (splitmux);
3557
3558   if (ctx->cur_out_buffer == NULL) {
3559     GST_WARNING_OBJECT (splitmux, "Starting next file without buffer");
3560   }
3561
3562   caps = gst_pad_get_current_caps (ctx->srcpad);
3563   sample = gst_sample_new (ctx->cur_out_buffer, caps, &ctx->out_segment, NULL);
3564   g_signal_emit (splitmux, signals[SIGNAL_FORMAT_LOCATION_FULL], 0,
3565       splitmux->fragment_id, sample, &fname);
3566   gst_sample_unref (sample);
3567   if (caps)
3568     gst_caps_unref (caps);
3569
3570   if (fname == NULL) {
3571     /* Fallback to the old signal if the new one returned nothing */
3572     g_signal_emit (splitmux, signals[SIGNAL_FORMAT_LOCATION], 0,
3573         splitmux->fragment_id, &fname);
3574   }
3575
3576   if (!fname)
3577     fname = splitmux->location ?
3578         g_strdup_printf (splitmux->location, splitmux->fragment_id) : NULL;
3579
3580   if (fname) {
3581     GST_INFO_OBJECT (splitmux, "Setting file to %s", fname);
3582     if (g_object_class_find_property (G_OBJECT_GET_CLASS (splitmux->sink),
3583             "location") != NULL)
3584       g_object_set (splitmux->sink, "location", fname, NULL);
3585     g_free (fname);
3586   }
3587
3588   splitmux->fragment_id++;
3589 }
3590
3591 /* called with GST_SPLITMUX_LOCK */
3592 static void
3593 do_async_start (GstSplitMuxSink * splitmux)
3594 {
3595   GstMessage *message;
3596
3597   if (!splitmux->need_async_start) {
3598     GST_INFO_OBJECT (splitmux, "no async_start needed");
3599     return;
3600   }
3601
3602   splitmux->async_pending = TRUE;
3603
3604   GST_INFO_OBJECT (splitmux, "Sending async_start message");
3605   message = gst_message_new_async_start (GST_OBJECT_CAST (splitmux));
3606
3607   GST_SPLITMUX_UNLOCK (splitmux);
3608   GST_BIN_CLASS (parent_class)->handle_message (GST_BIN_CAST
3609       (splitmux), message);
3610   GST_SPLITMUX_LOCK (splitmux);
3611 }
3612
3613 /* called with GST_SPLITMUX_LOCK */
3614 static void
3615 do_async_done (GstSplitMuxSink * splitmux)
3616 {
3617   GstMessage *message;
3618
3619   if (splitmux->async_pending) {
3620     GST_INFO_OBJECT (splitmux, "Sending async_done message");
3621     splitmux->async_pending = FALSE;
3622     GST_SPLITMUX_UNLOCK (splitmux);
3623
3624     message =
3625         gst_message_new_async_done (GST_OBJECT_CAST (splitmux),
3626         GST_CLOCK_TIME_NONE);
3627     GST_BIN_CLASS (parent_class)->handle_message (GST_BIN_CAST
3628         (splitmux), message);
3629     GST_SPLITMUX_LOCK (splitmux);
3630   }
3631
3632   splitmux->need_async_start = FALSE;
3633 }
3634
3635 static void
3636 gst_splitmux_sink_reset (GstSplitMuxSink * splitmux)
3637 {
3638   splitmux->max_in_running_time = GST_CLOCK_STIME_NONE;
3639   splitmux->gop_start_time = splitmux->fragment_start_time =
3640       GST_CLOCK_STIME_NONE;
3641   splitmux->max_out_running_time = 0;
3642   splitmux->fragment_total_bytes = 0;
3643   splitmux->fragment_reference_bytes = 0;
3644   splitmux->gop_total_bytes = 0;
3645   splitmux->gop_reference_bytes = 0;
3646   splitmux->muxed_out_bytes = 0;
3647   splitmux->ready_for_output = FALSE;
3648
3649   g_atomic_int_set (&(splitmux->split_requested), FALSE);
3650   g_atomic_int_set (&(splitmux->do_split_next_gop), FALSE);
3651
3652   splitmux->next_fku_time = GST_CLOCK_TIME_NONE;
3653   gst_queue_array_clear (splitmux->times_to_split);
3654
3655   g_list_foreach (splitmux->contexts, (GFunc) mq_stream_ctx_reset, NULL);
3656   splitmux->queued_keyframes = 0;
3657
3658   g_queue_foreach (&splitmux->out_cmd_q, (GFunc) out_cmd_buf_free, NULL);
3659   g_queue_clear (&splitmux->out_cmd_q);
3660 }
3661
3662 static GstStateChangeReturn
3663 gst_splitmux_sink_change_state (GstElement * element, GstStateChange transition)
3664 {
3665   GstStateChangeReturn ret;
3666   GstSplitMuxSink *splitmux = (GstSplitMuxSink *) element;
3667
3668   switch (transition) {
3669     case GST_STATE_CHANGE_NULL_TO_READY:{
3670       GST_SPLITMUX_LOCK (splitmux);
3671       if (!create_muxer (splitmux) || !create_sink (splitmux)) {
3672         ret = GST_STATE_CHANGE_FAILURE;
3673         GST_SPLITMUX_UNLOCK (splitmux);
3674         goto beach;
3675       }
3676       g_signal_emit (splitmux, signals[SIGNAL_MUXER_ADDED], 0, splitmux->muxer);
3677       g_signal_emit (splitmux, signals[SIGNAL_SINK_ADDED], 0, splitmux->sink);
3678       GST_SPLITMUX_UNLOCK (splitmux);
3679       splitmux->fragment_id = splitmux->start_index;
3680       break;
3681     }
3682     case GST_STATE_CHANGE_READY_TO_PAUSED:{
3683       GST_SPLITMUX_LOCK (splitmux);
3684       /* Make sure contexts and tracking times are cleared, in case we're being reused */
3685       gst_splitmux_sink_reset (splitmux);
3686       /* Start by collecting one input on each pad */
3687       splitmux->input_state = SPLITMUX_INPUT_STATE_COLLECTING_GOP_START;
3688       splitmux->output_state = SPLITMUX_OUTPUT_STATE_START_NEXT_FILE;
3689
3690       GST_SPLITMUX_UNLOCK (splitmux);
3691
3692       GST_SPLITMUX_STATE_LOCK (splitmux);
3693       splitmux->shutdown = FALSE;
3694       GST_SPLITMUX_STATE_UNLOCK (splitmux);
3695       break;
3696     }
3697     case GST_STATE_CHANGE_PAUSED_TO_READY:
3698     case GST_STATE_CHANGE_READY_TO_READY:
3699       g_atomic_int_set (&(splitmux->split_requested), FALSE);
3700       g_atomic_int_set (&(splitmux->do_split_next_gop), FALSE);
3701       /* Fall through */
3702     case GST_STATE_CHANGE_READY_TO_NULL:
3703       GST_SPLITMUX_STATE_LOCK (splitmux);
3704       splitmux->shutdown = TRUE;
3705       GST_SPLITMUX_STATE_UNLOCK (splitmux);
3706
3707       GST_SPLITMUX_LOCK (splitmux);
3708       gst_splitmux_sink_reset (splitmux);
3709       splitmux->output_state = SPLITMUX_OUTPUT_STATE_STOPPED;
3710       splitmux->input_state = SPLITMUX_INPUT_STATE_STOPPED;
3711       /* Wake up any blocked threads */
3712       GST_LOG_OBJECT (splitmux,
3713           "State change -> NULL or READY. Waking threads");
3714       GST_SPLITMUX_BROADCAST_INPUT (splitmux);
3715       GST_SPLITMUX_BROADCAST_OUTPUT (splitmux);
3716       GST_SPLITMUX_UNLOCK (splitmux);
3717       break;
3718     default:
3719       break;
3720   }
3721
3722   ret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
3723   if (ret == GST_STATE_CHANGE_FAILURE)
3724     goto beach;
3725
3726   switch (transition) {
3727     case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
3728       splitmux->need_async_start = TRUE;
3729       break;
3730     case GST_STATE_CHANGE_READY_TO_PAUSED:{
3731       /* Change state async, because our child sink might not
3732        * be ready to do that for us yet if it's state is still locked */
3733
3734       splitmux->need_async_start = TRUE;
3735       /* we want to go async to PAUSED until we managed to configure and add the
3736        * sink */
3737       GST_SPLITMUX_LOCK (splitmux);
3738       do_async_start (splitmux);
3739       GST_SPLITMUX_UNLOCK (splitmux);
3740       ret = GST_STATE_CHANGE_ASYNC;
3741       break;
3742     }
3743     case GST_STATE_CHANGE_READY_TO_NULL:
3744       GST_SPLITMUX_LOCK (splitmux);
3745       splitmux->fragment_id = 0;
3746       /* Reset internal elements only if no pad contexts are using them */
3747       if (splitmux->contexts == NULL)
3748         gst_splitmux_reset_elements (splitmux);
3749       do_async_done (splitmux);
3750       GST_SPLITMUX_UNLOCK (splitmux);
3751       break;
3752     default:
3753       break;
3754   }
3755
3756   return ret;
3757
3758 beach:
3759   if (transition == GST_STATE_CHANGE_NULL_TO_READY) {
3760     /* Cleanup elements on failed transition out of NULL */
3761     gst_splitmux_reset_elements (splitmux);
3762     GST_SPLITMUX_LOCK (splitmux);
3763     do_async_done (splitmux);
3764     GST_SPLITMUX_UNLOCK (splitmux);
3765   }
3766   if (transition == GST_STATE_CHANGE_READY_TO_READY) {
3767     /* READY to READY transition only happens when we're already
3768      * in READY state, but a child element is in NULL, which
3769      * happens when there's an error changing the state of the sink.
3770      * We need to make sure not to fail the state transition, or
3771      * the core won't transition us back to NULL successfully */
3772     ret = GST_STATE_CHANGE_SUCCESS;
3773   }
3774   return ret;
3775 }
3776
3777 static void
3778 gst_splitmux_sink_ensure_max_files (GstSplitMuxSink * splitmux)
3779 {
3780   if (splitmux->max_files && splitmux->fragment_id >= splitmux->max_files) {
3781     splitmux->fragment_id = 0;
3782   }
3783 }
3784
3785 static void
3786 split_now (GstSplitMuxSink * splitmux)
3787 {
3788   g_atomic_int_set (&(splitmux->do_split_next_gop), TRUE);
3789 }
3790
3791 static void
3792 split_after (GstSplitMuxSink * splitmux)
3793 {
3794   g_atomic_int_set (&(splitmux->split_requested), TRUE);
3795 }
3796
3797 static void
3798 split_at_running_time (GstSplitMuxSink * splitmux, GstClockTime split_time)
3799 {
3800   gboolean send_keyframe_requests;
3801
3802   GST_SPLITMUX_LOCK (splitmux);
3803   gst_queue_array_push_tail_struct (splitmux->times_to_split, &split_time);
3804   send_keyframe_requests = splitmux->send_keyframe_requests;
3805   GST_SPLITMUX_UNLOCK (splitmux);
3806
3807   if (send_keyframe_requests) {
3808     GstEvent *ev =
3809         gst_video_event_new_upstream_force_key_unit (split_time, TRUE, 0);
3810     GST_INFO_OBJECT (splitmux, "Requesting next keyframe at %" GST_TIME_FORMAT,
3811         GST_TIME_ARGS (split_time));
3812     if (!gst_pad_push_event (splitmux->reference_ctx->sinkpad, ev)) {
3813       GST_WARNING_OBJECT (splitmux,
3814           "Could not request keyframe at %" GST_TIME_FORMAT,
3815           GST_TIME_ARGS (split_time));
3816     }
3817   }
3818 }