splitmuxsink: Allow time and bytes to reach their respective thresholds
[platform/upstream/gst-plugins-good.git] / gst / multifile / gstsplitmuxsink.c
index 07ac307..98f744f 100644 (file)
@@ -24,7 +24,7 @@
  * This element wraps a muxer and a sink, and starts a new file when the mux
  * contents are about to cross a threshold of maximum size of maximum time,
  * splitting at video keyframe boundaries. Exactly one input video stream
- * is required, with as many accompanying audio and subtitle streams as
+ * can be muxed, with as many accompanying audio and subtitle streams as
  * desired.
  *
  * By default, it uses mp4mux and filesink, but they can be changed via
  * The minimum file size is 1 GOP, however - so limits may be overrun if the
  * distance between any 2 keyframes is larger than the limits.
  *
- * The splitting process is driven by the video stream contents, and
- * the video stream must contain closed GOPs for the output file parts
- * to be played individually correctly.
+ * If a video stream is available, the splitting process is driven by the video
+ * stream contents, and the video stream must contain closed GOPs for the output
+ * file parts to be played individually correctly. In the absence of a video
+ * stream, the first available stream is used as reference for synchronization.
  *
  * <refsect2>
  * <title>Example pipelines</title>
@@ -53,6 +54,7 @@
 #endif
 
 #include <string.h>
+#include <glib/gstdio.h>
 #include "gstsplitmuxsink.h"
 
 GST_DEBUG_CATEGORY_STATIC (splitmux_debug);
@@ -69,6 +71,7 @@ enum
   PROP_LOCATION,
   PROP_MAX_SIZE_TIME,
   PROP_MAX_SIZE_BYTES,
+  PROP_MAX_FILES,
   PROP_MUXER_OVERHEAD,
   PROP_MUXER,
   PROP_SINK
@@ -76,6 +79,7 @@ enum
 
 #define DEFAULT_MAX_SIZE_TIME       0
 #define DEFAULT_MAX_SIZE_BYTES      0
+#define DEFAULT_MAX_FILES           0
 #define DEFAULT_MUXER_OVERHEAD      0.02
 #define DEFAULT_MUXER "mp4mux"
 #define DEFAULT_SINK "filesink"
@@ -138,6 +142,8 @@ static void start_next_fragment (GstSplitMuxSink * splitmux);
 static void check_queue_length (GstSplitMuxSink * splitmux, MqStreamCtx * ctx);
 static void mq_stream_ctx_unref (MqStreamCtx * ctx);
 
+static void gst_splitmux_sink_ensure_max_files (GstSplitMuxSink * splitmux);
+
 static MqStreamBuf *
 mq_stream_buf_new (void)
 {
@@ -167,12 +173,12 @@ gst_splitmux_sink_class_init (GstSplitMuxSinkClass * klass)
       "Convenience bin that muxes incoming streams into multiple time/size limited files",
       "Jan Schmidt <jan@centricular.com>");
 
-  gst_element_class_add_pad_template (gstelement_class,
-      gst_static_pad_template_get (&video_sink_template));
-  gst_element_class_add_pad_template (gstelement_class,
-      gst_static_pad_template_get (&audio_sink_template));
-  gst_element_class_add_pad_template (gstelement_class,
-      gst_static_pad_template_get (&subtitle_sink_template));
+  gst_element_class_add_static_pad_template (gstelement_class,
+      &video_sink_template);
+  gst_element_class_add_static_pad_template (gstelement_class,
+      &audio_sink_template);
+  gst_element_class_add_static_pad_template (gstelement_class,
+      &subtitle_sink_template);
 
   gstelement_class->change_state =
       GST_DEBUG_FUNCPTR (gst_splitmux_sink_change_state);
