1 /* GStreamer Muxer bin that splits output stream by size/time
2 * Copyright (C) <2014-2019> Jan Schmidt <jan@centricular.com>
4 * This library is free software; you can redistribute it and/or
5 * modify it under the terms of the GNU Library General Public
6 * License as published by the Free Software Foundation; either
7 * version 2 of the License, or (at your option) any later version.
9 * This library is distributed in the hope that it will be useful,
10 * but WITHOUT ANY WARRANTY; without even the implied warranty of
11 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
12 * Library General Public License for more details.
14 * You should have received a copy of the GNU Library General Public
15 * License along with this library; if not, write to the
16 * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
17 * Boston, MA 02110-1301, USA.
21 * SECTION:element-splitmuxsink
22 * @title: splitmuxsink
23 * @short_description: Muxer wrapper for splitting output stream by size or time
25 * This element wraps a muxer and a sink, and starts a new file when the mux
26 * contents are about to cross a threshold of maximum size or maximum time,
27 * splitting at video keyframe boundaries. Exactly one input video stream
28 * can be muxed, with as many accompanying audio and subtitle streams as
31 * By default, it uses mp4mux and filesink, but they can be changed via
32 * the 'muxer' and 'sink' properties.
34 * The minimum file size is 1 GOP, however - so limits may be overrun if the
35 * distance between any 2 keyframes is larger than the limits.
37 * If a video stream is available, the splitting process is driven by the video
38 * stream contents, and the video stream must contain closed GOPs for the output
39 * file parts to be played individually correctly. In the absence of a video
40 * stream, the first available stream is used as reference for synchronization.
42 * In the async-finalize mode, when the threshold is crossed, the old muxer
43 * and sink is disconnected from the pipeline and left to finish the file
44 * asynchronously, and a new muxer and sink is created to continue with the
45 * next fragment. For that reason, instead of muxer and sink objects, the
46 * muxer-factory and sink-factory properties are used to construct the new
47 * objects, together with muxer-properties and sink-properties.
49 * ## Example pipelines
51 * gst-launch-1.0 -e v4l2src num-buffers=500 ! video/x-raw,width=320,height=240 ! videoconvert ! queue ! timeoverlay ! x264enc key-int-max=10 ! h264parse ! splitmuxsink location=video%02d.mov max-size-time=10000000000 max-size-bytes=1000000
53 * Records a video stream captured from a v4l2 device and muxes it into
54 * ISO mp4 files, splitting as needed to limit size/duration to 10 seconds
55 * and 1MB maximum size.
58 * gst-launch-1.0 -e v4l2src num-buffers=500 ! video/x-raw,width=320,height=240 ! videoconvert ! queue ! timeoverlay ! x264enc key-int-max=10 ! h264parse ! splitmuxsink location=video%02d.mkv max-size-time=10000000000 muxer-factory=matroskamux muxer-properties="properties,streamable=true"
60 * Records a video stream captured from a v4l2 device and muxes it into
61 * streamable Matroska files, splitting as needed to limit size/duration to 10
62 * seconds. Each file will finalize asynchronously.
65 * gst-launch-1.0 videotestsrc num-buffers=10 ! jpegenc ! .video splitmuxsink muxer=qtmux muxer-pad-map=x-pad-map,video=video_1 location=test%05d.mp4 -v
67 * Records 10 frames to an mp4 file, using a muxer-pad-map to make explicit mappings between the splitmuxsink sink pad and the corresponding muxer pad
76 #include <glib/gstdio.h>
77 #include <gst/video/video.h>
78 #include "gstsplitmuxsink.h"
/* Debug category for this file; GST_CAT_DEFAULT makes the GST_*_OBJECT
 * logging macros below use it implicitly. */
80 GST_DEBUG_CATEGORY_STATIC (splitmux_debug);
81 #define GST_CAT_DEFAULT splitmux_debug
/* Take / release the element's state_lock mutex. */
83 #define GST_SPLITMUX_STATE_LOCK(s) g_mutex_lock(&(s)->state_lock)
84 #define GST_SPLITMUX_STATE_UNLOCK(s) g_mutex_unlock(&(s)->state_lock)
/* Take / release the element's main mutex (the `lock` field). */
86 #define GST_SPLITMUX_LOCK(s) g_mutex_lock(&(s)->lock)
87 #define GST_SPLITMUX_UNLOCK(s) g_mutex_unlock(&(s)->lock)
/* Wait on / wake every waiter of input_cond. The wait is paired with the
 * main mutex, so GST_SPLITMUX_LOCK must be held when waiting. */
88 #define GST_SPLITMUX_WAIT_INPUT(s) g_cond_wait (&(s)->input_cond, &(s)->lock)
89 #define GST_SPLITMUX_BROADCAST_INPUT(s) g_cond_broadcast (&(s)->input_cond)
/* Same pair for output_cond, also paired with the main mutex. */
91 #define GST_SPLITMUX_WAIT_OUTPUT(s) g_cond_wait (&(s)->output_cond, &(s)->lock)
92 #define GST_SPLITMUX_BROADCAST_OUTPUT(s) g_cond_broadcast (&(s)->output_cond)
94 static void split_now (GstSplitMuxSink * splitmux);
95 static void split_after (GstSplitMuxSink * splitmux);
96 static void split_at_running_time (GstSplitMuxSink * splitmux,
97 GstClockTime split_time);
106 PROP_MAX_SIZE_TIMECODE,
107 PROP_SEND_KEYFRAME_REQUESTS,
110 PROP_USE_ROBUST_MUXING,
111 PROP_ALIGNMENT_THRESHOLD,
118 PROP_MUXER_PROPERTIES,
121 PROP_SINK_PROPERTIES,
/* Default property values. For max-size-time and max-size-bytes the
 * property blurbs define 0 as "disable". */
