splitmuxsink: Handle negative running time
authorJan Schmidt <jan@centricular.com>
Sun, 17 Jul 2016 12:41:02 +0000 (22:41 +1000)
committerJan Schmidt <jan@centricular.com>
Tue, 19 Jul 2016 14:39:38 +0000 (00:39 +1000)
Use signed clock times for running time everywhere
so that we handle negative running times without
going haywire, similar to what queue and multiqueue
do these days.

gst/multifile/gstsplitmuxsink.c
gst/multifile/gstsplitmuxsink.h

index f76b8f1..8d5d70f 100644 (file)
@@ -399,6 +399,23 @@ gst_splitmux_sink_get_property (GObject * object, guint prop_id,
   }
 }
 
+/* Convenience function */
+static inline GstClockTimeDiff
+my_segment_to_running_time (GstSegment * segment, GstClockTime val)
+{
+  GstClockTimeDiff res = GST_CLOCK_STIME_NONE;
+
+  if (GST_CLOCK_TIME_IS_VALID (val)) {
+    gboolean sign =
+        gst_segment_to_running_time_full (segment, GST_FORMAT_TIME, val, &val);
+    if (sign > 0)
+      res = val;
+    else if (sign < 0)
+      res = -val;
+  }
+  return res;
+}
+
 static GstPad *
 mq_sink_to_src (GstElement * mq, GstPad * sink_pad)
 {
@@ -454,7 +471,7 @@ mq_stream_ctx_new (GstSplitMuxSink * splitmux)
   ctx->splitmux = splitmux;
   gst_segment_init (&ctx->in_segment, GST_FORMAT_UNDEFINED);
   gst_segment_init (&ctx->out_segment, GST_FORMAT_UNDEFINED);
-  ctx->in_running_time = ctx->out_running_time = 0;
+  ctx->in_running_time = ctx->out_running_time = GST_CLOCK_STIME_NONE;
   g_queue_init (&ctx->queued_bufs);
   return ctx;
 }
@@ -546,11 +563,11 @@ complete_or_wait_on_out (GstSplitMuxSink * splitmux, MqStreamCtx * ctx)
   do {
 
     GST_LOG_OBJECT (ctx->srcpad,
-        "Checking running time %" GST_TIME_FORMAT " against max %"
-        GST_TIME_FORMAT, GST_TIME_ARGS (ctx->out_running_time),
-        GST_TIME_ARGS (splitmux->max_out_running_time));
+        "Checking running time %" GST_STIME_FORMAT " against max %"
+        GST_STIME_FORMAT, GST_STIME_ARGS (ctx->out_running_time),
+        GST_STIME_ARGS (splitmux->max_out_running_time));
 
-    if (splitmux->max_out_running_time == GST_CLOCK_TIME_NONE ||
+    if (splitmux->max_out_running_time == GST_CLOCK_STIME_NONE ||
         ctx->out_running_time < splitmux->max_out_running_time) {
       splitmux->have_muxed_something = TRUE;
       return;
@@ -571,17 +588,17 @@ complete_or_wait_on_out (GstSplitMuxSink * splitmux, MqStreamCtx * ctx)
 
     GST_INFO_OBJECT (ctx->srcpad,
         "Sleeping for running time %"
-        GST_TIME_FORMAT " (max %" GST_TIME_FORMAT ")",
-        GST_TIME_ARGS (ctx->out_running_time),
-        GST_TIME_ARGS (splitmux->max_out_running_time));
+        GST_STIME_FORMAT " (max %" GST_STIME_FORMAT ")",
+        GST_STIME_ARGS (ctx->out_running_time),
+        GST_STIME_ARGS (splitmux->max_out_running_time));
     ctx->out_blocked = TRUE;
     /* Expand the mq if needed before sleeping */
     check_queue_length (splitmux, ctx);
     GST_SPLITMUX_WAIT (splitmux);
     ctx->out_blocked = FALSE;
     GST_INFO_OBJECT (ctx->srcpad,
-        "Woken for new max running time %" GST_TIME_FORMAT,
-        GST_TIME_ARGS (splitmux->max_out_running_time));
+        "Woken for new max running time %" GST_STIME_FORMAT,
+        GST_STIME_ARGS (splitmux->max_out_running_time));
   } while (1);
 }
 
