* This element wraps a muxer and a sink, and starts a new file when the mux
* contents are about to cross a threshold of maximum size of maximum time,
* splitting at video keyframe boundaries. Exactly one input video stream
- * is required, with as many accompanying audio and subtitle streams as
+ * can be muxed, with as many accompanying audio and subtitle streams as
* desired.
*
* By default, it uses mp4mux and filesink, but they can be changed via
* The minimum file size is 1 GOP, however - so limits may be overrun if the
* distance between any 2 keyframes is larger than the limits.
*
- * The splitting process is driven by the video stream contents, and
- * the video stream must contain closed GOPs for the output file parts
- * to be played individually correctly.
+ * If a video stream is available, the splitting process is driven by the video
+ * stream contents, and the video stream must contain closed GOPs for the output
+ * file parts to be played individually correctly. In the absence of a video
+ * stream, the first available stream is used as reference for synchronization.
*
* <refsect2>
* <title>Example pipelines</title>
#endif
#include <string.h>
+#include <glib/gstdio.h>
+#include <gst/video/video.h>
#include "gstsplitmuxsink.h"
GST_DEBUG_CATEGORY_STATIC (splitmux_debug);
#define GST_SPLITMUX_LOCK(s) g_mutex_lock(&(s)->lock)
#define GST_SPLITMUX_UNLOCK(s) g_mutex_unlock(&(s)->lock)
-#define GST_SPLITMUX_WAIT(s) g_cond_wait (&(s)->data_cond, &(s)->lock)
-#define GST_SPLITMUX_BROADCAST(s) g_cond_broadcast (&(s)->data_cond)
+#define GST_SPLITMUX_WAIT_INPUT(s) g_cond_wait (&(s)->input_cond, &(s)->lock)
+#define GST_SPLITMUX_BROADCAST_INPUT(s) g_cond_broadcast (&(s)->input_cond)
+
+#define GST_SPLITMUX_WAIT_OUTPUT(s) g_cond_wait (&(s)->output_cond, &(s)->lock)
+#define GST_SPLITMUX_BROADCAST_OUTPUT(s) g_cond_broadcast (&(s)->output_cond)
enum
{
PROP_LOCATION,
PROP_MAX_SIZE_TIME,
PROP_MAX_SIZE_BYTES,
+ PROP_MAX_SIZE_TIMECODE,
+ PROP_SEND_KEYFRAME_REQUESTS,
+ PROP_MAX_FILES,
PROP_MUXER_OVERHEAD,
PROP_MUXER,
PROP_SINK
#define DEFAULT_MAX_SIZE_TIME 0
#define DEFAULT_MAX_SIZE_BYTES 0
+#define DEFAULT_MAX_FILES 0
#define DEFAULT_MUXER_OVERHEAD 0.02
+#define DEFAULT_SEND_KEYFRAME_REQUESTS FALSE
#define DEFAULT_MUXER "mp4mux"
#define DEFAULT_SINK "filesink"
+enum
+{
+ SIGNAL_FORMAT_LOCATION,
+ SIGNAL_FORMAT_LOCATION_FULL,
+ SIGNAL_LAST
+};
+
+static guint signals[SIGNAL_LAST];
+
static GstStaticPadTemplate video_sink_template =
GST_STATIC_PAD_TEMPLATE ("video",
GST_PAD_SINK,
G_DEFINE_TYPE_EXTENDED (GstSplitMuxSink, gst_splitmux_sink, GST_TYPE_BIN, 0,
_do_init ());
-static gboolean create_elements (GstSplitMuxSink * splitmux);
+static gboolean create_muxer (GstSplitMuxSink * splitmux);
static gboolean create_sink (GstSplitMuxSink * splitmux);
static void gst_splitmux_sink_set_property (GObject * object, guint prop_id,
const GValue * value, GParamSpec * pspec);
element, GstStateChange transition);
static void bus_handler (GstBin * bin, GstMessage * msg);
-static void set_next_filename (GstSplitMuxSink * splitmux);
-static void start_next_fragment (GstSplitMuxSink * splitmux);
-static void check_queue_length (GstSplitMuxSink * splitmux, MqStreamCtx * ctx);
+static void set_next_filename (GstSplitMuxSink * splitmux, MqStreamCtx * ctx);
+static void start_next_fragment (GstSplitMuxSink * splitmux, MqStreamCtx * ctx);
static void mq_stream_ctx_unref (MqStreamCtx * ctx);
+static void grow_blocked_queues (GstSplitMuxSink * splitmux);
+
+static void gst_splitmux_sink_ensure_max_files (GstSplitMuxSink * splitmux);
+static GstElement *create_element (GstSplitMuxSink * splitmux,
+ const gchar * factory, const gchar * name, gboolean locked);
+
+static void do_async_done (GstSplitMuxSink * splitmux);
static MqStreamBuf *
mq_stream_buf_new (void)
g_slice_free (MqStreamBuf, data);
}
+static SplitMuxOutputCommand *
+out_cmd_buf_new (void)
+{
+ return g_slice_new0 (SplitMuxOutputCommand);
+}
+
+static void
+out_cmd_buf_free (SplitMuxOutputCommand * data)
+{
+ g_slice_free (SplitMuxOutputCommand, data);
+}
+
static void
gst_splitmux_sink_class_init (GstSplitMuxSinkClass * klass)
{
"Convenience bin that muxes incoming streams into multiple time/size limited files",
"Jan Schmidt <jan@centricular.com>");
- gst_element_class_add_pad_template (gstelement_class,
- gst_static_pad_template_get (&video_sink_template));
- gst_element_class_add_pad_template (gstelement_class,
- gst_static_pad_template_get (&audio_sink_template));
- gst_element_class_add_pad_template (gstelement_class,
- gst_static_pad_template_get (&subtitle_sink_template));
+ gst_element_class_add_static_pad_template (gstelement_class,
+ &video_sink_template);
+ gst_element_class_add_static_pad_template (gstelement_class,
+ &audio_sink_template);
+ gst_element_class_add_static_pad_template (gstelement_class,
+ &subtitle_sink_template);
gstelement_class->change_state =
GST_DEBUG_FUNCPTR (gst_splitmux_sink_change_state);
g_param_spec_uint64 ("max-size-bytes", "Max. size bytes",
"Max. amount of data per file (in bytes, 0=disable)", 0, G_MAXUINT64,
DEFAULT_MAX_SIZE_BYTES, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
+ g_object_class_install_property (gobject_class, PROP_MAX_SIZE_TIMECODE,
+ g_param_spec_string ("max-size-timecode", "Maximum timecode difference",
+ "Maximum difference in timecode between first and last frame. "
+ "Separator is assumed to be \":\" everywhere (e.g. 01:00:00:00). "
+ "Will only be effective if a timecode track is present.",
+ NULL, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
+ g_object_class_install_property (gobject_class, PROP_SEND_KEYFRAME_REQUESTS,
+ g_param_spec_boolean ("send-keyframe-requests",
+ "Request keyframes at max-size-time",
+ "Request a keyframe every max-size-time ns to try splitting at that point. "
+ "Needs max-size-bytes to be 0 in order to be effective.",
+ DEFAULT_SEND_KEYFRAME_REQUESTS,
+ G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
+ g_object_class_install_property (gobject_class, PROP_MAX_FILES,
+ g_param_spec_uint ("max-files", "Max files",
+ "Maximum number of files to keep on disk. Once the maximum is reached,"
+ "old files start to be deleted to make room for new ones.", 0,
+ G_MAXUINT, DEFAULT_MAX_FILES,
+ G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
+
g_object_class_install_property (gobject_class, PROP_MUXER,
g_param_spec_object ("muxer", "Muxer",
g_param_spec_object ("sink", "Sink",
"The sink element (or element chain) to use (NULL = default filesink)",
GST_TYPE_ELEMENT, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
+
+ /**
+ * GstSplitMuxSink::format-location:
+ * @splitmux: the #GstSplitMuxSink
+ * @fragment_id: the sequence number of the file to be created
+ *
+ * Returns: the location to be used for the next output file
+ */
+ signals[SIGNAL_FORMAT_LOCATION] =
+ g_signal_new ("format-location", G_TYPE_FROM_CLASS (klass),
+ G_SIGNAL_RUN_LAST, 0, NULL, NULL, NULL, G_TYPE_STRING, 1, G_TYPE_UINT);
+
+ /**
+ * GstSplitMuxSink::format-location-full:
+ * @splitmux: the #GstSplitMuxSink
+ * @fragment_id: the sequence number of the file to be created
+ * @first_sample: A #GstSample containing the first buffer
+ * from the reference stream in the new file
+ *
+ * Returns: the location to be used for the next output file
+ */
+ signals[SIGNAL_FORMAT_LOCATION_FULL] =
+ g_signal_new ("format-location-full", G_TYPE_FROM_CLASS (klass),
+ G_SIGNAL_RUN_LAST, 0, NULL, NULL, NULL, G_TYPE_STRING, 2, G_TYPE_UINT,
+ GST_TYPE_SAMPLE);
}
static void
gst_splitmux_sink_init (GstSplitMuxSink * splitmux)
{
g_mutex_init (&splitmux->lock);
- g_cond_init (&splitmux->data_cond);
+ g_cond_init (&splitmux->input_cond);
+ g_cond_init (&splitmux->output_cond);
+ g_queue_init (&splitmux->out_cmd_q);
splitmux->mux_overhead = DEFAULT_MUXER_OVERHEAD;
splitmux->threshold_time = DEFAULT_MAX_SIZE_TIME;
splitmux->threshold_bytes = DEFAULT_MAX_SIZE_BYTES;
+ splitmux->max_files = DEFAULT_MAX_FILES;
+ splitmux->send_keyframe_requests = DEFAULT_SEND_KEYFRAME_REQUESTS;
+ splitmux->next_max_tc_time = GST_CLOCK_TIME_NONE;
+
+ splitmux->threshold_timecode_str = NULL;
GST_OBJECT_FLAG_SET (splitmux, GST_ELEMENT_FLAG_SINK);
}
static void
gst_splitmux_reset (GstSplitMuxSink * splitmux)
{
- if (splitmux->mq)
- gst_bin_remove (GST_BIN (splitmux), splitmux->mq);
- if (splitmux->muxer)
+ if (splitmux->muxer) {
+ gst_element_set_locked_state (splitmux->muxer, TRUE);
+ gst_element_set_state (splitmux->muxer, GST_STATE_NULL);
gst_bin_remove (GST_BIN (splitmux), splitmux->muxer);
- if (splitmux->active_sink)
+ }
+ if (splitmux->active_sink) {
+ gst_element_set_locked_state (splitmux->active_sink, TRUE);
+ gst_element_set_state (splitmux->active_sink, GST_STATE_NULL);
gst_bin_remove (GST_BIN (splitmux), splitmux->active_sink);
+ }
- splitmux->sink = splitmux->active_sink = splitmux->muxer = splitmux->mq =
- NULL;
+ splitmux->sink = splitmux->active_sink = splitmux->muxer = NULL;
}
static void
{
GstSplitMuxSink *splitmux = GST_SPLITMUX_SINK (object);
- G_OBJECT_CLASS (parent_class)->dispose (object);
-
/* Calling parent dispose invalidates all child pointers */
- splitmux->sink = splitmux->active_sink = splitmux->muxer = splitmux->mq =
- NULL;
+ splitmux->sink = splitmux->active_sink = splitmux->muxer = NULL;
+
+ G_OBJECT_CLASS (parent_class)->dispose (object);
}
static void
gst_splitmux_sink_finalize (GObject * object)
{
GstSplitMuxSink *splitmux = GST_SPLITMUX_SINK (object);
- g_cond_clear (&splitmux->data_cond);
+ g_cond_clear (&splitmux->input_cond);
+ g_cond_clear (&splitmux->output_cond);
+ g_mutex_clear (&splitmux->lock);
+ g_queue_foreach (&splitmux->out_cmd_q, (GFunc) out_cmd_buf_free, NULL);
+ g_queue_clear (&splitmux->out_cmd_q);
+
if (splitmux->provided_sink)
gst_object_unref (splitmux->provided_sink);
if (splitmux->provided_muxer)
gst_object_unref (splitmux->provided_muxer);
+ if (splitmux->threshold_timecode_str)
+ g_free (splitmux->threshold_timecode_str);
+
g_free (splitmux->location);
/* Make sure to free any un-released contexts */
splitmux->threshold_time = g_value_get_uint64 (value);
GST_OBJECT_UNLOCK (splitmux);
break;
+ case PROP_MAX_SIZE_TIMECODE:
+ GST_OBJECT_LOCK (splitmux);
+ splitmux->threshold_timecode_str = g_value_dup_string (value);
+ GST_OBJECT_UNLOCK (splitmux);
+ break;
+ case PROP_SEND_KEYFRAME_REQUESTS:
+ GST_OBJECT_LOCK (splitmux);
+ splitmux->send_keyframe_requests = g_value_get_boolean (value);
+ GST_OBJECT_UNLOCK (splitmux);
+ break;
+ case PROP_MAX_FILES:
+ GST_OBJECT_LOCK (splitmux);
+ splitmux->max_files = g_value_get_uint (value);
+ GST_OBJECT_UNLOCK (splitmux);
+ break;
case PROP_MUXER_OVERHEAD:
GST_OBJECT_LOCK (splitmux);
splitmux->mux_overhead = g_value_get_double (value);
GST_OBJECT_UNLOCK (splitmux);
break;
case PROP_SINK:
- GST_SPLITMUX_LOCK (splitmux);
+ GST_OBJECT_LOCK (splitmux);
if (splitmux->provided_sink)
gst_object_unref (splitmux->provided_sink);
- splitmux->provided_sink = g_value_dup_object (value);
- GST_SPLITMUX_UNLOCK (splitmux);
+ splitmux->provided_sink = g_value_get_object (value);
+ gst_object_ref_sink (splitmux->provided_sink);
+ GST_OBJECT_UNLOCK (splitmux);
break;
case PROP_MUXER:
- GST_SPLITMUX_LOCK (splitmux);
+ GST_OBJECT_LOCK (splitmux);
if (splitmux->provided_muxer)
gst_object_unref (splitmux->provided_muxer);
- splitmux->provided_muxer = g_value_dup_object (value);
- GST_SPLITMUX_UNLOCK (splitmux);
+ splitmux->provided_muxer = g_value_get_object (value);
+ gst_object_ref_sink (splitmux->provided_muxer);
+ GST_OBJECT_UNLOCK (splitmux);
break;
default:
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
g_value_set_uint64 (value, splitmux->threshold_time);
GST_OBJECT_UNLOCK (splitmux);
break;
+ case PROP_MAX_SIZE_TIMECODE:
+ GST_OBJECT_LOCK (splitmux);
+ g_value_set_string (value, splitmux->threshold_timecode_str);
+ GST_OBJECT_UNLOCK (splitmux);
+ break;
+ case PROP_SEND_KEYFRAME_REQUESTS:
+ GST_OBJECT_LOCK (splitmux);
+ g_value_set_boolean (value, splitmux->send_keyframe_requests);
+ GST_OBJECT_UNLOCK (splitmux);
+ break;
+ case PROP_MAX_FILES:
+ GST_OBJECT_LOCK (splitmux);
+ g_value_set_uint (value, splitmux->max_files);
+ GST_OBJECT_UNLOCK (splitmux);
+ break;
case PROP_MUXER_OVERHEAD:
GST_OBJECT_LOCK (splitmux);
g_value_set_double (value, splitmux->mux_overhead);
GST_OBJECT_UNLOCK (splitmux);
break;
case PROP_SINK:
- GST_SPLITMUX_LOCK (splitmux);
+ GST_OBJECT_LOCK (splitmux);
g_value_set_object (value, splitmux->provided_sink);
- GST_SPLITMUX_UNLOCK (splitmux);
+ GST_OBJECT_UNLOCK (splitmux);
break;
case PROP_MUXER:
- GST_SPLITMUX_LOCK (splitmux);
+ GST_OBJECT_LOCK (splitmux);
g_value_set_object (value, splitmux->provided_muxer);
- GST_SPLITMUX_UNLOCK (splitmux);
+ GST_OBJECT_UNLOCK (splitmux);
break;
default:
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
}
}
-static GstPad *
-mq_sink_to_src (GstElement * mq, GstPad * sink_pad)
-{
- gchar *tmp, *sinkname, *srcname;
- GstPad *mq_src;
-
- sinkname = gst_pad_get_name (sink_pad);
- tmp = sinkname + 5;
- srcname = g_strdup_printf ("src_%s", tmp);
-
- mq_src = gst_element_get_static_pad (mq, srcname);
-
- g_free (sinkname);
- g_free (srcname);
-
- return mq_src;
-}
-
-static gboolean
-get_pads_from_mq (GstSplitMuxSink * splitmux, GstPad ** sink_pad,
- GstPad ** src_pad)
+/* Convenience function */
+static inline GstClockTimeDiff
+my_segment_to_running_time (GstSegment * segment, GstClockTime val)
{
- GstPad *mq_sink;
- GstPad *mq_src;
-
- /* Request a pad from multiqueue, then connect this one, then
- * discover the corresponding output pad and return both */
- mq_sink = gst_element_get_request_pad (splitmux->mq, "sink_%u");
- if (mq_sink == NULL)
- return FALSE;
-
- mq_src = mq_sink_to_src (splitmux->mq, mq_sink);
- if (mq_src == NULL)
- goto fail;
-
- *sink_pad = mq_sink;
- *src_pad = mq_src;
-
- return TRUE;
-
-fail:
- gst_element_release_request_pad (splitmux->mq, mq_sink);
- return FALSE;
+ GstClockTimeDiff res = GST_CLOCK_STIME_NONE;
+
+ if (GST_CLOCK_TIME_IS_VALID (val)) {
+ gboolean sign =
+ gst_segment_to_running_time_full (segment, GST_FORMAT_TIME, val, &val);
+ if (sign > 0)
+ res = val;
+ else if (sign < 0)
+ res = -val;
+ }
+ return res;
}
static MqStreamCtx *
ctx->splitmux = splitmux;
gst_segment_init (&ctx->in_segment, GST_FORMAT_UNDEFINED);
gst_segment_init (&ctx->out_segment, GST_FORMAT_UNDEFINED);
- ctx->in_running_time = ctx->out_running_time = 0;
+ ctx->in_running_time = ctx->out_running_time = GST_CLOCK_STIME_NONE;
g_queue_init (&ctx->queued_bufs);
return ctx;
}
static void
mq_stream_ctx_free (MqStreamCtx * ctx)
{
+ if (ctx->q) {
+ g_signal_handler_disconnect (ctx->q, ctx->q_overrun_id);
+ gst_element_set_locked_state (ctx->q, TRUE);
+ gst_element_set_state (ctx->q, GST_STATE_NULL);
+ gst_bin_remove (GST_BIN (ctx->splitmux), ctx->q);
+ gst_object_unref (ctx->q);
+ }
+ gst_buffer_replace (&ctx->prev_in_keyframe, NULL);
+ gst_object_unref (ctx->sinkpad);
+ gst_object_unref (ctx->srcpad);
g_queue_foreach (&ctx->queued_bufs, (GFunc) mq_stream_buf_free, NULL);
g_queue_clear (&ctx->queued_bufs);
g_free (ctx);
mq_stream_ctx_unref (ctx);
}
+static void
+send_fragment_opened_closed_msg (GstSplitMuxSink * splitmux, gboolean opened)
+{
+ gchar *location = NULL;
+ GstMessage *msg;
+ const gchar *msg_name = opened ?
+ "splitmuxsink-fragment-opened" : "splitmuxsink-fragment-closed";
+
+ g_object_get (splitmux->sink, "location", &location, NULL);
+
+ msg = gst_message_new_element (GST_OBJECT (splitmux),
+ gst_structure_new (msg_name,
+ "location", G_TYPE_STRING, location,
+ "running-time", GST_TYPE_CLOCK_TIME,
+ splitmux->reference_ctx->out_running_time, NULL));
+ gst_element_post_message (GST_ELEMENT_CAST (splitmux), msg);
+
+ g_free (location);
+}
+
/* Called with lock held, drops the lock to send EOS to the
* pad
*/
complete_or_wait_on_out (GstSplitMuxSink * splitmux, MqStreamCtx * ctx)
{
do {
+ /* When first starting up, the reference stream has to output
+ * the first buffer to prepare the muxer and sink */
+ gboolean can_output = (ctx->is_reference || splitmux->ready_for_output);
+
+ if (ctx->flushing
+ || splitmux->output_state == SPLITMUX_OUTPUT_STATE_STOPPED)
+ return;
GST_LOG_OBJECT (ctx->srcpad,
- "Checking running time %" GST_TIME_FORMAT " against max %"
- GST_TIME_FORMAT, GST_TIME_ARGS (ctx->out_running_time),
- GST_TIME_ARGS (splitmux->max_out_running_time));
+ "Checking running time %" GST_STIME_FORMAT " against max %"
+ GST_STIME_FORMAT, GST_STIME_ARGS (ctx->out_running_time),
+ GST_STIME_ARGS (splitmux->max_out_running_time));
- if (splitmux->max_out_running_time == GST_CLOCK_TIME_NONE ||
- ctx->out_running_time < splitmux->max_out_running_time)
- return;
+ if (can_output) {
+ if (splitmux->max_out_running_time == GST_CLOCK_STIME_NONE ||
+ ctx->out_running_time < splitmux->max_out_running_time) {
+ return;
+ }
- if (ctx->flushing || splitmux->state == SPLITMUX_STATE_STOPPED)
- return;
+ switch (splitmux->output_state) {
+ case SPLITMUX_OUTPUT_STATE_OUTPUT_GOP:
+ /* We only get here if we've finished outputting a GOP and need to know
+ * what to do next */
+ splitmux->output_state = SPLITMUX_OUTPUT_STATE_AWAITING_COMMAND;
+ GST_SPLITMUX_BROADCAST_OUTPUT (splitmux);
+ continue;
+
+ case SPLITMUX_OUTPUT_STATE_ENDING_FILE:
+ /* We've reached the max out running_time to get here, so end this file now */
+ if (ctx->out_eos == FALSE) {
+ send_eos (splitmux, ctx);
+ continue;
+ }
+ break;
+ case SPLITMUX_OUTPUT_STATE_START_NEXT_FILE:
+ if (ctx->is_reference) {
+ /* Special handling on the reference ctx to start new fragments
+ * and collect commands from the command queue */
+ /* drops the splitmux lock briefly: */
+ start_next_fragment (splitmux, ctx);
+ continue;
+ }
+ break;
- if (splitmux->state == SPLITMUX_STATE_ENDING_FILE) {
- if (ctx->out_eos == FALSE) {
- send_eos (splitmux, ctx);
- continue;
+ case SPLITMUX_OUTPUT_STATE_AWAITING_COMMAND:{
+ do {
+ SplitMuxOutputCommand *cmd =
+ g_queue_pop_tail (&splitmux->out_cmd_q);
+ if (cmd != NULL) {
+ /* If we pop the last command, we need to make our queues bigger */
+ if (g_queue_get_length (&splitmux->out_cmd_q) == 0)
+ grow_blocked_queues (splitmux);
+
+ if (cmd->start_new_fragment) {
+ GST_DEBUG_OBJECT (splitmux, "Got cmd to start new fragment");
+ splitmux->output_state = SPLITMUX_OUTPUT_STATE_ENDING_FILE;
+ } else {
+ GST_DEBUG_OBJECT (splitmux,
+ "Got new output cmd for time %" GST_STIME_FORMAT,
+ GST_STIME_ARGS (cmd->max_output_ts));
+
+ /* Extend the output range immediately */
+ splitmux->max_out_running_time = cmd->max_output_ts;
+ splitmux->output_state = SPLITMUX_OUTPUT_STATE_OUTPUT_GOP;
+ }
+ GST_SPLITMUX_BROADCAST_OUTPUT (splitmux);
+
+ out_cmd_buf_free (cmd);
+ break;
+ } else {
+ GST_SPLITMUX_WAIT_OUTPUT (splitmux);
+ }
+ } while (splitmux->output_state ==
+ SPLITMUX_OUTPUT_STATE_AWAITING_COMMAND);
+ /* loop and re-check the state */
+ continue;
+ }
+ case SPLITMUX_OUTPUT_STATE_STOPPED:
+ return;
}
- } else if (splitmux->state == SPLITMUX_STATE_START_NEXT_FRAGMENT) {
- start_next_fragment (splitmux);
- continue;
}
GST_INFO_OBJECT (ctx->srcpad,
"Sleeping for running time %"
- GST_TIME_FORMAT " (max %" GST_TIME_FORMAT ")",
- GST_TIME_ARGS (ctx->out_running_time),
- GST_TIME_ARGS (splitmux->max_out_running_time));
- ctx->out_blocked = TRUE;
- /* Expand the mq if needed before sleeping */
- check_queue_length (splitmux, ctx);
- GST_SPLITMUX_WAIT (splitmux);
- ctx->out_blocked = FALSE;
+ GST_STIME_FORMAT " (max %" GST_STIME_FORMAT ") or state change.",
+ GST_STIME_ARGS (ctx->out_running_time),
+ GST_STIME_ARGS (splitmux->max_out_running_time));
+ GST_SPLITMUX_WAIT_OUTPUT (splitmux);
GST_INFO_OBJECT (ctx->srcpad,
- "Woken for new max running time %" GST_TIME_FORMAT,
- GST_TIME_ARGS (splitmux->max_out_running_time));
- } while (1);
+ "Woken for new max running time %" GST_STIME_FORMAT,
+ GST_STIME_ARGS (splitmux->max_out_running_time));
+ }
+ while (1);
+}
+
+static GstClockTime
+calculate_next_max_timecode (GstSplitMuxSink * splitmux,
+ const GstVideoTimeCode * cur_tc)
+{
+ GstVideoTimeCode *target_tc;
+ GstVideoTimeCodeInterval *tc_inter;
+ GstClockTime cur_tc_time, target_tc_time, next_max_tc_time;
+
+ if (cur_tc == NULL || splitmux->threshold_timecode_str == NULL)
+ return GST_CLOCK_TIME_NONE;
+
+ tc_inter =
+ gst_video_time_code_interval_new_from_string
+ (splitmux->threshold_timecode_str);
+ target_tc = gst_video_time_code_add_interval (cur_tc, tc_inter);
+ gst_video_time_code_interval_free (tc_inter);
+
+ /* Convert to ns */
+ target_tc_time = gst_video_time_code_nsec_since_daily_jam (target_tc);
+ cur_tc_time = gst_video_time_code_nsec_since_daily_jam (cur_tc);
+
+ /* Add fragment_start_time, accounting for wraparound */
+ if (target_tc_time >= cur_tc_time) {
+ next_max_tc_time =
+ target_tc_time - cur_tc_time + splitmux->fragment_start_time;
+ } else {
+ GstClockTime day_in_ns = 24 * 60 * 60 * GST_SECOND;
+
+ next_max_tc_time =
+ day_in_ns - cur_tc_time + target_tc_time +
+ splitmux->fragment_start_time;
+ }
+ GST_INFO_OBJECT (splitmux, "Next max TC time: %" GST_TIME_FORMAT
+ " from ref TC: %" GST_TIME_FORMAT, GST_TIME_ARGS (next_max_tc_time),
+ GST_TIME_ARGS (cur_tc_time));
+ gst_video_time_code_free (target_tc);
+
+ return next_max_tc_time;
+}
+
+static gboolean
+request_next_keyframe (GstSplitMuxSink * splitmux, GstBuffer * buffer)
+{
+ GstEvent *ev;
+ GstClockTime target_time;
+ gboolean timecode_based = FALSE;
+
+ splitmux->next_max_tc_time = GST_CLOCK_TIME_NONE;
+ if (splitmux->threshold_timecode_str) {
+ GstVideoTimeCodeMeta *tc_meta;
+
+ if (buffer != NULL) {
+ tc_meta = gst_buffer_get_video_time_code_meta (buffer);
+ if (tc_meta) {
+ splitmux->next_max_tc_time =
+ calculate_next_max_timecode (splitmux, &tc_meta->tc);
+ timecode_based = (splitmux->next_max_tc_time != GST_CLOCK_TIME_NONE);
+ }
+ } else {
+ /* This can happen in the presence of GAP events that trigger
+ * a new fragment start */
+ GST_WARNING_OBJECT (splitmux,
+ "No buffer available to calculate next timecode");
+ }
+ }
+
+ if (splitmux->send_keyframe_requests == FALSE
+ || (splitmux->threshold_time == 0 && !timecode_based)
+ || splitmux->threshold_bytes != 0)
+ return TRUE;
+
+ if (timecode_based) {
+ /* We might have rounding errors: aim slightly earlier */
+ target_time = splitmux->next_max_tc_time - 5 * GST_USECOND;
+ } else {
+ target_time = splitmux->fragment_start_time + splitmux->threshold_time;
+ }
+ ev = gst_video_event_new_upstream_force_key_unit (target_time, TRUE, 0);
+ GST_INFO_OBJECT (splitmux, "Requesting next keyframe at %" GST_TIME_FORMAT,
+ GST_TIME_ARGS (target_time));
+ return gst_pad_push_event (splitmux->reference_ctx->sinkpad, ev);
}
static GstPadProbeReturn
GstSplitMuxSink *splitmux = ctx->splitmux;
MqStreamBuf *buf_info = NULL;
- GST_LOG_OBJECT (pad, "Fired probe type 0x%x\n", info->type);
+ GST_LOG_OBJECT (pad, "Fired probe type 0x%x", info->type);
/* FIXME: Handle buffer lists, until then make it clear they won't work */
if (info->type & GST_PAD_PROBE_TYPE_BUFFER_LIST) {
}
if (info->type & GST_PAD_PROBE_TYPE_EVENT_DOWNSTREAM) {
GstEvent *event = gst_pad_probe_info_get_event (info);
+ gboolean locked = FALSE;
GST_LOG_OBJECT (pad, "Event %" GST_PTR_FORMAT, event);
break;
case GST_EVENT_FLUSH_STOP:
GST_SPLITMUX_LOCK (splitmux);
+ locked = TRUE;
gst_segment_init (&ctx->out_segment, GST_FORMAT_UNDEFINED);
g_queue_foreach (&ctx->queued_bufs, (GFunc) mq_stream_buf_free, NULL);
g_queue_clear (&ctx->queued_bufs);
ctx->flushing = FALSE;
- GST_SPLITMUX_UNLOCK (splitmux);
break;
case GST_EVENT_FLUSH_START:
GST_SPLITMUX_LOCK (splitmux);
+ locked = TRUE;
GST_LOG_OBJECT (pad, "Flush start");
ctx->flushing = TRUE;
- GST_SPLITMUX_BROADCAST (splitmux);
- GST_SPLITMUX_UNLOCK (splitmux);
+ GST_SPLITMUX_BROADCAST_OUTPUT (splitmux);
break;
case GST_EVENT_EOS:
GST_SPLITMUX_LOCK (splitmux);
- if (splitmux->state == SPLITMUX_STATE_STOPPED)
+ locked = TRUE;
+ if (splitmux->output_state == SPLITMUX_OUTPUT_STATE_STOPPED)
goto beach;
ctx->out_eos = TRUE;
- GST_SPLITMUX_UNLOCK (splitmux);
break;
case GST_EVENT_GAP:{
GstClockTime gap_ts;
+ GstClockTimeDiff rtime;
gst_event_parse_gap (event, &gap_ts, NULL);
if (gap_ts == GST_CLOCK_TIME_NONE)
break;
GST_SPLITMUX_LOCK (splitmux);
+ locked = TRUE;
+
+ if (splitmux->output_state == SPLITMUX_OUTPUT_STATE_STOPPED)
+ goto beach;
+
+ /* When we get a gap event on the
+ * reference stream and we're trying to open a
+ * new file, we need to store it until we get
+ * the buffer afterwards
+ */
+ if (ctx->is_reference &&
+ (splitmux->output_state != SPLITMUX_OUTPUT_STATE_OUTPUT_GOP)) {
+ GST_DEBUG_OBJECT (pad, "Storing GAP event until buffer arrives");
+ gst_event_replace (&ctx->pending_gap, event);
+ GST_SPLITMUX_UNLOCK (splitmux);
+ return GST_PAD_PROBE_HANDLED;
+ }
- gap_ts = gst_segment_to_running_time (&ctx->out_segment,
- GST_FORMAT_TIME, gap_ts);
+ rtime = my_segment_to_running_time (&ctx->out_segment, gap_ts);
- GST_LOG_OBJECT (pad, "Have GAP w/ ts %" GST_TIME_FORMAT,
- GST_TIME_ARGS (gap_ts));
+ GST_LOG_OBJECT (pad, "Have GAP w/ ts %" GST_STIME_FORMAT,
+ GST_STIME_ARGS (rtime));
- if (splitmux->state == SPLITMUX_STATE_STOPPED)
+ if (rtime != GST_CLOCK_STIME_NONE) {
+ ctx->out_running_time = rtime;
+ complete_or_wait_on_out (splitmux, ctx);
+ }
+ break;
+ }
+ case GST_EVENT_CUSTOM_DOWNSTREAM:{
+ const GstStructure *s;
+ GstClockTimeDiff ts = 0;
+
+ s = gst_event_get_structure (event);
+ if (!gst_structure_has_name (s, "splitmuxsink-unblock"))
+ break;
+
+ gst_structure_get_int64 (s, "timestamp", &ts);
+
+ GST_SPLITMUX_LOCK (splitmux);
+ locked = TRUE;
+
+ if (splitmux->output_state == SPLITMUX_OUTPUT_STATE_STOPPED)
goto beach;
- ctx->out_running_time = gap_ts;
- complete_or_wait_on_out (splitmux, ctx);
+ ctx->out_running_time = ts;
+ if (!ctx->is_reference)
+ complete_or_wait_on_out (splitmux, ctx);
GST_SPLITMUX_UNLOCK (splitmux);
- break;
+ return GST_PAD_PROBE_DROP;
}
default:
break;
}
+
+ /* We need to make sure events aren't passed
+ * until the muxer / sink are ready for it */
+ if (!locked)
+ GST_SPLITMUX_LOCK (splitmux);
+ if (!ctx->is_reference)
+ complete_or_wait_on_out (splitmux, ctx);
+ GST_SPLITMUX_UNLOCK (splitmux);
+
return GST_PAD_PROBE_PASS;
}
goto beach;
/* If we have popped a keyframe, decrement the queued_gop count */
- if (buf_info->keyframe && splitmux->queued_gops > 0)
- splitmux->queued_gops--;
+ if (buf_info->keyframe && splitmux->queued_keyframes > 0)
+ splitmux->queued_keyframes--;
ctx->out_running_time = buf_info->run_ts;
+ ctx->cur_out_buffer = gst_pad_probe_info_get_buffer (info);
GST_LOG_OBJECT (splitmux,
- "Pad %" GST_PTR_FORMAT " buffer with TS %" GST_TIME_FORMAT
- " size %" G_GSIZE_FORMAT,
- pad, GST_TIME_ARGS (ctx->out_running_time), buf_info->buf_size);
+ "Pad %" GST_PTR_FORMAT " buffer with run TS %" GST_STIME_FORMAT
+ " size %" G_GUINT64_FORMAT,
+ pad, GST_STIME_ARGS (ctx->out_running_time), buf_info->buf_size);
complete_or_wait_on_out (splitmux, ctx);
- if (splitmux->muxed_out_time == GST_CLOCK_TIME_NONE ||
- splitmux->muxed_out_time < buf_info->run_ts)
- splitmux->muxed_out_time = buf_info->run_ts;
-
splitmux->muxed_out_bytes += buf_info->buf_size;
#ifndef GST_DISABLE_GST_DEBUG
{
GstBuffer *buf = gst_pad_probe_info_get_buffer (info);
GST_LOG_OBJECT (pad, "Returning to pass buffer %" GST_PTR_FORMAT
- " run ts %" GST_TIME_FORMAT, buf,
- GST_TIME_ARGS (ctx->out_running_time));
+ " run ts %" GST_STIME_FORMAT, buf,
+ GST_STIME_ARGS (ctx->out_running_time));
}
#endif
+ ctx->cur_out_buffer = NULL;
GST_SPLITMUX_UNLOCK (splitmux);
+ /* pending_gap is protected by the STREAM lock */
+ if (ctx->pending_gap) {
+ /* If we previously stored a gap event, send it now */
+ GstPad *peer = gst_pad_get_peer (ctx->srcpad);
+
+ GST_DEBUG_OBJECT (splitmux,
+ "Pad %" GST_PTR_FORMAT " sending pending GAP event", ctx->srcpad);
+
+ gst_pad_send_event (peer, ctx->pending_gap);
+ ctx->pending_gap = NULL;
+
+ gst_object_unref (peer);
+ }
+
mq_stream_buf_free (buf_info);
return GST_PAD_PROBE_PASS;
gst_pad_sticky_events_foreach (ctx->srcpad,
(GstPadStickyEventsForeachFunction) (resend_sticky), peer);
- /* Clear EOS flag */
- ctx->out_eos = FALSE;
+ /* Clear EOS flag if not actually EOS */
+ ctx->out_eos = GST_PAD_IS_EOS (ctx->srcpad);
gst_object_unref (peer);
}
* a new fragment
*/
static void
-start_next_fragment (GstSplitMuxSink * splitmux)
+start_next_fragment (GstSplitMuxSink * splitmux, MqStreamCtx * ctx)
{
+ GstElement *muxer, *sink;
+
/* 1 change to new file */
- gst_element_set_state (splitmux->muxer, GST_STATE_NULL);
- gst_element_set_state (splitmux->active_sink, GST_STATE_NULL);
+ splitmux->switching_fragment = TRUE;
- set_next_filename (splitmux);
+ /* We need to drop the splitmux lock to acquire the state lock
+ * here and ensure there's no racy state change going on elsewhere */
+ muxer = gst_object_ref (splitmux->muxer);
+ sink = gst_object_ref (splitmux->active_sink);
- gst_element_sync_state_with_parent (splitmux->active_sink);
- gst_element_sync_state_with_parent (splitmux->muxer);
+ GST_SPLITMUX_UNLOCK (splitmux);
+ GST_STATE_LOCK (splitmux);
- g_list_foreach (splitmux->contexts, (GFunc) restart_context, splitmux);
+ gst_element_set_locked_state (muxer, TRUE);
+ gst_element_set_locked_state (sink, TRUE);
+ gst_element_set_state (muxer, GST_STATE_NULL);
+ gst_element_set_state (sink, GST_STATE_NULL);
- /* Switch state and go back to processing */
- splitmux->state = SPLITMUX_STATE_COLLECTING_GOP_START;
- if (!splitmux->video_ctx->in_eos)
- splitmux->max_out_running_time = splitmux->video_ctx->in_running_time;
- else
- splitmux->max_out_running_time = GST_CLOCK_TIME_NONE;
+ GST_SPLITMUX_LOCK (splitmux);
+ set_next_filename (splitmux, ctx);
+ GST_SPLITMUX_UNLOCK (splitmux);
+
+ gst_element_set_state (sink, GST_STATE_TARGET (splitmux));
+ gst_element_set_state (muxer, GST_STATE_TARGET (splitmux));
+ gst_element_set_locked_state (muxer, FALSE);
+ gst_element_set_locked_state (sink, FALSE);
- /* Store the overflow parameters as the basis for the next fragment */
- splitmux->mux_start_time = splitmux->muxed_out_time;
- splitmux->mux_start_bytes = splitmux->muxed_out_bytes;
+ gst_object_unref (sink);
+ gst_object_unref (muxer);
- GST_DEBUG_OBJECT (splitmux,
- "Restarting flow for new fragment. New running time %" GST_TIME_FORMAT,
- GST_TIME_ARGS (splitmux->max_out_running_time));
+ GST_SPLITMUX_LOCK (splitmux);
+ GST_STATE_UNLOCK (splitmux);
+ splitmux->switching_fragment = FALSE;
+ do_async_done (splitmux);
+
+ splitmux->ready_for_output = TRUE;
- GST_SPLITMUX_BROADCAST (splitmux);
+ g_list_foreach (splitmux->contexts, (GFunc) restart_context, splitmux);
+
+ send_fragment_opened_closed_msg (splitmux, TRUE);
+
+ /* FIXME: Is this always the correct next state? */
+ splitmux->output_state = SPLITMUX_OUTPUT_STATE_AWAITING_COMMAND;
+ GST_SPLITMUX_BROADCAST_OUTPUT (splitmux);
}
static void
case GST_MESSAGE_EOS:
/* If the state is draining out the current file, drop this EOS */
GST_SPLITMUX_LOCK (splitmux);
- if (splitmux->state == SPLITMUX_STATE_ENDING_FILE &&
- splitmux->max_out_running_time != GST_CLOCK_TIME_NONE) {
+
+ send_fragment_opened_closed_msg (splitmux, FALSE);
+
+ if (splitmux->output_state == SPLITMUX_OUTPUT_STATE_ENDING_FILE) {
GST_DEBUG_OBJECT (splitmux, "Caught EOS at end of fragment, dropping");
- splitmux->state = SPLITMUX_STATE_START_NEXT_FRAGMENT;
- GST_SPLITMUX_BROADCAST (splitmux);
+ splitmux->output_state = SPLITMUX_OUTPUT_STATE_START_NEXT_FILE;
+ GST_SPLITMUX_BROADCAST_OUTPUT (splitmux);
gst_message_unref (message);
GST_SPLITMUX_UNLOCK (splitmux);
return;
+ } else {
+ GST_DEBUG_OBJECT (splitmux,
+ "Passing EOS message. Output state %d max_out_running_time %"
+ GST_STIME_FORMAT, splitmux->output_state,
+ GST_STIME_ARGS (splitmux->max_out_running_time));
}
GST_SPLITMUX_UNLOCK (splitmux);
break;
+ case GST_MESSAGE_ASYNC_START:
+ case GST_MESSAGE_ASYNC_DONE:
+ /* Ignore state changes from our children while switching */
+ if (splitmux->switching_fragment) {
+ if (GST_MESSAGE_SRC (message) == (GstObject *) splitmux->active_sink
+ || GST_MESSAGE_SRC (message) == (GstObject *) splitmux->muxer) {
+ GST_LOG_OBJECT (splitmux,
+ "Ignoring state change from child %" GST_PTR_FORMAT
+ " while switching", GST_MESSAGE_SRC (message));
+ gst_message_unref (message);
+ return;
+ }
+ }
+ break;
default:
break;
}
GST_BIN_CLASS (parent_class)->handle_message (bin, message);
}
+static void
+ctx_set_unblock (MqStreamCtx * ctx)
+{
+ ctx->need_unblock = TRUE;
+}
+
/* Called with splitmux lock held */
/* Called when entering ProcessingCompleteGop state
* Assess if mq contents overflowed the current file
static void
handle_gathered_gop (GstSplitMuxSink * splitmux)
{
- GList *cur;
- gsize queued_bytes = 0;
- GstClockTime queued_time = 0;
+ guint64 queued_bytes;
+ GstClockTimeDiff queued_time = 0;
+ GstClockTimeDiff new_out_ts = splitmux->reference_ctx->in_running_time;
+ SplitMuxOutputCommand *cmd;
/* Assess if the multiqueue contents overflowed the current file */
- for (cur = g_list_first (splitmux->contexts);
- cur != NULL; cur = g_list_next (cur)) {
- MqStreamCtx *tmpctx = (MqStreamCtx *) (cur->data);
- if (tmpctx->in_running_time > queued_time)
- queued_time = tmpctx->in_running_time;
- queued_bytes += tmpctx->in_bytes;
- }
+ /* When considering if a newly gathered GOP overflows
+ * the time limit for the file, only consider the running time of the
+ * reference stream. Other streams might have run ahead a little bit,
+ * but extra pieces won't be released to the muxer beyond the reference
+ * stream cut-off anyway - so it forms the limit. */
+ queued_bytes = splitmux->fragment_total_bytes + splitmux->gop_total_bytes;
+ queued_time = splitmux->reference_ctx->in_running_time;
+
+ GST_LOG_OBJECT (splitmux, " queued_bytes %" G_GUINT64_FORMAT, queued_bytes);
- g_assert (queued_bytes >= splitmux->mux_start_bytes);
- g_assert (queued_time >= splitmux->mux_start_time);
+ g_assert (queued_time >= splitmux->fragment_start_time);
- queued_bytes -= splitmux->mux_start_bytes;
- queued_time -= splitmux->mux_start_time;
+ queued_time -= splitmux->fragment_start_time;
/* Expand queued bytes estimate by muxer overhead */
queued_bytes += (queued_bytes * splitmux->mux_overhead);
- GST_LOG_OBJECT (splitmux, "mq at TS %" GST_TIME_FORMAT
- " bytes %" G_GSIZE_FORMAT, GST_TIME_ARGS (queued_time), queued_bytes);
+ GST_LOG_OBJECT (splitmux, "mq at TS %" GST_STIME_FORMAT
+ " bytes %" G_GUINT64_FORMAT, GST_STIME_ARGS (queued_time), queued_bytes);
+ if (splitmux->next_max_tc_time != GST_CLOCK_TIME_NONE) {
+ GST_LOG_OBJECT (splitmux,
+ "timecode mq TS %" GST_TIME_FORMAT " vs target %" GST_TIME_FORMAT,
+ GST_TIME_ARGS (splitmux->reference_ctx->in_running_time),
+ GST_TIME_ARGS (splitmux->next_max_tc_time + 5 * GST_USECOND));
+ }
/* Check for overrun - have we output at least one byte and overrun
* either threshold? */
- if ((splitmux->mux_start_bytes < splitmux->muxed_out_bytes) &&
- ((splitmux->threshold_bytes > 0 &&
- queued_bytes >= splitmux->threshold_bytes) ||
- (splitmux->threshold_time > 0 &&
- queued_time >= splitmux->threshold_time))) {
-
- splitmux->state = SPLITMUX_STATE_ENDING_FILE;
-
+ /* Timecode-based threshold accounts for possible rounding errors:
+ * 5us should be bigger than all possible rounding errors but nowhere near
+ * big enough to skip to another frame */
+ if ((splitmux->fragment_total_bytes > 0 &&
+ ((splitmux->threshold_bytes > 0 &&
+ queued_bytes > splitmux->threshold_bytes) ||
+ (splitmux->threshold_time > 0 &&
+ queued_time > splitmux->threshold_time) ||
+ (splitmux->next_max_tc_time != GST_CLOCK_TIME_NONE &&
+ splitmux->reference_ctx->in_running_time >
+ splitmux->next_max_tc_time + 5 * GST_USECOND)))) {
+
+ /* Tell the output side to start a new fragment */
GST_INFO_OBJECT (splitmux,
- "mq overflowed since last, draining out. max out TS is %"
- GST_TIME_FORMAT, GST_TIME_ARGS (splitmux->max_out_running_time));
- GST_SPLITMUX_BROADCAST (splitmux);
+ "This GOP (dur %" GST_STIME_FORMAT
+ ") would overflow the fragment, Sending start_new_fragment cmd",
+ GST_STIME_ARGS (splitmux->reference_ctx->in_running_time -
+ splitmux->gop_start_time));
+ cmd = out_cmd_buf_new ();
+ cmd->start_new_fragment = TRUE;
+ g_queue_push_head (&splitmux->out_cmd_q, cmd);
+ GST_SPLITMUX_BROADCAST_OUTPUT (splitmux);
+
+ new_out_ts = splitmux->reference_ctx->in_running_time;
+ splitmux->fragment_start_time = splitmux->gop_start_time;
+ splitmux->fragment_total_bytes = 0;
+
+ if (request_next_keyframe (splitmux,
+ splitmux->reference_ctx->prev_in_keyframe) == FALSE) {
+ GST_WARNING_OBJECT (splitmux,
+ "Could not request a keyframe. Files may not split at the exact location they should");
+ }
+ gst_buffer_replace (&splitmux->reference_ctx->prev_in_keyframe, NULL);
+ }
+ /* And set up to collect the next GOP */
+ if (!splitmux->reference_ctx->in_eos) {
+ splitmux->input_state = SPLITMUX_INPUT_STATE_COLLECTING_GOP_START;
+ splitmux->gop_start_time = new_out_ts;
} else {
- /* No overflow */
+ /* This is probably already the current state, but just in case: */
+ splitmux->input_state = SPLITMUX_INPUT_STATE_FINISHING_UP;
+ new_out_ts = GST_CLOCK_STIME_NONE; /* EOS runs until forever */
+ }
+
+ /* And wake all input contexts to send a wake-up event */
+ g_list_foreach (splitmux->contexts, (GFunc) ctx_set_unblock, NULL);
+ GST_SPLITMUX_BROADCAST_INPUT (splitmux);
+
+ /* Now either way - either there was no overflow, or we requested a new fragment: release this GOP */
+ splitmux->fragment_total_bytes += splitmux->gop_total_bytes;
+
+ if (splitmux->gop_total_bytes > 0) {
GST_LOG_OBJECT (splitmux,
- "This GOP didn't overflow the fragment. Bytes sent %" G_GSIZE_FORMAT
- " queued %" G_GSIZE_FORMAT " time %" GST_TIME_FORMAT " Continuing.",
- splitmux->muxed_out_bytes - splitmux->mux_start_bytes,
- queued_bytes, GST_TIME_ARGS (queued_time));
-
- /* Wake everyone up to push this one GOP, then sleep */
- splitmux->state = SPLITMUX_STATE_COLLECTING_GOP_START;
- if (!splitmux->video_ctx->in_eos)
- splitmux->max_out_running_time = splitmux->video_ctx->in_running_time;
- else
- splitmux->max_out_running_time = GST_CLOCK_TIME_NONE;
-
- GST_LOG_OBJECT (splitmux, "Waking output for complete GOP, TS %"
- GST_TIME_FORMAT, GST_TIME_ARGS (splitmux->max_out_running_time));
- GST_SPLITMUX_BROADCAST (splitmux);
+ "Releasing GOP to output. Bytes in fragment now %" G_GUINT64_FORMAT
+ " time %" GST_STIME_FORMAT,
+ splitmux->fragment_total_bytes, GST_STIME_ARGS (queued_time));
+
+ /* Send this GOP to the output command queue */
+ cmd = out_cmd_buf_new ();
+ cmd->start_new_fragment = FALSE;
+ cmd->max_output_ts = new_out_ts;
+ GST_LOG_OBJECT (splitmux, "Sending GOP cmd to output for TS %"
+ GST_STIME_FORMAT, GST_STIME_ARGS (new_out_ts));
+ g_queue_push_head (&splitmux->out_cmd_q, cmd);
+
+ GST_SPLITMUX_BROADCAST_OUTPUT (splitmux);
}
+ splitmux->gop_total_bytes = 0;
}
/* Called with splitmux lock held */
/* Called from each input pad when it is has all the pieces
- * for a GOP or EOS, starting with the video pad which has set the
+ * for a GOP or EOS, starting with the reference pad which has set the
* splitmux->max_in_running_time
*/
static void
check_completed_gop (GstSplitMuxSink * splitmux, MqStreamCtx * ctx)
{
GList *cur;
- gboolean ready = TRUE;
+ GstEvent *event;
+
+ /* On ENDING_FILE, the reference stream sends a command to start a new
+ * fragment, then releases the GOP for output in the new fragment.
+ * If somes streams received no buffer during the last GOP that overran,
+ * because its next buffer has a timestamp bigger than
+ * ctx->max_in_running_time, its queue is empty. In that case the only
+ * way to wakeup the output thread is by injecting an event in the
+ * queue. This usually happen with subtitle streams.
+ * See https://bugzilla.gnome.org/show_bug.cgi?id=763711. */
+ if (ctx->need_unblock) {
+ GST_LOG_OBJECT (ctx->sinkpad, "Sending splitmuxsink-unblock event");
+ event = gst_event_new_custom (GST_EVENT_CUSTOM_DOWNSTREAM |
+ GST_EVENT_TYPE_SERIALIZED,
+ gst_structure_new ("splitmuxsink-unblock", "timestamp",
+ G_TYPE_INT64, splitmux->max_in_running_time, NULL));
+
+ GST_SPLITMUX_UNLOCK (splitmux);
+ gst_pad_send_event (ctx->sinkpad, event);
+ GST_SPLITMUX_LOCK (splitmux);
+
+ ctx->need_unblock = FALSE;
+ GST_SPLITMUX_BROADCAST_INPUT (splitmux);
+ /* state may have changed while we were unlocked. Loop again if so */
+ if (splitmux->input_state != SPLITMUX_INPUT_STATE_WAITING_GOP_COLLECT)
+ return;
+ }
+
+ if (splitmux->input_state == SPLITMUX_INPUT_STATE_WAITING_GOP_COLLECT) {
+ gboolean ready = TRUE;
- if (splitmux->state == SPLITMUX_STATE_WAITING_GOP_COMPLETE) {
/* Iterate each pad, and check that the input running time is at least
- * up to the video runnning time, and if so handle the collected GOP */
- GST_LOG_OBJECT (splitmux, "Checking GOP collected, ctx %p", ctx);
- for (cur = g_list_first (splitmux->contexts);
- cur != NULL; cur = g_list_next (cur)) {
+ * up to the reference running time, and if so handle the collected GOP */
+ GST_LOG_OBJECT (splitmux, "Checking GOP collected, Max in running time %"
+ GST_STIME_FORMAT " ctx %p",
+ GST_STIME_ARGS (splitmux->max_in_running_time), ctx);
+ for (cur = g_list_first (splitmux->contexts); cur != NULL;
+ cur = g_list_next (cur)) {
MqStreamCtx *tmpctx = (MqStreamCtx *) (cur->data);
GST_LOG_OBJECT (splitmux,
- "Context %p (src pad %" GST_PTR_FORMAT ") TS %" GST_TIME_FORMAT
+ "Context %p (src pad %" GST_PTR_FORMAT ") TS %" GST_STIME_FORMAT
" EOS %d", tmpctx, tmpctx->srcpad,
- GST_TIME_ARGS (tmpctx->in_running_time), tmpctx->in_eos);
+ GST_STIME_ARGS (tmpctx->in_running_time), tmpctx->in_eos);
- if (tmpctx->in_running_time < splitmux->max_in_running_time &&
+ if (splitmux->max_in_running_time != GST_CLOCK_STIME_NONE &&
+ tmpctx->in_running_time < splitmux->max_in_running_time &&
!tmpctx->in_eos) {
GST_LOG_OBJECT (splitmux,
"Context %p (src pad %" GST_PTR_FORMAT ") not ready. We'll sleep",
}
}
+ /* If upstream reached EOS we are not expecting more data, no need to wait
+ * here. */
+ if (ctx->in_eos)
+ return;
+
/* Some pad is not yet ready, or GOP is being pushed
* either way, sleep and wait to get woken */
- while ((splitmux->state == SPLITMUX_STATE_WAITING_GOP_COMPLETE ||
- splitmux->state == SPLITMUX_STATE_START_NEXT_FRAGMENT) &&
- !ctx->flushing) {
-
- GST_LOG_OBJECT (splitmux, "Sleeping for %s (ctx %p)",
- splitmux->state == SPLITMUX_STATE_WAITING_GOP_COMPLETE ?
- "GOP complete" : "EOF draining", ctx);
- GST_SPLITMUX_WAIT (splitmux);
+ while (splitmux->input_state == SPLITMUX_INPUT_STATE_WAITING_GOP_COLLECT &&
+ !ctx->flushing &&
+ (ctx->in_running_time >= splitmux->max_in_running_time) &&
+ (splitmux->max_in_running_time != GST_CLOCK_STIME_NONE)) {
+ GST_LOG_OBJECT (splitmux, "Sleeping for GOP collection (ctx %p)", ctx);
+ GST_SPLITMUX_WAIT_INPUT (splitmux);
GST_LOG_OBJECT (splitmux, "Done waiting for complete GOP (ctx %p)", ctx);
}
}
-static void
-check_queue_length (GstSplitMuxSink * splitmux, MqStreamCtx * ctx)
-{
- GList *cur;
- guint cur_len = g_queue_get_length (&ctx->queued_bufs);
-
- GST_DEBUG_OBJECT (ctx->sinkpad,
- "Checking queue length len %u cur_max %u queued gops %u",
- cur_len, splitmux->mq_max_buffers, splitmux->queued_gops);
-
- if (cur_len >= splitmux->mq_max_buffers) {
- gboolean allow_grow = FALSE;
-
- /* If collecting a GOP and this pad might block,
- * and there isn't already a pending GOP in the queue
- * then grow
- */
- if (splitmux->state == SPLITMUX_STATE_WAITING_GOP_COMPLETE &&
- ctx->in_running_time < splitmux->max_in_running_time &&
- splitmux->queued_gops <= 1) {
- allow_grow = TRUE;
- } else if (splitmux->state == SPLITMUX_STATE_COLLECTING_GOP_START &&
- ctx->is_video) {
- allow_grow = TRUE;
- }
-
- if (!allow_grow) {
- for (cur = g_list_first (splitmux->contexts);
- cur != NULL; cur = g_list_next (cur)) {
- MqStreamCtx *tmpctx = (MqStreamCtx *) (cur->data);
- GST_DEBUG_OBJECT (tmpctx->sinkpad,
- " len %u out_blocked %d",
- g_queue_get_length (&tmpctx->queued_bufs), tmpctx->out_blocked);
- /* If another stream is starving, grow */
- if (tmpctx != ctx && g_queue_get_length (&tmpctx->queued_bufs) < 1) {
- allow_grow = TRUE;
- }
- }
- }
-
- if (allow_grow) {
- splitmux->mq_max_buffers = cur_len + 1;
-
- GST_INFO_OBJECT (splitmux,
- "Multiqueue overrun - enlarging to %u buffers ctx %p",
- splitmux->mq_max_buffers, ctx);
-
- g_object_set (splitmux->mq, "max-size-buffers",
- splitmux->mq_max_buffers, NULL);
- }
- }
-}
-
static GstPadProbeReturn
handle_mq_input (GstPad * pad, GstPadProbeInfo * info, MqStreamCtx * ctx)
{
gboolean loop_again;
gboolean keyframe = FALSE;
- GST_LOG_OBJECT (pad, "Fired probe type 0x%x\n", info->type);
+ GST_LOG_OBJECT (pad, "Fired probe type 0x%x", info->type);
/* FIXME: Handle buffer lists, until then make it clear they won't work */
if (info->type & GST_PAD_PROBE_TYPE_BUFFER_LIST) {
}
if (info->type & GST_PAD_PROBE_TYPE_EVENT_DOWNSTREAM) {
GstEvent *event = gst_pad_probe_info_get_event (info);
+
+ GST_LOG_OBJECT (pad, "Event %" GST_PTR_FORMAT, event);
+
switch (GST_EVENT_TYPE (event)) {
case GST_EVENT_SEGMENT:
gst_event_copy_segment (event, &ctx->in_segment);
GST_SPLITMUX_LOCK (splitmux);
gst_segment_init (&ctx->in_segment, GST_FORMAT_UNDEFINED);
ctx->in_eos = FALSE;
- ctx->in_bytes = 0;
- ctx->in_running_time = 0;
+ ctx->in_running_time = GST_CLOCK_STIME_NONE;
GST_SPLITMUX_UNLOCK (splitmux);
break;
case GST_EVENT_EOS:
GST_SPLITMUX_LOCK (splitmux);
ctx->in_eos = TRUE;
- if (splitmux->state == SPLITMUX_STATE_STOPPED)
+ if (splitmux->input_state == SPLITMUX_INPUT_STATE_STOPPED)
goto beach;
- if (ctx->is_video) {
- GST_INFO_OBJECT (splitmux, "Got Video EOS. Finishing up");
- /* Act as if this is a new keyframe with infinite timestamp */
- splitmux->max_in_running_time = GST_CLOCK_TIME_NONE;
- splitmux->state = SPLITMUX_STATE_WAITING_GOP_COMPLETE;
+ if (ctx->is_reference) {
+ GST_INFO_OBJECT (splitmux, "Got Reference EOS. Finishing up");
+ /* check_completed_gop will act as if this is a new keyframe with infinite timestamp */
+ splitmux->input_state = SPLITMUX_INPUT_STATE_WAITING_GOP_COLLECT;
/* Wake up other input pads to collect this GOP */
- GST_SPLITMUX_BROADCAST (splitmux);
+ GST_SPLITMUX_BROADCAST_INPUT (splitmux);
check_completed_gop (splitmux, ctx);
- } else if (splitmux->state == SPLITMUX_STATE_WAITING_GOP_COMPLETE) {
+ } else if (splitmux->input_state ==
+ SPLITMUX_INPUT_STATE_WAITING_GOP_COLLECT) {
/* If we are waiting for a GOP to be completed (ie, for aux
* pads to catch up), then this pad is complete, so check
* if the whole GOP is.
}
GST_SPLITMUX_UNLOCK (splitmux);
break;
+ case GST_EVENT_GAP:{
+ GstClockTime gap_ts;
+ GstClockTimeDiff rtime;
+
+ gst_event_parse_gap (event, &gap_ts, NULL);
+ if (gap_ts == GST_CLOCK_TIME_NONE)
+ break;
+
+ GST_SPLITMUX_LOCK (splitmux);
+
+ if (splitmux->input_state == SPLITMUX_INPUT_STATE_STOPPED)
+ goto beach;
+ rtime = my_segment_to_running_time (&ctx->in_segment, gap_ts);
+
+ GST_LOG_OBJECT (pad, "Have GAP w/ ts %" GST_STIME_FORMAT,
+ GST_STIME_ARGS (rtime));
+
+ if (ctx->is_reference
+ && splitmux->fragment_start_time == GST_CLOCK_STIME_NONE) {
+ splitmux->gop_start_time = splitmux->fragment_start_time = rtime;
+ GST_LOG_OBJECT (splitmux, "Mux start time now %" GST_STIME_FORMAT,
+ GST_STIME_ARGS (splitmux->fragment_start_time));
+ /* Also take this as the first start time when starting up,
+ * so that we start counting overflow from the first frame */
+ if (!GST_CLOCK_STIME_IS_VALID (splitmux->max_in_running_time))
+ splitmux->max_in_running_time = splitmux->fragment_start_time;
+ }
+
+ GST_SPLITMUX_UNLOCK (splitmux);
+ break;
+ }
default:
break;
}
}
buf = gst_pad_probe_info_get_buffer (info);
- ctx->in_running_time = gst_segment_to_running_time (&ctx->in_segment,
- GST_FORMAT_TIME, GST_BUFFER_TIMESTAMP (buf));
buf_info = mq_stream_buf_new ();
if (GST_BUFFER_PTS_IS_VALID (buf))
else
ts = GST_BUFFER_DTS (buf);
+ GST_LOG_OBJECT (pad, "Buffer TS is %" GST_TIME_FORMAT, GST_TIME_ARGS (ts));
+
GST_SPLITMUX_LOCK (splitmux);
- if (splitmux->state == SPLITMUX_STATE_STOPPED)
+ if (splitmux->input_state == SPLITMUX_INPUT_STATE_STOPPED)
goto beach;
/* If this buffer has a timestamp, advance the input timestamp of the
* stream */
if (GST_CLOCK_TIME_IS_VALID (ts)) {
- GstClockTime running_time =
- gst_segment_to_running_time (&ctx->in_segment, GST_FORMAT_TIME,
- GST_BUFFER_TIMESTAMP (buf));
+ GstClockTimeDiff running_time =
+ my_segment_to_running_time (&ctx->in_segment, ts);
+
+ GST_LOG_OBJECT (pad, "Buffer running TS is %" GST_STIME_FORMAT,
+ GST_STIME_ARGS (running_time));
- if (GST_CLOCK_TIME_IS_VALID (running_time) &&
- (ctx->in_running_time == GST_CLOCK_TIME_NONE
- || running_time > ctx->in_running_time))
+ if (GST_CLOCK_STIME_IS_VALID (running_time)
+ && running_time > ctx->in_running_time)
ctx->in_running_time = running_time;
}
/* Try to make sure we have a valid running time */
- if (!GST_CLOCK_TIME_IS_VALID (ctx->in_running_time)) {
+ if (!GST_CLOCK_STIME_IS_VALID (ctx->in_running_time)) {
ctx->in_running_time =
- gst_segment_to_running_time (&ctx->in_segment, GST_FORMAT_TIME,
- ctx->in_segment.start);
+ my_segment_to_running_time (&ctx->in_segment, ctx->in_segment.start);
}
+ GST_LOG_OBJECT (pad, "in running time now %" GST_STIME_FORMAT,
+ GST_STIME_ARGS (ctx->in_running_time));
+
buf_info->run_ts = ctx->in_running_time;
buf_info->buf_size = gst_buffer_get_size (buf);
+ buf_info->duration = GST_BUFFER_DURATION (buf);
+
+ /* initialize fragment_start_time */
+ if (ctx->is_reference
+ && splitmux->fragment_start_time == GST_CLOCK_STIME_NONE) {
+ splitmux->gop_start_time = splitmux->fragment_start_time = buf_info->run_ts;
+ GST_LOG_OBJECT (splitmux, "Mux start time now %" GST_STIME_FORMAT,
+ GST_STIME_ARGS (splitmux->fragment_start_time));
+ gst_buffer_replace (&ctx->prev_in_keyframe, buf);
+
+ /* Also take this as the first start time when starting up,
+ * so that we start counting overflow from the first frame */
+ if (!GST_CLOCK_STIME_IS_VALID (splitmux->max_in_running_time))
+ splitmux->max_in_running_time = splitmux->fragment_start_time;
+ if (request_next_keyframe (splitmux, ctx->prev_in_keyframe) == FALSE) {
+ GST_WARNING_OBJECT (splitmux,
+ "Could not request a keyframe. Files may not split at the exact location they should");
+ }
+ gst_buffer_replace (&splitmux->reference_ctx->prev_in_keyframe, NULL);
+ }
- /* Update total input byte counter for overflow detect */
- ctx->in_bytes += buf_info->buf_size;
-
- GST_DEBUG_OBJECT (pad, "Buf TS %" GST_TIME_FORMAT
- " total in_bytes %" G_GSIZE_FORMAT,
- GST_TIME_ARGS (buf_info->run_ts), ctx->in_bytes);
+ GST_DEBUG_OBJECT (pad, "Buf TS %" GST_STIME_FORMAT
+ " total GOP bytes %" G_GUINT64_FORMAT,
+ GST_STIME_ARGS (buf_info->run_ts), splitmux->gop_total_bytes);
loop_again = TRUE;
do {
if (ctx->flushing)
break;
- switch (splitmux->state) {
- case SPLITMUX_STATE_COLLECTING_GOP_START:
- if (ctx->is_video) {
+ switch (splitmux->input_state) {
+ case SPLITMUX_INPUT_STATE_COLLECTING_GOP_START:
+ if (ctx->is_reference) {
/* If a keyframe, we have a complete GOP */
if (GST_BUFFER_FLAG_IS_SET (buf, GST_BUFFER_FLAG_DELTA_UNIT) ||
- !GST_CLOCK_TIME_IS_VALID (ctx->in_running_time) ||
+ !GST_CLOCK_STIME_IS_VALID (ctx->in_running_time) ||
splitmux->max_in_running_time >= ctx->in_running_time) {
/* Pass this buffer through */
loop_again = FALSE;
+ /* Allow other input pads to catch up to here too */
+ splitmux->max_in_running_time = ctx->in_running_time;
+ GST_SPLITMUX_BROADCAST_INPUT (splitmux);
break;
}
GST_INFO_OBJECT (pad,
- "Have keyframe with running time %" GST_TIME_FORMAT,
- GST_TIME_ARGS (ctx->in_running_time));
+ "Have keyframe with running time %" GST_STIME_FORMAT,
+ GST_STIME_ARGS (ctx->in_running_time));
keyframe = TRUE;
- splitmux->state = SPLITMUX_STATE_WAITING_GOP_COMPLETE;
+ splitmux->input_state = SPLITMUX_INPUT_STATE_WAITING_GOP_COLLECT;
splitmux->max_in_running_time = ctx->in_running_time;
/* Wake up other input pads to collect this GOP */
- GST_SPLITMUX_BROADCAST (splitmux);
+ GST_SPLITMUX_BROADCAST_INPUT (splitmux);
check_completed_gop (splitmux, ctx);
+ /* Store this new keyframe to remember the start of GOP */
+ gst_buffer_replace (&ctx->prev_in_keyframe, buf);
} else {
- /* We're still waiting for a keyframe on the video pad, sleep */
+ /* Pass this buffer if the reference ctx is far enough ahead */
+ if (ctx->in_running_time < splitmux->max_in_running_time) {
+ loop_again = FALSE;
+ break;
+ }
+
+ /* We're still waiting for a keyframe on the reference pad, sleep */
GST_LOG_OBJECT (pad, "Sleeping for GOP start");
- GST_SPLITMUX_WAIT (splitmux);
- GST_LOG_OBJECT (pad, "Done sleeping for GOP start state now %d",
- splitmux->state);
+ GST_SPLITMUX_WAIT_INPUT (splitmux);
+ GST_LOG_OBJECT (pad,
+ "Done sleeping for GOP start input state now %d",
+ splitmux->input_state);
}
break;
- case SPLITMUX_STATE_WAITING_GOP_COMPLETE:
- /* After a GOP start is found, this buffer might complete the GOP */
+ case SPLITMUX_INPUT_STATE_WAITING_GOP_COLLECT:{
+ /* We're collecting a GOP. If this is the reference context,
+ * we need to check if this is a keyframe that marks the start
+ * of the next GOP. If it is, it marks the end of the GOP we're
+ * collecting, so sleep and wait until all the other pads also
+ * reach that timestamp - at which point, we have an entire GOP
+ * and either go to ENDING_FILE or release this GOP to the muxer and
+ * go back to COLLECT_GOP_START. */
+
/* If we overran the target timestamp, it might be time to process
* the GOP, otherwise bail out for more data
*/
GST_LOG_OBJECT (pad,
- "Checking TS %" GST_TIME_FORMAT " against max %" GST_TIME_FORMAT,
- GST_TIME_ARGS (ctx->in_running_time),
- GST_TIME_ARGS (splitmux->max_in_running_time));
+ "Checking TS %" GST_STIME_FORMAT " against max %"
+ GST_STIME_FORMAT, GST_STIME_ARGS (ctx->in_running_time),
+ GST_STIME_ARGS (splitmux->max_in_running_time));
if (ctx->in_running_time < splitmux->max_in_running_time) {
loop_again = FALSE;
"Collected last packet of GOP. Checking other pads");
check_completed_gop (splitmux, ctx);
break;
- case SPLITMUX_STATE_ENDING_FILE:
- case SPLITMUX_STATE_START_NEXT_FRAGMENT:
- /* A fragment is ending, wait until that's done before continuing */
- GST_DEBUG_OBJECT (pad, "Sleeping for fragment restart");
- GST_SPLITMUX_WAIT (splitmux);
- GST_DEBUG_OBJECT (pad,
- "Done sleeping for fragment restart state now %d", splitmux->state);
+ }
+ case SPLITMUX_INPUT_STATE_FINISHING_UP:
+ loop_again = FALSE;
break;
default:
loop_again = FALSE;
break;
}
- } while (loop_again);
+ }
+ while (loop_again);
if (keyframe) {
- splitmux->queued_gops++;
+ splitmux->queued_keyframes++;
buf_info->keyframe = TRUE;
}
+ /* Update total input byte counter for overflow detect */
+ splitmux->gop_total_bytes += buf_info->buf_size;
+
/* Now add this buffer to the queue just before returning */
g_queue_push_head (&ctx->queued_bufs, buf_info);
- /* Check the buffer will fit in the mq */
- check_queue_length (splitmux, ctx);
-
GST_LOG_OBJECT (pad, "Returning to queue buffer %" GST_PTR_FORMAT
- " run ts %" GST_TIME_FORMAT, buf, GST_TIME_ARGS (ctx->in_running_time));
+ " run ts %" GST_STIME_FORMAT, buf, GST_STIME_ARGS (ctx->in_running_time));
GST_SPLITMUX_UNLOCK (splitmux);
return GST_PAD_PROBE_PASS;
return GST_PAD_PROBE_PASS;
}
+static void
+grow_blocked_queues (GstSplitMuxSink * splitmux)
+{
+ GList *cur;
+
+ /* Scan other queues for full-ness and grow them */
+ for (cur = g_list_first (splitmux->contexts);
+ cur != NULL; cur = g_list_next (cur)) {
+ MqStreamCtx *tmpctx = (MqStreamCtx *) (cur->data);
+ guint cur_limit;
+ guint cur_len = g_queue_get_length (&tmpctx->queued_bufs);
+
+ g_object_get (tmpctx->q, "max-size-buffers", &cur_limit, NULL);
+ GST_LOG_OBJECT (tmpctx->q, "Queue len %u", cur_len);
+
+ if (cur_len >= cur_limit) {
+ cur_limit = cur_len + 1;
+ GST_DEBUG_OBJECT (tmpctx->q,
+ "Queue overflowed and needs enlarging. Growing to %u buffers",
+ cur_limit);
+ g_object_set (tmpctx->q, "max-size-buffers", cur_limit, NULL);
+ }
+ }
+}
+
+static void
+handle_q_underrun (GstElement * q, gpointer user_data)
+{
+ MqStreamCtx *ctx = (MqStreamCtx *) (user_data);
+ GstSplitMuxSink *splitmux = ctx->splitmux;
+
+ GST_SPLITMUX_LOCK (splitmux);
+ GST_DEBUG_OBJECT (q,
+ "Queue reported underrun with %d keyframes and %d cmds enqueued",
+ splitmux->queued_keyframes, g_queue_get_length (&splitmux->out_cmd_q));
+ grow_blocked_queues (splitmux);
+ GST_SPLITMUX_UNLOCK (splitmux);
+}
+
+static void
+handle_q_overrun (GstElement * q, gpointer user_data)
+{
+ MqStreamCtx *ctx = (MqStreamCtx *) (user_data);
+ GstSplitMuxSink *splitmux = ctx->splitmux;
+ gboolean allow_grow = FALSE;
+
+ GST_SPLITMUX_LOCK (splitmux);
+ GST_DEBUG_OBJECT (q,
+ "Queue reported overrun with %d keyframes and %d cmds enqueued",
+ splitmux->queued_keyframes, g_queue_get_length (&splitmux->out_cmd_q));
+
+ if (splitmux->queued_keyframes < 2) {
+ /* Less than a full GOP queued, grow the queue */
+ allow_grow = TRUE;
+ } else if (g_queue_get_length (&splitmux->out_cmd_q) < 1) {
+ allow_grow = TRUE;
+ } else {
+ /* If another queue is starved, grow */
+ GList *cur;
+ for (cur = g_list_first (splitmux->contexts);
+ cur != NULL; cur = g_list_next (cur)) {
+ MqStreamCtx *tmpctx = (MqStreamCtx *) (cur->data);
+ if (tmpctx != ctx && g_queue_get_length (&tmpctx->queued_bufs) < 1) {
+ allow_grow = TRUE;
+ }
+ }
+ }
+ GST_SPLITMUX_UNLOCK (splitmux);
+
+ if (allow_grow) {
+ guint cur_limit;
+
+ g_object_get (q, "max-size-buffers", &cur_limit, NULL);
+ cur_limit++;
+
+ GST_DEBUG_OBJECT (q,
+ "Queue overflowed and needs enlarging. Growing to %u buffers",
+ cur_limit);
+
+ g_object_set (q, "max-size-buffers", cur_limit, NULL);
+ }
+}
+
static GstPad *
gst_splitmux_sink_request_new_pad (GstElement * element,
GstPadTemplate * templ, const gchar * name, const GstCaps * caps)
GstSplitMuxSink *splitmux = (GstSplitMuxSink *) element;
GstPadTemplate *mux_template = NULL;
GstPad *res = NULL;
- GstPad *mq_sink, *mq_src;
+ GstElement *q;
+ GstPad *q_sink = NULL, *q_src = NULL;
gchar *gname;
gboolean is_video = FALSE;
MqStreamCtx *ctx;
GST_DEBUG_OBJECT (element, "templ:%s, name:%s", templ->name_template, name);
GST_SPLITMUX_LOCK (splitmux);
- if (!create_elements (splitmux))
+ if (!create_muxer (splitmux))
goto fail;
if (templ->name_template) {
if (g_str_equal (templ->name_template, "video")) {
+ if (splitmux->have_video)
+ goto already_have_video;
+
/* FIXME: Look for a pad template with matching caps, rather than by name */
mux_template =
gst_element_class_get_pad_template (GST_ELEMENT_GET_CLASS
(splitmux->muxer), "video_%u");
+
+ /* Fallback to find sink pad templates named 'video' (flvmux) */
+ if (!mux_template) {
+ mux_template =
+ gst_element_class_get_pad_template (GST_ELEMENT_GET_CLASS
+ (splitmux->muxer), "video");
+ }
is_video = TRUE;
name = NULL;
} else {
mux_template =
gst_element_class_get_pad_template (GST_ELEMENT_GET_CLASS
(splitmux->muxer), templ->name_template);
+
+ /* Fallback to find sink pad templates named 'audio' (flvmux) */
+ if (!mux_template) {
+ mux_template =
+ gst_element_class_get_pad_template (GST_ELEMENT_GET_CLASS
+ (splitmux->muxer), "audio");
+ }
+ }
+ if (mux_template == NULL) {
+ /* Fallback to find sink pad templates named 'sink_%d' (mpegtsmux) */
+ mux_template =
+ gst_element_class_get_pad_template (GST_ELEMENT_GET_CLASS
+ (splitmux->muxer), "sink_%d");
}
}
else
gname = g_strdup (name);
- if (!get_pads_from_mq (splitmux, &mq_sink, &mq_src)) {
- gst_element_release_request_pad (splitmux->muxer, res);
- gst_object_unref (GST_OBJECT (res));
+ if ((q = create_element (splitmux, "queue", NULL, FALSE)) == NULL)
goto fail;
- }
- if (gst_pad_link (mq_src, res) != GST_PAD_LINK_OK) {
+ gst_element_set_state (q, GST_STATE_TARGET (splitmux));
+
+ g_object_set (q, "max-size-bytes", 0, "max-size-time", (guint64) (0),
+ "max-size-buffers", 5, NULL);
+
+ q_sink = gst_element_get_static_pad (q, "sink");
+ q_src = gst_element_get_static_pad (q, "src");
+
+ if (gst_pad_link (q_src, res) != GST_PAD_LINK_OK) {
gst_element_release_request_pad (splitmux->muxer, res);
gst_object_unref (GST_OBJECT (res));
- gst_element_release_request_pad (splitmux->mq, mq_sink);
- gst_object_unref (GST_OBJECT (mq_sink));
goto fail;
}
gst_object_unref (GST_OBJECT (res));
ctx = mq_stream_ctx_new (splitmux);
- ctx->is_video = is_video;
- ctx->srcpad = mq_src;
- ctx->sinkpad = mq_sink;
+ /* Context holds a ref: */
+ ctx->q = gst_object_ref (q);
+ ctx->srcpad = q_src;
+ ctx->sinkpad = q_sink;
+ ctx->q_overrun_id =
+ g_signal_connect (q, "overrun", (GCallback) handle_q_overrun, ctx);
+ g_signal_connect (q, "underrun", (GCallback) handle_q_underrun, ctx);
mq_stream_ctx_ref (ctx);
ctx->src_pad_block_id =
- gst_pad_add_probe (mq_src, GST_PAD_PROBE_TYPE_DATA_DOWNSTREAM,
+ gst_pad_add_probe (q_src, GST_PAD_PROBE_TYPE_DATA_DOWNSTREAM,
(GstPadProbeCallback) handle_mq_output, ctx, (GDestroyNotify)
_pad_block_destroy_src_notify);
- if (is_video)
- splitmux->video_ctx = ctx;
+ if (is_video && splitmux->reference_ctx != NULL) {
+ splitmux->reference_ctx->is_reference = FALSE;
+ splitmux->reference_ctx = NULL;
+ }
+ if (splitmux->reference_ctx == NULL) {
+ splitmux->reference_ctx = ctx;
+ ctx->is_reference = TRUE;
+ }
- res = gst_ghost_pad_new (gname, mq_sink);
+ res = gst_ghost_pad_new_from_template (gname, q_sink, templ);
g_object_set_qdata ((GObject *) (res), PAD_CONTEXT, ctx);
mq_stream_ctx_ref (ctx);
ctx->sink_pad_block_id =
- gst_pad_add_probe (res, GST_PAD_PROBE_TYPE_DATA_DOWNSTREAM,
+ gst_pad_add_probe (q_sink, GST_PAD_PROBE_TYPE_DATA_DOWNSTREAM,
(GstPadProbeCallback) handle_mq_input, ctx, (GDestroyNotify)
_pad_block_destroy_sink_notify);
GST_DEBUG_OBJECT (splitmux, "Request pad %" GST_PTR_FORMAT
- " is mq pad %" GST_PTR_FORMAT, res, mq_sink);
+ " feeds queue pad %" GST_PTR_FORMAT, res, q_sink);
splitmux->contexts = g_list_prepend (splitmux->contexts, ctx);
g_free (gname);
- gst_object_unref (mq_sink);
- gst_object_unref (mq_src);
+ if (is_video)
+ splitmux->have_video = TRUE;
gst_pad_set_active (res, TRUE);
gst_element_add_pad (element, res);
+
GST_SPLITMUX_UNLOCK (splitmux);
return res;
fail:
GST_SPLITMUX_UNLOCK (splitmux);
+
+ if (q_sink)
+ gst_object_unref (q_sink);
+ if (q_src)
+ gst_object_unref (q_src);
+ return NULL;
+already_have_video:
+ GST_DEBUG_OBJECT (splitmux, "video sink pad already requested");
+ GST_SPLITMUX_UNLOCK (splitmux);
return NULL;
}
gst_splitmux_sink_release_pad (GstElement * element, GstPad * pad)
{
GstSplitMuxSink *splitmux = (GstSplitMuxSink *) element;
- GstPad *mqsink, *mqsrc, *muxpad;
+ GstPad *muxpad = NULL;
MqStreamCtx *ctx =
(MqStreamCtx *) (g_object_get_qdata ((GObject *) (pad), PAD_CONTEXT));
GST_SPLITMUX_LOCK (splitmux);
- if (splitmux->muxer == NULL || splitmux->mq == NULL)
+ if (splitmux->muxer == NULL)
goto fail; /* Elements don't exist yet - nothing to release */
GST_INFO_OBJECT (pad, "releasing request pad");
- mqsink = gst_ghost_pad_get_target (GST_GHOST_PAD (pad));
- mqsrc = mq_sink_to_src (splitmux->mq, mqsink);
- muxpad = gst_pad_get_peer (mqsrc);
+ muxpad = gst_pad_get_peer (ctx->srcpad);
/* Remove the context from our consideration */
splitmux->contexts = g_list_remove (splitmux->contexts, ctx);
/* Can release the context now */
mq_stream_ctx_unref (ctx);
-
- /* Release and free the mq input */
- gst_element_release_request_pad (splitmux->mq, mqsink);
+ if (ctx == splitmux->reference_ctx)
+ splitmux->reference_ctx = NULL;
/* Release and free the muxer input */
- gst_element_release_request_pad (splitmux->muxer, muxpad);
+ if (muxpad) {
+ gst_element_release_request_pad (splitmux->muxer, muxpad);
+ gst_object_unref (muxpad);
+ }
- gst_object_unref (mqsink);
- gst_object_unref (mqsrc);
- gst_object_unref (muxpad);
+ if (GST_PAD_PAD_TEMPLATE (pad) &&
+ g_str_equal (GST_PAD_TEMPLATE_NAME_TEMPLATE (GST_PAD_PAD_TEMPLATE
+ (pad)), "video"))
+ splitmux->have_video = FALSE;
gst_element_remove_pad (element, pad);
+ /* Reset the internal elements only after all request pads are released */
+ if (splitmux->contexts == NULL)
+ gst_splitmux_reset (splitmux);
+
fail:
GST_SPLITMUX_UNLOCK (splitmux);
}
static GstElement *
create_element (GstSplitMuxSink * splitmux,
- const gchar * factory, const gchar * name)
+ const gchar * factory, const gchar * name, gboolean locked)
{
GstElement *ret = gst_element_factory_make (factory, name);
if (ret == NULL) {
return NULL;
}
+ if (locked) {
+ /* Ensure the sink starts in locked state and NULL - it will be changed
+ * by the filename setting code */
+ gst_element_set_locked_state (ret, TRUE);
+ gst_element_set_state (ret, GST_STATE_NULL);
+ }
+
if (!gst_bin_add (GST_BIN (splitmux), ret)) {
g_warning ("Could not add %s element - splitmuxsink will not work", name);
gst_object_unref (ret);
}
static gboolean
-create_elements (GstSplitMuxSink * splitmux)
+create_muxer (GstSplitMuxSink * splitmux)
{
/* Create internal elements */
- if (splitmux->mq == NULL) {
- if ((splitmux->mq =
- create_element (splitmux, "multiqueue", "multiqueue")) == NULL)
- goto fail;
+ if (splitmux->muxer == NULL) {
+ GstElement *provided_muxer = NULL;
- splitmux->mq_max_buffers = 5;
- /* No bytes or time limit, we limit buffers manually */
- g_object_set (splitmux->mq, "max-size-bytes", 0, "max-size-time",
- (guint64) 0, "max-size-buffers", splitmux->mq_max_buffers, NULL);
- }
+ GST_OBJECT_LOCK (splitmux);
+ if (splitmux->provided_muxer != NULL)
+ provided_muxer = gst_object_ref (splitmux->provided_muxer);
+ GST_OBJECT_UNLOCK (splitmux);
- if (splitmux->muxer == NULL) {
- if (splitmux->provided_muxer == NULL) {
+ if (provided_muxer == NULL) {
if ((splitmux->muxer =
- create_element (splitmux, "mp4mux", "muxer")) == NULL)
+ create_element (splitmux, "mp4mux", "muxer", FALSE)) == NULL)
goto fail;
} else {
- splitmux->muxer = splitmux->provided_muxer;
- if (!gst_bin_add (GST_BIN (splitmux), splitmux->provided_muxer)) {
+ /* Ensure it's not in locked state (we might be reusing an old element) */
+ gst_element_set_locked_state (provided_muxer, FALSE);
+ if (!gst_bin_add (GST_BIN (splitmux), provided_muxer)) {
g_warning ("Could not add muxer element - splitmuxsink will not work");
+ gst_object_unref (provided_muxer);
goto fail;
}
+
+ splitmux->muxer = provided_muxer;
+ gst_object_unref (provided_muxer);
}
}
if (!GST_IS_BIN (e))
return e;
+ if (g_object_class_find_property (G_OBJECT_GET_CLASS (e), "location") != NULL)
+ return e;
+
iter = gst_bin_iterate_sinks (GST_BIN (e));
while (!done) {
switch (gst_iterator_next (iter, &data)) {
static gboolean
create_sink (GstSplitMuxSink * splitmux)
{
- g_return_val_if_fail (splitmux->active_sink == NULL, TRUE);
+ GstElement *provided_sink = NULL;
- if (splitmux->provided_sink == NULL) {
- if ((splitmux->sink =
- create_element (splitmux, DEFAULT_SINK, "sink")) == NULL)
- goto fail;
- splitmux->active_sink = splitmux->sink;
- } else {
- if (!gst_bin_add (GST_BIN (splitmux), splitmux->provided_sink)) {
- g_warning ("Could not add sink elements - splitmuxsink will not work");
- goto fail;
+ if (splitmux->active_sink == NULL) {
+
+ GST_OBJECT_LOCK (splitmux);
+ if (splitmux->provided_sink != NULL)
+ provided_sink = gst_object_ref (splitmux->provided_sink);
+ GST_OBJECT_UNLOCK (splitmux);
+
+ if (provided_sink == NULL) {
+ if ((splitmux->sink =
+ create_element (splitmux, DEFAULT_SINK, "sink", TRUE)) == NULL)
+ goto fail;
+ splitmux->active_sink = splitmux->sink;
+ } else {
+ /* Ensure the sink starts in locked state and NULL - it will be changed
+ * by the filename setting code */
+ gst_element_set_locked_state (provided_sink, TRUE);
+ gst_element_set_state (provided_sink, GST_STATE_NULL);
+ if (!gst_bin_add (GST_BIN (splitmux), provided_sink)) {
+ g_warning ("Could not add sink elements - splitmuxsink will not work");
+ gst_object_unref (provided_sink);
+ goto fail;
+ }
+
+ splitmux->active_sink = provided_sink;
+
+ /* The bin holds a ref now, we can drop our tmp ref */
+ gst_object_unref (provided_sink);
+
+ /* Find the sink element */
+ splitmux->sink = find_sink (splitmux->active_sink);
+ if (splitmux->sink == NULL) {
+ g_warning
+ ("Could not locate sink element in provided sink - splitmuxsink will not work");
+ goto fail;
+ }
}
- splitmux->active_sink = splitmux->provided_sink;
+#if 1
+ if (g_object_class_find_property (G_OBJECT_GET_CLASS (splitmux->sink),
+ "async") != NULL) {
+ /* async child elements are causing state change races and weird
+ * failures, so let's try and turn that off */
+ g_object_set (splitmux->sink, "async", FALSE, NULL);
+ }
+#endif
- /* Find the sink element */
- splitmux->sink = find_sink (splitmux->active_sink);
- if (splitmux->sink == NULL) {
- g_warning
- ("Could not locate sink element in provided sink - splitmuxsink will not work");
+ if (!gst_element_link (splitmux->muxer, splitmux->active_sink)) {
+ g_warning ("Failed to link muxer and sink- splitmuxsink will not work");
goto fail;
}
}
- if (!gst_element_link (splitmux->muxer, splitmux->active_sink)) {
- g_warning ("Failed to link muxer and sink- splitmuxsink will not work");
- goto fail;
- }
-
return TRUE;
fail:
return FALSE;
#pragma GCC diagnostic ignored "-Wformat-nonliteral"
#endif
static void
-set_next_filename (GstSplitMuxSink * splitmux)
+set_next_filename (GstSplitMuxSink * splitmux, MqStreamCtx * ctx)
{
- if (splitmux->location) {
- gchar *fname;
+ gchar *fname = NULL;
+ GstSample *sample;
+ GstCaps *caps;
+
+ gst_splitmux_sink_ensure_max_files (splitmux);
- fname = g_strdup_printf (splitmux->location, splitmux->fragment_id);
+ if (ctx->cur_out_buffer == NULL) {
+ GST_WARNING_OBJECT (splitmux, "Starting next file without buffer");
+ }
+
+ caps = gst_pad_get_current_caps (ctx->srcpad);
+ sample = gst_sample_new (ctx->cur_out_buffer, caps, &ctx->out_segment, NULL);
+ g_signal_emit (splitmux, signals[SIGNAL_FORMAT_LOCATION_FULL], 0,
+ splitmux->fragment_id, sample, &fname);
+ gst_sample_unref (sample);
+ if (caps)
+ gst_caps_unref (caps);
+
+ if (fname == NULL) {
+ /* Fallback to the old signal if the new one returned nothing */
+ g_signal_emit (splitmux, signals[SIGNAL_FORMAT_LOCATION], 0,
+ splitmux->fragment_id, &fname);
+ }
+
+ if (!fname)
+ fname = splitmux->location ?
+ g_strdup_printf (splitmux->location, splitmux->fragment_id) : NULL;
+
+ if (fname) {
GST_INFO_OBJECT (splitmux, "Setting file to %s", fname);
g_object_set (splitmux->sink, "location", fname, NULL);
g_free (fname);
}
}
+static void
+do_async_start (GstSplitMuxSink * splitmux)
+{
+ GstMessage *message;
+
+ if (!splitmux->need_async_start) {
+ GST_INFO_OBJECT (splitmux, "no async_start needed");
+ return;
+ }
+
+ splitmux->async_pending = TRUE;
+
+ GST_INFO_OBJECT (splitmux, "Sending async_start message");
+ message = gst_message_new_async_start (GST_OBJECT_CAST (splitmux));
+ GST_BIN_CLASS (parent_class)->handle_message (GST_BIN_CAST
+ (splitmux), message);
+}
+
+static void
+do_async_done (GstSplitMuxSink * splitmux)
+{
+ GstMessage *message;
+
+ if (splitmux->async_pending) {
+ GST_INFO_OBJECT (splitmux, "Sending async_done message");
+ message =
+ gst_message_new_async_done (GST_OBJECT_CAST (splitmux),
+ GST_CLOCK_TIME_NONE);
+ GST_BIN_CLASS (parent_class)->handle_message (GST_BIN_CAST
+ (splitmux), message);
+
+ splitmux->async_pending = FALSE;
+ }
+
+ splitmux->need_async_start = FALSE;
+}
+
static GstStateChangeReturn
gst_splitmux_sink_change_state (GstElement * element, GstStateChange transition)
{
switch (transition) {
case GST_STATE_CHANGE_NULL_TO_READY:{
GST_SPLITMUX_LOCK (splitmux);
- if (!create_elements (splitmux) || !create_sink (splitmux)) {
+ if (!create_muxer (splitmux) || !create_sink (splitmux)) {
ret = GST_STATE_CHANGE_FAILURE;
GST_SPLITMUX_UNLOCK (splitmux);
goto beach;
}
GST_SPLITMUX_UNLOCK (splitmux);
splitmux->fragment_id = 0;
- set_next_filename (splitmux);
break;
}
case GST_STATE_CHANGE_READY_TO_PAUSED:{
GST_SPLITMUX_LOCK (splitmux);
/* Start by collecting one input on each pad */
- splitmux->state = SPLITMUX_STATE_COLLECTING_GOP_START;
- splitmux->max_in_running_time = 0;
- splitmux->muxed_out_time = splitmux->mux_start_time = 0;
- splitmux->muxed_out_bytes = splitmux->mux_start_bytes = 0;
+ splitmux->input_state = SPLITMUX_INPUT_STATE_COLLECTING_GOP_START;
+ splitmux->output_state = SPLITMUX_OUTPUT_STATE_START_NEXT_FILE;
+ splitmux->max_in_running_time = GST_CLOCK_STIME_NONE;
+ splitmux->gop_start_time = splitmux->fragment_start_time =
+ GST_CLOCK_STIME_NONE;
+ splitmux->muxed_out_bytes = 0;
+ splitmux->ready_for_output = FALSE;
GST_SPLITMUX_UNLOCK (splitmux);
break;
}
case GST_STATE_CHANGE_PAUSED_TO_READY:
case GST_STATE_CHANGE_READY_TO_NULL:
GST_SPLITMUX_LOCK (splitmux);
- splitmux->state = SPLITMUX_STATE_STOPPED;
+ splitmux->output_state = SPLITMUX_OUTPUT_STATE_STOPPED;
+ splitmux->input_state = SPLITMUX_INPUT_STATE_STOPPED;
/* Wake up any blocked threads */
GST_LOG_OBJECT (splitmux,
"State change -> NULL or READY. Waking threads");
- GST_SPLITMUX_BROADCAST (splitmux);
+ GST_SPLITMUX_BROADCAST_INPUT (splitmux);
+ GST_SPLITMUX_BROADCAST_OUTPUT (splitmux);
GST_SPLITMUX_UNLOCK (splitmux);
break;
default:
goto beach;
switch (transition) {
+ case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
+ splitmux->need_async_start = TRUE;
+ break;
+ case GST_STATE_CHANGE_READY_TO_PAUSED:{
+ /* Change state async, because our child sink might not
+ * be ready to do that for us yet if it's state is still locked */
+
+ splitmux->need_async_start = TRUE;
+ /* we want to go async to PAUSED until we managed to configure and add the
+ * sink */
+ GST_SPLITMUX_LOCK (splitmux);
+ do_async_start (splitmux);
+ GST_SPLITMUX_UNLOCK (splitmux);
+ ret = GST_STATE_CHANGE_ASYNC;
+ break;
+ }
case GST_STATE_CHANGE_READY_TO_NULL:
GST_SPLITMUX_LOCK (splitmux);
splitmux->fragment_id = 0;
- gst_splitmux_reset (splitmux);
+ /* Reset internal elements only if no pad contexts are using them */
+ if (splitmux->contexts == NULL)
+ gst_splitmux_reset (splitmux);
+ do_async_done (splitmux);
GST_SPLITMUX_UNLOCK (splitmux);
break;
default:
}
beach:
-
if (transition == GST_STATE_CHANGE_NULL_TO_READY &&
ret == GST_STATE_CHANGE_FAILURE) {
/* Cleanup elements on failed transition out of NULL */
gst_splitmux_reset (splitmux);
+ GST_SPLITMUX_LOCK (splitmux);
+ do_async_done (splitmux);
+ GST_SPLITMUX_UNLOCK (splitmux);
}
return ret;
}
return gst_element_register (plugin, "splitmuxsink", GST_RANK_NONE,
GST_TYPE_SPLITMUX_SINK);
}
+
+static void
+gst_splitmux_sink_ensure_max_files (GstSplitMuxSink * splitmux)
+{
+ if (splitmux->max_files && splitmux->fragment_id >= splitmux->max_files) {
+ splitmux->fragment_id = 0;
+ }
+}