#define GST_SPLITMUX_LOCK(s) g_mutex_lock(&(s)->lock)
#define GST_SPLITMUX_UNLOCK(s) g_mutex_unlock(&(s)->lock)
-#define GST_SPLITMUX_WAIT(s) g_cond_wait (&(s)->data_cond, &(s)->lock)
-#define GST_SPLITMUX_BROADCAST(s) g_cond_broadcast (&(s)->data_cond)
+#define GST_SPLITMUX_WAIT_INPUT(s) g_cond_wait (&(s)->input_cond, &(s)->lock)
+#define GST_SPLITMUX_BROADCAST_INPUT(s) g_cond_broadcast (&(s)->input_cond)
+
+#define GST_SPLITMUX_WAIT_OUTPUT(s) g_cond_wait (&(s)->output_cond, &(s)->lock)
+#define GST_SPLITMUX_BROADCAST_OUTPUT(s) g_cond_broadcast (&(s)->output_cond)
enum
{
G_DEFINE_TYPE_EXTENDED (GstSplitMuxSink, gst_splitmux_sink, GST_TYPE_BIN, 0,
_do_init ());
-static gboolean create_elements (GstSplitMuxSink * splitmux);
+static gboolean create_muxer (GstSplitMuxSink * splitmux);
static gboolean create_sink (GstSplitMuxSink * splitmux);
static void gst_splitmux_sink_set_property (GObject * object, guint prop_id,
const GValue * value, GParamSpec * pspec);
static void bus_handler (GstBin * bin, GstMessage * msg);
static void set_next_filename (GstSplitMuxSink * splitmux, MqStreamCtx * ctx);
static void start_next_fragment (GstSplitMuxSink * splitmux, MqStreamCtx * ctx);
-static void check_queue_length (GstSplitMuxSink * splitmux, MqStreamCtx * ctx);
static void mq_stream_ctx_unref (MqStreamCtx * ctx);
+static void grow_blocked_queues (GstSplitMuxSink * splitmux);
static void gst_splitmux_sink_ensure_max_files (GstSplitMuxSink * splitmux);
+static GstElement *create_element (GstSplitMuxSink * splitmux,
+ const gchar * factory, const gchar * name, gboolean locked);
static void do_async_done (GstSplitMuxSink * splitmux);
g_slice_free (MqStreamBuf, data);
}
+static SplitMuxOutputCommand *
+out_cmd_buf_new (void)
+{
+ return g_slice_new0 (SplitMuxOutputCommand);
+}
+
+static void
+out_cmd_buf_free (SplitMuxOutputCommand * data)
+{
+ g_slice_free (SplitMuxOutputCommand, data);
+}
+
static void
gst_splitmux_sink_class_init (GstSplitMuxSinkClass * klass)
{
gst_splitmux_sink_init (GstSplitMuxSink * splitmux)
{
g_mutex_init (&splitmux->lock);
- g_cond_init (&splitmux->data_cond);
+ g_cond_init (&splitmux->input_cond);
+ g_cond_init (&splitmux->output_cond);
+ g_queue_init (&splitmux->out_cmd_q);
splitmux->mux_overhead = DEFAULT_MUXER_OVERHEAD;
splitmux->threshold_time = DEFAULT_MAX_SIZE_TIME;
splitmux->max_files = DEFAULT_MAX_FILES;
splitmux->send_keyframe_requests = DEFAULT_SEND_KEYFRAME_REQUESTS;
- splitmux->update_mux_start_time = FALSE;
-
GST_OBJECT_FLAG_SET (splitmux, GST_ELEMENT_FLAG_SINK);
}
static void
gst_splitmux_reset (GstSplitMuxSink * splitmux)
{
- if (splitmux->mq) {
- gst_element_set_locked_state (splitmux->mq, TRUE);
- gst_element_set_state (splitmux->mq, GST_STATE_NULL);
- gst_bin_remove (GST_BIN (splitmux), splitmux->mq);
- }
if (splitmux->muxer) {
gst_element_set_locked_state (splitmux->muxer, TRUE);
gst_element_set_state (splitmux->muxer, GST_STATE_NULL);
gst_bin_remove (GST_BIN (splitmux), splitmux->active_sink);
}
- splitmux->sink = splitmux->active_sink = splitmux->muxer = splitmux->mq =
- NULL;
+ splitmux->sink = splitmux->active_sink = splitmux->muxer = NULL;
}
static void
GstSplitMuxSink *splitmux = GST_SPLITMUX_SINK (object);
/* Calling parent dispose invalidates all child pointers */
- splitmux->sink = splitmux->active_sink = splitmux->muxer = splitmux->mq =
- NULL;
+ splitmux->sink = splitmux->active_sink = splitmux->muxer = NULL;
G_OBJECT_CLASS (parent_class)->dispose (object);
}
gst_splitmux_sink_finalize (GObject * object)
{
GstSplitMuxSink *splitmux = GST_SPLITMUX_SINK (object);
- g_cond_clear (&splitmux->data_cond);
+ g_cond_clear (&splitmux->input_cond);
+ g_cond_clear (&splitmux->output_cond);
g_mutex_clear (&splitmux->lock);
+ g_queue_foreach (&splitmux->out_cmd_q, (GFunc) out_cmd_buf_free, NULL);
+ g_queue_clear (&splitmux->out_cmd_q);
+
if (splitmux->provided_sink)
gst_object_unref (splitmux->provided_sink);
if (splitmux->provided_muxer)
return res;
}
-static GstPad *
-mq_sink_to_src (GstElement * mq, GstPad * sink_pad)
-{
- gchar *tmp, *sinkname, *srcname;
- GstPad *mq_src;
-
- sinkname = gst_pad_get_name (sink_pad);
- tmp = sinkname + 5;
- srcname = g_strdup_printf ("src_%s", tmp);
-
- mq_src = gst_element_get_static_pad (mq, srcname);
-
- g_free (sinkname);
- g_free (srcname);
-
- return mq_src;
-}
-
-static gboolean
-get_pads_from_mq (GstSplitMuxSink * splitmux, GstPad ** sink_pad,
- GstPad ** src_pad)
-{
- GstPad *mq_sink;
- GstPad *mq_src;
-
- /* Request a pad from multiqueue, then connect this one, then
- * discover the corresponding output pad and return both */
- mq_sink = gst_element_get_request_pad (splitmux->mq, "sink_%u");
- if (mq_sink == NULL)
- return FALSE;
-
- mq_src = mq_sink_to_src (splitmux->mq, mq_sink);
- if (mq_src == NULL)
- goto fail;
-
- *sink_pad = mq_sink;
- *src_pad = mq_src;
-
- return TRUE;
-
-fail:
- gst_element_release_request_pad (splitmux->mq, mq_sink);
- return FALSE;
-}
-
static MqStreamCtx *
mq_stream_ctx_new (GstSplitMuxSink * splitmux)
{
static void
mq_stream_ctx_free (MqStreamCtx * ctx)
{
+ if (ctx->q) {
+ g_signal_handler_disconnect (ctx->q, ctx->q_overrun_id);
+ gst_element_set_locked_state (ctx->q, TRUE);
+ gst_element_set_state (ctx->q, GST_STATE_NULL);
+ gst_bin_remove (GST_BIN (ctx->splitmux), ctx->q);
+ gst_object_unref (ctx->q);
+ }
+ gst_object_unref (ctx->sinkpad);
+ gst_object_unref (ctx->srcpad);
g_queue_foreach (&ctx->queued_bufs, (GFunc) mq_stream_buf_free, NULL);
g_queue_clear (&ctx->queued_bufs);
g_free (ctx);
complete_or_wait_on_out (GstSplitMuxSink * splitmux, MqStreamCtx * ctx)
{
do {
+ /* When first starting up, the reference stream has to output
+ * the first buffer to prepare the muxer and sink */
+ gboolean can_output = (ctx->is_reference || splitmux->ready_for_output);
+
+ if (ctx->flushing
+ || splitmux->output_state == SPLITMUX_OUTPUT_STATE_STOPPED)
+ return;
GST_LOG_OBJECT (ctx->srcpad,
"Checking running time %" GST_STIME_FORMAT " against max %"
GST_STIME_FORMAT, GST_STIME_ARGS (ctx->out_running_time),
GST_STIME_ARGS (splitmux->max_out_running_time));
- if (splitmux->max_out_running_time == GST_CLOCK_STIME_NONE ||
- ctx->out_running_time < splitmux->max_out_running_time) {
- splitmux->have_muxed_something = TRUE;
- return;
- }
+ if (can_output) {
+ if (splitmux->max_out_running_time == GST_CLOCK_STIME_NONE ||
+ ctx->out_running_time < splitmux->max_out_running_time) {
+ return;
+ }
- if (ctx->flushing || splitmux->state == SPLITMUX_STATE_STOPPED)
- return;
+ switch (splitmux->output_state) {
+ case SPLITMUX_OUTPUT_STATE_OUTPUT_GOP:
+ /* We only get here if we've finished outputting a GOP and need to know
+ * what to do next */
+ splitmux->output_state = SPLITMUX_OUTPUT_STATE_AWAITING_COMMAND;
+ GST_SPLITMUX_BROADCAST_OUTPUT (splitmux);
+ continue;
+
+ case SPLITMUX_OUTPUT_STATE_ENDING_FILE:
+ /* We've reached the max out running_time to get here, so end this file now */
+ if (ctx->out_eos == FALSE) {
+ send_eos (splitmux, ctx);
+ continue;
+ }
+ break;
+ case SPLITMUX_OUTPUT_STATE_START_NEXT_FILE:
+ if (ctx->is_reference) {
+ /* Special handling on the reference ctx to start new fragments
+ * and collect commands from the command queue */
+ /* drops the splitmux lock briefly: */
+ start_next_fragment (splitmux, ctx);
+ continue;
+ }
+ break;
- if (splitmux->state == SPLITMUX_STATE_ENDING_FILE) {
- if (ctx->out_eos == FALSE) {
- send_eos (splitmux, ctx);
- continue;
+ case SPLITMUX_OUTPUT_STATE_AWAITING_COMMAND:{
+ do {
+ SplitMuxOutputCommand *cmd =
+ g_queue_pop_tail (&splitmux->out_cmd_q);
+ if (cmd != NULL) {
+ /* If we pop the last command, we need to make our queues bigger */
+ if (g_queue_get_length (&splitmux->out_cmd_q) == 0)
+ grow_blocked_queues (splitmux);
+
+ if (cmd->start_new_fragment) {
+ GST_DEBUG_OBJECT (splitmux, "Got cmd to start new fragment");
+ splitmux->output_state = SPLITMUX_OUTPUT_STATE_ENDING_FILE;
+ } else {
+ GST_DEBUG_OBJECT (splitmux,
+ "Got new output cmd for time %" GST_STIME_FORMAT,
+ GST_STIME_ARGS (cmd->max_output_ts));
+
+ /* Extend the output range immediately */
+ splitmux->max_out_running_time = cmd->max_output_ts;
+ splitmux->output_state = SPLITMUX_OUTPUT_STATE_OUTPUT_GOP;
+ }
+ GST_SPLITMUX_BROADCAST_OUTPUT (splitmux);
+
+ out_cmd_buf_free (cmd);
+ break;
+ } else {
+ GST_SPLITMUX_WAIT_OUTPUT (splitmux);
+ }
+ } while (splitmux->output_state ==
+ SPLITMUX_OUTPUT_STATE_AWAITING_COMMAND);
+ /* loop and re-check the state */
+ continue;
+ }
+ case SPLITMUX_OUTPUT_STATE_STOPPED:
+ return;
}
- } else if (ctx->is_reference
- && splitmux->state == SPLITMUX_STATE_START_NEXT_FRAGMENT) {
- start_next_fragment (splitmux, ctx);
- continue;
}
GST_INFO_OBJECT (ctx->srcpad,
"Sleeping for running time %"
- GST_STIME_FORMAT " (max %" GST_STIME_FORMAT ")",
+ GST_STIME_FORMAT " (max %" GST_STIME_FORMAT ") or state change.",
GST_STIME_ARGS (ctx->out_running_time),
GST_STIME_ARGS (splitmux->max_out_running_time));
- ctx->out_blocked = TRUE;
- /* Expand the mq if needed before sleeping */
- check_queue_length (splitmux, ctx);
- GST_SPLITMUX_WAIT (splitmux);
- ctx->out_blocked = FALSE;
+ GST_SPLITMUX_WAIT_OUTPUT (splitmux);
GST_INFO_OBJECT (ctx->srcpad,
"Woken for new max running time %" GST_STIME_FORMAT,
GST_STIME_ARGS (splitmux->max_out_running_time));
- } while (1);
+ }
+ while (1);
}
static gboolean
{
GstEvent *ev;
- if (splitmux->send_keyframe_requests == FALSE || splitmux->threshold_time == 0
- || splitmux->threshold_bytes != 0)
+ if (splitmux->send_keyframe_requests == FALSE
+ || splitmux->threshold_time == 0 || splitmux->threshold_bytes != 0)
return TRUE;
- ev = gst_video_event_new_upstream_force_key_unit (splitmux->mux_start_time +
- splitmux->threshold_time, TRUE, 0);
+ ev = gst_video_event_new_upstream_force_key_unit
+ (splitmux->fragment_start_time + splitmux->threshold_time, TRUE, 0);
GST_DEBUG_OBJECT (splitmux, "Requesting next keyframe at %" GST_TIME_FORMAT,
- GST_TIME_ARGS (splitmux->mux_start_time + splitmux->threshold_time));
+ GST_TIME_ARGS (splitmux->fragment_start_time + splitmux->threshold_time));
return gst_pad_push_event (splitmux->reference_ctx->sinkpad, ev);
}
}
if (info->type & GST_PAD_PROBE_TYPE_EVENT_DOWNSTREAM) {
GstEvent *event = gst_pad_probe_info_get_event (info);
+ gboolean locked = FALSE;
GST_LOG_OBJECT (pad, "Event %" GST_PTR_FORMAT, event);
break;
case GST_EVENT_FLUSH_STOP:
GST_SPLITMUX_LOCK (splitmux);
+ locked = TRUE;
gst_segment_init (&ctx->out_segment, GST_FORMAT_UNDEFINED);
g_queue_foreach (&ctx->queued_bufs, (GFunc) mq_stream_buf_free, NULL);
g_queue_clear (&ctx->queued_bufs);
ctx->flushing = FALSE;
- GST_SPLITMUX_UNLOCK (splitmux);
break;
case GST_EVENT_FLUSH_START:
GST_SPLITMUX_LOCK (splitmux);
+ locked = TRUE;
GST_LOG_OBJECT (pad, "Flush start");
ctx->flushing = TRUE;
- GST_SPLITMUX_BROADCAST (splitmux);
- GST_SPLITMUX_UNLOCK (splitmux);
+ GST_SPLITMUX_BROADCAST_OUTPUT (splitmux);
break;
case GST_EVENT_EOS:
GST_SPLITMUX_LOCK (splitmux);
- if (splitmux->state == SPLITMUX_STATE_STOPPED)
+ locked = TRUE;
+ if (splitmux->output_state == SPLITMUX_OUTPUT_STATE_STOPPED)
goto beach;
ctx->out_eos = TRUE;
- GST_SPLITMUX_UNLOCK (splitmux);
break;
case GST_EVENT_GAP:{
GstClockTime gap_ts;
break;
GST_SPLITMUX_LOCK (splitmux);
+ locked = TRUE;
- rtime = my_segment_to_running_time (&ctx->out_segment, gap_ts);
-
- GST_LOG_OBJECT (pad, "Have GAP w/ ts %" GST_STIME_FORMAT,
- GST_STIME_ARGS (rtime));
-
- if (splitmux->state == SPLITMUX_STATE_STOPPED)
+ if (splitmux->output_state == SPLITMUX_OUTPUT_STATE_STOPPED)
goto beach;
/* When we get a gap event on the
* the buffer afterwards
*/
if (ctx->is_reference &&
- (splitmux->opening_first_fragment ||
- splitmux->state == SPLITMUX_STATE_START_NEXT_FRAGMENT)) {
+ (splitmux->output_state != SPLITMUX_OUTPUT_STATE_OUTPUT_GOP)) {
GST_DEBUG_OBJECT (pad, "Storing GAP event until buffer arrives");
gst_event_replace (&ctx->pending_gap, event);
GST_SPLITMUX_UNLOCK (splitmux);
return GST_PAD_PROBE_HANDLED;
}
+ rtime = my_segment_to_running_time (&ctx->out_segment, gap_ts);
+
+ GST_LOG_OBJECT (pad, "Have GAP w/ ts %" GST_STIME_FORMAT,
+ GST_STIME_ARGS (rtime));
+
if (rtime != GST_CLOCK_STIME_NONE) {
ctx->out_running_time = rtime;
complete_or_wait_on_out (splitmux, ctx);
}
- GST_SPLITMUX_UNLOCK (splitmux);
break;
}
case GST_EVENT_CUSTOM_DOWNSTREAM:{
gst_structure_get_int64 (s, "timestamp", &ts);
GST_SPLITMUX_LOCK (splitmux);
+ locked = TRUE;
- if (splitmux->state == SPLITMUX_STATE_STOPPED)
+ if (splitmux->output_state == SPLITMUX_OUTPUT_STATE_STOPPED)
goto beach;
ctx->out_running_time = ts;
if (!ctx->is_reference)
default:
break;
}
+
+ /* We need to make sure events aren't passed
+ * until the muxer / sink are ready for it */
+ if (!locked)
+ GST_SPLITMUX_LOCK (splitmux);
+ if (!ctx->is_reference)
+ complete_or_wait_on_out (splitmux, ctx);
+ GST_SPLITMUX_UNLOCK (splitmux);
+
return GST_PAD_PROBE_PASS;
}
goto beach;
/* If we have popped a keyframe, decrement the queued_gop count */
- if (buf_info->keyframe && splitmux->queued_gops > 0)
- splitmux->queued_gops--;
+ if (buf_info->keyframe && splitmux->queued_keyframes > 0)
+ splitmux->queued_keyframes--;
ctx->out_running_time = buf_info->run_ts;
ctx->cur_buffer = gst_pad_probe_info_get_buffer (info);
" size %" G_GUINT64_FORMAT,
pad, GST_STIME_ARGS (ctx->out_running_time), buf_info->buf_size);
- if (ctx->is_reference && splitmux->opening_first_fragment) {
- if (request_next_keyframe (splitmux) == FALSE)
- GST_WARNING_OBJECT (splitmux,
- "Could not request a keyframe. Files may not split at the exact location they should");
- start_next_fragment (splitmux, ctx);
- splitmux->opening_first_fragment = FALSE;
- }
-
complete_or_wait_on_out (splitmux, ctx);
- if (splitmux->update_mux_start_time && ctx->is_reference) {
- splitmux->mux_start_time = buf_info->run_ts;
- splitmux->update_mux_start_time = FALSE;
- if (request_next_keyframe (splitmux) == FALSE)
- GST_WARNING_OBJECT (splitmux,
- "Could not request a keyframe. Files may not split at the exact location they should");
- }
-
if (splitmux->muxed_out_time == GST_CLOCK_STIME_NONE ||
splitmux->muxed_out_time < buf_info->run_ts)
splitmux->muxed_out_time = buf_info->run_ts;
static void
start_next_fragment (GstSplitMuxSink * splitmux, MqStreamCtx * ctx)
{
+ GstElement *muxer, *sink;
+
/* 1 change to new file */
splitmux->switching_fragment = TRUE;
- gst_element_set_locked_state (splitmux->muxer, TRUE);
- gst_element_set_locked_state (splitmux->active_sink, TRUE);
- gst_element_set_state (splitmux->muxer, GST_STATE_NULL);
- gst_element_set_state (splitmux->active_sink, GST_STATE_NULL);
+ /* We need to drop the splitmux lock to acquire the state lock
+ * here and ensure there's no racy state change going on elsewhere */
+ muxer = gst_object_ref (splitmux->muxer);
+ sink = gst_object_ref (splitmux->active_sink);
+
+ GST_SPLITMUX_UNLOCK (splitmux);
+ GST_STATE_LOCK (splitmux);
+ gst_element_set_locked_state (muxer, TRUE);
+ gst_element_set_locked_state (sink, TRUE);
+ gst_element_set_state (muxer, GST_STATE_NULL);
+ gst_element_set_state (sink, GST_STATE_NULL);
+
+ GST_SPLITMUX_LOCK (splitmux);
set_next_filename (splitmux, ctx);
+ GST_SPLITMUX_UNLOCK (splitmux);
- gst_element_set_state (splitmux->active_sink, GST_STATE_TARGET (splitmux));
- gst_element_set_state (splitmux->muxer, GST_STATE_TARGET (splitmux));
- gst_element_set_locked_state (splitmux->muxer, FALSE);
- gst_element_set_locked_state (splitmux->active_sink, FALSE);
+ gst_element_set_state (sink, GST_STATE_TARGET (splitmux));
+ gst_element_set_state (muxer, GST_STATE_TARGET (splitmux));
+ gst_element_set_locked_state (muxer, FALSE);
+ gst_element_set_locked_state (sink, FALSE);
+ gst_object_unref (sink);
+ gst_object_unref (muxer);
+
+ GST_SPLITMUX_LOCK (splitmux);
+ GST_STATE_UNLOCK (splitmux);
splitmux->switching_fragment = FALSE;
do_async_done (splitmux);
- g_list_foreach (splitmux->contexts, (GFunc) restart_context, splitmux);
-
- if (!splitmux->opening_first_fragment) {
- /* Switch state and go back to processing */
- if (!splitmux->reference_ctx->in_eos) {
- splitmux->state = SPLITMUX_STATE_COLLECTING_GOP_START;
- splitmux->max_out_running_time = splitmux->reference_ctx->in_running_time;
- } else {
- splitmux->state = SPLITMUX_STATE_WAITING_GOP_COMPLETE;
- splitmux->max_out_running_time = GST_CLOCK_STIME_NONE;
- splitmux->have_muxed_something = FALSE;
- }
- splitmux->have_muxed_something =
- (splitmux->reference_ctx->in_running_time > splitmux->muxed_out_time);
-
- /* Store the overflow parameters as the basis for the next fragment */
- splitmux->mux_start_bytes = splitmux->muxed_out_bytes;
+ splitmux->ready_for_output = TRUE;
- GST_DEBUG_OBJECT (splitmux,
- "Restarting flow for new fragment. New running time %" GST_STIME_FORMAT,
- GST_STIME_ARGS (splitmux->max_out_running_time));
- }
+ g_list_foreach (splitmux->contexts, (GFunc) restart_context, splitmux);
send_fragment_opened_closed_msg (splitmux, TRUE);
- GST_SPLITMUX_BROADCAST (splitmux);
+ /* FIXME: Is this always the correct next state? */
+ splitmux->output_state = SPLITMUX_OUTPUT_STATE_AWAITING_COMMAND;
+ GST_SPLITMUX_BROADCAST_OUTPUT (splitmux);
}
static void
send_fragment_opened_closed_msg (splitmux, FALSE);
- if (splitmux->state == SPLITMUX_STATE_ENDING_FILE &&
- splitmux->max_out_running_time != GST_CLOCK_STIME_NONE) {
+ if (splitmux->output_state == SPLITMUX_OUTPUT_STATE_ENDING_FILE) {
GST_DEBUG_OBJECT (splitmux, "Caught EOS at end of fragment, dropping");
- splitmux->state = SPLITMUX_STATE_START_NEXT_FRAGMENT;
- GST_SPLITMUX_BROADCAST (splitmux);
+ splitmux->output_state = SPLITMUX_OUTPUT_STATE_START_NEXT_FILE;
+ GST_SPLITMUX_BROADCAST_OUTPUT (splitmux);
gst_message_unref (message);
GST_SPLITMUX_UNLOCK (splitmux);
return;
+ } else {
+ GST_DEBUG_OBJECT (splitmux,
+ "Passing EOS message. Output state %d max_out_running_time %"
+ GST_STIME_FORMAT, splitmux->output_state,
+ GST_STIME_ARGS (splitmux->max_out_running_time));
}
GST_SPLITMUX_UNLOCK (splitmux);
break;
case GST_MESSAGE_ASYNC_DONE:
/* Ignore state changes from our children while switching */
if (splitmux->switching_fragment) {
- if (GST_MESSAGE_SRC (message) == (GstObject *) splitmux->active_sink ||
- GST_MESSAGE_SRC (message) == (GstObject *) splitmux->muxer) {
+ if (GST_MESSAGE_SRC (message) == (GstObject *) splitmux->active_sink
+ || GST_MESSAGE_SRC (message) == (GstObject *) splitmux->muxer) {
GST_LOG_OBJECT (splitmux,
"Ignoring state change from child %" GST_PTR_FORMAT
" while switching", GST_MESSAGE_SRC (message));
GST_BIN_CLASS (parent_class)->handle_message (bin, message);
}
+static void
+ctx_set_unblock (MqStreamCtx * ctx)
+{
+ ctx->need_unblock = TRUE;
+}
+
/* Called with splitmux lock held */
/* Called when entering ProcessingCompleteGop state
* Assess if mq contents overflowed the current file
static void
handle_gathered_gop (GstSplitMuxSink * splitmux)
{
- GList *cur;
- guint64 queued_bytes = 0;
+ guint64 queued_bytes;
GstClockTimeDiff queued_time = 0;
+ GstClockTimeDiff new_out_ts = splitmux->reference_ctx->in_running_time;
+ SplitMuxOutputCommand *cmd;
/* Assess if the multiqueue contents overflowed the current file */
- for (cur = g_list_first (splitmux->contexts);
- cur != NULL; cur = g_list_next (cur)) {
- MqStreamCtx *tmpctx = (MqStreamCtx *) (cur->data);
- if (tmpctx->in_running_time > queued_time)
- queued_time = tmpctx->in_running_time;
- queued_bytes += tmpctx->in_bytes;
- }
+ /* When considering if a newly gathered GOP overflows
+ * the time limit for the file, only consider the running time of the
+ * reference stream. Other streams might have run ahead a little bit,
+ * but extra pieces won't be released to the muxer beyond the reference
+ * stream cut-off anyway - so it forms the limit. */
+ queued_bytes = splitmux->fragment_total_bytes + splitmux->gop_total_bytes;
+ queued_time = splitmux->reference_ctx->in_running_time;
+
+ GST_LOG_OBJECT (splitmux, " queued_bytes %" G_GUINT64_FORMAT, queued_bytes);
- GST_LOG_OBJECT (splitmux, " queued_bytes %" G_GUINT64_FORMAT
- " splitmuxsink->mux_start_bytes %" G_GUINT64_FORMAT, queued_bytes,
- splitmux->mux_start_bytes);
- g_assert (queued_bytes >= splitmux->mux_start_bytes);
- g_assert (queued_time >= splitmux->mux_start_time);
+ g_assert (queued_time >= splitmux->fragment_start_time);
- queued_bytes -= splitmux->mux_start_bytes;
- queued_time -= splitmux->mux_start_time;
+ queued_time -= splitmux->fragment_start_time;
/* Expand queued bytes estimate by muxer overhead */
queued_bytes += (queued_bytes * splitmux->mux_overhead);
/* Check for overrun - have we output at least one byte and overrun
* either threshold? */
- if ((splitmux->have_muxed_something &&
+ if ((splitmux->fragment_total_bytes > 0 &&
((splitmux->threshold_bytes > 0 &&
queued_bytes > splitmux->threshold_bytes) ||
(splitmux->threshold_time > 0 &&
queued_time > splitmux->threshold_time)))) {
- splitmux->state = SPLITMUX_STATE_ENDING_FILE;
- splitmux->update_mux_start_time = TRUE;
+ /* Tell the output side to start a new fragment */
GST_INFO_OBJECT (splitmux,
- "mq overflowed since last, draining out. max out TS is %"
- GST_STIME_FORMAT, GST_STIME_ARGS (splitmux->max_out_running_time));
- GST_SPLITMUX_BROADCAST (splitmux);
+ "This GOP (dur %" GST_STIME_FORMAT
+ ") would overflow the fragment, Sending start_new_fragment cmd",
+ GST_STIME_ARGS (splitmux->reference_ctx->in_running_time -
+ splitmux->gop_start_time));
+ cmd = out_cmd_buf_new ();
+ cmd->start_new_fragment = TRUE;
+ g_queue_push_head (&splitmux->out_cmd_q, cmd);
+ GST_SPLITMUX_BROADCAST_OUTPUT (splitmux);
+
+ new_out_ts = splitmux->reference_ctx->in_running_time;
+ splitmux->fragment_start_time = splitmux->gop_start_time;
+ splitmux->fragment_total_bytes = 0;
+
+ if (request_next_keyframe (splitmux) == FALSE) {
+ GST_WARNING_OBJECT (splitmux,
+ "Could not request a keyframe. Files may not split at the exact location they should");
+ }
+ }
+ /* And set up to collect the next GOP */
+ if (!splitmux->reference_ctx->in_eos) {
+ splitmux->input_state = SPLITMUX_INPUT_STATE_COLLECTING_GOP_START;
+ splitmux->gop_start_time = new_out_ts;
} else {
- /* No overflow */
- GST_LOG_OBJECT (splitmux,
- "This GOP didn't overflow the fragment. Bytes sent %" G_GUINT64_FORMAT
- " queued %" G_GUINT64_FORMAT " time %" GST_STIME_FORMAT " Continuing.",
- splitmux->muxed_out_bytes - splitmux->mux_start_bytes,
- queued_bytes, GST_STIME_ARGS (queued_time));
+ /* This is probably already the current state, but just in case: */
+ splitmux->input_state = SPLITMUX_INPUT_STATE_FINISHING_UP;
+ new_out_ts = GST_CLOCK_STIME_NONE; /* EOS runs until forever */
+ }
- /* Wake everyone up to push this one GOP, then sleep */
- splitmux->have_muxed_something = TRUE;
+ /* And wake all input contexts to send a wake-up event */
+ g_list_foreach (splitmux->contexts, (GFunc) ctx_set_unblock, NULL);
+ GST_SPLITMUX_BROADCAST_INPUT (splitmux);
- if (!splitmux->reference_ctx->in_eos) {
- splitmux->state = SPLITMUX_STATE_COLLECTING_GOP_START;
- splitmux->max_out_running_time = splitmux->reference_ctx->in_running_time;
- } else {
- splitmux->state = SPLITMUX_STATE_WAITING_GOP_COMPLETE;
- splitmux->max_out_running_time = GST_CLOCK_STIME_NONE;
- }
+ /* Now either way - either there was no overflow, or we requested a new fragment: release this GOP */
+ splitmux->fragment_total_bytes += splitmux->gop_total_bytes;
- GST_LOG_OBJECT (splitmux, "Waking output for complete GOP, TS %"
- GST_STIME_FORMAT, GST_STIME_ARGS (splitmux->max_out_running_time));
- GST_SPLITMUX_BROADCAST (splitmux);
+ if (splitmux->gop_total_bytes > 0) {
+ GST_LOG_OBJECT (splitmux,
+ "Releasing GOP to output. Bytes in fragment now %" G_GUINT64_FORMAT
+ " time %" GST_STIME_FORMAT,
+ splitmux->fragment_total_bytes, GST_STIME_ARGS (queued_time));
+
+ /* Send this GOP to the output command queue */
+ cmd = out_cmd_buf_new ();
+ cmd->start_new_fragment = FALSE;
+ cmd->max_output_ts = new_out_ts;
+ GST_LOG_OBJECT (splitmux, "Sending GOP cmd to output for TS %"
+ GST_STIME_FORMAT, GST_STIME_ARGS (new_out_ts));
+ g_queue_push_head (&splitmux->out_cmd_q, cmd);
+
+ GST_SPLITMUX_BROADCAST_OUTPUT (splitmux);
}
+ splitmux->gop_total_bytes = 0;
}
/* Called with splitmux lock held */
check_completed_gop (GstSplitMuxSink * splitmux, MqStreamCtx * ctx)
{
GList *cur;
- gboolean ready = TRUE;
- GstClockTimeDiff current_max_in_running_time;
+ GstEvent *event;
+
+ /* On ENDING_FILE, the reference stream sends a command to start a new
+ * fragment, then releases the GOP for output in the new fragment.
+ * If somes streams received no buffer during the last GOP that overran,
+ * because its next buffer has a timestamp bigger than
+ * ctx->max_in_running_time, its queue is empty. In that case the only
+ * way to wakeup the output thread is by injecting an event in the
+ * queue. This usually happen with subtitle streams.
+ * See https://bugzilla.gnome.org/show_bug.cgi?id=763711. */
+ if (ctx->need_unblock) {
+ GST_LOG_OBJECT (ctx->sinkpad, "Sending splitmuxsink-unblock event");
+ event = gst_event_new_custom (GST_EVENT_CUSTOM_DOWNSTREAM |
+ GST_EVENT_TYPE_SERIALIZED,
+ gst_structure_new ("splitmuxsink-unblock", "timestamp",
+ G_TYPE_INT64, splitmux->max_in_running_time, NULL));
+
+ GST_SPLITMUX_UNLOCK (splitmux);
+ gst_pad_send_event (ctx->sinkpad, event);
+ GST_SPLITMUX_LOCK (splitmux);
+
+ ctx->need_unblock = FALSE;
+ GST_SPLITMUX_BROADCAST_INPUT (splitmux);
+ /* state may have changed while we were unlocked. Loop again if so */
+ if (splitmux->input_state != SPLITMUX_INPUT_STATE_WAITING_GOP_COLLECT)
+ return;
+ }
+
+ if (splitmux->input_state == SPLITMUX_INPUT_STATE_WAITING_GOP_COLLECT) {
+ gboolean ready = TRUE;
- if (splitmux->state == SPLITMUX_STATE_WAITING_GOP_COMPLETE) {
/* Iterate each pad, and check that the input running time is at least
* up to the reference running time, and if so handle the collected GOP */
GST_LOG_OBJECT (splitmux, "Checking GOP collected, Max in running time %"
" EOS %d", tmpctx, tmpctx->srcpad,
GST_STIME_ARGS (tmpctx->in_running_time), tmpctx->in_eos);
- if (splitmux->max_in_running_time != G_MAXINT64 &&
+ if (splitmux->max_in_running_time != GST_CLOCK_STIME_NONE &&
tmpctx->in_running_time < splitmux->max_in_running_time &&
!tmpctx->in_eos) {
GST_LOG_OBJECT (splitmux,
/* Some pad is not yet ready, or GOP is being pushed
* either way, sleep and wait to get woken */
- current_max_in_running_time = splitmux->max_in_running_time;
- while ((splitmux->state == SPLITMUX_STATE_WAITING_GOP_COMPLETE ||
- splitmux->state == SPLITMUX_STATE_START_NEXT_FRAGMENT) &&
+ while (splitmux->input_state == SPLITMUX_INPUT_STATE_WAITING_GOP_COLLECT &&
!ctx->flushing &&
- (current_max_in_running_time == splitmux->max_in_running_time)) {
-
- GST_LOG_OBJECT (splitmux, "Sleeping for %s (ctx %p)",
- splitmux->state == SPLITMUX_STATE_WAITING_GOP_COMPLETE ?
- "GOP complete" : "EOF draining", ctx);
- GST_SPLITMUX_WAIT (splitmux);
+ (ctx->in_running_time >= splitmux->max_in_running_time) &&
+ (splitmux->max_in_running_time != GST_CLOCK_STIME_NONE)) {
+ GST_LOG_OBJECT (splitmux, "Sleeping for GOP collection (ctx %p)", ctx);
+ GST_SPLITMUX_WAIT_INPUT (splitmux);
GST_LOG_OBJECT (splitmux, "Done waiting for complete GOP (ctx %p)", ctx);
}
}
-static void
-check_queue_length (GstSplitMuxSink * splitmux, MqStreamCtx * ctx)
-{
- GList *cur;
- guint cur_len = g_queue_get_length (&ctx->queued_bufs);
-
- GST_DEBUG_OBJECT (ctx->sinkpad,
- "Checking queue length len %u cur_max %u queued gops %u",
- cur_len, splitmux->mq_max_buffers, splitmux->queued_gops);
-
- if (cur_len >= splitmux->mq_max_buffers) {
- gboolean allow_grow = FALSE;
-
- /* If collecting a GOP and this pad might block,
- * and there isn't already a pending GOP in the queue
- * then grow
- */
- if (splitmux->state == SPLITMUX_STATE_WAITING_GOP_COMPLETE &&
- ctx->in_running_time < splitmux->max_in_running_time &&
- splitmux->queued_gops <= 1) {
- allow_grow = TRUE;
- } else if (splitmux->state == SPLITMUX_STATE_COLLECTING_GOP_START &&
- ctx->is_reference && splitmux->queued_gops <= 1) {
- allow_grow = TRUE;
- }
-
- if (!allow_grow) {
- for (cur = g_list_first (splitmux->contexts);
- cur != NULL; cur = g_list_next (cur)) {
- MqStreamCtx *tmpctx = (MqStreamCtx *) (cur->data);
- GST_DEBUG_OBJECT (tmpctx->sinkpad,
- " len %u out_blocked %d",
- g_queue_get_length (&tmpctx->queued_bufs), tmpctx->out_blocked);
- /* If another stream is starving, grow */
- if (tmpctx != ctx && g_queue_get_length (&tmpctx->queued_bufs) < 1) {
- allow_grow = TRUE;
- }
- }
- }
-
- if (allow_grow) {
- splitmux->mq_max_buffers = cur_len + 1;
-
- GST_INFO_OBJECT (splitmux,
- "Multiqueue overrun - enlarging to %u buffers ctx %p",
- splitmux->mq_max_buffers, ctx);
-
- g_object_set (splitmux->mq, "max-size-buffers",
- splitmux->mq_max_buffers, NULL);
- }
- }
-}
-
static GstPadProbeReturn
handle_mq_input (GstPad * pad, GstPadProbeInfo * info, MqStreamCtx * ctx)
{
}
if (info->type & GST_PAD_PROBE_TYPE_EVENT_DOWNSTREAM) {
GstEvent *event = gst_pad_probe_info_get_event (info);
+
+ GST_LOG_OBJECT (pad, "Event %" GST_PTR_FORMAT, event);
+
switch (GST_EVENT_TYPE (event)) {
case GST_EVENT_SEGMENT:
gst_event_copy_segment (event, &ctx->in_segment);
GST_SPLITMUX_LOCK (splitmux);
gst_segment_init (&ctx->in_segment, GST_FORMAT_UNDEFINED);
ctx->in_eos = FALSE;
- ctx->in_bytes = 0;
ctx->in_running_time = GST_CLOCK_STIME_NONE;
GST_SPLITMUX_UNLOCK (splitmux);
break;
GST_SPLITMUX_LOCK (splitmux);
ctx->in_eos = TRUE;
- if (splitmux->state == SPLITMUX_STATE_STOPPED)
+ if (splitmux->input_state == SPLITMUX_INPUT_STATE_STOPPED)
goto beach;
if (ctx->is_reference) {
GST_INFO_OBJECT (splitmux, "Got Reference EOS. Finishing up");
- /* Act as if this is a new keyframe with infinite timestamp */
- splitmux->max_in_running_time = G_MAXINT64;
- splitmux->state = SPLITMUX_STATE_WAITING_GOP_COMPLETE;
+ /* check_completed_gop will act as if this is a new keyframe with infinite timestamp */
+ splitmux->input_state = SPLITMUX_INPUT_STATE_WAITING_GOP_COLLECT;
/* Wake up other input pads to collect this GOP */
- GST_SPLITMUX_BROADCAST (splitmux);
+ GST_SPLITMUX_BROADCAST_INPUT (splitmux);
check_completed_gop (splitmux, ctx);
- } else if (splitmux->state == SPLITMUX_STATE_WAITING_GOP_COMPLETE) {
+ } else if (splitmux->input_state ==
+ SPLITMUX_INPUT_STATE_WAITING_GOP_COLLECT) {
/* If we are waiting for a GOP to be completed (ie, for aux
* pads to catch up), then this pad is complete, so check
* if the whole GOP is.
}
GST_SPLITMUX_UNLOCK (splitmux);
break;
+ case GST_EVENT_GAP:{
+ GstClockTime gap_ts;
+ GstClockTimeDiff rtime;
+
+ gst_event_parse_gap (event, &gap_ts, NULL);
+ if (gap_ts == GST_CLOCK_TIME_NONE)
+ break;
+
+ GST_SPLITMUX_LOCK (splitmux);
+
+ if (splitmux->input_state == SPLITMUX_INPUT_STATE_STOPPED)
+ goto beach;
+ rtime = my_segment_to_running_time (&ctx->in_segment, gap_ts);
+
+ GST_LOG_OBJECT (pad, "Have GAP w/ ts %" GST_STIME_FORMAT,
+ GST_STIME_ARGS (rtime));
+
+ if (ctx->is_reference
+ && splitmux->fragment_start_time == GST_CLOCK_STIME_NONE) {
+ splitmux->gop_start_time = splitmux->fragment_start_time = rtime;
+ GST_LOG_OBJECT (splitmux, "Mux start time now %" GST_STIME_FORMAT,
+ GST_STIME_ARGS (splitmux->fragment_start_time));
+ /* Also take this as the first start time when starting up,
+ * so that we start counting overflow from the first frame */
+ if (!GST_CLOCK_STIME_IS_VALID (splitmux->max_in_running_time))
+ splitmux->max_in_running_time = splitmux->fragment_start_time;
+ }
+
+ GST_SPLITMUX_UNLOCK (splitmux);
+ break;
+ }
default:
break;
}
GST_SPLITMUX_LOCK (splitmux);
- if (splitmux->state == SPLITMUX_STATE_STOPPED)
+ if (splitmux->input_state == SPLITMUX_INPUT_STATE_STOPPED)
goto beach;
/* If this buffer has a timestamp, advance the input timestamp of the
buf_info->buf_size = gst_buffer_get_size (buf);
buf_info->duration = GST_BUFFER_DURATION (buf);
- /* Update total input byte counter for overflow detect */
- ctx->in_bytes += buf_info->buf_size;
-
- /* initialize mux_start_time */
- if (ctx->is_reference && splitmux->mux_start_time == GST_CLOCK_STIME_NONE) {
- splitmux->mux_start_time = buf_info->run_ts;
+ /* initialize fragment_start_time */
+ if (ctx->is_reference
+ && splitmux->fragment_start_time == GST_CLOCK_STIME_NONE) {
+ splitmux->gop_start_time = splitmux->fragment_start_time = buf_info->run_ts;
GST_LOG_OBJECT (splitmux, "Mux start time now %" GST_STIME_FORMAT,
- GST_STIME_ARGS (splitmux->mux_start_time));
+ GST_STIME_ARGS (splitmux->fragment_start_time));
/* Also take this as the first start time when starting up,
* so that we start counting overflow from the first frame */
if (!GST_CLOCK_STIME_IS_VALID (splitmux->max_in_running_time))
- splitmux->max_in_running_time = splitmux->mux_start_time;
+ splitmux->max_in_running_time = splitmux->fragment_start_time;
+ if (request_next_keyframe (splitmux) == FALSE) {
+ GST_WARNING_OBJECT (splitmux,
+ "Could not request a keyframe. Files may not split at the exact location they should");
+ }
}
GST_DEBUG_OBJECT (pad, "Buf TS %" GST_STIME_FORMAT
- " total in_bytes %" G_GUINT64_FORMAT,
- GST_STIME_ARGS (buf_info->run_ts), ctx->in_bytes);
+ " total GOP bytes %" G_GUINT64_FORMAT,
+ GST_STIME_ARGS (buf_info->run_ts), splitmux->gop_total_bytes);
loop_again = TRUE;
do {
if (ctx->flushing)
break;
- switch (splitmux->state) {
- case SPLITMUX_STATE_COLLECTING_GOP_START:
+ switch (splitmux->input_state) {
+ case SPLITMUX_INPUT_STATE_COLLECTING_GOP_START:
if (ctx->is_reference) {
/* If a keyframe, we have a complete GOP */
if (GST_BUFFER_FLAG_IS_SET (buf, GST_BUFFER_FLAG_DELTA_UNIT) ||
splitmux->max_in_running_time >= ctx->in_running_time) {
/* Pass this buffer through */
loop_again = FALSE;
+ /* Allow other input pads to catch up to here too */
+ splitmux->max_in_running_time = ctx->in_running_time;
+ GST_SPLITMUX_BROADCAST_INPUT (splitmux);
break;
}
GST_INFO_OBJECT (pad,
"Have keyframe with running time %" GST_STIME_FORMAT,
GST_STIME_ARGS (ctx->in_running_time));
keyframe = TRUE;
- splitmux->state = SPLITMUX_STATE_WAITING_GOP_COMPLETE;
+ splitmux->input_state = SPLITMUX_INPUT_STATE_WAITING_GOP_COLLECT;
splitmux->max_in_running_time = ctx->in_running_time;
/* Wake up other input pads to collect this GOP */
- GST_SPLITMUX_BROADCAST (splitmux);
+ GST_SPLITMUX_BROADCAST_INPUT (splitmux);
check_completed_gop (splitmux, ctx);
} else {
+ /* Pass this buffer if the reference ctx is far enough ahead */
+ if (ctx->in_running_time < splitmux->max_in_running_time) {
+ loop_again = FALSE;
+ break;
+ }
+
/* We're still waiting for a keyframe on the reference pad, sleep */
GST_LOG_OBJECT (pad, "Sleeping for GOP start");
- GST_SPLITMUX_WAIT (splitmux);
- GST_LOG_OBJECT (pad, "Done sleeping for GOP start state now %d",
- splitmux->state);
+ GST_SPLITMUX_WAIT_INPUT (splitmux);
+ GST_LOG_OBJECT (pad,
+ "Done sleeping for GOP start input state now %d",
+ splitmux->input_state);
}
break;
- case SPLITMUX_STATE_WAITING_GOP_COMPLETE:
+ case SPLITMUX_INPUT_STATE_WAITING_GOP_COLLECT:{
+ /* We're collecting a GOP. If this is the reference context,
+ * we need to check if this is a keyframe that marks the start
+ * of the next GOP. If it is, it marks the end of the GOP we're
+ * collecting, so sleep and wait until all the other pads also
+ * reach that timestamp - at which point, we have an entire GOP
+ * and either go to ENDING_FILE or release this GOP to the muxer and
+ * go back to COLLECT_GOP_START. */
/* If we overran the target timestamp, it might be time to process
* the GOP, otherwise bail out for more data
*/
GST_LOG_OBJECT (pad,
- "Checking TS %" GST_STIME_FORMAT " against max %" GST_STIME_FORMAT,
- GST_STIME_ARGS (ctx->in_running_time),
+ "Checking TS %" GST_STIME_FORMAT " against max %"
+ GST_STIME_FORMAT, GST_STIME_ARGS (ctx->in_running_time),
GST_STIME_ARGS (splitmux->max_in_running_time));
if (ctx->in_running_time < splitmux->max_in_running_time) {
"Collected last packet of GOP. Checking other pads");
check_completed_gop (splitmux, ctx);
break;
- case SPLITMUX_STATE_ENDING_FILE:{
- GstEvent *event;
-
- /* If somes streams received no buffer during the last GOP that overran,
- * because its next buffer has a timestamp bigger than
- * ctx->max_in_running_time, its queue is empty. In that case the only
- * way to wakeup the output thread is by injecting an event in the
- * queue. This usually happen with subtitle streams.
- * See https://bugzilla.gnome.org/show_bug.cgi?id=763711. */
- GST_LOG_OBJECT (pad, "Sending splitmuxsink-unblock event");
- event = gst_event_new_custom (GST_EVENT_CUSTOM_DOWNSTREAM |
- GST_EVENT_TYPE_SERIALIZED,
- gst_structure_new ("splitmuxsink-unblock", "timestamp",
- G_TYPE_INT64, splitmux->max_in_running_time, NULL));
-
- GST_SPLITMUX_UNLOCK (splitmux);
- gst_pad_send_event (ctx->sinkpad, event);
- GST_SPLITMUX_LOCK (splitmux);
- /* state may have changed while we were unlocked. Loop again if so */
- if (splitmux->state != SPLITMUX_STATE_ENDING_FILE)
- break;
- /* fallthrough */
}
- case SPLITMUX_STATE_START_NEXT_FRAGMENT:
- /* A fragment is ending, wait until that's done before continuing */
- GST_DEBUG_OBJECT (pad, "Sleeping for fragment restart");
- GST_SPLITMUX_WAIT (splitmux);
- GST_DEBUG_OBJECT (pad,
- "Done sleeping for fragment restart state now %d", splitmux->state);
+ case SPLITMUX_INPUT_STATE_FINISHING_UP:
+ loop_again = FALSE;
break;
default:
loop_again = FALSE;
break;
}
- } while (loop_again);
+ }
+ while (loop_again);
if (keyframe) {
- splitmux->queued_gops++;
+ splitmux->queued_keyframes++;
buf_info->keyframe = TRUE;
}
+ /* Update total input byte counter for overflow detect */
+ splitmux->gop_total_bytes += buf_info->buf_size;
+
/* Now add this buffer to the queue just before returning */
g_queue_push_head (&ctx->queued_bufs, buf_info);
- /* Check the buffer will fit in the mq */
- check_queue_length (splitmux, ctx);
-
GST_LOG_OBJECT (pad, "Returning to queue buffer %" GST_PTR_FORMAT
" run ts %" GST_STIME_FORMAT, buf, GST_STIME_ARGS (ctx->in_running_time));
return GST_PAD_PROBE_PASS;
}
+static void
+grow_blocked_queues (GstSplitMuxSink * splitmux)
+{
+ GList *cur;
+
+ /* Scan other queues for full-ness and grow them */
+ for (cur = g_list_first (splitmux->contexts);
+ cur != NULL; cur = g_list_next (cur)) {
+ MqStreamCtx *tmpctx = (MqStreamCtx *) (cur->data);
+ guint cur_limit;
+ guint cur_len = g_queue_get_length (&tmpctx->queued_bufs);
+
+ g_object_get (tmpctx->q, "max-size-buffers", &cur_limit, NULL);
+ GST_LOG_OBJECT (tmpctx->q, "Queue len %u", cur_len);
+
+ if (cur_len >= cur_limit) {
+ cur_limit = cur_len + 1;
+ GST_DEBUG_OBJECT (tmpctx->q,
+ "Queue overflowed and needs enlarging. Growing to %u buffers",
+ cur_limit);
+ g_object_set (tmpctx->q, "max-size-buffers", cur_limit, NULL);
+ }
+ }
+}
+
+static void
+handle_q_underrun (GstElement * q, gpointer user_data)
+{
+ MqStreamCtx *ctx = (MqStreamCtx *) (user_data);
+ GstSplitMuxSink *splitmux = ctx->splitmux;
+
+ GST_SPLITMUX_LOCK (splitmux);
+ GST_DEBUG_OBJECT (q,
+ "Queue reported underrun with %d keyframes and %d cmds enqueued",
+ splitmux->queued_keyframes, g_queue_get_length (&splitmux->out_cmd_q));
+ grow_blocked_queues (splitmux);
+ GST_SPLITMUX_UNLOCK (splitmux);
+}
+
+static void
+handle_q_overrun (GstElement * q, gpointer user_data)
+{
+ MqStreamCtx *ctx = (MqStreamCtx *) (user_data);
+ GstSplitMuxSink *splitmux = ctx->splitmux;
+ gboolean allow_grow = FALSE;
+
+ GST_SPLITMUX_LOCK (splitmux);
+ GST_DEBUG_OBJECT (q,
+ "Queue reported overrun with %d keyframes and %d cmds enqueued",
+ splitmux->queued_keyframes, g_queue_get_length (&splitmux->out_cmd_q));
+
+ if (splitmux->queued_keyframes < 2) {
+ /* Less than a full GOP queued, grow the queue */
+ allow_grow = TRUE;
+ } else if (g_queue_get_length (&splitmux->out_cmd_q) < 1) {
+ allow_grow = TRUE;
+ } else {
+ /* If another queue is starved, grow */
+ GList *cur;
+ for (cur = g_list_first (splitmux->contexts);
+ cur != NULL; cur = g_list_next (cur)) {
+ MqStreamCtx *tmpctx = (MqStreamCtx *) (cur->data);
+ if (tmpctx != ctx && g_queue_get_length (&tmpctx->queued_bufs) < 1) {
+ allow_grow = TRUE;
+ }
+ }
+ }
+ GST_SPLITMUX_UNLOCK (splitmux);
+
+ if (allow_grow) {
+ guint cur_limit;
+
+ g_object_get (q, "max-size-buffers", &cur_limit, NULL);
+ cur_limit++;
+
+ GST_DEBUG_OBJECT (q,
+ "Queue overflowed and needs enlarging. Growing to %u buffers",
+ cur_limit);
+
+ g_object_set (q, "max-size-buffers", cur_limit, NULL);
+ }
+}
+
static GstPad *
gst_splitmux_sink_request_new_pad (GstElement * element,
GstPadTemplate * templ, const gchar * name, const GstCaps * caps)
GstSplitMuxSink *splitmux = (GstSplitMuxSink *) element;
GstPadTemplate *mux_template = NULL;
GstPad *res = NULL;
- GstPad *mq_sink, *mq_src;
+ GstElement *q;
+ GstPad *q_sink = NULL, *q_src = NULL;
gchar *gname;
gboolean is_video = FALSE;
MqStreamCtx *ctx;
GST_DEBUG_OBJECT (element, "templ:%s, name:%s", templ->name_template, name);
GST_SPLITMUX_LOCK (splitmux);
- if (!create_elements (splitmux))
+ if (!create_muxer (splitmux))
goto fail;
if (templ->name_template) {
else
gname = g_strdup (name);
- if (!get_pads_from_mq (splitmux, &mq_sink, &mq_src)) {
- gst_element_release_request_pad (splitmux->muxer, res);
- gst_object_unref (GST_OBJECT (res));
+ if ((q = create_element (splitmux, "queue", NULL, FALSE)) == NULL)
goto fail;
- }
- if (gst_pad_link (mq_src, res) != GST_PAD_LINK_OK) {
+ gst_element_set_state (q, GST_STATE_TARGET (splitmux));
+
+ g_object_set (q, "max-size-bytes", 0, "max-size-time", (guint64) (0),
+ "max-size-buffers", 5, NULL);
+
+ q_sink = gst_element_get_static_pad (q, "sink");
+ q_src = gst_element_get_static_pad (q, "src");
+
+ if (gst_pad_link (q_src, res) != GST_PAD_LINK_OK) {
gst_element_release_request_pad (splitmux->muxer, res);
gst_object_unref (GST_OBJECT (res));
- gst_element_release_request_pad (splitmux->mq, mq_sink);
- gst_object_unref (GST_OBJECT (mq_sink));
goto fail;
}
gst_object_unref (GST_OBJECT (res));
ctx = mq_stream_ctx_new (splitmux);
- ctx->srcpad = mq_src;
- ctx->sinkpad = mq_sink;
+ /* Context holds a ref: */
+ ctx->q = gst_object_ref (q);
+ ctx->srcpad = q_src;
+ ctx->sinkpad = q_sink;
+ ctx->q_overrun_id =
+ g_signal_connect (q, "overrun", (GCallback) handle_q_overrun, ctx);
+ g_signal_connect (q, "underrun", (GCallback) handle_q_underrun, ctx);
mq_stream_ctx_ref (ctx);
ctx->src_pad_block_id =
- gst_pad_add_probe (mq_src, GST_PAD_PROBE_TYPE_DATA_DOWNSTREAM,
+ gst_pad_add_probe (q_src, GST_PAD_PROBE_TYPE_DATA_DOWNSTREAM,
(GstPadProbeCallback) handle_mq_output, ctx, (GDestroyNotify)
_pad_block_destroy_src_notify);
if (is_video && splitmux->reference_ctx != NULL) {
ctx->is_reference = TRUE;
}
- res = gst_ghost_pad_new_from_template (gname, mq_sink, templ);
+ res = gst_ghost_pad_new_from_template (gname, q_sink, templ);
g_object_set_qdata ((GObject *) (res), PAD_CONTEXT, ctx);
mq_stream_ctx_ref (ctx);
ctx->sink_pad_block_id =
- gst_pad_add_probe (mq_sink, GST_PAD_PROBE_TYPE_DATA_DOWNSTREAM,
+ gst_pad_add_probe (q_sink, GST_PAD_PROBE_TYPE_DATA_DOWNSTREAM,
(GstPadProbeCallback) handle_mq_input, ctx, (GDestroyNotify)
_pad_block_destroy_sink_notify);
GST_DEBUG_OBJECT (splitmux, "Request pad %" GST_PTR_FORMAT
- " is mq pad %" GST_PTR_FORMAT, res, mq_sink);
+ " feeds queue pad %" GST_PTR_FORMAT, res, q_sink);
splitmux->contexts = g_list_prepend (splitmux->contexts, ctx);
g_free (gname);
- gst_object_unref (mq_sink);
- gst_object_unref (mq_src);
-
if (is_video)
splitmux->have_video = TRUE;
return res;
fail:
GST_SPLITMUX_UNLOCK (splitmux);
+
+ if (q_sink)
+ gst_object_unref (q_sink);
+ if (q_src)
+ gst_object_unref (q_src);
return NULL;
already_have_video:
GST_DEBUG_OBJECT (splitmux, "video sink pad already requested");
gst_splitmux_sink_release_pad (GstElement * element, GstPad * pad)
{
GstSplitMuxSink *splitmux = (GstSplitMuxSink *) element;
- GstPad *mqsink, *mqsrc = NULL, *muxpad = NULL;
+ GstPad *muxpad = NULL;
MqStreamCtx *ctx =
(MqStreamCtx *) (g_object_get_qdata ((GObject *) (pad), PAD_CONTEXT));
GST_SPLITMUX_LOCK (splitmux);
- if (splitmux->muxer == NULL || splitmux->mq == NULL)
+ if (splitmux->muxer == NULL)
goto fail; /* Elements don't exist yet - nothing to release */
GST_INFO_OBJECT (pad, "releasing request pad");
- mqsink = gst_ghost_pad_get_target (GST_GHOST_PAD (pad));
- /* The ghostpad target might have disappeared during pipeline destruct */
- if (mqsink)
- mqsrc = mq_sink_to_src (splitmux->mq, mqsink);
- if (mqsrc)
- muxpad = gst_pad_get_peer (mqsrc);
+ muxpad = gst_pad_get_peer (ctx->srcpad);
/* Remove the context from our consideration */
splitmux->contexts = g_list_remove (splitmux->contexts, ctx);
if (ctx == splitmux->reference_ctx)
splitmux->reference_ctx = NULL;
- /* Release and free the mq input */
- if (mqsink) {
- gst_element_release_request_pad (splitmux->mq, mqsink);
- gst_object_unref (mqsink);
- }
-
/* Release and free the muxer input */
if (muxpad) {
gst_element_release_request_pad (splitmux->muxer, muxpad);
gst_object_unref (muxpad);
}
- if (mqsrc)
- gst_object_unref (mqsrc);
-
if (GST_PAD_PAD_TEMPLATE (pad) &&
- g_str_equal (GST_PAD_TEMPLATE_NAME_TEMPLATE (GST_PAD_PAD_TEMPLATE (pad)),
- "video"))
+ g_str_equal (GST_PAD_TEMPLATE_NAME_TEMPLATE (GST_PAD_PAD_TEMPLATE
+ (pad)), "video"))
splitmux->have_video = FALSE;
gst_element_remove_pad (element, pad);
}
static gboolean
-create_elements (GstSplitMuxSink * splitmux)
+create_muxer (GstSplitMuxSink * splitmux)
{
/* Create internal elements */
- if (splitmux->mq == NULL) {
- if ((splitmux->mq =
- create_element (splitmux, "multiqueue", "multiqueue",
- FALSE)) == NULL)
- goto fail;
-
- splitmux->mq_max_buffers = 5;
- /* No bytes or time limit, we limit buffers manually */
- g_object_set (splitmux->mq, "max-size-bytes", 0, "max-size-time",
- (guint64) 0, "max-size-buffers", splitmux->mq_max_buffers, NULL);
- }
-
if (splitmux->muxer == NULL) {
GstElement *provided_muxer = NULL;
}
}
+#if 1
+ if (g_object_class_find_property (G_OBJECT_GET_CLASS (splitmux->sink),
+ "async") != NULL) {
+ /* async child elements are causing state change races and weird
+ * failures, so let's try and turn that off */
+ g_object_set (splitmux->sink, "async", FALSE, NULL);
+ }
+#endif
+
if (!gst_element_link (splitmux->muxer, splitmux->active_sink)) {
g_warning ("Failed to link muxer and sink- splitmuxsink will not work");
goto fail;
switch (transition) {
case GST_STATE_CHANGE_NULL_TO_READY:{
GST_SPLITMUX_LOCK (splitmux);
- if (!create_elements (splitmux) || !create_sink (splitmux)) {
+ if (!create_muxer (splitmux) || !create_sink (splitmux)) {
ret = GST_STATE_CHANGE_FAILURE;
GST_SPLITMUX_UNLOCK (splitmux);
goto beach;
case GST_STATE_CHANGE_READY_TO_PAUSED:{
GST_SPLITMUX_LOCK (splitmux);
/* Start by collecting one input on each pad */
- splitmux->state = SPLITMUX_STATE_COLLECTING_GOP_START;
+ splitmux->input_state = SPLITMUX_INPUT_STATE_COLLECTING_GOP_START;
+ splitmux->output_state = SPLITMUX_OUTPUT_STATE_START_NEXT_FILE;
splitmux->max_in_running_time = GST_CLOCK_STIME_NONE;
- splitmux->muxed_out_time = splitmux->mux_start_time =
- GST_CLOCK_STIME_NONE;
- splitmux->muxed_out_bytes = splitmux->mux_start_bytes = 0;
- splitmux->opening_first_fragment = TRUE;
+ splitmux->gop_start_time = splitmux->muxed_out_time =
+ splitmux->fragment_start_time = GST_CLOCK_STIME_NONE;
+ splitmux->muxed_out_bytes = 0;
GST_SPLITMUX_UNLOCK (splitmux);
break;
}
case GST_STATE_CHANGE_PAUSED_TO_READY:
case GST_STATE_CHANGE_READY_TO_NULL:
GST_SPLITMUX_LOCK (splitmux);
- splitmux->state = SPLITMUX_STATE_STOPPED;
+ splitmux->output_state = SPLITMUX_OUTPUT_STATE_STOPPED;
+ splitmux->input_state = SPLITMUX_INPUT_STATE_STOPPED;
/* Wake up any blocked threads */
GST_LOG_OBJECT (splitmux,
"State change -> NULL or READY. Waking threads");
- GST_SPLITMUX_BROADCAST (splitmux);
+ GST_SPLITMUX_BROADCAST_INPUT (splitmux);
+ GST_SPLITMUX_BROADCAST_OUTPUT (splitmux);
GST_SPLITMUX_UNLOCK (splitmux);
break;
default:
ret == GST_STATE_CHANGE_FAILURE) {
/* Cleanup elements on failed transition out of NULL */
gst_splitmux_reset (splitmux);
+ GST_SPLITMUX_LOCK (splitmux);
+ do_async_done (splitmux);
+ GST_SPLITMUX_UNLOCK (splitmux);
}
- GST_SPLITMUX_LOCK (splitmux);
- do_async_done (splitmux);
- GST_SPLITMUX_UNLOCK (splitmux);
return ret;
}