splitmux: Rewrite buffer collection and scheduling
authorJan Schmidt <jan@centricular.com>
Fri, 18 Nov 2016 11:42:18 +0000 (22:42 +1100)
committerJan Schmidt <jan@centricular.com>
Mon, 2 Jan 2017 14:34:02 +0000 (01:34 +1100)
Majorly change the way that splitmuxsink collects
incoming data and sends it to the output, so that it
makes all decisions about when / where to split files
on the input side.

Use separate queues for each stream, so they can be
grown individually and kept as small as possible.

This removes raciness I observed where sometimes
some data would end up put in a different output file
over multiple runs with the same input.

Also fixes hangs with input queues getting full
and causing muxing to stall out.

gst/multifile/gstsplitmuxsink.c
gst/multifile/gstsplitmuxsink.h

index 5d8b75f093d2b0fa43cc99de106d491eebd6929b..4b2f01042bf7e9b3fd3135c09f8c3f72001c5b68 100644 (file)
@@ -63,8 +63,11 @@ GST_DEBUG_CATEGORY_STATIC (splitmux_debug);
 
 #define GST_SPLITMUX_LOCK(s) g_mutex_lock(&(s)->lock)
 #define GST_SPLITMUX_UNLOCK(s) g_mutex_unlock(&(s)->lock)
