gst/base/gstbasesink.c: Reworked the base sink, handle event and buffer serialisation...
authorWim Taymans <wim.taymans@gmail.com>
Sat, 25 Jun 2005 17:54:58 +0000 (17:54 +0000)
committerWim Taymans <wim.taymans@gmail.com>
Sat, 25 Jun 2005 17:54:58 +0000 (17:54 +0000)
Original commit message from CVS:
* gst/base/gstbasesink.c: (gst_basesink_set_property),
(gst_basesink_preroll_queue_empty),
(gst_basesink_preroll_queue_flush), (gst_basesink_handle_object),
(gst_basesink_event), (gst_basesink_do_sync),
(gst_basesink_handle_event), (gst_basesink_handle_buffer),
(gst_basesink_chain), (gst_basesink_loop), (gst_basesink_activate),
(gst_basesink_change_state):
Reworked the base sink, handle event and buffer serialisation
correctly and removed possible deadlock.
Handle EOS correctly.

ChangeLog
gst/base/gstbasesink.c
libs/gst/base/gstbasesink.c

index 8e1c21e..4959a1f 100644 (file)
--- a/ChangeLog
+++ b/ChangeLog
@@ -1,5 +1,18 @@
 2005-06-25  Wim Taymans  <wim@fluendo.com>
 
+       * gst/base/gstbasesink.c: (gst_basesink_set_property),
+       (gst_basesink_preroll_queue_empty),
+       (gst_basesink_preroll_queue_flush), (gst_basesink_handle_object),
+       (gst_basesink_event), (gst_basesink_do_sync),
+       (gst_basesink_handle_event), (gst_basesink_handle_buffer),
+       (gst_basesink_chain), (gst_basesink_loop), (gst_basesink_activate),
+       (gst_basesink_change_state):
+       Reworked the base sink, handle event and buffer serialisation
+       correctly and removed possible deadlock.
+       Handle EOS correctly.
+
+2005-06-25  Wim Taymans  <wim@fluendo.com>
+
        * gst/gstpipeline.c: (is_eos), (pipeline_bus_handler),
        (gst_pipeline_change_state):
        * tools/gst-launch.c: (check_intr), (event_loop), (main):
index c54ee85..7d75d49 100644 (file)
 GST_DEBUG_CATEGORY_STATIC (gst_basesink_debug);
 #define GST_CAT_DEFAULT gst_basesink_debug
 
