check/: Small state change torture test.
authorWim Taymans <wim.taymans@gmail.com>
Tue, 18 Oct 2005 17:06:29 +0000 (17:06 +0000)
committerWim Taymans <wim.taymans@gmail.com>
Tue, 18 Oct 2005 17:06:29 +0000 (17:06 +0000)
Original commit message from CVS:
* check/Makefile.am:
* check/pipelines/stress.c: (GST_START_TEST),
(simple_launch_lines_suite), (main):
Small state change torture test.

* docs/design/part-states.txt:
* gst/base/gstbasesink.c: (gst_base_sink_commit_state),
(gst_base_sink_handle_object), (gst_base_sink_event), (do_playing),
(gst_base_sink_change_state):
Never take state lock from streaming thread, clean up ugly
hacks. Unfortunatly core does not yet support nice ways to
async commit state.

* gst/gstbin.c: (gst_bin_remove_func), (gst_bin_recalc_state),
(bin_bus_handler):
Start state recalc if a STATE_DIRTY message is posted, but only
on the toplevel bin.

* gst/gstelement.c: (gst_element_sync_state_with_parent),
(gst_element_get_state_func), (gst_element_abort_state),
(gst_element_commit_state), (gst_element_lost_state),
(gst_element_set_state_func), (gst_element_change_state):
* gst/gstelement.h:
State variables are now protected with the LOCK, the state
lock is only used to serialize _set_state().

ChangeLog
check/Makefile.am
check/pipelines/stress.c [new file with mode: 0644]
docs/design/part-states.txt
gst/base/gstbasesink.c
gst/gstbin.c
gst/gstelement.c
gst/gstelement.h
libs/gst/base/gstbasesink.c
tests/check/Makefile.am
tests/check/pipelines/stress.c [new file with mode: 0644]

index 26323d8ce33410d0ade480046515faf261f410c4..e7e6c57f1ba7004ea4a6aa1f63f3a9be0edd6678 100644 (file)
--- a/ChangeLog
+++ b/ChangeLog
@@ -1,3 +1,31 @@
+2005-10-18  Wim Taymans  <wim@fluendo.com>
+
+       * check/Makefile.am:
+       * check/pipelines/stress.c: (GST_START_TEST),
+       (simple_launch_lines_suite), (main):
+       Small state change torture test.
+
+       * docs/design/part-states.txt:
+       * gst/base/gstbasesink.c: (gst_base_sink_commit_state),
+       (gst_base_sink_handle_object), (gst_base_sink_event), (do_playing),
+       (gst_base_sink_change_state):
+       Never take state lock from streaming thread, clean up ugly
+       hacks. Unfortunatly core does not yet support nice ways to
+       async commit state.
+       
+       * gst/gstbin.c: (gst_bin_remove_func), (gst_bin_recalc_state),
+       (bin_bus_handler):
+       Start state recalc if a STATE_DIRTY message is posted, but only
+       on the toplevel bin.
+
+       * gst/gstelement.c: (gst_element_sync_state_with_parent),
+       (gst_element_get_state_func), (gst_element_abort_state),
+       (gst_element_commit_state), (gst_element_lost_state),
+       (gst_element_set_state_func), (gst_element_change_state):
+       * gst/gstelement.h:
+       State variables are now protected with the LOCK, the state
+       lock is only used to serialize _set_state().
+
 2005-10-18  Wim Taymans  <wim@fluendo.com>
 
        * check/gst/gstbin.c: (GST_START_TEST):
index bee868c2ea4ac5fa27d4a19567190e9b53f780dc..0dff8e3dbfd747c1306ca49f552eb15a36373c96 100644 (file)
@@ -52,6 +52,7 @@ check_PROGRAMS =                              \
        elements/identity                       \
        generic/states                          \
        pipelines/simple_launch_lines           \
+       pipelines/stress                        \
        pipelines/cleanup                       \
        states/sinks                            \
        gst-libs/controller                     \
diff --git a/check/pipelines/stress.c b/check/pipelines/stress.c
new file mode 100644 (file)
index 0000000..eb0ade1
--- /dev/null
@@ -0,0 +1,88 @@
+/* GStreamer
+ * Copyright (C) 2005 Andy Wingo <wingo@pobox.com>
+ *
+ * simple_launch_lines.c: Unit test for simple pipelines
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Library General Public
+ * License as published by the Free Software Foundation; either
+ * version 2 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Library General Public License for more details.
+ *
+ * You should have received a copy of the GNU Library General Public
+ * License along with this library; if not, write to the
+ * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
+ * Boston, MA 02111-1307, USA.
+ */
+
+
+#include <gst/check/gstcheck.h>
+
+
+GST_START_TEST (test_stress)
+{
+  GstElement *fakesrc, *fakesink, *pipeline;
+  gint i;
+
+  fakesrc = gst_element_factory_make ("fakesrc", NULL);
+  fakesink = gst_element_factory_make ("fakesink", NULL);
+  pipeline = gst_element_factory_make ("pipeline", NULL);
+
+  g_return_if_fail (fakesrc && fakesink && pipeline);
+
+  gst_bin_add_many (GST_BIN (pipeline), fakesrc, fakesink, NULL);
+  gst_element_link (fakesrc, fakesink);
+
+  i = 10000;
+  while (i--) {
+    gst_element_set_state (pipeline, GST_STATE_PAUSED);
+    gst_element_set_state (pipeline, GST_STATE_PLAYING);
+
+    gst_element_set_state (pipeline, GST_STATE_PAUSED);
+    gst_element_set_state (pipeline, GST_STATE_PLAYING);
+    gst_element_set_state (pipeline, GST_STATE_PAUSED);
+    gst_element_set_state (pipeline, GST_STATE_READY);
+    gst_element_set_state (pipeline, GST_STATE_PLAYING);
+    gst_element_set_state (pipeline, GST_STATE_PAUSED);
+    gst_element_set_state (pipeline, GST_STATE_READY);
+    gst_element_set_state (pipeline, GST_STATE_PAUSED);
+    gst_element_set_state (pipeline, GST_STATE_NULL);
+  }
+
+  gst_object_unref (pipeline);
+}
+
+GST_END_TEST Suite *
+simple_launch_lines_suite (void)
+{
+  Suite *s = suite_create ("stress");
+  TCase *tc_chain = tcase_create ("linear");
+
+  /* time out after 20s, not the default 3 */
+  tcase_set_timeout (tc_chain, 0);
+
+  suite_add_tcase (s, tc_chain);
+  tcase_add_test (tc_chain, test_stress);
+  return s;
+}
+
+int
+main (int argc, char **argv)
+{
+  int nf;
+
+  Suite *s = simple_launch_lines_suite ();
+  SRunner *sr = srunner_create (s);
+
+  gst_check_init (&argc, &argv);
+
+  srunner_run_all (sr, CK_NORMAL);
+  nf = srunner_ntests_failed (sr);
+  srunner_free (sr);
+
+  return nf;
+}
index 5b49e2b4ce82cdad2e634f8c480134ef15f65fff..7833397f0afb7f2251aa8532adf8fe84d251dd8e 100644 (file)
@@ -363,10 +363,32 @@ Locking overview (element)
                                                       STREAM_LOCK
                                                            | ...
                                                       STREAM_UNLOCK
-                                     
-       
-       
-       
-               
+
+*********************************************
+*********************************************
+
+set_state cannot be called from multiple threads at the same time. The STATE_LOCK
+prevents this.
+
+state variables are protected with the LOCK.
+
+calling set_state while gst_state is called should unlock the get_state with
+an error. The cookie will do that.
+
+
+ set_state(element)
+
+  STATE_LOCK
+
+  LOCK
+  update current, next, pending state
+  cookie++
+  UNLOCK
+
+  change_state
+
+  STATE_UNLOCK
    
 
+
index 41a49a26e42c13947bbda41cfcc396c31ad54258..d370d466251e3df3656cc0bf0ea2ec7d391c6e2b 100644 (file)
@@ -98,6 +98,8 @@ gst_base_sink_get_type (void)
   return base_sink_type;
 }
 
