static void bus_handler (GstBin * bin, GstMessage * msg);
static void set_next_filename (GstSplitMuxSink * splitmux, MqStreamCtx * ctx);
-static void start_next_fragment (GstSplitMuxSink * splitmux, MqStreamCtx * ctx);
+static GstFlowReturn start_next_fragment (GstSplitMuxSink * splitmux,
+ MqStreamCtx * ctx);
static void mq_stream_ctx_free (MqStreamCtx * ctx);
static void grow_blocked_queues (GstSplitMuxSink * splitmux);
* context needs to sleep to wait for the release of the
* next GOP, or to send EOS to close out the current file
*/
-static void
+static GstFlowReturn
complete_or_wait_on_out (GstSplitMuxSink * splitmux, MqStreamCtx * ctx)
{
if (ctx->caps_change)
- return;
+ return GST_FLOW_OK;
do {
/* When first starting up, the reference stream has to output
if (ctx->flushing
|| splitmux->output_state == SPLITMUX_OUTPUT_STATE_STOPPED)
- return;
+ return GST_FLOW_FLUSHING;
GST_LOG_OBJECT (ctx->srcpad,
"Checking running time %" GST_STIME_FORMAT " against max %"
if (can_output) {
if (splitmux->max_out_running_time == GST_CLOCK_STIME_NONE ||
ctx->out_running_time < my_max_out_running_time) {
- return;
+ return GST_FLOW_OK;
}
switch (splitmux->output_state) {
break;
case SPLITMUX_OUTPUT_STATE_START_NEXT_FILE:
if (ctx->is_reference) {
+ GstFlowReturn ret = GST_FLOW_OK;
+
/* Special handling on the reference ctx to start new fragments
* and collect commands from the command queue */
/* drops the splitmux lock briefly: */
/* We must have reference ctx in order for format-location-full to
* have a sample */
- start_next_fragment (splitmux, ctx);
+ ret = start_next_fragment (splitmux, ctx);
+ if (ret != GST_FLOW_OK)
+ return ret;
+
continue;
}
break;
continue;
}
case SPLITMUX_OUTPUT_STATE_STOPPED:
- return;
+ return GST_FLOW_FLUSHING;
}
} else {
GST_LOG_OBJECT (ctx->srcpad, "Not yet ready for output");
GST_STIME_ARGS (splitmux->max_out_running_time));
}
while (1);
+
+ return GST_FLOW_OK;
}
static GstClockTime
{
GstSplitMuxSink *splitmux = ctx->splitmux;
MqStreamBuf *buf_info = NULL;
+ GstFlowReturn ret = GST_FLOW_OK;
GST_LOG_OBJECT (pad, "Fired probe type 0x%x", info->type);
goto beach;
ctx->out_running_time = ts;
if (!ctx->is_reference)
- complete_or_wait_on_out (splitmux, ctx);
+ ret = complete_or_wait_on_out (splitmux, ctx);
GST_SPLITMUX_UNLOCK (splitmux);
+ GST_PAD_PROBE_INFO_FLOW_RETURN (info) = ret;
return GST_PAD_PROBE_DROP;
}
case GST_EVENT_CAPS:{
if (!locked)
GST_SPLITMUX_LOCK (splitmux);
if (wait)
- complete_or_wait_on_out (splitmux, ctx);
+ ret = complete_or_wait_on_out (splitmux, ctx);
GST_SPLITMUX_UNLOCK (splitmux);
/* Don't try to forward sticky events before the next buffer is there
* because it would cause a new file to be created without the first
* buffer being available.
*/
+ GST_PAD_PROBE_INFO_FLOW_RETURN (info) = ret;
if (ctx->caps_change && GST_EVENT_IS_STICKY (event)) {
gst_event_unref (event);
return GST_PAD_PROBE_HANDLED;
- } else
+ } else {
return GST_PAD_PROBE_PASS;
+ }
}
/* Allow everything through until the configured next stopping point */
GST_SPLITMUX_LOCK (splitmux);
buf_info = g_queue_pop_tail (&ctx->queued_bufs);
- if (buf_info == NULL)
+ if (buf_info == NULL) {
/* Can only happen due to a poorly timed flush */
+ ret = GST_FLOW_FLUSHING;
goto beach;
+ }
/* If we have popped a keyframe, decrement the queued_gop count */
if (buf_info->keyframe && splitmux->queued_keyframes > 0)
ctx->caps_change = FALSE;
- complete_or_wait_on_out (splitmux, ctx);
+ ret = complete_or_wait_on_out (splitmux, ctx);
splitmux->muxed_out_bytes += buf_info->buf_size;
mq_stream_buf_free (buf_info);
+ GST_PAD_PROBE_INFO_FLOW_RETURN (info) = ret;
return GST_PAD_PROBE_PASS;
beach:
GST_SPLITMUX_UNLOCK (splitmux);
+ GST_PAD_PROBE_INFO_FLOW_RETURN (info) = ret;
return GST_PAD_PROBE_DROP;
}
* reaches EOS and it is time to restart
* a new fragment
*/
-static void
+static GstFlowReturn
start_next_fragment (GstSplitMuxSink * splitmux, MqStreamCtx * ctx)
{
GstElement *muxer, *sink;
if (splitmux->shutdown) {
GST_DEBUG_OBJECT (splitmux,
"Shutdown requested. Aborting fragment switch.");
+ GST_SPLITMUX_LOCK (splitmux);
GST_SPLITMUX_STATE_UNLOCK (splitmux);
- return;
+ return GST_FLOW_FLUSHING;
}
if (splitmux->async_finalize) {
splitmux->muxed_out_bytes = 0;
GST_SPLITMUX_UNLOCK (splitmux);
- gst_element_set_state (sink, GST_STATE_TARGET (splitmux));
- gst_element_set_state (muxer, GST_STATE_TARGET (splitmux));
+ if (gst_element_set_state (sink,
+ GST_STATE_TARGET (splitmux)) == GST_STATE_CHANGE_FAILURE) {
+ gst_element_set_state (sink, GST_STATE_NULL);
+ gst_element_set_locked_state (muxer, FALSE);
+ gst_element_set_locked_state (sink, FALSE);
+
+ goto fail_output;
+ }
+
+ if (gst_element_set_state (muxer,
+ GST_STATE_TARGET (splitmux)) == GST_STATE_CHANGE_FAILURE) {
+ gst_element_set_state (muxer, GST_STATE_NULL);
+ gst_element_set_state (sink, GST_STATE_NULL);
+ gst_element_set_locked_state (muxer, FALSE);
+ gst_element_set_locked_state (sink, FALSE);
+ goto fail_muxer;
+ }
+
gst_element_set_locked_state (muxer, FALSE);
gst_element_set_locked_state (sink, FALSE);
GST_LOG_OBJECT (splitmux, "Resetting state to AWAITING_COMMAND");
splitmux->output_state = SPLITMUX_OUTPUT_STATE_AWAITING_COMMAND;
GST_SPLITMUX_BROADCAST_OUTPUT (splitmux);
- return;
+ return GST_FLOW_OK;
fail:
+ GST_SPLITMUX_LOCK (splitmux);
GST_SPLITMUX_STATE_UNLOCK (splitmux);
GST_ELEMENT_ERROR (splitmux, RESOURCE, SETTINGS,
("Could not create the new muxer/sink"), NULL);
+ return GST_FLOW_ERROR;
+
+fail_output:
+ GST_ELEMENT_ERROR (splitmux, RESOURCE, SETTINGS,
+ ("Could not start new output sink"), NULL);
+
+ GST_SPLITMUX_LOCK (splitmux);
+ GST_SPLITMUX_STATE_UNLOCK (splitmux);
+ splitmux->switching_fragment = FALSE;
+ return GST_FLOW_ERROR;
+
+fail_muxer:
+ GST_ELEMENT_ERROR (splitmux, RESOURCE, SETTINGS,
+ ("Could not start new muxer"), NULL);
+
+ GST_SPLITMUX_LOCK (splitmux);
+ GST_SPLITMUX_STATE_UNLOCK (splitmux);
+ splitmux->switching_fragment = FALSE;
+ return GST_FLOW_ERROR;
}
static void
handle_mq_input (GstPad * pad, GstPadProbeInfo * info, MqStreamCtx * ctx)
{
GstSplitMuxSink *splitmux = ctx->splitmux;
+ GstFlowReturn ret = GST_FLOW_OK;
GstBuffer *buf;
MqStreamBuf *buf_info = NULL;
GstClockTime ts;
GST_SPLITMUX_LOCK (splitmux);
ctx->in_eos = TRUE;
- if (splitmux->input_state == SPLITMUX_INPUT_STATE_STOPPED)
+ if (splitmux->input_state == SPLITMUX_INPUT_STATE_STOPPED) {
+ ret = GST_FLOW_FLUSHING;
goto beach;
+ }
if (ctx->is_reference) {
GST_INFO_OBJECT (splitmux, "Got Reference EOS. Finishing up");
GST_SPLITMUX_LOCK (splitmux);
- if (splitmux->input_state == SPLITMUX_INPUT_STATE_STOPPED)
+ if (splitmux->input_state == SPLITMUX_INPUT_STATE_STOPPED) {
+ ret = GST_FLOW_FLUSHING;
goto beach;
+ }
rtime = my_segment_to_running_time (&ctx->in_segment, gap_ts);
GST_LOG_OBJECT (pad, "Have GAP w/ ts %" GST_STIME_FORMAT,
GST_SPLITMUX_LOCK (splitmux);
- if (splitmux->input_state == SPLITMUX_INPUT_STATE_STOPPED)
+ if (splitmux->input_state == SPLITMUX_INPUT_STATE_STOPPED) {
+ ret = GST_FLOW_FLUSHING;
goto beach;
+ }
/* If this buffer has a timestamp, advance the input timestamp of the
* stream */
" run ts %" GST_STIME_FORMAT, buf, GST_STIME_ARGS (ctx->in_running_time));
GST_SPLITMUX_UNLOCK (splitmux);
+ GST_PAD_PROBE_INFO_FLOW_RETURN (info) = ret;
return GST_PAD_PROBE_PASS;
beach:
GST_SPLITMUX_UNLOCK (splitmux);
if (buf_info)
mq_stream_buf_free (buf_info);
+ GST_PAD_PROBE_INFO_FLOW_RETURN (info) = ret;
return GST_PAD_PROBE_PASS;
}
break;
}
case GST_STATE_CHANGE_PAUSED_TO_READY:
+ case GST_STATE_CHANGE_READY_TO_READY:
g_atomic_int_set (&(splitmux->split_requested), FALSE);
g_atomic_int_set (&(splitmux->do_split_next_gop), FALSE);
-
+ /* Fall through */
case GST_STATE_CHANGE_READY_TO_NULL:
GST_SPLITMUX_STATE_LOCK (splitmux);
splitmux->shutdown = TRUE;
break;
}
+ return ret;
+
beach:
- if (transition == GST_STATE_CHANGE_NULL_TO_READY &&
- ret == GST_STATE_CHANGE_FAILURE) {
+ if (transition == GST_STATE_CHANGE_NULL_TO_READY) {
/* Cleanup elements on failed transition out of NULL */
gst_splitmux_reset_elements (splitmux);
GST_SPLITMUX_LOCK (splitmux);
do_async_done (splitmux);
GST_SPLITMUX_UNLOCK (splitmux);
}
+ if (transition == GST_STATE_CHANGE_READY_TO_READY) {
+ /* READY to READY transition only happens when we're already
+ * in READY state, but a child element is in NULL, which
+ * happens when there's an error changing the state of the sink.
+ * We need to make sure not to fail the state transition, or
+ * the core won't transition us back to NULL successfully */
+ ret = GST_STATE_CHANGE_SUCCESS;
+ }
return ret;
}