splitmuxsink: Add format-location-full signal
authorJan Schmidt <jan@centricular.com>
Thu, 17 Nov 2016 12:40:27 +0000 (23:40 +1100)
committerJan Schmidt <jan@centricular.com>
Mon, 2 Jan 2017 14:34:02 +0000 (01:34 +1100)
Add a new signal for formatting the filename, which receives
a GstSample containing the first buffer from the reference
stream that will be muxed into that file.

Useful for creating filenames that are based on the
running time or other attributes of the buffer.

To make it work, opening of files and setting filenames is
now deferred until there is some data to write to it,
which also requires some changes to how async state changes
and gap events are handled.

gst/multifile/gstsplitmuxsink.c
gst/multifile/gstsplitmuxsink.h
tests/check/elements/splitmux.c

index 0f05f7b..5d8b75f 100644 (file)
@@ -90,6 +90,7 @@ enum
 enum
 {
   SIGNAL_FORMAT_LOCATION,
+  SIGNAL_FORMAT_LOCATION_FULL,
   SIGNAL_LAST
 };
 
@@ -140,13 +141,15 @@ static GstStateChangeReturn gst_splitmux_sink_change_state (GstElement *
     element, GstStateChange transition);
 
 static void bus_handler (GstBin * bin, GstMessage * msg);
-static void set_next_filename (GstSplitMuxSink * splitmux);
-static void start_next_fragment (GstSplitMuxSink * splitmux);
+static void set_next_filename (GstSplitMuxSink * splitmux, MqStreamCtx * ctx);
+static void start_next_fragment (GstSplitMuxSink * splitmux, MqStreamCtx * ctx);
 static void check_queue_length (GstSplitMuxSink * splitmux, MqStreamCtx * ctx);
 static void mq_stream_ctx_unref (MqStreamCtx * ctx);
 
 static void gst_splitmux_sink_ensure_max_files (GstSplitMuxSink * splitmux);
 
+static void do_async_done (GstSplitMuxSink * splitmux);
+
 static MqStreamBuf *
 mq_stream_buf_new (void)
 {
@@ -244,6 +247,20 @@ gst_splitmux_sink_class_init (GstSplitMuxSinkClass * klass)
   signals[SIGNAL_FORMAT_LOCATION] =
       g_signal_new ("format-location", G_TYPE_FROM_CLASS (klass),
       G_SIGNAL_RUN_LAST, 0, NULL, NULL, NULL, G_TYPE_STRING, 1, G_TYPE_UINT);
+
+  /**
+   * GstSplitMuxSink::format-location-full:
+   * @splitmux: the #GstSplitMuxSink
+   * @fragment_id: the sequence number of the file to be created
+   * @first_sample: A #GstSample containing the first buffer
+   *   from the reference stream in the new file
+   *
+   * Returns: the location to be used for the next output file
+   */
+  signals[SIGNAL_FORMAT_LOCATION_FULL] =
+      g_signal_new ("format-location-full", G_TYPE_FROM_CLASS (klass),
+      G_SIGNAL_RUN_LAST, 0, NULL, NULL, NULL, G_TYPE_STRING, 2, G_TYPE_UINT,
+      GST_TYPE_SAMPLE);
 }
 
 static void
@@ -614,8 +631,9 @@ complete_or_wait_on_out (GstSplitMuxSink * splitmux, MqStreamCtx * ctx)
         send_eos (splitmux, ctx);
         continue;
       }
-    } else if (splitmux->state == SPLITMUX_STATE_START_NEXT_FRAGMENT) {
-      start_next_fragment (splitmux);
+    } else if (ctx->is_reference
+        && splitmux->state == SPLITMUX_STATE_START_NEXT_FRAGMENT) {
+      start_next_fragment (splitmux, ctx);
       continue;
     }
 
@@ -713,6 +731,20 @@ handle_mq_output (GstPad * pad, GstPadProbeInfo * info, MqStreamCtx * ctx)
         if (splitmux->state == SPLITMUX_STATE_STOPPED)
           goto beach;
 