+static GstStateChangeReturn do_playing (GstBaseSink * sink);
+
 static void gst_base_sink_set_clock (GstElement * element, GstClock * clock);
 
 static void gst_base_sink_set_property (GObject * object, guint prop_id,
@@ -441,6 +443,74 @@ gst_base_sink_preroll_queue_flush (GstBaseSink * basesink, GstPad * pad)
   GST_PREROLL_SIGNAL (pad);
 }
 
+static gboolean
+gst_base_sink_commit_state (GstBaseSink * basesink)
+{
+  /* commit state and proceed to next pending state */
+  {
+    GstState current, next, pending;
+    GstMessage *message;
+    gboolean post_paused = FALSE;
+    gboolean post_playing = FALSE;
+
+    GST_LOCK (basesink);
+    current = GST_STATE (basesink);
+    next = GST_STATE_NEXT (basesink);
+    pending = GST_STATE_PENDING (basesink);
+
+    switch (pending) {
+      case GST_STATE_PLAYING:
+        do_playing (basesink);
+        post_playing = TRUE;
+        break;
+      case GST_STATE_PAUSED:
+        basesink->need_preroll = TRUE;
+        post_paused = TRUE;
+        break;
+      case GST_STATE_READY:
+        goto stopping;
+      default:
+        break;
+    }
+
+    if (pending != GST_STATE_VOID_PENDING) {
+      GST_STATE (basesink) = pending;
+      GST_STATE_NEXT (basesink) = GST_STATE_VOID_PENDING;
+      GST_STATE_PENDING (basesink) = GST_STATE_VOID_PENDING;
+      GST_STATE_RETURN (basesink) = GST_STATE_CHANGE_SUCCESS;
+
+      pending = GST_STATE_VOID_PENDING;
+    }
+    GST_UNLOCK (basesink);
+
+    if (post_paused) {
+      message = gst_message_new_state_changed (GST_OBJECT_CAST (basesink),
+          current, next, GST_STATE_VOID_PENDING);
+      gst_element_post_message (GST_ELEMENT_CAST (basesink), message);
+    }
+    if (post_playing) {
+      message = gst_message_new_state_changed (GST_OBJECT_CAST (basesink),
+          next, pending, GST_STATE_VOID_PENDING);
+      gst_element_post_message (GST_ELEMENT_CAST (basesink), message);
+    }
+    /* and mark dirty */
+    if (post_paused || post_playing) {
+      gst_element_post_message (GST_ELEMENT_CAST (basesink),
+          gst_message_new_state_dirty (GST_OBJECT_CAST (basesink)));
+    }
+
+    GST_STATE_BROADCAST (basesink);
+  }
+  return TRUE;
+
+stopping:
+  {
+    /* app is going to READY */
+    GST_UNLOCK (basesink);
+    return FALSE;
+  }
+}
+
 /* with STREAM_LOCK */
 static GstFlowReturn
 gst_base_sink_handle_object (GstBaseSink * basesink, GstPad * pad,
@@ -601,14 +671,12 @@ gst_base_sink_handle_object (GstBaseSink * basesink, GstPad * pad,
     basesink->preroll_queued++;
     basesink->buffers_queued++;
   }
+
   GST_DEBUG_OBJECT (basesink,
       "now %d preroll, %d buffers, %d events on queue",
       basesink->preroll_queued,
       basesink->buffers_queued, basesink->events_queued);
 
-  if (basesink->playing_async)
-    goto playing_async;
-
   /* check if we are prerolling */
   if (!basesink->need_preroll)
     goto no_preroll;
@@ -633,60 +701,23 @@ gst_base_sink_handle_object (GstBaseSink * basesink, GstPad * pad,
   GST_DEBUG_OBJECT (basesink, "prerolled length %d", length);
 
   if (length == 1) {
-    gint t;
-    GstTask *task;
 
     basesink->have_preroll = TRUE;
-    /* we are prerolling */
-    GST_PREROLL_UNLOCK (pad);
-
-    /* have to release STREAM_LOCK as we cannot take the STATE_LOCK
-     * inside the STREAM_LOCK */
-    t = GST_STREAM_UNLOCK_FULL (pad);
-    GST_DEBUG_OBJECT (basesink, "released stream lock %d times", t);
-    if (t <= 0) {
-      GST_WARNING ("STREAM_LOCK should have been locked !!");
-      g_warning ("STREAM_LOCK should have been locked !!");
-    }
-
-    /* now we commit our state, this will also automatically proceed to
-     * the next pending state. */
-    /* FIXME */
-    if ((task = GST_PAD_TASK (pad))) {
-      while (!GST_STATE_TRYLOCK (basesink)) {
-        GST_DEBUG_OBJECT (basesink,
-            "state change happening, checking shutdown");
-        GST_LOCK (pad);
-        if (G_UNLIKELY (GST_PAD_IS_FLUSHING (pad)))
-          goto task_stopped;
-        GST_UNLOCK (pad);
-      }
-    } else {
-      GST_STATE_LOCK (basesink);
-    }
-    GST_DEBUG_OBJECT (basesink, "commit state");
-    gst_element_commit_state (GST_ELEMENT (basesink));
-    GST_STATE_UNLOCK (basesink);
-
-    /* reacquire stream lock, pad could be flushing now */
-    /* FIXME in glib, if t==0, the lock is still taken... hmmm.. bug #317802 */
-    if (t > 0)
-      GST_STREAM_LOCK_FULL (pad, t);
 
-    /* and wait if needed */
-    GST_PREROLL_LOCK (pad);
+    /* commit state */
+    if (!gst_base_sink_commit_state (basesink))
+      goto stopping;
 
     GST_LOCK (pad);
     if (G_UNLIKELY (GST_PAD_IS_FLUSHING (pad)))
       goto flushing;
     GST_UNLOCK (pad);
 
-    /* it is possible that the application set the state to PLAYING
+    /* it is possible that commiting the state made us go to PLAYING
      * now in which case we don't need to block anymore. */
     if (!basesink->need_preroll)
       goto no_preroll;
 
-
     length = basesink->preroll_queued;
 
     /* FIXME: a pad probe could have made us lose the buffer, according
@@ -706,12 +737,11 @@ gst_base_sink_handle_object (GstBaseSink * basesink, GstPad * pad,
     GST_DEBUG_OBJECT (basesink, "waiting to finish preroll");
     GST_PREROLL_WAIT (pad);
     GST_DEBUG_OBJECT (basesink, "done preroll");
+    GST_LOCK (pad);
+    if (G_UNLIKELY (GST_PAD_IS_FLUSHING (pad)))
+      goto flushing;
+    GST_UNLOCK (pad);
   }
-  GST_LOCK (pad);
-  if (G_UNLIKELY (GST_PAD_IS_FLUSHING (pad)))
-    goto flushing;
-  GST_UNLOCK (pad);
-
   GST_PREROLL_UNLOCK (pad);
 
   return GST_FLOW_OK;
@@ -740,74 +770,29 @@ dropping:
 
     return GST_FLOW_OK;
   }
-playing_async:
+flushing:
   {
-    GstFlowReturn ret;
-    gint t;
-
-    basesink->have_preroll = FALSE;
-    basesink->playing_async = FALSE;
-
-    /* handle buffer first */
-    ret = gst_base_sink_preroll_queue_empty (basesink, pad);
-
-    /* unroll locks, commit state, reacquire stream lock */
+    GST_UNLOCK (pad);
+    gst_base_sink_preroll_queue_flush (basesink, pad);
     GST_PREROLL_UNLOCK (pad);
-    t = GST_STREAM_UNLOCK_FULL (pad);
-    GST_DEBUG_OBJECT (basesink, "released stream lock %d times", t);
-    if (t <= 0) {
-      GST_WARNING ("STREAM_LOCK should have been locked !!");
-      g_warning ("STREAM_LOCK should have been locked !!");
-    }
-    GST_STATE_LOCK (basesink);
-    GST_DEBUG_OBJECT (basesink, "commit state");
-    gst_element_commit_state (GST_ELEMENT (basesink));
-    GST_STATE_UNLOCK (basesink);
-    if (t > 0)
-      GST_STREAM_LOCK_FULL (pad, t);
+    GST_DEBUG_OBJECT (basesink, "pad is flushing");
 
-    return ret;
-  }
-task_stopped:
-  {
-    GST_UNLOCK (pad);
-    GST_DEBUG_OBJECT (basesink, "task is stopped");
     return GST_FLOW_WRONG_STATE;
   }
-flushing:
+stopping:
   {
-    GST_UNLOCK (pad);
-    gst_base_sink_preroll_queue_flush (basesink, pad);
     GST_PREROLL_UNLOCK (pad);
-    GST_DEBUG_OBJECT (basesink, "pad is flushing");
+    GST_DEBUG_OBJECT (basesink, "stopping");
+
     return GST_FLOW_WRONG_STATE;
   }
 preroll_failed:
   {
-    gint t;
-
     GST_DEBUG_OBJECT (basesink, "preroll failed");
     gst_base_sink_preroll_queue_flush (basesink, pad);
-    GST_PREROLL_UNLOCK (pad);
-
-    /* have to release STREAM_LOCK as we cannot take the STATE_LOCK
-     * inside the STREAM_LOCK */
-    t = GST_STREAM_UNLOCK_FULL (pad);
-    GST_DEBUG_OBJECT (basesink, "released stream lock %d times", t);
-    if (t <= 0) {
-      GST_WARNING ("STREAM_LOCK should have been locked !!");
-      g_warning ("STREAM_LOCK should have been locked !!");
-    }
 
-    /* now we abort our state */
-    GST_STATE_LOCK (basesink);
     GST_DEBUG_OBJECT (basesink, "abort state");
     gst_element_abort_state (GST_ELEMENT (basesink));
-    GST_STATE_UNLOCK (basesink);
-
-    /* reacquire stream lock, pad could be flushing now */
-    if (t > 0)
-      GST_STREAM_LOCK_FULL (pad, t);
 
     return GST_FLOW_ERROR;
   }
@@ -871,11 +856,9 @@ gst_base_sink_event (GstPad * pad, GstEvent * event)
 
       /* and we need to commit our state again on the next
        * prerolled buffer */
-      GST_STATE_LOCK (basesink);
       GST_STREAM_LOCK (pad);
       gst_element_lost_state (GST_ELEMENT (basesink));
       GST_STREAM_UNLOCK (pad);
-      GST_STATE_UNLOCK (basesink);
       GST_DEBUG_OBJECT (basesink, "event unref %p %p", basesink, event);
       gst_event_unref (event);
       break;
@@ -1450,6 +1433,50 @@ gst_base_sink_query (GstElement * element, GstQuery * query)
   return res;
 }
 
+/* with PREROLL_LOCK */
+static GstStateChangeReturn
+do_playing (GstBaseSink * basesink)
+{
+  GstStateChangeReturn ret = GST_STATE_CHANGE_SUCCESS;
+
+  /* no preroll needed */
+  basesink->need_preroll = FALSE;
+
+  /* if we have EOS, we should empty the queue now as there will
+   * be no more data received in the chain function.
+   * FIXME, this could block the state change function too long when
+   * we are pushing and syncing the buffers, better start a new
+   * thread to do this. */
+  if (basesink->eos) {
+    gboolean do_eos = !basesink->eos_queued;
+
+    gst_base_sink_preroll_queue_empty (basesink, basesink->sinkpad);
+
+    /* need to post EOS message here if it was not in the preroll queue we
+     * just emptied. */
+    if (do_eos) {
+      GST_DEBUG_OBJECT (basesink, "Now posting EOS");
+      gst_element_post_message (GST_ELEMENT (basesink),
+          gst_message_new_eos (GST_OBJECT (basesink)));
+    }
+  } else if (!basesink->have_preroll) {
+    /* don't need preroll, but do queue a commit_state */
+    basesink->need_preroll = TRUE;
+    GST_DEBUG_OBJECT (basesink,
+        "PAUSED to PLAYING, !eos, !have_preroll, need preroll to FALSE");
+    ret = GST_STATE_CHANGE_ASYNC;
+    /* we know it's not waiting, no need to signal */
+  } else {
+    /* don't need the preroll anymore */
+    GST_DEBUG_OBJECT (basesink,
+        "PAUSED to PLAYING, !eos, have_preroll, need preroll to FALSE");
+    /* now let it play */
+    GST_PREROLL_SIGNAL (basesink->sinkpad);
+  }
+
+  return ret;
+}
+
 static GstStateChangeReturn
 gst_base_sink_change_state (GstElement * element, GstStateChange transition)
 {
@@ -1484,40 +1511,7 @@ gst_base_sink_change_state (GstElement * element, GstStateChange transition)
       break;
     case GST_STATE_CHANGE_PAUSED_TO_PLAYING:
       GST_PREROLL_LOCK (basesink->sinkpad);
-      /* if we have EOS, we should empty the queue now as there will
-       * be no more data received in the chain function.
-       * FIXME, this could block the state change function too long when
-       * we are pushing and syncing the buffers, better start a new
-       * thread to do this. */
-      if (basesink->eos) {
-        gboolean do_eos = !basesink->eos_queued;
-
-        gst_base_sink_preroll_queue_empty (basesink, basesink->sinkpad);
-        basesink->need_preroll = FALSE;
-
-        /* need to post EOS message here if it was not in the preroll queue we
-         * just emptied. */
-        if (do_eos) {
-          GST_DEBUG_OBJECT (basesink, "Now posting EOS");
-          gst_element_post_message (GST_ELEMENT (basesink),
-              gst_message_new_eos (GST_OBJECT (basesink)));
-        }
-      } else if (!basesink->have_preroll) {
-        /* don't need preroll, but do queue a commit_state */
-        GST_DEBUG_OBJECT (basesink,
-            "PAUSED to PLAYING, !eos, !have_preroll, need preroll to FALSE");
-        basesink->need_preroll = FALSE;
-        basesink->playing_async = TRUE;
-        ret = GST_STATE_CHANGE_ASYNC;
-        /* we know it's not waiting, no need to signal */
-      } else {
-        /* don't need the preroll anymore */
-        basesink->need_preroll = FALSE;
-        GST_DEBUG_OBJECT (basesink,
-            "PAUSED to PLAYING, !eos, have_preroll, need preroll to FALSE");
-        /* now let it play */
-        GST_PREROLL_SIGNAL (basesink->sinkpad);
-      }
+      ret = do_playing (basesink);
       GST_PREROLL_UNLOCK (basesink->sinkpad);
       break;
     default:
@@ -1547,8 +1541,6 @@ gst_base_sink_change_state (GstElement * element, GstStateChange transition)
       }
       GST_UNLOCK (basesink);
 
-      basesink->playing_async = FALSE;
-
       /* unlock any subclasses */
       if (bclass->unlock)
         bclass->unlock (basesink);
index d85b210f8f5105c5f6adee4d3f2e95ec08e98b87..259018dfa83ac34075fdf36345cd35bd30b42e64 100644 (file)
@@ -677,9 +677,9 @@ gst_bin_remove_func (GstBin * bin, GstElement * element)
    * we are waiting for an ASYNC state change on this element. The
    * element cannot be added to another bin yet as it is not yet
    * unparented. */
-  GST_STATE_LOCK (element);
+  GST_LOCK (element);
   GST_STATE_BROADCAST (element);
-  GST_STATE_UNLOCK (element);
+  GST_UNLOCK (element);
 
   /* we ref here because after the _unparent() the element can be disposed
    * and we still need it to reset the UNPARENTING flag and fire a signal. */
@@ -944,6 +944,7 @@ gst_bin_recalc_state (GstBin * bin, gboolean force)
   GstStateChangeReturn ret;
   GList *children;
   guint32 children_cookie;
+  guint32 state_cookie;
   gboolean have_no_preroll;
   gboolean have_async;
 
@@ -964,6 +965,7 @@ gst_bin_recalc_state (GstBin * bin, gboolean force)
     goto was_polling;
 
   bin->polling = TRUE;
+  state_cookie = GST_ELEMENT_CAST (bin)->state_cookie;
 
   GST_CAT_INFO_OBJECT (GST_CAT_STATES, bin, "recalc state");
 
@@ -1002,6 +1004,10 @@ restart:
       GST_DEBUG_OBJECT (bin, "children added or removed, restarting recalc");
       goto restart;
     }
