1 /* GStreamer Muxer bin that splits output stream by size/time
2 * Copyright (C) <2014> Jan Schmidt <jan@centricular.com>
4 * This library is free software; you can redistribute it and/or
5 * modify it under the terms of the GNU Library General Public
6 * License as published by the Free Software Foundation; either
7 * version 2 of the License, or (at your option) any later version.
9 * This library is distributed in the hope that it will be useful,
10 * but WITHOUT ANY WARRANTY; without even the implied warranty of
11 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
12 * Library General Public License for more details.
14 * You should have received a copy of the GNU Library General Public
15 * License along with this library; if not, write to the
16 * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
17 * Boston, MA 02110-1301, USA.
21 * SECTION:element-splitmuxsink
22 * @short_description: Muxer wrapper for splitting output stream by size or time
24 * This element wraps a muxer and a sink, and starts a new file when the mux
25 * contents are about to cross a threshold of maximum size or maximum time,
26 * splitting at video keyframe boundaries. Exactly one input video stream
27 * can be muxed, with as many accompanying audio and subtitle streams as
30 * By default, it uses mp4mux and filesink, but they can be changed via
31 * the 'muxer' and 'sink' properties.
33 * The minimum file size is 1 GOP, however - so limits may be overrun if the
34 * distance between any 2 keyframes is larger than the limits.
36 * If a video stream is available, the splitting process is driven by the video
37 * stream contents, and the video stream must contain closed GOPs for the output
38 * file parts to be played individually correctly. In the absence of a video
39 * stream, the first available stream is used as reference for synchronization.
41 * In the async-finalize mode, when the threshold is crossed, the old muxer
42 * and sink is disconnected from the pipeline and left to finish the file
43 * asynchronously, and a new muxer and sink is created to continue with the
44 * next fragment. For that reason, instead of muxer and sink objects, the
45 * muxer-factory and sink-factory properties are used to construct the new
46 * objects, together with muxer-properties and sink-properties.
49 * <title>Example pipelines</title>
51 * gst-launch-1.0 -e v4l2src num-buffers=500 ! video/x-raw,width=320,height=240 ! videoconvert ! queue ! timeoverlay ! x264enc key-int-max=10 ! h264parse ! splitmuxsink location=video%02d.mov max-size-time=10000000000 max-size-bytes=1000000
53 * Records a video stream captured from a v4l2 device and muxes it into
54 * ISO mp4 files, splitting as needed to limit size/duration to 10 seconds
55 * and 1MB maximum size.
58 * gst-launch-1.0 -e v4l2src num-buffers=500 ! video/x-raw,width=320,height=240 ! videoconvert ! queue ! timeoverlay ! x264enc key-int-max=10 ! h264parse ! splitmuxsink location=video%02d.mkv max-size-time=10000000000 muxer-factory=matroskamux muxer-properties="properties,streamable=true"
60 * Records a video stream captured from a v4l2 device and muxes it into
61 * streamable Matroska files, splitting as needed to limit size/duration to 10
62 * seconds. Each file will finalize asynchronously.
71 #include <glib/gstdio.h>
72 #include <gst/video/video.h>
73 #include "gstsplitmuxsink.h"
75 GST_DEBUG_CATEGORY_STATIC (splitmux_debug);
76 #define GST_CAT_DEFAULT splitmux_debug
78 #define GST_SPLITMUX_LOCK(s) g_mutex_lock(&(s)->lock)
79 #define GST_SPLITMUX_UNLOCK(s) g_mutex_unlock(&(s)->lock)
80 #define GST_SPLITMUX_WAIT_INPUT(s) g_cond_wait (&(s)->input_cond, &(s)->lock)
81 #define GST_SPLITMUX_BROADCAST_INPUT(s) g_cond_broadcast (&(s)->input_cond)
83 #define GST_SPLITMUX_WAIT_OUTPUT(s) g_cond_wait (&(s)->output_cond, &(s)->lock)
84 #define GST_SPLITMUX_BROADCAST_OUTPUT(s) g_cond_broadcast (&(s)->output_cond)
86 static void split_now (GstSplitMuxSink * splitmux);
87 static void split_after (GstSplitMuxSink * splitmux);
88 static void split_at_running_time (GstSplitMuxSink * splitmux,
89 GstClockTime split_time);
97 PROP_MAX_SIZE_TIMECODE,
98 PROP_SEND_KEYFRAME_REQUESTS,
101 PROP_USE_ROBUST_MUXING,
102 PROP_ALIGNMENT_THRESHOLD,
108 PROP_MUXER_PROPERTIES,
113 #define DEFAULT_MAX_SIZE_TIME 0
114 #define DEFAULT_MAX_SIZE_BYTES 0
115 #define DEFAULT_MAX_FILES 0
116 #define DEFAULT_MUXER_OVERHEAD 0.02
117 #define DEFAULT_SEND_KEYFRAME_REQUESTS FALSE
118 #define DEFAULT_ALIGNMENT_THRESHOLD 0
119 #define DEFAULT_MUXER "mp4mux"
120 #define DEFAULT_SINK "filesink"
121 #define DEFAULT_USE_ROBUST_MUXING FALSE
122 #define DEFAULT_RESET_MUXER TRUE
123 #define DEFAULT_ASYNC_FINALIZE FALSE
125 typedef struct _AsyncEosHelper
133 SIGNAL_FORMAT_LOCATION,
134 SIGNAL_FORMAT_LOCATION_FULL,
137 SIGNAL_SPLIT_AT_RUNNING_TIME,
143 static guint signals[SIGNAL_LAST];
145 static GstStaticPadTemplate video_sink_template =
146 GST_STATIC_PAD_TEMPLATE ("video",
149 GST_STATIC_CAPS_ANY);
150 static GstStaticPadTemplate audio_sink_template =
151 GST_STATIC_PAD_TEMPLATE ("audio_%u",
154 GST_STATIC_CAPS_ANY);
155 static GstStaticPadTemplate subtitle_sink_template =
156 GST_STATIC_PAD_TEMPLATE ("subtitle_%u",
159 GST_STATIC_CAPS_ANY);
160 static GstStaticPadTemplate caption_sink_template =
161 GST_STATIC_PAD_TEMPLATE ("caption_%u",
164 GST_STATIC_CAPS_ANY);
166 static GQuark PAD_CONTEXT;
167 static GQuark EOS_FROM_US;
168 static GQuark RUNNING_TIME;
169 /* EOS_FROM_US is only valid in async-finalize mode. We need to know whether
170 * to forward an incoming EOS message, but we cannot rely on the state of the
171 * splitmux anymore, so we set this qdata on the sink instead.
172 * The muxer and sink must be destroyed after both of these things have
174 * 1) The EOS message has been sent when the fragment is ending
175 * 2) The muxer has been unlinked and relinked
176 * Therefore, EOS_FROM_US can have these two values:
177 * 0: EOS was not requested from us. Forward the message. The muxer and the
178 * sink will be destroyed together with the rest of the bin.
179 * 1: EOS was requested from us, but the other of the two tasks hasn't
180 * finished. Set EOS_FROM_US to 2 and do your stuff.
181 * 2: EOS was requested from us and the other of the two tasks has finished.
182 * Now we can destroy the muxer and the sink.
188 PAD_CONTEXT = g_quark_from_static_string ("pad-context");
189 EOS_FROM_US = g_quark_from_static_string ("eos-from-us");
190 RUNNING_TIME = g_quark_from_static_string ("running-time");
193 #define gst_splitmux_sink_parent_class parent_class
194 G_DEFINE_TYPE_EXTENDED (GstSplitMuxSink, gst_splitmux_sink, GST_TYPE_BIN, 0,
197 static gboolean create_muxer (GstSplitMuxSink * splitmux);
198 static gboolean create_sink (GstSplitMuxSink * splitmux);
199 static void gst_splitmux_sink_set_property (GObject * object, guint prop_id,
200 const GValue * value, GParamSpec * pspec);
201 static void gst_splitmux_sink_get_property (GObject * object, guint prop_id,
202 GValue * value, GParamSpec * pspec);
203 static void gst_splitmux_sink_dispose (GObject * object);
204 static void gst_splitmux_sink_finalize (GObject * object);
206 static GstPad *gst_splitmux_sink_request_new_pad (GstElement * element,
207 GstPadTemplate * templ, const gchar * name, const GstCaps * caps);
208 static void gst_splitmux_sink_release_pad (GstElement * element, GstPad * pad);
210 static GstStateChangeReturn gst_splitmux_sink_change_state (GstElement *
211 element, GstStateChange transition);
213 static void bus_handler (GstBin * bin, GstMessage * msg);
214 static void set_next_filename (GstSplitMuxSink * splitmux, MqStreamCtx * ctx);
215 static void start_next_fragment (GstSplitMuxSink * splitmux, MqStreamCtx * ctx);
216 static void mq_stream_ctx_free (MqStreamCtx * ctx);
217 static void grow_blocked_queues (GstSplitMuxSink * splitmux);
219 static void gst_splitmux_sink_ensure_max_files (GstSplitMuxSink * splitmux);
220 static GstElement *create_element (GstSplitMuxSink * splitmux,
221 const gchar * factory, const gchar * name, gboolean locked);
223 static void do_async_done (GstSplitMuxSink * splitmux);
226 mq_stream_buf_new (void)
228 return g_slice_new0 (MqStreamBuf);
232 mq_stream_buf_free (MqStreamBuf * data)
234 g_slice_free (MqStreamBuf, data);
237 static SplitMuxOutputCommand *
238 out_cmd_buf_new (void)
240 return g_slice_new0 (SplitMuxOutputCommand);
244 out_cmd_buf_free (SplitMuxOutputCommand * data)
246 g_slice_free (SplitMuxOutputCommand, data);
250 gst_splitmux_sink_class_init (GstSplitMuxSinkClass * klass)
252 GObjectClass *gobject_class = (GObjectClass *) klass;
253 GstElementClass *gstelement_class = (GstElementClass *) klass;
254 GstBinClass *gstbin_class = (GstBinClass *) klass;
256 gobject_class->set_property = gst_splitmux_sink_set_property;
257 gobject_class->get_property = gst_splitmux_sink_get_property;
258 gobject_class->dispose = gst_splitmux_sink_dispose;
259 gobject_class->finalize = gst_splitmux_sink_finalize;
261 gst_element_class_set_static_metadata (gstelement_class,
262 "Split Muxing Bin", "Generic/Bin/Muxer",
263 "Convenience bin that muxes incoming streams into multiple time/size limited files",
264 "Jan Schmidt <jan@centricular.com>");
266 gst_element_class_add_static_pad_template (gstelement_class,
267 &video_sink_template);
268 gst_element_class_add_static_pad_template (gstelement_class,
269 &audio_sink_template);
270 gst_element_class_add_static_pad_template (gstelement_class,
271 &subtitle_sink_template);
272 gst_element_class_add_static_pad_template (gstelement_class,
273 &caption_sink_template);
275 gstelement_class->change_state =
276 GST_DEBUG_FUNCPTR (gst_splitmux_sink_change_state);
277 gstelement_class->request_new_pad =
278 GST_DEBUG_FUNCPTR (gst_splitmux_sink_request_new_pad);
279 gstelement_class->release_pad =
280 GST_DEBUG_FUNCPTR (gst_splitmux_sink_release_pad);
282 gstbin_class->handle_message = bus_handler;
284 g_object_class_install_property (gobject_class, PROP_LOCATION,
285 g_param_spec_string ("location", "File Output Pattern",
286 "Format string pattern for the location of the files to write (e.g. video%05d.mp4)",
287 NULL, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
288 g_object_class_install_property (gobject_class, PROP_MUXER_OVERHEAD,
289 g_param_spec_double ("mux-overhead", "Muxing Overhead",
290 "Extra size overhead of muxing (0.02 = 2%)", 0.0, 1.0,
291 DEFAULT_MUXER_OVERHEAD,
292 G_PARAM_READWRITE | GST_PARAM_CONTROLLABLE | G_PARAM_STATIC_STRINGS));
294 g_object_class_install_property (gobject_class, PROP_MAX_SIZE_TIME,
295 g_param_spec_uint64 ("max-size-time", "Max. size (ns)",
296 "Max. amount of time per file (in ns, 0=disable)", 0, G_MAXUINT64,
297 DEFAULT_MAX_SIZE_TIME, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
298 g_object_class_install_property (gobject_class, PROP_MAX_SIZE_BYTES,
299 g_param_spec_uint64 ("max-size-bytes", "Max. size bytes",
300 "Max. amount of data per file (in bytes, 0=disable)", 0, G_MAXUINT64,
301 DEFAULT_MAX_SIZE_BYTES, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
302 g_object_class_install_property (gobject_class, PROP_MAX_SIZE_TIMECODE,
303 g_param_spec_string ("max-size-timecode", "Maximum timecode difference",
304 "Maximum difference in timecode between first and last frame. "
305 "Separator is assumed to be \":\" everywhere (e.g. 01:00:00:00). "
306 "Will only be effective if a timecode track is present.",
307 NULL, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
308 g_object_class_install_property (gobject_class, PROP_SEND_KEYFRAME_REQUESTS,
309 g_param_spec_boolean ("send-keyframe-requests",
310 "Request keyframes at max-size-time",
311 "Request a keyframe every max-size-time ns to try splitting at that point. "
312 "Needs max-size-bytes to be 0 in order to be effective.",
313 DEFAULT_SEND_KEYFRAME_REQUESTS,
314 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
315 g_object_class_install_property (gobject_class, PROP_MAX_FILES,
316 g_param_spec_uint ("max-files", "Max files",
317 "Maximum number of files to keep on disk. Once the maximum is reached,"
318 "old files start to be deleted to make room for new ones.", 0,
319 G_MAXUINT, DEFAULT_MAX_FILES,
320 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
321 g_object_class_install_property (gobject_class, PROP_ALIGNMENT_THRESHOLD,
322 g_param_spec_uint64 ("alignment-threshold", "Alignment threshold (ns)",
323 "Allow non-reference streams to be that many ns before the reference"
325 0, G_MAXUINT64, DEFAULT_ALIGNMENT_THRESHOLD,
326 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
328 g_object_class_install_property (gobject_class, PROP_MUXER,
329 g_param_spec_object ("muxer", "Muxer",
330 "The muxer element to use (NULL = default mp4mux). "
331 "Valid only for async-finalize = FALSE",
332 GST_TYPE_ELEMENT, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
333 g_object_class_install_property (gobject_class, PROP_SINK,
334 g_param_spec_object ("sink", "Sink",
335 "The sink element (or element chain) to use (NULL = default filesink). "
336 "Valid only for async-finalize = FALSE",
337 GST_TYPE_ELEMENT, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
339 g_object_class_install_property (gobject_class, PROP_USE_ROBUST_MUXING,
340 g_param_spec_boolean ("use-robust-muxing",
341 "Support robust-muxing mode of some muxers",
342 "Check if muxers support robust muxing via the reserved-max-duration and "
343 "reserved-duration-remaining properties and use them if so. "
344 "(Only present on qtmux and mp4mux for now). splitmuxsink may then also "
345 " create new fragments if the reserved header space is about to overflow. "
346 "Note that for mp4mux and qtmux, reserved-moov-update-period must be set "
347 "manually by the app to a non-zero value for robust muxing to have an effect.",
348 DEFAULT_USE_ROBUST_MUXING,
349 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
351 g_object_class_install_property (gobject_class, PROP_RESET_MUXER,
352 g_param_spec_boolean ("reset-muxer",
354 "Reset the muxer after each segment. Disabling this will not work for most muxers.",
355 DEFAULT_RESET_MUXER, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
357 g_object_class_install_property (gobject_class, PROP_ASYNC_FINALIZE,
358 g_param_spec_boolean ("async-finalize",
359 "Finalize fragments asynchronously",
360 "Finalize each fragment asynchronously and start a new one",
361 DEFAULT_ASYNC_FINALIZE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
362 g_object_class_install_property (gobject_class, PROP_MUXER_FACTORY,
363 g_param_spec_string ("muxer-factory", "Muxer factory",
364 "The muxer element factory to use (default = mp4mux). "
365 "Valid only for async-finalize = TRUE",
366 "mp4mux", G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
367 g_object_class_install_property (gobject_class, PROP_MUXER_PROPERTIES,
368 g_param_spec_boxed ("muxer-properties", "Muxer properties",
369 "The muxer element properties to use. "
370 "Example: {properties,boolean-prop=true,string-prop=\"hi\"}. "
371 "Valid only for async-finalize = TRUE",
372 GST_TYPE_STRUCTURE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
373 g_object_class_install_property (gobject_class, PROP_SINK_FACTORY,
374 g_param_spec_string ("sink-factory", "Sink factory",
375 "The sink element factory to use (default = filesink). "
376 "Valid only for async-finalize = TRUE",
377 "filesink", G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
378 g_object_class_install_property (gobject_class, PROP_SINK_PROPERTIES,
379 g_param_spec_boxed ("sink-properties", "Sink properties",
380 "The sink element properties to use. "
381 "Example: {properties,boolean-prop=true,string-prop=\"hi\"}. "
382 "Valid only for async-finalize = TRUE",
383 GST_TYPE_STRUCTURE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
386 * GstSplitMuxSink::format-location:
387 * @splitmux: the #GstSplitMuxSink
388 * @fragment_id: the sequence number of the file to be created
390 * Returns: the location to be used for the next output file
392 signals[SIGNAL_FORMAT_LOCATION] =
393 g_signal_new ("format-location", G_TYPE_FROM_CLASS (klass),
394 G_SIGNAL_RUN_LAST, 0, NULL, NULL, NULL, G_TYPE_STRING, 1, G_TYPE_UINT);
397 * GstSplitMuxSink::format-location-full:
398 * @splitmux: the #GstSplitMuxSink
399 * @fragment_id: the sequence number of the file to be created
400 * @first_sample: A #GstSample containing the first buffer
401 * from the reference stream in the new file
403 * Returns: the location to be used for the next output file
405 signals[SIGNAL_FORMAT_LOCATION_FULL] =
406 g_signal_new ("format-location-full", G_TYPE_FROM_CLASS (klass),
407 G_SIGNAL_RUN_LAST, 0, NULL, NULL, NULL, G_TYPE_STRING, 2, G_TYPE_UINT,
411 * GstSplitMuxSink::split-now:
412 * @splitmux: the #GstSplitMuxSink
414 * When called by the user, this action signal splits the video file (and begins a new one) immediately.
415 * The current GOP will be output to the new file.
419 signals[SIGNAL_SPLIT_NOW] =
420 g_signal_new ("split-now", G_TYPE_FROM_CLASS (klass),
421 G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstSplitMuxSinkClass,
422 split_now), NULL, NULL, NULL, G_TYPE_NONE, 0);
425 * GstSplitMuxSink::split-after:
426 * @splitmux: the #GstSplitMuxSink
428 * When called by the user, this action signal splits the video file (and begins a new one) immediately.
429 * The current GOP will be output to the old file.
433 signals[SIGNAL_SPLIT_AFTER] =
434 g_signal_new ("split-after", G_TYPE_FROM_CLASS (klass),
435 G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstSplitMuxSinkClass,
436 split_after), NULL, NULL, NULL, G_TYPE_NONE, 0);
439 * GstSplitMuxSink::split-at-running-time:
440 * @splitmux: the #GstSplitMuxSink
442 * When called by the user, this action signal splits the video file (and
443 * begins a new one) as soon as the given running time is reached. If this
444 * action signal is called multiple times, running times are queued up and
445 * processed in the order they were given.
447 * Note that this is prone to race conditions, where said running time is
448 * reached and surpassed before we had a chance to split. The file will
449 * still split immediately, but in order to make sure that the split doesn't
450 * happen too late, it is recommended to call this action signal from
451 * something that will prevent further buffers from flowing into
452 * splitmuxsink before the split is completed, such as a pad probe before
458 signals[SIGNAL_SPLIT_AT_RUNNING_TIME] =
459 g_signal_new ("split-at-running-time", G_TYPE_FROM_CLASS (klass),
460 G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstSplitMuxSinkClass,
461 split_at_running_time), NULL, NULL, NULL, G_TYPE_NONE, 1,
465 * GstSplitMuxSink::muxer-added:
466 * @splitmux: the #GstSplitMuxSink
467 * @muxer: the newly added muxer element
471 signals[SIGNAL_MUXER_ADDED] =
472 g_signal_new ("muxer-added", G_TYPE_FROM_CLASS (klass),
473 G_SIGNAL_RUN_LAST, 0, NULL, NULL, NULL, G_TYPE_NONE, 1, GST_TYPE_ELEMENT);
476 * GstSplitMuxSink::sink-added:
477 * @splitmux: the #GstSplitMuxSink
478 * @sink: the newly added sink element
482 signals[SIGNAL_SINK_ADDED] =
483 g_signal_new ("sink-added", G_TYPE_FROM_CLASS (klass),
484 G_SIGNAL_RUN_LAST, 0, NULL, NULL, NULL, G_TYPE_NONE, 1, GST_TYPE_ELEMENT);
486 klass->split_now = split_now;
487 klass->split_after = split_after;
488 klass->split_at_running_time = split_at_running_time;
492 gst_splitmux_sink_init (GstSplitMuxSink * splitmux)
494 g_mutex_init (&splitmux->lock);
495 g_cond_init (&splitmux->input_cond);
496 g_cond_init (&splitmux->output_cond);
497 g_queue_init (&splitmux->out_cmd_q);
499 splitmux->mux_overhead = DEFAULT_MUXER_OVERHEAD;
500 splitmux->threshold_time = DEFAULT_MAX_SIZE_TIME;
501 splitmux->threshold_bytes = DEFAULT_MAX_SIZE_BYTES;
502 splitmux->max_files = DEFAULT_MAX_FILES;
503 splitmux->send_keyframe_requests = DEFAULT_SEND_KEYFRAME_REQUESTS;
504 splitmux->next_max_tc_time = GST_CLOCK_TIME_NONE;
505 splitmux->alignment_threshold = DEFAULT_ALIGNMENT_THRESHOLD;
506 splitmux->use_robust_muxing = DEFAULT_USE_ROBUST_MUXING;
507 splitmux->reset_muxer = DEFAULT_RESET_MUXER;
509 splitmux->threshold_timecode_str = NULL;
511 splitmux->async_finalize = DEFAULT_ASYNC_FINALIZE;
512 splitmux->muxer_factory = g_strdup (DEFAULT_MUXER);
513 splitmux->muxer_properties = NULL;
514 splitmux->sink_factory = g_strdup (DEFAULT_SINK);
515 splitmux->sink_properties = NULL;
517 GST_OBJECT_FLAG_SET (splitmux, GST_ELEMENT_FLAG_SINK);
518 splitmux->split_requested = FALSE;
519 splitmux->do_split_next_gop = FALSE;
520 splitmux->times_to_split = gst_queue_array_new_for_struct (8, 8);
524 gst_splitmux_reset (GstSplitMuxSink * splitmux)
526 if (splitmux->muxer) {
527 gst_element_set_locked_state (splitmux->muxer, TRUE);
528 gst_element_set_state (splitmux->muxer, GST_STATE_NULL);
529 gst_bin_remove (GST_BIN (splitmux), splitmux->muxer);
531 if (splitmux->active_sink) {
532 gst_element_set_locked_state (splitmux->active_sink, TRUE);
533 gst_element_set_state (splitmux->active_sink, GST_STATE_NULL);
534 gst_bin_remove (GST_BIN (splitmux), splitmux->active_sink);
537 splitmux->sink = splitmux->active_sink = splitmux->muxer = NULL;
541 gst_splitmux_sink_dispose (GObject * object)
543 GstSplitMuxSink *splitmux = GST_SPLITMUX_SINK (object);
545 /* Calling parent dispose invalidates all child pointers */
546 splitmux->sink = splitmux->active_sink = splitmux->muxer = NULL;
548 G_OBJECT_CLASS (parent_class)->dispose (object);
552 gst_splitmux_sink_finalize (GObject * object)
554 GstSplitMuxSink *splitmux = GST_SPLITMUX_SINK (object);
555 g_cond_clear (&splitmux->input_cond);
556 g_cond_clear (&splitmux->output_cond);
557 g_mutex_clear (&splitmux->lock);
558 g_queue_foreach (&splitmux->out_cmd_q, (GFunc) out_cmd_buf_free, NULL);
559 g_queue_clear (&splitmux->out_cmd_q);
561 if (splitmux->provided_sink)
562 gst_object_unref (splitmux->provided_sink);
563 if (splitmux->provided_muxer)
564 gst_object_unref (splitmux->provided_muxer);
566 if (splitmux->muxer_factory)
567 g_free (splitmux->muxer_factory);
568 if (splitmux->muxer_properties)
569 gst_structure_free (splitmux->muxer_properties);
570 if (splitmux->sink_factory)
571 g_free (splitmux->sink_factory);
572 if (splitmux->sink_properties)
573 gst_structure_free (splitmux->sink_properties);
575 if (splitmux->threshold_timecode_str)
576 g_free (splitmux->threshold_timecode_str);
578 if (splitmux->times_to_split)
579 gst_queue_array_free (splitmux->times_to_split);
581 g_free (splitmux->location);
583 /* Make sure to free any un-released contexts. There should not be any,
584 * because the dispose will have freed all request pads though */
585 g_list_foreach (splitmux->contexts, (GFunc) mq_stream_ctx_free, NULL);
586 g_list_free (splitmux->contexts);
588 G_OBJECT_CLASS (parent_class)->finalize (object);
592 * Set any time threshold to the muxer, if it has
593 * reserved-max-duration and reserved-duration-remaining
594 * properties. Called when creating/claiming the muxer
595 * in create_elements() */
597 update_muxer_properties (GstSplitMuxSink * sink)
600 GstClockTime threshold_time;
602 sink->muxer_has_reserved_props = FALSE;
603 if (sink->muxer == NULL)
605 klass = G_OBJECT_GET_CLASS (sink->muxer);
606 if (g_object_class_find_property (klass, "reserved-max-duration") == NULL)
608 if (g_object_class_find_property (klass,
609 "reserved-duration-remaining") == NULL)
611 sink->muxer_has_reserved_props = TRUE;
613 GST_LOG_OBJECT (sink, "Setting muxer reserved time to %" GST_TIME_FORMAT,
614 GST_TIME_ARGS (sink->threshold_time));
615 GST_OBJECT_LOCK (sink);
616 threshold_time = sink->threshold_time;
617 GST_OBJECT_UNLOCK (sink);
619 if (threshold_time > 0) {
620 /* Tell the muxer how much space to reserve */
621 GstClockTime muxer_threshold = threshold_time;
622 g_object_set (sink->muxer, "reserved-max-duration", muxer_threshold, NULL);
627 gst_splitmux_sink_set_property (GObject * object, guint prop_id,
628 const GValue * value, GParamSpec * pspec)
630 GstSplitMuxSink *splitmux = GST_SPLITMUX_SINK (object);
634 GST_OBJECT_LOCK (splitmux);
635 g_free (splitmux->location);
636 splitmux->location = g_value_dup_string (value);
637 GST_OBJECT_UNLOCK (splitmux);
640 case PROP_MAX_SIZE_BYTES:
641 GST_OBJECT_LOCK (splitmux);
642 splitmux->threshold_bytes = g_value_get_uint64 (value);
643 GST_OBJECT_UNLOCK (splitmux);
645 case PROP_MAX_SIZE_TIME:
646 GST_OBJECT_LOCK (splitmux);
647 splitmux->threshold_time = g_value_get_uint64 (value);
648 GST_OBJECT_UNLOCK (splitmux);
650 case PROP_MAX_SIZE_TIMECODE:
651 GST_OBJECT_LOCK (splitmux);
652 splitmux->threshold_timecode_str = g_value_dup_string (value);
653 GST_OBJECT_UNLOCK (splitmux);
655 case PROP_SEND_KEYFRAME_REQUESTS:
656 GST_OBJECT_LOCK (splitmux);
657 splitmux->send_keyframe_requests = g_value_get_boolean (value);
658 GST_OBJECT_UNLOCK (splitmux);
661 GST_OBJECT_LOCK (splitmux);
662 splitmux->max_files = g_value_get_uint (value);
663 GST_OBJECT_UNLOCK (splitmux);
665 case PROP_MUXER_OVERHEAD:
666 GST_OBJECT_LOCK (splitmux);
667 splitmux->mux_overhead = g_value_get_double (value);
668 GST_OBJECT_UNLOCK (splitmux);
670 case PROP_USE_ROBUST_MUXING:
671 GST_OBJECT_LOCK (splitmux);
672 splitmux->use_robust_muxing = g_value_get_boolean (value);
673 GST_OBJECT_UNLOCK (splitmux);
674 if (splitmux->use_robust_muxing)
675 update_muxer_properties (splitmux);
677 case PROP_ALIGNMENT_THRESHOLD:
678 GST_OBJECT_LOCK (splitmux);
679 splitmux->alignment_threshold = g_value_get_uint64 (value);
680 GST_OBJECT_UNLOCK (splitmux);
683 GST_OBJECT_LOCK (splitmux);
684 if (splitmux->provided_sink)
685 gst_object_unref (splitmux->provided_sink);
686 splitmux->provided_sink = g_value_get_object (value);
687 gst_object_ref_sink (splitmux->provided_sink);
688 GST_OBJECT_UNLOCK (splitmux);
691 GST_OBJECT_LOCK (splitmux);
692 if (splitmux->provided_muxer)
693 gst_object_unref (splitmux->provided_muxer);
694 splitmux->provided_muxer = g_value_get_object (value);
695 gst_object_ref_sink (splitmux->provided_muxer);
696 GST_OBJECT_UNLOCK (splitmux);
698 case PROP_RESET_MUXER:
699 GST_OBJECT_LOCK (splitmux);
700 splitmux->reset_muxer = g_value_get_boolean (value);
701 GST_OBJECT_UNLOCK (splitmux);
703 case PROP_ASYNC_FINALIZE:
704 GST_OBJECT_LOCK (splitmux);
705 splitmux->async_finalize = g_value_get_boolean (value);
706 GST_OBJECT_UNLOCK (splitmux);
708 case PROP_MUXER_FACTORY:
709 GST_OBJECT_LOCK (splitmux);
710 if (splitmux->muxer_factory)
711 g_free (splitmux->muxer_factory);
712 splitmux->muxer_factory = g_value_dup_string (value);
713 GST_OBJECT_UNLOCK (splitmux);
715 case PROP_MUXER_PROPERTIES:
716 GST_OBJECT_LOCK (splitmux);
717 if (splitmux->muxer_properties)
718 gst_structure_free (splitmux->muxer_properties);
719 if (gst_value_get_structure (value))
720 splitmux->muxer_properties =
721 gst_structure_copy (gst_value_get_structure (value));
723 splitmux->muxer_properties = NULL;
724 GST_OBJECT_UNLOCK (splitmux);
726 case PROP_SINK_FACTORY:
727 GST_OBJECT_LOCK (splitmux);
728 if (splitmux->sink_factory)
729 g_free (splitmux->sink_factory);
730 splitmux->sink_factory = g_value_dup_string (value);
731 GST_OBJECT_UNLOCK (splitmux);
733 case PROP_SINK_PROPERTIES:
734 GST_OBJECT_LOCK (splitmux);
735 if (splitmux->sink_properties)
736 gst_structure_free (splitmux->sink_properties);
737 if (gst_value_get_structure (value))
738 splitmux->sink_properties =
739 gst_structure_copy (gst_value_get_structure (value));
741 splitmux->sink_properties = NULL;
742 GST_OBJECT_UNLOCK (splitmux);
745 G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
751 gst_splitmux_sink_get_property (GObject * object, guint prop_id,
752 GValue * value, GParamSpec * pspec)
754 GstSplitMuxSink *splitmux = GST_SPLITMUX_SINK (object);
758 GST_OBJECT_LOCK (splitmux);
759 g_value_set_string (value, splitmux->location);
760 GST_OBJECT_UNLOCK (splitmux);
762 case PROP_MAX_SIZE_BYTES:
763 GST_OBJECT_LOCK (splitmux);
764 g_value_set_uint64 (value, splitmux->threshold_bytes);
765 GST_OBJECT_UNLOCK (splitmux);
767 case PROP_MAX_SIZE_TIME:
768 GST_OBJECT_LOCK (splitmux);
769 g_value_set_uint64 (value, splitmux->threshold_time);
770 GST_OBJECT_UNLOCK (splitmux);
772 case PROP_MAX_SIZE_TIMECODE:
773 GST_OBJECT_LOCK (splitmux);
774 g_value_set_string (value, splitmux->threshold_timecode_str);
775 GST_OBJECT_UNLOCK (splitmux);
777 case PROP_SEND_KEYFRAME_REQUESTS:
778 GST_OBJECT_LOCK (splitmux);
779 g_value_set_boolean (value, splitmux->send_keyframe_requests);
780 GST_OBJECT_UNLOCK (splitmux);
783 GST_OBJECT_LOCK (splitmux);
784 g_value_set_uint (value, splitmux->max_files);
785 GST_OBJECT_UNLOCK (splitmux);
787 case PROP_MUXER_OVERHEAD:
788 GST_OBJECT_LOCK (splitmux);
789 g_value_set_double (value, splitmux->mux_overhead);
790 GST_OBJECT_UNLOCK (splitmux);
792 case PROP_USE_ROBUST_MUXING:
793 GST_OBJECT_LOCK (splitmux);
794 g_value_set_boolean (value, splitmux->use_robust_muxing);
795 GST_OBJECT_UNLOCK (splitmux);
797 case PROP_ALIGNMENT_THRESHOLD:
798 GST_OBJECT_LOCK (splitmux);
799 g_value_set_uint64 (value, splitmux->alignment_threshold);
800 GST_OBJECT_UNLOCK (splitmux);
803 GST_OBJECT_LOCK (splitmux);
804 g_value_set_object (value, splitmux->provided_sink);
805 GST_OBJECT_UNLOCK (splitmux);
808 GST_OBJECT_LOCK (splitmux);
809 g_value_set_object (value, splitmux->provided_muxer);
810 GST_OBJECT_UNLOCK (splitmux);
812 case PROP_RESET_MUXER:
813 GST_OBJECT_LOCK (splitmux);
814 g_value_set_boolean (value, splitmux->reset_muxer);
815 GST_OBJECT_UNLOCK (splitmux);
817 case PROP_ASYNC_FINALIZE:
818 GST_OBJECT_LOCK (splitmux);
819 g_value_set_boolean (value, splitmux->async_finalize);
820 GST_OBJECT_UNLOCK (splitmux);
822 case PROP_MUXER_FACTORY:
823 GST_OBJECT_LOCK (splitmux);
824 g_value_set_string (value, splitmux->muxer_factory);
825 GST_OBJECT_UNLOCK (splitmux);
827 case PROP_MUXER_PROPERTIES:
828 GST_OBJECT_LOCK (splitmux);
829 gst_value_set_structure (value, splitmux->muxer_properties);
830 GST_OBJECT_UNLOCK (splitmux);
832 case PROP_SINK_FACTORY:
833 GST_OBJECT_LOCK (splitmux);
834 g_value_set_string (value, splitmux->sink_factory);
835 GST_OBJECT_UNLOCK (splitmux);
837 case PROP_SINK_PROPERTIES:
838 GST_OBJECT_LOCK (splitmux);
839 gst_value_set_structure (value, splitmux->sink_properties);
840 GST_OBJECT_UNLOCK (splitmux);
843 G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
848 /* Convenience function */
849 static inline GstClockTimeDiff
850 my_segment_to_running_time (GstSegment * segment, GstClockTime val)
852 GstClockTimeDiff res = GST_CLOCK_STIME_NONE;
854 if (GST_CLOCK_TIME_IS_VALID (val)) {
856 gst_segment_to_running_time_full (segment, GST_FORMAT_TIME, val, &val);
866 mq_stream_ctx_new (GstSplitMuxSink * splitmux)
870 ctx = g_new0 (MqStreamCtx, 1);
871 ctx->splitmux = splitmux;
872 gst_segment_init (&ctx->in_segment, GST_FORMAT_UNDEFINED);
873 gst_segment_init (&ctx->out_segment, GST_FORMAT_UNDEFINED);
874 ctx->in_running_time = ctx->out_running_time = GST_CLOCK_STIME_NONE;
875 g_queue_init (&ctx->queued_bufs);
880 mq_stream_ctx_free (MqStreamCtx * ctx)
883 GstObject *parent = gst_object_get_parent (GST_OBJECT (ctx->q));
885 g_signal_handler_disconnect (ctx->q, ctx->q_overrun_id);
887 if (parent == GST_OBJECT_CAST (ctx->splitmux)) {
888 gst_element_set_locked_state (ctx->q, TRUE);
889 gst_element_set_state (ctx->q, GST_STATE_NULL);
890 gst_bin_remove (GST_BIN (ctx->splitmux), ctx->q);
891 gst_object_unref (parent);
893 gst_object_unref (ctx->q);
895 gst_buffer_replace (&ctx->prev_in_keyframe, NULL);
896 gst_object_unref (ctx->sinkpad);
897 gst_object_unref (ctx->srcpad);
898 g_queue_foreach (&ctx->queued_bufs, (GFunc) mq_stream_buf_free, NULL);
899 g_queue_clear (&ctx->queued_bufs);
/* Posts an element message from @splitmux announcing that a fragment was
 * opened or closed. The message name is "splitmuxsink-fragment-opened" when
 * @opened is TRUE and "splitmuxsink-fragment-closed" otherwise. Its structure
 * carries the "location" property read from @sink and a "running-time"
 * field. The running time starts as the reference context's
 * out_running_time and can be replaced by the value stored in the
 * RUNNING_TIME qdata of @sink.
 * NOTE(review): splitmux->reference_ctx is dereferenced in the running_time
 * initialiser before the NULL check further down, which exists because the
 * pointer may be NULL during teardown - confirm whether that early
 * dereference is safe.
 * NOTE(review): the release of the "location" string is not visible in this
 * excerpt. */
904 send_fragment_opened_closed_msg (GstSplitMuxSink * splitmux, gboolean opened,
907 gchar *location = NULL;
909 const gchar *msg_name = opened ?
910 "splitmuxsink-fragment-opened" : "splitmuxsink-fragment-closed";
911 GstClockTime running_time = splitmux->reference_ctx->out_running_time;
914 GstClockTime *rtime = g_object_get_qdata (G_OBJECT (sink), RUNNING_TIME);
916 running_time = *rtime;
919 g_object_get (sink, "location", &location, NULL);
921 /* If it's in the middle of a teardown, the reference_ctc might have become
923 if (splitmux->reference_ctx) {
924 msg = gst_message_new_element (GST_OBJECT (splitmux),
925 gst_structure_new (msg_name,
926 "location", G_TYPE_STRING, location,
927 "running-time", GST_TYPE_CLOCK_TIME, running_time, NULL));
928 gst_element_post_message (GST_ELEMENT_CAST (splitmux), msg);
/* Creates an EOS event and sends it to the peer of ctx->srcpad, then logs
 * the pad and drops the pad reference. The peer pad is looked up while the
 * splitmux lock is held; the event itself is sent after the lock has been
 * released.
 * NOTE(review): how ctx is derived from @helper, and how @helper is freed,
 * are on lines not visible in this excerpt. */
935 send_eos_async (GstSplitMuxSink * splitmux, AsyncEosHelper * helper)
941 eos = gst_event_new_eos ();
945 GST_SPLITMUX_LOCK (splitmux);
947 pad = gst_pad_get_peer (ctx->srcpad);
948 GST_SPLITMUX_UNLOCK (splitmux);
950 gst_pad_send_event (pad, eos);
951 GST_INFO_OBJECT (splitmux, "Sent async EOS on %" GST_PTR_FORMAT, pad);
953 gst_object_unref (pad);
/* Sends an EOS event to the peer of @ctx's source pad. The splitmux lock is
 * released immediately before gst_pad_send_event() and re-acquired right
 * after it; the reference on the peer pad is dropped at the end. */
957 /* Called with lock held, drops the lock to send EOS to the
961 send_eos (GstSplitMuxSink * splitmux, MqStreamCtx * ctx)
966 eos = gst_event_new_eos ();
967 pad = gst_pad_get_peer (ctx->srcpad);
971 GST_INFO_OBJECT (splitmux, "Sending EOS on %" GST_PTR_FORMAT, pad);
972 GST_SPLITMUX_UNLOCK (splitmux);
973 gst_pad_send_event (pad, eos);
974 GST_SPLITMUX_LOCK (splitmux);
976 gst_object_unref (pad);
/* Allocates an AsyncEosHelper, gives it ownership of the reference to the
 * peer of @ctx's source pad, marks the context as out_eos_async_done, clears
 * GST_ELEMENT_FLAG_SINK on splitmux->active_sink (see the HACK note in the
 * body) and schedules send_eos_async() with the helper through
 * gst_element_call_async(). No destroy notify is passed for the helper. */
979 /* Called with lock held. Schedules an EOS event to the ctx pad
980 * to happen in another thread */
982 eos_context_async (MqStreamCtx * ctx, GstSplitMuxSink * splitmux)
984 AsyncEosHelper *helper = g_new0 (AsyncEosHelper, 1);
985 GstPad *srcpad, *sinkpad;
987 srcpad = ctx->srcpad;
988 sinkpad = gst_pad_get_peer (srcpad);
991 helper->pad = sinkpad; /* Takes the reference */
993 ctx->out_eos_async_done = TRUE;
994 /* HACK: Here, we explicitly unset the SINK flag on the target sink element
995 * that's about to be asynchronously disposed, so that it no longer
996 * participates in GstBin EOS logic. This fixes a race where if
997 * splitmuxsink really reaches EOS before an asynchronous background
998 * element has finished, then the bin won't actually send EOS to the
999 * pipeline. Even after finishing and removing the old element, the
1000 * bin doesn't re-check EOS status on removing a SINK element. This
1001 * should be fixed in core, making this hack unnecessary. */
1002 GST_OBJECT_FLAG_UNSET (splitmux->active_sink, GST_ELEMENT_FLAG_SINK);
1004 GST_DEBUG_OBJECT (splitmux, "scheduled EOS to pad %" GST_PTR_FORMAT " ctx %p",
1007 g_assert_nonnull (helper->pad);
1008 gst_element_call_async (GST_ELEMENT (splitmux),
1009 (GstElementCallAsyncFunc) send_eos_async, helper, NULL);
/* Walks splitmux->contexts and ANDs each context's out_eos_async_done flag
 * into a result that starts as TRUE. */
1012 /* Called with lock held. TRUE iff all contexts have a
1013 * pending (or delivered) async eos event */
1015 all_contexts_are_async_eos (GstSplitMuxSink * splitmux)
1017 gboolean ret = TRUE;
1020 for (item = splitmux->contexts; item; item = item->next) {
1021 MqStreamCtx *ctx = item->data;
1022 ret &= ctx->out_eos_async_done;
/* Output-side state machine step for @ctx. Summary of the visible logic:
 *
 * - A local copy of splitmux->max_out_running_time is reduced by
 *   alignment_threshold, unless the max is 0 or GST_CLOCK_STIME_NONE, the
 *   threshold is 0, or the max is smaller than the threshold.
 * - The context's out_running_time is compared against that adjusted
 *   maximum.
 * - Depending on splitmux->output_state:
 *     OUTPUT_GOP       -> switches to AWAITING_COMMAND and broadcasts.
 *     ENDING_FILE      -> if the context is not yet out_eos, sends EOS;
 *                         with async_finalize set this goes through
 *                         eos_context_async() and, once every context is
 *                         async-EOS, the state moves to START_NEXT_FILE.
 *                         send_eos() is also called in this state.
 *     START_NEXT_FILE  -> the reference context calls
 *                         start_next_fragment().
 *     AWAITING_COMMAND -> pops a command from out_cmd_q; a
 *                         start_new_fragment command selects ENDING_FILE,
 *                         other commands set max_out_running_time and select
 *                         OUTPUT_GOP. GST_SPLITMUX_WAIT_OUTPUT() is used
 *                         while the state remains AWAITING_COMMAND.
 * - After the switch the function logs and blocks in
 *   GST_SPLITMUX_WAIT_OUTPUT().
 *
 * NOTE(review): the enclosing loop, the early-return statements and several
 * else/brace lines are not visible in this excerpt, so the exact exit
 * conditions must be confirmed against the full file. */
1027 /* Called with splitmux lock held to check if this output
1028 * context needs to sleep to wait for the release of the
1029 * next GOP, or to send EOS to close out the current file
1032 complete_or_wait_on_out (GstSplitMuxSink * splitmux, MqStreamCtx * ctx)
1034 if (ctx->caps_change)
1038 /* When first starting up, the reference stream has to output
1039 * the first buffer to prepare the muxer and sink */
1040 gboolean can_output = (ctx->is_reference || splitmux->ready_for_output);
1041 GstClockTimeDiff my_max_out_running_time = splitmux->max_out_running_time;
1043 if (!(splitmux->max_out_running_time == 0 ||
1044 splitmux->max_out_running_time == GST_CLOCK_STIME_NONE ||
1045 splitmux->alignment_threshold == 0 ||
1046 splitmux->max_out_running_time < splitmux->alignment_threshold)) {
1047 my_max_out_running_time -= splitmux->alignment_threshold;
1048 GST_LOG_OBJECT (ctx->srcpad,
1049 "Max out running time currently %" GST_STIME_FORMAT
1050 ", with threshold applied it is %" GST_STIME_FORMAT,
1051 GST_STIME_ARGS (splitmux->max_out_running_time),
1052 GST_STIME_ARGS (my_max_out_running_time));
1056 || splitmux->output_state == SPLITMUX_OUTPUT_STATE_STOPPED)
1059 GST_LOG_OBJECT (ctx->srcpad,
1060 "Checking running time %" GST_STIME_FORMAT " against max %"
1061 GST_STIME_FORMAT, GST_STIME_ARGS (ctx->out_running_time),
1062 GST_STIME_ARGS (my_max_out_running_time));
1065 if (splitmux->max_out_running_time == GST_CLOCK_STIME_NONE ||
1066 ctx->out_running_time < my_max_out_running_time) {
1070 switch (splitmux->output_state) {
1071 case SPLITMUX_OUTPUT_STATE_OUTPUT_GOP:
1072 /* We only get here if we've finished outputting a GOP and need to know
1073 * what to do next */
1074 splitmux->output_state = SPLITMUX_OUTPUT_STATE_AWAITING_COMMAND;
1075 GST_SPLITMUX_BROADCAST_OUTPUT (splitmux);
1078 case SPLITMUX_OUTPUT_STATE_ENDING_FILE:
1079 /* We've reached the max out running_time to get here, so end this file now */
1080 if (ctx->out_eos == FALSE) {
1081 if (splitmux->async_finalize) {
1082 /* We must set EOS asynchronously at this point. We cannot defer
1083 * it, because we need all contexts to wake up, for the
1084 * reference context to eventually give us something at
1085 * START_NEXT_FILE. Otherwise, collectpads might choose another
1086 * context to give us the first buffer, and format-location-full
1087 * will not contain a valid sample. */
1088 g_object_set_qdata ((GObject *) splitmux->sink, EOS_FROM_US,
1089 GINT_TO_POINTER (1));
1090 eos_context_async (ctx, splitmux);
1091 if (all_contexts_are_async_eos (splitmux)) {
1092 GST_INFO_OBJECT (splitmux,
1093 "All contexts are async_eos. Moving to the next file.");
1094 /* We can start the next file once we've asked each pad to go EOS */
1095 splitmux->output_state = SPLITMUX_OUTPUT_STATE_START_NEXT_FILE;
1096 GST_SPLITMUX_BROADCAST_OUTPUT (splitmux);
1100 send_eos (splitmux, ctx);
1104 GST_INFO_OBJECT (splitmux,
1105 "At end-of-file state, but context %p is already EOS", ctx);
1108 case SPLITMUX_OUTPUT_STATE_START_NEXT_FILE:
1109 if (ctx->is_reference) {
1110 /* Special handling on the reference ctx to start new fragments
1111 * and collect commands from the command queue */
1112 /* drops the splitmux lock briefly: */
1113 /* We must have reference ctx in order for format-location-full to
1115 start_next_fragment (splitmux, ctx);
1119 case SPLITMUX_OUTPUT_STATE_AWAITING_COMMAND:{
1121 SplitMuxOutputCommand *cmd =
1122 g_queue_pop_tail (&splitmux->out_cmd_q);
1124 /* If we pop the last command, we need to make our queues bigger */
1125 if (g_queue_get_length (&splitmux->out_cmd_q) == 0)
1126 grow_blocked_queues (splitmux);
1128 if (cmd->start_new_fragment) {
1129 GST_DEBUG_OBJECT (splitmux, "Got cmd to start new fragment");
1130 splitmux->output_state = SPLITMUX_OUTPUT_STATE_ENDING_FILE;
1132 GST_DEBUG_OBJECT (splitmux,
1133 "Got new output cmd for time %" GST_STIME_FORMAT,
1134 GST_STIME_ARGS (cmd->max_output_ts));
1136 /* Extend the output range immediately */
1137 splitmux->max_out_running_time = cmd->max_output_ts;
1138 splitmux->output_state = SPLITMUX_OUTPUT_STATE_OUTPUT_GOP;
1140 GST_SPLITMUX_BROADCAST_OUTPUT (splitmux);
1142 out_cmd_buf_free (cmd);
1145 GST_SPLITMUX_WAIT_OUTPUT (splitmux);
1147 } while (splitmux->output_state ==
1148 SPLITMUX_OUTPUT_STATE_AWAITING_COMMAND);
1149 /* loop and re-check the state */
1152 case SPLITMUX_OUTPUT_STATE_STOPPED:
1157 GST_INFO_OBJECT (ctx->srcpad,
1158 "Sleeping for running time %"
1159 GST_STIME_FORMAT " (max %" GST_STIME_FORMAT ") or state change.",
1160 GST_STIME_ARGS (ctx->out_running_time),
1161 GST_STIME_ARGS (splitmux->max_out_running_time));
1162 GST_SPLITMUX_WAIT_OUTPUT (splitmux);
1163 GST_INFO_OBJECT (ctx->srcpad,
1164 "Woken for new max running time %" GST_STIME_FORMAT,
1165 GST_STIME_ARGS (splitmux->max_out_running_time));
/* Computes the time at which the next timecode-based split should happen.
 *
 * Returns GST_CLOCK_TIME_NONE when @cur_tc is NULL or no threshold timecode
 * string is configured. Otherwise the threshold string is parsed into a
 * timecode interval and added to @cur_tc to obtain a target timecode. Both
 * timecodes are converted to nanoseconds since the daily jam.
 *
 * - If the target is not earlier than the current value, the result is their
 *   difference plus splitmux->fragment_start_time.
 * - Otherwise the target is treated as having wrapped past the end of the
 *   day: the result is (day length - current + target) plus
 *   fragment_start_time. The day length is 24h, except for drop-frame
 *   timecodes with fps_d == 1001, where it is derived from a 23:59:59
 *   timecode plus one frame duration.
 *
 * The temporary interval, target timecode and helper timecode are freed
 * before returning. */
1171 calculate_next_max_timecode (GstSplitMuxSink * splitmux,
1172 const GstVideoTimeCode * cur_tc)
1174 GstVideoTimeCode *target_tc;
1175 GstVideoTimeCodeInterval *tc_inter;
1176 GstClockTime cur_tc_time, target_tc_time, next_max_tc_time;
1178 if (cur_tc == NULL || splitmux->threshold_timecode_str == NULL)
1179 return GST_CLOCK_TIME_NONE;
1182 gst_video_time_code_interval_new_from_string
1183 (splitmux->threshold_timecode_str);
1184 target_tc = gst_video_time_code_add_interval (cur_tc, tc_inter);
1185 gst_video_time_code_interval_free (tc_inter);
1188 target_tc_time = gst_video_time_code_nsec_since_daily_jam (target_tc);
1189 cur_tc_time = gst_video_time_code_nsec_since_daily_jam (cur_tc);
1191 /* Add fragment_start_time, accounting for wraparound */
1192 if (target_tc_time >= cur_tc_time) {
1194 target_tc_time - cur_tc_time + splitmux->fragment_start_time;
1196 GstClockTime day_in_ns = 24 * 60 * 60 * GST_SECOND;
1198 if ((cur_tc->config.flags & GST_VIDEO_TIME_CODE_FLAGS_DROP_FRAME) &&
1199 (cur_tc->config.fps_d == 1001)) {
1200 /* Checking fps_d is probably unneeded, but better safe than sorry
1201 * (e.g. someone accidentally set a flag) */
1202 GstVideoTimeCode *tc_for_offset;
1204 /* Here, the duration of the 24:00:00;00 timecode isn't exactly one day,
1205 * but slightly less. Calculate that duration from a fake timecode. The
1206 * problem is that 24:00:00;00 isn't a valid timecode, so the workaround
1207 * is to add one frame to 23:59:59;29 */
1209 gst_video_time_code_new (cur_tc->config.fps_n, cur_tc->config.fps_d,
1210 NULL, cur_tc->config.flags, 23, 59, 59,
1211 cur_tc->config.fps_n / cur_tc->config.fps_d, 0);
1213 gst_video_time_code_nsec_since_daily_jam (tc_for_offset) +
1214 gst_util_uint64_scale (GST_SECOND, cur_tc->config.fps_d,
1215 cur_tc->config.fps_n);
1216 gst_video_time_code_free (tc_for_offset);
1219 day_in_ns - cur_tc_time + target_tc_time +
1220 splitmux->fragment_start_time;
1223 GST_INFO_OBJECT (splitmux, "Next max TC time: %" GST_TIME_FORMAT
1224 " from ref TC: %" GST_TIME_FORMAT, GST_TIME_ARGS (next_max_tc_time),
1225 GST_TIME_ARGS (cur_tc_time));
1226 gst_video_time_code_free (target_tc);
1228 return next_max_tc_time;
/* Pushes an upstream force-key-unit event on the reference context's sink
 * pad and returns the result of gst_pad_push_event().
 *
 * splitmux->next_max_tc_time is first reset to GST_CLOCK_TIME_NONE. When a
 * threshold timecode string is configured and @buffer is not NULL, it is
 * recomputed with calculate_next_max_timecode() from the buffer's timecode
 * meta; a NULL @buffer only produces a warning.
 *
 * The requested time is next_max_tc_time minus 5 microseconds in the
 * timecode-based case, otherwise fragment_start_time + threshold_time.
 *
 * NOTE(review): the statement guarded by the send_keyframe_requests /
 * threshold_time / threshold_bytes condition (presumably an early return)
 * and the check on tc_meta are on lines not visible in this excerpt. */
1232 request_next_keyframe (GstSplitMuxSink * splitmux, GstBuffer * buffer)
1235 GstClockTime target_time;
1236 gboolean timecode_based = FALSE;
1238 splitmux->next_max_tc_time = GST_CLOCK_TIME_NONE;
1239 if (splitmux->threshold_timecode_str) {
1240 GstVideoTimeCodeMeta *tc_meta;
1242 if (buffer != NULL) {
1243 tc_meta = gst_buffer_get_video_time_code_meta (buffer);
1245 splitmux->next_max_tc_time =
1246 calculate_next_max_timecode (splitmux, &tc_meta->tc);
1247 timecode_based = (splitmux->next_max_tc_time != GST_CLOCK_TIME_NONE);
1250 /* This can happen in the presence of GAP events that trigger
1251 * a new fragment start */
1252 GST_WARNING_OBJECT (splitmux,
1253 "No buffer available to calculate next timecode");
1257 if (splitmux->send_keyframe_requests == FALSE
1258 || (splitmux->threshold_time == 0 && !timecode_based)
1259 || splitmux->threshold_bytes != 0)
1262 if (timecode_based) {
1263 /* We might have rounding errors: aim slightly earlier */
1264 target_time = splitmux->next_max_tc_time - 5 * GST_USECOND;
1266 target_time = splitmux->fragment_start_time + splitmux->threshold_time;
1268 ev = gst_video_event_new_upstream_force_key_unit (target_time, TRUE, 0);
1269 GST_INFO_OBJECT (splitmux, "Requesting next keyframe at %" GST_TIME_FORMAT,
1270 GST_TIME_ARGS (target_time));
1271 return gst_pad_push_event (splitmux->reference_ctx->sinkpad, ev);
/* Pad probe callback for the output side of @ctx.
 *
 * Buffer lists are not supported: they trigger a g_warning() and are
 * dropped.
 *
 * Downstream and flush events, as far as visible here:
 *   SEGMENT            - copied into ctx->out_segment.
 *   FLUSH_STOP         - resets out_segment, frees queued_bufs and clears
 *                        ctx->flushing.
 *   FLUSH_START        - sets ctx->flushing and broadcasts on the output
 *                        condition.
 *   (EOS handling)     - sets ctx->out_eos.
 *   GAP                - on the reference stream outside OUTPUT_GOP the event
 *                        is stored in ctx->pending_gap and the probe returns
 *                        HANDLED; otherwise the gap timestamp is converted to
 *                        running time and complete_or_wait_on_out() is
 *                        called.
 *   CUSTOM_DOWNSTREAM  - "splitmuxsink-unblock" events update
 *                        out_running_time from their "timestamp" field and
 *                        are dropped.
 *   CAPS               - on the reference stream the event is offered to the
 *                        peer pad; the visible fallback sets
 *                        ctx->caps_change and may request ENDING_FILE.
 * Non-reference contexts then call complete_or_wait_on_out() before the
 * event is passed. Sticky events are swallowed while caps_change is set.
 *
 * Buffers: the matching MqStreamBuf is popped from queued_bufs, the queued
 * keyframe count and out_running_time are updated, caps_change is cleared,
 * complete_or_wait_on_out() is called, the buffer size is added to
 * muxed_out_bytes and any stored pending GAP event is sent to the peer before
 * the buffer is passed.
 *
 * NOTE(review): break/goto statements, case labels and several braces are on
 * lines not visible in this excerpt, so the exact control flow between the
 * cases must be confirmed against the full file. */
1274 static GstPadProbeReturn
1275 handle_mq_output (GstPad * pad, GstPadProbeInfo * info, MqStreamCtx * ctx)
1277 GstSplitMuxSink *splitmux = ctx->splitmux;
1278 MqStreamBuf *buf_info = NULL;
1280 GST_LOG_OBJECT (pad, "Fired probe type 0x%x", info->type);
1282 /* FIXME: Handle buffer lists, until then make it clear they won't work */
1283 if (info->type & GST_PAD_PROBE_TYPE_BUFFER_LIST) {
1284 g_warning ("Buffer list handling not implemented");
1285 return GST_PAD_PROBE_DROP;
1287 if (info->type & GST_PAD_PROBE_TYPE_EVENT_DOWNSTREAM ||
1288 info->type & GST_PAD_PROBE_TYPE_EVENT_FLUSH) {
1289 GstEvent *event = gst_pad_probe_info_get_event (info);
1290 gboolean locked = FALSE;
1292 GST_LOG_OBJECT (pad, "Event %" GST_PTR_FORMAT, event);
1294 switch (GST_EVENT_TYPE (event)) {
1295 case GST_EVENT_SEGMENT:
1296 gst_event_copy_segment (event, &ctx->out_segment);
1298 case GST_EVENT_FLUSH_STOP:
1299 GST_SPLITMUX_LOCK (splitmux);
1301 gst_segment_init (&ctx->out_segment, GST_FORMAT_UNDEFINED);
1302 g_queue_foreach (&ctx->queued_bufs, (GFunc) mq_stream_buf_free, NULL);
1303 g_queue_clear (&ctx->queued_bufs);
1304 ctx->flushing = FALSE;
1306 case GST_EVENT_FLUSH_START:
1307 GST_SPLITMUX_LOCK (splitmux);
1309 GST_LOG_OBJECT (pad, "Flush start");
1310 ctx->flushing = TRUE;
1311 GST_SPLITMUX_BROADCAST_OUTPUT (splitmux);
1314 GST_SPLITMUX_LOCK (splitmux);
1316 if (splitmux->output_state == SPLITMUX_OUTPUT_STATE_STOPPED)
1318 ctx->out_eos = TRUE;
1319 GST_INFO_OBJECT (splitmux,
1320 "Have EOS event at pad %" GST_PTR_FORMAT " ctx %p", pad, ctx);
1322 case GST_EVENT_GAP:{
1323 GstClockTime gap_ts;
1324 GstClockTimeDiff rtime;
1326 gst_event_parse_gap (event, &gap_ts, NULL);
1327 if (gap_ts == GST_CLOCK_TIME_NONE)
1330 GST_SPLITMUX_LOCK (splitmux);
1333 if (splitmux->output_state == SPLITMUX_OUTPUT_STATE_STOPPED)
1336 /* When we get a gap event on the
1337 * reference stream and we're trying to open a
1338 * new file, we need to store it until we get
1339 * the buffer afterwards
1341 if (ctx->is_reference &&
1342 (splitmux->output_state != SPLITMUX_OUTPUT_STATE_OUTPUT_GOP)) {
1343 GST_DEBUG_OBJECT (pad, "Storing GAP event until buffer arrives");
1344 gst_event_replace (&ctx->pending_gap, event);
1345 GST_SPLITMUX_UNLOCK (splitmux);
1346 return GST_PAD_PROBE_HANDLED;
1349 rtime = my_segment_to_running_time (&ctx->out_segment, gap_ts);
1351 GST_LOG_OBJECT (pad, "Have GAP w/ ts %" GST_STIME_FORMAT,
1352 GST_STIME_ARGS (rtime));
1354 if (rtime != GST_CLOCK_STIME_NONE) {
1355 ctx->out_running_time = rtime;
1356 complete_or_wait_on_out (splitmux, ctx);
1360 case GST_EVENT_CUSTOM_DOWNSTREAM:{
1361 const GstStructure *s;
1362 GstClockTimeDiff ts = 0;
1364 s = gst_event_get_structure (event);
1365 if (!gst_structure_has_name (s, "splitmuxsink-unblock"))
1368 gst_structure_get_int64 (s, "timestamp", &ts);
1370 GST_SPLITMUX_LOCK (splitmux);
1373 if (splitmux->output_state == SPLITMUX_OUTPUT_STATE_STOPPED)
1375 ctx->out_running_time = ts;
1376 if (!ctx->is_reference)
1377 complete_or_wait_on_out (splitmux, ctx);
1378 GST_SPLITMUX_UNLOCK (splitmux);
1379 return GST_PAD_PROBE_DROP;
1381 case GST_EVENT_CAPS:{
1384 if (!ctx->is_reference)
1387 peer = gst_pad_get_peer (pad);
1389 gboolean ok = gst_pad_send_event (peer, gst_event_ref (event));
1391 gst_object_unref (peer);
1399 /* This is in the case the muxer doesn't allow this change of caps */
1400 GST_SPLITMUX_LOCK (splitmux);
1402 ctx->caps_change = TRUE;
1404 if (splitmux->output_state != SPLITMUX_OUTPUT_STATE_START_NEXT_FILE) {
1405 GST_DEBUG_OBJECT (splitmux,
1406 "New caps were not accepted. Switching output file");
1407 if (ctx->out_eos == FALSE) {
1408 splitmux->output_state = SPLITMUX_OUTPUT_STATE_ENDING_FILE;
1409 GST_SPLITMUX_BROADCAST_OUTPUT (splitmux);
1413 /* Lets it fall through, if it fails again, then the muxer just can't
1414 * support this format, but at least we have a closed file.
1422 /* We need to make sure events aren't passed
1423 * until the muxer / sink are ready for it */
1425 GST_SPLITMUX_LOCK (splitmux);
1426 if (!ctx->is_reference)
1427 complete_or_wait_on_out (splitmux, ctx);
1428 GST_SPLITMUX_UNLOCK (splitmux);
1430 /* Don't try to forward sticky events before the next buffer is there
1431 * because it would cause a new file to be created without the first
1432 * buffer being available.
1434 if (ctx->caps_change && GST_EVENT_IS_STICKY (event)) {
1435 gst_event_unref (event);
1436 return GST_PAD_PROBE_HANDLED;
1438 return GST_PAD_PROBE_PASS;
1441 /* Allow everything through until the configured next stopping point */
1442 GST_SPLITMUX_LOCK (splitmux);
1444 buf_info = g_queue_pop_tail (&ctx->queued_bufs);
1445 if (buf_info == NULL)
1446 /* Can only happen due to a poorly timed flush */
1449 /* If we have popped a keyframe, decrement the queued_gop count */
1450 if (buf_info->keyframe && splitmux->queued_keyframes > 0)
1451 splitmux->queued_keyframes--;
1453 ctx->out_running_time = buf_info->run_ts;
1454 ctx->cur_out_buffer = gst_pad_probe_info_get_buffer (info);
1456 GST_LOG_OBJECT (splitmux,
1457 "Pad %" GST_PTR_FORMAT " buffer with run TS %" GST_STIME_FORMAT
1458 " size %" G_GUINT64_FORMAT,
1459 pad, GST_STIME_ARGS (ctx->out_running_time), buf_info->buf_size);
1461 ctx->caps_change = FALSE;
1463 complete_or_wait_on_out (splitmux, ctx);
1465 splitmux->muxed_out_bytes += buf_info->buf_size;
1467 #ifndef GST_DISABLE_GST_DEBUG
1469 GstBuffer *buf = gst_pad_probe_info_get_buffer (info);
1470 GST_LOG_OBJECT (pad, "Returning to pass buffer %" GST_PTR_FORMAT
1471 " run ts %" GST_STIME_FORMAT, buf,
1472 GST_STIME_ARGS (ctx->out_running_time));
1476 ctx->cur_out_buffer = NULL;
1477 GST_SPLITMUX_UNLOCK (splitmux);
1479 /* pending_gap is protected by the STREAM lock */
1480 if (ctx->pending_gap) {
1481 /* If we previously stored a gap event, send it now */
1482 GstPad *peer = gst_pad_get_peer (ctx->srcpad);
1484 GST_DEBUG_OBJECT (splitmux,
1485 "Pad %" GST_PTR_FORMAT " sending pending GAP event", ctx->srcpad);
1487 gst_pad_send_event (peer, ctx->pending_gap);
1488 ctx->pending_gap = NULL;
1490 gst_object_unref (peer);
1493 mq_stream_buf_free (buf_info);
1495 return GST_PAD_PROBE_PASS;
1498 GST_SPLITMUX_UNLOCK (splitmux);
1499 return GST_PAD_PROBE_DROP;
/* Sticky-event foreach callback: sends a new reference of *@event to @peer
 * and returns the result of gst_pad_send_event(). @pad is not used. */
1503 resend_sticky (GstPad * pad, GstEvent ** event, GstPad * peer)
1505 return gst_pad_send_event (peer, gst_event_ref (*event));
/* Removes the probe identified by ctx->fragment_block_id from the context's
 * source pad, if one is installed (id > 0), and resets the id to 0.
 * @splitmux is not used in the visible lines. */
1509 unlock_context (MqStreamCtx * ctx, GstSplitMuxSink * splitmux)
1511 if (ctx->fragment_block_id > 0) {
1512 gst_pad_remove_probe (ctx->srcpad, ctx->fragment_block_id);
1513 ctx->fragment_block_id = 0;
/* Re-sends every sticky event stored on @ctx's source pad to the pad's
 * current peer (via resend_sticky), then sets out_eos and
 * out_eos_async_done from whether the source pad is flagged EOS. */
1518 restart_context (MqStreamCtx * ctx, GstSplitMuxSink * splitmux)
1520 GstPad *peer = gst_pad_get_peer (ctx->srcpad);
1522 gst_pad_sticky_events_foreach (ctx->srcpad,
1523 (GstPadStickyEventsForeachFunction) (resend_sticky), peer);
1525 /* Clear EOS flag if not actually EOS */
1526 ctx->out_eos = GST_PAD_IS_EOS (ctx->srcpad);
1527 ctx->out_eos_async_done = ctx->out_eos;
1529 gst_object_unref (peer);
/* Moves @ctx's source pad from its current peer to a newly requested pad on
 * splitmux->muxer. The new pad is requested with the pad template (and
 * template name) of the current peer. The old link is removed with
 * gst_pad_unlink() and the new one made with gst_pad_link_full() using
 * GST_PAD_LINK_CHECK_NO_RECONFIGURE. If linking fails, the requested pad is
 * released again. References on the old peer and the new pad are dropped on
 * every visible path.
 * An element error ("Could not create the new muxer/sink") is posted on the
 * failure path.
 * NOTE(review): the goto/label lines connecting the failure branches to that
 * error are not visible in this excerpt. */
1533 relink_context (MqStreamCtx * ctx, GstSplitMuxSink * splitmux)
1535 GstPad *sinkpad, *srcpad, *newpad;
1536 GstPadTemplate *templ;
1538 srcpad = ctx->srcpad;
1539 sinkpad = gst_pad_get_peer (srcpad);
1541 templ = sinkpad->padtemplate;
1543 gst_element_request_pad (splitmux->muxer, templ,
1544 GST_PAD_TEMPLATE_NAME_TEMPLATE (templ), NULL);
1546 GST_DEBUG_OBJECT (splitmux, "Relinking ctx %p to pad %" GST_PTR_FORMAT, ctx,
1548 if (!gst_pad_unlink (srcpad, sinkpad)) {
1549 gst_object_unref (sinkpad);
1552 if (gst_pad_link_full (srcpad, newpad,
1553 GST_PAD_LINK_CHECK_NO_RECONFIGURE) != GST_PAD_LINK_OK) {
1554 gst_element_release_request_pad (splitmux->muxer, newpad);
1555 gst_object_unref (sinkpad);
1556 gst_object_unref (newpad);
1559 gst_object_unref (newpad);
1560 gst_object_unref (sinkpad);
1564 GST_ELEMENT_ERROR (splitmux, RESOURCE, SETTINGS,
1565 ("Could not create the new muxer/sink"), NULL);
/* Probe callback with no logic of its own: it always returns
 * GST_PAD_PROBE_OK. block_context() installs it with
 * GST_PAD_PROBE_TYPE_BLOCK. */
1568 static GstPadProbeReturn
1569 _block_pad (GstPad * pad, GstPadProbeInfo * info, gpointer user_data)
1571 return GST_PAD_PROBE_OK;
/* Installs a GST_PAD_PROBE_TYPE_BLOCK probe (callback _block_pad) on @ctx's
 * source pad and stores the returned probe id in ctx->fragment_block_id. */
1575 block_context (MqStreamCtx * ctx, GstSplitMuxSink * splitmux)
1577 ctx->fragment_block_id =
1578 gst_pad_add_probe (ctx->srcpad, GST_PAD_PROBE_TYPE_BLOCK, _block_pad,
/* GstStructure foreach callback: treats the field name (from @field_id) as a
 * GObject property name and sets that property to @value on the object
 * passed as user data.
 * NOTE(review): the returned value is on a line not visible here. */
1583 _set_property_from_structure (GQuark field_id, const GValue * value,
1586 const gchar *property_name = g_quark_to_string (field_id);
1587 GObject *element = G_OBJECT (user_data);
1589 g_object_set_property (element, property_name, value);
/* Locks @element's state, sets it to GST_STATE_NULL, logs the removal and
 * removes the element from the @splitmux bin. */
1595 _lock_and_set_to_null (GstElement * element, GstSplitMuxSink * splitmux)
1597 gst_element_set_locked_state (element, TRUE);
1598 gst_element_set_state (element, GST_STATE_NULL);
1599 GST_LOG_OBJECT (splitmux, "Removing old element %" GST_PTR_FORMAT, element);
1600 gst_bin_remove (GST_BIN (splitmux), element);
/* Iterator foreach callback: @value holds a pad and @user_data an event; a
 * new reference of the event is sent to the pad, so the caller keeps its own
 * reference. */
1605 _send_event (const GValue * value, gpointer user_data)
1607 GstPad *pad = g_value_get_object (value);
1608 GstEvent *ev = user_data;
1610 gst_pad_send_event (pad, gst_event_ref (ev));
/* Switches the output to a new fragment. @ctx must be the reference context
 * (asserted). Visible sequence:
 *
 * 1. Sets switching_fragment, takes references on the current muxer and
 *    active sink, releases the splitmux lock and takes the element state
 *    lock.
 * 2. With async_finalize set, and once bytes have been muxed or fragment_id
 *    is non-zero: blocks every context, creates new sink and muxer elements
 *    named "sink_%u" / "muxer_%u" after fragment_id, applies the configured
 *    sink/muxer property structures, emits the SIGNAL_SINK_ADDED and
 *    SIGNAL_MUXER_ADDED signals, relinks every context and links the new
 *    muxer to the new sink. The EOS_FROM_US qdata on the old sink decides
 *    whether the old muxer and sink are set to NULL and removed now (value
 *    2) or whether the qdata is set to 2.
 * 3. The other visible branch locks the state of the current muxer and sink
 *    and sets the sink to NULL. When reset_muxer is set the muxer is set to
 *    NULL too; the flush-start / flush-stop sequence on the muxer's sink
 *    pads is also in this branch.
 * 4. Under the splitmux lock, set_next_filename() is called when bytes were
 *    muxed or this is fragment 0, and muxed_out_bytes is reset.
 * 5. Sink and muxer are set to the splitmux target state and their state
 *    locks are released.
 * 6. With the splitmux lock re-taken and the state lock released:
 *    switching_fragment is cleared, do_async_done() runs, ready_for_output
 *    is set, contexts are unblocked and restarted, the fragment-opened
 *    message is sent and output_state becomes AWAITING_COMMAND.
 *
 * A failure path releases the state lock and posts an element error.
 *
 * NOTE(review): else branches, goto/labels and the arguments completing the
 * create_element() calls are on lines not visible in this excerpt. */
1613 /* Called with lock held when a fragment
1614 * reaches EOS and it is time to restart
1618 start_next_fragment (GstSplitMuxSink * splitmux, MqStreamCtx * ctx)
1620 GstElement *muxer, *sink;
1622 g_assert (ctx->is_reference);
1624 /* 1 change to new file */
1625 splitmux->switching_fragment = TRUE;
1627 /* We need to drop the splitmux lock to acquire the state lock
1628 * here and ensure there's no racy state change going on elsewhere */
1629 muxer = gst_object_ref (splitmux->muxer);
1630 sink = gst_object_ref (splitmux->active_sink);
1632 GST_SPLITMUX_UNLOCK (splitmux);
1633 GST_STATE_LOCK (splitmux);
1635 if (splitmux->async_finalize) {
1636 if (splitmux->muxed_out_bytes > 0 || splitmux->fragment_id != 0) {
1638 GstElement *new_sink, *new_muxer;
1640 GST_DEBUG_OBJECT (splitmux, "Starting fragment %u",
1641 splitmux->fragment_id);
1642 g_list_foreach (splitmux->contexts, (GFunc) block_context, splitmux);
1643 newname = g_strdup_printf ("sink_%u", splitmux->fragment_id);
1644 GST_SPLITMUX_LOCK (splitmux);
1645 if ((splitmux->sink =
1646 create_element (splitmux, splitmux->sink_factory, newname,
1649 if (splitmux->sink_properties)
1650 gst_structure_foreach (splitmux->sink_properties,
1651 _set_property_from_structure, splitmux->sink);
1652 splitmux->active_sink = splitmux->sink;
1653 g_signal_emit (splitmux, signals[SIGNAL_SINK_ADDED], 0, splitmux->sink);
1655 newname = g_strdup_printf ("muxer_%u", splitmux->fragment_id);
1656 if ((splitmux->muxer =
1657 create_element (splitmux, splitmux->muxer_factory, newname,
1660 if (g_object_class_find_property (G_OBJECT_GET_CLASS (splitmux->sink),
1662 /* async child elements are causing state change races and weird
1663 * failures, so let's try and turn that off */
1664 g_object_set (splitmux->sink, "async", FALSE, NULL);
1666 if (splitmux->muxer_properties)
1667 gst_structure_foreach (splitmux->muxer_properties,
1668 _set_property_from_structure, splitmux->muxer);
1669 g_signal_emit (splitmux, signals[SIGNAL_MUXER_ADDED], 0, splitmux->muxer);
1671 new_sink = splitmux->sink;
1672 new_muxer = splitmux->muxer;
1673 GST_SPLITMUX_UNLOCK (splitmux);
1674 g_list_foreach (splitmux->contexts, (GFunc) relink_context, splitmux);
1675 gst_element_link (new_muxer, new_sink);
1677 if (g_object_get_qdata ((GObject *) sink, EOS_FROM_US)) {
1678 if (GPOINTER_TO_INT (g_object_get_qdata ((GObject *) sink,
1679 EOS_FROM_US)) == 2) {
1680 _lock_and_set_to_null (muxer, splitmux);
1681 _lock_and_set_to_null (sink, splitmux);
1683 g_object_set_qdata ((GObject *) sink, EOS_FROM_US,
1684 GINT_TO_POINTER (2));
1687 gst_object_unref (muxer);
1688 gst_object_unref (sink);
1691 gst_object_ref (muxer);
1692 gst_object_ref (sink);
1696 gst_element_set_locked_state (muxer, TRUE);
1697 gst_element_set_locked_state (sink, TRUE);
1698 gst_element_set_state (sink, GST_STATE_NULL);
1700 if (splitmux->reset_muxer) {
1701 gst_element_set_state (muxer, GST_STATE_NULL);
1703 GstIterator *it = gst_element_iterate_sink_pads (muxer);
1706 ev = gst_event_new_flush_start ();
1707 while (gst_iterator_foreach (it, _send_event, ev) == GST_ITERATOR_RESYNC);
1708 gst_event_unref (ev);
1710 gst_iterator_resync (it);
1712 ev = gst_event_new_flush_stop (TRUE);
1713 while (gst_iterator_foreach (it, _send_event, ev) == GST_ITERATOR_RESYNC);
1714 gst_event_unref (ev);
1716 gst_iterator_free (it);
1720 GST_SPLITMUX_LOCK (splitmux);
1721 if (splitmux->muxed_out_bytes > 0 || splitmux->fragment_id == 0)
1722 set_next_filename (splitmux, ctx);
1723 splitmux->muxed_out_bytes = 0;
1724 GST_SPLITMUX_UNLOCK (splitmux);
1726 gst_element_set_state (sink, GST_STATE_TARGET (splitmux));
1727 gst_element_set_state (muxer, GST_STATE_TARGET (splitmux));
1728 gst_element_set_locked_state (muxer, FALSE);
1729 gst_element_set_locked_state (sink, FALSE);
1731 gst_object_unref (sink);
1732 gst_object_unref (muxer);
1734 GST_SPLITMUX_LOCK (splitmux);
1735 GST_STATE_UNLOCK (splitmux);
1736 splitmux->switching_fragment = FALSE;
1737 do_async_done (splitmux);
1739 splitmux->ready_for_output = TRUE;
1741 g_list_foreach (splitmux->contexts, (GFunc) unlock_context, splitmux);
1742 g_list_foreach (splitmux->contexts, (GFunc) restart_context, splitmux);
1744 send_fragment_opened_closed_msg (splitmux, TRUE, sink);
1746 /* FIXME: Is this always the correct next state? */
1747 splitmux->output_state = SPLITMUX_OUTPUT_STATE_AWAITING_COMMAND;
1748 GST_SPLITMUX_BROADCAST_OUTPUT (splitmux);
1752 GST_STATE_UNLOCK (splitmux);
1753 GST_ELEMENT_ERROR (splitmux, RESOURCE, SETTINGS,
1754 ("Could not create the new muxer/sink"), NULL);
/* Message handler for the splitmux bin. Messages that are not consumed here
 * are handed to the parent class' handle_message().
 *
 * EOS: a fragment-closed message is sent for the message source. With
 *   async_finalize set, an EOS from a sink carrying EOS_FROM_US qdata is
 *   unreffed and not forwarded; if the qdata value is 2, the sink and the
 *   muxer found upstream of its "sink" pad are scheduled for
 *   _lock_and_set_to_null() via gst_element_call_async(), otherwise the
 *   qdata is set to 2. Without async finalisation, an EOS received in the
 *   ENDING_FILE state is dropped and the state advances to START_NEXT_FILE.
 *
 * ASYNC_START / ASYNC_DONE: dropped while switching_fragment is set and the
 *   message comes from the active sink or the muxer.
 *
 * WARNING: warnings matching GST_STREAM_ERROR_FORMAT are examined against
 *   the contexts' caps_change flags and can be dropped.
 *
 * NOTE(review): one in-body comment says the async EOS is forwarded, while
 * the adjacent visible code unrefs the message - confirm which is intended.
 * NOTE(review): return/break statements and the GError cleanup are on lines
 * not visible in this excerpt. */
1758 bus_handler (GstBin * bin, GstMessage * message)
1760 GstSplitMuxSink *splitmux = GST_SPLITMUX_SINK (bin);
1762 switch (GST_MESSAGE_TYPE (message)) {
1763 case GST_MESSAGE_EOS:{
1764 /* If the state is draining out the current file, drop this EOS */
1767 sink = GST_ELEMENT (GST_MESSAGE_SRC (message));
1768 GST_SPLITMUX_LOCK (splitmux);
1770 send_fragment_opened_closed_msg (splitmux, FALSE, sink);
1772 if (splitmux->async_finalize) {
1774 if (g_object_get_qdata ((GObject *) sink, EOS_FROM_US)) {
1775 if (GPOINTER_TO_INT (g_object_get_qdata ((GObject *) sink,
1776 EOS_FROM_US)) == 2) {
1778 GstPad *sinksink, *muxersrc;
1780 sinksink = gst_element_get_static_pad (sink, "sink");
1781 muxersrc = gst_pad_get_peer (sinksink);
1782 muxer = gst_pad_get_parent_element (muxersrc);
1783 gst_object_unref (sinksink);
1784 gst_object_unref (muxersrc);
1786 gst_element_call_async (muxer,
1787 (GstElementCallAsyncFunc) _lock_and_set_to_null,
1788 gst_object_ref (splitmux), gst_object_unref);
1789 gst_element_call_async (sink,
1790 (GstElementCallAsyncFunc) _lock_and_set_to_null,
1791 gst_object_ref (splitmux), gst_object_unref);
1792 gst_object_unref (muxer);
1794 g_object_set_qdata ((GObject *) sink, EOS_FROM_US,
1795 GINT_TO_POINTER (2));
1797 GST_DEBUG_OBJECT (splitmux,
1798 "Caught async EOS from previous muxer+sink. Dropping.");
1799 /* We forward the EOS so that it gets aggregated as normal. If the sink
1800 * finishes and is removed before the end, it will be de-aggregated */
1801 gst_message_unref (message);
1802 GST_SPLITMUX_UNLOCK (splitmux);
1805 } else if (splitmux->output_state == SPLITMUX_OUTPUT_STATE_ENDING_FILE) {
1806 GST_DEBUG_OBJECT (splitmux, "Caught EOS at end of fragment, dropping");
1807 splitmux->output_state = SPLITMUX_OUTPUT_STATE_START_NEXT_FILE;
1808 GST_SPLITMUX_BROADCAST_OUTPUT (splitmux);
1810 gst_message_unref (message);
1811 GST_SPLITMUX_UNLOCK (splitmux);
1814 GST_DEBUG_OBJECT (splitmux,
1815 "Passing EOS message. Output state %d max_out_running_time %"
1816 GST_STIME_FORMAT, splitmux->output_state,
1817 GST_STIME_ARGS (splitmux->max_out_running_time));
1819 GST_SPLITMUX_UNLOCK (splitmux);
1822 case GST_MESSAGE_ASYNC_START:
1823 case GST_MESSAGE_ASYNC_DONE:
1824 /* Ignore state changes from our children while switching */
1825 GST_SPLITMUX_LOCK (splitmux);
1826 if (splitmux->switching_fragment) {
1827 if (GST_MESSAGE_SRC (message) == (GstObject *) splitmux->active_sink
1828 || GST_MESSAGE_SRC (message) == (GstObject *) splitmux->muxer) {
1829 GST_LOG_OBJECT (splitmux,
1830 "Ignoring state change from child %" GST_PTR_FORMAT
1831 " while switching", GST_MESSAGE_SRC (message));
1832 gst_message_unref (message);
1833 GST_SPLITMUX_UNLOCK (splitmux);
1837 GST_SPLITMUX_UNLOCK (splitmux);
1839 case GST_MESSAGE_WARNING:
1841 GError *gerror = NULL;
1843 gst_message_parse_warning (message, &gerror, NULL);
1845 if (g_error_matches (gerror, GST_STREAM_ERROR, GST_STREAM_ERROR_FORMAT)) {
1847 gboolean caps_change = FALSE;
1849 GST_SPLITMUX_LOCK (splitmux);
1851 for (item = splitmux->contexts; item; item = item->next) {
1852 MqStreamCtx *ctx = item->data;
1854 if (ctx->caps_change) {
1860 GST_SPLITMUX_UNLOCK (splitmux);
1863 GST_LOG_OBJECT (splitmux,
1864 "Ignoring warning change from child %" GST_PTR_FORMAT
1865 " while switching caps", GST_MESSAGE_SRC (message));
1866 gst_message_unref (message);
1876 GST_BIN_CLASS (parent_class)->handle_message (bin, message);
/* Flags @ctx as needing an unblock by setting ctx->need_unblock to TRUE. */
1880 ctx_set_unblock (MqStreamCtx * ctx)
1882 ctx->need_unblock = TRUE;
/* Decides whether the output must switch to a new fragment before the
 * gathered GOP is muxed.
 *
 * @queued_time:     compared against the configured threshold_time.
 * @queued_gop_time: compared against the muxer's remaining reserved
 *                   duration when robust muxing is checked.
 * @queued_bytes:    compared against the configured threshold_bytes.
 *
 * The thresholds, the head of times_to_split and the robust-muxing decision
 * are read while holding the object lock. The visible checks, in order:
 *   1. whether anything has been muxed into the current fragment
 *      (fragment_total_bytes <= 0);
 *   2. the do_split_next_gop flag, read atomically;
 *   3. a user-requested split running time: when the reference context's
 *      in_running_time has passed the head entry, that entry and any later
 *      entries that are also in the past are removed from the queue;
 *   4. the byte threshold;
 *   5. the time threshold;
 *   6. the timecode threshold (next_max_tc_time plus a 5 microsecond
 *      tolerance);
 *   7. with robust muxing, whether queued_gop_time reaches the muxer's
 *      "reserved-duration-remaining" property.
 *
 * NOTE(review): the return statements of checks 1-3 and 7 and the final
 * return are on lines not visible in this excerpt. */
1886 need_new_fragment (GstSplitMuxSink * splitmux,
1887 GstClockTime queued_time, GstClockTime queued_gop_time,
1888 guint64 queued_bytes)
1890 guint64 thresh_bytes;
1891 GstClockTime thresh_time;
1892 gboolean check_robust_muxing;
1893 GstClockTime time_to_split = GST_CLOCK_TIME_NONE;
1894 GstClockTime *ptr_to_time;
1896 GST_OBJECT_LOCK (splitmux);
1897 thresh_bytes = splitmux->threshold_bytes;
1898 thresh_time = splitmux->threshold_time;
1899 ptr_to_time = (GstClockTime *)
1900 gst_queue_array_peek_head_struct (splitmux->times_to_split);
1902 time_to_split = *ptr_to_time;
1903 check_robust_muxing = splitmux->use_robust_muxing
1904 && splitmux->muxer_has_reserved_props;
1905 GST_OBJECT_UNLOCK (splitmux);
1907 /* Have we muxed anything into the new file at all? */
1908 if (splitmux->fragment_total_bytes <= 0)
1911 /* User told us to split now */
1912 if (g_atomic_int_get (&(splitmux->do_split_next_gop)) == TRUE)
1915 /* User told us to split at this running time */
1916 if (splitmux->reference_ctx->in_running_time > time_to_split) {
1917 GST_OBJECT_LOCK (splitmux);
1918 /* Dequeue running time */
1919 gst_queue_array_pop_head_struct (splitmux->times_to_split);
1920 /* Empty any running times after this that are past now */
1921 ptr_to_time = gst_queue_array_peek_head_struct (splitmux->times_to_split);
1922 while (ptr_to_time) {
1923 time_to_split = *ptr_to_time;
1924 if (splitmux->reference_ctx->in_running_time <= time_to_split) {
1927 gst_queue_array_pop_head_struct (splitmux->times_to_split);
1928 ptr_to_time = gst_queue_array_peek_head_struct (splitmux->times_to_split);
1930 GST_OBJECT_UNLOCK (splitmux);
1934 if (thresh_bytes > 0 && queued_bytes > thresh_bytes)
1935 return TRUE; /* Would overrun byte limit */
1937 if (thresh_time > 0 && queued_time > thresh_time)
1938 return TRUE; /* Would overrun time limit */
1940 /* Timecode-based threshold accounts for possible rounding errors:
1941 * 5us should be bigger than all possible rounding errors but nowhere near
1942 * big enough to skip to another frame */
1943 if (splitmux->next_max_tc_time != GST_CLOCK_TIME_NONE &&
1944 splitmux->reference_ctx->in_running_time >
1945 splitmux->next_max_tc_time + 5 * GST_USECOND)
1946 return TRUE; /* Timecode threshold */
1948 if (check_robust_muxing) {
1949 GstClockTime mux_reserved_remain;
1951 g_object_get (splitmux->muxer,
1952 "reserved-duration-remaining", &mux_reserved_remain, NULL);
1954 GST_LOG_OBJECT (splitmux,
1955 "Muxer robust muxing report - %" G_GUINT64_FORMAT
1956 " remaining. New GOP would enqueue %" G_GUINT64_FORMAT,
1957 mux_reserved_remain, queued_gop_time);
1959 if (queued_gop_time >= mux_reserved_remain) {
1960 GST_INFO_OBJECT (splitmux,
1961 "File is about to run out of header room - %" G_GUINT64_FORMAT
1962 " remaining. New GOP would enqueue %" G_GUINT64_FORMAT
1963 ". Switching to new file", mux_reserved_remain, queued_gop_time);
1968 /* Continue and mux this GOP */
/* handle_gathered_gop:
 * @splitmux: the splitmuxsink whose inputs have all reached the end of a GOP
 *
 * Decides whether the GOP that was just collected still fits in the current
 * fragment. The size estimate is fragment_total_bytes + gop_total_bytes,
 * expanded by mux_overhead; the time estimate is the reference stream's
 * in_running_time measured from fragment_start_time. need_new_fragment()
 * takes the decision. When a new fragment is needed a start_new_fragment
 * command is queued for the output side and the per-fragment accounting is
 * restarted. Afterwards the input state is advanced, every input context
 * is woken, and a command carrying max_output_ts is queued for the output
 * side when the GOP contained any bytes.
 *
 * NOTE(review): which branch assigns queued_gop_time, and which branch is
 * taken for the EOS case, depends on lines not visible here - confirm.
 */
1973 /* Called with splitmux lock held */
1974 /* Called when entering ProcessingCompleteGop state
1975 * Assess if mq contents overflowed the current file
1976 * -> If yes, need to switch to new file
1977 * -> if no, set max_out_running_time to let this GOP in and
1978 * go to COLLECTING_GOP_START state
1980 handle_gathered_gop (GstSplitMuxSink * splitmux)
1982 guint64 queued_bytes;
1983 GstClockTimeDiff queued_time = 0;
1984 GstClockTimeDiff queued_gop_time = 0;
1985 GstClockTimeDiff new_out_ts = splitmux->reference_ctx->in_running_time;
1986 SplitMuxOutputCommand *cmd;
1988 /* Assess if the multiqueue contents overflowed the current file */
1989 /* When considering if a newly gathered GOP overflows
1990 * the time limit for the file, only consider the running time of the
1991 * reference stream. Other streams might have run ahead a little bit,
1992 * but extra pieces won't be released to the muxer beyond the reference
1993 * stream cut-off anyway - so it forms the limit. */
1994 queued_bytes = splitmux->fragment_total_bytes + splitmux->gop_total_bytes;
1995 queued_time = splitmux->reference_ctx->in_running_time;
1996 /* queued_gop_time tracks how much unwritten data there is waiting to
1997 * be written to this fragment including this GOP */
1998 if (splitmux->reference_ctx->out_running_time != GST_CLOCK_STIME_NONE)
2000 splitmux->reference_ctx->in_running_time -
2001 splitmux->reference_ctx->out_running_time;
2004 splitmux->reference_ctx->in_running_time - splitmux->gop_start_time;
2006 GST_LOG_OBJECT (splitmux, " queued_bytes %" G_GUINT64_FORMAT, queued_bytes);
/* Both durations are expected to be non-negative here */
2008 g_assert (queued_gop_time >= 0);
2009 g_assert (queued_time >= splitmux->fragment_start_time);
/* Make queued_time relative to the fragment start and never let the
 * pending-GOP duration exceed it */
2011 queued_time -= splitmux->fragment_start_time;
2012 if (queued_time < queued_gop_time)
2013 queued_gop_time = queued_time;
2015 /* Expand queued bytes estimate by muxer overhead */
2016 queued_bytes += (queued_bytes * splitmux->mux_overhead);
2018 GST_LOG_OBJECT (splitmux, "mq at TS %" GST_STIME_FORMAT
2019 " bytes %" G_GUINT64_FORMAT, GST_STIME_ARGS (queued_time), queued_bytes);
2020 if (splitmux->next_max_tc_time != GST_CLOCK_TIME_NONE) {
2021 GST_LOG_OBJECT (splitmux,
2022 "timecode mq TS %" GST_TIME_FORMAT " vs target %" GST_TIME_FORMAT,
2023 GST_TIME_ARGS (splitmux->reference_ctx->in_running_time),
2024 GST_TIME_ARGS (splitmux->next_max_tc_time + 5 * GST_USECOND));
2027 /* Check for overrun - have we output at least one byte and overrun
2028 * either threshold? */
2029 if (need_new_fragment (splitmux, queued_time, queued_gop_time, queued_bytes)) {
/* Attach the reference stream's current output running time to the sink
 * object as RUNNING_TIME qdata; g_free is registered as its destroy
 * notify */
2030 GstClockTime *sink_running_time = g_new (GstClockTime, 1);
2031 *sink_running_time = splitmux->reference_ctx->out_running_time;
2032 g_object_set_qdata_full (G_OBJECT (splitmux->sink),
2033 RUNNING_TIME, sink_running_time, g_free);
2034 g_atomic_int_set (&(splitmux->do_split_next_gop), FALSE);
2035 /* Tell the output side to start a new fragment */
2036 GST_INFO_OBJECT (splitmux,
2037 "This GOP (dur %" GST_STIME_FORMAT
2038 ") would overflow the fragment, Sending start_new_fragment cmd",
2039 GST_STIME_ARGS (splitmux->reference_ctx->in_running_time -
2040 splitmux->gop_start_time));
2041 cmd = out_cmd_buf_new ();
2042 cmd->start_new_fragment = TRUE;
2043 g_queue_push_head (&splitmux->out_cmd_q, cmd);
2044 GST_SPLITMUX_BROADCAST_OUTPUT (splitmux);
/* Restart the per-fragment accounting at the start of this GOP */
2046 new_out_ts = splitmux->reference_ctx->in_running_time;
2047 splitmux->fragment_start_time = splitmux->gop_start_time;
2048 splitmux->fragment_total_bytes = 0;
2050 if (request_next_keyframe (splitmux,
2051 splitmux->reference_ctx->prev_in_keyframe) == FALSE) {
2052 GST_WARNING_OBJECT (splitmux,
2053 "Could not request a keyframe. Files may not split at the exact location they should");
2055 gst_buffer_replace (&splitmux->reference_ctx->prev_in_keyframe, NULL);
2058 /* And set up to collect the next GOP */
2059 if (!splitmux->reference_ctx->in_eos) {
2060 splitmux->input_state = SPLITMUX_INPUT_STATE_COLLECTING_GOP_START;
2061 splitmux->gop_start_time = new_out_ts;
2063 /* This is probably already the current state, but just in case: */
2064 splitmux->input_state = SPLITMUX_INPUT_STATE_FINISHING_UP;
2065 new_out_ts = GST_CLOCK_STIME_NONE; /* EOS runs until forever */
2068 /* And wake all input contexts to send a wake-up event */
2069 g_list_foreach (splitmux->contexts, (GFunc) ctx_set_unblock, NULL);
2070 GST_SPLITMUX_BROADCAST_INPUT (splitmux);
2072 /* Now either way - either there was no overflow, or we requested a new fragment: release this GOP */
2073 splitmux->fragment_total_bytes += splitmux->gop_total_bytes;
2075 if (splitmux->gop_total_bytes > 0) {
2076 GST_LOG_OBJECT (splitmux,
2077 "Releasing GOP to output. Bytes in fragment now %" G_GUINT64_FORMAT
2078 " time %" GST_STIME_FORMAT,
2079 splitmux->fragment_total_bytes, GST_STIME_ARGS (queued_time));
2081 /* Send this GOP to the output command queue */
2082 cmd = out_cmd_buf_new ();
2083 cmd->start_new_fragment = FALSE;
2084 cmd->max_output_ts = new_out_ts;
2085 GST_LOG_OBJECT (splitmux, "Sending GOP cmd to output for TS %"
2086 GST_STIME_FORMAT, GST_STIME_ARGS (new_out_ts));
2087 g_queue_push_head (&splitmux->out_cmd_q, cmd);
2089 GST_SPLITMUX_BROADCAST_OUTPUT (splitmux);
/* The GOP has been accounted for; the next one starts counting from zero */
2092 splitmux->gop_total_bytes = 0;
/* check_completed_gop:
 * @splitmux: the splitmuxsink instance
 * @ctx: the input stream context that is calling in
 *
 * If ctx->need_unblock is set, a serialized "splitmuxsink-unblock" custom
 * downstream event carrying max_in_running_time is sent on @ctx's sink
 * pad; the splitmux lock is released around gst_pad_send_event() and
 * re-taken afterwards. While the input state is WAITING_GOP_COLLECT every
 * context's in_running_time is compared against max_in_running_time, and
 * handle_gathered_gop() is called when the GOP is considered complete; a
 * pending split_requested flag is then turned into do_split_next_gop.
 * Finally the caller sleeps on the input condition for as long as the
 * state is still WAITING_GOP_COLLECT and @ctx has reached
 * max_in_running_time.
 *
 * NOTE(review): part of the readiness test and of the wait condition is
 * not visible here - confirm before relying on this description.
 */
2095 /* Called with splitmux lock held */
2096 /* Called from each input pad when it has all the pieces
2097 * for a GOP or EOS, starting with the reference pad which has set the
2098 * splitmux->max_in_running_time
2101 check_completed_gop (GstSplitMuxSink * splitmux, MqStreamCtx * ctx)
2106 /* On ENDING_FILE, the reference stream sends a command to start a new
2107 * fragment, then releases the GOP for output in the new fragment.
2108 * If some streams received no buffer during the last GOP that overran,
2109 * because its next buffer has a timestamp bigger than
2110 * ctx->max_in_running_time, its queue is empty. In that case the only
2111 * way to wakeup the output thread is by injecting an event in the
2112 * queue. This usually happens with subtitle streams.
2113 * See https://bugzilla.gnome.org/show_bug.cgi?id=763711. */
2114 if (ctx->need_unblock) {
2115 GST_LOG_OBJECT (ctx->sinkpad, "Sending splitmuxsink-unblock event");
2116 event = gst_event_new_custom (GST_EVENT_CUSTOM_DOWNSTREAM |
2117 GST_EVENT_TYPE_SERIALIZED,
2118 gst_structure_new ("splitmuxsink-unblock", "timestamp",
2119 G_TYPE_INT64, splitmux->max_in_running_time, NULL));
/* The event is sent without the splitmux lock held */
2121 GST_SPLITMUX_UNLOCK (splitmux);
2122 gst_pad_send_event (ctx->sinkpad, event);
2123 GST_SPLITMUX_LOCK (splitmux);
2125 ctx->need_unblock = FALSE;
2126 GST_SPLITMUX_BROADCAST_INPUT (splitmux);
2127 /* state may have changed while we were unlocked. Loop again if so */
2128 if (splitmux->input_state != SPLITMUX_INPUT_STATE_WAITING_GOP_COLLECT)
2132 if (splitmux->input_state == SPLITMUX_INPUT_STATE_WAITING_GOP_COLLECT) {
2133 gboolean ready = TRUE;
2135 /* Iterate each pad, and check that the input running time is at least
2136 * up to the reference running time, and if so handle the collected GOP */
2137 GST_LOG_OBJECT (splitmux, "Checking GOP collected, Max in running time %"
2138 GST_STIME_FORMAT " ctx %p",
2139 GST_STIME_ARGS (splitmux->max_in_running_time), ctx);
2140 for (cur = g_list_first (splitmux->contexts); cur != NULL;
2141 cur = g_list_next (cur)) {
2142 MqStreamCtx *tmpctx = (MqStreamCtx *) (cur->data);
2144 GST_LOG_OBJECT (splitmux,
2145 "Context %p (src pad %" GST_PTR_FORMAT ") TS %" GST_STIME_FORMAT
2146 " EOS %d", tmpctx, tmpctx->srcpad,
2147 GST_STIME_ARGS (tmpctx->in_running_time), tmpctx->in_eos);
/* A context that is still behind a valid max_in_running_time is reported
 * as not ready */
2149 if (splitmux->max_in_running_time != GST_CLOCK_STIME_NONE &&
2150 tmpctx->in_running_time < splitmux->max_in_running_time &&
2152 GST_LOG_OBJECT (splitmux,
2153 "Context %p (src pad %" GST_PTR_FORMAT ") not ready. We'll sleep",
2154 tmpctx, tmpctx->srcpad);
2160 GST_DEBUG_OBJECT (splitmux,
2161 "Collected GOP is complete. Processing (ctx %p)", ctx);
2162 /* All pads have a complete GOP, release it into the multiqueue */
2163 handle_gathered_gop (splitmux);
2165 /* The user has requested a split, we can split now that the previous GOP
2166 * has been collected to the correct location */
2167 if (g_atomic_int_compare_and_exchange (&(splitmux->split_requested), TRUE,
2169 g_atomic_int_set (&(splitmux->do_split_next_gop), TRUE);
2174 /* If upstream reached EOS we are not expecting more data, no need to wait
2179 /* Some pad is not yet ready, or GOP is being pushed
2180 * either way, sleep and wait to get woken */
2181 while (splitmux->input_state == SPLITMUX_INPUT_STATE_WAITING_GOP_COLLECT &&
2183 (ctx->in_running_time >= splitmux->max_in_running_time) &&
2184 (splitmux->max_in_running_time != GST_CLOCK_STIME_NONE)) {
2186 GST_LOG_OBJECT (splitmux, "Sleeping for GOP collection (ctx %p)", ctx);
2187 GST_SPLITMUX_WAIT_INPUT (splitmux);
2188 GST_LOG_OBJECT (splitmux, "Done waiting for complete GOP (ctx %p)", ctx);
/* handle_mq_input:
 * @pad: pad the probe fired on
 * @info: probe info carrying a buffer, an event or a query
 * @ctx: per-stream context registered together with the probe
 *
 * Pad probe installed on the sink pad of each stream's queue (see
 * gst_splitmux_sink_request_new_pad()). Buffer lists are dropped with a
 * warning. Downstream and flush events update the stream's segment, EOS
 * and running-time bookkeeping. ALLOCATION queries are dropped. For
 * buffers, the stream's in_running_time is advanced, a MqStreamBuf record
 * is filled in, and the input state machine (COLLECTING_GOP_START,
 * WAITING_GOP_COLLECT, FINISHING_UP) decides whether the streaming thread
 * must wait before the buffer is let through. On the normal exit the
 * record is pushed on ctx->queued_bufs and gop_total_bytes is increased
 * by the buffer size.
 *
 * Returns: GST_PAD_PROBE_DROP for buffer lists and ALLOCATION queries,
 * GST_PAD_PROBE_PASS on the other visible return paths.
 *
 * NOTE(review): loop control, break/goto statements and some branches are
 * not visible here - confirm control flow against the full file.
 */
2192 static GstPadProbeReturn
2193 handle_mq_input (GstPad * pad, GstPadProbeInfo * info, MqStreamCtx * ctx)
2195 GstSplitMuxSink *splitmux = ctx->splitmux;
2197 MqStreamBuf *buf_info = NULL;
2199 gboolean loop_again;
2200 gboolean keyframe = FALSE;
2202 GST_LOG_OBJECT (pad, "Fired probe type 0x%x", info->type);
2204 /* FIXME: Handle buffer lists, until then make it clear they won't work */
2205 if (info->type & GST_PAD_PROBE_TYPE_BUFFER_LIST) {
2206 g_warning ("Buffer list handling not implemented");
2207 return GST_PAD_PROBE_DROP;
/* Events: track segment, flush, EOS and gap information for this stream */
2209 if (info->type & GST_PAD_PROBE_TYPE_EVENT_DOWNSTREAM ||
2210 info->type & GST_PAD_PROBE_TYPE_EVENT_FLUSH) {
2211 GstEvent *event = gst_pad_probe_info_get_event (info);
2213 GST_LOG_OBJECT (pad, "Event %" GST_PTR_FORMAT, event);
2215 switch (GST_EVENT_TYPE (event)) {
2216 case GST_EVENT_SEGMENT:
2217 gst_event_copy_segment (event, &ctx->in_segment);
2219 case GST_EVENT_FLUSH_STOP:
2220 GST_SPLITMUX_LOCK (splitmux);
2221 gst_segment_init (&ctx->in_segment, GST_FORMAT_UNDEFINED);
2222 ctx->in_eos = FALSE;
2223 ctx->in_running_time = GST_CLOCK_STIME_NONE;
2224 GST_SPLITMUX_UNLOCK (splitmux);
2227 GST_SPLITMUX_LOCK (splitmux);
2230 if (splitmux->input_state == SPLITMUX_INPUT_STATE_STOPPED)
2233 if (ctx->is_reference) {
2234 GST_INFO_OBJECT (splitmux, "Got Reference EOS. Finishing up");
2235 /* check_completed_gop will act as if this is a new keyframe with infinite timestamp */
2236 splitmux->input_state = SPLITMUX_INPUT_STATE_WAITING_GOP_COLLECT;
2237 /* Wake up other input pads to collect this GOP */
2238 GST_SPLITMUX_BROADCAST_INPUT (splitmux);
2239 check_completed_gop (splitmux, ctx);
2240 } else if (splitmux->input_state ==
2241 SPLITMUX_INPUT_STATE_WAITING_GOP_COLLECT) {
2242 /* If we are waiting for a GOP to be completed (ie, for aux
2243 * pads to catch up), then this pad is complete, so check
2244 * if the whole GOP is.
2246 check_completed_gop (splitmux, ctx);
2248 GST_SPLITMUX_UNLOCK (splitmux);
2250 case GST_EVENT_GAP:{
2251 GstClockTime gap_ts;
2252 GstClockTimeDiff rtime;
2254 gst_event_parse_gap (event, &gap_ts, NULL);
2255 if (gap_ts == GST_CLOCK_TIME_NONE)
2258 GST_SPLITMUX_LOCK (splitmux);
2260 if (splitmux->input_state == SPLITMUX_INPUT_STATE_STOPPED)
2262 rtime = my_segment_to_running_time (&ctx->in_segment, gap_ts);
2264 GST_LOG_OBJECT (pad, "Have GAP w/ ts %" GST_STIME_FORMAT,
2265 GST_STIME_ARGS (rtime));
2267 if (ctx->is_reference
2268 && splitmux->fragment_start_time == GST_CLOCK_STIME_NONE) {
2269 splitmux->gop_start_time = splitmux->fragment_start_time = rtime;
2270 GST_LOG_OBJECT (splitmux, "Mux start time now %" GST_STIME_FORMAT,
2271 GST_STIME_ARGS (splitmux->fragment_start_time));
2272 /* Also take this as the first start time when starting up,
2273 * so that we start counting overflow from the first frame */
2274 if (!GST_CLOCK_STIME_IS_VALID (splitmux->max_in_running_time))
2275 splitmux->max_in_running_time = splitmux->fragment_start_time;
2278 GST_SPLITMUX_UNLOCK (splitmux);
2284 return GST_PAD_PROBE_PASS;
2285 } else if (info->type & GST_PAD_PROBE_TYPE_QUERY_DOWNSTREAM) {
2286 switch (GST_QUERY_TYPE (GST_QUERY (info->data))) {
2287 case GST_QUERY_ALLOCATION:
2288 return GST_PAD_PROBE_DROP;
2290 return GST_PAD_PROBE_PASS;
/* From here on the probe carries a buffer; buf_info is the bookkeeping
 * record that mirrors it */
2294 buf = gst_pad_probe_info_get_buffer (info);
2295 buf_info = mq_stream_buf_new ();
2297 if (GST_BUFFER_PTS_IS_VALID (buf))
2298 ts = GST_BUFFER_PTS (buf);
2300 ts = GST_BUFFER_DTS (buf);
2302 GST_LOG_OBJECT (pad, "Buffer TS is %" GST_TIME_FORMAT, GST_TIME_ARGS (ts));
2304 GST_SPLITMUX_LOCK (splitmux);
2306 if (splitmux->input_state == SPLITMUX_INPUT_STATE_STOPPED)
2309 /* If this buffer has a timestamp, advance the input timestamp of the
2311 if (GST_CLOCK_TIME_IS_VALID (ts)) {
2312 GstClockTimeDiff running_time =
2313 my_segment_to_running_time (&ctx->in_segment, ts);
2315 GST_LOG_OBJECT (pad, "Buffer running TS is %" GST_STIME_FORMAT,
2316 GST_STIME_ARGS (running_time));
2318 if (GST_CLOCK_STIME_IS_VALID (running_time)
2319 && running_time > ctx->in_running_time)
2320 ctx->in_running_time = running_time;
2323 /* Try to make sure we have a valid running time */
2324 if (!GST_CLOCK_STIME_IS_VALID (ctx->in_running_time)) {
2325 ctx->in_running_time =
2326 my_segment_to_running_time (&ctx->in_segment, ctx->in_segment.start);
2329 GST_LOG_OBJECT (pad, "in running time now %" GST_STIME_FORMAT,
2330 GST_STIME_ARGS (ctx->in_running_time));
/* Record running time, size and duration of this buffer */
2332 buf_info->run_ts = ctx->in_running_time;
2333 buf_info->buf_size = gst_buffer_get_size (buf);
2334 buf_info->duration = GST_BUFFER_DURATION (buf);
2336 /* initialize fragment_start_time */
2337 if (ctx->is_reference
2338 && splitmux->fragment_start_time == GST_CLOCK_STIME_NONE) {
2339 splitmux->gop_start_time = splitmux->fragment_start_time = buf_info->run_ts;
2340 GST_LOG_OBJECT (splitmux, "Mux start time now %" GST_STIME_FORMAT,
2341 GST_STIME_ARGS (splitmux->fragment_start_time));
2342 gst_buffer_replace (&ctx->prev_in_keyframe, buf);
2344 /* Also take this as the first start time when starting up,
2345 * so that we start counting overflow from the first frame */
2346 if (!GST_CLOCK_STIME_IS_VALID (splitmux->max_in_running_time))
2347 splitmux->max_in_running_time = splitmux->fragment_start_time;
2348 if (request_next_keyframe (splitmux, ctx->prev_in_keyframe) == FALSE) {
2349 GST_WARNING_OBJECT (splitmux,
2350 "Could not request a keyframe. Files may not split at the exact location they should");
2352 gst_buffer_replace (&splitmux->reference_ctx->prev_in_keyframe, NULL);
2355 GST_DEBUG_OBJECT (pad, "Buf TS %" GST_STIME_FORMAT
2356 " total GOP bytes %" G_GUINT64_FORMAT,
2357 GST_STIME_ARGS (buf_info->run_ts), splitmux->gop_total_bytes);
/* Input state machine: decide whether this buffer may pass now or the
 * streaming thread has to wait first */
2364 switch (splitmux->input_state) {
2365 case SPLITMUX_INPUT_STATE_COLLECTING_GOP_START:
2366 if (ctx->is_reference) {
2367 /* This is the reference context. If it's a keyframe,
2368 * it marks the start of a new GOP and we should wait in
2369 * check_completed_gop before continuing, but either way
2370 * (keyframe or no, we'll pass this buffer through after
2371 * so set loop_again to FALSE */
2374 if (GST_BUFFER_FLAG_IS_SET (buf, GST_BUFFER_FLAG_DELTA_UNIT)) {
2375 /* Allow other input pads to catch up to here too */
2376 splitmux->max_in_running_time = ctx->in_running_time;
2377 GST_LOG_OBJECT (splitmux,
2378 "Max in running time now %" GST_TIME_FORMAT,
2379 GST_TIME_ARGS (splitmux->max_in_running_time));
2380 GST_SPLITMUX_BROADCAST_INPUT (splitmux);
2383 GST_INFO_OBJECT (pad,
2384 "Have keyframe with running time %" GST_STIME_FORMAT,
2385 GST_STIME_ARGS (ctx->in_running_time));
2387 splitmux->input_state = SPLITMUX_INPUT_STATE_WAITING_GOP_COLLECT;
2388 splitmux->max_in_running_time = ctx->in_running_time;
2389 GST_LOG_OBJECT (splitmux, "Max in running time now %" GST_TIME_FORMAT,
2390 GST_TIME_ARGS (splitmux->max_in_running_time));
2391 /* Wake up other input pads to collect this GOP */
2392 GST_SPLITMUX_BROADCAST_INPUT (splitmux);
2393 check_completed_gop (splitmux, ctx);
2394 /* Store this new keyframe to remember the start of GOP */
2395 gst_buffer_replace (&ctx->prev_in_keyframe, buf);
2397 /* Pass this buffer if the reference ctx is far enough ahead */
2398 if (ctx->in_running_time < splitmux->max_in_running_time) {
2403 /* We're still waiting for a keyframe on the reference pad, sleep */
2404 GST_LOG_OBJECT (pad, "Sleeping for GOP start");
2405 GST_SPLITMUX_WAIT_INPUT (splitmux);
2406 GST_LOG_OBJECT (pad,
2407 "Done sleeping for GOP start input state now %d",
2408 splitmux->input_state);
2411 case SPLITMUX_INPUT_STATE_WAITING_GOP_COLLECT:{
2412 /* We're collecting a GOP. If this is the reference context,
2413 * we need to check if this is a keyframe that marks the start
2414 * of the next GOP. If it is, it marks the end of the GOP we're
2415 * collecting, so sleep and wait until all the other pads also
2416 * reach that timestamp - at which point, we have an entire GOP
2417 * and either go to ENDING_FILE or release this GOP to the muxer and
2418 * go back to COLLECT_GOP_START. */
2420 /* If we overran the target timestamp, it might be time to process
2421 * the GOP, otherwise bail out for more data
2423 GST_LOG_OBJECT (pad,
2424 "Checking TS %" GST_STIME_FORMAT " against max %"
2425 GST_STIME_FORMAT, GST_STIME_ARGS (ctx->in_running_time),
2426 GST_STIME_ARGS (splitmux->max_in_running_time));
2428 if (ctx->in_running_time < splitmux->max_in_running_time) {
2433 GST_LOG_OBJECT (pad,
2434 "Collected last packet of GOP. Checking other pads");
2435 check_completed_gop (splitmux, ctx);
2438 case SPLITMUX_INPUT_STATE_FINISHING_UP:
2449 splitmux->queued_keyframes++;
2450 buf_info->keyframe = TRUE;
2453 /* Update total input byte counter for overflow detect */
2454 splitmux->gop_total_bytes += buf_info->buf_size;
2456 /* Now add this buffer to the queue just before returning */
2457 g_queue_push_head (&ctx->queued_bufs, buf_info);
2459 GST_LOG_OBJECT (pad, "Returning to queue buffer %" GST_PTR_FORMAT
2460 " run ts %" GST_STIME_FORMAT, buf, GST_STIME_ARGS (ctx->in_running_time));
2462 GST_SPLITMUX_UNLOCK (splitmux);
2463 return GST_PAD_PROBE_PASS;
/* Alternate exit: the bookkeeping record is freed instead of being queued */
2466 GST_SPLITMUX_UNLOCK (splitmux);
2468 mq_stream_buf_free (buf_info);
2469 return GST_PAD_PROBE_PASS;
2473 grow_blocked_queues (GstSplitMuxSink * splitmux)
2477 /* Scan other queues for full-ness and grow them */
2478 for (cur = g_list_first (splitmux->contexts);
2479 cur != NULL; cur = g_list_next (cur)) {
2480 MqStreamCtx *tmpctx = (MqStreamCtx *) (cur->data);
2482 guint cur_len = g_queue_get_length (&tmpctx->queued_bufs);
2484 g_object_get (tmpctx->q, "max-size-buffers", &cur_limit, NULL);
2485 GST_LOG_OBJECT (tmpctx->q, "Queue len %u", cur_len);
2487 if (cur_len >= cur_limit) {
2488 cur_limit = cur_len + 1;
2489 GST_DEBUG_OBJECT (tmpctx->q,
2490 "Queue overflowed and needs enlarging. Growing to %u buffers",
2492 g_object_set (tmpctx->q, "max-size-buffers", cur_limit, NULL);
2498 handle_q_underrun (GstElement * q, gpointer user_data)
2500 MqStreamCtx *ctx = (MqStreamCtx *) (user_data);
2501 GstSplitMuxSink *splitmux = ctx->splitmux;
2503 GST_SPLITMUX_LOCK (splitmux);
2504 GST_DEBUG_OBJECT (q,
2505 "Queue reported underrun with %d keyframes and %d cmds enqueued",
2506 splitmux->queued_keyframes, g_queue_get_length (&splitmux->out_cmd_q));
2507 grow_blocked_queues (splitmux);
2508 GST_SPLITMUX_UNLOCK (splitmux);
/* handle_q_overrun:
 * @q: the queue that emitted "overrun"
 * @user_data: the MqStreamCtx the signal was connected with
 *
 * Signal handler. Under the splitmux lock it inspects queued_keyframes,
 * the length of the output command queue, and whether any other stream's
 * queued_bufs is empty. After releasing the lock it reads the queue's
 * "max-size-buffers" property and writes it back.
 *
 * NOTE(review): the statements that set allow_grow and that change
 * cur_limit are not visible here - confirm the exact growth policy.
 */
2512 handle_q_overrun (GstElement * q, gpointer user_data)
2514 MqStreamCtx *ctx = (MqStreamCtx *) (user_data);
2515 GstSplitMuxSink *splitmux = ctx->splitmux;
2516 gboolean allow_grow = FALSE;
2518 GST_SPLITMUX_LOCK (splitmux);
2519 GST_DEBUG_OBJECT (q,
2520 "Queue reported overrun with %d keyframes and %d cmds enqueued",
2521 splitmux->queued_keyframes, g_queue_get_length (&splitmux->out_cmd_q));
2523 if (splitmux->queued_keyframes < 2) {
2524 /* Less than a full GOP queued, grow the queue */
/* No command is pending for the output side */
2526 } else if (g_queue_get_length (&splitmux->out_cmd_q) < 1) {
2529 /* If another queue is starved, grow */
2531 for (cur = g_list_first (splitmux->contexts);
2532 cur != NULL; cur = g_list_next (cur)) {
2533 MqStreamCtx *tmpctx = (MqStreamCtx *) (cur->data);
2534 if (tmpctx != ctx && g_queue_get_length (&tmpctx->queued_bufs) < 1) {
2539 GST_SPLITMUX_UNLOCK (splitmux);
/* The queue property is updated without the splitmux lock held */
2544 g_object_get (q, "max-size-buffers", &cur_limit, NULL);
2547 GST_DEBUG_OBJECT (q,
2548 "Queue overflowed and needs enlarging. Growing to %u buffers",
2551 g_object_set (q, "max-size-buffers", cur_limit, NULL);
/* gst_splitmux_sink_request_new_pad:
 * @element: the splitmuxsink
 * @templ: pad template the caller is requesting a pad from
 * @name: requested pad name (the code handles the NULL case)
 * @caps: caps forwarded to the muxer's pad request
 *
 * Makes sure the muxer exists, picks a matching sink pad template on the
 * muxer (by name, with several fallbacks), obtains a muxer pad from it,
 * places a new "queue" element in front of that pad and exposes the
 * queue's sink pad as a ghost pad on the bin. A MqStreamCtx is created
 * for the stream, the overrun/underrun handlers and the input/output pad
 * probes are attached, and the context is appended to splitmux->contexts.
 * Only one "video" pad may be requested at a time.
 *
 * NOTE(review): the return statements and goto targets of the failure
 * paths are not visible here - confirm what is returned on error.
 */
2556 gst_splitmux_sink_request_new_pad (GstElement * element,
2557 GstPadTemplate * templ, const gchar * name, const GstCaps * caps)
2559 GstSplitMuxSink *splitmux = (GstSplitMuxSink *) element;
2560 GstPadTemplate *mux_template = NULL;
2563 GstPad *q_sink = NULL, *q_src = NULL;
2565 gboolean is_video = FALSE;
2568 GST_DEBUG_OBJECT (element, "templ:%s, name:%s", templ->name_template, name);
2570 GST_SPLITMUX_LOCK (splitmux);
2571 if (!create_muxer (splitmux))
2573 g_signal_emit (splitmux, signals[SIGNAL_MUXER_ADDED], 0, splitmux->muxer);
/* Step 1: find the muxer pad template to request from */
2575 if (templ->name_template) {
2576 if (g_str_equal (templ->name_template, "video")) {
2577 if (splitmux->have_video)
2578 goto already_have_video;
2580 /* FIXME: Look for a pad template with matching caps, rather than by name */
2581 GST_DEBUG_OBJECT (element,
2582 "searching for pad-template with name 'video_%%u'");
2584 gst_element_class_get_pad_template (GST_ELEMENT_GET_CLASS
2585 (splitmux->muxer), "video_%u");
2587 /* Fallback to find sink pad templates named 'video' (flvmux) */
2588 if (!mux_template) {
2589 GST_DEBUG_OBJECT (element,
2590 "searching for pad-template with name 'video'");
2592 gst_element_class_get_pad_template (GST_ELEMENT_GET_CLASS
2593 (splitmux->muxer), "video");
2598 GST_DEBUG_OBJECT (element, "searching for pad-template with name '%s'",
2599 templ->name_template);
2601 gst_element_class_get_pad_template (GST_ELEMENT_GET_CLASS
2602 (splitmux->muxer), templ->name_template);
2604 /* Fallback to find sink pad templates named 'audio' (flvmux) */
2605 if (!mux_template) {
2606 GST_DEBUG_OBJECT (element,
2607 "searching for pad-template with name 'audio'");
2609 gst_element_class_get_pad_template (GST_ELEMENT_GET_CLASS
2610 (splitmux->muxer), "audio");
/* Generic fallbacks: 'sink_%d', then 'sink' */
2615 if (mux_template == NULL) {
2616 GST_DEBUG_OBJECT (element,
2617 "searching for pad-template with name 'sink_%%d'");
2619 gst_element_class_get_pad_template (GST_ELEMENT_GET_CLASS
2620 (splitmux->muxer), "sink_%d");
2623 if (mux_template == NULL) {
2624 GST_DEBUG_OBJECT (element, "searching for pad-template with name 'sink'");
2626 gst_element_class_get_pad_template (GST_ELEMENT_GET_CLASS
2627 (splitmux->muxer), "sink");
2632 if (mux_template == NULL) {
2633 GST_ERROR_OBJECT (element,
2634 "unable to find a suitable sink pad-template on the muxer");
2638 GST_DEBUG_OBJECT (element, "found sink pad-template '%s' on the muxer",
2639 mux_template->name_template);
/* Step 2: obtain the muxer pad, either by request or as a static pad */
2641 if (mux_template->presence == GST_PAD_REQUEST) {
2642 GST_DEBUG_OBJECT (element, "requesting pad from pad-template");
2644 res = gst_element_request_pad (splitmux->muxer, mux_template, name, caps);
2647 } else if (mux_template->presence == GST_PAD_ALWAYS) {
2648 GST_DEBUG_OBJECT (element, "accessing always pad from pad-template");
2651 gst_element_get_static_pad (splitmux->muxer,
2652 mux_template->name_template);
2656 GST_ERROR_OBJECT (element,
2657 "unexpected pad presence %d", mux_template->presence);
/* Step 3: choose the name of the pad exposed on the bin */
2663 gname = g_strdup ("video");
2664 else if (name == NULL)
2665 gname = gst_pad_get_name (res);
2667 gname = g_strdup (name);
/* Step 4: every stream gets its own queue in front of the muxer pad; the
 * queue is limited by buffer count only (5 buffers initially) */
2669 if ((q = create_element (splitmux, "queue", NULL, FALSE)) == NULL)
2672 gst_element_set_state (q, GST_STATE_TARGET (splitmux));
2674 g_object_set (q, "max-size-bytes", 0, "max-size-time", (guint64) (0),
2675 "max-size-buffers", 5, NULL);
2677 q_sink = gst_element_get_static_pad (q, "sink");
2678 q_src = gst_element_get_static_pad (q, "src");
2680 if (gst_pad_link (q_src, res) != GST_PAD_LINK_OK) {
2681 gst_element_release_request_pad (splitmux->muxer, res);
2682 gst_object_unref (GST_OBJECT (res));
2686 gst_object_unref (GST_OBJECT (res));
/* Step 5: create the per-stream context; it keeps the two queue pad
 * references obtained above */
2688 ctx = mq_stream_ctx_new (splitmux);
2689 /* Context holds a ref: */
2690 ctx->q = gst_object_ref (q);
2691 ctx->srcpad = q_src;
2692 ctx->sinkpad = q_sink;
2694 g_signal_connect (q, "overrun", (GCallback) handle_q_overrun, ctx);
2695 g_signal_connect (q, "underrun", (GCallback) handle_q_underrun, ctx);
2697 ctx->src_pad_block_id =
2698 gst_pad_add_probe (q_src,
2699 GST_PAD_PROBE_TYPE_DATA_DOWNSTREAM | GST_PAD_PROBE_TYPE_EVENT_FLUSH,
2700 (GstPadProbeCallback) handle_mq_output, ctx, NULL);
/* A video stream takes the reference role away from an earlier stream;
 * otherwise the first stream requested becomes the reference */
2701 if (is_video && splitmux->reference_ctx != NULL) {
2702 splitmux->reference_ctx->is_reference = FALSE;
2703 splitmux->reference_ctx = NULL;
2705 if (splitmux->reference_ctx == NULL) {
2706 splitmux->reference_ctx = ctx;
2707 ctx->is_reference = TRUE;
/* Step 6: expose the queue's sink pad on the bin as a ghost pad and
 * remember the context on it */
2710 res = gst_ghost_pad_new_from_template (gname, q_sink, templ);
2711 g_object_set_qdata ((GObject *) (res), PAD_CONTEXT, ctx);
2713 ctx->sink_pad_block_id =
2714 gst_pad_add_probe (q_sink,
2715 GST_PAD_PROBE_TYPE_DATA_DOWNSTREAM | GST_PAD_PROBE_TYPE_EVENT_FLUSH |
2716 GST_PAD_PROBE_TYPE_QUERY_DOWNSTREAM,
2717 (GstPadProbeCallback) handle_mq_input, ctx, NULL);
2719 GST_DEBUG_OBJECT (splitmux, "Request pad %" GST_PTR_FORMAT
2720 " feeds queue pad %" GST_PTR_FORMAT, res, q_sink);
2722 splitmux->contexts = g_list_append (splitmux->contexts, ctx);
2727 splitmux->have_video = TRUE;
2729 gst_pad_set_active (res, TRUE);
2730 gst_element_add_pad (element, res);
2732 GST_SPLITMUX_UNLOCK (splitmux);
/* Failure paths: release the lock and drop the queue pad references */
2736 GST_SPLITMUX_UNLOCK (splitmux);
2739 gst_object_unref (q_sink);
2741 gst_object_unref (q_src);
2744 GST_DEBUG_OBJECT (splitmux, "video sink pad already requested");
2745 GST_SPLITMUX_UNLOCK (splitmux);
2750 gst_splitmux_sink_release_pad (GstElement * element, GstPad * pad)
2752 GstSplitMuxSink *splitmux = (GstSplitMuxSink *) element;
2753 GstPad *muxpad = NULL;
2755 (MqStreamCtx *) (g_object_get_qdata ((GObject *) (pad), PAD_CONTEXT));
2757 GST_SPLITMUX_LOCK (splitmux);
2759 if (splitmux->muxer == NULL)
2760 goto fail; /* Elements don't exist yet - nothing to release */
2762 GST_INFO_OBJECT (pad, "releasing request pad");
2764 muxpad = gst_pad_get_peer (ctx->srcpad);
2766 /* Remove the context from our consideration */
2767 splitmux->contexts = g_list_remove (splitmux->contexts, ctx);
2769 if (ctx->sink_pad_block_id)
2770 gst_pad_remove_probe (ctx->sinkpad, ctx->sink_pad_block_id);
2772 if (ctx->src_pad_block_id)
2773 gst_pad_remove_probe (ctx->srcpad, ctx->src_pad_block_id);
2775 /* Can release the context now */
2776 mq_stream_ctx_free (ctx);
2777 if (ctx == splitmux->reference_ctx)
2778 splitmux->reference_ctx = NULL;
2780 /* Release and free the muxer input */
2782 gst_element_release_request_pad (splitmux->muxer, muxpad);
2783 gst_object_unref (muxpad);
2786 if (GST_PAD_PAD_TEMPLATE (pad) &&
2787 g_str_equal (GST_PAD_TEMPLATE_NAME_TEMPLATE (GST_PAD_PAD_TEMPLATE
2789 splitmux->have_video = FALSE;
2791 gst_element_remove_pad (element, pad);
2793 /* Reset the internal elements only after all request pads are released */
2794 if (splitmux->contexts == NULL)
2795 gst_splitmux_reset (splitmux);
2798 GST_SPLITMUX_UNLOCK (splitmux);
2802 create_element (GstSplitMuxSink * splitmux,
2803 const gchar * factory, const gchar * name, gboolean locked)
2805 GstElement *ret = gst_element_factory_make (factory, name);
2807 g_warning ("Failed to create %s - splitmuxsink will not work", name);
2812 /* Ensure the sink starts in locked state and NULL - it will be changed
2813 * by the filename setting code */
2814 gst_element_set_locked_state (ret, TRUE);
2815 gst_element_set_state (ret, GST_STATE_NULL);
2818 if (!gst_bin_add (GST_BIN (splitmux), ret)) {
2819 g_warning ("Could not add %s element - splitmuxsink will not work", name);
2820 gst_object_unref (ret);
2828 create_muxer (GstSplitMuxSink * splitmux)
2830 /* Create internal elements */
2831 if (splitmux->muxer == NULL) {
2832 GstElement *provided_muxer = NULL;
2834 GST_OBJECT_LOCK (splitmux);
2835 if (splitmux->provided_muxer != NULL)
2836 provided_muxer = gst_object_ref (splitmux->provided_muxer);
2837 GST_OBJECT_UNLOCK (splitmux);
2839 if ((!splitmux->async_finalize && provided_muxer == NULL) ||
2840 (splitmux->async_finalize && splitmux->muxer_factory == NULL)) {
2841 if ((splitmux->muxer =
2842 create_element (splitmux, DEFAULT_MUXER, "muxer", FALSE)) == NULL)
2844 } else if (splitmux->async_finalize) {
2845 if ((splitmux->muxer =
2846 create_element (splitmux, splitmux->muxer_factory, "muxer",
2849 if (splitmux->muxer_properties)
2850 gst_structure_foreach (splitmux->muxer_properties,
2851 _set_property_from_structure, splitmux->muxer);
2853 /* Ensure it's not in locked state (we might be reusing an old element) */
2854 gst_element_set_locked_state (provided_muxer, FALSE);
2855 if (!gst_bin_add (GST_BIN (splitmux), provided_muxer)) {
2856 g_warning ("Could not add muxer element - splitmuxsink will not work");
2857 gst_object_unref (provided_muxer);
2861 splitmux->muxer = provided_muxer;
2862 gst_object_unref (provided_muxer);
2865 if (splitmux->use_robust_muxing) {
2866 update_muxer_properties (splitmux);
/* find_sink:
 * @e: an element, possibly a bin
 *
 * Looks for an element that exposes a "location" property: @e itself is
 * checked first, and when @e is a bin its sink elements (as reported by
 * gst_bin_iterate_sinks()) are examined one by one. An iterator RESYNC
 * restarts the scan; an iterator ERROR is treated as unreachable.
 *
 * NOTE(review): the return statements and the assignment of the match are
 * not visible here - confirm what is returned on each path and whether a
 * reference is taken.
 */
2876 find_sink (GstElement * e)
2878 GstElement *res = NULL;
2880 gboolean done = FALSE;
2881 GValue data = { 0, };
2883 if (!GST_IS_BIN (e))
2886 if (g_object_class_find_property (G_OBJECT_GET_CLASS (e), "location") != NULL)
2889 iter = gst_bin_iterate_sinks (GST_BIN (e));
2891 switch (gst_iterator_next (iter, &data)) {
2892 case GST_ITERATOR_OK:
2894 GstElement *child = g_value_get_object (&data);
2895 if (g_object_class_find_property (G_OBJECT_GET_CLASS (child),
2896 "location") != NULL) {
2900 g_value_reset (&data);
2903 case GST_ITERATOR_RESYNC:
2904 gst_iterator_resync (iter);
2906 case GST_ITERATOR_DONE:
2909 case GST_ITERATOR_ERROR:
2910 g_assert_not_reached ();
/* Release the iteration value and the iterator itself */
2914 g_value_unset (&data);
2915 gst_iterator_free (iter);
2921 create_sink (GstSplitMuxSink * splitmux)
2923 GstElement *provided_sink = NULL;
2925 if (splitmux->active_sink == NULL) {
2927 GST_OBJECT_LOCK (splitmux);
2928 if (splitmux->provided_sink != NULL)
2929 provided_sink = gst_object_ref (splitmux->provided_sink);
2930 GST_OBJECT_UNLOCK (splitmux);
2932 if ((!splitmux->async_finalize && provided_sink == NULL) ||
2933 (splitmux->async_finalize && splitmux->sink_factory == NULL)) {
2934 if ((splitmux->sink =
2935 create_element (splitmux, DEFAULT_SINK, "sink", TRUE)) == NULL)
2937 splitmux->active_sink = splitmux->sink;
2938 } else if (splitmux->async_finalize) {
2939 if ((splitmux->sink =
2940 create_element (splitmux, splitmux->sink_factory, "sink",
2943 if (splitmux->sink_properties)
2944 gst_structure_foreach (splitmux->sink_properties,
2945 _set_property_from_structure, splitmux->sink);
2946 splitmux->active_sink = splitmux->sink;
2948 /* Ensure the sink starts in locked state and NULL - it will be changed
2949 * by the filename setting code */
2950 gst_element_set_locked_state (provided_sink, TRUE);
2951 gst_element_set_state (provided_sink, GST_STATE_NULL);
2952 if (!gst_bin_add (GST_BIN (splitmux), provided_sink)) {
2953 g_warning ("Could not add sink elements - splitmuxsink will not work");
2954 gst_object_unref (provided_sink);
2958 splitmux->active_sink = provided_sink;
2960 /* The bin holds a ref now, we can drop our tmp ref */
2961 gst_object_unref (provided_sink);
2963 /* Find the sink element */
2964 splitmux->sink = find_sink (splitmux->active_sink);
2965 if (splitmux->sink == NULL) {
2967 ("Could not locate sink element in provided sink - splitmuxsink will not work");
2973 if (g_object_class_find_property (G_OBJECT_GET_CLASS (splitmux->sink),
2975 /* async child elements are causing state change races and weird
2976 * failures, so let's try and turn that off */
2977 g_object_set (splitmux->sink, "async", FALSE, NULL);
2981 if (!gst_element_link (splitmux->muxer, splitmux->active_sink)) {
2982 g_warning ("Failed to link muxer and sink- splitmuxsink will not work");
2993 #pragma GCC diagnostic ignored "-Wformat-nonliteral"
/* set_next_filename:
 * @splitmux: the splitmuxsink
 * @ctx: stream context whose current output buffer, caps and segment are
 *       offered to the format-location-full signal
 *
 * Chooses the file name for the next fragment and sets it as the
 * "location" property of the sink. The name is taken, in order, from the
 * SIGNAL_FORMAT_LOCATION_FULL handler, the SIGNAL_FORMAT_LOCATION handler,
 * and finally from splitmux->location used as a printf-style format with
 * fragment_id as its only argument. fragment_id is incremented afterwards.
 * gst_splitmux_sink_ensure_max_files() is called before anything else.
 *
 * NOTE(review): some short conditional lines are not visible here (e.g.
 * around the caps unref and the final fallback) - confirm the exact guards.
 */
2996 set_next_filename (GstSplitMuxSink * splitmux, MqStreamCtx * ctx)
2998 gchar *fname = NULL;
3002 gst_splitmux_sink_ensure_max_files (splitmux);
3004 if (ctx->cur_out_buffer == NULL) {
3005 GST_WARNING_OBJECT (splitmux, "Starting next file without buffer");
/* Bundle the first buffer of the new file with its caps and segment so
 * signal handlers can inspect it */
3008 caps = gst_pad_get_current_caps (ctx->srcpad);
3009 sample = gst_sample_new (ctx->cur_out_buffer, caps, &ctx->out_segment, NULL);
3010 g_signal_emit (splitmux, signals[SIGNAL_FORMAT_LOCATION_FULL], 0,
3011 splitmux->fragment_id, sample, &fname);
3012 gst_sample_unref (sample);
3014 gst_caps_unref (caps);
3016 if (fname == NULL) {
3017 /* Fallback to the old signal if the new one returned nothing */
3018 g_signal_emit (splitmux, signals[SIGNAL_FORMAT_LOCATION], 0,
3019 splitmux->fragment_id, &fname);
/* Last resort: format the configured location pattern ourselves */
3023 fname = splitmux->location ?
3024 g_strdup_printf (splitmux->location, splitmux->fragment_id) : NULL;
3027 GST_INFO_OBJECT (splitmux, "Setting file to %s", fname);
3028 g_object_set (splitmux->sink, "location", fname, NULL);
3031 splitmux->fragment_id++;
3036 do_async_start (GstSplitMuxSink * splitmux)
3038 GstMessage *message;
3040 if (!splitmux->need_async_start) {
3041 GST_INFO_OBJECT (splitmux, "no async_start needed");
3045 splitmux->async_pending = TRUE;
3047 GST_INFO_OBJECT (splitmux, "Sending async_start message");
3048 message = gst_message_new_async_start (GST_OBJECT_CAST (splitmux));
3049 GST_BIN_CLASS (parent_class)->handle_message (GST_BIN_CAST
3050 (splitmux), message);
3054 do_async_done (GstSplitMuxSink * splitmux)
3056 GstMessage *message;
3058 if (splitmux->async_pending) {
3059 GST_INFO_OBJECT (splitmux, "Sending async_done message");
3061 gst_message_new_async_done (GST_OBJECT_CAST (splitmux),
3062 GST_CLOCK_TIME_NONE);
3063 GST_BIN_CLASS (parent_class)->handle_message (GST_BIN_CAST
3064 (splitmux), message);
3066 splitmux->async_pending = FALSE;
3069 splitmux->need_async_start = FALSE;
3072 static GstStateChangeReturn
3073 gst_splitmux_sink_change_state (GstElement * element, GstStateChange transition)
3075 GstStateChangeReturn ret;
3076 GstSplitMuxSink *splitmux = (GstSplitMuxSink *) element;
3078 switch (transition) {
3079 case GST_STATE_CHANGE_NULL_TO_READY:{
3080 GST_SPLITMUX_LOCK (splitmux);
3081 if (!create_muxer (splitmux) || !create_sink (splitmux)) {
3082 ret = GST_STATE_CHANGE_FAILURE;
3083 GST_SPLITMUX_UNLOCK (splitmux);
3086 g_signal_emit (splitmux, signals[SIGNAL_MUXER_ADDED], 0, splitmux->muxer);
3087 g_signal_emit (splitmux, signals[SIGNAL_SINK_ADDED], 0, splitmux->sink);
3088 GST_SPLITMUX_UNLOCK (splitmux);
3089 splitmux->fragment_id = 0;
3092 case GST_STATE_CHANGE_READY_TO_PAUSED:{
3093 GST_SPLITMUX_LOCK (splitmux);
3094 /* Start by collecting one input on each pad */
3095 splitmux->input_state = SPLITMUX_INPUT_STATE_COLLECTING_GOP_START;
3096 splitmux->output_state = SPLITMUX_OUTPUT_STATE_START_NEXT_FILE;
3097 splitmux->max_in_running_time = GST_CLOCK_STIME_NONE;
3098 splitmux->gop_start_time = splitmux->fragment_start_time =
3099 GST_CLOCK_STIME_NONE;
3100 splitmux->muxed_out_bytes = 0;
3101 splitmux->ready_for_output = FALSE;
3102 GST_SPLITMUX_UNLOCK (splitmux);
3105 case GST_STATE_CHANGE_PAUSED_TO_READY:
3106 g_atomic_int_set (&(splitmux->split_requested), FALSE);
3107 g_atomic_int_set (&(splitmux->do_split_next_gop), FALSE);
3108 case GST_STATE_CHANGE_READY_TO_NULL:
3109 GST_SPLITMUX_LOCK (splitmux);
3110 gst_queue_array_clear (splitmux->times_to_split);
3111 splitmux->output_state = SPLITMUX_OUTPUT_STATE_STOPPED;
3112 splitmux->input_state = SPLITMUX_INPUT_STATE_STOPPED;
3113 /* Wake up any blocked threads */
3114 GST_LOG_OBJECT (splitmux,
3115 "State change -> NULL or READY. Waking threads");
3116 GST_SPLITMUX_BROADCAST_INPUT (splitmux);
3117 GST_SPLITMUX_BROADCAST_OUTPUT (splitmux);
3118 GST_SPLITMUX_UNLOCK (splitmux);
3124 ret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
3125 if (ret == GST_STATE_CHANGE_FAILURE)
3128 switch (transition) {
3129 case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
3130 splitmux->need_async_start = TRUE;
3132 case GST_STATE_CHANGE_READY_TO_PAUSED:{
3133 /* Change state async, because our child sink might not
3134 * be ready to do that for us yet if it's state is still locked */
3136 splitmux->need_async_start = TRUE;
3137 /* we want to go async to PAUSED until we managed to configure and add the
3139 GST_SPLITMUX_LOCK (splitmux);
3140 do_async_start (splitmux);
3141 GST_SPLITMUX_UNLOCK (splitmux);
3142 ret = GST_STATE_CHANGE_ASYNC;
3145 case GST_STATE_CHANGE_READY_TO_NULL:
3146 GST_SPLITMUX_LOCK (splitmux);
3147 splitmux->fragment_id = 0;
3148 /* Reset internal elements only if no pad contexts are using them */
3149 if (splitmux->contexts == NULL)
3150 gst_splitmux_reset (splitmux);
3151 do_async_done (splitmux);
3152 GST_SPLITMUX_UNLOCK (splitmux);
3159 if (transition == GST_STATE_CHANGE_NULL_TO_READY &&
3160 ret == GST_STATE_CHANGE_FAILURE) {
3161 /* Cleanup elements on failed transition out of NULL */
3162 gst_splitmux_reset (splitmux);
3163 GST_SPLITMUX_LOCK (splitmux);
3164 do_async_done (splitmux);
3165 GST_SPLITMUX_UNLOCK (splitmux);
3171 register_splitmuxsink (GstPlugin * plugin)
3173 GST_DEBUG_CATEGORY_INIT (splitmux_debug, "splitmuxsink", 0,
3174 "Split File Muxing Sink");
3176 return gst_element_register (plugin, "splitmuxsink", GST_RANK_NONE,
3177 GST_TYPE_SPLITMUX_SINK);
3181 gst_splitmux_sink_ensure_max_files (GstSplitMuxSink * splitmux)
3183 if (splitmux->max_files && splitmux->fragment_id >= splitmux->max_files) {
3184 splitmux->fragment_id = 0;
3189 split_now (GstSplitMuxSink * splitmux)
3191 g_atomic_int_set (&(splitmux->do_split_next_gop), TRUE);
3195 split_after (GstSplitMuxSink * splitmux)
3197 g_atomic_int_set (&(splitmux->split_requested), TRUE);
3201 split_at_running_time (GstSplitMuxSink * splitmux, GstClockTime split_time)
3203 gboolean send_keyframe_requests;
3205 GST_SPLITMUX_LOCK (splitmux);
3206 gst_queue_array_push_tail_struct (splitmux->times_to_split, &split_time);
3207 send_keyframe_requests = splitmux->send_keyframe_requests;
3208 GST_SPLITMUX_UNLOCK (splitmux);
3210 if (send_keyframe_requests) {
3212 gst_video_event_new_upstream_force_key_unit (split_time, TRUE, 0);
3213 GST_INFO_OBJECT (splitmux, "Requesting next keyframe at %" GST_TIME_FORMAT,
3214 GST_TIME_ARGS (split_time));
3215 if (!gst_pad_push_event (splitmux->reference_ctx->sinkpad, ev)) {
3216 GST_WARNING_OBJECT (splitmux,
3217 "Could not request keyframe at %" GST_TIME_FORMAT,
3218 GST_TIME_ARGS (split_time));