splitmuxsink: Prevent hang going back to NULL after failures
authorJan Schmidt <jan@centricular.com>
Wed, 7 Jul 2021 14:12:52 +0000 (00:12 +1000)
committerJan Schmidt <jan@centricular.com>
Mon, 26 Jul 2021 06:22:23 +0000 (16:22 +1000)
Prevent a condition where splitmuxsink won't go back to NULL state
after a child element fails to change state by making sure that
a READY->READY state change doesn't fail, and by returning
GST_FLOW_ERROR or GST_FLOW_FLUSHING upstream to shut down streaming
as quickly as possible.

This can happen after (for example) setting an invalid filename
on the sink element. In that case, the READY->PAUSED transition
fails, but with internal elements still in the NULL state. Trying
to set splitmuxsink back to NULL then ends up trying to bring
those NULL elements up to READY with a READY->READY transition,
(which fails, prevent splitmuxsink from getting to NULL)

Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-good/-/merge_requests/1023>

gst/multifile/gstsplitmuxsink.c
tests/check/elements/splitmuxsink.c

index 8b02476..b040857 100644 (file)
@@ -234,7 +234,8 @@ static GstStateChangeReturn gst_splitmux_sink_change_state (GstElement *
 
 static void bus_handler (GstBin * bin, GstMessage * msg);
 static void set_next_filename (GstSplitMuxSink * splitmux, MqStreamCtx * ctx);
-static void start_next_fragment (GstSplitMuxSink * splitmux, MqStreamCtx * ctx);
+static GstFlowReturn start_next_fragment (GstSplitMuxSink * splitmux,
+    MqStreamCtx * ctx);
 static void mq_stream_ctx_free (MqStreamCtx * ctx);
 static void grow_blocked_queues (GstSplitMuxSink * splitmux);
 
@@ -1226,11 +1227,11 @@ all_contexts_are_async_eos (GstSplitMuxSink * splitmux)
  * context needs to sleep to wait for the release of the
  * next GOP, or to send EOS to close out the current file
  */
-static void
+static GstFlowReturn
 complete_or_wait_on_out (GstSplitMuxSink * splitmux, MqStreamCtx * ctx)
 {
   if (ctx->caps_change)
-    return;
+    return GST_FLOW_OK;
 
   do {
     /* When first starting up, the reference stream has to output
@@ -1252,7 +1253,7 @@ complete_or_wait_on_out (GstSplitMuxSink * splitmux, MqStreamCtx * ctx)
 
     if (ctx->flushing
         || splitmux->output_state == SPLITMUX_OUTPUT_STATE_STOPPED)
-      return;
+      return GST_FLOW_FLUSHING;
 
     GST_LOG_OBJECT (ctx->srcpad,
         "Checking running time %" GST_STIME_FORMAT " against max %"
@@ -1262,7 +1263,7 @@ complete_or_wait_on_out (GstSplitMuxSink * splitmux, MqStreamCtx * ctx)
     if (can_output) {
       if (splitmux->max_out_running_time == GST_CLOCK_STIME_NONE ||
           ctx->out_running_time < my_max_out_running_time) {
-        return;
+        return GST_FLOW_OK;
       }
 
       switch (splitmux->output_state) {
@@ -1306,12 +1307,17 @@ complete_or_wait_on_out (GstSplitMuxSink * splitmux, MqStreamCtx * ctx)
           break;
         case SPLITMUX_OUTPUT_STATE_START_NEXT_FILE:
           if (ctx->is_reference) {
+            GstFlowReturn ret = GST_FLOW_OK;
+
             /* Special handling on the reference ctx to start new fragments
              * and collect commands from the command queue */
             /* drops the splitmux lock briefly: */
             /* We must have reference ctx in order for format-location-full to
              * have a sample */
-            start_next_fragment (splitmux, ctx);
+            ret = start_next_fragment (splitmux, ctx);
+            if (ret != GST_FLOW_OK)
+              return ret;
+
             continue;
           }
           break;
@@ -1354,7 +1360,7 @@ complete_or_wait_on_out (GstSplitMuxSink * splitmux, MqStreamCtx * ctx)
           continue;
         }
         case SPLITMUX_OUTPUT_STATE_STOPPED:
-          return;
+          return GST_FLOW_FLUSHING;
       }
     } else {
       GST_LOG_OBJECT (ctx->srcpad, "Not yet ready for output");
@@ -1371,6 +1377,8 @@ complete_or_wait_on_out (GstSplitMuxSink * splitmux, MqStreamCtx * ctx)
         GST_STIME_ARGS (splitmux->max_out_running_time));
   }
   while (1);