+    if (state_cookie != GST_ELEMENT_CAST (bin)->state_cookie) {
+      GST_DEBUG_OBJECT (bin, "concurrent state change");
+      goto concurrent_state;
+    }
     if (bin->state_dirty) {
       GST_DEBUG_OBJECT (bin, "state dirty again, restarting recalc");
       goto restart;
@@ -1045,7 +1051,6 @@ done:
    * are added now and we still report the old state. No problem though as
    * the return is still consistent, the effect is as if the element was
    * added after this function completed. */
-  GST_STATE_LOCK (bin);
   switch (ret) {
     case GST_STATE_CHANGE_SUCCESS:
     case GST_STATE_CHANGE_NO_PREROLL:
@@ -1060,9 +1065,10 @@ done:
     default:
       goto unknown_state;
   }
+
   GST_CAT_INFO_OBJECT (GST_CAT_STATES, bin, "return now %d",
       GST_STATE_RETURN (bin));
-  GST_STATE_UNLOCK (bin);
+
   return;
 
 not_dirty:
@@ -1077,6 +1083,13 @@ was_polling:
     GST_UNLOCK (bin);
     return;
   }
+concurrent_state:
+  {
+    GST_CAT_INFO_OBJECT (GST_CAT_STATES, bin, "concurrent_state");
+    bin->polling = FALSE;
+    GST_UNLOCK (bin);
+    return;
+  }
 unknown_state:
   {
     /* somebody added a GST_STATE_ and forgot to do stuff here ! */
@@ -1622,60 +1635,41 @@ bin_bus_handler (GstBus * bus, GstMessage * message, GstBin * bin)
       gst_message_unref (message);
       break;
     }
