1 /* GStreamer Muxer bin that splits output stream by size/time
2 * Copyright (C) <2014> Jan Schmidt <jan@centricular.com>
4 * This library is free software; you can redistribute it and/or
5 * modify it under the terms of the GNU Library General Public
6 * License as published by the Free Software Foundation; either
7 * version 2 of the License, or (at your option) any later version.
9 * This library is distributed in the hope that it will be useful,
10 * but WITHOUT ANY WARRANTY; without even the implied warranty of
11 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
12 * Library General Public License for more details.
14 * You should have received a copy of the GNU Library General Public
15 * License along with this library; if not, write to the
16 * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
17 * Boston, MA 02110-1301, USA.
21 * SECTION:element-splitmuxsink
22 * @short_description: Muxer wrapper for splitting output stream by size or time
24 * This element wraps a muxer and a sink, and starts a new file when the mux
25 * contents are about to cross a threshold of maximum size or maximum time,
26 * splitting at video keyframe boundaries. Exactly one input video stream
27 * can be muxed, with as many accompanying audio and subtitle streams as
30 * By default, it uses mp4mux and filesink, but they can be changed via
31 * the 'muxer' and 'sink' properties.
33 * The minimum file size is 1 GOP, however - so limits may be overrun if the
34 * distance between any 2 keyframes is larger than the limits.
36 * If a video stream is available, the splitting process is driven by the video
37 * stream contents, and the video stream must contain closed GOPs for the output
38 * file parts to be played individually correctly. In the absence of a video
39 * stream, the first available stream is used as reference for synchronization.
42 * <title>Example pipelines</title>
44 * gst-launch-1.0 -e v4l2src num-buffers=500 ! video/x-raw,width=320,height=240 ! videoconvert ! queue ! timeoverlay ! x264enc key-int-max=10 ! h264parse ! splitmuxsink location=video%02d.mov max-size-time=10000000000 max-size-bytes=1000000
46 * Records a video stream captured from a v4l2 device and muxes it into
47 * ISO mp4 files, splitting as needed to limit size/duration to 10 seconds
48 * and 1MB maximum size.
57 #include "gstsplitmuxsink.h"
59 GST_DEBUG_CATEGORY_STATIC (splitmux_debug);
60 #define GST_CAT_DEFAULT splitmux_debug
/* Locking helpers for the element: take/release its `lock` mutex, and wait
 * on or broadcast its `data_cond` condition variable. Waiting uses `lock`
 * as the associated mutex, so it must be called with the lock held. */
62 #define GST_SPLITMUX_LOCK(s) g_mutex_lock(&(s)->lock)
63 #define GST_SPLITMUX_UNLOCK(s) g_mutex_unlock(&(s)->lock)
64 #define GST_SPLITMUX_WAIT(s) g_cond_wait (&(s)->data_cond, &(s)->lock)
65 #define GST_SPLITMUX_BROADCAST(s) g_cond_broadcast (&(s)->data_cond)
/* Property defaults. Per the property descriptions registered in
 * class_init: a max-size of 0 disables that limit, a mux overhead of 0.02
 * means 2%, and mp4mux / filesink are the elements used when none is
 * provided. */