+
+  return GST_FLOW_OK;
 }
 
 static GstClockTime
@@ -1557,6 +1565,7 @@ handle_mq_output (GstPad * pad, GstPadProbeInfo * info, MqStreamCtx * ctx)
 {
   GstSplitMuxSink *splitmux = ctx->splitmux;
   MqStreamBuf *buf_info = NULL;
+  GstFlowReturn ret = GST_FLOW_OK;
 
   GST_LOG_OBJECT (pad, "Fired probe type 0x%x", info->type);
 
@@ -1667,8 +1676,9 @@ handle_mq_output (GstPad * pad, GstPadProbeInfo * info, MqStreamCtx * ctx)
           goto beach;
         ctx->out_running_time = ts;
         if (!ctx->is_reference)
-          complete_or_wait_on_out (splitmux, ctx);
+          ret = complete_or_wait_on_out (splitmux, ctx);
         GST_SPLITMUX_UNLOCK (splitmux);
+        GST_PAD_PROBE_INFO_FLOW_RETURN (info) = ret;
         return GST_PAD_PROBE_DROP;
       }
       case GST_EVENT_CAPS:{
@@ -1717,27 +1727,31 @@ handle_mq_output (GstPad * pad, GstPadProbeInfo * info, MqStreamCtx * ctx)
     if (!locked)
       GST_SPLITMUX_LOCK (splitmux);
     if (wait)
-      complete_or_wait_on_out (splitmux, ctx);
+      ret = complete_or_wait_on_out (splitmux, ctx);
     GST_SPLITMUX_UNLOCK (splitmux);
 
     /* Don't try to forward sticky events before the next buffer is there
      * because it would cause a new file to be created without the first
      * buffer being available.
      */
+    GST_PAD_PROBE_INFO_FLOW_RETURN (info) = ret;
     if (ctx->caps_change && GST_EVENT_IS_STICKY (event)) {
       gst_event_unref (event);
       return GST_PAD_PROBE_HANDLED;
-    } else
+    } else {
       return GST_PAD_PROBE_PASS;
+    }
   }
 
   /* Allow everything through until the configured next stopping point */
   GST_SPLITMUX_LOCK (splitmux);
 
   buf_info = g_queue_pop_tail (&ctx->queued_bufs);
-  if (buf_info == NULL)
+  if (buf_info == NULL) {
     /* Can only happen due to a poorly timed flush */
+    ret = GST_FLOW_FLUSHING;
     goto beach;
+  }
 
   /* If we have popped a keyframe, decrement the queued_gop count */
   if (buf_info->keyframe && splitmux->queued_keyframes > 0)
@@ -1753,7 +1767,7 @@ handle_mq_output (GstPad * pad, GstPadProbeInfo * info, MqStreamCtx * ctx)
 
   ctx->caps_change = FALSE;
 
-  complete_or_wait_on_out (splitmux, ctx);
+  ret = complete_or_wait_on_out (splitmux, ctx);
 
   splitmux->muxed_out_bytes += buf_info->buf_size;
 
@@ -1785,10 +1799,12 @@ handle_mq_output (GstPad * pad, GstPadProbeInfo * info, MqStreamCtx * ctx)
 
   mq_stream_buf_free (buf_info);
 
