2007-03-19 Wim Taymans <wim@fluendo.com>
* docs/gst/gstreamer-sections.txt:
+ Add new element field and method.
+
+ * gst/gstbin.c: (gst_bin_class_init), (gst_bin_init),
+ (bin_remove_messages), (gst_bin_add_func), (gst_bin_remove_func),
+ (gst_bin_recalc_state), (gst_bin_get_state_func),
+ (gst_bin_element_set_state), (gst_bin_change_state_func),
+ (gst_bin_continue_func), (bin_bus_handler),
+ (bin_push_state_continue), (bin_handle_async_start),
+ (bin_handle_async_done), (gst_bin_handle_message_func):
+ Make async state changes a bit smarter by using new ASYNC_START and
+ ASYNC_DONE messages. This reduces the number of times we run the state
+ recalculation thread.
+ Don't change state of element with a pending ASYNC_START message.
+ Deprecate STATE_DIRTY messages.
+
+ * gst/gstelement.c: (gst_element_init), (gst_element_send_event),
+ (gst_element_get_state_func), (gst_element_continue_state),
+ (gst_element_lost_state), (gst_element_set_state_func),
+ (gst_element_change_state):
+ * gst/gstelement.h:
+ Keep the state that was last set by the app in a new element field.
+ Don't allow state changes when handling an element event.
+ Post ASYNC_START and ASYNC_DONE messages.
+ Change lost_state so that we go to PAUSED and wait for the parent to set
+ us to PLAYING again (so latency calculation can be performed)
+ Export gst_element_change_state() method so that subclasses can use it.
+ API: gst_element_change_state()
+ API: GST_STATE_TARGET
+
+ * gst/gstpipeline.c: (gst_pipeline_class_init),
+ (reset_stream_time), (gst_pipeline_change_state),
+ (gst_pipeline_handle_message), (gst_pipeline_set_new_stream_time):
+ Using the new ASYNC_START message we can reset the base_time when
+ needed. This can then be used to implement base_time redistribution in
+ flushing seeks so that we can remove the explicit seek handling.
+ Perform latency query and configuration when going to PLAYING.
+
+ * libs/gst/base/gstbasesink.c: (gst_base_sink_commit_state),
+ (gst_base_sink_query), (gst_base_sink_change_state):
+ Post new ASYNC_START/ASYNC_DONE messages.
+
+ * tests/check/generic/sinks.c: (GST_START_TEST):
+ Fix test because the bin will not set the async element to PLAYING right
+ away.
+
+ * tests/check/gst/gstbin.c: (pop_async_done), (GST_START_TEST):
+ Make the message check a little stronger.
+ Handle ASYNC messages.
+
+ * tests/check/pipelines/cleanup.c: (GST_START_TEST):
+ * tests/check/pipelines/simple-launch-lines.c: (GST_START_TEST):
+ Expect ASYNC_DONE messages.
+
+2007-03-19 Wim Taymans <wim@fluendo.com>
+
+ * docs/gst/gstreamer-sections.txt:
* gst/gstmessage.c: (gst_message_new_async_start),
(gst_message_new_async_done), (gst_message_parse_info),
(gst_message_parse_async_start):
GST_STATE_NEXT
GST_STATE_PENDING
GST_STATE_RETURN
+GST_STATE_TARGET
GST_STATE_TRANSITION
GST_STATE_TRANSITION_CURRENT
GST_STATE_TRANSITION_NEXT
gst_element_state_get_name
gst_element_state_change_return_get_name
gst_element_sync_state_with_parent
+gst_element_change_state
<SUBSECTION element-tags>
gst_element_found_tags
static void gst_bin_dispose (GObject * object);
-static void gst_bin_recalc_state (GstBin * bin, gboolean force);
static GstStateChangeReturn gst_bin_change_state_func (GstElement * element,
GstStateChange transition);
static GstStateChangeReturn gst_bin_get_state_func (GstElement * element,
GstState * state, GstState * pending, GstClockTime timeout);
+static void bin_handle_async_done (GstBin * bin, GstMessage ** message);
+static void bin_push_state_continue (GstBin * bin);
static gboolean gst_bin_add_func (GstBin * bin, GstElement * element);
static gboolean gst_bin_remove_func (GstBin * bin, GstElement * element);
static void bin_remove_messages (GstBin * bin, GstObject * src,
GstMessageType types);
-static void gst_bin_recalc_func (GstBin * child, gpointer data);
+static void gst_bin_continue_func (GstBin * child, gpointer data);
static gint bin_element_is_sink (GstElement * child, GstBin * bin);
static gint bin_element_is_src (GstElement * child, GstBin * bin);
GST_DEBUG ("creating bin thread pool");
err = NULL;
klass->pool =
- g_thread_pool_new ((GFunc) gst_bin_recalc_func, NULL, -1, FALSE, &err);
+ g_thread_pool_new ((GFunc) gst_bin_continue_func, NULL, -1, FALSE, &err);
if (err != NULL) {
g_critical ("could alloc threadpool %s", err->message);
}
bin->children = NULL;
bin->children_cookie = 0;
bin->messages = NULL;
- bin->polling = FALSE;
- bin->state_dirty = FALSE;
bin->provided_clock = NULL;
+ bin->state_dirty = FALSE;
bin->clock_dirty = FALSE;
/* Set up a bus for listening to child elements */
if (message_check (message, &find) == 0) {
GST_DEBUG_OBJECT (GST_MESSAGE_SRC (message),
- "deleting message of types %d", types);
+ "deleting message %p of types 0x%08x", message, types);
bin->messages = g_list_delete_link (bin->messages, walk);
gst_message_unref (message);
} else {
GST_DEBUG_OBJECT (GST_MESSAGE_SRC (message),
- "not deleting message of type %d", GST_MESSAGE_TYPE (message));
+ "not deleting message %p of type 0x%08x", message,
+ GST_MESSAGE_TYPE (message));
}
}
}
* that is not important right now. When the pipeline goes to PLAYING,
* a new clock will be selected */
gst_element_set_clock (element, GST_ELEMENT_CLOCK (bin));
+ GST_DEBUG_OBJECT (bin, "marking state dirty");
bin->state_dirty = TRUE;
GST_OBJECT_UNLOCK (bin);
}
}
-
/**
* gst_bin_add:
* @bin: a #GstBin
gchar *elem_name;
GstIterator *it;
gboolean is_sink;
- GstMessage *clock_message = NULL;
+ GstMessage *clock_message = NULL, *async_message = NULL, *smessage = NULL;
+ GList *walk, *next;
+ gboolean other_async = FALSE, this_async = FALSE, cont = FALSE;
GST_OBJECT_LOCK (element);
/* Check if the element is already being removed and immediately
gst_message_new_clock_lost (GST_OBJECT_CAST (bin), bin->provided_clock);
}
+ /* remove messages for the element, if there was a pending ASYNC_START
+ * message we must see if removing the element caused the bin to lose its
+ * async state. */
+ for (walk = bin->messages; walk; walk = next) {
+ GstMessage *message = (GstMessage *) walk->data;
+ GstElement *src = GST_ELEMENT_CAST (GST_MESSAGE_SRC (message));
+
+ next = g_list_next (walk);
+
+ if (GST_MESSAGE_TYPE (message) == GST_MESSAGE_ASYNC_START) {
+ if (src == element)
+ this_async = TRUE;
+ else
+ other_async = TRUE;
+
+ GST_DEBUG_OBJECT (GST_MESSAGE_SRC (message),
+ "looking at message %p", message);
+ }
+ if (src == element) {
+ /* delete all message types */
+ GST_DEBUG_OBJECT (GST_MESSAGE_SRC (message),
+ "deleting message %p of element \"%s\"", message, elem_name);
+ bin->messages = g_list_delete_link (bin->messages, walk);
+ gst_message_unref (message);
+ }
+ }
+ /* all other elements were not async and we removed the async one,
+ * post a ASYNC_DONE message because we are not async anymore now. */
+ if (!other_async && this_async) {
+ GST_DEBUG_OBJECT (bin, "we removed the last async element");
+
+ cont = GST_OBJECT_PARENT (bin) == NULL;
+ if (!cont) {
+ bin_handle_async_done (bin, &smessage);
+ async_message = gst_message_new_async_done (GST_OBJECT_CAST (bin));
+ }
+ }
+ GST_DEBUG_OBJECT (bin, "marking state dirty");
bin->state_dirty = TRUE;
GST_OBJECT_UNLOCK (bin);
- if (clock_message) {
+ if (clock_message)
gst_element_post_message (GST_ELEMENT_CAST (bin), clock_message);
- }
+
+ if (smessage)
+ gst_element_post_message (GST_ELEMENT_CAST (bin), smessage);
+
+ if (async_message)
+ gst_element_post_message (GST_ELEMENT_CAST (bin), async_message);
+
+ if (cont)
+ bin_push_state_continue (bin);
GST_CAT_INFO_OBJECT (GST_CAT_PARENTAGE, bin, "removed child \"%s\"",
elem_name);
return result;
}
-/*
- * MT safe
- */
-static GstStateChangeReturn
-gst_bin_get_state_func (GstElement * element, GstState * state,
- GstState * pending, GstClockTime timeout)
-{
- GstBin *bin = GST_BIN (element);
- GstStateChangeReturn ret;
-
- GST_CAT_INFO_OBJECT (GST_CAT_STATES, bin, "getting state");
-
- /* do a non forced recalculation of the state */
- gst_bin_recalc_state (bin, FALSE);
-
- ret = parent_class->get_state (element, state, pending, timeout);
-
- return ret;
-}
-
static void
gst_bin_recalc_state (GstBin * bin, gboolean force)
{
done:
bin->polling = FALSE;
+ GST_STATE_RETURN (bin) = ret;
GST_OBJECT_UNLOCK (bin);
/* now we can take the state lock, it is possible that new elements
}
}
+/*
+ * MT safe
+ */
+static GstStateChangeReturn
+gst_bin_get_state_func (GstElement * element, GstState * state,
+ GstState * pending, GstClockTime timeout)
+{
+ GstBin *bin = GST_BIN (element);
+ GstStateChangeReturn ret;
+
+ GST_CAT_INFO_OBJECT (GST_CAT_STATES, bin, "getting state");
+
+ /* do a non forced recalculation of the state */
+ gst_bin_recalc_state (bin, FALSE);
+
+ ret = parent_class->get_state (element, state, pending, timeout);
+
+ return ret;
+}
+
/***********************************************
* Topologically sorted iterator
* see http://en.wikipedia.org/wiki/Topological_sorting
}
static GstStateChangeReturn
-gst_bin_element_set_state (GstBin * bin, GstElement * element, GstState pending)
+gst_bin_element_set_state (GstBin * bin, GstElement * element,
+ GstClockTime base_time, GstState current, GstState next)
{
GstStateChangeReturn ret;
gboolean locked;
+ GList *found;
+ MessageFind find;
/* peel off the locked flag */
GST_OBJECT_LOCK (element);
GST_OBJECT_UNLOCK (element);
/* skip locked elements */
- if (G_UNLIKELY (locked)) {
- GST_DEBUG_OBJECT (element,
- "element is locked, pretending state change succeeded");
- ret = GST_STATE_CHANGE_SUCCESS;
- goto done;
+ if (G_UNLIKELY (locked))
+ goto locked;
+
+ /* the element was busy with an upwards async state change, we must wait for
+ * an ASYNC_DONE message before we attemp to change the state. */
+ find.src = GST_OBJECT_CAST (element);
+ find.types = GST_MESSAGE_ASYNC_START;
+
+ GST_OBJECT_LOCK (bin);
+ if ((found = g_list_find_custom (bin->messages, &find,
+ (GCompareFunc) message_check))) {
+ GstMessage *message = GST_MESSAGE_CAST (found->data);
+ GstObject *src = GST_MESSAGE_SRC (message);
+
+ GST_DEBUG_OBJECT (element, "element message %p, %s async busy",
+ message, GST_ELEMENT_NAME (src));
+ /* only wait for upward state changes */
+ if (next > current)
+ goto was_busy;
}
+ GST_OBJECT_UNLOCK (bin);
+
+ GST_DEBUG_OBJECT (bin,
+ "setting element %s to %s, base_time %" GST_TIME_FORMAT,
+ GST_ELEMENT_NAME (element), gst_element_state_get_name (next),
+ GST_TIME_ARGS (base_time));
+
+ /* set base_time on child */
+ gst_element_set_base_time (element, base_time);
/* change state */
- ret = gst_element_set_state (element, pending);
+ ret = gst_element_set_state (element, next);
-done:
return ret;
+
+locked:
+ {
+ GST_DEBUG_OBJECT (element,
+ "element is locked, pretending state change succeeded");
+ return GST_STATE_CHANGE_SUCCESS;
+ }
+was_busy:
+ {
+ GST_DEBUG_OBJECT (element, "element is was busy delaying state change");
+ GST_OBJECT_UNLOCK (bin);
+ return GST_STATE_CHANGE_ASYNC;
+ }
}
/* gst_iterator_fold functions for pads_activate
child = GST_ELEMENT_CAST (data);
- /* set base_time on child */
- gst_element_set_base_time (child, base_time);
-
- /* set state now */
- ret = gst_bin_element_set_state (bin, child, next);
+ /* set state and base_time now */
+ ret = gst_bin_element_set_state (bin, child, base_time, current, next);
switch (ret) {
case GST_STATE_CHANGE_SUCCESS:
gst_element_state_get_name (next));
break;
case GST_STATE_CHANGE_ASYNC:
+ {
GST_CAT_INFO_OBJECT (GST_CAT_STATES, element,
"child '%s' is changing state asynchronously to %s",
GST_ELEMENT_NAME (child), gst_element_state_get_name (next));
have_async = TRUE;
break;
+ }
case GST_STATE_CHANGE_FAILURE:
GST_CAT_INFO_OBJECT (GST_CAT_STATES, element,
"child '%s' failed to go to state %d(%s)",
gst_iterator_free (it);
GST_CAT_DEBUG_OBJECT (GST_CAT_STATES, element,
- "done changing bin's state from %s to %s, now in %s, ret %d",
+ "done changing bin's state from %s to %s, now in %s, ret %s",
gst_element_state_get_name (current),
gst_element_state_get_name (next),
- gst_element_state_get_name (GST_STATE (element)), ret);
+ gst_element_state_get_name (GST_STATE (element)),
+ gst_element_state_change_return_get_name (ret));
return ret;
}
static void
-gst_bin_recalc_func (GstBin * bin, gpointer data)
+gst_bin_continue_func (GstBin * bin, gpointer data)
{
- GST_DEBUG_OBJECT (bin, "doing state recalc");
+ GstState current, next, pending, target, old_state, old_next;
+ GstStateChangeReturn old_ret, ret;
+ GstStateChange transition;
+ gboolean busy, post;
+
+ GST_DEBUG_OBJECT (bin, "waiting for state lock");
GST_STATE_LOCK (bin);
- gst_bin_recalc_state (bin, FALSE);
+
+ GST_DEBUG_OBJECT (bin, "doing state continue");
+ GST_OBJECT_LOCK (bin);
+
+ old_ret = GST_STATE_RETURN (bin);
+ GST_STATE_RETURN (bin) = GST_STATE_CHANGE_SUCCESS;
+ busy = (old_ret == GST_STATE_CHANGE_ASYNC);
+ target = GST_STATE_TARGET (bin);
+ pending = GST_STATE_PENDING (bin);
+
+ /* check if there is something to commit */
+ if (pending == GST_STATE_VOID_PENDING)
+ goto nothing_pending;
+
+ old_state = GST_STATE (bin);
+ /* this is the state we should go to next */
+ old_next = GST_STATE_NEXT (bin);
+
+ GST_CAT_INFO_OBJECT (GST_CAT_STATES, bin, "busy %d, target %s",
+ busy, gst_element_state_get_name (target));
+
+ if (old_next == GST_STATE_PLAYING) {
+ post = FALSE;
+ } else {
+ post = TRUE;
+ }
+
+ /* we're going to PLAYING, always do the PAUSED->PLAYING state change */
+ if (target == GST_STATE_PLAYING) {
+ next = GST_STATE_PAUSED;
+ GST_STATE_PENDING (bin) = pending = target;
+ } else {
+ next = old_next;
+ }
+
+ /* update current state */
+ current = GST_STATE (bin) = next;
+
+ /* see if we reached the final state */
+ if (pending == current)
+ goto complete;
+
+ next = GST_STATE_GET_NEXT (current, pending);
+ transition = (GstStateChange) GST_STATE_TRANSITION (current, next);
+
+ GST_STATE_NEXT (bin) = next;
+ /* mark busy */
+ GST_STATE_RETURN (bin) = GST_STATE_CHANGE_ASYNC;
+ GST_OBJECT_UNLOCK (bin);
+
+ if (post) {
+ GST_CAT_INFO_OBJECT (GST_CAT_STATES, bin,
+ "committing state from %s to %s, pending %s",
+ gst_element_state_get_name (old_state),
+ gst_element_state_get_name (old_next),
+ gst_element_state_get_name (pending));
+
+ gst_element_post_message (GST_ELEMENT_CAST (bin),
+ gst_message_new_state_changed (GST_OBJECT_CAST (bin),
+ old_state, old_next, pending));
+ }
+
+ if (busy) {
+ GST_DEBUG_OBJECT (bin, "posting ASYNC_DONE");
+ gst_element_post_message (GST_ELEMENT_CAST (bin),
+ gst_message_new_async_done (GST_OBJECT_CAST (bin)));
+ }
+
+ GST_CAT_INFO_OBJECT (GST_CAT_STATES, bin,
+ "continue state change %s to %s, final %s",
+ gst_element_state_get_name (current),
+ gst_element_state_get_name (next), gst_element_state_get_name (pending));
+
+ ret = gst_element_change_state (GST_ELEMENT_CAST (bin), transition);
+
+done:
GST_STATE_UNLOCK (bin);
- GST_DEBUG_OBJECT (bin, "state recalc done");
+ GST_DEBUG_OBJECT (bin, "state continue done");
gst_object_unref (bin);
+
+ return;
+
+nothing_pending:
+ {
+ GST_CAT_INFO_OBJECT (GST_CAT_STATES, bin, "nothing pending");
+ GST_OBJECT_UNLOCK (bin);
+ goto done;
+ }
+complete:
+ {
+ GST_STATE_PENDING (bin) = GST_STATE_VOID_PENDING;
+ GST_STATE_NEXT (bin) = GST_STATE_VOID_PENDING;
+
+ GST_CAT_INFO_OBJECT (GST_CAT_STATES, bin, "completed state change to %s",
+ gst_element_state_get_name (pending));
+ GST_OBJECT_UNLOCK (bin);
+
+ /* don't post silly messages with the same state. This can happen
+ * when an element state is changed to what it already was. For bins
+ * this can be the result of a lost state, which we check with the
+ * previous return value.
+ * We do signal the cond though as a _get_state() might be blocking
+ * on it. */
+ if (old_state != old_next || old_ret == GST_STATE_CHANGE_ASYNC) {
+ gst_element_post_message (GST_ELEMENT_CAST (bin),
+ gst_message_new_state_changed (GST_OBJECT_CAST (bin),
+ old_state, old_next, GST_STATE_VOID_PENDING));
+ }
+
+ GST_DEBUG_OBJECT (bin, "posting ASYNC_DONE");
+ gst_element_post_message (GST_ELEMENT_CAST (bin),
+ gst_message_new_async_done (GST_OBJECT_CAST (bin)));
+
+ GST_STATE_BROADCAST (bin);
+
+ goto done;
+ }
}
static GstBusSyncReply
bin_bus_handler (GstBus * bus, GstMessage * message, GstBin * bin)
{
-
GstBinClass *bclass;
bclass = GST_BIN_GET_CLASS (bin);
return GST_BUS_DROP;
}
+static void
+bin_push_state_continue (GstBin * bin)
+{
+ GstBinClass *klass;
+
+ /* mark the bin dirty */
+ GST_OBJECT_LOCK (bin);
+ klass = GST_BIN_GET_CLASS (bin);
+ GST_DEBUG_OBJECT (bin, "pushing continue on thread pool");
+ gst_object_ref (bin);
+ g_thread_pool_push (klass->pool, bin, NULL);
+ GST_OBJECT_UNLOCK (bin);
+}
+
+/* an element started an async state change, if we were not busy with a state
+ * change, we perform a lost state.
+ */
+static void
+bin_handle_async_start (GstBin * bin, GstMessage ** message)
+{
+ GstState old_state, new_state;
+
+ if (GST_STATE_RETURN (bin) == GST_STATE_CHANGE_FAILURE)
+ goto had_error;
+
+ if (GST_STATE_PENDING (bin) != GST_STATE_VOID_PENDING)
+ goto was_busy;
+
+ old_state = GST_STATE (bin);
+
+ if (old_state > GST_STATE_PAUSED)
+ new_state = GST_STATE_PAUSED;
+ else
+ new_state = old_state;
+
+ GST_CAT_DEBUG_OBJECT (GST_CAT_STATES, bin,
+ "lost state of %s, new %s", gst_element_state_get_name (old_state),
+ gst_element_state_get_name (new_state));
+
+ GST_STATE (bin) = new_state;
+ GST_STATE_NEXT (bin) = new_state;
+ GST_STATE_PENDING (bin) = new_state;
+ GST_STATE_RETURN (bin) = GST_STATE_CHANGE_ASYNC;
+
+ *message = gst_message_new_state_changed (GST_OBJECT_CAST (bin),
+ new_state, new_state, new_state);
+
+ return;
+
+had_error:
+ {
+ GST_CAT_DEBUG_OBJECT (GST_CAT_STATES, bin, "we had an error");
+ return;
+ }
+was_busy:
+ {
+ GST_CAT_DEBUG_OBJECT (GST_CAT_STATES, bin, "state change busy");
+ return;
+ }
+}
+
+static void
+bin_handle_async_done (GstBin * bin, GstMessage ** message)
+{
+ GstState pending;
+ GstStateChangeReturn old_ret;
+ GstState old_state, old_next;
+ GstState current, next;
+
+ old_ret = GST_STATE_RETURN (bin);
+ GST_STATE_RETURN (bin) = GST_STATE_CHANGE_SUCCESS;
+ pending = GST_STATE_PENDING (bin);
+
+ /* check if there is something to commit */
+ if (pending == GST_STATE_VOID_PENDING)
+ goto nothing_pending;
+
+ old_state = GST_STATE (bin);
+ /* this is the state we should go to next */
+ old_next = GST_STATE_NEXT (bin);
+ /* update current state */
+ current = GST_STATE (bin) = old_next;
+
+ /* see if we reached the final state */
+ if (pending == current)
+ goto complete;
+
+ next = GST_STATE_GET_NEXT (current, pending);
+
+ GST_STATE_NEXT (bin) = next;
+ /* mark busy */
+ GST_STATE_RETURN (bin) = GST_STATE_CHANGE_ASYNC;
+
+ GST_CAT_INFO_OBJECT (GST_CAT_STATES, bin,
+ "committing state from %s to %s, pending %s",
+ gst_element_state_get_name (old_state),
+ gst_element_state_get_name (old_next),
+ gst_element_state_get_name (pending));
+
+ *message = gst_message_new_state_changed (GST_OBJECT_CAST (bin),
+ old_state, old_next, pending);
+
+ return;
+
+nothing_pending:
+ {
+ GST_CAT_INFO_OBJECT (GST_CAT_STATES, bin, "nothing pending");
+ return;
+ }
+complete:
+ {
+ GST_STATE_PENDING (bin) = GST_STATE_VOID_PENDING;
+ GST_STATE_NEXT (bin) = GST_STATE_VOID_PENDING;
+
+ GST_CAT_INFO_OBJECT (GST_CAT_STATES, bin, "completed state change");
+
+ /* don't post silly messages with the same state. This can happen
+ * when an element state is changed to what it already was. For bins
+ * this can be the result of a lost state, which we check with the
+ * previous return value.
+ * We do signal the cond though as a _get_state() might be blocking
+ * on it. */
+ if (old_state != old_next || old_ret == GST_STATE_CHANGE_ASYNC) {
+ *message = gst_message_new_state_changed (GST_OBJECT_CAST (bin),
+ old_state, old_next, GST_STATE_VOID_PENDING);
+ }
+
+ GST_STATE_BROADCAST (bin);
+
+ return;
+ }
+}
+
/* handle child messages:
*
* This method is called synchronously when a child posts a message on
* This message is never sent to the application but is forwarded to
* the parent.
*
+ * GST_MESSAGE_ASYNC_START: Create an internal ELEMENT message that stores
+ * the state of the element and the fact that the element will need a
+ * new base_time.
+ *
+ * GST_MESSAGE_ASYNC_DONE: Find the internal ELEMENT message we kept for the
+ * element when it posted ASYNC_START. If all elements are done, post a
+ * ASYNC_DONE message to the parent.
+ *
* OTHER: post upwards.
*/
static void
gst_bin_handle_message_func (GstBin * bin, GstMessage * message)
{
- GST_DEBUG_OBJECT (bin, "[msg %p] handling child message of type %s",
- message, GST_MESSAGE_TYPE_NAME (message));
+ GstObject *src;
+ GstMessageType type;
+
+ src = GST_MESSAGE_SRC (message);
+ type = GST_MESSAGE_TYPE (message);
+
+ GST_DEBUG_OBJECT (bin, "[msg %p] handling child %s message of type %s",
+ message, GST_ELEMENT_NAME (src), GST_MESSAGE_TYPE_NAME (message));
- switch (GST_MESSAGE_TYPE (message)) {
+ switch (type) {
case GST_MESSAGE_EOS:
{
gboolean eos;
}
case GST_MESSAGE_STATE_DIRTY:
{
- GstObject *src;
- GstBinClass *klass;
-
- src = GST_MESSAGE_SRC (message);
-
- GST_DEBUG_OBJECT (bin, "%s gave state dirty", GST_ELEMENT_NAME (src));
-
- /* mark the bin dirty */
- GST_OBJECT_LOCK (bin);
- GST_DEBUG_OBJECT (bin, "marking dirty");
- bin->state_dirty = TRUE;
-
- if (GST_OBJECT_PARENT (bin))
- goto not_toplevel;
+ GST_WARNING_OBJECT (bin, "received deprecated STATE_DIRTY message");
/* free message */
gst_message_unref (message);
-
- klass = GST_BIN_GET_CLASS (bin);
- if (!bin->polling) {
- GST_DEBUG_OBJECT (bin, "pushing recalc on thread pool");
- gst_object_ref (bin);
- g_thread_pool_push (klass->pool, bin, NULL);
- } else {
- GST_DEBUG_OBJECT (bin,
- "state recalc already in progress, not pushing new recalc");
- }
- GST_OBJECT_UNLOCK (bin);
break;
-
- /* non toplevel bins just forward the message and don't start
- * a recalc themselves */
- not_toplevel:
- {
- GST_OBJECT_UNLOCK (bin);
- GST_DEBUG_OBJECT (bin, "not toplevel, forwarding");
- goto forward;
- }
}
case GST_MESSAGE_SEGMENT_START:
GST_OBJECT_LOCK (bin);
gst_message_unref (message);
break;
}
+ case GST_MESSAGE_ASYNC_START:
+ {
+ gboolean forward = FALSE, new_base_time;
+ GstState target;
+ GstMessage *smessage = NULL;
+
+ GST_DEBUG_OBJECT (bin, "ASYNC_START message %p, %s", message,
+ GST_OBJECT_NAME (src));
+
+ gst_message_parse_async_start (message, &new_base_time);
+
+ GST_OBJECT_LOCK (bin);
+ /* we ignore the message if we are going to <= READY */
+ target = GST_STATE_TARGET (bin);
+ if (target <= GST_STATE_READY)
+ goto ignore_start_message;
+
+ bin_replace_message (bin, message, GST_MESSAGE_ASYNC_START);
+
+ bin_handle_async_start (bin, &smessage);
+
+ /* prepare an ASYNC_START message */
+ if (GST_OBJECT_PARENT (bin)) {
+ forward = TRUE;
+ message =
+ gst_message_new_async_start (GST_OBJECT_CAST (bin), new_base_time);
+ } else {
+ message = NULL;
+ }
+ GST_OBJECT_UNLOCK (bin);
+
+ if (smessage)
+ gst_element_post_message (GST_ELEMENT_CAST (bin), smessage);
+
+ if (forward)
+ goto forward;
+ else
+ break;
+
+ ignore_start_message:
+ {
+ GST_DEBUG_OBJECT (bin, "ignoring message, target %s",
+ gst_element_state_get_name (target));
+ GST_OBJECT_UNLOCK (bin);
+ gst_message_unref (message);
+ break;
+ }
+ }
+ case GST_MESSAGE_ASYNC_DONE:
+ {
+ gboolean done = FALSE, toplevel = FALSE;
+ GstState target;
+ MessageFind find;
+ GstMessage *smessage = NULL;
+
+ GST_DEBUG_OBJECT (bin, "ASYNC_DONE message %p, %s", message,
+ GST_OBJECT_NAME (src));
+
+ GST_OBJECT_LOCK (bin);
+ target = GST_STATE_TARGET (bin);
+ /* ignore messages if we are shutting down */
+ if (target <= GST_STATE_READY)
+ goto ignore_done_message;
+
+ bin_replace_message (bin, message, GST_MESSAGE_ASYNC_START);
+ /* if there are no more ASYNC_START messages, everybody posted
+ * a ASYNC_DONE and we can post one on the bus. */
+
+ /* we don't care who still has a pending ASYNC_START */
+ find.src = NULL;
+ find.types = GST_MESSAGE_ASYNC_START;
+
+ if (!g_list_find_custom (bin->messages, &find,
+ (GCompareFunc) message_check)) {
+ /* nothing found, remove all old ASYNC_DONE messages */
+ bin_remove_messages (bin, NULL, GST_MESSAGE_ASYNC_DONE);
+ done = TRUE;
+ toplevel = GST_OBJECT_PARENT (bin) == NULL;
+ if (!toplevel)
+ bin_handle_async_done (bin, &smessage);
+ }
+ GST_OBJECT_UNLOCK (bin);
+
+ if (smessage) {
+ GST_DEBUG_OBJECT (bin, "posting state change message");
+ gst_element_post_message (GST_ELEMENT_CAST (bin), smessage);
+ }
+ if (done) {
+ if (!toplevel) {
+ GST_DEBUG_OBJECT (bin,
+ "all async-done, posting ASYNC_DONE to parent");
+ /* post our combined ASYNC_DONE when all is ASYNC_DONE. */
+ gst_element_post_message (GST_ELEMENT_CAST (bin),
+ gst_message_new_async_done (GST_OBJECT_CAST (bin)));
+ } else {
+ /* toplevel, start continue state */
+ GST_DEBUG_OBJECT (bin, "all async-done, starting state continue");
+ bin_push_state_continue (bin);
+ }
+ }
+ break;
+
+ ignore_done_message:
+ {
+ GST_DEBUG_OBJECT (bin, "ignoring message, target %s",
+ gst_element_state_get_name (target));
+ GST_OBJECT_UNLOCK (bin);
+ gst_message_unref (message);
+ break;
+ }
+ }
default:
goto forward;
}
#include <gobject/gvaluecollector.h>
#include "gstelement.h"
+#include "gstenumtypes.h"
#include "gstbus.h"
#include "gstmarshal.h"
#include "gsterror.h"
static void gst_element_dispose (GObject * object);
static void gst_element_finalize (GObject * object);
-static GstStateChangeReturn gst_element_change_state (GstElement * element,
- GstStateChange transition);
static GstStateChangeReturn gst_element_change_state_func (GstElement * element,
GstStateChange transition);
static GstStateChangeReturn gst_element_get_state_func (GstElement * element,
gst_element_init (GstElement * element)
{
GST_STATE (element) = GST_STATE_NULL;
+ GST_STATE_TARGET (element) = GST_STATE_NULL;
GST_STATE_NEXT (element) = GST_STATE_VOID_PENDING;
GST_STATE_PENDING (element) = GST_STATE_VOID_PENDING;
GST_STATE_RETURN (element) = GST_STATE_CHANGE_SUCCESS;
oclass = GST_ELEMENT_GET_CLASS (element);
+ GST_STATE_LOCK (element);
if (oclass->send_event) {
GST_CAT_DEBUG (GST_CAT_ELEMENT_PADS, "send %s event on element %s",
GST_EVENT_TYPE_NAME (event), GST_ELEMENT_NAME (element));
} else {
result = gst_element_default_send_event (element, event);
}
+ GST_STATE_UNLOCK (element);
+
return result;
}
GST_CAT_DEBUG_OBJECT (GST_CAT_STATES, element, "RETURN is %s",
gst_element_state_change_return_get_name (ret));
+ GST_CAT_DEBUG_OBJECT (GST_CAT_STATES, element, "RETURN is %s",
+ gst_element_state_change_return_get_name (ret));
+
/* we got an error, report immediatly */
if (ret == GST_STATE_CHANGE_FAILURE)
goto done;
GstStateChangeReturn
gst_element_continue_state (GstElement * element, GstStateChangeReturn ret)
{
- GstState pending;
- GstState old_ret, old_state, old_next;
- GstState current, next;
+ GstStateChangeReturn old_ret;
+ GstState old_state, old_next;
+ GstState current, next, pending;
GstMessage *message;
GstStateChange transition;
GST_OBJECT_LOCK (element);
- old_ret = (GstState) GST_STATE_RETURN (element);
+ old_ret = GST_STATE_RETURN (element);
GST_STATE_RETURN (element) = ret;
pending = GST_STATE_PENDING (element);
GST_STATE_PENDING (element) = GST_STATE_VOID_PENDING;
GST_STATE_NEXT (element) = GST_STATE_VOID_PENDING;
- GST_CAT_INFO_OBJECT (GST_CAT_STATES, element, "completed state change");
+ GST_CAT_INFO_OBJECT (GST_CAT_STATES, element,
+ "completed state change to %s", gst_element_state_get_name (pending));
GST_OBJECT_UNLOCK (element);
/* don't post silly messages with the same state. This can happen
* element is copied to the pending state so that any call to
* gst_element_get_state() will return %GST_STATE_CHANGE_ASYNC.
*
+ * An ASYNC_START message is posted with an indication to distribute a new
+ * base_time to the element.
+ * If the element was PLAYING, it will go to PAUSED. The element
+ * will be restored to its PLAYING state by the parent pipeline when it
+ * prerolls again.
+ *
* This is mostly used for elements that lost their preroll buffer
- * in the %GST_STATE_PAUSED state after a flush, they become %GST_STATE_PAUSED
- * again if a new preroll buffer is queued.
- * This function can only be called when the element is currently
+ * in the %GST_STATE_PAUSED or %GST_STATE_PLAYING state after a flush,
+ * they will go to their pending state again when a new preroll buffer is
+ * queued. This function can only be called when the element is currently
* not in error or an async state change.
*
* This function is used internally and should normally not be called from
void
gst_element_lost_state (GstElement * element)
{
- GstState current_state;
+ GstState old_state, new_state;
GstMessage *message;
g_return_if_fail (GST_IS_ELEMENT (element));
GST_STATE_RETURN (element) == GST_STATE_CHANGE_FAILURE)
goto nothing_lost;
- current_state = GST_STATE (element);
+ old_state = GST_STATE (element);
+
+ /* when we were PLAYING, the new state is PAUSED. We will also not
+ * automatically go to PLAYING but let the parent bin(s) set us to PLAYING
+ * when we preroll. */
+ if (old_state > GST_STATE_PAUSED)
+ new_state = GST_STATE_PAUSED;
+ else
+ new_state = old_state;
GST_CAT_DEBUG_OBJECT (GST_CAT_STATES, element,
- "lost state of %s", gst_element_state_get_name (current_state));
+ "lost state of %s to %s", gst_element_state_get_name (old_state),
+ gst_element_state_get_name (new_state));
- GST_STATE_NEXT (element) = current_state;
- GST_STATE_PENDING (element) = current_state;
+ GST_STATE (element) = new_state;
+ GST_STATE_NEXT (element) = new_state;
+ GST_STATE_PENDING (element) = new_state;
GST_STATE_RETURN (element) = GST_STATE_CHANGE_ASYNC;
GST_OBJECT_UNLOCK (element);
message = gst_message_new_state_changed (GST_OBJECT_CAST (element),
- current_state, current_state, current_state);
+ new_state, new_state, new_state);
gst_element_post_message (element, message);
- /* and mark us dirty */
- message = gst_message_new_state_dirty (GST_OBJECT_CAST (element));
+ message = gst_message_new_async_start (GST_OBJECT_CAST (element), TRUE);
gst_element_post_message (element, message);
return;
/* increment state cookie so that we can track each state change */
element->state_cookie++;
- /* this is the (new) state we should go to */
+ /* this is the (new) state we should go to. TARGET is the last state we set on
+ * the element. */
+ GST_STATE_TARGET (element) = state;
GST_STATE_PENDING (element) = state;
GST_CAT_DEBUG_OBJECT (GST_CAT_STATES, element,
}
}
-/* with STATE_LOCK */
-static GstStateChangeReturn
+/**
+ * gst_element_change_state:
+ * @element: a #GstElement
+ * @transition: the requested transition
+ *
+ * Perform @transition on @element.
+ *
+ * This function must be called with STATE_LOCK held and is mainly used
+ * internally.
+ *
+ * Returns: the #GstStateChangeReturn of the state transition.
+ */
+GstStateChangeReturn
gst_element_change_state (GstElement * element, GstStateChange transition)
{
GstElementClass *oclass;
gst_element_abort_state (element);
break;
case GST_STATE_CHANGE_ASYNC:
+ {
+ GstState target;
+
GST_CAT_DEBUG_OBJECT (GST_CAT_STATES, element,
"element will change state ASYNC");
- /* if we go upwards, we give the app a change to wait for
- * completion */
- if (current < next)
+ target = GST_STATE_TARGET (element);
+
+ if (target > GST_STATE_READY)
goto async;
/* else we just continue the state change downwards */
GST_CAT_INFO_OBJECT (GST_CAT_STATES, element,
- "forcing commit state %s < %s",
- gst_element_state_get_name (current),
- gst_element_state_get_name (next));
+ "forcing commit state %s <= %s",
+ gst_element_state_get_name (target),
+ gst_element_state_get_name (GST_STATE_READY));
ret = gst_element_continue_state (element, GST_STATE_CHANGE_SUCCESS);
break;
+ }
case GST_STATE_CHANGE_SUCCESS:
GST_CAT_DEBUG_OBJECT (GST_CAT_STATES, element,
"element changed state SUCCESS");
#define GST_STATE_PENDING(elem) (GST_ELEMENT_CAST(elem)->pending_state)
/**
+ * GST_STATE_TARGET:
+ * @elem: a #GstElement to return the target state for.
+ *
+ * This macro returns the target #GstState of the element.
+ *
+ * Since: 0.10.13
+ */
+#define GST_STATE_TARGET(elem) (GST_ELEMENT_CAST(elem)->abidata.ABI.target_state)
+
+/**
* GST_STATE_RETURN:
* @elem: a #GstElement to return the last state result for.
*
guint32 pads_cookie;
/*< private >*/
- gpointer _gst_reserved[GST_PADDING];
+ union {
+ struct {
+ /* state set by application */
+ GstState target_state;
+ } ABI;
+ /* adding + 0 to mark ABI change to be undone later */
+ gpointer _gst_reserved[GST_PADDING + 0];
+ } abidata;
};
/**
GstStateChangeReturn gst_element_set_state (GstElement *element, GstState state);
void gst_element_abort_state (GstElement * element);
+GstStateChangeReturn gst_element_change_state (GstElement * element,
+ GstStateChange transition);
GstStateChangeReturn gst_element_continue_state (GstElement * element,
GstStateChangeReturn ret);
void gst_element_lost_state (GstElement * element);
{
/* with LOCK */
gboolean auto_flush_bus;
+
+ GstClockTime new_stream_time;
};
static void gst_pipeline_get_property (GObject * object, guint prop_id,
GValue * value, GParamSpec * pspec);
-static gboolean gst_pipeline_send_event (GstElement * element,
- GstEvent * event);
-
static GstClock *gst_pipeline_provide_clock_func (GstElement * element);
static GstStateChangeReturn gst_pipeline_change_state (GstElement * element,
GstStateChange transition);
+static void gst_pipeline_handle_message (GstBin * bin, GstMessage * message);
+
static GstBinClass *parent_class = NULL;
/* static guint gst_pipeline_signals[LAST_SIGNAL] = { 0 }; */
{
GObjectClass *gobject_class = G_OBJECT_CLASS (g_class);
GstElementClass *gstelement_class = GST_ELEMENT_CLASS (g_class);
+ GstBinClass *gstbin_class = GST_BIN_CLASS (g_class);
GstPipelineClass *klass = GST_PIPELINE_CLASS (g_class);
parent_class = g_type_class_peek_parent (klass);
gobject_class->dispose = GST_DEBUG_FUNCPTR (gst_pipeline_dispose);
- gstelement_class->send_event = GST_DEBUG_FUNCPTR (gst_pipeline_send_event);
gstelement_class->change_state =
GST_DEBUG_FUNCPTR (gst_pipeline_change_state);
gstelement_class->provide_clock =
GST_DEBUG_FUNCPTR (gst_pipeline_provide_clock_func);
+
+ gstbin_class->handle_message =
+ GST_DEBUG_FUNCPTR (gst_pipeline_handle_message);
}
static void
}
}
-/* default pipeline seeking code:
- *
- * If the pipeline is PLAYING and a flushing seek is done, set
- * the pipeline to PAUSED before doing the seek.
- *
- * A flushing seek also resets the stream time to 0 so that when
- * we go back to PLAYING after the seek, the base_time is recalculated
- * and redistributed to the elements.
- */
-static gboolean
-do_pipeline_seek (GstElement * element, GstEvent * event)
-{
- gdouble rate;
- GstSeekFlags flags;
- gboolean flush;
- gboolean was_playing = FALSE;
- gboolean res;
-
- /* we are only interested in the FLUSH flag of the seek event. */
- gst_event_parse_seek (event, &rate, NULL, &flags, NULL, NULL, NULL, NULL);
-
- flush = flags & GST_SEEK_FLAG_FLUSH;
-
- /* if flushing seek, get the current state */
- if (flush) {
- GstState state;
-
- /* need to call _get_state() since a bin state is only updated
- * with this call. */
- gst_element_get_state (element, &state, NULL, 0);
- was_playing = (state == GST_STATE_PLAYING);
-
- if (was_playing) {
- /* and PAUSE when the pipeline was PLAYING, we don't need
- * to wait for the state change to complete since we are going
- * to flush out any preroll sample anyway. */
- gst_element_set_state (element, GST_STATE_PAUSED);
- }
- }
-
- /* let parent class implement the seek behaviour */
- res = GST_ELEMENT_CLASS (parent_class)->send_event (element, event);
-
- /* if flushing seek restore previous state */
- if (flush) {
- gboolean need_reset;
-
- GST_OBJECT_LOCK (element);
- need_reset = GST_PIPELINE (element)->stream_time != GST_CLOCK_TIME_NONE;
- GST_OBJECT_UNLOCK (element);
-
- /* need to reset the stream time to 0 after a successfull flushing seek,
- * unless the user explicitly disabled this behavior by setting stream
- * time to NONE */
- if (need_reset && res)
- gst_pipeline_set_new_stream_time (GST_PIPELINE (element), 0);
-
- if (was_playing)
- /* and continue playing, this might return ASYNC in which case the
- * application can wait for the PREROLL to complete after the seek.
- */
- gst_element_set_state (element, GST_STATE_PLAYING);
- }
- return res;
-}
-
-static gboolean
-gst_pipeline_send_event (GstElement * element, GstEvent * event)
+/* set the stream time to 0 */
+static void
+reset_stream_time (GstPipeline * pipeline)
{
- gboolean res;
- GstEventType event_type = GST_EVENT_TYPE (event);
-
- switch (event_type) {
- case GST_EVENT_SEEK:
- /* do the default seek handling */
- res = do_pipeline_seek (element, event);
- break;
- default:
- /* else parent implements the defaults */
- res = GST_ELEMENT_CLASS (parent_class)->send_event (element, event);
- break;
+ GST_OBJECT_LOCK (pipeline);
+ if (pipeline->stream_time != GST_CLOCK_TIME_NONE) {
+ GST_DEBUG_OBJECT (pipeline, "reset stream_time to 0");
+ pipeline->stream_time = 0;
+ pipeline->priv->new_stream_time = TRUE;
+ } else {
+ GST_DEBUG_OBJECT (pipeline, "application asked to not reset stream_time");
}
-
- return res;
+ GST_OBJECT_UNLOCK (pipeline);
}
/**
case GST_STATE_CHANGE_PAUSED_TO_PLAYING:
{
GstClockTime new_base_time;
+ GstQuery *query;
+ GstClockTime min_latency, max_latency;
GstClockTime start_time, stream_time, delay;
- gboolean new_clock;
+ gboolean new_clock, update;
+ gboolean res;
GST_DEBUG_OBJECT (element, "selecting clock and base_time");
GST_OBJECT_LOCK (element);
new_clock = element->clock != clock;
stream_time = pipeline->stream_time;
+ update = pipeline->priv->new_stream_time;
+ pipeline->priv->new_stream_time = FALSE;
delay = pipeline->delay;
GST_OBJECT_UNLOCK (element);
gst_message_new_new_clock (GST_OBJECT_CAST (element), clock));
}
- if (stream_time != GST_CLOCK_TIME_NONE
- && start_time != GST_CLOCK_TIME_NONE) {
- new_base_time = start_time - stream_time + delay;
- GST_DEBUG_OBJECT (element,
- "stream_time=%" GST_TIME_FORMAT ", now=%" GST_TIME_FORMAT
- ", base_time %" GST_TIME_FORMAT,
- GST_TIME_ARGS (stream_time), GST_TIME_ARGS (start_time),
- GST_TIME_ARGS (new_base_time));
- } else
- new_base_time = GST_CLOCK_TIME_NONE;
-
if (clock)
gst_object_unref (clock);
- if (new_base_time != GST_CLOCK_TIME_NONE)
- gst_element_set_base_time (element, new_base_time);
- else
+ /* stream time changed, either with a PAUSED or a flush, we need to update
+ * the base time */
+ if (update) {
+ GST_DEBUG_OBJECT (pipeline, "stream_time changed, updating base time");
+
+ if (stream_time != GST_CLOCK_TIME_NONE
+ && start_time != GST_CLOCK_TIME_NONE) {
+ new_base_time = start_time - stream_time + delay;
+ GST_DEBUG_OBJECT (element,
+ "stream_time=%" GST_TIME_FORMAT ", now=%" GST_TIME_FORMAT
+ ", base_time %" GST_TIME_FORMAT,
+ GST_TIME_ARGS (stream_time), GST_TIME_ARGS (start_time),
+ GST_TIME_ARGS (new_base_time));
+ } else
+ new_base_time = GST_CLOCK_TIME_NONE;
+
+ if (new_base_time != GST_CLOCK_TIME_NONE)
+ gst_element_set_base_time (element, new_base_time);
+ else
+ GST_DEBUG_OBJECT (pipeline,
+ "NOT adjusting base_time because stream_time is NONE");
+ } else {
GST_DEBUG_OBJECT (pipeline,
- "NOT adjusting base time because stream time is NONE");
+ "NOT adjusting base_time because we selected one before");
+ }
+
+ /* determine latency in this pipeline */
+ GST_DEBUG_OBJECT (element, "querying pipeline latency");
+ query = gst_query_new_latency ();
+ if (gst_element_query (element, query)) {
+ gboolean live;
+
+ gst_query_parse_latency (query, &live, &min_latency, &max_latency);
+
+ GST_DEBUG_OBJECT (element,
+ "configuring min latency %" GST_TIME_FORMAT ", max latency %"
+ GST_TIME_FORMAT ", live %d", GST_TIME_ARGS (min_latency),
+ GST_TIME_ARGS (max_latency), live);
+
+ /* configure latency on elements */
+ res =
+ gst_element_send_event (element,
+ gst_event_new_latency (min_latency));
+ if (res) {
+ GST_INFO_OBJECT (element, "configured latency of %" GST_TIME_FORMAT,
+ GST_TIME_ARGS (min_latency));
+ } else {
+ GST_WARNING_OBJECT (element,
+ "failed to configure latency of %" GST_TIME_FORMAT,
+ GST_TIME_ARGS (min_latency));
+ GST_ELEMENT_WARNING (element, CORE, CLOCK, (NULL),
+ ("Failed to configure latency of %" GST_TIME_FORMAT,
+ GST_TIME_ARGS (min_latency)));
+ }
+ } else {
+ /* this is not a real problem, we just don't configure any latency. */
+ GST_WARNING_OBJECT (element, "failed to query pipeline latency");
+ }
+ gst_query_unref (query);
break;
}
case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
break;
case GST_STATE_CHANGE_READY_TO_PAUSED:
{
- gboolean need_reset;
-
- /* only reset the stream_time when the application did not
- * specify a stream_time explicitly */
- GST_OBJECT_LOCK (element);
- need_reset = pipeline->stream_time != GST_CLOCK_TIME_NONE;
- GST_OBJECT_UNLOCK (element);
-
- if (need_reset)
- gst_pipeline_set_new_stream_time (pipeline, 0);
-
+ reset_stream_time (pipeline);
break;
}
case GST_STATE_CHANGE_PAUSED_TO_PLAYING:
GST_OBJECT_LOCK (element);
/* store the current stream time */
- if (pipeline->stream_time != GST_CLOCK_TIME_NONE)
+ if (pipeline->stream_time != GST_CLOCK_TIME_NONE) {
pipeline->stream_time = now - element->base_time;
+ pipeline->priv->new_stream_time = TRUE;
+ }
+
GST_DEBUG_OBJECT (element,
"stream_time=%" GST_TIME_FORMAT ", now=%" GST_TIME_FORMAT
", base_time %" GST_TIME_FORMAT,
}
}
+static void
+gst_pipeline_handle_message (GstBin * bin, GstMessage * message)
+{
+ GstPipeline *pipeline = GST_PIPELINE_CAST (bin);
+
+ switch (GST_MESSAGE_TYPE (message)) {
+ case GST_MESSAGE_ASYNC_START:
+ {
+ gboolean new_base_time;
+
+ gst_message_parse_async_start (message, &new_base_time);
+
+ /* reset our stream time if we need to distribute a new base_time to the
+ * children. */
+ if (new_base_time)
+ reset_stream_time (pipeline);
+
+ break;
+ }
+ default:
+ break;
+ }
+ GST_BIN_CLASS (parent_class)->handle_message (bin, message);
+}
+
+
/**
* gst_pipeline_get_bus:
* @pipeline: a #GstPipeline
GST_OBJECT_LOCK (pipeline);
pipeline->stream_time = time;
+ pipeline->priv->new_stream_time = TRUE;
GST_OBJECT_UNLOCK (pipeline);
GST_DEBUG_OBJECT (pipeline, "set new stream_time to %" GST_TIME_FORMAT,
/* latency stuff */
GstClockTime latency;
+
+ gboolean commited;
};
#define DO_RUNNING_AVG(avg,val,size) (((val) + ((size)-1) * (avg)) / (size))
{
/* commit state and proceed to next pending state */
GstState current, next, pending, post_pending;
- GstMessage *message;
gboolean post_paused = FALSE;
+ gboolean post_async_done = FALSE;
gboolean post_playing = FALSE;
+ gboolean sync;
/* we are certainly not playing async anymore now */
basesink->playing_async = FALSE;
next = GST_STATE_NEXT (basesink);
pending = GST_STATE_PENDING (basesink);
post_pending = pending;
+ sync = basesink->sync;
switch (pending) {
case GST_STATE_PLAYING:
GST_DEBUG_OBJECT (basesink, "commiting state to PLAYING");
basesink->need_preroll = FALSE;
+ post_async_done = TRUE;
+ basesink->priv->commited = TRUE;
post_playing = TRUE;
/* post PAUSED too when we were READY */
if (current == GST_STATE_READY) {
case GST_STATE_PAUSED:
GST_DEBUG_OBJECT (basesink, "commiting state to PAUSED");
post_paused = TRUE;
+ post_async_done = TRUE;
+ basesink->priv->commited = TRUE;
post_pending = GST_STATE_VOID_PENDING;
break;
case GST_STATE_READY:
GST_OBJECT_UNLOCK (basesink);
if (post_paused) {
- message = gst_message_new_state_changed (GST_OBJECT_CAST (basesink),
- current, next, post_pending);
- gst_element_post_message (GST_ELEMENT_CAST (basesink), message);
+ gst_element_post_message (GST_ELEMENT_CAST (basesink),
+ gst_message_new_state_changed (GST_OBJECT_CAST (basesink),
+ current, next, post_pending));
}
- if (post_playing) {
- message = gst_message_new_state_changed (GST_OBJECT_CAST (basesink),
- next, pending, GST_STATE_VOID_PENDING);
- gst_element_post_message (GST_ELEMENT_CAST (basesink), message);
+ if (post_async_done) {
+ gst_element_post_message (GST_ELEMENT_CAST (basesink),
+ gst_message_new_async_done (GST_OBJECT_CAST (basesink)));
}
- /* and mark dirty */
- if (post_paused || post_playing) {
+ if (post_playing) {
gst_element_post_message (GST_ELEMENT_CAST (basesink),
- gst_message_new_state_dirty (GST_OBJECT_CAST (basesink)));
+ gst_message_new_state_changed (GST_OBJECT_CAST (basesink),
+ next, pending, GST_STATE_VOID_PENDING));
}
GST_STATE_BROADCAST (basesink);
gboolean live, us_live;
GstClockTime min, max;
+ GST_PAD_PREROLL_LOCK (basesink->sinkpad);
+ if (!gst_base_sink_is_prerolled (basesink)) {
+ GST_DEBUG_OBJECT (basesink, "not prerolled yet, can't report latency");
+ res = FALSE;
+ goto done;
+ }
+
if ((res =
gst_base_sink_query_latency (basesink, &live, &us_live, &min,
&max))) {
}
gst_query_set_latency (query, live, min, max);
}
+ done:
+ GST_PAD_PREROLL_UNLOCK (basesink->sinkpad);
break;
}
case GST_QUERY_JITTER:
case GST_STATE_CHANGE_READY_TO_PAUSED:
/* need to complete preroll before this state change completes, there
* is no data flow in READY so we can safely assume we need to preroll. */
+ GST_PAD_PREROLL_LOCK (basesink->sinkpad);
GST_DEBUG_OBJECT (basesink, "READY to PAUSED, need preroll");
gst_segment_init (&basesink->segment, GST_FORMAT_UNDEFINED);
gst_segment_init (basesink->abidata.ABI.clip_segment,
basesink->priv->latency = 0;
basesink->eos = FALSE;
gst_base_sink_reset_qos (basesink);
+ basesink->priv->commited = FALSE;
ret = GST_STATE_CHANGE_ASYNC;
+ gst_element_post_message (GST_ELEMENT_CAST (basesink),
+ gst_message_new_async_start (GST_OBJECT_CAST (basesink), FALSE));
+ GST_PAD_PREROLL_UNLOCK (basesink->sinkpad);
break;
case GST_STATE_CHANGE_PAUSED_TO_PLAYING:
GST_PAD_PREROLL_LOCK (basesink->sinkpad);
GST_DEBUG_OBJECT (basesink, "PAUSED to PLAYING, need preroll");
basesink->need_preroll = TRUE;
basesink->playing_async = TRUE;
+ basesink->priv->commited = FALSE;
ret = GST_STATE_CHANGE_ASYNC;
+ gst_element_post_message (GST_ELEMENT_CAST (basesink),
+ gst_message_new_async_start (GST_OBJECT_CAST (basesink), FALSE));
}
GST_PAD_PREROLL_UNLOCK (basesink->sinkpad);
break;
} else {
GST_DEBUG_OBJECT (basesink, "PLAYING to PAUSED, need preroll");
basesink->playing_async = TRUE;
+ basesink->priv->commited = FALSE;
ret = GST_STATE_CHANGE_ASYNC;
+ gst_element_post_message (GST_ELEMENT_CAST (basesink),
+ gst_message_new_async_start (GST_OBJECT_CAST (basesink), FALSE));
}
GST_DEBUG_OBJECT (basesink, "rendered: %" G_GUINT64_FORMAT
", dropped: %" G_GUINT64_FORMAT, basesink->priv->rendered,
GST_PAD_PREROLL_UNLOCK (basesink->sinkpad);
break;
case GST_STATE_CHANGE_PAUSED_TO_READY:
+ GST_PAD_PREROLL_LOCK (basesink->sinkpad);
+ if (!basesink->priv->commited) {
+ GST_DEBUG_OBJECT (basesink, "PAUSED to READY, posting async-done");
+
+ gst_element_post_message (GST_ELEMENT_CAST (basesink),
+ gst_message_new_state_changed (GST_OBJECT_CAST (basesink),
+ GST_STATE_PLAYING, GST_STATE_PAUSED, GST_STATE_READY));
+
+ gst_element_post_message (GST_ELEMENT_CAST (basesink),
+ gst_message_new_async_done (GST_OBJECT_CAST (basesink)));
+
+ basesink->priv->commited = TRUE;
+ } else {
+ GST_DEBUG_OBJECT (basesink, "PAUSED to READY, don't need_preroll");
+ }
basesink->priv->current_sstart = 0;
basesink->priv->current_sstop = 0;
+ GST_PAD_PREROLL_UNLOCK (basesink->sinkpad);
break;
case GST_STATE_CHANGE_READY_TO_NULL:
if (bclass->stop)
if (n_sink == 2) {
fail_unless (old == GST_STATE_READY, "unexpected old");
fail_unless (new == GST_STATE_PAUSED, "unexpected new");
- fail_unless (pending == GST_STATE_PLAYING, "unexpected pending");
+ fail_unless (pending == GST_STATE_VOID_PENDING, "unexpected pending");
} else if (n_sink == 1) {
fail_unless (old == GST_STATE_PAUSED, "unexpected old");
fail_unless (new == GST_STATE_PLAYING, "unexpected new");
#include <gst/check/gstcheck.h>
static void
+pop_async_done (GstBus * bus)
+{
+ GstMessage *message;
+
+ GST_DEBUG ("popping async-done message");
+ message = gst_bus_poll (bus, GST_MESSAGE_ASYNC_DONE, -1);
+
+ fail_unless (message && GST_MESSAGE_TYPE (message)
+ == GST_MESSAGE_ASYNC_DONE, "did not get GST_MESSAGE_ASYNC_DONE");
+
+ gst_message_unref (message);
+ GST_DEBUG ("popped message");
+}
+
+static void
pop_messages (GstBus * bus, int count)
{
GstMessage *message;
fail_unless (pending == GST_STATE_VOID_PENDING);
/* wait for async thread to settle down */
- while (GST_OBJECT_REFCOUNT_VALUE (pipeline) > 2)
+ while (GST_OBJECT_REFCOUNT_VALUE (pipeline) > 3)
THREAD_SWITCH ();
/* each object is referenced by a message;
ASSERT_OBJECT_REFCOUNT_BETWEEN (pipeline, "pipeline", 2, 3);
pop_messages (bus, 3);
+ pop_async_done (bus);
fail_if ((gst_bus_pop (bus)) != NULL);
ASSERT_OBJECT_REFCOUNT (bus, "bus", 2);
fail_unless (ret == GST_STATE_CHANGE_SUCCESS);
pop_messages (bus, 6);
+ pop_async_done (bus);
fail_unless (gst_bus_have_pending (bus) == FALSE,
"Unexpected messages on bus");
pop_messages (bus, 3);
/* this one might return either SUCCESS or ASYNC, likely SUCCESS */
- gst_element_set_state (GST_ELEMENT (bin), GST_STATE_PAUSED);
+ ret = gst_element_set_state (GST_ELEMENT (bin), GST_STATE_PAUSED);
gst_element_get_state (GST_ELEMENT (bin), NULL, NULL, GST_CLOCK_TIME_NONE);
pop_messages (bus, 3);
+ if (ret == GST_STATE_CHANGE_ASYNC)
+ pop_async_done (bus);
fail_unless (gst_bus_have_pending (bus) == FALSE,
"Unexpected messages on bus");
src = gst_element_factory_make ("fakesrc", NULL);
fail_if (src == NULL, "Could not create fakesrc");
+ g_object_set (src, "num-buffers", 5, NULL);
identity = gst_element_factory_make ("identity", NULL);
fail_if (identity == NULL, "Could not create identity");
/* READY => PAUSED */
/* because of pre-rolling, sink will return ASYNC on state
* change and change state later when it has a buffer */
+ GST_DEBUG ("popping READY -> PAUSED messages");
ASSERT_STATE_CHANGE_MSG (bus, identity, GST_STATE_READY, GST_STATE_PAUSED,
105);
#if 0
* in which case the sink will commit its new state before the source ... */
ASSERT_STATE_CHANGE_MSG (bus, src, GST_STATE_READY, GST_STATE_PAUSED, 106);
ASSERT_STATE_CHANGE_MSG (bus, sink, GST_STATE_READY, GST_STATE_PAUSED, 107);
+#else
+
+ pop_messages (bus, 2); /* pop remaining ready => paused messages off the bus */
ASSERT_STATE_CHANGE_MSG (bus, pipeline, GST_STATE_READY, GST_STATE_PAUSED,
108);
-
+ pop_async_done (bus);
+#endif
/* PAUSED => PLAYING */
+ GST_DEBUG ("popping PAUSED -> PLAYING messages");
ASSERT_STATE_CHANGE_MSG (bus, sink, GST_STATE_PAUSED, GST_STATE_PLAYING, 109);
ASSERT_STATE_CHANGE_MSG (bus, identity, GST_STATE_PAUSED, GST_STATE_PLAYING,
110);
ASSERT_STATE_CHANGE_MSG (bus, src, GST_STATE_PAUSED, GST_STATE_PLAYING, 111);
ASSERT_STATE_CHANGE_MSG (bus, pipeline, GST_STATE_PAUSED, GST_STATE_PLAYING,
112);
-#else
- pop_messages (bus, 3); /* pop remaining ready => paused messages off the bus */
- pop_messages (bus, 4); /* pop paused => playing messages off the bus */
-#endif
/* don't set to NULL that will set the bus flushing and kill our messages */
ret = gst_element_set_state (pipeline, GST_STATE_READY);
/* READY => PAUSED */
/* because of pre-rolling, sink will return ASYNC on state
* change and change state later when it has a buffer */
+ GST_DEBUG ("popping READY -> PAUSED messages");
ASSERT_STATE_CHANGE_MSG (bus, identity, GST_STATE_READY, GST_STATE_PAUSED,
205);
#if 0
* in which case the sink will commit its new state before the source ... */
ASSERT_STATE_CHANGE_MSG (bus, src, GST_STATE_READY, GST_STATE_PAUSED, 206);
ASSERT_STATE_CHANGE_MSG (bus, sink, GST_STATE_READY, GST_STATE_PAUSED, 207);
+#else
+ pop_messages (bus, 2); /* pop remaining ready => paused messages off the bus */
ASSERT_STATE_CHANGE_MSG (bus, pipeline, GST_STATE_READY, GST_STATE_PAUSED,
208);
+ pop_async_done (bus);
/* PAUSED => PLAYING */
+ GST_DEBUG ("popping PAUSED -> PLAYING messages");
ASSERT_STATE_CHANGE_MSG (bus, sink, GST_STATE_PAUSED, GST_STATE_PLAYING, 209);
ASSERT_STATE_CHANGE_MSG (bus, identity, GST_STATE_PAUSED, GST_STATE_PLAYING,
210);
ASSERT_STATE_CHANGE_MSG (bus, src, GST_STATE_PAUSED, GST_STATE_PLAYING, 211);
ASSERT_STATE_CHANGE_MSG (bus, pipeline, GST_STATE_PAUSED, GST_STATE_PLAYING,
212);
-#else
- pop_messages (bus, 3); /* pop remaining ready => paused messages off the bus */
- pop_messages (bus, 4); /* pop paused => playing messages off the bus */
#endif
/* don't set to NULL that will set the bus flushing and kill our messages */
sink = gst_bin_get_by_name (GST_BIN (pipeline), "sink");
fail_if (sink == NULL);
- run_pipeline (pipeline, s, GST_MESSAGE_NEW_CLOCK | GST_MESSAGE_STATE_CHANGED,
- GST_MESSAGE_EOS);
+ run_pipeline (pipeline, s,
+ GST_MESSAGE_NEW_CLOCK | GST_MESSAGE_STATE_CHANGED |
+ GST_MESSAGE_ASYNC_DONE, GST_MESSAGE_EOS);
while (GST_OBJECT_REFCOUNT_VALUE (src) > 1)
THREAD_SWITCH ();
ASSERT_OBJECT_REFCOUNT (src, "src", 1);
s = "fakesrc can-activate-push=false ! fakesink can-activate-pull=true";
run_pipeline (setup_pipeline (s), s,
- GST_MESSAGE_NEW_CLOCK | GST_MESSAGE_STATE_CHANGED, GST_MESSAGE_UNKNOWN);
+ GST_MESSAGE_NEW_CLOCK | GST_MESSAGE_STATE_CHANGED |
+ GST_MESSAGE_ASYNC_DONE, GST_MESSAGE_UNKNOWN);
s = "fakesrc can-activate-push=true ! fakesink can-activate-pull=false";
run_pipeline (setup_pipeline (s), s,
- GST_MESSAGE_NEW_CLOCK | GST_MESSAGE_STATE_CHANGED, GST_MESSAGE_UNKNOWN);
+ GST_MESSAGE_NEW_CLOCK | GST_MESSAGE_STATE_CHANGED |
+ GST_MESSAGE_ASYNC_DONE, GST_MESSAGE_UNKNOWN);
s = "fakesrc can-activate-push=false num-buffers=10 ! fakesink can-activate-pull=true";
run_pipeline (setup_pipeline (s), s,
- GST_MESSAGE_NEW_CLOCK | GST_MESSAGE_STATE_CHANGED, GST_MESSAGE_EOS);
+ GST_MESSAGE_NEW_CLOCK | GST_MESSAGE_STATE_CHANGED |
+ GST_MESSAGE_ASYNC_DONE, GST_MESSAGE_EOS);
s = "fakesrc can-activate-push=true num-buffers=10 ! fakesink can-activate-pull=false";
run_pipeline (setup_pipeline (s), s,
- GST_MESSAGE_NEW_CLOCK | GST_MESSAGE_STATE_CHANGED, GST_MESSAGE_EOS);
+ GST_MESSAGE_NEW_CLOCK | GST_MESSAGE_STATE_CHANGED |
+ GST_MESSAGE_ASYNC_DONE, GST_MESSAGE_EOS);
s = "fakesrc can-activate-push=false ! fakesink can-activate-pull=false";
ASSERT_CRITICAL (run_pipeline (setup_pipeline (s), s,
- GST_MESSAGE_NEW_CLOCK | GST_MESSAGE_STATE_CHANGED,
- GST_MESSAGE_UNKNOWN));
+ GST_MESSAGE_NEW_CLOCK | GST_MESSAGE_STATE_CHANGED |
+ GST_MESSAGE_ASYNC_DONE, GST_MESSAGE_UNKNOWN));
}
GST_END_TEST;