-/* #define DEBUGGING */
-#ifdef DEBUGGING
-#define DEBUG(str,args...) g_print (str,##args)
-#else
-#define DEBUG(str,args...)
-#endif
-
 /* BaseSink signals and properties */
 enum
 {
@@ -104,14 +97,15 @@ static void gst_basesink_get_times (GstBaseSink * basesink, GstBuffer * buffer,
 
 static GstElementStateReturn gst_basesink_change_state (GstElement * element);
 
-static GstFlowReturn gst_basesink_chain_unlocked (GstPad * pad,
-    GstBuffer * buffer);
+static GstFlowReturn gst_basesink_chain (GstPad * pad, GstBuffer * buffer);
 static void gst_basesink_loop (GstPad * pad);
 static GstFlowReturn gst_basesink_chain (GstPad * pad, GstBuffer * buffer);
 static gboolean gst_basesink_activate (GstPad * pad, GstActivateMode mode);
 static gboolean gst_basesink_event (GstPad * pad, GstEvent * event);
 static inline GstFlowReturn gst_basesink_handle_buffer (GstBaseSink * basesink,
     GstBuffer * buf);
+static inline gboolean gst_basesink_handle_event (GstBaseSink * basesink,
+    GstEvent * event);
 
 static void
 gst_basesink_base_init (gpointer g_class)
@@ -304,15 +298,18 @@ gst_basesink_set_property (GObject * object, guint prop_id,
 
   sink = GST_BASESINK (object);
 
-  GST_LOCK (sink);
   switch (prop_id) {
     case PROP_HAS_LOOP:
+      GST_LOCK (sink);
       sink->has_loop = g_value_get_boolean (value);
       gst_basesink_set_all_pad_functions (sink);
+      GST_UNLOCK (sink);
       break;
     case PROP_HAS_CHAIN:
+      GST_LOCK (sink);
       sink->has_chain = g_value_get_boolean (value);
       gst_basesink_set_all_pad_functions (sink);
+      GST_UNLOCK (sink);
       break;
     case PROP_PREROLL_QUEUE_LEN:
       /* preroll lock necessary to serialize with finish_preroll */
@@ -324,7 +321,6 @@ gst_basesink_set_property (GObject * object, guint prop_id,
       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
       break;
   }
-  GST_UNLOCK (sink);
 }
 
 static void
@@ -374,59 +370,35 @@ gst_base_sink_buffer_alloc (GstBaseSink * sink, guint64 offset, guint size,
 }
 
 /* with PREROLL_LOCK */
-static void
-gst_basesink_preroll_queue_push (GstBaseSink * basesink, GstPad * pad,
-    GstBuffer * buffer)
-{
-  /* if we don't have a buffer we just start blocking */
-  if (buffer == NULL)
-    goto block;
-
-  /* first buffer */
-  if (basesink->preroll_queue->length == 0) {
-    GstBaseSinkClass *bclass = GST_BASESINK_GET_CLASS (basesink);
-
-    GST_DEBUG ("preroll buffer with TS: %" GST_TIME_FORMAT,
-        GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (buffer)));
-    if (bclass->preroll)
-      bclass->preroll (basesink, buffer);
-  }
-
-  if (basesink->preroll_queue->length < basesink->preroll_queue_max_len) {
-    DEBUG ("push %p %p\n", basesink, buffer);
-    g_queue_push_tail (basesink->preroll_queue, buffer);
-    return;
-  }
-
-block:
-  /* block until the state changes, or we get a flush, or something */
-  DEBUG ("block %p %p\n", basesink, buffer);
-  GST_DEBUG ("element %s waiting to finish preroll",
-      GST_ELEMENT_NAME (basesink));
-  basesink->need_preroll = FALSE;
-  basesink->have_preroll = TRUE;
-  GST_PREROLL_WAIT (pad);
-  GST_DEBUG ("done preroll");
-  basesink->have_preroll = FALSE;
-}
-
-/* with PREROLL_LOCK */
 static GstFlowReturn
 gst_basesink_preroll_queue_empty (GstBaseSink * basesink, GstPad * pad)
 {
-  GstBuffer *buf;
+  GstMiniObject *obj;
   GQueue *q = basesink->preroll_queue;
   GstFlowReturn ret;
 
   ret = GST_FLOW_OK;
 
   if (q) {
-    DEBUG ("empty queue\n");
-    while ((buf = g_queue_pop_head (q))) {
-      DEBUG ("pop %p\n", buf);
-      ret = gst_basesink_handle_buffer (basesink, buf);
+    GST_DEBUG ("emptying queue");
+    while ((obj = g_queue_pop_head (q))) {
+      /* we release the preroll lock while pushing so that we
+       * can still flush it while blocking on the clock or
+       * inside the element. */
+      GST_PREROLL_UNLOCK (pad);
+
+      if (GST_IS_BUFFER (obj)) {
+        GST_DEBUG ("poped buffer %p", obj);
+        ret = gst_basesink_handle_buffer (basesink, GST_BUFFER (obj));
+      } else {
+        GST_DEBUG ("poped event %p", obj);
+        gst_basesink_handle_event (basesink, GST_EVENT (obj));
+        ret = GST_FLOW_OK;
+      }
+
+      GST_PREROLL_LOCK (pad);
     }
-    DEBUG ("queue len %p %d\n", basesink, q->length);
+    GST_DEBUG ("queue empty");
   }
   return ret;
 }
@@ -435,83 +407,126 @@ gst_basesink_preroll_queue_empty (GstBaseSink * basesink, GstPad * pad)
 static void
 gst_basesink_preroll_queue_flush (GstBaseSink * basesink)
 {
-  GstBuffer *buf;
+  GstMiniObject *obj;
   GQueue *q = basesink->preroll_queue;
 
-  DEBUG ("flush %p\n", basesink);
+  GST_DEBUG ("flushing queue %p", basesink);
   if (q) {
-    while ((buf = g_queue_pop_head (q))) {
-      DEBUG ("pop %p\n", buf);
-      gst_buffer_unref (buf);
+    while ((obj = g_queue_pop_head (q))) {
+      GST_DEBUG ("poped %p", obj);
+      gst_mini_object_unref (obj);
     }
   }
+  /* we can't have EOS anymore now */
+  basesink->eos = FALSE;
 }
 
-typedef enum
-{
-  PREROLL_QUEUEING,
-  PREROLL_PLAYING,
-  PREROLL_FLUSHING,
-  PREROLL_ERROR
-} PrerollReturn;
-
 /* with STREAM_LOCK */
-PrerollReturn
-gst_basesink_finish_preroll (GstBaseSink * basesink, GstPad * pad,
-    GstBuffer * buffer)
+static GstFlowReturn
+gst_basesink_handle_object (GstBaseSink * basesink, GstPad * pad,
+    GstMiniObject * obj)
 {
-  gboolean flushing;
+  gint length;
+  gboolean have_event;
 
-  DEBUG ("finish preroll %p <\n", basesink);
-  /* lock order is important */
-  GST_STATE_LOCK (basesink);
   GST_PREROLL_LOCK (pad);
-  DEBUG ("finish preroll %p >\n", basesink);
+  /* push object on the queue */
+  GST_DEBUG ("push on queue %p %p", basesink, obj);
+  g_queue_push_tail (basesink->preroll_queue, obj);
+
+  have_event = GST_IS_EVENT (obj);
+
+  if (have_event && GST_EVENT_TYPE (obj) == GST_EVENT_EOS) {
+    basesink->eos = TRUE;
+  }
+
+  /* check if we are prerolling */
   if (!basesink->need_preroll)
     goto no_preroll;
 
+  length = basesink->preroll_queue->length;
+  /* this is the first object we queued */
+  if (length == 1) {
+    GST_DEBUG ("do preroll %p", obj);
+
+    /* if it's a buffer, we need to call the preroll method */
+    if (GST_IS_BUFFER (obj)) {
+      GstBaseSinkClass *bclass;
+
+      bclass = GST_BASESINK_GET_CLASS (basesink);
+      if (bclass->preroll)
+        bclass->preroll (basesink, GST_BUFFER (obj));
+    }
+  }
+  /* we are prerolling */
+  GST_DEBUG ("finish preroll %p >", basesink);
+  GST_PREROLL_UNLOCK (pad);
+
+  /* have to release STREAM_LOCK as we cannot take the STATE_LOCK
+   * inside the STREAM_LOCK */
+  GST_STREAM_UNLOCK (pad);
+
+  /* now we commit our state */
+  GST_STATE_LOCK (basesink);
+  GST_DEBUG ("commit state %p >", basesink);
   gst_element_commit_state (GST_ELEMENT (basesink));
   GST_STATE_UNLOCK (basesink);
 
-  gst_basesink_preroll_queue_push (basesink, pad, buffer);
+  /* reacquire stream lock, pad could be flushing now */
+  GST_STREAM_LOCK (pad);
 
   GST_LOCK (pad);
-  flushing = GST_PAD_IS_FLUSHING (pad);
-  GST_UNLOCK (pad);
-  if (flushing)
+  if (G_UNLIKELY (GST_PAD_IS_FLUSHING (pad)))
     goto flushing;
+  GST_UNLOCK (pad);
 
-  if (basesink->need_preroll)
-    goto still_queueing;
-
-  GST_DEBUG ("done preroll");
-
-  gst_basesink_preroll_queue_empty (basesink, pad);
+  /* and wait if needed */
+  GST_PREROLL_LOCK (pad);
+  /* it is possible that the application set the state to PLAYING
+   * now in which case we don't need to block anymore. */
+  if (!basesink->need_preroll)
+    goto no_preroll;
 
+  length = basesink->preroll_queue->length;
+  GST_DEBUG ("prerolled length %d", length);
+  /* see if we need to block now. We cannot block on events, only
+   * on buffers, the reason is that events can be sent from the
+   * application thread and we don't want to block there. */
+  if (length > basesink->preroll_queue_max_len && !have_event) {
+    /* block until the state changes, or we get a flush, or something */
+    GST_DEBUG ("element %s waiting to finish preroll",
+        GST_ELEMENT_NAME (basesink));
+    basesink->have_preroll = TRUE;
+    GST_PREROLL_WAIT (pad);
+    GST_DEBUG ("done preroll");
+    basesink->have_preroll = FALSE;
+  }
   GST_PREROLL_UNLOCK (pad);
 
-  return PREROLL_PLAYING;
+  GST_LOCK (pad);
+  if (G_UNLIKELY (GST_PAD_IS_FLUSHING (pad)))
+    goto flushing;
+  GST_UNLOCK (pad);
+
+  return GST_FLOW_OK;
 
 no_preroll:
   {
+    GstFlowReturn ret;
+
+    GST_DEBUG ("no preroll needed");
     /* maybe it was another sink that blocked in preroll, need to check for
        buffers to drain */
-    if (basesink->preroll_queue->length)
-      gst_basesink_preroll_queue_empty (basesink, pad);
+    ret = gst_basesink_preroll_queue_empty (basesink, pad);
     GST_PREROLL_UNLOCK (pad);
-    GST_STATE_UNLOCK (basesink);
-    return PREROLL_PLAYING;
+
+    return ret;
   }
 flushing:
   {
+    GST_UNLOCK (pad);
     GST_DEBUG ("pad is flushing");
-    GST_PREROLL_UNLOCK (pad);
-    return PREROLL_FLUSHING;
-  }
-still_queueing:
-  {
-    GST_PREROLL_UNLOCK (pad);
-    return PREROLL_QUEUEING;
+    return GST_FLOW_WRONG_STATE;
   }
 }
 
@@ -526,83 +541,66 @@ gst_basesink_event (GstPad * pad, GstEvent * event)
 
   bclass = GST_BASESINK_GET_CLASS (basesink);
 
-  DEBUG ("event %p\n", basesink);
-
-  if (bclass->event)
-    bclass->event (basesink, event);
+  GST_DEBUG ("event %p", event);
 
   switch (GST_EVENT_TYPE (event)) {
     case GST_EVENT_EOS:
     {
-      PrerollReturn pre_ret;
-      gboolean need_eos;
+      GstFlowReturn ret;
 
       GST_STREAM_LOCK (pad);
-
       /* EOS also finishes the preroll */
-      pre_ret = gst_basesink_finish_preroll (basesink, pad, NULL);
-
-      if (pre_ret == PREROLL_PLAYING) {
-        GST_LOCK (basesink);
-        need_eos = basesink->eos = TRUE;
-        if (basesink->clock) {
-          /* wait for last buffer to finish if we have a valid end time */
-          if (GST_CLOCK_TIME_IS_VALID (basesink->end_time)) {
-            basesink->clock_id = gst_clock_new_single_shot_id (basesink->clock,
-                basesink->end_time + GST_ELEMENT (basesink)->base_time);
-            GST_UNLOCK (basesink);
-
-            gst_clock_id_wait (basesink->clock_id, NULL);
-
-            GST_LOCK (basesink);
-            if (basesink->clock_id) {
-              gst_clock_id_unref (basesink->clock_id);
-              basesink->clock_id = NULL;
-            }
-            basesink->end_time = GST_CLOCK_TIME_NONE;
-            need_eos = basesink->eos;
-          }
-          GST_UNLOCK (basesink);
-
-          /* if we are still EOS, we can post the EOS message */
-          if (need_eos) {
-            /* ok, now we can post the message */
-            gst_element_post_message (GST_ELEMENT (basesink),
-                gst_message_new_eos (GST_OBJECT (basesink)));
-          }
-        }
-      }
-
+      ret = gst_basesink_handle_object (basesink, pad, GST_MINI_OBJECT (event));
       GST_STREAM_UNLOCK (pad);
       break;
     }
     case GST_EVENT_DISCONTINUOUS:
+    {
+      GstFlowReturn ret;
+
       GST_STREAM_LOCK (pad);
       if (basesink->clock) {
         //gint64 value = GST_EVENT_DISCONT_OFFSET (event, 0).value;
       }
+      ret = gst_basesink_handle_object (basesink, pad, GST_MINI_OBJECT (event));
       GST_STREAM_UNLOCK (pad);
       break;
+    }
     case GST_EVENT_FLUSH:
       /* make sure we are not blocked on the clock also clear any pending
        * eos state. */
+      if (bclass->event)
+        bclass->event (basesink, event);
+
       if (!GST_EVENT_FLUSH_DONE (event)) {
+        GST_PREROLL_LOCK (pad);
+        /* we need preroll after the flush */
+        basesink->need_preroll = TRUE;
+        gst_basesink_preroll_queue_flush (basesink);
+        /* unlock from a possible state change/preroll */
+        GST_PREROLL_SIGNAL (pad);
+
         GST_LOCK (basesink);
-        basesink->eos = FALSE;
         if (basesink->clock_id) {
           gst_clock_id_unschedule (basesink->clock_id);
         }
         GST_UNLOCK (basesink);
-
-        /* unlock from a possible state change/preroll */
-        GST_PREROLL_LOCK (pad);
-        basesink->need_preroll = TRUE;
-        gst_basesink_preroll_queue_flush (basesink);
-        GST_PREROLL_SIGNAL (pad);
         GST_PREROLL_UNLOCK (pad);
+
+        /* and we need to commit our state again on the next
+         * prerolled buffer */
+        GST_STATE_LOCK (basesink);
+        GST_STREAM_LOCK (pad);
+        gst_element_lost_state (GST_ELEMENT (basesink));
+        GST_STREAM_UNLOCK (pad);
+        GST_STATE_UNLOCK (basesink);
+      } else {
+        /* now we are completely unblocked and the _chain method
+         * will return */
+        GST_STREAM_LOCK (pad);
+        GST_STREAM_UNLOCK (pad);
       }
-      /* now we are completely unblocked and the _chain method
-       * will return */
+
       break;
     default:
       result = gst_pad_event_default (pad, event);
@@ -640,11 +638,12 @@ gst_basesink_get_times (GstBaseSink * basesink, GstBuffer * buffer,
  * 4) wait on the clock, this blocks
  * 5) unref the clockid again
  */
-static void
+static gboolean
 gst_basesink_do_sync (GstBaseSink * basesink, GstBuffer * buffer)
 {
+  gboolean result = TRUE;
+
   if (basesink->clock) {
-    GstClockReturn ret;
     GstClockTime start, end;
     GstBaseSinkClass *bclass;
 
@@ -657,6 +656,8 @@ gst_basesink_do_sync (GstBaseSink * basesink, GstBuffer * buffer)
         ", end: %" GST_TIME_FORMAT, GST_TIME_ARGS (start), GST_TIME_ARGS (end));
 
     if (GST_CLOCK_TIME_IS_VALID (start)) {
+      GstClockReturn ret;
+
       /* save clock id so that we can unlock it if needed */
       GST_LOCK (basesink);
       basesink->clock_id = gst_clock_new_single_shot_id (basesink->clock,
@@ -671,13 +672,79 @@ gst_basesink_do_sync (GstBaseSink * basesink, GstBuffer * buffer)
         gst_clock_id_unref (basesink->clock_id);
         basesink->clock_id = NULL;
       }
-      /* FIXME, don't mess with end_time here */
-      basesink->end_time = GST_CLOCK_TIME_NONE;
       GST_UNLOCK (basesink);
 
       GST_LOG_OBJECT (basesink, "clock entry done: %d", ret);
+      if (ret == GST_CLOCK_UNSCHEDULED)
+        result = FALSE;
     }
   }
+  return result;
+}
+
+
+/* handle an event
+ *
+ * 2) render the event
+ * 3) unref the event
+ */
+static inline gboolean
+gst_basesink_handle_event (GstBaseSink * basesink, GstEvent * event)
+{
+  GstBaseSinkClass *bclass;
+  gboolean ret;
+
+  switch (GST_EVENT_TYPE (event)) {
+    case GST_EVENT_EOS:
+      GST_LOCK (basesink);
+      if (basesink->clock) {
+        /* wait for last buffer to finish if we have a valid end time */
+        if (GST_CLOCK_TIME_IS_VALID (basesink->end_time)) {
+          basesink->clock_id = gst_clock_new_single_shot_id (basesink->clock,
+              basesink->end_time + GST_ELEMENT (basesink)->base_time);
+          GST_UNLOCK (basesink);
+
+          gst_clock_id_wait (basesink->clock_id, NULL);
+
+          GST_LOCK (basesink);
+          if (basesink->clock_id) {
+            gst_clock_id_unref (basesink->clock_id);
+            basesink->clock_id = NULL;
+          }
+          basesink->end_time = GST_CLOCK_TIME_NONE;
+        }
+      }
+      GST_UNLOCK (basesink);
+      break;
+    default:
+      break;
+  }
+
+  bclass = GST_BASESINK_GET_CLASS (basesink);
+  if (bclass->event)
+    ret = bclass->event (basesink, event);
+  else
+    ret = TRUE;
+
+  switch (GST_EVENT_TYPE (event)) {
+    case GST_EVENT_EOS:
+      GST_PREROLL_LOCK (basesink->sinkpad);
+      /* if we are still EOS, we can post the EOS message */
+      if (basesink->eos) {
+        /* ok, now we can post the message */
+        gst_element_post_message (GST_ELEMENT (basesink),
+            gst_message_new_eos (GST_OBJECT (basesink)));
+      }
+      GST_PREROLL_UNLOCK (basesink->sinkpad);
+      break;
+    default:
+      break;
+  }
+
+  GST_DEBUG ("event unref %p %p", basesink, event);
+  gst_event_unref (event);
+
+  return ret;
 }
 
 /* handle a buffer
@@ -700,46 +767,21 @@ gst_basesink_handle_buffer (GstBaseSink * basesink, GstBuffer * buf)
   else
     ret = GST_FLOW_OK;
 
-  DEBUG ("unref %p %p\n", basesink, buf);
+  GST_DEBUG ("buffer unref after render %p", basesink, buf);
   gst_buffer_unref (buf);
 
   return ret;
 }
 
 static GstFlowReturn
-gst_basesink_chain_unlocked (GstPad * pad, GstBuffer * buf)
+gst_basesink_chain (GstPad * pad, GstBuffer * buf)
 {
   GstBaseSink *basesink;
-  PrerollReturn result;
+  GstFlowReturn result;
 
   basesink = GST_BASESINK (GST_OBJECT_PARENT (pad));
 
-  DEBUG ("chain_unlocked %p\n", basesink);
-
-  result = gst_basesink_finish_preroll (basesink, pad, buf);
-
-  DEBUG ("chain_unlocked %p after, result %d\n", basesink, result);
-
-  switch (result) {
-    case PREROLL_QUEUEING:
-      return GST_FLOW_OK;
-    case PREROLL_PLAYING:
-      return gst_basesink_handle_buffer (basesink, buf);
-    case PREROLL_FLUSHING:
-      gst_buffer_unref (buf);
-      return GST_FLOW_UNEXPECTED;
-    default:
-      g_assert_not_reached ();
-      return GST_FLOW_ERROR;
-  }
-}
-
-static GstFlowReturn
-gst_basesink_chain (GstPad * pad, GstBuffer * buf)
-{
-  GstFlowReturn result;
-
-  result = gst_basesink_chain_unlocked (pad, buf);
+  result = gst_basesink_handle_object (basesink, pad, GST_MINI_OBJECT (buf));
 
   return result;
 }
@@ -761,7 +803,7 @@ gst_basesink_loop (GstPad * pad)
   if (result != GST_FLOW_OK)
     goto paused;
 
-  result = gst_basesink_chain_unlocked (pad, buf);
+  result = gst_basesink_chain (pad, buf);
   if (result != GST_FLOW_OK)
     goto paused;
 
@@ -797,6 +839,7 @@ gst_basesink_activate (GstPad * pad, GstActivateMode mode)
       break;
     case GST_ACTIVATE_NONE:
       /* step 1, unblock clock sync (if any) or any other blocking thing */
+      GST_PREROLL_LOCK (pad);
       GST_LOCK (basesink);
       if (basesink->clock_id) {
         gst_clock_id_unschedule (basesink->clock_id);
@@ -807,8 +850,9 @@ gst_basesink_activate (GstPad * pad, GstActivateMode mode)
       if (bclass->unlock)
         bclass->unlock (basesink);
 
-      /* unlock preroll */
-      GST_PREROLL_LOCK (pad);
+      /* flush out the data thread if it's locked in finish_preroll */
+      gst_basesink_preroll_queue_flush (basesink);
+      basesink->need_preroll = FALSE;
       GST_PREROLL_SIGNAL (pad);
       GST_PREROLL_UNLOCK (pad);
 
@@ -828,8 +872,6 @@ gst_basesink_change_state (GstElement * element)
   GstBaseSink *basesink = GST_BASESINK (element);
   GstElementState transition = GST_STATE_TRANSITION (element);
 
-  DEBUG ("state change > %p %x\n", basesink, transition);
-
   switch (transition) {
     case GST_STATE_NULL_TO_READY:
       break;
@@ -838,25 +880,31 @@ gst_basesink_change_state (GstElement * element)
        * is no data flow in READY so we can safely assume we need to preroll. */
       basesink->offset = 0;
       GST_PREROLL_LOCK (basesink->sinkpad);
-      basesink->need_preroll = TRUE;
       basesink->have_preroll = FALSE;
+      basesink->need_preroll = TRUE;
       GST_PREROLL_UNLOCK (basesink->sinkpad);
       ret = GST_STATE_ASYNC;
       break;
     case GST_STATE_PAUSED_TO_PLAYING:
+    {
       GST_PREROLL_LOCK (basesink->sinkpad);
+      /* if we have EOS, we should empty the queue now as there will
+       * be no more data received in the chain function.
+       * FIXME, this could block the state change function too long when
+       * we are pushing and syncing the buffers, better start a new
+       * thread to do this. */
+      if (basesink->eos) {
+        gst_basesink_preroll_queue_empty (basesink, basesink->sinkpad);
+      }
+      /* don't need the preroll anymore */
+      basesink->need_preroll = FALSE;
       if (basesink->have_preroll) {
         /* now let it play */
         GST_PREROLL_SIGNAL (basesink->sinkpad);
-      } else {
-        /* FIXME. We do not have a preroll and we don't need it anymore 
-         * now, this is a case we want to avoid. One way would be to make
-         * a 'lost state' function that makes get_state return PAUSED with
-         * ASYNC to indicate that we are prerolling again. */
-        basesink->need_preroll = FALSE;
       }
       GST_PREROLL_UNLOCK (basesink->sinkpad);
       break;
+    }
     default:
       break;
   }
@@ -866,20 +914,25 @@ gst_basesink_change_state (GstElement * element)
   switch (transition) {
     case GST_STATE_PLAYING_TO_PAUSED:
     {
-      gboolean eos;
+      GstBaseSinkClass *bclass;
 
-      /* unlock clock wait if any */
+      bclass = GST_BASESINK_GET_CLASS (basesink);
+
+      GST_PREROLL_LOCK (basesink->sinkpad);
       GST_LOCK (basesink);
+      /* unlock clock wait if any */
       if (basesink->clock_id) {
         gst_clock_id_unschedule (basesink->clock_id);
       }
-      eos = basesink->eos;
       GST_UNLOCK (basesink);
 
-      GST_PREROLL_LOCK (basesink->sinkpad);
+      /* unlock any subclasses */
+      if (bclass->unlock)
+        bclass->unlock (basesink);
+
       /* if we don't have a preroll buffer and we have not received EOS,
        * we need to wait for a preroll */
-      if (!basesink->have_preroll && !eos) {
+      if (!basesink->have_preroll && !basesink->eos) {
         basesink->need_preroll = TRUE;
         ret = GST_STATE_ASYNC;
       }
@@ -887,20 +940,6 @@ gst_basesink_change_state (GstElement * element)
       break;
     }
     case GST_STATE_PAUSED_TO_READY:
-      /* flush out the data thread if it's locked in finish_preroll */
-      GST_PREROLL_LOCK (basesink->sinkpad);
-
-      gst_basesink_preroll_queue_flush (basesink);
-
-      if (basesink->have_preroll)
-        GST_PREROLL_SIGNAL (basesink->sinkpad);
-
-      basesink->need_preroll = FALSE;
-      basesink->have_preroll = FALSE;
-      GST_PREROLL_UNLOCK (basesink->sinkpad);
-
-      /* clear EOS state */
-      basesink->eos = FALSE;
       break;
     case GST_STATE_READY_TO_NULL:
       break;
@@ -908,6 +947,5 @@ gst_basesink_change_state (GstElement * element)
       break;
   }
 
-  DEBUG ("state change < %p %x\n", basesink, transition);
   return ret;
 }
index c54ee85..7d75d49 100644 (file)
 GST_DEBUG_CATEGORY_STATIC (gst_basesink_debug);
 #define GST_CAT_DEFAULT gst_basesink_debug
 
-/* #define DEBUGGING */
-#ifdef DEBUGGING
-#define DEBUG(str,args...) g_print (str,##args)
-#else
-#define DEBUG(str,args...)
-#endif
-
 /* BaseSink signals and properties */
 enum
 {
@@ -104,14 +97,15 @@ static void gst_basesink_get_times (GstBaseSink * basesink, GstBuffer * buffer,
 
 static GstElementStateReturn gst_basesink_change_state (GstElement * element);
 
-static GstFlowReturn gst_basesink_chain_unlocked (GstPad * pad,
-    GstBuffer * buffer);
+static GstFlowReturn gst_basesink_chain (GstPad * pad, GstBuffer * buffer);
 static void gst_basesink_loop (GstPad * pad);
 static GstFlowReturn gst_basesink_chain (GstPad * pad, GstBuffer * buffer);
 static gboolean gst_basesink_activate (GstPad * pad, GstActivateMode mode);
 static gboolean gst_basesink_event (GstPad * pad, GstEvent * event);
 static inline GstFlowReturn gst_basesink_handle_buffer (GstBaseSink * basesink,
     GstBuffer * buf);
+static inline gboolean gst_basesink_handle_event (GstBaseSink * basesink,
+    GstEvent * event);
 
 static void
 gst_basesink_base_init (gpointer g_class)
@@ -304,15 +298,18 @@ gst_basesink_set_property (GObject * object, guint prop_id,
 
   sink = GST_BASESINK (object);
 
-  GST_LOCK (sink);
   switch (prop_id) {
     case PROP_HAS_LOOP:
+      GST_LOCK (sink);
       sink->has_loop = g_value_get_boolean (value);
       gst_basesink_set_all_pad_functions (sink);
+      GST_UNLOCK (sink);
       break;
     case PROP_HAS_CHAIN:
+      GST_LOCK (sink);
       sink->has_chain = g_value_get_boolean (value);
       gst_basesink_set_all_pad_functions (sink);
+      GST_UNLOCK (sink);
       break;
     case PROP_PREROLL_QUEUE_LEN:
       /* preroll lock necessary to serialize with finish_preroll */
@@ -324,7 +321,6 @@ gst_basesink_set_property (GObject * object, guint prop_id,
       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
       break;
   }
-  GST_UNLOCK (sink);
 }
 
 static void
@@ -374,59 +370,35 @@ gst_base_sink_buffer_alloc (GstBaseSink * sink, guint64 offset, guint size,
 }
 
 /* with PREROLL_LOCK */
-static void
-gst_basesink_preroll_queue_push (GstBaseSink * basesink, GstPad * pad,
-    GstBuffer * buffer)
-{
-  /* if we don't have a buffer we just start blocking */
-  if (buffer == NULL)
-    goto block;
-
-  /* first buffer */
-  if (basesink->preroll_queue->length == 0) {
-    GstBaseSinkClass *bclass = GST_BASESINK_GET_CLASS (basesink);
-
-    GST_DEBUG ("preroll buffer with TS: %" GST_TIME_FORMAT,
-        GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (buffer)));
-    if (bclass->preroll)
-      bclass->preroll (basesink, buffer);
-  }
-
-  if (basesink->preroll_queue->length < basesink->preroll_queue_max_len) {
-    DEBUG ("push %p %p\n", basesink, buffer);
-    g_queue_push_tail (basesink->preroll_queue, buffer);
-    return;
-  }
-
-block:
-  /* block until the state changes, or we get a flush, or something */
-  DEBUG ("block %p %p\n", basesink, buffer);
-  GST_DEBUG ("element %s waiting to finish preroll",
-      GST_ELEMENT_NAME (basesink));
-  basesink->need_preroll = FALSE;
-  basesink->have_preroll = TRUE;
-  GST_PREROLL_WAIT (pad);
-  GST_DEBUG ("done preroll");
-  basesink->have_preroll = FALSE;
-}
-
-/* with PREROLL_LOCK */
 static GstFlowReturn
 gst_basesink_preroll_queue_empty (GstBaseSink * basesink, GstPad * pad)
 {
-  GstBuffer *buf;
+  GstMiniObject *obj;
   GQueue *q = basesink->preroll_queue;
   GstFlowReturn ret;
 
   ret = GST_FLOW_OK;
 
   if (q) {
-    DEBUG ("empty queue\n");
-    while ((buf = g_queue_pop_head (q))) {
-      DEBUG ("pop %p\n", buf);
-      ret = gst_basesink_handle_buffer (basesink, buf);
+    GST_DEBUG ("emptying queue");
+    while ((obj = g_queue_pop_head (q))) {
+      /* we release the preroll lock while pushing so that we
+       * can still flush it while blocking on the clock or
+       * inside the element. */
+      GST_PREROLL_UNLOCK (pad);
+
+      if (GST_IS_BUFFER (obj)) {
+        GST_DEBUG ("poped buffer %p", obj);
+        ret = gst_basesink_handle_buffer (basesink, GST_BUFFER (obj));
+      } else {
+        GST_DEBUG ("poped event %p", obj);
+        gst_basesink_handle_event (basesink, GST_EVENT (obj));
+        ret = GST_FLOW_OK;
+      }
+
+      GST_PREROLL_LOCK (pad);
     }
-    DEBUG ("queue len %p %d\n", basesink, q->length);
+    GST_DEBUG ("queue empty");
   }
   return ret;
 }
@@ -435,83 +407,126 @@ gst_basesink_preroll_queue_empty (GstBaseSink * basesink, GstPad * pad)
 static void
 gst_basesink_preroll_queue_flush (GstBaseSink * basesink)
 {
-  GstBuffer *buf;
+  GstMiniObject *obj;
   GQueue *q = basesink->preroll_queue;
 
-  DEBUG ("flush %p\n", basesink);
+  GST_DEBUG ("flushing queue %p", basesink);
   if (q) {
-    while ((buf = g_queue_pop_head (q))) {
-      DEBUG ("pop %p\n", buf);
-      gst_buffer_unref (buf);
+    while ((obj = g_queue_pop_head (q))) {
+      GST_DEBUG ("poped %p", obj);
+      gst_mini_object_unref (obj);
     }
   }
+  /* we can't have EOS anymore now */
+  basesink->eos = FALSE;
 }
 
-typedef enum
-{
-  PREROLL_QUEUEING,
-  PREROLL_PLAYING,
-  PREROLL_FLUSHING,
-  PREROLL_ERROR
-} PrerollReturn;
-
 /* with STREAM_LOCK */
-PrerollReturn
-gst_basesink_finish_preroll (GstBaseSink * basesink, GstPad * pad,
-    GstBuffer * buffer)
+static GstFlowReturn
+gst_basesink_handle_object (GstBaseSink * basesink, GstPad * pad,
+    GstMiniObject * obj)
 {
-  gboolean flushing;
+  gint length;
+  gboolean have_event;
 
-  DEBUG ("finish preroll %p <\n", basesink);
-  /* lock order is important */
-  GST_STATE_LOCK (basesink);
   GST_PREROLL_LOCK (pad);
-  DEBUG ("finish preroll %p >\n", basesink);
+  /* push object on the queue */
+  GST_DEBUG ("push on queue %p %p", basesink, obj);
+  g_queue_push_tail (basesink->preroll_queue, obj);
+
+  have_event = GST_IS_EVENT (obj);
+
+  if (have_event && GST_EVENT_TYPE (obj) == GST_EVENT_EOS) {
+    basesink->eos = TRUE;
+  }
+
+  /* check if we are prerolling */
   if (!basesink->need_preroll)
     goto no_preroll;
 
+  length = basesink->preroll_queue->length;
+  /* this is the first object we queued */
+  if (length == 1) {
+    GST_DEBUG ("do preroll %p", obj);
+
+    /* if it's a buffer, we need to call the preroll method */
+    if (GST_IS_BUFFER (obj)) {
+      GstBaseSinkClass *bclass;
+
+      bclass = GST_BASESINK_GET_CLASS (basesink);
+      if (bclass->preroll)
+        bclass->preroll (basesink, GST_BUFFER (obj));
+    }
+  }
+  /* we are prerolling */
+  GST_DEBUG ("finish preroll %p >", basesink);
+  GST_PREROLL_UNLOCK (pad);
+
+  /* have to release STREAM_LOCK as we cannot take the STATE_LOCK
+   * inside the STREAM_LOCK */
+  GST_STREAM_UNLOCK (pad);
+
+  /* now we commit our state */
+  GST_STATE_LOCK (basesink);
+  GST_DEBUG ("commit state %p >", basesink);
   gst_element_commit_state (GST_ELEMENT (basesink));
   GST_STATE_UNLOCK (basesink);
 
-  gst_basesink_preroll_queue_push (basesink, pad, buffer);
+  /* reacquire stream lock, pad could be flushing now */
+  GST_STREAM_LOCK (pad);
 
   GST_LOCK (pad);
-  flushing = GST_PAD_IS_FLUSHING (pad);
-  GST_UNLOCK (pad);
-  if (flushing)
+  if (G_UNLIKELY (GST_PAD_IS_FLUSHING (pad)))
     goto flushing;
+  GST_UNLOCK (pad);
 
-  if (basesink->need_preroll)
-    goto still_queueing;
-
-  GST_DEBUG ("done preroll");
-
-  gst_basesink_preroll_queue_empty (basesink, pad);
+  /* and wait if needed */
+  GST_PREROLL_LOCK (pad);
+  /* it is possible that the application set the state to PLAYING
+   * now in which case we don't need to block anymore. */
+  if (!basesink->need_preroll)
+    goto no_preroll;
 
+  length = basesink->preroll_queue->length;
+  GST_DEBUG ("prerolled length %d", length);
+  /* see if we need to block now. We cannot block on events, only
+   * on buffers, the reason is that events can be sent from the
+   * application thread and we don't want to block there. */
+  if (length > basesink->preroll_queue_max_len && !have_event) {
+    /* block until the state changes, or we get a flush, or something */
+    GST_DEBUG ("element %s waiting to finish preroll",
+        GST_ELEMENT_NAME (basesink));
+    basesink->have_preroll = TRUE;
+    GST_PREROLL_WAIT (pad);
+    GST_DEBUG ("done preroll");
+    basesink->have_preroll = FALSE;
+  }
   GST_PREROLL_UNLOCK (pad);
 
-  return PREROLL_PLAYING;
+  GST_LOCK (pad);
+  if (G_UNLIKELY (GST_PAD_IS_FLUSHING (pad)))
+    goto flushing;
+  GST_UNLOCK (pad);
+
+  return GST_FLOW_OK;
 
 no_preroll:
   {
+    GstFlowReturn ret;
+
+    GST_DEBUG ("no preroll needed");
     /* maybe it was another sink that blocked in preroll, need to check for
        buffers to drain */
-    if (basesink->preroll_queue->length)
-      gst_basesink_preroll_queue_empty (basesink, pad);
+    ret = gst_basesink_preroll_queue_empty (basesink, pad);
     GST_PREROLL_UNLOCK (pad);
-    GST_STATE_UNLOCK (basesink);
-    return PREROLL_PLAYING;
+
+    return ret;
   }
 flushing:
   {
+    GST_UNLOCK (pad);
     GST_DEBUG ("pad is flushing");
-    GST_PREROLL_UNLOCK (pad);
-    return PREROLL_FLUSHING;
-  }
-still_queueing:
-  {
-    GST_PREROLL_UNLOCK (pad);
-    return PREROLL_QUEUEING;
+    return GST_FLOW_WRONG_STATE;
   }
 }
 
@@ -526,83 +541,66 @@ gst_basesink_event (GstPad * pad, GstEvent * event)
 
   bclass = GST_BASESINK_GET_CLASS (basesink);
 
-  DEBUG ("event %p\n", basesink);
-
-  if (bclass->event)
-    bclass->event (basesink, event);
+  GST_DEBUG ("event %p", event);
 
   switch (GST_EVENT_TYPE (event)) {
     case GST_EVENT_EOS:
     {
-      PrerollReturn pre_ret;
-      gboolean need_eos;
+      GstFlowReturn ret;
 
       GST_STREAM_LOCK (pad);
-
       /* EOS also finishes the preroll */
-      pre_ret = gst_basesink_finish_preroll (basesink, pad, NULL);
-
-      if (pre_ret == PREROLL_PLAYING) {
-        GST_LOCK (basesink);
-        need_eos = basesink->eos = TRUE;
-        if (basesink->clock) {
-          /* wait for last buffer to finish if we have a valid end time */
-          if (GST_CLOCK_TIME_IS_VALID (basesink->end_time)) {
-            basesink->clock_id = gst_clock_new_single_shot_id (basesink->clock,
-                basesink->end_time + GST_ELEMENT (basesink)->base_time);
-            GST_UNLOCK (basesink);
-
-            gst_clock_id_wait (basesink->clock_id, NULL);
-
-            GST_LOCK (basesink);
-            if (basesink->clock_id) {
-              gst_clock_id_unref (basesink->clock_id);
-              basesink->clock_id = NULL;
-            }
-            basesink->end_time = GST_CLOCK_TIME_NONE;
-            need_eos = basesink->eos;
-          }
-          GST_UNLOCK (basesink);
-
-          /* if we are still EOS, we can post the EOS message */
-          if (need_eos) {
-            /* ok, now we can post the message */
-            gst_element_post_message (GST_ELEMENT (basesink),
-                gst_message_new_eos (GST_OBJECT (basesink)));
-          }
-        }
-      }
-
+      ret = gst_basesink_handle_object (basesink, pad, GST_MINI_OBJECT (event));
       GST_STREAM_UNLOCK (pad);
       break;
     }
     case GST_EVENT_DISCONTINUOUS:
+    {
+      GstFlowReturn ret;
+
       GST_STREAM_LOCK (pad);
       if (basesink->clock) {
         //gint64 value = GST_EVENT_DISCONT_OFFSET (event, 0).value;
       }
+      ret = gst_basesink_handle_object (basesink, pad, GST_MINI_OBJECT (event));
       GST_STREAM_UNLOCK (pad);
       break;
+    }
     case GST_EVENT_FLUSH:
       /* make sure we are not blocked on the clock also clear any pending
        * eos state. */
+      if (bclass->event)
+        bclass->event (basesink, event);
+
       if (!GST_EVENT_FLUSH_DONE (event)) {
+        GST_PREROLL_LOCK (pad);
+        /* we need preroll after the flush */
+        basesink->need_preroll = TRUE;
+        gst_basesink_preroll_queue_flush (basesink);
+        /* unlock from a possible state change/preroll */
+        GST_PREROLL_SIGNAL (pad);
+
         GST_LOCK (basesink);
-        basesink->eos = FALSE;
         if (basesink->clock_id) {
           gst_clock_id_unschedule (basesink->clock_id);
         }
         GST_UNLOCK (basesink);
-
-        /* unlock from a possible state change/preroll */
-        GST_PREROLL_LOCK (pad);
-        basesink->need_preroll = TRUE;
-        gst_basesink_preroll_queue_flush (basesink);
-        GST_PREROLL_SIGNAL (pad);
         GST_PREROLL_UNLOCK (pad);
+
+        /* and we need to commit our state again on the next
+         * prerolled buffer */
+        GST_STATE_LOCK (basesink);
+        GST_STREAM_LOCK (pad);
+        gst_element_lost_state (GST_ELEMENT (basesink));
+        GST_STREAM_UNLOCK (pad);
+        GST_STATE_UNLOCK (basesink);
+      } else {
+        /* now we are completely unblocked and the _chain method
+         * will return */
+        GST_STREAM_LOCK (pad);
+        GST_STREAM_UNLOCK (pad);
       }
-      /* now we are completely unblocked and the _chain method
-       * will return */
+
       break;
     default:
       result = gst_pad_event_default (pad, event);
@@ -640,11 +638,12 @@ gst_basesink_get_times (GstBaseSink * basesink, GstBuffer * buffer,
  * 4) wait on the clock, this blocks
  * 5) unref the clockid again
  */
-static void
+static gboolean
 gst_basesink_do_sync (GstBaseSink * basesink, GstBuffer * buffer)
 {
+  gboolean result = TRUE;
+
   if (basesink->clock) {
-    GstClockReturn ret;
     GstClockTime start, end;
     GstBaseSinkClass *bclass;
 
@@ -657,6 +656,8 @@ gst_basesink_do_sync (GstBaseSink * basesink, GstBuffer * buffer)
         ", end: %" GST_TIME_FORMAT, GST_TIME_ARGS (start), GST_TIME_ARGS (end));
 
     if (GST_CLOCK_TIME_IS_VALID (start)) {
+      GstClockReturn ret;
+
       /* save clock id so that we can unlock it if needed */
       GST_LOCK (basesink);
       basesink->clock_id = gst_clock_new_single_shot_id (basesink->clock,
@@ -671,13 +672,79 @@ gst_basesink_do_sync (GstBaseSink * basesink, GstBuffer * buffer)
         gst_clock_id_unref (basesink->clock_id);
         basesink->clock_id = NULL;
       }
-      /* FIXME, don't mess with end_time here */
-      basesink->end_time = GST_CLOCK_TIME_NONE;
       GST_UNLOCK (basesink);
 
       GST_LOG_OBJECT (basesink, "clock entry done: %d", ret);
+      if (ret == GST_CLOCK_UNSCHEDULED)
+        result = FALSE;
     }
   }
+  return result;
+}
+
+
+/* handle an event
+ *
+ * 2) render the event
+ * 3) unref the event
+ */
+static inline gboolean
+gst_basesink_handle_event (GstBaseSink * basesink, GstEvent * event)
+{
+  GstBaseSinkClass *bclass;
+  gboolean ret;
+
+  switch (GST_EVENT_TYPE (event)) {
+    case GST_EVENT_EOS:
+      GST_LOCK (basesink);
+      if (basesink->clock) {
+        /* wait for last buffer to finish if we have a valid end time */
+        if (GST_CLOCK_TIME_IS_VALID (basesink->end_time)) {
+          basesink->clock_id = gst_clock_new_single_shot_id (basesink->clock,
+              basesink->end_time + GST_ELEMENT (basesink)->base_time);
+          GST_UNLOCK (basesink);
+
+          gst_clock_id_wait (basesink->clock_id, NULL);
+
+          GST_LOCK (basesink);
+          if (basesink->clock_id) {
+            gst_clock_id_unref (basesink->clock_id);
+            basesink->clock_id = NULL;
+          }
+          basesink->end_time = GST_CLOCK_TIME_NONE;
+        }
+      }
+      GST_UNLOCK (basesink);
+      break;
+    default:
+      break;
+  }
+
+  bclass = GST_BASESINK_GET_CLASS (basesink);
+  if (bclass->event)
+    ret = bclass->event (basesink, event);
+  else
+    ret = TRUE;
+
+  switch (GST_EVENT_TYPE (event)) {
+    case GST_EVENT_EOS:
+      GST_PREROLL_LOCK (basesink->sinkpad);
+      /* if we are still EOS, we can post the EOS message */
+      if (basesink->eos) {
+        /* ok, now we can post the message */
+        gst_element_post_message (GST_ELEMENT (basesink),
+            gst_message_new_eos (GST_OBJECT (basesink)));
+      }
+      GST_PREROLL_UNLOCK (basesink->sinkpad);
+      break;
+    default:
+      break;
+  }
+
+  GST_DEBUG ("event unref %p %p", basesink, event);
+  gst_event_unref (event);
+
+  return ret;
 }
 
 /* handle a buffer
@@ -700,46 +767,21 @@ gst_basesink_handle_buffer (GstBaseSink * basesink, GstBuffer * buf)
   else
     ret = GST_FLOW_OK;
 
-  DEBUG ("unref %p %p\n", basesink, buf);
+  GST_DEBUG ("buffer unref after render %p", basesink, buf);
   gst_buffer_unref (buf);
 
   return ret;
 }
 
 static GstFlowReturn
-gst_basesink_chain_unlocked (GstPad * pad, GstBuffer * buf)
+gst_basesink_chain (GstPad * pad, GstBuffer * buf)
 {
   GstBaseSink *basesink;
-  PrerollReturn result;
+  GstFlowReturn result;
 
   basesink = GST_BASESINK (GST_OBJECT_PARENT (pad));
 
-  DEBUG ("chain_unlocked %p\n", basesink);
-
-  result = gst_basesink_finish_preroll (basesink, pad, buf);
-
-  DEBUG ("chain_unlocked %p after, result %d\n", basesink, result);
-
-  switch (result) {
-    case PREROLL_QUEUEING:
-      return GST_FLOW_OK;
-    case PREROLL_PLAYING:
-      return gst_basesink_handle_buffer (basesink, buf);
-    case PREROLL_FLUSHING:
-      gst_buffer_unref (buf);
-      return GST_FLOW_UNEXPECTED;
-    default:
-      g_assert_not_reached ();
-      return GST_FLOW_ERROR;
-  }
-}
-
-static GstFlowReturn
-gst_basesink_chain (GstPad * pad, GstBuffer * buf)
-{
-  GstFlowReturn result;
-
-  result = gst_basesink_chain_unlocked (pad, buf);
+  result = gst_basesink_handle_object (basesink, pad, GST_MINI_OBJECT (buf));
 
   return result;
 }
@@ -761,7 +803,7 @@ gst_basesink_loop (GstPad * pad)
   if (result != GST_FLOW_OK)
     goto paused;
 
-  result = gst_basesink_chain_unlocked (pad, buf);
+  result = gst_basesink_chain (pad, buf);
   if (result != GST_FLOW_OK)
     goto paused;
 
@@ -797,6 +839,7 @@ gst_basesink_activate (GstPad * pad, GstActivateMode mode)
       break;
     case GST_ACTIVATE_NONE:
       /* step 1, unblock clock sync (if any) or any other blocking thing */
+      GST_PREROLL_LOCK (pad);
       GST_LOCK (basesink);
       if (basesink->clock_id) {
         gst_clock_id_unschedule (basesink->clock_id);
@@ -807,8 +850,9 @@ gst_basesink_activate (GstPad * pad, GstActivateMode mode)
       if (bclass->unlock)
         bclass->unlock (basesink);
 
-      /* unlock preroll */
-      GST_PREROLL_LOCK (pad);
+      /* flush out the data thread if it's locked in finish_preroll */
+      gst_basesink_preroll_queue_flush (basesink);
+      basesink->need_preroll = FALSE;
       GST_PREROLL_SIGNAL (pad);
       GST_PREROLL_UNLOCK (pad);
 
@@ -828,8 +872,6 @@ gst_basesink_change_state (GstElement * element)
   GstBaseSink *basesink = GST_BASESINK (element);
   GstElementState transition = GST_STATE_TRANSITION (element);
 
-  DEBUG ("state change > %p %x\n", basesink, transition);
-
   switch (transition) {
     case GST_STATE_NULL_TO_READY:
       break;
@@ -838,25 +880,31 @@ gst_basesink_change_state (GstElement * element)
        * is no data flow in READY so we can safely assume we need to preroll. */
       basesink->offset = 0;
       GST_PREROLL_LOCK (basesink->sinkpad);
-      basesink->need_preroll = TRUE;
       basesink->have_preroll = FALSE;
+      basesink->need_preroll = TRUE;
       GST_PREROLL_UNLOCK (basesink->sinkpad);
       ret = GST_STATE_ASYNC;
       break;
     case GST_STATE_PAUSED_TO_PLAYING:
+    {
       GST_PREROLL_LOCK (basesink->sinkpad);
+      /* if we have EOS, we should empty the queue now as there will
+       * be no more data received in the chain function.
+       * FIXME, this could block the state change function too long when
+       * we are pushing and syncing the buffers, better start a new
+       * thread to do this. */
+      if (basesink->eos) {
+        gst_basesink_preroll_queue_empty (basesink, basesink->sinkpad);
+      }
+      /* don't need the preroll anymore */
+      basesink->need_preroll = FALSE;
       if (basesink->have_preroll) {
         /* now let it play */
         GST_PREROLL_SIGNAL (basesink->sinkpad);
-      } else {
-        /* FIXME. We do not have a preroll and we don't need it anymore 
-         * now, this is a case we want to avoid. One way would be to make
-         * a 'lost state' function that makes get_state return PAUSED with
-         * ASYNC to indicate that we are prerolling again. */
-        basesink->need_preroll = FALSE;
       }
       GST_PREROLL_UNLOCK (basesink->sinkpad);
       break;
+    }
     default:
       break;
   }
@@ -866,20 +914,25 @@ gst_basesink_change_state (GstElement * element)
   switch (transition) {
     case GST_STATE_PLAYING_TO_PAUSED:
     {
-      gboolean eos;
+      GstBaseSinkClass *bclass;
 
-      /* unlock clock wait if any */
+      bclass = GST_BASESINK_GET_CLASS (basesink);
+
+      GST_PREROLL_LOCK (basesink->sinkpad);
       GST_LOCK (basesink);
+      /* unlock clock wait if any */
       if (basesink->clock_id) {
         gst_clock_id_unschedule (basesink->clock_id);
       }
-      eos = basesink->eos;
       GST_UNLOCK (basesink);
 
-      GST_PREROLL_LOCK (basesink->sinkpad);
+      /* unlock any subclasses */
+      if (bclass->unlock)
+        bclass->unlock (basesink);
+
       /* if we don't have a preroll buffer and we have not received EOS,
        * we need to wait for a preroll */
-      if (!basesink->have_preroll && !eos) {
+      if (!basesink->have_preroll && !basesink->eos) {
         basesink->need_preroll = TRUE;
         ret = GST_STATE_ASYNC;
       }
@@ -887,20 +940,6 @@ gst_basesink_change_state (GstElement * element)
       break;
     }
     case GST_STATE_PAUSED_TO_READY:
-      /* flush out the data thread if it's locked in finish_preroll */
-      GST_PREROLL_LOCK (basesink->sinkpad);
-
-      gst_basesink_preroll_queue_flush (basesink);
-
-      if (basesink->have_preroll)
-        GST_PREROLL_SIGNAL (basesink->sinkpad);
-
-      basesink->need_preroll = FALSE;
-      basesink->have_preroll = FALSE;
-      GST_PREROLL_UNLOCK (basesink->sinkpad);
-
-      /* clear EOS state */
-      basesink->eos = FALSE;
       break;
     case GST_STATE_READY_TO_NULL:
       break;
@@ -908,6 +947,5 @@ gst_basesink_change_state (GstElement * element)
       break;
   }
 
-  DEBUG ("state change < %p %x\n", basesink, transition);
   return ret;
 }