-    case GST_MESSAGE_STATE_CHANGED:
+    case GST_MESSAGE_STATE_DIRTY:
     {
-      GstState old, new, pending;
       GstObject *src;
+      GstBinClass *klass;
 
-      gst_message_parse_state_changed (message, &old, &new, &pending);
       src = GST_MESSAGE_SRC (message);
-      /* ref src, as we need it after we post the message up */
-      gst_object_ref (src);
-
-      GST_DEBUG_OBJECT (bin, "%s gave state change, %s -> %s, pending %s",
-          GST_ELEMENT_NAME (src),
-          gst_element_state_get_name (old),
-          gst_element_state_get_name (new),
-          gst_element_state_get_name (pending));
 
-      /* post message up */
-      gst_element_post_message (GST_ELEMENT_CAST (bin), message);
+      GST_DEBUG_OBJECT (bin, "%s gave state dirty", GST_ELEMENT_NAME (src));
 
-      /* we only act on our own children */
+      /* mark the bin dirty */
       GST_LOCK (bin);
-      if (!g_list_find (bin->children, src))
-        goto not_our_child;
-      GST_UNLOCK (bin);
-
-      gst_object_unref (src);
+      GST_DEBUG_OBJECT (bin, "marking dirty");
+      bin->state_dirty = TRUE;
 
-      /* we can lock, either the state change is sync and we can
-       * recursively lock or the state change is async and we
-       * lock when the bin has done its state change. We can check which
-       * case it is by looking at the CHANGING_STATE flag. */
-      GST_STATE_LOCK (bin);
-      GST_DEBUG_OBJECT (bin, "locked");
+      if (GST_OBJECT_PARENT (bin))
+        goto not_toplevel;
 
-      GST_LOCK (bin);
-      if (!GST_OBJECT_FLAG_IS_SET (bin, GST_ELEMENT_CHANGING_STATE)) {
-        GST_UNLOCK (bin);
-        GST_DEBUG_OBJECT (bin, "got ASYNC message, forcing recalc state");
-        GST_STATE_UNLOCK (bin);
+      /* free message */
+      gst_message_unref (message);
 
-        /* force bin state recalculation on async messages. */
-        gst_bin_recalc_state (bin, TRUE);
-      } else {
-        GST_UNLOCK (bin);
-        GST_STATE_UNLOCK (bin);
-        GST_DEBUG_OBJECT (bin, "got SYNC message");
-      }
+      klass = GST_BIN_GET_CLASS (bin);
+      gst_object_ref (bin);
+      GST_DEBUG_OBJECT (bin, "pushing recalc on thread pool");
+      g_thread_pool_push (klass->pool, bin, NULL);
+      GST_UNLOCK (bin);
       break;
 
-    not_our_child:
+    not_toplevel:
       {
         GST_UNLOCK (bin);
-        GST_DEBUG_OBJECT (bin, "not our child");
-        gst_object_unref (src);
+        GST_DEBUG_OBJECT (bin, "not toplevel");
+
+        /* post message up, mark parent bins dirty */
+        gst_element_post_message (GST_ELEMENT_CAST (bin), message);
+
         break;
       }
     }
index 34a610c3592e41bb4c3caf449e3bc05c2cedf659..b21d9cedc8076d3e2892944193088e6086c16442 100644 (file)
@@ -1577,10 +1577,10 @@ gst_element_sync_state_with_parent (GstElement * element)
     GstState parent_current, parent_pending;
     GstStateChangeReturn ret;
 
-    GST_STATE_LOCK (parent);
+    GST_LOCK (parent);
     parent_current = GST_STATE (parent);
     parent_pending = GST_STATE_PENDING (parent);
