+2008-10-10 Wim Taymans <wim.taymans@collabora.co.uk>
+
+ * docs/design/part-negotiation.txt:
+ Small doc update.
+
+ * docs/libs/gstreamer-libs-sections.txt:
+ * libs/gst/base/gstbasesink.c: (gst_base_sink_class_init),
+ (gst_base_sink_pad_getcaps), (gst_base_sink_pad_setcaps),
+ (gst_base_sink_init), (gst_base_sink_set_blocksize),
+ (gst_base_sink_get_blocksize), (gst_base_sink_set_property),
+ (gst_base_sink_get_property), (gst_base_sink_needs_preroll),
+ (gst_base_sink_loop), (gst_base_sink_pad_activate),
+ (gst_base_sink_negotiate_pull), (gst_base_sink_pad_activate_pull),
+ (gst_base_sink_change_state):
+ * libs/gst/base/gstbasesink.h:
+ Add blocksize property and methods to control the amount of data
+ to pull.
+ Negotiate first before activating upstream in pull mode so that they can
+ negotiate themselves.
+ When we operate in pull mode, we only accept the caps that we
+ negotiated.
+ Make the sink go ASYNC to PAUSED, like all other sinks.
+ API: GstBaseSink::gst_base_sink_set_blocksize()
+ API: GstBaseSink::gst_base_sink_get_blocksize()
+ API: GstBaseSink::blocksize
+
+ * libs/gst/base/gstbasesrc.c: (gst_base_src_wait_playing),
+ (gst_base_src_set_live), (gst_base_src_is_live),
+ (gst_base_src_set_format), (gst_base_src_query_latency),
+ (gst_base_src_set_blocksize), (gst_base_src_get_blocksize),
+ (gst_base_src_set_do_timestamp), (gst_base_src_get_do_timestamp),
+ (gst_base_src_set_property), (gst_base_src_get_property):
+ * libs/gst/base/gstbasesrc.h:
+ Add typechecking in public API functions.
+ Add methods to control the blocksize in subclasses.
+ API: GstBaseSrc::gst_base_src_set_blocksize()
+ API: GstBaseSrc::gst_base_src_get_blocksize()
+
2008-10-10 Edward Hervey <edward.hervey@collabora.co.uk>
* tests/check/gst/gstutils.c: (probe_do_nothing), (data_probe),
/* the last buffer we prerolled or rendered. Useful for making snapshots */
GstBuffer *last_buffer;
+
+ /* caps for pull based scheduling */
+ GstCaps *pull_caps;
+
+ guint blocksize;
};
#define DO_RUNNING_AVG(avg,val,size) (((val) + ((size)-1) * (avg)) / (size))
/* BaseSink properties */
-#define DEFAULT_SIZE 1024
#define DEFAULT_CAN_ACTIVATE_PULL FALSE /* fixme: enable me */
#define DEFAULT_CAN_ACTIVATE_PUSH TRUE
#define DEFAULT_QOS FALSE
#define DEFAULT_ASYNC TRUE
#define DEFAULT_TS_OFFSET 0
+#define DEFAULT_BLOCKSIZE 4096
enum
{
PROP_ASYNC,
PROP_TS_OFFSET,
PROP_LAST_BUFFER,
+ PROP_BLOCKSIZE,
PROP_LAST
};
static gboolean gst_base_sink_event (GstPad * pad, GstEvent * event);
static gboolean gst_base_sink_peer_query (GstBaseSink * sink, GstQuery * query);
+static gboolean gst_base_sink_negotiate_pull (GstBaseSink * basesink);
+
/* check if an object was too late */
static gboolean gst_base_sink_is_too_late (GstBaseSink * basesink,
GstMiniObject * obj, GstClockTime start, GstClockTime stop,
"The last buffer received in the sink", GST_TYPE_BUFFER,
G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
+ /**
+ * GstBaseSink:blocksize
+ *
+ * The amount of bytes to pull when operating in pull mode.
+ *
+ * Since: 0.10.22
+ */
+ g_object_class_install_property (gobject_class, PROP_BLOCKSIZE,
+ g_param_spec_uint ("blocksize", "Block size",
+ "Size in bytes to pull per buffer (0 = default)", 0, G_MAXUINT,
+ DEFAULT_BLOCKSIZE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
+
gstelement_class->change_state =
GST_DEBUG_FUNCPTR (gst_base_sink_change_state);
gstelement_class->send_event = GST_DEBUG_FUNCPTR (gst_base_sink_send_event);
bsink = GST_BASE_SINK (gst_pad_get_parent (pad));
bclass = GST_BASE_SINK_GET_CLASS (bsink);
- if (bclass->get_caps)
- caps = bclass->get_caps (bsink);
+ if (bsink->pad_mode == GST_ACTIVATE_PULL) {
+ /* if we are operating in pull mode we only accept the negotiated caps */
+ GST_OBJECT_LOCK (pad);
+ if ((caps = GST_PAD_CAPS (pad)))
+ gst_caps_ref (caps);
+ GST_OBJECT_UNLOCK (pad);
+ }
if (caps == NULL) {
- GstPadTemplate *pad_template;
+ if (bclass->get_caps)
+ caps = bclass->get_caps (bsink);
- pad_template =
- gst_element_class_get_pad_template (GST_ELEMENT_CLASS (bclass), "sink");
- if (pad_template != NULL) {
- caps = gst_caps_ref (gst_pad_template_get_caps (pad_template));
+ if (caps == NULL) {
+ GstPadTemplate *pad_template;
+
+ pad_template =
+ gst_element_class_get_pad_template (GST_ELEMENT_CLASS (bclass),
+ "sink");
+ if (pad_template != NULL) {
+ caps = gst_caps_ref (gst_pad_template_get_caps (pad_template));
+ }
}
}
gst_object_unref (bsink);
bsink = GST_BASE_SINK (gst_pad_get_parent (pad));
bclass = GST_BASE_SINK_GET_CLASS (bsink);
- if (bsink->pad_mode == GST_ACTIVATE_PULL) {
- GstPad *peer = gst_pad_get_peer (pad);
-
- if (peer)
- res = gst_pad_set_caps (peer, caps);
- else
- res = FALSE;
-
- if (!res)
- GST_DEBUG_OBJECT (bsink, "peer setcaps() failed");
- }
-
if (res && bclass->set_caps)
res = bclass->set_caps (bsink, caps);
priv->async_enabled = DEFAULT_ASYNC;
priv->ts_offset = DEFAULT_TS_OFFSET;
priv->render_delay = 0;
+ priv->blocksize = DEFAULT_BLOCKSIZE;
GST_OBJECT_FLAG_SET (basesink, GST_ELEMENT_IS_SINK);
}
return res;
}
+/**
+ * gst_base_sink_set_blocksize:
+ * @sink: a #GstBaseSink
+ * @blocksize: the blocksize in bytes
+ *
+ * Set the number of bytes that the sink will pull when it is operating in pull
+ * mode.
+ *
+ * Since: 0.10.22
+ */
+void
+gst_base_sink_set_blocksize (GstBaseSink * sink, guint blocksize)
+{
+ g_return_if_fail (GST_IS_BASE_SINK (sink));
+
+ GST_OBJECT_LOCK (sink);
+ sink->priv->blocksize = blocksize;
+ GST_LOG_OBJECT (sink, "set blocksize to %u", blocksize);
+ GST_OBJECT_UNLOCK (sink);
+}
+
+/**
+ * gst_base_sink_get_blocksize:
+ * @sink: a #GstBaseSink
+ *
+ * Get the number of bytes that the sink will pull when it is operating in pull
+ * mode.
+ *
+ * Returns: the number of bytes @sink will pull in pull mode.
+ *
+ * Since: 0.10.22
+ */
+guint
+gst_base_sink_get_blocksize (GstBaseSink * sink)
+{
+ guint res;
+
+ g_return_val_if_fail (GST_IS_BASE_SINK (sink), 0);
+
+ GST_OBJECT_LOCK (sink);
+ res = sink->priv->blocksize;
+ GST_OBJECT_UNLOCK (sink);
+
+ return res;
+}
+
static void
gst_base_sink_set_property (GObject * object, guint prop_id,
const GValue * value, GParamSpec * pspec)
case PROP_TS_OFFSET:
gst_base_sink_set_ts_offset (sink, g_value_get_int64 (value));
break;
+ case PROP_BLOCKSIZE:
+ gst_base_sink_set_blocksize (sink, g_value_get_uint (value));
+ break;
default:
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
break;
case PROP_LAST_BUFFER:
gst_value_take_buffer (value, gst_base_sink_get_last_buffer (sink));
break;
+ case PROP_BLOCKSIZE:
+ g_value_set_uint (value, gst_base_sink_get_blocksize (sink));
+ break;
default:
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
break;
* 2) we are syncing on the clock
*/
is_prerolled = basesink->have_preroll || basesink->priv->received_eos;
- res = !is_prerolled && basesink->pad_mode != GST_ACTIVATE_PULL;
+ res = !is_prerolled;
+
GST_DEBUG_OBJECT (basesink, "have_preroll: %d, EOS: %d => needs preroll: %d",
basesink->have_preroll, basesink->priv->received_eos, res);
GstBaseSink *basesink;
GstBuffer *buf = NULL;
GstFlowReturn result;
+ guint blocksize;
basesink = GST_BASE_SINK (GST_OBJECT_PARENT (pad));
g_assert (basesink->pad_mode == GST_ACTIVATE_PULL);
+ if ((blocksize = basesink->priv->blocksize) == 0)
+ blocksize = -1;
+
GST_DEBUG_OBJECT (basesink, "pulling %" G_GUINT64_FORMAT ", %u",
- basesink->offset, (guint) DEFAULT_SIZE);
+ basesink->offset, blocksize);
- result = gst_pad_pull_range (pad, basesink->offset, DEFAULT_SIZE, &buf);
+ result = gst_pad_pull_range (pad, basesink->offset, blocksize, &buf);
if (G_UNLIKELY (result != GST_FLOW_OK))
goto paused;
gst_base_sink_set_flushing (basesink, pad, FALSE);
- if (basesink->can_activate_pull && gst_pad_check_pull_range (pad)
- && gst_pad_activate_pull (pad, TRUE)) {
- GST_DEBUG_OBJECT (basesink, "Success activating pull mode");
- result = TRUE;
- } else {
- GST_DEBUG_OBJECT (basesink, "Falling back to push mode");
- if (gst_pad_activate_push (pad, TRUE)) {
- GST_DEBUG_OBJECT (basesink, "Success activating push mode");
- result = TRUE;
- }
+ /* we need to have the pull mode enabled */
+ if (!basesink->can_activate_pull)
+ goto fallback;
+
+ /* check if downstreams supports pull mode at all */
+ if (!gst_pad_check_pull_range (pad))
+ goto fallback;
+
+ /* set the pad mode before starting the task so that it's in the
+ * correct state for the new thread. also the sink set_caps and get_caps
+ * function checks this */
+ basesink->pad_mode = GST_ACTIVATE_PULL;
+
+ /* we first try to negotiate a format so that when we try to activate
+ * downstream, it knows about our format */
+ if (!gst_base_sink_negotiate_pull (basesink))
+ goto fallback;
+
+ /* ok activate now */
+ if (!gst_pad_activate_pull (pad, TRUE)) {
+ /* clear any pending caps */
+ GST_OBJECT_LOCK (basesink);
+ gst_caps_replace (&basesink->priv->pull_caps, NULL);
+ GST_OBJECT_UNLOCK (basesink);
+ goto fallback;
+ }
+
+ GST_DEBUG_OBJECT (basesink, "Success activating pull mode");
+ result = TRUE;
+ goto done;
+
+ /* push mode fallback */
+fallback:
+ GST_DEBUG_OBJECT (basesink, "Falling back to push mode");
+ if ((result = gst_pad_activate_push (pad, TRUE))) {
+ GST_DEBUG_OBJECT (basesink, "Success activating push mode");
}
+done:
if (!result) {
GST_WARNING_OBJECT (basesink, "Could not activate pad in either mode");
gst_base_sink_set_flushing (basesink, pad, TRUE);
} else if (gst_caps_is_fixed (caps)) {
if (!gst_pad_set_caps (GST_BASE_SINK_PAD (basesink), caps))
goto could_not_set_caps;
+
+ GST_OBJECT_LOCK (basesink);
+ gst_caps_replace (&basesink->priv->pull_caps, caps);
+ GST_OBJECT_UNLOCK (basesink);
+
result = TRUE;
}
bclass = GST_BASE_SINK_GET_CLASS (basesink);
if (active) {
- if (!basesink->can_activate_pull) {
- result = FALSE;
- basesink->pad_mode = GST_ACTIVATE_NONE;
- } else {
- GstPad *peer = gst_pad_get_peer (pad);
+ /* we mark we have a newsegment here because pull based
+ * mode works just fine without having a newsegment before the
+ * first buffer */
+ gst_segment_init (&basesink->segment, GST_FORMAT_UNDEFINED);
+ gst_segment_init (basesink->abidata.ABI.clip_segment, GST_FORMAT_UNDEFINED);
+ GST_OBJECT_LOCK (basesink);
+ basesink->have_newsegment = TRUE;
+ GST_OBJECT_UNLOCK (basesink);
- if (G_UNLIKELY (peer == NULL)) {
- g_warning ("Trying to activate pad in pull mode, but no peer");
- result = FALSE;
- basesink->pad_mode = GST_ACTIVATE_NONE;
- } else {
- if (gst_pad_activate_pull (peer, TRUE)) {
- /* we mark we have a newsegment here because pull based
- * mode works just fine without having a newsegment before the
- * first buffer */
- gst_segment_init (&basesink->segment, GST_FORMAT_UNDEFINED);
- gst_segment_init (basesink->abidata.ABI.clip_segment,
- GST_FORMAT_UNDEFINED);
- GST_OBJECT_LOCK (basesink);
- basesink->have_newsegment = TRUE;
- GST_OBJECT_UNLOCK (basesink);
+ if (bclass->activate_pull)
+ result = bclass->activate_pull (basesink, TRUE);
+ else
+ result = FALSE;
- /* set the pad mode before starting the task so that it's in the
- correct state for the new thread. also the sink set_caps function
- checks this */
- basesink->pad_mode = GST_ACTIVATE_PULL;
- if ((result = gst_base_sink_negotiate_pull (basesink))) {
- if (bclass->activate_pull)
- result = bclass->activate_pull (basesink, TRUE);
- else
- result = FALSE;
- }
- /* but if starting the thread fails, set it back */
- if (!result)
- basesink->pad_mode = GST_ACTIVATE_NONE;
- } else {
- GST_DEBUG_OBJECT (pad, "Failed to activate peer in pull mode");
- result = FALSE;
- basesink->pad_mode = GST_ACTIVATE_NONE;
- }
- gst_object_unref (peer);
- }
- }
+ /* but if starting the thread fails, set it back */
+ if (!result)
+ basesink->pad_mode = GST_ACTIVATE_NONE;
} else {
if (G_UNLIKELY (basesink->pad_mode != GST_ACTIVATE_PULL)) {
g_warning ("Internal GStreamer activation error!!!");
if (bclass->activate_pull)
result &= bclass->activate_pull (basesink, FALSE);
basesink->pad_mode = GST_ACTIVATE_NONE;
+ /* clear any pending caps */
+ GST_OBJECT_LOCK (basesink);
+ gst_caps_replace (&basesink->priv->pull_caps, NULL);
+ GST_OBJECT_UNLOCK (basesink);
}
}
-
gst_object_unref (basesink);
return result;
switch (transition) {
case GST_STATE_CHANGE_READY_TO_PAUSED:
+#if 0
/* note that this is the upward case, which doesn't follow most
patterns */
if (basesink->pad_mode == GST_ACTIVATE_PULL) {
GST_PAD_PREROLL_UNLOCK (basesink->sinkpad);
ret = GST_STATE_CHANGE_SUCCESS;
}
+#endif
break;
case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
GST_DEBUG_OBJECT (basesink, "PLAYING to PAUSED");
GstFlowReturn
gst_base_src_wait_playing (GstBaseSrc * src)
{
+ g_return_val_if_fail (GST_IS_BASE_SRC (src), GST_FLOW_ERROR);
+
/* block until the state changes, or we get a flush, or something */
GST_DEBUG_OBJECT (src, "live source waiting for running state");
GST_LIVE_WAIT (src);
void
gst_base_src_set_live (GstBaseSrc * src, gboolean live)
{
+ g_return_if_fail (GST_IS_BASE_SRC (src));
+
GST_OBJECT_LOCK (src);
src->is_live = live;
GST_OBJECT_UNLOCK (src);
{
gboolean result;
+ g_return_val_if_fail (GST_IS_BASE_SRC (src), FALSE);
+
GST_OBJECT_LOCK (src);
result = src->is_live;
GST_OBJECT_UNLOCK (src);
void
gst_base_src_set_format (GstBaseSrc * src, GstFormat format)
{
+ g_return_if_fail (GST_IS_BASE_SRC (src));
+
gst_segment_init (&src->segment, format);
}
{
GstClockTime min;
+ g_return_val_if_fail (GST_IS_BASE_SRC (src), FALSE);
+
GST_OBJECT_LOCK (src);
if (live)
*live = src->is_live;
}
/**
+ * gst_base_src_set_blocksize:
+ * @src: the source
+ * @blocksize: the new blocksize in bytes
+ *
+ * Set the number of bytes that @src will push out with each buffer.
+ *
+ * Since: 0.10.22
+ */
+void
+gst_base_src_set_blocksize (GstBaseSrc * src, gulong blocksize)
+{
+ g_return_if_fail (GST_IS_BASE_SRC (src));
+
+ GST_OBJECT_LOCK (src);
+ src->blocksize = blocksize;
+ GST_OBJECT_UNLOCK (src);
+}
+
+/**
+ * gst_base_src_get_blocksize:
+ * @src: the source
+ *
+ * Get the number of bytes that @src will push out with each buffer.
+ *
+ * Returns: the number of bytes pushed with each buffer.
+ *
+ * Since: 0.10.22
+ */
+gulong
+gst_base_src_get_blocksize (GstBaseSrc * src)
+{
+ gulong res;
+
+ g_return_val_if_fail (GST_IS_BASE_SRC (src), 0);
+
+ GST_OBJECT_LOCK (src);
+ res = src->blocksize;
+ GST_OBJECT_UNLOCK (src);
+
+ return res;
+}
+
+
+/**
* gst_base_src_set_do_timestamp:
* @src: the source
* @timestamp: enable or disable timestamping
void
gst_base_src_set_do_timestamp (GstBaseSrc * src, gboolean timestamp)
{
+ g_return_if_fail (GST_IS_BASE_SRC (src));
+
GST_OBJECT_LOCK (src);
src->priv->do_timestamp = timestamp;
GST_OBJECT_UNLOCK (src);
{
gboolean res;
+ g_return_val_if_fail (GST_IS_BASE_SRC (src), FALSE);
+
GST_OBJECT_LOCK (src);
res = src->priv->do_timestamp;
GST_OBJECT_UNLOCK (src);
switch (prop_id) {
case PROP_BLOCKSIZE:
- src->blocksize = g_value_get_ulong (value);
+ gst_base_src_set_blocksize (src, g_value_get_ulong (value));
break;
case PROP_NUM_BUFFERS:
src->num_buffers = g_value_get_int (value);
src->data.ABI.typefind = g_value_get_boolean (value);
break;
case PROP_DO_TIMESTAMP:
- src->priv->do_timestamp = g_value_get_boolean (value);
+ gst_base_src_set_do_timestamp (src, g_value_get_boolean (value));
break;
default:
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
switch (prop_id) {
case PROP_BLOCKSIZE:
- g_value_set_ulong (value, src->blocksize);
+ g_value_set_ulong (value, gst_base_src_get_blocksize (src));
break;
case PROP_NUM_BUFFERS:
g_value_set_int (value, src->num_buffers);
g_value_set_boolean (value, src->data.ABI.typefind);
break;
case PROP_DO_TIMESTAMP:
- g_value_set_boolean (value, src->priv->do_timestamp);
+ g_value_set_boolean (value, gst_base_src_get_do_timestamp (src));
break;
default:
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);