-#define GST_SPLITMUX_WAIT(s) g_cond_wait (&(s)->data_cond, &(s)->lock)
-#define GST_SPLITMUX_BROADCAST(s) g_cond_broadcast (&(s)->data_cond)
+#define GST_SPLITMUX_WAIT_INPUT(s) g_cond_wait (&(s)->input_cond, &(s)->lock)
+#define GST_SPLITMUX_BROADCAST_INPUT(s) g_cond_broadcast (&(s)->input_cond)
+
+#define GST_SPLITMUX_WAIT_OUTPUT(s) g_cond_wait (&(s)->output_cond, &(s)->lock)
+#define GST_SPLITMUX_BROADCAST_OUTPUT(s) g_cond_broadcast (&(s)->output_cond)
 
 enum
 {
@@ -124,7 +127,7 @@ _do_init (void)
 G_DEFINE_TYPE_EXTENDED (GstSplitMuxSink, gst_splitmux_sink, GST_TYPE_BIN, 0,
     _do_init ());
 
-static gboolean create_elements (GstSplitMuxSink * splitmux);
+static gboolean create_muxer (GstSplitMuxSink * splitmux);
 static gboolean create_sink (GstSplitMuxSink * splitmux);
 static void gst_splitmux_sink_set_property (GObject * object, guint prop_id,
     const GValue * value, GParamSpec * pspec);
@@ -143,10 +146,12 @@ static GstStateChangeReturn gst_splitmux_sink_change_state (GstElement *
 static void bus_handler (GstBin * bin, GstMessage * msg);
 static void set_next_filename (GstSplitMuxSink * splitmux, MqStreamCtx * ctx);
 static void start_next_fragment (GstSplitMuxSink * splitmux, MqStreamCtx * ctx);
-static void check_queue_length (GstSplitMuxSink * splitmux, MqStreamCtx * ctx);
 static void mq_stream_ctx_unref (MqStreamCtx * ctx);
+static void grow_blocked_queues (GstSplitMuxSink * splitmux);
 
 static void gst_splitmux_sink_ensure_max_files (GstSplitMuxSink * splitmux);
+static GstElement *create_element (GstSplitMuxSink * splitmux,
+    const gchar * factory, const gchar * name, gboolean locked);
 
 static void do_async_done (GstSplitMuxSink * splitmux);
 
@@ -162,6 +167,18 @@ mq_stream_buf_free (MqStreamBuf * data)
   g_slice_free (MqStreamBuf, data);
 }
 
+static SplitMuxOutputCommand *
+out_cmd_buf_new (void)
+{
+  return g_slice_new0 (SplitMuxOutputCommand);
+}
+
+static void
+out_cmd_buf_free (SplitMuxOutputCommand * data)
+{
+  g_slice_free (SplitMuxOutputCommand, data);
+}
+
 static void
 gst_splitmux_sink_class_init (GstSplitMuxSinkClass * klass)
 {
@@ -267,7 +284,9 @@ static void
 gst_splitmux_sink_init (GstSplitMuxSink * splitmux)
 {
   g_mutex_init (&splitmux->lock);
-  g_cond_init (&splitmux->data_cond);
+  g_cond_init (&splitmux->input_cond);
+  g_cond_init (&splitmux->output_cond);
+  g_queue_init (&splitmux->out_cmd_q);
 
   splitmux->mux_overhead = DEFAULT_MUXER_OVERHEAD;
   splitmux->threshold_time = DEFAULT_MAX_SIZE_TIME;
@@ -275,19 +294,12 @@ gst_splitmux_sink_init (GstSplitMuxSink * splitmux)
   splitmux->max_files = DEFAULT_MAX_FILES;
   splitmux->send_keyframe_requests = DEFAULT_SEND_KEYFRAME_REQUESTS;
 
-  splitmux->update_mux_start_time = FALSE;
-
   GST_OBJECT_FLAG_SET (splitmux, GST_ELEMENT_FLAG_SINK);
 }
 
 static void
 gst_splitmux_reset (GstSplitMuxSink * splitmux)
 {
-  if (splitmux->mq) {
-    gst_element_set_locked_state (splitmux->mq, TRUE);
-    gst_element_set_state (splitmux->mq, GST_STATE_NULL);
-    gst_bin_remove (GST_BIN (splitmux), splitmux->mq);
-  }
   if (splitmux->muxer) {
     gst_element_set_locked_state (splitmux->muxer, TRUE);
     gst_element_set_state (splitmux->muxer, GST_STATE_NULL);
@@ -299,8 +311,7 @@ gst_splitmux_reset (GstSplitMuxSink * splitmux)
     gst_bin_remove (GST_BIN (splitmux), splitmux->active_sink);
   }
 
-  splitmux->sink = splitmux->active_sink = splitmux->muxer = splitmux->mq =
-      NULL;
+  splitmux->sink = splitmux->active_sink = splitmux->muxer = NULL;
 }
 
 static void
@@ -309,8 +320,7 @@ gst_splitmux_sink_dispose (GObject * object)
   GstSplitMuxSink *splitmux = GST_SPLITMUX_SINK (object);
 
   /* Calling parent dispose invalidates all child pointers */
-  splitmux->sink = splitmux->active_sink = splitmux->muxer = splitmux->mq =
-      NULL;
+  splitmux->sink = splitmux->active_sink = splitmux->muxer = NULL;
 
   G_OBJECT_CLASS (parent_class)->dispose (object);
 }
@@ -319,8 +329,12 @@ static void
 gst_splitmux_sink_finalize (GObject * object)
 {
   GstSplitMuxSink *splitmux = GST_SPLITMUX_SINK (object);
-  g_cond_clear (&splitmux->data_cond);
+  g_cond_clear (&splitmux->input_cond);
+  g_cond_clear (&splitmux->output_cond);
   g_mutex_clear (&splitmux->lock);
+  g_queue_foreach (&splitmux->out_cmd_q, (GFunc) out_cmd_buf_free, NULL);
+  g_queue_clear (&splitmux->out_cmd_q);
+
   if (splitmux->provided_sink)
     gst_object_unref (splitmux->provided_sink);
   if (splitmux->provided_muxer)
@@ -466,51 +480,6 @@ my_segment_to_running_time (GstSegment * segment, GstClockTime val)
   return res;
 }
 
-static GstPad *
-mq_sink_to_src (GstElement * mq, GstPad * sink_pad)
-{
-  gchar *tmp, *sinkname, *srcname;
-  GstPad *mq_src;
-
-  sinkname = gst_pad_get_name (sink_pad);
-  tmp = sinkname + 5;
-  srcname = g_strdup_printf ("src_%s", tmp);
-
-  mq_src = gst_element_get_static_pad (mq, srcname);
-
-  g_free (sinkname);
-  g_free (srcname);
-
-  return mq_src;
-}
-
-static gboolean
-get_pads_from_mq (GstSplitMuxSink * splitmux, GstPad ** sink_pad,
-    GstPad ** src_pad)
-{
-  GstPad *mq_sink;
-  GstPad *mq_src;
-
-  /* Request a pad from multiqueue, then connect this one, then
-   * discover the corresponding output pad and return both */
-  mq_sink = gst_element_get_request_pad (splitmux->mq, "sink_%u");
-  if (mq_sink == NULL)
-    return FALSE;
-
-  mq_src = mq_sink_to_src (splitmux->mq, mq_sink);
-  if (mq_src == NULL)
-    goto fail;
-
-  *sink_pad = mq_sink;
-  *src_pad = mq_src;
-
-  return TRUE;
-
-fail:
-  gst_element_release_request_pad (splitmux->mq, mq_sink);
-  return FALSE;
-}
-
 static MqStreamCtx *
 mq_stream_ctx_new (GstSplitMuxSink * splitmux)
 {
@@ -529,6 +498,15 @@ mq_stream_ctx_new (GstSplitMuxSink * splitmux)
 static void
 mq_stream_ctx_free (MqStreamCtx * ctx)
 {
+  if (ctx->q) {
+    g_signal_handler_disconnect (ctx->q, ctx->q_overrun_id);
+    gst_element_set_locked_state (ctx->q, TRUE);
+    gst_element_set_state (ctx->q, GST_STATE_NULL);
+    gst_bin_remove (GST_BIN (ctx->splitmux), ctx->q);
+    gst_object_unref (ctx->q);
+  }
+  gst_object_unref (ctx->sinkpad);
+  gst_object_unref (ctx->srcpad);
   g_queue_foreach (&ctx->queued_bufs, (GFunc) mq_stream_buf_free, NULL);
   g_queue_clear (&ctx->queued_bufs);
   g_free (ctx);
@@ -611,46 +589,99 @@ static void
 complete_or_wait_on_out (GstSplitMuxSink * splitmux, MqStreamCtx * ctx)
 {
   do {
+    /* When first starting up, the reference stream has to output
+     * the first buffer to prepare the muxer and sink */
+    gboolean can_output = (ctx->is_reference || splitmux->ready_for_output);
+
+    if (ctx->flushing
+        || splitmux->output_state == SPLITMUX_OUTPUT_STATE_STOPPED)
+      return;
 
     GST_LOG_OBJECT (ctx->srcpad,
         "Checking running time %" GST_STIME_FORMAT " against max %"
         GST_STIME_FORMAT, GST_STIME_ARGS (ctx->out_running_time),
         GST_STIME_ARGS (splitmux->max_out_running_time));
 
-    if (splitmux->max_out_running_time == GST_CLOCK_STIME_NONE ||
-        ctx->out_running_time < splitmux->max_out_running_time) {
-      splitmux->have_muxed_something = TRUE;
-      return;
-    }
+    if (can_output) {
+      if (splitmux->max_out_running_time == GST_CLOCK_STIME_NONE ||
+          ctx->out_running_time < splitmux->max_out_running_time) {
+        return;
+      }
 
-    if (ctx->flushing || splitmux->state == SPLITMUX_STATE_STOPPED)
-      return;
+      switch (splitmux->output_state) {
+        case SPLITMUX_OUTPUT_STATE_OUTPUT_GOP:
+          /* We only get here if we've finished outputting a GOP and need to know
+           * what to do next */
+          splitmux->output_state = SPLITMUX_OUTPUT_STATE_AWAITING_COMMAND;
+          GST_SPLITMUX_BROADCAST_OUTPUT (splitmux);
+          continue;
+
+        case SPLITMUX_OUTPUT_STATE_ENDING_FILE:
+          /* We've reached the max out running_time to get here, so end this file now */
+          if (ctx->out_eos == FALSE) {
+            send_eos (splitmux, ctx);
+            continue;
+          }
+          break;
+        case SPLITMUX_OUTPUT_STATE_START_NEXT_FILE:
+          if (ctx->is_reference) {
+            /* Special handling on the reference ctx to start new fragments
+             * and collect commands from the command queue */
+            /* drops the splitmux lock briefly: */
+            start_next_fragment (splitmux, ctx);
+            continue;
+          }
+          break;
 
-    if (splitmux->state == SPLITMUX_STATE_ENDING_FILE) {
-      if (ctx->out_eos == FALSE) {
-        send_eos (splitmux, ctx);
-        continue;
+        case SPLITMUX_OUTPUT_STATE_AWAITING_COMMAND:{
+          do {
+            SplitMuxOutputCommand *cmd =
+                g_queue_pop_tail (&splitmux->out_cmd_q);
+            if (cmd != NULL) {
+              /* If we pop the last command, we need to make our queues bigger */
+              if (g_queue_get_length (&splitmux->out_cmd_q) == 0)
+                grow_blocked_queues (splitmux);
+
+              if (cmd->start_new_fragment) {
+                GST_DEBUG_OBJECT (splitmux, "Got cmd to start new fragment");
+                splitmux->output_state = SPLITMUX_OUTPUT_STATE_ENDING_FILE;
+              } else {
+                GST_DEBUG_OBJECT (splitmux,
+                    "Got new output cmd for time %" GST_STIME_FORMAT,
+                    GST_STIME_ARGS (cmd->max_output_ts));
+
+                /* Extend the output range immediately */
+                splitmux->max_out_running_time = cmd->max_output_ts;
+                splitmux->output_state = SPLITMUX_OUTPUT_STATE_OUTPUT_GOP;
+              }
+              GST_SPLITMUX_BROADCAST_OUTPUT (splitmux);
+
+              out_cmd_buf_free (cmd);
+              break;
+            } else {
+              GST_SPLITMUX_WAIT_OUTPUT (splitmux);
+            }
+          } while (splitmux->output_state ==
+              SPLITMUX_OUTPUT_STATE_AWAITING_COMMAND);
+          /* loop and re-check the state */
+          continue;
+        }
+        case SPLITMUX_OUTPUT_STATE_STOPPED:
+          return;
       }
-    } else if (ctx->is_reference
-        && splitmux->state == SPLITMUX_STATE_START_NEXT_FRAGMENT) {
-      start_next_fragment (splitmux, ctx);
-      continue;
     }
 
     GST_INFO_OBJECT (ctx->srcpad,
         "Sleeping for running time %"
-        GST_STIME_FORMAT " (max %" GST_STIME_FORMAT ")",
+        GST_STIME_FORMAT " (max %" GST_STIME_FORMAT ") or state change.",
         GST_STIME_ARGS (ctx->out_running_time),
         GST_STIME_ARGS (splitmux->max_out_running_time));
-    ctx->out_blocked = TRUE;
-    /* Expand the mq if needed before sleeping */
-    check_queue_length (splitmux, ctx);
-    GST_SPLITMUX_WAIT (splitmux);
-    ctx->out_blocked = FALSE;
+    GST_SPLITMUX_WAIT_OUTPUT (splitmux);
     GST_INFO_OBJECT (ctx->srcpad,
         "Woken for new max running time %" GST_STIME_FORMAT,
         GST_STIME_ARGS (splitmux->max_out_running_time));
-  } while (1);
+  }
+  while (1);
 }
 
 static gboolean
@@ -658,14 +689,14 @@ request_next_keyframe (GstSplitMuxSink * splitmux)
 {
   GstEvent *ev;
 
-  if (splitmux->send_keyframe_requests == FALSE || splitmux->threshold_time == 0
-      || splitmux->threshold_bytes != 0)
+  if (splitmux->send_keyframe_requests == FALSE
+      || splitmux->threshold_time == 0 || splitmux->threshold_bytes != 0)
     return TRUE;
 
-  ev = gst_video_event_new_upstream_force_key_unit (splitmux->mux_start_time +
-      splitmux->threshold_time, TRUE, 0);
+  ev = gst_video_event_new_upstream_force_key_unit
+      (splitmux->fragment_start_time + splitmux->threshold_time, TRUE, 0);
   GST_DEBUG_OBJECT (splitmux, "Requesting next keyframe at %" GST_TIME_FORMAT,
-      GST_TIME_ARGS (splitmux->mux_start_time + splitmux->threshold_time));
+      GST_TIME_ARGS (splitmux->fragment_start_time + splitmux->threshold_time));
   return gst_pad_push_event (splitmux->reference_ctx->sinkpad, ev);
 }
 
@@ -684,6 +715,7 @@ handle_mq_output (GstPad * pad, GstPadProbeInfo * info, MqStreamCtx * ctx)
   }
   if (info->type & GST_PAD_PROBE_TYPE_EVENT_DOWNSTREAM) {
     GstEvent *event = gst_pad_probe_info_get_event (info);
+    gboolean locked = FALSE;
 
     GST_LOG_OBJECT (pad, "Event %" GST_PTR_FORMAT, event);
 
@@ -693,25 +725,25 @@ handle_mq_output (GstPad * pad, GstPadProbeInfo * info, MqStreamCtx * ctx)
         break;
       case GST_EVENT_FLUSH_STOP:
         GST_SPLITMUX_LOCK (splitmux);
+        locked = TRUE;
         gst_segment_init (&ctx->out_segment, GST_FORMAT_UNDEFINED);
         g_queue_foreach (&ctx->queued_bufs, (GFunc) mq_stream_buf_free, NULL);
         g_queue_clear (&ctx->queued_bufs);
         ctx->flushing = FALSE;
-        GST_SPLITMUX_UNLOCK (splitmux);
         break;
       case GST_EVENT_FLUSH_START:
         GST_SPLITMUX_LOCK (splitmux);
+        locked = TRUE;
         GST_LOG_OBJECT (pad, "Flush start");
         ctx->flushing = TRUE;
-        GST_SPLITMUX_BROADCAST (splitmux);
-        GST_SPLITMUX_UNLOCK (splitmux);
+        GST_SPLITMUX_BROADCAST_OUTPUT (splitmux);
         break;
       case GST_EVENT_EOS:
         GST_SPLITMUX_LOCK (splitmux);
-        if (splitmux->state == SPLITMUX_STATE_STOPPED)
+        locked = TRUE;
+        if (splitmux->output_state == SPLITMUX_OUTPUT_STATE_STOPPED)
           goto beach;
         ctx->out_eos = TRUE;
-        GST_SPLITMUX_UNLOCK (splitmux);
         break;
       case GST_EVENT_GAP:{
         GstClockTime gap_ts;
@@ -722,13 +754,9 @@ handle_mq_output (GstPad * pad, GstPadProbeInfo * info, MqStreamCtx * ctx)
           break;
 
         GST_SPLITMUX_LOCK (splitmux);
+        locked = TRUE;
 
-        rtime = my_segment_to_running_time (&ctx->out_segment, gap_ts);
-
-        GST_LOG_OBJECT (pad, "Have GAP w/ ts %" GST_STIME_FORMAT,
-            GST_STIME_ARGS (rtime));
-
-        if (splitmux->state == SPLITMUX_STATE_STOPPED)
+        if (splitmux->output_state == SPLITMUX_OUTPUT_STATE_STOPPED)
           goto beach;
 
         /* When we get a gap event on the
@@ -737,19 +765,22 @@ handle_mq_output (GstPad * pad, GstPadProbeInfo * info, MqStreamCtx * ctx)
          * the buffer afterwards
          */
         if (ctx->is_reference &&
-            (splitmux->opening_first_fragment ||
-                splitmux->state == SPLITMUX_STATE_START_NEXT_FRAGMENT)) {
+            (splitmux->output_state != SPLITMUX_OUTPUT_STATE_OUTPUT_GOP)) {
           GST_DEBUG_OBJECT (pad, "Storing GAP event until buffer arrives");
           gst_event_replace (&ctx->pending_gap, event);
           GST_SPLITMUX_UNLOCK (splitmux);
           return GST_PAD_PROBE_HANDLED;
         }
 
+        rtime = my_segment_to_running_time (&ctx->out_segment, gap_ts);
+
+        GST_LOG_OBJECT (pad, "Have GAP w/ ts %" GST_STIME_FORMAT,
+            GST_STIME_ARGS (rtime));
+
         if (rtime != GST_CLOCK_STIME_NONE) {
           ctx->out_running_time = rtime;
           complete_or_wait_on_out (splitmux, ctx);
         }
-        GST_SPLITMUX_UNLOCK (splitmux);
         break;
       }
       case GST_EVENT_CUSTOM_DOWNSTREAM:{
@@ -763,8 +794,9 @@ handle_mq_output (GstPad * pad, GstPadProbeInfo * info, MqStreamCtx * ctx)
         gst_structure_get_int64 (s, "timestamp", &ts);
 
         GST_SPLITMUX_LOCK (splitmux);
+        locked = TRUE;
 
-        if (splitmux->state == SPLITMUX_STATE_STOPPED)
+        if (splitmux->output_state == SPLITMUX_OUTPUT_STATE_STOPPED)
           goto beach;
         ctx->out_running_time = ts;
         if (!ctx->is_reference)
@@ -775,6 +807,15 @@ handle_mq_output (GstPad * pad, GstPadProbeInfo * info, MqStreamCtx * ctx)
       default:
         break;
     }
+
+    /* We need to make sure events aren't passed
+     * until the muxer / sink are ready for it */
+    if (!locked)
+      GST_SPLITMUX_LOCK (splitmux);
+    if (!ctx->is_reference)
+      complete_or_wait_on_out (splitmux, ctx);
+    GST_SPLITMUX_UNLOCK (splitmux);
+
     return GST_PAD_PROBE_PASS;
   }
 
@@ -787,8 +828,8 @@ handle_mq_output (GstPad * pad, GstPadProbeInfo * info, MqStreamCtx * ctx)
     goto beach;
 
   /* If we have popped a keyframe, decrement the queued_gop count */
-  if (buf_info->keyframe && splitmux->queued_gops > 0)
-    splitmux->queued_gops--;
+  if (buf_info->keyframe && splitmux->queued_keyframes > 0)
+    splitmux->queued_keyframes--;
 
   ctx->out_running_time = buf_info->run_ts;
   ctx->cur_buffer = gst_pad_probe_info_get_buffer (info);
@@ -798,24 +839,8 @@ handle_mq_output (GstPad * pad, GstPadProbeInfo * info, MqStreamCtx * ctx)
       " size %" G_GUINT64_FORMAT,
       pad, GST_STIME_ARGS (ctx->out_running_time), buf_info->buf_size);
 
-  if (ctx->is_reference && splitmux->opening_first_fragment) {
-    if (request_next_keyframe (splitmux) == FALSE)
-      GST_WARNING_OBJECT (splitmux,
-          "Could not request a keyframe. Files may not split at the exact location they should");
-    start_next_fragment (splitmux, ctx);
-    splitmux->opening_first_fragment = FALSE;
-  }
-
   complete_or_wait_on_out (splitmux, ctx);
 
-  if (splitmux->update_mux_start_time && ctx->is_reference) {
-    splitmux->mux_start_time = buf_info->run_ts;
-    splitmux->update_mux_start_time = FALSE;
-    if (request_next_keyframe (splitmux) == FALSE)
-      GST_WARNING_OBJECT (splitmux,
-          "Could not request a keyframe. Files may not split at the exact location they should");
-  }
-
   if (splitmux->muxed_out_time == GST_CLOCK_STIME_NONE ||
       splitmux->muxed_out_time < buf_info->run_ts)
     splitmux->muxed_out_time = buf_info->run_ts;
@@ -884,50 +909,50 @@ restart_context (MqStreamCtx * ctx, GstSplitMuxSink * splitmux)
 static void
 start_next_fragment (GstSplitMuxSink * splitmux, MqStreamCtx * ctx)
 {
+  GstElement *muxer, *sink;
+
   /* 1 change to new file */
   splitmux->switching_fragment = TRUE;
 
-  gst_element_set_locked_state (splitmux->muxer, TRUE);
-  gst_element_set_locked_state (splitmux->active_sink, TRUE);
-  gst_element_set_state (splitmux->muxer, GST_STATE_NULL);
-  gst_element_set_state (splitmux->active_sink, GST_STATE_NULL);
+  /* We need to drop the splitmux lock to acquire the state lock
+   * here and ensure there's no racy state change going on elsewhere */
+  muxer = gst_object_ref (splitmux->muxer);
+  sink = gst_object_ref (splitmux->active_sink);
+
+  GST_SPLITMUX_UNLOCK (splitmux);
+  GST_STATE_LOCK (splitmux);
 
+  gst_element_set_locked_state (muxer, TRUE);
+  gst_element_set_locked_state (sink, TRUE);
+  gst_element_set_state (muxer, GST_STATE_NULL);
+  gst_element_set_state (sink, GST_STATE_NULL);
+
+  GST_SPLITMUX_LOCK (splitmux);
   set_next_filename (splitmux, ctx);
+  GST_SPLITMUX_UNLOCK (splitmux);
 
-  gst_element_set_state (splitmux->active_sink, GST_STATE_TARGET (splitmux));
-  gst_element_set_state (splitmux->muxer, GST_STATE_TARGET (splitmux));
-  gst_element_set_locked_state (splitmux->muxer, FALSE);
-  gst_element_set_locked_state (splitmux->active_sink, FALSE);
+  gst_element_set_state (sink, GST_STATE_TARGET (splitmux));
+  gst_element_set_state (muxer, GST_STATE_TARGET (splitmux));
+  gst_element_set_locked_state (muxer, FALSE);
+  gst_element_set_locked_state (sink, FALSE);
 
+  gst_object_unref (sink);
+  gst_object_unref (muxer);
+
+  GST_SPLITMUX_LOCK (splitmux);
+  GST_STATE_UNLOCK (splitmux);
   splitmux->switching_fragment = FALSE;
   do_async_done (splitmux);
 
-  g_list_foreach (splitmux->contexts, (GFunc) restart_context, splitmux);
-
-  if (!splitmux->opening_first_fragment) {
-    /* Switch state and go back to processing */
-    if (!splitmux->reference_ctx->in_eos) {
-      splitmux->state = SPLITMUX_STATE_COLLECTING_GOP_START;
-      splitmux->max_out_running_time = splitmux->reference_ctx->in_running_time;
-    } else {
-      splitmux->state = SPLITMUX_STATE_WAITING_GOP_COMPLETE;
-      splitmux->max_out_running_time = GST_CLOCK_STIME_NONE;
-      splitmux->have_muxed_something = FALSE;
-    }
-    splitmux->have_muxed_something =
-        (splitmux->reference_ctx->in_running_time > splitmux->muxed_out_time);
-
-    /* Store the overflow parameters as the basis for the next fragment */
-    splitmux->mux_start_bytes = splitmux->muxed_out_bytes;
+  splitmux->ready_for_output = TRUE;
 
-    GST_DEBUG_OBJECT (splitmux,
-        "Restarting flow for new fragment. New running time %" GST_STIME_FORMAT,
-        GST_STIME_ARGS (splitmux->max_out_running_time));
-  }
+  g_list_foreach (splitmux->contexts, (GFunc) restart_context, splitmux);
 
   send_fragment_opened_closed_msg (splitmux, TRUE);
 
-  GST_SPLITMUX_BROADCAST (splitmux);
+  /* FIXME: Is this always the correct next state? */
+  splitmux->output_state = SPLITMUX_OUTPUT_STATE_AWAITING_COMMAND;
+  GST_SPLITMUX_BROADCAST_OUTPUT (splitmux);
 }
 
 static void
@@ -942,15 +967,19 @@ bus_handler (GstBin * bin, GstMessage * message)
 
       send_fragment_opened_closed_msg (splitmux, FALSE);
 
-      if (splitmux->state == SPLITMUX_STATE_ENDING_FILE &&
-          splitmux->max_out_running_time != GST_CLOCK_STIME_NONE) {
+      if (splitmux->output_state == SPLITMUX_OUTPUT_STATE_ENDING_FILE) {
         GST_DEBUG_OBJECT (splitmux, "Caught EOS at end of fragment, dropping");
-        splitmux->state = SPLITMUX_STATE_START_NEXT_FRAGMENT;
-        GST_SPLITMUX_BROADCAST (splitmux);
+        splitmux->output_state = SPLITMUX_OUTPUT_STATE_START_NEXT_FILE;
+        GST_SPLITMUX_BROADCAST_OUTPUT (splitmux);
 
         gst_message_unref (message);
         GST_SPLITMUX_UNLOCK (splitmux);
         return;
+      } else {
+        GST_DEBUG_OBJECT (splitmux,
+            "Passing EOS message. Output state %d max_out_running_time %"
+            GST_STIME_FORMAT, splitmux->output_state,
+            GST_STIME_ARGS (splitmux->max_out_running_time));
       }
       GST_SPLITMUX_UNLOCK (splitmux);
       break;
@@ -958,8 +987,8 @@ bus_handler (GstBin * bin, GstMessage * message)
     case GST_MESSAGE_ASYNC_DONE:
       /* Ignore state changes from our children while switching */
       if (splitmux->switching_fragment) {
-        if (GST_MESSAGE_SRC (message) == (GstObject *) splitmux->active_sink ||
-            GST_MESSAGE_SRC (message) == (GstObject *) splitmux->muxer) {
+        if (GST_MESSAGE_SRC (message) == (GstObject *) splitmux->active_sink
+            || GST_MESSAGE_SRC (message) == (GstObject *) splitmux->muxer) {
           GST_LOG_OBJECT (splitmux,
               "Ignoring state change from child %" GST_PTR_FORMAT
               " while switching", GST_MESSAGE_SRC (message));
@@ -975,6 +1004,12 @@ bus_handler (GstBin * bin, GstMessage * message)
   GST_BIN_CLASS (parent_class)->handle_message (bin, message);
 }
 
+static void
+ctx_set_unblock (MqStreamCtx * ctx)
+{
+  ctx->need_unblock = TRUE;
+}
+
 /* Called with splitmux lock held */
 /* Called when entering ProcessingCompleteGop state
  * Assess if mq contents overflowed the current file
@@ -985,27 +1020,25 @@ bus_handler (GstBin * bin, GstMessage * message)
 static void
 handle_gathered_gop (GstSplitMuxSink * splitmux)
 {
-  GList *cur;
-  guint64 queued_bytes = 0;
+  guint64 queued_bytes;
   GstClockTimeDiff queued_time = 0;
+  GstClockTimeDiff new_out_ts = splitmux->reference_ctx->in_running_time;
+  SplitMuxOutputCommand *cmd;
 
   /* Assess if the multiqueue contents overflowed the current file */
-  for (cur = g_list_first (splitmux->contexts);
-      cur != NULL; cur = g_list_next (cur)) {
-    MqStreamCtx *tmpctx = (MqStreamCtx *) (cur->data);
-    if (tmpctx->in_running_time > queued_time)
-      queued_time = tmpctx->in_running_time;
-    queued_bytes += tmpctx->in_bytes;
-  }
+  /* When considering if a newly gathered GOP overflows
+   * the time limit for the file, only consider the running time of the
+   * reference stream. Other streams might have run ahead a little bit,
+   * but extra pieces won't be released to the muxer beyond the reference
+   * stream cut-off anyway - so it forms the limit. */
+  queued_bytes = splitmux->fragment_total_bytes + splitmux->gop_total_bytes;
+  queued_time = splitmux->reference_ctx->in_running_time;
+
+  GST_LOG_OBJECT (splitmux, " queued_bytes %" G_GUINT64_FORMAT, queued_bytes);
 
-  GST_LOG_OBJECT (splitmux, " queued_bytes %" G_GUINT64_FORMAT
-      " splitmuxsink->mux_start_bytes %" G_GUINT64_FORMAT, queued_bytes,
-      splitmux->mux_start_bytes);
-  g_assert (queued_bytes >= splitmux->mux_start_bytes);
-  g_assert (queued_time >= splitmux->mux_start_time);
+  g_assert (queued_time >= splitmux->fragment_start_time);
 
-  queued_bytes -= splitmux->mux_start_bytes;
-  queued_time -= splitmux->mux_start_time;
+  queued_time -= splitmux->fragment_start_time;
 
   /* Expand queued bytes estimate by muxer overhead */
   queued_bytes += (queued_bytes * splitmux->mux_overhead);
@@ -1015,43 +1048,68 @@ handle_gathered_gop (GstSplitMuxSink * splitmux)
 
   /* Check for overrun - have we output at least one byte and overrun
    * either threshold? */
-  if ((splitmux->have_muxed_something &&
+  if ((splitmux->fragment_total_bytes > 0 &&
           ((splitmux->threshold_bytes > 0 &&
                   queued_bytes > splitmux->threshold_bytes) ||
               (splitmux->threshold_time > 0 &&
                   queued_time > splitmux->threshold_time)))) {
 
-    splitmux->state = SPLITMUX_STATE_ENDING_FILE;
-    splitmux->update_mux_start_time = TRUE;
+    /* Tell the output side to start a new fragment */
     GST_INFO_OBJECT (splitmux,
-        "mq overflowed since last, draining out. max out TS is %"
-        GST_STIME_FORMAT, GST_STIME_ARGS (splitmux->max_out_running_time));
-    GST_SPLITMUX_BROADCAST (splitmux);
+        "This GOP (dur %" GST_STIME_FORMAT
+        ") would overflow the fragment, Sending start_new_fragment cmd",
+        GST_STIME_ARGS (splitmux->reference_ctx->in_running_time -
+            splitmux->gop_start_time));
+    cmd = out_cmd_buf_new ();
+    cmd->start_new_fragment = TRUE;
+    g_queue_push_head (&splitmux->out_cmd_q, cmd);
+    GST_SPLITMUX_BROADCAST_OUTPUT (splitmux);
+
+    new_out_ts = splitmux->reference_ctx->in_running_time;
+    splitmux->fragment_start_time = splitmux->gop_start_time;
+    splitmux->fragment_total_bytes = 0;
+
+    if (request_next_keyframe (splitmux) == FALSE) {
+      GST_WARNING_OBJECT (splitmux,
+          "Could not request a keyframe. Files may not split at the exact location they should");
+    }
+  }
 
+  /* And set up to collect the next GOP */
+  if (!splitmux->reference_ctx->in_eos) {
+    splitmux->input_state = SPLITMUX_INPUT_STATE_COLLECTING_GOP_START;
+    splitmux->gop_start_time = new_out_ts;
   } else {
-    /* No overflow */
-    GST_LOG_OBJECT (splitmux,
-        "This GOP didn't overflow the fragment. Bytes sent %" G_GUINT64_FORMAT
-        " queued %" G_GUINT64_FORMAT " time %" GST_STIME_FORMAT " Continuing.",
-        splitmux->muxed_out_bytes - splitmux->mux_start_bytes,
-        queued_bytes, GST_STIME_ARGS (queued_time));
+    /* This is probably already the current state, but just in case: */
+    splitmux->input_state = SPLITMUX_INPUT_STATE_FINISHING_UP;
+    new_out_ts = GST_CLOCK_STIME_NONE;  /* EOS runs until forever */
+  }
 
-    /* Wake everyone up to push this one GOP, then sleep */
-    splitmux->have_muxed_something = TRUE;
+  /* And wake all input contexts to send a wake-up event */
+  g_list_foreach (splitmux->contexts, (GFunc) ctx_set_unblock, NULL);
+  GST_SPLITMUX_BROADCAST_INPUT (splitmux);
 
-    if (!splitmux->reference_ctx->in_eos) {
-      splitmux->state = SPLITMUX_STATE_COLLECTING_GOP_START;
-      splitmux->max_out_running_time = splitmux->reference_ctx->in_running_time;
-    } else {
-      splitmux->state = SPLITMUX_STATE_WAITING_GOP_COMPLETE;
-      splitmux->max_out_running_time = GST_CLOCK_STIME_NONE;
-    }
+  /* Now either way - either there was no overflow, or we requested a new fragment: release this GOP */
+  splitmux->fragment_total_bytes += splitmux->gop_total_bytes;
 
-    GST_LOG_OBJECT (splitmux, "Waking output for complete GOP, TS %"
-        GST_STIME_FORMAT, GST_STIME_ARGS (splitmux->max_out_running_time));
-    GST_SPLITMUX_BROADCAST (splitmux);
+  if (splitmux->gop_total_bytes > 0) {
+    GST_LOG_OBJECT (splitmux,
+        "Releasing GOP to output. Bytes in fragment now %" G_GUINT64_FORMAT
+        " time %" GST_STIME_FORMAT,
+        splitmux->fragment_total_bytes, GST_STIME_ARGS (queued_time));
+
+    /* Send this GOP to the output command queue */
+    cmd = out_cmd_buf_new ();
+    cmd->start_new_fragment = FALSE;
+    cmd->max_output_ts = new_out_ts;
+    GST_LOG_OBJECT (splitmux, "Sending GOP cmd to output for TS %"
+        GST_STIME_FORMAT, GST_STIME_ARGS (new_out_ts));
+    g_queue_push_head (&splitmux->out_cmd_q, cmd);
+
+    GST_SPLITMUX_BROADCAST_OUTPUT (splitmux);
   }
 
+  splitmux->gop_total_bytes = 0;
 }
 
 /* Called with splitmux lock held */
@@ -1063,10 +1121,37 @@ static void
 check_completed_gop (GstSplitMuxSink * splitmux, MqStreamCtx * ctx)
 {
   GList *cur;
-  gboolean ready = TRUE;
-  GstClockTimeDiff current_max_in_running_time;
+  GstEvent *event;
+
+  /* On ENDING_FILE, the reference stream sends a command to start a new
+   * fragment, then releases the GOP for output in the new fragment.
+   *  If somes streams received no buffer during the last GOP that overran,
+   * because its next buffer has a timestamp bigger than
+   * ctx->max_in_running_time, its queue is empty. In that case the only
+   * way to wakeup the output thread is by injecting an event in the
+   * queue. This usually happen with subtitle streams.
+   * See https://bugzilla.gnome.org/show_bug.cgi?id=763711. */
+  if (ctx->need_unblock) {
+    GST_LOG_OBJECT (ctx->sinkpad, "Sending splitmuxsink-unblock event");
+    event = gst_event_new_custom (GST_EVENT_CUSTOM_DOWNSTREAM |
+        GST_EVENT_TYPE_SERIALIZED,
+        gst_structure_new ("splitmuxsink-unblock", "timestamp",
+            G_TYPE_INT64, splitmux->max_in_running_time, NULL));
+
+    GST_SPLITMUX_UNLOCK (splitmux);
+    gst_pad_send_event (ctx->sinkpad, event);
+    GST_SPLITMUX_LOCK (splitmux);
+
+    ctx->need_unblock = FALSE;
+    GST_SPLITMUX_BROADCAST_INPUT (splitmux);
+    /* state may have changed while we were unlocked. Loop again if so */
+    if (splitmux->input_state != SPLITMUX_INPUT_STATE_WAITING_GOP_COLLECT)
+      return;
+  }
+
+  if (splitmux->input_state == SPLITMUX_INPUT_STATE_WAITING_GOP_COLLECT) {
+    gboolean ready = TRUE;
 
-  if (splitmux->state == SPLITMUX_STATE_WAITING_GOP_COMPLETE) {
     /* Iterate each pad, and check that the input running time is at least
      * up to the reference running time, and if so handle the collected GOP */
     GST_LOG_OBJECT (splitmux, "Checking GOP collected, Max in running time %"
@@ -1081,7 +1166,7 @@ check_completed_gop (GstSplitMuxSink * splitmux, MqStreamCtx * ctx)
           " EOS %d", tmpctx, tmpctx->srcpad,
           GST_STIME_ARGS (tmpctx->in_running_time), tmpctx->in_eos);
 
-      if (splitmux->max_in_running_time != G_MAXINT64 &&
+      if (splitmux->max_in_running_time != GST_CLOCK_STIME_NONE &&
           tmpctx->in_running_time < splitmux->max_in_running_time &&
           !tmpctx->in_eos) {
         GST_LOG_OBJECT (splitmux,
@@ -1106,74 +1191,17 @@ check_completed_gop (GstSplitMuxSink * splitmux, MqStreamCtx * ctx)
 
   /* Some pad is not yet ready, or GOP is being pushed
    * either way, sleep and wait to get woken */
-  current_max_in_running_time = splitmux->max_in_running_time;
-  while ((splitmux->state == SPLITMUX_STATE_WAITING_GOP_COMPLETE ||
-          splitmux->state == SPLITMUX_STATE_START_NEXT_FRAGMENT) &&
+  while (splitmux->input_state == SPLITMUX_INPUT_STATE_WAITING_GOP_COLLECT &&
       !ctx->flushing &&
-      (current_max_in_running_time == splitmux->max_in_running_time)) {
-
-    GST_LOG_OBJECT (splitmux, "Sleeping for %s (ctx %p)",
-        splitmux->state == SPLITMUX_STATE_WAITING_GOP_COMPLETE ?
-        "GOP complete" : "EOF draining", ctx);
-    GST_SPLITMUX_WAIT (splitmux);
+      (ctx->in_running_time >= splitmux->max_in_running_time) &&
+      (splitmux->max_in_running_time != GST_CLOCK_STIME_NONE)) {
 
+    GST_LOG_OBJECT (splitmux, "Sleeping for GOP collection (ctx %p)", ctx);
+    GST_SPLITMUX_WAIT_INPUT (splitmux);
     GST_LOG_OBJECT (splitmux, "Done waiting for complete GOP (ctx %p)", ctx);
   }
 }
 
-static void
-check_queue_length (GstSplitMuxSink * splitmux, MqStreamCtx * ctx)
-{
-  GList *cur;
-  guint cur_len = g_queue_get_length (&ctx->queued_bufs);
-
-  GST_DEBUG_OBJECT (ctx->sinkpad,
-      "Checking queue length len %u cur_max %u queued gops %u",
-      cur_len, splitmux->mq_max_buffers, splitmux->queued_gops);
-
-  if (cur_len >= splitmux->mq_max_buffers) {
-    gboolean allow_grow = FALSE;
-
-    /* If collecting a GOP and this pad might block,
-     * and there isn't already a pending GOP in the queue
-     * then grow
-     */
-    if (splitmux->state == SPLITMUX_STATE_WAITING_GOP_COMPLETE &&
-        ctx->in_running_time < splitmux->max_in_running_time &&
-        splitmux->queued_gops <= 1) {
-      allow_grow = TRUE;
-    } else if (splitmux->state == SPLITMUX_STATE_COLLECTING_GOP_START &&
-        ctx->is_reference && splitmux->queued_gops <= 1) {
-      allow_grow = TRUE;
-    }
-
-    if (!allow_grow) {
-      for (cur = g_list_first (splitmux->contexts);
-          cur != NULL; cur = g_list_next (cur)) {
-        MqStreamCtx *tmpctx = (MqStreamCtx *) (cur->data);
-        GST_DEBUG_OBJECT (tmpctx->sinkpad,
-            " len %u out_blocked %d",
-            g_queue_get_length (&tmpctx->queued_bufs), tmpctx->out_blocked);
-        /* If another stream is starving, grow */
-        if (tmpctx != ctx && g_queue_get_length (&tmpctx->queued_bufs) < 1) {
-          allow_grow = TRUE;
-        }
-      }
-    }
-
-    if (allow_grow) {
-      splitmux->mq_max_buffers = cur_len + 1;
-
-      GST_INFO_OBJECT (splitmux,
-          "Multiqueue overrun - enlarging to %u buffers ctx %p",
-          splitmux->mq_max_buffers, ctx);
-
-      g_object_set (splitmux->mq, "max-size-buffers",
-          splitmux->mq_max_buffers, NULL);
-    }
-  }
-}
-
 static GstPadProbeReturn
 handle_mq_input (GstPad * pad, GstPadProbeInfo * info, MqStreamCtx * ctx)
 {
@@ -1193,6 +1221,9 @@ handle_mq_input (GstPad * pad, GstPadProbeInfo * info, MqStreamCtx * ctx)
   }
   if (info->type & GST_PAD_PROBE_TYPE_EVENT_DOWNSTREAM) {
     GstEvent *event = gst_pad_probe_info_get_event (info);
+
+    GST_LOG_OBJECT (pad, "Event %" GST_PTR_FORMAT, event);
+
     switch (GST_EVENT_TYPE (event)) {
       case GST_EVENT_SEGMENT:
         gst_event_copy_segment (event, &ctx->in_segment);
@@ -1201,7 +1232,6 @@ handle_mq_input (GstPad * pad, GstPadProbeInfo * info, MqStreamCtx * ctx)
         GST_SPLITMUX_LOCK (splitmux);
         gst_segment_init (&ctx->in_segment, GST_FORMAT_UNDEFINED);
         ctx->in_eos = FALSE;
-        ctx->in_bytes = 0;
         ctx->in_running_time = GST_CLOCK_STIME_NONE;
         GST_SPLITMUX_UNLOCK (splitmux);
         break;
@@ -1209,18 +1239,18 @@ handle_mq_input (GstPad * pad, GstPadProbeInfo * info, MqStreamCtx * ctx)
         GST_SPLITMUX_LOCK (splitmux);
         ctx->in_eos = TRUE;
 
-        if (splitmux->state == SPLITMUX_STATE_STOPPED)
+        if (splitmux->input_state == SPLITMUX_INPUT_STATE_STOPPED)
           goto beach;
 
         if (ctx->is_reference) {
           GST_INFO_OBJECT (splitmux, "Got Reference EOS. Finishing up");
-          /* Act as if this is a new keyframe with infinite timestamp */
-          splitmux->max_in_running_time = G_MAXINT64;
-          splitmux->state = SPLITMUX_STATE_WAITING_GOP_COMPLETE;
+          /* check_completed_gop will act as if this is a new keyframe with infinite timestamp */
+          splitmux->input_state = SPLITMUX_INPUT_STATE_WAITING_GOP_COLLECT;
           /* Wake up other input pads to collect this GOP */
-          GST_SPLITMUX_BROADCAST (splitmux);
+          GST_SPLITMUX_BROADCAST_INPUT (splitmux);
           check_completed_gop (splitmux, ctx);
-        } else if (splitmux->state == SPLITMUX_STATE_WAITING_GOP_COMPLETE) {
+        } else if (splitmux->input_state ==
+            SPLITMUX_INPUT_STATE_WAITING_GOP_COLLECT) {
           /* If we are waiting for a GOP to be completed (ie, for aux
            * pads to catch up), then this pad is complete, so check
            * if the whole GOP is.
@@ -1229,6 +1259,37 @@ handle_mq_input (GstPad * pad, GstPadProbeInfo * info, MqStreamCtx * ctx)
         }
         GST_SPLITMUX_UNLOCK (splitmux);
         break;
+      case GST_EVENT_GAP:{
+        GstClockTime gap_ts;
+        GstClockTimeDiff rtime;
+
+        gst_event_parse_gap (event, &gap_ts, NULL);
+        if (gap_ts == GST_CLOCK_TIME_NONE)
+          break;
+
+        GST_SPLITMUX_LOCK (splitmux);
+
+        if (splitmux->input_state == SPLITMUX_INPUT_STATE_STOPPED)
+          goto beach;
+        rtime = my_segment_to_running_time (&ctx->in_segment, gap_ts);
+
+        GST_LOG_OBJECT (pad, "Have GAP w/ ts %" GST_STIME_FORMAT,
+            GST_STIME_ARGS (rtime));
+
+        if (ctx->is_reference
+            && splitmux->fragment_start_time == GST_CLOCK_STIME_NONE) {
+          splitmux->gop_start_time = splitmux->fragment_start_time = rtime;
+          GST_LOG_OBJECT (splitmux, "Mux start time now %" GST_STIME_FORMAT,
+              GST_STIME_ARGS (splitmux->fragment_start_time));
+          /* Also take this as the first start time when starting up,
+           * so that we start counting overflow from the first frame */
+          if (!GST_CLOCK_STIME_IS_VALID (splitmux->max_in_running_time))
+            splitmux->max_in_running_time = splitmux->fragment_start_time;
+        }
+
+        GST_SPLITMUX_UNLOCK (splitmux);
+        break;
+      }
       default:
         break;
     }
@@ -1247,7 +1308,7 @@ handle_mq_input (GstPad * pad, GstPadProbeInfo * info, MqStreamCtx * ctx)
 
   GST_SPLITMUX_LOCK (splitmux);
 
-  if (splitmux->state == SPLITMUX_STATE_STOPPED)
+  if (splitmux->input_state == SPLITMUX_INPUT_STATE_STOPPED)
     goto beach;
 
   /* If this buffer has a timestamp, advance the input timestamp of the
@@ -1277,31 +1338,33 @@ handle_mq_input (GstPad * pad, GstPadProbeInfo * info, MqStreamCtx * ctx)
   buf_info->buf_size = gst_buffer_get_size (buf);
   buf_info->duration = GST_BUFFER_DURATION (buf);
 
-  /* Update total input byte counter for overflow detect */
-  ctx->in_bytes += buf_info->buf_size;
-
-  /* initialize mux_start_time */
-  if (ctx->is_reference && splitmux->mux_start_time == GST_CLOCK_STIME_NONE) {
-    splitmux->mux_start_time = buf_info->run_ts;
+  /* initialize fragment_start_time */
+  if (ctx->is_reference
+      && splitmux->fragment_start_time == GST_CLOCK_STIME_NONE) {
+    splitmux->gop_start_time = splitmux->fragment_start_time = buf_info->run_ts;
     GST_LOG_OBJECT (splitmux, "Mux start time now %" GST_STIME_FORMAT,
-        GST_STIME_ARGS (splitmux->mux_start_time));
+        GST_STIME_ARGS (splitmux->fragment_start_time));
     /* Also take this as the first start time when starting up,
      * so that we start counting overflow from the first frame */
     if (!GST_CLOCK_STIME_IS_VALID (splitmux->max_in_running_time))
-      splitmux->max_in_running_time = splitmux->mux_start_time;
+      splitmux->max_in_running_time = splitmux->fragment_start_time;
+    if (request_next_keyframe (splitmux) == FALSE) {
+      GST_WARNING_OBJECT (splitmux,
+          "Could not request a keyframe. Files may not split at the exact location they should");
+    }
   }
 
   GST_DEBUG_OBJECT (pad, "Buf TS %" GST_STIME_FORMAT
-      " total in_bytes %" G_GUINT64_FORMAT,
-      GST_STIME_ARGS (buf_info->run_ts), ctx->in_bytes);
+      " total GOP bytes %" G_GUINT64_FORMAT,
+      GST_STIME_ARGS (buf_info->run_ts), splitmux->gop_total_bytes);
 
   loop_again = TRUE;
   do {
     if (ctx->flushing)
       break;
 
-    switch (splitmux->state) {
-      case SPLITMUX_STATE_COLLECTING_GOP_START:
+    switch (splitmux->input_state) {
+      case SPLITMUX_INPUT_STATE_COLLECTING_GOP_START:
         if (ctx->is_reference) {
           /* If a keyframe, we have a complete GOP */
           if (GST_BUFFER_FLAG_IS_SET (buf, GST_BUFFER_FLAG_DELTA_UNIT) ||
@@ -1309,33 +1372,50 @@ handle_mq_input (GstPad * pad, GstPadProbeInfo * info, MqStreamCtx * ctx)
               splitmux->max_in_running_time >= ctx->in_running_time) {
             /* Pass this buffer through */
             loop_again = FALSE;
+            /* Allow other input pads to catch up to here too */
+            splitmux->max_in_running_time = ctx->in_running_time;
+            GST_SPLITMUX_BROADCAST_INPUT (splitmux);
             break;
           }
           GST_INFO_OBJECT (pad,
               "Have keyframe with running time %" GST_STIME_FORMAT,
               GST_STIME_ARGS (ctx->in_running_time));
           keyframe = TRUE;
-          splitmux->state = SPLITMUX_STATE_WAITING_GOP_COMPLETE;
+          splitmux->input_state = SPLITMUX_INPUT_STATE_WAITING_GOP_COLLECT;
           splitmux->max_in_running_time = ctx->in_running_time;
           /* Wake up other input pads to collect this GOP */
-          GST_SPLITMUX_BROADCAST (splitmux);
+          GST_SPLITMUX_BROADCAST_INPUT (splitmux);
           check_completed_gop (splitmux, ctx);
         } else {
+          /* Pass this buffer if the reference ctx is far enough ahead */
+          if (ctx->in_running_time < splitmux->max_in_running_time) {
+            loop_again = FALSE;
+            break;
+          }
+
           /* We're still waiting for a keyframe on the reference pad, sleep */
           GST_LOG_OBJECT (pad, "Sleeping for GOP start");
-          GST_SPLITMUX_WAIT (splitmux);
-          GST_LOG_OBJECT (pad, "Done sleeping for GOP start state now %d",
-              splitmux->state);
+          GST_SPLITMUX_WAIT_INPUT (splitmux);
+          GST_LOG_OBJECT (pad,
+              "Done sleeping for GOP start input state now %d",
+              splitmux->input_state);
         }
         break;
-      case SPLITMUX_STATE_WAITING_GOP_COMPLETE:
+      case SPLITMUX_INPUT_STATE_WAITING_GOP_COLLECT:{
+        /* We're collecting a GOP. If this is the reference context,
+         * we need to check if this is a keyframe that marks the start
+         * of the next GOP. If it is, it marks the end of the GOP we're
+         * collecting, so sleep and wait until all the other pads also
+         * reach that timestamp - at which point, we have an entire GOP
+         * and either go to ENDING_FILE or release this GOP to the muxer and
+         * go back to COLLECT_GOP_START. */
 
         /* If we overran the target timestamp, it might be time to process
          * the GOP, otherwise bail out for more data
          */
         GST_LOG_OBJECT (pad,
-            "Checking TS %" GST_STIME_FORMAT " against max %" GST_STIME_FORMAT,
-            GST_STIME_ARGS (ctx->in_running_time),
+            "Checking TS %" GST_STIME_FORMAT " against max %"
+            GST_STIME_FORMAT, GST_STIME_ARGS (ctx->in_running_time),
             GST_STIME_ARGS (splitmux->max_in_running_time));
 
         if (ctx->in_running_time < splitmux->max_in_running_time) {
@@ -1347,53 +1427,28 @@ handle_mq_input (GstPad * pad, GstPadProbeInfo * info, MqStreamCtx * ctx)
             "Collected last packet of GOP. Checking other pads");
         check_completed_gop (splitmux, ctx);
         break;
-      case SPLITMUX_STATE_ENDING_FILE:{
-        GstEvent *event;
-
-        /* If somes streams received no buffer during the last GOP that overran,
-         * because its next buffer has a timestamp bigger than
-         * ctx->max_in_running_time, its queue is empty. In that case the only
-         * way to wakeup the output thread is by injecting an event in the
-         * queue. This usually happen with subtitle streams.
-         * See https://bugzilla.gnome.org/show_bug.cgi?id=763711. */
-        GST_LOG_OBJECT (pad, "Sending splitmuxsink-unblock event");
-        event = gst_event_new_custom (GST_EVENT_CUSTOM_DOWNSTREAM |
-            GST_EVENT_TYPE_SERIALIZED,
-            gst_structure_new ("splitmuxsink-unblock", "timestamp",
-                G_TYPE_INT64, splitmux->max_in_running_time, NULL));
-
-        GST_SPLITMUX_UNLOCK (splitmux);
-        gst_pad_send_event (ctx->sinkpad, event);
-        GST_SPLITMUX_LOCK (splitmux);
-        /* state may have changed while we were unlocked. Loop again if so */
-        if (splitmux->state != SPLITMUX_STATE_ENDING_FILE)
-          break;
-        /* fallthrough */
       }
-      case SPLITMUX_STATE_START_NEXT_FRAGMENT:
-        /* A fragment is ending, wait until that's done before continuing */
-        GST_DEBUG_OBJECT (pad, "Sleeping for fragment restart");
-        GST_SPLITMUX_WAIT (splitmux);
-        GST_DEBUG_OBJECT (pad,
-            "Done sleeping for fragment restart state now %d", splitmux->state);
+      case SPLITMUX_INPUT_STATE_FINISHING_UP:
+        loop_again = FALSE;
         break;
       default:
         loop_again = FALSE;
         break;
     }
-  } while (loop_again);
+  }
+  while (loop_again);
 
   if (keyframe) {
-    splitmux->queued_gops++;
+    splitmux->queued_keyframes++;
     buf_info->keyframe = TRUE;
   }
 
+  /* Update total input byte counter for overflow detect */
+  splitmux->gop_total_bytes += buf_info->buf_size;
+
   /* Now add this buffer to the queue just before returning */
   g_queue_push_head (&ctx->queued_bufs, buf_info);
 
-  /* Check the buffer will fit in the mq */
-  check_queue_length (splitmux, ctx);
-
   GST_LOG_OBJECT (pad, "Returning to queue buffer %" GST_PTR_FORMAT
       " run ts %" GST_STIME_FORMAT, buf, GST_STIME_ARGS (ctx->in_running_time));
 
@@ -1407,6 +1462,89 @@ beach:
   return GST_PAD_PROBE_PASS;
 }
 
+static void
+grow_blocked_queues (GstSplitMuxSink * splitmux)
+{
+  GList *cur;
+
+  /* Scan other queues for full-ness and grow them */
+  for (cur = g_list_first (splitmux->contexts);
+      cur != NULL; cur = g_list_next (cur)) {
+    MqStreamCtx *tmpctx = (MqStreamCtx *) (cur->data);
+    guint cur_limit;
+    guint cur_len = g_queue_get_length (&tmpctx->queued_bufs);
+
+    g_object_get (tmpctx->q, "max-size-buffers", &cur_limit, NULL);
+    GST_LOG_OBJECT (tmpctx->q, "Queue len %u", cur_len);
+
+    if (cur_len >= cur_limit) {
+      cur_limit = cur_len + 1;
+      GST_DEBUG_OBJECT (tmpctx->q,
+          "Queue overflowed and needs enlarging. Growing to %u buffers",
+          cur_limit);
+      g_object_set (tmpctx->q, "max-size-buffers", cur_limit, NULL);
+    }
+  }
+}
+
+static void
+handle_q_underrun (GstElement * q, gpointer user_data)
+{
+  MqStreamCtx *ctx = (MqStreamCtx *) (user_data);
+  GstSplitMuxSink *splitmux = ctx->splitmux;
+
+  GST_SPLITMUX_LOCK (splitmux);
+  GST_DEBUG_OBJECT (q,
+      "Queue reported underrun with %d keyframes and %d cmds enqueued",
+      splitmux->queued_keyframes, g_queue_get_length (&splitmux->out_cmd_q));
+  grow_blocked_queues (splitmux);
+  GST_SPLITMUX_UNLOCK (splitmux);
+}
+
+static void
+handle_q_overrun (GstElement * q, gpointer user_data)
+{
+  MqStreamCtx *ctx = (MqStreamCtx *) (user_data);
+  GstSplitMuxSink *splitmux = ctx->splitmux;
+  gboolean allow_grow = FALSE;
+
+  GST_SPLITMUX_LOCK (splitmux);
+  GST_DEBUG_OBJECT (q,
+      "Queue reported overrun with %d keyframes and %d cmds enqueued",
+      splitmux->queued_keyframes, g_queue_get_length (&splitmux->out_cmd_q));
+
+  if (splitmux->queued_keyframes < 2) {
+    /* Less than a full GOP queued, grow the queue */
+    allow_grow = TRUE;
+  } else if (g_queue_get_length (&splitmux->out_cmd_q) < 1) {
+    allow_grow = TRUE;
+  } else {
+    /* If another queue is starved, grow */
+    GList *cur;
+    for (cur = g_list_first (splitmux->contexts);
+        cur != NULL; cur = g_list_next (cur)) {
+      MqStreamCtx *tmpctx = (MqStreamCtx *) (cur->data);
+      if (tmpctx != ctx && g_queue_get_length (&tmpctx->queued_bufs) < 1) {
+        allow_grow = TRUE;
+      }
+    }
+  }
+  GST_SPLITMUX_UNLOCK (splitmux);
+
+  if (allow_grow) {
+    guint cur_limit;
+
+    g_object_get (q, "max-size-buffers", &cur_limit, NULL);
+    cur_limit++;
+
+    GST_DEBUG_OBJECT (q,
+        "Queue overflowed and needs enlarging. Growing to %u buffers",
+        cur_limit);
+
+    g_object_set (q, "max-size-buffers", cur_limit, NULL);
+  }
+}
+
 static GstPad *
 gst_splitmux_sink_request_new_pad (GstElement * element,
     GstPadTemplate * templ, const gchar * name, const GstCaps * caps)
@@ -1414,7 +1552,8 @@ gst_splitmux_sink_request_new_pad (GstElement * element,
   GstSplitMuxSink *splitmux = (GstSplitMuxSink *) element;
   GstPadTemplate *mux_template = NULL;
   GstPad *res = NULL;
-  GstPad *mq_sink, *mq_src;
+  GstElement *q;
+  GstPad *q_sink = NULL, *q_src = NULL;
   gchar *gname;
   gboolean is_video = FALSE;
   MqStreamCtx *ctx;
@@ -1422,7 +1561,7 @@ gst_splitmux_sink_request_new_pad (GstElement * element,
   GST_DEBUG_OBJECT (element, "templ:%s, name:%s", templ->name_template, name);
 
   GST_SPLITMUX_LOCK (splitmux);
-  if (!create_elements (splitmux))
+  if (!create_muxer (splitmux))
     goto fail;
 
   if (templ->name_template) {
@@ -1474,29 +1613,37 @@ gst_splitmux_sink_request_new_pad (GstElement * element,
   else
     gname = g_strdup (name);
 
-  if (!get_pads_from_mq (splitmux, &mq_sink, &mq_src)) {
-    gst_element_release_request_pad (splitmux->muxer, res);
-    gst_object_unref (GST_OBJECT (res));
+  if ((q = create_element (splitmux, "queue", NULL, FALSE)) == NULL)
     goto fail;
-  }
 
-  if (gst_pad_link (mq_src, res) != GST_PAD_LINK_OK) {
+  gst_element_set_state (q, GST_STATE_TARGET (splitmux));
+
+  g_object_set (q, "max-size-bytes", 0, "max-size-time", (guint64) (0),
+      "max-size-buffers", 5, NULL);
+
+  q_sink = gst_element_get_static_pad (q, "sink");
+  q_src = gst_element_get_static_pad (q, "src");
+
+  if (gst_pad_link (q_src, res) != GST_PAD_LINK_OK) {
     gst_element_release_request_pad (splitmux->muxer, res);
     gst_object_unref (GST_OBJECT (res));
-    gst_element_release_request_pad (splitmux->mq, mq_sink);
-    gst_object_unref (GST_OBJECT (mq_sink));
     goto fail;
   }
 
   gst_object_unref (GST_OBJECT (res));
 
   ctx = mq_stream_ctx_new (splitmux);
-  ctx->srcpad = mq_src;
-  ctx->sinkpad = mq_sink;
+  /* Context holds a ref: */
+  ctx->q = gst_object_ref (q);
+  ctx->srcpad = q_src;
+  ctx->sinkpad = q_sink;
+  ctx->q_overrun_id =
+      g_signal_connect (q, "overrun", (GCallback) handle_q_overrun, ctx);
+  g_signal_connect (q, "underrun", (GCallback) handle_q_underrun, ctx);
 
   mq_stream_ctx_ref (ctx);
   ctx->src_pad_block_id =
-      gst_pad_add_probe (mq_src, GST_PAD_PROBE_TYPE_DATA_DOWNSTREAM,
+      gst_pad_add_probe (q_src, GST_PAD_PROBE_TYPE_DATA_DOWNSTREAM,
       (GstPadProbeCallback) handle_mq_output, ctx, (GDestroyNotify)
       _pad_block_destroy_src_notify);
   if (is_video && splitmux->reference_ctx != NULL) {
@@ -1508,25 +1655,22 @@ gst_splitmux_sink_request_new_pad (GstElement * element,
     ctx->is_reference = TRUE;
   }
 
-  res = gst_ghost_pad_new_from_template (gname, mq_sink, templ);
+  res = gst_ghost_pad_new_from_template (gname, q_sink, templ);
   g_object_set_qdata ((GObject *) (res), PAD_CONTEXT, ctx);
 
   mq_stream_ctx_ref (ctx);
   ctx->sink_pad_block_id =
-      gst_pad_add_probe (mq_sink, GST_PAD_PROBE_TYPE_DATA_DOWNSTREAM,
+      gst_pad_add_probe (q_sink, GST_PAD_PROBE_TYPE_DATA_DOWNSTREAM,
       (GstPadProbeCallback) handle_mq_input, ctx, (GDestroyNotify)
       _pad_block_destroy_sink_notify);
 
   GST_DEBUG_OBJECT (splitmux, "Request pad %" GST_PTR_FORMAT
-      " is mq pad %" GST_PTR_FORMAT, res, mq_sink);
+      " feeds queue pad %" GST_PTR_FORMAT, res, q_sink);
 
   splitmux->contexts = g_list_prepend (splitmux->contexts, ctx);
 
   g_free (gname);
 
-  gst_object_unref (mq_sink);
-  gst_object_unref (mq_src);
-
   if (is_video)
     splitmux->have_video = TRUE;
 
@@ -1538,6 +1682,11 @@ gst_splitmux_sink_request_new_pad (GstElement * element,
   return res;
 fail:
   GST_SPLITMUX_UNLOCK (splitmux);
+
+  if (q_sink)
+    gst_object_unref (q_sink);
+  if (q_src)
+    gst_object_unref (q_src);
   return NULL;
 already_have_video:
   GST_DEBUG_OBJECT (splitmux, "video sink pad already requested");
@@ -1549,23 +1698,18 @@ static void
 gst_splitmux_sink_release_pad (GstElement * element, GstPad * pad)
 {
   GstSplitMuxSink *splitmux = (GstSplitMuxSink *) element;
-  GstPad *mqsink, *mqsrc = NULL, *muxpad = NULL;
+  GstPad *muxpad = NULL;
   MqStreamCtx *ctx =
       (MqStreamCtx *) (g_object_get_qdata ((GObject *) (pad), PAD_CONTEXT));
 
   GST_SPLITMUX_LOCK (splitmux);
 
-  if (splitmux->muxer == NULL || splitmux->mq == NULL)
+  if (splitmux->muxer == NULL)
     goto fail;                  /* Elements don't exist yet - nothing to release */
 
   GST_INFO_OBJECT (pad, "releasing request pad");
 
-  mqsink = gst_ghost_pad_get_target (GST_GHOST_PAD (pad));
-  /* The ghostpad target might have disappeared during pipeline destruct */
-  if (mqsink)
-    mqsrc = mq_sink_to_src (splitmux->mq, mqsink);
-  if (mqsrc)
-    muxpad = gst_pad_get_peer (mqsrc);
+  muxpad = gst_pad_get_peer (ctx->srcpad);
 
   /* Remove the context from our consideration */
   splitmux->contexts = g_list_remove (splitmux->contexts, ctx);
@@ -1581,24 +1725,15 @@ gst_splitmux_sink_release_pad (GstElement * element, GstPad * pad)
   if (ctx == splitmux->reference_ctx)
     splitmux->reference_ctx = NULL;
 
-  /* Release and free the mq input */
-  if (mqsink) {
-    gst_element_release_request_pad (splitmux->mq, mqsink);
-    gst_object_unref (mqsink);
-  }
-
   /* Release and free the muxer input */
   if (muxpad) {
     gst_element_release_request_pad (splitmux->muxer, muxpad);
     gst_object_unref (muxpad);
   }
 
-  if (mqsrc)
-    gst_object_unref (mqsrc);
-
   if (GST_PAD_PAD_TEMPLATE (pad) &&
-      g_str_equal (GST_PAD_TEMPLATE_NAME_TEMPLATE (GST_PAD_PAD_TEMPLATE (pad)),
-          "video"))
+      g_str_equal (GST_PAD_TEMPLATE_NAME_TEMPLATE (GST_PAD_PAD_TEMPLATE
+              (pad)), "video"))
     splitmux->have_video = FALSE;
 
   gst_element_remove_pad (element, pad);
@@ -1638,21 +1773,9 @@ create_element (GstSplitMuxSink * splitmux,
 }
 
 static gboolean
-create_elements (GstSplitMuxSink * splitmux)
+create_muxer (GstSplitMuxSink * splitmux)
 {
   /* Create internal elements */
-  if (splitmux->mq == NULL) {
-    if ((splitmux->mq =
-            create_element (splitmux, "multiqueue", "multiqueue",
-                FALSE)) == NULL)
-      goto fail;
-
-    splitmux->mq_max_buffers = 5;
-    /* No bytes or time limit, we limit buffers manually */
-    g_object_set (splitmux->mq, "max-size-bytes", 0, "max-size-time",
-        (guint64) 0, "max-size-buffers", splitmux->mq_max_buffers, NULL);
-  }
-
   if (splitmux->muxer == NULL) {
     GstElement *provided_muxer = NULL;
 
@@ -1768,6 +1891,15 @@ create_sink (GstSplitMuxSink * splitmux)
       }
     }
 
+#if 1
+    if (g_object_class_find_property (G_OBJECT_GET_CLASS (splitmux->sink),
+            "async") != NULL) {
+      /* async child elements are causing state change races and weird
+       * failures, so let's try and turn that off */
+      g_object_set (splitmux->sink, "async", FALSE, NULL);
+    }
+#endif
+
     if (!gst_element_link (splitmux->muxer, splitmux->active_sink)) {
       g_warning ("Failed to link muxer and sink- splitmuxsink will not work");
       goto fail;
@@ -1867,7 +1999,7 @@ gst_splitmux_sink_change_state (GstElement * element, GstStateChange transition)
   switch (transition) {
     case GST_STATE_CHANGE_NULL_TO_READY:{
       GST_SPLITMUX_LOCK (splitmux);
-      if (!create_elements (splitmux) || !create_sink (splitmux)) {
+      if (!create_muxer (splitmux) || !create_sink (splitmux)) {
         ret = GST_STATE_CHANGE_FAILURE;
         GST_SPLITMUX_UNLOCK (splitmux);
         goto beach;
@@ -1879,23 +2011,25 @@ gst_splitmux_sink_change_state (GstElement * element, GstStateChange transition)
     case GST_STATE_CHANGE_READY_TO_PAUSED:{
       GST_SPLITMUX_LOCK (splitmux);
       /* Start by collecting one input on each pad */
-      splitmux->state = SPLITMUX_STATE_COLLECTING_GOP_START;
+      splitmux->input_state = SPLITMUX_INPUT_STATE_COLLECTING_GOP_START;
+      splitmux->output_state = SPLITMUX_OUTPUT_STATE_START_NEXT_FILE;
       splitmux->max_in_running_time = GST_CLOCK_STIME_NONE;
-      splitmux->muxed_out_time = splitmux->mux_start_time =
-          GST_CLOCK_STIME_NONE;
-      splitmux->muxed_out_bytes = splitmux->mux_start_bytes = 0;
-      splitmux->opening_first_fragment = TRUE;
+      splitmux->gop_start_time = splitmux->muxed_out_time =
+          splitmux->fragment_start_time = GST_CLOCK_STIME_NONE;
+      splitmux->muxed_out_bytes = 0;
       GST_SPLITMUX_UNLOCK (splitmux);
       break;
     }
     case GST_STATE_CHANGE_PAUSED_TO_READY:
     case GST_STATE_CHANGE_READY_TO_NULL:
       GST_SPLITMUX_LOCK (splitmux);
-      splitmux->state = SPLITMUX_STATE_STOPPED;
+      splitmux->output_state = SPLITMUX_OUTPUT_STATE_STOPPED;
+      splitmux->input_state = SPLITMUX_INPUT_STATE_STOPPED;
       /* Wake up any blocked threads */
       GST_LOG_OBJECT (splitmux,
           "State change -> NULL or READY. Waking threads");
-      GST_SPLITMUX_BROADCAST (splitmux);
+      GST_SPLITMUX_BROADCAST_INPUT (splitmux);
+      GST_SPLITMUX_BROADCAST_OUTPUT (splitmux);
       GST_SPLITMUX_UNLOCK (splitmux);
       break;
     default:
@@ -1941,10 +2075,10 @@ beach:
       ret == GST_STATE_CHANGE_FAILURE) {
     /* Cleanup elements on failed transition out of NULL */
     gst_splitmux_reset (splitmux);
+    GST_SPLITMUX_LOCK (splitmux);
+    do_async_done (splitmux);
+    GST_SPLITMUX_UNLOCK (splitmux);
   }
-  GST_SPLITMUX_LOCK (splitmux);
-  do_async_done (splitmux);
-  GST_SPLITMUX_UNLOCK (splitmux);
   return ret;
 }
 
index 11eb5d5e0a9e4a073d219ec209324769e703e868..afd73f7f2d9964ca9fd3e38fca32d3ab8548cf4e 100644 (file)
 #include <gst/pbutils/pbutils.h>
 
 G_BEGIN_DECLS
-
 #define GST_TYPE_SPLITMUX_SINK               (gst_splitmux_sink_get_type())
 #define GST_SPLITMUX_SINK(obj)               (G_TYPE_CHECK_INSTANCE_CAST((obj),GST_TYPE_SPLITMUX_SINK,GstSplitMuxSink))
 #define GST_SPLITMUX_SINK_CLASS(klass)       (G_TYPE_CHECK_CLASS_CAST((klass),GST_TYPE_SPLITMUX_SINK,GstSplitMuxSinkClass))
 #define GST_IS_SPLITMUX_SINK(obj)            (G_TYPE_CHECK_INSTANCE_TYPE((obj),GST_TYPE_SPLITMUX_SINK))
 #define GST_IS_SPLITMUX_SINK_CLASS(klass)    (G_TYPE_CHECK_CLASS_TYPE((klass),GST_TYPE_SPLITMUX_SINK))
-
 typedef struct _GstSplitMuxSink GstSplitMuxSink;
 typedef struct _GstSplitMuxSinkClass GstSplitMuxSinkClass;
 
-GType gst_splitmux_sink_get_type(void);
+GType gst_splitmux_sink_get_type (void);
 gboolean register_splitmuxsink (GstPlugin * plugin);
 
-typedef enum _SplitMuxState {
-  SPLITMUX_STATE_STOPPED,
-  SPLITMUX_STATE_COLLECTING_GOP_START,
-  SPLITMUX_STATE_WAITING_GOP_COMPLETE,
-  SPLITMUX_STATE_ENDING_FILE,
-  SPLITMUX_STATE_START_NEXT_FRAGMENT,
-} SplitMuxState;
+typedef enum _SplitMuxInputState
+{
+  SPLITMUX_INPUT_STATE_STOPPED,
+  SPLITMUX_INPUT_STATE_COLLECTING_GOP_START,    /* Waiting for the next ref ctx keyframe */
+  SPLITMUX_INPUT_STATE_WAITING_GOP_COLLECT,     /* Waiting for all streams to collect GOP */
+  SPLITMUX_INPUT_STATE_FINISHING_UP             /* Got EOS from reference ctx, send everything */
+} SplitMuxInputState;
+
+typedef enum _SplitMuxOutputState
+{
+  SPLITMUX_OUTPUT_STATE_STOPPED,
+  SPLITMUX_OUTPUT_STATE_AWAITING_COMMAND,       /* Waiting first command packet from input */
+  SPLITMUX_OUTPUT_STATE_OUTPUT_GOP,     /* Outputting a collected GOP */
+  SPLITMUX_OUTPUT_STATE_ENDING_FILE,    /* Finishing the current fragment */
+  SPLITMUX_OUTPUT_STATE_START_NEXT_FILE /* Restarting after ENDING_FILE */
+} SplitMuxOutputState;
+
+typedef struct _SplitMuxOutputCommand
+{
+  gboolean start_new_fragment;  /* Whether to start a new fragment before advancing output ts */
+  GstClockTimeDiff max_output_ts;       /* Set the limit to stop GOP output */
+} SplitMuxOutputCommand;
 
 typedef struct _MqStreamBuf
 {
@@ -59,6 +72,7 @@ typedef struct _MqStreamCtx
 
   GstSplitMuxSink *splitmux;
 
+  guint q_overrun_id;
   guint sink_pad_block_id;
   guint src_pad_block_id;
 
@@ -67,6 +81,7 @@ typedef struct _MqStreamCtx
   gboolean flushing;
   gboolean in_eos;
   gboolean out_eos;
+  gboolean need_unblock;
 
   GstSegment in_segment;
   GstSegment out_segment;
@@ -74,26 +89,24 @@ typedef struct _MqStreamCtx
   GstClockTimeDiff in_running_time;
   GstClockTimeDiff out_running_time;
 
-  guint64 in_bytes;
-
+  GstElement *q;
   GQueue queued_bufs;
 
   GstPad *sinkpad;
   GstPad *srcpad;
 
-  gboolean out_blocked;
-
   GstBuffer *cur_buffer;
   GstEvent *pending_gap;
 } MqStreamCtx;
 
-struct _GstSplitMuxSink {
+struct _GstSplitMuxSink
+{
   GstBin parent;
 
   GMutex lock;
-  GCond data_cond;
+  GCond input_cond;
+  GCond output_cond;
 
-  SplitMuxState state;
   gdouble mux_overhead;
 
   GstClockTime threshold_time;
@@ -101,9 +114,6 @@ struct _GstSplitMuxSink {
   guint max_files;
   gboolean send_keyframe_requests;
 
-  guint mq_max_buffers;
-
-  GstElement *mq;
   GstElement *muxer;
   GstElement *sink;
 
@@ -112,25 +122,39 @@ struct _GstSplitMuxSink {
   GstElement *provided_sink;
   GstElement *active_sink;
 
+  gboolean ready_for_output;
+
   gchar *location;
   guint fragment_id;
 
   GList *contexts;
 
-  MqStreamCtx *reference_ctx;
-  guint queued_gops;
+  SplitMuxInputState input_state;
   GstClockTimeDiff max_in_running_time;
+  /* Number of bytes sent to the
+   * current fragment */
+  guint64 fragment_total_bytes;
+  /* Number of bytes we've collected into
+   * the GOP that's being collected */
+  guint64 gop_total_bytes;
+  /* Start time of the current fragment */
+  GstClockTimeDiff fragment_start_time;
+  /* Start time of the current GOP */
+  GstClockTimeDiff gop_start_time;
+
+  GQueue out_cmd_q;             /* Queue of commands for output thread */
+
+  SplitMuxOutputState output_state;
   GstClockTimeDiff max_out_running_time;
+  GstClockTimeDiff next_max_out_running_time;
 
   GstClockTimeDiff muxed_out_time;
   guint64 muxed_out_bytes;
-  gboolean have_muxed_something;
-  gboolean update_mux_start_time;
 
-  GstClockTimeDiff mux_start_time;
-  guint64 mux_start_bytes;
+  MqStreamCtx *reference_ctx;
+  /* Count of queued keyframes in the reference ctx */
+  guint queued_keyframes;
 
-  gboolean opening_first_fragment;
   gboolean switching_fragment;
 
   gboolean have_video;
@@ -139,10 +163,10 @@ struct _GstSplitMuxSink {
   gboolean async_pending;
 };
 
-struct _GstSplitMuxSinkClass {
+struct _GstSplitMuxSinkClass
+{
   GstBinClass parent_class;
 };
 
 G_END_DECLS
-
 #endif /* __GST_SPLITMUXSINK_H__ */