splitmuxsink: Keep track of the pending input GOPs in a queue
[platform/upstream/gstreamer.git] / subprojects / gst-plugins-good / gst / multifile / gstsplitmuxsink.c
1 /* GStreamer Muxer bin that splits output stream by size/time
2  * Copyright (C) <2014-2019> Jan Schmidt <jan@centricular.com>
3  *
4  * This library is free software; you can redistribute it and/or
5  * modify it under the terms of the GNU Library General Public
6  * License as published by the Free Software Foundation; either
7  * version 2 of the License, or (at your option) any later version.
8  *
9  * This library is distributed in the hope that it will be useful,
10  * but WITHOUT ANY WARRANTY; without even the implied warranty of
11  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12  * Library General Public License for more details.
13  *
14  * You should have received a copy of the GNU Library General Public
15  * License along with this library; if not, write to the
16  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
17  * Boston, MA 02110-1301, USA.
18  */
19
20 /**
21  * SECTION:element-splitmuxsink
22  * @title: splitmuxsink
23  * @short_description: Muxer wrapper for splitting output stream by size or time
24  *
25  * This element wraps a muxer and a sink, and starts a new file when the mux
26  * contents are about to cross a threshold of maximum size of maximum time,
27  * splitting at video keyframe boundaries. Exactly one input video stream
28  * can be muxed, with as many accompanying audio and subtitle streams as
29  * desired.
30  *
31  * By default, it uses mp4mux and filesink, but they can be changed via
32  * the 'muxer' and 'sink' properties.
33  *
34  * The minimum file size is 1 GOP, however - so limits may be overrun if the
35  * distance between any 2 keyframes is larger than the limits.
36  *
37  * If a video stream is available, the splitting process is driven by the video
38  * stream contents, and the video stream must contain closed GOPs for the output
39  * file parts to be played individually correctly. In the absence of a video
40  * stream, the first available stream is used as reference for synchronization.
41  *
42  * In the async-finalize mode, when the threshold is crossed, the old muxer
43  * and sink is disconnected from the pipeline and left to finish the file
44  * asynchronously, and a new muxer and sink is created to continue with the
45  * next fragment. For that reason, instead of muxer and sink objects, the
46  * muxer-factory and sink-factory properties are used to construct the new
47  * objects, together with muxer-properties and sink-properties.
48  *
49  * ## Example pipelines
50  * |[
51  * gst-launch-1.0 -e v4l2src num-buffers=500 ! video/x-raw,width=320,height=240 ! videoconvert ! queue ! timeoverlay ! x264enc key-int-max=10 ! h264parse ! splitmuxsink location=video%02d.mov max-size-time=10000000000 max-size-bytes=1000000
52  * ]|
53  * Records a video stream captured from a v4l2 device and muxes it into
54  * ISO mp4 files, splitting as needed to limit size/duration to 10 seconds
55  * and 1MB maximum size.
56  *
57  * |[
58  * gst-launch-1.0 -e v4l2src num-buffers=500 ! video/x-raw,width=320,height=240 ! videoconvert ! queue ! timeoverlay ! x264enc key-int-max=10 ! h264parse ! splitmuxsink location=video%02d.mkv max-size-time=10000000000 muxer-factory=matroskamux muxer-properties="properties,streamable=true"
59  * ]|
60  * Records a video stream captured from a v4l2 device and muxer it into
61  * streamable Matroska files, splitting as needed to limit size/duration to 10
62  * seconds. Each file will finalize asynchronously.
63  *
64  * |[
65  * gst-launch-1.0 videotestsrc num-buffers=10 ! jpegenc ! .video splitmuxsink muxer=qtmux muxer-pad-map=x-pad-map,video=video_1 location=test%05d.mp4 -v
66  * ]|
67  * Records 10 frames to an mp4 file, using a muxer-pad-map to make explicit mappings between the splitmuxsink sink pad and the corresponding muxer pad
68  * it will deliver to.
69  */
70
71 #ifdef HAVE_CONFIG_H
72 #include "config.h"
73 #endif
74
75 #include <string.h>
76 #include <glib/gstdio.h>
77 #include <gst/video/video.h>
78 #include "gstsplitmuxsink.h"
79
80 GST_DEBUG_CATEGORY_STATIC (splitmux_debug);
81 #define GST_CAT_DEFAULT splitmux_debug
82
83 #define GST_SPLITMUX_STATE_LOCK(s) g_mutex_lock(&(s)->state_lock)
84 #define GST_SPLITMUX_STATE_UNLOCK(s) g_mutex_unlock(&(s)->state_lock)
85
86 #define GST_SPLITMUX_LOCK(s) g_mutex_lock(&(s)->lock)
87 #define GST_SPLITMUX_UNLOCK(s) g_mutex_unlock(&(s)->lock)
88 #define GST_SPLITMUX_WAIT_INPUT(s) g_cond_wait (&(s)->input_cond, &(s)->lock)
89 #define GST_SPLITMUX_BROADCAST_INPUT(s) g_cond_broadcast (&(s)->input_cond)
90
91 #define GST_SPLITMUX_WAIT_OUTPUT(s) g_cond_wait (&(s)->output_cond, &(s)->lock)
92 #define GST_SPLITMUX_BROADCAST_OUTPUT(s) g_cond_broadcast (&(s)->output_cond)
93
94 static void split_now (GstSplitMuxSink * splitmux);
95 static void split_after (GstSplitMuxSink * splitmux);
96 static void split_at_running_time (GstSplitMuxSink * splitmux,
97     GstClockTime split_time);
98
99 enum
100 {
101   PROP_0,
102   PROP_LOCATION,
103   PROP_START_INDEX,
104   PROP_MAX_SIZE_TIME,
105   PROP_MAX_SIZE_BYTES,
106   PROP_MAX_SIZE_TIMECODE,
107   PROP_SEND_KEYFRAME_REQUESTS,
108   PROP_MAX_FILES,
109   PROP_MUXER_OVERHEAD,
110   PROP_USE_ROBUST_MUXING,
111   PROP_ALIGNMENT_THRESHOLD,
112   PROP_MUXER,
113   PROP_SINK,
114   PROP_RESET_MUXER,
115   PROP_ASYNC_FINALIZE,
116   PROP_MUXER_FACTORY,
117   PROP_MUXER_PRESET,
118   PROP_MUXER_PROPERTIES,
119   PROP_SINK_FACTORY,
120   PROP_SINK_PRESET,
121   PROP_SINK_PROPERTIES,
122   PROP_MUXERPAD_MAP
123 };
124
125 #define DEFAULT_MAX_SIZE_TIME       0
126 #define DEFAULT_MAX_SIZE_BYTES      0
127 #define DEFAULT_MAX_FILES           0
128 #define DEFAULT_MUXER_OVERHEAD      0.02
129 #define DEFAULT_SEND_KEYFRAME_REQUESTS FALSE
130 #define DEFAULT_ALIGNMENT_THRESHOLD 0
131 #define DEFAULT_MUXER "mp4mux"
132 #define DEFAULT_SINK "filesink"
133 #define DEFAULT_USE_ROBUST_MUXING FALSE
134 #define DEFAULT_RESET_MUXER TRUE
135 #define DEFAULT_ASYNC_FINALIZE FALSE
136 #define DEFAULT_START_INDEX 0
137
138 typedef struct _AsyncEosHelper
139 {
140   MqStreamCtx *ctx;
141   GstPad *pad;
142 } AsyncEosHelper;
143
144 enum
145 {
146   SIGNAL_FORMAT_LOCATION,
147   SIGNAL_FORMAT_LOCATION_FULL,
148   SIGNAL_SPLIT_NOW,
149   SIGNAL_SPLIT_AFTER,
150   SIGNAL_SPLIT_AT_RUNNING_TIME,
151   SIGNAL_MUXER_ADDED,
152   SIGNAL_SINK_ADDED,
153   SIGNAL_LAST
154 };
155
156 static guint signals[SIGNAL_LAST];
157
158 static GstStaticPadTemplate video_sink_template =
159 GST_STATIC_PAD_TEMPLATE ("video",
160     GST_PAD_SINK,
161     GST_PAD_REQUEST,
162     GST_STATIC_CAPS_ANY);
163 static GstStaticPadTemplate video_aux_sink_template =
164 GST_STATIC_PAD_TEMPLATE ("video_aux_%u",
165     GST_PAD_SINK,
166     GST_PAD_REQUEST,
167     GST_STATIC_CAPS_ANY);
168 static GstStaticPadTemplate audio_sink_template =
169 GST_STATIC_PAD_TEMPLATE ("audio_%u",
170     GST_PAD_SINK,
171     GST_PAD_REQUEST,
172     GST_STATIC_CAPS_ANY);
173 static GstStaticPadTemplate subtitle_sink_template =
174 GST_STATIC_PAD_TEMPLATE ("subtitle_%u",
175     GST_PAD_SINK,
176     GST_PAD_REQUEST,
177     GST_STATIC_CAPS_ANY);
178 static GstStaticPadTemplate caption_sink_template =
179 GST_STATIC_PAD_TEMPLATE ("caption_%u",
180     GST_PAD_SINK,
181     GST_PAD_REQUEST,
182     GST_STATIC_CAPS_ANY);
183
184 static GQuark PAD_CONTEXT;
185 static GQuark EOS_FROM_US;
186 static GQuark RUNNING_TIME;
187 /* EOS_FROM_US is only valid in async-finalize mode. We need to know whether
188  * to forward an incoming EOS message, but we cannot rely on the state of the
189  * splitmux anymore, so we set this qdata on the sink instead.
190  * The muxer and sink must be destroyed after both of these things have
191  * finished:
192  * 1) The EOS message has been sent when the fragment is ending
193  * 2) The muxer has been unlinked and relinked
194  * Therefore, EOS_FROM_US can have these two values:
195  * 0: EOS was not requested from us. Forward the message. The muxer and the
196  * sink will be destroyed together with the rest of the bin.
197  * 1: EOS was requested from us, but the other of the two tasks hasn't
198  * finished. Set EOS_FROM_US to 2 and do your stuff.
199  * 2: EOS was requested from us and the other of the two tasks has finished.
200  * Now we can destroy the muxer and the sink.
201  */
202
203 static void
204 _do_init (void)
205 {
206   PAD_CONTEXT = g_quark_from_static_string ("pad-context");
207   EOS_FROM_US = g_quark_from_static_string ("eos-from-us");
208   RUNNING_TIME = g_quark_from_static_string ("running-time");
209   GST_DEBUG_CATEGORY_INIT (splitmux_debug, "splitmuxsink", 0,
210       "Split File Muxing Sink");
211 }
212
213 #define gst_splitmux_sink_parent_class parent_class
214 G_DEFINE_TYPE_EXTENDED (GstSplitMuxSink, gst_splitmux_sink, GST_TYPE_BIN, 0,
215     _do_init ());
216 GST_ELEMENT_REGISTER_DEFINE (splitmuxsink, "splitmuxsink", GST_RANK_NONE,
217     GST_TYPE_SPLITMUX_SINK);
218
219 static gboolean create_muxer (GstSplitMuxSink * splitmux);
220 static gboolean create_sink (GstSplitMuxSink * splitmux);
221 static void gst_splitmux_sink_set_property (GObject * object, guint prop_id,
222     const GValue * value, GParamSpec * pspec);
223 static void gst_splitmux_sink_get_property (GObject * object, guint prop_id,
224     GValue * value, GParamSpec * pspec);
225 static void gst_splitmux_sink_dispose (GObject * object);
226 static void gst_splitmux_sink_finalize (GObject * object);
227
228 static GstPad *gst_splitmux_sink_request_new_pad (GstElement * element,
229     GstPadTemplate * templ, const gchar * name, const GstCaps * caps);
230 static void gst_splitmux_sink_release_pad (GstElement * element, GstPad * pad);
231
232 static GstStateChangeReturn gst_splitmux_sink_change_state (GstElement *
233     element, GstStateChange transition);
234
235 static void bus_handler (GstBin * bin, GstMessage * msg);
236 static void set_next_filename (GstSplitMuxSink * splitmux, MqStreamCtx * ctx);
237 static GstFlowReturn start_next_fragment (GstSplitMuxSink * splitmux,
238     MqStreamCtx * ctx);
239 static void mq_stream_ctx_free (MqStreamCtx * ctx);
240 static void grow_blocked_queues (GstSplitMuxSink * splitmux);
241
242 static void gst_splitmux_sink_ensure_max_files (GstSplitMuxSink * splitmux);
243 static GstElement *create_element (GstSplitMuxSink * splitmux,
244     const gchar * factory, const gchar * name, gboolean locked);
245
246 static void do_async_done (GstSplitMuxSink * splitmux);
247
248 static GstClockTime calculate_next_max_timecode (GstSplitMuxSink * splitmux,
249     const GstVideoTimeCode * cur_tc, GstClockTime running_time,
250     GstVideoTimeCode ** next_tc);
251
252 static MqStreamBuf *
253 mq_stream_buf_new (void)
254 {
255   return g_slice_new0 (MqStreamBuf);
256 }
257
258 static void
259 mq_stream_buf_free (MqStreamBuf * data)
260 {
261   g_slice_free (MqStreamBuf, data);
262 }
263
264 static SplitMuxOutputCommand *
265 out_cmd_buf_new (void)
266 {
267   return g_slice_new0 (SplitMuxOutputCommand);
268 }
269
270 static void
271 out_cmd_buf_free (SplitMuxOutputCommand * data)
272 {
273   g_slice_free (SplitMuxOutputCommand, data);
274 }
275
276 static void
277 input_gop_free (InputGop * gop)
278 {
279   g_clear_pointer (&gop->start_tc, gst_video_time_code_free);
280   g_slice_free (InputGop, gop);
281 }
282
283 static void
284 gst_splitmux_sink_class_init (GstSplitMuxSinkClass * klass)
285 {
286   GObjectClass *gobject_class = (GObjectClass *) klass;
287   GstElementClass *gstelement_class = (GstElementClass *) klass;
288   GstBinClass *gstbin_class = (GstBinClass *) klass;
289
290   gobject_class->set_property = gst_splitmux_sink_set_property;
291   gobject_class->get_property = gst_splitmux_sink_get_property;
292   gobject_class->dispose = gst_splitmux_sink_dispose;
293   gobject_class->finalize = gst_splitmux_sink_finalize;
294
295   gst_element_class_set_static_metadata (gstelement_class,
296       "Split Muxing Bin", "Generic/Bin/Muxer",
297       "Convenience bin that muxes incoming streams into multiple time/size limited files",
298       "Jan Schmidt <jan@centricular.com>");
299
300   gst_element_class_add_static_pad_template (gstelement_class,
301       &video_sink_template);
302   gst_element_class_add_static_pad_template (gstelement_class,
303       &video_aux_sink_template);
304   gst_element_class_add_static_pad_template (gstelement_class,
305       &audio_sink_template);
306   gst_element_class_add_static_pad_template (gstelement_class,
307       &subtitle_sink_template);
308   gst_element_class_add_static_pad_template (gstelement_class,
309       &caption_sink_template);
310
311   gstelement_class->change_state =
312       GST_DEBUG_FUNCPTR (gst_splitmux_sink_change_state);
313   gstelement_class->request_new_pad =
314       GST_DEBUG_FUNCPTR (gst_splitmux_sink_request_new_pad);
315   gstelement_class->release_pad =
316       GST_DEBUG_FUNCPTR (gst_splitmux_sink_release_pad);
317
318   gstbin_class->handle_message = bus_handler;
319
320   g_object_class_install_property (gobject_class, PROP_LOCATION,
321       g_param_spec_string ("location", "File Output Pattern",
322           "Format string pattern for the location of the files to write (e.g. video%05d.mp4)",
323           NULL, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
324   g_object_class_install_property (gobject_class, PROP_MUXER_OVERHEAD,
325       g_param_spec_double ("mux-overhead", "Muxing Overhead",
326           "Extra size overhead of muxing (0.02 = 2%)", 0.0, 1.0,
327           DEFAULT_MUXER_OVERHEAD,
328           G_PARAM_READWRITE | GST_PARAM_CONTROLLABLE | G_PARAM_STATIC_STRINGS));
329
330   g_object_class_install_property (gobject_class, PROP_MAX_SIZE_TIME,
331       g_param_spec_uint64 ("max-size-time", "Max. size (ns)",
332           "Max. amount of time per file (in ns, 0=disable)", 0, G_MAXUINT64,
333           DEFAULT_MAX_SIZE_TIME,
334           G_PARAM_READWRITE | GST_PARAM_MUTABLE_READY |
335           G_PARAM_STATIC_STRINGS));
336   g_object_class_install_property (gobject_class, PROP_MAX_SIZE_BYTES,
337       g_param_spec_uint64 ("max-size-bytes", "Max. size bytes",
338           "Max. amount of data per file (in bytes, 0=disable)", 0, G_MAXUINT64,
339           DEFAULT_MAX_SIZE_BYTES,
340           G_PARAM_READWRITE | GST_PARAM_MUTABLE_READY |
341           G_PARAM_STATIC_STRINGS));
342   g_object_class_install_property (gobject_class, PROP_MAX_SIZE_TIMECODE,
343       g_param_spec_string ("max-size-timecode", "Maximum timecode difference",
344           "Maximum difference in timecode between first and last frame. "
345           "Separator is assumed to be \":\" everywhere (e.g. 01:00:00:00). "
346           "Will only be effective if a timecode track is present.", NULL,
347           G_PARAM_READWRITE | GST_PARAM_MUTABLE_READY |
348           G_PARAM_STATIC_STRINGS));
349   g_object_class_install_property (gobject_class, PROP_SEND_KEYFRAME_REQUESTS,
350       g_param_spec_boolean ("send-keyframe-requests",
351           "Request keyframes at max-size-time",
352           "Request a keyframe every max-size-time ns to try splitting at that point. "
353           "Needs max-size-bytes to be 0 in order to be effective.",
354           DEFAULT_SEND_KEYFRAME_REQUESTS,
355           G_PARAM_READWRITE | GST_PARAM_MUTABLE_READY |
356           G_PARAM_STATIC_STRINGS));
357   g_object_class_install_property (gobject_class, PROP_MAX_FILES,
358       g_param_spec_uint ("max-files", "Max files",
359           "Maximum number of files to keep on disk. Once the maximum is reached,"
360           "old files start to be deleted to make room for new ones.", 0,
361           G_MAXUINT, DEFAULT_MAX_FILES,
362           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
363   g_object_class_install_property (gobject_class, PROP_ALIGNMENT_THRESHOLD,
364       g_param_spec_uint64 ("alignment-threshold", "Alignment threshold (ns)",
365           "Allow non-reference streams to be that many ns before the reference"
366           " stream", 0, G_MAXUINT64, DEFAULT_ALIGNMENT_THRESHOLD,
367           G_PARAM_READWRITE | GST_PARAM_MUTABLE_READY |
368           G_PARAM_STATIC_STRINGS));
369
370   g_object_class_install_property (gobject_class, PROP_MUXER,
371       g_param_spec_object ("muxer", "Muxer",
372           "The muxer element to use (NULL = default mp4mux). "
373           "Valid only for async-finalize = FALSE",
374           GST_TYPE_ELEMENT, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
375   g_object_class_install_property (gobject_class, PROP_SINK,
376       g_param_spec_object ("sink", "Sink",
377           "The sink element (or element chain) to use (NULL = default filesink). "
378           "Valid only for async-finalize = FALSE",
379           GST_TYPE_ELEMENT, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
380
381   g_object_class_install_property (gobject_class, PROP_USE_ROBUST_MUXING,
382       g_param_spec_boolean ("use-robust-muxing",
383           "Support robust-muxing mode of some muxers",
384           "Check if muxers support robust muxing via the reserved-max-duration and "
385           "reserved-duration-remaining properties and use them if so. "
386           "(Only present on qtmux and mp4mux for now). splitmuxsink may then also "
387           " create new fragments if the reserved header space is about to overflow. "
388           "Note that for mp4mux and qtmux, reserved-moov-update-period must be set "
389           "manually by the app to a non-zero value for robust muxing to have an effect.",
390           DEFAULT_USE_ROBUST_MUXING,
391           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
392
393   g_object_class_install_property (gobject_class, PROP_RESET_MUXER,
394       g_param_spec_boolean ("reset-muxer",
395           "Reset Muxer",
396           "Reset the muxer after each segment. Disabling this will not work for most muxers.",
397           DEFAULT_RESET_MUXER, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
398
399   g_object_class_install_property (gobject_class, PROP_ASYNC_FINALIZE,
400       g_param_spec_boolean ("async-finalize",
401           "Finalize fragments asynchronously",
402           "Finalize each fragment asynchronously and start a new one",
403           DEFAULT_ASYNC_FINALIZE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
404   g_object_class_install_property (gobject_class, PROP_MUXER_FACTORY,
405       g_param_spec_string ("muxer-factory", "Muxer factory",
406           "The muxer element factory to use (default = mp4mux). "
407           "Valid only for async-finalize = TRUE",
408           "mp4mux", G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
409   /**
410    * GstSplitMuxSink:muxer-preset
411    *
412    * An optional #GstPreset name to use for the muxer. This only has an effect
413    * in `async-finalize=TRUE` mode.
414    *
415    * Since: 1.18
416    */
417   g_object_class_install_property (gobject_class, PROP_MUXER_PRESET,
418       g_param_spec_string ("muxer-preset", "Muxer preset",
419           "The muxer preset to use. "
420           "Valid only for async-finalize = TRUE",
421           NULL, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
422   g_object_class_install_property (gobject_class, PROP_MUXER_PROPERTIES,
423       g_param_spec_boxed ("muxer-properties", "Muxer properties",
424           "The muxer element properties to use. "
425           "Example: {properties,boolean-prop=true,string-prop=\"hi\"}. "
426           "Valid only for async-finalize = TRUE",
427           GST_TYPE_STRUCTURE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
428   g_object_class_install_property (gobject_class, PROP_SINK_FACTORY,
429       g_param_spec_string ("sink-factory", "Sink factory",
430           "The sink element factory to use (default = filesink). "
431           "Valid only for async-finalize = TRUE",
432           "filesink", G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
433   /**
434    * GstSplitMuxSink:sink-preset
435    *
436    * An optional #GstPreset name to use for the sink. This only has an effect
437    * in `async-finalize=TRUE` mode.
438    *
439    * Since: 1.18
440    */
441   g_object_class_install_property (gobject_class, PROP_SINK_PRESET,
442       g_param_spec_string ("sink-preset", "Sink preset",
443           "The sink preset to use. "
444           "Valid only for async-finalize = TRUE",
445           NULL, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
446   g_object_class_install_property (gobject_class, PROP_SINK_PROPERTIES,
447       g_param_spec_boxed ("sink-properties", "Sink properties",
448           "The sink element properties to use. "
449           "Example: {properties,boolean-prop=true,string-prop=\"hi\"}. "
450           "Valid only for async-finalize = TRUE",
451           GST_TYPE_STRUCTURE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
452   g_object_class_install_property (gobject_class, PROP_START_INDEX,
453       g_param_spec_int ("start-index", "Start Index",
454           "Start value of fragment index.",
455           0, G_MAXINT, DEFAULT_START_INDEX,
456           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
457
458   /**
459    * GstSplitMuxSink::muxer-pad-map
460    *
461    * An optional GstStructure that provides a map from splitmuxsink sinkpad
462    * names to muxer pad names they should feed. Splitmuxsink has some default
463    * mapping behaviour to link video to video pads and audio to audio pads
464    * that usually works fine. This property is useful if you need to ensure
465    * a particular mapping to muxed streams.
466    *
467    * The GstStructure contains string fields like so:
468    *   splitmuxsink muxer-pad-map=x-pad-map,video=video_1
469    *
470    * Since: 1.18
471    */
472   g_object_class_install_property (G_OBJECT_CLASS (klass), PROP_MUXERPAD_MAP,
473       g_param_spec_boxed ("muxer-pad-map", "Muxer pad map",
474           "A GstStructure specifies the mapping from splitmuxsink sink pads to muxer pads",
475           GST_TYPE_STRUCTURE,
476           (GParamFlags) (G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)));
477
478   /**
479    * GstSplitMuxSink::format-location:
480    * @splitmux: the #GstSplitMuxSink
481    * @fragment_id: the sequence number of the file to be created
482    *
483    * Returns: the location to be used for the next output file. This must be
484    *    a newly-allocated string which will be freed with g_free() by the
485    *    splitmuxsink element when it no longer needs it, so use g_strdup() or
486    *    g_strdup_printf() or similar functions to allocate it.
487    */
488   signals[SIGNAL_FORMAT_LOCATION] =
489       g_signal_new ("format-location", G_TYPE_FROM_CLASS (klass),
490       G_SIGNAL_RUN_LAST, 0, NULL, NULL, NULL, G_TYPE_STRING, 1, G_TYPE_UINT);
491
492   /**
493    * GstSplitMuxSink::format-location-full:
494    * @splitmux: the #GstSplitMuxSink
495    * @fragment_id: the sequence number of the file to be created
496    * @first_sample: A #GstSample containing the first buffer
497    *   from the reference stream in the new file
498    *
499    * Returns: the location to be used for the next output file. This must be
500    *    a newly-allocated string which will be freed with g_free() by the
501    *    splitmuxsink element when it no longer needs it, so use g_strdup() or
502    *    g_strdup_printf() or similar functions to allocate it.
503    *
504    * Since: 1.12
505    */
506   signals[SIGNAL_FORMAT_LOCATION_FULL] =
507       g_signal_new ("format-location-full", G_TYPE_FROM_CLASS (klass),
508       G_SIGNAL_RUN_LAST, 0, NULL, NULL, NULL, G_TYPE_STRING, 2, G_TYPE_UINT,
509       GST_TYPE_SAMPLE);
510
511   /**
512    * GstSplitMuxSink::split-now:
513    * @splitmux: the #GstSplitMuxSink
514    *
515    * When called by the user, this action signal splits the video file (and begins a new one) immediately.
516    * The current GOP will be output to the new file.
517    *
518    * Since: 1.14
519    */
520   signals[SIGNAL_SPLIT_NOW] =
521       g_signal_new ("split-now", G_TYPE_FROM_CLASS (klass),
522       G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION,
523       G_STRUCT_OFFSET (GstSplitMuxSinkClass, split_now), NULL, NULL, NULL,
524       G_TYPE_NONE, 0);
525
526   /**
527    * GstSplitMuxSink::split-after:
528    * @splitmux: the #GstSplitMuxSink
529    *
530    * When called by the user, this action signal splits the video file (and begins a new one) immediately.
531    * Unlike the 'split-now' signal, with 'split-after', the current GOP will be output to the old file.
532    *
533    * Since: 1.16
534    */
535   signals[SIGNAL_SPLIT_AFTER] =
536       g_signal_new ("split-after", G_TYPE_FROM_CLASS (klass),
537       G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION,
538       G_STRUCT_OFFSET (GstSplitMuxSinkClass, split_after), NULL, NULL, NULL,
539       G_TYPE_NONE, 0);
540
541   /**
542    * GstSplitMuxSink::split-at-running-time:
543    * @splitmux: the #GstSplitMuxSink
544    *
545    * When called by the user, this action signal splits the video file (and
546    * begins a new one) as soon as the given running time is reached. If this
547    * action signal is called multiple times, running times are queued up and
548    * processed in the order they were given.
549    *
550    * Note that this is prone to race conditions, where said running time is
551    * reached and surpassed before we had a chance to split. The file will
552    * still split immediately, but in order to make sure that the split doesn't
553    * happen too late, it is recommended to call this action signal from
554    * something that will prevent further buffers from flowing into
555    * splitmuxsink before the split is completed, such as a pad probe before
556    * splitmuxsink.
557    *
558    *
559    * Since: 1.16
560    */
561   signals[SIGNAL_SPLIT_AT_RUNNING_TIME] =
562       g_signal_new ("split-at-running-time", G_TYPE_FROM_CLASS (klass),
563       G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION,
564       G_STRUCT_OFFSET (GstSplitMuxSinkClass, split_at_running_time), NULL, NULL,
565       NULL, G_TYPE_NONE, 1, G_TYPE_UINT64);
566
567   /**
568    * GstSplitMuxSink::muxer-added:
569    * @splitmux: the #GstSplitMuxSink
570    * @muxer: the newly added muxer element
571    *
572    * Since: 1.14
573    */
574   signals[SIGNAL_MUXER_ADDED] =
575       g_signal_new ("muxer-added", G_TYPE_FROM_CLASS (klass),
576       G_SIGNAL_RUN_LAST, 0, NULL, NULL, NULL, G_TYPE_NONE, 1, GST_TYPE_ELEMENT);
577
578   /**
579    * GstSplitMuxSink::sink-added:
580    * @splitmux: the #GstSplitMuxSink
581    * @sink: the newly added sink element
582    *
583    * Since: 1.14
584    */
585   signals[SIGNAL_SINK_ADDED] =
586       g_signal_new ("sink-added", G_TYPE_FROM_CLASS (klass),
587       G_SIGNAL_RUN_LAST, 0, NULL, NULL, NULL, G_TYPE_NONE, 1, GST_TYPE_ELEMENT);
588
589   klass->split_now = split_now;
590   klass->split_after = split_after;
591   klass->split_at_running_time = split_at_running_time;
592 }
593
594 static void
595 gst_splitmux_sink_init (GstSplitMuxSink * splitmux)
596 {
597   g_mutex_init (&splitmux->lock);
598   g_mutex_init (&splitmux->state_lock);
599   g_cond_init (&splitmux->input_cond);
600   g_cond_init (&splitmux->output_cond);
601   g_queue_init (&splitmux->out_cmd_q);
602
603   splitmux->mux_overhead = DEFAULT_MUXER_OVERHEAD;
604   splitmux->threshold_time = DEFAULT_MAX_SIZE_TIME;
605   splitmux->threshold_bytes = DEFAULT_MAX_SIZE_BYTES;
606   splitmux->max_files = DEFAULT_MAX_FILES;
607   splitmux->send_keyframe_requests = DEFAULT_SEND_KEYFRAME_REQUESTS;
608   splitmux->alignment_threshold = DEFAULT_ALIGNMENT_THRESHOLD;
609   splitmux->use_robust_muxing = DEFAULT_USE_ROBUST_MUXING;
610   splitmux->reset_muxer = DEFAULT_RESET_MUXER;
611
612   splitmux->threshold_timecode_str = NULL;
613
614   splitmux->async_finalize = DEFAULT_ASYNC_FINALIZE;
615   splitmux->muxer_factory = g_strdup (DEFAULT_MUXER);
616   splitmux->muxer_properties = NULL;
617   splitmux->sink_factory = g_strdup (DEFAULT_SINK);
618   splitmux->sink_properties = NULL;
619
620   GST_OBJECT_FLAG_SET (splitmux, GST_ELEMENT_FLAG_SINK);
621   splitmux->split_requested = FALSE;
622   splitmux->do_split_next_gop = FALSE;
623   splitmux->times_to_split = gst_queue_array_new_for_struct (8, 8);
624   splitmux->next_fku_time = GST_CLOCK_TIME_NONE;
625
626   g_queue_init (&splitmux->pending_input_gops);
627 }
628
629 static void
630 gst_splitmux_reset_elements (GstSplitMuxSink * splitmux)
631 {
632   if (splitmux->muxer) {
633     gst_element_set_locked_state (splitmux->muxer, TRUE);
634     gst_element_set_state (splitmux->muxer, GST_STATE_NULL);
635     gst_bin_remove (GST_BIN (splitmux), splitmux->muxer);
636   }
637   if (splitmux->active_sink) {
638     gst_element_set_locked_state (splitmux->active_sink, TRUE);
639     gst_element_set_state (splitmux->active_sink, GST_STATE_NULL);
640     gst_bin_remove (GST_BIN (splitmux), splitmux->active_sink);
641   }
642
643   splitmux->sink = splitmux->active_sink = splitmux->muxer = NULL;
644 }
645
646 static void
647 gst_splitmux_sink_dispose (GObject * object)
648 {
649   GstSplitMuxSink *splitmux = GST_SPLITMUX_SINK (object);
650
651   /* Calling parent dispose invalidates all child pointers */
652   splitmux->sink = splitmux->active_sink = splitmux->muxer = NULL;
653
654   G_OBJECT_CLASS (parent_class)->dispose (object);
655 }
656
657 static void
658 gst_splitmux_sink_finalize (GObject * object)
659 {
660   GstSplitMuxSink *splitmux = GST_SPLITMUX_SINK (object);
661
662   g_cond_clear (&splitmux->input_cond);
663   g_cond_clear (&splitmux->output_cond);
664   g_mutex_clear (&splitmux->lock);
665   g_mutex_clear (&splitmux->state_lock);
666   g_queue_foreach (&splitmux->out_cmd_q, (GFunc) out_cmd_buf_free, NULL);
667   g_queue_clear (&splitmux->out_cmd_q);
668   g_queue_foreach (&splitmux->pending_input_gops, (GFunc) input_gop_free, NULL);
669   g_queue_clear (&splitmux->pending_input_gops);
670
671   g_clear_pointer (&splitmux->fragment_start_tc, gst_video_time_code_free);
672
673   if (splitmux->muxerpad_map)
674     gst_structure_free (splitmux->muxerpad_map);
675
676   if (splitmux->provided_sink)
677     gst_object_unref (splitmux->provided_sink);
678   if (splitmux->provided_muxer)
679     gst_object_unref (splitmux->provided_muxer);
680
681   if (splitmux->muxer_factory)
682     g_free (splitmux->muxer_factory);
683   if (splitmux->muxer_preset)
684     g_free (splitmux->muxer_preset);
685   if (splitmux->muxer_properties)
686     gst_structure_free (splitmux->muxer_properties);
687   if (splitmux->sink_factory)
688     g_free (splitmux->sink_factory);
689   if (splitmux->sink_preset)
690     g_free (splitmux->sink_preset);
691   if (splitmux->sink_properties)
692     gst_structure_free (splitmux->sink_properties);
693
694   if (splitmux->threshold_timecode_str)
695     g_free (splitmux->threshold_timecode_str);
696   if (splitmux->tc_interval)
697     gst_video_time_code_interval_free (splitmux->tc_interval);
698
699   if (splitmux->times_to_split)
700     gst_queue_array_free (splitmux->times_to_split);
701
702   g_free (splitmux->location);
703
704   /* Make sure to free any un-released contexts. There should not be any,
705    * because the dispose will have freed all request pads though */
706   g_list_foreach (splitmux->contexts, (GFunc) mq_stream_ctx_free, NULL);
707   g_list_free (splitmux->contexts);
708
709   G_OBJECT_CLASS (parent_class)->finalize (object);
710 }
711
712 /*
713  * Set any time threshold to the muxer, if it has
714  * reserved-max-duration and reserved-duration-remaining
715  * properties. Called when creating/claiming the muxer
716  * in create_elements() */
717 static void
718 update_muxer_properties (GstSplitMuxSink * sink)
719 {
720   GObjectClass *klass;
721   GstClockTime threshold_time;
722
723   sink->muxer_has_reserved_props = FALSE;
724   if (sink->muxer == NULL)
725     return;
726   klass = G_OBJECT_GET_CLASS (sink->muxer);
727   if (g_object_class_find_property (klass, "reserved-max-duration") == NULL)
728     return;
729   if (g_object_class_find_property (klass,
730           "reserved-duration-remaining") == NULL)
731     return;
732   sink->muxer_has_reserved_props = TRUE;
733
734   GST_LOG_OBJECT (sink, "Setting muxer reserved time to %" GST_TIME_FORMAT,
735       GST_TIME_ARGS (sink->threshold_time));
736   GST_OBJECT_LOCK (sink);
737   threshold_time = sink->threshold_time;
738   GST_OBJECT_UNLOCK (sink);
739
740   if (threshold_time > 0) {
741     /* Tell the muxer how much space to reserve */
742     GstClockTime muxer_threshold = threshold_time;
743     g_object_set (sink->muxer, "reserved-max-duration", muxer_threshold, NULL);
744   }
745 }
746
747 static void
748 gst_splitmux_sink_set_property (GObject * object, guint prop_id,
749     const GValue * value, GParamSpec * pspec)
750 {
751   GstSplitMuxSink *splitmux = GST_SPLITMUX_SINK (object);
752
753   switch (prop_id) {
754     case PROP_LOCATION:{
755       GST_OBJECT_LOCK (splitmux);
756       g_free (splitmux->location);
757       splitmux->location = g_value_dup_string (value);
758       GST_OBJECT_UNLOCK (splitmux);
759       break;
760     }
761     case PROP_START_INDEX:
762       GST_OBJECT_LOCK (splitmux);
763       splitmux->start_index = g_value_get_int (value);
764       GST_OBJECT_UNLOCK (splitmux);
765       break;
766     case PROP_MAX_SIZE_BYTES:
767       GST_OBJECT_LOCK (splitmux);
768       splitmux->threshold_bytes = g_value_get_uint64 (value);
769       GST_OBJECT_UNLOCK (splitmux);
770       break;
771     case PROP_MAX_SIZE_TIME:
772       GST_OBJECT_LOCK (splitmux);
773       splitmux->threshold_time = g_value_get_uint64 (value);
774       GST_OBJECT_UNLOCK (splitmux);
775       break;
776     case PROP_MAX_SIZE_TIMECODE:
777       GST_OBJECT_LOCK (splitmux);
778       g_free (splitmux->threshold_timecode_str);
779       /* will be calculated later */
780       g_clear_pointer (&splitmux->tc_interval,
781           gst_video_time_code_interval_free);
782
783       splitmux->threshold_timecode_str = g_value_dup_string (value);
784       if (splitmux->threshold_timecode_str) {
785         splitmux->tc_interval =
786             gst_video_time_code_interval_new_from_string
787             (splitmux->threshold_timecode_str);
788         if (!splitmux->tc_interval) {
789           g_warning ("Wrong timecode string %s",
790               splitmux->threshold_timecode_str);
791           g_free (splitmux->threshold_timecode_str);
792           splitmux->threshold_timecode_str = NULL;
793         }
794       }
795       splitmux->next_fragment_start_tc_time = GST_CLOCK_TIME_NONE;
796       if (splitmux->tc_interval && splitmux->fragment_start_tc) {
797         splitmux->next_fragment_start_tc_time =
798             calculate_next_max_timecode (splitmux, splitmux->fragment_start_tc,
799             splitmux->fragment_start_time, NULL);
800       }
801       GST_OBJECT_UNLOCK (splitmux);
802       break;
803     case PROP_SEND_KEYFRAME_REQUESTS:
804       GST_OBJECT_LOCK (splitmux);
805       splitmux->send_keyframe_requests = g_value_get_boolean (value);
806       GST_OBJECT_UNLOCK (splitmux);
807       break;
808     case PROP_MAX_FILES:
809       GST_OBJECT_LOCK (splitmux);
810       splitmux->max_files = g_value_get_uint (value);
811       GST_OBJECT_UNLOCK (splitmux);
812       break;
813     case PROP_MUXER_OVERHEAD:
814       GST_OBJECT_LOCK (splitmux);
815       splitmux->mux_overhead = g_value_get_double (value);
816       GST_OBJECT_UNLOCK (splitmux);
817       break;
818     case PROP_USE_ROBUST_MUXING:
819       GST_OBJECT_LOCK (splitmux);
820       splitmux->use_robust_muxing = g_value_get_boolean (value);
821       GST_OBJECT_UNLOCK (splitmux);
822       if (splitmux->use_robust_muxing)
823         update_muxer_properties (splitmux);
824       break;
825     case PROP_ALIGNMENT_THRESHOLD:
826       GST_OBJECT_LOCK (splitmux);
827       splitmux->alignment_threshold = g_value_get_uint64 (value);
828       GST_OBJECT_UNLOCK (splitmux);
829       break;
830     case PROP_SINK:
831       GST_OBJECT_LOCK (splitmux);
832       gst_clear_object (&splitmux->provided_sink);
833       splitmux->provided_sink = g_value_get_object (value);
834       if (splitmux->provided_sink)
835         gst_object_ref_sink (splitmux->provided_sink);
836       GST_OBJECT_UNLOCK (splitmux);
837       break;
838     case PROP_MUXER:
839       GST_OBJECT_LOCK (splitmux);
840       gst_clear_object (&splitmux->provided_muxer);
841       splitmux->provided_muxer = g_value_get_object (value);
842       if (splitmux->provided_muxer)
843         gst_object_ref_sink (splitmux->provided_muxer);
844       GST_OBJECT_UNLOCK (splitmux);
845       break;
846     case PROP_RESET_MUXER:
847       GST_OBJECT_LOCK (splitmux);
848       splitmux->reset_muxer = g_value_get_boolean (value);
849       GST_OBJECT_UNLOCK (splitmux);
850       break;
851     case PROP_ASYNC_FINALIZE:
852       GST_OBJECT_LOCK (splitmux);
853       splitmux->async_finalize = g_value_get_boolean (value);
854       GST_OBJECT_UNLOCK (splitmux);
855       break;
856     case PROP_MUXER_FACTORY:
857       GST_OBJECT_LOCK (splitmux);
858       if (splitmux->muxer_factory)
859         g_free (splitmux->muxer_factory);
860       splitmux->muxer_factory = g_value_dup_string (value);
861       GST_OBJECT_UNLOCK (splitmux);
862       break;
863     case PROP_MUXER_PRESET:
864       GST_OBJECT_LOCK (splitmux);
865       if (splitmux->muxer_preset)
866         g_free (splitmux->muxer_preset);
867       splitmux->muxer_preset = g_value_dup_string (value);
868       GST_OBJECT_UNLOCK (splitmux);
869       break;
870     case PROP_MUXER_PROPERTIES:
871       GST_OBJECT_LOCK (splitmux);
872       if (splitmux->muxer_properties)
873         gst_structure_free (splitmux->muxer_properties);
874       if (gst_value_get_structure (value))
875         splitmux->muxer_properties =
876             gst_structure_copy (gst_value_get_structure (value));
877       else
878         splitmux->muxer_properties = NULL;
879       GST_OBJECT_UNLOCK (splitmux);
880       break;
881     case PROP_SINK_FACTORY:
882       GST_OBJECT_LOCK (splitmux);
883       if (splitmux->sink_factory)
884         g_free (splitmux->sink_factory);
885       splitmux->sink_factory = g_value_dup_string (value);
886       GST_OBJECT_UNLOCK (splitmux);
887       break;
888     case PROP_SINK_PRESET:
889       GST_OBJECT_LOCK (splitmux);
890       if (splitmux->sink_preset)
891         g_free (splitmux->sink_preset);
892       splitmux->sink_preset = g_value_dup_string (value);
893       GST_OBJECT_UNLOCK (splitmux);
894       break;
895     case PROP_SINK_PROPERTIES:
896       GST_OBJECT_LOCK (splitmux);
897       if (splitmux->sink_properties)
898         gst_structure_free (splitmux->sink_properties);
899       if (gst_value_get_structure (value))
900         splitmux->sink_properties =
901             gst_structure_copy (gst_value_get_structure (value));
902       else
903         splitmux->sink_properties = NULL;
904       GST_OBJECT_UNLOCK (splitmux);
905       break;
906     case PROP_MUXERPAD_MAP:
907     {
908       const GstStructure *s = gst_value_get_structure (value);
909       GST_SPLITMUX_LOCK (splitmux);
910       if (splitmux->muxerpad_map) {
911         gst_structure_free (splitmux->muxerpad_map);
912       }
913       if (s)
914         splitmux->muxerpad_map = gst_structure_copy (s);
915       else
916         splitmux->muxerpad_map = NULL;
917       GST_SPLITMUX_UNLOCK (splitmux);
918       break;
919     }
920     default:
921       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
922       break;
923   }
924 }
925
926 static void
927 gst_splitmux_sink_get_property (GObject * object, guint prop_id,
928     GValue * value, GParamSpec * pspec)
929 {
930   GstSplitMuxSink *splitmux = GST_SPLITMUX_SINK (object);
931
932   switch (prop_id) {
933     case PROP_LOCATION:
934       GST_OBJECT_LOCK (splitmux);
935       g_value_set_string (value, splitmux->location);
936       GST_OBJECT_UNLOCK (splitmux);
937       break;
938     case PROP_START_INDEX:
939       GST_OBJECT_LOCK (splitmux);
940       g_value_set_int (value, splitmux->start_index);
941       GST_OBJECT_UNLOCK (splitmux);
942       break;
943     case PROP_MAX_SIZE_BYTES:
944       GST_OBJECT_LOCK (splitmux);
945       g_value_set_uint64 (value, splitmux->threshold_bytes);
946       GST_OBJECT_UNLOCK (splitmux);
947       break;
948     case PROP_MAX_SIZE_TIME:
949       GST_OBJECT_LOCK (splitmux);
950       g_value_set_uint64 (value, splitmux->threshold_time);
951       GST_OBJECT_UNLOCK (splitmux);
952       break;
953     case PROP_MAX_SIZE_TIMECODE:
954       GST_OBJECT_LOCK (splitmux);
955       g_value_set_string (value, splitmux->threshold_timecode_str);
956       GST_OBJECT_UNLOCK (splitmux);
957       break;
958     case PROP_SEND_KEYFRAME_REQUESTS:
959       GST_OBJECT_LOCK (splitmux);
960       g_value_set_boolean (value, splitmux->send_keyframe_requests);
961       GST_OBJECT_UNLOCK (splitmux);
962       break;
963     case PROP_MAX_FILES:
964       GST_OBJECT_LOCK (splitmux);
965       g_value_set_uint (value, splitmux->max_files);
966       GST_OBJECT_UNLOCK (splitmux);
967       break;
968     case PROP_MUXER_OVERHEAD:
969       GST_OBJECT_LOCK (splitmux);
970       g_value_set_double (value, splitmux->mux_overhead);
971       GST_OBJECT_UNLOCK (splitmux);
972       break;
973     case PROP_USE_ROBUST_MUXING:
974       GST_OBJECT_LOCK (splitmux);
975       g_value_set_boolean (value, splitmux->use_robust_muxing);
976       GST_OBJECT_UNLOCK (splitmux);
977       break;
978     case PROP_ALIGNMENT_THRESHOLD:
979       GST_OBJECT_LOCK (splitmux);
980       g_value_set_uint64 (value, splitmux->alignment_threshold);
981       GST_OBJECT_UNLOCK (splitmux);
982       break;
983     case PROP_SINK:
984       GST_OBJECT_LOCK (splitmux);
985       g_value_set_object (value, splitmux->provided_sink);
986       GST_OBJECT_UNLOCK (splitmux);
987       break;
988     case PROP_MUXER:
989       GST_OBJECT_LOCK (splitmux);
990       g_value_set_object (value, splitmux->provided_muxer);
991       GST_OBJECT_UNLOCK (splitmux);
992       break;
993     case PROP_RESET_MUXER:
994       GST_OBJECT_LOCK (splitmux);
995       g_value_set_boolean (value, splitmux->reset_muxer);
996       GST_OBJECT_UNLOCK (splitmux);
997       break;
998     case PROP_ASYNC_FINALIZE:
999       GST_OBJECT_LOCK (splitmux);
1000       g_value_set_boolean (value, splitmux->async_finalize);
1001       GST_OBJECT_UNLOCK (splitmux);
1002       break;
1003     case PROP_MUXER_FACTORY:
1004       GST_OBJECT_LOCK (splitmux);
1005       g_value_set_string (value, splitmux->muxer_factory);
1006       GST_OBJECT_UNLOCK (splitmux);
1007       break;
1008     case PROP_MUXER_PRESET:
1009       GST_OBJECT_LOCK (splitmux);
1010       g_value_set_string (value, splitmux->muxer_preset);
1011       GST_OBJECT_UNLOCK (splitmux);
1012       break;
1013     case PROP_MUXER_PROPERTIES:
1014       GST_OBJECT_LOCK (splitmux);
1015       gst_value_set_structure (value, splitmux->muxer_properties);
1016       GST_OBJECT_UNLOCK (splitmux);
1017       break;
1018     case PROP_SINK_FACTORY:
1019       GST_OBJECT_LOCK (splitmux);
1020       g_value_set_string (value, splitmux->sink_factory);
1021       GST_OBJECT_UNLOCK (splitmux);
1022       break;
1023     case PROP_SINK_PRESET:
1024       GST_OBJECT_LOCK (splitmux);
1025       g_value_set_string (value, splitmux->sink_preset);
1026       GST_OBJECT_UNLOCK (splitmux);
1027       break;
1028     case PROP_SINK_PROPERTIES:
1029       GST_OBJECT_LOCK (splitmux);
1030       gst_value_set_structure (value, splitmux->sink_properties);
1031       GST_OBJECT_UNLOCK (splitmux);
1032       break;
1033     case PROP_MUXERPAD_MAP:
1034       GST_SPLITMUX_LOCK (splitmux);
1035       gst_value_set_structure (value, splitmux->muxerpad_map);
1036       GST_SPLITMUX_UNLOCK (splitmux);
1037       break;
1038     default:
1039       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
1040       break;
1041   }
1042 }
1043
1044 /* Convenience function */
1045 static inline GstClockTimeDiff
1046 my_segment_to_running_time (GstSegment * segment, GstClockTime val)
1047 {
1048   GstClockTimeDiff res = GST_CLOCK_STIME_NONE;
1049
1050   if (GST_CLOCK_TIME_IS_VALID (val)) {
1051     gboolean sign =
1052         gst_segment_to_running_time_full (segment, GST_FORMAT_TIME, val, &val);
1053     if (sign > 0)
1054       res = val;
1055     else if (sign < 0)
1056       res = -val;
1057   }
1058   return res;
1059 }
1060
1061 static void
1062 mq_stream_ctx_reset (MqStreamCtx * ctx)
1063 {
1064   gst_segment_init (&ctx->in_segment, GST_FORMAT_UNDEFINED);
1065   gst_segment_init (&ctx->out_segment, GST_FORMAT_UNDEFINED);
1066   ctx->in_running_time = ctx->out_running_time = GST_CLOCK_STIME_NONE;
1067   g_queue_foreach (&ctx->queued_bufs, (GFunc) mq_stream_buf_free, NULL);
1068   g_queue_clear (&ctx->queued_bufs);
1069 }
1070
1071 static MqStreamCtx *
1072 mq_stream_ctx_new (GstSplitMuxSink * splitmux)
1073 {
1074   MqStreamCtx *ctx;
1075
1076   ctx = g_new0 (MqStreamCtx, 1);
1077   ctx->splitmux = splitmux;
1078   g_queue_init (&ctx->queued_bufs);
1079   mq_stream_ctx_reset (ctx);
1080
1081   return ctx;
1082 }
1083
1084 static void
1085 mq_stream_ctx_free (MqStreamCtx * ctx)
1086 {
1087   if (ctx->q) {
1088     GstObject *parent = gst_object_get_parent (GST_OBJECT (ctx->q));
1089
1090     g_signal_handler_disconnect (ctx->q, ctx->q_overrun_id);
1091
1092     if (parent == GST_OBJECT_CAST (ctx->splitmux)) {
1093       gst_element_set_locked_state (ctx->q, TRUE);
1094       gst_element_set_state (ctx->q, GST_STATE_NULL);
1095       gst_bin_remove (GST_BIN (ctx->splitmux), ctx->q);
1096       gst_object_unref (parent);
1097     }
1098     gst_object_unref (ctx->q);
1099   }
1100   gst_object_unref (ctx->sinkpad);
1101   gst_object_unref (ctx->srcpad);
1102   g_queue_foreach (&ctx->queued_bufs, (GFunc) mq_stream_buf_free, NULL);
1103   g_queue_clear (&ctx->queued_bufs);
1104   g_free (ctx);
1105 }
1106
1107 static void
1108 send_fragment_opened_closed_msg (GstSplitMuxSink * splitmux, gboolean opened,
1109     GstElement * sink)
1110 {
1111   gchar *location = NULL;
1112   GstMessage *msg;
1113   const gchar *msg_name = opened ?
1114       "splitmuxsink-fragment-opened" : "splitmuxsink-fragment-closed";
1115   GstClockTime running_time = splitmux->reference_ctx->out_running_time;
1116
1117   if (!opened) {
1118     GstClockTime *rtime = g_object_get_qdata (G_OBJECT (sink), RUNNING_TIME);
1119     if (rtime)
1120       running_time = *rtime;
1121   }
1122
1123   if (g_object_class_find_property (G_OBJECT_GET_CLASS (sink),
1124           "location") != NULL)
1125     g_object_get (sink, "location", &location, NULL);
1126
1127   GST_DEBUG_OBJECT (splitmux,
1128       "Sending %s message. Running time %" GST_TIME_FORMAT " location %s",
1129       msg_name, GST_TIME_ARGS (running_time), GST_STR_NULL (location));
1130
1131   /* If it's in the middle of a teardown, the reference_ctc might have become
1132    * NULL */
1133   if (splitmux->reference_ctx) {
1134     msg = gst_message_new_element (GST_OBJECT (splitmux),
1135         gst_structure_new (msg_name,
1136             "location", G_TYPE_STRING, location,
1137             "running-time", GST_TYPE_CLOCK_TIME, running_time,
1138             "sink", GST_TYPE_ELEMENT, sink, NULL));
1139     gst_element_post_message (GST_ELEMENT_CAST (splitmux), msg);
1140   }
1141
1142   g_free (location);
1143 }
1144
1145 static void
1146 send_eos_async (GstSplitMuxSink * splitmux, AsyncEosHelper * helper)
1147 {
1148   GstEvent *eos;
1149   GstPad *pad;
1150   MqStreamCtx *ctx;
1151
1152   eos = gst_event_new_eos ();
1153   pad = helper->pad;
1154   ctx = helper->ctx;
1155
1156   GST_SPLITMUX_LOCK (splitmux);
1157   if (!pad)
1158     pad = gst_pad_get_peer (ctx->srcpad);
1159   GST_SPLITMUX_UNLOCK (splitmux);
1160
1161   gst_pad_send_event (pad, eos);
1162   GST_INFO_OBJECT (splitmux, "Sent async EOS on %" GST_PTR_FORMAT, pad);
1163
1164   gst_object_unref (pad);
1165   g_free (helper);
1166 }
1167
1168 /* Called with lock held, drops the lock to send EOS to the
1169  * pad
1170  */
1171 static void
1172 send_eos (GstSplitMuxSink * splitmux, MqStreamCtx * ctx)
1173 {
1174   GstEvent *eos;
1175   GstPad *pad;
1176
1177   eos = gst_event_new_eos ();
1178   pad = gst_pad_get_peer (ctx->srcpad);
1179
1180   ctx->out_eos = TRUE;
1181
1182   GST_INFO_OBJECT (splitmux, "Sending EOS on %" GST_PTR_FORMAT, pad);
1183   GST_SPLITMUX_UNLOCK (splitmux);
1184   gst_pad_send_event (pad, eos);
1185   GST_SPLITMUX_LOCK (splitmux);
1186
1187   gst_object_unref (pad);
1188 }
1189
1190 /* Called with lock held. Schedules an EOS event to the ctx pad
1191  * to happen in another thread */
1192 static void
1193 eos_context_async (MqStreamCtx * ctx, GstSplitMuxSink * splitmux)
1194 {
1195   AsyncEosHelper *helper = g_new0 (AsyncEosHelper, 1);
1196   GstPad *srcpad, *sinkpad;
1197
1198   srcpad = ctx->srcpad;
1199   sinkpad = gst_pad_get_peer (srcpad);
1200
1201   helper->ctx = ctx;
1202   helper->pad = sinkpad;        /* Takes the reference */
1203
1204   ctx->out_eos_async_done = TRUE;
1205
1206   /* There used to be a bug here, where we had to explicitly remove
1207    * the SINK flag so that GstBin would ignore it for EOS purposes.
1208    * That fixed a race where if splitmuxsink really reaches EOS
1209    * before an asynchronous background element has finished, then
1210    * the bin wouldn't actually send EOS to the pipeline. Even after
1211    * finishing and removing the old element, the bin didn't re-check
1212    * EOS status on removing a SINK element. That bug was fixed
1213    * in core. */
1214   GST_DEBUG_OBJECT (splitmux, "scheduled EOS to pad %" GST_PTR_FORMAT " ctx %p",
1215       sinkpad, ctx);
1216
1217   g_assert_nonnull (helper->pad);
1218   gst_element_call_async (GST_ELEMENT (splitmux),
1219       (GstElementCallAsyncFunc) send_eos_async, helper, NULL);
1220 }
1221
1222 /* Called with lock held. TRUE iff all contexts have a
1223  * pending (or delivered) async eos event */
1224 static gboolean
1225 all_contexts_are_async_eos (GstSplitMuxSink * splitmux)
1226 {
1227   gboolean ret = TRUE;
1228   GList *item;
1229
1230   for (item = splitmux->contexts; item; item = item->next) {
1231     MqStreamCtx *ctx = item->data;
1232     ret &= ctx->out_eos_async_done;
1233   }
1234   return ret;
1235 }
1236
1237 /* Called with splitmux lock held to check if this output
1238  * context needs to sleep to wait for the release of the
1239  * next GOP, or to send EOS to close out the current file
1240  */
1241 static GstFlowReturn
1242 complete_or_wait_on_out (GstSplitMuxSink * splitmux, MqStreamCtx * ctx)
1243 {
1244   if (ctx->caps_change)
1245     return GST_FLOW_OK;
1246
1247   do {
1248     /* When first starting up, the reference stream has to output
1249      * the first buffer to prepare the muxer and sink */
1250     gboolean can_output = (ctx->is_reference || splitmux->ready_for_output);
1251     GstClockTimeDiff my_max_out_running_time = splitmux->max_out_running_time;
1252
1253     if (my_max_out_running_time != GST_CLOCK_STIME_NONE
1254         && my_max_out_running_time != G_MAXINT64) {
1255       my_max_out_running_time -= splitmux->alignment_threshold;
1256       GST_LOG_OBJECT (ctx->srcpad,
1257           "Max out running time currently %" GST_STIME_FORMAT
1258           ", with threshold applied it is %" GST_STIME_FORMAT,
1259           GST_STIME_ARGS (splitmux->max_out_running_time),
1260           GST_STIME_ARGS (my_max_out_running_time));
1261     }
1262
1263     if (ctx->flushing
1264         || splitmux->output_state == SPLITMUX_OUTPUT_STATE_STOPPED)
1265       return GST_FLOW_FLUSHING;
1266
1267     GST_LOG_OBJECT (ctx->srcpad,
1268         "Checking running time %" GST_STIME_FORMAT " against max %"
1269         GST_STIME_FORMAT, GST_STIME_ARGS (ctx->out_running_time),
1270         GST_STIME_ARGS (my_max_out_running_time));
1271
1272     if (can_output) {
1273       if (splitmux->max_out_running_time != GST_CLOCK_STIME_NONE &&
1274           ctx->out_running_time < my_max_out_running_time) {
1275         return GST_FLOW_OK;
1276       }
1277
1278       switch (splitmux->output_state) {
1279         case SPLITMUX_OUTPUT_STATE_OUTPUT_GOP:
1280           /* We only get here if we've finished outputting a GOP and need to know
1281            * what to do next */
1282           splitmux->output_state = SPLITMUX_OUTPUT_STATE_AWAITING_COMMAND;
1283           GST_SPLITMUX_BROADCAST_OUTPUT (splitmux);
1284           continue;
1285
1286         case SPLITMUX_OUTPUT_STATE_ENDING_FILE:
1287         case SPLITMUX_OUTPUT_STATE_ENDING_STREAM:
1288           /* We've reached the max out running_time to get here, so end this file now */
1289           if (ctx->out_eos == FALSE) {
1290             if (splitmux->async_finalize) {
1291               /* We must set EOS asynchronously at this point. We cannot defer
1292                * it, because we need all contexts to wake up, for the
1293                * reference context to eventually give us something at
1294                * START_NEXT_FILE. Otherwise, collectpads might choose another
1295                * context to give us the first buffer, and format-location-full
1296                * will not contain a valid sample. */
1297               g_object_set_qdata ((GObject *) splitmux->sink, EOS_FROM_US,
1298                   GINT_TO_POINTER (1));
1299               eos_context_async (ctx, splitmux);
1300               if (all_contexts_are_async_eos (splitmux)) {
1301                 GST_INFO_OBJECT (splitmux,
1302                     "All contexts are async_eos. Moving to the next file.");
1303                 /* We can start the next file once we've asked each pad to go EOS */
1304                 splitmux->output_state = SPLITMUX_OUTPUT_STATE_START_NEXT_FILE;
1305                 GST_SPLITMUX_BROADCAST_OUTPUT (splitmux);
1306                 continue;
1307               }
1308             } else {
1309               send_eos (splitmux, ctx);
1310               continue;
1311             }
1312           } else {
1313             GST_INFO_OBJECT (splitmux,
1314                 "At end-of-file state, but context %p is already EOS", ctx);
1315           }
1316           break;
1317         case SPLITMUX_OUTPUT_STATE_START_NEXT_FILE:
1318           if (ctx->is_reference) {
1319             GstFlowReturn ret = GST_FLOW_OK;
1320
1321             /* Special handling on the reference ctx to start new fragments
1322              * and collect commands from the command queue */
1323             /* drops the splitmux lock briefly: */
1324             /* We must have reference ctx in order for format-location-full to
1325              * have a sample */
1326             ret = start_next_fragment (splitmux, ctx);
1327             if (ret != GST_FLOW_OK)
1328               return ret;
1329
1330             continue;
1331           }
1332           break;
1333         case SPLITMUX_OUTPUT_STATE_AWAITING_COMMAND:{
1334           do {
1335             SplitMuxOutputCommand *cmd =
1336                 g_queue_pop_tail (&splitmux->out_cmd_q);
1337             if (cmd != NULL) {
1338               /* If we pop the last command, we need to make our queues bigger */
1339               if (g_queue_get_length (&splitmux->out_cmd_q) == 0)
1340                 grow_blocked_queues (splitmux);
1341
1342               if (cmd->start_new_fragment) {
1343                 if (splitmux->muxed_out_bytes > 0) {
1344                   GST_DEBUG_OBJECT (splitmux, "Got cmd to start new fragment");
1345                   splitmux->output_state = SPLITMUX_OUTPUT_STATE_ENDING_FILE;
1346                 } else {
1347                   GST_DEBUG_OBJECT (splitmux,
1348                       "Got cmd to start new fragment, but fragment is empty - ignoring.");
1349                 }
1350               } else {
1351                 GST_DEBUG_OBJECT (splitmux,
1352                     "Got new output cmd for time %" GST_STIME_FORMAT,
1353                     GST_STIME_ARGS (cmd->max_output_ts));
1354
1355                 /* Extend the output range immediately */
1356                 if (splitmux->max_out_running_time == GST_CLOCK_STIME_NONE
1357                     || cmd->max_output_ts > splitmux->max_out_running_time)
1358                   splitmux->max_out_running_time = cmd->max_output_ts;
1359                 GST_DEBUG_OBJECT (splitmux,
1360                     "Max out running time now %" GST_STIME_FORMAT,
1361                     GST_STIME_ARGS (splitmux->max_out_running_time));
1362                 splitmux->output_state = SPLITMUX_OUTPUT_STATE_OUTPUT_GOP;
1363               }
1364               GST_SPLITMUX_BROADCAST_OUTPUT (splitmux);
1365
1366               out_cmd_buf_free (cmd);
1367               break;
1368             } else {
1369               GST_SPLITMUX_WAIT_OUTPUT (splitmux);
1370             }
1371           } while (!ctx->flushing && splitmux->output_state ==
1372               SPLITMUX_OUTPUT_STATE_AWAITING_COMMAND);
1373           /* loop and re-check the state */
1374           continue;
1375         }
1376         case SPLITMUX_OUTPUT_STATE_STOPPED:
1377           return GST_FLOW_FLUSHING;
1378       }
1379     } else {
1380       GST_LOG_OBJECT (ctx->srcpad, "Not yet ready for output");
1381     }
1382
1383     GST_INFO_OBJECT (ctx->srcpad,
1384         "Sleeping for running time %"
1385         GST_STIME_FORMAT " (max %" GST_STIME_FORMAT ") or state change.",
1386         GST_STIME_ARGS (ctx->out_running_time),
1387         GST_STIME_ARGS (splitmux->max_out_running_time));
1388     GST_SPLITMUX_WAIT_OUTPUT (splitmux);
1389     GST_INFO_OBJECT (ctx->srcpad,
1390         "Woken for new max running time %" GST_STIME_FORMAT,
1391         GST_STIME_ARGS (splitmux->max_out_running_time));
1392   }
1393   while (1);
1394
1395   return GST_FLOW_OK;
1396 }
1397
1398 static GstClockTime
1399 calculate_next_max_timecode (GstSplitMuxSink * splitmux,
1400     const GstVideoTimeCode * cur_tc, GstClockTime running_time,
1401     GstVideoTimeCode ** next_tc)
1402 {
1403   GstVideoTimeCode *target_tc;
1404   GstClockTime cur_tc_time, target_tc_time, next_max_tc_time;
1405
1406   if (cur_tc == NULL || splitmux->tc_interval == NULL)
1407     return GST_CLOCK_TIME_NONE;
1408
1409   target_tc = gst_video_time_code_add_interval (cur_tc, splitmux->tc_interval);
1410   if (!target_tc) {
1411     GST_ELEMENT_ERROR (splitmux,
1412         STREAM, FAILED, (NULL), ("Couldn't calculate target timecode"));
1413     return GST_CLOCK_TIME_NONE;
1414   }
1415
1416   /* Convert to ns */
1417   target_tc_time = gst_video_time_code_nsec_since_daily_jam (target_tc);
1418   cur_tc_time = gst_video_time_code_nsec_since_daily_jam (cur_tc);
1419
1420   /* Add running_time, accounting for wraparound. */
1421   if (target_tc_time >= cur_tc_time) {
1422     next_max_tc_time = target_tc_time - cur_tc_time + running_time;
1423   } else {
1424     GstClockTime day_in_ns = 24 * 60 * 60 * GST_SECOND;
1425
1426     if ((cur_tc->config.flags & GST_VIDEO_TIME_CODE_FLAGS_DROP_FRAME) &&
1427         (cur_tc->config.fps_d == 1001)) {
1428       /* Checking fps_d is probably unneeded, but better safe than sorry
1429        * (e.g. someone accidentally set a flag) */
1430       GstVideoTimeCode *tc_for_offset;
1431
1432       /* Here, the duration of the 24:00:00;00 timecode isn't exactly one day,
1433        * but slightly less. Calculate that duration from a fake timecode. The
1434        * problem is that 24:00:00;00 isn't a valid timecode, so the workaround
1435        * is to add one frame to 23:59:59;29 */
1436       tc_for_offset =
1437           gst_video_time_code_new (cur_tc->config.fps_n, cur_tc->config.fps_d,
1438           NULL, cur_tc->config.flags, 23, 59, 59,
1439           cur_tc->config.fps_n / cur_tc->config.fps_d, 0);
1440       day_in_ns =
1441           gst_video_time_code_nsec_since_daily_jam (tc_for_offset) +
1442           gst_util_uint64_scale (GST_SECOND, cur_tc->config.fps_d,
1443           cur_tc->config.fps_n);
1444       gst_video_time_code_free (tc_for_offset);
1445     }
1446     next_max_tc_time = day_in_ns - cur_tc_time + target_tc_time + running_time;
1447   }
1448
1449 #ifndef GST_DISABLE_GST_DEBUG
1450   {
1451     gchar *next_max_tc_str, *cur_tc_str;
1452
1453     cur_tc_str = gst_video_time_code_to_string (cur_tc);
1454     next_max_tc_str = gst_video_time_code_to_string (target_tc);
1455
1456     GST_INFO_OBJECT (splitmux, "Next max timecode %s time: %" GST_TIME_FORMAT
1457         " from ref timecode %s time: %" GST_TIME_FORMAT,
1458         next_max_tc_str,
1459         GST_TIME_ARGS (next_max_tc_time),
1460         cur_tc_str, GST_TIME_ARGS (cur_tc_time));
1461
1462     g_free (next_max_tc_str);
1463     g_free (cur_tc_str);
1464   }
1465 #endif
1466
1467   if (next_tc)
1468     *next_tc = target_tc;
1469   else
1470     gst_video_time_code_free (target_tc);
1471
1472   return next_max_tc_time;
1473 }
1474
1475 static gboolean
1476 request_next_keyframe (GstSplitMuxSink * splitmux, GstBuffer * buffer,
1477     GstClockTimeDiff running_time_dts)
1478 {
1479   GstEvent *ev;
1480   GstClockTime target_time;
1481   gboolean timecode_based = FALSE;
1482   GstClockTime max_tc_time = GST_CLOCK_TIME_NONE;
1483   GstClockTime next_max_tc_time = GST_CLOCK_TIME_NONE;
1484   GstClockTime next_fku_time = GST_CLOCK_TIME_NONE;
1485   GstClockTime tc_rounding_error = 5 * GST_USECOND;
1486   InputGop *newest_gop = NULL;
1487   GList *l;
1488
1489   if (!splitmux->send_keyframe_requests)
1490     return TRUE;
1491
1492   /* Find the newest GOP where we passed in DTS the start PTS */
1493   for (l = splitmux->pending_input_gops.tail; l; l = l->prev) {
1494     InputGop *tmp = l->data;
1495
1496     GST_TRACE_OBJECT (splitmux,
1497         "Having pending input GOP with start PTS %" GST_STIME_FORMAT
1498         " and start time %" GST_STIME_FORMAT,
1499         GST_STIME_ARGS (tmp->start_time_pts), GST_STIME_ARGS (tmp->start_time));
1500
1501     if (tmp->sent_fku) {
1502       GST_DEBUG_OBJECT (splitmux,
1503           "Already checked for a keyframe request for this GOP");
1504       return TRUE;
1505     }
1506
1507     if (running_time_dts == GST_CLOCK_STIME_NONE ||
1508         tmp->start_time_pts == GST_CLOCK_STIME_NONE ||
1509         running_time_dts >= tmp->start_time_pts) {
1510       GST_DEBUG_OBJECT (splitmux,
1511           "Using GOP with start PTS %" GST_STIME_FORMAT " and start time %"
1512           GST_STIME_FORMAT, GST_STIME_ARGS (tmp->start_time_pts),
1513           GST_STIME_ARGS (tmp->start_time));
1514       newest_gop = tmp;
1515       break;
1516     }
1517   }
1518
1519   if (!newest_gop) {
1520     GST_DEBUG_OBJECT (splitmux, "Have no complete enough pending input GOP");
1521     return TRUE;
1522   }
1523
1524   if (splitmux->tc_interval) {
1525     if (newest_gop->start_tc
1526         && gst_video_time_code_is_valid (newest_gop->start_tc)) {
1527       GstVideoTimeCode *next_tc = NULL;
1528       max_tc_time =
1529           calculate_next_max_timecode (splitmux, newest_gop->start_tc,
1530           newest_gop->start_time, &next_tc);
1531
1532       /* calculate the next expected keyframe time to prevent too early fku
1533        * event */
1534       if (GST_CLOCK_TIME_IS_VALID (max_tc_time) && next_tc) {
1535         next_max_tc_time =
1536             calculate_next_max_timecode (splitmux, next_tc, max_tc_time, NULL);
1537       }
1538       if (next_tc)
1539         gst_video_time_code_free (next_tc);
1540
1541       timecode_based = GST_CLOCK_TIME_IS_VALID (max_tc_time) &&
1542           GST_CLOCK_TIME_IS_VALID (next_max_tc_time);
1543     } else {
1544       /* This can happen in the presence of GAP events that trigger
1545        * a new fragment start */
1546       GST_WARNING_OBJECT (splitmux,
1547           "No buffer available to calculate next timecode");
1548     }
1549   }
1550
1551   if ((splitmux->threshold_time == 0 && !timecode_based)
1552       || splitmux->threshold_bytes != 0)
1553     return TRUE;
1554
1555   if (timecode_based) {
1556     /* We might have rounding errors: aim slightly earlier */
1557     if (max_tc_time >= tc_rounding_error) {
1558       target_time = max_tc_time - tc_rounding_error;
1559     } else {
1560       /* unreliable target time */
1561       GST_DEBUG_OBJECT (splitmux, "tc time %" GST_TIME_FORMAT
1562           " is smaller than allowed rounding error, set it to zero",
1563           GST_TIME_ARGS (max_tc_time));
1564       target_time = 0;
1565     }
1566
1567     if (next_max_tc_time >= tc_rounding_error) {
1568       next_fku_time = next_max_tc_time - tc_rounding_error;
1569     } else {
1570       /* unreliable target time */
1571       GST_DEBUG_OBJECT (splitmux, "next tc time %" GST_TIME_FORMAT
1572           " is smaller than allowed rounding error, set it to zero",
1573           GST_TIME_ARGS (next_max_tc_time));
1574       next_fku_time = 0;
1575     }
1576   } else {
1577     target_time = newest_gop->start_time + splitmux->threshold_time;
1578   }
1579
1580   if (GST_CLOCK_TIME_IS_VALID (splitmux->next_fku_time)) {
1581     GstClockTime allowed_time = splitmux->next_fku_time;
1582
1583     if (timecode_based) {
1584       if (allowed_time >= tc_rounding_error) {
1585         allowed_time -= tc_rounding_error;
1586       } else {
1587         /* unreliable next force key unit time */
1588         GST_DEBUG_OBJECT (splitmux, "expected next force key unit time %"
1589             GST_TIME_FORMAT
1590             " is smaller than allowed rounding error, set it to zero",
1591             GST_TIME_ARGS (splitmux->next_fku_time));
1592         allowed_time = 0;
1593       }
1594     }
1595
1596     if (target_time < allowed_time) {
1597       GST_LOG_OBJECT (splitmux, "Target time %" GST_TIME_FORMAT
1598           " is smaller than expected next keyframe time %" GST_TIME_FORMAT
1599           ", rounding error compensated next keyframe time %" GST_TIME_FORMAT,
1600           GST_TIME_ARGS (target_time),
1601           GST_TIME_ARGS (splitmux->next_fku_time),
1602           GST_TIME_ARGS (allowed_time));
1603
1604       return TRUE;
1605     } else if (allowed_time != splitmux->next_fku_time &&
1606         target_time < splitmux->next_fku_time) {
1607       GST_DEBUG_OBJECT (splitmux, "Target time %" GST_TIME_FORMAT
1608           " is smaller than expected next keyframe time %" GST_TIME_FORMAT
1609           ", but the difference is smaller than allowed rounding error",
1610           GST_TIME_ARGS (target_time), GST_TIME_ARGS (splitmux->next_fku_time));
1611     }
1612   }
1613
1614   if (!timecode_based) {
1615     next_fku_time = target_time + splitmux->threshold_time;
1616   }
1617
1618   GST_INFO_OBJECT (splitmux, "Requesting keyframe at %" GST_TIME_FORMAT
1619       ", the next expected keyframe request time is %" GST_TIME_FORMAT,
1620       GST_TIME_ARGS (target_time), GST_TIME_ARGS (next_fku_time));
1621
1622   newest_gop->sent_fku = TRUE;
1623
1624   splitmux->next_fku_time = next_fku_time;
1625   ev = gst_video_event_new_upstream_force_key_unit (target_time, TRUE, 0);
1626
1627   return gst_pad_push_event (splitmux->reference_ctx->sinkpad, ev);
1628 }
1629
1630 static GstPadProbeReturn
1631 handle_mq_output (GstPad * pad, GstPadProbeInfo * info, MqStreamCtx * ctx)
1632 {
1633   GstSplitMuxSink *splitmux = ctx->splitmux;
1634   MqStreamBuf *buf_info = NULL;
1635   GstFlowReturn ret = GST_FLOW_OK;
1636
1637   GST_LOG_OBJECT (pad, "Fired probe type 0x%x", info->type);
1638
1639   /* FIXME: Handle buffer lists, until then make it clear they won't work */
1640   if (info->type & GST_PAD_PROBE_TYPE_BUFFER_LIST) {
1641     g_warning ("Buffer list handling not implemented");
1642     return GST_PAD_PROBE_DROP;
1643   }
1644   if (info->type & GST_PAD_PROBE_TYPE_EVENT_DOWNSTREAM ||
1645       info->type & GST_PAD_PROBE_TYPE_EVENT_FLUSH) {
1646     GstEvent *event = gst_pad_probe_info_get_event (info);
1647     gboolean locked = FALSE, wait = !ctx->is_reference;
1648
1649     GST_LOG_OBJECT (pad, "Event %" GST_PTR_FORMAT, event);
1650
1651     switch (GST_EVENT_TYPE (event)) {
1652       case GST_EVENT_SEGMENT:
1653         gst_event_copy_segment (event, &ctx->out_segment);
1654         break;
1655       case GST_EVENT_FLUSH_STOP:
1656         GST_SPLITMUX_LOCK (splitmux);
1657         locked = TRUE;
1658         gst_segment_init (&ctx->out_segment, GST_FORMAT_UNDEFINED);
1659         g_queue_foreach (&ctx->queued_bufs, (GFunc) mq_stream_buf_free, NULL);
1660         g_queue_clear (&ctx->queued_bufs);
1661         g_queue_clear (&ctx->queued_bufs);
1662         /* If this is the reference context, we just threw away any queued keyframes */
1663         if (ctx->is_reference)
1664           splitmux->queued_keyframes = 0;
1665         ctx->flushing = FALSE;
1666         wait = FALSE;
1667         break;
1668       case GST_EVENT_FLUSH_START:
1669         GST_SPLITMUX_LOCK (splitmux);
1670         locked = TRUE;
1671         GST_LOG_OBJECT (pad, "Flush start");
1672         ctx->flushing = TRUE;
1673         GST_SPLITMUX_BROADCAST_INPUT (splitmux);
1674         GST_SPLITMUX_BROADCAST_OUTPUT (splitmux);
1675         break;
1676       case GST_EVENT_EOS:
1677         GST_SPLITMUX_LOCK (splitmux);
1678         locked = TRUE;
1679         if (splitmux->output_state == SPLITMUX_OUTPUT_STATE_STOPPED)
1680           goto beach;
1681         ctx->out_eos = TRUE;
1682
1683         if (ctx == splitmux->reference_ctx) {
1684           splitmux->output_state = SPLITMUX_OUTPUT_STATE_ENDING_STREAM;
1685           GST_SPLITMUX_BROADCAST_OUTPUT (splitmux);
1686         }
1687
1688         GST_INFO_OBJECT (splitmux,
1689             "Have EOS event at pad %" GST_PTR_FORMAT " ctx %p", pad, ctx);
1690         break;
1691       case GST_EVENT_GAP:{
1692         GstClockTime gap_ts;
1693         GstClockTimeDiff rtime;
1694
1695         gst_event_parse_gap (event, &gap_ts, NULL);
1696         if (gap_ts == GST_CLOCK_TIME_NONE)
1697           break;
1698
1699         GST_SPLITMUX_LOCK (splitmux);
1700         locked = TRUE;
1701
1702         if (splitmux->output_state == SPLITMUX_OUTPUT_STATE_STOPPED)
1703           goto beach;
1704
1705         /* When we get a gap event on the
1706          * reference stream and we're trying to open a
1707          * new file, we need to store it until we get
1708          * the buffer afterwards
1709          */
1710         if (ctx->is_reference &&
1711             (splitmux->output_state != SPLITMUX_OUTPUT_STATE_OUTPUT_GOP)) {
1712           GST_DEBUG_OBJECT (pad, "Storing GAP event until buffer arrives");
1713           gst_event_replace (&ctx->pending_gap, event);
1714           GST_SPLITMUX_UNLOCK (splitmux);
1715           return GST_PAD_PROBE_HANDLED;
1716         }
1717
1718         rtime = my_segment_to_running_time (&ctx->out_segment, gap_ts);
1719
1720         GST_LOG_OBJECT (pad, "Have GAP w/ ts %" GST_STIME_FORMAT,
1721             GST_STIME_ARGS (rtime));
1722
1723         if (rtime != GST_CLOCK_STIME_NONE) {
1724           ctx->out_running_time = rtime;
1725           complete_or_wait_on_out (splitmux, ctx);
1726         }
1727         break;
1728       }
1729       case GST_EVENT_CUSTOM_DOWNSTREAM:{
1730         const GstStructure *s;
1731         GstClockTimeDiff ts = 0;
1732
1733         s = gst_event_get_structure (event);
1734         if (!gst_structure_has_name (s, "splitmuxsink-unblock"))
1735           break;
1736
1737         gst_structure_get_int64 (s, "timestamp", &ts);
1738
1739         GST_SPLITMUX_LOCK (splitmux);
1740         locked = TRUE;
1741
1742         if (splitmux->output_state == SPLITMUX_OUTPUT_STATE_STOPPED)
1743           goto beach;
1744         ctx->out_running_time = ts;
1745         if (!ctx->is_reference)
1746           ret = complete_or_wait_on_out (splitmux, ctx);
1747         GST_SPLITMUX_UNLOCK (splitmux);
1748         GST_PAD_PROBE_INFO_FLOW_RETURN (info) = ret;
1749         return GST_PAD_PROBE_DROP;
1750       }
1751       case GST_EVENT_CAPS:{
1752         GstPad *peer;
1753
1754         if (!ctx->is_reference)
1755           break;
1756
1757         peer = gst_pad_get_peer (pad);
1758         if (peer) {
1759           gboolean ok = gst_pad_send_event (peer, gst_event_ref (event));
1760
1761           gst_object_unref (peer);
1762
1763           if (ok)
1764             break;
1765
1766         } else {
1767           break;
1768         }
1769         /* This is in the case the muxer doesn't allow this change of caps */
1770         GST_SPLITMUX_LOCK (splitmux);
1771         locked = TRUE;
1772         ctx->caps_change = TRUE;
1773
1774         if (splitmux->output_state != SPLITMUX_OUTPUT_STATE_START_NEXT_FILE) {
1775           GST_DEBUG_OBJECT (splitmux,
1776               "New caps were not accepted. Switching output file");
1777           if (ctx->out_eos == FALSE) {
1778             splitmux->output_state = SPLITMUX_OUTPUT_STATE_ENDING_FILE;
1779             GST_SPLITMUX_BROADCAST_OUTPUT (splitmux);
1780           }
1781         }
1782
1783         /* Lets it fall through, if it fails again, then the muxer just can't
1784          * support this format, but at least we have a closed file.
1785          */
1786         break;
1787       }
1788       default:
1789         break;
1790     }
1791
1792     /* We need to make sure events aren't passed
1793      * until the muxer / sink are ready for it */
1794     if (!locked)
1795       GST_SPLITMUX_LOCK (splitmux);
1796     if (wait)
1797       ret = complete_or_wait_on_out (splitmux, ctx);
1798     GST_SPLITMUX_UNLOCK (splitmux);
1799
1800     /* Don't try to forward sticky events before the next buffer is there
1801      * because it would cause a new file to be created without the first
1802      * buffer being available.
1803      */
1804     GST_PAD_PROBE_INFO_FLOW_RETURN (info) = ret;
1805     if (ctx->caps_change && GST_EVENT_IS_STICKY (event)) {
1806       gst_event_unref (event);
1807       return GST_PAD_PROBE_HANDLED;
1808     } else {
1809       return GST_PAD_PROBE_PASS;
1810     }
1811   }
1812
1813   /* Allow everything through until the configured next stopping point */
1814   GST_SPLITMUX_LOCK (splitmux);
1815
1816   buf_info = g_queue_pop_tail (&ctx->queued_bufs);
1817   if (buf_info == NULL) {
1818     /* Can only happen due to a poorly timed flush */
1819     ret = GST_FLOW_FLUSHING;
1820     goto beach;
1821   }
1822
1823   /* If we have popped a keyframe, decrement the queued_gop count */
1824   if (buf_info->keyframe && splitmux->queued_keyframes > 0 && ctx->is_reference)
1825     splitmux->queued_keyframes--;
1826
1827   ctx->out_running_time = buf_info->run_ts;
1828   ctx->cur_out_buffer = gst_pad_probe_info_get_buffer (info);
1829
1830   GST_LOG_OBJECT (splitmux,
1831       "Pad %" GST_PTR_FORMAT " buffer with run TS %" GST_STIME_FORMAT
1832       " size %" G_GUINT64_FORMAT,
1833       pad, GST_STIME_ARGS (ctx->out_running_time), buf_info->buf_size);
1834
1835   ctx->caps_change = FALSE;
1836
1837   ret = complete_or_wait_on_out (splitmux, ctx);
1838
1839   splitmux->muxed_out_bytes += buf_info->buf_size;
1840
1841 #ifndef GST_DISABLE_GST_DEBUG
1842   {
1843     GstBuffer *buf = gst_pad_probe_info_get_buffer (info);
1844     GST_LOG_OBJECT (pad, "Returning to pass buffer %" GST_PTR_FORMAT
1845         " run ts %" GST_STIME_FORMAT, buf,
1846         GST_STIME_ARGS (ctx->out_running_time));
1847   }
1848 #endif
1849
1850   ctx->cur_out_buffer = NULL;
1851   GST_SPLITMUX_UNLOCK (splitmux);
1852
1853   /* pending_gap is protected by the STREAM lock */
1854   if (ctx->pending_gap) {
1855     /* If we previously stored a gap event, send it now */
1856     GstPad *peer = gst_pad_get_peer (ctx->srcpad);
1857
1858     GST_DEBUG_OBJECT (splitmux,
1859         "Pad %" GST_PTR_FORMAT " sending pending GAP event", ctx->srcpad);
1860
1861     gst_pad_send_event (peer, ctx->pending_gap);
1862     ctx->pending_gap = NULL;
1863
1864     gst_object_unref (peer);
1865   }
1866
1867   mq_stream_buf_free (buf_info);
1868
1869   GST_PAD_PROBE_INFO_FLOW_RETURN (info) = ret;
1870   return GST_PAD_PROBE_PASS;
1871
1872 beach:
1873   GST_SPLITMUX_UNLOCK (splitmux);
1874   GST_PAD_PROBE_INFO_FLOW_RETURN (info) = ret;
1875   return GST_PAD_PROBE_DROP;
1876 }
1877
1878 static gboolean
1879 resend_sticky (GstPad * pad, GstEvent ** event, GstPad * peer)
1880 {
1881   return gst_pad_send_event (peer, gst_event_ref (*event));
1882 }
1883
1884 static void
1885 unlock_context (MqStreamCtx * ctx, GstSplitMuxSink * splitmux)
1886 {
1887   if (ctx->fragment_block_id > 0) {
1888     gst_pad_remove_probe (ctx->srcpad, ctx->fragment_block_id);
1889     ctx->fragment_block_id = 0;
1890   }
1891 }
1892
1893 static void
1894 restart_context (MqStreamCtx * ctx, GstSplitMuxSink * splitmux)
1895 {
1896   GstPad *peer = gst_pad_get_peer (ctx->srcpad);
1897
1898   gst_pad_sticky_events_foreach (ctx->srcpad,
1899       (GstPadStickyEventsForeachFunction) (resend_sticky), peer);
1900
1901   /* Clear EOS flag if not actually EOS */
1902   ctx->out_eos = GST_PAD_IS_EOS (ctx->srcpad);
1903   ctx->out_eos_async_done = ctx->out_eos;
1904
1905   gst_object_unref (peer);
1906 }
1907
1908 static void
1909 relink_context (MqStreamCtx * ctx, GstSplitMuxSink * splitmux)
1910 {
1911   GstPad *sinkpad, *srcpad, *newpad;
1912   GstPadTemplate *templ;
1913
1914   srcpad = ctx->srcpad;
1915   sinkpad = gst_pad_get_peer (srcpad);
1916
1917   templ = sinkpad->padtemplate;
1918   newpad =
1919       gst_element_request_pad (splitmux->muxer, templ,
1920       GST_PAD_NAME (sinkpad), NULL);
1921
1922   GST_DEBUG_OBJECT (splitmux, "Relinking ctx %p to pad %" GST_PTR_FORMAT, ctx,
1923       newpad);
1924   if (!gst_pad_unlink (srcpad, sinkpad)) {
1925     gst_object_unref (sinkpad);
1926     goto fail;
1927   }
1928   if (gst_pad_link_full (srcpad, newpad,
1929           GST_PAD_LINK_CHECK_NO_RECONFIGURE) != GST_PAD_LINK_OK) {
1930     gst_element_release_request_pad (splitmux->muxer, newpad);
1931     gst_object_unref (sinkpad);
1932     gst_object_unref (newpad);
1933     goto fail;
1934   }
1935   gst_object_unref (newpad);
1936   gst_object_unref (sinkpad);
1937   return;
1938
1939 fail:
1940   GST_ELEMENT_ERROR (splitmux, RESOURCE, SETTINGS,
1941       ("Could not create the new muxer/sink"), NULL);
1942 }
1943
1944 static GstPadProbeReturn
1945 _block_pad (GstPad * pad, GstPadProbeInfo * info, gpointer user_data)
1946 {
1947   return GST_PAD_PROBE_OK;
1948 }
1949
1950 static void
1951 block_context (MqStreamCtx * ctx, GstSplitMuxSink * splitmux)
1952 {
1953   ctx->fragment_block_id =
1954       gst_pad_add_probe (ctx->srcpad, GST_PAD_PROBE_TYPE_BLOCK, _block_pad,
1955       NULL, NULL);
1956 }
1957
1958 static gboolean
1959 _set_property_from_structure (GQuark field_id, const GValue * value,
1960     gpointer user_data)
1961 {
1962   const gchar *property_name = g_quark_to_string (field_id);
1963   GObject *element = G_OBJECT (user_data);
1964
1965   g_object_set_property (element, property_name, value);
1966
1967   return TRUE;
1968 }
1969
1970 static void
1971 _lock_and_set_to_null (GstElement * element, GstSplitMuxSink * splitmux)
1972 {
1973   gst_element_set_locked_state (element, TRUE);
1974   gst_element_set_state (element, GST_STATE_NULL);
1975   GST_LOG_OBJECT (splitmux, "Removing old element %" GST_PTR_FORMAT, element);
1976   gst_bin_remove (GST_BIN (splitmux), element);
1977 }
1978
1979
1980 static void
1981 _send_event (const GValue * value, gpointer user_data)
1982 {
1983   GstPad *pad = g_value_get_object (value);
1984   GstEvent *ev = user_data;
1985
1986   gst_pad_send_event (pad, gst_event_ref (ev));
1987 }
1988
1989 /* Called with lock held when a fragment
1990  * reaches EOS and it is time to restart
1991  * a new fragment
1992  */
1993 static GstFlowReturn
1994 start_next_fragment (GstSplitMuxSink * splitmux, MqStreamCtx * ctx)
1995 {
1996   GstElement *muxer, *sink;
1997
1998   g_assert (ctx->is_reference);
1999
2000   /* 1 change to new file */
2001   splitmux->switching_fragment = TRUE;
2002
2003   /* We need to drop the splitmux lock to acquire the state lock
2004    * here and ensure there's no racy state change going on elsewhere */
2005   muxer = gst_object_ref (splitmux->muxer);
2006   sink = gst_object_ref (splitmux->active_sink);
2007
2008   GST_SPLITMUX_UNLOCK (splitmux);
2009   GST_SPLITMUX_STATE_LOCK (splitmux);
2010
2011   if (splitmux->shutdown) {
2012     GST_DEBUG_OBJECT (splitmux,
2013         "Shutdown requested. Aborting fragment switch.");
2014     GST_SPLITMUX_LOCK (splitmux);
2015     GST_SPLITMUX_STATE_UNLOCK (splitmux);
2016     gst_object_unref (muxer);
2017     gst_object_unref (sink);
2018     return GST_FLOW_FLUSHING;
2019   }
2020
2021   if (splitmux->async_finalize) {
2022     if (splitmux->muxed_out_bytes > 0
2023         || splitmux->fragment_id != splitmux->start_index) {
2024       gchar *newname;
2025       GstElement *new_sink, *new_muxer;
2026
2027       GST_DEBUG_OBJECT (splitmux, "Starting fragment %u",
2028           splitmux->fragment_id);
2029       g_list_foreach (splitmux->contexts, (GFunc) block_context, splitmux);
2030       newname = g_strdup_printf ("sink_%u", splitmux->fragment_id);
2031       GST_SPLITMUX_LOCK (splitmux);
2032       if ((splitmux->sink =
2033               create_element (splitmux, splitmux->sink_factory, newname,
2034                   TRUE)) == NULL)
2035         goto fail;
2036       if (splitmux->sink_preset && GST_IS_PRESET (splitmux->sink))
2037         gst_preset_load_preset (GST_PRESET (splitmux->sink),
2038             splitmux->sink_preset);
2039       if (splitmux->sink_properties)
2040         gst_structure_foreach (splitmux->sink_properties,
2041             _set_property_from_structure, splitmux->sink);
2042       splitmux->active_sink = splitmux->sink;
2043       g_signal_emit (splitmux, signals[SIGNAL_SINK_ADDED], 0, splitmux->sink);
2044       g_free (newname);
2045       newname = g_strdup_printf ("muxer_%u", splitmux->fragment_id);
2046       if ((splitmux->muxer =
2047               create_element (splitmux, splitmux->muxer_factory, newname,
2048                   TRUE)) == NULL)
2049         goto fail;
2050       if (g_object_class_find_property (G_OBJECT_GET_CLASS (splitmux->sink),
2051               "async") != NULL) {
2052         /* async child elements are causing state change races and weird
2053          * failures, so let's try and turn that off */
2054         g_object_set (splitmux->sink, "async", FALSE, NULL);
2055       }
2056       if (splitmux->muxer_preset && GST_IS_PRESET (splitmux->muxer))
2057         gst_preset_load_preset (GST_PRESET (splitmux->muxer),
2058             splitmux->muxer_preset);
2059       if (splitmux->muxer_properties)
2060         gst_structure_foreach (splitmux->muxer_properties,
2061             _set_property_from_structure, splitmux->muxer);
2062       g_signal_emit (splitmux, signals[SIGNAL_MUXER_ADDED], 0, splitmux->muxer);
2063       g_free (newname);
2064       new_sink = splitmux->sink;
2065       new_muxer = splitmux->muxer;
2066       GST_SPLITMUX_UNLOCK (splitmux);
2067       g_list_foreach (splitmux->contexts, (GFunc) relink_context, splitmux);
2068       gst_element_link (new_muxer, new_sink);
2069
2070       if (g_object_get_qdata ((GObject *) sink, EOS_FROM_US)) {
2071         if (GPOINTER_TO_INT (g_object_get_qdata ((GObject *) sink,
2072                     EOS_FROM_US)) == 2) {
2073           _lock_and_set_to_null (muxer, splitmux);
2074           _lock_and_set_to_null (sink, splitmux);
2075         } else {
2076           g_object_set_qdata ((GObject *) sink, EOS_FROM_US,
2077               GINT_TO_POINTER (2));
2078         }
2079       }
2080       gst_object_unref (muxer);
2081       gst_object_unref (sink);
2082       muxer = new_muxer;
2083       sink = new_sink;
2084       gst_object_ref (muxer);
2085       gst_object_ref (sink);
2086     }
2087   } else {
2088
2089     gst_element_set_locked_state (muxer, TRUE);
2090     gst_element_set_locked_state (sink, TRUE);
2091     gst_element_set_state (sink, GST_STATE_NULL);
2092
2093     if (splitmux->reset_muxer) {
2094       gst_element_set_state (muxer, GST_STATE_NULL);
2095     } else {
2096       GstIterator *it = gst_element_iterate_sink_pads (muxer);
2097       GstEvent *ev;
2098       guint32 seqnum;
2099
2100       ev = gst_event_new_flush_start ();
2101       seqnum = gst_event_get_seqnum (ev);
2102       while (gst_iterator_foreach (it, _send_event, ev) == GST_ITERATOR_RESYNC);
2103       gst_event_unref (ev);
2104
2105       gst_iterator_resync (it);
2106
2107       ev = gst_event_new_flush_stop (TRUE);
2108       gst_event_set_seqnum (ev, seqnum);
2109       while (gst_iterator_foreach (it, _send_event, ev) == GST_ITERATOR_RESYNC);
2110       gst_event_unref (ev);
2111
2112       gst_iterator_free (it);
2113     }
2114   }
2115
2116   GST_SPLITMUX_LOCK (splitmux);
2117   set_next_filename (splitmux, ctx);
2118   splitmux->muxed_out_bytes = 0;
2119   GST_SPLITMUX_UNLOCK (splitmux);
2120
2121   if (gst_element_set_state (sink,
2122           GST_STATE_TARGET (splitmux)) == GST_STATE_CHANGE_FAILURE) {
2123     gst_element_set_state (sink, GST_STATE_NULL);
2124     gst_element_set_locked_state (muxer, FALSE);
2125     gst_element_set_locked_state (sink, FALSE);
2126
2127     goto fail_output;
2128   }
2129
2130   if (gst_element_set_state (muxer,
2131           GST_STATE_TARGET (splitmux)) == GST_STATE_CHANGE_FAILURE) {
2132     gst_element_set_state (muxer, GST_STATE_NULL);
2133     gst_element_set_state (sink, GST_STATE_NULL);
2134     gst_element_set_locked_state (muxer, FALSE);
2135     gst_element_set_locked_state (sink, FALSE);
2136     goto fail_muxer;
2137   }
2138
2139   gst_element_set_locked_state (muxer, FALSE);
2140   gst_element_set_locked_state (sink, FALSE);
2141
2142   gst_object_unref (sink);
2143   gst_object_unref (muxer);
2144
2145   GST_SPLITMUX_LOCK (splitmux);
2146   GST_SPLITMUX_STATE_UNLOCK (splitmux);
2147   splitmux->switching_fragment = FALSE;
2148   do_async_done (splitmux);
2149
2150   splitmux->ready_for_output = TRUE;
2151
2152   g_list_foreach (splitmux->contexts, (GFunc) unlock_context, splitmux);
2153   g_list_foreach (splitmux->contexts, (GFunc) restart_context, splitmux);
2154
2155   send_fragment_opened_closed_msg (splitmux, TRUE, sink);
2156
2157   /* FIXME: Is this always the correct next state? */
2158   GST_LOG_OBJECT (splitmux, "Resetting state to AWAITING_COMMAND");
2159   splitmux->output_state = SPLITMUX_OUTPUT_STATE_AWAITING_COMMAND;
2160   GST_SPLITMUX_BROADCAST_OUTPUT (splitmux);
2161   return GST_FLOW_OK;
2162
2163 fail:
2164   gst_object_unref (sink);
2165   gst_object_unref (muxer);
2166
2167   GST_SPLITMUX_LOCK (splitmux);
2168   GST_SPLITMUX_STATE_UNLOCK (splitmux);
2169   GST_ELEMENT_ERROR (splitmux, RESOURCE, SETTINGS,
2170       ("Could not create the new muxer/sink"), NULL);
2171   return GST_FLOW_ERROR;
2172
2173 fail_output:
2174   GST_ELEMENT_ERROR (splitmux, RESOURCE, SETTINGS,
2175       ("Could not start new output sink"), NULL);
2176   gst_object_unref (sink);
2177   gst_object_unref (muxer);
2178
2179   GST_SPLITMUX_LOCK (splitmux);
2180   GST_SPLITMUX_STATE_UNLOCK (splitmux);
2181   splitmux->switching_fragment = FALSE;
2182   return GST_FLOW_ERROR;
2183
2184 fail_muxer:
2185   GST_ELEMENT_ERROR (splitmux, RESOURCE, SETTINGS,
2186       ("Could not start new muxer"), NULL);
2187   gst_object_unref (sink);
2188   gst_object_unref (muxer);
2189
2190   GST_SPLITMUX_LOCK (splitmux);
2191   GST_SPLITMUX_STATE_UNLOCK (splitmux);
2192   splitmux->switching_fragment = FALSE;
2193   return GST_FLOW_ERROR;
2194 }
2195
2196 static void
2197 bus_handler (GstBin * bin, GstMessage * message)
2198 {
2199   GstSplitMuxSink *splitmux = GST_SPLITMUX_SINK (bin);
2200
2201   switch (GST_MESSAGE_TYPE (message)) {
2202     case GST_MESSAGE_EOS:{
2203       /* If the state is draining out the current file, drop this EOS */
2204       GstElement *sink;
2205
2206       sink = GST_ELEMENT (GST_MESSAGE_SRC (message));
2207       GST_SPLITMUX_LOCK (splitmux);
2208
2209       send_fragment_opened_closed_msg (splitmux, FALSE, sink);
2210
2211       if (splitmux->async_finalize) {
2212
2213         if (g_object_get_qdata ((GObject *) sink, EOS_FROM_US)) {
2214           if (GPOINTER_TO_INT (g_object_get_qdata ((GObject *) sink,
2215                       EOS_FROM_US)) == 2) {
2216             GstElement *muxer;
2217             GstPad *sinksink, *muxersrc;
2218
2219             sinksink = gst_element_get_static_pad (sink, "sink");
2220             muxersrc = gst_pad_get_peer (sinksink);
2221             muxer = gst_pad_get_parent_element (muxersrc);
2222             gst_object_unref (sinksink);
2223             gst_object_unref (muxersrc);
2224
2225             gst_element_call_async (muxer,
2226                 (GstElementCallAsyncFunc) _lock_and_set_to_null,
2227                 gst_object_ref (splitmux), gst_object_unref);
2228             gst_element_call_async (sink,
2229                 (GstElementCallAsyncFunc) _lock_and_set_to_null,
2230                 gst_object_ref (splitmux), gst_object_unref);
2231             gst_object_unref (muxer);
2232           } else {
2233             g_object_set_qdata ((GObject *) sink, EOS_FROM_US,
2234                 GINT_TO_POINTER (2));
2235           }
2236           GST_DEBUG_OBJECT (splitmux,
2237               "Caught async EOS from previous muxer+sink. Dropping.");
2238           /* We forward the EOS so that it gets aggregated as normal. If the sink
2239            * finishes and is removed before the end, it will be de-aggregated */
2240           gst_message_unref (message);
2241           GST_SPLITMUX_UNLOCK (splitmux);
2242           return;
2243         }
2244       } else if (splitmux->output_state == SPLITMUX_OUTPUT_STATE_ENDING_STREAM) {
2245         GST_DEBUG_OBJECT (splitmux,
2246             "Passing EOS message. Output state %d max_out_running_time %"
2247             GST_STIME_FORMAT, splitmux->output_state,
2248             GST_STIME_ARGS (splitmux->max_out_running_time));
2249       } else {
2250         GST_DEBUG_OBJECT (splitmux, "Caught EOS at end of fragment, dropping");
2251         splitmux->output_state = SPLITMUX_OUTPUT_STATE_START_NEXT_FILE;
2252         GST_SPLITMUX_BROADCAST_OUTPUT (splitmux);
2253
2254         gst_message_unref (message);
2255         GST_SPLITMUX_UNLOCK (splitmux);
2256         return;
2257       }
2258       GST_SPLITMUX_UNLOCK (splitmux);
2259       break;
2260     }
2261     case GST_MESSAGE_ASYNC_START:
2262     case GST_MESSAGE_ASYNC_DONE:
2263       /* Ignore state changes from our children while switching */
2264       GST_SPLITMUX_LOCK (splitmux);
2265       if (splitmux->switching_fragment) {
2266         if (GST_MESSAGE_SRC (message) == (GstObject *) splitmux->active_sink
2267             || GST_MESSAGE_SRC (message) == (GstObject *) splitmux->muxer) {
2268           GST_LOG_OBJECT (splitmux,
2269               "Ignoring state change from child %" GST_PTR_FORMAT
2270               " while switching", GST_MESSAGE_SRC (message));
2271           gst_message_unref (message);
2272           GST_SPLITMUX_UNLOCK (splitmux);
2273           return;
2274         }
2275       }
2276       GST_SPLITMUX_UNLOCK (splitmux);
2277       break;
2278     case GST_MESSAGE_WARNING:
2279     {
2280       GError *gerror = NULL;
2281
2282       gst_message_parse_warning (message, &gerror, NULL);
2283
2284       if (g_error_matches (gerror, GST_STREAM_ERROR, GST_STREAM_ERROR_FORMAT)) {
2285         GList *item;
2286         gboolean caps_change = FALSE;
2287
2288         GST_SPLITMUX_LOCK (splitmux);
2289
2290         for (item = splitmux->contexts; item; item = item->next) {
2291           MqStreamCtx *ctx = item->data;
2292
2293           if (ctx->caps_change) {
2294             caps_change = TRUE;
2295             break;
2296           }
2297         }
2298
2299         GST_SPLITMUX_UNLOCK (splitmux);
2300
2301         if (caps_change) {
2302           GST_LOG_OBJECT (splitmux,
2303               "Ignoring warning change from child %" GST_PTR_FORMAT
2304               " while switching caps", GST_MESSAGE_SRC (message));
2305           gst_message_unref (message);
2306           return;
2307         }
2308       }
2309       break;
2310     }
2311     default:
2312       break;
2313   }
2314
2315   GST_BIN_CLASS (parent_class)->handle_message (bin, message);
2316 }
2317
2318 static void
2319 ctx_set_unblock (MqStreamCtx * ctx)
2320 {
2321   ctx->need_unblock = TRUE;
2322 }
2323
2324 static gboolean
2325 need_new_fragment (GstSplitMuxSink * splitmux,
2326     GstClockTime queued_time, GstClockTime queued_gop_time,
2327     guint64 queued_bytes)
2328 {
2329   guint64 thresh_bytes;
2330   GstClockTime thresh_time;
2331   gboolean check_robust_muxing;
2332   GstClockTime time_to_split = GST_CLOCK_TIME_NONE;
2333   GstClockTime *ptr_to_time;
2334   const InputGop *gop, *next_gop;
2335
2336   GST_OBJECT_LOCK (splitmux);
2337   thresh_bytes = splitmux->threshold_bytes;
2338   thresh_time = splitmux->threshold_time;
2339   ptr_to_time = (GstClockTime *)
2340       gst_queue_array_peek_head_struct (splitmux->times_to_split);
2341   if (ptr_to_time)
2342     time_to_split = *ptr_to_time;
2343   check_robust_muxing = splitmux->use_robust_muxing
2344       && splitmux->muxer_has_reserved_props;
2345   GST_OBJECT_UNLOCK (splitmux);
2346
2347   /* Have we muxed at least one thing from the reference
2348    * stream into the file? If not, no other streams can have
2349    * either */
2350   if (splitmux->fragment_reference_bytes <= 0) {
2351     GST_TRACE_OBJECT (splitmux,
2352         "Not ready to split - nothing muxed on the reference stream");
2353     return FALSE;
2354   }
2355
2356   /* User told us to split now */
2357   if (g_atomic_int_get (&(splitmux->do_split_next_gop)) == TRUE) {
2358     GST_TRACE_OBJECT (splitmux, "Forcing because split_next_gop is set");
2359     return TRUE;
2360   }
2361
2362   gop = g_queue_peek_head (&splitmux->pending_input_gops);
2363   /* We need a full GOP queued up at this point */
2364   g_assert (gop != NULL);
2365   next_gop = g_queue_peek_nth (&splitmux->pending_input_gops, 1);
2366   /* And the beginning of the next GOP or otherwise EOS */
2367
2368   /* User told us to split at this running time */
2369   if (gop->start_time >= time_to_split) {
2370     GST_OBJECT_LOCK (splitmux);
2371     /* Dequeue running time */
2372     gst_queue_array_pop_head_struct (splitmux->times_to_split);
2373     /* Empty any running times after this that are past now */
2374     ptr_to_time = gst_queue_array_peek_head_struct (splitmux->times_to_split);
2375     while (ptr_to_time) {
2376       time_to_split = *ptr_to_time;
2377       if (gop->start_time < time_to_split) {
2378         break;
2379       }
2380       gst_queue_array_pop_head_struct (splitmux->times_to_split);
2381       ptr_to_time = gst_queue_array_peek_head_struct (splitmux->times_to_split);
2382     }
2383     GST_TRACE_OBJECT (splitmux,
2384         "GOP start time %" GST_STIME_FORMAT " is after requested split point %"
2385         GST_STIME_FORMAT, GST_STIME_ARGS (gop->start_time),
2386         GST_STIME_ARGS (time_to_split));
2387     GST_OBJECT_UNLOCK (splitmux);
2388     return TRUE;
2389   }
2390
2391   if (thresh_bytes > 0 && queued_bytes > thresh_bytes) {
2392     GST_TRACE_OBJECT (splitmux,
2393         "queued bytes %" G_GUINT64_FORMAT " overruns byte limit", queued_bytes);
2394     return TRUE;                /* Would overrun byte limit */
2395   }
2396
2397   if (thresh_time > 0 && queued_time > thresh_time) {
2398     GST_TRACE_OBJECT (splitmux,
2399         "queued time %" GST_STIME_FORMAT " overruns time limit",
2400         GST_STIME_ARGS (queued_time));
2401     return TRUE;                /* Would overrun time limit */
2402   }
2403
2404   if (splitmux->tc_interval) {
2405     GstClockTime next_gop_start_time =
2406         next_gop ? next_gop->start_time : splitmux->max_in_running_time;
2407
2408     if (GST_CLOCK_TIME_IS_VALID (splitmux->next_fragment_start_tc_time) &&
2409         GST_CLOCK_STIME_IS_VALID (next_gop_start_time) &&
2410         next_gop_start_time >
2411         splitmux->next_fragment_start_tc_time + 5 * GST_USECOND) {
2412       GST_TRACE_OBJECT (splitmux,
2413           "in running time %" GST_STIME_FORMAT " overruns time limit %"
2414           GST_TIME_FORMAT, GST_STIME_ARGS (next_gop_start_time),
2415           GST_TIME_ARGS (splitmux->next_fragment_start_tc_time));
2416       return TRUE;
2417     }
2418   }
2419
2420   if (check_robust_muxing) {
2421     GstClockTime mux_reserved_remain;
2422
2423     g_object_get (splitmux->muxer,
2424         "reserved-duration-remaining", &mux_reserved_remain, NULL);
2425
2426     GST_LOG_OBJECT (splitmux,
2427         "Muxer robust muxing report - %" G_GUINT64_FORMAT
2428         " remaining. New GOP would enqueue %" G_GUINT64_FORMAT,
2429         mux_reserved_remain, queued_gop_time);
2430
2431     if (queued_gop_time >= mux_reserved_remain) {
2432       GST_INFO_OBJECT (splitmux,
2433           "File is about to run out of header room - %" G_GUINT64_FORMAT
2434           " remaining. New GOP would enqueue %" G_GUINT64_FORMAT
2435           ". Switching to new file", mux_reserved_remain, queued_gop_time);
2436       return TRUE;
2437     }
2438   }
2439
2440   /* Continue and mux this GOP */
2441   return FALSE;
2442 }
2443
2444 /* probably we want to add this API? */
2445 static void
2446 video_time_code_replace (GstVideoTimeCode ** old_tc, GstVideoTimeCode * new_tc)
2447 {
2448   GstVideoTimeCode *timecode = NULL;
2449
2450   g_return_if_fail (old_tc != NULL);
2451
2452   if (*old_tc == new_tc)
2453     return;
2454
2455   if (new_tc)
2456     timecode = gst_video_time_code_copy (new_tc);
2457
2458   if (*old_tc)
2459     gst_video_time_code_free (*old_tc);
2460
2461   *old_tc = timecode;
2462 }
2463
2464 /* Called with splitmux lock held */
2465 /* Called when entering ProcessingCompleteGop state
2466  * Assess if mq contents overflowed the current file
2467  *   -> If yes, need to switch to new file
2468  *   -> if no, set max_out_running_time to let this GOP in and
2469  *      go to COLLECTING_GOP_START state
2470  */
2471 static void
2472 handle_gathered_gop (GstSplitMuxSink * splitmux, const InputGop * gop,
2473     GstClockTimeDiff next_gop_start_time, GstClockTimeDiff max_out_running_time)
2474 {
2475   guint64 queued_bytes;
2476   GstClockTimeDiff queued_time = 0;
2477   GstClockTimeDiff queued_gop_time = 0;
2478   SplitMuxOutputCommand *cmd;
2479
2480   /* Assess if the multiqueue contents overflowed the current file */
2481   /* When considering if a newly gathered GOP overflows
2482    * the time limit for the file, only consider the running time of the
2483    * reference stream. Other streams might have run ahead a little bit,
2484    * but extra pieces won't be released to the muxer beyond the reference
2485    * stream cut-off anyway - so it forms the limit. */
2486   queued_bytes = splitmux->fragment_total_bytes + gop->total_bytes;
2487   queued_time = next_gop_start_time;
2488   /* queued_gop_time tracks how much unwritten data there is waiting to
2489    * be written to this fragment including this GOP */
2490   if (splitmux->reference_ctx->out_running_time != GST_CLOCK_STIME_NONE)
2491     queued_gop_time = queued_time - splitmux->reference_ctx->out_running_time;
2492   else
2493     queued_gop_time = queued_time - gop->start_time;
2494
2495   GST_LOG_OBJECT (splitmux, " queued_bytes %" G_GUINT64_FORMAT, queued_bytes);
2496   GST_LOG_OBJECT (splitmux, "mq at TS %" GST_STIME_FORMAT
2497       " bytes %" G_GUINT64_FORMAT " in next gop start time %" GST_STIME_FORMAT
2498       " gop start time %" GST_STIME_FORMAT,
2499       GST_STIME_ARGS (queued_time), queued_bytes,
2500       GST_STIME_ARGS (next_gop_start_time), GST_STIME_ARGS (gop->start_time));
2501
2502   if (queued_gop_time < 0)
2503     goto error_gop_duration;
2504
2505   if (queued_time < splitmux->fragment_start_time)
2506     goto error_queued_time;
2507
2508   queued_time -= splitmux->fragment_start_time;
2509   if (queued_time < queued_gop_time)
2510     queued_gop_time = queued_time;
2511
2512   /* Expand queued bytes estimate by muxer overhead */
2513   queued_bytes += (queued_bytes * splitmux->mux_overhead);
2514
2515   /* Check for overrun - have we output at least one byte and overrun
2516    * either threshold? */
2517   if (need_new_fragment (splitmux, queued_time, queued_gop_time, queued_bytes)) {
2518     if (splitmux->async_finalize) {
2519       GstClockTime *sink_running_time = g_new (GstClockTime, 1);
2520       *sink_running_time = splitmux->reference_ctx->out_running_time;
2521       g_object_set_qdata_full (G_OBJECT (splitmux->sink),
2522           RUNNING_TIME, sink_running_time, g_free);
2523     }
2524     g_atomic_int_set (&(splitmux->do_split_next_gop), FALSE);
2525     /* Tell the output side to start a new fragment */
2526     GST_INFO_OBJECT (splitmux,
2527         "This GOP (dur %" GST_STIME_FORMAT
2528         ") would overflow the fragment, Sending start_new_fragment cmd",
2529         GST_STIME_ARGS (queued_gop_time));
2530     cmd = out_cmd_buf_new ();
2531     cmd->start_new_fragment = TRUE;
2532     g_queue_push_head (&splitmux->out_cmd_q, cmd);
2533     GST_SPLITMUX_BROADCAST_OUTPUT (splitmux);
2534
2535     splitmux->fragment_start_time = gop->start_time;
2536     splitmux->fragment_start_time_pts = gop->start_time_pts;
2537     splitmux->fragment_total_bytes = 0;
2538     splitmux->fragment_reference_bytes = 0;
2539
2540     video_time_code_replace (&splitmux->fragment_start_tc, gop->start_tc);
2541     splitmux->next_fragment_start_tc_time =
2542         calculate_next_max_timecode (splitmux, splitmux->fragment_start_tc,
2543         splitmux->fragment_start_time, NULL);
2544     if (!GST_CLOCK_TIME_IS_VALID (splitmux->next_fragment_start_tc_time)) {
2545       GST_WARNING_OBJECT (splitmux,
2546           "Couldn't calculate next fragment start time for timecode mode");
2547     }
2548   }
2549
2550   /* And set up to collect the next GOP */
2551   if (max_out_running_time != G_MAXINT64) {
2552     splitmux->input_state = SPLITMUX_INPUT_STATE_COLLECTING_GOP_START;
2553   } else {
2554     /* This is probably already the current state, but just in case: */
2555     splitmux->input_state = SPLITMUX_INPUT_STATE_FINISHING_UP;
2556   }
2557
2558   /* And wake all input contexts to send a wake-up event */
2559   g_list_foreach (splitmux->contexts, (GFunc) ctx_set_unblock, NULL);
2560   GST_SPLITMUX_BROADCAST_INPUT (splitmux);
2561
2562   /* Now either way - either there was no overflow, or we requested a new fragment: release this GOP */
2563   splitmux->fragment_total_bytes += gop->total_bytes;
2564   splitmux->fragment_reference_bytes += gop->reference_bytes;
2565
2566   if (gop->total_bytes > 0) {
2567     GST_LOG_OBJECT (splitmux,
2568         "Releasing GOP to output. Bytes in fragment now %" G_GUINT64_FORMAT
2569         " time %" GST_STIME_FORMAT,
2570         splitmux->fragment_total_bytes, GST_STIME_ARGS (queued_time));
2571
2572     /* Send this GOP to the output command queue */
2573     cmd = out_cmd_buf_new ();
2574     cmd->start_new_fragment = FALSE;
2575     cmd->max_output_ts = max_out_running_time;
2576     GST_LOG_OBJECT (splitmux, "Sending GOP cmd to output for TS %"
2577         GST_STIME_FORMAT, GST_STIME_ARGS (max_out_running_time));
2578     g_queue_push_head (&splitmux->out_cmd_q, cmd);
2579
2580     GST_SPLITMUX_BROADCAST_OUTPUT (splitmux);
2581   }
2582
2583   return;
2584
2585 error_gop_duration:
2586   GST_ELEMENT_ERROR (splitmux,
2587       STREAM, FAILED, ("Timestamping error on input streams"),
2588       ("Queued GOP time is negative %" GST_STIME_FORMAT,
2589           GST_STIME_ARGS (queued_gop_time)));
2590   return;
2591 error_queued_time:
2592   GST_ELEMENT_ERROR (splitmux,
2593       STREAM, FAILED, ("Timestamping error on input streams"),
2594       ("Queued time is negative. Input went backwards. queued_time - %"
2595           GST_STIME_FORMAT, GST_STIME_ARGS (queued_time)));
2596   return;
2597 }
2598
2599 /* Called with splitmux lock held */
2600 /* Called from each input pad when it is has all the pieces
2601  * for a GOP or EOS, starting with the reference pad which has set the
2602  * splitmux->max_in_running_time
2603  */
2604 static void
2605 check_completed_gop (GstSplitMuxSink * splitmux, MqStreamCtx * ctx)
2606 {
2607   GList *cur;
2608   GstEvent *event;
2609
2610   /* On ENDING_FILE, the reference stream sends a command to start a new
2611    * fragment, then releases the GOP for output in the new fragment.
2612    *  If some streams received no buffer during the last GOP that overran,
2613    * because its next buffer has a timestamp bigger than
2614    * ctx->max_in_running_time, its queue is empty. In that case the only
2615    * way to wakeup the output thread is by injecting an event in the
2616    * queue. This usually happen with subtitle streams.
2617    * See https://bugzilla.gnome.org/show_bug.cgi?id=763711. */
2618   if (ctx->need_unblock) {
2619     GST_LOG_OBJECT (ctx->sinkpad, "Sending splitmuxsink-unblock event");
2620     event = gst_event_new_custom (GST_EVENT_CUSTOM_DOWNSTREAM |
2621         GST_EVENT_TYPE_SERIALIZED,
2622         gst_structure_new ("splitmuxsink-unblock", "timestamp",
2623             G_TYPE_INT64, splitmux->max_in_running_time, NULL));
2624
2625     GST_SPLITMUX_UNLOCK (splitmux);
2626     gst_pad_send_event (ctx->sinkpad, event);
2627     GST_SPLITMUX_LOCK (splitmux);
2628
2629     ctx->need_unblock = FALSE;
2630     GST_SPLITMUX_BROADCAST_INPUT (splitmux);
2631     /* state may have changed while we were unlocked. Loop again if so */
2632     if (splitmux->input_state != SPLITMUX_INPUT_STATE_WAITING_GOP_COLLECT)
2633       return;
2634   }
2635
2636   do {
2637     GstClockTimeDiff next_gop_start = GST_CLOCK_STIME_NONE;
2638
2639     if (splitmux->input_state == SPLITMUX_INPUT_STATE_WAITING_GOP_COLLECT) {
2640       GstClockTimeDiff max_out_running_time;
2641       gboolean ready = TRUE;
2642       InputGop *gop;
2643       const InputGop *next_gop;
2644
2645       gop = g_queue_peek_head (&splitmux->pending_input_gops);
2646       next_gop = g_queue_peek_nth (&splitmux->pending_input_gops, 1);
2647
2648       /* If we have no GOP or no next GOP here then the reference context is
2649        * at EOS, otherwise use the start time of the next GOP if we're far
2650        * enough in the GOP to know it */
2651       if (gop && next_gop) {
2652         if (!splitmux->reference_ctx->in_eos
2653             && splitmux->max_in_running_time_dts != GST_CLOCK_STIME_NONE
2654             && splitmux->max_in_running_time_dts < next_gop->start_time_pts) {
2655           GST_LOG_OBJECT (splitmux,
2656               "No further GOPs finished collecting, waiting until current DTS %"
2657               GST_STIME_FORMAT " has passed next GOP start PTS %"
2658               GST_STIME_FORMAT,
2659               GST_STIME_ARGS (splitmux->max_in_running_time_dts),
2660               GST_STIME_ARGS (next_gop->start_time_pts));
2661           break;
2662         }
2663
2664         GST_LOG_OBJECT (splitmux,
2665             "Finished collecting GOP with start time %" GST_STIME_FORMAT
2666             ", next GOP start time %" GST_STIME_FORMAT,
2667             GST_STIME_ARGS (gop->start_time),
2668             GST_STIME_ARGS (next_gop->start_time));
2669         next_gop_start = next_gop->start_time;
2670         max_out_running_time =
2671             splitmux->reference_ctx->in_eos ? G_MAXINT64 : next_gop->start_time;
2672       } else if (!next_gop) {
2673         GST_LOG_OBJECT (splitmux, "Reference context is EOS");
2674         next_gop_start = splitmux->max_in_running_time;
2675         max_out_running_time = G_MAXINT64;
2676       } else if (!gop) {
2677         GST_LOG_OBJECT (splitmux, "No further GOPs finished collecting");
2678         break;
2679       } else {
2680         g_assert_not_reached ();
2681       }
2682
2683       g_assert (gop != NULL);
2684
2685       /* Iterate each pad, and check that the input running time is at least
2686        * up to the start running time of the next GOP or EOS, and if so handle
2687        * the collected GOP */
2688       GST_LOG_OBJECT (splitmux, "Checking GOP collected, next GOP start %"
2689           GST_STIME_FORMAT " ctx %p", GST_STIME_ARGS (next_gop_start), ctx);
2690       for (cur = g_list_first (splitmux->contexts); cur != NULL;
2691           cur = g_list_next (cur)) {
2692         MqStreamCtx *tmpctx = (MqStreamCtx *) (cur->data);
2693
2694         GST_LOG_OBJECT (splitmux,
2695             "Context %p sink pad %" GST_PTR_FORMAT " @ TS %" GST_STIME_FORMAT
2696             " EOS %d", tmpctx, tmpctx->sinkpad,
2697             GST_STIME_ARGS (tmpctx->in_running_time), tmpctx->in_eos);
2698
2699         if (next_gop_start != GST_CLOCK_STIME_NONE &&
2700             tmpctx->in_running_time < next_gop_start && !tmpctx->in_eos) {
2701           GST_LOG_OBJECT (splitmux,
2702               "Context %p sink pad %" GST_PTR_FORMAT " not ready. We'll sleep",
2703               tmpctx, tmpctx->sinkpad);
2704           ready = FALSE;
2705           break;
2706         }
2707       }
2708       if (ready) {
2709         GST_DEBUG_OBJECT (splitmux,
2710             "Collected GOP is complete. Processing (ctx %p)", ctx);
2711         /* All pads have a complete GOP, release it into the multiqueue */
2712         handle_gathered_gop (splitmux, gop, next_gop_start,
2713             max_out_running_time);
2714
2715         g_queue_pop_head (&splitmux->pending_input_gops);
2716         input_gop_free (gop);
2717
2718         /* The user has requested a split, we can split now that the previous GOP
2719          * has been collected to the correct location */
2720         if (g_atomic_int_compare_and_exchange (&(splitmux->split_requested),
2721                 TRUE, FALSE)) {
2722           g_atomic_int_set (&(splitmux->do_split_next_gop), TRUE);
2723         }
2724       }
2725     }
2726
2727     /* If upstream reached EOS we are not expecting more data, no need to wait
2728      * here. */
2729     if (ctx->in_eos)
2730       return;
2731
2732     if (splitmux->input_state == SPLITMUX_INPUT_STATE_WAITING_GOP_COLLECT &&
2733         !ctx->flushing &&
2734         ctx->in_running_time >= next_gop_start &&
2735         next_gop_start != GST_CLOCK_STIME_NONE) {
2736       /* Some pad is not yet ready, or GOP is being pushed
2737        * either way, sleep and wait to get woken */
2738       GST_LOG_OBJECT (splitmux, "Sleeping for GOP collection (ctx %p)", ctx);
2739       GST_SPLITMUX_WAIT_INPUT (splitmux);
2740       GST_LOG_OBJECT (splitmux, "Done waiting for complete GOP (ctx %p)", ctx);
2741     } else {
2742       /* This pad is not ready or the state changed - break out and get another
2743        * buffer / event */
2744       break;
2745     }
2746   } while (splitmux->input_state == SPLITMUX_INPUT_STATE_WAITING_GOP_COLLECT);
2747 }
2748
2749 static GstPadProbeReturn
2750 handle_mq_input (GstPad * pad, GstPadProbeInfo * info, MqStreamCtx * ctx)
2751 {
2752   GstSplitMuxSink *splitmux = ctx->splitmux;
2753   GstFlowReturn ret = GST_FLOW_OK;
2754   GstBuffer *buf;
2755   MqStreamBuf *buf_info = NULL;
2756   GstClockTime ts, pts, dts;
2757   GstClockTimeDiff running_time, running_time_pts, running_time_dts;
2758   gboolean loop_again;
2759   gboolean keyframe = FALSE;
2760
2761   GST_LOG_OBJECT (pad, "Fired probe type 0x%x", info->type);
2762
2763   /* FIXME: Handle buffer lists, until then make it clear they won't work */
2764   if (info->type & GST_PAD_PROBE_TYPE_BUFFER_LIST) {
2765     g_warning ("Buffer list handling not implemented");
2766     return GST_PAD_PROBE_DROP;
2767   }
2768   if (info->type & GST_PAD_PROBE_TYPE_EVENT_DOWNSTREAM ||
2769       info->type & GST_PAD_PROBE_TYPE_EVENT_FLUSH) {
2770     GstEvent *event = gst_pad_probe_info_get_event (info);
2771
2772     GST_LOG_OBJECT (pad, "Event %" GST_PTR_FORMAT, event);
2773
2774     switch (GST_EVENT_TYPE (event)) {
2775       case GST_EVENT_SEGMENT:
2776         gst_event_copy_segment (event, &ctx->in_segment);
2777         break;
2778       case GST_EVENT_FLUSH_STOP:
2779         GST_SPLITMUX_LOCK (splitmux);
2780         gst_segment_init (&ctx->in_segment, GST_FORMAT_UNDEFINED);
2781         ctx->in_eos = FALSE;
2782         ctx->in_running_time = GST_CLOCK_STIME_NONE;
2783         GST_SPLITMUX_UNLOCK (splitmux);
2784         break;
2785       case GST_EVENT_EOS:
2786         GST_SPLITMUX_LOCK (splitmux);
2787         ctx->in_eos = TRUE;
2788
2789         if (splitmux->input_state == SPLITMUX_INPUT_STATE_STOPPED) {
2790           ret = GST_FLOW_FLUSHING;
2791           goto beach;
2792         }
2793
2794         if (ctx->is_reference) {
2795           GST_INFO_OBJECT (splitmux, "Got Reference EOS. Finishing up");
2796           /* check_completed_gop will act as if this is a new keyframe with infinite timestamp */
2797           splitmux->input_state = SPLITMUX_INPUT_STATE_WAITING_GOP_COLLECT;
2798           /* Wake up other input pads to collect this GOP */
2799           GST_SPLITMUX_BROADCAST_INPUT (splitmux);
2800           check_completed_gop (splitmux, ctx);
2801         } else if (splitmux->input_state ==
2802             SPLITMUX_INPUT_STATE_WAITING_GOP_COLLECT) {
2803           /* If we are waiting for a GOP to be completed (ie, for aux
2804            * pads to catch up), then this pad is complete, so check
2805            * if the whole GOP is.
2806            */
2807           check_completed_gop (splitmux, ctx);
2808         }
2809         GST_SPLITMUX_UNLOCK (splitmux);
2810         break;
2811       case GST_EVENT_GAP:{
2812         GstClockTime gap_ts;
2813         GstClockTimeDiff rtime;
2814
2815         gst_event_parse_gap (event, &gap_ts, NULL);
2816         if (gap_ts == GST_CLOCK_TIME_NONE)
2817           break;
2818
2819         GST_SPLITMUX_LOCK (splitmux);
2820
2821         if (splitmux->input_state == SPLITMUX_INPUT_STATE_STOPPED) {
2822           ret = GST_FLOW_FLUSHING;
2823           goto beach;
2824         }
2825         rtime = my_segment_to_running_time (&ctx->in_segment, gap_ts);
2826
2827         GST_LOG_OBJECT (pad, "Have GAP w/ ts %" GST_STIME_FORMAT,
2828             GST_STIME_ARGS (rtime));
2829
2830         if (ctx->is_reference && GST_CLOCK_STIME_IS_VALID (rtime)) {
2831           /* If this GAP event happens before the first fragment then
2832            * initialize the fragment start time here. */
2833           if (!GST_CLOCK_STIME_IS_VALID (splitmux->fragment_start_time)) {
2834             splitmux->fragment_start_time = rtime;
2835             GST_LOG_OBJECT (splitmux,
2836                 "Fragment start time now %" GST_STIME_FORMAT,
2837                 GST_STIME_ARGS (splitmux->fragment_start_time));
2838
2839             /* Also take this as the first start time when starting up,
2840              * so that we start counting overflow from the first frame */
2841             if (!GST_CLOCK_STIME_IS_VALID (splitmux->max_in_running_time))
2842               splitmux->max_in_running_time = rtime;
2843             if (!GST_CLOCK_STIME_IS_VALID (splitmux->max_in_running_time_dts))
2844               splitmux->max_in_running_time_dts = rtime;
2845           }
2846
2847           /* Similarly take it as fragment start PTS and GOP start time if
2848            * these are not set */
2849           if (!GST_CLOCK_STIME_IS_VALID (splitmux->fragment_start_time_pts))
2850             splitmux->fragment_start_time_pts = rtime;
2851
2852           if (g_queue_is_empty (&splitmux->pending_input_gops)) {
2853             InputGop *gop = g_slice_new0 (InputGop);
2854
2855             gop->from_gap = TRUE;
2856             gop->start_time = rtime;
2857             gop->start_time_pts = rtime;
2858
2859             g_queue_push_tail (&splitmux->pending_input_gops, gop);
2860           }
2861         }
2862
2863         GST_SPLITMUX_UNLOCK (splitmux);
2864         break;
2865       }
2866       default:
2867         break;
2868     }
2869     return GST_PAD_PROBE_PASS;
2870   } else if (info->type & GST_PAD_PROBE_TYPE_QUERY_DOWNSTREAM) {
2871     switch (GST_QUERY_TYPE (GST_QUERY (info->data))) {
2872       case GST_QUERY_ALLOCATION:
2873         return GST_PAD_PROBE_DROP;
2874       default:
2875         return GST_PAD_PROBE_PASS;
2876     }
2877   }
2878
2879   buf = gst_pad_probe_info_get_buffer (info);
2880   buf_info = mq_stream_buf_new ();
2881
2882   pts = GST_BUFFER_PTS (buf);
2883   dts = GST_BUFFER_DTS (buf);
2884   if (GST_BUFFER_PTS_IS_VALID (buf))
2885     ts = GST_BUFFER_PTS (buf);
2886   else
2887     ts = GST_BUFFER_DTS (buf);
2888
2889   GST_LOG_OBJECT (pad,
2890       "Buffer TS is %" GST_TIME_FORMAT " (PTS %" GST_TIME_FORMAT ", DTS %"
2891       GST_TIME_FORMAT ")", GST_TIME_ARGS (ts), GST_TIME_ARGS (pts),
2892       GST_TIME_ARGS (dts));
2893
2894   GST_SPLITMUX_LOCK (splitmux);
2895
2896   if (splitmux->input_state == SPLITMUX_INPUT_STATE_STOPPED) {
2897     ret = GST_FLOW_FLUSHING;
2898     goto beach;
2899   }
2900
2901   /* If this buffer has a timestamp, advance the input timestamp of the
2902    * stream */
2903   if (GST_CLOCK_TIME_IS_VALID (ts)) {
2904     running_time = my_segment_to_running_time (&ctx->in_segment, ts);
2905
2906     GST_LOG_OBJECT (pad, "Buffer running TS is %" GST_STIME_FORMAT,
2907         GST_STIME_ARGS (running_time));
2908
2909     /* in running time is always the maximum PTS (or DTS) that was observed so far */
2910     if (GST_CLOCK_STIME_IS_VALID (running_time)
2911         && running_time > ctx->in_running_time)
2912       ctx->in_running_time = running_time;
2913   } else {
2914     running_time = ctx->in_running_time;
2915   }
2916
2917   if (GST_CLOCK_TIME_IS_VALID (pts))
2918     running_time_pts = my_segment_to_running_time (&ctx->in_segment, pts);
2919   else
2920     running_time_pts = GST_CLOCK_STIME_NONE;
2921
2922   if (GST_CLOCK_TIME_IS_VALID (dts))
2923     running_time_dts = my_segment_to_running_time (&ctx->in_segment, dts);
2924   else
2925     running_time_dts = GST_CLOCK_STIME_NONE;
2926
2927   /* Try to make sure we have a valid running time */
2928   if (!GST_CLOCK_STIME_IS_VALID (ctx->in_running_time)) {
2929     ctx->in_running_time =
2930         my_segment_to_running_time (&ctx->in_segment, ctx->in_segment.start);
2931   }
2932
2933   GST_LOG_OBJECT (pad, "in running time now %" GST_STIME_FORMAT,
2934       GST_STIME_ARGS (ctx->in_running_time));
2935
2936   buf_info->run_ts = ctx->in_running_time;
2937   buf_info->buf_size = gst_buffer_get_size (buf);
2938   buf_info->duration = GST_BUFFER_DURATION (buf);
2939
2940   if (ctx->is_reference) {
2941     InputGop *gop = NULL;
2942     GstVideoTimeCodeMeta *tc_meta = gst_buffer_get_video_time_code_meta (buf);
2943
2944     /* initialize fragment_start_time if it was not set yet (i.e. for the
2945      * first fragment), or otherwise set it to the minimum observed time */
2946     if (!GST_CLOCK_STIME_IS_VALID (splitmux->fragment_start_time)
2947         || splitmux->fragment_start_time > running_time) {
2948       if (!GST_CLOCK_STIME_IS_VALID (splitmux->fragment_start_time))
2949         splitmux->fragment_start_time_pts = running_time_pts;
2950       splitmux->fragment_start_time = running_time;
2951
2952       GST_LOG_OBJECT (splitmux,
2953           "Fragment start time now %" GST_STIME_FORMAT " (initial PTS %"
2954           GST_STIME_FORMAT ")", GST_STIME_ARGS (splitmux->fragment_start_time),
2955           GST_STIME_ARGS (splitmux->fragment_start_time_pts));
2956
2957       /* Also take this as the first start time when starting up,
2958        * so that we start counting overflow from the first frame */
2959       if (!GST_CLOCK_STIME_IS_VALID (splitmux->max_in_running_time)
2960           || splitmux->max_in_running_time < splitmux->fragment_start_time)
2961         splitmux->max_in_running_time = splitmux->fragment_start_time;
2962
2963       if (!GST_CLOCK_STIME_IS_VALID (splitmux->max_in_running_time_dts))
2964         splitmux->max_in_running_time_dts = running_time_dts;
2965
2966       if (tc_meta) {
2967         video_time_code_replace (&splitmux->fragment_start_tc, &tc_meta->tc);
2968
2969         splitmux->next_fragment_start_tc_time =
2970             calculate_next_max_timecode (splitmux, &tc_meta->tc,
2971             running_time, NULL);
2972
2973 #ifndef GST_DISABLE_GST_DEBUG
2974         {
2975           gchar *tc_str;
2976
2977           tc_str = gst_video_time_code_to_string (&tc_meta->tc);
2978           GST_DEBUG_OBJECT (splitmux,
2979               "Initialize fragment start timecode %s, next fragment start timecode time %"
2980               GST_TIME_FORMAT, tc_str,
2981               GST_TIME_ARGS (splitmux->next_fragment_start_tc_time));
2982           g_free (tc_str);
2983         }
2984 #endif
2985       }
2986     }
2987
2988
2989     /* First check if we're at the very first GOP and the tracking was created
2990      * from a GAP event. In that case don't start a new GOP on keyframes but
2991      * just updated it as needed */
2992     gop = g_queue_peek_tail (&splitmux->pending_input_gops);
2993
2994     if (!gop || (!gop->from_gap
2995             && !GST_BUFFER_FLAG_IS_SET (buf, GST_BUFFER_FLAG_DELTA_UNIT))) {
2996       gop = g_slice_new0 (InputGop);
2997
2998       gop->start_time = running_time;
2999       gop->start_time_pts = running_time_pts;
3000
3001       GST_LOG_OBJECT (splitmux,
3002           "Next GOP start time now %" GST_STIME_FORMAT " (initial PTS %"
3003           GST_STIME_FORMAT ")", GST_STIME_ARGS (gop->start_time),
3004           GST_STIME_ARGS (gop->start_time_pts));
3005
3006       if (tc_meta) {
3007         video_time_code_replace (&gop->start_tc, &tc_meta->tc);
3008
3009 #ifndef GST_DISABLE_GST_DEBUG
3010         {
3011           gchar *tc_str;
3012
3013           tc_str = gst_video_time_code_to_string (&tc_meta->tc);
3014           GST_DEBUG_OBJECT (splitmux, "Next GOP start timecode %s", tc_str);
3015           g_free (tc_str);
3016         }
3017 #endif
3018       }
3019
3020       g_queue_push_tail (&splitmux->pending_input_gops, gop);
3021     } else {
3022       gop->from_gap = FALSE;
3023
3024       if (!GST_CLOCK_STIME_IS_VALID (gop->start_time)
3025           || gop->start_time > running_time) {
3026         gop->start_time = running_time;
3027
3028         GST_LOG_OBJECT (splitmux,
3029             "GOP start time updated now %" GST_STIME_FORMAT " (initial PTS %"
3030             GST_STIME_FORMAT ")", GST_STIME_ARGS (gop->start_time),
3031             GST_STIME_ARGS (gop->start_time_pts));
3032
3033         if (tc_meta) {
3034           video_time_code_replace (&gop->start_tc, &tc_meta->tc);
3035
3036 #ifndef GST_DISABLE_GST_DEBUG
3037           {
3038             gchar *tc_str;
3039
3040             tc_str = gst_video_time_code_to_string (&tc_meta->tc);
3041             GST_DEBUG_OBJECT (splitmux, "Next GOP start timecode updated %s",
3042                 tc_str);
3043             g_free (tc_str);
3044           }
3045 #endif
3046         }
3047       }
3048     }
3049
3050     /* Check whether we need to request next keyframe depending on
3051      * current running time */
3052     if (request_next_keyframe (splitmux, buf, running_time_dts) == FALSE) {
3053       GST_WARNING_OBJECT (splitmux,
3054           "Could not request a keyframe. Files may not split at the exact location they should");
3055     }
3056   }
3057
3058   {
3059     InputGop *gop = g_queue_peek_tail (&splitmux->pending_input_gops);
3060
3061     if (gop) {
3062       GST_DEBUG_OBJECT (pad, "Buf TS %" GST_STIME_FORMAT
3063           " total GOP bytes %" G_GUINT64_FORMAT ", total next GOP bytes %"
3064           G_GUINT64_FORMAT, GST_STIME_ARGS (buf_info->run_ts),
3065           gop->total_bytes, gop->total_bytes);
3066     }
3067   }
3068
3069   loop_again = TRUE;
3070   do {
3071     if (ctx->flushing)
3072       break;
3073
3074     switch (splitmux->input_state) {
3075       case SPLITMUX_INPUT_STATE_COLLECTING_GOP_START:
3076         if (ctx->is_releasing) {
3077           /* The pad belonging to this context is being released */
3078           GST_WARNING_OBJECT (pad, "Pad is being released while the muxer is "
3079               "running. Data might not drain correctly");
3080           loop_again = FALSE;
3081         } else if (ctx->is_reference) {
3082           const InputGop *gop, *next_gop;
3083
3084           /* This is the reference context. If it's a keyframe,
3085            * it marks the start of a new GOP and we should wait in
3086            * check_completed_gop before continuing, but either way
3087            * (keyframe or no, we'll pass this buffer through after
3088            * so set loop_again to FALSE */
3089           loop_again = FALSE;
3090
3091           gop = g_queue_peek_head (&splitmux->pending_input_gops);
3092           g_assert (gop != NULL);
3093           next_gop = g_queue_peek_nth (&splitmux->pending_input_gops, 1);
3094
3095           if (ctx->in_running_time > splitmux->max_in_running_time)
3096             splitmux->max_in_running_time = ctx->in_running_time;
3097           if (running_time_dts > splitmux->max_in_running_time_dts)
3098             splitmux->max_in_running_time_dts = running_time_dts;
3099
3100           GST_LOG_OBJECT (splitmux,
3101               "Max in running time now %" GST_STIME_FORMAT ", DTS %"
3102               GST_STIME_FORMAT, GST_STIME_ARGS (splitmux->max_in_running_time),
3103               GST_STIME_ARGS (splitmux->max_in_running_time_dts));
3104
3105           if (!next_gop) {
3106             GST_DEBUG_OBJECT (pad, "Waiting for end of GOP");
3107             /* Allow other input pads to catch up to here too */
3108             GST_SPLITMUX_BROADCAST_INPUT (splitmux);
3109             break;
3110           }
3111
3112           if (!GST_BUFFER_FLAG_IS_SET (buf, GST_BUFFER_FLAG_DELTA_UNIT)) {
3113             GST_INFO_OBJECT (pad,
3114                 "Have keyframe with running time %" GST_STIME_FORMAT,
3115                 GST_STIME_ARGS (ctx->in_running_time));
3116             keyframe = TRUE;
3117           }
3118
3119           if (running_time_dts != GST_CLOCK_STIME_NONE
3120               && running_time_dts < next_gop->start_time_pts) {
3121             GST_DEBUG_OBJECT (splitmux,
3122                 "Waiting until DTS (%" GST_STIME_FORMAT
3123                 ") has passed next GOP start PTS (%" GST_STIME_FORMAT ")",
3124                 GST_STIME_ARGS (running_time_dts),
3125                 GST_STIME_ARGS (next_gop->start_time_pts));
3126             /* Allow other input pads to catch up to here too */
3127             GST_SPLITMUX_BROADCAST_INPUT (splitmux);
3128             break;
3129           }
3130
3131           splitmux->input_state = SPLITMUX_INPUT_STATE_WAITING_GOP_COLLECT;
3132           /* Wake up other input pads to collect this GOP */
3133           GST_SPLITMUX_BROADCAST_INPUT (splitmux);
3134           check_completed_gop (splitmux, ctx);
3135         } else {
3136           /* Pass this buffer if the reference ctx is far enough ahead */
3137           if (ctx->in_running_time < splitmux->max_in_running_time) {
3138             loop_again = FALSE;
3139             break;
3140           }
3141
3142           /* We're still waiting for a keyframe on the reference pad, sleep */
3143           GST_LOG_OBJECT (pad, "Sleeping for GOP start");
3144           GST_SPLITMUX_WAIT_INPUT (splitmux);
3145           GST_LOG_OBJECT (pad,
3146               "Done sleeping for GOP start input state now %d",
3147               splitmux->input_state);
3148         }
3149         break;
3150       case SPLITMUX_INPUT_STATE_WAITING_GOP_COLLECT:{
3151         /* We're collecting a GOP, this is only ever called for non-reference
3152          * contexts as the reference context would be waiting inside
3153          * check_completed_gop() */
3154
3155         g_assert (!ctx->is_reference);
3156
3157         /* If we overran the target timestamp, it might be time to process
3158          * the GOP, otherwise bail out for more data. */
3159         GST_LOG_OBJECT (pad,
3160             "Checking TS %" GST_STIME_FORMAT " against max %"
3161             GST_STIME_FORMAT, GST_STIME_ARGS (ctx->in_running_time),
3162             GST_STIME_ARGS (splitmux->max_in_running_time));
3163
3164         if (ctx->in_running_time < splitmux->max_in_running_time) {
3165           loop_again = FALSE;
3166           break;
3167         }
3168
3169         GST_LOG_OBJECT (pad,
3170             "Collected last packet of GOP. Checking other pads");
3171         check_completed_gop (splitmux, ctx);
3172         break;
3173       }
3174       case SPLITMUX_INPUT_STATE_FINISHING_UP:
3175         loop_again = FALSE;
3176         break;
3177       default:
3178         loop_again = FALSE;
3179         break;
3180     }
3181   }
3182   while (loop_again);
3183
3184   if (keyframe && ctx->is_reference)
3185     splitmux->queued_keyframes++;
3186   buf_info->keyframe = keyframe;
3187
3188   /* Update total input byte counter for overflow detect unless we're after
3189    * EOS now */
3190   if (splitmux->input_state != SPLITMUX_INPUT_STATE_FINISHING_UP) {
3191     InputGop *gop = g_queue_peek_tail (&splitmux->pending_input_gops);
3192
3193     /* We must have a GOP at this point */
3194     g_assert (gop != NULL);
3195
3196     gop->total_bytes += buf_info->buf_size;
3197     if (ctx->is_reference) {
3198       gop->reference_bytes += buf_info->buf_size;
3199     }
3200   }
3201
3202   /* Now add this buffer to the queue just before returning */
3203   g_queue_push_head (&ctx->queued_bufs, buf_info);
3204
3205   GST_LOG_OBJECT (pad, "Returning to queue buffer %" GST_PTR_FORMAT
3206       " run ts %" GST_STIME_FORMAT, buf, GST_STIME_ARGS (ctx->in_running_time));
3207
3208   GST_SPLITMUX_UNLOCK (splitmux);
3209   GST_PAD_PROBE_INFO_FLOW_RETURN (info) = ret;
3210   return GST_PAD_PROBE_PASS;
3211
3212 beach:
3213   GST_SPLITMUX_UNLOCK (splitmux);
3214   if (buf_info)
3215     mq_stream_buf_free (buf_info);
3216   GST_PAD_PROBE_INFO_FLOW_RETURN (info) = ret;
3217   return GST_PAD_PROBE_PASS;
3218 }
3219
3220 static void
3221 grow_blocked_queues (GstSplitMuxSink * splitmux)
3222 {
3223   GList *cur;
3224
3225   /* Scan other queues for full-ness and grow them */
3226   for (cur = g_list_first (splitmux->contexts);
3227       cur != NULL; cur = g_list_next (cur)) {
3228     MqStreamCtx *tmpctx = (MqStreamCtx *) (cur->data);
3229     guint cur_limit;
3230     guint cur_len = g_queue_get_length (&tmpctx->queued_bufs);
3231
3232     g_object_get (tmpctx->q, "max-size-buffers", &cur_limit, NULL);
3233     GST_LOG_OBJECT (tmpctx->q, "Queue len %u", cur_len);
3234
3235     if (cur_len >= cur_limit) {
3236       cur_limit = cur_len + 1;
3237       GST_DEBUG_OBJECT (tmpctx->q,
3238           "Queue overflowed and needs enlarging. Growing to %u buffers",
3239           cur_limit);
3240       g_object_set (tmpctx->q, "max-size-buffers", cur_limit, NULL);
3241     }
3242   }
3243 }
3244
3245 static void
3246 handle_q_underrun (GstElement * q, gpointer user_data)
3247 {
3248   MqStreamCtx *ctx = (MqStreamCtx *) (user_data);
3249   GstSplitMuxSink *splitmux = ctx->splitmux;
3250
3251   GST_SPLITMUX_LOCK (splitmux);
3252   GST_DEBUG_OBJECT (q,
3253       "Queue reported underrun with %d keyframes and %d cmds enqueued",
3254       splitmux->queued_keyframes, g_queue_get_length (&splitmux->out_cmd_q));
3255   grow_blocked_queues (splitmux);
3256   GST_SPLITMUX_UNLOCK (splitmux);
3257 }
3258
3259 static void
3260 handle_q_overrun (GstElement * q, gpointer user_data)
3261 {
3262   MqStreamCtx *ctx = (MqStreamCtx *) (user_data);
3263   GstSplitMuxSink *splitmux = ctx->splitmux;
3264   gboolean allow_grow = FALSE;
3265
3266   GST_SPLITMUX_LOCK (splitmux);
3267   GST_DEBUG_OBJECT (q,
3268       "Queue reported overrun with %d keyframes and %d cmds enqueued",
3269       splitmux->queued_keyframes, g_queue_get_length (&splitmux->out_cmd_q));
3270
3271   if (splitmux->queued_keyframes < 2) {
3272     /* Less than a full GOP queued, grow the queue */
3273     allow_grow = TRUE;
3274   } else if (g_queue_get_length (&splitmux->out_cmd_q) < 1) {
3275     allow_grow = TRUE;
3276   } else {
3277     /* If another queue is starved, grow */
3278     GList *cur;
3279     for (cur = g_list_first (splitmux->contexts);
3280         cur != NULL; cur = g_list_next (cur)) {
3281       MqStreamCtx *tmpctx = (MqStreamCtx *) (cur->data);
3282       if (tmpctx != ctx && g_queue_get_length (&tmpctx->queued_bufs) < 1) {
3283         allow_grow = TRUE;
3284       }
3285     }
3286   }
3287   GST_SPLITMUX_UNLOCK (splitmux);
3288
3289   if (allow_grow) {
3290     guint cur_limit;
3291
3292     g_object_get (q, "max-size-buffers", &cur_limit, NULL);
3293     cur_limit++;
3294
3295     GST_DEBUG_OBJECT (q,
3296         "Queue overflowed and needs enlarging. Growing to %u buffers",
3297         cur_limit);
3298
3299     g_object_set (q, "max-size-buffers", cur_limit, NULL);
3300   }
3301 }
3302
3303 /* Called with SPLITMUX lock held */
3304 static const gchar *
3305 lookup_muxer_pad (GstSplitMuxSink * splitmux, const gchar * sinkpad_name)
3306 {
3307   const gchar *ret = NULL;
3308
3309   if (splitmux->muxerpad_map == NULL)
3310     return NULL;
3311
3312   if (sinkpad_name == NULL) {
3313     GST_WARNING_OBJECT (splitmux,
3314         "Can't look up request pad in pad map without providing a pad name");
3315     return NULL;
3316   }
3317
3318   ret = gst_structure_get_string (splitmux->muxerpad_map, sinkpad_name);
3319   if (ret) {
3320     GST_INFO_OBJECT (splitmux, "Sink pad %s maps to muxer pad %s", sinkpad_name,
3321         ret);
3322     return g_strdup (ret);
3323   }
3324
3325   return NULL;
3326 }
3327
3328 static GstPad *
3329 gst_splitmux_sink_request_new_pad (GstElement * element,
3330     GstPadTemplate * templ, const gchar * name, const GstCaps * caps)
3331 {
3332   GstSplitMuxSink *splitmux = (GstSplitMuxSink *) element;
3333   GstPadTemplate *mux_template = NULL;
3334   GstPad *ret = NULL, *muxpad = NULL;
3335   GstElement *q;
3336   GstPad *q_sink = NULL, *q_src = NULL;
3337   gchar *gname, *qname;
3338   gboolean is_primary_video = FALSE, is_video = FALSE,
3339       muxer_is_requestpad = FALSE;
3340   MqStreamCtx *ctx;
3341   const gchar *muxer_padname = NULL;
3342
3343   GST_DEBUG_OBJECT (splitmux, "templ:%s, name:%s", templ->name_template, name);
3344
3345   GST_SPLITMUX_LOCK (splitmux);
3346   if (!create_muxer (splitmux))
3347     goto fail;
3348   g_signal_emit (splitmux, signals[SIGNAL_MUXER_ADDED], 0, splitmux->muxer);
3349
3350   if (g_str_equal (templ->name_template, "video") ||
3351       g_str_has_prefix (templ->name_template, "video_aux_")) {
3352     is_primary_video = g_str_equal (templ->name_template, "video");
3353     if (is_primary_video && splitmux->have_video)
3354       goto already_have_video;
3355     is_video = TRUE;
3356   }
3357
3358   /* See if there's a pad map and it lists this pad */
3359   muxer_padname = lookup_muxer_pad (splitmux, name);
3360
3361   if (muxer_padname == NULL) {
3362     if (is_video) {
3363       /* FIXME: Look for a pad template with matching caps, rather than by name */
3364       GST_DEBUG_OBJECT (element,
3365           "searching for pad-template with name 'video_%%u'");
3366       mux_template =
3367           gst_element_class_get_pad_template (GST_ELEMENT_GET_CLASS
3368           (splitmux->muxer), "video_%u");
3369
3370       /* Fallback to find sink pad templates named 'video' (flvmux) */
3371       if (!mux_template) {
3372         GST_DEBUG_OBJECT (element,
3373             "searching for pad-template with name 'video'");
3374         mux_template =
3375             gst_element_class_get_pad_template (GST_ELEMENT_GET_CLASS
3376             (splitmux->muxer), "video");
3377       }
3378       name = NULL;
3379     } else {
3380       GST_DEBUG_OBJECT (element, "searching for pad-template with name '%s'",
3381           templ->name_template);
3382       mux_template =
3383           gst_element_class_get_pad_template (GST_ELEMENT_GET_CLASS
3384           (splitmux->muxer), templ->name_template);
3385
3386       /* Fallback to find sink pad templates named 'audio' (flvmux) */
3387       if (!mux_template && g_str_has_prefix (templ->name_template, "audio_")) {
3388         GST_DEBUG_OBJECT (element,
3389             "searching for pad-template with name 'audio'");
3390         mux_template =
3391             gst_element_class_get_pad_template (GST_ELEMENT_GET_CLASS
3392             (splitmux->muxer), "audio");
3393         name = NULL;
3394       }
3395     }
3396
3397     if (mux_template == NULL) {
3398       GST_DEBUG_OBJECT (element,
3399           "searching for pad-template with name 'sink_%%d'");
3400       mux_template =
3401           gst_element_class_get_pad_template (GST_ELEMENT_GET_CLASS
3402           (splitmux->muxer), "sink_%d");
3403       name = NULL;
3404     }
3405     if (mux_template == NULL) {
3406       GST_DEBUG_OBJECT (element, "searching for pad-template with name 'sink'");
3407       mux_template =
3408           gst_element_class_get_pad_template (GST_ELEMENT_GET_CLASS
3409           (splitmux->muxer), "sink");
3410       name = NULL;
3411     }
3412
3413     if (mux_template == NULL) {
3414       GST_ERROR_OBJECT (element,
3415           "unable to find a suitable sink pad-template on the muxer");
3416       goto fail;
3417     }
3418     GST_DEBUG_OBJECT (element, "found sink pad-template '%s' on the muxer",
3419         mux_template->name_template);
3420
3421     if (mux_template->presence == GST_PAD_REQUEST) {
3422       GST_DEBUG_OBJECT (element, "requesting pad from pad-template");
3423
3424       muxpad =
3425           gst_element_request_pad (splitmux->muxer, mux_template, name, caps);
3426       muxer_is_requestpad = TRUE;
3427     } else if (mux_template->presence == GST_PAD_ALWAYS) {
3428       GST_DEBUG_OBJECT (element, "accessing always pad from pad-template");
3429
3430       muxpad =
3431           gst_element_get_static_pad (splitmux->muxer,
3432           mux_template->name_template);
3433     } else {
3434       GST_ERROR_OBJECT (element,
3435           "unexpected pad presence %d", mux_template->presence);
3436       goto fail;
3437     }
3438   } else {
3439     /* Have a muxer pad name */
3440     if (!(muxpad = gst_element_get_static_pad (splitmux->muxer, muxer_padname))) {
3441       if ((muxpad =
3442               gst_element_request_pad_simple (splitmux->muxer, muxer_padname)))
3443         muxer_is_requestpad = TRUE;
3444     }
3445     g_free ((gchar *) muxer_padname);
3446     muxer_padname = NULL;
3447   }
3448
3449   /* One way or another, we must have a muxer pad by now */
3450   if (muxpad == NULL)
3451     goto fail;
3452
3453   if (is_primary_video)
3454     gname = g_strdup ("video");
3455   else if (name == NULL)
3456     gname = gst_pad_get_name (muxpad);
3457   else
3458     gname = g_strdup (name);
3459
3460   qname = g_strdup_printf ("queue_%s", gname);
3461   if ((q = create_element (splitmux, "queue", qname, FALSE)) == NULL) {
3462     g_free (qname);
3463     goto fail;
3464   }
3465   g_free (qname);
3466
3467   gst_element_set_state (q, GST_STATE_TARGET (splitmux));
3468
3469   g_object_set (q, "max-size-bytes", 0, "max-size-time", (guint64) (0),
3470       "max-size-buffers", 5, NULL);
3471
3472   q_sink = gst_element_get_static_pad (q, "sink");
3473   q_src = gst_element_get_static_pad (q, "src");
3474
3475   if (gst_pad_link (q_src, muxpad) != GST_PAD_LINK_OK) {
3476     if (muxer_is_requestpad)
3477       gst_element_release_request_pad (splitmux->muxer, muxpad);
3478     gst_object_unref (GST_OBJECT (muxpad));
3479     goto fail;
3480   }
3481
3482   gst_object_unref (GST_OBJECT (muxpad));
3483
3484   ctx = mq_stream_ctx_new (splitmux);
3485   /* Context holds a ref: */
3486   ctx->q = gst_object_ref (q);
3487   ctx->srcpad = q_src;
3488   ctx->sinkpad = q_sink;
3489   ctx->q_overrun_id =
3490       g_signal_connect (q, "overrun", (GCallback) handle_q_overrun, ctx);
3491   g_signal_connect (q, "underrun", (GCallback) handle_q_underrun, ctx);
3492
3493   ctx->src_pad_block_id =
3494       gst_pad_add_probe (q_src,
3495       GST_PAD_PROBE_TYPE_DATA_DOWNSTREAM | GST_PAD_PROBE_TYPE_EVENT_FLUSH,
3496       (GstPadProbeCallback) handle_mq_output, ctx, NULL);
3497   if (is_primary_video && splitmux->reference_ctx != NULL) {
3498     splitmux->reference_ctx->is_reference = FALSE;
3499     splitmux->reference_ctx = NULL;
3500   }
3501   if (splitmux->reference_ctx == NULL) {
3502     splitmux->reference_ctx = ctx;
3503     ctx->is_reference = TRUE;
3504   }
3505
3506   ret = gst_ghost_pad_new_from_template (gname, q_sink, templ);
3507   g_object_set_qdata ((GObject *) (ret), PAD_CONTEXT, ctx);
3508
3509   ctx->sink_pad_block_id =
3510       gst_pad_add_probe (q_sink,
3511       GST_PAD_PROBE_TYPE_DATA_DOWNSTREAM | GST_PAD_PROBE_TYPE_EVENT_FLUSH |
3512       GST_PAD_PROBE_TYPE_QUERY_DOWNSTREAM,
3513       (GstPadProbeCallback) handle_mq_input, ctx, NULL);
3514
3515   GST_DEBUG_OBJECT (splitmux, "splitmuxsink pad %" GST_PTR_FORMAT
3516       " feeds queue pad %" GST_PTR_FORMAT, ret, q_sink);
3517
3518   splitmux->contexts = g_list_append (splitmux->contexts, ctx);
3519
3520   g_free (gname);
3521
3522   if (is_primary_video)
3523     splitmux->have_video = TRUE;
3524
3525   gst_pad_set_active (ret, TRUE);
3526   gst_element_add_pad (GST_ELEMENT (splitmux), ret);
3527
3528   GST_SPLITMUX_UNLOCK (splitmux);
3529
3530   return ret;
3531 fail:
3532   GST_SPLITMUX_UNLOCK (splitmux);
3533
3534   if (q_sink)
3535     gst_object_unref (q_sink);
3536   if (q_src)
3537     gst_object_unref (q_src);
3538   return NULL;
3539 already_have_video:
3540   GST_DEBUG_OBJECT (splitmux, "video sink pad already requested");
3541   GST_SPLITMUX_UNLOCK (splitmux);
3542   return NULL;
3543 }
3544
3545 static void
3546 gst_splitmux_sink_release_pad (GstElement * element, GstPad * pad)
3547 {
3548   GstSplitMuxSink *splitmux = (GstSplitMuxSink *) element;
3549   GstPad *muxpad = NULL;
3550   MqStreamCtx *ctx =
3551       (MqStreamCtx *) (g_object_get_qdata ((GObject *) (pad), PAD_CONTEXT));
3552
3553   GST_SPLITMUX_LOCK (splitmux);
3554
3555   if (splitmux->muxer == NULL)
3556     goto fail;                  /* Elements don't exist yet - nothing to release */
3557
3558   GST_INFO_OBJECT (pad, "releasing request pad");
3559
3560   muxpad = gst_pad_get_peer (ctx->srcpad);
3561
3562   /* Remove the context from our consideration */
3563   splitmux->contexts = g_list_remove (splitmux->contexts, ctx);
3564
3565   GST_SPLITMUX_UNLOCK (splitmux);
3566
3567   if (ctx->sink_pad_block_id) {
3568     gst_pad_remove_probe (ctx->sinkpad, ctx->sink_pad_block_id);
3569     gst_pad_send_event (ctx->sinkpad, gst_event_new_flush_start ());
3570   }
3571
3572   if (ctx->src_pad_block_id)
3573     gst_pad_remove_probe (ctx->srcpad, ctx->src_pad_block_id);
3574
3575   GST_SPLITMUX_LOCK (splitmux);
3576
3577   ctx->is_releasing = TRUE;
3578   GST_SPLITMUX_BROADCAST_INPUT (splitmux);
3579
3580   /* Can release the context now */
3581   mq_stream_ctx_free (ctx);
3582   if (ctx == splitmux->reference_ctx)
3583     splitmux->reference_ctx = NULL;
3584
3585   /* Release and free the muxer input */
3586   if (muxpad) {
3587     gst_element_release_request_pad (splitmux->muxer, muxpad);
3588     gst_object_unref (muxpad);
3589   }
3590
3591   if (GST_PAD_PAD_TEMPLATE (pad) &&
3592       g_str_equal (GST_PAD_TEMPLATE_NAME_TEMPLATE (GST_PAD_PAD_TEMPLATE
3593               (pad)), "video"))
3594     splitmux->have_video = FALSE;
3595
3596   gst_element_remove_pad (element, pad);
3597
3598   /* Reset the internal elements only after all request pads are released */
3599   if (splitmux->contexts == NULL)
3600     gst_splitmux_reset_elements (splitmux);
3601
3602   /* Wake up other input streams to check if the completion conditions have
3603    * changed */
3604   GST_SPLITMUX_BROADCAST_INPUT (splitmux);
3605
3606 fail:
3607   GST_SPLITMUX_UNLOCK (splitmux);
3608 }
3609
3610 static GstElement *
3611 create_element (GstSplitMuxSink * splitmux,
3612     const gchar * factory, const gchar * name, gboolean locked)
3613 {
3614   GstElement *ret = gst_element_factory_make (factory, name);
3615   if (ret == NULL) {
3616     g_warning ("Failed to create %s - splitmuxsink will not work", name);
3617     return NULL;
3618   }
3619
3620   if (locked) {
3621     /* Ensure the sink starts in locked state and NULL - it will be changed
3622      * by the filename setting code */
3623     gst_element_set_locked_state (ret, TRUE);
3624     gst_element_set_state (ret, GST_STATE_NULL);
3625   }
3626
3627   if (!gst_bin_add (GST_BIN (splitmux), ret)) {
3628     g_warning ("Could not add %s element - splitmuxsink will not work", name);
3629     gst_object_unref (ret);
3630     return NULL;
3631   }
3632
3633   return ret;
3634 }
3635
3636 static gboolean
3637 create_muxer (GstSplitMuxSink * splitmux)
3638 {
3639   /* Create internal elements */
3640   if (splitmux->muxer == NULL) {
3641     GstElement *provided_muxer = NULL;
3642
3643     GST_OBJECT_LOCK (splitmux);
3644     if (splitmux->provided_muxer != NULL)
3645       provided_muxer = gst_object_ref (splitmux->provided_muxer);
3646     GST_OBJECT_UNLOCK (splitmux);
3647
3648     if ((!splitmux->async_finalize && provided_muxer == NULL) ||
3649         (splitmux->async_finalize && splitmux->muxer_factory == NULL)) {
3650       if ((splitmux->muxer =
3651               create_element (splitmux,
3652                   splitmux->muxer_factory ? splitmux->
3653                   muxer_factory : DEFAULT_MUXER, "muxer", FALSE)) == NULL)
3654         goto fail;
3655     } else if (splitmux->async_finalize) {
3656       if ((splitmux->muxer =
3657               create_element (splitmux, splitmux->muxer_factory, "muxer",
3658                   FALSE)) == NULL)
3659         goto fail;
3660       if (splitmux->muxer_preset && GST_IS_PRESET (splitmux->muxer))
3661         gst_preset_load_preset (GST_PRESET (splitmux->muxer),
3662             splitmux->muxer_preset);
3663       if (splitmux->muxer_properties)
3664         gst_structure_foreach (splitmux->muxer_properties,
3665             _set_property_from_structure, splitmux->muxer);
3666     } else {
3667       /* Ensure it's not in locked state (we might be reusing an old element) */
3668       gst_element_set_locked_state (provided_muxer, FALSE);
3669       if (!gst_bin_add (GST_BIN (splitmux), provided_muxer)) {
3670         g_warning ("Could not add muxer element - splitmuxsink will not work");
3671         gst_object_unref (provided_muxer);
3672         goto fail;
3673       }
3674
3675       splitmux->muxer = provided_muxer;
3676       gst_object_unref (provided_muxer);
3677     }
3678
3679     if (splitmux->use_robust_muxing) {
3680       update_muxer_properties (splitmux);
3681     }
3682   }
3683
3684   return TRUE;
3685 fail:
3686   return FALSE;
3687 }
3688
3689 static GstElement *
3690 find_sink (GstElement * e)
3691 {
3692   GstElement *res = NULL;
3693   GstIterator *iter;
3694   gboolean done = FALSE;
3695   GValue data = { 0, };
3696
3697   if (!GST_IS_BIN (e))
3698     return e;
3699
3700   if (g_object_class_find_property (G_OBJECT_GET_CLASS (e), "location") != NULL)
3701     return e;
3702
3703   iter = gst_bin_iterate_sinks (GST_BIN (e));
3704   while (!done) {
3705     switch (gst_iterator_next (iter, &data)) {
3706       case GST_ITERATOR_OK:
3707       {
3708         GstElement *child = g_value_get_object (&data);
3709         if (g_object_class_find_property (G_OBJECT_GET_CLASS (child),
3710                 "location") != NULL) {
3711           res = child;
3712           done = TRUE;
3713         }
3714         g_value_reset (&data);
3715         break;
3716       }
3717       case GST_ITERATOR_RESYNC:
3718         gst_iterator_resync (iter);
3719         break;
3720       case GST_ITERATOR_DONE:
3721         done = TRUE;
3722         break;
3723       case GST_ITERATOR_ERROR:
3724         g_assert_not_reached ();
3725         break;
3726     }
3727   }
3728   g_value_unset (&data);
3729   gst_iterator_free (iter);
3730
3731   return res;
3732 }
3733
3734 static gboolean
3735 create_sink (GstSplitMuxSink * splitmux)
3736 {
3737   GstElement *provided_sink = NULL;
3738
3739   if (splitmux->active_sink == NULL) {
3740
3741     GST_OBJECT_LOCK (splitmux);
3742     if (splitmux->provided_sink != NULL)
3743       provided_sink = gst_object_ref (splitmux->provided_sink);
3744     GST_OBJECT_UNLOCK (splitmux);
3745
3746     if ((!splitmux->async_finalize && provided_sink == NULL) ||
3747         (splitmux->async_finalize && splitmux->sink_factory == NULL)) {
3748       if ((splitmux->sink =
3749               create_element (splitmux, DEFAULT_SINK, "sink", TRUE)) == NULL)
3750         goto fail;
3751       splitmux->active_sink = splitmux->sink;
3752     } else if (splitmux->async_finalize) {
3753       if ((splitmux->sink =
3754               create_element (splitmux, splitmux->sink_factory, "sink",
3755                   TRUE)) == NULL)
3756         goto fail;
3757       if (splitmux->sink_preset && GST_IS_PRESET (splitmux->sink))
3758         gst_preset_load_preset (GST_PRESET (splitmux->sink),
3759             splitmux->sink_preset);
3760       if (splitmux->sink_properties)
3761         gst_structure_foreach (splitmux->sink_properties,
3762             _set_property_from_structure, splitmux->sink);
3763       splitmux->active_sink = splitmux->sink;
3764     } else {
3765       /* Ensure the sink starts in locked state and NULL - it will be changed
3766        * by the filename setting code */
3767       gst_element_set_locked_state (provided_sink, TRUE);
3768       gst_element_set_state (provided_sink, GST_STATE_NULL);
3769       if (!gst_bin_add (GST_BIN (splitmux), provided_sink)) {
3770         g_warning ("Could not add sink elements - splitmuxsink will not work");
3771         gst_object_unref (provided_sink);
3772         goto fail;
3773       }
3774
3775       splitmux->active_sink = provided_sink;
3776
3777       /* The bin holds a ref now, we can drop our tmp ref */
3778       gst_object_unref (provided_sink);
3779
3780       /* Find the sink element */
3781       splitmux->sink = find_sink (splitmux->active_sink);
3782       if (splitmux->sink == NULL) {
3783         g_warning
3784             ("Could not locate sink element in provided sink - splitmuxsink will not work");
3785         goto fail;
3786       }
3787     }
3788
3789 #if 1
3790     if (g_object_class_find_property (G_OBJECT_GET_CLASS (splitmux->sink),
3791             "async") != NULL) {
3792       /* async child elements are causing state change races and weird
3793        * failures, so let's try and turn that off */
3794       g_object_set (splitmux->sink, "async", FALSE, NULL);
3795     }
3796 #endif
3797
3798     if (!gst_element_link (splitmux->muxer, splitmux->active_sink)) {
3799       g_warning ("Failed to link muxer and sink- splitmuxsink will not work");
3800       goto fail;
3801     }
3802   }
3803
3804   return TRUE;
3805 fail:
3806   return FALSE;
3807 }
3808
3809 #ifdef __GNUC__
3810 #pragma GCC diagnostic ignored "-Wformat-nonliteral"
3811 #endif
3812 static void
3813 set_next_filename (GstSplitMuxSink * splitmux, MqStreamCtx * ctx)
3814 {
3815   gchar *fname = NULL;
3816   GstSample *sample;
3817   GstCaps *caps;
3818
3819   gst_splitmux_sink_ensure_max_files (splitmux);
3820
3821   if (ctx->cur_out_buffer == NULL) {
3822     GST_WARNING_OBJECT (splitmux, "Starting next file without buffer");
3823   }
3824
3825   caps = gst_pad_get_current_caps (ctx->srcpad);
3826   sample = gst_sample_new (ctx->cur_out_buffer, caps, &ctx->out_segment, NULL);
3827   g_signal_emit (splitmux, signals[SIGNAL_FORMAT_LOCATION_FULL], 0,
3828       splitmux->fragment_id, sample, &fname);
3829   gst_sample_unref (sample);
3830   if (caps)
3831     gst_caps_unref (caps);
3832
3833   if (fname == NULL) {
3834     /* Fallback to the old signal if the new one returned nothing */
3835     g_signal_emit (splitmux, signals[SIGNAL_FORMAT_LOCATION], 0,
3836         splitmux->fragment_id, &fname);
3837   }
3838
3839   if (!fname)
3840     fname = splitmux->location ?
3841         g_strdup_printf (splitmux->location, splitmux->fragment_id) : NULL;
3842
3843   if (fname) {
3844     GST_INFO_OBJECT (splitmux, "Setting file to %s", fname);
3845     if (g_object_class_find_property (G_OBJECT_GET_CLASS (splitmux->sink),
3846             "location") != NULL)
3847       g_object_set (splitmux->sink, "location", fname, NULL);
3848     g_free (fname);
3849   }
3850
3851   splitmux->fragment_id++;
3852 }
3853
3854 /* called with GST_SPLITMUX_LOCK */
3855 static void
3856 do_async_start (GstSplitMuxSink * splitmux)
3857 {
3858   GstMessage *message;
3859
3860   if (!splitmux->need_async_start) {
3861     GST_INFO_OBJECT (splitmux, "no async_start needed");
3862     return;
3863   }
3864
3865   splitmux->async_pending = TRUE;
3866
3867   GST_INFO_OBJECT (splitmux, "Sending async_start message");
3868   message = gst_message_new_async_start (GST_OBJECT_CAST (splitmux));
3869
3870   GST_SPLITMUX_UNLOCK (splitmux);
3871   GST_BIN_CLASS (parent_class)->handle_message (GST_BIN_CAST
3872       (splitmux), message);
3873   GST_SPLITMUX_LOCK (splitmux);
3874 }
3875
3876 /* called with GST_SPLITMUX_LOCK */
3877 static void
3878 do_async_done (GstSplitMuxSink * splitmux)
3879 {
3880   GstMessage *message;
3881
3882   if (splitmux->async_pending) {
3883     GST_INFO_OBJECT (splitmux, "Sending async_done message");
3884     splitmux->async_pending = FALSE;
3885     GST_SPLITMUX_UNLOCK (splitmux);
3886
3887     message =
3888         gst_message_new_async_done (GST_OBJECT_CAST (splitmux),
3889         GST_CLOCK_TIME_NONE);
3890     GST_BIN_CLASS (parent_class)->handle_message (GST_BIN_CAST
3891         (splitmux), message);
3892     GST_SPLITMUX_LOCK (splitmux);
3893   }
3894
3895   splitmux->need_async_start = FALSE;
3896 }
3897
3898 static void
3899 gst_splitmux_sink_reset (GstSplitMuxSink * splitmux)
3900 {
3901   splitmux->max_in_running_time = GST_CLOCK_STIME_NONE;
3902   splitmux->max_in_running_time_dts = GST_CLOCK_STIME_NONE;
3903
3904   splitmux->fragment_start_time = GST_CLOCK_STIME_NONE;
3905   splitmux->fragment_start_time_pts = GST_CLOCK_STIME_NONE;
3906   g_clear_pointer (&splitmux->fragment_start_tc, gst_video_time_code_free);
3907
3908   g_queue_foreach (&splitmux->pending_input_gops, (GFunc) input_gop_free, NULL);
3909   g_queue_clear (&splitmux->pending_input_gops);
3910
3911   splitmux->max_out_running_time = GST_CLOCK_STIME_NONE;
3912   splitmux->fragment_total_bytes = 0;
3913   splitmux->fragment_reference_bytes = 0;
3914   splitmux->muxed_out_bytes = 0;
3915   splitmux->ready_for_output = FALSE;
3916
3917   g_atomic_int_set (&(splitmux->split_requested), FALSE);
3918   g_atomic_int_set (&(splitmux->do_split_next_gop), FALSE);
3919
3920   splitmux->next_fku_time = GST_CLOCK_TIME_NONE;
3921   gst_queue_array_clear (splitmux->times_to_split);
3922
3923   g_list_foreach (splitmux->contexts, (GFunc) mq_stream_ctx_reset, NULL);
3924   splitmux->queued_keyframes = 0;
3925
3926   g_queue_foreach (&splitmux->out_cmd_q, (GFunc) out_cmd_buf_free, NULL);
3927   g_queue_clear (&splitmux->out_cmd_q);
3928 }
3929
3930 static GstStateChangeReturn
3931 gst_splitmux_sink_change_state (GstElement * element, GstStateChange transition)
3932 {
3933   GstStateChangeReturn ret;
3934   GstSplitMuxSink *splitmux = (GstSplitMuxSink *) element;
3935
3936   switch (transition) {
3937     case GST_STATE_CHANGE_NULL_TO_READY:{
3938       GST_SPLITMUX_LOCK (splitmux);
3939       if (!create_muxer (splitmux) || !create_sink (splitmux)) {
3940         ret = GST_STATE_CHANGE_FAILURE;
3941         GST_SPLITMUX_UNLOCK (splitmux);
3942         goto beach;
3943       }
3944       g_signal_emit (splitmux, signals[SIGNAL_MUXER_ADDED], 0, splitmux->muxer);
3945       g_signal_emit (splitmux, signals[SIGNAL_SINK_ADDED], 0, splitmux->sink);
3946       GST_SPLITMUX_UNLOCK (splitmux);
3947       splitmux->fragment_id = splitmux->start_index;
3948       break;
3949     }
3950     case GST_STATE_CHANGE_READY_TO_PAUSED:{
3951       GST_SPLITMUX_LOCK (splitmux);
3952       /* Make sure contexts and tracking times are cleared, in case we're being reused */
3953       gst_splitmux_sink_reset (splitmux);
3954       /* Start by collecting one input on each pad */
3955       splitmux->input_state = SPLITMUX_INPUT_STATE_COLLECTING_GOP_START;
3956       splitmux->output_state = SPLITMUX_OUTPUT_STATE_START_NEXT_FILE;
3957
3958       GST_SPLITMUX_UNLOCK (splitmux);
3959
3960       GST_SPLITMUX_STATE_LOCK (splitmux);
3961       splitmux->shutdown = FALSE;
3962       GST_SPLITMUX_STATE_UNLOCK (splitmux);
3963       break;
3964     }
3965     case GST_STATE_CHANGE_PAUSED_TO_READY:
3966     case GST_STATE_CHANGE_READY_TO_READY:
3967       g_atomic_int_set (&(splitmux->split_requested), FALSE);
3968       g_atomic_int_set (&(splitmux->do_split_next_gop), FALSE);
3969       /* Fall through */
3970     case GST_STATE_CHANGE_READY_TO_NULL:
3971       GST_SPLITMUX_STATE_LOCK (splitmux);
3972       splitmux->shutdown = TRUE;
3973       GST_SPLITMUX_STATE_UNLOCK (splitmux);
3974
3975       GST_SPLITMUX_LOCK (splitmux);
3976       gst_splitmux_sink_reset (splitmux);
3977       splitmux->output_state = SPLITMUX_OUTPUT_STATE_STOPPED;
3978       splitmux->input_state = SPLITMUX_INPUT_STATE_STOPPED;
3979       /* Wake up any blocked threads */
3980       GST_LOG_OBJECT (splitmux,
3981           "State change -> NULL or READY. Waking threads");
3982       GST_SPLITMUX_BROADCAST_INPUT (splitmux);
3983       GST_SPLITMUX_BROADCAST_OUTPUT (splitmux);
3984       GST_SPLITMUX_UNLOCK (splitmux);
3985       break;
3986     default:
3987       break;
3988   }
3989
3990   ret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
3991   if (ret == GST_STATE_CHANGE_FAILURE)
3992     goto beach;
3993
3994   switch (transition) {
3995     case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
3996       splitmux->need_async_start = TRUE;
3997       break;
3998     case GST_STATE_CHANGE_READY_TO_PAUSED:{
3999       /* Change state async, because our child sink might not
4000        * be ready to do that for us yet if it's state is still locked */
4001
4002       splitmux->need_async_start = TRUE;
4003       /* we want to go async to PAUSED until we managed to configure and add the
4004        * sink */
4005       GST_SPLITMUX_LOCK (splitmux);
4006       do_async_start (splitmux);
4007       GST_SPLITMUX_UNLOCK (splitmux);
4008       ret = GST_STATE_CHANGE_ASYNC;
4009       break;
4010     }
4011     case GST_STATE_CHANGE_READY_TO_NULL:
4012       GST_SPLITMUX_LOCK (splitmux);
4013       splitmux->fragment_id = 0;
4014       /* Reset internal elements only if no pad contexts are using them */
4015       if (splitmux->contexts == NULL)
4016         gst_splitmux_reset_elements (splitmux);
4017       do_async_done (splitmux);
4018       GST_SPLITMUX_UNLOCK (splitmux);
4019       break;
4020     default:
4021       break;
4022   }
4023
4024   return ret;
4025
4026 beach:
4027   if (transition == GST_STATE_CHANGE_NULL_TO_READY) {
4028     /* Cleanup elements on failed transition out of NULL */
4029     gst_splitmux_reset_elements (splitmux);
4030     GST_SPLITMUX_LOCK (splitmux);
4031     do_async_done (splitmux);
4032     GST_SPLITMUX_UNLOCK (splitmux);
4033   }
4034   if (transition == GST_STATE_CHANGE_READY_TO_READY) {
4035     /* READY to READY transition only happens when we're already
4036      * in READY state, but a child element is in NULL, which
4037      * happens when there's an error changing the state of the sink.
4038      * We need to make sure not to fail the state transition, or
4039      * the core won't transition us back to NULL successfully */
4040     ret = GST_STATE_CHANGE_SUCCESS;
4041   }
4042   return ret;
4043 }
4044
4045 static void
4046 gst_splitmux_sink_ensure_max_files (GstSplitMuxSink * splitmux)
4047 {
4048   if (splitmux->max_files && splitmux->fragment_id >= splitmux->max_files) {
4049     splitmux->fragment_id = 0;
4050   }
4051 }
4052
4053 static void
4054 split_now (GstSplitMuxSink * splitmux)
4055 {
4056   g_atomic_int_set (&(splitmux->do_split_next_gop), TRUE);
4057 }
4058
4059 static void
4060 split_after (GstSplitMuxSink * splitmux)
4061 {
4062   g_atomic_int_set (&(splitmux->split_requested), TRUE);
4063 }
4064
4065 static void
4066 split_at_running_time (GstSplitMuxSink * splitmux, GstClockTime split_time)
4067 {
4068   gboolean send_keyframe_requests;
4069
4070   GST_SPLITMUX_LOCK (splitmux);
4071   gst_queue_array_push_tail_struct (splitmux->times_to_split, &split_time);
4072   send_keyframe_requests = splitmux->send_keyframe_requests;
4073   GST_SPLITMUX_UNLOCK (splitmux);
4074
4075   if (send_keyframe_requests) {
4076     GstEvent *ev =
4077         gst_video_event_new_upstream_force_key_unit (split_time, TRUE, 0);
4078     GST_INFO_OBJECT (splitmux, "Requesting next keyframe at %" GST_TIME_FORMAT,
4079         GST_TIME_ARGS (split_time));
4080     if (!gst_pad_push_event (splitmux->reference_ctx->sinkpad, ev)) {
4081       GST_WARNING_OBJECT (splitmux,
4082           "Could not request keyframe at %" GST_TIME_FORMAT,
4083           GST_TIME_ARGS (split_time));
4084     }
4085   }
4086 }