125 #define DEFAULT_MAX_SIZE_TIME 0
126 #define DEFAULT_MAX_SIZE_BYTES 0
127 #define DEFAULT_MAX_FILES 0
/* Fraction of extra size attributed to muxing (0.02 = 2%). */
128 #define DEFAULT_MUXER_OVERHEAD 0.02
129 #define DEFAULT_SEND_KEYFRAME_REQUESTS FALSE
130 #define DEFAULT_ALIGNMENT_THRESHOLD 0
/* Element factory names used when no muxer / sink is supplied. */
131 #define DEFAULT_MUXER "mp4mux"
132 #define DEFAULT_SINK "filesink"
133 #define DEFAULT_USE_ROBUST_MUXING FALSE
134 #define DEFAULT_RESET_MUXER TRUE
135 #define DEFAULT_ASYNC_FINALIZE FALSE
136 #define DEFAULT_START_INDEX 0
138 typedef struct _AsyncEosHelper
146 SIGNAL_FORMAT_LOCATION,
147 SIGNAL_FORMAT_LOCATION_FULL,
150 SIGNAL_SPLIT_AT_RUNNING_TIME,
156 static guint signals[SIGNAL_LAST];
158 static GstStaticPadTemplate video_sink_template =
159 GST_STATIC_PAD_TEMPLATE ("video",
162 GST_STATIC_CAPS_ANY);
163 static GstStaticPadTemplate video_aux_sink_template =
164 GST_STATIC_PAD_TEMPLATE ("video_aux_%u",
167 GST_STATIC_CAPS_ANY);
168 static GstStaticPadTemplate audio_sink_template =
169 GST_STATIC_PAD_TEMPLATE ("audio_%u",
172 GST_STATIC_CAPS_ANY);
173 static GstStaticPadTemplate subtitle_sink_template =
174 GST_STATIC_PAD_TEMPLATE ("subtitle_%u",
177 GST_STATIC_CAPS_ANY);
178 static GstStaticPadTemplate caption_sink_template =
179 GST_STATIC_PAD_TEMPLATE ("caption_%u",
182 GST_STATIC_CAPS_ANY);
184 static GQuark PAD_CONTEXT;
185 static GQuark EOS_FROM_US;
186 static GQuark RUNNING_TIME;
187 /* EOS_FROM_US is only valid in async-finalize mode. We need to know whether
188 * to forward an incoming EOS message, but we cannot rely on the state of the
189 * splitmux anymore, so we set this qdata on the sink instead.
190 * The muxer and sink must be destroyed after both of these things have
192 * 1) The EOS message has been sent when the fragment is ending
193 * 2) The muxer has been unlinked and relinked
194 * Therefore, EOS_FROM_US can have these two values:
195 * 0: EOS was not requested from us. Forward the message. The muxer and the
196 * sink will be destroyed together with the rest of the bin.
197 * 1: EOS was requested from us, but the other of the two tasks hasn't
198 * finished. Set EOS_FROM_US to 2 and do your stuff.
199 * 2: EOS was requested from us and the other of the two tasks has finished.
200 * Now we can destroy the muxer and the sink.
206 PAD_CONTEXT = g_quark_from_static_string ("pad-context");
207 EOS_FROM_US = g_quark_from_static_string ("eos-from-us");
208 RUNNING_TIME = g_quark_from_static_string ("running-time");
209 GST_DEBUG_CATEGORY_INIT (splitmux_debug, "splitmuxsink", 0,
210 "Split File Muxing Sink");
213 #define gst_splitmux_sink_parent_class parent_class
214 G_DEFINE_TYPE_EXTENDED (GstSplitMuxSink, gst_splitmux_sink, GST_TYPE_BIN, 0,
216 GST_ELEMENT_REGISTER_DEFINE (splitmuxsink, "splitmuxsink", GST_RANK_NONE,
217 GST_TYPE_SPLITMUX_SINK);
219 static gboolean create_muxer (GstSplitMuxSink * splitmux);
220 static gboolean create_sink (GstSplitMuxSink * splitmux);
221 static void gst_splitmux_sink_set_property (GObject * object, guint prop_id,
222 const GValue * value, GParamSpec * pspec);
223 static void gst_splitmux_sink_get_property (GObject * object, guint prop_id,
224 GValue * value, GParamSpec * pspec);
225 static void gst_splitmux_sink_dispose (GObject * object);
226 static void gst_splitmux_sink_finalize (GObject * object);
228 static GstPad *gst_splitmux_sink_request_new_pad (GstElement * element,
229 GstPadTemplate * templ, const gchar * name, const GstCaps * caps);
230 static void gst_splitmux_sink_release_pad (GstElement * element, GstPad * pad);
232 static GstStateChangeReturn gst_splitmux_sink_change_state (GstElement *
233 element, GstStateChange transition);
235 static void bus_handler (GstBin * bin, GstMessage * msg);
236 static void set_next_filename (GstSplitMuxSink * splitmux, MqStreamCtx * ctx);
237 static GstFlowReturn start_next_fragment (GstSplitMuxSink * splitmux,
239 static void mq_stream_ctx_free (MqStreamCtx * ctx);
240 static void grow_blocked_queues (GstSplitMuxSink * splitmux);
242 static void gst_splitmux_sink_ensure_max_files (GstSplitMuxSink * splitmux);
243 static GstElement *create_element (GstSplitMuxSink * splitmux,
244 const gchar * factory, const gchar * name, gboolean locked);
246 static void do_async_done (GstSplitMuxSink * splitmux);
248 static GstClockTime calculate_next_max_timecode (GstSplitMuxSink * splitmux,
249 const GstVideoTimeCode * cur_tc, GstClockTime running_time,
250 GstVideoTimeCode ** next_tc);
253 mq_stream_buf_new (void)
255 return g_slice_new0 (MqStreamBuf);
259 mq_stream_buf_free (MqStreamBuf * data)
261 g_slice_free (MqStreamBuf, data);
264 static SplitMuxOutputCommand *
265 out_cmd_buf_new (void)
267 return g_slice_new0 (SplitMuxOutputCommand);
271 out_cmd_buf_free (SplitMuxOutputCommand * data)
273 g_slice_free (SplitMuxOutputCommand, data);
277 input_gop_free (InputGop * gop)
279 g_clear_pointer (&gop->start_tc, gst_video_time_code_free);
280 g_slice_free (InputGop, gop);
284 gst_splitmux_sink_class_init (GstSplitMuxSinkClass * klass)
286 GObjectClass *gobject_class = (GObjectClass *) klass;
287 GstElementClass *gstelement_class = (GstElementClass *) klass;
288 GstBinClass *gstbin_class = (GstBinClass *) klass;
290 gobject_class->set_property = gst_splitmux_sink_set_property;
291 gobject_class->get_property = gst_splitmux_sink_get_property;
292 gobject_class->dispose = gst_splitmux_sink_dispose;
293 gobject_class->finalize = gst_splitmux_sink_finalize;
295 gst_element_class_set_static_metadata (gstelement_class,
296 "Split Muxing Bin", "Generic/Bin/Muxer",
297 "Convenience bin that muxes incoming streams into multiple time/size limited files",
298 "Jan Schmidt <jan@centricular.com>");
300 gst_element_class_add_static_pad_template (gstelement_class,
301 &video_sink_template);
302 gst_element_class_add_static_pad_template (gstelement_class,
303 &video_aux_sink_template);
304 gst_element_class_add_static_pad_template (gstelement_class,
305 &audio_sink_template);
306 gst_element_class_add_static_pad_template (gstelement_class,
307 &subtitle_sink_template);
308 gst_element_class_add_static_pad_template (gstelement_class,
309 &caption_sink_template);
311 gstelement_class->change_state =
312 GST_DEBUG_FUNCPTR (gst_splitmux_sink_change_state);
313 gstelement_class->request_new_pad =
314 GST_DEBUG_FUNCPTR (gst_splitmux_sink_request_new_pad);
315 gstelement_class->release_pad =
316 GST_DEBUG_FUNCPTR (gst_splitmux_sink_release_pad);
318 gstbin_class->handle_message = bus_handler;
320 g_object_class_install_property (gobject_class, PROP_LOCATION,
321 g_param_spec_string ("location", "File Output Pattern",
322 "Format string pattern for the location of the files to write (e.g. video%05d.mp4)",
323 NULL, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
324 g_object_class_install_property (gobject_class, PROP_MUXER_OVERHEAD,
325 g_param_spec_double ("mux-overhead", "Muxing Overhead",
326 "Extra size overhead of muxing (0.02 = 2%)", 0.0, 1.0,
327 DEFAULT_MUXER_OVERHEAD,
328 G_PARAM_READWRITE | GST_PARAM_CONTROLLABLE | G_PARAM_STATIC_STRINGS));
330 g_object_class_install_property (gobject_class, PROP_MAX_SIZE_TIME,
331 g_param_spec_uint64 ("max-size-time", "Max. size (ns)",
332 "Max. amount of time per file (in ns, 0=disable)", 0, G_MAXUINT64,
333 DEFAULT_MAX_SIZE_TIME,
334 G_PARAM_READWRITE | GST_PARAM_MUTABLE_READY |
335 G_PARAM_STATIC_STRINGS));
336 g_object_class_install_property (gobject_class, PROP_MAX_SIZE_BYTES,
337 g_param_spec_uint64 ("max-size-bytes", "Max. size bytes",
338 "Max. amount of data per file (in bytes, 0=disable)", 0, G_MAXUINT64,
339 DEFAULT_MAX_SIZE_BYTES,
340 G_PARAM_READWRITE | GST_PARAM_MUTABLE_READY |
341 G_PARAM_STATIC_STRINGS));
342 g_object_class_install_property (gobject_class, PROP_MAX_SIZE_TIMECODE,
343 g_param_spec_string ("max-size-timecode", "Maximum timecode difference",
344 "Maximum difference in timecode between first and last frame. "
345 "Separator is assumed to be \":\" everywhere (e.g. 01:00:00:00). "
346 "Will only be effective if a timecode track is present.", NULL,
347 G_PARAM_READWRITE | GST_PARAM_MUTABLE_READY |
348 G_PARAM_STATIC_STRINGS));
349 g_object_class_install_property (gobject_class, PROP_SEND_KEYFRAME_REQUESTS,
350 g_param_spec_boolean ("send-keyframe-requests",
351 "Request keyframes at max-size-time",
352 "Request a keyframe every max-size-time ns to try splitting at that point. "
353 "Needs max-size-bytes to be 0 in order to be effective.",
354 DEFAULT_SEND_KEYFRAME_REQUESTS,
355 G_PARAM_READWRITE | GST_PARAM_MUTABLE_READY |
356 G_PARAM_STATIC_STRINGS));
357 g_object_class_install_property (gobject_class, PROP_MAX_FILES,
358 g_param_spec_uint ("max-files", "Max files",
359 "Maximum number of files to keep on disk. Once the maximum is reached,"
360 "old files start to be deleted to make room for new ones.", 0,
361 G_MAXUINT, DEFAULT_MAX_FILES,
362 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
363 g_object_class_install_property (gobject_class, PROP_ALIGNMENT_THRESHOLD,
364 g_param_spec_uint64 ("alignment-threshold", "Alignment threshold (ns)",
365 "Allow non-reference streams to be that many ns before the reference"
366 " stream", 0, G_MAXUINT64, DEFAULT_ALIGNMENT_THRESHOLD,
367 G_PARAM_READWRITE | GST_PARAM_MUTABLE_READY |
368 G_PARAM_STATIC_STRINGS));
370 g_object_class_install_property (gobject_class, PROP_MUXER,
371 g_param_spec_object ("muxer", "Muxer",
372 "The muxer element to use (NULL = default mp4mux). "
373 "Valid only for async-finalize = FALSE",
374 GST_TYPE_ELEMENT, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
375 g_object_class_install_property (gobject_class, PROP_SINK,
376 g_param_spec_object ("sink", "Sink",
377 "The sink element (or element chain) to use (NULL = default filesink). "
378 "Valid only for async-finalize = FALSE",
379 GST_TYPE_ELEMENT, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
381 g_object_class_install_property (gobject_class, PROP_USE_ROBUST_MUXING,
382 g_param_spec_boolean ("use-robust-muxing",
383 "Support robust-muxing mode of some muxers",
384 "Check if muxers support robust muxing via the reserved-max-duration and "
385 "reserved-duration-remaining properties and use them if so. "
386 "(Only present on qtmux and mp4mux for now). splitmuxsink may then also "
387 " create new fragments if the reserved header space is about to overflow. "
388 "Note that for mp4mux and qtmux, reserved-moov-update-period must be set "
389 "manually by the app to a non-zero value for robust muxing to have an effect.",
390 DEFAULT_USE_ROBUST_MUXING,
391 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
393 g_object_class_install_property (gobject_class, PROP_RESET_MUXER,
394 g_param_spec_boolean ("reset-muxer",
396 "Reset the muxer after each segment. Disabling this will not work for most muxers.",
397 DEFAULT_RESET_MUXER, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
399 g_object_class_install_property (gobject_class, PROP_ASYNC_FINALIZE,
400 g_param_spec_boolean ("async-finalize",
401 "Finalize fragments asynchronously",
402 "Finalize each fragment asynchronously and start a new one",
403 DEFAULT_ASYNC_FINALIZE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
404 g_object_class_install_property (gobject_class, PROP_MUXER_FACTORY,
405 g_param_spec_string ("muxer-factory", "Muxer factory",
406 "The muxer element factory to use (default = mp4mux). "
407 "Valid only for async-finalize = TRUE",
408 "mp4mux", G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
410 * GstSplitMuxSink:muxer-preset
412 * An optional #GstPreset name to use for the muxer. This only has an effect
413 * in `async-finalize=TRUE` mode.
417 g_object_class_install_property (gobject_class, PROP_MUXER_PRESET,
418 g_param_spec_string ("muxer-preset", "Muxer preset",
419 "The muxer preset to use. "
420 "Valid only for async-finalize = TRUE",
421 NULL, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
422 g_object_class_install_property (gobject_class, PROP_MUXER_PROPERTIES,
423 g_param_spec_boxed ("muxer-properties", "Muxer properties",
424 "The muxer element properties to use. "
425 "Example: {properties,boolean-prop=true,string-prop=\"hi\"}. "
426 "Valid only for async-finalize = TRUE",
427 GST_TYPE_STRUCTURE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
428 g_object_class_install_property (gobject_class, PROP_SINK_FACTORY,
429 g_param_spec_string ("sink-factory", "Sink factory",
430 "The sink element factory to use (default = filesink). "
431 "Valid only for async-finalize = TRUE",
432 "filesink", G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
434 * GstSplitMuxSink:sink-preset
436 * An optional #GstPreset name to use for the sink. This only has an effect
437 * in `async-finalize=TRUE` mode.
441 g_object_class_install_property (gobject_class, PROP_SINK_PRESET,
442 g_param_spec_string ("sink-preset", "Sink preset",
443 "The sink preset to use. "
444 "Valid only for async-finalize = TRUE",
445 NULL, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
446 g_object_class_install_property (gobject_class, PROP_SINK_PROPERTIES,
447 g_param_spec_boxed ("sink-properties", "Sink properties",
448 "The sink element properties to use. "
449 "Example: {properties,boolean-prop=true,string-prop=\"hi\"}. "
450 "Valid only for async-finalize = TRUE",
451 GST_TYPE_STRUCTURE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
452 g_object_class_install_property (gobject_class, PROP_START_INDEX,
453 g_param_spec_int ("start-index", "Start Index",
454 "Start value of fragment index.",
455 0, G_MAXINT, DEFAULT_START_INDEX,
456 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
459 * GstSplitMuxSink::muxer-pad-map
461 * An optional GstStructure that provides a map from splitmuxsink sinkpad
462 * names to muxer pad names they should feed. Splitmuxsink has some default
463 * mapping behaviour to link video to video pads and audio to audio pads
464 * that usually works fine. This property is useful if you need to ensure
465 * a particular mapping to muxed streams.
467 * The GstStructure contains string fields like so:
468 * splitmuxsink muxer-pad-map=x-pad-map,video=video_1
472 g_object_class_install_property (G_OBJECT_CLASS (klass), PROP_MUXERPAD_MAP,
473 g_param_spec_boxed ("muxer-pad-map", "Muxer pad map",
474 "A GstStructure specifies the mapping from splitmuxsink sink pads to muxer pads",
476 (GParamFlags) (G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)));
479 * GstSplitMuxSink::format-location:
480 * @splitmux: the #GstSplitMuxSink
481 * @fragment_id: the sequence number of the file to be created
483 * Returns: the location to be used for the next output file. This must be
484 * a newly-allocated string which will be freed with g_free() by the
485 * splitmuxsink element when it no longer needs it, so use g_strdup() or
486 * g_strdup_printf() or similar functions to allocate it.
488 signals[SIGNAL_FORMAT_LOCATION] =
489 g_signal_new ("format-location", G_TYPE_FROM_CLASS (klass),
490 G_SIGNAL_RUN_LAST, 0, NULL, NULL, NULL, G_TYPE_STRING, 1, G_TYPE_UINT);
493 * GstSplitMuxSink::format-location-full:
494 * @splitmux: the #GstSplitMuxSink
495 * @fragment_id: the sequence number of the file to be created
496 * @first_sample: A #GstSample containing the first buffer
497 * from the reference stream in the new file
499 * Returns: the location to be used for the next output file. This must be
500 * a newly-allocated string which will be freed with g_free() by the
501 * splitmuxsink element when it no longer needs it, so use g_strdup() or
502 * g_strdup_printf() or similar functions to allocate it.
506 signals[SIGNAL_FORMAT_LOCATION_FULL] =
507 g_signal_new ("format-location-full", G_TYPE_FROM_CLASS (klass),
508 G_SIGNAL_RUN_LAST, 0, NULL, NULL, NULL, G_TYPE_STRING, 2, G_TYPE_UINT,
512 * GstSplitMuxSink::split-now:
513 * @splitmux: the #GstSplitMuxSink
515 * When called by the user, this action signal splits the video file (and begins a new one) immediately.
516 * The current GOP will be output to the new file.
520 signals[SIGNAL_SPLIT_NOW] =
521 g_signal_new ("split-now", G_TYPE_FROM_CLASS (klass),
522 G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION,
523 G_STRUCT_OFFSET (GstSplitMuxSinkClass, split_now), NULL, NULL, NULL,
527 * GstSplitMuxSink::split-after:
528 * @splitmux: the #GstSplitMuxSink
530 * When called by the user, this action signal splits the video file (and begins a new one) immediately.
531 * Unlike the 'split-now' signal, with 'split-after', the current GOP will be output to the old file.
535 signals[SIGNAL_SPLIT_AFTER] =
536 g_signal_new ("split-after", G_TYPE_FROM_CLASS (klass),
537 G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION,
538 G_STRUCT_OFFSET (GstSplitMuxSinkClass, split_after), NULL, NULL, NULL,
542 * GstSplitMuxSink::split-at-running-time:
543 * @splitmux: the #GstSplitMuxSink
545 * When called by the user, this action signal splits the video file (and
546 * begins a new one) as soon as the given running time is reached. If this
547 * action signal is called multiple times, running times are queued up and
548 * processed in the order they were given.
550 * Note that this is prone to race conditions, where said running time is
551 * reached and surpassed before we had a chance to split. The file will
552 * still split immediately, but in order to make sure that the split doesn't
553 * happen too late, it is recommended to call this action signal from
554 * something that will prevent further buffers from flowing into
555 * splitmuxsink before the split is completed, such as a pad probe before
561 signals[SIGNAL_SPLIT_AT_RUNNING_TIME] =
562 g_signal_new ("split-at-running-time", G_TYPE_FROM_CLASS (klass),
563 G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION,
564 G_STRUCT_OFFSET (GstSplitMuxSinkClass, split_at_running_time), NULL, NULL,
565 NULL, G_TYPE_NONE, 1, G_TYPE_UINT64);
568 * GstSplitMuxSink::muxer-added:
569 * @splitmux: the #GstSplitMuxSink
570 * @muxer: the newly added muxer element
574 signals[SIGNAL_MUXER_ADDED] =
575 g_signal_new ("muxer-added", G_TYPE_FROM_CLASS (klass),
576 G_SIGNAL_RUN_LAST, 0, NULL, NULL, NULL, G_TYPE_NONE, 1, GST_TYPE_ELEMENT);
579 * GstSplitMuxSink::sink-added:
580 * @splitmux: the #GstSplitMuxSink
581 * @sink: the newly added sink element
585 signals[SIGNAL_SINK_ADDED] =
586 g_signal_new ("sink-added", G_TYPE_FROM_CLASS (klass),
587 G_SIGNAL_RUN_LAST, 0, NULL, NULL, NULL, G_TYPE_NONE, 1, GST_TYPE_ELEMENT);
589 klass->split_now = split_now;
590 klass->split_after = split_after;
591 klass->split_at_running_time = split_at_running_time;
595 gst_splitmux_sink_init (GstSplitMuxSink * splitmux)
597 g_mutex_init (&splitmux->lock);
598 g_mutex_init (&splitmux->state_lock);
599 g_cond_init (&splitmux->input_cond);
600 g_cond_init (&splitmux->output_cond);
601 g_queue_init (&splitmux->out_cmd_q);
603 splitmux->mux_overhead = DEFAULT_MUXER_OVERHEAD;
604 splitmux->threshold_time = DEFAULT_MAX_SIZE_TIME;
605 splitmux->threshold_bytes = DEFAULT_MAX_SIZE_BYTES;
606 splitmux->max_files = DEFAULT_MAX_FILES;
607 splitmux->send_keyframe_requests = DEFAULT_SEND_KEYFRAME_REQUESTS;
608 splitmux->alignment_threshold = DEFAULT_ALIGNMENT_THRESHOLD;
609 splitmux->use_robust_muxing = DEFAULT_USE_ROBUST_MUXING;
610 splitmux->reset_muxer = DEFAULT_RESET_MUXER;
612 splitmux->threshold_timecode_str = NULL;
614 splitmux->async_finalize = DEFAULT_ASYNC_FINALIZE;
615 splitmux->muxer_factory = g_strdup (DEFAULT_MUXER);
616 splitmux->muxer_properties = NULL;
617 splitmux->sink_factory = g_strdup (DEFAULT_SINK);
618 splitmux->sink_properties = NULL;
620 GST_OBJECT_FLAG_SET (splitmux, GST_ELEMENT_FLAG_SINK);
621 splitmux->split_requested = FALSE;
622 splitmux->do_split_next_gop = FALSE;
623 splitmux->times_to_split = gst_queue_array_new_for_struct (8, 8);
624 splitmux->next_fku_time = GST_CLOCK_TIME_NONE;
626 g_queue_init (&splitmux->pending_input_gops);
630 gst_splitmux_reset_elements (GstSplitMuxSink * splitmux)
632 if (splitmux->muxer) {
633 gst_element_set_locked_state (splitmux->muxer, TRUE);
634 gst_element_set_state (splitmux->muxer, GST_STATE_NULL);
635 gst_bin_remove (GST_BIN (splitmux), splitmux->muxer);
637 if (splitmux->active_sink) {
638 gst_element_set_locked_state (splitmux->active_sink, TRUE);
639 gst_element_set_state (splitmux->active_sink, GST_STATE_NULL);
640 gst_bin_remove (GST_BIN (splitmux), splitmux->active_sink);
643 splitmux->sink = splitmux->active_sink = splitmux->muxer = NULL;
647 gst_splitmux_sink_dispose (GObject * object)
649 GstSplitMuxSink *splitmux = GST_SPLITMUX_SINK (object);
651 /* Calling parent dispose invalidates all child pointers */
652 splitmux->sink = splitmux->active_sink = splitmux->muxer = NULL;
654 G_OBJECT_CLASS (parent_class)->dispose (object);
658 gst_splitmux_sink_finalize (GObject * object)
660 GstSplitMuxSink *splitmux = GST_SPLITMUX_SINK (object);
662 g_cond_clear (&splitmux->input_cond);
663 g_cond_clear (&splitmux->output_cond);
664 g_mutex_clear (&splitmux->lock);
665 g_mutex_clear (&splitmux->state_lock);
666 g_queue_foreach (&splitmux->out_cmd_q, (GFunc) out_cmd_buf_free, NULL);
667 g_queue_clear (&splitmux->out_cmd_q);
668 g_queue_foreach (&splitmux->pending_input_gops, (GFunc) input_gop_free, NULL);
669 g_queue_clear (&splitmux->pending_input_gops);
671 g_clear_pointer (&splitmux->fragment_start_tc, gst_video_time_code_free);
673 if (splitmux->muxerpad_map)
674 gst_structure_free (splitmux->muxerpad_map);
676 if (splitmux->provided_sink)
677 gst_object_unref (splitmux->provided_sink);
678 if (splitmux->provided_muxer)
679 gst_object_unref (splitmux->provided_muxer);
681 if (splitmux->muxer_factory)
682 g_free (splitmux->muxer_factory);
683 if (splitmux->muxer_preset)
684 g_free (splitmux->muxer_preset);
685 if (splitmux->muxer_properties)
686 gst_structure_free (splitmux->muxer_properties);
687 if (splitmux->sink_factory)
688 g_free (splitmux->sink_factory);
689 if (splitmux->sink_preset)
690 g_free (splitmux->sink_preset);
691 if (splitmux->sink_properties)
692 gst_structure_free (splitmux->sink_properties);
694 if (splitmux->threshold_timecode_str)
695 g_free (splitmux->threshold_timecode_str);
696 if (splitmux->tc_interval)
697 gst_video_time_code_interval_free (splitmux->tc_interval);
699 if (splitmux->times_to_split)
700 gst_queue_array_free (splitmux->times_to_split);
702 g_free (splitmux->location);
704 /* Make sure to free any un-released contexts. There should not be any,
705 * because the dispose will have freed all request pads though */
706 g_list_foreach (splitmux->contexts, (GFunc) mq_stream_ctx_free, NULL);
707 g_list_free (splitmux->contexts);
709 G_OBJECT_CLASS (parent_class)->finalize (object);
713 * Set any time threshold to the muxer, if it has
714 * reserved-max-duration and reserved-duration-remaining
715 * properties. Called when creating/claiming the muxer
716 * in create_elements() */
718 update_muxer_properties (GstSplitMuxSink * sink)
721 GstClockTime threshold_time;
723 sink->muxer_has_reserved_props = FALSE;
724 if (sink->muxer == NULL)
726 klass = G_OBJECT_GET_CLASS (sink->muxer);
727 if (g_object_class_find_property (klass, "reserved-max-duration") == NULL)
729 if (g_object_class_find_property (klass,
730 "reserved-duration-remaining") == NULL)
732 sink->muxer_has_reserved_props = TRUE;
734 GST_LOG_OBJECT (sink, "Setting muxer reserved time to %" GST_TIME_FORMAT,
735 GST_TIME_ARGS (sink->threshold_time));
736 GST_OBJECT_LOCK (sink);
737 threshold_time = sink->threshold_time;
738 GST_OBJECT_UNLOCK (sink);
740 if (threshold_time > 0) {
741 /* Tell the muxer how much space to reserve */
742 GstClockTime muxer_threshold = threshold_time;
743 g_object_set (sink->muxer, "reserved-max-duration", muxer_threshold, NULL);
748 gst_splitmux_sink_set_property (GObject * object, guint prop_id,
749 const GValue * value, GParamSpec * pspec)
751 GstSplitMuxSink *splitmux = GST_SPLITMUX_SINK (object);
755 GST_OBJECT_LOCK (splitmux);
756 g_free (splitmux->location);
757 splitmux->location = g_value_dup_string (value);
758 GST_OBJECT_UNLOCK (splitmux);
761 case PROP_START_INDEX:
762 GST_OBJECT_LOCK (splitmux);
763 splitmux->start_index = g_value_get_int (value);
764 GST_OBJECT_UNLOCK (splitmux);
766 case PROP_MAX_SIZE_BYTES:
767 GST_OBJECT_LOCK (splitmux);
768 splitmux->threshold_bytes = g_value_get_uint64 (value);
769 GST_OBJECT_UNLOCK (splitmux);
771 case PROP_MAX_SIZE_TIME:
772 GST_OBJECT_LOCK (splitmux);
773 splitmux->threshold_time = g_value_get_uint64 (value);
774 GST_OBJECT_UNLOCK (splitmux);
776 case PROP_MAX_SIZE_TIMECODE:
777 GST_OBJECT_LOCK (splitmux);
778 g_free (splitmux->threshold_timecode_str);
779 /* will be calculated later */
780 g_clear_pointer (&splitmux->tc_interval,
781 gst_video_time_code_interval_free);
783 splitmux->threshold_timecode_str = g_value_dup_string (value);
784 if (splitmux->threshold_timecode_str) {
785 splitmux->tc_interval =
786 gst_video_time_code_interval_new_from_string
787 (splitmux->threshold_timecode_str);
788 if (!splitmux->tc_interval) {
789 g_warning ("Wrong timecode string %s",
790 splitmux->threshold_timecode_str);
791 g_free (splitmux->threshold_timecode_str);
792 splitmux->threshold_timecode_str = NULL;
795 splitmux->next_fragment_start_tc_time =
796 calculate_next_max_timecode (splitmux, splitmux->fragment_start_tc,
797 splitmux->fragment_start_time, NULL);
798 if (splitmux->tc_interval && splitmux->fragment_start_tc
799 && !GST_CLOCK_TIME_IS_VALID (splitmux->next_fragment_start_tc_time)) {
800 GST_WARNING_OBJECT (splitmux,
801 "Couldn't calculate next fragment start time for timecode mode");
803 GST_OBJECT_UNLOCK (splitmux);
805 case PROP_SEND_KEYFRAME_REQUESTS:
806 GST_OBJECT_LOCK (splitmux);
807 splitmux->send_keyframe_requests = g_value_get_boolean (value);
808 GST_OBJECT_UNLOCK (splitmux);
811 GST_OBJECT_LOCK (splitmux);
812 splitmux->max_files = g_value_get_uint (value);
813 GST_OBJECT_UNLOCK (splitmux);
815 case PROP_MUXER_OVERHEAD:
816 GST_OBJECT_LOCK (splitmux);
817 splitmux->mux_overhead = g_value_get_double (value);
818 GST_OBJECT_UNLOCK (splitmux);
820 case PROP_USE_ROBUST_MUXING:
821 GST_OBJECT_LOCK (splitmux);
822 splitmux->use_robust_muxing = g_value_get_boolean (value);
823 GST_OBJECT_UNLOCK (splitmux);
824 if (splitmux->use_robust_muxing)
825 update_muxer_properties (splitmux);
827 case PROP_ALIGNMENT_THRESHOLD:
828 GST_OBJECT_LOCK (splitmux);
829 splitmux->alignment_threshold = g_value_get_uint64 (value);
830 GST_OBJECT_UNLOCK (splitmux);
833 GST_OBJECT_LOCK (splitmux);
834 gst_clear_object (&splitmux->provided_sink);
835 splitmux->provided_sink = g_value_get_object (value);
836 if (splitmux->provided_sink)
837 gst_object_ref_sink (splitmux->provided_sink);
838 GST_OBJECT_UNLOCK (splitmux);
841 GST_OBJECT_LOCK (splitmux);
842 gst_clear_object (&splitmux->provided_muxer);
843 splitmux->provided_muxer = g_value_get_object (value);
844 if (splitmux->provided_muxer)
845 gst_object_ref_sink (splitmux->provided_muxer);
846 GST_OBJECT_UNLOCK (splitmux);
848 case PROP_RESET_MUXER:
849 GST_OBJECT_LOCK (splitmux);
850 splitmux->reset_muxer = g_value_get_boolean (value);
851 GST_OBJECT_UNLOCK (splitmux);
853 case PROP_ASYNC_FINALIZE:
854 GST_OBJECT_LOCK (splitmux);
855 splitmux->async_finalize = g_value_get_boolean (value);
856 GST_OBJECT_UNLOCK (splitmux);
858 case PROP_MUXER_FACTORY:
859 GST_OBJECT_LOCK (splitmux);
860 if (splitmux->muxer_factory)
861 g_free (splitmux->muxer_factory);
862 splitmux->muxer_factory = g_value_dup_string (value);
863 GST_OBJECT_UNLOCK (splitmux);
865 case PROP_MUXER_PRESET:
866 GST_OBJECT_LOCK (splitmux);
867 if (splitmux->muxer_preset)
868 g_free (splitmux->muxer_preset);
869 splitmux->muxer_preset = g_value_dup_string (value);
870 GST_OBJECT_UNLOCK (splitmux);
872 case PROP_MUXER_PROPERTIES:
873 GST_OBJECT_LOCK (splitmux);
874 if (splitmux->muxer_properties)
875 gst_structure_free (splitmux->muxer_properties);
876 if (gst_value_get_structure (value))
877 splitmux->muxer_properties =
878 gst_structure_copy (gst_value_get_structure (value));
880 splitmux->muxer_properties = NULL;
881 GST_OBJECT_UNLOCK (splitmux);
883 case PROP_SINK_FACTORY:
884 GST_OBJECT_LOCK (splitmux);
885 if (splitmux->sink_factory)
886 g_free (splitmux->sink_factory);
887 splitmux->sink_factory = g_value_dup_string (value);
888 GST_OBJECT_UNLOCK (splitmux);
890 case PROP_SINK_PRESET:
891 GST_OBJECT_LOCK (splitmux);
892 if (splitmux->sink_preset)
893 g_free (splitmux->sink_preset);
894 splitmux->sink_preset = g_value_dup_string (value);
895 GST_OBJECT_UNLOCK (splitmux);
897 case PROP_SINK_PROPERTIES:
898 GST_OBJECT_LOCK (splitmux);
899 if (splitmux->sink_properties)
900 gst_structure_free (splitmux->sink_properties);
901 if (gst_value_get_structure (value))
902 splitmux->sink_properties =
903 gst_structure_copy (gst_value_get_structure (value));
905 splitmux->sink_properties = NULL;
906 GST_OBJECT_UNLOCK (splitmux);
908 case PROP_MUXERPAD_MAP:
910 const GstStructure *s = gst_value_get_structure (value);
911 GST_SPLITMUX_LOCK (splitmux);
912 if (splitmux->muxerpad_map) {
913 gst_structure_free (splitmux->muxerpad_map);
916 splitmux->muxerpad_map = gst_structure_copy (s);
918 splitmux->muxerpad_map = NULL;
919 GST_SPLITMUX_UNLOCK (splitmux);
923 G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
929 gst_splitmux_sink_get_property (GObject * object, guint prop_id,
930 GValue * value, GParamSpec * pspec)
932 GstSplitMuxSink *splitmux = GST_SPLITMUX_SINK (object);
936 GST_OBJECT_LOCK (splitmux);
937 g_value_set_string (value, splitmux->location);
938 GST_OBJECT_UNLOCK (splitmux);
940 case PROP_START_INDEX:
941 GST_OBJECT_LOCK (splitmux);
942 g_value_set_int (value, splitmux->start_index);
943 GST_OBJECT_UNLOCK (splitmux);
945 case PROP_MAX_SIZE_BYTES:
946 GST_OBJECT_LOCK (splitmux);
947 g_value_set_uint64 (value, splitmux->threshold_bytes);
948 GST_OBJECT_UNLOCK (splitmux);
950 case PROP_MAX_SIZE_TIME:
951 GST_OBJECT_LOCK (splitmux);
952 g_value_set_uint64 (value, splitmux->threshold_time);
953 GST_OBJECT_UNLOCK (splitmux);
955 case PROP_MAX_SIZE_TIMECODE:
956 GST_OBJECT_LOCK (splitmux);
957 g_value_set_string (value, splitmux->threshold_timecode_str);
958 GST_OBJECT_UNLOCK (splitmux);
960 case PROP_SEND_KEYFRAME_REQUESTS:
961 GST_OBJECT_LOCK (splitmux);
962 g_value_set_boolean (value, splitmux->send_keyframe_requests);
963 GST_OBJECT_UNLOCK (splitmux);
966 GST_OBJECT_LOCK (splitmux);
967 g_value_set_uint (value, splitmux->max_files);
968 GST_OBJECT_UNLOCK (splitmux);
970 case PROP_MUXER_OVERHEAD:
971 GST_OBJECT_LOCK (splitmux);
972 g_value_set_double (value, splitmux->mux_overhead);
973 GST_OBJECT_UNLOCK (splitmux);
975 case PROP_USE_ROBUST_MUXING:
976 GST_OBJECT_LOCK (splitmux);
977 g_value_set_boolean (value, splitmux->use_robust_muxing);
978 GST_OBJECT_UNLOCK (splitmux);
980 case PROP_ALIGNMENT_THRESHOLD:
981 GST_OBJECT_LOCK (splitmux);
982 g_value_set_uint64 (value, splitmux->alignment_threshold);
983 GST_OBJECT_UNLOCK (splitmux);
986 GST_OBJECT_LOCK (splitmux);
987 g_value_set_object (value, splitmux->provided_sink);
988 GST_OBJECT_UNLOCK (splitmux);
991 GST_OBJECT_LOCK (splitmux);
992 g_value_set_object (value, splitmux->provided_muxer);
993 GST_OBJECT_UNLOCK (splitmux);
995 case PROP_RESET_MUXER:
996 GST_OBJECT_LOCK (splitmux);
997 g_value_set_boolean (value, splitmux->reset_muxer);
998 GST_OBJECT_UNLOCK (splitmux);
1000 case PROP_ASYNC_FINALIZE:
1001 GST_OBJECT_LOCK (splitmux);
1002 g_value_set_boolean (value, splitmux->async_finalize);
1003 GST_OBJECT_UNLOCK (splitmux);
1005 case PROP_MUXER_FACTORY:
1006 GST_OBJECT_LOCK (splitmux);
1007 g_value_set_string (value, splitmux->muxer_factory);
1008 GST_OBJECT_UNLOCK (splitmux);
1010 case PROP_MUXER_PRESET:
1011 GST_OBJECT_LOCK (splitmux);
1012 g_value_set_string (value, splitmux->muxer_preset);
1013 GST_OBJECT_UNLOCK (splitmux);
1015 case PROP_MUXER_PROPERTIES:
1016 GST_OBJECT_LOCK (splitmux);
1017 gst_value_set_structure (value, splitmux->muxer_properties);
1018 GST_OBJECT_UNLOCK (splitmux);
1020 case PROP_SINK_FACTORY:
1021 GST_OBJECT_LOCK (splitmux);
1022 g_value_set_string (value, splitmux->sink_factory);
1023 GST_OBJECT_UNLOCK (splitmux);
1025 case PROP_SINK_PRESET:
1026 GST_OBJECT_LOCK (splitmux);
1027 g_value_set_string (value, splitmux->sink_preset);
1028 GST_OBJECT_UNLOCK (splitmux);
1030 case PROP_SINK_PROPERTIES:
1031 GST_OBJECT_LOCK (splitmux);
1032 gst_value_set_structure (value, splitmux->sink_properties);
1033 GST_OBJECT_UNLOCK (splitmux);
1035 case PROP_MUXERPAD_MAP:
1036 GST_SPLITMUX_LOCK (splitmux);
1037 gst_value_set_structure (value, splitmux->muxerpad_map);
1038 GST_SPLITMUX_UNLOCK (splitmux);
1041 G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
1046 /* Convenience function */
1047 static inline GstClockTimeDiff
1048 my_segment_to_running_time (GstSegment * segment, GstClockTime val)
1050 GstClockTimeDiff res = GST_CLOCK_STIME_NONE;
1052 if (GST_CLOCK_TIME_IS_VALID (val)) {
1054 gst_segment_to_running_time_full (segment, GST_FORMAT_TIME, val, &val);
1064 mq_stream_ctx_reset (MqStreamCtx * ctx)
1066 gst_segment_init (&ctx->in_segment, GST_FORMAT_UNDEFINED);
1067 gst_segment_init (&ctx->out_segment, GST_FORMAT_UNDEFINED);
1068 ctx->in_running_time = ctx->out_running_time = GST_CLOCK_STIME_NONE;
1069 g_queue_foreach (&ctx->queued_bufs, (GFunc) mq_stream_buf_free, NULL);
1070 g_queue_clear (&ctx->queued_bufs);
1073 static MqStreamCtx *
1074 mq_stream_ctx_new (GstSplitMuxSink * splitmux)
1078 ctx = g_new0 (MqStreamCtx, 1);
1079 ctx->splitmux = splitmux;
1080 g_queue_init (&ctx->queued_bufs);
1081 mq_stream_ctx_reset (ctx);
1087 mq_stream_ctx_free (MqStreamCtx * ctx)
1090 GstObject *parent = gst_object_get_parent (GST_OBJECT (ctx->q));
1092 g_signal_handler_disconnect (ctx->q, ctx->q_overrun_id);
1094 if (parent == GST_OBJECT_CAST (ctx->splitmux)) {
1095 gst_element_set_locked_state (ctx->q, TRUE);
1096 gst_element_set_state (ctx->q, GST_STATE_NULL);
1097 gst_bin_remove (GST_BIN (ctx->splitmux), ctx->q);
1098 gst_object_unref (parent);
1100 gst_object_unref (ctx->q);
1102 gst_object_unref (ctx->sinkpad);
1103 gst_object_unref (ctx->srcpad);
1104 g_queue_foreach (&ctx->queued_bufs, (GFunc) mq_stream_buf_free, NULL);
1105 g_queue_clear (&ctx->queued_bufs);
1110 send_fragment_opened_closed_msg (GstSplitMuxSink * splitmux, gboolean opened,
1113 gchar *location = NULL;
1115 const gchar *msg_name = opened ?
1116 "splitmuxsink-fragment-opened" : "splitmuxsink-fragment-closed";
1117 GstClockTime running_time = splitmux->reference_ctx->out_running_time;
1120 GstClockTime *rtime = g_object_get_qdata (G_OBJECT (sink), RUNNING_TIME);
1122 running_time = *rtime;
1125 if (g_object_class_find_property (G_OBJECT_GET_CLASS (sink),
1126 "location") != NULL)
1127 g_object_get (sink, "location", &location, NULL);
1129 GST_DEBUG_OBJECT (splitmux,
1130 "Sending %s message. Running time %" GST_TIME_FORMAT " location %s",
1131 msg_name, GST_TIME_ARGS (running_time), GST_STR_NULL (location));
1133 /* If it's in the middle of a teardown, the reference_ctc might have become
1135 if (splitmux->reference_ctx) {
1136 msg = gst_message_new_element (GST_OBJECT (splitmux),
1137 gst_structure_new (msg_name,
1138 "location", G_TYPE_STRING, location,
1139 "running-time", GST_TYPE_CLOCK_TIME, running_time,
1140 "sink", GST_TYPE_ELEMENT, sink, NULL));
1141 gst_element_post_message (GST_ELEMENT_CAST (splitmux), msg);
1148 send_eos_async (GstSplitMuxSink * splitmux, AsyncEosHelper * helper)
1154 eos = gst_event_new_eos ();
1158 GST_SPLITMUX_LOCK (splitmux);
1160 pad = gst_pad_get_peer (ctx->srcpad);
1161 GST_SPLITMUX_UNLOCK (splitmux);
1163 gst_pad_send_event (pad, eos);
1164 GST_INFO_OBJECT (splitmux, "Sent async EOS on %" GST_PTR_FORMAT, pad);
1166 gst_object_unref (pad);
1170 /* Called with lock held, drops the lock to send EOS to the
1174 send_eos (GstSplitMuxSink * splitmux, MqStreamCtx * ctx)
1179 eos = gst_event_new_eos ();
1180 pad = gst_pad_get_peer (ctx->srcpad);
1182 ctx->out_eos = TRUE;
1184 GST_INFO_OBJECT (splitmux, "Sending EOS on %" GST_PTR_FORMAT, pad);
1185 GST_SPLITMUX_UNLOCK (splitmux);
1186 gst_pad_send_event (pad, eos);
1187 GST_SPLITMUX_LOCK (splitmux);
1189 gst_object_unref (pad);
1192 /* Called with lock held. Schedules an EOS event to the ctx pad
1193 * to happen in another thread */
1195 eos_context_async (MqStreamCtx * ctx, GstSplitMuxSink * splitmux)
1197 AsyncEosHelper *helper = g_new0 (AsyncEosHelper, 1);
1198 GstPad *srcpad, *sinkpad;
1200 srcpad = ctx->srcpad;
1201 sinkpad = gst_pad_get_peer (srcpad);
1204 helper->pad = sinkpad; /* Takes the reference */
1206 ctx->out_eos_async_done = TRUE;
1208 /* There used to be a bug here, where we had to explicitly remove
1209 * the SINK flag so that GstBin would ignore it for EOS purposes.
1210 * That fixed a race where if splitmuxsink really reaches EOS
1211 * before an asynchronous background element has finished, then
1212 * the bin wouldn't actually send EOS to the pipeline. Even after
1213 * finishing and removing the old element, the bin didn't re-check
1214 * EOS status on removing a SINK element. That bug was fixed
1216 GST_DEBUG_OBJECT (splitmux, "scheduled EOS to pad %" GST_PTR_FORMAT " ctx %p",
1219 g_assert_nonnull (helper->pad);
1220 gst_element_call_async (GST_ELEMENT (splitmux),
1221 (GstElementCallAsyncFunc) send_eos_async, helper, NULL);
1224 /* Called with lock held. TRUE iff all contexts have a
1225 * pending (or delivered) async eos event */
1227 all_contexts_are_async_eos (GstSplitMuxSink * splitmux)
1229 gboolean ret = TRUE;
1232 for (item = splitmux->contexts; item; item = item->next) {
1233 MqStreamCtx *ctx = item->data;
1234 ret &= ctx->out_eos_async_done;
/* Output-side gatekeeper for one stream context.
 *
 * Compares ctx->out_running_time against splitmux->max_out_running_time
 * (reduced by alignment_threshold when the limit is set and is not
 * G_MAXINT64) and otherwise acts on splitmux->output_state:
 *  - OUTPUT_GOP: switch to AWAITING_COMMAND and wake the output waiters.
 *  - ENDING_FILE / ENDING_STREAM: send EOS for this context unless it is
 *    already out_eos; with async_finalize the EOS is scheduled
 *    asynchronously and, once all contexts have one scheduled, the state
 *    moves to START_NEXT_FILE.
 *  - START_NEXT_FILE: the reference context calls start_next_fragment().
 *  - AWAITING_COMMAND: pop a command from out_cmd_q; it either moves the
 *    state to ENDING_FILE or raises max_out_running_time and moves to
 *    OUTPUT_GOP. The wait macro is used while the state stays
 *    AWAITING_COMMAND and the context is not flushing.
 *  - STOPPED: GST_FLOW_FLUSHING is returned.
 * When none of that applies the context logs that it is sleeping and
 * waits on the output condition.
 * NOTE(review): several control-flow lines (returns, continue/break, the
 * enclosing loop) are not visible in this excerpt - confirm the exact exit
 * paths against the full file. */
1239 /* Called with splitmux lock held to check if this output
1240 * context needs to sleep to wait for the release of the
1241 * next GOP, or to send EOS to close out the current file
1243 static GstFlowReturn
1244 complete_or_wait_on_out (GstSplitMuxSink * splitmux, MqStreamCtx * ctx)
1246 if (ctx->caps_change)
1250 /* When first starting up, the reference stream has to output
1251 * the first buffer to prepare the muxer and sink */
1252 gboolean can_output = (ctx->is_reference || splitmux->ready_for_output);
1253 GstClockTimeDiff my_max_out_running_time = splitmux->max_out_running_time;
/* Only adjust the limit when it is set and not the G_MAXINT64 sentinel */
1255 if (my_max_out_running_time != GST_CLOCK_STIME_NONE
1256 && my_max_out_running_time != G_MAXINT64) {
1257 my_max_out_running_time -= splitmux->alignment_threshold;
1258 GST_LOG_OBJECT (ctx->srcpad,
1259 "Max out running time currently %" GST_STIME_FORMAT
1260 ", with threshold applied it is %" GST_STIME_FORMAT,
1261 GST_STIME_ARGS (splitmux->max_out_running_time),
1262 GST_STIME_ARGS (my_max_out_running_time));
1266 || splitmux->output_state == SPLITMUX_OUTPUT_STATE_STOPPED)
1267 return GST_FLOW_FLUSHING;
1269 GST_LOG_OBJECT (ctx->srcpad,
1270 "Checking running time %" GST_STIME_FORMAT " against max %"
1271 GST_STIME_FORMAT, GST_STIME_ARGS (ctx->out_running_time),
1272 GST_STIME_ARGS (my_max_out_running_time));
/* Limit is set and this context is still below the adjusted limit */
1275 if (splitmux->max_out_running_time != GST_CLOCK_STIME_NONE &&
1276 ctx->out_running_time < my_max_out_running_time) {
1280 switch (splitmux->output_state) {
1281 case SPLITMUX_OUTPUT_STATE_OUTPUT_GOP:
1282 /* We only get here if we've finished outputting a GOP and need to know
1283 * what to do next */
1284 splitmux->output_state = SPLITMUX_OUTPUT_STATE_AWAITING_COMMAND;
1285 GST_SPLITMUX_BROADCAST_OUTPUT (splitmux);
1288 case SPLITMUX_OUTPUT_STATE_ENDING_FILE:
1289 case SPLITMUX_OUTPUT_STATE_ENDING_STREAM:
1290 /* We've reached the max out running_time to get here, so end this file now */
1291 if (ctx->out_eos == FALSE) {
1292 if (splitmux->async_finalize) {
1293 /* We must set EOS asynchronously at this point. We cannot defer
1294 * it, because we need all contexts to wake up, for the
1295 * reference context to eventually give us something at
1296 * START_NEXT_FILE. Otherwise, collectpads might choose another
1297 * context to give us the first buffer, and format-location-full
1298 * will not contain a valid sample. */
/* Tag the current sink so the resulting EOS can be recognised as ours */
1299 g_object_set_qdata ((GObject *) splitmux->sink, EOS_FROM_US,
1300 GINT_TO_POINTER (1));
1301 eos_context_async (ctx, splitmux);
1302 if (all_contexts_are_async_eos (splitmux)) {
1303 GST_INFO_OBJECT (splitmux,
1304 "All contexts are async_eos. Moving to the next file.");
1305 /* We can start the next file once we've asked each pad to go EOS */
1306 splitmux->output_state = SPLITMUX_OUTPUT_STATE_START_NEXT_FILE;
1307 GST_SPLITMUX_BROADCAST_OUTPUT (splitmux);
1311 send_eos (splitmux, ctx);
1315 GST_INFO_OBJECT (splitmux,
1316 "At end-of-file state, but context %p is already EOS", ctx);
1319 case SPLITMUX_OUTPUT_STATE_START_NEXT_FILE:
1320 if (ctx->is_reference) {
1321 GstFlowReturn ret = GST_FLOW_OK;
1323 /* Special handling on the reference ctx to start new fragments
1324 * and collect commands from the command queue */
1325 /* drops the splitmux lock briefly: */
1326 /* We must have reference ctx in order for format-location-full to
1328 ret = start_next_fragment (splitmux, ctx);
1329 if (ret != GST_FLOW_OK)
1335 case SPLITMUX_OUTPUT_STATE_AWAITING_COMMAND:{
1337 SplitMuxOutputCommand *cmd =
1338 g_queue_pop_tail (&splitmux->out_cmd_q);
1340 /* If we pop the last command, we need to make our queues bigger */
1341 if (g_queue_get_length (&splitmux->out_cmd_q) == 0)
1342 grow_blocked_queues (splitmux);
/* A start-new-fragment command only ends the file once some bytes have
 * been muxed into it; otherwise it is logged and ignored. */
1344 if (cmd->start_new_fragment) {
1345 if (splitmux->muxed_out_bytes > 0) {
1346 GST_DEBUG_OBJECT (splitmux, "Got cmd to start new fragment");
1347 splitmux->output_state = SPLITMUX_OUTPUT_STATE_ENDING_FILE;
1349 GST_DEBUG_OBJECT (splitmux,
1350 "Got cmd to start new fragment, but fragment is empty - ignoring.");
1353 GST_DEBUG_OBJECT (splitmux,
1354 "Got new output cmd for time %" GST_STIME_FORMAT,
1355 GST_STIME_ARGS (cmd->max_output_ts));
1357 /* Extend the output range immediately */
1358 if (splitmux->max_out_running_time == GST_CLOCK_STIME_NONE
1359 || cmd->max_output_ts > splitmux->max_out_running_time)
1360 splitmux->max_out_running_time = cmd->max_output_ts;
1361 GST_DEBUG_OBJECT (splitmux,
1362 "Max out running time now %" GST_STIME_FORMAT,
1363 GST_STIME_ARGS (splitmux->max_out_running_time));
1364 splitmux->output_state = SPLITMUX_OUTPUT_STATE_OUTPUT_GOP;
1366 GST_SPLITMUX_BROADCAST_OUTPUT (splitmux);
1368 out_cmd_buf_free (cmd);
1371 GST_SPLITMUX_WAIT_OUTPUT (splitmux);
1373 } while (!ctx->flushing && splitmux->output_state ==
1374 SPLITMUX_OUTPUT_STATE_AWAITING_COMMAND);
1375 /* loop and re-check the state */
1378 case SPLITMUX_OUTPUT_STATE_STOPPED:
1379 return GST_FLOW_FLUSHING;
1382 GST_LOG_OBJECT (ctx->srcpad, "Not yet ready for output");
/* Nothing to do for now: wait until the limit or the state changes */
1385 GST_INFO_OBJECT (ctx->srcpad,
1386 "Sleeping for running time %"
1387 GST_STIME_FORMAT " (max %" GST_STIME_FORMAT ") or state change.",
1388 GST_STIME_ARGS (ctx->out_running_time),
1389 GST_STIME_ARGS (splitmux->max_out_running_time));
1390 GST_SPLITMUX_WAIT_OUTPUT (splitmux);
1391 GST_INFO_OBJECT (ctx->srcpad,
1392 "Woken for new max running time %" GST_STIME_FORMAT,
1393 GST_STIME_ARGS (splitmux->max_out_running_time));
1401 calculate_next_max_timecode (GstSplitMuxSink * splitmux,
1402 const GstVideoTimeCode * cur_tc, GstClockTime running_time,
1403 GstVideoTimeCode ** next_tc)
1405 GstVideoTimeCode *target_tc;
1406 GstClockTime cur_tc_time, target_tc_time, next_max_tc_time;
1408 if (cur_tc == NULL || splitmux->tc_interval == NULL)
1409 return GST_CLOCK_TIME_NONE;
1411 target_tc = gst_video_time_code_add_interval (cur_tc, splitmux->tc_interval);
1413 GST_ELEMENT_ERROR (splitmux,
1414 STREAM, FAILED, (NULL), ("Couldn't calculate target timecode"));
1415 return GST_CLOCK_TIME_NONE;
1419 target_tc_time = gst_video_time_code_nsec_since_daily_jam (target_tc);
1420 cur_tc_time = gst_video_time_code_nsec_since_daily_jam (cur_tc);
1422 /* Add running_time, accounting for wraparound. */
1423 if (target_tc_time >= cur_tc_time) {
1424 next_max_tc_time = target_tc_time - cur_tc_time + running_time;
1426 GstClockTime day_in_ns = 24 * 60 * 60 * GST_SECOND;
1428 if ((cur_tc->config.flags & GST_VIDEO_TIME_CODE_FLAGS_DROP_FRAME) &&
1429 (cur_tc->config.fps_d == 1001)) {
1430 /* Checking fps_d is probably unneeded, but better safe than sorry
1431 * (e.g. someone accidentally set a flag) */
1432 GstVideoTimeCode *tc_for_offset;
1434 /* Here, the duration of the 24:00:00;00 timecode isn't exactly one day,
1435 * but slightly less. Calculate that duration from a fake timecode. The
1436 * problem is that 24:00:00;00 isn't a valid timecode, so the workaround
1437 * is to add one frame to 23:59:59;29 */
1439 gst_video_time_code_new (cur_tc->config.fps_n, cur_tc->config.fps_d,
1440 NULL, cur_tc->config.flags, 23, 59, 59,
1441 cur_tc->config.fps_n / cur_tc->config.fps_d, 0);
1443 gst_video_time_code_nsec_since_daily_jam (tc_for_offset) +
1444 gst_util_uint64_scale (GST_SECOND, cur_tc->config.fps_d,
1445 cur_tc->config.fps_n);
1446 gst_video_time_code_free (tc_for_offset);
1448 next_max_tc_time = day_in_ns - cur_tc_time + target_tc_time + running_time;
1451 #ifndef GST_DISABLE_GST_DEBUG
1453 gchar *next_max_tc_str, *cur_tc_str;
1455 cur_tc_str = gst_video_time_code_to_string (cur_tc);
1456 next_max_tc_str = gst_video_time_code_to_string (target_tc);
1458 GST_INFO_OBJECT (splitmux, "Next max timecode %s time: %" GST_TIME_FORMAT
1459 " from ref timecode %s time: %" GST_TIME_FORMAT,
1461 GST_TIME_ARGS (next_max_tc_time),
1462 cur_tc_str, GST_TIME_ARGS (cur_tc_time));
1464 g_free (next_max_tc_str);
1465 g_free (cur_tc_str);
1470 *next_tc = target_tc;
1472 gst_video_time_code_free (target_tc);
1474 return next_max_tc_time;
/* Possibly send an upstream force-key-unit request for the next fragment
 * boundary.
 *
 * Walks splitmux->pending_input_gops from the tail towards the head and
 * selects a GOP for which running_time_dts or the GOP's start PTS is unset,
 * or running_time_dts has reached the start PTS. The target keyframe time
 * is then either
 *  - timecode based: the value from calculate_next_max_timecode() minus a
 *    5 microsecond rounding allowance (when tc_interval is set and the GOP
 *    carries a valid start timecode), or
 *  - time based: the GOP's start_time plus threshold_time.
 * The target is compared with the previously stored
 * splitmux->next_fku_time. On the path that reaches the end, the GOP is
 * marked sent_fku, splitmux->next_fku_time is updated and the event is
 * pushed on the reference context's sink pad; the result of that push is
 * returned.
 * NOTE(review): the early-exit statements after the individual checks are
 * not visible in this excerpt - confirm their return values in the full
 * file. The buffer parameter is not referenced in the visible lines. */
1478 request_next_keyframe (GstSplitMuxSink * splitmux, GstBuffer * buffer,
1479 GstClockTimeDiff running_time_dts)
1482 GstClockTime target_time;
1483 gboolean timecode_based = FALSE;
1484 GstClockTime max_tc_time = GST_CLOCK_TIME_NONE;
1485 GstClockTime next_max_tc_time = GST_CLOCK_TIME_NONE;
1486 GstClockTime next_fku_time = GST_CLOCK_TIME_NONE;
/* Allowance subtracted from timecode-derived times to absorb rounding */
1487 GstClockTime tc_rounding_error = 5 * GST_USECOND;
1488 InputGop *newest_gop = NULL;
1491 if (!splitmux->send_keyframe_requests)
1494 /* Find the newest GOP where we passed in DTS the start PTS */
1495 for (l = splitmux->pending_input_gops.tail; l; l = l->prev) {
1496 InputGop *tmp = l->data;
1498 GST_TRACE_OBJECT (splitmux,
1499 "Having pending input GOP with start PTS %" GST_STIME_FORMAT
1500 " and start time %" GST_STIME_FORMAT,
1501 GST_STIME_ARGS (tmp->start_time_pts), GST_STIME_ARGS (tmp->start_time));
1503 if (tmp->sent_fku) {
1504 GST_DEBUG_OBJECT (splitmux,
1505 "Already checked for a keyframe request for this GOP");
/* Unset timestamps also qualify the GOP */
1509 if (running_time_dts == GST_CLOCK_STIME_NONE ||
1510 tmp->start_time_pts == GST_CLOCK_STIME_NONE ||
1511 running_time_dts >= tmp->start_time_pts) {
1512 GST_DEBUG_OBJECT (splitmux,
1513 "Using GOP with start PTS %" GST_STIME_FORMAT " and start time %"
1514 GST_STIME_FORMAT, GST_STIME_ARGS (tmp->start_time_pts),
1515 GST_STIME_ARGS (tmp->start_time));
1522 GST_DEBUG_OBJECT (splitmux, "Have no complete enough pending input GOP");
/* Timecode mode needs a valid start timecode on the selected GOP */
1526 if (splitmux->tc_interval) {
1527 if (newest_gop->start_tc
1528 && gst_video_time_code_is_valid (newest_gop->start_tc)) {
1529 GstVideoTimeCode *next_tc = NULL;
1531 calculate_next_max_timecode (splitmux, newest_gop->start_tc,
1532 newest_gop->start_time, &next_tc);
1534 /* calculate the next expected keyframe time to prevent too early fku
1536 if (GST_CLOCK_TIME_IS_VALID (max_tc_time) && next_tc) {
1538 calculate_next_max_timecode (splitmux, next_tc, max_tc_time, NULL);
1541 gst_video_time_code_free (next_tc);
1543 timecode_based = GST_CLOCK_TIME_IS_VALID (max_tc_time) &&
1544 GST_CLOCK_TIME_IS_VALID (next_max_tc_time);
1546 if (!timecode_based) {
1547 GST_WARNING_OBJECT (splitmux,
1548 "Couldn't calculate maximum fragment time for timecode mode");
1551 /* This can happen in the presence of GAP events that trigger
1552 * a new fragment start */
1553 GST_WARNING_OBJECT (splitmux,
1554 "No buffer available to calculate next timecode");
/* Condition: no time or timecode limit is active, or a byte limit is set */
1558 if ((splitmux->threshold_time == 0 && !timecode_based)
1559 || splitmux->threshold_bytes != 0)
1562 if (timecode_based) {
1563 /* We might have rounding errors: aim slightly earlier */
1564 if (max_tc_time >= tc_rounding_error) {
1565 target_time = max_tc_time - tc_rounding_error;
1567 /* unreliable target time */
1568 GST_DEBUG_OBJECT (splitmux, "tc time %" GST_TIME_FORMAT
1569 " is smaller than allowed rounding error, set it to zero",
1570 GST_TIME_ARGS (max_tc_time));
1574 if (next_max_tc_time >= tc_rounding_error) {
1575 next_fku_time = next_max_tc_time - tc_rounding_error;
1577 /* unreliable target time */
1578 GST_DEBUG_OBJECT (splitmux, "next tc time %" GST_TIME_FORMAT
1579 " is smaller than allowed rounding error, set it to zero",
1580 GST_TIME_ARGS (next_max_tc_time));
1584 target_time = newest_gop->start_time + splitmux->threshold_time;
/* Compare against the time stored by the previous request */
1587 if (GST_CLOCK_TIME_IS_VALID (splitmux->next_fku_time)) {
1588 GstClockTime allowed_time = splitmux->next_fku_time;
1590 if (timecode_based) {
1591 if (allowed_time >= tc_rounding_error) {
1592 allowed_time -= tc_rounding_error;
1594 /* unreliable next force key unit time */
1595 GST_DEBUG_OBJECT (splitmux, "expected next force key unit time %"
1597 " is smaller than allowed rounding error, set it to zero",
1598 GST_TIME_ARGS (splitmux->next_fku_time));
1603 if (target_time < allowed_time) {
1604 GST_LOG_OBJECT (splitmux, "Target time %" GST_TIME_FORMAT
1605 " is smaller than expected next keyframe time %" GST_TIME_FORMAT
1606 ", rounding error compensated next keyframe time %" GST_TIME_FORMAT,
1607 GST_TIME_ARGS (target_time),
1608 GST_TIME_ARGS (splitmux->next_fku_time),
1609 GST_TIME_ARGS (allowed_time));
1612 } else if (allowed_time != splitmux->next_fku_time &&
1613 target_time < splitmux->next_fku_time) {
1614 GST_DEBUG_OBJECT (splitmux, "Target time %" GST_TIME_FORMAT
1615 " is smaller than expected next keyframe time %" GST_TIME_FORMAT
1616 ", but the difference is smaller than allowed rounding error",
1617 GST_TIME_ARGS (target_time), GST_TIME_ARGS (splitmux->next_fku_time));
/* In time mode the following request is expected one threshold later */
1621 if (!timecode_based) {
1622 next_fku_time = target_time + splitmux->threshold_time;
1625 GST_INFO_OBJECT (splitmux, "Requesting keyframe at %" GST_TIME_FORMAT
1626 ", the next expected keyframe request time is %" GST_TIME_FORMAT,
1627 GST_TIME_ARGS (target_time), GST_TIME_ARGS (next_fku_time));
/* Remember that this GOP has been handled and when the next request is due */
1629 newest_gop->sent_fku = TRUE;
1631 splitmux->next_fku_time = next_fku_time;
1632 ev = gst_video_event_new_upstream_force_key_unit (target_time, TRUE, 0);
1634 return gst_pad_push_event (splitmux->reference_ctx->sinkpad, ev);
/* Pad probe for the output side of a stream context.
 *
 * Buffer lists are not supported: a warning is emitted and the data is
 * dropped.
 *
 * Downstream and flush events are dispatched on their type:
 *  - SEGMENT: copied into ctx->out_segment.
 *  - FLUSH_STOP: resets out_segment, frees the queued buffer records,
 *    zeroes queued_keyframes for the reference context and clears the
 *    flushing flag.
 *  - FLUSH_START: sets the flushing flag and wakes input and output
 *    waiters.
 *  - EOS (per the log message): marks the context out_eos; on the reference
 *    context the output state becomes ENDING_STREAM.
 *  - GAP: on the reference context, outside OUTPUT_GOP state, the event is
 *    stored in ctx->pending_gap and reported as handled; otherwise its
 *    running time becomes ctx->out_running_time and
 *    complete_or_wait_on_out() is called.
 *  - CUSTOM_DOWNSTREAM "splitmuxsink-unblock": takes the "timestamp" field
 *    as out_running_time, runs complete_or_wait_on_out() for non-reference
 *    contexts and drops the event.
 *  - CAPS: on the reference context the event is first sent to the peer;
 *    the visible failure handling sets caps_change and, unless already in
 *    START_NEXT_FILE, moves to ENDING_FILE.
 *
 * For buffers, the matching record is popped from ctx->queued_bufs (a
 * missing record yields GST_FLOW_FLUSHING), out_running_time is updated,
 * complete_or_wait_on_out() decides when the buffer may pass,
 * muxed_out_bytes is increased and any stored GAP event is sent to the
 * peer. The flow return is stored in the probe info.
 * NOTE(review): goto/break/label lines and some case labels are not
 * visible in this excerpt - confirm the exact exit paths in the full
 * file. */
1637 static GstPadProbeReturn
1638 handle_mq_output (GstPad * pad, GstPadProbeInfo * info, MqStreamCtx * ctx)
1640 GstSplitMuxSink *splitmux = ctx->splitmux;
1641 MqStreamBuf *buf_info = NULL;
1642 GstFlowReturn ret = GST_FLOW_OK;
1644 GST_LOG_OBJECT (pad, "Fired probe type 0x%x", info->type);
1646 /* FIXME: Handle buffer lists, until then make it clear they won't work */
1647 if (info->type & GST_PAD_PROBE_TYPE_BUFFER_LIST) {
1648 g_warning ("Buffer list handling not implemented");
1649 return GST_PAD_PROBE_DROP;
1651 if (info->type & GST_PAD_PROBE_TYPE_EVENT_DOWNSTREAM ||
1652 info->type & GST_PAD_PROBE_TYPE_EVENT_FLUSH) {
1653 GstEvent *event = gst_pad_probe_info_get_event (info);
/* wait starts out TRUE for every context except the reference one */
1654 gboolean locked = FALSE, wait = !ctx->is_reference;
1656 GST_LOG_OBJECT (pad, "Event %" GST_PTR_FORMAT, event);
1658 switch (GST_EVENT_TYPE (event)) {
1659 case GST_EVENT_SEGMENT:
1660 gst_event_copy_segment (event, &ctx->out_segment);
1662 case GST_EVENT_FLUSH_STOP:
1663 GST_SPLITMUX_LOCK (splitmux);
1665 gst_segment_init (&ctx->out_segment, GST_FORMAT_UNDEFINED);
1666 g_queue_foreach (&ctx->queued_bufs, (GFunc) mq_stream_buf_free, NULL);
1667 g_queue_clear (&ctx->queued_bufs);
/* NOTE(review): second clear of the same, already empty queue - redundant */
1668 g_queue_clear (&ctx->queued_bufs);
1669 /* If this is the reference context, we just threw away any queued keyframes */
1670 if (ctx->is_reference)
1671 splitmux->queued_keyframes = 0;
1672 ctx->flushing = FALSE;
1675 case GST_EVENT_FLUSH_START:
1676 GST_SPLITMUX_LOCK (splitmux);
1678 GST_LOG_OBJECT (pad, "Flush start");
1679 ctx->flushing = TRUE;
1680 GST_SPLITMUX_BROADCAST_INPUT (splitmux);
1681 GST_SPLITMUX_BROADCAST_OUTPUT (splitmux);
1684 GST_SPLITMUX_LOCK (splitmux);
1686 if (splitmux->output_state == SPLITMUX_OUTPUT_STATE_STOPPED)
1688 ctx->out_eos = TRUE;
/* On the reference context this moves the output state to ENDING_STREAM */
1690 if (ctx == splitmux->reference_ctx) {
1691 splitmux->output_state = SPLITMUX_OUTPUT_STATE_ENDING_STREAM;
1692 GST_SPLITMUX_BROADCAST_OUTPUT (splitmux);
1695 GST_INFO_OBJECT (splitmux,
1696 "Have EOS event at pad %" GST_PTR_FORMAT " ctx %p", pad, ctx);
1698 case GST_EVENT_GAP:{
1699 GstClockTime gap_ts;
1700 GstClockTimeDiff rtime;
1702 gst_event_parse_gap (event, &gap_ts, NULL);
1703 if (gap_ts == GST_CLOCK_TIME_NONE)
1706 GST_SPLITMUX_LOCK (splitmux);
1709 if (splitmux->output_state == SPLITMUX_OUTPUT_STATE_STOPPED)
1712 /* When we get a gap event on the
1713 * reference stream and we're trying to open a
1714 * new file, we need to store it until we get
1715 * the buffer afterwards
1717 if (ctx->is_reference &&
1718 (splitmux->output_state != SPLITMUX_OUTPUT_STATE_OUTPUT_GOP)) {
1719 GST_DEBUG_OBJECT (pad, "Storing GAP event until buffer arrives");
1720 gst_event_replace (&ctx->pending_gap, event);
1721 GST_SPLITMUX_UNLOCK (splitmux);
1722 return GST_PAD_PROBE_HANDLED;
1725 rtime = my_segment_to_running_time (&ctx->out_segment, gap_ts);
1727 GST_LOG_OBJECT (pad, "Have GAP w/ ts %" GST_STIME_FORMAT,
1728 GST_STIME_ARGS (rtime));
1730 if (rtime != GST_CLOCK_STIME_NONE) {
1731 ctx->out_running_time = rtime;
1732 complete_or_wait_on_out (splitmux, ctx);
1736 case GST_EVENT_CUSTOM_DOWNSTREAM:{
1737 const GstStructure *s;
1738 GstClockTimeDiff ts = 0;
1740 s = gst_event_get_structure (event);
1741 if (!gst_structure_has_name (s, "splitmuxsink-unblock"))
1744 gst_structure_get_int64 (s, "timestamp", &ts);
1746 GST_SPLITMUX_LOCK (splitmux);
1749 if (splitmux->output_state == SPLITMUX_OUTPUT_STATE_STOPPED)
1751 ctx->out_running_time = ts;
1752 if (!ctx->is_reference)
1753 ret = complete_or_wait_on_out (splitmux, ctx);
1754 GST_SPLITMUX_UNLOCK (splitmux);
1755 GST_PAD_PROBE_INFO_FLOW_RETURN (info) = ret;
1756 return GST_PAD_PROBE_DROP;
1758 case GST_EVENT_CAPS:{
1761 if (!ctx->is_reference)
1764 peer = gst_pad_get_peer (pad);
1766 gboolean ok = gst_pad_send_event (peer, gst_event_ref (event));
1768 gst_object_unref (peer);
1776 /* This is in the case the muxer doesn't allow this change of caps */
1777 GST_SPLITMUX_LOCK (splitmux);
1779 ctx->caps_change = TRUE;
1781 if (splitmux->output_state != SPLITMUX_OUTPUT_STATE_START_NEXT_FILE) {
1782 GST_DEBUG_OBJECT (splitmux,
1783 "New caps were not accepted. Switching output file");
1784 if (ctx->out_eos == FALSE) {
1785 splitmux->output_state = SPLITMUX_OUTPUT_STATE_ENDING_FILE;
1786 GST_SPLITMUX_BROADCAST_OUTPUT (splitmux);
1790 /* Lets it fall through, if it fails again, then the muxer just can't
1791 * support this format, but at least we have a closed file.
1799 /* We need to make sure events aren't passed
1800 * until the muxer / sink are ready for it */
1802 GST_SPLITMUX_LOCK (splitmux);
1804 ret = complete_or_wait_on_out (splitmux, ctx);
1805 GST_SPLITMUX_UNLOCK (splitmux);
1807 /* Don't try to forward sticky events before the next buffer is there
1808 * because it would cause a new file to be created without the first
1809 * buffer being available.
1811 GST_PAD_PROBE_INFO_FLOW_RETURN (info) = ret;
1812 if (ctx->caps_change && GST_EVENT_IS_STICKY (event)) {
1813 gst_event_unref (event);
1814 return GST_PAD_PROBE_HANDLED;
1816 return GST_PAD_PROBE_PASS;
1820 /* Allow everything through until the configured next stopping point */
1821 GST_SPLITMUX_LOCK (splitmux);
/* Buffer path: pop the record kept for this buffer from the context queue */
1823 buf_info = g_queue_pop_tail (&ctx->queued_bufs);
1824 if (buf_info == NULL) {
1825 /* Can only happen due to a poorly timed flush */
1826 ret = GST_FLOW_FLUSHING;
1830 /* If we have popped a keyframe, decrement the queued_gop count */
1831 if (buf_info->keyframe && splitmux->queued_keyframes > 0 && ctx->is_reference)
1832 splitmux->queued_keyframes--;
1834 ctx->out_running_time = buf_info->run_ts;
1835 ctx->cur_out_buffer = gst_pad_probe_info_get_buffer (info);
1837 GST_LOG_OBJECT (splitmux,
1838 "Pad %" GST_PTR_FORMAT " buffer with run TS %" GST_STIME_FORMAT
1839 " size %" G_GUINT64_FORMAT,
1840 pad, GST_STIME_ARGS (ctx->out_running_time), buf_info->buf_size);
/* A buffer has arrived, so the caps-change marker is cleared */
1842 ctx->caps_change = FALSE;
1844 ret = complete_or_wait_on_out (splitmux, ctx);
/* Count the buffer's size towards the bytes muxed so far */
1846 splitmux->muxed_out_bytes += buf_info->buf_size;
1848 #ifndef GST_DISABLE_GST_DEBUG
1850 GstBuffer *buf = gst_pad_probe_info_get_buffer (info);
1851 GST_LOG_OBJECT (pad, "Returning to pass buffer %" GST_PTR_FORMAT
1852 " run ts %" GST_STIME_FORMAT, buf,
1853 GST_STIME_ARGS (ctx->out_running_time));
1857 ctx->cur_out_buffer = NULL;
1858 GST_SPLITMUX_UNLOCK (splitmux);
1860 /* pending_gap is protected by the STREAM lock */
1861 if (ctx->pending_gap) {
1862 /* If we previously stored a gap event, send it now */
1863 GstPad *peer = gst_pad_get_peer (ctx->srcpad);
1865 GST_DEBUG_OBJECT (splitmux,
1866 "Pad %" GST_PTR_FORMAT " sending pending GAP event", ctx->srcpad);
/* The stored event is passed to the peer and the pointer is cleared */
1868 gst_pad_send_event (peer, ctx->pending_gap);
1869 ctx->pending_gap = NULL;
1871 gst_object_unref (peer);
1874 mq_stream_buf_free (buf_info);
1876 GST_PAD_PROBE_INFO_FLOW_RETURN (info) = ret;
1877 return GST_PAD_PROBE_PASS;
1880 GST_SPLITMUX_UNLOCK (splitmux);
1881 GST_PAD_PROBE_INFO_FLOW_RETURN (info) = ret;
1882 return GST_PAD_PROBE_DROP;
1886 resend_sticky (GstPad * pad, GstEvent ** event, GstPad * peer)
1888 return gst_pad_send_event (peer, gst_event_ref (*event));
1892 unlock_context (MqStreamCtx * ctx, GstSplitMuxSink * splitmux)
1894 if (ctx->fragment_block_id > 0) {
1895 gst_pad_remove_probe (ctx->srcpad, ctx->fragment_block_id);
1896 ctx->fragment_block_id = 0;
1901 restart_context (MqStreamCtx * ctx, GstSplitMuxSink * splitmux)
1903 GstPad *peer = gst_pad_get_peer (ctx->srcpad);
1905 gst_pad_sticky_events_foreach (ctx->srcpad,
1906 (GstPadStickyEventsForeachFunction) (resend_sticky), peer);
1908 /* Clear EOS flag if not actually EOS */
1909 ctx->out_eos = GST_PAD_IS_EOS (ctx->srcpad);
1910 ctx->out_eos_async_done = ctx->out_eos;
1912 gst_object_unref (peer);
1916 relink_context (MqStreamCtx * ctx, GstSplitMuxSink * splitmux)
1918 GstPad *sinkpad, *srcpad, *newpad;
1919 GstPadTemplate *templ;
1921 srcpad = ctx->srcpad;
1922 sinkpad = gst_pad_get_peer (srcpad);
1924 templ = sinkpad->padtemplate;
1926 gst_element_request_pad (splitmux->muxer, templ,
1927 GST_PAD_NAME (sinkpad), NULL);
1929 GST_DEBUG_OBJECT (splitmux, "Relinking ctx %p to pad %" GST_PTR_FORMAT, ctx,
1931 if (!gst_pad_unlink (srcpad, sinkpad)) {
1932 gst_object_unref (sinkpad);
1935 if (gst_pad_link_full (srcpad, newpad,
1936 GST_PAD_LINK_CHECK_NO_RECONFIGURE) != GST_PAD_LINK_OK) {
1937 gst_element_release_request_pad (splitmux->muxer, newpad);
1938 gst_object_unref (sinkpad);
1939 gst_object_unref (newpad);
1942 gst_object_unref (newpad);
1943 gst_object_unref (sinkpad);
1947 GST_ELEMENT_ERROR (splitmux, RESOURCE, SETTINGS,
1948 ("Could not create the new muxer/sink"), NULL);
1951 static GstPadProbeReturn
1952 _block_pad (GstPad * pad, GstPadProbeInfo * info, gpointer user_data)
1954 return GST_PAD_PROBE_OK;
1958 block_context (MqStreamCtx * ctx, GstSplitMuxSink * splitmux)
1960 ctx->fragment_block_id =
1961 gst_pad_add_probe (ctx->srcpad, GST_PAD_PROBE_TYPE_BLOCK, _block_pad,
1966 _set_property_from_structure (GQuark field_id, const GValue * value,
1969 const gchar *property_name = g_quark_to_string (field_id);
1970 GObject *element = G_OBJECT (user_data);
1972 g_object_set_property (element, property_name, value);
1978 _lock_and_set_to_null (GstElement * element, GstSplitMuxSink * splitmux)
1980 gst_element_set_locked_state (element, TRUE);
1981 gst_element_set_state (element, GST_STATE_NULL);
1982 GST_LOG_OBJECT (splitmux, "Removing old element %" GST_PTR_FORMAT, element);
1983 gst_bin_remove (GST_BIN (splitmux), element);
1988 _send_event (const GValue * value, gpointer user_data)
1990 GstPad *pad = g_value_get_object (value);
1991 GstEvent *ev = user_data;
1993 gst_pad_send_event (pad, gst_event_ref (ev));
/* start_next_fragment: switch the output side over to the next fragment.
 *
 *   splitmux - the splitmuxsink instance
 *   ctx      - the calling stream context; asserted to be the reference one
 *
 * Outline of what the visible code does:
 *  - marks the switch as in progress (switching_fragment) and trades the
 *    splitmux lock for the state lock;
 *  - in async-finalize mode, creates a new sink and muxer, relinks every
 *    stream context to the new muxer and links muxer to sink; otherwise
 *    resets the existing sink (and resets or flushes the existing muxer);
 *  - sets the next filename, zeroes muxed_out_bytes and brings sink and
 *    muxer to the target state of the bin;
 *  - unblocks and restarts every stream context, posts the fragment message
 *    and moves the output state to AWAITING_COMMAND.
 *
 * Returns GST_FLOW_FLUSHING when shutdown was requested and GST_FLOW_ERROR
 * from the three error exits at the end. The success return is not visible
 * in this view; presumably GST_FLOW_OK, which is what the caller in
 * handle_mq_input() compares against.
 *
 * Every visible exit re-takes the splitmux lock before returning. */
1996 /* Called with lock held when a fragment
1997 * reaches EOS and it is time to restart
2000 static GstFlowReturn
2001 start_next_fragment (GstSplitMuxSink * splitmux, MqStreamCtx * ctx)
2003 GstElement *muxer, *sink;
2005 g_assert (ctx->is_reference);
2007 /* 1 change to new file */
2008 splitmux->switching_fragment = TRUE;
2010 /* We need to drop the splitmux lock to acquire the state lock
2011 * here and ensure there's no racy state change going on elsewhere */
2012 muxer = gst_object_ref (splitmux->muxer);
2013 sink = gst_object_ref (splitmux->active_sink);
2015 GST_SPLITMUX_UNLOCK (splitmux);
2016 GST_SPLITMUX_STATE_LOCK (splitmux);
/* Shutdown path: restore the lock situation the function was entered with
 * and drop the two references taken above.
 * NOTE(review): switching_fragment is still TRUE on this exit - confirm that
 * it is reset elsewhere during shutdown. */
2018 if (splitmux->shutdown) {
2019 GST_DEBUG_OBJECT (splitmux,
2020 "Shutdown requested. Aborting fragment switch.");
2021 GST_SPLITMUX_LOCK (splitmux);
2022 GST_SPLITMUX_STATE_UNLOCK (splitmux);
2023 gst_object_unref (muxer);
2024 gst_object_unref (sink);
2025 return GST_FLOW_FLUSHING;
/* Async-finalize mode: a new muxer/sink pair is built, but only once
 * something has been muxed or the fragment id has moved past start_index. */
2028 if (splitmux->async_finalize) {
2029 if (splitmux->muxed_out_bytes > 0
2030 || splitmux->fragment_id != splitmux->start_index) {
2032 GstElement *new_sink, *new_muxer;
2034 GST_DEBUG_OBJECT (splitmux, "Starting fragment %u",
2035 splitmux->fragment_id);
/* Install a blocking probe on every context's source pad for the duration
 * of the swap; unlock_context() removes it again further down. */
2036 g_list_foreach (splitmux->contexts, (GFunc) block_context, splitmux);
2037 newname = g_strdup_printf ("sink_%u", splitmux->fragment_id);
/* The splitmux lock is held from here until both new elements are set up */
2038 GST_SPLITMUX_LOCK (splitmux);
2039 if ((splitmux->sink =
2040 create_element (splitmux, splitmux->sink_factory, newname,
2043 if (splitmux->sink_preset && GST_IS_PRESET (splitmux->sink))
2044 gst_preset_load_preset (GST_PRESET (splitmux->sink),
2045 splitmux->sink_preset);
2046 if (splitmux->sink_properties)
2047 gst_structure_foreach (splitmux->sink_properties,
2048 _set_property_from_structure, splitmux->sink);
2049 splitmux->active_sink = splitmux->sink;
2050 g_signal_emit (splitmux, signals[SIGNAL_SINK_ADDED], 0, splitmux->sink);
2052 newname = g_strdup_printf ("muxer_%u", splitmux->fragment_id);
2053 if ((splitmux->muxer =
2054 create_element (splitmux, splitmux->muxer_factory, newname,
2057 if (g_object_class_find_property (G_OBJECT_GET_CLASS (splitmux->sink),
2059 /* async child elements are causing state change races and weird
2060 * failures, so let's try and turn that off */
2061 g_object_set (splitmux->sink, "async", FALSE, NULL);
2063 if (splitmux->muxer_preset && GST_IS_PRESET (splitmux->muxer))
2064 gst_preset_load_preset (GST_PRESET (splitmux->muxer),
2065 splitmux->muxer_preset);
2066 if (splitmux->muxer_properties)
2067 gst_structure_foreach (splitmux->muxer_properties,
2068 _set_property_from_structure, splitmux->muxer);
2069 g_signal_emit (splitmux, signals[SIGNAL_MUXER_ADDED], 0, splitmux->muxer);
2071 new_sink = splitmux->sink;
2072 new_muxer = splitmux->muxer;
2073 GST_SPLITMUX_UNLOCK (splitmux);
/* Move every stream onto the new muxer, then connect the new pair */
2074 g_list_foreach (splitmux->contexts, (GFunc) relink_context, splitmux);
2075 gst_element_link (new_muxer, new_sink);
/* EOS_FROM_US qdata on the old sink: when it already holds 2, the old pair
 * is torn down right here; the other visible arm stores 2 instead.
 * bus_handler() performs the same check when the EOS message arrives, so
 * presumably whichever of the two runs second does the teardown. */
2077 if (g_object_get_qdata ((GObject *) sink, EOS_FROM_US)) {
2078 if (GPOINTER_TO_INT (g_object_get_qdata ((GObject *) sink,
2079 EOS_FROM_US)) == 2) {
2080 _lock_and_set_to_null (muxer, splitmux);
2081 _lock_and_set_to_null (sink, splitmux);
2083 g_object_set_qdata ((GObject *) sink, EOS_FROM_US,
2084 GINT_TO_POINTER (2));
/* The references on the old pair are dropped and new ones are taken.
 * Presumably muxer/sink are re-pointed at new_muxer/new_sink in between;
 * those assignments are not visible in this view. */
2087 gst_object_unref (muxer);
2088 gst_object_unref (sink);
2091 gst_object_ref (muxer);
2092 gst_object_ref (sink);
/* Reuse of the existing pair (presumably the non-async branch): lock the
 * state of both elements and take the sink down to NULL. */
2096 gst_element_set_locked_state (muxer, TRUE);
2097 gst_element_set_locked_state (sink, TRUE);
2098 gst_element_set_state (sink, GST_STATE_NULL);
/* With reset_muxer the muxer goes to NULL as well. The other visible arm
 * keeps it running and sends FLUSH_START followed by FLUSH_STOP, both with
 * the same seqnum, to each of its sink pads. */
2100 if (splitmux->reset_muxer) {
2101 gst_element_set_state (muxer, GST_STATE_NULL);
2103 GstIterator *it = gst_element_iterate_sink_pads (muxer);
2107 ev = gst_event_new_flush_start ();
2108 seqnum = gst_event_get_seqnum (ev);
2109 while (gst_iterator_foreach (it, _send_event, ev) == GST_ITERATOR_RESYNC);
2110 gst_event_unref (ev);
2112 gst_iterator_resync (it);
2114 ev = gst_event_new_flush_stop (TRUE);
2115 gst_event_set_seqnum (ev, seqnum);
2116 while (gst_iterator_foreach (it, _send_event, ev) == GST_ITERATOR_RESYNC);
2117 gst_event_unref (ev);
2119 gst_iterator_free (it);
/* Filename selection and the byte counter reset need the splitmux lock */
2123 GST_SPLITMUX_LOCK (splitmux);
2124 set_next_filename (splitmux, ctx);
2125 splitmux->muxed_out_bytes = 0;
2126 GST_SPLITMUX_UNLOCK (splitmux);
/* Start the sink first, then the muxer. A failed state change sends the
 * affected elements back to NULL and removes the state locks. */
2128 if (gst_element_set_state (sink,
2129 GST_STATE_TARGET (splitmux)) == GST_STATE_CHANGE_FAILURE) {
2130 gst_element_set_state (sink, GST_STATE_NULL);
2131 gst_element_set_locked_state (muxer, FALSE);
2132 gst_element_set_locked_state (sink, FALSE);
2137 if (gst_element_set_state (muxer,
2138 GST_STATE_TARGET (splitmux)) == GST_STATE_CHANGE_FAILURE) {
2139 gst_element_set_state (muxer, GST_STATE_NULL);
2140 gst_element_set_state (sink, GST_STATE_NULL);
2141 gst_element_set_locked_state (muxer, FALSE);
2142 gst_element_set_locked_state (sink, FALSE);
/* Both elements are running: remove the state locks again */
2146 gst_element_set_locked_state (muxer, FALSE);
2147 gst_element_set_locked_state (sink, FALSE);
/* NOTE(review): the local reference on sink is dropped here, yet sink is
 * still passed to send_fragment_opened_closed_msg() below. This relies on
 * another owner keeping the element alive - confirm. */
2149 gst_object_unref (sink);
2150 gst_object_unref (muxer);
/* Back to the splitmux lock; the switch is complete */
2152 GST_SPLITMUX_LOCK (splitmux);
2153 GST_SPLITMUX_STATE_UNLOCK (splitmux);
2154 splitmux->switching_fragment = FALSE;
2155 do_async_done (splitmux);
2157 splitmux->ready_for_output = TRUE;
/* Remove the blocking probes, then replay sticky events on every stream */
2159 g_list_foreach (splitmux->contexts, (GFunc) unlock_context, splitmux);
2160 g_list_foreach (splitmux->contexts, (GFunc) restart_context, splitmux);
2162 send_fragment_opened_closed_msg (splitmux, TRUE, sink);
2164 /* FIXME: Is this always the correct next state? */
2165 GST_LOG_OBJECT (splitmux, "Resetting state to AWAITING_COMMAND");
2166 splitmux->output_state = SPLITMUX_OUTPUT_STATE_AWAITING_COMMAND;
2167 GST_SPLITMUX_BROADCAST_OUTPUT (splitmux);
/* Error exit, element creation ("Could not create the new muxer/sink").
 * NOTE(review): unlike the two exits below, this one leaves
 * switching_fragment set - confirm whether that is intended.
 * NOTE(review): if the create_element() failure branches above jump here
 * (the jump statements are not visible in this view), the splitmux lock
 * taken before those calls is still held when it is taken again below -
 * verify. */
2171 gst_object_unref (sink);
2172 gst_object_unref (muxer);
2174 GST_SPLITMUX_LOCK (splitmux);
2175 GST_SPLITMUX_STATE_UNLOCK (splitmux);
2176 GST_ELEMENT_ERROR (splitmux, RESOURCE, SETTINGS,
2177 ("Could not create the new muxer/sink"), NULL);
2178 return GST_FLOW_ERROR;
/* Error exit, sink start failure */
2181 GST_ELEMENT_ERROR (splitmux, RESOURCE, SETTINGS,
2182 ("Could not start new output sink"), NULL);
2183 gst_object_unref (sink);
2184 gst_object_unref (muxer);
2186 GST_SPLITMUX_LOCK (splitmux);
2187 GST_SPLITMUX_STATE_UNLOCK (splitmux);
2188 splitmux->switching_fragment = FALSE;
2189 return GST_FLOW_ERROR;
/* Error exit, muxer start failure */
2192 GST_ELEMENT_ERROR (splitmux, RESOURCE, SETTINGS,
2193 ("Could not start new muxer"), NULL);
2194 gst_object_unref (sink);
2195 gst_object_unref (muxer);
2197 GST_SPLITMUX_LOCK (splitmux);
2198 GST_SPLITMUX_STATE_UNLOCK (splitmux);
2199 splitmux->switching_fragment = FALSE;
2200 return GST_FLOW_ERROR;
2204 bus_handler (GstBin * bin, GstMessage * message)
2206 GstSplitMuxSink *splitmux = GST_SPLITMUX_SINK (bin);
2208 switch (GST_MESSAGE_TYPE (message)) {
2209 case GST_MESSAGE_EOS:{
2210 /* If the state is draining out the current file, drop this EOS */
2213 sink = GST_ELEMENT (GST_MESSAGE_SRC (message));
2214 GST_SPLITMUX_LOCK (splitmux);
2216 send_fragment_opened_closed_msg (splitmux, FALSE, sink);
2218 if (splitmux->async_finalize) {
2220 if (g_object_get_qdata ((GObject *) sink, EOS_FROM_US)) {
2221 if (GPOINTER_TO_INT (g_object_get_qdata ((GObject *) sink,
2222 EOS_FROM_US)) == 2) {
2224 GstPad *sinksink, *muxersrc;
2226 sinksink = gst_element_get_static_pad (sink, "sink");
2227 muxersrc = gst_pad_get_peer (sinksink);
2228 muxer = gst_pad_get_parent_element (muxersrc);
2229 gst_object_unref (sinksink);
2230 gst_object_unref (muxersrc);
2232 gst_element_call_async (muxer,
2233 (GstElementCallAsyncFunc) _lock_and_set_to_null,
2234 gst_object_ref (splitmux), gst_object_unref);
2235 gst_element_call_async (sink,
2236 (GstElementCallAsyncFunc) _lock_and_set_to_null,
2237 gst_object_ref (splitmux), gst_object_unref);
2238 gst_object_unref (muxer);
2240 g_object_set_qdata ((GObject *) sink, EOS_FROM_US,
2241 GINT_TO_POINTER (2));
2243 GST_DEBUG_OBJECT (splitmux,
2244 "Caught async EOS from previous muxer+sink. Dropping.");
2245 /* We forward the EOS so that it gets aggregated as normal. If the sink
2246 * finishes and is removed before the end, it will be de-aggregated */
2247 gst_message_unref (message);
2248 GST_SPLITMUX_UNLOCK (splitmux);
2251 } else if (splitmux->output_state == SPLITMUX_OUTPUT_STATE_ENDING_STREAM) {
2252 GST_DEBUG_OBJECT (splitmux,
2253 "Passing EOS message. Output state %d max_out_running_time %"
2254 GST_STIME_FORMAT, splitmux->output_state,
2255 GST_STIME_ARGS (splitmux->max_out_running_time));
2257 GST_DEBUG_OBJECT (splitmux, "Caught EOS at end of fragment, dropping");
2258 splitmux->output_state = SPLITMUX_OUTPUT_STATE_START_NEXT_FILE;
2259 GST_SPLITMUX_BROADCAST_OUTPUT (splitmux);
2261 gst_message_unref (message);
2262 GST_SPLITMUX_UNLOCK (splitmux);
2265 GST_SPLITMUX_UNLOCK (splitmux);
2268 case GST_MESSAGE_ASYNC_START:
2269 case GST_MESSAGE_ASYNC_DONE:
2270 /* Ignore state changes from our children while switching */
2271 GST_SPLITMUX_LOCK (splitmux);
2272 if (splitmux->switching_fragment) {
2273 if (GST_MESSAGE_SRC (message) == (GstObject *) splitmux->active_sink
2274 || GST_MESSAGE_SRC (message) == (GstObject *) splitmux->muxer) {
2275 GST_LOG_OBJECT (splitmux,
2276 "Ignoring state change from child %" GST_PTR_FORMAT
2277 " while switching", GST_MESSAGE_SRC (message));
2278 gst_message_unref (message);
2279 GST_SPLITMUX_UNLOCK (splitmux);
2283 GST_SPLITMUX_UNLOCK (splitmux);
2285 case GST_MESSAGE_WARNING:
2287 GError *gerror = NULL;
2289 gst_message_parse_warning (message, &gerror, NULL);
2291 if (g_error_matches (gerror, GST_STREAM_ERROR, GST_STREAM_ERROR_FORMAT)) {
2293 gboolean caps_change = FALSE;
2295 GST_SPLITMUX_LOCK (splitmux);
2297 for (item = splitmux->contexts; item; item = item->next) {
2298 MqStreamCtx *ctx = item->data;
2300 if (ctx->caps_change) {
2306 GST_SPLITMUX_UNLOCK (splitmux);
2309 GST_LOG_OBJECT (splitmux,
2310 "Ignoring warning change from child %" GST_PTR_FORMAT
2311 " while switching caps", GST_MESSAGE_SRC (message));
2312 gst_message_unref (message);
2322 GST_BIN_CLASS (parent_class)->handle_message (bin, message);
2326 ctx_set_unblock (MqStreamCtx * ctx)
2328 ctx->need_unblock = TRUE;
2332 need_new_fragment (GstSplitMuxSink * splitmux,
2333 GstClockTime queued_time, GstClockTime queued_gop_time,
2334 guint64 queued_bytes)
2336 guint64 thresh_bytes;
2337 GstClockTime thresh_time;
2338 gboolean check_robust_muxing;
2339 GstClockTime time_to_split = GST_CLOCK_TIME_NONE;
2340 GstClockTime *ptr_to_time;
2341 const InputGop *gop, *next_gop;
2343 GST_OBJECT_LOCK (splitmux);
2344 thresh_bytes = splitmux->threshold_bytes;
2345 thresh_time = splitmux->threshold_time;
2346 ptr_to_time = (GstClockTime *)
2347 gst_queue_array_peek_head_struct (splitmux->times_to_split);
2349 time_to_split = *ptr_to_time;
2350 check_robust_muxing = splitmux->use_robust_muxing
2351 && splitmux->muxer_has_reserved_props;
2352 GST_OBJECT_UNLOCK (splitmux);
2354 /* Have we muxed at least one thing from the reference
2355 * stream into the file? If not, no other streams can have
2357 if (splitmux->fragment_reference_bytes <= 0) {
2358 GST_TRACE_OBJECT (splitmux,
2359 "Not ready to split - nothing muxed on the reference stream");
2363 /* User told us to split now */
2364 if (g_atomic_int_get (&(splitmux->do_split_next_gop)) == TRUE) {
2365 GST_TRACE_OBJECT (splitmux, "Forcing because split_next_gop is set");
2369 gop = g_queue_peek_head (&splitmux->pending_input_gops);
2370 /* We need a full GOP queued up at this point */
2371 g_assert (gop != NULL);
2372 next_gop = g_queue_peek_nth (&splitmux->pending_input_gops, 1);
2373 /* And the beginning of the next GOP or otherwise EOS */
2375 /* User told us to split at this running time */
2376 if (gop->start_time >= time_to_split) {
2377 GST_OBJECT_LOCK (splitmux);
2378 /* Dequeue running time */
2379 gst_queue_array_pop_head_struct (splitmux->times_to_split);
2380 /* Empty any running times after this that are past now */
2381 ptr_to_time = gst_queue_array_peek_head_struct (splitmux->times_to_split);
2382 while (ptr_to_time) {
2383 time_to_split = *ptr_to_time;
2384 if (gop->start_time < time_to_split) {
2387 gst_queue_array_pop_head_struct (splitmux->times_to_split);
2388 ptr_to_time = gst_queue_array_peek_head_struct (splitmux->times_to_split);
2390 GST_TRACE_OBJECT (splitmux,
2391 "GOP start time %" GST_STIME_FORMAT " is after requested split point %"
2392 GST_STIME_FORMAT, GST_STIME_ARGS (gop->start_time),
2393 GST_STIME_ARGS (time_to_split));
2394 GST_OBJECT_UNLOCK (splitmux);
2398 if (thresh_bytes > 0 && queued_bytes > thresh_bytes) {
2399 GST_TRACE_OBJECT (splitmux,
2400 "queued bytes %" G_GUINT64_FORMAT " overruns byte limit", queued_bytes);
2401 return TRUE; /* Would overrun byte limit */
2404 if (thresh_time > 0 && queued_time > thresh_time) {
2405 GST_TRACE_OBJECT (splitmux,
2406 "queued time %" GST_STIME_FORMAT " overruns time limit",
2407 GST_STIME_ARGS (queued_time));
2408 return TRUE; /* Would overrun time limit */
2411 if (splitmux->tc_interval) {
2412 GstClockTime next_gop_start_time =
2413 next_gop ? next_gop->start_time : splitmux->max_in_running_time;
2415 if (GST_CLOCK_TIME_IS_VALID (splitmux->next_fragment_start_tc_time) &&
2416 GST_CLOCK_STIME_IS_VALID (next_gop_start_time) &&
2417 next_gop_start_time >
2418 splitmux->next_fragment_start_tc_time + 5 * GST_USECOND) {
2419 GST_TRACE_OBJECT (splitmux,
2420 "in running time %" GST_STIME_FORMAT " overruns time limit %"
2421 GST_TIME_FORMAT, GST_STIME_ARGS (next_gop_start_time),
2422 GST_TIME_ARGS (splitmux->next_fragment_start_tc_time));
2427 if (check_robust_muxing) {
2428 GstClockTime mux_reserved_remain;
2430 g_object_get (splitmux->muxer,
2431 "reserved-duration-remaining", &mux_reserved_remain, NULL);
2433 GST_LOG_OBJECT (splitmux,
2434 "Muxer robust muxing report - %" G_GUINT64_FORMAT
2435 " remaining. New GOP would enqueue %" G_GUINT64_FORMAT,
2436 mux_reserved_remain, queued_gop_time);
2438 if (queued_gop_time >= mux_reserved_remain) {
2439 GST_INFO_OBJECT (splitmux,
2440 "File is about to run out of header room - %" G_GUINT64_FORMAT
2441 " remaining. New GOP would enqueue %" G_GUINT64_FORMAT
2442 ". Switching to new file", mux_reserved_remain, queued_gop_time);
2447 /* Continue and mux this GOP */
2451 /* probably we want to add this API? */
2453 video_time_code_replace (GstVideoTimeCode ** old_tc, GstVideoTimeCode * new_tc)
2455 GstVideoTimeCode *timecode = NULL;
2457 g_return_if_fail (old_tc != NULL);
2459 if (*old_tc == new_tc)
2463 timecode = gst_video_time_code_copy (new_tc);
2466 gst_video_time_code_free (*old_tc);
/* handle_gathered_gop: accounts for one fully collected GOP and releases it
 * to the output side.
 *
 *   splitmux             - the splitmuxsink instance
 *   gop                  - the collected GOP (byte counts and start times)
 *   next_gop_start_time  - running time at which the following GOP starts;
 *                          used as the end of the queued time range
 *   max_out_running_time - output limit attached to the GOP command;
 *                          G_MAXINT64 selects the FINISHING_UP input state
 *
 * Steps visible in the body:
 *  - estimates the bytes and time the fragment would hold with this GOP,
 *    with the byte estimate expanded by mux_overhead;
 *  - asks need_new_fragment(); on TRUE queues a start_new_fragment command
 *    and restarts the per-fragment accounting at this GOP;
 *  - sets the next input state, flags every context for unblocking and
 *    wakes the input side;
 *  - adds the GOP to the fragment counters and, when the GOP holds any
 *    bytes, queues an output command carrying max_out_running_time.
 *
 * Two error exits at the end post a STREAM/FAILED element error when the
 * computed times are negative or run backwards. Returns nothing. */
2471 /* Called with splitmux lock held */
2472 /* Called when entering ProcessingCompleteGop state
2473 * Assess if mq contents overflowed the current file
2474 * -> If yes, need to switch to new file
2475 * -> if no, set max_out_running_time to let this GOP in and
2476 * go to COLLECTING_GOP_START state
2479 handle_gathered_gop (GstSplitMuxSink * splitmux, const InputGop * gop,
2480 GstClockTimeDiff next_gop_start_time, GstClockTimeDiff max_out_running_time)
2482 guint64 queued_bytes;
2483 GstClockTimeDiff queued_time = 0;
2484 GstClockTimeDiff queued_gop_time = 0;
2485 SplitMuxOutputCommand *cmd;
2487 /* Assess if the multiqueue contents overflowed the current file */
2488 /* When considering if a newly gathered GOP overflows
2489 * the time limit for the file, only consider the running time of the
2490 * reference stream. Other streams might have run ahead a little bit,
2491 * but extra pieces won't be released to the muxer beyond the reference
2492 * stream cut-off anyway - so it forms the limit. */
2493 queued_bytes = splitmux->fragment_total_bytes + gop->total_bytes;
2494 queued_time = next_gop_start_time;
2495 /* queued_gop_time tracks how much unwritten data there is waiting to
2496 * be written to this fragment including this GOP */
2497 if (splitmux->reference_ctx->out_running_time != GST_CLOCK_STIME_NONE)
2498 queued_gop_time = queued_time - splitmux->reference_ctx->out_running_time;
2500 queued_gop_time = queued_time - gop->start_time;
2502 GST_LOG_OBJECT (splitmux, " queued_bytes %" G_GUINT64_FORMAT, queued_bytes);
2503 GST_LOG_OBJECT (splitmux, "mq at TS %" GST_STIME_FORMAT
2504 " bytes %" G_GUINT64_FORMAT " in next gop start time %" GST_STIME_FORMAT
2505 " gop start time %" GST_STIME_FORMAT,
2506 GST_STIME_ARGS (queued_time), queued_bytes,
2507 GST_STIME_ARGS (next_gop_start_time), GST_STIME_ARGS (gop->start_time));
/* Sanity checks on the timestamps; both jump to the error exits at the end */
2509 if (queued_gop_time < 0)
2510 goto error_gop_duration;
2512 if (queued_time < splitmux->fragment_start_time)
2513 goto error_queued_time;
/* From here on queued_time is relative to the start of the current
 * fragment, and the unwritten duration is capped to it */
2515 queued_time -= splitmux->fragment_start_time;
2516 if (queued_time < queued_gop_time)
2517 queued_gop_time = queued_time;
2519 /* Expand queued bytes estimate by muxer overhead */
2520 queued_bytes += (queued_bytes * splitmux->mux_overhead);
2522 /* Check for overrun - have we output at least one byte and overrun
2523 * either threshold? */
2524 if (need_new_fragment (splitmux, queued_time, queued_gop_time, queued_bytes)) {
/* Async-finalize mode: store the reference stream's current output running
 * time on the sink as RUNNING_TIME qdata; the qdata owns the allocation and
 * frees it with g_free */
2525 if (splitmux->async_finalize) {
2526 GstClockTime *sink_running_time = g_new (GstClockTime, 1);
2527 *sink_running_time = splitmux->reference_ctx->out_running_time;
2528 g_object_set_qdata_full (G_OBJECT (splitmux->sink),
2529 RUNNING_TIME, sink_running_time, g_free);
/* A forced split request is used up by this fragment switch */
2531 g_atomic_int_set (&(splitmux->do_split_next_gop), FALSE);
2532 /* Tell the output side to start a new fragment */
2533 GST_INFO_OBJECT (splitmux,
2534 "This GOP (dur %" GST_STIME_FORMAT
2535 ") would overflow the fragment, Sending start_new_fragment cmd",
2536 GST_STIME_ARGS (queued_gop_time));
2537 cmd = out_cmd_buf_new ();
2538 cmd->start_new_fragment = TRUE;
2539 g_queue_push_head (&splitmux->out_cmd_q, cmd);
2540 GST_SPLITMUX_BROADCAST_OUTPUT (splitmux);
/* The new fragment begins with this GOP: restart the fragment accounting */
2542 splitmux->fragment_start_time = gop->start_time;
2543 splitmux->fragment_start_time_pts = gop->start_time_pts;
2544 splitmux->fragment_total_bytes = 0;
2545 splitmux->fragment_reference_bytes = 0;
/* Keep a copy of the GOP's starting timecode and recompute when the next
 * timecode-based split is due */
2547 video_time_code_replace (&splitmux->fragment_start_tc, gop->start_tc);
2548 splitmux->next_fragment_start_tc_time =
2549 calculate_next_max_timecode (splitmux, splitmux->fragment_start_tc,
2550 splitmux->fragment_start_time, NULL);
2551 if (splitmux->tc_interval && splitmux->fragment_start_tc
2552 && !GST_CLOCK_TIME_IS_VALID (splitmux->next_fragment_start_tc_time)) {
2553 GST_WARNING_OBJECT (splitmux,
2554 "Couldn't calculate next fragment start time for timecode mode");
2558 /* And set up to collect the next GOP */
2559 if (max_out_running_time != G_MAXINT64) {
2560 splitmux->input_state = SPLITMUX_INPUT_STATE_COLLECTING_GOP_START;
2562 /* This is probably already the current state, but just in case: */
2563 splitmux->input_state = SPLITMUX_INPUT_STATE_FINISHING_UP;
2566 /* And wake all input contexts to send a wake-up event */
2567 g_list_foreach (splitmux->contexts, (GFunc) ctx_set_unblock, NULL);
2568 GST_SPLITMUX_BROADCAST_INPUT (splitmux);
2570 /* Now either way - either there was no overflow, or we requested a new fragment: release this GOP */
2571 splitmux->fragment_total_bytes += gop->total_bytes;
2572 splitmux->fragment_reference_bytes += gop->reference_bytes;
/* A GOP without any bytes produces no output command */
2574 if (gop->total_bytes > 0) {
2575 GST_LOG_OBJECT (splitmux,
2576 "Releasing GOP to output. Bytes in fragment now %" G_GUINT64_FORMAT
2577 " time %" GST_STIME_FORMAT,
2578 splitmux->fragment_total_bytes, GST_STIME_ARGS (queued_time));
2580 /* Send this GOP to the output command queue */
2581 cmd = out_cmd_buf_new ();
2582 cmd->start_new_fragment = FALSE;
2583 cmd->max_output_ts = max_out_running_time;
2584 GST_LOG_OBJECT (splitmux, "Sending GOP cmd to output for TS %"
2585 GST_STIME_FORMAT, GST_STIME_ARGS (max_out_running_time));
2586 g_queue_push_head (&splitmux->out_cmd_q, cmd);
2588 GST_SPLITMUX_BROADCAST_OUTPUT (splitmux);
/* Error exit: the unwritten duration came out negative */
2594 GST_ELEMENT_ERROR (splitmux,
2595 STREAM, FAILED, ("Timestamping error on input streams"),
2596 ("Queued GOP time is negative %" GST_STIME_FORMAT,
2597 GST_STIME_ARGS (queued_gop_time)));
/* Error exit: the next GOP starts before the start of the current fragment */
2600 GST_ELEMENT_ERROR (splitmux,
2601 STREAM, FAILED, ("Timestamping error on input streams"),
2602 ("Queued time is negative. Input went backwards. queued_time - %"
2603 GST_STIME_FORMAT, GST_STIME_ARGS (queued_time)));
/* check_completed_gop: checks whether the GOP at the head of
 * pending_input_gops has been collected on every stream and, if so, passes
 * it to handle_gathered_gop().
 *
 *   splitmux - the splitmuxsink instance
 *   ctx      - the stream context of the calling input pad
 *
 * Visible behaviour:
 *  - when ctx->need_unblock is set, a "splitmuxsink-unblock" event carrying
 *    max_in_running_time is sent on the context's sink pad with the splitmux
 *    lock released, and the flag is cleared;
 *  - while the input state is WAITING_GOP_COLLECT, the head GOP and the one
 *    after it are examined to find the running time every stream has to
 *    reach (the next GOP's start, or max_in_running_time when there is no
 *    next GOP);
 *  - every context is checked against that time; a context that is behind
 *    and not at EOS is logged as not ready;
 *  - a complete GOP is handed over, removed from the queue and freed, and a
 *    pending split_requested is turned into do_split_next_gop;
 *  - otherwise the caller may wait on the input condition; the loop repeats
 *    for as long as the input state stays WAITING_GOP_COLLECT.
 *
 * Several control-flow lines (loop head, early exits, the readiness test)
 * are not visible in this view; the notes below mark where that matters.
 * Returns nothing. */
2607 /* Called with splitmux lock held */
2608 /* Called from each input pad when it is has all the pieces
2609 * for a GOP or EOS, starting with the reference pad which has set the
2610 * splitmux->max_in_running_time
2613 check_completed_gop (GstSplitMuxSink * splitmux, MqStreamCtx * ctx)
2618 /* On ENDING_FILE, the reference stream sends a command to start a new
2619 * fragment, then releases the GOP for output in the new fragment.
2620 * If some streams received no buffer during the last GOP that overran,
2621 * because its next buffer has a timestamp bigger than
2622 * ctx->max_in_running_time, its queue is empty. In that case the only
2623 * way to wakeup the output thread is by injecting an event in the
2624 * queue. This usually happen with subtitle streams.
2625 * See https://bugzilla.gnome.org/show_bug.cgi?id=763711. */
2626 if (ctx->need_unblock) {
2627 GST_LOG_OBJECT (ctx->sinkpad, "Sending splitmuxsink-unblock event");
2628 event = gst_event_new_custom (GST_EVENT_CUSTOM_DOWNSTREAM |
2629 GST_EVENT_TYPE_SERIALIZED,
2630 gst_structure_new ("splitmuxsink-unblock", "timestamp",
2631 G_TYPE_INT64, splitmux->max_in_running_time, NULL));
/* The event is sent without holding the splitmux lock */
2633 GST_SPLITMUX_UNLOCK (splitmux);
2634 gst_pad_send_event (ctx->sinkpad, event);
2635 GST_SPLITMUX_LOCK (splitmux);
2637 ctx->need_unblock = FALSE;
2638 GST_SPLITMUX_BROADCAST_INPUT (splitmux);
2639 /* state may have changed while we were unlocked. Loop again if so */
2640 if (splitmux->input_state != SPLITMUX_INPUT_STATE_WAITING_GOP_COLLECT)
/* Start of the loop body; the loop condition is at the very end */
2645 GstClockTimeDiff next_gop_start = GST_CLOCK_STIME_NONE;
2647 if (splitmux->input_state == SPLITMUX_INPUT_STATE_WAITING_GOP_COLLECT) {
2648 GstClockTimeDiff max_out_running_time;
2649 gboolean ready = TRUE;
2651 const InputGop *next_gop;
/* Both GOPs are only peeked here; the head is dequeued after it has been
 * handled */
2653 gop = g_queue_peek_head (&splitmux->pending_input_gops);
2654 next_gop = g_queue_peek_nth (&splitmux->pending_input_gops, 1);
2656 /* If we have no GOP or no next GOP here then the reference context is
2657 * at EOS, otherwise use the start time of the next GOP if we're far
2658 * enough in the GOP to know it */
2659 if (gop && next_gop) {
2660 if (!splitmux->reference_ctx->in_eos
2661 && splitmux->max_in_running_time_dts != GST_CLOCK_STIME_NONE
2662 && splitmux->max_in_running_time_dts < next_gop->start_time_pts) {
2663 GST_LOG_OBJECT (splitmux,
2664 "No further GOPs finished collecting, waiting until current DTS %"
2665 GST_STIME_FORMAT " has passed next GOP start PTS %"
2667 GST_STIME_ARGS (splitmux->max_in_running_time_dts),
2668 GST_STIME_ARGS (next_gop->start_time_pts));
2672 GST_LOG_OBJECT (splitmux,
2673 "Finished collecting GOP with start time %" GST_STIME_FORMAT
2674 ", next GOP start time %" GST_STIME_FORMAT,
2675 GST_STIME_ARGS (gop->start_time),
2676 GST_STIME_ARGS (next_gop->start_time));
/* With the reference stream at EOS the output limit becomes unbounded */
2677 next_gop_start = next_gop->start_time;
2678 max_out_running_time =
2679 splitmux->reference_ctx->in_eos ? G_MAXINT64 : next_gop->start_time;
2680 } else if (!next_gop) {
2681 GST_LOG_OBJECT (splitmux, "Reference context is EOS");
2682 next_gop_start = splitmux->max_in_running_time;
2683 max_out_running_time = G_MAXINT64;
2685 GST_LOG_OBJECT (splitmux, "No further GOPs finished collecting");
2688 g_assert_not_reached ();
2691 g_assert (gop != NULL);
2693 /* Iterate each pad, and check that the input running time is at least
2694 * up to the start running time of the next GOP or EOS, and if so handle
2695 * the collected GOP */
2696 GST_LOG_OBJECT (splitmux, "Checking GOP collected, next GOP start %"
2697 GST_STIME_FORMAT " ctx %p", GST_STIME_ARGS (next_gop_start), ctx);
2698 for (cur = g_list_first (splitmux->contexts); cur != NULL;
2699 cur = g_list_next (cur)) {
2700 MqStreamCtx *tmpctx = (MqStreamCtx *) (cur->data);
2702 GST_LOG_OBJECT (splitmux,
2703 "Context %p sink pad %" GST_PTR_FORMAT " @ TS %" GST_STIME_FORMAT
2704 " EOS %d", tmpctx, tmpctx->sinkpad,
2705 GST_STIME_ARGS (tmpctx->in_running_time), tmpctx->in_eos);
/* A context that has not reached the next GOP start and is not at EOS
 * holds the GOP back (presumably by clearing 'ready'; that line is not
 * visible in this view) */
2707 if (next_gop_start != GST_CLOCK_STIME_NONE &&
2708 tmpctx->in_running_time < next_gop_start && !tmpctx->in_eos) {
2709 GST_LOG_OBJECT (splitmux,
2710 "Context %p sink pad %" GST_PTR_FORMAT " not ready. We'll sleep",
2711 tmpctx, tmpctx->sinkpad);
/* Reached once the GOP counts as complete; the guarding condition is not
 * visible in this view - presumably the 'ready' flag */
2717 GST_DEBUG_OBJECT (splitmux,
2718 "Collected GOP is complete. Processing (ctx %p)", ctx);
2719 /* All pads have a complete GOP, release it into the multiqueue */
2720 handle_gathered_gop (splitmux, gop, next_gop_start,
2721 max_out_running_time);
/* The GOP has been handed over: drop it from the pending queue and free it */
2723 g_queue_pop_head (&splitmux->pending_input_gops);
2724 input_gop_free (gop);
2726 /* The user has requested a split, we can split now that the previous GOP
2727 * has been collected to the correct location */
2728 if (g_atomic_int_compare_and_exchange (&(splitmux->split_requested),
2730 g_atomic_int_set (&(splitmux->do_split_next_gop), TRUE);
2735 /* If upstream reached EOS we are not expecting more data, no need to wait
2740 if (splitmux->input_state == SPLITMUX_INPUT_STATE_WAITING_GOP_COLLECT &&
2742 ctx->in_running_time >= next_gop_start &&
2743 next_gop_start != GST_CLOCK_STIME_NONE) {
2744 /* Some pad is not yet ready, or GOP is being pushed
2745 * either way, sleep and wait to get woken */
2746 GST_LOG_OBJECT (splitmux, "Sleeping for GOP collection (ctx %p)", ctx);
2747 GST_SPLITMUX_WAIT_INPUT (splitmux);
2748 GST_LOG_OBJECT (splitmux, "Done waiting for complete GOP (ctx %p)", ctx);
2750 /* This pad is not ready or the state changed - break out and get another
2754 } while (splitmux->input_state == SPLITMUX_INPUT_STATE_WAITING_GOP_COLLECT);
2757 static GstPadProbeReturn
2758 handle_mq_input (GstPad * pad, GstPadProbeInfo * info, MqStreamCtx * ctx)
2760 GstSplitMuxSink *splitmux = ctx->splitmux;
2761 GstFlowReturn ret = GST_FLOW_OK;
2763 MqStreamBuf *buf_info = NULL;
2764 GstClockTime ts, pts, dts;
2765 GstClockTimeDiff running_time, running_time_pts, running_time_dts;
2766 gboolean loop_again;
2767 gboolean keyframe = FALSE;
2769 GST_LOG_OBJECT (pad, "Fired probe type 0x%x", info->type);
2771 /* FIXME: Handle buffer lists, until then make it clear they won't work */
2772 if (info->type & GST_PAD_PROBE_TYPE_BUFFER_LIST) {
2773 g_warning ("Buffer list handling not implemented");
2774 return GST_PAD_PROBE_DROP;
2776 if (info->type & GST_PAD_PROBE_TYPE_EVENT_DOWNSTREAM ||
2777 info->type & GST_PAD_PROBE_TYPE_EVENT_FLUSH) {
2778 GstEvent *event = gst_pad_probe_info_get_event (info);
2780 GST_LOG_OBJECT (pad, "Event %" GST_PTR_FORMAT, event);
2782 switch (GST_EVENT_TYPE (event)) {
2783 case GST_EVENT_SEGMENT:
2784 gst_event_copy_segment (event, &ctx->in_segment);
2786 case GST_EVENT_FLUSH_STOP:
2787 GST_SPLITMUX_LOCK (splitmux);
2788 gst_segment_init (&ctx->in_segment, GST_FORMAT_UNDEFINED);
2789 ctx->in_eos = FALSE;
2790 ctx->in_running_time = GST_CLOCK_STIME_NONE;
2791 GST_SPLITMUX_UNLOCK (splitmux);
2794 GST_SPLITMUX_LOCK (splitmux);
2797 if (splitmux->input_state == SPLITMUX_INPUT_STATE_STOPPED) {
2798 ret = GST_FLOW_FLUSHING;
2802 if (ctx->is_reference) {
2803 GST_INFO_OBJECT (splitmux, "Got Reference EOS. Finishing up");
2804 /* check_completed_gop will act as if this is a new keyframe with infinite timestamp */
2805 splitmux->input_state = SPLITMUX_INPUT_STATE_WAITING_GOP_COLLECT;
2806 /* Wake up other input pads to collect this GOP */
2807 GST_SPLITMUX_BROADCAST_INPUT (splitmux);
2808 if (g_queue_is_empty (&splitmux->pending_input_gops)) {
2809 GST_WARNING_OBJECT (splitmux,
2810 "EOS with no buffers received on the reference pad");
2812 /* - child muxer and sink might be still locked state
2813 * (see gst_splitmux_reset_elements()) so should be unlocked
2814 * for state change of splitmuxsink to be applied to child
2815 * - would need to post async done message
2816 * - location on sink element is still null then it will post
2817 * error message on bus (muxer will produce something, header
2820 * Calls start_next_fragment() here, the method will address
2821 * everything the above mentioned one */
2822 ret = start_next_fragment (splitmux, ctx);
2823 if (ret != GST_FLOW_OK)
2826 check_completed_gop (splitmux, ctx);
2828 } else if (splitmux->input_state ==
2829 SPLITMUX_INPUT_STATE_WAITING_GOP_COLLECT) {
2830 /* If we are waiting for a GOP to be completed (ie, for aux
2831 * pads to catch up), then this pad is complete, so check
2832 * if the whole GOP is.
2834 if (!g_queue_is_empty (&splitmux->pending_input_gops))
2835 check_completed_gop (splitmux, ctx);
2837 GST_SPLITMUX_UNLOCK (splitmux);
2839 case GST_EVENT_GAP:{
2840 GstClockTime gap_ts;
2841 GstClockTimeDiff rtime;
2843 gst_event_parse_gap (event, &gap_ts, NULL);
2844 if (gap_ts == GST_CLOCK_TIME_NONE)
2847 GST_SPLITMUX_LOCK (splitmux);
2849 if (splitmux->input_state == SPLITMUX_INPUT_STATE_STOPPED) {
2850 ret = GST_FLOW_FLUSHING;
2853 rtime = my_segment_to_running_time (&ctx->in_segment, gap_ts);
2855 GST_LOG_OBJECT (pad, "Have GAP w/ ts %" GST_STIME_FORMAT,
2856 GST_STIME_ARGS (rtime));
2858 if (ctx->is_reference && GST_CLOCK_STIME_IS_VALID (rtime)) {
2859 /* If this GAP event happens before the first fragment then
2860 * initialize the fragment start time here. */
2861 if (!GST_CLOCK_STIME_IS_VALID (splitmux->fragment_start_time)) {
2862 splitmux->fragment_start_time = rtime;
2863 GST_LOG_OBJECT (splitmux,
2864 "Fragment start time now %" GST_STIME_FORMAT,
2865 GST_STIME_ARGS (splitmux->fragment_start_time));
2867 /* Also take this as the first start time when starting up,
2868 * so that we start counting overflow from the first frame */
2869 if (!GST_CLOCK_STIME_IS_VALID (splitmux->max_in_running_time))
2870 splitmux->max_in_running_time = rtime;
2871 if (!GST_CLOCK_STIME_IS_VALID (splitmux->max_in_running_time_dts))
2872 splitmux->max_in_running_time_dts = rtime;
2875 /* Similarly take it as fragment start PTS and GOP start time if
2876 * these are not set */
2877 if (!GST_CLOCK_STIME_IS_VALID (splitmux->fragment_start_time_pts))
2878 splitmux->fragment_start_time_pts = rtime;
2880 if (g_queue_is_empty (&splitmux->pending_input_gops)) {
2881 InputGop *gop = g_slice_new0 (InputGop);
2883 gop->from_gap = TRUE;
2884 gop->start_time = rtime;
2885 gop->start_time_pts = rtime;
2887 g_queue_push_tail (&splitmux->pending_input_gops, gop);
2891 GST_SPLITMUX_UNLOCK (splitmux);
2897 return GST_PAD_PROBE_PASS;
2898 } else if (info->type & GST_PAD_PROBE_TYPE_QUERY_DOWNSTREAM) {
2899 switch (GST_QUERY_TYPE (GST_QUERY (info->data))) {
2900 case GST_QUERY_ALLOCATION:
2901 return GST_PAD_PROBE_DROP;
2903 return GST_PAD_PROBE_PASS;
2907 buf = gst_pad_probe_info_get_buffer (info);
2908 buf_info = mq_stream_buf_new ();
2910 pts = GST_BUFFER_PTS (buf);
2911 dts = GST_BUFFER_DTS (buf);
2912 if (GST_BUFFER_PTS_IS_VALID (buf))
2913 ts = GST_BUFFER_PTS (buf);
2915 ts = GST_BUFFER_DTS (buf);
2917 GST_LOG_OBJECT (pad,
2918 "Buffer TS is %" GST_TIME_FORMAT " (PTS %" GST_TIME_FORMAT ", DTS %"
2919 GST_TIME_FORMAT ")", GST_TIME_ARGS (ts), GST_TIME_ARGS (pts),
2920 GST_TIME_ARGS (dts));
2922 GST_SPLITMUX_LOCK (splitmux);
2924 if (splitmux->input_state == SPLITMUX_INPUT_STATE_STOPPED) {
2925 ret = GST_FLOW_FLUSHING;
2929 /* If this buffer has a timestamp, advance the input timestamp of the
2931 if (GST_CLOCK_TIME_IS_VALID (ts)) {
2932 running_time = my_segment_to_running_time (&ctx->in_segment, ts);
2934 GST_LOG_OBJECT (pad, "Buffer running TS is %" GST_STIME_FORMAT,
2935 GST_STIME_ARGS (running_time));
2937 /* in running time is always the maximum PTS (or DTS) that was observed so far */
2938 if (GST_CLOCK_STIME_IS_VALID (running_time)
2939 && running_time > ctx->in_running_time)
2940 ctx->in_running_time = running_time;
2942 running_time = ctx->in_running_time;
2945 if (GST_CLOCK_TIME_IS_VALID (pts))
2946 running_time_pts = my_segment_to_running_time (&ctx->in_segment, pts);
2948 running_time_pts = GST_CLOCK_STIME_NONE;
2950 if (GST_CLOCK_TIME_IS_VALID (dts)) {
2951 running_time_dts = my_segment_to_running_time (&ctx->in_segment, dts);
2953 /* DTS > PTS makes conceptually no sense so catch such invalid DTS here
2954 * by clamping to the PTS */
2955 running_time_dts = MIN (running_time_pts, running_time_dts);
2957 /* If there is no DTS then assume PTS=DTS */
2958 running_time_dts = running_time_pts;
2961 /* Try to make sure we have a valid running time */
2962 if (!GST_CLOCK_STIME_IS_VALID (ctx->in_running_time)) {
2963 ctx->in_running_time =
2964 my_segment_to_running_time (&ctx->in_segment, ctx->in_segment.start);
2967 GST_LOG_OBJECT (pad, "in running time now %" GST_STIME_FORMAT,
2968 GST_STIME_ARGS (ctx->in_running_time));
2970 buf_info->run_ts = ctx->in_running_time;
2971 buf_info->buf_size = gst_buffer_get_size (buf);
2972 buf_info->duration = GST_BUFFER_DURATION (buf);
2974 if (ctx->is_reference) {
2975 InputGop *gop = NULL;
2976 GstVideoTimeCodeMeta *tc_meta = gst_buffer_get_video_time_code_meta (buf);
2978 /* initialize fragment_start_time if it was not set yet (i.e. for the
2979 * first fragment), or otherwise set it to the minimum observed time */
2980 if (!GST_CLOCK_STIME_IS_VALID (splitmux->fragment_start_time)
2981 || splitmux->fragment_start_time > running_time) {
2982 if (!GST_CLOCK_STIME_IS_VALID (splitmux->fragment_start_time))
2983 splitmux->fragment_start_time_pts = running_time_pts;
2984 splitmux->fragment_start_time = running_time;
2986 GST_LOG_OBJECT (splitmux,
2987 "Fragment start time now %" GST_STIME_FORMAT " (initial PTS %"
2988 GST_STIME_FORMAT ")", GST_STIME_ARGS (splitmux->fragment_start_time),
2989 GST_STIME_ARGS (splitmux->fragment_start_time_pts));
2991 /* Also take this as the first start time when starting up,
2992 * so that we start counting overflow from the first frame */
2993 if (!GST_CLOCK_STIME_IS_VALID (splitmux->max_in_running_time)
2994 || splitmux->max_in_running_time < splitmux->fragment_start_time)
2995 splitmux->max_in_running_time = splitmux->fragment_start_time;
2997 if (!GST_CLOCK_STIME_IS_VALID (splitmux->max_in_running_time_dts))
2998 splitmux->max_in_running_time_dts = running_time_dts;
3001 video_time_code_replace (&splitmux->fragment_start_tc, &tc_meta->tc);
3003 splitmux->next_fragment_start_tc_time =
3004 calculate_next_max_timecode (splitmux, &tc_meta->tc,
3005 running_time, NULL);
3006 if (splitmux->tc_interval
3007 && !GST_CLOCK_TIME_IS_VALID (splitmux->next_fragment_start_tc_time))
3009 GST_WARNING_OBJECT (splitmux,
3010 "Couldn't calculate next fragment start time for timecode mode");
3012 #ifndef GST_DISABLE_GST_DEBUG
3016 tc_str = gst_video_time_code_to_string (&tc_meta->tc);
3017 GST_DEBUG_OBJECT (splitmux,
3018 "Initialize fragment start timecode %s, next fragment start timecode time %"
3019 GST_TIME_FORMAT, tc_str,
3020 GST_TIME_ARGS (splitmux->next_fragment_start_tc_time));
3028 /* First check if we're at the very first GOP and the tracking was created
3029 * from a GAP event. In that case don't start a new GOP on keyframes but
3030 * just updated it as needed */
3031 gop = g_queue_peek_tail (&splitmux->pending_input_gops);
3033 if (!gop || (!gop->from_gap
3034 && !GST_BUFFER_FLAG_IS_SET (buf, GST_BUFFER_FLAG_DELTA_UNIT))) {
3035 gop = g_slice_new0 (InputGop);
3037 gop->start_time = running_time;
3038 gop->start_time_pts = running_time_pts;
3040 GST_LOG_OBJECT (splitmux,
3041 "Next GOP start time now %" GST_STIME_FORMAT " (initial PTS %"
3042 GST_STIME_FORMAT ")", GST_STIME_ARGS (gop->start_time),
3043 GST_STIME_ARGS (gop->start_time_pts));
3046 video_time_code_replace (&gop->start_tc, &tc_meta->tc);
3048 #ifndef GST_DISABLE_GST_DEBUG
3052 tc_str = gst_video_time_code_to_string (&tc_meta->tc);
3053 GST_DEBUG_OBJECT (splitmux, "Next GOP start timecode %s", tc_str);
3059 g_queue_push_tail (&splitmux->pending_input_gops, gop);
3061 gop->from_gap = FALSE;
3063 if (!GST_CLOCK_STIME_IS_VALID (gop->start_time)
3064 || gop->start_time > running_time) {
3065 gop->start_time = running_time;
3067 GST_LOG_OBJECT (splitmux,
3068 "GOP start time updated now %" GST_STIME_FORMAT " (initial PTS %"
3069 GST_STIME_FORMAT ")", GST_STIME_ARGS (gop->start_time),
3070 GST_STIME_ARGS (gop->start_time_pts));
3073 video_time_code_replace (&gop->start_tc, &tc_meta->tc);
3075 #ifndef GST_DISABLE_GST_DEBUG
3079 tc_str = gst_video_time_code_to_string (&tc_meta->tc);
3080 GST_DEBUG_OBJECT (splitmux, "Next GOP start timecode updated %s",
3089 /* Check whether we need to request next keyframe depending on
3090 * current running time */
3091 if (request_next_keyframe (splitmux, buf, running_time_dts) == FALSE) {
3092 GST_WARNING_OBJECT (splitmux,
3093 "Could not request a keyframe. Files may not split at the exact location they should");
3098 InputGop *gop = g_queue_peek_tail (&splitmux->pending_input_gops);
3101 GST_DEBUG_OBJECT (pad, "Buf TS %" GST_STIME_FORMAT
3102 " total GOP bytes %" G_GUINT64_FORMAT ", total next GOP bytes %"
3103 G_GUINT64_FORMAT, GST_STIME_ARGS (buf_info->run_ts),
3104 gop->total_bytes, gop->total_bytes);
3110 if (ctx->flushing) {
3111 ret = GST_FLOW_FLUSHING;
3115 switch (splitmux->input_state) {
3116 case SPLITMUX_INPUT_STATE_COLLECTING_GOP_START:
3117 if (ctx->is_reference) {
3118 const InputGop *gop, *next_gop;
3120 /* This is the reference context. If it's a keyframe,
3121 * it marks the start of a new GOP and we should wait in
3122 * check_completed_gop before continuing, but either way
3123 * (keyframe or no, we'll pass this buffer through after
3124 * so set loop_again to FALSE */
3127 gop = g_queue_peek_head (&splitmux->pending_input_gops);
3128 g_assert (gop != NULL);
3129 next_gop = g_queue_peek_nth (&splitmux->pending_input_gops, 1);
3131 if (ctx->in_running_time > splitmux->max_in_running_time)
3132 splitmux->max_in_running_time = ctx->in_running_time;
3133 if (running_time_dts > splitmux->max_in_running_time_dts)
3134 splitmux->max_in_running_time_dts = running_time_dts;
3136 GST_LOG_OBJECT (splitmux,
3137 "Max in running time now %" GST_STIME_FORMAT ", DTS %"
3138 GST_STIME_FORMAT, GST_STIME_ARGS (splitmux->max_in_running_time),
3139 GST_STIME_ARGS (splitmux->max_in_running_time_dts));
3142 GST_DEBUG_OBJECT (pad, "Waiting for end of GOP");
3143 /* Allow other input pads to catch up to here too */
3144 GST_SPLITMUX_BROADCAST_INPUT (splitmux);
3148 if (!GST_BUFFER_FLAG_IS_SET (buf, GST_BUFFER_FLAG_DELTA_UNIT)) {
3149 GST_INFO_OBJECT (pad,
3150 "Have keyframe with running time %" GST_STIME_FORMAT,
3151 GST_STIME_ARGS (ctx->in_running_time));
3155 if (running_time_dts != GST_CLOCK_STIME_NONE
3156 && running_time_dts < next_gop->start_time_pts) {
3157 GST_DEBUG_OBJECT (splitmux,
3158 "Waiting until DTS (%" GST_STIME_FORMAT
3159 ") has passed next GOP start PTS (%" GST_STIME_FORMAT ")",
3160 GST_STIME_ARGS (running_time_dts),
3161 GST_STIME_ARGS (next_gop->start_time_pts));
3162 /* Allow other input pads to catch up to here too */
3163 GST_SPLITMUX_BROADCAST_INPUT (splitmux);
3167 splitmux->input_state = SPLITMUX_INPUT_STATE_WAITING_GOP_COLLECT;
3168 /* Wake up other input pads to collect this GOP */
3169 GST_SPLITMUX_BROADCAST_INPUT (splitmux);
3170 check_completed_gop (splitmux, ctx);
3172 /* Pass this buffer if the reference ctx is far enough ahead */
3173 if (ctx->in_running_time < splitmux->max_in_running_time) {
3178 /* We're still waiting for a keyframe on the reference pad, sleep */
3179 GST_LOG_OBJECT (pad, "Sleeping for GOP start");
3180 GST_SPLITMUX_WAIT_INPUT (splitmux);
3181 GST_LOG_OBJECT (pad,
3182 "Done sleeping for GOP start input state now %d",
3183 splitmux->input_state);
3186 case SPLITMUX_INPUT_STATE_WAITING_GOP_COLLECT:{
3187 /* We're collecting a GOP, this is only ever called for non-reference
3188 * contexts as the reference context would be waiting inside
3189 * check_completed_gop() */
3191 g_assert (!ctx->is_reference);
3193 /* If we overran the target timestamp, it might be time to process
3194 * the GOP, otherwise bail out for more data. */
3195 GST_LOG_OBJECT (pad,
3196 "Checking TS %" GST_STIME_FORMAT " against max %"
3197 GST_STIME_FORMAT, GST_STIME_ARGS (ctx->in_running_time),
3198 GST_STIME_ARGS (splitmux->max_in_running_time));
3200 if (ctx->in_running_time < splitmux->max_in_running_time) {
3205 GST_LOG_OBJECT (pad,
3206 "Collected last packet of GOP. Checking other pads");
3208 if (g_queue_is_empty (&splitmux->pending_input_gops)) {
3209 GST_WARNING_OBJECT (pad,
3210 "Reference was closed without GOP, dropping");
3214 check_completed_gop (splitmux, ctx);
3217 case SPLITMUX_INPUT_STATE_FINISHING_UP:
3227 if (keyframe && ctx->is_reference)
3228 splitmux->queued_keyframes++;
3229 buf_info->keyframe = keyframe;
3231 /* Update total input byte counter for overflow detect unless we're after
3233 if (splitmux->input_state != SPLITMUX_INPUT_STATE_FINISHING_UP
3234 && splitmux->input_state != SPLITMUX_INPUT_STATE_STOPPED) {
3235 InputGop *gop = g_queue_peek_tail (&splitmux->pending_input_gops);
3237 /* We must have a GOP at this point */
3238 g_assert (gop != NULL);
3240 gop->total_bytes += buf_info->buf_size;
3241 if (ctx->is_reference) {
3242 gop->reference_bytes += buf_info->buf_size;
3246 /* Now add this buffer to the queue just before returning */
3247 g_queue_push_head (&ctx->queued_bufs, buf_info);
3249 GST_LOG_OBJECT (pad, "Returning to queue buffer %" GST_PTR_FORMAT
3250 " run ts %" GST_STIME_FORMAT, buf, GST_STIME_ARGS (ctx->in_running_time));
3252 GST_SPLITMUX_UNLOCK (splitmux);
3253 GST_PAD_PROBE_INFO_FLOW_RETURN (info) = ret;
3254 return GST_PAD_PROBE_PASS;
3257 GST_SPLITMUX_UNLOCK (splitmux);
3259 mq_stream_buf_free (buf_info);
3260 GST_PAD_PROBE_INFO_FLOW_RETURN (info) = ret;
3261 return GST_PAD_PROBE_PASS;
3263 GST_SPLITMUX_UNLOCK (splitmux);
3265 mq_stream_buf_free (buf_info);
3266 GST_PAD_PROBE_INFO_FLOW_RETURN (info) = GST_FLOW_EOS;
3267 return GST_PAD_PROBE_DROP;
3271 grow_blocked_queues (GstSplitMuxSink * splitmux)
3275 /* Scan other queues for full-ness and grow them */
3276 for (cur = g_list_first (splitmux->contexts);
3277 cur != NULL; cur = g_list_next (cur)) {
3278 MqStreamCtx *tmpctx = (MqStreamCtx *) (cur->data);
3280 guint cur_len = g_queue_get_length (&tmpctx->queued_bufs);
3282 g_object_get (tmpctx->q, "max-size-buffers", &cur_limit, NULL);
3283 GST_LOG_OBJECT (tmpctx->q, "Queue len %u", cur_len);
3285 if (cur_len >= cur_limit) {
3286 cur_limit = cur_len + 1;
3287 GST_DEBUG_OBJECT (tmpctx->q,
3288 "Queue overflowed and needs enlarging. Growing to %u buffers",
3290 g_object_set (tmpctx->q, "max-size-buffers", cur_limit, NULL);
3296 handle_q_underrun (GstElement * q, gpointer user_data)
3298 MqStreamCtx *ctx = (MqStreamCtx *) (user_data);
3299 GstSplitMuxSink *splitmux = ctx->splitmux;
3301 GST_SPLITMUX_LOCK (splitmux);
3302 GST_DEBUG_OBJECT (q,
3303 "Queue reported underrun with %d keyframes and %d cmds enqueued",
3304 splitmux->queued_keyframes, g_queue_get_length (&splitmux->out_cmd_q));
3305 grow_blocked_queues (splitmux);
3306 GST_SPLITMUX_UNLOCK (splitmux);
3310 handle_q_overrun (GstElement * q, gpointer user_data)
3312 MqStreamCtx *ctx = (MqStreamCtx *) (user_data);
3313 GstSplitMuxSink *splitmux = ctx->splitmux;
3314 gboolean allow_grow = FALSE;
3316 GST_SPLITMUX_LOCK (splitmux);
3317 GST_DEBUG_OBJECT (q,
3318 "Queue reported overrun with %d keyframes and %d cmds enqueued",
3319 splitmux->queued_keyframes, g_queue_get_length (&splitmux->out_cmd_q));
3321 if (splitmux->queued_keyframes < 2) {
3322 /* Less than a full GOP queued, grow the queue */
3324 } else if (g_queue_get_length (&splitmux->out_cmd_q) < 1) {
3327 /* If another queue is starved, grow */
3329 for (cur = g_list_first (splitmux->contexts);
3330 cur != NULL; cur = g_list_next (cur)) {
3331 MqStreamCtx *tmpctx = (MqStreamCtx *) (cur->data);
3332 if (tmpctx != ctx && g_queue_get_length (&tmpctx->queued_bufs) < 1) {
3337 GST_SPLITMUX_UNLOCK (splitmux);
3342 g_object_get (q, "max-size-buffers", &cur_limit, NULL);
3345 GST_DEBUG_OBJECT (q,
3346 "Queue overflowed and needs enlarging. Growing to %u buffers",
3349 g_object_set (q, "max-size-buffers", cur_limit, NULL);
3353 /* Called with SPLITMUX lock held */
3354 static const gchar *
3355 lookup_muxer_pad (GstSplitMuxSink * splitmux, const gchar * sinkpad_name)
3357 const gchar *ret = NULL;
3359 if (splitmux->muxerpad_map == NULL)
3362 if (sinkpad_name == NULL) {
3363 GST_WARNING_OBJECT (splitmux,
3364 "Can't look up request pad in pad map without providing a pad name");
3368 ret = gst_structure_get_string (splitmux->muxerpad_map, sinkpad_name);
3370 GST_INFO_OBJECT (splitmux, "Sink pad %s maps to muxer pad %s", sinkpad_name,
3372 return g_strdup (ret);
3379 gst_splitmux_sink_request_new_pad (GstElement * element,
3380 GstPadTemplate * templ, const gchar * name, const GstCaps * caps)
3382 GstSplitMuxSink *splitmux = (GstSplitMuxSink *) element;
3383 GstPadTemplate *mux_template = NULL;
3384 GstPad *ret = NULL, *muxpad = NULL;
3386 GstPad *q_sink = NULL, *q_src = NULL;
3387 gchar *gname, *qname;
3388 gboolean is_primary_video = FALSE, is_video = FALSE,
3389 muxer_is_requestpad = FALSE;
3391 const gchar *muxer_padname = NULL;
3393 GST_DEBUG_OBJECT (splitmux, "templ:%s, name:%s", templ->name_template, name);
3395 GST_SPLITMUX_LOCK (splitmux);
3396 if (!create_muxer (splitmux))
3398 g_signal_emit (splitmux, signals[SIGNAL_MUXER_ADDED], 0, splitmux->muxer);
3400 if (g_str_equal (templ->name_template, "video") ||
3401 g_str_has_prefix (templ->name_template, "video_aux_")) {
3402 is_primary_video = g_str_equal (templ->name_template, "video");
3403 if (is_primary_video && splitmux->have_video)
3404 goto already_have_video;
3408 /* See if there's a pad map and it lists this pad */
3409 muxer_padname = lookup_muxer_pad (splitmux, name);
3411 if (muxer_padname == NULL) {
3413 /* FIXME: Look for a pad template with matching caps, rather than by name */
3414 GST_DEBUG_OBJECT (element,
3415 "searching for pad-template with name 'video_%%u'");
3417 gst_element_class_get_pad_template (GST_ELEMENT_GET_CLASS
3418 (splitmux->muxer), "video_%u");
3420 /* Fallback to find sink pad templates named 'video' (flvmux) */
3421 if (!mux_template) {
3422 GST_DEBUG_OBJECT (element,
3423 "searching for pad-template with name 'video'");
3425 gst_element_class_get_pad_template (GST_ELEMENT_GET_CLASS
3426 (splitmux->muxer), "video");
3430 GST_DEBUG_OBJECT (element, "searching for pad-template with name '%s'",
3431 templ->name_template);
3433 gst_element_class_get_pad_template (GST_ELEMENT_GET_CLASS
3434 (splitmux->muxer), templ->name_template);
3436 /* Fallback to find sink pad templates named 'audio' (flvmux) */
3437 if (!mux_template && g_str_has_prefix (templ->name_template, "audio_")) {
3438 GST_DEBUG_OBJECT (element,
3439 "searching for pad-template with name 'audio'");
3441 gst_element_class_get_pad_template (GST_ELEMENT_GET_CLASS
3442 (splitmux->muxer), "audio");
3447 if (mux_template == NULL) {
3448 GST_DEBUG_OBJECT (element,
3449 "searching for pad-template with name 'sink_%%d'");
3451 gst_element_class_get_pad_template (GST_ELEMENT_GET_CLASS
3452 (splitmux->muxer), "sink_%d");
3455 if (mux_template == NULL) {
3456 GST_DEBUG_OBJECT (element, "searching for pad-template with name 'sink'");
3458 gst_element_class_get_pad_template (GST_ELEMENT_GET_CLASS
3459 (splitmux->muxer), "sink");
3463 if (mux_template == NULL) {
3464 GST_ERROR_OBJECT (element,
3465 "unable to find a suitable sink pad-template on the muxer");
3468 GST_DEBUG_OBJECT (element, "found sink pad-template '%s' on the muxer",
3469 mux_template->name_template);
3471 if (mux_template->presence == GST_PAD_REQUEST) {
3472 GST_DEBUG_OBJECT (element, "requesting pad from pad-template");
3475 gst_element_request_pad (splitmux->muxer, mux_template, name, caps);
3476 muxer_is_requestpad = TRUE;
3477 } else if (mux_template->presence == GST_PAD_ALWAYS) {
3478 GST_DEBUG_OBJECT (element, "accessing always pad from pad-template");
3481 gst_element_get_static_pad (splitmux->muxer,
3482 mux_template->name_template);
3484 GST_ERROR_OBJECT (element,
3485 "unexpected pad presence %d", mux_template->presence);
3489 /* Have a muxer pad name */
3490 if (!(muxpad = gst_element_get_static_pad (splitmux->muxer, muxer_padname))) {
3492 gst_element_request_pad_simple (splitmux->muxer, muxer_padname)))
3493 muxer_is_requestpad = TRUE;
3495 g_free ((gchar *) muxer_padname);
3496 muxer_padname = NULL;
3499 /* One way or another, we must have a muxer pad by now */
3503 if (is_primary_video)
3504 gname = g_strdup ("video");
3505 else if (name == NULL)
3506 gname = gst_pad_get_name (muxpad);
3508 gname = g_strdup (name);
3510 qname = g_strdup_printf ("queue_%s", gname);
3511 if ((q = create_element (splitmux, "queue", qname, FALSE)) == NULL) {
3517 gst_element_set_state (q, GST_STATE_TARGET (splitmux));
3519 g_object_set (q, "max-size-bytes", 0, "max-size-time", (guint64) (0),
3520 "max-size-buffers", 5, NULL);
3522 q_sink = gst_element_get_static_pad (q, "sink");
3523 q_src = gst_element_get_static_pad (q, "src");
3525 if (gst_pad_link (q_src, muxpad) != GST_PAD_LINK_OK) {
3526 if (muxer_is_requestpad)
3527 gst_element_release_request_pad (splitmux->muxer, muxpad);
3528 gst_object_unref (GST_OBJECT (muxpad));
3532 gst_object_unref (GST_OBJECT (muxpad));
3534 ctx = mq_stream_ctx_new (splitmux);
3535 /* Context holds a ref: */
3536 ctx->q = gst_object_ref (q);
3537 ctx->srcpad = q_src;
3538 ctx->sinkpad = q_sink;
3540 g_signal_connect (q, "overrun", (GCallback) handle_q_overrun, ctx);
3541 g_signal_connect (q, "underrun", (GCallback) handle_q_underrun, ctx);
3543 ctx->src_pad_block_id =
3544 gst_pad_add_probe (q_src,
3545 GST_PAD_PROBE_TYPE_DATA_DOWNSTREAM | GST_PAD_PROBE_TYPE_EVENT_FLUSH,
3546 (GstPadProbeCallback) handle_mq_output, ctx, NULL);
3547 if (is_primary_video && splitmux->reference_ctx != NULL) {
3548 splitmux->reference_ctx->is_reference = FALSE;
3549 splitmux->reference_ctx = NULL;
3551 if (splitmux->reference_ctx == NULL) {
3552 splitmux->reference_ctx = ctx;
3553 ctx->is_reference = TRUE;
3556 ret = gst_ghost_pad_new_from_template (gname, q_sink, templ);
3557 g_object_set_qdata ((GObject *) (ret), PAD_CONTEXT, ctx);
3559 ctx->sink_pad_block_id =
3560 gst_pad_add_probe (q_sink,
3561 GST_PAD_PROBE_TYPE_DATA_DOWNSTREAM | GST_PAD_PROBE_TYPE_EVENT_FLUSH |
3562 GST_PAD_PROBE_TYPE_QUERY_DOWNSTREAM,
3563 (GstPadProbeCallback) handle_mq_input, ctx, NULL);
3565 GST_DEBUG_OBJECT (splitmux, "splitmuxsink pad %" GST_PTR_FORMAT
3566 " feeds queue pad %" GST_PTR_FORMAT, ret, q_sink);
3568 splitmux->contexts = g_list_append (splitmux->contexts, ctx);
3572 if (is_primary_video)
3573 splitmux->have_video = TRUE;
3575 gst_pad_set_active (ret, TRUE);
3576 gst_element_add_pad (GST_ELEMENT (splitmux), ret);
3578 GST_SPLITMUX_UNLOCK (splitmux);
3582 GST_SPLITMUX_UNLOCK (splitmux);
3585 gst_object_unref (q_sink);
3587 gst_object_unref (q_src);
3590 GST_DEBUG_OBJECT (splitmux, "video sink pad already requested");
3591 GST_SPLITMUX_UNLOCK (splitmux);
3596 gst_splitmux_sink_release_pad (GstElement * element, GstPad * pad)
3598 GstSplitMuxSink *splitmux = (GstSplitMuxSink *) element;
3599 GstPad *muxpad = NULL;
3601 (MqStreamCtx *) (g_object_get_qdata ((GObject *) (pad), PAD_CONTEXT));
3603 GST_SPLITMUX_LOCK (splitmux);
3605 if (splitmux->muxer == NULL)
3606 goto fail; /* Elements don't exist yet - nothing to release */
3608 GST_INFO_OBJECT (pad, "releasing request pad");
3610 muxpad = gst_pad_get_peer (ctx->srcpad);
3612 /* Remove the context from our consideration */
3613 splitmux->contexts = g_list_remove (splitmux->contexts, ctx);
3615 ctx->flushing = TRUE;
3616 GST_SPLITMUX_BROADCAST_INPUT (splitmux);
3618 GST_SPLITMUX_UNLOCK (splitmux);
3620 if (ctx->sink_pad_block_id) {
3621 gst_pad_remove_probe (ctx->sinkpad, ctx->sink_pad_block_id);
3622 gst_pad_send_event (ctx->sinkpad, gst_event_new_flush_start ());
3625 if (ctx->src_pad_block_id)
3626 gst_pad_remove_probe (ctx->srcpad, ctx->src_pad_block_id);
3628 /* Wait for the pad to be free */
3629 GST_PAD_STREAM_LOCK (pad);
3630 GST_SPLITMUX_LOCK (splitmux);
3631 GST_PAD_STREAM_UNLOCK (pad);
3633 /* Can release the context now */
3634 mq_stream_ctx_free (ctx);
3635 if (ctx == splitmux->reference_ctx)
3636 splitmux->reference_ctx = NULL;
3638 /* Release and free the muxer input */
3640 gst_element_release_request_pad (splitmux->muxer, muxpad);
3641 gst_object_unref (muxpad);
3644 if (GST_PAD_PAD_TEMPLATE (pad) &&
3645 g_str_equal (GST_PAD_TEMPLATE_NAME_TEMPLATE (GST_PAD_PAD_TEMPLATE
3647 splitmux->have_video = FALSE;
3649 gst_element_remove_pad (element, pad);
3651 /* Reset the internal elements only after all request pads are released */
3652 if (splitmux->contexts == NULL)
3653 gst_splitmux_reset_elements (splitmux);
3655 /* Wake up other input streams to check if the completion conditions have
3657 GST_SPLITMUX_BROADCAST_INPUT (splitmux);
3660 GST_SPLITMUX_UNLOCK (splitmux);
3664 create_element (GstSplitMuxSink * splitmux,
3665 const gchar * factory, const gchar * name, gboolean locked)
3667 GstElement *ret = gst_element_factory_make (factory, name);
3669 g_warning ("Failed to create %s - splitmuxsink will not work", name);
3674 /* Ensure the sink starts in locked state and NULL - it will be changed
3675 * by the filename setting code */
3676 gst_element_set_locked_state (ret, TRUE);
3677 gst_element_set_state (ret, GST_STATE_NULL);
3680 if (!gst_bin_add (GST_BIN (splitmux), ret)) {
3681 g_warning ("Could not add %s element - splitmuxsink will not work", name);
3682 gst_object_unref (ret);
3690 create_muxer (GstSplitMuxSink * splitmux)
3692 /* Create internal elements */
3693 if (splitmux->muxer == NULL) {
3694 GstElement *provided_muxer = NULL;
3696 GST_OBJECT_LOCK (splitmux);
3697 if (splitmux->provided_muxer != NULL)
3698 provided_muxer = gst_object_ref (splitmux->provided_muxer);
3699 GST_OBJECT_UNLOCK (splitmux);
3701 if ((!splitmux->async_finalize && provided_muxer == NULL) ||
3702 (splitmux->async_finalize && splitmux->muxer_factory == NULL)) {
3703 if ((splitmux->muxer =
3704 create_element (splitmux,
3705 splitmux->muxer_factory ? splitmux->
3706 muxer_factory : DEFAULT_MUXER, "muxer", FALSE)) == NULL)
3708 } else if (splitmux->async_finalize) {
3709 if ((splitmux->muxer =
3710 create_element (splitmux, splitmux->muxer_factory, "muxer",
3713 if (splitmux->muxer_preset && GST_IS_PRESET (splitmux->muxer))
3714 gst_preset_load_preset (GST_PRESET (splitmux->muxer),
3715 splitmux->muxer_preset);
3716 if (splitmux->muxer_properties)
3717 gst_structure_foreach (splitmux->muxer_properties,
3718 _set_property_from_structure, splitmux->muxer);
3720 /* Ensure it's not in locked state (we might be reusing an old element) */
3721 gst_element_set_locked_state (provided_muxer, FALSE);
3722 if (!gst_bin_add (GST_BIN (splitmux), provided_muxer)) {
3723 g_warning ("Could not add muxer element - splitmuxsink will not work");
3724 gst_object_unref (provided_muxer);
3728 splitmux->muxer = provided_muxer;
3729 gst_object_unref (provided_muxer);
3732 if (splitmux->use_robust_muxing) {
3733 update_muxer_properties (splitmux);
3743 find_sink (GstElement * e)
3745 GstElement *res = NULL;
3747 gboolean done = FALSE;
3748 GValue data = { 0, };
3750 if (!GST_IS_BIN (e))
3753 if (g_object_class_find_property (G_OBJECT_GET_CLASS (e), "location") != NULL)
3756 iter = gst_bin_iterate_sinks (GST_BIN (e));
3758 switch (gst_iterator_next (iter, &data)) {
3759 case GST_ITERATOR_OK:
3761 GstElement *child = g_value_get_object (&data);
3762 if (g_object_class_find_property (G_OBJECT_GET_CLASS (child),
3763 "location") != NULL) {
3767 g_value_reset (&data);
3770 case GST_ITERATOR_RESYNC:
3771 gst_iterator_resync (iter);
3773 case GST_ITERATOR_DONE:
3776 case GST_ITERATOR_ERROR:
3777 g_assert_not_reached ();
3781 g_value_unset (&data);
3782 gst_iterator_free (iter);
3788 create_sink (GstSplitMuxSink * splitmux)
3790 GstElement *provided_sink = NULL;
3792 if (splitmux->active_sink == NULL) {
3794 GST_OBJECT_LOCK (splitmux);
3795 if (splitmux->provided_sink != NULL)
3796 provided_sink = gst_object_ref (splitmux->provided_sink);
3797 GST_OBJECT_UNLOCK (splitmux);
3799 if ((!splitmux->async_finalize && provided_sink == NULL) ||
3800 (splitmux->async_finalize && splitmux->sink_factory == NULL)) {
3801 if ((splitmux->sink =
3802 create_element (splitmux, DEFAULT_SINK, "sink", TRUE)) == NULL)
3804 splitmux->active_sink = splitmux->sink;
3805 } else if (splitmux->async_finalize) {
3806 if ((splitmux->sink =
3807 create_element (splitmux, splitmux->sink_factory, "sink",
3810 if (splitmux->sink_preset && GST_IS_PRESET (splitmux->sink))
3811 gst_preset_load_preset (GST_PRESET (splitmux->sink),
3812 splitmux->sink_preset);
3813 if (splitmux->sink_properties)
3814 gst_structure_foreach (splitmux->sink_properties,
3815 _set_property_from_structure, splitmux->sink);
3816 splitmux->active_sink = splitmux->sink;
3818 /* Ensure the sink starts in locked state and NULL - it will be changed
3819 * by the filename setting code */
3820 gst_element_set_locked_state (provided_sink, TRUE);
3821 gst_element_set_state (provided_sink, GST_STATE_NULL);
3822 if (!gst_bin_add (GST_BIN (splitmux), provided_sink)) {
3823 g_warning ("Could not add sink elements - splitmuxsink will not work");
3824 gst_object_unref (provided_sink);
3828 splitmux->active_sink = provided_sink;
3830 /* The bin holds a ref now, we can drop our tmp ref */
3831 gst_object_unref (provided_sink);
3833 /* Find the sink element */
3834 splitmux->sink = find_sink (splitmux->active_sink);
3835 if (splitmux->sink == NULL) {
3837 ("Could not locate sink element in provided sink - splitmuxsink will not work");
3843 if (g_object_class_find_property (G_OBJECT_GET_CLASS (splitmux->sink),
3845 /* async child elements are causing state change races and weird
3846 * failures, so let's try and turn that off */
3847 g_object_set (splitmux->sink, "async", FALSE, NULL);
3851 if (!gst_element_link (splitmux->muxer, splitmux->active_sink)) {
3852 g_warning ("Failed to link muxer and sink- splitmuxsink will not work");
3863 #pragma GCC diagnostic ignored "-Wformat-nonliteral"
3866 set_next_filename (GstSplitMuxSink * splitmux, MqStreamCtx * ctx)
3868 gchar *fname = NULL;
3872 gst_splitmux_sink_ensure_max_files (splitmux);
3874 if (ctx->cur_out_buffer == NULL) {
3875 GST_WARNING_OBJECT (splitmux, "Starting next file without buffer");
3878 caps = gst_pad_get_current_caps (ctx->srcpad);
3879 sample = gst_sample_new (ctx->cur_out_buffer, caps, &ctx->out_segment, NULL);
3880 g_signal_emit (splitmux, signals[SIGNAL_FORMAT_LOCATION_FULL], 0,
3881 splitmux->fragment_id, sample, &fname);
3882 gst_sample_unref (sample);
3884 gst_caps_unref (caps);
3886 if (fname == NULL) {
3887 /* Fallback to the old signal if the new one returned nothing */
3888 g_signal_emit (splitmux, signals[SIGNAL_FORMAT_LOCATION], 0,
3889 splitmux->fragment_id, &fname);
3893 fname = splitmux->location ?
3894 g_strdup_printf (splitmux->location, splitmux->fragment_id) : NULL;
3897 GST_INFO_OBJECT (splitmux, "Setting file to %s", fname);
3898 if (g_object_class_find_property (G_OBJECT_GET_CLASS (splitmux->sink),
3899 "location") != NULL)
3900 g_object_set (splitmux->sink, "location", fname, NULL);
3904 splitmux->fragment_id++;
3907 /* called with GST_SPLITMUX_LOCK */
3909 do_async_start (GstSplitMuxSink * splitmux)
3911 GstMessage *message;
3913 if (!splitmux->need_async_start) {
3914 GST_INFO_OBJECT (splitmux, "no async_start needed");
3918 splitmux->async_pending = TRUE;
3920 GST_INFO_OBJECT (splitmux, "Sending async_start message");
3921 message = gst_message_new_async_start (GST_OBJECT_CAST (splitmux));
3923 GST_SPLITMUX_UNLOCK (splitmux);
3924 GST_BIN_CLASS (parent_class)->handle_message (GST_BIN_CAST
3925 (splitmux), message);
3926 GST_SPLITMUX_LOCK (splitmux);
3929 /* called with GST_SPLITMUX_LOCK */
3931 do_async_done (GstSplitMuxSink * splitmux)
3933 GstMessage *message;
3935 if (splitmux->async_pending) {
3936 GST_INFO_OBJECT (splitmux, "Sending async_done message");
3937 splitmux->async_pending = FALSE;
3938 GST_SPLITMUX_UNLOCK (splitmux);
3941 gst_message_new_async_done (GST_OBJECT_CAST (splitmux),
3942 GST_CLOCK_TIME_NONE);
3943 GST_BIN_CLASS (parent_class)->handle_message (GST_BIN_CAST
3944 (splitmux), message);
3945 GST_SPLITMUX_LOCK (splitmux);
3948 splitmux->need_async_start = FALSE;
3952 gst_splitmux_sink_reset (GstSplitMuxSink * splitmux)
3954 splitmux->max_in_running_time = GST_CLOCK_STIME_NONE;
3955 splitmux->max_in_running_time_dts = GST_CLOCK_STIME_NONE;
3957 splitmux->fragment_start_time = GST_CLOCK_STIME_NONE;
3958 splitmux->fragment_start_time_pts = GST_CLOCK_STIME_NONE;
3959 g_clear_pointer (&splitmux->fragment_start_tc, gst_video_time_code_free);
3961 g_queue_foreach (&splitmux->pending_input_gops, (GFunc) input_gop_free, NULL);
3962 g_queue_clear (&splitmux->pending_input_gops);
3964 splitmux->max_out_running_time = GST_CLOCK_STIME_NONE;
3965 splitmux->fragment_total_bytes = 0;
3966 splitmux->fragment_reference_bytes = 0;
3967 splitmux->muxed_out_bytes = 0;
3968 splitmux->ready_for_output = FALSE;
3970 g_atomic_int_set (&(splitmux->split_requested), FALSE);
3971 g_atomic_int_set (&(splitmux->do_split_next_gop), FALSE);
3973 splitmux->next_fku_time = GST_CLOCK_TIME_NONE;
3974 gst_queue_array_clear (splitmux->times_to_split);
3976 g_list_foreach (splitmux->contexts, (GFunc) mq_stream_ctx_reset, NULL);
3977 splitmux->queued_keyframes = 0;
3979 g_queue_foreach (&splitmux->out_cmd_q, (GFunc) out_cmd_buf_free, NULL);
3980 g_queue_clear (&splitmux->out_cmd_q);
3983 static GstStateChangeReturn
3984 gst_splitmux_sink_change_state (GstElement * element, GstStateChange transition)
3986 GstStateChangeReturn ret;
3987 GstSplitMuxSink *splitmux = (GstSplitMuxSink *) element;
3989 switch (transition) {
3990 case GST_STATE_CHANGE_NULL_TO_READY:{
3991 GST_SPLITMUX_LOCK (splitmux);
3992 if (!create_muxer (splitmux) || !create_sink (splitmux)) {
3993 ret = GST_STATE_CHANGE_FAILURE;
3994 GST_SPLITMUX_UNLOCK (splitmux);
3997 g_signal_emit (splitmux, signals[SIGNAL_MUXER_ADDED], 0, splitmux->muxer);
3998 g_signal_emit (splitmux, signals[SIGNAL_SINK_ADDED], 0, splitmux->sink);
3999 GST_SPLITMUX_UNLOCK (splitmux);
4000 splitmux->fragment_id = splitmux->start_index;
4003 case GST_STATE_CHANGE_READY_TO_PAUSED:{
4004 GST_SPLITMUX_LOCK (splitmux);
4005 /* Make sure contexts and tracking times are cleared, in case we're being reused */
4006 gst_splitmux_sink_reset (splitmux);
4007 /* Start by collecting one input on each pad */
4008 splitmux->input_state = SPLITMUX_INPUT_STATE_COLLECTING_GOP_START;
4009 splitmux->output_state = SPLITMUX_OUTPUT_STATE_START_NEXT_FILE;
4011 GST_SPLITMUX_UNLOCK (splitmux);
4013 GST_SPLITMUX_STATE_LOCK (splitmux);
4014 splitmux->shutdown = FALSE;
4015 GST_SPLITMUX_STATE_UNLOCK (splitmux);
4018 case GST_STATE_CHANGE_PAUSED_TO_READY:
4019 case GST_STATE_CHANGE_READY_TO_READY:
4020 g_atomic_int_set (&(splitmux->split_requested), FALSE);
4021 g_atomic_int_set (&(splitmux->do_split_next_gop), FALSE);
4023 case GST_STATE_CHANGE_READY_TO_NULL:
4024 GST_SPLITMUX_STATE_LOCK (splitmux);
4025 splitmux->shutdown = TRUE;
4026 GST_SPLITMUX_STATE_UNLOCK (splitmux);
4028 GST_SPLITMUX_LOCK (splitmux);
4029 gst_splitmux_sink_reset (splitmux);
4030 splitmux->output_state = SPLITMUX_OUTPUT_STATE_STOPPED;
4031 splitmux->input_state = SPLITMUX_INPUT_STATE_STOPPED;
4032 /* Wake up any blocked threads */
4033 GST_LOG_OBJECT (splitmux,
4034 "State change -> NULL or READY. Waking threads");
4035 GST_SPLITMUX_BROADCAST_INPUT (splitmux);
4036 GST_SPLITMUX_BROADCAST_OUTPUT (splitmux);
4037 GST_SPLITMUX_UNLOCK (splitmux);
4043 ret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
4044 if (ret == GST_STATE_CHANGE_FAILURE)
4047 switch (transition) {
4048 case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
4049 splitmux->need_async_start = TRUE;
4051 case GST_STATE_CHANGE_READY_TO_PAUSED:{
4052 /* Change state async, because our child sink might not
4053 * be ready to do that for us yet if its state is still locked */
4055 splitmux->need_async_start = TRUE;
4056 /* we want to go async to PAUSED until we managed to configure and add the
4058 GST_SPLITMUX_LOCK (splitmux);
4059 do_async_start (splitmux);
4060 GST_SPLITMUX_UNLOCK (splitmux);
4061 ret = GST_STATE_CHANGE_ASYNC;
4064 case GST_STATE_CHANGE_READY_TO_NULL:
4065 GST_SPLITMUX_LOCK (splitmux);
4066 splitmux->fragment_id = 0;
4067 /* Reset internal elements only if no pad contexts are using them */
4068 if (splitmux->contexts == NULL)
4069 gst_splitmux_reset_elements (splitmux);
4070 do_async_done (splitmux);
4071 GST_SPLITMUX_UNLOCK (splitmux);
4080 if (transition == GST_STATE_CHANGE_NULL_TO_READY) {
4081 /* Cleanup elements on failed transition out of NULL */
4082 gst_splitmux_reset_elements (splitmux);
4083 GST_SPLITMUX_LOCK (splitmux);
4084 do_async_done (splitmux);
4085 GST_SPLITMUX_UNLOCK (splitmux);
4087 if (transition == GST_STATE_CHANGE_READY_TO_READY) {
4088 /* READY to READY transition only happens when we're already
4089 * in READY state, but a child element is in NULL, which
4090 * happens when there's an error changing the state of the sink.
4091 * We need to make sure not to fail the state transition, or
4092 * the core won't transition us back to NULL successfully */
4093 ret = GST_STATE_CHANGE_SUCCESS;
/* Wraps the fragment counter around: when splitmux->max_files is non-zero and
 * splitmux->fragment_id has reached (or passed) it, fragment_id is reset to 0.
 * When max_files is zero the counter is left untouched.
 * NOTE(review): no locking is done here; presumably callers already hold the
 * splitmux lock - confirm against the call sites. */
4099 gst_splitmux_sink_ensure_max_files (GstSplitMuxSink * splitmux)
4101 if (splitmux->max_files && splitmux->fragment_id >= splitmux->max_files) {
4102 splitmux->fragment_id = 0;
/* Atomically sets splitmux->do_split_next_gop to TRUE.
 * NOTE(review): the code that consumes this flag is outside this view;
 * judging by the name it asks for a new fragment to be started at the next
 * GOP - confirm. */
4107 split_now (GstSplitMuxSink * splitmux)
4109 g_atomic_int_set (&(splitmux->do_split_next_gop), TRUE);
/* Atomically sets splitmux->split_requested to TRUE.
 * NOTE(review): the code that consumes this flag is outside this view;
 * judging by the name it requests a split once the current GOP has been
 * completed - confirm. */
4113 split_after (GstSplitMuxSink * splitmux)
4115 g_atomic_int_set (&(splitmux->split_requested), TRUE);
4119 split_at_running_time (GstSplitMuxSink * splitmux, GstClockTime split_time)
4121 gboolean send_keyframe_requests;
4123 GST_SPLITMUX_LOCK (splitmux);
4124 gst_queue_array_push_tail_struct (splitmux->times_to_split, &split_time);
4125 send_keyframe_requests = splitmux->send_keyframe_requests;
4126 GST_SPLITMUX_UNLOCK (splitmux);
4128 if (send_keyframe_requests) {
4130 gst_video_event_new_upstream_force_key_unit (split_time, TRUE, 0);
4131 GST_INFO_OBJECT (splitmux, "Requesting next keyframe at %" GST_TIME_FORMAT,
4132 GST_TIME_ARGS (split_time));
4133 if (!gst_pad_push_event (splitmux->reference_ctx->sinkpad, ev)) {
4134 GST_WARNING_OBJECT (splitmux,
4135 "Could not request keyframe at %" GST_TIME_FORMAT,
4136 GST_TIME_ARGS (split_time));