@@ -201,6 +207,13 @@ gst_splitmux_sink_class_init (GstSplitMuxSinkClass * klass)
       g_param_spec_uint64 ("max-size-bytes", "Max. size bytes",
           "Max. amount of data per file (in bytes, 0=disable)", 0, G_MAXUINT64,
           DEFAULT_MAX_SIZE_BYTES, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
+  g_object_class_install_property (gobject_class, PROP_MAX_FILES,
+      g_param_spec_uint ("max-files", "Max files",
+          "Maximum number of files to keep on disk. Once the maximum is reached,"
+          "old files start to be deleted to make room for new ones.",
+          0, G_MAXUINT, DEFAULT_MAX_FILES,
+          G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
+
 
   g_object_class_install_property (gobject_class, PROP_MUXER,
       g_param_spec_object ("muxer", "Muxer",
@@ -232,6 +245,7 @@ gst_splitmux_sink_init (GstSplitMuxSink * splitmux)
   splitmux->mux_overhead = DEFAULT_MUXER_OVERHEAD;
   splitmux->threshold_time = DEFAULT_MAX_SIZE_TIME;
   splitmux->threshold_bytes = DEFAULT_MAX_SIZE_BYTES;
+  splitmux->max_files = DEFAULT_MAX_FILES;
 
   GST_OBJECT_FLAG_SET (splitmux, GST_ELEMENT_FLAG_SINK);
 }
@@ -306,6 +320,11 @@ gst_splitmux_sink_set_property (GObject * object, guint prop_id,
       splitmux->threshold_time = g_value_get_uint64 (value);
       GST_OBJECT_UNLOCK (splitmux);
       break;
+    case PROP_MAX_FILES:
+      GST_OBJECT_LOCK (splitmux);
+      splitmux->max_files = g_value_get_uint (value);
+      GST_OBJECT_UNLOCK (splitmux);
+      break;
     case PROP_MUXER_OVERHEAD:
       GST_OBJECT_LOCK (splitmux);
       splitmux->mux_overhead = g_value_get_double (value);
@@ -353,6 +372,11 @@ gst_splitmux_sink_get_property (GObject * object, guint prop_id,
       g_value_set_uint64 (value, splitmux->threshold_time);
       GST_OBJECT_UNLOCK (splitmux);
       break;
+    case PROP_MAX_FILES:
+      GST_OBJECT_LOCK (splitmux);
+      g_value_set_uint (value, splitmux->max_files);
+      GST_OBJECT_UNLOCK (splitmux);
+      break;
     case PROP_MUXER_OVERHEAD:
       GST_OBJECT_LOCK (splitmux);
       g_value_set_double (value, splitmux->mux_overhead);
@@ -374,6 +398,23 @@ gst_splitmux_sink_get_property (GObject * object, guint prop_id,
   }
 }
 
+/* Convenience function */
+static inline GstClockTimeDiff
+my_segment_to_running_time (GstSegment * segment, GstClockTime val)
+{
+  GstClockTimeDiff res = GST_CLOCK_STIME_NONE;
+
+  if (GST_CLOCK_TIME_IS_VALID (val)) {
+    gboolean sign =
+        gst_segment_to_running_time_full (segment, GST_FORMAT_TIME, val, &val);
+    if (sign > 0)
+      res = val;
+    else if (sign < 0)
+      res = -val;
+  }
+  return res;
+}
+
 static GstPad *
 mq_sink_to_src (GstElement * mq, GstPad * sink_pad)
 {
@@ -429,7 +470,7 @@ mq_stream_ctx_new (GstSplitMuxSink * splitmux)
   ctx->splitmux = splitmux;
   gst_segment_init (&ctx->in_segment, GST_FORMAT_UNDEFINED);
   gst_segment_init (&ctx->out_segment, GST_FORMAT_UNDEFINED);
-  ctx->in_running_time = ctx->out_running_time = 0;
+  ctx->in_running_time = ctx->out_running_time = GST_CLOCK_STIME_NONE;
   g_queue_init (&ctx->queued_bufs);
   return ctx;
 }
@@ -469,6 +510,26 @@ _pad_block_destroy_src_notify (MqStreamCtx * ctx)
   mq_stream_ctx_unref (ctx);
 }
 
+static void
+send_fragment_opened_closed_msg (GstSplitMuxSink * splitmux, gboolean opened)
+{
+  gchar *location = NULL;
+  GstMessage *msg;
+  const gchar *msg_name = opened ?
+      "splitmuxsink-fragment-opened" : "splitmuxsink-fragment-closed";
+
+  g_object_get (splitmux->sink, "location", &location, NULL);
+
+  msg = gst_message_new_element (GST_OBJECT (splitmux),
+      gst_structure_new (msg_name,
+          "location", G_TYPE_STRING, location,
+          "running-time", GST_TYPE_CLOCK_TIME,
+          splitmux->reference_ctx->out_running_time, NULL));
+  gst_element_post_message (GST_ELEMENT_CAST (splitmux), msg);
+
+  g_free (location);
+}
+
 /* Called with lock held, drops the lock to send EOS to the
  * pad
  */
@@ -501,11 +562,11 @@ complete_or_wait_on_out (GstSplitMuxSink * splitmux, MqStreamCtx * ctx)
   do {
 
     GST_LOG_OBJECT (ctx->srcpad,
-        "Checking running time %" GST_TIME_FORMAT " against max %"
-        GST_TIME_FORMAT, GST_TIME_ARGS (ctx->out_running_time),
-        GST_TIME_ARGS (splitmux->max_out_running_time));
+        "Checking running time %" GST_STIME_FORMAT " against max %"
+        GST_STIME_FORMAT, GST_STIME_ARGS (ctx->out_running_time),
+        GST_STIME_ARGS (splitmux->max_out_running_time));
 
-    if (splitmux->max_out_running_time == GST_CLOCK_TIME_NONE ||
+    if (splitmux->max_out_running_time == GST_CLOCK_STIME_NONE ||
         ctx->out_running_time < splitmux->max_out_running_time) {
       splitmux->have_muxed_something = TRUE;
       return;
@@ -526,17 +587,17 @@ complete_or_wait_on_out (GstSplitMuxSink * splitmux, MqStreamCtx * ctx)
 
     GST_INFO_OBJECT (ctx->srcpad,
         "Sleeping for running time %"
-        GST_TIME_FORMAT " (max %" GST_TIME_FORMAT ")",
-        GST_TIME_ARGS (ctx->out_running_time),
-        GST_TIME_ARGS (splitmux->max_out_running_time));
+        GST_STIME_FORMAT " (max %" GST_STIME_FORMAT ")",
+        GST_STIME_ARGS (ctx->out_running_time),
+        GST_STIME_ARGS (splitmux->max_out_running_time));
     ctx->out_blocked = TRUE;
     /* Expand the mq if needed before sleeping */
     check_queue_length (splitmux, ctx);
     GST_SPLITMUX_WAIT (splitmux);
     ctx->out_blocked = FALSE;
     GST_INFO_OBJECT (ctx->srcpad,
-        "Woken for new max running time %" GST_TIME_FORMAT,
-        GST_TIME_ARGS (splitmux->max_out_running_time));
+        "Woken for new max running time %" GST_STIME_FORMAT,
+        GST_STIME_ARGS (splitmux->max_out_running_time));
   } while (1);
 }
 
@@ -546,7 +607,7 @@ handle_mq_output (GstPad * pad, GstPadProbeInfo * info, MqStreamCtx * ctx)
   GstSplitMuxSink *splitmux = ctx->splitmux;
   MqStreamBuf *buf_info = NULL;
 
-  GST_LOG_OBJECT (pad, "Fired probe type 0x%x\n", info->type);
+  GST_LOG_OBJECT (pad, "Fired probe type 0x%x", info->type);
 
   /* FIXME: Handle buffer lists, until then make it clear they won't work */
   if (info->type & GST_PAD_PROBE_TYPE_BUFFER_LIST) {
@@ -586,6 +647,7 @@ handle_mq_output (GstPad * pad, GstPadProbeInfo * info, MqStreamCtx * ctx)
         break;
       case GST_EVENT_GAP:{
         GstClockTime gap_ts;
+        GstClockTimeDiff rtime;
 
         gst_event_parse_gap (event, &gap_ts, NULL);
         if (gap_ts == GST_CLOCK_TIME_NONE)
@@ -593,19 +655,40 @@ handle_mq_output (GstPad * pad, GstPadProbeInfo * info, MqStreamCtx * ctx)
 
         GST_SPLITMUX_LOCK (splitmux);
 
-        gap_ts = gst_segment_to_running_time (&ctx->out_segment,
-            GST_FORMAT_TIME, gap_ts);
+        rtime = my_segment_to_running_time (&ctx->out_segment, gap_ts);
 
-        GST_LOG_OBJECT (pad, "Have GAP w/ ts %" GST_TIME_FORMAT,
-            GST_TIME_ARGS (gap_ts));
+        GST_LOG_OBJECT (pad, "Have GAP w/ ts %" GST_STIME_FORMAT,
+            GST_STIME_ARGS (rtime));
 
         if (splitmux->state == SPLITMUX_STATE_STOPPED)
           goto beach;
-        ctx->out_running_time = gap_ts;
-        complete_or_wait_on_out (splitmux, ctx);
+
+        if (rtime != GST_CLOCK_STIME_NONE) {
+          ctx->out_running_time = rtime;
+          complete_or_wait_on_out (splitmux, ctx);
+        }
         GST_SPLITMUX_UNLOCK (splitmux);
         break;
       }
+      case GST_EVENT_CUSTOM_DOWNSTREAM:{
+        const GstStructure *s;
+        GstClockTimeDiff ts = 0;
+
+        s = gst_event_get_structure (event);
+        if (!gst_structure_has_name (s, "splitmuxsink-unblock"))
+          break;
+
+        gst_structure_get_int64 (s, "timestamp", &ts);
+
+        GST_SPLITMUX_LOCK (splitmux);
+
+        if (splitmux->state == SPLITMUX_STATE_STOPPED)
+          goto beach;
+        ctx->out_running_time = ts;
+        complete_or_wait_on_out (splitmux, ctx);
+        GST_SPLITMUX_UNLOCK (splitmux);
+        return GST_PAD_PROBE_DROP;
+      }
       default:
         break;
     }
@@ -627,13 +710,18 @@ handle_mq_output (GstPad * pad, GstPadProbeInfo * info, MqStreamCtx * ctx)
   ctx->out_running_time = buf_info->run_ts;
 
   GST_LOG_OBJECT (splitmux,
-      "Pad %" GST_PTR_FORMAT " buffer with TS %" GST_TIME_FORMAT
+      "Pad %" GST_PTR_FORMAT " buffer with run TS %" GST_STIME_FORMAT
       " size %" G_GSIZE_FORMAT,
-      pad, GST_TIME_ARGS (ctx->out_running_time), buf_info->buf_size);
+      pad, GST_STIME_ARGS (ctx->out_running_time), buf_info->buf_size);
+
+  if (splitmux->opening_first_fragment) {
+    send_fragment_opened_closed_msg (splitmux, TRUE);
+    splitmux->opening_first_fragment = FALSE;
+  }
 
   complete_or_wait_on_out (splitmux, ctx);
 
-  if (splitmux->muxed_out_time == GST_CLOCK_TIME_NONE ||
+  if (splitmux->muxed_out_time == GST_CLOCK_STIME_NONE ||
       splitmux->muxed_out_time < buf_info->run_ts)
     splitmux->muxed_out_time = buf_info->run_ts;
 
@@ -643,8 +731,8 @@ handle_mq_output (GstPad * pad, GstPadProbeInfo * info, MqStreamCtx * ctx)
   {
     GstBuffer *buf = gst_pad_probe_info_get_buffer (info);
     GST_LOG_OBJECT (pad, "Returning to pass buffer %" GST_PTR_FORMAT
-        " run ts %" GST_TIME_FORMAT, buf,
-        GST_TIME_ARGS (ctx->out_running_time));
+        " run ts %" GST_STIME_FORMAT, buf,
+        GST_STIME_ARGS (ctx->out_running_time));
   }
 #endif
 
@@ -673,8 +761,8 @@ restart_context (MqStreamCtx * ctx, GstSplitMuxSink * splitmux)
   gst_pad_sticky_events_foreach (ctx->srcpad,
       (GstPadStickyEventsForeachFunction) (resend_sticky), peer);
 
-  /* Clear EOS flag */
-  ctx->out_eos = FALSE;
+  /* Clear EOS flag if not actually EOS */
+  ctx->out_eos = GST_PAD_IS_EOS (ctx->srcpad);
 
   gst_object_unref (peer);
 }
