1 /* GStreamer Muxer bin that splits output stream by size/time
2 * Copyright (C) <2014> Jan Schmidt <jan@centricular.com>
4 * This library is free software; you can redistribute it and/or
5 * modify it under the terms of the GNU Library General Public
6 * License as published by the Free Software Foundation; either
7 * version 2 of the License, or (at your option) any later version.
9 * This library is distributed in the hope that it will be useful,
10 * but WITHOUT ANY WARRANTY; without even the implied warranty of
11 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
12 * Library General Public License for more details.
14 * You should have received a copy of the GNU Library General Public
15 * License along with this library; if not, write to the
16 * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
17 * Boston, MA 02110-1301, USA.
21 * SECTION:element-splitmuxsink
22 * @short_description: Muxer wrapper for splitting output stream by size or time
24 * This element wraps a muxer and a sink, and starts a new file when the mux
25 * contents are about to cross a threshold of maximum size or maximum time,
26 * splitting at video keyframe boundaries. Exactly one input video stream
27 * can be muxed, with as many accompanying audio and subtitle streams as
30 * By default, it uses mp4mux and filesink, but they can be changed via
31 * the 'muxer' and 'sink' properties.
33 * The minimum file size is 1 GOP, however - so limits may be overrun if the
34 * distance between any 2 keyframes is larger than the limits.
36 * If a video stream is available, the splitting process is driven by the video
37 * stream contents, and the video stream must contain closed GOPs for the output
38 * file parts to be played individually correctly. In the absence of a video
39 * stream, the first available stream is used as reference for synchronization.
42 * <title>Example pipelines</title>
44 * gst-launch-1.0 -e v4l2src num-buffers=500 ! video/x-raw,width=320,height=240 ! videoconvert ! queue ! timeoverlay ! x264enc key-int-max=10 ! h264parse ! splitmuxsink location=video%02d.mov max-size-time=10000000000 max-size-bytes=1000000
46 * Records a video stream captured from a v4l2 device and muxes it into
47 * ISO mp4 files, splitting as needed to limit size/duration to 10 seconds
48 * and 1MB maximum size.
57 #include <glib/gstdio.h>
58 #include <gst/video/video.h>
59 #include "gstsplitmuxsink.h"
// Debug category that the GST_*_OBJECT logging calls in this file report to.
61 GST_DEBUG_CATEGORY_STATIC (splitmux_debug);
62 #define GST_CAT_DEFAULT splitmux_debug
// Wrappers around the element's own mutex (lock) and condition variable
// (data_cond). GST_SPLITMUX_WAIT has to be called with the lock held, because
// g_cond_wait releases the mutex while sleeping and re-acquires it on wake-up.
64 #define GST_SPLITMUX_LOCK(s) g_mutex_lock(&(s)->lock)
65 #define GST_SPLITMUX_UNLOCK(s) g_mutex_unlock(&(s)->lock)
66 #define GST_SPLITMUX_WAIT(s) g_cond_wait (&(s)->data_cond, &(s)->lock)
67 #define GST_SPLITMUX_BROADCAST(s) g_cond_broadcast (&(s)->data_cond)
// One member of the property-id enumeration (the rest of the enum is not
// visible in this copy of the file).
75 PROP_SEND_KEYFRAME_REQUESTS,
// Property defaults applied in gst_splitmux_sink_init(). The property
// descriptions registered in class_init document 0 as "disable" for the
// time and byte limits.
82 #define DEFAULT_MAX_SIZE_TIME 0
83 #define DEFAULT_MAX_SIZE_BYTES 0
84 #define DEFAULT_MAX_FILES 0
85 #define DEFAULT_MUXER_OVERHEAD 0.02
86 #define DEFAULT_SEND_KEYFRAME_REQUESTS FALSE
// Factory names named as defaults in the "muxer" and "sink" property text.
87 #define DEFAULT_MUXER "mp4mux"
88 #define DEFAULT_SINK "filesink"
// Signal id for "format-location" and the table holding registered ids.
92 SIGNAL_FORMAT_LOCATION,
96 static guint signals[SIGNAL_LAST];
// Pad templates "video", "audio_%u" and "subtitle_%u", all accepting ANY
// caps. NOTE(review): direction/presence arguments are not visible here.
98 static GstStaticPadTemplate video_sink_template =
99 GST_STATIC_PAD_TEMPLATE ("video",
102 GST_STATIC_CAPS_ANY);
103 static GstStaticPadTemplate audio_sink_template =
104 GST_STATIC_PAD_TEMPLATE ("audio_%u",
107 GST_STATIC_CAPS_ANY);
108 static GstStaticPadTemplate subtitle_sink_template =
109 GST_STATIC_PAD_TEMPLATE ("subtitle_%u",
112 GST_STATIC_CAPS_ANY);
// Quark for the "pad-context" key; the function that performs the
// assignment below is not visible in this copy.
114 static GQuark PAD_CONTEXT;
119 PAD_CONTEXT = g_quark_from_static_string ("pad-context");
// GObject type boilerplate: parent_class is aliased to the name the
// G_DEFINE_TYPE machinery generates, and the type derives from GST_TYPE_BIN.
// NOTE(review): the trailing arguments of G_DEFINE_TYPE_EXTENDED are not
// visible in this copy of the file.
122 #define gst_splitmux_sink_parent_class parent_class
123 G_DEFINE_TYPE_EXTENDED (GstSplitMuxSink, gst_splitmux_sink, GST_TYPE_BIN, 0,
// Forward declarations of file-local functions.
126 static gboolean create_elements (GstSplitMuxSink * splitmux);
127 static gboolean create_sink (GstSplitMuxSink * splitmux);
// GObject property accessors and teardown hooks (installed in class_init).
128 static void gst_splitmux_sink_set_property (GObject * object, guint prop_id,
129 const GValue * value, GParamSpec * pspec);
130 static void gst_splitmux_sink_get_property (GObject * object, guint prop_id,
131 GValue * value, GParamSpec * pspec);
132 static void gst_splitmux_sink_dispose (GObject * object);
133 static void gst_splitmux_sink_finalize (GObject * object);
// GstElement virtual methods (installed in class_init).
135 static GstPad *gst_splitmux_sink_request_new_pad (GstElement * element,
136 GstPadTemplate * templ, const gchar * name, const GstCaps * caps);
137 static void gst_splitmux_sink_release_pad (GstElement * element, GstPad * pad);
139 static GstStateChangeReturn gst_splitmux_sink_change_state (GstElement *
140 element, GstStateChange transition);
// Internal helpers for bus handling, fragment switching and queue sizing.
142 static void bus_handler (GstBin * bin, GstMessage * msg);
143 static void set_next_filename (GstSplitMuxSink * splitmux);
144 static void start_next_fragment (GstSplitMuxSink * splitmux);
145 static void check_queue_length (GstSplitMuxSink * splitmux, MqStreamCtx * ctx);
146 static void mq_stream_ctx_unref (MqStreamCtx * ctx);
148 static void gst_splitmux_sink_ensure_max_files (GstSplitMuxSink * splitmux);
// Allocate a zero-initialised MqStreamBuf from the GLib slice allocator.
// The matching release function is mq_stream_buf_free().
151 mq_stream_buf_new (void)
153 return g_slice_new0 (MqStreamBuf);
// Return an MqStreamBuf to the GLib slice allocator. Elsewhere in this file
// it is also passed (cast to GFunc) to g_queue_foreach() to drain a
// context's queued_bufs.
157 mq_stream_buf_free (MqStreamBuf * data)
159 g_slice_free (MqStreamBuf, data);
// Class initialiser: installs the GObject/GstElement/GstBin virtual methods,
// registers element metadata and pad templates, installs all properties and
// creates the "format-location" signal.
163 gst_splitmux_sink_class_init (GstSplitMuxSinkClass * klass)
165 GObjectClass *gobject_class = (GObjectClass *) klass;
166 GstElementClass *gstelement_class = (GstElementClass *) klass;
167 GstBinClass *gstbin_class = (GstBinClass *) klass;
// GObject hooks: property access and teardown.
169 gobject_class->set_property = gst_splitmux_sink_set_property;
170 gobject_class->get_property = gst_splitmux_sink_get_property;
171 gobject_class->dispose = gst_splitmux_sink_dispose;
172 gobject_class->finalize = gst_splitmux_sink_finalize;
174 gst_element_class_set_static_metadata (gstelement_class,
175 "Split Muxing Bin", "Generic/Bin/Muxer",
176 "Convenience bin that muxes incoming streams into multiple time/size limited files",
177 "Jan Schmidt <jan@centricular.com>");
// One template each for the video, audio and subtitle input pads.
179 gst_element_class_add_static_pad_template (gstelement_class,
180 &video_sink_template);
181 gst_element_class_add_static_pad_template (gstelement_class,
182 &audio_sink_template);
183 gst_element_class_add_static_pad_template (gstelement_class,
184 &subtitle_sink_template);
// GstElement hooks: state changes and request-pad management.
186 gstelement_class->change_state =
187 GST_DEBUG_FUNCPTR (gst_splitmux_sink_change_state);
188 gstelement_class->request_new_pad =
189 GST_DEBUG_FUNCPTR (gst_splitmux_sink_request_new_pad);
190 gstelement_class->release_pad =
191 GST_DEBUG_FUNCPTR (gst_splitmux_sink_release_pad);
// Intercept messages posted by child elements (see bus_handler).
193 gstbin_class->handle_message = bus_handler;
// "location": printf-style pattern for the output file names.
195 g_object_class_install_property (gobject_class, PROP_LOCATION,
196 g_param_spec_string ("location", "File Output Pattern",
197 "Format string pattern for the location of the files to write (e.g. video%05d.mp4)",
198 NULL, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
// "mux-overhead": fraction in [0.0, 1.0] added to the byte estimate.
199 g_object_class_install_property (gobject_class, PROP_MUXER_OVERHEAD,
200 g_param_spec_double ("mux-overhead", "Muxing Overhead",
201 "Extra size overhead of muxing (0.02 = 2%)", 0.0, 1.0,
202 DEFAULT_MUXER_OVERHEAD,
203 G_PARAM_READWRITE | GST_PARAM_CONTROLLABLE | G_PARAM_STATIC_STRINGS));
// "max-size-time" / "max-size-bytes": per-file limits, 0 disables each.
205 g_object_class_install_property (gobject_class, PROP_MAX_SIZE_TIME,
206 g_param_spec_uint64 ("max-size-time", "Max. size (ns)",
207 "Max. amount of time per file (in ns, 0=disable)", 0, G_MAXUINT64,
208 DEFAULT_MAX_SIZE_TIME, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
209 g_object_class_install_property (gobject_class, PROP_MAX_SIZE_BYTES,
210 g_param_spec_uint64 ("max-size-bytes", "Max. size bytes",
211 "Max. amount of data per file (in bytes, 0=disable)", 0, G_MAXUINT64,
212 DEFAULT_MAX_SIZE_BYTES, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
// "send-keyframe-requests": see request_next_keyframe() for the conditions
// under which a request is actually sent.
213 g_object_class_install_property (gobject_class, PROP_SEND_KEYFRAME_REQUESTS,
214 g_param_spec_boolean ("send-keyframe-requests",
215 "Request keyframes at max-size-time",
216 "Request a keyframe every max-size-time ns to try splitting at that point. "
217 "Needs max-size-bytes to be 0 in order to be effective.",
218 DEFAULT_SEND_KEYFRAME_REQUESTS,
219 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
// "max-files": the two adjacent string literals are concatenated by the
// compiler, so the first one must end with a space to keep the sentence
// readable ("reached, old files ...").
220 g_object_class_install_property (gobject_class, PROP_MAX_FILES,
221 g_param_spec_uint ("max-files", "Max files",
222 "Maximum number of files to keep on disk. Once the maximum is reached, "
223 "old files start to be deleted to make room for new ones.", 0,
224 G_MAXUINT, DEFAULT_MAX_FILES,
225 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
// "muxer" / "sink": optional caller-supplied elements.
228 g_object_class_install_property (gobject_class, PROP_MUXER,
229 g_param_spec_object ("muxer", "Muxer",
230 "The muxer element to use (NULL = default mp4mux)",
231 GST_TYPE_ELEMENT, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
232 g_object_class_install_property (gobject_class, PROP_SINK,
233 g_param_spec_object ("sink", "Sink",
234 "The sink element (or element chain) to use (NULL = default filesink)",
235 GST_TYPE_ELEMENT, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
238 * GstSplitMuxSink::format-location:
239 * @splitmux: the #GstSplitMuxSink
240 * @fragment_id: the sequence number of the file to be created
242 * Returns: the location to be used for the next output file
// Signal takes one guint argument and returns a string.
244 signals[SIGNAL_FORMAT_LOCATION] =
245 g_signal_new ("format-location", G_TYPE_FROM_CLASS (klass),
246 G_SIGNAL_RUN_LAST, 0, NULL, NULL, NULL, G_TYPE_STRING, 1, G_TYPE_UINT);
// Instance initialiser: sets up the lock and condition variable used by the
// GST_SPLITMUX_* macros, applies the DEFAULT_* property values and flags the
// element as a sink.
250 gst_splitmux_sink_init (GstSplitMuxSink * splitmux)
252 g_mutex_init (&splitmux->lock);
253 g_cond_init (&splitmux->data_cond);
// Thresholds of 0 mean the corresponding limit is disabled (see the
// property descriptions in class_init).
255 splitmux->mux_overhead = DEFAULT_MUXER_OVERHEAD;
256 splitmux->threshold_time = DEFAULT_MAX_SIZE_TIME;
257 splitmux->threshold_bytes = DEFAULT_MAX_SIZE_BYTES;
258 splitmux->max_files = DEFAULT_MAX_FILES;
259 splitmux->send_keyframe_requests = DEFAULT_SEND_KEYFRAME_REQUESTS;
261 GST_OBJECT_FLAG_SET (splitmux, GST_ELEMENT_FLAG_SINK);
// Remove the internal multiqueue, muxer and active sink from the bin and
// clear the cached child pointers.
// NOTE(review): only the guard on active_sink is visible in this copy; any
// guards on mq/muxer and the right-hand side of the final assignment are not
// shown - confirm against the full file.
265 gst_splitmux_reset (GstSplitMuxSink * splitmux)
268 gst_bin_remove (GST_BIN (splitmux), splitmux->mq);
270 gst_bin_remove (GST_BIN (splitmux), splitmux->muxer);
271 if (splitmux->active_sink)
272 gst_bin_remove (GST_BIN (splitmux), splitmux->active_sink);
274 splitmux->sink = splitmux->active_sink = splitmux->muxer = splitmux->mq =
// GObject dispose: chains up to the parent class first, then resets the
// cached child element pointers, which the parent dispose has invalidated.
// NOTE(review): the value assigned on the last line is not visible here.
279 gst_splitmux_sink_dispose (GObject * object)
281 GstSplitMuxSink *splitmux = GST_SPLITMUX_SINK (object);
283 G_OBJECT_CLASS (parent_class)->dispose (object);
285 /* Calling parent dispose invalidates all child pointers */
286 splitmux->sink = splitmux->active_sink = splitmux->muxer = splitmux->mq =
// GObject finalize: releases everything the instance still owns - the
// condition variable and mutex, the caller-provided sink/muxer references,
// the location string and any remaining stream contexts - then chains up.
291 gst_splitmux_sink_finalize (GObject * object)
293 GstSplitMuxSink *splitmux = GST_SPLITMUX_SINK (object);
294 g_cond_clear (&splitmux->data_cond);
295 g_mutex_clear (&splitmux->lock);
// References taken with g_value_dup_object() in set_property.
296 if (splitmux->provided_sink)
297 gst_object_unref (splitmux->provided_sink);
298 if (splitmux->provided_muxer)
299 gst_object_unref (splitmux->provided_muxer);
301 g_free (splitmux->location);
303 /* Make sure to free any un-released contexts */
// Drops one reference per context, then frees the list nodes themselves.
304 g_list_foreach (splitmux->contexts, (GFunc) mq_stream_ctx_unref, NULL);
305 g_list_free (splitmux->contexts);
307 G_OBJECT_CLASS (parent_class)->finalize (object);
// GObject property setter. Every visible branch stores the new value while
// holding the GstObject lock. Unknown ids are reported through
// G_OBJECT_WARN_INVALID_PROPERTY_ID.
// NOTE(review): the switch statement, several case labels and the break
// statements are not visible in this copy; the per-branch notes below
// identify branches by the field they write.
311 gst_splitmux_sink_set_property (GObject * object, guint prop_id,
312 const GValue * value, GParamSpec * pspec)
314 GstSplitMuxSink *splitmux = GST_SPLITMUX_SINK (object);
// location: free the old pattern and keep a private copy of the new one.
318 GST_OBJECT_LOCK (splitmux);
319 g_free (splitmux->location);
320 splitmux->location = g_value_dup_string (value);
321 GST_OBJECT_UNLOCK (splitmux);
324 case PROP_MAX_SIZE_BYTES:
325 GST_OBJECT_LOCK (splitmux);
326 splitmux->threshold_bytes = g_value_get_uint64 (value);
327 GST_OBJECT_UNLOCK (splitmux);
329 case PROP_MAX_SIZE_TIME:
330 GST_OBJECT_LOCK (splitmux);
331 splitmux->threshold_time = g_value_get_uint64 (value);
332 GST_OBJECT_UNLOCK (splitmux);
334 case PROP_SEND_KEYFRAME_REQUESTS:
335 GST_OBJECT_LOCK (splitmux);
336 splitmux->send_keyframe_requests = g_value_get_boolean (value);
337 GST_OBJECT_UNLOCK (splitmux);
// max_files branch.
340 GST_OBJECT_LOCK (splitmux);
341 splitmux->max_files = g_value_get_uint (value);
342 GST_OBJECT_UNLOCK (splitmux);
344 case PROP_MUXER_OVERHEAD:
345 GST_OBJECT_LOCK (splitmux);
346 splitmux->mux_overhead = g_value_get_double (value);
347 GST_OBJECT_UNLOCK (splitmux);
// provided_sink: drop the previous reference, take a new one on the value.
350 GST_OBJECT_LOCK (splitmux);
351 if (splitmux->provided_sink)
352 gst_object_unref (splitmux->provided_sink);
353 splitmux->provided_sink = g_value_dup_object (value);
354 GST_OBJECT_UNLOCK (splitmux);
// provided_muxer: same ownership handling as provided_sink.
357 GST_OBJECT_LOCK (splitmux);
358 if (splitmux->provided_muxer)
359 gst_object_unref (splitmux->provided_muxer);
360 splitmux->provided_muxer = g_value_dup_object (value);
361 GST_OBJECT_UNLOCK (splitmux);
364 G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
// GObject property getter. Every visible branch copies the stored value into
// the GValue while holding the GstObject lock. Unknown ids are reported
// through G_OBJECT_WARN_INVALID_PROPERTY_ID.
// NOTE(review): the switch statement, several case labels and the break
// statements are not visible in this copy; unlabeled branches are identified
// below by the field they read.
370 gst_splitmux_sink_get_property (GObject * object, guint prop_id,
371 GValue * value, GParamSpec * pspec)
373 GstSplitMuxSink *splitmux = GST_SPLITMUX_SINK (object);
// location branch; g_value_set_string copies the string.
377 GST_OBJECT_LOCK (splitmux);
378 g_value_set_string (value, splitmux->location);
379 GST_OBJECT_UNLOCK (splitmux);
381 case PROP_MAX_SIZE_BYTES:
382 GST_OBJECT_LOCK (splitmux);
383 g_value_set_uint64 (value, splitmux->threshold_bytes);
384 GST_OBJECT_UNLOCK (splitmux);
386 case PROP_MAX_SIZE_TIME:
387 GST_OBJECT_LOCK (splitmux);
388 g_value_set_uint64 (value, splitmux->threshold_time);
389 GST_OBJECT_UNLOCK (splitmux);
391 case PROP_SEND_KEYFRAME_REQUESTS:
392 GST_OBJECT_LOCK (splitmux);
393 g_value_set_boolean (value, splitmux->send_keyframe_requests);
394 GST_OBJECT_UNLOCK (splitmux);
// max_files branch.
397 GST_OBJECT_LOCK (splitmux);
398 g_value_set_uint (value, splitmux->max_files);
399 GST_OBJECT_UNLOCK (splitmux);
401 case PROP_MUXER_OVERHEAD:
402 GST_OBJECT_LOCK (splitmux);
403 g_value_set_double (value, splitmux->mux_overhead);
404 GST_OBJECT_UNLOCK (splitmux);
// provided_sink branch: reports the caller-supplied element.
407 GST_OBJECT_LOCK (splitmux);
408 g_value_set_object (value, splitmux->provided_sink);
409 GST_OBJECT_UNLOCK (splitmux);
// provided_muxer branch: reports the caller-supplied element.
412 GST_OBJECT_LOCK (splitmux);
413 g_value_set_object (value, splitmux->provided_muxer);
414 GST_OBJECT_UNLOCK (splitmux);
417 G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
422 /* Convenience function */
// Converts the timestamp val to a signed running time within segment.
// The result starts out as GST_CLOCK_STIME_NONE, and the conversion is only
// attempted when val is a valid clock time.
// NOTE(review): how the sign reported by gst_segment_to_running_time_full()
// is folded into res, and the return statement, are not visible here.
423 static inline GstClockTimeDiff
424 my_segment_to_running_time (GstSegment * segment, GstClockTime val)
426 GstClockTimeDiff res = GST_CLOCK_STIME_NONE;
428 if (GST_CLOCK_TIME_IS_VALID (val)) {
// val is overwritten in place with the absolute running time.
430 gst_segment_to_running_time_full (segment, GST_FORMAT_TIME, val, &val);
// Given a sink pad of the multiqueue mq, look up the static pad on mq whose
// name is "src_" followed by tmp.
// NOTE(review): the assignment of tmp (presumably the suffix of sinkname)
// and the cleanup of sinkname/srcname are not visible in this copy.
// gst_pad_get_name() and g_strdup_printf() both return allocated strings.
440 mq_sink_to_src (GstElement * mq, GstPad * sink_pad)
442 gchar *tmp, *sinkname, *srcname;
445 sinkname = gst_pad_get_name (sink_pad);
447 srcname = g_strdup_printf ("src_%s", tmp);
449 mq_src = gst_element_get_static_pad (mq, srcname);
// Requests a new "sink_%u" pad from the internal multiqueue and finds the
// matching source pad via mq_sink_to_src().
// NOTE(review): the remaining parameters, the result handling and the
// condition under which the request pad is released again (presumably a
// failure path) are not visible in this copy.
458 get_pads_from_mq (GstSplitMuxSink * splitmux, GstPad ** sink_pad,
464 /* Request a pad from multiqueue, then connect this one, then
465 * discover the corresponding output pad and return both */
466 mq_sink = gst_element_get_request_pad (splitmux->mq, "sink_%u");
470 mq_src = mq_sink_to_src (splitmux->mq, mq_sink);
480 gst_element_release_request_pad (splitmux->mq, mq_sink);
// Creates a per-stream context for splitmux with an initial reference count
// of 1. The context is zero-allocated, then both segments are initialised
// with an undefined format, both running times are set to
// GST_CLOCK_STIME_NONE and the queue of buffer records is initialised empty.
// Note that ctx->splitmux is stored as a plain pointer; no reference is
// taken on the element here.
485 mq_stream_ctx_new (GstSplitMuxSink * splitmux)
489 ctx = g_new0 (MqStreamCtx, 1);
490 g_atomic_int_set (&ctx->refcount, 1);
491 ctx->splitmux = splitmux;
492 gst_segment_init (&ctx->in_segment, GST_FORMAT_UNDEFINED);
493 gst_segment_init (&ctx->out_segment, GST_FORMAT_UNDEFINED);
494 ctx->in_running_time = ctx->out_running_time = GST_CLOCK_STIME_NONE;
495 g_queue_init (&ctx->queued_bufs);
// Releases a stream context's queued buffer records: each MqStreamBuf is
// freed, then the queue itself is cleared.
// NOTE(review): the release of the ctx structure itself is not visible in
// this copy of the file.
500 mq_stream_ctx_free (MqStreamCtx * ctx)
502 g_queue_foreach (&ctx->queued_bufs, (GFunc) mq_stream_buf_free, NULL);
503 g_queue_clear (&ctx->queued_bufs);
// Atomically drops one reference on ctx and frees it via
// mq_stream_ctx_free() when the count reaches zero.
508 mq_stream_ctx_unref (MqStreamCtx * ctx)
510 if (g_atomic_int_dec_and_test (&ctx->refcount))
511 mq_stream_ctx_free (ctx);
// Atomically takes an additional reference on ctx; balance with
// mq_stream_ctx_unref().
515 mq_stream_ctx_ref (MqStreamCtx * ctx)
517 g_atomic_int_inc (&ctx->refcount);
// Clears the recorded sink-side probe id and drops one reference on ctx.
// NOTE(review): presumably installed as the destroy-notify of the sink pad
// probe, releasing the reference the probe held - confirm at the call site.
521 _pad_block_destroy_sink_notify (MqStreamCtx * ctx)
523 ctx->sink_pad_block_id = 0;
524 mq_stream_ctx_unref (ctx);
// Clears the recorded source-side probe id and drops one reference on ctx.
// NOTE(review): presumably installed as the destroy-notify of the source pad
// probe, releasing the reference the probe held - confirm at the call site.
528 _pad_block_destroy_src_notify (MqStreamCtx * ctx)
530 ctx->src_pad_block_id = 0;
531 mq_stream_ctx_unref (ctx);
// Posts an element message announcing that an output fragment was opened
// (opened == TRUE, "splitmuxsink-fragment-opened") or closed
// ("splitmuxsink-fragment-closed"). The message structure carries the sink's
// current "location" and the reference context's output running time.
535 send_fragment_opened_closed_msg (GstSplitMuxSink * splitmux, gboolean opened)
537 gchar *location = NULL;
539 const gchar *msg_name = opened ?
540 "splitmuxsink-fragment-opened" : "splitmuxsink-fragment-closed";
// g_object_get() hands back a newly allocated copy of string properties.
// NOTE(review): the matching g_free is not visible in this copy - confirm.
542 g_object_get (splitmux->sink, "location", &location, NULL);
544 msg = gst_message_new_element (GST_OBJECT (splitmux),
545 gst_structure_new (msg_name,
546 "location", G_TYPE_STRING, location,
547 "running-time", GST_TYPE_CLOCK_TIME,
548 splitmux->reference_ctx->out_running_time, NULL));
549 gst_element_post_message (GST_ELEMENT_CAST (splitmux), msg);
// Sends an EOS event to the peer of ctx->srcpad. The splitmux lock is
// released for the duration of gst_pad_send_event() and re-taken afterwards,
// so shared state may have changed by the time this returns. bus_handler()
// takes the same lock when an EOS message arrives, which is presumably why
// the lock cannot be held across the send.
// NOTE(review): the statements between obtaining the peer and the log line
// (original lines 565-567) are not visible in this copy.
554 /* Called with lock held, drops the lock to send EOS to the
558 send_eos (GstSplitMuxSink * splitmux, MqStreamCtx * ctx)
563 eos = gst_event_new_eos ();
564 pad = gst_pad_get_peer (ctx->srcpad);
568 GST_INFO_OBJECT (splitmux, "Sending EOS on %" GST_PTR_FORMAT, pad);
569 GST_SPLITMUX_UNLOCK (splitmux);
570 gst_pad_send_event (pad, eos);
571 GST_SPLITMUX_LOCK (splitmux);
573 gst_object_unref (pad);
// Output-side gate for one stream context. Summary of the visible logic:
//  - if no output limit is set, or this context is still below
//    max_out_running_time, data may pass (have_muxed_something is set);
//  - if the context is flushing or the element is stopped, stop waiting;
//  - in ENDING_FILE state, send EOS once for this context;
//  - in START_NEXT_FRAGMENT state, switch to the next file;
//  - otherwise mark the context blocked, grow the multiqueue if needed and
//    sleep on the condition variable until another thread broadcasts.
// NOTE(review): the enclosing loop and the return/continue statements are
// not visible in this copy of the file.
576 /* Called with splitmux lock held to check if this output
577 * context needs to sleep to wait for the release of the
578 * next GOP, or to send EOS to close out the current file
581 complete_or_wait_on_out (GstSplitMuxSink * splitmux, MqStreamCtx * ctx)
585 GST_LOG_OBJECT (ctx->srcpad,
586 "Checking running time %" GST_STIME_FORMAT " against max %"
587 GST_STIME_FORMAT, GST_STIME_ARGS (ctx->out_running_time),
588 GST_STIME_ARGS (splitmux->max_out_running_time));
590 if (splitmux->max_out_running_time == GST_CLOCK_STIME_NONE ||
591 ctx->out_running_time < splitmux->max_out_running_time) {
592 splitmux->have_muxed_something = TRUE;
596 if (ctx->flushing || splitmux->state == SPLITMUX_STATE_STOPPED)
599 if (splitmux->state == SPLITMUX_STATE_ENDING_FILE) {
600 if (ctx->out_eos == FALSE) {
601 send_eos (splitmux, ctx);
604 } else if (splitmux->state == SPLITMUX_STATE_START_NEXT_FRAGMENT) {
605 start_next_fragment (splitmux);
609 GST_INFO_OBJECT (ctx->srcpad,
610 "Sleeping for running time %"
611 GST_STIME_FORMAT " (max %" GST_STIME_FORMAT ")",
612 GST_STIME_ARGS (ctx->out_running_time),
613 GST_STIME_ARGS (splitmux->max_out_running_time));
// out_blocked is only set while this thread is parked in the wait below.
614 ctx->out_blocked = TRUE;
615 /* Expand the mq if needed before sleeping */
616 check_queue_length (splitmux, ctx);
617 GST_SPLITMUX_WAIT (splitmux);
618 ctx->out_blocked = FALSE;
619 GST_INFO_OBJECT (ctx->srcpad,
620 "Woken for new max running time %" GST_STIME_FORMAT,
621 GST_STIME_ARGS (splitmux->max_out_running_time));
// Asks upstream for a keyframe at running time
// fragment_id * threshold_time by pushing a force-key-unit event on the
// reference context's sink pad. Returns the result of gst_pad_push_event().
// The request is skipped when send_keyframe_requests is off, when no time
// threshold is set, or when a byte threshold is set.
// NOTE(review): the value returned on the skip path is not visible in this
// copy; callers treat FALSE as "could not request a keyframe".
626 request_next_keyframe (GstSplitMuxSink * splitmux)
630 if (splitmux->send_keyframe_requests == FALSE || splitmux->threshold_time == 0
631 || splitmux->threshold_bytes != 0)
634 ev = gst_video_event_new_upstream_force_key_unit (splitmux->fragment_id *
635 splitmux->threshold_time, TRUE, 0);
636 GST_DEBUG_OBJECT (splitmux, "Requesting next keyframe at %" GST_TIME_FORMAT,
637 GST_TIME_ARGS (splitmux->fragment_id * splitmux->threshold_time));
638 return gst_pad_push_event (splitmux->reference_ctx->sinkpad, ev);
// Pad probe for data leaving the multiqueue for one stream context.
// Buffer lists are rejected (dropped with a warning). Downstream events
// update per-context output state; buffers are matched with the record
// queued for them in ctx->queued_bufs and gated by
// complete_or_wait_on_out() before being passed on.
// Returns GST_PAD_PROBE_PASS to let the item through or GST_PAD_PROBE_DROP
// to discard it.
// NOTE(review): several short lines (break/goto statements, some case
// labels, closing braces) are not visible in this copy of the file, so the
// notes below describe only the statements that are shown.
641 static GstPadProbeReturn
642 handle_mq_output (GstPad * pad, GstPadProbeInfo * info, MqStreamCtx * ctx)
644 GstSplitMuxSink *splitmux = ctx->splitmux;
645 MqStreamBuf *buf_info = NULL;
647 GST_LOG_OBJECT (pad, "Fired probe type 0x%x", info->type);
649 /* FIXME: Handle buffer lists, until then make it clear they won't work */
650 if (info->type & GST_PAD_PROBE_TYPE_BUFFER_LIST) {
651 g_warning ("Buffer list handling not implemented");
652 return GST_PAD_PROBE_DROP;
654 if (info->type & GST_PAD_PROBE_TYPE_EVENT_DOWNSTREAM) {
655 GstEvent *event = gst_pad_probe_info_get_event (info);
657 GST_LOG_OBJECT (pad, "Event %" GST_PTR_FORMAT, event);
659 switch (GST_EVENT_TYPE (event)) {
// Remember the output segment; used below to convert GAP timestamps.
660 case GST_EVENT_SEGMENT:
661 gst_event_copy_segment (event, &ctx->out_segment);
// End of flush: reset the segment, discard queued buffer records and
// leave flushing mode.
663 case GST_EVENT_FLUSH_STOP:
664 GST_SPLITMUX_LOCK (splitmux);
665 gst_segment_init (&ctx->out_segment, GST_FORMAT_UNDEFINED);
666 g_queue_foreach (&ctx->queued_bufs, (GFunc) mq_stream_buf_free, NULL);
667 g_queue_clear (&ctx->queued_bufs);
668 ctx->flushing = FALSE;
669 GST_SPLITMUX_UNLOCK (splitmux);
// Start of flush: mark the context and wake any thread sleeping on the
// condition variable so it can observe ctx->flushing.
671 case GST_EVENT_FLUSH_START:
672 GST_SPLITMUX_LOCK (splitmux);
673 GST_LOG_OBJECT (pad, "Flush start");
674 ctx->flushing = TRUE;
675 GST_SPLITMUX_BROADCAST (splitmux);
676 GST_SPLITMUX_UNLOCK (splitmux);
// NOTE(review): the case label and part of the body for the next branch
// are not visible; it checks for the STOPPED state under the lock.
679 GST_SPLITMUX_LOCK (splitmux);
680 if (splitmux->state == SPLITMUX_STATE_STOPPED)
683 GST_SPLITMUX_UNLOCK (splitmux);
// GAP handling: a gap with a valid timestamp advances this context's
// output running time and goes through the same gate as a buffer.
687 GstClockTimeDiff rtime;
689 gst_event_parse_gap (event, &gap_ts, NULL);
690 if (gap_ts == GST_CLOCK_TIME_NONE)
693 GST_SPLITMUX_LOCK (splitmux);
695 rtime = my_segment_to_running_time (&ctx->out_segment, gap_ts);
697 GST_LOG_OBJECT (pad, "Have GAP w/ ts %" GST_STIME_FORMAT,
698 GST_STIME_ARGS (rtime));
700 if (splitmux->state == SPLITMUX_STATE_STOPPED)
703 if (rtime != GST_CLOCK_STIME_NONE) {
704 ctx->out_running_time = rtime;
705 complete_or_wait_on_out (splitmux, ctx);
707 GST_SPLITMUX_UNLOCK (splitmux);
// Custom "splitmuxsink-unblock" event: carries a "timestamp" field that
// is applied as the output running time; the event itself is always
// dropped after processing so it does not travel further downstream.
710 case GST_EVENT_CUSTOM_DOWNSTREAM:{
711 const GstStructure *s;
712 GstClockTimeDiff ts = 0;
714 s = gst_event_get_structure (event);
715 if (!gst_structure_has_name (s, "splitmuxsink-unblock"))
718 gst_structure_get_int64 (s, "timestamp", &ts);
720 GST_SPLITMUX_LOCK (splitmux);
722 if (splitmux->state == SPLITMUX_STATE_STOPPED)
724 ctx->out_running_time = ts;
725 complete_or_wait_on_out (splitmux, ctx);
726 GST_SPLITMUX_UNLOCK (splitmux);
727 return GST_PAD_PROBE_DROP;
732 return GST_PAD_PROBE_PASS;
735 /* Allow everything through until the configured next stopping point */
736 GST_SPLITMUX_LOCK (splitmux);
// Buffer records are taken from the tail of the queue.
738 buf_info = g_queue_pop_tail (&ctx->queued_bufs);
739 if (buf_info == NULL)
740 /* Can only happen due to a poorly timed flush */
743 /* If we have popped a keyframe, decrement the queued_gop count */
744 if (buf_info->keyframe && splitmux->queued_gops > 0)
745 splitmux->queued_gops--;
747 ctx->out_running_time = buf_info->run_ts;
749 GST_LOG_OBJECT (splitmux,
750 "Pad %" GST_PTR_FORMAT " buffer with run TS %" GST_STIME_FORMAT
751 " size %" G_GUINT64_FORMAT,
752 pad, GST_STIME_ARGS (ctx->out_running_time), buf_info->buf_size);
// First buffer of the very first fragment: request the next keyframe and
// announce that the fragment has been opened.
754 if (splitmux->opening_first_fragment) {
755 if (request_next_keyframe (splitmux) == FALSE)
756 GST_WARNING_OBJECT (splitmux,
757 "Could not request a keyframe. Files may not split at the exact location they should");
758 send_fragment_opened_closed_msg (splitmux, TRUE);
759 splitmux->opening_first_fragment = FALSE;
// May sleep (releasing the lock) until this buffer is allowed out.
762 complete_or_wait_on_out (splitmux, ctx);
// Track the highest running time and total bytes released to the muxer,
// plus the duration of the most recent buffer.
764 if (splitmux->muxed_out_time == GST_CLOCK_STIME_NONE ||
765 splitmux->muxed_out_time < buf_info->run_ts)
766 splitmux->muxed_out_time = buf_info->run_ts;
768 splitmux->muxed_out_bytes += buf_info->buf_size;
769 splitmux->last_frame_duration = buf_info->duration;
771 #ifndef GST_DISABLE_GST_DEBUG
773 GstBuffer *buf = gst_pad_probe_info_get_buffer (info);
774 GST_LOG_OBJECT (pad, "Returning to pass buffer %" GST_PTR_FORMAT
775 " run ts %" GST_STIME_FORMAT, buf,
776 GST_STIME_ARGS (ctx->out_running_time));
780 GST_SPLITMUX_UNLOCK (splitmux);
782 mq_stream_buf_free (buf_info);
784 return GST_PAD_PROBE_PASS;
// Shared exit that unlocks and drops the item (label not visible here).
787 GST_SPLITMUX_UNLOCK (splitmux);
788 return GST_PAD_PROBE_DROP;
// Callback for gst_pad_sticky_events_foreach(): sends a new reference to the
// sticky event to peer. The boolean result of gst_pad_send_event() is
// returned to the foreach, which stops iterating on FALSE.
792 resend_sticky (GstPad * pad, GstEvent ** event, GstPad * peer)
794 return gst_pad_send_event (peer, gst_event_ref (*event));
// Prepares one stream context for a new fragment: replays every sticky event
// stored on the context's source pad to its peer, then re-derives out_eos
// from the pad's EOS flag. Called through g_list_foreach() with splitmux as
// user data (unused in the visible statements).
798 restart_context (MqStreamCtx * ctx, GstSplitMuxSink * splitmux)
800 GstPad *peer = gst_pad_get_peer (ctx->srcpad);
802 gst_pad_sticky_events_foreach (ctx->srcpad,
803 (GstPadStickyEventsForeachFunction) (resend_sticky), peer);
805 /* Clear EOS flag if not actually EOS */
806 ctx->out_eos = GST_PAD_IS_EOS (ctx->srcpad);
808 gst_object_unref (peer);
// Switches output to the next file. Visible steps: lock the muxer and sink
// state, take them to NULL, choose the next file name, bring both back to
// the bin's target state, replay sticky events on every context, pick the
// next state machine state, record the starting offsets for the new
// fragment, announce it and wake all waiting threads.
// NOTE(review): the else keyword/braces around the state selection are not
// visible in this copy of the file.
811 /* Called with lock held when a fragment
812 * reaches EOS and it is time to restart
816 start_next_fragment (GstSplitMuxSink * splitmux)
818 /* 1 change to new file */
// While switching_fragment is set, bus_handler() drops async messages from
// the muxer and sink.
819 splitmux->switching_fragment = TRUE;
// Locked state stops the parent bin from driving these two children while
// they are cycled through NULL.
821 gst_element_set_locked_state (splitmux->muxer, TRUE);
822 gst_element_set_locked_state (splitmux->active_sink, TRUE);
823 gst_element_set_state (splitmux->muxer, GST_STATE_NULL);
824 gst_element_set_state (splitmux->active_sink, GST_STATE_NULL);
826 set_next_filename (splitmux);
// The sink is restarted before the muxer.
828 gst_element_set_state (splitmux->active_sink, GST_STATE_TARGET (splitmux));
829 gst_element_set_state (splitmux->muxer, GST_STATE_TARGET (splitmux));
830 gst_element_set_locked_state (splitmux->muxer, FALSE);
831 gst_element_set_locked_state (splitmux->active_sink, FALSE);
833 splitmux->switching_fragment = FALSE;
835 g_list_foreach (splitmux->contexts, (GFunc) restart_context, splitmux);
837 /* Switch state and go back to processing */
// Reference stream still running: release output up to its input position.
838 if (!splitmux->reference_ctx->in_eos) {
839 splitmux->state = SPLITMUX_STATE_COLLECTING_GOP_START;
840 splitmux->max_out_running_time = splitmux->reference_ctx->in_running_time;
// Reference stream at EOS: no output limit.
842 splitmux->state = SPLITMUX_STATE_WAITING_GOP_COMPLETE;
843 splitmux->max_out_running_time = GST_CLOCK_STIME_NONE;
844 splitmux->have_muxed_something = FALSE;
846 splitmux->have_muxed_something =
847 (splitmux->reference_ctx->in_running_time > splitmux->muxed_out_time);
849 /* Store the overflow parameters as the basis for the next fragment */
850 splitmux->mux_start_time = splitmux->muxed_out_time;
851 if (splitmux->last_frame_duration != GST_CLOCK_STIME_NONE)
852 splitmux->mux_start_time += splitmux->last_frame_duration;
853 splitmux->mux_start_bytes = splitmux->muxed_out_bytes;
855 GST_DEBUG_OBJECT (splitmux,
856 "Restarting flow for new fragment. New running time %" GST_STIME_FORMAT,
857 GST_STIME_ARGS (splitmux->max_out_running_time));
859 send_fragment_opened_closed_msg (splitmux, TRUE);
860 if (request_next_keyframe (splitmux) == FALSE)
861 GST_WARNING_OBJECT (splitmux,
862 "Could not request a keyframe. Files may not split at the exact location they should");
864 GST_SPLITMUX_BROADCAST (splitmux);
// GstBin::handle_message override. It intercepts two kinds of child
// messages before chaining up to the parent class:
//  - EOS: a fragment-closed message is posted; if the element is ending a
//    file with an output limit set, the EOS is consumed and the state
//    machine moves to START_NEXT_FRAGMENT.
//  - ASYNC_START / ASYNC_DONE from the muxer or active sink while a fragment
//    switch is in progress: these are dropped.
// NOTE(review): the return/break statements after the unref calls are not
// visible in this copy of the file.
868 bus_handler (GstBin * bin, GstMessage * message)
870 GstSplitMuxSink *splitmux = GST_SPLITMUX_SINK (bin);
872 switch (GST_MESSAGE_TYPE (message)) {
873 case GST_MESSAGE_EOS:
874 /* If the state is draining out the current file, drop this EOS */
875 GST_SPLITMUX_LOCK (splitmux);
877 send_fragment_opened_closed_msg (splitmux, FALSE);
879 if (splitmux->state == SPLITMUX_STATE_ENDING_FILE &&
880 splitmux->max_out_running_time != GST_CLOCK_STIME_NONE) {
881 GST_DEBUG_OBJECT (splitmux, "Caught EOS at end of fragment, dropping");
882 splitmux->state = SPLITMUX_STATE_START_NEXT_FRAGMENT;
// Wake output threads so one of them runs start_next_fragment().
883 GST_SPLITMUX_BROADCAST (splitmux);
885 gst_message_unref (message);
886 GST_SPLITMUX_UNLOCK (splitmux);
889 GST_SPLITMUX_UNLOCK (splitmux);
891 case GST_MESSAGE_ASYNC_START:
892 case GST_MESSAGE_ASYNC_DONE:
893 /* Ignore state changes from our children while switching */
894 if (splitmux->switching_fragment) {
895 if (GST_MESSAGE_SRC (message) == (GstObject *) splitmux->active_sink ||
896 GST_MESSAGE_SRC (message) == (GstObject *) splitmux->muxer) {
897 GST_LOG_OBJECT (splitmux,
898 "Ignoring state change from child %" GST_PTR_FORMAT
899 " while switching", GST_MESSAGE_SRC (message));
900 gst_message_unref (message);
// Everything not consumed above goes to the default GstBin handling.
909 GST_BIN_CLASS (parent_class)->handle_message (bin, message);
// Decides what to do with a fully collected GOP. It totals the bytes and the
// furthest input running time across all contexts, subtracts the values
// recorded at the start of the current fragment, inflates the byte count by
// mux_overhead and compares the result with the configured thresholds.
//  - Overrun (and something was already muxed): enter ENDING_FILE.
//  - Otherwise: release the GOP by raising max_out_running_time, or remove
//    the limit entirely if the reference stream is at EOS.
// Both paths broadcast on the condition variable to wake waiting threads.
// NOTE(review): the else keyword and braces are not visible in this copy.
912 /* Called with splitmux lock held */
913 /* Called when entering ProcessingCompleteGop state
914 * Assess if mq contents overflowed the current file
915 * -> If yes, need to switch to new file
916 * -> if no, set max_out_running_time to let this GOP in and
917 * go to COLLECTING_GOP_START state
920 handle_gathered_gop (GstSplitMuxSink * splitmux)
923 guint64 queued_bytes = 0;
924 GstClockTimeDiff queued_time = 0;
926 /* Assess if the multiqueue contents overflowed the current file */
927 for (cur = g_list_first (splitmux->contexts);
928 cur != NULL; cur = g_list_next (cur)) {
929 MqStreamCtx *tmpctx = (MqStreamCtx *) (cur->data);
930 if (tmpctx->in_running_time > queued_time)
931 queued_time = tmpctx->in_running_time;
932 queued_bytes += tmpctx->in_bytes;
935 GST_LOG_OBJECT (splitmux, " queued_bytes %" G_GUINT64_FORMAT
936 " splitmuxsink->mux_start_bytes %" G_GUINT64_FORMAT, queued_bytes,
937 splitmux->mux_start_bytes);
// The subtractions below rely on these invariants.
938 g_assert (queued_bytes >= splitmux->mux_start_bytes);
939 g_assert (queued_time >= splitmux->mux_start_time);
941 queued_bytes -= splitmux->mux_start_bytes;
942 queued_time -= splitmux->mux_start_time;
944 /* Expand queued bytes estimate by muxer overhead */
945 queued_bytes += (queued_bytes * splitmux->mux_overhead);
947 GST_LOG_OBJECT (splitmux, "mq at TS %" GST_STIME_FORMAT
948 " bytes %" G_GUINT64_FORMAT, GST_STIME_ARGS (queued_time), queued_bytes);
950 /* Check for overrun - have we output at least one byte and overrun
951 * either threshold? */
952 if ((splitmux->have_muxed_something &&
953 ((splitmux->threshold_bytes > 0 &&
954 queued_bytes > splitmux->threshold_bytes) ||
955 (splitmux->threshold_time > 0 &&
956 queued_time > splitmux->threshold_time)))) {
958 splitmux->state = SPLITMUX_STATE_ENDING_FILE;
959 GST_INFO_OBJECT (splitmux,
960 "mq overflowed since last, draining out. max out TS is %"
961 GST_STIME_FORMAT, GST_STIME_ARGS (splitmux->max_out_running_time));
962 GST_SPLITMUX_BROADCAST (splitmux);
// queued_bytes is a guint64, so it is printed with G_GUINT64_FORMAT;
// G_GSIZE_FORMAT would mismatch the argument where gsize is 32 bits wide.
966 GST_LOG_OBJECT (splitmux,
967 "This GOP didn't overflow the fragment. Bytes sent %" G_GUINT64_FORMAT
968 " queued %" G_GUINT64_FORMAT " time %" GST_STIME_FORMAT " Continuing.",
969 splitmux->muxed_out_bytes - splitmux->mux_start_bytes,
970 queued_bytes, GST_STIME_ARGS (queued_time));
972 /* Wake everyone up to push this one GOP, then sleep */
973 splitmux->have_muxed_something = TRUE;
975 if (!splitmux->reference_ctx->in_eos) {
976 splitmux->state = SPLITMUX_STATE_COLLECTING_GOP_START;
977 splitmux->max_out_running_time = splitmux->reference_ctx->in_running_time;
979 splitmux->state = SPLITMUX_STATE_WAITING_GOP_COMPLETE;
980 splitmux->max_out_running_time = GST_CLOCK_STIME_NONE;
983 GST_LOG_OBJECT (splitmux, "Waking output for complete GOP, TS %"
984 GST_STIME_FORMAT, GST_STIME_ARGS (splitmux->max_out_running_time));
985 GST_SPLITMUX_BROADCAST (splitmux);
990 /* Called with splitmux lock held */
991 /* Called from each input pad when it has all the pieces
992 * for a GOP or EOS, starting with the reference pad which has set the
993 * splitmux->max_in_running_time
// Input-side rendezvous, called with the splitmux lock held. In
// WAITING_GOP_COMPLETE state it checks every context against
// max_in_running_time; when none is lagging, the gathered GOP is handed to
// handle_gathered_gop(). Afterwards the caller sleeps on the condition
// variable while the state is WAITING_GOP_COMPLETE or START_NEXT_FRAGMENT
// and max_in_running_time has not changed since the wait began.
// NOTE(review): one condition line in the readiness test, one in the wait
// loop, and the statements that clear/test the ready flag are not visible
// in this copy of the file.
996 check_completed_gop (GstSplitMuxSink * splitmux, MqStreamCtx * ctx)
999 gboolean ready = TRUE;
1000 GstClockTimeDiff current_max_in_running_time;
1002 if (splitmux->state == SPLITMUX_STATE_WAITING_GOP_COMPLETE) {
1003 /* Iterate each pad, and check that the input running time is at least
1004 * up to the reference running time, and if so handle the collected GOP */
1005 GST_LOG_OBJECT (splitmux, "Checking GOP collected, Max in running time %"
1006 GST_STIME_FORMAT " ctx %p",
1007 GST_STIME_ARGS (splitmux->max_in_running_time), ctx);
1008 for (cur = g_list_first (splitmux->contexts); cur != NULL;
1009 cur = g_list_next (cur)) {
1010 MqStreamCtx *tmpctx = (MqStreamCtx *) (cur->data);
1012 GST_LOG_OBJECT (splitmux,
1013 "Context %p (src pad %" GST_PTR_FORMAT ") TS %" GST_STIME_FORMAT
1014 " EOS %d", tmpctx, tmpctx->srcpad,
1015 GST_STIME_ARGS (tmpctx->in_running_time), tmpctx->in_eos);
// A max_in_running_time of G_MAXINT64 disables the lag test entirely.
1017 if (splitmux->max_in_running_time != G_MAXINT64 &&
1018 tmpctx->in_running_time < splitmux->max_in_running_time &&
1020 GST_LOG_OBJECT (splitmux,
1021 "Context %p (src pad %" GST_PTR_FORMAT ") not ready. We'll sleep",
1022 tmpctx, tmpctx->srcpad);
1028 GST_DEBUG_OBJECT (splitmux,
1029 "Collected GOP is complete. Processing (ctx %p)", ctx);
1030 /* All pads have a complete GOP, release it into the multiqueue */
1031 handle_gathered_gop (splitmux);
1035 /* If upstream reached EOS we are not expecting more data, no need to wait
1040 /* Some pad is not yet ready, or GOP is being pushed
1041 * either way, sleep and wait to get woken */
// Snapshot so the loop can tell when another thread moved the target.
1042 current_max_in_running_time = splitmux->max_in_running_time;
1043 while ((splitmux->state == SPLITMUX_STATE_WAITING_GOP_COMPLETE ||
1044 splitmux->state == SPLITMUX_STATE_START_NEXT_FRAGMENT) &&
1046 (current_max_in_running_time == splitmux->max_in_running_time)) {
1048 GST_LOG_OBJECT (splitmux, "Sleeping for %s (ctx %p)",
1049 splitmux->state == SPLITMUX_STATE_WAITING_GOP_COMPLETE ?
1050 "GOP complete" : "EOF draining", ctx);
1051 GST_SPLITMUX_WAIT (splitmux);
1053 GST_LOG_OBJECT (splitmux, "Done waiting for complete GOP (ctx %p)", ctx);
// Grows the multiqueue's buffer limit when this context's queue of buffer
// records has reached the current limit (mq_max_buffers) and growth is
// judged necessary. The visible triggers are: this pad lagging while a GOP
// is being completed, the reference pad collecting a GOP start (both only
// with at most one GOP queued), or another stream having an empty queue.
// On growth the limit becomes cur_len + 1 and is written to the
// multiqueue's "max-size-buffers" property.
// NOTE(review): the assignments to allow_grow and the test that guards the
// growth are not visible in this copy of the file.
1058 check_queue_length (GstSplitMuxSink * splitmux, MqStreamCtx * ctx)
1061 guint cur_len = g_queue_get_length (&ctx->queued_bufs);
1063 GST_DEBUG_OBJECT (ctx->sinkpad,
1064 "Checking queue length len %u cur_max %u queued gops %u",
1065 cur_len, splitmux->mq_max_buffers, splitmux->queued_gops);
1067 if (cur_len >= splitmux->mq_max_buffers) {
1068 gboolean allow_grow = FALSE;
1070 /* If collecting a GOP and this pad might block,
1071 * and there isn't already a pending GOP in the queue
1074 if (splitmux->state == SPLITMUX_STATE_WAITING_GOP_COMPLETE &&
1075 ctx->in_running_time < splitmux->max_in_running_time &&
1076 splitmux->queued_gops <= 1) {
1078 } else if (splitmux->state == SPLITMUX_STATE_COLLECTING_GOP_START &&
1079 ctx->is_reference && splitmux->queued_gops <= 1) {
1084 for (cur = g_list_first (splitmux->contexts);
1085 cur != NULL; cur = g_list_next (cur)) {
1086 MqStreamCtx *tmpctx = (MqStreamCtx *) (cur->data);
1087 GST_DEBUG_OBJECT (tmpctx->sinkpad,
1088 " len %u out_blocked %d",
1089 g_queue_get_length (&tmpctx->queued_bufs), tmpctx->out_blocked);
1090 /* If another stream is starving, grow */
1091 if (tmpctx != ctx && g_queue_get_length (&tmpctx->queued_bufs) < 1) {
// Grow by exactly one buffer beyond the current queue length.
1098 splitmux->mq_max_buffers = cur_len + 1;
1100 GST_INFO_OBJECT (splitmux,
1101 "Multiqueue overrun - enlarging to %u buffers ctx %p",
1102 splitmux->mq_max_buffers, ctx);
1104 g_object_set (splitmux->mq, "max-size-buffers",
1105 splitmux->mq_max_buffers, NULL);
/* handle_mq_input:
 * @pad: pad the probe fired on
 * @info: probe info carrying a buffer, a buffer list or a downstream event
 * @ctx: per-stream context that was registered together with the probe
 *
 * Pad probe callback; gst_splitmux_sink_request_new_pad() installs it on
 * the multiqueue sink pad for downstream data.
 *
 * Buffer lists are not supported: a warning is emitted and the item is
 * dropped. For downstream events, SEGMENT is copied into ctx->in_segment
 * and FLUSH_STOP reinitialises the segment, clears in_eos and resets
 * in_running_time; events are then passed on. For buffers, the input
 * running time and byte counter of the context are updated, a MqStreamBuf
 * record is filled in and pushed onto ctx->queued_bufs, and depending on
 * splitmux->state the calling thread may check for a completed GOP, send
 * an unblock event, or wait on the splitmux condition.
 *
 * Returns: GST_PAD_PROBE_DROP for buffer lists, GST_PAD_PROBE_PASS on the
 * other visible return paths.
 *
 * NOTE(review): many short lines (case labels, break and goto statements,
 * braces, some local declarations) are not visible in this listing; confirm
 * the control flow before relying on this description.
 */
1110 static GstPadProbeReturn
1111 handle_mq_input (GstPad * pad, GstPadProbeInfo * info, MqStreamCtx * ctx)
1113 GstSplitMuxSink *splitmux = ctx->splitmux;
1115 MqStreamBuf *buf_info = NULL;
1117 gboolean loop_again;
1118 gboolean keyframe = FALSE;
1120 GST_LOG_OBJECT (pad, "Fired probe type 0x%x", info->type);
1122 /* FIXME: Handle buffer lists, until then make it clear they won't work */
1123 if (info->type & GST_PAD_PROBE_TYPE_BUFFER_LIST) {
1124 g_warning ("Buffer list handling not implemented");
1125 return GST_PAD_PROBE_DROP;
/* Downstream events: keep the per-stream segment and flush bookkeeping
 * current, then let the event continue */
1127 if (info->type & GST_PAD_PROBE_TYPE_EVENT_DOWNSTREAM) {
1128 GstEvent *event = gst_pad_probe_info_get_event (info);
1129 switch (GST_EVENT_TYPE (event)) {
1130 case GST_EVENT_SEGMENT:
1131 gst_event_copy_segment (event, &ctx->in_segment);
1133 case GST_EVENT_FLUSH_STOP:
1134 GST_SPLITMUX_LOCK (splitmux);
1135 gst_segment_init (&ctx->in_segment, GST_FORMAT_UNDEFINED);
1136 ctx->in_eos = FALSE;
1138 ctx->in_running_time = GST_CLOCK_STIME_NONE;
1139 GST_SPLITMUX_UNLOCK (splitmux);
/* NOTE(review): the case label of the branch below is not visible; the log
 * message it emits suggests it handles EOS arriving on this pad - confirm */
1142 GST_SPLITMUX_LOCK (splitmux);
1145 if (splitmux->state == SPLITMUX_STATE_STOPPED)
1148 if (ctx->is_reference) {
1149 GST_INFO_OBJECT (splitmux, "Got Reference EOS. Finishing up");
1150 /* Act as if this is a new keyframe with infinite timestamp */
1151 splitmux->max_in_running_time = G_MAXINT64;
1152 splitmux->state = SPLITMUX_STATE_WAITING_GOP_COMPLETE;
1153 /* Wake up other input pads to collect this GOP */
1154 GST_SPLITMUX_BROADCAST (splitmux);
1155 check_completed_gop (splitmux, ctx);
1156 } else if (splitmux->state == SPLITMUX_STATE_WAITING_GOP_COMPLETE) {
1157 /* If we are waiting for a GOP to be completed (ie, for aux
1158 * pads to catch up), then this pad is complete, so check
1159 * if the whole GOP is.
1161 check_completed_gop (splitmux, ctx);
1163 GST_SPLITMUX_UNLOCK (splitmux);
1168 return GST_PAD_PROBE_PASS;
1171 buf = gst_pad_probe_info_get_buffer (info);
1172 buf_info = mq_stream_buf_new ();
1174 if (GST_BUFFER_PTS_IS_VALID (buf))
1175 ts = GST_BUFFER_PTS (buf);
1177 ts = GST_BUFFER_DTS (buf);
1179 GST_LOG_OBJECT (pad, "Buffer TS is %" GST_TIME_FORMAT, GST_TIME_ARGS (ts));
1181 GST_SPLITMUX_LOCK (splitmux);
1183 if (splitmux->state == SPLITMUX_STATE_STOPPED)
1186 /* If this buffer has a timestamp, advance the input timestamp of the
1188 if (GST_CLOCK_TIME_IS_VALID (ts)) {
1189 GstClockTimeDiff running_time =
1190 my_segment_to_running_time (&ctx->in_segment, ts);
1192 GST_LOG_OBJECT (pad, "Buffer running TS is %" GST_STIME_FORMAT,
1193 GST_STIME_ARGS (running_time));
1195 if (GST_CLOCK_STIME_IS_VALID (running_time)
1196 && running_time > ctx->in_running_time)
1197 ctx->in_running_time = running_time;
1200 /* Try to make sure we have a valid running time */
1201 if (!GST_CLOCK_STIME_IS_VALID (ctx->in_running_time)) {
1202 ctx->in_running_time =
1203 my_segment_to_running_time (&ctx->in_segment, ctx->in_segment.start);
1206 GST_LOG_OBJECT (pad, "in running time now %" GST_STIME_FORMAT,
1207 GST_STIME_ARGS (ctx->in_running_time));
/* Fill in the per-buffer record that gets pushed onto ctx->queued_bufs
 * near the end of this function */
1209 buf_info->run_ts = ctx->in_running_time;
1210 buf_info->buf_size = gst_buffer_get_size (buf);
1211 buf_info->duration = GST_BUFFER_DURATION (buf);
1213 /* Update total input byte counter for overflow detect */
1214 ctx->in_bytes += buf_info->buf_size;
1216 /* initialize mux_start_time */
1217 if (ctx->is_reference && splitmux->mux_start_time == GST_CLOCK_STIME_NONE) {
1218 splitmux->mux_start_time = buf_info->run_ts;
1219 GST_LOG_OBJECT (splitmux, "Mux start time now %" GST_STIME_FORMAT,
1220 GST_STIME_ARGS (splitmux->mux_start_time));
1221 /* Also take this as the first start time when starting up,
1222 * so that we start counting overflow from the first frame */
1223 if (!GST_CLOCK_STIME_IS_VALID (splitmux->max_in_running_time))
1224 splitmux->max_in_running_time = splitmux->mux_start_time;
1227 GST_DEBUG_OBJECT (pad, "Buf TS %" GST_STIME_FORMAT
1228 " total in_bytes %" G_GUINT64_FORMAT,
1229 GST_STIME_ARGS (buf_info->run_ts), ctx->in_bytes);
/* Dispatch on the current splitmux state; the enclosing loop repeats while
 * loop_again is set */
1236 switch (splitmux->state) {
1237 case SPLITMUX_STATE_COLLECTING_GOP_START:
1238 if (ctx->is_reference) {
1239 /* If a keyframe, we have a complete GOP */
1240 if (GST_BUFFER_FLAG_IS_SET (buf, GST_BUFFER_FLAG_DELTA_UNIT) ||
1241 !GST_CLOCK_STIME_IS_VALID (ctx->in_running_time) ||
1242 splitmux->max_in_running_time >= ctx->in_running_time) {
1243 /* Pass this buffer through */
1247 GST_INFO_OBJECT (pad,
1248 "Have keyframe with running time %" GST_STIME_FORMAT,
1249 GST_STIME_ARGS (ctx->in_running_time));
1251 splitmux->state = SPLITMUX_STATE_WAITING_GOP_COMPLETE;
1252 splitmux->max_in_running_time = ctx->in_running_time;
1253 /* Wake up other input pads to collect this GOP */
1254 GST_SPLITMUX_BROADCAST (splitmux);
1255 check_completed_gop (splitmux, ctx);
1257 /* We're still waiting for a keyframe on the reference pad, sleep */
1258 GST_LOG_OBJECT (pad, "Sleeping for GOP start");
1259 GST_SPLITMUX_WAIT (splitmux);
1260 GST_LOG_OBJECT (pad, "Done sleeping for GOP start state now %d",
1264 case SPLITMUX_STATE_WAITING_GOP_COMPLETE:
1266 /* If we overran the target timestamp, it might be time to process
1267 * the GOP, otherwise bail out for more data
1269 GST_LOG_OBJECT (pad,
1270 "Checking TS %" GST_STIME_FORMAT " against max %" GST_STIME_FORMAT,
1271 GST_STIME_ARGS (ctx->in_running_time),
1272 GST_STIME_ARGS (splitmux->max_in_running_time));
1274 if (ctx->in_running_time < splitmux->max_in_running_time) {
1279 GST_LOG_OBJECT (pad,
1280 "Collected last packet of GOP. Checking other pads");
1281 check_completed_gop (splitmux, ctx);
1283 case SPLITMUX_STATE_ENDING_FILE:{
1286 /* If somes streams received no buffer during the last GOP that overran,
1287 * because its next buffer has a timestamp bigger than
1288 * ctx->max_in_running_time, its queue is empty. In that case the only
1289 * way to wakeup the output thread is by injecting an event in the
1290 * queue. This usually happen with subtitle streams.
1291 * See https://bugzilla.gnome.org/show_bug.cgi?id=763711. */
1292 GST_LOG_OBJECT (pad, "Sending splitmuxsink-unblock event");
1293 event = gst_event_new_custom (GST_EVENT_CUSTOM_DOWNSTREAM |
1294 GST_EVENT_TYPE_SERIALIZED,
1295 gst_structure_new ("splitmuxsink-unblock", "timestamp",
1296 G_TYPE_INT64, splitmux->max_in_running_time, NULL));
/* The splitmux lock is released around the event send and retaken after */
1298 GST_SPLITMUX_UNLOCK (splitmux);
1299 gst_pad_send_event (ctx->sinkpad, event);
1300 GST_SPLITMUX_LOCK (splitmux);
1301 /* state may have changed while we were unlocked. Loop again if so */
1302 if (splitmux->state != SPLITMUX_STATE_ENDING_FILE)
1306 case SPLITMUX_STATE_START_NEXT_FRAGMENT:
1307 /* A fragment is ending, wait until that's done before continuing */
1308 GST_DEBUG_OBJECT (pad, "Sleeping for fragment restart");
1309 GST_SPLITMUX_WAIT (splitmux);
1310 GST_DEBUG_OBJECT (pad,
1311 "Done sleeping for fragment restart state now %d", splitmux->state);
1317 } while (loop_again);
/* NOTE(review): the condition guarding the next two statements is not
 * visible; given the keyframe local declared above they presumably run only
 * for keyframe buffers - confirm */
1320 splitmux->queued_gops++;
1321 buf_info->keyframe = TRUE;
1324 /* Now add this buffer to the queue just before returning */
1325 g_queue_push_head (&ctx->queued_bufs, buf_info);
1327 /* Check the buffer will fit in the mq */
1328 check_queue_length (splitmux, ctx);
1330 GST_LOG_OBJECT (pad, "Returning to queue buffer %" GST_PTR_FORMAT
1331 " run ts %" GST_STIME_FORMAT, buf, GST_STIME_ARGS (ctx->in_running_time));
1333 GST_SPLITMUX_UNLOCK (splitmux);
1334 return GST_PAD_PROBE_PASS;
/* Separate exit path (its label is not visible): the lock is released and
 * the buffer record is freed instead of being queued */
1337 GST_SPLITMUX_UNLOCK (splitmux);
1339 mq_stream_buf_free (buf_info);
1340 return GST_PAD_PROBE_PASS;
/* gst_splitmux_sink_request_new_pad:
 * @element: the splitmuxsink element
 * @templ: requested pad template; its name_template is used to pick the
 *   matching pad template on the muxer
 * @name: requested pad name, may be NULL
 * @caps: caps handed on to the request-pad call on the muxer
 *
 * Makes sure the internal elements exist, requests an input pad from the
 * muxer and a sink/src pad pair from the multiqueue, and links the
 * multiqueue src pad to the muxer pad. A MqStreamCtx is created for the
 * stream, probes are installed on both multiqueue pads, and the multiqueue
 * sink pad is exposed through a new ghost pad that is activated and added
 * to @element. A context flagged as video displaces an existing reference
 * context; otherwise the first context created becomes the reference.
 *
 * NOTE(review): the return statements and the failure label are not visible
 * in this listing; confirm what is returned on each path.
 */
1344 gst_splitmux_sink_request_new_pad (GstElement * element,
1345 GstPadTemplate * templ, const gchar * name, const GstCaps * caps)
1347 GstSplitMuxSink *splitmux = (GstSplitMuxSink *) element;
1348 GstPadTemplate *mux_template = NULL;
1350 GstPad *mq_sink, *mq_src;
1352 gboolean is_video = FALSE;
1355 GST_DEBUG_OBJECT (element, "templ:%s, name:%s", templ->name_template, name);
1357 GST_SPLITMUX_LOCK (splitmux);
1358 if (!create_elements (splitmux))
/* Choose the muxer pad template that corresponds to the requested one */
1361 if (templ->name_template) {
1362 if (g_str_equal (templ->name_template, "video")) {
1363 /* FIXME: Look for a pad template with matching caps, rather than by name */
1365 gst_element_class_get_pad_template (GST_ELEMENT_GET_CLASS
1366 (splitmux->muxer), "video_%u");
1371 gst_element_class_get_pad_template (GST_ELEMENT_GET_CLASS
1372 (splitmux->muxer), templ->name_template);
1374 if (mux_template == NULL) {
1375 /* Fallback to find sink pad templates named 'sink_%d' (mpegtsmux) */
1377 gst_element_class_get_pad_template (GST_ELEMENT_GET_CLASS
1378 (splitmux->muxer), "sink_%d");
/* Ask the muxer for its input pad */
1382 res = gst_element_request_pad (splitmux->muxer, mux_template, name, caps);
1387 gname = g_strdup ("video");
1388 else if (name == NULL)
1389 gname = gst_pad_get_name (res);
1391 gname = g_strdup (name);
/* Obtain a sink/src pad pair from the multiqueue; on failure the muxer pad
 * request is undone */
1393 if (!get_pads_from_mq (splitmux, &mq_sink, &mq_src)) {
1394 gst_element_release_request_pad (splitmux->muxer, res);
1395 gst_object_unref (GST_OBJECT (res));
/* Link multiqueue output to muxer input; on failure both pad requests are
 * released */
1399 if (gst_pad_link (mq_src, res) != GST_PAD_LINK_OK) {
1400 gst_element_release_request_pad (splitmux->muxer, res);
1401 gst_object_unref (GST_OBJECT (res));
1402 gst_element_release_request_pad (splitmux->mq, mq_sink);
1403 gst_object_unref (GST_OBJECT (mq_sink));
1407 gst_object_unref (GST_OBJECT (res));
1409 ctx = mq_stream_ctx_new (splitmux);
1410 ctx->srcpad = mq_src;
1411 ctx->sinkpad = mq_sink;
/* A context reference is taken before ctx is handed to the src-pad probe */
1413 mq_stream_ctx_ref (ctx);
1414 ctx->src_pad_block_id =
1415 gst_pad_add_probe (mq_src, GST_PAD_PROBE_TYPE_DATA_DOWNSTREAM,
1416 (GstPadProbeCallback) handle_mq_output, ctx, (GDestroyNotify)
1417 _pad_block_destroy_src_notify);
/* A video stream displaces whichever context was the reference so far */
1418 if (is_video && splitmux->reference_ctx != NULL) {
1419 splitmux->reference_ctx->is_reference = FALSE;
1420 splitmux->reference_ctx = NULL;
1422 if (splitmux->reference_ctx == NULL) {
1423 splitmux->reference_ctx = ctx;
1424 ctx->is_reference = TRUE;
/* Expose the multiqueue sink pad through a ghost pad named gname and attach
 * the context to it under the PAD_CONTEXT key */
1427 res = gst_ghost_pad_new (gname, mq_sink);
1428 g_object_set_qdata ((GObject *) (res), PAD_CONTEXT, ctx);
/* Another context reference is taken before the sink-pad probe is added */
1430 mq_stream_ctx_ref (ctx);
1431 ctx->sink_pad_block_id =
1432 gst_pad_add_probe (mq_sink, GST_PAD_PROBE_TYPE_DATA_DOWNSTREAM,
1433 (GstPadProbeCallback) handle_mq_input, ctx, (GDestroyNotify)
1434 _pad_block_destroy_sink_notify);
1436 GST_DEBUG_OBJECT (splitmux, "Request pad %" GST_PTR_FORMAT
1437 " is mq pad %" GST_PTR_FORMAT, res, mq_sink);
1439 splitmux->contexts = g_list_prepend (splitmux->contexts, ctx);
1443 gst_object_unref (mq_sink);
1444 gst_object_unref (mq_src);
1446 gst_pad_set_active (res, TRUE);
1447 gst_element_add_pad (element, res);
1448 GST_SPLITMUX_UNLOCK (splitmux);
/* Second unlock, on a separate exit path whose label is not visible here
 * (presumably the failure path) */
1452 GST_SPLITMUX_UNLOCK (splitmux);
/* gst_splitmux_sink_release_pad:
 * @element: the splitmuxsink element
 * @pad: ghost pad that was handed out for a pad request
 *
 * Reads the MqStreamCtx stored on @pad under PAD_CONTEXT, removes it from
 * splitmux->contexts, removes the sink and src pad probes if their ids are
 * set, drops a context reference, releases the request pads on the
 * multiqueue and on the muxer, and removes @pad from the element. When no
 * contexts remain afterwards, gst_splitmux_reset() is called. If the muxer
 * or the multiqueue does not exist yet, the function jumps to its fail exit
 * without releasing anything.
 */
1457 gst_splitmux_sink_release_pad (GstElement * element, GstPad * pad)
1459 GstSplitMuxSink *splitmux = (GstSplitMuxSink *) element;
1460 GstPad *mqsink, *mqsrc, *muxpad;
1462 (MqStreamCtx *) (g_object_get_qdata ((GObject *) (pad), PAD_CONTEXT));
1464 GST_SPLITMUX_LOCK (splitmux);
1466 if (splitmux->muxer == NULL || splitmux->mq == NULL)
1467 goto fail; /* Elements don't exist yet - nothing to release */
1469 GST_INFO_OBJECT (pad, "releasing request pad");
/* Resolve the three pads involved: the ghost pad target on the multiqueue,
 * the matching multiqueue src pad, and the muxer pad that is its peer */
1471 mqsink = gst_ghost_pad_get_target (GST_GHOST_PAD (pad));
1472 mqsrc = mq_sink_to_src (splitmux->mq, mqsink);
1473 muxpad = gst_pad_get_peer (mqsrc);
1475 /* Remove the context from our consideration */
1476 splitmux->contexts = g_list_remove (splitmux->contexts, ctx);
1478 if (ctx->sink_pad_block_id)
1479 gst_pad_remove_probe (ctx->sinkpad, ctx->sink_pad_block_id);
1481 if (ctx->src_pad_block_id)
1482 gst_pad_remove_probe (ctx->srcpad, ctx->src_pad_block_id);
1484 /* Can release the context now */
1485 mq_stream_ctx_unref (ctx);
1487 /* Release and free the mq input */
1488 gst_element_release_request_pad (splitmux->mq, mqsink);
1490 /* Release and free the muxer input */
1491 gst_element_release_request_pad (splitmux->muxer, muxpad);
/* Drop the pad references held in the three local variables */
1493 gst_object_unref (mqsink);
1494 gst_object_unref (mqsrc);
1495 gst_object_unref (muxpad);
1497 gst_element_remove_pad (element, pad);
1499 /* Reset the internal elements only after all request pads are released */
1500 if (splitmux->contexts == NULL)
1501 gst_splitmux_reset (splitmux);
1504 GST_SPLITMUX_UNLOCK (splitmux);
/* create_element:
 * @splitmux: bin that receives the new element
 * @factory: factory name passed to gst_element_factory_make()
 * @name: name given to the new element; also used in the warning text
 *
 * Creates an element from @factory and adds it to the splitmux bin. A
 * warning is emitted if creation fails or if gst_bin_add() fails; in the
 * latter case the element is unreffed.
 *
 * NOTE(review): the return statements are not visible in this listing.
 * Callers in this file compare the result against NULL to detect failure.
 */
1508 create_element (GstSplitMuxSink * splitmux,
1509 const gchar * factory, const gchar * name)
1511 GstElement *ret = gst_element_factory_make (factory, name);
1513 g_warning ("Failed to create %s - splitmuxsink will not work", name);
1517 if (!gst_bin_add (GST_BIN (splitmux), ret)) {
1518 g_warning ("Could not add %s element - splitmuxsink will not work", name);
1519 gst_object_unref (ret);
/* create_elements:
 * @splitmux: the splitmuxsink
 *
 * Creates the internal multiqueue and muxer when they are not set yet.
 * The multiqueue starts with a limit of 5 buffers and with its byte and
 * time limits set to 0. The muxer is the user-provided one if
 * splitmux->provided_muxer is set (it is added to the bin), otherwise a new
 * mp4mux element.
 *
 * NOTE(review): the return statements are not visible in this listing.
 * Callers in this file treat a false result as failure.
 */
1527 create_elements (GstSplitMuxSink * splitmux)
1529 /* Create internal elements */
1530 if (splitmux->mq == NULL) {
1532 create_element (splitmux, "multiqueue", "multiqueue")) == NULL)
1535 splitmux->mq_max_buffers = 5;
1536 /* No bytes or time limit, we limit buffers manually */
1537 g_object_set (splitmux->mq, "max-size-bytes", 0, "max-size-time",
1538 (guint64) 0, "max-size-buffers", splitmux->mq_max_buffers, NULL);
1541 if (splitmux->muxer == NULL) {
1542 GstElement *provided_muxer = NULL;
/* Take a temporary reference on the provided muxer while the object lock
 * is held */
1544 GST_OBJECT_LOCK (splitmux);
1545 if (splitmux->provided_muxer != NULL)
1546 provided_muxer = gst_object_ref (splitmux->provided_muxer);
1547 GST_OBJECT_UNLOCK (splitmux);
1549 if (provided_muxer == NULL) {
1550 if ((splitmux->muxer =
1551 create_element (splitmux, "mp4mux", "muxer")) == NULL)
1554 if (!gst_bin_add (GST_BIN (splitmux), provided_muxer)) {
1555 g_warning ("Could not add muxer element - splitmuxsink will not work");
1556 gst_object_unref (provided_muxer);
/* Store the muxer pointer, then drop the temporary reference taken above */
1560 splitmux->muxer = provided_muxer;
1561 gst_object_unref (provided_muxer);
/* find_sink:
 * @e: element to search
 *
 * Iterates over the sink elements of @e, obtained with
 * gst_bin_iterate_sinks(), looking for a child whose class has a property
 * named location. The iterator is resynced on GST_ITERATOR_RESYNC, and an
 * iterator error is treated as unreachable. The GValue and the iterator are
 * released before returning.
 *
 * NOTE(review): the lines that assign the result and that handle an @e
 * which is not a bin are not visible in this listing. create_sink() treats
 * a NULL result as meaning that no suitable sink was found.
 */
1571 find_sink (GstElement * e)
1573 GstElement *res = NULL;
1575 gboolean done = FALSE;
1576 GValue data = { 0, };
1578 if (!GST_IS_BIN (e))
1581 iter = gst_bin_iterate_sinks (GST_BIN (e));
1583 switch (gst_iterator_next (iter, &data)) {
1584 case GST_ITERATOR_OK:
1586 GstElement *child = g_value_get_object (&data);
/* The child must expose a location property; set_next_filename() sets that
 * property on splitmux->sink */
1587 if (g_object_class_find_property (G_OBJECT_GET_CLASS (child),
1588 "location") != NULL) {
1592 g_value_reset (&data);
1595 case GST_ITERATOR_RESYNC:
1596 gst_iterator_resync (iter);
1598 case GST_ITERATOR_DONE:
1601 case GST_ITERATOR_ERROR:
1602 g_assert_not_reached ();
1606 g_value_unset (&data);
1607 gst_iterator_free (iter);
/* create_sink:
 * @splitmux: the splitmuxsink
 *
 * Sets up splitmux->active_sink and splitmux->sink when no active sink
 * exists yet. Without a user-provided sink a DEFAULT_SINK element is
 * created and used for both. With a provided sink, that element is added
 * to the bin and becomes the active sink, and find_sink() is used to locate
 * the actual sink element inside it. The muxer is linked to the active
 * sink. A warning is emitted for each failure.
 *
 * NOTE(review): the return statements are not visible in this listing.
 * The state-change handler treats a false result as failure.
 */
1613 create_sink (GstSplitMuxSink * splitmux)
1615 GstElement *provided_sink = NULL;
1617 if (splitmux->active_sink == NULL) {
/* Take a temporary reference on the provided sink while the object lock
 * is held */
1619 GST_OBJECT_LOCK (splitmux);
1620 if (splitmux->provided_sink != NULL)
1621 provided_sink = gst_object_ref (splitmux->provided_sink);
1622 GST_OBJECT_UNLOCK (splitmux);
1624 if (provided_sink == NULL) {
1625 if ((splitmux->sink =
1626 create_element (splitmux, DEFAULT_SINK, "sink")) == NULL)
1628 splitmux->active_sink = splitmux->sink;
1630 if (!gst_bin_add (GST_BIN (splitmux), provided_sink)) {
1631 g_warning ("Could not add sink elements - splitmuxsink will not work");
1632 gst_object_unref (provided_sink);
1636 splitmux->active_sink = provided_sink;
1638 /* The bin holds a ref now, we can drop our tmp ref */
1639 gst_object_unref (provided_sink);
1641 /* Find the sink element */
1642 splitmux->sink = find_sink (splitmux->active_sink);
1643 if (splitmux->sink == NULL) {
1645 ("Could not locate sink element in provided sink - splitmuxsink will not work");
/* Connect the muxer output to whichever sink is active */
1650 if (!gst_element_link (splitmux->muxer, splitmux->active_sink)) {
1651 g_warning ("Failed to link muxer and sink- splitmuxsink will not work");
1662 #pragma GCC diagnostic ignored "-Wformat-nonliteral"
/* set_next_filename:
 * @splitmux: the splitmuxsink
 *
 * Chooses the location of the next fragment and sets it on splitmux->sink.
 * gst_splitmux_sink_ensure_max_files() is called first, then the
 * SIGNAL_FORMAT_LOCATION signal is emitted with the current fragment id and
 * may fill in the file name. A name can also be built from
 * splitmux->location, and fragment_id is incremented.
 *
 * NOTE(review): the conditions around the fallback name, the property set
 * and the increment are not visible in this listing; presumably the
 * fallback is used only when the signal produced no name - confirm.
 */
1665 set_next_filename (GstSplitMuxSink * splitmux)
1667 gchar *fname = NULL;
1668 gst_splitmux_sink_ensure_max_files (splitmux);
1670 g_signal_emit (splitmux, signals[SIGNAL_FORMAT_LOCATION], 0,
1671 splitmux->fragment_id, &fname);
/* splitmux->location is used as a printf-style format that receives the
 * fragment id; a NULL location yields a NULL name */
1674 fname = splitmux->location ?
1675 g_strdup_printf (splitmux->location, splitmux->fragment_id) : NULL;
1678 GST_INFO_OBJECT (splitmux, "Setting file to %s", fname);
1679 g_object_set (splitmux->sink, "location", fname, NULL);
1682 splitmux->fragment_id++;
/* gst_splitmux_sink_change_state:
 * @element: the splitmuxsink element
 * @transition: the state transition being performed
 *
 * State-change handler. Before chaining up to the parent class:
 *  - NULL to READY creates the internal elements and the sink (failing the
 *    transition if either cannot be created), resets fragment_id to 0 and
 *    sets the first file name;
 *  - READY to PAUSED puts the state machine into COLLECTING_GOP_START and
 *    resets the running-time and byte counters;
 *  - PAUSED to READY and READY to NULL set the state to STOPPED and wake up
 *    any threads waiting on the splitmux condition.
 * After the parent handler has run, READY to NULL resets fragment_id and,
 * when no pad contexts remain, calls gst_splitmux_reset(). A failed NULL to
 * READY transition also calls gst_splitmux_reset().
 *
 * NOTE(review): break and goto statements, labels and the final return are
 * not visible in this listing.
 */
1686 static GstStateChangeReturn
1687 gst_splitmux_sink_change_state (GstElement * element, GstStateChange transition)
1689 GstStateChangeReturn ret;
1690 GstSplitMuxSink *splitmux = (GstSplitMuxSink *) element;
1692 switch (transition) {
1693 case GST_STATE_CHANGE_NULL_TO_READY:{
1694 GST_SPLITMUX_LOCK (splitmux);
1695 if (!create_elements (splitmux) || !create_sink (splitmux)) {
1696 ret = GST_STATE_CHANGE_FAILURE;
1697 GST_SPLITMUX_UNLOCK (splitmux);
1700 GST_SPLITMUX_UNLOCK (splitmux);
1701 splitmux->fragment_id = 0;
1702 set_next_filename (splitmux);
1705 case GST_STATE_CHANGE_READY_TO_PAUSED:{
1706 GST_SPLITMUX_LOCK (splitmux);
1707 /* Start by collecting one input on each pad */
1708 splitmux->state = SPLITMUX_STATE_COLLECTING_GOP_START;
1709 splitmux->max_in_running_time = GST_CLOCK_STIME_NONE;
1710 splitmux->muxed_out_time = splitmux->mux_start_time =
1711 splitmux->last_frame_duration = GST_CLOCK_STIME_NONE;
1712 splitmux->muxed_out_bytes = splitmux->mux_start_bytes = 0;
1713 splitmux->opening_first_fragment = TRUE;
1714 GST_SPLITMUX_UNLOCK (splitmux);
1717 case GST_STATE_CHANGE_PAUSED_TO_READY:
1718 case GST_STATE_CHANGE_READY_TO_NULL:
1719 GST_SPLITMUX_LOCK (splitmux);
1720 splitmux->state = SPLITMUX_STATE_STOPPED;
1721 /* Wake up any blocked threads */
1722 GST_LOG_OBJECT (splitmux,
1723 "State change -> NULL or READY. Waking threads");
1724 GST_SPLITMUX_BROADCAST (splitmux);
1725 GST_SPLITMUX_UNLOCK (splitmux);
/* Chain up to the parent class implementation */
1731 ret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
1732 if (ret == GST_STATE_CHANGE_FAILURE)
/* Work that has to happen after the parent handler has run */
1735 switch (transition) {
1736 case GST_STATE_CHANGE_READY_TO_NULL:
1737 GST_SPLITMUX_LOCK (splitmux);
1738 splitmux->fragment_id = 0;
1739 /* Reset internal elements only if no pad contexts are using them */
1740 if (splitmux->contexts == NULL)
1741 gst_splitmux_reset (splitmux);
1742 GST_SPLITMUX_UNLOCK (splitmux);
1750 if (transition == GST_STATE_CHANGE_NULL_TO_READY &&
1751 ret == GST_STATE_CHANGE_FAILURE) {
1752 /* Cleanup elements on failed transition out of NULL */
1753 gst_splitmux_reset (splitmux);
/* register_splitmuxsink:
 * @plugin: plugin to register the element with
 *
 * Initialises the splitmuxsink debug category and registers the
 * GST_TYPE_SPLITMUX_SINK element type under the name splitmuxsink with
 * GST_RANK_NONE.
 *
 * Returns: the result of gst_element_register().
 */
1759 register_splitmuxsink (GstPlugin * plugin)
1761 GST_DEBUG_CATEGORY_INIT (splitmux_debug, "splitmuxsink", 0,
1762 "Split File Muxing Sink");
1764 return gst_element_register (plugin, "splitmuxsink", GST_RANK_NONE,
1765 GST_TYPE_SPLITMUX_SINK);
/* gst_splitmux_sink_ensure_max_files:
 * @splitmux: the splitmuxsink
 *
 * When max_files is non-zero and fragment_id has reached it, fragment_id
 * is set back to 0.
 */
1769 gst_splitmux_sink_ensure_max_files (GstSplitMuxSink * splitmux)
1771 if (splitmux->max_files && splitmux->fragment_id >= splitmux->max_files) {
1772 splitmux->fragment_id = 0;