docs/gst/gstreamer-sections.txt: Add new element field and method.
authorWim Taymans <wim.taymans@gmail.com>
Mon, 19 Mar 2007 10:47:56 +0000 (10:47 +0000)
committerWim Taymans <wim.taymans@gmail.com>
Mon, 19 Mar 2007 10:47:56 +0000 (10:47 +0000)
Original commit message from CVS:
* docs/gst/gstreamer-sections.txt:
Add new element field and method.
* gst/gstbin.c: (gst_bin_class_init), (gst_bin_init),
(bin_remove_messages), (gst_bin_add_func), (gst_bin_remove_func),
(gst_bin_recalc_state), (gst_bin_get_state_func),
(gst_bin_element_set_state), (gst_bin_change_state_func),
(gst_bin_continue_func), (bin_bus_handler),
(bin_push_state_continue), (bin_handle_async_start),
(bin_handle_async_done), (gst_bin_handle_message_func):
Make async state changes a bit smarter by using new ASYNC_START and
ASYNC_DONE messages. This reduces the number of times we run the state
recalculation thread.
Don't change state of element with a pending ASYNC_START message.
Deprecate STATE_DIRTY messages.
* gst/gstelement.c: (gst_element_init), (gst_element_send_event),
(gst_element_get_state_func), (gst_element_continue_state),
(gst_element_lost_state), (gst_element_set_state_func),
(gst_element_change_state):
* gst/gstelement.h:
Keep the state that was last set by the app in a new element field.
Don't allow state changes when handling an element event.
Post ASYNC_START and ASYNC_DONE messages.
Change lost_state so that we go to PAUSED and wait for the parent to set
us to PLAYING again (so latency calculation can be performed)
Export gst_element_change_state() method so that subclasses can use it.
API: gst_element_change_state()
API: GST_STATE_TARGET
* gst/gstpipeline.c: (gst_pipeline_class_init),
(reset_stream_time), (gst_pipeline_change_state),
(gst_pipeline_handle_message), (gst_pipeline_set_new_stream_time):
Using the new ASYNC_START message we can reset the base_time when
needed. This can then be used to implement base_time redistribution in
flushing seeks so that we can remove the explicit seek handling.
Perform latency query and configuration when going to PLAYING.
* libs/gst/base/gstbasesink.c: (gst_base_sink_commit_state),
(gst_base_sink_query), (gst_base_sink_change_state):
Post new ASYNC_START/ASYNC_DONE messages.
* tests/check/generic/sinks.c: (GST_START_TEST):
Fix test because the bin will not set the async element to PLAYING right
away.
* tests/check/gst/gstbin.c: (pop_async_done), (GST_START_TEST):
Make the message check a little stronger.
Handle ASYNC messages.
* tests/check/pipelines/cleanup.c: (GST_START_TEST):
* tests/check/pipelines/simple-launch-lines.c: (GST_START_TEST):
Expect ASYNC_DONE messages.

ChangeLog
docs/gst/gstreamer-sections.txt
gst/gstbin.c
gst/gstelement.c
gst/gstelement.h
gst/gstpipeline.c
libs/gst/base/gstbasesink.c
tests/check/generic/sinks.c
tests/check/gst/gstbin.c
tests/check/pipelines/cleanup.c
tests/check/pipelines/simple-launch-lines.c

index e803469..84ffe41 100644 (file)
--- a/ChangeLog
+++ b/ChangeLog
@@ -1,6 +1,62 @@
 2007-03-19  Wim Taymans  <wim@fluendo.com>
 
        * docs/gst/gstreamer-sections.txt:
+       Add new element field and method.
+
+       * gst/gstbin.c: (gst_bin_class_init), (gst_bin_init),
+       (bin_remove_messages), (gst_bin_add_func), (gst_bin_remove_func),
+       (gst_bin_recalc_state), (gst_bin_get_state_func),
+       (gst_bin_element_set_state), (gst_bin_change_state_func),
+       (gst_bin_continue_func), (bin_bus_handler),
+       (bin_push_state_continue), (bin_handle_async_start),
+       (bin_handle_async_done), (gst_bin_handle_message_func):
+       Make async state changes a bit smarter by using new ASYNC_START and
+       ASYNC_DONE messages. This reduces the number of times we run the state
+       recalculation thread.
+       Don't change state of element with a pending ASYNC_START message.
+       Deprecate STATE_DIRTY messages.
+       
+       * gst/gstelement.c: (gst_element_init), (gst_element_send_event),
+       (gst_element_get_state_func), (gst_element_continue_state),
+       (gst_element_lost_state), (gst_element_set_state_func),
+       (gst_element_change_state):
+       * gst/gstelement.h:
+       Keep the state that was last set by the app in a new element field.
+       Don't allow state changes when handling an element event.
+       Post ASYNC_START and ASYNC_DONE messages.
+       Change lost_state so that we go to PAUSED and wait for the parent to set
+       us to PLAYING again (so latency calculation can be performed)
+       Export gst_element_change_state() method so that subclasses can use it.
+       API: gst_element_change_state()
+       API: GST_STATE_TARGET
+
+       * gst/gstpipeline.c: (gst_pipeline_class_init),
+       (reset_stream_time), (gst_pipeline_change_state),
+       (gst_pipeline_handle_message), (gst_pipeline_set_new_stream_time):
+       Using the new ASYNC_START message we can reset the base_time when
+       needed. This can then be used to implement base_time redistribution in
+       flushing seeks so that we can remove the explicit seek handling.
+       Perform latency query and configuration when going to PLAYING.
+
+       * libs/gst/base/gstbasesink.c: (gst_base_sink_commit_state),
+       (gst_base_sink_query), (gst_base_sink_change_state):
+       Post new ASYNC_START/ASYNC_DONE messages.
+
+       * tests/check/generic/sinks.c: (GST_START_TEST):
+       Fix test because the bin will not set the async element to PLAYING right
+       away.
+
+       * tests/check/gst/gstbin.c: (pop_async_done), (GST_START_TEST):
+       Make the message check a little stronger.
+       Handle ASYNC messages.
+
+       * tests/check/pipelines/cleanup.c: (GST_START_TEST):
+       * tests/check/pipelines/simple-launch-lines.c: (GST_START_TEST):
+       Expect ASYNC_DONE messages.
+
+2007-03-19  Wim Taymans  <wim@fluendo.com>
+
+       * docs/gst/gstreamer-sections.txt:
        * gst/gstmessage.c: (gst_message_new_async_start),
        (gst_message_new_async_done), (gst_message_parse_info),
        (gst_message_parse_async_start):
index d5bbdb6..0d42b58 100644 (file)
@@ -419,6 +419,7 @@ GST_STATE_GET_NEXT
 GST_STATE_NEXT
 GST_STATE_PENDING
 GST_STATE_RETURN
+GST_STATE_TARGET
 GST_STATE_TRANSITION
 GST_STATE_TRANSITION_CURRENT
 GST_STATE_TRANSITION_NEXT
@@ -500,6 +501,7 @@ gst_element_lost_state
 gst_element_state_get_name
 gst_element_state_change_return_get_name
 gst_element_sync_state_with_parent
+gst_element_change_state
 
 <SUBSECTION element-tags>
 gst_element_found_tags
index 34c91b9..9eab23a 100644 (file)
@@ -187,11 +187,12 @@ GST_ELEMENT_DETAILS ("Generic bin",
 
 static void gst_bin_dispose (GObject * object);
 
-static void gst_bin_recalc_state (GstBin * bin, gboolean force);
 static GstStateChangeReturn gst_bin_change_state_func (GstElement * element,
     GstStateChange transition);
 static GstStateChangeReturn gst_bin_get_state_func (GstElement * element,
     GstState * state, GstState * pending, GstClockTime timeout);
+static void bin_handle_async_done (GstBin * bin, GstMessage ** message);
+static void bin_push_state_continue (GstBin * bin);
 
 static gboolean gst_bin_add_func (GstBin * bin, GstElement * element);
 static gboolean gst_bin_remove_func (GstBin * bin, GstElement * element);
@@ -215,7 +216,7 @@ static void gst_bin_restore_thyself (GstObject * object, xmlNodePtr self);
 
 static void bin_remove_messages (GstBin * bin, GstObject * src,
     GstMessageType types);
-static void gst_bin_recalc_func (GstBin * child, gpointer data);
+static void gst_bin_continue_func (GstBin * child, gpointer data);
 static gint bin_element_is_sink (GstElement * child, GstBin * bin);
 static gint bin_element_is_src (GstElement * child, GstBin * bin);
 
@@ -398,7 +399,7 @@ gst_bin_class_init (GstBinClass * klass)
   GST_DEBUG ("creating bin thread pool");
   err = NULL;
   klass->pool =
-      g_thread_pool_new ((GFunc) gst_bin_recalc_func, NULL, -1, FALSE, &err);
+      g_thread_pool_new ((GFunc) gst_bin_continue_func, NULL, -1, FALSE, &err);
   if (err != NULL) {
     g_critical ("could alloc threadpool %s", err->message);
   }
@@ -413,9 +414,8 @@ gst_bin_init (GstBin * bin)
   bin->children = NULL;
   bin->children_cookie = 0;
   bin->messages = NULL;
-  bin->polling = FALSE;
-  bin->state_dirty = FALSE;
   bin->provided_clock = NULL;
+  bin->state_dirty = FALSE;
   bin->clock_dirty = FALSE;
 
   /* Set up a bus for listening to child elements */
@@ -680,12 +680,13 @@ bin_remove_messages (GstBin * bin, GstObject * src, GstMessageType types)
 
     if (message_check (message, &find) == 0) {
       GST_DEBUG_OBJECT (GST_MESSAGE_SRC (message),
-          "deleting message of types %d", types);
+          "deleting message %p of types 0x%08x", message, types);
       bin->messages = g_list_delete_link (bin->messages, walk);
       gst_message_unref (message);
     } else {
       GST_DEBUG_OBJECT (GST_MESSAGE_SRC (message),
-          "not deleting message of type %d", GST_MESSAGE_TYPE (message));
+          "not deleting message %p of type 0x%08x", message,
+          GST_MESSAGE_TYPE (message));
     }
   }
 }
