1 /* GStreamer Muxer bin that splits output stream by size/time
2 * Copyright (C) <2014-2019> Jan Schmidt <jan@centricular.com>
4 * This library is free software; you can redistribute it and/or
5 * modify it under the terms of the GNU Library General Public
6 * License as published by the Free Software Foundation; either
7 * version 2 of the License, or (at your option) any later version.
9 * This library is distributed in the hope that it will be useful,
10 * but WITHOUT ANY WARRANTY; without even the implied warranty of
11 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
12 * Library General Public License for more details.
14 * You should have received a copy of the GNU Library General Public
15 * License along with this library; if not, write to the
16 * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
17 * Boston, MA 02110-1301, USA.
21 * SECTION:element-splitmuxsink
22 * @title: splitmuxsink
23 * @short_description: Muxer wrapper for splitting output stream by size or time
25 * This element wraps a muxer and a sink, and starts a new file when the mux
26 * contents are about to cross a threshold of maximum size of maximum time,
27 * splitting at video keyframe boundaries. Exactly one input video stream
28 * can be muxed, with as many accompanying audio and subtitle streams as
31 * By default, it uses mp4mux and filesink, but they can be changed via
32 * the 'muxer' and 'sink' properties.
34 * The minimum file size is 1 GOP, however - so limits may be overrun if the
35 * distance between any 2 keyframes is larger than the limits.
37 * If a video stream is available, the splitting process is driven by the video
38 * stream contents, and the video stream must contain closed GOPs for the output
39 * file parts to be played individually correctly. In the absence of a video
40 * stream, the first available stream is used as reference for synchronization.
42 * In the async-finalize mode, when the threshold is crossed, the old muxer
43 * and sink is disconnected from the pipeline and left to finish the file
44 * asynchronously, and a new muxer and sink is created to continue with the
45 * next fragment. For that reason, instead of muxer and sink objects, the
46 * muxer-factory and sink-factory properties are used to construct the new
47 * objects, together with muxer-properties and sink-properties.
49 * ## Example pipelines
51 * gst-launch-1.0 -e v4l2src num-buffers=500 ! video/x-raw,width=320,height=240 ! videoconvert ! queue ! timeoverlay ! x264enc key-int-max=10 ! h264parse ! splitmuxsink location=video%02d.mov max-size-time=10000000000 max-size-bytes=1000000
53 * Records a video stream captured from a v4l2 device and muxes it into
54 * ISO mp4 files, splitting as needed to limit size/duration to 10 seconds
55 * and 1MB maximum size.
58 * gst-launch-1.0 -e v4l2src num-buffers=500 ! video/x-raw,width=320,height=240 ! videoconvert ! queue ! timeoverlay ! x264enc key-int-max=10 ! h264parse ! splitmuxsink location=video%02d.mkv max-size-time=10000000000 muxer-factory=matroskamux muxer-properties="properties,streamable=true"
60 * Records a video stream captured from a v4l2 device and muxer it into
61 * streamable Matroska files, splitting as needed to limit size/duration to 10
62 * seconds. Each file will finalize asynchronously.
65 * gst-launch-1.0 videotestsrc num-buffers=10 ! jpegenc ! .video splitmuxsink muxer=qtmux muxer-pad-map=x-pad-map,video=video_1 location=test%05d.mp4 -v
67 * Records 10 frames to an mp4 file, using a muxer-pad-map to make explicit mappings between the splitmuxsink sink pad and the corresponding muxer pad
76 #include <glib/gstdio.h>
77 #include <gst/video/video.h>
78 #include "gstsplitmuxsink.h"
80 GST_DEBUG_CATEGORY_STATIC (splitmux_debug);
81 #define GST_CAT_DEFAULT splitmux_debug
83 #define GST_SPLITMUX_STATE_LOCK(s) g_mutex_lock(&(s)->state_lock)
84 #define GST_SPLITMUX_STATE_UNLOCK(s) g_mutex_unlock(&(s)->state_lock)
86 #define GST_SPLITMUX_LOCK(s) g_mutex_lock(&(s)->lock)
87 #define GST_SPLITMUX_UNLOCK(s) g_mutex_unlock(&(s)->lock)
88 #define GST_SPLITMUX_WAIT_INPUT(s) g_cond_wait (&(s)->input_cond, &(s)->lock)
89 #define GST_SPLITMUX_BROADCAST_INPUT(s) g_cond_broadcast (&(s)->input_cond)
91 #define GST_SPLITMUX_WAIT_OUTPUT(s) g_cond_wait (&(s)->output_cond, &(s)->lock)
92 #define GST_SPLITMUX_BROADCAST_OUTPUT(s) g_cond_broadcast (&(s)->output_cond)
94 static void split_now (GstSplitMuxSink * splitmux);
95 static void split_after (GstSplitMuxSink * splitmux);
96 static void split_at_running_time (GstSplitMuxSink * splitmux,
97 GstClockTime split_time);
106 PROP_MAX_SIZE_TIMECODE,
107 PROP_SEND_KEYFRAME_REQUESTS,
110 PROP_USE_ROBUST_MUXING,
111 PROP_ALIGNMENT_THRESHOLD,
118 PROP_MUXER_PROPERTIES,
121 PROP_SINK_PROPERTIES,
125 #define DEFAULT_MAX_SIZE_TIME 0
126 #define DEFAULT_MAX_SIZE_BYTES 0
127 #define DEFAULT_MAX_FILES 0
128 #define DEFAULT_MUXER_OVERHEAD 0.02
129 #define DEFAULT_SEND_KEYFRAME_REQUESTS FALSE
130 #define DEFAULT_ALIGNMENT_THRESHOLD 0
131 #define DEFAULT_MUXER "mp4mux"
132 #define DEFAULT_SINK "filesink"
133 #define DEFAULT_USE_ROBUST_MUXING FALSE
134 #define DEFAULT_RESET_MUXER TRUE
135 #define DEFAULT_ASYNC_FINALIZE FALSE
136 #define DEFAULT_START_INDEX 0
138 typedef struct _AsyncEosHelper
146 SIGNAL_FORMAT_LOCATION,
147 SIGNAL_FORMAT_LOCATION_FULL,
150 SIGNAL_SPLIT_AT_RUNNING_TIME,
156 static guint signals[SIGNAL_LAST];
158 static GstStaticPadTemplate video_sink_template =
159 GST_STATIC_PAD_TEMPLATE ("video",
162 GST_STATIC_CAPS_ANY);
163 static GstStaticPadTemplate video_aux_sink_template =
164 GST_STATIC_PAD_TEMPLATE ("video_aux_%u",
167 GST_STATIC_CAPS_ANY);
168 static GstStaticPadTemplate audio_sink_template =
169 GST_STATIC_PAD_TEMPLATE ("audio_%u",
172 GST_STATIC_CAPS_ANY);
173 static GstStaticPadTemplate subtitle_sink_template =
174 GST_STATIC_PAD_TEMPLATE ("subtitle_%u",
177 GST_STATIC_CAPS_ANY);
178 static GstStaticPadTemplate caption_sink_template =
179 GST_STATIC_PAD_TEMPLATE ("caption_%u",
182 GST_STATIC_CAPS_ANY);
184 static GQuark PAD_CONTEXT;
185 static GQuark EOS_FROM_US;
186 static GQuark RUNNING_TIME;
187 /* EOS_FROM_US is only valid in async-finalize mode. We need to know whether
188 * to forward an incoming EOS message, but we cannot rely on the state of the
189 * splitmux anymore, so we set this qdata on the sink instead.
190 * The muxer and sink must be destroyed after both of these things have
192 * 1) The EOS message has been sent when the fragment is ending
193 * 2) The muxer has been unlinked and relinked
194 * Therefore, EOS_FROM_US can have these two values:
195 * 0: EOS was not requested from us. Forward the message. The muxer and the
196 * sink will be destroyed together with the rest of the bin.
197 * 1: EOS was requested from us, but the other of the two tasks hasn't
198 * finished. Set EOS_FROM_US to 2 and do your stuff.
199 * 2: EOS was requested from us and the other of the two tasks has finished.
200 * Now we can destroy the muxer and the sink.
206 PAD_CONTEXT = g_quark_from_static_string ("pad-context");
207 EOS_FROM_US = g_quark_from_static_string ("eos-from-us");
208 RUNNING_TIME = g_quark_from_static_string ("running-time");
209 GST_DEBUG_CATEGORY_INIT (splitmux_debug, "splitmuxsink", 0,
210 "Split File Muxing Sink");
213 #define gst_splitmux_sink_parent_class parent_class
214 G_DEFINE_TYPE_EXTENDED (GstSplitMuxSink, gst_splitmux_sink, GST_TYPE_BIN, 0,
216 GST_ELEMENT_REGISTER_DEFINE (splitmuxsink, "splitmuxsink", GST_RANK_NONE,
217 GST_TYPE_SPLITMUX_SINK);
219 static gboolean create_muxer (GstSplitMuxSink * splitmux);
220 static gboolean create_sink (GstSplitMuxSink * splitmux);
221 static void gst_splitmux_sink_set_property (GObject * object, guint prop_id,
222 const GValue * value, GParamSpec * pspec);
223 static void gst_splitmux_sink_get_property (GObject * object, guint prop_id,
224 GValue * value, GParamSpec * pspec);
225 static void gst_splitmux_sink_dispose (GObject * object);
226 static void gst_splitmux_sink_finalize (GObject * object);
228 static GstPad *gst_splitmux_sink_request_new_pad (GstElement * element,
229 GstPadTemplate * templ, const gchar * name, const GstCaps * caps);
230 static void gst_splitmux_sink_release_pad (GstElement * element, GstPad * pad);
232 static GstStateChangeReturn gst_splitmux_sink_change_state (GstElement *
233 element, GstStateChange transition);
235 static void bus_handler (GstBin * bin, GstMessage * msg);
236 static void set_next_filename (GstSplitMuxSink * splitmux, MqStreamCtx * ctx);
237 static GstFlowReturn start_next_fragment (GstSplitMuxSink * splitmux,
239 static void mq_stream_ctx_free (MqStreamCtx * ctx);
240 static void grow_blocked_queues (GstSplitMuxSink * splitmux);
242 static void gst_splitmux_sink_ensure_max_files (GstSplitMuxSink * splitmux);
243 static GstElement *create_element (GstSplitMuxSink * splitmux,
244 const gchar * factory, const gchar * name, gboolean locked);
246 static void do_async_done (GstSplitMuxSink * splitmux);
247 static void gst_splitmux_reset_timecode (GstSplitMuxSink * splitmux);
250 mq_stream_buf_new (void)
252 return g_slice_new0 (MqStreamBuf);
256 mq_stream_buf_free (MqStreamBuf * data)
258 g_slice_free (MqStreamBuf, data);
261 static SplitMuxOutputCommand *
262 out_cmd_buf_new (void)
264 return g_slice_new0 (SplitMuxOutputCommand);
268 out_cmd_buf_free (SplitMuxOutputCommand * data)
270 g_slice_free (SplitMuxOutputCommand, data);
274 gst_splitmux_sink_class_init (GstSplitMuxSinkClass * klass)
276 GObjectClass *gobject_class = (GObjectClass *) klass;
277 GstElementClass *gstelement_class = (GstElementClass *) klass;
278 GstBinClass *gstbin_class = (GstBinClass *) klass;
280 gobject_class->set_property = gst_splitmux_sink_set_property;
281 gobject_class->get_property = gst_splitmux_sink_get_property;
282 gobject_class->dispose = gst_splitmux_sink_dispose;
283 gobject_class->finalize = gst_splitmux_sink_finalize;
285 gst_element_class_set_static_metadata (gstelement_class,
286 "Split Muxing Bin", "Generic/Bin/Muxer",
287 "Convenience bin that muxes incoming streams into multiple time/size limited files",
288 "Jan Schmidt <jan@centricular.com>");
290 gst_element_class_add_static_pad_template (gstelement_class,
291 &video_sink_template);
292 gst_element_class_add_static_pad_template (gstelement_class,
293 &video_aux_sink_template);
294 gst_element_class_add_static_pad_template (gstelement_class,
295 &audio_sink_template);
296 gst_element_class_add_static_pad_template (gstelement_class,
297 &subtitle_sink_template);
298 gst_element_class_add_static_pad_template (gstelement_class,
299 &caption_sink_template);
301 gstelement_class->change_state =
302 GST_DEBUG_FUNCPTR (gst_splitmux_sink_change_state);
303 gstelement_class->request_new_pad =
304 GST_DEBUG_FUNCPTR (gst_splitmux_sink_request_new_pad);
305 gstelement_class->release_pad =
306 GST_DEBUG_FUNCPTR (gst_splitmux_sink_release_pad);
308 gstbin_class->handle_message = bus_handler;
310 g_object_class_install_property (gobject_class, PROP_LOCATION,
311 g_param_spec_string ("location", "File Output Pattern",
312 "Format string pattern for the location of the files to write (e.g. video%05d.mp4)",
313 NULL, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
314 g_object_class_install_property (gobject_class, PROP_MUXER_OVERHEAD,
315 g_param_spec_double ("mux-overhead", "Muxing Overhead",
316 "Extra size overhead of muxing (0.02 = 2%)", 0.0, 1.0,
317 DEFAULT_MUXER_OVERHEAD,
318 G_PARAM_READWRITE | GST_PARAM_CONTROLLABLE | G_PARAM_STATIC_STRINGS));
320 g_object_class_install_property (gobject_class, PROP_MAX_SIZE_TIME,
321 g_param_spec_uint64 ("max-size-time", "Max. size (ns)",
322 "Max. amount of time per file (in ns, 0=disable)", 0, G_MAXUINT64,
323 DEFAULT_MAX_SIZE_TIME,
324 G_PARAM_READWRITE | GST_PARAM_MUTABLE_READY |
325 G_PARAM_STATIC_STRINGS));
326 g_object_class_install_property (gobject_class, PROP_MAX_SIZE_BYTES,
327 g_param_spec_uint64 ("max-size-bytes", "Max. size bytes",
328 "Max. amount of data per file (in bytes, 0=disable)", 0, G_MAXUINT64,
329 DEFAULT_MAX_SIZE_BYTES,
330 G_PARAM_READWRITE | GST_PARAM_MUTABLE_READY |
331 G_PARAM_STATIC_STRINGS));
332 g_object_class_install_property (gobject_class, PROP_MAX_SIZE_TIMECODE,
333 g_param_spec_string ("max-size-timecode", "Maximum timecode difference",
334 "Maximum difference in timecode between first and last frame. "
335 "Separator is assumed to be \":\" everywhere (e.g. 01:00:00:00). "
336 "Will only be effective if a timecode track is present.", NULL,
337 G_PARAM_READWRITE | GST_PARAM_MUTABLE_READY |
338 G_PARAM_STATIC_STRINGS));
339 g_object_class_install_property (gobject_class, PROP_SEND_KEYFRAME_REQUESTS,
340 g_param_spec_boolean ("send-keyframe-requests",
341 "Request keyframes at max-size-time",
342 "Request a keyframe every max-size-time ns to try splitting at that point. "
343 "Needs max-size-bytes to be 0 in order to be effective.",
344 DEFAULT_SEND_KEYFRAME_REQUESTS,
345 G_PARAM_READWRITE | GST_PARAM_MUTABLE_READY |
346 G_PARAM_STATIC_STRINGS));
347 g_object_class_install_property (gobject_class, PROP_MAX_FILES,
348 g_param_spec_uint ("max-files", "Max files",
349 "Maximum number of files to keep on disk. Once the maximum is reached,"
350 "old files start to be deleted to make room for new ones.", 0,
351 G_MAXUINT, DEFAULT_MAX_FILES,
352 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
353 g_object_class_install_property (gobject_class, PROP_ALIGNMENT_THRESHOLD,
354 g_param_spec_uint64 ("alignment-threshold", "Alignment threshold (ns)",
355 "Allow non-reference streams to be that many ns before the reference"
356 " stream", 0, G_MAXUINT64, DEFAULT_ALIGNMENT_THRESHOLD,
357 G_PARAM_READWRITE | GST_PARAM_MUTABLE_READY |
358 G_PARAM_STATIC_STRINGS));
360 g_object_class_install_property (gobject_class, PROP_MUXER,
361 g_param_spec_object ("muxer", "Muxer",
362 "The muxer element to use (NULL = default mp4mux). "
363 "Valid only for async-finalize = FALSE",
364 GST_TYPE_ELEMENT, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
365 g_object_class_install_property (gobject_class, PROP_SINK,
366 g_param_spec_object ("sink", "Sink",
367 "The sink element (or element chain) to use (NULL = default filesink). "
368 "Valid only for async-finalize = FALSE",
369 GST_TYPE_ELEMENT, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
371 g_object_class_install_property (gobject_class, PROP_USE_ROBUST_MUXING,
372 g_param_spec_boolean ("use-robust-muxing",
373 "Support robust-muxing mode of some muxers",
374 "Check if muxers support robust muxing via the reserved-max-duration and "
375 "reserved-duration-remaining properties and use them if so. "
376 "(Only present on qtmux and mp4mux for now). splitmuxsink may then also "
377 " create new fragments if the reserved header space is about to overflow. "
378 "Note that for mp4mux and qtmux, reserved-moov-update-period must be set "
379 "manually by the app to a non-zero value for robust muxing to have an effect.",
380 DEFAULT_USE_ROBUST_MUXING,
381 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
383 g_object_class_install_property (gobject_class, PROP_RESET_MUXER,
384 g_param_spec_boolean ("reset-muxer",
386 "Reset the muxer after each segment. Disabling this will not work for most muxers.",
387 DEFAULT_RESET_MUXER, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
389 g_object_class_install_property (gobject_class, PROP_ASYNC_FINALIZE,
390 g_param_spec_boolean ("async-finalize",
391 "Finalize fragments asynchronously",
392 "Finalize each fragment asynchronously and start a new one",
393 DEFAULT_ASYNC_FINALIZE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
394 g_object_class_install_property (gobject_class, PROP_MUXER_FACTORY,
395 g_param_spec_string ("muxer-factory", "Muxer factory",
396 "The muxer element factory to use (default = mp4mux). "
397 "Valid only for async-finalize = TRUE",
398 "mp4mux", G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
400 * GstSplitMuxSink:muxer-preset
402 * An optional #GstPreset name to use for the muxer. This only has an effect
403 * in `async-finalize=TRUE` mode.
407 g_object_class_install_property (gobject_class, PROP_MUXER_PRESET,
408 g_param_spec_string ("muxer-preset", "Muxer preset",
409 "The muxer preset to use. "
410 "Valid only for async-finalize = TRUE",
411 NULL, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
412 g_object_class_install_property (gobject_class, PROP_MUXER_PROPERTIES,
413 g_param_spec_boxed ("muxer-properties", "Muxer properties",
414 "The muxer element properties to use. "
415 "Example: {properties,boolean-prop=true,string-prop=\"hi\"}. "
416 "Valid only for async-finalize = TRUE",
417 GST_TYPE_STRUCTURE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
418 g_object_class_install_property (gobject_class, PROP_SINK_FACTORY,
419 g_param_spec_string ("sink-factory", "Sink factory",
420 "The sink element factory to use (default = filesink). "
421 "Valid only for async-finalize = TRUE",
422 "filesink", G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
424 * GstSplitMuxSink:sink-preset
426 * An optional #GstPreset name to use for the sink. This only has an effect
427 * in `async-finalize=TRUE` mode.
431 g_object_class_install_property (gobject_class, PROP_SINK_PRESET,
432 g_param_spec_string ("sink-preset", "Sink preset",
433 "The sink preset to use. "
434 "Valid only for async-finalize = TRUE",
435 NULL, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
436 g_object_class_install_property (gobject_class, PROP_SINK_PROPERTIES,
437 g_param_spec_boxed ("sink-properties", "Sink properties",
438 "The sink element properties to use. "
439 "Example: {properties,boolean-prop=true,string-prop=\"hi\"}. "
440 "Valid only for async-finalize = TRUE",
441 GST_TYPE_STRUCTURE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
442 g_object_class_install_property (gobject_class, PROP_START_INDEX,
443 g_param_spec_int ("start-index", "Start Index",
444 "Start value of fragment index.",
445 0, G_MAXINT, DEFAULT_START_INDEX,
446 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
449 * GstSplitMuxSink::muxer-pad-map
451 * An optional GstStructure that provides a map from splitmuxsink sinkpad
452 * names to muxer pad names they should feed. Splitmuxsink has some default
453 * mapping behaviour to link video to video pads and audio to audio pads
454 * that usually works fine. This property is useful if you need to ensure
455 * a particular mapping to muxed streams.
457 * The GstStructure contains string fields like so:
458 * splitmuxsink muxer-pad-map=x-pad-map,video=video_1
462 g_object_class_install_property (G_OBJECT_CLASS (klass), PROP_MUXERPAD_MAP,
463 g_param_spec_boxed ("muxer-pad-map", "Muxer pad map",
464 "A GstStructure specifies the mapping from splitmuxsink sink pads to muxer pads",
466 (GParamFlags) (G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)));
469 * GstSplitMuxSink::format-location:
470 * @splitmux: the #GstSplitMuxSink
471 * @fragment_id: the sequence number of the file to be created
473 * Returns: the location to be used for the next output file. This must be
474 * a newly-allocated string which will be freed with g_free() by the
475 * splitmuxsink element when it no longer needs it, so use g_strdup() or
476 * g_strdup_printf() or similar functions to allocate it.
478 signals[SIGNAL_FORMAT_LOCATION] =
479 g_signal_new ("format-location", G_TYPE_FROM_CLASS (klass),
480 G_SIGNAL_RUN_LAST, 0, NULL, NULL, NULL, G_TYPE_STRING, 1, G_TYPE_UINT);
483 * GstSplitMuxSink::format-location-full:
484 * @splitmux: the #GstSplitMuxSink
485 * @fragment_id: the sequence number of the file to be created
486 * @first_sample: A #GstSample containing the first buffer
487 * from the reference stream in the new file
489 * Returns: the location to be used for the next output file. This must be
490 * a newly-allocated string which will be freed with g_free() by the
491 * splitmuxsink element when it no longer needs it, so use g_strdup() or
492 * g_strdup_printf() or similar functions to allocate it.
496 signals[SIGNAL_FORMAT_LOCATION_FULL] =
497 g_signal_new ("format-location-full", G_TYPE_FROM_CLASS (klass),
498 G_SIGNAL_RUN_LAST, 0, NULL, NULL, NULL, G_TYPE_STRING, 2, G_TYPE_UINT,
502 * GstSplitMuxSink::split-now:
503 * @splitmux: the #GstSplitMuxSink
505 * When called by the user, this action signal splits the video file (and begins a new one) immediately.
506 * The current GOP will be output to the new file.
510 signals[SIGNAL_SPLIT_NOW] =
511 g_signal_new ("split-now", G_TYPE_FROM_CLASS (klass),
512 G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION,
513 G_STRUCT_OFFSET (GstSplitMuxSinkClass, split_now), NULL, NULL, NULL,
517 * GstSplitMuxSink::split-after:
518 * @splitmux: the #GstSplitMuxSink
520 * When called by the user, this action signal splits the video file (and begins a new one) immediately.
521 * Unlike the 'split-now' signal, with 'split-after', the current GOP will be output to the old file.
525 signals[SIGNAL_SPLIT_AFTER] =
526 g_signal_new ("split-after", G_TYPE_FROM_CLASS (klass),
527 G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION,
528 G_STRUCT_OFFSET (GstSplitMuxSinkClass, split_after), NULL, NULL, NULL,
532 * GstSplitMuxSink::split-at-running-time:
533 * @splitmux: the #GstSplitMuxSink
535 * When called by the user, this action signal splits the video file (and
536 * begins a new one) as soon as the given running time is reached. If this
537 * action signal is called multiple times, running times are queued up and
538 * processed in the order they were given.
540 * Note that this is prone to race conditions, where said running time is
541 * reached and surpassed before we had a chance to split. The file will
542 * still split immediately, but in order to make sure that the split doesn't
543 * happen too late, it is recommended to call this action signal from
544 * something that will prevent further buffers from flowing into
545 * splitmuxsink before the split is completed, such as a pad probe before
551 signals[SIGNAL_SPLIT_AT_RUNNING_TIME] =
552 g_signal_new ("split-at-running-time", G_TYPE_FROM_CLASS (klass),
553 G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION,
554 G_STRUCT_OFFSET (GstSplitMuxSinkClass, split_at_running_time), NULL, NULL,
555 NULL, G_TYPE_NONE, 1, G_TYPE_UINT64);
558 * GstSplitMuxSink::muxer-added:
559 * @splitmux: the #GstSplitMuxSink
560 * @muxer: the newly added muxer element
564 signals[SIGNAL_MUXER_ADDED] =
565 g_signal_new ("muxer-added", G_TYPE_FROM_CLASS (klass),
566 G_SIGNAL_RUN_LAST, 0, NULL, NULL, NULL, G_TYPE_NONE, 1, GST_TYPE_ELEMENT);
569 * GstSplitMuxSink::sink-added:
570 * @splitmux: the #GstSplitMuxSink
571 * @sink: the newly added sink element
575 signals[SIGNAL_SINK_ADDED] =
576 g_signal_new ("sink-added", G_TYPE_FROM_CLASS (klass),
577 G_SIGNAL_RUN_LAST, 0, NULL, NULL, NULL, G_TYPE_NONE, 1, GST_TYPE_ELEMENT);
579 klass->split_now = split_now;
580 klass->split_after = split_after;
581 klass->split_at_running_time = split_at_running_time;
585 gst_splitmux_sink_init (GstSplitMuxSink * splitmux)
587 g_mutex_init (&splitmux->lock);
588 g_mutex_init (&splitmux->state_lock);
589 g_cond_init (&splitmux->input_cond);
590 g_cond_init (&splitmux->output_cond);
591 g_queue_init (&splitmux->out_cmd_q);
593 splitmux->mux_overhead = DEFAULT_MUXER_OVERHEAD;
594 splitmux->threshold_time = DEFAULT_MAX_SIZE_TIME;
595 splitmux->threshold_bytes = DEFAULT_MAX_SIZE_BYTES;
596 splitmux->max_files = DEFAULT_MAX_FILES;
597 splitmux->send_keyframe_requests = DEFAULT_SEND_KEYFRAME_REQUESTS;
598 splitmux->alignment_threshold = DEFAULT_ALIGNMENT_THRESHOLD;
599 splitmux->use_robust_muxing = DEFAULT_USE_ROBUST_MUXING;
600 splitmux->reset_muxer = DEFAULT_RESET_MUXER;
602 splitmux->threshold_timecode_str = NULL;
603 gst_splitmux_reset_timecode (splitmux);
605 splitmux->async_finalize = DEFAULT_ASYNC_FINALIZE;
606 splitmux->muxer_factory = g_strdup (DEFAULT_MUXER);
607 splitmux->muxer_properties = NULL;
608 splitmux->sink_factory = g_strdup (DEFAULT_SINK);
609 splitmux->sink_properties = NULL;
611 GST_OBJECT_FLAG_SET (splitmux, GST_ELEMENT_FLAG_SINK);
612 splitmux->split_requested = FALSE;
613 splitmux->do_split_next_gop = FALSE;
614 splitmux->times_to_split = gst_queue_array_new_for_struct (8, 8);
615 splitmux->next_fku_time = GST_CLOCK_TIME_NONE;
619 gst_splitmux_reset_elements (GstSplitMuxSink * splitmux)
621 if (splitmux->muxer) {
622 gst_element_set_locked_state (splitmux->muxer, TRUE);
623 gst_element_set_state (splitmux->muxer, GST_STATE_NULL);
624 gst_bin_remove (GST_BIN (splitmux), splitmux->muxer);
626 if (splitmux->active_sink) {
627 gst_element_set_locked_state (splitmux->active_sink, TRUE);
628 gst_element_set_state (splitmux->active_sink, GST_STATE_NULL);
629 gst_bin_remove (GST_BIN (splitmux), splitmux->active_sink);
632 splitmux->sink = splitmux->active_sink = splitmux->muxer = NULL;
636 gst_splitmux_reset_timecode (GstSplitMuxSink * splitmux)
638 g_clear_pointer (&splitmux->in_tc, gst_video_time_code_free);
639 g_clear_pointer (&splitmux->fragment_start_tc, gst_video_time_code_free);
640 g_clear_pointer (&splitmux->gop_start_tc, gst_video_time_code_free);
641 splitmux->next_fragment_start_tc_time = GST_CLOCK_TIME_NONE;
645 gst_splitmux_sink_dispose (GObject * object)
647 GstSplitMuxSink *splitmux = GST_SPLITMUX_SINK (object);
649 /* Calling parent dispose invalidates all child pointers */
650 splitmux->sink = splitmux->active_sink = splitmux->muxer = NULL;
652 G_OBJECT_CLASS (parent_class)->dispose (object);
656 gst_splitmux_sink_finalize (GObject * object)
658 GstSplitMuxSink *splitmux = GST_SPLITMUX_SINK (object);
659 g_cond_clear (&splitmux->input_cond);
660 g_cond_clear (&splitmux->output_cond);
661 g_mutex_clear (&splitmux->lock);
662 g_mutex_clear (&splitmux->state_lock);
663 g_queue_foreach (&splitmux->out_cmd_q, (GFunc) out_cmd_buf_free, NULL);
664 g_queue_clear (&splitmux->out_cmd_q);
666 if (splitmux->muxerpad_map)
667 gst_structure_free (splitmux->muxerpad_map);
669 if (splitmux->provided_sink)
670 gst_object_unref (splitmux->provided_sink);
671 if (splitmux->provided_muxer)
672 gst_object_unref (splitmux->provided_muxer);
674 if (splitmux->muxer_factory)
675 g_free (splitmux->muxer_factory);
676 if (splitmux->muxer_preset)
677 g_free (splitmux->muxer_preset);
678 if (splitmux->muxer_properties)
679 gst_structure_free (splitmux->muxer_properties);
680 if (splitmux->sink_factory)
681 g_free (splitmux->sink_factory);
682 if (splitmux->sink_preset)
683 g_free (splitmux->sink_preset);
684 if (splitmux->sink_properties)
685 gst_structure_free (splitmux->sink_properties);
687 if (splitmux->threshold_timecode_str)
688 g_free (splitmux->threshold_timecode_str);
689 if (splitmux->tc_interval)
690 gst_video_time_code_interval_free (splitmux->tc_interval);
692 if (splitmux->times_to_split)
693 gst_queue_array_free (splitmux->times_to_split);
695 g_free (splitmux->location);
697 /* Make sure to free any un-released contexts. There should not be any,
698 * because the dispose will have freed all request pads though */
699 g_list_foreach (splitmux->contexts, (GFunc) mq_stream_ctx_free, NULL);
700 g_list_free (splitmux->contexts);
701 gst_splitmux_reset_timecode (splitmux);
703 G_OBJECT_CLASS (parent_class)->finalize (object);
707 * Set any time threshold to the muxer, if it has
708 * reserved-max-duration and reserved-duration-remaining
709 * properties. Called when creating/claiming the muxer
710 * in create_elements() */
712 update_muxer_properties (GstSplitMuxSink * sink)
715 GstClockTime threshold_time;
717 sink->muxer_has_reserved_props = FALSE;
718 if (sink->muxer == NULL)
720 klass = G_OBJECT_GET_CLASS (sink->muxer);
721 if (g_object_class_find_property (klass, "reserved-max-duration") == NULL)
723 if (g_object_class_find_property (klass,
724 "reserved-duration-remaining") == NULL)
726 sink->muxer_has_reserved_props = TRUE;
728 GST_LOG_OBJECT (sink, "Setting muxer reserved time to %" GST_TIME_FORMAT,
729 GST_TIME_ARGS (sink->threshold_time));
730 GST_OBJECT_LOCK (sink);
731 threshold_time = sink->threshold_time;
732 GST_OBJECT_UNLOCK (sink);
734 if (threshold_time > 0) {
735 /* Tell the muxer how much space to reserve */
736 GstClockTime muxer_threshold = threshold_time;
737 g_object_set (sink->muxer, "reserved-max-duration", muxer_threshold, NULL);
742 gst_splitmux_sink_set_property (GObject * object, guint prop_id,
743 const GValue * value, GParamSpec * pspec)
745 GstSplitMuxSink *splitmux = GST_SPLITMUX_SINK (object);
749 GST_OBJECT_LOCK (splitmux);
750 g_free (splitmux->location);
751 splitmux->location = g_value_dup_string (value);
752 GST_OBJECT_UNLOCK (splitmux);
755 case PROP_START_INDEX:
756 GST_OBJECT_LOCK (splitmux);
757 splitmux->start_index = g_value_get_int (value);
758 GST_OBJECT_UNLOCK (splitmux);
760 case PROP_MAX_SIZE_BYTES:
761 GST_OBJECT_LOCK (splitmux);
762 splitmux->threshold_bytes = g_value_get_uint64 (value);
763 GST_OBJECT_UNLOCK (splitmux);
765 case PROP_MAX_SIZE_TIME:
766 GST_OBJECT_LOCK (splitmux);
767 splitmux->threshold_time = g_value_get_uint64 (value);
768 GST_OBJECT_UNLOCK (splitmux);
770 case PROP_MAX_SIZE_TIMECODE:
771 GST_OBJECT_LOCK (splitmux);
772 g_free (splitmux->threshold_timecode_str);
773 /* will be calculated later */
774 g_clear_pointer (&splitmux->tc_interval,
775 gst_video_time_code_interval_free);
776 gst_splitmux_reset_timecode (splitmux);
778 splitmux->threshold_timecode_str = g_value_dup_string (value);
779 if (splitmux->threshold_timecode_str) {
780 splitmux->tc_interval =
781 gst_video_time_code_interval_new_from_string
782 (splitmux->threshold_timecode_str);
783 if (!splitmux->tc_interval) {
784 g_warning ("Wrong timecode string %s",
785 splitmux->threshold_timecode_str);
786 g_free (splitmux->threshold_timecode_str);
787 splitmux->threshold_timecode_str = NULL;
790 GST_OBJECT_UNLOCK (splitmux);
792 case PROP_SEND_KEYFRAME_REQUESTS:
793 GST_OBJECT_LOCK (splitmux);
794 splitmux->send_keyframe_requests = g_value_get_boolean (value);
795 GST_OBJECT_UNLOCK (splitmux);
798 GST_OBJECT_LOCK (splitmux);
799 splitmux->max_files = g_value_get_uint (value);
800 GST_OBJECT_UNLOCK (splitmux);
802 case PROP_MUXER_OVERHEAD:
803 GST_OBJECT_LOCK (splitmux);
804 splitmux->mux_overhead = g_value_get_double (value);
805 GST_OBJECT_UNLOCK (splitmux);
807 case PROP_USE_ROBUST_MUXING:
808 GST_OBJECT_LOCK (splitmux);
809 splitmux->use_robust_muxing = g_value_get_boolean (value);
810 GST_OBJECT_UNLOCK (splitmux);
811 if (splitmux->use_robust_muxing)
812 update_muxer_properties (splitmux);
814 case PROP_ALIGNMENT_THRESHOLD:
815 GST_OBJECT_LOCK (splitmux);
816 splitmux->alignment_threshold = g_value_get_uint64 (value);
817 GST_OBJECT_UNLOCK (splitmux);
820 GST_OBJECT_LOCK (splitmux);
821 gst_clear_object (&splitmux->provided_sink);
822 splitmux->provided_sink = g_value_get_object (value);
823 if (splitmux->provided_sink)
824 gst_object_ref_sink (splitmux->provided_sink);
825 GST_OBJECT_UNLOCK (splitmux);
828 GST_OBJECT_LOCK (splitmux);
829 gst_clear_object (&splitmux->provided_muxer);
830 splitmux->provided_muxer = g_value_get_object (value);
831 if (splitmux->provided_muxer)
832 gst_object_ref_sink (splitmux->provided_muxer);
833 GST_OBJECT_UNLOCK (splitmux);
835 case PROP_RESET_MUXER:
836 GST_OBJECT_LOCK (splitmux);
837 splitmux->reset_muxer = g_value_get_boolean (value);
838 GST_OBJECT_UNLOCK (splitmux);
840 case PROP_ASYNC_FINALIZE:
841 GST_OBJECT_LOCK (splitmux);
842 splitmux->async_finalize = g_value_get_boolean (value);
843 GST_OBJECT_UNLOCK (splitmux);
845 case PROP_MUXER_FACTORY:
846 GST_OBJECT_LOCK (splitmux);
847 if (splitmux->muxer_factory)
848 g_free (splitmux->muxer_factory);
849 splitmux->muxer_factory = g_value_dup_string (value);
850 GST_OBJECT_UNLOCK (splitmux);
852 case PROP_MUXER_PRESET:
853 GST_OBJECT_LOCK (splitmux);
854 if (splitmux->muxer_preset)
855 g_free (splitmux->muxer_preset);
856 splitmux->muxer_preset = g_value_dup_string (value);
857 GST_OBJECT_UNLOCK (splitmux);
859 case PROP_MUXER_PROPERTIES:
860 GST_OBJECT_LOCK (splitmux);
861 if (splitmux->muxer_properties)
862 gst_structure_free (splitmux->muxer_properties);
863 if (gst_value_get_structure (value))
864 splitmux->muxer_properties =
865 gst_structure_copy (gst_value_get_structure (value));
867 splitmux->muxer_properties = NULL;
868 GST_OBJECT_UNLOCK (splitmux);
870 case PROP_SINK_FACTORY:
871 GST_OBJECT_LOCK (splitmux);
872 if (splitmux->sink_factory)
873 g_free (splitmux->sink_factory);
874 splitmux->sink_factory = g_value_dup_string (value);
875 GST_OBJECT_UNLOCK (splitmux);
877 case PROP_SINK_PRESET:
878 GST_OBJECT_LOCK (splitmux);
879 if (splitmux->sink_preset)
880 g_free (splitmux->sink_preset);
881 splitmux->sink_preset = g_value_dup_string (value);
882 GST_OBJECT_UNLOCK (splitmux);
884 case PROP_SINK_PROPERTIES:
885 GST_OBJECT_LOCK (splitmux);
886 if (splitmux->sink_properties)
887 gst_structure_free (splitmux->sink_properties);
888 if (gst_value_get_structure (value))
889 splitmux->sink_properties =
890 gst_structure_copy (gst_value_get_structure (value));
892 splitmux->sink_properties = NULL;
893 GST_OBJECT_UNLOCK (splitmux);
895 case PROP_MUXERPAD_MAP:
897 const GstStructure *s = gst_value_get_structure (value);
898 GST_SPLITMUX_LOCK (splitmux);
899 if (splitmux->muxerpad_map) {
900 gst_structure_free (splitmux->muxerpad_map);
903 splitmux->muxerpad_map = gst_structure_copy (s);
905 splitmux->muxerpad_map = NULL;
906 GST_SPLITMUX_UNLOCK (splitmux);
910 G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
916 gst_splitmux_sink_get_property (GObject * object, guint prop_id,
917 GValue * value, GParamSpec * pspec)
919 GstSplitMuxSink *splitmux = GST_SPLITMUX_SINK (object);
923 GST_OBJECT_LOCK (splitmux);
924 g_value_set_string (value, splitmux->location);
925 GST_OBJECT_UNLOCK (splitmux);
927 case PROP_START_INDEX:
928 GST_OBJECT_LOCK (splitmux);
929 g_value_set_int (value, splitmux->start_index);
930 GST_OBJECT_UNLOCK (splitmux);
932 case PROP_MAX_SIZE_BYTES:
933 GST_OBJECT_LOCK (splitmux);
934 g_value_set_uint64 (value, splitmux->threshold_bytes);
935 GST_OBJECT_UNLOCK (splitmux);
937 case PROP_MAX_SIZE_TIME:
938 GST_OBJECT_LOCK (splitmux);
939 g_value_set_uint64 (value, splitmux->threshold_time);
940 GST_OBJECT_UNLOCK (splitmux);
942 case PROP_MAX_SIZE_TIMECODE:
943 GST_OBJECT_LOCK (splitmux);
944 g_value_set_string (value, splitmux->threshold_timecode_str);
945 GST_OBJECT_UNLOCK (splitmux);
947 case PROP_SEND_KEYFRAME_REQUESTS:
948 GST_OBJECT_LOCK (splitmux);
949 g_value_set_boolean (value, splitmux->send_keyframe_requests);
950 GST_OBJECT_UNLOCK (splitmux);
953 GST_OBJECT_LOCK (splitmux);
954 g_value_set_uint (value, splitmux->max_files);
955 GST_OBJECT_UNLOCK (splitmux);
957 case PROP_MUXER_OVERHEAD:
958 GST_OBJECT_LOCK (splitmux);
959 g_value_set_double (value, splitmux->mux_overhead);
960 GST_OBJECT_UNLOCK (splitmux);
962 case PROP_USE_ROBUST_MUXING:
963 GST_OBJECT_LOCK (splitmux);
964 g_value_set_boolean (value, splitmux->use_robust_muxing);
965 GST_OBJECT_UNLOCK (splitmux);
967 case PROP_ALIGNMENT_THRESHOLD:
968 GST_OBJECT_LOCK (splitmux);
969 g_value_set_uint64 (value, splitmux->alignment_threshold);
970 GST_OBJECT_UNLOCK (splitmux);
973 GST_OBJECT_LOCK (splitmux);
974 g_value_set_object (value, splitmux->provided_sink);
975 GST_OBJECT_UNLOCK (splitmux);
978 GST_OBJECT_LOCK (splitmux);
979 g_value_set_object (value, splitmux->provided_muxer);
980 GST_OBJECT_UNLOCK (splitmux);
982 case PROP_RESET_MUXER:
983 GST_OBJECT_LOCK (splitmux);
984 g_value_set_boolean (value, splitmux->reset_muxer);
985 GST_OBJECT_UNLOCK (splitmux);
987 case PROP_ASYNC_FINALIZE:
988 GST_OBJECT_LOCK (splitmux);
989 g_value_set_boolean (value, splitmux->async_finalize);
990 GST_OBJECT_UNLOCK (splitmux);
992 case PROP_MUXER_FACTORY:
993 GST_OBJECT_LOCK (splitmux);
994 g_value_set_string (value, splitmux->muxer_factory);
995 GST_OBJECT_UNLOCK (splitmux);
997 case PROP_MUXER_PRESET:
998 GST_OBJECT_LOCK (splitmux);
999 g_value_set_string (value, splitmux->muxer_preset);
1000 GST_OBJECT_UNLOCK (splitmux);
1002 case PROP_MUXER_PROPERTIES:
1003 GST_OBJECT_LOCK (splitmux);
1004 gst_value_set_structure (value, splitmux->muxer_properties);
1005 GST_OBJECT_UNLOCK (splitmux);
1007 case PROP_SINK_FACTORY:
1008 GST_OBJECT_LOCK (splitmux);
1009 g_value_set_string (value, splitmux->sink_factory);
1010 GST_OBJECT_UNLOCK (splitmux);
1012 case PROP_SINK_PRESET:
1013 GST_OBJECT_LOCK (splitmux);
1014 g_value_set_string (value, splitmux->sink_preset);
1015 GST_OBJECT_UNLOCK (splitmux);
1017 case PROP_SINK_PROPERTIES:
1018 GST_OBJECT_LOCK (splitmux);
1019 gst_value_set_structure (value, splitmux->sink_properties);
1020 GST_OBJECT_UNLOCK (splitmux);
1022 case PROP_MUXERPAD_MAP:
1023 GST_SPLITMUX_LOCK (splitmux);
1024 gst_value_set_structure (value, splitmux->muxerpad_map);
1025 GST_SPLITMUX_UNLOCK (splitmux);
1028 G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
1033 /* Convenience function */
1034 static inline GstClockTimeDiff
1035 my_segment_to_running_time (GstSegment * segment, GstClockTime val)
1037 GstClockTimeDiff res = GST_CLOCK_STIME_NONE;
1039 if (GST_CLOCK_TIME_IS_VALID (val)) {
1041 gst_segment_to_running_time_full (segment, GST_FORMAT_TIME, val, &val);
1051 mq_stream_ctx_reset (MqStreamCtx * ctx)
1053 gst_segment_init (&ctx->in_segment, GST_FORMAT_UNDEFINED);
1054 gst_segment_init (&ctx->out_segment, GST_FORMAT_UNDEFINED);
1055 ctx->in_running_time = ctx->out_running_time = GST_CLOCK_STIME_NONE;
1056 g_queue_foreach (&ctx->queued_bufs, (GFunc) mq_stream_buf_free, NULL);
1057 g_queue_clear (&ctx->queued_bufs);
1060 static MqStreamCtx *
1061 mq_stream_ctx_new (GstSplitMuxSink * splitmux)
1065 ctx = g_new0 (MqStreamCtx, 1);
1066 ctx->splitmux = splitmux;
1067 g_queue_init (&ctx->queued_bufs);
1068 mq_stream_ctx_reset (ctx);
1074 mq_stream_ctx_free (MqStreamCtx * ctx)
1077 GstObject *parent = gst_object_get_parent (GST_OBJECT (ctx->q));
1079 g_signal_handler_disconnect (ctx->q, ctx->q_overrun_id);
1081 if (parent == GST_OBJECT_CAST (ctx->splitmux)) {
1082 gst_element_set_locked_state (ctx->q, TRUE);
1083 gst_element_set_state (ctx->q, GST_STATE_NULL);
1084 gst_bin_remove (GST_BIN (ctx->splitmux), ctx->q);
1085 gst_object_unref (parent);
1087 gst_object_unref (ctx->q);
1089 gst_object_unref (ctx->sinkpad);
1090 gst_object_unref (ctx->srcpad);
1091 g_queue_foreach (&ctx->queued_bufs, (GFunc) mq_stream_buf_free, NULL);
1092 g_queue_clear (&ctx->queued_bufs);
1097 send_fragment_opened_closed_msg (GstSplitMuxSink * splitmux, gboolean opened,
1100 gchar *location = NULL;
1102 const gchar *msg_name = opened ?
1103 "splitmuxsink-fragment-opened" : "splitmuxsink-fragment-closed";
1104 GstClockTime running_time = splitmux->reference_ctx->out_running_time;
1107 GstClockTime *rtime = g_object_get_qdata (G_OBJECT (sink), RUNNING_TIME);
1109 running_time = *rtime;
1112 if (g_object_class_find_property (G_OBJECT_GET_CLASS (sink),
1113 "location") != NULL)
1114 g_object_get (sink, "location", &location, NULL);
1116 GST_DEBUG_OBJECT (splitmux,
1117 "Sending %s message. Running time %" GST_TIME_FORMAT " location %s",
1118 msg_name, GST_TIME_ARGS (running_time), GST_STR_NULL (location));
1120 /* If it's in the middle of a teardown, the reference_ctc might have become
1122 if (splitmux->reference_ctx) {
1123 msg = gst_message_new_element (GST_OBJECT (splitmux),
1124 gst_structure_new (msg_name,
1125 "location", G_TYPE_STRING, location,
1126 "running-time", GST_TYPE_CLOCK_TIME, running_time,
1127 "sink", GST_TYPE_ELEMENT, sink, NULL));
1128 gst_element_post_message (GST_ELEMENT_CAST (splitmux), msg);
1135 send_eos_async (GstSplitMuxSink * splitmux, AsyncEosHelper * helper)
1141 eos = gst_event_new_eos ();
1145 GST_SPLITMUX_LOCK (splitmux);
1147 pad = gst_pad_get_peer (ctx->srcpad);
1148 GST_SPLITMUX_UNLOCK (splitmux);
1150 gst_pad_send_event (pad, eos);
1151 GST_INFO_OBJECT (splitmux, "Sent async EOS on %" GST_PTR_FORMAT, pad);
1153 gst_object_unref (pad);
1157 /* Called with lock held, drops the lock to send EOS to the
1161 send_eos (GstSplitMuxSink * splitmux, MqStreamCtx * ctx)
1166 eos = gst_event_new_eos ();
1167 pad = gst_pad_get_peer (ctx->srcpad);
1169 ctx->out_eos = TRUE;
1171 GST_INFO_OBJECT (splitmux, "Sending EOS on %" GST_PTR_FORMAT, pad);
1172 GST_SPLITMUX_UNLOCK (splitmux);
1173 gst_pad_send_event (pad, eos);
1174 GST_SPLITMUX_LOCK (splitmux);
1176 gst_object_unref (pad);
1179 /* Called with lock held. Schedules an EOS event to the ctx pad
1180 * to happen in another thread */
1182 eos_context_async (MqStreamCtx * ctx, GstSplitMuxSink * splitmux)
1184 AsyncEosHelper *helper = g_new0 (AsyncEosHelper, 1);
1185 GstPad *srcpad, *sinkpad;
1187 srcpad = ctx->srcpad;
1188 sinkpad = gst_pad_get_peer (srcpad);
1191 helper->pad = sinkpad; /* Takes the reference */
1193 ctx->out_eos_async_done = TRUE;
1195 /* There used to be a bug here, where we had to explicitly remove
1196 * the SINK flag so that GstBin would ignore it for EOS purposes.
1197 * That fixed a race where if splitmuxsink really reaches EOS
1198 * before an asynchronous background element has finished, then
1199 * the bin wouldn't actually send EOS to the pipeline. Even after
1200 * finishing and removing the old element, the bin didn't re-check
1201 * EOS status on removing a SINK element. That bug was fixed
1203 GST_DEBUG_OBJECT (splitmux, "scheduled EOS to pad %" GST_PTR_FORMAT " ctx %p",
1206 g_assert_nonnull (helper->pad);
1207 gst_element_call_async (GST_ELEMENT (splitmux),
1208 (GstElementCallAsyncFunc) send_eos_async, helper, NULL);
1211 /* Called with lock held. TRUE iff all contexts have a
1212 * pending (or delivered) async eos event */
1214 all_contexts_are_async_eos (GstSplitMuxSink * splitmux)
1216 gboolean ret = TRUE;
1219 for (item = splitmux->contexts; item; item = item->next) {
1220 MqStreamCtx *ctx = item->data;
1221 ret &= ctx->out_eos_async_done;
1226 /* Called with splitmux lock held to check if this output
1227 * context needs to sleep to wait for the release of the
1228 * next GOP, or to send EOS to close out the current file
1230 static GstFlowReturn
1231 complete_or_wait_on_out (GstSplitMuxSink * splitmux, MqStreamCtx * ctx)
1233 if (ctx->caps_change)
1237 /* When first starting up, the reference stream has to output
1238 * the first buffer to prepare the muxer and sink */
1239 gboolean can_output = (ctx->is_reference || splitmux->ready_for_output);
1240 GstClockTimeDiff my_max_out_running_time = splitmux->max_out_running_time;
1242 if (!(splitmux->max_out_running_time == 0 ||
1243 splitmux->max_out_running_time == GST_CLOCK_STIME_NONE ||
1244 splitmux->alignment_threshold == 0 ||
1245 splitmux->max_out_running_time < splitmux->alignment_threshold)) {
1246 my_max_out_running_time -= splitmux->alignment_threshold;
1247 GST_LOG_OBJECT (ctx->srcpad,
1248 "Max out running time currently %" GST_STIME_FORMAT
1249 ", with threshold applied it is %" GST_STIME_FORMAT,
1250 GST_STIME_ARGS (splitmux->max_out_running_time),
1251 GST_STIME_ARGS (my_max_out_running_time));
1255 || splitmux->output_state == SPLITMUX_OUTPUT_STATE_STOPPED)
1256 return GST_FLOW_FLUSHING;
1258 GST_LOG_OBJECT (ctx->srcpad,
1259 "Checking running time %" GST_STIME_FORMAT " against max %"
1260 GST_STIME_FORMAT, GST_STIME_ARGS (ctx->out_running_time),
1261 GST_STIME_ARGS (my_max_out_running_time));
1264 if (splitmux->max_out_running_time == GST_CLOCK_STIME_NONE ||
1265 ctx->out_running_time < my_max_out_running_time) {
1269 switch (splitmux->output_state) {
1270 case SPLITMUX_OUTPUT_STATE_OUTPUT_GOP:
1271 /* We only get here if we've finished outputting a GOP and need to know
1272 * what to do next */
1273 splitmux->output_state = SPLITMUX_OUTPUT_STATE_AWAITING_COMMAND;
1274 GST_SPLITMUX_BROADCAST_OUTPUT (splitmux);
1277 case SPLITMUX_OUTPUT_STATE_ENDING_FILE:
1278 case SPLITMUX_OUTPUT_STATE_ENDING_STREAM:
1279 /* We've reached the max out running_time to get here, so end this file now */
1280 if (ctx->out_eos == FALSE) {
1281 if (splitmux->async_finalize) {
1282 /* We must set EOS asynchronously at this point. We cannot defer
1283 * it, because we need all contexts to wake up, for the
1284 * reference context to eventually give us something at
1285 * START_NEXT_FILE. Otherwise, collectpads might choose another
1286 * context to give us the first buffer, and format-location-full
1287 * will not contain a valid sample. */
1288 g_object_set_qdata ((GObject *) splitmux->sink, EOS_FROM_US,
1289 GINT_TO_POINTER (1));
1290 eos_context_async (ctx, splitmux);
1291 if (all_contexts_are_async_eos (splitmux)) {
1292 GST_INFO_OBJECT (splitmux,
1293 "All contexts are async_eos. Moving to the next file.");
1294 /* We can start the next file once we've asked each pad to go EOS */
1295 splitmux->output_state = SPLITMUX_OUTPUT_STATE_START_NEXT_FILE;
1296 GST_SPLITMUX_BROADCAST_OUTPUT (splitmux);
1300 send_eos (splitmux, ctx);
1304 GST_INFO_OBJECT (splitmux,
1305 "At end-of-file state, but context %p is already EOS", ctx);
1308 case SPLITMUX_OUTPUT_STATE_START_NEXT_FILE:
1309 if (ctx->is_reference) {
1310 GstFlowReturn ret = GST_FLOW_OK;
1312 /* Special handling on the reference ctx to start new fragments
1313 * and collect commands from the command queue */
1314 /* drops the splitmux lock briefly: */
1315 /* We must have reference ctx in order for format-location-full to
1317 ret = start_next_fragment (splitmux, ctx);
1318 if (ret != GST_FLOW_OK)
1324 case SPLITMUX_OUTPUT_STATE_AWAITING_COMMAND:{
1326 SplitMuxOutputCommand *cmd =
1327 g_queue_pop_tail (&splitmux->out_cmd_q);
1329 /* If we pop the last command, we need to make our queues bigger */
1330 if (g_queue_get_length (&splitmux->out_cmd_q) == 0)
1331 grow_blocked_queues (splitmux);
1333 if (cmd->start_new_fragment) {
1334 if (splitmux->muxed_out_bytes > 0) {
1335 GST_DEBUG_OBJECT (splitmux, "Got cmd to start new fragment");
1336 splitmux->output_state = SPLITMUX_OUTPUT_STATE_ENDING_FILE;
1338 GST_DEBUG_OBJECT (splitmux,
1339 "Got cmd to start new fragment, but fragment is empty - ignoring.");
1342 GST_DEBUG_OBJECT (splitmux,
1343 "Got new output cmd for time %" GST_STIME_FORMAT,
1344 GST_STIME_ARGS (cmd->max_output_ts));
1346 /* Extend the output range immediately */
1347 splitmux->max_out_running_time = cmd->max_output_ts;
1348 splitmux->output_state = SPLITMUX_OUTPUT_STATE_OUTPUT_GOP;
1350 GST_SPLITMUX_BROADCAST_OUTPUT (splitmux);
1352 out_cmd_buf_free (cmd);
1355 GST_SPLITMUX_WAIT_OUTPUT (splitmux);
1357 } while (!ctx->flushing && splitmux->output_state ==
1358 SPLITMUX_OUTPUT_STATE_AWAITING_COMMAND);
1359 /* loop and re-check the state */
1362 case SPLITMUX_OUTPUT_STATE_STOPPED:
1363 return GST_FLOW_FLUSHING;
1366 GST_LOG_OBJECT (ctx->srcpad, "Not yet ready for output");
1369 GST_INFO_OBJECT (ctx->srcpad,
1370 "Sleeping for running time %"
1371 GST_STIME_FORMAT " (max %" GST_STIME_FORMAT ") or state change.",
1372 GST_STIME_ARGS (ctx->out_running_time),
1373 GST_STIME_ARGS (splitmux->max_out_running_time));
1374 GST_SPLITMUX_WAIT_OUTPUT (splitmux);
1375 GST_INFO_OBJECT (ctx->srcpad,
1376 "Woken for new max running time %" GST_STIME_FORMAT,
1377 GST_STIME_ARGS (splitmux->max_out_running_time));
1385 calculate_next_max_timecode (GstSplitMuxSink * splitmux,
1386 const GstVideoTimeCode * cur_tc, GstClockTime running_time,
1387 GstVideoTimeCode ** next_tc)
1389 GstVideoTimeCode *target_tc;
1390 GstClockTime cur_tc_time, target_tc_time, next_max_tc_time;
1392 if (cur_tc == NULL || splitmux->tc_interval == NULL)
1393 return GST_CLOCK_TIME_NONE;
1395 target_tc = gst_video_time_code_add_interval (cur_tc, splitmux->tc_interval);
1397 GST_ELEMENT_ERROR (splitmux,
1398 STREAM, FAILED, (NULL), ("Couldn't calculate target timecode"));
1399 return GST_CLOCK_TIME_NONE;
1403 target_tc_time = gst_video_time_code_nsec_since_daily_jam (target_tc);
1404 cur_tc_time = gst_video_time_code_nsec_since_daily_jam (cur_tc);
1406 /* Add running_time, accounting for wraparound. */
1407 if (target_tc_time >= cur_tc_time) {
1408 next_max_tc_time = target_tc_time - cur_tc_time + running_time;
1410 GstClockTime day_in_ns = 24 * 60 * 60 * GST_SECOND;
1412 if ((cur_tc->config.flags & GST_VIDEO_TIME_CODE_FLAGS_DROP_FRAME) &&
1413 (cur_tc->config.fps_d == 1001)) {
1414 /* Checking fps_d is probably unneeded, but better safe than sorry
1415 * (e.g. someone accidentally set a flag) */
1416 GstVideoTimeCode *tc_for_offset;
1418 /* Here, the duration of the 24:00:00;00 timecode isn't exactly one day,
1419 * but slightly less. Calculate that duration from a fake timecode. The
1420 * problem is that 24:00:00;00 isn't a valid timecode, so the workaround
1421 * is to add one frame to 23:59:59;29 */
1423 gst_video_time_code_new (cur_tc->config.fps_n, cur_tc->config.fps_d,
1424 NULL, cur_tc->config.flags, 23, 59, 59,
1425 cur_tc->config.fps_n / cur_tc->config.fps_d, 0);
1427 gst_video_time_code_nsec_since_daily_jam (tc_for_offset) +
1428 gst_util_uint64_scale (GST_SECOND, cur_tc->config.fps_d,
1429 cur_tc->config.fps_n);
1430 gst_video_time_code_free (tc_for_offset);
1432 next_max_tc_time = day_in_ns - cur_tc_time + target_tc_time + running_time;
1435 GST_INFO_OBJECT (splitmux, "Next max TC time: %" GST_TIME_FORMAT
1436 " from ref TC: %" GST_TIME_FORMAT, GST_TIME_ARGS (next_max_tc_time),
1437 GST_TIME_ARGS (cur_tc_time));
1439 *next_tc = target_tc;
1441 gst_video_time_code_free (target_tc);
1443 return next_max_tc_time;
1447 request_next_keyframe (GstSplitMuxSink * splitmux, GstBuffer * buffer,
1448 GstClockTime running_time)
1451 GstClockTime target_time;
1452 gboolean timecode_based = FALSE;
1453 GstClockTime max_tc_time = GST_CLOCK_TIME_NONE;
1454 GstClockTime next_max_tc_time = GST_CLOCK_TIME_NONE;
1455 GstClockTime next_fku_time = GST_CLOCK_TIME_NONE;
1456 GstClockTime tc_rounding_error = 5 * GST_USECOND;
1458 if (!splitmux->send_keyframe_requests)
1461 if (splitmux->tc_interval) {
1462 if (splitmux->in_tc && gst_video_time_code_is_valid (splitmux->in_tc)) {
1463 GstVideoTimeCode *next_tc = NULL;
1465 calculate_next_max_timecode (splitmux, splitmux->in_tc,
1466 running_time, &next_tc);
1468 /* calculate the next expected keyframe time to prevent too early fku
1470 if (GST_CLOCK_TIME_IS_VALID (max_tc_time) && next_tc) {
1472 calculate_next_max_timecode (splitmux, next_tc, max_tc_time, NULL);
1475 gst_video_time_code_free (next_tc);
1477 timecode_based = GST_CLOCK_TIME_IS_VALID (max_tc_time) &&
1478 GST_CLOCK_TIME_IS_VALID (next_max_tc_time);
1480 /* This can happen in the presence of GAP events that trigger
1481 * a new fragment start */
1482 GST_WARNING_OBJECT (splitmux,
1483 "No buffer available to calculate next timecode");
1487 if ((splitmux->threshold_time == 0 && !timecode_based)
1488 || splitmux->threshold_bytes != 0)
1491 if (timecode_based) {
1492 /* We might have rounding errors: aim slightly earlier */
1493 if (max_tc_time >= tc_rounding_error) {
1494 target_time = max_tc_time - tc_rounding_error;
1496 /* unreliable target time */
1497 GST_DEBUG_OBJECT (splitmux, "tc time %" GST_TIME_FORMAT
1498 " is smaller than allowed rounding error, set it to zero",
1499 GST_TIME_ARGS (max_tc_time));
1503 if (next_max_tc_time >= tc_rounding_error) {
1504 next_fku_time = next_max_tc_time - tc_rounding_error;
1506 /* unreliable target time */
1507 GST_DEBUG_OBJECT (splitmux, "next tc time %" GST_TIME_FORMAT
1508 " is smaller than allowed rounding error, set it to zero",
1509 GST_TIME_ARGS (next_max_tc_time));
1513 target_time = running_time + splitmux->threshold_time;
1516 if (GST_CLOCK_TIME_IS_VALID (splitmux->next_fku_time)) {
1517 GstClockTime allowed_time = splitmux->next_fku_time;
1519 if (timecode_based) {
1520 if (allowed_time >= tc_rounding_error) {
1521 allowed_time -= tc_rounding_error;
1523 /* unreliable next force key unit time */
1524 GST_DEBUG_OBJECT (splitmux, "expected next force key unit time %"
1526 " is smaller than allowed rounding error, set it to zero",
1527 GST_TIME_ARGS (splitmux->next_fku_time));
1532 if (target_time < allowed_time) {
1533 GST_LOG_OBJECT (splitmux, "Target time %" GST_TIME_FORMAT
1534 " is smaller than expected next keyframe time %" GST_TIME_FORMAT
1535 ", rounding error compensated next keyframe time %" GST_TIME_FORMAT,
1536 GST_TIME_ARGS (target_time),
1537 GST_TIME_ARGS (splitmux->next_fku_time),
1538 GST_TIME_ARGS (allowed_time));
1541 } else if (allowed_time != splitmux->next_fku_time &&
1542 target_time < splitmux->next_fku_time) {
1543 GST_DEBUG_OBJECT (splitmux, "Target time %" GST_TIME_FORMAT
1544 " is smaller than expected next keyframe time %" GST_TIME_FORMAT
1545 ", but the difference is smaller than allowed rounding error",
1546 GST_TIME_ARGS (target_time), GST_TIME_ARGS (splitmux->next_fku_time));
1550 if (!timecode_based) {
1551 next_fku_time = target_time + splitmux->threshold_time;
1554 splitmux->next_fku_time = next_fku_time;
1556 ev = gst_video_event_new_upstream_force_key_unit (target_time, TRUE, 0);
1557 GST_INFO_OBJECT (splitmux, "Requesting keyframe at %" GST_TIME_FORMAT
1558 ", the next expected keyframe is %" GST_TIME_FORMAT,
1559 GST_TIME_ARGS (target_time), GST_TIME_ARGS (next_fku_time));
1560 return gst_pad_push_event (splitmux->reference_ctx->sinkpad, ev);
1563 static GstPadProbeReturn
1564 handle_mq_output (GstPad * pad, GstPadProbeInfo * info, MqStreamCtx * ctx)
1566 GstSplitMuxSink *splitmux = ctx->splitmux;
1567 MqStreamBuf *buf_info = NULL;
1568 GstFlowReturn ret = GST_FLOW_OK;
1570 GST_LOG_OBJECT (pad, "Fired probe type 0x%x", info->type);
1572 /* FIXME: Handle buffer lists, until then make it clear they won't work */
1573 if (info->type & GST_PAD_PROBE_TYPE_BUFFER_LIST) {
1574 g_warning ("Buffer list handling not implemented");
1575 return GST_PAD_PROBE_DROP;
1577 if (info->type & GST_PAD_PROBE_TYPE_EVENT_DOWNSTREAM ||
1578 info->type & GST_PAD_PROBE_TYPE_EVENT_FLUSH) {
1579 GstEvent *event = gst_pad_probe_info_get_event (info);
1580 gboolean locked = FALSE, wait = !ctx->is_reference;
1582 GST_LOG_OBJECT (pad, "Event %" GST_PTR_FORMAT, event);
1584 switch (GST_EVENT_TYPE (event)) {
1585 case GST_EVENT_SEGMENT:
1586 gst_event_copy_segment (event, &ctx->out_segment);
1588 case GST_EVENT_FLUSH_STOP:
1589 GST_SPLITMUX_LOCK (splitmux);
1591 gst_segment_init (&ctx->out_segment, GST_FORMAT_UNDEFINED);
1592 g_queue_foreach (&ctx->queued_bufs, (GFunc) mq_stream_buf_free, NULL);
1593 g_queue_clear (&ctx->queued_bufs);
1594 g_queue_clear (&ctx->queued_bufs);
1595 /* If this is the reference context, we just threw away any queued keyframes */
1596 if (ctx->is_reference)
1597 splitmux->queued_keyframes = 0;
1598 ctx->flushing = FALSE;
1601 case GST_EVENT_FLUSH_START:
1602 GST_SPLITMUX_LOCK (splitmux);
1604 GST_LOG_OBJECT (pad, "Flush start");
1605 ctx->flushing = TRUE;
1606 GST_SPLITMUX_BROADCAST_INPUT (splitmux);
1607 GST_SPLITMUX_BROADCAST_OUTPUT (splitmux);
1610 GST_SPLITMUX_LOCK (splitmux);
1612 if (splitmux->output_state == SPLITMUX_OUTPUT_STATE_STOPPED)
1614 ctx->out_eos = TRUE;
1616 if (ctx == splitmux->reference_ctx) {
1617 splitmux->output_state = SPLITMUX_OUTPUT_STATE_ENDING_STREAM;
1618 GST_SPLITMUX_BROADCAST_OUTPUT (splitmux);
1621 GST_INFO_OBJECT (splitmux,
1622 "Have EOS event at pad %" GST_PTR_FORMAT " ctx %p", pad, ctx);
1624 case GST_EVENT_GAP:{
1625 GstClockTime gap_ts;
1626 GstClockTimeDiff rtime;
1628 gst_event_parse_gap (event, &gap_ts, NULL);
1629 if (gap_ts == GST_CLOCK_TIME_NONE)
1632 GST_SPLITMUX_LOCK (splitmux);
1635 if (splitmux->output_state == SPLITMUX_OUTPUT_STATE_STOPPED)
1638 /* When we get a gap event on the
1639 * reference stream and we're trying to open a
1640 * new file, we need to store it until we get
1641 * the buffer afterwards
1643 if (ctx->is_reference &&
1644 (splitmux->output_state != SPLITMUX_OUTPUT_STATE_OUTPUT_GOP)) {
1645 GST_DEBUG_OBJECT (pad, "Storing GAP event until buffer arrives");
1646 gst_event_replace (&ctx->pending_gap, event);
1647 GST_SPLITMUX_UNLOCK (splitmux);
1648 return GST_PAD_PROBE_HANDLED;
1651 rtime = my_segment_to_running_time (&ctx->out_segment, gap_ts);
1653 GST_LOG_OBJECT (pad, "Have GAP w/ ts %" GST_STIME_FORMAT,
1654 GST_STIME_ARGS (rtime));
1656 if (rtime != GST_CLOCK_STIME_NONE) {
1657 ctx->out_running_time = rtime;
1658 complete_or_wait_on_out (splitmux, ctx);
1662 case GST_EVENT_CUSTOM_DOWNSTREAM:{
1663 const GstStructure *s;
1664 GstClockTimeDiff ts = 0;
1666 s = gst_event_get_structure (event);
1667 if (!gst_structure_has_name (s, "splitmuxsink-unblock"))
1670 gst_structure_get_int64 (s, "timestamp", &ts);
1672 GST_SPLITMUX_LOCK (splitmux);
1675 if (splitmux->output_state == SPLITMUX_OUTPUT_STATE_STOPPED)
1677 ctx->out_running_time = ts;
1678 if (!ctx->is_reference)
1679 ret = complete_or_wait_on_out (splitmux, ctx);
1680 GST_SPLITMUX_UNLOCK (splitmux);
1681 GST_PAD_PROBE_INFO_FLOW_RETURN (info) = ret;
1682 return GST_PAD_PROBE_DROP;
1684 case GST_EVENT_CAPS:{
1687 if (!ctx->is_reference)
1690 peer = gst_pad_get_peer (pad);
1692 gboolean ok = gst_pad_send_event (peer, gst_event_ref (event));
1694 gst_object_unref (peer);
1702 /* This is in the case the muxer doesn't allow this change of caps */
1703 GST_SPLITMUX_LOCK (splitmux);
1705 ctx->caps_change = TRUE;
1707 if (splitmux->output_state != SPLITMUX_OUTPUT_STATE_START_NEXT_FILE) {
1708 GST_DEBUG_OBJECT (splitmux,
1709 "New caps were not accepted. Switching output file");
1710 if (ctx->out_eos == FALSE) {
1711 splitmux->output_state = SPLITMUX_OUTPUT_STATE_ENDING_FILE;
1712 GST_SPLITMUX_BROADCAST_OUTPUT (splitmux);
1716 /* Lets it fall through, if it fails again, then the muxer just can't
1717 * support this format, but at least we have a closed file.
1725 /* We need to make sure events aren't passed
1726 * until the muxer / sink are ready for it */
1728 GST_SPLITMUX_LOCK (splitmux);
1730 ret = complete_or_wait_on_out (splitmux, ctx);
1731 GST_SPLITMUX_UNLOCK (splitmux);
1733 /* Don't try to forward sticky events before the next buffer is there
1734 * because it would cause a new file to be created without the first
1735 * buffer being available.
1737 GST_PAD_PROBE_INFO_FLOW_RETURN (info) = ret;
1738 if (ctx->caps_change && GST_EVENT_IS_STICKY (event)) {
1739 gst_event_unref (event);
1740 return GST_PAD_PROBE_HANDLED;
1742 return GST_PAD_PROBE_PASS;
1746 /* Allow everything through until the configured next stopping point */
1747 GST_SPLITMUX_LOCK (splitmux);
1749 buf_info = g_queue_pop_tail (&ctx->queued_bufs);
1750 if (buf_info == NULL) {
1751 /* Can only happen due to a poorly timed flush */
1752 ret = GST_FLOW_FLUSHING;
1756 /* If we have popped a keyframe, decrement the queued_gop count */
1757 if (buf_info->keyframe && splitmux->queued_keyframes > 0)
1758 splitmux->queued_keyframes--;
1760 ctx->out_running_time = buf_info->run_ts;
1761 ctx->cur_out_buffer = gst_pad_probe_info_get_buffer (info);
1763 GST_LOG_OBJECT (splitmux,
1764 "Pad %" GST_PTR_FORMAT " buffer with run TS %" GST_STIME_FORMAT
1765 " size %" G_GUINT64_FORMAT,
1766 pad, GST_STIME_ARGS (ctx->out_running_time), buf_info->buf_size);
1768 ctx->caps_change = FALSE;
1770 ret = complete_or_wait_on_out (splitmux, ctx);
1772 splitmux->muxed_out_bytes += buf_info->buf_size;
1774 #ifndef GST_DISABLE_GST_DEBUG
1776 GstBuffer *buf = gst_pad_probe_info_get_buffer (info);
1777 GST_LOG_OBJECT (pad, "Returning to pass buffer %" GST_PTR_FORMAT
1778 " run ts %" GST_STIME_FORMAT, buf,
1779 GST_STIME_ARGS (ctx->out_running_time));
1783 ctx->cur_out_buffer = NULL;
1784 GST_SPLITMUX_UNLOCK (splitmux);
1786 /* pending_gap is protected by the STREAM lock */
1787 if (ctx->pending_gap) {
1788 /* If we previously stored a gap event, send it now */
1789 GstPad *peer = gst_pad_get_peer (ctx->srcpad);
1791 GST_DEBUG_OBJECT (splitmux,
1792 "Pad %" GST_PTR_FORMAT " sending pending GAP event", ctx->srcpad);
1794 gst_pad_send_event (peer, ctx->pending_gap);
1795 ctx->pending_gap = NULL;
1797 gst_object_unref (peer);
1800 mq_stream_buf_free (buf_info);
1802 GST_PAD_PROBE_INFO_FLOW_RETURN (info) = ret;
1803 return GST_PAD_PROBE_PASS;
1806 GST_SPLITMUX_UNLOCK (splitmux);
1807 GST_PAD_PROBE_INFO_FLOW_RETURN (info) = ret;
1808 return GST_PAD_PROBE_DROP;
1812 resend_sticky (GstPad * pad, GstEvent ** event, GstPad * peer)
1814 return gst_pad_send_event (peer, gst_event_ref (*event));
1818 unlock_context (MqStreamCtx * ctx, GstSplitMuxSink * splitmux)
1820 if (ctx->fragment_block_id > 0) {
1821 gst_pad_remove_probe (ctx->srcpad, ctx->fragment_block_id);
1822 ctx->fragment_block_id = 0;
1827 restart_context (MqStreamCtx * ctx, GstSplitMuxSink * splitmux)
1829 GstPad *peer = gst_pad_get_peer (ctx->srcpad);
1831 gst_pad_sticky_events_foreach (ctx->srcpad,
1832 (GstPadStickyEventsForeachFunction) (resend_sticky), peer);
1834 /* Clear EOS flag if not actually EOS */
1835 ctx->out_eos = GST_PAD_IS_EOS (ctx->srcpad);
1836 ctx->out_eos_async_done = ctx->out_eos;
1838 gst_object_unref (peer);
1842 relink_context (MqStreamCtx * ctx, GstSplitMuxSink * splitmux)
1844 GstPad *sinkpad, *srcpad, *newpad;
1845 GstPadTemplate *templ;
1847 srcpad = ctx->srcpad;
1848 sinkpad = gst_pad_get_peer (srcpad);
1850 templ = sinkpad->padtemplate;
1852 gst_element_request_pad (splitmux->muxer, templ,
1853 GST_PAD_NAME (sinkpad), NULL);
1855 GST_DEBUG_OBJECT (splitmux, "Relinking ctx %p to pad %" GST_PTR_FORMAT, ctx,
1857 if (!gst_pad_unlink (srcpad, sinkpad)) {
1858 gst_object_unref (sinkpad);
1861 if (gst_pad_link_full (srcpad, newpad,
1862 GST_PAD_LINK_CHECK_NO_RECONFIGURE) != GST_PAD_LINK_OK) {
1863 gst_element_release_request_pad (splitmux->muxer, newpad);
1864 gst_object_unref (sinkpad);
1865 gst_object_unref (newpad);
1868 gst_object_unref (newpad);
1869 gst_object_unref (sinkpad);
1873 GST_ELEMENT_ERROR (splitmux, RESOURCE, SETTINGS,
1874 ("Could not create the new muxer/sink"), NULL);
1877 static GstPadProbeReturn
1878 _block_pad (GstPad * pad, GstPadProbeInfo * info, gpointer user_data)
1880 return GST_PAD_PROBE_OK;
1884 block_context (MqStreamCtx * ctx, GstSplitMuxSink * splitmux)
1886 ctx->fragment_block_id =
1887 gst_pad_add_probe (ctx->srcpad, GST_PAD_PROBE_TYPE_BLOCK, _block_pad,
1892 _set_property_from_structure (GQuark field_id, const GValue * value,
1895 const gchar *property_name = g_quark_to_string (field_id);
1896 GObject *element = G_OBJECT (user_data);
1898 g_object_set_property (element, property_name, value);
1904 _lock_and_set_to_null (GstElement * element, GstSplitMuxSink * splitmux)
1906 gst_element_set_locked_state (element, TRUE);
1907 gst_element_set_state (element, GST_STATE_NULL);
1908 GST_LOG_OBJECT (splitmux, "Removing old element %" GST_PTR_FORMAT, element);
1909 gst_bin_remove (GST_BIN (splitmux), element);
1914 _send_event (const GValue * value, gpointer user_data)
1916 GstPad *pad = g_value_get_object (value);
1917 GstEvent *ev = user_data;
1919 gst_pad_send_event (pad, gst_event_ref (ev));
1922 /* Called with lock held when a fragment
1923 * reaches EOS and it is time to restart
1926 static GstFlowReturn
1927 start_next_fragment (GstSplitMuxSink * splitmux, MqStreamCtx * ctx)
1929 GstElement *muxer, *sink;
1931 g_assert (ctx->is_reference);
1933 /* 1 change to new file */
1934 splitmux->switching_fragment = TRUE;
1936 /* We need to drop the splitmux lock to acquire the state lock
1937 * here and ensure there's no racy state change going on elsewhere */
1938 muxer = gst_object_ref (splitmux->muxer);
1939 sink = gst_object_ref (splitmux->active_sink);
1941 GST_SPLITMUX_UNLOCK (splitmux);
1942 GST_SPLITMUX_STATE_LOCK (splitmux);
1944 if (splitmux->shutdown) {
1945 GST_DEBUG_OBJECT (splitmux,
1946 "Shutdown requested. Aborting fragment switch.");
1947 GST_SPLITMUX_LOCK (splitmux);
1948 GST_SPLITMUX_STATE_UNLOCK (splitmux);
1949 gst_object_unref (muxer);
1950 gst_object_unref (sink);
1951 return GST_FLOW_FLUSHING;
1954 if (splitmux->async_finalize) {
1955 if (splitmux->muxed_out_bytes > 0
1956 || splitmux->fragment_id != splitmux->start_index) {
1958 GstElement *new_sink, *new_muxer;
1960 GST_DEBUG_OBJECT (splitmux, "Starting fragment %u",
1961 splitmux->fragment_id);
1962 g_list_foreach (splitmux->contexts, (GFunc) block_context, splitmux);
1963 newname = g_strdup_printf ("sink_%u", splitmux->fragment_id);
1964 GST_SPLITMUX_LOCK (splitmux);
1965 if ((splitmux->sink =
1966 create_element (splitmux, splitmux->sink_factory, newname,
1969 if (splitmux->sink_preset && GST_IS_PRESET (splitmux->sink))
1970 gst_preset_load_preset (GST_PRESET (splitmux->sink),
1971 splitmux->sink_preset);
1972 if (splitmux->sink_properties)
1973 gst_structure_foreach (splitmux->sink_properties,
1974 _set_property_from_structure, splitmux->sink);
1975 splitmux->active_sink = splitmux->sink;
1976 g_signal_emit (splitmux, signals[SIGNAL_SINK_ADDED], 0, splitmux->sink);
1978 newname = g_strdup_printf ("muxer_%u", splitmux->fragment_id);
1979 if ((splitmux->muxer =
1980 create_element (splitmux, splitmux->muxer_factory, newname,
1983 if (g_object_class_find_property (G_OBJECT_GET_CLASS (splitmux->sink),
1985 /* async child elements are causing state change races and weird
1986 * failures, so let's try and turn that off */
1987 g_object_set (splitmux->sink, "async", FALSE, NULL);
1989 if (splitmux->muxer_preset && GST_IS_PRESET (splitmux->muxer))
1990 gst_preset_load_preset (GST_PRESET (splitmux->muxer),
1991 splitmux->muxer_preset);
1992 if (splitmux->muxer_properties)
1993 gst_structure_foreach (splitmux->muxer_properties,
1994 _set_property_from_structure, splitmux->muxer);
1995 g_signal_emit (splitmux, signals[SIGNAL_MUXER_ADDED], 0, splitmux->muxer);
1997 new_sink = splitmux->sink;
1998 new_muxer = splitmux->muxer;
1999 GST_SPLITMUX_UNLOCK (splitmux);
2000 g_list_foreach (splitmux->contexts, (GFunc) relink_context, splitmux);
2001 gst_element_link (new_muxer, new_sink);
2003 if (g_object_get_qdata ((GObject *) sink, EOS_FROM_US)) {
2004 if (GPOINTER_TO_INT (g_object_get_qdata ((GObject *) sink,
2005 EOS_FROM_US)) == 2) {
2006 _lock_and_set_to_null (muxer, splitmux);
2007 _lock_and_set_to_null (sink, splitmux);
2009 g_object_set_qdata ((GObject *) sink, EOS_FROM_US,
2010 GINT_TO_POINTER (2));
2013 gst_object_unref (muxer);
2014 gst_object_unref (sink);
2017 gst_object_ref (muxer);
2018 gst_object_ref (sink);
2022 gst_element_set_locked_state (muxer, TRUE);
2023 gst_element_set_locked_state (sink, TRUE);
2024 gst_element_set_state (sink, GST_STATE_NULL);
2026 if (splitmux->reset_muxer) {
2027 gst_element_set_state (muxer, GST_STATE_NULL);
2029 GstIterator *it = gst_element_iterate_sink_pads (muxer);
2033 ev = gst_event_new_flush_start ();
2034 seqnum = gst_event_get_seqnum (ev);
2035 while (gst_iterator_foreach (it, _send_event, ev) == GST_ITERATOR_RESYNC);
2036 gst_event_unref (ev);
2038 gst_iterator_resync (it);
2040 ev = gst_event_new_flush_stop (TRUE);
2041 gst_event_set_seqnum (ev, seqnum);
2042 while (gst_iterator_foreach (it, _send_event, ev) == GST_ITERATOR_RESYNC);
2043 gst_event_unref (ev);
2045 gst_iterator_free (it);
2049 GST_SPLITMUX_LOCK (splitmux);
2050 set_next_filename (splitmux, ctx);
2051 splitmux->muxed_out_bytes = 0;
2052 GST_SPLITMUX_UNLOCK (splitmux);
2054 if (gst_element_set_state (sink,
2055 GST_STATE_TARGET (splitmux)) == GST_STATE_CHANGE_FAILURE) {
2056 gst_element_set_state (sink, GST_STATE_NULL);
2057 gst_element_set_locked_state (muxer, FALSE);
2058 gst_element_set_locked_state (sink, FALSE);
2063 if (gst_element_set_state (muxer,
2064 GST_STATE_TARGET (splitmux)) == GST_STATE_CHANGE_FAILURE) {
2065 gst_element_set_state (muxer, GST_STATE_NULL);
2066 gst_element_set_state (sink, GST_STATE_NULL);
2067 gst_element_set_locked_state (muxer, FALSE);
2068 gst_element_set_locked_state (sink, FALSE);
2072 gst_element_set_locked_state (muxer, FALSE);
2073 gst_element_set_locked_state (sink, FALSE);
2075 gst_object_unref (sink);
2076 gst_object_unref (muxer);
2078 GST_SPLITMUX_LOCK (splitmux);
2079 GST_SPLITMUX_STATE_UNLOCK (splitmux);
2080 splitmux->switching_fragment = FALSE;
2081 do_async_done (splitmux);
2083 splitmux->ready_for_output = TRUE;
2085 g_list_foreach (splitmux->contexts, (GFunc) unlock_context, splitmux);
2086 g_list_foreach (splitmux->contexts, (GFunc) restart_context, splitmux);
2088 send_fragment_opened_closed_msg (splitmux, TRUE, sink);
2090 /* FIXME: Is this always the correct next state? */
2091 GST_LOG_OBJECT (splitmux, "Resetting state to AWAITING_COMMAND");
2092 splitmux->output_state = SPLITMUX_OUTPUT_STATE_AWAITING_COMMAND;
2093 GST_SPLITMUX_BROADCAST_OUTPUT (splitmux);
2097 gst_object_unref (sink);
2098 gst_object_unref (muxer);
2100 GST_SPLITMUX_LOCK (splitmux);
2101 GST_SPLITMUX_STATE_UNLOCK (splitmux);
2102 GST_ELEMENT_ERROR (splitmux, RESOURCE, SETTINGS,
2103 ("Could not create the new muxer/sink"), NULL);
2104 return GST_FLOW_ERROR;
2107 GST_ELEMENT_ERROR (splitmux, RESOURCE, SETTINGS,
2108 ("Could not start new output sink"), NULL);
2109 gst_object_unref (sink);
2110 gst_object_unref (muxer);
2112 GST_SPLITMUX_LOCK (splitmux);
2113 GST_SPLITMUX_STATE_UNLOCK (splitmux);
2114 splitmux->switching_fragment = FALSE;
2115 return GST_FLOW_ERROR;
2118 GST_ELEMENT_ERROR (splitmux, RESOURCE, SETTINGS,
2119 ("Could not start new muxer"), NULL);
2120 gst_object_unref (sink);
2121 gst_object_unref (muxer);
2123 GST_SPLITMUX_LOCK (splitmux);
2124 GST_SPLITMUX_STATE_UNLOCK (splitmux);
2125 splitmux->switching_fragment = FALSE;
2126 return GST_FLOW_ERROR;
2130 bus_handler (GstBin * bin, GstMessage * message)
2132 GstSplitMuxSink *splitmux = GST_SPLITMUX_SINK (bin);
2134 switch (GST_MESSAGE_TYPE (message)) {
2135 case GST_MESSAGE_EOS:{
2136 /* If the state is draining out the current file, drop this EOS */
2139 sink = GST_ELEMENT (GST_MESSAGE_SRC (message));
2140 GST_SPLITMUX_LOCK (splitmux);
2142 send_fragment_opened_closed_msg (splitmux, FALSE, sink);
2144 if (splitmux->async_finalize) {
2146 if (g_object_get_qdata ((GObject *) sink, EOS_FROM_US)) {
2147 if (GPOINTER_TO_INT (g_object_get_qdata ((GObject *) sink,
2148 EOS_FROM_US)) == 2) {
2150 GstPad *sinksink, *muxersrc;
2152 sinksink = gst_element_get_static_pad (sink, "sink");
2153 muxersrc = gst_pad_get_peer (sinksink);
2154 muxer = gst_pad_get_parent_element (muxersrc);
2155 gst_object_unref (sinksink);
2156 gst_object_unref (muxersrc);
2158 gst_element_call_async (muxer,
2159 (GstElementCallAsyncFunc) _lock_and_set_to_null,
2160 gst_object_ref (splitmux), gst_object_unref);
2161 gst_element_call_async (sink,
2162 (GstElementCallAsyncFunc) _lock_and_set_to_null,
2163 gst_object_ref (splitmux), gst_object_unref);
2164 gst_object_unref (muxer);
2166 g_object_set_qdata ((GObject *) sink, EOS_FROM_US,
2167 GINT_TO_POINTER (2));
2169 GST_DEBUG_OBJECT (splitmux,
2170 "Caught async EOS from previous muxer+sink. Dropping.");
2171 /* We forward the EOS so that it gets aggregated as normal. If the sink
2172 * finishes and is removed before the end, it will be de-aggregated */
2173 gst_message_unref (message);
2174 GST_SPLITMUX_UNLOCK (splitmux);
2177 } else if (splitmux->output_state == SPLITMUX_OUTPUT_STATE_ENDING_STREAM) {
2178 GST_DEBUG_OBJECT (splitmux,
2179 "Passing EOS message. Output state %d max_out_running_time %"
2180 GST_STIME_FORMAT, splitmux->output_state,
2181 GST_STIME_ARGS (splitmux->max_out_running_time));
2183 GST_DEBUG_OBJECT (splitmux, "Caught EOS at end of fragment, dropping");
2184 splitmux->output_state = SPLITMUX_OUTPUT_STATE_START_NEXT_FILE;
2185 GST_SPLITMUX_BROADCAST_OUTPUT (splitmux);
2187 gst_message_unref (message);
2188 GST_SPLITMUX_UNLOCK (splitmux);
2191 GST_SPLITMUX_UNLOCK (splitmux);
2194 case GST_MESSAGE_ASYNC_START:
2195 case GST_MESSAGE_ASYNC_DONE:
2196 /* Ignore state changes from our children while switching */
2197 GST_SPLITMUX_LOCK (splitmux);
2198 if (splitmux->switching_fragment) {
2199 if (GST_MESSAGE_SRC (message) == (GstObject *) splitmux->active_sink
2200 || GST_MESSAGE_SRC (message) == (GstObject *) splitmux->muxer) {
2201 GST_LOG_OBJECT (splitmux,
2202 "Ignoring state change from child %" GST_PTR_FORMAT
2203 " while switching", GST_MESSAGE_SRC (message));
2204 gst_message_unref (message);
2205 GST_SPLITMUX_UNLOCK (splitmux);
2209 GST_SPLITMUX_UNLOCK (splitmux);
2211 case GST_MESSAGE_WARNING:
2213 GError *gerror = NULL;
2215 gst_message_parse_warning (message, &gerror, NULL);
2217 if (g_error_matches (gerror, GST_STREAM_ERROR, GST_STREAM_ERROR_FORMAT)) {
2219 gboolean caps_change = FALSE;
2221 GST_SPLITMUX_LOCK (splitmux);
2223 for (item = splitmux->contexts; item; item = item->next) {
2224 MqStreamCtx *ctx = item->data;
2226 if (ctx->caps_change) {
2232 GST_SPLITMUX_UNLOCK (splitmux);
2235 GST_LOG_OBJECT (splitmux,
2236 "Ignoring warning change from child %" GST_PTR_FORMAT
2237 " while switching caps", GST_MESSAGE_SRC (message));
2238 gst_message_unref (message);
2248 GST_BIN_CLASS (parent_class)->handle_message (bin, message);
2252 ctx_set_unblock (MqStreamCtx * ctx)
2254 ctx->need_unblock = TRUE;
2258 need_new_fragment (GstSplitMuxSink * splitmux,
2259 GstClockTime queued_time, GstClockTime queued_gop_time,
2260 guint64 queued_bytes)
2262 guint64 thresh_bytes;
2263 GstClockTime thresh_time;
2264 gboolean check_robust_muxing;
2265 GstClockTime time_to_split = GST_CLOCK_TIME_NONE;
2266 GstClockTime *ptr_to_time;
2268 GST_OBJECT_LOCK (splitmux);
2269 thresh_bytes = splitmux->threshold_bytes;
2270 thresh_time = splitmux->threshold_time;
2271 ptr_to_time = (GstClockTime *)
2272 gst_queue_array_peek_head_struct (splitmux->times_to_split);
2274 time_to_split = *ptr_to_time;
2275 check_robust_muxing = splitmux->use_robust_muxing
2276 && splitmux->muxer_has_reserved_props;
2277 GST_OBJECT_UNLOCK (splitmux);
2279 /* Have we muxed at least one thing from the reference
2280 * stream into the file? If not, no other streams can have
2282 if (splitmux->fragment_reference_bytes <= 0) {
2283 GST_TRACE_OBJECT (splitmux,
2284 "Not ready to split - nothing muxed on the reference stream");
2288 /* User told us to split now */
2289 if (g_atomic_int_get (&(splitmux->do_split_next_gop)) == TRUE) {
2290 GST_TRACE_OBJECT (splitmux, "Forcing because split_next_gop is set");
2294 /* User told us to split at this running time */
2295 if (splitmux->gop_start_time >= time_to_split) {
2296 GST_OBJECT_LOCK (splitmux);
2297 /* Dequeue running time */
2298 gst_queue_array_pop_head_struct (splitmux->times_to_split);
2299 /* Empty any running times after this that are past now */
2300 ptr_to_time = gst_queue_array_peek_head_struct (splitmux->times_to_split);
2301 while (ptr_to_time) {
2302 time_to_split = *ptr_to_time;
2303 if (splitmux->gop_start_time < time_to_split) {
2306 gst_queue_array_pop_head_struct (splitmux->times_to_split);
2307 ptr_to_time = gst_queue_array_peek_head_struct (splitmux->times_to_split);
2309 GST_TRACE_OBJECT (splitmux,
2310 "GOP start time %" GST_STIME_FORMAT " is after requested split point %"
2311 GST_STIME_FORMAT, GST_STIME_ARGS (splitmux->gop_start_time),
2312 GST_STIME_ARGS (time_to_split));
2313 GST_OBJECT_UNLOCK (splitmux);
2317 if (thresh_bytes > 0 && queued_bytes > thresh_bytes) {
2318 GST_TRACE_OBJECT (splitmux,
2319 "queued bytes %" G_GUINT64_FORMAT " overruns byte limit", queued_bytes);
2320 return TRUE; /* Would overrun byte limit */
2323 if (thresh_time > 0 && queued_time > thresh_time) {
2324 GST_TRACE_OBJECT (splitmux,
2325 "queued time %" GST_STIME_FORMAT " overruns time limit",
2326 GST_STIME_ARGS (queued_time));
2327 return TRUE; /* Would overrun time limit */
2330 if (splitmux->tc_interval &&
2331 GST_CLOCK_TIME_IS_VALID (splitmux->next_fragment_start_tc_time) &&
2332 splitmux->reference_ctx->in_running_time >
2333 splitmux->next_fragment_start_tc_time + 5 * GST_USECOND) {
2334 GST_TRACE_OBJECT (splitmux,
2335 "in running time %" GST_STIME_FORMAT " overruns time limit %"
2337 GST_STIME_ARGS (splitmux->reference_ctx->in_running_time),
2338 GST_TIME_ARGS (splitmux->next_fragment_start_tc_time));
2342 if (check_robust_muxing) {
2343 GstClockTime mux_reserved_remain;
2345 g_object_get (splitmux->muxer,
2346 "reserved-duration-remaining", &mux_reserved_remain, NULL);
2348 GST_LOG_OBJECT (splitmux,
2349 "Muxer robust muxing report - %" G_GUINT64_FORMAT
2350 " remaining. New GOP would enqueue %" G_GUINT64_FORMAT,
2351 mux_reserved_remain, queued_gop_time);
2353 if (queued_gop_time >= mux_reserved_remain) {
2354 GST_INFO_OBJECT (splitmux,
2355 "File is about to run out of header room - %" G_GUINT64_FORMAT
2356 " remaining. New GOP would enqueue %" G_GUINT64_FORMAT
2357 ". Switching to new file", mux_reserved_remain, queued_gop_time);
2362 /* Continue and mux this GOP */
2366 /* probably we want to add this API? */
2368 video_time_code_replace (GstVideoTimeCode ** old_tc, GstVideoTimeCode * new_tc)
2370 GstVideoTimeCode *timecode = NULL;
2372 g_return_if_fail (old_tc != NULL);
2374 if (*old_tc == new_tc)
2378 timecode = gst_video_time_code_copy (new_tc);
2381 gst_video_time_code_free (*old_tc);
2386 /* Called with splitmux lock held */
2387 /* Called when entering ProcessingCompleteGop state
2388 * Assess if mq contents overflowed the current file
2389 * -> If yes, need to switch to new file
2390 * -> if no, set max_out_running_time to let this GOP in and
2391 * go to COLLECTING_GOP_START state
2394 handle_gathered_gop (GstSplitMuxSink * splitmux)
2396 guint64 queued_bytes;
2397 GstClockTimeDiff queued_time = 0;
2398 GstClockTimeDiff queued_gop_time = 0;
2399 GstClockTimeDiff new_out_ts = splitmux->reference_ctx->in_running_time;
2400 SplitMuxOutputCommand *cmd;
2402 /* Assess if the multiqueue contents overflowed the current file */
2403 /* When considering if a newly gathered GOP overflows
2404 * the time limit for the file, only consider the running time of the
2405 * reference stream. Other streams might have run ahead a little bit,
2406 * but extra pieces won't be released to the muxer beyond the reference
2407 * stream cut-off anyway - so it forms the limit. */
2408 queued_bytes = splitmux->fragment_total_bytes + splitmux->gop_total_bytes;
2409 queued_time = splitmux->reference_ctx->in_running_time;
2410 /* queued_gop_time tracks how much unwritten data there is waiting to
2411 * be written to this fragment including this GOP */
2412 if (splitmux->reference_ctx->out_running_time != GST_CLOCK_STIME_NONE)
2414 splitmux->reference_ctx->in_running_time -
2415 splitmux->reference_ctx->out_running_time;
2418 splitmux->reference_ctx->in_running_time - splitmux->gop_start_time;
2420 GST_LOG_OBJECT (splitmux, " queued_bytes %" G_GUINT64_FORMAT, queued_bytes);
2421 GST_LOG_OBJECT (splitmux, "mq at TS %" GST_STIME_FORMAT
2422 " bytes %" G_GUINT64_FORMAT " in running time %" GST_STIME_FORMAT
2423 " gop start time %" GST_STIME_FORMAT,
2424 GST_STIME_ARGS (queued_time), queued_bytes,
2425 GST_STIME_ARGS (splitmux->reference_ctx->in_running_time),
2426 GST_STIME_ARGS (splitmux->gop_start_time));
2428 if (queued_gop_time < 0)
2429 goto error_gop_duration;
2431 if (queued_time < splitmux->fragment_start_time)
2432 goto error_queued_time;
2434 queued_time -= splitmux->fragment_start_time;
2435 if (queued_time < queued_gop_time)
2436 queued_gop_time = queued_time;
2438 /* Expand queued bytes estimate by muxer overhead */
2439 queued_bytes += (queued_bytes * splitmux->mux_overhead);
2441 /* Check for overrun - have we output at least one byte and overrun
2442 * either threshold? */
2443 if (need_new_fragment (splitmux, queued_time, queued_gop_time, queued_bytes)) {
2444 if (splitmux->async_finalize) {
2445 GstClockTime *sink_running_time = g_new (GstClockTime, 1);
2446 *sink_running_time = splitmux->reference_ctx->out_running_time;
2447 g_object_set_qdata_full (G_OBJECT (splitmux->sink),
2448 RUNNING_TIME, sink_running_time, g_free);
2450 g_atomic_int_set (&(splitmux->do_split_next_gop), FALSE);
2451 /* Tell the output side to start a new fragment */
2452 GST_INFO_OBJECT (splitmux,
2453 "This GOP (dur %" GST_STIME_FORMAT
2454 ") would overflow the fragment, Sending start_new_fragment cmd",
2455 GST_STIME_ARGS (splitmux->reference_ctx->in_running_time -
2456 splitmux->gop_start_time));
2457 cmd = out_cmd_buf_new ();
2458 cmd->start_new_fragment = TRUE;
2459 g_queue_push_head (&splitmux->out_cmd_q, cmd);
2460 GST_SPLITMUX_BROADCAST_OUTPUT (splitmux);
2462 new_out_ts = splitmux->reference_ctx->in_running_time;
2463 splitmux->fragment_start_time = splitmux->gop_start_time;
2464 splitmux->fragment_total_bytes = 0;
2465 splitmux->fragment_reference_bytes = 0;
2467 if (splitmux->tc_interval) {
2468 video_time_code_replace (&splitmux->fragment_start_tc,
2469 splitmux->gop_start_tc);
2470 splitmux->next_fragment_start_tc_time =
2471 calculate_next_max_timecode (splitmux, splitmux->fragment_start_tc,
2472 splitmux->fragment_start_time, NULL);
2473 if (!GST_CLOCK_TIME_IS_VALID (splitmux->next_fragment_start_tc_time)) {
2474 GST_WARNING_OBJECT (splitmux,
2475 "Couldn't calculate next fragment start time for timecode mode");
2476 /* shouldn't happen, but reset all and try again with next buffers */
2477 gst_splitmux_reset_timecode (splitmux);
2482 /* And set up to collect the next GOP */
2483 if (!splitmux->reference_ctx->in_eos) {
2484 splitmux->input_state = SPLITMUX_INPUT_STATE_COLLECTING_GOP_START;
2485 splitmux->gop_start_time = new_out_ts;
2486 if (splitmux->tc_interval)
2487 video_time_code_replace (&splitmux->gop_start_tc, splitmux->in_tc);
2489 /* This is probably already the current state, but just in case: */
2490 splitmux->input_state = SPLITMUX_INPUT_STATE_FINISHING_UP;
2491 new_out_ts = GST_CLOCK_STIME_NONE; /* EOS runs until forever */
2494 /* And wake all input contexts to send a wake-up event */
2495 g_list_foreach (splitmux->contexts, (GFunc) ctx_set_unblock, NULL);
2496 GST_SPLITMUX_BROADCAST_INPUT (splitmux);
2498 /* Now either way - either there was no overflow, or we requested a new fragment: release this GOP */
2499 splitmux->fragment_total_bytes += splitmux->gop_total_bytes;
2500 splitmux->fragment_reference_bytes += splitmux->gop_reference_bytes;
2502 if (splitmux->gop_total_bytes > 0) {
2503 GST_LOG_OBJECT (splitmux,
2504 "Releasing GOP to output. Bytes in fragment now %" G_GUINT64_FORMAT
2505 " time %" GST_STIME_FORMAT,
2506 splitmux->fragment_total_bytes, GST_STIME_ARGS (queued_time));
2508 /* Send this GOP to the output command queue */
2509 cmd = out_cmd_buf_new ();
2510 cmd->start_new_fragment = FALSE;
2511 cmd->max_output_ts = new_out_ts;
2512 GST_LOG_OBJECT (splitmux, "Sending GOP cmd to output for TS %"
2513 GST_STIME_FORMAT, GST_STIME_ARGS (new_out_ts));
2514 g_queue_push_head (&splitmux->out_cmd_q, cmd);
2516 GST_SPLITMUX_BROADCAST_OUTPUT (splitmux);
2519 splitmux->gop_total_bytes = 0;
2520 splitmux->gop_reference_bytes = 0;
2524 GST_ELEMENT_ERROR (splitmux,
2525 STREAM, FAILED, ("Timestamping error on input streams"),
2526 ("Queued GOP time is negative %" GST_STIME_FORMAT,
2527 GST_STIME_ARGS (queued_gop_time)));
2530 GST_ELEMENT_ERROR (splitmux,
2531 STREAM, FAILED, ("Timestamping error on input streams"),
2532 ("Queued time is negative. Input went backwards. queued_time - %"
2533 GST_STIME_FORMAT, GST_STIME_ARGS (queued_time)));
2537 /* Called with splitmux lock held */
2538 /* Called from each input pad when it is has all the pieces
2539 * for a GOP or EOS, starting with the reference pad which has set the
2540 * splitmux->max_in_running_time
2543 check_completed_gop (GstSplitMuxSink * splitmux, MqStreamCtx * ctx)
2548 /* On ENDING_FILE, the reference stream sends a command to start a new
2549 * fragment, then releases the GOP for output in the new fragment.
2550 * If some streams received no buffer during the last GOP that overran,
2551 * because its next buffer has a timestamp bigger than
2552 * ctx->max_in_running_time, its queue is empty. In that case the only
2553 * way to wakeup the output thread is by injecting an event in the
2554 * queue. This usually happen with subtitle streams.
2555 * See https://bugzilla.gnome.org/show_bug.cgi?id=763711. */
2556 if (ctx->need_unblock) {
2557 GST_LOG_OBJECT (ctx->sinkpad, "Sending splitmuxsink-unblock event");
2558 event = gst_event_new_custom (GST_EVENT_CUSTOM_DOWNSTREAM |
2559 GST_EVENT_TYPE_SERIALIZED,
2560 gst_structure_new ("splitmuxsink-unblock", "timestamp",
2561 G_TYPE_INT64, splitmux->max_in_running_time, NULL));
2563 GST_SPLITMUX_UNLOCK (splitmux);
2564 gst_pad_send_event (ctx->sinkpad, event);
2565 GST_SPLITMUX_LOCK (splitmux);
2567 ctx->need_unblock = FALSE;
2568 GST_SPLITMUX_BROADCAST_INPUT (splitmux);
2569 /* state may have changed while we were unlocked. Loop again if so */
2570 if (splitmux->input_state != SPLITMUX_INPUT_STATE_WAITING_GOP_COLLECT)
2575 if (splitmux->input_state == SPLITMUX_INPUT_STATE_WAITING_GOP_COLLECT) {
2576 gboolean ready = TRUE;
2578 /* Iterate each pad, and check that the input running time is at least
2579 * up to the reference running time, and if so handle the collected GOP */
2580 GST_LOG_OBJECT (splitmux, "Checking GOP collected, Max in running time %"
2581 GST_STIME_FORMAT " ctx %p",
2582 GST_STIME_ARGS (splitmux->max_in_running_time), ctx);
2583 for (cur = g_list_first (splitmux->contexts); cur != NULL;
2584 cur = g_list_next (cur)) {
2585 MqStreamCtx *tmpctx = (MqStreamCtx *) (cur->data);
2587 GST_LOG_OBJECT (splitmux,
2588 "Context %p sink pad %" GST_PTR_FORMAT " @ TS %" GST_STIME_FORMAT
2589 " EOS %d", tmpctx, tmpctx->sinkpad,
2590 GST_STIME_ARGS (tmpctx->in_running_time), tmpctx->in_eos);
2592 if (splitmux->max_in_running_time != GST_CLOCK_STIME_NONE &&
2593 tmpctx->in_running_time < splitmux->max_in_running_time &&
2595 GST_LOG_OBJECT (splitmux,
2596 "Context %p sink pad %" GST_PTR_FORMAT " not ready. We'll sleep",
2597 tmpctx, tmpctx->sinkpad);
2603 GST_DEBUG_OBJECT (splitmux,
2604 "Collected GOP is complete. Processing (ctx %p)", ctx);
2605 /* All pads have a complete GOP, release it into the multiqueue */
2606 handle_gathered_gop (splitmux);
2608 /* The user has requested a split, we can split now that the previous GOP
2609 * has been collected to the correct location */
2610 if (g_atomic_int_compare_and_exchange (&(splitmux->split_requested),
2612 g_atomic_int_set (&(splitmux->do_split_next_gop), TRUE);
2617 /* If upstream reached EOS we are not expecting more data, no need to wait
2622 if (splitmux->input_state == SPLITMUX_INPUT_STATE_WAITING_GOP_COLLECT &&
2624 (ctx->in_running_time >= splitmux->max_in_running_time) &&
2625 (splitmux->max_in_running_time != GST_CLOCK_STIME_NONE)) {
2626 /* Some pad is not yet ready, or GOP is being pushed
2627 * either way, sleep and wait to get woken */
2628 GST_LOG_OBJECT (splitmux, "Sleeping for GOP collection (ctx %p)", ctx);
2629 GST_SPLITMUX_WAIT_INPUT (splitmux);
2630 GST_LOG_OBJECT (splitmux, "Done waiting for complete GOP (ctx %p)", ctx);
2632 /* This pad is not ready or the state changed - break out and get another
2636 } while (splitmux->input_state == SPLITMUX_INPUT_STATE_WAITING_GOP_COLLECT);
2639 static GstPadProbeReturn
2640 handle_mq_input (GstPad * pad, GstPadProbeInfo * info, MqStreamCtx * ctx)
2642 GstSplitMuxSink *splitmux = ctx->splitmux;
2643 GstFlowReturn ret = GST_FLOW_OK;
2645 MqStreamBuf *buf_info = NULL;
2647 gboolean loop_again;
2648 gboolean keyframe = FALSE;
2650 GST_LOG_OBJECT (pad, "Fired probe type 0x%x", info->type);
2652 /* FIXME: Handle buffer lists, until then make it clear they won't work */
2653 if (info->type & GST_PAD_PROBE_TYPE_BUFFER_LIST) {
2654 g_warning ("Buffer list handling not implemented");
2655 return GST_PAD_PROBE_DROP;
2657 if (info->type & GST_PAD_PROBE_TYPE_EVENT_DOWNSTREAM ||
2658 info->type & GST_PAD_PROBE_TYPE_EVENT_FLUSH) {
2659 GstEvent *event = gst_pad_probe_info_get_event (info);
2661 GST_LOG_OBJECT (pad, "Event %" GST_PTR_FORMAT, event);
2663 switch (GST_EVENT_TYPE (event)) {
2664 case GST_EVENT_SEGMENT:
2665 gst_event_copy_segment (event, &ctx->in_segment);
2667 case GST_EVENT_FLUSH_STOP:
2668 GST_SPLITMUX_LOCK (splitmux);
2669 gst_segment_init (&ctx->in_segment, GST_FORMAT_UNDEFINED);
2670 ctx->in_eos = FALSE;
2671 ctx->in_running_time = GST_CLOCK_STIME_NONE;
2672 GST_SPLITMUX_UNLOCK (splitmux);
2675 GST_SPLITMUX_LOCK (splitmux);
2678 if (splitmux->input_state == SPLITMUX_INPUT_STATE_STOPPED) {
2679 ret = GST_FLOW_FLUSHING;
2683 if (ctx->is_reference) {
2684 GST_INFO_OBJECT (splitmux, "Got Reference EOS. Finishing up");
2685 /* check_completed_gop will act as if this is a new keyframe with infinite timestamp */
2686 splitmux->input_state = SPLITMUX_INPUT_STATE_WAITING_GOP_COLLECT;
2687 /* Wake up other input pads to collect this GOP */
2688 GST_SPLITMUX_BROADCAST_INPUT (splitmux);
2689 check_completed_gop (splitmux, ctx);
2690 } else if (splitmux->input_state ==
2691 SPLITMUX_INPUT_STATE_WAITING_GOP_COLLECT) {
2692 /* If we are waiting for a GOP to be completed (ie, for aux
2693 * pads to catch up), then this pad is complete, so check
2694 * if the whole GOP is.
2696 check_completed_gop (splitmux, ctx);
2698 GST_SPLITMUX_UNLOCK (splitmux);
2700 case GST_EVENT_GAP:{
2701 GstClockTime gap_ts;
2702 GstClockTimeDiff rtime;
2704 gst_event_parse_gap (event, &gap_ts, NULL);
2705 if (gap_ts == GST_CLOCK_TIME_NONE)
2708 GST_SPLITMUX_LOCK (splitmux);
2710 if (splitmux->input_state == SPLITMUX_INPUT_STATE_STOPPED) {
2711 ret = GST_FLOW_FLUSHING;
2714 rtime = my_segment_to_running_time (&ctx->in_segment, gap_ts);
2716 GST_LOG_OBJECT (pad, "Have GAP w/ ts %" GST_STIME_FORMAT,
2717 GST_STIME_ARGS (rtime));
2719 if (ctx->is_reference
2720 && splitmux->fragment_start_time == GST_CLOCK_STIME_NONE) {
2721 splitmux->gop_start_time = splitmux->fragment_start_time = rtime;
2722 GST_LOG_OBJECT (splitmux, "Mux start time now %" GST_STIME_FORMAT,
2723 GST_STIME_ARGS (splitmux->fragment_start_time));
2724 /* Also take this as the first start time when starting up,
2725 * so that we start counting overflow from the first frame */
2726 if (!GST_CLOCK_STIME_IS_VALID (splitmux->max_in_running_time))
2727 splitmux->max_in_running_time = splitmux->fragment_start_time;
2730 GST_SPLITMUX_UNLOCK (splitmux);
2736 return GST_PAD_PROBE_PASS;
2737 } else if (info->type & GST_PAD_PROBE_TYPE_QUERY_DOWNSTREAM) {
2738 switch (GST_QUERY_TYPE (GST_QUERY (info->data))) {
2739 case GST_QUERY_ALLOCATION:
2740 return GST_PAD_PROBE_DROP;
2742 return GST_PAD_PROBE_PASS;
2746 buf = gst_pad_probe_info_get_buffer (info);
2747 buf_info = mq_stream_buf_new ();
2749 if (GST_BUFFER_PTS_IS_VALID (buf))
2750 ts = GST_BUFFER_PTS (buf);
2752 ts = GST_BUFFER_DTS (buf);
2754 GST_LOG_OBJECT (pad, "Buffer TS is %" GST_TIME_FORMAT, GST_TIME_ARGS (ts));
2756 GST_SPLITMUX_LOCK (splitmux);
2758 if (splitmux->input_state == SPLITMUX_INPUT_STATE_STOPPED) {
2759 ret = GST_FLOW_FLUSHING;
2763 /* If this buffer has a timestamp, advance the input timestamp of the
2765 if (GST_CLOCK_TIME_IS_VALID (ts)) {
2766 GstClockTimeDiff running_time =
2767 my_segment_to_running_time (&ctx->in_segment, ts);
2769 GST_LOG_OBJECT (pad, "Buffer running TS is %" GST_STIME_FORMAT,
2770 GST_STIME_ARGS (running_time));
2772 if (GST_CLOCK_STIME_IS_VALID (running_time)
2773 && running_time > ctx->in_running_time)
2774 ctx->in_running_time = running_time;
2777 /* Try to make sure we have a valid running time */
2778 if (!GST_CLOCK_STIME_IS_VALID (ctx->in_running_time)) {
2779 ctx->in_running_time =
2780 my_segment_to_running_time (&ctx->in_segment, ctx->in_segment.start);
2783 GST_LOG_OBJECT (pad, "in running time now %" GST_STIME_FORMAT,
2784 GST_STIME_ARGS (ctx->in_running_time));
2786 buf_info->run_ts = ctx->in_running_time;
2787 buf_info->buf_size = gst_buffer_get_size (buf);
2788 buf_info->duration = GST_BUFFER_DURATION (buf);
2790 if (ctx->is_reference) {
2791 /* initialize fragment_start_time */
2792 if (splitmux->fragment_start_time == GST_CLOCK_STIME_NONE) {
2793 splitmux->gop_start_time = splitmux->fragment_start_time =
2795 GST_LOG_OBJECT (splitmux, "Mux start time now %" GST_STIME_FORMAT,
2796 GST_STIME_ARGS (splitmux->fragment_start_time));
2798 /* Also take this as the first start time when starting up,
2799 * so that we start counting overflow from the first frame */
2800 if (!GST_CLOCK_STIME_IS_VALID (splitmux->max_in_running_time))
2801 splitmux->max_in_running_time = splitmux->fragment_start_time;
2804 if (splitmux->tc_interval) {
2805 GstVideoTimeCodeMeta *tc_meta = gst_buffer_get_video_time_code_meta (buf);
2807 video_time_code_replace (&splitmux->in_tc, &tc_meta->tc);
2809 if (!splitmux->fragment_start_tc) {
2810 /* also initialize fragment_start_tc */
2811 video_time_code_replace (&splitmux->gop_start_tc, &tc_meta->tc);
2812 video_time_code_replace (&splitmux->fragment_start_tc, &tc_meta->tc);
2814 splitmux->next_fragment_start_tc_time =
2815 calculate_next_max_timecode (splitmux, splitmux->in_tc,
2816 ctx->in_running_time, NULL);
2817 GST_DEBUG_OBJECT (splitmux, "Initialize next fragment start tc time %"
2819 GST_TIME_ARGS (splitmux->next_fragment_start_tc_time));
2824 /* Check whether we need to request next keyframe depending on
2825 * current running time */
2826 if (!GST_BUFFER_FLAG_IS_SET (buf, GST_BUFFER_FLAG_DELTA_UNIT) &&
2827 request_next_keyframe (splitmux, buf, ctx->in_running_time) == FALSE) {
2828 GST_WARNING_OBJECT (splitmux,
2829 "Could not request a keyframe. Files may not split at the exact location they should");
2833 GST_DEBUG_OBJECT (pad, "Buf TS %" GST_STIME_FORMAT
2834 " total GOP bytes %" G_GUINT64_FORMAT,
2835 GST_STIME_ARGS (buf_info->run_ts), splitmux->gop_total_bytes);
2842 switch (splitmux->input_state) {
2843 case SPLITMUX_INPUT_STATE_COLLECTING_GOP_START:
2844 if (ctx->is_releasing) {
2845 /* The pad belonging to this context is being released */
2846 GST_WARNING_OBJECT (pad, "Pad is being released while the muxer is "
2847 "running. Data might not drain correctly");
2849 } else if (ctx->is_reference) {
2850 /* This is the reference context. If it's a keyframe,
2851 * it marks the start of a new GOP and we should wait in
2852 * check_completed_gop before continuing, but either way
2853 * (keyframe or no, we'll pass this buffer through after
2854 * so set loop_again to FALSE */
2857 if (GST_BUFFER_FLAG_IS_SET (buf, GST_BUFFER_FLAG_DELTA_UNIT)) {
2858 /* Allow other input pads to catch up to here too */
2859 splitmux->max_in_running_time = ctx->in_running_time;
2860 GST_LOG_OBJECT (splitmux,
2861 "Max in running time now %" GST_TIME_FORMAT,
2862 GST_TIME_ARGS (splitmux->max_in_running_time));
2863 GST_SPLITMUX_BROADCAST_INPUT (splitmux);
2866 GST_INFO_OBJECT (pad,
2867 "Have keyframe with running time %" GST_STIME_FORMAT,
2868 GST_STIME_ARGS (ctx->in_running_time));
2870 splitmux->input_state = SPLITMUX_INPUT_STATE_WAITING_GOP_COLLECT;
2871 splitmux->max_in_running_time = ctx->in_running_time;
2872 GST_LOG_OBJECT (splitmux, "Max in running time now %" GST_TIME_FORMAT,
2873 GST_TIME_ARGS (splitmux->max_in_running_time));
2874 /* Wake up other input pads to collect this GOP */
2875 GST_SPLITMUX_BROADCAST_INPUT (splitmux);
2876 check_completed_gop (splitmux, ctx);
2878 /* Pass this buffer if the reference ctx is far enough ahead */
2879 if (ctx->in_running_time < splitmux->max_in_running_time) {
2884 /* We're still waiting for a keyframe on the reference pad, sleep */
2885 GST_LOG_OBJECT (pad, "Sleeping for GOP start");
2886 GST_SPLITMUX_WAIT_INPUT (splitmux);
2887 GST_LOG_OBJECT (pad,
2888 "Done sleeping for GOP start input state now %d",
2889 splitmux->input_state);
2892 case SPLITMUX_INPUT_STATE_WAITING_GOP_COLLECT:{
2893 /* We're collecting a GOP. If this is the reference context,
2894 * we need to check if this is a keyframe that marks the start
2895 * of the next GOP. If it is, it marks the end of the GOP we're
2896 * collecting, so sleep and wait until all the other pads also
2897 * reach that timestamp - at which point, we have an entire GOP
2898 * and either go to ENDING_FILE or release this GOP to the muxer and
2899 * go back to COLLECT_GOP_START. */
2901 /* If we overran the target timestamp, it might be time to process
2902 * the GOP, otherwise bail out for more data
2904 GST_LOG_OBJECT (pad,
2905 "Checking TS %" GST_STIME_FORMAT " against max %"
2906 GST_STIME_FORMAT, GST_STIME_ARGS (ctx->in_running_time),
2907 GST_STIME_ARGS (splitmux->max_in_running_time));
2909 if (ctx->in_running_time < splitmux->max_in_running_time) {
2914 GST_LOG_OBJECT (pad,
2915 "Collected last packet of GOP. Checking other pads");
2916 check_completed_gop (splitmux, ctx);
2919 case SPLITMUX_INPUT_STATE_FINISHING_UP:
2930 splitmux->queued_keyframes++;
2931 buf_info->keyframe = TRUE;
2934 /* Update total input byte counter for overflow detect */
2935 splitmux->gop_total_bytes += buf_info->buf_size;
2936 if (ctx->is_reference) {
2937 splitmux->gop_reference_bytes += buf_info->buf_size;
2940 /* Now add this buffer to the queue just before returning */
2941 g_queue_push_head (&ctx->queued_bufs, buf_info);
2943 GST_LOG_OBJECT (pad, "Returning to queue buffer %" GST_PTR_FORMAT
2944 " run ts %" GST_STIME_FORMAT, buf, GST_STIME_ARGS (ctx->in_running_time));
2946 GST_SPLITMUX_UNLOCK (splitmux);
2947 GST_PAD_PROBE_INFO_FLOW_RETURN (info) = ret;
2948 return GST_PAD_PROBE_PASS;
2951 GST_SPLITMUX_UNLOCK (splitmux);
2953 mq_stream_buf_free (buf_info);
2954 GST_PAD_PROBE_INFO_FLOW_RETURN (info) = ret;
2955 return GST_PAD_PROBE_PASS;
2959 grow_blocked_queues (GstSplitMuxSink * splitmux)
2963 /* Scan other queues for full-ness and grow them */
2964 for (cur = g_list_first (splitmux->contexts);
2965 cur != NULL; cur = g_list_next (cur)) {
2966 MqStreamCtx *tmpctx = (MqStreamCtx *) (cur->data);
2968 guint cur_len = g_queue_get_length (&tmpctx->queued_bufs);
2970 g_object_get (tmpctx->q, "max-size-buffers", &cur_limit, NULL);
2971 GST_LOG_OBJECT (tmpctx->q, "Queue len %u", cur_len);
2973 if (cur_len >= cur_limit) {
2974 cur_limit = cur_len + 1;
2975 GST_DEBUG_OBJECT (tmpctx->q,
2976 "Queue overflowed and needs enlarging. Growing to %u buffers",
2978 g_object_set (tmpctx->q, "max-size-buffers", cur_limit, NULL);
2984 handle_q_underrun (GstElement * q, gpointer user_data)
2986 MqStreamCtx *ctx = (MqStreamCtx *) (user_data);
2987 GstSplitMuxSink *splitmux = ctx->splitmux;
2989 GST_SPLITMUX_LOCK (splitmux);
2990 GST_DEBUG_OBJECT (q,
2991 "Queue reported underrun with %d keyframes and %d cmds enqueued",
2992 splitmux->queued_keyframes, g_queue_get_length (&splitmux->out_cmd_q));
2993 grow_blocked_queues (splitmux);
2994 GST_SPLITMUX_UNLOCK (splitmux);
2998 handle_q_overrun (GstElement * q, gpointer user_data)
3000 MqStreamCtx *ctx = (MqStreamCtx *) (user_data);
3001 GstSplitMuxSink *splitmux = ctx->splitmux;
3002 gboolean allow_grow = FALSE;
3004 GST_SPLITMUX_LOCK (splitmux);
3005 GST_DEBUG_OBJECT (q,
3006 "Queue reported overrun with %d keyframes and %d cmds enqueued",
3007 splitmux->queued_keyframes, g_queue_get_length (&splitmux->out_cmd_q));
3009 if (splitmux->queued_keyframes < 2) {
3010 /* Less than a full GOP queued, grow the queue */
3012 } else if (g_queue_get_length (&splitmux->out_cmd_q) < 1) {
3015 /* If another queue is starved, grow */
3017 for (cur = g_list_first (splitmux->contexts);
3018 cur != NULL; cur = g_list_next (cur)) {
3019 MqStreamCtx *tmpctx = (MqStreamCtx *) (cur->data);
3020 if (tmpctx != ctx && g_queue_get_length (&tmpctx->queued_bufs) < 1) {
3025 GST_SPLITMUX_UNLOCK (splitmux);
3030 g_object_get (q, "max-size-buffers", &cur_limit, NULL);
3033 GST_DEBUG_OBJECT (q,
3034 "Queue overflowed and needs enlarging. Growing to %u buffers",
3037 g_object_set (q, "max-size-buffers", cur_limit, NULL);
3041 /* Called with SPLITMUX lock held */
3042 static const gchar *
3043 lookup_muxer_pad (GstSplitMuxSink * splitmux, const gchar * sinkpad_name)
3045 const gchar *ret = NULL;
3047 if (splitmux->muxerpad_map == NULL)
3050 if (sinkpad_name == NULL) {
3051 GST_WARNING_OBJECT (splitmux,
3052 "Can't look up request pad in pad map without providing a pad name");
3056 ret = gst_structure_get_string (splitmux->muxerpad_map, sinkpad_name);
3058 GST_INFO_OBJECT (splitmux, "Sink pad %s maps to muxer pad %s", sinkpad_name,
3060 return g_strdup (ret);
3067 gst_splitmux_sink_request_new_pad (GstElement * element,
3068 GstPadTemplate * templ, const gchar * name, const GstCaps * caps)
3070 GstSplitMuxSink *splitmux = (GstSplitMuxSink *) element;
3071 GstPadTemplate *mux_template = NULL;
3072 GstPad *ret = NULL, *muxpad = NULL;
3074 GstPad *q_sink = NULL, *q_src = NULL;
3075 gchar *gname, *qname;
3076 gboolean is_primary_video = FALSE, is_video = FALSE,
3077 muxer_is_requestpad = FALSE;
3079 const gchar *muxer_padname = NULL;
3081 GST_DEBUG_OBJECT (splitmux, "templ:%s, name:%s", templ->name_template, name);
3083 GST_SPLITMUX_LOCK (splitmux);
3084 if (!create_muxer (splitmux))
3086 g_signal_emit (splitmux, signals[SIGNAL_MUXER_ADDED], 0, splitmux->muxer);
3088 if (g_str_equal (templ->name_template, "video") ||
3089 g_str_has_prefix (templ->name_template, "video_aux_")) {
3090 is_primary_video = g_str_equal (templ->name_template, "video");
3091 if (is_primary_video && splitmux->have_video)
3092 goto already_have_video;
3096 /* See if there's a pad map and it lists this pad */
3097 muxer_padname = lookup_muxer_pad (splitmux, name);
3099 if (muxer_padname == NULL) {
3101 /* FIXME: Look for a pad template with matching caps, rather than by name */
3102 GST_DEBUG_OBJECT (element,
3103 "searching for pad-template with name 'video_%%u'");
3105 gst_element_class_get_pad_template (GST_ELEMENT_GET_CLASS
3106 (splitmux->muxer), "video_%u");
3108 /* Fallback to find sink pad templates named 'video' (flvmux) */
3109 if (!mux_template) {
3110 GST_DEBUG_OBJECT (element,
3111 "searching for pad-template with name 'video'");
3113 gst_element_class_get_pad_template (GST_ELEMENT_GET_CLASS
3114 (splitmux->muxer), "video");
3118 GST_DEBUG_OBJECT (element, "searching for pad-template with name '%s'",
3119 templ->name_template);
3121 gst_element_class_get_pad_template (GST_ELEMENT_GET_CLASS
3122 (splitmux->muxer), templ->name_template);
3124 /* Fallback to find sink pad templates named 'audio' (flvmux) */
3125 if (!mux_template && g_str_has_prefix (templ->name_template, "audio_")) {
3126 GST_DEBUG_OBJECT (element,
3127 "searching for pad-template with name 'audio'");
3129 gst_element_class_get_pad_template (GST_ELEMENT_GET_CLASS
3130 (splitmux->muxer), "audio");
3135 if (mux_template == NULL) {
3136 GST_DEBUG_OBJECT (element,
3137 "searching for pad-template with name 'sink_%%d'");
3139 gst_element_class_get_pad_template (GST_ELEMENT_GET_CLASS
3140 (splitmux->muxer), "sink_%d");
3143 if (mux_template == NULL) {
3144 GST_DEBUG_OBJECT (element, "searching for pad-template with name 'sink'");
3146 gst_element_class_get_pad_template (GST_ELEMENT_GET_CLASS
3147 (splitmux->muxer), "sink");
3151 if (mux_template == NULL) {
3152 GST_ERROR_OBJECT (element,
3153 "unable to find a suitable sink pad-template on the muxer");
3156 GST_DEBUG_OBJECT (element, "found sink pad-template '%s' on the muxer",
3157 mux_template->name_template);
3159 if (mux_template->presence == GST_PAD_REQUEST) {
3160 GST_DEBUG_OBJECT (element, "requesting pad from pad-template");
3163 gst_element_request_pad (splitmux->muxer, mux_template, name, caps);
3164 muxer_is_requestpad = TRUE;
3165 } else if (mux_template->presence == GST_PAD_ALWAYS) {
3166 GST_DEBUG_OBJECT (element, "accessing always pad from pad-template");
3169 gst_element_get_static_pad (splitmux->muxer,
3170 mux_template->name_template);
3172 GST_ERROR_OBJECT (element,
3173 "unexpected pad presence %d", mux_template->presence);
3177 /* Have a muxer pad name */
3178 if (!(muxpad = gst_element_get_static_pad (splitmux->muxer, muxer_padname))) {
3180 gst_element_request_pad_simple (splitmux->muxer, muxer_padname)))
3181 muxer_is_requestpad = TRUE;
3183 g_free ((gchar *) muxer_padname);
3184 muxer_padname = NULL;
3187 /* One way or another, we must have a muxer pad by now */
3191 if (is_primary_video)
3192 gname = g_strdup ("video");
3193 else if (name == NULL)
3194 gname = gst_pad_get_name (muxpad);
3196 gname = g_strdup (name);
3198 qname = g_strdup_printf ("queue_%s", gname);
3199 if ((q = create_element (splitmux, "queue", qname, FALSE)) == NULL) {
3205 gst_element_set_state (q, GST_STATE_TARGET (splitmux));
3207 g_object_set (q, "max-size-bytes", 0, "max-size-time", (guint64) (0),
3208 "max-size-buffers", 5, NULL);
3210 q_sink = gst_element_get_static_pad (q, "sink");
3211 q_src = gst_element_get_static_pad (q, "src");
3213 if (gst_pad_link (q_src, muxpad) != GST_PAD_LINK_OK) {
3214 if (muxer_is_requestpad)
3215 gst_element_release_request_pad (splitmux->muxer, muxpad);
3216 gst_object_unref (GST_OBJECT (muxpad));
3220 gst_object_unref (GST_OBJECT (muxpad));
3222 ctx = mq_stream_ctx_new (splitmux);
3223 /* Context holds a ref: */
3224 ctx->q = gst_object_ref (q);
3225 ctx->srcpad = q_src;
3226 ctx->sinkpad = q_sink;
3228 g_signal_connect (q, "overrun", (GCallback) handle_q_overrun, ctx);
3229 g_signal_connect (q, "underrun", (GCallback) handle_q_underrun, ctx);
3231 ctx->src_pad_block_id =
3232 gst_pad_add_probe (q_src,
3233 GST_PAD_PROBE_TYPE_DATA_DOWNSTREAM | GST_PAD_PROBE_TYPE_EVENT_FLUSH,
3234 (GstPadProbeCallback) handle_mq_output, ctx, NULL);
3235 if (is_primary_video && splitmux->reference_ctx != NULL) {
3236 splitmux->reference_ctx->is_reference = FALSE;
3237 splitmux->reference_ctx = NULL;
3239 if (splitmux->reference_ctx == NULL) {
3240 splitmux->reference_ctx = ctx;
3241 ctx->is_reference = TRUE;
3244 ret = gst_ghost_pad_new_from_template (gname, q_sink, templ);
3245 g_object_set_qdata ((GObject *) (ret), PAD_CONTEXT, ctx);
3247 ctx->sink_pad_block_id =
3248 gst_pad_add_probe (q_sink,
3249 GST_PAD_PROBE_TYPE_DATA_DOWNSTREAM | GST_PAD_PROBE_TYPE_EVENT_FLUSH |
3250 GST_PAD_PROBE_TYPE_QUERY_DOWNSTREAM,
3251 (GstPadProbeCallback) handle_mq_input, ctx, NULL);
3253 GST_DEBUG_OBJECT (splitmux, "splitmuxsink pad %" GST_PTR_FORMAT
3254 " feeds queue pad %" GST_PTR_FORMAT, ret, q_sink);
3256 splitmux->contexts = g_list_append (splitmux->contexts, ctx);
3260 if (is_primary_video)
3261 splitmux->have_video = TRUE;
3263 gst_pad_set_active (ret, TRUE);
3264 gst_element_add_pad (GST_ELEMENT (splitmux), ret);
3266 GST_SPLITMUX_UNLOCK (splitmux);
3270 GST_SPLITMUX_UNLOCK (splitmux);
3273 gst_object_unref (q_sink);
3275 gst_object_unref (q_src);
3278 GST_DEBUG_OBJECT (splitmux, "video sink pad already requested");
3279 GST_SPLITMUX_UNLOCK (splitmux);
3284 gst_splitmux_sink_release_pad (GstElement * element, GstPad * pad)
3286 GstSplitMuxSink *splitmux = (GstSplitMuxSink *) element;
3287 GstPad *muxpad = NULL;
3289 (MqStreamCtx *) (g_object_get_qdata ((GObject *) (pad), PAD_CONTEXT));
3291 GST_SPLITMUX_LOCK (splitmux);
3293 if (splitmux->muxer == NULL)
3294 goto fail; /* Elements don't exist yet - nothing to release */
3296 GST_INFO_OBJECT (pad, "releasing request pad");
3298 muxpad = gst_pad_get_peer (ctx->srcpad);
3300 /* Remove the context from our consideration */
3301 splitmux->contexts = g_list_remove (splitmux->contexts, ctx);
3303 GST_SPLITMUX_UNLOCK (splitmux);
3305 if (ctx->sink_pad_block_id) {
3306 gst_pad_remove_probe (ctx->sinkpad, ctx->sink_pad_block_id);
3307 gst_pad_send_event (ctx->sinkpad, gst_event_new_flush_start ());
3310 if (ctx->src_pad_block_id)
3311 gst_pad_remove_probe (ctx->srcpad, ctx->src_pad_block_id);
3313 GST_SPLITMUX_LOCK (splitmux);
3315 ctx->is_releasing = TRUE;
3316 GST_SPLITMUX_BROADCAST_INPUT (splitmux);
3318 /* Can release the context now */
3319 mq_stream_ctx_free (ctx);
3320 if (ctx == splitmux->reference_ctx)
3321 splitmux->reference_ctx = NULL;
3323 /* Release and free the muxer input */
3325 gst_element_release_request_pad (splitmux->muxer, muxpad);
3326 gst_object_unref (muxpad);
3329 if (GST_PAD_PAD_TEMPLATE (pad) &&
3330 g_str_equal (GST_PAD_TEMPLATE_NAME_TEMPLATE (GST_PAD_PAD_TEMPLATE
3332 splitmux->have_video = FALSE;
3334 gst_element_remove_pad (element, pad);
3336 /* Reset the internal elements only after all request pads are released */
3337 if (splitmux->contexts == NULL)
3338 gst_splitmux_reset_elements (splitmux);
3340 /* Wake up other input streams to check if the completion conditions have
3342 GST_SPLITMUX_BROADCAST_INPUT (splitmux);
3345 GST_SPLITMUX_UNLOCK (splitmux);
3349 create_element (GstSplitMuxSink * splitmux,
3350 const gchar * factory, const gchar * name, gboolean locked)
3352 GstElement *ret = gst_element_factory_make (factory, name);
3354 g_warning ("Failed to create %s - splitmuxsink will not work", name);
3359 /* Ensure the sink starts in locked state and NULL - it will be changed
3360 * by the filename setting code */
3361 gst_element_set_locked_state (ret, TRUE);
3362 gst_element_set_state (ret, GST_STATE_NULL);
3365 if (!gst_bin_add (GST_BIN (splitmux), ret)) {
3366 g_warning ("Could not add %s element - splitmuxsink will not work", name);
3367 gst_object_unref (ret);
3375 create_muxer (GstSplitMuxSink * splitmux)
3377 /* Create internal elements */
3378 if (splitmux->muxer == NULL) {
3379 GstElement *provided_muxer = NULL;
3381 GST_OBJECT_LOCK (splitmux);
3382 if (splitmux->provided_muxer != NULL)
3383 provided_muxer = gst_object_ref (splitmux->provided_muxer);
3384 GST_OBJECT_UNLOCK (splitmux);
3386 if ((!splitmux->async_finalize && provided_muxer == NULL) ||
3387 (splitmux->async_finalize && splitmux->muxer_factory == NULL)) {
3388 if ((splitmux->muxer =
3389 create_element (splitmux,
3390 splitmux->muxer_factory ? splitmux->
3391 muxer_factory : DEFAULT_MUXER, "muxer", FALSE)) == NULL)
3393 } else if (splitmux->async_finalize) {
3394 if ((splitmux->muxer =
3395 create_element (splitmux, splitmux->muxer_factory, "muxer",
3398 if (splitmux->muxer_preset && GST_IS_PRESET (splitmux->muxer))
3399 gst_preset_load_preset (GST_PRESET (splitmux->muxer),
3400 splitmux->muxer_preset);
3401 if (splitmux->muxer_properties)
3402 gst_structure_foreach (splitmux->muxer_properties,
3403 _set_property_from_structure, splitmux->muxer);
3405 /* Ensure it's not in locked state (we might be reusing an old element) */
3406 gst_element_set_locked_state (provided_muxer, FALSE);
3407 if (!gst_bin_add (GST_BIN (splitmux), provided_muxer)) {
3408 g_warning ("Could not add muxer element - splitmuxsink will not work");
3409 gst_object_unref (provided_muxer);
3413 splitmux->muxer = provided_muxer;
3414 gst_object_unref (provided_muxer);
3417 if (splitmux->use_robust_muxing) {
3418 update_muxer_properties (splitmux);
3428 find_sink (GstElement * e)
3430 GstElement *res = NULL;
3432 gboolean done = FALSE;
3433 GValue data = { 0, };
3435 if (!GST_IS_BIN (e))
3438 if (g_object_class_find_property (G_OBJECT_GET_CLASS (e), "location") != NULL)
3441 iter = gst_bin_iterate_sinks (GST_BIN (e));
3443 switch (gst_iterator_next (iter, &data)) {
3444 case GST_ITERATOR_OK:
3446 GstElement *child = g_value_get_object (&data);
3447 if (g_object_class_find_property (G_OBJECT_GET_CLASS (child),
3448 "location") != NULL) {
3452 g_value_reset (&data);
3455 case GST_ITERATOR_RESYNC:
3456 gst_iterator_resync (iter);
3458 case GST_ITERATOR_DONE:
3461 case GST_ITERATOR_ERROR:
3462 g_assert_not_reached ();
3466 g_value_unset (&data);
3467 gst_iterator_free (iter);
3473 create_sink (GstSplitMuxSink * splitmux)
3475 GstElement *provided_sink = NULL;
3477 if (splitmux->active_sink == NULL) {
3479 GST_OBJECT_LOCK (splitmux);
3480 if (splitmux->provided_sink != NULL)
3481 provided_sink = gst_object_ref (splitmux->provided_sink);
3482 GST_OBJECT_UNLOCK (splitmux);
3484 if ((!splitmux->async_finalize && provided_sink == NULL) ||
3485 (splitmux->async_finalize && splitmux->sink_factory == NULL)) {
3486 if ((splitmux->sink =
3487 create_element (splitmux, DEFAULT_SINK, "sink", TRUE)) == NULL)
3489 splitmux->active_sink = splitmux->sink;
3490 } else if (splitmux->async_finalize) {
3491 if ((splitmux->sink =
3492 create_element (splitmux, splitmux->sink_factory, "sink",
3495 if (splitmux->sink_preset && GST_IS_PRESET (splitmux->sink))
3496 gst_preset_load_preset (GST_PRESET (splitmux->sink),
3497 splitmux->sink_preset);
3498 if (splitmux->sink_properties)
3499 gst_structure_foreach (splitmux->sink_properties,
3500 _set_property_from_structure, splitmux->sink);
3501 splitmux->active_sink = splitmux->sink;
3503 /* Ensure the sink starts in locked state and NULL - it will be changed
3504 * by the filename setting code */
3505 gst_element_set_locked_state (provided_sink, TRUE);
3506 gst_element_set_state (provided_sink, GST_STATE_NULL);
3507 if (!gst_bin_add (GST_BIN (splitmux), provided_sink)) {
3508 g_warning ("Could not add sink elements - splitmuxsink will not work");
3509 gst_object_unref (provided_sink);
3513 splitmux->active_sink = provided_sink;
3515 /* The bin holds a ref now, we can drop our tmp ref */
3516 gst_object_unref (provided_sink);
3518 /* Find the sink element */
3519 splitmux->sink = find_sink (splitmux->active_sink);
3520 if (splitmux->sink == NULL) {
3522 ("Could not locate sink element in provided sink - splitmuxsink will not work");
3528 if (g_object_class_find_property (G_OBJECT_GET_CLASS (splitmux->sink),
3530 /* async child elements are causing state change races and weird
3531 * failures, so let's try and turn that off */
3532 g_object_set (splitmux->sink, "async", FALSE, NULL);
3536 if (!gst_element_link (splitmux->muxer, splitmux->active_sink)) {
3537 g_warning ("Failed to link muxer and sink- splitmuxsink will not work");
3548 #pragma GCC diagnostic ignored "-Wformat-nonliteral"
3551 set_next_filename (GstSplitMuxSink * splitmux, MqStreamCtx * ctx)
3553 gchar *fname = NULL;
3557 gst_splitmux_sink_ensure_max_files (splitmux);
3559 if (ctx->cur_out_buffer == NULL) {
3560 GST_WARNING_OBJECT (splitmux, "Starting next file without buffer");
3563 caps = gst_pad_get_current_caps (ctx->srcpad);
3564 sample = gst_sample_new (ctx->cur_out_buffer, caps, &ctx->out_segment, NULL);
3565 g_signal_emit (splitmux, signals[SIGNAL_FORMAT_LOCATION_FULL], 0,
3566 splitmux->fragment_id, sample, &fname);
3567 gst_sample_unref (sample);
3569 gst_caps_unref (caps);
3571 if (fname == NULL) {
3572 /* Fallback to the old signal if the new one returned nothing */
3573 g_signal_emit (splitmux, signals[SIGNAL_FORMAT_LOCATION], 0,
3574 splitmux->fragment_id, &fname);
3578 fname = splitmux->location ?
3579 g_strdup_printf (splitmux->location, splitmux->fragment_id) : NULL;
3582 GST_INFO_OBJECT (splitmux, "Setting file to %s", fname);
3583 if (g_object_class_find_property (G_OBJECT_GET_CLASS (splitmux->sink),
3584 "location") != NULL)
3585 g_object_set (splitmux->sink, "location", fname, NULL);
3589 splitmux->fragment_id++;
3592 /* called with GST_SPLITMUX_LOCK */
3594 do_async_start (GstSplitMuxSink * splitmux)
3596 GstMessage *message;
3598 if (!splitmux->need_async_start) {
3599 GST_INFO_OBJECT (splitmux, "no async_start needed");
3603 splitmux->async_pending = TRUE;
3605 GST_INFO_OBJECT (splitmux, "Sending async_start message");
3606 message = gst_message_new_async_start (GST_OBJECT_CAST (splitmux));
3608 GST_SPLITMUX_UNLOCK (splitmux);
3609 GST_BIN_CLASS (parent_class)->handle_message (GST_BIN_CAST
3610 (splitmux), message);
3611 GST_SPLITMUX_LOCK (splitmux);
3614 /* called with GST_SPLITMUX_LOCK */
3616 do_async_done (GstSplitMuxSink * splitmux)
3618 GstMessage *message;
3620 if (splitmux->async_pending) {
3621 GST_INFO_OBJECT (splitmux, "Sending async_done message");
3622 splitmux->async_pending = FALSE;
3623 GST_SPLITMUX_UNLOCK (splitmux);
3626 gst_message_new_async_done (GST_OBJECT_CAST (splitmux),
3627 GST_CLOCK_TIME_NONE);
3628 GST_BIN_CLASS (parent_class)->handle_message (GST_BIN_CAST
3629 (splitmux), message);
3630 GST_SPLITMUX_LOCK (splitmux);
3633 splitmux->need_async_start = FALSE;
3637 gst_splitmux_sink_reset (GstSplitMuxSink * splitmux)
3639 splitmux->max_in_running_time = GST_CLOCK_STIME_NONE;
3640 splitmux->gop_start_time = splitmux->fragment_start_time =
3641 GST_CLOCK_STIME_NONE;
3642 splitmux->max_out_running_time = 0;
3643 splitmux->fragment_total_bytes = 0;
3644 splitmux->fragment_reference_bytes = 0;
3645 splitmux->gop_total_bytes = 0;
3646 splitmux->gop_reference_bytes = 0;
3647 splitmux->muxed_out_bytes = 0;
3648 splitmux->ready_for_output = FALSE;
3650 g_atomic_int_set (&(splitmux->split_requested), FALSE);
3651 g_atomic_int_set (&(splitmux->do_split_next_gop), FALSE);
3653 splitmux->next_fku_time = GST_CLOCK_TIME_NONE;
3654 gst_queue_array_clear (splitmux->times_to_split);
3656 g_list_foreach (splitmux->contexts, (GFunc) mq_stream_ctx_reset, NULL);
3657 splitmux->queued_keyframes = 0;
3659 g_queue_foreach (&splitmux->out_cmd_q, (GFunc) out_cmd_buf_free, NULL);
3660 g_queue_clear (&splitmux->out_cmd_q);
3663 static GstStateChangeReturn
3664 gst_splitmux_sink_change_state (GstElement * element, GstStateChange transition)
3666 GstStateChangeReturn ret;
3667 GstSplitMuxSink *splitmux = (GstSplitMuxSink *) element;
3669 switch (transition) {
3670 case GST_STATE_CHANGE_NULL_TO_READY:{
3671 GST_SPLITMUX_LOCK (splitmux);
3672 if (!create_muxer (splitmux) || !create_sink (splitmux)) {
3673 ret = GST_STATE_CHANGE_FAILURE;
3674 GST_SPLITMUX_UNLOCK (splitmux);
3677 g_signal_emit (splitmux, signals[SIGNAL_MUXER_ADDED], 0, splitmux->muxer);
3678 g_signal_emit (splitmux, signals[SIGNAL_SINK_ADDED], 0, splitmux->sink);
3679 GST_SPLITMUX_UNLOCK (splitmux);
3680 splitmux->fragment_id = splitmux->start_index;
3683 case GST_STATE_CHANGE_READY_TO_PAUSED:{
3684 GST_SPLITMUX_LOCK (splitmux);
3685 /* Make sure contexts and tracking times are cleared, in case we're being reused */
3686 gst_splitmux_sink_reset (splitmux);
3687 /* Start by collecting one input on each pad */
3688 splitmux->input_state = SPLITMUX_INPUT_STATE_COLLECTING_GOP_START;
3689 splitmux->output_state = SPLITMUX_OUTPUT_STATE_START_NEXT_FILE;
3691 GST_SPLITMUX_UNLOCK (splitmux);
3693 GST_SPLITMUX_STATE_LOCK (splitmux);
3694 splitmux->shutdown = FALSE;
3695 GST_SPLITMUX_STATE_UNLOCK (splitmux);
3698 case GST_STATE_CHANGE_PAUSED_TO_READY:
3699 case GST_STATE_CHANGE_READY_TO_READY:
3700 g_atomic_int_set (&(splitmux->split_requested), FALSE);
3701 g_atomic_int_set (&(splitmux->do_split_next_gop), FALSE);
3703 case GST_STATE_CHANGE_READY_TO_NULL:
3704 GST_SPLITMUX_STATE_LOCK (splitmux);
3705 splitmux->shutdown = TRUE;
3706 GST_SPLITMUX_STATE_UNLOCK (splitmux);
3708 GST_SPLITMUX_LOCK (splitmux);
3709 gst_splitmux_sink_reset (splitmux);
3710 splitmux->output_state = SPLITMUX_OUTPUT_STATE_STOPPED;
3711 splitmux->input_state = SPLITMUX_INPUT_STATE_STOPPED;
3712 /* Wake up any blocked threads */
3713 GST_LOG_OBJECT (splitmux,
3714 "State change -> NULL or READY. Waking threads");
3715 GST_SPLITMUX_BROADCAST_INPUT (splitmux);
3716 GST_SPLITMUX_BROADCAST_OUTPUT (splitmux);
3717 GST_SPLITMUX_UNLOCK (splitmux);
3723 ret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
3724 if (ret == GST_STATE_CHANGE_FAILURE)
3727 switch (transition) {
3728 case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
3729 splitmux->need_async_start = TRUE;
3731 case GST_STATE_CHANGE_READY_TO_PAUSED:{
3732 /* Change state async, because our child sink might not
3733 * be ready to do that for us yet if it's state is still locked */
3735 splitmux->need_async_start = TRUE;
3736 /* we want to go async to PAUSED until we managed to configure and add the
3738 GST_SPLITMUX_LOCK (splitmux);
3739 do_async_start (splitmux);
3740 GST_SPLITMUX_UNLOCK (splitmux);
3741 ret = GST_STATE_CHANGE_ASYNC;
3744 case GST_STATE_CHANGE_READY_TO_NULL:
3745 GST_SPLITMUX_LOCK (splitmux);
3746 splitmux->fragment_id = 0;
3747 /* Reset internal elements only if no pad contexts are using them */
3748 if (splitmux->contexts == NULL)
3749 gst_splitmux_reset_elements (splitmux);
3750 do_async_done (splitmux);
3751 GST_SPLITMUX_UNLOCK (splitmux);
3760 if (transition == GST_STATE_CHANGE_NULL_TO_READY) {
3761 /* Cleanup elements on failed transition out of NULL */
3762 gst_splitmux_reset_elements (splitmux);
3763 GST_SPLITMUX_LOCK (splitmux);
3764 do_async_done (splitmux);
3765 GST_SPLITMUX_UNLOCK (splitmux);
3767 if (transition == GST_STATE_CHANGE_READY_TO_READY) {
3768 /* READY to READY transition only happens when we're already
3769 * in READY state, but a child element is in NULL, which
3770 * happens when there's an error changing the state of the sink.
3771 * We need to make sure not to fail the state transition, or
3772 * the core won't transition us back to NULL successfully */
3773 ret = GST_STATE_CHANGE_SUCCESS;
3779 gst_splitmux_sink_ensure_max_files (GstSplitMuxSink * splitmux)
3781 if (splitmux->max_files && splitmux->fragment_id >= splitmux->max_files) {
3782 splitmux->fragment_id = 0;
3787 split_now (GstSplitMuxSink * splitmux)
3789 g_atomic_int_set (&(splitmux->do_split_next_gop), TRUE);
3793 split_after (GstSplitMuxSink * splitmux)
3795 g_atomic_int_set (&(splitmux->split_requested), TRUE);
3799 split_at_running_time (GstSplitMuxSink * splitmux, GstClockTime split_time)
3801 gboolean send_keyframe_requests;
3803 GST_SPLITMUX_LOCK (splitmux);
3804 gst_queue_array_push_tail_struct (splitmux->times_to_split, &split_time);
3805 send_keyframe_requests = splitmux->send_keyframe_requests;
3806 GST_SPLITMUX_UNLOCK (splitmux);
3808 if (send_keyframe_requests) {
3810 gst_video_event_new_upstream_force_key_unit (split_time, TRUE, 0);
3811 GST_INFO_OBJECT (splitmux, "Requesting next keyframe at %" GST_TIME_FORMAT,
3812 GST_TIME_ARGS (split_time));
3813 if (!gst_pad_push_event (splitmux->reference_ctx->sinkpad, ev)) {
3814 GST_WARNING_OBJECT (splitmux,
3815 "Could not request keyframe at %" GST_TIME_FORMAT,
3816 GST_TIME_ARGS (split_time));