From 6755691b28f0b368d16ec8fa480e22eac9c15e97 Mon Sep 17 00:00:00 2001 From: Jan Schmidt Date: Sun, 17 Jul 2016 22:41:02 +1000 Subject: [PATCH] splitmuxsink: Handle negative running time Use signed clock times for running time everywhere so that we handle negative running times without going haywire, similar to what queue and multiqueue do these days. --- gst/multifile/gstsplitmuxsink.c | 164 ++++++++++++++++++++++++---------------- gst/multifile/gstsplitmuxsink.h | 14 ++-- 2 files changed, 106 insertions(+), 72 deletions(-) diff --git a/gst/multifile/gstsplitmuxsink.c b/gst/multifile/gstsplitmuxsink.c index f76b8f1..8d5d70f 100644 --- a/gst/multifile/gstsplitmuxsink.c +++ b/gst/multifile/gstsplitmuxsink.c @@ -399,6 +399,23 @@ gst_splitmux_sink_get_property (GObject * object, guint prop_id, } } +/* Convenience function */ +static inline GstClockTimeDiff +my_segment_to_running_time (GstSegment * segment, GstClockTime val) +{ + GstClockTimeDiff res = GST_CLOCK_STIME_NONE; + + if (GST_CLOCK_TIME_IS_VALID (val)) { + gboolean sign = + gst_segment_to_running_time_full (segment, GST_FORMAT_TIME, val, &val); + if (sign > 0) + res = val; + else if (sign < 0) + res = -val; + } + return res; +} + static GstPad * mq_sink_to_src (GstElement * mq, GstPad * sink_pad) { @@ -454,7 +471,7 @@ mq_stream_ctx_new (GstSplitMuxSink * splitmux) ctx->splitmux = splitmux; gst_segment_init (&ctx->in_segment, GST_FORMAT_UNDEFINED); gst_segment_init (&ctx->out_segment, GST_FORMAT_UNDEFINED); - ctx->in_running_time = ctx->out_running_time = 0; + ctx->in_running_time = ctx->out_running_time = GST_CLOCK_STIME_NONE; g_queue_init (&ctx->queued_bufs); return ctx; } @@ -546,11 +563,11 @@ complete_or_wait_on_out (GstSplitMuxSink * splitmux, MqStreamCtx * ctx) do { GST_LOG_OBJECT (ctx->srcpad, - "Checking running time %" GST_TIME_FORMAT " against max %" - GST_TIME_FORMAT, GST_TIME_ARGS (ctx->out_running_time), - GST_TIME_ARGS (splitmux->max_out_running_time)); + "Checking running time %" GST_STIME_FORMAT " against max %" + GST_STIME_FORMAT, GST_STIME_ARGS (ctx->out_running_time), + GST_STIME_ARGS (splitmux->max_out_running_time)); - if (splitmux->max_out_running_time == GST_CLOCK_TIME_NONE || + if (splitmux->max_out_running_time == GST_CLOCK_STIME_NONE || ctx->out_running_time < splitmux->max_out_running_time) { splitmux->have_muxed_something = TRUE; return; @@ -571,17 +588,17 @@ complete_or_wait_on_out (GstSplitMuxSink * splitmux, MqStreamCtx * ctx) GST_INFO_OBJECT (ctx->srcpad, "Sleeping for running time %" - GST_TIME_FORMAT " (max %" GST_TIME_FORMAT ")", - GST_TIME_ARGS (ctx->out_running_time), - GST_TIME_ARGS (splitmux->max_out_running_time)); + GST_STIME_FORMAT " (max %" GST_STIME_FORMAT ")", + GST_STIME_ARGS (ctx->out_running_time), + GST_STIME_ARGS (splitmux->max_out_running_time)); ctx->out_blocked = TRUE; /* Expand the mq if needed before sleeping */ check_queue_length (splitmux, ctx); GST_SPLITMUX_WAIT (splitmux); ctx->out_blocked = FALSE; GST_INFO_OBJECT (ctx->srcpad, - "Woken for new max running time %" GST_TIME_FORMAT, - GST_TIME_ARGS (splitmux->max_out_running_time)); + "Woken for new max running time %" GST_STIME_FORMAT, + GST_STIME_ARGS (splitmux->max_out_running_time)); } while (1); } @@ -631,6 +648,7 @@ handle_mq_output (GstPad * pad, GstPadProbeInfo * info, MqStreamCtx * ctx) break; case GST_EVENT_GAP:{ GstClockTime gap_ts; + GstClockTimeDiff rtime; gst_event_parse_gap (event, &gap_ts, NULL); if (gap_ts == GST_CLOCK_TIME_NONE) @@ -638,28 +656,30 @@ handle_mq_output (GstPad * pad, GstPadProbeInfo * info, MqStreamCtx * ctx) GST_SPLITMUX_LOCK (splitmux); - gap_ts = gst_segment_to_running_time (&ctx->out_segment, - GST_FORMAT_TIME, gap_ts); + rtime = my_segment_to_running_time (&ctx->out_segment, gap_ts); - GST_LOG_OBJECT (pad, "Have GAP w/ ts %" GST_TIME_FORMAT, - GST_TIME_ARGS (gap_ts)); + GST_LOG_OBJECT (pad, "Have GAP w/ ts %" GST_STIME_FORMAT, + GST_STIME_ARGS (rtime)); if (splitmux->state == SPLITMUX_STATE_STOPPED) goto beach; - ctx->out_running_time = gap_ts; - complete_or_wait_on_out (splitmux, ctx); + + if (rtime != GST_CLOCK_STIME_NONE) { + ctx->out_running_time = rtime; + complete_or_wait_on_out (splitmux, ctx); + } GST_SPLITMUX_UNLOCK (splitmux); break; } case GST_EVENT_CUSTOM_DOWNSTREAM:{ const GstStructure *s; - GstClockTime ts = 0; + GstClockTimeDiff ts = 0; s = gst_event_get_structure (event); if (!gst_structure_has_name (s, "splitmuxsink-unblock")) break; - gst_structure_get_uint64 (s, "timestamp", &ts); + gst_structure_get_int64 (s, "timestamp", &ts); GST_SPLITMUX_LOCK (splitmux); @@ -691,9 +711,9 @@ handle_mq_output (GstPad * pad, GstPadProbeInfo * info, MqStreamCtx * ctx) ctx->out_running_time = buf_info->run_ts; GST_LOG_OBJECT (splitmux, - "Pad %" GST_PTR_FORMAT " buffer with TS %" GST_TIME_FORMAT + "Pad %" GST_PTR_FORMAT " buffer with run TS %" GST_STIME_FORMAT " size %" G_GSIZE_FORMAT, - pad, GST_TIME_ARGS (ctx->out_running_time), buf_info->buf_size); + pad, GST_STIME_ARGS (ctx->out_running_time), buf_info->buf_size); if (splitmux->opening_first_fragment) { send_fragment_opened_closed_msg (splitmux, TRUE); @@ -702,7 +722,7 @@ handle_mq_output (GstPad * pad, GstPadProbeInfo * info, MqStreamCtx * ctx) complete_or_wait_on_out (splitmux, ctx); - if (splitmux->muxed_out_time == GST_CLOCK_TIME_NONE || + if (splitmux->muxed_out_time == GST_CLOCK_STIME_NONE || splitmux->muxed_out_time < buf_info->run_ts) splitmux->muxed_out_time = buf_info->run_ts; @@ -712,8 +732,8 @@ handle_mq_output (GstPad * pad, GstPadProbeInfo * info, MqStreamCtx * ctx) { GstBuffer *buf = gst_pad_probe_info_get_buffer (info); GST_LOG_OBJECT (pad, "Returning to pass buffer %" GST_PTR_FORMAT - " run ts %" GST_TIME_FORMAT, buf, - GST_TIME_ARGS (ctx->out_running_time)); + " run ts %" GST_STIME_FORMAT, buf, + GST_STIME_ARGS (ctx->out_running_time)); } #endif @@ -776,7 +796,7 @@ start_next_fragment (GstSplitMuxSink * splitmux) splitmux->max_out_running_time = splitmux->reference_ctx->in_running_time; } else { splitmux->state = SPLITMUX_STATE_WAITING_GOP_COMPLETE; - splitmux->max_out_running_time = GST_CLOCK_TIME_NONE; + splitmux->max_out_running_time = GST_CLOCK_STIME_NONE; splitmux->have_muxed_something = FALSE; } splitmux->have_muxed_something = @@ -787,8 +807,8 @@ start_next_fragment (GstSplitMuxSink * splitmux) splitmux->mux_start_bytes = splitmux->muxed_out_bytes; GST_DEBUG_OBJECT (splitmux, - "Restarting flow for new fragment. New running time %" GST_TIME_FORMAT, - GST_TIME_ARGS (splitmux->max_out_running_time)); + "Restarting flow for new fragment. New running time %" GST_STIME_FORMAT, + GST_STIME_ARGS (splitmux->max_out_running_time)); send_fragment_opened_closed_msg (splitmux, TRUE); @@ -808,7 +828,7 @@ bus_handler (GstBin * bin, GstMessage * message) send_fragment_opened_closed_msg (splitmux, FALSE); if (splitmux->state == SPLITMUX_STATE_ENDING_FILE && - splitmux->max_out_running_time != GST_CLOCK_TIME_NONE) { + splitmux->max_out_running_time != GST_CLOCK_STIME_NONE) { GST_DEBUG_OBJECT (splitmux, "Caught EOS at end of fragment, dropping"); splitmux->state = SPLITMUX_STATE_START_NEXT_FRAGMENT; GST_SPLITMUX_BROADCAST (splitmux); @@ -838,7 +858,7 @@ handle_gathered_gop (GstSplitMuxSink * splitmux) { GList *cur; gsize queued_bytes = 0; - GstClockTime queued_time = 0; + GstClockTimeDiff queued_time = 0; /* Assess if the multiqueue contents overflowed the current file */ for (cur = g_list_first (splitmux->contexts); @@ -858,8 +878,8 @@ handle_gathered_gop (GstSplitMuxSink * splitmux) /* Expand queued bytes estimate by muxer overhead */ queued_bytes += (queued_bytes * splitmux->mux_overhead); - GST_LOG_OBJECT (splitmux, "mq at TS %" GST_TIME_FORMAT - " bytes %" G_GSIZE_FORMAT, GST_TIME_ARGS (queued_time), queued_bytes); + GST_LOG_OBJECT (splitmux, "mq at TS %" GST_STIME_FORMAT + " bytes %" G_GSIZE_FORMAT, GST_STIME_ARGS (queued_time), queued_bytes); /* Check for overrun - have we output at least one byte and overrun * either threshold? */ @@ -873,16 +893,16 @@ handle_gathered_gop (GstSplitMuxSink * splitmux) GST_INFO_OBJECT (splitmux, "mq overflowed since last, draining out. max out TS is %" - GST_TIME_FORMAT, GST_TIME_ARGS (splitmux->max_out_running_time)); + GST_STIME_FORMAT, GST_STIME_ARGS (splitmux->max_out_running_time)); GST_SPLITMUX_BROADCAST (splitmux); } else { /* No overflow */ GST_LOG_OBJECT (splitmux, "This GOP didn't overflow the fragment. Bytes sent %" G_GSIZE_FORMAT - " queued %" G_GSIZE_FORMAT " time %" GST_TIME_FORMAT " Continuing.", + " queued %" G_GSIZE_FORMAT " time %" GST_STIME_FORMAT " Continuing.", splitmux->muxed_out_bytes - splitmux->mux_start_bytes, - queued_bytes, GST_TIME_ARGS (queued_time)); + queued_bytes, GST_STIME_ARGS (queued_time)); /* Wake everyone up to push this one GOP, then sleep */ splitmux->have_muxed_something = TRUE; @@ -892,11 +912,11 @@ handle_gathered_gop (GstSplitMuxSink * splitmux) splitmux->max_out_running_time = splitmux->reference_ctx->in_running_time; } else { splitmux->state = SPLITMUX_STATE_WAITING_GOP_COMPLETE; - splitmux->max_out_running_time = GST_CLOCK_TIME_NONE; + splitmux->max_out_running_time = GST_CLOCK_STIME_NONE; } GST_LOG_OBJECT (splitmux, "Waking output for complete GOP, TS %" - GST_TIME_FORMAT, GST_TIME_ARGS (splitmux->max_out_running_time)); + GST_STIME_FORMAT, GST_STIME_ARGS (splitmux->max_out_running_time)); GST_SPLITMUX_BROADCAST (splitmux); } @@ -912,24 +932,24 @@ check_completed_gop (GstSplitMuxSink * splitmux, MqStreamCtx * ctx) { GList *cur; gboolean ready = TRUE; - GstClockTime current_max_in_running_time; + GstClockTimeDiff current_max_in_running_time; if (splitmux->state == SPLITMUX_STATE_WAITING_GOP_COMPLETE) { /* Iterate each pad, and check that the input running time is at least * up to the reference running time, and if so handle the collected GOP */ GST_LOG_OBJECT (splitmux, "Checking GOP collected, Max in running time %" - GST_TIME_FORMAT " ctx %p", - GST_TIME_ARGS (splitmux->max_in_running_time), ctx); + GST_STIME_FORMAT " ctx %p", + GST_STIME_ARGS (splitmux->max_in_running_time), ctx); for (cur = g_list_first (splitmux->contexts); cur != NULL; cur = g_list_next (cur)) { MqStreamCtx *tmpctx = (MqStreamCtx *) (cur->data); GST_LOG_OBJECT (splitmux, - "Context %p (src pad %" GST_PTR_FORMAT ") TS %" GST_TIME_FORMAT + "Context %p (src pad %" GST_PTR_FORMAT ") TS %" GST_STIME_FORMAT " EOS %d", tmpctx, tmpctx->srcpad, - GST_TIME_ARGS (tmpctx->in_running_time), tmpctx->in_eos); + GST_STIME_ARGS (tmpctx->in_running_time), tmpctx->in_eos); - if (splitmux->max_in_running_time != GST_CLOCK_TIME_NONE && + if (splitmux->max_in_running_time != G_MAXINT64 && tmpctx->in_running_time < splitmux->max_in_running_time && !tmpctx->in_eos) { GST_LOG_OBJECT (splitmux, @@ -1050,7 +1070,7 @@ handle_mq_input (GstPad * pad, GstPadProbeInfo * info, MqStreamCtx * ctx) gst_segment_init (&ctx->in_segment, GST_FORMAT_UNDEFINED); ctx->in_eos = FALSE; ctx->in_bytes = 0; - ctx->in_running_time = 0; + ctx->in_running_time = GST_CLOCK_STIME_NONE; GST_SPLITMUX_UNLOCK (splitmux); break; case GST_EVENT_EOS: @@ -1063,7 +1083,7 @@ handle_mq_input (GstPad * pad, GstPadProbeInfo * info, MqStreamCtx * ctx) if (ctx->is_reference) { GST_INFO_OBJECT (splitmux, "Got Reference EOS. Finishing up"); /* Act as if this is a new keyframe with infinite timestamp */ - splitmux->max_in_running_time = GST_CLOCK_TIME_NONE; + splitmux->max_in_running_time = G_MAXINT64; splitmux->state = SPLITMUX_STATE_WAITING_GOP_COMPLETE; /* Wake up other input pads to collect this GOP */ GST_SPLITMUX_BROADCAST (splitmux); @@ -1091,6 +1111,8 @@ handle_mq_input (GstPad * pad, GstPadProbeInfo * info, MqStreamCtx * ctx) else ts = GST_BUFFER_DTS (buf); + GST_LOG_OBJECT (pad, "Buffer TS is %" GST_STIME_FORMAT, GST_STIME_ARGS (ts)); + GST_SPLITMUX_LOCK (splitmux); if (splitmux->state == SPLITMUX_STATE_STOPPED) @@ -1099,23 +1121,27 @@ handle_mq_input (GstPad * pad, GstPadProbeInfo * info, MqStreamCtx * ctx) /* If this buffer has a timestamp, advance the input timestamp of the * stream */ if (GST_CLOCK_TIME_IS_VALID (ts)) { - GstClockTime running_time = - gst_segment_to_running_time (&ctx->in_segment, GST_FORMAT_TIME, + GstClockTimeDiff running_time = + my_segment_to_running_time (&ctx->in_segment, GST_BUFFER_TIMESTAMP (buf)); - if (GST_CLOCK_TIME_IS_VALID (running_time) && - (ctx->in_running_time == GST_CLOCK_TIME_NONE - || running_time > ctx->in_running_time)) + GST_LOG_OBJECT (pad, "Buffer running TS is %" GST_STIME_FORMAT, + GST_STIME_ARGS (running_time)); + + if (GST_CLOCK_STIME_IS_VALID (running_time) + && running_time > ctx->in_running_time) ctx->in_running_time = running_time; } /* Try to make sure we have a valid running time */ - if (!GST_CLOCK_TIME_IS_VALID (ctx->in_running_time)) { + if (!GST_CLOCK_STIME_IS_VALID (ctx->in_running_time)) { ctx->in_running_time = - gst_segment_to_running_time (&ctx->in_segment, GST_FORMAT_TIME, - ctx->in_segment.start); + my_segment_to_running_time (&ctx->in_segment, ctx->in_segment.start); } + GST_LOG_OBJECT (pad, "in running time now %" GST_STIME_FORMAT, + GST_STIME_ARGS (ctx->in_running_time)); + buf_info->run_ts = ctx->in_running_time; buf_info->buf_size = gst_buffer_get_size (buf); @@ -1123,12 +1149,19 @@ handle_mq_input (GstPad * pad, GstPadProbeInfo * info, MqStreamCtx * ctx) ctx->in_bytes += buf_info->buf_size; /* initialize mux_start_time */ - if (ctx->is_reference && splitmux->mux_start_time == 0) + if (ctx->is_reference && splitmux->mux_start_time == GST_CLOCK_STIME_NONE) { splitmux->mux_start_time = buf_info->run_ts; + GST_LOG_OBJECT (splitmux, "Mux start time now %" GST_STIME_FORMAT, + GST_STIME_ARGS (splitmux->mux_start_time)); + /* Also take this as the first start time when starting up, + * so that we start counting overflow from the first frame */ + if (!GST_CLOCK_STIME_IS_VALID (splitmux->max_in_running_time)) + splitmux->max_in_running_time = splitmux->mux_start_time; + } - GST_DEBUG_OBJECT (pad, "Buf TS %" GST_TIME_FORMAT + GST_DEBUG_OBJECT (pad, "Buf TS %" GST_STIME_FORMAT " total in_bytes %" G_GSIZE_FORMAT, - GST_TIME_ARGS (buf_info->run_ts), ctx->in_bytes); + GST_STIME_ARGS (buf_info->run_ts), ctx->in_bytes); loop_again = TRUE; do { @@ -1140,15 +1173,15 @@ handle_mq_input (GstPad * pad, GstPadProbeInfo * info, MqStreamCtx * ctx) if (ctx->is_reference) { /* If a keyframe, we have a complete GOP */ if (GST_BUFFER_FLAG_IS_SET (buf, GST_BUFFER_FLAG_DELTA_UNIT) || - !GST_CLOCK_TIME_IS_VALID (ctx->in_running_time) || + !GST_CLOCK_STIME_IS_VALID (ctx->in_running_time) || splitmux->max_in_running_time >= ctx->in_running_time) { /* Pass this buffer through */ loop_again = FALSE; break; } GST_INFO_OBJECT (pad, - "Have keyframe with running time %" GST_TIME_FORMAT, - GST_TIME_ARGS (ctx->in_running_time)); + "Have keyframe with running time %" GST_STIME_FORMAT, + GST_STIME_ARGS (ctx->in_running_time)); keyframe = TRUE; splitmux->state = SPLITMUX_STATE_WAITING_GOP_COMPLETE; splitmux->max_in_running_time = ctx->in_running_time; @@ -1164,14 +1197,14 @@ handle_mq_input (GstPad * pad, GstPadProbeInfo * info, MqStreamCtx * ctx) } break; case SPLITMUX_STATE_WAITING_GOP_COMPLETE: - /* After a GOP start is found, this buffer might complete the GOP */ + /* If we overran the target timestamp, it might be time to process * the GOP, otherwise bail out for more data */ GST_LOG_OBJECT (pad, - "Checking TS %" GST_TIME_FORMAT " against max %" GST_TIME_FORMAT, - GST_TIME_ARGS (ctx->in_running_time), - GST_TIME_ARGS (splitmux->max_in_running_time)); + "Checking TS %" GST_STIME_FORMAT " against max %" GST_STIME_FORMAT, + GST_STIME_ARGS (ctx->in_running_time), + GST_STIME_ARGS (splitmux->max_in_running_time)); if (ctx->in_running_time < splitmux->max_in_running_time) { loop_again = FALSE; @@ -1195,7 +1228,7 @@ handle_mq_input (GstPad * pad, GstPadProbeInfo * info, MqStreamCtx * ctx) event = gst_event_new_custom (GST_EVENT_CUSTOM_DOWNSTREAM | GST_EVENT_TYPE_SERIALIZED, gst_structure_new ("splitmuxsink-unblock", "timestamp", - G_TYPE_UINT64, splitmux->max_in_running_time, NULL)); + G_TYPE_INT64, splitmux->max_in_running_time, NULL)); GST_SPLITMUX_UNLOCK (splitmux); gst_pad_send_event (ctx->sinkpad, event); @@ -1227,7 +1260,7 @@ handle_mq_input (GstPad * pad, GstPadProbeInfo * info, MqStreamCtx * ctx) check_queue_length (splitmux, ctx); GST_LOG_OBJECT (pad, "Returning to queue buffer %" GST_PTR_FORMAT - " run ts %" GST_TIME_FORMAT, buf, GST_TIME_ARGS (ctx->in_running_time)); + " run ts %" GST_STIME_FORMAT, buf, GST_STIME_ARGS (ctx->in_running_time)); GST_SPLITMUX_UNLOCK (splitmux); return GST_PAD_PROBE_PASS; @@ -1605,8 +1638,9 @@ gst_splitmux_sink_change_state (GstElement * element, GstStateChange transition) GST_SPLITMUX_LOCK (splitmux); /* Start by collecting one input on each pad */ splitmux->state = SPLITMUX_STATE_COLLECTING_GOP_START; - splitmux->max_in_running_time = 0; - splitmux->muxed_out_time = splitmux->mux_start_time = 0; + splitmux->max_in_running_time = GST_CLOCK_STIME_NONE; + splitmux->muxed_out_time = splitmux->mux_start_time = + GST_CLOCK_STIME_NONE; splitmux->muxed_out_bytes = splitmux->mux_start_bytes = 0; splitmux->opening_first_fragment = TRUE; GST_SPLITMUX_UNLOCK (splitmux); diff --git a/gst/multifile/gstsplitmuxsink.h b/gst/multifile/gstsplitmuxsink.h index a233642..82e03a7 100644 --- a/gst/multifile/gstsplitmuxsink.h +++ b/gst/multifile/gstsplitmuxsink.h @@ -48,7 +48,7 @@ typedef enum _SplitMuxState { typedef struct _MqStreamBuf { gboolean keyframe; - GstClockTime run_ts; + GstClockTimeDiff run_ts; gsize buf_size; } MqStreamBuf; @@ -70,8 +70,8 @@ typedef struct _MqStreamCtx GstSegment in_segment; GstSegment out_segment; - GstClockTime in_running_time; - GstClockTime out_running_time; + GstClockTimeDiff in_running_time; + GstClockTimeDiff out_running_time; gsize in_bytes; @@ -114,14 +114,14 @@ struct _GstSplitMuxSink { MqStreamCtx *reference_ctx; guint queued_gops; - GstClockTime max_in_running_time; - GstClockTime max_out_running_time; + GstClockTimeDiff max_in_running_time; + GstClockTimeDiff max_out_running_time; - GstClockTime muxed_out_time; + GstClockTimeDiff muxed_out_time; gsize muxed_out_bytes; gboolean have_muxed_something; - GstClockTime mux_start_time; + GstClockTimeDiff mux_start_time; gsize mux_start_bytes; gboolean opening_first_fragment; -- 2.7.4