@@ -803,6 +804,7 @@ gst_bin_add_func (GstBin * bin, GstElement * element)
    * that is not important right now. When the pipeline goes to PLAYING,
    * a new clock will be selected */
   gst_element_set_clock (element, GST_ELEMENT_CLOCK (bin));
+  GST_DEBUG_OBJECT (bin, "marking state dirty");
   bin->state_dirty = TRUE;
   GST_OBJECT_UNLOCK (bin);
 
@@ -848,7 +850,6 @@ had_parent:
   }
 }
 
-
 /**
  * gst_bin_add:
  * @bin: a #GstBin
@@ -906,7 +907,9 @@ gst_bin_remove_func (GstBin * bin, GstElement * element)
   gchar *elem_name;
   GstIterator *it;
   gboolean is_sink;
-  GstMessage *clock_message = NULL;
+  GstMessage *clock_message = NULL, *async_message = NULL, *smessage = NULL;
+  GList *walk, *next;
+  gboolean other_async = FALSE, this_async = FALSE, cont = FALSE;
 
   GST_OBJECT_LOCK (element);
   /* Check if the element is already being removed and immediately
@@ -958,12 +961,58 @@ gst_bin_remove_func (GstBin * bin, GstElement * element)
         gst_message_new_clock_lost (GST_OBJECT_CAST (bin), bin->provided_clock);
   }
 
+  /* remove messages for the element, if there was a pending ASYNC_START
+   * message we must see if removing the element caused the bin to lose its
+   * async state. */
+  for (walk = bin->messages; walk; walk = next) {
+    GstMessage *message = (GstMessage *) walk->data;
+    GstElement *src = GST_ELEMENT_CAST (GST_MESSAGE_SRC (message));
+
+    next = g_list_next (walk);
+
+    if (GST_MESSAGE_TYPE (message) == GST_MESSAGE_ASYNC_START) {
+      if (src == element)
+        this_async = TRUE;
+      else
+        other_async = TRUE;
+
+      GST_DEBUG_OBJECT (GST_MESSAGE_SRC (message),
+          "looking at message %p", message);
+    }
+    if (src == element) {
+      /* delete all message types */
+      GST_DEBUG_OBJECT (GST_MESSAGE_SRC (message),
+          "deleting message %p of element \"%s\"", message, elem_name);
+      bin->messages = g_list_delete_link (bin->messages, walk);
+      gst_message_unref (message);
+    }
+  }
+  /* all other elements were not async and we removed the async one,
+   * post a ASYNC_DONE message because we are not async anymore now. */
+  if (!other_async && this_async) {
+    GST_DEBUG_OBJECT (bin, "we removed the last async element");
+
+    cont = GST_OBJECT_PARENT (bin) == NULL;
+    if (!cont) {
+      bin_handle_async_done (bin, &smessage);
+      async_message = gst_message_new_async_done (GST_OBJECT_CAST (bin));
+    }
+  }
+  GST_DEBUG_OBJECT (bin, "marking state dirty");
   bin->state_dirty = TRUE;
   GST_OBJECT_UNLOCK (bin);
 
-  if (clock_message) {
+  if (clock_message)
     gst_element_post_message (GST_ELEMENT_CAST (bin), clock_message);
-  }
+
+  if (smessage)
+    gst_element_post_message (GST_ELEMENT_CAST (bin), smessage);
+
+  if (async_message)
+    gst_element_post_message (GST_ELEMENT_CAST (bin), async_message);
+
+  if (cont)
+    bin_push_state_continue (bin);
 
   GST_CAT_INFO_OBJECT (GST_CAT_PARENTAGE, bin, "removed child \"%s\"",
       elem_name);
@@ -1277,26 +1326,6 @@ gst_bin_iterate_sources (GstBin * bin)
   return result;
 }
 
-/*
- * MT safe
- */
-static GstStateChangeReturn
-gst_bin_get_state_func (GstElement * element, GstState * state,
-    GstState * pending, GstClockTime timeout)
-{
-  GstBin *bin = GST_BIN (element);
-  GstStateChangeReturn ret;
-
-  GST_CAT_INFO_OBJECT (GST_CAT_STATES, bin, "getting state");
-
-  /* do a non forced recalculation of the state */
-  gst_bin_recalc_state (bin, FALSE);
-
-  ret = parent_class->get_state (element, state, pending, timeout);
-
-  return ret;
-}
-
 static void
 gst_bin_recalc_state (GstBin * bin, gboolean force)
 {
@@ -1404,6 +1433,7 @@ restart:
 
 done:
   bin->polling = FALSE;
+  GST_STATE_RETURN (bin) = ret;
   GST_OBJECT_UNLOCK (bin);
 
   /* now we can take the state lock, it is possible that new elements
@@ -1461,6 +1491,26 @@ unknown_state:
   }
 }
 
+/*
+ * MT safe
+ */
+static GstStateChangeReturn
+gst_bin_get_state_func (GstElement * element, GstState * state,
+    GstState * pending, GstClockTime timeout)
+{
+  GstBin *bin = GST_BIN (element);
+  GstStateChangeReturn ret;
+
+  GST_CAT_INFO_OBJECT (GST_CAT_STATES, bin, "getting state");
+
+  /* do a non forced recalculation of the state */
+  gst_bin_recalc_state (bin, FALSE);
+
+  ret = parent_class->get_state (element, state, pending, timeout);
+
+  return ret;
+}
+
 /***********************************************
  * Topologically sorted iterator
  * see http://en.wikipedia.org/wiki/Topological_sorting
@@ -1746,10 +1796,13 @@ gst_bin_iterate_sorted (GstBin * bin)
 }
 
 static GstStateChangeReturn
-gst_bin_element_set_state (GstBin * bin, GstElement * element, GstState pending)
+gst_bin_element_set_state (GstBin * bin, GstElement * element,
+    GstClockTime base_time, GstState current, GstState next)
 {
   GstStateChangeReturn ret;
   gboolean locked;
+  GList *found;
+  MessageFind find;
 
   /* peel off the locked flag */
   GST_OBJECT_LOCK (element);
@@ -1757,18 +1810,53 @@ gst_bin_element_set_state (GstBin * bin, GstElement * element, GstState pending)
   GST_OBJECT_UNLOCK (element);
 
   /* skip locked elements */
-  if (G_UNLIKELY (locked)) {
-    GST_DEBUG_OBJECT (element,
-        "element is locked, pretending state change succeeded");
-    ret = GST_STATE_CHANGE_SUCCESS;
-    goto done;
+  if (G_UNLIKELY (locked))
+    goto locked;
+
+  /* the element was busy with an upwards async state change, we must wait for
+   * an ASYNC_DONE message before we attemp to change the state. */
+  find.src = GST_OBJECT_CAST (element);
+  find.types = GST_MESSAGE_ASYNC_START;
+
+  GST_OBJECT_LOCK (bin);
+  if ((found = g_list_find_custom (bin->messages, &find,
+              (GCompareFunc) message_check))) {
+    GstMessage *message = GST_MESSAGE_CAST (found->data);
+    GstObject *src = GST_MESSAGE_SRC (message);
+
+    GST_DEBUG_OBJECT (element, "element message %p, %s async busy",
+        message, GST_ELEMENT_NAME (src));
+    /* only wait for upward state changes */
+    if (next > current)
+      goto was_busy;
   }
+  GST_OBJECT_UNLOCK (bin);
+
+  GST_DEBUG_OBJECT (bin,
+      "setting element %s to %s, base_time %" GST_TIME_FORMAT,
+      GST_ELEMENT_NAME (element), gst_element_state_get_name (next),
+      GST_TIME_ARGS (base_time));
+
+  /* set base_time on child */
+  gst_element_set_base_time (element, base_time);
 
   /* change state */
-  ret = gst_element_set_state (element, pending);
+  ret = gst_element_set_state (element, next);
 
-done:
   return ret;
+
+locked:
+  {
+    GST_DEBUG_OBJECT (element,
+        "element is locked, pretending state change succeeded");
+    return GST_STATE_CHANGE_SUCCESS;
+  }
+was_busy:
+  {
+    GST_DEBUG_OBJECT (element, "element is was busy delaying state change");
+    GST_OBJECT_UNLOCK (bin);
+    return GST_STATE_CHANGE_ASYNC;
+  }
 }
 
 /* gst_iterator_fold functions for pads_activate
@@ -1925,11 +2013,8 @@ restart:
 
         child = GST_ELEMENT_CAST (data);
 
-        /* set base_time on child */
-        gst_element_set_base_time (child, base_time);
-
-        /* set state now */
-        ret = gst_bin_element_set_state (bin, child, next);
+        /* set state and base_time now */
+        ret = gst_bin_element_set_state (bin, child, base_time, current, next);
 
         switch (ret) {
           case GST_STATE_CHANGE_SUCCESS:
@@ -1939,11 +2024,13 @@ restart:
                 gst_element_state_get_name (next));
             break;
           case GST_STATE_CHANGE_ASYNC:
+          {
             GST_CAT_INFO_OBJECT (GST_CAT_STATES, element,
                 "child '%s' is changing state asynchronously to %s",
                 GST_ELEMENT_NAME (child), gst_element_state_get_name (next));
             have_async = TRUE;
             break;
+          }
           case GST_STATE_CHANGE_FAILURE:
             GST_CAT_INFO_OBJECT (GST_CAT_STATES, element,
                 "child '%s' failed to go to state %d(%s)",
@@ -1991,10 +2078,11 @@ done:
   gst_iterator_free (it);
 
   GST_CAT_DEBUG_OBJECT (GST_CAT_STATES, element,
-      "done changing bin's state from %s to %s, now in %s, ret %d",
+      "done changing bin's state from %s to %s, now in %s, ret %s",
       gst_element_state_get_name (current),
       gst_element_state_get_name (next),
-      gst_element_state_get_name (GST_STATE (element)), ret);
+      gst_element_state_get_name (GST_STATE (element)),
+      gst_element_state_change_return_get_name (ret));
 
   return ret;
 
@@ -2067,20 +2155,137 @@ gst_bin_send_event (GstElement * element, GstEvent * event)
 }
 
 static void