+  GST_PAD_PROBE_INFO_FLOW_RETURN (info) = ret;
   return GST_PAD_PROBE_PASS;
 
 beach:
   GST_SPLITMUX_UNLOCK (splitmux);
+  GST_PAD_PROBE_INFO_FLOW_RETURN (info) = ret;
   return GST_PAD_PROBE_DROP;
 }
 
@@ -1907,7 +1923,7 @@ _send_event (const GValue * value, gpointer user_data)
  * reaches EOS and it is time to restart
  * a new fragment
  */
-static void
+static GstFlowReturn
 start_next_fragment (GstSplitMuxSink * splitmux, MqStreamCtx * ctx)
 {
   GstElement *muxer, *sink;
@@ -1928,8 +1944,9 @@ start_next_fragment (GstSplitMuxSink * splitmux, MqStreamCtx * ctx)
   if (splitmux->shutdown) {
     GST_DEBUG_OBJECT (splitmux,
         "Shutdown requested. Aborting fragment switch.");
+    GST_SPLITMUX_LOCK (splitmux);
     GST_SPLITMUX_STATE_UNLOCK (splitmux);
-    return;
+    return GST_FLOW_FLUSHING;
   }
 
   if (splitmux->async_finalize) {
@@ -2032,8 +2049,24 @@ start_next_fragment (GstSplitMuxSink * splitmux, MqStreamCtx * ctx)
   splitmux->muxed_out_bytes = 0;
   GST_SPLITMUX_UNLOCK (splitmux);
 
-  gst_element_set_state (sink, GST_STATE_TARGET (splitmux));
-  gst_element_set_state (muxer, GST_STATE_TARGET (splitmux));
+  if (gst_element_set_state (sink,
+          GST_STATE_TARGET (splitmux)) == GST_STATE_CHANGE_FAILURE) {
+    gst_element_set_state (sink, GST_STATE_NULL);
+    gst_element_set_locked_state (muxer, FALSE);
+    gst_element_set_locked_state (sink, FALSE);
+
+    goto fail_output;
+  }
+
+  if (gst_element_set_state (muxer,
+          GST_STATE_TARGET (splitmux)) == GST_STATE_CHANGE_FAILURE) {
+    gst_element_set_state (muxer, GST_STATE_NULL);
+    gst_element_set_state (sink, GST_STATE_NULL);
+    gst_element_set_locked_state (muxer, FALSE);
+    gst_element_set_locked_state (sink, FALSE);
+    goto fail_muxer;
+  }
+
   gst_element_set_locked_state (muxer, FALSE);
   gst_element_set_locked_state (sink, FALSE);
 
@@ -2056,12 +2089,32 @@ start_next_fragment (GstSplitMuxSink * splitmux, MqStreamCtx * ctx)
   GST_LOG_OBJECT (splitmux, "Resetting state to AWAITING_COMMAND");
   splitmux->output_state = SPLITMUX_OUTPUT_STATE_AWAITING_COMMAND;
   GST_SPLITMUX_BROADCAST_OUTPUT (splitmux);
-  return;
+  return GST_FLOW_OK;
 
 fail:
+  GST_SPLITMUX_LOCK (splitmux);
   GST_SPLITMUX_STATE_UNLOCK (splitmux);
   GST_ELEMENT_ERROR (splitmux, RESOURCE, SETTINGS,
       ("Could not create the new muxer/sink"), NULL);
+  return GST_FLOW_ERROR;
+
+fail_output:
+  GST_ELEMENT_ERROR (splitmux, RESOURCE, SETTINGS,
+      ("Could not start new output sink"), NULL);
+
+  GST_SPLITMUX_LOCK (splitmux);
+  GST_SPLITMUX_STATE_UNLOCK (splitmux);
+  splitmux->switching_fragment = FALSE;
+  return GST_FLOW_ERROR;
+
+fail_muxer:
+  GST_ELEMENT_ERROR (splitmux, RESOURCE, SETTINGS,
+      ("Could not start new muxer"), NULL);
+
+  GST_SPLITMUX_LOCK (splitmux);
+  GST_SPLITMUX_STATE_UNLOCK (splitmux);
+  splitmux->switching_fragment = FALSE;
+  return GST_FLOW_ERROR;
 }
 
 static void
@@ -2578,6 +2631,7 @@ static GstPadProbeReturn
 handle_mq_input (GstPad * pad, GstPadProbeInfo * info, MqStreamCtx * ctx)
 {
   GstSplitMuxSink *splitmux = ctx->splitmux;
+  GstFlowReturn ret = GST_FLOW_OK;
   GstBuffer *buf;
   MqStreamBuf *buf_info = NULL;
   GstClockTime ts;
@@ -2612,8 +2666,10 @@ handle_mq_input (GstPad * pad, GstPadProbeInfo * info, MqStreamCtx * ctx)
         GST_SPLITMUX_LOCK (splitmux);
         ctx->in_eos = TRUE;
 
-        if (splitmux->input_state == SPLITMUX_INPUT_STATE_STOPPED)
+        if (splitmux->input_state == SPLITMUX_INPUT_STATE_STOPPED) {
+          ret = GST_FLOW_FLUSHING;
           goto beach;
+        }
 
         if (ctx->is_reference) {
           GST_INFO_OBJECT (splitmux, "Got Reference EOS. Finishing up");
@@ -2642,8 +2698,10 @@ handle_mq_input (GstPad * pad, GstPadProbeInfo * info, MqStreamCtx * ctx)
 
         GST_SPLITMUX_LOCK (splitmux);
 
-        if (splitmux->input_state == SPLITMUX_INPUT_STATE_STOPPED)
+        if (splitmux->input_state == SPLITMUX_INPUT_STATE_STOPPED) {
+          ret = GST_FLOW_FLUSHING;
           goto beach;
+        }
         rtime = my_segment_to_running_time (&ctx->in_segment, gap_ts);
 
         GST_LOG_OBJECT (pad, "Have GAP w/ ts %" GST_STIME_FORMAT,
@@ -2688,8 +2746,10 @@ handle_mq_input (GstPad * pad, GstPadProbeInfo * info, MqStreamCtx * ctx)
 
   GST_SPLITMUX_LOCK (splitmux);
 
-  if (splitmux->input_state == SPLITMUX_INPUT_STATE_STOPPED)
+  if (splitmux->input_state == SPLITMUX_INPUT_STATE_STOPPED) {
+    ret = GST_FLOW_FLUSHING;
     goto beach;
+  }
 
   /* If this buffer has a timestamp, advance the input timestamp of the
    * stream */
@@ -2875,12 +2935,14 @@ handle_mq_input (GstPad * pad, GstPadProbeInfo * info, MqStreamCtx * ctx)
       " run ts %" GST_STIME_FORMAT, buf, GST_STIME_ARGS (ctx->in_running_time));
 
   GST_SPLITMUX_UNLOCK (splitmux);
+  GST_PAD_PROBE_INFO_FLOW_RETURN (info) = ret;
   return GST_PAD_PROBE_PASS;
 
 beach:
   GST_SPLITMUX_UNLOCK (splitmux);
   if (buf_info)
     mq_stream_buf_free (buf_info);
+  GST_PAD_PROBE_INFO_FLOW_RETURN (info) = ret;
   return GST_PAD_PROBE_PASS;
 }
 
@@ -3625,9 +3687,10 @@ gst_splitmux_sink_change_state (GstElement * element, GstStateChange transition)
       break;
     }
     case GST_STATE_CHANGE_PAUSED_TO_READY:
+    case GST_STATE_CHANGE_READY_TO_READY:
       g_atomic_int_set (&(splitmux->split_requested), FALSE);
       g_atomic_int_set (&(splitmux->do_split_next_gop), FALSE);
-
+      /* Fall through */
     case GST_STATE_CHANGE_READY_TO_NULL:
       GST_SPLITMUX_STATE_LOCK (splitmux);
       splitmux->shutdown = TRUE;
@@ -3682,15 +3745,24 @@ gst_splitmux_sink_change_state (GstElement * element, GstStateChange transition)
       break;
   }
 
