Merging gst-omx
[platform/upstream/gstreamer.git] / subprojects / gst-plugins-good / gst / multifile / gstsplitmuxsink.c
1 /* GStreamer Muxer bin that splits output stream by size/time
2  * Copyright (C) <2014-2019> Jan Schmidt <jan@centricular.com>
3  *
4  * This library is free software; you can redistribute it and/or
5  * modify it under the terms of the GNU Library General Public
6  * License as published by the Free Software Foundation; either
7  * version 2 of the License, or (at your option) any later version.
8  *
9  * This library is distributed in the hope that it will be useful,
10  * but WITHOUT ANY WARRANTY; without even the implied warranty of
11  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12  * Library General Public License for more details.
13  *
14  * You should have received a copy of the GNU Library General Public
15  * License along with this library; if not, write to the
16  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
17  * Boston, MA 02110-1301, USA.
18  */
19
20 /**
21  * SECTION:element-splitmuxsink
22  * @title: splitmuxsink
23  * @short_description: Muxer wrapper for splitting output stream by size or time
24  *
25  * This element wraps a muxer and a sink, and starts a new file when the mux
26  * contents are about to cross a threshold of maximum size of maximum time,
27  * splitting at video keyframe boundaries. Exactly one input video stream
28  * can be muxed, with as many accompanying audio and subtitle streams as
29  * desired.
30  *
31  * By default, it uses mp4mux and filesink, but they can be changed via
32  * the 'muxer' and 'sink' properties.
33  *
34  * The minimum file size is 1 GOP, however - so limits may be overrun if the
35  * distance between any 2 keyframes is larger than the limits.
36  *
37  * If a video stream is available, the splitting process is driven by the video
38  * stream contents, and the video stream must contain closed GOPs for the output
39  * file parts to be played individually correctly. In the absence of a video
40  * stream, the first available stream is used as reference for synchronization.
41  *
42  * In the async-finalize mode, when the threshold is crossed, the old muxer
43  * and sink is disconnected from the pipeline and left to finish the file
44  * asynchronously, and a new muxer and sink is created to continue with the
45  * next fragment. For that reason, instead of muxer and sink objects, the
46  * muxer-factory and sink-factory properties are used to construct the new
47  * objects, together with muxer-properties and sink-properties.
48  *
49  * ## Example pipelines
50  * |[
51  * gst-launch-1.0 -e v4l2src num-buffers=500 ! video/x-raw,width=320,height=240 ! videoconvert ! queue ! timeoverlay ! x264enc key-int-max=10 ! h264parse ! splitmuxsink location=video%02d.mov max-size-time=10000000000 max-size-bytes=1000000
52  * ]|
53  * Records a video stream captured from a v4l2 device and muxes it into
54  * ISO mp4 files, splitting as needed to limit size/duration to 10 seconds
55  * and 1MB maximum size.
56  *
57  * |[
58  * gst-launch-1.0 -e v4l2src num-buffers=500 ! video/x-raw,width=320,height=240 ! videoconvert ! queue ! timeoverlay ! x264enc key-int-max=10 ! h264parse ! splitmuxsink location=video%02d.mkv max-size-time=10000000000 muxer-factory=matroskamux muxer-properties="properties,streamable=true"
59  * ]|
60  * Records a video stream captured from a v4l2 device and muxer it into
61  * streamable Matroska files, splitting as needed to limit size/duration to 10
62  * seconds. Each file will finalize asynchronously.
63  *
64  * |[
65  * gst-launch-1.0 videotestsrc num-buffers=10 ! jpegenc ! .video splitmuxsink muxer=qtmux muxer-pad-map=x-pad-map,video=video_1 location=test%05d.mp4 -v
66  * ]|
67  * Records 10 frames to an mp4 file, using a muxer-pad-map to make explicit mappings between the splitmuxsink sink pad and the corresponding muxer pad
68  * it will deliver to.
69  */
70
71 #ifdef HAVE_CONFIG_H
72 #include "config.h"
73 #endif
74
75 #include <string.h>
76 #include <glib/gstdio.h>
77 #include <gst/video/video.h>
78 #include "gstsplitmuxsink.h"
79
80 GST_DEBUG_CATEGORY_STATIC (splitmux_debug);
81 #define GST_CAT_DEFAULT splitmux_debug
82
83 #define GST_SPLITMUX_STATE_LOCK(s) g_mutex_lock(&(s)->state_lock)
84 #define GST_SPLITMUX_STATE_UNLOCK(s) g_mutex_unlock(&(s)->state_lock)
85
86 #define GST_SPLITMUX_LOCK(s) g_mutex_lock(&(s)->lock)
87 #define GST_SPLITMUX_UNLOCK(s) g_mutex_unlock(&(s)->lock)
88 #define GST_SPLITMUX_WAIT_INPUT(s) g_cond_wait (&(s)->input_cond, &(s)->lock)
89 #define GST_SPLITMUX_BROADCAST_INPUT(s) g_cond_broadcast (&(s)->input_cond)
90
91 #define GST_SPLITMUX_WAIT_OUTPUT(s) g_cond_wait (&(s)->output_cond, &(s)->lock)
92 #define GST_SPLITMUX_BROADCAST_OUTPUT(s) g_cond_broadcast (&(s)->output_cond)
93
94 static void split_now (GstSplitMuxSink * splitmux);
95 static void split_after (GstSplitMuxSink * splitmux);
96 static void split_at_running_time (GstSplitMuxSink * splitmux,
97     GstClockTime split_time);
98
99 enum
100 {
101   PROP_0,
102   PROP_LOCATION,
103   PROP_START_INDEX,
104   PROP_MAX_SIZE_TIME,
105   PROP_MAX_SIZE_BYTES,
106   PROP_MAX_SIZE_TIMECODE,
107   PROP_SEND_KEYFRAME_REQUESTS,
108   PROP_MAX_FILES,
109   PROP_MUXER_OVERHEAD,
110   PROP_USE_ROBUST_MUXING,
111   PROP_ALIGNMENT_THRESHOLD,
112   PROP_MUXER,
113   PROP_SINK,
114   PROP_RESET_MUXER,
115   PROP_ASYNC_FINALIZE,
116   PROP_MUXER_FACTORY,
117   PROP_MUXER_PRESET,
118   PROP_MUXER_PROPERTIES,
119   PROP_SINK_FACTORY,
120   PROP_SINK_PRESET,
121   PROP_SINK_PROPERTIES,
122   PROP_MUXERPAD_MAP
123 };
124
125 #define DEFAULT_MAX_SIZE_TIME       0
126 #define DEFAULT_MAX_SIZE_BYTES      0
127 #define DEFAULT_MAX_FILES           0
128 #define DEFAULT_MUXER_OVERHEAD      0.02
129 #define DEFAULT_SEND_KEYFRAME_REQUESTS FALSE
130 #define DEFAULT_ALIGNMENT_THRESHOLD 0
131 #define DEFAULT_MUXER "mp4mux"
132 #define DEFAULT_SINK "filesink"
133 #define DEFAULT_USE_ROBUST_MUXING FALSE
134 #define DEFAULT_RESET_MUXER TRUE
135 #define DEFAULT_ASYNC_FINALIZE FALSE
136 #define DEFAULT_START_INDEX 0
137
138 typedef struct _AsyncEosHelper
139 {
140   MqStreamCtx *ctx;
141   GstPad *pad;
142 } AsyncEosHelper;
143
144 enum
145 {
146   SIGNAL_FORMAT_LOCATION,
147   SIGNAL_FORMAT_LOCATION_FULL,
148   SIGNAL_SPLIT_NOW,
149   SIGNAL_SPLIT_AFTER,
150   SIGNAL_SPLIT_AT_RUNNING_TIME,
151   SIGNAL_MUXER_ADDED,
152   SIGNAL_SINK_ADDED,
153   SIGNAL_LAST
154 };
155
156 static guint signals[SIGNAL_LAST];
157
158 static GstStaticPadTemplate video_sink_template =
159 GST_STATIC_PAD_TEMPLATE ("video",
160     GST_PAD_SINK,
161     GST_PAD_REQUEST,
162     GST_STATIC_CAPS_ANY);
163 static GstStaticPadTemplate video_aux_sink_template =
164 GST_STATIC_PAD_TEMPLATE ("video_aux_%u",
165     GST_PAD_SINK,
166     GST_PAD_REQUEST,
167     GST_STATIC_CAPS_ANY);
168 static GstStaticPadTemplate audio_sink_template =
169 GST_STATIC_PAD_TEMPLATE ("audio_%u",
170     GST_PAD_SINK,
171     GST_PAD_REQUEST,
172     GST_STATIC_CAPS_ANY);
173 static GstStaticPadTemplate subtitle_sink_template =
174 GST_STATIC_PAD_TEMPLATE ("subtitle_%u",
175     GST_PAD_SINK,
176     GST_PAD_REQUEST,
177     GST_STATIC_CAPS_ANY);
178 static GstStaticPadTemplate caption_sink_template =
179 GST_STATIC_PAD_TEMPLATE ("caption_%u",
180     GST_PAD_SINK,
181     GST_PAD_REQUEST,
182     GST_STATIC_CAPS_ANY);
183
184 static GQuark PAD_CONTEXT;
185 static GQuark EOS_FROM_US;
186 static GQuark RUNNING_TIME;
187 /* EOS_FROM_US is only valid in async-finalize mode. We need to know whether
188  * to forward an incoming EOS message, but we cannot rely on the state of the
189  * splitmux anymore, so we set this qdata on the sink instead.
190  * The muxer and sink must be destroyed after both of these things have
191  * finished:
192  * 1) The EOS message has been sent when the fragment is ending
193  * 2) The muxer has been unlinked and relinked
194  * Therefore, EOS_FROM_US can have these two values:
195  * 0: EOS was not requested from us. Forward the message. The muxer and the
196  * sink will be destroyed together with the rest of the bin.
197  * 1: EOS was requested from us, but the other of the two tasks hasn't
198  * finished. Set EOS_FROM_US to 2 and do your stuff.
199  * 2: EOS was requested from us and the other of the two tasks has finished.
200  * Now we can destroy the muxer and the sink.
201  */
202
203 static void
204 _do_init (void)
205 {
206   PAD_CONTEXT = g_quark_from_static_string ("pad-context");
207   EOS_FROM_US = g_quark_from_static_string ("eos-from-us");
208   RUNNING_TIME = g_quark_from_static_string ("running-time");
209   GST_DEBUG_CATEGORY_INIT (splitmux_debug, "splitmuxsink", 0,
210       "Split File Muxing Sink");
211 }
212
213 #define gst_splitmux_sink_parent_class parent_class
214 G_DEFINE_TYPE_EXTENDED (GstSplitMuxSink, gst_splitmux_sink, GST_TYPE_BIN, 0,
215     _do_init ());
216 GST_ELEMENT_REGISTER_DEFINE (splitmuxsink, "splitmuxsink", GST_RANK_NONE,
217     GST_TYPE_SPLITMUX_SINK);
218
219 static gboolean create_muxer (GstSplitMuxSink * splitmux);
220 static gboolean create_sink (GstSplitMuxSink * splitmux);
221 static void gst_splitmux_sink_set_property (GObject * object, guint prop_id,
222     const GValue * value, GParamSpec * pspec);
223 static void gst_splitmux_sink_get_property (GObject * object, guint prop_id,
224     GValue * value, GParamSpec * pspec);
225 static void gst_splitmux_sink_dispose (GObject * object);
226 static void gst_splitmux_sink_finalize (GObject * object);
227
228 static GstPad *gst_splitmux_sink_request_new_pad (GstElement * element,
229     GstPadTemplate * templ, const gchar * name, const GstCaps * caps);
230 static void gst_splitmux_sink_release_pad (GstElement * element, GstPad * pad);
231
232 static GstStateChangeReturn gst_splitmux_sink_change_state (GstElement *
233     element, GstStateChange transition);
234
235 static void bus_handler (GstBin * bin, GstMessage * msg);
236 static void set_next_filename (GstSplitMuxSink * splitmux, MqStreamCtx * ctx);
237 static GstFlowReturn start_next_fragment (GstSplitMuxSink * splitmux,
238     MqStreamCtx * ctx);
239 static void mq_stream_ctx_free (MqStreamCtx * ctx);
240 static void grow_blocked_queues (GstSplitMuxSink * splitmux);
241
242 static void gst_splitmux_sink_ensure_max_files (GstSplitMuxSink * splitmux);
243 static GstElement *create_element (GstSplitMuxSink * splitmux,
244     const gchar * factory, const gchar * name, gboolean locked);
245
246 static void do_async_done (GstSplitMuxSink * splitmux);
247 static void gst_splitmux_reset_timecode (GstSplitMuxSink * splitmux);
248
249 static MqStreamBuf *
250 mq_stream_buf_new (void)
251 {
252   return g_slice_new0 (MqStreamBuf);
253 }
254
255 static void
256 mq_stream_buf_free (MqStreamBuf * data)
257 {
258   g_slice_free (MqStreamBuf, data);
259 }
260
261 static SplitMuxOutputCommand *
262 out_cmd_buf_new (void)
263 {
264   return g_slice_new0 (SplitMuxOutputCommand);
265 }
266
267 static void
268 out_cmd_buf_free (SplitMuxOutputCommand * data)
269 {
270   g_slice_free (SplitMuxOutputCommand, data);
271 }
272
273 static void
274 gst_splitmux_sink_class_init (GstSplitMuxSinkClass * klass)
275 {
276   GObjectClass *gobject_class = (GObjectClass *) klass;
277   GstElementClass *gstelement_class = (GstElementClass *) klass;
278   GstBinClass *gstbin_class = (GstBinClass *) klass;
279
280   gobject_class->set_property = gst_splitmux_sink_set_property;
281   gobject_class->get_property = gst_splitmux_sink_get_property;
282   gobject_class->dispose = gst_splitmux_sink_dispose;
283   gobject_class->finalize = gst_splitmux_sink_finalize;
284
285   gst_element_class_set_static_metadata (gstelement_class,
286       "Split Muxing Bin", "Generic/Bin/Muxer",
287       "Convenience bin that muxes incoming streams into multiple time/size limited files",
288       "Jan Schmidt <jan@centricular.com>");
289
290   gst_element_class_add_static_pad_template (gstelement_class,
291       &video_sink_template);
292   gst_element_class_add_static_pad_template (gstelement_class,
293       &video_aux_sink_template);
294   gst_element_class_add_static_pad_template (gstelement_class,
295       &audio_sink_template);
296   gst_element_class_add_static_pad_template (gstelement_class,
297       &subtitle_sink_template);
298   gst_element_class_add_static_pad_template (gstelement_class,
299       &caption_sink_template);
300
301   gstelement_class->change_state =
302       GST_DEBUG_FUNCPTR (gst_splitmux_sink_change_state);
303   gstelement_class->request_new_pad =
304       GST_DEBUG_FUNCPTR (gst_splitmux_sink_request_new_pad);
305   gstelement_class->release_pad =
306       GST_DEBUG_FUNCPTR (gst_splitmux_sink_release_pad);
307
308   gstbin_class->handle_message = bus_handler;
309
310   g_object_class_install_property (gobject_class, PROP_LOCATION,
311       g_param_spec_string ("location", "File Output Pattern",
312           "Format string pattern for the location of the files to write (e.g. video%05d.mp4)",
313           NULL, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
314   g_object_class_install_property (gobject_class, PROP_MUXER_OVERHEAD,
315       g_param_spec_double ("mux-overhead", "Muxing Overhead",
316           "Extra size overhead of muxing (0.02 = 2%)", 0.0, 1.0,
317           DEFAULT_MUXER_OVERHEAD,
318           G_PARAM_READWRITE | GST_PARAM_CONTROLLABLE | G_PARAM_STATIC_STRINGS));
319
320   g_object_class_install_property (gobject_class, PROP_MAX_SIZE_TIME,
321       g_param_spec_uint64 ("max-size-time", "Max. size (ns)",
322           "Max. amount of time per file (in ns, 0=disable)", 0, G_MAXUINT64,
323           DEFAULT_MAX_SIZE_TIME,
324           G_PARAM_READWRITE | GST_PARAM_MUTABLE_READY |
325           G_PARAM_STATIC_STRINGS));
326   g_object_class_install_property (gobject_class, PROP_MAX_SIZE_BYTES,
327       g_param_spec_uint64 ("max-size-bytes", "Max. size bytes",
328           "Max. amount of data per file (in bytes, 0=disable)", 0, G_MAXUINT64,
329           DEFAULT_MAX_SIZE_BYTES,
330           G_PARAM_READWRITE | GST_PARAM_MUTABLE_READY |
331           G_PARAM_STATIC_STRINGS));
332   g_object_class_install_property (gobject_class, PROP_MAX_SIZE_TIMECODE,
333       g_param_spec_string ("max-size-timecode", "Maximum timecode difference",
334           "Maximum difference in timecode between first and last frame. "
335           "Separator is assumed to be \":\" everywhere (e.g. 01:00:00:00). "
336           "Will only be effective if a timecode track is present.", NULL,
337           G_PARAM_READWRITE | GST_PARAM_MUTABLE_READY |
338           G_PARAM_STATIC_STRINGS));
339   g_object_class_install_property (gobject_class, PROP_SEND_KEYFRAME_REQUESTS,
340       g_param_spec_boolean ("send-keyframe-requests",
341           "Request keyframes at max-size-time",
342           "Request a keyframe every max-size-time ns to try splitting at that point. "
343           "Needs max-size-bytes to be 0 in order to be effective.",
344           DEFAULT_SEND_KEYFRAME_REQUESTS,
345           G_PARAM_READWRITE | GST_PARAM_MUTABLE_READY |
346           G_PARAM_STATIC_STRINGS));
347   g_object_class_install_property (gobject_class, PROP_MAX_FILES,
348       g_param_spec_uint ("max-files", "Max files",
349           "Maximum number of files to keep on disk. Once the maximum is reached,"
350           "old files start to be deleted to make room for new ones.", 0,
351           G_MAXUINT, DEFAULT_MAX_FILES,
352           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
353   g_object_class_install_property (gobject_class, PROP_ALIGNMENT_THRESHOLD,
354       g_param_spec_uint64 ("alignment-threshold", "Alignment threshold (ns)",
355           "Allow non-reference streams to be that many ns before the reference"
356           " stream", 0, G_MAXUINT64, DEFAULT_ALIGNMENT_THRESHOLD,
357           G_PARAM_READWRITE | GST_PARAM_MUTABLE_READY |
358           G_PARAM_STATIC_STRINGS));
359
360   g_object_class_install_property (gobject_class, PROP_MUXER,
361       g_param_spec_object ("muxer", "Muxer",
362           "The muxer element to use (NULL = default mp4mux). "
363           "Valid only for async-finalize = FALSE",
364           GST_TYPE_ELEMENT, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
365   g_object_class_install_property (gobject_class, PROP_SINK,
366       g_param_spec_object ("sink", "Sink",
367           "The sink element (or element chain) to use (NULL = default filesink). "
368           "Valid only for async-finalize = FALSE",
369           GST_TYPE_ELEMENT, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
370
371   g_object_class_install_property (gobject_class, PROP_USE_ROBUST_MUXING,
372       g_param_spec_boolean ("use-robust-muxing",
373           "Support robust-muxing mode of some muxers",
374           "Check if muxers support robust muxing via the reserved-max-duration and "
375           "reserved-duration-remaining properties and use them if so. "
376           "(Only present on qtmux and mp4mux for now). splitmuxsink may then also "
377           " create new fragments if the reserved header space is about to overflow. "
378           "Note that for mp4mux and qtmux, reserved-moov-update-period must be set "
379           "manually by the app to a non-zero value for robust muxing to have an effect.",
380           DEFAULT_USE_ROBUST_MUXING,
381           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
382
383   g_object_class_install_property (gobject_class, PROP_RESET_MUXER,
384       g_param_spec_boolean ("reset-muxer",
385           "Reset Muxer",
386           "Reset the muxer after each segment. Disabling this will not work for most muxers.",
387           DEFAULT_RESET_MUXER, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
388
389   g_object_class_install_property (gobject_class, PROP_ASYNC_FINALIZE,
390       g_param_spec_boolean ("async-finalize",
391           "Finalize fragments asynchronously",
392           "Finalize each fragment asynchronously and start a new one",
393           DEFAULT_ASYNC_FINALIZE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
394   g_object_class_install_property (gobject_class, PROP_MUXER_FACTORY,
395       g_param_spec_string ("muxer-factory", "Muxer factory",
396           "The muxer element factory to use (default = mp4mux). "
397           "Valid only for async-finalize = TRUE",
398           "mp4mux", G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
399   /**
400    * GstSplitMuxSink:muxer-preset
401    *
402    * An optional #GstPreset name to use for the muxer. This only has an effect
403    * in `async-finalize=TRUE` mode.
404    *
405    * Since: 1.18
406    */
407   g_object_class_install_property (gobject_class, PROP_MUXER_PRESET,
408       g_param_spec_string ("muxer-preset", "Muxer preset",
409           "The muxer preset to use. "
410           "Valid only for async-finalize = TRUE",
411           NULL, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
412   g_object_class_install_property (gobject_class, PROP_MUXER_PROPERTIES,
413       g_param_spec_boxed ("muxer-properties", "Muxer properties",
414           "The muxer element properties to use. "
415           "Example: {properties,boolean-prop=true,string-prop=\"hi\"}. "
416           "Valid only for async-finalize = TRUE",
417           GST_TYPE_STRUCTURE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
418   g_object_class_install_property (gobject_class, PROP_SINK_FACTORY,
419       g_param_spec_string ("sink-factory", "Sink factory",
420           "The sink element factory to use (default = filesink). "
421           "Valid only for async-finalize = TRUE",
422           "filesink", G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
423   /**
424    * GstSplitMuxSink:sink-preset
425    *
426    * An optional #GstPreset name to use for the sink. This only has an effect
427    * in `async-finalize=TRUE` mode.
428    *
429    * Since: 1.18
430    */
431   g_object_class_install_property (gobject_class, PROP_SINK_PRESET,
432       g_param_spec_string ("sink-preset", "Sink preset",
433           "The sink preset to use. "
434           "Valid only for async-finalize = TRUE",
435           NULL, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
436   g_object_class_install_property (gobject_class, PROP_SINK_PROPERTIES,
437       g_param_spec_boxed ("sink-properties", "Sink properties",
438           "The sink element properties to use. "
439           "Example: {properties,boolean-prop=true,string-prop=\"hi\"}. "
440           "Valid only for async-finalize = TRUE",
441           GST_TYPE_STRUCTURE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
442   g_object_class_install_property (gobject_class, PROP_START_INDEX,
443       g_param_spec_int ("start-index", "Start Index",
444           "Start value of fragment index.",
445           0, G_MAXINT, DEFAULT_START_INDEX,
446           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
447
448   /**
449    * GstSplitMuxSink::muxer-pad-map
450    *
451    * An optional GstStructure that provides a map from splitmuxsink sinkpad
452    * names to muxer pad names they should feed. Splitmuxsink has some default
453    * mapping behaviour to link video to video pads and audio to audio pads
454    * that usually works fine. This property is useful if you need to ensure
455    * a particular mapping to muxed streams.
456    *
457    * The GstStructure contains string fields like so:
458    *   splitmuxsink muxer-pad-map=x-pad-map,video=video_1
459    *
460    * Since: 1.18
461    */
462   g_object_class_install_property (G_OBJECT_CLASS (klass), PROP_MUXERPAD_MAP,
463       g_param_spec_boxed ("muxer-pad-map", "Muxer pad map",
464           "A GstStructure specifies the mapping from splitmuxsink sink pads to muxer pads",
465           GST_TYPE_STRUCTURE,
466           (GParamFlags) (G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)));
467
468   /**
469    * GstSplitMuxSink::format-location:
470    * @splitmux: the #GstSplitMuxSink
471    * @fragment_id: the sequence number of the file to be created
472    *
473    * Returns: the location to be used for the next output file. This must be
474    *    a newly-allocated string which will be freed with g_free() by the
475    *    splitmuxsink element when it no longer needs it, so use g_strdup() or
476    *    g_strdup_printf() or similar functions to allocate it.
477    */
478   signals[SIGNAL_FORMAT_LOCATION] =
479       g_signal_new ("format-location", G_TYPE_FROM_CLASS (klass),
480       G_SIGNAL_RUN_LAST, 0, NULL, NULL, NULL, G_TYPE_STRING, 1, G_TYPE_UINT);
481
482   /**
483    * GstSplitMuxSink::format-location-full:
484    * @splitmux: the #GstSplitMuxSink
485    * @fragment_id: the sequence number of the file to be created
486    * @first_sample: A #GstSample containing the first buffer
487    *   from the reference stream in the new file
488    *
489    * Returns: the location to be used for the next output file. This must be
490    *    a newly-allocated string which will be freed with g_free() by the
491    *    splitmuxsink element when it no longer needs it, so use g_strdup() or
492    *    g_strdup_printf() or similar functions to allocate it.
493    *
494    * Since: 1.12
495    */
496   signals[SIGNAL_FORMAT_LOCATION_FULL] =
497       g_signal_new ("format-location-full", G_TYPE_FROM_CLASS (klass),
498       G_SIGNAL_RUN_LAST, 0, NULL, NULL, NULL, G_TYPE_STRING, 2, G_TYPE_UINT,
499       GST_TYPE_SAMPLE);
500
501   /**
502    * GstSplitMuxSink::split-now:
503    * @splitmux: the #GstSplitMuxSink
504    *
505    * When called by the user, this action signal splits the video file (and begins a new one) immediately.
506    * The current GOP will be output to the new file.
507    *
508    * Since: 1.14
509    */
510   signals[SIGNAL_SPLIT_NOW] =
511       g_signal_new ("split-now", G_TYPE_FROM_CLASS (klass),
512       G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION,
513       G_STRUCT_OFFSET (GstSplitMuxSinkClass, split_now), NULL, NULL, NULL,
514       G_TYPE_NONE, 0);
515
516   /**
517    * GstSplitMuxSink::split-after:
518    * @splitmux: the #GstSplitMuxSink
519    *
520    * When called by the user, this action signal splits the video file (and begins a new one) immediately.
521    * Unlike the 'split-now' signal, with 'split-after', the current GOP will be output to the old file.
522    *
523    * Since: 1.16
524    */
525   signals[SIGNAL_SPLIT_AFTER] =
526       g_signal_new ("split-after", G_TYPE_FROM_CLASS (klass),
527       G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION,
528       G_STRUCT_OFFSET (GstSplitMuxSinkClass, split_after), NULL, NULL, NULL,
529       G_TYPE_NONE, 0);
530
531   /**
532    * GstSplitMuxSink::split-at-running-time:
533    * @splitmux: the #GstSplitMuxSink
534    *
535    * When called by the user, this action signal splits the video file (and
536    * begins a new one) as soon as the given running time is reached. If this
537    * action signal is called multiple times, running times are queued up and
538    * processed in the order they were given.
539    *
540    * Note that this is prone to race conditions, where said running time is
541    * reached and surpassed before we had a chance to split. The file will
542    * still split immediately, but in order to make sure that the split doesn't
543    * happen too late, it is recommended to call this action signal from
544    * something that will prevent further buffers from flowing into
545    * splitmuxsink before the split is completed, such as a pad probe before
546    * splitmuxsink.
547    *
548    *
549    * Since: 1.16
550    */
551   signals[SIGNAL_SPLIT_AT_RUNNING_TIME] =
552       g_signal_new ("split-at-running-time", G_TYPE_FROM_CLASS (klass),
553       G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION,
554       G_STRUCT_OFFSET (GstSplitMuxSinkClass, split_at_running_time), NULL, NULL,
555       NULL, G_TYPE_NONE, 1, G_TYPE_UINT64);
556
557   /**
558    * GstSplitMuxSink::muxer-added:
559    * @splitmux: the #GstSplitMuxSink
560    * @muxer: the newly added muxer element
561    *
562    * Since: 1.14
563    */
564   signals[SIGNAL_MUXER_ADDED] =
565       g_signal_new ("muxer-added", G_TYPE_FROM_CLASS (klass),
566       G_SIGNAL_RUN_LAST, 0, NULL, NULL, NULL, G_TYPE_NONE, 1, GST_TYPE_ELEMENT);
567
568   /**
569    * GstSplitMuxSink::sink-added:
570    * @splitmux: the #GstSplitMuxSink
571    * @sink: the newly added sink element
572    *
573    * Since: 1.14
574    */
575   signals[SIGNAL_SINK_ADDED] =
576       g_signal_new ("sink-added", G_TYPE_FROM_CLASS (klass),
577       G_SIGNAL_RUN_LAST, 0, NULL, NULL, NULL, G_TYPE_NONE, 1, GST_TYPE_ELEMENT);
578
579   klass->split_now = split_now;
580   klass->split_after = split_after;
581   klass->split_at_running_time = split_at_running_time;
582 }
583
584 static void
585 gst_splitmux_sink_init (GstSplitMuxSink * splitmux)
586 {
587   g_mutex_init (&splitmux->lock);
588   g_mutex_init (&splitmux->state_lock);
589   g_cond_init (&splitmux->input_cond);
590   g_cond_init (&splitmux->output_cond);
591   g_queue_init (&splitmux->out_cmd_q);
592
593   splitmux->mux_overhead = DEFAULT_MUXER_OVERHEAD;
594   splitmux->threshold_time = DEFAULT_MAX_SIZE_TIME;
595   splitmux->threshold_bytes = DEFAULT_MAX_SIZE_BYTES;
596   splitmux->max_files = DEFAULT_MAX_FILES;
597   splitmux->send_keyframe_requests = DEFAULT_SEND_KEYFRAME_REQUESTS;
598   splitmux->alignment_threshold = DEFAULT_ALIGNMENT_THRESHOLD;
599   splitmux->use_robust_muxing = DEFAULT_USE_ROBUST_MUXING;
600   splitmux->reset_muxer = DEFAULT_RESET_MUXER;
601
602   splitmux->threshold_timecode_str = NULL;
603   gst_splitmux_reset_timecode (splitmux);
604
605   splitmux->async_finalize = DEFAULT_ASYNC_FINALIZE;
606   splitmux->muxer_factory = g_strdup (DEFAULT_MUXER);
607   splitmux->muxer_properties = NULL;
608   splitmux->sink_factory = g_strdup (DEFAULT_SINK);
609   splitmux->sink_properties = NULL;
610
611   GST_OBJECT_FLAG_SET (splitmux, GST_ELEMENT_FLAG_SINK);
612   splitmux->split_requested = FALSE;
613   splitmux->do_split_next_gop = FALSE;
614   splitmux->times_to_split = gst_queue_array_new_for_struct (8, 8);
615   splitmux->next_fku_time = GST_CLOCK_TIME_NONE;
616 }
617
618 static void
619 gst_splitmux_reset_elements (GstSplitMuxSink * splitmux)
620 {
621   if (splitmux->muxer) {
622     gst_element_set_locked_state (splitmux->muxer, TRUE);
623     gst_element_set_state (splitmux->muxer, GST_STATE_NULL);
624     gst_bin_remove (GST_BIN (splitmux), splitmux->muxer);
625   }
626   if (splitmux->active_sink) {
627     gst_element_set_locked_state (splitmux->active_sink, TRUE);
628     gst_element_set_state (splitmux->active_sink, GST_STATE_NULL);
629     gst_bin_remove (GST_BIN (splitmux), splitmux->active_sink);
630   }
631
632   splitmux->sink = splitmux->active_sink = splitmux->muxer = NULL;
633 }
634
635 static void
636 gst_splitmux_reset_timecode (GstSplitMuxSink * splitmux)
637 {
638   g_clear_pointer (&splitmux->in_tc, gst_video_time_code_free);
639   g_clear_pointer (&splitmux->fragment_start_tc, gst_video_time_code_free);
640   g_clear_pointer (&splitmux->gop_start_tc, gst_video_time_code_free);
641   splitmux->next_fragment_start_tc_time = GST_CLOCK_TIME_NONE;
642 }
643
644 static void
645 gst_splitmux_sink_dispose (GObject * object)
646 {
647   GstSplitMuxSink *splitmux = GST_SPLITMUX_SINK (object);
648
649   /* Calling parent dispose invalidates all child pointers */
650   splitmux->sink = splitmux->active_sink = splitmux->muxer = NULL;
651
652   G_OBJECT_CLASS (parent_class)->dispose (object);
653 }
654
655 static void
656 gst_splitmux_sink_finalize (GObject * object)
657 {
658   GstSplitMuxSink *splitmux = GST_SPLITMUX_SINK (object);
659   g_cond_clear (&splitmux->input_cond);
660   g_cond_clear (&splitmux->output_cond);
661   g_mutex_clear (&splitmux->lock);
662   g_mutex_clear (&splitmux->state_lock);
663   g_queue_foreach (&splitmux->out_cmd_q, (GFunc) out_cmd_buf_free, NULL);
664   g_queue_clear (&splitmux->out_cmd_q);
665
666   if (splitmux->muxerpad_map)
667     gst_structure_free (splitmux->muxerpad_map);
668
669   if (splitmux->provided_sink)
670     gst_object_unref (splitmux->provided_sink);
671   if (splitmux->provided_muxer)
672     gst_object_unref (splitmux->provided_muxer);
673
674   if (splitmux->muxer_factory)
675     g_free (splitmux->muxer_factory);
676   if (splitmux->muxer_preset)
677     g_free (splitmux->muxer_preset);
678   if (splitmux->muxer_properties)
679     gst_structure_free (splitmux->muxer_properties);
680   if (splitmux->sink_factory)
681     g_free (splitmux->sink_factory);
682   if (splitmux->sink_preset)
683     g_free (splitmux->sink_preset);
684   if (splitmux->sink_properties)
685     gst_structure_free (splitmux->sink_properties);
686
687   if (splitmux->threshold_timecode_str)
688     g_free (splitmux->threshold_timecode_str);
689   if (splitmux->tc_interval)
690     gst_video_time_code_interval_free (splitmux->tc_interval);
691
692   if (splitmux->times_to_split)
693     gst_queue_array_free (splitmux->times_to_split);
694
695   g_free (splitmux->location);
696
697   /* Make sure to free any un-released contexts. There should not be any,
698    * because the dispose will have freed all request pads though */
699   g_list_foreach (splitmux->contexts, (GFunc) mq_stream_ctx_free, NULL);
700   g_list_free (splitmux->contexts);
701   gst_splitmux_reset_timecode (splitmux);
702
703   G_OBJECT_CLASS (parent_class)->finalize (object);
704 }
705
706 /*
707  * Set any time threshold to the muxer, if it has
708  * reserved-max-duration and reserved-duration-remaining
709  * properties. Called when creating/claiming the muxer
710  * in create_elements() */
711 static void
712 update_muxer_properties (GstSplitMuxSink * sink)
713 {
714   GObjectClass *klass;
715   GstClockTime threshold_time;
716
717   sink->muxer_has_reserved_props = FALSE;
718   if (sink->muxer == NULL)
719     return;
720   klass = G_OBJECT_GET_CLASS (sink->muxer);
721   if (g_object_class_find_property (klass, "reserved-max-duration") == NULL)
722     return;
723   if (g_object_class_find_property (klass,
724           "reserved-duration-remaining") == NULL)
725     return;
726   sink->muxer_has_reserved_props = TRUE;
727
728   GST_LOG_OBJECT (sink, "Setting muxer reserved time to %" GST_TIME_FORMAT,
729       GST_TIME_ARGS (sink->threshold_time));
730   GST_OBJECT_LOCK (sink);
731   threshold_time = sink->threshold_time;
732   GST_OBJECT_UNLOCK (sink);
733
734   if (threshold_time > 0) {
735     /* Tell the muxer how much space to reserve */
736     GstClockTime muxer_threshold = threshold_time;
737     g_object_set (sink->muxer, "reserved-max-duration", muxer_threshold, NULL);
738   }
739 }
740
741 static void
742 gst_splitmux_sink_set_property (GObject * object, guint prop_id,
743     const GValue * value, GParamSpec * pspec)
744 {
745   GstSplitMuxSink *splitmux = GST_SPLITMUX_SINK (object);
746
747   switch (prop_id) {
748     case PROP_LOCATION:{
749       GST_OBJECT_LOCK (splitmux);
750       g_free (splitmux->location);
751       splitmux->location = g_value_dup_string (value);
752       GST_OBJECT_UNLOCK (splitmux);
753       break;
754     }
755     case PROP_START_INDEX:
756       GST_OBJECT_LOCK (splitmux);
757       splitmux->start_index = g_value_get_int (value);
758       GST_OBJECT_UNLOCK (splitmux);
759       break;
760     case PROP_MAX_SIZE_BYTES:
761       GST_OBJECT_LOCK (splitmux);
762       splitmux->threshold_bytes = g_value_get_uint64 (value);
763       GST_OBJECT_UNLOCK (splitmux);
764       break;
765     case PROP_MAX_SIZE_TIME:
766       GST_OBJECT_LOCK (splitmux);
767       splitmux->threshold_time = g_value_get_uint64 (value);
768       GST_OBJECT_UNLOCK (splitmux);
769       break;
770     case PROP_MAX_SIZE_TIMECODE:
771       GST_OBJECT_LOCK (splitmux);
772       g_free (splitmux->threshold_timecode_str);
773       /* will be calculated later */
774       g_clear_pointer (&splitmux->tc_interval,
775           gst_video_time_code_interval_free);
776       gst_splitmux_reset_timecode (splitmux);
777
778       splitmux->threshold_timecode_str = g_value_dup_string (value);
779       if (splitmux->threshold_timecode_str) {
780         splitmux->tc_interval =
781             gst_video_time_code_interval_new_from_string
782             (splitmux->threshold_timecode_str);
783         if (!splitmux->tc_interval) {
784           g_warning ("Wrong timecode string %s",
785               splitmux->threshold_timecode_str);
786           g_free (splitmux->threshold_timecode_str);
787           splitmux->threshold_timecode_str = NULL;
788         }
789       }
790       GST_OBJECT_UNLOCK (splitmux);
791       break;
792     case PROP_SEND_KEYFRAME_REQUESTS:
793       GST_OBJECT_LOCK (splitmux);
794       splitmux->send_keyframe_requests = g_value_get_boolean (value);
795       GST_OBJECT_UNLOCK (splitmux);
796       break;
797     case PROP_MAX_FILES:
798       GST_OBJECT_LOCK (splitmux);
799       splitmux->max_files = g_value_get_uint (value);
800       GST_OBJECT_UNLOCK (splitmux);
801       break;
802     case PROP_MUXER_OVERHEAD:
803       GST_OBJECT_LOCK (splitmux);
804       splitmux->mux_overhead = g_value_get_double (value);
805       GST_OBJECT_UNLOCK (splitmux);
806       break;
807     case PROP_USE_ROBUST_MUXING:
808       GST_OBJECT_LOCK (splitmux);
809       splitmux->use_robust_muxing = g_value_get_boolean (value);
810       GST_OBJECT_UNLOCK (splitmux);
811       if (splitmux->use_robust_muxing)
812         update_muxer_properties (splitmux);
813       break;
814     case PROP_ALIGNMENT_THRESHOLD:
815       GST_OBJECT_LOCK (splitmux);
816       splitmux->alignment_threshold = g_value_get_uint64 (value);
817       GST_OBJECT_UNLOCK (splitmux);
818       break;
819     case PROP_SINK:
820       GST_OBJECT_LOCK (splitmux);
821       gst_clear_object (&splitmux->provided_sink);
822       splitmux->provided_sink = g_value_get_object (value);
823       if (splitmux->provided_sink)
824         gst_object_ref_sink (splitmux->provided_sink);
825       GST_OBJECT_UNLOCK (splitmux);
826       break;
827     case PROP_MUXER:
828       GST_OBJECT_LOCK (splitmux);
829       gst_clear_object (&splitmux->provided_muxer);
830       splitmux->provided_muxer = g_value_get_object (value);
831       if (splitmux->provided_muxer)
832         gst_object_ref_sink (splitmux->provided_muxer);
833       GST_OBJECT_UNLOCK (splitmux);
834       break;
835     case PROP_RESET_MUXER:
836       GST_OBJECT_LOCK (splitmux);
837       splitmux->reset_muxer = g_value_get_boolean (value);
838       GST_OBJECT_UNLOCK (splitmux);
839       break;
840     case PROP_ASYNC_FINALIZE:
841       GST_OBJECT_LOCK (splitmux);
842       splitmux->async_finalize = g_value_get_boolean (value);
843       GST_OBJECT_UNLOCK (splitmux);
844       break;
845     case PROP_MUXER_FACTORY:
846       GST_OBJECT_LOCK (splitmux);
847       if (splitmux->muxer_factory)
848         g_free (splitmux->muxer_factory);
849       splitmux->muxer_factory = g_value_dup_string (value);
850       GST_OBJECT_UNLOCK (splitmux);
851       break;
852     case PROP_MUXER_PRESET:
853       GST_OBJECT_LOCK (splitmux);
854       if (splitmux->muxer_preset)
855         g_free (splitmux->muxer_preset);
856       splitmux->muxer_preset = g_value_dup_string (value);
857       GST_OBJECT_UNLOCK (splitmux);
858       break;
859     case PROP_MUXER_PROPERTIES:
860       GST_OBJECT_LOCK (splitmux);
861       if (splitmux->muxer_properties)
862         gst_structure_free (splitmux->muxer_properties);
863       if (gst_value_get_structure (value))
864         splitmux->muxer_properties =
865             gst_structure_copy (gst_value_get_structure (value));
866       else
867         splitmux->muxer_properties = NULL;
868       GST_OBJECT_UNLOCK (splitmux);
869       break;
870     case PROP_SINK_FACTORY:
871       GST_OBJECT_LOCK (splitmux);
872       if (splitmux->sink_factory)
873         g_free (splitmux->sink_factory);
874       splitmux->sink_factory = g_value_dup_string (value);
875       GST_OBJECT_UNLOCK (splitmux);
876       break;
877     case PROP_SINK_PRESET:
878       GST_OBJECT_LOCK (splitmux);
879       if (splitmux->sink_preset)
880         g_free (splitmux->sink_preset);
881       splitmux->sink_preset = g_value_dup_string (value);
882       GST_OBJECT_UNLOCK (splitmux);
883       break;
884     case PROP_SINK_PROPERTIES:
885       GST_OBJECT_LOCK (splitmux);
886       if (splitmux->sink_properties)
887         gst_structure_free (splitmux->sink_properties);
888       if (gst_value_get_structure (value))
889         splitmux->sink_properties =
890             gst_structure_copy (gst_value_get_structure (value));
891       else
892         splitmux->sink_properties = NULL;
893       GST_OBJECT_UNLOCK (splitmux);
894       break;
895     case PROP_MUXERPAD_MAP:
896     {
897       const GstStructure *s = gst_value_get_structure (value);
898       GST_SPLITMUX_LOCK (splitmux);
899       if (splitmux->muxerpad_map) {
900         gst_structure_free (splitmux->muxerpad_map);
901       }
902       if (s)
903         splitmux->muxerpad_map = gst_structure_copy (s);
904       else
905         splitmux->muxerpad_map = NULL;
906       GST_SPLITMUX_UNLOCK (splitmux);
907       break;
908     }
909     default:
910       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
911       break;
912   }
913 }
914
915 static void
916 gst_splitmux_sink_get_property (GObject * object, guint prop_id,
917     GValue * value, GParamSpec * pspec)
918 {
919   GstSplitMuxSink *splitmux = GST_SPLITMUX_SINK (object);
920
921   switch (prop_id) {
922     case PROP_LOCATION:
923       GST_OBJECT_LOCK (splitmux);
924       g_value_set_string (value, splitmux->location);
925       GST_OBJECT_UNLOCK (splitmux);
926       break;
927     case PROP_START_INDEX:
928       GST_OBJECT_LOCK (splitmux);
929       g_value_set_int (value, splitmux->start_index);
930       GST_OBJECT_UNLOCK (splitmux);
931       break;
932     case PROP_MAX_SIZE_BYTES:
933       GST_OBJECT_LOCK (splitmux);
934       g_value_set_uint64 (value, splitmux->threshold_bytes);
935       GST_OBJECT_UNLOCK (splitmux);
936       break;
937     case PROP_MAX_SIZE_TIME:
938       GST_OBJECT_LOCK (splitmux);
939       g_value_set_uint64 (value, splitmux->threshold_time);
940       GST_OBJECT_UNLOCK (splitmux);
941       break;
942     case PROP_MAX_SIZE_TIMECODE:
943       GST_OBJECT_LOCK (splitmux);
944       g_value_set_string (value, splitmux->threshold_timecode_str);
945       GST_OBJECT_UNLOCK (splitmux);
946       break;
947     case PROP_SEND_KEYFRAME_REQUESTS:
948       GST_OBJECT_LOCK (splitmux);
949       g_value_set_boolean (value, splitmux->send_keyframe_requests);
950       GST_OBJECT_UNLOCK (splitmux);
951       break;
952     case PROP_MAX_FILES:
953       GST_OBJECT_LOCK (splitmux);
954       g_value_set_uint (value, splitmux->max_files);
955       GST_OBJECT_UNLOCK (splitmux);
956       break;
957     case PROP_MUXER_OVERHEAD:
958       GST_OBJECT_LOCK (splitmux);
959       g_value_set_double (value, splitmux->mux_overhead);
960       GST_OBJECT_UNLOCK (splitmux);
961       break;
962     case PROP_USE_ROBUST_MUXING:
963       GST_OBJECT_LOCK (splitmux);
964       g_value_set_boolean (value, splitmux->use_robust_muxing);
965       GST_OBJECT_UNLOCK (splitmux);
966       break;
967     case PROP_ALIGNMENT_THRESHOLD:
968       GST_OBJECT_LOCK (splitmux);
969       g_value_set_uint64 (value, splitmux->alignment_threshold);
970       GST_OBJECT_UNLOCK (splitmux);
971       break;
972     case PROP_SINK:
973       GST_OBJECT_LOCK (splitmux);
974       g_value_set_object (value, splitmux->provided_sink);
975       GST_OBJECT_UNLOCK (splitmux);
976       break;
977     case PROP_MUXER:
978       GST_OBJECT_LOCK (splitmux);
979       g_value_set_object (value, splitmux->provided_muxer);
980       GST_OBJECT_UNLOCK (splitmux);
981       break;
982     case PROP_RESET_MUXER:
983       GST_OBJECT_LOCK (splitmux);
984       g_value_set_boolean (value, splitmux->reset_muxer);
985       GST_OBJECT_UNLOCK (splitmux);
986       break;
987     case PROP_ASYNC_FINALIZE:
988       GST_OBJECT_LOCK (splitmux);
989       g_value_set_boolean (value, splitmux->async_finalize);
990       GST_OBJECT_UNLOCK (splitmux);
991       break;
992     case PROP_MUXER_FACTORY:
993       GST_OBJECT_LOCK (splitmux);
994       g_value_set_string (value, splitmux->muxer_factory);
995       GST_OBJECT_UNLOCK (splitmux);
996       break;
997     case PROP_MUXER_PRESET:
998       GST_OBJECT_LOCK (splitmux);
999       g_value_set_string (value, splitmux->muxer_preset);
1000       GST_OBJECT_UNLOCK (splitmux);
1001       break;
1002     case PROP_MUXER_PROPERTIES:
1003       GST_OBJECT_LOCK (splitmux);
1004       gst_value_set_structure (value, splitmux->muxer_properties);
1005       GST_OBJECT_UNLOCK (splitmux);
1006       break;
1007     case PROP_SINK_FACTORY:
1008       GST_OBJECT_LOCK (splitmux);
1009       g_value_set_string (value, splitmux->sink_factory);
1010       GST_OBJECT_UNLOCK (splitmux);
1011       break;
1012     case PROP_SINK_PRESET:
1013       GST_OBJECT_LOCK (splitmux);
1014       g_value_set_string (value, splitmux->sink_preset);
1015       GST_OBJECT_UNLOCK (splitmux);
1016       break;
1017     case PROP_SINK_PROPERTIES:
1018       GST_OBJECT_LOCK (splitmux);
1019       gst_value_set_structure (value, splitmux->sink_properties);
1020       GST_OBJECT_UNLOCK (splitmux);
1021       break;
1022     case PROP_MUXERPAD_MAP:
1023       GST_SPLITMUX_LOCK (splitmux);
1024       gst_value_set_structure (value, splitmux->muxerpad_map);
1025       GST_SPLITMUX_UNLOCK (splitmux);
1026       break;
1027     default:
1028       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
1029       break;
1030   }
1031 }
1032
1033 /* Convenience function */
1034 static inline GstClockTimeDiff
1035 my_segment_to_running_time (GstSegment * segment, GstClockTime val)
1036 {
1037   GstClockTimeDiff res = GST_CLOCK_STIME_NONE;
1038
1039   if (GST_CLOCK_TIME_IS_VALID (val)) {
1040     gboolean sign =
1041         gst_segment_to_running_time_full (segment, GST_FORMAT_TIME, val, &val);
1042     if (sign > 0)
1043       res = val;
1044     else if (sign < 0)
1045       res = -val;
1046   }
1047   return res;
1048 }
1049
1050 static void
1051 mq_stream_ctx_reset (MqStreamCtx * ctx)
1052 {
1053   gst_segment_init (&ctx->in_segment, GST_FORMAT_UNDEFINED);
1054   gst_segment_init (&ctx->out_segment, GST_FORMAT_UNDEFINED);
1055   ctx->in_running_time = ctx->out_running_time = GST_CLOCK_STIME_NONE;
1056   g_queue_foreach (&ctx->queued_bufs, (GFunc) mq_stream_buf_free, NULL);
1057   g_queue_clear (&ctx->queued_bufs);
1058 }
1059
1060 static MqStreamCtx *
1061 mq_stream_ctx_new (GstSplitMuxSink * splitmux)
1062 {
1063   MqStreamCtx *ctx;
1064
1065   ctx = g_new0 (MqStreamCtx, 1);
1066   ctx->splitmux = splitmux;
1067   g_queue_init (&ctx->queued_bufs);
1068   mq_stream_ctx_reset (ctx);
1069
1070   return ctx;
1071 }
1072
1073 static void
1074 mq_stream_ctx_free (MqStreamCtx * ctx)
1075 {
1076   if (ctx->q) {
1077     GstObject *parent = gst_object_get_parent (GST_OBJECT (ctx->q));
1078
1079     g_signal_handler_disconnect (ctx->q, ctx->q_overrun_id);
1080
1081     if (parent == GST_OBJECT_CAST (ctx->splitmux)) {
1082       gst_element_set_locked_state (ctx->q, TRUE);
1083       gst_element_set_state (ctx->q, GST_STATE_NULL);
1084       gst_bin_remove (GST_BIN (ctx->splitmux), ctx->q);
1085       gst_object_unref (parent);
1086     }
1087     gst_object_unref (ctx->q);
1088   }
1089   gst_object_unref (ctx->sinkpad);
1090   gst_object_unref (ctx->srcpad);
1091   g_queue_foreach (&ctx->queued_bufs, (GFunc) mq_stream_buf_free, NULL);
1092   g_queue_clear (&ctx->queued_bufs);
1093   g_free (ctx);
1094 }
1095
1096 static void
1097 send_fragment_opened_closed_msg (GstSplitMuxSink * splitmux, gboolean opened,
1098     GstElement * sink)
1099 {
1100   gchar *location = NULL;
1101   GstMessage *msg;
1102   const gchar *msg_name = opened ?
1103       "splitmuxsink-fragment-opened" : "splitmuxsink-fragment-closed";
1104   GstClockTime running_time = splitmux->reference_ctx->out_running_time;
1105
1106   if (!opened) {
1107     GstClockTime *rtime = g_object_get_qdata (G_OBJECT (sink), RUNNING_TIME);
1108     if (rtime)
1109       running_time = *rtime;
1110   }
1111
1112   if (g_object_class_find_property (G_OBJECT_GET_CLASS (sink),
1113           "location") != NULL)
1114     g_object_get (sink, "location", &location, NULL);
1115
1116   GST_DEBUG_OBJECT (splitmux,
1117       "Sending %s message. Running time %" GST_TIME_FORMAT " location %s",
1118       msg_name, GST_TIME_ARGS (running_time), GST_STR_NULL (location));
1119
1120   /* If it's in the middle of a teardown, the reference_ctc might have become
1121    * NULL */
1122   if (splitmux->reference_ctx) {
1123     msg = gst_message_new_element (GST_OBJECT (splitmux),
1124         gst_structure_new (msg_name,
1125             "location", G_TYPE_STRING, location,
1126             "running-time", GST_TYPE_CLOCK_TIME, running_time,
1127             "sink", GST_TYPE_ELEMENT, sink, NULL));
1128     gst_element_post_message (GST_ELEMENT_CAST (splitmux), msg);
1129   }
1130
1131   g_free (location);
1132 }
1133
1134 static void
1135 send_eos_async (GstSplitMuxSink * splitmux, AsyncEosHelper * helper)
1136 {
1137   GstEvent *eos;
1138   GstPad *pad;
1139   MqStreamCtx *ctx;
1140
1141   eos = gst_event_new_eos ();
1142   pad = helper->pad;
1143   ctx = helper->ctx;
1144
1145   GST_SPLITMUX_LOCK (splitmux);
1146   if (!pad)
1147     pad = gst_pad_get_peer (ctx->srcpad);
1148   GST_SPLITMUX_UNLOCK (splitmux);
1149
1150   gst_pad_send_event (pad, eos);
1151   GST_INFO_OBJECT (splitmux, "Sent async EOS on %" GST_PTR_FORMAT, pad);
1152
1153   gst_object_unref (pad);
1154   g_free (helper);
1155 }
1156
1157 /* Called with lock held, drops the lock to send EOS to the
1158  * pad
1159  */
1160 static void
1161 send_eos (GstSplitMuxSink * splitmux, MqStreamCtx * ctx)
1162 {
1163   GstEvent *eos;
1164   GstPad *pad;
1165
1166   eos = gst_event_new_eos ();
1167   pad = gst_pad_get_peer (ctx->srcpad);
1168
1169   ctx->out_eos = TRUE;
1170
1171   GST_INFO_OBJECT (splitmux, "Sending EOS on %" GST_PTR_FORMAT, pad);
1172   GST_SPLITMUX_UNLOCK (splitmux);
1173   gst_pad_send_event (pad, eos);
1174   GST_SPLITMUX_LOCK (splitmux);
1175
1176   gst_object_unref (pad);
1177 }
1178
1179 /* Called with lock held. Schedules an EOS event to the ctx pad
1180  * to happen in another thread */
1181 static void
1182 eos_context_async (MqStreamCtx * ctx, GstSplitMuxSink * splitmux)
1183 {
1184   AsyncEosHelper *helper = g_new0 (AsyncEosHelper, 1);
1185   GstPad *srcpad, *sinkpad;
1186
1187   srcpad = ctx->srcpad;
1188   sinkpad = gst_pad_get_peer (srcpad);
1189
1190   helper->ctx = ctx;
1191   helper->pad = sinkpad;        /* Takes the reference */
1192
1193   ctx->out_eos_async_done = TRUE;
1194
1195   /* There used to be a bug here, where we had to explicitly remove
1196    * the SINK flag so that GstBin would ignore it for EOS purposes.
1197    * That fixed a race where if splitmuxsink really reaches EOS
1198    * before an asynchronous background element has finished, then
1199    * the bin wouldn't actually send EOS to the pipeline. Even after
1200    * finishing and removing the old element, the bin didn't re-check
1201    * EOS status on removing a SINK element. That bug was fixed
1202    * in core. */
1203   GST_DEBUG_OBJECT (splitmux, "scheduled EOS to pad %" GST_PTR_FORMAT " ctx %p",
1204       sinkpad, ctx);
1205
1206   g_assert_nonnull (helper->pad);
1207   gst_element_call_async (GST_ELEMENT (splitmux),
1208       (GstElementCallAsyncFunc) send_eos_async, helper, NULL);
1209 }
1210
1211 /* Called with lock held. TRUE iff all contexts have a
1212  * pending (or delivered) async eos event */
1213 static gboolean
1214 all_contexts_are_async_eos (GstSplitMuxSink * splitmux)
1215 {
1216   gboolean ret = TRUE;
1217   GList *item;
1218
1219   for (item = splitmux->contexts; item; item = item->next) {
1220     MqStreamCtx *ctx = item->data;
1221     ret &= ctx->out_eos_async_done;
1222   }
1223   return ret;
1224 }
1225
1226 /* Called with splitmux lock held to check if this output
1227  * context needs to sleep to wait for the release of the
1228  * next GOP, or to send EOS to close out the current file
1229  */
1230 static GstFlowReturn
1231 complete_or_wait_on_out (GstSplitMuxSink * splitmux, MqStreamCtx * ctx)
1232 {
1233   if (ctx->caps_change)
1234     return GST_FLOW_OK;
1235
1236   do {
1237     /* When first starting up, the reference stream has to output
1238      * the first buffer to prepare the muxer and sink */
1239     gboolean can_output = (ctx->is_reference || splitmux->ready_for_output);
1240     GstClockTimeDiff my_max_out_running_time = splitmux->max_out_running_time;
1241
1242     if (!(splitmux->max_out_running_time == 0 ||
1243             splitmux->max_out_running_time == GST_CLOCK_STIME_NONE ||
1244             splitmux->alignment_threshold == 0 ||
1245             splitmux->max_out_running_time < splitmux->alignment_threshold)) {
1246       my_max_out_running_time -= splitmux->alignment_threshold;
1247       GST_LOG_OBJECT (ctx->srcpad,
1248           "Max out running time currently %" GST_STIME_FORMAT
1249           ", with threshold applied it is %" GST_STIME_FORMAT,
1250           GST_STIME_ARGS (splitmux->max_out_running_time),
1251           GST_STIME_ARGS (my_max_out_running_time));
1252     }
1253
1254     if (ctx->flushing
1255         || splitmux->output_state == SPLITMUX_OUTPUT_STATE_STOPPED)
1256       return GST_FLOW_FLUSHING;
1257
1258     GST_LOG_OBJECT (ctx->srcpad,
1259         "Checking running time %" GST_STIME_FORMAT " against max %"
1260         GST_STIME_FORMAT, GST_STIME_ARGS (ctx->out_running_time),
1261         GST_STIME_ARGS (my_max_out_running_time));
1262
1263     if (can_output) {
1264       if (splitmux->max_out_running_time == GST_CLOCK_STIME_NONE ||
1265           ctx->out_running_time < my_max_out_running_time) {
1266         return GST_FLOW_OK;
1267       }
1268
1269       switch (splitmux->output_state) {
1270         case SPLITMUX_OUTPUT_STATE_OUTPUT_GOP:
1271           /* We only get here if we've finished outputting a GOP and need to know
1272            * what to do next */
1273           splitmux->output_state = SPLITMUX_OUTPUT_STATE_AWAITING_COMMAND;
1274           GST_SPLITMUX_BROADCAST_OUTPUT (splitmux);
1275           continue;
1276
1277         case SPLITMUX_OUTPUT_STATE_ENDING_FILE:
1278         case SPLITMUX_OUTPUT_STATE_ENDING_STREAM:
1279           /* We've reached the max out running_time to get here, so end this file now */
1280           if (ctx->out_eos == FALSE) {
1281             if (splitmux->async_finalize) {
1282               /* We must set EOS asynchronously at this point. We cannot defer
1283                * it, because we need all contexts to wake up, for the
1284                * reference context to eventually give us something at
1285                * START_NEXT_FILE. Otherwise, collectpads might choose another
1286                * context to give us the first buffer, and format-location-full
1287                * will not contain a valid sample. */
1288               g_object_set_qdata ((GObject *) splitmux->sink, EOS_FROM_US,
1289                   GINT_TO_POINTER (1));
1290               eos_context_async (ctx, splitmux);
1291               if (all_contexts_are_async_eos (splitmux)) {
1292                 GST_INFO_OBJECT (splitmux,
1293                     "All contexts are async_eos. Moving to the next file.");
1294                 /* We can start the next file once we've asked each pad to go EOS */
1295                 splitmux->output_state = SPLITMUX_OUTPUT_STATE_START_NEXT_FILE;
1296                 GST_SPLITMUX_BROADCAST_OUTPUT (splitmux);
1297                 continue;
1298               }
1299             } else {
1300               send_eos (splitmux, ctx);
1301               continue;
1302             }
1303           } else {
1304             GST_INFO_OBJECT (splitmux,
1305                 "At end-of-file state, but context %p is already EOS", ctx);
1306           }
1307           break;
1308         case SPLITMUX_OUTPUT_STATE_START_NEXT_FILE:
1309           if (ctx->is_reference) {
1310             GstFlowReturn ret = GST_FLOW_OK;
1311
1312             /* Special handling on the reference ctx to start new fragments
1313              * and collect commands from the command queue */
1314             /* drops the splitmux lock briefly: */
1315             /* We must have reference ctx in order for format-location-full to
1316              * have a sample */
1317             ret = start_next_fragment (splitmux, ctx);
1318             if (ret != GST_FLOW_OK)
1319               return ret;
1320
1321             continue;
1322           }
1323           break;
1324         case SPLITMUX_OUTPUT_STATE_AWAITING_COMMAND:{
1325           do {
1326             SplitMuxOutputCommand *cmd =
1327                 g_queue_pop_tail (&splitmux->out_cmd_q);
1328             if (cmd != NULL) {
1329               /* If we pop the last command, we need to make our queues bigger */
1330               if (g_queue_get_length (&splitmux->out_cmd_q) == 0)
1331                 grow_blocked_queues (splitmux);
1332
1333               if (cmd->start_new_fragment) {
1334                 if (splitmux->muxed_out_bytes > 0) {
1335                   GST_DEBUG_OBJECT (splitmux, "Got cmd to start new fragment");
1336                   splitmux->output_state = SPLITMUX_OUTPUT_STATE_ENDING_FILE;
1337                 } else {
1338                   GST_DEBUG_OBJECT (splitmux,
1339                       "Got cmd to start new fragment, but fragment is empty - ignoring.");
1340                 }
1341               } else {
1342                 GST_DEBUG_OBJECT (splitmux,
1343                     "Got new output cmd for time %" GST_STIME_FORMAT,
1344                     GST_STIME_ARGS (cmd->max_output_ts));
1345
1346                 /* Extend the output range immediately */
1347                 splitmux->max_out_running_time = cmd->max_output_ts;
1348                 splitmux->output_state = SPLITMUX_OUTPUT_STATE_OUTPUT_GOP;
1349               }
1350               GST_SPLITMUX_BROADCAST_OUTPUT (splitmux);
1351
1352               out_cmd_buf_free (cmd);
1353               break;
1354             } else {
1355               GST_SPLITMUX_WAIT_OUTPUT (splitmux);
1356             }
1357           } while (!ctx->flushing && splitmux->output_state ==
1358               SPLITMUX_OUTPUT_STATE_AWAITING_COMMAND);
1359           /* loop and re-check the state */
1360           continue;
1361         }
1362         case SPLITMUX_OUTPUT_STATE_STOPPED:
1363           return GST_FLOW_FLUSHING;
1364       }
1365     } else {
1366       GST_LOG_OBJECT (ctx->srcpad, "Not yet ready for output");
1367     }
1368
1369     GST_INFO_OBJECT (ctx->srcpad,
1370         "Sleeping for running time %"
1371         GST_STIME_FORMAT " (max %" GST_STIME_FORMAT ") or state change.",
1372         GST_STIME_ARGS (ctx->out_running_time),
1373         GST_STIME_ARGS (splitmux->max_out_running_time));
1374     GST_SPLITMUX_WAIT_OUTPUT (splitmux);
1375     GST_INFO_OBJECT (ctx->srcpad,
1376         "Woken for new max running time %" GST_STIME_FORMAT,
1377         GST_STIME_ARGS (splitmux->max_out_running_time));
1378   }
1379   while (1);
1380
1381   return GST_FLOW_OK;
1382 }
1383
1384 static GstClockTime
1385 calculate_next_max_timecode (GstSplitMuxSink * splitmux,
1386     const GstVideoTimeCode * cur_tc, GstClockTime running_time,
1387     GstVideoTimeCode ** next_tc)
1388 {
1389   GstVideoTimeCode *target_tc;
1390   GstClockTime cur_tc_time, target_tc_time, next_max_tc_time;
1391
1392   if (cur_tc == NULL || splitmux->tc_interval == NULL)
1393     return GST_CLOCK_TIME_NONE;
1394
1395   target_tc = gst_video_time_code_add_interval (cur_tc, splitmux->tc_interval);
1396   if (!target_tc) {
1397     GST_ELEMENT_ERROR (splitmux,
1398         STREAM, FAILED, (NULL), ("Couldn't calculate target timecode"));
1399     return GST_CLOCK_TIME_NONE;
1400   }
1401
1402   /* Convert to ns */
1403   target_tc_time = gst_video_time_code_nsec_since_daily_jam (target_tc);
1404   cur_tc_time = gst_video_time_code_nsec_since_daily_jam (cur_tc);
1405
1406   /* Add running_time, accounting for wraparound. */
1407   if (target_tc_time >= cur_tc_time) {
1408     next_max_tc_time = target_tc_time - cur_tc_time + running_time;
1409   } else {
1410     GstClockTime day_in_ns = 24 * 60 * 60 * GST_SECOND;
1411
1412     if ((cur_tc->config.flags & GST_VIDEO_TIME_CODE_FLAGS_DROP_FRAME) &&
1413         (cur_tc->config.fps_d == 1001)) {
1414       /* Checking fps_d is probably unneeded, but better safe than sorry
1415        * (e.g. someone accidentally set a flag) */
1416       GstVideoTimeCode *tc_for_offset;
1417
1418       /* Here, the duration of the 24:00:00;00 timecode isn't exactly one day,
1419        * but slightly less. Calculate that duration from a fake timecode. The
1420        * problem is that 24:00:00;00 isn't a valid timecode, so the workaround
1421        * is to add one frame to 23:59:59;29 */
1422       tc_for_offset =
1423           gst_video_time_code_new (cur_tc->config.fps_n, cur_tc->config.fps_d,
1424           NULL, cur_tc->config.flags, 23, 59, 59,
1425           cur_tc->config.fps_n / cur_tc->config.fps_d, 0);
1426       day_in_ns =
1427           gst_video_time_code_nsec_since_daily_jam (tc_for_offset) +
1428           gst_util_uint64_scale (GST_SECOND, cur_tc->config.fps_d,
1429           cur_tc->config.fps_n);
1430       gst_video_time_code_free (tc_for_offset);
1431     }
1432     next_max_tc_time = day_in_ns - cur_tc_time + target_tc_time + running_time;
1433   }
1434
1435   GST_INFO_OBJECT (splitmux, "Next max TC time: %" GST_TIME_FORMAT
1436       " from ref TC: %" GST_TIME_FORMAT, GST_TIME_ARGS (next_max_tc_time),
1437       GST_TIME_ARGS (cur_tc_time));
1438   if (next_tc)
1439     *next_tc = target_tc;
1440   else
1441     gst_video_time_code_free (target_tc);
1442
1443   return next_max_tc_time;
1444 }
1445
1446 static gboolean
1447 request_next_keyframe (GstSplitMuxSink * splitmux, GstBuffer * buffer,
1448     GstClockTime running_time)
1449 {
1450   GstEvent *ev;
1451   GstClockTime target_time;
1452   gboolean timecode_based = FALSE;
1453   GstClockTime max_tc_time = GST_CLOCK_TIME_NONE;
1454   GstClockTime next_max_tc_time = GST_CLOCK_TIME_NONE;
1455   GstClockTime next_fku_time = GST_CLOCK_TIME_NONE;
1456   GstClockTime tc_rounding_error = 5 * GST_USECOND;
1457
1458   if (!splitmux->send_keyframe_requests)
1459     return TRUE;
1460
1461   if (splitmux->tc_interval) {
1462     if (splitmux->in_tc && gst_video_time_code_is_valid (splitmux->in_tc)) {
1463       GstVideoTimeCode *next_tc = NULL;
1464       max_tc_time =
1465           calculate_next_max_timecode (splitmux, splitmux->in_tc,
1466           running_time, &next_tc);
1467
1468       /* calculate the next expected keyframe time to prevent too early fku
1469        * event */
1470       if (GST_CLOCK_TIME_IS_VALID (max_tc_time) && next_tc) {
1471         next_max_tc_time =
1472             calculate_next_max_timecode (splitmux, next_tc, max_tc_time, NULL);
1473       }
1474       if (next_tc)
1475         gst_video_time_code_free (next_tc);
1476
1477       timecode_based = GST_CLOCK_TIME_IS_VALID (max_tc_time) &&
1478           GST_CLOCK_TIME_IS_VALID (next_max_tc_time);
1479     } else {
1480       /* This can happen in the presence of GAP events that trigger
1481        * a new fragment start */
1482       GST_WARNING_OBJECT (splitmux,
1483           "No buffer available to calculate next timecode");
1484     }
1485   }
1486
1487   if ((splitmux->threshold_time == 0 && !timecode_based)
1488       || splitmux->threshold_bytes != 0)
1489     return TRUE;
1490
1491   if (timecode_based) {
1492     /* We might have rounding errors: aim slightly earlier */
1493     if (max_tc_time >= tc_rounding_error) {
1494       target_time = max_tc_time - tc_rounding_error;
1495     } else {
1496       /* unreliable target time */
1497       GST_DEBUG_OBJECT (splitmux, "tc time %" GST_TIME_FORMAT
1498           " is smaller than allowed rounding error, set it to zero",
1499           GST_TIME_ARGS (max_tc_time));
1500       target_time = 0;
1501     }
1502
1503     if (next_max_tc_time >= tc_rounding_error) {
1504       next_fku_time = next_max_tc_time - tc_rounding_error;
1505     } else {
1506       /* unreliable target time */
1507       GST_DEBUG_OBJECT (splitmux, "next tc time %" GST_TIME_FORMAT
1508           " is smaller than allowed rounding error, set it to zero",
1509           GST_TIME_ARGS (next_max_tc_time));
1510       next_fku_time = 0;
1511     }
1512   } else {
1513     target_time = running_time + splitmux->threshold_time;
1514   }
1515
1516   if (GST_CLOCK_TIME_IS_VALID (splitmux->next_fku_time)) {
1517     GstClockTime allowed_time = splitmux->next_fku_time;
1518
1519     if (timecode_based) {
1520       if (allowed_time >= tc_rounding_error) {
1521         allowed_time -= tc_rounding_error;
1522       } else {
1523         /* unreliable next force key unit time */
1524         GST_DEBUG_OBJECT (splitmux, "expected next force key unit time %"
1525             GST_TIME_FORMAT
1526             " is smaller than allowed rounding error, set it to zero",
1527             GST_TIME_ARGS (splitmux->next_fku_time));
1528         allowed_time = 0;
1529       }
1530     }
1531
1532     if (target_time < allowed_time) {
1533       GST_LOG_OBJECT (splitmux, "Target time %" GST_TIME_FORMAT
1534           " is smaller than expected next keyframe time %" GST_TIME_FORMAT
1535           ", rounding error compensated next keyframe time %" GST_TIME_FORMAT,
1536           GST_TIME_ARGS (target_time),
1537           GST_TIME_ARGS (splitmux->next_fku_time),
1538           GST_TIME_ARGS (allowed_time));
1539
1540       return TRUE;
1541     } else if (allowed_time != splitmux->next_fku_time &&
1542         target_time < splitmux->next_fku_time) {
1543       GST_DEBUG_OBJECT (splitmux, "Target time %" GST_TIME_FORMAT
1544           " is smaller than expected next keyframe time %" GST_TIME_FORMAT
1545           ", but the difference is smaller than allowed rounding error",
1546           GST_TIME_ARGS (target_time), GST_TIME_ARGS (splitmux->next_fku_time));
1547     }
1548   }
1549
1550   if (!timecode_based) {
1551     next_fku_time = target_time + splitmux->threshold_time;
1552   }
1553
1554   splitmux->next_fku_time = next_fku_time;
1555
1556   ev = gst_video_event_new_upstream_force_key_unit (target_time, TRUE, 0);
1557   GST_INFO_OBJECT (splitmux, "Requesting keyframe at %" GST_TIME_FORMAT
1558       ", the next expected keyframe is %" GST_TIME_FORMAT,
1559       GST_TIME_ARGS (target_time), GST_TIME_ARGS (next_fku_time));
1560   return gst_pad_push_event (splitmux->reference_ctx->sinkpad, ev);
1561 }
1562
1563 static GstPadProbeReturn
1564 handle_mq_output (GstPad * pad, GstPadProbeInfo * info, MqStreamCtx * ctx)
1565 {
1566   GstSplitMuxSink *splitmux = ctx->splitmux;
1567   MqStreamBuf *buf_info = NULL;
1568   GstFlowReturn ret = GST_FLOW_OK;
1569
1570   GST_LOG_OBJECT (pad, "Fired probe type 0x%x", info->type);
1571
1572   /* FIXME: Handle buffer lists, until then make it clear they won't work */
1573   if (info->type & GST_PAD_PROBE_TYPE_BUFFER_LIST) {
1574     g_warning ("Buffer list handling not implemented");
1575     return GST_PAD_PROBE_DROP;
1576   }
1577   if (info->type & GST_PAD_PROBE_TYPE_EVENT_DOWNSTREAM ||
1578       info->type & GST_PAD_PROBE_TYPE_EVENT_FLUSH) {
1579     GstEvent *event = gst_pad_probe_info_get_event (info);
1580     gboolean locked = FALSE, wait = !ctx->is_reference;
1581
1582     GST_LOG_OBJECT (pad, "Event %" GST_PTR_FORMAT, event);
1583
1584     switch (GST_EVENT_TYPE (event)) {
1585       case GST_EVENT_SEGMENT:
1586         gst_event_copy_segment (event, &ctx->out_segment);
1587         break;
1588       case GST_EVENT_FLUSH_STOP:
1589         GST_SPLITMUX_LOCK (splitmux);
1590         locked = TRUE;
1591         gst_segment_init (&ctx->out_segment, GST_FORMAT_UNDEFINED);
1592         g_queue_foreach (&ctx->queued_bufs, (GFunc) mq_stream_buf_free, NULL);
1593         g_queue_clear (&ctx->queued_bufs);
1594         g_queue_clear (&ctx->queued_bufs);
1595         /* If this is the reference context, we just threw away any queued keyframes */
1596         if (ctx->is_reference)
1597           splitmux->queued_keyframes = 0;
1598         ctx->flushing = FALSE;
1599         wait = FALSE;
1600         break;
1601       case GST_EVENT_FLUSH_START:
1602         GST_SPLITMUX_LOCK (splitmux);
1603         locked = TRUE;
1604         GST_LOG_OBJECT (pad, "Flush start");
1605         ctx->flushing = TRUE;
1606         GST_SPLITMUX_BROADCAST_INPUT (splitmux);
1607         GST_SPLITMUX_BROADCAST_OUTPUT (splitmux);
1608         break;
1609       case GST_EVENT_EOS:
1610         GST_SPLITMUX_LOCK (splitmux);
1611         locked = TRUE;
1612         if (splitmux->output_state == SPLITMUX_OUTPUT_STATE_STOPPED)
1613           goto beach;
1614         ctx->out_eos = TRUE;
1615
1616         if (ctx == splitmux->reference_ctx) {
1617           splitmux->output_state = SPLITMUX_OUTPUT_STATE_ENDING_STREAM;
1618           GST_SPLITMUX_BROADCAST_OUTPUT (splitmux);
1619         }
1620
1621         GST_INFO_OBJECT (splitmux,
1622             "Have EOS event at pad %" GST_PTR_FORMAT " ctx %p", pad, ctx);
1623         break;
1624       case GST_EVENT_GAP:{
1625         GstClockTime gap_ts;
1626         GstClockTimeDiff rtime;
1627
1628         gst_event_parse_gap (event, &gap_ts, NULL);
1629         if (gap_ts == GST_CLOCK_TIME_NONE)
1630           break;
1631
1632         GST_SPLITMUX_LOCK (splitmux);
1633         locked = TRUE;
1634
1635         if (splitmux->output_state == SPLITMUX_OUTPUT_STATE_STOPPED)
1636           goto beach;
1637
1638         /* When we get a gap event on the
1639          * reference stream and we're trying to open a
1640          * new file, we need to store it until we get
1641          * the buffer afterwards
1642          */
1643         if (ctx->is_reference &&
1644             (splitmux->output_state != SPLITMUX_OUTPUT_STATE_OUTPUT_GOP)) {
1645           GST_DEBUG_OBJECT (pad, "Storing GAP event until buffer arrives");
1646           gst_event_replace (&ctx->pending_gap, event);
1647           GST_SPLITMUX_UNLOCK (splitmux);
1648           return GST_PAD_PROBE_HANDLED;
1649         }
1650
1651         rtime = my_segment_to_running_time (&ctx->out_segment, gap_ts);
1652
1653         GST_LOG_OBJECT (pad, "Have GAP w/ ts %" GST_STIME_FORMAT,
1654             GST_STIME_ARGS (rtime));
1655
1656         if (rtime != GST_CLOCK_STIME_NONE) {
1657           ctx->out_running_time = rtime;
1658           complete_or_wait_on_out (splitmux, ctx);
1659         }
1660         break;
1661       }
1662       case GST_EVENT_CUSTOM_DOWNSTREAM:{
1663         const GstStructure *s;
1664         GstClockTimeDiff ts = 0;
1665
1666         s = gst_event_get_structure (event);
1667         if (!gst_structure_has_name (s, "splitmuxsink-unblock"))
1668           break;
1669
1670         gst_structure_get_int64 (s, "timestamp", &ts);
1671
1672         GST_SPLITMUX_LOCK (splitmux);
1673         locked = TRUE;
1674
1675         if (splitmux->output_state == SPLITMUX_OUTPUT_STATE_STOPPED)
1676           goto beach;
1677         ctx->out_running_time = ts;
1678         if (!ctx->is_reference)
1679           ret = complete_or_wait_on_out (splitmux, ctx);
1680         GST_SPLITMUX_UNLOCK (splitmux);
1681         GST_PAD_PROBE_INFO_FLOW_RETURN (info) = ret;
1682         return GST_PAD_PROBE_DROP;
1683       }
1684       case GST_EVENT_CAPS:{
1685         GstPad *peer;
1686
1687         if (!ctx->is_reference)
1688           break;
1689
1690         peer = gst_pad_get_peer (pad);
1691         if (peer) {
1692           gboolean ok = gst_pad_send_event (peer, gst_event_ref (event));
1693
1694           gst_object_unref (peer);
1695
1696           if (ok)
1697             break;
1698
1699         } else {
1700           break;
1701         }
1702         /* This is in the case the muxer doesn't allow this change of caps */
1703         GST_SPLITMUX_LOCK (splitmux);
1704         locked = TRUE;
1705         ctx->caps_change = TRUE;
1706
1707         if (splitmux->output_state != SPLITMUX_OUTPUT_STATE_START_NEXT_FILE) {
1708           GST_DEBUG_OBJECT (splitmux,
1709               "New caps were not accepted. Switching output file");
1710           if (ctx->out_eos == FALSE) {
1711             splitmux->output_state = SPLITMUX_OUTPUT_STATE_ENDING_FILE;
1712             GST_SPLITMUX_BROADCAST_OUTPUT (splitmux);
1713           }
1714         }
1715
1716         /* Lets it fall through, if it fails again, then the muxer just can't
1717          * support this format, but at least we have a closed file.
1718          */
1719         break;
1720       }
1721       default:
1722         break;
1723     }
1724
1725     /* We need to make sure events aren't passed
1726      * until the muxer / sink are ready for it */
1727     if (!locked)
1728       GST_SPLITMUX_LOCK (splitmux);
1729     if (wait)
1730       ret = complete_or_wait_on_out (splitmux, ctx);
1731     GST_SPLITMUX_UNLOCK (splitmux);
1732
1733     /* Don't try to forward sticky events before the next buffer is there
1734      * because it would cause a new file to be created without the first
1735      * buffer being available.
1736      */
1737     GST_PAD_PROBE_INFO_FLOW_RETURN (info) = ret;
1738     if (ctx->caps_change && GST_EVENT_IS_STICKY (event)) {
1739       gst_event_unref (event);
1740       return GST_PAD_PROBE_HANDLED;
1741     } else {
1742       return GST_PAD_PROBE_PASS;
1743     }
1744   }
1745
1746   /* Allow everything through until the configured next stopping point */
1747   GST_SPLITMUX_LOCK (splitmux);
1748
1749   buf_info = g_queue_pop_tail (&ctx->queued_bufs);
1750   if (buf_info == NULL) {
1751     /* Can only happen due to a poorly timed flush */
1752     ret = GST_FLOW_FLUSHING;
1753     goto beach;
1754   }
1755
1756   /* If we have popped a keyframe, decrement the queued_gop count */
1757   if (buf_info->keyframe && splitmux->queued_keyframes > 0)
1758     splitmux->queued_keyframes--;
1759
1760   ctx->out_running_time = buf_info->run_ts;
1761   ctx->cur_out_buffer = gst_pad_probe_info_get_buffer (info);
1762
1763   GST_LOG_OBJECT (splitmux,
1764       "Pad %" GST_PTR_FORMAT " buffer with run TS %" GST_STIME_FORMAT
1765       " size %" G_GUINT64_FORMAT,
1766       pad, GST_STIME_ARGS (ctx->out_running_time), buf_info->buf_size);
1767
1768   ctx->caps_change = FALSE;
1769
1770   ret = complete_or_wait_on_out (splitmux, ctx);
1771
1772   splitmux->muxed_out_bytes += buf_info->buf_size;
1773
1774 #ifndef GST_DISABLE_GST_DEBUG
1775   {
1776     GstBuffer *buf = gst_pad_probe_info_get_buffer (info);
1777     GST_LOG_OBJECT (pad, "Returning to pass buffer %" GST_PTR_FORMAT
1778         " run ts %" GST_STIME_FORMAT, buf,
1779         GST_STIME_ARGS (ctx->out_running_time));
1780   }
1781 #endif
1782
1783   ctx->cur_out_buffer = NULL;
1784   GST_SPLITMUX_UNLOCK (splitmux);
1785
1786   /* pending_gap is protected by the STREAM lock */
1787   if (ctx->pending_gap) {
1788     /* If we previously stored a gap event, send it now */
1789     GstPad *peer = gst_pad_get_peer (ctx->srcpad);
1790
1791     GST_DEBUG_OBJECT (splitmux,
1792         "Pad %" GST_PTR_FORMAT " sending pending GAP event", ctx->srcpad);
1793
1794     gst_pad_send_event (peer, ctx->pending_gap);
1795     ctx->pending_gap = NULL;
1796
1797     gst_object_unref (peer);
1798   }
1799
1800   mq_stream_buf_free (buf_info);
1801
1802   GST_PAD_PROBE_INFO_FLOW_RETURN (info) = ret;
1803   return GST_PAD_PROBE_PASS;
1804
1805 beach:
1806   GST_SPLITMUX_UNLOCK (splitmux);
1807   GST_PAD_PROBE_INFO_FLOW_RETURN (info) = ret;
1808   return GST_PAD_PROBE_DROP;
1809 }
1810
1811 static gboolean
1812 resend_sticky (GstPad * pad, GstEvent ** event, GstPad * peer)
1813 {
1814   return gst_pad_send_event (peer, gst_event_ref (*event));
1815 }
1816
1817 static void
1818 unlock_context (MqStreamCtx * ctx, GstSplitMuxSink * splitmux)
1819 {
1820   if (ctx->fragment_block_id > 0) {
1821     gst_pad_remove_probe (ctx->srcpad, ctx->fragment_block_id);
1822     ctx->fragment_block_id = 0;
1823   }
1824 }
1825
1826 static void
1827 restart_context (MqStreamCtx * ctx, GstSplitMuxSink * splitmux)
1828 {
1829   GstPad *peer = gst_pad_get_peer (ctx->srcpad);
1830
1831   gst_pad_sticky_events_foreach (ctx->srcpad,
1832       (GstPadStickyEventsForeachFunction) (resend_sticky), peer);
1833
1834   /* Clear EOS flag if not actually EOS */
1835   ctx->out_eos = GST_PAD_IS_EOS (ctx->srcpad);
1836   ctx->out_eos_async_done = ctx->out_eos;
1837
1838   gst_object_unref (peer);
1839 }
1840
1841 static void
1842 relink_context (MqStreamCtx * ctx, GstSplitMuxSink * splitmux)
1843 {
1844   GstPad *sinkpad, *srcpad, *newpad;
1845   GstPadTemplate *templ;
1846
1847   srcpad = ctx->srcpad;
1848   sinkpad = gst_pad_get_peer (srcpad);
1849
1850   templ = sinkpad->padtemplate;
1851   newpad =
1852       gst_element_request_pad (splitmux->muxer, templ,
1853       GST_PAD_NAME (sinkpad), NULL);
1854
1855   GST_DEBUG_OBJECT (splitmux, "Relinking ctx %p to pad %" GST_PTR_FORMAT, ctx,
1856       newpad);
1857   if (!gst_pad_unlink (srcpad, sinkpad)) {
1858     gst_object_unref (sinkpad);
1859     goto fail;
1860   }
1861   if (gst_pad_link_full (srcpad, newpad,
1862           GST_PAD_LINK_CHECK_NO_RECONFIGURE) != GST_PAD_LINK_OK) {
1863     gst_element_release_request_pad (splitmux->muxer, newpad);
1864     gst_object_unref (sinkpad);
1865     gst_object_unref (newpad);
1866     goto fail;
1867   }
1868   gst_object_unref (newpad);
1869   gst_object_unref (sinkpad);
1870   return;
1871
1872 fail:
1873   GST_ELEMENT_ERROR (splitmux, RESOURCE, SETTINGS,
1874       ("Could not create the new muxer/sink"), NULL);
1875 }
1876
1877 static GstPadProbeReturn
1878 _block_pad (GstPad * pad, GstPadProbeInfo * info, gpointer user_data)
1879 {
1880   return GST_PAD_PROBE_OK;
1881 }
1882
1883 static void
1884 block_context (MqStreamCtx * ctx, GstSplitMuxSink * splitmux)
1885 {
1886   ctx->fragment_block_id =
1887       gst_pad_add_probe (ctx->srcpad, GST_PAD_PROBE_TYPE_BLOCK, _block_pad,
1888       NULL, NULL);
1889 }
1890
1891 static gboolean
1892 _set_property_from_structure (GQuark field_id, const GValue * value,
1893     gpointer user_data)
1894 {
1895   const gchar *property_name = g_quark_to_string (field_id);
1896   GObject *element = G_OBJECT (user_data);
1897
1898   g_object_set_property (element, property_name, value);
1899
1900   return TRUE;
1901 }
1902
1903 static void
1904 _lock_and_set_to_null (GstElement * element, GstSplitMuxSink * splitmux)
1905 {
1906   gst_element_set_locked_state (element, TRUE);
1907   gst_element_set_state (element, GST_STATE_NULL);
1908   GST_LOG_OBJECT (splitmux, "Removing old element %" GST_PTR_FORMAT, element);
1909   gst_bin_remove (GST_BIN (splitmux), element);
1910 }
1911
1912
1913 static void
1914 _send_event (const GValue * value, gpointer user_data)
1915 {
1916   GstPad *pad = g_value_get_object (value);
1917   GstEvent *ev = user_data;
1918
1919   gst_pad_send_event (pad, gst_event_ref (ev));
1920 }
1921
1922 /* Called with lock held when a fragment
1923  * reaches EOS and it is time to restart
1924  * a new fragment
1925  */
1926 static GstFlowReturn
1927 start_next_fragment (GstSplitMuxSink * splitmux, MqStreamCtx * ctx)
1928 {
1929   GstElement *muxer, *sink;
1930
1931   g_assert (ctx->is_reference);
1932
1933   /* 1 change to new file */
1934   splitmux->switching_fragment = TRUE;
1935
1936   /* We need to drop the splitmux lock to acquire the state lock
1937    * here and ensure there's no racy state change going on elsewhere */
1938   muxer = gst_object_ref (splitmux->muxer);
1939   sink = gst_object_ref (splitmux->active_sink);
1940
1941   GST_SPLITMUX_UNLOCK (splitmux);
1942   GST_SPLITMUX_STATE_LOCK (splitmux);
1943
1944   if (splitmux->shutdown) {
1945     GST_DEBUG_OBJECT (splitmux,
1946         "Shutdown requested. Aborting fragment switch.");
1947     GST_SPLITMUX_LOCK (splitmux);
1948     GST_SPLITMUX_STATE_UNLOCK (splitmux);
1949     gst_object_unref (muxer);
1950     gst_object_unref (sink);
1951     return GST_FLOW_FLUSHING;
1952   }
1953
1954   if (splitmux->async_finalize) {
1955     if (splitmux->muxed_out_bytes > 0
1956         || splitmux->fragment_id != splitmux->start_index) {
1957       gchar *newname;
1958       GstElement *new_sink, *new_muxer;
1959
1960       GST_DEBUG_OBJECT (splitmux, "Starting fragment %u",
1961           splitmux->fragment_id);
1962       g_list_foreach (splitmux->contexts, (GFunc) block_context, splitmux);
1963       newname = g_strdup_printf ("sink_%u", splitmux->fragment_id);
1964       GST_SPLITMUX_LOCK (splitmux);
1965       if ((splitmux->sink =
1966               create_element (splitmux, splitmux->sink_factory, newname,
1967                   TRUE)) == NULL)
1968         goto fail;
1969       if (splitmux->sink_preset && GST_IS_PRESET (splitmux->sink))
1970         gst_preset_load_preset (GST_PRESET (splitmux->sink),
1971             splitmux->sink_preset);
1972       if (splitmux->sink_properties)
1973         gst_structure_foreach (splitmux->sink_properties,
1974             _set_property_from_structure, splitmux->sink);
1975       splitmux->active_sink = splitmux->sink;
1976       g_signal_emit (splitmux, signals[SIGNAL_SINK_ADDED], 0, splitmux->sink);
1977       g_free (newname);
1978       newname = g_strdup_printf ("muxer_%u", splitmux->fragment_id);
1979       if ((splitmux->muxer =
1980               create_element (splitmux, splitmux->muxer_factory, newname,
1981                   TRUE)) == NULL)
1982         goto fail;
1983       if (g_object_class_find_property (G_OBJECT_GET_CLASS (splitmux->sink),
1984               "async") != NULL) {
1985         /* async child elements are causing state change races and weird
1986          * failures, so let's try and turn that off */
1987         g_object_set (splitmux->sink, "async", FALSE, NULL);
1988       }
1989       if (splitmux->muxer_preset && GST_IS_PRESET (splitmux->muxer))
1990         gst_preset_load_preset (GST_PRESET (splitmux->muxer),
1991             splitmux->muxer_preset);
1992       if (splitmux->muxer_properties)
1993         gst_structure_foreach (splitmux->muxer_properties,
1994             _set_property_from_structure, splitmux->muxer);
1995       g_signal_emit (splitmux, signals[SIGNAL_MUXER_ADDED], 0, splitmux->muxer);
1996       g_free (newname);
1997       new_sink = splitmux->sink;
1998       new_muxer = splitmux->muxer;
1999       GST_SPLITMUX_UNLOCK (splitmux);
2000       g_list_foreach (splitmux->contexts, (GFunc) relink_context, splitmux);
2001       gst_element_link (new_muxer, new_sink);
2002
2003       if (g_object_get_qdata ((GObject *) sink, EOS_FROM_US)) {
2004         if (GPOINTER_TO_INT (g_object_get_qdata ((GObject *) sink,
2005                     EOS_FROM_US)) == 2) {
2006           _lock_and_set_to_null (muxer, splitmux);
2007           _lock_and_set_to_null (sink, splitmux);
2008         } else {
2009           g_object_set_qdata ((GObject *) sink, EOS_FROM_US,
2010               GINT_TO_POINTER (2));
2011         }
2012       }
2013       gst_object_unref (muxer);
2014       gst_object_unref (sink);
2015       muxer = new_muxer;
2016       sink = new_sink;
2017       gst_object_ref (muxer);
2018       gst_object_ref (sink);
2019     }
2020   } else {
2021
2022     gst_element_set_locked_state (muxer, TRUE);
2023     gst_element_set_locked_state (sink, TRUE);
2024     gst_element_set_state (sink, GST_STATE_NULL);
2025
2026     if (splitmux->reset_muxer) {
2027       gst_element_set_state (muxer, GST_STATE_NULL);
2028     } else {
2029       GstIterator *it = gst_element_iterate_sink_pads (muxer);
2030       GstEvent *ev;
2031       guint32 seqnum;
2032
2033       ev = gst_event_new_flush_start ();
2034       seqnum = gst_event_get_seqnum (ev);
2035       while (gst_iterator_foreach (it, _send_event, ev) == GST_ITERATOR_RESYNC);
2036       gst_event_unref (ev);
2037
2038       gst_iterator_resync (it);
2039
2040       ev = gst_event_new_flush_stop (TRUE);
2041       gst_event_set_seqnum (ev, seqnum);
2042       while (gst_iterator_foreach (it, _send_event, ev) == GST_ITERATOR_RESYNC);
2043       gst_event_unref (ev);
2044
2045       gst_iterator_free (it);
2046     }
2047   }
2048
2049   GST_SPLITMUX_LOCK (splitmux);
2050   set_next_filename (splitmux, ctx);
2051   splitmux->muxed_out_bytes = 0;
2052   GST_SPLITMUX_UNLOCK (splitmux);
2053
2054   if (gst_element_set_state (sink,
2055           GST_STATE_TARGET (splitmux)) == GST_STATE_CHANGE_FAILURE) {
2056     gst_element_set_state (sink, GST_STATE_NULL);
2057     gst_element_set_locked_state (muxer, FALSE);
2058     gst_element_set_locked_state (sink, FALSE);
2059
2060     goto fail_output;
2061   }
2062
2063   if (gst_element_set_state (muxer,
2064           GST_STATE_TARGET (splitmux)) == GST_STATE_CHANGE_FAILURE) {
2065     gst_element_set_state (muxer, GST_STATE_NULL);
2066     gst_element_set_state (sink, GST_STATE_NULL);
2067     gst_element_set_locked_state (muxer, FALSE);
2068     gst_element_set_locked_state (sink, FALSE);
2069     goto fail_muxer;
2070   }
2071
2072   gst_element_set_locked_state (muxer, FALSE);
2073   gst_element_set_locked_state (sink, FALSE);
2074
2075   gst_object_unref (sink);
2076   gst_object_unref (muxer);
2077
2078   GST_SPLITMUX_LOCK (splitmux);
2079   GST_SPLITMUX_STATE_UNLOCK (splitmux);
2080   splitmux->switching_fragment = FALSE;
2081   do_async_done (splitmux);
2082
2083   splitmux->ready_for_output = TRUE;
2084
2085   g_list_foreach (splitmux->contexts, (GFunc) unlock_context, splitmux);
2086   g_list_foreach (splitmux->contexts, (GFunc) restart_context, splitmux);
2087
2088   send_fragment_opened_closed_msg (splitmux, TRUE, sink);
2089
2090   /* FIXME: Is this always the correct next state? */
2091   GST_LOG_OBJECT (splitmux, "Resetting state to AWAITING_COMMAND");
2092   splitmux->output_state = SPLITMUX_OUTPUT_STATE_AWAITING_COMMAND;
2093   GST_SPLITMUX_BROADCAST_OUTPUT (splitmux);
2094   return GST_FLOW_OK;
2095
2096 fail:
2097   gst_object_unref (sink);
2098   gst_object_unref (muxer);
2099
2100   GST_SPLITMUX_LOCK (splitmux);
2101   GST_SPLITMUX_STATE_UNLOCK (splitmux);
2102   GST_ELEMENT_ERROR (splitmux, RESOURCE, SETTINGS,
2103       ("Could not create the new muxer/sink"), NULL);
2104   return GST_FLOW_ERROR;
2105
2106 fail_output:
2107   GST_ELEMENT_ERROR (splitmux, RESOURCE, SETTINGS,
2108       ("Could not start new output sink"), NULL);
2109   gst_object_unref (sink);
2110   gst_object_unref (muxer);
2111
2112   GST_SPLITMUX_LOCK (splitmux);
2113   GST_SPLITMUX_STATE_UNLOCK (splitmux);
2114   splitmux->switching_fragment = FALSE;
2115   return GST_FLOW_ERROR;
2116
2117 fail_muxer:
2118   GST_ELEMENT_ERROR (splitmux, RESOURCE, SETTINGS,
2119       ("Could not start new muxer"), NULL);
2120   gst_object_unref (sink);
2121   gst_object_unref (muxer);
2122
2123   GST_SPLITMUX_LOCK (splitmux);
2124   GST_SPLITMUX_STATE_UNLOCK (splitmux);
2125   splitmux->switching_fragment = FALSE;
2126   return GST_FLOW_ERROR;
2127 }
2128
2129 static void
2130 bus_handler (GstBin * bin, GstMessage * message)
2131 {
2132   GstSplitMuxSink *splitmux = GST_SPLITMUX_SINK (bin);
2133
2134   switch (GST_MESSAGE_TYPE (message)) {
2135     case GST_MESSAGE_EOS:{
2136       /* If the state is draining out the current file, drop this EOS */
2137       GstElement *sink;
2138
2139       sink = GST_ELEMENT (GST_MESSAGE_SRC (message));
2140       GST_SPLITMUX_LOCK (splitmux);
2141
2142       send_fragment_opened_closed_msg (splitmux, FALSE, sink);
2143
2144       if (splitmux->async_finalize) {
2145
2146         if (g_object_get_qdata ((GObject *) sink, EOS_FROM_US)) {
2147           if (GPOINTER_TO_INT (g_object_get_qdata ((GObject *) sink,
2148                       EOS_FROM_US)) == 2) {
2149             GstElement *muxer;
2150             GstPad *sinksink, *muxersrc;
2151
2152             sinksink = gst_element_get_static_pad (sink, "sink");
2153             muxersrc = gst_pad_get_peer (sinksink);
2154             muxer = gst_pad_get_parent_element (muxersrc);
2155             gst_object_unref (sinksink);
2156             gst_object_unref (muxersrc);
2157
2158             gst_element_call_async (muxer,
2159                 (GstElementCallAsyncFunc) _lock_and_set_to_null,
2160                 gst_object_ref (splitmux), gst_object_unref);
2161             gst_element_call_async (sink,
2162                 (GstElementCallAsyncFunc) _lock_and_set_to_null,
2163                 gst_object_ref (splitmux), gst_object_unref);
2164             gst_object_unref (muxer);
2165           } else {
2166             g_object_set_qdata ((GObject *) sink, EOS_FROM_US,
2167                 GINT_TO_POINTER (2));
2168           }
2169           GST_DEBUG_OBJECT (splitmux,
2170               "Caught async EOS from previous muxer+sink. Dropping.");
2171           /* We forward the EOS so that it gets aggregated as normal. If the sink
2172            * finishes and is removed before the end, it will be de-aggregated */
2173           gst_message_unref (message);
2174           GST_SPLITMUX_UNLOCK (splitmux);
2175           return;
2176         }
2177       } else if (splitmux->output_state == SPLITMUX_OUTPUT_STATE_ENDING_STREAM) {
2178         GST_DEBUG_OBJECT (splitmux,
2179             "Passing EOS message. Output state %d max_out_running_time %"
2180             GST_STIME_FORMAT, splitmux->output_state,
2181             GST_STIME_ARGS (splitmux->max_out_running_time));
2182       } else {
2183         GST_DEBUG_OBJECT (splitmux, "Caught EOS at end of fragment, dropping");
2184         splitmux->output_state = SPLITMUX_OUTPUT_STATE_START_NEXT_FILE;
2185         GST_SPLITMUX_BROADCAST_OUTPUT (splitmux);
2186
2187         gst_message_unref (message);
2188         GST_SPLITMUX_UNLOCK (splitmux);
2189         return;
2190       }
2191       GST_SPLITMUX_UNLOCK (splitmux);
2192       break;
2193     }
2194     case GST_MESSAGE_ASYNC_START:
2195     case GST_MESSAGE_ASYNC_DONE:
2196       /* Ignore state changes from our children while switching */
2197       GST_SPLITMUX_LOCK (splitmux);
2198       if (splitmux->switching_fragment) {
2199         if (GST_MESSAGE_SRC (message) == (GstObject *) splitmux->active_sink
2200             || GST_MESSAGE_SRC (message) == (GstObject *) splitmux->muxer) {
2201           GST_LOG_OBJECT (splitmux,
2202               "Ignoring state change from child %" GST_PTR_FORMAT
2203               " while switching", GST_MESSAGE_SRC (message));
2204           gst_message_unref (message);
2205           GST_SPLITMUX_UNLOCK (splitmux);
2206           return;
2207         }
2208       }
2209       GST_SPLITMUX_UNLOCK (splitmux);
2210       break;
2211     case GST_MESSAGE_WARNING:
2212     {
2213       GError *gerror = NULL;
2214
2215       gst_message_parse_warning (message, &gerror, NULL);
2216
2217       if (g_error_matches (gerror, GST_STREAM_ERROR, GST_STREAM_ERROR_FORMAT)) {
2218         GList *item;
2219         gboolean caps_change = FALSE;
2220
2221         GST_SPLITMUX_LOCK (splitmux);
2222
2223         for (item = splitmux->contexts; item; item = item->next) {
2224           MqStreamCtx *ctx = item->data;
2225
2226           if (ctx->caps_change) {
2227             caps_change = TRUE;
2228             break;
2229           }
2230         }
2231
2232         GST_SPLITMUX_UNLOCK (splitmux);
2233
2234         if (caps_change) {
2235           GST_LOG_OBJECT (splitmux,
2236               "Ignoring warning change from child %" GST_PTR_FORMAT
2237               " while switching caps", GST_MESSAGE_SRC (message));
2238           gst_message_unref (message);
2239           return;
2240         }
2241       }
2242       break;
2243     }
2244     default:
2245       break;
2246   }
2247
2248   GST_BIN_CLASS (parent_class)->handle_message (bin, message);
2249 }
2250
2251 static void
2252 ctx_set_unblock (MqStreamCtx * ctx)
2253 {
2254   ctx->need_unblock = TRUE;
2255 }
2256
2257 static gboolean
2258 need_new_fragment (GstSplitMuxSink * splitmux,
2259     GstClockTime queued_time, GstClockTime queued_gop_time,
2260     guint64 queued_bytes)
2261 {
2262   guint64 thresh_bytes;
2263   GstClockTime thresh_time;
2264   gboolean check_robust_muxing;
2265   GstClockTime time_to_split = GST_CLOCK_TIME_NONE;
2266   GstClockTime *ptr_to_time;
2267
2268   GST_OBJECT_LOCK (splitmux);
2269   thresh_bytes = splitmux->threshold_bytes;
2270   thresh_time = splitmux->threshold_time;
2271   ptr_to_time = (GstClockTime *)
2272       gst_queue_array_peek_head_struct (splitmux->times_to_split);
2273   if (ptr_to_time)
2274     time_to_split = *ptr_to_time;
2275   check_robust_muxing = splitmux->use_robust_muxing
2276       && splitmux->muxer_has_reserved_props;
2277   GST_OBJECT_UNLOCK (splitmux);
2278
2279   /* Have we muxed at least one thing from the reference
2280    * stream into the file? If not, no other streams can have
2281    * either */
2282   if (splitmux->fragment_reference_bytes <= 0) {
2283     GST_TRACE_OBJECT (splitmux,
2284         "Not ready to split - nothing muxed on the reference stream");
2285     return FALSE;
2286   }
2287
2288   /* User told us to split now */
2289   if (g_atomic_int_get (&(splitmux->do_split_next_gop)) == TRUE) {
2290     GST_TRACE_OBJECT (splitmux, "Forcing because split_next_gop is set");
2291     return TRUE;
2292   }
2293
2294   /* User told us to split at this running time */
2295   if (splitmux->gop_start_time >= time_to_split) {
2296     GST_OBJECT_LOCK (splitmux);
2297     /* Dequeue running time */
2298     gst_queue_array_pop_head_struct (splitmux->times_to_split);
2299     /* Empty any running times after this that are past now */
2300     ptr_to_time = gst_queue_array_peek_head_struct (splitmux->times_to_split);
2301     while (ptr_to_time) {
2302       time_to_split = *ptr_to_time;
2303       if (splitmux->gop_start_time < time_to_split) {
2304         break;
2305       }
2306       gst_queue_array_pop_head_struct (splitmux->times_to_split);
2307       ptr_to_time = gst_queue_array_peek_head_struct (splitmux->times_to_split);
2308     }
2309     GST_TRACE_OBJECT (splitmux,
2310         "GOP start time %" GST_STIME_FORMAT " is after requested split point %"
2311         GST_STIME_FORMAT, GST_STIME_ARGS (splitmux->gop_start_time),
2312         GST_STIME_ARGS (time_to_split));
2313     GST_OBJECT_UNLOCK (splitmux);
2314     return TRUE;
2315   }
2316
2317   if (thresh_bytes > 0 && queued_bytes > thresh_bytes) {
2318     GST_TRACE_OBJECT (splitmux,
2319         "queued bytes %" G_GUINT64_FORMAT " overruns byte limit", queued_bytes);
2320     return TRUE;                /* Would overrun byte limit */
2321   }
2322
2323   if (thresh_time > 0 && queued_time > thresh_time) {
2324     GST_TRACE_OBJECT (splitmux,
2325         "queued time %" GST_STIME_FORMAT " overruns time limit",
2326         GST_STIME_ARGS (queued_time));
2327     return TRUE;                /* Would overrun time limit */
2328   }
2329
2330   if (splitmux->tc_interval &&
2331       GST_CLOCK_TIME_IS_VALID (splitmux->next_fragment_start_tc_time) &&
2332       splitmux->reference_ctx->in_running_time >
2333       splitmux->next_fragment_start_tc_time + 5 * GST_USECOND) {
2334     GST_TRACE_OBJECT (splitmux,
2335         "in running time %" GST_STIME_FORMAT " overruns time limit %"
2336         GST_TIME_FORMAT,
2337         GST_STIME_ARGS (splitmux->reference_ctx->in_running_time),
2338         GST_TIME_ARGS (splitmux->next_fragment_start_tc_time));
2339     return TRUE;
2340   }
2341
2342   if (check_robust_muxing) {
2343     GstClockTime mux_reserved_remain;
2344
2345     g_object_get (splitmux->muxer,
2346         "reserved-duration-remaining", &mux_reserved_remain, NULL);
2347
2348     GST_LOG_OBJECT (splitmux,
2349         "Muxer robust muxing report - %" G_GUINT64_FORMAT
2350         " remaining. New GOP would enqueue %" G_GUINT64_FORMAT,
2351         mux_reserved_remain, queued_gop_time);
2352
2353     if (queued_gop_time >= mux_reserved_remain) {
2354       GST_INFO_OBJECT (splitmux,
2355           "File is about to run out of header room - %" G_GUINT64_FORMAT
2356           " remaining. New GOP would enqueue %" G_GUINT64_FORMAT
2357           ". Switching to new file", mux_reserved_remain, queued_gop_time);
2358       return TRUE;
2359     }
2360   }
2361
2362   /* Continue and mux this GOP */
2363   return FALSE;
2364 }
2365
2366 /* probably we want to add this API? */
2367 static void
2368 video_time_code_replace (GstVideoTimeCode ** old_tc, GstVideoTimeCode * new_tc)
2369 {
2370   GstVideoTimeCode *timecode = NULL;
2371
2372   g_return_if_fail (old_tc != NULL);
2373
2374   if (*old_tc == new_tc)
2375     return;
2376
2377   if (new_tc)
2378     timecode = gst_video_time_code_copy (new_tc);
2379
2380   if (*old_tc)
2381     gst_video_time_code_free (*old_tc);
2382
2383   *old_tc = timecode;
2384 }
2385
2386 /* Called with splitmux lock held */
2387 /* Called when entering ProcessingCompleteGop state
2388  * Assess if mq contents overflowed the current file
2389  *   -> If yes, need to switch to new file
2390  *   -> if no, set max_out_running_time to let this GOP in and
2391  *      go to COLLECTING_GOP_START state
2392  */
2393 static void
2394 handle_gathered_gop (GstSplitMuxSink * splitmux)
2395 {
2396   guint64 queued_bytes;
2397   GstClockTimeDiff queued_time = 0;
2398   GstClockTimeDiff queued_gop_time = 0;
2399   GstClockTimeDiff new_out_ts = splitmux->reference_ctx->in_running_time;
2400   SplitMuxOutputCommand *cmd;
2401
2402   /* Assess if the multiqueue contents overflowed the current file */
2403   /* When considering if a newly gathered GOP overflows
2404    * the time limit for the file, only consider the running time of the
2405    * reference stream. Other streams might have run ahead a little bit,
2406    * but extra pieces won't be released to the muxer beyond the reference
2407    * stream cut-off anyway - so it forms the limit. */
2408   queued_bytes = splitmux->fragment_total_bytes + splitmux->gop_total_bytes;
2409   queued_time = splitmux->reference_ctx->in_running_time;
2410   /* queued_gop_time tracks how much unwritten data there is waiting to
2411    * be written to this fragment including this GOP */
2412   if (splitmux->reference_ctx->out_running_time != GST_CLOCK_STIME_NONE)
2413     queued_gop_time =
2414         splitmux->reference_ctx->in_running_time -
2415         splitmux->reference_ctx->out_running_time;
2416   else
2417     queued_gop_time =
2418         splitmux->reference_ctx->in_running_time - splitmux->gop_start_time;
2419
2420   GST_LOG_OBJECT (splitmux, " queued_bytes %" G_GUINT64_FORMAT, queued_bytes);
2421   GST_LOG_OBJECT (splitmux, "mq at TS %" GST_STIME_FORMAT
2422       " bytes %" G_GUINT64_FORMAT " in running time %" GST_STIME_FORMAT
2423       " gop start time %" GST_STIME_FORMAT,
2424       GST_STIME_ARGS (queued_time), queued_bytes,
2425       GST_STIME_ARGS (splitmux->reference_ctx->in_running_time),
2426       GST_STIME_ARGS (splitmux->gop_start_time));
2427
2428   if (queued_gop_time < 0)
2429     goto error_gop_duration;
2430
2431   if (queued_time < splitmux->fragment_start_time)
2432     goto error_queued_time;
2433
2434   queued_time -= splitmux->fragment_start_time;
2435   if (queued_time < queued_gop_time)
2436     queued_gop_time = queued_time;
2437
2438   /* Expand queued bytes estimate by muxer overhead */
2439   queued_bytes += (queued_bytes * splitmux->mux_overhead);
2440
2441   /* Check for overrun - have we output at least one byte and overrun
2442    * either threshold? */
2443   if (need_new_fragment (splitmux, queued_time, queued_gop_time, queued_bytes)) {
2444     if (splitmux->async_finalize) {
2445       GstClockTime *sink_running_time = g_new (GstClockTime, 1);
2446       *sink_running_time = splitmux->reference_ctx->out_running_time;
2447       g_object_set_qdata_full (G_OBJECT (splitmux->sink),
2448           RUNNING_TIME, sink_running_time, g_free);
2449     }
2450     g_atomic_int_set (&(splitmux->do_split_next_gop), FALSE);
2451     /* Tell the output side to start a new fragment */
2452     GST_INFO_OBJECT (splitmux,
2453         "This GOP (dur %" GST_STIME_FORMAT
2454         ") would overflow the fragment, Sending start_new_fragment cmd",
2455         GST_STIME_ARGS (splitmux->reference_ctx->in_running_time -
2456             splitmux->gop_start_time));
2457     cmd = out_cmd_buf_new ();
2458     cmd->start_new_fragment = TRUE;
2459     g_queue_push_head (&splitmux->out_cmd_q, cmd);
2460     GST_SPLITMUX_BROADCAST_OUTPUT (splitmux);
2461
2462     new_out_ts = splitmux->reference_ctx->in_running_time;
2463     splitmux->fragment_start_time = splitmux->gop_start_time;
2464     splitmux->fragment_total_bytes = 0;
2465     splitmux->fragment_reference_bytes = 0;
2466
2467     if (splitmux->tc_interval) {
2468       video_time_code_replace (&splitmux->fragment_start_tc,
2469           splitmux->gop_start_tc);
2470       splitmux->next_fragment_start_tc_time =
2471           calculate_next_max_timecode (splitmux, splitmux->fragment_start_tc,
2472           splitmux->fragment_start_time, NULL);
2473       if (!GST_CLOCK_TIME_IS_VALID (splitmux->next_fragment_start_tc_time)) {
2474         GST_WARNING_OBJECT (splitmux,
2475             "Couldn't calculate next fragment start time for timecode mode");
2476         /* shouldn't happen, but reset all and try again with next buffers */
2477         gst_splitmux_reset_timecode (splitmux);
2478       }
2479     }
2480   }
2481
2482   /* And set up to collect the next GOP */
2483   if (!splitmux->reference_ctx->in_eos) {
2484     splitmux->input_state = SPLITMUX_INPUT_STATE_COLLECTING_GOP_START;
2485     splitmux->gop_start_time = new_out_ts;
2486     if (splitmux->tc_interval)
2487       video_time_code_replace (&splitmux->gop_start_tc, splitmux->in_tc);
2488   } else {
2489     /* This is probably already the current state, but just in case: */
2490     splitmux->input_state = SPLITMUX_INPUT_STATE_FINISHING_UP;
2491     new_out_ts = GST_CLOCK_STIME_NONE;  /* EOS runs until forever */
2492   }
2493
2494   /* And wake all input contexts to send a wake-up event */
2495   g_list_foreach (splitmux->contexts, (GFunc) ctx_set_unblock, NULL);
2496   GST_SPLITMUX_BROADCAST_INPUT (splitmux);
2497
2498   /* Now either way - either there was no overflow, or we requested a new fragment: release this GOP */
2499   splitmux->fragment_total_bytes += splitmux->gop_total_bytes;
2500   splitmux->fragment_reference_bytes += splitmux->gop_reference_bytes;
2501
2502   if (splitmux->gop_total_bytes > 0) {
2503     GST_LOG_OBJECT (splitmux,
2504         "Releasing GOP to output. Bytes in fragment now %" G_GUINT64_FORMAT
2505         " time %" GST_STIME_FORMAT,
2506         splitmux->fragment_total_bytes, GST_STIME_ARGS (queued_time));
2507
2508     /* Send this GOP to the output command queue */
2509     cmd = out_cmd_buf_new ();
2510     cmd->start_new_fragment = FALSE;
2511     cmd->max_output_ts = new_out_ts;
2512     GST_LOG_OBJECT (splitmux, "Sending GOP cmd to output for TS %"
2513         GST_STIME_FORMAT, GST_STIME_ARGS (new_out_ts));
2514     g_queue_push_head (&splitmux->out_cmd_q, cmd);
2515
2516     GST_SPLITMUX_BROADCAST_OUTPUT (splitmux);
2517   }
2518
2519   splitmux->gop_total_bytes = 0;
2520   splitmux->gop_reference_bytes = 0;
2521   return;
2522
2523 error_gop_duration:
2524   GST_ELEMENT_ERROR (splitmux,
2525       STREAM, FAILED, ("Timestamping error on input streams"),
2526       ("Queued GOP time is negative %" GST_STIME_FORMAT,
2527           GST_STIME_ARGS (queued_gop_time)));
2528   return;
2529 error_queued_time:
2530   GST_ELEMENT_ERROR (splitmux,
2531       STREAM, FAILED, ("Timestamping error on input streams"),
2532       ("Queued time is negative. Input went backwards. queued_time - %"
2533           GST_STIME_FORMAT, GST_STIME_ARGS (queued_time)));
2534   return;
2535 }
2536
2537 /* Called with splitmux lock held */
2538 /* Called from each input pad when it is has all the pieces
2539  * for a GOP or EOS, starting with the reference pad which has set the
2540  * splitmux->max_in_running_time
2541  */
2542 static void
2543 check_completed_gop (GstSplitMuxSink * splitmux, MqStreamCtx * ctx)
2544 {
2545   GList *cur;
2546   GstEvent *event;
2547
2548   /* On ENDING_FILE, the reference stream sends a command to start a new
2549    * fragment, then releases the GOP for output in the new fragment.
2550    *  If some streams received no buffer during the last GOP that overran,
2551    * because its next buffer has a timestamp bigger than
2552    * ctx->max_in_running_time, its queue is empty. In that case the only
2553    * way to wakeup the output thread is by injecting an event in the
2554    * queue. This usually happen with subtitle streams.
2555    * See https://bugzilla.gnome.org/show_bug.cgi?id=763711. */
2556   if (ctx->need_unblock) {
2557     GST_LOG_OBJECT (ctx->sinkpad, "Sending splitmuxsink-unblock event");
2558     event = gst_event_new_custom (GST_EVENT_CUSTOM_DOWNSTREAM |
2559         GST_EVENT_TYPE_SERIALIZED,
2560         gst_structure_new ("splitmuxsink-unblock", "timestamp",
2561             G_TYPE_INT64, splitmux->max_in_running_time, NULL));
2562
2563     GST_SPLITMUX_UNLOCK (splitmux);
2564     gst_pad_send_event (ctx->sinkpad, event);
2565     GST_SPLITMUX_LOCK (splitmux);
2566
2567     ctx->need_unblock = FALSE;
2568     GST_SPLITMUX_BROADCAST_INPUT (splitmux);
2569     /* state may have changed while we were unlocked. Loop again if so */
2570     if (splitmux->input_state != SPLITMUX_INPUT_STATE_WAITING_GOP_COLLECT)
2571       return;
2572   }
2573
2574   do {
2575     if (splitmux->input_state == SPLITMUX_INPUT_STATE_WAITING_GOP_COLLECT) {
2576       gboolean ready = TRUE;
2577
2578       /* Iterate each pad, and check that the input running time is at least
2579        * up to the reference running time, and if so handle the collected GOP */
2580       GST_LOG_OBJECT (splitmux, "Checking GOP collected, Max in running time %"
2581           GST_STIME_FORMAT " ctx %p",
2582           GST_STIME_ARGS (splitmux->max_in_running_time), ctx);
2583       for (cur = g_list_first (splitmux->contexts); cur != NULL;
2584           cur = g_list_next (cur)) {
2585         MqStreamCtx *tmpctx = (MqStreamCtx *) (cur->data);
2586
2587         GST_LOG_OBJECT (splitmux,
2588             "Context %p sink pad %" GST_PTR_FORMAT " @ TS %" GST_STIME_FORMAT
2589             " EOS %d", tmpctx, tmpctx->sinkpad,
2590             GST_STIME_ARGS (tmpctx->in_running_time), tmpctx->in_eos);
2591
2592         if (splitmux->max_in_running_time != GST_CLOCK_STIME_NONE &&
2593             tmpctx->in_running_time < splitmux->max_in_running_time &&
2594             !tmpctx->in_eos) {
2595           GST_LOG_OBJECT (splitmux,
2596               "Context %p sink pad %" GST_PTR_FORMAT " not ready. We'll sleep",
2597               tmpctx, tmpctx->sinkpad);
2598           ready = FALSE;
2599           break;
2600         }
2601       }
2602       if (ready) {
2603         GST_DEBUG_OBJECT (splitmux,
2604             "Collected GOP is complete. Processing (ctx %p)", ctx);
2605         /* All pads have a complete GOP, release it into the multiqueue */
2606         handle_gathered_gop (splitmux);
2607
2608         /* The user has requested a split, we can split now that the previous GOP
2609          * has been collected to the correct location */
2610         if (g_atomic_int_compare_and_exchange (&(splitmux->split_requested),
2611                 TRUE, FALSE)) {
2612           g_atomic_int_set (&(splitmux->do_split_next_gop), TRUE);
2613         }
2614       }
2615     }
2616
2617     /* If upstream reached EOS we are not expecting more data, no need to wait
2618      * here. */
2619     if (ctx->in_eos)
2620       return;
2621
2622     if (splitmux->input_state == SPLITMUX_INPUT_STATE_WAITING_GOP_COLLECT &&
2623         !ctx->flushing &&
2624         (ctx->in_running_time >= splitmux->max_in_running_time) &&
2625         (splitmux->max_in_running_time != GST_CLOCK_STIME_NONE)) {
2626       /* Some pad is not yet ready, or GOP is being pushed
2627        * either way, sleep and wait to get woken */
2628       GST_LOG_OBJECT (splitmux, "Sleeping for GOP collection (ctx %p)", ctx);
2629       GST_SPLITMUX_WAIT_INPUT (splitmux);
2630       GST_LOG_OBJECT (splitmux, "Done waiting for complete GOP (ctx %p)", ctx);
2631     } else {
2632       /* This pad is not ready or the state changed - break out and get another
2633        * buffer / event */
2634       break;
2635     }
2636   } while (splitmux->input_state == SPLITMUX_INPUT_STATE_WAITING_GOP_COLLECT);
2637 }
2638
2639 static GstPadProbeReturn
2640 handle_mq_input (GstPad * pad, GstPadProbeInfo * info, MqStreamCtx * ctx)
2641 {
2642   GstSplitMuxSink *splitmux = ctx->splitmux;
2643   GstFlowReturn ret = GST_FLOW_OK;
2644   GstBuffer *buf;
2645   MqStreamBuf *buf_info = NULL;
2646   GstClockTime ts;
2647   gboolean loop_again;
2648   gboolean keyframe = FALSE;
2649
2650   GST_LOG_OBJECT (pad, "Fired probe type 0x%x", info->type);
2651
2652   /* FIXME: Handle buffer lists, until then make it clear they won't work */
2653   if (info->type & GST_PAD_PROBE_TYPE_BUFFER_LIST) {
2654     g_warning ("Buffer list handling not implemented");
2655     return GST_PAD_PROBE_DROP;
2656   }
2657   if (info->type & GST_PAD_PROBE_TYPE_EVENT_DOWNSTREAM ||
2658       info->type & GST_PAD_PROBE_TYPE_EVENT_FLUSH) {
2659     GstEvent *event = gst_pad_probe_info_get_event (info);
2660
2661     GST_LOG_OBJECT (pad, "Event %" GST_PTR_FORMAT, event);
2662
2663     switch (GST_EVENT_TYPE (event)) {
2664       case GST_EVENT_SEGMENT:
2665         gst_event_copy_segment (event, &ctx->in_segment);
2666         break;
2667       case GST_EVENT_FLUSH_STOP:
2668         GST_SPLITMUX_LOCK (splitmux);
2669         gst_segment_init (&ctx->in_segment, GST_FORMAT_UNDEFINED);
2670         ctx->in_eos = FALSE;
2671         ctx->in_running_time = GST_CLOCK_STIME_NONE;
2672         GST_SPLITMUX_UNLOCK (splitmux);
2673         break;
2674       case GST_EVENT_EOS:
2675         GST_SPLITMUX_LOCK (splitmux);
2676         ctx->in_eos = TRUE;
2677
2678         if (splitmux->input_state == SPLITMUX_INPUT_STATE_STOPPED) {
2679           ret = GST_FLOW_FLUSHING;
2680           goto beach;
2681         }
2682
2683         if (ctx->is_reference) {
2684           GST_INFO_OBJECT (splitmux, "Got Reference EOS. Finishing up");
2685           /* check_completed_gop will act as if this is a new keyframe with infinite timestamp */
2686           splitmux->input_state = SPLITMUX_INPUT_STATE_WAITING_GOP_COLLECT;
2687           /* Wake up other input pads to collect this GOP */
2688           GST_SPLITMUX_BROADCAST_INPUT (splitmux);
2689           check_completed_gop (splitmux, ctx);
2690         } else if (splitmux->input_state ==
2691             SPLITMUX_INPUT_STATE_WAITING_GOP_COLLECT) {
2692           /* If we are waiting for a GOP to be completed (ie, for aux
2693            * pads to catch up), then this pad is complete, so check
2694            * if the whole GOP is.
2695            */
2696           check_completed_gop (splitmux, ctx);
2697         }
2698         GST_SPLITMUX_UNLOCK (splitmux);
2699         break;
2700       case GST_EVENT_GAP:{
2701         GstClockTime gap_ts;
2702         GstClockTimeDiff rtime;
2703
2704         gst_event_parse_gap (event, &gap_ts, NULL);
2705         if (gap_ts == GST_CLOCK_TIME_NONE)
2706           break;
2707
2708         GST_SPLITMUX_LOCK (splitmux);
2709
2710         if (splitmux->input_state == SPLITMUX_INPUT_STATE_STOPPED) {
2711           ret = GST_FLOW_FLUSHING;
2712           goto beach;
2713         }
2714         rtime = my_segment_to_running_time (&ctx->in_segment, gap_ts);
2715
2716         GST_LOG_OBJECT (pad, "Have GAP w/ ts %" GST_STIME_FORMAT,
2717             GST_STIME_ARGS (rtime));
2718
2719         if (ctx->is_reference
2720             && splitmux->fragment_start_time == GST_CLOCK_STIME_NONE) {
2721           splitmux->gop_start_time = splitmux->fragment_start_time = rtime;
2722           GST_LOG_OBJECT (splitmux, "Mux start time now %" GST_STIME_FORMAT,
2723               GST_STIME_ARGS (splitmux->fragment_start_time));
2724           /* Also take this as the first start time when starting up,
2725            * so that we start counting overflow from the first frame */
2726           if (!GST_CLOCK_STIME_IS_VALID (splitmux->max_in_running_time))
2727             splitmux->max_in_running_time = splitmux->fragment_start_time;
2728         }
2729
2730         GST_SPLITMUX_UNLOCK (splitmux);
2731         break;
2732       }
2733       default:
2734         break;
2735     }
2736     return GST_PAD_PROBE_PASS;
2737   } else if (info->type & GST_PAD_PROBE_TYPE_QUERY_DOWNSTREAM) {
2738     switch (GST_QUERY_TYPE (GST_QUERY (info->data))) {
2739       case GST_QUERY_ALLOCATION:
2740         return GST_PAD_PROBE_DROP;
2741       default:
2742         return GST_PAD_PROBE_PASS;
2743     }
2744   }
2745
2746   buf = gst_pad_probe_info_get_buffer (info);
2747   buf_info = mq_stream_buf_new ();
2748
2749   if (GST_BUFFER_PTS_IS_VALID (buf))
2750     ts = GST_BUFFER_PTS (buf);
2751   else
2752     ts = GST_BUFFER_DTS (buf);
2753
2754   GST_LOG_OBJECT (pad, "Buffer TS is %" GST_TIME_FORMAT, GST_TIME_ARGS (ts));
2755
2756   GST_SPLITMUX_LOCK (splitmux);
2757
2758   if (splitmux->input_state == SPLITMUX_INPUT_STATE_STOPPED) {
2759     ret = GST_FLOW_FLUSHING;
2760     goto beach;
2761   }
2762
2763   /* If this buffer has a timestamp, advance the input timestamp of the
2764    * stream */
2765   if (GST_CLOCK_TIME_IS_VALID (ts)) {
2766     GstClockTimeDiff running_time =
2767         my_segment_to_running_time (&ctx->in_segment, ts);
2768
2769     GST_LOG_OBJECT (pad, "Buffer running TS is %" GST_STIME_FORMAT,
2770         GST_STIME_ARGS (running_time));
2771
2772     if (GST_CLOCK_STIME_IS_VALID (running_time)
2773         && running_time > ctx->in_running_time)
2774       ctx->in_running_time = running_time;
2775   }
2776
2777   /* Try to make sure we have a valid running time */
2778   if (!GST_CLOCK_STIME_IS_VALID (ctx->in_running_time)) {
2779     ctx->in_running_time =
2780         my_segment_to_running_time (&ctx->in_segment, ctx->in_segment.start);
2781   }
2782
2783   GST_LOG_OBJECT (pad, "in running time now %" GST_STIME_FORMAT,
2784       GST_STIME_ARGS (ctx->in_running_time));
2785
2786   buf_info->run_ts = ctx->in_running_time;
2787   buf_info->buf_size = gst_buffer_get_size (buf);
2788   buf_info->duration = GST_BUFFER_DURATION (buf);
2789
2790   if (ctx->is_reference) {
2791     /* initialize fragment_start_time */
2792     if (splitmux->fragment_start_time == GST_CLOCK_STIME_NONE) {
2793       splitmux->gop_start_time = splitmux->fragment_start_time =
2794           buf_info->run_ts;
2795       GST_LOG_OBJECT (splitmux, "Mux start time now %" GST_STIME_FORMAT,
2796           GST_STIME_ARGS (splitmux->fragment_start_time));
2797
2798       /* Also take this as the first start time when starting up,
2799        * so that we start counting overflow from the first frame */
2800       if (!GST_CLOCK_STIME_IS_VALID (splitmux->max_in_running_time))
2801         splitmux->max_in_running_time = splitmux->fragment_start_time;
2802     }
2803
2804     if (splitmux->tc_interval) {
2805       GstVideoTimeCodeMeta *tc_meta = gst_buffer_get_video_time_code_meta (buf);
2806       if (tc_meta) {
2807         video_time_code_replace (&splitmux->in_tc, &tc_meta->tc);
2808
2809         if (!splitmux->fragment_start_tc) {
2810           /* also initialize fragment_start_tc */
2811           video_time_code_replace (&splitmux->gop_start_tc, &tc_meta->tc);
2812           video_time_code_replace (&splitmux->fragment_start_tc, &tc_meta->tc);
2813
2814           splitmux->next_fragment_start_tc_time =
2815               calculate_next_max_timecode (splitmux, splitmux->in_tc,
2816               ctx->in_running_time, NULL);
2817           GST_DEBUG_OBJECT (splitmux, "Initialize next fragment start tc time %"
2818               GST_TIME_FORMAT,
2819               GST_TIME_ARGS (splitmux->next_fragment_start_tc_time));
2820         }
2821       }
2822     }
2823
2824     /* Check whether we need to request next keyframe depending on
2825      * current running time */
2826     if (!GST_BUFFER_FLAG_IS_SET (buf, GST_BUFFER_FLAG_DELTA_UNIT) &&
2827         request_next_keyframe (splitmux, buf, ctx->in_running_time) == FALSE) {
2828       GST_WARNING_OBJECT (splitmux,
2829           "Could not request a keyframe. Files may not split at the exact location they should");
2830     }
2831   }
2832
2833   GST_DEBUG_OBJECT (pad, "Buf TS %" GST_STIME_FORMAT
2834       " total GOP bytes %" G_GUINT64_FORMAT,
2835       GST_STIME_ARGS (buf_info->run_ts), splitmux->gop_total_bytes);
2836
2837   loop_again = TRUE;
2838   do {
2839     if (ctx->flushing)
2840       break;
2841
2842     switch (splitmux->input_state) {
2843       case SPLITMUX_INPUT_STATE_COLLECTING_GOP_START:
2844         if (ctx->is_releasing) {
2845           /* The pad belonging to this context is being released */
2846           GST_WARNING_OBJECT (pad, "Pad is being released while the muxer is "
2847               "running. Data might not drain correctly");
2848           loop_again = FALSE;
2849         } else if (ctx->is_reference) {
2850           /* This is the reference context. If it's a keyframe,
2851            * it marks the start of a new GOP and we should wait in
2852            * check_completed_gop before continuing, but either way
2853            * (keyframe or no, we'll pass this buffer through after
2854            * so set loop_again to FALSE */
2855           loop_again = FALSE;
2856
2857           if (GST_BUFFER_FLAG_IS_SET (buf, GST_BUFFER_FLAG_DELTA_UNIT)) {
2858             /* Allow other input pads to catch up to here too */
2859             splitmux->max_in_running_time = ctx->in_running_time;
2860             GST_LOG_OBJECT (splitmux,
2861                 "Max in running time now %" GST_TIME_FORMAT,
2862                 GST_TIME_ARGS (splitmux->max_in_running_time));
2863             GST_SPLITMUX_BROADCAST_INPUT (splitmux);
2864             break;
2865           }
2866           GST_INFO_OBJECT (pad,
2867               "Have keyframe with running time %" GST_STIME_FORMAT,
2868               GST_STIME_ARGS (ctx->in_running_time));
2869           keyframe = TRUE;
2870           splitmux->input_state = SPLITMUX_INPUT_STATE_WAITING_GOP_COLLECT;
2871           splitmux->max_in_running_time = ctx->in_running_time;
2872           GST_LOG_OBJECT (splitmux, "Max in running time now %" GST_TIME_FORMAT,
2873               GST_TIME_ARGS (splitmux->max_in_running_time));
2874           /* Wake up other input pads to collect this GOP */
2875           GST_SPLITMUX_BROADCAST_INPUT (splitmux);
2876           check_completed_gop (splitmux, ctx);
2877         } else {
2878           /* Pass this buffer if the reference ctx is far enough ahead */
2879           if (ctx->in_running_time < splitmux->max_in_running_time) {
2880             loop_again = FALSE;
2881             break;
2882           }
2883
2884           /* We're still waiting for a keyframe on the reference pad, sleep */
2885           GST_LOG_OBJECT (pad, "Sleeping for GOP start");
2886           GST_SPLITMUX_WAIT_INPUT (splitmux);
2887           GST_LOG_OBJECT (pad,
2888               "Done sleeping for GOP start input state now %d",
2889               splitmux->input_state);
2890         }
2891         break;
2892       case SPLITMUX_INPUT_STATE_WAITING_GOP_COLLECT:{
2893         /* We're collecting a GOP. If this is the reference context,
2894          * we need to check if this is a keyframe that marks the start
2895          * of the next GOP. If it is, it marks the end of the GOP we're
2896          * collecting, so sleep and wait until all the other pads also
2897          * reach that timestamp - at which point, we have an entire GOP
2898          * and either go to ENDING_FILE or release this GOP to the muxer and
2899          * go back to COLLECT_GOP_START. */
2900
2901         /* If we overran the target timestamp, it might be time to process
2902          * the GOP, otherwise bail out for more data
2903          */
2904         GST_LOG_OBJECT (pad,
2905             "Checking TS %" GST_STIME_FORMAT " against max %"
2906             GST_STIME_FORMAT, GST_STIME_ARGS (ctx->in_running_time),
2907             GST_STIME_ARGS (splitmux->max_in_running_time));
2908
2909         if (ctx->in_running_time < splitmux->max_in_running_time) {
2910           loop_again = FALSE;
2911           break;
2912         }
2913
2914         GST_LOG_OBJECT (pad,
2915             "Collected last packet of GOP. Checking other pads");
2916         check_completed_gop (splitmux, ctx);
2917         break;
2918       }
2919       case SPLITMUX_INPUT_STATE_FINISHING_UP:
2920         loop_again = FALSE;
2921         break;
2922       default:
2923         loop_again = FALSE;
2924         break;
2925     }
2926   }
2927   while (loop_again);
2928
2929   if (keyframe) {
2930     splitmux->queued_keyframes++;
2931     buf_info->keyframe = TRUE;
2932   }
2933
2934   /* Update total input byte counter for overflow detect */
2935   splitmux->gop_total_bytes += buf_info->buf_size;
2936   if (ctx->is_reference) {
2937     splitmux->gop_reference_bytes += buf_info->buf_size;
2938   }
2939
2940   /* Now add this buffer to the queue just before returning */
2941   g_queue_push_head (&ctx->queued_bufs, buf_info);
2942
2943   GST_LOG_OBJECT (pad, "Returning to queue buffer %" GST_PTR_FORMAT
2944       " run ts %" GST_STIME_FORMAT, buf, GST_STIME_ARGS (ctx->in_running_time));
2945
2946   GST_SPLITMUX_UNLOCK (splitmux);
2947   GST_PAD_PROBE_INFO_FLOW_RETURN (info) = ret;
2948   return GST_PAD_PROBE_PASS;
2949
2950 beach:
2951   GST_SPLITMUX_UNLOCK (splitmux);
2952   if (buf_info)
2953     mq_stream_buf_free (buf_info);
2954   GST_PAD_PROBE_INFO_FLOW_RETURN (info) = ret;
2955   return GST_PAD_PROBE_PASS;
2956 }
2957
2958 static void
2959 grow_blocked_queues (GstSplitMuxSink * splitmux)
2960 {
2961   GList *cur;
2962
2963   /* Scan other queues for full-ness and grow them */
2964   for (cur = g_list_first (splitmux->contexts);
2965       cur != NULL; cur = g_list_next (cur)) {
2966     MqStreamCtx *tmpctx = (MqStreamCtx *) (cur->data);
2967     guint cur_limit;
2968     guint cur_len = g_queue_get_length (&tmpctx->queued_bufs);
2969
2970     g_object_get (tmpctx->q, "max-size-buffers", &cur_limit, NULL);
2971     GST_LOG_OBJECT (tmpctx->q, "Queue len %u", cur_len);
2972
2973     if (cur_len >= cur_limit) {
2974       cur_limit = cur_len + 1;
2975       GST_DEBUG_OBJECT (tmpctx->q,
2976           "Queue overflowed and needs enlarging. Growing to %u buffers",
2977           cur_limit);
2978       g_object_set (tmpctx->q, "max-size-buffers", cur_limit, NULL);
2979     }
2980   }
2981 }
2982
2983 static void
2984 handle_q_underrun (GstElement * q, gpointer user_data)
2985 {
2986   MqStreamCtx *ctx = (MqStreamCtx *) (user_data);
2987   GstSplitMuxSink *splitmux = ctx->splitmux;
2988
2989   GST_SPLITMUX_LOCK (splitmux);
2990   GST_DEBUG_OBJECT (q,
2991       "Queue reported underrun with %d keyframes and %d cmds enqueued",
2992       splitmux->queued_keyframes, g_queue_get_length (&splitmux->out_cmd_q));
2993   grow_blocked_queues (splitmux);
2994   GST_SPLITMUX_UNLOCK (splitmux);
2995 }
2996
2997 static void
2998 handle_q_overrun (GstElement * q, gpointer user_data)
2999 {
3000   MqStreamCtx *ctx = (MqStreamCtx *) (user_data);
3001   GstSplitMuxSink *splitmux = ctx->splitmux;
3002   gboolean allow_grow = FALSE;
3003
3004   GST_SPLITMUX_LOCK (splitmux);
3005   GST_DEBUG_OBJECT (q,
3006       "Queue reported overrun with %d keyframes and %d cmds enqueued",
3007       splitmux->queued_keyframes, g_queue_get_length (&splitmux->out_cmd_q));
3008
3009   if (splitmux->queued_keyframes < 2) {
3010     /* Less than a full GOP queued, grow the queue */
3011     allow_grow = TRUE;
3012   } else if (g_queue_get_length (&splitmux->out_cmd_q) < 1) {
3013     allow_grow = TRUE;
3014   } else {
3015     /* If another queue is starved, grow */
3016     GList *cur;
3017     for (cur = g_list_first (splitmux->contexts);
3018         cur != NULL; cur = g_list_next (cur)) {
3019       MqStreamCtx *tmpctx = (MqStreamCtx *) (cur->data);
3020       if (tmpctx != ctx && g_queue_get_length (&tmpctx->queued_bufs) < 1) {
3021         allow_grow = TRUE;
3022       }
3023     }
3024   }
3025   GST_SPLITMUX_UNLOCK (splitmux);
3026
3027   if (allow_grow) {
3028     guint cur_limit;
3029
3030     g_object_get (q, "max-size-buffers", &cur_limit, NULL);
3031     cur_limit++;
3032
3033     GST_DEBUG_OBJECT (q,
3034         "Queue overflowed and needs enlarging. Growing to %u buffers",
3035         cur_limit);
3036
3037     g_object_set (q, "max-size-buffers", cur_limit, NULL);
3038   }
3039 }
3040
3041 /* Called with SPLITMUX lock held */
3042 static const gchar *
3043 lookup_muxer_pad (GstSplitMuxSink * splitmux, const gchar * sinkpad_name)
3044 {
3045   const gchar *ret = NULL;
3046
3047   if (splitmux->muxerpad_map == NULL)
3048     return NULL;
3049
3050   if (sinkpad_name == NULL) {
3051     GST_WARNING_OBJECT (splitmux,
3052         "Can't look up request pad in pad map without providing a pad name");
3053     return NULL;
3054   }
3055
3056   ret = gst_structure_get_string (splitmux->muxerpad_map, sinkpad_name);
3057   if (ret) {
3058     GST_INFO_OBJECT (splitmux, "Sink pad %s maps to muxer pad %s", sinkpad_name,
3059         ret);
3060     return g_strdup (ret);
3061   }
3062
3063   return NULL;
3064 }
3065
3066 static GstPad *
3067 gst_splitmux_sink_request_new_pad (GstElement * element,
3068     GstPadTemplate * templ, const gchar * name, const GstCaps * caps)
3069 {
3070   GstSplitMuxSink *splitmux = (GstSplitMuxSink *) element;
3071   GstPadTemplate *mux_template = NULL;
3072   GstPad *ret = NULL, *muxpad = NULL;
3073   GstElement *q;
3074   GstPad *q_sink = NULL, *q_src = NULL;
3075   gchar *gname, *qname;
3076   gboolean is_primary_video = FALSE, is_video = FALSE,
3077       muxer_is_requestpad = FALSE;
3078   MqStreamCtx *ctx;
3079   const gchar *muxer_padname = NULL;
3080
3081   GST_DEBUG_OBJECT (splitmux, "templ:%s, name:%s", templ->name_template, name);
3082
3083   GST_SPLITMUX_LOCK (splitmux);
3084   if (!create_muxer (splitmux))
3085     goto fail;
3086   g_signal_emit (splitmux, signals[SIGNAL_MUXER_ADDED], 0, splitmux->muxer);
3087
3088   if (g_str_equal (templ->name_template, "video") ||
3089       g_str_has_prefix (templ->name_template, "video_aux_")) {
3090     is_primary_video = g_str_equal (templ->name_template, "video");
3091     if (is_primary_video && splitmux->have_video)
3092       goto already_have_video;
3093     is_video = TRUE;
3094   }
3095
3096   /* See if there's a pad map and it lists this pad */
3097   muxer_padname = lookup_muxer_pad (splitmux, name);
3098
3099   if (muxer_padname == NULL) {
3100     if (is_video) {
3101       /* FIXME: Look for a pad template with matching caps, rather than by name */
3102       GST_DEBUG_OBJECT (element,
3103           "searching for pad-template with name 'video_%%u'");
3104       mux_template =
3105           gst_element_class_get_pad_template (GST_ELEMENT_GET_CLASS
3106           (splitmux->muxer), "video_%u");
3107
3108       /* Fallback to find sink pad templates named 'video' (flvmux) */
3109       if (!mux_template) {
3110         GST_DEBUG_OBJECT (element,
3111             "searching for pad-template with name 'video'");
3112         mux_template =
3113             gst_element_class_get_pad_template (GST_ELEMENT_GET_CLASS
3114             (splitmux->muxer), "video");
3115       }
3116       name = NULL;
3117     } else {
3118       GST_DEBUG_OBJECT (element, "searching for pad-template with name '%s'",
3119           templ->name_template);
3120       mux_template =
3121           gst_element_class_get_pad_template (GST_ELEMENT_GET_CLASS
3122           (splitmux->muxer), templ->name_template);
3123
3124       /* Fallback to find sink pad templates named 'audio' (flvmux) */
3125       if (!mux_template && g_str_has_prefix (templ->name_template, "audio_")) {
3126         GST_DEBUG_OBJECT (element,
3127             "searching for pad-template with name 'audio'");
3128         mux_template =
3129             gst_element_class_get_pad_template (GST_ELEMENT_GET_CLASS
3130             (splitmux->muxer), "audio");
3131         name = NULL;
3132       }
3133     }
3134
3135     if (mux_template == NULL) {
3136       GST_DEBUG_OBJECT (element,
3137           "searching for pad-template with name 'sink_%%d'");
3138       mux_template =
3139           gst_element_class_get_pad_template (GST_ELEMENT_GET_CLASS
3140           (splitmux->muxer), "sink_%d");
3141       name = NULL;
3142     }
3143     if (mux_template == NULL) {
3144       GST_DEBUG_OBJECT (element, "searching for pad-template with name 'sink'");
3145       mux_template =
3146           gst_element_class_get_pad_template (GST_ELEMENT_GET_CLASS
3147           (splitmux->muxer), "sink");
3148       name = NULL;
3149     }
3150
3151     if (mux_template == NULL) {
3152       GST_ERROR_OBJECT (element,
3153           "unable to find a suitable sink pad-template on the muxer");
3154       goto fail;
3155     }
3156     GST_DEBUG_OBJECT (element, "found sink pad-template '%s' on the muxer",
3157         mux_template->name_template);
3158
3159     if (mux_template->presence == GST_PAD_REQUEST) {
3160       GST_DEBUG_OBJECT (element, "requesting pad from pad-template");
3161
3162       muxpad =
3163           gst_element_request_pad (splitmux->muxer, mux_template, name, caps);
3164       muxer_is_requestpad = TRUE;
3165     } else if (mux_template->presence == GST_PAD_ALWAYS) {
3166       GST_DEBUG_OBJECT (element, "accessing always pad from pad-template");
3167
3168       muxpad =
3169           gst_element_get_static_pad (splitmux->muxer,
3170           mux_template->name_template);
3171     } else {
3172       GST_ERROR_OBJECT (element,
3173           "unexpected pad presence %d", mux_template->presence);
3174       goto fail;
3175     }
3176   } else {
3177     /* Have a muxer pad name */
3178     if (!(muxpad = gst_element_get_static_pad (splitmux->muxer, muxer_padname))) {
3179       if ((muxpad =
3180               gst_element_request_pad_simple (splitmux->muxer, muxer_padname)))
3181         muxer_is_requestpad = TRUE;
3182     }
3183     g_free ((gchar *) muxer_padname);
3184     muxer_padname = NULL;
3185   }
3186
3187   /* One way or another, we must have a muxer pad by now */
3188   if (muxpad == NULL)
3189     goto fail;
3190
3191   if (is_primary_video)
3192     gname = g_strdup ("video");
3193   else if (name == NULL)
3194     gname = gst_pad_get_name (muxpad);
3195   else
3196     gname = g_strdup (name);
3197
3198   qname = g_strdup_printf ("queue_%s", gname);
3199   if ((q = create_element (splitmux, "queue", qname, FALSE)) == NULL) {
3200     g_free (qname);
3201     goto fail;
3202   }
3203   g_free (qname);
3204
3205   gst_element_set_state (q, GST_STATE_TARGET (splitmux));
3206
3207   g_object_set (q, "max-size-bytes", 0, "max-size-time", (guint64) (0),
3208       "max-size-buffers", 5, NULL);
3209
3210   q_sink = gst_element_get_static_pad (q, "sink");
3211   q_src = gst_element_get_static_pad (q, "src");
3212
3213   if (gst_pad_link (q_src, muxpad) != GST_PAD_LINK_OK) {
3214     if (muxer_is_requestpad)
3215       gst_element_release_request_pad (splitmux->muxer, muxpad);
3216     gst_object_unref (GST_OBJECT (muxpad));
3217     goto fail;
3218   }
3219
3220   gst_object_unref (GST_OBJECT (muxpad));
3221
3222   ctx = mq_stream_ctx_new (splitmux);
3223   /* Context holds a ref: */
3224   ctx->q = gst_object_ref (q);
3225   ctx->srcpad = q_src;
3226   ctx->sinkpad = q_sink;
3227   ctx->q_overrun_id =
3228       g_signal_connect (q, "overrun", (GCallback) handle_q_overrun, ctx);
3229   g_signal_connect (q, "underrun", (GCallback) handle_q_underrun, ctx);
3230
3231   ctx->src_pad_block_id =
3232       gst_pad_add_probe (q_src,
3233       GST_PAD_PROBE_TYPE_DATA_DOWNSTREAM | GST_PAD_PROBE_TYPE_EVENT_FLUSH,
3234       (GstPadProbeCallback) handle_mq_output, ctx, NULL);
3235   if (is_primary_video && splitmux->reference_ctx != NULL) {
3236     splitmux->reference_ctx->is_reference = FALSE;
3237     splitmux->reference_ctx = NULL;
3238   }
3239   if (splitmux->reference_ctx == NULL) {
3240     splitmux->reference_ctx = ctx;
3241     ctx->is_reference = TRUE;
3242   }
3243
3244   ret = gst_ghost_pad_new_from_template (gname, q_sink, templ);
3245   g_object_set_qdata ((GObject *) (ret), PAD_CONTEXT, ctx);
3246
3247   ctx->sink_pad_block_id =
3248       gst_pad_add_probe (q_sink,
3249       GST_PAD_PROBE_TYPE_DATA_DOWNSTREAM | GST_PAD_PROBE_TYPE_EVENT_FLUSH |
3250       GST_PAD_PROBE_TYPE_QUERY_DOWNSTREAM,
3251       (GstPadProbeCallback) handle_mq_input, ctx, NULL);
3252
3253   GST_DEBUG_OBJECT (splitmux, "splitmuxsink pad %" GST_PTR_FORMAT
3254       " feeds queue pad %" GST_PTR_FORMAT, ret, q_sink);
3255
3256   splitmux->contexts = g_list_append (splitmux->contexts, ctx);
3257
3258   g_free (gname);
3259
3260   if (is_primary_video)
3261     splitmux->have_video = TRUE;
3262
3263   gst_pad_set_active (ret, TRUE);
3264   gst_element_add_pad (GST_ELEMENT (splitmux), ret);
3265
3266   GST_SPLITMUX_UNLOCK (splitmux);
3267
3268   return ret;
3269 fail:
3270   GST_SPLITMUX_UNLOCK (splitmux);
3271
3272   if (q_sink)
3273     gst_object_unref (q_sink);
3274   if (q_src)
3275     gst_object_unref (q_src);
3276   return NULL;
3277 already_have_video:
3278   GST_DEBUG_OBJECT (splitmux, "video sink pad already requested");
3279   GST_SPLITMUX_UNLOCK (splitmux);
3280   return NULL;
3281 }
3282
3283 static void
3284 gst_splitmux_sink_release_pad (GstElement * element, GstPad * pad)
3285 {
3286   GstSplitMuxSink *splitmux = (GstSplitMuxSink *) element;
3287   GstPad *muxpad = NULL;
3288   MqStreamCtx *ctx =
3289       (MqStreamCtx *) (g_object_get_qdata ((GObject *) (pad), PAD_CONTEXT));
3290
3291   GST_SPLITMUX_LOCK (splitmux);
3292
3293   if (splitmux->muxer == NULL)
3294     goto fail;                  /* Elements don't exist yet - nothing to release */
3295
3296   GST_INFO_OBJECT (pad, "releasing request pad");
3297
3298   muxpad = gst_pad_get_peer (ctx->srcpad);
3299
3300   /* Remove the context from our consideration */
3301   splitmux->contexts = g_list_remove (splitmux->contexts, ctx);
3302
3303   GST_SPLITMUX_UNLOCK (splitmux);
3304
3305   if (ctx->sink_pad_block_id) {
3306     gst_pad_remove_probe (ctx->sinkpad, ctx->sink_pad_block_id);
3307     gst_pad_send_event (ctx->sinkpad, gst_event_new_flush_start ());
3308   }
3309
3310   if (ctx->src_pad_block_id)
3311     gst_pad_remove_probe (ctx->srcpad, ctx->src_pad_block_id);
3312
3313   GST_SPLITMUX_LOCK (splitmux);
3314
3315   ctx->is_releasing = TRUE;
3316   GST_SPLITMUX_BROADCAST_INPUT (splitmux);
3317
3318   /* Can release the context now */
3319   mq_stream_ctx_free (ctx);
3320   if (ctx == splitmux->reference_ctx)
3321     splitmux->reference_ctx = NULL;
3322
3323   /* Release and free the muxer input */
3324   if (muxpad) {
3325     gst_element_release_request_pad (splitmux->muxer, muxpad);
3326     gst_object_unref (muxpad);
3327   }
3328
3329   if (GST_PAD_PAD_TEMPLATE (pad) &&
3330       g_str_equal (GST_PAD_TEMPLATE_NAME_TEMPLATE (GST_PAD_PAD_TEMPLATE
3331               (pad)), "video"))
3332     splitmux->have_video = FALSE;
3333
3334   gst_element_remove_pad (element, pad);
3335
3336   /* Reset the internal elements only after all request pads are released */
3337   if (splitmux->contexts == NULL)
3338     gst_splitmux_reset_elements (splitmux);
3339
3340   /* Wake up other input streams to check if the completion conditions have
3341    * changed */
3342   GST_SPLITMUX_BROADCAST_INPUT (splitmux);
3343
3344 fail:
3345   GST_SPLITMUX_UNLOCK (splitmux);
3346 }
3347
3348 static GstElement *
3349 create_element (GstSplitMuxSink * splitmux,
3350     const gchar * factory, const gchar * name, gboolean locked)
3351 {
3352   GstElement *ret = gst_element_factory_make (factory, name);
3353   if (ret == NULL) {
3354     g_warning ("Failed to create %s - splitmuxsink will not work", name);
3355     return NULL;
3356   }
3357
3358   if (locked) {
3359     /* Ensure the sink starts in locked state and NULL - it will be changed
3360      * by the filename setting code */
3361     gst_element_set_locked_state (ret, TRUE);
3362     gst_element_set_state (ret, GST_STATE_NULL);
3363   }
3364
3365   if (!gst_bin_add (GST_BIN (splitmux), ret)) {
3366     g_warning ("Could not add %s element - splitmuxsink will not work", name);
3367     gst_object_unref (ret);
3368     return NULL;
3369   }
3370
3371   return ret;
3372 }
3373
3374 static gboolean
3375 create_muxer (GstSplitMuxSink * splitmux)
3376 {
3377   /* Create internal elements */
3378   if (splitmux->muxer == NULL) {
3379     GstElement *provided_muxer = NULL;
3380
3381     GST_OBJECT_LOCK (splitmux);
3382     if (splitmux->provided_muxer != NULL)
3383       provided_muxer = gst_object_ref (splitmux->provided_muxer);
3384     GST_OBJECT_UNLOCK (splitmux);
3385
3386     if ((!splitmux->async_finalize && provided_muxer == NULL) ||
3387         (splitmux->async_finalize && splitmux->muxer_factory == NULL)) {
3388       if ((splitmux->muxer =
3389               create_element (splitmux,
3390                   splitmux->muxer_factory ? splitmux->
3391                   muxer_factory : DEFAULT_MUXER, "muxer", FALSE)) == NULL)
3392         goto fail;
3393     } else if (splitmux->async_finalize) {
3394       if ((splitmux->muxer =
3395               create_element (splitmux, splitmux->muxer_factory, "muxer",
3396                   FALSE)) == NULL)
3397         goto fail;
3398       if (splitmux->muxer_preset && GST_IS_PRESET (splitmux->muxer))
3399         gst_preset_load_preset (GST_PRESET (splitmux->muxer),
3400             splitmux->muxer_preset);
3401       if (splitmux->muxer_properties)
3402         gst_structure_foreach (splitmux->muxer_properties,
3403             _set_property_from_structure, splitmux->muxer);
3404     } else {
3405       /* Ensure it's not in locked state (we might be reusing an old element) */
3406       gst_element_set_locked_state (provided_muxer, FALSE);
3407       if (!gst_bin_add (GST_BIN (splitmux), provided_muxer)) {
3408         g_warning ("Could not add muxer element - splitmuxsink will not work");
3409         gst_object_unref (provided_muxer);
3410         goto fail;
3411       }
3412
3413       splitmux->muxer = provided_muxer;
3414       gst_object_unref (provided_muxer);
3415     }
3416
3417     if (splitmux->use_robust_muxing) {
3418       update_muxer_properties (splitmux);
3419     }
3420   }
3421
3422   return TRUE;
3423 fail:
3424   return FALSE;
3425 }
3426
3427 static GstElement *
3428 find_sink (GstElement * e)
3429 {
3430   GstElement *res = NULL;
3431   GstIterator *iter;
3432   gboolean done = FALSE;
3433   GValue data = { 0, };
3434
3435   if (!GST_IS_BIN (e))
3436     return e;
3437
3438   if (g_object_class_find_property (G_OBJECT_GET_CLASS (e), "location") != NULL)
3439     return e;
3440
3441   iter = gst_bin_iterate_sinks (GST_BIN (e));
3442   while (!done) {
3443     switch (gst_iterator_next (iter, &data)) {
3444       case GST_ITERATOR_OK:
3445       {
3446         GstElement *child = g_value_get_object (&data);
3447         if (g_object_class_find_property (G_OBJECT_GET_CLASS (child),
3448                 "location") != NULL) {
3449           res = child;
3450           done = TRUE;
3451         }
3452         g_value_reset (&data);
3453         break;
3454       }
3455       case GST_ITERATOR_RESYNC:
3456         gst_iterator_resync (iter);
3457         break;
3458       case GST_ITERATOR_DONE:
3459         done = TRUE;
3460         break;
3461       case GST_ITERATOR_ERROR:
3462         g_assert_not_reached ();
3463         break;
3464     }
3465   }
3466   g_value_unset (&data);
3467   gst_iterator_free (iter);
3468
3469   return res;
3470 }
3471
3472 static gboolean
3473 create_sink (GstSplitMuxSink * splitmux)
3474 {
3475   GstElement *provided_sink = NULL;
3476
3477   if (splitmux->active_sink == NULL) {
3478
3479     GST_OBJECT_LOCK (splitmux);
3480     if (splitmux->provided_sink != NULL)
3481       provided_sink = gst_object_ref (splitmux->provided_sink);
3482     GST_OBJECT_UNLOCK (splitmux);
3483
3484     if ((!splitmux->async_finalize && provided_sink == NULL) ||
3485         (splitmux->async_finalize && splitmux->sink_factory == NULL)) {
3486       if ((splitmux->sink =
3487               create_element (splitmux, DEFAULT_SINK, "sink", TRUE)) == NULL)
3488         goto fail;
3489       splitmux->active_sink = splitmux->sink;
3490     } else if (splitmux->async_finalize) {
3491       if ((splitmux->sink =
3492               create_element (splitmux, splitmux->sink_factory, "sink",
3493                   TRUE)) == NULL)
3494         goto fail;
3495       if (splitmux->sink_preset && GST_IS_PRESET (splitmux->sink))
3496         gst_preset_load_preset (GST_PRESET (splitmux->sink),
3497             splitmux->sink_preset);
3498       if (splitmux->sink_properties)
3499         gst_structure_foreach (splitmux->sink_properties,
3500             _set_property_from_structure, splitmux->sink);
3501       splitmux->active_sink = splitmux->sink;
3502     } else {
3503       /* Ensure the sink starts in locked state and NULL - it will be changed
3504        * by the filename setting code */
3505       gst_element_set_locked_state (provided_sink, TRUE);
3506       gst_element_set_state (provided_sink, GST_STATE_NULL);
3507       if (!gst_bin_add (GST_BIN (splitmux), provided_sink)) {
3508         g_warning ("Could not add sink elements - splitmuxsink will not work");
3509         gst_object_unref (provided_sink);
3510         goto fail;
3511       }
3512
3513       splitmux->active_sink = provided_sink;
3514
3515       /* The bin holds a ref now, we can drop our tmp ref */
3516       gst_object_unref (provided_sink);
3517
3518       /* Find the sink element */
3519       splitmux->sink = find_sink (splitmux->active_sink);
3520       if (splitmux->sink == NULL) {
3521         g_warning
3522             ("Could not locate sink element in provided sink - splitmuxsink will not work");
3523         goto fail;
3524       }
3525     }
3526
3527 #if 1
3528     if (g_object_class_find_property (G_OBJECT_GET_CLASS (splitmux->sink),
3529             "async") != NULL) {
3530       /* async child elements are causing state change races and weird
3531        * failures, so let's try and turn that off */
3532       g_object_set (splitmux->sink, "async", FALSE, NULL);
3533     }
3534 #endif
3535
3536     if (!gst_element_link (splitmux->muxer, splitmux->active_sink)) {
3537       g_warning ("Failed to link muxer and sink- splitmuxsink will not work");
3538       goto fail;
3539     }
3540   }
3541
3542   return TRUE;
3543 fail:
3544   return FALSE;
3545 }
3546
3547 #ifdef __GNUC__
3548 #pragma GCC diagnostic ignored "-Wformat-nonliteral"
3549 #endif
3550 static void
3551 set_next_filename (GstSplitMuxSink * splitmux, MqStreamCtx * ctx)
3552 {
3553   gchar *fname = NULL;
3554   GstSample *sample;
3555   GstCaps *caps;
3556
3557   gst_splitmux_sink_ensure_max_files (splitmux);
3558
3559   if (ctx->cur_out_buffer == NULL) {
3560     GST_WARNING_OBJECT (splitmux, "Starting next file without buffer");
3561   }
3562
3563   caps = gst_pad_get_current_caps (ctx->srcpad);
3564   sample = gst_sample_new (ctx->cur_out_buffer, caps, &ctx->out_segment, NULL);
3565   g_signal_emit (splitmux, signals[SIGNAL_FORMAT_LOCATION_FULL], 0,
3566       splitmux->fragment_id, sample, &fname);
3567   gst_sample_unref (sample);
3568   if (caps)
3569     gst_caps_unref (caps);
3570
3571   if (fname == NULL) {
3572     /* Fallback to the old signal if the new one returned nothing */
3573     g_signal_emit (splitmux, signals[SIGNAL_FORMAT_LOCATION], 0,
3574         splitmux->fragment_id, &fname);
3575   }
3576
3577   if (!fname)
3578     fname = splitmux->location ?
3579         g_strdup_printf (splitmux->location, splitmux->fragment_id) : NULL;
3580
3581   if (fname) {
3582     GST_INFO_OBJECT (splitmux, "Setting file to %s", fname);
3583     if (g_object_class_find_property (G_OBJECT_GET_CLASS (splitmux->sink),
3584             "location") != NULL)
3585       g_object_set (splitmux->sink, "location", fname, NULL);
3586     g_free (fname);
3587   }
3588
3589   splitmux->fragment_id++;
3590 }
3591
3592 /* called with GST_SPLITMUX_LOCK */
3593 static void
3594 do_async_start (GstSplitMuxSink * splitmux)
3595 {
3596   GstMessage *message;
3597
3598   if (!splitmux->need_async_start) {
3599     GST_INFO_OBJECT (splitmux, "no async_start needed");
3600     return;
3601   }
3602
3603   splitmux->async_pending = TRUE;
3604
3605   GST_INFO_OBJECT (splitmux, "Sending async_start message");
3606   message = gst_message_new_async_start (GST_OBJECT_CAST (splitmux));
3607
3608   GST_SPLITMUX_UNLOCK (splitmux);
3609   GST_BIN_CLASS (parent_class)->handle_message (GST_BIN_CAST
3610       (splitmux), message);
3611   GST_SPLITMUX_LOCK (splitmux);
3612 }
3613
3614 /* called with GST_SPLITMUX_LOCK */
3615 static void
3616 do_async_done (GstSplitMuxSink * splitmux)
3617 {
3618   GstMessage *message;
3619
3620   if (splitmux->async_pending) {
3621     GST_INFO_OBJECT (splitmux, "Sending async_done message");
3622     splitmux->async_pending = FALSE;
3623     GST_SPLITMUX_UNLOCK (splitmux);
3624
3625     message =
3626         gst_message_new_async_done (GST_OBJECT_CAST (splitmux),
3627         GST_CLOCK_TIME_NONE);
3628     GST_BIN_CLASS (parent_class)->handle_message (GST_BIN_CAST
3629         (splitmux), message);
3630     GST_SPLITMUX_LOCK (splitmux);
3631   }
3632
3633   splitmux->need_async_start = FALSE;
3634 }
3635
3636 static void
3637 gst_splitmux_sink_reset (GstSplitMuxSink * splitmux)
3638 {
3639   splitmux->max_in_running_time = GST_CLOCK_STIME_NONE;
3640   splitmux->gop_start_time = splitmux->fragment_start_time =
3641       GST_CLOCK_STIME_NONE;
3642   splitmux->max_out_running_time = 0;
3643   splitmux->fragment_total_bytes = 0;
3644   splitmux->fragment_reference_bytes = 0;
3645   splitmux->gop_total_bytes = 0;
3646   splitmux->gop_reference_bytes = 0;
3647   splitmux->muxed_out_bytes = 0;
3648   splitmux->ready_for_output = FALSE;
3649
3650   g_atomic_int_set (&(splitmux->split_requested), FALSE);
3651   g_atomic_int_set (&(splitmux->do_split_next_gop), FALSE);
3652
3653   splitmux->next_fku_time = GST_CLOCK_TIME_NONE;
3654   gst_queue_array_clear (splitmux->times_to_split);
3655
3656   g_list_foreach (splitmux->contexts, (GFunc) mq_stream_ctx_reset, NULL);
3657   splitmux->queued_keyframes = 0;
3658
3659   g_queue_foreach (&splitmux->out_cmd_q, (GFunc) out_cmd_buf_free, NULL);
3660   g_queue_clear (&splitmux->out_cmd_q);
3661 }
3662
3663 static GstStateChangeReturn
3664 gst_splitmux_sink_change_state (GstElement * element, GstStateChange transition)
3665 {
3666   GstStateChangeReturn ret;
3667   GstSplitMuxSink *splitmux = (GstSplitMuxSink *) element;
3668
3669   switch (transition) {
3670     case GST_STATE_CHANGE_NULL_TO_READY:{
3671       GST_SPLITMUX_LOCK (splitmux);
3672       if (!create_muxer (splitmux) || !create_sink (splitmux)) {
3673         ret = GST_STATE_CHANGE_FAILURE;
3674         GST_SPLITMUX_UNLOCK (splitmux);
3675         goto beach;
3676       }
3677       g_signal_emit (splitmux, signals[SIGNAL_MUXER_ADDED], 0, splitmux->muxer);
3678       g_signal_emit (splitmux, signals[SIGNAL_SINK_ADDED], 0, splitmux->sink);
3679       GST_SPLITMUX_UNLOCK (splitmux);
3680       splitmux->fragment_id = splitmux->start_index;
3681       break;
3682     }
3683     case GST_STATE_CHANGE_READY_TO_PAUSED:{
3684       GST_SPLITMUX_LOCK (splitmux);
3685       /* Make sure contexts and tracking times are cleared, in case we're being reused */
3686       gst_splitmux_sink_reset (splitmux);
3687       /* Start by collecting one input on each pad */
3688       splitmux->input_state = SPLITMUX_INPUT_STATE_COLLECTING_GOP_START;
3689       splitmux->output_state = SPLITMUX_OUTPUT_STATE_START_NEXT_FILE;
3690
3691       GST_SPLITMUX_UNLOCK (splitmux);
3692
3693       GST_SPLITMUX_STATE_LOCK (splitmux);
3694       splitmux->shutdown = FALSE;
3695       GST_SPLITMUX_STATE_UNLOCK (splitmux);
3696       break;
3697     }
3698     case GST_STATE_CHANGE_PAUSED_TO_READY:
3699     case GST_STATE_CHANGE_READY_TO_READY:
3700       g_atomic_int_set (&(splitmux->split_requested), FALSE);
3701       g_atomic_int_set (&(splitmux->do_split_next_gop), FALSE);
3702       /* Fall through */
3703     case GST_STATE_CHANGE_READY_TO_NULL:
3704       GST_SPLITMUX_STATE_LOCK (splitmux);
3705       splitmux->shutdown = TRUE;
3706       GST_SPLITMUX_STATE_UNLOCK (splitmux);
3707
3708       GST_SPLITMUX_LOCK (splitmux);
3709       gst_splitmux_sink_reset (splitmux);
3710       splitmux->output_state = SPLITMUX_OUTPUT_STATE_STOPPED;
3711       splitmux->input_state = SPLITMUX_INPUT_STATE_STOPPED;
3712       /* Wake up any blocked threads */
3713       GST_LOG_OBJECT (splitmux,
3714           "State change -> NULL or READY. Waking threads");
3715       GST_SPLITMUX_BROADCAST_INPUT (splitmux);
3716       GST_SPLITMUX_BROADCAST_OUTPUT (splitmux);
3717       GST_SPLITMUX_UNLOCK (splitmux);
3718       break;
3719     default:
3720       break;
3721   }
3722
3723   ret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
3724   if (ret == GST_STATE_CHANGE_FAILURE)
3725     goto beach;
3726
3727   switch (transition) {
3728     case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
3729       splitmux->need_async_start = TRUE;
3730       break;
3731     case GST_STATE_CHANGE_READY_TO_PAUSED:{
3732       /* Change state async, because our child sink might not
3733        * be ready to do that for us yet if it's state is still locked */
3734
3735       splitmux->need_async_start = TRUE;
3736       /* we want to go async to PAUSED until we managed to configure and add the
3737        * sink */
3738       GST_SPLITMUX_LOCK (splitmux);
3739       do_async_start (splitmux);
3740       GST_SPLITMUX_UNLOCK (splitmux);
3741       ret = GST_STATE_CHANGE_ASYNC;
3742       break;
3743     }
3744     case GST_STATE_CHANGE_READY_TO_NULL:
3745       GST_SPLITMUX_LOCK (splitmux);
3746       splitmux->fragment_id = 0;
3747       /* Reset internal elements only if no pad contexts are using them */
3748       if (splitmux->contexts == NULL)
3749         gst_splitmux_reset_elements (splitmux);
3750       do_async_done (splitmux);
3751       GST_SPLITMUX_UNLOCK (splitmux);
3752       break;
3753     default:
3754       break;
3755   }
3756
3757   return ret;
3758
3759 beach:
3760   if (transition == GST_STATE_CHANGE_NULL_TO_READY) {
3761     /* Cleanup elements on failed transition out of NULL */
3762     gst_splitmux_reset_elements (splitmux);
3763     GST_SPLITMUX_LOCK (splitmux);
3764     do_async_done (splitmux);
3765     GST_SPLITMUX_UNLOCK (splitmux);
3766   }
3767   if (transition == GST_STATE_CHANGE_READY_TO_READY) {
3768     /* READY to READY transition only happens when we're already
3769      * in READY state, but a child element is in NULL, which
3770      * happens when there's an error changing the state of the sink.
3771      * We need to make sure not to fail the state transition, or
3772      * the core won't transition us back to NULL successfully */
3773     ret = GST_STATE_CHANGE_SUCCESS;
3774   }
3775   return ret;
3776 }
3777
3778 static void
3779 gst_splitmux_sink_ensure_max_files (GstSplitMuxSink * splitmux)
3780 {
3781   if (splitmux->max_files && splitmux->fragment_id >= splitmux->max_files) {
3782     splitmux->fragment_id = 0;
3783   }
3784 }
3785
3786 static void
3787 split_now (GstSplitMuxSink * splitmux)
3788 {
3789   g_atomic_int_set (&(splitmux->do_split_next_gop), TRUE);
3790 }
3791
3792 static void
3793 split_after (GstSplitMuxSink * splitmux)
3794 {
3795   g_atomic_int_set (&(splitmux->split_requested), TRUE);
3796 }
3797
3798 static void
3799 split_at_running_time (GstSplitMuxSink * splitmux, GstClockTime split_time)
3800 {
3801   gboolean send_keyframe_requests;
3802
3803   GST_SPLITMUX_LOCK (splitmux);
3804   gst_queue_array_push_tail_struct (splitmux->times_to_split, &split_time);
3805   send_keyframe_requests = splitmux->send_keyframe_requests;
3806   GST_SPLITMUX_UNLOCK (splitmux);
3807
3808   if (send_keyframe_requests) {
3809     GstEvent *ev =
3810         gst_video_event_new_upstream_force_key_unit (split_time, TRUE, 0);
3811     GST_INFO_OBJECT (splitmux, "Requesting next keyframe at %" GST_TIME_FORMAT,
3812         GST_TIME_ARGS (split_time));
3813     if (!gst_pad_push_event (splitmux->reference_ctx->sinkpad, ev)) {
3814       GST_WARNING_OBJECT (splitmux,
3815           "Could not request keyframe at %" GST_TIME_FORMAT,
3816           GST_TIME_ARGS (split_time));
3817     }
3818   }
3819 }