+        /* When we get a gap event on the
+         * reference stream and we're trying to open a
+         * new file, we need to store it until we get
+         * the buffer afterwards
+         */
+        if (ctx->is_reference &&
+            (splitmux->opening_first_fragment ||
+                splitmux->state == SPLITMUX_STATE_START_NEXT_FRAGMENT)) {
+          GST_DEBUG_OBJECT (pad, "Storing GAP event until buffer arrives");
+          gst_event_replace (&ctx->pending_gap, event);
+          GST_SPLITMUX_UNLOCK (splitmux);
+          return GST_PAD_PROBE_HANDLED;
+        }
+
         if (rtime != GST_CLOCK_STIME_NONE) {
           ctx->out_running_time = rtime;
           complete_or_wait_on_out (splitmux, ctx);
@@ -735,7 +767,8 @@ handle_mq_output (GstPad * pad, GstPadProbeInfo * info, MqStreamCtx * ctx)
         if (splitmux->state == SPLITMUX_STATE_STOPPED)
           goto beach;
         ctx->out_running_time = ts;
-        complete_or_wait_on_out (splitmux, ctx);
+        if (!ctx->is_reference)
+          complete_or_wait_on_out (splitmux, ctx);
         GST_SPLITMUX_UNLOCK (splitmux);
         return GST_PAD_PROBE_DROP;
       }
@@ -758,17 +791,18 @@ handle_mq_output (GstPad * pad, GstPadProbeInfo * info, MqStreamCtx * ctx)
     splitmux->queued_gops--;
 
   ctx->out_running_time = buf_info->run_ts;
+  ctx->cur_buffer = gst_pad_probe_info_get_buffer (info);
 
   GST_LOG_OBJECT (splitmux,
       "Pad %" GST_PTR_FORMAT " buffer with run TS %" GST_STIME_FORMAT
       " size %" G_GUINT64_FORMAT,
       pad, GST_STIME_ARGS (ctx->out_running_time), buf_info->buf_size);
 
-  if (splitmux->opening_first_fragment) {
+  if (ctx->is_reference && splitmux->opening_first_fragment) {
     if (request_next_keyframe (splitmux) == FALSE)
       GST_WARNING_OBJECT (splitmux,
           "Could not request a keyframe. Files may not split at the exact location they should");
-    send_fragment_opened_closed_msg (splitmux, TRUE);
+    start_next_fragment (splitmux, ctx);
     splitmux->opening_first_fragment = FALSE;
   }
 
@@ -797,8 +831,23 @@ handle_mq_output (GstPad * pad, GstPadProbeInfo * info, MqStreamCtx * ctx)
   }
 #endif
 
+  ctx->cur_buffer = NULL;
   GST_SPLITMUX_UNLOCK (splitmux);
 
+  /* pending_gap is protected by the STREAM lock */
+  if (ctx->pending_gap) {
+    /* If we previously stored a gap event, send it now */
+    GstPad *peer = gst_pad_get_peer (ctx->srcpad);
+
+    GST_DEBUG_OBJECT (splitmux,
+        "Pad %" GST_PTR_FORMAT " sending pending GAP event", ctx->srcpad);
+
+    gst_pad_send_event (peer, ctx->pending_gap);
+    ctx->pending_gap = NULL;
+
+    gst_object_unref (peer);
+  }
+
   mq_stream_buf_free (buf_info);
 
   return GST_PAD_PROBE_PASS;
@@ -833,7 +882,7 @@ restart_context (MqStreamCtx * ctx, GstSplitMuxSink * splitmux)
  * a new fragment
  */
 static void
