1 /* GStreamer Muxer bin that splits output stream by size/time
2 * Copyright (C) <2014-2019> Jan Schmidt <jan@centricular.com>
4 * This library is free software; you can redistribute it and/or
5 * modify it under the terms of the GNU Library General Public
6 * License as published by the Free Software Foundation; either
7 * version 2 of the License, or (at your option) any later version.
9 * This library is distributed in the hope that it will be useful,
10 * but WITHOUT ANY WARRANTY; without even the implied warranty of
11 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
12 * Library General Public License for more details.
14 * You should have received a copy of the GNU Library General Public
15 * License along with this library; if not, write to the
16 * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
17 * Boston, MA 02110-1301, USA.
21 * SECTION:element-splitmuxsink
22 * @title: splitmuxsink
23 * @short_description: Muxer wrapper for splitting output stream by size or time
25 * This element wraps a muxer and a sink, and starts a new file when the mux
26 * contents are about to cross a threshold of maximum size or maximum time,
27 * splitting at video keyframe boundaries. Exactly one input video stream
28 * can be muxed, with as many accompanying audio and subtitle streams as desired.
31 * By default, it uses mp4mux and filesink, but they can be changed via
32 * the 'muxer' and 'sink' properties.
34 * The minimum file size is 1 GOP, however - so limits may be overrun if the
35 * distance between any 2 keyframes is larger than the limits.
37 * If a video stream is available, the splitting process is driven by the video
38 * stream contents, and the video stream must contain closed GOPs for the output
39 * file parts to be played individually correctly. In the absence of a video
40 * stream, the first available stream is used as reference for synchronization.
42 * In the async-finalize mode, when the threshold is crossed, the old muxer
43 * and sink is disconnected from the pipeline and left to finish the file
44 * asynchronously, and a new muxer and sink is created to continue with the
45 * next fragment. For that reason, instead of muxer and sink objects, the
46 * muxer-factory and sink-factory properties are used to construct the new
47 * objects, together with muxer-properties and sink-properties.
49 * ## Example pipelines
51 * gst-launch-1.0 -e v4l2src num-buffers=500 ! video/x-raw,width=320,height=240 ! videoconvert ! queue ! timeoverlay ! x264enc key-int-max=10 ! h264parse ! splitmuxsink location=video%02d.mov max-size-time=10000000000 max-size-bytes=1000000
53 * Records a video stream captured from a v4l2 device and muxes it into
54 * ISO mp4 files, splitting as needed to limit size/duration to 10 seconds
55 * and 1MB maximum size.
58 * gst-launch-1.0 -e v4l2src num-buffers=500 ! video/x-raw,width=320,height=240 ! videoconvert ! queue ! timeoverlay ! x264enc key-int-max=10 ! h264parse ! splitmuxsink location=video%02d.mkv max-size-time=10000000000 muxer-factory=matroskamux muxer-properties="properties,streamable=true"
60 * Records a video stream captured from a v4l2 device and muxes it into
61 * streamable Matroska files, splitting as needed to limit size/duration to 10
62 * seconds. Each file will finalize asynchronously.
65 * gst-launch-1.0 videotestsrc num-buffers=10 ! jpegenc ! .video splitmuxsink muxer=qtmux muxer-pad-map=x-pad-map,video=video_1 location=test%05d.mp4 -v
67 * Records 10 frames to an mp4 file, using a muxer-pad-map to make explicit mappings between the splitmuxsink sink pad and the corresponding muxer pad
76 #include <glib/gstdio.h>
77 #include <gst/video/video.h>
78 #include "gstsplitmuxsink.h"
/* Debug category for this file; GST_CAT_DEFAULT makes it the default
 * category for the logging macros used below. */
80 GST_DEBUG_CATEGORY_STATIC (splitmux_debug);
81 #define GST_CAT_DEFAULT splitmux_debug
/* Lock / unlock the element's state_lock mutex. */
83 #define GST_SPLITMUX_STATE_LOCK(s) g_mutex_lock(&(s)->state_lock)
84 #define GST_SPLITMUX_STATE_UNLOCK(s) g_mutex_unlock(&(s)->state_lock)
/* Lock / unlock the element's main `lock` mutex. Both condition variables
 * below are waited on with this same mutex, so it must be held when
 * calling the WAIT macros. */
86 #define GST_SPLITMUX_LOCK(s) g_mutex_lock(&(s)->lock)
87 #define GST_SPLITMUX_UNLOCK(s) g_mutex_unlock(&(s)->lock)
/* Wait on / wake every waiter of input_cond. */
88 #define GST_SPLITMUX_WAIT_INPUT(s) g_cond_wait (&(s)->input_cond, &(s)->lock)
89 #define GST_SPLITMUX_BROADCAST_INPUT(s) g_cond_broadcast (&(s)->input_cond)
/* Wait on / wake every waiter of output_cond. */
91 #define GST_SPLITMUX_WAIT_OUTPUT(s) g_cond_wait (&(s)->output_cond, &(s)->lock)
92 #define GST_SPLITMUX_BROADCAST_OUTPUT(s) g_cond_broadcast (&(s)->output_cond)
/* Handlers installed into the class structure by
 * gst_splitmux_sink_class_init() (split_now, split_after and
 * split_at_running_time slots). */
94 static void split_now (GstSplitMuxSink * splitmux);
95 static void split_after (GstSplitMuxSink * splitmux);
96 static void split_at_running_time (GstSplitMuxSink * splitmux,
97 GstClockTime split_time);
/* Property identifiers passed to g_object_class_install_property() in
 * gst_splitmux_sink_class_init(). Only part of the list is visible in this
 * listing. */
106 PROP_MAX_SIZE_TIMECODE,
107 PROP_SEND_KEYFRAME_REQUESTS,
110 PROP_USE_ROBUST_MUXING,
111 PROP_ALIGNMENT_THRESHOLD,
118 PROP_MUXER_PROPERTIES,
121 PROP_SINK_PROPERTIES,
/* Default property values. They are used as GParamSpec defaults in
 * gst_splitmux_sink_class_init() and/or assigned to the instance in
 * gst_splitmux_sink_init(). For the max-size-time and max-size-bytes
 * limits the property blurbs state that 0 disables the limit. */
125 #define DEFAULT_MAX_SIZE_TIME 0
126 #define DEFAULT_MAX_SIZE_BYTES 0
127 #define DEFAULT_MAX_FILES 0
128 #define DEFAULT_MUXER_OVERHEAD 0.02
129 #define DEFAULT_SEND_KEYFRAME_REQUESTS FALSE
130 #define DEFAULT_ALIGNMENT_THRESHOLD 0
/* Element factory names duplicated into muxer_factory / sink_factory at
 * instance init. */
131 #define DEFAULT_MUXER "mp4mux"
132 #define DEFAULT_SINK "filesink"
133 #define DEFAULT_USE_ROBUST_MUXING FALSE
134 #define DEFAULT_RESET_MUXER TRUE
135 #define DEFAULT_ASYNC_FINALIZE FALSE
136 #define DEFAULT_START_INDEX 0
/* Helper structure; its members are not visible in this listing. */
138 typedef struct _AsyncEosHelper
/* Signal identifiers, used as indices into signals[] below. Only part of
 * the list is visible in this listing. */
146 SIGNAL_FORMAT_LOCATION,
147 SIGNAL_FORMAT_LOCATION_FULL,
150 SIGNAL_SPLIT_AT_RUNNING_TIME,
/* Signal ids returned by g_signal_new(); filled in by
 * gst_splitmux_sink_class_init(). */
156 static guint signals[SIGNAL_LAST];
/* Sink pad templates added to the element class in
 * gst_splitmux_sink_class_init(). Every template accepts ANY caps; the
 * template name ("video", "video_aux_%u", "audio_%u", "subtitle_%u",
 * "caption_%u") identifies the kind of stream. */
158 static GstStaticPadTemplate video_sink_template =
159 GST_STATIC_PAD_TEMPLATE ("video",
162 GST_STATIC_CAPS_ANY);
163 static GstStaticPadTemplate video_aux_sink_template =
164 GST_STATIC_PAD_TEMPLATE ("video_aux_%u",
167 GST_STATIC_CAPS_ANY);
168 static GstStaticPadTemplate audio_sink_template =
169 GST_STATIC_PAD_TEMPLATE ("audio_%u",
172 GST_STATIC_CAPS_ANY);
173 static GstStaticPadTemplate subtitle_sink_template =
174 GST_STATIC_PAD_TEMPLATE ("subtitle_%u",
177 GST_STATIC_CAPS_ANY);
178 static GstStaticPadTemplate caption_sink_template =
179 GST_STATIC_PAD_TEMPLATE ("caption_%u",
182 GST_STATIC_CAPS_ANY);
/* Quarks interned from the static strings "pad-context", "eos-from-us" and
 * "running-time" during type initialisation. */
184 static GQuark PAD_CONTEXT;
185 static GQuark EOS_FROM_US;
186 static GQuark RUNNING_TIME;
187 /* EOS_FROM_US is only valid in async-finalize mode. We need to know whether
188 * to forward an incoming EOS message, but we cannot rely on the state of the
189 * splitmux anymore, so we set this qdata on the sink instead.
190 * The muxer and sink must be destroyed after both of these things have finished:
192 * 1) The EOS message has been sent when the fragment is ending
193 * 2) The muxer has been unlinked and relinked
194 * Therefore, EOS_FROM_US can have these two values:
195 * 0: EOS was not requested from us. Forward the message. The muxer and the
196 * sink will be destroyed together with the rest of the bin.
197 * 1: EOS was requested from us, but the other of the two tasks hasn't
198 * finished. Set EOS_FROM_US to 2 and do your stuff.
199 * 2: EOS was requested from us and the other of the two tasks has finished.
200 * Now we can destroy the muxer and the sink.
/* Type-level setup: intern the three file-scope quarks and register the
 * "splitmuxsink" debug category. The enclosing function header is not
 * visible in this listing. */
206 PAD_CONTEXT = g_quark_from_static_string ("pad-context");
207 EOS_FROM_US = g_quark_from_static_string ("eos-from-us");
208 RUNNING_TIME = g_quark_from_static_string ("running-time");
209 GST_DEBUG_CATEGORY_INIT (splitmux_debug, "splitmuxsink", 0,
210 "Split File Muxing Sink");
/* Define GstSplitMuxSink as a GstBin subclass and register it as the
 * "splitmuxsink" element with rank NONE. */