-    GST_STATE_UNLOCK (parent);
+    GST_UNLOCK (parent);
 
     GST_CAT_DEBUG (GST_CAT_STATES,
         "syncing state of element %s (%s) to %s (%s, %s)",
@@ -1616,7 +1616,7 @@ gst_element_get_state_func (GstElement * element,
 
   GST_CAT_DEBUG_OBJECT (GST_CAT_STATES, element, "getting state");
 
-  GST_STATE_LOCK (element);
+  GST_LOCK (element);
   ret = GST_STATE_RETURN (element);
 
   /* we got an error, report immediatly */
@@ -1634,6 +1634,7 @@ gst_element_get_state_func (GstElement * element,
   old_pending = GST_STATE_PENDING (element);
   if (old_pending != GST_STATE_VOID_PENDING) {
     GTimeVal *timeval, abstimeout;
+    guint32 cookie;
 
     if (timeout != GST_CLOCK_TIME_NONE) {
       glong add = timeout / 1000;
@@ -1648,14 +1649,21 @@ gst_element_get_state_func (GstElement * element,
     } else {
       timeval = NULL;
     }
+    /* get cookie to dected state change during waiting */
+    cookie = element->state_cookie;
+
     GST_CAT_INFO_OBJECT (GST_CAT_STATES, element,
         "waiting for element to commit state");
+
     /* we have a pending state change, wait for it to complete */
     if (!GST_STATE_TIMED_WAIT (element, timeval)) {
       GST_CAT_INFO_OBJECT (GST_CAT_STATES, element, "timed out");
       /* timeout triggered */
       ret = GST_STATE_CHANGE_ASYNC;
     } else {
+      if (cookie != element->state_cookie)
+        goto interrupted;
+
       /* could be success or failure */
       if (old_pending == GST_STATE (element)) {
         GST_CAT_DEBUG_OBJECT (GST_CAT_STATES, element, "got success");
@@ -1682,10 +1690,23 @@ done:
       "state current: %s, pending: %s, result: %d",
       gst_element_state_get_name (GST_STATE (element)),
       gst_element_state_get_name (GST_STATE_PENDING (element)), ret);
-
-  GST_STATE_UNLOCK (element);
+  GST_UNLOCK (element);
 
   return ret;
+
+interrupted:
+  {
+    if (state)
+      *state = GST_STATE_VOID_PENDING;
+    if (pending)
+      *pending = GST_STATE_VOID_PENDING;
+
+    GST_CAT_INFO_OBJECT (GST_CAT_STATES, element, "get_state() interruped");
+
+    GST_UNLOCK (element);
+
+    return GST_STATE_CHANGE_FAILURE;
+  }
 }
 
 /**
@@ -1753,8 +1774,10 @@ gst_element_abort_state (GstElement * element)
 #ifndef GST_DISABLE_GST_DEBUG
   GstState old_state;
 #endif
+
   g_return_if_fail (GST_IS_ELEMENT (element));
 
+  GST_LOCK (element);
   pending = GST_STATE_PENDING (element);
 
   if (pending == GST_STATE_VOID_PENDING ||
@@ -1773,11 +1796,13 @@ gst_element_abort_state (GstElement * element)
   GST_STATE_RETURN (element) = GST_STATE_CHANGE_FAILURE;
 
   GST_STATE_BROADCAST (element);
+  GST_UNLOCK (element);
 
   return;
 
 nothing_aborted:
   {
+    GST_UNLOCK (element);
     return;
   }
 }
@@ -1808,77 +1833,81 @@ gst_element_commit_state (GstElement * element)
 {
   GstState pending;
   GstStateChangeReturn ret;
+  GstState old_state, old_next;
+  GstState current, next;
+  GstMessage *message;
+  GstStateChange transition;
 
   g_return_val_if_fail (GST_IS_ELEMENT (element), GST_STATE_CHANGE_FAILURE);
 
-  ret = GST_STATE_CHANGE_SUCCESS;
-
+  GST_LOCK (element);
   pending = GST_STATE_PENDING (element);
 
   /* check if there is something to commit */
-  if (pending != GST_STATE_VOID_PENDING) {
-    GstState old_state;
-    GstState current, next;
-    GstMessage *message;
+  if (pending == GST_STATE_VOID_PENDING)
+    goto nothing_pending;
 
-    GST_LOCK (element);
-    GST_OBJECT_FLAG_SET (element, GST_ELEMENT_CHANGING_STATE);
-    GST_UNLOCK (element);
+  old_state = GST_STATE (element);
+  /* this is the state we should go to next */
+  old_next = GST_STATE_NEXT (element);
+  /* update current state */
+  current = GST_STATE (element) = old_next;
 
-    old_state = GST_STATE (element);
-    /* this is the state we should go to next */
-    next = GST_STATE_NEXT (element);
-    /* update current state */
-    current = GST_STATE (element) = next;
-
-    /* see if we reached the final state */
-    if (pending == next) {
-      pending = GST_STATE_VOID_PENDING;
-      GST_STATE_PENDING (element) = pending;
-      GST_STATE_NEXT (element) = pending;
-      ret = GST_STATE_CHANGE_SUCCESS;
-    } else {
-      /* not there yet, will get there ASYNC */
-      ret = GST_STATE_CHANGE_ASYNC;
-    }
+  /* see if we reached the final state */
+  if (pending == current)
+    goto complete;
 
-    GST_STATE_RETURN (element) = ret;
+  /* not there yet, will get there ASYNC */
+  ret = GST_STATE_CHANGE_ASYNC;
 
-    GST_CAT_DEBUG_OBJECT (GST_CAT_STATES, element,
-        "committing state from %s to %s, pending %s",
-        gst_element_state_get_name (old_state),
-        gst_element_state_get_name (next),
-        gst_element_state_get_name (pending));
+  next = GST_STATE_GET_NEXT (current, pending);
+  transition = GST_STATE_TRANSITION (current, next);
 
-    message = gst_message_new_state_changed (GST_OBJECT (element),
-        old_state, next, pending);
-    gst_element_post_message (element, message);
+  GST_STATE_NEXT (element) = next;
+  GST_UNLOCK (element);
 
-    if (pending != GST_STATE_VOID_PENDING) {
-      GstStateChange transition;
+  GST_CAT_INFO_OBJECT (GST_CAT_STATES, element,
+      "committing state from %s to %s, pending %s",
+      gst_element_state_get_name (old_state),
+      gst_element_state_get_name (old_next),
+      gst_element_state_get_name (pending));
 
-      /* calc new next state */
-      next = GST_STATE_GET_NEXT (current, pending);
+  message = gst_message_new_state_changed (GST_OBJECT (element),
+      old_state, old_next, pending);
+  gst_element_post_message (element, message);
 
-      GST_CAT_DEBUG_OBJECT (GST_CAT_STATES, element,
-          "continue state change %s to %s, final %s",
-          gst_element_state_get_name (current),
-          gst_element_state_get_name (next),
-          gst_element_state_get_name (pending));
+  GST_CAT_INFO_OBJECT (GST_CAT_STATES, element,
+      "continue state change %s to %s, final %s",
+      gst_element_state_get_name (current),
+      gst_element_state_get_name (next), gst_element_state_get_name (pending));
 
-      /* create transition */
-      transition = GST_STATE_TRANSITION (current, next);
+  ret = gst_element_change_state (element, transition);
 
-      /* perform next transition */
-      ret = gst_element_change_state (element, transition);
-    } else {
-      GST_STATE_BROADCAST (element);
-    }
-    GST_LOCK (element);
-    GST_OBJECT_FLAG_UNSET (element, GST_ELEMENT_CHANGING_STATE);
+  return ret;
+
+nothing_pending:
+  {
+    GST_DEBUG_OBJECT (element, "nothing pending");
     GST_UNLOCK (element);
+    return GST_STATE_CHANGE_SUCCESS;
+  }
+complete:
+  {
+    GST_STATE_PENDING (element) = GST_STATE_VOID_PENDING;
+    GST_STATE_NEXT (element) = GST_STATE_VOID_PENDING;
+    ret = GST_STATE_RETURN (element) = GST_STATE_CHANGE_SUCCESS;
+
+    GST_DEBUG_OBJECT (element, "completed state change");
+    GST_UNLOCK (element);
+
+    message = gst_message_new_state_changed (GST_OBJECT (element),
+        old_state, old_next, GST_STATE_VOID_PENDING);
+    gst_element_post_message (element, message);
+
+    GST_STATE_BROADCAST (element);
+
+    return ret;
   }
-  return ret;
 }
 
 /**
@@ -1907,6 +1936,7 @@ gst_element_lost_state (GstElement * element)
 
   g_return_if_fail (GST_IS_ELEMENT (element));
 
+  GST_LOCK (element);
   if (GST_STATE_PENDING (element) != GST_STATE_VOID_PENDING ||
       GST_STATE_RETURN (element) == GST_STATE_CHANGE_FAILURE)
     goto nothing_lost;
@@ -1919,15 +1949,21 @@ gst_element_lost_state (GstElement * element)
   GST_STATE_NEXT (element) = current_state;
   GST_STATE_PENDING (element) = current_state;
   GST_STATE_RETURN (element) = GST_STATE_CHANGE_ASYNC;
+  GST_UNLOCK (element);
 
   message = gst_message_new_state_changed (GST_OBJECT (element),
       current_state, current_state, current_state);
   gst_element_post_message (element, message);
 
+  /* and mark us dirty */
+  message = gst_message_new_state_dirty (GST_OBJECT (element));
+  gst_element_post_message (element, message);
+
   return;
 
 nothing_lost:
   {
+    GST_UNLOCK (element);
     return;
   }
 }
@@ -1983,12 +2019,12 @@ gst_element_set_state_func (GstElement * element, GstState state)
   GST_CAT_DEBUG_OBJECT (GST_CAT_STATES, element, "set_state to %s",
       gst_element_state_get_name (state));
 
+  /* state lock is taken to protect the set_state() and get_state() 
+   * procedures, it does not lock any variables. */
   GST_STATE_LOCK (element);
 
+  /* now calculate how to get to the new state */
   GST_LOCK (element);
-  GST_OBJECT_FLAG_SET (element, GST_ELEMENT_CHANGING_STATE);
-  GST_UNLOCK (element);
-
   old_ret = GST_STATE_RETURN (element);
   /* previous state change returned an error, remove all pending
    * and next states */
@@ -2001,6 +2037,7 @@ gst_element_set_state_func (GstElement * element, GstState state)
   current = GST_STATE (element);
   next = GST_STATE_NEXT (element);
   old_pending = GST_STATE_PENDING (element);
+  element->state_cookie++;
 
   /* this is the (new) state we should go to */
   GST_STATE_PENDING (element) = state;
@@ -2029,19 +2066,22 @@ gst_element_set_state_func (GstElement * element, GstState state)
     }
   }
   next = GST_STATE_GET_NEXT (current, state);
+  /* now we store the next state */
+  GST_STATE_NEXT (element) = next;
+  transition = GST_STATE_TRANSITION (current, next);
 
   GST_CAT_DEBUG_OBJECT (GST_CAT_STATES, element,
       "%s: setting state from %s to %s",
       (next != state ? "intermediate" : "final"),
       gst_element_state_get_name (current), gst_element_state_get_name (next));
 
-  transition = GST_STATE_TRANSITION (current, next);
+  /* now signal any waiters, they will error since the cookie was increased */
+  GST_STATE_BROADCAST (element);
+
+  GST_UNLOCK (element);
 
   ret = gst_element_change_state (element, transition);
 
-  GST_LOCK (element);
-  GST_OBJECT_FLAG_UNSET (element, GST_ELEMENT_CHANGING_STATE);
-  GST_UNLOCK (element);
   GST_STATE_UNLOCK (element);
 
   GST_DEBUG_OBJECT (element, "returned %d", ret);
@@ -2051,12 +2091,10 @@ gst_element_set_state_func (GstElement * element, GstState state)
 was_busy:
   {
     GST_STATE_RETURN (element) = GST_STATE_CHANGE_ASYNC;
-    GST_LOCK (element);
-    GST_OBJECT_FLAG_UNSET (element, GST_ELEMENT_CHANGING_STATE);
+    GST_DEBUG_OBJECT (element, "element was busy with async state change");
     GST_UNLOCK (element);
-    GST_STATE_UNLOCK (element);
 
-    GST_DEBUG_OBJECT (element, "element was busy with async state change");
+    GST_STATE_UNLOCK (element);
 
     return GST_STATE_CHANGE_ASYNC;
   }
@@ -2077,9 +2115,6 @@ gst_element_change_state (GstElement * element, GstStateChange transition)
   current = GST_STATE_TRANSITION_CURRENT (transition);
   next = GST_STATE_TRANSITION_NEXT (transition);
 
-  /* now we store the next state */
-  GST_STATE_NEXT (element) = next;
-
   /* call the state change function so it can set the state */
   if (oclass->change_state)
     ret = (oclass->change_state) (element, transition);
@@ -2108,7 +2143,9 @@ gst_element_change_state (GstElement * element, GstStateChange transition)
           gst_element_state_get_name (current),
           gst_element_state_get_name (next));
 
+      GST_LOCK (element);
       GST_STATE_RETURN (element) = GST_STATE_CHANGE_SUCCESS;
+      GST_UNLOCK (element);
 
       ret = gst_element_commit_state (element);
       break;
@@ -2133,7 +2170,9 @@ gst_element_change_state (GstElement * element, GstStateChange transition)
   }
 
 exit:
