splitmuxsink: Reset ready_for_output on state change
[platform/upstream/gst-plugins-good.git] / gst / multifile / gstsplitmuxsink.c
index 7f5daee..4f8865b 100644 (file)
@@ -24,7 +24,7 @@
  * This element wraps a muxer and a sink, and starts a new file when the mux
  * contents are about to cross a threshold of maximum size of maximum time,
  * splitting at video keyframe boundaries. Exactly one input video stream
- * is required, with as many accompanying audio and subtitle streams as
+ * can be muxed, with as many accompanying audio and subtitle streams as
  * desired.
  *
  * By default, it uses mp4mux and filesink, but they can be changed via
  * The minimum file size is 1 GOP, however - so limits may be overrun if the
  * distance between any 2 keyframes is larger than the limits.
  *
- * The splitting process is driven by the video stream contents, and
- * the video stream must contain closed GOPs for the output file parts
- * to be played individually correctly.
+ * If a video stream is available, the splitting process is driven by the video
+ * stream contents, and the video stream must contain closed GOPs for the output
+ * file parts to be played individually correctly. In the absence of a video
+ * stream, the first available stream is used as reference for synchronization.
  *
  * <refsect2>
  * <title>Example pipelines</title>
@@ -53,6 +54,8 @@
 #endif
 
 #include <string.h>
+#include <glib/gstdio.h>
+#include <gst/video/video.h>
 #include "gstsplitmuxsink.h"
 
 GST_DEBUG_CATEGORY_STATIC (splitmux_debug);
@@ -60,8 +63,11 @@ GST_DEBUG_CATEGORY_STATIC (splitmux_debug);
 
 #define GST_SPLITMUX_LOCK(s) g_mutex_lock(&(s)->lock)
 #define GST_SPLITMUX_UNLOCK(s) g_mutex_unlock(&(s)->lock)