-gst_bin_recalc_func (GstBin * bin, gpointer data)
+gst_bin_continue_func (GstBin * bin, gpointer data)
 {
-  GST_DEBUG_OBJECT (bin, "doing state recalc");
+  GstState current, next, pending, target, old_state, old_next;
+  GstStateChangeReturn old_ret, ret;
+  GstStateChange transition;
+  gboolean busy, post;
+
+  GST_DEBUG_OBJECT (bin, "waiting for state lock");
   GST_STATE_LOCK (bin);
-  gst_bin_recalc_state (bin, FALSE);
+
+  GST_DEBUG_OBJECT (bin, "doing state continue");
+  GST_OBJECT_LOCK (bin);
+
+  old_ret = GST_STATE_RETURN (bin);
+  GST_STATE_RETURN (bin) = GST_STATE_CHANGE_SUCCESS;
+  busy = (old_ret == GST_STATE_CHANGE_ASYNC);
+  target = GST_STATE_TARGET (bin);
+  pending = GST_STATE_PENDING (bin);
+
+  /* check if there is something to commit */
+  if (pending == GST_STATE_VOID_PENDING)
+    goto nothing_pending;
+
+  old_state = GST_STATE (bin);
+  /* this is the state we should go to next */
+  old_next = GST_STATE_NEXT (bin);
+
+  GST_CAT_INFO_OBJECT (GST_CAT_STATES, bin, "busy %d, target %s",
+      busy, gst_element_state_get_name (target));
+
+  if (old_next == GST_STATE_PLAYING) {
+    post = FALSE;
+  } else {
+    post = TRUE;
+  }
+
+  /* we're going to PLAYING, always do the PAUSED->PLAYING state change */
+  if (target == GST_STATE_PLAYING) {
+    next = GST_STATE_PAUSED;
+    GST_STATE_PENDING (bin) = pending = target;
+  } else {
+    next = old_next;
+  }
+
+  /* update current state */
+  current = GST_STATE (bin) = next;
+
+  /* see if we reached the final state */
+  if (pending == current)
+    goto complete;
+
+  next = GST_STATE_GET_NEXT (current, pending);
+  transition = (GstStateChange) GST_STATE_TRANSITION (current, next);
+
+  GST_STATE_NEXT (bin) = next;
+  /* mark busy */
+  GST_STATE_RETURN (bin) = GST_STATE_CHANGE_ASYNC;
+  GST_OBJECT_UNLOCK (bin);
+
+  if (post) {
+    GST_CAT_INFO_OBJECT (GST_CAT_STATES, bin,
+        "committing state from %s to %s, pending %s",
+        gst_element_state_get_name (old_state),
+        gst_element_state_get_name (old_next),
+        gst_element_state_get_name (pending));
+
+    gst_element_post_message (GST_ELEMENT_CAST (bin),
+        gst_message_new_state_changed (GST_OBJECT_CAST (bin),
+            old_state, old_next, pending));
+  }
+
+  if (busy) {
+    GST_DEBUG_OBJECT (bin, "posting ASYNC_DONE");
+    gst_element_post_message (GST_ELEMENT_CAST (bin),
+        gst_message_new_async_done (GST_OBJECT_CAST (bin)));
+  }
+
+  GST_CAT_INFO_OBJECT (GST_CAT_STATES, bin,
+      "continue state change %s to %s, final %s",
+      gst_element_state_get_name (current),
+      gst_element_state_get_name (next), gst_element_state_get_name (pending));
+
+  ret = gst_element_change_state (GST_ELEMENT_CAST (bin), transition);
+
+done:
   GST_STATE_UNLOCK (bin);
-  GST_DEBUG_OBJECT (bin, "state recalc done");
+  GST_DEBUG_OBJECT (bin, "state continue done");
   gst_object_unref (bin);
+
+  return;
+
+nothing_pending:
+  {
+    GST_CAT_INFO_OBJECT (GST_CAT_STATES, bin, "nothing pending");
+    GST_OBJECT_UNLOCK (bin);
+    goto done;
+  }
+complete:
+  {
+    GST_STATE_PENDING (bin) = GST_STATE_VOID_PENDING;
+    GST_STATE_NEXT (bin) = GST_STATE_VOID_PENDING;
+
+    GST_CAT_INFO_OBJECT (GST_CAT_STATES, bin, "completed state change to %s",
+        gst_element_state_get_name (pending));
+    GST_OBJECT_UNLOCK (bin);
+
+    /* don't post silly messages with the same state. This can happen
+     * when an element state is changed to what it already was. For bins
+     * this can be the result of a lost state, which we check with the
+     * previous return value. 
+     * We do signal the cond though as a _get_state() might be blocking 
+     * on it. */
+    if (old_state != old_next || old_ret == GST_STATE_CHANGE_ASYNC) {
+      gst_element_post_message (GST_ELEMENT_CAST (bin),
+          gst_message_new_state_changed (GST_OBJECT_CAST (bin),
+              old_state, old_next, GST_STATE_VOID_PENDING));
+    }
+
+    GST_DEBUG_OBJECT (bin, "posting ASYNC_DONE");
+    gst_element_post_message (GST_ELEMENT_CAST (bin),
+        gst_message_new_async_done (GST_OBJECT_CAST (bin)));
+
+    GST_STATE_BROADCAST (bin);
+
+    goto done;
+  }
 }
 
 static GstBusSyncReply
 bin_bus_handler (GstBus * bus, GstMessage * message, GstBin * bin)
 {
-
   GstBinClass *bclass;
 
   bclass = GST_BIN_GET_CLASS (bin);
@@ -2092,6 +2297,139 @@ bin_bus_handler (GstBus * bus, GstMessage * message, GstBin * bin)
   return GST_BUS_DROP;
 }
 
