1 /* GStreamer Muxer bin that splits output stream by size/time
2 * Copyright (C) <2014> Jan Schmidt <jan@centricular.com>
4 * This library is free software; you can redistribute it and/or
5 * modify it under the terms of the GNU Library General Public
6 * License as published by the Free Software Foundation; either
7 * version 2 of the License, or (at your option) any later version.
9 * This library is distributed in the hope that it will be useful,
10 * but WITHOUT ANY WARRANTY; without even the implied warranty of
11 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
12 * Library General Public License for more details.
14 * You should have received a copy of the GNU Library General Public
15 * License along with this library; if not, write to the
16 * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
17 * Boston, MA 02110-1301, USA.
21 * SECTION:element-splitmuxsink
22 * @short_description: Muxer wrapper for splitting output stream by size or time
24 * This element wraps a muxer and a sink, and starts a new file when the mux
25 * contents are about to cross a threshold of maximum size of maximum time,
26 * splitting at video keyframe boundaries. Exactly one input video stream
27 * can be muxed, with as many accompanying audio and subtitle streams as
30 * By default, it uses mp4mux and filesink, but they can be changed via
31 * the 'muxer' and 'sink' properties.
33 * The minimum file size is 1 GOP, however - so limits may be overrun if the
34 * distance between any 2 keyframes is larger than the limits.
36 * If a video stream is available, the splitting process is driven by the video
37 * stream contents, and the video stream must contain closed GOPs for the output
38 * file parts to be played individually correctly. In the absence of a video
39 * stream, the first available stream is used as reference for synchronization.
42 * <title>Example pipelines</title>
44 * gst-launch-1.0 -e v4l2src num-buffers=500 ! video/x-raw,width=320,height=240 ! videoconvert ! queue ! timeoverlay ! x264enc key-int-max=10 ! h264parse ! splitmuxsink location=video%02d.mov max-size-time=10000000000 max-size-bytes=1000000
46 * Records a video stream captured from a v4l2 device and muxes it into
47 * ISO mp4 files, splitting as needed to limit size/duration to 10 seconds
48 * and 1MB maximum size.
57 #include <glib/gstdio.h>
58 #include "gstsplitmuxsink.h"
60 GST_DEBUG_CATEGORY_STATIC (splitmux_debug);
61 #define GST_CAT_DEFAULT splitmux_debug
63 #define GST_SPLITMUX_LOCK(s) g_mutex_lock(&(s)->lock)
64 #define GST_SPLITMUX_UNLOCK(s) g_mutex_unlock(&(s)->lock)
65 #define GST_SPLITMUX_WAIT(s) g_cond_wait (&(s)->data_cond, &(s)->lock)
66 #define GST_SPLITMUX_BROADCAST(s) g_cond_broadcast (&(s)->data_cond)
80 #define DEFAULT_MAX_SIZE_TIME 0
81 #define DEFAULT_MAX_SIZE_BYTES 0
82 #define DEFAULT_MAX_FILES 0
83 #define DEFAULT_MUXER_OVERHEAD 0.02
84 #define DEFAULT_MUXER "mp4mux"
85 #define DEFAULT_SINK "filesink"
89 SIGNAL_FORMAT_LOCATION,
93 static guint signals[SIGNAL_LAST];
95 static GstStaticPadTemplate video_sink_template =
96 GST_STATIC_PAD_TEMPLATE ("video",
100 static GstStaticPadTemplate audio_sink_template =
101 GST_STATIC_PAD_TEMPLATE ("audio_%u",
104 GST_STATIC_CAPS_ANY);
105 static GstStaticPadTemplate subtitle_sink_template =
106 GST_STATIC_PAD_TEMPLATE ("subtitle_%u",
109 GST_STATIC_CAPS_ANY);
111 static GQuark PAD_CONTEXT;
116 PAD_CONTEXT = g_quark_from_static_string ("pad-context");
119 #define gst_splitmux_sink_parent_class parent_class
120 G_DEFINE_TYPE_EXTENDED (GstSplitMuxSink, gst_splitmux_sink, GST_TYPE_BIN, 0,
123 static gboolean create_elements (GstSplitMuxSink * splitmux);
124 static gboolean create_sink (GstSplitMuxSink * splitmux);
125 static void gst_splitmux_sink_set_property (GObject * object, guint prop_id,
126 const GValue * value, GParamSpec * pspec);
127 static void gst_splitmux_sink_get_property (GObject * object, guint prop_id,
128 GValue * value, GParamSpec * pspec);
129 static void gst_splitmux_sink_dispose (GObject * object);
130 static void gst_splitmux_sink_finalize (GObject * object);
132 static GstPad *gst_splitmux_sink_request_new_pad (GstElement * element,
133 GstPadTemplate * templ, const gchar * name, const GstCaps * caps);
134 static void gst_splitmux_sink_release_pad (GstElement * element, GstPad * pad);
136 static GstStateChangeReturn gst_splitmux_sink_change_state (GstElement *
137 element, GstStateChange transition);
139 static void bus_handler (GstBin * bin, GstMessage * msg);
140 static void set_next_filename (GstSplitMuxSink * splitmux);
141 static void start_next_fragment (GstSplitMuxSink * splitmux);
142 static void check_queue_length (GstSplitMuxSink * splitmux, MqStreamCtx * ctx);
143 static void mq_stream_ctx_unref (MqStreamCtx * ctx);
145 static void gst_splitmux_sink_ensure_max_files (GstSplitMuxSink * splitmux);
148 mq_stream_buf_new (void)
150 return g_slice_new0 (MqStreamBuf);
154 mq_stream_buf_free (MqStreamBuf * data)
156 g_slice_free (MqStreamBuf, data);
160 gst_splitmux_sink_class_init (GstSplitMuxSinkClass * klass)
162 GObjectClass *gobject_class = (GObjectClass *) klass;
163 GstElementClass *gstelement_class = (GstElementClass *) klass;
164 GstBinClass *gstbin_class = (GstBinClass *) klass;
166 gobject_class->set_property = gst_splitmux_sink_set_property;
167 gobject_class->get_property = gst_splitmux_sink_get_property;
168 gobject_class->dispose = gst_splitmux_sink_dispose;
169 gobject_class->finalize = gst_splitmux_sink_finalize;
171 gst_element_class_set_static_metadata (gstelement_class,
172 "Split Muxing Bin", "Generic/Bin/Muxer",
173 "Convenience bin that muxes incoming streams into multiple time/size limited files",
174 "Jan Schmidt <jan@centricular.com>");
176 gst_element_class_add_static_pad_template (gstelement_class,
177 &video_sink_template);
178 gst_element_class_add_static_pad_template (gstelement_class,
179 &audio_sink_template);
180 gst_element_class_add_static_pad_template (gstelement_class,
181 &subtitle_sink_template);
183 gstelement_class->change_state =
184 GST_DEBUG_FUNCPTR (gst_splitmux_sink_change_state);
185 gstelement_class->request_new_pad =
186 GST_DEBUG_FUNCPTR (gst_splitmux_sink_request_new_pad);
187 gstelement_class->release_pad =
188 GST_DEBUG_FUNCPTR (gst_splitmux_sink_release_pad);
190 gstbin_class->handle_message = bus_handler;
192 g_object_class_install_property (gobject_class, PROP_LOCATION,
193 g_param_spec_string ("location", "File Output Pattern",
194 "Format string pattern for the location of the files to write (e.g. video%05d.mp4)",
195 NULL, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
196 g_object_class_install_property (gobject_class, PROP_MUXER_OVERHEAD,
197 g_param_spec_double ("mux-overhead", "Muxing Overhead",
198 "Extra size overhead of muxing (0.02 = 2%)", 0.0, 1.0,
199 DEFAULT_MUXER_OVERHEAD,
200 G_PARAM_READWRITE | GST_PARAM_CONTROLLABLE | G_PARAM_STATIC_STRINGS));
202 g_object_class_install_property (gobject_class, PROP_MAX_SIZE_TIME,
203 g_param_spec_uint64 ("max-size-time", "Max. size (ns)",
204 "Max. amount of time per file (in ns, 0=disable)", 0, G_MAXUINT64,
205 DEFAULT_MAX_SIZE_TIME, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
206 g_object_class_install_property (gobject_class, PROP_MAX_SIZE_BYTES,
207 g_param_spec_uint64 ("max-size-bytes", "Max. size bytes",
208 "Max. amount of data per file (in bytes, 0=disable)", 0, G_MAXUINT64,
209 DEFAULT_MAX_SIZE_BYTES, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
210 g_object_class_install_property (gobject_class, PROP_MAX_FILES,
211 g_param_spec_uint ("max-files", "Max files",
212 "Maximum number of files to keep on disk. Once the maximum is reached,"
213 "old files start to be deleted to make room for new ones.",
214 0, G_MAXUINT, DEFAULT_MAX_FILES,
215 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
218 g_object_class_install_property (gobject_class, PROP_MUXER,
219 g_param_spec_object ("muxer", "Muxer",
220 "The muxer element to use (NULL = default mp4mux)",
221 GST_TYPE_ELEMENT, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
222 g_object_class_install_property (gobject_class, PROP_SINK,
223 g_param_spec_object ("sink", "Sink",
224 "The sink element (or element chain) to use (NULL = default filesink)",
225 GST_TYPE_ELEMENT, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
228 * GstSplitMuxSink::format-location:
229 * @splitmux: the #GstSplitMuxSink
230 * @fragment_id: the sequence number of the file to be created
232 * Returns: the location to be used for the next output file
234 signals[SIGNAL_FORMAT_LOCATION] =
235 g_signal_new ("format-location", G_TYPE_FROM_CLASS (klass),
236 G_SIGNAL_RUN_LAST, 0, NULL, NULL, NULL, G_TYPE_STRING, 1, G_TYPE_UINT);
240 gst_splitmux_sink_init (GstSplitMuxSink * splitmux)
242 g_mutex_init (&splitmux->lock);
243 g_cond_init (&splitmux->data_cond);
245 splitmux->mux_overhead = DEFAULT_MUXER_OVERHEAD;
246 splitmux->threshold_time = DEFAULT_MAX_SIZE_TIME;
247 splitmux->threshold_bytes = DEFAULT_MAX_SIZE_BYTES;
248 splitmux->max_files = DEFAULT_MAX_FILES;
250 GST_OBJECT_FLAG_SET (splitmux, GST_ELEMENT_FLAG_SINK);
251 g_object_set (splitmux, "async-handling", TRUE, NULL);
255 gst_splitmux_reset (GstSplitMuxSink * splitmux)
258 gst_bin_remove (GST_BIN (splitmux), splitmux->mq);
260 gst_bin_remove (GST_BIN (splitmux), splitmux->muxer);
261 if (splitmux->active_sink)
262 gst_bin_remove (GST_BIN (splitmux), splitmux->active_sink);
264 splitmux->sink = splitmux->active_sink = splitmux->muxer = splitmux->mq =
269 gst_splitmux_sink_dispose (GObject * object)
271 GstSplitMuxSink *splitmux = GST_SPLITMUX_SINK (object);
273 G_OBJECT_CLASS (parent_class)->dispose (object);
275 /* Calling parent dispose invalidates all child pointers */
276 splitmux->sink = splitmux->active_sink = splitmux->muxer = splitmux->mq =
281 gst_splitmux_sink_finalize (GObject * object)
283 GstSplitMuxSink *splitmux = GST_SPLITMUX_SINK (object);
284 g_cond_clear (&splitmux->data_cond);
285 g_mutex_clear (&splitmux->lock);
286 if (splitmux->provided_sink)
287 gst_object_unref (splitmux->provided_sink);
288 if (splitmux->provided_muxer)
289 gst_object_unref (splitmux->provided_muxer);
291 g_free (splitmux->location);
293 /* Make sure to free any un-released contexts */
294 g_list_foreach (splitmux->contexts, (GFunc) mq_stream_ctx_unref, NULL);
295 g_list_free (splitmux->contexts);
297 G_OBJECT_CLASS (parent_class)->finalize (object);
301 gst_splitmux_sink_set_property (GObject * object, guint prop_id,
302 const GValue * value, GParamSpec * pspec)
304 GstSplitMuxSink *splitmux = GST_SPLITMUX_SINK (object);
308 GST_OBJECT_LOCK (splitmux);
309 g_free (splitmux->location);
310 splitmux->location = g_value_dup_string (value);
311 GST_OBJECT_UNLOCK (splitmux);
314 case PROP_MAX_SIZE_BYTES:
315 GST_OBJECT_LOCK (splitmux);
316 splitmux->threshold_bytes = g_value_get_uint64 (value);
317 GST_OBJECT_UNLOCK (splitmux);
319 case PROP_MAX_SIZE_TIME:
320 GST_OBJECT_LOCK (splitmux);
321 splitmux->threshold_time = g_value_get_uint64 (value);
322 GST_OBJECT_UNLOCK (splitmux);
325 GST_OBJECT_LOCK (splitmux);
326 splitmux->max_files = g_value_get_uint (value);
327 GST_OBJECT_UNLOCK (splitmux);
329 case PROP_MUXER_OVERHEAD:
330 GST_OBJECT_LOCK (splitmux);
331 splitmux->mux_overhead = g_value_get_double (value);
332 GST_OBJECT_UNLOCK (splitmux);
335 GST_OBJECT_LOCK (splitmux);
336 if (splitmux->provided_sink)
337 gst_object_unref (splitmux->provided_sink);
338 splitmux->provided_sink = g_value_dup_object (value);
339 GST_OBJECT_UNLOCK (splitmux);
342 GST_OBJECT_LOCK (splitmux);
343 if (splitmux->provided_muxer)
344 gst_object_unref (splitmux->provided_muxer);
345 splitmux->provided_muxer = g_value_dup_object (value);
346 GST_OBJECT_UNLOCK (splitmux);
349 G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
355 gst_splitmux_sink_get_property (GObject * object, guint prop_id,
356 GValue * value, GParamSpec * pspec)
358 GstSplitMuxSink *splitmux = GST_SPLITMUX_SINK (object);
362 GST_OBJECT_LOCK (splitmux);
363 g_value_set_string (value, splitmux->location);
364 GST_OBJECT_UNLOCK (splitmux);
366 case PROP_MAX_SIZE_BYTES:
367 GST_OBJECT_LOCK (splitmux);
368 g_value_set_uint64 (value, splitmux->threshold_bytes);
369 GST_OBJECT_UNLOCK (splitmux);
371 case PROP_MAX_SIZE_TIME:
372 GST_OBJECT_LOCK (splitmux);
373 g_value_set_uint64 (value, splitmux->threshold_time);
374 GST_OBJECT_UNLOCK (splitmux);
377 GST_OBJECT_LOCK (splitmux);
378 g_value_set_uint (value, splitmux->max_files);
379 GST_OBJECT_UNLOCK (splitmux);
381 case PROP_MUXER_OVERHEAD:
382 GST_OBJECT_LOCK (splitmux);
383 g_value_set_double (value, splitmux->mux_overhead);
384 GST_OBJECT_UNLOCK (splitmux);
387 GST_OBJECT_LOCK (splitmux);
388 g_value_set_object (value, splitmux->provided_sink);
389 GST_OBJECT_UNLOCK (splitmux);
392 GST_OBJECT_LOCK (splitmux);
393 g_value_set_object (value, splitmux->provided_muxer);
394 GST_OBJECT_UNLOCK (splitmux);
397 G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
403 mq_sink_to_src (GstElement * mq, GstPad * sink_pad)
405 gchar *tmp, *sinkname, *srcname;
408 sinkname = gst_pad_get_name (sink_pad);
410 srcname = g_strdup_printf ("src_%s", tmp);
412 mq_src = gst_element_get_static_pad (mq, srcname);
421 get_pads_from_mq (GstSplitMuxSink * splitmux, GstPad ** sink_pad,
427 /* Request a pad from multiqueue, then connect this one, then
428 * discover the corresponding output pad and return both */
429 mq_sink = gst_element_get_request_pad (splitmux->mq, "sink_%u");
433 mq_src = mq_sink_to_src (splitmux->mq, mq_sink);
443 gst_element_release_request_pad (splitmux->mq, mq_sink);
448 mq_stream_ctx_new (GstSplitMuxSink * splitmux)
452 ctx = g_new0 (MqStreamCtx, 1);
453 g_atomic_int_set (&ctx->refcount, 1);
454 ctx->splitmux = splitmux;
455 gst_segment_init (&ctx->in_segment, GST_FORMAT_UNDEFINED);
456 gst_segment_init (&ctx->out_segment, GST_FORMAT_UNDEFINED);
457 ctx->in_running_time = ctx->out_running_time = 0;
458 g_queue_init (&ctx->queued_bufs);
463 mq_stream_ctx_free (MqStreamCtx * ctx)
465 g_queue_foreach (&ctx->queued_bufs, (GFunc) mq_stream_buf_free, NULL);
466 g_queue_clear (&ctx->queued_bufs);
471 mq_stream_ctx_unref (MqStreamCtx * ctx)
473 if (g_atomic_int_dec_and_test (&ctx->refcount))
474 mq_stream_ctx_free (ctx);
478 mq_stream_ctx_ref (MqStreamCtx * ctx)
480 g_atomic_int_inc (&ctx->refcount);
484 _pad_block_destroy_sink_notify (MqStreamCtx * ctx)
486 ctx->sink_pad_block_id = 0;
487 mq_stream_ctx_unref (ctx);
491 _pad_block_destroy_src_notify (MqStreamCtx * ctx)
493 ctx->src_pad_block_id = 0;
494 mq_stream_ctx_unref (ctx);
498 send_fragment_opened_closed_msg (GstSplitMuxSink * splitmux, gboolean opened)
500 gchar *location = NULL;
502 const gchar *msg_name = opened ?
503 "splitmuxsink-fragment-opened" : "splitmuxsink-fragment-closed";
505 g_object_get (splitmux->sink, "location", &location, NULL);
507 msg = gst_message_new_element (GST_OBJECT (splitmux),
508 gst_structure_new (msg_name,
509 "location", G_TYPE_STRING, location,
510 "running-time", GST_TYPE_CLOCK_TIME,
511 splitmux->reference_ctx->out_running_time, NULL));
512 gst_element_post_message (GST_ELEMENT_CAST (splitmux), msg);
517 /* Called with lock held, drops the lock to send EOS to the
521 send_eos (GstSplitMuxSink * splitmux, MqStreamCtx * ctx)
526 eos = gst_event_new_eos ();
527 pad = gst_pad_get_peer (ctx->srcpad);
531 GST_INFO_OBJECT (splitmux, "Sending EOS on %" GST_PTR_FORMAT, pad);
532 GST_SPLITMUX_UNLOCK (splitmux);
533 gst_pad_send_event (pad, eos);
534 GST_SPLITMUX_LOCK (splitmux);
536 gst_object_unref (pad);
539 /* Called with splitmux lock held to check if this output
540 * context needs to sleep to wait for the release of the
541 * next GOP, or to send EOS to close out the current file
544 complete_or_wait_on_out (GstSplitMuxSink * splitmux, MqStreamCtx * ctx)
548 GST_LOG_OBJECT (ctx->srcpad,
549 "Checking running time %" GST_TIME_FORMAT " against max %"
550 GST_TIME_FORMAT, GST_TIME_ARGS (ctx->out_running_time),
551 GST_TIME_ARGS (splitmux->max_out_running_time));
553 if (splitmux->max_out_running_time == GST_CLOCK_TIME_NONE ||
554 ctx->out_running_time < splitmux->max_out_running_time) {
555 splitmux->have_muxed_something = TRUE;
559 if (ctx->flushing || splitmux->state == SPLITMUX_STATE_STOPPED)
562 if (splitmux->state == SPLITMUX_STATE_ENDING_FILE) {
563 if (ctx->out_eos == FALSE) {
564 send_eos (splitmux, ctx);
567 } else if (splitmux->state == SPLITMUX_STATE_START_NEXT_FRAGMENT) {
568 start_next_fragment (splitmux);
572 GST_INFO_OBJECT (ctx->srcpad,
573 "Sleeping for running time %"
574 GST_TIME_FORMAT " (max %" GST_TIME_FORMAT ")",
575 GST_TIME_ARGS (ctx->out_running_time),
576 GST_TIME_ARGS (splitmux->max_out_running_time));
577 ctx->out_blocked = TRUE;
578 /* Expand the mq if needed before sleeping */
579 check_queue_length (splitmux, ctx);
580 GST_SPLITMUX_WAIT (splitmux);
581 ctx->out_blocked = FALSE;
582 GST_INFO_OBJECT (ctx->srcpad,
583 "Woken for new max running time %" GST_TIME_FORMAT,
584 GST_TIME_ARGS (splitmux->max_out_running_time));
588 static GstPadProbeReturn
589 handle_mq_output (GstPad * pad, GstPadProbeInfo * info, MqStreamCtx * ctx)
591 GstSplitMuxSink *splitmux = ctx->splitmux;
592 MqStreamBuf *buf_info = NULL;
594 GST_LOG_OBJECT (pad, "Fired probe type 0x%x", info->type);
596 /* FIXME: Handle buffer lists, until then make it clear they won't work */
597 if (info->type & GST_PAD_PROBE_TYPE_BUFFER_LIST) {
598 g_warning ("Buffer list handling not implemented");
599 return GST_PAD_PROBE_DROP;
601 if (info->type & GST_PAD_PROBE_TYPE_EVENT_DOWNSTREAM) {
602 GstEvent *event = gst_pad_probe_info_get_event (info);
604 GST_LOG_OBJECT (pad, "Event %" GST_PTR_FORMAT, event);
606 switch (GST_EVENT_TYPE (event)) {
607 case GST_EVENT_SEGMENT:
608 gst_event_copy_segment (event, &ctx->out_segment);
610 case GST_EVENT_FLUSH_STOP:
611 GST_SPLITMUX_LOCK (splitmux);
612 gst_segment_init (&ctx->out_segment, GST_FORMAT_UNDEFINED);
613 g_queue_foreach (&ctx->queued_bufs, (GFunc) mq_stream_buf_free, NULL);
614 g_queue_clear (&ctx->queued_bufs);
615 ctx->flushing = FALSE;
616 GST_SPLITMUX_UNLOCK (splitmux);
618 case GST_EVENT_FLUSH_START:
619 GST_SPLITMUX_LOCK (splitmux);
620 GST_LOG_OBJECT (pad, "Flush start");
621 ctx->flushing = TRUE;
622 GST_SPLITMUX_BROADCAST (splitmux);
623 GST_SPLITMUX_UNLOCK (splitmux);
626 GST_SPLITMUX_LOCK (splitmux);
627 if (splitmux->state == SPLITMUX_STATE_STOPPED)
630 GST_SPLITMUX_UNLOCK (splitmux);
635 gst_event_parse_gap (event, &gap_ts, NULL);
636 if (gap_ts == GST_CLOCK_TIME_NONE)
639 GST_SPLITMUX_LOCK (splitmux);
641 gap_ts = gst_segment_to_running_time (&ctx->out_segment,
642 GST_FORMAT_TIME, gap_ts);
644 GST_LOG_OBJECT (pad, "Have GAP w/ ts %" GST_TIME_FORMAT,
645 GST_TIME_ARGS (gap_ts));
647 if (splitmux->state == SPLITMUX_STATE_STOPPED)
649 ctx->out_running_time = gap_ts;
650 complete_or_wait_on_out (splitmux, ctx);
651 GST_SPLITMUX_UNLOCK (splitmux);
654 case GST_EVENT_CUSTOM_DOWNSTREAM:{
655 const GstStructure *s;
658 s = gst_event_get_structure (event);
659 if (!gst_structure_has_name (s, "splitmuxsink-unblock"))
662 gst_structure_get_uint64 (s, "timestamp", &ts);
664 GST_SPLITMUX_LOCK (splitmux);
666 if (splitmux->state == SPLITMUX_STATE_STOPPED)
668 ctx->out_running_time = ts;
669 complete_or_wait_on_out (splitmux, ctx);
670 GST_SPLITMUX_UNLOCK (splitmux);
671 return GST_PAD_PROBE_DROP;
676 return GST_PAD_PROBE_PASS;
679 /* Allow everything through until the configured next stopping point */
680 GST_SPLITMUX_LOCK (splitmux);
682 buf_info = g_queue_pop_tail (&ctx->queued_bufs);
683 if (buf_info == NULL)
684 /* Can only happen due to a poorly timed flush */
687 /* If we have popped a keyframe, decrement the queued_gop count */
688 if (buf_info->keyframe && splitmux->queued_gops > 0)
689 splitmux->queued_gops--;
691 ctx->out_running_time = buf_info->run_ts;
693 GST_LOG_OBJECT (splitmux,
694 "Pad %" GST_PTR_FORMAT " buffer with TS %" GST_TIME_FORMAT
695 " size %" G_GSIZE_FORMAT,
696 pad, GST_TIME_ARGS (ctx->out_running_time), buf_info->buf_size);
698 if (splitmux->opening_first_fragment) {
699 send_fragment_opened_closed_msg (splitmux, TRUE);
700 splitmux->opening_first_fragment = FALSE;
703 complete_or_wait_on_out (splitmux, ctx);
705 if (splitmux->muxed_out_time == GST_CLOCK_TIME_NONE ||
706 splitmux->muxed_out_time < buf_info->run_ts)
707 splitmux->muxed_out_time = buf_info->run_ts;
709 splitmux->muxed_out_bytes += buf_info->buf_size;
711 #ifndef GST_DISABLE_GST_DEBUG
713 GstBuffer *buf = gst_pad_probe_info_get_buffer (info);
714 GST_LOG_OBJECT (pad, "Returning to pass buffer %" GST_PTR_FORMAT
715 " run ts %" GST_TIME_FORMAT, buf,
716 GST_TIME_ARGS (ctx->out_running_time));
720 GST_SPLITMUX_UNLOCK (splitmux);
722 mq_stream_buf_free (buf_info);
724 return GST_PAD_PROBE_PASS;
727 GST_SPLITMUX_UNLOCK (splitmux);
728 return GST_PAD_PROBE_DROP;
732 resend_sticky (GstPad * pad, GstEvent ** event, GstPad * peer)
734 return gst_pad_send_event (peer, gst_event_ref (*event));
738 restart_context (MqStreamCtx * ctx, GstSplitMuxSink * splitmux)
740 GstPad *peer = gst_pad_get_peer (ctx->srcpad);
742 gst_pad_sticky_events_foreach (ctx->srcpad,
743 (GstPadStickyEventsForeachFunction) (resend_sticky), peer);
745 /* Clear EOS flag if not actually EOS */
746 ctx->out_eos = GST_PAD_IS_EOS (ctx->srcpad);
748 gst_object_unref (peer);
751 /* Called with lock held when a fragment
752 * reaches EOS and it is time to restart
756 start_next_fragment (GstSplitMuxSink * splitmux)
758 /* 1 change to new file */
759 gst_element_set_locked_state (splitmux->muxer, TRUE);
760 gst_element_set_locked_state (splitmux->active_sink, TRUE);
761 gst_element_set_state (splitmux->muxer, GST_STATE_NULL);
762 gst_element_set_state (splitmux->active_sink, GST_STATE_NULL);
764 set_next_filename (splitmux);
766 gst_element_set_state (splitmux->active_sink, GST_STATE_TARGET (splitmux));
767 gst_element_set_state (splitmux->muxer, GST_STATE_TARGET (splitmux));
768 gst_element_set_locked_state (splitmux->muxer, FALSE);
769 gst_element_set_locked_state (splitmux->active_sink, FALSE);
771 g_list_foreach (splitmux->contexts, (GFunc) restart_context, splitmux);
773 /* Switch state and go back to processing */
774 if (!splitmux->reference_ctx->in_eos) {
775 splitmux->state = SPLITMUX_STATE_COLLECTING_GOP_START;
776 splitmux->max_out_running_time = splitmux->reference_ctx->in_running_time;
778 splitmux->state = SPLITMUX_STATE_WAITING_GOP_COMPLETE;
779 splitmux->max_out_running_time = GST_CLOCK_TIME_NONE;
780 splitmux->have_muxed_something = FALSE;
782 splitmux->have_muxed_something =
783 (splitmux->reference_ctx->in_running_time > splitmux->muxed_out_time);
785 /* Store the overflow parameters as the basis for the next fragment */
786 splitmux->mux_start_time = splitmux->muxed_out_time;
787 splitmux->mux_start_bytes = splitmux->muxed_out_bytes;
789 GST_DEBUG_OBJECT (splitmux,
790 "Restarting flow for new fragment. New running time %" GST_TIME_FORMAT,
791 GST_TIME_ARGS (splitmux->max_out_running_time));
793 send_fragment_opened_closed_msg (splitmux, TRUE);
795 GST_SPLITMUX_BROADCAST (splitmux);
799 bus_handler (GstBin * bin, GstMessage * message)
801 GstSplitMuxSink *splitmux = GST_SPLITMUX_SINK (bin);
803 switch (GST_MESSAGE_TYPE (message)) {
804 case GST_MESSAGE_EOS:
805 /* If the state is draining out the current file, drop this EOS */
806 GST_SPLITMUX_LOCK (splitmux);
808 send_fragment_opened_closed_msg (splitmux, FALSE);
810 if (splitmux->state == SPLITMUX_STATE_ENDING_FILE &&
811 splitmux->max_out_running_time != GST_CLOCK_TIME_NONE) {
812 GST_DEBUG_OBJECT (splitmux, "Caught EOS at end of fragment, dropping");
813 splitmux->state = SPLITMUX_STATE_START_NEXT_FRAGMENT;
814 GST_SPLITMUX_BROADCAST (splitmux);
816 gst_message_unref (message);
817 GST_SPLITMUX_UNLOCK (splitmux);
820 GST_SPLITMUX_UNLOCK (splitmux);
826 GST_BIN_CLASS (parent_class)->handle_message (bin, message);
829 /* Called with splitmux lock held */
830 /* Called when entering ProcessingCompleteGop state
831 * Assess if mq contents overflowed the current file
832 * -> If yes, need to switch to new file
833 * -> if no, set max_out_running_time to let this GOP in and
834 * go to COLLECTING_GOP_START state
837 handle_gathered_gop (GstSplitMuxSink * splitmux)
840 gsize queued_bytes = 0;
841 GstClockTime queued_time = 0;
843 /* Assess if the multiqueue contents overflowed the current file */
844 for (cur = g_list_first (splitmux->contexts);
845 cur != NULL; cur = g_list_next (cur)) {
846 MqStreamCtx *tmpctx = (MqStreamCtx *) (cur->data);
847 if (tmpctx->in_running_time > queued_time)
848 queued_time = tmpctx->in_running_time;
849 queued_bytes += tmpctx->in_bytes;
852 g_assert (queued_bytes >= splitmux->mux_start_bytes);
853 g_assert (queued_time >= splitmux->mux_start_time);
855 queued_bytes -= splitmux->mux_start_bytes;
856 queued_time -= splitmux->mux_start_time;
858 /* Expand queued bytes estimate by muxer overhead */
859 queued_bytes += (queued_bytes * splitmux->mux_overhead);
861 GST_LOG_OBJECT (splitmux, "mq at TS %" GST_TIME_FORMAT
862 " bytes %" G_GSIZE_FORMAT, GST_TIME_ARGS (queued_time), queued_bytes);
864 /* Check for overrun - have we output at least one byte and overrun
865 * either threshold? */
866 if ((splitmux->have_muxed_something &&
867 ((splitmux->threshold_bytes > 0 &&
868 queued_bytes >= splitmux->threshold_bytes) ||
869 (splitmux->threshold_time > 0 &&
870 queued_time >= splitmux->threshold_time)))) {
872 splitmux->state = SPLITMUX_STATE_ENDING_FILE;
874 GST_INFO_OBJECT (splitmux,
875 "mq overflowed since last, draining out. max out TS is %"
876 GST_TIME_FORMAT, GST_TIME_ARGS (splitmux->max_out_running_time));
877 GST_SPLITMUX_BROADCAST (splitmux);
881 GST_LOG_OBJECT (splitmux,
882 "This GOP didn't overflow the fragment. Bytes sent %" G_GSIZE_FORMAT
883 " queued %" G_GSIZE_FORMAT " time %" GST_TIME_FORMAT " Continuing.",
884 splitmux->muxed_out_bytes - splitmux->mux_start_bytes,
885 queued_bytes, GST_TIME_ARGS (queued_time));
887 /* Wake everyone up to push this one GOP, then sleep */
888 splitmux->have_muxed_something = TRUE;
890 if (!splitmux->reference_ctx->in_eos) {
891 splitmux->state = SPLITMUX_STATE_COLLECTING_GOP_START;
892 splitmux->max_out_running_time = splitmux->reference_ctx->in_running_time;
894 splitmux->state = SPLITMUX_STATE_WAITING_GOP_COMPLETE;
895 splitmux->max_out_running_time = GST_CLOCK_TIME_NONE;
898 GST_LOG_OBJECT (splitmux, "Waking output for complete GOP, TS %"
899 GST_TIME_FORMAT, GST_TIME_ARGS (splitmux->max_out_running_time));
900 GST_SPLITMUX_BROADCAST (splitmux);
905 /* Called with splitmux lock held */
906 /* Called from each input pad when it is has all the pieces
907 * for a GOP or EOS, starting with the reference pad which has set the
908 * splitmux->max_in_running_time
911 check_completed_gop (GstSplitMuxSink * splitmux, MqStreamCtx * ctx)
914 gboolean ready = TRUE;
915 GstClockTime current_max_in_running_time;
917 if (splitmux->state == SPLITMUX_STATE_WAITING_GOP_COMPLETE) {
918 /* Iterate each pad, and check that the input running time is at least
919 * up to the reference running time, and if so handle the collected GOP */
920 GST_LOG_OBJECT (splitmux, "Checking GOP collected, Max in running time %"
921 GST_TIME_FORMAT " ctx %p",
922 GST_TIME_ARGS (splitmux->max_in_running_time), ctx);
923 for (cur = g_list_first (splitmux->contexts); cur != NULL;
924 cur = g_list_next (cur)) {
925 MqStreamCtx *tmpctx = (MqStreamCtx *) (cur->data);
927 GST_LOG_OBJECT (splitmux,
928 "Context %p (src pad %" GST_PTR_FORMAT ") TS %" GST_TIME_FORMAT
929 " EOS %d", tmpctx, tmpctx->srcpad,
930 GST_TIME_ARGS (tmpctx->in_running_time), tmpctx->in_eos);
932 if (splitmux->max_in_running_time != GST_CLOCK_TIME_NONE &&
933 tmpctx->in_running_time < splitmux->max_in_running_time &&
935 GST_LOG_OBJECT (splitmux,
936 "Context %p (src pad %" GST_PTR_FORMAT ") not ready. We'll sleep",
937 tmpctx, tmpctx->srcpad);
943 GST_DEBUG_OBJECT (splitmux,
944 "Collected GOP is complete. Processing (ctx %p)", ctx);
945 /* All pads have a complete GOP, release it into the multiqueue */
946 handle_gathered_gop (splitmux);
950 /* If upstream reached EOS we are not expecting more data, no need to wait
955 /* Some pad is not yet ready, or GOP is being pushed
956 * either way, sleep and wait to get woken */
957 current_max_in_running_time = splitmux->max_in_running_time;
958 while ((splitmux->state == SPLITMUX_STATE_WAITING_GOP_COMPLETE ||
959 splitmux->state == SPLITMUX_STATE_START_NEXT_FRAGMENT) &&
961 (current_max_in_running_time == splitmux->max_in_running_time)) {
963 GST_LOG_OBJECT (splitmux, "Sleeping for %s (ctx %p)",
964 splitmux->state == SPLITMUX_STATE_WAITING_GOP_COMPLETE ?
965 "GOP complete" : "EOF draining", ctx);
966 GST_SPLITMUX_WAIT (splitmux);
968 GST_LOG_OBJECT (splitmux, "Done waiting for complete GOP (ctx %p)", ctx);
973 check_queue_length (GstSplitMuxSink * splitmux, MqStreamCtx * ctx)
976 guint cur_len = g_queue_get_length (&ctx->queued_bufs);
978 GST_DEBUG_OBJECT (ctx->sinkpad,
979 "Checking queue length len %u cur_max %u queued gops %u",
980 cur_len, splitmux->mq_max_buffers, splitmux->queued_gops);
982 if (cur_len >= splitmux->mq_max_buffers) {
983 gboolean allow_grow = FALSE;
985 /* If collecting a GOP and this pad might block,
986 * and there isn't already a pending GOP in the queue
989 if (splitmux->state == SPLITMUX_STATE_WAITING_GOP_COMPLETE &&
990 ctx->in_running_time < splitmux->max_in_running_time &&
991 splitmux->queued_gops <= 1) {
993 } else if (splitmux->state == SPLITMUX_STATE_COLLECTING_GOP_START &&
994 ctx->is_reference && splitmux->queued_gops <= 1) {
999 for (cur = g_list_first (splitmux->contexts);
1000 cur != NULL; cur = g_list_next (cur)) {
1001 MqStreamCtx *tmpctx = (MqStreamCtx *) (cur->data);
1002 GST_DEBUG_OBJECT (tmpctx->sinkpad,
1003 " len %u out_blocked %d",
1004 g_queue_get_length (&tmpctx->queued_bufs), tmpctx->out_blocked);
1005 /* If another stream is starving, grow */
1006 if (tmpctx != ctx && g_queue_get_length (&tmpctx->queued_bufs) < 1) {
1013 splitmux->mq_max_buffers = cur_len + 1;
1015 GST_INFO_OBJECT (splitmux,
1016 "Multiqueue overrun - enlarging to %u buffers ctx %p",
1017 splitmux->mq_max_buffers, ctx);
1019 g_object_set (splitmux->mq, "max-size-buffers",
1020 splitmux->mq_max_buffers, NULL);
1025 static GstPadProbeReturn
1026 handle_mq_input (GstPad * pad, GstPadProbeInfo * info, MqStreamCtx * ctx)
1028 GstSplitMuxSink *splitmux = ctx->splitmux;
1030 MqStreamBuf *buf_info = NULL;
1032 gboolean loop_again;
1033 gboolean keyframe = FALSE;
1035 GST_LOG_OBJECT (pad, "Fired probe type 0x%x", info->type);
1037 /* FIXME: Handle buffer lists, until then make it clear they won't work */
1038 if (info->type & GST_PAD_PROBE_TYPE_BUFFER_LIST) {
1039 g_warning ("Buffer list handling not implemented");
1040 return GST_PAD_PROBE_DROP;
1042 if (info->type & GST_PAD_PROBE_TYPE_EVENT_DOWNSTREAM) {
1043 GstEvent *event = gst_pad_probe_info_get_event (info);
1044 switch (GST_EVENT_TYPE (event)) {
1045 case GST_EVENT_SEGMENT:
1046 gst_event_copy_segment (event, &ctx->in_segment);
1048 case GST_EVENT_FLUSH_STOP:
1049 GST_SPLITMUX_LOCK (splitmux);
1050 gst_segment_init (&ctx->in_segment, GST_FORMAT_UNDEFINED);
1051 ctx->in_eos = FALSE;
1053 ctx->in_running_time = 0;
1054 GST_SPLITMUX_UNLOCK (splitmux);
1057 GST_SPLITMUX_LOCK (splitmux);
1060 if (splitmux->state == SPLITMUX_STATE_STOPPED)
1063 if (ctx->is_reference) {
1064 GST_INFO_OBJECT (splitmux, "Got Reference EOS. Finishing up");
1065 /* Act as if this is a new keyframe with infinite timestamp */
1066 splitmux->max_in_running_time = GST_CLOCK_TIME_NONE;
1067 splitmux->state = SPLITMUX_STATE_WAITING_GOP_COMPLETE;
1068 /* Wake up other input pads to collect this GOP */
1069 GST_SPLITMUX_BROADCAST (splitmux);
1070 check_completed_gop (splitmux, ctx);
1071 } else if (splitmux->state == SPLITMUX_STATE_WAITING_GOP_COMPLETE) {
1072 /* If we are waiting for a GOP to be completed (ie, for aux
1073 * pads to catch up), then this pad is complete, so check
1074 * if the whole GOP is.
1076 check_completed_gop (splitmux, ctx);
1078 GST_SPLITMUX_UNLOCK (splitmux);
1083 return GST_PAD_PROBE_PASS;
1086 buf = gst_pad_probe_info_get_buffer (info);
1087 buf_info = mq_stream_buf_new ();
1089 if (GST_BUFFER_PTS_IS_VALID (buf))
1090 ts = GST_BUFFER_PTS (buf);
1092 ts = GST_BUFFER_DTS (buf);
1094 GST_SPLITMUX_LOCK (splitmux);
1096 if (splitmux->state == SPLITMUX_STATE_STOPPED)
1099 /* If this buffer has a timestamp, advance the input timestamp of the
1101 if (GST_CLOCK_TIME_IS_VALID (ts)) {
1102 GstClockTime running_time =
1103 gst_segment_to_running_time (&ctx->in_segment, GST_FORMAT_TIME,
1104 GST_BUFFER_TIMESTAMP (buf));
1106 if (GST_CLOCK_TIME_IS_VALID (running_time) &&
1107 (ctx->in_running_time == GST_CLOCK_TIME_NONE
1108 || running_time > ctx->in_running_time))
1109 ctx->in_running_time = running_time;
1112 /* Try to make sure we have a valid running time */
1113 if (!GST_CLOCK_TIME_IS_VALID (ctx->in_running_time)) {
1114 ctx->in_running_time =
1115 gst_segment_to_running_time (&ctx->in_segment, GST_FORMAT_TIME,
1116 ctx->in_segment.start);
1119 buf_info->run_ts = ctx->in_running_time;
1120 buf_info->buf_size = gst_buffer_get_size (buf);
1122 /* Update total input byte counter for overflow detect */
1123 ctx->in_bytes += buf_info->buf_size;
1125 /* initialize mux_start_time */
1126 if (ctx->is_reference && splitmux->mux_start_time == 0)
1127 splitmux->mux_start_time = buf_info->run_ts;
1129 GST_DEBUG_OBJECT (pad, "Buf TS %" GST_TIME_FORMAT
1130 " total in_bytes %" G_GSIZE_FORMAT,
1131 GST_TIME_ARGS (buf_info->run_ts), ctx->in_bytes);
1138 switch (splitmux->state) {
1139 case SPLITMUX_STATE_COLLECTING_GOP_START:
1140 if (ctx->is_reference) {
1141 /* If a keyframe, we have a complete GOP */
1142 if (GST_BUFFER_FLAG_IS_SET (buf, GST_BUFFER_FLAG_DELTA_UNIT) ||
1143 !GST_CLOCK_TIME_IS_VALID (ctx->in_running_time) ||
1144 splitmux->max_in_running_time >= ctx->in_running_time) {
1145 /* Pass this buffer through */
1149 GST_INFO_OBJECT (pad,
1150 "Have keyframe with running time %" GST_TIME_FORMAT,
1151 GST_TIME_ARGS (ctx->in_running_time));
1153 splitmux->state = SPLITMUX_STATE_WAITING_GOP_COMPLETE;
1154 splitmux->max_in_running_time = ctx->in_running_time;
1155 /* Wake up other input pads to collect this GOP */
1156 GST_SPLITMUX_BROADCAST (splitmux);
1157 check_completed_gop (splitmux, ctx);
1159 /* We're still waiting for a keyframe on the reference pad, sleep */
1160 GST_LOG_OBJECT (pad, "Sleeping for GOP start");
1161 GST_SPLITMUX_WAIT (splitmux);
1162 GST_LOG_OBJECT (pad, "Done sleeping for GOP start state now %d",
1166 case SPLITMUX_STATE_WAITING_GOP_COMPLETE:
1167 /* After a GOP start is found, this buffer might complete the GOP */
1168 /* If we overran the target timestamp, it might be time to process
1169 * the GOP, otherwise bail out for more data
1171 GST_LOG_OBJECT (pad,
1172 "Checking TS %" GST_TIME_FORMAT " against max %" GST_TIME_FORMAT,
1173 GST_TIME_ARGS (ctx->in_running_time),
1174 GST_TIME_ARGS (splitmux->max_in_running_time));
1176 if (ctx->in_running_time < splitmux->max_in_running_time) {
1181 GST_LOG_OBJECT (pad,
1182 "Collected last packet of GOP. Checking other pads");
1183 check_completed_gop (splitmux, ctx);
1185 case SPLITMUX_STATE_ENDING_FILE:{
1188 /* If somes streams received no buffer during the last GOP that overran,
1189 * because its next buffer has a timestamp bigger than
1190 * ctx->max_in_running_time, its queue is empty. In that case the only
1191 * way to wakeup the output thread is by injecting an event in the
1192 * queue. This usually happen with subtitle streams.
1193 * See https://bugzilla.gnome.org/show_bug.cgi?id=763711. */
1194 GST_LOG_OBJECT (pad, "Sending splitmuxsink-unblock event");
1195 event = gst_event_new_custom (GST_EVENT_CUSTOM_DOWNSTREAM |
1196 GST_EVENT_TYPE_SERIALIZED,
1197 gst_structure_new ("splitmuxsink-unblock", "timestamp",
1198 G_TYPE_UINT64, splitmux->max_in_running_time, NULL));
1200 GST_SPLITMUX_UNLOCK (splitmux);
1201 gst_pad_send_event (ctx->sinkpad, event);
1202 GST_SPLITMUX_LOCK (splitmux);
1205 case SPLITMUX_STATE_START_NEXT_FRAGMENT:
1206 /* A fragment is ending, wait until that's done before continuing */
1207 GST_DEBUG_OBJECT (pad, "Sleeping for fragment restart");
1208 GST_SPLITMUX_WAIT (splitmux);
1209 GST_DEBUG_OBJECT (pad,
1210 "Done sleeping for fragment restart state now %d", splitmux->state);
1216 } while (loop_again);
1219 splitmux->queued_gops++;
1220 buf_info->keyframe = TRUE;
1223 /* Now add this buffer to the queue just before returning */
1224 g_queue_push_head (&ctx->queued_bufs, buf_info);
1226 /* Check the buffer will fit in the mq */
1227 check_queue_length (splitmux, ctx);
1229 GST_LOG_OBJECT (pad, "Returning to queue buffer %" GST_PTR_FORMAT
1230 " run ts %" GST_TIME_FORMAT, buf, GST_TIME_ARGS (ctx->in_running_time));
1232 GST_SPLITMUX_UNLOCK (splitmux);
1233 return GST_PAD_PROBE_PASS;
1236 GST_SPLITMUX_UNLOCK (splitmux);
1238 mq_stream_buf_free (buf_info);
1239 return GST_PAD_PROBE_PASS;
1243 gst_splitmux_sink_request_new_pad (GstElement * element,
1244 GstPadTemplate * templ, const gchar * name, const GstCaps * caps)
1246 GstSplitMuxSink *splitmux = (GstSplitMuxSink *) element;
1247 GstPadTemplate *mux_template = NULL;
1249 GstPad *mq_sink, *mq_src;
1251 gboolean is_video = FALSE;
1254 GST_DEBUG_OBJECT (element, "templ:%s, name:%s", templ->name_template, name);
1256 GST_SPLITMUX_LOCK (splitmux);
1257 if (!create_elements (splitmux))
1260 if (templ->name_template) {
1261 if (g_str_equal (templ->name_template, "video")) {
1262 /* FIXME: Look for a pad template with matching caps, rather than by name */
1264 gst_element_class_get_pad_template (GST_ELEMENT_GET_CLASS
1265 (splitmux->muxer), "video_%u");
1270 gst_element_class_get_pad_template (GST_ELEMENT_GET_CLASS
1271 (splitmux->muxer), templ->name_template);
1273 if (mux_template == NULL) {
1274 /* Fallback to find sink pad templates named 'sink_%d' (mpegtsmux) */
1276 gst_element_class_get_pad_template (GST_ELEMENT_GET_CLASS
1277 (splitmux->muxer), "sink_%d");
1281 res = gst_element_request_pad (splitmux->muxer, mux_template, name, caps);
1286 gname = g_strdup ("video");
1287 else if (name == NULL)
1288 gname = gst_pad_get_name (res);
1290 gname = g_strdup (name);
1292 if (!get_pads_from_mq (splitmux, &mq_sink, &mq_src)) {
1293 gst_element_release_request_pad (splitmux->muxer, res);
1294 gst_object_unref (GST_OBJECT (res));
1298 if (gst_pad_link (mq_src, res) != GST_PAD_LINK_OK) {
1299 gst_element_release_request_pad (splitmux->muxer, res);
1300 gst_object_unref (GST_OBJECT (res));
1301 gst_element_release_request_pad (splitmux->mq, mq_sink);
1302 gst_object_unref (GST_OBJECT (mq_sink));
1306 gst_object_unref (GST_OBJECT (res));
1308 ctx = mq_stream_ctx_new (splitmux);
1309 ctx->srcpad = mq_src;
1310 ctx->sinkpad = mq_sink;
1312 mq_stream_ctx_ref (ctx);
1313 ctx->src_pad_block_id =
1314 gst_pad_add_probe (mq_src, GST_PAD_PROBE_TYPE_DATA_DOWNSTREAM,
1315 (GstPadProbeCallback) handle_mq_output, ctx, (GDestroyNotify)
1316 _pad_block_destroy_src_notify);
1317 if (is_video && splitmux->reference_ctx != NULL) {
1318 splitmux->reference_ctx->is_reference = FALSE;
1319 splitmux->reference_ctx = NULL;
1321 if (splitmux->reference_ctx == NULL) {
1322 splitmux->reference_ctx = ctx;
1323 ctx->is_reference = TRUE;
1326 res = gst_ghost_pad_new (gname, mq_sink);
1327 g_object_set_qdata ((GObject *) (res), PAD_CONTEXT, ctx);
1329 mq_stream_ctx_ref (ctx);
1330 ctx->sink_pad_block_id =
1331 gst_pad_add_probe (mq_sink, GST_PAD_PROBE_TYPE_DATA_DOWNSTREAM,
1332 (GstPadProbeCallback) handle_mq_input, ctx, (GDestroyNotify)
1333 _pad_block_destroy_sink_notify);
1335 GST_DEBUG_OBJECT (splitmux, "Request pad %" GST_PTR_FORMAT
1336 " is mq pad %" GST_PTR_FORMAT, res, mq_sink);
1338 splitmux->contexts = g_list_prepend (splitmux->contexts, ctx);
1342 gst_object_unref (mq_sink);
1343 gst_object_unref (mq_src);
1345 gst_pad_set_active (res, TRUE);
1346 gst_element_add_pad (element, res);
1347 GST_SPLITMUX_UNLOCK (splitmux);
1351 GST_SPLITMUX_UNLOCK (splitmux);
1356 gst_splitmux_sink_release_pad (GstElement * element, GstPad * pad)
1358 GstSplitMuxSink *splitmux = (GstSplitMuxSink *) element;
1359 GstPad *mqsink, *mqsrc, *muxpad;
1361 (MqStreamCtx *) (g_object_get_qdata ((GObject *) (pad), PAD_CONTEXT));
1363 GST_SPLITMUX_LOCK (splitmux);
1365 if (splitmux->muxer == NULL || splitmux->mq == NULL)
1366 goto fail; /* Elements don't exist yet - nothing to release */
1368 GST_INFO_OBJECT (pad, "releasing request pad");
1370 mqsink = gst_ghost_pad_get_target (GST_GHOST_PAD (pad));
1371 mqsrc = mq_sink_to_src (splitmux->mq, mqsink);
1372 muxpad = gst_pad_get_peer (mqsrc);
1374 /* Remove the context from our consideration */
1375 splitmux->contexts = g_list_remove (splitmux->contexts, ctx);
1377 if (ctx->sink_pad_block_id)
1378 gst_pad_remove_probe (ctx->sinkpad, ctx->sink_pad_block_id);
1380 if (ctx->src_pad_block_id)
1381 gst_pad_remove_probe (ctx->srcpad, ctx->src_pad_block_id);
1383 /* Can release the context now */
1384 mq_stream_ctx_unref (ctx);
1386 /* Release and free the mq input */
1387 gst_element_release_request_pad (splitmux->mq, mqsink);
1389 /* Release and free the muxer input */
1390 gst_element_release_request_pad (splitmux->muxer, muxpad);
1392 gst_object_unref (mqsink);
1393 gst_object_unref (mqsrc);
1394 gst_object_unref (muxpad);
1396 gst_element_remove_pad (element, pad);
1398 /* Reset the internal elements only after all request pads are released */
1399 if (splitmux->contexts == NULL)
1400 gst_splitmux_reset (splitmux);
1403 GST_SPLITMUX_UNLOCK (splitmux);
1407 create_element (GstSplitMuxSink * splitmux,
1408 const gchar * factory, const gchar * name)
1410 GstElement *ret = gst_element_factory_make (factory, name);
1412 g_warning ("Failed to create %s - splitmuxsink will not work", name);
1416 if (!gst_bin_add (GST_BIN (splitmux), ret)) {
1417 g_warning ("Could not add %s element - splitmuxsink will not work", name);
1418 gst_object_unref (ret);
1426 create_elements (GstSplitMuxSink * splitmux)
1428 /* Create internal elements */
1429 if (splitmux->mq == NULL) {
1431 create_element (splitmux, "multiqueue", "multiqueue")) == NULL)
1434 splitmux->mq_max_buffers = 5;
1435 /* No bytes or time limit, we limit buffers manually */
1436 g_object_set (splitmux->mq, "max-size-bytes", 0, "max-size-time",
1437 (guint64) 0, "max-size-buffers", splitmux->mq_max_buffers, NULL);
1440 if (splitmux->muxer == NULL) {
1441 GstElement *provided_muxer = NULL;
1443 GST_OBJECT_LOCK (splitmux);
1444 if (splitmux->provided_muxer != NULL)
1445 provided_muxer = gst_object_ref (splitmux->provided_muxer);
1446 GST_OBJECT_UNLOCK (splitmux);
1448 if (provided_muxer == NULL) {
1449 if ((splitmux->muxer =
1450 create_element (splitmux, "mp4mux", "muxer")) == NULL)
1453 if (!gst_bin_add (GST_BIN (splitmux), provided_muxer)) {
1454 g_warning ("Could not add muxer element - splitmuxsink will not work");
1455 gst_object_unref (provided_muxer);
1459 splitmux->muxer = provided_muxer;
1460 gst_object_unref (provided_muxer);
1470 find_sink (GstElement * e)
1472 GstElement *res = NULL;
1474 gboolean done = FALSE;
1475 GValue data = { 0, };
1477 if (!GST_IS_BIN (e))
1480 iter = gst_bin_iterate_sinks (GST_BIN (e));
1482 switch (gst_iterator_next (iter, &data)) {
1483 case GST_ITERATOR_OK:
1485 GstElement *child = g_value_get_object (&data);
1486 if (g_object_class_find_property (G_OBJECT_GET_CLASS (child),
1487 "location") != NULL) {
1491 g_value_reset (&data);
1494 case GST_ITERATOR_RESYNC:
1495 gst_iterator_resync (iter);
1497 case GST_ITERATOR_DONE:
1500 case GST_ITERATOR_ERROR:
1501 g_assert_not_reached ();
1505 g_value_unset (&data);
1506 gst_iterator_free (iter);
1512 create_sink (GstSplitMuxSink * splitmux)
1514 GstElement *provided_sink = NULL;
1516 if (splitmux->active_sink == NULL) {
1518 GST_OBJECT_LOCK (splitmux);
1519 if (splitmux->provided_sink != NULL)
1520 provided_sink = gst_object_ref (splitmux->provided_sink);
1521 GST_OBJECT_UNLOCK (splitmux);
1523 if (provided_sink == NULL) {
1524 if ((splitmux->sink =
1525 create_element (splitmux, DEFAULT_SINK, "sink")) == NULL)
1527 splitmux->active_sink = splitmux->sink;
1529 if (!gst_bin_add (GST_BIN (splitmux), provided_sink)) {
1530 g_warning ("Could not add sink elements - splitmuxsink will not work");
1531 gst_object_unref (provided_sink);
1535 splitmux->active_sink = provided_sink;
1537 /* The bin holds a ref now, we can drop our tmp ref */
1538 gst_object_unref (provided_sink);
1540 /* Find the sink element */
1541 splitmux->sink = find_sink (splitmux->active_sink);
1542 if (splitmux->sink == NULL) {
1544 ("Could not locate sink element in provided sink - splitmuxsink will not work");
1549 if (!gst_element_link (splitmux->muxer, splitmux->active_sink)) {
1550 g_warning ("Failed to link muxer and sink- splitmuxsink will not work");
1561 #pragma GCC diagnostic ignored "-Wformat-nonliteral"
1564 set_next_filename (GstSplitMuxSink * splitmux)
1566 gchar *fname = NULL;
1567 gst_splitmux_sink_ensure_max_files (splitmux);
1569 g_signal_emit (splitmux, signals[SIGNAL_FORMAT_LOCATION], 0,
1570 splitmux->fragment_id, &fname);
1573 fname = splitmux->location ?
1574 g_strdup_printf (splitmux->location, splitmux->fragment_id) : NULL;
1577 GST_INFO_OBJECT (splitmux, "Setting file to %s", fname);
1578 g_object_set (splitmux->sink, "location", fname, NULL);
1581 splitmux->fragment_id++;
1585 static GstStateChangeReturn
1586 gst_splitmux_sink_change_state (GstElement * element, GstStateChange transition)
1588 GstStateChangeReturn ret;
1589 GstSplitMuxSink *splitmux = (GstSplitMuxSink *) element;
1591 switch (transition) {
1592 case GST_STATE_CHANGE_NULL_TO_READY:{
1593 GST_SPLITMUX_LOCK (splitmux);
1594 if (!create_elements (splitmux) || !create_sink (splitmux)) {
1595 ret = GST_STATE_CHANGE_FAILURE;
1596 GST_SPLITMUX_UNLOCK (splitmux);
1599 GST_SPLITMUX_UNLOCK (splitmux);
1600 splitmux->fragment_id = 0;
1601 set_next_filename (splitmux);
1604 case GST_STATE_CHANGE_READY_TO_PAUSED:{
1605 GST_SPLITMUX_LOCK (splitmux);
1606 /* Start by collecting one input on each pad */
1607 splitmux->state = SPLITMUX_STATE_COLLECTING_GOP_START;
1608 splitmux->max_in_running_time = 0;
1609 splitmux->muxed_out_time = splitmux->mux_start_time = 0;
1610 splitmux->muxed_out_bytes = splitmux->mux_start_bytes = 0;
1611 splitmux->opening_first_fragment = TRUE;
1612 GST_SPLITMUX_UNLOCK (splitmux);
1615 case GST_STATE_CHANGE_PAUSED_TO_READY:
1616 case GST_STATE_CHANGE_READY_TO_NULL:
1617 GST_SPLITMUX_LOCK (splitmux);
1618 splitmux->state = SPLITMUX_STATE_STOPPED;
1619 /* Wake up any blocked threads */
1620 GST_LOG_OBJECT (splitmux,
1621 "State change -> NULL or READY. Waking threads");
1622 GST_SPLITMUX_BROADCAST (splitmux);
1623 GST_SPLITMUX_UNLOCK (splitmux);
1629 ret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
1630 if (ret == GST_STATE_CHANGE_FAILURE)
1633 switch (transition) {
1634 case GST_STATE_CHANGE_READY_TO_NULL:
1635 GST_SPLITMUX_LOCK (splitmux);
1636 splitmux->fragment_id = 0;
1637 /* Reset internal elements only if no pad contexts are using them */
1638 if (splitmux->contexts == NULL)
1639 gst_splitmux_reset (splitmux);
1640 GST_SPLITMUX_UNLOCK (splitmux);
1648 if (transition == GST_STATE_CHANGE_NULL_TO_READY &&
1649 ret == GST_STATE_CHANGE_FAILURE) {
1650 /* Cleanup elements on failed transition out of NULL */
1651 gst_splitmux_reset (splitmux);
1657 register_splitmuxsink (GstPlugin * plugin)
1659 GST_DEBUG_CATEGORY_INIT (splitmux_debug, "splitmuxsink", 0,
1660 "Split File Muxing Sink");
1662 return gst_element_register (plugin, "splitmuxsink", GST_RANK_NONE,
1663 GST_TYPE_SPLITMUX_SINK);
1667 gst_splitmux_sink_ensure_max_files (GstSplitMuxSink * splitmux)
1669 if (splitmux->max_files && splitmux->fragment_id >= splitmux->max_files) {
1670 splitmux->fragment_id = 0;