@@ -631,6 +648,7 @@ handle_mq_output (GstPad * pad, GstPadProbeInfo * info, MqStreamCtx * ctx)
         break;
       case GST_EVENT_GAP:{
         GstClockTime gap_ts;
+        GstClockTimeDiff rtime;
 
         gst_event_parse_gap (event, &gap_ts, NULL);
         if (gap_ts == GST_CLOCK_TIME_NONE)
@@ -638,28 +656,30 @@ handle_mq_output (GstPad * pad, GstPadProbeInfo * info, MqStreamCtx * ctx)
 
         GST_SPLITMUX_LOCK (splitmux);
 
-        gap_ts = gst_segment_to_running_time (&ctx->out_segment,
-            GST_FORMAT_TIME, gap_ts);
+        rtime = my_segment_to_running_time (&ctx->out_segment, gap_ts);
 
-        GST_LOG_OBJECT (pad, "Have GAP w/ ts %" GST_TIME_FORMAT,
-            GST_TIME_ARGS (gap_ts));
+        GST_LOG_OBJECT (pad, "Have GAP w/ ts %" GST_STIME_FORMAT,
+            GST_STIME_ARGS (rtime));
 
         if (splitmux->state == SPLITMUX_STATE_STOPPED)
           goto beach;
-        ctx->out_running_time = gap_ts;
-        complete_or_wait_on_out (splitmux, ctx);
+
+        if (rtime != GST_CLOCK_STIME_NONE) {
+          ctx->out_running_time = rtime;
+          complete_or_wait_on_out (splitmux, ctx);
+        }
         GST_SPLITMUX_UNLOCK (splitmux);
         break;
       }
       case GST_EVENT_CUSTOM_DOWNSTREAM:{
         const GstStructure *s;
-        GstClockTime ts = 0;
+        GstClockTimeDiff ts = 0;
 
         s = gst_event_get_structure (event);
         if (!gst_structure_has_name (s, "splitmuxsink-unblock"))
           break;
 
-        gst_structure_get_uint64 (s, "timestamp", &ts);
+        gst_structure_get_int64 (s, "timestamp", &ts);
 
         GST_SPLITMUX_LOCK (splitmux);
 
@@ -691,9 +711,9 @@ handle_mq_output (GstPad * pad, GstPadProbeInfo * info, MqStreamCtx * ctx)
   ctx->out_running_time = buf_info->run_ts;
 
   GST_LOG_OBJECT (splitmux,
-      "Pad %" GST_PTR_FORMAT " buffer with TS %" GST_TIME_FORMAT
+      "Pad %" GST_PTR_FORMAT " buffer with run TS %" GST_STIME_FORMAT
       " size %" G_GSIZE_FORMAT,
-      pad, GST_TIME_ARGS (ctx->out_running_time), buf_info->buf_size);
+      pad, GST_STIME_ARGS (ctx->out_running_time), buf_info->buf_size);
 
   if (splitmux->opening_first_fragment) {
     send_fragment_opened_closed_msg (splitmux, TRUE);
@@ -702,7 +722,7 @@ handle_mq_output (GstPad * pad, GstPadProbeInfo * info, MqStreamCtx * ctx)
 
   complete_or_wait_on_out (splitmux, ctx);
 
-  if (splitmux->muxed_out_time == GST_CLOCK_TIME_NONE ||
+  if (splitmux->muxed_out_time == GST_CLOCK_STIME_NONE ||
       splitmux->muxed_out_time < buf_info->run_ts)
     splitmux->muxed_out_time = buf_info->run_ts;
 
@@ -712,8 +732,8 @@ handle_mq_output (GstPad * pad, GstPadProbeInfo * info, MqStreamCtx * ctx)
   {
     GstBuffer *buf = gst_pad_probe_info_get_buffer (info);
     GST_LOG_OBJECT (pad, "Returning to pass buffer %" GST_PTR_FORMAT
-        " run ts %" GST_TIME_FORMAT, buf,
-        GST_TIME_ARGS (ctx->out_running_time));
+        " run ts %" GST_STIME_FORMAT, buf,
+        GST_STIME_ARGS (ctx->out_running_time));
   }
 #endif
 
@@ -776,7 +796,7 @@ start_next_fragment (GstSplitMuxSink * splitmux)
     splitmux->max_out_running_time = splitmux->reference_ctx->in_running_time;
   } else {
     splitmux->state = SPLITMUX_STATE_WAITING_GOP_COMPLETE;
-    splitmux->max_out_running_time = GST_CLOCK_TIME_NONE;
+    splitmux->max_out_running_time = GST_CLOCK_STIME_NONE;
     splitmux->have_muxed_something = FALSE;
   }
   splitmux->have_muxed_something =