@@ -687,35 +775,45 @@ static void
 start_next_fragment (GstSplitMuxSink * splitmux)
 {
   /* 1 change to new file */
+  splitmux->switching_fragment = TRUE;
+
+  gst_element_set_locked_state (splitmux->muxer, TRUE);
+  gst_element_set_locked_state (splitmux->active_sink, TRUE);
   gst_element_set_state (splitmux->muxer, GST_STATE_NULL);
   gst_element_set_state (splitmux->active_sink, GST_STATE_NULL);
 
   set_next_filename (splitmux);
 
-  gst_element_sync_state_with_parent (splitmux->active_sink);
-  gst_element_sync_state_with_parent (splitmux->muxer);
+  gst_element_set_state (splitmux->active_sink, GST_STATE_TARGET (splitmux));
+  gst_element_set_state (splitmux->muxer, GST_STATE_TARGET (splitmux));
+  gst_element_set_locked_state (splitmux->muxer, FALSE);
+  gst_element_set_locked_state (splitmux->active_sink, FALSE);
+
+  splitmux->switching_fragment = FALSE;
 
   g_list_foreach (splitmux->contexts, (GFunc) restart_context, splitmux);
 
   /* Switch state and go back to processing */
-  splitmux->state = SPLITMUX_STATE_COLLECTING_GOP_START;
-
-  if (!splitmux->video_ctx->in_eos) {
-    splitmux->max_out_running_time = splitmux->video_ctx->in_running_time;
+  if (!splitmux->reference_ctx->in_eos) {
+    splitmux->state = SPLITMUX_STATE_COLLECTING_GOP_START;
+    splitmux->max_out_running_time = splitmux->reference_ctx->in_running_time;
   } else {
-    splitmux->max_out_running_time = GST_CLOCK_TIME_NONE;
+    splitmux->state = SPLITMUX_STATE_WAITING_GOP_COMPLETE;
+    splitmux->max_out_running_time = GST_CLOCK_STIME_NONE;
     splitmux->have_muxed_something = FALSE;
   }
   splitmux->have_muxed_something =
-      (splitmux->video_ctx->in_running_time > splitmux->muxed_out_time);
+      (splitmux->reference_ctx->in_running_time > splitmux->muxed_out_time);
 
   /* Store the overflow parameters as the basis for the next fragment */
   splitmux->mux_start_time = splitmux->muxed_out_time;
   splitmux->mux_start_bytes = splitmux->muxed_out_bytes;
 
   GST_DEBUG_OBJECT (splitmux,
-      "Restarting flow for new fragment. New running time %" GST_TIME_FORMAT,
-      GST_TIME_ARGS (splitmux->max_out_running_time));
+      "Restarting flow for new fragment. New running time %" GST_STIME_FORMAT,
+      GST_STIME_ARGS (splitmux->max_out_running_time));
+
+  send_fragment_opened_closed_msg (splitmux, TRUE);
 
   GST_SPLITMUX_BROADCAST (splitmux);
 }
