1 /* GStreamer Muxer bin that splits output stream by size/time
2 * Copyright (C) <2014> Jan Schmidt <jan@centricular.com>
4 * This library is free software; you can redistribute it and/or
5 * modify it under the terms of the GNU Library General Public
6 * License as published by the Free Software Foundation; either
7 * version 2 of the License, or (at your option) any later version.
9 * This library is distributed in the hope that it will be useful,
10 * but WITHOUT ANY WARRANTY; without even the implied warranty of
11 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
12 * Library General Public License for more details.
14 * You should have received a copy of the GNU Library General Public
15 * License along with this library; if not, write to the
16 * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
17 * Boston, MA 02110-1301, USA.
21 * SECTION:element-splitmuxsink
22 * @short_description: Muxer wrapper for splitting output stream by size or time
24 * This element wraps a muxer and a sink, and starts a new file when the mux
25 * contents are about to cross a threshold of maximum size or maximum time,
26 * splitting at video keyframe boundaries. Exactly one input video stream
27 * can be muxed, with as many accompanying audio and subtitle streams as
30 * By default, it uses mp4mux and filesink, but they can be changed via
31 * the 'muxer' and 'sink' properties.
33 * The minimum file size is 1 GOP, however - so limits may be overrun if the
34 * distance between any 2 keyframes is larger than the limits.
36 * If a video stream is available, the splitting process is driven by the video
37 * stream contents, and the video stream must contain closed GOPs for the output
38 * file parts to be played individually correctly. In the absence of a video
39 * stream, the first available stream is used as reference for synchronization.
41 * In the async-finalize mode, when the threshold is crossed, the old muxer
42 * and sink are disconnected from the pipeline and left to finish the file
43 * asynchronously, and a new muxer and sink are created to continue with the
44 * next fragment. For that reason, instead of muxer and sink objects, the
45 * muxer-factory and sink-factory properties are used to construct the new
46 * objects, together with muxer-properties and sink-properties.
49 * <title>Example pipelines</title>
51 * gst-launch-1.0 -e v4l2src num-buffers=500 ! video/x-raw,width=320,height=240 ! videoconvert ! queue ! timeoverlay ! x264enc key-int-max=10 ! h264parse ! splitmuxsink location=video%02d.mov max-size-time=10000000000 max-size-bytes=1000000
53 * Records a video stream captured from a v4l2 device and muxes it into
54 * ISO mp4 files, splitting as needed to limit size/duration to 10 seconds
55 * and 1MB maximum size.
58 * gst-launch-1.0 -e v4l2src num-buffers=500 ! video/x-raw,width=320,height=240 ! videoconvert ! queue ! timeoverlay ! x264enc key-int-max=10 ! h264parse ! splitmuxsink location=video%02d.mkv max-size-time=10000000000 muxer-factory=matroskamux muxer-properties="properties,streamable=true"
60 * Records a video stream captured from a v4l2 device and muxes it into
61 * streamable Matroska files, splitting as needed to limit size/duration to 10
62 * seconds. Each file will finalize asynchronously.
71 #include <glib/gstdio.h>
72 #include <gst/video/video.h>
73 #include "gstsplitmuxsink.h"
75 GST_DEBUG_CATEGORY_STATIC (splitmux_debug);
76 #define GST_CAT_DEFAULT splitmux_debug
78 #define GST_SPLITMUX_LOCK(s) g_mutex_lock(&(s)->lock)
79 #define GST_SPLITMUX_UNLOCK(s) g_mutex_unlock(&(s)->lock)
80 #define GST_SPLITMUX_WAIT_INPUT(s) g_cond_wait (&(s)->input_cond, &(s)->lock)
81 #define GST_SPLITMUX_BROADCAST_INPUT(s) g_cond_broadcast (&(s)->input_cond)
83 #define GST_SPLITMUX_WAIT_OUTPUT(s) g_cond_wait (&(s)->output_cond, &(s)->lock)
84 #define GST_SPLITMUX_BROADCAST_OUTPUT(s) g_cond_broadcast (&(s)->output_cond)
86 static void split_now (GstSplitMuxSink * splitmux);
87 static void split_after (GstSplitMuxSink * splitmux);
88 static void split_at_running_time (GstSplitMuxSink * splitmux,
89 GstClockTime split_time);
97 PROP_MAX_SIZE_TIMECODE,
98 PROP_SEND_KEYFRAME_REQUESTS,
101 PROP_USE_ROBUST_MUXING,
102 PROP_ALIGNMENT_THRESHOLD,
108 PROP_MUXER_PROPERTIES,
113 #define DEFAULT_MAX_SIZE_TIME 0
114 #define DEFAULT_MAX_SIZE_BYTES 0
115 #define DEFAULT_MAX_FILES 0
116 #define DEFAULT_MUXER_OVERHEAD 0.02
117 #define DEFAULT_SEND_KEYFRAME_REQUESTS FALSE
118 #define DEFAULT_ALIGNMENT_THRESHOLD 0
119 #define DEFAULT_MUXER "mp4mux"
120 #define DEFAULT_SINK "filesink"
121 #define DEFAULT_USE_ROBUST_MUXING FALSE
122 #define DEFAULT_RESET_MUXER TRUE
123 #define DEFAULT_ASYNC_FINALIZE FALSE
125 typedef struct _AsyncEosHelper
133 SIGNAL_FORMAT_LOCATION,
134 SIGNAL_FORMAT_LOCATION_FULL,
137 SIGNAL_SPLIT_AT_RUNNING_TIME,
143 static guint signals[SIGNAL_LAST];
145 static GstStaticPadTemplate video_sink_template =
146 GST_STATIC_PAD_TEMPLATE ("video",
149 GST_STATIC_CAPS_ANY);
150 static GstStaticPadTemplate audio_sink_template =
151 GST_STATIC_PAD_TEMPLATE ("audio_%u",
154 GST_STATIC_CAPS_ANY);
155 static GstStaticPadTemplate subtitle_sink_template =
156 GST_STATIC_PAD_TEMPLATE ("subtitle_%u",
159 GST_STATIC_CAPS_ANY);
160 static GstStaticPadTemplate caption_sink_template =
161 GST_STATIC_PAD_TEMPLATE ("caption_%u",
164 GST_STATIC_CAPS_ANY);
166 static GQuark PAD_CONTEXT;
167 static GQuark EOS_FROM_US;
168 static GQuark RUNNING_TIME;
169 /* EOS_FROM_US is only valid in async-finalize mode. We need to know whether
170 * to forward an incoming EOS message, but we cannot rely on the state of the
171 * splitmux anymore, so we set this qdata on the sink instead.
172 * The muxer and sink must be destroyed after both of these things have
174 * 1) The EOS message has been sent when the fragment is ending
175 * 2) The muxer has been unlinked and relinked
176 * Therefore, EOS_FROM_US can have these two values:
177 * 0: EOS was not requested from us. Forward the message. The muxer and the
178 * sink will be destroyed together with the rest of the bin.
179 * 1: EOS was requested from us, but the other of the two tasks hasn't
180 * finished. Set EOS_FROM_US to 2 and do your stuff.
181 * 2: EOS was requested from us and the other of the two tasks has finished.
182 * Now we can destroy the muxer and the sink.
188 PAD_CONTEXT = g_quark_from_static_string ("pad-context");
189 EOS_FROM_US = g_quark_from_static_string ("eos-from-us");
190 RUNNING_TIME = g_quark_from_static_string ("running-time");
193 #define gst_splitmux_sink_parent_class parent_class
194 G_DEFINE_TYPE_EXTENDED (GstSplitMuxSink, gst_splitmux_sink, GST_TYPE_BIN, 0,
197 static gboolean create_muxer (GstSplitMuxSink * splitmux);
198 static gboolean create_sink (GstSplitMuxSink * splitmux);
199 static void gst_splitmux_sink_set_property (GObject * object, guint prop_id,
200 const GValue * value, GParamSpec * pspec);
201 static void gst_splitmux_sink_get_property (GObject * object, guint prop_id,
202 GValue * value, GParamSpec * pspec);
203 static void gst_splitmux_sink_dispose (GObject * object);
204 static void gst_splitmux_sink_finalize (GObject * object);
206 static GstPad *gst_splitmux_sink_request_new_pad (GstElement * element,
207 GstPadTemplate * templ, const gchar * name, const GstCaps * caps);
208 static void gst_splitmux_sink_release_pad (GstElement * element, GstPad * pad);
210 static GstStateChangeReturn gst_splitmux_sink_change_state (GstElement *
211 element, GstStateChange transition);
213 static void bus_handler (GstBin * bin, GstMessage * msg);
214 static void set_next_filename (GstSplitMuxSink * splitmux, MqStreamCtx * ctx);
215 static void start_next_fragment (GstSplitMuxSink * splitmux, MqStreamCtx * ctx);
216 static void mq_stream_ctx_free (MqStreamCtx * ctx);
217 static void grow_blocked_queues (GstSplitMuxSink * splitmux);
219 static void gst_splitmux_sink_ensure_max_files (GstSplitMuxSink * splitmux);
220 static GstElement *create_element (GstSplitMuxSink * splitmux,
221 const gchar * factory, const gchar * name, gboolean locked);
223 static void do_async_done (GstSplitMuxSink * splitmux);
226 mq_stream_buf_new (void)
228 return g_slice_new0 (MqStreamBuf);
232 mq_stream_buf_free (MqStreamBuf * data)
234 g_slice_free (MqStreamBuf, data);
237 static SplitMuxOutputCommand *
238 out_cmd_buf_new (void)
240 return g_slice_new0 (SplitMuxOutputCommand);
244 out_cmd_buf_free (SplitMuxOutputCommand * data)
246 g_slice_free (SplitMuxOutputCommand, data);
250 gst_splitmux_sink_class_init (GstSplitMuxSinkClass * klass)
252 GObjectClass *gobject_class = (GObjectClass *) klass;
253 GstElementClass *gstelement_class = (GstElementClass *) klass;
254 GstBinClass *gstbin_class = (GstBinClass *) klass;
256 gobject_class->set_property = gst_splitmux_sink_set_property;
257 gobject_class->get_property = gst_splitmux_sink_get_property;
258 gobject_class->dispose = gst_splitmux_sink_dispose;
259 gobject_class->finalize = gst_splitmux_sink_finalize;
261 gst_element_class_set_static_metadata (gstelement_class,
262 "Split Muxing Bin", "Generic/Bin/Muxer",
263 "Convenience bin that muxes incoming streams into multiple time/size limited files",
264 "Jan Schmidt <jan@centricular.com>");
266 gst_element_class_add_static_pad_template (gstelement_class,
267 &video_sink_template);
268 gst_element_class_add_static_pad_template (gstelement_class,
269 &audio_sink_template);
270 gst_element_class_add_static_pad_template (gstelement_class,
271 &subtitle_sink_template);
272 gst_element_class_add_static_pad_template (gstelement_class,
273 &caption_sink_template);
275 gstelement_class->change_state =
276 GST_DEBUG_FUNCPTR (gst_splitmux_sink_change_state);
277 gstelement_class->request_new_pad =
278 GST_DEBUG_FUNCPTR (gst_splitmux_sink_request_new_pad);
279 gstelement_class->release_pad =
280 GST_DEBUG_FUNCPTR (gst_splitmux_sink_release_pad);
282 gstbin_class->handle_message = bus_handler;
284 g_object_class_install_property (gobject_class, PROP_LOCATION,
285 g_param_spec_string ("location", "File Output Pattern",
286 "Format string pattern for the location of the files to write (e.g. video%05d.mp4)",
287 NULL, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
288 g_object_class_install_property (gobject_class, PROP_MUXER_OVERHEAD,
289 g_param_spec_double ("mux-overhead", "Muxing Overhead",
290 "Extra size overhead of muxing (0.02 = 2%)", 0.0, 1.0,
291 DEFAULT_MUXER_OVERHEAD,
292 G_PARAM_READWRITE | GST_PARAM_CONTROLLABLE | G_PARAM_STATIC_STRINGS));
294 g_object_class_install_property (gobject_class, PROP_MAX_SIZE_TIME,
295 g_param_spec_uint64 ("max-size-time", "Max. size (ns)",
296 "Max. amount of time per file (in ns, 0=disable)", 0, G_MAXUINT64,
297 DEFAULT_MAX_SIZE_TIME, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
298 g_object_class_install_property (gobject_class, PROP_MAX_SIZE_BYTES,
299 g_param_spec_uint64 ("max-size-bytes", "Max. size bytes",
300 "Max. amount of data per file (in bytes, 0=disable)", 0, G_MAXUINT64,
301 DEFAULT_MAX_SIZE_BYTES, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
302 g_object_class_install_property (gobject_class, PROP_MAX_SIZE_TIMECODE,
303 g_param_spec_string ("max-size-timecode", "Maximum timecode difference",
304 "Maximum difference in timecode between first and last frame. "
305 "Separator is assumed to be \":\" everywhere (e.g. 01:00:00:00). "
306 "Will only be effective if a timecode track is present.",
307 NULL, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
308 g_object_class_install_property (gobject_class, PROP_SEND_KEYFRAME_REQUESTS,
309 g_param_spec_boolean ("send-keyframe-requests",
310 "Request keyframes at max-size-time",
311 "Request a keyframe every max-size-time ns to try splitting at that point. "
312 "Needs max-size-bytes to be 0 in order to be effective.",
313 DEFAULT_SEND_KEYFRAME_REQUESTS,
314 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
315 g_object_class_install_property (gobject_class, PROP_MAX_FILES,
316 g_param_spec_uint ("max-files", "Max files",
317 "Maximum number of files to keep on disk. Once the maximum is reached,"
318 "old files start to be deleted to make room for new ones.", 0,
319 G_MAXUINT, DEFAULT_MAX_FILES,
320 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
321 g_object_class_install_property (gobject_class, PROP_ALIGNMENT_THRESHOLD,
322 g_param_spec_uint64 ("alignment-threshold", "Alignment threshold (ns)",
323 "Allow non-reference streams to be that many ns before the reference"
325 0, G_MAXUINT64, DEFAULT_ALIGNMENT_THRESHOLD,
326 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
328 g_object_class_install_property (gobject_class, PROP_MUXER,
329 g_param_spec_object ("muxer", "Muxer",
330 "The muxer element to use (NULL = default mp4mux). "
331 "Valid only for async-finalize = FALSE",
332 GST_TYPE_ELEMENT, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
333 g_object_class_install_property (gobject_class, PROP_SINK,
334 g_param_spec_object ("sink", "Sink",
335 "The sink element (or element chain) to use (NULL = default filesink). "
336 "Valid only for async-finalize = FALSE",
337 GST_TYPE_ELEMENT, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
339 g_object_class_install_property (gobject_class, PROP_USE_ROBUST_MUXING,
340 g_param_spec_boolean ("use-robust-muxing",
341 "Support robust-muxing mode of some muxers",
342 "Check if muxers support robust muxing via the reserved-max-duration and "
343 "reserved-duration-remaining properties and use them if so. "
344 "(Only present on qtmux and mp4mux for now). splitmuxsink may then also "
345 " create new fragments if the reserved header space is about to overflow. "
346 "Note that for mp4mux and qtmux, reserved-moov-update-period must be set "
347 "manually by the app to a non-zero value for robust muxing to have an effect.",
348 DEFAULT_USE_ROBUST_MUXING,
349 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
351 g_object_class_install_property (gobject_class, PROP_RESET_MUXER,
352 g_param_spec_boolean ("reset-muxer",
354 "Reset the muxer after each segment. Disabling this will not work for most muxers.",
355 DEFAULT_RESET_MUXER, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
357 g_object_class_install_property (gobject_class, PROP_ASYNC_FINALIZE,
358 g_param_spec_boolean ("async-finalize",
359 "Finalize fragments asynchronously",
360 "Finalize each fragment asynchronously and start a new one",
361 DEFAULT_ASYNC_FINALIZE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
362 g_object_class_install_property (gobject_class, PROP_MUXER_FACTORY,
363 g_param_spec_string ("muxer-factory", "Muxer factory",
364 "The muxer element factory to use (default = mp4mux). "
365 "Valid only for async-finalize = TRUE",
366 "mp4mux", G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
367 g_object_class_install_property (gobject_class, PROP_MUXER_PROPERTIES,
368 g_param_spec_boxed ("muxer-properties", "Muxer properties",
369 "The muxer element properties to use. "
370 "Example: {properties,boolean-prop=true,string-prop=\"hi\"}. "
371 "Valid only for async-finalize = TRUE",
372 GST_TYPE_STRUCTURE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
373 g_object_class_install_property (gobject_class, PROP_SINK_FACTORY,
374 g_param_spec_string ("sink-factory", "Sink factory",
375 "The sink element factory to use (default = filesink). "
376 "Valid only for async-finalize = TRUE",
377 "filesink", G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
378 g_object_class_install_property (gobject_class, PROP_SINK_PROPERTIES,
379 g_param_spec_boxed ("sink-properties", "Sink properties",
380 "The sink element properties to use. "
381 "Example: {properties,boolean-prop=true,string-prop=\"hi\"}. "
382 "Valid only for async-finalize = TRUE",
383 GST_TYPE_STRUCTURE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
386 * GstSplitMuxSink::format-location:
387 * @splitmux: the #GstSplitMuxSink
388 * @fragment_id: the sequence number of the file to be created
390 * Returns: the location to be used for the next output file
392 signals[SIGNAL_FORMAT_LOCATION] =
393 g_signal_new ("format-location", G_TYPE_FROM_CLASS (klass),
394 G_SIGNAL_RUN_LAST, 0, NULL, NULL, NULL, G_TYPE_STRING, 1, G_TYPE_UINT);
397 * GstSplitMuxSink::format-location-full:
398 * @splitmux: the #GstSplitMuxSink
399 * @fragment_id: the sequence number of the file to be created
400 * @first_sample: A #GstSample containing the first buffer
401 * from the reference stream in the new file
403 * Returns: the location to be used for the next output file
405 signals[SIGNAL_FORMAT_LOCATION_FULL] =
406 g_signal_new ("format-location-full", G_TYPE_FROM_CLASS (klass),
407 G_SIGNAL_RUN_LAST, 0, NULL, NULL, NULL, G_TYPE_STRING, 2, G_TYPE_UINT,
411 * GstSplitMuxSink::split-now:
412 * @splitmux: the #GstSplitMuxSink
414 * When called by the user, this action signal splits the video file (and begins a new one) immediately.
415 * The current GOP will be output to the new file.
419 signals[SIGNAL_SPLIT_NOW] =
420 g_signal_new ("split-now", G_TYPE_FROM_CLASS (klass),
421 G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstSplitMuxSinkClass,
422 split_now), NULL, NULL, NULL, G_TYPE_NONE, 0);
425 * GstSplitMuxSink::split-after:
426 * @splitmux: the #GstSplitMuxSink
428 * When called by the user, this action signal splits the video file (and begins a new one) immediately.
429 * The current GOP will be output to the old file.
433 signals[SIGNAL_SPLIT_AFTER] =
434 g_signal_new ("split-after", G_TYPE_FROM_CLASS (klass),
435 G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstSplitMuxSinkClass,
436 split_after), NULL, NULL, NULL, G_TYPE_NONE, 0);
439 * GstSplitMuxSink::split-at-running-time:
440 * @splitmux: the #GstSplitMuxSink
442 * When called by the user, this action signal splits the video file (and
443 * begins a new one) as soon as the given running time is reached. If this
444 * action signal is called multiple times, running times are queued up and
445 * processed in the order they were given.
447 * Note that this is prone to race conditions, where said running time is
448 * reached and surpassed before we had a chance to split. The file will
449 * still split immediately, but in order to make sure that the split doesn't
450 * happen too late, it is recommended to call this action signal from
451 * something that will prevent further buffers from flowing into
452 * splitmuxsink before the split is completed, such as a pad probe before
458 signals[SIGNAL_SPLIT_AT_RUNNING_TIME] =
459 g_signal_new ("split-at-running-time", G_TYPE_FROM_CLASS (klass),
460 G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstSplitMuxSinkClass,
461 split_at_running_time), NULL, NULL, NULL, G_TYPE_NONE, 1,
465 * GstSplitMuxSink::muxer-added:
466 * @splitmux: the #GstSplitMuxSink
467 * @muxer: the newly added muxer element
471 signals[SIGNAL_MUXER_ADDED] =
472 g_signal_new ("muxer-added", G_TYPE_FROM_CLASS (klass),
473 G_SIGNAL_RUN_LAST, 0, NULL, NULL, NULL, G_TYPE_NONE, 1, GST_TYPE_ELEMENT);
476 * GstSplitMuxSink::sink-added:
477 * @splitmux: the #GstSplitMuxSink
478 * @sink: the newly added sink element
482 signals[SIGNAL_SINK_ADDED] =
483 g_signal_new ("sink-added", G_TYPE_FROM_CLASS (klass),
484 G_SIGNAL_RUN_LAST, 0, NULL, NULL, NULL, G_TYPE_NONE, 1, GST_TYPE_ELEMENT);
486 klass->split_now = split_now;
487 klass->split_after = split_after;
488 klass->split_at_running_time = split_at_running_time;
492 gst_splitmux_sink_init (GstSplitMuxSink * splitmux)
494 g_mutex_init (&splitmux->lock);
495 g_cond_init (&splitmux->input_cond);
496 g_cond_init (&splitmux->output_cond);
497 g_queue_init (&splitmux->out_cmd_q);
499 splitmux->mux_overhead = DEFAULT_MUXER_OVERHEAD;
500 splitmux->threshold_time = DEFAULT_MAX_SIZE_TIME;
501 splitmux->threshold_bytes = DEFAULT_MAX_SIZE_BYTES;
502 splitmux->max_files = DEFAULT_MAX_FILES;
503 splitmux->send_keyframe_requests = DEFAULT_SEND_KEYFRAME_REQUESTS;
504 splitmux->next_max_tc_time = GST_CLOCK_TIME_NONE;
505 splitmux->alignment_threshold = DEFAULT_ALIGNMENT_THRESHOLD;
506 splitmux->use_robust_muxing = DEFAULT_USE_ROBUST_MUXING;
507 splitmux->reset_muxer = DEFAULT_RESET_MUXER;
509 splitmux->threshold_timecode_str = NULL;
511 splitmux->async_finalize = DEFAULT_ASYNC_FINALIZE;
512 splitmux->muxer_factory = g_strdup (DEFAULT_MUXER);
513 splitmux->muxer_properties = NULL;
514 splitmux->sink_factory = g_strdup (DEFAULT_SINK);
515 splitmux->sink_properties = NULL;
517 GST_OBJECT_FLAG_SET (splitmux, GST_ELEMENT_FLAG_SINK);
518 splitmux->split_requested = FALSE;
519 splitmux->do_split_next_gop = FALSE;
520 splitmux->times_to_split = gst_queue_array_new_for_struct (8, 8);
524 gst_splitmux_reset (GstSplitMuxSink * splitmux)
526 if (splitmux->muxer) {
527 gst_element_set_locked_state (splitmux->muxer, TRUE);
528 gst_element_set_state (splitmux->muxer, GST_STATE_NULL);
529 gst_bin_remove (GST_BIN (splitmux), splitmux->muxer);
531 if (splitmux->active_sink) {
532 gst_element_set_locked_state (splitmux->active_sink, TRUE);
533 gst_element_set_state (splitmux->active_sink, GST_STATE_NULL);
534 gst_bin_remove (GST_BIN (splitmux), splitmux->active_sink);
537 splitmux->sink = splitmux->active_sink = splitmux->muxer = NULL;
541 gst_splitmux_sink_dispose (GObject * object)
543 GstSplitMuxSink *splitmux = GST_SPLITMUX_SINK (object);
545 /* Calling parent dispose invalidates all child pointers */
546 splitmux->sink = splitmux->active_sink = splitmux->muxer = NULL;
548 G_OBJECT_CLASS (parent_class)->dispose (object);
552 gst_splitmux_sink_finalize (GObject * object)
554 GstSplitMuxSink *splitmux = GST_SPLITMUX_SINK (object);
555 g_cond_clear (&splitmux->input_cond);
556 g_cond_clear (&splitmux->output_cond);
557 g_mutex_clear (&splitmux->lock);
558 g_queue_foreach (&splitmux->out_cmd_q, (GFunc) out_cmd_buf_free, NULL);
559 g_queue_clear (&splitmux->out_cmd_q);
561 if (splitmux->provided_sink)
562 gst_object_unref (splitmux->provided_sink);
563 if (splitmux->provided_muxer)
564 gst_object_unref (splitmux->provided_muxer);
566 if (splitmux->muxer_factory)
567 g_free (splitmux->muxer_factory);
568 if (splitmux->muxer_properties)
569 gst_structure_free (splitmux->muxer_properties);
570 if (splitmux->sink_factory)
571 g_free (splitmux->sink_factory);
572 if (splitmux->sink_properties)
573 gst_structure_free (splitmux->sink_properties);
575 if (splitmux->threshold_timecode_str)
576 g_free (splitmux->threshold_timecode_str);
578 if (splitmux->times_to_split)
579 gst_queue_array_free (splitmux->times_to_split);
581 g_free (splitmux->location);
583 /* Make sure to free any un-released contexts. There should not be any,
584 * because the dispose will have freed all request pads though */
585 g_list_foreach (splitmux->contexts, (GFunc) mq_stream_ctx_free, NULL);
586 g_list_free (splitmux->contexts);
588 G_OBJECT_CLASS (parent_class)->finalize (object);
592 * Set any time threshold to the muxer, if it has
593 * reserved-max-duration and reserved-duration-remaining
594 * properties. Called when creating/claiming the muxer
595 * in create_elements() */
597 update_muxer_properties (GstSplitMuxSink * sink)
600 GstClockTime threshold_time;
602 sink->muxer_has_reserved_props = FALSE;
603 if (sink->muxer == NULL)
605 klass = G_OBJECT_GET_CLASS (sink->muxer);
606 if (g_object_class_find_property (klass, "reserved-max-duration") == NULL)
608 if (g_object_class_find_property (klass,
609 "reserved-duration-remaining") == NULL)
611 sink->muxer_has_reserved_props = TRUE;
613 GST_LOG_OBJECT (sink, "Setting muxer reserved time to %" GST_TIME_FORMAT,
614 GST_TIME_ARGS (sink->threshold_time));
615 GST_OBJECT_LOCK (sink);
616 threshold_time = sink->threshold_time;
617 GST_OBJECT_UNLOCK (sink);
619 if (threshold_time > 0) {
620 /* Tell the muxer how much space to reserve */
621 GstClockTime muxer_threshold = threshold_time;
622 g_object_set (sink->muxer, "reserved-max-duration", muxer_threshold, NULL);
627 gst_splitmux_sink_set_property (GObject * object, guint prop_id,
628 const GValue * value, GParamSpec * pspec)
630 GstSplitMuxSink *splitmux = GST_SPLITMUX_SINK (object);
634 GST_OBJECT_LOCK (splitmux);
635 g_free (splitmux->location);
636 splitmux->location = g_value_dup_string (value);
637 GST_OBJECT_UNLOCK (splitmux);
640 case PROP_MAX_SIZE_BYTES:
641 GST_OBJECT_LOCK (splitmux);
642 splitmux->threshold_bytes = g_value_get_uint64 (value);
643 GST_OBJECT_UNLOCK (splitmux);
645 case PROP_MAX_SIZE_TIME:
646 GST_OBJECT_LOCK (splitmux);
647 splitmux->threshold_time = g_value_get_uint64 (value);
648 GST_OBJECT_UNLOCK (splitmux);
650 case PROP_MAX_SIZE_TIMECODE:
651 GST_OBJECT_LOCK (splitmux);
652 splitmux->threshold_timecode_str = g_value_dup_string (value);
653 GST_OBJECT_UNLOCK (splitmux);
655 case PROP_SEND_KEYFRAME_REQUESTS:
656 GST_OBJECT_LOCK (splitmux);
657 splitmux->send_keyframe_requests = g_value_get_boolean (value);
658 GST_OBJECT_UNLOCK (splitmux);
661 GST_OBJECT_LOCK (splitmux);
662 splitmux->max_files = g_value_get_uint (value);
663 GST_OBJECT_UNLOCK (splitmux);
665 case PROP_MUXER_OVERHEAD:
666 GST_OBJECT_LOCK (splitmux);
667 splitmux->mux_overhead = g_value_get_double (value);
668 GST_OBJECT_UNLOCK (splitmux);
670 case PROP_USE_ROBUST_MUXING:
671 GST_OBJECT_LOCK (splitmux);
672 splitmux->use_robust_muxing = g_value_get_boolean (value);
673 GST_OBJECT_UNLOCK (splitmux);
674 if (splitmux->use_robust_muxing)
675 update_muxer_properties (splitmux);
677 case PROP_ALIGNMENT_THRESHOLD:
678 GST_OBJECT_LOCK (splitmux);
679 splitmux->alignment_threshold = g_value_get_uint64 (value);
680 GST_OBJECT_UNLOCK (splitmux);
683 GST_OBJECT_LOCK (splitmux);
684 if (splitmux->provided_sink)
685 gst_object_unref (splitmux->provided_sink);
686 splitmux->provided_sink = g_value_get_object (value);
687 gst_object_ref_sink (splitmux->provided_sink);
688 GST_OBJECT_UNLOCK (splitmux);
691 GST_OBJECT_LOCK (splitmux);
692 if (splitmux->provided_muxer)
693 gst_object_unref (splitmux->provided_muxer);
694 splitmux->provided_muxer = g_value_get_object (value);
695 gst_object_ref_sink (splitmux->provided_muxer);
696 GST_OBJECT_UNLOCK (splitmux);
698 case PROP_RESET_MUXER:
699 GST_OBJECT_LOCK (splitmux);
700 splitmux->reset_muxer = g_value_get_boolean (value);
701 GST_OBJECT_UNLOCK (splitmux);
703 case PROP_ASYNC_FINALIZE:
704 GST_OBJECT_LOCK (splitmux);
705 splitmux->async_finalize = g_value_get_boolean (value);
706 GST_OBJECT_UNLOCK (splitmux);
708 case PROP_MUXER_FACTORY:
709 GST_OBJECT_LOCK (splitmux);
710 if (splitmux->muxer_factory)
711 g_free (splitmux->muxer_factory);
712 splitmux->muxer_factory = g_value_dup_string (value);
713 GST_OBJECT_UNLOCK (splitmux);
715 case PROP_MUXER_PROPERTIES:
716 GST_OBJECT_LOCK (splitmux);
717 if (splitmux->muxer_properties)
718 gst_structure_free (splitmux->muxer_properties);
719 if (gst_value_get_structure (value))
720 splitmux->muxer_properties =
721 gst_structure_copy (gst_value_get_structure (value));
723 splitmux->muxer_properties = NULL;
724 GST_OBJECT_UNLOCK (splitmux);
726 case PROP_SINK_FACTORY:
727 GST_OBJECT_LOCK (splitmux);
728 if (splitmux->sink_factory)
729 g_free (splitmux->sink_factory);
730 splitmux->sink_factory = g_value_dup_string (value);
731 GST_OBJECT_UNLOCK (splitmux);
733 case PROP_SINK_PROPERTIES:
734 GST_OBJECT_LOCK (splitmux);
735 if (splitmux->sink_properties)
736 gst_structure_free (splitmux->sink_properties);
737 if (gst_value_get_structure (value))
738 splitmux->sink_properties =
739 gst_structure_copy (gst_value_get_structure (value));
741 splitmux->sink_properties = NULL;
742 GST_OBJECT_UNLOCK (splitmux);
745 G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
751 gst_splitmux_sink_get_property (GObject * object, guint prop_id,
752 GValue * value, GParamSpec * pspec)
754 GstSplitMuxSink *splitmux = GST_SPLITMUX_SINK (object);
758 GST_OBJECT_LOCK (splitmux);
759 g_value_set_string (value, splitmux->location);
760 GST_OBJECT_UNLOCK (splitmux);
762 case PROP_MAX_SIZE_BYTES:
763 GST_OBJECT_LOCK (splitmux);
764 g_value_set_uint64 (value, splitmux->threshold_bytes);
765 GST_OBJECT_UNLOCK (splitmux);
767 case PROP_MAX_SIZE_TIME:
768 GST_OBJECT_LOCK (splitmux);
769 g_value_set_uint64 (value, splitmux->threshold_time);
770 GST_OBJECT_UNLOCK (splitmux);
772 case PROP_MAX_SIZE_TIMECODE:
773 GST_OBJECT_LOCK (splitmux);
774 g_value_set_string (value, splitmux->threshold_timecode_str);
775 GST_OBJECT_UNLOCK (splitmux);
777 case PROP_SEND_KEYFRAME_REQUESTS:
778 GST_OBJECT_LOCK (splitmux);
779 g_value_set_boolean (value, splitmux->send_keyframe_requests);
780 GST_OBJECT_UNLOCK (splitmux);
783 GST_OBJECT_LOCK (splitmux);
784 g_value_set_uint (value, splitmux->max_files);
785 GST_OBJECT_UNLOCK (splitmux);
787 case PROP_MUXER_OVERHEAD:
788 GST_OBJECT_LOCK (splitmux);
789 g_value_set_double (value, splitmux->mux_overhead);
790 GST_OBJECT_UNLOCK (splitmux);
792 case PROP_USE_ROBUST_MUXING:
793 GST_OBJECT_LOCK (splitmux);
794 g_value_set_boolean (value, splitmux->use_robust_muxing);
795 GST_OBJECT_UNLOCK (splitmux);
797 case PROP_ALIGNMENT_THRESHOLD:
798 GST_OBJECT_LOCK (splitmux);
799 g_value_set_uint64 (value, splitmux->alignment_threshold);
800 GST_OBJECT_UNLOCK (splitmux);
803 GST_OBJECT_LOCK (splitmux);
804 g_value_set_object (value, splitmux->provided_sink);
805 GST_OBJECT_UNLOCK (splitmux);
808 GST_OBJECT_LOCK (splitmux);
809 g_value_set_object (value, splitmux->provided_muxer);
810 GST_OBJECT_UNLOCK (splitmux);
812 case PROP_RESET_MUXER:
813 GST_OBJECT_LOCK (splitmux);
814 g_value_set_boolean (value, splitmux->reset_muxer);
815 GST_OBJECT_UNLOCK (splitmux);
817 case PROP_ASYNC_FINALIZE:
818 GST_OBJECT_LOCK (splitmux);
819 g_value_set_boolean (value, splitmux->async_finalize);
820 GST_OBJECT_UNLOCK (splitmux);
822 case PROP_MUXER_FACTORY:
823 GST_OBJECT_LOCK (splitmux);
824 g_value_set_string (value, splitmux->muxer_factory);
825 GST_OBJECT_UNLOCK (splitmux);
827 case PROP_MUXER_PROPERTIES:
828 GST_OBJECT_LOCK (splitmux);
829 gst_value_set_structure (value, splitmux->muxer_properties);
830 GST_OBJECT_UNLOCK (splitmux);
832 case PROP_SINK_FACTORY:
833 GST_OBJECT_LOCK (splitmux);
834 g_value_set_string (value, splitmux->sink_factory);
835 GST_OBJECT_UNLOCK (splitmux);
837 case PROP_SINK_PROPERTIES:
838 GST_OBJECT_LOCK (splitmux);
839 gst_value_set_structure (value, splitmux->sink_properties);
840 GST_OBJECT_UNLOCK (splitmux);
843 G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
848 /* Convenience function */
849 static inline GstClockTimeDiff
850 my_segment_to_running_time (GstSegment * segment, GstClockTime val)
852 GstClockTimeDiff res = GST_CLOCK_STIME_NONE;
854 if (GST_CLOCK_TIME_IS_VALID (val)) {
856 gst_segment_to_running_time_full (segment, GST_FORMAT_TIME, val, &val);
866 mq_stream_ctx_new (GstSplitMuxSink * splitmux)
870 ctx = g_new0 (MqStreamCtx, 1);
871 ctx->splitmux = splitmux;
872 gst_segment_init (&ctx->in_segment, GST_FORMAT_UNDEFINED);
873 gst_segment_init (&ctx->out_segment, GST_FORMAT_UNDEFINED);
874 ctx->in_running_time = ctx->out_running_time = GST_CLOCK_STIME_NONE;
875 g_queue_init (&ctx->queued_bufs);
880 mq_stream_ctx_free (MqStreamCtx * ctx)
883 GstObject *parent = gst_object_get_parent (GST_OBJECT (ctx->q));
885 g_signal_handler_disconnect (ctx->q, ctx->q_overrun_id);
887 if (parent == GST_OBJECT_CAST (ctx->splitmux)) {
888 gst_element_set_locked_state (ctx->q, TRUE);
889 gst_element_set_state (ctx->q, GST_STATE_NULL);
890 gst_bin_remove (GST_BIN (ctx->splitmux), ctx->q);
891 gst_object_unref (parent);
893 gst_object_unref (ctx->q);
895 gst_buffer_replace (&ctx->prev_in_keyframe, NULL);
896 gst_object_unref (ctx->sinkpad);
897 gst_object_unref (ctx->srcpad);
898 g_queue_foreach (&ctx->queued_bufs, (GFunc) mq_stream_buf_free, NULL);
899 g_queue_clear (&ctx->queued_bufs);
/* send_fragment_opened_closed_msg:
 * @splitmux: element that posts the message
 * @opened: TRUE selects the "splitmuxsink-fragment-opened" message name,
 *   FALSE selects "splitmuxsink-fragment-closed"
 *
 * Posts an element message carrying the "location" property read from the
 * sink and a "running-time" field. The running time starts as the reference
 * context's out_running_time and may be replaced by the value stored under
 * the RUNNING_TIME qdata of the sink. */
904 send_fragment_opened_closed_msg (GstSplitMuxSink * splitmux, gboolean opened,
907 gchar *location = NULL;
909 const gchar *msg_name = opened ?
910 "splitmuxsink-fragment-opened" : "splitmuxsink-fragment-closed";
/* NOTE(review): splitmux->reference_ctx is dereferenced here
 * unconditionally, although the message below is only built when
 * reference_ctx is non-NULL - confirm this cannot be reached during
 * teardown with a NULL reference_ctx. */
911 GstClockTime running_time = splitmux->reference_ctx->out_running_time;
914 GstClockTime *rtime = g_object_get_qdata (G_OBJECT (sink), RUNNING_TIME);
916 running_time = *rtime;
919 g_object_get (sink, "location", &location, NULL);
921 /* If it's in the middle of a teardown, the reference_ctc might have become
923 if (splitmux->reference_ctx) {
924 msg = gst_message_new_element (GST_OBJECT (splitmux),
925 gst_structure_new (msg_name,
926 "location", G_TYPE_STRING, location,
927 "running-time", GST_TYPE_CLOCK_TIME, running_time, NULL));
928 gst_element_post_message (GST_ELEMENT_CAST (splitmux), msg);
/* send_eos_async:
 * @splitmux: the element
 * @helper: AsyncEosHelper prepared by eos_context_async()
 *
 * Callback scheduled through gst_element_call_async() by
 * eos_context_async(). Creates an EOS event; the peer of ctx->srcpad is
 * looked up while the splitmux lock is held, and the event is sent only
 * after the lock has been released. The pad reference is dropped once the
 * event has been sent. */
935 send_eos_async (GstSplitMuxSink * splitmux, AsyncEosHelper * helper)
941 eos = gst_event_new_eos ();
945 GST_SPLITMUX_LOCK (splitmux);
947 pad = gst_pad_get_peer (ctx->srcpad);
948 GST_SPLITMUX_UNLOCK (splitmux);
/* The splitmux lock is not held while the event travels downstream */
950 gst_pad_send_event (pad, eos);
951 GST_INFO_OBJECT (splitmux, "Sent async EOS on %" GST_PTR_FORMAT, pad);
953 gst_object_unref (pad);
/* send_eos:
 * @splitmux: the element; its lock is held on entry and on return
 * @ctx: context whose srcpad peer receives the EOS event
 *
 * Creates an EOS event and fetches the peer of ctx->srcpad. The splitmux
 * lock is released only around gst_pad_send_event() and re-taken before the
 * pad reference is dropped. */
957 /* Called with lock held, drops the lock to send EOS to the
961 send_eos (GstSplitMuxSink * splitmux, MqStreamCtx * ctx)
966 eos = gst_event_new_eos ();
967 pad = gst_pad_get_peer (ctx->srcpad);
971 GST_INFO_OBJECT (splitmux, "Sending EOS on %" GST_PTR_FORMAT, pad);
972 GST_SPLITMUX_UNLOCK (splitmux);
973 gst_pad_send_event (pad, eos);
974 GST_SPLITMUX_LOCK (splitmux);
976 gst_object_unref (pad);
/* eos_context_async:
 * @ctx: context whose downstream peer should receive EOS
 * @splitmux: the element
 *
 * Allocates an AsyncEosHelper that owns a reference to the peer of
 * ctx->srcpad, marks the context with out_eos_async_done, clears the SINK
 * flag of the active sink and schedules send_eos_async() through
 * gst_element_call_async(). No destroy notify is registered for the helper
 * (the last argument is NULL). */
979 /* Called with lock held. Schedules an EOS event to the ctx pad
980 * to happen in another thread */
982 eos_context_async (MqStreamCtx * ctx, GstSplitMuxSink * splitmux)
984 AsyncEosHelper *helper = g_new0 (AsyncEosHelper, 1);
985 GstPad *srcpad, *sinkpad;
987 srcpad = ctx->srcpad;
988 sinkpad = gst_pad_get_peer (srcpad);
991 helper->pad = sinkpad; /* Takes the reference */
/* Recorded before the EOS is actually delivered: "pending" counts as done
 * for all_contexts_are_async_eos() */
993 ctx->out_eos_async_done = TRUE;
994 /* HACK: Here, we explicitly unset the SINK flag on the target sink element
995 * that's about to be asynchronously disposed, so that it no longer
996 * participates in GstBin EOS logic. This fixes a race where if
997 * splitmuxsink really reaches EOS before an asynchronous background
998 * element has finished, then the bin won't actually send EOS to the
999 * pipeline. Even after finishing and removing the old element, the
1000 * bin doesn't re-check EOS status on removing a SINK element. This
1001 * should be fixed in core, making this hack unnecessary. */
1002 GST_OBJECT_FLAG_UNSET (splitmux->active_sink, GST_ELEMENT_FLAG_SINK);
1004 GST_DEBUG_OBJECT (splitmux, "scheduled EOS to pad %" GST_PTR_FORMAT " ctx %p",
1007 g_assert_nonnull (helper->pad);
1008 gst_element_call_async (GST_ELEMENT (splitmux),
1009 (GstElementCallAsyncFunc) send_eos_async, helper, NULL);
/* all_contexts_are_async_eos:
 * @splitmux: the element whose contexts list is inspected
 *
 * Walks splitmux->contexts and ANDs every context's out_eos_async_done flag
 * into the accumulator, which starts out TRUE. */
1012 /* Called with lock held. TRUE iff all contexts have a
1013 * pending (or delivered) async eos event */
1015 all_contexts_are_async_eos (GstSplitMuxSink * splitmux)
1017 gboolean ret = TRUE;
1020 for (item = splitmux->contexts; item; item = item->next) {
1021 MqStreamCtx *ctx = item->data;
1022 ret &= ctx->out_eos_async_done;
/* complete_or_wait_on_out:
 * @splitmux: the element; its lock is held on entry
 * @ctx: output-side stream context being serviced
 *
 * Output-side gatekeeper driven by splitmux->output_state:
 *  - A context may output when it is the reference context or once
 *    ready_for_output is set.
 *  - ctx->out_running_time is compared against max_out_running_time,
 *    reduced by alignment_threshold unless the limit is 0 or
 *    GST_CLOCK_STIME_NONE, the threshold is 0, or the limit is smaller than
 *    the threshold.
 *  - OUTPUT_GOP: switches to AWAITING_COMMAND and issues
 *    GST_SPLITMUX_BROADCAST_OUTPUT.
 *  - ENDING_FILE: a context that is not yet EOS gets an EOS event. In
 *    async_finalize mode the sink is tagged with EOS_FROM_US = 1, the EOS
 *    is scheduled with eos_context_async(), and once every context has one
 *    the state moves to START_NEXT_FILE. send_eos() is the other EOS path.
 *  - START_NEXT_FILE: the reference context calls start_next_fragment().
 *  - AWAITING_COMMAND: a command is popped from the tail of out_cmd_q
 *    (the blocked queues are grown when that empties it); a
 *    start_new_fragment command selects ENDING_FILE, any other command sets
 *    max_out_running_time to cmd->max_output_ts and selects OUTPUT_GOP.
 *    GST_SPLITMUX_WAIT_OUTPUT is used while the state stays
 *    AWAITING_COMMAND.
 *  - Otherwise the context sleeps on GST_SPLITMUX_WAIT_OUTPUT until it is
 *    woken for a new limit or a state change. */
1027 /* Called with splitmux lock held to check if this output
1028 * context needs to sleep to wait for the release of the
1029 * next GOP, or to send EOS to close out the current file
1032 complete_or_wait_on_out (GstSplitMuxSink * splitmux, MqStreamCtx * ctx)
1034 if (ctx->caps_change)
1038 /* When first starting up, the reference stream has to output
1039 * the first buffer to prepare the muxer and sink */
1040 gboolean can_output = (ctx->is_reference || splitmux->ready_for_output);
1041 GstClockTimeDiff my_max_out_running_time = splitmux->max_out_running_time;
1043 if (!(splitmux->max_out_running_time == 0 ||
1044 splitmux->max_out_running_time == GST_CLOCK_STIME_NONE ||
1045 splitmux->alignment_threshold == 0 ||
1046 splitmux->max_out_running_time < splitmux->alignment_threshold)) {
1047 my_max_out_running_time -= splitmux->alignment_threshold;
1048 GST_LOG_OBJECT (ctx->srcpad,
1049 "Max out running time currently %" GST_STIME_FORMAT
1050 ", with threshold applied it is %" GST_STIME_FORMAT,
1051 GST_STIME_ARGS (splitmux->max_out_running_time),
1052 GST_STIME_ARGS (my_max_out_running_time));
1056 || splitmux->output_state == SPLITMUX_OUTPUT_STATE_STOPPED)
1059 GST_LOG_OBJECT (ctx->srcpad,
1060 "Checking running time %" GST_STIME_FORMAT " against max %"
1061 GST_STIME_FORMAT, GST_STIME_ARGS (ctx->out_running_time),
1062 GST_STIME_ARGS (my_max_out_running_time));
1065 if (splitmux->max_out_running_time == GST_CLOCK_STIME_NONE ||
1066 ctx->out_running_time < my_max_out_running_time) {
1070 switch (splitmux->output_state) {
1071 case SPLITMUX_OUTPUT_STATE_OUTPUT_GOP:
1072 /* We only get here if we've finished outputting a GOP and need to know
1073 * what to do next */
1074 splitmux->output_state = SPLITMUX_OUTPUT_STATE_AWAITING_COMMAND;
1075 GST_SPLITMUX_BROADCAST_OUTPUT (splitmux);
1078 case SPLITMUX_OUTPUT_STATE_ENDING_FILE:
1079 /* We've reached the max out running_time to get here, so end this file now */
1080 if (ctx->out_eos == FALSE) {
1081 if (splitmux->async_finalize) {
1082 /* We must set EOS asynchronously at this point. We cannot defer
1083 * it, because we need all contexts to wake up, for the
1084 * reference context to eventually give us something at
1085 * START_NEXT_FILE. Otherwise, collectpads might choose another
1086 * context to give us the first buffer, and format-location-full
1087 * will not contain a valid sample. */
1088 g_object_set_qdata ((GObject *) splitmux->sink, EOS_FROM_US,
1089 GINT_TO_POINTER (1));
1090 eos_context_async (ctx, splitmux);
1091 if (all_contexts_are_async_eos (splitmux)) {
1092 GST_INFO_OBJECT (splitmux,
1093 "All contexts are async_eos. Moving to the next file.");
1094 /* We can start the next file once we've asked each pad to go EOS */
1095 splitmux->output_state = SPLITMUX_OUTPUT_STATE_START_NEXT_FILE;
1096 GST_SPLITMUX_BROADCAST_OUTPUT (splitmux);
1100 send_eos (splitmux, ctx);
1104 GST_INFO_OBJECT (splitmux,
1105 "At end-of-file state, but context %p is already EOS", ctx);
1108 case SPLITMUX_OUTPUT_STATE_START_NEXT_FILE:
1109 if (ctx->is_reference) {
1110 /* Special handling on the reference ctx to start new fragments
1111 * and collect commands from the command queue */
1112 /* drops the splitmux lock briefly: */
1113 /* We must have reference ctx in order for format-location-full to
1115 start_next_fragment (splitmux, ctx);
1119 case SPLITMUX_OUTPUT_STATE_AWAITING_COMMAND:{
1121 SplitMuxOutputCommand *cmd =
1122 g_queue_pop_tail (&splitmux->out_cmd_q);
1124 /* If we pop the last command, we need to make our queues bigger */
1125 if (g_queue_get_length (&splitmux->out_cmd_q) == 0)
1126 grow_blocked_queues (splitmux);
1128 if (cmd->start_new_fragment) {
1129 GST_DEBUG_OBJECT (splitmux, "Got cmd to start new fragment");
1130 splitmux->output_state = SPLITMUX_OUTPUT_STATE_ENDING_FILE;
1132 GST_DEBUG_OBJECT (splitmux,
1133 "Got new output cmd for time %" GST_STIME_FORMAT,
1134 GST_STIME_ARGS (cmd->max_output_ts));
1136 /* Extend the output range immediately */
1137 splitmux->max_out_running_time = cmd->max_output_ts;
1138 splitmux->output_state = SPLITMUX_OUTPUT_STATE_OUTPUT_GOP;
1140 GST_SPLITMUX_BROADCAST_OUTPUT (splitmux);
1142 out_cmd_buf_free (cmd);
1145 GST_SPLITMUX_WAIT_OUTPUT (splitmux);
1147 } while (splitmux->output_state ==
1148 SPLITMUX_OUTPUT_STATE_AWAITING_COMMAND);
1149 /* loop and re-check the state */
1152 case SPLITMUX_OUTPUT_STATE_STOPPED:
1157 GST_INFO_OBJECT (ctx->srcpad,
1158 "Sleeping for running time %"
1159 GST_STIME_FORMAT " (max %" GST_STIME_FORMAT ") or state change.",
1160 GST_STIME_ARGS (ctx->out_running_time),
1161 GST_STIME_ARGS (splitmux->max_out_running_time));
1162 GST_SPLITMUX_WAIT_OUTPUT (splitmux);
1163 GST_INFO_OBJECT (ctx->srcpad,
1164 "Woken for new max running time %" GST_STIME_FORMAT,
1165 GST_STIME_ARGS (splitmux->max_out_running_time));
/* calculate_next_max_timecode:
 * @splitmux: the element; threshold_timecode_str and fragment_start_time
 *   are read from it
 * @cur_tc: timecode of the current reference frame, may be NULL
 *
 * Computes the time at which the next timecode-driven split is due.
 * Returns GST_CLOCK_TIME_NONE when @cur_tc is NULL or no timecode threshold
 * string is configured. Otherwise the threshold string is parsed into an
 * interval and added to @cur_tc; both timecodes are converted to
 * nanoseconds since the daily jam and their distance is combined with
 * fragment_start_time. When the target is numerically before the current
 * timecode the distance is taken across the day boundary. */
1171 calculate_next_max_timecode (GstSplitMuxSink * splitmux,
1172 const GstVideoTimeCode * cur_tc)
1174 GstVideoTimeCode *target_tc;
1175 GstVideoTimeCodeInterval *tc_inter;
1176 GstClockTime cur_tc_time, target_tc_time, next_max_tc_time;
1178 if (cur_tc == NULL || splitmux->threshold_timecode_str == NULL)
1179 return GST_CLOCK_TIME_NONE;
1182 gst_video_time_code_interval_new_from_string
1183 (splitmux->threshold_timecode_str);
1184 target_tc = gst_video_time_code_add_interval (cur_tc, tc_inter);
1185 gst_video_time_code_interval_free (tc_inter);
1188 target_tc_time = gst_video_time_code_nsec_since_daily_jam (target_tc);
1189 cur_tc_time = gst_video_time_code_nsec_since_daily_jam (cur_tc);
1191 /* Add fragment_start_time, accounting for wraparound */
1192 if (target_tc_time >= cur_tc_time) {
1194 target_tc_time - cur_tc_time + splitmux->fragment_start_time;
/* Wraparound case: start from a nominal 24 hour day */
1196 GstClockTime day_in_ns = 24 * 60 * 60 * GST_SECOND;
1198 if ((cur_tc->config.flags & GST_VIDEO_TIME_CODE_FLAGS_DROP_FRAME) &&
1199 (cur_tc->config.fps_d == 1001)) {
1200 /* Checking fps_d is probably unneeded, but better safe than sorry
1201 * (e.g. someone accidentally set a flag) */
1202 GstVideoTimeCode *tc_for_offset;
1204 /* Here, the duration of the 24:00:00;00 timecode isn't exactly one day,
1205 * but slightly less. Calculate that duration from a fake timecode. The
1206 * problem is that 24:00:00;00 isn't a valid timecode, so the workaround
1207 * is to add one frame to 23:59:59;29 */
1209 gst_video_time_code_new (cur_tc->config.fps_n, cur_tc->config.fps_d,
1210 NULL, cur_tc->config.flags, 23, 59, 59,
1211 cur_tc->config.fps_n / cur_tc->config.fps_d, 0);
/* Second operand below is GST_SECOND * fps_d / fps_n, i.e. one frame */
1213 gst_video_time_code_nsec_since_daily_jam (tc_for_offset) +
1214 gst_util_uint64_scale (GST_SECOND, cur_tc->config.fps_d,
1215 cur_tc->config.fps_n);
1216 gst_video_time_code_free (tc_for_offset);
1219 day_in_ns - cur_tc_time + target_tc_time +
1220 splitmux->fragment_start_time;
1223 GST_INFO_OBJECT (splitmux, "Next max TC time: %" GST_TIME_FORMAT
1224 " from ref TC: %" GST_TIME_FORMAT, GST_TIME_ARGS (next_max_tc_time),
1225 GST_TIME_ARGS (cur_tc_time));
1226 gst_video_time_code_free (target_tc);
1228 return next_max_tc_time;
/* request_next_keyframe:
 * @splitmux: the element
 * @buffer: buffer whose timecode meta seeds the timecode calculation, or
 *   NULL
 *
 * Resets next_max_tc_time and, when a timecode threshold is configured and
 * @buffer is given, recomputes it from the buffer's video timecode meta.
 * The keyframe request is guarded by send_keyframe_requests, by having a
 * time or timecode threshold, and by threshold_bytes being 0. The request
 * is an upstream force-key-unit event pushed on the reference context's
 * sinkpad; the result of gst_pad_push_event() is returned. */
1232 request_next_keyframe (GstSplitMuxSink * splitmux, GstBuffer * buffer)
1235 GstClockTime target_time;
1236 gboolean timecode_based = FALSE;
1238 splitmux->next_max_tc_time = GST_CLOCK_TIME_NONE;
1239 if (splitmux->threshold_timecode_str) {
1240 GstVideoTimeCodeMeta *tc_meta;
1242 if (buffer != NULL) {
1243 tc_meta = gst_buffer_get_video_time_code_meta (buffer);
1245 splitmux->next_max_tc_time =
1246 calculate_next_max_timecode (splitmux, &tc_meta->tc);
1247 timecode_based = (splitmux->next_max_tc_time != GST_CLOCK_TIME_NONE);
1250 /* This can happen in the presence of GAP events that trigger
1251 * a new fragment start */
1252 GST_WARNING_OBJECT (splitmux,
1253 "No buffer available to calculate next timecode");
1257 if (splitmux->send_keyframe_requests == FALSE
1258 || (splitmux->threshold_time == 0 && !timecode_based)
1259 || splitmux->threshold_bytes != 0)
1262 if (timecode_based) {
1263 /* We might have rounding errors: aim slightly earlier */
1264 target_time = splitmux->next_max_tc_time - 5 * GST_USECOND;
/* Time-threshold case: one threshold_time after the fragment start */
1266 target_time = splitmux->fragment_start_time + splitmux->threshold_time;
1268 ev = gst_video_event_new_upstream_force_key_unit (target_time, TRUE, 0);
1269 GST_INFO_OBJECT (splitmux, "Requesting next keyframe at %" GST_TIME_FORMAT,
1270 GST_TIME_ARGS (target_time));
1271 return gst_pad_push_event (splitmux->reference_ctx->sinkpad, ev);
/* handle_mq_output:
 * @pad: pad the probe fired on
 * @info: probe info (buffer, buffer list or event)
 * @ctx: stream context registered with the probe
 *
 * Pad probe for the output side of a stream.
 *  - Buffer lists are not supported: a warning is emitted and the data is
 *    dropped.
 *  - SEGMENT events are copied into ctx->out_segment.
 *  - FLUSH_STOP resets out_segment, frees all queued buffer records and
 *    clears ctx->flushing; FLUSH_START sets ctx->flushing and issues
 *    GST_SPLITMUX_BROADCAST_OUTPUT.
 *  - EOS sets ctx->out_eos.
 *  - GAP on the reference stream outside OUTPUT_GOP is stored in
 *    ctx->pending_gap and reported as HANDLED; otherwise its timestamp is
 *    converted to running time and fed to complete_or_wait_on_out().
 *  - A "splitmuxsink-unblock" custom event updates out_running_time from
 *    its "timestamp" field and is dropped.
 *  - CAPS on the reference context is sent to the peer directly; the
 *    follow-up path sets ctx->caps_change and can switch the state to
 *    ENDING_FILE.
 *  - Buffers: the matching record is popped from the tail of queued_bufs,
 *    the context waits in complete_or_wait_on_out(), muxed_out_bytes is
 *    increased, a stored GAP event is sent to the peer, and the buffer is
 *    passed on. */
1274 static GstPadProbeReturn
1275 handle_mq_output (GstPad * pad, GstPadProbeInfo * info, MqStreamCtx * ctx)
1277 GstSplitMuxSink *splitmux = ctx->splitmux;
1278 MqStreamBuf *buf_info = NULL;
1280 GST_LOG_OBJECT (pad, "Fired probe type 0x%x", info->type);
1282 /* FIXME: Handle buffer lists, until then make it clear they won't work */
1283 if (info->type & GST_PAD_PROBE_TYPE_BUFFER_LIST) {
1284 g_warning ("Buffer list handling not implemented");
1285 return GST_PAD_PROBE_DROP;
1287 if (info->type & GST_PAD_PROBE_TYPE_EVENT_DOWNSTREAM ||
1288 info->type & GST_PAD_PROBE_TYPE_EVENT_FLUSH) {
1289 GstEvent *event = gst_pad_probe_info_get_event (info);
1290 gboolean locked = FALSE;
1292 GST_LOG_OBJECT (pad, "Event %" GST_PTR_FORMAT, event);
1294 switch (GST_EVENT_TYPE (event)) {
1295 case GST_EVENT_SEGMENT:
/* out_segment is what GAP timestamps are converted against below */
1296 gst_event_copy_segment (event, &ctx->out_segment);
1298 case GST_EVENT_FLUSH_STOP:
1299 GST_SPLITMUX_LOCK (splitmux);
1301 gst_segment_init (&ctx->out_segment, GST_FORMAT_UNDEFINED);
1302 g_queue_foreach (&ctx->queued_bufs, (GFunc) mq_stream_buf_free, NULL);
1303 g_queue_clear (&ctx->queued_bufs);
1304 ctx->flushing = FALSE;
1306 case GST_EVENT_FLUSH_START:
1307 GST_SPLITMUX_LOCK (splitmux);
1309 GST_LOG_OBJECT (pad, "Flush start");
1310 ctx->flushing = TRUE;
1311 GST_SPLITMUX_BROADCAST_OUTPUT (splitmux);
1314 GST_SPLITMUX_LOCK (splitmux);
1316 if (splitmux->output_state == SPLITMUX_OUTPUT_STATE_STOPPED)
1318 ctx->out_eos = TRUE;
1319 GST_INFO_OBJECT (splitmux,
1320 "Have EOS event at pad %" GST_PTR_FORMAT " ctx %p", pad, ctx);
1322 case GST_EVENT_GAP:{
1323 GstClockTime gap_ts;
1324 GstClockTimeDiff rtime;
1326 gst_event_parse_gap (event, &gap_ts, NULL);
1327 if (gap_ts == GST_CLOCK_TIME_NONE)
1330 GST_SPLITMUX_LOCK (splitmux);
1333 if (splitmux->output_state == SPLITMUX_OUTPUT_STATE_STOPPED)
1336 /* When we get a gap event on the
1337 * reference stream and we're trying to open a
1338 * new file, we need to store it until we get
1339 * the buffer afterwards
1341 if (ctx->is_reference &&
1342 (splitmux->output_state != SPLITMUX_OUTPUT_STATE_OUTPUT_GOP)) {
1343 GST_DEBUG_OBJECT (pad, "Storing GAP event until buffer arrives");
1344 gst_event_replace (&ctx->pending_gap, event);
1345 GST_SPLITMUX_UNLOCK (splitmux);
1346 return GST_PAD_PROBE_HANDLED;
1349 rtime = my_segment_to_running_time (&ctx->out_segment, gap_ts);
1351 GST_LOG_OBJECT (pad, "Have GAP w/ ts %" GST_STIME_FORMAT,
1352 GST_STIME_ARGS (rtime));
1354 if (rtime != GST_CLOCK_STIME_NONE) {
1355 ctx->out_running_time = rtime;
1356 complete_or_wait_on_out (splitmux, ctx);
1360 case GST_EVENT_CUSTOM_DOWNSTREAM:{
1361 const GstStructure *s;
1362 GstClockTimeDiff ts = 0;
1364 s = gst_event_get_structure (event);
1365 if (!gst_structure_has_name (s, "splitmuxsink-unblock"))
1368 gst_structure_get_int64 (s, "timestamp", &ts);
1370 GST_SPLITMUX_LOCK (splitmux);
1373 if (splitmux->output_state == SPLITMUX_OUTPUT_STATE_STOPPED)
1375 ctx->out_running_time = ts;
1376 if (!ctx->is_reference)
1377 complete_or_wait_on_out (splitmux, ctx);
1378 GST_SPLITMUX_UNLOCK (splitmux);
1379 return GST_PAD_PROBE_DROP;
1381 case GST_EVENT_CAPS:{
1384 if (!ctx->is_reference)
1387 peer = gst_pad_get_peer (pad);
1389 gboolean ok = gst_pad_send_event (peer, gst_event_ref (event));
1391 gst_object_unref (peer);
1399 /* This is in the case the muxer doesn't allow this change of caps */
1400 GST_SPLITMUX_LOCK (splitmux);
1402 ctx->caps_change = TRUE;
1404 if (splitmux->output_state != SPLITMUX_OUTPUT_STATE_START_NEXT_FILE) {
1405 GST_DEBUG_OBJECT (splitmux,
1406 "New caps were not accepted. Switching output file");
1407 if (ctx->out_eos == FALSE) {
1408 splitmux->output_state = SPLITMUX_OUTPUT_STATE_ENDING_FILE;
1409 GST_SPLITMUX_BROADCAST_OUTPUT (splitmux);
1413 /* Lets it fall through, if it fails again, then the muxer just can't
1414 * support this format, but at least we have a closed file.
1422 /* We need to make sure events aren't passed
1423 * until the muxer / sink are ready for it */
1425 GST_SPLITMUX_LOCK (splitmux);
1426 if (!ctx->is_reference)
1427 complete_or_wait_on_out (splitmux, ctx);
1428 GST_SPLITMUX_UNLOCK (splitmux);
1430 /* Don't try to forward sticky events before the next buffer is there
1431 * because it would cause a new file to be created without the first
1432 * buffer being available.
1434 if (ctx->caps_change && GST_EVENT_IS_STICKY (event)) {
1435 gst_event_unref (event);
1436 return GST_PAD_PROBE_HANDLED;
1438 return GST_PAD_PROBE_PASS;
1441 /* Allow everything through until the configured next stopping point */
1442 GST_SPLITMUX_LOCK (splitmux);
1444 buf_info = g_queue_pop_tail (&ctx->queued_bufs);
1445 if (buf_info == NULL)
1446 /* Can only happen due to a poorly timed flush */
1449 /* If we have popped a keyframe, decrement the queued_gop count */
1450 if (buf_info->keyframe && splitmux->queued_keyframes > 0)
1451 splitmux->queued_keyframes--;
1453 ctx->out_running_time = buf_info->run_ts;
1454 ctx->cur_out_buffer = gst_pad_probe_info_get_buffer (info);
1456 GST_LOG_OBJECT (splitmux,
1457 "Pad %" GST_PTR_FORMAT " buffer with run TS %" GST_STIME_FORMAT
1458 " size %" G_GUINT64_FORMAT,
1459 pad, GST_STIME_ARGS (ctx->out_running_time), buf_info->buf_size);
1461 ctx->caps_change = FALSE;
1463 complete_or_wait_on_out (splitmux, ctx);
1465 splitmux->muxed_out_bytes += buf_info->buf_size;
1467 #ifndef GST_DISABLE_GST_DEBUG
1469 GstBuffer *buf = gst_pad_probe_info_get_buffer (info);
1470 GST_LOG_OBJECT (pad, "Returning to pass buffer %" GST_PTR_FORMAT
1471 " run ts %" GST_STIME_FORMAT, buf,
1472 GST_STIME_ARGS (ctx->out_running_time));
1476 ctx->cur_out_buffer = NULL;
1477 GST_SPLITMUX_UNLOCK (splitmux);
1479 /* pending_gap is protected by the STREAM lock */
1480 if (ctx->pending_gap) {
1481 /* If we previously stored a gap event, send it now */
1482 GstPad *peer = gst_pad_get_peer (ctx->srcpad);
1484 GST_DEBUG_OBJECT (splitmux,
1485 "Pad %" GST_PTR_FORMAT " sending pending GAP event", ctx->srcpad);
1487 gst_pad_send_event (peer, ctx->pending_gap);
1488 ctx->pending_gap = NULL;
1490 gst_object_unref (peer);
1493 mq_stream_buf_free (buf_info);
1495 return GST_PAD_PROBE_PASS;
1498 GST_SPLITMUX_UNLOCK (splitmux);
1499 return GST_PAD_PROBE_DROP;
/* resend_sticky:
 * @pad: pad whose sticky events are being iterated (unused here)
 * @event: location of the sticky event
 * @peer: pad that receives the event
 *
 * Callback for gst_pad_sticky_events_foreach() (see restart_context()):
 * sends a new reference of *@event to @peer and returns the result of
 * gst_pad_send_event(). */
1503 resend_sticky (GstPad * pad, GstEvent ** event, GstPad * peer)
1505 return gst_pad_send_event (peer, gst_event_ref (*event));
/* unlock_context:
 * @ctx: stream context to unblock
 * @splitmux: the element (unused in the statements below)
 *
 * Removes the probe recorded in fragment_block_id from ctx->srcpad, if one
 * is installed (id > 0), and resets the id to 0. */
1509 unlock_context (MqStreamCtx * ctx, GstSplitMuxSink * splitmux)
1511 if (ctx->fragment_block_id > 0) {
1512 gst_pad_remove_probe (ctx->srcpad, ctx->fragment_block_id);
1513 ctx->fragment_block_id = 0;
/* restart_context:
 * @ctx: stream context to restart
 * @splitmux: the element (unused in the statements below)
 *
 * Replays every sticky event stored on ctx->srcpad to its current peer via
 * resend_sticky(), then re-derives out_eos (and out_eos_async_done) from
 * the pad's EOS state. */
1518 restart_context (MqStreamCtx * ctx, GstSplitMuxSink * splitmux)
1520 GstPad *peer = gst_pad_get_peer (ctx->srcpad);
1522 gst_pad_sticky_events_foreach (ctx->srcpad,
1523 (GstPadStickyEventsForeachFunction) (resend_sticky), peer);
1525 /* Clear EOS flag if not actually EOS */
1526 ctx->out_eos = GST_PAD_IS_EOS (ctx->srcpad);
1527 ctx->out_eos_async_done = ctx->out_eos;
1529 gst_object_unref (peer);
/* relink_context:
 * @ctx: stream context whose srcpad is moved
 * @splitmux: the element; splitmux->muxer is the muxer pads are requested
 *   from
 *
 * Requests a pad from splitmux->muxer using the pad template (and template
 * name) of the pad currently linked to ctx->srcpad, unlinks the old pair
 * and links ctx->srcpad to the new pad with
 * GST_PAD_LINK_CHECK_NO_RECONFIGURE. A failed link releases the request
 * pad again. The trailing statement posts a RESOURCE/SETTINGS element
 * error. */
1533 relink_context (MqStreamCtx * ctx, GstSplitMuxSink * splitmux)
1535 GstPad *sinkpad, *srcpad, *newpad;
1536 GstPadTemplate *templ;
1538 srcpad = ctx->srcpad;
1539 sinkpad = gst_pad_get_peer (srcpad);
1541 templ = sinkpad->padtemplate;
1543 gst_element_request_pad (splitmux->muxer, templ,
1544 GST_PAD_TEMPLATE_NAME_TEMPLATE (templ), NULL);
1546 GST_DEBUG_OBJECT (splitmux, "Relinking ctx %p to pad %" GST_PTR_FORMAT, ctx,
/* NOTE(review): only sinkpad is unreffed on this failure branch; the pad
 * requested above does not appear to be released or unreffed here, unlike
 * on the link-failure branch below - confirm this is not a leak. */
1548 if (!gst_pad_unlink (srcpad, sinkpad)) {
1549 gst_object_unref (sinkpad);
1552 if (gst_pad_link_full (srcpad, newpad,
1553 GST_PAD_LINK_CHECK_NO_RECONFIGURE) != GST_PAD_LINK_OK) {
1554 gst_element_release_request_pad (splitmux->muxer, newpad);
1555 gst_object_unref (sinkpad);
1556 gst_object_unref (newpad);
1559 gst_object_unref (newpad);
1560 gst_object_unref (sinkpad);
1564 GST_ELEMENT_ERROR (splitmux, RESOURCE, SETTINGS,
1565 ("Could not create the new muxer/sink"), NULL);
/* _block_pad:
 * Probe callback installed by block_context() with
 * GST_PAD_PROBE_TYPE_BLOCK. It does no work of its own and always returns
 * GST_PAD_PROBE_OK. */
1568 static GstPadProbeReturn
1569 _block_pad (GstPad * pad, GstPadProbeInfo * info, gpointer user_data)
1571 return GST_PAD_PROBE_OK;
/* block_context:
 * @ctx: stream context to block
 * @splitmux: the element
 *
 * Installs a GST_PAD_PROBE_TYPE_BLOCK probe (_block_pad) on ctx->srcpad and
 * remembers its id in fragment_block_id so that unlock_context() can remove
 * it again. */
1575 block_context (MqStreamCtx * ctx, GstSplitMuxSink * splitmux)
1577 ctx->fragment_block_id =
1578 gst_pad_add_probe (ctx->srcpad, GST_PAD_PROBE_TYPE_BLOCK, _block_pad,
/* _set_property_from_structure:
 * @field_id: quark of the structure field; its string is used as the
 *   property name
 * @value: value to assign
 *
 * gst_structure_foreach() callback: sets the property named after the field
 * on the GObject passed as user data. */
1583 _set_property_from_structure (GQuark field_id, const GValue * value,
1586 const gchar *property_name = g_quark_to_string (field_id);
1587 GObject *element = G_OBJECT (user_data);
1589 g_object_set_property (element, property_name, value);
/* _lock_and_set_to_null:
 * @element: child element to retire
 * @splitmux: bin the element is removed from
 *
 * Locks the element's state, sets it to GST_STATE_NULL and removes it from
 * the splitmux bin. */
1595 _lock_and_set_to_null (GstElement * element, GstSplitMuxSink * splitmux)
1597 gst_element_set_locked_state (element, TRUE);
1598 gst_element_set_state (element, GST_STATE_NULL);
1599 GST_LOG_OBJECT (splitmux, "Removing old element %" GST_PTR_FORMAT, element);
1600 gst_bin_remove (GST_BIN (splitmux), element);
/* _send_event:
 * @value: GValue holding a GstPad
 * @user_data: the GstEvent to send
 *
 * gst_iterator_foreach() callback: sends a new reference of the event to
 * the pad held in @value, so the caller keeps its own reference. */
1605 _send_event (const GValue * value, gpointer user_data)
1607 GstPad *pad = g_value_get_object (value);
1608 GstEvent *ev = user_data;
1610 gst_pad_send_event (pad, gst_event_ref (ev));
/* start_next_fragment:
 * @splitmux: the element; the splitmux lock is held on entry
 * @ctx: must be the reference context (asserted)
 *
 * Switches the output to the next fragment. Sets switching_fragment, takes
 * references on the current muxer and active sink, then releases the
 * splitmux lock and takes the state lock.
 *
 * In async_finalize mode (when bytes were muxed or fragment_id != 0) all
 * contexts are blocked, a new sink and a new muxer are created from their
 * factories under the names "sink_%u" / "muxer_%u", configured from
 * sink_properties / muxer_properties, announced through the SINK_ADDED and
 * MUXER_ADDED signals, and the contexts are relinked to them. The old sink's
 * EOS_FROM_US tag decides whether the old muxer and sink are torn down here
 * (value 2) or the tag is raised to 2.
 *
 * The other path state-locks the existing muxer and sink, sets the sink to
 * NULL and either sets the muxer to NULL (reset_muxer) or sends
 * flush-start / flush-stop to the muxer's sink pads.
 *
 * Afterwards the next filename is set (when bytes were muxed or
 * fragment_id == 0), muxed_out_bytes is cleared, sink and muxer are brought
 * to the element's target state and unlocked, the contexts are unblocked
 * and restarted, the fragment-opened message is sent and output_state
 * becomes AWAITING_COMMAND. The trailing statements release the state lock
 * and post a RESOURCE/SETTINGS element error. */
1613 /* Called with lock held when a fragment
1614 * reaches EOS and it is time to restart
1618 start_next_fragment (GstSplitMuxSink * splitmux, MqStreamCtx * ctx)
1620 GstElement *muxer, *sink;
1622 g_assert (ctx->is_reference);
1624 /* 1 change to new file */
1625 splitmux->switching_fragment = TRUE;
1627 /* We need to drop the splitmux lock to acquire the state lock
1628 * here and ensure there's no racy state change going on elsewhere */
1629 muxer = gst_object_ref (splitmux->muxer);
1630 sink = gst_object_ref (splitmux->active_sink);
1632 GST_SPLITMUX_UNLOCK (splitmux);
1633 GST_STATE_LOCK (splitmux);
1635 if (splitmux->async_finalize) {
1636 if (splitmux->muxed_out_bytes > 0 || splitmux->fragment_id != 0) {
1638 GstElement *new_sink, *new_muxer;
1640 GST_DEBUG_OBJECT (splitmux, "Starting fragment %u",
1641 splitmux->fragment_id);
1642 g_list_foreach (splitmux->contexts, (GFunc) block_context, splitmux);
1643 newname = g_strdup_printf ("sink_%u", splitmux->fragment_id);
1644 GST_SPLITMUX_LOCK (splitmux);
1645 if ((splitmux->sink =
1646 create_element (splitmux, splitmux->sink_factory, newname,
1649 if (splitmux->sink_properties)
1650 gst_structure_foreach (splitmux->sink_properties,
1651 _set_property_from_structure, splitmux->sink);
1652 splitmux->active_sink = splitmux->sink;
1653 g_signal_emit (splitmux, signals[SIGNAL_SINK_ADDED], 0, splitmux->sink);
1655 newname = g_strdup_printf ("muxer_%u", splitmux->fragment_id);
1656 if ((splitmux->muxer =
1657 create_element (splitmux, splitmux->muxer_factory, newname,
1660 if (g_object_class_find_property (G_OBJECT_GET_CLASS (splitmux->sink),
1662 /* async child elements are causing state change races and weird
1663 * failures, so let's try and turn that off */
1664 g_object_set (splitmux->sink, "async", FALSE, NULL);
1666 if (splitmux->muxer_properties)
1667 gst_structure_foreach (splitmux->muxer_properties,
1668 _set_property_from_structure, splitmux->muxer);
1669 g_signal_emit (splitmux, signals[SIGNAL_MUXER_ADDED], 0, splitmux->muxer);
1671 new_sink = splitmux->sink;
1672 new_muxer = splitmux->muxer;
1673 GST_SPLITMUX_UNLOCK (splitmux);
1674 g_list_foreach (splitmux->contexts, (GFunc) relink_context, splitmux);
1675 gst_element_link (new_muxer, new_sink);
1677 if (g_object_get_qdata ((GObject *) sink, EOS_FROM_US)) {
1678 if (GPOINTER_TO_INT (g_object_get_qdata ((GObject *) sink,
1679 EOS_FROM_US)) == 2) {
1680 _lock_and_set_to_null (muxer, splitmux);
1681 _lock_and_set_to_null (sink, splitmux);
1683 g_object_set_qdata ((GObject *) sink, EOS_FROM_US,
1684 GINT_TO_POINTER (2));
1687 gst_object_unref (muxer);
1688 gst_object_unref (sink);
1691 gst_object_ref (muxer);
1692 gst_object_ref (sink);
1696 gst_element_set_locked_state (muxer, TRUE);
1697 gst_element_set_locked_state (sink, TRUE);
1698 gst_element_set_state (sink, GST_STATE_NULL);
1700 if (splitmux->reset_muxer) {
1701 gst_element_set_state (muxer, GST_STATE_NULL);
1703 GstIterator *it = gst_element_iterate_sink_pads (muxer);
1706 ev = gst_event_new_flush_start ();
1707 while (gst_iterator_foreach (it, _send_event, ev) == GST_ITERATOR_RESYNC);
1708 gst_event_unref (ev);
1710 gst_iterator_resync (it);
1712 ev = gst_event_new_flush_stop (TRUE);
1713 while (gst_iterator_foreach (it, _send_event, ev) == GST_ITERATOR_RESYNC);
1714 gst_event_unref (ev);
1716 gst_iterator_free (it);
1720 GST_SPLITMUX_LOCK (splitmux);
1721 if (splitmux->muxed_out_bytes > 0 || splitmux->fragment_id == 0)
1722 set_next_filename (splitmux, ctx);
1723 splitmux->muxed_out_bytes = 0;
1724 GST_SPLITMUX_UNLOCK (splitmux);
1726 gst_element_set_state (sink, GST_STATE_TARGET (splitmux));
1727 gst_element_set_state (muxer, GST_STATE_TARGET (splitmux));
1728 gst_element_set_locked_state (muxer, FALSE);
1729 gst_element_set_locked_state (sink, FALSE);
1731 gst_object_unref (sink);
1732 gst_object_unref (muxer);
1734 GST_SPLITMUX_LOCK (splitmux);
1735 GST_STATE_UNLOCK (splitmux);
1736 splitmux->switching_fragment = FALSE;
1737 do_async_done (splitmux);
1739 splitmux->ready_for_output = TRUE;
1741 g_list_foreach (splitmux->contexts, (GFunc) unlock_context, splitmux);
1742 g_list_foreach (splitmux->contexts, (GFunc) restart_context, splitmux);
1744 send_fragment_opened_closed_msg (splitmux, TRUE, sink);
1746 /* FIXME: Is this always the correct next state? */
1747 splitmux->output_state = SPLITMUX_OUTPUT_STATE_AWAITING_COMMAND;
1748 GST_SPLITMUX_BROADCAST_OUTPUT (splitmux);
1752 GST_STATE_UNLOCK (splitmux);
1753 GST_ELEMENT_ERROR (splitmux, RESOURCE, SETTINGS,
1754 ("Could not create the new muxer/sink"), NULL);
/* bus_handler:
 * @bin: the splitmuxsink, as a GstBin
 * @message: message posted by a child element
 *
 * Filters child messages before handing them to the parent class'
 * handle_message():
 *  - EOS: posts the fragment-closed message for the sink that sent it. In
 *    async_finalize mode a sink tagged with EOS_FROM_US is handled
 *    specially: with value 2 its muxer and the sink itself are set to NULL
 *    asynchronously, otherwise the tag is raised to 2; the message is
 *    unreffed. While in ENDING_FILE the EOS is unreffed and the state
 *    advances to START_NEXT_FILE.
 *  - ASYNC_START / ASYNC_DONE: unreffed when they come from the active sink
 *    or the muxer while switching_fragment is set.
 *  - WARNING: for a GST_STREAM_ERROR_FORMAT warning the contexts are
 *    scanned for a pending caps_change; per the log text the warning is
 *    ignored while caps are being switched. */
1758 bus_handler (GstBin * bin, GstMessage * message)
1760 GstSplitMuxSink *splitmux = GST_SPLITMUX_SINK (bin);
1762 switch (GST_MESSAGE_TYPE (message)) {
1763 case GST_MESSAGE_EOS:{
1764 /* If the state is draining out the current file, drop this EOS */
1767 sink = GST_ELEMENT (GST_MESSAGE_SRC (message));
1768 GST_SPLITMUX_LOCK (splitmux);
1770 send_fragment_opened_closed_msg (splitmux, FALSE, sink);
1772 if (splitmux->async_finalize) {
1774 if (g_object_get_qdata ((GObject *) sink, EOS_FROM_US)) {
1775 if (GPOINTER_TO_INT (g_object_get_qdata ((GObject *) sink,
1776 EOS_FROM_US)) == 2) {
1778 GstPad *sinksink, *muxersrc;
/* Find the muxer feeding this sink by walking back from its sink pad */
1780 sinksink = gst_element_get_static_pad (sink, "sink");
1781 muxersrc = gst_pad_get_peer (sinksink);
1782 muxer = gst_pad_get_parent_element (muxersrc);
1783 gst_object_unref (sinksink);
1784 gst_object_unref (muxersrc);
/* Each async call holds its own reference on splitmux, released by the
 * gst_object_unref destroy notify */
1786 gst_element_call_async (muxer,
1787 (GstElementCallAsyncFunc) _lock_and_set_to_null,
1788 gst_object_ref (splitmux), gst_object_unref);
1789 gst_element_call_async (sink,
1790 (GstElementCallAsyncFunc) _lock_and_set_to_null,
1791 gst_object_ref (splitmux), gst_object_unref);
1792 gst_object_unref (muxer);
1794 g_object_set_qdata ((GObject *) sink, EOS_FROM_US,
1795 GINT_TO_POINTER (2));
1797 GST_DEBUG_OBJECT (splitmux,
1798 "Caught async EOS from previous muxer+sink. Dropping.");
1799 /* We forward the EOS so that it gets aggregated as normal. If the sink
1800 * finishes and is removed before the end, it will be de-aggregated */
/* NOTE(review): the comment above says the EOS is forwarded, but the log
 * message says "Dropping." and the message is unreffed right below -
 * confirm which behaviour is intended and update the stale text. */
1801 gst_message_unref (message);
1802 GST_SPLITMUX_UNLOCK (splitmux);
1805 } else if (splitmux->output_state == SPLITMUX_OUTPUT_STATE_ENDING_FILE) {
1806 GST_DEBUG_OBJECT (splitmux, "Caught EOS at end of fragment, dropping");
1807 splitmux->output_state = SPLITMUX_OUTPUT_STATE_START_NEXT_FILE;
1808 GST_SPLITMUX_BROADCAST_OUTPUT (splitmux);
1810 gst_message_unref (message);
1811 GST_SPLITMUX_UNLOCK (splitmux);
1814 GST_DEBUG_OBJECT (splitmux,
1815 "Passing EOS message. Output state %d max_out_running_time %"
1816 GST_STIME_FORMAT, splitmux->output_state,
1817 GST_STIME_ARGS (splitmux->max_out_running_time));
1819 GST_SPLITMUX_UNLOCK (splitmux);
1822 case GST_MESSAGE_ASYNC_START:
1823 case GST_MESSAGE_ASYNC_DONE:
1824 /* Ignore state changes from our children while switching */
1825 GST_SPLITMUX_LOCK (splitmux);
1826 if (splitmux->switching_fragment) {
1827 if (GST_MESSAGE_SRC (message) == (GstObject *) splitmux->active_sink
1828 || GST_MESSAGE_SRC (message) == (GstObject *) splitmux->muxer) {
1829 GST_LOG_OBJECT (splitmux,
1830 "Ignoring state change from child %" GST_PTR_FORMAT
1831 " while switching", GST_MESSAGE_SRC (message));
1832 gst_message_unref (message);
1833 GST_SPLITMUX_UNLOCK (splitmux);
1837 GST_SPLITMUX_UNLOCK (splitmux);
1839 case GST_MESSAGE_WARNING:
1841 GError *gerror = NULL;
1843 gst_message_parse_warning (message, &gerror, NULL);
1845 if (g_error_matches (gerror, GST_STREAM_ERROR, GST_STREAM_ERROR_FORMAT)) {
1847 gboolean caps_change = FALSE;
1849 GST_SPLITMUX_LOCK (splitmux);
1851 for (item = splitmux->contexts; item; item = item->next) {
1852 MqStreamCtx *ctx = item->data;
1854 if (ctx->caps_change) {
1860 GST_SPLITMUX_UNLOCK (splitmux);
1863 GST_LOG_OBJECT (splitmux,
1864 "Ignoring warning change from child %" GST_PTR_FORMAT
1865 " while switching caps", GST_MESSAGE_SRC (message));
1866 gst_message_unref (message);
1876 GST_BIN_CLASS (parent_class)->handle_message (bin, message);
/* ctx_set_unblock:
 * @ctx: stream context to flag
 *
 * Sets the context's need_unblock flag. */
1880 ctx_set_unblock (MqStreamCtx * ctx)
1882 ctx->need_unblock = TRUE;
/* need_new_fragment:
 * @splitmux: the element
 * @queued_time: time that would be in the fragment including the new GOP
 * @queued_gop_time: time waiting to be written, compared against the
 *   muxer's "reserved-duration-remaining"
 * @queued_bytes: bytes that would be in the fragment including the new GOP
 *
 * Decides whether the gathered GOP must go into a new fragment. The
 * thresholds and the head of times_to_split are sampled under the object
 * lock. The checks cover, in order: nothing muxed into the fragment yet, an
 * explicit do_split_next_gop request, a requested split running time that
 * has been passed (stale entries are dequeued), the byte threshold, the
 * time threshold, the timecode threshold (with a 5 us tolerance) and, when
 * robust muxing is enabled and supported by the muxer, running out of
 * reserved header room. */
1886 need_new_fragment (GstSplitMuxSink * splitmux,
1887 GstClockTime queued_time, GstClockTime queued_gop_time,
1888 guint64 queued_bytes)
1890 guint64 thresh_bytes;
1891 GstClockTime thresh_time;
1892 gboolean check_robust_muxing;
1893 GstClockTime time_to_split = GST_CLOCK_TIME_NONE;
1894 GstClockTime *ptr_to_time;
1896 GST_OBJECT_LOCK (splitmux);
1897 thresh_bytes = splitmux->threshold_bytes;
1898 thresh_time = splitmux->threshold_time;
1899 ptr_to_time = (GstClockTime *)
1900 gst_queue_array_peek_head_struct (splitmux->times_to_split);
1902 time_to_split = *ptr_to_time;
1903 check_robust_muxing = splitmux->use_robust_muxing
1904 && splitmux->muxer_has_reserved_props;
1905 GST_OBJECT_UNLOCK (splitmux);
1907 /* Have we muxed anything into the new file at all? */
1908 if (splitmux->fragment_total_bytes <= 0)
1911 /* User told us to split now */
1912 if (g_atomic_int_get (&(splitmux->do_split_next_gop)) == TRUE)
1915 /* User told us to split at this running time */
1916 if (splitmux->reference_ctx->in_running_time > time_to_split) {
1917 GST_OBJECT_LOCK (splitmux);
1918 /* Dequeue running time */
1919 gst_queue_array_pop_head_struct (splitmux->times_to_split);
1920 /* Empty any running times after this that are past now */
1921 ptr_to_time = gst_queue_array_peek_head_struct (splitmux->times_to_split);
1922 while (ptr_to_time) {
1923 time_to_split = *ptr_to_time;
1924 if (splitmux->reference_ctx->in_running_time <= time_to_split) {
1927 gst_queue_array_pop_head_struct (splitmux->times_to_split);
1928 ptr_to_time = gst_queue_array_peek_head_struct (splitmux->times_to_split);
1930 GST_OBJECT_UNLOCK (splitmux);
1934 if (thresh_bytes > 0 && queued_bytes > thresh_bytes)
1935 return TRUE; /* Would overrun byte limit */
1937 if (thresh_time > 0 && queued_time > thresh_time)
1938 return TRUE; /* Would overrun time limit */
1940 /* Timecode-based threshold accounts for possible rounding errors:
1941 * 5us should be bigger than all possible rounding errors but nowhere near
1942 * big enough to skip to another frame */
1943 if (splitmux->next_max_tc_time != GST_CLOCK_TIME_NONE &&
1944 splitmux->reference_ctx->in_running_time >
1945 splitmux->next_max_tc_time + 5 * GST_USECOND)
1946 return TRUE; /* Timecode threshold */
1948 if (check_robust_muxing) {
1949 GstClockTime mux_reserved_remain;
/* Ask the muxer how much reserved header duration is still available */
1951 g_object_get (splitmux->muxer,
1952 "reserved-duration-remaining", &mux_reserved_remain, NULL);
1954 GST_LOG_OBJECT (splitmux,
1955 "Muxer robust muxing report - %" G_GUINT64_FORMAT
1956 " remaining. New GOP would enqueue %" G_GUINT64_FORMAT,
1957 mux_reserved_remain, queued_gop_time);
1959 if (queued_gop_time >= mux_reserved_remain) {
1960 GST_INFO_OBJECT (splitmux,
1961 "File is about to run out of header room - %" G_GUINT64_FORMAT
1962 " remaining. New GOP would enqueue %" G_GUINT64_FORMAT
1963 ". Switching to new file", mux_reserved_remain, queued_gop_time);
1968 /* Continue and mux this GOP */
1972 /* Called with splitmux lock held */
1973 /* Called when entering ProcessingCompleteGop state
1974 * Assess if mq contents overflowed the current file
1975 * -> If yes, need to switch to new file
1976 * -> if no, set max_out_running_time to let this GOP in and
1977 * go to COLLECTING_GOP_START state
1980 handle_gathered_gop (GstSplitMuxSink * splitmux)
1982 guint64 queued_bytes;
1983 GstClockTimeDiff queued_time = 0;
1984 GstClockTimeDiff queued_gop_time = 0;
1985 GstClockTimeDiff new_out_ts = splitmux->reference_ctx->in_running_time;
1986 SplitMuxOutputCommand *cmd;
1988 /* Assess if the multiqueue contents overflowed the current file */
1989 /* When considering if a newly gathered GOP overflows
1990 * the time limit for the file, only consider the running time of the
1991 * reference stream. Other streams might have run ahead a little bit,
1992 * but extra pieces won't be released to the muxer beyond the reference
1993 * stream cut-off anyway - so it forms the limit. */
1994 queued_bytes = splitmux->fragment_total_bytes + splitmux->gop_total_bytes;
1995 queued_time = splitmux->reference_ctx->in_running_time;
1996 /* queued_gop_time tracks how much unwritten data there is waiting to
1997 * be written to this fragment including this GOP */
1998 if (splitmux->reference_ctx->out_running_time != GST_CLOCK_STIME_NONE)
2000 splitmux->reference_ctx->in_running_time -
2001 splitmux->reference_ctx->out_running_time;
2004 splitmux->reference_ctx->in_running_time - splitmux->gop_start_time;
2006 GST_LOG_OBJECT (splitmux, " queued_bytes %" G_GUINT64_FORMAT, queued_bytes);
2008 g_assert (queued_gop_time >= 0);
2009 g_assert (queued_time >= splitmux->fragment_start_time);
2011 queued_time -= splitmux->fragment_start_time;
2012 if (queued_time < queued_gop_time)
2013 queued_gop_time = queued_time;
2015 /* Expand queued bytes estimate by muxer overhead */
2016 queued_bytes += (queued_bytes * splitmux->mux_overhead);
2018 GST_LOG_OBJECT (splitmux, "mq at TS %" GST_STIME_FORMAT
2019 " bytes %" G_GUINT64_FORMAT, GST_STIME_ARGS (queued_time), queued_bytes);
2020 if (splitmux->next_max_tc_time != GST_CLOCK_TIME_NONE) {
2021 GST_LOG_OBJECT (splitmux,
2022 "timecode mq TS %" GST_TIME_FORMAT " vs target %" GST_TIME_FORMAT,
2023 GST_TIME_ARGS (splitmux->reference_ctx->in_running_time),
2024 GST_TIME_ARGS (splitmux->next_max_tc_time + 5 * GST_USECOND));
2027 /* Check for overrun - have we output at least one byte and overrun
2028 * either threshold? */
2029 if (need_new_fragment (splitmux, queued_time, queued_gop_time, queued_bytes)) {
2030 if (splitmux->async_finalize) {
2031 GstClockTime *sink_running_time = g_new (GstClockTime, 1);
2032 *sink_running_time = splitmux->reference_ctx->out_running_time;
2033 g_object_set_qdata_full (G_OBJECT (splitmux->sink),
2034 RUNNING_TIME, sink_running_time, g_free);
2036 g_atomic_int_set (&(splitmux->do_split_next_gop), FALSE);
2037 /* Tell the output side to start a new fragment */
2038 GST_INFO_OBJECT (splitmux,
2039 "This GOP (dur %" GST_STIME_FORMAT
2040 ") would overflow the fragment, Sending start_new_fragment cmd",
2041 GST_STIME_ARGS (splitmux->reference_ctx->in_running_time -
2042 splitmux->gop_start_time));
2043 cmd = out_cmd_buf_new ();
2044 cmd->start_new_fragment = TRUE;
2045 g_queue_push_head (&splitmux->out_cmd_q, cmd);
2046 GST_SPLITMUX_BROADCAST_OUTPUT (splitmux);
2048 new_out_ts = splitmux->reference_ctx->in_running_time;
2049 splitmux->fragment_start_time = splitmux->gop_start_time;
2050 splitmux->fragment_total_bytes = 0;
2052 if (request_next_keyframe (splitmux,
2053 splitmux->reference_ctx->prev_in_keyframe) == FALSE) {
2054 GST_WARNING_OBJECT (splitmux,
2055 "Could not request a keyframe. Files may not split at the exact location they should");
2057 gst_buffer_replace (&splitmux->reference_ctx->prev_in_keyframe, NULL);
2060 /* And set up to collect the next GOP */
2061 if (!splitmux->reference_ctx->in_eos) {
2062 splitmux->input_state = SPLITMUX_INPUT_STATE_COLLECTING_GOP_START;
2063 splitmux->gop_start_time = new_out_ts;
2065 /* This is probably already the current state, but just in case: */
2066 splitmux->input_state = SPLITMUX_INPUT_STATE_FINISHING_UP;
2067 new_out_ts = GST_CLOCK_STIME_NONE; /* EOS runs until forever */
2070 /* And wake all input contexts to send a wake-up event */
2071 g_list_foreach (splitmux->contexts, (GFunc) ctx_set_unblock, NULL);
2072 GST_SPLITMUX_BROADCAST_INPUT (splitmux);
2074 /* Now either way - either there was no overflow, or we requested a new fragment: release this GOP */
2075 splitmux->fragment_total_bytes += splitmux->gop_total_bytes;
2077 if (splitmux->gop_total_bytes > 0) {
2078 GST_LOG_OBJECT (splitmux,
2079 "Releasing GOP to output. Bytes in fragment now %" G_GUINT64_FORMAT
2080 " time %" GST_STIME_FORMAT,
2081 splitmux->fragment_total_bytes, GST_STIME_ARGS (queued_time));
2083 /* Send this GOP to the output command queue */
2084 cmd = out_cmd_buf_new ();
2085 cmd->start_new_fragment = FALSE;
2086 cmd->max_output_ts = new_out_ts;
2087 GST_LOG_OBJECT (splitmux, "Sending GOP cmd to output for TS %"
2088 GST_STIME_FORMAT, GST_STIME_ARGS (new_out_ts));
2089 g_queue_push_head (&splitmux->out_cmd_q, cmd);
2091 GST_SPLITMUX_BROADCAST_OUTPUT (splitmux);
2094 splitmux->gop_total_bytes = 0;
2097 /* Called with splitmux lock held */
2098 /* Called from each input pad when it is has all the pieces
2099 * for a GOP or EOS, starting with the reference pad which has set the
2100 * splitmux->max_in_running_time
2103 check_completed_gop (GstSplitMuxSink * splitmux, MqStreamCtx * ctx)
2108 /* On ENDING_FILE, the reference stream sends a command to start a new
2109 * fragment, then releases the GOP for output in the new fragment.
2110 * If somes streams received no buffer during the last GOP that overran,
2111 * because its next buffer has a timestamp bigger than
2112 * ctx->max_in_running_time, its queue is empty. In that case the only
2113 * way to wakeup the output thread is by injecting an event in the
2114 * queue. This usually happen with subtitle streams.
2115 * See https://bugzilla.gnome.org/show_bug.cgi?id=763711. */
2116 if (ctx->need_unblock) {
2117 GST_LOG_OBJECT (ctx->sinkpad, "Sending splitmuxsink-unblock event");
2118 event = gst_event_new_custom (GST_EVENT_CUSTOM_DOWNSTREAM |
2119 GST_EVENT_TYPE_SERIALIZED,
2120 gst_structure_new ("splitmuxsink-unblock", "timestamp",
2121 G_TYPE_INT64, splitmux->max_in_running_time, NULL));
2123 GST_SPLITMUX_UNLOCK (splitmux);
2124 gst_pad_send_event (ctx->sinkpad, event);
2125 GST_SPLITMUX_LOCK (splitmux);
2127 ctx->need_unblock = FALSE;
2128 GST_SPLITMUX_BROADCAST_INPUT (splitmux);
2129 /* state may have changed while we were unlocked. Loop again if so */
2130 if (splitmux->input_state != SPLITMUX_INPUT_STATE_WAITING_GOP_COLLECT)
2134 if (splitmux->input_state == SPLITMUX_INPUT_STATE_WAITING_GOP_COLLECT) {
2135 gboolean ready = TRUE;
2137 /* Iterate each pad, and check that the input running time is at least
2138 * up to the reference running time, and if so handle the collected GOP */
2139 GST_LOG_OBJECT (splitmux, "Checking GOP collected, Max in running time %"
2140 GST_STIME_FORMAT " ctx %p",
2141 GST_STIME_ARGS (splitmux->max_in_running_time), ctx);
2142 for (cur = g_list_first (splitmux->contexts); cur != NULL;
2143 cur = g_list_next (cur)) {
2144 MqStreamCtx *tmpctx = (MqStreamCtx *) (cur->data);
2146 GST_LOG_OBJECT (splitmux,
2147 "Context %p (src pad %" GST_PTR_FORMAT ") TS %" GST_STIME_FORMAT
2148 " EOS %d", tmpctx, tmpctx->srcpad,
2149 GST_STIME_ARGS (tmpctx->in_running_time), tmpctx->in_eos);
2151 if (splitmux->max_in_running_time != GST_CLOCK_STIME_NONE &&
2152 tmpctx->in_running_time < splitmux->max_in_running_time &&
2154 GST_LOG_OBJECT (splitmux,
2155 "Context %p (src pad %" GST_PTR_FORMAT ") not ready. We'll sleep",
2156 tmpctx, tmpctx->srcpad);
2162 GST_DEBUG_OBJECT (splitmux,
2163 "Collected GOP is complete. Processing (ctx %p)", ctx);
2164 /* All pads have a complete GOP, release it into the multiqueue */
2165 handle_gathered_gop (splitmux);
2167 /* The user has requested a split, we can split now that the previous GOP
2168 * has been collected to the correct location */
2169 if (g_atomic_int_compare_and_exchange (&(splitmux->split_requested), TRUE,
2171 g_atomic_int_set (&(splitmux->do_split_next_gop), TRUE);
2176 /* If upstream reached EOS we are not expecting more data, no need to wait
2181 /* Some pad is not yet ready, or GOP is being pushed
2182 * either way, sleep and wait to get woken */
2183 while (splitmux->input_state == SPLITMUX_INPUT_STATE_WAITING_GOP_COLLECT &&
2185 (ctx->in_running_time >= splitmux->max_in_running_time) &&
2186 (splitmux->max_in_running_time != GST_CLOCK_STIME_NONE)) {
2188 GST_LOG_OBJECT (splitmux, "Sleeping for GOP collection (ctx %p)", ctx);
2189 GST_SPLITMUX_WAIT_INPUT (splitmux);
2190 GST_LOG_OBJECT (splitmux, "Done waiting for complete GOP (ctx %p)", ctx);
2194 static GstPadProbeReturn
2195 handle_mq_input (GstPad * pad, GstPadProbeInfo * info, MqStreamCtx * ctx)
2197 GstSplitMuxSink *splitmux = ctx->splitmux;
2199 MqStreamBuf *buf_info = NULL;
2201 gboolean loop_again;
2202 gboolean keyframe = FALSE;
2204 GST_LOG_OBJECT (pad, "Fired probe type 0x%x", info->type);
2206 /* FIXME: Handle buffer lists, until then make it clear they won't work */
2207 if (info->type & GST_PAD_PROBE_TYPE_BUFFER_LIST) {
2208 g_warning ("Buffer list handling not implemented");
2209 return GST_PAD_PROBE_DROP;
2211 if (info->type & GST_PAD_PROBE_TYPE_EVENT_DOWNSTREAM ||
2212 info->type & GST_PAD_PROBE_TYPE_EVENT_FLUSH) {
2213 GstEvent *event = gst_pad_probe_info_get_event (info);
2215 GST_LOG_OBJECT (pad, "Event %" GST_PTR_FORMAT, event);
2217 switch (GST_EVENT_TYPE (event)) {
2218 case GST_EVENT_SEGMENT:
2219 gst_event_copy_segment (event, &ctx->in_segment);
2221 case GST_EVENT_FLUSH_STOP:
2222 GST_SPLITMUX_LOCK (splitmux);
2223 gst_segment_init (&ctx->in_segment, GST_FORMAT_UNDEFINED);
2224 ctx->in_eos = FALSE;
2225 ctx->in_running_time = GST_CLOCK_STIME_NONE;
2226 GST_SPLITMUX_UNLOCK (splitmux);
2229 GST_SPLITMUX_LOCK (splitmux);
2232 if (splitmux->input_state == SPLITMUX_INPUT_STATE_STOPPED)
2235 if (ctx->is_reference) {
2236 GST_INFO_OBJECT (splitmux, "Got Reference EOS. Finishing up");
2237 /* check_completed_gop will act as if this is a new keyframe with infinite timestamp */
2238 splitmux->input_state = SPLITMUX_INPUT_STATE_WAITING_GOP_COLLECT;
2239 /* Wake up other input pads to collect this GOP */
2240 GST_SPLITMUX_BROADCAST_INPUT (splitmux);
2241 check_completed_gop (splitmux, ctx);
2242 } else if (splitmux->input_state ==
2243 SPLITMUX_INPUT_STATE_WAITING_GOP_COLLECT) {
2244 /* If we are waiting for a GOP to be completed (ie, for aux
2245 * pads to catch up), then this pad is complete, so check
2246 * if the whole GOP is.
2248 check_completed_gop (splitmux, ctx);
2250 GST_SPLITMUX_UNLOCK (splitmux);
2252 case GST_EVENT_GAP:{
2253 GstClockTime gap_ts;
2254 GstClockTimeDiff rtime;
2256 gst_event_parse_gap (event, &gap_ts, NULL);
2257 if (gap_ts == GST_CLOCK_TIME_NONE)
2260 GST_SPLITMUX_LOCK (splitmux);
2262 if (splitmux->input_state == SPLITMUX_INPUT_STATE_STOPPED)
2264 rtime = my_segment_to_running_time (&ctx->in_segment, gap_ts);
2266 GST_LOG_OBJECT (pad, "Have GAP w/ ts %" GST_STIME_FORMAT,
2267 GST_STIME_ARGS (rtime));
2269 if (ctx->is_reference
2270 && splitmux->fragment_start_time == GST_CLOCK_STIME_NONE) {
2271 splitmux->gop_start_time = splitmux->fragment_start_time = rtime;
2272 GST_LOG_OBJECT (splitmux, "Mux start time now %" GST_STIME_FORMAT,
2273 GST_STIME_ARGS (splitmux->fragment_start_time));
2274 /* Also take this as the first start time when starting up,
2275 * so that we start counting overflow from the first frame */
2276 if (!GST_CLOCK_STIME_IS_VALID (splitmux->max_in_running_time))
2277 splitmux->max_in_running_time = splitmux->fragment_start_time;
2280 GST_SPLITMUX_UNLOCK (splitmux);
2286 return GST_PAD_PROBE_PASS;
2287 } else if (info->type & GST_PAD_PROBE_TYPE_QUERY_DOWNSTREAM) {
2288 switch (GST_QUERY_TYPE (GST_QUERY (info->data))) {
2289 case GST_QUERY_ALLOCATION:
2290 return GST_PAD_PROBE_DROP;
2292 return GST_PAD_PROBE_PASS;
2296 buf = gst_pad_probe_info_get_buffer (info);
2297 buf_info = mq_stream_buf_new ();
2299 if (GST_BUFFER_PTS_IS_VALID (buf))
2300 ts = GST_BUFFER_PTS (buf);
2302 ts = GST_BUFFER_DTS (buf);
2304 GST_LOG_OBJECT (pad, "Buffer TS is %" GST_TIME_FORMAT, GST_TIME_ARGS (ts));
2306 GST_SPLITMUX_LOCK (splitmux);
2308 if (splitmux->input_state == SPLITMUX_INPUT_STATE_STOPPED)
2311 /* If this buffer has a timestamp, advance the input timestamp of the
2313 if (GST_CLOCK_TIME_IS_VALID (ts)) {
2314 GstClockTimeDiff running_time =
2315 my_segment_to_running_time (&ctx->in_segment, ts);
2317 GST_LOG_OBJECT (pad, "Buffer running TS is %" GST_STIME_FORMAT,
2318 GST_STIME_ARGS (running_time));
2320 if (GST_CLOCK_STIME_IS_VALID (running_time)
2321 && running_time > ctx->in_running_time)
2322 ctx->in_running_time = running_time;
2325 /* Try to make sure we have a valid running time */
2326 if (!GST_CLOCK_STIME_IS_VALID (ctx->in_running_time)) {
2327 ctx->in_running_time =
2328 my_segment_to_running_time (&ctx->in_segment, ctx->in_segment.start);
2331 GST_LOG_OBJECT (pad, "in running time now %" GST_STIME_FORMAT,
2332 GST_STIME_ARGS (ctx->in_running_time));
2334 buf_info->run_ts = ctx->in_running_time;
2335 buf_info->buf_size = gst_buffer_get_size (buf);
2336 buf_info->duration = GST_BUFFER_DURATION (buf);
2338 /* initialize fragment_start_time */
2339 if (ctx->is_reference
2340 && splitmux->fragment_start_time == GST_CLOCK_STIME_NONE) {
2341 splitmux->gop_start_time = splitmux->fragment_start_time = buf_info->run_ts;
2342 GST_LOG_OBJECT (splitmux, "Mux start time now %" GST_STIME_FORMAT,
2343 GST_STIME_ARGS (splitmux->fragment_start_time));
2344 gst_buffer_replace (&ctx->prev_in_keyframe, buf);
2346 /* Also take this as the first start time when starting up,
2347 * so that we start counting overflow from the first frame */
2348 if (!GST_CLOCK_STIME_IS_VALID (splitmux->max_in_running_time))
2349 splitmux->max_in_running_time = splitmux->fragment_start_time;
2350 if (request_next_keyframe (splitmux, ctx->prev_in_keyframe) == FALSE) {
2351 GST_WARNING_OBJECT (splitmux,
2352 "Could not request a keyframe. Files may not split at the exact location they should");
2354 gst_buffer_replace (&splitmux->reference_ctx->prev_in_keyframe, NULL);
2357 GST_DEBUG_OBJECT (pad, "Buf TS %" GST_STIME_FORMAT
2358 " total GOP bytes %" G_GUINT64_FORMAT,
2359 GST_STIME_ARGS (buf_info->run_ts), splitmux->gop_total_bytes);
2366 switch (splitmux->input_state) {
2367 case SPLITMUX_INPUT_STATE_COLLECTING_GOP_START:
2368 if (ctx->is_reference) {
2369 /* This is the reference context. If it's a keyframe,
2370 * it marks the start of a new GOP and we should wait in
2371 * check_completed_gop before continuing, but either way
2372 * (keyframe or no, we'll pass this buffer through after
2373 * so set loop_again to FALSE */
2376 if (GST_BUFFER_FLAG_IS_SET (buf, GST_BUFFER_FLAG_DELTA_UNIT)) {
2377 /* Allow other input pads to catch up to here too */
2378 splitmux->max_in_running_time = ctx->in_running_time;
2379 GST_LOG_OBJECT (splitmux,
2380 "Max in running time now %" GST_TIME_FORMAT,
2381 GST_TIME_ARGS (splitmux->max_in_running_time));
2382 GST_SPLITMUX_BROADCAST_INPUT (splitmux);
2385 GST_INFO_OBJECT (pad,
2386 "Have keyframe with running time %" GST_STIME_FORMAT,
2387 GST_STIME_ARGS (ctx->in_running_time));
2389 splitmux->input_state = SPLITMUX_INPUT_STATE_WAITING_GOP_COLLECT;
2390 splitmux->max_in_running_time = ctx->in_running_time;
2391 GST_LOG_OBJECT (splitmux, "Max in running time now %" GST_TIME_FORMAT,
2392 GST_TIME_ARGS (splitmux->max_in_running_time));
2393 /* Wake up other input pads to collect this GOP */
2394 GST_SPLITMUX_BROADCAST_INPUT (splitmux);
2395 check_completed_gop (splitmux, ctx);
2396 /* Store this new keyframe to remember the start of GOP */
2397 gst_buffer_replace (&ctx->prev_in_keyframe, buf);
2399 /* Pass this buffer if the reference ctx is far enough ahead */
2400 if (ctx->in_running_time < splitmux->max_in_running_time) {
2405 /* We're still waiting for a keyframe on the reference pad, sleep */
2406 GST_LOG_OBJECT (pad, "Sleeping for GOP start");
2407 GST_SPLITMUX_WAIT_INPUT (splitmux);
2408 GST_LOG_OBJECT (pad,
2409 "Done sleeping for GOP start input state now %d",
2410 splitmux->input_state);
2413 case SPLITMUX_INPUT_STATE_WAITING_GOP_COLLECT:{
2414 /* We're collecting a GOP. If this is the reference context,
2415 * we need to check if this is a keyframe that marks the start
2416 * of the next GOP. If it is, it marks the end of the GOP we're
2417 * collecting, so sleep and wait until all the other pads also
2418 * reach that timestamp - at which point, we have an entire GOP
2419 * and either go to ENDING_FILE or release this GOP to the muxer and
2420 * go back to COLLECT_GOP_START. */
2422 /* If we overran the target timestamp, it might be time to process
2423 * the GOP, otherwise bail out for more data
2425 GST_LOG_OBJECT (pad,
2426 "Checking TS %" GST_STIME_FORMAT " against max %"
2427 GST_STIME_FORMAT, GST_STIME_ARGS (ctx->in_running_time),
2428 GST_STIME_ARGS (splitmux->max_in_running_time));
2430 if (ctx->in_running_time < splitmux->max_in_running_time) {
2435 GST_LOG_OBJECT (pad,
2436 "Collected last packet of GOP. Checking other pads");
2437 check_completed_gop (splitmux, ctx);
2440 case SPLITMUX_INPUT_STATE_FINISHING_UP:
2451 splitmux->queued_keyframes++;
2452 buf_info->keyframe = TRUE;
2455 /* Update total input byte counter for overflow detect */
2456 splitmux->gop_total_bytes += buf_info->buf_size;
2458 /* Now add this buffer to the queue just before returning */
2459 g_queue_push_head (&ctx->queued_bufs, buf_info);
2461 GST_LOG_OBJECT (pad, "Returning to queue buffer %" GST_PTR_FORMAT
2462 " run ts %" GST_STIME_FORMAT, buf, GST_STIME_ARGS (ctx->in_running_time));
2464 GST_SPLITMUX_UNLOCK (splitmux);
2465 return GST_PAD_PROBE_PASS;
2468 GST_SPLITMUX_UNLOCK (splitmux);
2470 mq_stream_buf_free (buf_info);
2471 return GST_PAD_PROBE_PASS;
2475 grow_blocked_queues (GstSplitMuxSink * splitmux)
2479 /* Scan other queues for full-ness and grow them */
2480 for (cur = g_list_first (splitmux->contexts);
2481 cur != NULL; cur = g_list_next (cur)) {
2482 MqStreamCtx *tmpctx = (MqStreamCtx *) (cur->data);
2484 guint cur_len = g_queue_get_length (&tmpctx->queued_bufs);
2486 g_object_get (tmpctx->q, "max-size-buffers", &cur_limit, NULL);
2487 GST_LOG_OBJECT (tmpctx->q, "Queue len %u", cur_len);
2489 if (cur_len >= cur_limit) {
2490 cur_limit = cur_len + 1;
2491 GST_DEBUG_OBJECT (tmpctx->q,
2492 "Queue overflowed and needs enlarging. Growing to %u buffers",
2494 g_object_set (tmpctx->q, "max-size-buffers", cur_limit, NULL);
2500 handle_q_underrun (GstElement * q, gpointer user_data)
2502 MqStreamCtx *ctx = (MqStreamCtx *) (user_data);
2503 GstSplitMuxSink *splitmux = ctx->splitmux;
2505 GST_SPLITMUX_LOCK (splitmux);
2506 GST_DEBUG_OBJECT (q,
2507 "Queue reported underrun with %d keyframes and %d cmds enqueued",
2508 splitmux->queued_keyframes, g_queue_get_length (&splitmux->out_cmd_q));
2509 grow_blocked_queues (splitmux);
2510 GST_SPLITMUX_UNLOCK (splitmux);
2514 handle_q_overrun (GstElement * q, gpointer user_data)
2516 MqStreamCtx *ctx = (MqStreamCtx *) (user_data);
2517 GstSplitMuxSink *splitmux = ctx->splitmux;
2518 gboolean allow_grow = FALSE;
2520 GST_SPLITMUX_LOCK (splitmux);
2521 GST_DEBUG_OBJECT (q,
2522 "Queue reported overrun with %d keyframes and %d cmds enqueued",
2523 splitmux->queued_keyframes, g_queue_get_length (&splitmux->out_cmd_q));
2525 if (splitmux->queued_keyframes < 2) {
2526 /* Less than a full GOP queued, grow the queue */
2528 } else if (g_queue_get_length (&splitmux->out_cmd_q) < 1) {
2531 /* If another queue is starved, grow */
2533 for (cur = g_list_first (splitmux->contexts);
2534 cur != NULL; cur = g_list_next (cur)) {
2535 MqStreamCtx *tmpctx = (MqStreamCtx *) (cur->data);
2536 if (tmpctx != ctx && g_queue_get_length (&tmpctx->queued_bufs) < 1) {
2541 GST_SPLITMUX_UNLOCK (splitmux);
2546 g_object_get (q, "max-size-buffers", &cur_limit, NULL);
2549 GST_DEBUG_OBJECT (q,
2550 "Queue overflowed and needs enlarging. Growing to %u buffers",
2553 g_object_set (q, "max-size-buffers", cur_limit, NULL);
2558 gst_splitmux_sink_request_new_pad (GstElement * element,
2559 GstPadTemplate * templ, const gchar * name, const GstCaps * caps)
2561 GstSplitMuxSink *splitmux = (GstSplitMuxSink *) element;
2562 GstPadTemplate *mux_template = NULL;
2565 GstPad *q_sink = NULL, *q_src = NULL;
2567 gboolean is_video = FALSE;
2570 GST_DEBUG_OBJECT (element, "templ:%s, name:%s", templ->name_template, name);
2572 GST_SPLITMUX_LOCK (splitmux);
2573 if (!create_muxer (splitmux))
2575 g_signal_emit (splitmux, signals[SIGNAL_MUXER_ADDED], 0, splitmux->muxer);
2577 if (templ->name_template) {
2578 if (g_str_equal (templ->name_template, "video")) {
2579 if (splitmux->have_video)
2580 goto already_have_video;
2582 /* FIXME: Look for a pad template with matching caps, rather than by name */
2583 GST_DEBUG_OBJECT (element,
2584 "searching for pad-template with name 'video_%%u'");
2586 gst_element_class_get_pad_template (GST_ELEMENT_GET_CLASS
2587 (splitmux->muxer), "video_%u");
2589 /* Fallback to find sink pad templates named 'video' (flvmux) */
2590 if (!mux_template) {
2591 GST_DEBUG_OBJECT (element,
2592 "searching for pad-template with name 'video'");
2594 gst_element_class_get_pad_template (GST_ELEMENT_GET_CLASS
2595 (splitmux->muxer), "video");
2600 GST_DEBUG_OBJECT (element, "searching for pad-template with name '%s'",
2601 templ->name_template);
2603 gst_element_class_get_pad_template (GST_ELEMENT_GET_CLASS
2604 (splitmux->muxer), templ->name_template);
2606 /* Fallback to find sink pad templates named 'audio' (flvmux) */
2607 if (!mux_template) {
2608 GST_DEBUG_OBJECT (element,
2609 "searching for pad-template with name 'audio'");
2611 gst_element_class_get_pad_template (GST_ELEMENT_GET_CLASS
2612 (splitmux->muxer), "audio");
2617 if (mux_template == NULL) {
2618 GST_DEBUG_OBJECT (element,
2619 "searching for pad-template with name 'sink_%%d'");
2621 gst_element_class_get_pad_template (GST_ELEMENT_GET_CLASS
2622 (splitmux->muxer), "sink_%d");
2625 if (mux_template == NULL) {
2626 GST_DEBUG_OBJECT (element, "searching for pad-template with name 'sink'");
2628 gst_element_class_get_pad_template (GST_ELEMENT_GET_CLASS
2629 (splitmux->muxer), "sink");
2634 if (mux_template == NULL) {
2635 GST_ERROR_OBJECT (element,
2636 "unable to find a suitable sink pad-template on the muxer");
2640 GST_DEBUG_OBJECT (element, "found sink pad-template '%s' on the muxer",
2641 mux_template->name_template);
2643 if (mux_template->presence == GST_PAD_REQUEST) {
2644 GST_DEBUG_OBJECT (element, "requesting pad from pad-template");
2646 res = gst_element_request_pad (splitmux->muxer, mux_template, name, caps);
2649 } else if (mux_template->presence == GST_PAD_ALWAYS) {
2650 GST_DEBUG_OBJECT (element, "accessing always pad from pad-template");
2653 gst_element_get_static_pad (splitmux->muxer,
2654 mux_template->name_template);
2658 GST_ERROR_OBJECT (element,
2659 "unexpected pad presence %d", mux_template->presence);
2665 gname = g_strdup ("video");
2666 else if (name == NULL)
2667 gname = gst_pad_get_name (res);
2669 gname = g_strdup (name);
2671 if ((q = create_element (splitmux, "queue", NULL, FALSE)) == NULL)
2674 gst_element_set_state (q, GST_STATE_TARGET (splitmux));
2676 g_object_set (q, "max-size-bytes", 0, "max-size-time", (guint64) (0),
2677 "max-size-buffers", 5, NULL);
2679 q_sink = gst_element_get_static_pad (q, "sink");
2680 q_src = gst_element_get_static_pad (q, "src");
2682 if (gst_pad_link (q_src, res) != GST_PAD_LINK_OK) {
2683 gst_element_release_request_pad (splitmux->muxer, res);
2684 gst_object_unref (GST_OBJECT (res));
2688 gst_object_unref (GST_OBJECT (res));
2690 ctx = mq_stream_ctx_new (splitmux);
2691 /* Context holds a ref: */
2692 ctx->q = gst_object_ref (q);
2693 ctx->srcpad = q_src;
2694 ctx->sinkpad = q_sink;
2696 g_signal_connect (q, "overrun", (GCallback) handle_q_overrun, ctx);
2697 g_signal_connect (q, "underrun", (GCallback) handle_q_underrun, ctx);
2699 ctx->src_pad_block_id =
2700 gst_pad_add_probe (q_src,
2701 GST_PAD_PROBE_TYPE_DATA_DOWNSTREAM | GST_PAD_PROBE_TYPE_EVENT_FLUSH,
2702 (GstPadProbeCallback) handle_mq_output, ctx, NULL);
2703 if (is_video && splitmux->reference_ctx != NULL) {
2704 splitmux->reference_ctx->is_reference = FALSE;
2705 splitmux->reference_ctx = NULL;
2707 if (splitmux->reference_ctx == NULL) {
2708 splitmux->reference_ctx = ctx;
2709 ctx->is_reference = TRUE;
2712 res = gst_ghost_pad_new_from_template (gname, q_sink, templ);
2713 g_object_set_qdata ((GObject *) (res), PAD_CONTEXT, ctx);
2715 ctx->sink_pad_block_id =
2716 gst_pad_add_probe (q_sink,
2717 GST_PAD_PROBE_TYPE_DATA_DOWNSTREAM | GST_PAD_PROBE_TYPE_EVENT_FLUSH |
2718 GST_PAD_PROBE_TYPE_QUERY_DOWNSTREAM,
2719 (GstPadProbeCallback) handle_mq_input, ctx, NULL);
2721 GST_DEBUG_OBJECT (splitmux, "Request pad %" GST_PTR_FORMAT
2722 " feeds queue pad %" GST_PTR_FORMAT, res, q_sink);
2724 splitmux->contexts = g_list_append (splitmux->contexts, ctx);
2729 splitmux->have_video = TRUE;
2731 gst_pad_set_active (res, TRUE);
2732 gst_element_add_pad (element, res);
2734 GST_SPLITMUX_UNLOCK (splitmux);
2738 GST_SPLITMUX_UNLOCK (splitmux);
2741 gst_object_unref (q_sink);
2743 gst_object_unref (q_src);
2746 GST_DEBUG_OBJECT (splitmux, "video sink pad already requested");
2747 GST_SPLITMUX_UNLOCK (splitmux);
2752 gst_splitmux_sink_release_pad (GstElement * element, GstPad * pad)
2754 GstSplitMuxSink *splitmux = (GstSplitMuxSink *) element;
2755 GstPad *muxpad = NULL;
2757 (MqStreamCtx *) (g_object_get_qdata ((GObject *) (pad), PAD_CONTEXT));
2759 GST_SPLITMUX_LOCK (splitmux);
2761 if (splitmux->muxer == NULL)
2762 goto fail; /* Elements don't exist yet - nothing to release */
2764 GST_INFO_OBJECT (pad, "releasing request pad");
2766 muxpad = gst_pad_get_peer (ctx->srcpad);
2768 /* Remove the context from our consideration */
2769 splitmux->contexts = g_list_remove (splitmux->contexts, ctx);
2771 if (ctx->sink_pad_block_id)
2772 gst_pad_remove_probe (ctx->sinkpad, ctx->sink_pad_block_id);
2774 if (ctx->src_pad_block_id)
2775 gst_pad_remove_probe (ctx->srcpad, ctx->src_pad_block_id);
2777 /* Can release the context now */
2778 mq_stream_ctx_free (ctx);
2779 if (ctx == splitmux->reference_ctx)
2780 splitmux->reference_ctx = NULL;
2782 /* Release and free the muxer input */
2784 gst_element_release_request_pad (splitmux->muxer, muxpad);
2785 gst_object_unref (muxpad);
2788 if (GST_PAD_PAD_TEMPLATE (pad) &&
2789 g_str_equal (GST_PAD_TEMPLATE_NAME_TEMPLATE (GST_PAD_PAD_TEMPLATE
2791 splitmux->have_video = FALSE;
2793 gst_element_remove_pad (element, pad);
2795 /* Reset the internal elements only after all request pads are released */
2796 if (splitmux->contexts == NULL)
2797 gst_splitmux_reset (splitmux);
2800 GST_SPLITMUX_UNLOCK (splitmux);
2804 create_element (GstSplitMuxSink * splitmux,
2805 const gchar * factory, const gchar * name, gboolean locked)
2807 GstElement *ret = gst_element_factory_make (factory, name);
2809 g_warning ("Failed to create %s - splitmuxsink will not work", name);
2814 /* Ensure the sink starts in locked state and NULL - it will be changed
2815 * by the filename setting code */
2816 gst_element_set_locked_state (ret, TRUE);
2817 gst_element_set_state (ret, GST_STATE_NULL);
2820 if (!gst_bin_add (GST_BIN (splitmux), ret)) {
2821 g_warning ("Could not add %s element - splitmuxsink will not work", name);
2822 gst_object_unref (ret);
2830 create_muxer (GstSplitMuxSink * splitmux)
2832 /* Create internal elements */
2833 if (splitmux->muxer == NULL) {
2834 GstElement *provided_muxer = NULL;
2836 GST_OBJECT_LOCK (splitmux);
2837 if (splitmux->provided_muxer != NULL)
2838 provided_muxer = gst_object_ref (splitmux->provided_muxer);
2839 GST_OBJECT_UNLOCK (splitmux);
2841 if ((!splitmux->async_finalize && provided_muxer == NULL) ||
2842 (splitmux->async_finalize && splitmux->muxer_factory == NULL)) {
2843 if ((splitmux->muxer =
2844 create_element (splitmux, DEFAULT_MUXER, "muxer", FALSE)) == NULL)
2846 } else if (splitmux->async_finalize) {
2847 if ((splitmux->muxer =
2848 create_element (splitmux, splitmux->muxer_factory, "muxer",
2851 if (splitmux->muxer_properties)
2852 gst_structure_foreach (splitmux->muxer_properties,
2853 _set_property_from_structure, splitmux->muxer);
2855 /* Ensure it's not in locked state (we might be reusing an old element) */
2856 gst_element_set_locked_state (provided_muxer, FALSE);
2857 if (!gst_bin_add (GST_BIN (splitmux), provided_muxer)) {
2858 g_warning ("Could not add muxer element - splitmuxsink will not work");
2859 gst_object_unref (provided_muxer);
2863 splitmux->muxer = provided_muxer;
2864 gst_object_unref (provided_muxer);
2867 if (splitmux->use_robust_muxing) {
2868 update_muxer_properties (splitmux);
2878 find_sink (GstElement * e)
2880 GstElement *res = NULL;
2882 gboolean done = FALSE;
2883 GValue data = { 0, };
2885 if (!GST_IS_BIN (e))
2888 if (g_object_class_find_property (G_OBJECT_GET_CLASS (e), "location") != NULL)
2891 iter = gst_bin_iterate_sinks (GST_BIN (e));
2893 switch (gst_iterator_next (iter, &data)) {
2894 case GST_ITERATOR_OK:
2896 GstElement *child = g_value_get_object (&data);
2897 if (g_object_class_find_property (G_OBJECT_GET_CLASS (child),
2898 "location") != NULL) {
2902 g_value_reset (&data);
2905 case GST_ITERATOR_RESYNC:
2906 gst_iterator_resync (iter);
2908 case GST_ITERATOR_DONE:
2911 case GST_ITERATOR_ERROR:
2912 g_assert_not_reached ();
2916 g_value_unset (&data);
2917 gst_iterator_free (iter);
2923 create_sink (GstSplitMuxSink * splitmux)
2925 GstElement *provided_sink = NULL;
2927 if (splitmux->active_sink == NULL) {
2929 GST_OBJECT_LOCK (splitmux);
2930 if (splitmux->provided_sink != NULL)
2931 provided_sink = gst_object_ref (splitmux->provided_sink);
2932 GST_OBJECT_UNLOCK (splitmux);
2934 if ((!splitmux->async_finalize && provided_sink == NULL) ||
2935 (splitmux->async_finalize && splitmux->sink_factory == NULL)) {
2936 if ((splitmux->sink =
2937 create_element (splitmux, DEFAULT_SINK, "sink", TRUE)) == NULL)
2939 splitmux->active_sink = splitmux->sink;
2940 } else if (splitmux->async_finalize) {
2941 if ((splitmux->sink =
2942 create_element (splitmux, splitmux->sink_factory, "sink",
2945 if (splitmux->sink_properties)
2946 gst_structure_foreach (splitmux->sink_properties,
2947 _set_property_from_structure, splitmux->sink);
2948 splitmux->active_sink = splitmux->sink;
2950 /* Ensure the sink starts in locked state and NULL - it will be changed
2951 * by the filename setting code */
2952 gst_element_set_locked_state (provided_sink, TRUE);
2953 gst_element_set_state (provided_sink, GST_STATE_NULL);
2954 if (!gst_bin_add (GST_BIN (splitmux), provided_sink)) {
2955 g_warning ("Could not add sink elements - splitmuxsink will not work");
2956 gst_object_unref (provided_sink);
2960 splitmux->active_sink = provided_sink;
2962 /* The bin holds a ref now, we can drop our tmp ref */
2963 gst_object_unref (provided_sink);
2965 /* Find the sink element */
2966 splitmux->sink = find_sink (splitmux->active_sink);
2967 if (splitmux->sink == NULL) {
2969 ("Could not locate sink element in provided sink - splitmuxsink will not work");
2975 if (g_object_class_find_property (G_OBJECT_GET_CLASS (splitmux->sink),
2977 /* async child elements are causing state change races and weird
2978 * failures, so let's try and turn that off */
2979 g_object_set (splitmux->sink, "async", FALSE, NULL);
2983 if (!gst_element_link (splitmux->muxer, splitmux->active_sink)) {
2984 g_warning ("Failed to link muxer and sink- splitmuxsink will not work");
2995 #pragma GCC diagnostic ignored "-Wformat-nonliteral"
2998 set_next_filename (GstSplitMuxSink * splitmux, MqStreamCtx * ctx)
3000 gchar *fname = NULL;
3004 gst_splitmux_sink_ensure_max_files (splitmux);
3006 if (ctx->cur_out_buffer == NULL) {
3007 GST_WARNING_OBJECT (splitmux, "Starting next file without buffer");
3010 caps = gst_pad_get_current_caps (ctx->srcpad);
3011 sample = gst_sample_new (ctx->cur_out_buffer, caps, &ctx->out_segment, NULL);
3012 g_signal_emit (splitmux, signals[SIGNAL_FORMAT_LOCATION_FULL], 0,
3013 splitmux->fragment_id, sample, &fname);
3014 gst_sample_unref (sample);
3016 gst_caps_unref (caps);
3018 if (fname == NULL) {
3019 /* Fallback to the old signal if the new one returned nothing */
3020 g_signal_emit (splitmux, signals[SIGNAL_FORMAT_LOCATION], 0,
3021 splitmux->fragment_id, &fname);
3025 fname = splitmux->location ?
3026 g_strdup_printf (splitmux->location, splitmux->fragment_id) : NULL;
3029 GST_INFO_OBJECT (splitmux, "Setting file to %s", fname);
3030 g_object_set (splitmux->sink, "location", fname, NULL);
3033 splitmux->fragment_id++;
/* do_async_start:
 * @splitmux: the splitmuxsink to announce an async state change for
 *
 * Marks an async operation as pending and hands an async-start message,
 * with @splitmux as its source, to the parent bin class' handle_message().
 * When need_async_start is not set this is logged as not needed.
 *
 * Callers visible in this file invoke it with the splitmux lock held.
 */
3038 do_async_start (GstSplitMuxSink * splitmux)
3040 GstMessage *message;
/* NOTE(review): presumably returns early from this branch - the statement
 * following the log line is not visible in this listing */
3042 if (!splitmux->need_async_start) {
3043 GST_INFO_OBJECT (splitmux, "no async_start needed");
/* Remember that a matching async-done is owed; do_async_done() checks this */
3047 splitmux->async_pending = TRUE;
3049 GST_INFO_OBJECT (splitmux, "Sending async_start message");
3050 message = gst_message_new_async_start (GST_OBJECT_CAST (splitmux));
/* Passed straight to the parent class' message handler */
3051 GST_BIN_CLASS (parent_class)->handle_message (GST_BIN_CAST
3052 (splitmux), message);
/* do_async_done:
 * @splitmux: the splitmuxsink to complete an async state change for
 *
 * Counterpart of do_async_start(). If an async operation is pending, an
 * async-done message (created with GST_CLOCK_TIME_NONE as its time
 * argument) is handed to the parent bin class' handle_message() and the
 * pending flag is cleared. need_async_start is reset as well.
 *
 * Callers visible in this file invoke it with the splitmux lock held.
 */
3056 do_async_done (GstSplitMuxSink * splitmux)
3058 GstMessage *message;
/* Only answer an async-start that was actually recorded as pending */
3060 if (splitmux->async_pending) {
3061 GST_INFO_OBJECT (splitmux, "Sending async_done message");
3063 gst_message_new_async_done (GST_OBJECT_CAST (splitmux),
3064 GST_CLOCK_TIME_NONE);
3065 GST_BIN_CLASS (parent_class)->handle_message (GST_BIN_CAST
3066 (splitmux), message);
3068 splitmux->async_pending = FALSE;
/* No further async-start is wanted until a state change requests one again */
3071 splitmux->need_async_start = FALSE;
/* gst_splitmux_sink_change_state:
 * @element: the splitmuxsink changing state
 * @transition: the state transition being performed
 *
 * State change handler. Per-transition preparation runs first, then the
 * parent class' change_state() is called, then per-transition follow-up:
 *
 *  - NULL->READY: creates the muxer and sink, emits SIGNAL_MUXER_ADDED and
 *    SIGNAL_SINK_ADDED and resets fragment_id; the result is set to
 *    GST_STATE_CHANGE_FAILURE if either element cannot be created.
 *  - READY->PAUSED: resets the input/output state and counters; after the
 *    parent class ran, an async start is issued and the result is set to
 *    GST_STATE_CHANGE_ASYNC.
 *  - PAUSED->READY and READY->NULL: clears the queued split times, moves
 *    both sides to their STOPPED state and wakes waiting threads;
 *    PAUSED->READY additionally clears the pending split request flags.
 *  - PLAYING->PAUSED: flags that an async start is needed.
 *  - READY->NULL (after the parent class): resets fragment_id, resets the
 *    internal elements when no pad contexts remain, and completes any
 *    pending async operation.
 *
 * A failed NULL->READY transition is cleaned up at the end of the function.
 *
 * NOTE(review): short lines (break/goto/return, braces) are not visible in
 * this listing - confirm the control flow against the complete source.
 */
3074 static GstStateChangeReturn
3075 gst_splitmux_sink_change_state (GstElement * element, GstStateChange transition)
3077 GstStateChangeReturn ret;
3078 GstSplitMuxSink *splitmux = (GstSplitMuxSink *) element;
/* Preparation that has to happen before the parent class is invoked */
3080 switch (transition) {
3081 case GST_STATE_CHANGE_NULL_TO_READY:{
3082 GST_SPLITMUX_LOCK (splitmux);
/* Both internal elements are required; flag failure if either is missing */
3083 if (!create_muxer (splitmux) || !create_sink (splitmux)) {
3084 ret = GST_STATE_CHANGE_FAILURE;
3085 GST_SPLITMUX_UNLOCK (splitmux);
/* Announce the muxer and sink now in use (emitted with the lock held) */
3088 g_signal_emit (splitmux, signals[SIGNAL_MUXER_ADDED], 0, splitmux->muxer);
3089 g_signal_emit (splitmux, signals[SIGNAL_SINK_ADDED], 0, splitmux->sink);
3090 GST_SPLITMUX_UNLOCK (splitmux);
/* Fragment numbering starts at 0 */
3091 splitmux->fragment_id = 0;
3094 case GST_STATE_CHANGE_READY_TO_PAUSED:{
3095 GST_SPLITMUX_LOCK (splitmux);
3096 /* Start by collecting one input on each pad */
3097 splitmux->input_state = SPLITMUX_INPUT_STATE_COLLECTING_GOP_START;
3098 splitmux->output_state = SPLITMUX_OUTPUT_STATE_START_NEXT_FILE;
/* No running time, GOP start or fragment start has been observed yet */
3099 splitmux->max_in_running_time = GST_CLOCK_STIME_NONE;
3100 splitmux->gop_start_time = splitmux->fragment_start_time =
3101 GST_CLOCK_STIME_NONE;
3102 splitmux->muxed_out_bytes = 0;
3103 splitmux->ready_for_output = FALSE;
3104 GST_SPLITMUX_UNLOCK (splitmux);
3107 case GST_STATE_CHANGE_PAUSED_TO_READY:
/* Drop split requests that were not acted upon */
3108 g_atomic_int_set (&(splitmux->split_requested), FALSE);
3109 g_atomic_int_set (&(splitmux->do_split_next_gop), FALSE);
/* fall through: PAUSED->READY also performs the shutdown steps below */
3110 case GST_STATE_CHANGE_READY_TO_NULL:
3111 GST_SPLITMUX_LOCK (splitmux);
3112 gst_queue_array_clear (splitmux->times_to_split);
3113 splitmux->output_state = SPLITMUX_OUTPUT_STATE_STOPPED;
3114 splitmux->input_state = SPLITMUX_INPUT_STATE_STOPPED;
3115 /* Wake up any blocked threads */
3116 GST_LOG_OBJECT (splitmux,
3117 "State change -> NULL or READY. Waking threads");
3118 GST_SPLITMUX_BROADCAST_INPUT (splitmux);
3119 GST_SPLITMUX_BROADCAST_OUTPUT (splitmux);
3120 GST_SPLITMUX_UNLOCK (splitmux);
/* Let the parent class perform the actual state change */
3126 ret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
/* NOTE(review): the statement executed on failure is not visible in this
 * listing - presumably it skips the follow-up switch below */
3127 if (ret == GST_STATE_CHANGE_FAILURE)
/* Follow-up work once the parent class has handled the transition */
3130 switch (transition) {
3131 case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
3132 splitmux->need_async_start = TRUE;
3134 case GST_STATE_CHANGE_READY_TO_PAUSED:{
3135 /* Change state async, because our child sink might not
3136 * be ready to do that for us yet if its state is still locked */
3138 splitmux->need_async_start = TRUE;
3139 /* we want to go async to PAUSED until we managed to configure and add the
3141 GST_SPLITMUX_LOCK (splitmux);
3142 do_async_start (splitmux);
3143 GST_SPLITMUX_UNLOCK (splitmux);
3144 ret = GST_STATE_CHANGE_ASYNC;
3147 case GST_STATE_CHANGE_READY_TO_NULL:
3148 GST_SPLITMUX_LOCK (splitmux);
3149 splitmux->fragment_id = 0;
3150 /* Reset internal elements only if no pad contexts are using them */
3151 if (splitmux->contexts == NULL)
3152 gst_splitmux_reset (splitmux);
3153 do_async_done (splitmux);
3154 GST_SPLITMUX_UNLOCK (splitmux);
/* NOTE(review): in this failure path gst_splitmux_reset() runs before the
 * lock is taken, whereas the READY->NULL branch calls it with the lock
 * held - confirm that the difference is intentional */
3161 if (transition == GST_STATE_CHANGE_NULL_TO_READY &&
3162 ret == GST_STATE_CHANGE_FAILURE) {
3163 /* Cleanup elements on failed transition out of NULL */
3164 gst_splitmux_reset (splitmux);
3165 GST_SPLITMUX_LOCK (splitmux);
3166 do_async_done (splitmux);
3167 GST_SPLITMUX_UNLOCK (splitmux);
/* register_splitmuxsink:
 * @plugin: the plugin to register the element with
 *
 * Initialises the splitmux_debug debug category and registers
 * GST_TYPE_SPLITMUX_SINK under the element name "splitmuxsink" with rank
 * GST_RANK_NONE.
 *
 * Returns: the result of gst_element_register().
 */
3173 register_splitmuxsink (GstPlugin * plugin)
3175 GST_DEBUG_CATEGORY_INIT (splitmux_debug, "splitmuxsink", 0,
3176 "Split File Muxing Sink");
3178 return gst_element_register (plugin, "splitmuxsink", GST_RANK_NONE,
3179 GST_TYPE_SPLITMUX_SINK);
/* gst_splitmux_sink_ensure_max_files:
 * @splitmux: the splitmuxsink whose fragment counter is checked
 *
 * Wraps fragment_id back to 0 once it has reached max_files. A max_files
 * value of 0 disables the check, so the counter keeps growing.
 */
3183 gst_splitmux_sink_ensure_max_files (GstSplitMuxSink * splitmux)
3185 if (splitmux->max_files && splitmux->fragment_id >= splitmux->max_files) {
3186 splitmux->fragment_id = 0;
/* split_now:
 * @splitmux: the splitmuxsink to flag
 *
 * Atomically sets the do_split_next_gop flag. The flag is cleared again on
 * the PAUSED->READY state change; the code acting on it is outside this
 * part of the file.
 */
3191 split_now (GstSplitMuxSink * splitmux)
3193 g_atomic_int_set (&(splitmux->do_split_next_gop), TRUE);
/* split_after:
 * @splitmux: the splitmuxsink to flag
 *
 * Atomically sets the split_requested flag. The flag is cleared again on
 * the PAUSED->READY state change; the code acting on it is outside this
 * part of the file.
 */
3197 split_after (GstSplitMuxSink * splitmux)
3199 g_atomic_int_set (&(splitmux->split_requested), TRUE);
3203 split_at_running_time (GstSplitMuxSink * splitmux, GstClockTime split_time)
3205 gboolean send_keyframe_requests;
3207 GST_SPLITMUX_LOCK (splitmux);
3208 gst_queue_array_push_tail_struct (splitmux->times_to_split, &split_time);
3209 send_keyframe_requests = splitmux->send_keyframe_requests;
3210 GST_SPLITMUX_UNLOCK (splitmux);
3212 if (send_keyframe_requests) {
3214 gst_video_event_new_upstream_force_key_unit (split_time, TRUE, 0);
3215 GST_INFO_OBJECT (splitmux, "Requesting next keyframe at %" GST_TIME_FORMAT,
3216 GST_TIME_ARGS (split_time));
3217 if (!gst_pad_push_event (splitmux->reference_ctx->sinkpad, ev)) {
3218 GST_WARNING_OBJECT (splitmux,
3219 "Could not request keyframe at %" GST_TIME_FORMAT,
3220 GST_TIME_ARGS (split_time));