@@ -787,8 +807,8 @@ start_next_fragment (GstSplitMuxSink * splitmux)
   splitmux->mux_start_bytes = splitmux->muxed_out_bytes;
 
   GST_DEBUG_OBJECT (splitmux,
-      "Restarting flow for new fragment. New running time %" GST_TIME_FORMAT,
-      GST_TIME_ARGS (splitmux->max_out_running_time));
+      "Restarting flow for new fragment. New running time %" GST_STIME_FORMAT,
+      GST_STIME_ARGS (splitmux->max_out_running_time));
 
   send_fragment_opened_closed_msg (splitmux, TRUE);
 
@@ -808,7 +828,7 @@ bus_handler (GstBin * bin, GstMessage * message)
       send_fragment_opened_closed_msg (splitmux, FALSE);
 
       if (splitmux->state == SPLITMUX_STATE_ENDING_FILE &&
-          splitmux->max_out_running_time != GST_CLOCK_TIME_NONE) {
+          splitmux->max_out_running_time != GST_CLOCK_STIME_NONE) {
         GST_DEBUG_OBJECT (splitmux, "Caught EOS at end of fragment, dropping");
         splitmux->state = SPLITMUX_STATE_START_NEXT_FRAGMENT;
         GST_SPLITMUX_BROADCAST (splitmux);
@@ -838,7 +858,7 @@ handle_gathered_gop (GstSplitMuxSink * splitmux)
 {
   GList *cur;
   gsize queued_bytes = 0;
-  GstClockTime queued_time = 0;
+  GstClockTimeDiff queued_time = 0;
 
   /* Assess if the multiqueue contents overflowed the current file */
   for (cur = g_list_first (splitmux->contexts);
@@ -858,8 +878,8 @@ handle_gathered_gop (GstSplitMuxSink * splitmux)
   /* Expand queued bytes estimate by muxer overhead */
   queued_bytes += (queued_bytes * splitmux->mux_overhead);
 
-  GST_LOG_OBJECT (splitmux, "mq at TS %" GST_TIME_FORMAT
-      " bytes %" G_GSIZE_FORMAT, GST_TIME_ARGS (queued_time), queued_bytes);
+  GST_LOG_OBJECT (splitmux, "mq at TS %" GST_STIME_FORMAT
+      " bytes %" G_GSIZE_FORMAT, GST_STIME_ARGS (queued_time), queued_bytes);
 
   /* Check for overrun - have we output at least one byte and overrun
    * either threshold? */
@@ -873,16 +893,16 @@ handle_gathered_gop (GstSplitMuxSink * splitmux)
 
     GST_INFO_OBJECT (splitmux,
         "mq overflowed since last, draining out. max out TS is %"
-        GST_TIME_FORMAT, GST_TIME_ARGS (splitmux->max_out_running_time));
+        GST_STIME_FORMAT, GST_STIME_ARGS (splitmux->max_out_running_time));
     GST_SPLITMUX_BROADCAST (splitmux);
 
   } else {
     /* No overflow */
     GST_LOG_OBJECT (splitmux,
         "This GOP didn't overflow the fragment. Bytes sent %" G_GSIZE_FORMAT
-        " queued %" G_GSIZE_FORMAT " time %" GST_TIME_FORMAT " Continuing.",
+        " queued %" G_GSIZE_FORMAT " time %" GST_STIME_FORMAT " Continuing.",
         splitmux->muxed_out_bytes - splitmux->mux_start_bytes,
-        queued_bytes, GST_TIME_ARGS (queued_time));
+        queued_bytes, GST_STIME_ARGS (queued_time));
 
     /* Wake everyone up to push this one GOP, then sleep */
     splitmux->have_muxed_something = TRUE;
@@ -892,11 +912,11 @@ handle_gathered_gop (GstSplitMuxSink * splitmux)
       splitmux->max_out_running_time = splitmux->reference_ctx->in_running_time;
     } else {
       splitmux->state = SPLITMUX_STATE_WAITING_GOP_COMPLETE;
-      splitmux->max_out_running_time = GST_CLOCK_TIME_NONE;
+      splitmux->max_out_running_time = GST_CLOCK_STIME_NONE;
     }
 
     GST_LOG_OBJECT (splitmux, "Waking output for complete GOP, TS %"
-        GST_TIME_FORMAT, GST_TIME_ARGS (splitmux->max_out_running_time));
+        GST_STIME_FORMAT, GST_STIME_ARGS (splitmux->max_out_running_time));
     GST_SPLITMUX_BROADCAST (splitmux);
   }
 