@@ -729,8 +827,11 @@ bus_handler (GstBin * bin, GstMessage * message)
     case GST_MESSAGE_EOS:
       /* If the state is draining out the current file, drop this EOS */
       GST_SPLITMUX_LOCK (splitmux);
+
+      send_fragment_opened_closed_msg (splitmux, FALSE);
+
       if (splitmux->state == SPLITMUX_STATE_ENDING_FILE &&
-          splitmux->max_out_running_time != GST_CLOCK_TIME_NONE) {
+          splitmux->max_out_running_time != GST_CLOCK_STIME_NONE) {
         GST_DEBUG_OBJECT (splitmux, "Caught EOS at end of fragment, dropping");
         splitmux->state = SPLITMUX_STATE_START_NEXT_FRAGMENT;
         GST_SPLITMUX_BROADCAST (splitmux);
@@ -741,6 +842,20 @@ bus_handler (GstBin * bin, GstMessage * message)
       }
       GST_SPLITMUX_UNLOCK (splitmux);
       break;
+    case GST_MESSAGE_ASYNC_START:
+    case GST_MESSAGE_ASYNC_DONE:
+      /* Ignore state changes from our children while switching */
+      if (splitmux->switching_fragment) {
+        if (GST_MESSAGE_SRC (message) == (GstObject *) splitmux->active_sink ||
+            GST_MESSAGE_SRC (message) == (GstObject *) splitmux->muxer) {
+          GST_LOG_OBJECT (splitmux,
+              "Ignoring state change from child %" GST_PTR_FORMAT
+              " while switching", GST_MESSAGE_SRC (message));
+          gst_message_unref (message);
+          return;
+        }
+      }
+      break;
     default:
       break;
   }
@@ -760,7 +875,7 @@ handle_gathered_gop (GstSplitMuxSink * splitmux)
 {
   GList *cur;
   gsize queued_bytes = 0;
-  GstClockTime queued_time = 0;
+  GstClockTimeDiff queued_time = 0;
 
   /* Assess if the multiqueue contents overflowed the current file */
   for (cur = g_list_first (splitmux->contexts);
@@ -780,43 +895,45 @@ handle_gathered_gop (GstSplitMuxSink * splitmux)
   /* Expand queued bytes estimate by muxer overhead */
   queued_bytes += (queued_bytes * splitmux->mux_overhead);
 
-  GST_LOG_OBJECT (splitmux, "mq at TS %" GST_TIME_FORMAT
-      " bytes %" G_GSIZE_FORMAT, GST_TIME_ARGS (queued_time), queued_bytes);
+  GST_LOG_OBJECT (splitmux, "mq at TS %" GST_STIME_FORMAT
+      " bytes %" G_GSIZE_FORMAT, GST_STIME_ARGS (queued_time), queued_bytes);
 
   /* Check for overrun - have we output at least one byte and overrun
    * either threshold? */
-  if (splitmux->have_muxed_something &&
-      ((splitmux->threshold_bytes > 0 &&
-              queued_bytes >= splitmux->threshold_bytes) ||
-          (splitmux->threshold_time > 0 &&
-              queued_time >= splitmux->threshold_time))) {
+  if ((splitmux->have_muxed_something &&
+          ((splitmux->threshold_bytes > 0 &&
+                  queued_bytes > splitmux->threshold_bytes) ||
+              (splitmux->threshold_time > 0 &&
+                  queued_time > splitmux->threshold_time)))) {
 
     splitmux->state = SPLITMUX_STATE_ENDING_FILE;
 
     GST_INFO_OBJECT (splitmux,
         "mq overflowed since last, draining out. max out TS is %"
-        GST_TIME_FORMAT, GST_TIME_ARGS (splitmux->max_out_running_time));
+        GST_STIME_FORMAT, GST_STIME_ARGS (splitmux->max_out_running_time));
     GST_SPLITMUX_BROADCAST (splitmux);
 
   } else {
     /* No overflow */
     GST_LOG_OBJECT (splitmux,
         "This GOP didn't overflow the fragment. Bytes sent %" G_GSIZE_FORMAT
-        " queued %" G_GSIZE_FORMAT " time %" GST_TIME_FORMAT " Continuing.",
+        " queued %" G_GSIZE_FORMAT " time %" GST_STIME_FORMAT " Continuing.",
         splitmux->muxed_out_bytes - splitmux->mux_start_bytes,
-        queued_bytes, GST_TIME_ARGS (queued_time));
+        queued_bytes, GST_STIME_ARGS (queued_time));
 
     /* Wake everyone up to push this one GOP, then sleep */
-    splitmux->state = SPLITMUX_STATE_COLLECTING_GOP_START;
     splitmux->have_muxed_something = TRUE;
 
-    if (!splitmux->video_ctx->in_eos)
-      splitmux->max_out_running_time = splitmux->video_ctx->in_running_time;
-    else
-      splitmux->max_out_running_time = GST_CLOCK_TIME_NONE;
+    if (!splitmux->reference_ctx->in_eos) {
+      splitmux->state = SPLITMUX_STATE_COLLECTING_GOP_START;
+      splitmux->max_out_running_time = splitmux->reference_ctx->in_running_time;
+    } else {
+      splitmux->state = SPLITMUX_STATE_WAITING_GOP_COMPLETE;
+      splitmux->max_out_running_time = GST_CLOCK_STIME_NONE;
+    }
 
     GST_LOG_OBJECT (splitmux, "Waking output for complete GOP, TS %"
-        GST_TIME_FORMAT, GST_TIME_ARGS (splitmux->max_out_running_time));
+        GST_STIME_FORMAT, GST_STIME_ARGS (splitmux->max_out_running_time));
     GST_SPLITMUX_BROADCAST (splitmux);
   }
 