+static void
+bin_push_state_continue (GstBin * bin)
+{
+  GstBinClass *klass;
+
+  /* mark the bin dirty */
+  GST_OBJECT_LOCK (bin);
+  klass = GST_BIN_GET_CLASS (bin);
+  GST_DEBUG_OBJECT (bin, "pushing continue on thread pool");
+  gst_object_ref (bin);
+  g_thread_pool_push (klass->pool, bin, NULL);
+  GST_OBJECT_UNLOCK (bin);
+}
+
+/* an element started an async state change, if we were not busy with a state
+ * change, we perform a lost state.
+ */
+static void
+bin_handle_async_start (GstBin * bin, GstMessage ** message)
+{
+  GstState old_state, new_state;
+
+  if (GST_STATE_RETURN (bin) == GST_STATE_CHANGE_FAILURE)
+    goto had_error;
+
+  if (GST_STATE_PENDING (bin) != GST_STATE_VOID_PENDING)
+    goto was_busy;
+
+  old_state = GST_STATE (bin);
+
+  if (old_state > GST_STATE_PAUSED)
+    new_state = GST_STATE_PAUSED;
+  else
+    new_state = old_state;
+
+  GST_CAT_DEBUG_OBJECT (GST_CAT_STATES, bin,
+      "lost state of %s, new %s", gst_element_state_get_name (old_state),
+      gst_element_state_get_name (new_state));
+
+  GST_STATE (bin) = new_state;
+  GST_STATE_NEXT (bin) = new_state;
+  GST_STATE_PENDING (bin) = new_state;
+  GST_STATE_RETURN (bin) = GST_STATE_CHANGE_ASYNC;
+
+  *message = gst_message_new_state_changed (GST_OBJECT_CAST (bin),
+      new_state, new_state, new_state);
+
+  return;
+
+had_error:
+  {
+    GST_CAT_DEBUG_OBJECT (GST_CAT_STATES, bin, "we had an error");
+    return;
+  }
+was_busy:
+  {
+    GST_CAT_DEBUG_OBJECT (GST_CAT_STATES, bin, "state change busy");
+    return;
+  }
+}
+
+static void
+bin_handle_async_done (GstBin * bin, GstMessage ** message)
+{
+  GstState pending;
+  GstStateChangeReturn old_ret;
+  GstState old_state, old_next;
+  GstState current, next;
+
+  old_ret = GST_STATE_RETURN (bin);
+  GST_STATE_RETURN (bin) = GST_STATE_CHANGE_SUCCESS;
+  pending = GST_STATE_PENDING (bin);
+
+  /* check if there is something to commit */
+  if (pending == GST_STATE_VOID_PENDING)
+    goto nothing_pending;
+
+  old_state = GST_STATE (bin);
+  /* this is the state we should go to next */
+  old_next = GST_STATE_NEXT (bin);
+  /* update current state */
+  current = GST_STATE (bin) = old_next;
+
+  /* see if we reached the final state */
+  if (pending == current)
+    goto complete;
+
+  next = GST_STATE_GET_NEXT (current, pending);
+
+  GST_STATE_NEXT (bin) = next;
+  /* mark busy */
+  GST_STATE_RETURN (bin) = GST_STATE_CHANGE_ASYNC;
+
+  GST_CAT_INFO_OBJECT (GST_CAT_STATES, bin,
+      "committing state from %s to %s, pending %s",
+      gst_element_state_get_name (old_state),
+      gst_element_state_get_name (old_next),
+      gst_element_state_get_name (pending));
+
+  *message = gst_message_new_state_changed (GST_OBJECT_CAST (bin),
+      old_state, old_next, pending);
+
+  return;
+
+nothing_pending:
+  {
+    GST_CAT_INFO_OBJECT (GST_CAT_STATES, bin, "nothing pending");
+    return;
+  }
+complete:
+  {
+    GST_STATE_PENDING (bin) = GST_STATE_VOID_PENDING;
+    GST_STATE_NEXT (bin) = GST_STATE_VOID_PENDING;
+
+    GST_CAT_INFO_OBJECT (GST_CAT_STATES, bin, "completed state change");
+
+    /* don't post silly messages with the same state. This can happen
+     * when an element state is changed to what it already was. For bins
+     * this can be the result of a lost state, which we check with the
+     * previous return value. 
+     * We do signal the cond though as a _get_state() might be blocking 
+     * on it. */
+    if (old_state != old_next || old_ret == GST_STATE_CHANGE_ASYNC) {
+      *message = gst_message_new_state_changed (GST_OBJECT_CAST (bin),
+          old_state, old_next, GST_STATE_VOID_PENDING);
+    }
+
+    GST_STATE_BROADCAST (bin);
+
+    return;
+  }
+}
+
 /* handle child messages:
  *
  * This method is called synchronously when a child posts a message on
@@ -2139,15 +2477,29 @@ bin_bus_handler (GstBus * bus, GstMessage * message, GstBin * bin)
  *     This message is never sent to the application but is forwarded to
  *     the parent.
  *
+ * GST_MESSAGE_ASYNC_START: Create an internal ELEMENT message that stores
+ *     the state of the element and the fact that the element will need a 
+ *     new base_time.
+ *
+ * GST_MESSAGE_ASYNC_DONE: Find the internal ELEMENT message we kept for the
+ *     element when it posted ASYNC_START. If all elements are done, post a
+ *     ASYNC_DONE message to the parent.
+ *
  * OTHER: post upwards.
  */
 static void
 gst_bin_handle_message_func (GstBin * bin, GstMessage * message)
 {
-  GST_DEBUG_OBJECT (bin, "[msg %p] handling child message of type %s",
-      message, GST_MESSAGE_TYPE_NAME (message));
+  GstObject *src;
+  GstMessageType type;
+
+  src = GST_MESSAGE_SRC (message);
+  type = GST_MESSAGE_TYPE (message);
+
+  GST_DEBUG_OBJECT (bin, "[msg %p] handling child %s message of type %s",
+      message, GST_ELEMENT_NAME (src), GST_MESSAGE_TYPE_NAME (message));
 
-  switch (GST_MESSAGE_TYPE (message)) {
+  switch (type) {
     case GST_MESSAGE_EOS:
     {
       gboolean eos;
@@ -2168,44 +2520,11 @@ gst_bin_handle_message_func (GstBin * bin, GstMessage * message)
     }
     case GST_MESSAGE_STATE_DIRTY:
     {
-      GstObject *src;
-      GstBinClass *klass;
-
-      src = GST_MESSAGE_SRC (message);
-
-      GST_DEBUG_OBJECT (bin, "%s gave state dirty", GST_ELEMENT_NAME (src));
-
-      /* mark the bin dirty */
-      GST_OBJECT_LOCK (bin);
-      GST_DEBUG_OBJECT (bin, "marking dirty");
-      bin->state_dirty = TRUE;
-
-      if (GST_OBJECT_PARENT (bin))
-        goto not_toplevel;
+      GST_WARNING_OBJECT (bin, "received deprecated STATE_DIRTY message");
 
       /* free message */
       gst_message_unref (message);
-
-      klass = GST_BIN_GET_CLASS (bin);
-      if (!bin->polling) {
-        GST_DEBUG_OBJECT (bin, "pushing recalc on thread pool");
-        gst_object_ref (bin);
-        g_thread_pool_push (klass->pool, bin, NULL);
-      } else {
-        GST_DEBUG_OBJECT (bin,
-            "state recalc already in progress, not pushing new recalc");
-      }
-      GST_OBJECT_UNLOCK (bin);
       break;
-
-      /* non toplevel bins just forward the message and don't start
-       * a recalc themselves */
-    not_toplevel:
-      {
-        GST_OBJECT_UNLOCK (bin);
-        GST_DEBUG_OBJECT (bin, "not toplevel, forwarding");
-        goto forward;
-      }
     }
     case GST_MESSAGE_SEGMENT_START:
       GST_OBJECT_LOCK (bin);
@@ -2311,6 +2630,117 @@ gst_bin_handle_message_func (GstBin * bin, GstMessage * message)
       gst_message_unref (message);
       break;
     }
+    case GST_MESSAGE_ASYNC_START:
+    {
+      gboolean forward = FALSE, new_base_time;
+      GstState target;
+      GstMessage *smessage = NULL;
+
+      GST_DEBUG_OBJECT (bin, "ASYNC_START message %p, %s", message,
+          GST_OBJECT_NAME (src));
+
+      gst_message_parse_async_start (message, &new_base_time);
+
+      GST_OBJECT_LOCK (bin);
+      /* we ignore the message if we are going to <= READY */
+      target = GST_STATE_TARGET (bin);
+      if (target <= GST_STATE_READY)
+        goto ignore_start_message;
+
+      bin_replace_message (bin, message, GST_MESSAGE_ASYNC_START);
+
+      bin_handle_async_start (bin, &smessage);
+
+      /* prepare an ASYNC_START message */
+      if (GST_OBJECT_PARENT (bin)) {
+        forward = TRUE;
+        message =
+            gst_message_new_async_start (GST_OBJECT_CAST (bin), new_base_time);
+      } else {
+        message = NULL;
+      }
+      GST_OBJECT_UNLOCK (bin);
+
+      if (smessage)
+        gst_element_post_message (GST_ELEMENT_CAST (bin), smessage);
+
+      if (forward)
+        goto forward;
+      else
+        break;
+
+    ignore_start_message:
+      {
+        GST_DEBUG_OBJECT (bin, "ignoring message, target %s",
+            gst_element_state_get_name (target));
+        GST_OBJECT_UNLOCK (bin);
+        gst_message_unref (message);
+        break;
+      }
+    }
+    case GST_MESSAGE_ASYNC_DONE:
+    {
+      gboolean done = FALSE, toplevel = FALSE;
+      GstState target;
+      MessageFind find;
+      GstMessage *smessage = NULL;
+
+      GST_DEBUG_OBJECT (bin, "ASYNC_DONE message %p, %s", message,
+          GST_OBJECT_NAME (src));
+
+      GST_OBJECT_LOCK (bin);
+      target = GST_STATE_TARGET (bin);
+      /* ignore messages if we are shutting down */
+      if (target <= GST_STATE_READY)
+        goto ignore_done_message;
+
+      bin_replace_message (bin, message, GST_MESSAGE_ASYNC_START);
+      /* if there are no more ASYNC_START messages, everybody posted
+       * a ASYNC_DONE and we can post one on the bus. */
+
+      /* we don't care who still has a pending ASYNC_START */
+      find.src = NULL;
+      find.types = GST_MESSAGE_ASYNC_START;
+
+      if (!g_list_find_custom (bin->messages, &find,
+              (GCompareFunc) message_check)) {
+        /* nothing found, remove all old ASYNC_DONE messages */
+        bin_remove_messages (bin, NULL, GST_MESSAGE_ASYNC_DONE);
+        done = TRUE;
+        toplevel = GST_OBJECT_PARENT (bin) == NULL;
+        if (!toplevel)
+          bin_handle_async_done (bin, &smessage);
+      }
+      GST_OBJECT_UNLOCK (bin);
+
+      if (smessage) {
+        GST_DEBUG_OBJECT (bin, "posting state change message");
+        gst_element_post_message (GST_ELEMENT_CAST (bin), smessage);
+      }
+      if (done) {
+        if (!toplevel) {
+          GST_DEBUG_OBJECT (bin,
+              "all async-done, posting ASYNC_DONE to parent");
+          /* post our combined ASYNC_DONE when all is ASYNC_DONE. */
+          gst_element_post_message (GST_ELEMENT_CAST (bin),
+              gst_message_new_async_done (GST_OBJECT_CAST (bin)));
+        } else {
+          /* toplevel, start continue state */
+          GST_DEBUG_OBJECT (bin, "all async-done, starting state continue");
+          bin_push_state_continue (bin);
+        }
+      }
+      break;
+
+    ignore_done_message:
+      {
+        GST_DEBUG_OBJECT (bin, "ignoring message, target %s",
+            gst_element_state_get_name (target));
+        GST_OBJECT_UNLOCK (bin);
+        gst_message_unref (message);
+        break;
+      }
+    }
     default:
       goto forward;
   }
index be458ca..30880f8 100644 (file)
@@ -82,6 +82,7 @@
 #include <gobject/gvaluecollector.h>
 
 #include "gstelement.h"
+#include "gstenumtypes.h"
 #include "gstbus.h"
 #include "gstmarshal.h"
 #include "gsterror.h"
@@ -118,8 +119,6 @@ static void gst_element_base_class_finalize (gpointer g_class);
 static void gst_element_dispose (GObject * object);
 static void gst_element_finalize (GObject * object);
 
-static GstStateChangeReturn gst_element_change_state (GstElement * element,
-    GstStateChange transition);
 static GstStateChangeReturn gst_element_change_state_func (GstElement * element,
     GstStateChange transition);
 static GstStateChangeReturn gst_element_get_state_func (GstElement * element,
@@ -254,6 +253,7 @@ static void
 gst_element_init (GstElement * element)
 {
   GST_STATE (element) = GST_STATE_NULL;
+  GST_STATE_TARGET (element) = GST_STATE_NULL;
   GST_STATE_NEXT (element) = GST_STATE_VOID_PENDING;
   GST_STATE_PENDING (element) = GST_STATE_VOID_PENDING;
   GST_STATE_RETURN (element) = GST_STATE_CHANGE_SUCCESS;
@@ -1295,6 +1295,7 @@ gst_element_send_event (GstElement * element, GstEvent * event)
 
   oclass = GST_ELEMENT_GET_CLASS (element);
 
+  GST_STATE_LOCK (element);
   if (oclass->send_event) {
     GST_CAT_DEBUG (GST_CAT_ELEMENT_PADS, "send %s event on element %s",
         GST_EVENT_TYPE_NAME (event), GST_ELEMENT_NAME (element));
@@ -1302,6 +1303,8 @@ gst_element_send_event (GstElement * element, GstEvent * event)
   } else {
     result = gst_element_default_send_event (element, event);
   }
+  GST_STATE_UNLOCK (element);
+
   return result;
 }
 
@@ -1762,6 +1765,9 @@ gst_element_get_state_func (GstElement * element,
   GST_CAT_DEBUG_OBJECT (GST_CAT_STATES, element, "RETURN is %s",
       gst_element_state_change_return_get_name (ret));
 
+  GST_CAT_DEBUG_OBJECT (GST_CAT_STATES, element, "RETURN is %s",
+      gst_element_state_change_return_get_name (ret));
+
   /* we got an error, report immediatly */
   if (ret == GST_STATE_CHANGE_FAILURE)
     goto done;
@@ -1984,14 +1990,14 @@ nothing_aborted:
 GstStateChangeReturn
 gst_element_continue_state (GstElement * element, GstStateChangeReturn ret)
 {
-  GstState pending;
-  GstState old_ret, old_state, old_next;
-  GstState current, next;
+  GstStateChangeReturn old_ret;
+  GstState old_state, old_next;
+  GstState current, next, pending;
   GstMessage *message;
   GstStateChange transition;
 
   GST_OBJECT_LOCK (element);
-  old_ret = (GstState) GST_STATE_RETURN (element);
+  old_ret = GST_STATE_RETURN (element);
   GST_STATE_RETURN (element) = ret;
   pending = GST_STATE_PENDING (element);
 
@@ -2047,7 +2053,8 @@ complete:
     GST_STATE_PENDING (element) = GST_STATE_VOID_PENDING;
     GST_STATE_NEXT (element) = GST_STATE_VOID_PENDING;
 
-    GST_CAT_INFO_OBJECT (GST_CAT_STATES, element, "completed state change");
+    GST_CAT_INFO_OBJECT (GST_CAT_STATES, element,
+        "completed state change to %s", gst_element_state_get_name (pending));
     GST_OBJECT_UNLOCK (element);
 
     /* don't post silly messages with the same state. This can happen
@@ -2076,10 +2083,16 @@ complete:
  * element is copied to the pending state so that any call to
  * gst_element_get_state() will return %GST_STATE_CHANGE_ASYNC.
  *
+ * An ASYNC_START message is posted with an indication to distribute a new
+ * base_time to the element.
+ * If the element was PLAYING, it will go to PAUSED. The element
+ * will be restored to its PLAYING state by the parent pipeline when it
+ * prerolls again.
+ *
  * This is mostly used for elements that lost their preroll buffer
- * in the %GST_STATE_PAUSED state after a flush, they become %GST_STATE_PAUSED
- * again if a new preroll buffer is queued.
- * This function can only be called when the element is currently
+ * in the %GST_STATE_PAUSED or %GST_STATE_PLAYING state after a flush,
+ * they will go to their pending state again when a new preroll buffer is
+ * queued. This function can only be called when the element is currently
  * not in error or an async state change.
  *
  * This function is used internally and should normally not be called from
@@ -2090,7 +2103,7 @@ complete:
 void
 gst_element_lost_state (GstElement * element)
 {
-  GstState current_state;
+  GstState old_state, new_state;
   GstMessage *message;
 
   g_return_if_fail (GST_IS_ELEMENT (element));
@@ -2100,22 +2113,31 @@ gst_element_lost_state (GstElement * element)
       GST_STATE_RETURN (element) == GST_STATE_CHANGE_FAILURE)
     goto nothing_lost;
 
-  current_state = GST_STATE (element);
+  old_state = GST_STATE (element);
+
+  /* when we were PLAYING, the new state is PAUSED. We will also not
+   * automatically go to PLAYING but let the parent bin(s) set us to PLAYING
+   * when we preroll. */
+  if (old_state > GST_STATE_PAUSED)
+    new_state = GST_STATE_PAUSED;
+  else
+    new_state = old_state;
 
   GST_CAT_DEBUG_OBJECT (GST_CAT_STATES, element,
-      "lost state of %s", gst_element_state_get_name (current_state));
+      "lost state of %s to %s", gst_element_state_get_name (old_state),
+      gst_element_state_get_name (new_state));
 
-  GST_STATE_NEXT (element) = current_state;
-  GST_STATE_PENDING (element) = current_state;
+  GST_STATE (element) = new_state;
+  GST_STATE_NEXT (element) = new_state;
+  GST_STATE_PENDING (element) = new_state;
   GST_STATE_RETURN (element) = GST_STATE_CHANGE_ASYNC;
   GST_OBJECT_UNLOCK (element);
 
   message = gst_message_new_state_changed (GST_OBJECT_CAST (element),
-      current_state, current_state, current_state);
+      new_state, new_state, new_state);
   gst_element_post_message (element, message);
 
-  /* and mark us dirty */
-  message = gst_message_new_state_dirty (GST_OBJECT_CAST (element));
+  message = gst_message_new_async_start (GST_OBJECT_CAST (element), TRUE);
   gst_element_post_message (element, message);
 
   return;
@@ -2200,7 +2222,9 @@ gst_element_set_state_func (GstElement * element, GstState state)
   /* increment state cookie so that we can track each state change */
   element->state_cookie++;
 
-  /* this is the (new) state we should go to */
+  /* this is the (new) state we should go to. TARGET is the last state we set on
+   * the element. */
+  GST_STATE_TARGET (element) = state;
   GST_STATE_PENDING (element) = state;
 
   GST_CAT_DEBUG_OBJECT (GST_CAT_STATES, element,
@@ -2271,8 +2295,19 @@ was_busy:
   }
 }
 
-/* with STATE_LOCK */
-static GstStateChangeReturn
+/**
+ * gst_element_change_state:
+ * @element: a #GstElement
+ * @transition: the requested transition
+ *
+ * Perform @transition on @element.
+ *
+ * This function must be called with STATE_LOCK held and is mainly used
+ * internally.
+ *
+ * Returns: the #GstStateChangeReturn of the state transition.
+ */
+GstStateChangeReturn
 gst_element_change_state (GstElement * element, GstStateChange transition)
 {
   GstElementClass *oclass;
@@ -2300,22 +2335,26 @@ gst_element_change_state (GstElement * element, GstStateChange transition)
       gst_element_abort_state (element);
       break;
     case GST_STATE_CHANGE_ASYNC:
+    {
+      GstState target;
+
       GST_CAT_DEBUG_OBJECT (GST_CAT_STATES, element,
           "element will change state ASYNC");
 
-      /* if we go upwards, we give the app a change to wait for
-       * completion */
-      if (current < next)
+      target = GST_STATE_TARGET (element);
+
+      if (target > GST_STATE_READY)
         goto async;
 
       /* else we just continue the state change downwards */
       GST_CAT_INFO_OBJECT (GST_CAT_STATES, element,
-          "forcing commit state %s < %s",
-          gst_element_state_get_name (current),
-          gst_element_state_get_name (next));
+          "forcing commit state %s <= %s",
+          gst_element_state_get_name (target),
+          gst_element_state_get_name (GST_STATE_READY));
 
       ret = gst_element_continue_state (element, GST_STATE_CHANGE_SUCCESS);
       break;
+    }
     case GST_STATE_CHANGE_SUCCESS:
       GST_CAT_DEBUG_OBJECT (GST_CAT_STATES, element,
           "element changed state SUCCESS");
index 9aad5e5..d5181ac 100644 (file)
@@ -120,6 +120,16 @@ typedef enum {
 #define GST_STATE_PENDING(elem)                (GST_ELEMENT_CAST(elem)->pending_state)
 
 /**
+ * GST_STATE_TARGET:
+ * @elem: a #GstElement to return the target state for.
+ *
+ * This macro returns the target #GstState of the element.
+ *
+ * Since: 0.10.13
+ */
+#define GST_STATE_TARGET(elem)         (GST_ELEMENT_CAST(elem)->abidata.ABI.target_state)
+
+/**
  * GST_STATE_RETURN:
  * @elem: a #GstElement to return the last state result for.
  *
@@ -427,7 +437,14 @@ struct _GstElement
   guint32               pads_cookie;
 
   /*< private >*/
-  gpointer _gst_reserved[GST_PADDING];
+  union {
+    struct {
+      /* state set by application */
+      GstState              target_state;
+    } ABI;
+    /* adding + 0 to mark ABI change to be undone later */
+    gpointer _gst_reserved[GST_PADDING + 0];
+  } abidata;
 };
 
 /**
@@ -626,6 +643,8 @@ GstStateChangeReturn        gst_element_get_state           (GstElement * element,
 GstStateChangeReturn   gst_element_set_state           (GstElement *element, GstState state);
 
 void                   gst_element_abort_state         (GstElement * element);
+GstStateChangeReturn    gst_element_change_state        (GstElement * element,
+                                                        GstStateChange transition);
 GstStateChangeReturn   gst_element_continue_state      (GstElement * element,
                                                          GstStateChangeReturn ret);
 void                   gst_element_lost_state          (GstElement * element);
index a28f5ec..215078e 100644 (file)
@@ -120,6 +120,8 @@ struct _GstPipelinePrivate
 {
   /* with LOCK */
   gboolean auto_flush_bus;
+
+  GstClockTime new_stream_time;
 };
 
 
