X-Git-Url: http://review.tizen.org/git/?a=blobdiff_plain;f=gst%2Fmultifile%2Fgstsplitmuxsink.c;h=98f744ff6e32ce3986eddec6b22813cbef299707;hb=369d37d227178bc4bb3c760d72e5403a3f11f261;hp=7f5daeec585dd085c9be30ee4beab22122df9b50;hpb=f5b511b42bf6052601e3601cabe3da0dce40ebdc;p=platform%2Fupstream%2Fgst-plugins-good.git diff --git a/gst/multifile/gstsplitmuxsink.c b/gst/multifile/gstsplitmuxsink.c index 7f5daee..98f744f 100644 --- a/gst/multifile/gstsplitmuxsink.c +++ b/gst/multifile/gstsplitmuxsink.c @@ -24,7 +24,7 @@ * This element wraps a muxer and a sink, and starts a new file when the mux * contents are about to cross a threshold of maximum size of maximum time, * splitting at video keyframe boundaries. Exactly one input video stream - * is required, with as many accompanying audio and subtitle streams as + * can be muxed, with as many accompanying audio and subtitle streams as * desired. * * By default, it uses mp4mux and filesink, but they can be changed via @@ -33,9 +33,10 @@ * The minimum file size is 1 GOP, however - so limits may be overrun if the * distance between any 2 keyframes is larger than the limits. * - * The splitting process is driven by the video stream contents, and - * the video stream must contain closed GOPs for the output file parts - * to be played individually correctly. + * If a video stream is available, the splitting process is driven by the video + * stream contents, and the video stream must contain closed GOPs for the output + * file parts to be played individually correctly. In the absence of a video + * stream, the first available stream is used as reference for synchronization. * * * Example pipelines @@ -53,6 +54,7 @@ #endif #include +#include #include "gstsplitmuxsink.h" GST_DEBUG_CATEGORY_STATIC (splitmux_debug); @@ -69,6 +71,7 @@ enum PROP_LOCATION, PROP_MAX_SIZE_TIME, PROP_MAX_SIZE_BYTES, + PROP_MAX_FILES, PROP_MUXER_OVERHEAD, PROP_MUXER, PROP_SINK @@ -76,10 +79,19 @@ enum #define DEFAULT_MAX_SIZE_TIME 0 #define DEFAULT_MAX_SIZE_BYTES 0 +#define DEFAULT_MAX_FILES 0 #define DEFAULT_MUXER_OVERHEAD 0.02 #define DEFAULT_MUXER "mp4mux" #define DEFAULT_SINK "filesink" +enum +{ + SIGNAL_FORMAT_LOCATION, + SIGNAL_LAST +}; + +static guint signals[SIGNAL_LAST]; + static GstStaticPadTemplate video_sink_template = GST_STATIC_PAD_TEMPLATE ("video", GST_PAD_SINK, @@ -130,6 +142,8 @@ static void start_next_fragment (GstSplitMuxSink * splitmux); static void check_queue_length (GstSplitMuxSink * splitmux, MqStreamCtx * ctx); static void mq_stream_ctx_unref (MqStreamCtx * ctx); +static void gst_splitmux_sink_ensure_max_files (GstSplitMuxSink * splitmux); + static MqStreamBuf * mq_stream_buf_new (void) { @@ -159,12 +173,12 @@ gst_splitmux_sink_class_init (GstSplitMuxSinkClass * klass) "Convenience bin that muxes incoming streams into multiple time/size limited files", "Jan Schmidt "); - gst_element_class_add_pad_template (gstelement_class, - gst_static_pad_template_get (&video_sink_template)); - gst_element_class_add_pad_template (gstelement_class, - gst_static_pad_template_get (&audio_sink_template)); - gst_element_class_add_pad_template (gstelement_class, - gst_static_pad_template_get (&subtitle_sink_template)); + gst_element_class_add_static_pad_template (gstelement_class, + &video_sink_template); + gst_element_class_add_static_pad_template (gstelement_class, + &audio_sink_template); + gst_element_class_add_static_pad_template (gstelement_class, + &subtitle_sink_template); gstelement_class->change_state = GST_DEBUG_FUNCPTR (gst_splitmux_sink_change_state); @@ -193,6 +207,13 @@ gst_splitmux_sink_class_init (GstSplitMuxSinkClass * klass) g_param_spec_uint64 ("max-size-bytes", "Max. size bytes", "Max. amount of data per file (in bytes, 0=disable)", 0, G_MAXUINT64, DEFAULT_MAX_SIZE_BYTES, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); + g_object_class_install_property (gobject_class, PROP_MAX_FILES, + g_param_spec_uint ("max-files", "Max files", + "Maximum number of files to keep on disk. Once the maximum is reached," + "old files start to be deleted to make room for new ones.", + 0, G_MAXUINT, DEFAULT_MAX_FILES, + G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); + g_object_class_install_property (gobject_class, PROP_MUXER, g_param_spec_object ("muxer", "Muxer", @@ -202,6 +223,17 @@ gst_splitmux_sink_class_init (GstSplitMuxSinkClass * klass) g_param_spec_object ("sink", "Sink", "The sink element (or element chain) to use (NULL = default filesink)", GST_TYPE_ELEMENT, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); + + /** + * GstSplitMuxSink::format-location: + * @splitmux: the #GstSplitMuxSink + * @fragment_id: the sequence number of the file to be created + * + * Returns: the location to be used for the next output file + */ + signals[SIGNAL_FORMAT_LOCATION] = + g_signal_new ("format-location", G_TYPE_FROM_CLASS (klass), + G_SIGNAL_RUN_LAST, 0, NULL, NULL, NULL, G_TYPE_STRING, 1, G_TYPE_UINT); } static void @@ -213,6 +245,7 @@ gst_splitmux_sink_init (GstSplitMuxSink * splitmux) splitmux->mux_overhead = DEFAULT_MUXER_OVERHEAD; splitmux->threshold_time = DEFAULT_MAX_SIZE_TIME; splitmux->threshold_bytes = DEFAULT_MAX_SIZE_BYTES; + splitmux->max_files = DEFAULT_MAX_FILES; GST_OBJECT_FLAG_SET (splitmux, GST_ELEMENT_FLAG_SINK); } @@ -248,6 +281,7 @@ gst_splitmux_sink_finalize (GObject * object) { GstSplitMuxSink *splitmux = GST_SPLITMUX_SINK (object); g_cond_clear (&splitmux->data_cond); + g_mutex_clear (&splitmux->lock); if (splitmux->provided_sink) gst_object_unref (splitmux->provided_sink); if (splitmux->provided_muxer) @@ -286,24 +320,29 @@ gst_splitmux_sink_set_property (GObject * object, guint prop_id, splitmux->threshold_time = g_value_get_uint64 (value); GST_OBJECT_UNLOCK (splitmux); break; + case PROP_MAX_FILES: + GST_OBJECT_LOCK (splitmux); + splitmux->max_files = g_value_get_uint (value); + GST_OBJECT_UNLOCK (splitmux); + break; case PROP_MUXER_OVERHEAD: GST_OBJECT_LOCK (splitmux); splitmux->mux_overhead = g_value_get_double (value); GST_OBJECT_UNLOCK (splitmux); break; case PROP_SINK: - GST_SPLITMUX_LOCK (splitmux); + GST_OBJECT_LOCK (splitmux); if (splitmux->provided_sink) gst_object_unref (splitmux->provided_sink); splitmux->provided_sink = g_value_dup_object (value); - GST_SPLITMUX_UNLOCK (splitmux); + GST_OBJECT_UNLOCK (splitmux); break; case PROP_MUXER: - GST_SPLITMUX_LOCK (splitmux); + GST_OBJECT_LOCK (splitmux); if (splitmux->provided_muxer) gst_object_unref (splitmux->provided_muxer); splitmux->provided_muxer = g_value_dup_object (value); - GST_SPLITMUX_UNLOCK (splitmux); + GST_OBJECT_UNLOCK (splitmux); break; default: G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); @@ -333,20 +372,25 @@ gst_splitmux_sink_get_property (GObject * object, guint prop_id, g_value_set_uint64 (value, splitmux->threshold_time); GST_OBJECT_UNLOCK (splitmux); break; + case PROP_MAX_FILES: + GST_OBJECT_LOCK (splitmux); + g_value_set_uint (value, splitmux->max_files); + GST_OBJECT_UNLOCK (splitmux); + break; case PROP_MUXER_OVERHEAD: GST_OBJECT_LOCK (splitmux); g_value_set_double (value, splitmux->mux_overhead); GST_OBJECT_UNLOCK (splitmux); break; case PROP_SINK: - GST_SPLITMUX_LOCK (splitmux); + GST_OBJECT_LOCK (splitmux); g_value_set_object (value, splitmux->provided_sink); - GST_SPLITMUX_UNLOCK (splitmux); + GST_OBJECT_UNLOCK (splitmux); break; case PROP_MUXER: - GST_SPLITMUX_LOCK (splitmux); + GST_OBJECT_LOCK (splitmux); g_value_set_object (value, splitmux->provided_muxer); - GST_SPLITMUX_UNLOCK (splitmux); + GST_OBJECT_UNLOCK (splitmux); break; default: G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); @@ -354,6 +398,23 @@ gst_splitmux_sink_get_property (GObject * object, guint prop_id, } } +/* Convenience function */ +static inline GstClockTimeDiff +my_segment_to_running_time (GstSegment * segment, GstClockTime val) +{ + GstClockTimeDiff res = GST_CLOCK_STIME_NONE; + + if (GST_CLOCK_TIME_IS_VALID (val)) { + gboolean sign = + gst_segment_to_running_time_full (segment, GST_FORMAT_TIME, val, &val); + if (sign > 0) + res = val; + else if (sign < 0) + res = -val; + } + return res; +} + static GstPad * mq_sink_to_src (GstElement * mq, GstPad * sink_pad) { @@ -409,7 +470,7 @@ mq_stream_ctx_new (GstSplitMuxSink * splitmux) ctx->splitmux = splitmux; gst_segment_init (&ctx->in_segment, GST_FORMAT_UNDEFINED); gst_segment_init (&ctx->out_segment, GST_FORMAT_UNDEFINED); - ctx->in_running_time = ctx->out_running_time = 0; + ctx->in_running_time = ctx->out_running_time = GST_CLOCK_STIME_NONE; g_queue_init (&ctx->queued_bufs); return ctx; } @@ -449,6 +510,26 @@ _pad_block_destroy_src_notify (MqStreamCtx * ctx) mq_stream_ctx_unref (ctx); } +static void +send_fragment_opened_closed_msg (GstSplitMuxSink * splitmux, gboolean opened) +{ + gchar *location = NULL; + GstMessage *msg; + const gchar *msg_name = opened ? + "splitmuxsink-fragment-opened" : "splitmuxsink-fragment-closed"; + + g_object_get (splitmux->sink, "location", &location, NULL); + + msg = gst_message_new_element (GST_OBJECT (splitmux), + gst_structure_new (msg_name, + "location", G_TYPE_STRING, location, + "running-time", GST_TYPE_CLOCK_TIME, + splitmux->reference_ctx->out_running_time, NULL)); + gst_element_post_message (GST_ELEMENT_CAST (splitmux), msg); + + g_free (location); +} + /* Called with lock held, drops the lock to send EOS to the * pad */ @@ -481,13 +562,15 @@ complete_or_wait_on_out (GstSplitMuxSink * splitmux, MqStreamCtx * ctx) do { GST_LOG_OBJECT (ctx->srcpad, - "Checking running time %" GST_TIME_FORMAT " against max %" - GST_TIME_FORMAT, GST_TIME_ARGS (ctx->out_running_time), - GST_TIME_ARGS (splitmux->max_out_running_time)); + "Checking running time %" GST_STIME_FORMAT " against max %" + GST_STIME_FORMAT, GST_STIME_ARGS (ctx->out_running_time), + GST_STIME_ARGS (splitmux->max_out_running_time)); - if (splitmux->max_out_running_time == GST_CLOCK_TIME_NONE || - ctx->out_running_time < splitmux->max_out_running_time) + if (splitmux->max_out_running_time == GST_CLOCK_STIME_NONE || + ctx->out_running_time < splitmux->max_out_running_time) { + splitmux->have_muxed_something = TRUE; return; + } if (ctx->flushing || splitmux->state == SPLITMUX_STATE_STOPPED) return; @@ -504,17 +587,17 @@ complete_or_wait_on_out (GstSplitMuxSink * splitmux, MqStreamCtx * ctx) GST_INFO_OBJECT (ctx->srcpad, "Sleeping for running time %" - GST_TIME_FORMAT " (max %" GST_TIME_FORMAT ")", - GST_TIME_ARGS (ctx->out_running_time), - GST_TIME_ARGS (splitmux->max_out_running_time)); + GST_STIME_FORMAT " (max %" GST_STIME_FORMAT ")", + GST_STIME_ARGS (ctx->out_running_time), + GST_STIME_ARGS (splitmux->max_out_running_time)); ctx->out_blocked = TRUE; /* Expand the mq if needed before sleeping */ check_queue_length (splitmux, ctx); GST_SPLITMUX_WAIT (splitmux); ctx->out_blocked = FALSE; GST_INFO_OBJECT (ctx->srcpad, - "Woken for new max running time %" GST_TIME_FORMAT, - GST_TIME_ARGS (splitmux->max_out_running_time)); + "Woken for new max running time %" GST_STIME_FORMAT, + GST_STIME_ARGS (splitmux->max_out_running_time)); } while (1); } @@ -524,7 +607,7 @@ handle_mq_output (GstPad * pad, GstPadProbeInfo * info, MqStreamCtx * ctx) GstSplitMuxSink *splitmux = ctx->splitmux; MqStreamBuf *buf_info = NULL; - GST_LOG_OBJECT (pad, "Fired probe type 0x%x\n", info->type); + GST_LOG_OBJECT (pad, "Fired probe type 0x%x", info->type); /* FIXME: Handle buffer lists, until then make it clear they won't work */ if (info->type & GST_PAD_PROBE_TYPE_BUFFER_LIST) { @@ -564,6 +647,7 @@ handle_mq_output (GstPad * pad, GstPadProbeInfo * info, MqStreamCtx * ctx) break; case GST_EVENT_GAP:{ GstClockTime gap_ts; + GstClockTimeDiff rtime; gst_event_parse_gap (event, &gap_ts, NULL); if (gap_ts == GST_CLOCK_TIME_NONE) @@ -571,19 +655,40 @@ handle_mq_output (GstPad * pad, GstPadProbeInfo * info, MqStreamCtx * ctx) GST_SPLITMUX_LOCK (splitmux); - gap_ts = gst_segment_to_running_time (&ctx->out_segment, - GST_FORMAT_TIME, gap_ts); + rtime = my_segment_to_running_time (&ctx->out_segment, gap_ts); - GST_LOG_OBJECT (pad, "Have GAP w/ ts %" GST_TIME_FORMAT, - GST_TIME_ARGS (gap_ts)); + GST_LOG_OBJECT (pad, "Have GAP w/ ts %" GST_STIME_FORMAT, + GST_STIME_ARGS (rtime)); if (splitmux->state == SPLITMUX_STATE_STOPPED) goto beach; - ctx->out_running_time = gap_ts; - complete_or_wait_on_out (splitmux, ctx); + + if (rtime != GST_CLOCK_STIME_NONE) { + ctx->out_running_time = rtime; + complete_or_wait_on_out (splitmux, ctx); + } GST_SPLITMUX_UNLOCK (splitmux); break; } + case GST_EVENT_CUSTOM_DOWNSTREAM:{ + const GstStructure *s; + GstClockTimeDiff ts = 0; + + s = gst_event_get_structure (event); + if (!gst_structure_has_name (s, "splitmuxsink-unblock")) + break; + + gst_structure_get_int64 (s, "timestamp", &ts); + + GST_SPLITMUX_LOCK (splitmux); + + if (splitmux->state == SPLITMUX_STATE_STOPPED) + goto beach; + ctx->out_running_time = ts; + complete_or_wait_on_out (splitmux, ctx); + GST_SPLITMUX_UNLOCK (splitmux); + return GST_PAD_PROBE_DROP; + } default: break; } @@ -605,13 +710,18 @@ handle_mq_output (GstPad * pad, GstPadProbeInfo * info, MqStreamCtx * ctx) ctx->out_running_time = buf_info->run_ts; GST_LOG_OBJECT (splitmux, - "Pad %" GST_PTR_FORMAT " buffer with TS %" GST_TIME_FORMAT + "Pad %" GST_PTR_FORMAT " buffer with run TS %" GST_STIME_FORMAT " size %" G_GSIZE_FORMAT, - pad, GST_TIME_ARGS (ctx->out_running_time), buf_info->buf_size); + pad, GST_STIME_ARGS (ctx->out_running_time), buf_info->buf_size); + + if (splitmux->opening_first_fragment) { + send_fragment_opened_closed_msg (splitmux, TRUE); + splitmux->opening_first_fragment = FALSE; + } complete_or_wait_on_out (splitmux, ctx); - if (splitmux->muxed_out_time == GST_CLOCK_TIME_NONE || + if (splitmux->muxed_out_time == GST_CLOCK_STIME_NONE || splitmux->muxed_out_time < buf_info->run_ts) splitmux->muxed_out_time = buf_info->run_ts; @@ -621,8 +731,8 @@ handle_mq_output (GstPad * pad, GstPadProbeInfo * info, MqStreamCtx * ctx) { GstBuffer *buf = gst_pad_probe_info_get_buffer (info); GST_LOG_OBJECT (pad, "Returning to pass buffer %" GST_PTR_FORMAT - " run ts %" GST_TIME_FORMAT, buf, - GST_TIME_ARGS (ctx->out_running_time)); + " run ts %" GST_STIME_FORMAT, buf, + GST_STIME_ARGS (ctx->out_running_time)); } #endif @@ -651,8 +761,8 @@ restart_context (MqStreamCtx * ctx, GstSplitMuxSink * splitmux) gst_pad_sticky_events_foreach (ctx->srcpad, (GstPadStickyEventsForeachFunction) (resend_sticky), peer); - /* Clear EOS flag */ - ctx->out_eos = FALSE; + /* Clear EOS flag if not actually EOS */ + ctx->out_eos = GST_PAD_IS_EOS (ctx->srcpad); gst_object_unref (peer); } @@ -665,30 +775,45 @@ static void start_next_fragment (GstSplitMuxSink * splitmux) { /* 1 change to new file */ + splitmux->switching_fragment = TRUE; + + gst_element_set_locked_state (splitmux->muxer, TRUE); + gst_element_set_locked_state (splitmux->active_sink, TRUE); gst_element_set_state (splitmux->muxer, GST_STATE_NULL); gst_element_set_state (splitmux->active_sink, GST_STATE_NULL); set_next_filename (splitmux); - gst_element_sync_state_with_parent (splitmux->active_sink); - gst_element_sync_state_with_parent (splitmux->muxer); + gst_element_set_state (splitmux->active_sink, GST_STATE_TARGET (splitmux)); + gst_element_set_state (splitmux->muxer, GST_STATE_TARGET (splitmux)); + gst_element_set_locked_state (splitmux->muxer, FALSE); + gst_element_set_locked_state (splitmux->active_sink, FALSE); + + splitmux->switching_fragment = FALSE; g_list_foreach (splitmux->contexts, (GFunc) restart_context, splitmux); /* Switch state and go back to processing */ - splitmux->state = SPLITMUX_STATE_COLLECTING_GOP_START; - if (!splitmux->video_ctx->in_eos) - splitmux->max_out_running_time = splitmux->video_ctx->in_running_time; - else - splitmux->max_out_running_time = GST_CLOCK_TIME_NONE; + if (!splitmux->reference_ctx->in_eos) { + splitmux->state = SPLITMUX_STATE_COLLECTING_GOP_START; + splitmux->max_out_running_time = splitmux->reference_ctx->in_running_time; + } else { + splitmux->state = SPLITMUX_STATE_WAITING_GOP_COMPLETE; + splitmux->max_out_running_time = GST_CLOCK_STIME_NONE; + splitmux->have_muxed_something = FALSE; + } + splitmux->have_muxed_something = + (splitmux->reference_ctx->in_running_time > splitmux->muxed_out_time); /* Store the overflow parameters as the basis for the next fragment */ splitmux->mux_start_time = splitmux->muxed_out_time; splitmux->mux_start_bytes = splitmux->muxed_out_bytes; GST_DEBUG_OBJECT (splitmux, - "Restarting flow for new fragment. New running time %" GST_TIME_FORMAT, - GST_TIME_ARGS (splitmux->max_out_running_time)); + "Restarting flow for new fragment. New running time %" GST_STIME_FORMAT, + GST_STIME_ARGS (splitmux->max_out_running_time)); + + send_fragment_opened_closed_msg (splitmux, TRUE); GST_SPLITMUX_BROADCAST (splitmux); } @@ -702,8 +827,11 @@ bus_handler (GstBin * bin, GstMessage * message) case GST_MESSAGE_EOS: /* If the state is draining out the current file, drop this EOS */ GST_SPLITMUX_LOCK (splitmux); + + send_fragment_opened_closed_msg (splitmux, FALSE); + if (splitmux->state == SPLITMUX_STATE_ENDING_FILE && - splitmux->max_out_running_time != GST_CLOCK_TIME_NONE) { + splitmux->max_out_running_time != GST_CLOCK_STIME_NONE) { GST_DEBUG_OBJECT (splitmux, "Caught EOS at end of fragment, dropping"); splitmux->state = SPLITMUX_STATE_START_NEXT_FRAGMENT; GST_SPLITMUX_BROADCAST (splitmux); @@ -714,6 +842,20 @@ bus_handler (GstBin * bin, GstMessage * message) } GST_SPLITMUX_UNLOCK (splitmux); break; + case GST_MESSAGE_ASYNC_START: + case GST_MESSAGE_ASYNC_DONE: + /* Ignore state changes from our children while switching */ + if (splitmux->switching_fragment) { + if (GST_MESSAGE_SRC (message) == (GstObject *) splitmux->active_sink || + GST_MESSAGE_SRC (message) == (GstObject *) splitmux->muxer) { + GST_LOG_OBJECT (splitmux, + "Ignoring state change from child %" GST_PTR_FORMAT + " while switching", GST_MESSAGE_SRC (message)); + gst_message_unref (message); + return; + } + } + break; default: break; } @@ -733,7 +875,7 @@ handle_gathered_gop (GstSplitMuxSink * splitmux) { GList *cur; gsize queued_bytes = 0; - GstClockTime queued_time = 0; + GstClockTimeDiff queued_time = 0; /* Assess if the multiqueue contents overflowed the current file */ for (cur = g_list_first (splitmux->contexts); @@ -753,41 +895,45 @@ handle_gathered_gop (GstSplitMuxSink * splitmux) /* Expand queued bytes estimate by muxer overhead */ queued_bytes += (queued_bytes * splitmux->mux_overhead); - GST_LOG_OBJECT (splitmux, "mq at TS %" GST_TIME_FORMAT - " bytes %" G_GSIZE_FORMAT, GST_TIME_ARGS (queued_time), queued_bytes); + GST_LOG_OBJECT (splitmux, "mq at TS %" GST_STIME_FORMAT + " bytes %" G_GSIZE_FORMAT, GST_STIME_ARGS (queued_time), queued_bytes); /* Check for overrun - have we output at least one byte and overrun * either threshold? */ - if ((splitmux->mux_start_bytes < splitmux->muxed_out_bytes) && - ((splitmux->threshold_bytes > 0 && - queued_bytes >= splitmux->threshold_bytes) || - (splitmux->threshold_time > 0 && - queued_time >= splitmux->threshold_time))) { + if ((splitmux->have_muxed_something && + ((splitmux->threshold_bytes > 0 && + queued_bytes > splitmux->threshold_bytes) || + (splitmux->threshold_time > 0 && + queued_time > splitmux->threshold_time)))) { splitmux->state = SPLITMUX_STATE_ENDING_FILE; GST_INFO_OBJECT (splitmux, "mq overflowed since last, draining out. max out TS is %" - GST_TIME_FORMAT, GST_TIME_ARGS (splitmux->max_out_running_time)); + GST_STIME_FORMAT, GST_STIME_ARGS (splitmux->max_out_running_time)); GST_SPLITMUX_BROADCAST (splitmux); } else { /* No overflow */ GST_LOG_OBJECT (splitmux, "This GOP didn't overflow the fragment. Bytes sent %" G_GSIZE_FORMAT - " queued %" G_GSIZE_FORMAT " time %" GST_TIME_FORMAT " Continuing.", + " queued %" G_GSIZE_FORMAT " time %" GST_STIME_FORMAT " Continuing.", splitmux->muxed_out_bytes - splitmux->mux_start_bytes, - queued_bytes, GST_TIME_ARGS (queued_time)); + queued_bytes, GST_STIME_ARGS (queued_time)); /* Wake everyone up to push this one GOP, then sleep */ - splitmux->state = SPLITMUX_STATE_COLLECTING_GOP_START; - if (!splitmux->video_ctx->in_eos) - splitmux->max_out_running_time = splitmux->video_ctx->in_running_time; - else - splitmux->max_out_running_time = GST_CLOCK_TIME_NONE; + splitmux->have_muxed_something = TRUE; + + if (!splitmux->reference_ctx->in_eos) { + splitmux->state = SPLITMUX_STATE_COLLECTING_GOP_START; + splitmux->max_out_running_time = splitmux->reference_ctx->in_running_time; + } else { + splitmux->state = SPLITMUX_STATE_WAITING_GOP_COMPLETE; + splitmux->max_out_running_time = GST_CLOCK_STIME_NONE; + } GST_LOG_OBJECT (splitmux, "Waking output for complete GOP, TS %" - GST_TIME_FORMAT, GST_TIME_ARGS (splitmux->max_out_running_time)); + GST_STIME_FORMAT, GST_STIME_ARGS (splitmux->max_out_running_time)); GST_SPLITMUX_BROADCAST (splitmux); } @@ -795,7 +941,7 @@ handle_gathered_gop (GstSplitMuxSink * splitmux) /* Called with splitmux lock held */ /* Called from each input pad when it is has all the pieces - * for a GOP or EOS, starting with the video pad which has set the + * for a GOP or EOS, starting with the reference pad which has set the * splitmux->max_in_running_time */ static void @@ -803,21 +949,25 @@ check_completed_gop (GstSplitMuxSink * splitmux, MqStreamCtx * ctx) { GList *cur; gboolean ready = TRUE; + GstClockTimeDiff current_max_in_running_time; if (splitmux->state == SPLITMUX_STATE_WAITING_GOP_COMPLETE) { /* Iterate each pad, and check that the input running time is at least - * up to the video runnning time, and if so handle the collected GOP */ - GST_LOG_OBJECT (splitmux, "Checking GOP collected, ctx %p", ctx); - for (cur = g_list_first (splitmux->contexts); - cur != NULL; cur = g_list_next (cur)) { + * up to the reference running time, and if so handle the collected GOP */ + GST_LOG_OBJECT (splitmux, "Checking GOP collected, Max in running time %" + GST_STIME_FORMAT " ctx %p", + GST_STIME_ARGS (splitmux->max_in_running_time), ctx); + for (cur = g_list_first (splitmux->contexts); cur != NULL; + cur = g_list_next (cur)) { MqStreamCtx *tmpctx = (MqStreamCtx *) (cur->data); GST_LOG_OBJECT (splitmux, - "Context %p (src pad %" GST_PTR_FORMAT ") TS %" GST_TIME_FORMAT + "Context %p (src pad %" GST_PTR_FORMAT ") TS %" GST_STIME_FORMAT " EOS %d", tmpctx, tmpctx->srcpad, - GST_TIME_ARGS (tmpctx->in_running_time), tmpctx->in_eos); + GST_STIME_ARGS (tmpctx->in_running_time), tmpctx->in_eos); - if (tmpctx->in_running_time < splitmux->max_in_running_time && + if (splitmux->max_in_running_time != G_MAXINT64 && + tmpctx->in_running_time < splitmux->max_in_running_time && !tmpctx->in_eos) { GST_LOG_OBJECT (splitmux, "Context %p (src pad %" GST_PTR_FORMAT ") not ready. We'll sleep", @@ -834,11 +984,18 @@ check_completed_gop (GstSplitMuxSink * splitmux, MqStreamCtx * ctx) } } + /* If upstream reached EOS we are not expecting more data, no need to wait + * here. */ + if (ctx->in_eos) + return; + /* Some pad is not yet ready, or GOP is being pushed * either way, sleep and wait to get woken */ + current_max_in_running_time = splitmux->max_in_running_time; while ((splitmux->state == SPLITMUX_STATE_WAITING_GOP_COMPLETE || splitmux->state == SPLITMUX_STATE_START_NEXT_FRAGMENT) && - !ctx->flushing) { + !ctx->flushing && + (current_max_in_running_time == splitmux->max_in_running_time)) { GST_LOG_OBJECT (splitmux, "Sleeping for %s (ctx %p)", splitmux->state == SPLITMUX_STATE_WAITING_GOP_COMPLETE ? @@ -871,7 +1028,7 @@ check_queue_length (GstSplitMuxSink * splitmux, MqStreamCtx * ctx) splitmux->queued_gops <= 1) { allow_grow = TRUE; } else if (splitmux->state == SPLITMUX_STATE_COLLECTING_GOP_START && - ctx->is_video) { + ctx->is_reference && splitmux->queued_gops <= 1) { allow_grow = TRUE; } @@ -912,7 +1069,7 @@ handle_mq_input (GstPad * pad, GstPadProbeInfo * info, MqStreamCtx * ctx) gboolean loop_again; gboolean keyframe = FALSE; - GST_LOG_OBJECT (pad, "Fired probe type 0x%x\n", info->type); + GST_LOG_OBJECT (pad, "Fired probe type 0x%x", info->type); /* FIXME: Handle buffer lists, until then make it clear they won't work */ if (info->type & GST_PAD_PROBE_TYPE_BUFFER_LIST) { @@ -930,7 +1087,7 @@ handle_mq_input (GstPad * pad, GstPadProbeInfo * info, MqStreamCtx * ctx) gst_segment_init (&ctx->in_segment, GST_FORMAT_UNDEFINED); ctx->in_eos = FALSE; ctx->in_bytes = 0; - ctx->in_running_time = 0; + ctx->in_running_time = GST_CLOCK_STIME_NONE; GST_SPLITMUX_UNLOCK (splitmux); break; case GST_EVENT_EOS: @@ -940,10 +1097,10 @@ handle_mq_input (GstPad * pad, GstPadProbeInfo * info, MqStreamCtx * ctx) if (splitmux->state == SPLITMUX_STATE_STOPPED) goto beach; - if (ctx->is_video) { - GST_INFO_OBJECT (splitmux, "Got Video EOS. Finishing up"); + if (ctx->is_reference) { + GST_INFO_OBJECT (splitmux, "Got Reference EOS. Finishing up"); /* Act as if this is a new keyframe with infinite timestamp */ - splitmux->max_in_running_time = GST_CLOCK_TIME_NONE; + splitmux->max_in_running_time = G_MAXINT64; splitmux->state = SPLITMUX_STATE_WAITING_GOP_COMPLETE; /* Wake up other input pads to collect this GOP */ GST_SPLITMUX_BROADCAST (splitmux); @@ -964,8 +1121,6 @@ handle_mq_input (GstPad * pad, GstPadProbeInfo * info, MqStreamCtx * ctx) } buf = gst_pad_probe_info_get_buffer (info); - ctx->in_running_time = gst_segment_to_running_time (&ctx->in_segment, - GST_FORMAT_TIME, GST_BUFFER_TIMESTAMP (buf)); buf_info = mq_stream_buf_new (); if (GST_BUFFER_PTS_IS_VALID (buf)) @@ -973,6 +1128,8 @@ handle_mq_input (GstPad * pad, GstPadProbeInfo * info, MqStreamCtx * ctx) else ts = GST_BUFFER_DTS (buf); + GST_LOG_OBJECT (pad, "Buffer TS is %" GST_TIME_FORMAT, GST_TIME_ARGS (ts)); + GST_SPLITMUX_LOCK (splitmux); if (splitmux->state == SPLITMUX_STATE_STOPPED) @@ -981,32 +1138,46 @@ handle_mq_input (GstPad * pad, GstPadProbeInfo * info, MqStreamCtx * ctx) /* If this buffer has a timestamp, advance the input timestamp of the * stream */ if (GST_CLOCK_TIME_IS_VALID (ts)) { - GstClockTime running_time = - gst_segment_to_running_time (&ctx->in_segment, GST_FORMAT_TIME, - GST_BUFFER_TIMESTAMP (buf)); + GstClockTimeDiff running_time = + my_segment_to_running_time (&ctx->in_segment, ts); - if (GST_CLOCK_TIME_IS_VALID (running_time) && - (ctx->in_running_time == GST_CLOCK_TIME_NONE - || running_time > ctx->in_running_time)) + GST_LOG_OBJECT (pad, "Buffer running TS is %" GST_STIME_FORMAT, + GST_STIME_ARGS (running_time)); + + if (GST_CLOCK_STIME_IS_VALID (running_time) + && running_time > ctx->in_running_time) ctx->in_running_time = running_time; } /* Try to make sure we have a valid running time */ - if (!GST_CLOCK_TIME_IS_VALID (ctx->in_running_time)) { + if (!GST_CLOCK_STIME_IS_VALID (ctx->in_running_time)) { ctx->in_running_time = - gst_segment_to_running_time (&ctx->in_segment, GST_FORMAT_TIME, - ctx->in_segment.start); + my_segment_to_running_time (&ctx->in_segment, ctx->in_segment.start); } + GST_LOG_OBJECT (pad, "in running time now %" GST_STIME_FORMAT, + GST_STIME_ARGS (ctx->in_running_time)); + buf_info->run_ts = ctx->in_running_time; buf_info->buf_size = gst_buffer_get_size (buf); /* Update total input byte counter for overflow detect */ ctx->in_bytes += buf_info->buf_size; - GST_DEBUG_OBJECT (pad, "Buf TS %" GST_TIME_FORMAT + /* initialize mux_start_time */ + if (ctx->is_reference && splitmux->mux_start_time == GST_CLOCK_STIME_NONE) { + splitmux->mux_start_time = buf_info->run_ts; + GST_LOG_OBJECT (splitmux, "Mux start time now %" GST_STIME_FORMAT, + GST_STIME_ARGS (splitmux->mux_start_time)); + /* Also take this as the first start time when starting up, + * so that we start counting overflow from the first frame */ + if (!GST_CLOCK_STIME_IS_VALID (splitmux->max_in_running_time)) + splitmux->max_in_running_time = splitmux->mux_start_time; + } + + GST_DEBUG_OBJECT (pad, "Buf TS %" GST_STIME_FORMAT " total in_bytes %" G_GSIZE_FORMAT, - GST_TIME_ARGS (buf_info->run_ts), ctx->in_bytes); + GST_STIME_ARGS (buf_info->run_ts), ctx->in_bytes); loop_again = TRUE; do { @@ -1015,18 +1186,18 @@ handle_mq_input (GstPad * pad, GstPadProbeInfo * info, MqStreamCtx * ctx) switch (splitmux->state) { case SPLITMUX_STATE_COLLECTING_GOP_START: - if (ctx->is_video) { + if (ctx->is_reference) { /* If a keyframe, we have a complete GOP */ if (GST_BUFFER_FLAG_IS_SET (buf, GST_BUFFER_FLAG_DELTA_UNIT) || - !GST_CLOCK_TIME_IS_VALID (ctx->in_running_time) || + !GST_CLOCK_STIME_IS_VALID (ctx->in_running_time) || splitmux->max_in_running_time >= ctx->in_running_time) { /* Pass this buffer through */ loop_again = FALSE; break; } GST_INFO_OBJECT (pad, - "Have keyframe with running time %" GST_TIME_FORMAT, - GST_TIME_ARGS (ctx->in_running_time)); + "Have keyframe with running time %" GST_STIME_FORMAT, + GST_STIME_ARGS (ctx->in_running_time)); keyframe = TRUE; splitmux->state = SPLITMUX_STATE_WAITING_GOP_COMPLETE; splitmux->max_in_running_time = ctx->in_running_time; @@ -1034,7 +1205,7 @@ handle_mq_input (GstPad * pad, GstPadProbeInfo * info, MqStreamCtx * ctx) GST_SPLITMUX_BROADCAST (splitmux); check_completed_gop (splitmux, ctx); } else { - /* We're still waiting for a keyframe on the video pad, sleep */ + /* We're still waiting for a keyframe on the reference pad, sleep */ GST_LOG_OBJECT (pad, "Sleeping for GOP start"); GST_SPLITMUX_WAIT (splitmux); GST_LOG_OBJECT (pad, "Done sleeping for GOP start state now %d", @@ -1042,14 +1213,14 @@ handle_mq_input (GstPad * pad, GstPadProbeInfo * info, MqStreamCtx * ctx) } break; case SPLITMUX_STATE_WAITING_GOP_COMPLETE: - /* After a GOP start is found, this buffer might complete the GOP */ + /* If we overran the target timestamp, it might be time to process * the GOP, otherwise bail out for more data */ GST_LOG_OBJECT (pad, - "Checking TS %" GST_TIME_FORMAT " against max %" GST_TIME_FORMAT, - GST_TIME_ARGS (ctx->in_running_time), - GST_TIME_ARGS (splitmux->max_in_running_time)); + "Checking TS %" GST_STIME_FORMAT " against max %" GST_STIME_FORMAT, + GST_STIME_ARGS (ctx->in_running_time), + GST_STIME_ARGS (splitmux->max_in_running_time)); if (ctx->in_running_time < splitmux->max_in_running_time) { loop_again = FALSE; @@ -1060,7 +1231,29 @@ handle_mq_input (GstPad * pad, GstPadProbeInfo * info, MqStreamCtx * ctx) "Collected last packet of GOP. Checking other pads"); check_completed_gop (splitmux, ctx); break; - case SPLITMUX_STATE_ENDING_FILE: + case SPLITMUX_STATE_ENDING_FILE:{ + GstEvent *event; + + /* If somes streams received no buffer during the last GOP that overran, + * because its next buffer has a timestamp bigger than + * ctx->max_in_running_time, its queue is empty. In that case the only + * way to wakeup the output thread is by injecting an event in the + * queue. This usually happen with subtitle streams. + * See https://bugzilla.gnome.org/show_bug.cgi?id=763711. */ + GST_LOG_OBJECT (pad, "Sending splitmuxsink-unblock event"); + event = gst_event_new_custom (GST_EVENT_CUSTOM_DOWNSTREAM | + GST_EVENT_TYPE_SERIALIZED, + gst_structure_new ("splitmuxsink-unblock", "timestamp", + G_TYPE_INT64, splitmux->max_in_running_time, NULL)); + + GST_SPLITMUX_UNLOCK (splitmux); + gst_pad_send_event (ctx->sinkpad, event); + GST_SPLITMUX_LOCK (splitmux); + /* state may have changed while we were unlocked. Loop again if so */ + if (splitmux->state != SPLITMUX_STATE_ENDING_FILE) + break; + /* fallthrough */ + } case SPLITMUX_STATE_START_NEXT_FRAGMENT: /* A fragment is ending, wait until that's done before continuing */ GST_DEBUG_OBJECT (pad, "Sleeping for fragment restart"); @@ -1086,7 +1279,7 @@ handle_mq_input (GstPad * pad, GstPadProbeInfo * info, MqStreamCtx * ctx) check_queue_length (splitmux, ctx); GST_LOG_OBJECT (pad, "Returning to queue buffer %" GST_PTR_FORMAT - " run ts %" GST_TIME_FORMAT, buf, GST_TIME_ARGS (ctx->in_running_time)); + " run ts %" GST_STIME_FORMAT, buf, GST_STIME_ARGS (ctx->in_running_time)); GST_SPLITMUX_UNLOCK (splitmux); return GST_PAD_PROBE_PASS; @@ -1129,6 +1322,12 @@ gst_splitmux_sink_request_new_pad (GstElement * element, gst_element_class_get_pad_template (GST_ELEMENT_GET_CLASS (splitmux->muxer), templ->name_template); } + if (mux_template == NULL) { + /* Fallback to find sink pad templates named 'sink_%d' (mpegtsmux) */ + mux_template = + gst_element_class_get_pad_template (GST_ELEMENT_GET_CLASS + (splitmux->muxer), "sink_%d"); + } } res = gst_element_request_pad (splitmux->muxer, mux_template, name, caps); @@ -1159,7 +1358,6 @@ gst_splitmux_sink_request_new_pad (GstElement * element, gst_object_unref (GST_OBJECT (res)); ctx = mq_stream_ctx_new (splitmux); - ctx->is_video = is_video; ctx->srcpad = mq_src; ctx->sinkpad = mq_sink; @@ -1168,15 +1366,21 @@ gst_splitmux_sink_request_new_pad (GstElement * element, gst_pad_add_probe (mq_src, GST_PAD_PROBE_TYPE_DATA_DOWNSTREAM, (GstPadProbeCallback) handle_mq_output, ctx, (GDestroyNotify) _pad_block_destroy_src_notify); - if (is_video) - splitmux->video_ctx = ctx; + if (is_video && splitmux->reference_ctx != NULL) { + splitmux->reference_ctx->is_reference = FALSE; + splitmux->reference_ctx = NULL; + } + if (splitmux->reference_ctx == NULL) { + splitmux->reference_ctx = ctx; + ctx->is_reference = TRUE; + } res = gst_ghost_pad_new (gname, mq_sink); g_object_set_qdata ((GObject *) (res), PAD_CONTEXT, ctx); mq_stream_ctx_ref (ctx); ctx->sink_pad_block_id = - gst_pad_add_probe (res, GST_PAD_PROBE_TYPE_DATA_DOWNSTREAM, + gst_pad_add_probe (mq_sink, GST_PAD_PROBE_TYPE_DATA_DOWNSTREAM, (GstPadProbeCallback) handle_mq_input, ctx, (GDestroyNotify) _pad_block_destroy_sink_notify); @@ -1243,6 +1447,10 @@ gst_splitmux_sink_release_pad (GstElement * element, GstPad * pad) gst_element_remove_pad (element, pad); + /* Reset the internal elements only after all request pads are released */ + if (splitmux->contexts == NULL) + gst_splitmux_reset (splitmux); + fail: GST_SPLITMUX_UNLOCK (splitmux); } @@ -1282,16 +1490,26 @@ create_elements (GstSplitMuxSink * splitmux) } if (splitmux->muxer == NULL) { - if (splitmux->provided_muxer == NULL) { + GstElement *provided_muxer = NULL; + + GST_OBJECT_LOCK (splitmux); + if (splitmux->provided_muxer != NULL) + provided_muxer = gst_object_ref (splitmux->provided_muxer); + GST_OBJECT_UNLOCK (splitmux); + + if (provided_muxer == NULL) { if ((splitmux->muxer = create_element (splitmux, "mp4mux", "muxer")) == NULL) goto fail; } else { - splitmux->muxer = splitmux->provided_muxer; - if (!gst_bin_add (GST_BIN (splitmux), splitmux->provided_muxer)) { + if (!gst_bin_add (GST_BIN (splitmux), provided_muxer)) { g_warning ("Could not add muxer element - splitmuxsink will not work"); + gst_object_unref (provided_muxer); goto fail; } + + splitmux->muxer = provided_muxer; + gst_object_unref (provided_muxer); } } @@ -1345,33 +1563,45 @@ find_sink (GstElement * e) static gboolean create_sink (GstSplitMuxSink * splitmux) { - g_return_val_if_fail (splitmux->active_sink == NULL, TRUE); + GstElement *provided_sink = NULL; - if (splitmux->provided_sink == NULL) { - if ((splitmux->sink = - create_element (splitmux, DEFAULT_SINK, "sink")) == NULL) - goto fail; - splitmux->active_sink = splitmux->sink; - } else { - if (!gst_bin_add (GST_BIN (splitmux), splitmux->provided_sink)) { - g_warning ("Could not add sink elements - splitmuxsink will not work"); - goto fail; - } + if (splitmux->active_sink == NULL) { - splitmux->active_sink = splitmux->provided_sink; + GST_OBJECT_LOCK (splitmux); + if (splitmux->provided_sink != NULL) + provided_sink = gst_object_ref (splitmux->provided_sink); + GST_OBJECT_UNLOCK (splitmux); - /* Find the sink element */ - splitmux->sink = find_sink (splitmux->active_sink); - if (splitmux->sink == NULL) { - g_warning - ("Could not locate sink element in provided sink - splitmuxsink will not work"); - goto fail; + if (provided_sink == NULL) { + if ((splitmux->sink = + create_element (splitmux, DEFAULT_SINK, "sink")) == NULL) + goto fail; + splitmux->active_sink = splitmux->sink; + } else { + if (!gst_bin_add (GST_BIN (splitmux), provided_sink)) { + g_warning ("Could not add sink elements - splitmuxsink will not work"); + gst_object_unref (provided_sink); + goto fail; + } + + splitmux->active_sink = provided_sink; + + /* The bin holds a ref now, we can drop our tmp ref */ + gst_object_unref (provided_sink); + + /* Find the sink element */ + splitmux->sink = find_sink (splitmux->active_sink); + if (splitmux->sink == NULL) { + g_warning + ("Could not locate sink element in provided sink - splitmuxsink will not work"); + goto fail; + } } - } - if (!gst_element_link (splitmux->muxer, splitmux->active_sink)) { - g_warning ("Failed to link muxer and sink- splitmuxsink will not work"); - goto fail; + if (!gst_element_link (splitmux->muxer, splitmux->active_sink)) { + g_warning ("Failed to link muxer and sink- splitmuxsink will not work"); + goto fail; + } } return TRUE; @@ -1385,10 +1615,17 @@ fail: static void set_next_filename (GstSplitMuxSink * splitmux) { - if (splitmux->location) { - gchar *fname; + gchar *fname = NULL; + gst_splitmux_sink_ensure_max_files (splitmux); - fname = g_strdup_printf (splitmux->location, splitmux->fragment_id); + g_signal_emit (splitmux, signals[SIGNAL_FORMAT_LOCATION], 0, + splitmux->fragment_id, &fname); + + if (!fname) + fname = splitmux->location ? + g_strdup_printf (splitmux->location, splitmux->fragment_id) : NULL; + + if (fname) { GST_INFO_OBJECT (splitmux, "Setting file to %s", fname); g_object_set (splitmux->sink, "location", fname, NULL); g_free (fname); @@ -1420,9 +1657,11 @@ gst_splitmux_sink_change_state (GstElement * element, GstStateChange transition) GST_SPLITMUX_LOCK (splitmux); /* Start by collecting one input on each pad */ splitmux->state = SPLITMUX_STATE_COLLECTING_GOP_START; - splitmux->max_in_running_time = 0; - splitmux->muxed_out_time = splitmux->mux_start_time = 0; + splitmux->max_in_running_time = GST_CLOCK_STIME_NONE; + splitmux->muxed_out_time = splitmux->mux_start_time = + GST_CLOCK_STIME_NONE; splitmux->muxed_out_bytes = splitmux->mux_start_bytes = 0; + splitmux->opening_first_fragment = TRUE; GST_SPLITMUX_UNLOCK (splitmux); break; } @@ -1448,7 +1687,9 @@ gst_splitmux_sink_change_state (GstElement * element, GstStateChange transition) case GST_STATE_CHANGE_READY_TO_NULL: GST_SPLITMUX_LOCK (splitmux); splitmux->fragment_id = 0; - gst_splitmux_reset (splitmux); + /* Reset internal elements only if no pad contexts are using them */ + if (splitmux->contexts == NULL) + gst_splitmux_reset (splitmux); GST_SPLITMUX_UNLOCK (splitmux); break; default: @@ -1474,3 +1715,11 @@ register_splitmuxsink (GstPlugin * plugin) return gst_element_register (plugin, "splitmuxsink", GST_RANK_NONE, GST_TYPE_SPLITMUX_SINK); } + +static void +gst_splitmux_sink_ensure_max_files (GstSplitMuxSink * splitmux) +{ + if (splitmux->max_files && splitmux->fragment_id >= splitmux->max_files) { + splitmux->fragment_id = 0; + } +}