2005-05-25 Wim Taymans <wim@fluendo.com>
+ * gst/base/gstadapter.c: (gst_adapter_peek), (gst_adapter_flush):
+ * gst/base/gstbasesink.c: (gst_basesink_preroll_queue_push),
+ (gst_basesink_finish_preroll), (gst_basesink_chain),
+ (gst_basesink_loop), (gst_basesink_activate),
+ (gst_basesink_change_state):
+ * gst/base/gstbasesrc.c: (gst_basesrc_do_seek),
+ (gst_basesrc_get_range), (gst_basesrc_loop),
+ (gst_basesrc_activate):
+ * gst/elements/gsttee.c: (gst_tee_sink_activate):
+ * gst/gstpad.c: (gst_pad_dispose), (gst_real_pad_class_init),
+ (gst_real_pad_init), (gst_real_pad_set_property),
+ (gst_real_pad_get_property), (gst_pad_set_active),
+ (gst_pad_is_active), (gst_pad_get_query_types), (gst_pad_unlink),
+ (gst_pad_link_prepare), (gst_pad_link), (gst_pad_get_real_parent),
+ (gst_real_pad_get_caps_unlocked), (gst_pad_peer_get_caps),
+ (gst_pad_accept_caps), (gst_pad_get_peer), (gst_pad_realize),
+ (gst_pad_event_default_dispatch), (gst_pad_event_default),
+ (gst_pad_dispatcher), (gst_pad_query), (gst_real_pad_dispose),
+ (gst_pad_save_thyself), (handle_pad_block), (gst_pad_chain),
+ (gst_pad_push), (gst_pad_get_range), (gst_pad_pull_range),
+ (gst_pad_send_event), (gst_pad_start_task), (gst_pad_pause_task),
+ (gst_pad_stop_task):
+ * gst/gstpad.h:
+ * gst/gstqueue.c: (gst_queue_handle_sink_event), (gst_queue_chain),
+ (gst_queue_loop), (gst_queue_src_activate):
+ * gst/gsttask.c: (gst_task_init), (gst_task_set_lock),
+ (gst_task_get_state):
+ * gst/gsttask.h:
+ * gst/schedulers/threadscheduler.c:
+ (gst_thread_scheduler_task_start), (gst_thread_scheduler_func):
+ Implement gst_pad_pause/start/stop_task(), take STREAM lock
+ in task function.
+ Remove ACTIVE pad flag, use FLUSHING everywhere
+ Added _pad_chain(), _pad_get_range() to call chain/getrange
+ functions.
+ Add locks around IS_FLUSHING when reading.
+ Take STREAM lock in chain(), get_range() functions so plugins
+ don't need to take it anymore.
+
+
+
+2005-05-25 Wim Taymans <wim@fluendo.com>
+
* tools/gst-launch.c: (event_loop):
Unref message after using its contents instead of
before.
if (adapter->assembled_size < size) {
adapter->assembled_size = (size / DEFAULT_SIZE + 1) * DEFAULT_SIZE;
- GST_DEBUG_OBJECT (adapter, "setting size of internal buffer to %u\n",
+ GST_DEBUG_OBJECT (adapter, "setting size of internal buffer to %u",
adapter->assembled_size);
adapter->assembled_data =
g_realloc (adapter->assembled_data, adapter->assembled_size);
g_return_if_fail (flush > 0);
g_return_if_fail (flush <= adapter->size);
- GST_LOG_OBJECT (adapter, "flushing %u bytes\n", flush);
+ GST_LOG_OBJECT (adapter, "flushing %u bytes", flush);
adapter->size -= flush;
adapter->assembled_len = 0;
while (flush > 0) {
if (basesink->preroll_queue->length == 0) {
GstBaseSinkClass *bclass = GST_BASESINK_GET_CLASS (basesink);
+ GST_DEBUG ("preroll buffer with TS: %" GST_TIME_FORMAT,
+ GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (buffer)));
if (bclass->preroll)
bclass->preroll (basesink, buffer);
}
gst_basesink_finish_preroll (GstBaseSink * basesink, GstPad * pad,
GstBuffer * buffer)
{
- gboolean usable;
+ gboolean flushing;
DEBUG ("finish preroll %p <\n", basesink);
/* lock order is important */
gst_element_commit_state (GST_ELEMENT (basesink));
GST_STATE_UNLOCK (basesink);
+ gst_basesink_preroll_queue_push (basesink, pad, buffer);
+
GST_LOCK (pad);
- usable = !GST_RPAD_IS_FLUSHING (pad) && GST_RPAD_IS_ACTIVE (pad);
+ flushing = GST_RPAD_IS_FLUSHING (pad);
GST_UNLOCK (pad);
- if (!usable)
- goto unusable;
-
- gst_basesink_preroll_queue_push (basesink, pad, buffer);
+ if (flushing)
+ goto flushing;
if (basesink->need_preroll)
goto still_queueing;
GST_STATE_UNLOCK (basesink);
return PREROLL_PLAYING;
}
-unusable:
+flushing:
{
GST_DEBUG ("pad is flushing");
GST_PREROLL_UNLOCK (pad);
g_assert (GST_BASESINK (GST_OBJECT_PARENT (pad))->pad_mode ==
GST_ACTIVATE_PUSH);
- GST_STREAM_LOCK (pad);
-
result = gst_basesink_chain_unlocked (pad, buf);
- GST_STREAM_UNLOCK (pad);
-
return result;
}
g_assert (basesink->pad_mode == GST_ACTIVATE_PULL);
- GST_STREAM_LOCK (pad);
-
result = gst_pad_pull_range (pad, basesink->offset, DEFAULT_SIZE, &buf);
if (result != GST_FLOW_OK)
goto paused;
goto paused;
/* default */
- GST_STREAM_UNLOCK (pad);
return;
paused:
- gst_task_pause (GST_RPAD_TASK (pad));
- GST_STREAM_UNLOCK (pad);
+ gst_pad_pause_task (pad);
return;
}
/* if we have a scheduler we can start the task */
g_return_val_if_fail (basesink->has_loop, FALSE);
gst_pad_peer_set_active (pad, mode);
- if (GST_ELEMENT_SCHEDULER (basesink)) {
- GST_STREAM_LOCK (pad);
- GST_RPAD_TASK (pad) =
- gst_scheduler_create_task (GST_ELEMENT_SCHEDULER (basesink),
- (GstTaskFunction) gst_basesink_loop, pad);
-
- gst_task_start (GST_RPAD_TASK (pad));
- GST_STREAM_UNLOCK (pad);
- result = TRUE;
- }
+ result =
+ gst_pad_start_task (pad, (GstTaskFunction) gst_basesink_loop, pad);
break;
case GST_ACTIVATE_NONE:
/* step 1, unblock clock sync (if any) or any other blocking thing */
GST_PREROLL_UNLOCK (pad);
/* step 2, make sure streaming finishes */
- GST_STREAM_LOCK (pad);
- /* step 3, stop the task */
- if (GST_RPAD_TASK (pad)) {
- gst_task_stop (GST_RPAD_TASK (pad));
- gst_object_unref (GST_OBJECT (GST_RPAD_TASK (pad)));
- GST_RPAD_TASK (pad) = NULL;
- }
- GST_STREAM_UNLOCK (pad);
-
- result = TRUE;
+ result = gst_pad_stop_task (pad);
break;
}
basesink->pad_mode = mode;
basesink->have_preroll = FALSE;
GST_PREROLL_UNLOCK (basesink->sinkpad);
- /* make sure the element is finished processing */
- GST_STREAM_LOCK (basesink->sinkpad);
- GST_STREAM_UNLOCK (basesink->sinkpad);
/* clear EOS state */
basesink->eos = FALSE;
break;
}
/* and restart the task */
- if (GST_RPAD_TASK (src->srcpad)) {
- gst_task_start (GST_RPAD_TASK (src->srcpad));
- }
+ gst_pad_start_task (src->srcpad, (GstTaskFunction) gst_basesrc_loop,
+ src->srcpad);
GST_STREAM_UNLOCK (src->srcpad);
gst_event_unref (event);
}
static GstFlowReturn
-gst_basesrc_get_range_unlocked (GstPad * pad, guint64 offset, guint length,
+gst_basesrc_get_range (GstPad * pad, guint64 offset, guint length,
GstBuffer ** buf)
{
GstFlowReturn ret;
}
}
-static GstFlowReturn
-gst_basesrc_get_range (GstPad * pad, guint64 offset, guint length,
- GstBuffer ** ret)
-{
- GstFlowReturn fret;
-
- GST_STREAM_LOCK (pad);
-
- fret = gst_basesrc_get_range_unlocked (pad, offset, length, ret);
-
- GST_STREAM_UNLOCK (pad);
-
- return fret;
-}
-
static gboolean
gst_basesrc_check_get_range (GstPad * pad)
{
src = GST_BASESRC (GST_OBJECT_PARENT (pad));
- GST_STREAM_LOCK (pad);
-
- ret = gst_basesrc_get_range_unlocked (pad, src->offset, src->blocksize, &buf);
+ ret = gst_basesrc_get_range (pad, src->offset, src->blocksize, &buf);
if (ret != GST_FLOW_OK)
goto eos;
if (ret != GST_FLOW_OK)
goto pause;
- GST_STREAM_UNLOCK (pad);
return;
eos:
{
GST_DEBUG_OBJECT (src, "going to EOS");
- gst_task_pause (GST_RPAD_TASK (pad));
+ gst_pad_pause_task (pad);
gst_pad_push_event (pad, gst_event_new (GST_EVENT_EOS));
- GST_STREAM_UNLOCK (pad);
return;
}
pause:
{
GST_DEBUG_OBJECT (src, "pausing task");
- gst_task_pause (GST_RPAD_TASK (pad));
- GST_STREAM_UNLOCK (pad);
+ gst_pad_pause_task (pad);
return;
}
}
result = FALSE;
switch (mode) {
case GST_ACTIVATE_PUSH:
- /* if we have a scheduler we can start the task */
- if (GST_ELEMENT_SCHEDULER (basesrc)) {
- GST_STREAM_LOCK (pad);
- GST_RPAD_TASK (pad) =
- gst_scheduler_create_task (GST_ELEMENT_SCHEDULER (basesrc),
- (GstTaskFunction) gst_basesrc_loop, pad);
-
- gst_task_start (GST_RPAD_TASK (pad));
- GST_STREAM_UNLOCK (pad);
- result = TRUE;
- }
+ result =
+ gst_pad_start_task (pad, (GstTaskFunction) gst_basesrc_loop, pad);
break;
case GST_ACTIVATE_PULL:
result = TRUE;
gst_basesrc_unlock (basesrc);
/* step 2, make sure streaming finishes */
- GST_STREAM_LOCK (pad);
- /* step 3, stop the task */
- if (GST_RPAD_TASK (pad)) {
- gst_task_stop (GST_RPAD_TASK (pad));
- gst_object_unref (GST_OBJECT (GST_RPAD_TASK (pad)));
- GST_RPAD_TASK (pad) = NULL;
- }
- GST_STREAM_UNLOCK (pad);
-
- result = TRUE;
+ result = gst_pad_stop_task (pad);
break;
}
return result;
break;
case GST_ACTIVATE_PULL:
g_return_val_if_fail (tee->has_sink_loop, FALSE);
- if (GST_ELEMENT_SCHEDULER (tee)) {
- GST_STREAM_LOCK (pad);
- GST_RPAD_TASK (pad) =
- gst_scheduler_create_task (GST_ELEMENT_SCHEDULER (tee),
- (GstTaskFunction) gst_tee_loop, pad);
-
- gst_pad_start_task (pad);
- GST_STREAM_UNLOCK (pad);
- result = TRUE;
- }
+ result = gst_pad_start_task (pad, (GstTaskFunction) gst_tee_loop, pad);
break;
case GST_ACTIVATE_NONE:
- GST_STREAM_LOCK (pad);
- if (GST_RPAD_TASK (pad)) {
- gst_pad_stop_task (pad);
- gst_object_unref (GST_OBJECT (GST_RPAD_TASK (pad)));
- GST_RPAD_TASK (pad) = NULL;
- }
- GST_STREAM_UNLOCK (pad);
-
- result = TRUE;
+ result = gst_pad_stop_task (pad);
break;
}
tee->sink_mode = mode;
static void
gst_pad_dispose (GObject * object)
{
- GstPad *pad = GST_PAD (object);
+ GstPad *pad = GST_PAD_CAST (object);
gst_pad_set_pad_template (pad, NULL);
/* FIXME, we have links to many other things like caps
{
REAL_ARG_0,
REAL_ARG_CAPS,
- REAL_ARG_ACTIVE
- /* FILL ME */
+ /* FILL ME */
};
static void gst_real_pad_class_init (GstRealPadClass * klass);
G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRealPadClass, request_link), NULL,
NULL, gst_marshal_VOID__OBJECT, G_TYPE_NONE, 0);
- g_object_class_install_property (G_OBJECT_CLASS (klass), REAL_ARG_ACTIVE,
- g_param_spec_boolean ("active", "Active", "Whether the pad is active.",
- TRUE, G_PARAM_READWRITE));
g_object_class_install_property (G_OBJECT_CLASS (klass), REAL_ARG_CAPS,
g_param_spec_boxed ("caps", "Caps", "The capabilities of the pad",
GST_TYPE_CAPS, G_PARAM_READABLE));
pad->queryfunc = gst_pad_query_default;
pad->intlinkfunc = gst_pad_get_internal_links_default;
- GST_FLAG_UNSET (pad, GST_PAD_ACTIVE);
+ GST_RPAD_UNSET_FLUSHING (pad);
pad->preroll_lock = g_mutex_new ();
pad->preroll_cond = g_cond_new ();
g_return_if_fail (GST_IS_PAD (object));
switch (prop_id) {
- case REAL_ARG_ACTIVE:
- g_warning ("FIXME: not useful any more!!!");
- gst_pad_set_active (GST_PAD (object), g_value_get_boolean (value));
- break;
default:
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
break;
g_return_if_fail (GST_IS_PAD (object));
switch (prop_id) {
- case REAL_ARG_ACTIVE:
- g_value_set_boolean (value, GST_FLAG_IS_SET (object, GST_PAD_ACTIVE));
- break;
case REAL_ARG_CAPS:
g_value_set_boxed (value, GST_PAD_CAPS (GST_REAL_PAD (object)));
break;
gst_pad_set_active (GstPad * pad, GstActivateMode mode)
{
GstRealPad *realpad;
- gboolean old;
+ GstActivateMode old;
GstPadActivateFunction activatefunc;
gboolean active;
GST_PAD_REALIZE_AND_LOCK (pad, realpad, lost_ghostpad);
active = GST_PAD_MODE_ACTIVATE (mode);
- old = GST_PAD_IS_ACTIVE (realpad);
+ old = GST_RPAD_ACTIVATE_MODE (realpad);
/* if nothing changed, we can just exit */
- if (G_UNLIKELY (old == active))
+ if (G_UNLIKELY (old == mode))
goto was_ok;
/* make sure data is disallowed when going inactive */
if (!active) {
GST_CAT_DEBUG (GST_CAT_PADS, "de-activating pad %s:%s",
GST_DEBUG_PAD_NAME (realpad));
- GST_FLAG_UNSET (realpad, GST_PAD_ACTIVE);
+ GST_RPAD_SET_FLUSHING (realpad);
/* unlock blocked pads so element can resume and stop */
GST_PAD_BLOCK_SIGNAL (realpad);
}
GST_LOCK (realpad);
if (result == FALSE)
goto activate_error;
+
+ /* store the mode */
+ GST_RPAD_ACTIVATE_MODE (realpad) = mode;
}
/* when going to active allow data passing now */
if (active) {
GST_CAT_DEBUG (GST_CAT_PADS, "activating pad %s:%s in mode %d",
GST_DEBUG_PAD_NAME (realpad), mode);
- GST_FLAG_SET (realpad, GST_PAD_ACTIVE);
- }
- GST_UNLOCK (realpad);
+ GST_RPAD_UNSET_FLUSHING (realpad);
+ GST_UNLOCK (realpad);
+ } else {
+ GST_UNLOCK (realpad);
+ /* and make streaming finish */
+ GST_STREAM_LOCK (realpad);
+ GST_STREAM_UNLOCK (realpad);
+ }
return TRUE;
was_ok:
g_return_val_if_fail (GST_IS_PAD (pad), FALSE);
GST_PAD_REALIZE_AND_LOCK (pad, realpad, lost_ghostpad);
- result = GST_FLAG_IS_SET (realpad, GST_PAD_ACTIVE);
+ result = GST_PAD_MODE_ACTIVATE (GST_RPAD_ACTIVATE_MODE (realpad));
GST_UNLOCK (realpad);
return result;
if (G_UNLIKELY ((func = GST_RPAD_QUERYTYPEFUNC (rpad)) == NULL))
goto no_func;
- return func (GST_PAD (rpad));
+ return func (GST_PAD_CAST (rpad));
no_func:
{
goto not_linked_together;
if (GST_RPAD_UNLINKFUNC (realsrc)) {
- GST_RPAD_UNLINKFUNC (realsrc) (GST_PAD (realsrc));
+ GST_RPAD_UNLINKFUNC (realsrc) (GST_PAD_CAST (realsrc));
}
if (GST_RPAD_UNLINKFUNC (realsink)) {
- GST_RPAD_UNLINKFUNC (realsink) (GST_PAD (realsink));
+ GST_RPAD_UNLINKFUNC (realsink) (GST_PAD_CAST (realsink));
}
/* first clear peers */
if (G_UNLIKELY (GST_RPAD_PEER (realsink) != NULL))
goto sink_was_linked;
- if ((GST_PAD (realsrc) != srcpad) || (GST_PAD (realsink) != sinkpad)) {
+ if ((GST_PAD_CAST (realsrc) != srcpad)
+ || (GST_PAD_CAST (realsink) != sinkpad)) {
GST_CAT_INFO (GST_CAT_PADS, "*actually* linking %s:%s and %s:%s",
GST_DEBUG_PAD_NAME (realsrc), GST_DEBUG_PAD_NAME (realsink));
}
if (GST_RPAD_LINKFUNC (realsrc)) {
/* this one will call the peer link function */
result =
- GST_RPAD_LINKFUNC (realsrc) (GST_PAD (realsrc), GST_PAD (realsink));
+ GST_RPAD_LINKFUNC (realsrc) (GST_PAD_CAST (realsrc),
+ GST_PAD_CAST (realsink));
} else if (GST_RPAD_LINKFUNC (realsink)) {
/* if no source link function, we need to call the sink link
* function ourselves. */
result =
- GST_RPAD_LINKFUNC (realsink) (GST_PAD (realsink), GST_PAD (realsrc));
+ GST_RPAD_LINKFUNC (realsink) (GST_PAD_CAST (realsink),
+ GST_PAD_CAST (realsrc));
} else {
result = GST_PAD_LINK_OK;
}
GST_PAD_REALIZE_AND_LOCK (pad, realpad, lost_ghostpad);
element = GST_PAD_PARENT (realpad);
if (element)
- gst_object_ref (GST_OBJECT (element));
+ gst_object_ref (GST_OBJECT_CAST (element));
GST_UNLOCK (realpad);
return element;
GST_FLAG_SET (realpad, GST_PAD_IN_GETCAPS);
GST_UNLOCK (realpad);
- result = GST_RPAD_GETCAPSFUNC (realpad) (GST_PAD (realpad));
+ result = GST_RPAD_GETCAPSFUNC (realpad) (GST_PAD_CAST (realpad));
GST_LOCK (realpad);
GST_FLAG_UNSET (realpad, GST_PAD_IN_GETCAPS);
if (G_UNLIKELY (GST_RPAD_IS_IN_GETCAPS (peerpad)))
goto was_dispatching;
- gst_object_ref (GST_OBJECT (peerpad));
+ gst_object_ref (GST_OBJECT_CAST (peerpad));
GST_UNLOCK (realpad);
result = gst_pad_get_caps (GST_PAD_CAST (peerpad));
- gst_object_unref (GST_OBJECT (peerpad));
+ gst_object_unref (GST_OBJECT_CAST (peerpad));
return result;
GST_PAD_REALIZE_AND_LOCK (pad, realpad, lost_ghostpad);
result = GST_RPAD_PEER (realpad);
if (result)
- gst_object_ref (GST_OBJECT (result));
+ gst_object_ref (GST_OBJECT_CAST (result));
GST_UNLOCK (realpad);
return GST_PAD_CAST (result);
GST_LOCK (pad);
result = GST_PAD_REALIZE (pad);
if (result && pad != GST_PAD_CAST (result)) {
- gst_object_ref (GST_OBJECT (result));
+ gst_object_ref (GST_OBJECT_CAST (result));
GST_UNLOCK (pad);
/* no other thread could dispose this since we
* hold at least one ref */
- gst_object_unref (GST_OBJECT (pad));
+ gst_object_unref (GST_OBJECT_CAST (pad));
} else {
GST_UNLOCK (pad);
}
orig = pads = gst_pad_get_internal_links (pad);
while (pads) {
- GstPad *eventpad = GST_PAD (pads->data);
+ GstPad *eventpad = GST_PAD_CAST (pads->data);
pads = g_list_next (pads);
if (GST_RPAD_TASK (rpad)) {
GST_DEBUG_OBJECT (rpad, "pausing task because of eos");
- gst_task_pause (GST_RPAD_TASK (rpad));
+ gst_pad_pause_task (GST_PAD_CAST (rpad));
}
}
default:
GstRealPad *int_peer = GST_RPAD_PEER (int_rpad);
if (int_peer) {
- res = dispatch (GST_PAD (int_peer), data);
+ res = dispatch (GST_PAD_CAST (int_peer), data);
if (res)
break;
}
GstPad *pad;
GstRealPad *rpad;
- pad = GST_PAD (object);
+ pad = GST_PAD_CAST (object);
rpad = GST_REAL_PAD (object);
/* No linked pad can ever be disposed.
orig = ghostpads = g_list_copy (rpad->ghostpads);
while (ghostpads) {
- GstPad *ghostpad = GST_PAD (ghostpads->data);
+ GstPad *ghostpad = GST_PAD_CAST (ghostpads->data);
if (GST_IS_ELEMENT (GST_OBJECT_PARENT (ghostpad))) {
GstElement *parent = GST_ELEMENT (GST_OBJECT_PARENT (ghostpad));
if (GST_RPAD_PEER (realpad) != NULL) {
gchar *content;
- peer = GST_PAD (GST_RPAD_PEER (realpad));
+ peer = GST_PAD_CAST (GST_RPAD_PEER (realpad));
/* first check to see if the peer's parent's parent is the same */
/* we just save it off */
content = g_strdup_printf ("%s.%s",
"signal block taken on pad %s:%s", GST_DEBUG_PAD_NAME (pad));
/* need to grab extra ref for the callbacks */
- gst_object_ref (GST_OBJECT (pad));
+ gst_object_ref (GST_OBJECT_CAST (pad));
callback = pad->block_callback;
if (callback) {
GST_PAD_BLOCK_SIGNAL (pad);
}
- gst_object_unref (GST_OBJECT (pad));
+ gst_object_unref (GST_OBJECT_CAST (pad));
}
/**********************************************************************
* Data passing functions
*/
-
/**
- * gst_pad_push:
- * @pad: a source #GstPad.
- * @buffer: the #GstBuffer to push.
+ * gst_pad_chain:
+ * @pad: a sink #GstPad.
+ * @buffer: the #GstBuffer to send.
*
- * Pushes a buffer to the peer of @pad. @pad must be linked.
+ * Chain a buffer to @pad. The pad has to be a GstRealPad.
*
- * Returns: a #GstFlowReturn from the peer pad.
+ * Returns: a #GstFlowReturn from the pad.
*
* MT safe.
*/
GstFlowReturn
-gst_pad_push (GstPad * pad, GstBuffer * buffer)
+gst_pad_chain (GstPad * pad, GstBuffer * buffer)
{
- GstRealPad *peer;
- GstFlowReturn ret;
- GstPadChainFunction chainfunc;
GstCaps *caps;
gboolean caps_changed;
+ GstPadChainFunction chainfunc;
+ GstFlowReturn ret;
g_return_val_if_fail (GST_IS_REAL_PAD (pad), GST_FLOW_ERROR);
- g_return_val_if_fail (GST_RPAD_DIRECTION (pad) == GST_PAD_SRC,
+ g_return_val_if_fail (GST_RPAD_DIRECTION (pad) == GST_PAD_SINK,
GST_FLOW_ERROR);
g_return_val_if_fail (buffer != NULL, GST_FLOW_ERROR);
g_return_val_if_fail (GST_IS_BUFFER (buffer), GST_FLOW_ERROR);
+ GST_STREAM_LOCK (pad);
GST_LOCK (pad);
- while (G_UNLIKELY (GST_RPAD_IS_BLOCKED (pad)))
- handle_pad_block (GST_REAL_PAD_CAST (pad));
-
- if (G_UNLIKELY ((peer = GST_RPAD_PEER (pad)) == NULL))
- goto not_linked;
-
- if (G_UNLIKELY (!GST_RPAD_IS_ACTIVE (peer)))
- goto not_active;
-
- if (G_UNLIKELY (GST_RPAD_IS_FLUSHING (peer)))
+ if (G_UNLIKELY (GST_RPAD_IS_FLUSHING (pad)))
goto flushing;
- gst_object_ref (GST_OBJECT_CAST (peer));
+ caps = GST_BUFFER_CAPS (buffer);
+ caps_changed = caps && caps != GST_RPAD_CAPS (pad);
GST_UNLOCK (pad);
- /* FIXME, move capnego this into a base class? */
- caps = GST_BUFFER_CAPS (buffer);
- caps_changed = caps && caps != GST_RPAD_CAPS (peer);
- /* we got a new datatype on the peer pad, see if it can handle it */
+ /* we got a new datatype on the pad, see if it can handle it */
if (G_UNLIKELY (caps_changed)) {
GST_DEBUG ("caps changed to %" GST_PTR_FORMAT, caps);
- if (G_UNLIKELY (!gst_pad_configure_sink (GST_PAD_CAST (peer), caps)))
+ if (G_UNLIKELY (!gst_pad_configure_sink (pad, caps)))
goto not_negotiated;
}
- /* NOTE: we read the peer chainfunc unlocked.
- * we cannot hold the lock for the peer so we might send
+ /* NOTE: we read the chainfunc unlocked.
+ * we cannot hold the lock for the pad so we might send
* the data to the wrong function. This is not really a
* problem since functions are assigned at creation time
* and don't change that often... */
- if (G_UNLIKELY ((chainfunc = peer->chainfunc) == NULL))
+ if (G_UNLIKELY ((chainfunc = GST_RPAD_CHAINFUNC (pad)) == NULL))
goto no_function;
GST_CAT_LOG_OBJECT (GST_CAT_SCHEDULING, pad,
- "calling chainfunction &%s of peer pad %s:%s",
- GST_DEBUG_FUNCPTR_NAME (chainfunc), GST_DEBUG_PAD_NAME (peer));
+ "calling chainfunction &%s of pad %s:%s",
+ GST_DEBUG_FUNCPTR_NAME (chainfunc), GST_DEBUG_PAD_NAME (pad));
- ret = chainfunc (GST_PAD_CAST (peer), buffer);
-
- gst_object_unref (GST_OBJECT_CAST (peer));
+ ret = chainfunc (pad, buffer);
+ GST_STREAM_UNLOCK (pad);
return ret;
- /* ERROR recovery here */
-not_linked:
- {
- gst_buffer_unref (buffer);
- GST_CAT_LOG_OBJECT (GST_CAT_SCHEDULING, pad,
- "pushing, but it was not linked");
- GST_UNLOCK (pad);
- return GST_FLOW_NOT_CONNECTED;
- }
-not_active:
- {
- gst_buffer_unref (buffer);
- GST_CAT_LOG_OBJECT (GST_CAT_SCHEDULING, pad,
- "pushing, but it was inactive");
- GST_UNLOCK (pad);
- return GST_FLOW_WRONG_STATE;
- }
+ /* ERRORS */
flushing:
{
gst_buffer_unref (buffer);
GST_CAT_LOG_OBJECT (GST_CAT_SCHEDULING, pad,
"pushing, but pad was flushing");
GST_UNLOCK (pad);
+ GST_STREAM_UNLOCK (pad);
return GST_FLOW_UNEXPECTED;
}
not_negotiated:
{
gst_buffer_unref (buffer);
GST_CAT_LOG_OBJECT (GST_CAT_SCHEDULING, pad,
- "pushing buffer but peer did not accept");
+ "pushing buffer but pad did not accept");
+ GST_STREAM_UNLOCK (pad);
return GST_FLOW_NOT_NEGOTIATED;
}
no_function:
GST_CAT_LOG_OBJECT (GST_CAT_SCHEDULING, pad,
"pushing, but not chainhandler");
GST_ELEMENT_ERROR (GST_PAD_PARENT (pad), CORE, PAD, (NULL),
- ("push on pad %s:%s but the peer pad %s:%s has no chainfunction",
- GST_DEBUG_PAD_NAME (pad), GST_DEBUG_PAD_NAME (peer)));
- gst_object_unref (GST_OBJECT (peer));
+ ("push on pad %s:%s but it has no chainfunction",
+ GST_DEBUG_PAD_NAME (pad)));
+ GST_STREAM_UNLOCK (pad);
return GST_FLOW_ERROR;
}
}
/**
+ * gst_pad_push:
+ * @pad: a source #GstPad.
+ * @buffer: the #GstBuffer to push.
+ *
+ * Pushes a buffer to the peer of @pad. @pad must be linked.
+ *
+ * Returns: a #GstFlowReturn from the peer pad.
+ *
+ * MT safe.
+ */
+GstFlowReturn
+gst_pad_push (GstPad * pad, GstBuffer * buffer)
+{
+ GstRealPad *peer;
+ GstFlowReturn ret;
+
+ g_return_val_if_fail (GST_IS_REAL_PAD (pad), GST_FLOW_ERROR);
+ g_return_val_if_fail (GST_RPAD_DIRECTION (pad) == GST_PAD_SRC,
+ GST_FLOW_ERROR);
+ g_return_val_if_fail (buffer != NULL, GST_FLOW_ERROR);
+ g_return_val_if_fail (GST_IS_BUFFER (buffer), GST_FLOW_ERROR);
+
+ GST_LOCK (pad);
+ while (G_UNLIKELY (GST_RPAD_IS_BLOCKED (pad)))
+ handle_pad_block (GST_REAL_PAD_CAST (pad));
+
+ if (G_UNLIKELY ((peer = GST_RPAD_PEER (pad)) == NULL))
+ goto not_linked;
+
+ gst_object_ref (GST_OBJECT_CAST (peer));
+ GST_UNLOCK (pad);
+
+ ret = gst_pad_chain (GST_PAD_CAST (peer), buffer);
+
+ gst_object_unref (GST_OBJECT_CAST (peer));
+
+ return ret;
+
+ /* ERROR recovery here */
+not_linked:
+ {
+ gst_buffer_unref (buffer);
+ GST_CAT_LOG_OBJECT (GST_CAT_SCHEDULING, pad,
+ "pushing, but it was not linked");
+ GST_UNLOCK (pad);
+ return GST_FLOW_NOT_CONNECTED;
+ }
+}
+
+/**
* gst_pad_check_pull_range:
* @pad: a sink #GstRealPad.
*
}
/**
+ * gst_pad_get_range:
+ * @pad: a src #GstPad.
+ * @buffer: a pointer to hold the #GstBuffer.
+ * @offset: The start offset of the buffer
+ * @length: The length of the buffer
+ *
+ * Calls the getrange function of @pad.
+ *
+ * Returns: a #GstFlowReturn from the pad.
+ *
+ * MT safe.
+ */
+GstFlowReturn
+gst_pad_get_range (GstPad * pad, guint64 offset, guint size,
+ GstBuffer ** buffer)
+{
+ GstFlowReturn ret;
+ GstPadGetRangeFunction getrangefunc;
+
+ g_return_val_if_fail (GST_IS_REAL_PAD (pad), GST_FLOW_ERROR);
+ g_return_val_if_fail (GST_RPAD_DIRECTION (pad) == GST_PAD_SRC,
+ GST_FLOW_ERROR);
+ g_return_val_if_fail (buffer != NULL, GST_FLOW_ERROR);
+
+ GST_STREAM_LOCK (pad);
+
+ GST_LOCK (pad);
+ if (G_UNLIKELY (GST_RPAD_IS_FLUSHING (pad)))
+ goto flushing;
+ GST_UNLOCK (pad);
+
+ if (G_UNLIKELY ((getrangefunc = GST_RPAD_GETRANGEFUNC (pad)) == NULL))
+ goto no_function;
+
+ GST_CAT_LOG_OBJECT (GST_CAT_SCHEDULING, pad,
+ "calling getrangefunc %s of peer pad %s:%s, offset %"
+ G_GUINT64_FORMAT ", size %u",
+ GST_DEBUG_FUNCPTR_NAME (getrangefunc), GST_DEBUG_PAD_NAME (pad),
+ offset, size);
+
+ ret = getrangefunc (pad, offset, size, buffer);
+
+ GST_STREAM_UNLOCK (pad);
+
+ return ret;
+
+ /* ERRORS */
+flushing:
+ {
+ GST_CAT_LOG_OBJECT (GST_CAT_SCHEDULING, pad,
+ "pulling range, but pad was flushing");
+ GST_UNLOCK (pad);
+ GST_STREAM_UNLOCK (pad);
+ return GST_FLOW_UNEXPECTED;
+ }
+no_function:
+ {
+ GST_ELEMENT_ERROR (GST_PAD_PARENT (pad), CORE, PAD, (NULL),
+ ("pullrange on pad %s:%s but it has no getrangefunction",
+ GST_DEBUG_PAD_NAME (pad)));
+ GST_STREAM_UNLOCK (pad);
+ return GST_FLOW_ERROR;
+ }
+}
+
+
+/**
* gst_pad_pull_range:
* @pad: a sink #GstPad.
* @buffer: a pointer to hold the #GstBuffer.
{
GstRealPad *peer;
GstFlowReturn ret;
- GstPadGetRangeFunction getrangefunc;
g_return_val_if_fail (GST_IS_REAL_PAD (pad), GST_FLOW_ERROR);
g_return_val_if_fail (GST_RPAD_DIRECTION (pad) == GST_PAD_SINK,
if (G_UNLIKELY ((peer = GST_RPAD_PEER (pad)) == NULL))
goto not_connected;
- if (G_UNLIKELY (!GST_RPAD_IS_ACTIVE (peer)))
- goto not_active;
-
- if (G_UNLIKELY (GST_RPAD_IS_FLUSHING (peer)))
- goto flushing;
-
gst_object_ref (GST_OBJECT_CAST (peer));
GST_UNLOCK (pad);
- /* see note in above function */
- if (G_UNLIKELY ((getrangefunc = peer->getrangefunc) == NULL))
- goto no_function;
-
- GST_CAT_LOG_OBJECT (GST_CAT_SCHEDULING, pad,
- "calling getrangefunc %s of peer pad %s:%s, offset %"
- G_GUINT64_FORMAT ", size %u",
- GST_DEBUG_FUNCPTR_NAME (getrangefunc), GST_DEBUG_PAD_NAME (peer),
- offset, size);
-
- ret = getrangefunc (GST_PAD_CAST (peer), offset, size, buffer);
+ ret = gst_pad_get_range (GST_PAD_CAST (peer), offset, size, buffer);
gst_object_unref (GST_OBJECT_CAST (peer));
GST_UNLOCK (pad);
return GST_FLOW_NOT_CONNECTED;
}
-not_active:
- {
- GST_CAT_LOG_OBJECT (GST_CAT_SCHEDULING, pad,
- "pulling range, but it was inactive");
- GST_UNLOCK (pad);
- return GST_FLOW_WRONG_STATE;
- }
-flushing:
- {
- GST_CAT_LOG_OBJECT (GST_CAT_SCHEDULING, pad,
- "pulling range, but pad was flushing");
- GST_UNLOCK (pad);
- return GST_FLOW_UNEXPECTED;
- }
-no_function:
- {
- GST_ELEMENT_ERROR (GST_PAD_PARENT (pad), CORE, PAD, (NULL),
- ("pullrange on pad %s:%s but the peer pad %s:%s has no getrangefunction",
- GST_DEBUG_PAD_NAME (pad), GST_DEBUG_PAD_NAME (peer)));
- gst_object_unref (GST_OBJECT (peer));
- return GST_FLOW_ERROR;
- }
}
/**
* @pad: a #GstPad to push the event to.
* @event: the #GstEvent to send to the pad.
*
- * Sends the event to the peer of the given pad.
+ * Sends the event to the peer of the given pad. This function is
+ * mainly used by elements to send events to their peer
+ * elements.
*
* Returns: TRUE if the event was handled.
*
gst_pad_send_event (GstPad * pad, GstEvent * event)
{
gboolean result = FALSE;
- GstRealPad *rpad;
+ GstRealPad *realpad;
GstPadEventFunction eventfunc;
g_return_val_if_fail (GST_IS_PAD (pad), FALSE);
g_return_val_if_fail (event != NULL, FALSE);
- rpad = GST_PAD_REALIZE (pad);
+ GST_PAD_REALIZE_AND_LOCK (pad, realpad, lost_ghostpad);
if (GST_EVENT_SRC (event) == NULL)
- GST_EVENT_SRC (event) = gst_object_ref (GST_OBJECT (rpad));
+ GST_EVENT_SRC (event) = gst_object_ref (GST_OBJECT_CAST (realpad));
GST_CAT_DEBUG (GST_CAT_EVENT, "have event type %d on pad %s:%s",
- GST_EVENT_TYPE (event), GST_DEBUG_PAD_NAME (rpad));
-
- if (GST_PAD_IS_SINK (pad)) {
- if (GST_EVENT_TYPE (event) == GST_EVENT_FLUSH) {
- GST_CAT_DEBUG (GST_CAT_EVENT, "have flush event");
- GST_LOCK (pad);
- if (GST_EVENT_FLUSH_DONE (event)) {
- GST_CAT_DEBUG (GST_CAT_EVENT, "clear flush flag");
- GST_FLAG_UNSET (pad, GST_PAD_FLUSHING);
- } else {
- GST_CAT_DEBUG (GST_CAT_EVENT, "set flush flag");
- GST_FLAG_SET (pad, GST_PAD_FLUSHING);
- }
- GST_UNLOCK (pad);
+ GST_EVENT_TYPE (event), GST_DEBUG_PAD_NAME (realpad));
+
+ if (GST_PAD_IS_SINK (realpad)) {
+ switch (GST_EVENT_TYPE (event)) {
+ case GST_EVENT_FLUSH:
+ GST_CAT_DEBUG (GST_CAT_EVENT, "have flush event");
+ if (GST_EVENT_FLUSH_DONE (event)) {
+ GST_RPAD_UNSET_FLUSHING (realpad);
+ GST_CAT_DEBUG (GST_CAT_EVENT, "cleared flush flag");
+ } else {
+ /* can't even accept a flush begin event when flushing */
+ if (GST_RPAD_IS_FLUSHING (realpad))
+ goto flushing;
+ GST_RPAD_SET_FLUSHING (realpad);
+ GST_CAT_DEBUG (GST_CAT_EVENT, "set flush flag");
+ }
+ break;
+ default:
+ if (GST_RPAD_IS_FLUSHING (realpad))
+ goto flushing;
+ break;
}
}
- if ((eventfunc = GST_RPAD_EVENTFUNC (rpad)) == NULL)
+ if ((eventfunc = GST_RPAD_EVENTFUNC (realpad)) == NULL)
goto no_function;
- result = eventfunc (GST_PAD_CAST (rpad), event);
+ gst_object_ref (GST_OBJECT_CAST (realpad));
+ GST_UNLOCK (realpad);
+
+ result = eventfunc (GST_PAD_CAST (realpad), event);
+
+ gst_object_unref (GST_OBJECT_CAST (realpad));
return result;
/* ERROR handling */
+lost_ghostpad:
+ {
+ GST_CAT_DEBUG (GST_CAT_EVENT, "lost ghostpad");
+ gst_event_unref (event);
+ return FALSE;
+ }
no_function:
{
g_warning ("pad %s:%s has no event handler, file a bug.",
- GST_DEBUG_PAD_NAME (rpad));
+ GST_DEBUG_PAD_NAME (realpad));
+ GST_UNLOCK (realpad);
+ gst_event_unref (event);
+ return FALSE;
+ }
+flushing:
+ {
+ GST_UNLOCK (realpad);
+ GST_CAT_DEBUG (GST_CAT_EVENT, "received event on flushing pad");
gst_event_unref (event);
return FALSE;
}
}
gboolean
-gst_pad_start_task (GstPad * pad)
+gst_pad_start_task (GstPad * pad, GstTaskFunction func, gpointer data)
{
- g_warning ("implement gst_pad_start_task()");
- return FALSE;
+ GstElement *parent;
+ GstScheduler *sched;
+ GstTask *task;
+
+ g_return_val_if_fail (GST_IS_PAD (pad), FALSE);
+ g_return_val_if_fail (func != NULL, FALSE);
+
+ GST_LOCK (pad);
+ parent = GST_PAD_PARENT (pad);
+
+ if (parent == NULL || !GST_IS_ELEMENT (parent))
+ goto no_parent;
+
+ sched = GST_ELEMENT_SCHEDULER (parent);
+ if (sched == NULL)
+ goto no_sched;
+
+ task = GST_RPAD_TASK (pad);
+ if (task == NULL) {
+ task = gst_scheduler_create_task (sched, func, data);
+ gst_task_set_lock (task, GST_STREAM_GET_LOCK (pad));
+
+ GST_RPAD_TASK (pad) = task;
+ }
+ GST_UNLOCK (pad);
+
+ gst_task_start (task);
+
+ return TRUE;
+
+ /* ERRORS */
+no_parent:
+ {
+ GST_UNLOCK (pad);
+ GST_DEBUG ("no parent");
+ return FALSE;
+ }
+no_sched:
+ {
+ GST_UNLOCK (pad);
+ GST_DEBUG ("no scheduler");
+ return FALSE;
+ }
}
gboolean
gst_pad_pause_task (GstPad * pad)
{
- g_warning ("implement gst_pad_pause_task()");
- return FALSE;
+ GstTask *task;
+
+ g_return_val_if_fail (GST_IS_PAD (pad), FALSE);
+
+ GST_LOCK (pad);
+ task = GST_RPAD_TASK (pad);
+ if (task == NULL)
+ goto no_task;
+ gst_task_pause (task);
+ GST_UNLOCK (pad);
+
+ GST_STREAM_LOCK (pad);
+ GST_STREAM_UNLOCK (pad);
+
+ return TRUE;
+
+no_task:
+ {
+ GST_UNLOCK (pad);
+ return TRUE;
+ }
}
gboolean
gst_pad_stop_task (GstPad * pad)
{
- g_warning ("implement gst_pad_stop_task()");
- return FALSE;
+ GstTask *task;
+
+ g_return_val_if_fail (GST_IS_PAD (pad), FALSE);
+
+ GST_LOCK (pad);
+ task = GST_RPAD_TASK (pad);
+ if (task == NULL)
+ goto no_task;
+ GST_RPAD_TASK (pad) = NULL;
+ GST_UNLOCK (pad);
+
+ gst_task_stop (task);
+
+ GST_STREAM_LOCK (pad);
+ GST_STREAM_UNLOCK (pad);
+
+ gst_object_unref (GST_OBJECT_CAST (task));
+
+ return TRUE;
+
+no_task:
+ {
+ GST_UNLOCK (pad);
+ return TRUE;
+ }
}
} GstPadDirection;
typedef enum {
- GST_PAD_ACTIVE = GST_OBJECT_FLAG_LAST,
- GST_PAD_BLOCKED,
+ GST_PAD_BLOCKED = GST_OBJECT_FLAG_LAST,
GST_PAD_FLUSHING,
GST_PAD_IN_GETCAPS,
GST_PAD_IN_SETCAPS,
GstPadGetRangeFunction getrangefunc;
GstPadEventFunction eventfunc;
+ GstActivateMode mode;
+
/* ghostpads */
GList *ghostpads;
guint32 ghostpads_cookie;
#define GST_RPAD_CHECKGETRANGEFUNC(pad) (GST_REAL_PAD_CAST(pad)->checkgetrangefunc)
#define GST_RPAD_GETRANGEFUNC(pad) (GST_REAL_PAD_CAST(pad)->getrangefunc)
#define GST_RPAD_EVENTFUNC(pad) (GST_REAL_PAD_CAST(pad)->eventfunc)
+#define GST_RPAD_ACTIVATE_MODE(pad) (GST_REAL_PAD_CAST(pad)->mode)
#define GST_RPAD_QUERYTYPEFUNC(pad) (GST_REAL_PAD_CAST(pad)->querytypefunc)
#define GST_RPAD_QUERYFUNC(pad) (GST_REAL_PAD_CAST(pad)->queryfunc)
#define GST_RPAD_INTLINKFUNC(pad) (GST_REAL_PAD_CAST(pad)->intlinkfunc)
#define GST_RPAD_BUFFERALLOCFUNC(pad) (GST_REAL_PAD_CAST(pad)->bufferallocfunc)
#define GST_RPAD_IS_LINKED(pad) (GST_RPAD_PEER(pad) != NULL)
-#define GST_RPAD_IS_ACTIVE(pad) (GST_FLAG_IS_SET (pad, GST_PAD_ACTIVE))
#define GST_RPAD_IS_BLOCKED(pad) (GST_FLAG_IS_SET (pad, GST_PAD_BLOCKED))
#define GST_RPAD_IS_FLUSHING(pad) (GST_FLAG_IS_SET (pad, GST_PAD_FLUSHING))
#define GST_RPAD_IS_IN_GETCAPS(pad) (GST_FLAG_IS_SET (pad, GST_PAD_IN_GETCAPS))
#define GST_RPAD_IS_IN_SETCAPS(pad) (GST_FLAG_IS_SET (pad, GST_PAD_IN_SETCAPS))
#define GST_RPAD_IS_USABLE(pad) (GST_RPAD_IS_LINKED (pad) && \
- GST_RPAD_IS_ACTIVE(pad) && GST_RPAD_IS_ACTIVE(GST_RPAD_PEER (pad)))
+ !GST_RPAD_IS_FLUSHING(pad) && !GST_RPAD_IS_FLUSHING(GST_RPAD_PEER (pad)))
#define GST_RPAD_IS_SRC(pad) (GST_RPAD_DIRECTION(pad) == GST_PAD_SRC)
#define GST_RPAD_IS_SINK(pad) (GST_RPAD_DIRECTION(pad) == GST_PAD_SINK)
+#define GST_RPAD_SET_FLUSHING(pad) (GST_FLAG_SET (pad, GST_PAD_FLUSHING))
+#define GST_RPAD_UNSET_FLUSHING(pad) (GST_FLAG_UNSET (pad, GST_PAD_FLUSHING))
+
#define GST_STREAM_GET_LOCK(pad) (GST_PAD_REALIZE(pad)->stream_rec_lock)
#define GST_STREAM_LOCK(pad) (g_static_rec_mutex_lock(GST_STREAM_GET_LOCK(pad)))
#define GST_STREAM_TRYLOCK(pad) (g_static_rec_mutex_trylock(GST_STREAM_GET_LOCK(pad)))
#define GST_PAD_CAPS(pad) GST_RPAD_CAPS(GST_PAD_REALIZE (pad))
#define GST_PAD_PEER(pad) GST_PAD_CAST(GST_RPAD_PEER(GST_PAD_REALIZE(pad)))
+#define GST_PAD_TASK(pad) GST_RPAD_TASK(pad)
+
/* Some check functions (unused?) */
#define GST_PAD_IS_LINKED(pad) (GST_RPAD_IS_LINKED(GST_PAD_REALIZE(pad)))
-#define GST_PAD_IS_ACTIVE(pad) (GST_RPAD_IS_ACTIVE(GST_PAD_REALIZE(pad)))
#define GST_PAD_IS_BLOCKED(pad) (GST_RPAD_IS_BLOCKED(GST_PAD_REALIZE(pad)))
#define GST_PAD_IS_FLUSHING(pad) (GST_RPAD_IS_FLUSHING(GST_PAD_REALIZE(pad)))
#define GST_PAD_IS_IN_GETCAPS(pad) (GST_RPAD_IS_IN_GETCAPS(GST_PAD_REALIZE(pad)))
GstCaps * gst_pad_get_allowed_caps (GstPad * srcpad);
GstCaps * gst_pad_get_negotiated_caps (GstPad * pad);
-/* data passing functions */
+/* data passing functions to peer */
GstFlowReturn gst_pad_push (GstPad *pad, GstBuffer *buffer);
gboolean gst_pad_check_pull_range (GstPad *pad);
GstFlowReturn gst_pad_pull_range (GstPad *pad, guint64 offset, guint size,
GstBuffer **buffer);
gboolean gst_pad_push_event (GstPad *pad, GstEvent *event);
-gboolean gst_pad_send_event (GstPad *pad, GstEvent *event);
gboolean gst_pad_event_default (GstPad *pad, GstEvent *event);
+/* data passing functions on pad */
+GstFlowReturn gst_pad_chain (GstPad *pad, GstBuffer *buffer);
+GstFlowReturn gst_pad_get_range (GstPad *pad, guint64 offset, guint size,
+ GstBuffer **buffer);
+gboolean gst_pad_send_event (GstPad *pad, GstEvent *event);
+
/* pad tasks */
-gboolean gst_pad_start_task (GstPad *pad);
+gboolean gst_pad_start_task (GstPad *pad, GstTaskFunction func,
+ gpointer data);
gboolean gst_pad_pause_task (GstPad *pad);
gboolean gst_pad_stop_task (GstPad *pad);
/* forward event */
gst_pad_event_default (pad, event);
if (GST_EVENT_FLUSH_DONE (event)) {
- GST_STREAM_LOCK (queue->srcpad);
- gst_task_start (GST_RPAD_TASK (queue->srcpad));
- GST_STREAM_UNLOCK (queue->srcpad);
+ gst_pad_start_task (queue->srcpad, (GstTaskFunction) gst_queue_loop,
+ queue->srcpad);
} else {
/* now unblock the chain function */
GST_QUEUE_MUTEX_LOCK;
g_cond_signal (queue->item_add);
/* make sure it stops */
- GST_STREAM_LOCK (queue->srcpad);
- gst_task_pause (GST_RPAD_TASK (queue->srcpad));
+ gst_pad_pause_task (queue->srcpad);
GST_CAT_LOG_OBJECT (queue_dataflow, queue, "loop stopped");
- GST_STREAM_UNLOCK (queue->srcpad);
}
goto done;
case GST_EVENT_EOS:
queue = GST_QUEUE (GST_OBJECT_PARENT (pad));
- GST_STREAM_LOCK (pad);
-
/* we have to lock the queue since we span threads */
GST_QUEUE_MUTEX_LOCK;
STATUS (queue, "waiting for item_del signal from thread using qlock");
g_cond_wait (queue->item_del, queue->qlock);
+ if (GST_RPAD_IS_FLUSHING (pad))
+ goto out_flushing;
+
/* if there's a pending state change for this queue
* or its manager, switch back to iterator so bottom
* half of state change executes */
GST_CAT_LOG_OBJECT (queue_dataflow, queue, "signalling item_add");
g_cond_signal (queue->item_add);
GST_QUEUE_MUTEX_UNLOCK;
- GST_STREAM_UNLOCK (pad);
return GST_FLOW_OK;
out_unref:
GST_QUEUE_MUTEX_UNLOCK;
- GST_STREAM_UNLOCK (pad);
gst_buffer_unref (buffer);
out_flushing:
GST_CAT_LOG_OBJECT (queue_dataflow, queue, "exit because of flush");
GST_QUEUE_MUTEX_UNLOCK;
- gst_task_pause (GST_RPAD_TASK (queue->srcpad));
- GST_STREAM_UNLOCK (pad);
+ gst_pad_pause_task (queue->srcpad);
gst_buffer_unref (buffer);
queue = GST_QUEUE (GST_PAD_PARENT (pad));
- GST_STREAM_LOCK (pad);
-
/* have to lock for thread-safety */
GST_QUEUE_MUTEX_LOCK;
GST_CAT_LOG_OBJECT (queue_dataflow, queue, "signalling item_del");
g_cond_signal (queue->item_del);
GST_QUEUE_MUTEX_UNLOCK;
- GST_STREAM_UNLOCK (pad);
return;
out_flushing:
GST_CAT_LOG_OBJECT (queue_dataflow, queue, "exit because of flush");
- gst_task_pause (GST_RPAD_TASK (pad));
+ gst_pad_pause_task (pad);
GST_QUEUE_MUTEX_UNLOCK;
- GST_STREAM_UNLOCK (pad);
return;
}
queue = GST_QUEUE (GST_OBJECT_PARENT (pad));
if (mode == GST_ACTIVATE_PUSH) {
- /* if we have a scheduler we can start the task */
- if (GST_ELEMENT_SCHEDULER (queue)) {
- GST_STREAM_LOCK (pad);
- GST_RPAD_TASK (pad) =
- gst_scheduler_create_task (GST_ELEMENT_SCHEDULER (queue),
- (GstTaskFunction) gst_queue_loop, pad);
-
- gst_task_start (GST_RPAD_TASK (pad));
- GST_STREAM_UNLOCK (pad);
- result = TRUE;
- }
+ result = gst_pad_start_task (pad, (GstTaskFunction) gst_queue_loop, pad);
} else {
/* step 1, unblock chain and loop functions */
GST_QUEUE_MUTEX_LOCK;
GST_QUEUE_MUTEX_UNLOCK;
/* step 2, make sure streaming finishes */
- GST_STREAM_LOCK (pad);
- /* step 3, stop the task */
- gst_task_stop (GST_RPAD_TASK (pad));
- gst_object_unref (GST_OBJECT (GST_RPAD_TASK (pad)));
- GST_STREAM_UNLOCK (pad);
-
- result = TRUE;
+ result = gst_pad_stop_task (pad);
}
return result;
}
static void
gst_task_init (GstTask * task)
{
+ task->lock = NULL;
task->cond = g_cond_new ();
task->state = GST_TASK_STOPPED;
}
}
/**
+ * gst_task_set_lock:
+ * @task: The #GstTask to use
+ * @mutex: The GMutex to use
+ *
+ * Set the mutex used by the task.
+ *
+ * MT safe.
+ */
+void
+gst_task_set_lock (GstTask * task, GStaticRecMutex * mutex)
+{
+ GST_LOCK (task);
+ task->lock = mutex;
+ GST_UNLOCK (task);
+}
+
+
+/**
* gst_task_get_state:
* @task: The #GstTask to query
*
#define GST_TASK_SIGNAL(task) g_cond_signal(GST_TASK_GET_COND (task))
#define GST_TASK_BROADCAST(task) g_cond_breadcast(GST_TASK_GET_COND (task))
+#define GST_TASK_GET_LOCK(task) (GST_TASK_CAST(task)->lock)
+#define GST_TASK_LOCK(task) g_static_rec_mutex_lock(GST_TASK_GET_LOCK(task))
+#define GST_TASK_UNLOCK(task) g_static_rec_mutex_unlock(GST_TASK_GET_LOCK(task))
+
struct _GstTask {
GstObject object;
- /*< public >*/ /* with TASK_LOCK */
- GstTaskState state;
- GCond *cond;
+
+ /*< public >*/ /* with LOCK */
+ GstTaskState state;
+ GCond *cond;
+
+ GStaticRecMutex *lock;
GstTaskFunction func;
- gpointer data;
+ gpointer data;
/*< private >*/
gpointer _gst_reserved[GST_PADDING];
GType gst_task_get_type (void);
GstTask* gst_task_create (GstTaskFunction func, gpointer data);
+void gst_task_set_lock (GstTask *task, GStaticRecMutex *mutex);
GstTaskState gst_task_get_state (GstTask *task);
GstThreadScheduler *tsched =
GST_THREAD_SCHEDULER (GST_OBJECT_PARENT (GST_OBJECT (task)));
GstTaskState old;
+ GStaticRecMutex *lock;
GST_DEBUG_OBJECT (task, "Starting task %p", task);
+ if ((lock = GST_TASK_GET_LOCK (task)) == NULL) {
+ lock = g_new (GStaticRecMutex, 1);
+ g_static_rec_mutex_init (lock);
+ GST_TASK_GET_LOCK (task) = lock;
+ }
+
GST_LOCK (ttask);
old = GST_TASK_CAST (ttask)->state;
GST_TASK_CAST (ttask)->state = GST_TASK_STARTED;
GST_DEBUG_OBJECT (sched, "Entering task %p, thread %p", task,
g_thread_self ());
+ /* locking order is TASK_LOCK, LOCK */
+ GST_TASK_LOCK (task);
GST_LOCK (task);
while (G_LIKELY (task->state != GST_TASK_STOPPED)) {
while (G_UNLIKELY (task->state == GST_TASK_PAUSED)) {
+ GST_TASK_UNLOCK (task);
GST_TASK_SIGNAL (task);
GST_TASK_WAIT (task);
+ GST_UNLOCK (task);
+ /* locking order.. */
+ GST_TASK_LOCK (task);
+ GST_LOCK (task);
if (task->state == GST_TASK_STOPPED)
goto done;
}
}
done:
GST_UNLOCK (task);
+ GST_TASK_UNLOCK (task);
GST_DEBUG_OBJECT (sched, "Exit task %p, thread %p", task, g_thread_self ());
if (adapter->assembled_size < size) {
adapter->assembled_size = (size / DEFAULT_SIZE + 1) * DEFAULT_SIZE;
- GST_DEBUG_OBJECT (adapter, "setting size of internal buffer to %u\n",
+ GST_DEBUG_OBJECT (adapter, "setting size of internal buffer to %u",
adapter->assembled_size);
adapter->assembled_data =
g_realloc (adapter->assembled_data, adapter->assembled_size);
g_return_if_fail (flush > 0);
g_return_if_fail (flush <= adapter->size);
- GST_LOG_OBJECT (adapter, "flushing %u bytes\n", flush);
+ GST_LOG_OBJECT (adapter, "flushing %u bytes", flush);
adapter->size -= flush;
adapter->assembled_len = 0;
while (flush > 0) {
if (basesink->preroll_queue->length == 0) {
GstBaseSinkClass *bclass = GST_BASESINK_GET_CLASS (basesink);
+ GST_DEBUG ("preroll buffer with TS: %" GST_TIME_FORMAT,
+ GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (buffer)));
if (bclass->preroll)
bclass->preroll (basesink, buffer);
}
gst_basesink_finish_preroll (GstBaseSink * basesink, GstPad * pad,
GstBuffer * buffer)
{
- gboolean usable;
+ gboolean flushing;
DEBUG ("finish preroll %p <\n", basesink);
/* lock order is important */
gst_element_commit_state (GST_ELEMENT (basesink));
GST_STATE_UNLOCK (basesink);
+ gst_basesink_preroll_queue_push (basesink, pad, buffer);
+
GST_LOCK (pad);
- usable = !GST_RPAD_IS_FLUSHING (pad) && GST_RPAD_IS_ACTIVE (pad);
+ flushing = GST_RPAD_IS_FLUSHING (pad);
GST_UNLOCK (pad);
- if (!usable)
- goto unusable;
-
- gst_basesink_preroll_queue_push (basesink, pad, buffer);
+ if (flushing)
+ goto flushing;
if (basesink->need_preroll)
goto still_queueing;
GST_STATE_UNLOCK (basesink);
return PREROLL_PLAYING;
}
-unusable:
+flushing:
{
GST_DEBUG ("pad is flushing");
GST_PREROLL_UNLOCK (pad);
g_assert (GST_BASESINK (GST_OBJECT_PARENT (pad))->pad_mode ==
GST_ACTIVATE_PUSH);
- GST_STREAM_LOCK (pad);
-
result = gst_basesink_chain_unlocked (pad, buf);
- GST_STREAM_UNLOCK (pad);
-
return result;
}
g_assert (basesink->pad_mode == GST_ACTIVATE_PULL);
- GST_STREAM_LOCK (pad);
-
result = gst_pad_pull_range (pad, basesink->offset, DEFAULT_SIZE, &buf);
if (result != GST_FLOW_OK)
goto paused;
goto paused;
/* default */
- GST_STREAM_UNLOCK (pad);
return;
paused:
- gst_task_pause (GST_RPAD_TASK (pad));
- GST_STREAM_UNLOCK (pad);
+ gst_pad_pause_task (pad);
return;
}
/* if we have a scheduler we can start the task */
g_return_val_if_fail (basesink->has_loop, FALSE);
gst_pad_peer_set_active (pad, mode);
- if (GST_ELEMENT_SCHEDULER (basesink)) {
- GST_STREAM_LOCK (pad);
- GST_RPAD_TASK (pad) =
- gst_scheduler_create_task (GST_ELEMENT_SCHEDULER (basesink),
- (GstTaskFunction) gst_basesink_loop, pad);
-
- gst_task_start (GST_RPAD_TASK (pad));
- GST_STREAM_UNLOCK (pad);
- result = TRUE;
- }
+ result =
+ gst_pad_start_task (pad, (GstTaskFunction) gst_basesink_loop, pad);
break;
case GST_ACTIVATE_NONE:
/* step 1, unblock clock sync (if any) or any other blocking thing */
GST_PREROLL_UNLOCK (pad);
/* step 2, make sure streaming finishes */
- GST_STREAM_LOCK (pad);
- /* step 3, stop the task */
- if (GST_RPAD_TASK (pad)) {
- gst_task_stop (GST_RPAD_TASK (pad));
- gst_object_unref (GST_OBJECT (GST_RPAD_TASK (pad)));
- GST_RPAD_TASK (pad) = NULL;
- }
- GST_STREAM_UNLOCK (pad);
-
- result = TRUE;
+ result = gst_pad_stop_task (pad);
break;
}
basesink->pad_mode = mode;
basesink->have_preroll = FALSE;
GST_PREROLL_UNLOCK (basesink->sinkpad);
- /* make sure the element is finished processing */
- GST_STREAM_LOCK (basesink->sinkpad);
- GST_STREAM_UNLOCK (basesink->sinkpad);
/* clear EOS state */
basesink->eos = FALSE;
break;
}
/* and restart the task */
- if (GST_RPAD_TASK (src->srcpad)) {
- gst_task_start (GST_RPAD_TASK (src->srcpad));
- }
+ gst_pad_start_task (src->srcpad, (GstTaskFunction) gst_basesrc_loop,
+ src->srcpad);
GST_STREAM_UNLOCK (src->srcpad);
gst_event_unref (event);
}
static GstFlowReturn
-gst_basesrc_get_range_unlocked (GstPad * pad, guint64 offset, guint length,
+gst_basesrc_get_range (GstPad * pad, guint64 offset, guint length,
GstBuffer ** buf)
{
GstFlowReturn ret;
}
}
-static GstFlowReturn
-gst_basesrc_get_range (GstPad * pad, guint64 offset, guint length,
- GstBuffer ** ret)
-{
- GstFlowReturn fret;
-
- GST_STREAM_LOCK (pad);
-
- fret = gst_basesrc_get_range_unlocked (pad, offset, length, ret);
-
- GST_STREAM_UNLOCK (pad);
-
- return fret;
-}
-
static gboolean
gst_basesrc_check_get_range (GstPad * pad)
{
src = GST_BASESRC (GST_OBJECT_PARENT (pad));
- GST_STREAM_LOCK (pad);
-
- ret = gst_basesrc_get_range_unlocked (pad, src->offset, src->blocksize, &buf);
+ ret = gst_basesrc_get_range (pad, src->offset, src->blocksize, &buf);
if (ret != GST_FLOW_OK)
goto eos;
if (ret != GST_FLOW_OK)
goto pause;
- GST_STREAM_UNLOCK (pad);
return;
eos:
{
GST_DEBUG_OBJECT (src, "going to EOS");
- gst_task_pause (GST_RPAD_TASK (pad));
+ gst_pad_pause_task (pad);
gst_pad_push_event (pad, gst_event_new (GST_EVENT_EOS));
- GST_STREAM_UNLOCK (pad);
return;
}
pause:
{
GST_DEBUG_OBJECT (src, "pausing task");
- gst_task_pause (GST_RPAD_TASK (pad));
- GST_STREAM_UNLOCK (pad);
+ gst_pad_pause_task (pad);
return;
}
}
result = FALSE;
switch (mode) {
case GST_ACTIVATE_PUSH:
- /* if we have a scheduler we can start the task */
- if (GST_ELEMENT_SCHEDULER (basesrc)) {
- GST_STREAM_LOCK (pad);
- GST_RPAD_TASK (pad) =
- gst_scheduler_create_task (GST_ELEMENT_SCHEDULER (basesrc),
- (GstTaskFunction) gst_basesrc_loop, pad);
-
- gst_task_start (GST_RPAD_TASK (pad));
- GST_STREAM_UNLOCK (pad);
- result = TRUE;
- }
+ result =
+ gst_pad_start_task (pad, (GstTaskFunction) gst_basesrc_loop, pad);
break;
case GST_ACTIVATE_PULL:
result = TRUE;
gst_basesrc_unlock (basesrc);
/* step 2, make sure streaming finishes */
- GST_STREAM_LOCK (pad);
- /* step 3, stop the task */
- if (GST_RPAD_TASK (pad)) {
- gst_task_stop (GST_RPAD_TASK (pad));
- gst_object_unref (GST_OBJECT (GST_RPAD_TASK (pad)));
- GST_RPAD_TASK (pad) = NULL;
- }
- GST_STREAM_UNLOCK (pad);
-
- result = TRUE;
+ result = gst_pad_stop_task (pad);
break;
}
return result;
/* forward event */
gst_pad_event_default (pad, event);
if (GST_EVENT_FLUSH_DONE (event)) {
- GST_STREAM_LOCK (queue->srcpad);
- gst_task_start (GST_RPAD_TASK (queue->srcpad));
- GST_STREAM_UNLOCK (queue->srcpad);
+ gst_pad_start_task (queue->srcpad, (GstTaskFunction) gst_queue_loop,
+ queue->srcpad);
} else {
/* now unblock the chain function */
GST_QUEUE_MUTEX_LOCK;
g_cond_signal (queue->item_add);
/* make sure it stops */
- GST_STREAM_LOCK (queue->srcpad);
- gst_task_pause (GST_RPAD_TASK (queue->srcpad));
+ gst_pad_pause_task (queue->srcpad);
GST_CAT_LOG_OBJECT (queue_dataflow, queue, "loop stopped");
- GST_STREAM_UNLOCK (queue->srcpad);
}
goto done;
case GST_EVENT_EOS:
queue = GST_QUEUE (GST_OBJECT_PARENT (pad));
- GST_STREAM_LOCK (pad);
-
/* we have to lock the queue since we span threads */
GST_QUEUE_MUTEX_LOCK;
STATUS (queue, "waiting for item_del signal from thread using qlock");
g_cond_wait (queue->item_del, queue->qlock);
+ if (GST_RPAD_IS_FLUSHING (pad))
+ goto out_flushing;
+
/* if there's a pending state change for this queue
* or its manager, switch back to iterator so bottom
* half of state change executes */
GST_CAT_LOG_OBJECT (queue_dataflow, queue, "signalling item_add");
g_cond_signal (queue->item_add);
GST_QUEUE_MUTEX_UNLOCK;
- GST_STREAM_UNLOCK (pad);
return GST_FLOW_OK;
out_unref:
GST_QUEUE_MUTEX_UNLOCK;
- GST_STREAM_UNLOCK (pad);
gst_buffer_unref (buffer);
out_flushing:
GST_CAT_LOG_OBJECT (queue_dataflow, queue, "exit because of flush");
GST_QUEUE_MUTEX_UNLOCK;
- gst_task_pause (GST_RPAD_TASK (queue->srcpad));
- GST_STREAM_UNLOCK (pad);
+ gst_pad_pause_task (queue->srcpad);
gst_buffer_unref (buffer);
queue = GST_QUEUE (GST_PAD_PARENT (pad));
- GST_STREAM_LOCK (pad);
-
/* have to lock for thread-safety */
GST_QUEUE_MUTEX_LOCK;
GST_CAT_LOG_OBJECT (queue_dataflow, queue, "signalling item_del");
g_cond_signal (queue->item_del);
GST_QUEUE_MUTEX_UNLOCK;
- GST_STREAM_UNLOCK (pad);
return;
out_flushing:
GST_CAT_LOG_OBJECT (queue_dataflow, queue, "exit because of flush");
- gst_task_pause (GST_RPAD_TASK (pad));
+ gst_pad_pause_task (pad);
GST_QUEUE_MUTEX_UNLOCK;
- GST_STREAM_UNLOCK (pad);
return;
}
queue = GST_QUEUE (GST_OBJECT_PARENT (pad));
if (mode == GST_ACTIVATE_PUSH) {
- /* if we have a scheduler we can start the task */
- if (GST_ELEMENT_SCHEDULER (queue)) {
- GST_STREAM_LOCK (pad);
- GST_RPAD_TASK (pad) =
- gst_scheduler_create_task (GST_ELEMENT_SCHEDULER (queue),
- (GstTaskFunction) gst_queue_loop, pad);
-
- gst_task_start (GST_RPAD_TASK (pad));
- GST_STREAM_UNLOCK (pad);
- result = TRUE;
- }
+ result = gst_pad_start_task (pad, (GstTaskFunction) gst_queue_loop, pad);
} else {
/* step 1, unblock chain and loop functions */
GST_QUEUE_MUTEX_LOCK;
GST_QUEUE_MUTEX_UNLOCK;
/* step 2, make sure streaming finishes */
- GST_STREAM_LOCK (pad);
- /* step 3, stop the task */
- gst_task_stop (GST_RPAD_TASK (pad));
- gst_object_unref (GST_OBJECT (GST_RPAD_TASK (pad)));
- GST_STREAM_UNLOCK (pad);
-
- result = TRUE;
+ result = gst_pad_stop_task (pad);
}
return result;
}
break;
case GST_ACTIVATE_PULL:
g_return_val_if_fail (tee->has_sink_loop, FALSE);
- if (GST_ELEMENT_SCHEDULER (tee)) {
- GST_STREAM_LOCK (pad);
- GST_RPAD_TASK (pad) =
- gst_scheduler_create_task (GST_ELEMENT_SCHEDULER (tee),
- (GstTaskFunction) gst_tee_loop, pad);
-
- gst_pad_start_task (pad);
- GST_STREAM_UNLOCK (pad);
- result = TRUE;
- }
+ result = gst_pad_start_task (pad, (GstTaskFunction) gst_tee_loop, pad);
break;
case GST_ACTIVATE_NONE:
- GST_STREAM_LOCK (pad);
- if (GST_RPAD_TASK (pad)) {
- gst_pad_stop_task (pad);
- gst_object_unref (GST_OBJECT (GST_RPAD_TASK (pad)));
- GST_RPAD_TASK (pad) = NULL;
- }
- GST_STREAM_UNLOCK (pad);
-
- result = TRUE;
+ result = gst_pad_stop_task (pad);
break;
}
tee->sink_mode = mode;