78 #define DEFAULT_MAX_SIZE_TIME 0
79 #define DEFAULT_MAX_SIZE_BYTES 0
80 #define DEFAULT_MUXER_OVERHEAD 0.02
81 #define DEFAULT_MUXER "mp4mux"
82 #define DEFAULT_SINK "filesink"
86 SIGNAL_FORMAT_LOCATION,
90 static guint signals[SIGNAL_LAST];
92 static GstStaticPadTemplate video_sink_template =
93 GST_STATIC_PAD_TEMPLATE ("video",
97 static GstStaticPadTemplate audio_sink_template =
98 GST_STATIC_PAD_TEMPLATE ("audio_%u",
101 GST_STATIC_CAPS_ANY);
102 static GstStaticPadTemplate subtitle_sink_template =
103 GST_STATIC_PAD_TEMPLATE ("subtitle_%u",
106 GST_STATIC_CAPS_ANY);
108 static GQuark PAD_CONTEXT;
113 PAD_CONTEXT = g_quark_from_static_string ("pad-context");
116 #define gst_splitmux_sink_parent_class parent_class
117 G_DEFINE_TYPE_EXTENDED (GstSplitMuxSink, gst_splitmux_sink, GST_TYPE_BIN, 0,
120 static gboolean create_elements (GstSplitMuxSink * splitmux);
121 static gboolean create_sink (GstSplitMuxSink * splitmux);
122 static void gst_splitmux_sink_set_property (GObject * object, guint prop_id,
123 const GValue * value, GParamSpec * pspec);
124 static void gst_splitmux_sink_get_property (GObject * object, guint prop_id,
125 GValue * value, GParamSpec * pspec);
126 static void gst_splitmux_sink_dispose (GObject * object);
127 static void gst_splitmux_sink_finalize (GObject * object);
129 static GstPad *gst_splitmux_sink_request_new_pad (GstElement * element,
130 GstPadTemplate * templ, const gchar * name, const GstCaps * caps);
131 static void gst_splitmux_sink_release_pad (GstElement * element, GstPad * pad);
133 static GstStateChangeReturn gst_splitmux_sink_change_state (GstElement *
134 element, GstStateChange transition);
136 static void bus_handler (GstBin * bin, GstMessage * msg);
137 static void set_next_filename (GstSplitMuxSink * splitmux);
138 static void start_next_fragment (GstSplitMuxSink * splitmux);
139 static void check_queue_length (GstSplitMuxSink * splitmux, MqStreamCtx * ctx);
140 static void mq_stream_ctx_unref (MqStreamCtx * ctx);
/* Allocate a zero-initialised MqStreamBuf from the GLib slice allocator. */
143 mq_stream_buf_new (void)
145 return g_slice_new0 (MqStreamBuf);
/* Return a MqStreamBuf to the slice allocator; counterpart of
 * mq_stream_buf_new(). */
149 mq_stream_buf_free (MqStreamBuf * data)
151 g_slice_free (MqStreamBuf, data);
/* Class initialiser: installs the GObject, GstElement and GstBin virtual
 * functions, the element metadata, the three sink pad templates, the
 * element properties and the "format-location" signal. */
155 gst_splitmux_sink_class_init (GstSplitMuxSinkClass * klass)
157 GObjectClass *gobject_class = (GObjectClass *) klass;
158 GstElementClass *gstelement_class = (GstElementClass *) klass;
159 GstBinClass *gstbin_class = (GstBinClass *) klass;
/* GObject property accessors and teardown hooks */
161 gobject_class->set_property = gst_splitmux_sink_set_property;
162 gobject_class->get_property = gst_splitmux_sink_get_property;
163 gobject_class->dispose = gst_splitmux_sink_dispose;
164 gobject_class->finalize = gst_splitmux_sink_finalize;
166 gst_element_class_set_static_metadata (gstelement_class,
167 "Split Muxing Bin", "Generic/Bin/Muxer",
168 "Convenience bin that muxes incoming streams into multiple time/size limited files",
169 "Jan Schmidt <jan@centricular.com>");
/* Sink pad templates: "video", "audio_%u" and "subtitle_%u" */
171 gst_element_class_add_pad_template (gstelement_class,
172 gst_static_pad_template_get (&video_sink_template));
173 gst_element_class_add_pad_template (gstelement_class,
174 gst_static_pad_template_get (&audio_sink_template));
175 gst_element_class_add_pad_template (gstelement_class,
176 gst_static_pad_template_get (&subtitle_sink_template));
/* State changes and pad request/release */
178 gstelement_class->change_state =
179 GST_DEBUG_FUNCPTR (gst_splitmux_sink_change_state);
180 gstelement_class->request_new_pad =
181 GST_DEBUG_FUNCPTR (gst_splitmux_sink_request_new_pad);
182 gstelement_class->release_pad =
183 GST_DEBUG_FUNCPTR (gst_splitmux_sink_release_pad);
/* Route the bin's message handling through bus_handler() */
185 gstbin_class->handle_message = bus_handler;
/* Element properties; all are read/write, and only "mux-overhead" is
 * additionally flagged GST_PARAM_CONTROLLABLE */
187 g_object_class_install_property (gobject_class, PROP_LOCATION,
188 g_param_spec_string ("location", "File Output Pattern",
189 "Format string pattern for the location of the files to write (e.g. video%05d.mp4)",
190 NULL, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
191 g_object_class_install_property (gobject_class, PROP_MUXER_OVERHEAD,
192 g_param_spec_double ("mux-overhead", "Muxing Overhead",
193 "Extra size overhead of muxing (0.02 = 2%)", 0.0, 1.0,
194 DEFAULT_MUXER_OVERHEAD,
195 G_PARAM_READWRITE | GST_PARAM_CONTROLLABLE | G_PARAM_STATIC_STRINGS));
197 g_object_class_install_property (gobject_class, PROP_MAX_SIZE_TIME,
198 g_param_spec_uint64 ("max-size-time", "Max. size (ns)",
199 "Max. amount of time per file (in ns, 0=disable)", 0, G_MAXUINT64,
200 DEFAULT_MAX_SIZE_TIME, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
201 g_object_class_install_property (gobject_class, PROP_MAX_SIZE_BYTES,
202 g_param_spec_uint64 ("max-size-bytes", "Max. size bytes",
203 "Max. amount of data per file (in bytes, 0=disable)", 0, G_MAXUINT64,
204 DEFAULT_MAX_SIZE_BYTES, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
206 g_object_class_install_property (gobject_class, PROP_MUXER,
207 g_param_spec_object ("muxer", "Muxer",
208 "The muxer element to use (NULL = default mp4mux)",
209 GST_TYPE_ELEMENT, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
210 g_object_class_install_property (gobject_class, PROP_SINK,
211 g_param_spec_object ("sink", "Sink",
212 "The sink element (or element chain) to use (NULL = default filesink)",
213 GST_TYPE_ELEMENT, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
216 * GstSplitMuxSink::format-location:
217 * @splitmux: the #GstSplitMuxSink
218 * @fragment_id: the sequence number of the file to be created
220 * Returns: the location to be used for the next output file
222 signals[SIGNAL_FORMAT_LOCATION] =
223 g_signal_new ("format-location", G_TYPE_FROM_CLASS (klass),
224 G_SIGNAL_RUN_LAST, 0, NULL, NULL, NULL, G_TYPE_STRING, 1, G_TYPE_UINT);
/* Instance initialiser: sets up the mutex and condition variable, applies
 * the default overhead and size/time thresholds, and flags the bin as a
 * sink element. */
228 gst_splitmux_sink_init (GstSplitMuxSink * splitmux)
230 g_mutex_init (&splitmux->lock);
231 g_cond_init (&splitmux->data_cond);
233 splitmux->mux_overhead = DEFAULT_MUXER_OVERHEAD;
234 splitmux->threshold_time = DEFAULT_MAX_SIZE_TIME;
235 splitmux->threshold_bytes = DEFAULT_MAX_SIZE_BYTES;
237 GST_OBJECT_FLAG_SET (splitmux, GST_ELEMENT_FLAG_SINK);
/* Remove the multiqueue, the muxer and the active sink from the bin, then
 * reset the cached child element pointers.
 * NOTE(review): any guards around the mq/muxer removals and the value
 * assigned to the pointers (presumably NULL) are on lines not visible in
 * this listing - confirm against the full file. */
241 gst_splitmux_reset (GstSplitMuxSink * splitmux)
244 gst_bin_remove (GST_BIN (splitmux), splitmux->mq);
246 gst_bin_remove (GST_BIN (splitmux), splitmux->muxer);
247 if (splitmux->active_sink)
248 gst_bin_remove (GST_BIN (splitmux), splitmux->active_sink);
250 splitmux->sink = splitmux->active_sink = splitmux->muxer = splitmux->mq =
/* GObject dispose: chains up to the parent class first and then resets the
 * cached child element pointers, since (per the comment below) the parent
 * dispose leaves them invalid. */
255 gst_splitmux_sink_dispose (GObject * object)
257 GstSplitMuxSink *splitmux = GST_SPLITMUX_SINK (object);
259 G_OBJECT_CLASS (parent_class)->dispose (object);
261 /* Calling parent dispose invalidates all child pointers */
262 splitmux->sink = splitmux->active_sink = splitmux->muxer = splitmux->mq =
/* GObject finalize: clears the condition variable and mutex, drops the
 * references held on a user-provided sink/muxer, frees the location string
 * and any stream contexts still in the list, then chains up to the parent
 * class. */
267 gst_splitmux_sink_finalize (GObject * object)
269 GstSplitMuxSink *splitmux = GST_SPLITMUX_SINK (object);
270 g_cond_clear (&splitmux->data_cond);
271 g_mutex_clear (&splitmux->lock);
272 if (splitmux->provided_sink)
273 gst_object_unref (splitmux->provided_sink);
274 if (splitmux->provided_muxer)
275 gst_object_unref (splitmux->provided_muxer);
277 g_free (splitmux->location);
279 /* Make sure to free any un-released contexts */
/* Each list entry has one reference dropped via mq_stream_ctx_unref() */
280 g_list_foreach (splitmux->contexts, (GFunc) mq_stream_ctx_unref, NULL);
281 g_list_free (splitmux->contexts);
283 G_OBJECT_CLASS (parent_class)->finalize (object);
/* GObject property setter. Every visible branch updates its field while
 * holding the object lock. The location string is duplicated from the
 * GValue after freeing the previous one; the sink/muxer branches drop the
 * reference on any previously provided element and take a new reference
 * with g_value_dup_object(). Unknown ids are reported with
 * G_OBJECT_WARN_INVALID_PROPERTY_ID.
 * NOTE(review): the case labels for the location, sink and muxer branches
 * are on lines not visible in this listing. */
287 gst_splitmux_sink_set_property (GObject * object, guint prop_id,
288 const GValue * value, GParamSpec * pspec)
290 GstSplitMuxSink *splitmux = GST_SPLITMUX_SINK (object);
294 GST_OBJECT_LOCK (splitmux);
295 g_free (splitmux->location);
296 splitmux->location = g_value_dup_string (value);
297 GST_OBJECT_UNLOCK (splitmux);
300 case PROP_MAX_SIZE_BYTES:
301 GST_OBJECT_LOCK (splitmux);
302 splitmux->threshold_bytes = g_value_get_uint64 (value);
303 GST_OBJECT_UNLOCK (splitmux);
305 case PROP_MAX_SIZE_TIME:
306 GST_OBJECT_LOCK (splitmux);
307 splitmux->threshold_time = g_value_get_uint64 (value);
308 GST_OBJECT_UNLOCK (splitmux);
310 case PROP_MUXER_OVERHEAD:
311 GST_OBJECT_LOCK (splitmux);
312 splitmux->mux_overhead = g_value_get_double (value);
313 GST_OBJECT_UNLOCK (splitmux);
316 GST_OBJECT_LOCK (splitmux);
317 if (splitmux->provided_sink)
318 gst_object_unref (splitmux->provided_sink);
319 splitmux->provided_sink = g_value_dup_object (value);
320 GST_OBJECT_UNLOCK (splitmux);
323 GST_OBJECT_LOCK (splitmux);
324 if (splitmux->provided_muxer)
325 gst_object_unref (splitmux->provided_muxer);
326 splitmux->provided_muxer = g_value_dup_object (value);
327 GST_OBJECT_UNLOCK (splitmux);
330 G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
/* GObject property getter. Every visible branch copies the current field
 * into the GValue while holding the object lock; the sink/muxer branches
 * report the user-provided elements (provided_sink / provided_muxer), not
 * the elements currently in use. Unknown ids are reported with
 * G_OBJECT_WARN_INVALID_PROPERTY_ID.
 * NOTE(review): the case labels for the location, sink and muxer branches
 * are on lines not visible in this listing. */
336 gst_splitmux_sink_get_property (GObject * object, guint prop_id,
337 GValue * value, GParamSpec * pspec)
339 GstSplitMuxSink *splitmux = GST_SPLITMUX_SINK (object);
343 GST_OBJECT_LOCK (splitmux);
344 g_value_set_string (value, splitmux->location);
345 GST_OBJECT_UNLOCK (splitmux);
347 case PROP_MAX_SIZE_BYTES:
348 GST_OBJECT_LOCK (splitmux);
349 g_value_set_uint64 (value, splitmux->threshold_bytes);
350 GST_OBJECT_UNLOCK (splitmux);
352 case PROP_MAX_SIZE_TIME:
353 GST_OBJECT_LOCK (splitmux);
354 g_value_set_uint64 (value, splitmux->threshold_time);
355 GST_OBJECT_UNLOCK (splitmux);
357 case PROP_MUXER_OVERHEAD:
358 GST_OBJECT_LOCK (splitmux);
359 g_value_set_double (value, splitmux->mux_overhead);
360 GST_OBJECT_UNLOCK (splitmux);
363 GST_OBJECT_LOCK (splitmux);
364 g_value_set_object (value, splitmux->provided_sink);
365 GST_OBJECT_UNLOCK (splitmux);
368 GST_OBJECT_LOCK (splitmux);
369 g_value_set_object (value, splitmux->provided_muxer);
370 GST_OBJECT_UNLOCK (splitmux);
373 G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
/* Map a multiqueue sink pad to its matching source pad: builds the pad
 * name "src_<tmp>" and looks it up as a static pad on `mq`.
 * NOTE(review): how `tmp` is derived from the sink pad name, and where the
 * temporary strings are freed, is on lines not visible in this listing. */
379 mq_sink_to_src (GstElement * mq, GstPad * sink_pad)
381 gchar *tmp, *sinkname, *srcname;
384 sinkname = gst_pad_get_name (sink_pad);
386 srcname = g_strdup_printf ("src_%s", tmp);
388 mq_src = gst_element_get_static_pad (mq, srcname);
/* Obtain a fresh sink/source pad pair from the multiqueue: requests a new
 * "sink_%u" pad and resolves the corresponding source pad through
 * mq_sink_to_src().
 * NOTE(review): the release of the request pad on the last visible line is
 * presumably the failure path - the surrounding control flow is not
 * visible in this listing. */
397 get_pads_from_mq (GstSplitMuxSink * splitmux, GstPad ** sink_pad,
403 /* Request a pad from multiqueue, then connect this one, then
404 * discover the corresponding output pad and return both */
405 mq_sink = gst_element_get_request_pad (splitmux->mq, "sink_%u");
409 mq_src = mq_sink_to_src (splitmux->mq, mq_sink);
419 gst_element_release_request_pad (splitmux->mq, mq_sink);
/* Allocate a zeroed per-stream context bound to `splitmux`, with an
 * initial reference count of 1, input/output segments in
 * GST_FORMAT_UNDEFINED, both running times at 0 and an empty queue of
 * pending buffer infos. */
424 mq_stream_ctx_new (GstSplitMuxSink * splitmux)
428 ctx = g_new0 (MqStreamCtx, 1);
429 g_atomic_int_set (&ctx->refcount, 1);
430 ctx->splitmux = splitmux;
431 gst_segment_init (&ctx->in_segment, GST_FORMAT_UNDEFINED);
432 gst_segment_init (&ctx->out_segment, GST_FORMAT_UNDEFINED);
433 ctx->in_running_time = ctx->out_running_time = 0;
434 g_queue_init (&ctx->queued_bufs);
/* Free every MqStreamBuf still queued on the context and empty the queue.
 * NOTE(review): the release of `ctx` itself is presumably on a line not
 * visible in this listing - confirm. */
439 mq_stream_ctx_free (MqStreamCtx * ctx)
441 g_queue_foreach (&ctx->queued_bufs, (GFunc) mq_stream_buf_free, NULL);
442 g_queue_clear (&ctx->queued_bufs);
/* Atomically drop one reference on the context; mq_stream_ctx_free() is
 * called when the count reaches zero. */
447 mq_stream_ctx_unref (MqStreamCtx * ctx)
449 if (g_atomic_int_dec_and_test (&ctx->refcount))
450 mq_stream_ctx_free (ctx);
/* Atomically take an additional reference on the context. */
454 mq_stream_ctx_ref (MqStreamCtx * ctx)
456 g_atomic_int_inc (&ctx->refcount);
/* Clears the stored sink-side probe id and drops one context reference.
 * Presumably installed as the destroy-notify of the sink pad probe, with
 * the reference being the one taken for that probe - verify at the call
 * site that adds the probe. */
460 _pad_block_destroy_sink_notify (MqStreamCtx * ctx)
462 ctx->sink_pad_block_id = 0;
463 mq_stream_ctx_unref (ctx);
/* Clears the stored source-side probe id and drops one context reference.
 * Presumably installed as the destroy-notify of the source pad probe, with
 * the reference being the one taken for that probe - verify at the call
 * site that adds the probe. */
467 _pad_block_destroy_src_notify (MqStreamCtx * ctx)
469 ctx->src_pad_block_id = 0;
470 mq_stream_ctx_unref (ctx);
/* Post an element message on the bus announcing that a fragment was opened
 * (`opened` TRUE, "splitmuxsink-fragment-opened") or closed (`opened`
 * FALSE, "splitmuxsink-fragment-closed"). The message structure carries
 * the sink's current "location" and the reference context's output running
 * time. */
474 send_fragment_opened_closed_msg (GstSplitMuxSink * splitmux, gboolean opened)
476 gchar *location = NULL;
478 const gchar *msg_name = opened ?
479 "splitmuxsink-fragment-opened" : "splitmuxsink-fragment-closed";
/* NOTE(review): g_object_get() hands back a newly allocated copy of a
 * string property; confirm it is freed on a line not visible in this
 * listing. Also assumes the sink element exposes a "location" property. */
481 g_object_get (splitmux->sink, "location", &location, NULL);
483 msg = gst_message_new_element (GST_OBJECT (splitmux),
484 gst_structure_new (msg_name,
485 "location", G_TYPE_STRING, location,
486 "running-time", GST_TYPE_CLOCK_TIME,
487 splitmux->reference_ctx->out_running_time, NULL));
488 gst_element_post_message (GST_ELEMENT_CAST (splitmux), msg);
493 /* Called with lock held, drops the lock to send EOS to the
/* Sends a new EOS event to the peer of ctx's source pad. The splitmux lock
 * is released around gst_pad_send_event() and re-taken afterwards, so
 * shared state may have changed by the time this returns. */
497 send_eos (GstSplitMuxSink * splitmux, MqStreamCtx * ctx)
502 eos = gst_event_new_eos ();
503 pad = gst_pad_get_peer (ctx->srcpad);
507 GST_INFO_OBJECT (splitmux, "Sending EOS on %" GST_PTR_FORMAT, pad);
508 GST_SPLITMUX_UNLOCK (splitmux);
509 gst_pad_send_event (pad, eos);
510 GST_SPLITMUX_LOCK (splitmux);
/* Drop the reference returned by gst_pad_get_peer() */
512 gst_object_unref (pad);
515 /* Called with splitmux lock held to check if this output
516 * context needs to sleep to wait for the release of the
517 * next GOP, or to send EOS to close out the current file
/* Output-side gate for one stream context: data below
 * max_out_running_time is let through; otherwise the function acts on the
 * current fragment state (send EOS, start the next fragment) or sleeps on
 * the condition variable until woken.
 * NOTE(review): the enclosing loop and the return/goto statements are on
 * lines not visible in this listing. */
520 complete_or_wait_on_out (GstSplitMuxSink * splitmux, MqStreamCtx * ctx)
524 GST_LOG_OBJECT (ctx->srcpad,
525 "Checking running time %" GST_TIME_FORMAT " against max %"
526 GST_TIME_FORMAT, GST_TIME_ARGS (ctx->out_running_time),
527 GST_TIME_ARGS (splitmux->max_out_running_time));
/* No limit set, or still below the limit: record that output happened */
529 if (splitmux->max_out_running_time == GST_CLOCK_TIME_NONE ||
530 ctx->out_running_time < splitmux->max_out_running_time) {
531 splitmux->have_muxed_something = TRUE;
/* Bail-out condition: this stream is flushing or the element is stopped */
535 if (ctx->flushing || splitmux->state == SPLITMUX_STATE_STOPPED)
/* At or past the limit: act on the current fragment state */
538 if (splitmux->state == SPLITMUX_STATE_ENDING_FILE) {
/* EOS is only sent while this context's out_eos flag is still FALSE */
539 if (ctx->out_eos == FALSE) {
540 send_eos (splitmux, ctx);
543 } else if (splitmux->state == SPLITMUX_STATE_START_NEXT_FRAGMENT) {
544 start_next_fragment (splitmux);
548 GST_INFO_OBJECT (ctx->srcpad,
549 "Sleeping for running time %"
550 GST_TIME_FORMAT " (max %" GST_TIME_FORMAT ")",
551 GST_TIME_ARGS (ctx->out_running_time),
552 GST_TIME_ARGS (splitmux->max_out_running_time));
/* out_blocked is TRUE only for the duration of the wait below */
553 ctx->out_blocked = TRUE;
554 /* Expand the mq if needed before sleeping */
555 check_queue_length (splitmux, ctx);
/* g_cond_wait() releases the splitmux lock while sleeping and re-acquires
 * it before returning */
556 GST_SPLITMUX_WAIT (splitmux);
557 ctx->out_blocked = FALSE;
558 GST_INFO_OBJECT (ctx->srcpad,
559 "Woken for new max running time %" GST_TIME_FORMAT,
560 GST_TIME_ARGS (splitmux->max_out_running_time));
/* Pad probe for the multiqueue output side of one stream.
 *
 * Downstream events update the context's output segment and flushing
 * state; GAP events advance out_running_time and go through
 * complete_or_wait_on_out(). For buffers, the oldest MqStreamBuf queued by
 * the input side is popped, the output counters are updated, and the probe
 * may block in complete_or_wait_on_out() until the buffer is allowed
 * through.
 *
 * Returns GST_PAD_PROBE_PASS to forward the item, or GST_PAD_PROBE_DROP
 * for buffer lists and on the exit path at the end of the function.
 * NOTE(review): several case labels, break/goto statements and the exit
 * label are on lines not visible in this listing. */
564 static GstPadProbeReturn
565 handle_mq_output (GstPad * pad, GstPadProbeInfo * info, MqStreamCtx * ctx)
567 GstSplitMuxSink *splitmux = ctx->splitmux;
568 MqStreamBuf *buf_info = NULL;
/* NOTE(review): this format string ends in "\n", unlike the other log
 * messages in this file */
570 GST_LOG_OBJECT (pad, "Fired probe type 0x%x\n", info->type);
572 /* FIXME: Handle buffer lists, until then make it clear they won't work */
573 if (info->type & GST_PAD_PROBE_TYPE_BUFFER_LIST) {
574 g_warning ("Buffer list handling not implemented");
575 return GST_PAD_PROBE_DROP;
577 if (info->type & GST_PAD_PROBE_TYPE_EVENT_DOWNSTREAM) {
578 GstEvent *event = gst_pad_probe_info_get_event (info);
580 GST_LOG_OBJECT (pad, "Event %" GST_PTR_FORMAT, event);
582 switch (GST_EVENT_TYPE (event)) {
583 case GST_EVENT_SEGMENT:
/* Remember the segment so running times can be computed on this side */
584 gst_event_copy_segment (event, &ctx->out_segment);
586 case GST_EVENT_FLUSH_STOP:
/* Flush finished: reset the output segment, discard all pending buffer
 * infos and clear the flushing flag */
587 GST_SPLITMUX_LOCK (splitmux);
588 gst_segment_init (&ctx->out_segment, GST_FORMAT_UNDEFINED);
589 g_queue_foreach (&ctx->queued_bufs, (GFunc) mq_stream_buf_free, NULL);
590 g_queue_clear (&ctx->queued_bufs);
591 ctx->flushing = FALSE;
592 GST_SPLITMUX_UNLOCK (splitmux);
594 case GST_EVENT_FLUSH_START:
/* Mark the stream as flushing and wake every waiter on the condition */
595 GST_SPLITMUX_LOCK (splitmux);
596 GST_LOG_OBJECT (pad, "Flush start");
597 ctx->flushing = TRUE;
598 GST_SPLITMUX_BROADCAST (splitmux);
599 GST_SPLITMUX_UNLOCK (splitmux);
602 GST_SPLITMUX_LOCK (splitmux);
603 if (splitmux->state == SPLITMUX_STATE_STOPPED)
606 GST_SPLITMUX_UNLOCK (splitmux);
611 gst_event_parse_gap (event, &gap_ts, NULL);
612 if (gap_ts == GST_CLOCK_TIME_NONE)
615 GST_SPLITMUX_LOCK (splitmux);
/* Convert the gap timestamp to running time using the output segment */
617 gap_ts = gst_segment_to_running_time (&ctx->out_segment,
618 GST_FORMAT_TIME, gap_ts);
620 GST_LOG_OBJECT (pad, "Have GAP w/ ts %" GST_TIME_FORMAT,
621 GST_TIME_ARGS (gap_ts));
623 if (splitmux->state == SPLITMUX_STATE_STOPPED)
625 ctx->out_running_time = gap_ts;
626 complete_or_wait_on_out (splitmux, ctx);
627 GST_SPLITMUX_UNLOCK (splitmux);
633 return GST_PAD_PROBE_PASS;
636 /* Allow everything through until the configured next stopping point */
637 GST_SPLITMUX_LOCK (splitmux);
/* The input probe pushes infos at the head, so the tail is the oldest */
639 buf_info = g_queue_pop_tail (&ctx->queued_bufs);
640 if (buf_info == NULL)
641 /* Can only happen due to a poorly timed flush */
644 /* If we have popped a keyframe, decrement the queued_gop count */
645 if (buf_info->keyframe && splitmux->queued_gops > 0)
646 splitmux->queued_gops--;
648 ctx->out_running_time = buf_info->run_ts;
650 GST_LOG_OBJECT (splitmux,
651 "Pad %" GST_PTR_FORMAT " buffer with TS %" GST_TIME_FORMAT
652 " size %" G_GSIZE_FORMAT,
653 pad, GST_TIME_ARGS (ctx->out_running_time), buf_info->buf_size);
/* Announce the first fragment once, on the first buffer seen here */
655 if (splitmux->opening_first_fragment) {
656 send_fragment_opened_closed_msg (splitmux, TRUE);
657 splitmux->opening_first_fragment = FALSE;
/* May sleep until this buffer's running time is released */
660 complete_or_wait_on_out (splitmux, ctx);
/* Track the highest running time and the total bytes passed on so far */
662 if (splitmux->muxed_out_time == GST_CLOCK_TIME_NONE ||
663 splitmux->muxed_out_time < buf_info->run_ts)
664 splitmux->muxed_out_time = buf_info->run_ts;
666 splitmux->muxed_out_bytes += buf_info->buf_size;
668 #ifndef GST_DISABLE_GST_DEBUG
670 GstBuffer *buf = gst_pad_probe_info_get_buffer (info);
671 GST_LOG_OBJECT (pad, "Returning to pass buffer %" GST_PTR_FORMAT
672 " run ts %" GST_TIME_FORMAT, buf,
673 GST_TIME_ARGS (ctx->out_running_time));
677 GST_SPLITMUX_UNLOCK (splitmux);
679 mq_stream_buf_free (buf_info);
681 return GST_PAD_PROBE_PASS;
/* Exit path that releases the lock and drops the probed item */
684 GST_SPLITMUX_UNLOCK (splitmux);
685 return GST_PAD_PROBE_DROP;
/* Sticky-events foreach callback: sends `*event` to `peer`, taking an
 * extra reference because gst_pad_send_event() consumes one. The send
 * result is returned to the iterator. */
689 resend_sticky (GstPad * pad, GstEvent ** event, GstPad * peer)
691 return gst_pad_send_event (peer, gst_event_ref (*event));
/* g_list_foreach() callback used when starting a new fragment: re-sends
 * all sticky events stored on the context's source pad to its peer and
 * clears the context's out_eos flag.
 * NOTE(review): gst_pad_get_peer() returns NULL for an unlinked pad; no
 * NULL check is visible in this listing - confirm the pad is always
 * linked here. */
695 restart_context (MqStreamCtx * ctx, GstSplitMuxSink * splitmux)
697 GstPad *peer = gst_pad_get_peer (ctx->srcpad);
699 gst_pad_sticky_events_foreach (ctx->srcpad,
700 (GstPadStickyEventsForeachFunction) (resend_sticky), peer);
703 ctx->out_eos = FALSE;
705 gst_object_unref (peer);
708 /* Called with lock held when a fragment
709 * reaches EOS and it is time to restart
/* Switch output to the next file: muxer and sink are taken to NULL, the
 * next filename is set, both elements are brought back to the bin's state,
 * every stream context is restarted, and the element returns to
 * COLLECTING_GOP_START before all waiters are woken. */
713 start_next_fragment (GstSplitMuxSink * splitmux)
715 /* 1 change to new file */
716 gst_element_set_state (splitmux->muxer, GST_STATE_NULL);
717 gst_element_set_state (splitmux->active_sink, GST_STATE_NULL);
719 set_next_filename (splitmux);
/* Bring the sink, then the muxer, back up to the bin's state */
721 gst_element_sync_state_with_parent (splitmux->active_sink);
722 gst_element_sync_state_with_parent (splitmux->muxer);
/* Re-send sticky events and clear out_eos on every stream */
724 g_list_foreach (splitmux->contexts, (GFunc) restart_context, splitmux);
726 /* Switch state and go back to processing */
727 splitmux->state = SPLITMUX_STATE_COLLECTING_GOP_START;
/* Release output up to the reference stream's input position, or with no
 * limit once the reference stream has reached EOS */
729 if (!splitmux->reference_ctx->in_eos) {
730 splitmux->max_out_running_time = splitmux->reference_ctx->in_running_time;
732 splitmux->max_out_running_time = GST_CLOCK_TIME_NONE;
733 splitmux->have_muxed_something = FALSE;
/* NOTE(review): as listed, this assignment appears to follow the branch
 * above unconditionally and so would override the FALSE set there -
 * confirm against the full file. */
735 splitmux->have_muxed_something =
736 (splitmux->reference_ctx->in_running_time > splitmux->muxed_out_time);
738 /* Store the overflow parameters as the basis for the next fragment */
739 splitmux->mux_start_time = splitmux->muxed_out_time;
740 splitmux->mux_start_bytes = splitmux->muxed_out_bytes;
742 GST_DEBUG_OBJECT (splitmux,
743 "Restarting flow for new fragment. New running time %" GST_TIME_FORMAT,
744 GST_TIME_ARGS (splitmux->max_out_running_time));
746 send_fragment_opened_closed_msg (splitmux, TRUE);
748 GST_SPLITMUX_BROADCAST (splitmux);
/* GstBin handle_message implementation. On an EOS message a
 * "fragment closed" element message is posted; if the element is ending a
 * file with a finite max_out_running_time, the EOS is swallowed (unref'd),
 * the state moves to START_NEXT_FRAGMENT and waiters are woken. Anything
 * else reaches the parent class handler on the last line.
 * NOTE(review): the early return after dropping the EOS is on a line not
 * visible in this listing. */
752 bus_handler (GstBin * bin, GstMessage * message)
754 GstSplitMuxSink *splitmux = GST_SPLITMUX_SINK (bin);
756 switch (GST_MESSAGE_TYPE (message)) {
757 case GST_MESSAGE_EOS:
758 /* If the state is draining out the current file, drop this EOS */
759 GST_SPLITMUX_LOCK (splitmux);
/* Posted for every EOS message seen here, with the splitmux lock held */
761 send_fragment_opened_closed_msg (splitmux, FALSE);
763 if (splitmux->state == SPLITMUX_STATE_ENDING_FILE &&
764 splitmux->max_out_running_time != GST_CLOCK_TIME_NONE) {
765 GST_DEBUG_OBJECT (splitmux, "Caught EOS at end of fragment, dropping");
766 splitmux->state = SPLITMUX_STATE_START_NEXT_FRAGMENT;
767 GST_SPLITMUX_BROADCAST (splitmux);
769 gst_message_unref (message);
770 GST_SPLITMUX_UNLOCK (splitmux);
773 GST_SPLITMUX_UNLOCK (splitmux);
779 GST_BIN_CLASS (parent_class)->handle_message (bin, message);
782 /* Called with splitmux lock held */
783 /* Called when entering ProcessingCompleteGop state
784 * Assess if mq contents overflowed the current file
785 * -> If yes, need to switch to new file
786 * -> if no, set max_out_running_time to let this GOP in and
787 * go to COLLECTING_GOP_START state
/* Decide what to do with a fully gathered GOP: either end the current file
 * because a size/time threshold would be overrun, or release the GOP to
 * the output side by raising max_out_running_time. In both cases waiters
 * on the condition variable are woken. */
790 handle_gathered_gop (GstSplitMuxSink * splitmux)
793 gsize queued_bytes = 0;
794 GstClockTime queued_time = 0;
796 /* Assess if the multiqueue contents overflowed the current file */
/* queued_time = highest input running time over all streams,
 * queued_bytes = sum of the input byte counters of all streams */
797 for (cur = g_list_first (splitmux->contexts);
798 cur != NULL; cur = g_list_next (cur)) {
799 MqStreamCtx *tmpctx = (MqStreamCtx *) (cur->data);
800 if (tmpctx->in_running_time > queued_time)
801 queued_time = tmpctx->in_running_time;
802 queued_bytes += tmpctx->in_bytes;
/* Make both totals relative to the start of the current fragment; the
 * asserts guard the unsigned subtractions against wrapping */
805 g_assert (queued_bytes >= splitmux->mux_start_bytes);
806 g_assert (queued_time >= splitmux->mux_start_time);
808 queued_bytes -= splitmux->mux_start_bytes;
809 queued_time -= splitmux->mux_start_time;
811 /* Expand queued bytes estimate by muxer overhead */
812 queued_bytes += (queued_bytes * splitmux->mux_overhead);
814 GST_LOG_OBJECT (splitmux, "mq at TS %" GST_TIME_FORMAT
815 " bytes %" G_GSIZE_FORMAT, GST_TIME_ARGS (queued_time), queued_bytes);
817 /* Check for overrun - have we output at least one byte and overrun
818 * either threshold? */
/* A threshold of 0 disables the corresponding check */
819 if (splitmux->have_muxed_something &&
820 ((splitmux->threshold_bytes > 0 &&
821 queued_bytes >= splitmux->threshold_bytes) ||
822 (splitmux->threshold_time > 0 &&
823 queued_time >= splitmux->threshold_time))) {
/* Overrun: switch to ending the current file and wake the waiters */
825 splitmux->state = SPLITMUX_STATE_ENDING_FILE;
827 GST_INFO_OBJECT (splitmux,
828 "mq overflowed since last, draining out. max out TS is %"
829 GST_TIME_FORMAT, GST_TIME_ARGS (splitmux->max_out_running_time));
830 GST_SPLITMUX_BROADCAST (splitmux);
834 GST_LOG_OBJECT (splitmux,
835 "This GOP didn't overflow the fragment. Bytes sent %" G_GSIZE_FORMAT
836 " queued %" G_GSIZE_FORMAT " time %" GST_TIME_FORMAT " Continuing.",
837 splitmux->muxed_out_bytes - splitmux->mux_start_bytes,
838 queued_bytes, GST_TIME_ARGS (queued_time));
840 /* Wake everyone up to push this one GOP, then sleep */
841 splitmux->state = SPLITMUX_STATE_COLLECTING_GOP_START;
842 splitmux->have_muxed_something = TRUE;
/* Release output up to the reference stream's input position, or with no
 * limit once the reference stream has reached EOS */
844 if (!splitmux->reference_ctx->in_eos)
845 splitmux->max_out_running_time = splitmux->reference_ctx->in_running_time;
847 splitmux->max_out_running_time = GST_CLOCK_TIME_NONE;
849 GST_LOG_OBJECT (splitmux, "Waking output for complete GOP, TS %"
850 GST_TIME_FORMAT, GST_TIME_ARGS (splitmux->max_out_running_time));
851 GST_SPLITMUX_BROADCAST (splitmux);
856 /* Called with splitmux lock held */
857 /* Called from each input pad when it has all the pieces
858 * for a GOP or EOS, starting with the reference pad which has set the
859 * splitmux->max_in_running_time
/* While in WAITING_GOP_COMPLETE, checks every stream context against
 * splitmux->max_in_running_time and hands a complete GOP to
 * handle_gathered_gop(). Afterwards the caller sleeps on the condition
 * variable for as long as the loop condition at the bottom holds.
 * NOTE(review): the handling of `ready` and parts of two conditions are on
 * lines not visible in this listing. */
862 check_completed_gop (GstSplitMuxSink * splitmux, MqStreamCtx * ctx)
865 gboolean ready = TRUE;
866 GstClockTime current_max_in_running_time;
868 if (splitmux->state == SPLITMUX_STATE_WAITING_GOP_COMPLETE) {
869 /* Iterate each pad, and check that the input running time is at least
870 * up to the reference running time, and if so handle the collected GOP */
871 GST_LOG_OBJECT (splitmux, "Checking GOP collected, ctx %p", ctx);
872 for (cur = g_list_first (splitmux->contexts);
873 cur != NULL; cur = g_list_next (cur)) {
874 MqStreamCtx *tmpctx = (MqStreamCtx *) (cur->data);
876 GST_LOG_OBJECT (splitmux,
877 "Context %p (src pad %" GST_PTR_FORMAT ") TS %" GST_TIME_FORMAT
878 " EOS %d", tmpctx, tmpctx->srcpad,
879 GST_TIME_ARGS (tmpctx->in_running_time), tmpctx->in_eos);
/* A stream still behind the reference running time is not ready; the
 * second half of this condition is not visible in this listing */
881 if (tmpctx->in_running_time < splitmux->max_in_running_time &&
883 GST_LOG_OBJECT (splitmux,
884 "Context %p (src pad %" GST_PTR_FORMAT ") not ready. We'll sleep",
885 tmpctx, tmpctx->srcpad);
891 GST_DEBUG_OBJECT (splitmux,
892 "Collected GOP is complete. Processing (ctx %p)", ctx);
893 /* All pads have a complete GOP, release it into the multiqueue */
894 handle_gathered_gop (splitmux);
898 /* Some pad is not yet ready, or GOP is being pushed
899 * either way, sleep and wait to get woken */
/* Snapshot the target so a change to it while sleeping ends the loop */
900 current_max_in_running_time = splitmux->max_in_running_time;
901 while ((splitmux->state == SPLITMUX_STATE_WAITING_GOP_COMPLETE ||
902 splitmux->state == SPLITMUX_STATE_START_NEXT_FRAGMENT) &&
904 (current_max_in_running_time == splitmux->max_in_running_time)) {
906 GST_LOG_OBJECT (splitmux, "Sleeping for %s (ctx %p)",
907 splitmux->state == SPLITMUX_STATE_WAITING_GOP_COMPLETE ?
908 "GOP complete" : "EOF draining", ctx);
909 GST_SPLITMUX_WAIT (splitmux);
911 GST_LOG_OBJECT (splitmux, "Done waiting for complete GOP (ctx %p)", ctx);
/* Enlarge the multiqueue when needed: once this context's number of
 * pending buffer infos has reached mq_max_buffers, the function decides
 * whether the limit may grow and, if so, raises mq_max_buffers to
 * cur_len + 1 and applies it to the multiqueue's "max-size-buffers"
 * property.
 * NOTE(review): the assignments to allow_grow and the test that guards the
 * resize are on lines not visible in this listing. */
916 check_queue_length (GstSplitMuxSink * splitmux, MqStreamCtx * ctx)
919 guint cur_len = g_queue_get_length (&ctx->queued_bufs);
921 GST_DEBUG_OBJECT (ctx->sinkpad,
922 "Checking queue length len %u cur_max %u queued gops %u",
923 cur_len, splitmux->mq_max_buffers, splitmux->queued_gops);
925 if (cur_len >= splitmux->mq_max_buffers) {
926 gboolean allow_grow = FALSE;
928 /* If collecting a GOP and this pad might block,
929 * and there isn't already a pending GOP in the queue
932 if (splitmux->state == SPLITMUX_STATE_WAITING_GOP_COMPLETE &&
933 ctx->in_running_time < splitmux->max_in_running_time &&
934 splitmux->queued_gops <= 1) {
/* Second case: the reference stream is still looking for a GOP start and
 * at most one GOP is queued */
936 } else if (splitmux->state == SPLITMUX_STATE_COLLECTING_GOP_START &&
937 ctx->is_reference && splitmux->queued_gops <= 1) {
942 for (cur = g_list_first (splitmux->contexts);
943 cur != NULL; cur = g_list_next (cur)) {
944 MqStreamCtx *tmpctx = (MqStreamCtx *) (cur->data);
945 GST_DEBUG_OBJECT (tmpctx->sinkpad,
946 " len %u out_blocked %d",
947 g_queue_get_length (&tmpctx->queued_bufs), tmpctx->out_blocked);
948 /* If another stream is starving, grow */
949 if (tmpctx != ctx && g_queue_get_length (&tmpctx->queued_bufs) < 1) {
/* Grow by exactly one buffer beyond the current queue length */
956 splitmux->mq_max_buffers = cur_len + 1;
958 GST_INFO_OBJECT (splitmux,
959 "Multiqueue overrun - enlarging to %u buffers ctx %p",
960 splitmux->mq_max_buffers, ctx);
962 g_object_set (splitmux->mq, "max-size-buffers",
963 splitmux->mq_max_buffers, NULL);
968 static GstPadProbeReturn
969 handle_mq_input (GstPad * pad, GstPadProbeInfo * info, MqStreamCtx * ctx)
971 GstSplitMuxSink *splitmux = ctx->splitmux;
973 MqStreamBuf *buf_info = NULL;
976 gboolean keyframe = FALSE;
978 GST_LOG_OBJECT (pad, "Fired probe type 0x%x", info->type);
980 /* FIXME: Handle buffer lists, until then make it clear they won't work */
981 if (info->type & GST_PAD_PROBE_TYPE_BUFFER_LIST) {
982 g_warning ("Buffer list handling not implemented");
983 return GST_PAD_PROBE_DROP;
985 if (info->type & GST_PAD_PROBE_TYPE_EVENT_DOWNSTREAM) {
986 GstEvent *event = gst_pad_probe_info_get_event (info);
987 switch (GST_EVENT_TYPE (event)) {
988 case GST_EVENT_SEGMENT:
989 gst_event_copy_segment (event, &ctx->in_segment);
991 case GST_EVENT_FLUSH_STOP:
992 GST_SPLITMUX_LOCK (splitmux);
993 gst_segment_init (&ctx->in_segment, GST_FORMAT_UNDEFINED);
996 ctx->in_running_time = 0;
997 GST_SPLITMUX_UNLOCK (splitmux);
1000 GST_SPLITMUX_LOCK (splitmux);
1003 if (splitmux->state == SPLITMUX_STATE_STOPPED)
1006 if (ctx->is_reference) {
1007 GST_INFO_OBJECT (splitmux, "Got Reference EOS. Finishing up");
1008 /* Act as if this is a new keyframe with infinite timestamp */
1009 splitmux->max_in_running_time = GST_CLOCK_TIME_NONE;
1010 splitmux->state = SPLITMUX_STATE_WAITING_GOP_COMPLETE;
1011 /* Wake up other input pads to collect this GOP */
1012 GST_SPLITMUX_BROADCAST (splitmux);
1013 check_completed_gop (splitmux, ctx);
1014 } else if (splitmux->state == SPLITMUX_STATE_WAITING_GOP_COMPLETE) {
1015 /* If we are waiting for a GOP to be completed (ie, for aux
1016 * pads to catch up), then this pad is complete, so check
1017 * if the whole GOP is.
1019 check_completed_gop (splitmux, ctx);
1021 GST_SPLITMUX_UNLOCK (splitmux);
1026 return GST_PAD_PROBE_PASS;
1029 buf = gst_pad_probe_info_get_buffer (info);
1030 buf_info = mq_stream_buf_new ();
1032 if (GST_BUFFER_PTS_IS_VALID (buf))
1033 ts = GST_BUFFER_PTS (buf);
1035 ts = GST_BUFFER_DTS (buf);
1037 GST_SPLITMUX_LOCK (splitmux);
1039 if (splitmux->state == SPLITMUX_STATE_STOPPED)
1042 /* If this buffer has a timestamp, advance the input timestamp of the
1044 if (GST_CLOCK_TIME_IS_VALID (ts)) {
1045 GstClockTime running_time =
1046 gst_segment_to_running_time (&ctx->in_segment, GST_FORMAT_TIME,
1047 GST_BUFFER_TIMESTAMP (buf));
1049 if (GST_CLOCK_TIME_IS_VALID (running_time) &&
1050 (ctx->in_running_time == GST_CLOCK_TIME_NONE
1051 || running_time > ctx->in_running_time))
1052 ctx->in_running_time = running_time;
1055 /* Try to make sure we have a valid running time */
1056 if (!GST_CLOCK_TIME_IS_VALID (ctx->in_running_time)) {
1057 ctx->in_running_time =
1058 gst_segment_to_running_time (&ctx->in_segment, GST_FORMAT_TIME,
1059 ctx->in_segment.start);
1062 buf_info->run_ts = ctx->in_running_time;
1063 buf_info->buf_size = gst_buffer_get_size (buf);
1065 /* Update total input byte counter for overflow detect */
1066 ctx->in_bytes += buf_info->buf_size;
1068 /* initialize mux_start_time */
1069 if (ctx->is_reference && splitmux->mux_start_time == 0)
1070 splitmux->mux_start_time = buf_info->run_ts;
1072 GST_DEBUG_OBJECT (pad, "Buf TS %" GST_TIME_FORMAT
1073 " total in_bytes %" G_GSIZE_FORMAT,
1074 GST_TIME_ARGS (buf_info->run_ts), ctx->in_bytes);
1081 switch (splitmux->state) {
1082 case SPLITMUX_STATE_COLLECTING_GOP_START:
1083 if (ctx->is_reference) {
1084 /* If a keyframe, we have a complete GOP */
1085 if (GST_BUFFER_FLAG_IS_SET (buf, GST_BUFFER_FLAG_DELTA_UNIT) ||
1086 !GST_CLOCK_TIME_IS_VALID (ctx->in_running_time) ||
1087 splitmux->max_in_running_time >= ctx->in_running_time) {
1088 /* Pass this buffer through */
1092 GST_INFO_OBJECT (pad,
1093 "Have keyframe with running time %" GST_TIME_FORMAT,
1094 GST_TIME_ARGS (ctx->in_running_time));
1096 splitmux->state = SPLITMUX_STATE_WAITING_GOP_COMPLETE;
1097 splitmux->max_in_running_time = ctx->in_running_time;
1098 /* Wake up other input pads to collect this GOP */
1099 GST_SPLITMUX_BROADCAST (splitmux);
1100 check_completed_gop (splitmux, ctx);
1102 /* We're still waiting for a keyframe on the reference pad, sleep */
1103 GST_LOG_OBJECT (pad, "Sleeping for GOP start");
1104 GST_SPLITMUX_WAIT (splitmux);
1105 GST_LOG_OBJECT (pad, "Done sleeping for GOP start state now %d",
1109 case SPLITMUX_STATE_WAITING_GOP_COMPLETE:
1110 /* After a GOP start is found, this buffer might complete the GOP */
1111 /* If we overran the target timestamp, it might be time to process
1112 * the GOP, otherwise bail out for more data
1114 GST_LOG_OBJECT (pad,
1115 "Checking TS %" GST_TIME_FORMAT " against max %" GST_TIME_FORMAT,
1116 GST_TIME_ARGS (ctx->in_running_time),
1117 GST_TIME_ARGS (splitmux->max_in_running_time));
1119 if (ctx->in_running_time < splitmux->max_in_running_time) {
1124 GST_LOG_OBJECT (pad,
1125 "Collected last packet of GOP. Checking other pads");
1126 check_completed_gop (splitmux, ctx);
1128 case SPLITMUX_STATE_ENDING_FILE:
1129 case SPLITMUX_STATE_START_NEXT_FRAGMENT:
1130 /* A fragment is ending, wait until that's done before continuing */
1131 GST_DEBUG_OBJECT (pad, "Sleeping for fragment restart");
1132 GST_SPLITMUX_WAIT (splitmux);
1133 GST_DEBUG_OBJECT (pad,
1134 "Done sleeping for fragment restart state now %d", splitmux->state);
1140 } while (loop_again);
1143 splitmux->queued_gops++;
1144 buf_info->keyframe = TRUE;
1147 /* Now add this buffer to the queue just before returning */
1148 g_queue_push_head (&ctx->queued_bufs, buf_info);
1150 /* Check the buffer will fit in the mq */
1151 check_queue_length (splitmux, ctx);
1153 GST_LOG_OBJECT (pad, "Returning to queue buffer %" GST_PTR_FORMAT
1154 " run ts %" GST_TIME_FORMAT, buf, GST_TIME_ARGS (ctx->in_running_time));
1156 GST_SPLITMUX_UNLOCK (splitmux);
1157 return GST_PAD_PROBE_PASS;
1160 GST_SPLITMUX_UNLOCK (splitmux);
1162 mq_stream_buf_free (buf_info);
1163 return GST_PAD_PROBE_PASS;
1167 gst_splitmux_sink_request_new_pad (GstElement * element,
1168 GstPadTemplate * templ, const gchar * name, const GstCaps * caps)
1170 GstSplitMuxSink *splitmux = (GstSplitMuxSink *) element;
1171 GstPadTemplate *mux_template = NULL;
1173 GstPad *mq_sink, *mq_src;
1175 gboolean is_video = FALSE;
1178 GST_DEBUG_OBJECT (element, "templ:%s, name:%s", templ->name_template, name);
1180 GST_SPLITMUX_LOCK (splitmux);
1181 if (!create_elements (splitmux))
1184 if (templ->name_template) {
1185 if (g_str_equal (templ->name_template, "video")) {
1186 /* FIXME: Look for a pad template with matching caps, rather than by name */
1188 gst_element_class_get_pad_template (GST_ELEMENT_GET_CLASS
1189 (splitmux->muxer), "video_%u");
1194 gst_element_class_get_pad_template (GST_ELEMENT_GET_CLASS
1195 (splitmux->muxer), templ->name_template);
1197 if (mux_template == NULL) {
1198 /* Fallback to find sink pad templates named 'sink_%d' (mpegtsmux) */
1200 gst_element_class_get_pad_template (GST_ELEMENT_GET_CLASS
1201 (splitmux->muxer), "sink_%d");
1205 res = gst_element_request_pad (splitmux->muxer, mux_template, name, caps);
1210 gname = g_strdup ("video");
1211 else if (name == NULL)
1212 gname = gst_pad_get_name (res);
1214 gname = g_strdup (name);
1216 if (!get_pads_from_mq (splitmux, &mq_sink, &mq_src)) {
1217 gst_element_release_request_pad (splitmux->muxer, res);
1218 gst_object_unref (GST_OBJECT (res));
1222 if (gst_pad_link (mq_src, res) != GST_PAD_LINK_OK) {
1223 gst_element_release_request_pad (splitmux->muxer, res);
1224 gst_object_unref (GST_OBJECT (res));
1225 gst_element_release_request_pad (splitmux->mq, mq_sink);
1226 gst_object_unref (GST_OBJECT (mq_sink));
1230 gst_object_unref (GST_OBJECT (res));
1232 ctx = mq_stream_ctx_new (splitmux);
1233 ctx->srcpad = mq_src;
1234 ctx->sinkpad = mq_sink;
1236 mq_stream_ctx_ref (ctx);
1237 ctx->src_pad_block_id =
1238 gst_pad_add_probe (mq_src, GST_PAD_PROBE_TYPE_DATA_DOWNSTREAM,
1239 (GstPadProbeCallback) handle_mq_output, ctx, (GDestroyNotify)
1240 _pad_block_destroy_src_notify);
1241 if (is_video && splitmux->reference_ctx != NULL) {
1242 splitmux->reference_ctx->is_reference = FALSE;
1243 splitmux->reference_ctx = NULL;
1245 if (splitmux->reference_ctx == NULL) {
1246 splitmux->reference_ctx = ctx;
1247 ctx->is_reference = TRUE;
1250 res = gst_ghost_pad_new (gname, mq_sink);
1251 g_object_set_qdata ((GObject *) (res), PAD_CONTEXT, ctx);
1253 mq_stream_ctx_ref (ctx);
1254 ctx->sink_pad_block_id =
1255 gst_pad_add_probe (mq_sink, GST_PAD_PROBE_TYPE_DATA_DOWNSTREAM,
1256 (GstPadProbeCallback) handle_mq_input, ctx, (GDestroyNotify)
1257 _pad_block_destroy_sink_notify);
1259 GST_DEBUG_OBJECT (splitmux, "Request pad %" GST_PTR_FORMAT
1260 " is mq pad %" GST_PTR_FORMAT, res, mq_sink);
1262 splitmux->contexts = g_list_prepend (splitmux->contexts, ctx);
1266 gst_object_unref (mq_sink);
1267 gst_object_unref (mq_src);
1269 gst_pad_set_active (res, TRUE);
1270 gst_element_add_pad (element, res);
1271 GST_SPLITMUX_UNLOCK (splitmux);
1275 GST_SPLITMUX_UNLOCK (splitmux);
1280 gst_splitmux_sink_release_pad (GstElement * element, GstPad * pad)
1282 GstSplitMuxSink *splitmux = (GstSplitMuxSink *) element;
1283 GstPad *mqsink, *mqsrc, *muxpad;
1285 (MqStreamCtx *) (g_object_get_qdata ((GObject *) (pad), PAD_CONTEXT));
1287 GST_SPLITMUX_LOCK (splitmux);
1289 if (splitmux->muxer == NULL || splitmux->mq == NULL)
1290 goto fail; /* Elements don't exist yet - nothing to release */
1292 GST_INFO_OBJECT (pad, "releasing request pad");
1294 mqsink = gst_ghost_pad_get_target (GST_GHOST_PAD (pad));
1295 mqsrc = mq_sink_to_src (splitmux->mq, mqsink);
1296 muxpad = gst_pad_get_peer (mqsrc);
1298 /* Remove the context from our consideration */
1299 splitmux->contexts = g_list_remove (splitmux->contexts, ctx);
1301 if (ctx->sink_pad_block_id)
1302 gst_pad_remove_probe (ctx->sinkpad, ctx->sink_pad_block_id);
1304 if (ctx->src_pad_block_id)
1305 gst_pad_remove_probe (ctx->srcpad, ctx->src_pad_block_id);
1307 /* Can release the context now */
1308 mq_stream_ctx_unref (ctx);
1310 /* Release and free the mq input */
1311 gst_element_release_request_pad (splitmux->mq, mqsink);
1313 /* Release and free the muxer input */
1314 gst_element_release_request_pad (splitmux->muxer, muxpad);
1316 gst_object_unref (mqsink);
1317 gst_object_unref (mqsrc);
1318 gst_object_unref (muxpad);
1320 gst_element_remove_pad (element, pad);
1322 /* Reset the internal elements only after all request pads are released */
1323 if (splitmux->contexts == NULL)
1324 gst_splitmux_reset (splitmux);
1327 GST_SPLITMUX_UNLOCK (splitmux);
1331 create_element (GstSplitMuxSink * splitmux,
1332 const gchar * factory, const gchar * name)
1334 GstElement *ret = gst_element_factory_make (factory, name);
1336 g_warning ("Failed to create %s - splitmuxsink will not work", name);
1340 if (!gst_bin_add (GST_BIN (splitmux), ret)) {
1341 g_warning ("Could not add %s element - splitmuxsink will not work", name);
1342 gst_object_unref (ret);
1350 create_elements (GstSplitMuxSink * splitmux)
1352 /* Create internal elements */
1353 if (splitmux->mq == NULL) {
1355 create_element (splitmux, "multiqueue", "multiqueue")) == NULL)
1358 splitmux->mq_max_buffers = 5;
1359 /* No bytes or time limit, we limit buffers manually */
1360 g_object_set (splitmux->mq, "max-size-bytes", 0, "max-size-time",
1361 (guint64) 0, "max-size-buffers", splitmux->mq_max_buffers, NULL);
1364 if (splitmux->muxer == NULL) {
1365 GstElement *provided_muxer = NULL;
1367 GST_OBJECT_LOCK (splitmux);
1368 if (splitmux->provided_muxer != NULL)
1369 provided_muxer = gst_object_ref (splitmux->provided_muxer);
1370 GST_OBJECT_UNLOCK (splitmux);
1372 if (provided_muxer == NULL) {
1373 if ((splitmux->muxer =
1374 create_element (splitmux, "mp4mux", "muxer")) == NULL)
1377 if (!gst_bin_add (GST_BIN (splitmux), provided_muxer)) {
1378 g_warning ("Could not add muxer element - splitmuxsink will not work");
1379 gst_object_unref (provided_muxer);
1383 splitmux->muxer = provided_muxer;
1384 gst_object_unref (provided_muxer);
1394 find_sink (GstElement * e)
1396 GstElement *res = NULL;
1398 gboolean done = FALSE;
1399 GValue data = { 0, };
1401 if (!GST_IS_BIN (e))
1404 iter = gst_bin_iterate_sinks (GST_BIN (e));
1406 switch (gst_iterator_next (iter, &data)) {
1407 case GST_ITERATOR_OK:
1409 GstElement *child = g_value_get_object (&data);
1410 if (g_object_class_find_property (G_OBJECT_GET_CLASS (child),
1411 "location") != NULL) {
1415 g_value_reset (&data);
1418 case GST_ITERATOR_RESYNC:
1419 gst_iterator_resync (iter);
1421 case GST_ITERATOR_DONE:
1424 case GST_ITERATOR_ERROR:
1425 g_assert_not_reached ();
1429 g_value_unset (&data);
1430 gst_iterator_free (iter);
1436 create_sink (GstSplitMuxSink * splitmux)
1438 GstElement *provided_sink = NULL;
1440 g_return_val_if_fail (splitmux->active_sink == NULL, TRUE);
1442 GST_OBJECT_LOCK (splitmux);
1443 if (splitmux->provided_sink != NULL)
1444 provided_sink = gst_object_ref (splitmux->provided_sink);
1445 GST_OBJECT_UNLOCK (splitmux);
1447 if (provided_sink == NULL) {
1448 if ((splitmux->sink =
1449 create_element (splitmux, DEFAULT_SINK, "sink")) == NULL)
1451 splitmux->active_sink = splitmux->sink;
1453 if (!gst_bin_add (GST_BIN (splitmux), provided_sink)) {
1454 g_warning ("Could not add sink elements - splitmuxsink will not work");
1455 gst_object_unref (provided_sink);
1459 splitmux->active_sink = provided_sink;
1461 /* The bin holds a ref now, we can drop our tmp ref */
1462 gst_object_unref (provided_sink);
1464 /* Find the sink element */
1465 splitmux->sink = find_sink (splitmux->active_sink);
1466 if (splitmux->sink == NULL) {
1468 ("Could not locate sink element in provided sink - splitmuxsink will not work");
1473 if (!gst_element_link (splitmux->muxer, splitmux->active_sink)) {
1474 g_warning ("Failed to link muxer and sink- splitmuxsink will not work");
1484 #pragma GCC diagnostic ignored "-Wformat-nonliteral"
1487 set_next_filename (GstSplitMuxSink * splitmux)
1489 gchar *fname = NULL;
1491 g_signal_emit (splitmux, signals[SIGNAL_FORMAT_LOCATION], 0,
1492 splitmux->fragment_id, &fname);
1495 fname = splitmux->location ?
1496 g_strdup_printf (splitmux->location, splitmux->fragment_id) : NULL;
1499 GST_INFO_OBJECT (splitmux, "Setting file to %s", fname);
1500 g_object_set (splitmux->sink, "location", fname, NULL);
1503 splitmux->fragment_id++;
1507 static GstStateChangeReturn
1508 gst_splitmux_sink_change_state (GstElement * element, GstStateChange transition)
1510 GstStateChangeReturn ret;
1511 GstSplitMuxSink *splitmux = (GstSplitMuxSink *) element;
1513 switch (transition) {
1514 case GST_STATE_CHANGE_NULL_TO_READY:{
1515 GST_SPLITMUX_LOCK (splitmux);
1516 if (!create_elements (splitmux) || !create_sink (splitmux)) {
1517 ret = GST_STATE_CHANGE_FAILURE;
1518 GST_SPLITMUX_UNLOCK (splitmux);
1521 GST_SPLITMUX_UNLOCK (splitmux);
1522 splitmux->fragment_id = 0;
1523 set_next_filename (splitmux);
1526 case GST_STATE_CHANGE_READY_TO_PAUSED:{
1527 GST_SPLITMUX_LOCK (splitmux);
1528 /* Start by collecting one input on each pad */
1529 splitmux->state = SPLITMUX_STATE_COLLECTING_GOP_START;
1530 splitmux->max_in_running_time = 0;
1531 splitmux->muxed_out_time = splitmux->mux_start_time = 0;
1532 splitmux->muxed_out_bytes = splitmux->mux_start_bytes = 0;
1533 splitmux->opening_first_fragment = TRUE;
1534 GST_SPLITMUX_UNLOCK (splitmux);
1537 case GST_STATE_CHANGE_PAUSED_TO_READY:
1538 case GST_STATE_CHANGE_READY_TO_NULL:
1539 GST_SPLITMUX_LOCK (splitmux);
1540 splitmux->state = SPLITMUX_STATE_STOPPED;
1541 /* Wake up any blocked threads */
1542 GST_LOG_OBJECT (splitmux,
1543 "State change -> NULL or READY. Waking threads");
1544 GST_SPLITMUX_BROADCAST (splitmux);
1545 GST_SPLITMUX_UNLOCK (splitmux);
1551 ret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
1552 if (ret == GST_STATE_CHANGE_FAILURE)
1555 switch (transition) {
1556 case GST_STATE_CHANGE_READY_TO_NULL:
1557 GST_SPLITMUX_LOCK (splitmux);
1558 splitmux->fragment_id = 0;
1559 /* Reset internal elements only if no pad contexts are using them */
1560 if (splitmux->contexts == NULL)
1561 gst_splitmux_reset (splitmux);
1562 GST_SPLITMUX_UNLOCK (splitmux);
1570 if (transition == GST_STATE_CHANGE_NULL_TO_READY &&
1571 ret == GST_STATE_CHANGE_FAILURE) {
1572 /* Cleanup elements on failed transition out of NULL */
1573 gst_splitmux_reset (splitmux);
1579 register_splitmuxsink (GstPlugin * plugin)
1581 GST_DEBUG_CATEGORY_INIT (splitmux_debug, "splitmuxsink", 0,
1582 "Split File Muxing Sink");
1584 return gst_element_register (plugin, "splitmuxsink", GST_RANK_NONE,
1585 GST_TYPE_SPLITMUX_SINK);