@@ -133,13 +135,12 @@ static void gst_pipeline_set_property (GObject * object, guint prop_id,
 static void gst_pipeline_get_property (GObject * object, guint prop_id,
     GValue * value, GParamSpec * pspec);
 
-static gboolean gst_pipeline_send_event (GstElement * element,
-    GstEvent * event);
-
 static GstClock *gst_pipeline_provide_clock_func (GstElement * element);
 static GstStateChangeReturn gst_pipeline_change_state (GstElement * element,
     GstStateChange transition);
 
+static void gst_pipeline_handle_message (GstBin * bin, GstMessage * message);
+
 static GstBinClass *parent_class = NULL;
 
 /* static guint gst_pipeline_signals[LAST_SIGNAL] = { 0 }; */
@@ -185,6 +186,7 @@ gst_pipeline_class_init (gpointer g_class, gpointer class_data)
 {
   GObjectClass *gobject_class = G_OBJECT_CLASS (g_class);
   GstElementClass *gstelement_class = GST_ELEMENT_CLASS (g_class);
+  GstBinClass *gstbin_class = GST_BIN_CLASS (g_class);
   GstPipelineClass *klass = GST_PIPELINE_CLASS (g_class);
 
   parent_class = g_type_class_peek_parent (klass);
@@ -226,11 +228,13 @@ gst_pipeline_class_init (gpointer g_class, gpointer class_data)
 
   gobject_class->dispose = GST_DEBUG_FUNCPTR (gst_pipeline_dispose);
 
-  gstelement_class->send_event = GST_DEBUG_FUNCPTR (gst_pipeline_send_event);
   gstelement_class->change_state =
       GST_DEBUG_FUNCPTR (gst_pipeline_change_state);
   gstelement_class->provide_clock =
       GST_DEBUG_FUNCPTR (gst_pipeline_provide_clock_func);
+
+  gstbin_class->handle_message =
+      GST_DEBUG_FUNCPTR (gst_pipeline_handle_message);
 }
 
 static void
@@ -311,90 +315,19 @@ gst_pipeline_get_property (GObject * object, guint prop_id,
   }
 }
 
