1 /* GStreamer Muxer bin that splits output stream by size/time
2 * Copyright (C) <2014> Jan Schmidt <jan@centricular.com>
4 * This library is free software; you can redistribute it and/or
5 * modify it under the terms of the GNU Library General Public
6 * License as published by the Free Software Foundation; either
7 * version 2 of the License, or (at your option) any later version.
9 * This library is distributed in the hope that it will be useful,
10 * but WITHOUT ANY WARRANTY; without even the implied warranty of
11 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
12 * Library General Public License for more details.
14 * You should have received a copy of the GNU Library General Public
15 * License along with this library; if not, write to the
16 * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
17 * Boston, MA 02110-1301, USA.
21 * SECTION:element-splitmuxsink
22 * @short_description: Muxer wrapper for splitting output stream by size or time
24 * This element wraps a muxer and a sink, and starts a new file when the mux
25 * contents are about to cross a threshold of maximum size or maximum time,
26 * splitting at video keyframe boundaries. Exactly one input video stream
27 * can be muxed, with as many accompanying audio and subtitle streams as
30 * By default, it uses mp4mux and filesink, but they can be changed via
31 * the 'muxer' and 'sink' properties.
33 * The minimum file size is 1 GOP, however - so limits may be overrun if the
34 * distance between any 2 keyframes is larger than the limits.
36 * If a video stream is available, the splitting process is driven by the video
37 * stream contents, and the video stream must contain closed GOPs for the output
38 * file parts to be played individually correctly. In the absence of a video
39 * stream, the first available stream is used as reference for synchronization.
42 * <title>Example pipelines</title>
44 * gst-launch-1.0 -e v4l2src num-buffers=500 ! video/x-raw,width=320,height=240 ! videoconvert ! queue ! timeoverlay ! x264enc key-int-max=10 ! h264parse ! splitmuxsink location=video%02d.mov max-size-time=10000000000 max-size-bytes=1000000
46 * Records a video stream captured from a v4l2 device and muxes it into
47 * ISO mp4 files, splitting as needed to limit size/duration to 10 seconds
48 * and 1MB maximum size.
57 #include <glib/gstdio.h>
58 #include "gstsplitmuxsink.h"
60 GST_DEBUG_CATEGORY_STATIC (splitmux_debug);
61 #define GST_CAT_DEFAULT splitmux_debug
/* Helpers around the element-wide mutex (`lock`) and condition variable
 * (`data_cond`) of the instance passed as `s`. */
63 #define GST_SPLITMUX_LOCK(s) g_mutex_lock(&(s)->lock)
64 #define GST_SPLITMUX_UNLOCK(s) g_mutex_unlock(&(s)->lock)
/* Waits on data_cond using the same mutex, so the caller must hold the lock. */
65 #define GST_SPLITMUX_WAIT(s) g_cond_wait (&(s)->data_cond, &(s)->lock)
/* Wakes every thread currently blocked in GST_SPLITMUX_WAIT(). */
66 #define GST_SPLITMUX_BROADCAST(s) g_cond_broadcast (&(s)->data_cond)
/* Property defaults. Per the property descriptions, 0 disables the time and
 * byte limits, and the overhead is a fraction (0.02 = 2%). */
80 #define DEFAULT_MAX_SIZE_TIME 0
81 #define DEFAULT_MAX_SIZE_BYTES 0
82 #define DEFAULT_MAX_FILES 0
83 #define DEFAULT_MUXER_OVERHEAD 0.02
/* Element names described as the defaults for the "muxer" and "sink"
 * properties. */