@@ -824,7 +941,7 @@ handle_gathered_gop (GstSplitMuxSink * splitmux)
 
 /* Called with splitmux lock held */
 /* Called from each input pad when it is has all the pieces
- * for a GOP or EOS, starting with the video pad which has set the
+ * for a GOP or EOS, starting with the reference pad which has set the
  * splitmux->max_in_running_time
  */
 static void
@@ -832,22 +949,25 @@ check_completed_gop (GstSplitMuxSink * splitmux, MqStreamCtx * ctx)
 {
   GList *cur;
   gboolean ready = TRUE;
-  GstClockTime current_max_in_running_time;
+  GstClockTimeDiff current_max_in_running_time;
 
   if (splitmux->state == SPLITMUX_STATE_WAITING_GOP_COMPLETE) {
     /* Iterate each pad, and check that the input running time is at least
-     * up to the video runnning time, and if so handle the collected GOP */
-    GST_LOG_OBJECT (splitmux, "Checking GOP collected, ctx %p", ctx);
-    for (cur = g_list_first (splitmux->contexts);
-        cur != NULL; cur = g_list_next (cur)) {
+     * up to the reference running time, and if so handle the collected GOP */
+    GST_LOG_OBJECT (splitmux, "Checking GOP collected, Max in running time %"
+        GST_STIME_FORMAT " ctx %p",
+        GST_STIME_ARGS (splitmux->max_in_running_time), ctx);
+    for (cur = g_list_first (splitmux->contexts); cur != NULL;
+        cur = g_list_next (cur)) {
       MqStreamCtx *tmpctx = (MqStreamCtx *) (cur->data);
 
       GST_LOG_OBJECT (splitmux,
-          "Context %p (src pad %" GST_PTR_FORMAT ") TS %" GST_TIME_FORMAT
+          "Context %p (src pad %" GST_PTR_FORMAT ") TS %" GST_STIME_FORMAT
           " EOS %d", tmpctx, tmpctx->srcpad,
-          GST_TIME_ARGS (tmpctx->in_running_time), tmpctx->in_eos);
+          GST_STIME_ARGS (tmpctx->in_running_time), tmpctx->in_eos);
 
-      if (tmpctx->in_running_time < splitmux->max_in_running_time &&
+      if (splitmux->max_in_running_time != G_MAXINT64 &&
+          tmpctx->in_running_time < splitmux->max_in_running_time &&
           !tmpctx->in_eos) {
         GST_LOG_OBJECT (splitmux,
             "Context %p (src pad %" GST_PTR_FORMAT ") not ready. We'll sleep",
@@ -864,6 +984,11 @@ check_completed_gop (GstSplitMuxSink * splitmux, MqStreamCtx * ctx)
     }
   }
 
+  /* If upstream reached EOS we are not expecting more data, no need to wait
+   * here. */
+  if (ctx->in_eos)
+    return;
+
   /* Some pad is not yet ready, or GOP is being pushed
    * either way, sleep and wait to get woken */
   current_max_in_running_time = splitmux->max_in_running_time;
@@ -903,7 +1028,7 @@ check_queue_length (GstSplitMuxSink * splitmux, MqStreamCtx * ctx)
         splitmux->queued_gops <= 1) {
       allow_grow = TRUE;
     } else if (splitmux->state == SPLITMUX_STATE_COLLECTING_GOP_START &&
-        ctx->is_video && splitmux->queued_gops <= 1) {
+        ctx->is_reference && splitmux->queued_gops <= 1) {
       allow_grow = TRUE;
     }
 
@@ -944,7 +1069,7 @@ handle_mq_input (GstPad * pad, GstPadProbeInfo * info, MqStreamCtx * ctx)
   gboolean loop_again;
   gboolean keyframe = FALSE;
 
-  GST_LOG_OBJECT (pad, "Fired probe type 0x%x\n", info->type);
+  GST_LOG_OBJECT (pad, "Fired probe type 0x%x", info->type);
 
   /* FIXME: Handle buffer lists, until then make it clear they won't work */
   if (info->type & GST_PAD_PROBE_TYPE_BUFFER_LIST) {
@@ -962,7 +1087,7 @@ handle_mq_input (GstPad * pad, GstPadProbeInfo * info, MqStreamCtx * ctx)
         gst_segment_init (&ctx->in_segment, GST_FORMAT_UNDEFINED);
         ctx->in_eos = FALSE;
         ctx->in_bytes = 0;
-        ctx->in_running_time = 0;
+        ctx->in_running_time = GST_CLOCK_STIME_NONE;
         GST_SPLITMUX_UNLOCK (splitmux);
         break;
       case GST_EVENT_EOS:
@@ -972,10 +1097,10 @@ handle_mq_input (GstPad * pad, GstPadProbeInfo * info, MqStreamCtx * ctx)
         if (splitmux->state == SPLITMUX_STATE_STOPPED)
           goto beach;
 
-        if (ctx->is_video) {
-          GST_INFO_OBJECT (splitmux, "Got Video EOS. Finishing up");
+        if (ctx->is_reference) {
+          GST_INFO_OBJECT (splitmux, "Got Reference EOS. Finishing up");
           /* Act as if this is a new keyframe with infinite timestamp */
-          splitmux->max_in_running_time = GST_CLOCK_TIME_NONE;
+          splitmux->max_in_running_time = G_MAXINT64;
           splitmux->state = SPLITMUX_STATE_WAITING_GOP_COMPLETE;
           /* Wake up other input pads to collect this GOP */
           GST_SPLITMUX_BROADCAST (splitmux);
@@ -996,8 +1121,6 @@ handle_mq_input (GstPad * pad, GstPadProbeInfo * info, MqStreamCtx * ctx)
   }
 
   buf = gst_pad_probe_info_get_buffer (info);
-  ctx->in_running_time = gst_segment_to_running_time (&ctx->in_segment,
-      GST_FORMAT_TIME, GST_BUFFER_TIMESTAMP (buf));
   buf_info = mq_stream_buf_new ();
 
   if (GST_BUFFER_PTS_IS_VALID (buf))
@@ -1005,6 +1128,8 @@ handle_mq_input (GstPad * pad, GstPadProbeInfo * info, MqStreamCtx * ctx)
   else
     ts = GST_BUFFER_DTS (buf);
 
+  GST_LOG_OBJECT (pad, "Buffer TS is %" GST_TIME_FORMAT, GST_TIME_ARGS (ts));
+
   GST_SPLITMUX_LOCK (splitmux);
 
   if (splitmux->state == SPLITMUX_STATE_STOPPED)
@@ -1013,32 +1138,46 @@ handle_mq_input (GstPad * pad, GstPadProbeInfo * info, MqStreamCtx * ctx)
   /* If this buffer has a timestamp, advance the input timestamp of the
    * stream */
   if (GST_CLOCK_TIME_IS_VALID (ts)) {
-    GstClockTime running_time =
-        gst_segment_to_running_time (&ctx->in_segment, GST_FORMAT_TIME,
-        GST_BUFFER_TIMESTAMP (buf));
+    GstClockTimeDiff running_time =
+        my_segment_to_running_time (&ctx->in_segment, ts);
 
-    if (GST_CLOCK_TIME_IS_VALID (running_time) &&
-        (ctx->in_running_time == GST_CLOCK_TIME_NONE
-            || running_time > ctx->in_running_time))
+    GST_LOG_OBJECT (pad, "Buffer running TS is %" GST_STIME_FORMAT,
+        GST_STIME_ARGS (running_time));
+
+    if (GST_CLOCK_STIME_IS_VALID (running_time)
+        && running_time > ctx->in_running_time)
       ctx->in_running_time = running_time;
   }
 
   /* Try to make sure we have a valid running time */
-  if (!GST_CLOCK_TIME_IS_VALID (ctx->in_running_time)) {
+  if (!GST_CLOCK_STIME_IS_VALID (ctx->in_running_time)) {
     ctx->in_running_time =
-        gst_segment_to_running_time (&ctx->in_segment, GST_FORMAT_TIME,
-        ctx->in_segment.start);
+        my_segment_to_running_time (&ctx->in_segment, ctx->in_segment.start);
   }
 
+  GST_LOG_OBJECT (pad, "in running time now %" GST_STIME_FORMAT,
+      GST_STIME_ARGS (ctx->in_running_time));
+
   buf_info->run_ts = ctx->in_running_time;
   buf_info->buf_size = gst_buffer_get_size (buf);
 
   /* Update total input byte counter for overflow detect */
   ctx->in_bytes += buf_info->buf_size;
 
-  GST_DEBUG_OBJECT (pad, "Buf TS %" GST_TIME_FORMAT
+  /* initialize mux_start_time */
+  if (ctx->is_reference && splitmux->mux_start_time == GST_CLOCK_STIME_NONE) {
+    splitmux->mux_start_time = buf_info->run_ts;
+    GST_LOG_OBJECT (splitmux, "Mux start time now %" GST_STIME_FORMAT,
+        GST_STIME_ARGS (splitmux->mux_start_time));
+    /* Also take this as the first start time when starting up,
+     * so that we start counting overflow from the first frame */
+    if (!GST_CLOCK_STIME_IS_VALID (splitmux->max_in_running_time))
+      splitmux->max_in_running_time = splitmux->mux_start_time;
+  }
+
+  GST_DEBUG_OBJECT (pad, "Buf TS %" GST_STIME_FORMAT
       " total in_bytes %" G_GSIZE_FORMAT,
-      GST_TIME_ARGS (buf_info->run_ts), ctx->in_bytes);
+      GST_STIME_ARGS (buf_info->run_ts), ctx->in_bytes);
 
   loop_again = TRUE;
   do {
@@ -1047,18 +1186,18 @@ handle_mq_input (GstPad * pad, GstPadProbeInfo * info, MqStreamCtx * ctx)
 
     switch (splitmux->state) {
       case SPLITMUX_STATE_COLLECTING_GOP_START:
-        if (ctx->is_video) {
+        if (ctx->is_reference) {
           /* If a keyframe, we have a complete GOP */
           if (GST_BUFFER_FLAG_IS_SET (buf, GST_BUFFER_FLAG_DELTA_UNIT) ||
-              !GST_CLOCK_TIME_IS_VALID (ctx->in_running_time) ||
+              !GST_CLOCK_STIME_IS_VALID (ctx->in_running_time) ||
               splitmux->max_in_running_time >= ctx->in_running_time) {
             /* Pass this buffer through */
             loop_again = FALSE;
             break;
           }
           GST_INFO_OBJECT (pad,
-              "Have keyframe with running time %" GST_TIME_FORMAT,
-              GST_TIME_ARGS (ctx->in_running_time));
+              "Have keyframe with running time %" GST_STIME_FORMAT,
+              GST_STIME_ARGS (ctx->in_running_time));
           keyframe = TRUE;
           splitmux->state = SPLITMUX_STATE_WAITING_GOP_COMPLETE;
           splitmux->max_in_running_time = ctx->in_running_time;
@@ -1066,7 +1205,7 @@ handle_mq_input (GstPad * pad, GstPadProbeInfo * info, MqStreamCtx * ctx)
           GST_SPLITMUX_BROADCAST (splitmux);
           check_completed_gop (splitmux, ctx);
         } else {
-          /* We're still waiting for a keyframe on the video pad, sleep */
+          /* We're still waiting for a keyframe on the reference pad, sleep */
           GST_LOG_OBJECT (pad, "Sleeping for GOP start");
           GST_SPLITMUX_WAIT (splitmux);
           GST_LOG_OBJECT (pad, "Done sleeping for GOP start state now %d",
@@ -1074,14 +1213,14 @@ handle_mq_input (GstPad * pad, GstPadProbeInfo * info, MqStreamCtx * ctx)
         }
         break;
       case SPLITMUX_STATE_WAITING_GOP_COMPLETE:
-        /* After a GOP start is found, this buffer might complete the GOP */
+
         /* If we overran the target timestamp, it might be time to process
          * the GOP, otherwise bail out for more data
          */
         GST_LOG_OBJECT (pad,
-            "Checking TS %" GST_TIME_FORMAT " against max %" GST_TIME_FORMAT,
-            GST_TIME_ARGS (ctx->in_running_time),
-            GST_TIME_ARGS (splitmux->max_in_running_time));
+            "Checking TS %" GST_STIME_FORMAT " against max %" GST_STIME_FORMAT,
+            GST_STIME_ARGS (ctx->in_running_time),
+            GST_STIME_ARGS (splitmux->max_in_running_time));
 
         if (ctx->in_running_time < splitmux->max_in_running_time) {
           loop_again = FALSE;
@@ -1092,7 +1231,29 @@ handle_mq_input (GstPad * pad, GstPadProbeInfo * info, MqStreamCtx * ctx)
             "Collected last packet of GOP. Checking other pads");
         check_completed_gop (splitmux, ctx);
         break;
-      case SPLITMUX_STATE_ENDING_FILE:
+      case SPLITMUX_STATE_ENDING_FILE:{
+        GstEvent *event;
+
+        /* If somes streams received no buffer during the last GOP that overran,
+         * because its next buffer has a timestamp bigger than
+         * ctx->max_in_running_time, its queue is empty. In that case the only
+         * way to wakeup the output thread is by injecting an event in the
+         * queue. This usually happen with subtitle streams.
+         * See https://bugzilla.gnome.org/show_bug.cgi?id=763711. */
+        GST_LOG_OBJECT (pad, "Sending splitmuxsink-unblock event");
+        event = gst_event_new_custom (GST_EVENT_CUSTOM_DOWNSTREAM |
+            GST_EVENT_TYPE_SERIALIZED,
+            gst_structure_new ("splitmuxsink-unblock", "timestamp",
+                G_TYPE_INT64, splitmux->max_in_running_time, NULL));
+
+        GST_SPLITMUX_UNLOCK (splitmux);
+        gst_pad_send_event (ctx->sinkpad, event);
+        GST_SPLITMUX_LOCK (splitmux);
+        /* state may have changed while we were unlocked. Loop again if so */
+        if (splitmux->state != SPLITMUX_STATE_ENDING_FILE)
+          break;
+        /* fallthrough */
+      }
       case SPLITMUX_STATE_START_NEXT_FRAGMENT:
         /* A fragment is ending, wait until that's done before continuing */
         GST_DEBUG_OBJECT (pad, "Sleeping for fragment restart");
@@ -1118,7 +1279,7 @@ handle_mq_input (GstPad * pad, GstPadProbeInfo * info, MqStreamCtx * ctx)
   check_queue_length (splitmux, ctx);
 
   GST_LOG_OBJECT (pad, "Returning to queue buffer %" GST_PTR_FORMAT
-      " run ts %" GST_TIME_FORMAT, buf, GST_TIME_ARGS (ctx->in_running_time));
+      " run ts %" GST_STIME_FORMAT, buf, GST_STIME_ARGS (ctx->in_running_time));
 
   GST_SPLITMUX_UNLOCK (splitmux);
   return GST_PAD_PROBE_PASS;
@@ -1197,7 +1358,6 @@ gst_splitmux_sink_request_new_pad (GstElement * element,
   gst_object_unref (GST_OBJECT (res));
 
   ctx = mq_stream_ctx_new (splitmux);
-  ctx->is_video = is_video;
   ctx->srcpad = mq_src;
   ctx->sinkpad = mq_sink;
 
@@ -1206,8 +1366,14 @@ gst_splitmux_sink_request_new_pad (GstElement * element,
       gst_pad_add_probe (mq_src, GST_PAD_PROBE_TYPE_DATA_DOWNSTREAM,
       (GstPadProbeCallback) handle_mq_output, ctx, (GDestroyNotify)
       _pad_block_destroy_src_notify);
-  if (is_video)
-    splitmux->video_ctx = ctx;
+  if (is_video && splitmux->reference_ctx != NULL) {
+    splitmux->reference_ctx->is_reference = FALSE;
+    splitmux->reference_ctx = NULL;
+  }
+  if (splitmux->reference_ctx == NULL) {
+    splitmux->reference_ctx = ctx;
+    ctx->is_reference = TRUE;
+  }
 
   res = gst_ghost_pad_new (gname, mq_sink);
   g_object_set_qdata ((GObject *) (res), PAD_CONTEXT, ctx);
@@ -1281,6 +1447,10 @@ gst_splitmux_sink_release_pad (GstElement * element, GstPad * pad)
 
   gst_element_remove_pad (element, pad);
 
+  /* Reset the internal elements only after all request pads are released */
+  if (splitmux->contexts == NULL)
+    gst_splitmux_reset (splitmux);
+
 fail:
   GST_SPLITMUX_UNLOCK (splitmux);
 }
@@ -1395,42 +1565,43 @@ create_sink (GstSplitMuxSink * splitmux)
 {
   GstElement *provided_sink = NULL;
 
-  g_return_val_if_fail (splitmux->active_sink == NULL, TRUE);
+  if (splitmux->active_sink == NULL) {
 
-  GST_OBJECT_LOCK (splitmux);
-  if (splitmux->provided_sink != NULL)
-    provided_sink = gst_object_ref (splitmux->provided_sink);
-  GST_OBJECT_UNLOCK (splitmux);
+    GST_OBJECT_LOCK (splitmux);
+    if (splitmux->provided_sink != NULL)
+      provided_sink = gst_object_ref (splitmux->provided_sink);
+    GST_OBJECT_UNLOCK (splitmux);
 
-  if (provided_sink == NULL) {
-    if ((splitmux->sink =
-            create_element (splitmux, DEFAULT_SINK, "sink")) == NULL)
-      goto fail;
-    splitmux->active_sink = splitmux->sink;
-  } else {
-    if (!gst_bin_add (GST_BIN (splitmux), provided_sink)) {
-      g_warning ("Could not add sink elements - splitmuxsink will not work");
-      gst_object_unref (provided_sink);
-      goto fail;
-    }
+    if (provided_sink == NULL) {
+      if ((splitmux->sink =
+              create_element (splitmux, DEFAULT_SINK, "sink")) == NULL)
+        goto fail;
+      splitmux->active_sink = splitmux->sink;
+    } else {
+      if (!gst_bin_add (GST_BIN (splitmux), provided_sink)) {
+        g_warning ("Could not add sink elements - splitmuxsink will not work");
+        gst_object_unref (provided_sink);
+        goto fail;
+      }
 
-    splitmux->active_sink = provided_sink;
+      splitmux->active_sink = provided_sink;
 
-    /* The bin holds a ref now, we can drop our tmp ref */
-    gst_object_unref (provided_sink);
+      /* The bin holds a ref now, we can drop our tmp ref */
+      gst_object_unref (provided_sink);
 
-    /* Find the sink element */
-    splitmux->sink = find_sink (splitmux->active_sink);
-    if (splitmux->sink == NULL) {
-      g_warning
-          ("Could not locate sink element in provided sink - splitmuxsink will not work");
-      goto fail;
+      /* Find the sink element */
+      splitmux->sink = find_sink (splitmux->active_sink);
+      if (splitmux->sink == NULL) {
+        g_warning
+            ("Could not locate sink element in provided sink - splitmuxsink will not work");
+        goto fail;
+      }
     }
-  }
 
-  if (!gst_element_link (splitmux->muxer, splitmux->active_sink)) {
-    g_warning ("Failed to link muxer and sink- splitmuxsink will not work");
-    goto fail;
+    if (!gst_element_link (splitmux->muxer, splitmux->active_sink)) {
+      g_warning ("Failed to link muxer and sink- splitmuxsink will not work");
+      goto fail;
+    }
   }
 
   return TRUE;
@@ -1445,6 +1616,7 @@ static void
 set_next_filename (GstSplitMuxSink * splitmux)
 {
   gchar *fname = NULL;
+  gst_splitmux_sink_ensure_max_files (splitmux);
 
   g_signal_emit (splitmux, signals[SIGNAL_FORMAT_LOCATION], 0,
       splitmux->fragment_id, &fname);
@@ -1485,9 +1657,11 @@ gst_splitmux_sink_change_state (GstElement * element, GstStateChange transition)
       GST_SPLITMUX_LOCK (splitmux);
       /* Start by collecting one input on each pad */
       splitmux->state = SPLITMUX_STATE_COLLECTING_GOP_START;
-      splitmux->max_in_running_time = 0;
-      splitmux->muxed_out_time = splitmux->mux_start_time = 0;
+      splitmux->max_in_running_time = GST_CLOCK_STIME_NONE;
+      splitmux->muxed_out_time = splitmux->mux_start_time =
+          GST_CLOCK_STIME_NONE;
       splitmux->muxed_out_bytes = splitmux->mux_start_bytes = 0;
+      splitmux->opening_first_fragment = TRUE;
       GST_SPLITMUX_UNLOCK (splitmux);
       break;
     }
@@ -1513,7 +1687,9 @@ gst_splitmux_sink_change_state (GstElement * element, GstStateChange transition)
     case GST_STATE_CHANGE_READY_TO_NULL:
       GST_SPLITMUX_LOCK (splitmux);
       splitmux->fragment_id = 0;
-      gst_splitmux_reset (splitmux);
+      /* Reset internal elements only if no pad contexts are using them */
+      if (splitmux->contexts == NULL)
+        gst_splitmux_reset (splitmux);
       GST_SPLITMUX_UNLOCK (splitmux);
       break;
     default:
@@ -1539,3 +1715,11 @@ register_splitmuxsink (GstPlugin * plugin)
   return gst_element_register (plugin, "splitmuxsink", GST_RANK_NONE,
       GST_TYPE_SPLITMUX_SINK);
 }
+
+static void
+gst_splitmux_sink_ensure_max_files (GstSplitMuxSink * splitmux)
+{
+  if (splitmux->max_files && splitmux->fragment_id >= splitmux->max_files) {
+    splitmux->fragment_id = 0;
+  }
+}