-start_next_fragment (GstSplitMuxSink * splitmux)
+start_next_fragment (GstSplitMuxSink * splitmux, MqStreamCtx * ctx)
 {
   /* 1 change to new file */
   splitmux->switching_fragment = TRUE;
@@ -843,7 +892,7 @@ start_next_fragment (GstSplitMuxSink * splitmux)
   gst_element_set_state (splitmux->muxer, GST_STATE_NULL);
   gst_element_set_state (splitmux->active_sink, GST_STATE_NULL);
 
-  set_next_filename (splitmux);
+  set_next_filename (splitmux, ctx);
 
   gst_element_set_state (splitmux->active_sink, GST_STATE_TARGET (splitmux));
   gst_element_set_state (splitmux->muxer, GST_STATE_TARGET (splitmux));
@@ -851,27 +900,30 @@ start_next_fragment (GstSplitMuxSink * splitmux)
   gst_element_set_locked_state (splitmux->active_sink, FALSE);
 
   splitmux->switching_fragment = FALSE;
+  do_async_done (splitmux);
 
   g_list_foreach (splitmux->contexts, (GFunc) restart_context, splitmux);
 
-  /* Switch state and go back to processing */
-  if (!splitmux->reference_ctx->in_eos) {
-    splitmux->state = SPLITMUX_STATE_COLLECTING_GOP_START;
-    splitmux->max_out_running_time = splitmux->reference_ctx->in_running_time;
-  } else {
-    splitmux->state = SPLITMUX_STATE_WAITING_GOP_COMPLETE;
-    splitmux->max_out_running_time = GST_CLOCK_STIME_NONE;
-    splitmux->have_muxed_something = FALSE;
-  }
-  splitmux->have_muxed_something =
-      (splitmux->reference_ctx->in_running_time > splitmux->muxed_out_time);
+  if (!splitmux->opening_first_fragment) {
+    /* Switch state and go back to processing */
+    if (!splitmux->reference_ctx->in_eos) {
+      splitmux->state = SPLITMUX_STATE_COLLECTING_GOP_START;
+      splitmux->max_out_running_time = splitmux->reference_ctx->in_running_time;
+    } else {
+      splitmux->state = SPLITMUX_STATE_WAITING_GOP_COMPLETE;
+      splitmux->max_out_running_time = GST_CLOCK_STIME_NONE;
+      splitmux->have_muxed_something = FALSE;
+    }
+    splitmux->have_muxed_something =
+        (splitmux->reference_ctx->in_running_time > splitmux->muxed_out_time);
 
-  /* Store the overflow parameters as the basis for the next fragment */
-  splitmux->mux_start_bytes = splitmux->muxed_out_bytes;
+    /* Store the overflow parameters as the basis for the next fragment */
+    splitmux->mux_start_bytes = splitmux->muxed_out_bytes;
 
-  GST_DEBUG_OBJECT (splitmux,
-      "Restarting flow for new fragment. New running time %" GST_STIME_FORMAT,
-      GST_STIME_ARGS (splitmux->max_out_running_time));
+    GST_DEBUG_OBJECT (splitmux,
+        "Restarting flow for new fragment. New running time %" GST_STIME_FORMAT,
+        GST_STIME_ARGS (splitmux->max_out_running_time));
+  }
 
   send_fragment_opened_closed_msg (splitmux, TRUE);
 
@@ -1561,7 +1613,7 @@ fail:
 
 static GstElement *
 create_element (GstSplitMuxSink * splitmux,
-    const gchar * factory, const gchar * name)
+    const gchar * factory, const gchar * name, gboolean locked)
 {
   GstElement *ret = gst_element_factory_make (factory, name);
   if (ret == NULL) {
@@ -1569,6 +1621,13 @@ create_element (GstSplitMuxSink * splitmux,
     return NULL;
   }
 
+  if (locked) {
+    /* Ensure the sink starts in locked state and NULL - it will be changed
+     * by the filename setting code */
+    gst_element_set_locked_state (ret, TRUE);
+    gst_element_set_state (ret, GST_STATE_NULL);
+  }
+
   if (!gst_bin_add (GST_BIN (splitmux), ret)) {
     g_warning ("Could not add %s element - splitmuxsink will not work", name);
     gst_object_unref (ret);
@@ -1584,7 +1643,8 @@ create_elements (GstSplitMuxSink * splitmux)
   /* Create internal elements */
   if (splitmux->mq == NULL) {
     if ((splitmux->mq =
-            create_element (splitmux, "multiqueue", "multiqueue")) == NULL)
+            create_element (splitmux, "multiqueue", "multiqueue",
+                FALSE)) == NULL)
       goto fail;
 
     splitmux->mq_max_buffers = 5;
@@ -1603,7 +1663,7 @@ create_elements (GstSplitMuxSink * splitmux)
 
     if (provided_muxer == NULL) {
       if ((splitmux->muxer =
-              create_element (splitmux, "mp4mux", "muxer")) == NULL)
+              create_element (splitmux, "mp4mux", "muxer", FALSE)) == NULL)
         goto fail;
     } else {
       /* Ensure it's not in locked state (we might be reusing an old element) */
@@ -1680,12 +1740,14 @@ create_sink (GstSplitMuxSink * splitmux)
 
     if (provided_sink == NULL) {
       if ((splitmux->sink =
-              create_element (splitmux, DEFAULT_SINK, "sink")) == NULL)
+              create_element (splitmux, DEFAULT_SINK, "sink", TRUE)) == NULL)
         goto fail;
       splitmux->active_sink = splitmux->sink;
     } else {
-      /* Ensure it's not in locked state (we might be reusing an old element) */
-      gst_element_set_locked_state (provided_sink, FALSE);
+      /* Ensure the sink starts in locked state and NULL - it will be changed
+       * by the filename setting code */
+      gst_element_set_locked_state (provided_sink, TRUE);
+      gst_element_set_state (provided_sink, GST_STATE_NULL);
       if (!gst_bin_add (GST_BIN (splitmux), provided_sink)) {
         g_warning ("Could not add sink elements - splitmuxsink will not work");
         gst_object_unref (provided_sink);
@@ -1721,13 +1783,30 @@ fail:
 #pragma GCC diagnostic ignored "-Wformat-nonliteral"
 #endif
 static void
-set_next_filename (GstSplitMuxSink * splitmux)
+set_next_filename (GstSplitMuxSink * splitmux, MqStreamCtx * ctx)
 {
   gchar *fname = NULL;
+  GstSample *sample;
+  GstCaps *caps;
+
   gst_splitmux_sink_ensure_max_files (splitmux);
 
-  g_signal_emit (splitmux, signals[SIGNAL_FORMAT_LOCATION], 0,
-      splitmux->fragment_id, &fname);
+  if (ctx->cur_buffer == NULL)
+    g_warning ("Starting next file without buffer");
+
+  caps = gst_pad_get_current_caps (ctx->srcpad);
+  sample = gst_sample_new (ctx->cur_buffer, caps, &ctx->out_segment, NULL);
+  g_signal_emit (splitmux, signals[SIGNAL_FORMAT_LOCATION_FULL], 0,
+      splitmux->fragment_id, sample, &fname);
+  gst_sample_unref (sample);
+  if (caps)
+    gst_caps_unref (caps);
+
+  if (fname == NULL) {
+    /* Fallback to the old signal if the new one returned nothing */
+    g_signal_emit (splitmux, signals[SIGNAL_FORMAT_LOCATION], 0,
+        splitmux->fragment_id, &fname);
+  }
 
   if (!fname)
     fname = splitmux->location ?
@@ -1742,6 +1821,43 @@ set_next_filename (GstSplitMuxSink * splitmux)
   }
 }
 
+static void
+do_async_start (GstSplitMuxSink * splitmux)
+{
+  GstMessage *message;
+
+  if (!splitmux->need_async_start) {
+    GST_INFO_OBJECT (splitmux, "no async_start needed");
+    return;
+  }
+
+  splitmux->async_pending = TRUE;
+
+  GST_INFO_OBJECT (splitmux, "Sending async_start message");
+  message = gst_message_new_async_start (GST_OBJECT_CAST (splitmux));
+  GST_BIN_CLASS (parent_class)->handle_message (GST_BIN_CAST
+      (splitmux), message);
+}
+
+static void
+do_async_done (GstSplitMuxSink * splitmux)
+{
+  GstMessage *message;
+
+  if (splitmux->async_pending) {
+    GST_INFO_OBJECT (splitmux, "Sending async_done message");
+    message =
+        gst_message_new_async_done (GST_OBJECT_CAST (splitmux),
+        GST_CLOCK_TIME_NONE);
+    GST_BIN_CLASS (parent_class)->handle_message (GST_BIN_CAST
+        (splitmux), message);
+
+    splitmux->async_pending = FALSE;
+  }
+
+  splitmux->need_async_start = FALSE;
+}
+
 static GstStateChangeReturn
 gst_splitmux_sink_change_state (GstElement * element, GstStateChange transition)
 {
@@ -1758,7 +1874,6 @@ gst_splitmux_sink_change_state (GstElement * element, GstStateChange transition)
       }
       GST_SPLITMUX_UNLOCK (splitmux);
       splitmux->fragment_id = 0;
-      set_next_filename (splitmux);
       break;
     }
     case GST_STATE_CHANGE_READY_TO_PAUSED:{
@@ -1792,12 +1907,29 @@ gst_splitmux_sink_change_state (GstElement * element, GstStateChange transition)
     goto beach;
 
   switch (transition) {
+    case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
+      splitmux->need_async_start = TRUE;
+      break;
+    case GST_STATE_CHANGE_READY_TO_PAUSED:{
+      /* Change state async, because our child sink might not
+       * be ready to do that for us yet if it's state is still locked */
+
+      splitmux->need_async_start = TRUE;
+      /* we want to go async to PAUSED until we managed to configure and add the
+       * sink */
+      GST_SPLITMUX_LOCK (splitmux);
+      do_async_start (splitmux);
+      GST_SPLITMUX_UNLOCK (splitmux);
+      ret = GST_STATE_CHANGE_ASYNC;
+      break;
+    }
     case GST_STATE_CHANGE_READY_TO_NULL:
       GST_SPLITMUX_LOCK (splitmux);
       splitmux->fragment_id = 0;
       /* Reset internal elements only if no pad contexts are using them */
       if (splitmux->contexts == NULL)
         gst_splitmux_reset (splitmux);
+      do_async_done (splitmux);
       GST_SPLITMUX_UNLOCK (splitmux);
       break;
     default:
@@ -1805,12 +1937,14 @@ gst_splitmux_sink_change_state (GstElement * element, GstStateChange transition)
   }
 
 beach:
-
   if (transition == GST_STATE_CHANGE_NULL_TO_READY &&
       ret == GST_STATE_CHANGE_FAILURE) {
     /* Cleanup elements on failed transition out of NULL */
     gst_splitmux_reset (splitmux);
   }
+  GST_SPLITMUX_LOCK (splitmux);
+  do_async_done (splitmux);
+  GST_SPLITMUX_UNLOCK (splitmux);
   return ret;
 }
 