+  GST_LOCK (element);
   GST_STATE_RETURN (element) = ret;
+  GST_UNLOCK (element);
 
   GST_CAT_LOG_OBJECT (GST_CAT_STATES, element, "exit state change %d", ret);
 
@@ -2142,7 +2181,10 @@ exit:
   /* ERROR */
 invalid_return:
   {
+    GST_LOCK (element);
     GST_STATE_RETURN (element) = ret;
+    GST_UNLOCK (element);
+
     /* somebody added a GST_STATE_ and forgot to do stuff here ! */
     g_critical ("unknown return value %d from a state change function", ret);
     return ret;
index 416b22dee16f5b9a978e9c7081573906410611e9..c14cd9fd057a866008ad0c4ce44e2de40d032891 100644 (file)
@@ -155,7 +155,6 @@ typedef enum
   GST_ELEMENT_LOCKED_STATE      = (GST_OBJECT_FLAG_LAST << 0),
   GST_ELEMENT_IS_SINK           = (GST_OBJECT_FLAG_LAST << 1),
   GST_ELEMENT_UNPARENTING       = (GST_OBJECT_FLAG_LAST << 2),
-  GST_ELEMENT_CHANGING_STATE    = (GST_OBJECT_FLAG_LAST << 3),
   /* padding */
   GST_ELEMENT_FLAG_LAST         = (GST_OBJECT_FLAG_LAST << 16)
 } GstElementFlags;
@@ -281,10 +280,10 @@ G_STMT_START {                                                            \
 #define GST_STATE_UNLOCK_FULL(elem)            g_static_rec_mutex_unlock_full(GST_STATE_GET_LOCK(elem))
 #define GST_STATE_LOCK_FULL(elem,t)            g_static_rec_mutex_lock_full(GST_STATE_GET_LOCK(elem), t)
 #define GST_STATE_GET_COND(elem)               (GST_ELEMENT_CAST(elem)->state_cond)
-#define GST_STATE_WAIT(elem)                   g_static_rec_cond_wait (GST_STATE_GET_COND (elem), \
-                                                       GST_STATE_GET_LOCK (elem))
-#define GST_STATE_TIMED_WAIT(elem, timeval)    g_static_rec_cond_timed_wait (GST_STATE_GET_COND (elem), \
-                                                       GST_STATE_GET_LOCK (elem), timeval)
+#define GST_STATE_WAIT(elem)                   g_cond_wait (GST_STATE_GET_COND (elem), \
+                                                       GST_GET_LOCK (elem))
+#define GST_STATE_TIMED_WAIT(elem, timeval)    g_cond_timed_wait (GST_STATE_GET_COND (elem), \
+                                                       GST_GET_LOCK (elem), timeval)
 #define GST_STATE_SIGNAL(elem)                 g_cond_signal (GST_STATE_GET_COND (elem));
 #define GST_STATE_BROADCAST(elem)              g_cond_broadcast (GST_STATE_GET_COND (elem));
 
@@ -292,7 +291,7 @@ struct _GstElement
 {
   GstObject            object;
 
-  /*< public >*/ /* with STATE_LOCK */
+  /*< public >*/ /* with LOCK */
   /* element state */
   GStaticRecMutex      *state_lock;
   GCond                *state_cond;
index 41a49a26e42c13947bbda41cfcc396c31ad54258..d370d466251e3df3656cc0bf0ea2ec7d391c6e2b 100644 (file)
@@ -98,6 +98,8 @@ gst_base_sink_get_type (void)
   return base_sink_type;
 }
 
+static GstStateChangeReturn do_playing (GstBaseSink * sink);
+
 static void gst_base_sink_set_clock (GstElement * element, GstClock * clock);
 
 static void gst_base_sink_set_property (GObject * object, guint prop_id,
@@ -441,6 +443,74 @@ gst_base_sink_preroll_queue_flush (GstBaseSink * basesink, GstPad * pad)
   GST_PREROLL_SIGNAL (pad);
 }
 