-#define GST_SPLITMUX_WAIT(s) g_cond_wait (&(s)->data_cond, &(s)->lock)
-#define GST_SPLITMUX_BROADCAST(s) g_cond_broadcast (&(s)->data_cond)
+#define GST_SPLITMUX_WAIT_INPUT(s) g_cond_wait (&(s)->input_cond, &(s)->lock)
+#define GST_SPLITMUX_BROADCAST_INPUT(s) g_cond_broadcast (&(s)->input_cond)
+
+#define GST_SPLITMUX_WAIT_OUTPUT(s) g_cond_wait (&(s)->output_cond, &(s)->lock)
+#define GST_SPLITMUX_BROADCAST_OUTPUT(s) g_cond_broadcast (&(s)->output_cond)
 
 enum
 {
@@ -69,6 +75,9 @@ enum
   PROP_LOCATION,
   PROP_MAX_SIZE_TIME,
   PROP_MAX_SIZE_BYTES,
+  PROP_MAX_SIZE_TIMECODE,
+  PROP_SEND_KEYFRAME_REQUESTS,
+  PROP_MAX_FILES,
   PROP_MUXER_OVERHEAD,
   PROP_MUXER,
   PROP_SINK
@@ -76,10 +85,21 @@ enum
 
 #define DEFAULT_MAX_SIZE_TIME       0
 #define DEFAULT_MAX_SIZE_BYTES      0
+#define DEFAULT_MAX_FILES           0
 #define DEFAULT_MUXER_OVERHEAD      0.02
+#define DEFAULT_SEND_KEYFRAME_REQUESTS FALSE
 #define DEFAULT_MUXER "mp4mux"
 #define DEFAULT_SINK "filesink"
 
+enum
+{
+  SIGNAL_FORMAT_LOCATION,
+  SIGNAL_FORMAT_LOCATION_FULL,
+  SIGNAL_LAST
+};
+
+static guint signals[SIGNAL_LAST];
+
 static GstStaticPadTemplate video_sink_template =
 GST_STATIC_PAD_TEMPLATE ("video",
     GST_PAD_SINK,
@@ -108,7 +128,7 @@ _do_init (void)
 G_DEFINE_TYPE_EXTENDED (GstSplitMuxSink, gst_splitmux_sink, GST_TYPE_BIN, 0,
     _do_init ());
 
-static gboolean create_elements (GstSplitMuxSink * splitmux);
+static gboolean create_muxer (GstSplitMuxSink * splitmux);
 static gboolean create_sink (GstSplitMuxSink * splitmux);
 static void gst_splitmux_sink_set_property (GObject * object, guint prop_id,
     const GValue * value, GParamSpec * pspec);
@@ -125,10 +145,16 @@ static GstStateChangeReturn gst_splitmux_sink_change_state (GstElement *
     element, GstStateChange transition);
 
 static void bus_handler (GstBin * bin, GstMessage * msg);
-static void set_next_filename (GstSplitMuxSink * splitmux);
-static void start_next_fragment (GstSplitMuxSink * splitmux);
-static void check_queue_length (GstSplitMuxSink * splitmux, MqStreamCtx * ctx);
+static void set_next_filename (GstSplitMuxSink * splitmux, MqStreamCtx * ctx);
+static void start_next_fragment (GstSplitMuxSink * splitmux, MqStreamCtx * ctx);
 static void mq_stream_ctx_unref (MqStreamCtx * ctx);
+static void grow_blocked_queues (GstSplitMuxSink * splitmux);
+
+static void gst_splitmux_sink_ensure_max_files (GstSplitMuxSink * splitmux);
+static GstElement *create_element (GstSplitMuxSink * splitmux,
+    const gchar * factory, const gchar * name, gboolean locked);
+
+static void do_async_done (GstSplitMuxSink * splitmux);
 
 static MqStreamBuf *
 mq_stream_buf_new (void)
@@ -142,6 +168,18 @@ mq_stream_buf_free (MqStreamBuf * data)
   g_slice_free (MqStreamBuf, data);
 }
 
+static SplitMuxOutputCommand *
+out_cmd_buf_new (void)
+{
+  return g_slice_new0 (SplitMuxOutputCommand);
+}
+
+static void
+out_cmd_buf_free (SplitMuxOutputCommand * data)
+{
+  g_slice_free (SplitMuxOutputCommand, data);
+}
+
 static void
 gst_splitmux_sink_class_init (GstSplitMuxSinkClass * klass)
 {
@@ -159,12 +197,12 @@ gst_splitmux_sink_class_init (GstSplitMuxSinkClass * klass)
       "Convenience bin that muxes incoming streams into multiple time/size limited files",
       "Jan Schmidt <jan@centricular.com>");
 
-  gst_element_class_add_pad_template (gstelement_class,
-      gst_static_pad_template_get (&video_sink_template));
-  gst_element_class_add_pad_template (gstelement_class,
-      gst_static_pad_template_get (&audio_sink_template));
-  gst_element_class_add_pad_template (gstelement_class,
-      gst_static_pad_template_get (&subtitle_sink_template));
+  gst_element_class_add_static_pad_template (gstelement_class,
+      &video_sink_template);
+  gst_element_class_add_static_pad_template (gstelement_class,
+      &audio_sink_template);
+  gst_element_class_add_static_pad_template (gstelement_class,
+      &subtitle_sink_template);
 
   gstelement_class->change_state =
       GST_DEBUG_FUNCPTR (gst_splitmux_sink_change_state);
@@ -193,6 +231,26 @@ gst_splitmux_sink_class_init (GstSplitMuxSinkClass * klass)
       g_param_spec_uint64 ("max-size-bytes", "Max. size bytes",
           "Max. amount of data per file (in bytes, 0=disable)", 0, G_MAXUINT64,
           DEFAULT_MAX_SIZE_BYTES, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
+  g_object_class_install_property (gobject_class, PROP_MAX_SIZE_TIMECODE,
+      g_param_spec_string ("max-size-timecode", "Maximum timecode difference",
+          "Maximum difference in timecode between first and last frame. "
+          "Separator is assumed to be \":\" everywhere (e.g. 01:00:00:00). "
+          "Will only be effective if a timecode track is present.",
+          NULL, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
+  g_object_class_install_property (gobject_class, PROP_SEND_KEYFRAME_REQUESTS,
+      g_param_spec_boolean ("send-keyframe-requests",
+          "Request keyframes at max-size-time",
+          "Request a keyframe every max-size-time ns to try splitting at that point. "
+          "Needs max-size-bytes to be 0 in order to be effective.",
+          DEFAULT_SEND_KEYFRAME_REQUESTS,
+          G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
+  g_object_class_install_property (gobject_class, PROP_MAX_FILES,
+      g_param_spec_uint ("max-files", "Max files",
+          "Maximum number of files to keep on disk. Once the maximum is reached,"
+          "old files start to be deleted to make room for new ones.", 0,
+          G_MAXUINT, DEFAULT_MAX_FILES,
+          G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
+
 
   g_object_class_install_property (gobject_class, PROP_MUXER,
       g_param_spec_object ("muxer", "Muxer",
@@ -202,17 +260,49 @@ gst_splitmux_sink_class_init (GstSplitMuxSinkClass * klass)
       g_param_spec_object ("sink", "Sink",
           "The sink element (or element chain) to use (NULL = default filesink)",
           GST_TYPE_ELEMENT, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
+
+  /**
+   * GstSplitMuxSink::format-location:
+   * @splitmux: the #GstSplitMuxSink
+   * @fragment_id: the sequence number of the file to be created
+   *
+   * Returns: the location to be used for the next output file
+   */
+  signals[SIGNAL_FORMAT_LOCATION] =
+      g_signal_new ("format-location", G_TYPE_FROM_CLASS (klass),
+      G_SIGNAL_RUN_LAST, 0, NULL, NULL, NULL, G_TYPE_STRING, 1, G_TYPE_UINT);
+
+  /**
+   * GstSplitMuxSink::format-location-full:
+   * @splitmux: the #GstSplitMuxSink
+   * @fragment_id: the sequence number of the file to be created
+   * @first_sample: A #GstSample containing the first buffer
+   *   from the reference stream in the new file
+   *
+   * Returns: the location to be used for the next output file
+   */
+  signals[SIGNAL_FORMAT_LOCATION_FULL] =
+      g_signal_new ("format-location-full", G_TYPE_FROM_CLASS (klass),
+      G_SIGNAL_RUN_LAST, 0, NULL, NULL, NULL, G_TYPE_STRING, 2, G_TYPE_UINT,
+      GST_TYPE_SAMPLE);
 }
 
 static void
 gst_splitmux_sink_init (GstSplitMuxSink * splitmux)
 {
   g_mutex_init (&splitmux->lock);
-  g_cond_init (&splitmux->data_cond);
+  g_cond_init (&splitmux->input_cond);
+  g_cond_init (&splitmux->output_cond);
+  g_queue_init (&splitmux->out_cmd_q);
 
   splitmux->mux_overhead = DEFAULT_MUXER_OVERHEAD;
   splitmux->threshold_time = DEFAULT_MAX_SIZE_TIME;
   splitmux->threshold_bytes = DEFAULT_MAX_SIZE_BYTES;
+  splitmux->max_files = DEFAULT_MAX_FILES;
+  splitmux->send_keyframe_requests = DEFAULT_SEND_KEYFRAME_REQUESTS;
+  splitmux->next_max_tc_time = GST_CLOCK_TIME_NONE;
+
+  splitmux->threshold_timecode_str = NULL;
 
   GST_OBJECT_FLAG_SET (splitmux, GST_ELEMENT_FLAG_SINK);
 }
@@ -220,15 +310,18 @@ gst_splitmux_sink_init (GstSplitMuxSink * splitmux)
 static void
 gst_splitmux_reset (GstSplitMuxSink * splitmux)
 {
-  if (splitmux->mq)
-    gst_bin_remove (GST_BIN (splitmux), splitmux->mq);
-  if (splitmux->muxer)
+  if (splitmux->muxer) {
+    gst_element_set_locked_state (splitmux->muxer, TRUE);
+    gst_element_set_state (splitmux->muxer, GST_STATE_NULL);
     gst_bin_remove (GST_BIN (splitmux), splitmux->muxer);
-  if (splitmux->active_sink)
+  }
+  if (splitmux->active_sink) {
+    gst_element_set_locked_state (splitmux->active_sink, TRUE);
+    gst_element_set_state (splitmux->active_sink, GST_STATE_NULL);
     gst_bin_remove (GST_BIN (splitmux), splitmux->active_sink);
+  }
 
-  splitmux->sink = splitmux->active_sink = splitmux->muxer = splitmux->mq =
-      NULL;
+  splitmux->sink = splitmux->active_sink = splitmux->muxer = NULL;
 }
 
 static void
@@ -236,23 +329,30 @@ gst_splitmux_sink_dispose (GObject * object)
 {
   GstSplitMuxSink *splitmux = GST_SPLITMUX_SINK (object);
 
-  G_OBJECT_CLASS (parent_class)->dispose (object);
-
   /* Calling parent dispose invalidates all child pointers */
-  splitmux->sink = splitmux->active_sink = splitmux->muxer = splitmux->mq =
-      NULL;
+  splitmux->sink = splitmux->active_sink = splitmux->muxer = NULL;
+
+  G_OBJECT_CLASS (parent_class)->dispose (object);
 }
 
 static void
 gst_splitmux_sink_finalize (GObject * object)
 {
   GstSplitMuxSink *splitmux = GST_SPLITMUX_SINK (object);
-  g_cond_clear (&splitmux->data_cond);
+  g_cond_clear (&splitmux->input_cond);
+  g_cond_clear (&splitmux->output_cond);
+  g_mutex_clear (&splitmux->lock);
+  g_queue_foreach (&splitmux->out_cmd_q, (GFunc) out_cmd_buf_free, NULL);
+  g_queue_clear (&splitmux->out_cmd_q);
+
   if (splitmux->provided_sink)
     gst_object_unref (splitmux->provided_sink);
   if (splitmux->provided_muxer)
     gst_object_unref (splitmux->provided_muxer);
 
+  if (splitmux->threshold_timecode_str)
+    g_free (splitmux->threshold_timecode_str);
+
   g_free (splitmux->location);
 
   /* Make sure to free any un-released contexts */
@@ -286,24 +386,41 @@ gst_splitmux_sink_set_property (GObject * object, guint prop_id,
       splitmux->threshold_time = g_value_get_uint64 (value);
       GST_OBJECT_UNLOCK (splitmux);
       break;
+    case PROP_MAX_SIZE_TIMECODE:
+      GST_OBJECT_LOCK (splitmux);
+      splitmux->threshold_timecode_str = g_value_dup_string (value);
+      GST_OBJECT_UNLOCK (splitmux);
+      break;
+    case PROP_SEND_KEYFRAME_REQUESTS:
+      GST_OBJECT_LOCK (splitmux);
+      splitmux->send_keyframe_requests = g_value_get_boolean (value);
+      GST_OBJECT_UNLOCK (splitmux);
+      break;
+    case PROP_MAX_FILES:
+      GST_OBJECT_LOCK (splitmux);
+      splitmux->max_files = g_value_get_uint (value);
+      GST_OBJECT_UNLOCK (splitmux);
+      break;
     case PROP_MUXER_OVERHEAD:
       GST_OBJECT_LOCK (splitmux);
       splitmux->mux_overhead = g_value_get_double (value);
       GST_OBJECT_UNLOCK (splitmux);
       break;
     case PROP_SINK:
-      GST_SPLITMUX_LOCK (splitmux);
+      GST_OBJECT_LOCK (splitmux);
       if (splitmux->provided_sink)
         gst_object_unref (splitmux->provided_sink);
-      splitmux->provided_sink = g_value_dup_object (value);
-      GST_SPLITMUX_UNLOCK (splitmux);
+      splitmux->provided_sink = g_value_get_object (value);
+      gst_object_ref_sink (splitmux->provided_sink);
+      GST_OBJECT_UNLOCK (splitmux);
       break;
     case PROP_MUXER:
-      GST_SPLITMUX_LOCK (splitmux);
+      GST_OBJECT_LOCK (splitmux);
       if (splitmux->provided_muxer)
         gst_object_unref (splitmux->provided_muxer);
-      splitmux->provided_muxer = g_value_dup_object (value);
-      GST_SPLITMUX_UNLOCK (splitmux);
+      splitmux->provided_muxer = g_value_get_object (value);
+      gst_object_ref_sink (splitmux->provided_muxer);
+      GST_OBJECT_UNLOCK (splitmux);
       break;
     default:
       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
@@ -333,20 +450,35 @@ gst_splitmux_sink_get_property (GObject * object, guint prop_id,
       g_value_set_uint64 (value, splitmux->threshold_time);
       GST_OBJECT_UNLOCK (splitmux);
       break;
+    case PROP_MAX_SIZE_TIMECODE:
+      GST_OBJECT_LOCK (splitmux);
+      g_value_set_string (value, splitmux->threshold_timecode_str);
+      GST_OBJECT_UNLOCK (splitmux);
+      break;
+    case PROP_SEND_KEYFRAME_REQUESTS:
+      GST_OBJECT_LOCK (splitmux);
+      g_value_set_boolean (value, splitmux->send_keyframe_requests);
+      GST_OBJECT_UNLOCK (splitmux);
+      break;
+    case PROP_MAX_FILES:
+      GST_OBJECT_LOCK (splitmux);
+      g_value_set_uint (value, splitmux->max_files);
+      GST_OBJECT_UNLOCK (splitmux);
+      break;
     case PROP_MUXER_OVERHEAD:
       GST_OBJECT_LOCK (splitmux);
       g_value_set_double (value, splitmux->mux_overhead);
       GST_OBJECT_UNLOCK (splitmux);
       break;
     case PROP_SINK:
-      GST_SPLITMUX_LOCK (splitmux);
+      GST_OBJECT_LOCK (splitmux);
       g_value_set_object (value, splitmux->provided_sink);
-      GST_SPLITMUX_UNLOCK (splitmux);
+      GST_OBJECT_UNLOCK (splitmux);
       break;
     case PROP_MUXER:
-      GST_SPLITMUX_LOCK (splitmux);
+      GST_OBJECT_LOCK (splitmux);
       g_value_set_object (value, splitmux->provided_muxer);
-      GST_SPLITMUX_UNLOCK (splitmux);
+      GST_OBJECT_UNLOCK (splitmux);
       break;
     default:
       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
@@ -354,49 +486,21 @@ gst_splitmux_sink_get_property (GObject * object, guint prop_id,
   }
 }
 
-static GstPad *
-mq_sink_to_src (GstElement * mq, GstPad * sink_pad)
-{
-  gchar *tmp, *sinkname, *srcname;
-  GstPad *mq_src;
-
-  sinkname = gst_pad_get_name (sink_pad);
-  tmp = sinkname + 5;
-  srcname = g_strdup_printf ("src_%s", tmp);
-
-  mq_src = gst_element_get_static_pad (mq, srcname);
-
-  g_free (sinkname);
-  g_free (srcname);
-
-  return mq_src;
-}
-
-static gboolean
-get_pads_from_mq (GstSplitMuxSink * splitmux, GstPad ** sink_pad,
-    GstPad ** src_pad)
+/* Convenience function */
+static inline GstClockTimeDiff
+my_segment_to_running_time (GstSegment * segment, GstClockTime val)
 {
-  GstPad *mq_sink;
-  GstPad *mq_src;
-
-  /* Request a pad from multiqueue, then connect this one, then
-   * discover the corresponding output pad and return both */
-  mq_sink = gst_element_get_request_pad (splitmux->mq, "sink_%u");
-  if (mq_sink == NULL)
-    return FALSE;
-
-  mq_src = mq_sink_to_src (splitmux->mq, mq_sink);
-  if (mq_src == NULL)
-    goto fail;
-
-  *sink_pad = mq_sink;
-  *src_pad = mq_src;
-
-  return TRUE;
-
-fail:
-  gst_element_release_request_pad (splitmux->mq, mq_sink);
-  return FALSE;
+  GstClockTimeDiff res = GST_CLOCK_STIME_NONE;
+
+  if (GST_CLOCK_TIME_IS_VALID (val)) {
+    gboolean sign =
+        gst_segment_to_running_time_full (segment, GST_FORMAT_TIME, val, &val);
+    if (sign > 0)
+      res = val;
+    else if (sign < 0)
+      res = -val;
+  }
+  return res;
 }
 
 static MqStreamCtx *
@@ -409,7 +513,7 @@ mq_stream_ctx_new (GstSplitMuxSink * splitmux)
   ctx->splitmux = splitmux;
   gst_segment_init (&ctx->in_segment, GST_FORMAT_UNDEFINED);
   gst_segment_init (&ctx->out_segment, GST_FORMAT_UNDEFINED);
-  ctx->in_running_time = ctx->out_running_time = 0;
+  ctx->in_running_time = ctx->out_running_time = GST_CLOCK_STIME_NONE;
   g_queue_init (&ctx->queued_bufs);
   return ctx;
 }
@@ -417,6 +521,16 @@ mq_stream_ctx_new (GstSplitMuxSink * splitmux)
 static void
 mq_stream_ctx_free (MqStreamCtx * ctx)
 {
+  if (ctx->q) {
+    g_signal_handler_disconnect (ctx->q, ctx->q_overrun_id);
+    gst_element_set_locked_state (ctx->q, TRUE);
+    gst_element_set_state (ctx->q, GST_STATE_NULL);
+    gst_bin_remove (GST_BIN (ctx->splitmux), ctx->q);
+    gst_object_unref (ctx->q);
+  }
+  gst_buffer_replace (&ctx->prev_in_keyframe, NULL);
+  gst_object_unref (ctx->sinkpad);
+  gst_object_unref (ctx->srcpad);
   g_queue_foreach (&ctx->queued_bufs, (GFunc) mq_stream_buf_free, NULL);
   g_queue_clear (&ctx->queued_bufs);
   g_free (ctx);
@@ -449,6 +563,26 @@ _pad_block_destroy_src_notify (MqStreamCtx * ctx)
   mq_stream_ctx_unref (ctx);
 }
 
+static void
+send_fragment_opened_closed_msg (GstSplitMuxSink * splitmux, gboolean opened)
+{
+  gchar *location = NULL;
+  GstMessage *msg;
+  const gchar *msg_name = opened ?
+      "splitmuxsink-fragment-opened" : "splitmuxsink-fragment-closed";
+
+  g_object_get (splitmux->sink, "location", &location, NULL);
+
+  msg = gst_message_new_element (GST_OBJECT (splitmux),
+      gst_structure_new (msg_name,
+          "location", G_TYPE_STRING, location,
+          "running-time", GST_TYPE_CLOCK_TIME,
+          splitmux->reference_ctx->out_running_time, NULL));
+  gst_element_post_message (GST_ELEMENT_CAST (splitmux), msg);
+
+  g_free (location);
+}
+
 /* Called with lock held, drops the lock to send EOS to the
  * pad
  */
@@ -479,43 +613,182 @@ static void
 complete_or_wait_on_out (GstSplitMuxSink * splitmux, MqStreamCtx * ctx)
 {
   do {
+    /* When first starting up, the reference stream has to output
+     * the first buffer to prepare the muxer and sink */
+    gboolean can_output = (ctx->is_reference || splitmux->ready_for_output);
+
+    if (ctx->flushing
+        || splitmux->output_state == SPLITMUX_OUTPUT_STATE_STOPPED)
+      return;
 
     GST_LOG_OBJECT (ctx->srcpad,
-        "Checking running time %" GST_TIME_FORMAT " against max %"
-        GST_TIME_FORMAT, GST_TIME_ARGS (ctx->out_running_time),
-        GST_TIME_ARGS (splitmux->max_out_running_time));
+        "Checking running time %" GST_STIME_FORMAT " against max %"
+        GST_STIME_FORMAT, GST_STIME_ARGS (ctx->out_running_time),
+        GST_STIME_ARGS (splitmux->max_out_running_time));
 
-    if (splitmux->max_out_running_time == GST_CLOCK_TIME_NONE ||
-        ctx->out_running_time < splitmux->max_out_running_time)
-      return;
+    if (can_output) {
+      if (splitmux->max_out_running_time == GST_CLOCK_STIME_NONE ||
+          ctx->out_running_time < splitmux->max_out_running_time) {
+        return;
+      }
 
-    if (ctx->flushing || splitmux->state == SPLITMUX_STATE_STOPPED)
-      return;
+      switch (splitmux->output_state) {
+        case SPLITMUX_OUTPUT_STATE_OUTPUT_GOP:
+          /* We only get here if we've finished outputting a GOP and need to know
+           * what to do next */
+          splitmux->output_state = SPLITMUX_OUTPUT_STATE_AWAITING_COMMAND;
+          GST_SPLITMUX_BROADCAST_OUTPUT (splitmux);
+          continue;
+
+        case SPLITMUX_OUTPUT_STATE_ENDING_FILE:
+          /* We've reached the max out running_time to get here, so end this file now */
+          if (ctx->out_eos == FALSE) {
+            send_eos (splitmux, ctx);
+            continue;
+          }
+          break;
+        case SPLITMUX_OUTPUT_STATE_START_NEXT_FILE:
+          if (ctx->is_reference) {
+            /* Special handling on the reference ctx to start new fragments
+             * and collect commands from the command queue */
+            /* drops the splitmux lock briefly: */
+            start_next_fragment (splitmux, ctx);
+            continue;
+          }
+          break;
 
-    if (splitmux->state == SPLITMUX_STATE_ENDING_FILE) {
-      if (ctx->out_eos == FALSE) {
-        send_eos (splitmux, ctx);
-        continue;
+        case SPLITMUX_OUTPUT_STATE_AWAITING_COMMAND:{
+          do {
+            SplitMuxOutputCommand *cmd =
+                g_queue_pop_tail (&splitmux->out_cmd_q);
+            if (cmd != NULL) {
+              /* If we pop the last command, we need to make our queues bigger */
+              if (g_queue_get_length (&splitmux->out_cmd_q) == 0)
+                grow_blocked_queues (splitmux);
+
+              if (cmd->start_new_fragment) {
+                GST_DEBUG_OBJECT (splitmux, "Got cmd to start new fragment");
+                splitmux->output_state = SPLITMUX_OUTPUT_STATE_ENDING_FILE;
+              } else {
+                GST_DEBUG_OBJECT (splitmux,
+                    "Got new output cmd for time %" GST_STIME_FORMAT,
+                    GST_STIME_ARGS (cmd->max_output_ts));
+
+                /* Extend the output range immediately */
+                splitmux->max_out_running_time = cmd->max_output_ts;
+                splitmux->output_state = SPLITMUX_OUTPUT_STATE_OUTPUT_GOP;
+              }
+              GST_SPLITMUX_BROADCAST_OUTPUT (splitmux);
+
+              out_cmd_buf_free (cmd);
+              break;
+            } else {
+              GST_SPLITMUX_WAIT_OUTPUT (splitmux);
+            }
+          } while (splitmux->output_state ==
+              SPLITMUX_OUTPUT_STATE_AWAITING_COMMAND);
+          /* loop and re-check the state */
+          continue;
+        }
+        case SPLITMUX_OUTPUT_STATE_STOPPED:
+          return;
       }
-    } else if (splitmux->state == SPLITMUX_STATE_START_NEXT_FRAGMENT) {
-      start_next_fragment (splitmux);
-      continue;
     }
 
     GST_INFO_OBJECT (ctx->srcpad,
         "Sleeping for running time %"
-        GST_TIME_FORMAT " (max %" GST_TIME_FORMAT ")",
-        GST_TIME_ARGS (ctx->out_running_time),
-        GST_TIME_ARGS (splitmux->max_out_running_time));
-    ctx->out_blocked = TRUE;
-    /* Expand the mq if needed before sleeping */
-    check_queue_length (splitmux, ctx);
-    GST_SPLITMUX_WAIT (splitmux);
-    ctx->out_blocked = FALSE;
+        GST_STIME_FORMAT " (max %" GST_STIME_FORMAT ") or state change.",
+        GST_STIME_ARGS (ctx->out_running_time),
+        GST_STIME_ARGS (splitmux->max_out_running_time));
+    GST_SPLITMUX_WAIT_OUTPUT (splitmux);
     GST_INFO_OBJECT (ctx->srcpad,
-        "Woken for new max running time %" GST_TIME_FORMAT,
-        GST_TIME_ARGS (splitmux->max_out_running_time));
-  } while (1);
+        "Woken for new max running time %" GST_STIME_FORMAT,
+        GST_STIME_ARGS (splitmux->max_out_running_time));
+  }
+  while (1);
+}
+
+static GstClockTime
+calculate_next_max_timecode (GstSplitMuxSink * splitmux,
+    const GstVideoTimeCode * cur_tc)
+{
+  GstVideoTimeCode *target_tc;
+  GstVideoTimeCodeInterval *tc_inter;
+  GstClockTime cur_tc_time, target_tc_time, next_max_tc_time;
+
+  if (cur_tc == NULL || splitmux->threshold_timecode_str == NULL)
+    return GST_CLOCK_TIME_NONE;
+
+  tc_inter =
+      gst_video_time_code_interval_new_from_string
+      (splitmux->threshold_timecode_str);
+  target_tc = gst_video_time_code_add_interval (cur_tc, tc_inter);
+  gst_video_time_code_interval_free (tc_inter);
+
+  /* Convert to ns */
+  target_tc_time = gst_video_time_code_nsec_since_daily_jam (target_tc);
+  cur_tc_time = gst_video_time_code_nsec_since_daily_jam (cur_tc);
+
+  /* Add fragment_start_time, accounting for wraparound */
+  if (target_tc_time >= cur_tc_time) {
+    next_max_tc_time =
+        target_tc_time - cur_tc_time + splitmux->fragment_start_time;
+  } else {
+    GstClockTime day_in_ns = 24 * 60 * 60 * GST_SECOND;
+
+    next_max_tc_time =
+        day_in_ns - cur_tc_time + target_tc_time +
+        splitmux->fragment_start_time;
+  }
+  GST_INFO_OBJECT (splitmux, "Next max TC time: %" GST_TIME_FORMAT
+      " from ref TC: %" GST_TIME_FORMAT, GST_TIME_ARGS (next_max_tc_time),
+      GST_TIME_ARGS (cur_tc_time));
+  gst_video_time_code_free (target_tc);
+
+  return next_max_tc_time;
+}
+
+static gboolean
+request_next_keyframe (GstSplitMuxSink * splitmux, GstBuffer * buffer)
+{
+  GstEvent *ev;
+  GstClockTime target_time;
+  gboolean timecode_based = FALSE;
+
+  splitmux->next_max_tc_time = GST_CLOCK_TIME_NONE;
+  if (splitmux->threshold_timecode_str) {
+    GstVideoTimeCodeMeta *tc_meta;
+
+    if (buffer != NULL) {
+      tc_meta = gst_buffer_get_video_time_code_meta (buffer);
+      if (tc_meta) {
+        splitmux->next_max_tc_time =
+            calculate_next_max_timecode (splitmux, &tc_meta->tc);
+        timecode_based = (splitmux->next_max_tc_time != GST_CLOCK_TIME_NONE);
+      }
+    } else {
+      /* This can happen in the presence of GAP events that trigger
+       * a new fragment start */
+      GST_WARNING_OBJECT (splitmux,
+          "No buffer available to calculate next timecode");
+    }
+  }
+
+  if (splitmux->send_keyframe_requests == FALSE
+      || (splitmux->threshold_time == 0 && !timecode_based)
+      || splitmux->threshold_bytes != 0)
+    return TRUE;
+
+  if (timecode_based) {
+    /* We might have rounding errors: aim slightly earlier */
+    target_time = splitmux->next_max_tc_time - 5 * GST_USECOND;
+  } else {
+    target_time = splitmux->fragment_start_time + splitmux->threshold_time;
+  }
+  ev = gst_video_event_new_upstream_force_key_unit (target_time, TRUE, 0);
+  GST_INFO_OBJECT (splitmux, "Requesting next keyframe at %" GST_TIME_FORMAT,
+      GST_TIME_ARGS (target_time));
+  return gst_pad_push_event (splitmux->reference_ctx->sinkpad, ev);
 }
 
 static GstPadProbeReturn
@@ -524,7 +797,7 @@ handle_mq_output (GstPad * pad, GstPadProbeInfo * info, MqStreamCtx * ctx)
   GstSplitMuxSink *splitmux = ctx->splitmux;
   MqStreamBuf *buf_info = NULL;
 
-  GST_LOG_OBJECT (pad, "Fired probe type 0x%x\n", info->type);
+  GST_LOG_OBJECT (pad, "Fired probe type 0x%x", info->type);
 
   /* FIXME: Handle buffer lists, until then make it clear they won't work */
   if (info->type & GST_PAD_PROBE_TYPE_BUFFER_LIST) {
@@ -533,6 +806,7 @@ handle_mq_output (GstPad * pad, GstPadProbeInfo * info, MqStreamCtx * ctx)
   }
   if (info->type & GST_PAD_PROBE_TYPE_EVENT_DOWNSTREAM) {
     GstEvent *event = gst_pad_probe_info_get_event (info);
+    gboolean locked = FALSE;
 
     GST_LOG_OBJECT (pad, "Event %" GST_PTR_FORMAT, event);
 
@@ -542,51 +816,97 @@ handle_mq_output (GstPad * pad, GstPadProbeInfo * info, MqStreamCtx * ctx)
         break;
       case GST_EVENT_FLUSH_STOP:
         GST_SPLITMUX_LOCK (splitmux);
+        locked = TRUE;
         gst_segment_init (&ctx->out_segment, GST_FORMAT_UNDEFINED);
         g_queue_foreach (&ctx->queued_bufs, (GFunc) mq_stream_buf_free, NULL);
         g_queue_clear (&ctx->queued_bufs);
         ctx->flushing = FALSE;
-        GST_SPLITMUX_UNLOCK (splitmux);
         break;
       case GST_EVENT_FLUSH_START:
         GST_SPLITMUX_LOCK (splitmux);
+        locked = TRUE;
         GST_LOG_OBJECT (pad, "Flush start");
         ctx->flushing = TRUE;
-        GST_SPLITMUX_BROADCAST (splitmux);
-        GST_SPLITMUX_UNLOCK (splitmux);
+        GST_SPLITMUX_BROADCAST_OUTPUT (splitmux);
         break;
       case GST_EVENT_EOS:
         GST_SPLITMUX_LOCK (splitmux);
-        if (splitmux->state == SPLITMUX_STATE_STOPPED)
+        locked = TRUE;
+        if (splitmux->output_state == SPLITMUX_OUTPUT_STATE_STOPPED)
           goto beach;
         ctx->out_eos = TRUE;
-        GST_SPLITMUX_UNLOCK (splitmux);
         break;
       case GST_EVENT_GAP:{
         GstClockTime gap_ts;
+        GstClockTimeDiff rtime;
 
         gst_event_parse_gap (event, &gap_ts, NULL);
         if (gap_ts == GST_CLOCK_TIME_NONE)
           break;
 
         GST_SPLITMUX_LOCK (splitmux);
+        locked = TRUE;
+
+        if (splitmux->output_state == SPLITMUX_OUTPUT_STATE_STOPPED)
+          goto beach;
+
+        /* When we get a gap event on the
+         * reference stream and we're trying to open a
+         * new file, we need to store it until we get
+         * the buffer afterwards
+         */
+        if (ctx->is_reference &&
+            (splitmux->output_state != SPLITMUX_OUTPUT_STATE_OUTPUT_GOP)) {
+          GST_DEBUG_OBJECT (pad, "Storing GAP event until buffer arrives");
+          gst_event_replace (&ctx->pending_gap, event);
+          GST_SPLITMUX_UNLOCK (splitmux);
+          return GST_PAD_PROBE_HANDLED;
+        }
 
-        gap_ts = gst_segment_to_running_time (&ctx->out_segment,
-            GST_FORMAT_TIME, gap_ts);
+        rtime = my_segment_to_running_time (&ctx->out_segment, gap_ts);
 
-        GST_LOG_OBJECT (pad, "Have GAP w/ ts %" GST_TIME_FORMAT,
-            GST_TIME_ARGS (gap_ts));
+        GST_LOG_OBJECT (pad, "Have GAP w/ ts %" GST_STIME_FORMAT,
+            GST_STIME_ARGS (rtime));
 
-        if (splitmux->state == SPLITMUX_STATE_STOPPED)
+        if (rtime != GST_CLOCK_STIME_NONE) {
+          ctx->out_running_time = rtime;
+          complete_or_wait_on_out (splitmux, ctx);
+        }
+        break;
+      }
+      case GST_EVENT_CUSTOM_DOWNSTREAM:{
+        const GstStructure *s;
+        GstClockTimeDiff ts = 0;
+
+        s = gst_event_get_structure (event);
+        if (!gst_structure_has_name (s, "splitmuxsink-unblock"))
+          break;
+
+        gst_structure_get_int64 (s, "timestamp", &ts);
+
+        GST_SPLITMUX_LOCK (splitmux);
+        locked = TRUE;
+
+        if (splitmux->output_state == SPLITMUX_OUTPUT_STATE_STOPPED)
           goto beach;
-        ctx->out_running_time = gap_ts;
-        complete_or_wait_on_out (splitmux, ctx);
+        ctx->out_running_time = ts;
+        if (!ctx->is_reference)
+          complete_or_wait_on_out (splitmux, ctx);
         GST_SPLITMUX_UNLOCK (splitmux);
-        break;
+        return GST_PAD_PROBE_DROP;
       }
       default:
         break;
     }
+
+    /* We need to make sure events aren't passed
+     * until the muxer / sink are ready for it */
+    if (!locked)
+      GST_SPLITMUX_LOCK (splitmux);
+    if (!ctx->is_reference)
+      complete_or_wait_on_out (splitmux, ctx);
+    GST_SPLITMUX_UNLOCK (splitmux);
+
     return GST_PAD_PROBE_PASS;
   }
 
@@ -599,35 +919,47 @@ handle_mq_output (GstPad * pad, GstPadProbeInfo * info, MqStreamCtx * ctx)
     goto beach;
 
   /* If we have popped a keyframe, decrement the queued_gop count */
-  if (buf_info->keyframe && splitmux->queued_gops > 0)
-    splitmux->queued_gops--;
+  if (buf_info->keyframe && splitmux->queued_keyframes > 0)
+    splitmux->queued_keyframes--;
 
   ctx->out_running_time = buf_info->run_ts;
+  ctx->cur_out_buffer = gst_pad_probe_info_get_buffer (info);
 
   GST_LOG_OBJECT (splitmux,
-      "Pad %" GST_PTR_FORMAT " buffer with TS %" GST_TIME_FORMAT
-      " size %" G_GSIZE_FORMAT,
-      pad, GST_TIME_ARGS (ctx->out_running_time), buf_info->buf_size);
+      "Pad %" GST_PTR_FORMAT " buffer with run TS %" GST_STIME_FORMAT
+      " size %" G_GUINT64_FORMAT,
+      pad, GST_STIME_ARGS (ctx->out_running_time), buf_info->buf_size);
 
   complete_or_wait_on_out (splitmux, ctx);
 
-  if (splitmux->muxed_out_time == GST_CLOCK_TIME_NONE ||
-      splitmux->muxed_out_time < buf_info->run_ts)
-    splitmux->muxed_out_time = buf_info->run_ts;
-
   splitmux->muxed_out_bytes += buf_info->buf_size;
 
 #ifndef GST_DISABLE_GST_DEBUG
   {
     GstBuffer *buf = gst_pad_probe_info_get_buffer (info);
     GST_LOG_OBJECT (pad, "Returning to pass buffer %" GST_PTR_FORMAT
-        " run ts %" GST_TIME_FORMAT, buf,
-        GST_TIME_ARGS (ctx->out_running_time));
+        " run ts %" GST_STIME_FORMAT, buf,
+        GST_STIME_ARGS (ctx->out_running_time));
   }
 #endif
 
+  ctx->cur_out_buffer = NULL;
   GST_SPLITMUX_UNLOCK (splitmux);
 
+  /* pending_gap is protected by the STREAM lock */
+  if (ctx->pending_gap) {
+    /* If we previously stored a gap event, send it now */
+    GstPad *peer = gst_pad_get_peer (ctx->srcpad);
+
+    GST_DEBUG_OBJECT (splitmux,
+        "Pad %" GST_PTR_FORMAT " sending pending GAP event", ctx->srcpad);
+
+    gst_pad_send_event (peer, ctx->pending_gap);
+    ctx->pending_gap = NULL;
+
+    gst_object_unref (peer);
+  }
+
   mq_stream_buf_free (buf_info);
 
   return GST_PAD_PROBE_PASS;
@@ -651,8 +983,8 @@ restart_context (MqStreamCtx * ctx, GstSplitMuxSink * splitmux)
   gst_pad_sticky_events_foreach (ctx->srcpad,
       (GstPadStickyEventsForeachFunction) (resend_sticky), peer);
 
-  /* Clear EOS flag */
-  ctx->out_eos = FALSE;
+  /* Clear EOS flag if not actually EOS */
+  ctx->out_eos = GST_PAD_IS_EOS (ctx->srcpad);
 
   gst_object_unref (peer);
 }
@@ -662,35 +994,52 @@ restart_context (MqStreamCtx * ctx, GstSplitMuxSink * splitmux)
  * a new fragment
  */
 static void
-start_next_fragment (GstSplitMuxSink * splitmux)
+start_next_fragment (GstSplitMuxSink * splitmux, MqStreamCtx * ctx)
 {
+  GstElement *muxer, *sink;
+
   /* 1 change to new file */
-  gst_element_set_state (splitmux->muxer, GST_STATE_NULL);
-  gst_element_set_state (splitmux->active_sink, GST_STATE_NULL);
+  splitmux->switching_fragment = TRUE;
 
-  set_next_filename (splitmux);
+  /* We need to drop the splitmux lock to acquire the state lock
+   * here and ensure there's no racy state change going on elsewhere */
+  muxer = gst_object_ref (splitmux->muxer);
+  sink = gst_object_ref (splitmux->active_sink);
 
-  gst_element_sync_state_with_parent (splitmux->active_sink);
-  gst_element_sync_state_with_parent (splitmux->muxer);
+  GST_SPLITMUX_UNLOCK (splitmux);
+  GST_STATE_LOCK (splitmux);
 
-  g_list_foreach (splitmux->contexts, (GFunc) restart_context, splitmux);
+  gst_element_set_locked_state (muxer, TRUE);
+  gst_element_set_locked_state (sink, TRUE);
+  gst_element_set_state (muxer, GST_STATE_NULL);
+  gst_element_set_state (sink, GST_STATE_NULL);
 
-  /* Switch state and go back to processing */
-  splitmux->state = SPLITMUX_STATE_COLLECTING_GOP_START;
-  if (!splitmux->video_ctx->in_eos)
-    splitmux->max_out_running_time = splitmux->video_ctx->in_running_time;
-  else
-    splitmux->max_out_running_time = GST_CLOCK_TIME_NONE;
+  GST_SPLITMUX_LOCK (splitmux);
+  set_next_filename (splitmux, ctx);
+  GST_SPLITMUX_UNLOCK (splitmux);
+
+  gst_element_set_state (sink, GST_STATE_TARGET (splitmux));
+  gst_element_set_state (muxer, GST_STATE_TARGET (splitmux));
+  gst_element_set_locked_state (muxer, FALSE);
+  gst_element_set_locked_state (sink, FALSE);
 
-  /* Store the overflow parameters as the basis for the next fragment */
-  splitmux->mux_start_time = splitmux->muxed_out_time;
-  splitmux->mux_start_bytes = splitmux->muxed_out_bytes;
+  gst_object_unref (sink);
+  gst_object_unref (muxer);
 
-  GST_DEBUG_OBJECT (splitmux,
-      "Restarting flow for new fragment. New running time %" GST_TIME_FORMAT,
-      GST_TIME_ARGS (splitmux->max_out_running_time));
+  GST_SPLITMUX_LOCK (splitmux);
+  GST_STATE_UNLOCK (splitmux);
+  splitmux->switching_fragment = FALSE;
+  do_async_done (splitmux);
+
+  splitmux->ready_for_output = TRUE;
 
-  GST_SPLITMUX_BROADCAST (splitmux);
+  g_list_foreach (splitmux->contexts, (GFunc) restart_context, splitmux);
+
+  send_fragment_opened_closed_msg (splitmux, TRUE);
+
+  /* FIXME: Is this always the correct next state? */
+  splitmux->output_state = SPLITMUX_OUTPUT_STATE_AWAITING_COMMAND;
+  GST_SPLITMUX_BROADCAST_OUTPUT (splitmux);
 }
 
 static void
@@ -702,18 +1051,39 @@ bus_handler (GstBin * bin, GstMessage * message)
     case GST_MESSAGE_EOS:
       /* If the state is draining out the current file, drop this EOS */
       GST_SPLITMUX_LOCK (splitmux);
-      if (splitmux->state == SPLITMUX_STATE_ENDING_FILE &&
-          splitmux->max_out_running_time != GST_CLOCK_TIME_NONE) {
+
+      send_fragment_opened_closed_msg (splitmux, FALSE);
+
+      if (splitmux->output_state == SPLITMUX_OUTPUT_STATE_ENDING_FILE) {
         GST_DEBUG_OBJECT (splitmux, "Caught EOS at end of fragment, dropping");
-        splitmux->state = SPLITMUX_STATE_START_NEXT_FRAGMENT;
-        GST_SPLITMUX_BROADCAST (splitmux);
+        splitmux->output_state = SPLITMUX_OUTPUT_STATE_START_NEXT_FILE;
+        GST_SPLITMUX_BROADCAST_OUTPUT (splitmux);
 
         gst_message_unref (message);
         GST_SPLITMUX_UNLOCK (splitmux);
         return;
+      } else {
+        GST_DEBUG_OBJECT (splitmux,
+            "Passing EOS message. Output state %d max_out_running_time %"
+            GST_STIME_FORMAT, splitmux->output_state,
+            GST_STIME_ARGS (splitmux->max_out_running_time));
       }
       GST_SPLITMUX_UNLOCK (splitmux);
       break;
+    case GST_MESSAGE_ASYNC_START:
+    case GST_MESSAGE_ASYNC_DONE:
+      /* Ignore state changes from our children while switching */
+      if (splitmux->switching_fragment) {
+        if (GST_MESSAGE_SRC (message) == (GstObject *) splitmux->active_sink
+            || GST_MESSAGE_SRC (message) == (GstObject *) splitmux->muxer) {
+          GST_LOG_OBJECT (splitmux,
+              "Ignoring state change from child %" GST_PTR_FORMAT
+              " while switching", GST_MESSAGE_SRC (message));
+          gst_message_unref (message);
+          return;
+        }
+      }
+      break;
     default:
       break;
   }
@@ -721,6 +1091,12 @@ bus_handler (GstBin * bin, GstMessage * message)
   GST_BIN_CLASS (parent_class)->handle_message (bin, message);
 }
 
+static void
+ctx_set_unblock (MqStreamCtx * ctx)
+{
+  ctx->need_unblock = TRUE;
+}
+
 /* Called with splitmux lock held */
 /* Called when entering ProcessingCompleteGop state
  * Assess if mq contents overflowed the current file
@@ -731,93 +1107,168 @@ bus_handler (GstBin * bin, GstMessage * message)
 static void
 handle_gathered_gop (GstSplitMuxSink * splitmux)
 {
-  GList *cur;
-  gsize queued_bytes = 0;
-  GstClockTime queued_time = 0;
+  guint64 queued_bytes;
+  GstClockTimeDiff queued_time = 0;
+  GstClockTimeDiff new_out_ts = splitmux->reference_ctx->in_running_time;
+  SplitMuxOutputCommand *cmd;
 
   /* Assess if the multiqueue contents overflowed the current file */
-  for (cur = g_list_first (splitmux->contexts);
-      cur != NULL; cur = g_list_next (cur)) {
-    MqStreamCtx *tmpctx = (MqStreamCtx *) (cur->data);
-    if (tmpctx->in_running_time > queued_time)
-      queued_time = tmpctx->in_running_time;
-    queued_bytes += tmpctx->in_bytes;
-  }
+  /* When considering if a newly gathered GOP overflows
+   * the time limit for the file, only consider the running time of the
+   * reference stream. Other streams might have run ahead a little bit,
+   * but extra pieces won't be released to the muxer beyond the reference
+   * stream cut-off anyway - so it forms the limit. */
+  queued_bytes = splitmux->fragment_total_bytes + splitmux->gop_total_bytes;
+  queued_time = splitmux->reference_ctx->in_running_time;
+
+  GST_LOG_OBJECT (splitmux, " queued_bytes %" G_GUINT64_FORMAT, queued_bytes);
 
-  g_assert (queued_bytes >= splitmux->mux_start_bytes);
-  g_assert (queued_time >= splitmux->mux_start_time);
+  g_assert (queued_time >= splitmux->fragment_start_time);
 
-  queued_bytes -= splitmux->mux_start_bytes;
-  queued_time -= splitmux->mux_start_time;
+  queued_time -= splitmux->fragment_start_time;
 
   /* Expand queued bytes estimate by muxer overhead */
   queued_bytes += (queued_bytes * splitmux->mux_overhead);
 
-  GST_LOG_OBJECT (splitmux, "mq at TS %" GST_TIME_FORMAT
-      " bytes %" G_GSIZE_FORMAT, GST_TIME_ARGS (queued_time), queued_bytes);
+  GST_LOG_OBJECT (splitmux, "mq at TS %" GST_STIME_FORMAT
+      " bytes %" G_GUINT64_FORMAT, GST_STIME_ARGS (queued_time), queued_bytes);
+  if (splitmux->next_max_tc_time != GST_CLOCK_TIME_NONE) {
+    GST_LOG_OBJECT (splitmux,
+        "timecode mq TS %" GST_TIME_FORMAT " vs target %" GST_TIME_FORMAT,
+        GST_TIME_ARGS (splitmux->reference_ctx->in_running_time),
+        GST_TIME_ARGS (splitmux->next_max_tc_time + 5 * GST_USECOND));
+  }
 
   /* Check for overrun - have we output at least one byte and overrun
    * either threshold? */
-  if ((splitmux->mux_start_bytes < splitmux->muxed_out_bytes) &&
-      ((splitmux->threshold_bytes > 0 &&
-              queued_bytes >= splitmux->threshold_bytes) ||
-          (splitmux->threshold_time > 0 &&
-              queued_time >= splitmux->threshold_time))) {
-
-    splitmux->state = SPLITMUX_STATE_ENDING_FILE;
-
+  /* Timecode-based threshold accounts for possible rounding errors:
+   * 5us should be bigger than all possible rounding errors but nowhere near
+   * big enough to skip to another frame */
+  if ((splitmux->fragment_total_bytes > 0 &&
+          ((splitmux->threshold_bytes > 0 &&
+                  queued_bytes > splitmux->threshold_bytes) ||
+              (splitmux->threshold_time > 0 &&
+                  queued_time > splitmux->threshold_time) ||
+              (splitmux->next_max_tc_time != GST_CLOCK_TIME_NONE &&
+                  splitmux->reference_ctx->in_running_time >
+                  splitmux->next_max_tc_time + 5 * GST_USECOND)))) {
+
+    /* Tell the output side to start a new fragment */
     GST_INFO_OBJECT (splitmux,
-        "mq overflowed since last, draining out. max out TS is %"
-        GST_TIME_FORMAT, GST_TIME_ARGS (splitmux->max_out_running_time));
-    GST_SPLITMUX_BROADCAST (splitmux);
+        "This GOP (dur %" GST_STIME_FORMAT
+        ") would overflow the fragment, Sending start_new_fragment cmd",
+        GST_STIME_ARGS (splitmux->reference_ctx->in_running_time -
+            splitmux->gop_start_time));
+    cmd = out_cmd_buf_new ();
+    cmd->start_new_fragment = TRUE;
+    g_queue_push_head (&splitmux->out_cmd_q, cmd);
+    GST_SPLITMUX_BROADCAST_OUTPUT (splitmux);
+
+    new_out_ts = splitmux->reference_ctx->in_running_time;
+    splitmux->fragment_start_time = splitmux->gop_start_time;
+    splitmux->fragment_total_bytes = 0;
+
+    if (request_next_keyframe (splitmux,
+            splitmux->reference_ctx->prev_in_keyframe) == FALSE) {
+      GST_WARNING_OBJECT (splitmux,
+          "Could not request a keyframe. Files may not split at the exact location they should");
+    }
+    gst_buffer_replace (&splitmux->reference_ctx->prev_in_keyframe, NULL);
+  }
 
+  /* And set up to collect the next GOP */
+  if (!splitmux->reference_ctx->in_eos) {
+    splitmux->input_state = SPLITMUX_INPUT_STATE_COLLECTING_GOP_START;
+    splitmux->gop_start_time = new_out_ts;
   } else {
-    /* No overflow */
+    /* This is probably already the current state, but just in case: */
+    splitmux->input_state = SPLITMUX_INPUT_STATE_FINISHING_UP;
+    new_out_ts = GST_CLOCK_STIME_NONE;  /* EOS runs until forever */
+  }
+
+  /* And wake all input contexts to send a wake-up event */
+  g_list_foreach (splitmux->contexts, (GFunc) ctx_set_unblock, NULL);
+  GST_SPLITMUX_BROADCAST_INPUT (splitmux);
+
+  /* Now either way - either there was no overflow, or we requested a new fragment: release this GOP */
+  splitmux->fragment_total_bytes += splitmux->gop_total_bytes;
+
+  if (splitmux->gop_total_bytes > 0) {
     GST_LOG_OBJECT (splitmux,
-        "This GOP didn't overflow the fragment. Bytes sent %" G_GSIZE_FORMAT
-        " queued %" G_GSIZE_FORMAT " time %" GST_TIME_FORMAT " Continuing.",
-        splitmux->muxed_out_bytes - splitmux->mux_start_bytes,
-        queued_bytes, GST_TIME_ARGS (queued_time));
-
-    /* Wake everyone up to push this one GOP, then sleep */
-    splitmux->state = SPLITMUX_STATE_COLLECTING_GOP_START;
-    if (!splitmux->video_ctx->in_eos)
-      splitmux->max_out_running_time = splitmux->video_ctx->in_running_time;
-    else
-      splitmux->max_out_running_time = GST_CLOCK_TIME_NONE;
-
-    GST_LOG_OBJECT (splitmux, "Waking output for complete GOP, TS %"
-        GST_TIME_FORMAT, GST_TIME_ARGS (splitmux->max_out_running_time));
-    GST_SPLITMUX_BROADCAST (splitmux);
+        "Releasing GOP to output. Bytes in fragment now %" G_GUINT64_FORMAT
+        " time %" GST_STIME_FORMAT,
+        splitmux->fragment_total_bytes, GST_STIME_ARGS (queued_time));
+
+    /* Send this GOP to the output command queue */
+    cmd = out_cmd_buf_new ();
+    cmd->start_new_fragment = FALSE;
+    cmd->max_output_ts = new_out_ts;
+    GST_LOG_OBJECT (splitmux, "Sending GOP cmd to output for TS %"
+        GST_STIME_FORMAT, GST_STIME_ARGS (new_out_ts));
+    g_queue_push_head (&splitmux->out_cmd_q, cmd);
+
+    GST_SPLITMUX_BROADCAST_OUTPUT (splitmux);
   }
 
+  splitmux->gop_total_bytes = 0;
 }
 
 /* Called with splitmux lock held */
 /* Called from each input pad when it is has all the pieces
- * for a GOP or EOS, starting with the video pad which has set the
+ * for a GOP or EOS, starting with the reference pad which has set the
  * splitmux->max_in_running_time
  */
 static void
 check_completed_gop (GstSplitMuxSink * splitmux, MqStreamCtx * ctx)
 {
   GList *cur;
-  gboolean ready = TRUE;
+  GstEvent *event;
+
+  /* On ENDING_FILE, the reference stream sends a command to start a new
+   * fragment, then releases the GOP for output in the new fragment.
+   *  If somes streams received no buffer during the last GOP that overran,
+   * because its next buffer has a timestamp bigger than
+   * ctx->max_in_running_time, its queue is empty. In that case the only
+   * way to wakeup the output thread is by injecting an event in the
+   * queue. This usually happen with subtitle streams.
+   * See https://bugzilla.gnome.org/show_bug.cgi?id=763711. */
+  if (ctx->need_unblock) {
+    GST_LOG_OBJECT (ctx->sinkpad, "Sending splitmuxsink-unblock event");
+    event = gst_event_new_custom (GST_EVENT_CUSTOM_DOWNSTREAM |
+        GST_EVENT_TYPE_SERIALIZED,
+        gst_structure_new ("splitmuxsink-unblock", "timestamp",
+            G_TYPE_INT64, splitmux->max_in_running_time, NULL));
+
+    GST_SPLITMUX_UNLOCK (splitmux);
+    gst_pad_send_event (ctx->sinkpad, event);
+    GST_SPLITMUX_LOCK (splitmux);
+
+    ctx->need_unblock = FALSE;
+    GST_SPLITMUX_BROADCAST_INPUT (splitmux);
+    /* state may have changed while we were unlocked. Loop again if so */
+    if (splitmux->input_state != SPLITMUX_INPUT_STATE_WAITING_GOP_COLLECT)
+      return;
+  }
+
+  if (splitmux->input_state == SPLITMUX_INPUT_STATE_WAITING_GOP_COLLECT) {
+    gboolean ready = TRUE;
 
-  if (splitmux->state == SPLITMUX_STATE_WAITING_GOP_COMPLETE) {
     /* Iterate each pad, and check that the input running time is at least
-     * up to the video runnning time, and if so handle the collected GOP */
-    GST_LOG_OBJECT (splitmux, "Checking GOP collected, ctx %p", ctx);
-    for (cur = g_list_first (splitmux->contexts);
-        cur != NULL; cur = g_list_next (cur)) {
+     * up to the reference running time, and if so handle the collected GOP */
+    GST_LOG_OBJECT (splitmux, "Checking GOP collected, Max in running time %"
+        GST_STIME_FORMAT " ctx %p",
+        GST_STIME_ARGS (splitmux->max_in_running_time), ctx);
+    for (cur = g_list_first (splitmux->contexts); cur != NULL;
+        cur = g_list_next (cur)) {
       MqStreamCtx *tmpctx = (MqStreamCtx *) (cur->data);
 
       GST_LOG_OBJECT (splitmux,
-          "Context %p (src pad %" GST_PTR_FORMAT ") TS %" GST_TIME_FORMAT
+          "Context %p (src pad %" GST_PTR_FORMAT ") TS %" GST_STIME_FORMAT
           " EOS %d", tmpctx, tmpctx->srcpad,
-          GST_TIME_ARGS (tmpctx->in_running_time), tmpctx->in_eos);
+          GST_STIME_ARGS (tmpctx->in_running_time), tmpctx->in_eos);
 
-      if (tmpctx->in_running_time < splitmux->max_in_running_time &&
+      if (splitmux->max_in_running_time != GST_CLOCK_STIME_NONE &&
+          tmpctx->in_running_time < splitmux->max_in_running_time &&
           !tmpctx->in_eos) {
         GST_LOG_OBJECT (splitmux,
             "Context %p (src pad %" GST_PTR_FORMAT ") not ready. We'll sleep",
@@ -834,74 +1285,24 @@ check_completed_gop (GstSplitMuxSink * splitmux, MqStreamCtx * ctx)
     }
   }
 
+  /* If upstream reached EOS we are not expecting more data, no need to wait
+   * here. */
+  if (ctx->in_eos)
+    return;
+
   /* Some pad is not yet ready, or GOP is being pushed
    * either way, sleep and wait to get woken */
-  while ((splitmux->state == SPLITMUX_STATE_WAITING_GOP_COMPLETE ||
-          splitmux->state == SPLITMUX_STATE_START_NEXT_FRAGMENT) &&
-      !ctx->flushing) {
-
-    GST_LOG_OBJECT (splitmux, "Sleeping for %s (ctx %p)",
-        splitmux->state == SPLITMUX_STATE_WAITING_GOP_COMPLETE ?
-        "GOP complete" : "EOF draining", ctx);
-    GST_SPLITMUX_WAIT (splitmux);
+  while (splitmux->input_state == SPLITMUX_INPUT_STATE_WAITING_GOP_COLLECT &&
+      !ctx->flushing &&
+      (ctx->in_running_time >= splitmux->max_in_running_time) &&
+      (splitmux->max_in_running_time != GST_CLOCK_STIME_NONE)) {
 
+    GST_LOG_OBJECT (splitmux, "Sleeping for GOP collection (ctx %p)", ctx);
+    GST_SPLITMUX_WAIT_INPUT (splitmux);
     GST_LOG_OBJECT (splitmux, "Done waiting for complete GOP (ctx %p)", ctx);
   }
 }
 
-static void
-check_queue_length (GstSplitMuxSink * splitmux, MqStreamCtx * ctx)
-{
-  GList *cur;
-  guint cur_len = g_queue_get_length (&ctx->queued_bufs);
-
-  GST_DEBUG_OBJECT (ctx->sinkpad,
-      "Checking queue length len %u cur_max %u queued gops %u",
-      cur_len, splitmux->mq_max_buffers, splitmux->queued_gops);
-
-  if (cur_len >= splitmux->mq_max_buffers) {
-    gboolean allow_grow = FALSE;
-
-    /* If collecting a GOP and this pad might block,
-     * and there isn't already a pending GOP in the queue
-     * then grow
-     */
-    if (splitmux->state == SPLITMUX_STATE_WAITING_GOP_COMPLETE &&
-        ctx->in_running_time < splitmux->max_in_running_time &&
-        splitmux->queued_gops <= 1) {
-      allow_grow = TRUE;
-    } else if (splitmux->state == SPLITMUX_STATE_COLLECTING_GOP_START &&
-        ctx->is_video) {
-      allow_grow = TRUE;
-    }
-
-    if (!allow_grow) {
-      for (cur = g_list_first (splitmux->contexts);
-          cur != NULL; cur = g_list_next (cur)) {
-        MqStreamCtx *tmpctx = (MqStreamCtx *) (cur->data);
-        GST_DEBUG_OBJECT (tmpctx->sinkpad,
-            " len %u out_blocked %d",
-            g_queue_get_length (&tmpctx->queued_bufs), tmpctx->out_blocked);
-        /* If another stream is starving, grow */
-        if (tmpctx != ctx && g_queue_get_length (&tmpctx->queued_bufs) < 1) {
-          allow_grow = TRUE;
-        }
-      }
-    }
-
-    if (allow_grow) {
-      splitmux->mq_max_buffers = cur_len + 1;
-
-      GST_INFO_OBJECT (splitmux,
-          "Multiqueue overrun - enlarging to %u buffers ctx %p",
-          splitmux->mq_max_buffers, ctx);
-
-      g_object_set (splitmux->mq, "max-size-buffers",
-          splitmux->mq_max_buffers, NULL);
-    }
-  }
-}
-
 static GstPadProbeReturn
 handle_mq_input (GstPad * pad, GstPadProbeInfo * info, MqStreamCtx * ctx)
 {
@@ -912,7 +1313,7 @@ handle_mq_input (GstPad * pad, GstPadProbeInfo * info, MqStreamCtx * ctx)
   gboolean loop_again;
   gboolean keyframe = FALSE;
 
-  GST_LOG_OBJECT (pad, "Fired probe type 0x%x\n", info->type);
+  GST_LOG_OBJECT (pad, "Fired probe type 0x%x", info->type);
 
   /* FIXME: Handle buffer lists, until then make it clear they won't work */
   if (info->type & GST_PAD_PROBE_TYPE_BUFFER_LIST) {
@@ -921,6 +1322,9 @@ handle_mq_input (GstPad * pad, GstPadProbeInfo * info, MqStreamCtx * ctx)
   }
   if (info->type & GST_PAD_PROBE_TYPE_EVENT_DOWNSTREAM) {
     GstEvent *event = gst_pad_probe_info_get_event (info);
+
+    GST_LOG_OBJECT (pad, "Event %" GST_PTR_FORMAT, event);
+
     switch (GST_EVENT_TYPE (event)) {
       case GST_EVENT_SEGMENT:
         gst_event_copy_segment (event, &ctx->in_segment);
@@ -929,26 +1333,25 @@ handle_mq_input (GstPad * pad, GstPadProbeInfo * info, MqStreamCtx * ctx)
         GST_SPLITMUX_LOCK (splitmux);
         gst_segment_init (&ctx->in_segment, GST_FORMAT_UNDEFINED);
         ctx->in_eos = FALSE;
-        ctx->in_bytes = 0;
-        ctx->in_running_time = 0;
+        ctx->in_running_time = GST_CLOCK_STIME_NONE;
         GST_SPLITMUX_UNLOCK (splitmux);
         break;
       case GST_EVENT_EOS:
         GST_SPLITMUX_LOCK (splitmux);
         ctx->in_eos = TRUE;
 
-        if (splitmux->state == SPLITMUX_STATE_STOPPED)
+        if (splitmux->input_state == SPLITMUX_INPUT_STATE_STOPPED)
           goto beach;
 
-        if (ctx->is_video) {
-          GST_INFO_OBJECT (splitmux, "Got Video EOS. Finishing up");
-          /* Act as if this is a new keyframe with infinite timestamp */
-          splitmux->max_in_running_time = GST_CLOCK_TIME_NONE;
-          splitmux->state = SPLITMUX_STATE_WAITING_GOP_COMPLETE;
+        if (ctx->is_reference) {
+          GST_INFO_OBJECT (splitmux, "Got Reference EOS. Finishing up");
+          /* check_completed_gop will act as if this is a new keyframe with infinite timestamp */
+          splitmux->input_state = SPLITMUX_INPUT_STATE_WAITING_GOP_COLLECT;
           /* Wake up other input pads to collect this GOP */
-          GST_SPLITMUX_BROADCAST (splitmux);
+          GST_SPLITMUX_BROADCAST_INPUT (splitmux);
           check_completed_gop (splitmux, ctx);
-        } else if (splitmux->state == SPLITMUX_STATE_WAITING_GOP_COMPLETE) {
+        } else if (splitmux->input_state ==
+            SPLITMUX_INPUT_STATE_WAITING_GOP_COLLECT) {
           /* If we are waiting for a GOP to be completed (ie, for aux
            * pads to catch up), then this pad is complete, so check
            * if the whole GOP is.
@@ -957,6 +1360,37 @@ handle_mq_input (GstPad * pad, GstPadProbeInfo * info, MqStreamCtx * ctx)
         }
         GST_SPLITMUX_UNLOCK (splitmux);
         break;
+      case GST_EVENT_GAP:{
+        GstClockTime gap_ts;
+        GstClockTimeDiff rtime;
+
+        gst_event_parse_gap (event, &gap_ts, NULL);
+        if (gap_ts == GST_CLOCK_TIME_NONE)
+          break;
+
+        GST_SPLITMUX_LOCK (splitmux);
+
+        if (splitmux->input_state == SPLITMUX_INPUT_STATE_STOPPED)
+          goto beach;
+        rtime = my_segment_to_running_time (&ctx->in_segment, gap_ts);
+
+        GST_LOG_OBJECT (pad, "Have GAP w/ ts %" GST_STIME_FORMAT,
+            GST_STIME_ARGS (rtime));
+
+        if (ctx->is_reference
+            && splitmux->fragment_start_time == GST_CLOCK_STIME_NONE) {
+          splitmux->gop_start_time = splitmux->fragment_start_time = rtime;
+          GST_LOG_OBJECT (splitmux, "Mux start time now %" GST_STIME_FORMAT,
+              GST_STIME_ARGS (splitmux->fragment_start_time));
+          /* Also take this as the first start time when starting up,
+           * so that we start counting overflow from the first frame */
+          if (!GST_CLOCK_STIME_IS_VALID (splitmux->max_in_running_time))
+            splitmux->max_in_running_time = splitmux->fragment_start_time;
+        }
+
+        GST_SPLITMUX_UNLOCK (splitmux);
+        break;
+      }
       default:
         break;
     }
@@ -964,8 +1398,6 @@ handle_mq_input (GstPad * pad, GstPadProbeInfo * info, MqStreamCtx * ctx)
   }
 
   buf = gst_pad_probe_info_get_buffer (info);
-  ctx->in_running_time = gst_segment_to_running_time (&ctx->in_segment,
-      GST_FORMAT_TIME, GST_BUFFER_TIMESTAMP (buf));
   buf_info = mq_stream_buf_new ();
 
   if (GST_BUFFER_PTS_IS_VALID (buf))
@@ -973,83 +1405,124 @@ handle_mq_input (GstPad * pad, GstPadProbeInfo * info, MqStreamCtx * ctx)
   else
     ts = GST_BUFFER_DTS (buf);
 
+  GST_LOG_OBJECT (pad, "Buffer TS is %" GST_TIME_FORMAT, GST_TIME_ARGS (ts));
+
   GST_SPLITMUX_LOCK (splitmux);
 
-  if (splitmux->state == SPLITMUX_STATE_STOPPED)
+  if (splitmux->input_state == SPLITMUX_INPUT_STATE_STOPPED)
     goto beach;
 
   /* If this buffer has a timestamp, advance the input timestamp of the
    * stream */
   if (GST_CLOCK_TIME_IS_VALID (ts)) {
-    GstClockTime running_time =
-        gst_segment_to_running_time (&ctx->in_segment, GST_FORMAT_TIME,
-        GST_BUFFER_TIMESTAMP (buf));
+    GstClockTimeDiff running_time =
+        my_segment_to_running_time (&ctx->in_segment, ts);
+
+    GST_LOG_OBJECT (pad, "Buffer running TS is %" GST_STIME_FORMAT,
+        GST_STIME_ARGS (running_time));
 
-    if (GST_CLOCK_TIME_IS_VALID (running_time) &&
-        (ctx->in_running_time == GST_CLOCK_TIME_NONE
-            || running_time > ctx->in_running_time))
+    if (GST_CLOCK_STIME_IS_VALID (running_time)
+        && running_time > ctx->in_running_time)
       ctx->in_running_time = running_time;
   }
 
   /* Try to make sure we have a valid running time */
-  if (!GST_CLOCK_TIME_IS_VALID (ctx->in_running_time)) {
+  if (!GST_CLOCK_STIME_IS_VALID (ctx->in_running_time)) {
     ctx->in_running_time =
-        gst_segment_to_running_time (&ctx->in_segment, GST_FORMAT_TIME,
-        ctx->in_segment.start);
+        my_segment_to_running_time (&ctx->in_segment, ctx->in_segment.start);
   }
 
+  GST_LOG_OBJECT (pad, "in running time now %" GST_STIME_FORMAT,
+      GST_STIME_ARGS (ctx->in_running_time));
+
   buf_info->run_ts = ctx->in_running_time;
   buf_info->buf_size = gst_buffer_get_size (buf);
+  buf_info->duration = GST_BUFFER_DURATION (buf);
+
+  /* initialize fragment_start_time */
+  if (ctx->is_reference
+      && splitmux->fragment_start_time == GST_CLOCK_STIME_NONE) {
+    splitmux->gop_start_time = splitmux->fragment_start_time = buf_info->run_ts;
+    GST_LOG_OBJECT (splitmux, "Mux start time now %" GST_STIME_FORMAT,
+        GST_STIME_ARGS (splitmux->fragment_start_time));
+    gst_buffer_replace (&ctx->prev_in_keyframe, buf);
+
+    /* Also take this as the first start time when starting up,
+     * so that we start counting overflow from the first frame */
+    if (!GST_CLOCK_STIME_IS_VALID (splitmux->max_in_running_time))
+      splitmux->max_in_running_time = splitmux->fragment_start_time;
+    if (request_next_keyframe (splitmux, ctx->prev_in_keyframe) == FALSE) {
+      GST_WARNING_OBJECT (splitmux,
+          "Could not request a keyframe. Files may not split at the exact location they should");
+    }
+    gst_buffer_replace (&splitmux->reference_ctx->prev_in_keyframe, NULL);
+  }
 
-  /* Update total input byte counter for overflow detect */
-  ctx->in_bytes += buf_info->buf_size;
-
-  GST_DEBUG_OBJECT (pad, "Buf TS %" GST_TIME_FORMAT
-      " total in_bytes %" G_GSIZE_FORMAT,
-      GST_TIME_ARGS (buf_info->run_ts), ctx->in_bytes);
+  GST_DEBUG_OBJECT (pad, "Buf TS %" GST_STIME_FORMAT
+      " total GOP bytes %" G_GUINT64_FORMAT,
+      GST_STIME_ARGS (buf_info->run_ts), splitmux->gop_total_bytes);
 
   loop_again = TRUE;
   do {
     if (ctx->flushing)
       break;
 
-    switch (splitmux->state) {
-      case SPLITMUX_STATE_COLLECTING_GOP_START:
-        if (ctx->is_video) {
+    switch (splitmux->input_state) {
+      case SPLITMUX_INPUT_STATE_COLLECTING_GOP_START:
+        if (ctx->is_reference) {
           /* If a keyframe, we have a complete GOP */
           if (GST_BUFFER_FLAG_IS_SET (buf, GST_BUFFER_FLAG_DELTA_UNIT) ||
-              !GST_CLOCK_TIME_IS_VALID (ctx->in_running_time) ||
+              !GST_CLOCK_STIME_IS_VALID (ctx->in_running_time) ||
               splitmux->max_in_running_time >= ctx->in_running_time) {
             /* Pass this buffer through */
             loop_again = FALSE;
+            /* Allow other input pads to catch up to here too */
+            splitmux->max_in_running_time = ctx->in_running_time;
+            GST_SPLITMUX_BROADCAST_INPUT (splitmux);
             break;
           }
           GST_INFO_OBJECT (pad,
-              "Have keyframe with running time %" GST_TIME_FORMAT,
-              GST_TIME_ARGS (ctx->in_running_time));
+              "Have keyframe with running time %" GST_STIME_FORMAT,
+              GST_STIME_ARGS (ctx->in_running_time));
           keyframe = TRUE;
-          splitmux->state = SPLITMUX_STATE_WAITING_GOP_COMPLETE;
+          splitmux->input_state = SPLITMUX_INPUT_STATE_WAITING_GOP_COLLECT;
           splitmux->max_in_running_time = ctx->in_running_time;
           /* Wake up other input pads to collect this GOP */
-          GST_SPLITMUX_BROADCAST (splitmux);
+          GST_SPLITMUX_BROADCAST_INPUT (splitmux);
           check_completed_gop (splitmux, ctx);
+          /* Store this new keyframe to remember the start of GOP */
+          gst_buffer_replace (&ctx->prev_in_keyframe, buf);
         } else {
-          /* We're still waiting for a keyframe on the video pad, sleep */
+          /* Pass this buffer if the reference ctx is far enough ahead */
+          if (ctx->in_running_time < splitmux->max_in_running_time) {
+            loop_again = FALSE;
+            break;
+          }
+
+          /* We're still waiting for a keyframe on the reference pad, sleep */
           GST_LOG_OBJECT (pad, "Sleeping for GOP start");
-          GST_SPLITMUX_WAIT (splitmux);
-          GST_LOG_OBJECT (pad, "Done sleeping for GOP start state now %d",
-              splitmux->state);
+          GST_SPLITMUX_WAIT_INPUT (splitmux);
+          GST_LOG_OBJECT (pad,
+              "Done sleeping for GOP start input state now %d",
+              splitmux->input_state);
         }
         break;
-      case SPLITMUX_STATE_WAITING_GOP_COMPLETE:
-        /* After a GOP start is found, this buffer might complete the GOP */
+      case SPLITMUX_INPUT_STATE_WAITING_GOP_COLLECT:{
+        /* We're collecting a GOP. If this is the reference context,
+         * we need to check if this is a keyframe that marks the start
+         * of the next GOP. If it is, it marks the end of the GOP we're
+         * collecting, so sleep and wait until all the other pads also
+         * reach that timestamp - at which point, we have an entire GOP
+         * and either go to ENDING_FILE or release this GOP to the muxer and
+         * go back to COLLECT_GOP_START. */
+
         /* If we overran the target timestamp, it might be time to process
          * the GOP, otherwise bail out for more data
          */
         GST_LOG_OBJECT (pad,
-            "Checking TS %" GST_TIME_FORMAT " against max %" GST_TIME_FORMAT,
-            GST_TIME_ARGS (ctx->in_running_time),
-            GST_TIME_ARGS (splitmux->max_in_running_time));
+            "Checking TS %" GST_STIME_FORMAT " against max %"
+            GST_STIME_FORMAT, GST_STIME_ARGS (ctx->in_running_time),
+            GST_STIME_ARGS (splitmux->max_in_running_time));
 
         if (ctx->in_running_time < splitmux->max_in_running_time) {
           loop_again = FALSE;
@@ -1060,33 +1533,30 @@ handle_mq_input (GstPad * pad, GstPadProbeInfo * info, MqStreamCtx * ctx)
             "Collected last packet of GOP. Checking other pads");
         check_completed_gop (splitmux, ctx);
         break;
-      case SPLITMUX_STATE_ENDING_FILE:
-      case SPLITMUX_STATE_START_NEXT_FRAGMENT:
-        /* A fragment is ending, wait until that's done before continuing */
-        GST_DEBUG_OBJECT (pad, "Sleeping for fragment restart");
-        GST_SPLITMUX_WAIT (splitmux);
-        GST_DEBUG_OBJECT (pad,
-            "Done sleeping for fragment restart state now %d", splitmux->state);
+      }
+      case SPLITMUX_INPUT_STATE_FINISHING_UP:
+        loop_again = FALSE;
         break;
       default:
         loop_again = FALSE;
         break;
     }
-  } while (loop_again);
+  }
+  while (loop_again);
 
   if (keyframe) {
-    splitmux->queued_gops++;
+    splitmux->queued_keyframes++;
     buf_info->keyframe = TRUE;
   }
 
+  /* Update total input byte counter for overflow detect */
+  splitmux->gop_total_bytes += buf_info->buf_size;
+
   /* Now add this buffer to the queue just before returning */
   g_queue_push_head (&ctx->queued_bufs, buf_info);
 
-  /* Check the buffer will fit in the mq */
-  check_queue_length (splitmux, ctx);
-
   GST_LOG_OBJECT (pad, "Returning to queue buffer %" GST_PTR_FORMAT
-      " run ts %" GST_TIME_FORMAT, buf, GST_TIME_ARGS (ctx->in_running_time));
+      " run ts %" GST_STIME_FORMAT, buf, GST_STIME_ARGS (ctx->in_running_time));
 
   GST_SPLITMUX_UNLOCK (splitmux);
   return GST_PAD_PROBE_PASS;
@@ -1098,6 +1568,89 @@ beach:
   return GST_PAD_PROBE_PASS;
 }
 
+static void
+grow_blocked_queues (GstSplitMuxSink * splitmux)
+{
+  GList *cur;
+
+  /* Scan other queues for full-ness and grow them */
+  for (cur = g_list_first (splitmux->contexts);
+      cur != NULL; cur = g_list_next (cur)) {
+    MqStreamCtx *tmpctx = (MqStreamCtx *) (cur->data);
+    guint cur_limit;
+    guint cur_len = g_queue_get_length (&tmpctx->queued_bufs);
+
+    g_object_get (tmpctx->q, "max-size-buffers", &cur_limit, NULL);
+    GST_LOG_OBJECT (tmpctx->q, "Queue len %u", cur_len);
+
+    if (cur_len >= cur_limit) {
+      cur_limit = cur_len + 1;
+      GST_DEBUG_OBJECT (tmpctx->q,
+          "Queue overflowed and needs enlarging. Growing to %u buffers",
+          cur_limit);
+      g_object_set (tmpctx->q, "max-size-buffers", cur_limit, NULL);
+    }
+  }
+}
+
+static void
+handle_q_underrun (GstElement * q, gpointer user_data)
+{
+  MqStreamCtx *ctx = (MqStreamCtx *) (user_data);
+  GstSplitMuxSink *splitmux = ctx->splitmux;
+
+  GST_SPLITMUX_LOCK (splitmux);
+  GST_DEBUG_OBJECT (q,
+      "Queue reported underrun with %d keyframes and %d cmds enqueued",
+      splitmux->queued_keyframes, g_queue_get_length (&splitmux->out_cmd_q));
+  grow_blocked_queues (splitmux);
+  GST_SPLITMUX_UNLOCK (splitmux);
+}
+
+static void
+handle_q_overrun (GstElement * q, gpointer user_data)
+{
+  MqStreamCtx *ctx = (MqStreamCtx *) (user_data);
+  GstSplitMuxSink *splitmux = ctx->splitmux;
+  gboolean allow_grow = FALSE;
+
+  GST_SPLITMUX_LOCK (splitmux);
+  GST_DEBUG_OBJECT (q,
+      "Queue reported overrun with %d keyframes and %d cmds enqueued",
+      splitmux->queued_keyframes, g_queue_get_length (&splitmux->out_cmd_q));
+
+  if (splitmux->queued_keyframes < 2) {
+    /* Less than a full GOP queued, grow the queue */
+    allow_grow = TRUE;
+  } else if (g_queue_get_length (&splitmux->out_cmd_q) < 1) {
+    allow_grow = TRUE;
+  } else {
+    /* If another queue is starved, grow */
+    GList *cur;
+    for (cur = g_list_first (splitmux->contexts);
+        cur != NULL; cur = g_list_next (cur)) {
+      MqStreamCtx *tmpctx = (MqStreamCtx *) (cur->data);
+      if (tmpctx != ctx && g_queue_get_length (&tmpctx->queued_bufs) < 1) {
+        allow_grow = TRUE;
+      }
+    }
+  }
+  GST_SPLITMUX_UNLOCK (splitmux);
+
+  if (allow_grow) {
+    guint cur_limit;
+
+    g_object_get (q, "max-size-buffers", &cur_limit, NULL);
+    cur_limit++;
+
+    GST_DEBUG_OBJECT (q,
+        "Queue overflowed and needs enlarging. Growing to %u buffers",
+        cur_limit);
+
+    g_object_set (q, "max-size-buffers", cur_limit, NULL);
+  }
+}
+
 static GstPad *
 gst_splitmux_sink_request_new_pad (GstElement * element,
     GstPadTemplate * templ, const gchar * name, const GstCaps * caps)
@@ -1105,7 +1658,8 @@ gst_splitmux_sink_request_new_pad (GstElement * element,
   GstSplitMuxSink *splitmux = (GstSplitMuxSink *) element;
   GstPadTemplate *mux_template = NULL;
   GstPad *res = NULL;
-  GstPad *mq_sink, *mq_src;
+  GstElement *q;
+  GstPad *q_sink = NULL, *q_src = NULL;
   gchar *gname;
   gboolean is_video = FALSE;
   MqStreamCtx *ctx;
@@ -1113,21 +1667,44 @@ gst_splitmux_sink_request_new_pad (GstElement * element,
   GST_DEBUG_OBJECT (element, "templ:%s, name:%s", templ->name_template, name);
 
   GST_SPLITMUX_LOCK (splitmux);
-  if (!create_elements (splitmux))
+  if (!create_muxer (splitmux))
     goto fail;
 
   if (templ->name_template) {
     if (g_str_equal (templ->name_template, "video")) {
+      if (splitmux->have_video)
+        goto already_have_video;
+
       /* FIXME: Look for a pad template with matching caps, rather than by name */
       mux_template =
           gst_element_class_get_pad_template (GST_ELEMENT_GET_CLASS
           (splitmux->muxer), "video_%u");
+
+      /* Fallback to find sink pad templates named 'video' (flvmux) */
+      if (!mux_template) {
+        mux_template =
+            gst_element_class_get_pad_template (GST_ELEMENT_GET_CLASS
+            (splitmux->muxer), "video");
+      }
       is_video = TRUE;
       name = NULL;
     } else {
       mux_template =
           gst_element_class_get_pad_template (GST_ELEMENT_GET_CLASS
           (splitmux->muxer), templ->name_template);
+
+      /* Fallback to find sink pad templates named 'audio' (flvmux) */
+      if (!mux_template) {
+        mux_template =
+            gst_element_class_get_pad_template (GST_ELEMENT_GET_CLASS
+            (splitmux->muxer), "audio");
+      }
+    }
+    if (mux_template == NULL) {
+      /* Fallback to find sink pad templates named 'sink_%d' (mpegtsmux) */
+      mux_template =
+          gst_element_class_get_pad_template (GST_ELEMENT_GET_CLASS
+          (splitmux->muxer), "sink_%d");
     }
   }
 
@@ -1142,61 +1719,84 @@ gst_splitmux_sink_request_new_pad (GstElement * element,
   else
     gname = g_strdup (name);
 
-  if (!get_pads_from_mq (splitmux, &mq_sink, &mq_src)) {
-    gst_element_release_request_pad (splitmux->muxer, res);
-    gst_object_unref (GST_OBJECT (res));
+  if ((q = create_element (splitmux, "queue", NULL, FALSE)) == NULL)
     goto fail;
-  }
 
-  if (gst_pad_link (mq_src, res) != GST_PAD_LINK_OK) {
+  gst_element_set_state (q, GST_STATE_TARGET (splitmux));
+
+  g_object_set (q, "max-size-bytes", 0, "max-size-time", (guint64) (0),
+      "max-size-buffers", 5, NULL);
+
+  q_sink = gst_element_get_static_pad (q, "sink");
+  q_src = gst_element_get_static_pad (q, "src");
+
+  if (gst_pad_link (q_src, res) != GST_PAD_LINK_OK) {
     gst_element_release_request_pad (splitmux->muxer, res);
     gst_object_unref (GST_OBJECT (res));
-    gst_element_release_request_pad (splitmux->mq, mq_sink);
-    gst_object_unref (GST_OBJECT (mq_sink));
     goto fail;
   }
 
   gst_object_unref (GST_OBJECT (res));
 
   ctx = mq_stream_ctx_new (splitmux);
-  ctx->is_video = is_video;
-  ctx->srcpad = mq_src;
-  ctx->sinkpad = mq_sink;
+  /* Context holds a ref: */
+  ctx->q = gst_object_ref (q);
+  ctx->srcpad = q_src;
+  ctx->sinkpad = q_sink;
+  ctx->q_overrun_id =
+      g_signal_connect (q, "overrun", (GCallback) handle_q_overrun, ctx);
+  g_signal_connect (q, "underrun", (GCallback) handle_q_underrun, ctx);
 
   mq_stream_ctx_ref (ctx);
   ctx->src_pad_block_id =
-      gst_pad_add_probe (mq_src, GST_PAD_PROBE_TYPE_DATA_DOWNSTREAM,
+      gst_pad_add_probe (q_src, GST_PAD_PROBE_TYPE_DATA_DOWNSTREAM,
       (GstPadProbeCallback) handle_mq_output, ctx, (GDestroyNotify)
       _pad_block_destroy_src_notify);
-  if (is_video)
-    splitmux->video_ctx = ctx;
+  if (is_video && splitmux->reference_ctx != NULL) {
+    splitmux->reference_ctx->is_reference = FALSE;
+    splitmux->reference_ctx = NULL;
+  }
+  if (splitmux->reference_ctx == NULL) {
+    splitmux->reference_ctx = ctx;
+    ctx->is_reference = TRUE;
+  }
 
-  res = gst_ghost_pad_new (gname, mq_sink);
+  res = gst_ghost_pad_new_from_template (gname, q_sink, templ);
   g_object_set_qdata ((GObject *) (res), PAD_CONTEXT, ctx);
 
   mq_stream_ctx_ref (ctx);
   ctx->sink_pad_block_id =
-      gst_pad_add_probe (res, GST_PAD_PROBE_TYPE_DATA_DOWNSTREAM,
+      gst_pad_add_probe (q_sink, GST_PAD_PROBE_TYPE_DATA_DOWNSTREAM,
       (GstPadProbeCallback) handle_mq_input, ctx, (GDestroyNotify)
       _pad_block_destroy_sink_notify);
 
   GST_DEBUG_OBJECT (splitmux, "Request pad %" GST_PTR_FORMAT
-      " is mq pad %" GST_PTR_FORMAT, res, mq_sink);
+      " feeds queue pad %" GST_PTR_FORMAT, res, q_sink);
 
   splitmux->contexts = g_list_prepend (splitmux->contexts, ctx);
 
   g_free (gname);
 
-  gst_object_unref (mq_sink);
-  gst_object_unref (mq_src);
+  if (is_video)
+    splitmux->have_video = TRUE;
 
   gst_pad_set_active (res, TRUE);
   gst_element_add_pad (element, res);
+
   GST_SPLITMUX_UNLOCK (splitmux);
 
   return res;
 fail:
   GST_SPLITMUX_UNLOCK (splitmux);
+
+  if (q_sink)
+    gst_object_unref (q_sink);
+  if (q_src)
+    gst_object_unref (q_src);
+  return NULL;
+already_have_video:
+  GST_DEBUG_OBJECT (splitmux, "video sink pad already requested");
+  GST_SPLITMUX_UNLOCK (splitmux);
   return NULL;
 }
 
@@ -1204,20 +1804,18 @@ static void
 gst_splitmux_sink_release_pad (GstElement * element, GstPad * pad)
 {
   GstSplitMuxSink *splitmux = (GstSplitMuxSink *) element;
-  GstPad *mqsink, *mqsrc, *muxpad;
+  GstPad *muxpad = NULL;
   MqStreamCtx *ctx =
       (MqStreamCtx *) (g_object_get_qdata ((GObject *) (pad), PAD_CONTEXT));
 
   GST_SPLITMUX_LOCK (splitmux);
 
-  if (splitmux->muxer == NULL || splitmux->mq == NULL)
+  if (splitmux->muxer == NULL)
     goto fail;                  /* Elements don't exist yet - nothing to release */
 
   GST_INFO_OBJECT (pad, "releasing request pad");
 
-  mqsink = gst_ghost_pad_get_target (GST_GHOST_PAD (pad));
-  mqsrc = mq_sink_to_src (splitmux->mq, mqsink);
-  muxpad = gst_pad_get_peer (mqsrc);
+  muxpad = gst_pad_get_peer (ctx->srcpad);
 
   /* Remove the context from our consideration */
   splitmux->contexts = g_list_remove (splitmux->contexts, ctx);
@@ -1230,26 +1828,33 @@ gst_splitmux_sink_release_pad (GstElement * element, GstPad * pad)
 
   /* Can release the context now */
   mq_stream_ctx_unref (ctx);
-
-  /* Release and free the mq input */
-  gst_element_release_request_pad (splitmux->mq, mqsink);
+  if (ctx == splitmux->reference_ctx)
+    splitmux->reference_ctx = NULL;
 
   /* Release and free the muxer input */
-  gst_element_release_request_pad (splitmux->muxer, muxpad);
+  if (muxpad) {
+    gst_element_release_request_pad (splitmux->muxer, muxpad);
+    gst_object_unref (muxpad);
+  }
 
-  gst_object_unref (mqsink);
-  gst_object_unref (mqsrc);
-  gst_object_unref (muxpad);
+  if (GST_PAD_PAD_TEMPLATE (pad) &&
+      g_str_equal (GST_PAD_TEMPLATE_NAME_TEMPLATE (GST_PAD_PAD_TEMPLATE
+              (pad)), "video"))
+    splitmux->have_video = FALSE;
 
   gst_element_remove_pad (element, pad);
 
+  /* Reset the internal elements only after all request pads are released */
+  if (splitmux->contexts == NULL)
+    gst_splitmux_reset (splitmux);
+
 fail:
   GST_SPLITMUX_UNLOCK (splitmux);
 }
 
 static GstElement *
 create_element (GstSplitMuxSink * splitmux,
-    const gchar * factory, const gchar * name)
+    const gchar * factory, const gchar * name, gboolean locked)
 {
   GstElement *ret = gst_element_factory_make (factory, name);
   if (ret == NULL) {
@@ -1257,6 +1862,13 @@ create_element (GstSplitMuxSink * splitmux,
     return NULL;
   }
 
+  if (locked) {
+    /* Ensure the sink starts in locked state and NULL - it will be changed
+     * by the filename setting code */
+    gst_element_set_locked_state (ret, TRUE);
+    gst_element_set_state (ret, GST_STATE_NULL);
+  }
+
   if (!gst_bin_add (GST_BIN (splitmux), ret)) {
     g_warning ("Could not add %s element - splitmuxsink will not work", name);
     gst_object_unref (ret);
@@ -1267,31 +1879,32 @@ create_element (GstSplitMuxSink * splitmux,
 }
 
 static gboolean
-create_elements (GstSplitMuxSink * splitmux)
+create_muxer (GstSplitMuxSink * splitmux)
 {
   /* Create internal elements */
-  if (splitmux->mq == NULL) {
-    if ((splitmux->mq =
-            create_element (splitmux, "multiqueue", "multiqueue")) == NULL)
-      goto fail;
+  if (splitmux->muxer == NULL) {
+    GstElement *provided_muxer = NULL;
 
-    splitmux->mq_max_buffers = 5;
-    /* No bytes or time limit, we limit buffers manually */
-    g_object_set (splitmux->mq, "max-size-bytes", 0, "max-size-time",
-        (guint64) 0, "max-size-buffers", splitmux->mq_max_buffers, NULL);
-  }
+    GST_OBJECT_LOCK (splitmux);
+    if (splitmux->provided_muxer != NULL)
+      provided_muxer = gst_object_ref (splitmux->provided_muxer);
+    GST_OBJECT_UNLOCK (splitmux);
 
-  if (splitmux->muxer == NULL) {
-    if (splitmux->provided_muxer == NULL) {
+    if (provided_muxer == NULL) {
       if ((splitmux->muxer =
-              create_element (splitmux, "mp4mux", "muxer")) == NULL)
+              create_element (splitmux, "mp4mux", "muxer", FALSE)) == NULL)
         goto fail;
     } else {
-      splitmux->muxer = splitmux->provided_muxer;
-      if (!gst_bin_add (GST_BIN (splitmux), splitmux->provided_muxer)) {
+      /* Ensure it's not in locked state (we might be reusing an old element) */
+      gst_element_set_locked_state (provided_muxer, FALSE);
+      if (!gst_bin_add (GST_BIN (splitmux), provided_muxer)) {
         g_warning ("Could not add muxer element - splitmuxsink will not work");
+        gst_object_unref (provided_muxer);
         goto fail;
       }
+
+      splitmux->muxer = provided_muxer;
+      gst_object_unref (provided_muxer);
     }
   }
 
@@ -1311,6 +1924,9 @@ find_sink (GstElement * e)
   if (!GST_IS_BIN (e))
     return e;
 
+  if (g_object_class_find_property (G_OBJECT_GET_CLASS (e), "location") != NULL)
+    return e;
+
   iter = gst_bin_iterate_sinks (GST_BIN (e));
   while (!done) {
     switch (gst_iterator_next (iter, &data)) {
@@ -1345,35 +1961,60 @@ find_sink (GstElement * e)
 static gboolean
 create_sink (GstSplitMuxSink * splitmux)
 {
-  g_return_val_if_fail (splitmux->active_sink == NULL, TRUE);
+  GstElement *provided_sink = NULL;
 
-  if (splitmux->provided_sink == NULL) {
-    if ((splitmux->sink =
-            create_element (splitmux, DEFAULT_SINK, "sink")) == NULL)
-      goto fail;
-    splitmux->active_sink = splitmux->sink;
-  } else {
-    if (!gst_bin_add (GST_BIN (splitmux), splitmux->provided_sink)) {
-      g_warning ("Could not add sink elements - splitmuxsink will not work");
-      goto fail;
+  if (splitmux->active_sink == NULL) {
+
+    GST_OBJECT_LOCK (splitmux);
+    if (splitmux->provided_sink != NULL)
+      provided_sink = gst_object_ref (splitmux->provided_sink);
+    GST_OBJECT_UNLOCK (splitmux);
+
+    if (provided_sink == NULL) {
+      if ((splitmux->sink =
+              create_element (splitmux, DEFAULT_SINK, "sink", TRUE)) == NULL)
+        goto fail;
+      splitmux->active_sink = splitmux->sink;
+    } else {
+      /* Ensure the sink starts in locked state and NULL - it will be changed
+       * by the filename setting code */
+      gst_element_set_locked_state (provided_sink, TRUE);
+      gst_element_set_state (provided_sink, GST_STATE_NULL);
+      if (!gst_bin_add (GST_BIN (splitmux), provided_sink)) {
+        g_warning ("Could not add sink elements - splitmuxsink will not work");
+        gst_object_unref (provided_sink);
+        goto fail;
+      }
+
+      splitmux->active_sink = provided_sink;
+
+      /* The bin holds a ref now, we can drop our tmp ref */
+      gst_object_unref (provided_sink);
+
+      /* Find the sink element */
+      splitmux->sink = find_sink (splitmux->active_sink);
+      if (splitmux->sink == NULL) {
+        g_warning
+            ("Could not locate sink element in provided sink - splitmuxsink will not work");
+        goto fail;
+      }
     }
 
-    splitmux->active_sink = splitmux->provided_sink;
+#if 1
+    if (g_object_class_find_property (G_OBJECT_GET_CLASS (splitmux->sink),
+            "async") != NULL) {
+      /* async child elements are causing state change races and weird
+       * failures, so let's try and turn that off */
+      g_object_set (splitmux->sink, "async", FALSE, NULL);
+    }
+#endif
 
-    /* Find the sink element */
-    splitmux->sink = find_sink (splitmux->active_sink);
-    if (splitmux->sink == NULL) {
-      g_warning
-          ("Could not locate sink element in provided sink - splitmuxsink will not work");
+    if (!gst_element_link (splitmux->muxer, splitmux->active_sink)) {
+      g_warning ("Failed to link muxer and sink- splitmuxsink will not work");
       goto fail;
     }
   }
 
-  if (!gst_element_link (splitmux->muxer, splitmux->active_sink)) {
-    g_warning ("Failed to link muxer and sink- splitmuxsink will not work");
-    goto fail;
-  }
-
   return TRUE;
 fail:
   return FALSE;
@@ -1383,12 +2024,37 @@ fail:
 #pragma GCC diagnostic ignored "-Wformat-nonliteral"
 #endif
 static void
-set_next_filename (GstSplitMuxSink * splitmux)
+set_next_filename (GstSplitMuxSink * splitmux, MqStreamCtx * ctx)
 {
-  if (splitmux->location) {
-    gchar *fname;
+  gchar *fname = NULL;
+  GstSample *sample;
+  GstCaps *caps;
+
+  gst_splitmux_sink_ensure_max_files (splitmux);
 
-    fname = g_strdup_printf (splitmux->location, splitmux->fragment_id);
+  if (ctx->cur_out_buffer == NULL) {
+    GST_WARNING_OBJECT (splitmux, "Starting next file without buffer");
+  }
+
+  caps = gst_pad_get_current_caps (ctx->srcpad);
+  sample = gst_sample_new (ctx->cur_out_buffer, caps, &ctx->out_segment, NULL);
+  g_signal_emit (splitmux, signals[SIGNAL_FORMAT_LOCATION_FULL], 0,
+      splitmux->fragment_id, sample, &fname);
+  gst_sample_unref (sample);
+  if (caps)
+    gst_caps_unref (caps);
+
+  if (fname == NULL) {
+    /* Fallback to the old signal if the new one returned nothing */
+    g_signal_emit (splitmux, signals[SIGNAL_FORMAT_LOCATION], 0,
+        splitmux->fragment_id, &fname);
+  }
+
+  if (!fname)
+    fname = splitmux->location ?
+        g_strdup_printf (splitmux->location, splitmux->fragment_id) : NULL;
+
+  if (fname) {
     GST_INFO_OBJECT (splitmux, "Setting file to %s", fname);
     g_object_set (splitmux->sink, "location", fname, NULL);
     g_free (fname);
@@ -1397,6 +2063,43 @@ set_next_filename (GstSplitMuxSink * splitmux)
   }
 }
 
+static void
+do_async_start (GstSplitMuxSink * splitmux)
+{
+  GstMessage *message;
+
+  if (!splitmux->need_async_start) {
+    GST_INFO_OBJECT (splitmux, "no async_start needed");
+    return;
+  }
+
+  splitmux->async_pending = TRUE;
+
+  GST_INFO_OBJECT (splitmux, "Sending async_start message");
+  message = gst_message_new_async_start (GST_OBJECT_CAST (splitmux));
+  GST_BIN_CLASS (parent_class)->handle_message (GST_BIN_CAST
+      (splitmux), message);
+}
+
+static void
+do_async_done (GstSplitMuxSink * splitmux)
+{
+  GstMessage *message;
+
+  if (splitmux->async_pending) {
+    GST_INFO_OBJECT (splitmux, "Sending async_done message");
+    message =
+        gst_message_new_async_done (GST_OBJECT_CAST (splitmux),
+        GST_CLOCK_TIME_NONE);
+    GST_BIN_CLASS (parent_class)->handle_message (GST_BIN_CAST
+        (splitmux), message);
+
+    splitmux->async_pending = FALSE;
+  }
+
+  splitmux->need_async_start = FALSE;
+}
+
 static GstStateChangeReturn
 gst_splitmux_sink_change_state (GstElement * element, GstStateChange transition)
 {
@@ -1406,34 +2109,38 @@ gst_splitmux_sink_change_state (GstElement * element, GstStateChange transition)
   switch (transition) {
     case GST_STATE_CHANGE_NULL_TO_READY:{
       GST_SPLITMUX_LOCK (splitmux);
-      if (!create_elements (splitmux) || !create_sink (splitmux)) {
+      if (!create_muxer (splitmux) || !create_sink (splitmux)) {
         ret = GST_STATE_CHANGE_FAILURE;
         GST_SPLITMUX_UNLOCK (splitmux);
         goto beach;
       }
       GST_SPLITMUX_UNLOCK (splitmux);
       splitmux->fragment_id = 0;
-      set_next_filename (splitmux);
       break;
     }
     case GST_STATE_CHANGE_READY_TO_PAUSED:{
       GST_SPLITMUX_LOCK (splitmux);
       /* Start by collecting one input on each pad */
-      splitmux->state = SPLITMUX_STATE_COLLECTING_GOP_START;
-      splitmux->max_in_running_time = 0;
-      splitmux->muxed_out_time = splitmux->mux_start_time = 0;
-      splitmux->muxed_out_bytes = splitmux->mux_start_bytes = 0;
+      splitmux->input_state = SPLITMUX_INPUT_STATE_COLLECTING_GOP_START;
+      splitmux->output_state = SPLITMUX_OUTPUT_STATE_START_NEXT_FILE;
+      splitmux->max_in_running_time = GST_CLOCK_STIME_NONE;
+      splitmux->gop_start_time = splitmux->fragment_start_time =
+          GST_CLOCK_STIME_NONE;
+      splitmux->muxed_out_bytes = 0;
+      splitmux->ready_for_output = FALSE;
       GST_SPLITMUX_UNLOCK (splitmux);
       break;
     }
     case GST_STATE_CHANGE_PAUSED_TO_READY:
     case GST_STATE_CHANGE_READY_TO_NULL:
       GST_SPLITMUX_LOCK (splitmux);
-      splitmux->state = SPLITMUX_STATE_STOPPED;
+      splitmux->output_state = SPLITMUX_OUTPUT_STATE_STOPPED;
+      splitmux->input_state = SPLITMUX_INPUT_STATE_STOPPED;
       /* Wake up any blocked threads */
       GST_LOG_OBJECT (splitmux,
           "State change -> NULL or READY. Waking threads");
-      GST_SPLITMUX_BROADCAST (splitmux);
+      GST_SPLITMUX_BROADCAST_INPUT (splitmux);
+      GST_SPLITMUX_BROADCAST_OUTPUT (splitmux);
       GST_SPLITMUX_UNLOCK (splitmux);
       break;
     default:
@@ -1445,10 +2152,29 @@ gst_splitmux_sink_change_state (GstElement * element, GstStateChange transition)
     goto beach;
 
   switch (transition) {
+    case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
+      splitmux->need_async_start = TRUE;
+      break;
+    case GST_STATE_CHANGE_READY_TO_PAUSED:{
+      /* Change state async, because our child sink might not
+       * be ready to do that for us yet if it's state is still locked */
+
+      splitmux->need_async_start = TRUE;
+      /* we want to go async to PAUSED until we managed to configure and add the
+       * sink */
+      GST_SPLITMUX_LOCK (splitmux);
+      do_async_start (splitmux);
+      GST_SPLITMUX_UNLOCK (splitmux);
+      ret = GST_STATE_CHANGE_ASYNC;
+      break;
+    }
     case GST_STATE_CHANGE_READY_TO_NULL:
       GST_SPLITMUX_LOCK (splitmux);
       splitmux->fragment_id = 0;
-      gst_splitmux_reset (splitmux);
+      /* Reset internal elements only if no pad contexts are using them */
+      if (splitmux->contexts == NULL)
+        gst_splitmux_reset (splitmux);
+      do_async_done (splitmux);
       GST_SPLITMUX_UNLOCK (splitmux);
       break;
     default:
@@ -1456,11 +2182,13 @@ gst_splitmux_sink_change_state (GstElement * element, GstStateChange transition)
   }
 
 beach:
-
   if (transition == GST_STATE_CHANGE_NULL_TO_READY &&
       ret == GST_STATE_CHANGE_FAILURE) {
     /* Cleanup elements on failed transition out of NULL */
     gst_splitmux_reset (splitmux);
+    GST_SPLITMUX_LOCK (splitmux);
+    do_async_done (splitmux);
+    GST_SPLITMUX_UNLOCK (splitmux);
   }
   return ret;
 }
@@ -1474,3 +2202,11 @@ register_splitmuxsink (GstPlugin * plugin)
   return gst_element_register (plugin, "splitmuxsink", GST_RANK_NONE,
       GST_TYPE_SPLITMUX_SINK);
 }
+
+static void
+gst_splitmux_sink_ensure_max_files (GstSplitMuxSink * splitmux)
+{
+  if (splitmux->max_files && splitmux->fragment_id >= splitmux->max_files) {
+    splitmux->fragment_id = 0;
+  }
+}