1 /* GStreamer Muxer bin that splits output stream by size/time
2 * Copyright (C) <2014> Jan Schmidt <jan@centricular.com>
4 * This library is free software; you can redistribute it and/or
5 * modify it under the terms of the GNU Library General Public
6 * License as published by the Free Software Foundation; either
7 * version 2 of the License, or (at your option) any later version.
9 * This library is distributed in the hope that it will be useful,
10 * but WITHOUT ANY WARRANTY; without even the implied warranty of
11 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
12 * Library General Public License for more details.
14 * You should have received a copy of the GNU Library General Public
15 * License along with this library; if not, write to the
16 * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
17 * Boston, MA 02110-1301, USA.
21 * SECTION:element-splitmuxsink
22 * @short_description: Muxer wrapper for splitting output stream by size or time
24 * This element wraps a muxer and a sink, and starts a new file when the mux
25 * contents are about to cross a threshold of maximum size or maximum time,
26 * splitting at video keyframe boundaries. Exactly one input video stream
27 * can be muxed, with as many accompanying audio and subtitle streams as
30 * By default, it uses mp4mux and filesink, but they can be changed via
31 * the 'muxer' and 'sink' properties.
33 * The minimum file size is 1 GOP, however - so limits may be overrun if the
34 * distance between any 2 keyframes is larger than the limits.
36 * If a video stream is available, the splitting process is driven by the video
37 * stream contents, and the video stream must contain closed GOPs for the output
38 * file parts to be played individually correctly. In the absence of a video
39 * stream, the first available stream is used as reference for synchronization.
42 * <title>Example pipelines</title>
44 * gst-launch-1.0 -e v4l2src num-buffers=500 ! video/x-raw,width=320,height=240 ! videoconvert ! queue ! timeoverlay ! x264enc key-int-max=10 ! h264parse ! splitmuxsink location=video%02d.mov max-size-time=10000000000 max-size-bytes=1000000
46 * Records a video stream captured from a v4l2 device and muxes it into
47 * ISO mp4 files, splitting as needed to limit size/duration to 10 seconds
48 * and 1MB maximum size.
57 #include <glib/gstdio.h>
58 #include <gst/video/video.h>
59 #include "gstsplitmuxsink.h"
/* Debug category for this file; GST_CAT_DEFAULT makes it the default
 * category for the GST_* logging macros used below. */
61 GST_DEBUG_CATEGORY_STATIC (splitmux_debug);
62 #define GST_CAT_DEFAULT splitmux_debug
/* Wrappers around the element's mutex ("lock") and its two condition
 * variables. Both WAIT macros wait on that same mutex, so the caller has
 * to hold it when waiting. */
64 #define GST_SPLITMUX_LOCK(s) g_mutex_lock(&(s)->lock)
65 #define GST_SPLITMUX_UNLOCK(s) g_mutex_unlock(&(s)->lock)
66 #define GST_SPLITMUX_WAIT_INPUT(s) g_cond_wait (&(s)->input_cond, &(s)->lock)
67 #define GST_SPLITMUX_BROADCAST_INPUT(s) g_cond_broadcast (&(s)->input_cond)
69 #define GST_SPLITMUX_WAIT_OUTPUT(s) g_cond_wait (&(s)->output_cond, &(s)->lock)
70 #define GST_SPLITMUX_BROADCAST_OUTPUT(s) g_cond_broadcast (&(s)->output_cond)
/* Property identifiers (only part of the enumeration is visible here). */
78 PROP_MAX_SIZE_TIMECODE,
79 PROP_SEND_KEYFRAME_REQUESTS,
/* Default values for the element's properties. For max-size-time and
 * max-size-bytes the property descriptions state that 0 disables the
 * limit. */
86 #define DEFAULT_MAX_SIZE_TIME 0
87 #define DEFAULT_MAX_SIZE_BYTES 0
88 #define DEFAULT_MAX_FILES 0
89 #define DEFAULT_MUXER_OVERHEAD 0.02
90 #define DEFAULT_SEND_KEYFRAME_REQUESTS FALSE
91 #define DEFAULT_MUXER "mp4mux"
92 #define DEFAULT_SINK "filesink"
/* Signal identifiers, used as indices into signals[]. */
96 SIGNAL_FORMAT_LOCATION,
97 SIGNAL_FORMAT_LOCATION_FULL,
101 static guint signals[SIGNAL_LAST];
/* Sink pad templates: one "video" pad plus numbered "audio_%u" and
 * "subtitle_%u" pads, all accepting any caps. */
103 static GstStaticPadTemplate video_sink_template =
104 GST_STATIC_PAD_TEMPLATE ("video",
107 GST_STATIC_CAPS_ANY);
108 static GstStaticPadTemplate audio_sink_template =
109 GST_STATIC_PAD_TEMPLATE ("audio_%u",
112 GST_STATIC_CAPS_ANY);
113 static GstStaticPadTemplate subtitle_sink_template =
114 GST_STATIC_PAD_TEMPLATE ("subtitle_%u",
117 GST_STATIC_CAPS_ANY);
/* Quark for the string "pad-context"; where it is used is not visible in
 * this part of the file. */