213 #define gst_splitmux_sink_parent_class parent_class
214 G_DEFINE_TYPE_EXTENDED (GstSplitMuxSink, gst_splitmux_sink, GST_TYPE_BIN, 0,
216 GST_ELEMENT_REGISTER_DEFINE (splitmuxsink, "splitmuxsink", GST_RANK_NONE,
217 GST_TYPE_SPLITMUX_SINK);
/* Forward declarations of file-local functions. */

/* Child element creation. */
219 static gboolean create_muxer (GstSplitMuxSink * splitmux);
220 static gboolean create_sink (GstSplitMuxSink * splitmux);
/* GObject vfuncs, installed in gst_splitmux_sink_class_init(). */
221 static void gst_splitmux_sink_set_property (GObject * object, guint prop_id,
222 const GValue * value, GParamSpec * pspec);
223 static void gst_splitmux_sink_get_property (GObject * object, guint prop_id,
224 GValue * value, GParamSpec * pspec);
225 static void gst_splitmux_sink_dispose (GObject * object);
226 static void gst_splitmux_sink_finalize (GObject * object);
/* GstElement vfuncs, installed in gst_splitmux_sink_class_init(). */
228 static GstPad *gst_splitmux_sink_request_new_pad (GstElement * element,
229 GstPadTemplate * templ, const gchar * name, const GstCaps * caps);
230 static void gst_splitmux_sink_release_pad (GstElement * element, GstPad * pad);
232 static GstStateChangeReturn gst_splitmux_sink_change_state (GstElement *
233 element, GstStateChange transition);
/* GstBin message handler, installed as handle_message in class_init. */
235 static void bus_handler (GstBin * bin, GstMessage * msg);
/* Internal helpers defined later in the file. */
236 static void set_next_filename (GstSplitMuxSink * splitmux, MqStreamCtx * ctx);
237 static GstFlowReturn start_next_fragment (GstSplitMuxSink * splitmux,
239 static void mq_stream_ctx_free (MqStreamCtx * ctx);
240 static void grow_blocked_queues (GstSplitMuxSink * splitmux);
242 static void gst_splitmux_sink_ensure_max_files (GstSplitMuxSink * splitmux);
243 static GstElement *create_element (GstSplitMuxSink * splitmux,
244 const gchar * factory, const gchar * name, gboolean locked);
246 static void do_async_done (GstSplitMuxSink * splitmux);
247 static void gst_splitmux_reset_timecode (GstSplitMuxSink * splitmux);
/* Allocate a zero-filled MqStreamBuf with the GLib slice allocator.
 * Release it with mq_stream_buf_free(). */
250 mq_stream_buf_new (void)
252 return g_slice_new0 (MqStreamBuf);
/* Return a MqStreamBuf to the GLib slice allocator; the counterpart of
 * mq_stream_buf_new(). */
256 mq_stream_buf_free (MqStreamBuf * data)
258 g_slice_free (MqStreamBuf, data);
261 static SplitMuxOutputCommand *
262 out_cmd_buf_new (void)
264 return g_slice_new0 (SplitMuxOutputCommand);
/* Return a SplitMuxOutputCommand to the GLib slice allocator; the
 * counterpart of out_cmd_buf_new(). Also used as the per-item callback when
 * the output command queue is emptied in finalize. */
268 out_cmd_buf_free (SplitMuxOutputCommand * data)
270 g_slice_free (SplitMuxOutputCommand, data);
/* Class initialiser for GstSplitMuxSink.
 *
 * Installs the GObject, GstElement and GstBin virtual functions, sets the
 * element metadata, adds the static sink pad templates, installs every
 * property and creates every signal of the element, and finally wires the
 * default handlers of the three split action signals into the class
 * structure. */
274 gst_splitmux_sink_class_init (GstSplitMuxSinkClass * klass)
276 GObjectClass *gobject_class = (GObjectClass *) klass;
277 GstElementClass *gstelement_class = (GstElementClass *) klass;
278 GstBinClass *gstbin_class = (GstBinClass *) klass;
/* GObject virtual functions */
280 gobject_class->set_property = gst_splitmux_sink_set_property;
281 gobject_class->get_property = gst_splitmux_sink_get_property;
282 gobject_class->dispose = gst_splitmux_sink_dispose;
283 gobject_class->finalize = gst_splitmux_sink_finalize;
/* Element metadata: long name, classification, description, author */
285 gst_element_class_set_static_metadata (gstelement_class,
286 "Split Muxing Bin", "Generic/Bin/Muxer",
287 "Convenience bin that muxes incoming streams into multiple time/size limited files",
288 "Jan Schmidt <jan@centricular.com>");
/* Sink pad templates: video, video_aux_%u, audio_%u, subtitle_%u,
 * caption_%u */
290 gst_element_class_add_static_pad_template (gstelement_class,
291 &video_sink_template);
292 gst_element_class_add_static_pad_template (gstelement_class,
293 &video_aux_sink_template);
294 gst_element_class_add_static_pad_template (gstelement_class,
295 &audio_sink_template);
296 gst_element_class_add_static_pad_template (gstelement_class,
297 &subtitle_sink_template);
298 gst_element_class_add_static_pad_template (gstelement_class,
299 &caption_sink_template);
/* GstElement virtual functions */
301 gstelement_class->change_state =
302 GST_DEBUG_FUNCPTR (gst_splitmux_sink_change_state);
303 gstelement_class->request_new_pad =
304 GST_DEBUG_FUNCPTR (gst_splitmux_sink_request_new_pad);
305 gstelement_class->release_pad =
306 GST_DEBUG_FUNCPTR (gst_splitmux_sink_release_pad);
/* Messages posted by children of the bin go through bus_handler() */
308 gstbin_class->handle_message = bus_handler;
/* ---- Properties ---- */
310 g_object_class_install_property (gobject_class, PROP_LOCATION,
311 g_param_spec_string ("location", "File Output Pattern",
312 "Format string pattern for the location of the files to write (e.g. video%05d.mp4)",
313 NULL, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
314 g_object_class_install_property (gobject_class, PROP_MUXER_OVERHEAD,
315 g_param_spec_double ("mux-overhead", "Muxing Overhead",
316 "Extra size overhead of muxing (0.02 = 2%)", 0.0, 1.0,
317 DEFAULT_MUXER_OVERHEAD,
318 G_PARAM_READWRITE | GST_PARAM_CONTROLLABLE | G_PARAM_STATIC_STRINGS));
/* Size/time limits; flagged as changeable in the READY state */
320 g_object_class_install_property (gobject_class, PROP_MAX_SIZE_TIME,
321 g_param_spec_uint64 ("max-size-time", "Max. size (ns)",
322 "Max. amount of time per file (in ns, 0=disable)", 0, G_MAXUINT64,
323 DEFAULT_MAX_SIZE_TIME,
324 G_PARAM_READWRITE | GST_PARAM_MUTABLE_READY |
325 G_PARAM_STATIC_STRINGS));
326 g_object_class_install_property (gobject_class, PROP_MAX_SIZE_BYTES,
327 g_param_spec_uint64 ("max-size-bytes", "Max. size bytes",
328 "Max. amount of data per file (in bytes, 0=disable)", 0, G_MAXUINT64,
329 DEFAULT_MAX_SIZE_BYTES,
330 G_PARAM_READWRITE | GST_PARAM_MUTABLE_READY |
331 G_PARAM_STATIC_STRINGS));
332 g_object_class_install_property (gobject_class, PROP_MAX_SIZE_TIMECODE,
333 g_param_spec_string ("max-size-timecode", "Maximum timecode difference",
334 "Maximum difference in timecode between first and last frame. "
335 "Separator is assumed to be \":\" everywhere (e.g. 01:00:00:00). "
336 "Will only be effective if a timecode track is present.", NULL,
337 G_PARAM_READWRITE | GST_PARAM_MUTABLE_READY |
338 G_PARAM_STATIC_STRINGS));
339 g_object_class_install_property (gobject_class, PROP_SEND_KEYFRAME_REQUESTS,
340 g_param_spec_boolean ("send-keyframe-requests",
341 "Request keyframes at max-size-time",
342 "Request a keyframe every max-size-time ns to try splitting at that point. "
343 "Needs max-size-bytes to be 0 in order to be effective.",
344 DEFAULT_SEND_KEYFRAME_REQUESTS,
345 G_PARAM_READWRITE | GST_PARAM_MUTABLE_READY |
346 G_PARAM_STATIC_STRINGS));
/* NOTE(review): the two adjacent string literals of the max-files blurb
 * concatenate without a separating space ("reached,old files"); the other
 * multi-literal blurbs in this function keep a space at the join. */
347 g_object_class_install_property (gobject_class, PROP_MAX_FILES,
348 g_param_spec_uint ("max-files", "Max files",
349 "Maximum number of files to keep on disk. Once the maximum is reached,"
350 "old files start to be deleted to make room for new ones.", 0,
351 G_MAXUINT, DEFAULT_MAX_FILES,
352 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
353 g_object_class_install_property (gobject_class, PROP_ALIGNMENT_THRESHOLD,
354 g_param_spec_uint64 ("alignment-threshold", "Alignment threshold (ns)",
355 "Allow non-reference streams to be that many ns before the reference"
356 " stream", 0, G_MAXUINT64, DEFAULT_ALIGNMENT_THRESHOLD,
357 G_PARAM_READWRITE | GST_PARAM_MUTABLE_READY |
358 G_PARAM_STATIC_STRINGS));
/* Caller-provided muxer / sink elements (documented as valid only when
 * async-finalize is FALSE) */
360 g_object_class_install_property (gobject_class, PROP_MUXER,
361 g_param_spec_object ("muxer", "Muxer",
362 "The muxer element to use (NULL = default mp4mux). "
363 "Valid only for async-finalize = FALSE",
364 GST_TYPE_ELEMENT, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
365 g_object_class_install_property (gobject_class, PROP_SINK,
366 g_param_spec_object ("sink", "Sink",
367 "The sink element (or element chain) to use (NULL = default filesink). "
368 "Valid only for async-finalize = FALSE",
369 GST_TYPE_ELEMENT, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
371 g_object_class_install_property (gobject_class, PROP_USE_ROBUST_MUXING,
372 g_param_spec_boolean ("use-robust-muxing",
373 "Support robust-muxing mode of some muxers",
374 "Check if muxers support robust muxing via the reserved-max-duration and "
375 "reserved-duration-remaining properties and use them if so. "
376 "(Only present on qtmux and mp4mux for now). splitmuxsink may then also "
377 " create new fragments if the reserved header space is about to overflow. "
378 "Note that for mp4mux and qtmux, reserved-moov-update-period must be set "
379 "manually by the app to a non-zero value for robust muxing to have an effect.",
380 DEFAULT_USE_ROBUST_MUXING,
381 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
383 g_object_class_install_property (gobject_class, PROP_RESET_MUXER,
384 g_param_spec_boolean ("reset-muxer",
386 "Reset the muxer after each segment. Disabling this will not work for most muxers.",
387 DEFAULT_RESET_MUXER, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
/* async-finalize mode and the factory / preset / properties settings that
 * are documented as valid only when async-finalize is TRUE */
389 g_object_class_install_property (gobject_class, PROP_ASYNC_FINALIZE,
390 g_param_spec_boolean ("async-finalize",
391 "Finalize fragments asynchronously",
392 "Finalize each fragment asynchronously and start a new one",
393 DEFAULT_ASYNC_FINALIZE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
394 g_object_class_install_property (gobject_class, PROP_MUXER_FACTORY,
395 g_param_spec_string ("muxer-factory", "Muxer factory",
396 "The muxer element factory to use (default = mp4mux). "
397 "Valid only for async-finalize = TRUE",
398 "mp4mux", G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
400 * GstSplitMuxSink:muxer-preset
402 * An optional #GstPreset name to use for the muxer. This only has an effect
403 * in `async-finalize=TRUE` mode.
407 g_object_class_install_property (gobject_class, PROP_MUXER_PRESET,
408 g_param_spec_string ("muxer-preset", "Muxer preset",
409 "The muxer preset to use. "
410 "Valid only for async-finalize = TRUE",
411 NULL, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
412 g_object_class_install_property (gobject_class, PROP_MUXER_PROPERTIES,
413 g_param_spec_boxed ("muxer-properties", "Muxer properties",
414 "The muxer element properties to use. "
415 "Example: {properties,boolean-prop=true,string-prop=\"hi\"}. "
416 "Valid only for async-finalize = TRUE",
417 GST_TYPE_STRUCTURE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
418 g_object_class_install_property (gobject_class, PROP_SINK_FACTORY,
419 g_param_spec_string ("sink-factory", "Sink factory",
420 "The sink element factory to use (default = filesink). "
421 "Valid only for async-finalize = TRUE",
422 "filesink", G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
424 * GstSplitMuxSink:sink-preset
426 * An optional #GstPreset name to use for the sink. This only has an effect
427 * in `async-finalize=TRUE` mode.
431 g_object_class_install_property (gobject_class, PROP_SINK_PRESET,
432 g_param_spec_string ("sink-preset", "Sink preset",
433 "The sink preset to use. "
434 "Valid only for async-finalize = TRUE",
435 NULL, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
436 g_object_class_install_property (gobject_class, PROP_SINK_PROPERTIES,
437 g_param_spec_boxed ("sink-properties", "Sink properties",
438 "The sink element properties to use. "
439 "Example: {properties,boolean-prop=true,string-prop=\"hi\"}. "
440 "Valid only for async-finalize = TRUE",
441 GST_TYPE_STRUCTURE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
442 g_object_class_install_property (gobject_class, PROP_START_INDEX,
443 g_param_spec_int ("start-index", "Start Index",
444 "Start value of fragment index.",
445 0, G_MAXINT, DEFAULT_START_INDEX,
446 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
/* NOTE(review): the doc tag below uses "::" although muxer-pad-map is
 * installed as a property; the muxer-preset and sink-preset property docs
 * in this function use a single ":". Confirm which form the documentation
 * tooling expects. */
449 * GstSplitMuxSink::muxer-pad-map
451 * An optional GstStructure that provides a map from splitmuxsink sinkpad
452 * names to muxer pad names they should feed. Splitmuxsink has some default
453 * mapping behaviour to link video to video pads and audio to audio pads
454 * that usually works fine. This property is useful if you need to ensure
455 * a particular mapping to muxed streams.
457 * The GstStructure contains string fields like so:
458 * splitmuxsink muxer-pad-map=x-pad-map,video=video_1
462 g_object_class_install_property (G_OBJECT_CLASS (klass), PROP_MUXERPAD_MAP,
463 g_param_spec_boxed ("muxer-pad-map", "Muxer pad map",
464 "A GstStructure specifies the mapping from splitmuxsink sink pads to muxer pads",
466 (GParamFlags) (G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)));
/* ---- Signals ---- */
469 * GstSplitMuxSink::format-location:
470 * @splitmux: the #GstSplitMuxSink
471 * @fragment_id: the sequence number of the file to be created
473 * Returns: the location to be used for the next output file. This must be
474 * a newly-allocated string which will be freed with g_free() by the
475 * splitmuxsink element when it no longer needs it, so use g_strdup() or
476 * g_strdup_printf() or similar functions to allocate it.
478 signals[SIGNAL_FORMAT_LOCATION] =
479 g_signal_new ("format-location", G_TYPE_FROM_CLASS (klass),
480 G_SIGNAL_RUN_LAST, 0, NULL, NULL, NULL, G_TYPE_STRING, 1, G_TYPE_UINT);
483 * GstSplitMuxSink::format-location-full:
484 * @splitmux: the #GstSplitMuxSink
485 * @fragment_id: the sequence number of the file to be created
486 * @first_sample: A #GstSample containing the first buffer
487 * from the reference stream in the new file
489 * Returns: the location to be used for the next output file. This must be
490 * a newly-allocated string which will be freed with g_free() by the
491 * splitmuxsink element when it no longer needs it, so use g_strdup() or
492 * g_strdup_printf() or similar functions to allocate it.
496 signals[SIGNAL_FORMAT_LOCATION_FULL] =
497 g_signal_new ("format-location-full", G_TYPE_FROM_CLASS (klass),
498 G_SIGNAL_RUN_LAST, 0, NULL, NULL, NULL, G_TYPE_STRING, 2, G_TYPE_UINT,
502 * GstSplitMuxSink::split-now:
503 * @splitmux: the #GstSplitMuxSink
505 * When called by the user, this action signal splits the video file (and begins a new one) immediately.
506 * The current GOP will be output to the new file.
/* Action signals: each one dispatches to the class handler slot named in
 * its G_STRUCT_OFFSET() argument. */
510 signals[SIGNAL_SPLIT_NOW] =
511 g_signal_new ("split-now", G_TYPE_FROM_CLASS (klass),
512 G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION,
513 G_STRUCT_OFFSET (GstSplitMuxSinkClass, split_now), NULL, NULL, NULL,
517 * GstSplitMuxSink::split-after:
518 * @splitmux: the #GstSplitMuxSink
520 * When called by the user, this action signal splits the video file (and begins a new one) immediately.
521 * Unlike the 'split-now' signal, with 'split-after', the current GOP will be output to the old file.
525 signals[SIGNAL_SPLIT_AFTER] =
526 g_signal_new ("split-after", G_TYPE_FROM_CLASS (klass),
527 G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION,
528 G_STRUCT_OFFSET (GstSplitMuxSinkClass, split_after), NULL, NULL, NULL,
532 * GstSplitMuxSink::split-at-running-time:
533 * @splitmux: the #GstSplitMuxSink
535 * When called by the user, this action signal splits the video file (and
536 * begins a new one) as soon as the given running time is reached. If this
537 * action signal is called multiple times, running times are queued up and
538 * processed in the order they were given.
540 * Note that this is prone to race conditions, where said running time is
541 * reached and surpassed before we had a chance to split. The file will
542 * still split immediately, but in order to make sure that the split doesn't
543 * happen too late, it is recommended to call this action signal from
544 * something that will prevent further buffers from flowing into
545 * splitmuxsink before the split is completed, such as a pad probe before
551 signals[SIGNAL_SPLIT_AT_RUNNING_TIME] =
552 g_signal_new ("split-at-running-time", G_TYPE_FROM_CLASS (klass),
553 G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION,
554 G_STRUCT_OFFSET (GstSplitMuxSinkClass, split_at_running_time), NULL, NULL,
555 NULL, G_TYPE_NONE, 1, G_TYPE_UINT64);
558 * GstSplitMuxSink::muxer-added:
559 * @splitmux: the #GstSplitMuxSink
560 * @muxer: the newly added muxer element
564 signals[SIGNAL_MUXER_ADDED] =
565 g_signal_new ("muxer-added", G_TYPE_FROM_CLASS (klass),
566 G_SIGNAL_RUN_LAST, 0, NULL, NULL, NULL, G_TYPE_NONE, 1, GST_TYPE_ELEMENT);
569 * GstSplitMuxSink::sink-added:
570 * @splitmux: the #GstSplitMuxSink
571 * @sink: the newly added sink element
575 signals[SIGNAL_SINK_ADDED] =
576 g_signal_new ("sink-added", G_TYPE_FROM_CLASS (klass),
577 G_SIGNAL_RUN_LAST, 0, NULL, NULL, NULL, G_TYPE_NONE, 1, GST_TYPE_ELEMENT);
/* Default handlers for the three split action signals */
579 klass->split_now = split_now;
580 klass->split_after = split_after;
581 klass->split_at_running_time = split_at_running_time;
585 gst_splitmux_sink_init (GstSplitMuxSink * splitmux)
587 g_mutex_init (&splitmux->lock);
588 g_mutex_init (&splitmux->state_lock);
589 g_cond_init (&splitmux->input_cond);
590 g_cond_init (&splitmux->output_cond);
591 g_queue_init (&splitmux->out_cmd_q);
593 splitmux->mux_overhead = DEFAULT_MUXER_OVERHEAD;
594 splitmux->threshold_time = DEFAULT_MAX_SIZE_TIME;
595 splitmux->threshold_bytes = DEFAULT_MAX_SIZE_BYTES;
596 splitmux->max_files = DEFAULT_MAX_FILES;
597 splitmux->send_keyframe_requests = DEFAULT_SEND_KEYFRAME_REQUESTS;
598 splitmux->alignment_threshold = DEFAULT_ALIGNMENT_THRESHOLD;
599 splitmux->use_robust_muxing = DEFAULT_USE_ROBUST_MUXING;
600 splitmux->reset_muxer = DEFAULT_RESET_MUXER;
602 splitmux->threshold_timecode_str = NULL;
603 gst_splitmux_reset_timecode (splitmux);
605 splitmux->async_finalize = DEFAULT_ASYNC_FINALIZE;
606 splitmux->muxer_factory = g_strdup (DEFAULT_MUXER);
607 splitmux->muxer_properties = NULL;
608 splitmux->sink_factory = g_strdup (DEFAULT_SINK);
609 splitmux->sink_properties = NULL;
611 GST_OBJECT_FLAG_SET (splitmux, GST_ELEMENT_FLAG_SINK);
612 splitmux->split_requested = FALSE;
613 splitmux->do_split_next_gop = FALSE;
614 splitmux->times_to_split = gst_queue_array_new_for_struct (8, 8);
615 splitmux->next_fku_time = GST_CLOCK_TIME_NONE;
/* Tear down the current muxer and active sink, if present. Each element has
 * its state locked, is set to GST_STATE_NULL and is removed from the bin.
 * Afterwards the sink, active_sink and muxer pointers are all reset to
 * NULL. */
619 gst_splitmux_reset_elements (GstSplitMuxSink * splitmux)
621 if (splitmux->muxer) {
622 gst_element_set_locked_state (splitmux->muxer, TRUE);
623 gst_element_set_state (splitmux->muxer, GST_STATE_NULL);
624 gst_bin_remove (GST_BIN (splitmux), splitmux->muxer);
626 if (splitmux->active_sink) {
627 gst_element_set_locked_state (splitmux->active_sink, TRUE);
628 gst_element_set_state (splitmux->active_sink, GST_STATE_NULL);
629 gst_bin_remove (GST_BIN (splitmux), splitmux->active_sink);
/* Forget all three child pointers, whether or not anything was removed */
632 splitmux->sink = splitmux->active_sink = splitmux->muxer = NULL;
636 gst_splitmux_reset_timecode (GstSplitMuxSink * splitmux)
638 g_clear_pointer (&splitmux->in_tc, gst_video_time_code_free);
639 g_clear_pointer (&splitmux->fragment_start_tc, gst_video_time_code_free);
640 g_clear_pointer (&splitmux->gop_start_tc, gst_video_time_code_free);
641 splitmux->next_fragment_start_tc_time = GST_CLOCK_TIME_NONE;
645 gst_splitmux_sink_dispose (GObject * object)
647 GstSplitMuxSink *splitmux = GST_SPLITMUX_SINK (object);
649 /* Calling parent dispose invalidates all child pointers */
650 splitmux->sink = splitmux->active_sink = splitmux->muxer = NULL;
652 G_OBJECT_CLASS (parent_class)->dispose (object);
656 gst_splitmux_sink_finalize (GObject * object)
658 GstSplitMuxSink *splitmux = GST_SPLITMUX_SINK (object);
659 g_cond_clear (&splitmux->input_cond);
660 g_cond_clear (&splitmux->output_cond);
661 g_mutex_clear (&splitmux->lock);
662 g_mutex_clear (&splitmux->state_lock);
663 g_queue_foreach (&splitmux->out_cmd_q, (GFunc) out_cmd_buf_free, NULL);
664 g_queue_clear (&splitmux->out_cmd_q);
666 if (splitmux->muxerpad_map)
667 gst_structure_free (splitmux->muxerpad_map);
669 if (splitmux->provided_sink)
670 gst_object_unref (splitmux->provided_sink);
671 if (splitmux->provided_muxer)
672 gst_object_unref (splitmux->provided_muxer);
674 if (splitmux->muxer_factory)
675 g_free (splitmux->muxer_factory);
676 if (splitmux->muxer_preset)
677 g_free (splitmux->muxer_preset);
678 if (splitmux->muxer_properties)
679 gst_structure_free (splitmux->muxer_properties);
680 if (splitmux->sink_factory)
681 g_free (splitmux->sink_factory);
682 if (splitmux->sink_preset)
683 g_free (splitmux->sink_preset);
684 if (splitmux->sink_properties)
685 gst_structure_free (splitmux->sink_properties);
687 if (splitmux->threshold_timecode_str)
688 g_free (splitmux->threshold_timecode_str);
689 if (splitmux->tc_interval)
690 gst_video_time_code_interval_free (splitmux->tc_interval);
692 if (splitmux->times_to_split)
693 gst_queue_array_free (splitmux->times_to_split);
695 g_free (splitmux->location);
697 /* Make sure to free any un-released contexts. There should not be any,
698 * because the dispose will have freed all request pads though */
699 g_list_foreach (splitmux->contexts, (GFunc) mq_stream_ctx_free, NULL);
700 g_list_free (splitmux->contexts);
701 gst_splitmux_reset_timecode (splitmux);
703 G_OBJECT_CLASS (parent_class)->finalize (object);
707 * Set any time threshold to the muxer, if it has
708 * reserved-max-duration and reserved-duration-remaining
709 * properties. Called when creating/claiming the muxer
710 * in create_elements() */
712 update_muxer_properties (GstSplitMuxSink * sink)
715 GstClockTime threshold_time;
717 sink->muxer_has_reserved_props = FALSE;
718 if (sink->muxer == NULL)
720 klass = G_OBJECT_GET_CLASS (sink->muxer);
721 if (g_object_class_find_property (klass, "reserved-max-duration") == NULL)
723 if (g_object_class_find_property (klass,
724 "reserved-duration-remaining") == NULL)
726 sink->muxer_has_reserved_props = TRUE;
728 GST_LOG_OBJECT (sink, "Setting muxer reserved time to %" GST_TIME_FORMAT,
729 GST_TIME_ARGS (sink->threshold_time));
730 GST_OBJECT_LOCK (sink);
731 threshold_time = sink->threshold_time;
732 GST_OBJECT_UNLOCK (sink);
734 if (threshold_time > 0) {
735 /* Tell the muxer how much space to reserve */
736 GstClockTime muxer_threshold = threshold_time;
737 g_object_set (sink->muxer, "reserved-max-duration", muxer_threshold, NULL);
/* GObject set_property implementation.
 *
 * Stores the new value of the property identified by @prop_id on the
 * instance. Every field except the muxer pad map is written while holding
 * the object lock; the muxer pad map is written under the splitmux lock.
 * String and GstStructure values are copied, element values are
 * referenced, and the previous value is released first. Unknown property
 * ids are reported with G_OBJECT_WARN_INVALID_PROPERTY_ID(). */
742 gst_splitmux_sink_set_property (GObject * object, guint prop_id,
743 const GValue * value, GParamSpec * pspec)
745 GstSplitMuxSink *splitmux = GST_SPLITMUX_SINK (object);
/* Replace the stored location string with a copy of the new value */
749 GST_OBJECT_LOCK (splitmux);
750 g_free (splitmux->location);
751 splitmux->location = g_value_dup_string (value);
752 GST_OBJECT_UNLOCK (splitmux);
755 case PROP_START_INDEX:
756 GST_OBJECT_LOCK (splitmux);
757 splitmux->start_index = g_value_get_int (value);
758 GST_OBJECT_UNLOCK (splitmux);
760 case PROP_MAX_SIZE_BYTES:
761 GST_OBJECT_LOCK (splitmux);
762 splitmux->threshold_bytes = g_value_get_uint64 (value);
763 GST_OBJECT_UNLOCK (splitmux);
765 case PROP_MAX_SIZE_TIME:
766 GST_OBJECT_LOCK (splitmux);
767 splitmux->threshold_time = g_value_get_uint64 (value);
768 GST_OBJECT_UNLOCK (splitmux);
/* Timecode threshold: discard the old string, the parsed interval and the
 * timecode tracking state, then parse the new string. If it does not
 * parse, warn and keep no threshold string at all. */
770 case PROP_MAX_SIZE_TIMECODE:
771 GST_OBJECT_LOCK (splitmux);
772 g_free (splitmux->threshold_timecode_str);
773 /* will be calculated later */
774 g_clear_pointer (&splitmux->tc_interval,
775 gst_video_time_code_interval_free);
776 gst_splitmux_reset_timecode (splitmux);
778 splitmux->threshold_timecode_str = g_value_dup_string (value);
779 if (splitmux->threshold_timecode_str) {
780 splitmux->tc_interval =
781 gst_video_time_code_interval_new_from_string
782 (splitmux->threshold_timecode_str);
783 if (!splitmux->tc_interval) {
784 g_warning ("Wrong timecode string %s",
785 splitmux->threshold_timecode_str);
786 g_free (splitmux->threshold_timecode_str);
787 splitmux->threshold_timecode_str = NULL;
790 GST_OBJECT_UNLOCK (splitmux);
792 case PROP_SEND_KEYFRAME_REQUESTS:
793 GST_OBJECT_LOCK (splitmux);
794 splitmux->send_keyframe_requests = g_value_get_boolean (value);
795 GST_OBJECT_UNLOCK (splitmux);
/* Maximum number of files to keep */
798 GST_OBJECT_LOCK (splitmux);
799 splitmux->max_files = g_value_get_uint (value);
800 GST_OBJECT_UNLOCK (splitmux);
802 case PROP_MUXER_OVERHEAD:
803 GST_OBJECT_LOCK (splitmux);
804 splitmux->mux_overhead = g_value_get_double (value);
805 GST_OBJECT_UNLOCK (splitmux);
/* Robust muxing: when it is switched on, immediately re-check the muxer
 * for the reserved-duration properties. The flag is read back after the
 * object lock has been released. */
807 case PROP_USE_ROBUST_MUXING:
808 GST_OBJECT_LOCK (splitmux);
809 splitmux->use_robust_muxing = g_value_get_boolean (value);
810 GST_OBJECT_UNLOCK (splitmux);
811 if (splitmux->use_robust_muxing)
812 update_muxer_properties (splitmux);
814 case PROP_ALIGNMENT_THRESHOLD:
815 GST_OBJECT_LOCK (splitmux);
816 splitmux->alignment_threshold = g_value_get_uint64 (value);
817 GST_OBJECT_UNLOCK (splitmux);
/* Caller-provided sink: drop the previous reference and take a (sunk)
 * reference on the new element, if any */
820 GST_OBJECT_LOCK (splitmux);
821 gst_clear_object (&splitmux->provided_sink);
822 splitmux->provided_sink = g_value_get_object (value);
823 if (splitmux->provided_sink)
824 gst_object_ref_sink (splitmux->provided_sink);
825 GST_OBJECT_UNLOCK (splitmux);
/* Caller-provided muxer: same handling as the provided sink */
828 GST_OBJECT_LOCK (splitmux);
829 gst_clear_object (&splitmux->provided_muxer);
830 splitmux->provided_muxer = g_value_get_object (value);
831 if (splitmux->provided_muxer)
832 gst_object_ref_sink (splitmux->provided_muxer);
833 GST_OBJECT_UNLOCK (splitmux);
835 case PROP_RESET_MUXER:
836 GST_OBJECT_LOCK (splitmux);
837 splitmux->reset_muxer = g_value_get_boolean (value);
838 GST_OBJECT_UNLOCK (splitmux);
840 case PROP_ASYNC_FINALIZE:
841 GST_OBJECT_LOCK (splitmux);
842 splitmux->async_finalize = g_value_get_boolean (value);
843 GST_OBJECT_UNLOCK (splitmux);
/* Factory names, presets and property structures: free the old value and
 * store a copy of the new one (NULL when no structure is given) */
845 case PROP_MUXER_FACTORY:
846 GST_OBJECT_LOCK (splitmux);
847 if (splitmux->muxer_factory)
848 g_free (splitmux->muxer_factory);
849 splitmux->muxer_factory = g_value_dup_string (value);
850 GST_OBJECT_UNLOCK (splitmux);
852 case PROP_MUXER_PRESET:
853 GST_OBJECT_LOCK (splitmux);
854 if (splitmux->muxer_preset)
855 g_free (splitmux->muxer_preset);
856 splitmux->muxer_preset = g_value_dup_string (value);
857 GST_OBJECT_UNLOCK (splitmux);
859 case PROP_MUXER_PROPERTIES:
860 GST_OBJECT_LOCK (splitmux);
861 if (splitmux->muxer_properties)
862 gst_structure_free (splitmux->muxer_properties);
863 if (gst_value_get_structure (value))
864 splitmux->muxer_properties =
865 gst_structure_copy (gst_value_get_structure (value));
867 splitmux->muxer_properties = NULL;
868 GST_OBJECT_UNLOCK (splitmux);
870 case PROP_SINK_FACTORY:
871 GST_OBJECT_LOCK (splitmux);
872 if (splitmux->sink_factory)
873 g_free (splitmux->sink_factory);
874 splitmux->sink_factory = g_value_dup_string (value);
875 GST_OBJECT_UNLOCK (splitmux);
877 case PROP_SINK_PRESET:
878 GST_OBJECT_LOCK (splitmux);
879 if (splitmux->sink_preset)
880 g_free (splitmux->sink_preset);
881 splitmux->sink_preset = g_value_dup_string (value);
882 GST_OBJECT_UNLOCK (splitmux);
884 case PROP_SINK_PROPERTIES:
885 GST_OBJECT_LOCK (splitmux);
886 if (splitmux->sink_properties)
887 gst_structure_free (splitmux->sink_properties);
888 if (gst_value_get_structure (value))
889 splitmux->sink_properties =
890 gst_structure_copy (gst_value_get_structure (value));
892 splitmux->sink_properties = NULL;
893 GST_OBJECT_UNLOCK (splitmux);
/* Muxer pad map: unlike the other properties this one is updated under
 * the splitmux lock rather than the object lock */
895 case PROP_MUXERPAD_MAP:
897 const GstStructure *s = gst_value_get_structure (value);
898 GST_SPLITMUX_LOCK (splitmux);
899 if (splitmux->muxerpad_map) {
900 gst_structure_free (splitmux->muxerpad_map);
903 splitmux->muxerpad_map = gst_structure_copy (s);
905 splitmux->muxerpad_map = NULL;
906 GST_SPLITMUX_UNLOCK (splitmux);
910 G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
/* GObject get_property handler for splitmuxsink.
 *
 * Copies the current value of the property selected by prop_id from the
 * GstSplitMuxSink instance into value, using the GValue setter that matches
 * the property type (string, int, uint, uint64, double, boolean, object or
 * GstStructure).
 *
 * Every visible case reads its field while holding GST_OBJECT_LOCK, except
 * PROP_MUXERPAD_MAP, which is read under GST_SPLITMUX_LOCK.
 *
 * Property ids that are not handled are reported through
 * G_OBJECT_WARN_INVALID_PROPERTY_ID.
 */
916 gst_splitmux_sink_get_property (GObject * object, guint prop_id,
917 GValue * value, GParamSpec * pspec)
919 GstSplitMuxSink *splitmux = GST_SPLITMUX_SINK (object);
923 GST_OBJECT_LOCK (splitmux);
924 g_value_set_string (value, splitmux->location);
925 GST_OBJECT_UNLOCK (splitmux);
927 case PROP_START_INDEX:
928 GST_OBJECT_LOCK (splitmux);
929 g_value_set_int (value, splitmux->start_index);
930 GST_OBJECT_UNLOCK (splitmux);
932 case PROP_MAX_SIZE_BYTES:
933 GST_OBJECT_LOCK (splitmux);
934 g_value_set_uint64 (value, splitmux->threshold_bytes);
935 GST_OBJECT_UNLOCK (splitmux);
937 case PROP_MAX_SIZE_TIME:
938 GST_OBJECT_LOCK (splitmux);
939 g_value_set_uint64 (value, splitmux->threshold_time);
940 GST_OBJECT_UNLOCK (splitmux);
942 case PROP_MAX_SIZE_TIMECODE:
943 GST_OBJECT_LOCK (splitmux);
944 g_value_set_string (value, splitmux->threshold_timecode_str);
945 GST_OBJECT_UNLOCK (splitmux);
947 case PROP_SEND_KEYFRAME_REQUESTS:
948 GST_OBJECT_LOCK (splitmux);
949 g_value_set_boolean (value, splitmux->send_keyframe_requests);
950 GST_OBJECT_UNLOCK (splitmux);
953 GST_OBJECT_LOCK (splitmux);
954 g_value_set_uint (value, splitmux->max_files);
955 GST_OBJECT_UNLOCK (splitmux);
957 case PROP_MUXER_OVERHEAD:
958 GST_OBJECT_LOCK (splitmux);
959 g_value_set_double (value, splitmux->mux_overhead);
960 GST_OBJECT_UNLOCK (splitmux);
962 case PROP_USE_ROBUST_MUXING:
963 GST_OBJECT_LOCK (splitmux);
964 g_value_set_boolean (value, splitmux->use_robust_muxing);
965 GST_OBJECT_UNLOCK (splitmux);
967 case PROP_ALIGNMENT_THRESHOLD:
968 GST_OBJECT_LOCK (splitmux);
969 g_value_set_uint64 (value, splitmux->alignment_threshold);
970 GST_OBJECT_UNLOCK (splitmux);
973 GST_OBJECT_LOCK (splitmux);
974 g_value_set_object (value, splitmux->provided_sink);
975 GST_OBJECT_UNLOCK (splitmux);
978 GST_OBJECT_LOCK (splitmux);
979 g_value_set_object (value, splitmux->provided_muxer);
980 GST_OBJECT_UNLOCK (splitmux);
982 case PROP_RESET_MUXER:
983 GST_OBJECT_LOCK (splitmux);
984 g_value_set_boolean (value, splitmux->reset_muxer);
985 GST_OBJECT_UNLOCK (splitmux);
987 case PROP_ASYNC_FINALIZE:
988 GST_OBJECT_LOCK (splitmux);
989 g_value_set_boolean (value, splitmux->async_finalize);
990 GST_OBJECT_UNLOCK (splitmux);
992 case PROP_MUXER_FACTORY:
993 GST_OBJECT_LOCK (splitmux);
994 g_value_set_string (value, splitmux->muxer_factory);
995 GST_OBJECT_UNLOCK (splitmux);
997 case PROP_MUXER_PRESET:
998 GST_OBJECT_LOCK (splitmux);
999 g_value_set_string (value, splitmux->muxer_preset);
1000 GST_OBJECT_UNLOCK (splitmux);
1002 case PROP_MUXER_PROPERTIES:
1003 GST_OBJECT_LOCK (splitmux);
1004 gst_value_set_structure (value, splitmux->muxer_properties);
1005 GST_OBJECT_UNLOCK (splitmux);
1007 case PROP_SINK_FACTORY:
1008 GST_OBJECT_LOCK (splitmux);
1009 g_value_set_string (value, splitmux->sink_factory);
1010 GST_OBJECT_UNLOCK (splitmux);
1012 case PROP_SINK_PRESET:
1013 GST_OBJECT_LOCK (splitmux);
1014 g_value_set_string (value, splitmux->sink_preset);
1015 GST_OBJECT_UNLOCK (splitmux);
1017 case PROP_SINK_PROPERTIES:
1018 GST_OBJECT_LOCK (splitmux);
1019 gst_value_set_structure (value, splitmux->sink_properties);
1020 GST_OBJECT_UNLOCK (splitmux);
1022 case PROP_MUXERPAD_MAP:
1023 GST_SPLITMUX_LOCK (splitmux);
1024 gst_value_set_structure (value, splitmux->muxerpad_map);
1025 GST_SPLITMUX_UNLOCK (splitmux);
1028 G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
/* Translate the timestamp val, interpreted in GST_FORMAT_TIME within segment,
 * into a signed running time (GstClockTimeDiff).
 *
 * The result starts out as GST_CLOCK_STIME_NONE and the segment is only
 * consulted, through gst_segment_to_running_time_full(), when val is a valid
 * clock time.
 *
 * NOTE(review): presumably the result is negated when the running time lies
 * before the start of the segment; the lines that do this are not shown
 * here, so confirm against the full function body.
 */
1033 /* Convenience function */
1034 static inline GstClockTimeDiff
1035 my_segment_to_running_time (GstSegment * segment, GstClockTime val)
1037 GstClockTimeDiff res = GST_CLOCK_STIME_NONE;
1039 if (GST_CLOCK_TIME_IS_VALID (val)) {
1041 gst_segment_to_running_time_full (segment, GST_FORMAT_TIME, val, &val);
1051 mq_stream_ctx_reset (MqStreamCtx * ctx)
1053 gst_segment_init (&ctx->in_segment, GST_FORMAT_UNDEFINED);
1054 gst_segment_init (&ctx->out_segment, GST_FORMAT_UNDEFINED);
1055 ctx->in_running_time = ctx->out_running_time = GST_CLOCK_STIME_NONE;
1056 g_queue_foreach (&ctx->queued_bufs, (GFunc) mq_stream_buf_free, NULL);
1057 g_queue_clear (&ctx->queued_bufs);
1060 static MqStreamCtx *
1061 mq_stream_ctx_new (GstSplitMuxSink * splitmux)
1065 ctx = g_new0 (MqStreamCtx, 1);
1066 ctx->splitmux = splitmux;
1067 g_queue_init (&ctx->queued_bufs);
1068 mq_stream_ctx_reset (ctx);
1074 mq_stream_ctx_free (MqStreamCtx * ctx)
1077 GstObject *parent = gst_object_get_parent (GST_OBJECT (ctx->q));
1079 g_signal_handler_disconnect (ctx->q, ctx->q_overrun_id);
1081 if (parent == GST_OBJECT_CAST (ctx->splitmux)) {
1082 gst_element_set_locked_state (ctx->q, TRUE);
1083 gst_element_set_state (ctx->q, GST_STATE_NULL);
1084 gst_bin_remove (GST_BIN (ctx->splitmux), ctx->q);
1085 gst_object_unref (parent);
1087 gst_object_unref (ctx->q);
1089 gst_object_unref (ctx->sinkpad);
1090 gst_object_unref (ctx->srcpad);
1091 g_queue_foreach (&ctx->queued_bufs, (GFunc) mq_stream_buf_free, NULL);
1092 g_queue_clear (&ctx->queued_bufs);
1097 send_fragment_opened_closed_msg (GstSplitMuxSink * splitmux, gboolean opened,
1100 gchar *location = NULL;
1102 const gchar *msg_name = opened ?
1103 "splitmuxsink-fragment-opened" : "splitmuxsink-fragment-closed";
1104 GstClockTime running_time = splitmux->reference_ctx->out_running_time;
1107 GstClockTime *rtime = g_object_get_qdata (G_OBJECT (sink), RUNNING_TIME);
1109 running_time = *rtime;
1112 if (g_object_class_find_property (G_OBJECT_GET_CLASS (sink),
1113 "location") != NULL)
1114 g_object_get (sink, "location", &location, NULL);
1116 GST_DEBUG_OBJECT (splitmux,
1117 "Sending %s message. Running time %" GST_TIME_FORMAT " location %s",
1118 msg_name, GST_TIME_ARGS (running_time), GST_STR_NULL (location));
1120 /* If it's in the middle of a teardown, the reference_ctc might have become
1122 if (splitmux->reference_ctx) {
1123 msg = gst_message_new_element (GST_OBJECT (splitmux),
1124 gst_structure_new (msg_name,
1125 "location", G_TYPE_STRING, location,
1126 "running-time", GST_TYPE_CLOCK_TIME, running_time,
1127 "sink", GST_TYPE_ELEMENT, sink, NULL));
1128 gst_element_post_message (GST_ELEMENT_CAST (splitmux), msg);
/* Asynchronous EOS delivery.
 *
 * This is the callback that eos_context_async() schedules with
 * gst_element_call_async(), passing an AsyncEosHelper as user data. It
 * creates a new EOS event, sends it to a pad with gst_pad_send_event() and
 * then drops the pad reference.
 *
 * The splitmux lock is taken only around the gst_pad_get_peer() lookup on
 * ctx->srcpad; the event itself is sent without the lock held.
 *
 * NOTE(review): the lines that unpack pad/ctx from helper and release the
 * helper are not shown here; confirm against the full function body.
 */
1135 send_eos_async (GstSplitMuxSink * splitmux, AsyncEosHelper * helper)
1141 eos = gst_event_new_eos ();
1145 GST_SPLITMUX_LOCK (splitmux);
1147 pad = gst_pad_get_peer (ctx->srcpad);
1148 GST_SPLITMUX_UNLOCK (splitmux);
1150 gst_pad_send_event (pad, eos);
1151 GST_INFO_OBJECT (splitmux, "Sent async EOS on %" GST_PTR_FORMAT, pad);
1153 gst_object_unref (pad);
1157 /* Called with lock held, drops the lock to send EOS to the
1161 send_eos (GstSplitMuxSink * splitmux, MqStreamCtx * ctx)
1166 eos = gst_event_new_eos ();
1167 pad = gst_pad_get_peer (ctx->srcpad);
1169 ctx->out_eos = TRUE;
1171 GST_INFO_OBJECT (splitmux, "Sending EOS on %" GST_PTR_FORMAT, pad);
1172 GST_SPLITMUX_UNLOCK (splitmux);
1173 gst_pad_send_event (pad, eos);
1174 GST_SPLITMUX_LOCK (splitmux);
1176 gst_object_unref (pad);
/* Schedule an EOS for one stream context without blocking the caller.
 *
 * Allocates an AsyncEosHelper, stores the peer of ctx->srcpad in it (the
 * helper takes over the reference returned by gst_pad_get_peer()), marks
 * the context with out_eos_async_done = TRUE and asks
 * gst_element_call_async() to run send_eos_async() with the helper.
 *
 * The peer pad is asserted to be non-NULL before scheduling.
 */
1179 /* Called with lock held. Schedules an EOS event to the ctx pad
1180 * to happen in another thread */
1182 eos_context_async (MqStreamCtx * ctx, GstSplitMuxSink * splitmux)
1184 AsyncEosHelper *helper = g_new0 (AsyncEosHelper, 1);
1185 GstPad *srcpad, *sinkpad;
1187 srcpad = ctx->srcpad;
1188 sinkpad = gst_pad_get_peer (srcpad);
1191 helper->pad = sinkpad; /* Takes the reference */
1193 ctx->out_eos_async_done = TRUE;
1195 /* There used to be a bug here, where we had to explicitly remove
1196 * the SINK flag so that GstBin would ignore it for EOS purposes.
1197 * That fixed a race where if splitmuxsink really reaches EOS
1198 * before an asynchronous background element has finished, then
1199 * the bin wouldn't actually send EOS to the pipeline. Even after
1200 * finishing and removing the old element, the bin didn't re-check
1201 * EOS status on removing a SINK element. That bug was fixed
1203 GST_DEBUG_OBJECT (splitmux, "scheduled EOS to pad %" GST_PTR_FORMAT " ctx %p",
1206 g_assert_nonnull (helper->pad);
1207 gst_element_call_async (GST_ELEMENT (splitmux),
1208 (GstElementCallAsyncFunc) send_eos_async, helper, NULL);
1211 /* Called with lock held. TRUE iff all contexts have a
1212 * pending (or delivered) async eos event */
1214 all_contexts_are_async_eos (GstSplitMuxSink * splitmux)
1216 gboolean ret = TRUE;
1219 for (item = splitmux->contexts; item; item = item->next) {
1220 MqStreamCtx *ctx = item->data;
1221 ret &= ctx->out_eos_async_done;
/* Output-side gatekeeper, run for every buffer/event leaving a stream queue.
 *
 * Decides whether ctx may let its current item through, must sleep until
 * more output is released, or has to take part in closing the current file.
 *
 * Limit handling:
 *  - The limit is splitmux->max_out_running_time. alignment_threshold is
 *    subtracted from it unless the limit is 0 or GST_CLOCK_STIME_NONE, the
 *    threshold is 0, or the limit is smaller than the threshold.
 *  - The item is within range when the limit is GST_CLOCK_STIME_NONE or
 *    ctx->out_running_time is below the adjusted limit.
 *  - GST_FLOW_FLUSHING is returned when the output state is
 *    SPLITMUX_OUTPUT_STATE_STOPPED.
 *
 * Once the limit is reached the function acts on splitmux->output_state:
 *  - OUTPUT_GOP: switch to AWAITING_COMMAND and wake the other contexts.
 *  - ENDING_FILE / ENDING_STREAM: if this context has not sent EOS yet,
 *    send it. With async_finalize the sink is tagged with EOS_FROM_US = 1
 *    and the EOS is scheduled with eos_context_async(); when all contexts
 *    have an async EOS the state becomes START_NEXT_FILE. Otherwise
 *    send_eos() is used.
 *  - START_NEXT_FILE: the reference context calls start_next_fragment().
 *  - AWAITING_COMMAND: pop commands from out_cmd_q (growing blocked queues
 *    when it empties). A start_new_fragment command moves to ENDING_FILE if
 *    muxed_out_bytes > 0 and is ignored for an empty fragment. Any other
 *    command raises max_out_running_time to cmd->max_output_ts and moves to
 *    OUTPUT_GOP. Waits on the output condition in between, looping while
 *    not flushing and still AWAITING_COMMAND.
 *
 * If output is not ready yet, or no state applied, the context sleeps on
 * GST_SPLITMUX_WAIT_OUTPUT until woken and then re-checks.
 */
1226 /* Called with splitmux lock held to check if this output
1227 * context needs to sleep to wait for the release of the
1228 * next GOP, or to send EOS to close out the current file
1230 static GstFlowReturn
1231 complete_or_wait_on_out (GstSplitMuxSink * splitmux, MqStreamCtx * ctx)
1233 if (ctx->caps_change)
1237 /* When first starting up, the reference stream has to output
1238 * the first buffer to prepare the muxer and sink */
1239 gboolean can_output = (ctx->is_reference || splitmux->ready_for_output);
1240 GstClockTimeDiff my_max_out_running_time = splitmux->max_out_running_time;
1242 if (!(splitmux->max_out_running_time == 0 ||
1243 splitmux->max_out_running_time == GST_CLOCK_STIME_NONE ||
1244 splitmux->alignment_threshold == 0 ||
1245 splitmux->max_out_running_time < splitmux->alignment_threshold)) {
1246 my_max_out_running_time -= splitmux->alignment_threshold;
1247 GST_LOG_OBJECT (ctx->srcpad,
1248 "Max out running time currently %" GST_STIME_FORMAT
1249 ", with threshold applied it is %" GST_STIME_FORMAT,
1250 GST_STIME_ARGS (splitmux->max_out_running_time),
1251 GST_STIME_ARGS (my_max_out_running_time));
1255 || splitmux->output_state == SPLITMUX_OUTPUT_STATE_STOPPED)
1256 return GST_FLOW_FLUSHING;
1258 GST_LOG_OBJECT (ctx->srcpad,
1259 "Checking running time %" GST_STIME_FORMAT " against max %"
1260 GST_STIME_FORMAT, GST_STIME_ARGS (ctx->out_running_time),
1261 GST_STIME_ARGS (my_max_out_running_time));
1264 if (splitmux->max_out_running_time == GST_CLOCK_STIME_NONE ||
1265 ctx->out_running_time < my_max_out_running_time) {
1269 switch (splitmux->output_state) {
1270 case SPLITMUX_OUTPUT_STATE_OUTPUT_GOP:
1271 /* We only get here if we've finished outputting a GOP and need to know
1272 * what to do next */
1273 splitmux->output_state = SPLITMUX_OUTPUT_STATE_AWAITING_COMMAND;
1274 GST_SPLITMUX_BROADCAST_OUTPUT (splitmux);
1277 case SPLITMUX_OUTPUT_STATE_ENDING_FILE:
1278 case SPLITMUX_OUTPUT_STATE_ENDING_STREAM:
1279 /* We've reached the max out running_time to get here, so end this file now */
1280 if (ctx->out_eos == FALSE) {
1281 if (splitmux->async_finalize) {
1282 /* We must set EOS asynchronously at this point. We cannot defer
1283 * it, because we need all contexts to wake up, for the
1284 * reference context to eventually give us something at
1285 * START_NEXT_FILE. Otherwise, collectpads might choose another
1286 * context to give us the first buffer, and format-location-full
1287 * will not contain a valid sample. */
1288 g_object_set_qdata ((GObject *) splitmux->sink, EOS_FROM_US,
1289 GINT_TO_POINTER (1));
1290 eos_context_async (ctx, splitmux);
1291 if (all_contexts_are_async_eos (splitmux)) {
1292 GST_INFO_OBJECT (splitmux,
1293 "All contexts are async_eos. Moving to the next file.");
1294 /* We can start the next file once we've asked each pad to go EOS */
1295 splitmux->output_state = SPLITMUX_OUTPUT_STATE_START_NEXT_FILE;
1296 GST_SPLITMUX_BROADCAST_OUTPUT (splitmux);
1300 send_eos (splitmux, ctx);
1304 GST_INFO_OBJECT (splitmux,
1305 "At end-of-file state, but context %p is already EOS", ctx);
1308 case SPLITMUX_OUTPUT_STATE_START_NEXT_FILE:
1309 if (ctx->is_reference) {
1310 GstFlowReturn ret = GST_FLOW_OK;
1312 /* Special handling on the reference ctx to start new fragments
1313 * and collect commands from the command queue */
1314 /* drops the splitmux lock briefly: */
1315 /* We must have reference ctx in order for format-location-full to
1317 ret = start_next_fragment (splitmux, ctx);
1318 if (ret != GST_FLOW_OK)
1324 case SPLITMUX_OUTPUT_STATE_AWAITING_COMMAND:{
1326 SplitMuxOutputCommand *cmd =
1327 g_queue_pop_tail (&splitmux->out_cmd_q);
1329 /* If we pop the last command, we need to make our queues bigger */
1330 if (g_queue_get_length (&splitmux->out_cmd_q) == 0)
1331 grow_blocked_queues (splitmux);
1333 if (cmd->start_new_fragment) {
1334 if (splitmux->muxed_out_bytes > 0) {
1335 GST_DEBUG_OBJECT (splitmux, "Got cmd to start new fragment");
1336 splitmux->output_state = SPLITMUX_OUTPUT_STATE_ENDING_FILE;
1338 GST_DEBUG_OBJECT (splitmux,
1339 "Got cmd to start new fragment, but fragment is empty - ignoring.");
1342 GST_DEBUG_OBJECT (splitmux,
1343 "Got new output cmd for time %" GST_STIME_FORMAT,
1344 GST_STIME_ARGS (cmd->max_output_ts));
1346 /* Extend the output range immediately */
1347 if (splitmux->max_out_running_time == GST_CLOCK_STIME_NONE
1348 || cmd->max_output_ts > splitmux->max_out_running_time)
1349 splitmux->max_out_running_time = cmd->max_output_ts;
1350 GST_DEBUG_OBJECT (splitmux,
1351 "Max out running time now %" GST_STIME_FORMAT,
1352 GST_STIME_ARGS (splitmux->max_out_running_time));
1353 splitmux->output_state = SPLITMUX_OUTPUT_STATE_OUTPUT_GOP;
1355 GST_SPLITMUX_BROADCAST_OUTPUT (splitmux);
1357 out_cmd_buf_free (cmd);
1360 GST_SPLITMUX_WAIT_OUTPUT (splitmux);
1362 } while (!ctx->flushing && splitmux->output_state ==
1363 SPLITMUX_OUTPUT_STATE_AWAITING_COMMAND);
1364 /* loop and re-check the state */
1367 case SPLITMUX_OUTPUT_STATE_STOPPED:
1368 return GST_FLOW_FLUSHING;
1371 GST_LOG_OBJECT (ctx->srcpad, "Not yet ready for output");
1374 GST_INFO_OBJECT (ctx->srcpad,
1375 "Sleeping for running time %"
1376 GST_STIME_FORMAT " (max %" GST_STIME_FORMAT ") or state change.",
1377 GST_STIME_ARGS (ctx->out_running_time),
1378 GST_STIME_ARGS (splitmux->max_out_running_time));
1379 GST_SPLITMUX_WAIT_OUTPUT (splitmux);
1380 GST_INFO_OBJECT (ctx->srcpad,
1381 "Woken for new max running time %" GST_STIME_FORMAT,
1382 GST_STIME_ARGS (splitmux->max_out_running_time));
/* Work out the running time at which the next timecode-based split is due.
 *
 * @splitmux: the element; its tc_interval is the split interval
 * @cur_tc: timecode of the current position, may be NULL
 * @running_time: running time that corresponds to cur_tc
 * @next_tc: receives the target timecode (cur_tc + tc_interval) when the
 *   value is stored through it; otherwise the target timecode is freed here
 *
 * Returns GST_CLOCK_TIME_NONE when cur_tc or tc_interval is NULL, or (after
 * posting an element error) when the target timecode cannot be computed.
 *
 * Otherwise the result is running_time plus the distance between the two
 * timecodes, measured in nanoseconds since the daily jam. When the target
 * wraps past midnight (target < current) one day is added. For drop-frame
 * timecodes with fps_d == 1001 that "day" is derived from a 23:59:59
 * timecode plus one frame duration instead of a flat 24 hours.
 */
1390 calculate_next_max_timecode (GstSplitMuxSink * splitmux,
1391 const GstVideoTimeCode * cur_tc, GstClockTime running_time,
1392 GstVideoTimeCode ** next_tc)
1394 GstVideoTimeCode *target_tc;
1395 GstClockTime cur_tc_time, target_tc_time, next_max_tc_time;
1397 if (cur_tc == NULL || splitmux->tc_interval == NULL)
1398 return GST_CLOCK_TIME_NONE;
1400 target_tc = gst_video_time_code_add_interval (cur_tc, splitmux->tc_interval);
1402 GST_ELEMENT_ERROR (splitmux,
1403 STREAM, FAILED, (NULL), ("Couldn't calculate target timecode"));
1404 return GST_CLOCK_TIME_NONE;
1408 target_tc_time = gst_video_time_code_nsec_since_daily_jam (target_tc);
1409 cur_tc_time = gst_video_time_code_nsec_since_daily_jam (cur_tc);
1411 /* Add running_time, accounting for wraparound. */
1412 if (target_tc_time >= cur_tc_time) {
1413 next_max_tc_time = target_tc_time - cur_tc_time + running_time;
1415 GstClockTime day_in_ns = 24 * 60 * 60 * GST_SECOND;
1417 if ((cur_tc->config.flags & GST_VIDEO_TIME_CODE_FLAGS_DROP_FRAME) &&
1418 (cur_tc->config.fps_d == 1001)) {
1419 /* Checking fps_d is probably unneeded, but better safe than sorry
1420 * (e.g. someone accidentally set a flag) */
1421 GstVideoTimeCode *tc_for_offset;
1423 /* Here, the duration of the 24:00:00;00 timecode isn't exactly one day,
1424 * but slightly less. Calculate that duration from a fake timecode. The
1425 * problem is that 24:00:00;00 isn't a valid timecode, so the workaround
1426 * is to add one frame to 23:59:59;29 */
1428 gst_video_time_code_new (cur_tc->config.fps_n, cur_tc->config.fps_d,
1429 NULL, cur_tc->config.flags, 23, 59, 59,
1430 cur_tc->config.fps_n / cur_tc->config.fps_d, 0);
1432 gst_video_time_code_nsec_since_daily_jam (tc_for_offset) +
1433 gst_util_uint64_scale (GST_SECOND, cur_tc->config.fps_d,
1434 cur_tc->config.fps_n);
1435 gst_video_time_code_free (tc_for_offset);
1437 next_max_tc_time = day_in_ns - cur_tc_time + target_tc_time + running_time;
1440 GST_INFO_OBJECT (splitmux, "Next max TC time: %" GST_TIME_FORMAT
1441 " from ref TC: %" GST_TIME_FORMAT, GST_TIME_ARGS (next_max_tc_time),
1442 GST_TIME_ARGS (cur_tc_time));
1444 *next_tc = target_tc;
1446 gst_video_time_code_free (target_tc);
1448 return next_max_tc_time;
/* Ask upstream for a keyframe at the point where the next fragment should
 * start.
 *
 * @splitmux: the element
 * @buffer: the buffer that triggered the request
 * @running_time: running time of the current position
 *
 * Nothing is requested when send_keyframe_requests is off, when neither a
 * time threshold nor a usable timecode interval is configured, or when a
 * byte threshold is set.
 *
 * Target time:
 *  - timecode mode: the next timecode boundary from
 *    calculate_next_max_timecode(), moved 5 microseconds earlier to absorb
 *    rounding errors. The boundary after that becomes the expected
 *    following request time.
 *  - otherwise: running_time + threshold_time, with the following request
 *    expected one threshold_time later.
 *
 * When a previously stored next_fku_time is valid, the target is compared
 * against it (minus the rounding allowance in timecode mode) to detect
 * requests that would come too early.
 *
 * The new expected time is stored in splitmux->next_fku_time. An upstream
 * force-key-unit event for the target time is pushed on the reference
 * context's sink pad and the result of gst_pad_push_event() is returned.
 */
1452 request_next_keyframe (GstSplitMuxSink * splitmux, GstBuffer * buffer,
1453 GstClockTime running_time)
1456 GstClockTime target_time;
1457 gboolean timecode_based = FALSE;
1458 GstClockTime max_tc_time = GST_CLOCK_TIME_NONE;
1459 GstClockTime next_max_tc_time = GST_CLOCK_TIME_NONE;
1460 GstClockTime next_fku_time = GST_CLOCK_TIME_NONE;
1461 GstClockTime tc_rounding_error = 5 * GST_USECOND;
1463 if (!splitmux->send_keyframe_requests)
1466 if (splitmux->tc_interval) {
1467 if (splitmux->in_tc && gst_video_time_code_is_valid (splitmux->in_tc)) {
1468 GstVideoTimeCode *next_tc = NULL;
1470 calculate_next_max_timecode (splitmux, splitmux->in_tc,
1471 running_time, &next_tc);
1473 /* calculate the next expected keyframe time to prevent too early fku
1475 if (GST_CLOCK_TIME_IS_VALID (max_tc_time) && next_tc) {
1477 calculate_next_max_timecode (splitmux, next_tc, max_tc_time, NULL);
1480 gst_video_time_code_free (next_tc);
1482 timecode_based = GST_CLOCK_TIME_IS_VALID (max_tc_time) &&
1483 GST_CLOCK_TIME_IS_VALID (next_max_tc_time);
1485 /* This can happen in the presence of GAP events that trigger
1486 * a new fragment start */
1487 GST_WARNING_OBJECT (splitmux,
1488 "No buffer available to calculate next timecode");
1492 if ((splitmux->threshold_time == 0 && !timecode_based)
1493 || splitmux->threshold_bytes != 0)
1496 if (timecode_based) {
1497 /* We might have rounding errors: aim slightly earlier */
1498 if (max_tc_time >= tc_rounding_error) {
1499 target_time = max_tc_time - tc_rounding_error;
1501 /* unreliable target time */
1502 GST_DEBUG_OBJECT (splitmux, "tc time %" GST_TIME_FORMAT
1503 " is smaller than allowed rounding error, set it to zero",
1504 GST_TIME_ARGS (max_tc_time));
1508 if (next_max_tc_time >= tc_rounding_error) {
1509 next_fku_time = next_max_tc_time - tc_rounding_error;
1511 /* unreliable target time */
1512 GST_DEBUG_OBJECT (splitmux, "next tc time %" GST_TIME_FORMAT
1513 " is smaller than allowed rounding error, set it to zero",
1514 GST_TIME_ARGS (next_max_tc_time));
1518 target_time = running_time + splitmux->threshold_time;
1521 if (GST_CLOCK_TIME_IS_VALID (splitmux->next_fku_time)) {
1522 GstClockTime allowed_time = splitmux->next_fku_time;
1524 if (timecode_based) {
1525 if (allowed_time >= tc_rounding_error) {
1526 allowed_time -= tc_rounding_error;
1528 /* unreliable next force key unit time */
1529 GST_DEBUG_OBJECT (splitmux, "expected next force key unit time %"
1531 " is smaller than allowed rounding error, set it to zero",
1532 GST_TIME_ARGS (splitmux->next_fku_time));
1537 if (target_time < allowed_time) {
1538 GST_LOG_OBJECT (splitmux, "Target time %" GST_TIME_FORMAT
1539 " is smaller than expected next keyframe time %" GST_TIME_FORMAT
1540 ", rounding error compensated next keyframe time %" GST_TIME_FORMAT,
1541 GST_TIME_ARGS (target_time),
1542 GST_TIME_ARGS (splitmux->next_fku_time),
1543 GST_TIME_ARGS (allowed_time));
1546 } else if (allowed_time != splitmux->next_fku_time &&
1547 target_time < splitmux->next_fku_time) {
1548 GST_DEBUG_OBJECT (splitmux, "Target time %" GST_TIME_FORMAT
1549 " is smaller than expected next keyframe time %" GST_TIME_FORMAT
1550 ", but the difference is smaller than allowed rounding error",
1551 GST_TIME_ARGS (target_time), GST_TIME_ARGS (splitmux->next_fku_time));
1555 if (!timecode_based) {
1556 next_fku_time = target_time + splitmux->threshold_time;
1559 splitmux->next_fku_time = next_fku_time;
1561 ev = gst_video_event_new_upstream_force_key_unit (target_time, TRUE, 0);
1562 GST_INFO_OBJECT (splitmux, "Requesting keyframe at %" GST_TIME_FORMAT
1563 ", the next expected keyframe is %" GST_TIME_FORMAT,
1564 GST_TIME_ARGS (target_time), GST_TIME_ARGS (next_fku_time));
1565 return gst_pad_push_event (splitmux->reference_ctx->sinkpad, ev);
/* Pad probe for data leaving a stream queue towards the muxer.
 *
 * @pad: the pad the probe fired on
 * @info: probe info carrying the buffer or event
 * @ctx: the stream context of that pad
 *
 * Buffer lists are not supported: a warning is emitted and the list is
 * dropped.
 *
 * Downstream and flush events:
 *  - SEGMENT: copied into ctx->out_segment.
 *  - FLUSH_STOP: resets out_segment, frees all queued buffer records, zeroes
 *    queued_keyframes for the reference context and clears ctx->flushing.
 *  - FLUSH_START: sets ctx->flushing and wakes both input and output
 *    waiters.
 *  - The branch logging "Have EOS event": marks ctx->out_eos and, for the
 *    reference context, moves the output state to ENDING_STREAM.
 *  - GAP: on the reference context while not in OUTPUT_GOP the event is
 *    parked in ctx->pending_gap and reported as HANDLED. Otherwise its
 *    running time becomes ctx->out_running_time and
 *    complete_or_wait_on_out() runs.
 *  - CUSTOM_DOWNSTREAM "splitmuxsink-unblock": takes "timestamp" as the new
 *    out_running_time, lets non-reference contexts wait in
 *    complete_or_wait_on_out() and drops the event.
 *  - CAPS: on the reference context the caps are sent to the peer. The
 *    follow-up code sets ctx->caps_change and, unless already in
 *    START_NEXT_FILE, switches to ENDING_FILE so a new file is started.
 *  Remaining events wait in complete_or_wait_on_out(). Sticky events seen
 *  while caps_change is set are unreffed and reported as HANDLED; everything
 *  else passes.
 *
 * Buffers:
 *  The matching record is popped from the tail of ctx->queued_bufs. A
 *  missing record yields GST_FLOW_FLUSHING. queued_keyframes is decremented
 *  for reference keyframes and out_running_time is set from the record.
 *  After complete_or_wait_on_out() returns, the buffer size is added to
 *  muxed_out_bytes, a parked GAP event is sent to the peer, the record is
 *  freed and the buffer passes.
 *
 * The final lines unlock, store ret as the probe flow return and drop the
 * item.
 */
1568 static GstPadProbeReturn
1569 handle_mq_output (GstPad * pad, GstPadProbeInfo * info, MqStreamCtx * ctx)
1571 GstSplitMuxSink *splitmux = ctx->splitmux;
1572 MqStreamBuf *buf_info = NULL;
1573 GstFlowReturn ret = GST_FLOW_OK;
1575 GST_LOG_OBJECT (pad, "Fired probe type 0x%x", info->type);
1577 /* FIXME: Handle buffer lists, until then make it clear they won't work */
1578 if (info->type & GST_PAD_PROBE_TYPE_BUFFER_LIST) {
1579 g_warning ("Buffer list handling not implemented");
1580 return GST_PAD_PROBE_DROP;
1582 if (info->type & GST_PAD_PROBE_TYPE_EVENT_DOWNSTREAM ||
1583 info->type & GST_PAD_PROBE_TYPE_EVENT_FLUSH) {
1584 GstEvent *event = gst_pad_probe_info_get_event (info);
1585 gboolean locked = FALSE, wait = !ctx->is_reference;
1587 GST_LOG_OBJECT (pad, "Event %" GST_PTR_FORMAT, event);
1589 switch (GST_EVENT_TYPE (event)) {
1590 case GST_EVENT_SEGMENT:
1591 gst_event_copy_segment (event, &ctx->out_segment);
1593 case GST_EVENT_FLUSH_STOP:
1594 GST_SPLITMUX_LOCK (splitmux);
1596 gst_segment_init (&ctx->out_segment, GST_FORMAT_UNDEFINED);
1597 g_queue_foreach (&ctx->queued_bufs, (GFunc) mq_stream_buf_free, NULL);
1598 g_queue_clear (&ctx->queued_bufs);
/* NOTE(review): the queue was already cleared on the previous line, so this
 * second g_queue_clear() looks redundant. */
1599 g_queue_clear (&ctx->queued_bufs);
1600 /* If this is the reference context, we just threw away any queued keyframes */
1601 if (ctx->is_reference)
1602 splitmux->queued_keyframes = 0;
1603 ctx->flushing = FALSE;
1606 case GST_EVENT_FLUSH_START:
1607 GST_SPLITMUX_LOCK (splitmux);
1609 GST_LOG_OBJECT (pad, "Flush start");
1610 ctx->flushing = TRUE;
1611 GST_SPLITMUX_BROADCAST_INPUT (splitmux);
1612 GST_SPLITMUX_BROADCAST_OUTPUT (splitmux);
1615 GST_SPLITMUX_LOCK (splitmux);
1617 if (splitmux->output_state == SPLITMUX_OUTPUT_STATE_STOPPED)
1619 ctx->out_eos = TRUE;
1621 if (ctx == splitmux->reference_ctx) {
1622 splitmux->output_state = SPLITMUX_OUTPUT_STATE_ENDING_STREAM;
1623 GST_SPLITMUX_BROADCAST_OUTPUT (splitmux);
1626 GST_INFO_OBJECT (splitmux,
1627 "Have EOS event at pad %" GST_PTR_FORMAT " ctx %p", pad, ctx);
1629 case GST_EVENT_GAP:{
1630 GstClockTime gap_ts;
1631 GstClockTimeDiff rtime;
1633 gst_event_parse_gap (event, &gap_ts, NULL);
1634 if (gap_ts == GST_CLOCK_TIME_NONE)
1637 GST_SPLITMUX_LOCK (splitmux);
1640 if (splitmux->output_state == SPLITMUX_OUTPUT_STATE_STOPPED)
1643 /* When we get a gap event on the
1644 * reference stream and we're trying to open a
1645 * new file, we need to store it until we get
1646 * the buffer afterwards
1648 if (ctx->is_reference &&
1649 (splitmux->output_state != SPLITMUX_OUTPUT_STATE_OUTPUT_GOP)) {
1650 GST_DEBUG_OBJECT (pad, "Storing GAP event until buffer arrives");
1651 gst_event_replace (&ctx->pending_gap, event);
1652 GST_SPLITMUX_UNLOCK (splitmux);
1653 return GST_PAD_PROBE_HANDLED;
1656 rtime = my_segment_to_running_time (&ctx->out_segment, gap_ts);
1658 GST_LOG_OBJECT (pad, "Have GAP w/ ts %" GST_STIME_FORMAT,
1659 GST_STIME_ARGS (rtime));
1661 if (rtime != GST_CLOCK_STIME_NONE) {
1662 ctx->out_running_time = rtime;
1663 complete_or_wait_on_out (splitmux, ctx);
1667 case GST_EVENT_CUSTOM_DOWNSTREAM:{
1668 const GstStructure *s;
1669 GstClockTimeDiff ts = 0;
1671 s = gst_event_get_structure (event);
1672 if (!gst_structure_has_name (s, "splitmuxsink-unblock"))
1675 gst_structure_get_int64 (s, "timestamp", &ts);
1677 GST_SPLITMUX_LOCK (splitmux);
1680 if (splitmux->output_state == SPLITMUX_OUTPUT_STATE_STOPPED)
1682 ctx->out_running_time = ts;
1683 if (!ctx->is_reference)
1684 ret = complete_or_wait_on_out (splitmux, ctx);
1685 GST_SPLITMUX_UNLOCK (splitmux);
1686 GST_PAD_PROBE_INFO_FLOW_RETURN (info) = ret;
1687 return GST_PAD_PROBE_DROP;
1689 case GST_EVENT_CAPS:{
1692 if (!ctx->is_reference)
1695 peer = gst_pad_get_peer (pad);
1697 gboolean ok = gst_pad_send_event (peer, gst_event_ref (event));
1699 gst_object_unref (peer);
1707 /* This is in the case the muxer doesn't allow this change of caps */
1708 GST_SPLITMUX_LOCK (splitmux);
1710 ctx->caps_change = TRUE;
1712 if (splitmux->output_state != SPLITMUX_OUTPUT_STATE_START_NEXT_FILE) {
1713 GST_DEBUG_OBJECT (splitmux,
1714 "New caps were not accepted. Switching output file");
1715 if (ctx->out_eos == FALSE) {
1716 splitmux->output_state = SPLITMUX_OUTPUT_STATE_ENDING_FILE;
1717 GST_SPLITMUX_BROADCAST_OUTPUT (splitmux);
1721 /* Lets it fall through, if it fails again, then the muxer just can't
1722 * support this format, but at least we have a closed file.
1730 /* We need to make sure events aren't passed
1731 * until the muxer / sink are ready for it */
1733 GST_SPLITMUX_LOCK (splitmux);
1735 ret = complete_or_wait_on_out (splitmux, ctx);
1736 GST_SPLITMUX_UNLOCK (splitmux);
1738 /* Don't try to forward sticky events before the next buffer is there
1739 * because it would cause a new file to be created without the first
1740 * buffer being available.
1742 GST_PAD_PROBE_INFO_FLOW_RETURN (info) = ret;
1743 if (ctx->caps_change && GST_EVENT_IS_STICKY (event)) {
1744 gst_event_unref (event);
1745 return GST_PAD_PROBE_HANDLED;
1747 return GST_PAD_PROBE_PASS;
1751 /* Allow everything through until the configured next stopping point */
1752 GST_SPLITMUX_LOCK (splitmux);
1754 buf_info = g_queue_pop_tail (&ctx->queued_bufs);
1755 if (buf_info == NULL) {
1756 /* Can only happen due to a poorly timed flush */
1757 ret = GST_FLOW_FLUSHING;
1761 /* If we have popped a keyframe, decrement the queued_gop count */
1762 if (buf_info->keyframe && splitmux->queued_keyframes > 0 && ctx->is_reference)
1763 splitmux->queued_keyframes--;
1765 ctx->out_running_time = buf_info->run_ts;
1766 ctx->cur_out_buffer = gst_pad_probe_info_get_buffer (info);
1768 GST_LOG_OBJECT (splitmux,
1769 "Pad %" GST_PTR_FORMAT " buffer with run TS %" GST_STIME_FORMAT
1770 " size %" G_GUINT64_FORMAT,
1771 pad, GST_STIME_ARGS (ctx->out_running_time), buf_info->buf_size);
1773 ctx->caps_change = FALSE;
1775 ret = complete_or_wait_on_out (splitmux, ctx);
1777 splitmux->muxed_out_bytes += buf_info->buf_size;
1779 #ifndef GST_DISABLE_GST_DEBUG
1781 GstBuffer *buf = gst_pad_probe_info_get_buffer (info);
1782 GST_LOG_OBJECT (pad, "Returning to pass buffer %" GST_PTR_FORMAT
1783 " run ts %" GST_STIME_FORMAT, buf,
1784 GST_STIME_ARGS (ctx->out_running_time));
1788 ctx->cur_out_buffer = NULL;
1789 GST_SPLITMUX_UNLOCK (splitmux);
1791 /* pending_gap is protected by the STREAM lock */
1792 if (ctx->pending_gap) {
1793 /* If we previously stored a gap event, send it now */
1794 GstPad *peer = gst_pad_get_peer (ctx->srcpad);
1796 GST_DEBUG_OBJECT (splitmux,
1797 "Pad %" GST_PTR_FORMAT " sending pending GAP event", ctx->srcpad);
1799 gst_pad_send_event (peer, ctx->pending_gap);
1800 ctx->pending_gap = NULL;
1802 gst_object_unref (peer);
1805 mq_stream_buf_free (buf_info);
1807 GST_PAD_PROBE_INFO_FLOW_RETURN (info) = ret;
1808 return GST_PAD_PROBE_PASS;
1811 GST_SPLITMUX_UNLOCK (splitmux);
1812 GST_PAD_PROBE_INFO_FLOW_RETURN (info) = ret;
1813 return GST_PAD_PROBE_DROP;
1817 resend_sticky (GstPad * pad, GstEvent ** event, GstPad * peer)
1819 return gst_pad_send_event (peer, gst_event_ref (*event));
1823 unlock_context (MqStreamCtx * ctx, GstSplitMuxSink * splitmux)
1825 if (ctx->fragment_block_id > 0) {
1826 gst_pad_remove_probe (ctx->srcpad, ctx->fragment_block_id);
1827 ctx->fragment_block_id = 0;
1832 restart_context (MqStreamCtx * ctx, GstSplitMuxSink * splitmux)
1834 GstPad *peer = gst_pad_get_peer (ctx->srcpad);
1836 gst_pad_sticky_events_foreach (ctx->srcpad,
1837 (GstPadStickyEventsForeachFunction) (resend_sticky), peer);
1839 /* Clear EOS flag if not actually EOS */
1840 ctx->out_eos = GST_PAD_IS_EOS (ctx->srcpad);
1841 ctx->out_eos_async_done = ctx->out_eos;
1843 gst_object_unref (peer);
/* Move a stream context from the pad it is currently linked to over to a
 * newly requested pad on splitmux->muxer.
 *
 * The new pad is requested with the same pad template and the same name as
 * the current peer of ctx->srcpad. The src pad is then unlinked from the
 * old peer and linked to the new pad with
 * GST_PAD_LINK_CHECK_NO_RECONFIGURE.
 *
 * If linking fails, the requested pad is released again. A
 * RESOURCE/SETTINGS element error ("Could not create the new muxer/sink")
 * is raised by the last statement of the function.
 */
1847 relink_context (MqStreamCtx * ctx, GstSplitMuxSink * splitmux)
1849 GstPad *sinkpad, *srcpad, *newpad;
1850 GstPadTemplate *templ;
1852 srcpad = ctx->srcpad;
/* NOTE(review): the peer is dereferenced right below without a visible NULL
 * check. Confirm the src pad is always linked when this runs. */
1853 sinkpad = gst_pad_get_peer (srcpad);
1855 templ = sinkpad->padtemplate;
1857 gst_element_request_pad (splitmux->muxer, templ,
1858 GST_PAD_NAME (sinkpad), NULL);
1860 GST_DEBUG_OBJECT (splitmux, "Relinking ctx %p to pad %" GST_PTR_FORMAT, ctx,
/* NOTE(review): on unlink failure only sinkpad is unreffed here. The newly
 * requested pad does not appear to be released on this path. Confirm. */
1862 if (!gst_pad_unlink (srcpad, sinkpad)) {
1863 gst_object_unref (sinkpad);
1866 if (gst_pad_link_full (srcpad, newpad,
1867 GST_PAD_LINK_CHECK_NO_RECONFIGURE) != GST_PAD_LINK_OK) {
1868 gst_element_release_request_pad (splitmux->muxer, newpad);
1869 gst_object_unref (sinkpad);
1870 gst_object_unref (newpad);
1873 gst_object_unref (newpad);
1874 gst_object_unref (sinkpad);
1878 GST_ELEMENT_ERROR (splitmux, RESOURCE, SETTINGS,
1879 ("Could not create the new muxer/sink"), NULL);
1882 static GstPadProbeReturn
1883 _block_pad (GstPad * pad, GstPadProbeInfo * info, gpointer user_data)
1885 return GST_PAD_PROBE_OK;
1889 block_context (MqStreamCtx * ctx, GstSplitMuxSink * splitmux)
1891 ctx->fragment_block_id =
1892 gst_pad_add_probe (ctx->srcpad, GST_PAD_PROBE_TYPE_BLOCK, _block_pad,
1897 _set_property_from_structure (GQuark field_id, const GValue * value,
1900 const gchar *property_name = g_quark_to_string (field_id);
1901 GObject *element = G_OBJECT (user_data);
1903 g_object_set_property (element, property_name, value);
1909 _lock_and_set_to_null (GstElement * element, GstSplitMuxSink * splitmux)
1911 gst_element_set_locked_state (element, TRUE);
1912 gst_element_set_state (element, GST_STATE_NULL);
1913 GST_LOG_OBJECT (splitmux, "Removing old element %" GST_PTR_FORMAT, element);
1914 gst_bin_remove (GST_BIN (splitmux), element);
1919 _send_event (const GValue * value, gpointer user_data)
1921 GstPad *pad = g_value_get_object (value);
1922 GstEvent *ev = user_data;
1924 gst_pad_send_event (pad, gst_event_ref (ev));
/* start_next_fragment(): move the output side on to the next fragment.
 *
 * Parameters:
 *   splitmux - the splitmuxsink instance.
 *   ctx      - stream context; asserted to be the reference context.
 *
 * Returns GST_FLOW_FLUSHING when splitmux->shutdown is set, and
 * GST_FLOW_ERROR from the three error tails at the bottom (each posts a
 * RESOURCE/SETTINGS element error).
 *
 * Locking, as visible below: the splitmux lock is released and the state
 * lock taken near the top; every exit shown re-takes the splitmux lock
 * before releasing the state lock.
 *
 * NOTE(review): lines shorter than about 20 characters (braces, `else`,
 * `goto`, labels, short declarations, the normal-path return) are absent
 * from this copy of the file; comments added here only describe statements
 * that are visible.
 */
1927 /* Called with lock held when a fragment
1928 * reaches EOS and it is time to restart
1931 static GstFlowReturn
1932 start_next_fragment (GstSplitMuxSink * splitmux, MqStreamCtx * ctx)
1934 GstElement *muxer, *sink;
1936 g_assert (ctx->is_reference);
1938 /* 1 change to new file */
1939 splitmux->switching_fragment = TRUE;
1941 /* We need to drop the splitmux lock to acquire the state lock
1942 * here and ensure there's no racy state change going on elsewhere */
1943 muxer = gst_object_ref (splitmux->muxer);
1944 sink = gst_object_ref (splitmux->active_sink);
1946 GST_SPLITMUX_UNLOCK (splitmux);
1947 GST_SPLITMUX_STATE_LOCK (splitmux);
/* Bail out early if the element is shutting down; both refs taken above
 * are dropped before returning. */
1949 if (splitmux->shutdown) {
1950 GST_DEBUG_OBJECT (splitmux,
1951 "Shutdown requested. Aborting fragment switch.");
1952 GST_SPLITMUX_LOCK (splitmux);
1953 GST_SPLITMUX_STATE_UNLOCK (splitmux);
1954 gst_object_unref (muxer);
1955 gst_object_unref (sink);
1956 return GST_FLOW_FLUSHING;
/* async-finalize mode: unless this is still the untouched first fragment
 * (nothing muxed and fragment_id == start_index), block the stream
 * contexts, create a new "sink_<id>" and "muxer_<id>" from the configured
 * factories, apply presets/properties, emit sink-added / muxer-added and
 * relink the contexts to the new pair. */
1959 if (splitmux->async_finalize) {
1960 if (splitmux->muxed_out_bytes > 0
1961 || splitmux->fragment_id != splitmux->start_index) {
1963 GstElement *new_sink, *new_muxer;
1965 GST_DEBUG_OBJECT (splitmux, "Starting fragment %u",
1966 splitmux->fragment_id);
1967 g_list_foreach (splitmux->contexts, (GFunc) block_context, splitmux);
1968 newname = g_strdup_printf ("sink_%u", splitmux->fragment_id);
1969 GST_SPLITMUX_LOCK (splitmux);
1970 if ((splitmux->sink =
1971 create_element (splitmux, splitmux->sink_factory, newname,
1974 if (splitmux->sink_preset && GST_IS_PRESET (splitmux->sink))
1975 gst_preset_load_preset (GST_PRESET (splitmux->sink),
1976 splitmux->sink_preset);
1977 if (splitmux->sink_properties)
1978 gst_structure_foreach (splitmux->sink_properties,
1979 _set_property_from_structure, splitmux->sink);
1980 splitmux->active_sink = splitmux->sink;
1981 g_signal_emit (splitmux, signals[SIGNAL_SINK_ADDED], 0, splitmux->sink);
1983 newname = g_strdup_printf ("muxer_%u", splitmux->fragment_id);
1984 if ((splitmux->muxer =
1985 create_element (splitmux, splitmux->muxer_factory, newname,
1988 if (g_object_class_find_property (G_OBJECT_GET_CLASS (splitmux->sink),
1990 /* async child elements are causing state change races and weird
1991 * failures, so let's try and turn that off */
1992 g_object_set (splitmux->sink, "async", FALSE, NULL);
1994 if (splitmux->muxer_preset && GST_IS_PRESET (splitmux->muxer))
1995 gst_preset_load_preset (GST_PRESET (splitmux->muxer),
1996 splitmux->muxer_preset);
1997 if (splitmux->muxer_properties)
1998 gst_structure_foreach (splitmux->muxer_properties,
1999 _set_property_from_structure, splitmux->muxer);
2000 g_signal_emit (splitmux, signals[SIGNAL_MUXER_ADDED], 0, splitmux->muxer);
2002 new_sink = splitmux->sink;
2003 new_muxer = splitmux->muxer;
2004 GST_SPLITMUX_UNLOCK (splitmux);
2005 g_list_foreach (splitmux->contexts, (GFunc) relink_context, splitmux);
2006 gst_element_link (new_muxer, new_sink);
/* Hand-shake with bus_handler() through the EOS_FROM_US qdata on the old
 * sink: if it already reads 2 the old pair is torn down right here,
 * otherwise it is set to 2 (bus_handler() applies the mirror-image logic
 * when the EOS message arrives). */
2008 if (g_object_get_qdata ((GObject *) sink, EOS_FROM_US)) {
2009 if (GPOINTER_TO_INT (g_object_get_qdata ((GObject *) sink,
2010 EOS_FROM_US)) == 2) {
2011 _lock_and_set_to_null (muxer, splitmux);
2012 _lock_and_set_to_null (sink, splitmux);
2014 g_object_set_qdata ((GObject *) sink, EOS_FROM_US,
2015 GINT_TO_POINTER (2));
/* Drop the refs on the old pair and take refs again; presumably muxer/sink
 * are re-pointed at the new pair on the lines missing in between - verify. */
2018 gst_object_unref (muxer);
2019 gst_object_unref (sink);
2022 gst_object_ref (muxer);
2023 gst_object_ref (sink);
/* Presumably the non-async-finalize branch (the `else` line is not
 * visible): lock the state of both children and take the sink to NULL. */
2027 gst_element_set_locked_state (muxer, TRUE);
2028 gst_element_set_locked_state (sink, TRUE);
2029 gst_element_set_state (sink, GST_STATE_NULL);
/* The muxer is either reset through NULL, or flushed in place by sending
 * flush-start then flush-stop (sharing one seqnum) to each of its sink
 * pads, restarting the iteration whenever it reports RESYNC. */
2031 if (splitmux->reset_muxer) {
2032 gst_element_set_state (muxer, GST_STATE_NULL);
2034 GstIterator *it = gst_element_iterate_sink_pads (muxer);
2038 ev = gst_event_new_flush_start ();
2039 seqnum = gst_event_get_seqnum (ev);
2040 while (gst_iterator_foreach (it, _send_event, ev) == GST_ITERATOR_RESYNC);
2041 gst_event_unref (ev);
2043 gst_iterator_resync (it);
2045 ev = gst_event_new_flush_stop (TRUE);
2046 gst_event_set_seqnum (ev, seqnum);
2047 while (gst_iterator_foreach (it, _send_event, ev) == GST_ITERATOR_RESYNC);
2048 gst_event_unref (ev);
2050 gst_iterator_free (it);
/* Choose the next output filename and restart the muxed byte counter,
 * under the splitmux lock. */
2054 GST_SPLITMUX_LOCK (splitmux);
2055 set_next_filename (splitmux, ctx);
2056 splitmux->muxed_out_bytes = 0;
2057 GST_SPLITMUX_UNLOCK (splitmux);
/* Bring sink, then muxer, up to the bin's target state. On failure the
 * affected elements go back to NULL and their state locks are released. */
2059 if (gst_element_set_state (sink,
2060 GST_STATE_TARGET (splitmux)) == GST_STATE_CHANGE_FAILURE) {
2061 gst_element_set_state (sink, GST_STATE_NULL);
2062 gst_element_set_locked_state (muxer, FALSE);
2063 gst_element_set_locked_state (sink, FALSE);
2068 if (gst_element_set_state (muxer,
2069 GST_STATE_TARGET (splitmux)) == GST_STATE_CHANGE_FAILURE) {
2070 gst_element_set_state (muxer, GST_STATE_NULL);
2071 gst_element_set_state (sink, GST_STATE_NULL);
2072 gst_element_set_locked_state (muxer, FALSE);
2073 gst_element_set_locked_state (sink, FALSE);
2077 gst_element_set_locked_state (muxer, FALSE);
2078 gst_element_set_locked_state (sink, FALSE);
2080 gst_object_unref (sink);
2081 gst_object_unref (muxer);
/* Success tail: back under the splitmux lock, clear switching_fragment,
 * mark the output ready, unlock/restart all contexts, announce the opened
 * fragment and return the output state machine to AWAITING_COMMAND. */
2083 GST_SPLITMUX_LOCK (splitmux);
2084 GST_SPLITMUX_STATE_UNLOCK (splitmux);
2085 splitmux->switching_fragment = FALSE;
2086 do_async_done (splitmux);
2088 splitmux->ready_for_output = TRUE;
2090 g_list_foreach (splitmux->contexts, (GFunc) unlock_context, splitmux);
2091 g_list_foreach (splitmux->contexts, (GFunc) restart_context, splitmux);
2093 send_fragment_opened_closed_msg (splitmux, TRUE, sink);
2095 /* FIXME: Is this always the correct next state? */
2096 GST_LOG_OBJECT (splitmux, "Resetting state to AWAITING_COMMAND");
2097 splitmux->output_state = SPLITMUX_OUTPUT_STATE_AWAITING_COMMAND;
2098 GST_SPLITMUX_BROADCAST_OUTPUT (splitmux);
/* Error tail: new muxer/sink could not be created.
 * NOTE(review): unlike the two tails below, this one does not clear
 * switching_fragment - confirm that is intended.
 * NOTE(review): presumably reached from the create_element() checks above,
 * which sit between GST_SPLITMUX_LOCK and GST_SPLITMUX_UNLOCK; verify the
 * GST_SPLITMUX_LOCK here cannot be taken while the lock is already held,
 * and that newname is freed on that path. */
2102 gst_object_unref (sink);
2103 gst_object_unref (muxer);
2105 GST_SPLITMUX_LOCK (splitmux);
2106 GST_SPLITMUX_STATE_UNLOCK (splitmux);
2107 GST_ELEMENT_ERROR (splitmux, RESOURCE, SETTINGS,
2108 ("Could not create the new muxer/sink"), NULL);
2109 return GST_FLOW_ERROR;
/* Error tail: the sink failed its state change. */
2112 GST_ELEMENT_ERROR (splitmux, RESOURCE, SETTINGS,
2113 ("Could not start new output sink"), NULL);
2114 gst_object_unref (sink);
2115 gst_object_unref (muxer);
2117 GST_SPLITMUX_LOCK (splitmux);
2118 GST_SPLITMUX_STATE_UNLOCK (splitmux);
2119 splitmux->switching_fragment = FALSE;
2120 return GST_FLOW_ERROR;
/* Error tail: the muxer failed its state change. */
2123 GST_ELEMENT_ERROR (splitmux, RESOURCE, SETTINGS,
2124 ("Could not start new muxer"), NULL);
2125 gst_object_unref (sink);
2126 gst_object_unref (muxer);
2128 GST_SPLITMUX_LOCK (splitmux);
2129 GST_SPLITMUX_STATE_UNLOCK (splitmux);
2130 splitmux->switching_fragment = FALSE;
2131 return GST_FLOW_ERROR;
/* bus_handler(): message hook of the splitmuxsink bin.
 *
 * Inspects messages coming from child elements and either swallows them
 * (unref + early exit) or lets them fall through to the parent class
 * handle_message() on the last line.
 *
 *   EOS          - always reports the fragment via
 *                  send_fragment_opened_closed_msg (..., FALSE, sink).
 *                  In async-finalize mode an EOS from a sink tagged with
 *                  EOS_FROM_US qdata is dropped; see the inline comment.
 *                  Otherwise the EOS is passed on while the output state is
 *                  ENDING_STREAM, and in the remaining case it is dropped
 *                  after switching the output state to START_NEXT_FILE.
 *   ASYNC_START / ASYNC_DONE
 *                - dropped when they come from the active sink or the muxer
 *                  while switching_fragment is set.
 *   WARNING      - GST_STREAM_ERROR_FORMAT warnings are checked against the
 *                  per-context caps_change flag and may be dropped.
 *
 * NOTE(review): short lines (case/else/break/return, braces, a few
 * declarations) are missing from this copy; the branch structure described
 * here is read from the visible statements only.
 */
2135 bus_handler (GstBin * bin, GstMessage * message)
2137 GstSplitMuxSink *splitmux = GST_SPLITMUX_SINK (bin);
2139 switch (GST_MESSAGE_TYPE (message)) {
2140 case GST_MESSAGE_EOS:{
2141 /* If the state is draining out the current file, drop this EOS */
2144 sink = GST_ELEMENT (GST_MESSAGE_SRC (message));
2145 GST_SPLITMUX_LOCK (splitmux);
2147 send_fragment_opened_closed_msg (splitmux, FALSE, sink);
2149 if (splitmux->async_finalize) {
/* Counterpart of the EOS_FROM_US hand-shake in start_next_fragment(): if
 * the qdata already reads 2, find the muxer feeding this sink (peer of its
 * "sink" pad) and schedule both for asynchronous teardown via
 * gst_element_call_async(); otherwise set the qdata to 2. */
2151 if (g_object_get_qdata ((GObject *) sink, EOS_FROM_US)) {
2152 if (GPOINTER_TO_INT (g_object_get_qdata ((GObject *) sink,
2153 EOS_FROM_US)) == 2) {
2155 GstPad *sinksink, *muxersrc;
2157 sinksink = gst_element_get_static_pad (sink, "sink");
2158 muxersrc = gst_pad_get_peer (sinksink);
2159 muxer = gst_pad_get_parent_element (muxersrc);
2160 gst_object_unref (sinksink);
2161 gst_object_unref (muxersrc);
/* Each async call holds its own ref on splitmux, released by the destroy
 * notify (gst_object_unref). */
2163 gst_element_call_async (muxer,
2164 (GstElementCallAsyncFunc) _lock_and_set_to_null,
2165 gst_object_ref (splitmux), gst_object_unref);
2166 gst_element_call_async (sink,
2167 (GstElementCallAsyncFunc) _lock_and_set_to_null,
2168 gst_object_ref (splitmux), gst_object_unref);
2169 gst_object_unref (muxer);
2171 g_object_set_qdata ((GObject *) sink, EOS_FROM_US,
2172 GINT_TO_POINTER (2));
2174 GST_DEBUG_OBJECT (splitmux,
2175 "Caught async EOS from previous muxer+sink. Dropping.");
2176 /* We forward the EOS so that it gets aggregated as normal. If the sink
2177 * finishes and is removed before the end, it will be de-aggregated */
/* NOTE(review): the comment above says "forward" but the visible code
 * unrefs the message - confirm which is intended. */
2178 gst_message_unref (message);
2179 GST_SPLITMUX_UNLOCK (splitmux);
2182 } else if (splitmux->output_state == SPLITMUX_OUTPUT_STATE_ENDING_STREAM) {
2183 GST_DEBUG_OBJECT (splitmux,
2184 "Passing EOS message. Output state %d max_out_running_time %"
2185 GST_STIME_FORMAT, splitmux->output_state,
2186 GST_STIME_ARGS (splitmux->max_out_running_time));
/* Remaining case: an EOS that ends a fragment. Wake the output side so it
 * starts the next file, and swallow the message. */
2188 GST_DEBUG_OBJECT (splitmux, "Caught EOS at end of fragment, dropping");
2189 splitmux->output_state = SPLITMUX_OUTPUT_STATE_START_NEXT_FILE;
2190 GST_SPLITMUX_BROADCAST_OUTPUT (splitmux);
2192 gst_message_unref (message);
2193 GST_SPLITMUX_UNLOCK (splitmux);
2196 GST_SPLITMUX_UNLOCK (splitmux);
2199 case GST_MESSAGE_ASYNC_START:
2200 case GST_MESSAGE_ASYNC_DONE:
2201 /* Ignore state changes from our children while switching */
2202 GST_SPLITMUX_LOCK (splitmux);
2203 if (splitmux->switching_fragment) {
2204 if (GST_MESSAGE_SRC (message) == (GstObject *) splitmux->active_sink
2205 || GST_MESSAGE_SRC (message) == (GstObject *) splitmux->muxer) {
2206 GST_LOG_OBJECT (splitmux,
2207 "Ignoring state change from child %" GST_PTR_FORMAT
2208 " while switching", GST_MESSAGE_SRC (message));
2209 gst_message_unref (message);
2210 GST_SPLITMUX_UNLOCK (splitmux);
2214 GST_SPLITMUX_UNLOCK (splitmux);
2216 case GST_MESSAGE_WARNING:
2218 GError *gerror = NULL;
2220 gst_message_parse_warning (message, &gerror, NULL);
/* Only FORMAT stream warnings are candidates for suppression; the stream
 * contexts are scanned for a set caps_change flag under the lock. */
2222 if (g_error_matches (gerror, GST_STREAM_ERROR, GST_STREAM_ERROR_FORMAT)) {
2224 gboolean caps_change = FALSE;
2226 GST_SPLITMUX_LOCK (splitmux);
2228 for (item = splitmux->contexts; item; item = item->next) {
2229 MqStreamCtx *ctx = item->data;
2231 if (ctx->caps_change) {
2237 GST_SPLITMUX_UNLOCK (splitmux);
/* Presumably guarded by the caps_change flag gathered above (the `if` line
 * is not visible) - verify. */
2240 GST_LOG_OBJECT (splitmux,
2241 "Ignoring warning change from child %" GST_PTR_FORMAT
2242 " while switching caps", GST_MESSAGE_SRC (message));
2243 gst_message_unref (message);
/* Everything not swallowed above goes to the parent GstBin handler. */
2253 GST_BIN_CLASS (parent_class)->handle_message (bin, message);
2257 ctx_set_unblock (MqStreamCtx * ctx)
2259 ctx->need_unblock = TRUE;
/* need_new_fragment(): decide whether the GOP that was just gathered has to
 * go into a new fragment.
 *
 * Parameters:
 *   splitmux        - the splitmuxsink instance.
 *   queued_time     - time that would be in the fragment including this GOP
 *                     (the caller passes it relative to fragment start).
 *   queued_gop_time - amount of not-yet-written time including this GOP;
 *                     only used for the robust-muxing check.
 *   queued_bytes    - estimated bytes in the fragment including this GOP.
 *
 * Checks, in the order they appear:
 *   1. nothing muxed yet on the reference stream (fragment_reference_bytes
 *      <= 0): "Not ready to split";
 *   2. do_split_next_gop set by the user;
 *   3. GOP start time has reached the head of times_to_split - that entry
 *      and any following entries not after the GOP start are popped;
 *   4. byte threshold exceeded  -> TRUE;
 *   5. time threshold exceeded  -> TRUE;
 *   6. timecode-interval limit passed;
 *   7. robust muxing: the muxer's "reserved-duration-remaining" would be
 *      used up by this GOP.
 *
 * NOTE(review): only the two `return TRUE` lines of checks 4 and 5 are
 * visible in this copy; the return statements of the other checks and the
 * final return are on lines that are missing.
 */
2263 need_new_fragment (GstSplitMuxSink * splitmux,
2264 GstClockTime queued_time, GstClockTime queued_gop_time,
2265 guint64 queued_bytes)
2267 guint64 thresh_bytes;
2268 GstClockTime thresh_time;
2269 gboolean check_robust_muxing;
2270 GstClockTime time_to_split = GST_CLOCK_TIME_NONE;
2271 GstClockTime *ptr_to_time;
/* Snapshot the configurable limits and the next requested split time while
 * holding the object lock. */
2273 GST_OBJECT_LOCK (splitmux);
2274 thresh_bytes = splitmux->threshold_bytes;
2275 thresh_time = splitmux->threshold_time;
2276 ptr_to_time = (GstClockTime *)
2277 gst_queue_array_peek_head_struct (splitmux->times_to_split);
2279 time_to_split = *ptr_to_time;
2280 check_robust_muxing = splitmux->use_robust_muxing
2281 && splitmux->muxer_has_reserved_props;
2282 GST_OBJECT_UNLOCK (splitmux);
2284 /* Have we muxed at least one thing from the reference
2285 * stream into the file? If not, no other streams can have
2287 if (splitmux->fragment_reference_bytes <= 0) {
2288 GST_TRACE_OBJECT (splitmux,
2289 "Not ready to split - nothing muxed on the reference stream");
2293 /* User told us to split now */
2294 if (g_atomic_int_get (&(splitmux->do_split_next_gop)) == TRUE) {
2295 GST_TRACE_OBJECT (splitmux, "Forcing because split_next_gop is set");
2299 /* User told us to split at this running time */
2300 if (splitmux->gop_start_time >= time_to_split) {
2301 GST_OBJECT_LOCK (splitmux);
2302 /* Dequeue running time */
2303 gst_queue_array_pop_head_struct (splitmux->times_to_split);
2304 /* Empty any running times after this that are past now */
2305 ptr_to_time = gst_queue_array_peek_head_struct (splitmux->times_to_split);
2306 while (ptr_to_time) {
2307 time_to_split = *ptr_to_time;
2308 if (splitmux->gop_start_time < time_to_split) {
2311 gst_queue_array_pop_head_struct (splitmux->times_to_split);
2312 ptr_to_time = gst_queue_array_peek_head_struct (splitmux->times_to_split);
2314 GST_TRACE_OBJECT (splitmux,
2315 "GOP start time %" GST_STIME_FORMAT " is after requested split point %"
2316 GST_STIME_FORMAT, GST_STIME_ARGS (splitmux->gop_start_time),
2317 GST_STIME_ARGS (time_to_split));
2318 GST_OBJECT_UNLOCK (splitmux);
/* Size and duration limits; a threshold of 0 disables the respective
 * check. */
2322 if (thresh_bytes > 0 && queued_bytes > thresh_bytes) {
2323 GST_TRACE_OBJECT (splitmux,
2324 "queued bytes %" G_GUINT64_FORMAT " overruns byte limit", queued_bytes);
2325 return TRUE; /* Would overrun byte limit */
2328 if (thresh_time > 0 && queued_time > thresh_time) {
2329 GST_TRACE_OBJECT (splitmux,
2330 "queued time %" GST_STIME_FORMAT " overruns time limit",
2331 GST_STIME_ARGS (queued_time));
2332 return TRUE; /* Would overrun time limit */
/* Timecode-interval mode: the reference input running time must exceed the
 * precomputed next fragment start by more than 5 microseconds. */
2335 if (splitmux->tc_interval &&
2336 GST_CLOCK_TIME_IS_VALID (splitmux->next_fragment_start_tc_time) &&
2337 splitmux->reference_ctx->in_running_time >
2338 splitmux->next_fragment_start_tc_time + 5 * GST_USECOND) {
2339 GST_TRACE_OBJECT (splitmux,
2340 "in running time %" GST_STIME_FORMAT " overruns time limit %"
2342 GST_STIME_ARGS (splitmux->reference_ctx->in_running_time),
2343 GST_TIME_ARGS (splitmux->next_fragment_start_tc_time));
/* Robust muxing: ask the muxer how much reserved header duration is left
 * and compare it with what this GOP would add. */
2347 if (check_robust_muxing) {
2348 GstClockTime mux_reserved_remain;
2350 g_object_get (splitmux->muxer,
2351 "reserved-duration-remaining", &mux_reserved_remain, NULL);
2353 GST_LOG_OBJECT (splitmux,
2354 "Muxer robust muxing report - %" G_GUINT64_FORMAT
2355 " remaining. New GOP would enqueue %" G_GUINT64_FORMAT,
2356 mux_reserved_remain, queued_gop_time);
2358 if (queued_gop_time >= mux_reserved_remain) {
2359 GST_INFO_OBJECT (splitmux,
2360 "File is about to run out of header room - %" G_GUINT64_FORMAT
2361 " remaining. New GOP would enqueue %" G_GUINT64_FORMAT
2362 ". Switching to new file", mux_reserved_remain, queued_gop_time);
2367 /* Continue and mux this GOP */
2371 /* probably we want to add this API? */
2373 video_time_code_replace (GstVideoTimeCode ** old_tc, GstVideoTimeCode * new_tc)
2375 GstVideoTimeCode *timecode = NULL;
2377 g_return_if_fail (old_tc != NULL);
2379 if (*old_tc == new_tc)
2383 timecode = gst_video_time_code_copy (new_tc);
2386 gst_video_time_code_free (*old_tc);
/* handle_gathered_gop(): process one completely gathered GOP.
 *
 * Steps visible below:
 *   - estimate what the fragment would contain with this GOP added:
 *     queued_bytes (fragment + GOP bytes, later inflated by mux_overhead),
 *     queued_time (reference input running time, made relative to
 *     fragment_start_time) and queued_gop_time (capped at queued_time);
 *   - post a STREAM/FAILED element error if the GOP duration is negative
 *     or the reference running time is before the fragment start;
 *   - if need_new_fragment() says so: queue a start_new_fragment command
 *     for the output side, restart the fragment accounting at this GOP's
 *     start and, in timecode mode, recompute the next fragment start;
 *   - pick the next input state (COLLECTING_GOP_START, or FINISHING_UP when
 *     the reference stream is at EOS), flag every context for unblocking
 *     and wake the input threads;
 *   - add the GOP's bytes to the fragment totals, queue a command releasing
 *     the GOP up to new_out_ts (only when the GOP has bytes), and zero the
 *     GOP counters.
 *
 * NOTE(review): brace/else/label/return lines are missing from this copy,
 * so block nesting is inferred from the visible statements and comments.
 */
2391 /* Called with splitmux lock held */
2392 /* Called when entering ProcessingCompleteGop state
2393 * Assess if mq contents overflowed the current file
2394 * -> If yes, need to switch to new file
2395 * -> if no, set max_out_running_time to let this GOP in and
2396 * go to COLLECTING_GOP_START state
2399 handle_gathered_gop (GstSplitMuxSink * splitmux)
2401 guint64 queued_bytes;
2402 GstClockTimeDiff queued_time = 0;
2403 GstClockTimeDiff queued_gop_time = 0;
2404 GstClockTimeDiff new_out_ts = splitmux->reference_ctx->in_running_time;
2405 SplitMuxOutputCommand *cmd;
2407 /* Assess if the multiqueue contents overflowed the current file */
2408 /* When considering if a newly gathered GOP overflows
2409 * the time limit for the file, only consider the running time of the
2410 * reference stream. Other streams might have run ahead a little bit,
2411 * but extra pieces won't be released to the muxer beyond the reference
2412 * stream cut-off anyway - so it forms the limit. */
2413 queued_bytes = splitmux->fragment_total_bytes + splitmux->gop_total_bytes;
2414 queued_time = splitmux->reference_ctx->in_running_time;
2415 /* queued_gop_time tracks how much unwritten data there is waiting to
2416 * be written to this fragment including this GOP */
2417 if (splitmux->reference_ctx->out_running_time != GST_CLOCK_STIME_NONE)
2419 splitmux->reference_ctx->in_running_time -
2420 splitmux->reference_ctx->out_running_time;
2423 splitmux->reference_ctx->in_running_time - splitmux->gop_start_time;
2425 GST_LOG_OBJECT (splitmux, " queued_bytes %" G_GUINT64_FORMAT, queued_bytes);
2426 GST_LOG_OBJECT (splitmux, "mq at TS %" GST_STIME_FORMAT
2427 " bytes %" G_GUINT64_FORMAT " in running time %" GST_STIME_FORMAT
2428 " gop start time %" GST_STIME_FORMAT,
2429 GST_STIME_ARGS (queued_time), queued_bytes,
2430 GST_STIME_ARGS (splitmux->reference_ctx->in_running_time),
2431 GST_STIME_ARGS (splitmux->gop_start_time));
/* Sanity checks on the timestamps; both jump to the error tails at the end
 * of the function. */
2433 if (queued_gop_time < 0)
2434 goto error_gop_duration;
2436 if (queued_time < splitmux->fragment_start_time)
2437 goto error_queued_time;
/* From here on queued_time is a duration since the start of the fragment,
 * and queued_gop_time can never exceed it. */
2439 queued_time -= splitmux->fragment_start_time;
2440 if (queued_time < queued_gop_time)
2441 queued_gop_time = queued_time;
2443 /* Expand queued bytes estimate by muxer overhead */
2444 queued_bytes += (queued_bytes * splitmux->mux_overhead);
2446 /* Check for overrun - have we output at least one byte and overrun
2447 * either threshold? */
2448 if (need_new_fragment (splitmux, queued_time, queued_gop_time, queued_bytes)) {
/* async-finalize: remember the reference stream's output running time on
 * the current sink as RUNNING_TIME qdata (freed with g_free). */
2449 if (splitmux->async_finalize) {
2450 GstClockTime *sink_running_time = g_new (GstClockTime, 1);
2451 *sink_running_time = splitmux->reference_ctx->out_running_time;
2452 g_object_set_qdata_full (G_OBJECT (splitmux->sink),
2453 RUNNING_TIME, sink_running_time, g_free);
2455 g_atomic_int_set (&(splitmux->do_split_next_gop), FALSE);
2456 /* Tell the output side to start a new fragment */
2457 GST_INFO_OBJECT (splitmux,
2458 "This GOP (dur %" GST_STIME_FORMAT
2459 ") would overflow the fragment, Sending start_new_fragment cmd",
2460 GST_STIME_ARGS (splitmux->reference_ctx->in_running_time -
2461 splitmux->gop_start_time));
2462 cmd = out_cmd_buf_new ();
2463 cmd->start_new_fragment = TRUE;
2464 g_queue_push_head (&splitmux->out_cmd_q, cmd);
2465 GST_SPLITMUX_BROADCAST_OUTPUT (splitmux);
/* The new fragment begins with this GOP: restart the per-fragment
 * accounting from the GOP's start time. */
2467 new_out_ts = splitmux->reference_ctx->in_running_time;
2468 splitmux->fragment_start_time = splitmux->gop_start_time;
2469 splitmux->fragment_total_bytes = 0;
2470 splitmux->fragment_reference_bytes = 0;
2472 if (splitmux->tc_interval) {
2473 video_time_code_replace (&splitmux->fragment_start_tc,
2474 splitmux->gop_start_tc);
2475 splitmux->next_fragment_start_tc_time =
2476 calculate_next_max_timecode (splitmux, splitmux->fragment_start_tc,
2477 splitmux->fragment_start_time, NULL);
2478 if (!GST_CLOCK_TIME_IS_VALID (splitmux->next_fragment_start_tc_time)) {
2479 GST_WARNING_OBJECT (splitmux,
2480 "Couldn't calculate next fragment start time for timecode mode");
2481 /* shouldn't happen, but reset all and try again with next buffers */
2482 gst_splitmux_reset_timecode (splitmux);
2487 /* And set up to collect the next GOP */
2488 if (!splitmux->reference_ctx->in_eos) {
2489 splitmux->input_state = SPLITMUX_INPUT_STATE_COLLECTING_GOP_START;
2490 splitmux->gop_start_time = new_out_ts;
2491 if (splitmux->tc_interval)
2492 video_time_code_replace (&splitmux->gop_start_tc, splitmux->in_tc);
2494 /* This is probably already the current state, but just in case: */
2495 splitmux->input_state = SPLITMUX_INPUT_STATE_FINISHING_UP;
2496 new_out_ts = GST_CLOCK_STIME_NONE; /* EOS runs until forever */
2499 /* And wake all input contexts to send a wake-up event */
2500 g_list_foreach (splitmux->contexts, (GFunc) ctx_set_unblock, NULL);
2501 GST_SPLITMUX_BROADCAST_INPUT (splitmux);
2503 /* Now either way - either there was no overflow, or we requested a new fragment: release this GOP */
2504 splitmux->fragment_total_bytes += splitmux->gop_total_bytes;
2505 splitmux->fragment_reference_bytes += splitmux->gop_reference_bytes;
2507 if (splitmux->gop_total_bytes > 0) {
2508 GST_LOG_OBJECT (splitmux,
2509 "Releasing GOP to output. Bytes in fragment now %" G_GUINT64_FORMAT
2510 " time %" GST_STIME_FORMAT,
2511 splitmux->fragment_total_bytes, GST_STIME_ARGS (queued_time));
2513 /* Send this GOP to the output command queue */
2514 cmd = out_cmd_buf_new ();
2515 cmd->start_new_fragment = FALSE;
2516 cmd->max_output_ts = new_out_ts;
2517 GST_LOG_OBJECT (splitmux, "Sending GOP cmd to output for TS %"
2518 GST_STIME_FORMAT, GST_STIME_ARGS (new_out_ts));
2519 g_queue_push_head (&splitmux->out_cmd_q, cmd);
2521 GST_SPLITMUX_BROADCAST_OUTPUT (splitmux);
2524 splitmux->gop_total_bytes = 0;
2525 splitmux->gop_reference_bytes = 0;
/* Error tails for the two sanity checks near the top (the label lines are
 * not visible in this copy). */
2529 GST_ELEMENT_ERROR (splitmux,
2530 STREAM, FAILED, ("Timestamping error on input streams"),
2531 ("Queued GOP time is negative %" GST_STIME_FORMAT,
2532 GST_STIME_ARGS (queued_gop_time)));
2535 GST_ELEMENT_ERROR (splitmux,
2536 STREAM, FAILED, ("Timestamping error on input streams"),
2537 ("Queued time is negative. Input went backwards. queued_time - %"
2538 GST_STIME_FORMAT, GST_STIME_ARGS (queued_time)));
/* check_completed_gop(): called by an input pad's context once it holds
 * its share of the current GOP (or is at EOS).
 *
 * Parameters:
 *   splitmux - the splitmuxsink instance (its lock is held on entry; it is
 *              released only around gst_pad_send_event below).
 *   ctx      - the calling stream context.
 *
 * The body is a loop that repeats while input_state is
 * WAITING_GOP_COLLECT. Per pass, as far as visible:
 *   1. if ctx->need_unblock is set, push a "splitmuxsink-unblock" event
 *      into ctx->sinkpad and clear the flag;
 *   2. in WAITING_GOP_COLLECT, walk all contexts; when none is reported
 *      "not ready", hand the GOP to handle_gathered_gop() and turn a
 *      pending split_requested into do_split_next_gop;
 *   3. otherwise, if this context has reached max_in_running_time, sleep
 *      on the input condition until woken.
 *
 * NOTE(review): several short lines (declarations, parts of conditions,
 * break/return, braces) are missing from this copy.
 */
2542 /* Called with splitmux lock held */
2543 /* Called from each input pad when it is has all the pieces
2544 * for a GOP or EOS, starting with the reference pad which has set the
2545 * splitmux->max_in_running_time
2548 check_completed_gop (GstSplitMuxSink * splitmux, MqStreamCtx * ctx)
2553 /* On ENDING_FILE, the reference stream sends a command to start a new
2554 * fragment, then releases the GOP for output in the new fragment.
2555 * If some streams received no buffer during the last GOP that overran,
2556 * because its next buffer has a timestamp bigger than
2557 * ctx->max_in_running_time, its queue is empty. In that case the only
2558 * way to wakeup the output thread is by injecting an event in the
2559 * queue. This usually happen with subtitle streams.
2560 * See https://bugzilla.gnome.org/show_bug.cgi?id=763711. */
2561 if (ctx->need_unblock) {
2562 GST_LOG_OBJECT (ctx->sinkpad, "Sending splitmuxsink-unblock event");
2563 event = gst_event_new_custom (GST_EVENT_CUSTOM_DOWNSTREAM |
2564 GST_EVENT_TYPE_SERIALIZED,
2565 gst_structure_new ("splitmuxsink-unblock", "timestamp",
2566 G_TYPE_INT64, splitmux->max_in_running_time, NULL));
/* The event is sent without the splitmux lock held. */
2568 GST_SPLITMUX_UNLOCK (splitmux);
2569 gst_pad_send_event (ctx->sinkpad, event);
2570 GST_SPLITMUX_LOCK (splitmux);
2572 ctx->need_unblock = FALSE;
2573 GST_SPLITMUX_BROADCAST_INPUT (splitmux);
2574 /* state may have changed while we were unlocked. Loop again if so */
2575 if (splitmux->input_state != SPLITMUX_INPUT_STATE_WAITING_GOP_COLLECT)
2580 if (splitmux->input_state == SPLITMUX_INPUT_STATE_WAITING_GOP_COLLECT) {
2581 gboolean ready = TRUE;
2583 /* Iterate each pad, and check that the input running time is at least
2584 * up to the reference running time, and if so handle the collected GOP */
2585 GST_LOG_OBJECT (splitmux, "Checking GOP collected, Max in running time %"
2586 GST_STIME_FORMAT " ctx %p",
2587 GST_STIME_ARGS (splitmux->max_in_running_time), ctx);
2588 for (cur = g_list_first (splitmux->contexts); cur != NULL;
2589 cur = g_list_next (cur)) {
2590 MqStreamCtx *tmpctx = (MqStreamCtx *) (cur->data);
2592 GST_LOG_OBJECT (splitmux,
2593 "Context %p sink pad %" GST_PTR_FORMAT " @ TS %" GST_STIME_FORMAT
2594 " EOS %d", tmpctx, tmpctx->sinkpad,
2595 GST_STIME_ARGS (tmpctx->in_running_time), tmpctx->in_eos);
/* A context is behind when a valid max_in_running_time exists and its own
 * input running time is still below it (the last term of this condition is
 * on a line missing from this copy). */
2597 if (splitmux->max_in_running_time != GST_CLOCK_STIME_NONE &&
2598 tmpctx->in_running_time < splitmux->max_in_running_time &&
2600 GST_LOG_OBJECT (splitmux,
2601 "Context %p sink pad %" GST_PTR_FORMAT " not ready. We'll sleep",
2602 tmpctx, tmpctx->sinkpad);
2608 GST_DEBUG_OBJECT (splitmux,
2609 "Collected GOP is complete. Processing (ctx %p)", ctx);
2610 /* All pads have a complete GOP, release it into the multiqueue */
2611 handle_gathered_gop (splitmux);
2613 /* The user has requested a split, we can split now that the previous GOP
2614 * has been collected to the correct location */
2615 if (g_atomic_int_compare_and_exchange (&(splitmux->split_requested),
2617 g_atomic_int_set (&(splitmux->do_split_next_gop), TRUE);
2622 /* If upstream reached EOS we are not expecting more data, no need to wait
2627 if (splitmux->input_state == SPLITMUX_INPUT_STATE_WAITING_GOP_COLLECT &&
2629 (ctx->in_running_time >= splitmux->max_in_running_time) &&
2630 (splitmux->max_in_running_time != GST_CLOCK_STIME_NONE)) {
2631 /* Some pad is not yet ready, or GOP is being pushed
2632 * either way, sleep and wait to get woken */
2633 GST_LOG_OBJECT (splitmux, "Sleeping for GOP collection (ctx %p)", ctx);
2634 GST_SPLITMUX_WAIT_INPUT (splitmux);
2635 GST_LOG_OBJECT (splitmux, "Done waiting for complete GOP (ctx %p)", ctx);
2637 /* This pad is not ready or the state changed - break out and get another
2641 } while (splitmux->input_state == SPLITMUX_INPUT_STATE_WAITING_GOP_COLLECT);
/* handle_mq_input(): pad probe for data entering a stream's queue.
 *
 * Parameters:
 *   pad  - the pad the probe fired on.
 *   info - probe info; may carry a buffer, a downstream/flush event or a
 *          downstream query. For buffers the resulting flow return is
 *          stored in it before returning.
 *   ctx  - per-stream context owning this probe.
 *
 * Returns GST_PAD_PROBE_DROP for buffer lists (unsupported) and for
 * ALLOCATION queries; the other visible exits return GST_PAD_PROBE_PASS.
 *
 * Overview of the visible logic:
 *   events  - SEGMENT is copied into ctx->in_segment; FLUSH_STOP resets the
 *             segment, in_eos and in_running_time; the EOS handling runs
 *             check_completed_gop(); GAP may initialise the fragment/GOP
 *             start time from the reference stream.
 *   buffers - the buffer's PTS (else DTS) is converted to running time and
 *             advances ctx->in_running_time; the reference stream
 *             initialises fragment/GOP start times and timecodes and may
 *             request the next keyframe; then the input state machine
 *             decides whether to wait, to close the GOP via
 *             check_completed_gop(), or to let the buffer through. Finally
 *             the buffer's size is accounted and a MqStreamBuf record is
 *             queued on ctx->queued_bufs.
 *
 * NOTE(review): many short lines (declarations, case labels, break/goto,
 * labels, braces, the loop construct around the state switch) are missing
 * from this copy; comments added here describe visible statements only.
 */
2644 static GstPadProbeReturn
2645 handle_mq_input (GstPad * pad, GstPadProbeInfo * info, MqStreamCtx * ctx)
2647 GstSplitMuxSink *splitmux = ctx->splitmux;
2648 GstFlowReturn ret = GST_FLOW_OK;
2650 MqStreamBuf *buf_info = NULL;
2652 gboolean loop_again;
2653 gboolean keyframe = FALSE;
2655 GST_LOG_OBJECT (pad, "Fired probe type 0x%x", info->type);
2657 /* FIXME: Handle buffer lists, until then make it clear they won't work */
2658 if (info->type & GST_PAD_PROBE_TYPE_BUFFER_LIST) {
2659 g_warning ("Buffer list handling not implemented");
2660 return GST_PAD_PROBE_DROP;
/* ---- Events ---- */
2662 if (info->type & GST_PAD_PROBE_TYPE_EVENT_DOWNSTREAM ||
2663 info->type & GST_PAD_PROBE_TYPE_EVENT_FLUSH) {
2664 GstEvent *event = gst_pad_probe_info_get_event (info);
2666 GST_LOG_OBJECT (pad, "Event %" GST_PTR_FORMAT, event);
2668 switch (GST_EVENT_TYPE (event)) {
2669 case GST_EVENT_SEGMENT:
2670 gst_event_copy_segment (event, &ctx->in_segment);
2672 case GST_EVENT_FLUSH_STOP:
2673 GST_SPLITMUX_LOCK (splitmux);
2674 gst_segment_init (&ctx->in_segment, GST_FORMAT_UNDEFINED);
2675 ctx->in_eos = FALSE;
2676 ctx->in_running_time = GST_CLOCK_STIME_NONE;
2677 GST_SPLITMUX_UNLOCK (splitmux);
/* Presumably the EOS case (its `case` line is not visible; the log message
 * below mentions "Reference EOS"). */
2680 GST_SPLITMUX_LOCK (splitmux);
2683 if (splitmux->input_state == SPLITMUX_INPUT_STATE_STOPPED) {
2684 ret = GST_FLOW_FLUSHING;
2688 if (ctx->is_reference) {
2689 GST_INFO_OBJECT (splitmux, "Got Reference EOS. Finishing up");
2690 /* check_completed_gop will act as if this is a new keyframe with infinite timestamp */
2691 splitmux->input_state = SPLITMUX_INPUT_STATE_WAITING_GOP_COLLECT;
2692 /* Wake up other input pads to collect this GOP */
2693 GST_SPLITMUX_BROADCAST_INPUT (splitmux);
2694 check_completed_gop (splitmux, ctx);
2695 } else if (splitmux->input_state ==
2696 SPLITMUX_INPUT_STATE_WAITING_GOP_COLLECT) {
2697 /* If we are waiting for a GOP to be completed (ie, for aux
2698 * pads to catch up), then this pad is complete, so check
2699 * if the whole GOP is.
2701 check_completed_gop (splitmux, ctx);
2703 GST_SPLITMUX_UNLOCK (splitmux);
2705 case GST_EVENT_GAP:{
2706 GstClockTime gap_ts;
2707 GstClockTimeDiff rtime;
2709 gst_event_parse_gap (event, &gap_ts, NULL);
2710 if (gap_ts == GST_CLOCK_TIME_NONE)
2713 GST_SPLITMUX_LOCK (splitmux);
2715 if (splitmux->input_state == SPLITMUX_INPUT_STATE_STOPPED) {
2716 ret = GST_FLOW_FLUSHING;
2719 rtime = my_segment_to_running_time (&ctx->in_segment, gap_ts);
2721 GST_LOG_OBJECT (pad, "Have GAP w/ ts %" GST_STIME_FORMAT,
2722 GST_STIME_ARGS (rtime));
2724 if (ctx->is_reference
2725 && splitmux->fragment_start_time == GST_CLOCK_STIME_NONE) {
2726 splitmux->gop_start_time = splitmux->fragment_start_time = rtime;
2727 GST_LOG_OBJECT (splitmux, "Mux start time now %" GST_STIME_FORMAT,
2728 GST_STIME_ARGS (splitmux->fragment_start_time));
2729 /* Also take this as the first start time when starting up,
2730 * so that we start counting overflow from the first frame */
2731 if (!GST_CLOCK_STIME_IS_VALID (splitmux->max_in_running_time))
2732 splitmux->max_in_running_time = splitmux->fragment_start_time;
2735 GST_SPLITMUX_UNLOCK (splitmux);
2741 return GST_PAD_PROBE_PASS;
/* ---- Queries: ALLOCATION queries are dropped, everything else passes ---- */
2742 } else if (info->type & GST_PAD_PROBE_TYPE_QUERY_DOWNSTREAM) {
2743 switch (GST_QUERY_TYPE (GST_QUERY (info->data))) {
2744 case GST_QUERY_ALLOCATION:
2745 return GST_PAD_PROBE_DROP;
2747 return GST_PAD_PROBE_PASS;
/* ---- Buffers ---- */
2751 buf = gst_pad_probe_info_get_buffer (info);
2752 buf_info = mq_stream_buf_new ();
/* Prefer PTS; the DTS line below is presumably the fallback branch. */
2754 if (GST_BUFFER_PTS_IS_VALID (buf))
2755 ts = GST_BUFFER_PTS (buf);
2757 ts = GST_BUFFER_DTS (buf);
2759 GST_LOG_OBJECT (pad, "Buffer TS is %" GST_TIME_FORMAT, GST_TIME_ARGS (ts));
2761 GST_SPLITMUX_LOCK (splitmux);
2763 if (splitmux->input_state == SPLITMUX_INPUT_STATE_STOPPED) {
2764 ret = GST_FLOW_FLUSHING;
2768 /* If this buffer has a timestamp, advance the input timestamp of the
2770 if (GST_CLOCK_TIME_IS_VALID (ts)) {
2771 GstClockTimeDiff running_time =
2772 my_segment_to_running_time (&ctx->in_segment, ts);
2774 GST_LOG_OBJECT (pad, "Buffer running TS is %" GST_STIME_FORMAT,
2775 GST_STIME_ARGS (running_time));
2777 if (GST_CLOCK_STIME_IS_VALID (running_time)
2778 && running_time > ctx->in_running_time)
2779 ctx->in_running_time = running_time;
2782 /* Try to make sure we have a valid running time */
2783 if (!GST_CLOCK_STIME_IS_VALID (ctx->in_running_time)) {
2784 ctx->in_running_time =
2785 my_segment_to_running_time (&ctx->in_segment, ctx->in_segment.start);
2788 GST_LOG_OBJECT (pad, "in running time now %" GST_STIME_FORMAT,
2789 GST_STIME_ARGS (ctx->in_running_time));
/* Record what is needed about this buffer for later accounting. */
2791 buf_info->run_ts = ctx->in_running_time;
2792 buf_info->buf_size = gst_buffer_get_size (buf);
2793 buf_info->duration = GST_BUFFER_DURATION (buf);
/* Reference-stream bookkeeping: first-buffer start times, timecode
 * tracking and keyframe requests. */
2795 if (ctx->is_reference) {
2796 /* initialize fragment_start_time */
2797 if (splitmux->fragment_start_time == GST_CLOCK_STIME_NONE) {
2798 splitmux->gop_start_time = splitmux->fragment_start_time =
2800 GST_LOG_OBJECT (splitmux, "Mux start time now %" GST_STIME_FORMAT,
2801 GST_STIME_ARGS (splitmux->fragment_start_time));
2803 /* Also take this as the first start time when starting up,
2804 * so that we start counting overflow from the first frame */
2805 if (!GST_CLOCK_STIME_IS_VALID (splitmux->max_in_running_time))
2806 splitmux->max_in_running_time = splitmux->fragment_start_time;
2809 if (splitmux->tc_interval) {
2810 GstVideoTimeCodeMeta *tc_meta = gst_buffer_get_video_time_code_meta (buf);
2812 video_time_code_replace (&splitmux->in_tc, &tc_meta->tc);
2814 if (!splitmux->fragment_start_tc) {
2815 /* also initialize fragment_start_tc */
2816 video_time_code_replace (&splitmux->gop_start_tc, &tc_meta->tc);
2817 video_time_code_replace (&splitmux->fragment_start_tc, &tc_meta->tc);
2819 splitmux->next_fragment_start_tc_time =
2820 calculate_next_max_timecode (splitmux, splitmux->in_tc,
2821 ctx->in_running_time, NULL);
2822 GST_DEBUG_OBJECT (splitmux, "Initialize next fragment start tc time %"
2824 GST_TIME_ARGS (splitmux->next_fragment_start_tc_time));
2829 /* Check whether we need to request next keyframe depending on
2830 * current running time */
2831 if (!GST_BUFFER_FLAG_IS_SET (buf, GST_BUFFER_FLAG_DELTA_UNIT) &&
2832 request_next_keyframe (splitmux, buf, ctx->in_running_time) == FALSE) {
2833 GST_WARNING_OBJECT (splitmux,
2834 "Could not request a keyframe. Files may not split at the exact location they should");
2838 GST_DEBUG_OBJECT (pad, "Buf TS %" GST_STIME_FORMAT
2839 " total GOP bytes %" G_GUINT64_FORMAT,
2840 GST_STIME_ARGS (buf_info->run_ts), splitmux->gop_total_bytes);
/* Input state machine (presumably inside a loop driven by loop_again; the
 * loop lines are not visible). */
2847 switch (splitmux->input_state) {
2848 case SPLITMUX_INPUT_STATE_COLLECTING_GOP_START:
2849 if (ctx->is_releasing) {
2850 /* The pad belonging to this context is being released */
2851 GST_WARNING_OBJECT (pad, "Pad is being released while the muxer is "
2852 "running. Data might not drain correctly");
2854 } else if (ctx->is_reference) {
2855 /* This is the reference context. If it's a keyframe,
2856 * it marks the start of a new GOP and we should wait in
2857 * check_completed_gop before continuing, but either way
2858 * (keyframe or no, we'll pass this buffer through after
2859 * so set loop_again to FALSE */
2862 if (GST_BUFFER_FLAG_IS_SET (buf, GST_BUFFER_FLAG_DELTA_UNIT)) {
2863 /* Allow other input pads to catch up to here too */
2864 if (ctx->in_running_time > splitmux->max_in_running_time)
2865 splitmux->max_in_running_time = ctx->in_running_time;
2866 GST_LOG_OBJECT (splitmux,
2867 "Max in running time now %" GST_TIME_FORMAT,
2868 GST_TIME_ARGS (splitmux->max_in_running_time));
2869 GST_SPLITMUX_BROADCAST_INPUT (splitmux);
2872 GST_INFO_OBJECT (pad,
2873 "Have keyframe with running time %" GST_STIME_FORMAT,
2874 GST_STIME_ARGS (ctx->in_running_time));
2876 splitmux->input_state = SPLITMUX_INPUT_STATE_WAITING_GOP_COLLECT;
2877 if (ctx->in_running_time > splitmux->max_in_running_time)
2878 splitmux->max_in_running_time = ctx->in_running_time;
2879 GST_LOG_OBJECT (splitmux, "Max in running time now %" GST_TIME_FORMAT,
2880 GST_TIME_ARGS (splitmux->max_in_running_time));
2881 /* Wake up other input pads to collect this GOP */
2882 GST_SPLITMUX_BROADCAST_INPUT (splitmux);
2883 check_completed_gop (splitmux, ctx);
2885 /* Pass this buffer if the reference ctx is far enough ahead */
2886 if (ctx->in_running_time < splitmux->max_in_running_time) {
2891 /* We're still waiting for a keyframe on the reference pad, sleep */
2892 GST_LOG_OBJECT (pad, "Sleeping for GOP start");
2893 GST_SPLITMUX_WAIT_INPUT (splitmux);
2894 GST_LOG_OBJECT (pad,
2895 "Done sleeping for GOP start input state now %d",
2896 splitmux->input_state);
2899 case SPLITMUX_INPUT_STATE_WAITING_GOP_COLLECT:{
2900 /* We're collecting a GOP. If this is the reference context,
2901 * we need to check if this is a keyframe that marks the start
2902 * of the next GOP. If it is, it marks the end of the GOP we're
2903 * collecting, so sleep and wait until all the other pads also
2904 * reach that timestamp - at which point, we have an entire GOP
2905 * and either go to ENDING_FILE or release this GOP to the muxer and
2906 * go back to COLLECT_GOP_START. */
2908 /* If we overran the target timestamp, it might be time to process
2909 * the GOP, otherwise bail out for more data
2911 GST_LOG_OBJECT (pad,
2912 "Checking TS %" GST_STIME_FORMAT " against max %"
2913 GST_STIME_FORMAT, GST_STIME_ARGS (ctx->in_running_time),
2914 GST_STIME_ARGS (splitmux->max_in_running_time));
2916 if (ctx->in_running_time < splitmux->max_in_running_time) {
2921 GST_LOG_OBJECT (pad,
2922 "Collected last packet of GOP. Checking other pads");
2923 check_completed_gop (splitmux, ctx);
2926 case SPLITMUX_INPUT_STATE_FINISHING_UP:
2936 if (keyframe && ctx->is_reference)
2937 splitmux->queued_keyframes++;
2938 buf_info->keyframe = keyframe;
2940 /* Update total input byte counter for overflow detect */
2941 splitmux->gop_total_bytes += buf_info->buf_size;
2942 if (ctx->is_reference) {
2943 splitmux->gop_reference_bytes += buf_info->buf_size;
2946 /* Now add this buffer to the queue just before returning */
2947 g_queue_push_head (&ctx->queued_bufs, buf_info);
2949 GST_LOG_OBJECT (pad, "Returning to queue buffer %" GST_PTR_FORMAT
2950 " run ts %" GST_STIME_FORMAT, buf, GST_STIME_ARGS (ctx->in_running_time));
2952 GST_SPLITMUX_UNLOCK (splitmux);
2953 GST_PAD_PROBE_INFO_FLOW_RETURN (info) = ret;
2954 return GST_PAD_PROBE_PASS;
/* Early-exit tail (its label line is not visible): unlock, discard the
 * unused buffer record and hand `ret` back through the probe info. */
2957 GST_SPLITMUX_UNLOCK (splitmux);
2959 mq_stream_buf_free (buf_info);
2960 GST_PAD_PROBE_INFO_FLOW_RETURN (info) = ret;
2961 return GST_PAD_PROBE_PASS;
2965 grow_blocked_queues (GstSplitMuxSink * splitmux)
2969 /* Scan other queues for full-ness and grow them */
2970 for (cur = g_list_first (splitmux->contexts);
2971 cur != NULL; cur = g_list_next (cur)) {
2972 MqStreamCtx *tmpctx = (MqStreamCtx *) (cur->data);
2974 guint cur_len = g_queue_get_length (&tmpctx->queued_bufs);
2976 g_object_get (tmpctx->q, "max-size-buffers", &cur_limit, NULL);
2977 GST_LOG_OBJECT (tmpctx->q, "Queue len %u", cur_len);
2979 if (cur_len >= cur_limit) {
2980 cur_limit = cur_len + 1;
2981 GST_DEBUG_OBJECT (tmpctx->q,
2982 "Queue overflowed and needs enlarging. Growing to %u buffers",
2984 g_object_set (tmpctx->q, "max-size-buffers", cur_limit, NULL);
2990 handle_q_underrun (GstElement * q, gpointer user_data)
2992 MqStreamCtx *ctx = (MqStreamCtx *) (user_data);
2993 GstSplitMuxSink *splitmux = ctx->splitmux;
2995 GST_SPLITMUX_LOCK (splitmux);
2996 GST_DEBUG_OBJECT (q,
2997 "Queue reported underrun with %d keyframes and %d cmds enqueued",
2998 splitmux->queued_keyframes, g_queue_get_length (&splitmux->out_cmd_q));
2999 grow_blocked_queues (splitmux);
3000 GST_SPLITMUX_UNLOCK (splitmux);
3004 handle_q_overrun (GstElement * q, gpointer user_data)
3006 MqStreamCtx *ctx = (MqStreamCtx *) (user_data);
3007 GstSplitMuxSink *splitmux = ctx->splitmux;
3008 gboolean allow_grow = FALSE;
3010 GST_SPLITMUX_LOCK (splitmux);
3011 GST_DEBUG_OBJECT (q,
3012 "Queue reported overrun with %d keyframes and %d cmds enqueued",
3013 splitmux->queued_keyframes, g_queue_get_length (&splitmux->out_cmd_q));
3015 if (splitmux->queued_keyframes < 2) {
3016 /* Less than a full GOP queued, grow the queue */
3018 } else if (g_queue_get_length (&splitmux->out_cmd_q) < 1) {
3021 /* If another queue is starved, grow */
3023 for (cur = g_list_first (splitmux->contexts);
3024 cur != NULL; cur = g_list_next (cur)) {
3025 MqStreamCtx *tmpctx = (MqStreamCtx *) (cur->data);
3026 if (tmpctx != ctx && g_queue_get_length (&tmpctx->queued_bufs) < 1) {
3031 GST_SPLITMUX_UNLOCK (splitmux);
3036 g_object_get (q, "max-size-buffers", &cur_limit, NULL);
3039 GST_DEBUG_OBJECT (q,
3040 "Queue overflowed and needs enlarging. Growing to %u buffers",
3043 g_object_set (q, "max-size-buffers", cur_limit, NULL);
3047 /* Called with SPLITMUX lock held */
3048 static const gchar *
3049 lookup_muxer_pad (GstSplitMuxSink * splitmux, const gchar * sinkpad_name)
3051 const gchar *ret = NULL;
3053 if (splitmux->muxerpad_map == NULL)
3056 if (sinkpad_name == NULL) {
3057 GST_WARNING_OBJECT (splitmux,
3058 "Can't look up request pad in pad map without providing a pad name");
3062 ret = gst_structure_get_string (splitmux->muxerpad_map, sinkpad_name);
3064 GST_INFO_OBJECT (splitmux, "Sink pad %s maps to muxer pad %s", sinkpad_name,
3066 return g_strdup (ret);
3073 gst_splitmux_sink_request_new_pad (GstElement * element,
3074 GstPadTemplate * templ, const gchar * name, const GstCaps * caps)
3076 GstSplitMuxSink *splitmux = (GstSplitMuxSink *) element;
3077 GstPadTemplate *mux_template = NULL;
3078 GstPad *ret = NULL, *muxpad = NULL;
3080 GstPad *q_sink = NULL, *q_src = NULL;
3081 gchar *gname, *qname;
3082 gboolean is_primary_video = FALSE, is_video = FALSE,
3083 muxer_is_requestpad = FALSE;
3085 const gchar *muxer_padname = NULL;
3087 GST_DEBUG_OBJECT (splitmux, "templ:%s, name:%s", templ->name_template, name);
3089 GST_SPLITMUX_LOCK (splitmux);
3090 if (!create_muxer (splitmux))
3092 g_signal_emit (splitmux, signals[SIGNAL_MUXER_ADDED], 0, splitmux->muxer);
3094 if (g_str_equal (templ->name_template, "video") ||
3095 g_str_has_prefix (templ->name_template, "video_aux_")) {
3096 is_primary_video = g_str_equal (templ->name_template, "video");
3097 if (is_primary_video && splitmux->have_video)
3098 goto already_have_video;
3102 /* See if there's a pad map and it lists this pad */
3103 muxer_padname = lookup_muxer_pad (splitmux, name);
3105 if (muxer_padname == NULL) {
3107 /* FIXME: Look for a pad template with matching caps, rather than by name */
3108 GST_DEBUG_OBJECT (element,
3109 "searching for pad-template with name 'video_%%u'");
3111 gst_element_class_get_pad_template (GST_ELEMENT_GET_CLASS
3112 (splitmux->muxer), "video_%u");
3114 /* Fallback to find sink pad templates named 'video' (flvmux) */
3115 if (!mux_template) {
3116 GST_DEBUG_OBJECT (element,
3117 "searching for pad-template with name 'video'");
3119 gst_element_class_get_pad_template (GST_ELEMENT_GET_CLASS
3120 (splitmux->muxer), "video");
3124 GST_DEBUG_OBJECT (element, "searching for pad-template with name '%s'",
3125 templ->name_template);
3127 gst_element_class_get_pad_template (GST_ELEMENT_GET_CLASS
3128 (splitmux->muxer), templ->name_template);
3130 /* Fallback to find sink pad templates named 'audio' (flvmux) */
3131 if (!mux_template && g_str_has_prefix (templ->name_template, "audio_")) {
3132 GST_DEBUG_OBJECT (element,
3133 "searching for pad-template with name 'audio'");
3135 gst_element_class_get_pad_template (GST_ELEMENT_GET_CLASS
3136 (splitmux->muxer), "audio");
3141 if (mux_template == NULL) {
3142 GST_DEBUG_OBJECT (element,
3143 "searching for pad-template with name 'sink_%%d'");
3145 gst_element_class_get_pad_template (GST_ELEMENT_GET_CLASS
3146 (splitmux->muxer), "sink_%d");
3149 if (mux_template == NULL) {
3150 GST_DEBUG_OBJECT (element, "searching for pad-template with name 'sink'");
3152 gst_element_class_get_pad_template (GST_ELEMENT_GET_CLASS
3153 (splitmux->muxer), "sink");
3157 if (mux_template == NULL) {
3158 GST_ERROR_OBJECT (element,
3159 "unable to find a suitable sink pad-template on the muxer");
3162 GST_DEBUG_OBJECT (element, "found sink pad-template '%s' on the muxer",
3163 mux_template->name_template);
3165 if (mux_template->presence == GST_PAD_REQUEST) {
3166 GST_DEBUG_OBJECT (element, "requesting pad from pad-template");
3169 gst_element_request_pad (splitmux->muxer, mux_template, name, caps);
3170 muxer_is_requestpad = TRUE;
3171 } else if (mux_template->presence == GST_PAD_ALWAYS) {
3172 GST_DEBUG_OBJECT (element, "accessing always pad from pad-template");
3175 gst_element_get_static_pad (splitmux->muxer,
3176 mux_template->name_template);
3178 GST_ERROR_OBJECT (element,
3179 "unexpected pad presence %d", mux_template->presence);
3183 /* Have a muxer pad name */
3184 if (!(muxpad = gst_element_get_static_pad (splitmux->muxer, muxer_padname))) {
3186 gst_element_request_pad_simple (splitmux->muxer, muxer_padname)))
3187 muxer_is_requestpad = TRUE;
3189 g_free ((gchar *) muxer_padname);
3190 muxer_padname = NULL;
3193 /* One way or another, we must have a muxer pad by now */
3197 if (is_primary_video)
3198 gname = g_strdup ("video");
3199 else if (name == NULL)
3200 gname = gst_pad_get_name (muxpad);
3202 gname = g_strdup (name);
3204 qname = g_strdup_printf ("queue_%s", gname);
3205 if ((q = create_element (splitmux, "queue", qname, FALSE)) == NULL) {
3211 gst_element_set_state (q, GST_STATE_TARGET (splitmux));
3213 g_object_set (q, "max-size-bytes", 0, "max-size-time", (guint64) (0),
3214 "max-size-buffers", 5, NULL);
3216 q_sink = gst_element_get_static_pad (q, "sink");
3217 q_src = gst_element_get_static_pad (q, "src");
3219 if (gst_pad_link (q_src, muxpad) != GST_PAD_LINK_OK) {
3220 if (muxer_is_requestpad)
3221 gst_element_release_request_pad (splitmux->muxer, muxpad);
3222 gst_object_unref (GST_OBJECT (muxpad));
3226 gst_object_unref (GST_OBJECT (muxpad));
3228 ctx = mq_stream_ctx_new (splitmux);
3229 /* Context holds a ref: */
3230 ctx->q = gst_object_ref (q);
3231 ctx->srcpad = q_src;
3232 ctx->sinkpad = q_sink;
3234 g_signal_connect (q, "overrun", (GCallback) handle_q_overrun, ctx);
3235 g_signal_connect (q, "underrun", (GCallback) handle_q_underrun, ctx);
3237 ctx->src_pad_block_id =
3238 gst_pad_add_probe (q_src,
3239 GST_PAD_PROBE_TYPE_DATA_DOWNSTREAM | GST_PAD_PROBE_TYPE_EVENT_FLUSH,
3240 (GstPadProbeCallback) handle_mq_output, ctx, NULL);
3241 if (is_primary_video && splitmux->reference_ctx != NULL) {
3242 splitmux->reference_ctx->is_reference = FALSE;
3243 splitmux->reference_ctx = NULL;
3245 if (splitmux->reference_ctx == NULL) {
3246 splitmux->reference_ctx = ctx;
3247 ctx->is_reference = TRUE;
3250 ret = gst_ghost_pad_new_from_template (gname, q_sink, templ);
3251 g_object_set_qdata ((GObject *) (ret), PAD_CONTEXT, ctx);
3253 ctx->sink_pad_block_id =
3254 gst_pad_add_probe (q_sink,
3255 GST_PAD_PROBE_TYPE_DATA_DOWNSTREAM | GST_PAD_PROBE_TYPE_EVENT_FLUSH |
3256 GST_PAD_PROBE_TYPE_QUERY_DOWNSTREAM,
3257 (GstPadProbeCallback) handle_mq_input, ctx, NULL);
3259 GST_DEBUG_OBJECT (splitmux, "splitmuxsink pad %" GST_PTR_FORMAT
3260 " feeds queue pad %" GST_PTR_FORMAT, ret, q_sink);
3262 splitmux->contexts = g_list_append (splitmux->contexts, ctx);
3266 if (is_primary_video)
3267 splitmux->have_video = TRUE;
3269 gst_pad_set_active (ret, TRUE);
3270 gst_element_add_pad (GST_ELEMENT (splitmux), ret);
3272 GST_SPLITMUX_UNLOCK (splitmux);
3276 GST_SPLITMUX_UNLOCK (splitmux);
3279 gst_object_unref (q_sink);
3281 gst_object_unref (q_src);
3284 GST_DEBUG_OBJECT (splitmux, "video sink pad already requested");
3285 GST_SPLITMUX_UNLOCK (splitmux);
3290 gst_splitmux_sink_release_pad (GstElement * element, GstPad * pad)
3292 GstSplitMuxSink *splitmux = (GstSplitMuxSink *) element;
3293 GstPad *muxpad = NULL;
3295 (MqStreamCtx *) (g_object_get_qdata ((GObject *) (pad), PAD_CONTEXT));
3297 GST_SPLITMUX_LOCK (splitmux);
3299 if (splitmux->muxer == NULL)
3300 goto fail; /* Elements don't exist yet - nothing to release */
3302 GST_INFO_OBJECT (pad, "releasing request pad");
3304 muxpad = gst_pad_get_peer (ctx->srcpad);
3306 /* Remove the context from our consideration */
3307 splitmux->contexts = g_list_remove (splitmux->contexts, ctx);
3309 GST_SPLITMUX_UNLOCK (splitmux);
3311 if (ctx->sink_pad_block_id) {
3312 gst_pad_remove_probe (ctx->sinkpad, ctx->sink_pad_block_id);
3313 gst_pad_send_event (ctx->sinkpad, gst_event_new_flush_start ());
3316 if (ctx->src_pad_block_id)
3317 gst_pad_remove_probe (ctx->srcpad, ctx->src_pad_block_id);
3319 GST_SPLITMUX_LOCK (splitmux);
3321 ctx->is_releasing = TRUE;
3322 GST_SPLITMUX_BROADCAST_INPUT (splitmux);
3324 /* Can release the context now */
3325 mq_stream_ctx_free (ctx);
3326 if (ctx == splitmux->reference_ctx)
3327 splitmux->reference_ctx = NULL;
3329 /* Release and free the muxer input */
3331 gst_element_release_request_pad (splitmux->muxer, muxpad);
3332 gst_object_unref (muxpad);
3335 if (GST_PAD_PAD_TEMPLATE (pad) &&
3336 g_str_equal (GST_PAD_TEMPLATE_NAME_TEMPLATE (GST_PAD_PAD_TEMPLATE
3338 splitmux->have_video = FALSE;
3340 gst_element_remove_pad (element, pad);
3342 /* Reset the internal elements only after all request pads are released */
3343 if (splitmux->contexts == NULL)
3344 gst_splitmux_reset_elements (splitmux);
3346 /* Wake up other input streams to check if the completion conditions have
3348 GST_SPLITMUX_BROADCAST_INPUT (splitmux);
3351 GST_SPLITMUX_UNLOCK (splitmux);
3355 create_element (GstSplitMuxSink * splitmux,
3356 const gchar * factory, const gchar * name, gboolean locked)
3358 GstElement *ret = gst_element_factory_make (factory, name);
3360 g_warning ("Failed to create %s - splitmuxsink will not work", name);
3365 /* Ensure the sink starts in locked state and NULL - it will be changed
3366 * by the filename setting code */
3367 gst_element_set_locked_state (ret, TRUE);
3368 gst_element_set_state (ret, GST_STATE_NULL);
3371 if (!gst_bin_add (GST_BIN (splitmux), ret)) {
3372 g_warning ("Could not add %s element - splitmuxsink will not work", name);
3373 gst_object_unref (ret);
3381 create_muxer (GstSplitMuxSink * splitmux)
3383 /* Create internal elements */
3384 if (splitmux->muxer == NULL) {
3385 GstElement *provided_muxer = NULL;
3387 GST_OBJECT_LOCK (splitmux);
3388 if (splitmux->provided_muxer != NULL)
3389 provided_muxer = gst_object_ref (splitmux->provided_muxer);
3390 GST_OBJECT_UNLOCK (splitmux);
3392 if ((!splitmux->async_finalize && provided_muxer == NULL) ||
3393 (splitmux->async_finalize && splitmux->muxer_factory == NULL)) {
3394 if ((splitmux->muxer =
3395 create_element (splitmux,
3396 splitmux->muxer_factory ? splitmux->
3397 muxer_factory : DEFAULT_MUXER, "muxer", FALSE)) == NULL)
3399 } else if (splitmux->async_finalize) {
3400 if ((splitmux->muxer =
3401 create_element (splitmux, splitmux->muxer_factory, "muxer",
3404 if (splitmux->muxer_preset && GST_IS_PRESET (splitmux->muxer))
3405 gst_preset_load_preset (GST_PRESET (splitmux->muxer),
3406 splitmux->muxer_preset);
3407 if (splitmux->muxer_properties)
3408 gst_structure_foreach (splitmux->muxer_properties,
3409 _set_property_from_structure, splitmux->muxer);
3411 /* Ensure it's not in locked state (we might be reusing an old element) */
3412 gst_element_set_locked_state (provided_muxer, FALSE);
3413 if (!gst_bin_add (GST_BIN (splitmux), provided_muxer)) {
3414 g_warning ("Could not add muxer element - splitmuxsink will not work");
3415 gst_object_unref (provided_muxer);
3419 splitmux->muxer = provided_muxer;
3420 gst_object_unref (provided_muxer);
3423 if (splitmux->use_robust_muxing) {
3424 update_muxer_properties (splitmux);
3434 find_sink (GstElement * e)
3436 GstElement *res = NULL;
3438 gboolean done = FALSE;
3439 GValue data = { 0, };
3441 if (!GST_IS_BIN (e))
3444 if (g_object_class_find_property (G_OBJECT_GET_CLASS (e), "location") != NULL)
3447 iter = gst_bin_iterate_sinks (GST_BIN (e));
3449 switch (gst_iterator_next (iter, &data)) {
3450 case GST_ITERATOR_OK:
3452 GstElement *child = g_value_get_object (&data);
3453 if (g_object_class_find_property (G_OBJECT_GET_CLASS (child),
3454 "location") != NULL) {
3458 g_value_reset (&data);
3461 case GST_ITERATOR_RESYNC:
3462 gst_iterator_resync (iter);
3464 case GST_ITERATOR_DONE:
3467 case GST_ITERATOR_ERROR:
3468 g_assert_not_reached ();
3472 g_value_unset (&data);
3473 gst_iterator_free (iter);
3479 create_sink (GstSplitMuxSink * splitmux)
3481 GstElement *provided_sink = NULL;
3483 if (splitmux->active_sink == NULL) {
3485 GST_OBJECT_LOCK (splitmux);
3486 if (splitmux->provided_sink != NULL)
3487 provided_sink = gst_object_ref (splitmux->provided_sink);
3488 GST_OBJECT_UNLOCK (splitmux);
3490 if ((!splitmux->async_finalize && provided_sink == NULL) ||
3491 (splitmux->async_finalize && splitmux->sink_factory == NULL)) {
3492 if ((splitmux->sink =
3493 create_element (splitmux, DEFAULT_SINK, "sink", TRUE)) == NULL)
3495 splitmux->active_sink = splitmux->sink;
3496 } else if (splitmux->async_finalize) {
3497 if ((splitmux->sink =
3498 create_element (splitmux, splitmux->sink_factory, "sink",
3501 if (splitmux->sink_preset && GST_IS_PRESET (splitmux->sink))
3502 gst_preset_load_preset (GST_PRESET (splitmux->sink),
3503 splitmux->sink_preset);
3504 if (splitmux->sink_properties)
3505 gst_structure_foreach (splitmux->sink_properties,
3506 _set_property_from_structure, splitmux->sink);
3507 splitmux->active_sink = splitmux->sink;
3509 /* Ensure the sink starts in locked state and NULL - it will be changed
3510 * by the filename setting code */
3511 gst_element_set_locked_state (provided_sink, TRUE);
3512 gst_element_set_state (provided_sink, GST_STATE_NULL);
3513 if (!gst_bin_add (GST_BIN (splitmux), provided_sink)) {
3514 g_warning ("Could not add sink elements - splitmuxsink will not work");
3515 gst_object_unref (provided_sink);
3519 splitmux->active_sink = provided_sink;
3521 /* The bin holds a ref now, we can drop our tmp ref */
3522 gst_object_unref (provided_sink);
3524 /* Find the sink element */
3525 splitmux->sink = find_sink (splitmux->active_sink);
3526 if (splitmux->sink == NULL) {
3528 ("Could not locate sink element in provided sink - splitmuxsink will not work");
3534 if (g_object_class_find_property (G_OBJECT_GET_CLASS (splitmux->sink),
3536 /* async child elements are causing state change races and weird
3537 * failures, so let's try and turn that off */
3538 g_object_set (splitmux->sink, "async", FALSE, NULL);
3542 if (!gst_element_link (splitmux->muxer, splitmux->active_sink)) {
3543 g_warning ("Failed to link muxer and sink- splitmuxsink will not work");
3554 #pragma GCC diagnostic ignored "-Wformat-nonliteral"
3557 set_next_filename (GstSplitMuxSink * splitmux, MqStreamCtx * ctx)
3559 gchar *fname = NULL;
3563 gst_splitmux_sink_ensure_max_files (splitmux);
3565 if (ctx->cur_out_buffer == NULL) {
3566 GST_WARNING_OBJECT (splitmux, "Starting next file without buffer");
3569 caps = gst_pad_get_current_caps (ctx->srcpad);
3570 sample = gst_sample_new (ctx->cur_out_buffer, caps, &ctx->out_segment, NULL);
3571 g_signal_emit (splitmux, signals[SIGNAL_FORMAT_LOCATION_FULL], 0,
3572 splitmux->fragment_id, sample, &fname);
3573 gst_sample_unref (sample);
3575 gst_caps_unref (caps);
3577 if (fname == NULL) {
3578 /* Fallback to the old signal if the new one returned nothing */
3579 g_signal_emit (splitmux, signals[SIGNAL_FORMAT_LOCATION], 0,
3580 splitmux->fragment_id, &fname);
3584 fname = splitmux->location ?
3585 g_strdup_printf (splitmux->location, splitmux->fragment_id) : NULL;
3588 GST_INFO_OBJECT (splitmux, "Setting file to %s", fname);
3589 if (g_object_class_find_property (G_OBJECT_GET_CLASS (splitmux->sink),
3590 "location") != NULL)
3591 g_object_set (splitmux->sink, "location", fname, NULL);
3595 splitmux->fragment_id++;
3598 /* called with GST_SPLITMUX_LOCK */
3600 do_async_start (GstSplitMuxSink * splitmux)
3602 GstMessage *message;
3604 if (!splitmux->need_async_start) {
3605 GST_INFO_OBJECT (splitmux, "no async_start needed");
3609 splitmux->async_pending = TRUE;
3611 GST_INFO_OBJECT (splitmux, "Sending async_start message");
3612 message = gst_message_new_async_start (GST_OBJECT_CAST (splitmux));
3614 GST_SPLITMUX_UNLOCK (splitmux);
3615 GST_BIN_CLASS (parent_class)->handle_message (GST_BIN_CAST
3616 (splitmux), message);
3617 GST_SPLITMUX_LOCK (splitmux);
3620 /* called with GST_SPLITMUX_LOCK */
3622 do_async_done (GstSplitMuxSink * splitmux)
3624 GstMessage *message;
3626 if (splitmux->async_pending) {
3627 GST_INFO_OBJECT (splitmux, "Sending async_done message");
3628 splitmux->async_pending = FALSE;
3629 GST_SPLITMUX_UNLOCK (splitmux);
3632 gst_message_new_async_done (GST_OBJECT_CAST (splitmux),
3633 GST_CLOCK_TIME_NONE);
3634 GST_BIN_CLASS (parent_class)->handle_message (GST_BIN_CAST
3635 (splitmux), message);
3636 GST_SPLITMUX_LOCK (splitmux);
3639 splitmux->need_async_start = FALSE;
3643 gst_splitmux_sink_reset (GstSplitMuxSink * splitmux)
3645 splitmux->max_in_running_time = GST_CLOCK_STIME_NONE;
3646 splitmux->gop_start_time = splitmux->fragment_start_time =
3647 GST_CLOCK_STIME_NONE;
3648 splitmux->max_out_running_time = 0;
3649 splitmux->fragment_total_bytes = 0;
3650 splitmux->fragment_reference_bytes = 0;
3651 splitmux->gop_total_bytes = 0;
3652 splitmux->gop_reference_bytes = 0;
3653 splitmux->muxed_out_bytes = 0;
3654 splitmux->ready_for_output = FALSE;
3656 g_atomic_int_set (&(splitmux->split_requested), FALSE);
3657 g_atomic_int_set (&(splitmux->do_split_next_gop), FALSE);
3659 splitmux->next_fku_time = GST_CLOCK_TIME_NONE;
3660 gst_queue_array_clear (splitmux->times_to_split);
3662 g_list_foreach (splitmux->contexts, (GFunc) mq_stream_ctx_reset, NULL);
3663 splitmux->queued_keyframes = 0;
3665 g_queue_foreach (&splitmux->out_cmd_q, (GFunc) out_cmd_buf_free, NULL);
3666 g_queue_clear (&splitmux->out_cmd_q);
3669 static GstStateChangeReturn
3670 gst_splitmux_sink_change_state (GstElement * element, GstStateChange transition)
3672 GstStateChangeReturn ret;
3673 GstSplitMuxSink *splitmux = (GstSplitMuxSink *) element;
3675 switch (transition) {
3676 case GST_STATE_CHANGE_NULL_TO_READY:{
3677 GST_SPLITMUX_LOCK (splitmux);
3678 if (!create_muxer (splitmux) || !create_sink (splitmux)) {
3679 ret = GST_STATE_CHANGE_FAILURE;
3680 GST_SPLITMUX_UNLOCK (splitmux);
3683 g_signal_emit (splitmux, signals[SIGNAL_MUXER_ADDED], 0, splitmux->muxer);
3684 g_signal_emit (splitmux, signals[SIGNAL_SINK_ADDED], 0, splitmux->sink);
3685 GST_SPLITMUX_UNLOCK (splitmux);
3686 splitmux->fragment_id = splitmux->start_index;
3689 case GST_STATE_CHANGE_READY_TO_PAUSED:{
3690 GST_SPLITMUX_LOCK (splitmux);
3691 /* Make sure contexts and tracking times are cleared, in case we're being reused */
3692 gst_splitmux_sink_reset (splitmux);
3693 /* Start by collecting one input on each pad */
3694 splitmux->input_state = SPLITMUX_INPUT_STATE_COLLECTING_GOP_START;
3695 splitmux->output_state = SPLITMUX_OUTPUT_STATE_START_NEXT_FILE;
3697 GST_SPLITMUX_UNLOCK (splitmux);
3699 GST_SPLITMUX_STATE_LOCK (splitmux);
3700 splitmux->shutdown = FALSE;
3701 GST_SPLITMUX_STATE_UNLOCK (splitmux);
3704 case GST_STATE_CHANGE_PAUSED_TO_READY:
3705 case GST_STATE_CHANGE_READY_TO_READY:
3706 g_atomic_int_set (&(splitmux->split_requested), FALSE);
3707 g_atomic_int_set (&(splitmux->do_split_next_gop), FALSE);
3709 case GST_STATE_CHANGE_READY_TO_NULL:
3710 GST_SPLITMUX_STATE_LOCK (splitmux);
3711 splitmux->shutdown = TRUE;
3712 GST_SPLITMUX_STATE_UNLOCK (splitmux);
3714 GST_SPLITMUX_LOCK (splitmux);
3715 gst_splitmux_sink_reset (splitmux);
3716 splitmux->output_state = SPLITMUX_OUTPUT_STATE_STOPPED;
3717 splitmux->input_state = SPLITMUX_INPUT_STATE_STOPPED;
3718 /* Wake up any blocked threads */
3719 GST_LOG_OBJECT (splitmux,
3720 "State change -> NULL or READY. Waking threads");
3721 GST_SPLITMUX_BROADCAST_INPUT (splitmux);
3722 GST_SPLITMUX_BROADCAST_OUTPUT (splitmux);
3723 GST_SPLITMUX_UNLOCK (splitmux);
3729 ret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
3730 if (ret == GST_STATE_CHANGE_FAILURE)
3733 switch (transition) {
3734 case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
3735 splitmux->need_async_start = TRUE;
3737 case GST_STATE_CHANGE_READY_TO_PAUSED:{
3738 /* Change state async, because our child sink might not
3739 * be ready to do that for us yet if it's state is still locked */
3741 splitmux->need_async_start = TRUE;
3742 /* we want to go async to PAUSED until we managed to configure and add the
3744 GST_SPLITMUX_LOCK (splitmux);
3745 do_async_start (splitmux);
3746 GST_SPLITMUX_UNLOCK (splitmux);
3747 ret = GST_STATE_CHANGE_ASYNC;
3750 case GST_STATE_CHANGE_READY_TO_NULL:
3751 GST_SPLITMUX_LOCK (splitmux);
3752 splitmux->fragment_id = 0;
3753 /* Reset internal elements only if no pad contexts are using them */
3754 if (splitmux->contexts == NULL)
3755 gst_splitmux_reset_elements (splitmux);
3756 do_async_done (splitmux);
3757 GST_SPLITMUX_UNLOCK (splitmux);
3766 if (transition == GST_STATE_CHANGE_NULL_TO_READY) {
3767 /* Cleanup elements on failed transition out of NULL */
3768 gst_splitmux_reset_elements (splitmux);
3769 GST_SPLITMUX_LOCK (splitmux);
3770 do_async_done (splitmux);
3771 GST_SPLITMUX_UNLOCK (splitmux);
3773 if (transition == GST_STATE_CHANGE_READY_TO_READY) {
3774 /* READY to READY transition only happens when we're already
3775 * in READY state, but a child element is in NULL, which
3776 * happens when there's an error changing the state of the sink.
3777 * We need to make sure not to fail the state transition, or
3778 * the core won't transition us back to NULL successfully */
3779 ret = GST_STATE_CHANGE_SUCCESS;
3785 gst_splitmux_sink_ensure_max_files (GstSplitMuxSink * splitmux)
3787 if (splitmux->max_files && splitmux->fragment_id >= splitmux->max_files) {
3788 splitmux->fragment_id = 0;
3793 split_now (GstSplitMuxSink * splitmux)
3795 g_atomic_int_set (&(splitmux->do_split_next_gop), TRUE);
3799 split_after (GstSplitMuxSink * splitmux)
3801 g_atomic_int_set (&(splitmux->split_requested), TRUE);
3805 split_at_running_time (GstSplitMuxSink * splitmux, GstClockTime split_time)
3807 gboolean send_keyframe_requests;
3809 GST_SPLITMUX_LOCK (splitmux);
3810 gst_queue_array_push_tail_struct (splitmux->times_to_split, &split_time);
3811 send_keyframe_requests = splitmux->send_keyframe_requests;
3812 GST_SPLITMUX_UNLOCK (splitmux);
3814 if (send_keyframe_requests) {
3816 gst_video_event_new_upstream_force_key_unit (split_time, TRUE, 0);
3817 GST_INFO_OBJECT (splitmux, "Requesting next keyframe at %" GST_TIME_FORMAT,
3818 GST_TIME_ARGS (split_time));
3819 if (!gst_pad_push_event (splitmux->reference_ctx->sinkpad, ev)) {
3820 GST_WARNING_OBJECT (splitmux,
3821 "Could not request keyframe at %" GST_TIME_FORMAT,
3822 GST_TIME_ARGS (split_time));