-/* default pipeline seeking code:
- *
- * If the pipeline is PLAYING and a flushing seek is done, set
- * the pipeline to PAUSED before doing the seek.
- *
- * A flushing seek also resets the stream time to 0 so that when
- * we go back to PLAYING after the seek, the base_time is recalculated
- * and redistributed to the elements.
- */
-static gboolean
-do_pipeline_seek (GstElement * element, GstEvent * event)
-{
-  gdouble rate;
-  GstSeekFlags flags;
-  gboolean flush;
-  gboolean was_playing = FALSE;
-  gboolean res;
-
-  /* we are only interested in the FLUSH flag of the seek event. */
-  gst_event_parse_seek (event, &rate, NULL, &flags, NULL, NULL, NULL, NULL);
-
-  flush = flags & GST_SEEK_FLAG_FLUSH;
-
-  /* if flushing seek, get the current state */
-  if (flush) {
-    GstState state;
-
-    /* need to call _get_state() since a bin state is only updated
-     * with this call. */
-    gst_element_get_state (element, &state, NULL, 0);
-    was_playing = (state == GST_STATE_PLAYING);
-
-    if (was_playing) {
-      /* and PAUSE when the pipeline was PLAYING, we don't need
-       * to wait for the state change to complete since we are going
-       * to flush out any preroll sample anyway. */
-      gst_element_set_state (element, GST_STATE_PAUSED);
-    }
-  }
-
-  /* let parent class implement the seek behaviour */
-  res = GST_ELEMENT_CLASS (parent_class)->send_event (element, event);
-
-  /* if flushing seek restore previous state */
-  if (flush) {
-    gboolean need_reset;
-
-    GST_OBJECT_LOCK (element);
-    need_reset = GST_PIPELINE (element)->stream_time != GST_CLOCK_TIME_NONE;
-    GST_OBJECT_UNLOCK (element);
-
-    /* need to reset the stream time to 0 after a successfull flushing seek, 
-     * unless the user explicitly disabled this behavior by setting stream 
-     * time to NONE */
-    if (need_reset && res)
-      gst_pipeline_set_new_stream_time (GST_PIPELINE (element), 0);
-
-    if (was_playing)
-      /* and continue playing, this might return ASYNC in which case the
-       * application can wait for the PREROLL to complete after the seek. 
-       */
-      gst_element_set_state (element, GST_STATE_PLAYING);
-  }
-  return res;
-}
-
-static gboolean
-gst_pipeline_send_event (GstElement * element, GstEvent * event)
+/* set the stream time to 0 */
+static void
+reset_stream_time (GstPipeline * pipeline)
 {
-  gboolean res;
-  GstEventType event_type = GST_EVENT_TYPE (event);
-
-  switch (event_type) {
-    case GST_EVENT_SEEK:
-      /* do the default seek handling */
-      res = do_pipeline_seek (element, event);
-      break;
-    default:
-      /* else parent implements the defaults */
-      res = GST_ELEMENT_CLASS (parent_class)->send_event (element, event);
-      break;
+  GST_OBJECT_LOCK (pipeline);
+  if (pipeline->stream_time != GST_CLOCK_TIME_NONE) {
+    GST_DEBUG_OBJECT (pipeline, "reset stream_time to 0");
+    pipeline->stream_time = 0;
+    pipeline->priv->new_stream_time = TRUE;
+  } else {
+    GST_DEBUG_OBJECT (pipeline, "application asked to not reset stream_time");
   }
-
-  return res;
+  GST_OBJECT_UNLOCK (pipeline);
 }
 
 /**
@@ -433,8 +366,11 @@ gst_pipeline_change_state (GstElement * element, GstStateChange transition)
     case GST_STATE_CHANGE_PAUSED_TO_PLAYING:
     {
       GstClockTime new_base_time;
+      GstQuery *query;
+      GstClockTime min_latency, max_latency;
       GstClockTime start_time, stream_time, delay;
-      gboolean new_clock;
+      gboolean new_clock, update;
+      gboolean res;
 
       GST_DEBUG_OBJECT (element, "selecting clock and base_time");
 
@@ -452,6 +388,8 @@ gst_pipeline_change_state (GstElement * element, GstStateChange transition)
       GST_OBJECT_LOCK (element);
       new_clock = element->clock != clock;
       stream_time = pipeline->stream_time;
+      update = pipeline->priv->new_stream_time;
+      pipeline->priv->new_stream_time = FALSE;
       delay = pipeline->delay;
       GST_OBJECT_UNLOCK (element);
 
@@ -468,25 +406,68 @@ gst_pipeline_change_state (GstElement * element, GstStateChange transition)
             gst_message_new_new_clock (GST_OBJECT_CAST (element), clock));
       }
 
-      if (stream_time != GST_CLOCK_TIME_NONE
-          && start_time != GST_CLOCK_TIME_NONE) {
-        new_base_time = start_time - stream_time + delay;
-        GST_DEBUG_OBJECT (element,
-            "stream_time=%" GST_TIME_FORMAT ", now=%" GST_TIME_FORMAT
-            ", base_time %" GST_TIME_FORMAT,
-            GST_TIME_ARGS (stream_time), GST_TIME_ARGS (start_time),
-            GST_TIME_ARGS (new_base_time));
-      } else
-        new_base_time = GST_CLOCK_TIME_NONE;
-
       if (clock)
         gst_object_unref (clock);
 
-      if (new_base_time != GST_CLOCK_TIME_NONE)
-        gst_element_set_base_time (element, new_base_time);
-      else
+      /* stream time changed, either with a PAUSED or a flush, we need to update
+       * the base time */
+      if (update) {
+        GST_DEBUG_OBJECT (pipeline, "stream_time changed, updating base time");
+
+        if (stream_time != GST_CLOCK_TIME_NONE
+            && start_time != GST_CLOCK_TIME_NONE) {
+          new_base_time = start_time - stream_time + delay;
+          GST_DEBUG_OBJECT (element,
+              "stream_time=%" GST_TIME_FORMAT ", now=%" GST_TIME_FORMAT
+              ", base_time %" GST_TIME_FORMAT,
+              GST_TIME_ARGS (stream_time), GST_TIME_ARGS (start_time),
+              GST_TIME_ARGS (new_base_time));
+        } else
+          new_base_time = GST_CLOCK_TIME_NONE;
+
+        if (new_base_time != GST_CLOCK_TIME_NONE)
+          gst_element_set_base_time (element, new_base_time);
+        else
+          GST_DEBUG_OBJECT (pipeline,
+              "NOT adjusting base_time because stream_time is NONE");
+      } else {
         GST_DEBUG_OBJECT (pipeline,
-            "NOT adjusting base time because stream time is NONE");
+            "NOT adjusting base_time because we selected one before");
+      }
+
+      /* determine latency in this pipeline */
+      GST_DEBUG_OBJECT (element, "querying pipeline latency");
+      query = gst_query_new_latency ();
+      if (gst_element_query (element, query)) {
+        gboolean live;
+
+        gst_query_parse_latency (query, &live, &min_latency, &max_latency);
+
+        GST_DEBUG_OBJECT (element,
+            "configuring min latency %" GST_TIME_FORMAT ", max latency %"
+            GST_TIME_FORMAT ", live %d", GST_TIME_ARGS (min_latency),
+            GST_TIME_ARGS (max_latency), live);
+
+        /* configure latency on elements */
+        res =
+            gst_element_send_event (element,
+            gst_event_new_latency (min_latency));
+        if (res) {
+          GST_INFO_OBJECT (element, "configured latency of %" GST_TIME_FORMAT,
+              GST_TIME_ARGS (min_latency));
+        } else {
+          GST_WARNING_OBJECT (element,
+              "failed to configure latency of %" GST_TIME_FORMAT,
+              GST_TIME_ARGS (min_latency));
+          GST_ELEMENT_WARNING (element, CORE, CLOCK, (NULL),
+              ("Failed to configure latency of %" GST_TIME_FORMAT,
+                  GST_TIME_ARGS (min_latency)));
+        }
+      } else {
+        /* this is not a real problem, we just don't configure any latency. */
+        GST_WARNING_OBJECT (element, "failed to query pipeline latency");
+      }
+      gst_query_unref (query);
       break;
     }
     case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
@@ -503,17 +484,7 @@ gst_pipeline_change_state (GstElement * element, GstStateChange transition)
       break;
     case GST_STATE_CHANGE_READY_TO_PAUSED:
     {
-      gboolean need_reset;
-
-      /* only reset the stream_time when the application did not
-       * specify a stream_time explicitly */
-      GST_OBJECT_LOCK (element);
-      need_reset = pipeline->stream_time != GST_CLOCK_TIME_NONE;
-      GST_OBJECT_UNLOCK (element);
-
-      if (need_reset)
-        gst_pipeline_set_new_stream_time (pipeline, 0);
-
+      reset_stream_time (pipeline);
       break;
     }
     case GST_STATE_CHANGE_PAUSED_TO_PLAYING:
@@ -532,8 +503,11 @@ gst_pipeline_change_state (GstElement * element, GstStateChange transition)
 
         GST_OBJECT_LOCK (element);
         /* store the current stream time */
-        if (pipeline->stream_time != GST_CLOCK_TIME_NONE)
+        if (pipeline->stream_time != GST_CLOCK_TIME_NONE) {
           pipeline->stream_time = now - element->base_time;
+          pipeline->priv->new_stream_time = TRUE;
+        }
+
         GST_DEBUG_OBJECT (element,
             "stream_time=%" GST_TIME_FORMAT ", now=%" GST_TIME_FORMAT
             ", base_time %" GST_TIME_FORMAT,
@@ -574,6 +548,32 @@ invalid_clock:
   }
 }
 
+static void
+gst_pipeline_handle_message (GstBin * bin, GstMessage * message)
+{
+  GstPipeline *pipeline = GST_PIPELINE_CAST (bin);
+
+  switch (GST_MESSAGE_TYPE (message)) {
+    case GST_MESSAGE_ASYNC_START:
+    {
+      gboolean new_base_time;
+
+      gst_message_parse_async_start (message, &new_base_time);
+
+      /* reset our stream time if we need to distribute a new base_time to the
+       * children. */
+      if (new_base_time)
+        reset_stream_time (pipeline);
+
+      break;
+    }
+    default:
+      break;
+  }
+  GST_BIN_CLASS (parent_class)->handle_message (bin, message);
+}
+
+
 /**
  * gst_pipeline_get_bus:
  * @pipeline: a #GstPipeline
@@ -614,6 +614,7 @@ gst_pipeline_set_new_stream_time (GstPipeline * pipeline, GstClockTime time)
 
   GST_OBJECT_LOCK (pipeline);
   pipeline->stream_time = time;
+  pipeline->priv->new_stream_time = TRUE;
   GST_OBJECT_UNLOCK (pipeline);
 
   GST_DEBUG_OBJECT (pipeline, "set new stream_time to %" GST_TIME_FORMAT,
index ca9da77..3ac30de 100644 (file)
@@ -192,6 +192,8 @@ struct _GstBaseSinkPrivate
 
   /* latency stuff */
   GstClockTime latency;
