1 /* GStreamer Muxer bin that splits output stream by size/time
2 * Copyright (C) <2014> Jan Schmidt <jan@centricular.com>
4 * This library is free software; you can redistribute it and/or
5 * modify it under the terms of the GNU Library General Public
6 * License as published by the Free Software Foundation; either
7 * version 2 of the License, or (at your option) any later version.
9 * This library is distributed in the hope that it will be useful,
10 * but WITHOUT ANY WARRANTY; without even the implied warranty of
11 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
12 * Library General Public License for more details.
14 * You should have received a copy of the GNU Library General Public
15 * License along with this library; if not, write to the
16 * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
17 * Boston, MA 02110-1301, USA.
21 * SECTION:element-splitmuxsink
22 * @short_description: Muxer wrapper for splitting output stream by size or time
24 * This element wraps a muxer and a sink, and starts a new file when the mux
25 * contents are about to cross a threshold of maximum size of maximum time,
26 * splitting at video keyframe boundaries. Exactly one input video stream
27 * can be muxed, with as many accompanying audio and subtitle streams as
30 * By default, it uses mp4mux and filesink, but they can be changed via
31 * the 'muxer' and 'sink' properties.
33 * The minimum file size is 1 GOP, however - so limits may be overrun if the
34 * distance between any 2 keyframes is larger than the limits.
36 * If a video stream is available, the splitting process is driven by the video
37 * stream contents, and the video stream must contain closed GOPs for the output
38 * file parts to be played individually correctly. In the absence of a video
39 * stream, the first available stream is used as reference for synchronization.
42 * <title>Example pipelines</title>
44 * gst-launch-1.0 -e v4l2src num-buffers=500 ! video/x-raw,width=320,height=240 ! videoconvert ! queue ! timeoverlay ! x264enc key-int-max=10 ! h264parse ! splitmuxsink location=video%02d.mov max-size-time=10000000000 max-size-bytes=1000000
46 * Records a video stream captured from a v4l2 device and muxes it into
47 * ISO mp4 files, splitting as needed to limit size/duration to 10 seconds
48 * and 1MB maximum size.
57 #include <glib/gstdio.h>
58 #include "gstsplitmuxsink.h"
60 GST_DEBUG_CATEGORY_STATIC (splitmux_debug);
61 #define GST_CAT_DEFAULT splitmux_debug
63 #define GST_SPLITMUX_LOCK(s) g_mutex_lock(&(s)->lock)
64 #define GST_SPLITMUX_UNLOCK(s) g_mutex_unlock(&(s)->lock)
65 #define GST_SPLITMUX_WAIT(s) g_cond_wait (&(s)->data_cond, &(s)->lock)
66 #define GST_SPLITMUX_BROADCAST(s) g_cond_broadcast (&(s)->data_cond)
80 #define DEFAULT_MAX_SIZE_TIME 0
81 #define DEFAULT_MAX_SIZE_BYTES 0
82 #define DEFAULT_MAX_FILES 0
83 #define DEFAULT_MUXER_OVERHEAD 0.02
84 #define DEFAULT_MUXER "mp4mux"
85 #define DEFAULT_SINK "filesink"
89 SIGNAL_FORMAT_LOCATION,
93 static guint signals[SIGNAL_LAST];
95 static GstStaticPadTemplate video_sink_template =
96 GST_STATIC_PAD_TEMPLATE ("video",
100 static GstStaticPadTemplate audio_sink_template =
101 GST_STATIC_PAD_TEMPLATE ("audio_%u",
104 GST_STATIC_CAPS_ANY);
105 static GstStaticPadTemplate subtitle_sink_template =
106 GST_STATIC_PAD_TEMPLATE ("subtitle_%u",
109 GST_STATIC_CAPS_ANY);
111 static GQuark PAD_CONTEXT;
116 PAD_CONTEXT = g_quark_from_static_string ("pad-context");
119 #define gst_splitmux_sink_parent_class parent_class
120 G_DEFINE_TYPE_EXTENDED (GstSplitMuxSink, gst_splitmux_sink, GST_TYPE_BIN, 0,
123 static gboolean create_elements (GstSplitMuxSink * splitmux);
124 static gboolean create_sink (GstSplitMuxSink * splitmux);
125 static void gst_splitmux_sink_set_property (GObject * object, guint prop_id,
126 const GValue * value, GParamSpec * pspec);
127 static void gst_splitmux_sink_get_property (GObject * object, guint prop_id,
128 GValue * value, GParamSpec * pspec);
129 static void gst_splitmux_sink_dispose (GObject * object);
130 static void gst_splitmux_sink_finalize (GObject * object);
132 static GstPad *gst_splitmux_sink_request_new_pad (GstElement * element,
133 GstPadTemplate * templ, const gchar * name, const GstCaps * caps);
134 static void gst_splitmux_sink_release_pad (GstElement * element, GstPad * pad);
136 static GstStateChangeReturn gst_splitmux_sink_change_state (GstElement *
137 element, GstStateChange transition);
139 static void bus_handler (GstBin * bin, GstMessage * msg);
140 static void set_next_filename (GstSplitMuxSink * splitmux);
141 static void start_next_fragment (GstSplitMuxSink * splitmux);
142 static void check_queue_length (GstSplitMuxSink * splitmux, MqStreamCtx * ctx);
143 static void mq_stream_ctx_unref (MqStreamCtx * ctx);
145 static void gst_splitmux_sink_ensure_max_files (GstSplitMuxSink * splitmux);
148 mq_stream_buf_new (void)
150 return g_slice_new0 (MqStreamBuf);
154 mq_stream_buf_free (MqStreamBuf * data)
156 g_slice_free (MqStreamBuf, data);
160 gst_splitmux_sink_class_init (GstSplitMuxSinkClass * klass)
162 GObjectClass *gobject_class = (GObjectClass *) klass;
163 GstElementClass *gstelement_class = (GstElementClass *) klass;
164 GstBinClass *gstbin_class = (GstBinClass *) klass;
166 gobject_class->set_property = gst_splitmux_sink_set_property;
167 gobject_class->get_property = gst_splitmux_sink_get_property;
168 gobject_class->dispose = gst_splitmux_sink_dispose;
169 gobject_class->finalize = gst_splitmux_sink_finalize;
171 gst_element_class_set_static_metadata (gstelement_class,
172 "Split Muxing Bin", "Generic/Bin/Muxer",
173 "Convenience bin that muxes incoming streams into multiple time/size limited files",
174 "Jan Schmidt <jan@centricular.com>");
176 gst_element_class_add_static_pad_template (gstelement_class,
177 &video_sink_template);
178 gst_element_class_add_static_pad_template (gstelement_class,
179 &audio_sink_template);
180 gst_element_class_add_static_pad_template (gstelement_class,
181 &subtitle_sink_template);
183 gstelement_class->change_state =
184 GST_DEBUG_FUNCPTR (gst_splitmux_sink_change_state);
185 gstelement_class->request_new_pad =
186 GST_DEBUG_FUNCPTR (gst_splitmux_sink_request_new_pad);
187 gstelement_class->release_pad =
188 GST_DEBUG_FUNCPTR (gst_splitmux_sink_release_pad);
190 gstbin_class->handle_message = bus_handler;
192 g_object_class_install_property (gobject_class, PROP_LOCATION,
193 g_param_spec_string ("location", "File Output Pattern",
194 "Format string pattern for the location of the files to write (e.g. video%05d.mp4)",
195 NULL, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
196 g_object_class_install_property (gobject_class, PROP_MUXER_OVERHEAD,
197 g_param_spec_double ("mux-overhead", "Muxing Overhead",
198 "Extra size overhead of muxing (0.02 = 2%)", 0.0, 1.0,
199 DEFAULT_MUXER_OVERHEAD,
200 G_PARAM_READWRITE | GST_PARAM_CONTROLLABLE | G_PARAM_STATIC_STRINGS));
202 g_object_class_install_property (gobject_class, PROP_MAX_SIZE_TIME,
203 g_param_spec_uint64 ("max-size-time", "Max. size (ns)",
204 "Max. amount of time per file (in ns, 0=disable)", 0, G_MAXUINT64,
205 DEFAULT_MAX_SIZE_TIME, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
206 g_object_class_install_property (gobject_class, PROP_MAX_SIZE_BYTES,
207 g_param_spec_uint64 ("max-size-bytes", "Max. size bytes",
208 "Max. amount of data per file (in bytes, 0=disable)", 0, G_MAXUINT64,
209 DEFAULT_MAX_SIZE_BYTES, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
210 g_object_class_install_property (gobject_class, PROP_MAX_FILES,
211 g_param_spec_uint ("max-files", "Max files",
212 "Maximum number of files to keep on disk. Once the maximum is reached,"
213 "old files start to be deleted to make room for new ones.",
214 0, G_MAXUINT, DEFAULT_MAX_FILES,
215 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
218 g_object_class_install_property (gobject_class, PROP_MUXER,
219 g_param_spec_object ("muxer", "Muxer",
220 "The muxer element to use (NULL = default mp4mux)",
221 GST_TYPE_ELEMENT, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
222 g_object_class_install_property (gobject_class, PROP_SINK,
223 g_param_spec_object ("sink", "Sink",
224 "The sink element (or element chain) to use (NULL = default filesink)",
225 GST_TYPE_ELEMENT, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
228 * GstSplitMuxSink::format-location:
229 * @splitmux: the #GstSplitMuxSink
230 * @fragment_id: the sequence number of the file to be created
232 * Returns: the location to be used for the next output file
234 signals[SIGNAL_FORMAT_LOCATION] =
235 g_signal_new ("format-location", G_TYPE_FROM_CLASS (klass),
236 G_SIGNAL_RUN_LAST, 0, NULL, NULL, NULL, G_TYPE_STRING, 1, G_TYPE_UINT);
240 gst_splitmux_sink_init (GstSplitMuxSink * splitmux)
242 g_mutex_init (&splitmux->lock);
243 g_cond_init (&splitmux->data_cond);
245 splitmux->mux_overhead = DEFAULT_MUXER_OVERHEAD;
246 splitmux->threshold_time = DEFAULT_MAX_SIZE_TIME;
247 splitmux->threshold_bytes = DEFAULT_MAX_SIZE_BYTES;
248 splitmux->max_files = DEFAULT_MAX_FILES;
250 GST_OBJECT_FLAG_SET (splitmux, GST_ELEMENT_FLAG_SINK);
251 g_object_set (splitmux, "async-handling", TRUE, NULL);
255 gst_splitmux_reset (GstSplitMuxSink * splitmux)
258 gst_bin_remove (GST_BIN (splitmux), splitmux->mq);
260 gst_bin_remove (GST_BIN (splitmux), splitmux->muxer);
261 if (splitmux->active_sink)
262 gst_bin_remove (GST_BIN (splitmux), splitmux->active_sink);
264 splitmux->sink = splitmux->active_sink = splitmux->muxer = splitmux->mq =
269 gst_splitmux_sink_dispose (GObject * object)
271 GstSplitMuxSink *splitmux = GST_SPLITMUX_SINK (object);
273 G_OBJECT_CLASS (parent_class)->dispose (object);
275 /* Calling parent dispose invalidates all child pointers */
276 splitmux->sink = splitmux->active_sink = splitmux->muxer = splitmux->mq =
281 gst_splitmux_sink_finalize (GObject * object)
283 GstSplitMuxSink *splitmux = GST_SPLITMUX_SINK (object);
284 g_cond_clear (&splitmux->data_cond);
285 g_mutex_clear (&splitmux->lock);
286 if (splitmux->provided_sink)
287 gst_object_unref (splitmux->provided_sink);
288 if (splitmux->provided_muxer)
289 gst_object_unref (splitmux->provided_muxer);
291 g_free (splitmux->location);
293 /* Make sure to free any un-released contexts */
294 g_list_foreach (splitmux->contexts, (GFunc) mq_stream_ctx_unref, NULL);
295 g_list_free (splitmux->contexts);
297 G_OBJECT_CLASS (parent_class)->finalize (object);
301 gst_splitmux_sink_set_property (GObject * object, guint prop_id,
302 const GValue * value, GParamSpec * pspec)
304 GstSplitMuxSink *splitmux = GST_SPLITMUX_SINK (object);
308 GST_OBJECT_LOCK (splitmux);
309 g_free (splitmux->location);
310 splitmux->location = g_value_dup_string (value);
311 GST_OBJECT_UNLOCK (splitmux);
314 case PROP_MAX_SIZE_BYTES:
315 GST_OBJECT_LOCK (splitmux);
316 splitmux->threshold_bytes = g_value_get_uint64 (value);
317 GST_OBJECT_UNLOCK (splitmux);
319 case PROP_MAX_SIZE_TIME:
320 GST_OBJECT_LOCK (splitmux);
321 splitmux->threshold_time = g_value_get_uint64 (value);
322 GST_OBJECT_UNLOCK (splitmux);
325 GST_OBJECT_LOCK (splitmux);
326 splitmux->max_files = g_value_get_uint (value);
327 GST_OBJECT_UNLOCK (splitmux);
329 case PROP_MUXER_OVERHEAD:
330 GST_OBJECT_LOCK (splitmux);
331 splitmux->mux_overhead = g_value_get_double (value);
332 GST_OBJECT_UNLOCK (splitmux);
335 GST_OBJECT_LOCK (splitmux);
336 if (splitmux->provided_sink)
337 gst_object_unref (splitmux->provided_sink);
338 splitmux->provided_sink = g_value_dup_object (value);
339 GST_OBJECT_UNLOCK (splitmux);
342 GST_OBJECT_LOCK (splitmux);
343 if (splitmux->provided_muxer)
344 gst_object_unref (splitmux->provided_muxer);
345 splitmux->provided_muxer = g_value_dup_object (value);
346 GST_OBJECT_UNLOCK (splitmux);
349 G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
355 gst_splitmux_sink_get_property (GObject * object, guint prop_id,
356 GValue * value, GParamSpec * pspec)
358 GstSplitMuxSink *splitmux = GST_SPLITMUX_SINK (object);
362 GST_OBJECT_LOCK (splitmux);
363 g_value_set_string (value, splitmux->location);
364 GST_OBJECT_UNLOCK (splitmux);
366 case PROP_MAX_SIZE_BYTES:
367 GST_OBJECT_LOCK (splitmux);
368 g_value_set_uint64 (value, splitmux->threshold_bytes);
369 GST_OBJECT_UNLOCK (splitmux);
371 case PROP_MAX_SIZE_TIME:
372 GST_OBJECT_LOCK (splitmux);
373 g_value_set_uint64 (value, splitmux->threshold_time);
374 GST_OBJECT_UNLOCK (splitmux);
377 GST_OBJECT_LOCK (splitmux);
378 g_value_set_uint (value, splitmux->max_files);
379 GST_OBJECT_UNLOCK (splitmux);
381 case PROP_MUXER_OVERHEAD:
382 GST_OBJECT_LOCK (splitmux);
383 g_value_set_double (value, splitmux->mux_overhead);
384 GST_OBJECT_UNLOCK (splitmux);
387 GST_OBJECT_LOCK (splitmux);
388 g_value_set_object (value, splitmux->provided_sink);
389 GST_OBJECT_UNLOCK (splitmux);
392 GST_OBJECT_LOCK (splitmux);
393 g_value_set_object (value, splitmux->provided_muxer);
394 GST_OBJECT_UNLOCK (splitmux);
397 G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
402 /* Convenience function */
403 static inline GstClockTimeDiff
404 my_segment_to_running_time (GstSegment * segment, GstClockTime val)
406 GstClockTimeDiff res = GST_CLOCK_STIME_NONE;
408 if (GST_CLOCK_TIME_IS_VALID (val)) {
410 gst_segment_to_running_time_full (segment, GST_FORMAT_TIME, val, &val);
420 mq_sink_to_src (GstElement * mq, GstPad * sink_pad)
422 gchar *tmp, *sinkname, *srcname;
425 sinkname = gst_pad_get_name (sink_pad);
427 srcname = g_strdup_printf ("src_%s", tmp);
429 mq_src = gst_element_get_static_pad (mq, srcname);
438 get_pads_from_mq (GstSplitMuxSink * splitmux, GstPad ** sink_pad,
444 /* Request a pad from multiqueue, then connect this one, then
445 * discover the corresponding output pad and return both */
446 mq_sink = gst_element_get_request_pad (splitmux->mq, "sink_%u");
450 mq_src = mq_sink_to_src (splitmux->mq, mq_sink);
460 gst_element_release_request_pad (splitmux->mq, mq_sink);
465 mq_stream_ctx_new (GstSplitMuxSink * splitmux)
469 ctx = g_new0 (MqStreamCtx, 1);
470 g_atomic_int_set (&ctx->refcount, 1);
471 ctx->splitmux = splitmux;
472 gst_segment_init (&ctx->in_segment, GST_FORMAT_UNDEFINED);
473 gst_segment_init (&ctx->out_segment, GST_FORMAT_UNDEFINED);
474 ctx->in_running_time = ctx->out_running_time = GST_CLOCK_STIME_NONE;
475 g_queue_init (&ctx->queued_bufs);
480 mq_stream_ctx_free (MqStreamCtx * ctx)
482 g_queue_foreach (&ctx->queued_bufs, (GFunc) mq_stream_buf_free, NULL);
483 g_queue_clear (&ctx->queued_bufs);
488 mq_stream_ctx_unref (MqStreamCtx * ctx)
490 if (g_atomic_int_dec_and_test (&ctx->refcount))
491 mq_stream_ctx_free (ctx);
495 mq_stream_ctx_ref (MqStreamCtx * ctx)
497 g_atomic_int_inc (&ctx->refcount);
501 _pad_block_destroy_sink_notify (MqStreamCtx * ctx)
503 ctx->sink_pad_block_id = 0;
504 mq_stream_ctx_unref (ctx);
508 _pad_block_destroy_src_notify (MqStreamCtx * ctx)
510 ctx->src_pad_block_id = 0;
511 mq_stream_ctx_unref (ctx);
515 send_fragment_opened_closed_msg (GstSplitMuxSink * splitmux, gboolean opened)
517 gchar *location = NULL;
519 const gchar *msg_name = opened ?
520 "splitmuxsink-fragment-opened" : "splitmuxsink-fragment-closed";
522 g_object_get (splitmux->sink, "location", &location, NULL);
524 msg = gst_message_new_element (GST_OBJECT (splitmux),
525 gst_structure_new (msg_name,
526 "location", G_TYPE_STRING, location,
527 "running-time", GST_TYPE_CLOCK_TIME,
528 splitmux->reference_ctx->out_running_time, NULL));
529 gst_element_post_message (GST_ELEMENT_CAST (splitmux), msg);
534 /* Called with lock held, drops the lock to send EOS to the
538 send_eos (GstSplitMuxSink * splitmux, MqStreamCtx * ctx)
543 eos = gst_event_new_eos ();
544 pad = gst_pad_get_peer (ctx->srcpad);
548 GST_INFO_OBJECT (splitmux, "Sending EOS on %" GST_PTR_FORMAT, pad);
549 GST_SPLITMUX_UNLOCK (splitmux);
550 gst_pad_send_event (pad, eos);
551 GST_SPLITMUX_LOCK (splitmux);
553 gst_object_unref (pad);
556 /* Called with splitmux lock held to check if this output
557 * context needs to sleep to wait for the release of the
558 * next GOP, or to send EOS to close out the current file
561 complete_or_wait_on_out (GstSplitMuxSink * splitmux, MqStreamCtx * ctx)
565 GST_LOG_OBJECT (ctx->srcpad,
566 "Checking running time %" GST_STIME_FORMAT " against max %"
567 GST_STIME_FORMAT, GST_STIME_ARGS (ctx->out_running_time),
568 GST_STIME_ARGS (splitmux->max_out_running_time));
570 if (splitmux->max_out_running_time == GST_CLOCK_STIME_NONE ||
571 ctx->out_running_time < splitmux->max_out_running_time) {
572 splitmux->have_muxed_something = TRUE;
576 if (ctx->flushing || splitmux->state == SPLITMUX_STATE_STOPPED)
579 if (splitmux->state == SPLITMUX_STATE_ENDING_FILE) {
580 if (ctx->out_eos == FALSE) {
581 send_eos (splitmux, ctx);
584 } else if (splitmux->state == SPLITMUX_STATE_START_NEXT_FRAGMENT) {
585 start_next_fragment (splitmux);
589 GST_INFO_OBJECT (ctx->srcpad,
590 "Sleeping for running time %"
591 GST_STIME_FORMAT " (max %" GST_STIME_FORMAT ")",
592 GST_STIME_ARGS (ctx->out_running_time),
593 GST_STIME_ARGS (splitmux->max_out_running_time));
594 ctx->out_blocked = TRUE;
595 /* Expand the mq if needed before sleeping */
596 check_queue_length (splitmux, ctx);
597 GST_SPLITMUX_WAIT (splitmux);
598 ctx->out_blocked = FALSE;
599 GST_INFO_OBJECT (ctx->srcpad,
600 "Woken for new max running time %" GST_STIME_FORMAT,
601 GST_STIME_ARGS (splitmux->max_out_running_time));
605 static GstPadProbeReturn
606 handle_mq_output (GstPad * pad, GstPadProbeInfo * info, MqStreamCtx * ctx)
608 GstSplitMuxSink *splitmux = ctx->splitmux;
609 MqStreamBuf *buf_info = NULL;
611 GST_LOG_OBJECT (pad, "Fired probe type 0x%x", info->type);
613 /* FIXME: Handle buffer lists, until then make it clear they won't work */
614 if (info->type & GST_PAD_PROBE_TYPE_BUFFER_LIST) {
615 g_warning ("Buffer list handling not implemented");
616 return GST_PAD_PROBE_DROP;
618 if (info->type & GST_PAD_PROBE_TYPE_EVENT_DOWNSTREAM) {
619 GstEvent *event = gst_pad_probe_info_get_event (info);
621 GST_LOG_OBJECT (pad, "Event %" GST_PTR_FORMAT, event);
623 switch (GST_EVENT_TYPE (event)) {
624 case GST_EVENT_SEGMENT:
625 gst_event_copy_segment (event, &ctx->out_segment);
627 case GST_EVENT_FLUSH_STOP:
628 GST_SPLITMUX_LOCK (splitmux);
629 gst_segment_init (&ctx->out_segment, GST_FORMAT_UNDEFINED);
630 g_queue_foreach (&ctx->queued_bufs, (GFunc) mq_stream_buf_free, NULL);
631 g_queue_clear (&ctx->queued_bufs);
632 ctx->flushing = FALSE;
633 GST_SPLITMUX_UNLOCK (splitmux);
635 case GST_EVENT_FLUSH_START:
636 GST_SPLITMUX_LOCK (splitmux);
637 GST_LOG_OBJECT (pad, "Flush start");
638 ctx->flushing = TRUE;
639 GST_SPLITMUX_BROADCAST (splitmux);
640 GST_SPLITMUX_UNLOCK (splitmux);
643 GST_SPLITMUX_LOCK (splitmux);
644 if (splitmux->state == SPLITMUX_STATE_STOPPED)
647 GST_SPLITMUX_UNLOCK (splitmux);
651 GstClockTimeDiff rtime;
653 gst_event_parse_gap (event, &gap_ts, NULL);
654 if (gap_ts == GST_CLOCK_TIME_NONE)
657 GST_SPLITMUX_LOCK (splitmux);
659 rtime = my_segment_to_running_time (&ctx->out_segment, gap_ts);
661 GST_LOG_OBJECT (pad, "Have GAP w/ ts %" GST_STIME_FORMAT,
662 GST_STIME_ARGS (rtime));
664 if (splitmux->state == SPLITMUX_STATE_STOPPED)
667 if (rtime != GST_CLOCK_STIME_NONE) {
668 ctx->out_running_time = rtime;
669 complete_or_wait_on_out (splitmux, ctx);
671 GST_SPLITMUX_UNLOCK (splitmux);
674 case GST_EVENT_CUSTOM_DOWNSTREAM:{
675 const GstStructure *s;
676 GstClockTimeDiff ts = 0;
678 s = gst_event_get_structure (event);
679 if (!gst_structure_has_name (s, "splitmuxsink-unblock"))
682 gst_structure_get_int64 (s, "timestamp", &ts);
684 GST_SPLITMUX_LOCK (splitmux);
686 if (splitmux->state == SPLITMUX_STATE_STOPPED)
688 ctx->out_running_time = ts;
689 complete_or_wait_on_out (splitmux, ctx);
690 GST_SPLITMUX_UNLOCK (splitmux);
691 return GST_PAD_PROBE_DROP;
696 return GST_PAD_PROBE_PASS;
699 /* Allow everything through until the configured next stopping point */
700 GST_SPLITMUX_LOCK (splitmux);
702 buf_info = g_queue_pop_tail (&ctx->queued_bufs);
703 if (buf_info == NULL)
704 /* Can only happen due to a poorly timed flush */
707 /* If we have popped a keyframe, decrement the queued_gop count */
708 if (buf_info->keyframe && splitmux->queued_gops > 0)
709 splitmux->queued_gops--;
711 ctx->out_running_time = buf_info->run_ts;
713 GST_LOG_OBJECT (splitmux,
714 "Pad %" GST_PTR_FORMAT " buffer with run TS %" GST_STIME_FORMAT
715 " size %" G_GSIZE_FORMAT,
716 pad, GST_STIME_ARGS (ctx->out_running_time), buf_info->buf_size);
718 if (splitmux->opening_first_fragment) {
719 send_fragment_opened_closed_msg (splitmux, TRUE);
720 splitmux->opening_first_fragment = FALSE;
723 complete_or_wait_on_out (splitmux, ctx);
725 if (splitmux->muxed_out_time == GST_CLOCK_STIME_NONE ||
726 splitmux->muxed_out_time < buf_info->run_ts)
727 splitmux->muxed_out_time = buf_info->run_ts;
729 splitmux->muxed_out_bytes += buf_info->buf_size;
731 #ifndef GST_DISABLE_GST_DEBUG
733 GstBuffer *buf = gst_pad_probe_info_get_buffer (info);
734 GST_LOG_OBJECT (pad, "Returning to pass buffer %" GST_PTR_FORMAT
735 " run ts %" GST_STIME_FORMAT, buf,
736 GST_STIME_ARGS (ctx->out_running_time));
740 GST_SPLITMUX_UNLOCK (splitmux);
742 mq_stream_buf_free (buf_info);
744 return GST_PAD_PROBE_PASS;
747 GST_SPLITMUX_UNLOCK (splitmux);
748 return GST_PAD_PROBE_DROP;
752 resend_sticky (GstPad * pad, GstEvent ** event, GstPad * peer)
754 return gst_pad_send_event (peer, gst_event_ref (*event));
758 restart_context (MqStreamCtx * ctx, GstSplitMuxSink * splitmux)
760 GstPad *peer = gst_pad_get_peer (ctx->srcpad);
762 gst_pad_sticky_events_foreach (ctx->srcpad,
763 (GstPadStickyEventsForeachFunction) (resend_sticky), peer);
765 /* Clear EOS flag if not actually EOS */
766 ctx->out_eos = GST_PAD_IS_EOS (ctx->srcpad);
768 gst_object_unref (peer);
771 /* Called with lock held when a fragment
772 * reaches EOS and it is time to restart
776 start_next_fragment (GstSplitMuxSink * splitmux)
778 /* 1 change to new file */
779 gst_element_set_locked_state (splitmux->muxer, TRUE);
780 gst_element_set_locked_state (splitmux->active_sink, TRUE);
781 gst_element_set_state (splitmux->muxer, GST_STATE_NULL);
782 gst_element_set_state (splitmux->active_sink, GST_STATE_NULL);
784 set_next_filename (splitmux);
786 gst_element_set_state (splitmux->active_sink, GST_STATE_TARGET (splitmux));
787 gst_element_set_state (splitmux->muxer, GST_STATE_TARGET (splitmux));
788 gst_element_set_locked_state (splitmux->muxer, FALSE);
789 gst_element_set_locked_state (splitmux->active_sink, FALSE);
791 g_list_foreach (splitmux->contexts, (GFunc) restart_context, splitmux);
793 /* Switch state and go back to processing */
794 if (!splitmux->reference_ctx->in_eos) {
795 splitmux->state = SPLITMUX_STATE_COLLECTING_GOP_START;
796 splitmux->max_out_running_time = splitmux->reference_ctx->in_running_time;
798 splitmux->state = SPLITMUX_STATE_WAITING_GOP_COMPLETE;
799 splitmux->max_out_running_time = GST_CLOCK_STIME_NONE;
800 splitmux->have_muxed_something = FALSE;
802 splitmux->have_muxed_something =
803 (splitmux->reference_ctx->in_running_time > splitmux->muxed_out_time);
805 /* Store the overflow parameters as the basis for the next fragment */
806 splitmux->mux_start_time = splitmux->muxed_out_time;
807 splitmux->mux_start_bytes = splitmux->muxed_out_bytes;
809 GST_DEBUG_OBJECT (splitmux,
810 "Restarting flow for new fragment. New running time %" GST_STIME_FORMAT,
811 GST_STIME_ARGS (splitmux->max_out_running_time));
813 send_fragment_opened_closed_msg (splitmux, TRUE);
815 GST_SPLITMUX_BROADCAST (splitmux);
819 bus_handler (GstBin * bin, GstMessage * message)
821 GstSplitMuxSink *splitmux = GST_SPLITMUX_SINK (bin);
823 switch (GST_MESSAGE_TYPE (message)) {
824 case GST_MESSAGE_EOS:
825 /* If the state is draining out the current file, drop this EOS */
826 GST_SPLITMUX_LOCK (splitmux);
828 send_fragment_opened_closed_msg (splitmux, FALSE);
830 if (splitmux->state == SPLITMUX_STATE_ENDING_FILE &&
831 splitmux->max_out_running_time != GST_CLOCK_STIME_NONE) {
832 GST_DEBUG_OBJECT (splitmux, "Caught EOS at end of fragment, dropping");
833 splitmux->state = SPLITMUX_STATE_START_NEXT_FRAGMENT;
834 GST_SPLITMUX_BROADCAST (splitmux);
836 gst_message_unref (message);
837 GST_SPLITMUX_UNLOCK (splitmux);
840 GST_SPLITMUX_UNLOCK (splitmux);
846 GST_BIN_CLASS (parent_class)->handle_message (bin, message);
849 /* Called with splitmux lock held */
850 /* Called when entering ProcessingCompleteGop state
851 * Assess if mq contents overflowed the current file
852 * -> If yes, need to switch to new file
853 * -> if no, set max_out_running_time to let this GOP in and
854 * go to COLLECTING_GOP_START state
857 handle_gathered_gop (GstSplitMuxSink * splitmux)
860 gsize queued_bytes = 0;
861 GstClockTimeDiff queued_time = 0;
863 /* Assess if the multiqueue contents overflowed the current file */
864 for (cur = g_list_first (splitmux->contexts);
865 cur != NULL; cur = g_list_next (cur)) {
866 MqStreamCtx *tmpctx = (MqStreamCtx *) (cur->data);
867 if (tmpctx->in_running_time > queued_time)
868 queued_time = tmpctx->in_running_time;
869 queued_bytes += tmpctx->in_bytes;
872 g_assert (queued_bytes >= splitmux->mux_start_bytes);
873 g_assert (queued_time >= splitmux->mux_start_time);
875 queued_bytes -= splitmux->mux_start_bytes;
876 queued_time -= splitmux->mux_start_time;
878 /* Expand queued bytes estimate by muxer overhead */
879 queued_bytes += (queued_bytes * splitmux->mux_overhead);
881 GST_LOG_OBJECT (splitmux, "mq at TS %" GST_STIME_FORMAT
882 " bytes %" G_GSIZE_FORMAT, GST_STIME_ARGS (queued_time), queued_bytes);
884 /* Check for overrun - have we output at least one byte and overrun
885 * either threshold? */
886 if ((splitmux->have_muxed_something &&
887 ((splitmux->threshold_bytes > 0 &&
888 queued_bytes >= splitmux->threshold_bytes) ||
889 (splitmux->threshold_time > 0 &&
890 queued_time >= splitmux->threshold_time)))) {
892 splitmux->state = SPLITMUX_STATE_ENDING_FILE;
894 GST_INFO_OBJECT (splitmux,
895 "mq overflowed since last, draining out. max out TS is %"
896 GST_STIME_FORMAT, GST_STIME_ARGS (splitmux->max_out_running_time));
897 GST_SPLITMUX_BROADCAST (splitmux);
901 GST_LOG_OBJECT (splitmux,
902 "This GOP didn't overflow the fragment. Bytes sent %" G_GSIZE_FORMAT
903 " queued %" G_GSIZE_FORMAT " time %" GST_STIME_FORMAT " Continuing.",
904 splitmux->muxed_out_bytes - splitmux->mux_start_bytes,
905 queued_bytes, GST_STIME_ARGS (queued_time));
907 /* Wake everyone up to push this one GOP, then sleep */
908 splitmux->have_muxed_something = TRUE;
910 if (!splitmux->reference_ctx->in_eos) {
911 splitmux->state = SPLITMUX_STATE_COLLECTING_GOP_START;
912 splitmux->max_out_running_time = splitmux->reference_ctx->in_running_time;
914 splitmux->state = SPLITMUX_STATE_WAITING_GOP_COMPLETE;
915 splitmux->max_out_running_time = GST_CLOCK_STIME_NONE;
918 GST_LOG_OBJECT (splitmux, "Waking output for complete GOP, TS %"
919 GST_STIME_FORMAT, GST_STIME_ARGS (splitmux->max_out_running_time));
920 GST_SPLITMUX_BROADCAST (splitmux);
925 /* Called with splitmux lock held */
926 /* Called from each input pad when it is has all the pieces
927 * for a GOP or EOS, starting with the reference pad which has set the
928 * splitmux->max_in_running_time
931 check_completed_gop (GstSplitMuxSink * splitmux, MqStreamCtx * ctx)
934 gboolean ready = TRUE;
935 GstClockTimeDiff current_max_in_running_time;
937 if (splitmux->state == SPLITMUX_STATE_WAITING_GOP_COMPLETE) {
938 /* Iterate each pad, and check that the input running time is at least
939 * up to the reference running time, and if so handle the collected GOP */
940 GST_LOG_OBJECT (splitmux, "Checking GOP collected, Max in running time %"
941 GST_STIME_FORMAT " ctx %p",
942 GST_STIME_ARGS (splitmux->max_in_running_time), ctx);
943 for (cur = g_list_first (splitmux->contexts); cur != NULL;
944 cur = g_list_next (cur)) {
945 MqStreamCtx *tmpctx = (MqStreamCtx *) (cur->data);
947 GST_LOG_OBJECT (splitmux,
948 "Context %p (src pad %" GST_PTR_FORMAT ") TS %" GST_STIME_FORMAT
949 " EOS %d", tmpctx, tmpctx->srcpad,
950 GST_STIME_ARGS (tmpctx->in_running_time), tmpctx->in_eos);
952 if (splitmux->max_in_running_time != G_MAXINT64 &&
953 tmpctx->in_running_time < splitmux->max_in_running_time &&
955 GST_LOG_OBJECT (splitmux,
956 "Context %p (src pad %" GST_PTR_FORMAT ") not ready. We'll sleep",
957 tmpctx, tmpctx->srcpad);
963 GST_DEBUG_OBJECT (splitmux,
964 "Collected GOP is complete. Processing (ctx %p)", ctx);
965 /* All pads have a complete GOP, release it into the multiqueue */
966 handle_gathered_gop (splitmux);
970 /* If upstream reached EOS we are not expecting more data, no need to wait
975 /* Some pad is not yet ready, or GOP is being pushed
976 * either way, sleep and wait to get woken */
977 current_max_in_running_time = splitmux->max_in_running_time;
978 while ((splitmux->state == SPLITMUX_STATE_WAITING_GOP_COMPLETE ||
979 splitmux->state == SPLITMUX_STATE_START_NEXT_FRAGMENT) &&
981 (current_max_in_running_time == splitmux->max_in_running_time)) {
983 GST_LOG_OBJECT (splitmux, "Sleeping for %s (ctx %p)",
984 splitmux->state == SPLITMUX_STATE_WAITING_GOP_COMPLETE ?
985 "GOP complete" : "EOF draining", ctx);
986 GST_SPLITMUX_WAIT (splitmux);
988 GST_LOG_OBJECT (splitmux, "Done waiting for complete GOP (ctx %p)", ctx);
993 check_queue_length (GstSplitMuxSink * splitmux, MqStreamCtx * ctx)
996 guint cur_len = g_queue_get_length (&ctx->queued_bufs);
998 GST_DEBUG_OBJECT (ctx->sinkpad,
999 "Checking queue length len %u cur_max %u queued gops %u",
1000 cur_len, splitmux->mq_max_buffers, splitmux->queued_gops);
1002 if (cur_len >= splitmux->mq_max_buffers) {
1003 gboolean allow_grow = FALSE;
1005 /* If collecting a GOP and this pad might block,
1006 * and there isn't already a pending GOP in the queue
1009 if (splitmux->state == SPLITMUX_STATE_WAITING_GOP_COMPLETE &&
1010 ctx->in_running_time < splitmux->max_in_running_time &&
1011 splitmux->queued_gops <= 1) {
1013 } else if (splitmux->state == SPLITMUX_STATE_COLLECTING_GOP_START &&
1014 ctx->is_reference && splitmux->queued_gops <= 1) {
1019 for (cur = g_list_first (splitmux->contexts);
1020 cur != NULL; cur = g_list_next (cur)) {
1021 MqStreamCtx *tmpctx = (MqStreamCtx *) (cur->data);
1022 GST_DEBUG_OBJECT (tmpctx->sinkpad,
1023 " len %u out_blocked %d",
1024 g_queue_get_length (&tmpctx->queued_bufs), tmpctx->out_blocked);
1025 /* If another stream is starving, grow */
1026 if (tmpctx != ctx && g_queue_get_length (&tmpctx->queued_bufs) < 1) {
1033 splitmux->mq_max_buffers = cur_len + 1;
1035 GST_INFO_OBJECT (splitmux,
1036 "Multiqueue overrun - enlarging to %u buffers ctx %p",
1037 splitmux->mq_max_buffers, ctx);
1039 g_object_set (splitmux->mq, "max-size-buffers",
1040 splitmux->mq_max_buffers, NULL);
1045 static GstPadProbeReturn
1046 handle_mq_input (GstPad * pad, GstPadProbeInfo * info, MqStreamCtx * ctx)
1048 GstSplitMuxSink *splitmux = ctx->splitmux;
1050 MqStreamBuf *buf_info = NULL;
1052 gboolean loop_again;
1053 gboolean keyframe = FALSE;
1055 GST_LOG_OBJECT (pad, "Fired probe type 0x%x", info->type);
1057 /* FIXME: Handle buffer lists, until then make it clear they won't work */
1058 if (info->type & GST_PAD_PROBE_TYPE_BUFFER_LIST) {
1059 g_warning ("Buffer list handling not implemented");
1060 return GST_PAD_PROBE_DROP;
1062 if (info->type & GST_PAD_PROBE_TYPE_EVENT_DOWNSTREAM) {
1063 GstEvent *event = gst_pad_probe_info_get_event (info);
1064 switch (GST_EVENT_TYPE (event)) {
1065 case GST_EVENT_SEGMENT:
1066 gst_event_copy_segment (event, &ctx->in_segment);
1068 case GST_EVENT_FLUSH_STOP:
1069 GST_SPLITMUX_LOCK (splitmux);
1070 gst_segment_init (&ctx->in_segment, GST_FORMAT_UNDEFINED);
1071 ctx->in_eos = FALSE;
1073 ctx->in_running_time = GST_CLOCK_STIME_NONE;
1074 GST_SPLITMUX_UNLOCK (splitmux);
1077 GST_SPLITMUX_LOCK (splitmux);
1080 if (splitmux->state == SPLITMUX_STATE_STOPPED)
1083 if (ctx->is_reference) {
1084 GST_INFO_OBJECT (splitmux, "Got Reference EOS. Finishing up");
1085 /* Act as if this is a new keyframe with infinite timestamp */
1086 splitmux->max_in_running_time = G_MAXINT64;
1087 splitmux->state = SPLITMUX_STATE_WAITING_GOP_COMPLETE;
1088 /* Wake up other input pads to collect this GOP */
1089 GST_SPLITMUX_BROADCAST (splitmux);
1090 check_completed_gop (splitmux, ctx);
1091 } else if (splitmux->state == SPLITMUX_STATE_WAITING_GOP_COMPLETE) {
1092 /* If we are waiting for a GOP to be completed (ie, for aux
1093 * pads to catch up), then this pad is complete, so check
1094 * if the whole GOP is.
1096 check_completed_gop (splitmux, ctx);
1098 GST_SPLITMUX_UNLOCK (splitmux);
1103 return GST_PAD_PROBE_PASS;
1106 buf = gst_pad_probe_info_get_buffer (info);
1107 buf_info = mq_stream_buf_new ();
1109 if (GST_BUFFER_PTS_IS_VALID (buf))
1110 ts = GST_BUFFER_PTS (buf);
1112 ts = GST_BUFFER_DTS (buf);
1114 GST_LOG_OBJECT (pad, "Buffer TS is %" GST_TIME_FORMAT, GST_TIME_ARGS (ts));
1116 GST_SPLITMUX_LOCK (splitmux);
1118 if (splitmux->state == SPLITMUX_STATE_STOPPED)
1121 /* If this buffer has a timestamp, advance the input timestamp of the
1123 if (GST_CLOCK_TIME_IS_VALID (ts)) {
1124 GstClockTimeDiff running_time =
1125 my_segment_to_running_time (&ctx->in_segment,
1126 GST_BUFFER_TIMESTAMP (buf));
1128 GST_LOG_OBJECT (pad, "Buffer running TS is %" GST_STIME_FORMAT,
1129 GST_STIME_ARGS (running_time));
1131 if (GST_CLOCK_STIME_IS_VALID (running_time)
1132 && running_time > ctx->in_running_time)
1133 ctx->in_running_time = running_time;
1136 /* Try to make sure we have a valid running time */
1137 if (!GST_CLOCK_STIME_IS_VALID (ctx->in_running_time)) {
1138 ctx->in_running_time =
1139 my_segment_to_running_time (&ctx->in_segment, ctx->in_segment.start);
1142 GST_LOG_OBJECT (pad, "in running time now %" GST_STIME_FORMAT,
1143 GST_STIME_ARGS (ctx->in_running_time));
1145 buf_info->run_ts = ctx->in_running_time;
1146 buf_info->buf_size = gst_buffer_get_size (buf);
1148 /* Update total input byte counter for overflow detect */
1149 ctx->in_bytes += buf_info->buf_size;
1151 /* initialize mux_start_time */
1152 if (ctx->is_reference && splitmux->mux_start_time == GST_CLOCK_STIME_NONE) {
1153 splitmux->mux_start_time = buf_info->run_ts;
1154 GST_LOG_OBJECT (splitmux, "Mux start time now %" GST_STIME_FORMAT,
1155 GST_STIME_ARGS (splitmux->mux_start_time));
1156 /* Also take this as the first start time when starting up,
1157 * so that we start counting overflow from the first frame */
1158 if (!GST_CLOCK_STIME_IS_VALID (splitmux->max_in_running_time))
1159 splitmux->max_in_running_time = splitmux->mux_start_time;
1162 GST_DEBUG_OBJECT (pad, "Buf TS %" GST_STIME_FORMAT
1163 " total in_bytes %" G_GSIZE_FORMAT,
1164 GST_STIME_ARGS (buf_info->run_ts), ctx->in_bytes);
1171 switch (splitmux->state) {
1172 case SPLITMUX_STATE_COLLECTING_GOP_START:
1173 if (ctx->is_reference) {
1174 /* If a keyframe, we have a complete GOP */
1175 if (GST_BUFFER_FLAG_IS_SET (buf, GST_BUFFER_FLAG_DELTA_UNIT) ||
1176 !GST_CLOCK_STIME_IS_VALID (ctx->in_running_time) ||
1177 splitmux->max_in_running_time >= ctx->in_running_time) {
1178 /* Pass this buffer through */
1182 GST_INFO_OBJECT (pad,
1183 "Have keyframe with running time %" GST_STIME_FORMAT,
1184 GST_STIME_ARGS (ctx->in_running_time));
1186 splitmux->state = SPLITMUX_STATE_WAITING_GOP_COMPLETE;
1187 splitmux->max_in_running_time = ctx->in_running_time;
1188 /* Wake up other input pads to collect this GOP */
1189 GST_SPLITMUX_BROADCAST (splitmux);
1190 check_completed_gop (splitmux, ctx);
1192 /* We're still waiting for a keyframe on the reference pad, sleep */
1193 GST_LOG_OBJECT (pad, "Sleeping for GOP start");
1194 GST_SPLITMUX_WAIT (splitmux);
1195 GST_LOG_OBJECT (pad, "Done sleeping for GOP start state now %d",
1199 case SPLITMUX_STATE_WAITING_GOP_COMPLETE:
1201 /* If we overran the target timestamp, it might be time to process
1202 * the GOP, otherwise bail out for more data
1204 GST_LOG_OBJECT (pad,
1205 "Checking TS %" GST_STIME_FORMAT " against max %" GST_STIME_FORMAT,
1206 GST_STIME_ARGS (ctx->in_running_time),
1207 GST_STIME_ARGS (splitmux->max_in_running_time));
1209 if (ctx->in_running_time < splitmux->max_in_running_time) {
1214 GST_LOG_OBJECT (pad,
1215 "Collected last packet of GOP. Checking other pads");
1216 check_completed_gop (splitmux, ctx);
1218 case SPLITMUX_STATE_ENDING_FILE:{
1221 /* If somes streams received no buffer during the last GOP that overran,
1222 * because its next buffer has a timestamp bigger than
1223 * ctx->max_in_running_time, its queue is empty. In that case the only
1224 * way to wakeup the output thread is by injecting an event in the
1225 * queue. This usually happen with subtitle streams.
1226 * See https://bugzilla.gnome.org/show_bug.cgi?id=763711. */
1227 GST_LOG_OBJECT (pad, "Sending splitmuxsink-unblock event");
1228 event = gst_event_new_custom (GST_EVENT_CUSTOM_DOWNSTREAM |
1229 GST_EVENT_TYPE_SERIALIZED,
1230 gst_structure_new ("splitmuxsink-unblock", "timestamp",
1231 G_TYPE_INT64, splitmux->max_in_running_time, NULL));
1233 GST_SPLITMUX_UNLOCK (splitmux);
1234 gst_pad_send_event (ctx->sinkpad, event);
1235 GST_SPLITMUX_LOCK (splitmux);
1238 case SPLITMUX_STATE_START_NEXT_FRAGMENT:
1239 /* A fragment is ending, wait until that's done before continuing */
1240 GST_DEBUG_OBJECT (pad, "Sleeping for fragment restart");
1241 GST_SPLITMUX_WAIT (splitmux);
1242 GST_DEBUG_OBJECT (pad,
1243 "Done sleeping for fragment restart state now %d", splitmux->state);
1249 } while (loop_again);
1252 splitmux->queued_gops++;
1253 buf_info->keyframe = TRUE;
1256 /* Now add this buffer to the queue just before returning */
1257 g_queue_push_head (&ctx->queued_bufs, buf_info);
1259 /* Check the buffer will fit in the mq */
1260 check_queue_length (splitmux, ctx);
1262 GST_LOG_OBJECT (pad, "Returning to queue buffer %" GST_PTR_FORMAT
1263 " run ts %" GST_STIME_FORMAT, buf, GST_STIME_ARGS (ctx->in_running_time));
1265 GST_SPLITMUX_UNLOCK (splitmux);
1266 return GST_PAD_PROBE_PASS;
1269 GST_SPLITMUX_UNLOCK (splitmux);
1271 mq_stream_buf_free (buf_info);
1272 return GST_PAD_PROBE_PASS;
1276 gst_splitmux_sink_request_new_pad (GstElement * element,
1277 GstPadTemplate * templ, const gchar * name, const GstCaps * caps)
1279 GstSplitMuxSink *splitmux = (GstSplitMuxSink *) element;
1280 GstPadTemplate *mux_template = NULL;
1282 GstPad *mq_sink, *mq_src;
1284 gboolean is_video = FALSE;
1287 GST_DEBUG_OBJECT (element, "templ:%s, name:%s", templ->name_template, name);
1289 GST_SPLITMUX_LOCK (splitmux);
1290 if (!create_elements (splitmux))
1293 if (templ->name_template) {
1294 if (g_str_equal (templ->name_template, "video")) {
1295 /* FIXME: Look for a pad template with matching caps, rather than by name */
1297 gst_element_class_get_pad_template (GST_ELEMENT_GET_CLASS
1298 (splitmux->muxer), "video_%u");
1303 gst_element_class_get_pad_template (GST_ELEMENT_GET_CLASS
1304 (splitmux->muxer), templ->name_template);
1306 if (mux_template == NULL) {
1307 /* Fallback to find sink pad templates named 'sink_%d' (mpegtsmux) */
1309 gst_element_class_get_pad_template (GST_ELEMENT_GET_CLASS
1310 (splitmux->muxer), "sink_%d");
1314 res = gst_element_request_pad (splitmux->muxer, mux_template, name, caps);
1319 gname = g_strdup ("video");
1320 else if (name == NULL)
1321 gname = gst_pad_get_name (res);
1323 gname = g_strdup (name);
1325 if (!get_pads_from_mq (splitmux, &mq_sink, &mq_src)) {
1326 gst_element_release_request_pad (splitmux->muxer, res);
1327 gst_object_unref (GST_OBJECT (res));
1331 if (gst_pad_link (mq_src, res) != GST_PAD_LINK_OK) {
1332 gst_element_release_request_pad (splitmux->muxer, res);
1333 gst_object_unref (GST_OBJECT (res));
1334 gst_element_release_request_pad (splitmux->mq, mq_sink);
1335 gst_object_unref (GST_OBJECT (mq_sink));
1339 gst_object_unref (GST_OBJECT (res));
1341 ctx = mq_stream_ctx_new (splitmux);
1342 ctx->srcpad = mq_src;
1343 ctx->sinkpad = mq_sink;
1345 mq_stream_ctx_ref (ctx);
1346 ctx->src_pad_block_id =
1347 gst_pad_add_probe (mq_src, GST_PAD_PROBE_TYPE_DATA_DOWNSTREAM,
1348 (GstPadProbeCallback) handle_mq_output, ctx, (GDestroyNotify)
1349 _pad_block_destroy_src_notify);
1350 if (is_video && splitmux->reference_ctx != NULL) {
1351 splitmux->reference_ctx->is_reference = FALSE;
1352 splitmux->reference_ctx = NULL;
1354 if (splitmux->reference_ctx == NULL) {
1355 splitmux->reference_ctx = ctx;
1356 ctx->is_reference = TRUE;
1359 res = gst_ghost_pad_new (gname, mq_sink);
1360 g_object_set_qdata ((GObject *) (res), PAD_CONTEXT, ctx);
1362 mq_stream_ctx_ref (ctx);
1363 ctx->sink_pad_block_id =
1364 gst_pad_add_probe (mq_sink, GST_PAD_PROBE_TYPE_DATA_DOWNSTREAM,
1365 (GstPadProbeCallback) handle_mq_input, ctx, (GDestroyNotify)
1366 _pad_block_destroy_sink_notify);
1368 GST_DEBUG_OBJECT (splitmux, "Request pad %" GST_PTR_FORMAT
1369 " is mq pad %" GST_PTR_FORMAT, res, mq_sink);
1371 splitmux->contexts = g_list_prepend (splitmux->contexts, ctx);
1375 gst_object_unref (mq_sink);
1376 gst_object_unref (mq_src);
1378 gst_pad_set_active (res, TRUE);
1379 gst_element_add_pad (element, res);
1380 GST_SPLITMUX_UNLOCK (splitmux);
1384 GST_SPLITMUX_UNLOCK (splitmux);
1389 gst_splitmux_sink_release_pad (GstElement * element, GstPad * pad)
1391 GstSplitMuxSink *splitmux = (GstSplitMuxSink *) element;
1392 GstPad *mqsink, *mqsrc, *muxpad;
1394 (MqStreamCtx *) (g_object_get_qdata ((GObject *) (pad), PAD_CONTEXT));
1396 GST_SPLITMUX_LOCK (splitmux);
1398 if (splitmux->muxer == NULL || splitmux->mq == NULL)
1399 goto fail; /* Elements don't exist yet - nothing to release */
1401 GST_INFO_OBJECT (pad, "releasing request pad");
1403 mqsink = gst_ghost_pad_get_target (GST_GHOST_PAD (pad));
1404 mqsrc = mq_sink_to_src (splitmux->mq, mqsink);
1405 muxpad = gst_pad_get_peer (mqsrc);
1407 /* Remove the context from our consideration */
1408 splitmux->contexts = g_list_remove (splitmux->contexts, ctx);
1410 if (ctx->sink_pad_block_id)
1411 gst_pad_remove_probe (ctx->sinkpad, ctx->sink_pad_block_id);
1413 if (ctx->src_pad_block_id)
1414 gst_pad_remove_probe (ctx->srcpad, ctx->src_pad_block_id);
1416 /* Can release the context now */
1417 mq_stream_ctx_unref (ctx);
1419 /* Release and free the mq input */
1420 gst_element_release_request_pad (splitmux->mq, mqsink);
1422 /* Release and free the muxer input */
1423 gst_element_release_request_pad (splitmux->muxer, muxpad);
1425 gst_object_unref (mqsink);
1426 gst_object_unref (mqsrc);
1427 gst_object_unref (muxpad);
1429 gst_element_remove_pad (element, pad);
1431 /* Reset the internal elements only after all request pads are released */
1432 if (splitmux->contexts == NULL)
1433 gst_splitmux_reset (splitmux);
1436 GST_SPLITMUX_UNLOCK (splitmux);
1440 create_element (GstSplitMuxSink * splitmux,
1441 const gchar * factory, const gchar * name)
1443 GstElement *ret = gst_element_factory_make (factory, name);
1445 g_warning ("Failed to create %s - splitmuxsink will not work", name);
1449 if (!gst_bin_add (GST_BIN (splitmux), ret)) {
1450 g_warning ("Could not add %s element - splitmuxsink will not work", name);
1451 gst_object_unref (ret);
1459 create_elements (GstSplitMuxSink * splitmux)
1461 /* Create internal elements */
1462 if (splitmux->mq == NULL) {
1464 create_element (splitmux, "multiqueue", "multiqueue")) == NULL)
1467 splitmux->mq_max_buffers = 5;
1468 /* No bytes or time limit, we limit buffers manually */
1469 g_object_set (splitmux->mq, "max-size-bytes", 0, "max-size-time",
1470 (guint64) 0, "max-size-buffers", splitmux->mq_max_buffers, NULL);
1473 if (splitmux->muxer == NULL) {
1474 GstElement *provided_muxer = NULL;
1476 GST_OBJECT_LOCK (splitmux);
1477 if (splitmux->provided_muxer != NULL)
1478 provided_muxer = gst_object_ref (splitmux->provided_muxer);
1479 GST_OBJECT_UNLOCK (splitmux);
1481 if (provided_muxer == NULL) {
1482 if ((splitmux->muxer =
1483 create_element (splitmux, "mp4mux", "muxer")) == NULL)
1486 if (!gst_bin_add (GST_BIN (splitmux), provided_muxer)) {
1487 g_warning ("Could not add muxer element - splitmuxsink will not work");
1488 gst_object_unref (provided_muxer);
1492 splitmux->muxer = provided_muxer;
1493 gst_object_unref (provided_muxer);
1503 find_sink (GstElement * e)
1505 GstElement *res = NULL;
1507 gboolean done = FALSE;
1508 GValue data = { 0, };
1510 if (!GST_IS_BIN (e))
1513 iter = gst_bin_iterate_sinks (GST_BIN (e));
1515 switch (gst_iterator_next (iter, &data)) {
1516 case GST_ITERATOR_OK:
1518 GstElement *child = g_value_get_object (&data);
1519 if (g_object_class_find_property (G_OBJECT_GET_CLASS (child),
1520 "location") != NULL) {
1524 g_value_reset (&data);
1527 case GST_ITERATOR_RESYNC:
1528 gst_iterator_resync (iter);
1530 case GST_ITERATOR_DONE:
1533 case GST_ITERATOR_ERROR:
1534 g_assert_not_reached ();
1538 g_value_unset (&data);
1539 gst_iterator_free (iter);
1545 create_sink (GstSplitMuxSink * splitmux)
1547 GstElement *provided_sink = NULL;
1549 if (splitmux->active_sink == NULL) {
1551 GST_OBJECT_LOCK (splitmux);
1552 if (splitmux->provided_sink != NULL)
1553 provided_sink = gst_object_ref (splitmux->provided_sink);
1554 GST_OBJECT_UNLOCK (splitmux);
1556 if (provided_sink == NULL) {
1557 if ((splitmux->sink =
1558 create_element (splitmux, DEFAULT_SINK, "sink")) == NULL)
1560 splitmux->active_sink = splitmux->sink;
1562 if (!gst_bin_add (GST_BIN (splitmux), provided_sink)) {
1563 g_warning ("Could not add sink elements - splitmuxsink will not work");
1564 gst_object_unref (provided_sink);
1568 splitmux->active_sink = provided_sink;
1570 /* The bin holds a ref now, we can drop our tmp ref */
1571 gst_object_unref (provided_sink);
1573 /* Find the sink element */
1574 splitmux->sink = find_sink (splitmux->active_sink);
1575 if (splitmux->sink == NULL) {
1577 ("Could not locate sink element in provided sink - splitmuxsink will not work");
1582 if (!gst_element_link (splitmux->muxer, splitmux->active_sink)) {
1583 g_warning ("Failed to link muxer and sink- splitmuxsink will not work");
1594 #pragma GCC diagnostic ignored "-Wformat-nonliteral"
1597 set_next_filename (GstSplitMuxSink * splitmux)
1599 gchar *fname = NULL;
1600 gst_splitmux_sink_ensure_max_files (splitmux);
1602 g_signal_emit (splitmux, signals[SIGNAL_FORMAT_LOCATION], 0,
1603 splitmux->fragment_id, &fname);
1606 fname = splitmux->location ?
1607 g_strdup_printf (splitmux->location, splitmux->fragment_id) : NULL;
1610 GST_INFO_OBJECT (splitmux, "Setting file to %s", fname);
1611 g_object_set (splitmux->sink, "location", fname, NULL);
1614 splitmux->fragment_id++;
1618 static GstStateChangeReturn
1619 gst_splitmux_sink_change_state (GstElement * element, GstStateChange transition)
1621 GstStateChangeReturn ret;
1622 GstSplitMuxSink *splitmux = (GstSplitMuxSink *) element;
1624 switch (transition) {
1625 case GST_STATE_CHANGE_NULL_TO_READY:{
1626 GST_SPLITMUX_LOCK (splitmux);
1627 if (!create_elements (splitmux) || !create_sink (splitmux)) {
1628 ret = GST_STATE_CHANGE_FAILURE;
1629 GST_SPLITMUX_UNLOCK (splitmux);
1632 GST_SPLITMUX_UNLOCK (splitmux);
1633 splitmux->fragment_id = 0;
1634 set_next_filename (splitmux);
1637 case GST_STATE_CHANGE_READY_TO_PAUSED:{
1638 GST_SPLITMUX_LOCK (splitmux);
1639 /* Start by collecting one input on each pad */
1640 splitmux->state = SPLITMUX_STATE_COLLECTING_GOP_START;
1641 splitmux->max_in_running_time = GST_CLOCK_STIME_NONE;
1642 splitmux->muxed_out_time = splitmux->mux_start_time =
1643 GST_CLOCK_STIME_NONE;
1644 splitmux->muxed_out_bytes = splitmux->mux_start_bytes = 0;
1645 splitmux->opening_first_fragment = TRUE;
1646 GST_SPLITMUX_UNLOCK (splitmux);
1649 case GST_STATE_CHANGE_PAUSED_TO_READY:
1650 case GST_STATE_CHANGE_READY_TO_NULL:
1651 GST_SPLITMUX_LOCK (splitmux);
1652 splitmux->state = SPLITMUX_STATE_STOPPED;
1653 /* Wake up any blocked threads */
1654 GST_LOG_OBJECT (splitmux,
1655 "State change -> NULL or READY. Waking threads");
1656 GST_SPLITMUX_BROADCAST (splitmux);
1657 GST_SPLITMUX_UNLOCK (splitmux);
1663 ret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
1664 if (ret == GST_STATE_CHANGE_FAILURE)
1667 switch (transition) {
1668 case GST_STATE_CHANGE_READY_TO_NULL:
1669 GST_SPLITMUX_LOCK (splitmux);
1670 splitmux->fragment_id = 0;
1671 /* Reset internal elements only if no pad contexts are using them */
1672 if (splitmux->contexts == NULL)
1673 gst_splitmux_reset (splitmux);
1674 GST_SPLITMUX_UNLOCK (splitmux);
1682 if (transition == GST_STATE_CHANGE_NULL_TO_READY &&
1683 ret == GST_STATE_CHANGE_FAILURE) {
1684 /* Cleanup elements on failed transition out of NULL */
1685 gst_splitmux_reset (splitmux);
1691 register_splitmuxsink (GstPlugin * plugin)
1693 GST_DEBUG_CATEGORY_INIT (splitmux_debug, "splitmuxsink", 0,
1694 "Split File Muxing Sink");
1696 return gst_element_register (plugin, "splitmuxsink", GST_RANK_NONE,
1697 GST_TYPE_SPLITMUX_SINK);
1701 gst_splitmux_sink_ensure_max_files (GstSplitMuxSink * splitmux)
1703 if (splitmux->max_files && splitmux->fragment_id >= splitmux->max_files) {
1704 splitmux->fragment_id = 0;