+static gboolean
+gst_base_sink_commit_state (GstBaseSink * basesink)
+{
+  /* commit state and proceed to next pending state */
+  {
+    GstState current, next, pending;
+    GstMessage *message;
+    gboolean post_paused = FALSE;
+    gboolean post_playing = FALSE;
+
+    GST_LOCK (basesink);
+    current = GST_STATE (basesink);
+    next = GST_STATE_NEXT (basesink);
+    pending = GST_STATE_PENDING (basesink);
+
+    switch (pending) {
+      case GST_STATE_PLAYING:
+        do_playing (basesink);
+        post_playing = TRUE;
+        break;
+      case GST_STATE_PAUSED:
+        basesink->need_preroll = TRUE;
+        post_paused = TRUE;
+        break;
+      case GST_STATE_READY:
+        goto stopping;
+      default:
+        break;
+    }
+
+    if (pending != GST_STATE_VOID_PENDING) {
+      GST_STATE (basesink) = pending;
+      GST_STATE_NEXT (basesink) = GST_STATE_VOID_PENDING;
+      GST_STATE_PENDING (basesink) = GST_STATE_VOID_PENDING;
+      GST_STATE_RETURN (basesink) = GST_STATE_CHANGE_SUCCESS;
+
+      pending = GST_STATE_VOID_PENDING;
+    }
+    GST_UNLOCK (basesink);
+
+    if (post_paused) {
+      message = gst_message_new_state_changed (GST_OBJECT_CAST (basesink),
+          current, next, GST_STATE_VOID_PENDING);
+      gst_element_post_message (GST_ELEMENT_CAST (basesink), message);
+    }
+    if (post_playing) {
+      message = gst_message_new_state_changed (GST_OBJECT_CAST (basesink),
+          next, pending, GST_STATE_VOID_PENDING);
+      gst_element_post_message (GST_ELEMENT_CAST (basesink), message);
+    }
+    /* and mark dirty */
+    if (post_paused || post_playing) {
+      gst_element_post_message (GST_ELEMENT_CAST (basesink),
+          gst_message_new_state_dirty (GST_OBJECT_CAST (basesink)));
+    }
+
+    GST_STATE_BROADCAST (basesink);
+  }
+  return TRUE;
+
+stopping:
+  {
+    /* app is going to READY */
+    GST_UNLOCK (basesink);
+    return FALSE;
+  }
+}
+
 /* with STREAM_LOCK */
 static GstFlowReturn
 gst_base_sink_handle_object (GstBaseSink * basesink, GstPad * pad,
@@ -601,14 +671,12 @@ gst_base_sink_handle_object (GstBaseSink * basesink, GstPad * pad,
     basesink->preroll_queued++;
     basesink->buffers_queued++;
   }
+
   GST_DEBUG_OBJECT (basesink,
       "now %d preroll, %d buffers, %d events on queue",
       basesink->preroll_queued,
       basesink->buffers_queued, basesink->events_queued);
 
-  if (basesink->playing_async)
-    goto playing_async;
-
   /* check if we are prerolling */
   if (!basesink->need_preroll)
     goto no_preroll;
@@ -633,60 +701,23 @@ gst_base_sink_handle_object (GstBaseSink * basesink, GstPad * pad,
   GST_DEBUG_OBJECT (basesink, "prerolled length %d", length);
 
   if (length == 1) {
-    gint t;
-    GstTask *task;
 
     basesink->have_preroll = TRUE;
-    /* we are prerolling */
-    GST_PREROLL_UNLOCK (pad);
-
-    /* have to release STREAM_LOCK as we cannot take the STATE_LOCK
-     * inside the STREAM_LOCK */
-    t = GST_STREAM_UNLOCK_FULL (pad);
-    GST_DEBUG_OBJECT (basesink, "released stream lock %d times", t);
-    if (t <= 0) {
-      GST_WARNING ("STREAM_LOCK should have been locked !!");
-      g_warning ("STREAM_LOCK should have been locked !!");
-    }
-
-    /* now we commit our state, this will also automatically proceed to
-     * the next pending state. */
-    /* FIXME */
-    if ((task = GST_PAD_TASK (pad))) {
-      while (!GST_STATE_TRYLOCK (basesink)) {
-        GST_DEBUG_OBJECT (basesink,
-            "state change happening, checking shutdown");
-        GST_LOCK (pad);
-        if (G_UNLIKELY (GST_PAD_IS_FLUSHING (pad)))
-          goto task_stopped;
-        GST_UNLOCK (pad);
-      }
-    } else {
-      GST_STATE_LOCK (basesink);
-    }
-    GST_DEBUG_OBJECT (basesink, "commit state");
-    gst_element_commit_state (GST_ELEMENT (basesink));
-    GST_STATE_UNLOCK (basesink);
-
-    /* reacquire stream lock, pad could be flushing now */
-    /* FIXME in glib, if t==0, the lock is still taken... hmmm.. bug #317802 */
-    if (t > 0)
-      GST_STREAM_LOCK_FULL (pad, t);
 
-    /* and wait if needed */
-    GST_PREROLL_LOCK (pad);
+    /* commit state */
+    if (!gst_base_sink_commit_state (basesink))
+      goto stopping;
 
     GST_LOCK (pad);
     if (G_UNLIKELY (GST_PAD_IS_FLUSHING (pad)))
       goto flushing;
     GST_UNLOCK (pad);
 
-    /* it is possible that the application set the state to PLAYING
+    /* it is possible that commiting the state made us go to PLAYING
      * now in which case we don't need to block anymore. */
     if (!basesink->need_preroll)
       goto no_preroll;
 
-
     length = basesink->preroll_queued;
 
     /* FIXME: a pad probe could have made us lose the buffer, according
@@ -706,12 +737,11 @@ gst_base_sink_handle_object (GstBaseSink * basesink, GstPad * pad,
     GST_DEBUG_OBJECT (basesink, "waiting to finish preroll");
     GST_PREROLL_WAIT (pad);
     GST_DEBUG_OBJECT (basesink, "done preroll");
+    GST_LOCK (pad);
+    if (G_UNLIKELY (GST_PAD_IS_FLUSHING (pad)))
+      goto flushing;
+    GST_UNLOCK (pad);
   }
-  GST_LOCK (pad);
-  if (G_UNLIKELY (GST_PAD_IS_FLUSHING (pad)))
-    goto flushing;
-  GST_UNLOCK (pad);
-
   GST_PREROLL_UNLOCK (pad);
 
   return GST_FLOW_OK;
@@ -740,74 +770,29 @@ dropping:
 
     return GST_FLOW_OK;
   }
-playing_async:
+flushing:
   {
-    GstFlowReturn ret;
-    gint t;
-
-    basesink->have_preroll = FALSE;
-    basesink->playing_async = FALSE;
-
-    /* handle buffer first */
-    ret = gst_base_sink_preroll_queue_empty (basesink, pad);
-
-    /* unroll locks, commit state, reacquire stream lock */
+    GST_UNLOCK (pad);
+    gst_base_sink_preroll_queue_flush (basesink, pad);
     GST_PREROLL_UNLOCK (pad);
-    t = GST_STREAM_UNLOCK_FULL (pad);
-    GST_DEBUG_OBJECT (basesink, "released stream lock %d times", t);
-    if (t <= 0) {
-      GST_WARNING ("STREAM_LOCK should have been locked !!");
-      g_warning ("STREAM_LOCK should have been locked !!");
-    }
-    GST_STATE_LOCK (basesink);
-    GST_DEBUG_OBJECT (basesink, "commit state");
-    gst_element_commit_state (GST_ELEMENT (basesink));
-    GST_STATE_UNLOCK (basesink);
-    if (t > 0)
-      GST_STREAM_LOCK_FULL (pad, t);
+    GST_DEBUG_OBJECT (basesink, "pad is flushing");
 
-    return ret;
-  }
-task_stopped:
-  {
-    GST_UNLOCK (pad);
-    GST_DEBUG_OBJECT (basesink, "task is stopped");
     return GST_FLOW_WRONG_STATE;
   }
-flushing:
+stopping:
   {
-    GST_UNLOCK (pad);
-    gst_base_sink_preroll_queue_flush (basesink, pad);
     GST_PREROLL_UNLOCK (pad);
-    GST_DEBUG_OBJECT (basesink, "pad is flushing");
+    GST_DEBUG_OBJECT (basesink, "stopping");
+
     return GST_FLOW_WRONG_STATE;
   }
 preroll_failed:
   {
-    gint t;
-
     GST_DEBUG_OBJECT (basesink, "preroll failed");
     gst_base_sink_preroll_queue_flush (basesink, pad);
-    GST_PREROLL_UNLOCK (pad);
-
-    /* have to release STREAM_LOCK as we cannot take the STATE_LOCK
-     * inside the STREAM_LOCK */
-    t = GST_STREAM_UNLOCK_FULL (pad);
-    GST_DEBUG_OBJECT (basesink, "released stream lock %d times", t);
-    if (t <= 0) {
-      GST_WARNING ("STREAM_LOCK should have been locked !!");
-      g_warning ("STREAM_LOCK should have been locked !!");
-    }
 
-    /* now we abort our state */
-    GST_STATE_LOCK (basesink);
     GST_DEBUG_OBJECT (basesink, "abort state");
     gst_element_abort_state (GST_ELEMENT (basesink));
-    GST_STATE_UNLOCK (basesink);
-
-    /* reacquire stream lock, pad could be flushing now */
-    if (t > 0)
-      GST_STREAM_LOCK_FULL (pad, t);
 
     return GST_FLOW_ERROR;
   }
@@ -871,11 +856,9 @@ gst_base_sink_event (GstPad * pad, GstEvent * event)
 
       /* and we need to commit our state again on the next
        * prerolled buffer */
-      GST_STATE_LOCK (basesink);
       GST_STREAM_LOCK (pad);
       gst_element_lost_state (GST_ELEMENT (basesink));
       GST_STREAM_UNLOCK (pad);
-      GST_STATE_UNLOCK (basesink);
       GST_DEBUG_OBJECT (basesink, "event unref %p %p", basesink, event);
       gst_event_unref (event);
       break;
@@ -1450,6 +1433,50 @@ gst_base_sink_query (GstElement * element, GstQuery * query)
   return res;
 }
 
+/* with PREROLL_LOCK */
+static GstStateChangeReturn
+do_playing (GstBaseSink * basesink)
+{
+  GstStateChangeReturn ret = GST_STATE_CHANGE_SUCCESS;
+
+  /* no preroll needed */
+  basesink->need_preroll = FALSE;
+
+  /* if we have EOS, we should empty the queue now as there will
+   * be no more data received in the chain function.
+   * FIXME, this could block the state change function too long when
+   * we are pushing and syncing the buffers, better start a new
+   * thread to do this. */
+  if (basesink->eos) {
+    gboolean do_eos = !basesink->eos_queued;
+
+    gst_base_sink_preroll_queue_empty (basesink, basesink->sinkpad);
+
+    /* need to post EOS message here if it was not in the preroll queue we
+     * just emptied. */
+    if (do_eos) {
+      GST_DEBUG_OBJECT (basesink, "Now posting EOS");
+      gst_element_post_message (GST_ELEMENT (basesink),
+          gst_message_new_eos (GST_OBJECT (basesink)));
+    }
+  } else if (!basesink->have_preroll) {
+    /* don't need preroll, but do queue a commit_state */
+    basesink->need_preroll = TRUE;
+    GST_DEBUG_OBJECT (basesink,
+        "PAUSED to PLAYING, !eos, !have_preroll, need preroll to FALSE");
+    ret = GST_STATE_CHANGE_ASYNC;
+    /* we know it's not waiting, no need to signal */
+  } else {
+    /* don't need the preroll anymore */
+    GST_DEBUG_OBJECT (basesink,
+        "PAUSED to PLAYING, !eos, have_preroll, need preroll to FALSE");
+    /* now let it play */
+    GST_PREROLL_SIGNAL (basesink->sinkpad);
+  }
+
+  return ret;
+}
+
 static GstStateChangeReturn
 gst_base_sink_change_state (GstElement * element, GstStateChange transition)
 {
@@ -1484,40 +1511,7 @@ gst_base_sink_change_state (GstElement * element, GstStateChange transition)
       break;
     case GST_STATE_CHANGE_PAUSED_TO_PLAYING:
       GST_PREROLL_LOCK (basesink->sinkpad);
-      /* if we have EOS, we should empty the queue now as there will
-       * be no more data received in the chain function.
-       * FIXME, this could block the state change function too long when
-       * we are pushing and syncing the buffers, better start a new
-       * thread to do this. */
-      if (basesink->eos) {
-        gboolean do_eos = !basesink->eos_queued;
-
-        gst_base_sink_preroll_queue_empty (basesink, basesink->sinkpad);
-        basesink->need_preroll = FALSE;
-
-        /* need to post EOS message here if it was not in the preroll queue we
-         * just emptied. */
-        if (do_eos) {
-          GST_DEBUG_OBJECT (basesink, "Now posting EOS");
-          gst_element_post_message (GST_ELEMENT (basesink),
-              gst_message_new_eos (GST_OBJECT (basesink)));
-        }
-      } else if (!basesink->have_preroll) {
-        /* don't need preroll, but do queue a commit_state */
-        GST_DEBUG_OBJECT (basesink,
-            "PAUSED to PLAYING, !eos, !have_preroll, need preroll to FALSE");
-        basesink->need_preroll = FALSE;
-        basesink->playing_async = TRUE;
-        ret = GST_STATE_CHANGE_ASYNC;
-        /* we know it's not waiting, no need to signal */
-      } else {
-        /* don't need the preroll anymore */
-        basesink->need_preroll = FALSE;
-        GST_DEBUG_OBJECT (basesink,
-            "PAUSED to PLAYING, !eos, have_preroll, need preroll to FALSE");
-        /* now let it play */
-        GST_PREROLL_SIGNAL (basesink->sinkpad);
-      }
+      ret = do_playing (basesink);
       GST_PREROLL_UNLOCK (basesink->sinkpad);
       break;
     default:
@@ -1547,8 +1541,6 @@ gst_base_sink_change_state (GstElement * element, GstStateChange transition)
       }
       GST_UNLOCK (basesink);
 
-      basesink->playing_async = FALSE;
-
       /* unlock any subclasses */
       if (bclass->unlock)
         bclass->unlock (basesink);
index bee868c2ea4ac5fa27d4a19567190e9b53f780dc..0dff8e3dbfd747c1306ca49f552eb15a36373c96 100644 (file)
@@ -52,6 +52,7 @@ check_PROGRAMS =                              \
        elements/identity                       \
        generic/states                          \
        pipelines/simple_launch_lines           \
+       pipelines/stress                        \
        pipelines/cleanup                       \
        states/sinks                            \
        gst-libs/controller                     \
diff --git a/tests/check/pipelines/stress.c b/tests/check/pipelines/stress.c
new file mode 100644 (file)
index 0000000..eb0ade1
--- /dev/null
@@ -0,0 +1,88 @@
+/* GStreamer
+ * Copyright (C) 2005 Andy Wingo <wingo@pobox.com>
+ *
+ * simple_launch_lines.c: Unit test for simple pipelines
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Library General Public
+ * License as published by the Free Software Foundation; either
+ * version 2 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Library General Public License for more details.
+ *
+ * You should have received a copy of the GNU Library General Public
+ * License along with this library; if not, write to the
+ * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
+ * Boston, MA 02111-1307, USA.
+ */
+
+
+#include <gst/check/gstcheck.h>
+
+
+GST_START_TEST (test_stress)
+{
+  GstElement *fakesrc, *fakesink, *pipeline;
+  gint i;
+
+  fakesrc = gst_element_factory_make ("fakesrc", NULL);
+  fakesink = gst_element_factory_make ("fakesink", NULL);
+  pipeline = gst_element_factory_make ("pipeline", NULL);
+
+  g_return_if_fail (fakesrc && fakesink && pipeline);
+
+  gst_bin_add_many (GST_BIN (pipeline), fakesrc, fakesink, NULL);
+  gst_element_link (fakesrc, fakesink);
+
+  i = 10000;
+  while (i--) {
+    gst_element_set_state (pipeline, GST_STATE_PAUSED);
+    gst_element_set_state (pipeline, GST_STATE_PLAYING);
+
+    gst_element_set_state (pipeline, GST_STATE_PAUSED);
+    gst_element_set_state (pipeline, GST_STATE_PLAYING);
+    gst_element_set_state (pipeline, GST_STATE_PAUSED);
+    gst_element_set_state (pipeline, GST_STATE_READY);
+    gst_element_set_state (pipeline, GST_STATE_PLAYING);
+    gst_element_set_state (pipeline, GST_STATE_PAUSED);
+    gst_element_set_state (pipeline, GST_STATE_READY);
+    gst_element_set_state (pipeline, GST_STATE_PAUSED);
+    gst_element_set_state (pipeline, GST_STATE_NULL);
+  }
+
+  gst_object_unref (pipeline);
+}
+
+GST_END_TEST Suite *
+simple_launch_lines_suite (void)
+{
+  Suite *s = suite_create ("stress");
+  TCase *tc_chain = tcase_create ("linear");
+
+  /* time out after 20s, not the default 3 */
+  tcase_set_timeout (tc_chain, 0);
+
+  suite_add_tcase (s, tc_chain);
+  tcase_add_test (tc_chain, test_stress);
+  return s;
+}
+
+int
+main (int argc, char **argv)
+{
+  int nf;
+
+  Suite *s = simple_launch_lines_suite ();
+  SRunner *sr = srunner_create (s);
+
+  gst_check_init (&argc, &argv);
+
+  srunner_run_all (sr, CK_NORMAL);
+  nf = srunner_ntests_failed (sr);
+  srunner_free (sr);
+
+  return nf;
+}