1 /* GStreamer Muxer bin that splits output stream by size/time
2 * Copyright (C) <2014> Jan Schmidt <jan@centricular.com>
4 * This library is free software; you can redistribute it and/or
5 * modify it under the terms of the GNU Library General Public
6 * License as published by the Free Software Foundation; either
7 * version 2 of the License, or (at your option) any later version.
9 * This library is distributed in the hope that it will be useful,
10 * but WITHOUT ANY WARRANTY; without even the implied warranty of
11 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
12 * Library General Public License for more details.
14 * You should have received a copy of the GNU Library General Public
15 * License along with this library; if not, write to the
16 * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
17 * Boston, MA 02110-1301, USA.
21 * SECTION:element-splitmuxsink
22 * @short_description: Muxer wrapper for splitting output stream by size or time
24 * This element wraps a muxer and a sink, and starts a new file when the mux
25 * contents are about to cross a threshold of maximum size of maximum time,
26 * splitting at video keyframe boundaries. Exactly one input video stream
27 * can be muxed, with as many accompanying audio and subtitle streams as
30 * By default, it uses mp4mux and filesink, but they can be changed via
31 * the 'muxer' and 'sink' properties.
33 * The minimum file size is 1 GOP, however - so limits may be overrun if the
34 * distance between any 2 keyframes is larger than the limits.
36 * If a video stream is available, the splitting process is driven by the video
37 * stream contents, and the video stream must contain closed GOPs for the output
38 * file parts to be played individually correctly. In the absence of a video
39 * stream, the first available stream is used as reference for synchronization.
42 * <title>Example pipelines</title>
44 * gst-launch-1.0 -e v4l2src num-buffers=500 ! video/x-raw,width=320,height=240 ! videoconvert ! queue ! timeoverlay ! x264enc key-int-max=10 ! h264parse ! splitmuxsink location=video%02d.mov max-size-time=10000000000 max-size-bytes=1000000
46 * Records a video stream captured from a v4l2 device and muxes it into
47 * ISO mp4 files, splitting as needed to limit size/duration to 10 seconds
48 * and 1MB maximum size.
57 #include <glib/gstdio.h>
58 #include "gstsplitmuxsink.h"
60 GST_DEBUG_CATEGORY_STATIC (splitmux_debug);
61 #define GST_CAT_DEFAULT splitmux_debug
63 #define GST_SPLITMUX_LOCK(s) g_mutex_lock(&(s)->lock)
64 #define GST_SPLITMUX_UNLOCK(s) g_mutex_unlock(&(s)->lock)
65 #define GST_SPLITMUX_WAIT(s) g_cond_wait (&(s)->data_cond, &(s)->lock)
66 #define GST_SPLITMUX_BROADCAST(s) g_cond_broadcast (&(s)->data_cond)
80 #define DEFAULT_MAX_SIZE_TIME 0
81 #define DEFAULT_MAX_SIZE_BYTES 0
82 #define DEFAULT_MAX_FILES 0
83 #define DEFAULT_MUXER_OVERHEAD 0.02
84 #define DEFAULT_MUXER "mp4mux"
85 #define DEFAULT_SINK "filesink"
89 SIGNAL_FORMAT_LOCATION,
93 static guint signals[SIGNAL_LAST];
95 static GstStaticPadTemplate video_sink_template =
96 GST_STATIC_PAD_TEMPLATE ("video",
100 static GstStaticPadTemplate audio_sink_template =
101 GST_STATIC_PAD_TEMPLATE ("audio_%u",
104 GST_STATIC_CAPS_ANY);
105 static GstStaticPadTemplate subtitle_sink_template =
106 GST_STATIC_PAD_TEMPLATE ("subtitle_%u",
109 GST_STATIC_CAPS_ANY);
111 static GQuark PAD_CONTEXT;
116 PAD_CONTEXT = g_quark_from_static_string ("pad-context");
119 #define gst_splitmux_sink_parent_class parent_class
120 G_DEFINE_TYPE_EXTENDED (GstSplitMuxSink, gst_splitmux_sink, GST_TYPE_BIN, 0,
123 static gboolean create_elements (GstSplitMuxSink * splitmux);
124 static gboolean create_sink (GstSplitMuxSink * splitmux);
125 static void gst_splitmux_sink_set_property (GObject * object, guint prop_id,
126 const GValue * value, GParamSpec * pspec);
127 static void gst_splitmux_sink_get_property (GObject * object, guint prop_id,
128 GValue * value, GParamSpec * pspec);
129 static void gst_splitmux_sink_dispose (GObject * object);
130 static void gst_splitmux_sink_finalize (GObject * object);
132 static GstPad *gst_splitmux_sink_request_new_pad (GstElement * element,
133 GstPadTemplate * templ, const gchar * name, const GstCaps * caps);
134 static void gst_splitmux_sink_release_pad (GstElement * element, GstPad * pad);
136 static GstStateChangeReturn gst_splitmux_sink_change_state (GstElement *
137 element, GstStateChange transition);
139 static void bus_handler (GstBin * bin, GstMessage * msg);
140 static void set_next_filename (GstSplitMuxSink * splitmux);
141 static void start_next_fragment (GstSplitMuxSink * splitmux);
142 static void check_queue_length (GstSplitMuxSink * splitmux, MqStreamCtx * ctx);
143 static void mq_stream_ctx_unref (MqStreamCtx * ctx);
145 static void gst_splitmux_sink_ensure_max_files (GstSplitMuxSink * splitmux);
148 mq_stream_buf_new (void)
150 return g_slice_new0 (MqStreamBuf);
154 mq_stream_buf_free (MqStreamBuf * data)
156 g_slice_free (MqStreamBuf, data);
160 gst_splitmux_sink_class_init (GstSplitMuxSinkClass * klass)
162 GObjectClass *gobject_class = (GObjectClass *) klass;
163 GstElementClass *gstelement_class = (GstElementClass *) klass;
164 GstBinClass *gstbin_class = (GstBinClass *) klass;
166 gobject_class->set_property = gst_splitmux_sink_set_property;
167 gobject_class->get_property = gst_splitmux_sink_get_property;
168 gobject_class->dispose = gst_splitmux_sink_dispose;
169 gobject_class->finalize = gst_splitmux_sink_finalize;
171 gst_element_class_set_static_metadata (gstelement_class,
172 "Split Muxing Bin", "Generic/Bin/Muxer",
173 "Convenience bin that muxes incoming streams into multiple time/size limited files",
174 "Jan Schmidt <jan@centricular.com>");
176 gst_element_class_add_static_pad_template (gstelement_class,
177 &video_sink_template);
178 gst_element_class_add_static_pad_template (gstelement_class,
179 &audio_sink_template);
180 gst_element_class_add_static_pad_template (gstelement_class,
181 &subtitle_sink_template);
183 gstelement_class->change_state =
184 GST_DEBUG_FUNCPTR (gst_splitmux_sink_change_state);
185 gstelement_class->request_new_pad =
186 GST_DEBUG_FUNCPTR (gst_splitmux_sink_request_new_pad);
187 gstelement_class->release_pad =
188 GST_DEBUG_FUNCPTR (gst_splitmux_sink_release_pad);
190 gstbin_class->handle_message = bus_handler;
192 g_object_class_install_property (gobject_class, PROP_LOCATION,
193 g_param_spec_string ("location", "File Output Pattern",
194 "Format string pattern for the location of the files to write (e.g. video%05d.mp4)",
195 NULL, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
196 g_object_class_install_property (gobject_class, PROP_MUXER_OVERHEAD,
197 g_param_spec_double ("mux-overhead", "Muxing Overhead",
198 "Extra size overhead of muxing (0.02 = 2%)", 0.0, 1.0,
199 DEFAULT_MUXER_OVERHEAD,
200 G_PARAM_READWRITE | GST_PARAM_CONTROLLABLE | G_PARAM_STATIC_STRINGS));
202 g_object_class_install_property (gobject_class, PROP_MAX_SIZE_TIME,
203 g_param_spec_uint64 ("max-size-time", "Max. size (ns)",
204 "Max. amount of time per file (in ns, 0=disable)", 0, G_MAXUINT64,
205 DEFAULT_MAX_SIZE_TIME, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
206 g_object_class_install_property (gobject_class, PROP_MAX_SIZE_BYTES,
207 g_param_spec_uint64 ("max-size-bytes", "Max. size bytes",
208 "Max. amount of data per file (in bytes, 0=disable)", 0, G_MAXUINT64,
209 DEFAULT_MAX_SIZE_BYTES, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
210 g_object_class_install_property (gobject_class, PROP_MAX_FILES,
211 g_param_spec_uint ("max-files", "Max files",
212 "Maximum number of files to keep on disk. Once the maximum is reached,"
213 "old files start to be deleted to make room for new ones.",
214 0, G_MAXUINT, DEFAULT_MAX_FILES,
215 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
218 g_object_class_install_property (gobject_class, PROP_MUXER,
219 g_param_spec_object ("muxer", "Muxer",
220 "The muxer element to use (NULL = default mp4mux)",
221 GST_TYPE_ELEMENT, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
222 g_object_class_install_property (gobject_class, PROP_SINK,
223 g_param_spec_object ("sink", "Sink",
224 "The sink element (or element chain) to use (NULL = default filesink)",
225 GST_TYPE_ELEMENT, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
228 * GstSplitMuxSink::format-location:
229 * @splitmux: the #GstSplitMuxSink
230 * @fragment_id: the sequence number of the file to be created
232 * Returns: the location to be used for the next output file
234 signals[SIGNAL_FORMAT_LOCATION] =
235 g_signal_new ("format-location", G_TYPE_FROM_CLASS (klass),
236 G_SIGNAL_RUN_LAST, 0, NULL, NULL, NULL, G_TYPE_STRING, 1, G_TYPE_UINT);
240 gst_splitmux_sink_init (GstSplitMuxSink * splitmux)
242 g_mutex_init (&splitmux->lock);
243 g_cond_init (&splitmux->data_cond);
245 splitmux->mux_overhead = DEFAULT_MUXER_OVERHEAD;
246 splitmux->threshold_time = DEFAULT_MAX_SIZE_TIME;
247 splitmux->threshold_bytes = DEFAULT_MAX_SIZE_BYTES;
248 splitmux->max_files = DEFAULT_MAX_FILES;
250 GST_OBJECT_FLAG_SET (splitmux, GST_ELEMENT_FLAG_SINK);
254 gst_splitmux_reset (GstSplitMuxSink * splitmux)
257 gst_bin_remove (GST_BIN (splitmux), splitmux->mq);
259 gst_bin_remove (GST_BIN (splitmux), splitmux->muxer);
260 if (splitmux->active_sink)
261 gst_bin_remove (GST_BIN (splitmux), splitmux->active_sink);
263 splitmux->sink = splitmux->active_sink = splitmux->muxer = splitmux->mq =
268 gst_splitmux_sink_dispose (GObject * object)
270 GstSplitMuxSink *splitmux = GST_SPLITMUX_SINK (object);
272 G_OBJECT_CLASS (parent_class)->dispose (object);
274 /* Calling parent dispose invalidates all child pointers */
275 splitmux->sink = splitmux->active_sink = splitmux->muxer = splitmux->mq =
280 gst_splitmux_sink_finalize (GObject * object)
282 GstSplitMuxSink *splitmux = GST_SPLITMUX_SINK (object);
283 g_cond_clear (&splitmux->data_cond);
284 g_mutex_clear (&splitmux->lock);
285 if (splitmux->provided_sink)
286 gst_object_unref (splitmux->provided_sink);
287 if (splitmux->provided_muxer)
288 gst_object_unref (splitmux->provided_muxer);
290 g_free (splitmux->location);
292 /* Make sure to free any un-released contexts */
293 g_list_foreach (splitmux->contexts, (GFunc) mq_stream_ctx_unref, NULL);
294 g_list_free (splitmux->contexts);
296 G_OBJECT_CLASS (parent_class)->finalize (object);
300 gst_splitmux_sink_set_property (GObject * object, guint prop_id,
301 const GValue * value, GParamSpec * pspec)
303 GstSplitMuxSink *splitmux = GST_SPLITMUX_SINK (object);
307 GST_OBJECT_LOCK (splitmux);
308 g_free (splitmux->location);
309 splitmux->location = g_value_dup_string (value);
310 GST_OBJECT_UNLOCK (splitmux);
313 case PROP_MAX_SIZE_BYTES:
314 GST_OBJECT_LOCK (splitmux);
315 splitmux->threshold_bytes = g_value_get_uint64 (value);
316 GST_OBJECT_UNLOCK (splitmux);
318 case PROP_MAX_SIZE_TIME:
319 GST_OBJECT_LOCK (splitmux);
320 splitmux->threshold_time = g_value_get_uint64 (value);
321 GST_OBJECT_UNLOCK (splitmux);
324 GST_OBJECT_LOCK (splitmux);
325 splitmux->max_files = g_value_get_uint (value);
326 GST_OBJECT_UNLOCK (splitmux);
328 case PROP_MUXER_OVERHEAD:
329 GST_OBJECT_LOCK (splitmux);
330 splitmux->mux_overhead = g_value_get_double (value);
331 GST_OBJECT_UNLOCK (splitmux);
334 GST_OBJECT_LOCK (splitmux);
335 if (splitmux->provided_sink)
336 gst_object_unref (splitmux->provided_sink);
337 splitmux->provided_sink = g_value_dup_object (value);
338 GST_OBJECT_UNLOCK (splitmux);
341 GST_OBJECT_LOCK (splitmux);
342 if (splitmux->provided_muxer)
343 gst_object_unref (splitmux->provided_muxer);
344 splitmux->provided_muxer = g_value_dup_object (value);
345 GST_OBJECT_UNLOCK (splitmux);
348 G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
354 gst_splitmux_sink_get_property (GObject * object, guint prop_id,
355 GValue * value, GParamSpec * pspec)
357 GstSplitMuxSink *splitmux = GST_SPLITMUX_SINK (object);
361 GST_OBJECT_LOCK (splitmux);
362 g_value_set_string (value, splitmux->location);
363 GST_OBJECT_UNLOCK (splitmux);
365 case PROP_MAX_SIZE_BYTES:
366 GST_OBJECT_LOCK (splitmux);
367 g_value_set_uint64 (value, splitmux->threshold_bytes);
368 GST_OBJECT_UNLOCK (splitmux);
370 case PROP_MAX_SIZE_TIME:
371 GST_OBJECT_LOCK (splitmux);
372 g_value_set_uint64 (value, splitmux->threshold_time);
373 GST_OBJECT_UNLOCK (splitmux);
376 GST_OBJECT_LOCK (splitmux);
377 g_value_set_uint (value, splitmux->max_files);
378 GST_OBJECT_UNLOCK (splitmux);
380 case PROP_MUXER_OVERHEAD:
381 GST_OBJECT_LOCK (splitmux);
382 g_value_set_double (value, splitmux->mux_overhead);
383 GST_OBJECT_UNLOCK (splitmux);
386 GST_OBJECT_LOCK (splitmux);
387 g_value_set_object (value, splitmux->provided_sink);
388 GST_OBJECT_UNLOCK (splitmux);
391 GST_OBJECT_LOCK (splitmux);
392 g_value_set_object (value, splitmux->provided_muxer);
393 GST_OBJECT_UNLOCK (splitmux);
396 G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
402 mq_sink_to_src (GstElement * mq, GstPad * sink_pad)
404 gchar *tmp, *sinkname, *srcname;
407 sinkname = gst_pad_get_name (sink_pad);
409 srcname = g_strdup_printf ("src_%s", tmp);
411 mq_src = gst_element_get_static_pad (mq, srcname);
420 get_pads_from_mq (GstSplitMuxSink * splitmux, GstPad ** sink_pad,
426 /* Request a pad from multiqueue, then connect this one, then
427 * discover the corresponding output pad and return both */
428 mq_sink = gst_element_get_request_pad (splitmux->mq, "sink_%u");
432 mq_src = mq_sink_to_src (splitmux->mq, mq_sink);
442 gst_element_release_request_pad (splitmux->mq, mq_sink);
447 mq_stream_ctx_new (GstSplitMuxSink * splitmux)
451 ctx = g_new0 (MqStreamCtx, 1);
452 g_atomic_int_set (&ctx->refcount, 1);
453 ctx->splitmux = splitmux;
454 gst_segment_init (&ctx->in_segment, GST_FORMAT_UNDEFINED);
455 gst_segment_init (&ctx->out_segment, GST_FORMAT_UNDEFINED);
456 ctx->in_running_time = ctx->out_running_time = 0;
457 g_queue_init (&ctx->queued_bufs);
462 mq_stream_ctx_free (MqStreamCtx * ctx)
464 g_queue_foreach (&ctx->queued_bufs, (GFunc) mq_stream_buf_free, NULL);
465 g_queue_clear (&ctx->queued_bufs);
470 mq_stream_ctx_unref (MqStreamCtx * ctx)
472 if (g_atomic_int_dec_and_test (&ctx->refcount))
473 mq_stream_ctx_free (ctx);
477 mq_stream_ctx_ref (MqStreamCtx * ctx)
479 g_atomic_int_inc (&ctx->refcount);
483 _pad_block_destroy_sink_notify (MqStreamCtx * ctx)
485 ctx->sink_pad_block_id = 0;
486 mq_stream_ctx_unref (ctx);
490 _pad_block_destroy_src_notify (MqStreamCtx * ctx)
492 ctx->src_pad_block_id = 0;
493 mq_stream_ctx_unref (ctx);
497 send_fragment_opened_closed_msg (GstSplitMuxSink * splitmux, gboolean opened)
499 gchar *location = NULL;
501 const gchar *msg_name = opened ?
502 "splitmuxsink-fragment-opened" : "splitmuxsink-fragment-closed";
504 g_object_get (splitmux->sink, "location", &location, NULL);
506 msg = gst_message_new_element (GST_OBJECT (splitmux),
507 gst_structure_new (msg_name,
508 "location", G_TYPE_STRING, location,
509 "running-time", GST_TYPE_CLOCK_TIME,
510 splitmux->reference_ctx->out_running_time, NULL));
511 gst_element_post_message (GST_ELEMENT_CAST (splitmux), msg);
516 /* Called with lock held, drops the lock to send EOS to the
520 send_eos (GstSplitMuxSink * splitmux, MqStreamCtx * ctx)
525 eos = gst_event_new_eos ();
526 pad = gst_pad_get_peer (ctx->srcpad);
530 GST_INFO_OBJECT (splitmux, "Sending EOS on %" GST_PTR_FORMAT, pad);
531 GST_SPLITMUX_UNLOCK (splitmux);
532 gst_pad_send_event (pad, eos);
533 GST_SPLITMUX_LOCK (splitmux);
535 gst_object_unref (pad);
538 /* Called with splitmux lock held to check if this output
539 * context needs to sleep to wait for the release of the
540 * next GOP, or to send EOS to close out the current file
543 complete_or_wait_on_out (GstSplitMuxSink * splitmux, MqStreamCtx * ctx)
547 GST_LOG_OBJECT (ctx->srcpad,
548 "Checking running time %" GST_TIME_FORMAT " against max %"
549 GST_TIME_FORMAT, GST_TIME_ARGS (ctx->out_running_time),
550 GST_TIME_ARGS (splitmux->max_out_running_time));
552 if (splitmux->max_out_running_time == GST_CLOCK_TIME_NONE ||
553 ctx->out_running_time < splitmux->max_out_running_time) {
554 splitmux->have_muxed_something = TRUE;
558 if (ctx->flushing || splitmux->state == SPLITMUX_STATE_STOPPED)
561 if (splitmux->state == SPLITMUX_STATE_ENDING_FILE) {
562 if (ctx->out_eos == FALSE) {
563 send_eos (splitmux, ctx);
566 } else if (splitmux->state == SPLITMUX_STATE_START_NEXT_FRAGMENT) {
567 start_next_fragment (splitmux);
571 GST_INFO_OBJECT (ctx->srcpad,
572 "Sleeping for running time %"
573 GST_TIME_FORMAT " (max %" GST_TIME_FORMAT ")",
574 GST_TIME_ARGS (ctx->out_running_time),
575 GST_TIME_ARGS (splitmux->max_out_running_time));
576 ctx->out_blocked = TRUE;
577 /* Expand the mq if needed before sleeping */
578 check_queue_length (splitmux, ctx);
579 GST_SPLITMUX_WAIT (splitmux);
580 ctx->out_blocked = FALSE;
581 GST_INFO_OBJECT (ctx->srcpad,
582 "Woken for new max running time %" GST_TIME_FORMAT,
583 GST_TIME_ARGS (splitmux->max_out_running_time));
587 static GstPadProbeReturn
588 handle_mq_output (GstPad * pad, GstPadProbeInfo * info, MqStreamCtx * ctx)
590 GstSplitMuxSink *splitmux = ctx->splitmux;
591 MqStreamBuf *buf_info = NULL;
593 GST_LOG_OBJECT (pad, "Fired probe type 0x%x", info->type);
595 /* FIXME: Handle buffer lists, until then make it clear they won't work */
596 if (info->type & GST_PAD_PROBE_TYPE_BUFFER_LIST) {
597 g_warning ("Buffer list handling not implemented");
598 return GST_PAD_PROBE_DROP;
600 if (info->type & GST_PAD_PROBE_TYPE_EVENT_DOWNSTREAM) {
601 GstEvent *event = gst_pad_probe_info_get_event (info);
603 GST_LOG_OBJECT (pad, "Event %" GST_PTR_FORMAT, event);
605 switch (GST_EVENT_TYPE (event)) {
606 case GST_EVENT_SEGMENT:
607 gst_event_copy_segment (event, &ctx->out_segment);
609 case GST_EVENT_FLUSH_STOP:
610 GST_SPLITMUX_LOCK (splitmux);
611 gst_segment_init (&ctx->out_segment, GST_FORMAT_UNDEFINED);
612 g_queue_foreach (&ctx->queued_bufs, (GFunc) mq_stream_buf_free, NULL);
613 g_queue_clear (&ctx->queued_bufs);
614 ctx->flushing = FALSE;
615 GST_SPLITMUX_UNLOCK (splitmux);
617 case GST_EVENT_FLUSH_START:
618 GST_SPLITMUX_LOCK (splitmux);
619 GST_LOG_OBJECT (pad, "Flush start");
620 ctx->flushing = TRUE;
621 GST_SPLITMUX_BROADCAST (splitmux);
622 GST_SPLITMUX_UNLOCK (splitmux);
625 GST_SPLITMUX_LOCK (splitmux);
626 if (splitmux->state == SPLITMUX_STATE_STOPPED)
629 GST_SPLITMUX_UNLOCK (splitmux);
634 gst_event_parse_gap (event, &gap_ts, NULL);
635 if (gap_ts == GST_CLOCK_TIME_NONE)
638 GST_SPLITMUX_LOCK (splitmux);
640 gap_ts = gst_segment_to_running_time (&ctx->out_segment,
641 GST_FORMAT_TIME, gap_ts);
643 GST_LOG_OBJECT (pad, "Have GAP w/ ts %" GST_TIME_FORMAT,
644 GST_TIME_ARGS (gap_ts));
646 if (splitmux->state == SPLITMUX_STATE_STOPPED)
648 ctx->out_running_time = gap_ts;
649 complete_or_wait_on_out (splitmux, ctx);
650 GST_SPLITMUX_UNLOCK (splitmux);
653 case GST_EVENT_CUSTOM_DOWNSTREAM:{
654 const GstStructure *s;
657 s = gst_event_get_structure (event);
658 if (!gst_structure_has_name (s, "splitmuxsink-unblock"))
661 gst_structure_get_uint64 (s, "timestamp", &ts);
663 GST_SPLITMUX_LOCK (splitmux);
665 if (splitmux->state == SPLITMUX_STATE_STOPPED)
667 ctx->out_running_time = ts;
668 complete_or_wait_on_out (splitmux, ctx);
669 GST_SPLITMUX_UNLOCK (splitmux);
670 return GST_PAD_PROBE_DROP;
675 return GST_PAD_PROBE_PASS;
678 /* Allow everything through until the configured next stopping point */
679 GST_SPLITMUX_LOCK (splitmux);
681 buf_info = g_queue_pop_tail (&ctx->queued_bufs);
682 if (buf_info == NULL)
683 /* Can only happen due to a poorly timed flush */
686 /* If we have popped a keyframe, decrement the queued_gop count */
687 if (buf_info->keyframe && splitmux->queued_gops > 0)
688 splitmux->queued_gops--;
690 ctx->out_running_time = buf_info->run_ts;
692 GST_LOG_OBJECT (splitmux,
693 "Pad %" GST_PTR_FORMAT " buffer with TS %" GST_TIME_FORMAT
694 " size %" G_GSIZE_FORMAT,
695 pad, GST_TIME_ARGS (ctx->out_running_time), buf_info->buf_size);
697 if (splitmux->opening_first_fragment) {
698 send_fragment_opened_closed_msg (splitmux, TRUE);
699 splitmux->opening_first_fragment = FALSE;
702 complete_or_wait_on_out (splitmux, ctx);
704 if (splitmux->muxed_out_time == GST_CLOCK_TIME_NONE ||
705 splitmux->muxed_out_time < buf_info->run_ts)
706 splitmux->muxed_out_time = buf_info->run_ts;
708 splitmux->muxed_out_bytes += buf_info->buf_size;
710 #ifndef GST_DISABLE_GST_DEBUG
712 GstBuffer *buf = gst_pad_probe_info_get_buffer (info);
713 GST_LOG_OBJECT (pad, "Returning to pass buffer %" GST_PTR_FORMAT
714 " run ts %" GST_TIME_FORMAT, buf,
715 GST_TIME_ARGS (ctx->out_running_time));
719 GST_SPLITMUX_UNLOCK (splitmux);
721 mq_stream_buf_free (buf_info);
723 return GST_PAD_PROBE_PASS;
726 GST_SPLITMUX_UNLOCK (splitmux);
727 return GST_PAD_PROBE_DROP;
731 resend_sticky (GstPad * pad, GstEvent ** event, GstPad * peer)
733 return gst_pad_send_event (peer, gst_event_ref (*event));
737 restart_context (MqStreamCtx * ctx, GstSplitMuxSink * splitmux)
739 GstPad *peer = gst_pad_get_peer (ctx->srcpad);
741 gst_pad_sticky_events_foreach (ctx->srcpad,
742 (GstPadStickyEventsForeachFunction) (resend_sticky), peer);
744 /* Clear EOS flag if not actually EOS */
745 ctx->out_eos = GST_PAD_IS_EOS (ctx->srcpad);
747 gst_object_unref (peer);
750 /* Called with lock held when a fragment
751 * reaches EOS and it is time to restart
755 start_next_fragment (GstSplitMuxSink * splitmux)
757 /* 1 change to new file */
758 gst_element_set_state (splitmux->muxer, GST_STATE_NULL);
759 gst_element_set_state (splitmux->active_sink, GST_STATE_NULL);
761 set_next_filename (splitmux);
763 gst_element_sync_state_with_parent (splitmux->active_sink);
764 gst_element_sync_state_with_parent (splitmux->muxer);
766 g_list_foreach (splitmux->contexts, (GFunc) restart_context, splitmux);
768 /* Switch state and go back to processing */
769 if (!splitmux->reference_ctx->in_eos) {
770 splitmux->state = SPLITMUX_STATE_COLLECTING_GOP_START;
771 splitmux->max_out_running_time = splitmux->reference_ctx->in_running_time;
773 splitmux->state = SPLITMUX_STATE_WAITING_GOP_COMPLETE;
774 splitmux->max_out_running_time = GST_CLOCK_TIME_NONE;
775 splitmux->have_muxed_something = FALSE;
777 splitmux->have_muxed_something =
778 (splitmux->reference_ctx->in_running_time > splitmux->muxed_out_time);
780 /* Store the overflow parameters as the basis for the next fragment */
781 splitmux->mux_start_time = splitmux->muxed_out_time;
782 splitmux->mux_start_bytes = splitmux->muxed_out_bytes;
784 GST_DEBUG_OBJECT (splitmux,
785 "Restarting flow for new fragment. New running time %" GST_TIME_FORMAT,
786 GST_TIME_ARGS (splitmux->max_out_running_time));
788 send_fragment_opened_closed_msg (splitmux, TRUE);
790 GST_SPLITMUX_BROADCAST (splitmux);
794 bus_handler (GstBin * bin, GstMessage * message)
796 GstSplitMuxSink *splitmux = GST_SPLITMUX_SINK (bin);
798 switch (GST_MESSAGE_TYPE (message)) {
799 case GST_MESSAGE_EOS:
800 /* If the state is draining out the current file, drop this EOS */
801 GST_SPLITMUX_LOCK (splitmux);
803 send_fragment_opened_closed_msg (splitmux, FALSE);
805 if (splitmux->state == SPLITMUX_STATE_ENDING_FILE &&
806 splitmux->max_out_running_time != GST_CLOCK_TIME_NONE) {
807 GST_DEBUG_OBJECT (splitmux, "Caught EOS at end of fragment, dropping");
808 splitmux->state = SPLITMUX_STATE_START_NEXT_FRAGMENT;
809 GST_SPLITMUX_BROADCAST (splitmux);
811 gst_message_unref (message);
812 GST_SPLITMUX_UNLOCK (splitmux);
815 GST_SPLITMUX_UNLOCK (splitmux);
821 GST_BIN_CLASS (parent_class)->handle_message (bin, message);
824 /* Called with splitmux lock held */
825 /* Called when entering ProcessingCompleteGop state
826 * Assess if mq contents overflowed the current file
827 * -> If yes, need to switch to new file
828 * -> if no, set max_out_running_time to let this GOP in and
829 * go to COLLECTING_GOP_START state
832 handle_gathered_gop (GstSplitMuxSink * splitmux)
835 gsize queued_bytes = 0;
836 GstClockTime queued_time = 0;
838 /* Assess if the multiqueue contents overflowed the current file */
839 for (cur = g_list_first (splitmux->contexts);
840 cur != NULL; cur = g_list_next (cur)) {
841 MqStreamCtx *tmpctx = (MqStreamCtx *) (cur->data);
842 if (tmpctx->in_running_time > queued_time)
843 queued_time = tmpctx->in_running_time;
844 queued_bytes += tmpctx->in_bytes;
847 g_assert (queued_bytes >= splitmux->mux_start_bytes);
848 g_assert (queued_time >= splitmux->mux_start_time);
850 queued_bytes -= splitmux->mux_start_bytes;
851 queued_time -= splitmux->mux_start_time;
853 /* Expand queued bytes estimate by muxer overhead */
854 queued_bytes += (queued_bytes * splitmux->mux_overhead);
856 GST_LOG_OBJECT (splitmux, "mq at TS %" GST_TIME_FORMAT
857 " bytes %" G_GSIZE_FORMAT, GST_TIME_ARGS (queued_time), queued_bytes);
859 /* Check for overrun - have we output at least one byte and overrun
860 * either threshold? */
861 if ((splitmux->have_muxed_something &&
862 ((splitmux->threshold_bytes > 0 &&
863 queued_bytes >= splitmux->threshold_bytes) ||
864 (splitmux->threshold_time > 0 &&
865 queued_time >= splitmux->threshold_time)))) {
867 splitmux->state = SPLITMUX_STATE_ENDING_FILE;
869 GST_INFO_OBJECT (splitmux,
870 "mq overflowed since last, draining out. max out TS is %"
871 GST_TIME_FORMAT, GST_TIME_ARGS (splitmux->max_out_running_time));
872 GST_SPLITMUX_BROADCAST (splitmux);
876 GST_LOG_OBJECT (splitmux,
877 "This GOP didn't overflow the fragment. Bytes sent %" G_GSIZE_FORMAT
878 " queued %" G_GSIZE_FORMAT " time %" GST_TIME_FORMAT " Continuing.",
879 splitmux->muxed_out_bytes - splitmux->mux_start_bytes,
880 queued_bytes, GST_TIME_ARGS (queued_time));
882 /* Wake everyone up to push this one GOP, then sleep */
883 splitmux->have_muxed_something = TRUE;
885 if (!splitmux->reference_ctx->in_eos) {
886 splitmux->state = SPLITMUX_STATE_COLLECTING_GOP_START;
887 splitmux->max_out_running_time = splitmux->reference_ctx->in_running_time;
889 splitmux->state = SPLITMUX_STATE_WAITING_GOP_COMPLETE;
890 splitmux->max_out_running_time = GST_CLOCK_TIME_NONE;
893 GST_LOG_OBJECT (splitmux, "Waking output for complete GOP, TS %"
894 GST_TIME_FORMAT, GST_TIME_ARGS (splitmux->max_out_running_time));
895 GST_SPLITMUX_BROADCAST (splitmux);
900 /* Called with splitmux lock held */
901 /* Called from each input pad when it is has all the pieces
902 * for a GOP or EOS, starting with the reference pad which has set the
903 * splitmux->max_in_running_time
906 check_completed_gop (GstSplitMuxSink * splitmux, MqStreamCtx * ctx)
909 gboolean ready = TRUE;
910 GstClockTime current_max_in_running_time;
912 if (splitmux->state == SPLITMUX_STATE_WAITING_GOP_COMPLETE) {
913 /* Iterate each pad, and check that the input running time is at least
914 * up to the reference running time, and if so handle the collected GOP */
915 GST_LOG_OBJECT (splitmux, "Checking GOP collected, Max in running time %"
916 GST_TIME_FORMAT " ctx %p",
917 GST_TIME_ARGS (splitmux->max_in_running_time), ctx);
918 for (cur = g_list_first (splitmux->contexts); cur != NULL;
919 cur = g_list_next (cur)) {
920 MqStreamCtx *tmpctx = (MqStreamCtx *) (cur->data);
922 GST_LOG_OBJECT (splitmux,
923 "Context %p (src pad %" GST_PTR_FORMAT ") TS %" GST_TIME_FORMAT
924 " EOS %d", tmpctx, tmpctx->srcpad,
925 GST_TIME_ARGS (tmpctx->in_running_time), tmpctx->in_eos);
927 if (splitmux->max_in_running_time != GST_CLOCK_TIME_NONE &&
928 tmpctx->in_running_time < splitmux->max_in_running_time &&
930 GST_LOG_OBJECT (splitmux,
931 "Context %p (src pad %" GST_PTR_FORMAT ") not ready. We'll sleep",
932 tmpctx, tmpctx->srcpad);
938 GST_DEBUG_OBJECT (splitmux,
939 "Collected GOP is complete. Processing (ctx %p)", ctx);
940 /* All pads have a complete GOP, release it into the multiqueue */
941 handle_gathered_gop (splitmux);
945 /* If upstream reached EOS we are not expecting more data, no need to wait
950 /* Some pad is not yet ready, or GOP is being pushed
951 * either way, sleep and wait to get woken */
952 current_max_in_running_time = splitmux->max_in_running_time;
953 while ((splitmux->state == SPLITMUX_STATE_WAITING_GOP_COMPLETE ||
954 splitmux->state == SPLITMUX_STATE_START_NEXT_FRAGMENT) &&
956 (current_max_in_running_time == splitmux->max_in_running_time)) {
958 GST_LOG_OBJECT (splitmux, "Sleeping for %s (ctx %p)",
959 splitmux->state == SPLITMUX_STATE_WAITING_GOP_COMPLETE ?
960 "GOP complete" : "EOF draining", ctx);
961 GST_SPLITMUX_WAIT (splitmux);
963 GST_LOG_OBJECT (splitmux, "Done waiting for complete GOP (ctx %p)", ctx);
968 check_queue_length (GstSplitMuxSink * splitmux, MqStreamCtx * ctx)
971 guint cur_len = g_queue_get_length (&ctx->queued_bufs);
973 GST_DEBUG_OBJECT (ctx->sinkpad,
974 "Checking queue length len %u cur_max %u queued gops %u",
975 cur_len, splitmux->mq_max_buffers, splitmux->queued_gops);
977 if (cur_len >= splitmux->mq_max_buffers) {
978 gboolean allow_grow = FALSE;
980 /* If collecting a GOP and this pad might block,
981 * and there isn't already a pending GOP in the queue
984 if (splitmux->state == SPLITMUX_STATE_WAITING_GOP_COMPLETE &&
985 ctx->in_running_time < splitmux->max_in_running_time &&
986 splitmux->queued_gops <= 1) {
988 } else if (splitmux->state == SPLITMUX_STATE_COLLECTING_GOP_START &&
989 ctx->is_reference && splitmux->queued_gops <= 1) {
994 for (cur = g_list_first (splitmux->contexts);
995 cur != NULL; cur = g_list_next (cur)) {
996 MqStreamCtx *tmpctx = (MqStreamCtx *) (cur->data);
997 GST_DEBUG_OBJECT (tmpctx->sinkpad,
998 " len %u out_blocked %d",
999 g_queue_get_length (&tmpctx->queued_bufs), tmpctx->out_blocked);
1000 /* If another stream is starving, grow */
1001 if (tmpctx != ctx && g_queue_get_length (&tmpctx->queued_bufs) < 1) {
1008 splitmux->mq_max_buffers = cur_len + 1;
1010 GST_INFO_OBJECT (splitmux,
1011 "Multiqueue overrun - enlarging to %u buffers ctx %p",
1012 splitmux->mq_max_buffers, ctx);
1014 g_object_set (splitmux->mq, "max-size-buffers",
1015 splitmux->mq_max_buffers, NULL);
1020 static GstPadProbeReturn
1021 handle_mq_input (GstPad * pad, GstPadProbeInfo * info, MqStreamCtx * ctx)
1023 GstSplitMuxSink *splitmux = ctx->splitmux;
1025 MqStreamBuf *buf_info = NULL;
1027 gboolean loop_again;
1028 gboolean keyframe = FALSE;
1030 GST_LOG_OBJECT (pad, "Fired probe type 0x%x", info->type);
1032 /* FIXME: Handle buffer lists, until then make it clear they won't work */
1033 if (info->type & GST_PAD_PROBE_TYPE_BUFFER_LIST) {
1034 g_warning ("Buffer list handling not implemented");
1035 return GST_PAD_PROBE_DROP;
1037 if (info->type & GST_PAD_PROBE_TYPE_EVENT_DOWNSTREAM) {
1038 GstEvent *event = gst_pad_probe_info_get_event (info);
1039 switch (GST_EVENT_TYPE (event)) {
1040 case GST_EVENT_SEGMENT:
1041 gst_event_copy_segment (event, &ctx->in_segment);
1043 case GST_EVENT_FLUSH_STOP:
1044 GST_SPLITMUX_LOCK (splitmux);
1045 gst_segment_init (&ctx->in_segment, GST_FORMAT_UNDEFINED);
1046 ctx->in_eos = FALSE;
1048 ctx->in_running_time = 0;
1049 GST_SPLITMUX_UNLOCK (splitmux);
1052 GST_SPLITMUX_LOCK (splitmux);
1055 if (splitmux->state == SPLITMUX_STATE_STOPPED)
1058 if (ctx->is_reference) {
1059 GST_INFO_OBJECT (splitmux, "Got Reference EOS. Finishing up");
1060 /* Act as if this is a new keyframe with infinite timestamp */
1061 splitmux->max_in_running_time = GST_CLOCK_TIME_NONE;
1062 splitmux->state = SPLITMUX_STATE_WAITING_GOP_COMPLETE;
1063 /* Wake up other input pads to collect this GOP */
1064 GST_SPLITMUX_BROADCAST (splitmux);
1065 check_completed_gop (splitmux, ctx);
1066 } else if (splitmux->state == SPLITMUX_STATE_WAITING_GOP_COMPLETE) {
1067 /* If we are waiting for a GOP to be completed (ie, for aux
1068 * pads to catch up), then this pad is complete, so check
1069 * if the whole GOP is.
1071 check_completed_gop (splitmux, ctx);
1073 GST_SPLITMUX_UNLOCK (splitmux);
1078 return GST_PAD_PROBE_PASS;
1081 buf = gst_pad_probe_info_get_buffer (info);
1082 buf_info = mq_stream_buf_new ();
1084 if (GST_BUFFER_PTS_IS_VALID (buf))
1085 ts = GST_BUFFER_PTS (buf);
1087 ts = GST_BUFFER_DTS (buf);
1089 GST_SPLITMUX_LOCK (splitmux);
1091 if (splitmux->state == SPLITMUX_STATE_STOPPED)
1094 /* If this buffer has a timestamp, advance the input timestamp of the
1096 if (GST_CLOCK_TIME_IS_VALID (ts)) {
1097 GstClockTime running_time =
1098 gst_segment_to_running_time (&ctx->in_segment, GST_FORMAT_TIME,
1099 GST_BUFFER_TIMESTAMP (buf));
1101 if (GST_CLOCK_TIME_IS_VALID (running_time) &&
1102 (ctx->in_running_time == GST_CLOCK_TIME_NONE
1103 || running_time > ctx->in_running_time))
1104 ctx->in_running_time = running_time;
1107 /* Try to make sure we have a valid running time */
1108 if (!GST_CLOCK_TIME_IS_VALID (ctx->in_running_time)) {
1109 ctx->in_running_time =
1110 gst_segment_to_running_time (&ctx->in_segment, GST_FORMAT_TIME,
1111 ctx->in_segment.start);
1114 buf_info->run_ts = ctx->in_running_time;
1115 buf_info->buf_size = gst_buffer_get_size (buf);
1117 /* Update total input byte counter for overflow detect */
1118 ctx->in_bytes += buf_info->buf_size;
1120 /* initialize mux_start_time */
1121 if (ctx->is_reference && splitmux->mux_start_time == 0)
1122 splitmux->mux_start_time = buf_info->run_ts;
1124 GST_DEBUG_OBJECT (pad, "Buf TS %" GST_TIME_FORMAT
1125 " total in_bytes %" G_GSIZE_FORMAT,
1126 GST_TIME_ARGS (buf_info->run_ts), ctx->in_bytes);
1133 switch (splitmux->state) {
1134 case SPLITMUX_STATE_COLLECTING_GOP_START:
1135 if (ctx->is_reference) {
1136 /* If a keyframe, we have a complete GOP */
1137 if (GST_BUFFER_FLAG_IS_SET (buf, GST_BUFFER_FLAG_DELTA_UNIT) ||
1138 !GST_CLOCK_TIME_IS_VALID (ctx->in_running_time) ||
1139 splitmux->max_in_running_time >= ctx->in_running_time) {
1140 /* Pass this buffer through */
1144 GST_INFO_OBJECT (pad,
1145 "Have keyframe with running time %" GST_TIME_FORMAT,
1146 GST_TIME_ARGS (ctx->in_running_time));
1148 splitmux->state = SPLITMUX_STATE_WAITING_GOP_COMPLETE;
1149 splitmux->max_in_running_time = ctx->in_running_time;
1150 /* Wake up other input pads to collect this GOP */
1151 GST_SPLITMUX_BROADCAST (splitmux);
1152 check_completed_gop (splitmux, ctx);
1154 /* We're still waiting for a keyframe on the reference pad, sleep */
1155 GST_LOG_OBJECT (pad, "Sleeping for GOP start");
1156 GST_SPLITMUX_WAIT (splitmux);
1157 GST_LOG_OBJECT (pad, "Done sleeping for GOP start state now %d",
1161 case SPLITMUX_STATE_WAITING_GOP_COMPLETE:
1162 /* After a GOP start is found, this buffer might complete the GOP */
1163 /* If we overran the target timestamp, it might be time to process
1164 * the GOP, otherwise bail out for more data
1166 GST_LOG_OBJECT (pad,
1167 "Checking TS %" GST_TIME_FORMAT " against max %" GST_TIME_FORMAT,
1168 GST_TIME_ARGS (ctx->in_running_time),
1169 GST_TIME_ARGS (splitmux->max_in_running_time));
1171 if (ctx->in_running_time < splitmux->max_in_running_time) {
1176 GST_LOG_OBJECT (pad,
1177 "Collected last packet of GOP. Checking other pads");
1178 check_completed_gop (splitmux, ctx);
1180 case SPLITMUX_STATE_ENDING_FILE:{
1183 /* If somes streams received no buffer during the last GOP that overran,
1184 * because its next buffer has a timestamp bigger than
1185 * ctx->max_in_running_time, its queue is empty. In that case the only
1186 * way to wakeup the output thread is by injecting an event in the
1187 * queue. This usually happen with subtitle streams.
1188 * See https://bugzilla.gnome.org/show_bug.cgi?id=763711. */
1189 GST_LOG_OBJECT (pad, "Sending splitmuxsink-unblock event");
1190 event = gst_event_new_custom (GST_EVENT_CUSTOM_DOWNSTREAM |
1191 GST_EVENT_TYPE_SERIALIZED,
1192 gst_structure_new ("splitmuxsink-unblock", "timestamp",
1193 G_TYPE_UINT64, splitmux->max_in_running_time, NULL));
1194 gst_pad_send_event (ctx->sinkpad, event);
1197 case SPLITMUX_STATE_START_NEXT_FRAGMENT:
1198 /* A fragment is ending, wait until that's done before continuing */
1199 GST_DEBUG_OBJECT (pad, "Sleeping for fragment restart");
1200 GST_SPLITMUX_WAIT (splitmux);
1201 GST_DEBUG_OBJECT (pad,
1202 "Done sleeping for fragment restart state now %d", splitmux->state);
1208 } while (loop_again);
1211 splitmux->queued_gops++;
1212 buf_info->keyframe = TRUE;
1215 /* Now add this buffer to the queue just before returning */
1216 g_queue_push_head (&ctx->queued_bufs, buf_info);
1218 /* Check the buffer will fit in the mq */
1219 check_queue_length (splitmux, ctx);
1221 GST_LOG_OBJECT (pad, "Returning to queue buffer %" GST_PTR_FORMAT
1222 " run ts %" GST_TIME_FORMAT, buf, GST_TIME_ARGS (ctx->in_running_time));
1224 GST_SPLITMUX_UNLOCK (splitmux);
1225 return GST_PAD_PROBE_PASS;
1228 GST_SPLITMUX_UNLOCK (splitmux);
1230 mq_stream_buf_free (buf_info);
1231 return GST_PAD_PROBE_PASS;
1235 gst_splitmux_sink_request_new_pad (GstElement * element,
1236 GstPadTemplate * templ, const gchar * name, const GstCaps * caps)
1238 GstSplitMuxSink *splitmux = (GstSplitMuxSink *) element;
1239 GstPadTemplate *mux_template = NULL;
1241 GstPad *mq_sink, *mq_src;
1243 gboolean is_video = FALSE;
1246 GST_DEBUG_OBJECT (element, "templ:%s, name:%s", templ->name_template, name);
1248 GST_SPLITMUX_LOCK (splitmux);
1249 if (!create_elements (splitmux))
1252 if (templ->name_template) {
1253 if (g_str_equal (templ->name_template, "video")) {
1254 /* FIXME: Look for a pad template with matching caps, rather than by name */
1256 gst_element_class_get_pad_template (GST_ELEMENT_GET_CLASS
1257 (splitmux->muxer), "video_%u");
1262 gst_element_class_get_pad_template (GST_ELEMENT_GET_CLASS
1263 (splitmux->muxer), templ->name_template);
1265 if (mux_template == NULL) {
1266 /* Fallback to find sink pad templates named 'sink_%d' (mpegtsmux) */
1268 gst_element_class_get_pad_template (GST_ELEMENT_GET_CLASS
1269 (splitmux->muxer), "sink_%d");
1273 res = gst_element_request_pad (splitmux->muxer, mux_template, name, caps);
1278 gname = g_strdup ("video");
1279 else if (name == NULL)
1280 gname = gst_pad_get_name (res);
1282 gname = g_strdup (name);
1284 if (!get_pads_from_mq (splitmux, &mq_sink, &mq_src)) {
1285 gst_element_release_request_pad (splitmux->muxer, res);
1286 gst_object_unref (GST_OBJECT (res));
1290 if (gst_pad_link (mq_src, res) != GST_PAD_LINK_OK) {
1291 gst_element_release_request_pad (splitmux->muxer, res);
1292 gst_object_unref (GST_OBJECT (res));
1293 gst_element_release_request_pad (splitmux->mq, mq_sink);
1294 gst_object_unref (GST_OBJECT (mq_sink));
1298 gst_object_unref (GST_OBJECT (res));
1300 ctx = mq_stream_ctx_new (splitmux);
1301 ctx->srcpad = mq_src;
1302 ctx->sinkpad = mq_sink;
1304 mq_stream_ctx_ref (ctx);
1305 ctx->src_pad_block_id =
1306 gst_pad_add_probe (mq_src, GST_PAD_PROBE_TYPE_DATA_DOWNSTREAM,
1307 (GstPadProbeCallback) handle_mq_output, ctx, (GDestroyNotify)
1308 _pad_block_destroy_src_notify);
1309 if (is_video && splitmux->reference_ctx != NULL) {
1310 splitmux->reference_ctx->is_reference = FALSE;
1311 splitmux->reference_ctx = NULL;
1313 if (splitmux->reference_ctx == NULL) {
1314 splitmux->reference_ctx = ctx;
1315 ctx->is_reference = TRUE;
1318 res = gst_ghost_pad_new (gname, mq_sink);
1319 g_object_set_qdata ((GObject *) (res), PAD_CONTEXT, ctx);
1321 mq_stream_ctx_ref (ctx);
1322 ctx->sink_pad_block_id =
1323 gst_pad_add_probe (mq_sink, GST_PAD_PROBE_TYPE_DATA_DOWNSTREAM,
1324 (GstPadProbeCallback) handle_mq_input, ctx, (GDestroyNotify)
1325 _pad_block_destroy_sink_notify);
1327 GST_DEBUG_OBJECT (splitmux, "Request pad %" GST_PTR_FORMAT
1328 " is mq pad %" GST_PTR_FORMAT, res, mq_sink);
1330 splitmux->contexts = g_list_prepend (splitmux->contexts, ctx);
1334 gst_object_unref (mq_sink);
1335 gst_object_unref (mq_src);
1337 gst_pad_set_active (res, TRUE);
1338 gst_element_add_pad (element, res);
1339 GST_SPLITMUX_UNLOCK (splitmux);
1343 GST_SPLITMUX_UNLOCK (splitmux);
1348 gst_splitmux_sink_release_pad (GstElement * element, GstPad * pad)
1350 GstSplitMuxSink *splitmux = (GstSplitMuxSink *) element;
1351 GstPad *mqsink, *mqsrc, *muxpad;
1353 (MqStreamCtx *) (g_object_get_qdata ((GObject *) (pad), PAD_CONTEXT));
1355 GST_SPLITMUX_LOCK (splitmux);
1357 if (splitmux->muxer == NULL || splitmux->mq == NULL)
1358 goto fail; /* Elements don't exist yet - nothing to release */
1360 GST_INFO_OBJECT (pad, "releasing request pad");
1362 mqsink = gst_ghost_pad_get_target (GST_GHOST_PAD (pad));
1363 mqsrc = mq_sink_to_src (splitmux->mq, mqsink);
1364 muxpad = gst_pad_get_peer (mqsrc);
1366 /* Remove the context from our consideration */
1367 splitmux->contexts = g_list_remove (splitmux->contexts, ctx);
1369 if (ctx->sink_pad_block_id)
1370 gst_pad_remove_probe (ctx->sinkpad, ctx->sink_pad_block_id);
1372 if (ctx->src_pad_block_id)
1373 gst_pad_remove_probe (ctx->srcpad, ctx->src_pad_block_id);
1375 /* Can release the context now */
1376 mq_stream_ctx_unref (ctx);
1378 /* Release and free the mq input */
1379 gst_element_release_request_pad (splitmux->mq, mqsink);
1381 /* Release and free the muxer input */
1382 gst_element_release_request_pad (splitmux->muxer, muxpad);
1384 gst_object_unref (mqsink);
1385 gst_object_unref (mqsrc);
1386 gst_object_unref (muxpad);
1388 gst_element_remove_pad (element, pad);
1390 /* Reset the internal elements only after all request pads are released */
1391 if (splitmux->contexts == NULL)
1392 gst_splitmux_reset (splitmux);
1395 GST_SPLITMUX_UNLOCK (splitmux);
1399 create_element (GstSplitMuxSink * splitmux,
1400 const gchar * factory, const gchar * name)
1402 GstElement *ret = gst_element_factory_make (factory, name);
1404 g_warning ("Failed to create %s - splitmuxsink will not work", name);
1408 if (!gst_bin_add (GST_BIN (splitmux), ret)) {
1409 g_warning ("Could not add %s element - splitmuxsink will not work", name);
1410 gst_object_unref (ret);
1418 create_elements (GstSplitMuxSink * splitmux)
1420 /* Create internal elements */
1421 if (splitmux->mq == NULL) {
1423 create_element (splitmux, "multiqueue", "multiqueue")) == NULL)
1426 splitmux->mq_max_buffers = 5;
1427 /* No bytes or time limit, we limit buffers manually */
1428 g_object_set (splitmux->mq, "max-size-bytes", 0, "max-size-time",
1429 (guint64) 0, "max-size-buffers", splitmux->mq_max_buffers, NULL);
1432 if (splitmux->muxer == NULL) {
1433 GstElement *provided_muxer = NULL;
1435 GST_OBJECT_LOCK (splitmux);
1436 if (splitmux->provided_muxer != NULL)
1437 provided_muxer = gst_object_ref (splitmux->provided_muxer);
1438 GST_OBJECT_UNLOCK (splitmux);
1440 if (provided_muxer == NULL) {
1441 if ((splitmux->muxer =
1442 create_element (splitmux, "mp4mux", "muxer")) == NULL)
1445 if (!gst_bin_add (GST_BIN (splitmux), provided_muxer)) {
1446 g_warning ("Could not add muxer element - splitmuxsink will not work");
1447 gst_object_unref (provided_muxer);
1451 splitmux->muxer = provided_muxer;
1452 gst_object_unref (provided_muxer);
1462 find_sink (GstElement * e)
1464 GstElement *res = NULL;
1466 gboolean done = FALSE;
1467 GValue data = { 0, };
1469 if (!GST_IS_BIN (e))
1472 iter = gst_bin_iterate_sinks (GST_BIN (e));
1474 switch (gst_iterator_next (iter, &data)) {
1475 case GST_ITERATOR_OK:
1477 GstElement *child = g_value_get_object (&data);
1478 if (g_object_class_find_property (G_OBJECT_GET_CLASS (child),
1479 "location") != NULL) {
1483 g_value_reset (&data);
1486 case GST_ITERATOR_RESYNC:
1487 gst_iterator_resync (iter);
1489 case GST_ITERATOR_DONE:
1492 case GST_ITERATOR_ERROR:
1493 g_assert_not_reached ();
1497 g_value_unset (&data);
1498 gst_iterator_free (iter);
1504 create_sink (GstSplitMuxSink * splitmux)
1506 GstElement *provided_sink = NULL;
1508 if (splitmux->active_sink == NULL) {
1510 GST_OBJECT_LOCK (splitmux);
1511 if (splitmux->provided_sink != NULL)
1512 provided_sink = gst_object_ref (splitmux->provided_sink);
1513 GST_OBJECT_UNLOCK (splitmux);
1515 if (provided_sink == NULL) {
1516 if ((splitmux->sink =
1517 create_element (splitmux, DEFAULT_SINK, "sink")) == NULL)
1519 splitmux->active_sink = splitmux->sink;
1521 if (!gst_bin_add (GST_BIN (splitmux), provided_sink)) {
1522 g_warning ("Could not add sink elements - splitmuxsink will not work");
1523 gst_object_unref (provided_sink);
1527 splitmux->active_sink = provided_sink;
1529 /* The bin holds a ref now, we can drop our tmp ref */
1530 gst_object_unref (provided_sink);
1532 /* Find the sink element */
1533 splitmux->sink = find_sink (splitmux->active_sink);
1534 if (splitmux->sink == NULL) {
1536 ("Could not locate sink element in provided sink - splitmuxsink will not work");
1541 if (!gst_element_link (splitmux->muxer, splitmux->active_sink)) {
1542 g_warning ("Failed to link muxer and sink- splitmuxsink will not work");
1553 #pragma GCC diagnostic ignored "-Wformat-nonliteral"
1556 set_next_filename (GstSplitMuxSink * splitmux)
1558 gchar *fname = NULL;
1559 gst_splitmux_sink_ensure_max_files (splitmux);
1561 g_signal_emit (splitmux, signals[SIGNAL_FORMAT_LOCATION], 0,
1562 splitmux->fragment_id, &fname);
1565 fname = splitmux->location ?
1566 g_strdup_printf (splitmux->location, splitmux->fragment_id) : NULL;
1569 GST_INFO_OBJECT (splitmux, "Setting file to %s", fname);
1570 g_object_set (splitmux->sink, "location", fname, NULL);
1573 splitmux->fragment_id++;
1577 static GstStateChangeReturn
1578 gst_splitmux_sink_change_state (GstElement * element, GstStateChange transition)
1580 GstStateChangeReturn ret;
1581 GstSplitMuxSink *splitmux = (GstSplitMuxSink *) element;
1583 switch (transition) {
1584 case GST_STATE_CHANGE_NULL_TO_READY:{
1585 GST_SPLITMUX_LOCK (splitmux);
1586 if (!create_elements (splitmux) || !create_sink (splitmux)) {
1587 ret = GST_STATE_CHANGE_FAILURE;
1588 GST_SPLITMUX_UNLOCK (splitmux);
1591 GST_SPLITMUX_UNLOCK (splitmux);
1592 splitmux->fragment_id = 0;
1593 set_next_filename (splitmux);
1596 case GST_STATE_CHANGE_READY_TO_PAUSED:{
1597 GST_SPLITMUX_LOCK (splitmux);
1598 /* Start by collecting one input on each pad */
1599 splitmux->state = SPLITMUX_STATE_COLLECTING_GOP_START;
1600 splitmux->max_in_running_time = 0;
1601 splitmux->muxed_out_time = splitmux->mux_start_time = 0;
1602 splitmux->muxed_out_bytes = splitmux->mux_start_bytes = 0;
1603 splitmux->opening_first_fragment = TRUE;
1604 GST_SPLITMUX_UNLOCK (splitmux);
1607 case GST_STATE_CHANGE_PAUSED_TO_READY:
1608 case GST_STATE_CHANGE_READY_TO_NULL:
1609 GST_SPLITMUX_LOCK (splitmux);
1610 splitmux->state = SPLITMUX_STATE_STOPPED;
1611 /* Wake up any blocked threads */
1612 GST_LOG_OBJECT (splitmux,
1613 "State change -> NULL or READY. Waking threads");
1614 GST_SPLITMUX_BROADCAST (splitmux);
1615 GST_SPLITMUX_UNLOCK (splitmux);
1621 ret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
1622 if (ret == GST_STATE_CHANGE_FAILURE)
1625 switch (transition) {
1626 case GST_STATE_CHANGE_READY_TO_NULL:
1627 GST_SPLITMUX_LOCK (splitmux);
1628 splitmux->fragment_id = 0;
1629 /* Reset internal elements only if no pad contexts are using them */
1630 if (splitmux->contexts == NULL)
1631 gst_splitmux_reset (splitmux);
1632 GST_SPLITMUX_UNLOCK (splitmux);
1640 if (transition == GST_STATE_CHANGE_NULL_TO_READY &&
1641 ret == GST_STATE_CHANGE_FAILURE) {
1642 /* Cleanup elements on failed transition out of NULL */
1643 gst_splitmux_reset (splitmux);
1649 register_splitmuxsink (GstPlugin * plugin)
1651 GST_DEBUG_CATEGORY_INIT (splitmux_debug, "splitmuxsink", 0,
1652 "Split File Muxing Sink");
1654 return gst_element_register (plugin, "splitmuxsink", GST_RANK_NONE,
1655 GST_TYPE_SPLITMUX_SINK);
1659 gst_splitmux_sink_ensure_max_files (GstSplitMuxSink * splitmux)
1661 if (splitmux->max_files && splitmux->fragment_id >= splitmux->max_files) {
1662 splitmux->fragment_id = 0;