1 /* GStreamer Muxer bin that splits output stream by size/time
2 * Copyright (C) <2014> Jan Schmidt <jan@centricular.com>
4 * This library is free software; you can redistribute it and/or
5 * modify it under the terms of the GNU Library General Public
6 * License as published by the Free Software Foundation; either
7 * version 2 of the License, or (at your option) any later version.
9 * This library is distributed in the hope that it will be useful,
10 * but WITHOUT ANY WARRANTY; without even the implied warranty of
11 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
12 * Library General Public License for more details.
14 * You should have received a copy of the GNU Library General Public
15 * License along with this library; if not, write to the
16 * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
17 * Boston, MA 02110-1301, USA.
21 * SECTION:element-splitmuxsink
22 * @short_description: Muxer wrapper for splitting output stream by size or time
24 * This element wraps a muxer and a sink, and starts a new file when the mux
25 * contents are about to cross a threshold of maximum size or maximum time,
26 * splitting at video keyframe boundaries. Exactly one input video stream
27 * can be muxed, with as many accompanying audio and subtitle streams as
30 * By default, it uses mp4mux and filesink, but they can be changed via
31 * the 'muxer' and 'sink' properties.
33 * The minimum file size is 1 GOP, however - so limits may be overrun if the
34 * distance between any 2 keyframes is larger than the limits.
36 * If a video stream is available, the splitting process is driven by the video
37 * stream contents, and the video stream must contain closed GOPs for the output
38 * file parts to be played individually correctly. In the absence of a video
39 * stream, the first available stream is used as reference for synchronization.
42 * <title>Example pipelines</title>
44 * gst-launch-1.0 -e v4l2src num-buffers=500 ! video/x-raw,width=320,height=240 ! videoconvert ! queue ! timeoverlay ! x264enc key-int-max=10 ! h264parse ! splitmuxsink location=video%02d.mov max-size-time=10000000000 max-size-bytes=1000000
46 * Records a video stream captured from a v4l2 device and muxes it into
47 * ISO mp4 files, splitting as needed to limit size/duration to 10 seconds
48 * and 1MB maximum size.
57 #include <glib/gstdio.h>
58 #include <gst/video/video.h>
59 #include "gstsplitmuxsink.h"
// Debug category for this element; GST_CAT_DEFAULT makes it the default
// category for the logging macros used in this file.
61 GST_DEBUG_CATEGORY_STATIC (splitmux_debug);
62 #define GST_CAT_DEFAULT splitmux_debug
// Take / release the element-wide mutex (s)->lock.
64 #define GST_SPLITMUX_LOCK(s) g_mutex_lock(&(s)->lock)
65 #define GST_SPLITMUX_UNLOCK(s) g_mutex_unlock(&(s)->lock)
// Wait on / wake the input-side condition. The wait is paired with (s)->lock,
// so the caller must hold that lock when waiting.
66 #define GST_SPLITMUX_WAIT_INPUT(s) g_cond_wait (&(s)->input_cond, &(s)->lock)
67 #define GST_SPLITMUX_BROADCAST_INPUT(s) g_cond_broadcast (&(s)->input_cond)
// Same pattern for the output-side condition.
69 #define GST_SPLITMUX_WAIT_OUTPUT(s) g_cond_wait (&(s)->output_cond, &(s)->lock)
70 #define GST_SPLITMUX_BROADCAST_OUTPUT(s) g_cond_broadcast (&(s)->output_cond)
78 PROP_MAX_SIZE_TIMECODE,
79 PROP_SEND_KEYFRAME_REQUESTS,
// Compile-time defaults for the element properties. For the time and byte
// limits the property blurbs document 0 as "disable".
86 #define DEFAULT_MAX_SIZE_TIME 0
87 #define DEFAULT_MAX_SIZE_BYTES 0
88 #define DEFAULT_MAX_FILES 0
// Fraction of extra size attributed to muxing (0.02 = 2%).
89 #define DEFAULT_MUXER_OVERHEAD 0.02
90 #define DEFAULT_SEND_KEYFRAME_REQUESTS FALSE
// Element factory names used when no muxer / sink is supplied via properties.
91 #define DEFAULT_MUXER "mp4mux"
92 #define DEFAULT_SINK "filesink"
96 SIGNAL_FORMAT_LOCATION,
97 SIGNAL_FORMAT_LOCATION_FULL,
101 static guint signals[SIGNAL_LAST];
103 static GstStaticPadTemplate video_sink_template =
104 GST_STATIC_PAD_TEMPLATE ("video",
107 GST_STATIC_CAPS_ANY);
108 static GstStaticPadTemplate audio_sink_template =
109 GST_STATIC_PAD_TEMPLATE ("audio_%u",
112 GST_STATIC_CAPS_ANY);
113 static GstStaticPadTemplate subtitle_sink_template =
114 GST_STATIC_PAD_TEMPLATE ("subtitle_%u",
117 GST_STATIC_CAPS_ANY);
119 static GQuark PAD_CONTEXT;
124 PAD_CONTEXT = g_quark_from_static_string ("pad-context");
127 #define gst_splitmux_sink_parent_class parent_class
128 G_DEFINE_TYPE_EXTENDED (GstSplitMuxSink, gst_splitmux_sink, GST_TYPE_BIN, 0,
// Forward declarations for file-local helpers and the GObject / GstElement /
// GstBin virtual method implementations installed in class_init.
131 static gboolean create_muxer (GstSplitMuxSink * splitmux);
132 static gboolean create_sink (GstSplitMuxSink * splitmux);
133 static void gst_splitmux_sink_set_property (GObject * object, guint prop_id,
134 const GValue * value, GParamSpec * pspec);
135 static void gst_splitmux_sink_get_property (GObject * object, guint prop_id,
136 GValue * value, GParamSpec * pspec);
137 static void gst_splitmux_sink_dispose (GObject * object);
138 static void gst_splitmux_sink_finalize (GObject * object);
140 static GstPad *gst_splitmux_sink_request_new_pad (GstElement * element,
141 GstPadTemplate * templ, const gchar * name, const GstCaps * caps);
142 static void gst_splitmux_sink_release_pad (GstElement * element, GstPad * pad);
144 static GstStateChangeReturn gst_splitmux_sink_change_state (GstElement *
145 element, GstStateChange transition);
147 static void bus_handler (GstBin * bin, GstMessage * msg);
148 static void set_next_filename (GstSplitMuxSink * splitmux, MqStreamCtx * ctx);
149 static void start_next_fragment (GstSplitMuxSink * splitmux, MqStreamCtx * ctx);
150 static void mq_stream_ctx_unref (MqStreamCtx * ctx);
151 static void grow_blocked_queues (GstSplitMuxSink * splitmux);
153 static void gst_splitmux_sink_ensure_max_files (GstSplitMuxSink * splitmux);
154 static GstElement *create_element (GstSplitMuxSink * splitmux,
155 const gchar * factory, const gchar * name, gboolean locked);
157 static void do_async_done (GstSplitMuxSink * splitmux);
// Allocate a zero-filled MqStreamBuf with the GLib slice allocator.
// The matching release helper is mq_stream_buf_free().
160 mq_stream_buf_new (void)
162 return g_slice_new0 (MqStreamBuf);
// Return a MqStreamBuf obtained from mq_stream_buf_new() to the slice allocator.
166 mq_stream_buf_free (MqStreamBuf * data)
168 g_slice_free (MqStreamBuf, data);
// Allocate a zero-filled SplitMuxOutputCommand with the GLib slice allocator.
// The matching release helper is out_cmd_buf_free().
171 static SplitMuxOutputCommand *
172 out_cmd_buf_new (void)
174 return g_slice_new0 (SplitMuxOutputCommand);
// Return a SplitMuxOutputCommand from out_cmd_buf_new() to the slice allocator.
178 out_cmd_buf_free (SplitMuxOutputCommand * data)
180 g_slice_free (SplitMuxOutputCommand, data);
184 gst_splitmux_sink_class_init (GstSplitMuxSinkClass * klass)
186 GObjectClass *gobject_class = (GObjectClass *) klass;
187 GstElementClass *gstelement_class = (GstElementClass *) klass;
188 GstBinClass *gstbin_class = (GstBinClass *) klass;
190 gobject_class->set_property = gst_splitmux_sink_set_property;
191 gobject_class->get_property = gst_splitmux_sink_get_property;
192 gobject_class->dispose = gst_splitmux_sink_dispose;
193 gobject_class->finalize = gst_splitmux_sink_finalize;
195 gst_element_class_set_static_metadata (gstelement_class,
196 "Split Muxing Bin", "Generic/Bin/Muxer",
197 "Convenience bin that muxes incoming streams into multiple time/size limited files",
198 "Jan Schmidt <jan@centricular.com>");
200 gst_element_class_add_static_pad_template (gstelement_class,
201 &video_sink_template);
202 gst_element_class_add_static_pad_template (gstelement_class,
203 &audio_sink_template);
204 gst_element_class_add_static_pad_template (gstelement_class,
205 &subtitle_sink_template);
207 gstelement_class->change_state =
208 GST_DEBUG_FUNCPTR (gst_splitmux_sink_change_state);
209 gstelement_class->request_new_pad =
210 GST_DEBUG_FUNCPTR (gst_splitmux_sink_request_new_pad);
211 gstelement_class->release_pad =
212 GST_DEBUG_FUNCPTR (gst_splitmux_sink_release_pad);
214 gstbin_class->handle_message = bus_handler;
216 g_object_class_install_property (gobject_class, PROP_LOCATION,
217 g_param_spec_string ("location", "File Output Pattern",
218 "Format string pattern for the location of the files to write (e.g. video%05d.mp4)",
219 NULL, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
220 g_object_class_install_property (gobject_class, PROP_MUXER_OVERHEAD,
221 g_param_spec_double ("mux-overhead", "Muxing Overhead",
222 "Extra size overhead of muxing (0.02 = 2%)", 0.0, 1.0,
223 DEFAULT_MUXER_OVERHEAD,
224 G_PARAM_READWRITE | GST_PARAM_CONTROLLABLE | G_PARAM_STATIC_STRINGS));
226 g_object_class_install_property (gobject_class, PROP_MAX_SIZE_TIME,
227 g_param_spec_uint64 ("max-size-time", "Max. size (ns)",
228 "Max. amount of time per file (in ns, 0=disable)", 0, G_MAXUINT64,
229 DEFAULT_MAX_SIZE_TIME, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
230 g_object_class_install_property (gobject_class, PROP_MAX_SIZE_BYTES,
231 g_param_spec_uint64 ("max-size-bytes", "Max. size bytes",
232 "Max. amount of data per file (in bytes, 0=disable)", 0, G_MAXUINT64,
233 DEFAULT_MAX_SIZE_BYTES, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
234 g_object_class_install_property (gobject_class, PROP_MAX_SIZE_TIMECODE,
235 g_param_spec_string ("max-size-timecode", "Maximum timecode difference",
236 "Maximum difference in timecode between first and last frame. "
237 "Separator is assumed to be \":\" everywhere (e.g. 01:00:00:00). "
238 "Will only be effective if a timecode track is present.",
239 NULL, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
240 g_object_class_install_property (gobject_class, PROP_SEND_KEYFRAME_REQUESTS,
241 g_param_spec_boolean ("send-keyframe-requests",
242 "Request keyframes at max-size-time",
243 "Request a keyframe every max-size-time ns to try splitting at that point. "
244 "Needs max-size-bytes to be 0 in order to be effective.",
245 DEFAULT_SEND_KEYFRAME_REQUESTS,
246 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
247 g_object_class_install_property (gobject_class, PROP_MAX_FILES,
248 g_param_spec_uint ("max-files", "Max files",
249 "Maximum number of files to keep on disk. Once the maximum is reached,"
250 "old files start to be deleted to make room for new ones.", 0,
251 G_MAXUINT, DEFAULT_MAX_FILES,
252 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
255 g_object_class_install_property (gobject_class, PROP_MUXER,
256 g_param_spec_object ("muxer", "Muxer",
257 "The muxer element to use (NULL = default mp4mux)",
258 GST_TYPE_ELEMENT, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
259 g_object_class_install_property (gobject_class, PROP_SINK,
260 g_param_spec_object ("sink", "Sink",
261 "The sink element (or element chain) to use (NULL = default filesink)",
262 GST_TYPE_ELEMENT, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
265 * GstSplitMuxSink::format-location:
266 * @splitmux: the #GstSplitMuxSink
267 * @fragment_id: the sequence number of the file to be created
269 * Returns: the location to be used for the next output file
271 signals[SIGNAL_FORMAT_LOCATION] =
272 g_signal_new ("format-location", G_TYPE_FROM_CLASS (klass),
273 G_SIGNAL_RUN_LAST, 0, NULL, NULL, NULL, G_TYPE_STRING, 1, G_TYPE_UINT);
276 * GstSplitMuxSink::format-location-full:
277 * @splitmux: the #GstSplitMuxSink
278 * @fragment_id: the sequence number of the file to be created
279 * @first_sample: A #GstSample containing the first buffer
280 * from the reference stream in the new file
282 * Returns: the location to be used for the next output file
284 signals[SIGNAL_FORMAT_LOCATION_FULL] =
285 g_signal_new ("format-location-full", G_TYPE_FROM_CLASS (klass),
286 G_SIGNAL_RUN_LAST, 0, NULL, NULL, NULL, G_TYPE_STRING, 2, G_TYPE_UINT,
// Instance initialiser: prepares the element lock, both condition variables
// and the output command queue, then loads the compile-time property defaults.
291 gst_splitmux_sink_init (GstSplitMuxSink * splitmux)
293 g_mutex_init (&splitmux->lock);
294 g_cond_init (&splitmux->input_cond);
295 g_cond_init (&splitmux->output_cond);
296 g_queue_init (&splitmux->out_cmd_q);
298 splitmux->mux_overhead = DEFAULT_MUXER_OVERHEAD;
299 splitmux->threshold_time = DEFAULT_MAX_SIZE_TIME;
300 splitmux->threshold_bytes = DEFAULT_MAX_SIZE_BYTES;
301 splitmux->max_files = DEFAULT_MAX_FILES;
302 splitmux->send_keyframe_requests = DEFAULT_SEND_KEYFRAME_REQUESTS;
// No timecode-based split point is known yet.
303 splitmux->next_max_tc_time = GST_CLOCK_TIME_NONE;
305 splitmux->threshold_timecode_str = NULL;
// Flag the bin itself as a sink element.
307 GST_OBJECT_FLAG_SET (splitmux, GST_ELEMENT_FLAG_SINK);
// Tear down the internal muxer and sink: each one that exists is state-locked,
// set to NULL state and removed from the bin, then all three cached child
// pointers are cleared.
311 gst_splitmux_reset (GstSplitMuxSink * splitmux)
313 if (splitmux->muxer) {
314 gst_element_set_locked_state (splitmux->muxer, TRUE);
315 gst_element_set_state (splitmux->muxer, GST_STATE_NULL);
316 gst_bin_remove (GST_BIN (splitmux), splitmux->muxer);
318 if (splitmux->active_sink) {
319 gst_element_set_locked_state (splitmux->active_sink, TRUE);
320 gst_element_set_state (splitmux->active_sink, GST_STATE_NULL);
321 gst_bin_remove (GST_BIN (splitmux), splitmux->active_sink);
324 splitmux->sink = splitmux->active_sink = splitmux->muxer = NULL;
// GObject dispose handler: forgets the cached child element pointers and then
// chains up to the parent class.
328 gst_splitmux_sink_dispose (GObject * object)
330 GstSplitMuxSink *splitmux = GST_SPLITMUX_SINK (object);
332 /* Calling parent dispose invalidates all child pointers */
333 splitmux->sink = splitmux->active_sink = splitmux->muxer = NULL;
335 G_OBJECT_CLASS (parent_class)->dispose (object);
// GObject finalize handler: releases the synchronisation primitives, any
// queued output commands, the user-provided sink/muxer references, the owned
// strings and all remaining stream contexts, then chains up.
339 gst_splitmux_sink_finalize (GObject * object)
341 GstSplitMuxSink *splitmux = GST_SPLITMUX_SINK (object);
342 g_cond_clear (&splitmux->input_cond);
343 g_cond_clear (&splitmux->output_cond);
344 g_mutex_clear (&splitmux->lock);
// Free every pending command before emptying the queue itself.
345 g_queue_foreach (&splitmux->out_cmd_q, (GFunc) out_cmd_buf_free, NULL);
346 g_queue_clear (&splitmux->out_cmd_q);
// Drop the references taken in set_property.
348 if (splitmux->provided_sink)
349 gst_object_unref (splitmux->provided_sink);
350 if (splitmux->provided_muxer)
351 gst_object_unref (splitmux->provided_muxer);
// NOTE(review): g_free() accepts NULL, so this guard is redundant.
353 if (splitmux->threshold_timecode_str)
354 g_free (splitmux->threshold_timecode_str);
356 g_free (splitmux->location);
358 /* Make sure to free any un-released contexts */
359 g_list_foreach (splitmux->contexts, (GFunc) mq_stream_ctx_unref, NULL);
360 g_list_free (splitmux->contexts);
362 G_OBJECT_CLASS (parent_class)->finalize (object);
366 gst_splitmux_sink_set_property (GObject * object, guint prop_id,
367 const GValue * value, GParamSpec * pspec)
369 GstSplitMuxSink *splitmux = GST_SPLITMUX_SINK (object);
373 GST_OBJECT_LOCK (splitmux);
374 g_free (splitmux->location);
375 splitmux->location = g_value_dup_string (value);
376 GST_OBJECT_UNLOCK (splitmux);
379 case PROP_MAX_SIZE_BYTES:
380 GST_OBJECT_LOCK (splitmux);
381 splitmux->threshold_bytes = g_value_get_uint64 (value);
382 GST_OBJECT_UNLOCK (splitmux);
384 case PROP_MAX_SIZE_TIME:
385 GST_OBJECT_LOCK (splitmux);
386 splitmux->threshold_time = g_value_get_uint64 (value);
387 GST_OBJECT_UNLOCK (splitmux);
389 case PROP_MAX_SIZE_TIMECODE:
390 GST_OBJECT_LOCK (splitmux);
391 splitmux->threshold_timecode_str = g_value_dup_string (value);
392 GST_OBJECT_UNLOCK (splitmux);
394 case PROP_SEND_KEYFRAME_REQUESTS:
395 GST_OBJECT_LOCK (splitmux);
396 splitmux->send_keyframe_requests = g_value_get_boolean (value);
397 GST_OBJECT_UNLOCK (splitmux);
400 GST_OBJECT_LOCK (splitmux);
401 splitmux->max_files = g_value_get_uint (value);
402 GST_OBJECT_UNLOCK (splitmux);
404 case PROP_MUXER_OVERHEAD:
405 GST_OBJECT_LOCK (splitmux);
406 splitmux->mux_overhead = g_value_get_double (value);
407 GST_OBJECT_UNLOCK (splitmux);
410 GST_OBJECT_LOCK (splitmux);
411 if (splitmux->provided_sink)
412 gst_object_unref (splitmux->provided_sink);
413 splitmux->provided_sink = g_value_get_object (value);
414 gst_object_ref_sink (splitmux->provided_sink);
415 GST_OBJECT_UNLOCK (splitmux);
418 GST_OBJECT_LOCK (splitmux);
419 if (splitmux->provided_muxer)
420 gst_object_unref (splitmux->provided_muxer);
421 splitmux->provided_muxer = g_value_get_object (value);
422 gst_object_ref_sink (splitmux->provided_muxer);
423 GST_OBJECT_UNLOCK (splitmux);
426 G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
// GObject get_property handler. Each visible branch copies one field of the
// element into value while holding the object lock; unknown ids are reported
// through G_OBJECT_WARN_INVALID_PROPERTY_ID.
// NOTE(review): several case labels are not visible in this excerpt; the
// fields read in those branches are location, max_files, provided_sink and
// provided_muxer.
432 gst_splitmux_sink_get_property (GObject * object, guint prop_id,
433 GValue * value, GParamSpec * pspec)
435 GstSplitMuxSink *splitmux = GST_SPLITMUX_SINK (object);
439 GST_OBJECT_LOCK (splitmux);
440 g_value_set_string (value, splitmux->location);
441 GST_OBJECT_UNLOCK (splitmux);
443 case PROP_MAX_SIZE_BYTES:
444 GST_OBJECT_LOCK (splitmux);
445 g_value_set_uint64 (value, splitmux->threshold_bytes);
446 GST_OBJECT_UNLOCK (splitmux);
448 case PROP_MAX_SIZE_TIME:
449 GST_OBJECT_LOCK (splitmux);
450 g_value_set_uint64 (value, splitmux->threshold_time);
451 GST_OBJECT_UNLOCK (splitmux);
453 case PROP_MAX_SIZE_TIMECODE:
454 GST_OBJECT_LOCK (splitmux);
455 g_value_set_string (value, splitmux->threshold_timecode_str);
456 GST_OBJECT_UNLOCK (splitmux);
458 case PROP_SEND_KEYFRAME_REQUESTS:
459 GST_OBJECT_LOCK (splitmux);
460 g_value_set_boolean (value, splitmux->send_keyframe_requests);
461 GST_OBJECT_UNLOCK (splitmux);
464 GST_OBJECT_LOCK (splitmux);
465 g_value_set_uint (value, splitmux->max_files);
466 GST_OBJECT_UNLOCK (splitmux);
468 case PROP_MUXER_OVERHEAD:
469 GST_OBJECT_LOCK (splitmux);
470 g_value_set_double (value, splitmux->mux_overhead);
471 GST_OBJECT_UNLOCK (splitmux);
// Reports the user-provided sink, not the sink currently in use.
474 GST_OBJECT_LOCK (splitmux);
475 g_value_set_object (value, splitmux->provided_sink);
476 GST_OBJECT_UNLOCK (splitmux);
// Reports the user-provided muxer, not the muxer currently in use.
479 GST_OBJECT_LOCK (splitmux);
480 g_value_set_object (value, splitmux->provided_muxer);
481 GST_OBJECT_UNLOCK (splitmux);
484 G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
489 /* Convenience function */
// Maps val to a signed running time within segment. The result starts out as
// GST_CLOCK_STIME_NONE and a conversion is only attempted when val is a valid
// clock time.
// NOTE(review): how the return value of gst_segment_to_running_time_full()
// is applied to the result is not visible in this excerpt.
490 static inline GstClockTimeDiff
491 my_segment_to_running_time (GstSegment * segment, GstClockTime val)
493 GstClockTimeDiff res = GST_CLOCK_STIME_NONE;
495 if (GST_CLOCK_TIME_IS_VALID (val)) {
// val is overwritten in place with the converted value.
497 gst_segment_to_running_time_full (segment, GST_FORMAT_TIME, val, &val);
// Create a per-stream context for splitmux. The context starts with a
// reference count of 1, undefined in/out segments, no known running times and
// an empty queue of buffer records.
507 mq_stream_ctx_new (GstSplitMuxSink * splitmux)
511 ctx = g_new0 (MqStreamCtx, 1);
512 g_atomic_int_set (&ctx->refcount, 1);
// Plain pointer assignment; no reference on splitmux is taken here.
513 ctx->splitmux = splitmux;
514 gst_segment_init (&ctx->in_segment, GST_FORMAT_UNDEFINED);
515 gst_segment_init (&ctx->out_segment, GST_FORMAT_UNDEFINED);
516 ctx->in_running_time = ctx->out_running_time = GST_CLOCK_STIME_NONE;
517 g_queue_init (&ctx->queued_bufs);
// Release everything a stream context owns: its queue element, the cached
// keyframe buffer, both pad references and all queued buffer records.
// NOTE(review): the queue teardown is presumably guarded by a check that
// ctx->q exists; that line is not visible in this excerpt.
522 mq_stream_ctx_free (MqStreamCtx * ctx)
// Stop overrun notifications before shutting the queue down.
525 g_signal_handler_disconnect (ctx->q, ctx->q_overrun_id);
526 gst_element_set_locked_state (ctx->q, TRUE);
527 gst_element_set_state (ctx->q, GST_STATE_NULL);
528 gst_bin_remove (GST_BIN (ctx->splitmux), ctx->q);
529 gst_object_unref (ctx->q);
531 gst_buffer_replace (&ctx->prev_in_keyframe, NULL);
532 gst_object_unref (ctx->sinkpad);
533 gst_object_unref (ctx->srcpad);
534 g_queue_foreach (&ctx->queued_bufs, (GFunc) mq_stream_buf_free, NULL);
535 g_queue_clear (&ctx->queued_bufs);
// Atomically drop one reference; the context is freed when the count hits zero.
540 mq_stream_ctx_unref (MqStreamCtx * ctx)
542 if (g_atomic_int_dec_and_test (&ctx->refcount))
543 mq_stream_ctx_free (ctx);
// Atomically take one additional reference on the context.
547 mq_stream_ctx_ref (MqStreamCtx * ctx)
549 g_atomic_int_inc (&ctx->refcount);
// Destroy-notify for the sink-side pad probe: forgets the probe id and drops
// one reference on the context.
553 _pad_block_destroy_sink_notify (MqStreamCtx * ctx)
555 ctx->sink_pad_block_id = 0;
556 mq_stream_ctx_unref (ctx);
// Destroy-notify for the src-side pad probe: forgets the probe id and drops
// one reference on the context.
560 _pad_block_destroy_src_notify (MqStreamCtx * ctx)
562 ctx->src_pad_block_id = 0;
563 mq_stream_ctx_unref (ctx);
// Post an element message announcing that a fragment was opened or closed.
// The message structure is named "splitmuxsink-fragment-opened" or
// "splitmuxsink-fragment-closed" and carries the sink's current "location"
// plus the reference stream's output running time.
567 send_fragment_opened_closed_msg (GstSplitMuxSink * splitmux, gboolean opened)
569 gchar *location = NULL;
571 const gchar *msg_name = opened ?
572 "splitmuxsink-fragment-opened" : "splitmuxsink-fragment-closed";
// NOTE(review): g_object_get hands back a newly allocated string that must be
// freed after use; the release is not visible in this excerpt.
574 g_object_get (splitmux->sink, "location", &location, NULL);
576 msg = gst_message_new_element (GST_OBJECT (splitmux),
577 gst_structure_new (msg_name,
578 "location", G_TYPE_STRING, location,
579 "running-time", GST_TYPE_CLOCK_TIME,
580 splitmux->reference_ctx->out_running_time, NULL));
581 gst_element_post_message (GST_ELEMENT_CAST (splitmux), msg);
586 /* Called with lock held, drops the lock to send EOS to the
// Sends a fresh EOS event to the peer of ctx->srcpad. The splitmux lock is
// released only around gst_pad_send_event() and re-taken right after.
590 send_eos (GstSplitMuxSink * splitmux, MqStreamCtx * ctx)
595 eos = gst_event_new_eos ();
596 pad = gst_pad_get_peer (ctx->srcpad);
600 GST_INFO_OBJECT (splitmux, "Sending EOS on %" GST_PTR_FORMAT, pad);
601 GST_SPLITMUX_UNLOCK (splitmux);
602 gst_pad_send_event (pad, eos);
603 GST_SPLITMUX_LOCK (splitmux);
// Drop the reference returned by gst_pad_get_peer().
605 gst_object_unref (pad);
608 /* Called with splitmux lock held to check if this output
609 * context needs to sleep to wait for the release of the
610 * next GOP, or to send EOS to close out the current file
// Output-side state machine step for one stream context. Depending on
// splitmux->output_state it lets the caller continue, ends the current file,
// starts the next fragment, consumes a queued output command, or sleeps on the
// output condition.
// NOTE(review): the surrounding loop and several early exits are not visible
// in this excerpt.
613 complete_or_wait_on_out (GstSplitMuxSink * splitmux, MqStreamCtx * ctx)
616 /* When first starting up, the reference stream has to output
617 * the first buffer to prepare the muxer and sink */
618 gboolean can_output = (ctx->is_reference || splitmux->ready_for_output);
621 || splitmux->output_state == SPLITMUX_OUTPUT_STATE_STOPPED)
624 GST_LOG_OBJECT (ctx->srcpad,
625 "Checking running time %" GST_STIME_FORMAT " against max %"
626 GST_STIME_FORMAT, GST_STIME_ARGS (ctx->out_running_time),
627 GST_STIME_ARGS (splitmux->max_out_running_time));
// True when no output limit is set or this context is still below it.
630 if (splitmux->max_out_running_time == GST_CLOCK_STIME_NONE ||
631 ctx->out_running_time < splitmux->max_out_running_time) {
635 switch (splitmux->output_state) {
636 case SPLITMUX_OUTPUT_STATE_OUTPUT_GOP:
637 /* We only get here if we've finished outputting a GOP and need to know
639 splitmux->output_state = SPLITMUX_OUTPUT_STATE_AWAITING_COMMAND;
640 GST_SPLITMUX_BROADCAST_OUTPUT (splitmux);
643 case SPLITMUX_OUTPUT_STATE_ENDING_FILE:
644 /* We've reached the max out running_time to get here, so end this file now */
645 if (ctx->out_eos == FALSE) {
646 send_eos (splitmux, ctx);
650 case SPLITMUX_OUTPUT_STATE_START_NEXT_FILE:
651 if (ctx->is_reference) {
652 /* Special handling on the reference ctx to start new fragments
653 * and collect commands from the command queue */
654 /* drops the splitmux lock briefly: */
655 start_next_fragment (splitmux, ctx);
660 case SPLITMUX_OUTPUT_STATE_AWAITING_COMMAND:{
// Commands are taken from the tail of the command queue.
662 SplitMuxOutputCommand *cmd =
663 g_queue_pop_tail (&splitmux->out_cmd_q);
665 /* If we pop the last command, we need to make our queues bigger */
666 if (g_queue_get_length (&splitmux->out_cmd_q) == 0)
667 grow_blocked_queues (splitmux);
669 if (cmd->start_new_fragment) {
670 GST_DEBUG_OBJECT (splitmux, "Got cmd to start new fragment");
671 splitmux->output_state = SPLITMUX_OUTPUT_STATE_ENDING_FILE;
673 GST_DEBUG_OBJECT (splitmux,
674 "Got new output cmd for time %" GST_STIME_FORMAT,
675 GST_STIME_ARGS (cmd->max_output_ts));
677 /* Extend the output range immediately */
678 splitmux->max_out_running_time = cmd->max_output_ts;
679 splitmux->output_state = SPLITMUX_OUTPUT_STATE_OUTPUT_GOP;
681 GST_SPLITMUX_BROADCAST_OUTPUT (splitmux);
// The command has been consumed and is released here.
683 out_cmd_buf_free (cmd);
686 GST_SPLITMUX_WAIT_OUTPUT (splitmux);
688 } while (splitmux->output_state ==
689 SPLITMUX_OUTPUT_STATE_AWAITING_COMMAND);
690 /* loop and re-check the state */
693 case SPLITMUX_OUTPUT_STATE_STOPPED:
698 GST_INFO_OBJECT (ctx->srcpad,
699 "Sleeping for running time %"
700 GST_STIME_FORMAT " (max %" GST_STIME_FORMAT ") or state change.",
701 GST_STIME_ARGS (ctx->out_running_time),
702 GST_STIME_ARGS (splitmux->max_out_running_time));
// g_cond_wait releases the splitmux lock while sleeping and re-takes it on wake.
703 GST_SPLITMUX_WAIT_OUTPUT (splitmux);
704 GST_INFO_OBJECT (ctx->srcpad,
705 "Woken for new max running time %" GST_STIME_FORMAT,
706 GST_STIME_ARGS (splitmux->max_out_running_time));
712 calculate_next_max_timecode (GstSplitMuxSink * splitmux,
713 const GstVideoTimeCode * cur_tc)
715 GstVideoTimeCode *target_tc;
716 GstVideoTimeCodeInterval *tc_inter;
717 GstClockTime cur_tc_time, target_tc_time, next_max_tc_time;
719 if (cur_tc == NULL || splitmux->threshold_timecode_str == NULL)
720 return GST_CLOCK_TIME_NONE;
723 gst_video_time_code_interval_new_from_string
724 (splitmux->threshold_timecode_str);
725 target_tc = gst_video_time_code_add_interval (cur_tc, tc_inter);
726 gst_video_time_code_interval_free (tc_inter);
729 target_tc_time = gst_video_time_code_nsec_since_daily_jam (target_tc);
730 cur_tc_time = gst_video_time_code_nsec_since_daily_jam (cur_tc);
732 /* Add fragment_start_time, accounting for wraparound */
733 if (target_tc_time >= cur_tc_time) {
735 target_tc_time - cur_tc_time + splitmux->fragment_start_time;
737 GstClockTime day_in_ns = 24 * 60 * 60 * GST_SECOND;
740 day_in_ns - cur_tc_time + target_tc_time +
741 splitmux->fragment_start_time;
743 GST_INFO_OBJECT (splitmux, "Next max TC time: %" GST_TIME_FORMAT
744 " from ref TC: %" GST_TIME_FORMAT, GST_TIME_ARGS (next_max_tc_time),
745 GST_TIME_ARGS (cur_tc_time));
746 gst_video_time_code_free (target_tc);
748 return next_max_tc_time;
// Ask upstream for a keyframe at the point where the current fragment should
// end. The target is either derived from the buffer's timecode meta (when a
// timecode threshold is configured) or is fragment_start_time plus
// threshold_time. The request is pushed as an upstream force-key-unit event on
// the reference stream's sink pad.
752 request_next_keyframe (GstSplitMuxSink * splitmux, GstBuffer * buffer)
755 GstClockTime target_time;
756 gboolean timecode_based = FALSE;
// Recomputed on every call; stays NONE unless a timecode can be used.
758 splitmux->next_max_tc_time = GST_CLOCK_TIME_NONE;
759 if (splitmux->threshold_timecode_str) {
760 GstVideoTimeCodeMeta *tc_meta;
762 if (buffer != NULL) {
// NOTE(review): the lookup can return NULL when the buffer carries no timecode
// meta; the check guarding the dereference below is not visible in this
// excerpt.
763 tc_meta = gst_buffer_get_video_time_code_meta (buffer);
765 splitmux->next_max_tc_time =
766 calculate_next_max_timecode (splitmux, &tc_meta->tc);
767 timecode_based = (splitmux->next_max_tc_time != GST_CLOCK_TIME_NONE);
770 /* This can happen in the presence of GAP events that trigger
771 * a new fragment start */
772 GST_WARNING_OBJECT (splitmux,
773 "No buffer available to calculate next timecode");
// Requests are skipped when disabled, when neither a time nor a timecode limit
// applies, or when a byte limit is configured.
777 if (splitmux->send_keyframe_requests == FALSE
778 || (splitmux->threshold_time == 0 && !timecode_based)
779 || splitmux->threshold_bytes != 0)
782 if (timecode_based) {
783 /* We might have rounding errors: aim slightly earlier */
784 target_time = splitmux->next_max_tc_time - 5 * GST_USECOND;
786 target_time = splitmux->fragment_start_time + splitmux->threshold_time;
788 ev = gst_video_event_new_upstream_force_key_unit (target_time, TRUE, 0);
789 GST_INFO_OBJECT (splitmux, "Requesting next keyframe at %" GST_TIME_FORMAT,
790 GST_TIME_ARGS (target_time));
791 return gst_pad_push_event (splitmux->reference_ctx->sinkpad, ev);
// Pad probe on the output side of a stream's queue. Buffer lists are dropped
// with a warning, downstream events update the per-stream output state, and
// buffers are held back in complete_or_wait_on_out() until the output limit
// allows them through.
// NOTE(review): several jump targets, break statements and unlock calls are
// not visible in this excerpt, so the exact exit path of each branch cannot be
// confirmed from here.
794 static GstPadProbeReturn
795 handle_mq_output (GstPad * pad, GstPadProbeInfo * info, MqStreamCtx * ctx)
797 GstSplitMuxSink *splitmux = ctx->splitmux;
798 MqStreamBuf *buf_info = NULL;
800 GST_LOG_OBJECT (pad, "Fired probe type 0x%x", info->type);
802 /* FIXME: Handle buffer lists, until then make it clear they won't work */
803 if (info->type & GST_PAD_PROBE_TYPE_BUFFER_LIST) {
804 g_warning ("Buffer list handling not implemented");
805 return GST_PAD_PROBE_DROP;
807 if (info->type & GST_PAD_PROBE_TYPE_EVENT_DOWNSTREAM) {
808 GstEvent *event = gst_pad_probe_info_get_event (info);
809 gboolean locked = FALSE;
811 GST_LOG_OBJECT (pad, "Event %" GST_PTR_FORMAT, event);
813 switch (GST_EVENT_TYPE (event)) {
// Remember the segment so later timestamps can be mapped to running time.
814 case GST_EVENT_SEGMENT:
815 gst_event_copy_segment (event, &ctx->out_segment);
// Flush stop: reset the output segment and discard all queued buffer records.
817 case GST_EVENT_FLUSH_STOP:
818 GST_SPLITMUX_LOCK (splitmux);
820 gst_segment_init (&ctx->out_segment, GST_FORMAT_UNDEFINED);
821 g_queue_foreach (&ctx->queued_bufs, (GFunc) mq_stream_buf_free, NULL);
822 g_queue_clear (&ctx->queued_bufs);
823 ctx->flushing = FALSE;
// Flush start: mark the stream as flushing and wake any output waiters.
825 case GST_EVENT_FLUSH_START:
826 GST_SPLITMUX_LOCK (splitmux);
828 GST_LOG_OBJECT (pad, "Flush start");
829 ctx->flushing = TRUE;
830 GST_SPLITMUX_BROADCAST_OUTPUT (splitmux);
833 GST_SPLITMUX_LOCK (splitmux);
835 if (splitmux->output_state == SPLITMUX_OUTPUT_STATE_STOPPED)
841 GstClockTimeDiff rtime;
843 gst_event_parse_gap (event, &gap_ts, NULL);
844 if (gap_ts == GST_CLOCK_TIME_NONE)
847 GST_SPLITMUX_LOCK (splitmux);
850 if (splitmux->output_state == SPLITMUX_OUTPUT_STATE_STOPPED)
853 /* When we get a gap event on the
854 * reference stream and we're trying to open a
855 * new file, we need to store it until we get
856 * the buffer afterwards
858 if (ctx->is_reference &&
859 (splitmux->output_state != SPLITMUX_OUTPUT_STATE_OUTPUT_GOP)) {
860 GST_DEBUG_OBJECT (pad, "Storing GAP event until buffer arrives");
861 gst_event_replace (&ctx->pending_gap, event);
862 GST_SPLITMUX_UNLOCK (splitmux);
863 return GST_PAD_PROBE_HANDLED;
// Otherwise treat the gap timestamp as this stream's output position.
866 rtime = my_segment_to_running_time (&ctx->out_segment, gap_ts);
868 GST_LOG_OBJECT (pad, "Have GAP w/ ts %" GST_STIME_FORMAT,
869 GST_STIME_ARGS (rtime));
871 if (rtime != GST_CLOCK_STIME_NONE) {
872 ctx->out_running_time = rtime;
873 complete_or_wait_on_out (splitmux, ctx);
// Only custom events named "splitmuxsink-unblock" are handled here; they carry
// a "timestamp" field that becomes the output running time and are dropped
// afterwards.
877 case GST_EVENT_CUSTOM_DOWNSTREAM:{
878 const GstStructure *s;
879 GstClockTimeDiff ts = 0;
881 s = gst_event_get_structure (event);
882 if (!gst_structure_has_name (s, "splitmuxsink-unblock"))
885 gst_structure_get_int64 (s, "timestamp", &ts);
887 GST_SPLITMUX_LOCK (splitmux);
890 if (splitmux->output_state == SPLITMUX_OUTPUT_STATE_STOPPED)
892 ctx->out_running_time = ts;
893 if (!ctx->is_reference)
894 complete_or_wait_on_out (splitmux, ctx);
895 GST_SPLITMUX_UNLOCK (splitmux);
896 return GST_PAD_PROBE_DROP;
902 /* We need to make sure events aren't passed
903 * until the muxer / sink are ready for it */
905 GST_SPLITMUX_LOCK (splitmux);
906 if (!ctx->is_reference)
907 complete_or_wait_on_out (splitmux, ctx);
908 GST_SPLITMUX_UNLOCK (splitmux);
910 return GST_PAD_PROBE_PASS;
913 /* Allow everything through until the configured next stopping point */
914 GST_SPLITMUX_LOCK (splitmux);
// Take the buffer record from the tail of this stream's queue.
916 buf_info = g_queue_pop_tail (&ctx->queued_bufs);
917 if (buf_info == NULL)
918 /* Can only happen due to a poorly timed flush */
921 /* If we have popped a keyframe, decrement the queued_gop count */
922 if (buf_info->keyframe && splitmux->queued_keyframes > 0)
923 splitmux->queued_keyframes--;
925 ctx->out_running_time = buf_info->run_ts;
926 ctx->cur_out_buffer = gst_pad_probe_info_get_buffer (info);
928 GST_LOG_OBJECT (splitmux,
929 "Pad %" GST_PTR_FORMAT " buffer with run TS %" GST_STIME_FORMAT
930 " size %" G_GUINT64_FORMAT,
931 pad, GST_STIME_ARGS (ctx->out_running_time), buf_info->buf_size);
// May sleep here until the output limit has moved past this buffer.
933 complete_or_wait_on_out (splitmux, ctx);
// Account for the bytes that are about to be passed on.
935 splitmux->muxed_out_bytes += buf_info->buf_size;
937 #ifndef GST_DISABLE_GST_DEBUG
939 GstBuffer *buf = gst_pad_probe_info_get_buffer (info);
940 GST_LOG_OBJECT (pad, "Returning to pass buffer %" GST_PTR_FORMAT
941 " run ts %" GST_STIME_FORMAT, buf,
942 GST_STIME_ARGS (ctx->out_running_time));
946 ctx->cur_out_buffer = NULL;
947 GST_SPLITMUX_UNLOCK (splitmux);
949 /* pending_gap is protected by the STREAM lock */
950 if (ctx->pending_gap) {
951 /* If we previously stored a gap event, send it now */
952 GstPad *peer = gst_pad_get_peer (ctx->srcpad);
954 GST_DEBUG_OBJECT (splitmux,
955 "Pad %" GST_PTR_FORMAT " sending pending GAP event", ctx->srcpad);
// The stored event is passed to gst_pad_send_event(), so the field is only
// cleared here, not unreffed.
957 gst_pad_send_event (peer, ctx->pending_gap);
958 ctx->pending_gap = NULL;
960 gst_object_unref (peer);
963 mq_stream_buf_free (buf_info);
965 return GST_PAD_PROBE_PASS;
968 GST_SPLITMUX_UNLOCK (splitmux);
969 return GST_PAD_PROBE_DROP;
// Sticky-event iterator callback: sends a new reference of the event to peer
// and returns the result of gst_pad_send_event().
973 resend_sticky (GstPad * pad, GstEvent ** event, GstPad * peer)
975 return gst_pad_send_event (peer, gst_event_ref (*event));
// Re-send all sticky events stored on ctx->srcpad to its peer, then
// re-evaluate the context's EOS flag from the pad.
979 restart_context (MqStreamCtx * ctx, GstSplitMuxSink * splitmux)
981 GstPad *peer = gst_pad_get_peer (ctx->srcpad);
983 gst_pad_sticky_events_foreach (ctx->srcpad,
984 (GstPadStickyEventsForeachFunction) (resend_sticky), peer);
986 /* Clear EOS flag if not actually EOS */
987 ctx->out_eos = GST_PAD_IS_EOS (ctx->srcpad);
// Drop the reference returned by gst_pad_get_peer().
989 gst_object_unref (peer);
992 /* Called with lock held when a fragment
993 * reaches EOS and it is time to restart
// Switches output to the next file: the muxer and sink are taken to NULL
// state, the next filename is set, both are brought back to the bin's target
// state, sticky events are re-sent on every stream and a "fragment opened"
// message is posted. The splitmux lock is dropped while the child states are
// changed and is held again on return.
997 start_next_fragment (GstSplitMuxSink * splitmux, MqStreamCtx * ctx)
999 GstElement *muxer, *sink;
1001 /* 1 change to new file */
1002 splitmux->switching_fragment = TRUE;
1004 /* We need to drop the splitmux lock to acquire the state lock
1005 * here and ensure there's no racy state change going on elsewhere */
// Own references keep both elements alive while the lock is released.
1006 muxer = gst_object_ref (splitmux->muxer);
1007 sink = gst_object_ref (splitmux->active_sink);
1009 GST_SPLITMUX_UNLOCK (splitmux);
1010 GST_STATE_LOCK (splitmux);
1012 gst_element_set_locked_state (muxer, TRUE);
1013 gst_element_set_locked_state (sink, TRUE);
1014 gst_element_set_state (muxer, GST_STATE_NULL);
1015 gst_element_set_state (sink, GST_STATE_NULL);
// set_next_filename() is called with the splitmux lock held.
1017 GST_SPLITMUX_LOCK (splitmux);
1018 set_next_filename (splitmux, ctx);
1019 GST_SPLITMUX_UNLOCK (splitmux);
// The sink is restarted before the muxer.
1021 gst_element_set_state (sink, GST_STATE_TARGET (splitmux));
1022 gst_element_set_state (muxer, GST_STATE_TARGET (splitmux));
1023 gst_element_set_locked_state (muxer, FALSE);
1024 gst_element_set_locked_state (sink, FALSE);
1026 gst_object_unref (sink);
1027 gst_object_unref (muxer);
1029 GST_SPLITMUX_LOCK (splitmux);
1030 GST_STATE_UNLOCK (splitmux);
1031 splitmux->switching_fragment = FALSE;
1032 do_async_done (splitmux);
1034 splitmux->ready_for_output = TRUE;
1036 g_list_foreach (splitmux->contexts, (GFunc) restart_context, splitmux);
1038 send_fragment_opened_closed_msg (splitmux, TRUE);
1040 /* FIXME: Is this always the correct next state? */
1041 splitmux->output_state = SPLITMUX_OUTPUT_STATE_AWAITING_COMMAND;
1042 GST_SPLITMUX_BROADCAST_OUTPUT (splitmux);
// GstBin handle_message override. Intercepts EOS and async start/done messages
// coming from the children; everything that is not swallowed is forwarded to
// the parent class handler.
// NOTE(review): the statements that leave the function after a message is
// dropped are not visible in this excerpt.
1046 bus_handler (GstBin * bin, GstMessage * message)
1048 GstSplitMuxSink *splitmux = GST_SPLITMUX_SINK (bin);
1050 switch (GST_MESSAGE_TYPE (message)) {
1051 case GST_MESSAGE_EOS:
1052 /* If the state is draining out the current file, drop this EOS */
1053 GST_SPLITMUX_LOCK (splitmux);
// Announce that the current fragment has been closed.
1055 send_fragment_opened_closed_msg (splitmux, FALSE);
1057 if (splitmux->output_state == SPLITMUX_OUTPUT_STATE_ENDING_FILE) {
1058 GST_DEBUG_OBJECT (splitmux, "Caught EOS at end of fragment, dropping");
// Move on to opening the next file and wake the output side.
1059 splitmux->output_state = SPLITMUX_OUTPUT_STATE_START_NEXT_FILE;
1060 GST_SPLITMUX_BROADCAST_OUTPUT (splitmux);
1062 gst_message_unref (message);
1063 GST_SPLITMUX_UNLOCK (splitmux);
1066 GST_DEBUG_OBJECT (splitmux,
1067 "Passing EOS message. Output state %d max_out_running_time %"
1068 GST_STIME_FORMAT, splitmux->output_state,
1069 GST_STIME_ARGS (splitmux->max_out_running_time));
1071 GST_SPLITMUX_UNLOCK (splitmux);
1073 case GST_MESSAGE_ASYNC_START:
1074 case GST_MESSAGE_ASYNC_DONE:
1075 /* Ignore state changes from our children while switching */
// NOTE(review): switching_fragment is read here without taking the splitmux lock.
1076 if (splitmux->switching_fragment) {
1077 if (GST_MESSAGE_SRC (message) == (GstObject *) splitmux->active_sink
1078 || GST_MESSAGE_SRC (message) == (GstObject *) splitmux->muxer) {
1079 GST_LOG_OBJECT (splitmux,
1080 "Ignoring state change from child %" GST_PTR_FORMAT
1081 " while switching", GST_MESSAGE_SRC (message));
1082 gst_message_unref (message);
1091 GST_BIN_CLASS (parent_class)->handle_message (bin, message);
/* ctx_set_unblock:
 * Flags one stream context as needing a wake-up by setting
 * ctx->need_unblock. handle_gathered_gop() applies it to every context
 * through g_list_foreach(); check_completed_gop() later sees the flag,
 * sends a "splitmuxsink-unblock" event on the context's sink pad and
 * clears the flag again.
 */
1095 ctx_set_unblock (MqStreamCtx * ctx)
1097 ctx->need_unblock = TRUE;
/* handle_gathered_gop:
 * Decides whether the GOP that has just been gathered still fits in the
 * current fragment, and queues the matching commands for the output side.
 *
 * @splitmux: the splitmuxsink instance (splitmux lock held by the caller).
 *
 * Estimate:
 *   queued_bytes = fragment_total_bytes + gop_total_bytes, then enlarged by
 *                  the mux_overhead factor;
 *   queued_time  = reference stream in_running_time - fragment_start_time.
 * Overrun requires fragment_total_bytes > 0 and at least one of: the byte
 * threshold exceeded, the time threshold exceeded, or the reference running
 * time more than 5us past next_max_tc_time.
 *
 * On overrun a start_new_fragment command is queued, the fragment start is
 * reset to gop_start_time, fragment_total_bytes is zeroed and a keyframe is
 * requested. Afterwards the input state is set up for the next GOP (or for
 * FINISHING_UP when the reference stream is at EOS), every context is
 * flagged for unblocking, and - if the GOP holds any bytes - a command
 * carrying max_output_ts is queued. gop_total_bytes is reset at the end.
 * NOTE(review): the else/closing-brace lines are not visible in this
 * listing; the branch structure described here follows the original
 * comments and should be confirmed against the real file.
 */
1100 /* Called with splitmux lock held */
1101 /* Called when entering ProcessingCompleteGop state
1102 * Assess if mq contents overflowed the current file
1103 * -> If yes, need to switch to new file
1104 * -> if no, set max_out_running_time to let this GOP in and
1105 * go to COLLECTING_GOP_START state
1108 handle_gathered_gop (GstSplitMuxSink * splitmux)
1110 guint64 queued_bytes;
1111 GstClockTimeDiff queued_time = 0;
1112 GstClockTimeDiff new_out_ts = splitmux->reference_ctx->in_running_time;
1113 SplitMuxOutputCommand *cmd;
1115 /* Assess if the multiqueue contents overflowed the current file */
1116 /* When considering if a newly gathered GOP overflows
1117 * the time limit for the file, only consider the running time of the
1118 * reference stream. Other streams might have run ahead a little bit,
1119 * but extra pieces won't be released to the muxer beyond the reference
1120 * stream cut-off anyway - so it forms the limit. */
1121 queued_bytes = splitmux->fragment_total_bytes + splitmux->gop_total_bytes;
1122 queued_time = splitmux->reference_ctx->in_running_time;
1124 GST_LOG_OBJECT (splitmux, " queued_bytes %" G_GUINT64_FORMAT, queued_bytes);
/* NOTE(review): hard assertion on stream timing - a reference running time
 * earlier than fragment_start_time trips it rather than being handled. */
1126 g_assert (queued_time >= splitmux->fragment_start_time);
1128 queued_time -= splitmux->fragment_start_time;
1130 /* Expand queued bytes estimate by muxer overhead */
1131 queued_bytes += (queued_bytes * splitmux->mux_overhead);
1133 GST_LOG_OBJECT (splitmux, "mq at TS %" GST_STIME_FORMAT
1134 " bytes %" G_GUINT64_FORMAT, GST_STIME_ARGS (queued_time), queued_bytes);
1135 if (splitmux->next_max_tc_time != GST_CLOCK_TIME_NONE) {
/* NOTE(review): in_running_time is assigned to GstClockTimeDiff variables
 * and printed with GST_STIME_FORMAT elsewhere in this function, but with
 * the unsigned GST_TIME_FORMAT here - confirm which is intended. */
1136 GST_LOG_OBJECT (splitmux,
1137 "timecode mq TS %" GST_TIME_FORMAT " vs target %" GST_TIME_FORMAT,
1138 GST_TIME_ARGS (splitmux->reference_ctx->in_running_time),
1139 GST_TIME_ARGS (splitmux->next_max_tc_time + 5 * GST_USECOND));
1142 /* Check for overrun - have we output at least one byte and overrun
1143 * either threshold? */
1144 /* Timecode-based threshold accounts for possible rounding errors:
1145 * 5us should be bigger than all possible rounding errors but nowhere near
1146 * big enough to skip to another frame */
1147 if ((splitmux->fragment_total_bytes > 0 &&
1148 ((splitmux->threshold_bytes > 0 &&
1149 queued_bytes > splitmux->threshold_bytes) ||
1150 (splitmux->threshold_time > 0 &&
1151 queued_time > splitmux->threshold_time) ||
1152 (splitmux->next_max_tc_time != GST_CLOCK_TIME_NONE &&
1153 splitmux->reference_ctx->in_running_time >
1154 splitmux->next_max_tc_time + 5 * GST_USECOND)))) {
1156 /* Tell the output side to start a new fragment */
1157 GST_INFO_OBJECT (splitmux,
1158 "This GOP (dur %" GST_STIME_FORMAT
1159 ") would overflow the fragment, Sending start_new_fragment cmd",
1160 GST_STIME_ARGS (splitmux->reference_ctx->in_running_time -
1161 splitmux->gop_start_time));
1162 cmd = out_cmd_buf_new ();
1163 cmd->start_new_fragment = TRUE;
1164 g_queue_push_head (&splitmux->out_cmd_q, cmd);
1165 GST_SPLITMUX_BROADCAST_OUTPUT (splitmux);
/* Re-reads the same field new_out_ts was initialised from above. */
1167 new_out_ts = splitmux->reference_ctx->in_running_time;
1168 splitmux->fragment_start_time = splitmux->gop_start_time;
1169 splitmux->fragment_total_bytes = 0;
1171 if (request_next_keyframe (splitmux,
1172 splitmux->reference_ctx->prev_in_keyframe) == FALSE) {
1173 GST_WARNING_OBJECT (splitmux,
1174 "Could not request a keyframe. Files may not split at the exact location they should");
1176 gst_buffer_replace (&splitmux->reference_ctx->prev_in_keyframe, NULL);
1179 /* And set up to collect the next GOP */
1180 if (!splitmux->reference_ctx->in_eos) {
1181 splitmux->input_state = SPLITMUX_INPUT_STATE_COLLECTING_GOP_START;
1182 splitmux->gop_start_time = new_out_ts;
1184 /* This is probably already the current state, but just in case: */
1185 splitmux->input_state = SPLITMUX_INPUT_STATE_FINISHING_UP;
1186 new_out_ts = GST_CLOCK_STIME_NONE; /* EOS runs until forever */
1189 /* And wake all input contexts to send a wake-up event */
1190 g_list_foreach (splitmux->contexts, (GFunc) ctx_set_unblock, NULL);
1191 GST_SPLITMUX_BROADCAST_INPUT (splitmux);
1193 /* Now either way - either there was no overflow, or we requested a new fragment: release this GOP */
1194 splitmux->fragment_total_bytes += splitmux->gop_total_bytes;
1196 if (splitmux->gop_total_bytes > 0) {
1197 GST_LOG_OBJECT (splitmux,
1198 "Releasing GOP to output. Bytes in fragment now %" G_GUINT64_FORMAT
1199 " time %" GST_STIME_FORMAT,
1200 splitmux->fragment_total_bytes, GST_STIME_ARGS (queued_time));
1202 /* Send this GOP to the output command queue */
1203 cmd = out_cmd_buf_new ();
1204 cmd->start_new_fragment = FALSE;
1205 cmd->max_output_ts = new_out_ts;
1206 GST_LOG_OBJECT (splitmux, "Sending GOP cmd to output for TS %"
1207 GST_STIME_FORMAT, GST_STIME_ARGS (new_out_ts));
1208 g_queue_push_head (&splitmux->out_cmd_q, cmd);
1210 GST_SPLITMUX_BROADCAST_OUTPUT (splitmux);
1213 splitmux->gop_total_bytes = 0;
/* check_completed_gop:
 * Run by an input pad once it has delivered its share of the current GOP
 * (or has hit EOS).
 *
 * @splitmux: the splitmuxsink instance (splitmux lock held by the caller;
 *            it is released temporarily while the unblock event is sent).
 * @ctx:      the stream context of the calling pad.
 *
 * Steps visible here:
 *  1. If ctx->need_unblock is set, a serialized custom downstream event
 *     named "splitmuxsink-unblock", carrying max_in_running_time, is sent
 *     on the context's sink pad and the flag is cleared.
 *  2. While in WAITING_GOP_COLLECT, every context is compared against
 *     max_in_running_time; when all of them are ready the GOP is passed to
 *     handle_gathered_gop().
 *  3. Otherwise the caller sleeps on the input condition until the state
 *     or the running-time limit changes.
 * NOTE(review): the local declarations, loop/return statements and part of
 * the conditions at 1272 and 1296 are not visible in this listing.
 */
1216 /* Called with splitmux lock held */
1217 /* Called from each input pad when it is has all the pieces
1218 * for a GOP or EOS, starting with the reference pad which has set the
1219 * splitmux->max_in_running_time
1222 check_completed_gop (GstSplitMuxSink * splitmux, MqStreamCtx * ctx)
1227 /* On ENDING_FILE, the reference stream sends a command to start a new
1228 * fragment, then releases the GOP for output in the new fragment.
1229 * If somes streams received no buffer during the last GOP that overran,
1230 * because its next buffer has a timestamp bigger than
1231 * ctx->max_in_running_time, its queue is empty. In that case the only
1232 * way to wakeup the output thread is by injecting an event in the
1233 * queue. This usually happen with subtitle streams.
1234 * See https://bugzilla.gnome.org/show_bug.cgi?id=763711. */
1235 if (ctx->need_unblock) {
1236 GST_LOG_OBJECT (ctx->sinkpad, "Sending splitmuxsink-unblock event");
1237 event = gst_event_new_custom (GST_EVENT_CUSTOM_DOWNSTREAM |
1238 GST_EVENT_TYPE_SERIALIZED,
1239 gst_structure_new ("splitmuxsink-unblock", "timestamp",
1240 G_TYPE_INT64, splitmux->max_in_running_time, NULL));
/* The event is pushed with the splitmux lock released, so input_state is
 * re-checked once the lock is held again. */
1242 GST_SPLITMUX_UNLOCK (splitmux);
1243 gst_pad_send_event (ctx->sinkpad, event);
1244 GST_SPLITMUX_LOCK (splitmux);
1246 ctx->need_unblock = FALSE;
1247 GST_SPLITMUX_BROADCAST_INPUT (splitmux);
1248 /* state may have changed while we were unlocked. Loop again if so */
1249 if (splitmux->input_state != SPLITMUX_INPUT_STATE_WAITING_GOP_COLLECT)
1253 if (splitmux->input_state == SPLITMUX_INPUT_STATE_WAITING_GOP_COLLECT) {
1254 gboolean ready = TRUE;
1256 /* Iterate each pad, and check that the input running time is at least
1257 * up to the reference running time, and if so handle the collected GOP */
1258 GST_LOG_OBJECT (splitmux, "Checking GOP collected, Max in running time %"
1259 GST_STIME_FORMAT " ctx %p",
1260 GST_STIME_ARGS (splitmux->max_in_running_time), ctx);
1261 for (cur = g_list_first (splitmux->contexts); cur != NULL;
1262 cur = g_list_next (cur)) {
1263 MqStreamCtx *tmpctx = (MqStreamCtx *) (cur->data);
1265 GST_LOG_OBJECT (splitmux,
1266 "Context %p (src pad %" GST_PTR_FORMAT ") TS %" GST_STIME_FORMAT
1267 " EOS %d", tmpctx, tmpctx->srcpad,
1268 GST_STIME_ARGS (tmpctx->in_running_time), tmpctx->in_eos);
1270 if (splitmux->max_in_running_time != GST_CLOCK_STIME_NONE &&
1271 tmpctx->in_running_time < splitmux->max_in_running_time &&
1273 GST_LOG_OBJECT (splitmux,
1274 "Context %p (src pad %" GST_PTR_FORMAT ") not ready. We'll sleep",
1275 tmpctx, tmpctx->srcpad);
1281 GST_DEBUG_OBJECT (splitmux,
1282 "Collected GOP is complete. Processing (ctx %p)", ctx);
1283 /* All pads have a complete GOP, release it into the multiqueue */
1284 handle_gathered_gop (splitmux);
1288 /* If upstream reached EOS we are not expecting more data, no need to wait
1293 /* Some pad is not yet ready, or GOP is being pushed
1294 * either way, sleep and wait to get woken */
1295 while (splitmux->input_state == SPLITMUX_INPUT_STATE_WAITING_GOP_COLLECT &&
1297 (ctx->in_running_time >= splitmux->max_in_running_time) &&
1298 (splitmux->max_in_running_time != GST_CLOCK_STIME_NONE)) {
1300 GST_LOG_OBJECT (splitmux, "Sleeping for GOP collection (ctx %p)", ctx);
1301 GST_SPLITMUX_WAIT_INPUT (splitmux);
1302 GST_LOG_OBJECT (splitmux, "Done waiting for complete GOP (ctx %p)", ctx);
/* handle_mq_input:
 * Pad probe attached to the sink pad of each per-stream queue (added in
 * gst_splitmux_sink_request_new_pad() with
 * GST_PAD_PROBE_TYPE_DATA_DOWNSTREAM). It drives the input-side state
 * machine.
 *
 * @pad:  the probed pad
 * @info: probe info carrying a buffer, buffer list or downstream event
 * @ctx:  stream context for this pad
 *
 * Returns: GST_PAD_PROBE_DROP for buffer lists (unsupported, with a
 * g_warning); GST_PAD_PROBE_PASS on the other visible return paths.
 *
 * Events: SEGMENT is copied into ctx->in_segment; FLUSH_STOP resets the
 * segment, in_eos and in_running_time; a GAP with a valid timestamp may
 * initialise the fragment/GOP start times on the reference context. The
 * block that logs "Got Reference EOS" switches to WAITING_GOP_COLLECT and
 * calls check_completed_gop().
 *
 * Buffers: the timestamp is PTS, falling back to DTS; it is converted to
 * running time and only ever advances ctx->in_running_time. A MqStreamBuf
 * record (running time, size, duration) is filled in, the input state
 * machine is run, gop_total_bytes is increased and the record is pushed
 * onto ctx->queued_bufs before the buffer is passed on.
 *
 * NOTE(review): case labels, break/goto statements and the exit label are
 * not visible in this listing; the final three statements (unlock, free
 * buf_info, PASS) look like the shared early-exit path - confirm.
 */
1306 static GstPadProbeReturn
1307 handle_mq_input (GstPad * pad, GstPadProbeInfo * info, MqStreamCtx * ctx)
1309 GstSplitMuxSink *splitmux = ctx->splitmux;
1311 MqStreamBuf *buf_info = NULL;
1313 gboolean loop_again;
1314 gboolean keyframe = FALSE;
1316 GST_LOG_OBJECT (pad, "Fired probe type 0x%x", info->type);
1318 /* FIXME: Handle buffer lists, until then make it clear they won't work */
1319 if (info->type & GST_PAD_PROBE_TYPE_BUFFER_LIST) {
1320 g_warning ("Buffer list handling not implemented");
1321 return GST_PAD_PROBE_DROP;
1323 if (info->type & GST_PAD_PROBE_TYPE_EVENT_DOWNSTREAM) {
1324 GstEvent *event = gst_pad_probe_info_get_event (info);
1326 GST_LOG_OBJECT (pad, "Event %" GST_PTR_FORMAT, event);
1328 switch (GST_EVENT_TYPE (event)) {
1329 case GST_EVENT_SEGMENT:
1330 gst_event_copy_segment (event, &ctx->in_segment);
1332 case GST_EVENT_FLUSH_STOP:
1333 GST_SPLITMUX_LOCK (splitmux);
1334 gst_segment_init (&ctx->in_segment, GST_FORMAT_UNDEFINED);
1335 ctx->in_eos = FALSE;
1336 ctx->in_running_time = GST_CLOCK_STIME_NONE;
1337 GST_SPLITMUX_UNLOCK (splitmux);
1340 GST_SPLITMUX_LOCK (splitmux);
1343 if (splitmux->input_state == SPLITMUX_INPUT_STATE_STOPPED)
1346 if (ctx->is_reference) {
1347 GST_INFO_OBJECT (splitmux, "Got Reference EOS. Finishing up");
1348 /* check_completed_gop will act as if this is a new keyframe with infinite timestamp */
1349 splitmux->input_state = SPLITMUX_INPUT_STATE_WAITING_GOP_COLLECT;
1350 /* Wake up other input pads to collect this GOP */
1351 GST_SPLITMUX_BROADCAST_INPUT (splitmux);
1352 check_completed_gop (splitmux, ctx);
1353 } else if (splitmux->input_state ==
1354 SPLITMUX_INPUT_STATE_WAITING_GOP_COLLECT) {
1355 /* If we are waiting for a GOP to be completed (ie, for aux
1356 * pads to catch up), then this pad is complete, so check
1357 * if the whole GOP is.
1359 check_completed_gop (splitmux, ctx);
1361 GST_SPLITMUX_UNLOCK (splitmux);
1363 case GST_EVENT_GAP:{
1364 GstClockTime gap_ts;
1365 GstClockTimeDiff rtime;
1367 gst_event_parse_gap (event, &gap_ts, NULL);
1368 if (gap_ts == GST_CLOCK_TIME_NONE)
1371 GST_SPLITMUX_LOCK (splitmux);
1373 if (splitmux->input_state == SPLITMUX_INPUT_STATE_STOPPED)
1375 rtime = my_segment_to_running_time (&ctx->in_segment, gap_ts);
1377 GST_LOG_OBJECT (pad, "Have GAP w/ ts %" GST_STIME_FORMAT,
1378 GST_STIME_ARGS (rtime));
1380 if (ctx->is_reference
1381 && splitmux->fragment_start_time == GST_CLOCK_STIME_NONE) {
1382 splitmux->gop_start_time = splitmux->fragment_start_time = rtime;
1383 GST_LOG_OBJECT (splitmux, "Mux start time now %" GST_STIME_FORMAT,
1384 GST_STIME_ARGS (splitmux->fragment_start_time));
1385 /* Also take this as the first start time when starting up,
1386 * so that we start counting overflow from the first frame */
1387 if (!GST_CLOCK_STIME_IS_VALID (splitmux->max_in_running_time))
1388 splitmux->max_in_running_time = splitmux->fragment_start_time;
1391 GST_SPLITMUX_UNLOCK (splitmux);
1397 return GST_PAD_PROBE_PASS;
/* From here on the probe data is a buffer: allocate its bookkeeping record
 * and pick the timestamp (PTS preferred, DTS otherwise). */
1400 buf = gst_pad_probe_info_get_buffer (info);
1401 buf_info = mq_stream_buf_new ();
1403 if (GST_BUFFER_PTS_IS_VALID (buf))
1404 ts = GST_BUFFER_PTS (buf);
1406 ts = GST_BUFFER_DTS (buf);
1408 GST_LOG_OBJECT (pad, "Buffer TS is %" GST_TIME_FORMAT, GST_TIME_ARGS (ts));
1410 GST_SPLITMUX_LOCK (splitmux);
1412 if (splitmux->input_state == SPLITMUX_INPUT_STATE_STOPPED)
1415 /* If this buffer has a timestamp, advance the input timestamp of the
1417 if (GST_CLOCK_TIME_IS_VALID (ts)) {
1418 GstClockTimeDiff running_time =
1419 my_segment_to_running_time (&ctx->in_segment, ts);
1421 GST_LOG_OBJECT (pad, "Buffer running TS is %" GST_STIME_FORMAT,
1422 GST_STIME_ARGS (running_time));
1424 if (GST_CLOCK_STIME_IS_VALID (running_time)
1425 && running_time > ctx->in_running_time)
1426 ctx->in_running_time = running_time;
1429 /* Try to make sure we have a valid running time */
1430 if (!GST_CLOCK_STIME_IS_VALID (ctx->in_running_time)) {
1431 ctx->in_running_time =
1432 my_segment_to_running_time (&ctx->in_segment, ctx->in_segment.start);
1435 GST_LOG_OBJECT (pad, "in running time now %" GST_STIME_FORMAT,
1436 GST_STIME_ARGS (ctx->in_running_time));
/* Per-buffer accounting record: running time, size and duration. */
1438 buf_info->run_ts = ctx->in_running_time;
1439 buf_info->buf_size = gst_buffer_get_size (buf);
1440 buf_info->duration = GST_BUFFER_DURATION (buf);
1442 /* initialize fragment_start_time */
1443 if (ctx->is_reference
1444 && splitmux->fragment_start_time == GST_CLOCK_STIME_NONE) {
1445 splitmux->gop_start_time = splitmux->fragment_start_time = buf_info->run_ts;
1446 GST_LOG_OBJECT (splitmux, "Mux start time now %" GST_STIME_FORMAT,
1447 GST_STIME_ARGS (splitmux->fragment_start_time));
1448 gst_buffer_replace (&ctx->prev_in_keyframe, buf);
1450 /* Also take this as the first start time when starting up,
1451 * so that we start counting overflow from the first frame */
1452 if (!GST_CLOCK_STIME_IS_VALID (splitmux->max_in_running_time))
1453 splitmux->max_in_running_time = splitmux->fragment_start_time;
1454 if (request_next_keyframe (splitmux, ctx->prev_in_keyframe) == FALSE) {
1455 GST_WARNING_OBJECT (splitmux,
1456 "Could not request a keyframe. Files may not split at the exact location they should");
1458 gst_buffer_replace (&splitmux->reference_ctx->prev_in_keyframe, NULL);
1461 GST_DEBUG_OBJECT (pad, "Buf TS %" GST_STIME_FORMAT
1462 " total GOP bytes %" G_GUINT64_FORMAT,
1463 GST_STIME_ARGS (buf_info->run_ts), splitmux->gop_total_bytes);
1470 switch (splitmux->input_state) {
1471 case SPLITMUX_INPUT_STATE_COLLECTING_GOP_START:
1472 if (ctx->is_reference) {
1473 /* If a keyframe, we have a complete GOP */
1474 if (GST_BUFFER_FLAG_IS_SET (buf, GST_BUFFER_FLAG_DELTA_UNIT) ||
1475 !GST_CLOCK_STIME_IS_VALID (ctx->in_running_time) ||
1476 splitmux->max_in_running_time >= ctx->in_running_time) {
1477 /* Pass this buffer through */
1479 /* Allow other input pads to catch up to here too */
1480 splitmux->max_in_running_time = ctx->in_running_time;
1481 GST_SPLITMUX_BROADCAST_INPUT (splitmux);
1484 GST_INFO_OBJECT (pad,
1485 "Have keyframe with running time %" GST_STIME_FORMAT,
1486 GST_STIME_ARGS (ctx->in_running_time));
1488 splitmux->input_state = SPLITMUX_INPUT_STATE_WAITING_GOP_COLLECT;
1489 splitmux->max_in_running_time = ctx->in_running_time;
1490 /* Wake up other input pads to collect this GOP */
1491 GST_SPLITMUX_BROADCAST_INPUT (splitmux);
1492 check_completed_gop (splitmux, ctx);
1493 /* Store this new keyframe to remember the start of GOP */
1494 gst_buffer_replace (&ctx->prev_in_keyframe, buf);
1496 /* Pass this buffer if the reference ctx is far enough ahead */
1497 if (ctx->in_running_time < splitmux->max_in_running_time) {
1502 /* We're still waiting for a keyframe on the reference pad, sleep */
1503 GST_LOG_OBJECT (pad, "Sleeping for GOP start");
1504 GST_SPLITMUX_WAIT_INPUT (splitmux);
1505 GST_LOG_OBJECT (pad,
1506 "Done sleeping for GOP start input state now %d",
1507 splitmux->input_state);
1510 case SPLITMUX_INPUT_STATE_WAITING_GOP_COLLECT:{
1511 /* We're collecting a GOP. If this is the reference context,
1512 * we need to check if this is a keyframe that marks the start
1513 * of the next GOP. If it is, it marks the end of the GOP we're
1514 * collecting, so sleep and wait until all the other pads also
1515 * reach that timestamp - at which point, we have an entire GOP
1516 * and either go to ENDING_FILE or release this GOP to the muxer and
1517 * go back to COLLECT_GOP_START. */
1519 /* If we overran the target timestamp, it might be time to process
1520 * the GOP, otherwise bail out for more data
1522 GST_LOG_OBJECT (pad,
1523 "Checking TS %" GST_STIME_FORMAT " against max %"
1524 GST_STIME_FORMAT, GST_STIME_ARGS (ctx->in_running_time),
1525 GST_STIME_ARGS (splitmux->max_in_running_time));
1527 if (ctx->in_running_time < splitmux->max_in_running_time) {
1532 GST_LOG_OBJECT (pad,
1533 "Collected last packet of GOP. Checking other pads");
1534 check_completed_gop (splitmux, ctx);
1537 case SPLITMUX_INPUT_STATE_FINISHING_UP:
1548 splitmux->queued_keyframes++;
1549 buf_info->keyframe = TRUE;
1552 /* Update total input byte counter for overflow detect */
1553 splitmux->gop_total_bytes += buf_info->buf_size;
1555 /* Now add this buffer to the queue just before returning */
1556 g_queue_push_head (&ctx->queued_bufs, buf_info);
1558 GST_LOG_OBJECT (pad, "Returning to queue buffer %" GST_PTR_FORMAT
1559 " run ts %" GST_STIME_FORMAT, buf, GST_STIME_ARGS (ctx->in_running_time));
1561 GST_SPLITMUX_UNLOCK (splitmux);
1562 return GST_PAD_PROBE_PASS;
1565 GST_SPLITMUX_UNLOCK (splitmux);
1567 mq_stream_buf_free (buf_info);
1568 return GST_PAD_PROBE_PASS;
/* grow_blocked_queues:
 * Walks every stream context and, for each queue whose tracked buffer count
 * (length of ctx->queued_bufs) has reached its "max-size-buffers" limit,
 * raises that limit to one more than the current count.
 *
 * @splitmux: the splitmuxsink instance; handle_q_underrun() calls this
 *            with the splitmux lock held.
 */
1572 grow_blocked_queues (GstSplitMuxSink * splitmux)
1576 /* Scan other queues for full-ness and grow them */
1577 for (cur = g_list_first (splitmux->contexts);
1578 cur != NULL; cur = g_list_next (cur)) {
1579 MqStreamCtx *tmpctx = (MqStreamCtx *) (cur->data);
1581 guint cur_len = g_queue_get_length (&tmpctx->queued_bufs);
1583 g_object_get (tmpctx->q, "max-size-buffers", &cur_limit, NULL);
1584 GST_LOG_OBJECT (tmpctx->q, "Queue len %u", cur_len);
1586 if (cur_len >= cur_limit) {
1587 cur_limit = cur_len + 1;
1588 GST_DEBUG_OBJECT (tmpctx->q,
1589 "Queue overflowed and needs enlarging. Growing to %u buffers",
1591 g_object_set (tmpctx->q, "max-size-buffers", cur_limit, NULL);
/* handle_q_underrun:
 * Handler for the queue's "underrun" signal (connected in
 * gst_splitmux_sink_request_new_pad()). When one queue runs dry, it logs
 * the current keyframe/command counts and calls grow_blocked_queues(),
 * which enlarges any queue that has reached its buffer limit.
 *
 * @q:         the queue element that emitted the signal
 * @user_data: the MqStreamCtx registered with the signal
 */
1597 handle_q_underrun (GstElement * q, gpointer user_data)
1599 MqStreamCtx *ctx = (MqStreamCtx *) (user_data);
1600 GstSplitMuxSink *splitmux = ctx->splitmux;
1602 GST_SPLITMUX_LOCK (splitmux);
1603 GST_DEBUG_OBJECT (q,
1604 "Queue reported underrun with %d keyframes and %d cmds enqueued",
1605 splitmux->queued_keyframes, g_queue_get_length (&splitmux->out_cmd_q));
1606 grow_blocked_queues (splitmux);
1607 GST_SPLITMUX_UNLOCK (splitmux);
/* handle_q_overrun:
 * Handler for the queue's "overrun" signal (connected in
 * gst_splitmux_sink_request_new_pad()). Decides, under the splitmux lock,
 * whether the full queue may be enlarged, then adjusts the queue's
 * "max-size-buffers" property after the lock is released.
 *
 * @q:         the queue element that emitted the signal
 * @user_data: the MqStreamCtx registered with the signal
 *
 * Conditions examined: fewer than 2 keyframes queued (less than a full
 * GOP), an empty output command queue, or some other context whose
 * queued_bufs list is empty (a starved queue).
 * NOTE(review): the statements inside each branch, and the computation of
 * the new limit, are not visible in this listing; presumably each branch
 * sets allow_grow - confirm against the real file.
 */
1611 handle_q_overrun (GstElement * q, gpointer user_data)
1613 MqStreamCtx *ctx = (MqStreamCtx *) (user_data);
1614 GstSplitMuxSink *splitmux = ctx->splitmux;
1615 gboolean allow_grow = FALSE;
1617 GST_SPLITMUX_LOCK (splitmux);
1618 GST_DEBUG_OBJECT (q,
1619 "Queue reported overrun with %d keyframes and %d cmds enqueued",
1620 splitmux->queued_keyframes, g_queue_get_length (&splitmux->out_cmd_q));
1622 if (splitmux->queued_keyframes < 2) {
1623 /* Less than a full GOP queued, grow the queue */
1625 } else if (g_queue_get_length (&splitmux->out_cmd_q) < 1) {
1628 /* If another queue is starved, grow */
1630 for (cur = g_list_first (splitmux->contexts);
1631 cur != NULL; cur = g_list_next (cur)) {
1632 MqStreamCtx *tmpctx = (MqStreamCtx *) (cur->data);
1633 if (tmpctx != ctx && g_queue_get_length (&tmpctx->queued_bufs) < 1) {
1638 GST_SPLITMUX_UNLOCK (splitmux);
1643 g_object_get (q, "max-size-buffers", &cur_limit, NULL);
1646 GST_DEBUG_OBJECT (q,
1647 "Queue overflowed and needs enlarging. Growing to %u buffers",
1650 g_object_set (q, "max-size-buffers", cur_limit, NULL);
/* gst_splitmux_sink_request_new_pad:
 * Request-pad implementation: builds the
 * "ghost pad -> queue -> muxer request pad" chain for one input stream.
 *
 * @element: the splitmuxsink
 * @templ:   the pad template being requested
 * @name:    requested pad name, may be NULL
 * @caps:    caps forwarded to the muxer's pad request
 *
 * Steps visible here:
 *  - make sure the muxer exists (create_muxer);
 *  - choose the muxer pad template: for the "video" template try
 *    "video_%u" then "video"; otherwise try the same template name, then
 *    "audio"; as a last resort "sink_%d";
 *  - request the muxer pad and choose the ghost pad name ("video", the
 *    muxer pad's name when @name is NULL, or a copy of @name);
 *  - create a queue limited to 5 buffers with byte/time limits disabled,
 *    and link its src pad to the muxer pad;
 *  - create the MqStreamCtx, connect the overrun/underrun handlers and add
 *    the output and input probes, each holding its own ctx reference;
 *  - a video pad replaces any existing reference context; otherwise the
 *    first context created becomes the reference;
 *  - create the ghost pad on the queue's sink pad, attach the ctx as
 *    qdata, record the context, then activate and add the pad.
 * Only one "video" pad may exist at a time (have_video).
 * NOTE(review): the goto targets/labels and the return statements of the
 * failure paths are not visible in this listing.
 */
1655 gst_splitmux_sink_request_new_pad (GstElement * element,
1656 GstPadTemplate * templ, const gchar * name, const GstCaps * caps)
1658 GstSplitMuxSink *splitmux = (GstSplitMuxSink *) element;
1659 GstPadTemplate *mux_template = NULL;
1662 GstPad *q_sink = NULL, *q_src = NULL;
1664 gboolean is_video = FALSE;
1667 GST_DEBUG_OBJECT (element, "templ:%s, name:%s", templ->name_template, name);
1669 GST_SPLITMUX_LOCK (splitmux);
1670 if (!create_muxer (splitmux))
1673 if (templ->name_template) {
1674 if (g_str_equal (templ->name_template, "video")) {
1675 if (splitmux->have_video)
1676 goto already_have_video;
1678 /* FIXME: Look for a pad template with matching caps, rather than by name */
1680 gst_element_class_get_pad_template (GST_ELEMENT_GET_CLASS
1681 (splitmux->muxer), "video_%u");
1683 /* Fallback to find sink pad templates named 'video' (flvmux) */
1684 if (!mux_template) {
1686 gst_element_class_get_pad_template (GST_ELEMENT_GET_CLASS
1687 (splitmux->muxer), "video");
1693 gst_element_class_get_pad_template (GST_ELEMENT_GET_CLASS
1694 (splitmux->muxer), templ->name_template);
1696 /* Fallback to find sink pad templates named 'audio' (flvmux) */
1697 if (!mux_template) {
1699 gst_element_class_get_pad_template (GST_ELEMENT_GET_CLASS
1700 (splitmux->muxer), "audio");
1703 if (mux_template == NULL) {
1704 /* Fallback to find sink pad templates named 'sink_%d' (mpegtsmux) */
1706 gst_element_class_get_pad_template (GST_ELEMENT_GET_CLASS
1707 (splitmux->muxer), "sink_%d");
1711 res = gst_element_request_pad (splitmux->muxer, mux_template, name, caps);
1716 gname = g_strdup ("video");
1717 else if (name == NULL)
1718 gname = gst_pad_get_name (res);
1720 gname = g_strdup (name);
/* The queue is created with a NULL instance name. */
1722 if ((q = create_element (splitmux, "queue", NULL, FALSE)) == NULL)
1725 gst_element_set_state (q, GST_STATE_TARGET (splitmux));
1727 g_object_set (q, "max-size-bytes", 0, "max-size-time", (guint64) (0),
1728 "max-size-buffers", 5, NULL);
1730 q_sink = gst_element_get_static_pad (q, "sink");
1731 q_src = gst_element_get_static_pad (q, "src");
1733 if (gst_pad_link (q_src, res) != GST_PAD_LINK_OK) {
1734 gst_element_release_request_pad (splitmux->muxer, res);
1735 gst_object_unref (GST_OBJECT (res));
1739 gst_object_unref (GST_OBJECT (res));
1741 ctx = mq_stream_ctx_new (splitmux);
1742 /* Context holds a ref: */
1743 ctx->q = gst_object_ref (q);
1744 ctx->srcpad = q_src;
1745 ctx->sinkpad = q_sink;
1747 g_signal_connect (q, "overrun", (GCallback) handle_q_overrun, ctx);
1748 g_signal_connect (q, "underrun", (GCallback) handle_q_underrun, ctx);
/* Each probe holds its own reference on the context, handed to the
 * probe's destroy notify. */
1750 mq_stream_ctx_ref (ctx);
1751 ctx->src_pad_block_id =
1752 gst_pad_add_probe (q_src, GST_PAD_PROBE_TYPE_DATA_DOWNSTREAM,
1753 (GstPadProbeCallback) handle_mq_output, ctx, (GDestroyNotify)
1754 _pad_block_destroy_src_notify);
/* A video pad always takes over as the reference stream; otherwise the
 * first pad requested becomes the reference. */
1755 if (is_video && splitmux->reference_ctx != NULL) {
1756 splitmux->reference_ctx->is_reference = FALSE;
1757 splitmux->reference_ctx = NULL;
1759 if (splitmux->reference_ctx == NULL) {
1760 splitmux->reference_ctx = ctx;
1761 ctx->is_reference = TRUE;
1764 res = gst_ghost_pad_new_from_template (gname, q_sink, templ);
1765 g_object_set_qdata ((GObject *) (res), PAD_CONTEXT, ctx);
1767 mq_stream_ctx_ref (ctx);
1768 ctx->sink_pad_block_id =
1769 gst_pad_add_probe (q_sink, GST_PAD_PROBE_TYPE_DATA_DOWNSTREAM,
1770 (GstPadProbeCallback) handle_mq_input, ctx, (GDestroyNotify)
1771 _pad_block_destroy_sink_notify);
1773 GST_DEBUG_OBJECT (splitmux, "Request pad %" GST_PTR_FORMAT
1774 " feeds queue pad %" GST_PTR_FORMAT, res, q_sink);
1776 splitmux->contexts = g_list_prepend (splitmux->contexts, ctx);
1781 splitmux->have_video = TRUE;
1783 gst_pad_set_active (res, TRUE);
1784 gst_element_add_pad (element, res);
1786 GST_SPLITMUX_UNLOCK (splitmux);
1790 GST_SPLITMUX_UNLOCK (splitmux);
1793 gst_object_unref (q_sink);
1795 gst_object_unref (q_src);
1798 GST_DEBUG_OBJECT (splitmux, "video sink pad already requested");
1799 GST_SPLITMUX_UNLOCK (splitmux);
1804 gst_splitmux_sink_release_pad (GstElement * element, GstPad * pad)
1806 GstSplitMuxSink *splitmux = (GstSplitMuxSink *) element;
1807 GstPad *muxpad = NULL;
1809 (MqStreamCtx *) (g_object_get_qdata ((GObject *) (pad), PAD_CONTEXT));
1811 GST_SPLITMUX_LOCK (splitmux);
1813 if (splitmux->muxer == NULL)
1814 goto fail; /* Elements don't exist yet - nothing to release */
1816 GST_INFO_OBJECT (pad, "releasing request pad");
1818 muxpad = gst_pad_get_peer (ctx->srcpad);
1820 /* Remove the context from our consideration */
1821 splitmux->contexts = g_list_remove (splitmux->contexts, ctx);
1823 if (ctx->sink_pad_block_id)
1824 gst_pad_remove_probe (ctx->sinkpad, ctx->sink_pad_block_id);
1826 if (ctx->src_pad_block_id)
1827 gst_pad_remove_probe (ctx->srcpad, ctx->src_pad_block_id);
1829 /* Can release the context now */
1830 mq_stream_ctx_unref (ctx);
1831 if (ctx == splitmux->reference_ctx)
1832 splitmux->reference_ctx = NULL;
1834 /* Release and free the muxer input */
1836 gst_element_release_request_pad (splitmux->muxer, muxpad);
1837 gst_object_unref (muxpad);
1840 if (GST_PAD_PAD_TEMPLATE (pad) &&
1841 g_str_equal (GST_PAD_TEMPLATE_NAME_TEMPLATE (GST_PAD_PAD_TEMPLATE
1843 splitmux->have_video = FALSE;
1845 gst_element_remove_pad (element, pad);
1847 /* Reset the internal elements only after all request pads are released */
1848 if (splitmux->contexts == NULL)
1849 gst_splitmux_reset (splitmux);
1852 GST_SPLITMUX_UNLOCK (splitmux);
1856 create_element (GstSplitMuxSink * splitmux,
1857 const gchar * factory, const gchar * name, gboolean locked)
1859 GstElement *ret = gst_element_factory_make (factory, name);
1861 g_warning ("Failed to create %s - splitmuxsink will not work", name);
1866 /* Ensure the sink starts in locked state and NULL - it will be changed
1867 * by the filename setting code */
1868 gst_element_set_locked_state (ret, TRUE);
1869 gst_element_set_state (ret, GST_STATE_NULL);
1872 if (!gst_bin_add (GST_BIN (splitmux), ret)) {
1873 g_warning ("Could not add %s element - splitmuxsink will not work", name);
1874 gst_object_unref (ret);
/* create_muxer:
 * Makes sure splitmux->muxer exists. Does nothing when it is already set.
 *
 * @splitmux: the splitmuxsink instance
 *
 * A user-provided muxer (splitmux->provided_muxer, read under the object
 * lock with a temporary ref) takes precedence: its locked state is cleared,
 * it is added to the bin and the temporary ref is dropped. Without one, an
 * "mp4mux" element named "muxer" is created through create_element().
 * NOTE(review): the return statements are not visible in this listing;
 * callers test the result as a boolean.
 */
1882 create_muxer (GstSplitMuxSink * splitmux)
1884 /* Create internal elements */
1885 if (splitmux->muxer == NULL) {
1886 GstElement *provided_muxer = NULL;
1888 GST_OBJECT_LOCK (splitmux);
1889 if (splitmux->provided_muxer != NULL)
1890 provided_muxer = gst_object_ref (splitmux->provided_muxer);
1891 GST_OBJECT_UNLOCK (splitmux);
1893 if (provided_muxer == NULL) {
1894 if ((splitmux->muxer =
1895 create_element (splitmux, "mp4mux", "muxer", FALSE)) == NULL)
1898 /* Ensure it's not in locked state (we might be reusing an old element) */
1899 gst_element_set_locked_state (provided_muxer, FALSE);
1900 if (!gst_bin_add (GST_BIN (splitmux), provided_muxer)) {
1901 g_warning ("Could not add muxer element - splitmuxsink will not work");
1902 gst_object_unref (provided_muxer);
1906 splitmux->muxer = provided_muxer;
1907 gst_object_unref (provided_muxer);
/* find_sink:
 * Locates the element that owns a "location" property inside @e.
 *
 * @e: a sink element, or a bin wrapping one
 *
 * Non-bins, and elements that expose "location" themselves, are handled by
 * the two early checks. Otherwise the bin's sink elements are iterated and
 * each child is tested for a "location" property; a RESYNC from the
 * iterator restarts the walk and an iterator error is treated as
 * unreachable.
 * NOTE(review): the return statements and the assignment of the match are
 * not visible in this listing; create_sink() treats a NULL result as
 * "no sink found".
 */
1917 find_sink (GstElement * e)
1919 GstElement *res = NULL;
1921 gboolean done = FALSE;
1922 GValue data = { 0, };
1924 if (!GST_IS_BIN (e))
1927 if (g_object_class_find_property (G_OBJECT_GET_CLASS (e), "location") != NULL)
1930 iter = gst_bin_iterate_sinks (GST_BIN (e));
1932 switch (gst_iterator_next (iter, &data)) {
1933 case GST_ITERATOR_OK:
1935 GstElement *child = g_value_get_object (&data);
1936 if (g_object_class_find_property (G_OBJECT_GET_CLASS (child),
1937 "location") != NULL) {
1941 g_value_reset (&data);
1944 case GST_ITERATOR_RESYNC:
1945 gst_iterator_resync (iter);
1947 case GST_ITERATOR_DONE:
1950 case GST_ITERATOR_ERROR:
1951 g_assert_not_reached ();
1955 g_value_unset (&data);
1956 gst_iterator_free (iter);
/* create_sink:
 * Makes sure a sink exists and is linked after the muxer. Nothing is
 * created when splitmux->active_sink is already set.
 *
 * @splitmux: the splitmuxsink instance
 *
 * Two fields are maintained:
 *  - active_sink: the element that is added to the bin and linked to the
 *    muxer (the default sink, or the user-provided sink element/bin);
 *  - sink: the element that carries the "location" property (the default
 *    sink itself, or whatever find_sink() locates inside the provided one).
 * A user-provided sink is state-locked and set to NULL before being added,
 * matching what create_element() does for the default sink. If the sink
 * has an "async" property it is switched off.
 * NOTE(review): the return statements are not visible in this listing;
 * callers test the result as a boolean.
 */
1962 create_sink (GstSplitMuxSink * splitmux)
1964 GstElement *provided_sink = NULL;
1966 if (splitmux->active_sink == NULL) {
1968 GST_OBJECT_LOCK (splitmux);
1969 if (splitmux->provided_sink != NULL)
1970 provided_sink = gst_object_ref (splitmux->provided_sink);
1971 GST_OBJECT_UNLOCK (splitmux);
1973 if (provided_sink == NULL) {
1974 if ((splitmux->sink =
1975 create_element (splitmux, DEFAULT_SINK, "sink", TRUE)) == NULL)
1977 splitmux->active_sink = splitmux->sink;
1979 /* Ensure the sink starts in locked state and NULL - it will be changed
1980 * by the filename setting code */
1981 gst_element_set_locked_state (provided_sink, TRUE);
1982 gst_element_set_state (provided_sink, GST_STATE_NULL);
1983 if (!gst_bin_add (GST_BIN (splitmux), provided_sink)) {
1984 g_warning ("Could not add sink elements - splitmuxsink will not work");
1985 gst_object_unref (provided_sink);
1989 splitmux->active_sink = provided_sink;
1991 /* The bin holds a ref now, we can drop our tmp ref */
1992 gst_object_unref (provided_sink);
1994 /* Find the sink element */
1995 splitmux->sink = find_sink (splitmux->active_sink);
1996 if (splitmux->sink == NULL) {
1998 ("Could not locate sink element in provided sink - splitmuxsink will not work");
2004 if (g_object_class_find_property (G_OBJECT_GET_CLASS (splitmux->sink),
2006 /* async child elements are causing state change races and weird
2007 * failures, so let's try and turn that off */
2008 g_object_set (splitmux->sink, "async", FALSE, NULL);
2012 if (!gst_element_link (splitmux->muxer, splitmux->active_sink)) {
2013 g_warning ("Failed to link muxer and sink- splitmuxsink will not work");
2024 #pragma GCC diagnostic ignored "-Wformat-nonliteral"
/* set_next_filename:
 * Determines the filename for the next fragment and sets it on the sink's
 * "location" property.
 *
 * @splitmux: the splitmuxsink instance (start_next_fragment() calls this
 *            with the splitmux lock held)
 * @ctx:      stream context supplying the current output buffer, caps and
 *            segment for the sample passed to the signal
 *
 * Resolution order:
 *  1. the SIGNAL_FORMAT_LOCATION_FULL signal, given the fragment id and a
 *     sample built from ctx->cur_out_buffer;
 *  2. the older SIGNAL_FORMAT_LOCATION signal, given only the fragment id;
 *  3. splitmux->location used as a printf-style pattern with the fragment
 *     id (NULL when no location is set).
 * fragment_id is incremented afterwards.
 * NOTE(review): a missing cur_out_buffer only produces a warning; the
 * sample is still built from it in the visible lines. The conditions
 * guarding step 3 and the final block are not visible in this listing.
 */
2027 set_next_filename (GstSplitMuxSink * splitmux, MqStreamCtx * ctx)
2029 gchar *fname = NULL;
2033 gst_splitmux_sink_ensure_max_files (splitmux);
2035 if (ctx->cur_out_buffer == NULL) {
2036 GST_WARNING_OBJECT (splitmux, "Starting next file without buffer");
2039 caps = gst_pad_get_current_caps (ctx->srcpad);
2040 sample = gst_sample_new (ctx->cur_out_buffer, caps, &ctx->out_segment, NULL);
2041 g_signal_emit (splitmux, signals[SIGNAL_FORMAT_LOCATION_FULL], 0,
2042 splitmux->fragment_id, sample, &fname);
2043 gst_sample_unref (sample);
2045 gst_caps_unref (caps);
2047 if (fname == NULL) {
2048 /* Fallback to the old signal if the new one returned nothing */
2049 g_signal_emit (splitmux, signals[SIGNAL_FORMAT_LOCATION], 0,
2050 splitmux->fragment_id, &fname);
2054 fname = splitmux->location ?
2055 g_strdup_printf (splitmux->location, splitmux->fragment_id) : NULL;
2058 GST_INFO_OBJECT (splitmux, "Setting file to %s", fname);
2059 g_object_set (splitmux->sink, "location", fname, NULL);
2062 splitmux->fragment_id++;
/* do_async_start:
 * Posts an ASYNC_START message on behalf of the bin when one is needed.
 *
 * @splitmux: the splitmuxsink instance (the visible callers hold the
 *            splitmux lock)
 *
 * When need_async_start is not set this only logs that fact. Otherwise
 * async_pending is set and an ASYNC_START message with the bin as source
 * is handed directly to the parent class handle_message.
 * NOTE(review): the early return after the "no async_start needed" log is
 * not visible in this listing - confirm.
 */
2067 do_async_start (GstSplitMuxSink * splitmux)
2069 GstMessage *message;
2071 if (!splitmux->need_async_start) {
2072 GST_INFO_OBJECT (splitmux, "no async_start needed");
2076 splitmux->async_pending = TRUE;
2078 GST_INFO_OBJECT (splitmux, "Sending async_start message");
2079 message = gst_message_new_async_start (GST_OBJECT_CAST (splitmux));
2080 GST_BIN_CLASS (parent_class)->handle_message (GST_BIN_CAST
2081 (splitmux), message);
/* do_async_done:
 * Counterpart of do_async_start(): completes a pending async state change.
 *
 * @splitmux: the splitmuxsink instance (the visible callers hold the
 *            splitmux lock)
 *
 * If async_pending is set, an ASYNC_DONE message with the bin as source
 * and GST_CLOCK_TIME_NONE as running time is handed to the parent class
 * handle_message and async_pending is cleared. need_async_start is cleared
 * as the last visible statement.
 */
2085 do_async_done (GstSplitMuxSink * splitmux)
2087 GstMessage *message;
2089 if (splitmux->async_pending) {
2090 GST_INFO_OBJECT (splitmux, "Sending async_done message");
2092 gst_message_new_async_done (GST_OBJECT_CAST (splitmux),
2093 GST_CLOCK_TIME_NONE);
2094 GST_BIN_CLASS (parent_class)->handle_message (GST_BIN_CAST
2095 (splitmux), message);
2097 splitmux->async_pending = FALSE;
2100 splitmux->need_async_start = FALSE;
/*
 * gst_splitmux_sink_change_state:
 * @element: the splitmuxsink, as a #GstElement
 * @transition: the state transition being performed
 *
 * State change handler. The work happens in three steps:
 *  1. Before chaining up: create the muxer and sink (NULL->READY), reset the
 *     input/output state and counters (READY->PAUSED), or mark input and
 *     output as STOPPED and wake blocked threads (PAUSED->READY and
 *     READY->NULL).
 *  2. Chain up to the parent class' change_state().
 *  3. After chaining up: request/announce async state handling
 *     (PLAYING->PAUSED, READY->PAUSED) or tear down (READY->NULL).
 * A failed NULL->READY transition is cleaned up at the end.
 *
 * Returns: the state change result stored in ret. It is set to
 * GST_STATE_CHANGE_FAILURE when the muxer or sink cannot be created, and to
 * GST_STATE_CHANGE_ASYNC in the READY->PAUSED branch after chaining up.
 */
2103 static GstStateChangeReturn
2104 gst_splitmux_sink_change_state (GstElement * element, GstStateChange transition)
2106 GstStateChangeReturn ret;
2107 GstSplitMuxSink *splitmux = (GstSplitMuxSink *) element;
/* Step 1: per-transition preparation before the parent class runs */
2109 switch (transition) {
2110 case GST_STATE_CHANGE_NULL_TO_READY:{
2111 GST_SPLITMUX_LOCK (splitmux);
/* Both child elements must be available; otherwise flag failure and
 * release the lock */
2112 if (!create_muxer (splitmux) || !create_sink (splitmux)) {
2113 ret = GST_STATE_CHANGE_FAILURE;
2114 GST_SPLITMUX_UNLOCK (splitmux);
2117 GST_SPLITMUX_UNLOCK (splitmux);
/* Start the fragment counter from 0 */
2118 splitmux->fragment_id = 0;
2121 case GST_STATE_CHANGE_READY_TO_PAUSED:{
2122 GST_SPLITMUX_LOCK (splitmux);
2123 /* Start by collecting one input on each pad */
2124 splitmux->input_state = SPLITMUX_INPUT_STATE_COLLECTING_GOP_START;
2125 splitmux->output_state = SPLITMUX_OUTPUT_STATE_START_NEXT_FILE;
/* No running times or output bytes have been recorded yet */
2126 splitmux->max_in_running_time = GST_CLOCK_STIME_NONE;
2127 splitmux->gop_start_time = splitmux->fragment_start_time =
2128 GST_CLOCK_STIME_NONE;
2129 splitmux->muxed_out_bytes = 0;
2130 GST_SPLITMUX_UNLOCK (splitmux);
/* Both downward transitions stop the input and output sides the same way */
2133 case GST_STATE_CHANGE_PAUSED_TO_READY:
2134 case GST_STATE_CHANGE_READY_TO_NULL:
2135 GST_SPLITMUX_LOCK (splitmux);
2136 splitmux->output_state = SPLITMUX_OUTPUT_STATE_STOPPED;
2137 splitmux->input_state = SPLITMUX_INPUT_STATE_STOPPED;
2138 /* Wake up any blocked threads */
2139 GST_LOG_OBJECT (splitmux,
2140 "State change -> NULL or READY. Waking threads");
2141 GST_SPLITMUX_BROADCAST_INPUT (splitmux);
2142 GST_SPLITMUX_BROADCAST_OUTPUT (splitmux);
2143 GST_SPLITMUX_UNLOCK (splitmux);
/* Step 2: let the parent class perform the actual state change */
2149 ret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
2150 if (ret == GST_STATE_CHANGE_FAILURE)
/* Step 3: per-transition follow-up after the parent class succeeded */
2153 switch (transition) {
2154 case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
/* Allow the next do_async_start() call to post an async-start message */
2155 splitmux->need_async_start = TRUE;
2157 case GST_STATE_CHANGE_READY_TO_PAUSED:{
2158 /* Change state async, because our child sink might not
2159 * be ready to do that for us yet if its state is still locked */
2161 splitmux->need_async_start = TRUE;
2162 /* we want to go async to PAUSED until we managed to configure and add the
2164 GST_SPLITMUX_LOCK (splitmux);
2165 do_async_start (splitmux);
2166 GST_SPLITMUX_UNLOCK (splitmux);
2167 ret = GST_STATE_CHANGE_ASYNC;
2170 case GST_STATE_CHANGE_READY_TO_NULL:
2171 GST_SPLITMUX_LOCK (splitmux);
2172 splitmux->fragment_id = 0;
2173 /* Reset internal elements only if no pad contexts are using them */
2174 if (splitmux->contexts == NULL)
2175 gst_splitmux_reset (splitmux);
/* Finish any async state change that is still pending */
2176 do_async_done (splitmux);
2177 GST_SPLITMUX_UNLOCK (splitmux);
/* Common exit: undo element creation if leaving NULL failed.
 * NOTE(review): gst_splitmux_reset() runs before GST_SPLITMUX_LOCK is taken
 * here, whereas the READY_TO_NULL branch calls it with the lock held -
 * confirm the asymmetry is intended. */
2184 if (transition == GST_STATE_CHANGE_NULL_TO_READY &&
2185 ret == GST_STATE_CHANGE_FAILURE) {
2186 /* Cleanup elements on failed transition out of NULL */
2187 gst_splitmux_reset (splitmux);
2188 GST_SPLITMUX_LOCK (splitmux);
2189 do_async_done (splitmux);
2190 GST_SPLITMUX_UNLOCK (splitmux);
/*
 * register_splitmuxsink:
 * @plugin: the plugin to register the element with
 *
 * Initialises the "splitmuxsink" debug category and registers the
 * GST_TYPE_SPLITMUX_SINK element type under the name "splitmuxsink" with
 * rank GST_RANK_NONE.
 *
 * Returns: the result of gst_element_register().
 */
2196 register_splitmuxsink (GstPlugin * plugin)
2198 GST_DEBUG_CATEGORY_INIT (splitmux_debug, "splitmuxsink", 0,
2199 "Split File Muxing Sink");
2201 return gst_element_register (plugin, "splitmuxsink", GST_RANK_NONE,
2202 GST_TYPE_SPLITMUX_SINK);
2206 gst_splitmux_sink_ensure_max_files (GstSplitMuxSink * splitmux)
2208 if (splitmux->max_files && splitmux->fragment_id >= splitmux->max_files) {
2209 splitmux->fragment_id = 0;