GST_DEBUG_CATEGORY_STATIC (gst_base_sink_debug);
#define GST_CAT_DEFAULT gst_base_sink_debug
-#define GST_BASE_SINK_GET_PRIVATE(obj) \
- (G_TYPE_INSTANCE_GET_PRIVATE ((obj), GST_TYPE_BASE_SINK, GstBaseSinkPrivate))
-
#define GST_FLOW_STEP GST_FLOW_CUSTOM_ERROR
typedef struct
gboolean async_enabled;
GstClockTimeDiff ts_offset;
GstClockTime render_delay;
+ GstClockTime processing_deadline;
/* start, stop of current buffer, stream time, used to report position */
GstClockTime current_sstart;
/* latency stuff */
GstClockTime latency;
- /* if we already commited the state */
- gboolean commited;
+ /* if we already committed the state */
+ gboolean committed;
/* state change to playing ongoing */
gboolean to_playing;
#define DEFAULT_THROTTLE_TIME 0
#define DEFAULT_MAX_BITRATE 0
#define DEFAULT_DROP_OUT_OF_SEGMENT TRUE
+#define DEFAULT_PROCESSING_DEADLINE (20 * GST_MSECOND)
enum
{
PROP_RENDER_DELAY,
PROP_THROTTLE_TIME,
PROP_MAX_BITRATE,
+ PROP_PROCESSING_DEADLINE,
PROP_LAST
};
static GstElementClass *parent_class = NULL;
+static gint private_offset = 0;
static void gst_base_sink_class_init (GstBaseSinkClass * klass);
static void gst_base_sink_init (GstBaseSink * trans, gpointer g_class);
_type = g_type_register_static (GST_TYPE_ELEMENT,
"GstBaseSink", &base_sink_info, G_TYPE_FLAG_ABSTRACT);
+
+ private_offset =
+ g_type_add_instance_private (_type, sizeof (GstBaseSinkPrivate));
+
g_once_init_leave (&base_sink_type, _type);
}
return base_sink_type;
}
+static inline GstBaseSinkPrivate *
+gst_base_sink_get_instance_private (GstBaseSink * self)
+{
+ return (G_STRUCT_MEMBER_P (self, private_offset));
+}
+
static void gst_base_sink_set_property (GObject * object, guint prop_id,
const GValue * value, GParamSpec * pspec);
static void gst_base_sink_get_property (GObject * object, guint prop_id,
gobject_class = G_OBJECT_CLASS (klass);
gstelement_class = GST_ELEMENT_CLASS (klass);
+ if (private_offset != 0)
+ g_type_class_adjust_private_offset (klass, &private_offset);
+
GST_DEBUG_CATEGORY_INIT (gst_base_sink_debug, "basesink", 0,
"basesink element");
- g_type_class_add_private (klass, sizeof (GstBaseSinkPrivate));
-
parent_class = g_type_class_peek_parent (klass);
gobject_class->finalize = gst_base_sink_finalize;
"The maximum bits per second to render (0 = disabled)", 0,
G_MAXUINT64, DEFAULT_MAX_BITRATE,
G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
+ /**
+ * GstBaseSink:processing-deadline:
+ *
+ * Maximum amount of time (in nanoseconds) that the pipeline can take
+ * for processing the buffer. This is added to the latency of live
+ * pipelines.
+ *
+ * Since: 1.16
+ */
+ g_object_class_install_property (gobject_class, PROP_PROCESSING_DEADLINE,
+ g_param_spec_uint64 ("processing-deadline", "Processing deadline",
+ "Maximum processing deadline in nanoseconds", 0, G_MAXUINT64,
+ DEFAULT_PROCESSING_DEADLINE,
+ G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
gstelement_class->change_state =
GST_DEBUG_FUNCPTR (gst_base_sink_change_state);
GstPadTemplate *pad_template;
GstBaseSinkPrivate *priv;
- basesink->priv = priv = GST_BASE_SINK_GET_PRIVATE (basesink);
+ basesink->priv = priv = gst_base_sink_get_instance_private (basesink);
pad_template =
gst_element_class_get_pad_template (GST_ELEMENT_CLASS (g_class), "sink");
priv->async_enabled = DEFAULT_ASYNC;
priv->ts_offset = DEFAULT_TS_OFFSET;
priv->render_delay = DEFAULT_RENDER_DELAY;
+ priv->processing_deadline = DEFAULT_PROCESSING_DEADLINE;
priv->blocksize = DEFAULT_BLOCKSIZE;
priv->cached_clock_id = NULL;
g_atomic_int_set (&priv->enable_last_sample, DEFAULT_ENABLE_LAST_SAMPLE);
gst_base_sink_set_drop_out_of_segment (GstBaseSink * sink,
gboolean drop_out_of_segment)
{
- GstBaseSinkPrivate *priv;
-
g_return_if_fail (GST_IS_BASE_SINK (sink));
- priv = GST_BASE_SINK_GET_PRIVATE (sink);
-
GST_OBJECT_LOCK (sink);
- priv->drop_out_of_segment = drop_out_of_segment;
+ sink->priv->drop_out_of_segment = drop_out_of_segment;
GST_OBJECT_UNLOCK (sink);
}
gboolean
gst_base_sink_get_drop_out_of_segment (GstBaseSink * sink)
{
- GstBaseSinkPrivate *priv;
gboolean res;
g_return_val_if_fail (GST_IS_BASE_SINK (sink), FALSE);
- priv = GST_BASE_SINK_GET_PRIVATE (sink);
-
GST_OBJECT_LOCK (sink);
- res = priv->drop_out_of_segment;
+ res = sink->priv->drop_out_of_segment;
GST_OBJECT_UNLOCK (sink);
return res;
GstClockTime * max_latency)
{
gboolean l, us_live, res, have_latency;
- GstClockTime min, max, render_delay;
+ GstClockTime min, max, render_delay, processing_deadline;
GstQuery *query;
GstClockTime us_min, us_max;
l = sink->sync;
have_latency = sink->priv->have_latency;
render_delay = sink->priv->render_delay;
+ processing_deadline = sink->priv->processing_deadline;
GST_OBJECT_UNLOCK (sink);
/* assume no latency */
* values to create the complete latency. */
min = us_min;
max = us_max;
+
+ if (l) {
+ if (max == -1 || min + processing_deadline <= max)
+ min += processing_deadline;
+ else {
+ GST_ELEMENT_WARNING (sink, CORE, CLOCK,
+ (_("Pipeline construction is invalid, please add queues.")),
+ ("Not enough buffering available for "
+ " the processing deadline of %" GST_TIME_FORMAT
+ ", add enough queues to buffer %" GST_TIME_FORMAT
+ " additional data. Shortening processing latency to %"
+ GST_TIME_FORMAT ".",
+ GST_TIME_ARGS (processing_deadline),
+ GST_TIME_ARGS (min + processing_deadline - max),
+ GST_TIME_ARGS (max - min)));
+ min = max;
+ }
+ }
}
if (l) {
/* we need to add the render delay if we are live */
return res;
}
+/**
+ * gst_base_sink_set_processing_deadline:
+ * @sink: a #GstBaseSink
+ * @processing_deadline: the new processing deadline in nanoseconds.
+ *
+ * Maximum amount of time (in nanoseconds) that the pipeline can take
+ * for processing the buffer. This is added to the latency of live
+ * pipelines.
+ *
+ * This function is usually called by subclasses.
+ *
+ * Since: 1.16
+ */
+void
+gst_base_sink_set_processing_deadline (GstBaseSink * sink,
+ GstClockTime processing_deadline)
+{
+ GstClockTime old_processing_deadline;
+
+ g_return_if_fail (GST_IS_BASE_SINK (sink));
+
+ GST_OBJECT_LOCK (sink);
+ old_processing_deadline = sink->priv->processing_deadline;
+ sink->priv->processing_deadline = processing_deadline;
+ GST_LOG_OBJECT (sink, "set render processing_deadline to %" GST_TIME_FORMAT,
+ GST_TIME_ARGS (processing_deadline));
+ GST_OBJECT_UNLOCK (sink);
+
+ if (processing_deadline != old_processing_deadline) {
+ GST_DEBUG_OBJECT (sink, "posting latency changed");
+ gst_element_post_message (GST_ELEMENT_CAST (sink),
+ gst_message_new_latency (GST_OBJECT_CAST (sink)));
+ }
+}
+
+/**
+ * gst_base_sink_get_processing_deadline:
+ * @sink: a #GstBaseSink
+ *
+ * Get the processing deadline of @sink. see
+ * gst_base_sink_set_processing_deadline() for more information about
+ * the processing deadline.
+ *
+ * Returns: the processing deadline
+ *
+ * Since: 1.16
+ */
+GstClockTime
+gst_base_sink_get_processing_deadline (GstBaseSink * sink)
+{
+ GstClockTimeDiff res;
+
+ g_return_val_if_fail (GST_IS_BASE_SINK (sink), 0);
+
+ GST_OBJECT_LOCK (sink);
+ res = sink->priv->processing_deadline;
+ GST_OBJECT_UNLOCK (sink);
+
+ return res;
+}
+
static void
gst_base_sink_set_property (GObject * object, guint prop_id,
const GValue * value, GParamSpec * pspec)
case PROP_MAX_BITRATE:
gst_base_sink_set_max_bitrate (sink, g_value_get_uint64 (value));
break;
+ case PROP_PROCESSING_DEADLINE:
+ gst_base_sink_set_processing_deadline (sink, g_value_get_uint64 (value));
+ break;
default:
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
break;
case PROP_MAX_BITRATE:
g_value_set_uint64 (value, gst_base_sink_get_max_bitrate (sink));
break;
+ case PROP_PROCESSING_DEADLINE:
+ g_value_set_uint64 (value, gst_base_sink_get_processing_deadline (sink));
+ break;
default:
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
break;
switch (pending) {
case GST_STATE_PLAYING:
{
- GST_DEBUG_OBJECT (basesink, "commiting state to PLAYING");
+ GST_DEBUG_OBJECT (basesink, "committing state to PLAYING");
basesink->need_preroll = FALSE;
post_async_done = TRUE;
- basesink->priv->commited = TRUE;
+ basesink->priv->committed = TRUE;
post_playing = TRUE;
/* post PAUSED too when we were READY */
if (current == GST_STATE_READY) {
break;
}
case GST_STATE_PAUSED:
- GST_DEBUG_OBJECT (basesink, "commiting state to PAUSED");
+ GST_DEBUG_OBJECT (basesink, "committing state to PAUSED");
post_paused = TRUE;
post_async_done = TRUE;
- basesink->priv->commited = TRUE;
+ basesink->priv->committed = TRUE;
post_pending = GST_STATE_VOID_PENDING;
break;
case GST_STATE_READY:
time += base_time;
/* Re-use existing clockid if available */
- /* FIXME: Casting to GstClockEntry only works because the types
- * are the same */
if (G_LIKELY (sink->priv->cached_clock_id != NULL
- && GST_CLOCK_ENTRY_CLOCK ((GstClockEntry *) sink->
- priv->cached_clock_id) == clock)) {
+ && gst_clock_id_uses_clock (sink->priv->cached_clock_id, clock))) {
if (!gst_clock_single_shot_id_reinit (clock, sink->priv->cached_clock_id,
time)) {
gst_clock_id_unref (sink->priv->cached_clock_id);
}
stopping:
{
- GST_DEBUG_OBJECT (sink, "stopping while commiting state");
+ GST_DEBUG_OBJECT (sink, "stopping while committing state");
return GST_FLOW_FLUSHING;
}
preroll_failed:
goto flushing;
/* retry if we got unscheduled, which means we did not reach the timeout
- * yet. if some other error occures, we continue. */
+ * yet. if some other error occurs, we continue. */
} while (status == GST_CLOCK_UNSCHEDULED);
GST_DEBUG_OBJECT (sink, "end of stream");
return res;
}
-static gboolean
-count_list_bytes (GstBuffer ** buffer, guint idx, GstBaseSinkPrivate * priv)
-{
- priv->rc_accumulated += gst_buffer_get_size (*buffer);
- return TRUE;
-}
-
/* with STREAM_LOCK, PREROLL_LOCK
*
* Takes a buffer and compare the timestamps with the last segment.
", end: %" GST_TIME_FORMAT, GST_TIME_ARGS (start), GST_TIME_ARGS (end));
/* a dropped buffer does not participate in anything. Buffer can only be
- * dropped if their PTS falls completly outside the segment, while we sync
+ * dropped if their PTS falls completely outside the segment, while we sync
* preferably on DTS */
if (GST_CLOCK_TIME_IS_VALID (start) && (segment->format == GST_FORMAT_TIME)) {
GstClockTime pts = GST_BUFFER_PTS (sync_buf);
goto dropped;
if (priv->max_bitrate) {
- if (is_list) {
- gst_buffer_list_foreach (GST_BUFFER_LIST_CAST (obj),
- (GstBufferListFunc) count_list_bytes, priv);
- } else {
- priv->rc_accumulated += gst_buffer_get_size (GST_BUFFER_CAST (obj));
- }
+ gsize size;
+
+ if (is_list)
+ size = gst_buffer_list_calculate_size (GST_BUFFER_LIST_CAST (obj));
+ else
+ size = gst_buffer_get_size (GST_BUFFER_CAST (obj));
+
+ priv->rc_accumulated += size;
priv->rc_next = priv->rc_time + gst_util_uint64_scale (priv->rc_accumulated,
8 * GST_SECOND, priv->max_bitrate);
}
if (!gst_pad_peer_query (pad, query)) {
gst_query_unref (query);
- GST_DEBUG_OBJECT (basesink, "peer query faild, no pull mode");
+ GST_DEBUG_OBJECT (basesink, "peer query failed, no pull mode");
goto fallback;
}
GstSegment *segment;
GstClockTime now, latency;
GstClockTimeDiff base_time;
- gint64 time, base, duration;
+ gint64 time, base, offset, duration;
gdouble rate;
gint64 last;
gboolean last_seen, with_clock, in_paused;
else
time = 0;
+ if (GST_CLOCK_TIME_IS_VALID (segment->offset))
+ offset = segment->offset;
+ else
+ offset = 0;
+
if (GST_CLOCK_TIME_IS_VALID (segment->stop))
duration = segment->stop - segment->start;
else
if (rate < 0.0)
time += duration;
- *cur = time + gst_guint64_to_gdouble (now - base_time) * rate;
+ *cur = time + offset + gst_guint64_to_gdouble (now - base_time) * rate;
/* never report more than last seen position */
if (last != -1) {
priv->received_eos = FALSE;
gst_base_sink_reset_qos (basesink);
priv->rc_next = -1;
- priv->commited = FALSE;
+ priv->committed = FALSE;
priv->call_preroll = TRUE;
priv->current_step.valid = FALSE;
priv->pending_step.valid = FALSE;
basesink->need_preroll = TRUE;
basesink->playing_async = TRUE;
priv->call_preroll = TRUE;
- priv->commited = FALSE;
+ priv->committed = FALSE;
if (priv->async_enabled) {
GST_DEBUG_OBJECT (basesink, "doing async state change");
ret = GST_STATE_CHANGE_ASYNC;
"PLAYING to PAUSED, we are not prerolled");
basesink->playing_async = TRUE;
basesink->need_preroll = TRUE;
- priv->commited = FALSE;
+ priv->committed = FALSE;
priv->call_preroll = TRUE;
if (priv->async_enabled) {
GST_DEBUG_OBJECT (basesink, "doing async state change");
gst_base_sink_set_last_buffer_list (basesink, NULL);
priv->call_preroll = FALSE;
- if (!priv->commited) {
+ if (!priv->committed) {
if (priv->async_enabled) {
GST_DEBUG_OBJECT (basesink, "PAUSED to READY, posting async-done");
gst_message_new_async_done (GST_OBJECT_CAST (basesink),
GST_CLOCK_TIME_NONE));
}
- priv->commited = TRUE;
+ priv->committed = TRUE;
} else {
GST_DEBUG_OBJECT (basesink, "PAUSED to READY, don't need_preroll");
}