index 534f54a..11eb5d5 100644 (file)
@@ -82,6 +82,9 @@ typedef struct _MqStreamCtx
   GstPad *srcpad;
 
   gboolean out_blocked;
+
+  GstBuffer *cur_buffer;
+  GstEvent *pending_gap;
 } MqStreamCtx;
 
 struct _GstSplitMuxSink {
@@ -131,6 +134,9 @@ struct _GstSplitMuxSink {
   gboolean switching_fragment;
 
   gboolean have_video;
+
+  gboolean need_async_start;
+  gboolean async_pending;
 };
 
 struct _GstSplitMuxSinkClass {
index 7f4896d..ce11da9 100644 (file)
@@ -192,6 +192,20 @@ GST_START_TEST (test_splitmuxsrc_format_location)
 
 GST_END_TEST;
 
+static gchar *
+check_format_location (GstElement * object,
+    guint fragment_id, GstSample * first_sample)
+{
+  GstBuffer *buf = gst_sample_get_buffer (first_sample);
+
+  /* Must have a buffer */
+  fail_if (buf == NULL);
+  GST_LOG ("New file - first buffer %" GST_TIME_FORMAT,
+      GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (buf)));
+
+  return NULL;
+}
+
 GST_START_TEST (test_splitmuxsink)
 {
   GstMessage *msg;
@@ -213,6 +227,8 @@ GST_START_TEST (test_splitmuxsink)
   fail_if (pipeline == NULL);
   sink = gst_bin_get_by_name (GST_BIN (pipeline), "splitsink");
   fail_if (sink == NULL);
+  g_signal_connect (sink, "format-location-full",
+      (GCallback) check_format_location, NULL);
   dest_pattern = g_build_filename (tmpdir, "out%05d.ogg", NULL);
   g_object_set (G_OBJECT (sink), "location", dest_pattern, NULL);
   g_free (dest_pattern);