84 #define DEFAULT_MUXER "mp4mux"
85 #define DEFAULT_SINK "filesink"
89 SIGNAL_FORMAT_LOCATION,
93 static guint signals[SIGNAL_LAST];
95 static GstStaticPadTemplate video_sink_template =
96 GST_STATIC_PAD_TEMPLATE ("video",
100 static GstStaticPadTemplate audio_sink_template =
101 GST_STATIC_PAD_TEMPLATE ("audio_%u",
104 GST_STATIC_CAPS_ANY);
105 static GstStaticPadTemplate subtitle_sink_template =
106 GST_STATIC_PAD_TEMPLATE ("subtitle_%u",
109 GST_STATIC_CAPS_ANY);
111 static GQuark PAD_CONTEXT;
116 PAD_CONTEXT = g_quark_from_static_string ("pad-context");
119 #define gst_splitmux_sink_parent_class parent_class
120 G_DEFINE_TYPE_EXTENDED (GstSplitMuxSink, gst_splitmux_sink, GST_TYPE_BIN, 0,
123 static gboolean create_elements (GstSplitMuxSink * splitmux);
124 static gboolean create_sink (GstSplitMuxSink * splitmux);
125 static void gst_splitmux_sink_set_property (GObject * object, guint prop_id,
126 const GValue * value, GParamSpec * pspec);
127 static void gst_splitmux_sink_get_property (GObject * object, guint prop_id,
128 GValue * value, GParamSpec * pspec);
129 static void gst_splitmux_sink_dispose (GObject * object);
130 static void gst_splitmux_sink_finalize (GObject * object);
132 static GstPad *gst_splitmux_sink_request_new_pad (GstElement * element,
133 GstPadTemplate * templ, const gchar * name, const GstCaps * caps);
134 static void gst_splitmux_sink_release_pad (GstElement * element, GstPad * pad);
136 static GstStateChangeReturn gst_splitmux_sink_change_state (GstElement *
137 element, GstStateChange transition);
139 static void bus_handler (GstBin * bin, GstMessage * msg);
140 static void set_next_filename (GstSplitMuxSink * splitmux);
141 static void start_next_fragment (GstSplitMuxSink * splitmux);
142 static void check_queue_length (GstSplitMuxSink * splitmux, MqStreamCtx * ctx);
143 static void mq_stream_ctx_unref (MqStreamCtx * ctx);
145 static void gst_splitmux_sink_ensure_max_files (GstSplitMuxSink * splitmux);
/* Allocates a zero-initialised MqStreamBuf with the GSlice allocator.
 * The matching release function is mq_stream_buf_free(). */
148 mq_stream_buf_new (void)
150 return g_slice_new0 (MqStreamBuf);
/* Returns an MqStreamBuf obtained from mq_stream_buf_new() to the GSlice
 * allocator. */
154 mq_stream_buf_free (MqStreamBuf * data)
156 g_slice_free (MqStreamBuf, data);
/* Class initialiser: installs the GObject, GstElement and GstBin virtual
 * methods, registers the element metadata and the three sink pad templates,
 * and installs the element properties and the "format-location" signal. */
160 gst_splitmux_sink_class_init (GstSplitMuxSinkClass * klass)
162 GObjectClass *gobject_class = (GObjectClass *) klass;
163 GstElementClass *gstelement_class = (GstElementClass *) klass;
164 GstBinClass *gstbin_class = (GstBinClass *) klass;
/* GObject property accessors and teardown hooks */
166 gobject_class->set_property = gst_splitmux_sink_set_property;
167 gobject_class->get_property = gst_splitmux_sink_get_property;
168 gobject_class->dispose = gst_splitmux_sink_dispose;
169 gobject_class->finalize = gst_splitmux_sink_finalize;
171 gst_element_class_set_static_metadata (gstelement_class,
172 "Split Muxing Bin", "Generic/Bin/Muxer",
173 "Convenience bin that muxes incoming streams into multiple time/size limited files",
174 "Jan Schmidt <jan@centricular.com>");
/* Pad templates: "video", "audio_%u" and "subtitle_%u" */
176 gst_element_class_add_static_pad_template (gstelement_class,
177 &video_sink_template);
178 gst_element_class_add_static_pad_template (gstelement_class,
179 &audio_sink_template);
180 gst_element_class_add_static_pad_template (gstelement_class,
181 &subtitle_sink_template);
183 gstelement_class->change_state =
184 GST_DEBUG_FUNCPTR (gst_splitmux_sink_change_state);
185 gstelement_class->request_new_pad =
186 GST_DEBUG_FUNCPTR (gst_splitmux_sink_request_new_pad);
187 gstelement_class->release_pad =
188 GST_DEBUG_FUNCPTR (gst_splitmux_sink_release_pad);
/* Messages posted by children of the bin are routed through bus_handler() */
190 gstbin_class->handle_message = bus_handler;
192 g_object_class_install_property (gobject_class, PROP_LOCATION,
193 g_param_spec_string ("location", "File Output Pattern",
194 "Format string pattern for the location of the files to write (e.g. video%05d.mp4)",
195 NULL, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
196 g_object_class_install_property (gobject_class, PROP_MUXER_OVERHEAD,
197 g_param_spec_double ("mux-overhead", "Muxing Overhead",
198 "Extra size overhead of muxing (0.02 = 2%)", 0.0, 1.0,
199 DEFAULT_MUXER_OVERHEAD,
200 G_PARAM_READWRITE | GST_PARAM_CONTROLLABLE | G_PARAM_STATIC_STRINGS));
202 g_object_class_install_property (gobject_class, PROP_MAX_SIZE_TIME,
203 g_param_spec_uint64 ("max-size-time", "Max. size (ns)",
204 "Max. amount of time per file (in ns, 0=disable)", 0, G_MAXUINT64,
205 DEFAULT_MAX_SIZE_TIME, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
206 g_object_class_install_property (gobject_class, PROP_MAX_SIZE_BYTES,
207 g_param_spec_uint64 ("max-size-bytes", "Max. size bytes",
208 "Max. amount of data per file (in bytes, 0=disable)", 0, G_MAXUINT64,
209 DEFAULT_MAX_SIZE_BYTES, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
210 g_object_class_install_property (gobject_class, PROP_MAX_FILES,
211 g_param_spec_uint ("max-files", "Max files",
/* The two literals below are concatenated by the compiler, so the first one
 * has to end with a space to keep "reached," and "old" apart. */
212 "Maximum number of files to keep on disk. Once the maximum is reached, "
213 "old files start to be deleted to make room for new ones.",
214 0, G_MAXUINT, DEFAULT_MAX_FILES,
215 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
218 g_object_class_install_property (gobject_class, PROP_MUXER,
219 g_param_spec_object ("muxer", "Muxer",
220 "The muxer element to use (NULL = default mp4mux)",
221 GST_TYPE_ELEMENT, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
222 g_object_class_install_property (gobject_class, PROP_SINK,
223 g_param_spec_object ("sink", "Sink",
224 "The sink element (or element chain) to use (NULL = default filesink)",
225 GST_TYPE_ELEMENT, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
228 * GstSplitMuxSink::format-location:
229 * @splitmux: the #GstSplitMuxSink
230 * @fragment_id: the sequence number of the file to be created
232 * Returns: the location to be used for the next output file
/* The signal takes one guint argument and returns a string. */
234 signals[SIGNAL_FORMAT_LOCATION] =
235 g_signal_new ("format-location", G_TYPE_FROM_CLASS (klass),
236 G_SIGNAL_RUN_LAST, 0, NULL, NULL, NULL, G_TYPE_STRING, 1, G_TYPE_UINT);
/* Instance initialiser: sets up the lock and condition variable, applies the
 * property defaults, flags the element as a sink and turns on the bin's
 * "async-handling" property. */
240 gst_splitmux_sink_init (GstSplitMuxSink * splitmux)
242 g_mutex_init (&splitmux->lock);
243 g_cond_init (&splitmux->data_cond);
245 splitmux->mux_overhead = DEFAULT_MUXER_OVERHEAD;
246 splitmux->threshold_time = DEFAULT_MAX_SIZE_TIME;
247 splitmux->threshold_bytes = DEFAULT_MAX_SIZE_BYTES;
248 splitmux->max_files = DEFAULT_MAX_FILES;
250 GST_OBJECT_FLAG_SET (splitmux, GST_ELEMENT_FLAG_SINK);
251 g_object_set (splitmux, "async-handling", TRUE, NULL);
/* Removes the internal multiqueue, muxer and active sink from the bin and
 * then reassigns the cached child pointers in one chained assignment.
 * NOTE(review): any guards around the mq and muxer removals, and the value
 * assigned on the last line, are not visible in this excerpt. */
255 gst_splitmux_reset (GstSplitMuxSink * splitmux)
258 gst_bin_remove (GST_BIN (splitmux), splitmux->mq);
260 gst_bin_remove (GST_BIN (splitmux), splitmux->muxer);
261 if (splitmux->active_sink)
262 gst_bin_remove (GST_BIN (splitmux), splitmux->active_sink);
264 splitmux->sink = splitmux->active_sink = splitmux->muxer = splitmux->mq =
/* GObject dispose: chains up to the parent class first, then reassigns the
 * cached child element pointers, which the parent dispose has invalidated. */
269 gst_splitmux_sink_dispose (GObject * object)
271 GstSplitMuxSink *splitmux = GST_SPLITMUX_SINK (object);
273 G_OBJECT_CLASS (parent_class)->dispose (object);
275 /* Calling parent dispose invalidates all child pointers */
276 splitmux->sink = splitmux->active_sink = splitmux->muxer = splitmux->mq =
/* GObject finalize: clears the condition variable and mutex, drops the
 * references to user-provided sink/muxer elements, frees the location string
 * and any remaining stream contexts, then chains up. */
281 gst_splitmux_sink_finalize (GObject * object)
283 GstSplitMuxSink *splitmux = GST_SPLITMUX_SINK (object);
284 g_cond_clear (&splitmux->data_cond);
285 g_mutex_clear (&splitmux->lock);
/* provided_sink / provided_muxer hold references taken in set_property */
286 if (splitmux->provided_sink)
287 gst_object_unref (splitmux->provided_sink);
288 if (splitmux->provided_muxer)
289 gst_object_unref (splitmux->provided_muxer);
291 g_free (splitmux->location);
293 /* Make sure to free any un-released contexts */
294 g_list_foreach (splitmux->contexts, (GFunc) mq_stream_ctx_unref, NULL);
295 g_list_free (splitmux->contexts);
297 G_OBJECT_CLASS (parent_class)->finalize (object);
/* GObject property setter. Every visible assignment is made while holding
 * the object lock. Unknown property ids are reported through
 * G_OBJECT_WARN_INVALID_PROPERTY_ID.
 * NOTE(review): several case labels and the break statements are not visible
 * in this excerpt. */
301 gst_splitmux_sink_set_property (GObject * object, guint prop_id,
302 const GValue * value, GParamSpec * pspec)
304 GstSplitMuxSink *splitmux = GST_SPLITMUX_SINK (object);
/* location: the old string is freed and replaced by a copy of the new one */
308 GST_OBJECT_LOCK (splitmux);
309 g_free (splitmux->location);
310 splitmux->location = g_value_dup_string (value);
311 GST_OBJECT_UNLOCK (splitmux);
314 case PROP_MAX_SIZE_BYTES:
315 GST_OBJECT_LOCK (splitmux);
316 splitmux->threshold_bytes = g_value_get_uint64 (value);
317 GST_OBJECT_UNLOCK (splitmux);
319 case PROP_MAX_SIZE_TIME:
320 GST_OBJECT_LOCK (splitmux);
321 splitmux->threshold_time = g_value_get_uint64 (value);
322 GST_OBJECT_UNLOCK (splitmux);
325 GST_OBJECT_LOCK (splitmux);
326 splitmux->max_files = g_value_get_uint (value);
327 GST_OBJECT_UNLOCK (splitmux);
329 case PROP_MUXER_OVERHEAD:
330 GST_OBJECT_LOCK (splitmux);
331 splitmux->mux_overhead = g_value_get_double (value);
332 GST_OBJECT_UNLOCK (splitmux);
/* provided sink: drop the previous reference, if any, and take a new one */
335 GST_OBJECT_LOCK (splitmux);
336 if (splitmux->provided_sink)
337 gst_object_unref (splitmux->provided_sink);
338 splitmux->provided_sink = g_value_dup_object (value);
339 GST_OBJECT_UNLOCK (splitmux);
/* provided muxer: same replacement scheme as for the sink */
342 GST_OBJECT_LOCK (splitmux);
343 if (splitmux->provided_muxer)
344 gst_object_unref (splitmux->provided_muxer);
345 splitmux->provided_muxer = g_value_dup_object (value);
346 GST_OBJECT_UNLOCK (splitmux);
349 G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
/* GObject property getter. Each value is copied into `value` while holding
 * the object lock. Unknown property ids are reported through
 * G_OBJECT_WARN_INVALID_PROPERTY_ID.
 * NOTE(review): several case labels and the break statements are not visible
 * in this excerpt. */
355 gst_splitmux_sink_get_property (GObject * object, guint prop_id,
356 GValue * value, GParamSpec * pspec)
358 GstSplitMuxSink *splitmux = GST_SPLITMUX_SINK (object);
362 GST_OBJECT_LOCK (splitmux);
363 g_value_set_string (value, splitmux->location);
364 GST_OBJECT_UNLOCK (splitmux);
366 case PROP_MAX_SIZE_BYTES:
367 GST_OBJECT_LOCK (splitmux);
368 g_value_set_uint64 (value, splitmux->threshold_bytes);
369 GST_OBJECT_UNLOCK (splitmux);
371 case PROP_MAX_SIZE_TIME:
372 GST_OBJECT_LOCK (splitmux);
373 g_value_set_uint64 (value, splitmux->threshold_time);
374 GST_OBJECT_UNLOCK (splitmux);
377 GST_OBJECT_LOCK (splitmux);
378 g_value_set_uint (value, splitmux->max_files);
379 GST_OBJECT_UNLOCK (splitmux);
381 case PROP_MUXER_OVERHEAD:
382 GST_OBJECT_LOCK (splitmux);
383 g_value_set_double (value, splitmux->mux_overhead);
384 GST_OBJECT_UNLOCK (splitmux);
/* The object getters report the user-provided elements (provided_sink and
 * provided_muxer). */
387 GST_OBJECT_LOCK (splitmux);
388 g_value_set_object (value, splitmux->provided_sink);
389 GST_OBJECT_UNLOCK (splitmux);
392 GST_OBJECT_LOCK (splitmux);
393 g_value_set_object (value, splitmux->provided_muxer);
394 GST_OBJECT_UNLOCK (splitmux);
397 G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
/* Looks up the static pad of `mq` whose name is "src_" followed by `tmp`,
 * after fetching the name of `sink_pad`.
 * NOTE(review): how `tmp` is derived from the sink pad name, the cleanup of
 * the temporary strings and the return statement are not visible in this
 * excerpt. */
403 mq_sink_to_src (GstElement * mq, GstPad * sink_pad)
405 gchar *tmp, *sinkname, *srcname;
408 sinkname = gst_pad_get_name (sink_pad);
410 srcname = g_strdup_printf ("src_%s", tmp);
412 mq_src = gst_element_get_static_pad (mq, srcname);
/* Requests a new "sink_%u" pad from the multiqueue and resolves the matching
 * source pad through mq_sink_to_src().
 * NOTE(review): the remaining parameters, the error checks and the condition
 * under which the requested pad is released again are not visible in this
 * excerpt. */
421 get_pads_from_mq (GstSplitMuxSink * splitmux, GstPad ** sink_pad,
427 /* Request a pad from multiqueue, then connect this one, then
428 * discover the corresponding output pad and return both */
429 mq_sink = gst_element_get_request_pad (splitmux->mq, "sink_%u");
433 mq_src = mq_sink_to_src (splitmux->mq, mq_sink);
443 gst_element_release_request_pad (splitmux->mq, mq_sink);
/* Allocates a zeroed per-stream context with a reference count of 1,
 * back-pointing to `splitmux`, with both segments in GST_FORMAT_UNDEFINED,
 * both running times at 0 and an empty buffer-info queue. */
448 mq_stream_ctx_new (GstSplitMuxSink * splitmux)
452 ctx = g_new0 (MqStreamCtx, 1);
453 g_atomic_int_set (&ctx->refcount, 1);
454 ctx->splitmux = splitmux;
455 gst_segment_init (&ctx->in_segment, GST_FORMAT_UNDEFINED);
456 gst_segment_init (&ctx->out_segment, GST_FORMAT_UNDEFINED);
457 ctx->in_running_time = ctx->out_running_time = 0;
458 g_queue_init (&ctx->queued_bufs);
/* Frees every MqStreamBuf still held in the context's queue and empties the
 * queue.
 * NOTE(review): the release of `ctx` itself is not visible in this excerpt. */
463 mq_stream_ctx_free (MqStreamCtx * ctx)
465 g_queue_foreach (&ctx->queued_bufs, (GFunc) mq_stream_buf_free, NULL);
466 g_queue_clear (&ctx->queued_bufs);
/* Atomically drops one reference on `ctx` and frees the context once the
 * count reaches zero. */
471 mq_stream_ctx_unref (MqStreamCtx * ctx)
473 if (g_atomic_int_dec_and_test (&ctx->refcount))
474 mq_stream_ctx_free (ctx);
/* Atomically takes an additional reference on `ctx`; balanced by
 * mq_stream_ctx_unref(). */
478 mq_stream_ctx_ref (MqStreamCtx * ctx)
480 g_atomic_int_inc (&ctx->refcount);
/* Destroy notification for the sink-side pad block: forgets the stored
 * probe id and drops one reference on the context. */
484 _pad_block_destroy_sink_notify (MqStreamCtx * ctx)
486 ctx->sink_pad_block_id = 0;
487 mq_stream_ctx_unref (ctx);
/* Destroy notification for the source-side pad block: forgets the stored
 * probe id and drops one reference on the context. */
491 _pad_block_destroy_src_notify (MqStreamCtx * ctx)
493 ctx->src_pad_block_id = 0;
494 mq_stream_ctx_unref (ctx);
/* Posts an element message named "splitmuxsink-fragment-opened" (when
 * `opened` is TRUE) or "splitmuxsink-fragment-closed". The message carries
 * the sink's current "location" and the reference context's output running
 * time.
 * NOTE(review): the release of the string returned by g_object_get() is not
 * visible in this excerpt. */
498 send_fragment_opened_closed_msg (GstSplitMuxSink * splitmux, gboolean opened)
500 gchar *location = NULL;
502 const gchar *msg_name = opened ?
503 "splitmuxsink-fragment-opened" : "splitmuxsink-fragment-closed";
505 g_object_get (splitmux->sink, "location", &location, NULL);
507 msg = gst_message_new_element (GST_OBJECT (splitmux),
508 gst_structure_new (msg_name,
509 "location", G_TYPE_STRING, location,
510 "running-time", GST_TYPE_CLOCK_TIME,
511 splitmux->reference_ctx->out_running_time, NULL));
512 gst_element_post_message (GST_ELEMENT_CAST (splitmux), msg);
517 /* Called with lock held, drops the lock to send EOS to the
521 send_eos (GstSplitMuxSink * splitmux, MqStreamCtx * ctx)
526 eos = gst_event_new_eos ();
/* The event is delivered to the peer of this context's source pad */
527 pad = gst_pad_get_peer (ctx->srcpad);
531 GST_INFO_OBJECT (splitmux, "Sending EOS on %" GST_PTR_FORMAT, pad);
/* The splitmux lock is released while the event is sent and re-taken
 * afterwards, so state guarded by it may have changed on return. */
532 GST_SPLITMUX_UNLOCK (splitmux);
533 gst_pad_send_event (pad, eos);
534 GST_SPLITMUX_LOCK (splitmux);
536 gst_object_unref (pad);
539 /* Called with splitmux lock held to check if this output
540 * context needs to sleep to wait for the release of the
541 * next GOP, or to send EOS to close out the current file
544 complete_or_wait_on_out (GstSplitMuxSink * splitmux, MqStreamCtx * ctx)
548 GST_LOG_OBJECT (ctx->srcpad,
549 "Checking running time %" GST_TIME_FORMAT " against max %"
550 GST_TIME_FORMAT, GST_TIME_ARGS (ctx->out_running_time),
551 GST_TIME_ARGS (splitmux->max_out_running_time));
/* Output below the current limit (or no limit set): data counts as muxed.
 * NOTE(review): the exit from this branch is not visible in this excerpt. */
553 if (splitmux->max_out_running_time == GST_CLOCK_TIME_NONE ||
554 ctx->out_running_time < splitmux->max_out_running_time) {
555 splitmux->have_muxed_something = TRUE;
/* Flushing pads and a stopped element are special-cased here; the action
 * taken is not visible in this excerpt. */
559 if (ctx->flushing || splitmux->state == SPLITMUX_STATE_STOPPED)
/* While a file is being ended, each context sends EOS downstream once */
562 if (splitmux->state == SPLITMUX_STATE_ENDING_FILE) {
563 if (ctx->out_eos == FALSE) {
564 send_eos (splitmux, ctx);
567 } else if (splitmux->state == SPLITMUX_STATE_START_NEXT_FRAGMENT) {
568 start_next_fragment (splitmux);
572 GST_INFO_OBJECT (ctx->srcpad,
573 "Sleeping for running time %"
574 GST_TIME_FORMAT " (max %" GST_TIME_FORMAT ")",
575 GST_TIME_ARGS (ctx->out_running_time),
576 GST_TIME_ARGS (splitmux->max_out_running_time));
/* out_blocked is set for exactly the duration of the wait below */
577 ctx->out_blocked = TRUE;
578 /* Expand the mq if needed before sleeping */
579 check_queue_length (splitmux, ctx);
580 GST_SPLITMUX_WAIT (splitmux);
581 ctx->out_blocked = FALSE;
582 GST_INFO_OBJECT (ctx->srcpad,
583 "Woken for new max running time %" GST_TIME_FORMAT,
584 GST_TIME_ARGS (splitmux->max_out_running_time));
/* Pad probe callback for the output side of a stream context. It tracks the
 * context's output segment and running time from events and buffers, and
 * calls complete_or_wait_on_out() before letting data continue.
 * Buffer lists are not supported and are dropped with a warning.
 * NOTE(review): several case labels, break statements and goto targets are
 * not visible in this excerpt. */
588 static GstPadProbeReturn
589 handle_mq_output (GstPad * pad, GstPadProbeInfo * info, MqStreamCtx * ctx)
591 GstSplitMuxSink *splitmux = ctx->splitmux;
592 MqStreamBuf *buf_info = NULL;
594 GST_LOG_OBJECT (pad, "Fired probe type 0x%x", info->type);
596 /* FIXME: Handle buffer lists, until then make it clear they won't work */
597 if (info->type & GST_PAD_PROBE_TYPE_BUFFER_LIST) {
598 g_warning ("Buffer list handling not implemented");
599 return GST_PAD_PROBE_DROP;
601 if (info->type & GST_PAD_PROBE_TYPE_EVENT_DOWNSTREAM) {
602 GstEvent *event = gst_pad_probe_info_get_event (info);
604 GST_LOG_OBJECT (pad, "Event %" GST_PTR_FORMAT, event);
606 switch (GST_EVENT_TYPE (event)) {
607 case GST_EVENT_SEGMENT:
608 gst_event_copy_segment (event, &ctx->out_segment);
/* FLUSH_STOP: reset the output segment, discard queued buffer records and
 * leave flushing mode */
610 case GST_EVENT_FLUSH_STOP:
611 GST_SPLITMUX_LOCK (splitmux);
612 gst_segment_init (&ctx->out_segment, GST_FORMAT_UNDEFINED);
613 g_queue_foreach (&ctx->queued_bufs, (GFunc) mq_stream_buf_free, NULL);
614 g_queue_clear (&ctx->queued_bufs);
615 ctx->flushing = FALSE;
616 GST_SPLITMUX_UNLOCK (splitmux);
/* FLUSH_START: mark the context as flushing and wake all waiters */
618 case GST_EVENT_FLUSH_START:
619 GST_SPLITMUX_LOCK (splitmux);
620 GST_LOG_OBJECT (pad, "Flush start");
621 ctx->flushing = TRUE;
622 GST_SPLITMUX_BROADCAST (splitmux);
623 GST_SPLITMUX_UNLOCK (splitmux);
626 GST_SPLITMUX_LOCK (splitmux);
627 if (splitmux->state == SPLITMUX_STATE_STOPPED)
630 GST_SPLITMUX_UNLOCK (splitmux);
/* GAP events: the gap timestamp is converted to running time with the output
 * segment and used as the context's output position */
635 gst_event_parse_gap (event, &gap_ts, NULL);
636 if (gap_ts == GST_CLOCK_TIME_NONE)
639 GST_SPLITMUX_LOCK (splitmux);
641 gap_ts = gst_segment_to_running_time (&ctx->out_segment,
642 GST_FORMAT_TIME, gap_ts);
644 GST_LOG_OBJECT (pad, "Have GAP w/ ts %" GST_TIME_FORMAT,
645 GST_TIME_ARGS (gap_ts));
647 if (splitmux->state == SPLITMUX_STATE_STOPPED)
649 ctx->out_running_time = gap_ts;
650 complete_or_wait_on_out (splitmux, ctx);
651 GST_SPLITMUX_UNLOCK (splitmux);
/* Custom "splitmuxsink-unblock" events carry a "timestamp" field that becomes
 * the output running time; the event itself is dropped */
654 case GST_EVENT_CUSTOM_DOWNSTREAM:{
655 const GstStructure *s;
658 s = gst_event_get_structure (event);
659 if (!gst_structure_has_name (s, "splitmuxsink-unblock"))
662 gst_structure_get_uint64 (s, "timestamp", &ts);
664 GST_SPLITMUX_LOCK (splitmux);
666 if (splitmux->state == SPLITMUX_STATE_STOPPED)
668 ctx->out_running_time = ts;
669 complete_or_wait_on_out (splitmux, ctx);
670 GST_SPLITMUX_UNLOCK (splitmux);
671 return GST_PAD_PROBE_DROP;
676 return GST_PAD_PROBE_PASS;
679 /* Allow everything through until the configured next stopping point */
680 GST_SPLITMUX_LOCK (splitmux);
/* Buffer path: take the bookkeeping record from the tail of the queue */
682 buf_info = g_queue_pop_tail (&ctx->queued_bufs);
683 if (buf_info == NULL)
684 /* Can only happen due to a poorly timed flush */
687 /* If we have popped a keyframe, decrement the queued_gop count */
688 if (buf_info->keyframe && splitmux->queued_gops > 0)
689 splitmux->queued_gops--;
691 ctx->out_running_time = buf_info->run_ts;
693 GST_LOG_OBJECT (splitmux,
694 "Pad %" GST_PTR_FORMAT " buffer with TS %" GST_TIME_FORMAT
695 " size %" G_GSIZE_FORMAT,
696 pad, GST_TIME_ARGS (ctx->out_running_time), buf_info->buf_size);
/* The fragment-opened message for the first fragment is posted only once */
698 if (splitmux->opening_first_fragment) {
699 send_fragment_opened_closed_msg (splitmux, TRUE);
700 splitmux->opening_first_fragment = FALSE;
703 complete_or_wait_on_out (splitmux, ctx);
/* Record the highest running time and the byte total let through so far */
705 if (splitmux->muxed_out_time == GST_CLOCK_TIME_NONE ||
706 splitmux->muxed_out_time < buf_info->run_ts)
707 splitmux->muxed_out_time = buf_info->run_ts;
709 splitmux->muxed_out_bytes += buf_info->buf_size;
711 #ifndef GST_DISABLE_GST_DEBUG
713 GstBuffer *buf = gst_pad_probe_info_get_buffer (info);
714 GST_LOG_OBJECT (pad, "Returning to pass buffer %" GST_PTR_FORMAT
715 " run ts %" GST_TIME_FORMAT, buf,
716 GST_TIME_ARGS (ctx->out_running_time));
720 GST_SPLITMUX_UNLOCK (splitmux);
722 mq_stream_buf_free (buf_info);
724 return GST_PAD_PROBE_PASS;
/* Drop path: releases the lock and discards the probed item.
 * NOTE(review): the label and the jumps leading here are not visible in this
 * excerpt. */
727 GST_SPLITMUX_UNLOCK (splitmux);
728 return GST_PAD_PROBE_DROP;
/* Sticky-event iteration callback: sends a new reference to the event to
 * `peer` and returns the result of gst_pad_send_event(). */
732 resend_sticky (GstPad * pad, GstEvent ** event, GstPad * peer)
734 return gst_pad_send_event (peer, gst_event_ref (*event));
/* Re-sends all sticky events of the context's source pad to its peer and
 * refreshes ctx->out_eos from the pad's EOS flag. Used as a GFunc over the
 * context list by start_next_fragment(). */
738 restart_context (MqStreamCtx * ctx, GstSplitMuxSink * splitmux)
740 GstPad *peer = gst_pad_get_peer (ctx->srcpad);
742 gst_pad_sticky_events_foreach (ctx->srcpad,
743 (GstPadStickyEventsForeachFunction) (resend_sticky), peer);
745 /* Clear EOS flag if not actually EOS */
746 ctx->out_eos = GST_PAD_IS_EOS (ctx->srcpad);
748 gst_object_unref (peer);
751 /* Called with lock held when a fragment
752 * reaches EOS and it is time to restart
756 start_next_fragment (GstSplitMuxSink * splitmux)
758 /* 1 change to new file */
/* The muxer and sink states are locked so they can be cycled through NULL
 * independently of the bin while the output file is switched */
759 gst_element_set_locked_state (splitmux->muxer, TRUE);
760 gst_element_set_locked_state (splitmux->active_sink, TRUE);
761 gst_element_set_state (splitmux->muxer, GST_STATE_NULL);
762 gst_element_set_state (splitmux->active_sink, GST_STATE_NULL);
764 set_next_filename (splitmux);
/* Bring both elements back to the bin's target state, sink first */
766 gst_element_set_state (splitmux->active_sink, GST_STATE_TARGET (splitmux));
767 gst_element_set_state (splitmux->muxer, GST_STATE_TARGET (splitmux));
768 gst_element_set_locked_state (splitmux->muxer, FALSE);
769 gst_element_set_locked_state (splitmux->active_sink, FALSE);
/* Every stream re-sends its sticky events into the restarted muxer */
771 g_list_foreach (splitmux->contexts, (GFunc) restart_context, splitmux);
773 /* Switch state and go back to processing */
774 if (!splitmux->reference_ctx->in_eos) {
775 splitmux->state = SPLITMUX_STATE_COLLECTING_GOP_START;
776 splitmux->max_out_running_time = splitmux->reference_ctx->in_running_time;
778 splitmux->state = SPLITMUX_STATE_WAITING_GOP_COMPLETE;
779 splitmux->max_out_running_time = GST_CLOCK_TIME_NONE;
780 splitmux->have_muxed_something = FALSE;
782 splitmux->have_muxed_something =
783 (splitmux->reference_ctx->in_running_time > splitmux->muxed_out_time);
785 /* Store the overflow parameters as the basis for the next fragment */
786 splitmux->mux_start_time = splitmux->muxed_out_time;
787 splitmux->mux_start_bytes = splitmux->muxed_out_bytes;
789 GST_DEBUG_OBJECT (splitmux,
790 "Restarting flow for new fragment. New running time %" GST_TIME_FORMAT,
791 GST_TIME_ARGS (splitmux->max_out_running_time));
793 send_fragment_opened_closed_msg (splitmux, TRUE);
/* Wake every thread waiting on the splitmux condition variable */
795 GST_SPLITMUX_BROADCAST (splitmux);
/* GstBin message handler. For EOS messages it posts the fragment-closed
 * message; if the element is ending a file and has a finite
 * max_out_running_time, it moves to START_NEXT_FRAGMENT, wakes waiters and
 * drops the EOS message. Messages that are not dropped are forwarded to the
 * parent class handler.
 * NOTE(review): the return after the drop and the remaining switch cases are
 * not visible in this excerpt. */
799 bus_handler (GstBin * bin, GstMessage * message)
801 GstSplitMuxSink *splitmux = GST_SPLITMUX_SINK (bin);
803 switch (GST_MESSAGE_TYPE (message)) {
804 case GST_MESSAGE_EOS:
805 /* If the state is draining out the current file, drop this EOS */
806 GST_SPLITMUX_LOCK (splitmux);
808 send_fragment_opened_closed_msg (splitmux, FALSE);
810 if (splitmux->state == SPLITMUX_STATE_ENDING_FILE &&
811 splitmux->max_out_running_time != GST_CLOCK_TIME_NONE) {
812 GST_DEBUG_OBJECT (splitmux, "Caught EOS at end of fragment, dropping");
813 splitmux->state = SPLITMUX_STATE_START_NEXT_FRAGMENT;
814 GST_SPLITMUX_BROADCAST (splitmux);
816 gst_message_unref (message);
817 GST_SPLITMUX_UNLOCK (splitmux);
820 GST_SPLITMUX_UNLOCK (splitmux);
826 GST_BIN_CLASS (parent_class)->handle_message (bin, message);
829 /* Called with splitmux lock held */
830 /* Called when entering ProcessingCompleteGop state
831 * Assess if mq contents overflowed the current file
832 * -> If yes, need to switch to new file
833 * -> if no, set max_out_running_time to let this GOP in and
834 * go to COLLECTING_GOP_START state
837 handle_gathered_gop (GstSplitMuxSink * splitmux)
840 gsize queued_bytes = 0;
841 GstClockTime queued_time = 0;
843 /* Assess if the multiqueue contents overflowed the current file */
/* queued_time ends up as the largest input running time over all contexts,
 * queued_bytes as the sum of their input byte counters */
844 for (cur = g_list_first (splitmux->contexts);
845 cur != NULL; cur = g_list_next (cur)) {
846 MqStreamCtx *tmpctx = (MqStreamCtx *) (cur->data);
847 if (tmpctx->in_running_time > queued_time)
848 queued_time = tmpctx->in_running_time;
849 queued_bytes += tmpctx->in_bytes;
852 g_assert (queued_bytes >= splitmux->mux_start_bytes);
853 g_assert (queued_time >= splitmux->mux_start_time);
/* Make both totals relative to the start of the current fragment */
855 queued_bytes -= splitmux->mux_start_bytes;
856 queued_time -= splitmux->mux_start_time;
858 /* Expand queued bytes estimate by muxer overhead */
859 queued_bytes += (queued_bytes * splitmux->mux_overhead);
861 GST_LOG_OBJECT (splitmux, "mq at TS %" GST_TIME_FORMAT
862 " bytes %" G_GSIZE_FORMAT, GST_TIME_ARGS (queued_time), queued_bytes);
864 /* Check for overrun - have we output at least one byte and overrun
865 * either threshold? */
/* A threshold of 0 means the corresponding limit is not checked */
866 if ((splitmux->have_muxed_something &&
867 ((splitmux->threshold_bytes > 0 &&
868 queued_bytes >= splitmux->threshold_bytes) ||
869 (splitmux->threshold_time > 0 &&
870 queued_time >= splitmux->threshold_time)))) {
872 splitmux->state = SPLITMUX_STATE_ENDING_FILE;
874 GST_INFO_OBJECT (splitmux,
875 "mq overflowed since last, draining out. max out TS is %"
876 GST_TIME_FORMAT, GST_TIME_ARGS (splitmux->max_out_running_time));
877 GST_SPLITMUX_BROADCAST (splitmux);
881 GST_LOG_OBJECT (splitmux,
882 "This GOP didn't overflow the fragment. Bytes sent %" G_GSIZE_FORMAT
883 " queued %" G_GSIZE_FORMAT " time %" GST_TIME_FORMAT " Continuing.",
884 splitmux->muxed_out_bytes - splitmux->mux_start_bytes,
885 queued_bytes, GST_TIME_ARGS (queued_time));
887 /* Wake everyone up to push this one GOP, then sleep */
888 splitmux->have_muxed_something = TRUE;
/* The output limit becomes the reference stream's input position, or is
 * removed entirely once the reference stream has reached EOS */
890 if (!splitmux->reference_ctx->in_eos) {
891 splitmux->state = SPLITMUX_STATE_COLLECTING_GOP_START;
892 splitmux->max_out_running_time = splitmux->reference_ctx->in_running_time;
894 splitmux->state = SPLITMUX_STATE_WAITING_GOP_COMPLETE;
895 splitmux->max_out_running_time = GST_CLOCK_TIME_NONE;
898 GST_LOG_OBJECT (splitmux, "Waking output for complete GOP, TS %"
899 GST_TIME_FORMAT, GST_TIME_ARGS (splitmux->max_out_running_time));
900 GST_SPLITMUX_BROADCAST (splitmux);
905 /* Called with splitmux lock held */
906 /* Called from each input pad when it has all the pieces
907 * for a GOP or EOS, starting with the reference pad which has set the
908 * splitmux->max_in_running_time
911 check_completed_gop (GstSplitMuxSink * splitmux, MqStreamCtx * ctx)
914 gboolean ready = TRUE;
915 GstClockTime current_max_in_running_time;
917 if (splitmux->state == SPLITMUX_STATE_WAITING_GOP_COMPLETE) {
918 /* Iterate each pad, and check that the input running time is at least
919 * up to the reference running time, and if so handle the collected GOP */
920 GST_LOG_OBJECT (splitmux, "Checking GOP collected, Max in running time %"
921 GST_TIME_FORMAT " ctx %p",
922 GST_TIME_ARGS (splitmux->max_in_running_time), ctx);
923 for (cur = g_list_first (splitmux->contexts); cur != NULL;
924 cur = g_list_next (cur)) {
925 MqStreamCtx *tmpctx = (MqStreamCtx *) (cur->data);
927 GST_LOG_OBJECT (splitmux,
928 "Context %p (src pad %" GST_PTR_FORMAT ") TS %" GST_TIME_FORMAT
929 " EOS %d", tmpctx, tmpctx->srcpad,
930 GST_TIME_ARGS (tmpctx->in_running_time), tmpctx->in_eos);
/* NOTE(review): the last term of this condition and the statements that
 * clear `ready` are not visible in this excerpt. */
932 if (splitmux->max_in_running_time != GST_CLOCK_TIME_NONE &&
933 tmpctx->in_running_time < splitmux->max_in_running_time &&
935 GST_LOG_OBJECT (splitmux,
936 "Context %p (src pad %" GST_PTR_FORMAT ") not ready. We'll sleep",
937 tmpctx, tmpctx->srcpad);
943 GST_DEBUG_OBJECT (splitmux,
944 "Collected GOP is complete. Processing (ctx %p)", ctx);
945 /* All pads have a complete GOP, release it into the multiqueue */
946 handle_gathered_gop (splitmux);
950 /* If upstream reached EOS we are not expecting more data, no need to wait
955 /* Some pad is not yet ready, or GOP is being pushed
956 * either way, sleep and wait to get woken */
/* The wait ends when the state leaves the two listed states or when
 * max_in_running_time changes from the value sampled here.
 * NOTE(review): one term of the loop condition is not visible in this
 * excerpt. */
957 current_max_in_running_time = splitmux->max_in_running_time;
958 while ((splitmux->state == SPLITMUX_STATE_WAITING_GOP_COMPLETE ||
959 splitmux->state == SPLITMUX_STATE_START_NEXT_FRAGMENT) &&
961 (current_max_in_running_time == splitmux->max_in_running_time)) {
963 GST_LOG_OBJECT (splitmux, "Sleeping for %s (ctx %p)",
964 splitmux->state == SPLITMUX_STATE_WAITING_GOP_COMPLETE ?
965 "GOP complete" : "EOF draining", ctx);
966 GST_SPLITMUX_WAIT (splitmux);
968 GST_LOG_OBJECT (splitmux, "Done waiting for complete GOP (ctx %p)", ctx);
/* Checks whether this context's queue of buffer records has reached the
 * current multiqueue buffer limit and, where growth is allowed, raises the
 * multiqueue's "max-size-buffers" to one more than the current length.
 * NOTE(review): the statements that set `allow_grow` and the test that gates
 * the resize are not visible in this excerpt. */
973 check_queue_length (GstSplitMuxSink * splitmux, MqStreamCtx * ctx)
976 guint cur_len = g_queue_get_length (&ctx->queued_bufs);
978 GST_DEBUG_OBJECT (ctx->sinkpad,
979 "Checking queue length len %u cur_max %u queued gops %u",
980 cur_len, splitmux->mq_max_buffers, splitmux->queued_gops);
982 if (cur_len >= splitmux->mq_max_buffers) {
983 gboolean allow_grow = FALSE;
985 /* If collecting a GOP and this pad might block,
986 * and there isn't already a pending GOP in the queue
989 if (splitmux->state == SPLITMUX_STATE_WAITING_GOP_COMPLETE &&
990 ctx->in_running_time < splitmux->max_in_running_time &&
991 splitmux->queued_gops <= 1) {
993 } else if (splitmux->state == SPLITMUX_STATE_COLLECTING_GOP_START &&
994 ctx->is_reference && splitmux->queued_gops <= 1) {
999 for (cur = g_list_first (splitmux->contexts);
1000 cur != NULL; cur = g_list_next (cur)) {
1001 MqStreamCtx *tmpctx = (MqStreamCtx *) (cur->data);
1002 GST_DEBUG_OBJECT (tmpctx->sinkpad,
1003 " len %u out_blocked %d",
1004 g_queue_get_length (&tmpctx->queued_bufs), tmpctx->out_blocked);
1005 /* If another stream is starving, grow */
1006 if (tmpctx != ctx && g_queue_get_length (&tmpctx->queued_bufs) < 1) {
/* The new limit is one more than this pad's current queue length */
1013 splitmux->mq_max_buffers = cur_len + 1;
1015 GST_INFO_OBJECT (splitmux,
1016 "Multiqueue overrun - enlarging to %u buffers ctx %p",
1017 splitmux->mq_max_buffers, ctx);
1019 g_object_set (splitmux->mq, "max-size-buffers",
1020 splitmux->mq_max_buffers, NULL);
/* Pad probe callback for the sink side of the multiqueue.
 *
 * gst_splitmux_sink_request_new_pad installs it on each multiqueue sink pad
 * with GST_PAD_PROBE_TYPE_DATA_DOWNSTREAM, passing the stream context as
 * user data.
 *
 * pad:  the pad the probe fired on
 * info: probe info; may carry a buffer list, a downstream event or a buffer
 * ctx:  per-stream context (segment, running time, byte counter, queue)
 *
 * Buffer lists are not supported: a warning is printed and the data is
 * dropped (GST_PAD_PROBE_DROP). Downstream events only update per-stream
 * bookkeeping and are passed on. For buffers, the stream running time and
 * the input byte counter are updated, the element state machine is run
 * (this may sleep on the splitmux condition), and a MqStreamBuf describing
 * the buffer is pushed onto ctx->queued_bufs before the buffer is passed on.
 * Every visible return other than the buffer-list case is
 * GST_PAD_PROBE_PASS.
 *
 * NOTE(review): the validity check below uses ts (PTS, falling back to DTS)
 * but the running-time conversion is fed GST_BUFFER_TIMESTAMP (buf) rather
 * than ts - confirm whether ts was intended for DTS-only buffers.
 */
1025 static GstPadProbeReturn
1026 handle_mq_input (GstPad * pad, GstPadProbeInfo * info, MqStreamCtx * ctx)
1028 GstSplitMuxSink *splitmux = ctx->splitmux;
1030 MqStreamBuf *buf_info = NULL;
1032 gboolean loop_again;
1033 gboolean keyframe = FALSE;
1035 GST_LOG_OBJECT (pad, "Fired probe type 0x%x", info->type);
1037 /* FIXME: Handle buffer lists, until then make it clear they won't work */
1038 if (info->type & GST_PAD_PROBE_TYPE_BUFFER_LIST) {
1039 g_warning ("Buffer list handling not implemented");
1040 return GST_PAD_PROBE_DROP;
/* Downstream events: track the input segment and flush/EOS status */
1042 if (info->type & GST_PAD_PROBE_TYPE_EVENT_DOWNSTREAM) {
1043 GstEvent *event = gst_pad_probe_info_get_event (info);
1044 switch (GST_EVENT_TYPE (event)) {
1045 case GST_EVENT_SEGMENT:
1046 gst_event_copy_segment (event, &ctx->in_segment);
1048 case GST_EVENT_FLUSH_STOP:
/* A flush invalidates the stored segment and restarts the running time */
1049 GST_SPLITMUX_LOCK (splitmux);
1050 gst_segment_init (&ctx->in_segment, GST_FORMAT_UNDEFINED);
1051 ctx->in_eos = FALSE;
1053 ctx->in_running_time = 0;
1054 GST_SPLITMUX_UNLOCK (splitmux);
/* presumably the EOS case - its case label is not visible here; confirm */
1057 GST_SPLITMUX_LOCK (splitmux);
1060 if (splitmux->state == SPLITMUX_STATE_STOPPED)
1063 if (ctx->is_reference) {
1064 GST_INFO_OBJECT (splitmux, "Got Reference EOS. Finishing up");
1065 /* Act as if this is a new keyframe with infinite timestamp */
1066 splitmux->max_in_running_time = GST_CLOCK_TIME_NONE;
1067 splitmux->state = SPLITMUX_STATE_WAITING_GOP_COMPLETE;
1068 /* Wake up other input pads to collect this GOP */
1069 GST_SPLITMUX_BROADCAST (splitmux);
1070 check_completed_gop (splitmux, ctx);
1071 } else if (splitmux->state == SPLITMUX_STATE_WAITING_GOP_COMPLETE) {
1072 /* If we are waiting for a GOP to be completed (ie, for aux
1073 * pads to catch up), then this pad is complete, so check
1074 * if the whole GOP is.
1076 check_completed_gop (splitmux, ctx);
1078 GST_SPLITMUX_UNLOCK (splitmux);
1083 return GST_PAD_PROBE_PASS;
1086 buf = gst_pad_probe_info_get_buffer (info);
1087 buf_info = mq_stream_buf_new ();
1089 if (GST_BUFFER_PTS_IS_VALID (buf))
1090 ts = GST_BUFFER_PTS (buf);
1092 ts = GST_BUFFER_DTS (buf);
1094 GST_SPLITMUX_LOCK (splitmux);
1096 if (splitmux->state == SPLITMUX_STATE_STOPPED)
1099 /* If this buffer has a timestamp, advance the input timestamp of the
1101 if (GST_CLOCK_TIME_IS_VALID (ts)) {
1102 GstClockTime running_time =
1103 gst_segment_to_running_time (&ctx->in_segment, GST_FORMAT_TIME,
1104 GST_BUFFER_TIMESTAMP (buf));
1106 if (GST_CLOCK_TIME_IS_VALID (running_time) &&
1107 (ctx->in_running_time == GST_CLOCK_TIME_NONE
1108 || running_time > ctx->in_running_time))
1109 ctx->in_running_time = running_time;
1112 /* Try to make sure we have a valid running time */
1113 if (!GST_CLOCK_TIME_IS_VALID (ctx->in_running_time)) {
1114 ctx->in_running_time =
1115 gst_segment_to_running_time (&ctx->in_segment, GST_FORMAT_TIME,
1116 ctx->in_segment.start);
/* Record the stream running time and the payload size for this buffer */
1119 buf_info->run_ts = ctx->in_running_time;
1120 buf_info->buf_size = gst_buffer_get_size (buf);
1122 /* Update total input byte counter for overflow detect */
1123 ctx->in_bytes += buf_info->buf_size;
1125 /* initialize mux_start_time */
1126 if (ctx->is_reference && splitmux->mux_start_time == 0)
1127 splitmux->mux_start_time = buf_info->run_ts;
1129 GST_DEBUG_OBJECT (pad, "Buf TS %" GST_TIME_FORMAT
1130 " total in_bytes %" G_GSIZE_FORMAT,
1131 GST_TIME_ARGS (buf_info->run_ts), ctx->in_bytes);
/* Act on the element-wide state; the enclosing loop repeats while
 * loop_again is set */
1138 switch (splitmux->state) {
1139 case SPLITMUX_STATE_COLLECTING_GOP_START:
1140 if (ctx->is_reference) {
1141 /* If a keyframe, we have a complete GOP */
1142 if (GST_BUFFER_FLAG_IS_SET (buf, GST_BUFFER_FLAG_DELTA_UNIT) ||
1143 !GST_CLOCK_TIME_IS_VALID (ctx->in_running_time) ||
1144 splitmux->max_in_running_time >= ctx->in_running_time) {
1145 /* Pass this buffer through */
1149 GST_INFO_OBJECT (pad,
1150 "Have keyframe with running time %" GST_TIME_FORMAT,
1151 GST_TIME_ARGS (ctx->in_running_time));
1153 splitmux->state = SPLITMUX_STATE_WAITING_GOP_COMPLETE;
1154 splitmux->max_in_running_time = ctx->in_running_time;
1155 /* Wake up other input pads to collect this GOP */
1156 GST_SPLITMUX_BROADCAST (splitmux);
1157 check_completed_gop (splitmux, ctx);
1159 /* We're still waiting for a keyframe on the reference pad, sleep */
1160 GST_LOG_OBJECT (pad, "Sleeping for GOP start");
1161 GST_SPLITMUX_WAIT (splitmux);
1162 GST_LOG_OBJECT (pad, "Done sleeping for GOP start state now %d",
1166 case SPLITMUX_STATE_WAITING_GOP_COMPLETE:
1167 /* After a GOP start is found, this buffer might complete the GOP */
1168 /* If we overran the target timestamp, it might be time to process
1169 * the GOP, otherwise bail out for more data
1171 GST_LOG_OBJECT (pad,
1172 "Checking TS %" GST_TIME_FORMAT " against max %" GST_TIME_FORMAT,
1173 GST_TIME_ARGS (ctx->in_running_time),
1174 GST_TIME_ARGS (splitmux->max_in_running_time));
1176 if (ctx->in_running_time < splitmux->max_in_running_time) {
1181 GST_LOG_OBJECT (pad,
1182 "Collected last packet of GOP. Checking other pads");
1183 check_completed_gop (splitmux, ctx);
1185 case SPLITMUX_STATE_ENDING_FILE:{
1188 /* If some streams received no buffer during the last GOP that overran,
1189 * because their next buffer has a timestamp bigger than
1190 * ctx->max_in_running_time, their queue is empty. In that case the only
1191 * way to wake up the output thread is by injecting an event in the
1192 * queue. This usually happens with subtitle streams.
1193 * See https://bugzilla.gnome.org/show_bug.cgi?id=763711. */
1194 GST_LOG_OBJECT (pad, "Sending splitmuxsink-unblock event");
1195 event = gst_event_new_custom (GST_EVENT_CUSTOM_DOWNSTREAM |
1196 GST_EVENT_TYPE_SERIALIZED,
1197 gst_structure_new ("splitmuxsink-unblock", "timestamp",
1198 G_TYPE_UINT64, splitmux->max_in_running_time, NULL));
1199 gst_pad_send_event (ctx->sinkpad, event);
1202 case SPLITMUX_STATE_START_NEXT_FRAGMENT:
1203 /* A fragment is ending, wait until that's done before continuing */
1204 GST_DEBUG_OBJECT (pad, "Sleeping for fragment restart");
1205 GST_SPLITMUX_WAIT (splitmux);
1206 GST_DEBUG_OBJECT (pad,
1207 "Done sleeping for fragment restart state now %d", splitmux->state);
1213 } while (loop_again);
/* NOTE(review): expected to apply only when this buffer started a new GOP
 * (see the keyframe local) - confirm the guarding condition */
1216 splitmux->queued_gops++;
1217 buf_info->keyframe = TRUE;
1220 /* Now add this buffer to the queue just before returning */
1221 g_queue_push_head (&ctx->queued_bufs, buf_info);
1223 /* Check the buffer will fit in the mq */
1224 check_queue_length (splitmux, ctx);
1226 GST_LOG_OBJECT (pad, "Returning to queue buffer %" GST_PTR_FORMAT
1227 " run ts %" GST_TIME_FORMAT, buf, GST_TIME_ARGS (ctx->in_running_time));
1229 GST_SPLITMUX_UNLOCK (splitmux);
1230 return GST_PAD_PROBE_PASS;
/* Alternate exit: drop the lock and free the buffer record that was never
 * queued; the data is still passed on */
1233 GST_SPLITMUX_UNLOCK (splitmux);
1235 mq_stream_buf_free (buf_info);
1236 return GST_PAD_PROBE_PASS;
/* Request-pad handler for the bin.
 *
 * element: the splitmuxsink instance
 * templ:   the pad template the caller requested a pad from
 * name:    requested pad name, may be NULL
 * caps:    forwarded unchanged to the muxer pad request
 *
 * With the splitmux lock held this makes sure the internal elements exist,
 * picks a muxer pad template ("video" maps to "video_%u", any other name is
 * looked up as-is, with "sink_%d" as a fallback), requests a pad on the
 * muxer, obtains a multiqueue sink/src pad pair and links the multiqueue
 * src pad to the muxer pad. A new MqStreamCtx is then created, probes are
 * installed on both multiqueue pads, and a ghost pad targeting the
 * multiqueue sink pad is activated and added to the element.
 *
 * If the multiqueue pads cannot be obtained or the link fails, the pads
 * requested so far are released again. The return statements are not
 * visible in this listing; presumably the ghost pad on success and NULL on
 * failure - confirm.
 */
1240 gst_splitmux_sink_request_new_pad (GstElement * element,
1241 GstPadTemplate * templ, const gchar * name, const GstCaps * caps)
1243 GstSplitMuxSink *splitmux = (GstSplitMuxSink *) element;
1244 GstPadTemplate *mux_template = NULL;
1246 GstPad *mq_sink, *mq_src;
1248 gboolean is_video = FALSE;
1251 GST_DEBUG_OBJECT (element, "templ:%s, name:%s", templ->name_template, name);
1253 GST_SPLITMUX_LOCK (splitmux);
1254 if (!create_elements (splitmux))
1257 if (templ->name_template) {
1258 if (g_str_equal (templ->name_template, "video")) {
1259 /* FIXME: Look for a pad template with matching caps, rather than by name */
1261 gst_element_class_get_pad_template (GST_ELEMENT_GET_CLASS
1262 (splitmux->muxer), "video_%u");
1267 gst_element_class_get_pad_template (GST_ELEMENT_GET_CLASS
1268 (splitmux->muxer), templ->name_template);
1270 if (mux_template == NULL) {
1271 /* Fallback to find sink pad templates named 'sink_%d' (mpegtsmux) */
1273 gst_element_class_get_pad_template (GST_ELEMENT_GET_CLASS
1274 (splitmux->muxer), "sink_%d");
1278 res = gst_element_request_pad (splitmux->muxer, mux_template, name, caps);
/* Choose the ghost pad name: "video", the muxer pad name when no name was
 * requested, or the requested name */
1283 gname = g_strdup ("video");
1284 else if (name == NULL)
1285 gname = gst_pad_get_name (res);
1287 gname = g_strdup (name);
1289 if (!get_pads_from_mq (splitmux, &mq_sink, &mq_src)) {
1290 gst_element_release_request_pad (splitmux->muxer, res);
1291 gst_object_unref (GST_OBJECT (res));
1295 if (gst_pad_link (mq_src, res) != GST_PAD_LINK_OK) {
1296 gst_element_release_request_pad (splitmux->muxer, res);
1297 gst_object_unref (GST_OBJECT (res));
1298 gst_element_release_request_pad (splitmux->mq, mq_sink);
1299 gst_object_unref (GST_OBJECT (mq_sink));
/* The muxer pad is linked now; res is reused below for the ghost pad */
1303 gst_object_unref (GST_OBJECT (res));
1305 ctx = mq_stream_ctx_new (splitmux);
1306 ctx->srcpad = mq_src;
1307 ctx->sinkpad = mq_sink;
/* Take one context reference per probe; presumably dropped again by the
 * destroy notify passed alongside - confirm */
1309 mq_stream_ctx_ref (ctx);
1310 ctx->src_pad_block_id =
1311 gst_pad_add_probe (mq_src, GST_PAD_PROBE_TYPE_DATA_DOWNSTREAM,
1312 (GstPadProbeCallback) handle_mq_output, ctx, (GDestroyNotify)
1313 _pad_block_destroy_src_notify);
/* A video stream always takes over as the reference stream; otherwise the
 * first context created becomes the reference */
1314 if (is_video && splitmux->reference_ctx != NULL) {
1315 splitmux->reference_ctx->is_reference = FALSE;
1316 splitmux->reference_ctx = NULL;
1318 if (splitmux->reference_ctx == NULL) {
1319 splitmux->reference_ctx = ctx;
1320 ctx->is_reference = TRUE;
/* Expose the multiqueue sink pad through a ghost pad and remember the
 * context on it so release_pad can find it again */
1323 res = gst_ghost_pad_new (gname, mq_sink);
1324 g_object_set_qdata ((GObject *) (res), PAD_CONTEXT, ctx);
1326 mq_stream_ctx_ref (ctx);
1327 ctx->sink_pad_block_id =
1328 gst_pad_add_probe (mq_sink, GST_PAD_PROBE_TYPE_DATA_DOWNSTREAM,
1329 (GstPadProbeCallback) handle_mq_input, ctx, (GDestroyNotify)
1330 _pad_block_destroy_sink_notify);
1332 GST_DEBUG_OBJECT (splitmux, "Request pad %" GST_PTR_FORMAT
1333 " is mq pad %" GST_PTR_FORMAT, res, mq_sink);
1335 splitmux->contexts = g_list_prepend (splitmux->contexts, ctx);
1339 gst_object_unref (mq_sink);
1340 gst_object_unref (mq_src);
1342 gst_pad_set_active (res, TRUE);
1343 gst_element_add_pad (element, res);
1344 GST_SPLITMUX_UNLOCK (splitmux);
/* Failure exit: only the lock is released here */
1348 GST_SPLITMUX_UNLOCK (splitmux);
/* Release-pad handler for the bin.
 *
 * element: the splitmuxsink instance
 * pad:     a ghost pad previously handed out by the request-pad handler
 *
 * The stream context is read back from the PAD_CONTEXT qdata on the pad.
 * With the splitmux lock held, and only if both the muxer and the
 * multiqueue exist, the function resolves the multiqueue sink pad (ghost
 * pad target), the matching multiqueue src pad and the muxer pad linked to
 * it. It then removes the context from the context list, removes both
 * probes, drops a context reference, releases the request pads on the
 * multiqueue and the muxer, and removes the ghost pad from the element.
 * Once no contexts remain the internal elements are reset.
 */
1353 gst_splitmux_sink_release_pad (GstElement * element, GstPad * pad)
1355 GstSplitMuxSink *splitmux = (GstSplitMuxSink *) element;
1356 GstPad *mqsink, *mqsrc, *muxpad;
1358 (MqStreamCtx *) (g_object_get_qdata ((GObject *) (pad), PAD_CONTEXT));
1360 GST_SPLITMUX_LOCK (splitmux);
1362 if (splitmux->muxer == NULL || splitmux->mq == NULL)
1363 goto fail; /* Elements don't exist yet - nothing to release */
1365 GST_INFO_OBJECT (pad, "releasing request pad");
/* Walk the chain ghost pad -> mq sink -> mq src -> muxer pad */
1367 mqsink = gst_ghost_pad_get_target (GST_GHOST_PAD (pad));
1368 mqsrc = mq_sink_to_src (splitmux->mq, mqsink);
1369 muxpad = gst_pad_get_peer (mqsrc);
1371 /* Remove the context from our consideration */
1372 splitmux->contexts = g_list_remove (splitmux->contexts, ctx);
/* Probe ids of 0 are skipped */
1374 if (ctx->sink_pad_block_id)
1375 gst_pad_remove_probe (ctx->sinkpad, ctx->sink_pad_block_id);
1377 if (ctx->src_pad_block_id)
1378 gst_pad_remove_probe (ctx->srcpad, ctx->src_pad_block_id);
1380 /* Can release the context now */
1381 mq_stream_ctx_unref (ctx);
1383 /* Release and free the mq input */
1384 gst_element_release_request_pad (splitmux->mq, mqsink);
1386 /* Release and free the muxer input */
1387 gst_element_release_request_pad (splitmux->muxer, muxpad);
/* Drop the pad references obtained by the three lookups above */
1389 gst_object_unref (mqsink);
1390 gst_object_unref (mqsrc);
1391 gst_object_unref (muxpad);
1393 gst_element_remove_pad (element, pad);
1395 /* Reset the internal elements only after all request pads are released */
1396 if (splitmux->contexts == NULL)
1397 gst_splitmux_reset (splitmux);
1400 GST_SPLITMUX_UNLOCK (splitmux);
/* Create an element from the given factory and add it to the splitmux bin.
 *
 * splitmux: the bin that receives the new element
 * factory:  element factory name passed to gst_element_factory_make
 * name:     instance name, also used in the warning messages
 *
 * A warning is printed if the element cannot be created or cannot be added
 * to the bin; in the latter case the element is unreffed again. Callers in
 * this file treat a NULL result as failure (the return statements are not
 * visible in this listing).
 */
1404 create_element (GstSplitMuxSink * splitmux,
1405 const gchar * factory, const gchar * name)
1407 GstElement *ret = gst_element_factory_make (factory, name);
1409 g_warning ("Failed to create %s - splitmuxsink will not work", name);
1413 if (!gst_bin_add (GST_BIN (splitmux), ret)) {
1414 g_warning ("Could not add %s element - splitmuxsink will not work", name);
1415 gst_object_unref (ret);
/* Make sure the internal multiqueue and muxer exist.
 *
 * splitmux: the splitmuxsink instance
 *
 * The multiqueue is created on first use with its byte and time limits
 * disabled and an initial buffer limit of 5 (kept in mq_max_buffers). The
 * muxer is either the user-provided one, added to the bin, or a default
 * "mp4mux" instance. Callers in this file use the result as a boolean
 * success flag (the return statements are not visible in this listing).
 */
1423 create_elements (GstSplitMuxSink * splitmux)
1425 /* Create internal elements */
1426 if (splitmux->mq == NULL) {
1428 create_element (splitmux, "multiqueue", "multiqueue")) == NULL)
1431 splitmux->mq_max_buffers = 5;
1432 /* No bytes or time limit, we limit buffers manually */
1433 g_object_set (splitmux->mq, "max-size-bytes", 0, "max-size-time",
1434 (guint64) 0, "max-size-buffers", splitmux->mq_max_buffers, NULL);
1437 if (splitmux->muxer == NULL) {
1438 GstElement *provided_muxer = NULL;
/* Take a temporary ref on the provided muxer under the object lock */
1440 GST_OBJECT_LOCK (splitmux);
1441 if (splitmux->provided_muxer != NULL)
1442 provided_muxer = gst_object_ref (splitmux->provided_muxer);
1443 GST_OBJECT_UNLOCK (splitmux);
1445 if (provided_muxer == NULL) {
1446 if ((splitmux->muxer =
1447 create_element (splitmux, "mp4mux", "muxer")) == NULL)
1450 if (!gst_bin_add (GST_BIN (splitmux), provided_muxer)) {
1451 g_warning ("Could not add muxer element - splitmuxsink will not work");
1452 gst_object_unref (provided_muxer);
/* The temporary ref is dropped once the muxer has been added to the bin */
1456 splitmux->muxer = provided_muxer;
1457 gst_object_unref (provided_muxer);
/* Locate the element that takes the output file location.
 *
 * e: the sink element, possibly a bin
 *
 * If e is a bin, its sink elements are iterated looking for a child whose
 * class has a "location" property; the iterator is resynced on
 * GST_ITERATOR_RESYNC and an iterator error is treated as unreachable.
 * What is returned when e is not a bin, and the assignment of the match,
 * are not visible in this listing. create_sink treats a NULL result as
 * "not found".
 */
1467 find_sink (GstElement * e)
1469 GstElement *res = NULL;
1471 gboolean done = FALSE;
1472 GValue data = { 0, };
1474 if (!GST_IS_BIN (e))
1477 iter = gst_bin_iterate_sinks (GST_BIN (e));
1479 switch (gst_iterator_next (iter, &data)) {
1480 case GST_ITERATOR_OK:
1482 GstElement *child = g_value_get_object (&data);
1483 if (g_object_class_find_property (G_OBJECT_GET_CLASS (child),
1484 "location") != NULL) {
/* Clear the value so the next iteration step can reuse it */
1488 g_value_reset (&data);
1491 case GST_ITERATOR_RESYNC:
1492 gst_iterator_resync (iter);
1494 case GST_ITERATOR_DONE:
1497 case GST_ITERATOR_ERROR:
1498 g_assert_not_reached ();
1502 g_value_unset (&data);
1503 gst_iterator_free (iter);
/* Make sure a sink exists and is linked to the muxer.
 *
 * splitmux: the splitmuxsink instance
 *
 * Does nothing to the sink if active_sink is already set. Otherwise either
 * the default sink (DEFAULT_SINK) is created and used as both sink and
 * active_sink, or the user-provided sink is added to the bin, becomes
 * active_sink, and find_sink is used to locate the element inside it that
 * is stored as sink. The muxer is then linked to active_sink. Each failure
 * prints a warning. The change_state handler uses the result as a boolean
 * success flag (the return statements are not visible in this listing).
 */
1509 create_sink (GstSplitMuxSink * splitmux)
1511 GstElement *provided_sink = NULL;
1513 if (splitmux->active_sink == NULL) {
/* Take a temporary ref on the provided sink under the object lock */
1515 GST_OBJECT_LOCK (splitmux);
1516 if (splitmux->provided_sink != NULL)
1517 provided_sink = gst_object_ref (splitmux->provided_sink);
1518 GST_OBJECT_UNLOCK (splitmux);
1520 if (provided_sink == NULL) {
1521 if ((splitmux->sink =
1522 create_element (splitmux, DEFAULT_SINK, "sink")) == NULL)
1524 splitmux->active_sink = splitmux->sink;
1526 if (!gst_bin_add (GST_BIN (splitmux), provided_sink)) {
1527 g_warning ("Could not add sink elements - splitmuxsink will not work");
1528 gst_object_unref (provided_sink);
1532 splitmux->active_sink = provided_sink;
1534 /* The bin holds a ref now, we can drop our tmp ref */
1535 gst_object_unref (provided_sink);
1537 /* Find the sink element */
1538 splitmux->sink = find_sink (splitmux->active_sink);
1539 if (splitmux->sink == NULL) {
1541 ("Could not locate sink element in provided sink - splitmuxsink will not work");
1546 if (!gst_element_link (splitmux->muxer, splitmux->active_sink)) {
1547 g_warning ("Failed to link muxer and sink- splitmuxsink will not work");
1558 #pragma GCC diagnostic ignored "-Wformat-nonliteral"
/* Configure the sink for the next output fragment.
 *
 * splitmux: the splitmuxsink instance
 *
 * After gst_splitmux_sink_ensure_max_files has been applied, the
 * format-location signal is emitted with the current fragment id so a
 * handler can supply the file name. The location property is otherwise
 * used as a printf-style format with the fragment id as its only argument
 * (presumably only when the signal produced no name - the condition is not
 * visible in this listing). The name is set as the sink "location" and the
 * fragment id is incremented.
 */
1561 set_next_filename (GstSplitMuxSink * splitmux)
1563 gchar *fname = NULL;
1564 gst_splitmux_sink_ensure_max_files (splitmux);
1566 g_signal_emit (splitmux, signals[SIGNAL_FORMAT_LOCATION], 0,
1567 splitmux->fragment_id, &fname);
/* A NULL location yields a NULL file name */
1570 fname = splitmux->location ?
1571 g_strdup_printf (splitmux->location, splitmux->fragment_id) : NULL;
1574 GST_INFO_OBJECT (splitmux, "Setting file to %s", fname);
1575 g_object_set (splitmux->sink, "location", fname, NULL);
1578 splitmux->fragment_id++;
/* State change handler for the bin.
 *
 * element:    the splitmuxsink instance
 * transition: the state transition being performed
 *
 * Before chaining up to the parent class:
 *  - NULL to READY creates the internal elements and the sink (failing the
 *    transition if that is not possible), resets the fragment id and sets
 *    the first file name;
 *  - READY to PAUSED resets the state machine and the muxing counters;
 *  - PAUSED to READY and READY to NULL mark the element as stopped and
 *    wake up any threads blocked on the splitmux condition.
 * After chaining up, READY to NULL resets the fragment id and, when no pad
 * contexts remain, the internal elements. A failed NULL to READY
 * transition also resets the internal elements.
 *
 * The return statement is not visible in this listing; presumably ret.
 */
1582 static GstStateChangeReturn
1583 gst_splitmux_sink_change_state (GstElement * element, GstStateChange transition)
1585 GstStateChangeReturn ret;
1586 GstSplitMuxSink *splitmux = (GstSplitMuxSink *) element;
1588 switch (transition) {
1589 case GST_STATE_CHANGE_NULL_TO_READY:{
1590 GST_SPLITMUX_LOCK (splitmux);
1591 if (!create_elements (splitmux) || !create_sink (splitmux)) {
1592 ret = GST_STATE_CHANGE_FAILURE;
1593 GST_SPLITMUX_UNLOCK (splitmux);
1596 GST_SPLITMUX_UNLOCK (splitmux);
/* Start numbering fragments from zero and point the sink at the first file */
1597 splitmux->fragment_id = 0;
1598 set_next_filename (splitmux);
1601 case GST_STATE_CHANGE_READY_TO_PAUSED:{
1602 GST_SPLITMUX_LOCK (splitmux);
1603 /* Start by collecting one input on each pad */
1604 splitmux->state = SPLITMUX_STATE_COLLECTING_GOP_START;
1605 splitmux->max_in_running_time = 0;
1606 splitmux->muxed_out_time = splitmux->mux_start_time = 0;
1607 splitmux->muxed_out_bytes = splitmux->mux_start_bytes = 0;
1608 splitmux->opening_first_fragment = TRUE;
1609 GST_SPLITMUX_UNLOCK (splitmux);
1612 case GST_STATE_CHANGE_PAUSED_TO_READY:
1613 case GST_STATE_CHANGE_READY_TO_NULL:
1614 GST_SPLITMUX_LOCK (splitmux);
1615 splitmux->state = SPLITMUX_STATE_STOPPED;
1616 /* Wake up any blocked threads */
1617 GST_LOG_OBJECT (splitmux,
1618 "State change -> NULL or READY. Waking threads");
1619 GST_SPLITMUX_BROADCAST (splitmux);
1620 GST_SPLITMUX_UNLOCK (splitmux);
/* Let the parent class perform the actual state change */
1626 ret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
1627 if (ret == GST_STATE_CHANGE_FAILURE)
1630 switch (transition) {
1631 case GST_STATE_CHANGE_READY_TO_NULL:
1632 GST_SPLITMUX_LOCK (splitmux);
1633 splitmux->fragment_id = 0;
1634 /* Reset internal elements only if no pad contexts are using them */
1635 if (splitmux->contexts == NULL)
1636 gst_splitmux_reset (splitmux);
1637 GST_SPLITMUX_UNLOCK (splitmux);
1645 if (transition == GST_STATE_CHANGE_NULL_TO_READY &&
1646 ret == GST_STATE_CHANGE_FAILURE) {
1647 /* Cleanup elements on failed transition out of NULL */
1648 gst_splitmux_reset (splitmux);
/* Register the splitmuxsink element with the given plugin.
 *
 * plugin: the plugin that owns the element
 *
 * Initialises the "splitmuxsink" debug category and registers the element
 * type under the name "splitmuxsink" with rank GST_RANK_NONE. Returns the
 * result of gst_element_register.
 */
1654 register_splitmuxsink (GstPlugin * plugin)
1656 GST_DEBUG_CATEGORY_INIT (splitmux_debug, "splitmuxsink", 0,
1657 "Split File Muxing Sink");
1659 return gst_element_register (plugin, "splitmuxsink", GST_RANK_NONE,
1660 GST_TYPE_SPLITMUX_SINK);
1664 gst_splitmux_sink_ensure_max_files (GstSplitMuxSink * splitmux)
1666 if (splitmux->max_files && splitmux->fragment_id >= splitmux->max_files) {
1667 splitmux->fragment_id = 0;