1 /* GStreamer Muxer bin that splits output stream by size/time
2 * Copyright (C) <2014-2019> Jan Schmidt <jan@centricular.com>
4 * This library is free software; you can redistribute it and/or
5 * modify it under the terms of the GNU Library General Public
6 * License as published by the Free Software Foundation; either
7 * version 2 of the License, or (at your option) any later version.
9 * This library is distributed in the hope that it will be useful,
10 * but WITHOUT ANY WARRANTY; without even the implied warranty of
11 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
12 * Library General Public License for more details.
14 * You should have received a copy of the GNU Library General Public
15 * License along with this library; if not, write to the
16 * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
17 * Boston, MA 02110-1301, USA.
21 * SECTION:element-splitmuxsink
22 * @title: splitmuxsink
23 * @short_description: Muxer wrapper for splitting output stream by size or time
25 * This element wraps a muxer and a sink, and starts a new file when the mux
26 * contents are about to cross a threshold of maximum size of maximum time,
27 * splitting at video keyframe boundaries. Exactly one input video stream
28 * can be muxed, with as many accompanying audio and subtitle streams as
31 * By default, it uses mp4mux and filesink, but they can be changed via
32 * the 'muxer' and 'sink' properties.
34 * The minimum file size is 1 GOP, however - so limits may be overrun if the
35 * distance between any 2 keyframes is larger than the limits.
37 * If a video stream is available, the splitting process is driven by the video
38 * stream contents, and the video stream must contain closed GOPs for the output
39 * file parts to be played individually correctly. In the absence of a video
40 * stream, the first available stream is used as reference for synchronization.
42 * In the async-finalize mode, when the threshold is crossed, the old muxer
43 * and sink is disconnected from the pipeline and left to finish the file
44 * asynchronously, and a new muxer and sink is created to continue with the
45 * next fragment. For that reason, instead of muxer and sink objects, the
46 * muxer-factory and sink-factory properties are used to construct the new
47 * objects, together with muxer-properties and sink-properties.
49 * ## Example pipelines
51 * gst-launch-1.0 -e v4l2src num-buffers=500 ! video/x-raw,width=320,height=240 ! videoconvert ! queue ! timeoverlay ! x264enc key-int-max=10 ! h264parse ! splitmuxsink location=video%02d.mov max-size-time=10000000000 max-size-bytes=1000000
53 * Records a video stream captured from a v4l2 device and muxes it into
54 * ISO mp4 files, splitting as needed to limit size/duration to 10 seconds
55 * and 1MB maximum size.
58 * gst-launch-1.0 -e v4l2src num-buffers=500 ! video/x-raw,width=320,height=240 ! videoconvert ! queue ! timeoverlay ! x264enc key-int-max=10 ! h264parse ! splitmuxsink location=video%02d.mkv max-size-time=10000000000 muxer-factory=matroskamux muxer-properties="properties,streamable=true"
60 * Records a video stream captured from a v4l2 device and muxer it into
61 * streamable Matroska files, splitting as needed to limit size/duration to 10
62 * seconds. Each file will finalize asynchronously.
65 * gst-launch-1.0 videotestsrc num-buffers=10 ! jpegenc ! .video splitmuxsink muxer=qtmux muxer-pad-map=x-pad-map,video=video_1 location=test%05d.mp4 -v
67 * Records 10 frames to an mp4 file, using a muxer-pad-map to make explicit mappings between the splitmuxsink sink pad and the corresponding muxer pad
76 #include <glib/gstdio.h>
77 #include <gst/video/video.h>
78 #include "gstsplitmuxsink.h"
80 GST_DEBUG_CATEGORY_STATIC (splitmux_debug);
81 #define GST_CAT_DEFAULT splitmux_debug
83 #define GST_SPLITMUX_STATE_LOCK(s) g_mutex_lock(&(s)->state_lock)
84 #define GST_SPLITMUX_STATE_UNLOCK(s) g_mutex_unlock(&(s)->state_lock)
86 #define GST_SPLITMUX_LOCK(s) g_mutex_lock(&(s)->lock)
87 #define GST_SPLITMUX_UNLOCK(s) g_mutex_unlock(&(s)->lock)
88 #define GST_SPLITMUX_WAIT_INPUT(s) g_cond_wait (&(s)->input_cond, &(s)->lock)
89 #define GST_SPLITMUX_BROADCAST_INPUT(s) g_cond_broadcast (&(s)->input_cond)
91 #define GST_SPLITMUX_WAIT_OUTPUT(s) g_cond_wait (&(s)->output_cond, &(s)->lock)
92 #define GST_SPLITMUX_BROADCAST_OUTPUT(s) g_cond_broadcast (&(s)->output_cond)
94 static void split_now (GstSplitMuxSink * splitmux);
95 static void split_after (GstSplitMuxSink * splitmux);
96 static void split_at_running_time (GstSplitMuxSink * splitmux,
97 GstClockTime split_time);
106 PROP_MAX_SIZE_TIMECODE,
107 PROP_SEND_KEYFRAME_REQUESTS,
110 PROP_USE_ROBUST_MUXING,
111 PROP_ALIGNMENT_THRESHOLD,
118 PROP_MUXER_PROPERTIES,
121 PROP_SINK_PROPERTIES,
125 #define DEFAULT_MAX_SIZE_TIME 0
126 #define DEFAULT_MAX_SIZE_BYTES 0
127 #define DEFAULT_MAX_FILES 0
128 #define DEFAULT_MUXER_OVERHEAD 0.02
129 #define DEFAULT_SEND_KEYFRAME_REQUESTS FALSE
130 #define DEFAULT_ALIGNMENT_THRESHOLD 0
131 #define DEFAULT_MUXER "mp4mux"
132 #define DEFAULT_SINK "filesink"
133 #define DEFAULT_USE_ROBUST_MUXING FALSE
134 #define DEFAULT_RESET_MUXER TRUE
135 #define DEFAULT_ASYNC_FINALIZE FALSE
136 #define DEFAULT_START_INDEX 0
138 typedef struct _AsyncEosHelper
146 SIGNAL_FORMAT_LOCATION,
147 SIGNAL_FORMAT_LOCATION_FULL,
150 SIGNAL_SPLIT_AT_RUNNING_TIME,
156 static guint signals[SIGNAL_LAST];
158 static GstStaticPadTemplate video_sink_template =
159 GST_STATIC_PAD_TEMPLATE ("video",
162 GST_STATIC_CAPS_ANY);
163 static GstStaticPadTemplate video_aux_sink_template =
164 GST_STATIC_PAD_TEMPLATE ("video_aux_%u",
167 GST_STATIC_CAPS_ANY);
168 static GstStaticPadTemplate audio_sink_template =
169 GST_STATIC_PAD_TEMPLATE ("audio_%u",
172 GST_STATIC_CAPS_ANY);
173 static GstStaticPadTemplate subtitle_sink_template =
174 GST_STATIC_PAD_TEMPLATE ("subtitle_%u",
177 GST_STATIC_CAPS_ANY);
178 static GstStaticPadTemplate caption_sink_template =
179 GST_STATIC_PAD_TEMPLATE ("caption_%u",
182 GST_STATIC_CAPS_ANY);
184 static GQuark PAD_CONTEXT;
185 static GQuark EOS_FROM_US;
186 static GQuark RUNNING_TIME;
187 /* EOS_FROM_US is only valid in async-finalize mode. We need to know whether
188 * to forward an incoming EOS message, but we cannot rely on the state of the
189 * splitmux anymore, so we set this qdata on the sink instead.
190 * The muxer and sink must be destroyed after both of these things have
192 * 1) The EOS message has been sent when the fragment is ending
193 * 2) The muxer has been unlinked and relinked
194 * Therefore, EOS_FROM_US can have these two values:
195 * 0: EOS was not requested from us. Forward the message. The muxer and the
196 * sink will be destroyed together with the rest of the bin.
197 * 1: EOS was requested from us, but the other of the two tasks hasn't
198 * finished. Set EOS_FROM_US to 2 and do your stuff.
199 * 2: EOS was requested from us and the other of the two tasks has finished.
200 * Now we can destroy the muxer and the sink.
206 PAD_CONTEXT = g_quark_from_static_string ("pad-context");
207 EOS_FROM_US = g_quark_from_static_string ("eos-from-us");
208 RUNNING_TIME = g_quark_from_static_string ("running-time");
209 GST_DEBUG_CATEGORY_INIT (splitmux_debug, "splitmuxsink", 0,
210 "Split File Muxing Sink");
213 #define gst_splitmux_sink_parent_class parent_class
214 G_DEFINE_TYPE_EXTENDED (GstSplitMuxSink, gst_splitmux_sink, GST_TYPE_BIN, 0,
216 GST_ELEMENT_REGISTER_DEFINE (splitmuxsink, "splitmuxsink", GST_RANK_NONE,
217 GST_TYPE_SPLITMUX_SINK);
219 static gboolean create_muxer (GstSplitMuxSink * splitmux);
220 static gboolean create_sink (GstSplitMuxSink * splitmux);
221 static void gst_splitmux_sink_set_property (GObject * object, guint prop_id,
222 const GValue * value, GParamSpec * pspec);
223 static void gst_splitmux_sink_get_property (GObject * object, guint prop_id,
224 GValue * value, GParamSpec * pspec);
225 static void gst_splitmux_sink_dispose (GObject * object);
226 static void gst_splitmux_sink_finalize (GObject * object);
228 static GstPad *gst_splitmux_sink_request_new_pad (GstElement * element,
229 GstPadTemplate * templ, const gchar * name, const GstCaps * caps);
230 static void gst_splitmux_sink_release_pad (GstElement * element, GstPad * pad);
232 static GstStateChangeReturn gst_splitmux_sink_change_state (GstElement *
233 element, GstStateChange transition);
235 static void bus_handler (GstBin * bin, GstMessage * msg);
236 static void set_next_filename (GstSplitMuxSink * splitmux, MqStreamCtx * ctx);
237 static GstFlowReturn start_next_fragment (GstSplitMuxSink * splitmux,
239 static void mq_stream_ctx_free (MqStreamCtx * ctx);
240 static void grow_blocked_queues (GstSplitMuxSink * splitmux);
242 static void gst_splitmux_sink_ensure_max_files (GstSplitMuxSink * splitmux);
243 static GstElement *create_element (GstSplitMuxSink * splitmux,
244 const gchar * factory, const gchar * name, gboolean locked);
246 static void do_async_done (GstSplitMuxSink * splitmux);
248 static GstClockTime calculate_next_max_timecode (GstSplitMuxSink * splitmux,
249 const GstVideoTimeCode * cur_tc, GstClockTime running_time,
250 GstVideoTimeCode ** next_tc);
253 mq_stream_buf_new (void)
255 return g_slice_new0 (MqStreamBuf);
259 mq_stream_buf_free (MqStreamBuf * data)
261 g_slice_free (MqStreamBuf, data);
264 static SplitMuxOutputCommand *
265 out_cmd_buf_new (void)
267 return g_slice_new0 (SplitMuxOutputCommand);
271 out_cmd_buf_free (SplitMuxOutputCommand * data)
273 g_slice_free (SplitMuxOutputCommand, data);
277 input_gop_free (InputGop * gop)
279 g_clear_pointer (&gop->start_tc, gst_video_time_code_free);
280 g_slice_free (InputGop, gop);
284 gst_splitmux_sink_class_init (GstSplitMuxSinkClass * klass)
286 GObjectClass *gobject_class = (GObjectClass *) klass;
287 GstElementClass *gstelement_class = (GstElementClass *) klass;
288 GstBinClass *gstbin_class = (GstBinClass *) klass;
290 gobject_class->set_property = gst_splitmux_sink_set_property;
291 gobject_class->get_property = gst_splitmux_sink_get_property;
292 gobject_class->dispose = gst_splitmux_sink_dispose;
293 gobject_class->finalize = gst_splitmux_sink_finalize;
295 gst_element_class_set_static_metadata (gstelement_class,
296 "Split Muxing Bin", "Generic/Bin/Muxer",
297 "Convenience bin that muxes incoming streams into multiple time/size limited files",
298 "Jan Schmidt <jan@centricular.com>");
300 gst_element_class_add_static_pad_template (gstelement_class,
301 &video_sink_template);
302 gst_element_class_add_static_pad_template (gstelement_class,
303 &video_aux_sink_template);
304 gst_element_class_add_static_pad_template (gstelement_class,
305 &audio_sink_template);
306 gst_element_class_add_static_pad_template (gstelement_class,
307 &subtitle_sink_template);
308 gst_element_class_add_static_pad_template (gstelement_class,
309 &caption_sink_template);
311 gstelement_class->change_state =
312 GST_DEBUG_FUNCPTR (gst_splitmux_sink_change_state);
313 gstelement_class->request_new_pad =
314 GST_DEBUG_FUNCPTR (gst_splitmux_sink_request_new_pad);
315 gstelement_class->release_pad =
316 GST_DEBUG_FUNCPTR (gst_splitmux_sink_release_pad);
318 gstbin_class->handle_message = bus_handler;
320 g_object_class_install_property (gobject_class, PROP_LOCATION,
321 g_param_spec_string ("location", "File Output Pattern",
322 "Format string pattern for the location of the files to write (e.g. video%05d.mp4)",
323 NULL, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
324 g_object_class_install_property (gobject_class, PROP_MUXER_OVERHEAD,
325 g_param_spec_double ("mux-overhead", "Muxing Overhead",
326 "Extra size overhead of muxing (0.02 = 2%)", 0.0, 1.0,
327 DEFAULT_MUXER_OVERHEAD,
328 G_PARAM_READWRITE | GST_PARAM_CONTROLLABLE | G_PARAM_STATIC_STRINGS));
330 g_object_class_install_property (gobject_class, PROP_MAX_SIZE_TIME,
331 g_param_spec_uint64 ("max-size-time", "Max. size (ns)",
332 "Max. amount of time per file (in ns, 0=disable)", 0, G_MAXUINT64,
333 DEFAULT_MAX_SIZE_TIME,
334 G_PARAM_READWRITE | GST_PARAM_MUTABLE_READY |
335 G_PARAM_STATIC_STRINGS));
336 g_object_class_install_property (gobject_class, PROP_MAX_SIZE_BYTES,
337 g_param_spec_uint64 ("max-size-bytes", "Max. size bytes",
338 "Max. amount of data per file (in bytes, 0=disable)", 0, G_MAXUINT64,
339 DEFAULT_MAX_SIZE_BYTES,
340 G_PARAM_READWRITE | GST_PARAM_MUTABLE_READY |
341 G_PARAM_STATIC_STRINGS));
342 g_object_class_install_property (gobject_class, PROP_MAX_SIZE_TIMECODE,
343 g_param_spec_string ("max-size-timecode", "Maximum timecode difference",
344 "Maximum difference in timecode between first and last frame. "
345 "Separator is assumed to be \":\" everywhere (e.g. 01:00:00:00). "
346 "Will only be effective if a timecode track is present.", NULL,
347 G_PARAM_READWRITE | GST_PARAM_MUTABLE_READY |
348 G_PARAM_STATIC_STRINGS));
349 g_object_class_install_property (gobject_class, PROP_SEND_KEYFRAME_REQUESTS,
350 g_param_spec_boolean ("send-keyframe-requests",
351 "Request keyframes at max-size-time",
352 "Request a keyframe every max-size-time ns to try splitting at that point. "
353 "Needs max-size-bytes to be 0 in order to be effective.",
354 DEFAULT_SEND_KEYFRAME_REQUESTS,
355 G_PARAM_READWRITE | GST_PARAM_MUTABLE_READY |
356 G_PARAM_STATIC_STRINGS));
357 g_object_class_install_property (gobject_class, PROP_MAX_FILES,
358 g_param_spec_uint ("max-files", "Max files",
359 "Maximum number of files to keep on disk. Once the maximum is reached,"
360 "old files start to be deleted to make room for new ones.", 0,
361 G_MAXUINT, DEFAULT_MAX_FILES,
362 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
363 g_object_class_install_property (gobject_class, PROP_ALIGNMENT_THRESHOLD,
364 g_param_spec_uint64 ("alignment-threshold", "Alignment threshold (ns)",
365 "Allow non-reference streams to be that many ns before the reference"
366 " stream", 0, G_MAXUINT64, DEFAULT_ALIGNMENT_THRESHOLD,
367 G_PARAM_READWRITE | GST_PARAM_MUTABLE_READY |
368 G_PARAM_STATIC_STRINGS));
370 g_object_class_install_property (gobject_class, PROP_MUXER,
371 g_param_spec_object ("muxer", "Muxer",
372 "The muxer element to use (NULL = default mp4mux). "
373 "Valid only for async-finalize = FALSE",
374 GST_TYPE_ELEMENT, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
375 g_object_class_install_property (gobject_class, PROP_SINK,
376 g_param_spec_object ("sink", "Sink",
377 "The sink element (or element chain) to use (NULL = default filesink). "
378 "Valid only for async-finalize = FALSE",
379 GST_TYPE_ELEMENT, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
381 g_object_class_install_property (gobject_class, PROP_USE_ROBUST_MUXING,
382 g_param_spec_boolean ("use-robust-muxing",
383 "Support robust-muxing mode of some muxers",
384 "Check if muxers support robust muxing via the reserved-max-duration and "
385 "reserved-duration-remaining properties and use them if so. "
386 "(Only present on qtmux and mp4mux for now). splitmuxsink may then also "
387 " create new fragments if the reserved header space is about to overflow. "
388 "Note that for mp4mux and qtmux, reserved-moov-update-period must be set "
389 "manually by the app to a non-zero value for robust muxing to have an effect.",
390 DEFAULT_USE_ROBUST_MUXING,
391 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
393 g_object_class_install_property (gobject_class, PROP_RESET_MUXER,
394 g_param_spec_boolean ("reset-muxer",
396 "Reset the muxer after each segment. Disabling this will not work for most muxers.",
397 DEFAULT_RESET_MUXER, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
399 g_object_class_install_property (gobject_class, PROP_ASYNC_FINALIZE,
400 g_param_spec_boolean ("async-finalize",
401 "Finalize fragments asynchronously",
402 "Finalize each fragment asynchronously and start a new one",
403 DEFAULT_ASYNC_FINALIZE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
404 g_object_class_install_property (gobject_class, PROP_MUXER_FACTORY,
405 g_param_spec_string ("muxer-factory", "Muxer factory",
406 "The muxer element factory to use (default = mp4mux). "
407 "Valid only for async-finalize = TRUE",
408 "mp4mux", G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
410 * GstSplitMuxSink:muxer-preset
412 * An optional #GstPreset name to use for the muxer. This only has an effect
413 * in `async-finalize=TRUE` mode.
417 g_object_class_install_property (gobject_class, PROP_MUXER_PRESET,
418 g_param_spec_string ("muxer-preset", "Muxer preset",
419 "The muxer preset to use. "
420 "Valid only for async-finalize = TRUE",
421 NULL, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
422 g_object_class_install_property (gobject_class, PROP_MUXER_PROPERTIES,
423 g_param_spec_boxed ("muxer-properties", "Muxer properties",
424 "The muxer element properties to use. "
425 "Example: {properties,boolean-prop=true,string-prop=\"hi\"}. "
426 "Valid only for async-finalize = TRUE",
427 GST_TYPE_STRUCTURE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
428 g_object_class_install_property (gobject_class, PROP_SINK_FACTORY,
429 g_param_spec_string ("sink-factory", "Sink factory",
430 "The sink element factory to use (default = filesink). "
431 "Valid only for async-finalize = TRUE",
432 "filesink", G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
434 * GstSplitMuxSink:sink-preset
436 * An optional #GstPreset name to use for the sink. This only has an effect
437 * in `async-finalize=TRUE` mode.
441 g_object_class_install_property (gobject_class, PROP_SINK_PRESET,
442 g_param_spec_string ("sink-preset", "Sink preset",
443 "The sink preset to use. "
444 "Valid only for async-finalize = TRUE",
445 NULL, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
446 g_object_class_install_property (gobject_class, PROP_SINK_PROPERTIES,
447 g_param_spec_boxed ("sink-properties", "Sink properties",
448 "The sink element properties to use. "
449 "Example: {properties,boolean-prop=true,string-prop=\"hi\"}. "
450 "Valid only for async-finalize = TRUE",
451 GST_TYPE_STRUCTURE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
452 g_object_class_install_property (gobject_class, PROP_START_INDEX,
453 g_param_spec_int ("start-index", "Start Index",
454 "Start value of fragment index.",
455 0, G_MAXINT, DEFAULT_START_INDEX,
456 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
459 * GstSplitMuxSink::muxer-pad-map
461 * An optional GstStructure that provides a map from splitmuxsink sinkpad
462 * names to muxer pad names they should feed. Splitmuxsink has some default
463 * mapping behaviour to link video to video pads and audio to audio pads
464 * that usually works fine. This property is useful if you need to ensure
465 * a particular mapping to muxed streams.
467 * The GstStructure contains string fields like so:
468 * splitmuxsink muxer-pad-map=x-pad-map,video=video_1
472 g_object_class_install_property (G_OBJECT_CLASS (klass), PROP_MUXERPAD_MAP,
473 g_param_spec_boxed ("muxer-pad-map", "Muxer pad map",
474 "A GstStructure specifies the mapping from splitmuxsink sink pads to muxer pads",
476 (GParamFlags) (G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)));
479 * GstSplitMuxSink::format-location:
480 * @splitmux: the #GstSplitMuxSink
481 * @fragment_id: the sequence number of the file to be created
483 * Returns: the location to be used for the next output file. This must be
484 * a newly-allocated string which will be freed with g_free() by the
485 * splitmuxsink element when it no longer needs it, so use g_strdup() or
486 * g_strdup_printf() or similar functions to allocate it.
488 signals[SIGNAL_FORMAT_LOCATION] =
489 g_signal_new ("format-location", G_TYPE_FROM_CLASS (klass),
490 G_SIGNAL_RUN_LAST, 0, NULL, NULL, NULL, G_TYPE_STRING, 1, G_TYPE_UINT);
493 * GstSplitMuxSink::format-location-full:
494 * @splitmux: the #GstSplitMuxSink
495 * @fragment_id: the sequence number of the file to be created
496 * @first_sample: A #GstSample containing the first buffer
497 * from the reference stream in the new file
499 * Returns: the location to be used for the next output file. This must be
500 * a newly-allocated string which will be freed with g_free() by the
501 * splitmuxsink element when it no longer needs it, so use g_strdup() or
502 * g_strdup_printf() or similar functions to allocate it.
506 signals[SIGNAL_FORMAT_LOCATION_FULL] =
507 g_signal_new ("format-location-full", G_TYPE_FROM_CLASS (klass),
508 G_SIGNAL_RUN_LAST, 0, NULL, NULL, NULL, G_TYPE_STRING, 2, G_TYPE_UINT,
512 * GstSplitMuxSink::split-now:
513 * @splitmux: the #GstSplitMuxSink
515 * When called by the user, this action signal splits the video file (and begins a new one) immediately.
516 * The current GOP will be output to the new file.
520 signals[SIGNAL_SPLIT_NOW] =
521 g_signal_new ("split-now", G_TYPE_FROM_CLASS (klass),
522 G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION,
523 G_STRUCT_OFFSET (GstSplitMuxSinkClass, split_now), NULL, NULL, NULL,
527 * GstSplitMuxSink::split-after:
528 * @splitmux: the #GstSplitMuxSink
530 * When called by the user, this action signal splits the video file (and begins a new one) immediately.
531 * Unlike the 'split-now' signal, with 'split-after', the current GOP will be output to the old file.
535 signals[SIGNAL_SPLIT_AFTER] =
536 g_signal_new ("split-after", G_TYPE_FROM_CLASS (klass),
537 G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION,
538 G_STRUCT_OFFSET (GstSplitMuxSinkClass, split_after), NULL, NULL, NULL,
542 * GstSplitMuxSink::split-at-running-time:
543 * @splitmux: the #GstSplitMuxSink
545 * When called by the user, this action signal splits the video file (and
546 * begins a new one) as soon as the given running time is reached. If this
547 * action signal is called multiple times, running times are queued up and
548 * processed in the order they were given.
550 * Note that this is prone to race conditions, where said running time is
551 * reached and surpassed before we had a chance to split. The file will
552 * still split immediately, but in order to make sure that the split doesn't
553 * happen too late, it is recommended to call this action signal from
554 * something that will prevent further buffers from flowing into
555 * splitmuxsink before the split is completed, such as a pad probe before
561 signals[SIGNAL_SPLIT_AT_RUNNING_TIME] =
562 g_signal_new ("split-at-running-time", G_TYPE_FROM_CLASS (klass),
563 G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION,
564 G_STRUCT_OFFSET (GstSplitMuxSinkClass, split_at_running_time), NULL, NULL,
565 NULL, G_TYPE_NONE, 1, G_TYPE_UINT64);
568 * GstSplitMuxSink::muxer-added:
569 * @splitmux: the #GstSplitMuxSink
570 * @muxer: the newly added muxer element
574 signals[SIGNAL_MUXER_ADDED] =
575 g_signal_new ("muxer-added", G_TYPE_FROM_CLASS (klass),
576 G_SIGNAL_RUN_LAST, 0, NULL, NULL, NULL, G_TYPE_NONE, 1, GST_TYPE_ELEMENT);
579 * GstSplitMuxSink::sink-added:
580 * @splitmux: the #GstSplitMuxSink
581 * @sink: the newly added sink element
585 signals[SIGNAL_SINK_ADDED] =
586 g_signal_new ("sink-added", G_TYPE_FROM_CLASS (klass),
587 G_SIGNAL_RUN_LAST, 0, NULL, NULL, NULL, G_TYPE_NONE, 1, GST_TYPE_ELEMENT);
589 klass->split_now = split_now;
590 klass->split_after = split_after;
591 klass->split_at_running_time = split_at_running_time;
595 gst_splitmux_sink_init (GstSplitMuxSink * splitmux)
597 g_mutex_init (&splitmux->lock);
598 g_mutex_init (&splitmux->state_lock);
599 g_cond_init (&splitmux->input_cond);
600 g_cond_init (&splitmux->output_cond);
601 g_queue_init (&splitmux->out_cmd_q);
603 splitmux->mux_overhead = DEFAULT_MUXER_OVERHEAD;
604 splitmux->threshold_time = DEFAULT_MAX_SIZE_TIME;
605 splitmux->threshold_bytes = DEFAULT_MAX_SIZE_BYTES;
606 splitmux->max_files = DEFAULT_MAX_FILES;
607 splitmux->send_keyframe_requests = DEFAULT_SEND_KEYFRAME_REQUESTS;
608 splitmux->alignment_threshold = DEFAULT_ALIGNMENT_THRESHOLD;
609 splitmux->use_robust_muxing = DEFAULT_USE_ROBUST_MUXING;
610 splitmux->reset_muxer = DEFAULT_RESET_MUXER;
612 splitmux->threshold_timecode_str = NULL;
614 splitmux->async_finalize = DEFAULT_ASYNC_FINALIZE;
615 splitmux->muxer_factory = g_strdup (DEFAULT_MUXER);
616 splitmux->muxer_properties = NULL;
617 splitmux->sink_factory = g_strdup (DEFAULT_SINK);
618 splitmux->sink_properties = NULL;
620 GST_OBJECT_FLAG_SET (splitmux, GST_ELEMENT_FLAG_SINK);
621 splitmux->split_requested = FALSE;
622 splitmux->do_split_next_gop = FALSE;
623 splitmux->times_to_split = gst_queue_array_new_for_struct (8, 8);
624 splitmux->next_fku_time = GST_CLOCK_TIME_NONE;
626 g_queue_init (&splitmux->pending_input_gops);
630 gst_splitmux_reset_elements (GstSplitMuxSink * splitmux)
632 if (splitmux->muxer) {
633 gst_element_set_locked_state (splitmux->muxer, TRUE);
634 gst_element_set_state (splitmux->muxer, GST_STATE_NULL);
635 gst_bin_remove (GST_BIN (splitmux), splitmux->muxer);
637 if (splitmux->active_sink) {
638 gst_element_set_locked_state (splitmux->active_sink, TRUE);
639 gst_element_set_state (splitmux->active_sink, GST_STATE_NULL);
640 gst_bin_remove (GST_BIN (splitmux), splitmux->active_sink);
643 splitmux->sink = splitmux->active_sink = splitmux->muxer = NULL;
647 gst_splitmux_sink_dispose (GObject * object)
649 GstSplitMuxSink *splitmux = GST_SPLITMUX_SINK (object);
651 /* Calling parent dispose invalidates all child pointers */
652 splitmux->sink = splitmux->active_sink = splitmux->muxer = NULL;
654 G_OBJECT_CLASS (parent_class)->dispose (object);
658 gst_splitmux_sink_finalize (GObject * object)
660 GstSplitMuxSink *splitmux = GST_SPLITMUX_SINK (object);
662 g_cond_clear (&splitmux->input_cond);
663 g_cond_clear (&splitmux->output_cond);
664 g_mutex_clear (&splitmux->lock);
665 g_mutex_clear (&splitmux->state_lock);
666 g_queue_foreach (&splitmux->out_cmd_q, (GFunc) out_cmd_buf_free, NULL);
667 g_queue_clear (&splitmux->out_cmd_q);
668 g_queue_foreach (&splitmux->pending_input_gops, (GFunc) input_gop_free, NULL);
669 g_queue_clear (&splitmux->pending_input_gops);
671 g_clear_pointer (&splitmux->fragment_start_tc, gst_video_time_code_free);
673 if (splitmux->muxerpad_map)
674 gst_structure_free (splitmux->muxerpad_map);
676 if (splitmux->provided_sink)
677 gst_object_unref (splitmux->provided_sink);
678 if (splitmux->provided_muxer)
679 gst_object_unref (splitmux->provided_muxer);
681 if (splitmux->muxer_factory)
682 g_free (splitmux->muxer_factory);
683 if (splitmux->muxer_preset)
684 g_free (splitmux->muxer_preset);
685 if (splitmux->muxer_properties)
686 gst_structure_free (splitmux->muxer_properties);
687 if (splitmux->sink_factory)
688 g_free (splitmux->sink_factory);
689 if (splitmux->sink_preset)
690 g_free (splitmux->sink_preset);
691 if (splitmux->sink_properties)
692 gst_structure_free (splitmux->sink_properties);
694 if (splitmux->threshold_timecode_str)
695 g_free (splitmux->threshold_timecode_str);
696 if (splitmux->tc_interval)
697 gst_video_time_code_interval_free (splitmux->tc_interval);
699 if (splitmux->times_to_split)
700 gst_queue_array_free (splitmux->times_to_split);
702 g_free (splitmux->location);
704 /* Make sure to free any un-released contexts. There should not be any,
705 * because the dispose will have freed all request pads though */
706 g_list_foreach (splitmux->contexts, (GFunc) mq_stream_ctx_free, NULL);
707 g_list_free (splitmux->contexts);
709 G_OBJECT_CLASS (parent_class)->finalize (object);
713 * Set any time threshold to the muxer, if it has
714 * reserved-max-duration and reserved-duration-remaining
715 * properties. Called when creating/claiming the muxer
716 * in create_elements() */
718 update_muxer_properties (GstSplitMuxSink * sink)
721 GstClockTime threshold_time;
723 sink->muxer_has_reserved_props = FALSE;
724 if (sink->muxer == NULL)
726 klass = G_OBJECT_GET_CLASS (sink->muxer);
727 if (g_object_class_find_property (klass, "reserved-max-duration") == NULL)
729 if (g_object_class_find_property (klass,
730 "reserved-duration-remaining") == NULL)
732 sink->muxer_has_reserved_props = TRUE;
734 GST_LOG_OBJECT (sink, "Setting muxer reserved time to %" GST_TIME_FORMAT,
735 GST_TIME_ARGS (sink->threshold_time));
736 GST_OBJECT_LOCK (sink);
737 threshold_time = sink->threshold_time;
738 GST_OBJECT_UNLOCK (sink);
740 if (threshold_time > 0) {
741 /* Tell the muxer how much space to reserve */
742 GstClockTime muxer_threshold = threshold_time;
743 g_object_set (sink->muxer, "reserved-max-duration", muxer_threshold, NULL);
748 gst_splitmux_sink_set_property (GObject * object, guint prop_id,
749 const GValue * value, GParamSpec * pspec)
751 GstSplitMuxSink *splitmux = GST_SPLITMUX_SINK (object);
755 GST_OBJECT_LOCK (splitmux);
756 g_free (splitmux->location);
757 splitmux->location = g_value_dup_string (value);
758 GST_OBJECT_UNLOCK (splitmux);
761 case PROP_START_INDEX:
762 GST_OBJECT_LOCK (splitmux);
763 splitmux->start_index = g_value_get_int (value);
764 GST_OBJECT_UNLOCK (splitmux);
766 case PROP_MAX_SIZE_BYTES:
767 GST_OBJECT_LOCK (splitmux);
768 splitmux->threshold_bytes = g_value_get_uint64 (value);
769 GST_OBJECT_UNLOCK (splitmux);
771 case PROP_MAX_SIZE_TIME:
772 GST_OBJECT_LOCK (splitmux);
773 splitmux->threshold_time = g_value_get_uint64 (value);
774 GST_OBJECT_UNLOCK (splitmux);
776 case PROP_MAX_SIZE_TIMECODE:
777 GST_OBJECT_LOCK (splitmux);
778 g_free (splitmux->threshold_timecode_str);
779 /* will be calculated later */
780 g_clear_pointer (&splitmux->tc_interval,
781 gst_video_time_code_interval_free);
783 splitmux->threshold_timecode_str = g_value_dup_string (value);
784 if (splitmux->threshold_timecode_str) {
785 splitmux->tc_interval =
786 gst_video_time_code_interval_new_from_string
787 (splitmux->threshold_timecode_str);
788 if (!splitmux->tc_interval) {
789 g_warning ("Wrong timecode string %s",
790 splitmux->threshold_timecode_str);
791 g_free (splitmux->threshold_timecode_str);
792 splitmux->threshold_timecode_str = NULL;
795 splitmux->next_fragment_start_tc_time = GST_CLOCK_TIME_NONE;
796 if (splitmux->tc_interval && splitmux->fragment_start_tc) {
797 splitmux->next_fragment_start_tc_time =
798 calculate_next_max_timecode (splitmux, splitmux->fragment_start_tc,
799 splitmux->fragment_start_time, NULL);
801 GST_OBJECT_UNLOCK (splitmux);
803 case PROP_SEND_KEYFRAME_REQUESTS:
804 GST_OBJECT_LOCK (splitmux);
805 splitmux->send_keyframe_requests = g_value_get_boolean (value);
806 GST_OBJECT_UNLOCK (splitmux);
809 GST_OBJECT_LOCK (splitmux);
810 splitmux->max_files = g_value_get_uint (value);
811 GST_OBJECT_UNLOCK (splitmux);
813 case PROP_MUXER_OVERHEAD:
814 GST_OBJECT_LOCK (splitmux);
815 splitmux->mux_overhead = g_value_get_double (value);
816 GST_OBJECT_UNLOCK (splitmux);
818 case PROP_USE_ROBUST_MUXING:
819 GST_OBJECT_LOCK (splitmux);
820 splitmux->use_robust_muxing = g_value_get_boolean (value);
821 GST_OBJECT_UNLOCK (splitmux);
822 if (splitmux->use_robust_muxing)
823 update_muxer_properties (splitmux);
825 case PROP_ALIGNMENT_THRESHOLD:
826 GST_OBJECT_LOCK (splitmux);
827 splitmux->alignment_threshold = g_value_get_uint64 (value);
828 GST_OBJECT_UNLOCK (splitmux);
831 GST_OBJECT_LOCK (splitmux);
832 gst_clear_object (&splitmux->provided_sink);
833 splitmux->provided_sink = g_value_get_object (value);
834 if (splitmux->provided_sink)
835 gst_object_ref_sink (splitmux->provided_sink);
836 GST_OBJECT_UNLOCK (splitmux);
839 GST_OBJECT_LOCK (splitmux);
840 gst_clear_object (&splitmux->provided_muxer);
841 splitmux->provided_muxer = g_value_get_object (value);
842 if (splitmux->provided_muxer)
843 gst_object_ref_sink (splitmux->provided_muxer);
844 GST_OBJECT_UNLOCK (splitmux);
846 case PROP_RESET_MUXER:
847 GST_OBJECT_LOCK (splitmux);
848 splitmux->reset_muxer = g_value_get_boolean (value);
849 GST_OBJECT_UNLOCK (splitmux);
851 case PROP_ASYNC_FINALIZE:
852 GST_OBJECT_LOCK (splitmux);
853 splitmux->async_finalize = g_value_get_boolean (value);
854 GST_OBJECT_UNLOCK (splitmux);
856 case PROP_MUXER_FACTORY:
857 GST_OBJECT_LOCK (splitmux);
858 if (splitmux->muxer_factory)
859 g_free (splitmux->muxer_factory);
860 splitmux->muxer_factory = g_value_dup_string (value);
861 GST_OBJECT_UNLOCK (splitmux);
863 case PROP_MUXER_PRESET:
864 GST_OBJECT_LOCK (splitmux);
865 if (splitmux->muxer_preset)
866 g_free (splitmux->muxer_preset);
867 splitmux->muxer_preset = g_value_dup_string (value);
868 GST_OBJECT_UNLOCK (splitmux);
870 case PROP_MUXER_PROPERTIES:
871 GST_OBJECT_LOCK (splitmux);
872 if (splitmux->muxer_properties)
873 gst_structure_free (splitmux->muxer_properties);
874 if (gst_value_get_structure (value))
875 splitmux->muxer_properties =
876 gst_structure_copy (gst_value_get_structure (value));
878 splitmux->muxer_properties = NULL;
879 GST_OBJECT_UNLOCK (splitmux);
881 case PROP_SINK_FACTORY:
882 GST_OBJECT_LOCK (splitmux);
883 if (splitmux->sink_factory)
884 g_free (splitmux->sink_factory);
885 splitmux->sink_factory = g_value_dup_string (value);
886 GST_OBJECT_UNLOCK (splitmux);
888 case PROP_SINK_PRESET:
889 GST_OBJECT_LOCK (splitmux);
890 if (splitmux->sink_preset)
891 g_free (splitmux->sink_preset);
892 splitmux->sink_preset = g_value_dup_string (value);
893 GST_OBJECT_UNLOCK (splitmux);
895 case PROP_SINK_PROPERTIES:
896 GST_OBJECT_LOCK (splitmux);
897 if (splitmux->sink_properties)
898 gst_structure_free (splitmux->sink_properties);
899 if (gst_value_get_structure (value))
900 splitmux->sink_properties =
901 gst_structure_copy (gst_value_get_structure (value));
903 splitmux->sink_properties = NULL;
904 GST_OBJECT_UNLOCK (splitmux);
906 case PROP_MUXERPAD_MAP:
908 const GstStructure *s = gst_value_get_structure (value);
909 GST_SPLITMUX_LOCK (splitmux);
910 if (splitmux->muxerpad_map) {
911 gst_structure_free (splitmux->muxerpad_map);
914 splitmux->muxerpad_map = gst_structure_copy (s);
916 splitmux->muxerpad_map = NULL;
917 GST_SPLITMUX_UNLOCK (splitmux);
921 G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
927 gst_splitmux_sink_get_property (GObject * object, guint prop_id,
928 GValue * value, GParamSpec * pspec)
930 GstSplitMuxSink *splitmux = GST_SPLITMUX_SINK (object);
934 GST_OBJECT_LOCK (splitmux);
935 g_value_set_string (value, splitmux->location);
936 GST_OBJECT_UNLOCK (splitmux);
938 case PROP_START_INDEX:
939 GST_OBJECT_LOCK (splitmux);
940 g_value_set_int (value, splitmux->start_index);
941 GST_OBJECT_UNLOCK (splitmux);
943 case PROP_MAX_SIZE_BYTES:
944 GST_OBJECT_LOCK (splitmux);
945 g_value_set_uint64 (value, splitmux->threshold_bytes);
946 GST_OBJECT_UNLOCK (splitmux);
948 case PROP_MAX_SIZE_TIME:
949 GST_OBJECT_LOCK (splitmux);
950 g_value_set_uint64 (value, splitmux->threshold_time);
951 GST_OBJECT_UNLOCK (splitmux);
953 case PROP_MAX_SIZE_TIMECODE:
954 GST_OBJECT_LOCK (splitmux);
955 g_value_set_string (value, splitmux->threshold_timecode_str);
956 GST_OBJECT_UNLOCK (splitmux);
958 case PROP_SEND_KEYFRAME_REQUESTS:
959 GST_OBJECT_LOCK (splitmux);
960 g_value_set_boolean (value, splitmux->send_keyframe_requests);
961 GST_OBJECT_UNLOCK (splitmux);
964 GST_OBJECT_LOCK (splitmux);
965 g_value_set_uint (value, splitmux->max_files);
966 GST_OBJECT_UNLOCK (splitmux);
968 case PROP_MUXER_OVERHEAD:
969 GST_OBJECT_LOCK (splitmux);
970 g_value_set_double (value, splitmux->mux_overhead);
971 GST_OBJECT_UNLOCK (splitmux);
973 case PROP_USE_ROBUST_MUXING:
974 GST_OBJECT_LOCK (splitmux);
975 g_value_set_boolean (value, splitmux->use_robust_muxing);
976 GST_OBJECT_UNLOCK (splitmux);
978 case PROP_ALIGNMENT_THRESHOLD:
979 GST_OBJECT_LOCK (splitmux);
980 g_value_set_uint64 (value, splitmux->alignment_threshold);
981 GST_OBJECT_UNLOCK (splitmux);
984 GST_OBJECT_LOCK (splitmux);
985 g_value_set_object (value, splitmux->provided_sink);
986 GST_OBJECT_UNLOCK (splitmux);
989 GST_OBJECT_LOCK (splitmux);
990 g_value_set_object (value, splitmux->provided_muxer);
991 GST_OBJECT_UNLOCK (splitmux);
993 case PROP_RESET_MUXER:
994 GST_OBJECT_LOCK (splitmux);
995 g_value_set_boolean (value, splitmux->reset_muxer);
996 GST_OBJECT_UNLOCK (splitmux);
998 case PROP_ASYNC_FINALIZE:
999 GST_OBJECT_LOCK (splitmux);
1000 g_value_set_boolean (value, splitmux->async_finalize);
1001 GST_OBJECT_UNLOCK (splitmux);
1003 case PROP_MUXER_FACTORY:
1004 GST_OBJECT_LOCK (splitmux);
1005 g_value_set_string (value, splitmux->muxer_factory);
1006 GST_OBJECT_UNLOCK (splitmux);
1008 case PROP_MUXER_PRESET:
1009 GST_OBJECT_LOCK (splitmux);
1010 g_value_set_string (value, splitmux->muxer_preset);
1011 GST_OBJECT_UNLOCK (splitmux);
1013 case PROP_MUXER_PROPERTIES:
1014 GST_OBJECT_LOCK (splitmux);
1015 gst_value_set_structure (value, splitmux->muxer_properties);
1016 GST_OBJECT_UNLOCK (splitmux);
1018 case PROP_SINK_FACTORY:
1019 GST_OBJECT_LOCK (splitmux);
1020 g_value_set_string (value, splitmux->sink_factory);
1021 GST_OBJECT_UNLOCK (splitmux);
1023 case PROP_SINK_PRESET:
1024 GST_OBJECT_LOCK (splitmux);
1025 g_value_set_string (value, splitmux->sink_preset);
1026 GST_OBJECT_UNLOCK (splitmux);
1028 case PROP_SINK_PROPERTIES:
1029 GST_OBJECT_LOCK (splitmux);
1030 gst_value_set_structure (value, splitmux->sink_properties);
1031 GST_OBJECT_UNLOCK (splitmux);
1033 case PROP_MUXERPAD_MAP:
1034 GST_SPLITMUX_LOCK (splitmux);
1035 gst_value_set_structure (value, splitmux->muxerpad_map);
1036 GST_SPLITMUX_UNLOCK (splitmux);
1039 G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
1044 /* Convenience function */
1045 static inline GstClockTimeDiff
1046 my_segment_to_running_time (GstSegment * segment, GstClockTime val)
1048 GstClockTimeDiff res = GST_CLOCK_STIME_NONE;
1050 if (GST_CLOCK_TIME_IS_VALID (val)) {
1052 gst_segment_to_running_time_full (segment, GST_FORMAT_TIME, val, &val);
1062 mq_stream_ctx_reset (MqStreamCtx * ctx)
1064 gst_segment_init (&ctx->in_segment, GST_FORMAT_UNDEFINED);
1065 gst_segment_init (&ctx->out_segment, GST_FORMAT_UNDEFINED);
1066 ctx->in_running_time = ctx->out_running_time = GST_CLOCK_STIME_NONE;
1067 g_queue_foreach (&ctx->queued_bufs, (GFunc) mq_stream_buf_free, NULL);
1068 g_queue_clear (&ctx->queued_bufs);
1071 static MqStreamCtx *
1072 mq_stream_ctx_new (GstSplitMuxSink * splitmux)
1076 ctx = g_new0 (MqStreamCtx, 1);
1077 ctx->splitmux = splitmux;
1078 g_queue_init (&ctx->queued_bufs);
1079 mq_stream_ctx_reset (ctx);
1085 mq_stream_ctx_free (MqStreamCtx * ctx)
1088 GstObject *parent = gst_object_get_parent (GST_OBJECT (ctx->q));
1090 g_signal_handler_disconnect (ctx->q, ctx->q_overrun_id);
1092 if (parent == GST_OBJECT_CAST (ctx->splitmux)) {
1093 gst_element_set_locked_state (ctx->q, TRUE);
1094 gst_element_set_state (ctx->q, GST_STATE_NULL);
1095 gst_bin_remove (GST_BIN (ctx->splitmux), ctx->q);
1096 gst_object_unref (parent);
1098 gst_object_unref (ctx->q);
1100 gst_object_unref (ctx->sinkpad);
1101 gst_object_unref (ctx->srcpad);
1102 g_queue_foreach (&ctx->queued_bufs, (GFunc) mq_stream_buf_free, NULL);
1103 g_queue_clear (&ctx->queued_bufs);
1108 send_fragment_opened_closed_msg (GstSplitMuxSink * splitmux, gboolean opened,
1111 gchar *location = NULL;
1113 const gchar *msg_name = opened ?
1114 "splitmuxsink-fragment-opened" : "splitmuxsink-fragment-closed";
1115 GstClockTime running_time = splitmux->reference_ctx->out_running_time;
1118 GstClockTime *rtime = g_object_get_qdata (G_OBJECT (sink), RUNNING_TIME);
1120 running_time = *rtime;
1123 if (g_object_class_find_property (G_OBJECT_GET_CLASS (sink),
1124 "location") != NULL)
1125 g_object_get (sink, "location", &location, NULL);
1127 GST_DEBUG_OBJECT (splitmux,
1128 "Sending %s message. Running time %" GST_TIME_FORMAT " location %s",
1129 msg_name, GST_TIME_ARGS (running_time), GST_STR_NULL (location));
1131 /* If it's in the middle of a teardown, the reference_ctc might have become
1133 if (splitmux->reference_ctx) {
1134 msg = gst_message_new_element (GST_OBJECT (splitmux),
1135 gst_structure_new (msg_name,
1136 "location", G_TYPE_STRING, location,
1137 "running-time", GST_TYPE_CLOCK_TIME, running_time,
1138 "sink", GST_TYPE_ELEMENT, sink, NULL));
1139 gst_element_post_message (GST_ELEMENT_CAST (splitmux), msg);
1146 send_eos_async (GstSplitMuxSink * splitmux, AsyncEosHelper * helper)
1152 eos = gst_event_new_eos ();
1156 GST_SPLITMUX_LOCK (splitmux);
1158 pad = gst_pad_get_peer (ctx->srcpad);
1159 GST_SPLITMUX_UNLOCK (splitmux);
1161 gst_pad_send_event (pad, eos);
1162 GST_INFO_OBJECT (splitmux, "Sent async EOS on %" GST_PTR_FORMAT, pad);
1164 gst_object_unref (pad);
1168 /* Called with lock held, drops the lock to send EOS to the
1172 send_eos (GstSplitMuxSink * splitmux, MqStreamCtx * ctx)
1177 eos = gst_event_new_eos ();
1178 pad = gst_pad_get_peer (ctx->srcpad);
1180 ctx->out_eos = TRUE;
1182 GST_INFO_OBJECT (splitmux, "Sending EOS on %" GST_PTR_FORMAT, pad);
1183 GST_SPLITMUX_UNLOCK (splitmux);
1184 gst_pad_send_event (pad, eos);
1185 GST_SPLITMUX_LOCK (splitmux);
1187 gst_object_unref (pad);
1190 /* Called with lock held. Schedules an EOS event to the ctx pad
1191 * to happen in another thread */
1193 eos_context_async (MqStreamCtx * ctx, GstSplitMuxSink * splitmux)
1195 AsyncEosHelper *helper = g_new0 (AsyncEosHelper, 1);
1196 GstPad *srcpad, *sinkpad;
1198 srcpad = ctx->srcpad;
1199 sinkpad = gst_pad_get_peer (srcpad);
1202 helper->pad = sinkpad; /* Takes the reference */
1204 ctx->out_eos_async_done = TRUE;
1206 /* There used to be a bug here, where we had to explicitly remove
1207 * the SINK flag so that GstBin would ignore it for EOS purposes.
1208 * That fixed a race where if splitmuxsink really reaches EOS
1209 * before an asynchronous background element has finished, then
1210 * the bin wouldn't actually send EOS to the pipeline. Even after
1211 * finishing and removing the old element, the bin didn't re-check
1212 * EOS status on removing a SINK element. That bug was fixed
1214 GST_DEBUG_OBJECT (splitmux, "scheduled EOS to pad %" GST_PTR_FORMAT " ctx %p",
1217 g_assert_nonnull (helper->pad);
1218 gst_element_call_async (GST_ELEMENT (splitmux),
1219 (GstElementCallAsyncFunc) send_eos_async, helper, NULL);
1222 /* Called with lock held. TRUE iff all contexts have a
1223 * pending (or delivered) async eos event */
1225 all_contexts_are_async_eos (GstSplitMuxSink * splitmux)
1227 gboolean ret = TRUE;
1230 for (item = splitmux->contexts; item; item = item->next) {
1231 MqStreamCtx *ctx = item->data;
1232 ret &= ctx->out_eos_async_done;
1237 /* Called with splitmux lock held to check if this output
1238 * context needs to sleep to wait for the release of the
1239 * next GOP, or to send EOS to close out the current file
1241 static GstFlowReturn
1242 complete_or_wait_on_out (GstSplitMuxSink * splitmux, MqStreamCtx * ctx)
1244 if (ctx->caps_change)
1248 /* When first starting up, the reference stream has to output
1249 * the first buffer to prepare the muxer and sink */
1250 gboolean can_output = (ctx->is_reference || splitmux->ready_for_output);
1251 GstClockTimeDiff my_max_out_running_time = splitmux->max_out_running_time;
1253 if (my_max_out_running_time != GST_CLOCK_STIME_NONE
1254 && my_max_out_running_time != G_MAXINT64) {
1255 my_max_out_running_time -= splitmux->alignment_threshold;
1256 GST_LOG_OBJECT (ctx->srcpad,
1257 "Max out running time currently %" GST_STIME_FORMAT
1258 ", with threshold applied it is %" GST_STIME_FORMAT,
1259 GST_STIME_ARGS (splitmux->max_out_running_time),
1260 GST_STIME_ARGS (my_max_out_running_time));
1264 || splitmux->output_state == SPLITMUX_OUTPUT_STATE_STOPPED)
1265 return GST_FLOW_FLUSHING;
1267 GST_LOG_OBJECT (ctx->srcpad,
1268 "Checking running time %" GST_STIME_FORMAT " against max %"
1269 GST_STIME_FORMAT, GST_STIME_ARGS (ctx->out_running_time),
1270 GST_STIME_ARGS (my_max_out_running_time));
1273 if (splitmux->max_out_running_time != GST_CLOCK_STIME_NONE &&
1274 ctx->out_running_time < my_max_out_running_time) {
1278 switch (splitmux->output_state) {
1279 case SPLITMUX_OUTPUT_STATE_OUTPUT_GOP:
1280 /* We only get here if we've finished outputting a GOP and need to know
1281 * what to do next */
1282 splitmux->output_state = SPLITMUX_OUTPUT_STATE_AWAITING_COMMAND;
1283 GST_SPLITMUX_BROADCAST_OUTPUT (splitmux);
1286 case SPLITMUX_OUTPUT_STATE_ENDING_FILE:
1287 case SPLITMUX_OUTPUT_STATE_ENDING_STREAM:
1288 /* We've reached the max out running_time to get here, so end this file now */
1289 if (ctx->out_eos == FALSE) {
1290 if (splitmux->async_finalize) {
1291 /* We must set EOS asynchronously at this point. We cannot defer
1292 * it, because we need all contexts to wake up, for the
1293 * reference context to eventually give us something at
1294 * START_NEXT_FILE. Otherwise, collectpads might choose another
1295 * context to give us the first buffer, and format-location-full
1296 * will not contain a valid sample. */
1297 g_object_set_qdata ((GObject *) splitmux->sink, EOS_FROM_US,
1298 GINT_TO_POINTER (1));
1299 eos_context_async (ctx, splitmux);
1300 if (all_contexts_are_async_eos (splitmux)) {
1301 GST_INFO_OBJECT (splitmux,
1302 "All contexts are async_eos. Moving to the next file.");
1303 /* We can start the next file once we've asked each pad to go EOS */
1304 splitmux->output_state = SPLITMUX_OUTPUT_STATE_START_NEXT_FILE;
1305 GST_SPLITMUX_BROADCAST_OUTPUT (splitmux);
1309 send_eos (splitmux, ctx);
1313 GST_INFO_OBJECT (splitmux,
1314 "At end-of-file state, but context %p is already EOS", ctx);
1317 case SPLITMUX_OUTPUT_STATE_START_NEXT_FILE:
1318 if (ctx->is_reference) {
1319 GstFlowReturn ret = GST_FLOW_OK;
1321 /* Special handling on the reference ctx to start new fragments
1322 * and collect commands from the command queue */
1323 /* drops the splitmux lock briefly: */
1324 /* We must have reference ctx in order for format-location-full to
1326 ret = start_next_fragment (splitmux, ctx);
1327 if (ret != GST_FLOW_OK)
1333 case SPLITMUX_OUTPUT_STATE_AWAITING_COMMAND:{
1335 SplitMuxOutputCommand *cmd =
1336 g_queue_pop_tail (&splitmux->out_cmd_q);
1338 /* If we pop the last command, we need to make our queues bigger */
1339 if (g_queue_get_length (&splitmux->out_cmd_q) == 0)
1340 grow_blocked_queues (splitmux);
1342 if (cmd->start_new_fragment) {
1343 if (splitmux->muxed_out_bytes > 0) {
1344 GST_DEBUG_OBJECT (splitmux, "Got cmd to start new fragment");
1345 splitmux->output_state = SPLITMUX_OUTPUT_STATE_ENDING_FILE;
1347 GST_DEBUG_OBJECT (splitmux,
1348 "Got cmd to start new fragment, but fragment is empty - ignoring.");
1351 GST_DEBUG_OBJECT (splitmux,
1352 "Got new output cmd for time %" GST_STIME_FORMAT,
1353 GST_STIME_ARGS (cmd->max_output_ts));
1355 /* Extend the output range immediately */
1356 if (splitmux->max_out_running_time == GST_CLOCK_STIME_NONE
1357 || cmd->max_output_ts > splitmux->max_out_running_time)
1358 splitmux->max_out_running_time = cmd->max_output_ts;
1359 GST_DEBUG_OBJECT (splitmux,
1360 "Max out running time now %" GST_STIME_FORMAT,
1361 GST_STIME_ARGS (splitmux->max_out_running_time));
1362 splitmux->output_state = SPLITMUX_OUTPUT_STATE_OUTPUT_GOP;
1364 GST_SPLITMUX_BROADCAST_OUTPUT (splitmux);
1366 out_cmd_buf_free (cmd);
1369 GST_SPLITMUX_WAIT_OUTPUT (splitmux);
1371 } while (!ctx->flushing && splitmux->output_state ==
1372 SPLITMUX_OUTPUT_STATE_AWAITING_COMMAND);
1373 /* loop and re-check the state */
1376 case SPLITMUX_OUTPUT_STATE_STOPPED:
1377 return GST_FLOW_FLUSHING;
1380 GST_LOG_OBJECT (ctx->srcpad, "Not yet ready for output");
1383 GST_INFO_OBJECT (ctx->srcpad,
1384 "Sleeping for running time %"
1385 GST_STIME_FORMAT " (max %" GST_STIME_FORMAT ") or state change.",
1386 GST_STIME_ARGS (ctx->out_running_time),
1387 GST_STIME_ARGS (splitmux->max_out_running_time));
1388 GST_SPLITMUX_WAIT_OUTPUT (splitmux);
1389 GST_INFO_OBJECT (ctx->srcpad,
1390 "Woken for new max running time %" GST_STIME_FORMAT,
1391 GST_STIME_ARGS (splitmux->max_out_running_time));
1399 calculate_next_max_timecode (GstSplitMuxSink * splitmux,
1400 const GstVideoTimeCode * cur_tc, GstClockTime running_time,
1401 GstVideoTimeCode ** next_tc)
1403 GstVideoTimeCode *target_tc;
1404 GstClockTime cur_tc_time, target_tc_time, next_max_tc_time;
1406 if (cur_tc == NULL || splitmux->tc_interval == NULL)
1407 return GST_CLOCK_TIME_NONE;
1409 target_tc = gst_video_time_code_add_interval (cur_tc, splitmux->tc_interval);
1411 GST_ELEMENT_ERROR (splitmux,
1412 STREAM, FAILED, (NULL), ("Couldn't calculate target timecode"));
1413 return GST_CLOCK_TIME_NONE;
1417 target_tc_time = gst_video_time_code_nsec_since_daily_jam (target_tc);
1418 cur_tc_time = gst_video_time_code_nsec_since_daily_jam (cur_tc);
1420 /* Add running_time, accounting for wraparound. */
1421 if (target_tc_time >= cur_tc_time) {
1422 next_max_tc_time = target_tc_time - cur_tc_time + running_time;
1424 GstClockTime day_in_ns = 24 * 60 * 60 * GST_SECOND;
1426 if ((cur_tc->config.flags & GST_VIDEO_TIME_CODE_FLAGS_DROP_FRAME) &&
1427 (cur_tc->config.fps_d == 1001)) {
1428 /* Checking fps_d is probably unneeded, but better safe than sorry
1429 * (e.g. someone accidentally set a flag) */
1430 GstVideoTimeCode *tc_for_offset;
1432 /* Here, the duration of the 24:00:00;00 timecode isn't exactly one day,
1433 * but slightly less. Calculate that duration from a fake timecode. The
1434 * problem is that 24:00:00;00 isn't a valid timecode, so the workaround
1435 * is to add one frame to 23:59:59;29 */
1437 gst_video_time_code_new (cur_tc->config.fps_n, cur_tc->config.fps_d,
1438 NULL, cur_tc->config.flags, 23, 59, 59,
1439 cur_tc->config.fps_n / cur_tc->config.fps_d, 0);
1441 gst_video_time_code_nsec_since_daily_jam (tc_for_offset) +
1442 gst_util_uint64_scale (GST_SECOND, cur_tc->config.fps_d,
1443 cur_tc->config.fps_n);
1444 gst_video_time_code_free (tc_for_offset);
1446 next_max_tc_time = day_in_ns - cur_tc_time + target_tc_time + running_time;
1449 #ifndef GST_DISABLE_GST_DEBUG
1451 gchar *next_max_tc_str, *cur_tc_str;
1453 cur_tc_str = gst_video_time_code_to_string (cur_tc);
1454 next_max_tc_str = gst_video_time_code_to_string (target_tc);
1456 GST_INFO_OBJECT (splitmux, "Next max timecode %s time: %" GST_TIME_FORMAT
1457 " from ref timecode %s time: %" GST_TIME_FORMAT,
1459 GST_TIME_ARGS (next_max_tc_time),
1460 cur_tc_str, GST_TIME_ARGS (cur_tc_time));
1462 g_free (next_max_tc_str);
1463 g_free (cur_tc_str);
1468 *next_tc = target_tc;
1470 gst_video_time_code_free (target_tc);
1472 return next_max_tc_time;
1476 request_next_keyframe (GstSplitMuxSink * splitmux, GstBuffer * buffer,
1477 GstClockTimeDiff running_time_dts)
1480 GstClockTime target_time;
1481 gboolean timecode_based = FALSE;
1482 GstClockTime max_tc_time = GST_CLOCK_TIME_NONE;
1483 GstClockTime next_max_tc_time = GST_CLOCK_TIME_NONE;
1484 GstClockTime next_fku_time = GST_CLOCK_TIME_NONE;
1485 GstClockTime tc_rounding_error = 5 * GST_USECOND;
1486 InputGop *newest_gop = NULL;
1489 if (!splitmux->send_keyframe_requests)
1492 /* Find the newest GOP where we passed in DTS the start PTS */
1493 for (l = splitmux->pending_input_gops.tail; l; l = l->prev) {
1494 InputGop *tmp = l->data;
1496 GST_TRACE_OBJECT (splitmux,
1497 "Having pending input GOP with start PTS %" GST_STIME_FORMAT
1498 " and start time %" GST_STIME_FORMAT,
1499 GST_STIME_ARGS (tmp->start_time_pts), GST_STIME_ARGS (tmp->start_time));
1501 if (tmp->sent_fku) {
1502 GST_DEBUG_OBJECT (splitmux,
1503 "Already checked for a keyframe request for this GOP");
1507 if (running_time_dts == GST_CLOCK_STIME_NONE ||
1508 tmp->start_time_pts == GST_CLOCK_STIME_NONE ||
1509 running_time_dts >= tmp->start_time_pts) {
1510 GST_DEBUG_OBJECT (splitmux,
1511 "Using GOP with start PTS %" GST_STIME_FORMAT " and start time %"
1512 GST_STIME_FORMAT, GST_STIME_ARGS (tmp->start_time_pts),
1513 GST_STIME_ARGS (tmp->start_time));
1520 GST_DEBUG_OBJECT (splitmux, "Have no complete enough pending input GOP");
1524 if (splitmux->tc_interval) {
1525 if (newest_gop->start_tc
1526 && gst_video_time_code_is_valid (newest_gop->start_tc)) {
1527 GstVideoTimeCode *next_tc = NULL;
1529 calculate_next_max_timecode (splitmux, newest_gop->start_tc,
1530 newest_gop->start_time, &next_tc);
1532 /* calculate the next expected keyframe time to prevent too early fku
1534 if (GST_CLOCK_TIME_IS_VALID (max_tc_time) && next_tc) {
1536 calculate_next_max_timecode (splitmux, next_tc, max_tc_time, NULL);
1539 gst_video_time_code_free (next_tc);
1541 timecode_based = GST_CLOCK_TIME_IS_VALID (max_tc_time) &&
1542 GST_CLOCK_TIME_IS_VALID (next_max_tc_time);
1544 /* This can happen in the presence of GAP events that trigger
1545 * a new fragment start */
1546 GST_WARNING_OBJECT (splitmux,
1547 "No buffer available to calculate next timecode");
1551 if ((splitmux->threshold_time == 0 && !timecode_based)
1552 || splitmux->threshold_bytes != 0)
1555 if (timecode_based) {
1556 /* We might have rounding errors: aim slightly earlier */
1557 if (max_tc_time >= tc_rounding_error) {
1558 target_time = max_tc_time - tc_rounding_error;
1560 /* unreliable target time */
1561 GST_DEBUG_OBJECT (splitmux, "tc time %" GST_TIME_FORMAT
1562 " is smaller than allowed rounding error, set it to zero",
1563 GST_TIME_ARGS (max_tc_time));
1567 if (next_max_tc_time >= tc_rounding_error) {
1568 next_fku_time = next_max_tc_time - tc_rounding_error;
1570 /* unreliable target time */
1571 GST_DEBUG_OBJECT (splitmux, "next tc time %" GST_TIME_FORMAT
1572 " is smaller than allowed rounding error, set it to zero",
1573 GST_TIME_ARGS (next_max_tc_time));
1577 target_time = newest_gop->start_time + splitmux->threshold_time;
1580 if (GST_CLOCK_TIME_IS_VALID (splitmux->next_fku_time)) {
1581 GstClockTime allowed_time = splitmux->next_fku_time;
1583 if (timecode_based) {
1584 if (allowed_time >= tc_rounding_error) {
1585 allowed_time -= tc_rounding_error;
1587 /* unreliable next force key unit time */
1588 GST_DEBUG_OBJECT (splitmux, "expected next force key unit time %"
1590 " is smaller than allowed rounding error, set it to zero",
1591 GST_TIME_ARGS (splitmux->next_fku_time));
1596 if (target_time < allowed_time) {
1597 GST_LOG_OBJECT (splitmux, "Target time %" GST_TIME_FORMAT
1598 " is smaller than expected next keyframe time %" GST_TIME_FORMAT
1599 ", rounding error compensated next keyframe time %" GST_TIME_FORMAT,
1600 GST_TIME_ARGS (target_time),
1601 GST_TIME_ARGS (splitmux->next_fku_time),
1602 GST_TIME_ARGS (allowed_time));
1605 } else if (allowed_time != splitmux->next_fku_time &&
1606 target_time < splitmux->next_fku_time) {
1607 GST_DEBUG_OBJECT (splitmux, "Target time %" GST_TIME_FORMAT
1608 " is smaller than expected next keyframe time %" GST_TIME_FORMAT
1609 ", but the difference is smaller than allowed rounding error",
1610 GST_TIME_ARGS (target_time), GST_TIME_ARGS (splitmux->next_fku_time));
1614 if (!timecode_based) {
1615 next_fku_time = target_time + splitmux->threshold_time;
1618 GST_INFO_OBJECT (splitmux, "Requesting keyframe at %" GST_TIME_FORMAT
1619 ", the next expected keyframe request time is %" GST_TIME_FORMAT,
1620 GST_TIME_ARGS (target_time), GST_TIME_ARGS (next_fku_time));
1622 newest_gop->sent_fku = TRUE;
1624 splitmux->next_fku_time = next_fku_time;
1625 ev = gst_video_event_new_upstream_force_key_unit (target_time, TRUE, 0);
1627 return gst_pad_push_event (splitmux->reference_ctx->sinkpad, ev);
1630 static GstPadProbeReturn
1631 handle_mq_output (GstPad * pad, GstPadProbeInfo * info, MqStreamCtx * ctx)
1633 GstSplitMuxSink *splitmux = ctx->splitmux;
1634 MqStreamBuf *buf_info = NULL;
1635 GstFlowReturn ret = GST_FLOW_OK;
1637 GST_LOG_OBJECT (pad, "Fired probe type 0x%x", info->type);
1639 /* FIXME: Handle buffer lists, until then make it clear they won't work */
1640 if (info->type & GST_PAD_PROBE_TYPE_BUFFER_LIST) {
1641 g_warning ("Buffer list handling not implemented");
1642 return GST_PAD_PROBE_DROP;
1644 if (info->type & GST_PAD_PROBE_TYPE_EVENT_DOWNSTREAM ||
1645 info->type & GST_PAD_PROBE_TYPE_EVENT_FLUSH) {
1646 GstEvent *event = gst_pad_probe_info_get_event (info);
1647 gboolean locked = FALSE, wait = !ctx->is_reference;
1649 GST_LOG_OBJECT (pad, "Event %" GST_PTR_FORMAT, event);
1651 switch (GST_EVENT_TYPE (event)) {
1652 case GST_EVENT_SEGMENT:
1653 gst_event_copy_segment (event, &ctx->out_segment);
1655 case GST_EVENT_FLUSH_STOP:
1656 GST_SPLITMUX_LOCK (splitmux);
1658 gst_segment_init (&ctx->out_segment, GST_FORMAT_UNDEFINED);
1659 g_queue_foreach (&ctx->queued_bufs, (GFunc) mq_stream_buf_free, NULL);
1660 g_queue_clear (&ctx->queued_bufs);
1661 g_queue_clear (&ctx->queued_bufs);
1662 /* If this is the reference context, we just threw away any queued keyframes */
1663 if (ctx->is_reference)
1664 splitmux->queued_keyframes = 0;
1665 ctx->flushing = FALSE;
1668 case GST_EVENT_FLUSH_START:
1669 GST_SPLITMUX_LOCK (splitmux);
1671 GST_LOG_OBJECT (pad, "Flush start");
1672 ctx->flushing = TRUE;
1673 GST_SPLITMUX_BROADCAST_INPUT (splitmux);
1674 GST_SPLITMUX_BROADCAST_OUTPUT (splitmux);
1677 GST_SPLITMUX_LOCK (splitmux);
1679 if (splitmux->output_state == SPLITMUX_OUTPUT_STATE_STOPPED)
1681 ctx->out_eos = TRUE;
1683 if (ctx == splitmux->reference_ctx) {
1684 splitmux->output_state = SPLITMUX_OUTPUT_STATE_ENDING_STREAM;
1685 GST_SPLITMUX_BROADCAST_OUTPUT (splitmux);
1688 GST_INFO_OBJECT (splitmux,
1689 "Have EOS event at pad %" GST_PTR_FORMAT " ctx %p", pad, ctx);
1691 case GST_EVENT_GAP:{
1692 GstClockTime gap_ts;
1693 GstClockTimeDiff rtime;
1695 gst_event_parse_gap (event, &gap_ts, NULL);
1696 if (gap_ts == GST_CLOCK_TIME_NONE)
1699 GST_SPLITMUX_LOCK (splitmux);
1702 if (splitmux->output_state == SPLITMUX_OUTPUT_STATE_STOPPED)
1705 /* When we get a gap event on the
1706 * reference stream and we're trying to open a
1707 * new file, we need to store it until we get
1708 * the buffer afterwards
1710 if (ctx->is_reference &&
1711 (splitmux->output_state != SPLITMUX_OUTPUT_STATE_OUTPUT_GOP)) {
1712 GST_DEBUG_OBJECT (pad, "Storing GAP event until buffer arrives");
1713 gst_event_replace (&ctx->pending_gap, event);
1714 GST_SPLITMUX_UNLOCK (splitmux);
1715 return GST_PAD_PROBE_HANDLED;
1718 rtime = my_segment_to_running_time (&ctx->out_segment, gap_ts);
1720 GST_LOG_OBJECT (pad, "Have GAP w/ ts %" GST_STIME_FORMAT,
1721 GST_STIME_ARGS (rtime));
1723 if (rtime != GST_CLOCK_STIME_NONE) {
1724 ctx->out_running_time = rtime;
1725 complete_or_wait_on_out (splitmux, ctx);
1729 case GST_EVENT_CUSTOM_DOWNSTREAM:{
1730 const GstStructure *s;
1731 GstClockTimeDiff ts = 0;
1733 s = gst_event_get_structure (event);
1734 if (!gst_structure_has_name (s, "splitmuxsink-unblock"))
1737 gst_structure_get_int64 (s, "timestamp", &ts);
1739 GST_SPLITMUX_LOCK (splitmux);
1742 if (splitmux->output_state == SPLITMUX_OUTPUT_STATE_STOPPED)
1744 ctx->out_running_time = ts;
1745 if (!ctx->is_reference)
1746 ret = complete_or_wait_on_out (splitmux, ctx);
1747 GST_SPLITMUX_UNLOCK (splitmux);
1748 GST_PAD_PROBE_INFO_FLOW_RETURN (info) = ret;
1749 return GST_PAD_PROBE_DROP;
1751 case GST_EVENT_CAPS:{
1754 if (!ctx->is_reference)
1757 peer = gst_pad_get_peer (pad);
1759 gboolean ok = gst_pad_send_event (peer, gst_event_ref (event));
1761 gst_object_unref (peer);
1769 /* This is in the case the muxer doesn't allow this change of caps */
1770 GST_SPLITMUX_LOCK (splitmux);
1772 ctx->caps_change = TRUE;
1774 if (splitmux->output_state != SPLITMUX_OUTPUT_STATE_START_NEXT_FILE) {
1775 GST_DEBUG_OBJECT (splitmux,
1776 "New caps were not accepted. Switching output file");
1777 if (ctx->out_eos == FALSE) {
1778 splitmux->output_state = SPLITMUX_OUTPUT_STATE_ENDING_FILE;
1779 GST_SPLITMUX_BROADCAST_OUTPUT (splitmux);
1783 /* Lets it fall through, if it fails again, then the muxer just can't
1784 * support this format, but at least we have a closed file.
1792 /* We need to make sure events aren't passed
1793 * until the muxer / sink are ready for it */
1795 GST_SPLITMUX_LOCK (splitmux);
1797 ret = complete_or_wait_on_out (splitmux, ctx);
1798 GST_SPLITMUX_UNLOCK (splitmux);
1800 /* Don't try to forward sticky events before the next buffer is there
1801 * because it would cause a new file to be created without the first
1802 * buffer being available.
1804 GST_PAD_PROBE_INFO_FLOW_RETURN (info) = ret;
1805 if (ctx->caps_change && GST_EVENT_IS_STICKY (event)) {
1806 gst_event_unref (event);
1807 return GST_PAD_PROBE_HANDLED;
1809 return GST_PAD_PROBE_PASS;
1813 /* Allow everything through until the configured next stopping point */
1814 GST_SPLITMUX_LOCK (splitmux);
1816 buf_info = g_queue_pop_tail (&ctx->queued_bufs);
1817 if (buf_info == NULL) {
1818 /* Can only happen due to a poorly timed flush */
1819 ret = GST_FLOW_FLUSHING;
1823 /* If we have popped a keyframe, decrement the queued_gop count */
1824 if (buf_info->keyframe && splitmux->queued_keyframes > 0 && ctx->is_reference)
1825 splitmux->queued_keyframes--;
1827 ctx->out_running_time = buf_info->run_ts;
1828 ctx->cur_out_buffer = gst_pad_probe_info_get_buffer (info);
1830 GST_LOG_OBJECT (splitmux,
1831 "Pad %" GST_PTR_FORMAT " buffer with run TS %" GST_STIME_FORMAT
1832 " size %" G_GUINT64_FORMAT,
1833 pad, GST_STIME_ARGS (ctx->out_running_time), buf_info->buf_size);
1835 ctx->caps_change = FALSE;
1837 ret = complete_or_wait_on_out (splitmux, ctx);
1839 splitmux->muxed_out_bytes += buf_info->buf_size;
1841 #ifndef GST_DISABLE_GST_DEBUG
1843 GstBuffer *buf = gst_pad_probe_info_get_buffer (info);
1844 GST_LOG_OBJECT (pad, "Returning to pass buffer %" GST_PTR_FORMAT
1845 " run ts %" GST_STIME_FORMAT, buf,
1846 GST_STIME_ARGS (ctx->out_running_time));
1850 ctx->cur_out_buffer = NULL;
1851 GST_SPLITMUX_UNLOCK (splitmux);
1853 /* pending_gap is protected by the STREAM lock */
1854 if (ctx->pending_gap) {
1855 /* If we previously stored a gap event, send it now */
1856 GstPad *peer = gst_pad_get_peer (ctx->srcpad);
1858 GST_DEBUG_OBJECT (splitmux,
1859 "Pad %" GST_PTR_FORMAT " sending pending GAP event", ctx->srcpad);
1861 gst_pad_send_event (peer, ctx->pending_gap);
1862 ctx->pending_gap = NULL;
1864 gst_object_unref (peer);
1867 mq_stream_buf_free (buf_info);
1869 GST_PAD_PROBE_INFO_FLOW_RETURN (info) = ret;
1870 return GST_PAD_PROBE_PASS;
1873 GST_SPLITMUX_UNLOCK (splitmux);
1874 GST_PAD_PROBE_INFO_FLOW_RETURN (info) = ret;
1875 return GST_PAD_PROBE_DROP;
1879 resend_sticky (GstPad * pad, GstEvent ** event, GstPad * peer)
1881 return gst_pad_send_event (peer, gst_event_ref (*event));
1885 unlock_context (MqStreamCtx * ctx, GstSplitMuxSink * splitmux)
1887 if (ctx->fragment_block_id > 0) {
1888 gst_pad_remove_probe (ctx->srcpad, ctx->fragment_block_id);
1889 ctx->fragment_block_id = 0;
1894 restart_context (MqStreamCtx * ctx, GstSplitMuxSink * splitmux)
1896 GstPad *peer = gst_pad_get_peer (ctx->srcpad);
1898 gst_pad_sticky_events_foreach (ctx->srcpad,
1899 (GstPadStickyEventsForeachFunction) (resend_sticky), peer);
1901 /* Clear EOS flag if not actually EOS */
1902 ctx->out_eos = GST_PAD_IS_EOS (ctx->srcpad);
1903 ctx->out_eos_async_done = ctx->out_eos;
1905 gst_object_unref (peer);
1909 relink_context (MqStreamCtx * ctx, GstSplitMuxSink * splitmux)
1911 GstPad *sinkpad, *srcpad, *newpad;
1912 GstPadTemplate *templ;
1914 srcpad = ctx->srcpad;
1915 sinkpad = gst_pad_get_peer (srcpad);
1917 templ = sinkpad->padtemplate;
1919 gst_element_request_pad (splitmux->muxer, templ,
1920 GST_PAD_NAME (sinkpad), NULL);
1922 GST_DEBUG_OBJECT (splitmux, "Relinking ctx %p to pad %" GST_PTR_FORMAT, ctx,
1924 if (!gst_pad_unlink (srcpad, sinkpad)) {
1925 gst_object_unref (sinkpad);
1928 if (gst_pad_link_full (srcpad, newpad,
1929 GST_PAD_LINK_CHECK_NO_RECONFIGURE) != GST_PAD_LINK_OK) {
1930 gst_element_release_request_pad (splitmux->muxer, newpad);
1931 gst_object_unref (sinkpad);
1932 gst_object_unref (newpad);
1935 gst_object_unref (newpad);
1936 gst_object_unref (sinkpad);
1940 GST_ELEMENT_ERROR (splitmux, RESOURCE, SETTINGS,
1941 ("Could not create the new muxer/sink"), NULL);
1944 static GstPadProbeReturn
1945 _block_pad (GstPad * pad, GstPadProbeInfo * info, gpointer user_data)
1947 return GST_PAD_PROBE_OK;
1951 block_context (MqStreamCtx * ctx, GstSplitMuxSink * splitmux)
1953 ctx->fragment_block_id =
1954 gst_pad_add_probe (ctx->srcpad, GST_PAD_PROBE_TYPE_BLOCK, _block_pad,
1959 _set_property_from_structure (GQuark field_id, const GValue * value,
1962 const gchar *property_name = g_quark_to_string (field_id);
1963 GObject *element = G_OBJECT (user_data);
1965 g_object_set_property (element, property_name, value);
1971 _lock_and_set_to_null (GstElement * element, GstSplitMuxSink * splitmux)
1973 gst_element_set_locked_state (element, TRUE);
1974 gst_element_set_state (element, GST_STATE_NULL);
1975 GST_LOG_OBJECT (splitmux, "Removing old element %" GST_PTR_FORMAT, element);
1976 gst_bin_remove (GST_BIN (splitmux), element);
1981 _send_event (const GValue * value, gpointer user_data)
1983 GstPad *pad = g_value_get_object (value);
1984 GstEvent *ev = user_data;
1986 gst_pad_send_event (pad, gst_event_ref (ev));
1989 /* Called with lock held when a fragment
1990 * reaches EOS and it is time to restart
1993 static GstFlowReturn
1994 start_next_fragment (GstSplitMuxSink * splitmux, MqStreamCtx * ctx)
1996 GstElement *muxer, *sink;
1998 g_assert (ctx->is_reference);
2000 /* 1 change to new file */
2001 splitmux->switching_fragment = TRUE;
2003 /* We need to drop the splitmux lock to acquire the state lock
2004 * here and ensure there's no racy state change going on elsewhere */
2005 muxer = gst_object_ref (splitmux->muxer);
2006 sink = gst_object_ref (splitmux->active_sink);
2008 GST_SPLITMUX_UNLOCK (splitmux);
2009 GST_SPLITMUX_STATE_LOCK (splitmux);
2011 if (splitmux->shutdown) {
2012 GST_DEBUG_OBJECT (splitmux,
2013 "Shutdown requested. Aborting fragment switch.");
2014 GST_SPLITMUX_LOCK (splitmux);
2015 GST_SPLITMUX_STATE_UNLOCK (splitmux);
2016 gst_object_unref (muxer);
2017 gst_object_unref (sink);
2018 return GST_FLOW_FLUSHING;
2021 if (splitmux->async_finalize) {
2022 if (splitmux->muxed_out_bytes > 0
2023 || splitmux->fragment_id != splitmux->start_index) {
2025 GstElement *new_sink, *new_muxer;
2027 GST_DEBUG_OBJECT (splitmux, "Starting fragment %u",
2028 splitmux->fragment_id);
2029 g_list_foreach (splitmux->contexts, (GFunc) block_context, splitmux);
2030 newname = g_strdup_printf ("sink_%u", splitmux->fragment_id);
2031 GST_SPLITMUX_LOCK (splitmux);
2032 if ((splitmux->sink =
2033 create_element (splitmux, splitmux->sink_factory, newname,
2036 if (splitmux->sink_preset && GST_IS_PRESET (splitmux->sink))
2037 gst_preset_load_preset (GST_PRESET (splitmux->sink),
2038 splitmux->sink_preset);
2039 if (splitmux->sink_properties)
2040 gst_structure_foreach (splitmux->sink_properties,
2041 _set_property_from_structure, splitmux->sink);
2042 splitmux->active_sink = splitmux->sink;
2043 g_signal_emit (splitmux, signals[SIGNAL_SINK_ADDED], 0, splitmux->sink);
2045 newname = g_strdup_printf ("muxer_%u", splitmux->fragment_id);
2046 if ((splitmux->muxer =
2047 create_element (splitmux, splitmux->muxer_factory, newname,
2050 if (g_object_class_find_property (G_OBJECT_GET_CLASS (splitmux->sink),
2052 /* async child elements are causing state change races and weird
2053 * failures, so let's try and turn that off */
2054 g_object_set (splitmux->sink, "async", FALSE, NULL);
2056 if (splitmux->muxer_preset && GST_IS_PRESET (splitmux->muxer))
2057 gst_preset_load_preset (GST_PRESET (splitmux->muxer),
2058 splitmux->muxer_preset);
2059 if (splitmux->muxer_properties)
2060 gst_structure_foreach (splitmux->muxer_properties,
2061 _set_property_from_structure, splitmux->muxer);
2062 g_signal_emit (splitmux, signals[SIGNAL_MUXER_ADDED], 0, splitmux->muxer);
2064 new_sink = splitmux->sink;
2065 new_muxer = splitmux->muxer;
2066 GST_SPLITMUX_UNLOCK (splitmux);
2067 g_list_foreach (splitmux->contexts, (GFunc) relink_context, splitmux);
2068 gst_element_link (new_muxer, new_sink);
2070 if (g_object_get_qdata ((GObject *) sink, EOS_FROM_US)) {
2071 if (GPOINTER_TO_INT (g_object_get_qdata ((GObject *) sink,
2072 EOS_FROM_US)) == 2) {
2073 _lock_and_set_to_null (muxer, splitmux);
2074 _lock_and_set_to_null (sink, splitmux);
2076 g_object_set_qdata ((GObject *) sink, EOS_FROM_US,
2077 GINT_TO_POINTER (2));
2080 gst_object_unref (muxer);
2081 gst_object_unref (sink);
2084 gst_object_ref (muxer);
2085 gst_object_ref (sink);
2089 gst_element_set_locked_state (muxer, TRUE);
2090 gst_element_set_locked_state (sink, TRUE);
2091 gst_element_set_state (sink, GST_STATE_NULL);
2093 if (splitmux->reset_muxer) {
2094 gst_element_set_state (muxer, GST_STATE_NULL);
2096 GstIterator *it = gst_element_iterate_sink_pads (muxer);
2100 ev = gst_event_new_flush_start ();
2101 seqnum = gst_event_get_seqnum (ev);
2102 while (gst_iterator_foreach (it, _send_event, ev) == GST_ITERATOR_RESYNC);
2103 gst_event_unref (ev);
2105 gst_iterator_resync (it);
2107 ev = gst_event_new_flush_stop (TRUE);
2108 gst_event_set_seqnum (ev, seqnum);
2109 while (gst_iterator_foreach (it, _send_event, ev) == GST_ITERATOR_RESYNC);
2110 gst_event_unref (ev);
2112 gst_iterator_free (it);
2116 GST_SPLITMUX_LOCK (splitmux);
2117 set_next_filename (splitmux, ctx);
2118 splitmux->muxed_out_bytes = 0;
2119 GST_SPLITMUX_UNLOCK (splitmux);
2121 if (gst_element_set_state (sink,
2122 GST_STATE_TARGET (splitmux)) == GST_STATE_CHANGE_FAILURE) {
2123 gst_element_set_state (sink, GST_STATE_NULL);
2124 gst_element_set_locked_state (muxer, FALSE);
2125 gst_element_set_locked_state (sink, FALSE);
2130 if (gst_element_set_state (muxer,
2131 GST_STATE_TARGET (splitmux)) == GST_STATE_CHANGE_FAILURE) {
2132 gst_element_set_state (muxer, GST_STATE_NULL);
2133 gst_element_set_state (sink, GST_STATE_NULL);
2134 gst_element_set_locked_state (muxer, FALSE);
2135 gst_element_set_locked_state (sink, FALSE);
2139 gst_element_set_locked_state (muxer, FALSE);
2140 gst_element_set_locked_state (sink, FALSE);
2142 gst_object_unref (sink);
2143 gst_object_unref (muxer);
2145 GST_SPLITMUX_LOCK (splitmux);
2146 GST_SPLITMUX_STATE_UNLOCK (splitmux);
2147 splitmux->switching_fragment = FALSE;
2148 do_async_done (splitmux);
2150 splitmux->ready_for_output = TRUE;
2152 g_list_foreach (splitmux->contexts, (GFunc) unlock_context, splitmux);
2153 g_list_foreach (splitmux->contexts, (GFunc) restart_context, splitmux);
2155 send_fragment_opened_closed_msg (splitmux, TRUE, sink);
2157 /* FIXME: Is this always the correct next state? */
2158 GST_LOG_OBJECT (splitmux, "Resetting state to AWAITING_COMMAND");
2159 splitmux->output_state = SPLITMUX_OUTPUT_STATE_AWAITING_COMMAND;
2160 GST_SPLITMUX_BROADCAST_OUTPUT (splitmux);
2164 gst_object_unref (sink);
2165 gst_object_unref (muxer);
2167 GST_SPLITMUX_LOCK (splitmux);
2168 GST_SPLITMUX_STATE_UNLOCK (splitmux);
2169 GST_ELEMENT_ERROR (splitmux, RESOURCE, SETTINGS,
2170 ("Could not create the new muxer/sink"), NULL);
2171 return GST_FLOW_ERROR;
2174 GST_ELEMENT_ERROR (splitmux, RESOURCE, SETTINGS,
2175 ("Could not start new output sink"), NULL);
2176 gst_object_unref (sink);
2177 gst_object_unref (muxer);
2179 GST_SPLITMUX_LOCK (splitmux);
2180 GST_SPLITMUX_STATE_UNLOCK (splitmux);
2181 splitmux->switching_fragment = FALSE;
2182 return GST_FLOW_ERROR;
2185 GST_ELEMENT_ERROR (splitmux, RESOURCE, SETTINGS,
2186 ("Could not start new muxer"), NULL);
2187 gst_object_unref (sink);
2188 gst_object_unref (muxer);
2190 GST_SPLITMUX_LOCK (splitmux);
2191 GST_SPLITMUX_STATE_UNLOCK (splitmux);
2192 splitmux->switching_fragment = FALSE;
2193 return GST_FLOW_ERROR;
2197 bus_handler (GstBin * bin, GstMessage * message)
2199 GstSplitMuxSink *splitmux = GST_SPLITMUX_SINK (bin);
2201 switch (GST_MESSAGE_TYPE (message)) {
2202 case GST_MESSAGE_EOS:{
2203 /* If the state is draining out the current file, drop this EOS */
2206 sink = GST_ELEMENT (GST_MESSAGE_SRC (message));
2207 GST_SPLITMUX_LOCK (splitmux);
2209 send_fragment_opened_closed_msg (splitmux, FALSE, sink);
2211 if (splitmux->async_finalize) {
2213 if (g_object_get_qdata ((GObject *) sink, EOS_FROM_US)) {
2214 if (GPOINTER_TO_INT (g_object_get_qdata ((GObject *) sink,
2215 EOS_FROM_US)) == 2) {
2217 GstPad *sinksink, *muxersrc;
2219 sinksink = gst_element_get_static_pad (sink, "sink");
2220 muxersrc = gst_pad_get_peer (sinksink);
2221 muxer = gst_pad_get_parent_element (muxersrc);
2222 gst_object_unref (sinksink);
2223 gst_object_unref (muxersrc);
2225 gst_element_call_async (muxer,
2226 (GstElementCallAsyncFunc) _lock_and_set_to_null,
2227 gst_object_ref (splitmux), gst_object_unref);
2228 gst_element_call_async (sink,
2229 (GstElementCallAsyncFunc) _lock_and_set_to_null,
2230 gst_object_ref (splitmux), gst_object_unref);
2231 gst_object_unref (muxer);
2233 g_object_set_qdata ((GObject *) sink, EOS_FROM_US,
2234 GINT_TO_POINTER (2));
2236 GST_DEBUG_OBJECT (splitmux,
2237 "Caught async EOS from previous muxer+sink. Dropping.");
2238 /* We forward the EOS so that it gets aggregated as normal. If the sink
2239 * finishes and is removed before the end, it will be de-aggregated */
2240 gst_message_unref (message);
2241 GST_SPLITMUX_UNLOCK (splitmux);
2244 } else if (splitmux->output_state == SPLITMUX_OUTPUT_STATE_ENDING_STREAM) {
2245 GST_DEBUG_OBJECT (splitmux,
2246 "Passing EOS message. Output state %d max_out_running_time %"
2247 GST_STIME_FORMAT, splitmux->output_state,
2248 GST_STIME_ARGS (splitmux->max_out_running_time));
2250 GST_DEBUG_OBJECT (splitmux, "Caught EOS at end of fragment, dropping");
2251 splitmux->output_state = SPLITMUX_OUTPUT_STATE_START_NEXT_FILE;
2252 GST_SPLITMUX_BROADCAST_OUTPUT (splitmux);
2254 gst_message_unref (message);
2255 GST_SPLITMUX_UNLOCK (splitmux);
2258 GST_SPLITMUX_UNLOCK (splitmux);
2261 case GST_MESSAGE_ASYNC_START:
2262 case GST_MESSAGE_ASYNC_DONE:
2263 /* Ignore state changes from our children while switching */
2264 GST_SPLITMUX_LOCK (splitmux);
2265 if (splitmux->switching_fragment) {
2266 if (GST_MESSAGE_SRC (message) == (GstObject *) splitmux->active_sink
2267 || GST_MESSAGE_SRC (message) == (GstObject *) splitmux->muxer) {
2268 GST_LOG_OBJECT (splitmux,
2269 "Ignoring state change from child %" GST_PTR_FORMAT
2270 " while switching", GST_MESSAGE_SRC (message));
2271 gst_message_unref (message);
2272 GST_SPLITMUX_UNLOCK (splitmux);
2276 GST_SPLITMUX_UNLOCK (splitmux);
2278 case GST_MESSAGE_WARNING:
2280 GError *gerror = NULL;
2282 gst_message_parse_warning (message, &gerror, NULL);
2284 if (g_error_matches (gerror, GST_STREAM_ERROR, GST_STREAM_ERROR_FORMAT)) {
2286 gboolean caps_change = FALSE;
2288 GST_SPLITMUX_LOCK (splitmux);
2290 for (item = splitmux->contexts; item; item = item->next) {
2291 MqStreamCtx *ctx = item->data;
2293 if (ctx->caps_change) {
2299 GST_SPLITMUX_UNLOCK (splitmux);
2302 GST_LOG_OBJECT (splitmux,
2303 "Ignoring warning change from child %" GST_PTR_FORMAT
2304 " while switching caps", GST_MESSAGE_SRC (message));
2305 gst_message_unref (message);
2315 GST_BIN_CLASS (parent_class)->handle_message (bin, message);
2319 ctx_set_unblock (MqStreamCtx * ctx)
2321 ctx->need_unblock = TRUE;
2325 need_new_fragment (GstSplitMuxSink * splitmux,
2326 GstClockTime queued_time, GstClockTime queued_gop_time,
2327 guint64 queued_bytes)
2329 guint64 thresh_bytes;
2330 GstClockTime thresh_time;
2331 gboolean check_robust_muxing;
2332 GstClockTime time_to_split = GST_CLOCK_TIME_NONE;
2333 GstClockTime *ptr_to_time;
2334 const InputGop *gop, *next_gop;
2336 GST_OBJECT_LOCK (splitmux);
2337 thresh_bytes = splitmux->threshold_bytes;
2338 thresh_time = splitmux->threshold_time;
2339 ptr_to_time = (GstClockTime *)
2340 gst_queue_array_peek_head_struct (splitmux->times_to_split);
2342 time_to_split = *ptr_to_time;
2343 check_robust_muxing = splitmux->use_robust_muxing
2344 && splitmux->muxer_has_reserved_props;
2345 GST_OBJECT_UNLOCK (splitmux);
2347 /* Have we muxed at least one thing from the reference
2348 * stream into the file? If not, no other streams can have
2350 if (splitmux->fragment_reference_bytes <= 0) {
2351 GST_TRACE_OBJECT (splitmux,
2352 "Not ready to split - nothing muxed on the reference stream");
2356 /* User told us to split now */
2357 if (g_atomic_int_get (&(splitmux->do_split_next_gop)) == TRUE) {
2358 GST_TRACE_OBJECT (splitmux, "Forcing because split_next_gop is set");
2362 gop = g_queue_peek_head (&splitmux->pending_input_gops);
2363 /* We need a full GOP queued up at this point */
2364 g_assert (gop != NULL);
2365 next_gop = g_queue_peek_nth (&splitmux->pending_input_gops, 1);
2366 /* And the beginning of the next GOP or otherwise EOS */
2368 /* User told us to split at this running time */
2369 if (gop->start_time >= time_to_split) {
2370 GST_OBJECT_LOCK (splitmux);
2371 /* Dequeue running time */
2372 gst_queue_array_pop_head_struct (splitmux->times_to_split);
2373 /* Empty any running times after this that are past now */
2374 ptr_to_time = gst_queue_array_peek_head_struct (splitmux->times_to_split);
2375 while (ptr_to_time) {
2376 time_to_split = *ptr_to_time;
2377 if (gop->start_time < time_to_split) {
2380 gst_queue_array_pop_head_struct (splitmux->times_to_split);
2381 ptr_to_time = gst_queue_array_peek_head_struct (splitmux->times_to_split);
2383 GST_TRACE_OBJECT (splitmux,
2384 "GOP start time %" GST_STIME_FORMAT " is after requested split point %"
2385 GST_STIME_FORMAT, GST_STIME_ARGS (gop->start_time),
2386 GST_STIME_ARGS (time_to_split));
2387 GST_OBJECT_UNLOCK (splitmux);
2391 if (thresh_bytes > 0 && queued_bytes > thresh_bytes) {
2392 GST_TRACE_OBJECT (splitmux,
2393 "queued bytes %" G_GUINT64_FORMAT " overruns byte limit", queued_bytes);
2394 return TRUE; /* Would overrun byte limit */
2397 if (thresh_time > 0 && queued_time > thresh_time) {
2398 GST_TRACE_OBJECT (splitmux,
2399 "queued time %" GST_STIME_FORMAT " overruns time limit",
2400 GST_STIME_ARGS (queued_time));
2401 return TRUE; /* Would overrun time limit */
2404 if (splitmux->tc_interval) {
2405 GstClockTime next_gop_start_time =
2406 next_gop ? next_gop->start_time : splitmux->max_in_running_time;
2408 if (GST_CLOCK_TIME_IS_VALID (splitmux->next_fragment_start_tc_time) &&
2409 GST_CLOCK_STIME_IS_VALID (next_gop_start_time) &&
2410 next_gop_start_time >
2411 splitmux->next_fragment_start_tc_time + 5 * GST_USECOND) {
2412 GST_TRACE_OBJECT (splitmux,
2413 "in running time %" GST_STIME_FORMAT " overruns time limit %"
2414 GST_TIME_FORMAT, GST_STIME_ARGS (next_gop_start_time),
2415 GST_TIME_ARGS (splitmux->next_fragment_start_tc_time));
2420 if (check_robust_muxing) {
2421 GstClockTime mux_reserved_remain;
2423 g_object_get (splitmux->muxer,
2424 "reserved-duration-remaining", &mux_reserved_remain, NULL);
2426 GST_LOG_OBJECT (splitmux,
2427 "Muxer robust muxing report - %" G_GUINT64_FORMAT
2428 " remaining. New GOP would enqueue %" G_GUINT64_FORMAT,
2429 mux_reserved_remain, queued_gop_time);
2431 if (queued_gop_time >= mux_reserved_remain) {
2432 GST_INFO_OBJECT (splitmux,
2433 "File is about to run out of header room - %" G_GUINT64_FORMAT
2434 " remaining. New GOP would enqueue %" G_GUINT64_FORMAT
2435 ". Switching to new file", mux_reserved_remain, queued_gop_time);
2440 /* Continue and mux this GOP */
2444 /* probably we want to add this API? */
2446 video_time_code_replace (GstVideoTimeCode ** old_tc, GstVideoTimeCode * new_tc)
2448 GstVideoTimeCode *timecode = NULL;
2450 g_return_if_fail (old_tc != NULL);
2452 if (*old_tc == new_tc)
2456 timecode = gst_video_time_code_copy (new_tc);
2459 gst_video_time_code_free (*old_tc);
2464 /* Called with splitmux lock held */
2465 /* Called when entering ProcessingCompleteGop state
2466 * Assess if mq contents overflowed the current file
2467 * -> If yes, need to switch to new file
2468 * -> if no, set max_out_running_time to let this GOP in and
2469 * go to COLLECTING_GOP_START state
2472 handle_gathered_gop (GstSplitMuxSink * splitmux, const InputGop * gop,
2473 GstClockTimeDiff next_gop_start_time, GstClockTimeDiff max_out_running_time)
2475 guint64 queued_bytes;
2476 GstClockTimeDiff queued_time = 0;
2477 GstClockTimeDiff queued_gop_time = 0;
2478 SplitMuxOutputCommand *cmd;
2480 /* Assess if the multiqueue contents overflowed the current file */
2481 /* When considering if a newly gathered GOP overflows
2482 * the time limit for the file, only consider the running time of the
2483 * reference stream. Other streams might have run ahead a little bit,
2484 * but extra pieces won't be released to the muxer beyond the reference
2485 * stream cut-off anyway - so it forms the limit. */
2486 queued_bytes = splitmux->fragment_total_bytes + gop->total_bytes;
2487 queued_time = next_gop_start_time;
2488 /* queued_gop_time tracks how much unwritten data there is waiting to
2489 * be written to this fragment including this GOP */
2490 if (splitmux->reference_ctx->out_running_time != GST_CLOCK_STIME_NONE)
2491 queued_gop_time = queued_time - splitmux->reference_ctx->out_running_time;
2493 queued_gop_time = queued_time - gop->start_time;
2495 GST_LOG_OBJECT (splitmux, " queued_bytes %" G_GUINT64_FORMAT, queued_bytes);
2496 GST_LOG_OBJECT (splitmux, "mq at TS %" GST_STIME_FORMAT
2497 " bytes %" G_GUINT64_FORMAT " in next gop start time %" GST_STIME_FORMAT
2498 " gop start time %" GST_STIME_FORMAT,
2499 GST_STIME_ARGS (queued_time), queued_bytes,
2500 GST_STIME_ARGS (next_gop_start_time), GST_STIME_ARGS (gop->start_time));
2502 if (queued_gop_time < 0)
2503 goto error_gop_duration;
2505 if (queued_time < splitmux->fragment_start_time)
2506 goto error_queued_time;
2508 queued_time -= splitmux->fragment_start_time;
2509 if (queued_time < queued_gop_time)
2510 queued_gop_time = queued_time;
2512 /* Expand queued bytes estimate by muxer overhead */
2513 queued_bytes += (queued_bytes * splitmux->mux_overhead);
2515 /* Check for overrun - have we output at least one byte and overrun
2516 * either threshold? */
2517 if (need_new_fragment (splitmux, queued_time, queued_gop_time, queued_bytes)) {
2518 if (splitmux->async_finalize) {
2519 GstClockTime *sink_running_time = g_new (GstClockTime, 1);
2520 *sink_running_time = splitmux->reference_ctx->out_running_time;
2521 g_object_set_qdata_full (G_OBJECT (splitmux->sink),
2522 RUNNING_TIME, sink_running_time, g_free);
2524 g_atomic_int_set (&(splitmux->do_split_next_gop), FALSE);
2525 /* Tell the output side to start a new fragment */
2526 GST_INFO_OBJECT (splitmux,
2527 "This GOP (dur %" GST_STIME_FORMAT
2528 ") would overflow the fragment, Sending start_new_fragment cmd",
2529 GST_STIME_ARGS (queued_gop_time));
2530 cmd = out_cmd_buf_new ();
2531 cmd->start_new_fragment = TRUE;
2532 g_queue_push_head (&splitmux->out_cmd_q, cmd);
2533 GST_SPLITMUX_BROADCAST_OUTPUT (splitmux);
2535 splitmux->fragment_start_time = gop->start_time;
2536 splitmux->fragment_start_time_pts = gop->start_time_pts;
2537 splitmux->fragment_total_bytes = 0;
2538 splitmux->fragment_reference_bytes = 0;
2540 video_time_code_replace (&splitmux->fragment_start_tc, gop->start_tc);
2541 splitmux->next_fragment_start_tc_time =
2542 calculate_next_max_timecode (splitmux, splitmux->fragment_start_tc,
2543 splitmux->fragment_start_time, NULL);
2544 if (!GST_CLOCK_TIME_IS_VALID (splitmux->next_fragment_start_tc_time)) {
2545 GST_WARNING_OBJECT (splitmux,
2546 "Couldn't calculate next fragment start time for timecode mode");
2550 /* And set up to collect the next GOP */
2551 if (max_out_running_time != G_MAXINT64) {
2552 splitmux->input_state = SPLITMUX_INPUT_STATE_COLLECTING_GOP_START;
2554 /* This is probably already the current state, but just in case: */
2555 splitmux->input_state = SPLITMUX_INPUT_STATE_FINISHING_UP;
2558 /* And wake all input contexts to send a wake-up event */
2559 g_list_foreach (splitmux->contexts, (GFunc) ctx_set_unblock, NULL);
2560 GST_SPLITMUX_BROADCAST_INPUT (splitmux);
2562 /* Now either way - either there was no overflow, or we requested a new fragment: release this GOP */
2563 splitmux->fragment_total_bytes += gop->total_bytes;
2564 splitmux->fragment_reference_bytes += gop->reference_bytes;
2566 if (gop->total_bytes > 0) {
2567 GST_LOG_OBJECT (splitmux,
2568 "Releasing GOP to output. Bytes in fragment now %" G_GUINT64_FORMAT
2569 " time %" GST_STIME_FORMAT,
2570 splitmux->fragment_total_bytes, GST_STIME_ARGS (queued_time));
2572 /* Send this GOP to the output command queue */
2573 cmd = out_cmd_buf_new ();
2574 cmd->start_new_fragment = FALSE;
2575 cmd->max_output_ts = max_out_running_time;
2576 GST_LOG_OBJECT (splitmux, "Sending GOP cmd to output for TS %"
2577 GST_STIME_FORMAT, GST_STIME_ARGS (max_out_running_time));
2578 g_queue_push_head (&splitmux->out_cmd_q, cmd);
2580 GST_SPLITMUX_BROADCAST_OUTPUT (splitmux);
2586 GST_ELEMENT_ERROR (splitmux,
2587 STREAM, FAILED, ("Timestamping error on input streams"),
2588 ("Queued GOP time is negative %" GST_STIME_FORMAT,
2589 GST_STIME_ARGS (queued_gop_time)));
2592 GST_ELEMENT_ERROR (splitmux,
2593 STREAM, FAILED, ("Timestamping error on input streams"),
2594 ("Queued time is negative. Input went backwards. queued_time - %"
2595 GST_STIME_FORMAT, GST_STIME_ARGS (queued_time)));
2599 /* Called with splitmux lock held */
2600 /* Called from each input pad when it is has all the pieces
2601 * for a GOP or EOS, starting with the reference pad which has set the
2602 * splitmux->max_in_running_time
2605 check_completed_gop (GstSplitMuxSink * splitmux, MqStreamCtx * ctx)
2610 /* On ENDING_FILE, the reference stream sends a command to start a new
2611 * fragment, then releases the GOP for output in the new fragment.
2612 * If some streams received no buffer during the last GOP that overran,
2613 * because its next buffer has a timestamp bigger than
2614 * ctx->max_in_running_time, its queue is empty. In that case the only
2615 * way to wakeup the output thread is by injecting an event in the
2616 * queue. This usually happen with subtitle streams.
2617 * See https://bugzilla.gnome.org/show_bug.cgi?id=763711. */
2618 if (ctx->need_unblock) {
2619 GST_LOG_OBJECT (ctx->sinkpad, "Sending splitmuxsink-unblock event");
2620 event = gst_event_new_custom (GST_EVENT_CUSTOM_DOWNSTREAM |
2621 GST_EVENT_TYPE_SERIALIZED,
2622 gst_structure_new ("splitmuxsink-unblock", "timestamp",
2623 G_TYPE_INT64, splitmux->max_in_running_time, NULL));
2625 GST_SPLITMUX_UNLOCK (splitmux);
2626 gst_pad_send_event (ctx->sinkpad, event);
2627 GST_SPLITMUX_LOCK (splitmux);
2629 ctx->need_unblock = FALSE;
2630 GST_SPLITMUX_BROADCAST_INPUT (splitmux);
2631 /* state may have changed while we were unlocked. Loop again if so */
2632 if (splitmux->input_state != SPLITMUX_INPUT_STATE_WAITING_GOP_COLLECT)
2637 GstClockTimeDiff next_gop_start = GST_CLOCK_STIME_NONE;
2639 if (splitmux->input_state == SPLITMUX_INPUT_STATE_WAITING_GOP_COLLECT) {
2640 GstClockTimeDiff max_out_running_time;
2641 gboolean ready = TRUE;
2643 const InputGop *next_gop;
2645 gop = g_queue_peek_head (&splitmux->pending_input_gops);
2646 next_gop = g_queue_peek_nth (&splitmux->pending_input_gops, 1);
2648 /* If we have no GOP or no next GOP here then the reference context is
2649 * at EOS, otherwise use the start time of the next GOP if we're far
2650 * enough in the GOP to know it */
2651 if (gop && next_gop) {
2652 if (!splitmux->reference_ctx->in_eos
2653 && splitmux->max_in_running_time_dts != GST_CLOCK_STIME_NONE
2654 && splitmux->max_in_running_time_dts < next_gop->start_time_pts) {
2655 GST_LOG_OBJECT (splitmux,
2656 "No further GOPs finished collecting, waiting until current DTS %"
2657 GST_STIME_FORMAT " has passed next GOP start PTS %"
2659 GST_STIME_ARGS (splitmux->max_in_running_time_dts),
2660 GST_STIME_ARGS (next_gop->start_time_pts));
2664 GST_LOG_OBJECT (splitmux,
2665 "Finished collecting GOP with start time %" GST_STIME_FORMAT
2666 ", next GOP start time %" GST_STIME_FORMAT,
2667 GST_STIME_ARGS (gop->start_time),
2668 GST_STIME_ARGS (next_gop->start_time));
2669 next_gop_start = next_gop->start_time;
2670 max_out_running_time =
2671 splitmux->reference_ctx->in_eos ? G_MAXINT64 : next_gop->start_time;
2672 } else if (!next_gop) {
2673 GST_LOG_OBJECT (splitmux, "Reference context is EOS");
2674 next_gop_start = splitmux->max_in_running_time;
2675 max_out_running_time = G_MAXINT64;
2677 GST_LOG_OBJECT (splitmux, "No further GOPs finished collecting");
2680 g_assert_not_reached ();
2683 g_assert (gop != NULL);
2685 /* Iterate each pad, and check that the input running time is at least
2686 * up to the start running time of the next GOP or EOS, and if so handle
2687 * the collected GOP */
2688 GST_LOG_OBJECT (splitmux, "Checking GOP collected, next GOP start %"
2689 GST_STIME_FORMAT " ctx %p", GST_STIME_ARGS (next_gop_start), ctx);
2690 for (cur = g_list_first (splitmux->contexts); cur != NULL;
2691 cur = g_list_next (cur)) {
2692 MqStreamCtx *tmpctx = (MqStreamCtx *) (cur->data);
2694 GST_LOG_OBJECT (splitmux,
2695 "Context %p sink pad %" GST_PTR_FORMAT " @ TS %" GST_STIME_FORMAT
2696 " EOS %d", tmpctx, tmpctx->sinkpad,
2697 GST_STIME_ARGS (tmpctx->in_running_time), tmpctx->in_eos);
2699 if (next_gop_start != GST_CLOCK_STIME_NONE &&
2700 tmpctx->in_running_time < next_gop_start && !tmpctx->in_eos) {
2701 GST_LOG_OBJECT (splitmux,
2702 "Context %p sink pad %" GST_PTR_FORMAT " not ready. We'll sleep",
2703 tmpctx, tmpctx->sinkpad);
2709 GST_DEBUG_OBJECT (splitmux,
2710 "Collected GOP is complete. Processing (ctx %p)", ctx);
2711 /* All pads have a complete GOP, release it into the multiqueue */
2712 handle_gathered_gop (splitmux, gop, next_gop_start,
2713 max_out_running_time);
2715 g_queue_pop_head (&splitmux->pending_input_gops);
2716 input_gop_free (gop);
2718 /* The user has requested a split, we can split now that the previous GOP
2719 * has been collected to the correct location */
2720 if (g_atomic_int_compare_and_exchange (&(splitmux->split_requested),
2722 g_atomic_int_set (&(splitmux->do_split_next_gop), TRUE);
2727 /* If upstream reached EOS we are not expecting more data, no need to wait
2732 if (splitmux->input_state == SPLITMUX_INPUT_STATE_WAITING_GOP_COLLECT &&
2734 ctx->in_running_time >= next_gop_start &&
2735 next_gop_start != GST_CLOCK_STIME_NONE) {
2736 /* Some pad is not yet ready, or GOP is being pushed
2737 * either way, sleep and wait to get woken */
2738 GST_LOG_OBJECT (splitmux, "Sleeping for GOP collection (ctx %p)", ctx);
2739 GST_SPLITMUX_WAIT_INPUT (splitmux);
2740 GST_LOG_OBJECT (splitmux, "Done waiting for complete GOP (ctx %p)", ctx);
2742 /* This pad is not ready or the state changed - break out and get another
2746 } while (splitmux->input_state == SPLITMUX_INPUT_STATE_WAITING_GOP_COLLECT);
2749 static GstPadProbeReturn
2750 handle_mq_input (GstPad * pad, GstPadProbeInfo * info, MqStreamCtx * ctx)
2752 GstSplitMuxSink *splitmux = ctx->splitmux;
2753 GstFlowReturn ret = GST_FLOW_OK;
2755 MqStreamBuf *buf_info = NULL;
2756 GstClockTime ts, pts, dts;
2757 GstClockTimeDiff running_time, running_time_pts, running_time_dts;
2758 gboolean loop_again;
2759 gboolean keyframe = FALSE;
2761 GST_LOG_OBJECT (pad, "Fired probe type 0x%x", info->type);
2763 /* FIXME: Handle buffer lists, until then make it clear they won't work */
2764 if (info->type & GST_PAD_PROBE_TYPE_BUFFER_LIST) {
2765 g_warning ("Buffer list handling not implemented");
2766 return GST_PAD_PROBE_DROP;
2768 if (info->type & GST_PAD_PROBE_TYPE_EVENT_DOWNSTREAM ||
2769 info->type & GST_PAD_PROBE_TYPE_EVENT_FLUSH) {
2770 GstEvent *event = gst_pad_probe_info_get_event (info);
2772 GST_LOG_OBJECT (pad, "Event %" GST_PTR_FORMAT, event);
2774 switch (GST_EVENT_TYPE (event)) {
2775 case GST_EVENT_SEGMENT:
2776 gst_event_copy_segment (event, &ctx->in_segment);
2778 case GST_EVENT_FLUSH_STOP:
2779 GST_SPLITMUX_LOCK (splitmux);
2780 gst_segment_init (&ctx->in_segment, GST_FORMAT_UNDEFINED);
2781 ctx->in_eos = FALSE;
2782 ctx->in_running_time = GST_CLOCK_STIME_NONE;
2783 GST_SPLITMUX_UNLOCK (splitmux);
2786 GST_SPLITMUX_LOCK (splitmux);
2789 if (splitmux->input_state == SPLITMUX_INPUT_STATE_STOPPED) {
2790 ret = GST_FLOW_FLUSHING;
2794 if (ctx->is_reference) {
2795 GST_INFO_OBJECT (splitmux, "Got Reference EOS. Finishing up");
2796 /* check_completed_gop will act as if this is a new keyframe with infinite timestamp */
2797 splitmux->input_state = SPLITMUX_INPUT_STATE_WAITING_GOP_COLLECT;
2798 /* Wake up other input pads to collect this GOP */
2799 GST_SPLITMUX_BROADCAST_INPUT (splitmux);
2800 check_completed_gop (splitmux, ctx);
2801 } else if (splitmux->input_state ==
2802 SPLITMUX_INPUT_STATE_WAITING_GOP_COLLECT) {
2803 /* If we are waiting for a GOP to be completed (ie, for aux
2804 * pads to catch up), then this pad is complete, so check
2805 * if the whole GOP is.
2807 check_completed_gop (splitmux, ctx);
2809 GST_SPLITMUX_UNLOCK (splitmux);
2811 case GST_EVENT_GAP:{
2812 GstClockTime gap_ts;
2813 GstClockTimeDiff rtime;
2815 gst_event_parse_gap (event, &gap_ts, NULL);
2816 if (gap_ts == GST_CLOCK_TIME_NONE)
2819 GST_SPLITMUX_LOCK (splitmux);
2821 if (splitmux->input_state == SPLITMUX_INPUT_STATE_STOPPED) {
2822 ret = GST_FLOW_FLUSHING;
2825 rtime = my_segment_to_running_time (&ctx->in_segment, gap_ts);
2827 GST_LOG_OBJECT (pad, "Have GAP w/ ts %" GST_STIME_FORMAT,
2828 GST_STIME_ARGS (rtime));
2830 if (ctx->is_reference && GST_CLOCK_STIME_IS_VALID (rtime)) {
2831 /* If this GAP event happens before the first fragment then
2832 * initialize the fragment start time here. */
2833 if (!GST_CLOCK_STIME_IS_VALID (splitmux->fragment_start_time)) {
2834 splitmux->fragment_start_time = rtime;
2835 GST_LOG_OBJECT (splitmux,
2836 "Fragment start time now %" GST_STIME_FORMAT,
2837 GST_STIME_ARGS (splitmux->fragment_start_time));
2839 /* Also take this as the first start time when starting up,
2840 * so that we start counting overflow from the first frame */
2841 if (!GST_CLOCK_STIME_IS_VALID (splitmux->max_in_running_time))
2842 splitmux->max_in_running_time = rtime;
2843 if (!GST_CLOCK_STIME_IS_VALID (splitmux->max_in_running_time_dts))
2844 splitmux->max_in_running_time_dts = rtime;
2847 /* Similarly take it as fragment start PTS and GOP start time if
2848 * these are not set */
2849 if (!GST_CLOCK_STIME_IS_VALID (splitmux->fragment_start_time_pts))
2850 splitmux->fragment_start_time_pts = rtime;
2852 if (g_queue_is_empty (&splitmux->pending_input_gops)) {
2853 InputGop *gop = g_slice_new0 (InputGop);
2855 gop->from_gap = TRUE;
2856 gop->start_time = rtime;
2857 gop->start_time_pts = rtime;
2859 g_queue_push_tail (&splitmux->pending_input_gops, gop);
2863 GST_SPLITMUX_UNLOCK (splitmux);
2869 return GST_PAD_PROBE_PASS;
2870 } else if (info->type & GST_PAD_PROBE_TYPE_QUERY_DOWNSTREAM) {
2871 switch (GST_QUERY_TYPE (GST_QUERY (info->data))) {
2872 case GST_QUERY_ALLOCATION:
2873 return GST_PAD_PROBE_DROP;
2875 return GST_PAD_PROBE_PASS;
2879 buf = gst_pad_probe_info_get_buffer (info);
2880 buf_info = mq_stream_buf_new ();
2882 pts = GST_BUFFER_PTS (buf);
2883 dts = GST_BUFFER_DTS (buf);
2884 if (GST_BUFFER_PTS_IS_VALID (buf))
2885 ts = GST_BUFFER_PTS (buf);
2887 ts = GST_BUFFER_DTS (buf);
2889 GST_LOG_OBJECT (pad,
2890 "Buffer TS is %" GST_TIME_FORMAT " (PTS %" GST_TIME_FORMAT ", DTS %"
2891 GST_TIME_FORMAT ")", GST_TIME_ARGS (ts), GST_TIME_ARGS (pts),
2892 GST_TIME_ARGS (dts));
2894 GST_SPLITMUX_LOCK (splitmux);
2896 if (splitmux->input_state == SPLITMUX_INPUT_STATE_STOPPED) {
2897 ret = GST_FLOW_FLUSHING;
2901 /* If this buffer has a timestamp, advance the input timestamp of the
2903 if (GST_CLOCK_TIME_IS_VALID (ts)) {
2904 running_time = my_segment_to_running_time (&ctx->in_segment, ts);
2906 GST_LOG_OBJECT (pad, "Buffer running TS is %" GST_STIME_FORMAT,
2907 GST_STIME_ARGS (running_time));
2909 /* in running time is always the maximum PTS (or DTS) that was observed so far */
2910 if (GST_CLOCK_STIME_IS_VALID (running_time)
2911 && running_time > ctx->in_running_time)
2912 ctx->in_running_time = running_time;
2914 running_time = ctx->in_running_time;
2917 if (GST_CLOCK_TIME_IS_VALID (pts))
2918 running_time_pts = my_segment_to_running_time (&ctx->in_segment, pts);
2920 running_time_pts = GST_CLOCK_STIME_NONE;
2922 if (GST_CLOCK_TIME_IS_VALID (dts))
2923 running_time_dts = my_segment_to_running_time (&ctx->in_segment, dts);
2925 running_time_dts = GST_CLOCK_STIME_NONE;
2927 /* Try to make sure we have a valid running time */
2928 if (!GST_CLOCK_STIME_IS_VALID (ctx->in_running_time)) {
2929 ctx->in_running_time =
2930 my_segment_to_running_time (&ctx->in_segment, ctx->in_segment.start);
2933 GST_LOG_OBJECT (pad, "in running time now %" GST_STIME_FORMAT,
2934 GST_STIME_ARGS (ctx->in_running_time));
2936 buf_info->run_ts = ctx->in_running_time;
2937 buf_info->buf_size = gst_buffer_get_size (buf);
2938 buf_info->duration = GST_BUFFER_DURATION (buf);
2940 if (ctx->is_reference) {
2941 InputGop *gop = NULL;
2942 GstVideoTimeCodeMeta *tc_meta = gst_buffer_get_video_time_code_meta (buf);
2944 /* initialize fragment_start_time if it was not set yet (i.e. for the
2945 * first fragment), or otherwise set it to the minimum observed time */
2946 if (!GST_CLOCK_STIME_IS_VALID (splitmux->fragment_start_time)
2947 || splitmux->fragment_start_time > running_time) {
2948 if (!GST_CLOCK_STIME_IS_VALID (splitmux->fragment_start_time))
2949 splitmux->fragment_start_time_pts = running_time_pts;
2950 splitmux->fragment_start_time = running_time;
2952 GST_LOG_OBJECT (splitmux,
2953 "Fragment start time now %" GST_STIME_FORMAT " (initial PTS %"
2954 GST_STIME_FORMAT ")", GST_STIME_ARGS (splitmux->fragment_start_time),
2955 GST_STIME_ARGS (splitmux->fragment_start_time_pts));
2957 /* Also take this as the first start time when starting up,
2958 * so that we start counting overflow from the first frame */
2959 if (!GST_CLOCK_STIME_IS_VALID (splitmux->max_in_running_time)
2960 || splitmux->max_in_running_time < splitmux->fragment_start_time)
2961 splitmux->max_in_running_time = splitmux->fragment_start_time;
2963 if (!GST_CLOCK_STIME_IS_VALID (splitmux->max_in_running_time_dts))
2964 splitmux->max_in_running_time_dts = running_time_dts;
2967 video_time_code_replace (&splitmux->fragment_start_tc, &tc_meta->tc);
2969 splitmux->next_fragment_start_tc_time =
2970 calculate_next_max_timecode (splitmux, &tc_meta->tc,
2971 running_time, NULL);
2973 #ifndef GST_DISABLE_GST_DEBUG
2977 tc_str = gst_video_time_code_to_string (&tc_meta->tc);
2978 GST_DEBUG_OBJECT (splitmux,
2979 "Initialize fragment start timecode %s, next fragment start timecode time %"
2980 GST_TIME_FORMAT, tc_str,
2981 GST_TIME_ARGS (splitmux->next_fragment_start_tc_time));
2989 /* First check if we're at the very first GOP and the tracking was created
2990 * from a GAP event. In that case don't start a new GOP on keyframes but
2991 * just updated it as needed */
2992 gop = g_queue_peek_tail (&splitmux->pending_input_gops);
2994 if (!gop || (!gop->from_gap
2995 && !GST_BUFFER_FLAG_IS_SET (buf, GST_BUFFER_FLAG_DELTA_UNIT))) {
2996 gop = g_slice_new0 (InputGop);
2998 gop->start_time = running_time;
2999 gop->start_time_pts = running_time_pts;
3001 GST_LOG_OBJECT (splitmux,
3002 "Next GOP start time now %" GST_STIME_FORMAT " (initial PTS %"
3003 GST_STIME_FORMAT ")", GST_STIME_ARGS (gop->start_time),
3004 GST_STIME_ARGS (gop->start_time_pts));
3007 video_time_code_replace (&gop->start_tc, &tc_meta->tc);
3009 #ifndef GST_DISABLE_GST_DEBUG
3013 tc_str = gst_video_time_code_to_string (&tc_meta->tc);
3014 GST_DEBUG_OBJECT (splitmux, "Next GOP start timecode %s", tc_str);
3020 g_queue_push_tail (&splitmux->pending_input_gops, gop);
3022 gop->from_gap = FALSE;
3024 if (!GST_CLOCK_STIME_IS_VALID (gop->start_time)
3025 || gop->start_time > running_time) {
3026 gop->start_time = running_time;
3028 GST_LOG_OBJECT (splitmux,
3029 "GOP start time updated now %" GST_STIME_FORMAT " (initial PTS %"
3030 GST_STIME_FORMAT ")", GST_STIME_ARGS (gop->start_time),
3031 GST_STIME_ARGS (gop->start_time_pts));
3034 video_time_code_replace (&gop->start_tc, &tc_meta->tc);
3036 #ifndef GST_DISABLE_GST_DEBUG
3040 tc_str = gst_video_time_code_to_string (&tc_meta->tc);
3041 GST_DEBUG_OBJECT (splitmux, "Next GOP start timecode updated %s",
3050 /* Check whether we need to request next keyframe depending on
3051 * current running time */
3052 if (request_next_keyframe (splitmux, buf, running_time_dts) == FALSE) {
3053 GST_WARNING_OBJECT (splitmux,
3054 "Could not request a keyframe. Files may not split at the exact location they should");
3059 InputGop *gop = g_queue_peek_tail (&splitmux->pending_input_gops);
3062 GST_DEBUG_OBJECT (pad, "Buf TS %" GST_STIME_FORMAT
3063 " total GOP bytes %" G_GUINT64_FORMAT ", total next GOP bytes %"
3064 G_GUINT64_FORMAT, GST_STIME_ARGS (buf_info->run_ts),
3065 gop->total_bytes, gop->total_bytes);
3074 switch (splitmux->input_state) {
3075 case SPLITMUX_INPUT_STATE_COLLECTING_GOP_START:
3076 if (ctx->is_releasing) {
3077 /* The pad belonging to this context is being released */
3078 GST_WARNING_OBJECT (pad, "Pad is being released while the muxer is "
3079 "running. Data might not drain correctly");
3081 } else if (ctx->is_reference) {
3082 const InputGop *gop, *next_gop;
3084 /* This is the reference context. If it's a keyframe,
3085 * it marks the start of a new GOP and we should wait in
3086 * check_completed_gop before continuing, but either way
3087 * (keyframe or no, we'll pass this buffer through after
3088 * so set loop_again to FALSE */
3091 gop = g_queue_peek_head (&splitmux->pending_input_gops);
3092 g_assert (gop != NULL);
3093 next_gop = g_queue_peek_nth (&splitmux->pending_input_gops, 1);
3095 if (ctx->in_running_time > splitmux->max_in_running_time)
3096 splitmux->max_in_running_time = ctx->in_running_time;
3097 if (running_time_dts > splitmux->max_in_running_time_dts)
3098 splitmux->max_in_running_time_dts = running_time_dts;
3100 GST_LOG_OBJECT (splitmux,
3101 "Max in running time now %" GST_STIME_FORMAT ", DTS %"
3102 GST_STIME_FORMAT, GST_STIME_ARGS (splitmux->max_in_running_time),
3103 GST_STIME_ARGS (splitmux->max_in_running_time_dts));
3106 GST_DEBUG_OBJECT (pad, "Waiting for end of GOP");
3107 /* Allow other input pads to catch up to here too */
3108 GST_SPLITMUX_BROADCAST_INPUT (splitmux);
3112 if (!GST_BUFFER_FLAG_IS_SET (buf, GST_BUFFER_FLAG_DELTA_UNIT)) {
3113 GST_INFO_OBJECT (pad,
3114 "Have keyframe with running time %" GST_STIME_FORMAT,
3115 GST_STIME_ARGS (ctx->in_running_time));
3119 if (running_time_dts != GST_CLOCK_STIME_NONE
3120 && running_time_dts < next_gop->start_time_pts) {
3121 GST_DEBUG_OBJECT (splitmux,
3122 "Waiting until DTS (%" GST_STIME_FORMAT
3123 ") has passed next GOP start PTS (%" GST_STIME_FORMAT ")",
3124 GST_STIME_ARGS (running_time_dts),
3125 GST_STIME_ARGS (next_gop->start_time_pts));
3126 /* Allow other input pads to catch up to here too */
3127 GST_SPLITMUX_BROADCAST_INPUT (splitmux);
3131 splitmux->input_state = SPLITMUX_INPUT_STATE_WAITING_GOP_COLLECT;
3132 /* Wake up other input pads to collect this GOP */
3133 GST_SPLITMUX_BROADCAST_INPUT (splitmux);
3134 check_completed_gop (splitmux, ctx);
3136 /* Pass this buffer if the reference ctx is far enough ahead */
3137 if (ctx->in_running_time < splitmux->max_in_running_time) {
3142 /* We're still waiting for a keyframe on the reference pad, sleep */
3143 GST_LOG_OBJECT (pad, "Sleeping for GOP start");
3144 GST_SPLITMUX_WAIT_INPUT (splitmux);
3145 GST_LOG_OBJECT (pad,
3146 "Done sleeping for GOP start input state now %d",
3147 splitmux->input_state);
3150 case SPLITMUX_INPUT_STATE_WAITING_GOP_COLLECT:{
3151 /* We're collecting a GOP, this is only ever called for non-reference
3152 * contexts as the reference context would be waiting inside
3153 * check_completed_gop() */
3155 g_assert (!ctx->is_reference);
3157 /* If we overran the target timestamp, it might be time to process
3158 * the GOP, otherwise bail out for more data. */
3159 GST_LOG_OBJECT (pad,
3160 "Checking TS %" GST_STIME_FORMAT " against max %"
3161 GST_STIME_FORMAT, GST_STIME_ARGS (ctx->in_running_time),
3162 GST_STIME_ARGS (splitmux->max_in_running_time));
3164 if (ctx->in_running_time < splitmux->max_in_running_time) {
3169 GST_LOG_OBJECT (pad,
3170 "Collected last packet of GOP. Checking other pads");
3171 check_completed_gop (splitmux, ctx);
3174 case SPLITMUX_INPUT_STATE_FINISHING_UP:
3184 if (keyframe && ctx->is_reference)
3185 splitmux->queued_keyframes++;
3186 buf_info->keyframe = keyframe;
3188 /* Update total input byte counter for overflow detect unless we're after
3190 if (splitmux->input_state != SPLITMUX_INPUT_STATE_FINISHING_UP) {
3191 InputGop *gop = g_queue_peek_tail (&splitmux->pending_input_gops);
3193 /* We must have a GOP at this point */
3194 g_assert (gop != NULL);
3196 gop->total_bytes += buf_info->buf_size;
3197 if (ctx->is_reference) {
3198 gop->reference_bytes += buf_info->buf_size;
3202 /* Now add this buffer to the queue just before returning */
3203 g_queue_push_head (&ctx->queued_bufs, buf_info);
3205 GST_LOG_OBJECT (pad, "Returning to queue buffer %" GST_PTR_FORMAT
3206 " run ts %" GST_STIME_FORMAT, buf, GST_STIME_ARGS (ctx->in_running_time));
3208 GST_SPLITMUX_UNLOCK (splitmux);
3209 GST_PAD_PROBE_INFO_FLOW_RETURN (info) = ret;
3210 return GST_PAD_PROBE_PASS;
3213 GST_SPLITMUX_UNLOCK (splitmux);
3215 mq_stream_buf_free (buf_info);
3216 GST_PAD_PROBE_INFO_FLOW_RETURN (info) = ret;
3217 return GST_PAD_PROBE_PASS;
3221 grow_blocked_queues (GstSplitMuxSink * splitmux)
3225 /* Scan other queues for full-ness and grow them */
3226 for (cur = g_list_first (splitmux->contexts);
3227 cur != NULL; cur = g_list_next (cur)) {
3228 MqStreamCtx *tmpctx = (MqStreamCtx *) (cur->data);
3230 guint cur_len = g_queue_get_length (&tmpctx->queued_bufs);
3232 g_object_get (tmpctx->q, "max-size-buffers", &cur_limit, NULL);
3233 GST_LOG_OBJECT (tmpctx->q, "Queue len %u", cur_len);
3235 if (cur_len >= cur_limit) {
3236 cur_limit = cur_len + 1;
3237 GST_DEBUG_OBJECT (tmpctx->q,
3238 "Queue overflowed and needs enlarging. Growing to %u buffers",
3240 g_object_set (tmpctx->q, "max-size-buffers", cur_limit, NULL);
3246 handle_q_underrun (GstElement * q, gpointer user_data)
3248 MqStreamCtx *ctx = (MqStreamCtx *) (user_data);
3249 GstSplitMuxSink *splitmux = ctx->splitmux;
3251 GST_SPLITMUX_LOCK (splitmux);
3252 GST_DEBUG_OBJECT (q,
3253 "Queue reported underrun with %d keyframes and %d cmds enqueued",
3254 splitmux->queued_keyframes, g_queue_get_length (&splitmux->out_cmd_q));
3255 grow_blocked_queues (splitmux);
3256 GST_SPLITMUX_UNLOCK (splitmux);
3260 handle_q_overrun (GstElement * q, gpointer user_data)
3262 MqStreamCtx *ctx = (MqStreamCtx *) (user_data);
3263 GstSplitMuxSink *splitmux = ctx->splitmux;
3264 gboolean allow_grow = FALSE;
3266 GST_SPLITMUX_LOCK (splitmux);
3267 GST_DEBUG_OBJECT (q,
3268 "Queue reported overrun with %d keyframes and %d cmds enqueued",
3269 splitmux->queued_keyframes, g_queue_get_length (&splitmux->out_cmd_q));
3271 if (splitmux->queued_keyframes < 2) {
3272 /* Less than a full GOP queued, grow the queue */
3274 } else if (g_queue_get_length (&splitmux->out_cmd_q) < 1) {
3277 /* If another queue is starved, grow */
3279 for (cur = g_list_first (splitmux->contexts);
3280 cur != NULL; cur = g_list_next (cur)) {
3281 MqStreamCtx *tmpctx = (MqStreamCtx *) (cur->data);
3282 if (tmpctx != ctx && g_queue_get_length (&tmpctx->queued_bufs) < 1) {
3287 GST_SPLITMUX_UNLOCK (splitmux);
3292 g_object_get (q, "max-size-buffers", &cur_limit, NULL);
3295 GST_DEBUG_OBJECT (q,
3296 "Queue overflowed and needs enlarging. Growing to %u buffers",
3299 g_object_set (q, "max-size-buffers", cur_limit, NULL);
3303 /* Called with SPLITMUX lock held */
3304 static const gchar *
3305 lookup_muxer_pad (GstSplitMuxSink * splitmux, const gchar * sinkpad_name)
3307 const gchar *ret = NULL;
3309 if (splitmux->muxerpad_map == NULL)
3312 if (sinkpad_name == NULL) {
3313 GST_WARNING_OBJECT (splitmux,
3314 "Can't look up request pad in pad map without providing a pad name");
3318 ret = gst_structure_get_string (splitmux->muxerpad_map, sinkpad_name);
3320 GST_INFO_OBJECT (splitmux, "Sink pad %s maps to muxer pad %s", sinkpad_name,
3322 return g_strdup (ret);
3329 gst_splitmux_sink_request_new_pad (GstElement * element,
3330 GstPadTemplate * templ, const gchar * name, const GstCaps * caps)
3332 GstSplitMuxSink *splitmux = (GstSplitMuxSink *) element;
3333 GstPadTemplate *mux_template = NULL;
3334 GstPad *ret = NULL, *muxpad = NULL;
3336 GstPad *q_sink = NULL, *q_src = NULL;
3337 gchar *gname, *qname;
3338 gboolean is_primary_video = FALSE, is_video = FALSE,
3339 muxer_is_requestpad = FALSE;
3341 const gchar *muxer_padname = NULL;
3343 GST_DEBUG_OBJECT (splitmux, "templ:%s, name:%s", templ->name_template, name);
3345 GST_SPLITMUX_LOCK (splitmux);
3346 if (!create_muxer (splitmux))
3348 g_signal_emit (splitmux, signals[SIGNAL_MUXER_ADDED], 0, splitmux->muxer);
3350 if (g_str_equal (templ->name_template, "video") ||
3351 g_str_has_prefix (templ->name_template, "video_aux_")) {
3352 is_primary_video = g_str_equal (templ->name_template, "video");
3353 if (is_primary_video && splitmux->have_video)
3354 goto already_have_video;
3358 /* See if there's a pad map and it lists this pad */
3359 muxer_padname = lookup_muxer_pad (splitmux, name);
3361 if (muxer_padname == NULL) {
3363 /* FIXME: Look for a pad template with matching caps, rather than by name */
3364 GST_DEBUG_OBJECT (element,
3365 "searching for pad-template with name 'video_%%u'");
3367 gst_element_class_get_pad_template (GST_ELEMENT_GET_CLASS
3368 (splitmux->muxer), "video_%u");
3370 /* Fallback to find sink pad templates named 'video' (flvmux) */
3371 if (!mux_template) {
3372 GST_DEBUG_OBJECT (element,
3373 "searching for pad-template with name 'video'");
3375 gst_element_class_get_pad_template (GST_ELEMENT_GET_CLASS
3376 (splitmux->muxer), "video");
3380 GST_DEBUG_OBJECT (element, "searching for pad-template with name '%s'",
3381 templ->name_template);
3383 gst_element_class_get_pad_template (GST_ELEMENT_GET_CLASS
3384 (splitmux->muxer), templ->name_template);
3386 /* Fallback to find sink pad templates named 'audio' (flvmux) */
3387 if (!mux_template && g_str_has_prefix (templ->name_template, "audio_")) {
3388 GST_DEBUG_OBJECT (element,
3389 "searching for pad-template with name 'audio'");
3391 gst_element_class_get_pad_template (GST_ELEMENT_GET_CLASS
3392 (splitmux->muxer), "audio");
3397 if (mux_template == NULL) {
3398 GST_DEBUG_OBJECT (element,
3399 "searching for pad-template with name 'sink_%%d'");
3401 gst_element_class_get_pad_template (GST_ELEMENT_GET_CLASS
3402 (splitmux->muxer), "sink_%d");
3405 if (mux_template == NULL) {
3406 GST_DEBUG_OBJECT (element, "searching for pad-template with name 'sink'");
3408 gst_element_class_get_pad_template (GST_ELEMENT_GET_CLASS
3409 (splitmux->muxer), "sink");
3413 if (mux_template == NULL) {
3414 GST_ERROR_OBJECT (element,
3415 "unable to find a suitable sink pad-template on the muxer");
3418 GST_DEBUG_OBJECT (element, "found sink pad-template '%s' on the muxer",
3419 mux_template->name_template);
3421 if (mux_template->presence == GST_PAD_REQUEST) {
3422 GST_DEBUG_OBJECT (element, "requesting pad from pad-template");
3425 gst_element_request_pad (splitmux->muxer, mux_template, name, caps);
3426 muxer_is_requestpad = TRUE;
3427 } else if (mux_template->presence == GST_PAD_ALWAYS) {
3428 GST_DEBUG_OBJECT (element, "accessing always pad from pad-template");
3431 gst_element_get_static_pad (splitmux->muxer,
3432 mux_template->name_template);
3434 GST_ERROR_OBJECT (element,
3435 "unexpected pad presence %d", mux_template->presence);
3439 /* Have a muxer pad name */
3440 if (!(muxpad = gst_element_get_static_pad (splitmux->muxer, muxer_padname))) {
3442 gst_element_request_pad_simple (splitmux->muxer, muxer_padname)))
3443 muxer_is_requestpad = TRUE;
3445 g_free ((gchar *) muxer_padname);
3446 muxer_padname = NULL;
3449 /* One way or another, we must have a muxer pad by now */
3453 if (is_primary_video)
3454 gname = g_strdup ("video");
3455 else if (name == NULL)
3456 gname = gst_pad_get_name (muxpad);
3458 gname = g_strdup (name);
3460 qname = g_strdup_printf ("queue_%s", gname);
3461 if ((q = create_element (splitmux, "queue", qname, FALSE)) == NULL) {
3467 gst_element_set_state (q, GST_STATE_TARGET (splitmux));
3469 g_object_set (q, "max-size-bytes", 0, "max-size-time", (guint64) (0),
3470 "max-size-buffers", 5, NULL);
3472 q_sink = gst_element_get_static_pad (q, "sink");
3473 q_src = gst_element_get_static_pad (q, "src");
3475 if (gst_pad_link (q_src, muxpad) != GST_PAD_LINK_OK) {
3476 if (muxer_is_requestpad)
3477 gst_element_release_request_pad (splitmux->muxer, muxpad);
3478 gst_object_unref (GST_OBJECT (muxpad));
3482 gst_object_unref (GST_OBJECT (muxpad));
3484 ctx = mq_stream_ctx_new (splitmux);
3485 /* Context holds a ref: */
3486 ctx->q = gst_object_ref (q);
3487 ctx->srcpad = q_src;
3488 ctx->sinkpad = q_sink;
3490 g_signal_connect (q, "overrun", (GCallback) handle_q_overrun, ctx);
3491 g_signal_connect (q, "underrun", (GCallback) handle_q_underrun, ctx);
3493 ctx->src_pad_block_id =
3494 gst_pad_add_probe (q_src,
3495 GST_PAD_PROBE_TYPE_DATA_DOWNSTREAM | GST_PAD_PROBE_TYPE_EVENT_FLUSH,
3496 (GstPadProbeCallback) handle_mq_output, ctx, NULL);
3497 if (is_primary_video && splitmux->reference_ctx != NULL) {
3498 splitmux->reference_ctx->is_reference = FALSE;
3499 splitmux->reference_ctx = NULL;
3501 if (splitmux->reference_ctx == NULL) {
3502 splitmux->reference_ctx = ctx;
3503 ctx->is_reference = TRUE;
3506 ret = gst_ghost_pad_new_from_template (gname, q_sink, templ);
3507 g_object_set_qdata ((GObject *) (ret), PAD_CONTEXT, ctx);
3509 ctx->sink_pad_block_id =
3510 gst_pad_add_probe (q_sink,
3511 GST_PAD_PROBE_TYPE_DATA_DOWNSTREAM | GST_PAD_PROBE_TYPE_EVENT_FLUSH |
3512 GST_PAD_PROBE_TYPE_QUERY_DOWNSTREAM,
3513 (GstPadProbeCallback) handle_mq_input, ctx, NULL);
3515 GST_DEBUG_OBJECT (splitmux, "splitmuxsink pad %" GST_PTR_FORMAT
3516 " feeds queue pad %" GST_PTR_FORMAT, ret, q_sink);
3518 splitmux->contexts = g_list_append (splitmux->contexts, ctx);
3522 if (is_primary_video)
3523 splitmux->have_video = TRUE;
3525 gst_pad_set_active (ret, TRUE);
3526 gst_element_add_pad (GST_ELEMENT (splitmux), ret);
3528 GST_SPLITMUX_UNLOCK (splitmux);
3532 GST_SPLITMUX_UNLOCK (splitmux);
3535 gst_object_unref (q_sink);
3537 gst_object_unref (q_src);
3540 GST_DEBUG_OBJECT (splitmux, "video sink pad already requested");
3541 GST_SPLITMUX_UNLOCK (splitmux);
3546 gst_splitmux_sink_release_pad (GstElement * element, GstPad * pad)
3548 GstSplitMuxSink *splitmux = (GstSplitMuxSink *) element;
3549 GstPad *muxpad = NULL;
3551 (MqStreamCtx *) (g_object_get_qdata ((GObject *) (pad), PAD_CONTEXT));
3553 GST_SPLITMUX_LOCK (splitmux);
3555 if (splitmux->muxer == NULL)
3556 goto fail; /* Elements don't exist yet - nothing to release */
3558 GST_INFO_OBJECT (pad, "releasing request pad");
3560 muxpad = gst_pad_get_peer (ctx->srcpad);
3562 /* Remove the context from our consideration */
3563 splitmux->contexts = g_list_remove (splitmux->contexts, ctx);
3565 GST_SPLITMUX_UNLOCK (splitmux);
3567 if (ctx->sink_pad_block_id) {
3568 gst_pad_remove_probe (ctx->sinkpad, ctx->sink_pad_block_id);
3569 gst_pad_send_event (ctx->sinkpad, gst_event_new_flush_start ());
3572 if (ctx->src_pad_block_id)
3573 gst_pad_remove_probe (ctx->srcpad, ctx->src_pad_block_id);
3575 GST_SPLITMUX_LOCK (splitmux);
3577 ctx->is_releasing = TRUE;
3578 GST_SPLITMUX_BROADCAST_INPUT (splitmux);
3580 /* Can release the context now */
3581 mq_stream_ctx_free (ctx);
3582 if (ctx == splitmux->reference_ctx)
3583 splitmux->reference_ctx = NULL;
3585 /* Release and free the muxer input */
3587 gst_element_release_request_pad (splitmux->muxer, muxpad);
3588 gst_object_unref (muxpad);
3591 if (GST_PAD_PAD_TEMPLATE (pad) &&
3592 g_str_equal (GST_PAD_TEMPLATE_NAME_TEMPLATE (GST_PAD_PAD_TEMPLATE
3594 splitmux->have_video = FALSE;
3596 gst_element_remove_pad (element, pad);
3598 /* Reset the internal elements only after all request pads are released */
3599 if (splitmux->contexts == NULL)
3600 gst_splitmux_reset_elements (splitmux);
3602 /* Wake up other input streams to check if the completion conditions have
3604 GST_SPLITMUX_BROADCAST_INPUT (splitmux);
3607 GST_SPLITMUX_UNLOCK (splitmux);
3611 create_element (GstSplitMuxSink * splitmux,
3612 const gchar * factory, const gchar * name, gboolean locked)
3614 GstElement *ret = gst_element_factory_make (factory, name);
3616 g_warning ("Failed to create %s - splitmuxsink will not work", name);
3621 /* Ensure the sink starts in locked state and NULL - it will be changed
3622 * by the filename setting code */
3623 gst_element_set_locked_state (ret, TRUE);
3624 gst_element_set_state (ret, GST_STATE_NULL);
3627 if (!gst_bin_add (GST_BIN (splitmux), ret)) {
3628 g_warning ("Could not add %s element - splitmuxsink will not work", name);
3629 gst_object_unref (ret);
3637 create_muxer (GstSplitMuxSink * splitmux)
3639 /* Create internal elements */
3640 if (splitmux->muxer == NULL) {
3641 GstElement *provided_muxer = NULL;
3643 GST_OBJECT_LOCK (splitmux);
3644 if (splitmux->provided_muxer != NULL)
3645 provided_muxer = gst_object_ref (splitmux->provided_muxer);
3646 GST_OBJECT_UNLOCK (splitmux);
3648 if ((!splitmux->async_finalize && provided_muxer == NULL) ||
3649 (splitmux->async_finalize && splitmux->muxer_factory == NULL)) {
3650 if ((splitmux->muxer =
3651 create_element (splitmux,
3652 splitmux->muxer_factory ? splitmux->
3653 muxer_factory : DEFAULT_MUXER, "muxer", FALSE)) == NULL)
3655 } else if (splitmux->async_finalize) {
3656 if ((splitmux->muxer =
3657 create_element (splitmux, splitmux->muxer_factory, "muxer",
3660 if (splitmux->muxer_preset && GST_IS_PRESET (splitmux->muxer))
3661 gst_preset_load_preset (GST_PRESET (splitmux->muxer),
3662 splitmux->muxer_preset);
3663 if (splitmux->muxer_properties)
3664 gst_structure_foreach (splitmux->muxer_properties,
3665 _set_property_from_structure, splitmux->muxer);
3667 /* Ensure it's not in locked state (we might be reusing an old element) */
3668 gst_element_set_locked_state (provided_muxer, FALSE);
3669 if (!gst_bin_add (GST_BIN (splitmux), provided_muxer)) {
3670 g_warning ("Could not add muxer element - splitmuxsink will not work");
3671 gst_object_unref (provided_muxer);
3675 splitmux->muxer = provided_muxer;
3676 gst_object_unref (provided_muxer);
3679 if (splitmux->use_robust_muxing) {
3680 update_muxer_properties (splitmux);
3690 find_sink (GstElement * e)
3692 GstElement *res = NULL;
3694 gboolean done = FALSE;
3695 GValue data = { 0, };
3697 if (!GST_IS_BIN (e))
3700 if (g_object_class_find_property (G_OBJECT_GET_CLASS (e), "location") != NULL)
3703 iter = gst_bin_iterate_sinks (GST_BIN (e));
3705 switch (gst_iterator_next (iter, &data)) {
3706 case GST_ITERATOR_OK:
3708 GstElement *child = g_value_get_object (&data);
3709 if (g_object_class_find_property (G_OBJECT_GET_CLASS (child),
3710 "location") != NULL) {
3714 g_value_reset (&data);
3717 case GST_ITERATOR_RESYNC:
3718 gst_iterator_resync (iter);
3720 case GST_ITERATOR_DONE:
3723 case GST_ITERATOR_ERROR:
3724 g_assert_not_reached ();
3728 g_value_unset (&data);
3729 gst_iterator_free (iter);
3735 create_sink (GstSplitMuxSink * splitmux)
3737 GstElement *provided_sink = NULL;
3739 if (splitmux->active_sink == NULL) {
3741 GST_OBJECT_LOCK (splitmux);
3742 if (splitmux->provided_sink != NULL)
3743 provided_sink = gst_object_ref (splitmux->provided_sink);
3744 GST_OBJECT_UNLOCK (splitmux);
3746 if ((!splitmux->async_finalize && provided_sink == NULL) ||
3747 (splitmux->async_finalize && splitmux->sink_factory == NULL)) {
3748 if ((splitmux->sink =
3749 create_element (splitmux, DEFAULT_SINK, "sink", TRUE)) == NULL)
3751 splitmux->active_sink = splitmux->sink;
3752 } else if (splitmux->async_finalize) {
3753 if ((splitmux->sink =
3754 create_element (splitmux, splitmux->sink_factory, "sink",
3757 if (splitmux->sink_preset && GST_IS_PRESET (splitmux->sink))
3758 gst_preset_load_preset (GST_PRESET (splitmux->sink),
3759 splitmux->sink_preset);
3760 if (splitmux->sink_properties)
3761 gst_structure_foreach (splitmux->sink_properties,
3762 _set_property_from_structure, splitmux->sink);
3763 splitmux->active_sink = splitmux->sink;
3765 /* Ensure the sink starts in locked state and NULL - it will be changed
3766 * by the filename setting code */
3767 gst_element_set_locked_state (provided_sink, TRUE);
3768 gst_element_set_state (provided_sink, GST_STATE_NULL);
3769 if (!gst_bin_add (GST_BIN (splitmux), provided_sink)) {
3770 g_warning ("Could not add sink elements - splitmuxsink will not work");
3771 gst_object_unref (provided_sink);
3775 splitmux->active_sink = provided_sink;
3777 /* The bin holds a ref now, we can drop our tmp ref */
3778 gst_object_unref (provided_sink);
3780 /* Find the sink element */
3781 splitmux->sink = find_sink (splitmux->active_sink);
3782 if (splitmux->sink == NULL) {
3784 ("Could not locate sink element in provided sink - splitmuxsink will not work");
3790 if (g_object_class_find_property (G_OBJECT_GET_CLASS (splitmux->sink),
3792 /* async child elements are causing state change races and weird
3793 * failures, so let's try and turn that off */
3794 g_object_set (splitmux->sink, "async", FALSE, NULL);
3798 if (!gst_element_link (splitmux->muxer, splitmux->active_sink)) {
3799 g_warning ("Failed to link muxer and sink- splitmuxsink will not work");
3810 #pragma GCC diagnostic ignored "-Wformat-nonliteral"
3813 set_next_filename (GstSplitMuxSink * splitmux, MqStreamCtx * ctx)
3815 gchar *fname = NULL;
3819 gst_splitmux_sink_ensure_max_files (splitmux);
3821 if (ctx->cur_out_buffer == NULL) {
3822 GST_WARNING_OBJECT (splitmux, "Starting next file without buffer");
3825 caps = gst_pad_get_current_caps (ctx->srcpad);
3826 sample = gst_sample_new (ctx->cur_out_buffer, caps, &ctx->out_segment, NULL);
3827 g_signal_emit (splitmux, signals[SIGNAL_FORMAT_LOCATION_FULL], 0,
3828 splitmux->fragment_id, sample, &fname);
3829 gst_sample_unref (sample);
3831 gst_caps_unref (caps);
3833 if (fname == NULL) {
3834 /* Fallback to the old signal if the new one returned nothing */
3835 g_signal_emit (splitmux, signals[SIGNAL_FORMAT_LOCATION], 0,
3836 splitmux->fragment_id, &fname);
3840 fname = splitmux->location ?
3841 g_strdup_printf (splitmux->location, splitmux->fragment_id) : NULL;
3844 GST_INFO_OBJECT (splitmux, "Setting file to %s", fname);
3845 if (g_object_class_find_property (G_OBJECT_GET_CLASS (splitmux->sink),
3846 "location") != NULL)
3847 g_object_set (splitmux->sink, "location", fname, NULL);
3851 splitmux->fragment_id++;
3854 /* called with GST_SPLITMUX_LOCK */
3856 do_async_start (GstSplitMuxSink * splitmux)
3858 GstMessage *message;
3860 if (!splitmux->need_async_start) {
3861 GST_INFO_OBJECT (splitmux, "no async_start needed");
3865 splitmux->async_pending = TRUE;
3867 GST_INFO_OBJECT (splitmux, "Sending async_start message");
3868 message = gst_message_new_async_start (GST_OBJECT_CAST (splitmux));
3870 GST_SPLITMUX_UNLOCK (splitmux);
3871 GST_BIN_CLASS (parent_class)->handle_message (GST_BIN_CAST
3872 (splitmux), message);
3873 GST_SPLITMUX_LOCK (splitmux);
3876 /* called with GST_SPLITMUX_LOCK */
3878 do_async_done (GstSplitMuxSink * splitmux)
3880 GstMessage *message;
3882 if (splitmux->async_pending) {
3883 GST_INFO_OBJECT (splitmux, "Sending async_done message");
3884 splitmux->async_pending = FALSE;
3885 GST_SPLITMUX_UNLOCK (splitmux);
3888 gst_message_new_async_done (GST_OBJECT_CAST (splitmux),
3889 GST_CLOCK_TIME_NONE);
3890 GST_BIN_CLASS (parent_class)->handle_message (GST_BIN_CAST
3891 (splitmux), message);
3892 GST_SPLITMUX_LOCK (splitmux);
3895 splitmux->need_async_start = FALSE;
3899 gst_splitmux_sink_reset (GstSplitMuxSink * splitmux)
3901 splitmux->max_in_running_time = GST_CLOCK_STIME_NONE;
3902 splitmux->max_in_running_time_dts = GST_CLOCK_STIME_NONE;
3904 splitmux->fragment_start_time = GST_CLOCK_STIME_NONE;
3905 splitmux->fragment_start_time_pts = GST_CLOCK_STIME_NONE;
3906 g_clear_pointer (&splitmux->fragment_start_tc, gst_video_time_code_free);
3908 g_queue_foreach (&splitmux->pending_input_gops, (GFunc) input_gop_free, NULL);
3909 g_queue_clear (&splitmux->pending_input_gops);
3911 splitmux->max_out_running_time = GST_CLOCK_STIME_NONE;
3912 splitmux->fragment_total_bytes = 0;
3913 splitmux->fragment_reference_bytes = 0;
3914 splitmux->muxed_out_bytes = 0;
3915 splitmux->ready_for_output = FALSE;
3917 g_atomic_int_set (&(splitmux->split_requested), FALSE);
3918 g_atomic_int_set (&(splitmux->do_split_next_gop), FALSE);
3920 splitmux->next_fku_time = GST_CLOCK_TIME_NONE;
3921 gst_queue_array_clear (splitmux->times_to_split);
3923 g_list_foreach (splitmux->contexts, (GFunc) mq_stream_ctx_reset, NULL);
3924 splitmux->queued_keyframes = 0;
3926 g_queue_foreach (&splitmux->out_cmd_q, (GFunc) out_cmd_buf_free, NULL);
3927 g_queue_clear (&splitmux->out_cmd_q);
3930 static GstStateChangeReturn
3931 gst_splitmux_sink_change_state (GstElement * element, GstStateChange transition)
3933 GstStateChangeReturn ret;
3934 GstSplitMuxSink *splitmux = (GstSplitMuxSink *) element;
3936 switch (transition) {
3937 case GST_STATE_CHANGE_NULL_TO_READY:{
3938 GST_SPLITMUX_LOCK (splitmux);
3939 if (!create_muxer (splitmux) || !create_sink (splitmux)) {
3940 ret = GST_STATE_CHANGE_FAILURE;
3941 GST_SPLITMUX_UNLOCK (splitmux);
3944 g_signal_emit (splitmux, signals[SIGNAL_MUXER_ADDED], 0, splitmux->muxer);
3945 g_signal_emit (splitmux, signals[SIGNAL_SINK_ADDED], 0, splitmux->sink);
3946 GST_SPLITMUX_UNLOCK (splitmux);
3947 splitmux->fragment_id = splitmux->start_index;
3950 case GST_STATE_CHANGE_READY_TO_PAUSED:{
3951 GST_SPLITMUX_LOCK (splitmux);
3952 /* Make sure contexts and tracking times are cleared, in case we're being reused */
3953 gst_splitmux_sink_reset (splitmux);
3954 /* Start by collecting one input on each pad */
3955 splitmux->input_state = SPLITMUX_INPUT_STATE_COLLECTING_GOP_START;
3956 splitmux->output_state = SPLITMUX_OUTPUT_STATE_START_NEXT_FILE;
3958 GST_SPLITMUX_UNLOCK (splitmux);
3960 GST_SPLITMUX_STATE_LOCK (splitmux);
3961 splitmux->shutdown = FALSE;
3962 GST_SPLITMUX_STATE_UNLOCK (splitmux);
3965 case GST_STATE_CHANGE_PAUSED_TO_READY:
3966 case GST_STATE_CHANGE_READY_TO_READY:
3967 g_atomic_int_set (&(splitmux->split_requested), FALSE);
3968 g_atomic_int_set (&(splitmux->do_split_next_gop), FALSE);
3970 case GST_STATE_CHANGE_READY_TO_NULL:
3971 GST_SPLITMUX_STATE_LOCK (splitmux);
3972 splitmux->shutdown = TRUE;
3973 GST_SPLITMUX_STATE_UNLOCK (splitmux);
3975 GST_SPLITMUX_LOCK (splitmux);
3976 gst_splitmux_sink_reset (splitmux);
3977 splitmux->output_state = SPLITMUX_OUTPUT_STATE_STOPPED;
3978 splitmux->input_state = SPLITMUX_INPUT_STATE_STOPPED;
3979 /* Wake up any blocked threads */
3980 GST_LOG_OBJECT (splitmux,
3981 "State change -> NULL or READY. Waking threads");
3982 GST_SPLITMUX_BROADCAST_INPUT (splitmux);
3983 GST_SPLITMUX_BROADCAST_OUTPUT (splitmux);
3984 GST_SPLITMUX_UNLOCK (splitmux);
3990 ret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
3991 if (ret == GST_STATE_CHANGE_FAILURE)
3994 switch (transition) {
3995 case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
3996 splitmux->need_async_start = TRUE;
3998 case GST_STATE_CHANGE_READY_TO_PAUSED:{
3999 /* Change state async, because our child sink might not
4000 * be ready to do that for us yet if it's state is still locked */
4002 splitmux->need_async_start = TRUE;
4003 /* we want to go async to PAUSED until we managed to configure and add the
4005 GST_SPLITMUX_LOCK (splitmux);
4006 do_async_start (splitmux);
4007 GST_SPLITMUX_UNLOCK (splitmux);
4008 ret = GST_STATE_CHANGE_ASYNC;
4011 case GST_STATE_CHANGE_READY_TO_NULL:
4012 GST_SPLITMUX_LOCK (splitmux);
4013 splitmux->fragment_id = 0;
4014 /* Reset internal elements only if no pad contexts are using them */
4015 if (splitmux->contexts == NULL)
4016 gst_splitmux_reset_elements (splitmux);
4017 do_async_done (splitmux);
4018 GST_SPLITMUX_UNLOCK (splitmux);
4027 if (transition == GST_STATE_CHANGE_NULL_TO_READY) {
4028 /* Cleanup elements on failed transition out of NULL */
4029 gst_splitmux_reset_elements (splitmux);
4030 GST_SPLITMUX_LOCK (splitmux);
4031 do_async_done (splitmux);
4032 GST_SPLITMUX_UNLOCK (splitmux);
4034 if (transition == GST_STATE_CHANGE_READY_TO_READY) {
4035 /* READY to READY transition only happens when we're already
4036 * in READY state, but a child element is in NULL, which
4037 * happens when there's an error changing the state of the sink.
4038 * We need to make sure not to fail the state transition, or
4039 * the core won't transition us back to NULL successfully */
4040 ret = GST_STATE_CHANGE_SUCCESS;
4046 gst_splitmux_sink_ensure_max_files (GstSplitMuxSink * splitmux)
4048 if (splitmux->max_files && splitmux->fragment_id >= splitmux->max_files) {
4049 splitmux->fragment_id = 0;
4054 split_now (GstSplitMuxSink * splitmux)
4056 g_atomic_int_set (&(splitmux->do_split_next_gop), TRUE);
4060 split_after (GstSplitMuxSink * splitmux)
4062 g_atomic_int_set (&(splitmux->split_requested), TRUE);
4066 split_at_running_time (GstSplitMuxSink * splitmux, GstClockTime split_time)
4068 gboolean send_keyframe_requests;
4070 GST_SPLITMUX_LOCK (splitmux);
4071 gst_queue_array_push_tail_struct (splitmux->times_to_split, &split_time);
4072 send_keyframe_requests = splitmux->send_keyframe_requests;
4073 GST_SPLITMUX_UNLOCK (splitmux);
4075 if (send_keyframe_requests) {
4077 gst_video_event_new_upstream_force_key_unit (split_time, TRUE, 0);
4078 GST_INFO_OBJECT (splitmux, "Requesting next keyframe at %" GST_TIME_FORMAT,
4079 GST_TIME_ARGS (split_time));
4080 if (!gst_pad_push_event (splitmux->reference_ctx->sinkpad, ev)) {
4081 GST_WARNING_OBJECT (splitmux,
4082 "Could not request keyframe at %" GST_TIME_FORMAT,
4083 GST_TIME_ARGS (split_time));