1 /* GStreamer Muxer bin that splits output stream by size/time
2 * Copyright (C) <2014-2019> Jan Schmidt <jan@centricular.com>
4 * This library is free software; you can redistribute it and/or
5 * modify it under the terms of the GNU Library General Public
6 * License as published by the Free Software Foundation; either
7 * version 2 of the License, or (at your option) any later version.
9 * This library is distributed in the hope that it will be useful,
10 * but WITHOUT ANY WARRANTY; without even the implied warranty of
11 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
12 * Library General Public License for more details.
14 * You should have received a copy of the GNU Library General Public
15 * License along with this library; if not, write to the
16 * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
17 * Boston, MA 02110-1301, USA.
21 * SECTION:element-splitmuxsink
22 * @title: splitmuxsink
23 * @short_description: Muxer wrapper for splitting output stream by size or time
25 * This element wraps a muxer and a sink, and starts a new file when the mux
26 * contents are about to cross a threshold of maximum size of maximum time,
27 * splitting at video keyframe boundaries. Exactly one input video stream
28 * can be muxed, with as many accompanying audio and subtitle streams as
31 * By default, it uses mp4mux and filesink, but they can be changed via
32 * the 'muxer' and 'sink' properties.
34 * The minimum file size is 1 GOP, however - so limits may be overrun if the
35 * distance between any 2 keyframes is larger than the limits.
37 * If a video stream is available, the splitting process is driven by the video
38 * stream contents, and the video stream must contain closed GOPs for the output
39 * file parts to be played individually correctly. In the absence of a video
40 * stream, the first available stream is used as reference for synchronization.
42 * In the async-finalize mode, when the threshold is crossed, the old muxer
43 * and sink is disconnected from the pipeline and left to finish the file
44 * asynchronously, and a new muxer and sink is created to continue with the
45 * next fragment. For that reason, instead of muxer and sink objects, the
46 * muxer-factory and sink-factory properties are used to construct the new
47 * objects, together with muxer-properties and sink-properties.
49 * ## Example pipelines
51 * gst-launch-1.0 -e v4l2src num-buffers=500 ! video/x-raw,width=320,height=240 ! videoconvert ! queue ! timeoverlay ! x264enc key-int-max=10 ! h264parse ! splitmuxsink location=video%02d.mov max-size-time=10000000000 max-size-bytes=1000000
53 * Records a video stream captured from a v4l2 device and muxes it into
54 * ISO mp4 files, splitting as needed to limit size/duration to 10 seconds
55 * and 1MB maximum size.
58 * gst-launch-1.0 -e v4l2src num-buffers=500 ! video/x-raw,width=320,height=240 ! videoconvert ! queue ! timeoverlay ! x264enc key-int-max=10 ! h264parse ! splitmuxsink location=video%02d.mkv max-size-time=10000000000 muxer-factory=matroskamux muxer-properties="properties,streamable=true"
60 * Records a video stream captured from a v4l2 device and muxer it into
61 * streamable Matroska files, splitting as needed to limit size/duration to 10
62 * seconds. Each file will finalize asynchronously.
65 * gst-launch-1.0 videotestsrc num-buffers=10 ! jpegenc ! .video splitmuxsink muxer=qtmux muxer-pad-map=x-pad-map,video=video_1 location=test%05d.mp4 -v
67 * Records 10 frames to an mp4 file, using a muxer-pad-map to make explicit mappings between the splitmuxsink sink pad and the corresponding muxer pad
76 #include <glib/gstdio.h>
77 #include <gst/video/video.h>
78 #include "gstsplitmuxsink.h"
80 GST_DEBUG_CATEGORY_STATIC (splitmux_debug);
81 #define GST_CAT_DEFAULT splitmux_debug
83 #define GST_SPLITMUX_STATE_LOCK(s) g_mutex_lock(&(s)->state_lock)
84 #define GST_SPLITMUX_STATE_UNLOCK(s) g_mutex_unlock(&(s)->state_lock)
86 #define GST_SPLITMUX_LOCK(s) g_mutex_lock(&(s)->lock)
87 #define GST_SPLITMUX_UNLOCK(s) g_mutex_unlock(&(s)->lock)
88 #define GST_SPLITMUX_WAIT_INPUT(s) g_cond_wait (&(s)->input_cond, &(s)->lock)
89 #define GST_SPLITMUX_BROADCAST_INPUT(s) g_cond_broadcast (&(s)->input_cond)
91 #define GST_SPLITMUX_WAIT_OUTPUT(s) g_cond_wait (&(s)->output_cond, &(s)->lock)
92 #define GST_SPLITMUX_BROADCAST_OUTPUT(s) g_cond_broadcast (&(s)->output_cond)
94 static void split_now (GstSplitMuxSink * splitmux);
95 static void split_after (GstSplitMuxSink * splitmux);
96 static void split_at_running_time (GstSplitMuxSink * splitmux,
97 GstClockTime split_time);
106 PROP_MAX_SIZE_TIMECODE,
107 PROP_SEND_KEYFRAME_REQUESTS,
110 PROP_USE_ROBUST_MUXING,
111 PROP_ALIGNMENT_THRESHOLD,
118 PROP_MUXER_PROPERTIES,
121 PROP_SINK_PROPERTIES,
125 #define DEFAULT_MAX_SIZE_TIME 0
126 #define DEFAULT_MAX_SIZE_BYTES 0
127 #define DEFAULT_MAX_FILES 0
128 #define DEFAULT_MUXER_OVERHEAD 0.02
129 #define DEFAULT_SEND_KEYFRAME_REQUESTS FALSE
130 #define DEFAULT_ALIGNMENT_THRESHOLD 0
131 #define DEFAULT_MUXER "mp4mux"
132 #define DEFAULT_SINK "filesink"
133 #define DEFAULT_USE_ROBUST_MUXING FALSE
134 #define DEFAULT_RESET_MUXER TRUE
135 #define DEFAULT_ASYNC_FINALIZE FALSE
136 #define DEFAULT_START_INDEX 0
138 typedef struct _AsyncEosHelper
146 SIGNAL_FORMAT_LOCATION,
147 SIGNAL_FORMAT_LOCATION_FULL,
150 SIGNAL_SPLIT_AT_RUNNING_TIME,
156 static guint signals[SIGNAL_LAST];
158 static GstStaticPadTemplate video_sink_template =
159 GST_STATIC_PAD_TEMPLATE ("video",
162 GST_STATIC_CAPS_ANY);
163 static GstStaticPadTemplate video_aux_sink_template =
164 GST_STATIC_PAD_TEMPLATE ("video_aux_%u",
167 GST_STATIC_CAPS_ANY);
168 static GstStaticPadTemplate audio_sink_template =
169 GST_STATIC_PAD_TEMPLATE ("audio_%u",
172 GST_STATIC_CAPS_ANY);
173 static GstStaticPadTemplate subtitle_sink_template =
174 GST_STATIC_PAD_TEMPLATE ("subtitle_%u",
177 GST_STATIC_CAPS_ANY);
178 static GstStaticPadTemplate caption_sink_template =
179 GST_STATIC_PAD_TEMPLATE ("caption_%u",
182 GST_STATIC_CAPS_ANY);
184 static GQuark PAD_CONTEXT;
185 static GQuark EOS_FROM_US;
186 static GQuark RUNNING_TIME;
187 /* EOS_FROM_US is only valid in async-finalize mode. We need to know whether
188 * to forward an incoming EOS message, but we cannot rely on the state of the
189 * splitmux anymore, so we set this qdata on the sink instead.
190 * The muxer and sink must be destroyed after both of these things have
192 * 1) The EOS message has been sent when the fragment is ending
193 * 2) The muxer has been unlinked and relinked
194 * Therefore, EOS_FROM_US can have these two values:
195 * 0: EOS was not requested from us. Forward the message. The muxer and the
196 * sink will be destroyed together with the rest of the bin.
197 * 1: EOS was requested from us, but the other of the two tasks hasn't
198 * finished. Set EOS_FROM_US to 2 and do your stuff.
199 * 2: EOS was requested from us and the other of the two tasks has finished.
200 * Now we can destroy the muxer and the sink.
206 PAD_CONTEXT = g_quark_from_static_string ("pad-context");
207 EOS_FROM_US = g_quark_from_static_string ("eos-from-us");
208 RUNNING_TIME = g_quark_from_static_string ("running-time");
209 GST_DEBUG_CATEGORY_INIT (splitmux_debug, "splitmuxsink", 0,
210 "Split File Muxing Sink");
213 #define gst_splitmux_sink_parent_class parent_class
214 G_DEFINE_TYPE_EXTENDED (GstSplitMuxSink, gst_splitmux_sink, GST_TYPE_BIN, 0,
216 GST_ELEMENT_REGISTER_DEFINE (splitmuxsink, "splitmuxsink", GST_RANK_NONE,
217 GST_TYPE_SPLITMUX_SINK);
219 static gboolean create_muxer (GstSplitMuxSink * splitmux);
220 static gboolean create_sink (GstSplitMuxSink * splitmux);
221 static void gst_splitmux_sink_set_property (GObject * object, guint prop_id,
222 const GValue * value, GParamSpec * pspec);
223 static void gst_splitmux_sink_get_property (GObject * object, guint prop_id,
224 GValue * value, GParamSpec * pspec);
225 static void gst_splitmux_sink_dispose (GObject * object);
226 static void gst_splitmux_sink_finalize (GObject * object);
228 static GstPad *gst_splitmux_sink_request_new_pad (GstElement * element,
229 GstPadTemplate * templ, const gchar * name, const GstCaps * caps);
230 static void gst_splitmux_sink_release_pad (GstElement * element, GstPad * pad);
232 static GstStateChangeReturn gst_splitmux_sink_change_state (GstElement *
233 element, GstStateChange transition);
235 static void bus_handler (GstBin * bin, GstMessage * msg);
236 static void set_next_filename (GstSplitMuxSink * splitmux, MqStreamCtx * ctx);
237 static GstFlowReturn start_next_fragment (GstSplitMuxSink * splitmux,
239 static void mq_stream_ctx_free (MqStreamCtx * ctx);
240 static void grow_blocked_queues (GstSplitMuxSink * splitmux);
242 static void gst_splitmux_sink_ensure_max_files (GstSplitMuxSink * splitmux);
243 static GstElement *create_element (GstSplitMuxSink * splitmux,
244 const gchar * factory, const gchar * name, gboolean locked);
246 static void do_async_done (GstSplitMuxSink * splitmux);
248 static GstClockTime calculate_next_max_timecode (GstSplitMuxSink * splitmux,
249 const GstVideoTimeCode * cur_tc, GstClockTime running_time,
250 GstVideoTimeCode ** next_tc);
253 mq_stream_buf_new (void)
255 return g_slice_new0 (MqStreamBuf);
259 mq_stream_buf_free (MqStreamBuf * data)
261 g_slice_free (MqStreamBuf, data);
264 static SplitMuxOutputCommand *
265 out_cmd_buf_new (void)
267 return g_slice_new0 (SplitMuxOutputCommand);
271 out_cmd_buf_free (SplitMuxOutputCommand * data)
273 g_slice_free (SplitMuxOutputCommand, data);
277 input_gop_free (InputGop * gop)
279 g_clear_pointer (&gop->start_tc, gst_video_time_code_free);
280 g_slice_free (InputGop, gop);
284 gst_splitmux_sink_class_init (GstSplitMuxSinkClass * klass)
286 GObjectClass *gobject_class = (GObjectClass *) klass;
287 GstElementClass *gstelement_class = (GstElementClass *) klass;
288 GstBinClass *gstbin_class = (GstBinClass *) klass;
290 gobject_class->set_property = gst_splitmux_sink_set_property;
291 gobject_class->get_property = gst_splitmux_sink_get_property;
292 gobject_class->dispose = gst_splitmux_sink_dispose;
293 gobject_class->finalize = gst_splitmux_sink_finalize;
295 gst_element_class_set_static_metadata (gstelement_class,
296 "Split Muxing Bin", "Generic/Bin/Muxer",
297 "Convenience bin that muxes incoming streams into multiple time/size limited files",
298 "Jan Schmidt <jan@centricular.com>");
300 gst_element_class_add_static_pad_template (gstelement_class,
301 &video_sink_template);
302 gst_element_class_add_static_pad_template (gstelement_class,
303 &video_aux_sink_template);
304 gst_element_class_add_static_pad_template (gstelement_class,
305 &audio_sink_template);
306 gst_element_class_add_static_pad_template (gstelement_class,
307 &subtitle_sink_template);
308 gst_element_class_add_static_pad_template (gstelement_class,
309 &caption_sink_template);
311 gstelement_class->change_state =
312 GST_DEBUG_FUNCPTR (gst_splitmux_sink_change_state);
313 gstelement_class->request_new_pad =
314 GST_DEBUG_FUNCPTR (gst_splitmux_sink_request_new_pad);
315 gstelement_class->release_pad =
316 GST_DEBUG_FUNCPTR (gst_splitmux_sink_release_pad);
318 gstbin_class->handle_message = bus_handler;
320 g_object_class_install_property (gobject_class, PROP_LOCATION,
321 g_param_spec_string ("location", "File Output Pattern",
322 "Format string pattern for the location of the files to write (e.g. video%05d.mp4)",
323 NULL, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
324 g_object_class_install_property (gobject_class, PROP_MUXER_OVERHEAD,
325 g_param_spec_double ("mux-overhead", "Muxing Overhead",
326 "Extra size overhead of muxing (0.02 = 2%)", 0.0, 1.0,
327 DEFAULT_MUXER_OVERHEAD,
328 G_PARAM_READWRITE | GST_PARAM_CONTROLLABLE | G_PARAM_STATIC_STRINGS));
330 g_object_class_install_property (gobject_class, PROP_MAX_SIZE_TIME,
331 g_param_spec_uint64 ("max-size-time", "Max. size (ns)",
332 "Max. amount of time per file (in ns, 0=disable)", 0, G_MAXUINT64,
333 DEFAULT_MAX_SIZE_TIME,
334 G_PARAM_READWRITE | GST_PARAM_MUTABLE_READY |
335 G_PARAM_STATIC_STRINGS));
336 g_object_class_install_property (gobject_class, PROP_MAX_SIZE_BYTES,
337 g_param_spec_uint64 ("max-size-bytes", "Max. size bytes",
338 "Max. amount of data per file (in bytes, 0=disable)", 0, G_MAXUINT64,
339 DEFAULT_MAX_SIZE_BYTES,
340 G_PARAM_READWRITE | GST_PARAM_MUTABLE_READY |
341 G_PARAM_STATIC_STRINGS));
342 g_object_class_install_property (gobject_class, PROP_MAX_SIZE_TIMECODE,
343 g_param_spec_string ("max-size-timecode", "Maximum timecode difference",
344 "Maximum difference in timecode between first and last frame. "
345 "Separator is assumed to be \":\" everywhere (e.g. 01:00:00:00). "
346 "Will only be effective if a timecode track is present.", NULL,
347 G_PARAM_READWRITE | GST_PARAM_MUTABLE_READY |
348 G_PARAM_STATIC_STRINGS));
349 g_object_class_install_property (gobject_class, PROP_SEND_KEYFRAME_REQUESTS,
350 g_param_spec_boolean ("send-keyframe-requests",
351 "Request keyframes at max-size-time",
352 "Request a keyframe every max-size-time ns to try splitting at that point. "
353 "Needs max-size-bytes to be 0 in order to be effective.",
354 DEFAULT_SEND_KEYFRAME_REQUESTS,
355 G_PARAM_READWRITE | GST_PARAM_MUTABLE_READY |
356 G_PARAM_STATIC_STRINGS));
357 g_object_class_install_property (gobject_class, PROP_MAX_FILES,
358 g_param_spec_uint ("max-files", "Max files",
359 "Maximum number of files to keep on disk. Once the maximum is reached,"
360 "old files start to be deleted to make room for new ones.", 0,
361 G_MAXUINT, DEFAULT_MAX_FILES,
362 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
363 g_object_class_install_property (gobject_class, PROP_ALIGNMENT_THRESHOLD,
364 g_param_spec_uint64 ("alignment-threshold", "Alignment threshold (ns)",
365 "Allow non-reference streams to be that many ns before the reference"
366 " stream", 0, G_MAXUINT64, DEFAULT_ALIGNMENT_THRESHOLD,
367 G_PARAM_READWRITE | GST_PARAM_MUTABLE_READY |
368 G_PARAM_STATIC_STRINGS));
370 g_object_class_install_property (gobject_class, PROP_MUXER,
371 g_param_spec_object ("muxer", "Muxer",
372 "The muxer element to use (NULL = default mp4mux). "
373 "Valid only for async-finalize = FALSE",
374 GST_TYPE_ELEMENT, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
375 g_object_class_install_property (gobject_class, PROP_SINK,
376 g_param_spec_object ("sink", "Sink",
377 "The sink element (or element chain) to use (NULL = default filesink). "
378 "Valid only for async-finalize = FALSE",
379 GST_TYPE_ELEMENT, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
381 g_object_class_install_property (gobject_class, PROP_USE_ROBUST_MUXING,
382 g_param_spec_boolean ("use-robust-muxing",
383 "Support robust-muxing mode of some muxers",
384 "Check if muxers support robust muxing via the reserved-max-duration and "
385 "reserved-duration-remaining properties and use them if so. "
386 "(Only present on qtmux and mp4mux for now). splitmuxsink may then also "
387 " create new fragments if the reserved header space is about to overflow. "
388 "Note that for mp4mux and qtmux, reserved-moov-update-period must be set "
389 "manually by the app to a non-zero value for robust muxing to have an effect.",
390 DEFAULT_USE_ROBUST_MUXING,
391 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
393 g_object_class_install_property (gobject_class, PROP_RESET_MUXER,
394 g_param_spec_boolean ("reset-muxer",
396 "Reset the muxer after each segment. Disabling this will not work for most muxers.",
397 DEFAULT_RESET_MUXER, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
399 g_object_class_install_property (gobject_class, PROP_ASYNC_FINALIZE,
400 g_param_spec_boolean ("async-finalize",
401 "Finalize fragments asynchronously",
402 "Finalize each fragment asynchronously and start a new one",
403 DEFAULT_ASYNC_FINALIZE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
404 g_object_class_install_property (gobject_class, PROP_MUXER_FACTORY,
405 g_param_spec_string ("muxer-factory", "Muxer factory",
406 "The muxer element factory to use (default = mp4mux). "
407 "Valid only for async-finalize = TRUE",
408 "mp4mux", G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
410 * GstSplitMuxSink:muxer-preset
412 * An optional #GstPreset name to use for the muxer. This only has an effect
413 * in `async-finalize=TRUE` mode.
417 g_object_class_install_property (gobject_class, PROP_MUXER_PRESET,
418 g_param_spec_string ("muxer-preset", "Muxer preset",
419 "The muxer preset to use. "
420 "Valid only for async-finalize = TRUE",
421 NULL, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
422 g_object_class_install_property (gobject_class, PROP_MUXER_PROPERTIES,
423 g_param_spec_boxed ("muxer-properties", "Muxer properties",
424 "The muxer element properties to use. "
425 "Example: {properties,boolean-prop=true,string-prop=\"hi\"}. "
426 "Valid only for async-finalize = TRUE",
427 GST_TYPE_STRUCTURE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
428 g_object_class_install_property (gobject_class, PROP_SINK_FACTORY,
429 g_param_spec_string ("sink-factory", "Sink factory",
430 "The sink element factory to use (default = filesink). "
431 "Valid only for async-finalize = TRUE",
432 "filesink", G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
434 * GstSplitMuxSink:sink-preset
436 * An optional #GstPreset name to use for the sink. This only has an effect
437 * in `async-finalize=TRUE` mode.
441 g_object_class_install_property (gobject_class, PROP_SINK_PRESET,
442 g_param_spec_string ("sink-preset", "Sink preset",
443 "The sink preset to use. "
444 "Valid only for async-finalize = TRUE",
445 NULL, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
446 g_object_class_install_property (gobject_class, PROP_SINK_PROPERTIES,
447 g_param_spec_boxed ("sink-properties", "Sink properties",
448 "The sink element properties to use. "
449 "Example: {properties,boolean-prop=true,string-prop=\"hi\"}. "
450 "Valid only for async-finalize = TRUE",
451 GST_TYPE_STRUCTURE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
452 g_object_class_install_property (gobject_class, PROP_START_INDEX,
453 g_param_spec_int ("start-index", "Start Index",
454 "Start value of fragment index.",
455 0, G_MAXINT, DEFAULT_START_INDEX,
456 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
459 * GstSplitMuxSink::muxer-pad-map
461 * An optional GstStructure that provides a map from splitmuxsink sinkpad
462 * names to muxer pad names they should feed. Splitmuxsink has some default
463 * mapping behaviour to link video to video pads and audio to audio pads
464 * that usually works fine. This property is useful if you need to ensure
465 * a particular mapping to muxed streams.
467 * The GstStructure contains string fields like so:
468 * splitmuxsink muxer-pad-map=x-pad-map,video=video_1
472 g_object_class_install_property (G_OBJECT_CLASS (klass), PROP_MUXERPAD_MAP,
473 g_param_spec_boxed ("muxer-pad-map", "Muxer pad map",
474 "A GstStructure specifies the mapping from splitmuxsink sink pads to muxer pads",
476 (GParamFlags) (G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)));
479 * GstSplitMuxSink::format-location:
480 * @splitmux: the #GstSplitMuxSink
481 * @fragment_id: the sequence number of the file to be created
483 * Returns: the location to be used for the next output file. This must be
484 * a newly-allocated string which will be freed with g_free() by the
485 * splitmuxsink element when it no longer needs it, so use g_strdup() or
486 * g_strdup_printf() or similar functions to allocate it.
488 signals[SIGNAL_FORMAT_LOCATION] =
489 g_signal_new ("format-location", G_TYPE_FROM_CLASS (klass),
490 G_SIGNAL_RUN_LAST, 0, NULL, NULL, NULL, G_TYPE_STRING, 1, G_TYPE_UINT);
493 * GstSplitMuxSink::format-location-full:
494 * @splitmux: the #GstSplitMuxSink
495 * @fragment_id: the sequence number of the file to be created
496 * @first_sample: A #GstSample containing the first buffer
497 * from the reference stream in the new file
499 * Returns: the location to be used for the next output file. This must be
500 * a newly-allocated string which will be freed with g_free() by the
501 * splitmuxsink element when it no longer needs it, so use g_strdup() or
502 * g_strdup_printf() or similar functions to allocate it.
506 signals[SIGNAL_FORMAT_LOCATION_FULL] =
507 g_signal_new ("format-location-full", G_TYPE_FROM_CLASS (klass),
508 G_SIGNAL_RUN_LAST, 0, NULL, NULL, NULL, G_TYPE_STRING, 2, G_TYPE_UINT,
512 * GstSplitMuxSink::split-now:
513 * @splitmux: the #GstSplitMuxSink
515 * When called by the user, this action signal splits the video file (and begins a new one) immediately.
516 * The current GOP will be output to the new file.
520 signals[SIGNAL_SPLIT_NOW] =
521 g_signal_new ("split-now", G_TYPE_FROM_CLASS (klass),
522 G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION,
523 G_STRUCT_OFFSET (GstSplitMuxSinkClass, split_now), NULL, NULL, NULL,
527 * GstSplitMuxSink::split-after:
528 * @splitmux: the #GstSplitMuxSink
530 * When called by the user, this action signal splits the video file (and begins a new one) immediately.
531 * Unlike the 'split-now' signal, with 'split-after', the current GOP will be output to the old file.
535 signals[SIGNAL_SPLIT_AFTER] =
536 g_signal_new ("split-after", G_TYPE_FROM_CLASS (klass),
537 G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION,
538 G_STRUCT_OFFSET (GstSplitMuxSinkClass, split_after), NULL, NULL, NULL,
542 * GstSplitMuxSink::split-at-running-time:
543 * @splitmux: the #GstSplitMuxSink
545 * When called by the user, this action signal splits the video file (and
546 * begins a new one) as soon as the given running time is reached. If this
547 * action signal is called multiple times, running times are queued up and
548 * processed in the order they were given.
550 * Note that this is prone to race conditions, where said running time is
551 * reached and surpassed before we had a chance to split. The file will
552 * still split immediately, but in order to make sure that the split doesn't
553 * happen too late, it is recommended to call this action signal from
554 * something that will prevent further buffers from flowing into
555 * splitmuxsink before the split is completed, such as a pad probe before
561 signals[SIGNAL_SPLIT_AT_RUNNING_TIME] =
562 g_signal_new ("split-at-running-time", G_TYPE_FROM_CLASS (klass),
563 G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION,
564 G_STRUCT_OFFSET (GstSplitMuxSinkClass, split_at_running_time), NULL, NULL,
565 NULL, G_TYPE_NONE, 1, G_TYPE_UINT64);
568 * GstSplitMuxSink::muxer-added:
569 * @splitmux: the #GstSplitMuxSink
570 * @muxer: the newly added muxer element
574 signals[SIGNAL_MUXER_ADDED] =
575 g_signal_new ("muxer-added", G_TYPE_FROM_CLASS (klass),
576 G_SIGNAL_RUN_LAST, 0, NULL, NULL, NULL, G_TYPE_NONE, 1, GST_TYPE_ELEMENT);
579 * GstSplitMuxSink::sink-added:
580 * @splitmux: the #GstSplitMuxSink
581 * @sink: the newly added sink element
585 signals[SIGNAL_SINK_ADDED] =
586 g_signal_new ("sink-added", G_TYPE_FROM_CLASS (klass),
587 G_SIGNAL_RUN_LAST, 0, NULL, NULL, NULL, G_TYPE_NONE, 1, GST_TYPE_ELEMENT);
589 klass->split_now = split_now;
590 klass->split_after = split_after;
591 klass->split_at_running_time = split_at_running_time;
595 gst_splitmux_sink_init (GstSplitMuxSink * splitmux)
597 g_mutex_init (&splitmux->lock);
598 g_mutex_init (&splitmux->state_lock);
599 g_cond_init (&splitmux->input_cond);
600 g_cond_init (&splitmux->output_cond);
601 g_queue_init (&splitmux->out_cmd_q);
603 splitmux->mux_overhead = DEFAULT_MUXER_OVERHEAD;
604 splitmux->threshold_time = DEFAULT_MAX_SIZE_TIME;
605 splitmux->threshold_bytes = DEFAULT_MAX_SIZE_BYTES;
606 splitmux->max_files = DEFAULT_MAX_FILES;
607 splitmux->send_keyframe_requests = DEFAULT_SEND_KEYFRAME_REQUESTS;
608 splitmux->alignment_threshold = DEFAULT_ALIGNMENT_THRESHOLD;
609 splitmux->use_robust_muxing = DEFAULT_USE_ROBUST_MUXING;
610 splitmux->reset_muxer = DEFAULT_RESET_MUXER;
612 splitmux->threshold_timecode_str = NULL;
614 splitmux->async_finalize = DEFAULT_ASYNC_FINALIZE;
615 splitmux->muxer_factory = g_strdup (DEFAULT_MUXER);
616 splitmux->muxer_properties = NULL;
617 splitmux->sink_factory = g_strdup (DEFAULT_SINK);
618 splitmux->sink_properties = NULL;
620 GST_OBJECT_FLAG_SET (splitmux, GST_ELEMENT_FLAG_SINK);
621 splitmux->split_requested = FALSE;
622 splitmux->do_split_next_gop = FALSE;
623 splitmux->times_to_split = gst_queue_array_new_for_struct (8, 8);
624 splitmux->next_fku_time = GST_CLOCK_TIME_NONE;
626 g_queue_init (&splitmux->pending_input_gops);
630 gst_splitmux_reset_elements (GstSplitMuxSink * splitmux)
632 if (splitmux->muxer) {
633 gst_element_set_locked_state (splitmux->muxer, TRUE);
634 gst_element_set_state (splitmux->muxer, GST_STATE_NULL);
635 gst_bin_remove (GST_BIN (splitmux), splitmux->muxer);
637 if (splitmux->active_sink) {
638 gst_element_set_locked_state (splitmux->active_sink, TRUE);
639 gst_element_set_state (splitmux->active_sink, GST_STATE_NULL);
640 gst_bin_remove (GST_BIN (splitmux), splitmux->active_sink);
643 splitmux->sink = splitmux->active_sink = splitmux->muxer = NULL;
647 gst_splitmux_sink_dispose (GObject * object)
649 GstSplitMuxSink *splitmux = GST_SPLITMUX_SINK (object);
651 /* Calling parent dispose invalidates all child pointers */
652 splitmux->sink = splitmux->active_sink = splitmux->muxer = NULL;
654 G_OBJECT_CLASS (parent_class)->dispose (object);
658 gst_splitmux_sink_finalize (GObject * object)
660 GstSplitMuxSink *splitmux = GST_SPLITMUX_SINK (object);
662 g_cond_clear (&splitmux->input_cond);
663 g_cond_clear (&splitmux->output_cond);
664 g_mutex_clear (&splitmux->lock);
665 g_mutex_clear (&splitmux->state_lock);
666 g_queue_foreach (&splitmux->out_cmd_q, (GFunc) out_cmd_buf_free, NULL);
667 g_queue_clear (&splitmux->out_cmd_q);
668 g_queue_foreach (&splitmux->pending_input_gops, (GFunc) input_gop_free, NULL);
669 g_queue_clear (&splitmux->pending_input_gops);
671 g_clear_pointer (&splitmux->fragment_start_tc, gst_video_time_code_free);
673 if (splitmux->muxerpad_map)
674 gst_structure_free (splitmux->muxerpad_map);
676 if (splitmux->provided_sink)
677 gst_object_unref (splitmux->provided_sink);
678 if (splitmux->provided_muxer)
679 gst_object_unref (splitmux->provided_muxer);
681 if (splitmux->muxer_factory)
682 g_free (splitmux->muxer_factory);
683 if (splitmux->muxer_preset)
684 g_free (splitmux->muxer_preset);
685 if (splitmux->muxer_properties)
686 gst_structure_free (splitmux->muxer_properties);
687 if (splitmux->sink_factory)
688 g_free (splitmux->sink_factory);
689 if (splitmux->sink_preset)
690 g_free (splitmux->sink_preset);
691 if (splitmux->sink_properties)
692 gst_structure_free (splitmux->sink_properties);
694 if (splitmux->threshold_timecode_str)
695 g_free (splitmux->threshold_timecode_str);
696 if (splitmux->tc_interval)
697 gst_video_time_code_interval_free (splitmux->tc_interval);
699 if (splitmux->times_to_split)
700 gst_queue_array_free (splitmux->times_to_split);
702 g_free (splitmux->location);
704 /* Make sure to free any un-released contexts. There should not be any,
705 * because the dispose will have freed all request pads though */
706 g_list_foreach (splitmux->contexts, (GFunc) mq_stream_ctx_free, NULL);
707 g_list_free (splitmux->contexts);
709 G_OBJECT_CLASS (parent_class)->finalize (object);
713 * Set any time threshold to the muxer, if it has
714 * reserved-max-duration and reserved-duration-remaining
715 * properties. Called when creating/claiming the muxer
716 * in create_elements() */
718 update_muxer_properties (GstSplitMuxSink * sink)
721 GstClockTime threshold_time;
723 sink->muxer_has_reserved_props = FALSE;
724 if (sink->muxer == NULL)
726 klass = G_OBJECT_GET_CLASS (sink->muxer);
727 if (g_object_class_find_property (klass, "reserved-max-duration") == NULL)
729 if (g_object_class_find_property (klass,
730 "reserved-duration-remaining") == NULL)
732 sink->muxer_has_reserved_props = TRUE;
734 GST_LOG_OBJECT (sink, "Setting muxer reserved time to %" GST_TIME_FORMAT,
735 GST_TIME_ARGS (sink->threshold_time));
736 GST_OBJECT_LOCK (sink);
737 threshold_time = sink->threshold_time;
738 GST_OBJECT_UNLOCK (sink);
740 if (threshold_time > 0) {
741 /* Tell the muxer how much space to reserve */
742 GstClockTime muxer_threshold = threshold_time;
743 g_object_set (sink->muxer, "reserved-max-duration", muxer_threshold, NULL);
748 gst_splitmux_sink_set_property (GObject * object, guint prop_id,
749 const GValue * value, GParamSpec * pspec)
751 GstSplitMuxSink *splitmux = GST_SPLITMUX_SINK (object);
755 GST_OBJECT_LOCK (splitmux);
756 g_free (splitmux->location);
757 splitmux->location = g_value_dup_string (value);
758 GST_OBJECT_UNLOCK (splitmux);
761 case PROP_START_INDEX:
762 GST_OBJECT_LOCK (splitmux);
763 splitmux->start_index = g_value_get_int (value);
764 GST_OBJECT_UNLOCK (splitmux);
766 case PROP_MAX_SIZE_BYTES:
767 GST_OBJECT_LOCK (splitmux);
768 splitmux->threshold_bytes = g_value_get_uint64 (value);
769 GST_OBJECT_UNLOCK (splitmux);
771 case PROP_MAX_SIZE_TIME:
772 GST_OBJECT_LOCK (splitmux);
773 splitmux->threshold_time = g_value_get_uint64 (value);
774 GST_OBJECT_UNLOCK (splitmux);
776 case PROP_MAX_SIZE_TIMECODE:
777 GST_OBJECT_LOCK (splitmux);
778 g_free (splitmux->threshold_timecode_str);
779 /* will be calculated later */
780 g_clear_pointer (&splitmux->tc_interval,
781 gst_video_time_code_interval_free);
783 splitmux->threshold_timecode_str = g_value_dup_string (value);
784 if (splitmux->threshold_timecode_str) {
785 splitmux->tc_interval =
786 gst_video_time_code_interval_new_from_string
787 (splitmux->threshold_timecode_str);
788 if (!splitmux->tc_interval) {
789 g_warning ("Wrong timecode string %s",
790 splitmux->threshold_timecode_str);
791 g_free (splitmux->threshold_timecode_str);
792 splitmux->threshold_timecode_str = NULL;
795 splitmux->next_fragment_start_tc_time =
796 calculate_next_max_timecode (splitmux, splitmux->fragment_start_tc,
797 splitmux->fragment_start_time, NULL);
798 if (splitmux->tc_interval && splitmux->fragment_start_tc
799 && !GST_CLOCK_TIME_IS_VALID (splitmux->next_fragment_start_tc_time)) {
800 GST_WARNING_OBJECT (splitmux,
801 "Couldn't calculate next fragment start time for timecode mode");
803 GST_OBJECT_UNLOCK (splitmux);
805 case PROP_SEND_KEYFRAME_REQUESTS:
806 GST_OBJECT_LOCK (splitmux);
807 splitmux->send_keyframe_requests = g_value_get_boolean (value);
808 GST_OBJECT_UNLOCK (splitmux);
811 GST_OBJECT_LOCK (splitmux);
812 splitmux->max_files = g_value_get_uint (value);
813 GST_OBJECT_UNLOCK (splitmux);
815 case PROP_MUXER_OVERHEAD:
816 GST_OBJECT_LOCK (splitmux);
817 splitmux->mux_overhead = g_value_get_double (value);
818 GST_OBJECT_UNLOCK (splitmux);
820 case PROP_USE_ROBUST_MUXING:
821 GST_OBJECT_LOCK (splitmux);
822 splitmux->use_robust_muxing = g_value_get_boolean (value);
823 GST_OBJECT_UNLOCK (splitmux);
824 if (splitmux->use_robust_muxing)
825 update_muxer_properties (splitmux);
827 case PROP_ALIGNMENT_THRESHOLD:
828 GST_OBJECT_LOCK (splitmux);
829 splitmux->alignment_threshold = g_value_get_uint64 (value);
830 GST_OBJECT_UNLOCK (splitmux);
833 GST_OBJECT_LOCK (splitmux);
834 gst_clear_object (&splitmux->provided_sink);
835 splitmux->provided_sink = g_value_get_object (value);
836 if (splitmux->provided_sink)
837 gst_object_ref_sink (splitmux->provided_sink);
838 GST_OBJECT_UNLOCK (splitmux);
841 GST_OBJECT_LOCK (splitmux);
842 gst_clear_object (&splitmux->provided_muxer);
843 splitmux->provided_muxer = g_value_get_object (value);
844 if (splitmux->provided_muxer)
845 gst_object_ref_sink (splitmux->provided_muxer);
846 GST_OBJECT_UNLOCK (splitmux);
848 case PROP_RESET_MUXER:
849 GST_OBJECT_LOCK (splitmux);
850 splitmux->reset_muxer = g_value_get_boolean (value);
851 GST_OBJECT_UNLOCK (splitmux);
853 case PROP_ASYNC_FINALIZE:
854 GST_OBJECT_LOCK (splitmux);
855 splitmux->async_finalize = g_value_get_boolean (value);
856 GST_OBJECT_UNLOCK (splitmux);
858 case PROP_MUXER_FACTORY:
859 GST_OBJECT_LOCK (splitmux);
860 if (splitmux->muxer_factory)
861 g_free (splitmux->muxer_factory);
862 splitmux->muxer_factory = g_value_dup_string (value);
863 GST_OBJECT_UNLOCK (splitmux);
865 case PROP_MUXER_PRESET:
866 GST_OBJECT_LOCK (splitmux);
867 if (splitmux->muxer_preset)
868 g_free (splitmux->muxer_preset);
869 splitmux->muxer_preset = g_value_dup_string (value);
870 GST_OBJECT_UNLOCK (splitmux);
872 case PROP_MUXER_PROPERTIES:
873 GST_OBJECT_LOCK (splitmux);
874 if (splitmux->muxer_properties)
875 gst_structure_free (splitmux->muxer_properties);
876 if (gst_value_get_structure (value))
877 splitmux->muxer_properties =
878 gst_structure_copy (gst_value_get_structure (value));
880 splitmux->muxer_properties = NULL;
881 GST_OBJECT_UNLOCK (splitmux);
883 case PROP_SINK_FACTORY:
884 GST_OBJECT_LOCK (splitmux);
885 if (splitmux->sink_factory)
886 g_free (splitmux->sink_factory);
887 splitmux->sink_factory = g_value_dup_string (value);
888 GST_OBJECT_UNLOCK (splitmux);
890 case PROP_SINK_PRESET:
891 GST_OBJECT_LOCK (splitmux);
892 if (splitmux->sink_preset)
893 g_free (splitmux->sink_preset);
894 splitmux->sink_preset = g_value_dup_string (value);
895 GST_OBJECT_UNLOCK (splitmux);
897 case PROP_SINK_PROPERTIES:
898 GST_OBJECT_LOCK (splitmux);
899 if (splitmux->sink_properties)
900 gst_structure_free (splitmux->sink_properties);
901 if (gst_value_get_structure (value))
902 splitmux->sink_properties =
903 gst_structure_copy (gst_value_get_structure (value));
905 splitmux->sink_properties = NULL;
906 GST_OBJECT_UNLOCK (splitmux);
908 case PROP_MUXERPAD_MAP:
910 const GstStructure *s = gst_value_get_structure (value);
911 GST_SPLITMUX_LOCK (splitmux);
912 if (splitmux->muxerpad_map) {
913 gst_structure_free (splitmux->muxerpad_map);
916 splitmux->muxerpad_map = gst_structure_copy (s);
918 splitmux->muxerpad_map = NULL;
919 GST_SPLITMUX_UNLOCK (splitmux);
923 G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
929 gst_splitmux_sink_get_property (GObject * object, guint prop_id,
930 GValue * value, GParamSpec * pspec)
932 GstSplitMuxSink *splitmux = GST_SPLITMUX_SINK (object);
936 GST_OBJECT_LOCK (splitmux);
937 g_value_set_string (value, splitmux->location);
938 GST_OBJECT_UNLOCK (splitmux);
940 case PROP_START_INDEX:
941 GST_OBJECT_LOCK (splitmux);
942 g_value_set_int (value, splitmux->start_index);
943 GST_OBJECT_UNLOCK (splitmux);
945 case PROP_MAX_SIZE_BYTES:
946 GST_OBJECT_LOCK (splitmux);
947 g_value_set_uint64 (value, splitmux->threshold_bytes);
948 GST_OBJECT_UNLOCK (splitmux);
950 case PROP_MAX_SIZE_TIME:
951 GST_OBJECT_LOCK (splitmux);
952 g_value_set_uint64 (value, splitmux->threshold_time);
953 GST_OBJECT_UNLOCK (splitmux);
955 case PROP_MAX_SIZE_TIMECODE:
956 GST_OBJECT_LOCK (splitmux);
957 g_value_set_string (value, splitmux->threshold_timecode_str);
958 GST_OBJECT_UNLOCK (splitmux);
960 case PROP_SEND_KEYFRAME_REQUESTS:
961 GST_OBJECT_LOCK (splitmux);
962 g_value_set_boolean (value, splitmux->send_keyframe_requests);
963 GST_OBJECT_UNLOCK (splitmux);
966 GST_OBJECT_LOCK (splitmux);
967 g_value_set_uint (value, splitmux->max_files);
968 GST_OBJECT_UNLOCK (splitmux);
970 case PROP_MUXER_OVERHEAD:
971 GST_OBJECT_LOCK (splitmux);
972 g_value_set_double (value, splitmux->mux_overhead);
973 GST_OBJECT_UNLOCK (splitmux);
975 case PROP_USE_ROBUST_MUXING:
976 GST_OBJECT_LOCK (splitmux);
977 g_value_set_boolean (value, splitmux->use_robust_muxing);
978 GST_OBJECT_UNLOCK (splitmux);
980 case PROP_ALIGNMENT_THRESHOLD:
981 GST_OBJECT_LOCK (splitmux);
982 g_value_set_uint64 (value, splitmux->alignment_threshold);
983 GST_OBJECT_UNLOCK (splitmux);
986 GST_OBJECT_LOCK (splitmux);
987 g_value_set_object (value, splitmux->provided_sink);
988 GST_OBJECT_UNLOCK (splitmux);
991 GST_OBJECT_LOCK (splitmux);
992 g_value_set_object (value, splitmux->provided_muxer);
993 GST_OBJECT_UNLOCK (splitmux);
995 case PROP_RESET_MUXER:
996 GST_OBJECT_LOCK (splitmux);
997 g_value_set_boolean (value, splitmux->reset_muxer);
998 GST_OBJECT_UNLOCK (splitmux);
1000 case PROP_ASYNC_FINALIZE:
1001 GST_OBJECT_LOCK (splitmux);
1002 g_value_set_boolean (value, splitmux->async_finalize);
1003 GST_OBJECT_UNLOCK (splitmux);
1005 case PROP_MUXER_FACTORY:
1006 GST_OBJECT_LOCK (splitmux);
1007 g_value_set_string (value, splitmux->muxer_factory);
1008 GST_OBJECT_UNLOCK (splitmux);
1010 case PROP_MUXER_PRESET:
1011 GST_OBJECT_LOCK (splitmux);
1012 g_value_set_string (value, splitmux->muxer_preset);
1013 GST_OBJECT_UNLOCK (splitmux);
1015 case PROP_MUXER_PROPERTIES:
1016 GST_OBJECT_LOCK (splitmux);
1017 gst_value_set_structure (value, splitmux->muxer_properties);
1018 GST_OBJECT_UNLOCK (splitmux);
1020 case PROP_SINK_FACTORY:
1021 GST_OBJECT_LOCK (splitmux);
1022 g_value_set_string (value, splitmux->sink_factory);
1023 GST_OBJECT_UNLOCK (splitmux);
1025 case PROP_SINK_PRESET:
1026 GST_OBJECT_LOCK (splitmux);
1027 g_value_set_string (value, splitmux->sink_preset);
1028 GST_OBJECT_UNLOCK (splitmux);
1030 case PROP_SINK_PROPERTIES:
1031 GST_OBJECT_LOCK (splitmux);
1032 gst_value_set_structure (value, splitmux->sink_properties);
1033 GST_OBJECT_UNLOCK (splitmux);
1035 case PROP_MUXERPAD_MAP:
1036 GST_SPLITMUX_LOCK (splitmux);
1037 gst_value_set_structure (value, splitmux->muxerpad_map);
1038 GST_SPLITMUX_UNLOCK (splitmux);
1041 G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
1046 /* Convenience function */
1047 static inline GstClockTimeDiff
1048 my_segment_to_running_time (GstSegment * segment, GstClockTime val)
1050 GstClockTimeDiff res = GST_CLOCK_STIME_NONE;
1052 if (GST_CLOCK_TIME_IS_VALID (val)) {
1054 gst_segment_to_running_time_full (segment, GST_FORMAT_TIME, val, &val);
1064 mq_stream_ctx_reset (MqStreamCtx * ctx)
1066 gst_segment_init (&ctx->in_segment, GST_FORMAT_UNDEFINED);
1067 gst_segment_init (&ctx->out_segment, GST_FORMAT_UNDEFINED);
1068 ctx->in_running_time = ctx->out_running_time = GST_CLOCK_STIME_NONE;
1069 g_queue_foreach (&ctx->queued_bufs, (GFunc) mq_stream_buf_free, NULL);
1070 g_queue_clear (&ctx->queued_bufs);
1073 static MqStreamCtx *
1074 mq_stream_ctx_new (GstSplitMuxSink * splitmux)
1078 ctx = g_new0 (MqStreamCtx, 1);
1079 ctx->splitmux = splitmux;
1080 g_queue_init (&ctx->queued_bufs);
1081 mq_stream_ctx_reset (ctx);
1087 mq_stream_ctx_free (MqStreamCtx * ctx)
1090 GstObject *parent = gst_object_get_parent (GST_OBJECT (ctx->q));
1092 g_signal_handler_disconnect (ctx->q, ctx->q_overrun_id);
1094 if (parent == GST_OBJECT_CAST (ctx->splitmux)) {
1095 gst_element_set_locked_state (ctx->q, TRUE);
1096 gst_element_set_state (ctx->q, GST_STATE_NULL);
1097 gst_bin_remove (GST_BIN (ctx->splitmux), ctx->q);
1098 gst_object_unref (parent);
1100 gst_object_unref (ctx->q);
1102 gst_object_unref (ctx->sinkpad);
1103 gst_object_unref (ctx->srcpad);
1104 g_queue_foreach (&ctx->queued_bufs, (GFunc) mq_stream_buf_free, NULL);
1105 g_queue_clear (&ctx->queued_bufs);
1110 send_fragment_opened_closed_msg (GstSplitMuxSink * splitmux, gboolean opened,
1113 gchar *location = NULL;
1115 const gchar *msg_name = opened ?
1116 "splitmuxsink-fragment-opened" : "splitmuxsink-fragment-closed";
1117 GstClockTime running_time = splitmux->reference_ctx->out_running_time;
1120 GstClockTime *rtime = g_object_get_qdata (G_OBJECT (sink), RUNNING_TIME);
1122 running_time = *rtime;
1125 if (g_object_class_find_property (G_OBJECT_GET_CLASS (sink),
1126 "location") != NULL)
1127 g_object_get (sink, "location", &location, NULL);
1129 GST_DEBUG_OBJECT (splitmux,
1130 "Sending %s message. Running time %" GST_TIME_FORMAT " location %s",
1131 msg_name, GST_TIME_ARGS (running_time), GST_STR_NULL (location));
1133 /* If it's in the middle of a teardown, the reference_ctc might have become
1135 if (splitmux->reference_ctx) {
1136 msg = gst_message_new_element (GST_OBJECT (splitmux),
1137 gst_structure_new (msg_name,
1138 "location", G_TYPE_STRING, location,
1139 "running-time", GST_TYPE_CLOCK_TIME, running_time,
1140 "sink", GST_TYPE_ELEMENT, sink, NULL));
1141 gst_element_post_message (GST_ELEMENT_CAST (splitmux), msg);
1148 send_eos_async (GstSplitMuxSink * splitmux, AsyncEosHelper * helper)
1154 eos = gst_event_new_eos ();
1158 GST_SPLITMUX_LOCK (splitmux);
1160 pad = gst_pad_get_peer (ctx->srcpad);
1161 GST_SPLITMUX_UNLOCK (splitmux);
1163 gst_pad_send_event (pad, eos);
1164 GST_INFO_OBJECT (splitmux, "Sent async EOS on %" GST_PTR_FORMAT, pad);
1166 gst_object_unref (pad);
1170 /* Called with lock held, drops the lock to send EOS to the
1174 send_eos (GstSplitMuxSink * splitmux, MqStreamCtx * ctx)
1179 eos = gst_event_new_eos ();
1180 pad = gst_pad_get_peer (ctx->srcpad);
1182 ctx->out_eos = TRUE;
1184 GST_INFO_OBJECT (splitmux, "Sending EOS on %" GST_PTR_FORMAT, pad);
1185 GST_SPLITMUX_UNLOCK (splitmux);
1186 gst_pad_send_event (pad, eos);
1187 GST_SPLITMUX_LOCK (splitmux);
1189 gst_object_unref (pad);
1192 /* Called with lock held. Schedules an EOS event to the ctx pad
1193 * to happen in another thread */
1195 eos_context_async (MqStreamCtx * ctx, GstSplitMuxSink * splitmux)
1197 AsyncEosHelper *helper = g_new0 (AsyncEosHelper, 1);
1198 GstPad *srcpad, *sinkpad;
1200 srcpad = ctx->srcpad;
1201 sinkpad = gst_pad_get_peer (srcpad);
1204 helper->pad = sinkpad; /* Takes the reference */
1206 ctx->out_eos_async_done = TRUE;
1208 /* There used to be a bug here, where we had to explicitly remove
1209 * the SINK flag so that GstBin would ignore it for EOS purposes.
1210 * That fixed a race where if splitmuxsink really reaches EOS
1211 * before an asynchronous background element has finished, then
1212 * the bin wouldn't actually send EOS to the pipeline. Even after
1213 * finishing and removing the old element, the bin didn't re-check
1214 * EOS status on removing a SINK element. That bug was fixed
1216 GST_DEBUG_OBJECT (splitmux, "scheduled EOS to pad %" GST_PTR_FORMAT " ctx %p",
1219 g_assert_nonnull (helper->pad);
1220 gst_element_call_async (GST_ELEMENT (splitmux),
1221 (GstElementCallAsyncFunc) send_eos_async, helper, NULL);
1224 /* Called with lock held. TRUE iff all contexts have a
1225 * pending (or delivered) async eos event */
1227 all_contexts_are_async_eos (GstSplitMuxSink * splitmux)
1229 gboolean ret = TRUE;
1232 for (item = splitmux->contexts; item; item = item->next) {
1233 MqStreamCtx *ctx = item->data;
1234 ret &= ctx->out_eos_async_done;
1239 /* Called with splitmux lock held to check if this output
1240 * context needs to sleep to wait for the release of the
1241 * next GOP, or to send EOS to close out the current file
1243 static GstFlowReturn
1244 complete_or_wait_on_out (GstSplitMuxSink * splitmux, MqStreamCtx * ctx)
1246 if (ctx->caps_change)
1250 /* When first starting up, the reference stream has to output
1251 * the first buffer to prepare the muxer and sink */
1252 gboolean can_output = (ctx->is_reference || splitmux->ready_for_output);
1253 GstClockTimeDiff my_max_out_running_time = splitmux->max_out_running_time;
1255 if (my_max_out_running_time != GST_CLOCK_STIME_NONE
1256 && my_max_out_running_time != G_MAXINT64) {
1257 my_max_out_running_time -= splitmux->alignment_threshold;
1258 GST_LOG_OBJECT (ctx->srcpad,
1259 "Max out running time currently %" GST_STIME_FORMAT
1260 ", with threshold applied it is %" GST_STIME_FORMAT,
1261 GST_STIME_ARGS (splitmux->max_out_running_time),
1262 GST_STIME_ARGS (my_max_out_running_time));
1266 || splitmux->output_state == SPLITMUX_OUTPUT_STATE_STOPPED)
1267 return GST_FLOW_FLUSHING;
1269 GST_LOG_OBJECT (ctx->srcpad,
1270 "Checking running time %" GST_STIME_FORMAT " against max %"
1271 GST_STIME_FORMAT, GST_STIME_ARGS (ctx->out_running_time),
1272 GST_STIME_ARGS (my_max_out_running_time));
1275 if (splitmux->max_out_running_time != GST_CLOCK_STIME_NONE &&
1276 ctx->out_running_time < my_max_out_running_time) {
1280 switch (splitmux->output_state) {
1281 case SPLITMUX_OUTPUT_STATE_OUTPUT_GOP:
1282 /* We only get here if we've finished outputting a GOP and need to know
1283 * what to do next */
1284 splitmux->output_state = SPLITMUX_OUTPUT_STATE_AWAITING_COMMAND;
1285 GST_SPLITMUX_BROADCAST_OUTPUT (splitmux);
1288 case SPLITMUX_OUTPUT_STATE_ENDING_FILE:
1289 case SPLITMUX_OUTPUT_STATE_ENDING_STREAM:
1290 /* We've reached the max out running_time to get here, so end this file now */
1291 if (ctx->out_eos == FALSE) {
1292 if (splitmux->async_finalize) {
1293 /* We must set EOS asynchronously at this point. We cannot defer
1294 * it, because we need all contexts to wake up, for the
1295 * reference context to eventually give us something at
1296 * START_NEXT_FILE. Otherwise, collectpads might choose another
1297 * context to give us the first buffer, and format-location-full
1298 * will not contain a valid sample. */
1299 g_object_set_qdata ((GObject *) splitmux->sink, EOS_FROM_US,
1300 GINT_TO_POINTER (1));
1301 eos_context_async (ctx, splitmux);
1302 if (all_contexts_are_async_eos (splitmux)) {
1303 GST_INFO_OBJECT (splitmux,
1304 "All contexts are async_eos. Moving to the next file.");
1305 /* We can start the next file once we've asked each pad to go EOS */
1306 splitmux->output_state = SPLITMUX_OUTPUT_STATE_START_NEXT_FILE;
1307 GST_SPLITMUX_BROADCAST_OUTPUT (splitmux);
1311 send_eos (splitmux, ctx);
1315 GST_INFO_OBJECT (splitmux,
1316 "At end-of-file state, but context %p is already EOS", ctx);
1319 case SPLITMUX_OUTPUT_STATE_START_NEXT_FILE:
1320 if (ctx->is_reference) {
1321 GstFlowReturn ret = GST_FLOW_OK;
1323 /* Special handling on the reference ctx to start new fragments
1324 * and collect commands from the command queue */
1325 /* drops the splitmux lock briefly: */
1326 /* We must have reference ctx in order for format-location-full to
1328 ret = start_next_fragment (splitmux, ctx);
1329 if (ret != GST_FLOW_OK)
1335 case SPLITMUX_OUTPUT_STATE_AWAITING_COMMAND:{
1337 SplitMuxOutputCommand *cmd =
1338 g_queue_pop_tail (&splitmux->out_cmd_q);
1340 /* If we pop the last command, we need to make our queues bigger */
1341 if (g_queue_get_length (&splitmux->out_cmd_q) == 0)
1342 grow_blocked_queues (splitmux);
1344 if (cmd->start_new_fragment) {
1345 if (splitmux->muxed_out_bytes > 0) {
1346 GST_DEBUG_OBJECT (splitmux, "Got cmd to start new fragment");
1347 splitmux->output_state = SPLITMUX_OUTPUT_STATE_ENDING_FILE;
1349 GST_DEBUG_OBJECT (splitmux,
1350 "Got cmd to start new fragment, but fragment is empty - ignoring.");
1353 GST_DEBUG_OBJECT (splitmux,
1354 "Got new output cmd for time %" GST_STIME_FORMAT,
1355 GST_STIME_ARGS (cmd->max_output_ts));
1357 /* Extend the output range immediately */
1358 if (splitmux->max_out_running_time == GST_CLOCK_STIME_NONE
1359 || cmd->max_output_ts > splitmux->max_out_running_time)
1360 splitmux->max_out_running_time = cmd->max_output_ts;
1361 GST_DEBUG_OBJECT (splitmux,
1362 "Max out running time now %" GST_STIME_FORMAT,
1363 GST_STIME_ARGS (splitmux->max_out_running_time));
1364 splitmux->output_state = SPLITMUX_OUTPUT_STATE_OUTPUT_GOP;
1366 GST_SPLITMUX_BROADCAST_OUTPUT (splitmux);
1368 out_cmd_buf_free (cmd);
1371 GST_SPLITMUX_WAIT_OUTPUT (splitmux);
1373 } while (!ctx->flushing && splitmux->output_state ==
1374 SPLITMUX_OUTPUT_STATE_AWAITING_COMMAND);
1375 /* loop and re-check the state */
1378 case SPLITMUX_OUTPUT_STATE_STOPPED:
1379 return GST_FLOW_FLUSHING;
1382 GST_LOG_OBJECT (ctx->srcpad, "Not yet ready for output");
1385 GST_INFO_OBJECT (ctx->srcpad,
1386 "Sleeping for running time %"
1387 GST_STIME_FORMAT " (max %" GST_STIME_FORMAT ") or state change.",
1388 GST_STIME_ARGS (ctx->out_running_time),
1389 GST_STIME_ARGS (splitmux->max_out_running_time));
1390 GST_SPLITMUX_WAIT_OUTPUT (splitmux);
1391 GST_INFO_OBJECT (ctx->srcpad,
1392 "Woken for new max running time %" GST_STIME_FORMAT,
1393 GST_STIME_ARGS (splitmux->max_out_running_time));
1401 calculate_next_max_timecode (GstSplitMuxSink * splitmux,
1402 const GstVideoTimeCode * cur_tc, GstClockTime running_time,
1403 GstVideoTimeCode ** next_tc)
1405 GstVideoTimeCode *target_tc;
1406 GstClockTime cur_tc_time, target_tc_time, next_max_tc_time;
1408 if (cur_tc == NULL || splitmux->tc_interval == NULL)
1409 return GST_CLOCK_TIME_NONE;
1411 target_tc = gst_video_time_code_add_interval (cur_tc, splitmux->tc_interval);
1413 GST_ELEMENT_ERROR (splitmux,
1414 STREAM, FAILED, (NULL), ("Couldn't calculate target timecode"));
1415 return GST_CLOCK_TIME_NONE;
1419 target_tc_time = gst_video_time_code_nsec_since_daily_jam (target_tc);
1420 cur_tc_time = gst_video_time_code_nsec_since_daily_jam (cur_tc);
1422 /* Add running_time, accounting for wraparound. */
1423 if (target_tc_time >= cur_tc_time) {
1424 next_max_tc_time = target_tc_time - cur_tc_time + running_time;
1426 GstClockTime day_in_ns = 24 * 60 * 60 * GST_SECOND;
1428 if ((cur_tc->config.flags & GST_VIDEO_TIME_CODE_FLAGS_DROP_FRAME) &&
1429 (cur_tc->config.fps_d == 1001)) {
1430 /* Checking fps_d is probably unneeded, but better safe than sorry
1431 * (e.g. someone accidentally set a flag) */
1432 GstVideoTimeCode *tc_for_offset;
1434 /* Here, the duration of the 24:00:00;00 timecode isn't exactly one day,
1435 * but slightly less. Calculate that duration from a fake timecode. The
1436 * problem is that 24:00:00;00 isn't a valid timecode, so the workaround
1437 * is to add one frame to 23:59:59;29 */
1439 gst_video_time_code_new (cur_tc->config.fps_n, cur_tc->config.fps_d,
1440 NULL, cur_tc->config.flags, 23, 59, 59,
1441 cur_tc->config.fps_n / cur_tc->config.fps_d, 0);
1443 gst_video_time_code_nsec_since_daily_jam (tc_for_offset) +
1444 gst_util_uint64_scale (GST_SECOND, cur_tc->config.fps_d,
1445 cur_tc->config.fps_n);
1446 gst_video_time_code_free (tc_for_offset);
1448 next_max_tc_time = day_in_ns - cur_tc_time + target_tc_time + running_time;
1451 #ifndef GST_DISABLE_GST_DEBUG
1453 gchar *next_max_tc_str, *cur_tc_str;
1455 cur_tc_str = gst_video_time_code_to_string (cur_tc);
1456 next_max_tc_str = gst_video_time_code_to_string (target_tc);
1458 GST_INFO_OBJECT (splitmux, "Next max timecode %s time: %" GST_TIME_FORMAT
1459 " from ref timecode %s time: %" GST_TIME_FORMAT,
1461 GST_TIME_ARGS (next_max_tc_time),
1462 cur_tc_str, GST_TIME_ARGS (cur_tc_time));
1464 g_free (next_max_tc_str);
1465 g_free (cur_tc_str);
1470 *next_tc = target_tc;
1472 gst_video_time_code_free (target_tc);
1474 return next_max_tc_time;
1478 request_next_keyframe (GstSplitMuxSink * splitmux, GstBuffer * buffer,
1479 GstClockTimeDiff running_time_dts)
1482 GstClockTime target_time;
1483 gboolean timecode_based = FALSE;
1484 GstClockTime max_tc_time = GST_CLOCK_TIME_NONE;
1485 GstClockTime next_max_tc_time = GST_CLOCK_TIME_NONE;
1486 GstClockTime next_fku_time = GST_CLOCK_TIME_NONE;
1487 GstClockTime tc_rounding_error = 5 * GST_USECOND;
1488 InputGop *newest_gop = NULL;
1491 if (!splitmux->send_keyframe_requests)
1494 /* Find the newest GOP where we passed in DTS the start PTS */
1495 for (l = splitmux->pending_input_gops.tail; l; l = l->prev) {
1496 InputGop *tmp = l->data;
1498 GST_TRACE_OBJECT (splitmux,
1499 "Having pending input GOP with start PTS %" GST_STIME_FORMAT
1500 " and start time %" GST_STIME_FORMAT,
1501 GST_STIME_ARGS (tmp->start_time_pts), GST_STIME_ARGS (tmp->start_time));
1503 if (tmp->sent_fku) {
1504 GST_DEBUG_OBJECT (splitmux,
1505 "Already checked for a keyframe request for this GOP");
1509 if (running_time_dts == GST_CLOCK_STIME_NONE ||
1510 tmp->start_time_pts == GST_CLOCK_STIME_NONE ||
1511 running_time_dts >= tmp->start_time_pts) {
1512 GST_DEBUG_OBJECT (splitmux,
1513 "Using GOP with start PTS %" GST_STIME_FORMAT " and start time %"
1514 GST_STIME_FORMAT, GST_STIME_ARGS (tmp->start_time_pts),
1515 GST_STIME_ARGS (tmp->start_time));
1522 GST_DEBUG_OBJECT (splitmux, "Have no complete enough pending input GOP");
1526 if (splitmux->tc_interval) {
1527 if (newest_gop->start_tc
1528 && gst_video_time_code_is_valid (newest_gop->start_tc)) {
1529 GstVideoTimeCode *next_tc = NULL;
1531 calculate_next_max_timecode (splitmux, newest_gop->start_tc,
1532 newest_gop->start_time, &next_tc);
1534 /* calculate the next expected keyframe time to prevent too early fku
1536 if (GST_CLOCK_TIME_IS_VALID (max_tc_time) && next_tc) {
1538 calculate_next_max_timecode (splitmux, next_tc, max_tc_time, NULL);
1541 gst_video_time_code_free (next_tc);
1543 timecode_based = GST_CLOCK_TIME_IS_VALID (max_tc_time) &&
1544 GST_CLOCK_TIME_IS_VALID (next_max_tc_time);
1546 if (!timecode_based) {
1547 GST_WARNING_OBJECT (splitmux,
1548 "Couldn't calculate maximum fragment time for timecode mode");
1551 /* This can happen in the presence of GAP events that trigger
1552 * a new fragment start */
1553 GST_WARNING_OBJECT (splitmux,
1554 "No buffer available to calculate next timecode");
1558 if ((splitmux->threshold_time == 0 && !timecode_based)
1559 || splitmux->threshold_bytes != 0)
1562 if (timecode_based) {
1563 /* We might have rounding errors: aim slightly earlier */
1564 if (max_tc_time >= tc_rounding_error) {
1565 target_time = max_tc_time - tc_rounding_error;
1567 /* unreliable target time */
1568 GST_DEBUG_OBJECT (splitmux, "tc time %" GST_TIME_FORMAT
1569 " is smaller than allowed rounding error, set it to zero",
1570 GST_TIME_ARGS (max_tc_time));
1574 if (next_max_tc_time >= tc_rounding_error) {
1575 next_fku_time = next_max_tc_time - tc_rounding_error;
1577 /* unreliable target time */
1578 GST_DEBUG_OBJECT (splitmux, "next tc time %" GST_TIME_FORMAT
1579 " is smaller than allowed rounding error, set it to zero",
1580 GST_TIME_ARGS (next_max_tc_time));
1584 target_time = newest_gop->start_time + splitmux->threshold_time;
1587 if (GST_CLOCK_TIME_IS_VALID (splitmux->next_fku_time)) {
1588 GstClockTime allowed_time = splitmux->next_fku_time;
1590 if (timecode_based) {
1591 if (allowed_time >= tc_rounding_error) {
1592 allowed_time -= tc_rounding_error;
1594 /* unreliable next force key unit time */
1595 GST_DEBUG_OBJECT (splitmux, "expected next force key unit time %"
1597 " is smaller than allowed rounding error, set it to zero",
1598 GST_TIME_ARGS (splitmux->next_fku_time));
1603 if (target_time < allowed_time) {
1604 GST_LOG_OBJECT (splitmux, "Target time %" GST_TIME_FORMAT
1605 " is smaller than expected next keyframe time %" GST_TIME_FORMAT
1606 ", rounding error compensated next keyframe time %" GST_TIME_FORMAT,
1607 GST_TIME_ARGS (target_time),
1608 GST_TIME_ARGS (splitmux->next_fku_time),
1609 GST_TIME_ARGS (allowed_time));
1612 } else if (allowed_time != splitmux->next_fku_time &&
1613 target_time < splitmux->next_fku_time) {
1614 GST_DEBUG_OBJECT (splitmux, "Target time %" GST_TIME_FORMAT
1615 " is smaller than expected next keyframe time %" GST_TIME_FORMAT
1616 ", but the difference is smaller than allowed rounding error",
1617 GST_TIME_ARGS (target_time), GST_TIME_ARGS (splitmux->next_fku_time));
1621 if (!timecode_based) {
1622 next_fku_time = target_time + splitmux->threshold_time;
1625 GST_INFO_OBJECT (splitmux, "Requesting keyframe at %" GST_TIME_FORMAT
1626 ", the next expected keyframe request time is %" GST_TIME_FORMAT,
1627 GST_TIME_ARGS (target_time), GST_TIME_ARGS (next_fku_time));
1629 newest_gop->sent_fku = TRUE;
1631 splitmux->next_fku_time = next_fku_time;
1632 ev = gst_video_event_new_upstream_force_key_unit (target_time, TRUE, 0);
1634 return gst_pad_push_event (splitmux->reference_ctx->sinkpad, ev);
1637 static GstPadProbeReturn
1638 handle_mq_output (GstPad * pad, GstPadProbeInfo * info, MqStreamCtx * ctx)
1640 GstSplitMuxSink *splitmux = ctx->splitmux;
1641 MqStreamBuf *buf_info = NULL;
1642 GstFlowReturn ret = GST_FLOW_OK;
1644 GST_LOG_OBJECT (pad, "Fired probe type 0x%x", info->type);
1646 /* FIXME: Handle buffer lists, until then make it clear they won't work */
1647 if (info->type & GST_PAD_PROBE_TYPE_BUFFER_LIST) {
1648 g_warning ("Buffer list handling not implemented");
1649 return GST_PAD_PROBE_DROP;
1651 if (info->type & GST_PAD_PROBE_TYPE_EVENT_DOWNSTREAM ||
1652 info->type & GST_PAD_PROBE_TYPE_EVENT_FLUSH) {
1653 GstEvent *event = gst_pad_probe_info_get_event (info);
1654 gboolean locked = FALSE, wait = !ctx->is_reference;
1656 GST_LOG_OBJECT (pad, "Event %" GST_PTR_FORMAT, event);
1658 switch (GST_EVENT_TYPE (event)) {
1659 case GST_EVENT_SEGMENT:
1660 gst_event_copy_segment (event, &ctx->out_segment);
1662 case GST_EVENT_FLUSH_STOP:
1663 GST_SPLITMUX_LOCK (splitmux);
1665 gst_segment_init (&ctx->out_segment, GST_FORMAT_UNDEFINED);
1666 g_queue_foreach (&ctx->queued_bufs, (GFunc) mq_stream_buf_free, NULL);
1667 g_queue_clear (&ctx->queued_bufs);
1668 g_queue_clear (&ctx->queued_bufs);
1669 /* If this is the reference context, we just threw away any queued keyframes */
1670 if (ctx->is_reference)
1671 splitmux->queued_keyframes = 0;
1672 ctx->flushing = FALSE;
1675 case GST_EVENT_FLUSH_START:
1676 GST_SPLITMUX_LOCK (splitmux);
1678 GST_LOG_OBJECT (pad, "Flush start");
1679 ctx->flushing = TRUE;
1680 GST_SPLITMUX_BROADCAST_INPUT (splitmux);
1681 GST_SPLITMUX_BROADCAST_OUTPUT (splitmux);
1684 GST_SPLITMUX_LOCK (splitmux);
1686 if (splitmux->output_state == SPLITMUX_OUTPUT_STATE_STOPPED)
1688 ctx->out_eos = TRUE;
1690 if (ctx == splitmux->reference_ctx) {
1691 splitmux->output_state = SPLITMUX_OUTPUT_STATE_ENDING_STREAM;
1692 GST_SPLITMUX_BROADCAST_OUTPUT (splitmux);
1695 GST_INFO_OBJECT (splitmux,
1696 "Have EOS event at pad %" GST_PTR_FORMAT " ctx %p", pad, ctx);
1698 case GST_EVENT_GAP:{
1699 GstClockTime gap_ts;
1700 GstClockTimeDiff rtime;
1702 gst_event_parse_gap (event, &gap_ts, NULL);
1703 if (gap_ts == GST_CLOCK_TIME_NONE)
1706 GST_SPLITMUX_LOCK (splitmux);
1709 if (splitmux->output_state == SPLITMUX_OUTPUT_STATE_STOPPED)
1712 /* When we get a gap event on the
1713 * reference stream and we're trying to open a
1714 * new file, we need to store it until we get
1715 * the buffer afterwards
1717 if (ctx->is_reference &&
1718 (splitmux->output_state != SPLITMUX_OUTPUT_STATE_OUTPUT_GOP)) {
1719 GST_DEBUG_OBJECT (pad, "Storing GAP event until buffer arrives");
1720 gst_event_replace (&ctx->pending_gap, event);
1721 GST_SPLITMUX_UNLOCK (splitmux);
1722 return GST_PAD_PROBE_HANDLED;
1725 rtime = my_segment_to_running_time (&ctx->out_segment, gap_ts);
1727 GST_LOG_OBJECT (pad, "Have GAP w/ ts %" GST_STIME_FORMAT,
1728 GST_STIME_ARGS (rtime));
1730 if (rtime != GST_CLOCK_STIME_NONE) {
1731 ctx->out_running_time = rtime;
1732 complete_or_wait_on_out (splitmux, ctx);
1736 case GST_EVENT_CUSTOM_DOWNSTREAM:{
1737 const GstStructure *s;
1738 GstClockTimeDiff ts = 0;
1740 s = gst_event_get_structure (event);
1741 if (!gst_structure_has_name (s, "splitmuxsink-unblock"))
1744 gst_structure_get_int64 (s, "timestamp", &ts);
1746 GST_SPLITMUX_LOCK (splitmux);
1749 if (splitmux->output_state == SPLITMUX_OUTPUT_STATE_STOPPED)
1751 ctx->out_running_time = ts;
1752 if (!ctx->is_reference)
1753 ret = complete_or_wait_on_out (splitmux, ctx);
1754 GST_SPLITMUX_UNLOCK (splitmux);
1755 GST_PAD_PROBE_INFO_FLOW_RETURN (info) = ret;
1756 return GST_PAD_PROBE_DROP;
1758 case GST_EVENT_CAPS:{
1761 if (!ctx->is_reference)
1764 peer = gst_pad_get_peer (pad);
1766 gboolean ok = gst_pad_send_event (peer, gst_event_ref (event));
1768 gst_object_unref (peer);
1776 /* This is in the case the muxer doesn't allow this change of caps */
1777 GST_SPLITMUX_LOCK (splitmux);
1779 ctx->caps_change = TRUE;
1781 if (splitmux->output_state != SPLITMUX_OUTPUT_STATE_START_NEXT_FILE) {
1782 GST_DEBUG_OBJECT (splitmux,
1783 "New caps were not accepted. Switching output file");
1784 if (ctx->out_eos == FALSE) {
1785 splitmux->output_state = SPLITMUX_OUTPUT_STATE_ENDING_FILE;
1786 GST_SPLITMUX_BROADCAST_OUTPUT (splitmux);
1790 /* Lets it fall through, if it fails again, then the muxer just can't
1791 * support this format, but at least we have a closed file.
1799 /* We need to make sure events aren't passed
1800 * until the muxer / sink are ready for it */
1802 GST_SPLITMUX_LOCK (splitmux);
1804 ret = complete_or_wait_on_out (splitmux, ctx);
1805 GST_SPLITMUX_UNLOCK (splitmux);
1807 /* Don't try to forward sticky events before the next buffer is there
1808 * because it would cause a new file to be created without the first
1809 * buffer being available.
1811 GST_PAD_PROBE_INFO_FLOW_RETURN (info) = ret;
1812 if (ctx->caps_change && GST_EVENT_IS_STICKY (event)) {
1813 gst_event_unref (event);
1814 return GST_PAD_PROBE_HANDLED;
1816 return GST_PAD_PROBE_PASS;
1820 /* Allow everything through until the configured next stopping point */
1821 GST_SPLITMUX_LOCK (splitmux);
1823 buf_info = g_queue_pop_tail (&ctx->queued_bufs);
1824 if (buf_info == NULL) {
1825 /* Can only happen due to a poorly timed flush */
1826 ret = GST_FLOW_FLUSHING;
1830 /* If we have popped a keyframe, decrement the queued_gop count */
1831 if (buf_info->keyframe && splitmux->queued_keyframes > 0 && ctx->is_reference)
1832 splitmux->queued_keyframes--;
1834 ctx->out_running_time = buf_info->run_ts;
1835 ctx->cur_out_buffer = gst_pad_probe_info_get_buffer (info);
1837 GST_LOG_OBJECT (splitmux,
1838 "Pad %" GST_PTR_FORMAT " buffer with run TS %" GST_STIME_FORMAT
1839 " size %" G_GUINT64_FORMAT,
1840 pad, GST_STIME_ARGS (ctx->out_running_time), buf_info->buf_size);
1842 ctx->caps_change = FALSE;
1844 ret = complete_or_wait_on_out (splitmux, ctx);
1846 splitmux->muxed_out_bytes += buf_info->buf_size;
1848 #ifndef GST_DISABLE_GST_DEBUG
1850 GstBuffer *buf = gst_pad_probe_info_get_buffer (info);
1851 GST_LOG_OBJECT (pad, "Returning to pass buffer %" GST_PTR_FORMAT
1852 " run ts %" GST_STIME_FORMAT, buf,
1853 GST_STIME_ARGS (ctx->out_running_time));
1857 ctx->cur_out_buffer = NULL;
1858 GST_SPLITMUX_UNLOCK (splitmux);
1860 /* pending_gap is protected by the STREAM lock */
1861 if (ctx->pending_gap) {
1862 /* If we previously stored a gap event, send it now */
1863 GstPad *peer = gst_pad_get_peer (ctx->srcpad);
1865 GST_DEBUG_OBJECT (splitmux,
1866 "Pad %" GST_PTR_FORMAT " sending pending GAP event", ctx->srcpad);
1868 gst_pad_send_event (peer, ctx->pending_gap);
1869 ctx->pending_gap = NULL;
1871 gst_object_unref (peer);
1874 mq_stream_buf_free (buf_info);
1876 GST_PAD_PROBE_INFO_FLOW_RETURN (info) = ret;
1877 return GST_PAD_PROBE_PASS;
1880 GST_SPLITMUX_UNLOCK (splitmux);
1881 GST_PAD_PROBE_INFO_FLOW_RETURN (info) = ret;
1882 return GST_PAD_PROBE_DROP;
1886 resend_sticky (GstPad * pad, GstEvent ** event, GstPad * peer)
1888 return gst_pad_send_event (peer, gst_event_ref (*event));
1892 unlock_context (MqStreamCtx * ctx, GstSplitMuxSink * splitmux)
1894 if (ctx->fragment_block_id > 0) {
1895 gst_pad_remove_probe (ctx->srcpad, ctx->fragment_block_id);
1896 ctx->fragment_block_id = 0;
1901 restart_context (MqStreamCtx * ctx, GstSplitMuxSink * splitmux)
1903 GstPad *peer = gst_pad_get_peer (ctx->srcpad);
1905 gst_pad_sticky_events_foreach (ctx->srcpad,
1906 (GstPadStickyEventsForeachFunction) (resend_sticky), peer);
1908 /* Clear EOS flag if not actually EOS */
1909 ctx->out_eos = GST_PAD_IS_EOS (ctx->srcpad);
1910 ctx->out_eos_async_done = ctx->out_eos;
1912 gst_object_unref (peer);
1916 relink_context (MqStreamCtx * ctx, GstSplitMuxSink * splitmux)
1918 GstPad *sinkpad, *srcpad, *newpad;
1919 GstPadTemplate *templ;
1921 srcpad = ctx->srcpad;
1922 sinkpad = gst_pad_get_peer (srcpad);
1924 templ = sinkpad->padtemplate;
1926 gst_element_request_pad (splitmux->muxer, templ,
1927 GST_PAD_NAME (sinkpad), NULL);
1929 GST_DEBUG_OBJECT (splitmux, "Relinking ctx %p to pad %" GST_PTR_FORMAT, ctx,
1931 if (!gst_pad_unlink (srcpad, sinkpad)) {
1932 gst_object_unref (sinkpad);
1935 if (gst_pad_link_full (srcpad, newpad,
1936 GST_PAD_LINK_CHECK_NO_RECONFIGURE) != GST_PAD_LINK_OK) {
1937 gst_element_release_request_pad (splitmux->muxer, newpad);
1938 gst_object_unref (sinkpad);
1939 gst_object_unref (newpad);
1942 gst_object_unref (newpad);
1943 gst_object_unref (sinkpad);
1947 GST_ELEMENT_ERROR (splitmux, RESOURCE, SETTINGS,
1948 ("Could not create the new muxer/sink"), NULL);
1951 static GstPadProbeReturn
1952 _block_pad (GstPad * pad, GstPadProbeInfo * info, gpointer user_data)
1954 return GST_PAD_PROBE_OK;
1958 block_context (MqStreamCtx * ctx, GstSplitMuxSink * splitmux)
1960 ctx->fragment_block_id =
1961 gst_pad_add_probe (ctx->srcpad, GST_PAD_PROBE_TYPE_BLOCK, _block_pad,
1966 _set_property_from_structure (GQuark field_id, const GValue * value,
1969 const gchar *property_name = g_quark_to_string (field_id);
1970 GObject *element = G_OBJECT (user_data);
1972 g_object_set_property (element, property_name, value);
1978 _lock_and_set_to_null (GstElement * element, GstSplitMuxSink * splitmux)
1980 gst_element_set_locked_state (element, TRUE);
1981 gst_element_set_state (element, GST_STATE_NULL);
1982 GST_LOG_OBJECT (splitmux, "Removing old element %" GST_PTR_FORMAT, element);
1983 gst_bin_remove (GST_BIN (splitmux), element);
1988 _send_event (const GValue * value, gpointer user_data)
1990 GstPad *pad = g_value_get_object (value);
1991 GstEvent *ev = user_data;
1993 gst_pad_send_event (pad, gst_event_ref (ev));
1996 /* Called with lock held when a fragment
1997 * reaches EOS and it is time to restart
2000 static GstFlowReturn
2001 start_next_fragment (GstSplitMuxSink * splitmux, MqStreamCtx * ctx)
2003 GstElement *muxer, *sink;
2005 g_assert (ctx->is_reference);
2007 /* 1 change to new file */
2008 splitmux->switching_fragment = TRUE;
2010 /* We need to drop the splitmux lock to acquire the state lock
2011 * here and ensure there's no racy state change going on elsewhere */
2012 muxer = gst_object_ref (splitmux->muxer);
2013 sink = gst_object_ref (splitmux->active_sink);
2015 GST_SPLITMUX_UNLOCK (splitmux);
2016 GST_SPLITMUX_STATE_LOCK (splitmux);
2018 if (splitmux->shutdown) {
2019 GST_DEBUG_OBJECT (splitmux,
2020 "Shutdown requested. Aborting fragment switch.");
2021 GST_SPLITMUX_LOCK (splitmux);
2022 GST_SPLITMUX_STATE_UNLOCK (splitmux);
2023 gst_object_unref (muxer);
2024 gst_object_unref (sink);
2025 return GST_FLOW_FLUSHING;
2028 if (splitmux->async_finalize) {
2029 if (splitmux->muxed_out_bytes > 0
2030 || splitmux->fragment_id != splitmux->start_index) {
2032 GstElement *new_sink, *new_muxer;
2034 GST_DEBUG_OBJECT (splitmux, "Starting fragment %u",
2035 splitmux->fragment_id);
2036 g_list_foreach (splitmux->contexts, (GFunc) block_context, splitmux);
2037 newname = g_strdup_printf ("sink_%u", splitmux->fragment_id);
2038 GST_SPLITMUX_LOCK (splitmux);
2039 if ((splitmux->sink =
2040 create_element (splitmux, splitmux->sink_factory, newname,
2043 if (splitmux->sink_preset && GST_IS_PRESET (splitmux->sink))
2044 gst_preset_load_preset (GST_PRESET (splitmux->sink),
2045 splitmux->sink_preset);
2046 if (splitmux->sink_properties)
2047 gst_structure_foreach (splitmux->sink_properties,
2048 _set_property_from_structure, splitmux->sink);
2049 splitmux->active_sink = splitmux->sink;
2050 g_signal_emit (splitmux, signals[SIGNAL_SINK_ADDED], 0, splitmux->sink);
2052 newname = g_strdup_printf ("muxer_%u", splitmux->fragment_id);
2053 if ((splitmux->muxer =
2054 create_element (splitmux, splitmux->muxer_factory, newname,
2057 if (g_object_class_find_property (G_OBJECT_GET_CLASS (splitmux->sink),
2059 /* async child elements are causing state change races and weird
2060 * failures, so let's try and turn that off */
2061 g_object_set (splitmux->sink, "async", FALSE, NULL);
2063 if (splitmux->muxer_preset && GST_IS_PRESET (splitmux->muxer))
2064 gst_preset_load_preset (GST_PRESET (splitmux->muxer),
2065 splitmux->muxer_preset);
2066 if (splitmux->muxer_properties)
2067 gst_structure_foreach (splitmux->muxer_properties,
2068 _set_property_from_structure, splitmux->muxer);
2069 g_signal_emit (splitmux, signals[SIGNAL_MUXER_ADDED], 0, splitmux->muxer);
2071 new_sink = splitmux->sink;
2072 new_muxer = splitmux->muxer;
2073 GST_SPLITMUX_UNLOCK (splitmux);
2074 g_list_foreach (splitmux->contexts, (GFunc) relink_context, splitmux);
2075 gst_element_link (new_muxer, new_sink);
2077 if (g_object_get_qdata ((GObject *) sink, EOS_FROM_US)) {
2078 if (GPOINTER_TO_INT (g_object_get_qdata ((GObject *) sink,
2079 EOS_FROM_US)) == 2) {
2080 _lock_and_set_to_null (muxer, splitmux);
2081 _lock_and_set_to_null (sink, splitmux);
2083 g_object_set_qdata ((GObject *) sink, EOS_FROM_US,
2084 GINT_TO_POINTER (2));
2087 gst_object_unref (muxer);
2088 gst_object_unref (sink);
2091 gst_object_ref (muxer);
2092 gst_object_ref (sink);
2096 gst_element_set_locked_state (muxer, TRUE);
2097 gst_element_set_locked_state (sink, TRUE);
2098 gst_element_set_state (sink, GST_STATE_NULL);
2100 if (splitmux->reset_muxer) {
2101 gst_element_set_state (muxer, GST_STATE_NULL);
2103 GstIterator *it = gst_element_iterate_sink_pads (muxer);
2107 ev = gst_event_new_flush_start ();
2108 seqnum = gst_event_get_seqnum (ev);
2109 while (gst_iterator_foreach (it, _send_event, ev) == GST_ITERATOR_RESYNC);
2110 gst_event_unref (ev);
2112 gst_iterator_resync (it);
2114 ev = gst_event_new_flush_stop (TRUE);
2115 gst_event_set_seqnum (ev, seqnum);
2116 while (gst_iterator_foreach (it, _send_event, ev) == GST_ITERATOR_RESYNC);
2117 gst_event_unref (ev);
2119 gst_iterator_free (it);
2123 GST_SPLITMUX_LOCK (splitmux);
2124 set_next_filename (splitmux, ctx);
2125 splitmux->muxed_out_bytes = 0;
2126 GST_SPLITMUX_UNLOCK (splitmux);
2128 if (gst_element_set_state (sink,
2129 GST_STATE_TARGET (splitmux)) == GST_STATE_CHANGE_FAILURE) {
2130 gst_element_set_state (sink, GST_STATE_NULL);
2131 gst_element_set_locked_state (muxer, FALSE);
2132 gst_element_set_locked_state (sink, FALSE);
2137 if (gst_element_set_state (muxer,
2138 GST_STATE_TARGET (splitmux)) == GST_STATE_CHANGE_FAILURE) {
2139 gst_element_set_state (muxer, GST_STATE_NULL);
2140 gst_element_set_state (sink, GST_STATE_NULL);
2141 gst_element_set_locked_state (muxer, FALSE);
2142 gst_element_set_locked_state (sink, FALSE);
2146 gst_element_set_locked_state (muxer, FALSE);
2147 gst_element_set_locked_state (sink, FALSE);
2149 gst_object_unref (sink);
2150 gst_object_unref (muxer);
2152 GST_SPLITMUX_LOCK (splitmux);
2153 GST_SPLITMUX_STATE_UNLOCK (splitmux);
2154 splitmux->switching_fragment = FALSE;
2155 do_async_done (splitmux);
2157 splitmux->ready_for_output = TRUE;
2159 g_list_foreach (splitmux->contexts, (GFunc) unlock_context, splitmux);
2160 g_list_foreach (splitmux->contexts, (GFunc) restart_context, splitmux);
2162 send_fragment_opened_closed_msg (splitmux, TRUE, sink);
2164 /* FIXME: Is this always the correct next state? */
2165 GST_LOG_OBJECT (splitmux, "Resetting state to AWAITING_COMMAND");
2166 splitmux->output_state = SPLITMUX_OUTPUT_STATE_AWAITING_COMMAND;
2167 GST_SPLITMUX_BROADCAST_OUTPUT (splitmux);
2171 gst_object_unref (sink);
2172 gst_object_unref (muxer);
2174 GST_SPLITMUX_LOCK (splitmux);
2175 GST_SPLITMUX_STATE_UNLOCK (splitmux);
2176 GST_ELEMENT_ERROR (splitmux, RESOURCE, SETTINGS,
2177 ("Could not create the new muxer/sink"), NULL);
2178 return GST_FLOW_ERROR;
2181 GST_ELEMENT_ERROR (splitmux, RESOURCE, SETTINGS,
2182 ("Could not start new output sink"), NULL);
2183 gst_object_unref (sink);
2184 gst_object_unref (muxer);
2186 GST_SPLITMUX_LOCK (splitmux);
2187 GST_SPLITMUX_STATE_UNLOCK (splitmux);
2188 splitmux->switching_fragment = FALSE;
2189 return GST_FLOW_ERROR;
2192 GST_ELEMENT_ERROR (splitmux, RESOURCE, SETTINGS,
2193 ("Could not start new muxer"), NULL);
2194 gst_object_unref (sink);
2195 gst_object_unref (muxer);
2197 GST_SPLITMUX_LOCK (splitmux);
2198 GST_SPLITMUX_STATE_UNLOCK (splitmux);
2199 splitmux->switching_fragment = FALSE;
2200 return GST_FLOW_ERROR;
2204 bus_handler (GstBin * bin, GstMessage * message)
2206 GstSplitMuxSink *splitmux = GST_SPLITMUX_SINK (bin);
2208 switch (GST_MESSAGE_TYPE (message)) {
2209 case GST_MESSAGE_EOS:{
2210 /* If the state is draining out the current file, drop this EOS */
2213 sink = GST_ELEMENT (GST_MESSAGE_SRC (message));
2214 GST_SPLITMUX_LOCK (splitmux);
2216 send_fragment_opened_closed_msg (splitmux, FALSE, sink);
2218 if (splitmux->async_finalize) {
2220 if (g_object_get_qdata ((GObject *) sink, EOS_FROM_US)) {
2221 if (GPOINTER_TO_INT (g_object_get_qdata ((GObject *) sink,
2222 EOS_FROM_US)) == 2) {
2224 GstPad *sinksink, *muxersrc;
2226 sinksink = gst_element_get_static_pad (sink, "sink");
2227 muxersrc = gst_pad_get_peer (sinksink);
2228 muxer = gst_pad_get_parent_element (muxersrc);
2229 gst_object_unref (sinksink);
2230 gst_object_unref (muxersrc);
2232 gst_element_call_async (muxer,
2233 (GstElementCallAsyncFunc) _lock_and_set_to_null,
2234 gst_object_ref (splitmux), gst_object_unref);
2235 gst_element_call_async (sink,
2236 (GstElementCallAsyncFunc) _lock_and_set_to_null,
2237 gst_object_ref (splitmux), gst_object_unref);
2238 gst_object_unref (muxer);
2240 g_object_set_qdata ((GObject *) sink, EOS_FROM_US,
2241 GINT_TO_POINTER (2));
2243 GST_DEBUG_OBJECT (splitmux,
2244 "Caught async EOS from previous muxer+sink. Dropping.");
2245 /* We forward the EOS so that it gets aggregated as normal. If the sink
2246 * finishes and is removed before the end, it will be de-aggregated */
2247 gst_message_unref (message);
2248 GST_SPLITMUX_UNLOCK (splitmux);
2251 } else if (splitmux->output_state == SPLITMUX_OUTPUT_STATE_ENDING_STREAM) {
2252 GST_DEBUG_OBJECT (splitmux,
2253 "Passing EOS message. Output state %d max_out_running_time %"
2254 GST_STIME_FORMAT, splitmux->output_state,
2255 GST_STIME_ARGS (splitmux->max_out_running_time));
2257 GST_DEBUG_OBJECT (splitmux, "Caught EOS at end of fragment, dropping");
2258 splitmux->output_state = SPLITMUX_OUTPUT_STATE_START_NEXT_FILE;
2259 GST_SPLITMUX_BROADCAST_OUTPUT (splitmux);
2261 gst_message_unref (message);
2262 GST_SPLITMUX_UNLOCK (splitmux);
2265 GST_SPLITMUX_UNLOCK (splitmux);
2268 case GST_MESSAGE_ASYNC_START:
2269 case GST_MESSAGE_ASYNC_DONE:
2270 /* Ignore state changes from our children while switching */
2271 GST_SPLITMUX_LOCK (splitmux);
2272 if (splitmux->switching_fragment) {
2273 if (GST_MESSAGE_SRC (message) == (GstObject *) splitmux->active_sink
2274 || GST_MESSAGE_SRC (message) == (GstObject *) splitmux->muxer) {
2275 GST_LOG_OBJECT (splitmux,
2276 "Ignoring state change from child %" GST_PTR_FORMAT
2277 " while switching", GST_MESSAGE_SRC (message));
2278 gst_message_unref (message);
2279 GST_SPLITMUX_UNLOCK (splitmux);
2283 GST_SPLITMUX_UNLOCK (splitmux);
2285 case GST_MESSAGE_WARNING:
2287 GError *gerror = NULL;
2289 gst_message_parse_warning (message, &gerror, NULL);
2291 if (g_error_matches (gerror, GST_STREAM_ERROR, GST_STREAM_ERROR_FORMAT)) {
2293 gboolean caps_change = FALSE;
2295 GST_SPLITMUX_LOCK (splitmux);
2297 for (item = splitmux->contexts; item; item = item->next) {
2298 MqStreamCtx *ctx = item->data;
2300 if (ctx->caps_change) {
2306 GST_SPLITMUX_UNLOCK (splitmux);
2309 GST_LOG_OBJECT (splitmux,
2310 "Ignoring warning change from child %" GST_PTR_FORMAT
2311 " while switching caps", GST_MESSAGE_SRC (message));
2312 gst_message_unref (message);
2322 GST_BIN_CLASS (parent_class)->handle_message (bin, message);
2326 ctx_set_unblock (MqStreamCtx * ctx)
2328 ctx->need_unblock = TRUE;
2332 need_new_fragment (GstSplitMuxSink * splitmux,
2333 GstClockTime queued_time, GstClockTime queued_gop_time,
2334 guint64 queued_bytes)
2336 guint64 thresh_bytes;
2337 GstClockTime thresh_time;
2338 gboolean check_robust_muxing;
2339 GstClockTime time_to_split = GST_CLOCK_TIME_NONE;
2340 GstClockTime *ptr_to_time;
2341 const InputGop *gop, *next_gop;
2343 GST_OBJECT_LOCK (splitmux);
2344 thresh_bytes = splitmux->threshold_bytes;
2345 thresh_time = splitmux->threshold_time;
2346 ptr_to_time = (GstClockTime *)
2347 gst_queue_array_peek_head_struct (splitmux->times_to_split);
2349 time_to_split = *ptr_to_time;
2350 check_robust_muxing = splitmux->use_robust_muxing
2351 && splitmux->muxer_has_reserved_props;
2352 GST_OBJECT_UNLOCK (splitmux);
2354 /* Have we muxed at least one thing from the reference
2355 * stream into the file? If not, no other streams can have
2357 if (splitmux->fragment_reference_bytes <= 0) {
2358 GST_TRACE_OBJECT (splitmux,
2359 "Not ready to split - nothing muxed on the reference stream");
2363 /* User told us to split now */
2364 if (g_atomic_int_get (&(splitmux->do_split_next_gop)) == TRUE) {
2365 GST_TRACE_OBJECT (splitmux, "Forcing because split_next_gop is set");
2369 gop = g_queue_peek_head (&splitmux->pending_input_gops);
2370 /* We need a full GOP queued up at this point */
2371 g_assert (gop != NULL);
2372 next_gop = g_queue_peek_nth (&splitmux->pending_input_gops, 1);
2373 /* And the beginning of the next GOP or otherwise EOS */
2375 /* User told us to split at this running time */
2376 if (gop->start_time >= time_to_split) {
2377 GST_OBJECT_LOCK (splitmux);
2378 /* Dequeue running time */
2379 gst_queue_array_pop_head_struct (splitmux->times_to_split);
2380 /* Empty any running times after this that are past now */
2381 ptr_to_time = gst_queue_array_peek_head_struct (splitmux->times_to_split);
2382 while (ptr_to_time) {
2383 time_to_split = *ptr_to_time;
2384 if (gop->start_time < time_to_split) {
2387 gst_queue_array_pop_head_struct (splitmux->times_to_split);
2388 ptr_to_time = gst_queue_array_peek_head_struct (splitmux->times_to_split);
2390 GST_TRACE_OBJECT (splitmux,
2391 "GOP start time %" GST_STIME_FORMAT " is after requested split point %"
2392 GST_STIME_FORMAT, GST_STIME_ARGS (gop->start_time),
2393 GST_STIME_ARGS (time_to_split));
2394 GST_OBJECT_UNLOCK (splitmux);
2398 if (thresh_bytes > 0 && queued_bytes > thresh_bytes) {
2399 GST_TRACE_OBJECT (splitmux,
2400 "queued bytes %" G_GUINT64_FORMAT " overruns byte limit", queued_bytes);
2401 return TRUE; /* Would overrun byte limit */
2404 if (thresh_time > 0 && queued_time > thresh_time) {
2405 GST_TRACE_OBJECT (splitmux,
2406 "queued time %" GST_STIME_FORMAT " overruns time limit",
2407 GST_STIME_ARGS (queued_time));
2408 return TRUE; /* Would overrun time limit */
2411 if (splitmux->tc_interval) {
2412 GstClockTime next_gop_start_time =
2413 next_gop ? next_gop->start_time : splitmux->max_in_running_time;
2415 if (GST_CLOCK_TIME_IS_VALID (splitmux->next_fragment_start_tc_time) &&
2416 GST_CLOCK_STIME_IS_VALID (next_gop_start_time) &&
2417 next_gop_start_time >
2418 splitmux->next_fragment_start_tc_time + 5 * GST_USECOND) {
2419 GST_TRACE_OBJECT (splitmux,
2420 "in running time %" GST_STIME_FORMAT " overruns time limit %"
2421 GST_TIME_FORMAT, GST_STIME_ARGS (next_gop_start_time),
2422 GST_TIME_ARGS (splitmux->next_fragment_start_tc_time));
2427 if (check_robust_muxing) {
2428 GstClockTime mux_reserved_remain;
2430 g_object_get (splitmux->muxer,
2431 "reserved-duration-remaining", &mux_reserved_remain, NULL);
2433 GST_LOG_OBJECT (splitmux,
2434 "Muxer robust muxing report - %" G_GUINT64_FORMAT
2435 " remaining. New GOP would enqueue %" G_GUINT64_FORMAT,
2436 mux_reserved_remain, queued_gop_time);
2438 if (queued_gop_time >= mux_reserved_remain) {
2439 GST_INFO_OBJECT (splitmux,
2440 "File is about to run out of header room - %" G_GUINT64_FORMAT
2441 " remaining. New GOP would enqueue %" G_GUINT64_FORMAT
2442 ". Switching to new file", mux_reserved_remain, queued_gop_time);
2447 /* Continue and mux this GOP */
2451 /* probably we want to add this API? */
2453 video_time_code_replace (GstVideoTimeCode ** old_tc, GstVideoTimeCode * new_tc)
2455 GstVideoTimeCode *timecode = NULL;
2457 g_return_if_fail (old_tc != NULL);
2459 if (*old_tc == new_tc)
2463 timecode = gst_video_time_code_copy (new_tc);
2466 gst_video_time_code_free (*old_tc);
2471 /* Called with splitmux lock held */
2472 /* Called when entering ProcessingCompleteGop state
2473 * Assess if mq contents overflowed the current file
2474 * -> If yes, need to switch to new file
2475 * -> if no, set max_out_running_time to let this GOP in and
2476 * go to COLLECTING_GOP_START state
2479 handle_gathered_gop (GstSplitMuxSink * splitmux, const InputGop * gop,
2480 GstClockTimeDiff next_gop_start_time, GstClockTimeDiff max_out_running_time)
2482 guint64 queued_bytes;
2483 GstClockTimeDiff queued_time = 0;
2484 GstClockTimeDiff queued_gop_time = 0;
2485 SplitMuxOutputCommand *cmd;
2487 /* Assess if the multiqueue contents overflowed the current file */
2488 /* When considering if a newly gathered GOP overflows
2489 * the time limit for the file, only consider the running time of the
2490 * reference stream. Other streams might have run ahead a little bit,
2491 * but extra pieces won't be released to the muxer beyond the reference
2492 * stream cut-off anyway - so it forms the limit. */
2493 queued_bytes = splitmux->fragment_total_bytes + gop->total_bytes;
2494 queued_time = next_gop_start_time;
2495 /* queued_gop_time tracks how much unwritten data there is waiting to
2496 * be written to this fragment including this GOP */
2497 if (splitmux->reference_ctx->out_running_time != GST_CLOCK_STIME_NONE)
2498 queued_gop_time = queued_time - splitmux->reference_ctx->out_running_time;
2500 queued_gop_time = queued_time - gop->start_time;
2502 GST_LOG_OBJECT (splitmux, " queued_bytes %" G_GUINT64_FORMAT, queued_bytes);
2503 GST_LOG_OBJECT (splitmux, "mq at TS %" GST_STIME_FORMAT
2504 " bytes %" G_GUINT64_FORMAT " in next gop start time %" GST_STIME_FORMAT
2505 " gop start time %" GST_STIME_FORMAT,
2506 GST_STIME_ARGS (queued_time), queued_bytes,
2507 GST_STIME_ARGS (next_gop_start_time), GST_STIME_ARGS (gop->start_time));
2509 if (queued_gop_time < 0)
2510 goto error_gop_duration;
2512 if (queued_time < splitmux->fragment_start_time)
2513 goto error_queued_time;
2515 queued_time -= splitmux->fragment_start_time;
2516 if (queued_time < queued_gop_time)
2517 queued_gop_time = queued_time;
2519 /* Expand queued bytes estimate by muxer overhead */
2520 queued_bytes += (queued_bytes * splitmux->mux_overhead);
2522 /* Check for overrun - have we output at least one byte and overrun
2523 * either threshold? */
2524 if (need_new_fragment (splitmux, queued_time, queued_gop_time, queued_bytes)) {
2525 if (splitmux->async_finalize) {
2526 GstClockTime *sink_running_time = g_new (GstClockTime, 1);
2527 *sink_running_time = splitmux->reference_ctx->out_running_time;
2528 g_object_set_qdata_full (G_OBJECT (splitmux->sink),
2529 RUNNING_TIME, sink_running_time, g_free);
2531 g_atomic_int_set (&(splitmux->do_split_next_gop), FALSE);
2532 /* Tell the output side to start a new fragment */
2533 GST_INFO_OBJECT (splitmux,
2534 "This GOP (dur %" GST_STIME_FORMAT
2535 ") would overflow the fragment, Sending start_new_fragment cmd",
2536 GST_STIME_ARGS (queued_gop_time));
2537 cmd = out_cmd_buf_new ();
2538 cmd->start_new_fragment = TRUE;
2539 g_queue_push_head (&splitmux->out_cmd_q, cmd);
2540 GST_SPLITMUX_BROADCAST_OUTPUT (splitmux);
2542 splitmux->fragment_start_time = gop->start_time;
2543 splitmux->fragment_start_time_pts = gop->start_time_pts;
2544 splitmux->fragment_total_bytes = 0;
2545 splitmux->fragment_reference_bytes = 0;
2547 video_time_code_replace (&splitmux->fragment_start_tc, gop->start_tc);
2548 splitmux->next_fragment_start_tc_time =
2549 calculate_next_max_timecode (splitmux, splitmux->fragment_start_tc,
2550 splitmux->fragment_start_time, NULL);
2551 if (splitmux->tc_interval && splitmux->fragment_start_tc
2552 && !GST_CLOCK_TIME_IS_VALID (splitmux->next_fragment_start_tc_time)) {
2553 GST_WARNING_OBJECT (splitmux,
2554 "Couldn't calculate next fragment start time for timecode mode");
2558 /* And set up to collect the next GOP */
2559 if (max_out_running_time != G_MAXINT64) {
2560 splitmux->input_state = SPLITMUX_INPUT_STATE_COLLECTING_GOP_START;
2562 /* This is probably already the current state, but just in case: */
2563 splitmux->input_state = SPLITMUX_INPUT_STATE_FINISHING_UP;
2566 /* And wake all input contexts to send a wake-up event */
2567 g_list_foreach (splitmux->contexts, (GFunc) ctx_set_unblock, NULL);
2568 GST_SPLITMUX_BROADCAST_INPUT (splitmux);
2570 /* Now either way - either there was no overflow, or we requested a new fragment: release this GOP */
2571 splitmux->fragment_total_bytes += gop->total_bytes;
2572 splitmux->fragment_reference_bytes += gop->reference_bytes;
2574 if (gop->total_bytes > 0) {
2575 GST_LOG_OBJECT (splitmux,
2576 "Releasing GOP to output. Bytes in fragment now %" G_GUINT64_FORMAT
2577 " time %" GST_STIME_FORMAT,
2578 splitmux->fragment_total_bytes, GST_STIME_ARGS (queued_time));
2580 /* Send this GOP to the output command queue */
2581 cmd = out_cmd_buf_new ();
2582 cmd->start_new_fragment = FALSE;
2583 cmd->max_output_ts = max_out_running_time;
2584 GST_LOG_OBJECT (splitmux, "Sending GOP cmd to output for TS %"
2585 GST_STIME_FORMAT, GST_STIME_ARGS (max_out_running_time));
2586 g_queue_push_head (&splitmux->out_cmd_q, cmd);
2588 GST_SPLITMUX_BROADCAST_OUTPUT (splitmux);
2594 GST_ELEMENT_ERROR (splitmux,
2595 STREAM, FAILED, ("Timestamping error on input streams"),
2596 ("Queued GOP time is negative %" GST_STIME_FORMAT,
2597 GST_STIME_ARGS (queued_gop_time)));
2600 GST_ELEMENT_ERROR (splitmux,
2601 STREAM, FAILED, ("Timestamping error on input streams"),
2602 ("Queued time is negative. Input went backwards. queued_time - %"
2603 GST_STIME_FORMAT, GST_STIME_ARGS (queued_time)));
2607 /* Called with splitmux lock held */
2608 /* Called from each input pad when it is has all the pieces
2609 * for a GOP or EOS, starting with the reference pad which has set the
2610 * splitmux->max_in_running_time
2613 check_completed_gop (GstSplitMuxSink * splitmux, MqStreamCtx * ctx)
2618 /* On ENDING_FILE, the reference stream sends a command to start a new
2619 * fragment, then releases the GOP for output in the new fragment.
2620 * If some streams received no buffer during the last GOP that overran,
2621 * because its next buffer has a timestamp bigger than
2622 * ctx->max_in_running_time, its queue is empty. In that case the only
2623 * way to wakeup the output thread is by injecting an event in the
2624 * queue. This usually happen with subtitle streams.
2625 * See https://bugzilla.gnome.org/show_bug.cgi?id=763711. */
2626 if (ctx->need_unblock) {
2627 GST_LOG_OBJECT (ctx->sinkpad, "Sending splitmuxsink-unblock event");
2628 event = gst_event_new_custom (GST_EVENT_CUSTOM_DOWNSTREAM |
2629 GST_EVENT_TYPE_SERIALIZED,
2630 gst_structure_new ("splitmuxsink-unblock", "timestamp",
2631 G_TYPE_INT64, splitmux->max_in_running_time, NULL));
2633 GST_SPLITMUX_UNLOCK (splitmux);
2634 gst_pad_send_event (ctx->sinkpad, event);
2635 GST_SPLITMUX_LOCK (splitmux);
2637 ctx->need_unblock = FALSE;
2638 GST_SPLITMUX_BROADCAST_INPUT (splitmux);
2639 /* state may have changed while we were unlocked. Loop again if so */
2640 if (splitmux->input_state != SPLITMUX_INPUT_STATE_WAITING_GOP_COLLECT)
2645 GstClockTimeDiff next_gop_start = GST_CLOCK_STIME_NONE;
2647 if (splitmux->input_state == SPLITMUX_INPUT_STATE_WAITING_GOP_COLLECT) {
2648 GstClockTimeDiff max_out_running_time;
2649 gboolean ready = TRUE;
2651 const InputGop *next_gop;
2653 gop = g_queue_peek_head (&splitmux->pending_input_gops);
2654 next_gop = g_queue_peek_nth (&splitmux->pending_input_gops, 1);
2656 /* If we have no GOP or no next GOP here then the reference context is
2657 * at EOS, otherwise use the start time of the next GOP if we're far
2658 * enough in the GOP to know it */
2659 if (gop && next_gop) {
2660 if (!splitmux->reference_ctx->in_eos
2661 && splitmux->max_in_running_time_dts != GST_CLOCK_STIME_NONE
2662 && splitmux->max_in_running_time_dts < next_gop->start_time_pts) {
2663 GST_LOG_OBJECT (splitmux,
2664 "No further GOPs finished collecting, waiting until current DTS %"
2665 GST_STIME_FORMAT " has passed next GOP start PTS %"
2667 GST_STIME_ARGS (splitmux->max_in_running_time_dts),
2668 GST_STIME_ARGS (next_gop->start_time_pts));
2672 GST_LOG_OBJECT (splitmux,
2673 "Finished collecting GOP with start time %" GST_STIME_FORMAT
2674 ", next GOP start time %" GST_STIME_FORMAT,
2675 GST_STIME_ARGS (gop->start_time),
2676 GST_STIME_ARGS (next_gop->start_time));
2677 next_gop_start = next_gop->start_time;
2678 max_out_running_time =
2679 splitmux->reference_ctx->in_eos ? G_MAXINT64 : next_gop->start_time;
2680 } else if (!next_gop) {
2681 GST_LOG_OBJECT (splitmux, "Reference context is EOS");
2682 next_gop_start = splitmux->max_in_running_time;
2683 max_out_running_time = G_MAXINT64;
2685 GST_LOG_OBJECT (splitmux, "No further GOPs finished collecting");
2688 g_assert_not_reached ();
2691 g_assert (gop != NULL);
2693 /* Iterate each pad, and check that the input running time is at least
2694 * up to the start running time of the next GOP or EOS, and if so handle
2695 * the collected GOP */
2696 GST_LOG_OBJECT (splitmux, "Checking GOP collected, next GOP start %"
2697 GST_STIME_FORMAT " ctx %p", GST_STIME_ARGS (next_gop_start), ctx);
2698 for (cur = g_list_first (splitmux->contexts); cur != NULL;
2699 cur = g_list_next (cur)) {
2700 MqStreamCtx *tmpctx = (MqStreamCtx *) (cur->data);
2702 GST_LOG_OBJECT (splitmux,
2703 "Context %p sink pad %" GST_PTR_FORMAT " @ TS %" GST_STIME_FORMAT
2704 " EOS %d", tmpctx, tmpctx->sinkpad,
2705 GST_STIME_ARGS (tmpctx->in_running_time), tmpctx->in_eos);
2707 if (next_gop_start != GST_CLOCK_STIME_NONE &&
2708 tmpctx->in_running_time < next_gop_start && !tmpctx->in_eos) {
2709 GST_LOG_OBJECT (splitmux,
2710 "Context %p sink pad %" GST_PTR_FORMAT " not ready. We'll sleep",
2711 tmpctx, tmpctx->sinkpad);
2717 GST_DEBUG_OBJECT (splitmux,
2718 "Collected GOP is complete. Processing (ctx %p)", ctx);
2719 /* All pads have a complete GOP, release it into the multiqueue */
2720 handle_gathered_gop (splitmux, gop, next_gop_start,
2721 max_out_running_time);
2723 g_queue_pop_head (&splitmux->pending_input_gops);
2724 input_gop_free (gop);
2726 /* The user has requested a split, we can split now that the previous GOP
2727 * has been collected to the correct location */
2728 if (g_atomic_int_compare_and_exchange (&(splitmux->split_requested),
2730 g_atomic_int_set (&(splitmux->do_split_next_gop), TRUE);
2735 /* If upstream reached EOS we are not expecting more data, no need to wait
2740 if (splitmux->input_state == SPLITMUX_INPUT_STATE_WAITING_GOP_COLLECT &&
2742 ctx->in_running_time >= next_gop_start &&
2743 next_gop_start != GST_CLOCK_STIME_NONE) {
2744 /* Some pad is not yet ready, or GOP is being pushed
2745 * either way, sleep and wait to get woken */
2746 GST_LOG_OBJECT (splitmux, "Sleeping for GOP collection (ctx %p)", ctx);
2747 GST_SPLITMUX_WAIT_INPUT (splitmux);
2748 GST_LOG_OBJECT (splitmux, "Done waiting for complete GOP (ctx %p)", ctx);
2750 /* This pad is not ready or the state changed - break out and get another
2754 } while (splitmux->input_state == SPLITMUX_INPUT_STATE_WAITING_GOP_COLLECT);
2757 static GstPadProbeReturn
2758 handle_mq_input (GstPad * pad, GstPadProbeInfo * info, MqStreamCtx * ctx)
2760 GstSplitMuxSink *splitmux = ctx->splitmux;
2761 GstFlowReturn ret = GST_FLOW_OK;
2763 MqStreamBuf *buf_info = NULL;
2764 GstClockTime ts, pts, dts;
2765 GstClockTimeDiff running_time, running_time_pts, running_time_dts;
2766 gboolean loop_again;
2767 gboolean keyframe = FALSE;
2769 GST_LOG_OBJECT (pad, "Fired probe type 0x%x", info->type);
2771 /* FIXME: Handle buffer lists, until then make it clear they won't work */
2772 if (info->type & GST_PAD_PROBE_TYPE_BUFFER_LIST) {
2773 g_warning ("Buffer list handling not implemented");
2774 return GST_PAD_PROBE_DROP;
2776 if (info->type & GST_PAD_PROBE_TYPE_EVENT_DOWNSTREAM ||
2777 info->type & GST_PAD_PROBE_TYPE_EVENT_FLUSH) {
2778 GstEvent *event = gst_pad_probe_info_get_event (info);
2780 GST_LOG_OBJECT (pad, "Event %" GST_PTR_FORMAT, event);
2782 switch (GST_EVENT_TYPE (event)) {
2783 case GST_EVENT_SEGMENT:
2784 gst_event_copy_segment (event, &ctx->in_segment);
2786 case GST_EVENT_FLUSH_STOP:
2787 GST_SPLITMUX_LOCK (splitmux);
2788 gst_segment_init (&ctx->in_segment, GST_FORMAT_UNDEFINED);
2789 ctx->in_eos = FALSE;
2790 ctx->in_running_time = GST_CLOCK_STIME_NONE;
2791 GST_SPLITMUX_UNLOCK (splitmux);
2794 GST_SPLITMUX_LOCK (splitmux);
2797 if (splitmux->input_state == SPLITMUX_INPUT_STATE_STOPPED) {
2798 ret = GST_FLOW_FLUSHING;
2802 if (ctx->is_reference) {
2803 GST_INFO_OBJECT (splitmux, "Got Reference EOS. Finishing up");
2804 /* check_completed_gop will act as if this is a new keyframe with infinite timestamp */
2805 splitmux->input_state = SPLITMUX_INPUT_STATE_WAITING_GOP_COLLECT;
2806 /* Wake up other input pads to collect this GOP */
2807 GST_SPLITMUX_BROADCAST_INPUT (splitmux);
2808 check_completed_gop (splitmux, ctx);
2809 } else if (splitmux->input_state ==
2810 SPLITMUX_INPUT_STATE_WAITING_GOP_COLLECT) {
2811 /* If we are waiting for a GOP to be completed (ie, for aux
2812 * pads to catch up), then this pad is complete, so check
2813 * if the whole GOP is.
2815 check_completed_gop (splitmux, ctx);
2817 GST_SPLITMUX_UNLOCK (splitmux);
2819 case GST_EVENT_GAP:{
2820 GstClockTime gap_ts;
2821 GstClockTimeDiff rtime;
2823 gst_event_parse_gap (event, &gap_ts, NULL);
2824 if (gap_ts == GST_CLOCK_TIME_NONE)
2827 GST_SPLITMUX_LOCK (splitmux);
2829 if (splitmux->input_state == SPLITMUX_INPUT_STATE_STOPPED) {
2830 ret = GST_FLOW_FLUSHING;
2833 rtime = my_segment_to_running_time (&ctx->in_segment, gap_ts);
2835 GST_LOG_OBJECT (pad, "Have GAP w/ ts %" GST_STIME_FORMAT,
2836 GST_STIME_ARGS (rtime));
2838 if (ctx->is_reference && GST_CLOCK_STIME_IS_VALID (rtime)) {
2839 /* If this GAP event happens before the first fragment then
2840 * initialize the fragment start time here. */
2841 if (!GST_CLOCK_STIME_IS_VALID (splitmux->fragment_start_time)) {
2842 splitmux->fragment_start_time = rtime;
2843 GST_LOG_OBJECT (splitmux,
2844 "Fragment start time now %" GST_STIME_FORMAT,
2845 GST_STIME_ARGS (splitmux->fragment_start_time));
2847 /* Also take this as the first start time when starting up,
2848 * so that we start counting overflow from the first frame */
2849 if (!GST_CLOCK_STIME_IS_VALID (splitmux->max_in_running_time))
2850 splitmux->max_in_running_time = rtime;
2851 if (!GST_CLOCK_STIME_IS_VALID (splitmux->max_in_running_time_dts))
2852 splitmux->max_in_running_time_dts = rtime;
2855 /* Similarly take it as fragment start PTS and GOP start time if
2856 * these are not set */
2857 if (!GST_CLOCK_STIME_IS_VALID (splitmux->fragment_start_time_pts))
2858 splitmux->fragment_start_time_pts = rtime;
2860 if (g_queue_is_empty (&splitmux->pending_input_gops)) {
2861 InputGop *gop = g_slice_new0 (InputGop);
2863 gop->from_gap = TRUE;
2864 gop->start_time = rtime;
2865 gop->start_time_pts = rtime;
2867 g_queue_push_tail (&splitmux->pending_input_gops, gop);
2871 GST_SPLITMUX_UNLOCK (splitmux);
2877 return GST_PAD_PROBE_PASS;
2878 } else if (info->type & GST_PAD_PROBE_TYPE_QUERY_DOWNSTREAM) {
2879 switch (GST_QUERY_TYPE (GST_QUERY (info->data))) {
2880 case GST_QUERY_ALLOCATION:
2881 return GST_PAD_PROBE_DROP;
2883 return GST_PAD_PROBE_PASS;
2887 buf = gst_pad_probe_info_get_buffer (info);
2888 buf_info = mq_stream_buf_new ();
2890 pts = GST_BUFFER_PTS (buf);
2891 dts = GST_BUFFER_DTS (buf);
2892 if (GST_BUFFER_PTS_IS_VALID (buf))
2893 ts = GST_BUFFER_PTS (buf);
2895 ts = GST_BUFFER_DTS (buf);
2897 GST_LOG_OBJECT (pad,
2898 "Buffer TS is %" GST_TIME_FORMAT " (PTS %" GST_TIME_FORMAT ", DTS %"
2899 GST_TIME_FORMAT ")", GST_TIME_ARGS (ts), GST_TIME_ARGS (pts),
2900 GST_TIME_ARGS (dts));
2902 GST_SPLITMUX_LOCK (splitmux);
2904 if (splitmux->input_state == SPLITMUX_INPUT_STATE_STOPPED) {
2905 ret = GST_FLOW_FLUSHING;
2909 /* If this buffer has a timestamp, advance the input timestamp of the
2911 if (GST_CLOCK_TIME_IS_VALID (ts)) {
2912 running_time = my_segment_to_running_time (&ctx->in_segment, ts);
2914 GST_LOG_OBJECT (pad, "Buffer running TS is %" GST_STIME_FORMAT,
2915 GST_STIME_ARGS (running_time));
2917 /* in running time is always the maximum PTS (or DTS) that was observed so far */
2918 if (GST_CLOCK_STIME_IS_VALID (running_time)
2919 && running_time > ctx->in_running_time)
2920 ctx->in_running_time = running_time;
2922 running_time = ctx->in_running_time;
2925 if (GST_CLOCK_TIME_IS_VALID (pts))
2926 running_time_pts = my_segment_to_running_time (&ctx->in_segment, pts);
2928 running_time_pts = GST_CLOCK_STIME_NONE;
2930 if (GST_CLOCK_TIME_IS_VALID (dts))
2931 running_time_dts = my_segment_to_running_time (&ctx->in_segment, dts);
2933 running_time_dts = GST_CLOCK_STIME_NONE;
2935 /* Try to make sure we have a valid running time */
2936 if (!GST_CLOCK_STIME_IS_VALID (ctx->in_running_time)) {
2937 ctx->in_running_time =
2938 my_segment_to_running_time (&ctx->in_segment, ctx->in_segment.start);
2941 GST_LOG_OBJECT (pad, "in running time now %" GST_STIME_FORMAT,
2942 GST_STIME_ARGS (ctx->in_running_time));
2944 buf_info->run_ts = ctx->in_running_time;
2945 buf_info->buf_size = gst_buffer_get_size (buf);
2946 buf_info->duration = GST_BUFFER_DURATION (buf);
2948 if (ctx->is_reference) {
2949 InputGop *gop = NULL;
2950 GstVideoTimeCodeMeta *tc_meta = gst_buffer_get_video_time_code_meta (buf);
2952 /* initialize fragment_start_time if it was not set yet (i.e. for the
2953 * first fragment), or otherwise set it to the minimum observed time */
2954 if (!GST_CLOCK_STIME_IS_VALID (splitmux->fragment_start_time)
2955 || splitmux->fragment_start_time > running_time) {
2956 if (!GST_CLOCK_STIME_IS_VALID (splitmux->fragment_start_time))
2957 splitmux->fragment_start_time_pts = running_time_pts;
2958 splitmux->fragment_start_time = running_time;
2960 GST_LOG_OBJECT (splitmux,
2961 "Fragment start time now %" GST_STIME_FORMAT " (initial PTS %"
2962 GST_STIME_FORMAT ")", GST_STIME_ARGS (splitmux->fragment_start_time),
2963 GST_STIME_ARGS (splitmux->fragment_start_time_pts));
2965 /* Also take this as the first start time when starting up,
2966 * so that we start counting overflow from the first frame */
2967 if (!GST_CLOCK_STIME_IS_VALID (splitmux->max_in_running_time)
2968 || splitmux->max_in_running_time < splitmux->fragment_start_time)
2969 splitmux->max_in_running_time = splitmux->fragment_start_time;
2971 if (!GST_CLOCK_STIME_IS_VALID (splitmux->max_in_running_time_dts))
2972 splitmux->max_in_running_time_dts = running_time_dts;
2975 video_time_code_replace (&splitmux->fragment_start_tc, &tc_meta->tc);
2977 splitmux->next_fragment_start_tc_time =
2978 calculate_next_max_timecode (splitmux, &tc_meta->tc,
2979 running_time, NULL);
2980 if (splitmux->tc_interval
2981 && !GST_CLOCK_TIME_IS_VALID (splitmux->next_fragment_start_tc_time))
2983 GST_WARNING_OBJECT (splitmux,
2984 "Couldn't calculate next fragment start time for timecode mode");
2986 #ifndef GST_DISABLE_GST_DEBUG
2990 tc_str = gst_video_time_code_to_string (&tc_meta->tc);
2991 GST_DEBUG_OBJECT (splitmux,
2992 "Initialize fragment start timecode %s, next fragment start timecode time %"
2993 GST_TIME_FORMAT, tc_str,
2994 GST_TIME_ARGS (splitmux->next_fragment_start_tc_time));
3002 /* First check if we're at the very first GOP and the tracking was created
3003 * from a GAP event. In that case don't start a new GOP on keyframes but
3004 * just updated it as needed */
3005 gop = g_queue_peek_tail (&splitmux->pending_input_gops);
3007 if (!gop || (!gop->from_gap
3008 && !GST_BUFFER_FLAG_IS_SET (buf, GST_BUFFER_FLAG_DELTA_UNIT))) {
3009 gop = g_slice_new0 (InputGop);
3011 gop->start_time = running_time;
3012 gop->start_time_pts = running_time_pts;
3014 GST_LOG_OBJECT (splitmux,
3015 "Next GOP start time now %" GST_STIME_FORMAT " (initial PTS %"
3016 GST_STIME_FORMAT ")", GST_STIME_ARGS (gop->start_time),
3017 GST_STIME_ARGS (gop->start_time_pts));
3020 video_time_code_replace (&gop->start_tc, &tc_meta->tc);
3022 #ifndef GST_DISABLE_GST_DEBUG
3026 tc_str = gst_video_time_code_to_string (&tc_meta->tc);
3027 GST_DEBUG_OBJECT (splitmux, "Next GOP start timecode %s", tc_str);
3033 g_queue_push_tail (&splitmux->pending_input_gops, gop);
3035 gop->from_gap = FALSE;
3037 if (!GST_CLOCK_STIME_IS_VALID (gop->start_time)
3038 || gop->start_time > running_time) {
3039 gop->start_time = running_time;
3041 GST_LOG_OBJECT (splitmux,
3042 "GOP start time updated now %" GST_STIME_FORMAT " (initial PTS %"
3043 GST_STIME_FORMAT ")", GST_STIME_ARGS (gop->start_time),
3044 GST_STIME_ARGS (gop->start_time_pts));
3047 video_time_code_replace (&gop->start_tc, &tc_meta->tc);
3049 #ifndef GST_DISABLE_GST_DEBUG
3053 tc_str = gst_video_time_code_to_string (&tc_meta->tc);
3054 GST_DEBUG_OBJECT (splitmux, "Next GOP start timecode updated %s",
3063 /* Check whether we need to request next keyframe depending on
3064 * current running time */
3065 if (request_next_keyframe (splitmux, buf, running_time_dts) == FALSE) {
3066 GST_WARNING_OBJECT (splitmux,
3067 "Could not request a keyframe. Files may not split at the exact location they should");
3072 InputGop *gop = g_queue_peek_tail (&splitmux->pending_input_gops);
3075 GST_DEBUG_OBJECT (pad, "Buf TS %" GST_STIME_FORMAT
3076 " total GOP bytes %" G_GUINT64_FORMAT ", total next GOP bytes %"
3077 G_GUINT64_FORMAT, GST_STIME_ARGS (buf_info->run_ts),
3078 gop->total_bytes, gop->total_bytes);
3087 switch (splitmux->input_state) {
3088 case SPLITMUX_INPUT_STATE_COLLECTING_GOP_START:
3089 if (ctx->is_releasing) {
3090 /* The pad belonging to this context is being released */
3091 GST_WARNING_OBJECT (pad, "Pad is being released while the muxer is "
3092 "running. Data might not drain correctly");
3094 } else if (ctx->is_reference) {
3095 const InputGop *gop, *next_gop;
3097 /* This is the reference context. If it's a keyframe,
3098 * it marks the start of a new GOP and we should wait in
3099 * check_completed_gop before continuing, but either way
3100 * (keyframe or no, we'll pass this buffer through after
3101 * so set loop_again to FALSE */
3104 gop = g_queue_peek_head (&splitmux->pending_input_gops);
3105 g_assert (gop != NULL);
3106 next_gop = g_queue_peek_nth (&splitmux->pending_input_gops, 1);
3108 if (ctx->in_running_time > splitmux->max_in_running_time)
3109 splitmux->max_in_running_time = ctx->in_running_time;
3110 if (running_time_dts > splitmux->max_in_running_time_dts)
3111 splitmux->max_in_running_time_dts = running_time_dts;
3113 GST_LOG_OBJECT (splitmux,
3114 "Max in running time now %" GST_STIME_FORMAT ", DTS %"
3115 GST_STIME_FORMAT, GST_STIME_ARGS (splitmux->max_in_running_time),
3116 GST_STIME_ARGS (splitmux->max_in_running_time_dts));
3119 GST_DEBUG_OBJECT (pad, "Waiting for end of GOP");
3120 /* Allow other input pads to catch up to here too */
3121 GST_SPLITMUX_BROADCAST_INPUT (splitmux);
3125 if (!GST_BUFFER_FLAG_IS_SET (buf, GST_BUFFER_FLAG_DELTA_UNIT)) {
3126 GST_INFO_OBJECT (pad,
3127 "Have keyframe with running time %" GST_STIME_FORMAT,
3128 GST_STIME_ARGS (ctx->in_running_time));
3132 if (running_time_dts != GST_CLOCK_STIME_NONE
3133 && running_time_dts < next_gop->start_time_pts) {
3134 GST_DEBUG_OBJECT (splitmux,
3135 "Waiting until DTS (%" GST_STIME_FORMAT
3136 ") has passed next GOP start PTS (%" GST_STIME_FORMAT ")",
3137 GST_STIME_ARGS (running_time_dts),
3138 GST_STIME_ARGS (next_gop->start_time_pts));
3139 /* Allow other input pads to catch up to here too */
3140 GST_SPLITMUX_BROADCAST_INPUT (splitmux);
3144 splitmux->input_state = SPLITMUX_INPUT_STATE_WAITING_GOP_COLLECT;
3145 /* Wake up other input pads to collect this GOP */
3146 GST_SPLITMUX_BROADCAST_INPUT (splitmux);
3147 check_completed_gop (splitmux, ctx);
3149 /* Pass this buffer if the reference ctx is far enough ahead */
3150 if (ctx->in_running_time < splitmux->max_in_running_time) {
3155 /* We're still waiting for a keyframe on the reference pad, sleep */
3156 GST_LOG_OBJECT (pad, "Sleeping for GOP start");
3157 GST_SPLITMUX_WAIT_INPUT (splitmux);
3158 GST_LOG_OBJECT (pad,
3159 "Done sleeping for GOP start input state now %d",
3160 splitmux->input_state);
3163 case SPLITMUX_INPUT_STATE_WAITING_GOP_COLLECT:{
3164 /* We're collecting a GOP, this is only ever called for non-reference
3165 * contexts as the reference context would be waiting inside
3166 * check_completed_gop() */
3168 g_assert (!ctx->is_reference);
3170 /* If we overran the target timestamp, it might be time to process
3171 * the GOP, otherwise bail out for more data. */
3172 GST_LOG_OBJECT (pad,
3173 "Checking TS %" GST_STIME_FORMAT " against max %"
3174 GST_STIME_FORMAT, GST_STIME_ARGS (ctx->in_running_time),
3175 GST_STIME_ARGS (splitmux->max_in_running_time));
3177 if (ctx->in_running_time < splitmux->max_in_running_time) {
3182 GST_LOG_OBJECT (pad,
3183 "Collected last packet of GOP. Checking other pads");
3184 check_completed_gop (splitmux, ctx);
3187 case SPLITMUX_INPUT_STATE_FINISHING_UP:
3197 if (keyframe && ctx->is_reference)
3198 splitmux->queued_keyframes++;
3199 buf_info->keyframe = keyframe;
3201 /* Update total input byte counter for overflow detect unless we're after
3203 if (splitmux->input_state != SPLITMUX_INPUT_STATE_FINISHING_UP
3204 && splitmux->input_state != SPLITMUX_INPUT_STATE_STOPPED) {
3205 InputGop *gop = g_queue_peek_tail (&splitmux->pending_input_gops);
3207 /* We must have a GOP at this point */
3208 g_assert (gop != NULL);
3210 gop->total_bytes += buf_info->buf_size;
3211 if (ctx->is_reference) {
3212 gop->reference_bytes += buf_info->buf_size;
3216 /* Now add this buffer to the queue just before returning */
3217 g_queue_push_head (&ctx->queued_bufs, buf_info);
3219 GST_LOG_OBJECT (pad, "Returning to queue buffer %" GST_PTR_FORMAT
3220 " run ts %" GST_STIME_FORMAT, buf, GST_STIME_ARGS (ctx->in_running_time));
3222 GST_SPLITMUX_UNLOCK (splitmux);
3223 GST_PAD_PROBE_INFO_FLOW_RETURN (info) = ret;
3224 return GST_PAD_PROBE_PASS;
3227 GST_SPLITMUX_UNLOCK (splitmux);
3229 mq_stream_buf_free (buf_info);
3230 GST_PAD_PROBE_INFO_FLOW_RETURN (info) = ret;
3231 return GST_PAD_PROBE_PASS;
3235 grow_blocked_queues (GstSplitMuxSink * splitmux)
3239 /* Scan other queues for full-ness and grow them */
3240 for (cur = g_list_first (splitmux->contexts);
3241 cur != NULL; cur = g_list_next (cur)) {
3242 MqStreamCtx *tmpctx = (MqStreamCtx *) (cur->data);
3244 guint cur_len = g_queue_get_length (&tmpctx->queued_bufs);
3246 g_object_get (tmpctx->q, "max-size-buffers", &cur_limit, NULL);
3247 GST_LOG_OBJECT (tmpctx->q, "Queue len %u", cur_len);
3249 if (cur_len >= cur_limit) {
3250 cur_limit = cur_len + 1;
3251 GST_DEBUG_OBJECT (tmpctx->q,
3252 "Queue overflowed and needs enlarging. Growing to %u buffers",
3254 g_object_set (tmpctx->q, "max-size-buffers", cur_limit, NULL);
3260 handle_q_underrun (GstElement * q, gpointer user_data)
3262 MqStreamCtx *ctx = (MqStreamCtx *) (user_data);
3263 GstSplitMuxSink *splitmux = ctx->splitmux;
3265 GST_SPLITMUX_LOCK (splitmux);
3266 GST_DEBUG_OBJECT (q,
3267 "Queue reported underrun with %d keyframes and %d cmds enqueued",
3268 splitmux->queued_keyframes, g_queue_get_length (&splitmux->out_cmd_q));
3269 grow_blocked_queues (splitmux);
3270 GST_SPLITMUX_UNLOCK (splitmux);
3274 handle_q_overrun (GstElement * q, gpointer user_data)
3276 MqStreamCtx *ctx = (MqStreamCtx *) (user_data);
3277 GstSplitMuxSink *splitmux = ctx->splitmux;
3278 gboolean allow_grow = FALSE;
3280 GST_SPLITMUX_LOCK (splitmux);
3281 GST_DEBUG_OBJECT (q,
3282 "Queue reported overrun with %d keyframes and %d cmds enqueued",
3283 splitmux->queued_keyframes, g_queue_get_length (&splitmux->out_cmd_q));
3285 if (splitmux->queued_keyframes < 2) {
3286 /* Less than a full GOP queued, grow the queue */
3288 } else if (g_queue_get_length (&splitmux->out_cmd_q) < 1) {
3291 /* If another queue is starved, grow */
3293 for (cur = g_list_first (splitmux->contexts);
3294 cur != NULL; cur = g_list_next (cur)) {
3295 MqStreamCtx *tmpctx = (MqStreamCtx *) (cur->data);
3296 if (tmpctx != ctx && g_queue_get_length (&tmpctx->queued_bufs) < 1) {
3301 GST_SPLITMUX_UNLOCK (splitmux);
3306 g_object_get (q, "max-size-buffers", &cur_limit, NULL);
3309 GST_DEBUG_OBJECT (q,
3310 "Queue overflowed and needs enlarging. Growing to %u buffers",
3313 g_object_set (q, "max-size-buffers", cur_limit, NULL);
3317 /* Called with SPLITMUX lock held */
3318 static const gchar *
3319 lookup_muxer_pad (GstSplitMuxSink * splitmux, const gchar * sinkpad_name)
3321 const gchar *ret = NULL;
3323 if (splitmux->muxerpad_map == NULL)
3326 if (sinkpad_name == NULL) {
3327 GST_WARNING_OBJECT (splitmux,
3328 "Can't look up request pad in pad map without providing a pad name");
3332 ret = gst_structure_get_string (splitmux->muxerpad_map, sinkpad_name);
3334 GST_INFO_OBJECT (splitmux, "Sink pad %s maps to muxer pad %s", sinkpad_name,
3336 return g_strdup (ret);
3343 gst_splitmux_sink_request_new_pad (GstElement * element,
3344 GstPadTemplate * templ, const gchar * name, const GstCaps * caps)
3346 GstSplitMuxSink *splitmux = (GstSplitMuxSink *) element;
3347 GstPadTemplate *mux_template = NULL;
3348 GstPad *ret = NULL, *muxpad = NULL;
3350 GstPad *q_sink = NULL, *q_src = NULL;
3351 gchar *gname, *qname;
3352 gboolean is_primary_video = FALSE, is_video = FALSE,
3353 muxer_is_requestpad = FALSE;
3355 const gchar *muxer_padname = NULL;
3357 GST_DEBUG_OBJECT (splitmux, "templ:%s, name:%s", templ->name_template, name);
3359 GST_SPLITMUX_LOCK (splitmux);
3360 if (!create_muxer (splitmux))
3362 g_signal_emit (splitmux, signals[SIGNAL_MUXER_ADDED], 0, splitmux->muxer);
3364 if (g_str_equal (templ->name_template, "video") ||
3365 g_str_has_prefix (templ->name_template, "video_aux_")) {
3366 is_primary_video = g_str_equal (templ->name_template, "video");
3367 if (is_primary_video && splitmux->have_video)
3368 goto already_have_video;
3372 /* See if there's a pad map and it lists this pad */
3373 muxer_padname = lookup_muxer_pad (splitmux, name);
3375 if (muxer_padname == NULL) {
3377 /* FIXME: Look for a pad template with matching caps, rather than by name */
3378 GST_DEBUG_OBJECT (element,
3379 "searching for pad-template with name 'video_%%u'");
3381 gst_element_class_get_pad_template (GST_ELEMENT_GET_CLASS
3382 (splitmux->muxer), "video_%u");
3384 /* Fallback to find sink pad templates named 'video' (flvmux) */
3385 if (!mux_template) {
3386 GST_DEBUG_OBJECT (element,
3387 "searching for pad-template with name 'video'");
3389 gst_element_class_get_pad_template (GST_ELEMENT_GET_CLASS
3390 (splitmux->muxer), "video");
3394 GST_DEBUG_OBJECT (element, "searching for pad-template with name '%s'",
3395 templ->name_template);
3397 gst_element_class_get_pad_template (GST_ELEMENT_GET_CLASS
3398 (splitmux->muxer), templ->name_template);
3400 /* Fallback to find sink pad templates named 'audio' (flvmux) */
3401 if (!mux_template && g_str_has_prefix (templ->name_template, "audio_")) {
3402 GST_DEBUG_OBJECT (element,
3403 "searching for pad-template with name 'audio'");
3405 gst_element_class_get_pad_template (GST_ELEMENT_GET_CLASS
3406 (splitmux->muxer), "audio");
3411 if (mux_template == NULL) {
3412 GST_DEBUG_OBJECT (element,
3413 "searching for pad-template with name 'sink_%%d'");
3415 gst_element_class_get_pad_template (GST_ELEMENT_GET_CLASS
3416 (splitmux->muxer), "sink_%d");
3419 if (mux_template == NULL) {
3420 GST_DEBUG_OBJECT (element, "searching for pad-template with name 'sink'");
3422 gst_element_class_get_pad_template (GST_ELEMENT_GET_CLASS
3423 (splitmux->muxer), "sink");
3427 if (mux_template == NULL) {
3428 GST_ERROR_OBJECT (element,
3429 "unable to find a suitable sink pad-template on the muxer");
3432 GST_DEBUG_OBJECT (element, "found sink pad-template '%s' on the muxer",
3433 mux_template->name_template);
3435 if (mux_template->presence == GST_PAD_REQUEST) {
3436 GST_DEBUG_OBJECT (element, "requesting pad from pad-template");
3439 gst_element_request_pad (splitmux->muxer, mux_template, name, caps);
3440 muxer_is_requestpad = TRUE;
3441 } else if (mux_template->presence == GST_PAD_ALWAYS) {
3442 GST_DEBUG_OBJECT (element, "accessing always pad from pad-template");
3445 gst_element_get_static_pad (splitmux->muxer,
3446 mux_template->name_template);
3448 GST_ERROR_OBJECT (element,
3449 "unexpected pad presence %d", mux_template->presence);
3453 /* Have a muxer pad name */
3454 if (!(muxpad = gst_element_get_static_pad (splitmux->muxer, muxer_padname))) {
3456 gst_element_request_pad_simple (splitmux->muxer, muxer_padname)))
3457 muxer_is_requestpad = TRUE;
3459 g_free ((gchar *) muxer_padname);
3460 muxer_padname = NULL;
3463 /* One way or another, we must have a muxer pad by now */
3467 if (is_primary_video)
3468 gname = g_strdup ("video");
3469 else if (name == NULL)
3470 gname = gst_pad_get_name (muxpad);
3472 gname = g_strdup (name);
3474 qname = g_strdup_printf ("queue_%s", gname);
3475 if ((q = create_element (splitmux, "queue", qname, FALSE)) == NULL) {
3481 gst_element_set_state (q, GST_STATE_TARGET (splitmux));
3483 g_object_set (q, "max-size-bytes", 0, "max-size-time", (guint64) (0),
3484 "max-size-buffers", 5, NULL);
3486 q_sink = gst_element_get_static_pad (q, "sink");
3487 q_src = gst_element_get_static_pad (q, "src");
3489 if (gst_pad_link (q_src, muxpad) != GST_PAD_LINK_OK) {
3490 if (muxer_is_requestpad)
3491 gst_element_release_request_pad (splitmux->muxer, muxpad);
3492 gst_object_unref (GST_OBJECT (muxpad));
3496 gst_object_unref (GST_OBJECT (muxpad));
3498 ctx = mq_stream_ctx_new (splitmux);
3499 /* Context holds a ref: */
3500 ctx->q = gst_object_ref (q);
3501 ctx->srcpad = q_src;
3502 ctx->sinkpad = q_sink;
3504 g_signal_connect (q, "overrun", (GCallback) handle_q_overrun, ctx);
3505 g_signal_connect (q, "underrun", (GCallback) handle_q_underrun, ctx);
3507 ctx->src_pad_block_id =
3508 gst_pad_add_probe (q_src,
3509 GST_PAD_PROBE_TYPE_DATA_DOWNSTREAM | GST_PAD_PROBE_TYPE_EVENT_FLUSH,
3510 (GstPadProbeCallback) handle_mq_output, ctx, NULL);
3511 if (is_primary_video && splitmux->reference_ctx != NULL) {
3512 splitmux->reference_ctx->is_reference = FALSE;
3513 splitmux->reference_ctx = NULL;
3515 if (splitmux->reference_ctx == NULL) {
3516 splitmux->reference_ctx = ctx;
3517 ctx->is_reference = TRUE;
3520 ret = gst_ghost_pad_new_from_template (gname, q_sink, templ);
3521 g_object_set_qdata ((GObject *) (ret), PAD_CONTEXT, ctx);
3523 ctx->sink_pad_block_id =
3524 gst_pad_add_probe (q_sink,
3525 GST_PAD_PROBE_TYPE_DATA_DOWNSTREAM | GST_PAD_PROBE_TYPE_EVENT_FLUSH |
3526 GST_PAD_PROBE_TYPE_QUERY_DOWNSTREAM,
3527 (GstPadProbeCallback) handle_mq_input, ctx, NULL);
3529 GST_DEBUG_OBJECT (splitmux, "splitmuxsink pad %" GST_PTR_FORMAT
3530 " feeds queue pad %" GST_PTR_FORMAT, ret, q_sink);
3532 splitmux->contexts = g_list_append (splitmux->contexts, ctx);
3536 if (is_primary_video)
3537 splitmux->have_video = TRUE;
3539 gst_pad_set_active (ret, TRUE);
3540 gst_element_add_pad (GST_ELEMENT (splitmux), ret);
3542 GST_SPLITMUX_UNLOCK (splitmux);
3546 GST_SPLITMUX_UNLOCK (splitmux);
3549 gst_object_unref (q_sink);
3551 gst_object_unref (q_src);
3554 GST_DEBUG_OBJECT (splitmux, "video sink pad already requested");
3555 GST_SPLITMUX_UNLOCK (splitmux);
3560 gst_splitmux_sink_release_pad (GstElement * element, GstPad * pad)
3562 GstSplitMuxSink *splitmux = (GstSplitMuxSink *) element;
3563 GstPad *muxpad = NULL;
3565 (MqStreamCtx *) (g_object_get_qdata ((GObject *) (pad), PAD_CONTEXT));
3567 GST_SPLITMUX_LOCK (splitmux);
3569 if (splitmux->muxer == NULL)
3570 goto fail; /* Elements don't exist yet - nothing to release */
3572 GST_INFO_OBJECT (pad, "releasing request pad");
3574 muxpad = gst_pad_get_peer (ctx->srcpad);
3576 /* Remove the context from our consideration */
3577 splitmux->contexts = g_list_remove (splitmux->contexts, ctx);
3579 ctx->is_releasing = TRUE;
3580 GST_SPLITMUX_BROADCAST_INPUT (splitmux);
3582 GST_SPLITMUX_UNLOCK (splitmux);
3584 if (ctx->sink_pad_block_id) {
3585 gst_pad_remove_probe (ctx->sinkpad, ctx->sink_pad_block_id);
3586 gst_pad_send_event (ctx->sinkpad, gst_event_new_flush_start ());
3589 if (ctx->src_pad_block_id)
3590 gst_pad_remove_probe (ctx->srcpad, ctx->src_pad_block_id);
3592 /* Wait for the pad to be free */
3593 GST_PAD_STREAM_LOCK (pad);
3594 GST_SPLITMUX_LOCK (splitmux);
3595 GST_PAD_STREAM_UNLOCK (pad);
3597 /* Can release the context now */
3598 mq_stream_ctx_free (ctx);
3599 if (ctx == splitmux->reference_ctx)
3600 splitmux->reference_ctx = NULL;
3602 /* Release and free the muxer input */
3604 gst_element_release_request_pad (splitmux->muxer, muxpad);
3605 gst_object_unref (muxpad);
3608 if (GST_PAD_PAD_TEMPLATE (pad) &&
3609 g_str_equal (GST_PAD_TEMPLATE_NAME_TEMPLATE (GST_PAD_PAD_TEMPLATE
3611 splitmux->have_video = FALSE;
3613 gst_element_remove_pad (element, pad);
3615 /* Reset the internal elements only after all request pads are released */
3616 if (splitmux->contexts == NULL)
3617 gst_splitmux_reset_elements (splitmux);
3619 /* Wake up other input streams to check if the completion conditions have
3621 GST_SPLITMUX_BROADCAST_INPUT (splitmux);
3624 GST_SPLITMUX_UNLOCK (splitmux);
3628 create_element (GstSplitMuxSink * splitmux,
3629 const gchar * factory, const gchar * name, gboolean locked)
3631 GstElement *ret = gst_element_factory_make (factory, name);
3633 g_warning ("Failed to create %s - splitmuxsink will not work", name);
3638 /* Ensure the sink starts in locked state and NULL - it will be changed
3639 * by the filename setting code */
3640 gst_element_set_locked_state (ret, TRUE);
3641 gst_element_set_state (ret, GST_STATE_NULL);
3644 if (!gst_bin_add (GST_BIN (splitmux), ret)) {
3645 g_warning ("Could not add %s element - splitmuxsink will not work", name);
3646 gst_object_unref (ret);
3654 create_muxer (GstSplitMuxSink * splitmux)
3656 /* Create internal elements */
3657 if (splitmux->muxer == NULL) {
3658 GstElement *provided_muxer = NULL;
3660 GST_OBJECT_LOCK (splitmux);
3661 if (splitmux->provided_muxer != NULL)
3662 provided_muxer = gst_object_ref (splitmux->provided_muxer);
3663 GST_OBJECT_UNLOCK (splitmux);
3665 if ((!splitmux->async_finalize && provided_muxer == NULL) ||
3666 (splitmux->async_finalize && splitmux->muxer_factory == NULL)) {
3667 if ((splitmux->muxer =
3668 create_element (splitmux,
3669 splitmux->muxer_factory ? splitmux->
3670 muxer_factory : DEFAULT_MUXER, "muxer", FALSE)) == NULL)
3672 } else if (splitmux->async_finalize) {
3673 if ((splitmux->muxer =
3674 create_element (splitmux, splitmux->muxer_factory, "muxer",
3677 if (splitmux->muxer_preset && GST_IS_PRESET (splitmux->muxer))
3678 gst_preset_load_preset (GST_PRESET (splitmux->muxer),
3679 splitmux->muxer_preset);
3680 if (splitmux->muxer_properties)
3681 gst_structure_foreach (splitmux->muxer_properties,
3682 _set_property_from_structure, splitmux->muxer);
3684 /* Ensure it's not in locked state (we might be reusing an old element) */
3685 gst_element_set_locked_state (provided_muxer, FALSE);
3686 if (!gst_bin_add (GST_BIN (splitmux), provided_muxer)) {
3687 g_warning ("Could not add muxer element - splitmuxsink will not work");
3688 gst_object_unref (provided_muxer);
3692 splitmux->muxer = provided_muxer;
3693 gst_object_unref (provided_muxer);
3696 if (splitmux->use_robust_muxing) {
3697 update_muxer_properties (splitmux);
3707 find_sink (GstElement * e)
3709 GstElement *res = NULL;
3711 gboolean done = FALSE;
3712 GValue data = { 0, };
3714 if (!GST_IS_BIN (e))
3717 if (g_object_class_find_property (G_OBJECT_GET_CLASS (e), "location") != NULL)
3720 iter = gst_bin_iterate_sinks (GST_BIN (e));
3722 switch (gst_iterator_next (iter, &data)) {
3723 case GST_ITERATOR_OK:
3725 GstElement *child = g_value_get_object (&data);
3726 if (g_object_class_find_property (G_OBJECT_GET_CLASS (child),
3727 "location") != NULL) {
3731 g_value_reset (&data);
3734 case GST_ITERATOR_RESYNC:
3735 gst_iterator_resync (iter);
3737 case GST_ITERATOR_DONE:
3740 case GST_ITERATOR_ERROR:
3741 g_assert_not_reached ();
3745 g_value_unset (&data);
3746 gst_iterator_free (iter);
3752 create_sink (GstSplitMuxSink * splitmux)
3754 GstElement *provided_sink = NULL;
3756 if (splitmux->active_sink == NULL) {
3758 GST_OBJECT_LOCK (splitmux);
3759 if (splitmux->provided_sink != NULL)
3760 provided_sink = gst_object_ref (splitmux->provided_sink);
3761 GST_OBJECT_UNLOCK (splitmux);
3763 if ((!splitmux->async_finalize && provided_sink == NULL) ||
3764 (splitmux->async_finalize && splitmux->sink_factory == NULL)) {
3765 if ((splitmux->sink =
3766 create_element (splitmux, DEFAULT_SINK, "sink", TRUE)) == NULL)
3768 splitmux->active_sink = splitmux->sink;
3769 } else if (splitmux->async_finalize) {
3770 if ((splitmux->sink =
3771 create_element (splitmux, splitmux->sink_factory, "sink",
3774 if (splitmux->sink_preset && GST_IS_PRESET (splitmux->sink))
3775 gst_preset_load_preset (GST_PRESET (splitmux->sink),
3776 splitmux->sink_preset);
3777 if (splitmux->sink_properties)
3778 gst_structure_foreach (splitmux->sink_properties,
3779 _set_property_from_structure, splitmux->sink);
3780 splitmux->active_sink = splitmux->sink;
3782 /* Ensure the sink starts in locked state and NULL - it will be changed
3783 * by the filename setting code */
3784 gst_element_set_locked_state (provided_sink, TRUE);
3785 gst_element_set_state (provided_sink, GST_STATE_NULL);
3786 if (!gst_bin_add (GST_BIN (splitmux), provided_sink)) {
3787 g_warning ("Could not add sink elements - splitmuxsink will not work");
3788 gst_object_unref (provided_sink);
3792 splitmux->active_sink = provided_sink;
3794 /* The bin holds a ref now, we can drop our tmp ref */
3795 gst_object_unref (provided_sink);
3797 /* Find the sink element */
3798 splitmux->sink = find_sink (splitmux->active_sink);
3799 if (splitmux->sink == NULL) {
3801 ("Could not locate sink element in provided sink - splitmuxsink will not work");
3807 if (g_object_class_find_property (G_OBJECT_GET_CLASS (splitmux->sink),
3809 /* async child elements are causing state change races and weird
3810 * failures, so let's try and turn that off */
3811 g_object_set (splitmux->sink, "async", FALSE, NULL);
3815 if (!gst_element_link (splitmux->muxer, splitmux->active_sink)) {
3816 g_warning ("Failed to link muxer and sink- splitmuxsink will not work");
3827 #pragma GCC diagnostic ignored "-Wformat-nonliteral"
3830 set_next_filename (GstSplitMuxSink * splitmux, MqStreamCtx * ctx)
3832 gchar *fname = NULL;
3836 gst_splitmux_sink_ensure_max_files (splitmux);
3838 if (ctx->cur_out_buffer == NULL) {
3839 GST_WARNING_OBJECT (splitmux, "Starting next file without buffer");
3842 caps = gst_pad_get_current_caps (ctx->srcpad);
3843 sample = gst_sample_new (ctx->cur_out_buffer, caps, &ctx->out_segment, NULL);
3844 g_signal_emit (splitmux, signals[SIGNAL_FORMAT_LOCATION_FULL], 0,
3845 splitmux->fragment_id, sample, &fname);
3846 gst_sample_unref (sample);
3848 gst_caps_unref (caps);
3850 if (fname == NULL) {
3851 /* Fallback to the old signal if the new one returned nothing */
3852 g_signal_emit (splitmux, signals[SIGNAL_FORMAT_LOCATION], 0,
3853 splitmux->fragment_id, &fname);
3857 fname = splitmux->location ?
3858 g_strdup_printf (splitmux->location, splitmux->fragment_id) : NULL;
3861 GST_INFO_OBJECT (splitmux, "Setting file to %s", fname);
3862 if (g_object_class_find_property (G_OBJECT_GET_CLASS (splitmux->sink),
3863 "location") != NULL)
3864 g_object_set (splitmux->sink, "location", fname, NULL);
3868 splitmux->fragment_id++;
3871 /* called with GST_SPLITMUX_LOCK */
3873 do_async_start (GstSplitMuxSink * splitmux)
3875 GstMessage *message;
3877 if (!splitmux->need_async_start) {
3878 GST_INFO_OBJECT (splitmux, "no async_start needed");
3882 splitmux->async_pending = TRUE;
3884 GST_INFO_OBJECT (splitmux, "Sending async_start message");
3885 message = gst_message_new_async_start (GST_OBJECT_CAST (splitmux));
3887 GST_SPLITMUX_UNLOCK (splitmux);
3888 GST_BIN_CLASS (parent_class)->handle_message (GST_BIN_CAST
3889 (splitmux), message);
3890 GST_SPLITMUX_LOCK (splitmux);
3893 /* called with GST_SPLITMUX_LOCK */
3895 do_async_done (GstSplitMuxSink * splitmux)
3897 GstMessage *message;
3899 if (splitmux->async_pending) {
3900 GST_INFO_OBJECT (splitmux, "Sending async_done message");
3901 splitmux->async_pending = FALSE;
3902 GST_SPLITMUX_UNLOCK (splitmux);
3905 gst_message_new_async_done (GST_OBJECT_CAST (splitmux),
3906 GST_CLOCK_TIME_NONE);
3907 GST_BIN_CLASS (parent_class)->handle_message (GST_BIN_CAST
3908 (splitmux), message);
3909 GST_SPLITMUX_LOCK (splitmux);
3912 splitmux->need_async_start = FALSE;
3916 gst_splitmux_sink_reset (GstSplitMuxSink * splitmux)
3918 splitmux->max_in_running_time = GST_CLOCK_STIME_NONE;
3919 splitmux->max_in_running_time_dts = GST_CLOCK_STIME_NONE;
3921 splitmux->fragment_start_time = GST_CLOCK_STIME_NONE;
3922 splitmux->fragment_start_time_pts = GST_CLOCK_STIME_NONE;
3923 g_clear_pointer (&splitmux->fragment_start_tc, gst_video_time_code_free);
3925 g_queue_foreach (&splitmux->pending_input_gops, (GFunc) input_gop_free, NULL);
3926 g_queue_clear (&splitmux->pending_input_gops);
3928 splitmux->max_out_running_time = GST_CLOCK_STIME_NONE;
3929 splitmux->fragment_total_bytes = 0;
3930 splitmux->fragment_reference_bytes = 0;
3931 splitmux->muxed_out_bytes = 0;
3932 splitmux->ready_for_output = FALSE;
3934 g_atomic_int_set (&(splitmux->split_requested), FALSE);
3935 g_atomic_int_set (&(splitmux->do_split_next_gop), FALSE);
3937 splitmux->next_fku_time = GST_CLOCK_TIME_NONE;
3938 gst_queue_array_clear (splitmux->times_to_split);
3940 g_list_foreach (splitmux->contexts, (GFunc) mq_stream_ctx_reset, NULL);
3941 splitmux->queued_keyframes = 0;
3943 g_queue_foreach (&splitmux->out_cmd_q, (GFunc) out_cmd_buf_free, NULL);
3944 g_queue_clear (&splitmux->out_cmd_q);
3947 static GstStateChangeReturn
3948 gst_splitmux_sink_change_state (GstElement * element, GstStateChange transition)
3950 GstStateChangeReturn ret;
3951 GstSplitMuxSink *splitmux = (GstSplitMuxSink *) element;
3953 switch (transition) {
3954 case GST_STATE_CHANGE_NULL_TO_READY:{
3955 GST_SPLITMUX_LOCK (splitmux);
3956 if (!create_muxer (splitmux) || !create_sink (splitmux)) {
3957 ret = GST_STATE_CHANGE_FAILURE;
3958 GST_SPLITMUX_UNLOCK (splitmux);
3961 g_signal_emit (splitmux, signals[SIGNAL_MUXER_ADDED], 0, splitmux->muxer);
3962 g_signal_emit (splitmux, signals[SIGNAL_SINK_ADDED], 0, splitmux->sink);
3963 GST_SPLITMUX_UNLOCK (splitmux);
3964 splitmux->fragment_id = splitmux->start_index;
3967 case GST_STATE_CHANGE_READY_TO_PAUSED:{
3968 GST_SPLITMUX_LOCK (splitmux);
3969 /* Make sure contexts and tracking times are cleared, in case we're being reused */
3970 gst_splitmux_sink_reset (splitmux);
3971 /* Start by collecting one input on each pad */
3972 splitmux->input_state = SPLITMUX_INPUT_STATE_COLLECTING_GOP_START;
3973 splitmux->output_state = SPLITMUX_OUTPUT_STATE_START_NEXT_FILE;
3975 GST_SPLITMUX_UNLOCK (splitmux);
3977 GST_SPLITMUX_STATE_LOCK (splitmux);
3978 splitmux->shutdown = FALSE;
3979 GST_SPLITMUX_STATE_UNLOCK (splitmux);
3982 case GST_STATE_CHANGE_PAUSED_TO_READY:
3983 case GST_STATE_CHANGE_READY_TO_READY:
3984 g_atomic_int_set (&(splitmux->split_requested), FALSE);
3985 g_atomic_int_set (&(splitmux->do_split_next_gop), FALSE);
3987 case GST_STATE_CHANGE_READY_TO_NULL:
3988 GST_SPLITMUX_STATE_LOCK (splitmux);
3989 splitmux->shutdown = TRUE;
3990 GST_SPLITMUX_STATE_UNLOCK (splitmux);
3992 GST_SPLITMUX_LOCK (splitmux);
3993 gst_splitmux_sink_reset (splitmux);
3994 splitmux->output_state = SPLITMUX_OUTPUT_STATE_STOPPED;
3995 splitmux->input_state = SPLITMUX_INPUT_STATE_STOPPED;
3996 /* Wake up any blocked threads */
3997 GST_LOG_OBJECT (splitmux,
3998 "State change -> NULL or READY. Waking threads");
3999 GST_SPLITMUX_BROADCAST_INPUT (splitmux);
4000 GST_SPLITMUX_BROADCAST_OUTPUT (splitmux);
4001 GST_SPLITMUX_UNLOCK (splitmux);
4007 ret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
4008 if (ret == GST_STATE_CHANGE_FAILURE)
4011 switch (transition) {
4012 case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
4013 splitmux->need_async_start = TRUE;
4015 case GST_STATE_CHANGE_READY_TO_PAUSED:{
4016 /* Change state async, because our child sink might not
4017 * be ready to do that for us yet if it's state is still locked */
4019 splitmux->need_async_start = TRUE;
4020 /* we want to go async to PAUSED until we managed to configure and add the
4022 GST_SPLITMUX_LOCK (splitmux);
4023 do_async_start (splitmux);
4024 GST_SPLITMUX_UNLOCK (splitmux);
4025 ret = GST_STATE_CHANGE_ASYNC;
4028 case GST_STATE_CHANGE_READY_TO_NULL:
4029 GST_SPLITMUX_LOCK (splitmux);
4030 splitmux->fragment_id = 0;
4031 /* Reset internal elements only if no pad contexts are using them */
4032 if (splitmux->contexts == NULL)
4033 gst_splitmux_reset_elements (splitmux);
4034 do_async_done (splitmux);
4035 GST_SPLITMUX_UNLOCK (splitmux);
4044 if (transition == GST_STATE_CHANGE_NULL_TO_READY) {
4045 /* Cleanup elements on failed transition out of NULL */
4046 gst_splitmux_reset_elements (splitmux);
4047 GST_SPLITMUX_LOCK (splitmux);
4048 do_async_done (splitmux);
4049 GST_SPLITMUX_UNLOCK (splitmux);
4051 if (transition == GST_STATE_CHANGE_READY_TO_READY) {
4052 /* READY to READY transition only happens when we're already
4053 * in READY state, but a child element is in NULL, which
4054 * happens when there's an error changing the state of the sink.
4055 * We need to make sure not to fail the state transition, or
4056 * the core won't transition us back to NULL successfully */
4057 ret = GST_STATE_CHANGE_SUCCESS;
4063 gst_splitmux_sink_ensure_max_files (GstSplitMuxSink * splitmux)
4065 if (splitmux->max_files && splitmux->fragment_id >= splitmux->max_files) {
4066 splitmux->fragment_id = 0;
4071 split_now (GstSplitMuxSink * splitmux)
4073 g_atomic_int_set (&(splitmux->do_split_next_gop), TRUE);
4077 split_after (GstSplitMuxSink * splitmux)
4079 g_atomic_int_set (&(splitmux->split_requested), TRUE);
4083 split_at_running_time (GstSplitMuxSink * splitmux, GstClockTime split_time)
4085 gboolean send_keyframe_requests;
4087 GST_SPLITMUX_LOCK (splitmux);
4088 gst_queue_array_push_tail_struct (splitmux->times_to_split, &split_time);
4089 send_keyframe_requests = splitmux->send_keyframe_requests;
4090 GST_SPLITMUX_UNLOCK (splitmux);
4092 if (send_keyframe_requests) {
4094 gst_video_event_new_upstream_force_key_unit (split_time, TRUE, 0);
4095 GST_INFO_OBJECT (splitmux, "Requesting next keyframe at %" GST_TIME_FORMAT,
4096 GST_TIME_ARGS (split_time));
4097 if (!gst_pad_push_event (splitmux->reference_ctx->sinkpad, ev)) {
4098 GST_WARNING_OBJECT (splitmux,
4099 "Could not request keyframe at %" GST_TIME_FORMAT,
4100 GST_TIME_ARGS (split_time));