1 /* GStreamer Muxer bin that splits output stream by size/time
2 * Copyright (C) <2014> Jan Schmidt <jan@centricular.com>
4 * This library is free software; you can redistribute it and/or
5 * modify it under the terms of the GNU Library General Public
6 * License as published by the Free Software Foundation; either
7 * version 2 of the License, or (at your option) any later version.
9 * This library is distributed in the hope that it will be useful,
10 * but WITHOUT ANY WARRANTY; without even the implied warranty of
11 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
12 * Library General Public License for more details.
14 * You should have received a copy of the GNU Library General Public
15 * License along with this library; if not, write to the
16 * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
17 * Boston, MA 02110-1301, USA.
21 * SECTION:element-splitmuxsink
22 * @short_description: Muxer wrapper for splitting output stream by size or time
24 * This element wraps a muxer and a sink, and starts a new file when the mux
25 * contents are about to cross a threshold of maximum size or maximum time,
26 * splitting at video keyframe boundaries. Exactly one input video stream
27 * can be muxed, with as many accompanying audio and subtitle streams as desired.
30 * By default, it uses mp4mux and filesink, but they can be changed via
31 * the 'muxer' and 'sink' properties.
33 * Each output file contains at least one GOP, however, so the limits may be
34 * overrun if the distance between any two keyframes is larger than the limits.
36 * If a video stream is available, the splitting process is driven by the video
37 * stream contents, and the video stream must contain closed GOPs for the output
38 * file parts to be played individually correctly. In the absence of a video
39 * stream, the first available stream is used as reference for synchronization.
42 * <title>Example pipelines</title>
44 * gst-launch-1.0 -e v4l2src num-buffers=500 ! video/x-raw,width=320,height=240 ! videoconvert ! queue ! timeoverlay ! x264enc key-int-max=10 ! h264parse ! splitmuxsink location=video%02d.mov max-size-time=10000000000 max-size-bytes=1000000
46 * Records a video stream captured from a v4l2 device and muxes it into
47 * ISO mp4 files, splitting as needed to limit size/duration to 10 seconds
48 * and 1MB maximum size.
57 #include <glib/gstdio.h>
58 #include "gstsplitmuxsink.h"
60 GST_DEBUG_CATEGORY_STATIC (splitmux_debug);
61 #define GST_CAT_DEFAULT splitmux_debug
// Convenience wrappers around the lock mutex and the data_cond condition
// variable stored in the splitmux instance passed as s.
63 #define GST_SPLITMUX_LOCK(s) g_mutex_lock(&(s)->lock)
64 #define GST_SPLITMUX_UNLOCK(s) g_mutex_unlock(&(s)->lock)
// Waits on data_cond with lock as the associated mutex, so the caller is
// expected to hold the lock when using it.
65 #define GST_SPLITMUX_WAIT(s) g_cond_wait (&(s)->data_cond, &(s)->lock)
// Wakes every thread currently waiting on data_cond.
66 #define GST_SPLITMUX_BROADCAST(s) g_cond_broadcast (&(s)->data_cond)
// Default property values, used by instance init and by the property specs
// installed in class_init. The property descriptions there state that 0
// disables the time and byte limits.
80 #define DEFAULT_MAX_SIZE_TIME 0
81 #define DEFAULT_MAX_SIZE_BYTES 0
82 #define DEFAULT_MAX_FILES 0
// Extra size fraction attributed to muxing (0.02 = 2%).
83 #define DEFAULT_MUXER_OVERHEAD 0.02
// Element names matching the defaults mentioned in the muxer and sink
// property descriptions.
84 #define DEFAULT_MUXER "mp4mux"
85 #define DEFAULT_SINK "filesink"
89 SIGNAL_FORMAT_LOCATION,
93 static guint signals[SIGNAL_LAST];
95 static GstStaticPadTemplate video_sink_template =
96 GST_STATIC_PAD_TEMPLATE ("video",
100 static GstStaticPadTemplate audio_sink_template =
101 GST_STATIC_PAD_TEMPLATE ("audio_%u",
104 GST_STATIC_CAPS_ANY);
105 static GstStaticPadTemplate subtitle_sink_template =
106 GST_STATIC_PAD_TEMPLATE ("subtitle_%u",
109 GST_STATIC_CAPS_ANY);
111 static GQuark PAD_CONTEXT;
116 PAD_CONTEXT = g_quark_from_static_string ("pad-context");
119 #define gst_splitmux_sink_parent_class parent_class
120 G_DEFINE_TYPE_EXTENDED (GstSplitMuxSink, gst_splitmux_sink, GST_TYPE_BIN, 0,
123 static gboolean create_elements (GstSplitMuxSink * splitmux);
124 static gboolean create_sink (GstSplitMuxSink * splitmux);
125 static void gst_splitmux_sink_set_property (GObject * object, guint prop_id,
126 const GValue * value, GParamSpec * pspec);
127 static void gst_splitmux_sink_get_property (GObject * object, guint prop_id,
128 GValue * value, GParamSpec * pspec);
129 static void gst_splitmux_sink_dispose (GObject * object);
130 static void gst_splitmux_sink_finalize (GObject * object);
132 static GstPad *gst_splitmux_sink_request_new_pad (GstElement * element,
133 GstPadTemplate * templ, const gchar * name, const GstCaps * caps);
134 static void gst_splitmux_sink_release_pad (GstElement * element, GstPad * pad);
136 static GstStateChangeReturn gst_splitmux_sink_change_state (GstElement *
137 element, GstStateChange transition);
139 static void bus_handler (GstBin * bin, GstMessage * msg);
140 static void set_next_filename (GstSplitMuxSink * splitmux);
141 static void start_next_fragment (GstSplitMuxSink * splitmux);
142 static void check_queue_length (GstSplitMuxSink * splitmux, MqStreamCtx * ctx);
143 static void mq_stream_ctx_unref (MqStreamCtx * ctx);
145 static void gst_splitmux_sink_ensure_max_files (GstSplitMuxSink * splitmux);
// Allocate a zero-filled MqStreamBuf record from the GLib slice allocator.
// The matching release function is mq_stream_buf_free().
148 mq_stream_buf_new (void)
150 return g_slice_new0 (MqStreamBuf);
// Return a MqStreamBuf record to the GLib slice allocator.
// data: record to release.
154 mq_stream_buf_free (MqStreamBuf * data)
156 g_slice_free (MqStreamBuf, data);
// Class initialiser: wires up the GObject, GstElement and GstBin virtual
// methods, registers element metadata and pad templates, installs the
// properties and creates the format-location signal.
// klass: the class structure being initialised.
160 gst_splitmux_sink_class_init (GstSplitMuxSinkClass * klass)
162 GObjectClass *gobject_class = (GObjectClass *) klass;
163 GstElementClass *gstelement_class = (GstElementClass *) klass;
164 GstBinClass *gstbin_class = (GstBinClass *) klass;
// GObject property accessors and teardown hooks.
166 gobject_class->set_property = gst_splitmux_sink_set_property;
167 gobject_class->get_property = gst_splitmux_sink_get_property;
168 gobject_class->dispose = gst_splitmux_sink_dispose;
169 gobject_class->finalize = gst_splitmux_sink_finalize;
171 gst_element_class_set_static_metadata (gstelement_class,
172 "Split Muxing Bin", "Generic/Bin/Muxer",
173 "Convenience bin that muxes incoming streams into multiple time/size limited files",
174 "Jan Schmidt <jan@centricular.com>");
// One template each for the video, audio_%u and subtitle_%u sink pads.
176 gst_element_class_add_static_pad_template (gstelement_class,
177 &video_sink_template);
178 gst_element_class_add_static_pad_template (gstelement_class,
179 &audio_sink_template);
180 gst_element_class_add_static_pad_template (gstelement_class,
181 &subtitle_sink_template);
// Element-level overrides: state changes and request pad management.
183 gstelement_class->change_state =
184 GST_DEBUG_FUNCPTR (gst_splitmux_sink_change_state);
185 gstelement_class->request_new_pad =
186 GST_DEBUG_FUNCPTR (gst_splitmux_sink_request_new_pad);
187 gstelement_class->release_pad =
188 GST_DEBUG_FUNCPTR (gst_splitmux_sink_release_pad);
// Messages from children are routed through bus_handler.
190 gstbin_class->handle_message = bus_handler;
// location: string pattern for output file names, default NULL.
192 g_object_class_install_property (gobject_class, PROP_LOCATION,
193 g_param_spec_string ("location", "File Output Pattern",
194 "Format string pattern for the location of the files to write (e.g. video%05d.mp4)",
195 NULL, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
// mux-overhead: double in the range 0.0 to 1.0, also flagged controllable.
196 g_object_class_install_property (gobject_class, PROP_MUXER_OVERHEAD,
197 g_param_spec_double ("mux-overhead", "Muxing Overhead",
198 "Extra size overhead of muxing (0.02 = 2%)", 0.0, 1.0,
199 DEFAULT_MUXER_OVERHEAD,
200 G_PARAM_READWRITE | GST_PARAM_CONTROLLABLE | G_PARAM_STATIC_STRINGS));
// max-size-time and max-size-bytes: 64 bit unsigned limits, 0 disables.
202 g_object_class_install_property (gobject_class, PROP_MAX_SIZE_TIME,
203 g_param_spec_uint64 ("max-size-time", "Max. size (ns)",
204 "Max. amount of time per file (in ns, 0=disable)", 0, G_MAXUINT64,
205 DEFAULT_MAX_SIZE_TIME, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
206 g_object_class_install_property (gobject_class, PROP_MAX_SIZE_BYTES,
207 g_param_spec_uint64 ("max-size-bytes", "Max. size bytes",
208 "Max. amount of data per file (in bytes, 0=disable)", 0, G_MAXUINT64,
209 DEFAULT_MAX_SIZE_BYTES, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
// max-files: unsigned count of files to keep on disk.
// NOTE(review): the two adjacent description literals below concatenate
// without a space after the comma (reached,old files) - confirm and fix
// the string if that is unintended.
210 g_object_class_install_property (gobject_class, PROP_MAX_FILES,
211 g_param_spec_uint ("max-files", "Max files",
212 "Maximum number of files to keep on disk. Once the maximum is reached,"
213 "old files start to be deleted to make room for new ones.",
214 0, G_MAXUINT, DEFAULT_MAX_FILES,
215 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
// muxer and sink: optional caller supplied elements; the descriptions say
// NULL selects the default element.
218 g_object_class_install_property (gobject_class, PROP_MUXER,
219 g_param_spec_object ("muxer", "Muxer",
220 "The muxer element to use (NULL = default mp4mux)",
221 GST_TYPE_ELEMENT, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
222 g_object_class_install_property (gobject_class, PROP_SINK,
223 g_param_spec_object ("sink", "Sink",
224 "The sink element (or element chain) to use (NULL = default filesink)",
225 GST_TYPE_ELEMENT, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
228 * GstSplitMuxSink::format-location:
229 * @splitmux: the #GstSplitMuxSink
230 * @fragment_id: the sequence number of the file to be created
232 * Returns: the location to be used for the next output file
// The signal takes one G_TYPE_UINT argument and returns a G_TYPE_STRING;
// no class handler offset, accumulator or marshaller is supplied.
234 signals[SIGNAL_FORMAT_LOCATION] =
235 g_signal_new ("format-location", G_TYPE_FROM_CLASS (klass),
236 G_SIGNAL_RUN_LAST, 0, NULL, NULL, NULL, G_TYPE_STRING, 1, G_TYPE_UINT);
// Instance initialiser: sets up the lock and condition variable, applies the
// default thresholds and marks the element as a sink.
// splitmux: the instance being initialised.
240 gst_splitmux_sink_init (GstSplitMuxSink * splitmux)
242 g_mutex_init (&splitmux->lock);
243 g_cond_init (&splitmux->data_cond);
// Same defaults as advertised by the property specs in class_init.
245 splitmux->mux_overhead = DEFAULT_MUXER_OVERHEAD;
246 splitmux->threshold_time = DEFAULT_MAX_SIZE_TIME;
247 splitmux->threshold_bytes = DEFAULT_MAX_SIZE_BYTES;
248 splitmux->max_files = DEFAULT_MAX_FILES;
250 GST_OBJECT_FLAG_SET (splitmux, GST_ELEMENT_FLAG_SINK);
// Removes the internal child elements from the bin and reassigns the cached
// child pointers.
// NOTE(review): the guards around the mq and muxer removals and the value
// assigned on the last statement are on lines not visible in this excerpt;
// only the active_sink guard is visible.
254 gst_splitmux_reset (GstSplitMuxSink * splitmux)
257 gst_bin_remove (GST_BIN (splitmux), splitmux->mq);
259 gst_bin_remove (GST_BIN (splitmux), splitmux->muxer);
260 if (splitmux->active_sink)
261 gst_bin_remove (GST_BIN (splitmux), splitmux->active_sink);
// All four child pointers receive the same value in one chained assignment.
263 splitmux->sink = splitmux->active_sink = splitmux->muxer = splitmux->mq =
// GObject dispose handler: chains up to the parent class first, then
// reassigns the cached child pointers, which the existing comment below
// describes as invalidated by the parent dispose.
// object: the instance being disposed.
268 gst_splitmux_sink_dispose (GObject * object)
270 GstSplitMuxSink *splitmux = GST_SPLITMUX_SINK (object);
272 G_OBJECT_CLASS (parent_class)->dispose (object);
274 /* Calling parent dispose invalidates all child pointers */
275 splitmux->sink = splitmux->active_sink = splitmux->muxer = splitmux->mq =
// GObject finalize handler: releases everything owned by the instance and
// then chains up to the parent class.
// object: the instance being finalized.
280 gst_splitmux_sink_finalize (GObject * object)
282 GstSplitMuxSink *splitmux = GST_SPLITMUX_SINK (object);
283 g_cond_clear (&splitmux->data_cond);
284 g_mutex_clear (&splitmux->lock);
// Drop the references taken by the sink and muxer property setters.
285 if (splitmux->provided_sink)
286 gst_object_unref (splitmux->provided_sink);
287 if (splitmux->provided_muxer)
288 gst_object_unref (splitmux->provided_muxer);
290 g_free (splitmux->location);
292 /* Make sure to free any un-released contexts */
// Each list entry loses one reference, then the list nodes are freed.
293 g_list_foreach (splitmux->contexts, (GFunc) mq_stream_ctx_unref, NULL);
294 g_list_free (splitmux->contexts);
296 G_OBJECT_CLASS (parent_class)->finalize (object);
// GObject set_property handler. Every visible branch updates its field while
// holding the GST_OBJECT_LOCK of the element.
// object: the instance; prop_id: property being set; value: new value;
// pspec: param spec, used only for the invalid id warning.
// NOTE(review): several case labels and the break statements are on lines
// not visible in this excerpt; the branch notes below are inferred from the
// field each branch writes.
300 gst_splitmux_sink_set_property (GObject * object, guint prop_id,
301 const GValue * value, GParamSpec * pspec)
303 GstSplitMuxSink *splitmux = GST_SPLITMUX_SINK (object);
// location: free the old string and store a copy of the new one.
307 GST_OBJECT_LOCK (splitmux);
308 g_free (splitmux->location);
309 splitmux->location = g_value_dup_string (value);
310 GST_OBJECT_UNLOCK (splitmux);
313 case PROP_MAX_SIZE_BYTES:
314 GST_OBJECT_LOCK (splitmux);
315 splitmux->threshold_bytes = g_value_get_uint64 (value);
316 GST_OBJECT_UNLOCK (splitmux);
318 case PROP_MAX_SIZE_TIME:
319 GST_OBJECT_LOCK (splitmux);
320 splitmux->threshold_time = g_value_get_uint64 (value);
321 GST_OBJECT_UNLOCK (splitmux);
// max_files branch.
324 GST_OBJECT_LOCK (splitmux);
325 splitmux->max_files = g_value_get_uint (value);
326 GST_OBJECT_UNLOCK (splitmux);
328 case PROP_MUXER_OVERHEAD:
329 GST_OBJECT_LOCK (splitmux);
330 splitmux->mux_overhead = g_value_get_double (value);
331 GST_OBJECT_UNLOCK (splitmux);
// provided_sink branch: release the previous element and keep a new
// reference to the supplied one.
334 GST_OBJECT_LOCK (splitmux);
335 if (splitmux->provided_sink)
336 gst_object_unref (splitmux->provided_sink);
337 splitmux->provided_sink = g_value_dup_object (value);
338 GST_OBJECT_UNLOCK (splitmux);
// provided_muxer branch: same replace pattern as for the sink.
341 GST_OBJECT_LOCK (splitmux);
342 if (splitmux->provided_muxer)
343 gst_object_unref (splitmux->provided_muxer);
344 splitmux->provided_muxer = g_value_dup_object (value);
345 GST_OBJECT_UNLOCK (splitmux);
348 G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
// GObject get_property handler. Every visible branch copies its field into
// value while holding the GST_OBJECT_LOCK of the element.
// object: the instance; prop_id: property being read; value: destination;
// pspec: param spec, used only for the invalid id warning.
// NOTE(review): several case labels and the break statements are on lines
// not visible in this excerpt.
354 gst_splitmux_sink_get_property (GObject * object, guint prop_id,
355 GValue * value, GParamSpec * pspec)
357 GstSplitMuxSink *splitmux = GST_SPLITMUX_SINK (object);
// location branch.
361 GST_OBJECT_LOCK (splitmux);
362 g_value_set_string (value, splitmux->location);
363 GST_OBJECT_UNLOCK (splitmux);
365 case PROP_MAX_SIZE_BYTES:
366 GST_OBJECT_LOCK (splitmux);
367 g_value_set_uint64 (value, splitmux->threshold_bytes);
368 GST_OBJECT_UNLOCK (splitmux);
370 case PROP_MAX_SIZE_TIME:
371 GST_OBJECT_LOCK (splitmux);
372 g_value_set_uint64 (value, splitmux->threshold_time);
373 GST_OBJECT_UNLOCK (splitmux);
// max_files branch.
376 GST_OBJECT_LOCK (splitmux);
377 g_value_set_uint (value, splitmux->max_files);
378 GST_OBJECT_UNLOCK (splitmux);
380 case PROP_MUXER_OVERHEAD:
381 GST_OBJECT_LOCK (splitmux);
382 g_value_set_double (value, splitmux->mux_overhead);
383 GST_OBJECT_UNLOCK (splitmux);
// provided_sink branch: reports the caller supplied element, not active_sink.
386 GST_OBJECT_LOCK (splitmux);
387 g_value_set_object (value, splitmux->provided_sink);
388 GST_OBJECT_UNLOCK (splitmux);
// provided_muxer branch.
391 GST_OBJECT_LOCK (splitmux);
392 g_value_set_object (value, splitmux->provided_muxer);
393 GST_OBJECT_UNLOCK (splitmux);
396 G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
401 /* Convenience function: map a timestamp in a segment to a signed running time */
// segment: segment used for the conversion.
// val: timestamp to convert; only converted when it is a valid clock time.
// The result starts out as GST_CLOCK_STIME_NONE.
// NOTE(review): how the converted value and its sign are folded into res,
// and the return statement, are on lines not visible in this excerpt.
402 static inline GstClockTimeDiff
403 my_segment_to_running_time (GstSegment * segment, GstClockTime val)
405 GstClockTimeDiff res = GST_CLOCK_STIME_NONE;
407 if (GST_CLOCK_TIME_IS_VALID (val)) {
// The converted running time is written back into val.
409 gst_segment_to_running_time_full (segment, GST_FORMAT_TIME, val, &val);
// Looks up the static pad of mq whose name is src_ followed by tmp, where
// the name of sink_pad is the starting point.
// mq: the element to search; sink_pad: the sink pad whose counterpart is wanted.
// NOTE(review): how tmp is derived from sinkname, the release of the
// temporary strings and the return statement are not visible in this excerpt.
419 mq_sink_to_src (GstElement * mq, GstPad * sink_pad)
421 gchar *tmp, *sinkname, *srcname;
424 sinkname = gst_pad_get_name (sink_pad);
426 srcname = g_strdup_printf ("src_%s", tmp);
428 mq_src = gst_element_get_static_pad (mq, srcname);
// Requests a new sink_%u pad from the multiqueue and finds the src pad that
// corresponds to it via mq_sink_to_src().
// splitmux: owner of the multiqueue; sink_pad: out parameter for the sink pad.
// NOTE(review): the remaining parameters, the success path and the condition
// guarding the release below are on lines not visible in this excerpt.
437 get_pads_from_mq (GstSplitMuxSink * splitmux, GstPad ** sink_pad,
443 /* Request a pad from multiqueue, then connect this one, then
444 * discover the corresponding output pad and return both */
445 mq_sink = gst_element_get_request_pad (splitmux->mq, "sink_%u");
449 mq_src = mq_sink_to_src (splitmux->mq, mq_sink);
// Hands the requested sink pad back to the multiqueue.
459 gst_element_release_request_pad (splitmux->mq, mq_sink);
// Creates a per-stream context for splitmux with a reference count of 1.
// Both segments start with an undefined format, both running times start as
// GST_CLOCK_STIME_NONE and the queue of buffer records starts empty.
// splitmux: element stored in the context (no reference is taken here).
464 mq_stream_ctx_new (GstSplitMuxSink * splitmux)
468 ctx = g_new0 (MqStreamCtx, 1);
469 g_atomic_int_set (&ctx->refcount, 1);
470 ctx->splitmux = splitmux;
471 gst_segment_init (&ctx->in_segment, GST_FORMAT_UNDEFINED);
472 gst_segment_init (&ctx->out_segment, GST_FORMAT_UNDEFINED);
473 ctx->in_running_time = ctx->out_running_time = GST_CLOCK_STIME_NONE;
474 g_queue_init (&ctx->queued_bufs);
// Releases every buffer record still queued on the context and empties the
// queue. Called from mq_stream_ctx_unref() when the last reference is dropped.
// NOTE(review): the release of the context structure itself is not visible
// in this excerpt.
479 mq_stream_ctx_free (MqStreamCtx * ctx)
481 g_queue_foreach (&ctx->queued_bufs, (GFunc) mq_stream_buf_free, NULL);
482 g_queue_clear (&ctx->queued_bufs);
// Atomically drops one reference on ctx and calls mq_stream_ctx_free() when
// the count reaches zero.
487 mq_stream_ctx_unref (MqStreamCtx * ctx)
489 if (g_atomic_int_dec_and_test (&ctx->refcount))
490 mq_stream_ctx_free (ctx);
// Atomically takes one additional reference on ctx.
494 mq_stream_ctx_ref (MqStreamCtx * ctx)
496 g_atomic_int_inc (&ctx->refcount);
// Clears the stored sink pad block id of ctx and drops one context reference.
// Presumably installed as the destroy notify of the sink pad probe; the
// registration is not visible in this excerpt.
500 _pad_block_destroy_sink_notify (MqStreamCtx * ctx)
502 ctx->sink_pad_block_id = 0;
503 mq_stream_ctx_unref (ctx);
// Clears the stored src pad block id of ctx and drops one context reference.
// Presumably installed as the destroy notify of the src pad probe; the
// registration is not visible in this excerpt.
507 _pad_block_destroy_src_notify (MqStreamCtx * ctx)
509 ctx->src_pad_block_id = 0;
510 mq_stream_ctx_unref (ctx);
// Posts an element message announcing that a fragment was opened or closed.
// splitmux: the element posting the message.
// opened: TRUE selects the splitmuxsink-fragment-opened message name, FALSE
// selects splitmuxsink-fragment-closed.
// The message structure carries the location read from splitmux->sink and
// the out_running_time of the reference context.
514 send_fragment_opened_closed_msg (GstSplitMuxSink * splitmux, gboolean opened)
516 gchar *location = NULL;
518 const gchar *msg_name = opened ?
519 "splitmuxsink-fragment-opened" : "splitmuxsink-fragment-closed";
// NOTE(review): g_object_get returns a newly allocated copy for string
// properties; the matching g_free is not visible in this excerpt - confirm
// that it exists.
521 g_object_get (splitmux->sink, "location", &location, NULL);
523 msg = gst_message_new_element (GST_OBJECT (splitmux),
524 gst_structure_new (msg_name,
525 "location", G_TYPE_STRING, location,
526 "running-time", GST_TYPE_CLOCK_TIME,
527 splitmux->reference_ctx->out_running_time, NULL));
528 gst_element_post_message (GST_ELEMENT_CAST (splitmux), msg);
533 /* Called with lock held; drops the lock while sending EOS to the peer of ctx->srcpad, then takes it again */
// Sends an EOS event to the peer of ctx->srcpad.
// splitmux: element whose lock is released around the send.
// ctx: stream context whose src pad peer receives the event.
// The splitmux lock is released for the duration of gst_pad_send_event and
// taken again afterwards, so state guarded by the lock may have changed by
// the time this returns.
// NOTE(review): gst_pad_get_peer can return NULL for an unlinked pad; any
// handling of that case is on lines not visible in this excerpt.
537 send_eos (GstSplitMuxSink * splitmux, MqStreamCtx * ctx)
542 eos = gst_event_new_eos ();
543 pad = gst_pad_get_peer (ctx->srcpad);
547 GST_INFO_OBJECT (splitmux, "Sending EOS on %" GST_PTR_FORMAT, pad);
548 GST_SPLITMUX_UNLOCK (splitmux);
549 gst_pad_send_event (pad, eos);
550 GST_SPLITMUX_LOCK (splitmux);
// Drop the peer reference obtained above.
552 gst_object_unref (pad);
555 /* Called with splitmux lock held to check if this output
556 * context needs to sleep to wait for the release of the
557 * next GOP, or to send EOS to close out the current file */
// Output side gate for one stream context.
// splitmux: the element; ctx: the output context being checked.
// Compares ctx->out_running_time with splitmux->max_out_running_time and,
// depending on the element state, sends EOS, starts the next fragment or
// sleeps on the condition variable.
// NOTE(review): the enclosing loop and the return statements are on lines
// not visible in this excerpt.
560 complete_or_wait_on_out (GstSplitMuxSink * splitmux, MqStreamCtx * ctx)
564 GST_LOG_OBJECT (ctx->srcpad,
565 "Checking running time %" GST_STIME_FORMAT " against max %"
566 GST_STIME_FORMAT, GST_STIME_ARGS (ctx->out_running_time),
567 GST_STIME_ARGS (splitmux->max_out_running_time));
// No limit set, or this context is still below the limit: record that
// something has been muxed.
569 if (splitmux->max_out_running_time == GST_CLOCK_STIME_NONE ||
570 ctx->out_running_time < splitmux->max_out_running_time) {
571 splitmux->have_muxed_something = TRUE;
// Flushing contexts and a stopped element are special cased here.
575 if (ctx->flushing || splitmux->state == SPLITMUX_STATE_STOPPED)
// While the current file is ending, a context that has not yet sent EOS
// downstream does so now.
578 if (splitmux->state == SPLITMUX_STATE_ENDING_FILE) {
579 if (ctx->out_eos == FALSE) {
580 send_eos (splitmux, ctx);
583 } else if (splitmux->state == SPLITMUX_STATE_START_NEXT_FRAGMENT) {
584 start_next_fragment (splitmux);
588 GST_INFO_OBJECT (ctx->srcpad,
589 "Sleeping for running time %"
590 GST_STIME_FORMAT " (max %" GST_STIME_FORMAT ")",
591 GST_STIME_ARGS (ctx->out_running_time),
592 GST_STIME_ARGS (splitmux->max_out_running_time));
// out_blocked is TRUE only for the duration of the wait.
593 ctx->out_blocked = TRUE;
594 /* Expand the mq if needed before sleeping */
595 check_queue_length (splitmux, ctx);
596 GST_SPLITMUX_WAIT (splitmux);
597 ctx->out_blocked = FALSE;
598 GST_INFO_OBJECT (ctx->srcpad,
599 "Woken for new max running time %" GST_STIME_FORMAT,
600 GST_STIME_ARGS (splitmux->max_out_running_time));
// Pad probe callback for the output side of the multiqueue.
// pad: the probed pad; info: probe info carrying an event or a buffer;
// ctx: the stream context passed as probe user data.
// Returns GST_PAD_PROBE_PASS to let the item through or GST_PAD_PROBE_DROP
// to discard it.
// Events update the output segment and flushing state or advance
// out_running_time; buffers pop the matching record queued on the input
// side, update the muxed counters and may block in complete_or_wait_on_out().
// NOTE(review): several case labels, break statements and the jumps to the
// exit path at the end are on lines not visible in this excerpt.
604 static GstPadProbeReturn
605 handle_mq_output (GstPad * pad, GstPadProbeInfo * info, MqStreamCtx * ctx)
607 GstSplitMuxSink *splitmux = ctx->splitmux;
608 MqStreamBuf *buf_info = NULL;
610 GST_LOG_OBJECT (pad, "Fired probe type 0x%x", info->type);
612 /* FIXME: Handle buffer lists, until then make it clear they won't work */
613 if (info->type & GST_PAD_PROBE_TYPE_BUFFER_LIST) {
614 g_warning ("Buffer list handling not implemented");
615 return GST_PAD_PROBE_DROP;
617 if (info->type & GST_PAD_PROBE_TYPE_EVENT_DOWNSTREAM) {
618 GstEvent *event = gst_pad_probe_info_get_event (info);
620 GST_LOG_OBJECT (pad, "Event %" GST_PTR_FORMAT, event);
622 switch (GST_EVENT_TYPE (event)) {
623 case GST_EVENT_SEGMENT:
// Remember the segment for later running time conversion.
624 gst_event_copy_segment (event, &ctx->out_segment);
626 case GST_EVENT_FLUSH_STOP:
// Reset the output segment, discard queued buffer records and leave
// flushing mode.
627 GST_SPLITMUX_LOCK (splitmux);
628 gst_segment_init (&ctx->out_segment, GST_FORMAT_UNDEFINED);
629 g_queue_foreach (&ctx->queued_bufs, (GFunc) mq_stream_buf_free, NULL);
630 g_queue_clear (&ctx->queued_bufs);
631 ctx->flushing = FALSE;
632 GST_SPLITMUX_UNLOCK (splitmux);
634 case GST_EVENT_FLUSH_START:
// Mark the context as flushing and wake all waiters on the condition
// variable.
635 GST_SPLITMUX_LOCK (splitmux);
636 GST_LOG_OBJECT (pad, "Flush start");
637 ctx->flushing = TRUE;
638 GST_SPLITMUX_BROADCAST (splitmux);
639 GST_SPLITMUX_UNLOCK (splitmux);
642 GST_SPLITMUX_LOCK (splitmux);
643 if (splitmux->state == SPLITMUX_STATE_STOPPED)
646 GST_SPLITMUX_UNLOCK (splitmux);
650 GstClockTimeDiff rtime;
// Gap handling: convert the gap timestamp to running time and treat it
// like output progress.
652 gst_event_parse_gap (event, &gap_ts, NULL);
653 if (gap_ts == GST_CLOCK_TIME_NONE)
656 GST_SPLITMUX_LOCK (splitmux);
658 rtime = my_segment_to_running_time (&ctx->out_segment, gap_ts);
660 GST_LOG_OBJECT (pad, "Have GAP w/ ts %" GST_STIME_FORMAT,
661 GST_STIME_ARGS (rtime));
663 if (splitmux->state == SPLITMUX_STATE_STOPPED)
666 if (rtime != GST_CLOCK_STIME_NONE) {
667 ctx->out_running_time = rtime;
668 complete_or_wait_on_out (splitmux, ctx);
670 GST_SPLITMUX_UNLOCK (splitmux);
673 case GST_EVENT_CUSTOM_DOWNSTREAM:{
674 const GstStructure *s;
675 GstClockTimeDiff ts = 0;
// Only the splitmuxsink-unblock structure is handled here.
677 s = gst_event_get_structure (event);
678 if (!gst_structure_has_name (s, "splitmuxsink-unblock"))
// The return value is not checked, so ts keeps its initial 0 when the
// timestamp field is missing.
681 gst_structure_get_int64 (s, "timestamp", &ts);
683 GST_SPLITMUX_LOCK (splitmux);
685 if (splitmux->state == SPLITMUX_STATE_STOPPED)
687 ctx->out_running_time = ts;
688 complete_or_wait_on_out (splitmux, ctx);
689 GST_SPLITMUX_UNLOCK (splitmux);
// The unblock event is consumed here and never travels further.
690 return GST_PAD_PROBE_DROP;
695 return GST_PAD_PROBE_PASS;
698 /* Allow everything through until the configured next stopping point */
699 GST_SPLITMUX_LOCK (splitmux);
// Buffer path: take the record from the tail of the queue for this buffer.
701 buf_info = g_queue_pop_tail (&ctx->queued_bufs);
702 if (buf_info == NULL)
703 /* Can only happen due to a poorly timed flush */
706 /* If we have popped a keyframe, decrement the queued_gop count */
707 if (buf_info->keyframe && splitmux->queued_gops > 0)
708 splitmux->queued_gops--;
710 ctx->out_running_time = buf_info->run_ts;
712 GST_LOG_OBJECT (splitmux,
713 "Pad %" GST_PTR_FORMAT " buffer with run TS %" GST_STIME_FORMAT
714 " size %" G_GSIZE_FORMAT,
715 pad, GST_STIME_ARGS (ctx->out_running_time), buf_info->buf_size);
// The opened message for the first fragment is posted when the first
// buffer is about to pass, and only once.
717 if (splitmux->opening_first_fragment) {
718 send_fragment_opened_closed_msg (splitmux, TRUE);
719 splitmux->opening_first_fragment = FALSE;
// May sleep until this buffer is allowed out.
722 complete_or_wait_on_out (splitmux, ctx);
// Track the highest running time and the total bytes released so far.
724 if (splitmux->muxed_out_time == GST_CLOCK_STIME_NONE ||
725 splitmux->muxed_out_time < buf_info->run_ts)
726 splitmux->muxed_out_time = buf_info->run_ts;
728 splitmux->muxed_out_bytes += buf_info->buf_size;
730 #ifndef GST_DISABLE_GST_DEBUG
732 GstBuffer *buf = gst_pad_probe_info_get_buffer (info);
733 GST_LOG_OBJECT (pad, "Returning to pass buffer %" GST_PTR_FORMAT
734 " run ts %" GST_STIME_FORMAT, buf,
735 GST_STIME_ARGS (ctx->out_running_time));
739 GST_SPLITMUX_UNLOCK (splitmux);
741 mq_stream_buf_free (buf_info);
743 return GST_PAD_PROBE_PASS;
// Second exit path: unlock and drop the item.
746 GST_SPLITMUX_UNLOCK (splitmux);
747 return GST_PAD_PROBE_DROP;
// Sticky event callback used by restart_context(): sends a new reference of
// the event to peer and returns the result of gst_pad_send_event.
// pad: pad being iterated (unused); event: the sticky event;
// peer: destination pad passed as user data.
751 resend_sticky (GstPad * pad, GstEvent ** event, GstPad * peer)
753 return gst_pad_send_event (peer, gst_event_ref (*event));
// Replays the sticky events of ctx->srcpad to its peer and refreshes the
// out_eos flag of the context. Invoked for every context from
// start_next_fragment() via g_list_foreach.
// ctx: the stream context; splitmux: list user data, not used in the
// visible lines.
757 restart_context (MqStreamCtx * ctx, GstSplitMuxSink * splitmux)
759 GstPad *peer = gst_pad_get_peer (ctx->srcpad);
761 gst_pad_sticky_events_foreach (ctx->srcpad,
762 (GstPadStickyEventsForeachFunction) (resend_sticky), peer);
764 /* Clear EOS flag if not actually EOS */
765 ctx->out_eos = GST_PAD_IS_EOS (ctx->srcpad);
767 gst_object_unref (peer);
770 /* Called with lock held when a fragment
771 * reaches EOS and it is time to restart with a new fragment */
// Switches output to the next file: cycles the muxer and the active sink
// through NULL, picks the next filename, restores their state, replays
// sticky events on every context and then resumes processing.
// splitmux: the element.
775 start_next_fragment (GstSplitMuxSink * splitmux)
777 /* 1 change to new file */
// bus_handler() ignores async messages from the two children while this
// flag is set.
778 splitmux->switching_fragment = TRUE;
// Lock the state of both children so they can be taken to NULL and back
// independently of the bin.
780 gst_element_set_locked_state (splitmux->muxer, TRUE);
781 gst_element_set_locked_state (splitmux->active_sink, TRUE);
782 gst_element_set_state (splitmux->muxer, GST_STATE_NULL);
783 gst_element_set_state (splitmux->active_sink, GST_STATE_NULL);
785 set_next_filename (splitmux);
// Bring both children back to the target state of the bin, sink first.
787 gst_element_set_state (splitmux->active_sink, GST_STATE_TARGET (splitmux));
788 gst_element_set_state (splitmux->muxer, GST_STATE_TARGET (splitmux));
789 gst_element_set_locked_state (splitmux->muxer, FALSE);
790 gst_element_set_locked_state (splitmux->active_sink, FALSE);
792 splitmux->switching_fragment = FALSE;
794 g_list_foreach (splitmux->contexts, (GFunc) restart_context, splitmux);
796 /* Switch state and go back to processing */
// Reference stream still running: collect the next GOP start and let output
// proceed up to the reference input running time.
797 if (!splitmux->reference_ctx->in_eos) {
798 splitmux->state = SPLITMUX_STATE_COLLECTING_GOP_START;
799 splitmux->max_out_running_time = splitmux->reference_ctx->in_running_time;
// Reference stream at EOS: no output limit is applied any more.
801 splitmux->state = SPLITMUX_STATE_WAITING_GOP_COMPLETE;
802 splitmux->max_out_running_time = GST_CLOCK_STIME_NONE;
803 splitmux->have_muxed_something = FALSE;
// NOTE(review): the assignment below overrides the FALSE stored just above
// unless a line not visible in this excerpt separates the two - confirm.
805 splitmux->have_muxed_something =
806 (splitmux->reference_ctx->in_running_time > splitmux->muxed_out_time);
808 /* Store the overflow parameters as the basis for the next fragment */
809 splitmux->mux_start_time = splitmux->muxed_out_time;
810 splitmux->mux_start_bytes = splitmux->muxed_out_bytes;
812 GST_DEBUG_OBJECT (splitmux,
813 "Restarting flow for new fragment. New running time %" GST_STIME_FORMAT,
814 GST_STIME_ARGS (splitmux->max_out_running_time));
816 send_fragment_opened_closed_msg (splitmux, TRUE);
// Wake every thread sleeping on the condition variable.
818 GST_SPLITMUX_BROADCAST (splitmux);
// GstBin handle_message override.
// bin: the splitmux bin; message: message posted by a child.
// EOS messages that arrive while a file is being ended are consumed and
// move the element to START_NEXT_FRAGMENT; async messages from the muxer or
// the active sink are consumed while a fragment switch is in progress.
// The final visible statement forwards the message to the parent class
// handler.
// NOTE(review): the break and return statements that separate the paths are
// on lines not visible in this excerpt.
822 bus_handler (GstBin * bin, GstMessage * message)
824 GstSplitMuxSink *splitmux = GST_SPLITMUX_SINK (bin);
826 switch (GST_MESSAGE_TYPE (message)) {
827 case GST_MESSAGE_EOS:
828 /* If the state is draining out the current file, drop this EOS */
829 GST_SPLITMUX_LOCK (splitmux);
// The closed message is posted before the state check below.
831 send_fragment_opened_closed_msg (splitmux, FALSE);
833 if (splitmux->state == SPLITMUX_STATE_ENDING_FILE &&
834 splitmux->max_out_running_time != GST_CLOCK_STIME_NONE) {
835 GST_DEBUG_OBJECT (splitmux, "Caught EOS at end of fragment, dropping");
836 splitmux->state = SPLITMUX_STATE_START_NEXT_FRAGMENT;
837 GST_SPLITMUX_BROADCAST (splitmux);
839 gst_message_unref (message);
840 GST_SPLITMUX_UNLOCK (splitmux);
843 GST_SPLITMUX_UNLOCK (splitmux);
845 case GST_MESSAGE_ASYNC_START:
846 case GST_MESSAGE_ASYNC_DONE:
847 /* Ignore state changes from our children while switching */
// switching_fragment is read here without taking the splitmux lock.
848 if (splitmux->switching_fragment) {
849 if (GST_MESSAGE_SRC (message) == (GstObject *) splitmux->active_sink ||
850 GST_MESSAGE_SRC (message) == (GstObject *) splitmux->muxer) {
851 GST_LOG_OBJECT (splitmux,
852 "Ignoring state change from child %" GST_PTR_FORMAT
853 " while switching", GST_MESSAGE_SRC (message));
854 gst_message_unref (message);
863 GST_BIN_CLASS (parent_class)->handle_message (bin, message);
866 /* Called with splitmux lock held */
867 /* Called when entering ProcessingCompleteGop state
868 * Assess if mq contents overflowed the current file
869 * -> If yes, need to switch to new file
870 * -> If no, set max_out_running_time to let this GOP in and
871 * go to COLLECTING_GOP_START state */
// Decides whether the GOP collected on the input side still fits in the
// current file.
// splitmux: the element.
// Computes the bytes and running time gathered since the current fragment
// started, inflates the byte count by mux_overhead and compares both values
// with the configured thresholds. On overrun the state becomes ENDING_FILE;
// otherwise output is released up to the reference input running time.
874 handle_gathered_gop (GstSplitMuxSink * splitmux)
877 gsize queued_bytes = 0;
878 GstClockTimeDiff queued_time = 0;
880 /* Assess if the multiqueue contents overflowed the current file */
// Largest input running time over all contexts, and the sum of their bytes.
881 for (cur = g_list_first (splitmux->contexts);
882 cur != NULL; cur = g_list_next (cur)) {
883 MqStreamCtx *tmpctx = (MqStreamCtx *) (cur->data);
884 if (tmpctx->in_running_time > queued_time)
885 queued_time = tmpctx->in_running_time;
886 queued_bytes += tmpctx->in_bytes;
// NOTE(review): these assertions are evaluated on values derived from
// stream data; confirm that aborting is the intended reaction on failure.
889 g_assert (queued_bytes >= splitmux->mux_start_bytes);
890 g_assert (queued_time >= splitmux->mux_start_time);
// Make both values relative to the start of the current fragment.
892 queued_bytes -= splitmux->mux_start_bytes;
893 queued_time -= splitmux->mux_start_time;
895 /* Expand queued bytes estimate by muxer overhead */
896 queued_bytes += (queued_bytes * splitmux->mux_overhead);
898 GST_LOG_OBJECT (splitmux, "mq at TS %" GST_STIME_FORMAT
899 " bytes %" G_GSIZE_FORMAT, GST_STIME_ARGS (queued_time), queued_bytes);
901 /* Check for overrun - have we output at least one byte and overrun
902 * either threshold? */
// A threshold of 0 is skipped, matching the 0=disable property descriptions.
903 if ((splitmux->have_muxed_something &&
904 ((splitmux->threshold_bytes > 0 &&
905 queued_bytes > splitmux->threshold_bytes) ||
906 (splitmux->threshold_time > 0 &&
907 queued_time > splitmux->threshold_time)))) {
909 splitmux->state = SPLITMUX_STATE_ENDING_FILE;
911 GST_INFO_OBJECT (splitmux,
912 "mq overflowed since last, draining out. max out TS is %"
913 GST_STIME_FORMAT, GST_STIME_ARGS (splitmux->max_out_running_time));
914 GST_SPLITMUX_BROADCAST (splitmux);
918 GST_LOG_OBJECT (splitmux,
919 "This GOP didn't overflow the fragment. Bytes sent %" G_GSIZE_FORMAT
920 " queued %" G_GSIZE_FORMAT " time %" GST_STIME_FORMAT " Continuing.",
921 splitmux->muxed_out_bytes - splitmux->mux_start_bytes,
922 queued_bytes, GST_STIME_ARGS (queued_time));
924 /* Wake everyone up to push this one GOP, then sleep */
925 splitmux->have_muxed_something = TRUE;
// Same state selection as at the end of start_next_fragment().
927 if (!splitmux->reference_ctx->in_eos) {
928 splitmux->state = SPLITMUX_STATE_COLLECTING_GOP_START;
929 splitmux->max_out_running_time = splitmux->reference_ctx->in_running_time;
931 splitmux->state = SPLITMUX_STATE_WAITING_GOP_COMPLETE;
932 splitmux->max_out_running_time = GST_CLOCK_STIME_NONE;
935 GST_LOG_OBJECT (splitmux, "Waking output for complete GOP, TS %"
936 GST_STIME_FORMAT, GST_STIME_ARGS (splitmux->max_out_running_time));
937 GST_SPLITMUX_BROADCAST (splitmux);
942 /* Called with splitmux lock held */
943 /* Called from each input pad when it has all the pieces
944 * for a GOP or EOS, starting with the reference pad which has set the
945 * splitmux->max_in_running_time */
// Input side rendezvous: checks whether every context has caught up with
// splitmux->max_in_running_time and, if so, hands the gathered GOP to
// handle_gathered_gop(); afterwards the caller may sleep on the condition
// variable.
// splitmux: the element; ctx: the input context performing the check.
// NOTE(review): part of the readiness condition, part of the wait loop
// condition and the statements that clear ready are on lines not visible
// in this excerpt.
948 check_completed_gop (GstSplitMuxSink * splitmux, MqStreamCtx * ctx)
951 gboolean ready = TRUE;
952 GstClockTimeDiff current_max_in_running_time;
954 if (splitmux->state == SPLITMUX_STATE_WAITING_GOP_COMPLETE) {
955 /* Iterate each pad, and check that the input running time is at least
956 * up to the reference running time, and if so handle the collected GOP */
957 GST_LOG_OBJECT (splitmux, "Checking GOP collected, Max in running time %"
958 GST_STIME_FORMAT " ctx %p",
959 GST_STIME_ARGS (splitmux->max_in_running_time), ctx);
960 for (cur = g_list_first (splitmux->contexts); cur != NULL;
961 cur = g_list_next (cur)) {
962 MqStreamCtx *tmpctx = (MqStreamCtx *) (cur->data);
964 GST_LOG_OBJECT (splitmux,
965 "Context %p (src pad %" GST_PTR_FORMAT ") TS %" GST_STIME_FORMAT
966 " EOS %d", tmpctx, tmpctx->srcpad,
967 GST_STIME_ARGS (tmpctx->in_running_time), tmpctx->in_eos);
// G_MAXINT64 is the value handle_mq_input stores when the reference stream
// reaches EOS; the running time comparison is skipped in that case.
969 if (splitmux->max_in_running_time != G_MAXINT64 &&
970 tmpctx->in_running_time < splitmux->max_in_running_time &&
972 GST_LOG_OBJECT (splitmux,
973 "Context %p (src pad %" GST_PTR_FORMAT ") not ready. We'll sleep",
974 tmpctx, tmpctx->srcpad);
980 GST_DEBUG_OBJECT (splitmux,
981 "Collected GOP is complete. Processing (ctx %p)", ctx);
982 /* All pads have a complete GOP, release it into the multiqueue */
983 handle_gathered_gop (splitmux);
987 /* If upstream reached EOS we are not expecting more data, no need to wait
992 /* Some pad is not yet ready, or GOP is being pushed
993 * either way, sleep and wait to get woken */
// Snapshot the reference running time so the loop ends once a new GOP
// boundary has been set.
994 current_max_in_running_time = splitmux->max_in_running_time;
995 while ((splitmux->state == SPLITMUX_STATE_WAITING_GOP_COMPLETE ||
996 splitmux->state == SPLITMUX_STATE_START_NEXT_FRAGMENT) &&
998 (current_max_in_running_time == splitmux->max_in_running_time)) {
1000 GST_LOG_OBJECT (splitmux, "Sleeping for %s (ctx %p)",
1001 splitmux->state == SPLITMUX_STATE_WAITING_GOP_COMPLETE ?
1002 "GOP complete" : "EOF draining", ctx);
1003 GST_SPLITMUX_WAIT (splitmux);
1005 GST_LOG_OBJECT (splitmux, "Done waiting for complete GOP (ctx %p)", ctx);
// Grows the max-size-buffers limit of the multiqueue when the record queue
// of ctx has reached the current limit and growth is allowed.
// splitmux: the element; ctx: the context whose queue length is checked.
// NOTE(review): the statements that set allow_grow and the condition
// guarding the enlargement are on lines not visible in this excerpt.
1010 check_queue_length (GstSplitMuxSink * splitmux, MqStreamCtx * ctx)
1013 guint cur_len = g_queue_get_length (&ctx->queued_bufs);
1015 GST_DEBUG_OBJECT (ctx->sinkpad,
1016 "Checking queue length len %u cur_max %u queued gops %u",
1017 cur_len, splitmux->mq_max_buffers, splitmux->queued_gops);
1019 if (cur_len >= splitmux->mq_max_buffers) {
1020 gboolean allow_grow = FALSE;
1022 /* If collecting a GOP and this pad might block,
1023 * and there isn't already a pending GOP in the queue
1026 if (splitmux->state == SPLITMUX_STATE_WAITING_GOP_COMPLETE &&
1027 ctx->in_running_time < splitmux->max_in_running_time &&
1028 splitmux->queued_gops <= 1) {
1030 } else if (splitmux->state == SPLITMUX_STATE_COLLECTING_GOP_START &&
1031 ctx->is_reference && splitmux->queued_gops <= 1) {
1036 for (cur = g_list_first (splitmux->contexts);
1037 cur != NULL; cur = g_list_next (cur)) {
1038 MqStreamCtx *tmpctx = (MqStreamCtx *) (cur->data);
1039 GST_DEBUG_OBJECT (tmpctx->sinkpad,
1040 " len %u out_blocked %d",
1041 g_queue_get_length (&tmpctx->queued_bufs), tmpctx->out_blocked);
1042 /* If another stream is starving, grow */
1043 if (tmpctx != ctx && g_queue_get_length (&tmpctx->queued_bufs) < 1) {
// The new limit is one more than the current queue length of ctx.
1050 splitmux->mq_max_buffers = cur_len + 1;
1052 GST_INFO_OBJECT (splitmux,
1053 "Multiqueue overrun - enlarging to %u buffers ctx %p",
1054 splitmux->mq_max_buffers, ctx);
1056 g_object_set (splitmux->mq, "max-size-buffers",
1057 splitmux->mq_max_buffers, NULL);
/* Pad probe callback for data entering the multiqueue on one stream
 * (it is attached to the multiqueue sink pad when the request pad is
 * created).
 *
 * @pad:  the pad the probe fired on
 * @info: probe info carrying the buffer, buffer list or event
 * @ctx:  per-stream context, including the owning splitmux
 *
 * Buffer lists are not supported: a warning is emitted and
 * GST_PAD_PROBE_DROP is returned. Downstream events update the
 * per-stream segment, flush and EOS bookkeeping. For buffers, the
 * input running time, the input byte counter and (on the reference
 * stream) the mux start time are updated, the splitmux state machine
 * is run, and a MqStreamBuf record is pushed on ctx->queued_bufs.
 * Every return visible here other than the buffer-list one yields
 * GST_PAD_PROBE_PASS.
 *
 * NOTE(review): several short lines (declarations, braces, case
 * labels, jumps) are not visible in this excerpt; confirm details
 * against the complete function. */
1062 static GstPadProbeReturn
1063 handle_mq_input (GstPad * pad, GstPadProbeInfo * info, MqStreamCtx * ctx)
1065 GstSplitMuxSink *splitmux = ctx->splitmux;
1067 MqStreamBuf *buf_info = NULL;
1069 gboolean loop_again;
1070 gboolean keyframe = FALSE;
1072 GST_LOG_OBJECT (pad, "Fired probe type 0x%x", info->type);
1074 /* FIXME: Handle buffer lists, until then make it clear they won't work */
1075 if (info->type & GST_PAD_PROBE_TYPE_BUFFER_LIST) {
1076 g_warning ("Buffer list handling not implemented");
1077 return GST_PAD_PROBE_DROP;
/* Downstream events: update the per-stream bookkeeping for the event
 * type, then pass the event on */
1079 if (info->type & GST_PAD_PROBE_TYPE_EVENT_DOWNSTREAM) {
1080 GstEvent *event = gst_pad_probe_info_get_event (info);
1081 switch (GST_EVENT_TYPE (event)) {
1082 case GST_EVENT_SEGMENT:
1083 gst_event_copy_segment (event, &ctx->in_segment);
1085 case GST_EVENT_FLUSH_STOP:
1086 GST_SPLITMUX_LOCK (splitmux);
1087 gst_segment_init (&ctx->in_segment, GST_FORMAT_UNDEFINED);
1088 ctx->in_eos = FALSE;
1090 ctx->in_running_time = GST_CLOCK_STIME_NONE;
1091 GST_SPLITMUX_UNLOCK (splitmux);
1094 GST_SPLITMUX_LOCK (splitmux);
1097 if (splitmux->state == SPLITMUX_STATE_STOPPED)
1100 if (ctx->is_reference) {
1101 GST_INFO_OBJECT (splitmux, "Got Reference EOS. Finishing up");
1102 /* Act as if this is a new keyframe with infinite timestamp */
1103 splitmux->max_in_running_time = G_MAXINT64;
1104 splitmux->state = SPLITMUX_STATE_WAITING_GOP_COMPLETE;
1105 /* Wake up other input pads to collect this GOP */
1106 GST_SPLITMUX_BROADCAST (splitmux);
1107 check_completed_gop (splitmux, ctx);
1108 } else if (splitmux->state == SPLITMUX_STATE_WAITING_GOP_COMPLETE) {
1109 /* If we are waiting for a GOP to be completed (ie, for aux
1110 * pads to catch up), then this pad is complete, so check
1111 * if the whole GOP is.
1113 check_completed_gop (splitmux, ctx);
1115 GST_SPLITMUX_UNLOCK (splitmux);
1120 return GST_PAD_PROBE_PASS;
1123 buf = gst_pad_probe_info_get_buffer (info);
1124 buf_info = mq_stream_buf_new ();
1126 if (GST_BUFFER_PTS_IS_VALID (buf))
1127 ts = GST_BUFFER_PTS (buf);
1129 ts = GST_BUFFER_DTS (buf);
1131 GST_LOG_OBJECT (pad, "Buffer TS is %" GST_TIME_FORMAT, GST_TIME_ARGS (ts));
1133 GST_SPLITMUX_LOCK (splitmux);
1135 if (splitmux->state == SPLITMUX_STATE_STOPPED)
1138 /* If this buffer has a timestamp, advance the input timestamp of the
1140 if (GST_CLOCK_TIME_IS_VALID (ts)) {
1141 GstClockTimeDiff running_time =
1142 my_segment_to_running_time (&ctx->in_segment, ts);
1144 GST_LOG_OBJECT (pad, "Buffer running TS is %" GST_STIME_FORMAT,
1145 GST_STIME_ARGS (running_time));
1147 if (GST_CLOCK_STIME_IS_VALID (running_time)
1148 && running_time > ctx->in_running_time)
1149 ctx->in_running_time = running_time;
1152 /* Try to make sure we have a valid running time */
1153 if (!GST_CLOCK_STIME_IS_VALID (ctx->in_running_time)) {
1154 ctx->in_running_time =
1155 my_segment_to_running_time (&ctx->in_segment, ctx->in_segment.start);
1158 GST_LOG_OBJECT (pad, "in running time now %" GST_STIME_FORMAT,
1159 GST_STIME_ARGS (ctx->in_running_time));
/* Remember this buffer's running time and size in its queue record */
1161 buf_info->run_ts = ctx->in_running_time;
1162 buf_info->buf_size = gst_buffer_get_size (buf);
1164 /* Update total input byte counter for overflow detect */
1165 ctx->in_bytes += buf_info->buf_size;
1167 /* initialize mux_start_time */
1168 if (ctx->is_reference && splitmux->mux_start_time == GST_CLOCK_STIME_NONE) {
1169 splitmux->mux_start_time = buf_info->run_ts;
1170 GST_LOG_OBJECT (splitmux, "Mux start time now %" GST_STIME_FORMAT,
1171 GST_STIME_ARGS (splitmux->mux_start_time));
1172 /* Also take this as the first start time when starting up,
1173 * so that we start counting overflow from the first frame */
1174 if (!GST_CLOCK_STIME_IS_VALID (splitmux->max_in_running_time))
1175 splitmux->max_in_running_time = splitmux->mux_start_time;
1178 GST_DEBUG_OBJECT (pad, "Buf TS %" GST_STIME_FORMAT
1179 " total in_bytes %" G_GSIZE_FORMAT,
1180 GST_STIME_ARGS (buf_info->run_ts), ctx->in_bytes);
/* Act on the current splitmux state; the switch is repeated for as
 * long as loop_again is set */
1187 switch (splitmux->state) {
1188 case SPLITMUX_STATE_COLLECTING_GOP_START:
1189 if (ctx->is_reference) {
1190 /* If a keyframe, we have a complete GOP */
1191 if (GST_BUFFER_FLAG_IS_SET (buf, GST_BUFFER_FLAG_DELTA_UNIT) ||
1192 !GST_CLOCK_STIME_IS_VALID (ctx->in_running_time) ||
1193 splitmux->max_in_running_time >= ctx->in_running_time) {
1194 /* Pass this buffer through */
1198 GST_INFO_OBJECT (pad,
1199 "Have keyframe with running time %" GST_STIME_FORMAT,
1200 GST_STIME_ARGS (ctx->in_running_time));
1202 splitmux->state = SPLITMUX_STATE_WAITING_GOP_COMPLETE;
1203 splitmux->max_in_running_time = ctx->in_running_time;
1204 /* Wake up other input pads to collect this GOP */
1205 GST_SPLITMUX_BROADCAST (splitmux);
1206 check_completed_gop (splitmux, ctx);
1208 /* We're still waiting for a keyframe on the reference pad, sleep */
1209 GST_LOG_OBJECT (pad, "Sleeping for GOP start");
1210 GST_SPLITMUX_WAIT (splitmux);
1211 GST_LOG_OBJECT (pad, "Done sleeping for GOP start state now %d",
1215 case SPLITMUX_STATE_WAITING_GOP_COMPLETE:
1217 /* If we overran the target timestamp, it might be time to process
1218 * the GOP, otherwise bail out for more data
1220 GST_LOG_OBJECT (pad,
1221 "Checking TS %" GST_STIME_FORMAT " against max %" GST_STIME_FORMAT,
1222 GST_STIME_ARGS (ctx->in_running_time),
1223 GST_STIME_ARGS (splitmux->max_in_running_time));
1225 if (ctx->in_running_time < splitmux->max_in_running_time) {
1230 GST_LOG_OBJECT (pad,
1231 "Collected last packet of GOP. Checking other pads");
1232 check_completed_gop (splitmux, ctx);
1234 case SPLITMUX_STATE_ENDING_FILE:{
1237 /* If some streams received no buffer during the last GOP that overran,
1238 * because their next buffer has a timestamp bigger than
1239 * ctx->max_in_running_time, their queue is empty. In that case the only
1240 * way to wake up the output thread is by injecting an event in the
1241 * queue. This usually happens with subtitle streams.
1242 * See https://bugzilla.gnome.org/show_bug.cgi?id=763711. */
1243 GST_LOG_OBJECT (pad, "Sending splitmuxsink-unblock event");
1244 event = gst_event_new_custom (GST_EVENT_CUSTOM_DOWNSTREAM |
1245 GST_EVENT_TYPE_SERIALIZED,
1246 gst_structure_new ("splitmuxsink-unblock", "timestamp",
1247 G_TYPE_INT64, splitmux->max_in_running_time, NULL));
1249 GST_SPLITMUX_UNLOCK (splitmux);
1250 gst_pad_send_event (ctx->sinkpad, event);
1251 GST_SPLITMUX_LOCK (splitmux);
1252 /* state may have changed while we were unlocked. Loop again if so */
1253 if (splitmux->state != SPLITMUX_STATE_ENDING_FILE)
1257 case SPLITMUX_STATE_START_NEXT_FRAGMENT:
1258 /* A fragment is ending, wait until that's done before continuing */
1259 GST_DEBUG_OBJECT (pad, "Sleeping for fragment restart");
1260 GST_SPLITMUX_WAIT (splitmux);
1261 GST_DEBUG_OBJECT (pad,
1262 "Done sleeping for fragment restart state now %d", splitmux->state);
1268 } while (loop_again);
/* NOTE(review): the next two statements are presumably guarded by the
 * keyframe flag declared above - the condition is not visible in this
 * excerpt, confirm against the complete function */
1271 splitmux->queued_gops++;
1272 buf_info->keyframe = TRUE;
1275 /* Now add this buffer to the queue just before returning */
1276 g_queue_push_head (&ctx->queued_bufs, buf_info);
1278 /* Check the buffer will fit in the mq */
1279 check_queue_length (splitmux, ctx);
1281 GST_LOG_OBJECT (pad, "Returning to queue buffer %" GST_PTR_FORMAT
1282 " run ts %" GST_STIME_FORMAT, buf, GST_STIME_ARGS (ctx->in_running_time));
1284 GST_SPLITMUX_UNLOCK (splitmux);
1285 return GST_PAD_PROBE_PASS;
/* Alternate exit: drop the lock, free the buf_info record that was not
 * queued, and still pass the data on */
1288 GST_SPLITMUX_UNLOCK (splitmux);
1290 mq_stream_buf_free (buf_info);
1291 return GST_PAD_PROBE_PASS;
/* Request-pad handler: creates a new input stream on the splitmuxsink.
 *
 * @element: the splitmuxsink
 * @templ:   the splitmuxsink pad template being requested
 * @name:    requested pad name, may be NULL
 * @caps:    caps forwarded to the muxer pad request
 *
 * With the splitmux lock held it makes sure the internal elements
 * exist and picks a muxer pad template: the "video" template maps to
 * the muxer's "video_%u", any other template is looked up under the
 * same name, and "sink_%d" is the fallback. It then requests a muxer
 * pad, obtains a multiqueue sink/src pad pair and links the multiqueue
 * src pad to the muxer pad. A stream context is created, the
 * handle_mq_output and handle_mq_input probes are installed on the
 * multiqueue src and sink pads, and a ghost pad targeting the
 * multiqueue sink pad is activated and added to the element.
 *
 * A video stream replaces any existing reference context; otherwise
 * the first context created becomes the reference.
 *
 * NOTE(review): the return statements and the failure label are not
 * visible in this excerpt; only the unlock on each exit path is. */
1295 gst_splitmux_sink_request_new_pad (GstElement * element,
1296 GstPadTemplate * templ, const gchar * name, const GstCaps * caps)
1298 GstSplitMuxSink *splitmux = (GstSplitMuxSink *) element;
1299 GstPadTemplate *mux_template = NULL;
1301 GstPad *mq_sink, *mq_src;
1303 gboolean is_video = FALSE;
1306 GST_DEBUG_OBJECT (element, "templ:%s, name:%s", templ->name_template, name);
1308 GST_SPLITMUX_LOCK (splitmux);
1309 if (!create_elements (splitmux))
1312 if (templ->name_template) {
1313 if (g_str_equal (templ->name_template, "video")) {
1314 /* FIXME: Look for a pad template with matching caps, rather than by name */
1316 gst_element_class_get_pad_template (GST_ELEMENT_GET_CLASS
1317 (splitmux->muxer), "video_%u");
1322 gst_element_class_get_pad_template (GST_ELEMENT_GET_CLASS
1323 (splitmux->muxer), templ->name_template);
1325 if (mux_template == NULL) {
1326 /* Fallback to find sink pad templates named 'sink_%d' (mpegtsmux) */
1328 gst_element_class_get_pad_template (GST_ELEMENT_GET_CLASS
1329 (splitmux->muxer), "sink_%d");
1333 res = gst_element_request_pad (splitmux->muxer, mux_template, name, caps);
/* Choose the ghost pad name: "video", the muxer pad's own name when no
 * name was requested, or a copy of the requested name */
1338 gname = g_strdup ("video");
1339 else if (name == NULL)
1340 gname = gst_pad_get_name (res);
1342 gname = g_strdup (name);
/* No multiqueue pads available: give the muxer pad back */
1344 if (!get_pads_from_mq (splitmux, &mq_sink, &mq_src)) {
1345 gst_element_release_request_pad (splitmux->muxer, res);
1346 gst_object_unref (GST_OBJECT (res));
/* Linking failed: give back both the muxer pad and the multiqueue pad */
1350 if (gst_pad_link (mq_src, res) != GST_PAD_LINK_OK) {
1351 gst_element_release_request_pad (splitmux->muxer, res);
1352 gst_object_unref (GST_OBJECT (res));
1353 gst_element_release_request_pad (splitmux->mq, mq_sink);
1354 gst_object_unref (GST_OBJECT (mq_sink));
1358 gst_object_unref (GST_OBJECT (res));
1360 ctx = mq_stream_ctx_new (splitmux);
1361 ctx->srcpad = mq_src;
1362 ctx->sinkpad = mq_sink;
/* Take a context reference for the output probe; the destroy notify
 * passed below is the matching cleanup hook */
1364 mq_stream_ctx_ref (ctx);
1365 ctx->src_pad_block_id =
1366 gst_pad_add_probe (mq_src, GST_PAD_PROBE_TYPE_DATA_DOWNSTREAM,
1367 (GstPadProbeCallback) handle_mq_output, ctx, (GDestroyNotify)
1368 _pad_block_destroy_src_notify);
/* A video stream takes over the reference role from any earlier stream */
1369 if (is_video && splitmux->reference_ctx != NULL) {
1370 splitmux->reference_ctx->is_reference = FALSE;
1371 splitmux->reference_ctx = NULL;
1373 if (splitmux->reference_ctx == NULL) {
1374 splitmux->reference_ctx = ctx;
1375 ctx->is_reference = TRUE;
/* From here on res is the ghost pad exposed on the splitmuxsink */
1378 res = gst_ghost_pad_new (gname, mq_sink);
1379 g_object_set_qdata ((GObject *) (res), PAD_CONTEXT, ctx);
/* Second context reference, this one for the input probe */
1381 mq_stream_ctx_ref (ctx);
1382 ctx->sink_pad_block_id =
1383 gst_pad_add_probe (mq_sink, GST_PAD_PROBE_TYPE_DATA_DOWNSTREAM,
1384 (GstPadProbeCallback) handle_mq_input, ctx, (GDestroyNotify)
1385 _pad_block_destroy_sink_notify);
1387 GST_DEBUG_OBJECT (splitmux, "Request pad %" GST_PTR_FORMAT
1388 " is mq pad %" GST_PTR_FORMAT, res, mq_sink);
1390 splitmux->contexts = g_list_prepend (splitmux->contexts, ctx);
1394 gst_object_unref (mq_sink);
1395 gst_object_unref (mq_src);
1397 gst_pad_set_active (res, TRUE);
1398 gst_element_add_pad (element, res);
1399 GST_SPLITMUX_UNLOCK (splitmux);
1403 GST_SPLITMUX_UNLOCK (splitmux);
/* Release-pad handler: tears down one input stream.
 *
 * @element: the splitmuxsink
 * @pad:     the ghost pad previously handed out as a request pad
 *
 * The stream context is read from the pad's PAD_CONTEXT qdata. With
 * the splitmux lock held, and only if the muxer and multiqueue exist,
 * it resolves the multiqueue sink/src pads and the peer muxer pad,
 * removes the context from splitmux->contexts, removes both probes,
 * drops a context reference, releases the multiqueue and muxer request
 * pads and removes the ghost pad from the element. Once no contexts
 * remain, the internal elements are reset. */
1408 gst_splitmux_sink_release_pad (GstElement * element, GstPad * pad)
1410 GstSplitMuxSink *splitmux = (GstSplitMuxSink *) element;
1411 GstPad *mqsink, *mqsrc, *muxpad;
1413 (MqStreamCtx *) (g_object_get_qdata ((GObject *) (pad), PAD_CONTEXT));
1415 GST_SPLITMUX_LOCK (splitmux);
1417 if (splitmux->muxer == NULL || splitmux->mq == NULL)
1418 goto fail; /* Elements don't exist yet - nothing to release */
1420 GST_INFO_OBJECT (pad, "releasing request pad");
/* Resolve the internal pads behind the ghost pad: multiqueue sink,
 * the matching multiqueue src, and the muxer pad linked to it */
1422 mqsink = gst_ghost_pad_get_target (GST_GHOST_PAD (pad));
1423 mqsrc = mq_sink_to_src (splitmux->mq, mqsink);
1424 muxpad = gst_pad_get_peer (mqsrc);
1426 /* Remove the context from our consideration */
1427 splitmux->contexts = g_list_remove (splitmux->contexts, ctx);
1429 if (ctx->sink_pad_block_id)
1430 gst_pad_remove_probe (ctx->sinkpad, ctx->sink_pad_block_id);
1432 if (ctx->src_pad_block_id)
1433 gst_pad_remove_probe (ctx->srcpad, ctx->src_pad_block_id);
1435 /* Can release the context now */
1436 mq_stream_ctx_unref (ctx);
1438 /* Release and free the mq input */
1439 gst_element_release_request_pad (splitmux->mq, mqsink);
1441 /* Release and free the muxer input */
1442 gst_element_release_request_pad (splitmux->muxer, muxpad);
/* Drop the pad references obtained at the top of the function */
1444 gst_object_unref (mqsink);
1445 gst_object_unref (mqsrc);
1446 gst_object_unref (muxpad);
1448 gst_element_remove_pad (element, pad);
1450 /* Reset the internal elements only after all request pads are released */
1451 if (splitmux->contexts == NULL)
1452 gst_splitmux_reset (splitmux);
1455 GST_SPLITMUX_UNLOCK (splitmux);
/* Creates an element from the given factory and adds it to the
 * splitmuxsink bin.
 *
 * @splitmux: the bin the new element is added to
 * @factory:  element factory name
 * @name:     name for the new element, also used in the warnings
 *
 * A warning is emitted if the element cannot be created or cannot be
 * added to the bin; in the latter case the element is unreffed.
 * NOTE(review): the return statements are not visible in this excerpt;
 * callers compare the result against NULL to detect failure. */
1459 create_element (GstSplitMuxSink * splitmux,
1460 const gchar * factory, const gchar * name)
1462 GstElement *ret = gst_element_factory_make (factory, name);
1464 g_warning ("Failed to create %s - splitmuxsink will not work", name);
1468 if (!gst_bin_add (GST_BIN (splitmux), ret)) {
1469 g_warning ("Could not add %s element - splitmuxsink will not work", name);
1470 gst_object_unref (ret);
/* Makes sure the internal multiqueue and muxer exist, creating
 * whichever is still NULL.
 *
 * @splitmux: the splitmuxsink
 *
 * The multiqueue starts with a limit of 5 buffers and no byte or time
 * limit. The muxer is the user-provided one when set (added to the bin
 * here), otherwise an "mp4mux" instance.
 * NOTE(review): the return statements are not visible in this excerpt;
 * callers treat the result as a success boolean. */
1478 create_elements (GstSplitMuxSink * splitmux)
1480 /* Create internal elements */
1481 if (splitmux->mq == NULL) {
1483 create_element (splitmux, "multiqueue", "multiqueue")) == NULL)
1486 splitmux->mq_max_buffers = 5;
1487 /* No bytes or time limit, we limit buffers manually */
1488 g_object_set (splitmux->mq, "max-size-bytes", 0, "max-size-time",
1489 (guint64) 0, "max-size-buffers", splitmux->mq_max_buffers, NULL);
1492 if (splitmux->muxer == NULL) {
1493 GstElement *provided_muxer = NULL;
/* Take a temporary reference to the provided muxer under the object
 * lock before working with it */
1495 GST_OBJECT_LOCK (splitmux);
1496 if (splitmux->provided_muxer != NULL)
1497 provided_muxer = gst_object_ref (splitmux->provided_muxer);
1498 GST_OBJECT_UNLOCK (splitmux);
1500 if (provided_muxer == NULL) {
1501 if ((splitmux->muxer =
1502 create_element (splitmux, "mp4mux", "muxer")) == NULL)
1505 if (!gst_bin_add (GST_BIN (splitmux), provided_muxer)) {
1506 g_warning ("Could not add muxer element - splitmuxsink will not work");
1507 gst_object_unref (provided_muxer);
/* The muxer was added to the bin above, so the temporary reference can
 * be dropped */
1511 splitmux->muxer = provided_muxer;
1512 gst_object_unref (provided_muxer);
/* Looks inside a bin for a sink element that has a "location"
 * property.
 *
 * @e: element to inspect; iterated with gst_bin_iterate_sinks() when
 *     it is a bin
 *
 * An iterator error is treated as unreachable (assertion).
 * NOTE(review): what is returned for a non-bin element, and what is
 * stored in res for a matching child, is not visible in this excerpt. */
1522 find_sink (GstElement * e)
1524 GstElement *res = NULL;
1526 gboolean done = FALSE;
1527 GValue data = { 0, };
1529 if (!GST_IS_BIN (e))
1532 iter = gst_bin_iterate_sinks (GST_BIN (e));
1534 switch (gst_iterator_next (iter, &data)) {
1535 case GST_ITERATOR_OK:
1537 GstElement *child = g_value_get_object (&data);
/* A candidate is any sink child whose class has a "location" property */
1538 if (g_object_class_find_property (G_OBJECT_GET_CLASS (child),
1539 "location") != NULL) {
1543 g_value_reset (&data);
1546 case GST_ITERATOR_RESYNC:
1547 gst_iterator_resync (iter);
1549 case GST_ITERATOR_DONE:
1552 case GST_ITERATOR_ERROR:
1553 g_assert_not_reached ();
1557 g_value_unset (&data);
1558 gst_iterator_free (iter);
/* Makes sure an active sink exists and is linked after the muxer.
 *
 * @splitmux: the splitmuxsink
 *
 * Without a user-provided sink, a DEFAULT_SINK element is created and
 * used both as sink and as active sink. A provided sink is added to
 * the bin and becomes the active sink; the element that receives the
 * file location is then located inside it with find_sink(). Finally
 * the muxer is linked to the active sink. Each failure emits a
 * warning.
 * NOTE(review): the return statements are not visible in this excerpt;
 * the caller treats the result as a success boolean. */
1564 create_sink (GstSplitMuxSink * splitmux)
1566 GstElement *provided_sink = NULL;
1568 if (splitmux->active_sink == NULL) {
/* Take a temporary reference to the provided sink under the object lock */
1570 GST_OBJECT_LOCK (splitmux);
1571 if (splitmux->provided_sink != NULL)
1572 provided_sink = gst_object_ref (splitmux->provided_sink);
1573 GST_OBJECT_UNLOCK (splitmux);
1575 if (provided_sink == NULL) {
1576 if ((splitmux->sink =
1577 create_element (splitmux, DEFAULT_SINK, "sink")) == NULL)
1579 splitmux->active_sink = splitmux->sink;
1581 if (!gst_bin_add (GST_BIN (splitmux), provided_sink)) {
1582 g_warning ("Could not add sink elements - splitmuxsink will not work");
1583 gst_object_unref (provided_sink);
1587 splitmux->active_sink = provided_sink;
1589 /* The bin holds a ref now, we can drop our tmp ref */
1590 gst_object_unref (provided_sink);
1592 /* Find the sink element */
1593 splitmux->sink = find_sink (splitmux->active_sink);
1594 if (splitmux->sink == NULL) {
1596 ("Could not locate sink element in provided sink - splitmuxsink will not work");
1601 if (!gst_element_link (splitmux->muxer, splitmux->active_sink)) {
1602 g_warning ("Failed to link muxer and sink- splitmuxsink will not work");
1613 #pragma GCC diagnostic ignored "-Wformat-nonliteral"
/* Computes the file name for the next fragment and sets it on the sink.
 *
 * @splitmux: the splitmuxsink
 *
 * After applying the max-files wrap-around, the format-location signal
 * is emitted with the current fragment id so that a handler can supply
 * the name. The name can also be built by using splitmux->location as
 * a printf-style format with the fragment id (NULL when no location is
 * set). The result is written to the sink's "location" property and
 * the fragment id is incremented.
 * NOTE(review): the conditions selecting between the signal result and
 * the location pattern are not visible in this excerpt. */
1616 set_next_filename (GstSplitMuxSink * splitmux)
1618 gchar *fname = NULL;
1619 gst_splitmux_sink_ensure_max_files (splitmux);
1621 g_signal_emit (splitmux, signals[SIGNAL_FORMAT_LOCATION], 0,
1622 splitmux->fragment_id, &fname);
1625 fname = splitmux->location ?
1626 g_strdup_printf (splitmux->location, splitmux->fragment_id) : NULL;
1629 GST_INFO_OBJECT (splitmux, "Setting file to %s", fname);
1630 g_object_set (splitmux->sink, "location", fname, NULL);
1633 splitmux->fragment_id++;
/* State-change handler of the splitmuxsink.
 *
 * @element:    the splitmuxsink
 * @transition: the state transition being performed
 *
 * Before chaining up to the parent class:
 *  - NULL->READY creates the internal elements and the sink (failing
 *    the transition if that is not possible), resets the fragment id
 *    and sets the first file name;
 *  - READY->PAUSED resets the collection state and the time and byte
 *    counters under the splitmux lock;
 *  - PAUSED->READY and READY->NULL mark the element stopped and wake
 *    up any waiting threads.
 * After the parent handler succeeded, READY->NULL resets the fragment
 * id and, when no pad contexts remain, the internal elements. A failed
 * NULL->READY transition also resets the internal elements.
 *
 * Returns the GstStateChangeReturn of the transition. */
1637 static GstStateChangeReturn
1638 gst_splitmux_sink_change_state (GstElement * element, GstStateChange transition)
1640 GstStateChangeReturn ret;
1641 GstSplitMuxSink *splitmux = (GstSplitMuxSink *) element;
1643 switch (transition) {
1644 case GST_STATE_CHANGE_NULL_TO_READY:{
1645 GST_SPLITMUX_LOCK (splitmux);
1646 if (!create_elements (splitmux) || !create_sink (splitmux)) {
1647 ret = GST_STATE_CHANGE_FAILURE;
1648 GST_SPLITMUX_UNLOCK (splitmux);
1651 GST_SPLITMUX_UNLOCK (splitmux);
/* Fragment numbering restarts at 0 and the first file name is set */
1652 splitmux->fragment_id = 0;
1653 set_next_filename (splitmux);
1656 case GST_STATE_CHANGE_READY_TO_PAUSED:{
1657 GST_SPLITMUX_LOCK (splitmux);
1658 /* Start by collecting one input on each pad */
1659 splitmux->state = SPLITMUX_STATE_COLLECTING_GOP_START;
1660 splitmux->max_in_running_time = GST_CLOCK_STIME_NONE;
1661 splitmux->muxed_out_time = splitmux->mux_start_time =
1662 GST_CLOCK_STIME_NONE;
1663 splitmux->muxed_out_bytes = splitmux->mux_start_bytes = 0;
1664 splitmux->opening_first_fragment = TRUE;
1665 GST_SPLITMUX_UNLOCK (splitmux);
1668 case GST_STATE_CHANGE_PAUSED_TO_READY:
1669 case GST_STATE_CHANGE_READY_TO_NULL:
1670 GST_SPLITMUX_LOCK (splitmux);
1671 splitmux->state = SPLITMUX_STATE_STOPPED;
1672 /* Wake up any blocked threads */
1673 GST_LOG_OBJECT (splitmux,
1674 "State change -> NULL or READY. Waking threads");
1675 GST_SPLITMUX_BROADCAST (splitmux);
1676 GST_SPLITMUX_UNLOCK (splitmux);
/* Let the parent class perform the actual state change */
1682 ret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
1683 if (ret == GST_STATE_CHANGE_FAILURE)
1686 switch (transition) {
1687 case GST_STATE_CHANGE_READY_TO_NULL:
1688 GST_SPLITMUX_LOCK (splitmux);
1689 splitmux->fragment_id = 0;
1690 /* Reset internal elements only if no pad contexts are using them */
1691 if (splitmux->contexts == NULL)
1692 gst_splitmux_reset (splitmux);
1693 GST_SPLITMUX_UNLOCK (splitmux);
1701 if (transition == GST_STATE_CHANGE_NULL_TO_READY &&
1702 ret == GST_STATE_CHANGE_FAILURE) {
1703 /* Cleanup elements on failed transition out of NULL */
1704 gst_splitmux_reset (splitmux);
/* Registers the "splitmuxsink" element with the given plugin.
 *
 * @plugin: the plugin the element is registered for
 *
 * Initialises the "splitmuxsink" debug category and then registers
 * GST_TYPE_SPLITMUX_SINK with rank GST_RANK_NONE.
 * Returns the result of gst_element_register(). */
1710 register_splitmuxsink (GstPlugin * plugin)
1712 GST_DEBUG_CATEGORY_INIT (splitmux_debug, "splitmuxsink", 0,
1713 "Split File Muxing Sink");
1715 return gst_element_register (plugin, "splitmuxsink", GST_RANK_NONE,
1716 GST_TYPE_SPLITMUX_SINK);
1720 gst_splitmux_sink_ensure_max_files (GstSplitMuxSink * splitmux)
1722 if (splitmux->max_files && splitmux->fragment_id >= splitmux->max_files) {
1723 splitmux->fragment_id = 0;