const gchar * factory, const gchar * name, gboolean locked);
static void do_async_done (GstSplitMuxSink * splitmux);
-static void gst_splitmux_reset_timecode (GstSplitMuxSink * splitmux);
+
+static GstClockTime calculate_next_max_timecode (GstSplitMuxSink * splitmux,
+ const GstVideoTimeCode * cur_tc, GstClockTime running_time,
+ GstVideoTimeCode ** next_tc);
static MqStreamBuf *
mq_stream_buf_new (void)
}
static void
+input_gop_free (InputGop * gop)
+{
+ g_clear_pointer (&gop->start_tc, gst_video_time_code_free);
+ g_slice_free (InputGop, gop);
+}
+
+static void
gst_splitmux_sink_class_init (GstSplitMuxSinkClass * klass)
{
GObjectClass *gobject_class = (GObjectClass *) klass;
splitmux->reset_muxer = DEFAULT_RESET_MUXER;
splitmux->threshold_timecode_str = NULL;
- gst_splitmux_reset_timecode (splitmux);
splitmux->async_finalize = DEFAULT_ASYNC_FINALIZE;
splitmux->muxer_factory = g_strdup (DEFAULT_MUXER);
splitmux->do_split_next_gop = FALSE;
splitmux->times_to_split = gst_queue_array_new_for_struct (8, 8);
splitmux->next_fku_time = GST_CLOCK_TIME_NONE;
+
+ g_queue_init (&splitmux->pending_input_gops);
}
static void
}
static void
-gst_splitmux_reset_timecode (GstSplitMuxSink * splitmux)
-{
- g_clear_pointer (&splitmux->in_tc, gst_video_time_code_free);
- g_clear_pointer (&splitmux->fragment_start_tc, gst_video_time_code_free);
- g_clear_pointer (&splitmux->gop_start_tc, gst_video_time_code_free);
- splitmux->next_fragment_start_tc_time = GST_CLOCK_TIME_NONE;
-}
-
-static void
gst_splitmux_sink_dispose (GObject * object)
{
GstSplitMuxSink *splitmux = GST_SPLITMUX_SINK (object);
gst_splitmux_sink_finalize (GObject * object)
{
GstSplitMuxSink *splitmux = GST_SPLITMUX_SINK (object);
+
g_cond_clear (&splitmux->input_cond);
g_cond_clear (&splitmux->output_cond);
g_mutex_clear (&splitmux->lock);
g_mutex_clear (&splitmux->state_lock);
g_queue_foreach (&splitmux->out_cmd_q, (GFunc) out_cmd_buf_free, NULL);
g_queue_clear (&splitmux->out_cmd_q);
+ g_queue_foreach (&splitmux->pending_input_gops, (GFunc) input_gop_free, NULL);
+ g_queue_clear (&splitmux->pending_input_gops);
+
+ g_clear_pointer (&splitmux->fragment_start_tc, gst_video_time_code_free);
if (splitmux->muxerpad_map)
gst_structure_free (splitmux->muxerpad_map);
* because the dispose will have freed all request pads though */
g_list_foreach (splitmux->contexts, (GFunc) mq_stream_ctx_free, NULL);
g_list_free (splitmux->contexts);
- gst_splitmux_reset_timecode (splitmux);
G_OBJECT_CLASS (parent_class)->finalize (object);
}
/* will be calculated later */
g_clear_pointer (&splitmux->tc_interval,
gst_video_time_code_interval_free);
- gst_splitmux_reset_timecode (splitmux);
splitmux->threshold_timecode_str = g_value_dup_string (value);
if (splitmux->threshold_timecode_str) {
splitmux->threshold_timecode_str = NULL;
}
}
+ splitmux->next_fragment_start_tc_time =
+ calculate_next_max_timecode (splitmux, splitmux->fragment_start_tc,
+ splitmux->fragment_start_time, NULL);
+ if (splitmux->tc_interval && splitmux->fragment_start_tc
+ && !GST_CLOCK_TIME_IS_VALID (splitmux->next_fragment_start_tc_time)) {
+ GST_WARNING_OBJECT (splitmux,
+ "Couldn't calculate next fragment start time for timecode mode");
+ }
GST_OBJECT_UNLOCK (splitmux);
break;
case PROP_SEND_KEYFRAME_REQUESTS:
gboolean can_output = (ctx->is_reference || splitmux->ready_for_output);
GstClockTimeDiff my_max_out_running_time = splitmux->max_out_running_time;
- if (!(splitmux->max_out_running_time == 0 ||
- splitmux->max_out_running_time == GST_CLOCK_STIME_NONE ||
- splitmux->alignment_threshold == 0 ||
- splitmux->max_out_running_time < splitmux->alignment_threshold)) {
+ if (my_max_out_running_time != GST_CLOCK_STIME_NONE
+ && my_max_out_running_time != G_MAXINT64) {
my_max_out_running_time -= splitmux->alignment_threshold;
GST_LOG_OBJECT (ctx->srcpad,
"Max out running time currently %" GST_STIME_FORMAT
GST_STIME_ARGS (my_max_out_running_time));
if (can_output) {
- if (splitmux->max_out_running_time == GST_CLOCK_STIME_NONE ||
+ if (splitmux->max_out_running_time != GST_CLOCK_STIME_NONE &&
ctx->out_running_time < my_max_out_running_time) {
return GST_FLOW_OK;
}
GST_STIME_ARGS (cmd->max_output_ts));
/* Extend the output range immediately */
- splitmux->max_out_running_time = cmd->max_output_ts;
+ if (splitmux->max_out_running_time == GST_CLOCK_STIME_NONE
+ || cmd->max_output_ts > splitmux->max_out_running_time)
+ splitmux->max_out_running_time = cmd->max_output_ts;
+ GST_DEBUG_OBJECT (splitmux,
+ "Max out running time now %" GST_STIME_FORMAT,
+ GST_STIME_ARGS (splitmux->max_out_running_time));
splitmux->output_state = SPLITMUX_OUTPUT_STATE_OUTPUT_GOP;
}
GST_SPLITMUX_BROADCAST_OUTPUT (splitmux);
next_max_tc_time = day_in_ns - cur_tc_time + target_tc_time + running_time;
}
- GST_INFO_OBJECT (splitmux, "Next max TC time: %" GST_TIME_FORMAT
- " from ref TC: %" GST_TIME_FORMAT, GST_TIME_ARGS (next_max_tc_time),
- GST_TIME_ARGS (cur_tc_time));
+#ifndef GST_DISABLE_GST_DEBUG
+ {
+ gchar *next_max_tc_str, *cur_tc_str;
+
+ cur_tc_str = gst_video_time_code_to_string (cur_tc);
+ next_max_tc_str = gst_video_time_code_to_string (target_tc);
+
+ GST_INFO_OBJECT (splitmux, "Next max timecode %s time: %" GST_TIME_FORMAT
+ " from ref timecode %s time: %" GST_TIME_FORMAT,
+ next_max_tc_str,
+ GST_TIME_ARGS (next_max_tc_time),
+ cur_tc_str, GST_TIME_ARGS (cur_tc_time));
+
+ g_free (next_max_tc_str);
+ g_free (cur_tc_str);
+ }
+#endif
+
if (next_tc)
*next_tc = target_tc;
else
static gboolean
request_next_keyframe (GstSplitMuxSink * splitmux, GstBuffer * buffer,
- GstClockTime running_time)
+ GstClockTimeDiff running_time_dts)
{
GstEvent *ev;
GstClockTime target_time;
GstClockTime next_max_tc_time = GST_CLOCK_TIME_NONE;
GstClockTime next_fku_time = GST_CLOCK_TIME_NONE;
GstClockTime tc_rounding_error = 5 * GST_USECOND;
+ InputGop *newest_gop = NULL;
+ GList *l;
if (!splitmux->send_keyframe_requests)
return TRUE;
+ /* Find the newest GOP where we passed in DTS the start PTS */
+ for (l = splitmux->pending_input_gops.tail; l; l = l->prev) {
+ InputGop *tmp = l->data;
+
+ GST_TRACE_OBJECT (splitmux,
+ "Having pending input GOP with start PTS %" GST_STIME_FORMAT
+ " and start time %" GST_STIME_FORMAT,
+ GST_STIME_ARGS (tmp->start_time_pts), GST_STIME_ARGS (tmp->start_time));
+
+ if (tmp->sent_fku) {
+ GST_DEBUG_OBJECT (splitmux,
+ "Already checked for a keyframe request for this GOP");
+ return TRUE;
+ }
+
+ if (running_time_dts == GST_CLOCK_STIME_NONE ||
+ tmp->start_time_pts == GST_CLOCK_STIME_NONE ||
+ running_time_dts >= tmp->start_time_pts) {
+ GST_DEBUG_OBJECT (splitmux,
+ "Using GOP with start PTS %" GST_STIME_FORMAT " and start time %"
+ GST_STIME_FORMAT, GST_STIME_ARGS (tmp->start_time_pts),
+ GST_STIME_ARGS (tmp->start_time));
+ newest_gop = tmp;
+ break;
+ }
+ }
+
+ if (!newest_gop) {
+ GST_DEBUG_OBJECT (splitmux, "Have no complete enough pending input GOP");
+ return TRUE;
+ }
+
if (splitmux->tc_interval) {
- if (splitmux->in_tc && gst_video_time_code_is_valid (splitmux->in_tc)) {
+ if (newest_gop->start_tc
+ && gst_video_time_code_is_valid (newest_gop->start_tc)) {
GstVideoTimeCode *next_tc = NULL;
max_tc_time =
- calculate_next_max_timecode (splitmux, splitmux->in_tc,
- running_time, &next_tc);
+ calculate_next_max_timecode (splitmux, newest_gop->start_tc,
+ newest_gop->start_time, &next_tc);
/* calculate the next expected keyframe time to prevent too early fku
* event */
timecode_based = GST_CLOCK_TIME_IS_VALID (max_tc_time) &&
GST_CLOCK_TIME_IS_VALID (next_max_tc_time);
+
+ if (!timecode_based) {
+ GST_WARNING_OBJECT (splitmux,
+ "Couldn't calculate maximum fragment time for timecode mode");
+ }
} else {
/* This can happen in the presence of GAP events that trigger
* a new fragment start */
next_fku_time = 0;
}
} else {
- target_time = running_time + splitmux->threshold_time;
+ target_time = newest_gop->start_time + splitmux->threshold_time;
}
if (GST_CLOCK_TIME_IS_VALID (splitmux->next_fku_time)) {
next_fku_time = target_time + splitmux->threshold_time;
}
- splitmux->next_fku_time = next_fku_time;
-
- ev = gst_video_event_new_upstream_force_key_unit (target_time, TRUE, 0);
GST_INFO_OBJECT (splitmux, "Requesting keyframe at %" GST_TIME_FORMAT
- ", the next expected keyframe is %" GST_TIME_FORMAT,
+ ", the next expected keyframe request time is %" GST_TIME_FORMAT,
GST_TIME_ARGS (target_time), GST_TIME_ARGS (next_fku_time));
+
+ newest_gop->sent_fku = TRUE;
+
+ splitmux->next_fku_time = next_fku_time;
+ ev = gst_video_event_new_upstream_force_key_unit (target_time, TRUE, 0);
+
return gst_pad_push_event (splitmux->reference_ctx->sinkpad, ev);
}
}
/* If we have popped a keyframe, decrement the queued_gop count */
- if (buf_info->keyframe && splitmux->queued_keyframes > 0)
+ if (buf_info->keyframe && splitmux->queued_keyframes > 0 && ctx->is_reference)
splitmux->queued_keyframes--;
ctx->out_running_time = buf_info->run_ts;
gboolean check_robust_muxing;
GstClockTime time_to_split = GST_CLOCK_TIME_NONE;
GstClockTime *ptr_to_time;
+ const InputGop *gop, *next_gop;
GST_OBJECT_LOCK (splitmux);
thresh_bytes = splitmux->threshold_bytes;
return TRUE;
}
+ gop = g_queue_peek_head (&splitmux->pending_input_gops);
+ /* We need a full GOP queued up at this point */
+ g_assert (gop != NULL);
+ next_gop = g_queue_peek_nth (&splitmux->pending_input_gops, 1);
+ /* And the beginning of the next GOP or otherwise EOS */
+
/* User told us to split at this running time */
- if (splitmux->gop_start_time >= time_to_split) {
+ if (gop->start_time >= time_to_split) {
GST_OBJECT_LOCK (splitmux);
/* Dequeue running time */
gst_queue_array_pop_head_struct (splitmux->times_to_split);
ptr_to_time = gst_queue_array_peek_head_struct (splitmux->times_to_split);
while (ptr_to_time) {
time_to_split = *ptr_to_time;
- if (splitmux->gop_start_time < time_to_split) {
+ if (gop->start_time < time_to_split) {
break;
}
gst_queue_array_pop_head_struct (splitmux->times_to_split);
}
GST_TRACE_OBJECT (splitmux,
"GOP start time %" GST_STIME_FORMAT " is after requested split point %"
- GST_STIME_FORMAT, GST_STIME_ARGS (splitmux->gop_start_time),
+ GST_STIME_FORMAT, GST_STIME_ARGS (gop->start_time),
GST_STIME_ARGS (time_to_split));
GST_OBJECT_UNLOCK (splitmux);
return TRUE;
return TRUE; /* Would overrun time limit */
}
- if (splitmux->tc_interval &&
- GST_CLOCK_TIME_IS_VALID (splitmux->next_fragment_start_tc_time) &&
- splitmux->reference_ctx->in_running_time >
- splitmux->next_fragment_start_tc_time + 5 * GST_USECOND) {
- GST_TRACE_OBJECT (splitmux,
- "in running time %" GST_STIME_FORMAT " overruns time limit %"
- GST_TIME_FORMAT,
- GST_STIME_ARGS (splitmux->reference_ctx->in_running_time),
- GST_TIME_ARGS (splitmux->next_fragment_start_tc_time));
- return TRUE;
+ if (splitmux->tc_interval) {
+ GstClockTime next_gop_start_time =
+ next_gop ? next_gop->start_time : splitmux->max_in_running_time;
+
+ if (GST_CLOCK_TIME_IS_VALID (splitmux->next_fragment_start_tc_time) &&
+ GST_CLOCK_STIME_IS_VALID (next_gop_start_time) &&
+ next_gop_start_time >
+ splitmux->next_fragment_start_tc_time + 5 * GST_USECOND) {
+ GST_TRACE_OBJECT (splitmux,
+ "in running time %" GST_STIME_FORMAT " overruns time limit %"
+ GST_TIME_FORMAT, GST_STIME_ARGS (next_gop_start_time),
+ GST_TIME_ARGS (splitmux->next_fragment_start_tc_time));
+ return TRUE;
+ }
}
if (check_robust_muxing) {
* go to COLLECTING_GOP_START state
*/
static void
-handle_gathered_gop (GstSplitMuxSink * splitmux)
+handle_gathered_gop (GstSplitMuxSink * splitmux, const InputGop * gop,
+ GstClockTimeDiff next_gop_start_time, GstClockTimeDiff max_out_running_time)
{
guint64 queued_bytes;
GstClockTimeDiff queued_time = 0;
GstClockTimeDiff queued_gop_time = 0;
- GstClockTimeDiff new_out_ts = splitmux->reference_ctx->in_running_time;
SplitMuxOutputCommand *cmd;
/* Assess if the multiqueue contents overflowed the current file */
* reference stream. Other streams might have run ahead a little bit,
* but extra pieces won't be released to the muxer beyond the reference
* stream cut-off anyway - so it forms the limit. */
- queued_bytes = splitmux->fragment_total_bytes + splitmux->gop_total_bytes;
- queued_time = splitmux->reference_ctx->in_running_time;
+ queued_bytes = splitmux->fragment_total_bytes + gop->total_bytes;
+ queued_time = next_gop_start_time;
/* queued_gop_time tracks how much unwritten data there is waiting to
* be written to this fragment including this GOP */
if (splitmux->reference_ctx->out_running_time != GST_CLOCK_STIME_NONE)
- queued_gop_time =
- splitmux->reference_ctx->in_running_time -
- splitmux->reference_ctx->out_running_time;
+ queued_gop_time = queued_time - splitmux->reference_ctx->out_running_time;
else
- queued_gop_time =
- splitmux->reference_ctx->in_running_time - splitmux->gop_start_time;
+ queued_gop_time = queued_time - gop->start_time;
GST_LOG_OBJECT (splitmux, " queued_bytes %" G_GUINT64_FORMAT, queued_bytes);
GST_LOG_OBJECT (splitmux, "mq at TS %" GST_STIME_FORMAT
- " bytes %" G_GUINT64_FORMAT " in running time %" GST_STIME_FORMAT
+ " bytes %" G_GUINT64_FORMAT " in next gop start time %" GST_STIME_FORMAT
" gop start time %" GST_STIME_FORMAT,
GST_STIME_ARGS (queued_time), queued_bytes,
- GST_STIME_ARGS (splitmux->reference_ctx->in_running_time),
- GST_STIME_ARGS (splitmux->gop_start_time));
+ GST_STIME_ARGS (next_gop_start_time), GST_STIME_ARGS (gop->start_time));
if (queued_gop_time < 0)
goto error_gop_duration;
GST_INFO_OBJECT (splitmux,
"This GOP (dur %" GST_STIME_FORMAT
") would overflow the fragment, Sending start_new_fragment cmd",
- GST_STIME_ARGS (splitmux->reference_ctx->in_running_time -
- splitmux->gop_start_time));
+ GST_STIME_ARGS (queued_gop_time));
cmd = out_cmd_buf_new ();
cmd->start_new_fragment = TRUE;
g_queue_push_head (&splitmux->out_cmd_q, cmd);
GST_SPLITMUX_BROADCAST_OUTPUT (splitmux);
- new_out_ts = splitmux->reference_ctx->in_running_time;
- splitmux->fragment_start_time = splitmux->gop_start_time;
+ splitmux->fragment_start_time = gop->start_time;
+ splitmux->fragment_start_time_pts = gop->start_time_pts;
splitmux->fragment_total_bytes = 0;
splitmux->fragment_reference_bytes = 0;
- if (splitmux->tc_interval) {
- video_time_code_replace (&splitmux->fragment_start_tc,
- splitmux->gop_start_tc);
- splitmux->next_fragment_start_tc_time =
- calculate_next_max_timecode (splitmux, splitmux->fragment_start_tc,
- splitmux->fragment_start_time, NULL);
- if (!GST_CLOCK_TIME_IS_VALID (splitmux->next_fragment_start_tc_time)) {
- GST_WARNING_OBJECT (splitmux,
- "Couldn't calculate next fragment start time for timecode mode");
- /* shouldn't happen, but reset all and try again with next buffers */
- gst_splitmux_reset_timecode (splitmux);
- }
+ video_time_code_replace (&splitmux->fragment_start_tc, gop->start_tc);
+ splitmux->next_fragment_start_tc_time =
+ calculate_next_max_timecode (splitmux, splitmux->fragment_start_tc,
+ splitmux->fragment_start_time, NULL);
+ if (splitmux->tc_interval && splitmux->fragment_start_tc
+ && !GST_CLOCK_TIME_IS_VALID (splitmux->next_fragment_start_tc_time)) {
+ GST_WARNING_OBJECT (splitmux,
+ "Couldn't calculate next fragment start time for timecode mode");
}
}
/* And set up to collect the next GOP */
- if (!splitmux->reference_ctx->in_eos) {
+ if (max_out_running_time != G_MAXINT64) {
splitmux->input_state = SPLITMUX_INPUT_STATE_COLLECTING_GOP_START;
- splitmux->gop_start_time = new_out_ts;
- if (splitmux->tc_interval)
- video_time_code_replace (&splitmux->gop_start_tc, splitmux->in_tc);
} else {
/* This is probably already the current state, but just in case: */
splitmux->input_state = SPLITMUX_INPUT_STATE_FINISHING_UP;
- new_out_ts = GST_CLOCK_STIME_NONE; /* EOS runs until forever */
}
/* And wake all input contexts to send a wake-up event */
GST_SPLITMUX_BROADCAST_INPUT (splitmux);
/* Now either way - either there was no overflow, or we requested a new fragment: release this GOP */
- splitmux->fragment_total_bytes += splitmux->gop_total_bytes;
- splitmux->fragment_reference_bytes += splitmux->gop_reference_bytes;
+ splitmux->fragment_total_bytes += gop->total_bytes;
+ splitmux->fragment_reference_bytes += gop->reference_bytes;
- if (splitmux->gop_total_bytes > 0) {
+ if (gop->total_bytes > 0) {
GST_LOG_OBJECT (splitmux,
"Releasing GOP to output. Bytes in fragment now %" G_GUINT64_FORMAT
" time %" GST_STIME_FORMAT,
/* Send this GOP to the output command queue */
cmd = out_cmd_buf_new ();
cmd->start_new_fragment = FALSE;
- cmd->max_output_ts = new_out_ts;
+ cmd->max_output_ts = max_out_running_time;
GST_LOG_OBJECT (splitmux, "Sending GOP cmd to output for TS %"
- GST_STIME_FORMAT, GST_STIME_ARGS (new_out_ts));
+ GST_STIME_FORMAT, GST_STIME_ARGS (max_out_running_time));
g_queue_push_head (&splitmux->out_cmd_q, cmd);
GST_SPLITMUX_BROADCAST_OUTPUT (splitmux);
}
- splitmux->gop_total_bytes = 0;
- splitmux->gop_reference_bytes = 0;
return;
error_gop_duration:
}
do {
+ GstClockTimeDiff next_gop_start = GST_CLOCK_STIME_NONE;
+
if (splitmux->input_state == SPLITMUX_INPUT_STATE_WAITING_GOP_COLLECT) {
+ GstClockTimeDiff max_out_running_time;
gboolean ready = TRUE;
+ InputGop *gop;
+ const InputGop *next_gop;
+
+ gop = g_queue_peek_head (&splitmux->pending_input_gops);
+ next_gop = g_queue_peek_nth (&splitmux->pending_input_gops, 1);
+
+ /* If we have no GOP or no next GOP here then the reference context is
+ * at EOS, otherwise use the start time of the next GOP if we're far
+ * enough in the GOP to know it */
+ if (gop && next_gop) {
+ if (!splitmux->reference_ctx->in_eos
+ && splitmux->max_in_running_time_dts != GST_CLOCK_STIME_NONE
+ && splitmux->max_in_running_time_dts < next_gop->start_time_pts) {
+ GST_LOG_OBJECT (splitmux,
+ "No further GOPs finished collecting, waiting until current DTS %"
+ GST_STIME_FORMAT " has passed next GOP start PTS %"
+ GST_STIME_FORMAT,
+ GST_STIME_ARGS (splitmux->max_in_running_time_dts),
+ GST_STIME_ARGS (next_gop->start_time_pts));
+ break;
+ }
+
+ GST_LOG_OBJECT (splitmux,
+ "Finished collecting GOP with start time %" GST_STIME_FORMAT
+ ", next GOP start time %" GST_STIME_FORMAT,
+ GST_STIME_ARGS (gop->start_time),
+ GST_STIME_ARGS (next_gop->start_time));
+ next_gop_start = next_gop->start_time;
+ max_out_running_time =
+ splitmux->reference_ctx->in_eos ? G_MAXINT64 : next_gop->start_time;
+ } else if (!next_gop) {
+ GST_LOG_OBJECT (splitmux, "Reference context is EOS");
+ next_gop_start = splitmux->max_in_running_time;
+ max_out_running_time = G_MAXINT64;
+ } else if (!gop) {
+ GST_LOG_OBJECT (splitmux, "No further GOPs finished collecting");
+ break;
+ } else {
+ g_assert_not_reached ();
+ }
+
+ g_assert (gop != NULL);
/* Iterate each pad, and check that the input running time is at least
- * up to the reference running time, and if so handle the collected GOP */
- GST_LOG_OBJECT (splitmux, "Checking GOP collected, Max in running time %"
- GST_STIME_FORMAT " ctx %p",
- GST_STIME_ARGS (splitmux->max_in_running_time), ctx);
+ * up to the start running time of the next GOP or EOS, and if so handle
+ * the collected GOP */
+ GST_LOG_OBJECT (splitmux, "Checking GOP collected, next GOP start %"
+ GST_STIME_FORMAT " ctx %p", GST_STIME_ARGS (next_gop_start), ctx);
for (cur = g_list_first (splitmux->contexts); cur != NULL;
cur = g_list_next (cur)) {
MqStreamCtx *tmpctx = (MqStreamCtx *) (cur->data);
" EOS %d", tmpctx, tmpctx->sinkpad,
GST_STIME_ARGS (tmpctx->in_running_time), tmpctx->in_eos);
- if (splitmux->max_in_running_time != GST_CLOCK_STIME_NONE &&
- tmpctx->in_running_time < splitmux->max_in_running_time &&
- !tmpctx->in_eos) {
+ if (next_gop_start != GST_CLOCK_STIME_NONE &&
+ tmpctx->in_running_time < next_gop_start && !tmpctx->in_eos) {
GST_LOG_OBJECT (splitmux,
"Context %p sink pad %" GST_PTR_FORMAT " not ready. We'll sleep",
tmpctx, tmpctx->sinkpad);
GST_DEBUG_OBJECT (splitmux,
"Collected GOP is complete. Processing (ctx %p)", ctx);
/* All pads have a complete GOP, release it into the multiqueue */
- handle_gathered_gop (splitmux);
+ handle_gathered_gop (splitmux, gop, next_gop_start,
+ max_out_running_time);
+
+ g_queue_pop_head (&splitmux->pending_input_gops);
+ input_gop_free (gop);
/* The user has requested a split, we can split now that the previous GOP
* has been collected to the correct location */
if (splitmux->input_state == SPLITMUX_INPUT_STATE_WAITING_GOP_COLLECT &&
!ctx->flushing &&
- (ctx->in_running_time >= splitmux->max_in_running_time) &&
- (splitmux->max_in_running_time != GST_CLOCK_STIME_NONE)) {
+ ctx->in_running_time >= next_gop_start &&
+ next_gop_start != GST_CLOCK_STIME_NONE) {
/* Some pad is not yet ready, or GOP is being pushed
* either way, sleep and wait to get woken */
GST_LOG_OBJECT (splitmux, "Sleeping for GOP collection (ctx %p)", ctx);
GstFlowReturn ret = GST_FLOW_OK;
GstBuffer *buf;
MqStreamBuf *buf_info = NULL;
- GstClockTime ts;
+ GstClockTime ts, pts, dts;
+ GstClockTimeDiff running_time, running_time_pts, running_time_dts;
gboolean loop_again;
gboolean keyframe = FALSE;
splitmux->input_state = SPLITMUX_INPUT_STATE_WAITING_GOP_COLLECT;
/* Wake up other input pads to collect this GOP */
GST_SPLITMUX_BROADCAST_INPUT (splitmux);
- check_completed_gop (splitmux, ctx);
+ if (g_queue_is_empty (&splitmux->pending_input_gops)) {
+ GST_WARNING_OBJECT (splitmux,
+ "EOS with no buffers received on the reference pad");
+
+ /* - child muxer and sink might be still locked state
+ * (see gst_splitmux_reset_elements()) so should be unlocked
+ * for state change of splitmuxsink to be applied to child
+ * - would need to post async done message
+ * - location on sink element is still null then it will post
+ * error message on bus (muxer will produce something, header
+ * data for example)
+ *
+ * Calls start_next_fragment() here, the method will address
+ * everything the above mentioned one */
+ ret = start_next_fragment (splitmux, ctx);
+ if (ret != GST_FLOW_OK)
+ goto beach;
+ } else {
+ check_completed_gop (splitmux, ctx);
+ }
} else if (splitmux->input_state ==
SPLITMUX_INPUT_STATE_WAITING_GOP_COLLECT) {
/* If we are waiting for a GOP to be completed (ie, for aux
* pads to catch up), then this pad is complete, so check
* if the whole GOP is.
*/
- check_completed_gop (splitmux, ctx);
+ if (!g_queue_is_empty (&splitmux->pending_input_gops))
+ check_completed_gop (splitmux, ctx);
}
GST_SPLITMUX_UNLOCK (splitmux);
break;
GST_LOG_OBJECT (pad, "Have GAP w/ ts %" GST_STIME_FORMAT,
GST_STIME_ARGS (rtime));
- if (ctx->is_reference
- && splitmux->fragment_start_time == GST_CLOCK_STIME_NONE) {
- splitmux->gop_start_time = splitmux->fragment_start_time = rtime;
- GST_LOG_OBJECT (splitmux, "Mux start time now %" GST_STIME_FORMAT,
- GST_STIME_ARGS (splitmux->fragment_start_time));
- /* Also take this as the first start time when starting up,
- * so that we start counting overflow from the first frame */
- if (!GST_CLOCK_STIME_IS_VALID (splitmux->max_in_running_time))
- splitmux->max_in_running_time = splitmux->fragment_start_time;
+ if (ctx->is_reference && GST_CLOCK_STIME_IS_VALID (rtime)) {
+ /* If this GAP event happens before the first fragment then
+ * initialize the fragment start time here. */
+ if (!GST_CLOCK_STIME_IS_VALID (splitmux->fragment_start_time)) {
+ splitmux->fragment_start_time = rtime;
+ GST_LOG_OBJECT (splitmux,
+ "Fragment start time now %" GST_STIME_FORMAT,
+ GST_STIME_ARGS (splitmux->fragment_start_time));
+
+ /* Also take this as the first start time when starting up,
+ * so that we start counting overflow from the first frame */
+ if (!GST_CLOCK_STIME_IS_VALID (splitmux->max_in_running_time))
+ splitmux->max_in_running_time = rtime;
+ if (!GST_CLOCK_STIME_IS_VALID (splitmux->max_in_running_time_dts))
+ splitmux->max_in_running_time_dts = rtime;
+ }
+
+ /* Similarly take it as fragment start PTS and GOP start time if
+ * these are not set */
+ if (!GST_CLOCK_STIME_IS_VALID (splitmux->fragment_start_time_pts))
+ splitmux->fragment_start_time_pts = rtime;
+
+ if (g_queue_is_empty (&splitmux->pending_input_gops)) {
+ InputGop *gop = g_slice_new0 (InputGop);
+
+ gop->from_gap = TRUE;
+ gop->start_time = rtime;
+ gop->start_time_pts = rtime;
+
+ g_queue_push_tail (&splitmux->pending_input_gops, gop);
+ }
}
GST_SPLITMUX_UNLOCK (splitmux);
buf = gst_pad_probe_info_get_buffer (info);
buf_info = mq_stream_buf_new ();
+ pts = GST_BUFFER_PTS (buf);
+ dts = GST_BUFFER_DTS (buf);
if (GST_BUFFER_PTS_IS_VALID (buf))
ts = GST_BUFFER_PTS (buf);
else
ts = GST_BUFFER_DTS (buf);
- GST_LOG_OBJECT (pad, "Buffer TS is %" GST_TIME_FORMAT, GST_TIME_ARGS (ts));
+ GST_LOG_OBJECT (pad,
+ "Buffer TS is %" GST_TIME_FORMAT " (PTS %" GST_TIME_FORMAT ", DTS %"
+ GST_TIME_FORMAT ")", GST_TIME_ARGS (ts), GST_TIME_ARGS (pts),
+ GST_TIME_ARGS (dts));
GST_SPLITMUX_LOCK (splitmux);
/* If this buffer has a timestamp, advance the input timestamp of the
* stream */
if (GST_CLOCK_TIME_IS_VALID (ts)) {
- GstClockTimeDiff running_time =
- my_segment_to_running_time (&ctx->in_segment, ts);
+ running_time = my_segment_to_running_time (&ctx->in_segment, ts);
GST_LOG_OBJECT (pad, "Buffer running TS is %" GST_STIME_FORMAT,
GST_STIME_ARGS (running_time));
+ /* in running time is always the maximum PTS (or DTS) that was observed so far */
if (GST_CLOCK_STIME_IS_VALID (running_time)
&& running_time > ctx->in_running_time)
ctx->in_running_time = running_time;
+ } else {
+ running_time = ctx->in_running_time;
+ }
+
+ if (GST_CLOCK_TIME_IS_VALID (pts))
+ running_time_pts = my_segment_to_running_time (&ctx->in_segment, pts);
+ else
+ running_time_pts = GST_CLOCK_STIME_NONE;
+
+ if (GST_CLOCK_TIME_IS_VALID (dts)) {
+ running_time_dts = my_segment_to_running_time (&ctx->in_segment, dts);
+
+ /* DTS > PTS makes conceptually no sense so catch such invalid DTS here
+ * by clamping to the PTS */
+ running_time_dts = MIN (running_time_pts, running_time_dts);
+ } else {
+ /* If there is no DTS then assume PTS=DTS */
+ running_time_dts = running_time_pts;
}
/* Try to make sure we have a valid running time */
buf_info->duration = GST_BUFFER_DURATION (buf);
if (ctx->is_reference) {
- /* initialize fragment_start_time */
- if (splitmux->fragment_start_time == GST_CLOCK_STIME_NONE) {
- splitmux->gop_start_time = splitmux->fragment_start_time =
- buf_info->run_ts;
- GST_LOG_OBJECT (splitmux, "Mux start time now %" GST_STIME_FORMAT,
- GST_STIME_ARGS (splitmux->fragment_start_time));
+ InputGop *gop = NULL;
+ GstVideoTimeCodeMeta *tc_meta = gst_buffer_get_video_time_code_meta (buf);
+
+ /* initialize fragment_start_time if it was not set yet (i.e. for the
+ * first fragment), or otherwise set it to the minimum observed time */
+ if (!GST_CLOCK_STIME_IS_VALID (splitmux->fragment_start_time)
+ || splitmux->fragment_start_time > running_time) {
+ if (!GST_CLOCK_STIME_IS_VALID (splitmux->fragment_start_time))
+ splitmux->fragment_start_time_pts = running_time_pts;
+ splitmux->fragment_start_time = running_time;
+
+ GST_LOG_OBJECT (splitmux,
+ "Fragment start time now %" GST_STIME_FORMAT " (initial PTS %"
+ GST_STIME_FORMAT ")", GST_STIME_ARGS (splitmux->fragment_start_time),
+ GST_STIME_ARGS (splitmux->fragment_start_time_pts));
/* Also take this as the first start time when starting up,
* so that we start counting overflow from the first frame */
- if (!GST_CLOCK_STIME_IS_VALID (splitmux->max_in_running_time))
+ if (!GST_CLOCK_STIME_IS_VALID (splitmux->max_in_running_time)
+ || splitmux->max_in_running_time < splitmux->fragment_start_time)
splitmux->max_in_running_time = splitmux->fragment_start_time;
- }
- if (splitmux->tc_interval) {
- GstVideoTimeCodeMeta *tc_meta = gst_buffer_get_video_time_code_meta (buf);
+ if (!GST_CLOCK_STIME_IS_VALID (splitmux->max_in_running_time_dts))
+ splitmux->max_in_running_time_dts = running_time_dts;
+
if (tc_meta) {
- video_time_code_replace (&splitmux->in_tc, &tc_meta->tc);
-
- if (!splitmux->fragment_start_tc) {
- /* also initialize fragment_start_tc */
- video_time_code_replace (&splitmux->gop_start_tc, &tc_meta->tc);
- video_time_code_replace (&splitmux->fragment_start_tc, &tc_meta->tc);
-
- splitmux->next_fragment_start_tc_time =
- calculate_next_max_timecode (splitmux, splitmux->in_tc,
- ctx->in_running_time, NULL);
- GST_DEBUG_OBJECT (splitmux, "Initialize next fragment start tc time %"
- GST_TIME_FORMAT,
+ video_time_code_replace (&splitmux->fragment_start_tc, &tc_meta->tc);
+
+ splitmux->next_fragment_start_tc_time =
+ calculate_next_max_timecode (splitmux, &tc_meta->tc,
+ running_time, NULL);
+ if (splitmux->tc_interval
+ && !GST_CLOCK_TIME_IS_VALID (splitmux->next_fragment_start_tc_time))
+ {
+ GST_WARNING_OBJECT (splitmux,
+ "Couldn't calculate next fragment start time for timecode mode");
+ }
+#ifndef GST_DISABLE_GST_DEBUG
+ {
+ gchar *tc_str;
+
+ tc_str = gst_video_time_code_to_string (&tc_meta->tc);
+ GST_DEBUG_OBJECT (splitmux,
+ "Initialize fragment start timecode %s, next fragment start timecode time %"
+ GST_TIME_FORMAT, tc_str,
GST_TIME_ARGS (splitmux->next_fragment_start_tc_time));
+ g_free (tc_str);
+ }
+#endif
+ }
+ }
+
+
+ /* First check if we're at the very first GOP and the tracking was created
+ * from a GAP event. In that case don't start a new GOP on keyframes but
+ * just updated it as needed */
+ gop = g_queue_peek_tail (&splitmux->pending_input_gops);
+
+ if (!gop || (!gop->from_gap
+ && !GST_BUFFER_FLAG_IS_SET (buf, GST_BUFFER_FLAG_DELTA_UNIT))) {
+ gop = g_slice_new0 (InputGop);
+
+ gop->start_time = running_time;
+ gop->start_time_pts = running_time_pts;
+
+ GST_LOG_OBJECT (splitmux,
+ "Next GOP start time now %" GST_STIME_FORMAT " (initial PTS %"
+ GST_STIME_FORMAT ")", GST_STIME_ARGS (gop->start_time),
+ GST_STIME_ARGS (gop->start_time_pts));
+
+ if (tc_meta) {
+ video_time_code_replace (&gop->start_tc, &tc_meta->tc);
+
+#ifndef GST_DISABLE_GST_DEBUG
+ {
+ gchar *tc_str;
+
+ tc_str = gst_video_time_code_to_string (&tc_meta->tc);
+ GST_DEBUG_OBJECT (splitmux, "Next GOP start timecode %s", tc_str);
+ g_free (tc_str);
+ }
+#endif
+ }
+
+ g_queue_push_tail (&splitmux->pending_input_gops, gop);
+ } else {
+ gop->from_gap = FALSE;
+
+ if (!GST_CLOCK_STIME_IS_VALID (gop->start_time)
+ || gop->start_time > running_time) {
+ gop->start_time = running_time;
+
+ GST_LOG_OBJECT (splitmux,
+ "GOP start time updated now %" GST_STIME_FORMAT " (initial PTS %"
+ GST_STIME_FORMAT ")", GST_STIME_ARGS (gop->start_time),
+ GST_STIME_ARGS (gop->start_time_pts));
+
+ if (tc_meta) {
+ video_time_code_replace (&gop->start_tc, &tc_meta->tc);
+
+#ifndef GST_DISABLE_GST_DEBUG
+ {
+ gchar *tc_str;
+
+ tc_str = gst_video_time_code_to_string (&tc_meta->tc);
+ GST_DEBUG_OBJECT (splitmux, "Next GOP start timecode updated %s",
+ tc_str);
+ g_free (tc_str);
+ }
+#endif
}
}
}
/* Check whether we need to request next keyframe depending on
* current running time */
- if (!GST_BUFFER_FLAG_IS_SET (buf, GST_BUFFER_FLAG_DELTA_UNIT) &&
- request_next_keyframe (splitmux, buf, ctx->in_running_time) == FALSE) {
+ if (request_next_keyframe (splitmux, buf, running_time_dts) == FALSE) {
GST_WARNING_OBJECT (splitmux,
"Could not request a keyframe. Files may not split at the exact location they should");
}
}
- GST_DEBUG_OBJECT (pad, "Buf TS %" GST_STIME_FORMAT
- " total GOP bytes %" G_GUINT64_FORMAT,
- GST_STIME_ARGS (buf_info->run_ts), splitmux->gop_total_bytes);
+ {
+ InputGop *gop = g_queue_peek_tail (&splitmux->pending_input_gops);
+
+ if (gop) {
+ GST_DEBUG_OBJECT (pad, "Buf TS %" GST_STIME_FORMAT
+ " total GOP bytes %" G_GUINT64_FORMAT ", total next GOP bytes %"
+ G_GUINT64_FORMAT, GST_STIME_ARGS (buf_info->run_ts),
+ gop->total_bytes, gop->total_bytes);
+ }
+ }
loop_again = TRUE;
do {
- if (ctx->flushing)
- break;
+ if (ctx->flushing) {
+ ret = GST_FLOW_FLUSHING;
+ goto beach;
+ }
switch (splitmux->input_state) {
case SPLITMUX_INPUT_STATE_COLLECTING_GOP_START:
- if (ctx->is_releasing) {
- /* The pad belonging to this context is being released */
- GST_WARNING_OBJECT (pad, "Pad is being released while the muxer is "
- "running. Data might not drain correctly");
- loop_again = FALSE;
- } else if (ctx->is_reference) {
+ if (ctx->is_reference) {
+ const InputGop *gop, *next_gop;
+
/* This is the reference context. If it's a keyframe,
* it marks the start of a new GOP and we should wait in
* check_completed_gop before continuing, but either way
* so set loop_again to FALSE */
loop_again = FALSE;
- if (GST_BUFFER_FLAG_IS_SET (buf, GST_BUFFER_FLAG_DELTA_UNIT)) {
- /* Allow other input pads to catch up to here too */
+ gop = g_queue_peek_head (&splitmux->pending_input_gops);
+ g_assert (gop != NULL);
+ next_gop = g_queue_peek_nth (&splitmux->pending_input_gops, 1);
+
+ if (ctx->in_running_time > splitmux->max_in_running_time)
splitmux->max_in_running_time = ctx->in_running_time;
- GST_LOG_OBJECT (splitmux,
- "Max in running time now %" GST_TIME_FORMAT,
- GST_TIME_ARGS (splitmux->max_in_running_time));
+ if (running_time_dts > splitmux->max_in_running_time_dts)
+ splitmux->max_in_running_time_dts = running_time_dts;
+
+ GST_LOG_OBJECT (splitmux,
+ "Max in running time now %" GST_STIME_FORMAT ", DTS %"
+ GST_STIME_FORMAT, GST_STIME_ARGS (splitmux->max_in_running_time),
+ GST_STIME_ARGS (splitmux->max_in_running_time_dts));
+
+ if (!next_gop) {
+ GST_DEBUG_OBJECT (pad, "Waiting for end of GOP");
+ /* Allow other input pads to catch up to here too */
GST_SPLITMUX_BROADCAST_INPUT (splitmux);
break;
}
- GST_INFO_OBJECT (pad,
- "Have keyframe with running time %" GST_STIME_FORMAT,
- GST_STIME_ARGS (ctx->in_running_time));
- keyframe = TRUE;
+
+ if (!GST_BUFFER_FLAG_IS_SET (buf, GST_BUFFER_FLAG_DELTA_UNIT)) {
+ GST_INFO_OBJECT (pad,
+ "Have keyframe with running time %" GST_STIME_FORMAT,
+ GST_STIME_ARGS (ctx->in_running_time));
+ keyframe = TRUE;
+ }
+
+ if (running_time_dts != GST_CLOCK_STIME_NONE
+ && running_time_dts < next_gop->start_time_pts) {
+ GST_DEBUG_OBJECT (splitmux,
+ "Waiting until DTS (%" GST_STIME_FORMAT
+ ") has passed next GOP start PTS (%" GST_STIME_FORMAT ")",
+ GST_STIME_ARGS (running_time_dts),
+ GST_STIME_ARGS (next_gop->start_time_pts));
+ /* Allow other input pads to catch up to here too */
+ GST_SPLITMUX_BROADCAST_INPUT (splitmux);
+ break;
+ }
+
splitmux->input_state = SPLITMUX_INPUT_STATE_WAITING_GOP_COLLECT;
- splitmux->max_in_running_time = ctx->in_running_time;
- GST_LOG_OBJECT (splitmux, "Max in running time now %" GST_TIME_FORMAT,
- GST_TIME_ARGS (splitmux->max_in_running_time));
/* Wake up other input pads to collect this GOP */
GST_SPLITMUX_BROADCAST_INPUT (splitmux);
check_completed_gop (splitmux, ctx);
}
break;
case SPLITMUX_INPUT_STATE_WAITING_GOP_COLLECT:{
- /* We're collecting a GOP. If this is the reference context,
- * we need to check if this is a keyframe that marks the start
- * of the next GOP. If it is, it marks the end of the GOP we're
- * collecting, so sleep and wait until all the other pads also
- * reach that timestamp - at which point, we have an entire GOP
- * and either go to ENDING_FILE or release this GOP to the muxer and
- * go back to COLLECT_GOP_START. */
+ /* We're collecting a GOP, this is only ever called for non-reference
+ * contexts as the reference context would be waiting inside
+ * check_completed_gop() */
+
+ g_assert (!ctx->is_reference);
/* If we overran the target timestamp, it might be time to process
- * the GOP, otherwise bail out for more data
- */
+ * the GOP, otherwise bail out for more data. */
GST_LOG_OBJECT (pad,
"Checking TS %" GST_STIME_FORMAT " against max %"
GST_STIME_FORMAT, GST_STIME_ARGS (ctx->in_running_time),
GST_LOG_OBJECT (pad,
"Collected last packet of GOP. Checking other pads");
+
+ if (g_queue_is_empty (&splitmux->pending_input_gops)) {
+ GST_WARNING_OBJECT (pad,
+ "Reference was closed without GOP, dropping");
+ goto drop;
+ }
+
check_completed_gop (splitmux, ctx);
break;
}
}
while (loop_again);
- if (keyframe) {
+ if (keyframe && ctx->is_reference)
splitmux->queued_keyframes++;
- buf_info->keyframe = TRUE;
- }
+ buf_info->keyframe = keyframe;
- /* Update total input byte counter for overflow detect */
- splitmux->gop_total_bytes += buf_info->buf_size;
- if (ctx->is_reference) {
- splitmux->gop_reference_bytes += buf_info->buf_size;
+ /* Update total input byte counter for overflow detect unless we're after
+ * EOS now */
+ if (splitmux->input_state != SPLITMUX_INPUT_STATE_FINISHING_UP
+ && splitmux->input_state != SPLITMUX_INPUT_STATE_STOPPED) {
+ InputGop *gop = g_queue_peek_tail (&splitmux->pending_input_gops);
+
+ /* We must have a GOP at this point */
+ g_assert (gop != NULL);
+
+ gop->total_bytes += buf_info->buf_size;
+ if (ctx->is_reference) {
+ gop->reference_bytes += buf_info->buf_size;
+ }
}
/* Now add this buffer to the queue just before returning */
mq_stream_buf_free (buf_info);
GST_PAD_PROBE_INFO_FLOW_RETURN (info) = ret;
return GST_PAD_PROBE_PASS;
+drop:
+ GST_SPLITMUX_UNLOCK (splitmux);
+ if (buf_info)
+ mq_stream_buf_free (buf_info);
+ GST_PAD_PROBE_INFO_FLOW_RETURN (info) = GST_FLOW_EOS;
+ return GST_PAD_PROBE_DROP;
}
static void
/* Remove the context from our consideration */
splitmux->contexts = g_list_remove (splitmux->contexts, ctx);
+ ctx->flushing = TRUE;
+ GST_SPLITMUX_BROADCAST_INPUT (splitmux);
+
GST_SPLITMUX_UNLOCK (splitmux);
if (ctx->sink_pad_block_id) {
if (ctx->src_pad_block_id)
gst_pad_remove_probe (ctx->srcpad, ctx->src_pad_block_id);
+ /* Wait for the pad to be free */
+ GST_PAD_STREAM_LOCK (pad);
GST_SPLITMUX_LOCK (splitmux);
-
- ctx->is_releasing = TRUE;
- GST_SPLITMUX_BROADCAST_INPUT (splitmux);
+ GST_PAD_STREAM_UNLOCK (pad);
/* Can release the context now */
mq_stream_ctx_free (ctx);
gst_splitmux_sink_reset (GstSplitMuxSink * splitmux)
{
splitmux->max_in_running_time = GST_CLOCK_STIME_NONE;
- splitmux->gop_start_time = splitmux->fragment_start_time =
- GST_CLOCK_STIME_NONE;
- splitmux->max_out_running_time = 0;
+ splitmux->max_in_running_time_dts = GST_CLOCK_STIME_NONE;
+
+ splitmux->fragment_start_time = GST_CLOCK_STIME_NONE;
+ splitmux->fragment_start_time_pts = GST_CLOCK_STIME_NONE;
+ g_clear_pointer (&splitmux->fragment_start_tc, gst_video_time_code_free);
+
+ g_queue_foreach (&splitmux->pending_input_gops, (GFunc) input_gop_free, NULL);
+ g_queue_clear (&splitmux->pending_input_gops);
+
+ splitmux->max_out_running_time = GST_CLOCK_STIME_NONE;
splitmux->fragment_total_bytes = 0;
splitmux->fragment_reference_bytes = 0;
- splitmux->gop_total_bytes = 0;
- splitmux->gop_reference_bytes = 0;
splitmux->muxed_out_bytes = 0;
splitmux->ready_for_output = FALSE;