1 /* GStreamer Muxer bin that splits output stream by size/time
2 * Copyright (C) <2014> Jan Schmidt <jan@centricular.com>
4 * This library is free software; you can redistribute it and/or
5 * modify it under the terms of the GNU Library General Public
6 * License as published by the Free Software Foundation; either
7 * version 2 of the License, or (at your option) any later version.
9 * This library is distributed in the hope that it will be useful,
10 * but WITHOUT ANY WARRANTY; without even the implied warranty of
11 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
12 * Library General Public License for more details.
14 * You should have received a copy of the GNU Library General Public
15 * License along with this library; if not, write to the
16 * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
17 * Boston, MA 02110-1301, USA.
21 * SECTION:element-splitmuxsink
22 * @short_description: Muxer wrapper for splitting output stream by size or time
 * This element wraps a muxer and a sink, and starts a new file when the mux
 * contents are about to cross a threshold of maximum size or maximum time,
 * splitting at video keyframe boundaries. Exactly one input video stream
 * can be muxed, with as many accompanying audio and subtitle streams as
 * desired.
30 * By default, it uses mp4mux and filesink, but they can be changed via
31 * the 'muxer' and 'sink' properties.
33 * The minimum file size is 1 GOP, however - so limits may be overrun if the
34 * distance between any 2 keyframes is larger than the limits.
36 * If a video stream is available, the splitting process is driven by the video
37 * stream contents, and the video stream must contain closed GOPs for the output
38 * file parts to be played individually correctly. In the absence of a video
39 * stream, the first available stream is used as reference for synchronization.
41 * In the async-finalize mode, when the threshold is crossed, the old muxer
42 * and sink is disconnected from the pipeline and left to finish the file
43 * asynchronously, and a new muxer and sink is created to continue with the
44 * next fragment. For that reason, instead of muxer and sink objects, the
45 * muxer-factory and sink-factory properties are used to construct the new
46 * objects, together with muxer-properties and sink-properties.
49 * <title>Example pipelines</title>
51 * gst-launch-1.0 -e v4l2src num-buffers=500 ! video/x-raw,width=320,height=240 ! videoconvert ! queue ! timeoverlay ! x264enc key-int-max=10 ! h264parse ! splitmuxsink location=video%02d.mov max-size-time=10000000000 max-size-bytes=1000000
53 * Records a video stream captured from a v4l2 device and muxes it into
54 * ISO mp4 files, splitting as needed to limit size/duration to 10 seconds
55 * and 1MB maximum size.
58 * gst-launch-1.0 -e v4l2src num-buffers=500 ! video/x-raw,width=320,height=240 ! videoconvert ! queue ! timeoverlay ! x264enc key-int-max=10 ! h264parse ! splitmuxsink location=video%02d.mkv max-size-time=10000000000 muxer-factory=matroskamux muxer-properties="properties,streamable=true"
 * Records a video stream captured from a v4l2 device and muxes it into
61 * streamable Matroska files, splitting as needed to limit size/duration to 10
62 * seconds. Each file will finalize asynchronously.
71 #include <glib/gstdio.h>
72 #include <gst/video/video.h>
73 #include "gstsplitmuxsink.h"
75 GST_DEBUG_CATEGORY_STATIC (splitmux_debug);
76 #define GST_CAT_DEFAULT splitmux_debug
78 #define GST_SPLITMUX_LOCK(s) g_mutex_lock(&(s)->lock)
79 #define GST_SPLITMUX_UNLOCK(s) g_mutex_unlock(&(s)->lock)
80 #define GST_SPLITMUX_WAIT_INPUT(s) g_cond_wait (&(s)->input_cond, &(s)->lock)
81 #define GST_SPLITMUX_BROADCAST_INPUT(s) g_cond_broadcast (&(s)->input_cond)
83 #define GST_SPLITMUX_WAIT_OUTPUT(s) g_cond_wait (&(s)->output_cond, &(s)->lock)
84 #define GST_SPLITMUX_BROADCAST_OUTPUT(s) g_cond_broadcast (&(s)->output_cond)
86 static void split_now (GstSplitMuxSink * splitmux);
94 PROP_MAX_SIZE_TIMECODE,
95 PROP_SEND_KEYFRAME_REQUESTS,
98 PROP_USE_ROBUST_MUXING,
99 PROP_ALIGNMENT_THRESHOLD,
105 PROP_MUXER_PROPERTIES,
110 #define DEFAULT_MAX_SIZE_TIME 0
111 #define DEFAULT_MAX_SIZE_BYTES 0
112 #define DEFAULT_MAX_FILES 0
113 #define DEFAULT_MUXER_OVERHEAD 0.02
114 #define DEFAULT_SEND_KEYFRAME_REQUESTS FALSE
115 #define DEFAULT_ALIGNMENT_THRESHOLD 0
116 #define DEFAULT_MUXER "mp4mux"
117 #define DEFAULT_SINK "filesink"
118 #define DEFAULT_USE_ROBUST_MUXING FALSE
119 #define DEFAULT_RESET_MUXER TRUE
120 #define DEFAULT_ASYNC_FINALIZE FALSE
122 typedef struct _AsyncEosHelper
130 SIGNAL_FORMAT_LOCATION,
131 SIGNAL_FORMAT_LOCATION_FULL,
138 static guint signals[SIGNAL_LAST];
140 static GstStaticPadTemplate video_sink_template =
141 GST_STATIC_PAD_TEMPLATE ("video",
144 GST_STATIC_CAPS_ANY);
145 static GstStaticPadTemplate audio_sink_template =
146 GST_STATIC_PAD_TEMPLATE ("audio_%u",
149 GST_STATIC_CAPS_ANY);
150 static GstStaticPadTemplate subtitle_sink_template =
151 GST_STATIC_PAD_TEMPLATE ("subtitle_%u",
154 GST_STATIC_CAPS_ANY);
155 static GstStaticPadTemplate caption_sink_template =
156 GST_STATIC_PAD_TEMPLATE ("caption_%u",
159 GST_STATIC_CAPS_ANY);
161 static GQuark PAD_CONTEXT;
162 static GQuark EOS_FROM_US;
163 static GQuark RUNNING_TIME;
164 /* EOS_FROM_US is only valid in async-finalize mode. We need to know whether
165 * to forward an incoming EOS message, but we cannot rely on the state of the
166 * splitmux anymore, so we set this qdata on the sink instead.
 * The muxer and sink must be destroyed after both of these things have
 * finished:
169 * 1) The EOS message has been sent when the fragment is ending
170 * 2) The muxer has been unlinked and relinked
171 * Therefore, EOS_FROM_US can have these two values:
172 * 0: EOS was not requested from us. Forward the message. The muxer and the
173 * sink will be destroyed together with the rest of the bin.
174 * 1: EOS was requested from us, but the other of the two tasks hasn't
175 * finished. Set EOS_FROM_US to 2 and do your stuff.
176 * 2: EOS was requested from us and the other of the two tasks has finished.
177 * Now we can destroy the muxer and the sink.
183 PAD_CONTEXT = g_quark_from_static_string ("pad-context");
184 EOS_FROM_US = g_quark_from_static_string ("eos-from-us");
185 RUNNING_TIME = g_quark_from_static_string ("running-time");
188 #define gst_splitmux_sink_parent_class parent_class
189 G_DEFINE_TYPE_EXTENDED (GstSplitMuxSink, gst_splitmux_sink, GST_TYPE_BIN, 0,
192 static gboolean create_muxer (GstSplitMuxSink * splitmux);
193 static gboolean create_sink (GstSplitMuxSink * splitmux);
194 static void gst_splitmux_sink_set_property (GObject * object, guint prop_id,
195 const GValue * value, GParamSpec * pspec);
196 static void gst_splitmux_sink_get_property (GObject * object, guint prop_id,
197 GValue * value, GParamSpec * pspec);
198 static void gst_splitmux_sink_dispose (GObject * object);
199 static void gst_splitmux_sink_finalize (GObject * object);
201 static GstPad *gst_splitmux_sink_request_new_pad (GstElement * element,
202 GstPadTemplate * templ, const gchar * name, const GstCaps * caps);
203 static void gst_splitmux_sink_release_pad (GstElement * element, GstPad * pad);
205 static GstStateChangeReturn gst_splitmux_sink_change_state (GstElement *
206 element, GstStateChange transition);
208 static void bus_handler (GstBin * bin, GstMessage * msg);
209 static void set_next_filename (GstSplitMuxSink * splitmux, MqStreamCtx * ctx);
210 static void start_next_fragment (GstSplitMuxSink * splitmux, MqStreamCtx * ctx);
211 static void mq_stream_ctx_free (MqStreamCtx * ctx);
212 static void grow_blocked_queues (GstSplitMuxSink * splitmux);
214 static void gst_splitmux_sink_ensure_max_files (GstSplitMuxSink * splitmux);
215 static GstElement *create_element (GstSplitMuxSink * splitmux,
216 const gchar * factory, const gchar * name, gboolean locked);
218 static void do_async_done (GstSplitMuxSink * splitmux);
221 mq_stream_buf_new (void)
223 return g_slice_new0 (MqStreamBuf);
227 mq_stream_buf_free (MqStreamBuf * data)
229 g_slice_free (MqStreamBuf, data);
232 static SplitMuxOutputCommand *
233 out_cmd_buf_new (void)
235 return g_slice_new0 (SplitMuxOutputCommand);
239 out_cmd_buf_free (SplitMuxOutputCommand * data)
241 g_slice_free (SplitMuxOutputCommand, data);
245 gst_splitmux_sink_class_init (GstSplitMuxSinkClass * klass)
247 GObjectClass *gobject_class = (GObjectClass *) klass;
248 GstElementClass *gstelement_class = (GstElementClass *) klass;
249 GstBinClass *gstbin_class = (GstBinClass *) klass;
251 gobject_class->set_property = gst_splitmux_sink_set_property;
252 gobject_class->get_property = gst_splitmux_sink_get_property;
253 gobject_class->dispose = gst_splitmux_sink_dispose;
254 gobject_class->finalize = gst_splitmux_sink_finalize;
256 gst_element_class_set_static_metadata (gstelement_class,
257 "Split Muxing Bin", "Generic/Bin/Muxer",
258 "Convenience bin that muxes incoming streams into multiple time/size limited files",
259 "Jan Schmidt <jan@centricular.com>");
261 gst_element_class_add_static_pad_template (gstelement_class,
262 &video_sink_template);
263 gst_element_class_add_static_pad_template (gstelement_class,
264 &audio_sink_template);
265 gst_element_class_add_static_pad_template (gstelement_class,
266 &subtitle_sink_template);
267 gst_element_class_add_static_pad_template (gstelement_class,
268 &caption_sink_template);
270 gstelement_class->change_state =
271 GST_DEBUG_FUNCPTR (gst_splitmux_sink_change_state);
272 gstelement_class->request_new_pad =
273 GST_DEBUG_FUNCPTR (gst_splitmux_sink_request_new_pad);
274 gstelement_class->release_pad =
275 GST_DEBUG_FUNCPTR (gst_splitmux_sink_release_pad);
277 gstbin_class->handle_message = bus_handler;
279 g_object_class_install_property (gobject_class, PROP_LOCATION,
280 g_param_spec_string ("location", "File Output Pattern",
281 "Format string pattern for the location of the files to write (e.g. video%05d.mp4)",
282 NULL, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
283 g_object_class_install_property (gobject_class, PROP_MUXER_OVERHEAD,
284 g_param_spec_double ("mux-overhead", "Muxing Overhead",
285 "Extra size overhead of muxing (0.02 = 2%)", 0.0, 1.0,
286 DEFAULT_MUXER_OVERHEAD,
287 G_PARAM_READWRITE | GST_PARAM_CONTROLLABLE | G_PARAM_STATIC_STRINGS));
289 g_object_class_install_property (gobject_class, PROP_MAX_SIZE_TIME,
290 g_param_spec_uint64 ("max-size-time", "Max. size (ns)",
291 "Max. amount of time per file (in ns, 0=disable)", 0, G_MAXUINT64,
292 DEFAULT_MAX_SIZE_TIME, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
293 g_object_class_install_property (gobject_class, PROP_MAX_SIZE_BYTES,
294 g_param_spec_uint64 ("max-size-bytes", "Max. size bytes",
295 "Max. amount of data per file (in bytes, 0=disable)", 0, G_MAXUINT64,
296 DEFAULT_MAX_SIZE_BYTES, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
297 g_object_class_install_property (gobject_class, PROP_MAX_SIZE_TIMECODE,
298 g_param_spec_string ("max-size-timecode", "Maximum timecode difference",
299 "Maximum difference in timecode between first and last frame. "
300 "Separator is assumed to be \":\" everywhere (e.g. 01:00:00:00). "
301 "Will only be effective if a timecode track is present.",
302 NULL, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
303 g_object_class_install_property (gobject_class, PROP_SEND_KEYFRAME_REQUESTS,
304 g_param_spec_boolean ("send-keyframe-requests",
305 "Request keyframes at max-size-time",
306 "Request a keyframe every max-size-time ns to try splitting at that point. "
307 "Needs max-size-bytes to be 0 in order to be effective.",
308 DEFAULT_SEND_KEYFRAME_REQUESTS,
309 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
310 g_object_class_install_property (gobject_class, PROP_MAX_FILES,
311 g_param_spec_uint ("max-files", "Max files",
312 "Maximum number of files to keep on disk. Once the maximum is reached,"
313 "old files start to be deleted to make room for new ones.", 0,
314 G_MAXUINT, DEFAULT_MAX_FILES,
315 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
316 g_object_class_install_property (gobject_class, PROP_ALIGNMENT_THRESHOLD,
317 g_param_spec_uint64 ("alignment-threshold", "Alignment threshold (ns)",
318 "Allow non-reference streams to be that many ns before the reference"
320 0, G_MAXUINT64, DEFAULT_ALIGNMENT_THRESHOLD,
321 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
323 g_object_class_install_property (gobject_class, PROP_MUXER,
324 g_param_spec_object ("muxer", "Muxer",
325 "The muxer element to use (NULL = default mp4mux). "
326 "Valid only for async-finalize = FALSE",
327 GST_TYPE_ELEMENT, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
328 g_object_class_install_property (gobject_class, PROP_SINK,
329 g_param_spec_object ("sink", "Sink",
330 "The sink element (or element chain) to use (NULL = default filesink). "
331 "Valid only for async-finalize = FALSE",
332 GST_TYPE_ELEMENT, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
334 g_object_class_install_property (gobject_class, PROP_USE_ROBUST_MUXING,
335 g_param_spec_boolean ("use-robust-muxing",
336 "Support robust-muxing mode of some muxers",
337 "Check if muxers support robust muxing via the reserved-max-duration and "
338 "reserved-duration-remaining properties and use them if so. "
339 "(Only present on qtmux and mp4mux for now). splitmuxsink may then also "
340 " create new fragments if the reserved header space is about to overflow. "
341 "Note this does not set reserved-moov-update-period - apps should do that manually",
342 DEFAULT_USE_ROBUST_MUXING,
343 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
345 g_object_class_install_property (gobject_class, PROP_RESET_MUXER,
346 g_param_spec_boolean ("reset-muxer",
348 "Reset the muxer after each segment. Disabling this will not work for most muxers.",
349 DEFAULT_RESET_MUXER, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
351 g_object_class_install_property (gobject_class, PROP_ASYNC_FINALIZE,
352 g_param_spec_boolean ("async-finalize",
353 "Finalize fragments asynchronously",
354 "Finalize each fragment asynchronously and start a new one",
355 DEFAULT_ASYNC_FINALIZE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
356 g_object_class_install_property (gobject_class, PROP_MUXER_FACTORY,
357 g_param_spec_string ("muxer-factory", "Muxer factory",
358 "The muxer element factory to use (default = mp4mux). "
359 "Valid only for async-finalize = TRUE",
360 "mp4mux", G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
361 g_object_class_install_property (gobject_class, PROP_MUXER_PROPERTIES,
362 g_param_spec_boxed ("muxer-properties", "Muxer properties",
363 "The muxer element properties to use. "
364 "Example: {properties,boolean-prop=true,string-prop=\"hi\"}. "
365 "Valid only for async-finalize = TRUE",
366 GST_TYPE_STRUCTURE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
367 g_object_class_install_property (gobject_class, PROP_SINK_FACTORY,
368 g_param_spec_string ("sink-factory", "Sink factory",
369 "The sink element factory to use (default = filesink). "
370 "Valid only for async-finalize = TRUE",
371 "filesink", G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
372 g_object_class_install_property (gobject_class, PROP_SINK_PROPERTIES,
373 g_param_spec_boxed ("sink-properties", "Sink properties",
374 "The sink element properties to use. "
375 "Example: {properties,boolean-prop=true,string-prop=\"hi\"}. "
376 "Valid only for async-finalize = TRUE",
377 GST_TYPE_STRUCTURE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
380 * GstSplitMuxSink::format-location:
381 * @splitmux: the #GstSplitMuxSink
382 * @fragment_id: the sequence number of the file to be created
384 * Returns: the location to be used for the next output file
386 signals[SIGNAL_FORMAT_LOCATION] =
387 g_signal_new ("format-location", G_TYPE_FROM_CLASS (klass),
388 G_SIGNAL_RUN_LAST, 0, NULL, NULL, NULL, G_TYPE_STRING, 1, G_TYPE_UINT);
391 * GstSplitMuxSink::format-location-full:
392 * @splitmux: the #GstSplitMuxSink
393 * @fragment_id: the sequence number of the file to be created
394 * @first_sample: A #GstSample containing the first buffer
395 * from the reference stream in the new file
397 * Returns: the location to be used for the next output file
399 signals[SIGNAL_FORMAT_LOCATION_FULL] =
400 g_signal_new ("format-location-full", G_TYPE_FROM_CLASS (klass),
401 G_SIGNAL_RUN_LAST, 0, NULL, NULL, NULL, G_TYPE_STRING, 2, G_TYPE_UINT,
405 * GstSplitMuxSink::split-now:
406 * @splitmux: the #GstSplitMuxSink
408 * When called by the user, this action signal splits the video file (and begins a new one) immediately.
413 signals[SIGNAL_SPLIT_NOW] =
414 g_signal_new ("split-now", G_TYPE_FROM_CLASS (klass),
415 G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstSplitMuxSinkClass,
416 split_now), NULL, NULL, NULL, G_TYPE_NONE, 0);
419 * GstSplitMuxSink::muxer-added:
420 * @splitmux: the #GstSplitMuxSink
421 * @muxer: the newly added muxer element
425 signals[SIGNAL_MUXER_ADDED] =
426 g_signal_new ("muxer-added", G_TYPE_FROM_CLASS (klass),
427 G_SIGNAL_RUN_LAST, 0, NULL, NULL, NULL, G_TYPE_NONE, 1, GST_TYPE_ELEMENT);
430 * GstSplitMuxSink::sink-added:
431 * @splitmux: the #GstSplitMuxSink
432 * @sink: the newly added sink element
436 signals[SIGNAL_SINK_ADDED] =
437 g_signal_new ("sink-added", G_TYPE_FROM_CLASS (klass),
438 G_SIGNAL_RUN_LAST, 0, NULL, NULL, NULL, G_TYPE_NONE, 1, GST_TYPE_ELEMENT);
440 klass->split_now = split_now;
444 gst_splitmux_sink_init (GstSplitMuxSink * splitmux)
446 g_mutex_init (&splitmux->lock);
447 g_cond_init (&splitmux->input_cond);
448 g_cond_init (&splitmux->output_cond);
449 g_queue_init (&splitmux->out_cmd_q);
451 splitmux->mux_overhead = DEFAULT_MUXER_OVERHEAD;
452 splitmux->threshold_time = DEFAULT_MAX_SIZE_TIME;
453 splitmux->threshold_bytes = DEFAULT_MAX_SIZE_BYTES;
454 splitmux->max_files = DEFAULT_MAX_FILES;
455 splitmux->send_keyframe_requests = DEFAULT_SEND_KEYFRAME_REQUESTS;
456 splitmux->next_max_tc_time = GST_CLOCK_TIME_NONE;
457 splitmux->alignment_threshold = DEFAULT_ALIGNMENT_THRESHOLD;
458 splitmux->use_robust_muxing = DEFAULT_USE_ROBUST_MUXING;
459 splitmux->reset_muxer = DEFAULT_RESET_MUXER;
461 splitmux->threshold_timecode_str = NULL;
463 splitmux->async_finalize = DEFAULT_ASYNC_FINALIZE;
464 splitmux->muxer_factory = g_strdup (DEFAULT_MUXER);
465 splitmux->muxer_properties = NULL;
466 splitmux->sink_factory = g_strdup (DEFAULT_SINK);
467 splitmux->sink_properties = NULL;
469 GST_OBJECT_FLAG_SET (splitmux, GST_ELEMENT_FLAG_SINK);
470 splitmux->split_now = FALSE;
474 gst_splitmux_reset (GstSplitMuxSink * splitmux)
476 if (splitmux->muxer) {
477 gst_element_set_locked_state (splitmux->muxer, TRUE);
478 gst_element_set_state (splitmux->muxer, GST_STATE_NULL);
479 gst_bin_remove (GST_BIN (splitmux), splitmux->muxer);
481 if (splitmux->active_sink) {
482 gst_element_set_locked_state (splitmux->active_sink, TRUE);
483 gst_element_set_state (splitmux->active_sink, GST_STATE_NULL);
484 gst_bin_remove (GST_BIN (splitmux), splitmux->active_sink);
487 splitmux->sink = splitmux->active_sink = splitmux->muxer = NULL;
491 gst_splitmux_sink_dispose (GObject * object)
493 GstSplitMuxSink *splitmux = GST_SPLITMUX_SINK (object);
495 /* Calling parent dispose invalidates all child pointers */
496 splitmux->sink = splitmux->active_sink = splitmux->muxer = NULL;
498 G_OBJECT_CLASS (parent_class)->dispose (object);
502 gst_splitmux_sink_finalize (GObject * object)
504 GstSplitMuxSink *splitmux = GST_SPLITMUX_SINK (object);
505 g_cond_clear (&splitmux->input_cond);
506 g_cond_clear (&splitmux->output_cond);
507 g_mutex_clear (&splitmux->lock);
508 g_queue_foreach (&splitmux->out_cmd_q, (GFunc) out_cmd_buf_free, NULL);
509 g_queue_clear (&splitmux->out_cmd_q);
511 if (splitmux->provided_sink)
512 gst_object_unref (splitmux->provided_sink);
513 if (splitmux->provided_muxer)
514 gst_object_unref (splitmux->provided_muxer);
516 if (splitmux->muxer_factory)
517 g_free (splitmux->muxer_factory);
518 if (splitmux->muxer_properties)
519 gst_structure_free (splitmux->muxer_properties);
520 if (splitmux->sink_factory)
521 g_free (splitmux->sink_factory);
522 if (splitmux->sink_properties)
523 gst_structure_free (splitmux->sink_properties);
525 if (splitmux->threshold_timecode_str)
526 g_free (splitmux->threshold_timecode_str);
528 g_free (splitmux->location);
530 /* Make sure to free any un-released contexts. There should not be any,
531 * because the dispose will have freed all request pads though */
532 g_list_foreach (splitmux->contexts, (GFunc) mq_stream_ctx_free, NULL);
533 g_list_free (splitmux->contexts);
535 G_OBJECT_CLASS (parent_class)->finalize (object);
539 * Set any time threshold to the muxer, if it has
540 * reserved-max-duration and reserved-duration-remaining
541 * properties. Called when creating/claiming the muxer
542 * in create_elements() */
544 update_muxer_properties (GstSplitMuxSink * sink)
547 GstClockTime threshold_time;
549 sink->muxer_has_reserved_props = FALSE;
550 if (sink->muxer == NULL)
552 klass = G_OBJECT_GET_CLASS (sink->muxer);
553 if (g_object_class_find_property (klass, "reserved-max-duration") == NULL)
555 if (g_object_class_find_property (klass,
556 "reserved-duration-remaining") == NULL)
558 sink->muxer_has_reserved_props = TRUE;
560 GST_LOG_OBJECT (sink, "Setting muxer reserved time to %" GST_TIME_FORMAT,
561 GST_TIME_ARGS (sink->threshold_time));
562 GST_OBJECT_LOCK (sink);
563 threshold_time = sink->threshold_time;
564 GST_OBJECT_UNLOCK (sink);
566 if (threshold_time > 0) {
567 /* Tell the muxer how much space to reserve */
568 GstClockTime muxer_threshold = threshold_time;
569 g_object_set (sink->muxer, "reserved-max-duration", muxer_threshold, NULL);
574 gst_splitmux_sink_set_property (GObject * object, guint prop_id,
575 const GValue * value, GParamSpec * pspec)
577 GstSplitMuxSink *splitmux = GST_SPLITMUX_SINK (object);
581 GST_OBJECT_LOCK (splitmux);
582 g_free (splitmux->location);
583 splitmux->location = g_value_dup_string (value);
584 GST_OBJECT_UNLOCK (splitmux);
587 case PROP_MAX_SIZE_BYTES:
588 GST_OBJECT_LOCK (splitmux);
589 splitmux->threshold_bytes = g_value_get_uint64 (value);
590 GST_OBJECT_UNLOCK (splitmux);
592 case PROP_MAX_SIZE_TIME:
593 GST_OBJECT_LOCK (splitmux);
594 splitmux->threshold_time = g_value_get_uint64 (value);
595 GST_OBJECT_UNLOCK (splitmux);
597 case PROP_MAX_SIZE_TIMECODE:
598 GST_OBJECT_LOCK (splitmux);
599 splitmux->threshold_timecode_str = g_value_dup_string (value);
600 GST_OBJECT_UNLOCK (splitmux);
602 case PROP_SEND_KEYFRAME_REQUESTS:
603 GST_OBJECT_LOCK (splitmux);
604 splitmux->send_keyframe_requests = g_value_get_boolean (value);
605 GST_OBJECT_UNLOCK (splitmux);
608 GST_OBJECT_LOCK (splitmux);
609 splitmux->max_files = g_value_get_uint (value);
610 GST_OBJECT_UNLOCK (splitmux);
612 case PROP_MUXER_OVERHEAD:
613 GST_OBJECT_LOCK (splitmux);
614 splitmux->mux_overhead = g_value_get_double (value);
615 GST_OBJECT_UNLOCK (splitmux);
617 case PROP_USE_ROBUST_MUXING:
618 GST_OBJECT_LOCK (splitmux);
619 splitmux->use_robust_muxing = g_value_get_boolean (value);
620 GST_OBJECT_UNLOCK (splitmux);
621 if (splitmux->use_robust_muxing)
622 update_muxer_properties (splitmux);
624 case PROP_ALIGNMENT_THRESHOLD:
625 GST_OBJECT_LOCK (splitmux);
626 splitmux->alignment_threshold = g_value_get_uint64 (value);
627 GST_OBJECT_UNLOCK (splitmux);
630 GST_OBJECT_LOCK (splitmux);
631 if (splitmux->provided_sink)
632 gst_object_unref (splitmux->provided_sink);
633 splitmux->provided_sink = g_value_get_object (value);
634 gst_object_ref_sink (splitmux->provided_sink);
635 GST_OBJECT_UNLOCK (splitmux);
638 GST_OBJECT_LOCK (splitmux);
639 if (splitmux->provided_muxer)
640 gst_object_unref (splitmux->provided_muxer);
641 splitmux->provided_muxer = g_value_get_object (value);
642 gst_object_ref_sink (splitmux->provided_muxer);
643 GST_OBJECT_UNLOCK (splitmux);
645 case PROP_RESET_MUXER:
646 GST_OBJECT_LOCK (splitmux);
647 splitmux->reset_muxer = g_value_get_boolean (value);
648 GST_OBJECT_UNLOCK (splitmux);
650 case PROP_ASYNC_FINALIZE:
651 GST_OBJECT_LOCK (splitmux);
652 splitmux->async_finalize = g_value_get_boolean (value);
653 GST_OBJECT_UNLOCK (splitmux);
655 case PROP_MUXER_FACTORY:
656 GST_OBJECT_LOCK (splitmux);
657 if (splitmux->muxer_factory)
658 g_free (splitmux->muxer_factory);
659 splitmux->muxer_factory = g_value_dup_string (value);
660 GST_OBJECT_UNLOCK (splitmux);
662 case PROP_MUXER_PROPERTIES:
663 GST_OBJECT_LOCK (splitmux);
664 if (splitmux->muxer_properties)
665 gst_structure_free (splitmux->muxer_properties);
666 if (gst_value_get_structure (value))
667 splitmux->muxer_properties =
668 gst_structure_copy (gst_value_get_structure (value));
670 splitmux->muxer_properties = NULL;
671 GST_OBJECT_UNLOCK (splitmux);
673 case PROP_SINK_FACTORY:
674 GST_OBJECT_LOCK (splitmux);
675 if (splitmux->sink_factory)
676 g_free (splitmux->sink_factory);
677 splitmux->sink_factory = g_value_dup_string (value);
678 GST_OBJECT_UNLOCK (splitmux);
680 case PROP_SINK_PROPERTIES:
681 GST_OBJECT_LOCK (splitmux);
682 if (splitmux->sink_properties)
683 gst_structure_free (splitmux->sink_properties);
684 if (gst_value_get_structure (value))
685 splitmux->sink_properties =
686 gst_structure_copy (gst_value_get_structure (value));
688 splitmux->sink_properties = NULL;
689 GST_OBJECT_UNLOCK (splitmux);
692 G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
698 gst_splitmux_sink_get_property (GObject * object, guint prop_id,
699 GValue * value, GParamSpec * pspec)
701 GstSplitMuxSink *splitmux = GST_SPLITMUX_SINK (object);
705 GST_OBJECT_LOCK (splitmux);
706 g_value_set_string (value, splitmux->location);
707 GST_OBJECT_UNLOCK (splitmux);
709 case PROP_MAX_SIZE_BYTES:
710 GST_OBJECT_LOCK (splitmux);
711 g_value_set_uint64 (value, splitmux->threshold_bytes);
712 GST_OBJECT_UNLOCK (splitmux);
714 case PROP_MAX_SIZE_TIME:
715 GST_OBJECT_LOCK (splitmux);
716 g_value_set_uint64 (value, splitmux->threshold_time);
717 GST_OBJECT_UNLOCK (splitmux);
719 case PROP_MAX_SIZE_TIMECODE:
720 GST_OBJECT_LOCK (splitmux);
721 g_value_set_string (value, splitmux->threshold_timecode_str);
722 GST_OBJECT_UNLOCK (splitmux);
724 case PROP_SEND_KEYFRAME_REQUESTS:
725 GST_OBJECT_LOCK (splitmux);
726 g_value_set_boolean (value, splitmux->send_keyframe_requests);
727 GST_OBJECT_UNLOCK (splitmux);
730 GST_OBJECT_LOCK (splitmux);
731 g_value_set_uint (value, splitmux->max_files);
732 GST_OBJECT_UNLOCK (splitmux);
734 case PROP_MUXER_OVERHEAD:
735 GST_OBJECT_LOCK (splitmux);
736 g_value_set_double (value, splitmux->mux_overhead);
737 GST_OBJECT_UNLOCK (splitmux);
739 case PROP_USE_ROBUST_MUXING:
740 GST_OBJECT_LOCK (splitmux);
741 g_value_set_boolean (value, splitmux->use_robust_muxing);
742 GST_OBJECT_UNLOCK (splitmux);
744 case PROP_ALIGNMENT_THRESHOLD:
745 GST_OBJECT_LOCK (splitmux);
746 g_value_set_uint64 (value, splitmux->alignment_threshold);
747 GST_OBJECT_UNLOCK (splitmux);
750 GST_OBJECT_LOCK (splitmux);
751 g_value_set_object (value, splitmux->provided_sink);
752 GST_OBJECT_UNLOCK (splitmux);
755 GST_OBJECT_LOCK (splitmux);
756 g_value_set_object (value, splitmux->provided_muxer);
757 GST_OBJECT_UNLOCK (splitmux);
759 case PROP_RESET_MUXER:
760 GST_OBJECT_LOCK (splitmux);
761 g_value_set_boolean (value, splitmux->reset_muxer);
762 GST_OBJECT_UNLOCK (splitmux);
764 case PROP_ASYNC_FINALIZE:
765 GST_OBJECT_LOCK (splitmux);
766 g_value_set_boolean (value, splitmux->async_finalize);
767 GST_OBJECT_UNLOCK (splitmux);
769 case PROP_MUXER_FACTORY:
770 GST_OBJECT_LOCK (splitmux);
771 g_value_set_string (value, splitmux->muxer_factory);
772 GST_OBJECT_UNLOCK (splitmux);
774 case PROP_MUXER_PROPERTIES:
775 GST_OBJECT_LOCK (splitmux);
776 gst_value_set_structure (value, splitmux->muxer_properties);
777 GST_OBJECT_UNLOCK (splitmux);
779 case PROP_SINK_FACTORY:
780 GST_OBJECT_LOCK (splitmux);
781 g_value_set_string (value, splitmux->sink_factory);
782 GST_OBJECT_UNLOCK (splitmux);
784 case PROP_SINK_PROPERTIES:
785 GST_OBJECT_LOCK (splitmux);
786 gst_value_set_structure (value, splitmux->sink_properties);
787 GST_OBJECT_UNLOCK (splitmux);
790 G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
795 /* Convenience function */
796 static inline GstClockTimeDiff
797 my_segment_to_running_time (GstSegment * segment, GstClockTime val)
799 GstClockTimeDiff res = GST_CLOCK_STIME_NONE;
801 if (GST_CLOCK_TIME_IS_VALID (val)) {
803 gst_segment_to_running_time_full (segment, GST_FORMAT_TIME, val, &val);
813 mq_stream_ctx_new (GstSplitMuxSink * splitmux)
817 ctx = g_new0 (MqStreamCtx, 1);
818 ctx->splitmux = splitmux;
819 gst_segment_init (&ctx->in_segment, GST_FORMAT_UNDEFINED);
820 gst_segment_init (&ctx->out_segment, GST_FORMAT_UNDEFINED);
821 ctx->in_running_time = ctx->out_running_time = GST_CLOCK_STIME_NONE;
822 g_queue_init (&ctx->queued_bufs);
827 mq_stream_ctx_free (MqStreamCtx * ctx)
830 GstObject *parent = gst_object_get_parent (GST_OBJECT (ctx->q));
832 g_signal_handler_disconnect (ctx->q, ctx->q_overrun_id);
834 if (parent == GST_OBJECT_CAST (ctx->splitmux)) {
835 gst_element_set_locked_state (ctx->q, TRUE);
836 gst_element_set_state (ctx->q, GST_STATE_NULL);
837 gst_bin_remove (GST_BIN (ctx->splitmux), ctx->q);
838 gst_object_unref (parent);
840 gst_object_unref (ctx->q);
842 gst_buffer_replace (&ctx->prev_in_keyframe, NULL);
843 gst_object_unref (ctx->sinkpad);
844 gst_object_unref (ctx->srcpad);
845 g_queue_foreach (&ctx->queued_bufs, (GFunc) mq_stream_buf_free, NULL);
846 g_queue_clear (&ctx->queued_bufs);
851 send_fragment_opened_closed_msg (GstSplitMuxSink * splitmux, gboolean opened,
854 gchar *location = NULL;
856 const gchar *msg_name = opened ?
857 "splitmuxsink-fragment-opened" : "splitmuxsink-fragment-closed";
858 GstClockTime running_time = splitmux->reference_ctx->out_running_time;
861 GstClockTime *rtime = g_object_get_qdata (G_OBJECT (sink), RUNNING_TIME);
863 running_time = *rtime;
866 g_object_get (sink, "location", &location, NULL);
868 /* If it's in the middle of a teardown, the reference_ctc might have become
870 if (splitmux->reference_ctx) {
871 msg = gst_message_new_element (GST_OBJECT (splitmux),
872 gst_structure_new (msg_name,
873 "location", G_TYPE_STRING, location,
874 "running-time", GST_TYPE_CLOCK_TIME, running_time, NULL));
875 gst_element_post_message (GST_ELEMENT_CAST (splitmux), msg);
882 send_eos_async (GstSplitMuxSink * splitmux, AsyncEosHelper * helper)
888 eos = gst_event_new_eos ();
892 GST_SPLITMUX_LOCK (splitmux);
894 pad = gst_pad_get_peer (ctx->srcpad);
895 GST_SPLITMUX_UNLOCK (splitmux);
897 gst_pad_send_event (pad, eos);
898 GST_INFO_OBJECT (splitmux, "Sent async EOS on %" GST_PTR_FORMAT, pad);
900 gst_object_unref (pad);
904 /* Called with lock held, drops the lock to send EOS to the
908 send_eos (GstSplitMuxSink * splitmux, MqStreamCtx * ctx)
913 eos = gst_event_new_eos ();
914 pad = gst_pad_get_peer (ctx->srcpad);
918 GST_INFO_OBJECT (splitmux, "Sending EOS on %" GST_PTR_FORMAT, pad);
919 GST_SPLITMUX_UNLOCK (splitmux);
920 gst_pad_send_event (pad, eos);
921 GST_SPLITMUX_LOCK (splitmux);
923 gst_object_unref (pad);
/* eos_context_async: allocates an AsyncEosHelper, stores the peer of
 * ctx->srcpad in it, marks the context as async-EOS-done and queues
 * send_eos_async() through gst_element_call_async().
 * NOTE(review): some lines are absent from this listing (e.g. the remaining
 * arguments of the debug statement and any other helper fields). */
926 /* Called with lock held. Schedules an EOS event to the ctx pad
927 * to happen in another thread */
929 eos_context_async (MqStreamCtx * ctx, GstSplitMuxSink * splitmux)
931 AsyncEosHelper *helper = g_new0 (AsyncEosHelper, 1);
932 GstPad *srcpad, *sinkpad;
934 srcpad = ctx->srcpad;
935 sinkpad = gst_pad_get_peer (srcpad);
938 helper->pad = sinkpad; /* Takes the reference */
/* Flag checked by all_contexts_are_async_eos(); it is set here, at
 * scheduling time, before the event has actually been sent. */
940 ctx->out_eos_async_done = TRUE;
941 /* HACK: Here, we explicitly unset the SINK flag on the target sink element
942 * that's about to be asynchronously disposed, so that it no longer
943 * participates in GstBin EOS logic. This fixes a race where if
944 * splitmuxsink really reaches EOS before an asynchronous background
945 * element has finished, then the bin won't actually send EOS to the
946 * pipeline. Even after finishing and removing the old element, the
947 * bin doesn't re-check EOS status on removing a SINK element. This
948 * should be fixed in core, making this hack unnecessary. */
949 GST_OBJECT_FLAG_UNSET (splitmux->active_sink, GST_ELEMENT_FLAG_SINK);
951 GST_DEBUG_OBJECT (splitmux, "scheduled EOS to pad %" GST_PTR_FORMAT " ctx %p",
954 g_assert_nonnull (helper->pad);
/* No destroy-notify is passed (last argument is NULL), so releasing the
 * helper is not handled by gst_element_call_async() itself. */
955 gst_element_call_async (GST_ELEMENT (splitmux),
956 (GstElementCallAsyncFunc) send_eos_async, helper, NULL);
959 /* Called with lock held. TRUE iff all contexts have a
960 * pending (or delivered) async eos event */
962 all_contexts_are_async_eos (GstSplitMuxSink * splitmux)
/* Walk splitmux->contexts and AND each context's out_eos_async_done flag
 * into ret.
 * NOTE(review): the declaration/initial value of ret and the return
 * statement are not present in this listing. */
967 for (item = splitmux->contexts; item; item = item->next) {
968 MqStreamCtx *ctx = item->data;
969 ret &= ctx->out_eos_async_done;
/* complete_or_wait_on_out: output-side state machine step for one context.
 * It compares ctx->out_running_time with the current output limit
 * (splitmux->max_out_running_time, reduced by alignment_threshold when
 * applicable) and otherwise acts on splitmux->output_state: requesting the
 * next command, sending EOS to end the file, starting the next fragment, or
 * waiting on the output condition.
 * NOTE(review): many control-flow lines (loop header, braces, break/return,
 * else) are absent from this listing; the notes below describe only the
 * statements that are visible. */
974 /* Called with splitmux lock held to check if this output
975 * context needs to sleep to wait for the release of the
976 * next GOP, or to send EOS to close out the current file
979 complete_or_wait_on_out (GstSplitMuxSink * splitmux, MqStreamCtx * ctx)
981 if (ctx->caps_change)
985 /* When first starting up, the reference stream has to output
986 * the first buffer to prepare the muxer and sink */
987 gboolean can_output = (ctx->is_reference || splitmux->ready_for_output);
988 GstClockTimeDiff my_max_out_running_time = splitmux->max_out_running_time;
/* Subtract alignment_threshold from the local limit, except when the limit
 * is 0 or NONE, the threshold is 0, or the limit is smaller than the
 * threshold. */
990 if (!(splitmux->max_out_running_time == 0 ||
991 splitmux->max_out_running_time == GST_CLOCK_STIME_NONE ||
992 splitmux->alignment_threshold == 0 ||
993 splitmux->max_out_running_time < splitmux->alignment_threshold)) {
994 my_max_out_running_time -= splitmux->alignment_threshold;
995 GST_LOG_OBJECT (ctx->srcpad,
996 "Max out running time currently %" GST_STIME_FORMAT
997 ", with threshold applied it is %" GST_STIME_FORMAT,
998 GST_STIME_ARGS (splitmux->max_out_running_time),
999 GST_STIME_ARGS (my_max_out_running_time));
1003 || splitmux->output_state == SPLITMUX_OUTPUT_STATE_STOPPED)
1006 GST_LOG_OBJECT (ctx->srcpad,
1007 "Checking running time %" GST_STIME_FORMAT " against max %"
1008 GST_STIME_FORMAT, GST_STIME_ARGS (ctx->out_running_time),
1009 GST_STIME_ARGS (my_max_out_running_time));
/* No limit set, or this context is still below the (adjusted) limit.
 * NOTE(review): the statement executed in that case is not visible here. */
1012 if (splitmux->max_out_running_time == GST_CLOCK_STIME_NONE ||
1013 ctx->out_running_time < my_max_out_running_time) {
/* Otherwise decide what to do from the shared output state. */
1017 switch (splitmux->output_state) {
1018 case SPLITMUX_OUTPUT_STATE_OUTPUT_GOP:
1019 /* We only get here if we've finished outputting a GOP and need to know
1020 * what to do next */
1021 splitmux->output_state = SPLITMUX_OUTPUT_STATE_AWAITING_COMMAND;
1022 GST_SPLITMUX_BROADCAST_OUTPUT (splitmux);
1025 case SPLITMUX_OUTPUT_STATE_ENDING_FILE:
1026 /* We've reached the max out running_time to get here, so end this file now */
1027 if (ctx->out_eos == FALSE) {
1028 if (splitmux->async_finalize) {
1029 /* We must set EOS asynchronously at this point. We cannot defer
1030 * it, because we need all contexts to wake up, for the
1031 * reference context to eventually give us something at
1032 * START_NEXT_FILE. Otherwise, collectpads might choose another
1033 * context to give us the first buffer, and format-location-full
1034 * will not contain a valid sample. */
/* Tag the sink with EOS_FROM_US = 1 before scheduling the async EOS. */
1035 g_object_set_qdata ((GObject *) splitmux->sink, EOS_FROM_US,
1036 GINT_TO_POINTER (1));
1037 eos_context_async (ctx, splitmux);
1038 if (all_contexts_are_async_eos (splitmux)) {
1039 GST_INFO_OBJECT (splitmux,
1040 "All contexts are async_eos. Moving to the next file.");
1041 /* We can start the next file once we've asked each pad to go EOS */
1042 splitmux->output_state = SPLITMUX_OUTPUT_STATE_START_NEXT_FILE;
1043 GST_SPLITMUX_BROADCAST_OUTPUT (splitmux);
1047 send_eos (splitmux, ctx);
1051 GST_INFO_OBJECT (splitmux,
1052 "At end-of-file state, but context %p is already EOS", ctx);
1055 case SPLITMUX_OUTPUT_STATE_START_NEXT_FILE:
1056 if (ctx->is_reference) {
1057 /* Special handling on the reference ctx to start new fragments
1058 * and collect commands from the command queue */
1059 /* drops the splitmux lock briefly: */
1060 /* We must have reference ctx in order for format-location-full to
1062 start_next_fragment (splitmux, ctx);
1066 case SPLITMUX_OUTPUT_STATE_AWAITING_COMMAND:{
1068 SplitMuxOutputCommand *cmd =
1069 g_queue_pop_tail (&splitmux->out_cmd_q);
1071 /* If we pop the last command, we need to make our queues bigger */
1072 if (g_queue_get_length (&splitmux->out_cmd_q) == 0)
1073 grow_blocked_queues (splitmux);
/* A start_new_fragment command switches the output to ENDING_FILE; the
 * other visible branch extends max_out_running_time to cmd->max_output_ts
 * and switches to OUTPUT_GOP.
 * NOTE(review): the else between the two branches and any NULL check on
 * cmd are not visible in this listing. */
1075 if (cmd->start_new_fragment) {
1076 GST_DEBUG_OBJECT (splitmux, "Got cmd to start new fragment");
1077 splitmux->output_state = SPLITMUX_OUTPUT_STATE_ENDING_FILE;
1079 GST_DEBUG_OBJECT (splitmux,
1080 "Got new output cmd for time %" GST_STIME_FORMAT,
1081 GST_STIME_ARGS (cmd->max_output_ts));
1083 /* Extend the output range immediately */
1084 splitmux->max_out_running_time = cmd->max_output_ts;
1085 splitmux->output_state = SPLITMUX_OUTPUT_STATE_OUTPUT_GOP;
1087 GST_SPLITMUX_BROADCAST_OUTPUT (splitmux);
1089 out_cmd_buf_free (cmd);
1092 GST_SPLITMUX_WAIT_OUTPUT (splitmux);
1094 } while (splitmux->output_state ==
1095 SPLITMUX_OUTPUT_STATE_AWAITING_COMMAND);
1096 /* loop and re-check the state */
1099 case SPLITMUX_OUTPUT_STATE_STOPPED:
/* Log, wait on the output condition, then log again once woken. */
1104 GST_INFO_OBJECT (ctx->srcpad,
1105 "Sleeping for running time %"
1106 GST_STIME_FORMAT " (max %" GST_STIME_FORMAT ") or state change.",
1107 GST_STIME_ARGS (ctx->out_running_time),
1108 GST_STIME_ARGS (splitmux->max_out_running_time));
1109 GST_SPLITMUX_WAIT_OUTPUT (splitmux);
1110 GST_INFO_OBJECT (ctx->srcpad,
1111 "Woken for new max running time %" GST_STIME_FORMAT,
1112 GST_STIME_ARGS (splitmux->max_out_running_time));
/* calculate_next_max_timecode: computes the time at which the next
 * timecode-based split should happen.
 * The interval parsed from splitmux->threshold_timecode_str is added to
 * cur_tc; both timecodes are converted to nanoseconds since the daily jam,
 * and their distance (allowing for a wrap past 24h) is added to
 * splitmux->fragment_start_time.
 * Returns GST_CLOCK_TIME_NONE when cur_tc or the threshold string is NULL,
 * otherwise the computed next_max_tc_time.
 * NOTE(review): the return type line, the assignment targets of a few
 * expressions and the else keyword are absent from this listing. No check
 * of the parsed interval for NULL is visible either - confirm whether the
 * string is validated elsewhere. */
1118 calculate_next_max_timecode (GstSplitMuxSink * splitmux,
1119 const GstVideoTimeCode * cur_tc)
1121 GstVideoTimeCode *target_tc;
1122 GstVideoTimeCodeInterval *tc_inter;
1123 GstClockTime cur_tc_time, target_tc_time, next_max_tc_time;
1125 if (cur_tc == NULL || splitmux->threshold_timecode_str == NULL)
1126 return GST_CLOCK_TIME_NONE;
1129 gst_video_time_code_interval_new_from_string
1130 (splitmux->threshold_timecode_str);
/* target_tc = cur_tc + interval; the interval is freed right after use. */
1131 target_tc = gst_video_time_code_add_interval (cur_tc, tc_inter);
1132 gst_video_time_code_interval_free (tc_inter);
1135 target_tc_time = gst_video_time_code_nsec_since_daily_jam (target_tc);
1136 cur_tc_time = gst_video_time_code_nsec_since_daily_jam (cur_tc);
1138 /* Add fragment_start_time, accounting for wraparound */
1139 if (target_tc_time >= cur_tc_time) {
1141 target_tc_time - cur_tc_time + splitmux->fragment_start_time;
/* Target is numerically before the current timecode: treat it as having
 * wrapped and add one full day. */
1143 GstClockTime day_in_ns = 24 * 60 * 60 * GST_SECOND;
1146 day_in_ns - cur_tc_time + target_tc_time +
1147 splitmux->fragment_start_time;
1149 GST_INFO_OBJECT (splitmux, "Next max TC time: %" GST_TIME_FORMAT
1150 " from ref TC: %" GST_TIME_FORMAT, GST_TIME_ARGS (next_max_tc_time),
1151 GST_TIME_ARGS (cur_tc_time));
1152 gst_video_time_code_free (target_tc);
1154 return next_max_tc_time;
/* request_next_keyframe: recomputes splitmux->next_max_tc_time from the
 * timecode meta of buffer (when a timecode threshold is configured) and
 * pushes an upstream force-key-unit event on the reference context's sink
 * pad, targeted at the next expected split point.
 * Returns the result of gst_pad_push_event() when an event is sent.
 * NOTE(review): the value returned on the early-exit condition
 * (send_keyframe_requests off, no time/timecode threshold, or a byte
 * threshold set) is not visible in this listing, nor is the NULL check on
 * tc_meta that would be expected before it is dereferenced. */
1158 request_next_keyframe (GstSplitMuxSink * splitmux, GstBuffer * buffer)
1161 GstClockTime target_time;
1162 gboolean timecode_based = FALSE;
/* Reset first, so a stale value never survives when no timecode is found. */
1164 splitmux->next_max_tc_time = GST_CLOCK_TIME_NONE;
1165 if (splitmux->threshold_timecode_str) {
1166 GstVideoTimeCodeMeta *tc_meta;
1168 if (buffer != NULL) {
1169 tc_meta = gst_buffer_get_video_time_code_meta (buffer);
1171 splitmux->next_max_tc_time =
1172 calculate_next_max_timecode (splitmux, &tc_meta->tc);
1173 timecode_based = (splitmux->next_max_tc_time != GST_CLOCK_TIME_NONE);
1176 /* This can happen in the presence of GAP events that trigger
1177 * a new fragment start */
1178 GST_WARNING_OBJECT (splitmux,
1179 "No buffer available to calculate next timecode");
1183 if (splitmux->send_keyframe_requests == FALSE
1184 || (splitmux->threshold_time == 0 && !timecode_based)
1185 || splitmux->threshold_bytes != 0)
1188 if (timecode_based) {
1189 /* We might have rounding errors: aim slightly earlier */
1190 target_time = splitmux->next_max_tc_time - 5 * GST_USECOND;
/* Time-based target: start of the current fragment plus threshold_time. */
1192 target_time = splitmux->fragment_start_time + splitmux->threshold_time;
1194 ev = gst_video_event_new_upstream_force_key_unit (target_time, TRUE, 0);
1195 GST_INFO_OBJECT (splitmux, "Requesting next keyframe at %" GST_TIME_FORMAT,
1196 GST_TIME_ARGS (target_time));
1197 return gst_pad_push_event (splitmux->reference_ctx->sinkpad, ev);
/* handle_mq_output: pad probe callback for one stream context (presumably
 * installed on the multiqueue output pad, judging by the name - confirm at
 * the gst_pad_add_probe() call site).
 * Buffer lists are rejected with a warning and dropped. Downstream/flush
 * events update per-context state (segment, flushing, EOS, gap handling,
 * caps changes) and may block in complete_or_wait_on_out(). Buffers pop
 * their bookkeeping entry from ctx->queued_bufs, update
 * ctx->out_running_time, wait until output is allowed and are then passed.
 * Returns GST_PAD_PROBE_PASS, GST_PAD_PROBE_DROP or GST_PAD_PROBE_HANDLED.
 * NOTE(review): many lines (break/goto statements, labels, the EOS case
 * label, braces) are absent from this listing; inline notes describe only
 * the statements that are visible. */
1200 static GstPadProbeReturn
1201 handle_mq_output (GstPad * pad, GstPadProbeInfo * info, MqStreamCtx * ctx)
1203 GstSplitMuxSink *splitmux = ctx->splitmux;
1204 MqStreamBuf *buf_info = NULL;
1206 GST_LOG_OBJECT (pad, "Fired probe type 0x%x", info->type);
1208 /* FIXME: Handle buffer lists, until then make it clear they won't work */
1209 if (info->type & GST_PAD_PROBE_TYPE_BUFFER_LIST) {
1210 g_warning ("Buffer list handling not implemented");
1211 return GST_PAD_PROBE_DROP;
/* Event path: downstream and flush events. */
1213 if (info->type & GST_PAD_PROBE_TYPE_EVENT_DOWNSTREAM ||
1214 info->type & GST_PAD_PROBE_TYPE_EVENT_FLUSH) {
1215 GstEvent *event = gst_pad_probe_info_get_event (info);
1216 gboolean locked = FALSE;
1218 GST_LOG_OBJECT (pad, "Event %" GST_PTR_FORMAT, event);
1220 switch (GST_EVENT_TYPE (event)) {
1221 case GST_EVENT_SEGMENT:
1222 gst_event_copy_segment (event, &ctx->out_segment);
1224 case GST_EVENT_FLUSH_STOP:
1225 GST_SPLITMUX_LOCK (splitmux);
/* Flush stop: reset the output segment and discard all queued buffer
 * bookkeeping entries for this context. */
1227 gst_segment_init (&ctx->out_segment, GST_FORMAT_UNDEFINED);
1228 g_queue_foreach (&ctx->queued_bufs, (GFunc) mq_stream_buf_free, NULL);
1229 g_queue_clear (&ctx->queued_bufs);
1230 ctx->flushing = FALSE;
1232 case GST_EVENT_FLUSH_START:
1233 GST_SPLITMUX_LOCK (splitmux);
1235 GST_LOG_OBJECT (pad, "Flush start");
1236 ctx->flushing = TRUE;
1237 GST_SPLITMUX_BROADCAST_OUTPUT (splitmux);
/* NOTE(review): judging by the log text below, the following statements
 * handle the EOS event; the case label itself is not visible here. */
1240 GST_SPLITMUX_LOCK (splitmux);
1242 if (splitmux->output_state == SPLITMUX_OUTPUT_STATE_STOPPED)
1244 ctx->out_eos = TRUE;
1245 GST_INFO_OBJECT (splitmux,
1246 "Have EOS event at pad %" GST_PTR_FORMAT " ctx %p", pad, ctx);
1248 case GST_EVENT_GAP:{
1249 GstClockTime gap_ts;
1250 GstClockTimeDiff rtime;
1252 gst_event_parse_gap (event, &gap_ts, NULL);
1253 if (gap_ts == GST_CLOCK_TIME_NONE)
1256 GST_SPLITMUX_LOCK (splitmux);
1259 if (splitmux->output_state == SPLITMUX_OUTPUT_STATE_STOPPED)
1262 /* When we get a gap event on the
1263 * reference stream and we're trying to open a
1264 * new file, we need to store it until we get
1265 * the buffer afterwards
1267 if (ctx->is_reference &&
1268 (splitmux->output_state != SPLITMUX_OUTPUT_STATE_OUTPUT_GOP)) {
1269 GST_DEBUG_OBJECT (pad, "Storing GAP event until buffer arrives");
1270 gst_event_replace (&ctx->pending_gap, event);
1271 GST_SPLITMUX_UNLOCK (splitmux);
1272 return GST_PAD_PROBE_HANDLED;
1275 rtime = my_segment_to_running_time (&ctx->out_segment, gap_ts);
1277 GST_LOG_OBJECT (pad, "Have GAP w/ ts %" GST_STIME_FORMAT,
1278 GST_STIME_ARGS (rtime));
1280 if (rtime != GST_CLOCK_STIME_NONE) {
1281 ctx->out_running_time = rtime;
1282 complete_or_wait_on_out (splitmux, ctx);
1286 case GST_EVENT_CUSTOM_DOWNSTREAM:{
1287 const GstStructure *s;
1288 GstClockTimeDiff ts = 0;
1290 s = gst_event_get_structure (event);
1291 if (!gst_structure_has_name (s, "splitmuxsink-unblock"))
1294 gst_structure_get_int64 (s, "timestamp", &ts);
1296 GST_SPLITMUX_LOCK (splitmux);
1299 if (splitmux->output_state == SPLITMUX_OUTPUT_STATE_STOPPED)
1301 ctx->out_running_time = ts;
1302 if (!ctx->is_reference)
1303 complete_or_wait_on_out (splitmux, ctx);
1304 GST_SPLITMUX_UNLOCK (splitmux);
1305 return GST_PAD_PROBE_DROP;
1307 case GST_EVENT_CAPS:{
1310 if (!ctx->is_reference)
1313 peer = gst_pad_get_peer (pad);
1315 gboolean ok = gst_pad_send_event (peer, gst_event_ref (event));
1317 gst_object_unref (peer);
1325 /* This is in the case the muxer doesn't allow this change of caps */
1326 GST_SPLITMUX_LOCK (splitmux);
/* Remember that a caps change is pending for this context; the flag is
 * cleared again on the buffer path further down. */
1328 ctx->caps_change = TRUE;
1330 if (splitmux->output_state != SPLITMUX_OUTPUT_STATE_START_NEXT_FILE) {
1331 GST_DEBUG_OBJECT (splitmux,
1332 "New caps were not accepted. Switching output file");
1333 if (ctx->out_eos == FALSE) {
1334 splitmux->output_state = SPLITMUX_OUTPUT_STATE_ENDING_FILE;
1335 GST_SPLITMUX_BROADCAST_OUTPUT (splitmux);
1339 /* Lets it fall through, if it fails again, then the muxer just can't
1340 * support this format, but at least we have a closed file.
1348 /* We need to make sure events aren't passed
1349 * until the muxer / sink are ready for it */
1351 GST_SPLITMUX_LOCK (splitmux);
1352 if (!ctx->is_reference)
1353 complete_or_wait_on_out (splitmux, ctx);
1354 GST_SPLITMUX_UNLOCK (splitmux);
1356 /* Don't try to forward sticky events before the next buffer is there
1357 * because it would cause a new file to be created without the first
1358 * buffer being available.
1360 if (ctx->caps_change && GST_EVENT_IS_STICKY (event)) {
1361 gst_event_unref (event);
1362 return GST_PAD_PROBE_HANDLED;
1364 return GST_PAD_PROBE_PASS;
1367 /* Allow everything through until the configured next stopping point */
1368 GST_SPLITMUX_LOCK (splitmux);
/* Buffer path: take the oldest bookkeeping entry for this context. */
1370 buf_info = g_queue_pop_tail (&ctx->queued_bufs);
1371 if (buf_info == NULL)
1372 /* Can only happen due to a poorly timed flush */
1375 /* If we have popped a keyframe, decrement the queued_gop count */
1376 if (buf_info->keyframe && splitmux->queued_keyframes > 0)
1377 splitmux->queued_keyframes--;
1379 ctx->out_running_time = buf_info->run_ts;
1380 ctx->cur_out_buffer = gst_pad_probe_info_get_buffer (info);
1382 GST_LOG_OBJECT (splitmux,
1383 "Pad %" GST_PTR_FORMAT " buffer with run TS %" GST_STIME_FORMAT
1384 " size %" G_GUINT64_FORMAT,
1385 pad, GST_STIME_ARGS (ctx->out_running_time), buf_info->buf_size);
1387 ctx->caps_change = FALSE;
/* May block until this buffer's running time is allowed out. */
1389 complete_or_wait_on_out (splitmux, ctx);
1391 splitmux->muxed_out_bytes += buf_info->buf_size;
1393 #ifndef GST_DISABLE_GST_DEBUG
1395 GstBuffer *buf = gst_pad_probe_info_get_buffer (info);
1396 GST_LOG_OBJECT (pad, "Returning to pass buffer %" GST_PTR_FORMAT
1397 " run ts %" GST_STIME_FORMAT, buf,
1398 GST_STIME_ARGS (ctx->out_running_time));
1402 ctx->cur_out_buffer = NULL;
1403 GST_SPLITMUX_UNLOCK (splitmux);
1405 /* pending_gap is protected by the STREAM lock */
1406 if (ctx->pending_gap) {
1407 /* If we previously stored a gap event, send it now */
1408 GstPad *peer = gst_pad_get_peer (ctx->srcpad);
1410 GST_DEBUG_OBJECT (splitmux,
1411 "Pad %" GST_PTR_FORMAT " sending pending GAP event", ctx->srcpad);
1413 gst_pad_send_event (peer, ctx->pending_gap);
1414 ctx->pending_gap = NULL;
1416 gst_object_unref (peer);
1419 mq_stream_buf_free (buf_info);
1421 return GST_PAD_PROBE_PASS;
/* Common exit that releases the lock and drops the item.
 * NOTE(review): the label(s) leading here are not visible in this listing. */
1424 GST_SPLITMUX_UNLOCK (splitmux);
1425 return GST_PAD_PROBE_DROP;
/* resend_sticky: sticky-event iterator callback (see its use with
 * gst_pad_sticky_events_foreach() in restart_context()). Sends a new
 * reference of *event to peer and returns the result of that send; the
 * pad argument is not used in the visible code. */
1429 resend_sticky (GstPad * pad, GstEvent ** event, GstPad * peer)
1431 return gst_pad_send_event (peer, gst_event_ref (*event));
/* unlock_context: removes the probe recorded in ctx->fragment_block_id
 * from ctx->srcpad, if one is set, and clears the stored id. The splitmux
 * argument is not used in the visible code. */
1435 unlock_context (MqStreamCtx * ctx, GstSplitMuxSink * splitmux)
1437 if (ctx->fragment_block_id > 0) {
1438 gst_pad_remove_probe (ctx->srcpad, ctx->fragment_block_id);
1439 ctx->fragment_block_id = 0;
/* restart_context: re-sends every sticky event stored on ctx->srcpad to its
 * current peer, then re-derives the context's EOS flags from the pad's
 * actual EOS state. */
1444 restart_context (MqStreamCtx * ctx, GstSplitMuxSink * splitmux)
1446 GstPad *peer = gst_pad_get_peer (ctx->srcpad);
1448 gst_pad_sticky_events_foreach (ctx->srcpad,
1449 (GstPadStickyEventsForeachFunction) (resend_sticky), peer);
1451 /* Clear EOS flag if not actually EOS */
1452 ctx->out_eos = GST_PAD_IS_EOS (ctx->srcpad);
/* The async-EOS bookkeeping follows the same value. */
1453 ctx->out_eos_async_done = ctx->out_eos;
1455 gst_object_unref (peer);
/* relink_context: moves ctx->srcpad from its current peer to a newly
 * requested pad on splitmux->muxer. The new pad is requested with the pad
 * template (and template name) of the old peer. On link failure the
 * request pad is released again; an element error is posted on the failure
 * path at the end.
 * NOTE(review): the assignment target of the request call, the goto/label
 * lines and the return statements are absent from this listing. */
1459 relink_context (MqStreamCtx * ctx, GstSplitMuxSink * splitmux)
1461 GstPad *sinkpad, *srcpad, *newpad;
1462 GstPadTemplate *templ;
1464 srcpad = ctx->srcpad;
1465 sinkpad = gst_pad_get_peer (srcpad);
/* Reuse the template of the pad we are currently linked to. */
1467 templ = sinkpad->padtemplate;
1469 gst_element_request_pad (splitmux->muxer, templ,
1470 GST_PAD_TEMPLATE_NAME_TEMPLATE (templ), NULL);
1472 GST_DEBUG_OBJECT (splitmux, "Relinking ctx %p to pad %" GST_PTR_FORMAT, ctx,
1474 if (!gst_pad_unlink (srcpad, sinkpad)) {
1475 gst_object_unref (sinkpad);
/* Link with GST_PAD_LINK_CHECK_NO_RECONFIGURE; undo the pad request if the
 * link fails. */
1478 if (gst_pad_link_full (srcpad, newpad,
1479 GST_PAD_LINK_CHECK_NO_RECONFIGURE) != GST_PAD_LINK_OK) {
1480 gst_element_release_request_pad (splitmux->muxer, newpad);
1481 gst_object_unref (sinkpad);
1482 gst_object_unref (newpad);
1485 gst_object_unref (newpad);
1486 gst_object_unref (sinkpad);
1490 GST_ELEMENT_ERROR (splitmux, RESOURCE, SETTINGS,
1491 ("Could not create the new muxer/sink"), NULL);
/* _block_pad: probe callback that does nothing and returns
 * GST_PAD_PROBE_OK; block_context() installs it as a
 * GST_PAD_PROBE_TYPE_BLOCK probe, so its only purpose is to be the
 * callback of that blocking probe. */
1494 static GstPadProbeReturn
1495 _block_pad (GstPad * pad, GstPadProbeInfo * info, gpointer user_data)
1497 return GST_PAD_PROBE_OK;
/* block_context: installs a GST_PAD_PROBE_TYPE_BLOCK probe (_block_pad) on
 * ctx->srcpad and stores the probe id in ctx->fragment_block_id, the id
 * that unlock_context() later removes.
 * NOTE(review): the trailing arguments of gst_pad_add_probe() are not
 * visible in this listing. */
1501 block_context (MqStreamCtx * ctx, GstSplitMuxSink * splitmux)
1503 ctx->fragment_block_id =
1504 gst_pad_add_probe (ctx->srcpad, GST_PAD_PROBE_TYPE_BLOCK, _block_pad,
/* _set_property_from_structure: gst_structure_foreach() callback that
 * applies one structure field to a GObject: the field name (from the
 * quark) is used as the property name and value as the property value.
 * user_data is the target object.
 * NOTE(review): the rest of the parameter list and the return statement
 * are absent from this listing. */
1509 _set_property_from_structure (GQuark field_id, const GValue * value,
1512 const gchar *property_name = g_quark_to_string (field_id);
1513 GObject *element = G_OBJECT (user_data);
1515 g_object_set_property (element, property_name, value);
/* _lock_and_set_to_null: retires a child element. Its state is locked (so
 * the parent's state changes no longer affect it), it is set to
 * GST_STATE_NULL, and it is then removed from the splitmux bin. */
1521 _lock_and_set_to_null (GstElement * element, GstSplitMuxSink * splitmux)
1523 gst_element_set_locked_state (element, TRUE);
1524 gst_element_set_state (element, GST_STATE_NULL);
1525 GST_LOG_OBJECT (splitmux, "Removing old element %" GST_PTR_FORMAT, element);
1526 gst_bin_remove (GST_BIN (splitmux), element);
/* _send_event: gst_iterator_foreach() callback. value holds a pad and
 * user_data the event; a new reference of the event is sent to the pad, so
 * the caller keeps its own reference. */
1531 _send_event (const GValue * value, gpointer user_data)
1533 GstPad *pad = g_value_get_object (value);
1534 GstEvent *ev = user_data;
1536 gst_pad_send_event (pad, gst_event_ref (ev));
/* start_next_fragment: switches output to the next file. Must be called
 * for the reference context (asserted). The splitmux lock is dropped while
 * the element state lock is taken and the muxer/sink are replaced
 * (async-finalize) or reset (otherwise); afterwards the contexts are
 * unblocked and restarted, a fragment-opened message is sent and the output
 * state moves to AWAITING_COMMAND.
 * NOTE(review): many lines (else/goto/labels, braces, some call arguments
 * and declarations) are absent from this listing, and the original header
 * comment below is truncated. */
1539 /* Called with lock held when a fragment
1540 * reaches EOS and it is time to restart
1544 start_next_fragment (GstSplitMuxSink * splitmux, MqStreamCtx * ctx)
1546 GstElement *muxer, *sink;
1548 g_assert (ctx->is_reference);
1550 /* 1 change to new file */
1551 splitmux->switching_fragment = TRUE;
1553 /* We need to drop the splitmux lock to acquire the state lock
1554 * here and ensure there's no racy state change going on elsewhere */
/* Hold references to the current elements across the unlocked section. */
1555 muxer = gst_object_ref (splitmux->muxer);
1556 sink = gst_object_ref (splitmux->active_sink);
1558 GST_SPLITMUX_UNLOCK (splitmux);
1559 GST_STATE_LOCK (splitmux);
/* async-finalize: create a new sink and muxer for the new fragment instead
 * of reusing the current pair. */
1561 if (splitmux->async_finalize) {
1562 if (splitmux->muxed_out_bytes > 0 || splitmux->fragment_id != 0) {
1564 GstElement *new_sink, *new_muxer;
1566 GST_DEBUG_OBJECT (splitmux, "Starting fragment %u",
1567 splitmux->fragment_id);
/* Block every context's src pad while the elements are swapped. */
1568 g_list_foreach (splitmux->contexts, (GFunc) block_context, splitmux);
1569 newname = g_strdup_printf ("sink_%u", splitmux->fragment_id);
1570 GST_SPLITMUX_LOCK (splitmux);
1571 if ((splitmux->sink =
1572 create_element (splitmux, splitmux->sink_factory, newname,
1575 if (splitmux->sink_properties)
1576 gst_structure_foreach (splitmux->sink_properties,
1577 _set_property_from_structure, splitmux->sink);
1578 splitmux->active_sink = splitmux->sink;
1579 g_signal_emit (splitmux, signals[SIGNAL_SINK_ADDED], 0, splitmux->sink);
1581 newname = g_strdup_printf ("muxer_%u", splitmux->fragment_id);
1582 if ((splitmux->muxer =
1583 create_element (splitmux, splitmux->muxer_factory, newname,
1586 if (g_object_class_find_property (G_OBJECT_GET_CLASS (splitmux->sink),
1588 /* async child elements are causing state change races and weird
1589 * failures, so let's try and turn that off */
1590 g_object_set (splitmux->sink, "async", FALSE, NULL);
1592 if (splitmux->muxer_properties)
1593 gst_structure_foreach (splitmux->muxer_properties,
1594 _set_property_from_structure, splitmux->muxer);
1595 g_signal_emit (splitmux, signals[SIGNAL_MUXER_ADDED], 0, splitmux->muxer);
1597 new_sink = splitmux->sink;
1598 new_muxer = splitmux->muxer;
1599 GST_SPLITMUX_UNLOCK (splitmux);
/* Move every context over to the new muxer, then link muxer to sink. */
1600 g_list_foreach (splitmux->contexts, (GFunc) relink_context, splitmux);
1601 gst_element_link (new_muxer, new_sink);
/* EOS_FROM_US on the old sink: when it already holds 2 the old muxer and
 * sink are shut down and removed here; the other visible statement sets it
 * to 2.
 * NOTE(review): the else between those two branches is not visible. */
1603 if (g_object_get_qdata ((GObject *) sink, EOS_FROM_US)) {
1604 if (GPOINTER_TO_INT (g_object_get_qdata ((GObject *) sink,
1605 EOS_FROM_US)) == 2) {
1606 _lock_and_set_to_null (muxer, splitmux);
1607 _lock_and_set_to_null (sink, splitmux);
1609 g_object_set_qdata ((GObject *) sink, EOS_FROM_US,
1610 GINT_TO_POINTER (2));
1613 gst_object_unref (muxer);
1614 gst_object_unref (sink);
1617 gst_object_ref (muxer);
1618 gst_object_ref (sink);
/* Reuse path (presumably the non-async branch - the else is not visible):
 * lock both elements' state, take the sink to NULL, and either set the
 * muxer to NULL (reset_muxer) or flush its sink pads with
 * flush-start/flush-stop. */
1622 gst_element_set_locked_state (muxer, TRUE);
1623 gst_element_set_locked_state (sink, TRUE);
1624 gst_element_set_state (sink, GST_STATE_NULL);
1626 if (splitmux->reset_muxer) {
1627 gst_element_set_state (muxer, GST_STATE_NULL);
1629 GstIterator *it = gst_element_iterate_sink_pads (muxer);
1632 ev = gst_event_new_flush_start ();
1633 while (gst_iterator_foreach (it, _send_event, ev) == GST_ITERATOR_RESYNC);
1634 gst_event_unref (ev);
1636 gst_iterator_resync (it);
1638 ev = gst_event_new_flush_stop (TRUE);
1639 while (gst_iterator_foreach (it, _send_event, ev) == GST_ITERATOR_RESYNC);
1640 gst_event_unref (ev);
1642 gst_iterator_free (it);
/* Choose the next filename (only when something was muxed or this is the
 * first fragment) and reset the byte counter, under the splitmux lock. */
1646 GST_SPLITMUX_LOCK (splitmux);
1647 if (splitmux->muxed_out_bytes > 0 || splitmux->fragment_id == 0)
1648 set_next_filename (splitmux, ctx);
1649 splitmux->muxed_out_bytes = 0;
1650 GST_SPLITMUX_UNLOCK (splitmux);
/* Bring sink and muxer to the bin's target state, then unlock their state. */
1652 gst_element_set_state (sink, GST_STATE_TARGET (splitmux));
1653 gst_element_set_state (muxer, GST_STATE_TARGET (splitmux));
1654 gst_element_set_locked_state (muxer, FALSE);
1655 gst_element_set_locked_state (sink, FALSE);
1657 gst_object_unref (sink);
1658 gst_object_unref (muxer);
/* Re-take the splitmux lock before releasing the state lock. */
1660 GST_SPLITMUX_LOCK (splitmux);
1661 GST_STATE_UNLOCK (splitmux);
1662 splitmux->switching_fragment = FALSE;
1663 do_async_done (splitmux);
1665 splitmux->ready_for_output = TRUE;
/* Remove the blocking probes and replay sticky events to the new peers. */
1667 g_list_foreach (splitmux->contexts, (GFunc) unlock_context, splitmux);
1668 g_list_foreach (splitmux->contexts, (GFunc) restart_context, splitmux);
1670 send_fragment_opened_closed_msg (splitmux, TRUE, sink);
1672 /* FIXME: Is this always the correct next state? */
1673 splitmux->output_state = SPLITMUX_OUTPUT_STATE_AWAITING_COMMAND;
1674 GST_SPLITMUX_BROADCAST_OUTPUT (splitmux);
/* Failure path: release the state lock and post an element error.
 * NOTE(review): the label that leads here is not visible in this listing. */
1678 GST_STATE_UNLOCK (splitmux);
1679 GST_ELEMENT_ERROR (splitmux, RESOURCE, SETTINGS,
1680 ("Could not create the new muxer/sink"), NULL);
/* bus_handler: GstBin message handler for splitmuxsink. It inspects EOS,
 * ASYNC_START/ASYNC_DONE and WARNING messages from child elements and
 * unrefs (drops) some of them; the final statement chains up to the parent
 * class handle_message().
 * NOTE(review): return/break statements, braces and some declarations are
 * absent from this listing, so which paths reach the chain-up cannot be
 * confirmed from here. */
1684 bus_handler (GstBin * bin, GstMessage * message)
1686 GstSplitMuxSink *splitmux = GST_SPLITMUX_SINK (bin);
1688 switch (GST_MESSAGE_TYPE (message)) {
1689 case GST_MESSAGE_EOS:{
1690 /* If the state is draining out the current file, drop this EOS */
1693 sink = GST_ELEMENT (GST_MESSAGE_SRC (message));
1694 GST_SPLITMUX_LOCK (splitmux);
/* Announce that the fragment written by this sink has been closed. */
1696 send_fragment_opened_closed_msg (splitmux, FALSE, sink);
1698 if (splitmux->async_finalize) {
/* EOS_FROM_US marks a sink that this element itself sent to EOS. When the
 * value is already 2, the muxer feeding this sink is looked up through the
 * sink pad's peer and both elements are scheduled for asynchronous
 * shutdown/removal; the other visible statement sets the value to 2.
 * NOTE(review): the else between those branches is not visible. */
1700 if (g_object_get_qdata ((GObject *) sink, EOS_FROM_US)) {
1701 if (GPOINTER_TO_INT (g_object_get_qdata ((GObject *) sink,
1702 EOS_FROM_US)) == 2) {
1704 GstPad *sinksink, *muxersrc;
1706 sinksink = gst_element_get_static_pad (sink, "sink");
1707 muxersrc = gst_pad_get_peer (sinksink);
1708 muxer = gst_pad_get_parent_element (muxersrc);
1709 gst_object_unref (sinksink);
1710 gst_object_unref (muxersrc);
/* Each async call carries its own splitmux reference, released through the
 * destroy-notify (gst_object_unref). */
1712 gst_element_call_async (muxer,
1713 (GstElementCallAsyncFunc) _lock_and_set_to_null,
1714 gst_object_ref (splitmux), gst_object_unref);
1715 gst_element_call_async (sink,
1716 (GstElementCallAsyncFunc) _lock_and_set_to_null,
1717 gst_object_ref (splitmux), gst_object_unref);
1718 gst_object_unref (muxer);
1720 g_object_set_qdata ((GObject *) sink, EOS_FROM_US,
1721 GINT_TO_POINTER (2));
1723 GST_DEBUG_OBJECT (splitmux,
1724 "Caught async EOS from previous muxer+sink. Dropping.");
1725 /* We forward the EOS so that it gets aggregated as normal. If the sink
1726 * finishes and is removed before the end, it will be de-aggregated */
1727 gst_message_unref (message);
1728 GST_SPLITMUX_UNLOCK (splitmux);
/* Synchronous mode: an EOS seen while ENDING_FILE means the old file is
 * complete, so advance to START_NEXT_FILE and drop the message. */
1731 } else if (splitmux->output_state == SPLITMUX_OUTPUT_STATE_ENDING_FILE) {
1732 GST_DEBUG_OBJECT (splitmux, "Caught EOS at end of fragment, dropping");
1733 splitmux->output_state = SPLITMUX_OUTPUT_STATE_START_NEXT_FILE;
1734 GST_SPLITMUX_BROADCAST_OUTPUT (splitmux);
1736 gst_message_unref (message);
1737 GST_SPLITMUX_UNLOCK (splitmux);
1740 GST_DEBUG_OBJECT (splitmux,
1741 "Passing EOS message. Output state %d max_out_running_time %"
1742 GST_STIME_FORMAT, splitmux->output_state,
1743 GST_STIME_ARGS (splitmux->max_out_running_time));
1745 GST_SPLITMUX_UNLOCK (splitmux);
1748 case GST_MESSAGE_ASYNC_START:
1749 case GST_MESSAGE_ASYNC_DONE:
1750 /* Ignore state changes from our children while switching */
1751 GST_SPLITMUX_LOCK (splitmux);
1752 if (splitmux->switching_fragment) {
1753 if (GST_MESSAGE_SRC (message) == (GstObject *) splitmux->active_sink
1754 || GST_MESSAGE_SRC (message) == (GstObject *) splitmux->muxer) {
1755 GST_LOG_OBJECT (splitmux,
1756 "Ignoring state change from child %" GST_PTR_FORMAT
1757 " while switching", GST_MESSAGE_SRC (message));
1758 gst_message_unref (message);
1759 GST_SPLITMUX_UNLOCK (splitmux);
1763 GST_SPLITMUX_UNLOCK (splitmux);
1765 case GST_MESSAGE_WARNING:
1767 GError *gerror = NULL;
1769 gst_message_parse_warning (message, &gerror, NULL);
/* Only GST_STREAM_ERROR_FORMAT warnings are examined: the contexts are
 * scanned for a pending caps change.
 * NOTE(review): the statements that set caps_change, test it and free
 * gerror are not visible in this listing. */
1771 if (g_error_matches (gerror, GST_STREAM_ERROR, GST_STREAM_ERROR_FORMAT)) {
1773 gboolean caps_change = FALSE;
1775 GST_SPLITMUX_LOCK (splitmux);
1777 for (item = splitmux->contexts; item; item = item->next) {
1778 MqStreamCtx *ctx = item->data;
1780 if (ctx->caps_change) {
1786 GST_SPLITMUX_UNLOCK (splitmux);
1789 GST_LOG_OBJECT (splitmux,
1790 "Ignoring warning change from child %" GST_PTR_FORMAT
1791 " while switching caps", GST_MESSAGE_SRC (message));
1792 gst_message_unref (message);
/* Default handling: hand the message to the parent GstBin implementation. */
1802 GST_BIN_CLASS (parent_class)->handle_message (bin, message);
/* ctx_set_unblock: sets ctx->need_unblock. handle_gathered_gop() applies
 * it to every context through g_list_foreach(). */
1806 ctx_set_unblock (MqStreamCtx * ctx)
1808 ctx->need_unblock = TRUE;
/* need_new_fragment: decides whether the GOP that was just gathered has to
 * go into a new fragment.
 *   queued_time     - compared against the time threshold
 *   queued_gop_time - compared against the muxer's remaining reserved
 *                     duration when robust muxing is checked
 *   queued_bytes    - compared against the byte threshold
 * Visible TRUE results: byte threshold exceeded, time threshold exceeded,
 * or the reference stream's input running time is past next_max_tc_time
 * plus 5us.
 * NOTE(review): the return statements for the "nothing muxed yet",
 * split_now, robust-muxing and fall-through cases are absent from this
 * listing - confirm their values against the full source. */
1812 need_new_fragment (GstSplitMuxSink * splitmux,
1813 GstClockTime queued_time, GstClockTime queued_gop_time,
1814 guint64 queued_bytes)
1816 guint64 thresh_bytes;
1817 GstClockTime thresh_time;
1818 gboolean check_robust_muxing;
/* Snapshot the configurable thresholds under the object lock. */
1820 GST_OBJECT_LOCK (splitmux);
1821 thresh_bytes = splitmux->threshold_bytes;
1822 thresh_time = splitmux->threshold_time;
1823 check_robust_muxing = splitmux->use_robust_muxing
1824 && splitmux->muxer_has_reserved_props;
1825 GST_OBJECT_UNLOCK (splitmux);
1827 /* Have we muxed anything into the new file at all? */
1828 if (splitmux->fragment_total_bytes <= 0)
1831 /* User told us to split now */
1832 if (g_atomic_int_get (&(splitmux->split_now)) == TRUE)
1835 if (thresh_bytes > 0 && queued_bytes > thresh_bytes)
1836 return TRUE; /* Would overrun byte limit */
1838 if (thresh_time > 0 && queued_time > thresh_time)
1839 return TRUE; /* Would overrun time limit */
1841 /* Timecode-based threshold accounts for possible rounding errors:
1842 * 5us should be bigger than all possible rounding errors but nowhere near
1843 * big enough to skip to another frame */
1844 if (splitmux->next_max_tc_time != GST_CLOCK_TIME_NONE &&
1845 splitmux->reference_ctx->in_running_time >
1846 splitmux->next_max_tc_time + 5 * GST_USECOND)
1847 return TRUE; /* Timecode threshold */
/* Robust muxing: ask the muxer how much reserved duration is left and
 * compare it with what this GOP would add. */
1849 if (check_robust_muxing) {
1850 GstClockTime mux_reserved_remain;
1852 g_object_get (splitmux->muxer,
1853 "reserved-duration-remaining", &mux_reserved_remain, NULL);
1855 GST_LOG_OBJECT (splitmux,
1856 "Muxer robust muxing report - %" G_GUINT64_FORMAT
1857 " remaining. New GOP would enqueue %" G_GUINT64_FORMAT,
1858 mux_reserved_remain, queued_gop_time);
1860 if (queued_gop_time >= mux_reserved_remain) {
1861 GST_INFO_OBJECT (splitmux,
1862 "File is about to run out of header room - %" G_GUINT64_FORMAT
1863 " remaining. New GOP would enqueue %" G_GUINT64_FORMAT
1864 ". Switching to new file", mux_reserved_remain, queued_gop_time);
1869 /* Continue and mux this GOP */
/* handle_gathered_gop: input-side step run once a complete GOP has been
 * collected. It estimates the queued bytes and time, asks
 * need_new_fragment() whether the current file would overflow, queues a
 * start_new_fragment command if so, then queues a command releasing this
 * GOP to the output side and wakes the input contexts.
 * NOTE(review): several lines (assignment targets, else keywords, braces)
 * are absent from this listing, and the original header comment below is
 * truncated. */
1873 /* Called with splitmux lock held */
1874 /* Called when entering ProcessingCompleteGop state
1875 * Assess if mq contents overflowed the current file
1876 * -> If yes, need to switch to new file
1877 * -> if no, set max_out_running_time to let this GOP in and
1878 * go to COLLECTING_GOP_START state
1881 handle_gathered_gop (GstSplitMuxSink * splitmux)
1883 guint64 queued_bytes;
1884 GstClockTimeDiff queued_time = 0;
1885 GstClockTimeDiff queued_gop_time = 0;
1886 GstClockTimeDiff new_out_ts = splitmux->reference_ctx->in_running_time;
1887 SplitMuxOutputCommand *cmd;
1889 /* Assess if the multiqueue contents overflowed the current file */
1890 /* When considering if a newly gathered GOP overflows
1891 * the time limit for the file, only consider the running time of the
1892 * reference stream. Other streams might have run ahead a little bit,
1893 * but extra pieces won't be released to the muxer beyond the reference
1894 * stream cut-off anyway - so it forms the limit. */
1895 queued_bytes = splitmux->fragment_total_bytes + splitmux->gop_total_bytes;
1896 queued_time = splitmux->reference_ctx->in_running_time;
1897 /* queued_gop_time tracks how much unwritten data there is waiting to
1898 * be written to this fragment including this GOP */
1899 if (splitmux->reference_ctx->out_running_time != GST_CLOCK_STIME_NONE)
1901 splitmux->reference_ctx->in_running_time -
1902 splitmux->reference_ctx->out_running_time;
1905 splitmux->reference_ctx->in_running_time - splitmux->gop_start_time;
1907 GST_LOG_OBJECT (splitmux, " queued_bytes %" G_GUINT64_FORMAT, queued_bytes);
1909 g_assert (queued_gop_time >= 0);
1910 g_assert (queued_time >= splitmux->fragment_start_time);
/* Make queued_time relative to the fragment start and cap queued_gop_time
 * at that value. */
1912 queued_time -= splitmux->fragment_start_time;
1913 if (queued_time < queued_gop_time)
1914 queued_gop_time = queued_time;
1916 /* Expand queued bytes estimate by muxer overhead */
1917 queued_bytes += (queued_bytes * splitmux->mux_overhead);
1919 GST_LOG_OBJECT (splitmux, "mq at TS %" GST_STIME_FORMAT
1920 " bytes %" G_GUINT64_FORMAT, GST_STIME_ARGS (queued_time), queued_bytes);
1921 if (splitmux->next_max_tc_time != GST_CLOCK_TIME_NONE) {
1922 GST_LOG_OBJECT (splitmux,
1923 "timecode mq TS %" GST_TIME_FORMAT " vs target %" GST_TIME_FORMAT,
1924 GST_TIME_ARGS (splitmux->reference_ctx->in_running_time),
1925 GST_TIME_ARGS (splitmux->next_max_tc_time + 5 * GST_USECOND));
1928 /* Check for overrun - have we output at least one byte and overrun
1929 * either threshold? */
1930 if (need_new_fragment (splitmux, queued_time, queued_gop_time, queued_bytes)) {
/* Attach the reference stream's current output running time to the sink
 * as RUNNING_TIME qdata (freed with g_free), and clear the manual
 * split_now request. */
1931 GstClockTime *sink_running_time = g_new (GstClockTime, 1);
1932 *sink_running_time = splitmux->reference_ctx->out_running_time;
1933 g_object_set_qdata_full (G_OBJECT (splitmux->sink),
1934 RUNNING_TIME, sink_running_time, g_free);
1935 g_atomic_int_set (&(splitmux->split_now), FALSE);
1936 /* Tell the output side to start a new fragment */
1937 GST_INFO_OBJECT (splitmux,
1938 "This GOP (dur %" GST_STIME_FORMAT
1939 ") would overflow the fragment, Sending start_new_fragment cmd",
1940 GST_STIME_ARGS (splitmux->reference_ctx->in_running_time -
1941 splitmux->gop_start_time));
1942 cmd = out_cmd_buf_new ();
1943 cmd->start_new_fragment = TRUE;
1944 g_queue_push_head (&splitmux->out_cmd_q, cmd);
1945 GST_SPLITMUX_BROADCAST_OUTPUT (splitmux);
/* The new fragment starts at this GOP's start time with zero bytes. */
1947 new_out_ts = splitmux->reference_ctx->in_running_time;
1948 splitmux->fragment_start_time = splitmux->gop_start_time;
1949 splitmux->fragment_total_bytes = 0;
1951 if (request_next_keyframe (splitmux,
1952 splitmux->reference_ctx->prev_in_keyframe) == FALSE) {
1953 GST_WARNING_OBJECT (splitmux,
1954 "Could not request a keyframe. Files may not split at the exact location they should");
1956 gst_buffer_replace (&splitmux->reference_ctx->prev_in_keyframe, NULL);
1959 /* And set up to collect the next GOP */
1960 if (!splitmux->reference_ctx->in_eos) {
1961 splitmux->input_state = SPLITMUX_INPUT_STATE_COLLECTING_GOP_START;
1962 splitmux->gop_start_time = new_out_ts;
1964 /* This is probably already the current state, but just in case: */
1965 splitmux->input_state = SPLITMUX_INPUT_STATE_FINISHING_UP;
1966 new_out_ts = GST_CLOCK_STIME_NONE; /* EOS runs until forever */
1969 /* And wake all input contexts to send a wake-up event */
1970 g_list_foreach (splitmux->contexts, (GFunc) ctx_set_unblock, NULL);
1971 GST_SPLITMUX_BROADCAST_INPUT (splitmux);
1973 /* Now either way - either there was no overflow, or we requested a new fragment: release this GOP */
1974 splitmux->fragment_total_bytes += splitmux->gop_total_bytes;
/* A release command is queued only when this GOP actually carried bytes. */
1976 if (splitmux->gop_total_bytes > 0) {
1977 GST_LOG_OBJECT (splitmux,
1978 "Releasing GOP to output. Bytes in fragment now %" G_GUINT64_FORMAT
1979 " time %" GST_STIME_FORMAT,
1980 splitmux->fragment_total_bytes, GST_STIME_ARGS (queued_time));
1982 /* Send this GOP to the output command queue */
1983 cmd = out_cmd_buf_new ();
1984 cmd->start_new_fragment = FALSE;
1985 cmd->max_output_ts = new_out_ts;
1986 GST_LOG_OBJECT (splitmux, "Sending GOP cmd to output for TS %"
1987 GST_STIME_FORMAT, GST_STIME_ARGS (new_out_ts));
1988 g_queue_push_head (&splitmux->out_cmd_q, cmd);
1990 GST_SPLITMUX_BROADCAST_OUTPUT (splitmux);
1993 splitmux->gop_total_bytes = 0;
1996 /* Called with splitmux lock held */
1997 /* Called from each input pad when it is has all the pieces
1998 * for a GOP or EOS, starting with the reference pad which has set the
1999 * splitmux->max_in_running_time
2002 check_completed_gop (GstSplitMuxSink * splitmux, MqStreamCtx * ctx)
2007 /* On ENDING_FILE, the reference stream sends a command to start a new
2008 * fragment, then releases the GOP for output in the new fragment.
2009 * If somes streams received no buffer during the last GOP that overran,
2010 * because its next buffer has a timestamp bigger than
2011 * ctx->max_in_running_time, its queue is empty. In that case the only
2012 * way to wakeup the output thread is by injecting an event in the
2013 * queue. This usually happen with subtitle streams.
2014 * See https://bugzilla.gnome.org/show_bug.cgi?id=763711. */
2015 if (ctx->need_unblock) {
2016 GST_LOG_OBJECT (ctx->sinkpad, "Sending splitmuxsink-unblock event");
2017 event = gst_event_new_custom (GST_EVENT_CUSTOM_DOWNSTREAM |
2018 GST_EVENT_TYPE_SERIALIZED,
2019 gst_structure_new ("splitmuxsink-unblock", "timestamp",
2020 G_TYPE_INT64, splitmux->max_in_running_time, NULL));
2022 GST_SPLITMUX_UNLOCK (splitmux);
2023 gst_pad_send_event (ctx->sinkpad, event);
2024 GST_SPLITMUX_LOCK (splitmux);
2026 ctx->need_unblock = FALSE;
2027 GST_SPLITMUX_BROADCAST_INPUT (splitmux);
2028 /* state may have changed while we were unlocked. Loop again if so */
2029 if (splitmux->input_state != SPLITMUX_INPUT_STATE_WAITING_GOP_COLLECT)
2033 if (splitmux->input_state == SPLITMUX_INPUT_STATE_WAITING_GOP_COLLECT) {
2034 gboolean ready = TRUE;
2036 /* Iterate each pad, and check that the input running time is at least
2037 * up to the reference running time, and if so handle the collected GOP */
2038 GST_LOG_OBJECT (splitmux, "Checking GOP collected, Max in running time %"
2039 GST_STIME_FORMAT " ctx %p",
2040 GST_STIME_ARGS (splitmux->max_in_running_time), ctx);
2041 for (cur = g_list_first (splitmux->contexts); cur != NULL;
2042 cur = g_list_next (cur)) {
2043 MqStreamCtx *tmpctx = (MqStreamCtx *) (cur->data);
2045 GST_LOG_OBJECT (splitmux,
2046 "Context %p (src pad %" GST_PTR_FORMAT ") TS %" GST_STIME_FORMAT
2047 " EOS %d", tmpctx, tmpctx->srcpad,
2048 GST_STIME_ARGS (tmpctx->in_running_time), tmpctx->in_eos);
2050 if (splitmux->max_in_running_time != GST_CLOCK_STIME_NONE &&
2051 tmpctx->in_running_time < splitmux->max_in_running_time &&
2053 GST_LOG_OBJECT (splitmux,
2054 "Context %p (src pad %" GST_PTR_FORMAT ") not ready. We'll sleep",
2055 tmpctx, tmpctx->srcpad);
2061 GST_DEBUG_OBJECT (splitmux,
2062 "Collected GOP is complete. Processing (ctx %p)", ctx);
2063 /* All pads have a complete GOP, release it into the multiqueue */
2064 handle_gathered_gop (splitmux);
2068 /* If upstream reached EOS we are not expecting more data, no need to wait
2073 /* Some pad is not yet ready, or GOP is being pushed
2074 * either way, sleep and wait to get woken */
2075 while (splitmux->input_state == SPLITMUX_INPUT_STATE_WAITING_GOP_COLLECT &&
2077 (ctx->in_running_time >= splitmux->max_in_running_time) &&
2078 (splitmux->max_in_running_time != GST_CLOCK_STIME_NONE)) {
2080 GST_LOG_OBJECT (splitmux, "Sleeping for GOP collection (ctx %p)", ctx);
2081 GST_SPLITMUX_WAIT_INPUT (splitmux);
2082 GST_LOG_OBJECT (splitmux, "Done waiting for complete GOP (ctx %p)", ctx);
2086 static GstPadProbeReturn
2087 handle_mq_input (GstPad * pad, GstPadProbeInfo * info, MqStreamCtx * ctx)
2089 GstSplitMuxSink *splitmux = ctx->splitmux;
2091 MqStreamBuf *buf_info = NULL;
2093 gboolean loop_again;
2094 gboolean keyframe = FALSE;
2096 GST_LOG_OBJECT (pad, "Fired probe type 0x%x", info->type);
2098 /* FIXME: Handle buffer lists, until then make it clear they won't work */
2099 if (info->type & GST_PAD_PROBE_TYPE_BUFFER_LIST) {
2100 g_warning ("Buffer list handling not implemented");
2101 return GST_PAD_PROBE_DROP;
2103 if (info->type & GST_PAD_PROBE_TYPE_EVENT_DOWNSTREAM ||
2104 info->type & GST_PAD_PROBE_TYPE_EVENT_FLUSH) {
2105 GstEvent *event = gst_pad_probe_info_get_event (info);
2107 GST_LOG_OBJECT (pad, "Event %" GST_PTR_FORMAT, event);
2109 switch (GST_EVENT_TYPE (event)) {
2110 case GST_EVENT_SEGMENT:
2111 gst_event_copy_segment (event, &ctx->in_segment);
2113 case GST_EVENT_FLUSH_STOP:
2114 GST_SPLITMUX_LOCK (splitmux);
2115 gst_segment_init (&ctx->in_segment, GST_FORMAT_UNDEFINED);
2116 ctx->in_eos = FALSE;
2117 ctx->in_running_time = GST_CLOCK_STIME_NONE;
2118 GST_SPLITMUX_UNLOCK (splitmux);
2121 GST_SPLITMUX_LOCK (splitmux);
2124 if (splitmux->input_state == SPLITMUX_INPUT_STATE_STOPPED)
2127 if (ctx->is_reference) {
2128 GST_INFO_OBJECT (splitmux, "Got Reference EOS. Finishing up");
2129 /* check_completed_gop will act as if this is a new keyframe with infinite timestamp */
2130 splitmux->input_state = SPLITMUX_INPUT_STATE_WAITING_GOP_COLLECT;
2131 /* Wake up other input pads to collect this GOP */
2132 GST_SPLITMUX_BROADCAST_INPUT (splitmux);
2133 check_completed_gop (splitmux, ctx);
2134 } else if (splitmux->input_state ==
2135 SPLITMUX_INPUT_STATE_WAITING_GOP_COLLECT) {
2136 /* If we are waiting for a GOP to be completed (ie, for aux
2137 * pads to catch up), then this pad is complete, so check
2138 * if the whole GOP is.
2140 check_completed_gop (splitmux, ctx);
2142 GST_SPLITMUX_UNLOCK (splitmux);
2144 case GST_EVENT_GAP:{
2145 GstClockTime gap_ts;
2146 GstClockTimeDiff rtime;
2148 gst_event_parse_gap (event, &gap_ts, NULL);
2149 if (gap_ts == GST_CLOCK_TIME_NONE)
2152 GST_SPLITMUX_LOCK (splitmux);
2154 if (splitmux->input_state == SPLITMUX_INPUT_STATE_STOPPED)
2156 rtime = my_segment_to_running_time (&ctx->in_segment, gap_ts);
2158 GST_LOG_OBJECT (pad, "Have GAP w/ ts %" GST_STIME_FORMAT,
2159 GST_STIME_ARGS (rtime));
2161 if (ctx->is_reference
2162 && splitmux->fragment_start_time == GST_CLOCK_STIME_NONE) {
2163 splitmux->gop_start_time = splitmux->fragment_start_time = rtime;
2164 GST_LOG_OBJECT (splitmux, "Mux start time now %" GST_STIME_FORMAT,
2165 GST_STIME_ARGS (splitmux->fragment_start_time));
2166 /* Also take this as the first start time when starting up,
2167 * so that we start counting overflow from the first frame */
2168 if (!GST_CLOCK_STIME_IS_VALID (splitmux->max_in_running_time))
2169 splitmux->max_in_running_time = splitmux->fragment_start_time;
2172 GST_SPLITMUX_UNLOCK (splitmux);
2178 return GST_PAD_PROBE_PASS;
2179 } else if (info->type & GST_PAD_PROBE_TYPE_QUERY_DOWNSTREAM) {
2180 switch (GST_QUERY_TYPE (GST_QUERY (info->data))) {
2181 case GST_QUERY_ALLOCATION:
2182 return GST_PAD_PROBE_DROP;
2184 return GST_PAD_PROBE_PASS;
2188 buf = gst_pad_probe_info_get_buffer (info);
2189 buf_info = mq_stream_buf_new ();
2191 if (GST_BUFFER_PTS_IS_VALID (buf))
2192 ts = GST_BUFFER_PTS (buf);
2194 ts = GST_BUFFER_DTS (buf);
2196 GST_LOG_OBJECT (pad, "Buffer TS is %" GST_TIME_FORMAT, GST_TIME_ARGS (ts));
2198 GST_SPLITMUX_LOCK (splitmux);
2200 if (splitmux->input_state == SPLITMUX_INPUT_STATE_STOPPED)
2203 /* If this buffer has a timestamp, advance the input timestamp of the
2205 if (GST_CLOCK_TIME_IS_VALID (ts)) {
2206 GstClockTimeDiff running_time =
2207 my_segment_to_running_time (&ctx->in_segment, ts);
2209 GST_LOG_OBJECT (pad, "Buffer running TS is %" GST_STIME_FORMAT,
2210 GST_STIME_ARGS (running_time));
2212 if (GST_CLOCK_STIME_IS_VALID (running_time)
2213 && running_time > ctx->in_running_time)
2214 ctx->in_running_time = running_time;
2217 /* Try to make sure we have a valid running time */
2218 if (!GST_CLOCK_STIME_IS_VALID (ctx->in_running_time)) {
2219 ctx->in_running_time =
2220 my_segment_to_running_time (&ctx->in_segment, ctx->in_segment.start);
2223 GST_LOG_OBJECT (pad, "in running time now %" GST_STIME_FORMAT,
2224 GST_STIME_ARGS (ctx->in_running_time));
2226 buf_info->run_ts = ctx->in_running_time;
2227 buf_info->buf_size = gst_buffer_get_size (buf);
2228 buf_info->duration = GST_BUFFER_DURATION (buf);
2230 /* initialize fragment_start_time */
2231 if (ctx->is_reference
2232 && splitmux->fragment_start_time == GST_CLOCK_STIME_NONE) {
2233 splitmux->gop_start_time = splitmux->fragment_start_time = buf_info->run_ts;
2234 GST_LOG_OBJECT (splitmux, "Mux start time now %" GST_STIME_FORMAT,
2235 GST_STIME_ARGS (splitmux->fragment_start_time));
2236 gst_buffer_replace (&ctx->prev_in_keyframe, buf);
2238 /* Also take this as the first start time when starting up,
2239 * so that we start counting overflow from the first frame */
2240 if (!GST_CLOCK_STIME_IS_VALID (splitmux->max_in_running_time))
2241 splitmux->max_in_running_time = splitmux->fragment_start_time;
2242 if (request_next_keyframe (splitmux, ctx->prev_in_keyframe) == FALSE) {
2243 GST_WARNING_OBJECT (splitmux,
2244 "Could not request a keyframe. Files may not split at the exact location they should");
2246 gst_buffer_replace (&splitmux->reference_ctx->prev_in_keyframe, NULL);
2249 GST_DEBUG_OBJECT (pad, "Buf TS %" GST_STIME_FORMAT
2250 " total GOP bytes %" G_GUINT64_FORMAT,
2251 GST_STIME_ARGS (buf_info->run_ts), splitmux->gop_total_bytes);
2258 switch (splitmux->input_state) {
2259 case SPLITMUX_INPUT_STATE_COLLECTING_GOP_START:
2260 if (ctx->is_reference) {
2261 /* This is the reference context. If it's a keyframe,
2262 * it marks the start of a new GOP and we should wait in
2263 * check_completed_gop before continuing, but either way
2264 * (keyframe or no, we'll pass this buffer through after
2265 * so set loop_again to FALSE */
2268 if (GST_BUFFER_FLAG_IS_SET (buf, GST_BUFFER_FLAG_DELTA_UNIT)) {
2269 /* Allow other input pads to catch up to here too */
2270 splitmux->max_in_running_time = ctx->in_running_time;
2271 GST_LOG_OBJECT (splitmux,
2272 "Max in running time now %" GST_TIME_FORMAT,
2273 GST_TIME_ARGS (splitmux->max_in_running_time));
2274 GST_SPLITMUX_BROADCAST_INPUT (splitmux);
2277 GST_INFO_OBJECT (pad,
2278 "Have keyframe with running time %" GST_STIME_FORMAT,
2279 GST_STIME_ARGS (ctx->in_running_time));
2281 splitmux->input_state = SPLITMUX_INPUT_STATE_WAITING_GOP_COLLECT;
2282 splitmux->max_in_running_time = ctx->in_running_time;
2283 GST_LOG_OBJECT (splitmux, "Max in running time now %" GST_TIME_FORMAT,
2284 GST_TIME_ARGS (splitmux->max_in_running_time));
2285 /* Wake up other input pads to collect this GOP */
2286 GST_SPLITMUX_BROADCAST_INPUT (splitmux);
2287 check_completed_gop (splitmux, ctx);
2288 /* Store this new keyframe to remember the start of GOP */
2289 gst_buffer_replace (&ctx->prev_in_keyframe, buf);
2291 /* Pass this buffer if the reference ctx is far enough ahead */
2292 if (ctx->in_running_time < splitmux->max_in_running_time) {
2297 /* We're still waiting for a keyframe on the reference pad, sleep */
2298 GST_LOG_OBJECT (pad, "Sleeping for GOP start");
2299 GST_SPLITMUX_WAIT_INPUT (splitmux);
2300 GST_LOG_OBJECT (pad,
2301 "Done sleeping for GOP start input state now %d",
2302 splitmux->input_state);
2305 case SPLITMUX_INPUT_STATE_WAITING_GOP_COLLECT:{
2306 /* We're collecting a GOP. If this is the reference context,
2307 * we need to check if this is a keyframe that marks the start
2308 * of the next GOP. If it is, it marks the end of the GOP we're
2309 * collecting, so sleep and wait until all the other pads also
2310 * reach that timestamp - at which point, we have an entire GOP
2311 * and either go to ENDING_FILE or release this GOP to the muxer and
2312 * go back to COLLECT_GOP_START. */
2314 /* If we overran the target timestamp, it might be time to process
2315 * the GOP, otherwise bail out for more data
2317 GST_LOG_OBJECT (pad,
2318 "Checking TS %" GST_STIME_FORMAT " against max %"
2319 GST_STIME_FORMAT, GST_STIME_ARGS (ctx->in_running_time),
2320 GST_STIME_ARGS (splitmux->max_in_running_time));
2322 if (ctx->in_running_time < splitmux->max_in_running_time) {
2327 GST_LOG_OBJECT (pad,
2328 "Collected last packet of GOP. Checking other pads");
2329 check_completed_gop (splitmux, ctx);
2332 case SPLITMUX_INPUT_STATE_FINISHING_UP:
2343 splitmux->queued_keyframes++;
2344 buf_info->keyframe = TRUE;
2347 /* Update total input byte counter for overflow detect */
2348 splitmux->gop_total_bytes += buf_info->buf_size;
2350 /* Now add this buffer to the queue just before returning */
2351 g_queue_push_head (&ctx->queued_bufs, buf_info);
2353 GST_LOG_OBJECT (pad, "Returning to queue buffer %" GST_PTR_FORMAT
2354 " run ts %" GST_STIME_FORMAT, buf, GST_STIME_ARGS (ctx->in_running_time));
2356 GST_SPLITMUX_UNLOCK (splitmux);
2357 return GST_PAD_PROBE_PASS;
2360 GST_SPLITMUX_UNLOCK (splitmux);
2362 mq_stream_buf_free (buf_info);
2363 return GST_PAD_PROBE_PASS;
2367 grow_blocked_queues (GstSplitMuxSink * splitmux)
2371 /* Scan other queues for full-ness and grow them */
2372 for (cur = g_list_first (splitmux->contexts);
2373 cur != NULL; cur = g_list_next (cur)) {
2374 MqStreamCtx *tmpctx = (MqStreamCtx *) (cur->data);
2376 guint cur_len = g_queue_get_length (&tmpctx->queued_bufs);
2378 g_object_get (tmpctx->q, "max-size-buffers", &cur_limit, NULL);
2379 GST_LOG_OBJECT (tmpctx->q, "Queue len %u", cur_len);
2381 if (cur_len >= cur_limit) {
2382 cur_limit = cur_len + 1;
2383 GST_DEBUG_OBJECT (tmpctx->q,
2384 "Queue overflowed and needs enlarging. Growing to %u buffers",
2386 g_object_set (tmpctx->q, "max-size-buffers", cur_limit, NULL);
2392 handle_q_underrun (GstElement * q, gpointer user_data)
2394 MqStreamCtx *ctx = (MqStreamCtx *) (user_data);
2395 GstSplitMuxSink *splitmux = ctx->splitmux;
2397 GST_SPLITMUX_LOCK (splitmux);
2398 GST_DEBUG_OBJECT (q,
2399 "Queue reported underrun with %d keyframes and %d cmds enqueued",
2400 splitmux->queued_keyframes, g_queue_get_length (&splitmux->out_cmd_q));
2401 grow_blocked_queues (splitmux);
2402 GST_SPLITMUX_UNLOCK (splitmux);
2406 handle_q_overrun (GstElement * q, gpointer user_data)
2408 MqStreamCtx *ctx = (MqStreamCtx *) (user_data);
2409 GstSplitMuxSink *splitmux = ctx->splitmux;
2410 gboolean allow_grow = FALSE;
2412 GST_SPLITMUX_LOCK (splitmux);
2413 GST_DEBUG_OBJECT (q,
2414 "Queue reported overrun with %d keyframes and %d cmds enqueued",
2415 splitmux->queued_keyframes, g_queue_get_length (&splitmux->out_cmd_q));
2417 if (splitmux->queued_keyframes < 2) {
2418 /* Less than a full GOP queued, grow the queue */
2420 } else if (g_queue_get_length (&splitmux->out_cmd_q) < 1) {
2423 /* If another queue is starved, grow */
2425 for (cur = g_list_first (splitmux->contexts);
2426 cur != NULL; cur = g_list_next (cur)) {
2427 MqStreamCtx *tmpctx = (MqStreamCtx *) (cur->data);
2428 if (tmpctx != ctx && g_queue_get_length (&tmpctx->queued_bufs) < 1) {
2433 GST_SPLITMUX_UNLOCK (splitmux);
2438 g_object_get (q, "max-size-buffers", &cur_limit, NULL);
2441 GST_DEBUG_OBJECT (q,
2442 "Queue overflowed and needs enlarging. Growing to %u buffers",
2445 g_object_set (q, "max-size-buffers", cur_limit, NULL);
2450 gst_splitmux_sink_request_new_pad (GstElement * element,
2451 GstPadTemplate * templ, const gchar * name, const GstCaps * caps)
2453 GstSplitMuxSink *splitmux = (GstSplitMuxSink *) element;
2454 GstPadTemplate *mux_template = NULL;
2457 GstPad *q_sink = NULL, *q_src = NULL;
2459 gboolean is_video = FALSE;
2462 GST_DEBUG_OBJECT (element, "templ:%s, name:%s", templ->name_template, name);
2464 GST_SPLITMUX_LOCK (splitmux);
2465 if (!create_muxer (splitmux))
2467 g_signal_emit (splitmux, signals[SIGNAL_MUXER_ADDED], 0, splitmux->muxer);
2469 if (templ->name_template) {
2470 if (g_str_equal (templ->name_template, "video")) {
2471 if (splitmux->have_video)
2472 goto already_have_video;
2474 /* FIXME: Look for a pad template with matching caps, rather than by name */
2476 gst_element_class_get_pad_template (GST_ELEMENT_GET_CLASS
2477 (splitmux->muxer), "video_%u");
2479 /* Fallback to find sink pad templates named 'video' (flvmux) */
2480 if (!mux_template) {
2482 gst_element_class_get_pad_template (GST_ELEMENT_GET_CLASS
2483 (splitmux->muxer), "video");
2489 gst_element_class_get_pad_template (GST_ELEMENT_GET_CLASS
2490 (splitmux->muxer), templ->name_template);
2492 /* Fallback to find sink pad templates named 'audio' (flvmux) */
2493 if (!mux_template) {
2495 gst_element_class_get_pad_template (GST_ELEMENT_GET_CLASS
2496 (splitmux->muxer), "audio");
2500 if (mux_template == NULL) {
2501 /* Fallback to find sink pad templates named 'sink_%d' (mpegtsmux) */
2503 gst_element_class_get_pad_template (GST_ELEMENT_GET_CLASS
2504 (splitmux->muxer), "sink_%d");
2509 res = gst_element_request_pad (splitmux->muxer, mux_template, name, caps);
2514 gname = g_strdup ("video");
2515 else if (name == NULL)
2516 gname = gst_pad_get_name (res);
2518 gname = g_strdup (name);
2520 if ((q = create_element (splitmux, "queue", NULL, FALSE)) == NULL)
2523 gst_element_set_state (q, GST_STATE_TARGET (splitmux));
2525 g_object_set (q, "max-size-bytes", 0, "max-size-time", (guint64) (0),
2526 "max-size-buffers", 5, NULL);
2528 q_sink = gst_element_get_static_pad (q, "sink");
2529 q_src = gst_element_get_static_pad (q, "src");
2531 if (gst_pad_link (q_src, res) != GST_PAD_LINK_OK) {
2532 gst_element_release_request_pad (splitmux->muxer, res);
2533 gst_object_unref (GST_OBJECT (res));
2537 gst_object_unref (GST_OBJECT (res));
2539 ctx = mq_stream_ctx_new (splitmux);
2540 /* Context holds a ref: */
2541 ctx->q = gst_object_ref (q);
2542 ctx->srcpad = q_src;
2543 ctx->sinkpad = q_sink;
2545 g_signal_connect (q, "overrun", (GCallback) handle_q_overrun, ctx);
2546 g_signal_connect (q, "underrun", (GCallback) handle_q_underrun, ctx);
2548 ctx->src_pad_block_id =
2549 gst_pad_add_probe (q_src,
2550 GST_PAD_PROBE_TYPE_DATA_DOWNSTREAM | GST_PAD_PROBE_TYPE_EVENT_FLUSH,
2551 (GstPadProbeCallback) handle_mq_output, ctx, NULL);
2552 if (is_video && splitmux->reference_ctx != NULL) {
2553 splitmux->reference_ctx->is_reference = FALSE;
2554 splitmux->reference_ctx = NULL;
2556 if (splitmux->reference_ctx == NULL) {
2557 splitmux->reference_ctx = ctx;
2558 ctx->is_reference = TRUE;
2561 res = gst_ghost_pad_new_from_template (gname, q_sink, templ);
2562 g_object_set_qdata ((GObject *) (res), PAD_CONTEXT, ctx);
2564 ctx->sink_pad_block_id =
2565 gst_pad_add_probe (q_sink,
2566 GST_PAD_PROBE_TYPE_DATA_DOWNSTREAM | GST_PAD_PROBE_TYPE_EVENT_FLUSH |
2567 GST_PAD_PROBE_TYPE_QUERY_DOWNSTREAM,
2568 (GstPadProbeCallback) handle_mq_input, ctx, NULL);
2570 GST_DEBUG_OBJECT (splitmux, "Request pad %" GST_PTR_FORMAT
2571 " feeds queue pad %" GST_PTR_FORMAT, res, q_sink);
2573 splitmux->contexts = g_list_append (splitmux->contexts, ctx);
2578 splitmux->have_video = TRUE;
2580 gst_pad_set_active (res, TRUE);
2581 gst_element_add_pad (element, res);
2583 GST_SPLITMUX_UNLOCK (splitmux);
2587 GST_SPLITMUX_UNLOCK (splitmux);
2590 gst_object_unref (q_sink);
2592 gst_object_unref (q_src);
2595 GST_DEBUG_OBJECT (splitmux, "video sink pad already requested");
2596 GST_SPLITMUX_UNLOCK (splitmux);
2601 gst_splitmux_sink_release_pad (GstElement * element, GstPad * pad)
2603 GstSplitMuxSink *splitmux = (GstSplitMuxSink *) element;
2604 GstPad *muxpad = NULL;
2606 (MqStreamCtx *) (g_object_get_qdata ((GObject *) (pad), PAD_CONTEXT));
2608 GST_SPLITMUX_LOCK (splitmux);
2610 if (splitmux->muxer == NULL)
2611 goto fail; /* Elements don't exist yet - nothing to release */
2613 GST_INFO_OBJECT (pad, "releasing request pad");
2615 muxpad = gst_pad_get_peer (ctx->srcpad);
2617 /* Remove the context from our consideration */
2618 splitmux->contexts = g_list_remove (splitmux->contexts, ctx);
2620 if (ctx->sink_pad_block_id)
2621 gst_pad_remove_probe (ctx->sinkpad, ctx->sink_pad_block_id);
2623 if (ctx->src_pad_block_id)
2624 gst_pad_remove_probe (ctx->srcpad, ctx->src_pad_block_id);
2626 /* Can release the context now */
2627 mq_stream_ctx_free (ctx);
2628 if (ctx == splitmux->reference_ctx)
2629 splitmux->reference_ctx = NULL;
2631 /* Release and free the muxer input */
2633 gst_element_release_request_pad (splitmux->muxer, muxpad);
2634 gst_object_unref (muxpad);
2637 if (GST_PAD_PAD_TEMPLATE (pad) &&
2638 g_str_equal (GST_PAD_TEMPLATE_NAME_TEMPLATE (GST_PAD_PAD_TEMPLATE
2640 splitmux->have_video = FALSE;
2642 gst_element_remove_pad (element, pad);
2644 /* Reset the internal elements only after all request pads are released */
2645 if (splitmux->contexts == NULL)
2646 gst_splitmux_reset (splitmux);
2649 GST_SPLITMUX_UNLOCK (splitmux);
2653 create_element (GstSplitMuxSink * splitmux,
2654 const gchar * factory, const gchar * name, gboolean locked)
2656 GstElement *ret = gst_element_factory_make (factory, name);
2658 g_warning ("Failed to create %s - splitmuxsink will not work", name);
2663 /* Ensure the sink starts in locked state and NULL - it will be changed
2664 * by the filename setting code */
2665 gst_element_set_locked_state (ret, TRUE);
2666 gst_element_set_state (ret, GST_STATE_NULL);
2669 if (!gst_bin_add (GST_BIN (splitmux), ret)) {
2670 g_warning ("Could not add %s element - splitmuxsink will not work", name);
2671 gst_object_unref (ret);
2679 create_muxer (GstSplitMuxSink * splitmux)
2681 /* Create internal elements */
2682 if (splitmux->muxer == NULL) {
2683 GstElement *provided_muxer = NULL;
2685 GST_OBJECT_LOCK (splitmux);
2686 if (splitmux->provided_muxer != NULL)
2687 provided_muxer = gst_object_ref (splitmux->provided_muxer);
2688 GST_OBJECT_UNLOCK (splitmux);
2690 if ((!splitmux->async_finalize && provided_muxer == NULL) ||
2691 (splitmux->async_finalize && splitmux->muxer_factory == NULL)) {
2692 if ((splitmux->muxer =
2693 create_element (splitmux, DEFAULT_MUXER, "muxer", FALSE)) == NULL)
2695 } else if (splitmux->async_finalize) {
2696 if ((splitmux->muxer =
2697 create_element (splitmux, splitmux->muxer_factory, "muxer",
2700 if (splitmux->muxer_properties)
2701 gst_structure_foreach (splitmux->muxer_properties,
2702 _set_property_from_structure, splitmux->muxer);
2704 /* Ensure it's not in locked state (we might be reusing an old element) */
2705 gst_element_set_locked_state (provided_muxer, FALSE);
2706 if (!gst_bin_add (GST_BIN (splitmux), provided_muxer)) {
2707 g_warning ("Could not add muxer element - splitmuxsink will not work");
2708 gst_object_unref (provided_muxer);
2712 splitmux->muxer = provided_muxer;
2713 gst_object_unref (provided_muxer);
2716 if (splitmux->use_robust_muxing) {
2717 update_muxer_properties (splitmux);
2727 find_sink (GstElement * e)
2729 GstElement *res = NULL;
2731 gboolean done = FALSE;
2732 GValue data = { 0, };
2734 if (!GST_IS_BIN (e))
2737 if (g_object_class_find_property (G_OBJECT_GET_CLASS (e), "location") != NULL)
2740 iter = gst_bin_iterate_sinks (GST_BIN (e));
2742 switch (gst_iterator_next (iter, &data)) {
2743 case GST_ITERATOR_OK:
2745 GstElement *child = g_value_get_object (&data);
2746 if (g_object_class_find_property (G_OBJECT_GET_CLASS (child),
2747 "location") != NULL) {
2751 g_value_reset (&data);
2754 case GST_ITERATOR_RESYNC:
2755 gst_iterator_resync (iter);
2757 case GST_ITERATOR_DONE:
2760 case GST_ITERATOR_ERROR:
2761 g_assert_not_reached ();
2765 g_value_unset (&data);
2766 gst_iterator_free (iter);
2772 create_sink (GstSplitMuxSink * splitmux)
2774 GstElement *provided_sink = NULL;
2776 if (splitmux->active_sink == NULL) {
2778 GST_OBJECT_LOCK (splitmux);
2779 if (splitmux->provided_sink != NULL)
2780 provided_sink = gst_object_ref (splitmux->provided_sink);
2781 GST_OBJECT_UNLOCK (splitmux);
2783 if ((!splitmux->async_finalize && provided_sink == NULL) ||
2784 (splitmux->async_finalize && splitmux->sink_factory == NULL)) {
2785 if ((splitmux->sink =
2786 create_element (splitmux, DEFAULT_SINK, "sink", TRUE)) == NULL)
2788 splitmux->active_sink = splitmux->sink;
2789 } else if (splitmux->async_finalize) {
2790 if ((splitmux->sink =
2791 create_element (splitmux, splitmux->sink_factory, "sink",
2794 if (splitmux->sink_properties)
2795 gst_structure_foreach (splitmux->sink_properties,
2796 _set_property_from_structure, splitmux->sink);
2797 splitmux->active_sink = splitmux->sink;
2799 /* Ensure the sink starts in locked state and NULL - it will be changed
2800 * by the filename setting code */
2801 gst_element_set_locked_state (provided_sink, TRUE);
2802 gst_element_set_state (provided_sink, GST_STATE_NULL);
2803 if (!gst_bin_add (GST_BIN (splitmux), provided_sink)) {
2804 g_warning ("Could not add sink elements - splitmuxsink will not work");
2805 gst_object_unref (provided_sink);
2809 splitmux->active_sink = provided_sink;
2811 /* The bin holds a ref now, we can drop our tmp ref */
2812 gst_object_unref (provided_sink);
2814 /* Find the sink element */
2815 splitmux->sink = find_sink (splitmux->active_sink);
2816 if (splitmux->sink == NULL) {
2818 ("Could not locate sink element in provided sink - splitmuxsink will not work");
2824 if (g_object_class_find_property (G_OBJECT_GET_CLASS (splitmux->sink),
2826 /* async child elements are causing state change races and weird
2827 * failures, so let's try and turn that off */
2828 g_object_set (splitmux->sink, "async", FALSE, NULL);
2832 if (!gst_element_link (splitmux->muxer, splitmux->active_sink)) {
2833 g_warning ("Failed to link muxer and sink- splitmuxsink will not work");
2844 #pragma GCC diagnostic ignored "-Wformat-nonliteral"
2847 set_next_filename (GstSplitMuxSink * splitmux, MqStreamCtx * ctx)
2849 gchar *fname = NULL;
2853 gst_splitmux_sink_ensure_max_files (splitmux);
2855 if (ctx->cur_out_buffer == NULL) {
2856 GST_WARNING_OBJECT (splitmux, "Starting next file without buffer");
2859 caps = gst_pad_get_current_caps (ctx->srcpad);
2860 sample = gst_sample_new (ctx->cur_out_buffer, caps, &ctx->out_segment, NULL);
2861 g_signal_emit (splitmux, signals[SIGNAL_FORMAT_LOCATION_FULL], 0,
2862 splitmux->fragment_id, sample, &fname);
2863 gst_sample_unref (sample);
2865 gst_caps_unref (caps);
2867 if (fname == NULL) {
2868 /* Fallback to the old signal if the new one returned nothing */
2869 g_signal_emit (splitmux, signals[SIGNAL_FORMAT_LOCATION], 0,
2870 splitmux->fragment_id, &fname);
2874 fname = splitmux->location ?
2875 g_strdup_printf (splitmux->location, splitmux->fragment_id) : NULL;
2878 GST_INFO_OBJECT (splitmux, "Setting file to %s", fname);
2879 g_object_set (splitmux->sink, "location", fname, NULL);
2882 splitmux->fragment_id++;
2887 do_async_start (GstSplitMuxSink * splitmux)
2889 GstMessage *message;
2891 if (!splitmux->need_async_start) {
2892 GST_INFO_OBJECT (splitmux, "no async_start needed");
2896 splitmux->async_pending = TRUE;
2898 GST_INFO_OBJECT (splitmux, "Sending async_start message");
2899 message = gst_message_new_async_start (GST_OBJECT_CAST (splitmux));
2900 GST_BIN_CLASS (parent_class)->handle_message (GST_BIN_CAST
2901 (splitmux), message);
2905 do_async_done (GstSplitMuxSink * splitmux)
2907 GstMessage *message;
2909 if (splitmux->async_pending) {
2910 GST_INFO_OBJECT (splitmux, "Sending async_done message");
2912 gst_message_new_async_done (GST_OBJECT_CAST (splitmux),
2913 GST_CLOCK_TIME_NONE);
2914 GST_BIN_CLASS (parent_class)->handle_message (GST_BIN_CAST
2915 (splitmux), message);
2917 splitmux->async_pending = FALSE;
2920 splitmux->need_async_start = FALSE;
2923 static GstStateChangeReturn
2924 gst_splitmux_sink_change_state (GstElement * element, GstStateChange transition)
2926 GstStateChangeReturn ret;
2927 GstSplitMuxSink *splitmux = (GstSplitMuxSink *) element;
2929 switch (transition) {
2930 case GST_STATE_CHANGE_NULL_TO_READY:{
2931 GST_SPLITMUX_LOCK (splitmux);
2932 if (!create_muxer (splitmux) || !create_sink (splitmux)) {
2933 ret = GST_STATE_CHANGE_FAILURE;
2934 GST_SPLITMUX_UNLOCK (splitmux);
2937 g_signal_emit (splitmux, signals[SIGNAL_MUXER_ADDED], 0, splitmux->muxer);
2938 g_signal_emit (splitmux, signals[SIGNAL_SINK_ADDED], 0, splitmux->sink);
2939 GST_SPLITMUX_UNLOCK (splitmux);
2940 splitmux->fragment_id = 0;
2943 case GST_STATE_CHANGE_READY_TO_PAUSED:{
2944 GST_SPLITMUX_LOCK (splitmux);
2945 /* Start by collecting one input on each pad */
2946 splitmux->input_state = SPLITMUX_INPUT_STATE_COLLECTING_GOP_START;
2947 splitmux->output_state = SPLITMUX_OUTPUT_STATE_START_NEXT_FILE;
2948 splitmux->max_in_running_time = GST_CLOCK_STIME_NONE;
2949 splitmux->gop_start_time = splitmux->fragment_start_time =
2950 GST_CLOCK_STIME_NONE;
2951 splitmux->muxed_out_bytes = 0;
2952 splitmux->ready_for_output = FALSE;
2953 GST_SPLITMUX_UNLOCK (splitmux);
2956 case GST_STATE_CHANGE_PAUSED_TO_READY:
2957 g_atomic_int_set (&(splitmux->split_now), FALSE);
2958 case GST_STATE_CHANGE_READY_TO_NULL:
2959 GST_SPLITMUX_LOCK (splitmux);
2960 splitmux->output_state = SPLITMUX_OUTPUT_STATE_STOPPED;
2961 splitmux->input_state = SPLITMUX_INPUT_STATE_STOPPED;
2962 /* Wake up any blocked threads */
2963 GST_LOG_OBJECT (splitmux,
2964 "State change -> NULL or READY. Waking threads");
2965 GST_SPLITMUX_BROADCAST_INPUT (splitmux);
2966 GST_SPLITMUX_BROADCAST_OUTPUT (splitmux);
2967 GST_SPLITMUX_UNLOCK (splitmux);
2973 ret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
2974 if (ret == GST_STATE_CHANGE_FAILURE)
2977 switch (transition) {
2978 case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
2979 splitmux->need_async_start = TRUE;
2981 case GST_STATE_CHANGE_READY_TO_PAUSED:{
2982 /* Change state async, because our child sink might not
2983 * be ready to do that for us yet if it's state is still locked */
2985 splitmux->need_async_start = TRUE;
2986 /* we want to go async to PAUSED until we managed to configure and add the
2988 GST_SPLITMUX_LOCK (splitmux);
2989 do_async_start (splitmux);
2990 GST_SPLITMUX_UNLOCK (splitmux);
2991 ret = GST_STATE_CHANGE_ASYNC;
2994 case GST_STATE_CHANGE_READY_TO_NULL:
2995 GST_SPLITMUX_LOCK (splitmux);
2996 splitmux->fragment_id = 0;
2997 /* Reset internal elements only if no pad contexts are using them */
2998 if (splitmux->contexts == NULL)
2999 gst_splitmux_reset (splitmux);
3000 do_async_done (splitmux);
3001 GST_SPLITMUX_UNLOCK (splitmux);
3008 if (transition == GST_STATE_CHANGE_NULL_TO_READY &&
3009 ret == GST_STATE_CHANGE_FAILURE) {
3010 /* Cleanup elements on failed transition out of NULL */
3011 gst_splitmux_reset (splitmux);
3012 GST_SPLITMUX_LOCK (splitmux);
3013 do_async_done (splitmux);
3014 GST_SPLITMUX_UNLOCK (splitmux);
3020 register_splitmuxsink (GstPlugin * plugin)
3022 GST_DEBUG_CATEGORY_INIT (splitmux_debug, "splitmuxsink", 0,
3023 "Split File Muxing Sink");
3025 return gst_element_register (plugin, "splitmuxsink", GST_RANK_NONE,
3026 GST_TYPE_SPLITMUX_SINK);
3030 gst_splitmux_sink_ensure_max_files (GstSplitMuxSink * splitmux)
3032 if (splitmux->max_files && splitmux->fragment_id >= splitmux->max_files) {
3033 splitmux->fragment_id = 0;
3038 split_now (GstSplitMuxSink * splitmux)
3040 g_atomic_int_set (&(splitmux->split_now), TRUE);