1 /* GStreamer Muxer bin that splits output stream by size/time
2 * Copyright (C) <2014> Jan Schmidt <jan@centricular.com>
4 * This library is free software; you can redistribute it and/or
5 * modify it under the terms of the GNU Library General Public
6 * License as published by the Free Software Foundation; either
7 * version 2 of the License, or (at your option) any later version.
9 * This library is distributed in the hope that it will be useful,
10 * but WITHOUT ANY WARRANTY; without even the implied warranty of
11 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
12 * Library General Public License for more details.
14 * You should have received a copy of the GNU Library General Public
15 * License along with this library; if not, write to the
16 * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
17 * Boston, MA 02110-1301, USA.
21 * SECTION:element-splitmuxsink
22 * @short_description: Muxer wrapper for splitting output stream by size or time
24 * This element wraps a muxer and a sink, and starts a new file when the mux
25 * contents are about to cross a threshold of maximum size of maximum time,
26 * splitting at video keyframe boundaries. Exactly one input video stream
27 * can be muxed, with as many accompanying audio and subtitle streams as
30 * By default, it uses mp4mux and filesink, but they can be changed via
31 * the 'muxer' and 'sink' properties.
33 * The minimum file size is 1 GOP, however - so limits may be overrun if the
34 * distance between any 2 keyframes is larger than the limits.
36 * If a video stream is available, the splitting process is driven by the video
37 * stream contents, and the video stream must contain closed GOPs for the output
38 * file parts to be played individually correctly. In the absence of a video
39 * stream, the first available stream is used as reference for synchronization.
41 * In the async-finalize mode, when the threshold is crossed, the old muxer
42 * and sink is disconnected from the pipeline and left to finish the file
43 * asynchronously, and a new muxer and sink is created to continue with the
44 * next fragment. For that reason, instead of muxer and sink objects, the
45 * muxer-factory and sink-factory properties are used to construct the new
46 * objects, together with muxer-properties and sink-properties.
49 * <title>Example pipelines</title>
51 * gst-launch-1.0 -e v4l2src num-buffers=500 ! video/x-raw,width=320,height=240 ! videoconvert ! queue ! timeoverlay ! x264enc key-int-max=10 ! h264parse ! splitmuxsink location=video%02d.mov max-size-time=10000000000 max-size-bytes=1000000
53 * Records a video stream captured from a v4l2 device and muxes it into
54 * ISO mp4 files, splitting as needed to limit size/duration to 10 seconds
55 * and 1MB maximum size.
58 * gst-launch-1.0 -e v4l2src num-buffers=500 ! video/x-raw,width=320,height=240 ! videoconvert ! queue ! timeoverlay ! x264enc key-int-max=10 ! h264parse ! splitmuxsink location=video%02d.mkv max-size-time=10000000000 muxer-factory=matroskamux muxer-properties="properties,streamable=true"
60 * Records a video stream captured from a v4l2 device and muxer it into
61 * streamable Matroska files, splitting as needed to limit size/duration to 10
62 * seconds. Each file will finalize asynchronously.
71 #include <glib/gstdio.h>
72 #include <gst/video/video.h>
73 #include "gstsplitmuxsink.h"
75 GST_DEBUG_CATEGORY_STATIC (splitmux_debug);
76 #define GST_CAT_DEFAULT splitmux_debug
78 #define GST_SPLITMUX_LOCK(s) g_mutex_lock(&(s)->lock)
79 #define GST_SPLITMUX_UNLOCK(s) g_mutex_unlock(&(s)->lock)
80 #define GST_SPLITMUX_WAIT_INPUT(s) g_cond_wait (&(s)->input_cond, &(s)->lock)
81 #define GST_SPLITMUX_BROADCAST_INPUT(s) g_cond_broadcast (&(s)->input_cond)
83 #define GST_SPLITMUX_WAIT_OUTPUT(s) g_cond_wait (&(s)->output_cond, &(s)->lock)
84 #define GST_SPLITMUX_BROADCAST_OUTPUT(s) g_cond_broadcast (&(s)->output_cond)
86 static void split_now (GstSplitMuxSink * splitmux);
94 PROP_MAX_SIZE_TIMECODE,
95 PROP_SEND_KEYFRAME_REQUESTS,
98 PROP_USE_ROBUST_MUXING,
99 PROP_ALIGNMENT_THRESHOLD,
105 PROP_MUXER_PROPERTIES,
110 #define DEFAULT_MAX_SIZE_TIME 0
111 #define DEFAULT_MAX_SIZE_BYTES 0
112 #define DEFAULT_MAX_FILES 0
113 #define DEFAULT_MUXER_OVERHEAD 0.02
114 #define DEFAULT_SEND_KEYFRAME_REQUESTS FALSE
115 #define DEFAULT_ALIGNMENT_THRESHOLD 0
116 #define DEFAULT_MUXER "mp4mux"
117 #define DEFAULT_SINK "filesink"
118 #define DEFAULT_USE_ROBUST_MUXING FALSE
119 #define DEFAULT_RESET_MUXER TRUE
120 #define DEFAULT_ASYNC_FINALIZE FALSE
122 typedef struct _AsyncEosHelper
130 SIGNAL_FORMAT_LOCATION,
131 SIGNAL_FORMAT_LOCATION_FULL,
138 static guint signals[SIGNAL_LAST];
140 static GstStaticPadTemplate video_sink_template =
141 GST_STATIC_PAD_TEMPLATE ("video",
144 GST_STATIC_CAPS_ANY);
145 static GstStaticPadTemplate audio_sink_template =
146 GST_STATIC_PAD_TEMPLATE ("audio_%u",
149 GST_STATIC_CAPS_ANY);
150 static GstStaticPadTemplate subtitle_sink_template =
151 GST_STATIC_PAD_TEMPLATE ("subtitle_%u",
154 GST_STATIC_CAPS_ANY);
155 static GstStaticPadTemplate caption_sink_template =
156 GST_STATIC_PAD_TEMPLATE ("caption_%u",
159 GST_STATIC_CAPS_ANY);
161 static GQuark PAD_CONTEXT;
162 static GQuark EOS_FROM_US;
163 static GQuark RUNNING_TIME;
164 /* EOS_FROM_US is only valid in async-finalize mode. We need to know whether
165 * to forward an incoming EOS message, but we cannot rely on the state of the
166 * splitmux anymore, so we set this qdata on the sink instead.
167 * The muxer and sink must be destroyed after both of these things have
169 * 1) The EOS message has been sent when the fragment is ending
170 * 2) The muxer has been unlinked and relinked
171 * Therefore, EOS_FROM_US can have these two values:
172 * 0: EOS was not requested from us. Forward the message. The muxer and the
173 * sink will be destroyed together with the rest of the bin.
174 * 1: EOS was requested from us, but the other of the two tasks hasn't
175 * finished. Set EOS_FROM_US to 2 and do your stuff.
176 * 2: EOS was requested from us and the other of the two tasks has finished.
177 * Now we can destroy the muxer and the sink.
183 PAD_CONTEXT = g_quark_from_static_string ("pad-context");
184 EOS_FROM_US = g_quark_from_static_string ("eos-from-us");
185 RUNNING_TIME = g_quark_from_static_string ("running-time");
188 #define gst_splitmux_sink_parent_class parent_class
189 G_DEFINE_TYPE_EXTENDED (GstSplitMuxSink, gst_splitmux_sink, GST_TYPE_BIN, 0,
192 static gboolean create_muxer (GstSplitMuxSink * splitmux);
193 static gboolean create_sink (GstSplitMuxSink * splitmux);
194 static void gst_splitmux_sink_set_property (GObject * object, guint prop_id,
195 const GValue * value, GParamSpec * pspec);
196 static void gst_splitmux_sink_get_property (GObject * object, guint prop_id,
197 GValue * value, GParamSpec * pspec);
198 static void gst_splitmux_sink_dispose (GObject * object);
199 static void gst_splitmux_sink_finalize (GObject * object);
201 static GstPad *gst_splitmux_sink_request_new_pad (GstElement * element,
202 GstPadTemplate * templ, const gchar * name, const GstCaps * caps);
203 static void gst_splitmux_sink_release_pad (GstElement * element, GstPad * pad);
205 static GstStateChangeReturn gst_splitmux_sink_change_state (GstElement *
206 element, GstStateChange transition);
208 static void bus_handler (GstBin * bin, GstMessage * msg);
209 static void set_next_filename (GstSplitMuxSink * splitmux, MqStreamCtx * ctx);
210 static void start_next_fragment (GstSplitMuxSink * splitmux, MqStreamCtx * ctx);
211 static void mq_stream_ctx_free (MqStreamCtx * ctx);
212 static void grow_blocked_queues (GstSplitMuxSink * splitmux);
214 static void gst_splitmux_sink_ensure_max_files (GstSplitMuxSink * splitmux);
215 static GstElement *create_element (GstSplitMuxSink * splitmux,
216 const gchar * factory, const gchar * name, gboolean locked);
218 static void do_async_done (GstSplitMuxSink * splitmux);
221 mq_stream_buf_new (void)
223 return g_slice_new0 (MqStreamBuf);
227 mq_stream_buf_free (MqStreamBuf * data)
229 g_slice_free (MqStreamBuf, data);
232 static SplitMuxOutputCommand *
233 out_cmd_buf_new (void)
235 return g_slice_new0 (SplitMuxOutputCommand);
239 out_cmd_buf_free (SplitMuxOutputCommand * data)
241 g_slice_free (SplitMuxOutputCommand, data);
245 gst_splitmux_sink_class_init (GstSplitMuxSinkClass * klass)
247 GObjectClass *gobject_class = (GObjectClass *) klass;
248 GstElementClass *gstelement_class = (GstElementClass *) klass;
249 GstBinClass *gstbin_class = (GstBinClass *) klass;
251 gobject_class->set_property = gst_splitmux_sink_set_property;
252 gobject_class->get_property = gst_splitmux_sink_get_property;
253 gobject_class->dispose = gst_splitmux_sink_dispose;
254 gobject_class->finalize = gst_splitmux_sink_finalize;
256 gst_element_class_set_static_metadata (gstelement_class,
257 "Split Muxing Bin", "Generic/Bin/Muxer",
258 "Convenience bin that muxes incoming streams into multiple time/size limited files",
259 "Jan Schmidt <jan@centricular.com>");
261 gst_element_class_add_static_pad_template (gstelement_class,
262 &video_sink_template);
263 gst_element_class_add_static_pad_template (gstelement_class,
264 &audio_sink_template);
265 gst_element_class_add_static_pad_template (gstelement_class,
266 &subtitle_sink_template);
267 gst_element_class_add_static_pad_template (gstelement_class,
268 &caption_sink_template);
270 gstelement_class->change_state =
271 GST_DEBUG_FUNCPTR (gst_splitmux_sink_change_state);
272 gstelement_class->request_new_pad =
273 GST_DEBUG_FUNCPTR (gst_splitmux_sink_request_new_pad);
274 gstelement_class->release_pad =
275 GST_DEBUG_FUNCPTR (gst_splitmux_sink_release_pad);
277 gstbin_class->handle_message = bus_handler;
279 g_object_class_install_property (gobject_class, PROP_LOCATION,
280 g_param_spec_string ("location", "File Output Pattern",
281 "Format string pattern for the location of the files to write (e.g. video%05d.mp4)",
282 NULL, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
283 g_object_class_install_property (gobject_class, PROP_MUXER_OVERHEAD,
284 g_param_spec_double ("mux-overhead", "Muxing Overhead",
285 "Extra size overhead of muxing (0.02 = 2%)", 0.0, 1.0,
286 DEFAULT_MUXER_OVERHEAD,
287 G_PARAM_READWRITE | GST_PARAM_CONTROLLABLE | G_PARAM_STATIC_STRINGS));
289 g_object_class_install_property (gobject_class, PROP_MAX_SIZE_TIME,
290 g_param_spec_uint64 ("max-size-time", "Max. size (ns)",
291 "Max. amount of time per file (in ns, 0=disable)", 0, G_MAXUINT64,
292 DEFAULT_MAX_SIZE_TIME, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
293 g_object_class_install_property (gobject_class, PROP_MAX_SIZE_BYTES,
294 g_param_spec_uint64 ("max-size-bytes", "Max. size bytes",
295 "Max. amount of data per file (in bytes, 0=disable)", 0, G_MAXUINT64,
296 DEFAULT_MAX_SIZE_BYTES, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
297 g_object_class_install_property (gobject_class, PROP_MAX_SIZE_TIMECODE,
298 g_param_spec_string ("max-size-timecode", "Maximum timecode difference",
299 "Maximum difference in timecode between first and last frame. "
300 "Separator is assumed to be \":\" everywhere (e.g. 01:00:00:00). "
301 "Will only be effective if a timecode track is present.",
302 NULL, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
303 g_object_class_install_property (gobject_class, PROP_SEND_KEYFRAME_REQUESTS,
304 g_param_spec_boolean ("send-keyframe-requests",
305 "Request keyframes at max-size-time",
306 "Request a keyframe every max-size-time ns to try splitting at that point. "
307 "Needs max-size-bytes to be 0 in order to be effective.",
308 DEFAULT_SEND_KEYFRAME_REQUESTS,
309 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
310 g_object_class_install_property (gobject_class, PROP_MAX_FILES,
311 g_param_spec_uint ("max-files", "Max files",
312 "Maximum number of files to keep on disk. Once the maximum is reached,"
313 "old files start to be deleted to make room for new ones.", 0,
314 G_MAXUINT, DEFAULT_MAX_FILES,
315 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
316 g_object_class_install_property (gobject_class, PROP_ALIGNMENT_THRESHOLD,
317 g_param_spec_uint64 ("alignment-threshold", "Alignment threshold (ns)",
318 "Allow non-reference streams to be that many ns before the reference"
320 0, G_MAXUINT64, DEFAULT_ALIGNMENT_THRESHOLD,
321 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
323 g_object_class_install_property (gobject_class, PROP_MUXER,
324 g_param_spec_object ("muxer", "Muxer",
325 "The muxer element to use (NULL = default mp4mux). "
326 "Valid only for async-finalize = FALSE",
327 GST_TYPE_ELEMENT, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
328 g_object_class_install_property (gobject_class, PROP_SINK,
329 g_param_spec_object ("sink", "Sink",
330 "The sink element (or element chain) to use (NULL = default filesink). "
331 "Valid only for async-finalize = FALSE",
332 GST_TYPE_ELEMENT, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
334 g_object_class_install_property (gobject_class, PROP_USE_ROBUST_MUXING,
335 g_param_spec_boolean ("use-robust-muxing",
336 "Support robust-muxing mode of some muxers",
337 "Check if muxers support robust muxing via the reserved-max-duration and "
338 "reserved-duration-remaining properties and use them if so. "
339 "(Only present on qtmux and mp4mux for now). splitmuxsink may then also "
340 " create new fragments if the reserved header space is about to overflow. "
341 "Note that for mp4mux and qtmux, reserved-moov-update-period must be set "
342 "manually by the app to a non-zero value for robust muxing to have an effect.",
343 DEFAULT_USE_ROBUST_MUXING,
344 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
346 g_object_class_install_property (gobject_class, PROP_RESET_MUXER,
347 g_param_spec_boolean ("reset-muxer",
349 "Reset the muxer after each segment. Disabling this will not work for most muxers.",
350 DEFAULT_RESET_MUXER, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
352 g_object_class_install_property (gobject_class, PROP_ASYNC_FINALIZE,
353 g_param_spec_boolean ("async-finalize",
354 "Finalize fragments asynchronously",
355 "Finalize each fragment asynchronously and start a new one",
356 DEFAULT_ASYNC_FINALIZE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
357 g_object_class_install_property (gobject_class, PROP_MUXER_FACTORY,
358 g_param_spec_string ("muxer-factory", "Muxer factory",
359 "The muxer element factory to use (default = mp4mux). "
360 "Valid only for async-finalize = TRUE",
361 "mp4mux", G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
362 g_object_class_install_property (gobject_class, PROP_MUXER_PROPERTIES,
363 g_param_spec_boxed ("muxer-properties", "Muxer properties",
364 "The muxer element properties to use. "
365 "Example: {properties,boolean-prop=true,string-prop=\"hi\"}. "
366 "Valid only for async-finalize = TRUE",
367 GST_TYPE_STRUCTURE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
368 g_object_class_install_property (gobject_class, PROP_SINK_FACTORY,
369 g_param_spec_string ("sink-factory", "Sink factory",
370 "The sink element factory to use (default = filesink). "
371 "Valid only for async-finalize = TRUE",
372 "filesink", G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
373 g_object_class_install_property (gobject_class, PROP_SINK_PROPERTIES,
374 g_param_spec_boxed ("sink-properties", "Sink properties",
375 "The sink element properties to use. "
376 "Example: {properties,boolean-prop=true,string-prop=\"hi\"}. "
377 "Valid only for async-finalize = TRUE",
378 GST_TYPE_STRUCTURE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
381 * GstSplitMuxSink::format-location:
382 * @splitmux: the #GstSplitMuxSink
383 * @fragment_id: the sequence number of the file to be created
385 * Returns: the location to be used for the next output file
387 signals[SIGNAL_FORMAT_LOCATION] =
388 g_signal_new ("format-location", G_TYPE_FROM_CLASS (klass),
389 G_SIGNAL_RUN_LAST, 0, NULL, NULL, NULL, G_TYPE_STRING, 1, G_TYPE_UINT);
392 * GstSplitMuxSink::format-location-full:
393 * @splitmux: the #GstSplitMuxSink
394 * @fragment_id: the sequence number of the file to be created
395 * @first_sample: A #GstSample containing the first buffer
396 * from the reference stream in the new file
398 * Returns: the location to be used for the next output file
400 signals[SIGNAL_FORMAT_LOCATION_FULL] =
401 g_signal_new ("format-location-full", G_TYPE_FROM_CLASS (klass),
402 G_SIGNAL_RUN_LAST, 0, NULL, NULL, NULL, G_TYPE_STRING, 2, G_TYPE_UINT,
406 * GstSplitMuxSink::split-now:
407 * @splitmux: the #GstSplitMuxSink
409 * When called by the user, this action signal splits the video file (and begins a new one) immediately.
414 signals[SIGNAL_SPLIT_NOW] =
415 g_signal_new ("split-now", G_TYPE_FROM_CLASS (klass),
416 G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstSplitMuxSinkClass,
417 split_now), NULL, NULL, NULL, G_TYPE_NONE, 0);
420 * GstSplitMuxSink::muxer-added:
421 * @splitmux: the #GstSplitMuxSink
422 * @muxer: the newly added muxer element
426 signals[SIGNAL_MUXER_ADDED] =
427 g_signal_new ("muxer-added", G_TYPE_FROM_CLASS (klass),
428 G_SIGNAL_RUN_LAST, 0, NULL, NULL, NULL, G_TYPE_NONE, 1, GST_TYPE_ELEMENT);
431 * GstSplitMuxSink::sink-added:
432 * @splitmux: the #GstSplitMuxSink
433 * @sink: the newly added sink element
437 signals[SIGNAL_SINK_ADDED] =
438 g_signal_new ("sink-added", G_TYPE_FROM_CLASS (klass),
439 G_SIGNAL_RUN_LAST, 0, NULL, NULL, NULL, G_TYPE_NONE, 1, GST_TYPE_ELEMENT);
441 klass->split_now = split_now;
445 gst_splitmux_sink_init (GstSplitMuxSink * splitmux)
447 g_mutex_init (&splitmux->lock);
448 g_cond_init (&splitmux->input_cond);
449 g_cond_init (&splitmux->output_cond);
450 g_queue_init (&splitmux->out_cmd_q);
452 splitmux->mux_overhead = DEFAULT_MUXER_OVERHEAD;
453 splitmux->threshold_time = DEFAULT_MAX_SIZE_TIME;
454 splitmux->threshold_bytes = DEFAULT_MAX_SIZE_BYTES;
455 splitmux->max_files = DEFAULT_MAX_FILES;
456 splitmux->send_keyframe_requests = DEFAULT_SEND_KEYFRAME_REQUESTS;
457 splitmux->next_max_tc_time = GST_CLOCK_TIME_NONE;
458 splitmux->alignment_threshold = DEFAULT_ALIGNMENT_THRESHOLD;
459 splitmux->use_robust_muxing = DEFAULT_USE_ROBUST_MUXING;
460 splitmux->reset_muxer = DEFAULT_RESET_MUXER;
462 splitmux->threshold_timecode_str = NULL;
464 splitmux->async_finalize = DEFAULT_ASYNC_FINALIZE;
465 splitmux->muxer_factory = g_strdup (DEFAULT_MUXER);
466 splitmux->muxer_properties = NULL;
467 splitmux->sink_factory = g_strdup (DEFAULT_SINK);
468 splitmux->sink_properties = NULL;
470 GST_OBJECT_FLAG_SET (splitmux, GST_ELEMENT_FLAG_SINK);
471 splitmux->split_now = FALSE;
475 gst_splitmux_reset (GstSplitMuxSink * splitmux)
477 if (splitmux->muxer) {
478 gst_element_set_locked_state (splitmux->muxer, TRUE);
479 gst_element_set_state (splitmux->muxer, GST_STATE_NULL);
480 gst_bin_remove (GST_BIN (splitmux), splitmux->muxer);
482 if (splitmux->active_sink) {
483 gst_element_set_locked_state (splitmux->active_sink, TRUE);
484 gst_element_set_state (splitmux->active_sink, GST_STATE_NULL);
485 gst_bin_remove (GST_BIN (splitmux), splitmux->active_sink);
488 splitmux->sink = splitmux->active_sink = splitmux->muxer = NULL;
492 gst_splitmux_sink_dispose (GObject * object)
494 GstSplitMuxSink *splitmux = GST_SPLITMUX_SINK (object);
496 /* Calling parent dispose invalidates all child pointers */
497 splitmux->sink = splitmux->active_sink = splitmux->muxer = NULL;
499 G_OBJECT_CLASS (parent_class)->dispose (object);
503 gst_splitmux_sink_finalize (GObject * object)
505 GstSplitMuxSink *splitmux = GST_SPLITMUX_SINK (object);
506 g_cond_clear (&splitmux->input_cond);
507 g_cond_clear (&splitmux->output_cond);
508 g_mutex_clear (&splitmux->lock);
509 g_queue_foreach (&splitmux->out_cmd_q, (GFunc) out_cmd_buf_free, NULL);
510 g_queue_clear (&splitmux->out_cmd_q);
512 if (splitmux->provided_sink)
513 gst_object_unref (splitmux->provided_sink);
514 if (splitmux->provided_muxer)
515 gst_object_unref (splitmux->provided_muxer);
517 if (splitmux->muxer_factory)
518 g_free (splitmux->muxer_factory);
519 if (splitmux->muxer_properties)
520 gst_structure_free (splitmux->muxer_properties);
521 if (splitmux->sink_factory)
522 g_free (splitmux->sink_factory);
523 if (splitmux->sink_properties)
524 gst_structure_free (splitmux->sink_properties);
526 if (splitmux->threshold_timecode_str)
527 g_free (splitmux->threshold_timecode_str);
529 g_free (splitmux->location);
531 /* Make sure to free any un-released contexts. There should not be any,
532 * because the dispose will have freed all request pads though */
533 g_list_foreach (splitmux->contexts, (GFunc) mq_stream_ctx_free, NULL);
534 g_list_free (splitmux->contexts);
536 G_OBJECT_CLASS (parent_class)->finalize (object);
540 * Set any time threshold to the muxer, if it has
541 * reserved-max-duration and reserved-duration-remaining
542 * properties. Called when creating/claiming the muxer
543 * in create_elements() */
545 update_muxer_properties (GstSplitMuxSink * sink)
548 GstClockTime threshold_time;
550 sink->muxer_has_reserved_props = FALSE;
551 if (sink->muxer == NULL)
553 klass = G_OBJECT_GET_CLASS (sink->muxer);
554 if (g_object_class_find_property (klass, "reserved-max-duration") == NULL)
556 if (g_object_class_find_property (klass,
557 "reserved-duration-remaining") == NULL)
559 sink->muxer_has_reserved_props = TRUE;
561 GST_LOG_OBJECT (sink, "Setting muxer reserved time to %" GST_TIME_FORMAT,
562 GST_TIME_ARGS (sink->threshold_time));
563 GST_OBJECT_LOCK (sink);
564 threshold_time = sink->threshold_time;
565 GST_OBJECT_UNLOCK (sink);
567 if (threshold_time > 0) {
568 /* Tell the muxer how much space to reserve */
569 GstClockTime muxer_threshold = threshold_time;
570 g_object_set (sink->muxer, "reserved-max-duration", muxer_threshold, NULL);
575 gst_splitmux_sink_set_property (GObject * object, guint prop_id,
576 const GValue * value, GParamSpec * pspec)
578 GstSplitMuxSink *splitmux = GST_SPLITMUX_SINK (object);
582 GST_OBJECT_LOCK (splitmux);
583 g_free (splitmux->location);
584 splitmux->location = g_value_dup_string (value);
585 GST_OBJECT_UNLOCK (splitmux);
588 case PROP_MAX_SIZE_BYTES:
589 GST_OBJECT_LOCK (splitmux);
590 splitmux->threshold_bytes = g_value_get_uint64 (value);
591 GST_OBJECT_UNLOCK (splitmux);
593 case PROP_MAX_SIZE_TIME:
594 GST_OBJECT_LOCK (splitmux);
595 splitmux->threshold_time = g_value_get_uint64 (value);
596 GST_OBJECT_UNLOCK (splitmux);
598 case PROP_MAX_SIZE_TIMECODE:
599 GST_OBJECT_LOCK (splitmux);
600 splitmux->threshold_timecode_str = g_value_dup_string (value);
601 GST_OBJECT_UNLOCK (splitmux);
603 case PROP_SEND_KEYFRAME_REQUESTS:
604 GST_OBJECT_LOCK (splitmux);
605 splitmux->send_keyframe_requests = g_value_get_boolean (value);
606 GST_OBJECT_UNLOCK (splitmux);
609 GST_OBJECT_LOCK (splitmux);
610 splitmux->max_files = g_value_get_uint (value);
611 GST_OBJECT_UNLOCK (splitmux);
613 case PROP_MUXER_OVERHEAD:
614 GST_OBJECT_LOCK (splitmux);
615 splitmux->mux_overhead = g_value_get_double (value);
616 GST_OBJECT_UNLOCK (splitmux);
618 case PROP_USE_ROBUST_MUXING:
619 GST_OBJECT_LOCK (splitmux);
620 splitmux->use_robust_muxing = g_value_get_boolean (value);
621 GST_OBJECT_UNLOCK (splitmux);
622 if (splitmux->use_robust_muxing)
623 update_muxer_properties (splitmux);
625 case PROP_ALIGNMENT_THRESHOLD:
626 GST_OBJECT_LOCK (splitmux);
627 splitmux->alignment_threshold = g_value_get_uint64 (value);
628 GST_OBJECT_UNLOCK (splitmux);
631 GST_OBJECT_LOCK (splitmux);
632 if (splitmux->provided_sink)
633 gst_object_unref (splitmux->provided_sink);
634 splitmux->provided_sink = g_value_get_object (value);
635 gst_object_ref_sink (splitmux->provided_sink);
636 GST_OBJECT_UNLOCK (splitmux);
639 GST_OBJECT_LOCK (splitmux);
640 if (splitmux->provided_muxer)
641 gst_object_unref (splitmux->provided_muxer);
642 splitmux->provided_muxer = g_value_get_object (value);
643 gst_object_ref_sink (splitmux->provided_muxer);
644 GST_OBJECT_UNLOCK (splitmux);
646 case PROP_RESET_MUXER:
647 GST_OBJECT_LOCK (splitmux);
648 splitmux->reset_muxer = g_value_get_boolean (value);
649 GST_OBJECT_UNLOCK (splitmux);
651 case PROP_ASYNC_FINALIZE:
652 GST_OBJECT_LOCK (splitmux);
653 splitmux->async_finalize = g_value_get_boolean (value);
654 GST_OBJECT_UNLOCK (splitmux);
656 case PROP_MUXER_FACTORY:
657 GST_OBJECT_LOCK (splitmux);
658 if (splitmux->muxer_factory)
659 g_free (splitmux->muxer_factory);
660 splitmux->muxer_factory = g_value_dup_string (value);
661 GST_OBJECT_UNLOCK (splitmux);
663 case PROP_MUXER_PROPERTIES:
664 GST_OBJECT_LOCK (splitmux);
665 if (splitmux->muxer_properties)
666 gst_structure_free (splitmux->muxer_properties);
667 if (gst_value_get_structure (value))
668 splitmux->muxer_properties =
669 gst_structure_copy (gst_value_get_structure (value));
671 splitmux->muxer_properties = NULL;
672 GST_OBJECT_UNLOCK (splitmux);
674 case PROP_SINK_FACTORY:
675 GST_OBJECT_LOCK (splitmux);
676 if (splitmux->sink_factory)
677 g_free (splitmux->sink_factory);
678 splitmux->sink_factory = g_value_dup_string (value);
679 GST_OBJECT_UNLOCK (splitmux);
681 case PROP_SINK_PROPERTIES:
682 GST_OBJECT_LOCK (splitmux);
683 if (splitmux->sink_properties)
684 gst_structure_free (splitmux->sink_properties);
685 if (gst_value_get_structure (value))
686 splitmux->sink_properties =
687 gst_structure_copy (gst_value_get_structure (value));
689 splitmux->sink_properties = NULL;
690 GST_OBJECT_UNLOCK (splitmux);
693 G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
699 gst_splitmux_sink_get_property (GObject * object, guint prop_id,
700 GValue * value, GParamSpec * pspec)
702 GstSplitMuxSink *splitmux = GST_SPLITMUX_SINK (object);
706 GST_OBJECT_LOCK (splitmux);
707 g_value_set_string (value, splitmux->location);
708 GST_OBJECT_UNLOCK (splitmux);
710 case PROP_MAX_SIZE_BYTES:
711 GST_OBJECT_LOCK (splitmux);
712 g_value_set_uint64 (value, splitmux->threshold_bytes);
713 GST_OBJECT_UNLOCK (splitmux);
715 case PROP_MAX_SIZE_TIME:
716 GST_OBJECT_LOCK (splitmux);
717 g_value_set_uint64 (value, splitmux->threshold_time);
718 GST_OBJECT_UNLOCK (splitmux);
720 case PROP_MAX_SIZE_TIMECODE:
721 GST_OBJECT_LOCK (splitmux);
722 g_value_set_string (value, splitmux->threshold_timecode_str);
723 GST_OBJECT_UNLOCK (splitmux);
725 case PROP_SEND_KEYFRAME_REQUESTS:
726 GST_OBJECT_LOCK (splitmux);
727 g_value_set_boolean (value, splitmux->send_keyframe_requests);
728 GST_OBJECT_UNLOCK (splitmux);
731 GST_OBJECT_LOCK (splitmux);
732 g_value_set_uint (value, splitmux->max_files);
733 GST_OBJECT_UNLOCK (splitmux);
735 case PROP_MUXER_OVERHEAD:
736 GST_OBJECT_LOCK (splitmux);
737 g_value_set_double (value, splitmux->mux_overhead);
738 GST_OBJECT_UNLOCK (splitmux);
740 case PROP_USE_ROBUST_MUXING:
741 GST_OBJECT_LOCK (splitmux);
742 g_value_set_boolean (value, splitmux->use_robust_muxing);
743 GST_OBJECT_UNLOCK (splitmux);
745 case PROP_ALIGNMENT_THRESHOLD:
746 GST_OBJECT_LOCK (splitmux);
747 g_value_set_uint64 (value, splitmux->alignment_threshold);
748 GST_OBJECT_UNLOCK (splitmux);
751 GST_OBJECT_LOCK (splitmux);
752 g_value_set_object (value, splitmux->provided_sink);
753 GST_OBJECT_UNLOCK (splitmux);
756 GST_OBJECT_LOCK (splitmux);
757 g_value_set_object (value, splitmux->provided_muxer);
758 GST_OBJECT_UNLOCK (splitmux);
760 case PROP_RESET_MUXER:
761 GST_OBJECT_LOCK (splitmux);
762 g_value_set_boolean (value, splitmux->reset_muxer);
763 GST_OBJECT_UNLOCK (splitmux);
765 case PROP_ASYNC_FINALIZE:
766 GST_OBJECT_LOCK (splitmux);
767 g_value_set_boolean (value, splitmux->async_finalize);
768 GST_OBJECT_UNLOCK (splitmux);
770 case PROP_MUXER_FACTORY:
771 GST_OBJECT_LOCK (splitmux);
772 g_value_set_string (value, splitmux->muxer_factory);
773 GST_OBJECT_UNLOCK (splitmux);
775 case PROP_MUXER_PROPERTIES:
776 GST_OBJECT_LOCK (splitmux);
777 gst_value_set_structure (value, splitmux->muxer_properties);
778 GST_OBJECT_UNLOCK (splitmux);
780 case PROP_SINK_FACTORY:
781 GST_OBJECT_LOCK (splitmux);
782 g_value_set_string (value, splitmux->sink_factory);
783 GST_OBJECT_UNLOCK (splitmux);
785 case PROP_SINK_PROPERTIES:
786 GST_OBJECT_LOCK (splitmux);
787 gst_value_set_structure (value, splitmux->sink_properties);
788 GST_OBJECT_UNLOCK (splitmux);
791 G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
796 /* Convenience function */
797 static inline GstClockTimeDiff
798 my_segment_to_running_time (GstSegment * segment, GstClockTime val)
800 GstClockTimeDiff res = GST_CLOCK_STIME_NONE;
802 if (GST_CLOCK_TIME_IS_VALID (val)) {
804 gst_segment_to_running_time_full (segment, GST_FORMAT_TIME, val, &val);
814 mq_stream_ctx_new (GstSplitMuxSink * splitmux)
818 ctx = g_new0 (MqStreamCtx, 1);
819 ctx->splitmux = splitmux;
820 gst_segment_init (&ctx->in_segment, GST_FORMAT_UNDEFINED);
821 gst_segment_init (&ctx->out_segment, GST_FORMAT_UNDEFINED);
822 ctx->in_running_time = ctx->out_running_time = GST_CLOCK_STIME_NONE;
823 g_queue_init (&ctx->queued_bufs);
828 mq_stream_ctx_free (MqStreamCtx * ctx)
831 GstObject *parent = gst_object_get_parent (GST_OBJECT (ctx->q));
833 g_signal_handler_disconnect (ctx->q, ctx->q_overrun_id);
835 if (parent == GST_OBJECT_CAST (ctx->splitmux)) {
836 gst_element_set_locked_state (ctx->q, TRUE);
837 gst_element_set_state (ctx->q, GST_STATE_NULL);
838 gst_bin_remove (GST_BIN (ctx->splitmux), ctx->q);
839 gst_object_unref (parent);
841 gst_object_unref (ctx->q);
843 gst_buffer_replace (&ctx->prev_in_keyframe, NULL);
844 gst_object_unref (ctx->sinkpad);
845 gst_object_unref (ctx->srcpad);
846 g_queue_foreach (&ctx->queued_bufs, (GFunc) mq_stream_buf_free, NULL);
847 g_queue_clear (&ctx->queued_bufs);
852 send_fragment_opened_closed_msg (GstSplitMuxSink * splitmux, gboolean opened,
855 gchar *location = NULL;
857 const gchar *msg_name = opened ?
858 "splitmuxsink-fragment-opened" : "splitmuxsink-fragment-closed";
859 GstClockTime running_time = splitmux->reference_ctx->out_running_time;
862 GstClockTime *rtime = g_object_get_qdata (G_OBJECT (sink), RUNNING_TIME);
864 running_time = *rtime;
867 g_object_get (sink, "location", &location, NULL);
869 /* If it's in the middle of a teardown, the reference_ctc might have become
871 if (splitmux->reference_ctx) {
872 msg = gst_message_new_element (GST_OBJECT (splitmux),
873 gst_structure_new (msg_name,
874 "location", G_TYPE_STRING, location,
875 "running-time", GST_TYPE_CLOCK_TIME, running_time, NULL));
876 gst_element_post_message (GST_ELEMENT_CAST (splitmux), msg);
883 send_eos_async (GstSplitMuxSink * splitmux, AsyncEosHelper * helper)
889 eos = gst_event_new_eos ();
893 GST_SPLITMUX_LOCK (splitmux);
895 pad = gst_pad_get_peer (ctx->srcpad);
896 GST_SPLITMUX_UNLOCK (splitmux);
898 gst_pad_send_event (pad, eos);
899 GST_INFO_OBJECT (splitmux, "Sent async EOS on %" GST_PTR_FORMAT, pad);
901 gst_object_unref (pad);
905 /* Called with lock held, drops the lock to send EOS to the
909 send_eos (GstSplitMuxSink * splitmux, MqStreamCtx * ctx)
914 eos = gst_event_new_eos ();
915 pad = gst_pad_get_peer (ctx->srcpad);
919 GST_INFO_OBJECT (splitmux, "Sending EOS on %" GST_PTR_FORMAT, pad);
920 GST_SPLITMUX_UNLOCK (splitmux);
921 gst_pad_send_event (pad, eos);
922 GST_SPLITMUX_LOCK (splitmux);
924 gst_object_unref (pad);
927 /* Called with lock held. Schedules an EOS event to the ctx pad
928 * to happen in another thread */
930 eos_context_async (MqStreamCtx * ctx, GstSplitMuxSink * splitmux)
932 AsyncEosHelper *helper = g_new0 (AsyncEosHelper, 1);
933 GstPad *srcpad, *sinkpad;
935 srcpad = ctx->srcpad;
936 sinkpad = gst_pad_get_peer (srcpad);
939 helper->pad = sinkpad; /* Takes the reference */
941 ctx->out_eos_async_done = TRUE;
942 /* HACK: Here, we explicitly unset the SINK flag on the target sink element
943 * that's about to be asynchronously disposed, so that it no longer
944 * participates in GstBin EOS logic. This fixes a race where if
945 * splitmuxsink really reaches EOS before an asynchronous background
946 * element has finished, then the bin won't actually send EOS to the
947 * pipeline. Even after finishing and removing the old element, the
948 * bin doesn't re-check EOS status on removing a SINK element. This
949 * should be fixed in core, making this hack unnecessary. */
950 GST_OBJECT_FLAG_UNSET (splitmux->active_sink, GST_ELEMENT_FLAG_SINK);
952 GST_DEBUG_OBJECT (splitmux, "scheduled EOS to pad %" GST_PTR_FORMAT " ctx %p",
955 g_assert_nonnull (helper->pad);
956 gst_element_call_async (GST_ELEMENT (splitmux),
957 (GstElementCallAsyncFunc) send_eos_async, helper, NULL);
960 /* Called with lock held. TRUE iff all contexts have a
961 * pending (or delivered) async eos event */
963 all_contexts_are_async_eos (GstSplitMuxSink * splitmux)
968 for (item = splitmux->contexts; item; item = item->next) {
969 MqStreamCtx *ctx = item->data;
970 ret &= ctx->out_eos_async_done;
975 /* Called with splitmux lock held to check if this output
976 * context needs to sleep to wait for the release of the
977 * next GOP, or to send EOS to close out the current file
980 complete_or_wait_on_out (GstSplitMuxSink * splitmux, MqStreamCtx * ctx)
982 if (ctx->caps_change)
986 /* When first starting up, the reference stream has to output
987 * the first buffer to prepare the muxer and sink */
988 gboolean can_output = (ctx->is_reference || splitmux->ready_for_output);
989 GstClockTimeDiff my_max_out_running_time = splitmux->max_out_running_time;
991 if (!(splitmux->max_out_running_time == 0 ||
992 splitmux->max_out_running_time == GST_CLOCK_STIME_NONE ||
993 splitmux->alignment_threshold == 0 ||
994 splitmux->max_out_running_time < splitmux->alignment_threshold)) {
995 my_max_out_running_time -= splitmux->alignment_threshold;
996 GST_LOG_OBJECT (ctx->srcpad,
997 "Max out running time currently %" GST_STIME_FORMAT
998 ", with threshold applied it is %" GST_STIME_FORMAT,
999 GST_STIME_ARGS (splitmux->max_out_running_time),
1000 GST_STIME_ARGS (my_max_out_running_time));
1004 || splitmux->output_state == SPLITMUX_OUTPUT_STATE_STOPPED)
1007 GST_LOG_OBJECT (ctx->srcpad,
1008 "Checking running time %" GST_STIME_FORMAT " against max %"
1009 GST_STIME_FORMAT, GST_STIME_ARGS (ctx->out_running_time),
1010 GST_STIME_ARGS (my_max_out_running_time));
1013 if (splitmux->max_out_running_time == GST_CLOCK_STIME_NONE ||
1014 ctx->out_running_time < my_max_out_running_time) {
1018 switch (splitmux->output_state) {
1019 case SPLITMUX_OUTPUT_STATE_OUTPUT_GOP:
1020 /* We only get here if we've finished outputting a GOP and need to know
1021 * what to do next */
1022 splitmux->output_state = SPLITMUX_OUTPUT_STATE_AWAITING_COMMAND;
1023 GST_SPLITMUX_BROADCAST_OUTPUT (splitmux);
1026 case SPLITMUX_OUTPUT_STATE_ENDING_FILE:
1027 /* We've reached the max out running_time to get here, so end this file now */
1028 if (ctx->out_eos == FALSE) {
1029 if (splitmux->async_finalize) {
1030 /* We must set EOS asynchronously at this point. We cannot defer
1031 * it, because we need all contexts to wake up, for the
1032 * reference context to eventually give us something at
1033 * START_NEXT_FILE. Otherwise, collectpads might choose another
1034 * context to give us the first buffer, and format-location-full
1035 * will not contain a valid sample. */
1036 g_object_set_qdata ((GObject *) splitmux->sink, EOS_FROM_US,
1037 GINT_TO_POINTER (1));
1038 eos_context_async (ctx, splitmux);
1039 if (all_contexts_are_async_eos (splitmux)) {
1040 GST_INFO_OBJECT (splitmux,
1041 "All contexts are async_eos. Moving to the next file.");
1042 /* We can start the next file once we've asked each pad to go EOS */
1043 splitmux->output_state = SPLITMUX_OUTPUT_STATE_START_NEXT_FILE;
1044 GST_SPLITMUX_BROADCAST_OUTPUT (splitmux);
1048 send_eos (splitmux, ctx);
1052 GST_INFO_OBJECT (splitmux,
1053 "At end-of-file state, but context %p is already EOS", ctx);
1056 case SPLITMUX_OUTPUT_STATE_START_NEXT_FILE:
1057 if (ctx->is_reference) {
1058 /* Special handling on the reference ctx to start new fragments
1059 * and collect commands from the command queue */
1060 /* drops the splitmux lock briefly: */
1061 /* We must have reference ctx in order for format-location-full to
1063 start_next_fragment (splitmux, ctx);
1067 case SPLITMUX_OUTPUT_STATE_AWAITING_COMMAND:{
1069 SplitMuxOutputCommand *cmd =
1070 g_queue_pop_tail (&splitmux->out_cmd_q);
1072 /* If we pop the last command, we need to make our queues bigger */
1073 if (g_queue_get_length (&splitmux->out_cmd_q) == 0)
1074 grow_blocked_queues (splitmux);
1076 if (cmd->start_new_fragment) {
1077 GST_DEBUG_OBJECT (splitmux, "Got cmd to start new fragment");
1078 splitmux->output_state = SPLITMUX_OUTPUT_STATE_ENDING_FILE;
1080 GST_DEBUG_OBJECT (splitmux,
1081 "Got new output cmd for time %" GST_STIME_FORMAT,
1082 GST_STIME_ARGS (cmd->max_output_ts));
1084 /* Extend the output range immediately */
1085 splitmux->max_out_running_time = cmd->max_output_ts;
1086 splitmux->output_state = SPLITMUX_OUTPUT_STATE_OUTPUT_GOP;
1088 GST_SPLITMUX_BROADCAST_OUTPUT (splitmux);
1090 out_cmd_buf_free (cmd);
1093 GST_SPLITMUX_WAIT_OUTPUT (splitmux);
1095 } while (splitmux->output_state ==
1096 SPLITMUX_OUTPUT_STATE_AWAITING_COMMAND);
1097 /* loop and re-check the state */
1100 case SPLITMUX_OUTPUT_STATE_STOPPED:
1105 GST_INFO_OBJECT (ctx->srcpad,
1106 "Sleeping for running time %"
1107 GST_STIME_FORMAT " (max %" GST_STIME_FORMAT ") or state change.",
1108 GST_STIME_ARGS (ctx->out_running_time),
1109 GST_STIME_ARGS (splitmux->max_out_running_time));
1110 GST_SPLITMUX_WAIT_OUTPUT (splitmux);
1111 GST_INFO_OBJECT (ctx->srcpad,
1112 "Woken for new max running time %" GST_STIME_FORMAT,
1113 GST_STIME_ARGS (splitmux->max_out_running_time));
1119 calculate_next_max_timecode (GstSplitMuxSink * splitmux,
1120 const GstVideoTimeCode * cur_tc)
1122 GstVideoTimeCode *target_tc;
1123 GstVideoTimeCodeInterval *tc_inter;
1124 GstClockTime cur_tc_time, target_tc_time, next_max_tc_time;
1126 if (cur_tc == NULL || splitmux->threshold_timecode_str == NULL)
1127 return GST_CLOCK_TIME_NONE;
1130 gst_video_time_code_interval_new_from_string
1131 (splitmux->threshold_timecode_str);
1132 target_tc = gst_video_time_code_add_interval (cur_tc, tc_inter);
1133 gst_video_time_code_interval_free (tc_inter);
1136 target_tc_time = gst_video_time_code_nsec_since_daily_jam (target_tc);
1137 cur_tc_time = gst_video_time_code_nsec_since_daily_jam (cur_tc);
1139 /* Add fragment_start_time, accounting for wraparound */
1140 if (target_tc_time >= cur_tc_time) {
1142 target_tc_time - cur_tc_time + splitmux->fragment_start_time;
1144 GstClockTime day_in_ns = 24 * 60 * 60 * GST_SECOND;
1147 day_in_ns - cur_tc_time + target_tc_time +
1148 splitmux->fragment_start_time;
1150 GST_INFO_OBJECT (splitmux, "Next max TC time: %" GST_TIME_FORMAT
1151 " from ref TC: %" GST_TIME_FORMAT, GST_TIME_ARGS (next_max_tc_time),
1152 GST_TIME_ARGS (cur_tc_time));
1153 gst_video_time_code_free (target_tc);
1155 return next_max_tc_time;
1159 request_next_keyframe (GstSplitMuxSink * splitmux, GstBuffer * buffer)
1162 GstClockTime target_time;
1163 gboolean timecode_based = FALSE;
1165 splitmux->next_max_tc_time = GST_CLOCK_TIME_NONE;
1166 if (splitmux->threshold_timecode_str) {
1167 GstVideoTimeCodeMeta *tc_meta;
1169 if (buffer != NULL) {
1170 tc_meta = gst_buffer_get_video_time_code_meta (buffer);
1172 splitmux->next_max_tc_time =
1173 calculate_next_max_timecode (splitmux, &tc_meta->tc);
1174 timecode_based = (splitmux->next_max_tc_time != GST_CLOCK_TIME_NONE);
1177 /* This can happen in the presence of GAP events that trigger
1178 * a new fragment start */
1179 GST_WARNING_OBJECT (splitmux,
1180 "No buffer available to calculate next timecode");
1184 if (splitmux->send_keyframe_requests == FALSE
1185 || (splitmux->threshold_time == 0 && !timecode_based)
1186 || splitmux->threshold_bytes != 0)
1189 if (timecode_based) {
1190 /* We might have rounding errors: aim slightly earlier */
1191 target_time = splitmux->next_max_tc_time - 5 * GST_USECOND;
1193 target_time = splitmux->fragment_start_time + splitmux->threshold_time;
1195 ev = gst_video_event_new_upstream_force_key_unit (target_time, TRUE, 0);
1196 GST_INFO_OBJECT (splitmux, "Requesting next keyframe at %" GST_TIME_FORMAT,
1197 GST_TIME_ARGS (target_time));
1198 return gst_pad_push_event (splitmux->reference_ctx->sinkpad, ev);
1201 static GstPadProbeReturn
1202 handle_mq_output (GstPad * pad, GstPadProbeInfo * info, MqStreamCtx * ctx)
1204 GstSplitMuxSink *splitmux = ctx->splitmux;
1205 MqStreamBuf *buf_info = NULL;
1207 GST_LOG_OBJECT (pad, "Fired probe type 0x%x", info->type);
1209 /* FIXME: Handle buffer lists, until then make it clear they won't work */
1210 if (info->type & GST_PAD_PROBE_TYPE_BUFFER_LIST) {
1211 g_warning ("Buffer list handling not implemented");
1212 return GST_PAD_PROBE_DROP;
1214 if (info->type & GST_PAD_PROBE_TYPE_EVENT_DOWNSTREAM ||
1215 info->type & GST_PAD_PROBE_TYPE_EVENT_FLUSH) {
1216 GstEvent *event = gst_pad_probe_info_get_event (info);
1217 gboolean locked = FALSE;
1219 GST_LOG_OBJECT (pad, "Event %" GST_PTR_FORMAT, event);
1221 switch (GST_EVENT_TYPE (event)) {
1222 case GST_EVENT_SEGMENT:
1223 gst_event_copy_segment (event, &ctx->out_segment);
1225 case GST_EVENT_FLUSH_STOP:
1226 GST_SPLITMUX_LOCK (splitmux);
1228 gst_segment_init (&ctx->out_segment, GST_FORMAT_UNDEFINED);
1229 g_queue_foreach (&ctx->queued_bufs, (GFunc) mq_stream_buf_free, NULL);
1230 g_queue_clear (&ctx->queued_bufs);
1231 ctx->flushing = FALSE;
1233 case GST_EVENT_FLUSH_START:
1234 GST_SPLITMUX_LOCK (splitmux);
1236 GST_LOG_OBJECT (pad, "Flush start");
1237 ctx->flushing = TRUE;
1238 GST_SPLITMUX_BROADCAST_OUTPUT (splitmux);
1241 GST_SPLITMUX_LOCK (splitmux);
1243 if (splitmux->output_state == SPLITMUX_OUTPUT_STATE_STOPPED)
1245 ctx->out_eos = TRUE;
1246 GST_INFO_OBJECT (splitmux,
1247 "Have EOS event at pad %" GST_PTR_FORMAT " ctx %p", pad, ctx);
1249 case GST_EVENT_GAP:{
1250 GstClockTime gap_ts;
1251 GstClockTimeDiff rtime;
1253 gst_event_parse_gap (event, &gap_ts, NULL);
1254 if (gap_ts == GST_CLOCK_TIME_NONE)
1257 GST_SPLITMUX_LOCK (splitmux);
1260 if (splitmux->output_state == SPLITMUX_OUTPUT_STATE_STOPPED)
1263 /* When we get a gap event on the
1264 * reference stream and we're trying to open a
1265 * new file, we need to store it until we get
1266 * the buffer afterwards
1268 if (ctx->is_reference &&
1269 (splitmux->output_state != SPLITMUX_OUTPUT_STATE_OUTPUT_GOP)) {
1270 GST_DEBUG_OBJECT (pad, "Storing GAP event until buffer arrives");
1271 gst_event_replace (&ctx->pending_gap, event);
1272 GST_SPLITMUX_UNLOCK (splitmux);
1273 return GST_PAD_PROBE_HANDLED;
1276 rtime = my_segment_to_running_time (&ctx->out_segment, gap_ts);
1278 GST_LOG_OBJECT (pad, "Have GAP w/ ts %" GST_STIME_FORMAT,
1279 GST_STIME_ARGS (rtime));
1281 if (rtime != GST_CLOCK_STIME_NONE) {
1282 ctx->out_running_time = rtime;
1283 complete_or_wait_on_out (splitmux, ctx);
1287 case GST_EVENT_CUSTOM_DOWNSTREAM:{
1288 const GstStructure *s;
1289 GstClockTimeDiff ts = 0;
1291 s = gst_event_get_structure (event);
1292 if (!gst_structure_has_name (s, "splitmuxsink-unblock"))
1295 gst_structure_get_int64 (s, "timestamp", &ts);
1297 GST_SPLITMUX_LOCK (splitmux);
1300 if (splitmux->output_state == SPLITMUX_OUTPUT_STATE_STOPPED)
1302 ctx->out_running_time = ts;
1303 if (!ctx->is_reference)
1304 complete_or_wait_on_out (splitmux, ctx);
1305 GST_SPLITMUX_UNLOCK (splitmux);
1306 return GST_PAD_PROBE_DROP;
1308 case GST_EVENT_CAPS:{
1311 if (!ctx->is_reference)
1314 peer = gst_pad_get_peer (pad);
1316 gboolean ok = gst_pad_send_event (peer, gst_event_ref (event));
1318 gst_object_unref (peer);
1326 /* This is in the case the muxer doesn't allow this change of caps */
1327 GST_SPLITMUX_LOCK (splitmux);
1329 ctx->caps_change = TRUE;
1331 if (splitmux->output_state != SPLITMUX_OUTPUT_STATE_START_NEXT_FILE) {
1332 GST_DEBUG_OBJECT (splitmux,
1333 "New caps were not accepted. Switching output file");
1334 if (ctx->out_eos == FALSE) {
1335 splitmux->output_state = SPLITMUX_OUTPUT_STATE_ENDING_FILE;
1336 GST_SPLITMUX_BROADCAST_OUTPUT (splitmux);
1340 /* Lets it fall through, if it fails again, then the muxer just can't
1341 * support this format, but at least we have a closed file.
1349 /* We need to make sure events aren't passed
1350 * until the muxer / sink are ready for it */
1352 GST_SPLITMUX_LOCK (splitmux);
1353 if (!ctx->is_reference)
1354 complete_or_wait_on_out (splitmux, ctx);
1355 GST_SPLITMUX_UNLOCK (splitmux);
1357 /* Don't try to forward sticky events before the next buffer is there
1358 * because it would cause a new file to be created without the first
1359 * buffer being available.
1361 if (ctx->caps_change && GST_EVENT_IS_STICKY (event)) {
1362 gst_event_unref (event);
1363 return GST_PAD_PROBE_HANDLED;
1365 return GST_PAD_PROBE_PASS;
1368 /* Allow everything through until the configured next stopping point */
1369 GST_SPLITMUX_LOCK (splitmux);
1371 buf_info = g_queue_pop_tail (&ctx->queued_bufs);
1372 if (buf_info == NULL)
1373 /* Can only happen due to a poorly timed flush */
1376 /* If we have popped a keyframe, decrement the queued_gop count */
1377 if (buf_info->keyframe && splitmux->queued_keyframes > 0)
1378 splitmux->queued_keyframes--;
1380 ctx->out_running_time = buf_info->run_ts;
1381 ctx->cur_out_buffer = gst_pad_probe_info_get_buffer (info);
1383 GST_LOG_OBJECT (splitmux,
1384 "Pad %" GST_PTR_FORMAT " buffer with run TS %" GST_STIME_FORMAT
1385 " size %" G_GUINT64_FORMAT,
1386 pad, GST_STIME_ARGS (ctx->out_running_time), buf_info->buf_size);
1388 ctx->caps_change = FALSE;
1390 complete_or_wait_on_out (splitmux, ctx);
1392 splitmux->muxed_out_bytes += buf_info->buf_size;
1394 #ifndef GST_DISABLE_GST_DEBUG
1396 GstBuffer *buf = gst_pad_probe_info_get_buffer (info);
1397 GST_LOG_OBJECT (pad, "Returning to pass buffer %" GST_PTR_FORMAT
1398 " run ts %" GST_STIME_FORMAT, buf,
1399 GST_STIME_ARGS (ctx->out_running_time));
1403 ctx->cur_out_buffer = NULL;
1404 GST_SPLITMUX_UNLOCK (splitmux);
1406 /* pending_gap is protected by the STREAM lock */
1407 if (ctx->pending_gap) {
1408 /* If we previously stored a gap event, send it now */
1409 GstPad *peer = gst_pad_get_peer (ctx->srcpad);
1411 GST_DEBUG_OBJECT (splitmux,
1412 "Pad %" GST_PTR_FORMAT " sending pending GAP event", ctx->srcpad);
1414 gst_pad_send_event (peer, ctx->pending_gap);
1415 ctx->pending_gap = NULL;
1417 gst_object_unref (peer);
1420 mq_stream_buf_free (buf_info);
1422 return GST_PAD_PROBE_PASS;
1425 GST_SPLITMUX_UNLOCK (splitmux);
1426 return GST_PAD_PROBE_DROP;
1430 resend_sticky (GstPad * pad, GstEvent ** event, GstPad * peer)
1432 return gst_pad_send_event (peer, gst_event_ref (*event));
1436 unlock_context (MqStreamCtx * ctx, GstSplitMuxSink * splitmux)
1438 if (ctx->fragment_block_id > 0) {
1439 gst_pad_remove_probe (ctx->srcpad, ctx->fragment_block_id);
1440 ctx->fragment_block_id = 0;
1445 restart_context (MqStreamCtx * ctx, GstSplitMuxSink * splitmux)
1447 GstPad *peer = gst_pad_get_peer (ctx->srcpad);
1449 gst_pad_sticky_events_foreach (ctx->srcpad,
1450 (GstPadStickyEventsForeachFunction) (resend_sticky), peer);
1452 /* Clear EOS flag if not actually EOS */
1453 ctx->out_eos = GST_PAD_IS_EOS (ctx->srcpad);
1454 ctx->out_eos_async_done = ctx->out_eos;
1456 gst_object_unref (peer);
1460 relink_context (MqStreamCtx * ctx, GstSplitMuxSink * splitmux)
1462 GstPad *sinkpad, *srcpad, *newpad;
1463 GstPadTemplate *templ;
1465 srcpad = ctx->srcpad;
1466 sinkpad = gst_pad_get_peer (srcpad);
1468 templ = sinkpad->padtemplate;
1470 gst_element_request_pad (splitmux->muxer, templ,
1471 GST_PAD_TEMPLATE_NAME_TEMPLATE (templ), NULL);
1473 GST_DEBUG_OBJECT (splitmux, "Relinking ctx %p to pad %" GST_PTR_FORMAT, ctx,
1475 if (!gst_pad_unlink (srcpad, sinkpad)) {
1476 gst_object_unref (sinkpad);
1479 if (gst_pad_link_full (srcpad, newpad,
1480 GST_PAD_LINK_CHECK_NO_RECONFIGURE) != GST_PAD_LINK_OK) {
1481 gst_element_release_request_pad (splitmux->muxer, newpad);
1482 gst_object_unref (sinkpad);
1483 gst_object_unref (newpad);
1486 gst_object_unref (newpad);
1487 gst_object_unref (sinkpad);
1491 GST_ELEMENT_ERROR (splitmux, RESOURCE, SETTINGS,
1492 ("Could not create the new muxer/sink"), NULL);
1495 static GstPadProbeReturn
1496 _block_pad (GstPad * pad, GstPadProbeInfo * info, gpointer user_data)
1498 return GST_PAD_PROBE_OK;
1502 block_context (MqStreamCtx * ctx, GstSplitMuxSink * splitmux)
1504 ctx->fragment_block_id =
1505 gst_pad_add_probe (ctx->srcpad, GST_PAD_PROBE_TYPE_BLOCK, _block_pad,
1510 _set_property_from_structure (GQuark field_id, const GValue * value,
1513 const gchar *property_name = g_quark_to_string (field_id);
1514 GObject *element = G_OBJECT (user_data);
1516 g_object_set_property (element, property_name, value);
1522 _lock_and_set_to_null (GstElement * element, GstSplitMuxSink * splitmux)
1524 gst_element_set_locked_state (element, TRUE);
1525 gst_element_set_state (element, GST_STATE_NULL);
1526 GST_LOG_OBJECT (splitmux, "Removing old element %" GST_PTR_FORMAT, element);
1527 gst_bin_remove (GST_BIN (splitmux), element);
1532 _send_event (const GValue * value, gpointer user_data)
1534 GstPad *pad = g_value_get_object (value);
1535 GstEvent *ev = user_data;
1537 gst_pad_send_event (pad, gst_event_ref (ev));
1540 /* Called with lock held when a fragment
1541 * reaches EOS and it is time to restart
1545 start_next_fragment (GstSplitMuxSink * splitmux, MqStreamCtx * ctx)
1547 GstElement *muxer, *sink;
1549 g_assert (ctx->is_reference);
1551 /* 1 change to new file */
1552 splitmux->switching_fragment = TRUE;
1554 /* We need to drop the splitmux lock to acquire the state lock
1555 * here and ensure there's no racy state change going on elsewhere */
1556 muxer = gst_object_ref (splitmux->muxer);
1557 sink = gst_object_ref (splitmux->active_sink);
1559 GST_SPLITMUX_UNLOCK (splitmux);
1560 GST_STATE_LOCK (splitmux);
1562 if (splitmux->async_finalize) {
1563 if (splitmux->muxed_out_bytes > 0 || splitmux->fragment_id != 0) {
1565 GstElement *new_sink, *new_muxer;
1567 GST_DEBUG_OBJECT (splitmux, "Starting fragment %u",
1568 splitmux->fragment_id);
1569 g_list_foreach (splitmux->contexts, (GFunc) block_context, splitmux);
1570 newname = g_strdup_printf ("sink_%u", splitmux->fragment_id);
1571 GST_SPLITMUX_LOCK (splitmux);
1572 if ((splitmux->sink =
1573 create_element (splitmux, splitmux->sink_factory, newname,
1576 if (splitmux->sink_properties)
1577 gst_structure_foreach (splitmux->sink_properties,
1578 _set_property_from_structure, splitmux->sink);
1579 splitmux->active_sink = splitmux->sink;
1580 g_signal_emit (splitmux, signals[SIGNAL_SINK_ADDED], 0, splitmux->sink);
1582 newname = g_strdup_printf ("muxer_%u", splitmux->fragment_id);
1583 if ((splitmux->muxer =
1584 create_element (splitmux, splitmux->muxer_factory, newname,
1587 if (g_object_class_find_property (G_OBJECT_GET_CLASS (splitmux->sink),
1589 /* async child elements are causing state change races and weird
1590 * failures, so let's try and turn that off */
1591 g_object_set (splitmux->sink, "async", FALSE, NULL);
1593 if (splitmux->muxer_properties)
1594 gst_structure_foreach (splitmux->muxer_properties,
1595 _set_property_from_structure, splitmux->muxer);
1596 g_signal_emit (splitmux, signals[SIGNAL_MUXER_ADDED], 0, splitmux->muxer);
1598 new_sink = splitmux->sink;
1599 new_muxer = splitmux->muxer;
1600 GST_SPLITMUX_UNLOCK (splitmux);
1601 g_list_foreach (splitmux->contexts, (GFunc) relink_context, splitmux);
1602 gst_element_link (new_muxer, new_sink);
1604 if (g_object_get_qdata ((GObject *) sink, EOS_FROM_US)) {
1605 if (GPOINTER_TO_INT (g_object_get_qdata ((GObject *) sink,
1606 EOS_FROM_US)) == 2) {
1607 _lock_and_set_to_null (muxer, splitmux);
1608 _lock_and_set_to_null (sink, splitmux);
1610 g_object_set_qdata ((GObject *) sink, EOS_FROM_US,
1611 GINT_TO_POINTER (2));
1614 gst_object_unref (muxer);
1615 gst_object_unref (sink);
1618 gst_object_ref (muxer);
1619 gst_object_ref (sink);
1623 gst_element_set_locked_state (muxer, TRUE);
1624 gst_element_set_locked_state (sink, TRUE);
1625 gst_element_set_state (sink, GST_STATE_NULL);
1627 if (splitmux->reset_muxer) {
1628 gst_element_set_state (muxer, GST_STATE_NULL);
1630 GstIterator *it = gst_element_iterate_sink_pads (muxer);
1633 ev = gst_event_new_flush_start ();
1634 while (gst_iterator_foreach (it, _send_event, ev) == GST_ITERATOR_RESYNC);
1635 gst_event_unref (ev);
1637 gst_iterator_resync (it);
1639 ev = gst_event_new_flush_stop (TRUE);
1640 while (gst_iterator_foreach (it, _send_event, ev) == GST_ITERATOR_RESYNC);
1641 gst_event_unref (ev);
1643 gst_iterator_free (it);
1647 GST_SPLITMUX_LOCK (splitmux);
1648 if (splitmux->muxed_out_bytes > 0 || splitmux->fragment_id == 0)
1649 set_next_filename (splitmux, ctx);
1650 splitmux->muxed_out_bytes = 0;
1651 GST_SPLITMUX_UNLOCK (splitmux);
1653 gst_element_set_state (sink, GST_STATE_TARGET (splitmux));
1654 gst_element_set_state (muxer, GST_STATE_TARGET (splitmux));
1655 gst_element_set_locked_state (muxer, FALSE);
1656 gst_element_set_locked_state (sink, FALSE);
1658 gst_object_unref (sink);
1659 gst_object_unref (muxer);
1661 GST_SPLITMUX_LOCK (splitmux);
1662 GST_STATE_UNLOCK (splitmux);
1663 splitmux->switching_fragment = FALSE;
1664 do_async_done (splitmux);
1666 splitmux->ready_for_output = TRUE;
1668 g_list_foreach (splitmux->contexts, (GFunc) unlock_context, splitmux);
1669 g_list_foreach (splitmux->contexts, (GFunc) restart_context, splitmux);
1671 send_fragment_opened_closed_msg (splitmux, TRUE, sink);
1673 /* FIXME: Is this always the correct next state? */
1674 splitmux->output_state = SPLITMUX_OUTPUT_STATE_AWAITING_COMMAND;
1675 GST_SPLITMUX_BROADCAST_OUTPUT (splitmux);
1679 GST_STATE_UNLOCK (splitmux);
1680 GST_ELEMENT_ERROR (splitmux, RESOURCE, SETTINGS,
1681 ("Could not create the new muxer/sink"), NULL);
1685 bus_handler (GstBin * bin, GstMessage * message)
1687 GstSplitMuxSink *splitmux = GST_SPLITMUX_SINK (bin);
1689 switch (GST_MESSAGE_TYPE (message)) {
1690 case GST_MESSAGE_EOS:{
1691 /* If the state is draining out the current file, drop this EOS */
1694 sink = GST_ELEMENT (GST_MESSAGE_SRC (message));
1695 GST_SPLITMUX_LOCK (splitmux);
1697 send_fragment_opened_closed_msg (splitmux, FALSE, sink);
1699 if (splitmux->async_finalize) {
1701 if (g_object_get_qdata ((GObject *) sink, EOS_FROM_US)) {
1702 if (GPOINTER_TO_INT (g_object_get_qdata ((GObject *) sink,
1703 EOS_FROM_US)) == 2) {
1705 GstPad *sinksink, *muxersrc;
1707 sinksink = gst_element_get_static_pad (sink, "sink");
1708 muxersrc = gst_pad_get_peer (sinksink);
1709 muxer = gst_pad_get_parent_element (muxersrc);
1710 gst_object_unref (sinksink);
1711 gst_object_unref (muxersrc);
1713 gst_element_call_async (muxer,
1714 (GstElementCallAsyncFunc) _lock_and_set_to_null,
1715 gst_object_ref (splitmux), gst_object_unref);
1716 gst_element_call_async (sink,
1717 (GstElementCallAsyncFunc) _lock_and_set_to_null,
1718 gst_object_ref (splitmux), gst_object_unref);
1719 gst_object_unref (muxer);
1721 g_object_set_qdata ((GObject *) sink, EOS_FROM_US,
1722 GINT_TO_POINTER (2));
1724 GST_DEBUG_OBJECT (splitmux,
1725 "Caught async EOS from previous muxer+sink. Dropping.");
1726 /* We forward the EOS so that it gets aggregated as normal. If the sink
1727 * finishes and is removed before the end, it will be de-aggregated */
1728 gst_message_unref (message);
1729 GST_SPLITMUX_UNLOCK (splitmux);
1732 } else if (splitmux->output_state == SPLITMUX_OUTPUT_STATE_ENDING_FILE) {
1733 GST_DEBUG_OBJECT (splitmux, "Caught EOS at end of fragment, dropping");
1734 splitmux->output_state = SPLITMUX_OUTPUT_STATE_START_NEXT_FILE;
1735 GST_SPLITMUX_BROADCAST_OUTPUT (splitmux);
1737 gst_message_unref (message);
1738 GST_SPLITMUX_UNLOCK (splitmux);
1741 GST_DEBUG_OBJECT (splitmux,
1742 "Passing EOS message. Output state %d max_out_running_time %"
1743 GST_STIME_FORMAT, splitmux->output_state,
1744 GST_STIME_ARGS (splitmux->max_out_running_time));
1746 GST_SPLITMUX_UNLOCK (splitmux);
1749 case GST_MESSAGE_ASYNC_START:
1750 case GST_MESSAGE_ASYNC_DONE:
1751 /* Ignore state changes from our children while switching */
1752 GST_SPLITMUX_LOCK (splitmux);
1753 if (splitmux->switching_fragment) {
1754 if (GST_MESSAGE_SRC (message) == (GstObject *) splitmux->active_sink
1755 || GST_MESSAGE_SRC (message) == (GstObject *) splitmux->muxer) {
1756 GST_LOG_OBJECT (splitmux,
1757 "Ignoring state change from child %" GST_PTR_FORMAT
1758 " while switching", GST_MESSAGE_SRC (message));
1759 gst_message_unref (message);
1760 GST_SPLITMUX_UNLOCK (splitmux);
1764 GST_SPLITMUX_UNLOCK (splitmux);
1766 case GST_MESSAGE_WARNING:
1768 GError *gerror = NULL;
1770 gst_message_parse_warning (message, &gerror, NULL);
1772 if (g_error_matches (gerror, GST_STREAM_ERROR, GST_STREAM_ERROR_FORMAT)) {
1774 gboolean caps_change = FALSE;
1776 GST_SPLITMUX_LOCK (splitmux);
1778 for (item = splitmux->contexts; item; item = item->next) {
1779 MqStreamCtx *ctx = item->data;
1781 if (ctx->caps_change) {
1787 GST_SPLITMUX_UNLOCK (splitmux);
1790 GST_LOG_OBJECT (splitmux,
1791 "Ignoring warning change from child %" GST_PTR_FORMAT
1792 " while switching caps", GST_MESSAGE_SRC (message));
1793 gst_message_unref (message);
1803 GST_BIN_CLASS (parent_class)->handle_message (bin, message);
1807 ctx_set_unblock (MqStreamCtx * ctx)
1809 ctx->need_unblock = TRUE;
1813 need_new_fragment (GstSplitMuxSink * splitmux,
1814 GstClockTime queued_time, GstClockTime queued_gop_time,
1815 guint64 queued_bytes)
1817 guint64 thresh_bytes;
1818 GstClockTime thresh_time;
1819 gboolean check_robust_muxing;
1821 GST_OBJECT_LOCK (splitmux);
1822 thresh_bytes = splitmux->threshold_bytes;
1823 thresh_time = splitmux->threshold_time;
1824 check_robust_muxing = splitmux->use_robust_muxing
1825 && splitmux->muxer_has_reserved_props;
1826 GST_OBJECT_UNLOCK (splitmux);
1828 /* Have we muxed anything into the new file at all? */
1829 if (splitmux->fragment_total_bytes <= 0)
1832 /* User told us to split now */
1833 if (g_atomic_int_get (&(splitmux->split_now)) == TRUE)
1836 if (thresh_bytes > 0 && queued_bytes > thresh_bytes)
1837 return TRUE; /* Would overrun byte limit */
1839 if (thresh_time > 0 && queued_time > thresh_time)
1840 return TRUE; /* Would overrun byte limit */
1842 /* Timecode-based threshold accounts for possible rounding errors:
1843 * 5us should be bigger than all possible rounding errors but nowhere near
1844 * big enough to skip to another frame */
1845 if (splitmux->next_max_tc_time != GST_CLOCK_TIME_NONE &&
1846 splitmux->reference_ctx->in_running_time >
1847 splitmux->next_max_tc_time + 5 * GST_USECOND)
1848 return TRUE; /* Timecode threshold */
1850 if (check_robust_muxing) {
1851 GstClockTime mux_reserved_remain;
1853 g_object_get (splitmux->muxer,
1854 "reserved-duration-remaining", &mux_reserved_remain, NULL);
1856 GST_LOG_OBJECT (splitmux,
1857 "Muxer robust muxing report - %" G_GUINT64_FORMAT
1858 " remaining. New GOP would enqueue %" G_GUINT64_FORMAT,
1859 mux_reserved_remain, queued_gop_time);
1861 if (queued_gop_time >= mux_reserved_remain) {
1862 GST_INFO_OBJECT (splitmux,
1863 "File is about to run out of header room - %" G_GUINT64_FORMAT
1864 " remaining. New GOP would enqueue %" G_GUINT64_FORMAT
1865 ". Switching to new file", mux_reserved_remain, queued_gop_time);
1870 /* Continue and mux this GOP */
1874 /* Called with splitmux lock held */
1875 /* Called when entering ProcessingCompleteGop state
1876 * Assess if mq contents overflowed the current file
1877 * -> If yes, need to switch to new file
1878 * -> if no, set max_out_running_time to let this GOP in and
1879 * go to COLLECTING_GOP_START state
1882 handle_gathered_gop (GstSplitMuxSink * splitmux)
1884 guint64 queued_bytes;
1885 GstClockTimeDiff queued_time = 0;
1886 GstClockTimeDiff queued_gop_time = 0;
1887 GstClockTimeDiff new_out_ts = splitmux->reference_ctx->in_running_time;
1888 SplitMuxOutputCommand *cmd;
1890 /* Assess if the multiqueue contents overflowed the current file */
1891 /* When considering if a newly gathered GOP overflows
1892 * the time limit for the file, only consider the running time of the
1893 * reference stream. Other streams might have run ahead a little bit,
1894 * but extra pieces won't be released to the muxer beyond the reference
1895 * stream cut-off anyway - so it forms the limit. */
1896 queued_bytes = splitmux->fragment_total_bytes + splitmux->gop_total_bytes;
1897 queued_time = splitmux->reference_ctx->in_running_time;
1898 /* queued_gop_time tracks how much unwritten data there is waiting to
1899 * be written to this fragment including this GOP */
1900 if (splitmux->reference_ctx->out_running_time != GST_CLOCK_STIME_NONE)
1902 splitmux->reference_ctx->in_running_time -
1903 splitmux->reference_ctx->out_running_time;
1906 splitmux->reference_ctx->in_running_time - splitmux->gop_start_time;
1908 GST_LOG_OBJECT (splitmux, " queued_bytes %" G_GUINT64_FORMAT, queued_bytes);
1910 g_assert (queued_gop_time >= 0);
1911 g_assert (queued_time >= splitmux->fragment_start_time);
1913 queued_time -= splitmux->fragment_start_time;
1914 if (queued_time < queued_gop_time)
1915 queued_gop_time = queued_time;
1917 /* Expand queued bytes estimate by muxer overhead */
1918 queued_bytes += (queued_bytes * splitmux->mux_overhead);
1920 GST_LOG_OBJECT (splitmux, "mq at TS %" GST_STIME_FORMAT
1921 " bytes %" G_GUINT64_FORMAT, GST_STIME_ARGS (queued_time), queued_bytes);
1922 if (splitmux->next_max_tc_time != GST_CLOCK_TIME_NONE) {
1923 GST_LOG_OBJECT (splitmux,
1924 "timecode mq TS %" GST_TIME_FORMAT " vs target %" GST_TIME_FORMAT,
1925 GST_TIME_ARGS (splitmux->reference_ctx->in_running_time),
1926 GST_TIME_ARGS (splitmux->next_max_tc_time + 5 * GST_USECOND));
1929 /* Check for overrun - have we output at least one byte and overrun
1930 * either threshold? */
1931 if (need_new_fragment (splitmux, queued_time, queued_gop_time, queued_bytes)) {
1932 GstClockTime *sink_running_time = g_new (GstClockTime, 1);
1933 *sink_running_time = splitmux->reference_ctx->out_running_time;
1934 g_object_set_qdata_full (G_OBJECT (splitmux->sink),
1935 RUNNING_TIME, sink_running_time, g_free);
1936 g_atomic_int_set (&(splitmux->split_now), FALSE);
1937 /* Tell the output side to start a new fragment */
1938 GST_INFO_OBJECT (splitmux,
1939 "This GOP (dur %" GST_STIME_FORMAT
1940 ") would overflow the fragment, Sending start_new_fragment cmd",
1941 GST_STIME_ARGS (splitmux->reference_ctx->in_running_time -
1942 splitmux->gop_start_time));
1943 cmd = out_cmd_buf_new ();
1944 cmd->start_new_fragment = TRUE;
1945 g_queue_push_head (&splitmux->out_cmd_q, cmd);
1946 GST_SPLITMUX_BROADCAST_OUTPUT (splitmux);
1948 new_out_ts = splitmux->reference_ctx->in_running_time;
1949 splitmux->fragment_start_time = splitmux->gop_start_time;
1950 splitmux->fragment_total_bytes = 0;
1952 if (request_next_keyframe (splitmux,
1953 splitmux->reference_ctx->prev_in_keyframe) == FALSE) {
1954 GST_WARNING_OBJECT (splitmux,
1955 "Could not request a keyframe. Files may not split at the exact location they should");
1957 gst_buffer_replace (&splitmux->reference_ctx->prev_in_keyframe, NULL);
1960 /* And set up to collect the next GOP */
1961 if (!splitmux->reference_ctx->in_eos) {
1962 splitmux->input_state = SPLITMUX_INPUT_STATE_COLLECTING_GOP_START;
1963 splitmux->gop_start_time = new_out_ts;
1965 /* This is probably already the current state, but just in case: */
1966 splitmux->input_state = SPLITMUX_INPUT_STATE_FINISHING_UP;
1967 new_out_ts = GST_CLOCK_STIME_NONE; /* EOS runs until forever */
1970 /* And wake all input contexts to send a wake-up event */
1971 g_list_foreach (splitmux->contexts, (GFunc) ctx_set_unblock, NULL);
1972 GST_SPLITMUX_BROADCAST_INPUT (splitmux);
1974 /* Now either way - either there was no overflow, or we requested a new fragment: release this GOP */
1975 splitmux->fragment_total_bytes += splitmux->gop_total_bytes;
1977 if (splitmux->gop_total_bytes > 0) {
1978 GST_LOG_OBJECT (splitmux,
1979 "Releasing GOP to output. Bytes in fragment now %" G_GUINT64_FORMAT
1980 " time %" GST_STIME_FORMAT,
1981 splitmux->fragment_total_bytes, GST_STIME_ARGS (queued_time));
1983 /* Send this GOP to the output command queue */
1984 cmd = out_cmd_buf_new ();
1985 cmd->start_new_fragment = FALSE;
1986 cmd->max_output_ts = new_out_ts;
1987 GST_LOG_OBJECT (splitmux, "Sending GOP cmd to output for TS %"
1988 GST_STIME_FORMAT, GST_STIME_ARGS (new_out_ts));
1989 g_queue_push_head (&splitmux->out_cmd_q, cmd);
1991 GST_SPLITMUX_BROADCAST_OUTPUT (splitmux);
1994 splitmux->gop_total_bytes = 0;
1997 /* Called with splitmux lock held */
1998 /* Called from each input pad when it is has all the pieces
1999 * for a GOP or EOS, starting with the reference pad which has set the
2000 * splitmux->max_in_running_time
2003 check_completed_gop (GstSplitMuxSink * splitmux, MqStreamCtx * ctx)
2008 /* On ENDING_FILE, the reference stream sends a command to start a new
2009 * fragment, then releases the GOP for output in the new fragment.
2010 * If somes streams received no buffer during the last GOP that overran,
2011 * because its next buffer has a timestamp bigger than
2012 * ctx->max_in_running_time, its queue is empty. In that case the only
2013 * way to wakeup the output thread is by injecting an event in the
2014 * queue. This usually happen with subtitle streams.
2015 * See https://bugzilla.gnome.org/show_bug.cgi?id=763711. */
2016 if (ctx->need_unblock) {
2017 GST_LOG_OBJECT (ctx->sinkpad, "Sending splitmuxsink-unblock event");
2018 event = gst_event_new_custom (GST_EVENT_CUSTOM_DOWNSTREAM |
2019 GST_EVENT_TYPE_SERIALIZED,
2020 gst_structure_new ("splitmuxsink-unblock", "timestamp",
2021 G_TYPE_INT64, splitmux->max_in_running_time, NULL));
2023 GST_SPLITMUX_UNLOCK (splitmux);
2024 gst_pad_send_event (ctx->sinkpad, event);
2025 GST_SPLITMUX_LOCK (splitmux);
2027 ctx->need_unblock = FALSE;
2028 GST_SPLITMUX_BROADCAST_INPUT (splitmux);
2029 /* state may have changed while we were unlocked. Loop again if so */
2030 if (splitmux->input_state != SPLITMUX_INPUT_STATE_WAITING_GOP_COLLECT)
2034 if (splitmux->input_state == SPLITMUX_INPUT_STATE_WAITING_GOP_COLLECT) {
2035 gboolean ready = TRUE;
2037 /* Iterate each pad, and check that the input running time is at least
2038 * up to the reference running time, and if so handle the collected GOP */
2039 GST_LOG_OBJECT (splitmux, "Checking GOP collected, Max in running time %"
2040 GST_STIME_FORMAT " ctx %p",
2041 GST_STIME_ARGS (splitmux->max_in_running_time), ctx);
2042 for (cur = g_list_first (splitmux->contexts); cur != NULL;
2043 cur = g_list_next (cur)) {
2044 MqStreamCtx *tmpctx = (MqStreamCtx *) (cur->data);
2046 GST_LOG_OBJECT (splitmux,
2047 "Context %p (src pad %" GST_PTR_FORMAT ") TS %" GST_STIME_FORMAT
2048 " EOS %d", tmpctx, tmpctx->srcpad,
2049 GST_STIME_ARGS (tmpctx->in_running_time), tmpctx->in_eos);
2051 if (splitmux->max_in_running_time != GST_CLOCK_STIME_NONE &&
2052 tmpctx->in_running_time < splitmux->max_in_running_time &&
2054 GST_LOG_OBJECT (splitmux,
2055 "Context %p (src pad %" GST_PTR_FORMAT ") not ready. We'll sleep",
2056 tmpctx, tmpctx->srcpad);
2062 GST_DEBUG_OBJECT (splitmux,
2063 "Collected GOP is complete. Processing (ctx %p)", ctx);
2064 /* All pads have a complete GOP, release it into the multiqueue */
2065 handle_gathered_gop (splitmux);
2069 /* If upstream reached EOS we are not expecting more data, no need to wait
2074 /* Some pad is not yet ready, or GOP is being pushed
2075 * either way, sleep and wait to get woken */
2076 while (splitmux->input_state == SPLITMUX_INPUT_STATE_WAITING_GOP_COLLECT &&
2078 (ctx->in_running_time >= splitmux->max_in_running_time) &&
2079 (splitmux->max_in_running_time != GST_CLOCK_STIME_NONE)) {
2081 GST_LOG_OBJECT (splitmux, "Sleeping for GOP collection (ctx %p)", ctx);
2082 GST_SPLITMUX_WAIT_INPUT (splitmux);
2083 GST_LOG_OBJECT (splitmux, "Done waiting for complete GOP (ctx %p)", ctx);
2087 static GstPadProbeReturn
2088 handle_mq_input (GstPad * pad, GstPadProbeInfo * info, MqStreamCtx * ctx)
2090 GstSplitMuxSink *splitmux = ctx->splitmux;
2092 MqStreamBuf *buf_info = NULL;
2094 gboolean loop_again;
2095 gboolean keyframe = FALSE;
2097 GST_LOG_OBJECT (pad, "Fired probe type 0x%x", info->type);
2099 /* FIXME: Handle buffer lists, until then make it clear they won't work */
2100 if (info->type & GST_PAD_PROBE_TYPE_BUFFER_LIST) {
2101 g_warning ("Buffer list handling not implemented");
2102 return GST_PAD_PROBE_DROP;
2104 if (info->type & GST_PAD_PROBE_TYPE_EVENT_DOWNSTREAM ||
2105 info->type & GST_PAD_PROBE_TYPE_EVENT_FLUSH) {
2106 GstEvent *event = gst_pad_probe_info_get_event (info);
2108 GST_LOG_OBJECT (pad, "Event %" GST_PTR_FORMAT, event);
2110 switch (GST_EVENT_TYPE (event)) {
2111 case GST_EVENT_SEGMENT:
2112 gst_event_copy_segment (event, &ctx->in_segment);
2114 case GST_EVENT_FLUSH_STOP:
2115 GST_SPLITMUX_LOCK (splitmux);
2116 gst_segment_init (&ctx->in_segment, GST_FORMAT_UNDEFINED);
2117 ctx->in_eos = FALSE;
2118 ctx->in_running_time = GST_CLOCK_STIME_NONE;
2119 GST_SPLITMUX_UNLOCK (splitmux);
2122 GST_SPLITMUX_LOCK (splitmux);
2125 if (splitmux->input_state == SPLITMUX_INPUT_STATE_STOPPED)
2128 if (ctx->is_reference) {
2129 GST_INFO_OBJECT (splitmux, "Got Reference EOS. Finishing up");
2130 /* check_completed_gop will act as if this is a new keyframe with infinite timestamp */
2131 splitmux->input_state = SPLITMUX_INPUT_STATE_WAITING_GOP_COLLECT;
2132 /* Wake up other input pads to collect this GOP */
2133 GST_SPLITMUX_BROADCAST_INPUT (splitmux);
2134 check_completed_gop (splitmux, ctx);
2135 } else if (splitmux->input_state ==
2136 SPLITMUX_INPUT_STATE_WAITING_GOP_COLLECT) {
2137 /* If we are waiting for a GOP to be completed (ie, for aux
2138 * pads to catch up), then this pad is complete, so check
2139 * if the whole GOP is.
2141 check_completed_gop (splitmux, ctx);
2143 GST_SPLITMUX_UNLOCK (splitmux);
2145 case GST_EVENT_GAP:{
2146 GstClockTime gap_ts;
2147 GstClockTimeDiff rtime;
2149 gst_event_parse_gap (event, &gap_ts, NULL);
2150 if (gap_ts == GST_CLOCK_TIME_NONE)
2153 GST_SPLITMUX_LOCK (splitmux);
2155 if (splitmux->input_state == SPLITMUX_INPUT_STATE_STOPPED)
2157 rtime = my_segment_to_running_time (&ctx->in_segment, gap_ts);
2159 GST_LOG_OBJECT (pad, "Have GAP w/ ts %" GST_STIME_FORMAT,
2160 GST_STIME_ARGS (rtime));
2162 if (ctx->is_reference
2163 && splitmux->fragment_start_time == GST_CLOCK_STIME_NONE) {
2164 splitmux->gop_start_time = splitmux->fragment_start_time = rtime;
2165 GST_LOG_OBJECT (splitmux, "Mux start time now %" GST_STIME_FORMAT,
2166 GST_STIME_ARGS (splitmux->fragment_start_time));
2167 /* Also take this as the first start time when starting up,
2168 * so that we start counting overflow from the first frame */
2169 if (!GST_CLOCK_STIME_IS_VALID (splitmux->max_in_running_time))
2170 splitmux->max_in_running_time = splitmux->fragment_start_time;
2173 GST_SPLITMUX_UNLOCK (splitmux);
2179 return GST_PAD_PROBE_PASS;
2180 } else if (info->type & GST_PAD_PROBE_TYPE_QUERY_DOWNSTREAM) {
2181 switch (GST_QUERY_TYPE (GST_QUERY (info->data))) {
2182 case GST_QUERY_ALLOCATION:
2183 return GST_PAD_PROBE_DROP;
2185 return GST_PAD_PROBE_PASS;
2189 buf = gst_pad_probe_info_get_buffer (info);
2190 buf_info = mq_stream_buf_new ();
2192 if (GST_BUFFER_PTS_IS_VALID (buf))
2193 ts = GST_BUFFER_PTS (buf);
2195 ts = GST_BUFFER_DTS (buf);
2197 GST_LOG_OBJECT (pad, "Buffer TS is %" GST_TIME_FORMAT, GST_TIME_ARGS (ts));
2199 GST_SPLITMUX_LOCK (splitmux);
2201 if (splitmux->input_state == SPLITMUX_INPUT_STATE_STOPPED)
2204 /* If this buffer has a timestamp, advance the input timestamp of the
2206 if (GST_CLOCK_TIME_IS_VALID (ts)) {
2207 GstClockTimeDiff running_time =
2208 my_segment_to_running_time (&ctx->in_segment, ts);
2210 GST_LOG_OBJECT (pad, "Buffer running TS is %" GST_STIME_FORMAT,
2211 GST_STIME_ARGS (running_time));
2213 if (GST_CLOCK_STIME_IS_VALID (running_time)
2214 && running_time > ctx->in_running_time)
2215 ctx->in_running_time = running_time;
2218 /* Try to make sure we have a valid running time */
2219 if (!GST_CLOCK_STIME_IS_VALID (ctx->in_running_time)) {
2220 ctx->in_running_time =
2221 my_segment_to_running_time (&ctx->in_segment, ctx->in_segment.start);
2224 GST_LOG_OBJECT (pad, "in running time now %" GST_STIME_FORMAT,
2225 GST_STIME_ARGS (ctx->in_running_time));
2227 buf_info->run_ts = ctx->in_running_time;
2228 buf_info->buf_size = gst_buffer_get_size (buf);
2229 buf_info->duration = GST_BUFFER_DURATION (buf);
2231 /* initialize fragment_start_time */
2232 if (ctx->is_reference
2233 && splitmux->fragment_start_time == GST_CLOCK_STIME_NONE) {
2234 splitmux->gop_start_time = splitmux->fragment_start_time = buf_info->run_ts;
2235 GST_LOG_OBJECT (splitmux, "Mux start time now %" GST_STIME_FORMAT,
2236 GST_STIME_ARGS (splitmux->fragment_start_time));
2237 gst_buffer_replace (&ctx->prev_in_keyframe, buf);
2239 /* Also take this as the first start time when starting up,
2240 * so that we start counting overflow from the first frame */
2241 if (!GST_CLOCK_STIME_IS_VALID (splitmux->max_in_running_time))
2242 splitmux->max_in_running_time = splitmux->fragment_start_time;
2243 if (request_next_keyframe (splitmux, ctx->prev_in_keyframe) == FALSE) {
2244 GST_WARNING_OBJECT (splitmux,
2245 "Could not request a keyframe. Files may not split at the exact location they should");
2247 gst_buffer_replace (&splitmux->reference_ctx->prev_in_keyframe, NULL);
2250 GST_DEBUG_OBJECT (pad, "Buf TS %" GST_STIME_FORMAT
2251 " total GOP bytes %" G_GUINT64_FORMAT,
2252 GST_STIME_ARGS (buf_info->run_ts), splitmux->gop_total_bytes);
2259 switch (splitmux->input_state) {
2260 case SPLITMUX_INPUT_STATE_COLLECTING_GOP_START:
2261 if (ctx->is_reference) {
2262 /* This is the reference context. If it's a keyframe,
2263 * it marks the start of a new GOP and we should wait in
2264 * check_completed_gop before continuing, but either way
2265 * (keyframe or no, we'll pass this buffer through after
2266 * so set loop_again to FALSE */
2269 if (GST_BUFFER_FLAG_IS_SET (buf, GST_BUFFER_FLAG_DELTA_UNIT)) {
2270 /* Allow other input pads to catch up to here too */
2271 splitmux->max_in_running_time = ctx->in_running_time;
2272 GST_LOG_OBJECT (splitmux,
2273 "Max in running time now %" GST_TIME_FORMAT,
2274 GST_TIME_ARGS (splitmux->max_in_running_time));
2275 GST_SPLITMUX_BROADCAST_INPUT (splitmux);
2278 GST_INFO_OBJECT (pad,
2279 "Have keyframe with running time %" GST_STIME_FORMAT,
2280 GST_STIME_ARGS (ctx->in_running_time));
2282 splitmux->input_state = SPLITMUX_INPUT_STATE_WAITING_GOP_COLLECT;
2283 splitmux->max_in_running_time = ctx->in_running_time;
2284 GST_LOG_OBJECT (splitmux, "Max in running time now %" GST_TIME_FORMAT,
2285 GST_TIME_ARGS (splitmux->max_in_running_time));
2286 /* Wake up other input pads to collect this GOP */
2287 GST_SPLITMUX_BROADCAST_INPUT (splitmux);
2288 check_completed_gop (splitmux, ctx);
2289 /* Store this new keyframe to remember the start of GOP */
2290 gst_buffer_replace (&ctx->prev_in_keyframe, buf);
2292 /* Pass this buffer if the reference ctx is far enough ahead */
2293 if (ctx->in_running_time < splitmux->max_in_running_time) {
2298 /* We're still waiting for a keyframe on the reference pad, sleep */
2299 GST_LOG_OBJECT (pad, "Sleeping for GOP start");
2300 GST_SPLITMUX_WAIT_INPUT (splitmux);
2301 GST_LOG_OBJECT (pad,
2302 "Done sleeping for GOP start input state now %d",
2303 splitmux->input_state);
2306 case SPLITMUX_INPUT_STATE_WAITING_GOP_COLLECT:{
2307 /* We're collecting a GOP. If this is the reference context,
2308 * we need to check if this is a keyframe that marks the start
2309 * of the next GOP. If it is, it marks the end of the GOP we're
2310 * collecting, so sleep and wait until all the other pads also
2311 * reach that timestamp - at which point, we have an entire GOP
2312 * and either go to ENDING_FILE or release this GOP to the muxer and
2313 * go back to COLLECT_GOP_START. */
2315 /* If we overran the target timestamp, it might be time to process
2316 * the GOP, otherwise bail out for more data
2318 GST_LOG_OBJECT (pad,
2319 "Checking TS %" GST_STIME_FORMAT " against max %"
2320 GST_STIME_FORMAT, GST_STIME_ARGS (ctx->in_running_time),
2321 GST_STIME_ARGS (splitmux->max_in_running_time));
2323 if (ctx->in_running_time < splitmux->max_in_running_time) {
2328 GST_LOG_OBJECT (pad,
2329 "Collected last packet of GOP. Checking other pads");
2330 check_completed_gop (splitmux, ctx);
2333 case SPLITMUX_INPUT_STATE_FINISHING_UP:
2344 splitmux->queued_keyframes++;
2345 buf_info->keyframe = TRUE;
2348 /* Update total input byte counter for overflow detect */
2349 splitmux->gop_total_bytes += buf_info->buf_size;
2351 /* Now add this buffer to the queue just before returning */
2352 g_queue_push_head (&ctx->queued_bufs, buf_info);
2354 GST_LOG_OBJECT (pad, "Returning to queue buffer %" GST_PTR_FORMAT
2355 " run ts %" GST_STIME_FORMAT, buf, GST_STIME_ARGS (ctx->in_running_time));
2357 GST_SPLITMUX_UNLOCK (splitmux);
2358 return GST_PAD_PROBE_PASS;
2361 GST_SPLITMUX_UNLOCK (splitmux);
2363 mq_stream_buf_free (buf_info);
2364 return GST_PAD_PROBE_PASS;
2368 grow_blocked_queues (GstSplitMuxSink * splitmux)
2372 /* Scan other queues for full-ness and grow them */
2373 for (cur = g_list_first (splitmux->contexts);
2374 cur != NULL; cur = g_list_next (cur)) {
2375 MqStreamCtx *tmpctx = (MqStreamCtx *) (cur->data);
2377 guint cur_len = g_queue_get_length (&tmpctx->queued_bufs);
2379 g_object_get (tmpctx->q, "max-size-buffers", &cur_limit, NULL);
2380 GST_LOG_OBJECT (tmpctx->q, "Queue len %u", cur_len);
2382 if (cur_len >= cur_limit) {
2383 cur_limit = cur_len + 1;
2384 GST_DEBUG_OBJECT (tmpctx->q,
2385 "Queue overflowed and needs enlarging. Growing to %u buffers",
2387 g_object_set (tmpctx->q, "max-size-buffers", cur_limit, NULL);
2393 handle_q_underrun (GstElement * q, gpointer user_data)
2395 MqStreamCtx *ctx = (MqStreamCtx *) (user_data);
2396 GstSplitMuxSink *splitmux = ctx->splitmux;
2398 GST_SPLITMUX_LOCK (splitmux);
2399 GST_DEBUG_OBJECT (q,
2400 "Queue reported underrun with %d keyframes and %d cmds enqueued",
2401 splitmux->queued_keyframes, g_queue_get_length (&splitmux->out_cmd_q));
2402 grow_blocked_queues (splitmux);
2403 GST_SPLITMUX_UNLOCK (splitmux);
2407 handle_q_overrun (GstElement * q, gpointer user_data)
2409 MqStreamCtx *ctx = (MqStreamCtx *) (user_data);
2410 GstSplitMuxSink *splitmux = ctx->splitmux;
2411 gboolean allow_grow = FALSE;
2413 GST_SPLITMUX_LOCK (splitmux);
2414 GST_DEBUG_OBJECT (q,
2415 "Queue reported overrun with %d keyframes and %d cmds enqueued",
2416 splitmux->queued_keyframes, g_queue_get_length (&splitmux->out_cmd_q));
2418 if (splitmux->queued_keyframes < 2) {
2419 /* Less than a full GOP queued, grow the queue */
2421 } else if (g_queue_get_length (&splitmux->out_cmd_q) < 1) {
2424 /* If another queue is starved, grow */
2426 for (cur = g_list_first (splitmux->contexts);
2427 cur != NULL; cur = g_list_next (cur)) {
2428 MqStreamCtx *tmpctx = (MqStreamCtx *) (cur->data);
2429 if (tmpctx != ctx && g_queue_get_length (&tmpctx->queued_bufs) < 1) {
2434 GST_SPLITMUX_UNLOCK (splitmux);
2439 g_object_get (q, "max-size-buffers", &cur_limit, NULL);
2442 GST_DEBUG_OBJECT (q,
2443 "Queue overflowed and needs enlarging. Growing to %u buffers",
2446 g_object_set (q, "max-size-buffers", cur_limit, NULL);
2451 gst_splitmux_sink_request_new_pad (GstElement * element,
2452 GstPadTemplate * templ, const gchar * name, const GstCaps * caps)
2454 GstSplitMuxSink *splitmux = (GstSplitMuxSink *) element;
2455 GstPadTemplate *mux_template = NULL;
2458 GstPad *q_sink = NULL, *q_src = NULL;
2460 gboolean is_video = FALSE;
2463 GST_DEBUG_OBJECT (element, "templ:%s, name:%s", templ->name_template, name);
2465 GST_SPLITMUX_LOCK (splitmux);
2466 if (!create_muxer (splitmux))
2468 g_signal_emit (splitmux, signals[SIGNAL_MUXER_ADDED], 0, splitmux->muxer);
2470 if (templ->name_template) {
2471 if (g_str_equal (templ->name_template, "video")) {
2472 if (splitmux->have_video)
2473 goto already_have_video;
2475 /* FIXME: Look for a pad template with matching caps, rather than by name */
2477 gst_element_class_get_pad_template (GST_ELEMENT_GET_CLASS
2478 (splitmux->muxer), "video_%u");
2480 /* Fallback to find sink pad templates named 'video' (flvmux) */
2481 if (!mux_template) {
2483 gst_element_class_get_pad_template (GST_ELEMENT_GET_CLASS
2484 (splitmux->muxer), "video");
2490 gst_element_class_get_pad_template (GST_ELEMENT_GET_CLASS
2491 (splitmux->muxer), templ->name_template);
2493 /* Fallback to find sink pad templates named 'audio' (flvmux) */
2494 if (!mux_template) {
2496 gst_element_class_get_pad_template (GST_ELEMENT_GET_CLASS
2497 (splitmux->muxer), "audio");
2501 if (mux_template == NULL) {
2502 /* Fallback to find sink pad templates named 'sink_%d' (mpegtsmux) */
2504 gst_element_class_get_pad_template (GST_ELEMENT_GET_CLASS
2505 (splitmux->muxer), "sink_%d");
2510 res = gst_element_request_pad (splitmux->muxer, mux_template, name, caps);
2515 gname = g_strdup ("video");
2516 else if (name == NULL)
2517 gname = gst_pad_get_name (res);
2519 gname = g_strdup (name);
2521 if ((q = create_element (splitmux, "queue", NULL, FALSE)) == NULL)
2524 gst_element_set_state (q, GST_STATE_TARGET (splitmux));
2526 g_object_set (q, "max-size-bytes", 0, "max-size-time", (guint64) (0),
2527 "max-size-buffers", 5, NULL);
2529 q_sink = gst_element_get_static_pad (q, "sink");
2530 q_src = gst_element_get_static_pad (q, "src");
2532 if (gst_pad_link (q_src, res) != GST_PAD_LINK_OK) {
2533 gst_element_release_request_pad (splitmux->muxer, res);
2534 gst_object_unref (GST_OBJECT (res));
2538 gst_object_unref (GST_OBJECT (res));
2540 ctx = mq_stream_ctx_new (splitmux);
2541 /* Context holds a ref: */
2542 ctx->q = gst_object_ref (q);
2543 ctx->srcpad = q_src;
2544 ctx->sinkpad = q_sink;
2546 g_signal_connect (q, "overrun", (GCallback) handle_q_overrun, ctx);
2547 g_signal_connect (q, "underrun", (GCallback) handle_q_underrun, ctx);
2549 ctx->src_pad_block_id =
2550 gst_pad_add_probe (q_src,
2551 GST_PAD_PROBE_TYPE_DATA_DOWNSTREAM | GST_PAD_PROBE_TYPE_EVENT_FLUSH,
2552 (GstPadProbeCallback) handle_mq_output, ctx, NULL);
2553 if (is_video && splitmux->reference_ctx != NULL) {
2554 splitmux->reference_ctx->is_reference = FALSE;
2555 splitmux->reference_ctx = NULL;
2557 if (splitmux->reference_ctx == NULL) {
2558 splitmux->reference_ctx = ctx;
2559 ctx->is_reference = TRUE;
2562 res = gst_ghost_pad_new_from_template (gname, q_sink, templ);
2563 g_object_set_qdata ((GObject *) (res), PAD_CONTEXT, ctx);
2565 ctx->sink_pad_block_id =
2566 gst_pad_add_probe (q_sink,
2567 GST_PAD_PROBE_TYPE_DATA_DOWNSTREAM | GST_PAD_PROBE_TYPE_EVENT_FLUSH |
2568 GST_PAD_PROBE_TYPE_QUERY_DOWNSTREAM,
2569 (GstPadProbeCallback) handle_mq_input, ctx, NULL);
2571 GST_DEBUG_OBJECT (splitmux, "Request pad %" GST_PTR_FORMAT
2572 " feeds queue pad %" GST_PTR_FORMAT, res, q_sink);
2574 splitmux->contexts = g_list_append (splitmux->contexts, ctx);
2579 splitmux->have_video = TRUE;
2581 gst_pad_set_active (res, TRUE);
2582 gst_element_add_pad (element, res);
2584 GST_SPLITMUX_UNLOCK (splitmux);
2588 GST_SPLITMUX_UNLOCK (splitmux);
2591 gst_object_unref (q_sink);
2593 gst_object_unref (q_src);
2596 GST_DEBUG_OBJECT (splitmux, "video sink pad already requested");
2597 GST_SPLITMUX_UNLOCK (splitmux);
2602 gst_splitmux_sink_release_pad (GstElement * element, GstPad * pad)
2604 GstSplitMuxSink *splitmux = (GstSplitMuxSink *) element;
2605 GstPad *muxpad = NULL;
2607 (MqStreamCtx *) (g_object_get_qdata ((GObject *) (pad), PAD_CONTEXT));
2609 GST_SPLITMUX_LOCK (splitmux);
2611 if (splitmux->muxer == NULL)
2612 goto fail; /* Elements don't exist yet - nothing to release */
2614 GST_INFO_OBJECT (pad, "releasing request pad");
2616 muxpad = gst_pad_get_peer (ctx->srcpad);
2618 /* Remove the context from our consideration */
2619 splitmux->contexts = g_list_remove (splitmux->contexts, ctx);
2621 if (ctx->sink_pad_block_id)
2622 gst_pad_remove_probe (ctx->sinkpad, ctx->sink_pad_block_id);
2624 if (ctx->src_pad_block_id)
2625 gst_pad_remove_probe (ctx->srcpad, ctx->src_pad_block_id);
2627 /* Can release the context now */
2628 mq_stream_ctx_free (ctx);
2629 if (ctx == splitmux->reference_ctx)
2630 splitmux->reference_ctx = NULL;
2632 /* Release and free the muxer input */
2634 gst_element_release_request_pad (splitmux->muxer, muxpad);
2635 gst_object_unref (muxpad);
2638 if (GST_PAD_PAD_TEMPLATE (pad) &&
2639 g_str_equal (GST_PAD_TEMPLATE_NAME_TEMPLATE (GST_PAD_PAD_TEMPLATE
2641 splitmux->have_video = FALSE;
2643 gst_element_remove_pad (element, pad);
2645 /* Reset the internal elements only after all request pads are released */
2646 if (splitmux->contexts == NULL)
2647 gst_splitmux_reset (splitmux);
2650 GST_SPLITMUX_UNLOCK (splitmux);
2654 create_element (GstSplitMuxSink * splitmux,
2655 const gchar * factory, const gchar * name, gboolean locked)
2657 GstElement *ret = gst_element_factory_make (factory, name);
2659 g_warning ("Failed to create %s - splitmuxsink will not work", name);
2664 /* Ensure the sink starts in locked state and NULL - it will be changed
2665 * by the filename setting code */
2666 gst_element_set_locked_state (ret, TRUE);
2667 gst_element_set_state (ret, GST_STATE_NULL);
2670 if (!gst_bin_add (GST_BIN (splitmux), ret)) {
2671 g_warning ("Could not add %s element - splitmuxsink will not work", name);
2672 gst_object_unref (ret);
2680 create_muxer (GstSplitMuxSink * splitmux)
2682 /* Create internal elements */
2683 if (splitmux->muxer == NULL) {
2684 GstElement *provided_muxer = NULL;
2686 GST_OBJECT_LOCK (splitmux);
2687 if (splitmux->provided_muxer != NULL)
2688 provided_muxer = gst_object_ref (splitmux->provided_muxer);
2689 GST_OBJECT_UNLOCK (splitmux);
2691 if ((!splitmux->async_finalize && provided_muxer == NULL) ||
2692 (splitmux->async_finalize && splitmux->muxer_factory == NULL)) {
2693 if ((splitmux->muxer =
2694 create_element (splitmux, DEFAULT_MUXER, "muxer", FALSE)) == NULL)
2696 } else if (splitmux->async_finalize) {
2697 if ((splitmux->muxer =
2698 create_element (splitmux, splitmux->muxer_factory, "muxer",
2701 if (splitmux->muxer_properties)
2702 gst_structure_foreach (splitmux->muxer_properties,
2703 _set_property_from_structure, splitmux->muxer);
2705 /* Ensure it's not in locked state (we might be reusing an old element) */
2706 gst_element_set_locked_state (provided_muxer, FALSE);
2707 if (!gst_bin_add (GST_BIN (splitmux), provided_muxer)) {
2708 g_warning ("Could not add muxer element - splitmuxsink will not work");
2709 gst_object_unref (provided_muxer);
2713 splitmux->muxer = provided_muxer;
2714 gst_object_unref (provided_muxer);
2717 if (splitmux->use_robust_muxing) {
2718 update_muxer_properties (splitmux);
2728 find_sink (GstElement * e)
2730 GstElement *res = NULL;
2732 gboolean done = FALSE;
2733 GValue data = { 0, };
2735 if (!GST_IS_BIN (e))
2738 if (g_object_class_find_property (G_OBJECT_GET_CLASS (e), "location") != NULL)
2741 iter = gst_bin_iterate_sinks (GST_BIN (e));
2743 switch (gst_iterator_next (iter, &data)) {
2744 case GST_ITERATOR_OK:
2746 GstElement *child = g_value_get_object (&data);
2747 if (g_object_class_find_property (G_OBJECT_GET_CLASS (child),
2748 "location") != NULL) {
2752 g_value_reset (&data);
2755 case GST_ITERATOR_RESYNC:
2756 gst_iterator_resync (iter);
2758 case GST_ITERATOR_DONE:
2761 case GST_ITERATOR_ERROR:
2762 g_assert_not_reached ();
2766 g_value_unset (&data);
2767 gst_iterator_free (iter);
2773 create_sink (GstSplitMuxSink * splitmux)
2775 GstElement *provided_sink = NULL;
2777 if (splitmux->active_sink == NULL) {
2779 GST_OBJECT_LOCK (splitmux);
2780 if (splitmux->provided_sink != NULL)
2781 provided_sink = gst_object_ref (splitmux->provided_sink);
2782 GST_OBJECT_UNLOCK (splitmux);
2784 if ((!splitmux->async_finalize && provided_sink == NULL) ||
2785 (splitmux->async_finalize && splitmux->sink_factory == NULL)) {
2786 if ((splitmux->sink =
2787 create_element (splitmux, DEFAULT_SINK, "sink", TRUE)) == NULL)
2789 splitmux->active_sink = splitmux->sink;
2790 } else if (splitmux->async_finalize) {
2791 if ((splitmux->sink =
2792 create_element (splitmux, splitmux->sink_factory, "sink",
2795 if (splitmux->sink_properties)
2796 gst_structure_foreach (splitmux->sink_properties,
2797 _set_property_from_structure, splitmux->sink);
2798 splitmux->active_sink = splitmux->sink;
2800 /* Ensure the sink starts in locked state and NULL - it will be changed
2801 * by the filename setting code */
2802 gst_element_set_locked_state (provided_sink, TRUE);
2803 gst_element_set_state (provided_sink, GST_STATE_NULL);
2804 if (!gst_bin_add (GST_BIN (splitmux), provided_sink)) {
2805 g_warning ("Could not add sink elements - splitmuxsink will not work");
2806 gst_object_unref (provided_sink);
2810 splitmux->active_sink = provided_sink;
2812 /* The bin holds a ref now, we can drop our tmp ref */
2813 gst_object_unref (provided_sink);
2815 /* Find the sink element */
2816 splitmux->sink = find_sink (splitmux->active_sink);
2817 if (splitmux->sink == NULL) {
2819 ("Could not locate sink element in provided sink - splitmuxsink will not work");
2825 if (g_object_class_find_property (G_OBJECT_GET_CLASS (splitmux->sink),
2827 /* async child elements are causing state change races and weird
2828 * failures, so let's try and turn that off */
2829 g_object_set (splitmux->sink, "async", FALSE, NULL);
2833 if (!gst_element_link (splitmux->muxer, splitmux->active_sink)) {
2834 g_warning ("Failed to link muxer and sink- splitmuxsink will not work");
2845 #pragma GCC diagnostic ignored "-Wformat-nonliteral"
2848 set_next_filename (GstSplitMuxSink * splitmux, MqStreamCtx * ctx)
2850 gchar *fname = NULL;
2854 gst_splitmux_sink_ensure_max_files (splitmux);
2856 if (ctx->cur_out_buffer == NULL) {
2857 GST_WARNING_OBJECT (splitmux, "Starting next file without buffer");
2860 caps = gst_pad_get_current_caps (ctx->srcpad);
2861 sample = gst_sample_new (ctx->cur_out_buffer, caps, &ctx->out_segment, NULL);
2862 g_signal_emit (splitmux, signals[SIGNAL_FORMAT_LOCATION_FULL], 0,
2863 splitmux->fragment_id, sample, &fname);
2864 gst_sample_unref (sample);
2866 gst_caps_unref (caps);
2868 if (fname == NULL) {
2869 /* Fallback to the old signal if the new one returned nothing */
2870 g_signal_emit (splitmux, signals[SIGNAL_FORMAT_LOCATION], 0,
2871 splitmux->fragment_id, &fname);
2875 fname = splitmux->location ?
2876 g_strdup_printf (splitmux->location, splitmux->fragment_id) : NULL;
2879 GST_INFO_OBJECT (splitmux, "Setting file to %s", fname);
2880 g_object_set (splitmux->sink, "location", fname, NULL);
2883 splitmux->fragment_id++;
2888 do_async_start (GstSplitMuxSink * splitmux)
2890 GstMessage *message;
2892 if (!splitmux->need_async_start) {
2893 GST_INFO_OBJECT (splitmux, "no async_start needed");
2897 splitmux->async_pending = TRUE;
2899 GST_INFO_OBJECT (splitmux, "Sending async_start message");
2900 message = gst_message_new_async_start (GST_OBJECT_CAST (splitmux));
2901 GST_BIN_CLASS (parent_class)->handle_message (GST_BIN_CAST
2902 (splitmux), message);
2906 do_async_done (GstSplitMuxSink * splitmux)
2908 GstMessage *message;
2910 if (splitmux->async_pending) {
2911 GST_INFO_OBJECT (splitmux, "Sending async_done message");
2913 gst_message_new_async_done (GST_OBJECT_CAST (splitmux),
2914 GST_CLOCK_TIME_NONE);
2915 GST_BIN_CLASS (parent_class)->handle_message (GST_BIN_CAST
2916 (splitmux), message);
2918 splitmux->async_pending = FALSE;
2921 splitmux->need_async_start = FALSE;
2924 static GstStateChangeReturn
2925 gst_splitmux_sink_change_state (GstElement * element, GstStateChange transition)
2927 GstStateChangeReturn ret;
2928 GstSplitMuxSink *splitmux = (GstSplitMuxSink *) element;
2930 switch (transition) {
2931 case GST_STATE_CHANGE_NULL_TO_READY:{
2932 GST_SPLITMUX_LOCK (splitmux);
2933 if (!create_muxer (splitmux) || !create_sink (splitmux)) {
2934 ret = GST_STATE_CHANGE_FAILURE;
2935 GST_SPLITMUX_UNLOCK (splitmux);
2938 g_signal_emit (splitmux, signals[SIGNAL_MUXER_ADDED], 0, splitmux->muxer);
2939 g_signal_emit (splitmux, signals[SIGNAL_SINK_ADDED], 0, splitmux->sink);
2940 GST_SPLITMUX_UNLOCK (splitmux);
2941 splitmux->fragment_id = 0;
2944 case GST_STATE_CHANGE_READY_TO_PAUSED:{
2945 GST_SPLITMUX_LOCK (splitmux);
2946 /* Start by collecting one input on each pad */
2947 splitmux->input_state = SPLITMUX_INPUT_STATE_COLLECTING_GOP_START;
2948 splitmux->output_state = SPLITMUX_OUTPUT_STATE_START_NEXT_FILE;
2949 splitmux->max_in_running_time = GST_CLOCK_STIME_NONE;
2950 splitmux->gop_start_time = splitmux->fragment_start_time =
2951 GST_CLOCK_STIME_NONE;
2952 splitmux->muxed_out_bytes = 0;
2953 splitmux->ready_for_output = FALSE;
2954 GST_SPLITMUX_UNLOCK (splitmux);
2957 case GST_STATE_CHANGE_PAUSED_TO_READY:
2958 g_atomic_int_set (&(splitmux->split_now), FALSE);
2959 case GST_STATE_CHANGE_READY_TO_NULL:
2960 GST_SPLITMUX_LOCK (splitmux);
2961 splitmux->output_state = SPLITMUX_OUTPUT_STATE_STOPPED;
2962 splitmux->input_state = SPLITMUX_INPUT_STATE_STOPPED;
2963 /* Wake up any blocked threads */
2964 GST_LOG_OBJECT (splitmux,
2965 "State change -> NULL or READY. Waking threads");
2966 GST_SPLITMUX_BROADCAST_INPUT (splitmux);
2967 GST_SPLITMUX_BROADCAST_OUTPUT (splitmux);
2968 GST_SPLITMUX_UNLOCK (splitmux);
2974 ret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
2975 if (ret == GST_STATE_CHANGE_FAILURE)
2978 switch (transition) {
2979 case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
2980 splitmux->need_async_start = TRUE;
2982 case GST_STATE_CHANGE_READY_TO_PAUSED:{
2983 /* Change state async, because our child sink might not
2984 * be ready to do that for us yet if it's state is still locked */
2986 splitmux->need_async_start = TRUE;
2987 /* we want to go async to PAUSED until we managed to configure and add the
2989 GST_SPLITMUX_LOCK (splitmux);
2990 do_async_start (splitmux);
2991 GST_SPLITMUX_UNLOCK (splitmux);
2992 ret = GST_STATE_CHANGE_ASYNC;
2995 case GST_STATE_CHANGE_READY_TO_NULL:
2996 GST_SPLITMUX_LOCK (splitmux);
2997 splitmux->fragment_id = 0;
2998 /* Reset internal elements only if no pad contexts are using them */
2999 if (splitmux->contexts == NULL)
3000 gst_splitmux_reset (splitmux);
3001 do_async_done (splitmux);
3002 GST_SPLITMUX_UNLOCK (splitmux);
3009 if (transition == GST_STATE_CHANGE_NULL_TO_READY &&
3010 ret == GST_STATE_CHANGE_FAILURE) {
3011 /* Cleanup elements on failed transition out of NULL */
3012 gst_splitmux_reset (splitmux);
3013 GST_SPLITMUX_LOCK (splitmux);
3014 do_async_done (splitmux);
3015 GST_SPLITMUX_UNLOCK (splitmux);
3021 register_splitmuxsink (GstPlugin * plugin)
3023 GST_DEBUG_CATEGORY_INIT (splitmux_debug, "splitmuxsink", 0,
3024 "Split File Muxing Sink");
3026 return gst_element_register (plugin, "splitmuxsink", GST_RANK_NONE,
3027 GST_TYPE_SPLITMUX_SINK);
3031 gst_splitmux_sink_ensure_max_files (GstSplitMuxSink * splitmux)
3033 if (splitmux->max_files && splitmux->fragment_id >= splitmux->max_files) {
3034 splitmux->fragment_id = 0;
3039 split_now (GstSplitMuxSink * splitmux)
3041 g_atomic_int_set (&(splitmux->split_now), TRUE);