+  return ret;
+
 beach:
-  if (transition == GST_STATE_CHANGE_NULL_TO_READY &&
-      ret == GST_STATE_CHANGE_FAILURE) {
+  if (transition == GST_STATE_CHANGE_NULL_TO_READY) {
     /* Cleanup elements on failed transition out of NULL */
     gst_splitmux_reset_elements (splitmux);
     GST_SPLITMUX_LOCK (splitmux);
     do_async_done (splitmux);
     GST_SPLITMUX_UNLOCK (splitmux);
   }
+  if (transition == GST_STATE_CHANGE_READY_TO_READY) {
+    /* READY to READY transition only happens when we're already
+     * in READY state, but a child element is in NULL, which
+     * happens when there's an error changing the state of the sink.
+     * We need to make sure not to fail the state transition, or
+     * the core won't transition us back to NULL successfully */
+    ret = GST_STATE_CHANGE_SUCCESS;
+  }
   return ret;
 }
 
index 9db9b91..93c86e7 100644 (file)
@@ -433,6 +433,43 @@ GST_START_TEST (test_splitmuxsink)
 
 GST_END_TEST;
 
+GST_START_TEST (test_splitmuxsink_clean_failure)
+{
+  GstMessage *msg;
+  GstElement *pipeline;
+  GstElement *sink, *fakesink;
+
+  /* This pipeline has a small time cutoff - it should start a new file
+   * every GOP, ie 1 second */
+  pipeline =
+      gst_parse_launch
+      ("videotestsrc horizontal-speed=2 is-live=true ! video/x-raw,width=80,height=64,framerate=5/1 ! videoconvert !"
+      " queue ! theoraenc keyframe-force=5 ! splitmuxsink name=splitsink "
+      " max-size-time=1000000 max-size-bytes=1000000 muxer=oggmux", NULL);
+  fail_if (pipeline == NULL);
+  sink = gst_bin_get_by_name (GST_BIN (pipeline), "splitsink");
+  fail_if (sink == NULL);
+
+  fakesink = gst_element_factory_make ("fakesink", "fakesink-fail");
+  fail_if (fakesink == NULL);
+
+  /* Trigger an error on READY->PAUSED */
+  g_object_set (fakesink, "state-error", 2, NULL);
+  g_object_set (sink, "sink", fakesink, NULL);
+  gst_object_unref (sink);
+
+  msg = run_pipeline (pipeline);
+
+  fail_unless (GST_MESSAGE_TYPE (msg) == GST_MESSAGE_ERROR);
+  gst_message_unref (msg);
+
+  fail_unless (gst_element_set_state (pipeline,
+          GST_STATE_NULL) == GST_STATE_CHANGE_SUCCESS);
+  gst_object_unref (pipeline);
+}
+
+GST_END_TEST;
+
 GST_START_TEST (test_splitmuxsink_multivid)
 {
   GstMessage *msg;
@@ -807,6 +844,7 @@ splitmuxsink_suite (void)
     tcase_add_checked_fixture (tc_chain, tempdir_setup, tempdir_cleanup);
 
     tcase_add_test (tc_chain, test_splitmuxsink);
+    tcase_add_test (tc_chain, test_splitmuxsink_clean_failure);
 
     if (have_matroska && have_vorbis) {
       tcase_add_checked_fixture (tc_chain_complex, tempdir_setup,