+
+  gboolean commited;
 };
 
 #define DO_RUNNING_AVG(avg,val,size) (((val) + ((size)-1) * (avg)) / (size))
@@ -888,9 +890,10 @@ gst_base_sink_commit_state (GstBaseSink * basesink)
 {
   /* commit state and proceed to next pending state */
   GstState current, next, pending, post_pending;
-  GstMessage *message;
   gboolean post_paused = FALSE;
+  gboolean post_async_done = FALSE;
   gboolean post_playing = FALSE;
+  gboolean sync;
 
   /* we are certainly not playing async anymore now */
   basesink->playing_async = FALSE;
@@ -900,6 +903,7 @@ gst_base_sink_commit_state (GstBaseSink * basesink)
   next = GST_STATE_NEXT (basesink);
   pending = GST_STATE_PENDING (basesink);
   post_pending = pending;
+  sync = basesink->sync;
 
   switch (pending) {
     case GST_STATE_PLAYING:
@@ -912,6 +916,8 @@ gst_base_sink_commit_state (GstBaseSink * basesink)
       GST_DEBUG_OBJECT (basesink, "commiting state to PLAYING");
 
       basesink->need_preroll = FALSE;
+      post_async_done = TRUE;
+      basesink->priv->commited = TRUE;
       post_playing = TRUE;
       /* post PAUSED too when we were READY */
       if (current == GST_STATE_READY) {
@@ -929,6 +935,8 @@ gst_base_sink_commit_state (GstBaseSink * basesink)
     case GST_STATE_PAUSED:
       GST_DEBUG_OBJECT (basesink, "commiting state to PAUSED");
       post_paused = TRUE;
+      post_async_done = TRUE;
+      basesink->priv->commited = TRUE;
       post_pending = GST_STATE_VOID_PENDING;
       break;
     case GST_STATE_READY:
@@ -947,19 +955,18 @@ gst_base_sink_commit_state (GstBaseSink * basesink)
   GST_OBJECT_UNLOCK (basesink);
 
   if (post_paused) {
-    message = gst_message_new_state_changed (GST_OBJECT_CAST (basesink),
-        current, next, post_pending);
-    gst_element_post_message (GST_ELEMENT_CAST (basesink), message);
+    gst_element_post_message (GST_ELEMENT_CAST (basesink),
+        gst_message_new_state_changed (GST_OBJECT_CAST (basesink),
+            current, next, post_pending));
   }
-  if (post_playing) {
-    message = gst_message_new_state_changed (GST_OBJECT_CAST (basesink),
-        next, pending, GST_STATE_VOID_PENDING);
-    gst_element_post_message (GST_ELEMENT_CAST (basesink), message);
+  if (post_async_done) {
+    gst_element_post_message (GST_ELEMENT_CAST (basesink),
+        gst_message_new_async_done (GST_OBJECT_CAST (basesink)));
   }
-  /* and mark dirty */
-  if (post_paused || post_playing) {
+  if (post_playing) {
     gst_element_post_message (GST_ELEMENT_CAST (basesink),
-        gst_message_new_state_dirty (GST_OBJECT_CAST (basesink)));
+        gst_message_new_state_changed (GST_OBJECT_CAST (basesink),
+            next, pending, GST_STATE_VOID_PENDING));
   }
 
   GST_STATE_BROADCAST (basesink);
@@ -2724,6 +2731,13 @@ gst_base_sink_query (GstElement * element, GstQuery * query)
       gboolean live, us_live;
       GstClockTime min, max;
 
+      GST_PAD_PREROLL_LOCK (basesink->sinkpad);
+      if (!gst_base_sink_is_prerolled (basesink)) {
+        GST_DEBUG_OBJECT (basesink, "not prerolled yet, can't report latency");
+        res = FALSE;
+        goto done;
+      }
+
       if ((res =
               gst_base_sink_query_latency (basesink, &live, &us_live, &min,
                   &max))) {
@@ -2737,6 +2751,8 @@ gst_base_sink_query (GstElement * element, GstQuery * query)
         }
         gst_query_set_latency (query, live, min, max);
       }
+    done:
+      GST_PAD_PREROLL_UNLOCK (basesink->sinkpad);
       break;
     }
     case GST_QUERY_JITTER:
@@ -2780,6 +2796,7 @@ gst_base_sink_change_state (GstElement * element, GstStateChange transition)
     case GST_STATE_CHANGE_READY_TO_PAUSED:
       /* need to complete preroll before this state change completes, there
        * is no data flow in READY so we can safely assume we need to preroll. */
+      GST_PAD_PREROLL_LOCK (basesink->sinkpad);
       GST_DEBUG_OBJECT (basesink, "READY to PAUSED, need preroll");
       gst_segment_init (&basesink->segment, GST_FORMAT_UNDEFINED);
       gst_segment_init (basesink->abidata.ABI.clip_segment,
@@ -2795,7 +2812,11 @@ gst_base_sink_change_state (GstElement * element, GstStateChange transition)
       basesink->priv->latency = 0;
       basesink->eos = FALSE;
       gst_base_sink_reset_qos (basesink);
+      basesink->priv->commited = FALSE;
       ret = GST_STATE_CHANGE_ASYNC;
+      gst_element_post_message (GST_ELEMENT_CAST (basesink),
+          gst_message_new_async_start (GST_OBJECT_CAST (basesink), FALSE));
+      GST_PAD_PREROLL_UNLOCK (basesink->sinkpad);
       break;
     case GST_STATE_CHANGE_PAUSED_TO_PLAYING:
       GST_PAD_PREROLL_LOCK (basesink->sinkpad);
@@ -2817,7 +2838,10 @@ gst_base_sink_change_state (GstElement * element, GstStateChange transition)
         GST_DEBUG_OBJECT (basesink, "PAUSED to PLAYING, need preroll");
         basesink->need_preroll = TRUE;
         basesink->playing_async = TRUE;
+        basesink->priv->commited = FALSE;
         ret = GST_STATE_CHANGE_ASYNC;
+        gst_element_post_message (GST_ELEMENT_CAST (basesink),
+            gst_message_new_async_start (GST_OBJECT_CAST (basesink), FALSE));
       }
       GST_PAD_PREROLL_UNLOCK (basesink->sinkpad);
       break;
@@ -2857,7 +2881,10 @@ gst_base_sink_change_state (GstElement * element, GstStateChange transition)
       } else {
         GST_DEBUG_OBJECT (basesink, "PLAYING to PAUSED, need preroll");
         basesink->playing_async = TRUE;
+        basesink->priv->commited = FALSE;
         ret = GST_STATE_CHANGE_ASYNC;
+        gst_element_post_message (GST_ELEMENT_CAST (basesink),
+            gst_message_new_async_start (GST_OBJECT_CAST (basesink), FALSE));
       }
       GST_DEBUG_OBJECT (basesink, "rendered: %" G_GUINT64_FORMAT
           ", dropped: %" G_GUINT64_FORMAT, basesink->priv->rendered,
@@ -2867,8 +2894,24 @@ gst_base_sink_change_state (GstElement * element, GstStateChange transition)
       GST_PAD_PREROLL_UNLOCK (basesink->sinkpad);
       break;
     case GST_STATE_CHANGE_PAUSED_TO_READY:
+      GST_PAD_PREROLL_LOCK (basesink->sinkpad);
+      if (!basesink->priv->commited) {
+        GST_DEBUG_OBJECT (basesink, "PAUSED to READY, posting async-done");
+
+        gst_element_post_message (GST_ELEMENT_CAST (basesink),
+            gst_message_new_state_changed (GST_OBJECT_CAST (basesink),
+                GST_STATE_PLAYING, GST_STATE_PAUSED, GST_STATE_READY));
+
+        gst_element_post_message (GST_ELEMENT_CAST (basesink),
+            gst_message_new_async_done (GST_OBJECT_CAST (basesink)));
+
+        basesink->priv->commited = TRUE;
+      } else {
+        GST_DEBUG_OBJECT (basesink, "PAUSED to READY, don't need_preroll");
+      }
       basesink->priv->current_sstart = 0;
       basesink->priv->current_sstop = 0;
+      GST_PAD_PREROLL_UNLOCK (basesink->sinkpad);
       break;
     case GST_STATE_CHANGE_READY_TO_NULL:
       if (bclass->stop)
index 7cf5bea..bacfd7f 100644 (file)
@@ -313,7 +313,7 @@ GST_START_TEST (test_livesrc_sink)
         if (n_sink == 2) {
           fail_unless (old == GST_STATE_READY, "unexpected old");
           fail_unless (new == GST_STATE_PAUSED, "unexpected new");
-          fail_unless (pending == GST_STATE_PLAYING, "unexpected pending");
+          fail_unless (pending == GST_STATE_VOID_PENDING, "unexpected pending");
         } else if (n_sink == 1) {
           fail_unless (old == GST_STATE_PAUSED, "unexpected old");
           fail_unless (new == GST_STATE_PLAYING, "unexpected new");
index 61612b3..4843717 100644 (file)
 #include <gst/check/gstcheck.h>
 
 static void
+pop_async_done (GstBus * bus)
+{
+  GstMessage *message;
+
+  GST_DEBUG ("popping async-done message");
+  message = gst_bus_poll (bus, GST_MESSAGE_ASYNC_DONE, -1);
+
+  fail_unless (message && GST_MESSAGE_TYPE (message)
+      == GST_MESSAGE_ASYNC_DONE, "did not get GST_MESSAGE_ASYNC_DONE");
+
+  gst_message_unref (message);
+  GST_DEBUG ("popped message");
+}
+
+static void
 pop_messages (GstBus * bus, int count)
 {
   GstMessage *message;
@@ -277,7 +292,7 @@ GST_START_TEST (test_message_state_changed_children)
   fail_unless (pending == GST_STATE_VOID_PENDING);
 
   /* wait for async thread to settle down */
-  while (GST_OBJECT_REFCOUNT_VALUE (pipeline) > 2)
+  while (GST_OBJECT_REFCOUNT_VALUE (pipeline) > 3)
     THREAD_SWITCH ();
 
   /* each object is referenced by a message;
@@ -291,6 +306,7 @@ GST_START_TEST (test_message_state_changed_children)
   ASSERT_OBJECT_REFCOUNT_BETWEEN (pipeline, "pipeline", 2, 3);
 
   pop_messages (bus, 3);
+  pop_async_done (bus);
   fail_if ((gst_bus_pop (bus)) != NULL);
 
   ASSERT_OBJECT_REFCOUNT (bus, "bus", 2);
@@ -390,6 +406,7 @@ GST_START_TEST (test_watch_for_state_change)
   fail_unless (ret == GST_STATE_CHANGE_SUCCESS);
 
   pop_messages (bus, 6);
+  pop_async_done (bus);
 
   fail_unless (gst_bus_have_pending (bus) == FALSE,
       "Unexpected messages on bus");
@@ -400,10 +417,12 @@ GST_START_TEST (test_watch_for_state_change)
   pop_messages (bus, 3);
 
   /* this one might return either SUCCESS or ASYNC, likely SUCCESS */
-  gst_element_set_state (GST_ELEMENT (bin), GST_STATE_PAUSED);
+  ret = gst_element_set_state (GST_ELEMENT (bin), GST_STATE_PAUSED);
   gst_element_get_state (GST_ELEMENT (bin), NULL, NULL, GST_CLOCK_TIME_NONE);
 
   pop_messages (bus, 3);
+  if (ret == GST_STATE_CHANGE_ASYNC)
+    pop_async_done (bus);
 
   fail_unless (gst_bus_have_pending (bus) == FALSE,
       "Unexpected messages on bus");
@@ -521,6 +540,7 @@ GST_START_TEST (test_children_state_change_order_flagged_sink)
 
   src = gst_element_factory_make ("fakesrc", NULL);
   fail_if (src == NULL, "Could not create fakesrc");
+  g_object_set (src, "num-buffers", 5, NULL);
 
   identity = gst_element_factory_make ("identity", NULL);
   fail_if (identity == NULL, "Could not create identity");
@@ -552,6 +572,7 @@ GST_START_TEST (test_children_state_change_order_flagged_sink)
   /* READY => PAUSED */
   /* because of pre-rolling, sink will return ASYNC on state
    * change and change state later when it has a buffer */
+  GST_DEBUG ("popping READY -> PAUSED messages");
   ASSERT_STATE_CHANGE_MSG (bus, identity, GST_STATE_READY, GST_STATE_PAUSED,
       105);
 #if 0
@@ -561,20 +582,21 @@ GST_START_TEST (test_children_state_change_order_flagged_sink)
    * in which case the sink will commit its new state before the source ...  */
   ASSERT_STATE_CHANGE_MSG (bus, src, GST_STATE_READY, GST_STATE_PAUSED, 106);
   ASSERT_STATE_CHANGE_MSG (bus, sink, GST_STATE_READY, GST_STATE_PAUSED, 107);
+#else
+
+  pop_messages (bus, 2);        /* pop remaining ready => paused messages off the bus */
   ASSERT_STATE_CHANGE_MSG (bus, pipeline, GST_STATE_READY, GST_STATE_PAUSED,
       108);
-
+  pop_async_done (bus);
+#endif
   /* PAUSED => PLAYING */
+  GST_DEBUG ("popping PAUSED -> PLAYING messages");
   ASSERT_STATE_CHANGE_MSG (bus, sink, GST_STATE_PAUSED, GST_STATE_PLAYING, 109);
   ASSERT_STATE_CHANGE_MSG (bus, identity, GST_STATE_PAUSED, GST_STATE_PLAYING,
       110);
   ASSERT_STATE_CHANGE_MSG (bus, src, GST_STATE_PAUSED, GST_STATE_PLAYING, 111);
   ASSERT_STATE_CHANGE_MSG (bus, pipeline, GST_STATE_PAUSED, GST_STATE_PLAYING,
       112);
-#else
-  pop_messages (bus, 3);        /* pop remaining ready => paused messages off the bus */
-  pop_messages (bus, 4);        /* pop paused => playing messages off the bus */
-#endif
 
   /* don't set to NULL that will set the bus flushing and kill our messages */
   ret = gst_element_set_state (pipeline, GST_STATE_READY);
@@ -656,6 +678,7 @@ GST_START_TEST (test_children_state_change_order_semi_sink)
   /* READY => PAUSED */
   /* because of pre-rolling, sink will return ASYNC on state
    * change and change state later when it has a buffer */
+  GST_DEBUG ("popping READY -> PAUSED messages");
   ASSERT_STATE_CHANGE_MSG (bus, identity, GST_STATE_READY, GST_STATE_PAUSED,
       205);
 #if 0
@@ -665,19 +688,20 @@ GST_START_TEST (test_children_state_change_order_semi_sink)
    * in which case the sink will commit its new state before the source ...  */
   ASSERT_STATE_CHANGE_MSG (bus, src, GST_STATE_READY, GST_STATE_PAUSED, 206);
   ASSERT_STATE_CHANGE_MSG (bus, sink, GST_STATE_READY, GST_STATE_PAUSED, 207);
+#else
+  pop_messages (bus, 2);        /* pop remaining ready => paused messages off the bus */
   ASSERT_STATE_CHANGE_MSG (bus, pipeline, GST_STATE_READY, GST_STATE_PAUSED,
       208);
+  pop_async_done (bus);
 
   /* PAUSED => PLAYING */
+  GST_DEBUG ("popping PAUSED -> PLAYING messages");
   ASSERT_STATE_CHANGE_MSG (bus, sink, GST_STATE_PAUSED, GST_STATE_PLAYING, 209);
   ASSERT_STATE_CHANGE_MSG (bus, identity, GST_STATE_PAUSED, GST_STATE_PLAYING,
       210);
   ASSERT_STATE_CHANGE_MSG (bus, src, GST_STATE_PAUSED, GST_STATE_PLAYING, 211);
   ASSERT_STATE_CHANGE_MSG (bus, pipeline, GST_STATE_PAUSED, GST_STATE_PLAYING,
       212);
-#else
-  pop_messages (bus, 3);        /* pop remaining ready => paused messages off the bus */
-  pop_messages (bus, 4);        /* pop paused => playing messages off the bus */
 #endif
 
   /* don't set to NULL that will set the bus flushing and kill our messages */
index ca7f19a..eeafe90 100644 (file)
@@ -88,8 +88,9 @@ GST_START_TEST (test_pipeline_unref)
   sink = gst_bin_get_by_name (GST_BIN (pipeline), "sink");
   fail_if (sink == NULL);
 
-  run_pipeline (pipeline, s, GST_MESSAGE_NEW_CLOCK | GST_MESSAGE_STATE_CHANGED,
-      GST_MESSAGE_EOS);
+  run_pipeline (pipeline, s,
+      GST_MESSAGE_NEW_CLOCK | GST_MESSAGE_STATE_CHANGED |
+      GST_MESSAGE_ASYNC_DONE, GST_MESSAGE_EOS);
   while (GST_OBJECT_REFCOUNT_VALUE (src) > 1)
     THREAD_SWITCH ();
   ASSERT_OBJECT_REFCOUNT (src, "src", 1);
index 630637b..8c1306d 100644 (file)
@@ -97,24 +97,28 @@ GST_START_TEST (test_2_elements)
 
   s = "fakesrc can-activate-push=false ! fakesink can-activate-pull=true";
   run_pipeline (setup_pipeline (s), s,
-      GST_MESSAGE_NEW_CLOCK | GST_MESSAGE_STATE_CHANGED, GST_MESSAGE_UNKNOWN);
+      GST_MESSAGE_NEW_CLOCK | GST_MESSAGE_STATE_CHANGED |
+      GST_MESSAGE_ASYNC_DONE, GST_MESSAGE_UNKNOWN);
 
   s = "fakesrc can-activate-push=true ! fakesink can-activate-pull=false";
   run_pipeline (setup_pipeline (s), s,
-      GST_MESSAGE_NEW_CLOCK | GST_MESSAGE_STATE_CHANGED, GST_MESSAGE_UNKNOWN);
+      GST_MESSAGE_NEW_CLOCK | GST_MESSAGE_STATE_CHANGED |
+      GST_MESSAGE_ASYNC_DONE, GST_MESSAGE_UNKNOWN);
 
   s = "fakesrc can-activate-push=false num-buffers=10 ! fakesink can-activate-pull=true";
   run_pipeline (setup_pipeline (s), s,
-      GST_MESSAGE_NEW_CLOCK | GST_MESSAGE_STATE_CHANGED, GST_MESSAGE_EOS);
+      GST_MESSAGE_NEW_CLOCK | GST_MESSAGE_STATE_CHANGED |
+      GST_MESSAGE_ASYNC_DONE, GST_MESSAGE_EOS);
 
   s = "fakesrc can-activate-push=true num-buffers=10 ! fakesink can-activate-pull=false";
   run_pipeline (setup_pipeline (s), s,
-      GST_MESSAGE_NEW_CLOCK | GST_MESSAGE_STATE_CHANGED, GST_MESSAGE_EOS);
+      GST_MESSAGE_NEW_CLOCK | GST_MESSAGE_STATE_CHANGED |
+      GST_MESSAGE_ASYNC_DONE, GST_MESSAGE_EOS);
 
   s = "fakesrc can-activate-push=false ! fakesink can-activate-pull=false";
   ASSERT_CRITICAL (run_pipeline (setup_pipeline (s), s,
-          GST_MESSAGE_NEW_CLOCK | GST_MESSAGE_STATE_CHANGED,
-          GST_MESSAGE_UNKNOWN));
+          GST_MESSAGE_NEW_CLOCK | GST_MESSAGE_STATE_CHANGED |
+          GST_MESSAGE_ASYNC_DONE, GST_MESSAGE_UNKNOWN));
 }
 
 GST_END_TEST;