119 static GQuark PAD_CONTEXT;
124 PAD_CONTEXT = g_quark_from_static_string ("pad-context");
/* Type definition: GstSplitMuxSink derives from GstBin. */
127 #define gst_splitmux_sink_parent_class parent_class
128 G_DEFINE_TYPE_EXTENDED (GstSplitMuxSink, gst_splitmux_sink, GST_TYPE_BIN, 0,
/* Forward declarations of file-local helpers and virtual method
 * implementations. */
131 static gboolean create_muxer (GstSplitMuxSink * splitmux);
132 static gboolean create_sink (GstSplitMuxSink * splitmux);
133 static void gst_splitmux_sink_set_property (GObject * object, guint prop_id,
134 const GValue * value, GParamSpec * pspec);
135 static void gst_splitmux_sink_get_property (GObject * object, guint prop_id,
136 GValue * value, GParamSpec * pspec);
137 static void gst_splitmux_sink_dispose (GObject * object);
138 static void gst_splitmux_sink_finalize (GObject * object);
140 static GstPad *gst_splitmux_sink_request_new_pad (GstElement * element,
141 GstPadTemplate * templ, const gchar * name, const GstCaps * caps);
142 static void gst_splitmux_sink_release_pad (GstElement * element, GstPad * pad);
144 static GstStateChangeReturn gst_splitmux_sink_change_state (GstElement *
145 element, GstStateChange transition);
147 static void bus_handler (GstBin * bin, GstMessage * msg);
148 static void set_next_filename (GstSplitMuxSink * splitmux, MqStreamCtx * ctx);
149 static void start_next_fragment (GstSplitMuxSink * splitmux, MqStreamCtx * ctx);
150 static void mq_stream_ctx_unref (MqStreamCtx * ctx);
151 static void grow_blocked_queues (GstSplitMuxSink * splitmux);
153 static void gst_splitmux_sink_ensure_max_files (GstSplitMuxSink * splitmux);
154 static GstElement *create_element (GstSplitMuxSink * splitmux,
155 const gchar * factory, const gchar * name, gboolean locked);
157 static void do_async_done (GstSplitMuxSink * splitmux);
160 mq_stream_buf_new (void)
162 return g_slice_new0 (MqStreamBuf);
166 mq_stream_buf_free (MqStreamBuf * data)
168 g_slice_free (MqStreamBuf, data);
171 static SplitMuxOutputCommand *
172 out_cmd_buf_new (void)
174 return g_slice_new0 (SplitMuxOutputCommand);
178 out_cmd_buf_free (SplitMuxOutputCommand * data)
180 g_slice_free (SplitMuxOutputCommand, data);
184 gst_splitmux_sink_class_init (GstSplitMuxSinkClass * klass)
186 GObjectClass *gobject_class = (GObjectClass *) klass;
187 GstElementClass *gstelement_class = (GstElementClass *) klass;
188 GstBinClass *gstbin_class = (GstBinClass *) klass;
190 gobject_class->set_property = gst_splitmux_sink_set_property;
191 gobject_class->get_property = gst_splitmux_sink_get_property;
192 gobject_class->dispose = gst_splitmux_sink_dispose;
193 gobject_class->finalize = gst_splitmux_sink_finalize;
195 gst_element_class_set_static_metadata (gstelement_class,
196 "Split Muxing Bin", "Generic/Bin/Muxer",
197 "Convenience bin that muxes incoming streams into multiple time/size limited files",
198 "Jan Schmidt <jan@centricular.com>");
200 gst_element_class_add_static_pad_template (gstelement_class,
201 &video_sink_template);
202 gst_element_class_add_static_pad_template (gstelement_class,
203 &audio_sink_template);
204 gst_element_class_add_static_pad_template (gstelement_class,
205 &subtitle_sink_template);
207 gstelement_class->change_state =
208 GST_DEBUG_FUNCPTR (gst_splitmux_sink_change_state);
209 gstelement_class->request_new_pad =
210 GST_DEBUG_FUNCPTR (gst_splitmux_sink_request_new_pad);
211 gstelement_class->release_pad =
212 GST_DEBUG_FUNCPTR (gst_splitmux_sink_release_pad);
214 gstbin_class->handle_message = bus_handler;
216 g_object_class_install_property (gobject_class, PROP_LOCATION,
217 g_param_spec_string ("location", "File Output Pattern",
218 "Format string pattern for the location of the files to write (e.g. video%05d.mp4)",
219 NULL, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
220 g_object_class_install_property (gobject_class, PROP_MUXER_OVERHEAD,
221 g_param_spec_double ("mux-overhead", "Muxing Overhead",
222 "Extra size overhead of muxing (0.02 = 2%)", 0.0, 1.0,
223 DEFAULT_MUXER_OVERHEAD,
224 G_PARAM_READWRITE | GST_PARAM_CONTROLLABLE | G_PARAM_STATIC_STRINGS));
226 g_object_class_install_property (gobject_class, PROP_MAX_SIZE_TIME,
227 g_param_spec_uint64 ("max-size-time", "Max. size (ns)",
228 "Max. amount of time per file (in ns, 0=disable)", 0, G_MAXUINT64,
229 DEFAULT_MAX_SIZE_TIME, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
230 g_object_class_install_property (gobject_class, PROP_MAX_SIZE_BYTES,
231 g_param_spec_uint64 ("max-size-bytes", "Max. size bytes",
232 "Max. amount of data per file (in bytes, 0=disable)", 0, G_MAXUINT64,
233 DEFAULT_MAX_SIZE_BYTES, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
234 g_object_class_install_property (gobject_class, PROP_MAX_SIZE_TIMECODE,
235 g_param_spec_string ("max-size-timecode", "Maximum timecode difference",
236 "Maximum difference in timecode between first and last frame. "
237 "Separator is assumed to be \":\" everywhere (e.g. 01:00:00:00). "
238 "Will only be effective if a timecode track is present.",
239 NULL, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
240 g_object_class_install_property (gobject_class, PROP_SEND_KEYFRAME_REQUESTS,
241 g_param_spec_boolean ("send-keyframe-requests",
242 "Request keyframes at max-size-time",
243 "Request a keyframe every max-size-time ns to try splitting at that point. "
244 "Needs max-size-bytes to be 0 in order to be effective.",
245 DEFAULT_SEND_KEYFRAME_REQUESTS,
246 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
247 g_object_class_install_property (gobject_class, PROP_MAX_FILES,
248 g_param_spec_uint ("max-files", "Max files",
249 "Maximum number of files to keep on disk. Once the maximum is reached,"
250 "old files start to be deleted to make room for new ones.", 0,
251 G_MAXUINT, DEFAULT_MAX_FILES,
252 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
255 g_object_class_install_property (gobject_class, PROP_MUXER,
256 g_param_spec_object ("muxer", "Muxer",
257 "The muxer element to use (NULL = default mp4mux)",
258 GST_TYPE_ELEMENT, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
259 g_object_class_install_property (gobject_class, PROP_SINK,
260 g_param_spec_object ("sink", "Sink",
261 "The sink element (or element chain) to use (NULL = default filesink)",
262 GST_TYPE_ELEMENT, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
265 * GstSplitMuxSink::format-location:
266 * @splitmux: the #GstSplitMuxSink
267 * @fragment_id: the sequence number of the file to be created
269 * Returns: the location to be used for the next output file
271 signals[SIGNAL_FORMAT_LOCATION] =
272 g_signal_new ("format-location", G_TYPE_FROM_CLASS (klass),
273 G_SIGNAL_RUN_LAST, 0, NULL, NULL, NULL, G_TYPE_STRING, 1, G_TYPE_UINT);
276 * GstSplitMuxSink::format-location-full:
277 * @splitmux: the #GstSplitMuxSink
278 * @fragment_id: the sequence number of the file to be created
279 * @first_sample: A #GstSample containing the first buffer
280 * from the reference stream in the new file
282 * Returns: the location to be used for the next output file
284 signals[SIGNAL_FORMAT_LOCATION_FULL] =
285 g_signal_new ("format-location-full", G_TYPE_FROM_CLASS (klass),
286 G_SIGNAL_RUN_LAST, 0, NULL, NULL, NULL, G_TYPE_STRING, 2, G_TYPE_UINT,
291 gst_splitmux_sink_init (GstSplitMuxSink * splitmux)
293 g_mutex_init (&splitmux->lock);
294 g_cond_init (&splitmux->input_cond);
295 g_cond_init (&splitmux->output_cond);
296 g_queue_init (&splitmux->out_cmd_q);
298 splitmux->mux_overhead = DEFAULT_MUXER_OVERHEAD;
299 splitmux->threshold_time = DEFAULT_MAX_SIZE_TIME;
300 splitmux->threshold_bytes = DEFAULT_MAX_SIZE_BYTES;
301 splitmux->max_files = DEFAULT_MAX_FILES;
302 splitmux->send_keyframe_requests = DEFAULT_SEND_KEYFRAME_REQUESTS;
303 splitmux->next_max_tc_time = GST_CLOCK_TIME_NONE;
305 splitmux->threshold_timecode_str = NULL;
307 GST_OBJECT_FLAG_SET (splitmux, GST_ELEMENT_FLAG_SINK);
311 gst_splitmux_reset (GstSplitMuxSink * splitmux)
313 if (splitmux->muxer) {
314 gst_element_set_locked_state (splitmux->muxer, TRUE);
315 gst_element_set_state (splitmux->muxer, GST_STATE_NULL);
316 gst_bin_remove (GST_BIN (splitmux), splitmux->muxer);
318 if (splitmux->active_sink) {
319 gst_element_set_locked_state (splitmux->active_sink, TRUE);
320 gst_element_set_state (splitmux->active_sink, GST_STATE_NULL);
321 gst_bin_remove (GST_BIN (splitmux), splitmux->active_sink);
324 splitmux->sink = splitmux->active_sink = splitmux->muxer = NULL;
328 gst_splitmux_sink_dispose (GObject * object)
330 GstSplitMuxSink *splitmux = GST_SPLITMUX_SINK (object);
332 /* Calling parent dispose invalidates all child pointers */
333 splitmux->sink = splitmux->active_sink = splitmux->muxer = NULL;
335 G_OBJECT_CLASS (parent_class)->dispose (object);
339 gst_splitmux_sink_finalize (GObject * object)
341 GstSplitMuxSink *splitmux = GST_SPLITMUX_SINK (object);
342 g_cond_clear (&splitmux->input_cond);
343 g_cond_clear (&splitmux->output_cond);
344 g_mutex_clear (&splitmux->lock);
345 g_queue_foreach (&splitmux->out_cmd_q, (GFunc) out_cmd_buf_free, NULL);
346 g_queue_clear (&splitmux->out_cmd_q);
348 if (splitmux->provided_sink)
349 gst_object_unref (splitmux->provided_sink);
350 if (splitmux->provided_muxer)
351 gst_object_unref (splitmux->provided_muxer);
353 if (splitmux->threshold_timecode_str)
354 g_free (splitmux->threshold_timecode_str);
356 g_free (splitmux->location);
358 /* Make sure to free any un-released contexts */
359 g_list_foreach (splitmux->contexts, (GFunc) mq_stream_ctx_unref, NULL);
360 g_list_free (splitmux->contexts);
362 G_OBJECT_CLASS (parent_class)->finalize (object);
366 gst_splitmux_sink_set_property (GObject * object, guint prop_id,
367 const GValue * value, GParamSpec * pspec)
369 GstSplitMuxSink *splitmux = GST_SPLITMUX_SINK (object);
373 GST_OBJECT_LOCK (splitmux);
374 g_free (splitmux->location);
375 splitmux->location = g_value_dup_string (value);
376 GST_OBJECT_UNLOCK (splitmux);
379 case PROP_MAX_SIZE_BYTES:
380 GST_OBJECT_LOCK (splitmux);
381 splitmux->threshold_bytes = g_value_get_uint64 (value);
382 GST_OBJECT_UNLOCK (splitmux);
384 case PROP_MAX_SIZE_TIME:
385 GST_OBJECT_LOCK (splitmux);
386 splitmux->threshold_time = g_value_get_uint64 (value);
387 GST_OBJECT_UNLOCK (splitmux);
389 case PROP_MAX_SIZE_TIMECODE:
390 GST_OBJECT_LOCK (splitmux);
391 splitmux->threshold_timecode_str = g_value_dup_string (value);
392 GST_OBJECT_UNLOCK (splitmux);
394 case PROP_SEND_KEYFRAME_REQUESTS:
395 GST_OBJECT_LOCK (splitmux);
396 splitmux->send_keyframe_requests = g_value_get_boolean (value);
397 GST_OBJECT_UNLOCK (splitmux);
400 GST_OBJECT_LOCK (splitmux);
401 splitmux->max_files = g_value_get_uint (value);
402 GST_OBJECT_UNLOCK (splitmux);
404 case PROP_MUXER_OVERHEAD:
405 GST_OBJECT_LOCK (splitmux);
406 splitmux->mux_overhead = g_value_get_double (value);
407 GST_OBJECT_UNLOCK (splitmux);
410 GST_OBJECT_LOCK (splitmux);
411 if (splitmux->provided_sink)
412 gst_object_unref (splitmux->provided_sink);
413 splitmux->provided_sink = g_value_get_object (value);
414 gst_object_ref_sink (splitmux->provided_sink);
415 GST_OBJECT_UNLOCK (splitmux);
418 GST_OBJECT_LOCK (splitmux);
419 if (splitmux->provided_muxer)
420 gst_object_unref (splitmux->provided_muxer);
421 splitmux->provided_muxer = g_value_get_object (value);
422 gst_object_ref_sink (splitmux->provided_muxer);
423 GST_OBJECT_UNLOCK (splitmux);
426 G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
/* GObject get_property hook.
 *
 * Copies the requested field into @value while holding the object lock;
 * an unknown @prop_id is reported with
 * G_OBJECT_WARN_INVALID_PROPERTY_ID. Some case labels are not visible in
 * this excerpt; each visible group is identified by the field it reads. */
432 gst_splitmux_sink_get_property (GObject * object, guint prop_id,
433 GValue * value, GParamSpec * pspec)
435 GstSplitMuxSink *splitmux = GST_SPLITMUX_SINK (object);
/* Output file name pattern */
439 GST_OBJECT_LOCK (splitmux);
440 g_value_set_string (value, splitmux->location);
441 GST_OBJECT_UNLOCK (splitmux);
443 case PROP_MAX_SIZE_BYTES:
444 GST_OBJECT_LOCK (splitmux);
445 g_value_set_uint64 (value, splitmux->threshold_bytes);
446 GST_OBJECT_UNLOCK (splitmux);
448 case PROP_MAX_SIZE_TIME:
449 GST_OBJECT_LOCK (splitmux);
450 g_value_set_uint64 (value, splitmux->threshold_time);
451 GST_OBJECT_UNLOCK (splitmux);
453 case PROP_MAX_SIZE_TIMECODE:
454 GST_OBJECT_LOCK (splitmux);
455 g_value_set_string (value, splitmux->threshold_timecode_str);
456 GST_OBJECT_UNLOCK (splitmux);
458 case PROP_SEND_KEYFRAME_REQUESTS:
459 GST_OBJECT_LOCK (splitmux);
460 g_value_set_boolean (value, splitmux->send_keyframe_requests);
461 GST_OBJECT_UNLOCK (splitmux);
/* Maximum number of files to keep */
464 GST_OBJECT_LOCK (splitmux);
465 g_value_set_uint (value, splitmux->max_files);
466 GST_OBJECT_UNLOCK (splitmux);
468 case PROP_MUXER_OVERHEAD:
469 GST_OBJECT_LOCK (splitmux);
470 g_value_set_double (value, splitmux->mux_overhead);
471 GST_OBJECT_UNLOCK (splitmux);
/* User-provided sink element (may be NULL) */
474 GST_OBJECT_LOCK (splitmux);
475 g_value_set_object (value, splitmux->provided_sink);
476 GST_OBJECT_UNLOCK (splitmux);
/* User-provided muxer element (may be NULL) */
479 GST_OBJECT_LOCK (splitmux);
480 g_value_set_object (value, splitmux->provided_muxer);
481 GST_OBJECT_UNLOCK (splitmux);
484 G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
489 /* Convenience function */
/* Converts the TIME-format position @val to a running time within
 * @segment using gst_segment_to_running_time_full(). The result starts out
 * as GST_CLOCK_STIME_NONE and the conversion is only attempted for a valid
 * @val. NOTE(review): the lines that turn the conversion result into the
 * signed return value are not visible in this excerpt. */
490 static inline GstClockTimeDiff
491 my_segment_to_running_time (GstSegment * segment, GstClockTime val)
493 GstClockTimeDiff res = GST_CLOCK_STIME_NONE;
495 if (GST_CLOCK_TIME_IS_VALID (val)) {
497 gst_segment_to_running_time_full (segment, GST_FORMAT_TIME, val, &val);
/* Creates a per-stream context for @splitmux.
 *
 * The context is zero-allocated, starts with a reference count of 1, has
 * both segments initialised to GST_FORMAT_UNDEFINED, both running times set
 * to GST_CLOCK_STIME_NONE and an empty queued_bufs queue. Only a plain
 * pointer to @splitmux is stored; no reference is taken here. */
507 mq_stream_ctx_new (GstSplitMuxSink * splitmux)
511 ctx = g_new0 (MqStreamCtx, 1);
512 g_atomic_int_set (&ctx->refcount, 1);
513 ctx->splitmux = splitmux;
514 gst_segment_init (&ctx->in_segment, GST_FORMAT_UNDEFINED);
515 gst_segment_init (&ctx->out_segment, GST_FORMAT_UNDEFINED);
516 ctx->in_running_time = ctx->out_running_time = GST_CLOCK_STIME_NONE;
517 g_queue_init (&ctx->queued_bufs);
/* Releases everything a stream context holds; called from
 * mq_stream_ctx_unref() when the last reference is dropped.
 *
 * NOTE(review): not every line of this function is visible in this
 * excerpt, so the queue teardown below may be conditional. */
522 mq_stream_ctx_free (MqStreamCtx * ctx)
/* Queue element: disconnect the overrun handler, lock its state, bring it
 * to NULL, remove it from the bin and drop the context's reference */
525 g_signal_handler_disconnect (ctx->q, ctx->q_overrun_id);
526 gst_element_set_locked_state (ctx->q, TRUE);
527 gst_element_set_state (ctx->q, GST_STATE_NULL);
528 gst_bin_remove (GST_BIN (ctx->splitmux), ctx->q);
529 gst_object_unref (ctx->q);
/* Drop the held keyframe buffer (if any) and both pad references */
531 gst_buffer_replace (&ctx->prev_in_keyframe, NULL);
532 gst_object_unref (ctx->sinkpad);
533 gst_object_unref (ctx->srcpad);
/* Free the bookkeeping records still queued for this stream */
534 g_queue_foreach (&ctx->queued_bufs, (GFunc) mq_stream_buf_free, NULL);
535 g_queue_clear (&ctx->queued_bufs);
540 mq_stream_ctx_unref (MqStreamCtx * ctx)
542 if (g_atomic_int_dec_and_test (&ctx->refcount))
543 mq_stream_ctx_free (ctx);
547 mq_stream_ctx_ref (MqStreamCtx * ctx)
549 g_atomic_int_inc (&ctx->refcount);
553 _pad_block_destroy_sink_notify (MqStreamCtx * ctx)
555 ctx->sink_pad_block_id = 0;
556 mq_stream_ctx_unref (ctx);
560 _pad_block_destroy_src_notify (MqStreamCtx * ctx)
562 ctx->src_pad_block_id = 0;
563 mq_stream_ctx_unref (ctx);
/* Posts an element message announcing that a fragment was opened or closed.
 *
 * The message structure is named "splitmuxsink-fragment-opened" or
 * "splitmuxsink-fragment-closed" depending on @opened, and carries the
 * sink's current "location" plus the reference context's output running
 * time.
 * NOTE(review): the end of this function is not visible in this excerpt;
 * confirm that the string returned by g_object_get() is freed there. */
567 send_fragment_opened_closed_msg (GstSplitMuxSink * splitmux, gboolean opened)
569 gchar *location = NULL;
571 const gchar *msg_name = opened ?
572 "splitmuxsink-fragment-opened" : "splitmuxsink-fragment-closed";
/* Ask the sink which file it is currently writing */
574 g_object_get (splitmux->sink, "location", &location, NULL);
576 msg = gst_message_new_element (GST_OBJECT (splitmux),
577 gst_structure_new (msg_name,
578 "location", G_TYPE_STRING, location,
579 "running-time", GST_TYPE_CLOCK_TIME,
580 splitmux->reference_ctx->out_running_time, NULL));
581 gst_element_post_message (GST_ELEMENT_CAST (splitmux), msg);
586 /* Called with lock held, drops the lock to send EOS to the peer pad. */
/* Sends an EOS event to the peer of @ctx's source pad.
 *
 * The splitmux lock is released around gst_pad_send_event() and re-taken
 * afterwards, so state guarded by that lock may have changed by the time
 * this function returns. Some statements of this function are not visible
 * in this excerpt. */
590 send_eos (GstSplitMuxSink * splitmux, MqStreamCtx * ctx)
595 eos = gst_event_new_eos ();
596 pad = gst_pad_get_peer (ctx->srcpad);
600 GST_INFO_OBJECT (splitmux, "Sending EOS on %" GST_PTR_FORMAT, pad);
/* Send the event without holding the splitmux lock */
601 GST_SPLITMUX_UNLOCK (splitmux);
602 gst_pad_send_event (pad, eos);
603 GST_SPLITMUX_LOCK (splitmux);
/* Release the reference returned by gst_pad_get_peer() */
605 gst_object_unref (pad);
608 /* Called with splitmux lock held to check if this output
609 * context needs to sleep to wait for the release of the
610 * next GOP, or to send EOS to close out the current file. */
/* Output-side gate for one stream context.
 *
 * Compares @ctx's output running time against
 * splitmux->max_out_running_time and otherwise acts on
 * splitmux->output_state: handing over to AWAITING_COMMAND after a GOP,
 * sending EOS while a file is ending, starting the next fragment on the
 * reference context, consuming commands from out_cmd_q, or waiting on the
 * output condition. Several branch and loop lines are not visible in this
 * excerpt, so the exact control flow between the visible statements cannot
 * be documented from here. */
613 complete_or_wait_on_out (GstSplitMuxSink * splitmux, MqStreamCtx * ctx)
615 if (ctx->caps_change)
619 /* When first starting up, the reference stream has to output
620 * the first buffer to prepare the muxer and sink */
621 gboolean can_output = (ctx->is_reference || splitmux->ready_for_output);
624 || splitmux->output_state == SPLITMUX_OUTPUT_STATE_STOPPED)
627 GST_LOG_OBJECT (ctx->srcpad,
628 "Checking running time %" GST_STIME_FORMAT " against max %"
629 GST_STIME_FORMAT, GST_STIME_ARGS (ctx->out_running_time),
630 GST_STIME_ARGS (splitmux->max_out_running_time));
/* True when no limit is set or this context is still below the limit */
633 if (splitmux->max_out_running_time == GST_CLOCK_STIME_NONE ||
634 ctx->out_running_time < splitmux->max_out_running_time) {
638 switch (splitmux->output_state) {
639 case SPLITMUX_OUTPUT_STATE_OUTPUT_GOP:
640 /* We only get here if we've finished outputting a GOP and need to know
642 splitmux->output_state = SPLITMUX_OUTPUT_STATE_AWAITING_COMMAND;
643 GST_SPLITMUX_BROADCAST_OUTPUT (splitmux);
646 case SPLITMUX_OUTPUT_STATE_ENDING_FILE:
647 /* We've reached the max out running_time to get here, so end this file now */
648 if (ctx->out_eos == FALSE) {
649 send_eos (splitmux, ctx);
653 case SPLITMUX_OUTPUT_STATE_START_NEXT_FILE:
654 if (ctx->is_reference) {
655 /* Special handling on the reference ctx to start new fragments
656 * and collect commands from the command queue */
657 /* drops the splitmux lock briefly: */
658 start_next_fragment (splitmux, ctx);
663 case SPLITMUX_OUTPUT_STATE_AWAITING_COMMAND:{
/* Commands are taken from the tail of out_cmd_q */
665 SplitMuxOutputCommand *cmd =
666 g_queue_pop_tail (&splitmux->out_cmd_q);
668 /* If we pop the last command, we need to make our queues bigger */
669 if (g_queue_get_length (&splitmux->out_cmd_q) == 0)
670 grow_blocked_queues (splitmux);
672 if (cmd->start_new_fragment) {
673 GST_DEBUG_OBJECT (splitmux, "Got cmd to start new fragment");
674 splitmux->output_state = SPLITMUX_OUTPUT_STATE_ENDING_FILE;
676 GST_DEBUG_OBJECT (splitmux,
677 "Got new output cmd for time %" GST_STIME_FORMAT,
678 GST_STIME_ARGS (cmd->max_output_ts));
680 /* Extend the output range immediately */
681 splitmux->max_out_running_time = cmd->max_output_ts;
682 splitmux->output_state = SPLITMUX_OUTPUT_STATE_OUTPUT_GOP;
684 GST_SPLITMUX_BROADCAST_OUTPUT (splitmux);
/* The command has been consumed; release it */
686 out_cmd_buf_free (cmd);
689 GST_SPLITMUX_WAIT_OUTPUT (splitmux);
691 } while (splitmux->output_state ==
692 SPLITMUX_OUTPUT_STATE_AWAITING_COMMAND);
693 /* loop and re-check the state */
696 case SPLITMUX_OUTPUT_STATE_STOPPED:
701 GST_INFO_OBJECT (ctx->srcpad,
702 "Sleeping for running time %"
703 GST_STIME_FORMAT " (max %" GST_STIME_FORMAT ") or state change.",
704 GST_STIME_ARGS (ctx->out_running_time),
705 GST_STIME_ARGS (splitmux->max_out_running_time));
/* Waiting releases the splitmux lock until the output condition is
 * signalled */
706 GST_SPLITMUX_WAIT_OUTPUT (splitmux);
707 GST_INFO_OBJECT (ctx->srcpad,
708 "Woken for new max running time %" GST_STIME_FORMAT,
709 GST_STIME_ARGS (splitmux->max_out_running_time));
715 calculate_next_max_timecode (GstSplitMuxSink * splitmux,
716 const GstVideoTimeCode * cur_tc)
718 GstVideoTimeCode *target_tc;
719 GstVideoTimeCodeInterval *tc_inter;
720 GstClockTime cur_tc_time, target_tc_time, next_max_tc_time;
722 if (cur_tc == NULL || splitmux->threshold_timecode_str == NULL)
723 return GST_CLOCK_TIME_NONE;
726 gst_video_time_code_interval_new_from_string
727 (splitmux->threshold_timecode_str);
728 target_tc = gst_video_time_code_add_interval (cur_tc, tc_inter);
729 gst_video_time_code_interval_free (tc_inter);
732 target_tc_time = gst_video_time_code_nsec_since_daily_jam (target_tc);
733 cur_tc_time = gst_video_time_code_nsec_since_daily_jam (cur_tc);
735 /* Add fragment_start_time, accounting for wraparound */
736 if (target_tc_time >= cur_tc_time) {
738 target_tc_time - cur_tc_time + splitmux->fragment_start_time;
740 GstClockTime day_in_ns = 24 * 60 * 60 * GST_SECOND;
743 day_in_ns - cur_tc_time + target_tc_time +
744 splitmux->fragment_start_time;
746 GST_INFO_OBJECT (splitmux, "Next max TC time: %" GST_TIME_FORMAT
747 " from ref TC: %" GST_TIME_FORMAT, GST_TIME_ARGS (next_max_tc_time),
748 GST_TIME_ARGS (cur_tc_time));
749 gst_video_time_code_free (target_tc);
751 return next_max_tc_time;
/* Recomputes the timecode-based limit for the new fragment and, when
 * keyframe requests apply, asks upstream for a keyframe at the next split
 * point.
 *
 * next_max_tc_time is reset and, if a timecode threshold is configured and
 * @buffer is available, recalculated from the buffer's timecode meta. The
 * request is sent as an upstream force-key-unit event on the reference
 * context's sink pad, and the result of gst_pad_push_event() is returned on
 * that path. Other return paths are not visible in this excerpt. */
755 request_next_keyframe (GstSplitMuxSink * splitmux, GstBuffer * buffer)
758 GstClockTime target_time;
759 gboolean timecode_based = FALSE;
761 splitmux->next_max_tc_time = GST_CLOCK_TIME_NONE;
762 if (splitmux->threshold_timecode_str) {
763 GstVideoTimeCodeMeta *tc_meta;
765 if (buffer != NULL) {
766 tc_meta = gst_buffer_get_video_time_code_meta (buffer);
768 splitmux->next_max_tc_time =
769 calculate_next_max_timecode (splitmux, &tc_meta->tc);
/* Only treat the split as timecode-driven if a limit could be computed */
770 timecode_based = (splitmux->next_max_tc_time != GST_CLOCK_TIME_NONE);
773 /* This can happen in the presence of GAP events that trigger
774 * a new fragment start */
775 GST_WARNING_OBJECT (splitmux,
776 "No buffer available to calculate next timecode");
/* Keyframe requests need to be enabled, need a time or timecode limit and
 * need the byte limit to be unset */
780 if (splitmux->send_keyframe_requests == FALSE
781 || (splitmux->threshold_time == 0 && !timecode_based)
782 || splitmux->threshold_bytes != 0)
785 if (timecode_based) {
786 /* We might have rounding errors: aim slightly earlier */
787 target_time = splitmux->next_max_tc_time - 5 * GST_USECOND;
789 target_time = splitmux->fragment_start_time + splitmux->threshold_time;
791 ev = gst_video_event_new_upstream_force_key_unit (target_time, TRUE, 0);
792 GST_INFO_OBJECT (splitmux, "Requesting next keyframe at %" GST_TIME_FORMAT,
793 GST_TIME_ARGS (target_time));
794 return gst_pad_push_event (splitmux->reference_ctx->sinkpad, ev);
/* Pad probe callback for the output side of a stream context.
 *
 * Buffer lists are rejected with a warning and dropped. Downstream and
 * flush events update the context (output segment, flushing flag, pending
 * GAP, caps-change handling) and may block in complete_or_wait_on_out().
 * For buffers, the matching MqStreamBuf record is popped from queued_bufs,
 * the context's output running time is updated, the probe blocks in
 * complete_or_wait_on_out() until the buffer may pass, and the buffer size
 * is added to muxed_out_bytes.
 *
 * Returns a GstPadProbeReturn: PASS, DROP or HANDLED on the visible paths.
 * Many labels, breaks and gotos of this function are not visible in this
 * excerpt. */
797 static GstPadProbeReturn
798 handle_mq_output (GstPad * pad, GstPadProbeInfo * info, MqStreamCtx * ctx)
800 GstSplitMuxSink *splitmux = ctx->splitmux;
801 MqStreamBuf *buf_info = NULL;
803 GST_LOG_OBJECT (pad, "Fired probe type 0x%x", info->type);
805 /* FIXME: Handle buffer lists, until then make it clear they won't work */
806 if (info->type & GST_PAD_PROBE_TYPE_BUFFER_LIST) {
807 g_warning ("Buffer list handling not implemented");
808 return GST_PAD_PROBE_DROP;
810 if (info->type & GST_PAD_PROBE_TYPE_EVENT_DOWNSTREAM ||
811 info->type & GST_PAD_PROBE_TYPE_EVENT_FLUSH) {
812 GstEvent *event = gst_pad_probe_info_get_event (info);
813 gboolean locked = FALSE;
815 GST_LOG_OBJECT (pad, "Event %" GST_PTR_FORMAT, event);
817 switch (GST_EVENT_TYPE (event)) {
818 case GST_EVENT_SEGMENT:
/* Remember the output segment for running-time conversion */
819 gst_event_copy_segment (event, &ctx->out_segment);
821 case GST_EVENT_FLUSH_STOP:
822 GST_SPLITMUX_LOCK (splitmux);
/* Reset the output segment and discard queued buffer records */
824 gst_segment_init (&ctx->out_segment, GST_FORMAT_UNDEFINED);
825 g_queue_foreach (&ctx->queued_bufs, (GFunc) mq_stream_buf_free, NULL);
826 g_queue_clear (&ctx->queued_bufs);
827 ctx->flushing = FALSE;
829 case GST_EVENT_FLUSH_START:
830 GST_SPLITMUX_LOCK (splitmux);
832 GST_LOG_OBJECT (pad, "Flush start");
833 ctx->flushing = TRUE;
/* Wake anything waiting on the output condition */
834 GST_SPLITMUX_BROADCAST_OUTPUT (splitmux);
837 GST_SPLITMUX_LOCK (splitmux);
839 if (splitmux->output_state == SPLITMUX_OUTPUT_STATE_STOPPED)
845 GstClockTimeDiff rtime;
847 gst_event_parse_gap (event, &gap_ts, NULL);
848 if (gap_ts == GST_CLOCK_TIME_NONE)
851 GST_SPLITMUX_LOCK (splitmux);
854 if (splitmux->output_state == SPLITMUX_OUTPUT_STATE_STOPPED)
857 /* When we get a gap event on the
858 * reference stream and we're trying to open a
859 * new file, we need to store it until we get
860 * the buffer afterwards
862 if (ctx->is_reference &&
863 (splitmux->output_state != SPLITMUX_OUTPUT_STATE_OUTPUT_GOP)) {
864 GST_DEBUG_OBJECT (pad, "Storing GAP event until buffer arrives");
865 gst_event_replace (&ctx->pending_gap, event);
866 GST_SPLITMUX_UNLOCK (splitmux);
867 return GST_PAD_PROBE_HANDLED;
870 rtime = my_segment_to_running_time (&ctx->out_segment, gap_ts);
872 GST_LOG_OBJECT (pad, "Have GAP w/ ts %" GST_STIME_FORMAT,
873 GST_STIME_ARGS (rtime));
875 if (rtime != GST_CLOCK_STIME_NONE) {
876 ctx->out_running_time = rtime;
877 complete_or_wait_on_out (splitmux, ctx);
881 case GST_EVENT_CUSTOM_DOWNSTREAM:{
882 const GstStructure *s;
883 GstClockTimeDiff ts = 0;
885 s = gst_event_get_structure (event);
886 if (!gst_structure_has_name (s, "splitmuxsink-unblock"))
889 gst_structure_get_int64 (s, "timestamp", &ts);
891 GST_SPLITMUX_LOCK (splitmux);
894 if (splitmux->output_state == SPLITMUX_OUTPUT_STATE_STOPPED)
896 ctx->out_running_time = ts;
897 if (!ctx->is_reference)
898 complete_or_wait_on_out (splitmux, ctx);
899 GST_SPLITMUX_UNLOCK (splitmux);
900 return GST_PAD_PROBE_DROP;
902 case GST_EVENT_CAPS:{
905 if (!ctx->is_reference)
908 peer = gst_pad_get_peer (pad);
910 gboolean ok = gst_pad_send_event (peer, gst_event_ref (event));
912 gst_object_unref (peer);
919 /* This is in the case the muxer doesn't allow this change of caps */
921 GST_SPLITMUX_LOCK (splitmux);
/* Mark the caps change and hold back output until a new fragment is
 * started */
923 ctx->caps_change = TRUE;
924 splitmux->ready_for_output = FALSE;
926 if (splitmux->output_state != SPLITMUX_OUTPUT_STATE_START_NEXT_FILE) {
928 if (ctx->out_eos == FALSE) {
929 send_eos (splitmux, ctx);
931 splitmux->output_state = SPLITMUX_OUTPUT_STATE_START_NEXT_FILE;
934 /* Lets it fall through, if it fails again, then the muxer just can't
935 * support this format, but at least we have a closed file.
943 /* We need to make sure events aren't passed
944 * until the muxer / sink are ready for it */
946 GST_SPLITMUX_LOCK (splitmux);
947 if (!ctx->is_reference)
948 complete_or_wait_on_out (splitmux, ctx);
949 GST_SPLITMUX_UNLOCK (splitmux);
951 /* Don't try to forward sticky events before the next buffer is there
952 * because it would cause a new file to be created without the first
953 * buffer being available.
955 if (ctx->caps_change && GST_EVENT_IS_STICKY (event))
956 return GST_PAD_PROBE_DROP;
958 return GST_PAD_PROBE_PASS;
961 /* Allow everything through until the configured next stopping point */
962 GST_SPLITMUX_LOCK (splitmux);
/* Fetch the bookkeeping record queued for this buffer */
964 buf_info = g_queue_pop_tail (&ctx->queued_bufs);
965 if (buf_info == NULL)
966 /* Can only happen due to a poorly timed flush */
969 /* If we have popped a keyframe, decrement the queued_gop count */
970 if (buf_info->keyframe && splitmux->queued_keyframes > 0)
971 splitmux->queued_keyframes--;
973 ctx->out_running_time = buf_info->run_ts;
974 ctx->cur_out_buffer = gst_pad_probe_info_get_buffer (info);
976 GST_LOG_OBJECT (splitmux,
977 "Pad %" GST_PTR_FORMAT " buffer with run TS %" GST_STIME_FORMAT
978 " size %" G_GUINT64_FORMAT,
979 pad, GST_STIME_ARGS (ctx->out_running_time), buf_info->buf_size);
/* A buffer has arrived, so the caps-change hold is cleared */
981 ctx->caps_change = FALSE;
983 complete_or_wait_on_out (splitmux, ctx);
/* Count this buffer's size towards the bytes muxed out */
985 splitmux->muxed_out_bytes += buf_info->buf_size;
987 #ifndef GST_DISABLE_GST_DEBUG
989 GstBuffer *buf = gst_pad_probe_info_get_buffer (info);
990 GST_LOG_OBJECT (pad, "Returning to pass buffer %" GST_PTR_FORMAT
991 " run ts %" GST_STIME_FORMAT, buf,
992 GST_STIME_ARGS (ctx->out_running_time));
996 ctx->cur_out_buffer = NULL;
997 GST_SPLITMUX_UNLOCK (splitmux);
999 /* pending_gap is protected by the STREAM lock */
1000 if (ctx->pending_gap) {
1001 /* If we previously stored a gap event, send it now */
1002 GstPad *peer = gst_pad_get_peer (ctx->srcpad);
1004 GST_DEBUG_OBJECT (splitmux,
1005 "Pad %" GST_PTR_FORMAT " sending pending GAP event", ctx->srcpad);
/* The stored event is handed to gst_pad_send_event(), so only the pointer
 * is cleared afterwards */
1007 gst_pad_send_event (peer, ctx->pending_gap);
1008 ctx->pending_gap = NULL;
1010 gst_object_unref (peer);
1013 mq_stream_buf_free (buf_info);
1015 return GST_PAD_PROBE_PASS;
1018 GST_SPLITMUX_UNLOCK (splitmux);
1019 return GST_PAD_PROBE_DROP;
1023 resend_sticky (GstPad * pad, GstEvent ** event, GstPad * peer)
1025 return gst_pad_send_event (peer, gst_event_ref (*event));
1029 restart_context (MqStreamCtx * ctx, GstSplitMuxSink * splitmux)
1031 GstPad *peer = gst_pad_get_peer (ctx->srcpad);
1033 gst_pad_sticky_events_foreach (ctx->srcpad,
1034 (GstPadStickyEventsForeachFunction) (resend_sticky), peer);
1036 /* Clear EOS flag if not actually EOS */
1037 ctx->out_eos = GST_PAD_IS_EOS (ctx->srcpad);
1039 gst_object_unref (peer);
1042 /* Called with lock held when a fragment
1043 * reaches EOS and it is time to restart with a new fragment. */
/* Switches the muxer and active sink over to the next output file.
 *
 * Entered with the splitmux lock held. The lock is dropped so the element
 * state lock can be taken, the muxer and sink are cycled through NULL with
 * their state locked, the next file name is chosen, and both children are
 * brought back to the bin's target state. Afterwards sticky events are
 * re-sent on every context, a "fragment opened" message is posted and the
 * output state moves to AWAITING_COMMAND. Returns with the splitmux lock
 * held again. */
1047 start_next_fragment (GstSplitMuxSink * splitmux, MqStreamCtx * ctx)
1049 GstElement *muxer, *sink;
1051 /* 1 change to new file */
1052 splitmux->switching_fragment = TRUE;
1054 /* We need to drop the splitmux lock to acquire the state lock
1055 * here and ensure there's no racy state change going on elsewhere */
1056 muxer = gst_object_ref (splitmux->muxer);
1057 sink = gst_object_ref (splitmux->active_sink);
1059 GST_SPLITMUX_UNLOCK (splitmux);
1060 GST_STATE_LOCK (splitmux);
/* Lock the children's state while they are cycled through NULL */
1062 gst_element_set_locked_state (muxer, TRUE);
1063 gst_element_set_locked_state (sink, TRUE);
1064 gst_element_set_state (muxer, GST_STATE_NULL);
1065 gst_element_set_state (sink, GST_STATE_NULL);
1067 GST_SPLITMUX_LOCK (splitmux);
/* Pick a new file name only if the last file received data, or if this is
 * the very first fragment */
1068 if (splitmux->muxed_out_bytes > 0 || splitmux->fragment_id == 0)
1069 set_next_filename (splitmux, ctx);
1070 splitmux->muxed_out_bytes = 0;
1071 GST_SPLITMUX_UNLOCK (splitmux);
/* Bring sink, then muxer, back to the bin's target state */
1073 gst_element_set_state (sink, GST_STATE_TARGET (splitmux));
1074 gst_element_set_state (muxer, GST_STATE_TARGET (splitmux));
1075 gst_element_set_locked_state (muxer, FALSE);
1076 gst_element_set_locked_state (sink, FALSE);
1078 gst_object_unref (sink);
1079 gst_object_unref (muxer);
/* Re-take the splitmux lock before releasing the state lock */
1081 GST_SPLITMUX_LOCK (splitmux);
1082 GST_STATE_UNLOCK (splitmux);
1083 splitmux->switching_fragment = FALSE;
1084 do_async_done (splitmux);
1086 splitmux->ready_for_output = TRUE;
/* Re-send sticky events on every stream context */
1088 g_list_foreach (splitmux->contexts, (GFunc) restart_context, splitmux);
1090 send_fragment_opened_closed_msg (splitmux, TRUE);
1092 /* FIXME: Is this always the correct next state? */
1093 splitmux->output_state = SPLITMUX_OUTPUT_STATE_AWAITING_COMMAND;
1094 GST_SPLITMUX_BROADCAST_OUTPUT (splitmux);
/* bus_handler:
 * @bin: the splitmuxsink, as a GstBin
 * @message: message posted by a child element
 *
 * Filters child messages before chaining up to the parent class'
 * handle_message implementation:
 *  - EOS: a fragment message is sent with FALSE (presumably "closed" -
 *    confirm against send_fragment_opened_closed_msg()). If the output side
 *    is in ENDING_FILE, the state moves to START_NEXT_FILE, the output
 *    waiters are woken and the message is unreffed; otherwise the EOS is
 *    logged as being passed on.
 *  - ASYNC_START / ASYNC_DONE: while a fragment switch is in progress,
 *    messages whose source is the active sink or the muxer are unreffed. */
1098 bus_handler (GstBin * bin, GstMessage * message)
1100 GstSplitMuxSink *splitmux = GST_SPLITMUX_SINK (bin);
1102 switch (GST_MESSAGE_TYPE (message)) {
1103 case GST_MESSAGE_EOS:
1104 /* If the state is draining out the current file, drop this EOS */
1105 GST_SPLITMUX_LOCK (splitmux);
1107 send_fragment_opened_closed_msg (splitmux, FALSE);
1109 if (splitmux->output_state == SPLITMUX_OUTPUT_STATE_ENDING_FILE) {
1110 GST_DEBUG_OBJECT (splitmux, "Caught EOS at end of fragment, dropping");
1111 splitmux->output_state = SPLITMUX_OUTPUT_STATE_START_NEXT_FILE;
1112 GST_SPLITMUX_BROADCAST_OUTPUT (splitmux);
/* The message is consumed here instead of being forwarded */
1114 gst_message_unref (message);
1115 GST_SPLITMUX_UNLOCK (splitmux);
1118 GST_DEBUG_OBJECT (splitmux,
1119 "Passing EOS message. Output state %d max_out_running_time %"
1120 GST_STIME_FORMAT, splitmux->output_state,
1121 GST_STIME_ARGS (splitmux->max_out_running_time));
1123 GST_SPLITMUX_UNLOCK (splitmux);
1125 case GST_MESSAGE_ASYNC_START:
1126 case GST_MESSAGE_ASYNC_DONE:
1127 /* Ignore state changes from our children while switching */
/* NOTE(review): switching_fragment is read here without taking the splitmux
 * lock - confirm this is intentional */
1128 if (splitmux->switching_fragment) {
1129 if (GST_MESSAGE_SRC (message) == (GstObject *) splitmux->active_sink
1130 || GST_MESSAGE_SRC (message) == (GstObject *) splitmux->muxer) {
1131 GST_LOG_OBJECT (splitmux,
1132 "Ignoring state change from child %" GST_PTR_FORMAT
1133 " while switching", GST_MESSAGE_SRC (message));
1134 gst_message_unref (message);
/* Everything that was not consumed above goes to the parent class */
1143 GST_BIN_CLASS (parent_class)->handle_message (bin, message);
1147 ctx_set_unblock (MqStreamCtx * ctx)
1149 ctx->need_unblock = TRUE;
/* handle_gathered_gop:
 * @splitmux: the splitmuxsink instance
 *
 * Decides what to do with the GOP that has just been fully collected on the
 * input side:
 *  - estimates the fragment size as fragment_total_bytes + gop_total_bytes,
 *    expanded by mux_overhead, and the fragment duration as the reference
 *    stream's input running time minus fragment_start_time;
 *  - if the fragment already holds data and the byte, time or timecode
 *    threshold is exceeded, queues a start_new_fragment command, makes this
 *    GOP the start of the new fragment and requests the next keyframe;
 *  - picks the next input state depending on whether the reference stream
 *    is at EOS, flags every context for an unblock event and wakes the
 *    input waiters;
 *  - if the GOP contains any bytes, queues a command releasing it to the
 *    output side up to the chosen running time. */
1152 /* Called with splitmux lock held */
1153 /* Called when entering ProcessingCompleteGop state
1154 * Assess if mq contents overflowed the current file
1155 * -> If yes, need to switch to new file
1156 * -> if no, set max_out_running_time to let this GOP in and
1157 * go to COLLECTING_GOP_START state
1160 handle_gathered_gop (GstSplitMuxSink * splitmux)
1162 guint64 queued_bytes;
1163 GstClockTimeDiff queued_time = 0;
1164 GstClockTimeDiff new_out_ts = splitmux->reference_ctx->in_running_time;
1165 SplitMuxOutputCommand *cmd;
1167 /* Assess if the multiqueue contents overflowed the current file */
1168 /* When considering if a newly gathered GOP overflows
1169 * the time limit for the file, only consider the running time of the
1170 * reference stream. Other streams might have run ahead a little bit,
1171 * but extra pieces won't be released to the muxer beyond the reference
1172 * stream cut-off anyway - so it forms the limit. */
1173 queued_bytes = splitmux->fragment_total_bytes + splitmux->gop_total_bytes;
1174 queued_time = splitmux->reference_ctx->in_running_time;
1176 GST_LOG_OBJECT (splitmux, " queued_bytes %" G_GUINT64_FORMAT, queued_bytes);
/* The reference stream must not be behind the start of the fragment */
1178 g_assert (queued_time >= splitmux->fragment_start_time);
/* From here on queued_time is the duration of the fragment so far */
1180 queued_time -= splitmux->fragment_start_time;
1182 /* Expand queued bytes estimate by muxer overhead */
1183 queued_bytes += (queued_bytes * splitmux->mux_overhead);
1185 GST_LOG_OBJECT (splitmux, "mq at TS %" GST_STIME_FORMAT
1186 " bytes %" G_GUINT64_FORMAT, GST_STIME_ARGS (queued_time), queued_bytes);
1187 if (splitmux->next_max_tc_time != GST_CLOCK_TIME_NONE) {
1188 GST_LOG_OBJECT (splitmux,
1189 "timecode mq TS %" GST_TIME_FORMAT " vs target %" GST_TIME_FORMAT,
1190 GST_TIME_ARGS (splitmux->reference_ctx->in_running_time),
1191 GST_TIME_ARGS (splitmux->next_max_tc_time + 5 * GST_USECOND));
1194 /* Check for overrun - have we output at least one byte and overrun
1195 * either threshold? */
1196 /* Timecode-based threshold accounts for possible rounding errors:
1197 * 5us should be bigger than all possible rounding errors but nowhere near
1198 * big enough to skip to another frame */
1199 if ((splitmux->fragment_total_bytes > 0 &&
1200 ((splitmux->threshold_bytes > 0 &&
1201 queued_bytes > splitmux->threshold_bytes) ||
1202 (splitmux->threshold_time > 0 &&
1203 queued_time > splitmux->threshold_time) ||
1204 (splitmux->next_max_tc_time != GST_CLOCK_TIME_NONE &&
1205 splitmux->reference_ctx->in_running_time >
1206 splitmux->next_max_tc_time + 5 * GST_USECOND)))) {
1208 /* Tell the output side to start a new fragment */
1209 GST_INFO_OBJECT (splitmux,
1210 "This GOP (dur %" GST_STIME_FORMAT
1211 ") would overflow the fragment, Sending start_new_fragment cmd",
1212 GST_STIME_ARGS (splitmux->reference_ctx->in_running_time -
1213 splitmux->gop_start_time));
1214 cmd = out_cmd_buf_new ();
1215 cmd->start_new_fragment = TRUE;
1216 g_queue_push_head (&splitmux->out_cmd_q, cmd);
1217 GST_SPLITMUX_BROADCAST_OUTPUT (splitmux);
/* The overflowing GOP becomes the first GOP of the new fragment, so the
 * byte count restarts from zero */
1219 new_out_ts = splitmux->reference_ctx->in_running_time;
1220 splitmux->fragment_start_time = splitmux->gop_start_time;
1221 splitmux->fragment_total_bytes = 0;
1223 if (request_next_keyframe (splitmux,
1224 splitmux->reference_ctx->prev_in_keyframe) == FALSE) {
1225 GST_WARNING_OBJECT (splitmux,
1226 "Could not request a keyframe. Files may not split at the exact location they should");
1228 gst_buffer_replace (&splitmux->reference_ctx->prev_in_keyframe, NULL);
1231 /* And set up to collect the next GOP */
1232 if (!splitmux->reference_ctx->in_eos) {
1233 splitmux->input_state = SPLITMUX_INPUT_STATE_COLLECTING_GOP_START;
1234 splitmux->gop_start_time = new_out_ts;
1236 /* This is probably already the current state, but just in case: */
1237 splitmux->input_state = SPLITMUX_INPUT_STATE_FINISHING_UP;
1238 new_out_ts = GST_CLOCK_STIME_NONE; /* EOS runs until forever */
1241 /* And wake all input contexts to send a wake-up event */
1242 g_list_foreach (splitmux->contexts, (GFunc) ctx_set_unblock, NULL);
1243 GST_SPLITMUX_BROADCAST_INPUT (splitmux);
1245 /* Now either way - either there was no overflow, or we requested a new fragment: release this GOP */
1246 splitmux->fragment_total_bytes += splitmux->gop_total_bytes;
1248 if (splitmux->gop_total_bytes > 0) {
1249 GST_LOG_OBJECT (splitmux,
1250 "Releasing GOP to output. Bytes in fragment now %" G_GUINT64_FORMAT
1251 " time %" GST_STIME_FORMAT,
1252 splitmux->fragment_total_bytes, GST_STIME_ARGS (queued_time));
1254 /* Send this GOP to the output command queue */
1255 cmd = out_cmd_buf_new ();
1256 cmd->start_new_fragment = FALSE;
1257 cmd->max_output_ts = new_out_ts;
1258 GST_LOG_OBJECT (splitmux, "Sending GOP cmd to output for TS %"
1259 GST_STIME_FORMAT, GST_STIME_ARGS (new_out_ts));
1260 g_queue_push_head (&splitmux->out_cmd_q, cmd);
1262 GST_SPLITMUX_BROADCAST_OUTPUT (splitmux);
/* Start byte accounting afresh for the next GOP */
1265 splitmux->gop_total_bytes = 0;
/* check_completed_gop:
 * @splitmux: the splitmuxsink instance
 * @ctx: the stream context of the calling input pad
 *
 * Three steps are visible below:
 *  1. If @ctx is flagged need_unblock, a serialized custom downstream
 *     "splitmuxsink-unblock" event carrying max_in_running_time is sent on
 *     the context's sink pad. The splitmux lock is released around the send,
 *     so the input state is re-checked afterwards.
 *  2. While in WAITING_GOP_COLLECT, every context is compared against
 *     max_in_running_time; once the GOP is complete it is passed to
 *     handle_gathered_gop().
 *  3. Otherwise the caller sleeps on the input condition until the state
 *     changes or it is no longer at/after max_in_running_time. */
1268 /* Called with splitmux lock held */
1269 /* Called from each input pad when it has all the pieces
1270 * for a GOP or EOS, starting with the reference pad which has set the
1271 * splitmux->max_in_running_time
1274 check_completed_gop (GstSplitMuxSink * splitmux, MqStreamCtx * ctx)
1279 /* On ENDING_FILE, the reference stream sends a command to start a new
1280 * fragment, then releases the GOP for output in the new fragment.
1281 * If some streams received no buffer during the last GOP that overran,
1282 * because its next buffer has a timestamp bigger than
1283 * ctx->max_in_running_time, its queue is empty. In that case the only
1284 * way to wakeup the output thread is by injecting an event in the
1285 * queue. This usually happens with subtitle streams.
1286 * See https://bugzilla.gnome.org/show_bug.cgi?id=763711. */
1287 if (ctx->need_unblock) {
1288 GST_LOG_OBJECT (ctx->sinkpad, "Sending splitmuxsink-unblock event");
1289 event = gst_event_new_custom (GST_EVENT_CUSTOM_DOWNSTREAM |
1290 GST_EVENT_TYPE_SERIALIZED,
1291 gst_structure_new ("splitmuxsink-unblock", "timestamp",
1292 G_TYPE_INT64, splitmux->max_in_running_time, NULL));
/* The lock is dropped for the duration of the send */
1294 GST_SPLITMUX_UNLOCK (splitmux);
1295 gst_pad_send_event (ctx->sinkpad, event);
1296 GST_SPLITMUX_LOCK (splitmux);
1298 ctx->need_unblock = FALSE;
1299 GST_SPLITMUX_BROADCAST_INPUT (splitmux);
1300 /* state may have changed while we were unlocked. Loop again if so */
1301 if (splitmux->input_state != SPLITMUX_INPUT_STATE_WAITING_GOP_COLLECT)
1305 if (splitmux->input_state == SPLITMUX_INPUT_STATE_WAITING_GOP_COLLECT) {
1306 gboolean ready = TRUE;
1308 /* Iterate each pad, and check that the input running time is at least
1309 * up to the reference running time, and if so handle the collected GOP */
1310 GST_LOG_OBJECT (splitmux, "Checking GOP collected, Max in running time %"
1311 GST_STIME_FORMAT " ctx %p",
1312 GST_STIME_ARGS (splitmux->max_in_running_time), ctx);
1313 for (cur = g_list_first (splitmux->contexts); cur != NULL;
1314 cur = g_list_next (cur)) {
1315 MqStreamCtx *tmpctx = (MqStreamCtx *) (cur->data);
1317 GST_LOG_OBJECT (splitmux,
1318 "Context %p (src pad %" GST_PTR_FORMAT ") TS %" GST_STIME_FORMAT
1319 " EOS %d", tmpctx, tmpctx->srcpad,
1320 GST_STIME_ARGS (tmpctx->in_running_time), tmpctx->in_eos);
/* A context that is still behind the reference running time means the GOP
 * is not complete yet */
1322 if (splitmux->max_in_running_time != GST_CLOCK_STIME_NONE &&
1323 tmpctx->in_running_time < splitmux->max_in_running_time &&
1325 GST_LOG_OBJECT (splitmux,
1326 "Context %p (src pad %" GST_PTR_FORMAT ") not ready. We'll sleep",
1327 tmpctx, tmpctx->srcpad);
1333 GST_DEBUG_OBJECT (splitmux,
1334 "Collected GOP is complete. Processing (ctx %p)", ctx);
1335 /* All pads have a complete GOP, release it into the multiqueue */
1336 handle_gathered_gop (splitmux);
1340 /* If upstream reached EOS we are not expecting more data, no need to wait
1345 /* Some pad is not yet ready, or GOP is being pushed
1346 * either way, sleep and wait to get woken */
1347 while (splitmux->input_state == SPLITMUX_INPUT_STATE_WAITING_GOP_COLLECT &&
1349 (ctx->in_running_time >= splitmux->max_in_running_time) &&
1350 (splitmux->max_in_running_time != GST_CLOCK_STIME_NONE)) {
1352 GST_LOG_OBJECT (splitmux, "Sleeping for GOP collection (ctx %p)", ctx);
1353 GST_SPLITMUX_WAIT_INPUT (splitmux);
1354 GST_LOG_OBJECT (splitmux, "Done waiting for complete GOP (ctx %p)", ctx);
/* handle_mq_input:
 * @pad: the probed pad (the probe is installed on a stream queue's sink pad)
 * @info: probe info carrying the event or buffer
 * @ctx: the stream context registered with the probe
 *
 * Input-side pad probe. As visible below:
 *  - buffer lists are not supported: a warning is emitted and they are
 *    dropped;
 *  - events: SEGMENT is copied into ctx->in_segment; FLUSH_STOP resets the
 *    segment, the EOS flag and the input running time; the EOS handling
 *    moves the input state to WAITING_GOP_COLLECT for the reference stream
 *    and runs check_completed_gop(); GAP events can initialise the
 *    fragment/GOP start time on the reference stream. Events are passed on;
 *  - buffers: the timestamp is the PTS if valid, otherwise the DTS. The
 *    context's input running time only ever advances. A MqStreamBuf record
 *    (running time, size, duration) is filled in, the input state machine
 *    decides whether the buffer may pass or the pad must wait, and on
 *    success the record is pushed onto ctx->queued_bufs and the buffer size
 *    is added to gop_total_bytes.
 *
 * Returns: GST_PAD_PROBE_PASS, or GST_PAD_PROBE_DROP for buffer lists. */
1358 static GstPadProbeReturn
1359 handle_mq_input (GstPad * pad, GstPadProbeInfo * info, MqStreamCtx * ctx)
1361 GstSplitMuxSink *splitmux = ctx->splitmux;
1363 MqStreamBuf *buf_info = NULL;
1365 gboolean loop_again;
1366 gboolean keyframe = FALSE;
1368 GST_LOG_OBJECT (pad, "Fired probe type 0x%x", info->type);
1370 /* FIXME: Handle buffer lists, until then make it clear they won't work */
1371 if (info->type & GST_PAD_PROBE_TYPE_BUFFER_LIST) {
1372 g_warning ("Buffer list handling not implemented");
1373 return GST_PAD_PROBE_DROP;
/* Event handling: nothing is queued for events, they only update state */
1375 if (info->type & GST_PAD_PROBE_TYPE_EVENT_DOWNSTREAM ||
1376 info->type & GST_PAD_PROBE_TYPE_EVENT_FLUSH) {
1377 GstEvent *event = gst_pad_probe_info_get_event (info);
1379 GST_LOG_OBJECT (pad, "Event %" GST_PTR_FORMAT, event);
1381 switch (GST_EVENT_TYPE (event)) {
1382 case GST_EVENT_SEGMENT:
1383 gst_event_copy_segment (event, &ctx->in_segment);
1385 case GST_EVENT_FLUSH_STOP:
1386 GST_SPLITMUX_LOCK (splitmux);
1387 gst_segment_init (&ctx->in_segment, GST_FORMAT_UNDEFINED);
1388 ctx->in_eos = FALSE;
1389 ctx->in_running_time = GST_CLOCK_STIME_NONE;
1390 GST_SPLITMUX_UNLOCK (splitmux);
1393 GST_SPLITMUX_LOCK (splitmux);
1396 if (splitmux->input_state == SPLITMUX_INPUT_STATE_STOPPED)
1399 if (ctx->is_reference) {
1400 GST_INFO_OBJECT (splitmux, "Got Reference EOS. Finishing up");
1401 /* check_completed_gop will act as if this is a new keyframe with infinite timestamp */
1402 splitmux->input_state = SPLITMUX_INPUT_STATE_WAITING_GOP_COLLECT;
1403 /* Wake up other input pads to collect this GOP */
1404 GST_SPLITMUX_BROADCAST_INPUT (splitmux);
1405 check_completed_gop (splitmux, ctx);
1406 } else if (splitmux->input_state ==
1407 SPLITMUX_INPUT_STATE_WAITING_GOP_COLLECT) {
1408 /* If we are waiting for a GOP to be completed (ie, for aux
1409 * pads to catch up), then this pad is complete, so check
1410 * if the whole GOP is.
1412 check_completed_gop (splitmux, ctx);
1414 GST_SPLITMUX_UNLOCK (splitmux);
1416 case GST_EVENT_GAP:{
1417 GstClockTime gap_ts;
1418 GstClockTimeDiff rtime;
1420 gst_event_parse_gap (event, &gap_ts, NULL);
1421 if (gap_ts == GST_CLOCK_TIME_NONE)
1424 GST_SPLITMUX_LOCK (splitmux);
1426 if (splitmux->input_state == SPLITMUX_INPUT_STATE_STOPPED)
1428 rtime = my_segment_to_running_time (&ctx->in_segment, gap_ts);
1430 GST_LOG_OBJECT (pad, "Have GAP w/ ts %" GST_STIME_FORMAT,
1431 GST_STIME_ARGS (rtime));
1433 if (ctx->is_reference
1434 && splitmux->fragment_start_time == GST_CLOCK_STIME_NONE) {
1435 splitmux->gop_start_time = splitmux->fragment_start_time = rtime;
1436 GST_LOG_OBJECT (splitmux, "Mux start time now %" GST_STIME_FORMAT,
1437 GST_STIME_ARGS (splitmux->fragment_start_time));
1438 /* Also take this as the first start time when starting up,
1439 * so that we start counting overflow from the first frame */
1440 if (!GST_CLOCK_STIME_IS_VALID (splitmux->max_in_running_time))
1441 splitmux->max_in_running_time = splitmux->fragment_start_time;
1444 GST_SPLITMUX_UNLOCK (splitmux);
1450 return GST_PAD_PROBE_PASS;
/* Buffer handling starts here */
1453 buf = gst_pad_probe_info_get_buffer (info);
1454 buf_info = mq_stream_buf_new ();
1456 if (GST_BUFFER_PTS_IS_VALID (buf))
1457 ts = GST_BUFFER_PTS (buf);
1459 ts = GST_BUFFER_DTS (buf);
1461 GST_LOG_OBJECT (pad, "Buffer TS is %" GST_TIME_FORMAT, GST_TIME_ARGS (ts));
1463 GST_SPLITMUX_LOCK (splitmux);
1465 if (splitmux->input_state == SPLITMUX_INPUT_STATE_STOPPED)
1468 /* If this buffer has a timestamp, advance the input timestamp of the
1470 if (GST_CLOCK_TIME_IS_VALID (ts)) {
1471 GstClockTimeDiff running_time =
1472 my_segment_to_running_time (&ctx->in_segment, ts);
1474 GST_LOG_OBJECT (pad, "Buffer running TS is %" GST_STIME_FORMAT,
1475 GST_STIME_ARGS (running_time));
1477 if (GST_CLOCK_STIME_IS_VALID (running_time)
1478 && running_time > ctx->in_running_time)
1479 ctx->in_running_time = running_time;
1482 /* Try to make sure we have a valid running time */
1483 if (!GST_CLOCK_STIME_IS_VALID (ctx->in_running_time)) {
1484 ctx->in_running_time =
1485 my_segment_to_running_time (&ctx->in_segment, ctx->in_segment.start);
1488 GST_LOG_OBJECT (pad, "in running time now %" GST_STIME_FORMAT,
1489 GST_STIME_ARGS (ctx->in_running_time));
/* Record what the output side needs to know about this buffer */
1491 buf_info->run_ts = ctx->in_running_time;
1492 buf_info->buf_size = gst_buffer_get_size (buf);
1493 buf_info->duration = GST_BUFFER_DURATION (buf);
1495 /* initialize fragment_start_time */
1496 if (ctx->is_reference
1497 && splitmux->fragment_start_time == GST_CLOCK_STIME_NONE) {
1498 splitmux->gop_start_time = splitmux->fragment_start_time = buf_info->run_ts;
1499 GST_LOG_OBJECT (splitmux, "Mux start time now %" GST_STIME_FORMAT,
1500 GST_STIME_ARGS (splitmux->fragment_start_time));
1501 gst_buffer_replace (&ctx->prev_in_keyframe, buf);
1503 /* Also take this as the first start time when starting up,
1504 * so that we start counting overflow from the first frame */
1505 if (!GST_CLOCK_STIME_IS_VALID (splitmux->max_in_running_time))
1506 splitmux->max_in_running_time = splitmux->fragment_start_time;
1507 if (request_next_keyframe (splitmux, ctx->prev_in_keyframe) == FALSE) {
1508 GST_WARNING_OBJECT (splitmux,
1509 "Could not request a keyframe. Files may not split at the exact location they should");
1511 gst_buffer_replace (&splitmux->reference_ctx->prev_in_keyframe, NULL);
1514 GST_DEBUG_OBJECT (pad, "Buf TS %" GST_STIME_FORMAT
1515 " total GOP bytes %" G_GUINT64_FORMAT,
1516 GST_STIME_ARGS (buf_info->run_ts), splitmux->gop_total_bytes);
/* Input state machine: decide whether this buffer passes or the pad waits */
1523 switch (splitmux->input_state) {
1524 case SPLITMUX_INPUT_STATE_COLLECTING_GOP_START:
1525 if (ctx->is_reference) {
1526 /* If a keyframe, we have a complete GOP */
1527 if (GST_BUFFER_FLAG_IS_SET (buf, GST_BUFFER_FLAG_DELTA_UNIT) ||
1528 !GST_CLOCK_STIME_IS_VALID (ctx->in_running_time) ||
1529 splitmux->max_in_running_time >= ctx->in_running_time) {
1530 /* Pass this buffer through */
1532 /* Allow other input pads to catch up to here too */
1533 splitmux->max_in_running_time = ctx->in_running_time;
1534 GST_SPLITMUX_BROADCAST_INPUT (splitmux);
1537 GST_INFO_OBJECT (pad,
1538 "Have keyframe with running time %" GST_STIME_FORMAT,
1539 GST_STIME_ARGS (ctx->in_running_time));
1541 splitmux->input_state = SPLITMUX_INPUT_STATE_WAITING_GOP_COLLECT;
1542 splitmux->max_in_running_time = ctx->in_running_time;
1543 /* Wake up other input pads to collect this GOP */
1544 GST_SPLITMUX_BROADCAST_INPUT (splitmux);
1545 check_completed_gop (splitmux, ctx);
1546 /* Store this new keyframe to remember the start of GOP */
1547 gst_buffer_replace (&ctx->prev_in_keyframe, buf);
1549 /* Pass this buffer if the reference ctx is far enough ahead */
1550 if (ctx->in_running_time < splitmux->max_in_running_time) {
1555 /* We're still waiting for a keyframe on the reference pad, sleep */
1556 GST_LOG_OBJECT (pad, "Sleeping for GOP start");
1557 GST_SPLITMUX_WAIT_INPUT (splitmux);
1558 GST_LOG_OBJECT (pad,
1559 "Done sleeping for GOP start input state now %d",
1560 splitmux->input_state);
1563 case SPLITMUX_INPUT_STATE_WAITING_GOP_COLLECT:{
1564 /* We're collecting a GOP. If this is the reference context,
1565 * we need to check if this is a keyframe that marks the start
1566 * of the next GOP. If it is, it marks the end of the GOP we're
1567 * collecting, so sleep and wait until all the other pads also
1568 * reach that timestamp - at which point, we have an entire GOP
1569 * and either go to ENDING_FILE or release this GOP to the muxer and
1570 * go back to COLLECT_GOP_START. */
1572 /* If we overran the target timestamp, it might be time to process
1573 * the GOP, otherwise bail out for more data
1575 GST_LOG_OBJECT (pad,
1576 "Checking TS %" GST_STIME_FORMAT " against max %"
1577 GST_STIME_FORMAT, GST_STIME_ARGS (ctx->in_running_time),
1578 GST_STIME_ARGS (splitmux->max_in_running_time));
1580 if (ctx->in_running_time < splitmux->max_in_running_time) {
1585 GST_LOG_OBJECT (pad,
1586 "Collected last packet of GOP. Checking other pads");
1587 check_completed_gop (splitmux, ctx);
1590 case SPLITMUX_INPUT_STATE_FINISHING_UP:
1601 splitmux->queued_keyframes++;
1602 buf_info->keyframe = TRUE;
1605 /* Update total input byte counter for overflow detect */
1606 splitmux->gop_total_bytes += buf_info->buf_size;
1608 /* Now add this buffer to the queue just before returning */
1609 g_queue_push_head (&ctx->queued_bufs, buf_info);
1611 GST_LOG_OBJECT (pad, "Returning to queue buffer %" GST_PTR_FORMAT
1612 " run ts %" GST_STIME_FORMAT, buf, GST_STIME_ARGS (ctx->in_running_time));
1614 GST_SPLITMUX_UNLOCK (splitmux);
1615 return GST_PAD_PROBE_PASS;
/* Bail-out path: release the lock and discard the unused buffer record */
1618 GST_SPLITMUX_UNLOCK (splitmux);
1620 mq_stream_buf_free (buf_info);
1621 return GST_PAD_PROBE_PASS;
1625 grow_blocked_queues (GstSplitMuxSink * splitmux)
1629 /* Scan other queues for full-ness and grow them */
1630 for (cur = g_list_first (splitmux->contexts);
1631 cur != NULL; cur = g_list_next (cur)) {
1632 MqStreamCtx *tmpctx = (MqStreamCtx *) (cur->data);
1634 guint cur_len = g_queue_get_length (&tmpctx->queued_bufs);
1636 g_object_get (tmpctx->q, "max-size-buffers", &cur_limit, NULL);
1637 GST_LOG_OBJECT (tmpctx->q, "Queue len %u", cur_len);
1639 if (cur_len >= cur_limit) {
1640 cur_limit = cur_len + 1;
1641 GST_DEBUG_OBJECT (tmpctx->q,
1642 "Queue overflowed and needs enlarging. Growing to %u buffers",
1644 g_object_set (tmpctx->q, "max-size-buffers", cur_limit, NULL);
1650 handle_q_underrun (GstElement * q, gpointer user_data)
1652 MqStreamCtx *ctx = (MqStreamCtx *) (user_data);
1653 GstSplitMuxSink *splitmux = ctx->splitmux;
1655 GST_SPLITMUX_LOCK (splitmux);
1656 GST_DEBUG_OBJECT (q,
1657 "Queue reported underrun with %d keyframes and %d cmds enqueued",
1658 splitmux->queued_keyframes, g_queue_get_length (&splitmux->out_cmd_q));
1659 grow_blocked_queues (splitmux);
1660 GST_SPLITMUX_UNLOCK (splitmux);
/* handle_q_overrun:
 * @q: the queue that reported the overrun
 * @user_data: the MqStreamCtx the queue belongs to
 *
 * "overrun" signal handler of a stream queue. Under the splitmux lock it
 * looks at three conditions - fewer than two keyframes queued, no pending
 * output command, or another stream's tracked queue being empty - which
 * presumably set allow_grow (the assignments are on lines not shown here;
 * confirm). After the lock is released, the queue's "max-size-buffers"
 * property is read, logged as the new size and written back. */
1664 handle_q_overrun (GstElement * q, gpointer user_data)
1666 MqStreamCtx *ctx = (MqStreamCtx *) (user_data);
1667 GstSplitMuxSink *splitmux = ctx->splitmux;
1668 gboolean allow_grow = FALSE;
1670 GST_SPLITMUX_LOCK (splitmux);
1671 GST_DEBUG_OBJECT (q,
1672 "Queue reported overrun with %d keyframes and %d cmds enqueued",
1673 splitmux->queued_keyframes, g_queue_get_length (&splitmux->out_cmd_q));
1675 if (splitmux->queued_keyframes < 2) {
1676 /* Less than a full GOP queued, grow the queue */
/* No command is waiting for the output side */
1678 } else if (g_queue_get_length (&splitmux->out_cmd_q) < 1) {
1681 /* If another queue is starved, grow */
1683 for (cur = g_list_first (splitmux->contexts);
1684 cur != NULL; cur = g_list_next (cur)) {
1685 MqStreamCtx *tmpctx = (MqStreamCtx *) (cur->data);
1686 if (tmpctx != ctx && g_queue_get_length (&tmpctx->queued_bufs) < 1) {
1691 GST_SPLITMUX_UNLOCK (splitmux);
/* The property access happens outside the splitmux lock */
1696 g_object_get (q, "max-size-buffers", &cur_limit, NULL);
1699 GST_DEBUG_OBJECT (q,
1700 "Queue overflowed and needs enlarging. Growing to %u buffers",
1703 g_object_set (q, "max-size-buffers", cur_limit, NULL);
/* gst_splitmux_sink_request_new_pad:
 * @element: the splitmuxsink
 * @templ: the pad template the request was made against
 * @name: requested pad name, may be NULL
 * @caps: caps passed on to the muxer's pad request
 *
 * Creates one input branch: requests a sink pad on the muxer, creates a
 * queue in front of it, links queue src to the muxer pad and exposes the
 * queue sink as a ghost pad on the bin.
 *
 * Muxer template lookup, in the order shown below: for the "video" template
 * "video_%u" then "video"; for any other template the same name then
 * "audio"; finally "sink_%d".
 *
 * A MqStreamCtx is created per branch. It holds a reference to the queue,
 * stores both queue pads, and is referenced once more for each of the two
 * pad probes (handle_mq_output on the src pad, handle_mq_input on the sink
 * pad). The first context, or a video context, becomes the reference
 * context.
 *
 * NOTE(review): gname is allocated before the queue is created and linked;
 * no release of gname or of the requested muxer pad is visible on the
 * failure path after a failed queue creation - confirm against the lines
 * not shown here. */
1708 gst_splitmux_sink_request_new_pad (GstElement * element,
1709 GstPadTemplate * templ, const gchar * name, const GstCaps * caps)
1711 GstSplitMuxSink *splitmux = (GstSplitMuxSink *) element;
1712 GstPadTemplate *mux_template = NULL;
1715 GstPad *q_sink = NULL, *q_src = NULL;
1717 gboolean is_video = FALSE;
1720 GST_DEBUG_OBJECT (element, "templ:%s, name:%s", templ->name_template, name);
1722 GST_SPLITMUX_LOCK (splitmux);
/* The muxer must exist before a pad can be requested from it */
1723 if (!create_muxer (splitmux))
1726 if (templ->name_template) {
1727 if (g_str_equal (templ->name_template, "video")) {
/* Only one video pad may be requested */
1728 if (splitmux->have_video)
1729 goto already_have_video;
1731 /* FIXME: Look for a pad template with matching caps, rather than by name */
1733 gst_element_class_get_pad_template (GST_ELEMENT_GET_CLASS
1734 (splitmux->muxer), "video_%u");
1736 /* Fallback to find sink pad templates named 'video' (flvmux) */
1737 if (!mux_template) {
1739 gst_element_class_get_pad_template (GST_ELEMENT_GET_CLASS
1740 (splitmux->muxer), "video");
1746 gst_element_class_get_pad_template (GST_ELEMENT_GET_CLASS
1747 (splitmux->muxer), templ->name_template);
1749 /* Fallback to find sink pad templates named 'audio' (flvmux) */
1750 if (!mux_template) {
1752 gst_element_class_get_pad_template (GST_ELEMENT_GET_CLASS
1753 (splitmux->muxer), "audio");
1756 if (mux_template == NULL) {
1757 /* Fallback to find sink pad templates named 'sink_%d' (mpegtsmux) */
1759 gst_element_class_get_pad_template (GST_ELEMENT_GET_CLASS
1760 (splitmux->muxer), "sink_%d");
1764 res = gst_element_request_pad (splitmux->muxer, mux_template, name, caps);
/* Name of the ghost pad: "video", the muxer pad's name, or the caller's */
1769 gname = g_strdup ("video");
1770 else if (name == NULL)
1771 gname = gst_pad_get_name (res);
1773 gname = g_strdup (name);
1775 if ((q = create_element (splitmux, "queue", NULL, FALSE)) == NULL)
1778 gst_element_set_state (q, GST_STATE_TARGET (splitmux));
/* Byte and time limits are disabled; the queue starts at 5 buffers */
1780 g_object_set (q, "max-size-bytes", 0, "max-size-time", (guint64) (0),
1781 "max-size-buffers", 5, NULL);
1783 q_sink = gst_element_get_static_pad (q, "sink");
1784 q_src = gst_element_get_static_pad (q, "src");
1786 if (gst_pad_link (q_src, res) != GST_PAD_LINK_OK) {
1787 gst_element_release_request_pad (splitmux->muxer, res);
1788 gst_object_unref (GST_OBJECT (res));
1792 gst_object_unref (GST_OBJECT (res));
1794 ctx = mq_stream_ctx_new (splitmux);
1795 /* Context holds a ref: */
1796 ctx->q = gst_object_ref (q);
1797 ctx->srcpad = q_src;
1798 ctx->sinkpad = q_sink;
1800 g_signal_connect (q, "overrun", (GCallback) handle_q_overrun, ctx);
1801 g_signal_connect (q, "underrun", (GCallback) handle_q_underrun, ctx);
/* One context reference per probe; the destroy notify is expected to drop it
 * (confirm in _pad_block_destroy_src_notify) */
1803 mq_stream_ctx_ref (ctx);
1804 ctx->src_pad_block_id =
1805 gst_pad_add_probe (q_src,
1806 GST_PAD_PROBE_TYPE_DATA_DOWNSTREAM | GST_PAD_PROBE_TYPE_EVENT_FLUSH,
1807 (GstPadProbeCallback) handle_mq_output, ctx, (GDestroyNotify)
1808 _pad_block_destroy_src_notify);
/* A video stream always takes over as the reference stream */
1809 if (is_video && splitmux->reference_ctx != NULL) {
1810 splitmux->reference_ctx->is_reference = FALSE;
1811 splitmux->reference_ctx = NULL;
1813 if (splitmux->reference_ctx == NULL) {
1814 splitmux->reference_ctx = ctx;
1815 ctx->is_reference = TRUE;
1818 res = gst_ghost_pad_new_from_template (gname, q_sink, templ);
/* release_pad() finds the context again through this qdata */
1819 g_object_set_qdata ((GObject *) (res), PAD_CONTEXT, ctx);
1821 mq_stream_ctx_ref (ctx);
1822 ctx->sink_pad_block_id =
1823 gst_pad_add_probe (q_sink,
1824 GST_PAD_PROBE_TYPE_DATA_DOWNSTREAM | GST_PAD_PROBE_TYPE_EVENT_FLUSH,
1825 (GstPadProbeCallback) handle_mq_input, ctx, (GDestroyNotify)
1826 _pad_block_destroy_sink_notify);
1828 GST_DEBUG_OBJECT (splitmux, "Request pad %" GST_PTR_FORMAT
1829 " feeds queue pad %" GST_PTR_FORMAT, res, q_sink);
1831 splitmux->contexts = g_list_prepend (splitmux->contexts, ctx);
1836 splitmux->have_video = TRUE;
1838 gst_pad_set_active (res, TRUE);
1839 gst_element_add_pad (element, res);
1841 GST_SPLITMUX_UNLOCK (splitmux);
/* Failure path: release the lock and any queue pads obtained so far */
1845 GST_SPLITMUX_UNLOCK (splitmux);
1848 gst_object_unref (q_sink);
1850 gst_object_unref (q_src);
1853 GST_DEBUG_OBJECT (splitmux, "video sink pad already requested");
1854 GST_SPLITMUX_UNLOCK (splitmux);
/* gst_splitmux_sink_release_pad:
 * @element: the splitmuxsink
 * @pad: a request pad previously handed out by this element
 *
 * Tears down one input branch under the splitmux lock: the stream context
 * stored as PAD_CONTEXT qdata on @pad is removed from the context list, both
 * pad probes are removed, the context is unreffed, the muxer's request pad
 * is released, and @pad is removed from the element. When the pad template
 * check at the end matches, have_video is cleared. Once the last context is
 * gone the internal elements are reset.
 *
 * Nothing is released if the muxer has not been created yet. */
1859 gst_splitmux_sink_release_pad (GstElement * element, GstPad * pad)
1861 GstSplitMuxSink *splitmux = (GstSplitMuxSink *) element;
1862 GstPad *muxpad = NULL;
1864 (MqStreamCtx *) (g_object_get_qdata ((GObject *) (pad), PAD_CONTEXT));
1866 GST_SPLITMUX_LOCK (splitmux);
1868 if (splitmux->muxer == NULL)
1869 goto fail; /* Elements don't exist yet - nothing to release */
1871 GST_INFO_OBJECT (pad, "releasing request pad");
/* The muxer pad is looked up before the branch is dismantled */
1873 muxpad = gst_pad_get_peer (ctx->srcpad);
1875 /* Remove the context from our consideration */
1876 splitmux->contexts = g_list_remove (splitmux->contexts, ctx);
1878 if (ctx->sink_pad_block_id)
1879 gst_pad_remove_probe (ctx->sinkpad, ctx->sink_pad_block_id);
1881 if (ctx->src_pad_block_id)
1882 gst_pad_remove_probe (ctx->srcpad, ctx->src_pad_block_id);
1884 /* Can release the context now */
1885 mq_stream_ctx_unref (ctx);
/* Only the pointer value is compared here; ctx is not dereferenced after
 * the unref above */
1886 if (ctx == splitmux->reference_ctx)
1887 splitmux->reference_ctx = NULL;
1889 /* Release and free the muxer input */
1891 gst_element_release_request_pad (splitmux->muxer, muxpad);
1892 gst_object_unref (muxpad);
1895 if (GST_PAD_PAD_TEMPLATE (pad) &&
1896 g_str_equal (GST_PAD_TEMPLATE_NAME_TEMPLATE (GST_PAD_PAD_TEMPLATE
1898 splitmux->have_video = FALSE;
1900 gst_element_remove_pad (element, pad);
1902 /* Reset the internal elements only after all request pads are released */
1903 if (splitmux->contexts == NULL)
1904 gst_splitmux_reset (splitmux);
1907 GST_SPLITMUX_UNLOCK (splitmux);
1911 create_element (GstSplitMuxSink * splitmux,
1912 const gchar * factory, const gchar * name, gboolean locked)
1914 GstElement *ret = gst_element_factory_make (factory, name);
1916 g_warning ("Failed to create %s - splitmuxsink will not work", name);
1921 /* Ensure the sink starts in locked state and NULL - it will be changed
1922 * by the filename setting code */
1923 gst_element_set_locked_state (ret, TRUE);
1924 gst_element_set_state (ret, GST_STATE_NULL);
1927 if (!gst_bin_add (GST_BIN (splitmux), ret)) {
1928 g_warning ("Could not add %s element - splitmuxsink will not work", name);
1929 gst_object_unref (ret);
1937 create_muxer (GstSplitMuxSink * splitmux)
1939 /* Create internal elements */
1940 if (splitmux->muxer == NULL) {
1941 GstElement *provided_muxer = NULL;
1943 GST_OBJECT_LOCK (splitmux);
1944 if (splitmux->provided_muxer != NULL)
1945 provided_muxer = gst_object_ref (splitmux->provided_muxer);
1946 GST_OBJECT_UNLOCK (splitmux);
1948 if (provided_muxer == NULL) {
1949 if ((splitmux->muxer =
1950 create_element (splitmux, "mp4mux", "muxer", FALSE)) == NULL)
1953 /* Ensure it's not in locked state (we might be reusing an old element) */
1954 gst_element_set_locked_state (provided_muxer, FALSE);
1955 if (!gst_bin_add (GST_BIN (splitmux), provided_muxer)) {
1956 g_warning ("Could not add muxer element - splitmuxsink will not work");
1957 gst_object_unref (provided_muxer);
1961 splitmux->muxer = provided_muxer;
1962 gst_object_unref (provided_muxer);
/* find_sink:
 * @e: a sink element, or a bin containing one
 *
 * Looks for the element that carries a "location" property. Two early checks
 * are made on @e itself: whether it is a bin at all, and whether it exposes
 * "location" directly. Otherwise the bin's sink elements are iterated and
 * each child is tested for a "location" property.
 *
 * The result is initialised to NULL; the lines that assign or return it are
 * not visible here, so the exact return value of each path should be
 * confirmed against the full function. */
1972 find_sink (GstElement * e)
1974 GstElement *res = NULL;
1976 gboolean done = FALSE;
1977 GValue data = { 0, };
1979 if (!GST_IS_BIN (e))
1982 if (g_object_class_find_property (G_OBJECT_GET_CLASS (e), "location") != NULL)
1985 iter = gst_bin_iterate_sinks (GST_BIN (e));
1987 switch (gst_iterator_next (iter, &data)) {
1988 case GST_ITERATOR_OK:
/* g_value_get_object() does not take a reference on the child */
1990 GstElement *child = g_value_get_object (&data);
1991 if (g_object_class_find_property (G_OBJECT_GET_CLASS (child),
1992 "location") != NULL) {
1996 g_value_reset (&data);
1999 case GST_ITERATOR_RESYNC:
2000 gst_iterator_resync (iter);
2002 case GST_ITERATOR_DONE:
2005 case GST_ITERATOR_ERROR:
2006 g_assert_not_reached ();
/* Release the iteration value and the iterator itself */
2010 g_value_unset (&data);
2011 gst_iterator_free (iter);
/* create_sink:
 * @splitmux: the splitmuxsink instance
 *
 * Makes sure an active sink exists and is linked to the muxer.
 *  - Without an application-provided sink, a DEFAULT_SINK element is created
 *    (state-locked, see create_element()) and used both as splitmux->sink
 *    and splitmux->active_sink.
 *  - With a provided sink, that element is state-locked, set to NULL, added
 *    to the bin and becomes active_sink; find_sink() then locates the
 *    element inside it that is stored as splitmux->sink.
 * The "async" property of the sink is switched off, guarded by a property
 * lookup on the sink's class, before the muxer is linked to the active
 * sink. */
2017 create_sink (GstSplitMuxSink * splitmux)
2019 GstElement *provided_sink = NULL;
2021 if (splitmux->active_sink == NULL) {
/* provided_sink is read under the object lock and kept alive with a
 * temporary reference */
2023 GST_OBJECT_LOCK (splitmux);
2024 if (splitmux->provided_sink != NULL)
2025 provided_sink = gst_object_ref (splitmux->provided_sink);
2026 GST_OBJECT_UNLOCK (splitmux);
2028 if (provided_sink == NULL) {
2029 if ((splitmux->sink =
2030 create_element (splitmux, DEFAULT_SINK, "sink", TRUE)) == NULL)
2032 splitmux->active_sink = splitmux->sink;
2034 /* Ensure the sink starts in locked state and NULL - it will be changed
2035 * by the filename setting code */
2036 gst_element_set_locked_state (provided_sink, TRUE);
2037 gst_element_set_state (provided_sink, GST_STATE_NULL);
2038 if (!gst_bin_add (GST_BIN (splitmux), provided_sink)) {
2039 g_warning ("Could not add sink elements - splitmuxsink will not work");
2040 gst_object_unref (provided_sink);
2044 splitmux->active_sink = provided_sink;
2046 /* The bin holds a ref now, we can drop our tmp ref */
2047 gst_object_unref (provided_sink);
2049 /* Find the sink element */
2050 splitmux->sink = find_sink (splitmux->active_sink);
2051 if (splitmux->sink == NULL) {
2053 ("Could not locate sink element in provided sink - splitmuxsink will not work");
2059 if (g_object_class_find_property (G_OBJECT_GET_CLASS (splitmux->sink),
2061 /* async child elements are causing state change races and weird
2062 * failures, so let's try and turn that off */
2063 g_object_set (splitmux->sink, "async", FALSE, NULL);
2067 if (!gst_element_link (splitmux->muxer, splitmux->active_sink)) {
2068 g_warning ("Failed to link muxer and sink- splitmuxsink will not work");
2079 #pragma GCC diagnostic ignored "-Wformat-nonliteral"
/* set_next_filename:
 * @splitmux: the splitmuxsink instance
 * @ctx: stream context whose current output buffer, caps and output segment
 *   are packed into the sample passed to the format-location-full signal
 *
 * Determines the location of the next fragment and sets it on the sink.
 * Sources for the name, in the order shown below:
 *  1. the SIGNAL_FORMAT_LOCATION_FULL signal (fragment id + sample);
 *  2. the SIGNAL_FORMAT_LOCATION signal (fragment id only);
 *  3. splitmux->location used as a printf-style pattern with the fragment
 *     id, or NULL if no location is set.
 * The fragment id is incremented afterwards.
 *
 * Because the location pattern is a caller-supplied format string, the
 * function is preceded by a pragma that silences -Wformat-nonliteral. */
2082 set_next_filename (GstSplitMuxSink * splitmux, MqStreamCtx * ctx)
2084 gchar *fname = NULL;
2088 gst_splitmux_sink_ensure_max_files (splitmux);
/* A missing buffer is only warned about; the sample is still built */
2090 if (ctx->cur_out_buffer == NULL) {
2091 GST_WARNING_OBJECT (splitmux, "Starting next file without buffer");
2094 caps = gst_pad_get_current_caps (ctx->srcpad);
2095 sample = gst_sample_new (ctx->cur_out_buffer, caps, &ctx->out_segment, NULL);
2096 g_signal_emit (splitmux, signals[SIGNAL_FORMAT_LOCATION_FULL], 0,
2097 splitmux->fragment_id, sample, &fname);
2098 gst_sample_unref (sample);
2100 gst_caps_unref (caps);
2102 if (fname == NULL) {
2103 /* Fallback to the old signal if the new one returned nothing */
2104 g_signal_emit (splitmux, signals[SIGNAL_FORMAT_LOCATION], 0,
2105 splitmux->fragment_id, &fname);
/* Last resort: expand the configured location pattern */
2109 fname = splitmux->location ?
2110 g_strdup_printf (splitmux->location, splitmux->fragment_id) : NULL;
2113 GST_INFO_OBJECT (splitmux, "Setting file to %s", fname);
2114 g_object_set (splitmux->sink, "location", fname, NULL);
2117 splitmux->fragment_id++;
/* do_async_start:
 * @splitmux: the splitmuxsink
 *
 * When splitmux->need_async_start is set, marks an async state change as
 * pending and hands an async-start message, with splitmux as its source,
 * to the parent bin class message handler. When the flag is not set, this
 * is only logged.
 * The caller visible in this file invokes this with the splitmux lock held.
 *
 * NOTE(review): the line(s) following the "no async_start needed" log are
 * not visible in this listing; presumably the function returns there.
 */
2122 do_async_start (GstSplitMuxSink * splitmux)
2124 GstMessage *message;
2126 if (!splitmux->need_async_start) {
2127 GST_INFO_OBJECT (splitmux, "no async_start needed");
/* Remember that an async-done must be sent later (see do_async_done) */
2131 splitmux->async_pending = TRUE;
2133 GST_INFO_OBJECT (splitmux, "Sending async_start message");
2134 message = gst_message_new_async_start (GST_OBJECT_CAST (splitmux));
/* Delivered straight to the parent class handler */
2135 GST_BIN_CLASS (parent_class)->handle_message (GST_BIN_CAST
2136 (splitmux), message);
/* do_async_done:
 * @splitmux: the splitmuxsink
 *
 * Counterpart of do_async_start(). If an async state change is pending,
 * hands an async-done message (created with GST_CLOCK_TIME_NONE) to the
 * parent bin class message handler and clears splitmux->async_pending.
 * splitmux->need_async_start is cleared as well.
 * The callers visible in this file invoke this with the splitmux lock held.
 *
 * NOTE(review): the assignment target of the message constructor and the
 * closing braces are not visible in this listing, so whether
 * need_async_start is cleared inside or outside the pending check cannot
 * be told from here.
 */
2140 do_async_done (GstSplitMuxSink * splitmux)
2142 GstMessage *message;
2144 if (splitmux->async_pending) {
2145 GST_INFO_OBJECT (splitmux, "Sending async_done message");
2147 gst_message_new_async_done (GST_OBJECT_CAST (splitmux),
2148 GST_CLOCK_TIME_NONE);
2149 GST_BIN_CLASS (parent_class)->handle_message (GST_BIN_CAST
2150 (splitmux), message);
2152 splitmux->async_pending = FALSE;
2155 splitmux->need_async_start = FALSE;
/* gst_splitmux_sink_change_state:
 * @element: the splitmuxsink, as a GstElement
 * @transition: the state transition being performed
 *
 * State change handler. Work is done in three phases:
 *  1. a switch that runs before chaining up: creating the muxer and sink
 *     (NULL->READY), resetting the input/output state machines and
 *     counters (READY->PAUSED), or marking both state machines stopped and
 *     waking waiting threads (PAUSED->READY and READY->NULL);
 *  2. chaining up to the parent class change_state;
 *  3. a second switch that runs after chaining up: flagging and starting
 *     the async state change, or resetting on READY->NULL.
 * A failed NULL->READY transition is cleaned up at the end.
 *
 * Returns: the resulting GstStateChangeReturn.
 *
 * NOTE(review): break/goto/return lines and closing braces are not visible
 * in this listing; the control flow between cases must be confirmed
 * against the complete file.
 */
2158 static GstStateChangeReturn
2159 gst_splitmux_sink_change_state (GstElement * element, GstStateChange transition)
2161 GstStateChangeReturn ret;
2162 GstSplitMuxSink *splitmux = (GstSplitMuxSink *) element;
/* Phase 1: work done before the parent class handles the transition */
2164 switch (transition) {
2165 case GST_STATE_CHANGE_NULL_TO_READY:{
2166 GST_SPLITMUX_LOCK (splitmux);
/* Both internal elements are required; failing either one fails the
 * transition */
2167 if (!create_muxer (splitmux) || !create_sink (splitmux)) {
2168 ret = GST_STATE_CHANGE_FAILURE;
2169 GST_SPLITMUX_UNLOCK (splitmux);
2172 GST_SPLITMUX_UNLOCK (splitmux);
/* Fragment numbering starts over */
2173 splitmux->fragment_id = 0;
2176 case GST_STATE_CHANGE_READY_TO_PAUSED:{
2177 GST_SPLITMUX_LOCK (splitmux);
2178 /* Start by collecting one input on each pad */
2179 splitmux->input_state = SPLITMUX_INPUT_STATE_COLLECTING_GOP_START;
2180 splitmux->output_state = SPLITMUX_OUTPUT_STATE_START_NEXT_FILE;
/* No running time, GOP start or fragment start known yet; nothing muxed */
2181 splitmux->max_in_running_time = GST_CLOCK_STIME_NONE;
2182 splitmux->gop_start_time = splitmux->fragment_start_time =
2183 GST_CLOCK_STIME_NONE;
2184 splitmux->muxed_out_bytes = 0;
2185 splitmux->ready_for_output = FALSE;
2186 GST_SPLITMUX_UNLOCK (splitmux);
/* Both downward transitions share the same handling */
2189 case GST_STATE_CHANGE_PAUSED_TO_READY:
2190 case GST_STATE_CHANGE_READY_TO_NULL:
2191 GST_SPLITMUX_LOCK (splitmux);
2192 splitmux->output_state = SPLITMUX_OUTPUT_STATE_STOPPED;
2193 splitmux->input_state = SPLITMUX_INPUT_STATE_STOPPED;
2194 /* Wake up any blocked threads */
2195 GST_LOG_OBJECT (splitmux,
2196 "State change -> NULL or READY. Waking threads");
2197 GST_SPLITMUX_BROADCAST_INPUT (splitmux);
2198 GST_SPLITMUX_BROADCAST_OUTPUT (splitmux);
2199 GST_SPLITMUX_UNLOCK (splitmux);
/* Phase 2: let the parent class perform the transition */
2205 ret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
2206 if (ret == GST_STATE_CHANGE_FAILURE)
/* Phase 3: work done after the parent class handled the transition */
2209 switch (transition) {
2210 case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
2211 splitmux->need_async_start = TRUE;
2213 case GST_STATE_CHANGE_READY_TO_PAUSED:{
2214 /* Change state async, because our child sink might not
2215 * be ready to do that for us yet if its state is still locked */
2217 splitmux->need_async_start = TRUE;
2218 /* we want to go async to PAUSED until we managed to configure and add the
2220 GST_SPLITMUX_LOCK (splitmux);
2221 do_async_start (splitmux);
2222 GST_SPLITMUX_UNLOCK (splitmux);
2223 ret = GST_STATE_CHANGE_ASYNC;
2226 case GST_STATE_CHANGE_READY_TO_NULL:
2227 GST_SPLITMUX_LOCK (splitmux);
2228 splitmux->fragment_id = 0;
2229 /* Reset internal elements only if no pad contexts are using them */
2230 if (splitmux->contexts == NULL)
2231 gst_splitmux_reset (splitmux);
/* Complete any async state change that is still pending */
2232 do_async_done (splitmux);
2233 GST_SPLITMUX_UNLOCK (splitmux);
2240 if (transition == GST_STATE_CHANGE_NULL_TO_READY &&
2241 ret == GST_STATE_CHANGE_FAILURE) {
2242 /* Cleanup elements on failed transition out of NULL */
/* NOTE(review): this reset is called before the lock is taken, unlike the
 * READY->NULL path above which resets under the lock - confirm intent */
2243 gst_splitmux_reset (splitmux);
2244 GST_SPLITMUX_LOCK (splitmux);
2245 do_async_done (splitmux);
2246 GST_SPLITMUX_UNLOCK (splitmux);
/* register_splitmuxsink:
 * @plugin: the plugin to register the element with
 *
 * Initialises the "splitmuxsink" debug category and registers the
 * GST_TYPE_SPLITMUX_SINK element type under the name "splitmuxsink" with
 * rank GST_RANK_NONE.
 *
 * Returns: the result of gst_element_register().
 */
2252 register_splitmuxsink (GstPlugin * plugin)
2254 GST_DEBUG_CATEGORY_INIT (splitmux_debug, "splitmuxsink", 0,
2255 "Split File Muxing Sink");
2257 return gst_element_register (plugin, "splitmuxsink", GST_RANK_NONE,
2258 GST_TYPE_SPLITMUX_SINK);
2262 gst_splitmux_sink_ensure_max_files (GstSplitMuxSink * splitmux)
2264 if (splitmux->max_files && splitmux->fragment_id >= splitmux->max_files) {
2265 splitmux->fragment_id = 0;