@@ -912,24 +932,24 @@ check_completed_gop (GstSplitMuxSink * splitmux, MqStreamCtx * ctx)
 {
   GList *cur;
   gboolean ready = TRUE;
-  GstClockTime current_max_in_running_time;
+  GstClockTimeDiff current_max_in_running_time;
 
   if (splitmux->state == SPLITMUX_STATE_WAITING_GOP_COMPLETE) {
     /* Iterate each pad, and check that the input running time is at least
      * up to the reference running time, and if so handle the collected GOP */
     GST_LOG_OBJECT (splitmux, "Checking GOP collected, Max in running time %"
-        GST_TIME_FORMAT " ctx %p",
-        GST_TIME_ARGS (splitmux->max_in_running_time), ctx);
+        GST_STIME_FORMAT " ctx %p",
+        GST_STIME_ARGS (splitmux->max_in_running_time), ctx);
     for (cur = g_list_first (splitmux->contexts); cur != NULL;
         cur = g_list_next (cur)) {
       MqStreamCtx *tmpctx = (MqStreamCtx *) (cur->data);
 
       GST_LOG_OBJECT (splitmux,
-          "Context %p (src pad %" GST_PTR_FORMAT ") TS %" GST_TIME_FORMAT
+          "Context %p (src pad %" GST_PTR_FORMAT ") TS %" GST_STIME_FORMAT
           " EOS %d", tmpctx, tmpctx->srcpad,
-          GST_TIME_ARGS (tmpctx->in_running_time), tmpctx->in_eos);
+          GST_STIME_ARGS (tmpctx->in_running_time), tmpctx->in_eos);
 
-      if (splitmux->max_in_running_time != GST_CLOCK_TIME_NONE &&
+      if (splitmux->max_in_running_time != G_MAXINT64 &&
           tmpctx->in_running_time < splitmux->max_in_running_time &&
           !tmpctx->in_eos) {
         GST_LOG_OBJECT (splitmux,
@@ -1050,7 +1070,7 @@ handle_mq_input (GstPad * pad, GstPadProbeInfo * info, MqStreamCtx * ctx)
         gst_segment_init (&ctx->in_segment, GST_FORMAT_UNDEFINED);
         ctx->in_eos = FALSE;
         ctx->in_bytes = 0;
-        ctx->in_running_time = 0;
+        ctx->in_running_time = GST_CLOCK_STIME_NONE;
         GST_SPLITMUX_UNLOCK (splitmux);
         break;
       case GST_EVENT_EOS:
@@ -1063,7 +1083,7 @@ handle_mq_input (GstPad * pad, GstPadProbeInfo * info, MqStreamCtx * ctx)
         if (ctx->is_reference) {
           GST_INFO_OBJECT (splitmux, "Got Reference EOS. Finishing up");
           /* Act as if this is a new keyframe with infinite timestamp */
-          splitmux->max_in_running_time = GST_CLOCK_TIME_NONE;
+          splitmux->max_in_running_time = G_MAXINT64;
           splitmux->state = SPLITMUX_STATE_WAITING_GOP_COMPLETE;
           /* Wake up other input pads to collect this GOP */
           GST_SPLITMUX_BROADCAST (splitmux);
@@ -1091,6 +1111,8 @@ handle_mq_input (GstPad * pad, GstPadProbeInfo * info, MqStreamCtx * ctx)
   else
     ts = GST_BUFFER_DTS (buf);
 
+  GST_LOG_OBJECT (pad, "Buffer TS is %" GST_STIME_FORMAT, GST_STIME_ARGS (ts));
+
   GST_SPLITMUX_LOCK (splitmux);
 
   if (splitmux->state == SPLITMUX_STATE_STOPPED)
@@ -1099,23 +1121,27 @@ handle_mq_input (GstPad * pad, GstPadProbeInfo * info, MqStreamCtx * ctx)
   /* If this buffer has a timestamp, advance the input timestamp of the
    * stream */
   if (GST_CLOCK_TIME_IS_VALID (ts)) {
-    GstClockTime running_time =
-        gst_segment_to_running_time (&ctx->in_segment, GST_FORMAT_TIME,
+    GstClockTimeDiff running_time =
+        my_segment_to_running_time (&ctx->in_segment,
         GST_BUFFER_TIMESTAMP (buf));
 
-    if (GST_CLOCK_TIME_IS_VALID (running_time) &&
-        (ctx->in_running_time == GST_CLOCK_TIME_NONE
-            || running_time > ctx->in_running_time))
+    GST_LOG_OBJECT (pad, "Buffer running TS is %" GST_STIME_FORMAT,
+        GST_STIME_ARGS (running_time));
+
+    if (GST_CLOCK_STIME_IS_VALID (running_time)
+        && running_time > ctx->in_running_time)
       ctx->in_running_time = running_time;
   }
 
   /* Try to make sure we have a valid running time */
-  if (!GST_CLOCK_TIME_IS_VALID (ctx->in_running_time)) {
+  if (!GST_CLOCK_STIME_IS_VALID (ctx->in_running_time)) {
     ctx->in_running_time =
-        gst_segment_to_running_time (&ctx->in_segment, GST_FORMAT_TIME,
-        ctx->in_segment.start);
+        my_segment_to_running_time (&ctx->in_segment, ctx->in_segment.start);
   }
 
+  GST_LOG_OBJECT (pad, "in running time now %" GST_STIME_FORMAT,
+      GST_STIME_ARGS (ctx->in_running_time));
+
   buf_info->run_ts = ctx->in_running_time;
   buf_info->buf_size = gst_buffer_get_size (buf);
 
@@ -1123,12 +1149,19 @@ handle_mq_input (GstPad * pad, GstPadProbeInfo * info, MqStreamCtx * ctx)
   ctx->in_bytes += buf_info->buf_size;
 
   /* initialize mux_start_time */
-  if (ctx->is_reference && splitmux->mux_start_time == 0)
+  if (ctx->is_reference && splitmux->mux_start_time == GST_CLOCK_STIME_NONE) {
     splitmux->mux_start_time = buf_info->run_ts;
+    GST_LOG_OBJECT (splitmux, "Mux start time now %" GST_STIME_FORMAT,
+        GST_STIME_ARGS (splitmux->mux_start_time));
+    /* Also take this as the first start time when starting up,
+     * so that we start counting overflow from the first frame */
+    if (!GST_CLOCK_STIME_IS_VALID (splitmux->max_in_running_time))
+      splitmux->max_in_running_time = splitmux->mux_start_time;
+  }
 
-  GST_DEBUG_OBJECT (pad, "Buf TS %" GST_TIME_FORMAT
+  GST_DEBUG_OBJECT (pad, "Buf TS %" GST_STIME_FORMAT
       " total in_bytes %" G_GSIZE_FORMAT,
-      GST_TIME_ARGS (buf_info->run_ts), ctx->in_bytes);
+      GST_STIME_ARGS (buf_info->run_ts), ctx->in_bytes);
 
   loop_again = TRUE;
   do {
@@ -1140,15 +1173,15 @@ handle_mq_input (GstPad * pad, GstPadProbeInfo * info, MqStreamCtx * ctx)
         if (ctx->is_reference) {
           /* If a keyframe, we have a complete GOP */
           if (GST_BUFFER_FLAG_IS_SET (buf, GST_BUFFER_FLAG_DELTA_UNIT) ||
-              !GST_CLOCK_TIME_IS_VALID (ctx->in_running_time) ||
+              !GST_CLOCK_STIME_IS_VALID (ctx->in_running_time) ||
               splitmux->max_in_running_time >= ctx->in_running_time) {
             /* Pass this buffer through */
             loop_again = FALSE;
             break;
           }
           GST_INFO_OBJECT (pad,
-              "Have keyframe with running time %" GST_TIME_FORMAT,
-              GST_TIME_ARGS (ctx->in_running_time));
+              "Have keyframe with running time %" GST_STIME_FORMAT,
+              GST_STIME_ARGS (ctx->in_running_time));
           keyframe = TRUE;
           splitmux->state = SPLITMUX_STATE_WAITING_GOP_COMPLETE;
           splitmux->max_in_running_time = ctx->in_running_time;
@@ -1164,14 +1197,14 @@ handle_mq_input (GstPad * pad, GstPadProbeInfo * info, MqStreamCtx * ctx)
         }
         break;
       case SPLITMUX_STATE_WAITING_GOP_COMPLETE:
-        /* After a GOP start is found, this buffer might complete the GOP */
+
         /* If we overran the target timestamp, it might be time to process
          * the GOP, otherwise bail out for more data
          */
         GST_LOG_OBJECT (pad,
-            "Checking TS %" GST_TIME_FORMAT " against max %" GST_TIME_FORMAT,
-            GST_TIME_ARGS (ctx->in_running_time),
-            GST_TIME_ARGS (splitmux->max_in_running_time));
+            "Checking TS %" GST_STIME_FORMAT " against max %" GST_STIME_FORMAT,
+            GST_STIME_ARGS (ctx->in_running_time),
+            GST_STIME_ARGS (splitmux->max_in_running_time));
 
         if (ctx->in_running_time < splitmux->max_in_running_time) {
           loop_again = FALSE;
@@ -1195,7 +1228,7 @@ handle_mq_input (GstPad * pad, GstPadProbeInfo * info, MqStreamCtx * ctx)
         event = gst_event_new_custom (GST_EVENT_CUSTOM_DOWNSTREAM |
             GST_EVENT_TYPE_SERIALIZED,
             gst_structure_new ("splitmuxsink-unblock", "timestamp",
-                G_TYPE_UINT64, splitmux->max_in_running_time, NULL));
+                G_TYPE_INT64, splitmux->max_in_running_time, NULL));
 
         GST_SPLITMUX_UNLOCK (splitmux);
         gst_pad_send_event (ctx->sinkpad, event);
@@ -1227,7 +1260,7 @@ handle_mq_input (GstPad * pad, GstPadProbeInfo * info, MqStreamCtx * ctx)
   check_queue_length (splitmux, ctx);
 
   GST_LOG_OBJECT (pad, "Returning to queue buffer %" GST_PTR_FORMAT
-      " run ts %" GST_TIME_FORMAT, buf, GST_TIME_ARGS (ctx->in_running_time));
+      " run ts %" GST_STIME_FORMAT, buf, GST_STIME_ARGS (ctx->in_running_time));
 
   GST_SPLITMUX_UNLOCK (splitmux);
   return GST_PAD_PROBE_PASS;
@@ -1605,8 +1638,9 @@ gst_splitmux_sink_change_state (GstElement * element, GstStateChange transition)
       GST_SPLITMUX_LOCK (splitmux);
       /* Start by collecting one input on each pad */
       splitmux->state = SPLITMUX_STATE_COLLECTING_GOP_START;
-      splitmux->max_in_running_time = 0;
-      splitmux->muxed_out_time = splitmux->mux_start_time = 0;
+      splitmux->max_in_running_time = GST_CLOCK_STIME_NONE;
+      splitmux->muxed_out_time = splitmux->mux_start_time =
+          GST_CLOCK_STIME_NONE;
       splitmux->muxed_out_bytes = splitmux->mux_start_bytes = 0;
       splitmux->opening_first_fragment = TRUE;
       GST_SPLITMUX_UNLOCK (splitmux);
index a233642..82e03a7 100644 (file)
@@ -48,7 +48,7 @@ typedef enum _SplitMuxState {
 typedef struct _MqStreamBuf
 {
   gboolean keyframe;
-  GstClockTime run_ts;
+  GstClockTimeDiff run_ts;
   gsize buf_size;
 } MqStreamBuf;
 
@@ -70,8 +70,8 @@ typedef struct _MqStreamCtx
   GstSegment in_segment;
   GstSegment out_segment;
 
-  GstClockTime in_running_time;
-  GstClockTime out_running_time;
+  GstClockTimeDiff in_running_time;
+  GstClockTimeDiff out_running_time;
 
   gsize in_bytes;
 
@@ -114,14 +114,14 @@ struct _GstSplitMuxSink {
 
   MqStreamCtx *reference_ctx;
   guint queued_gops;
-  GstClockTime max_in_running_time;
-  GstClockTime max_out_running_time;
+  GstClockTimeDiff max_in_running_time;
+  GstClockTimeDiff max_out_running_time;
 
-  GstClockTime muxed_out_time;
+  GstClockTimeDiff muxed_out_time;
   gsize muxed_out_bytes;
   gboolean have_muxed_something;
 
-  GstClockTime mux_start_time;
+  GstClockTimeDiff mux_start_time;
   gsize mux_start_bytes;
 
   gboolean opening_first_fragment;