gst/: Implement gst_pad_pause/start/stop_task(), take STREAM lock in task function.
authorWim Taymans <wim.taymans@gmail.com>
Wed, 25 May 2005 11:50:11 +0000 (11:50 +0000)
committerWim Taymans <wim.taymans@gmail.com>
Wed, 25 May 2005 11:50:11 +0000 (11:50 +0000)
Original commit message from CVS:
* gst/base/gstadapter.c: (gst_adapter_peek), (gst_adapter_flush):
* gst/base/gstbasesink.c: (gst_basesink_preroll_queue_push),
(gst_basesink_finish_preroll), (gst_basesink_chain),
(gst_basesink_loop), (gst_basesink_activate),
(gst_basesink_change_state):
* gst/base/gstbasesrc.c: (gst_basesrc_do_seek),
(gst_basesrc_get_range), (gst_basesrc_loop),
(gst_basesrc_activate):
* gst/elements/gsttee.c: (gst_tee_sink_activate):
* gst/gstpad.c: (gst_pad_dispose), (gst_real_pad_class_init),
(gst_real_pad_init), (gst_real_pad_set_property),
(gst_real_pad_get_property), (gst_pad_set_active),
(gst_pad_is_active), (gst_pad_get_query_types), (gst_pad_unlink),
(gst_pad_link_prepare), (gst_pad_link), (gst_pad_get_real_parent),
(gst_real_pad_get_caps_unlocked), (gst_pad_peer_get_caps),
(gst_pad_accept_caps), (gst_pad_get_peer), (gst_pad_realize),
(gst_pad_event_default_dispatch), (gst_pad_event_default),
(gst_pad_dispatcher), (gst_pad_query), (gst_real_pad_dispose),
(gst_pad_save_thyself), (handle_pad_block), (gst_pad_chain),
(gst_pad_push), (gst_pad_get_range), (gst_pad_pull_range),
(gst_pad_send_event), (gst_pad_start_task), (gst_pad_pause_task),
(gst_pad_stop_task):
* gst/gstpad.h:
* gst/gstqueue.c: (gst_queue_handle_sink_event), (gst_queue_chain),
(gst_queue_loop), (gst_queue_src_activate):
* gst/gsttask.c: (gst_task_init), (gst_task_set_lock),
(gst_task_get_state):
* gst/gsttask.h:
* gst/schedulers/threadscheduler.c:
(gst_thread_scheduler_task_start), (gst_thread_scheduler_func):
Implement gst_pad_pause/start/stop_task(), take STREAM lock
in task function.
Remove ACTIVE pad flag, use FLUSHING everywhere
Added _pad_chain(), _pad_get_range() to call chain/getrange
functions.
Add locks around IS_FLUSHING when reading.
Take STREAM lock in chain(), get_range() functions so plugins
don't need to take it anymore.

16 files changed:
ChangeLog
gst/base/gstadapter.c
gst/base/gstbasesink.c
gst/base/gstbasesrc.c
gst/elements/gsttee.c
gst/gstpad.c
gst/gstpad.h
gst/gstqueue.c
gst/gsttask.c
gst/gsttask.h
gst/schedulers/threadscheduler.c
libs/gst/base/gstadapter.c
libs/gst/base/gstbasesink.c
libs/gst/base/gstbasesrc.c
plugins/elements/gstqueue.c
plugins/elements/gsttee.c

index 9196257..34b566c 100644 (file)
--- a/ChangeLog
+++ b/ChangeLog
@@ -1,5 +1,48 @@
 2005-05-25  Wim Taymans  <wim@fluendo.com>
 
+       * gst/base/gstadapter.c: (gst_adapter_peek), (gst_adapter_flush):
+       * gst/base/gstbasesink.c: (gst_basesink_preroll_queue_push),
+       (gst_basesink_finish_preroll), (gst_basesink_chain),
+       (gst_basesink_loop), (gst_basesink_activate),
+       (gst_basesink_change_state):
+       * gst/base/gstbasesrc.c: (gst_basesrc_do_seek),
+       (gst_basesrc_get_range), (gst_basesrc_loop),
+       (gst_basesrc_activate):
+       * gst/elements/gsttee.c: (gst_tee_sink_activate):
+       * gst/gstpad.c: (gst_pad_dispose), (gst_real_pad_class_init),
+       (gst_real_pad_init), (gst_real_pad_set_property),
+       (gst_real_pad_get_property), (gst_pad_set_active),
+       (gst_pad_is_active), (gst_pad_get_query_types), (gst_pad_unlink),
+       (gst_pad_link_prepare), (gst_pad_link), (gst_pad_get_real_parent),
+       (gst_real_pad_get_caps_unlocked), (gst_pad_peer_get_caps),
+       (gst_pad_accept_caps), (gst_pad_get_peer), (gst_pad_realize),
+       (gst_pad_event_default_dispatch), (gst_pad_event_default),
+       (gst_pad_dispatcher), (gst_pad_query), (gst_real_pad_dispose),
+       (gst_pad_save_thyself), (handle_pad_block), (gst_pad_chain),
+       (gst_pad_push), (gst_pad_get_range), (gst_pad_pull_range),
+       (gst_pad_send_event), (gst_pad_start_task), (gst_pad_pause_task),
+       (gst_pad_stop_task):
+       * gst/gstpad.h:
+       * gst/gstqueue.c: (gst_queue_handle_sink_event), (gst_queue_chain),
+       (gst_queue_loop), (gst_queue_src_activate):
+       * gst/gsttask.c: (gst_task_init), (gst_task_set_lock),
+       (gst_task_get_state):
+       * gst/gsttask.h:
+       * gst/schedulers/threadscheduler.c:
+       (gst_thread_scheduler_task_start), (gst_thread_scheduler_func):
+       Implement gst_pad_pause/start/stop_task(), take STREAM lock
+       in task function.
+       Remove ACTIVE pad flag, use FLUSHING everywhere
+       Added _pad_chain(), _pad_get_range() to call chain/getrange 
+       functions.
+       Add locks around IS_FLUSHING when reading.
+       Take STREAM lock in chain(), get_range() functions so plugins
+       don't need to take it anymore.
+       
+
+
+2005-05-25  Wim Taymans  <wim@fluendo.com>
+
        * tools/gst-launch.c: (event_loop):
        Unref message after using its contents instead of
        before.
index 993c78c..09239d0 100644 (file)
@@ -160,7 +160,7 @@ gst_adapter_peek (GstAdapter * adapter, guint size)
 
   if (adapter->assembled_size < size) {
     adapter->assembled_size = (size / DEFAULT_SIZE + 1) * DEFAULT_SIZE;
-    GST_DEBUG_OBJECT (adapter, "setting size of internal buffer to %u\n",
+    GST_DEBUG_OBJECT (adapter, "setting size of internal buffer to %u",
         adapter->assembled_size);
     adapter->assembled_data =
         g_realloc (adapter->assembled_data, adapter->assembled_size);
@@ -198,7 +198,7 @@ gst_adapter_flush (GstAdapter * adapter, guint flush)
   g_return_if_fail (flush > 0);
   g_return_if_fail (flush <= adapter->size);
 
-  GST_LOG_OBJECT (adapter, "flushing %u bytes\n", flush);
+  GST_LOG_OBJECT (adapter, "flushing %u bytes", flush);
   adapter->size -= flush;
   adapter->assembled_len = 0;
   while (flush > 0) {
index e677f0e..b8604c8 100644 (file)
@@ -378,6 +378,8 @@ gst_basesink_preroll_queue_push (GstBaseSink * basesink, GstPad * pad,
   if (basesink->preroll_queue->length == 0) {
     GstBaseSinkClass *bclass = GST_BASESINK_GET_CLASS (basesink);
 
+    GST_DEBUG ("preroll buffer with TS: %" GST_TIME_FORMAT,
+        GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (buffer)));
     if (bclass->preroll)
       bclass->preroll (basesink, buffer);
   }
@@ -448,7 +450,7 @@ PrerollReturn
 gst_basesink_finish_preroll (GstBaseSink * basesink, GstPad * pad,
     GstBuffer * buffer)
 {
-  gboolean usable;
+  gboolean flushing;
 
   DEBUG ("finish preroll %p <\n", basesink);
   /* lock order is important */
@@ -461,13 +463,13 @@ gst_basesink_finish_preroll (GstBaseSink * basesink, GstPad * pad,
   gst_element_commit_state (GST_ELEMENT (basesink));
   GST_STATE_UNLOCK (basesink);
 
+  gst_basesink_preroll_queue_push (basesink, pad, buffer);
+
   GST_LOCK (pad);
-  usable = !GST_RPAD_IS_FLUSHING (pad) && GST_RPAD_IS_ACTIVE (pad);
+  flushing = GST_RPAD_IS_FLUSHING (pad);
   GST_UNLOCK (pad);
-  if (!usable)
-    goto unusable;
-
-  gst_basesink_preroll_queue_push (basesink, pad, buffer);
+  if (flushing)
+    goto flushing;
 
   if (basesink->need_preroll)
     goto still_queueing;
@@ -490,7 +492,7 @@ no_preroll:
     GST_STATE_UNLOCK (basesink);
     return PREROLL_PLAYING;
   }
-unusable:
+flushing:
   {
     GST_DEBUG ("pad is flushing");
     GST_PREROLL_UNLOCK (pad);
@@ -726,12 +728,8 @@ gst_basesink_chain (GstPad * pad, GstBuffer * buf)
   g_assert (GST_BASESINK (GST_OBJECT_PARENT (pad))->pad_mode ==
       GST_ACTIVATE_PUSH);
 
-  GST_STREAM_LOCK (pad);
-
   result = gst_basesink_chain_unlocked (pad, buf);
 
-  GST_STREAM_UNLOCK (pad);
-
   return result;
 }
 
@@ -748,8 +746,6 @@ gst_basesink_loop (GstPad * pad)
 
   g_assert (basesink->pad_mode == GST_ACTIVATE_PULL);
 
-  GST_STREAM_LOCK (pad);
-
   result = gst_pad_pull_range (pad, basesink->offset, DEFAULT_SIZE, &buf);
   if (result != GST_FLOW_OK)
     goto paused;
@@ -759,12 +755,10 @@ gst_basesink_loop (GstPad * pad)
     goto paused;
 
   /* default */
-  GST_STREAM_UNLOCK (pad);
   return;
 
 paused:
-  gst_task_pause (GST_RPAD_TASK (pad));
-  GST_STREAM_UNLOCK (pad);
+  gst_pad_pause_task (pad);
   return;
 }
 
@@ -787,16 +781,8 @@ gst_basesink_activate (GstPad * pad, GstActivateMode mode)
       /* if we have a scheduler we can start the task */
       g_return_val_if_fail (basesink->has_loop, FALSE);
       gst_pad_peer_set_active (pad, mode);
-      if (GST_ELEMENT_SCHEDULER (basesink)) {
-        GST_STREAM_LOCK (pad);
-        GST_RPAD_TASK (pad) =
-            gst_scheduler_create_task (GST_ELEMENT_SCHEDULER (basesink),
-            (GstTaskFunction) gst_basesink_loop, pad);
-
-        gst_task_start (GST_RPAD_TASK (pad));
-        GST_STREAM_UNLOCK (pad);
-        result = TRUE;
-      }
+      result =
+          gst_pad_start_task (pad, (GstTaskFunction) gst_basesink_loop, pad);
       break;
     case GST_ACTIVATE_NONE:
       /* step 1, unblock clock sync (if any) or any other blocking thing */
@@ -816,16 +802,7 @@ gst_basesink_activate (GstPad * pad, GstActivateMode mode)
       GST_PREROLL_UNLOCK (pad);
 
       /* step 2, make sure streaming finishes */
-      GST_STREAM_LOCK (pad);
-      /* step 3, stop the task */
-      if (GST_RPAD_TASK (pad)) {
-        gst_task_stop (GST_RPAD_TASK (pad));
-        gst_object_unref (GST_OBJECT (GST_RPAD_TASK (pad)));
-        GST_RPAD_TASK (pad) = NULL;
-      }
-      GST_STREAM_UNLOCK (pad);
-
-      result = TRUE;
+      result = gst_pad_stop_task (pad);
       break;
   }
   basesink->pad_mode = mode;
@@ -911,9 +888,6 @@ gst_basesink_change_state (GstElement * element)
       basesink->have_preroll = FALSE;
       GST_PREROLL_UNLOCK (basesink->sinkpad);
 
-      /* make sure the element is finished processing */
-      GST_STREAM_LOCK (basesink->sinkpad);
-      GST_STREAM_UNLOCK (basesink->sinkpad);
       /* clear EOS state */
       basesink->eos = FALSE;
       break;
index 43ddf5a..5ac5adb 100644 (file)
@@ -333,9 +333,8 @@ gst_basesrc_do_seek (GstBaseSrc * src, GstEvent * event)
   }
 
   /* and restart the task */
-  if (GST_RPAD_TASK (src->srcpad)) {
-    gst_task_start (GST_RPAD_TASK (src->srcpad));
-  }
+  gst_pad_start_task (src->srcpad, (GstTaskFunction) gst_basesrc_loop,
+      src->srcpad);
   GST_STREAM_UNLOCK (src->srcpad);
 
   gst_event_unref (event);
@@ -447,7 +446,7 @@ gst_basesrc_get_property (GObject * object, guint prop_id, GValue * value,
 }
 
 static GstFlowReturn
-gst_basesrc_get_range_unlocked (GstPad * pad, guint64 offset, guint length,
+gst_basesrc_get_range (GstPad * pad, guint64 offset, guint length,
     GstBuffer ** buf)
 {
   GstFlowReturn ret;
@@ -499,21 +498,6 @@ unexpected_length:
   }
 }
 
-static GstFlowReturn
-gst_basesrc_get_range (GstPad * pad, guint64 offset, guint length,
-    GstBuffer ** ret)
-{
-  GstFlowReturn fret;
-
-  GST_STREAM_LOCK (pad);
-
-  fret = gst_basesrc_get_range_unlocked (pad, offset, length, ret);
-
-  GST_STREAM_UNLOCK (pad);
-
-  return fret;
-}
-
 static gboolean
 gst_basesrc_check_get_range (GstPad * pad)
 {
@@ -538,9 +522,7 @@ gst_basesrc_loop (GstPad * pad)
 
   src = GST_BASESRC (GST_OBJECT_PARENT (pad));
 
-  GST_STREAM_LOCK (pad);
-
-  ret = gst_basesrc_get_range_unlocked (pad, src->offset, src->blocksize, &buf);
+  ret = gst_basesrc_get_range (pad, src->offset, src->blocksize, &buf);
   if (ret != GST_FLOW_OK)
     goto eos;
 
@@ -550,22 +532,19 @@ gst_basesrc_loop (GstPad * pad)
   if (ret != GST_FLOW_OK)
     goto pause;
 
-  GST_STREAM_UNLOCK (pad);
   return;
 
 eos:
   {
     GST_DEBUG_OBJECT (src, "going to EOS");
-    gst_task_pause (GST_RPAD_TASK (pad));
+    gst_pad_pause_task (pad);
     gst_pad_push_event (pad, gst_event_new (GST_EVENT_EOS));
-    GST_STREAM_UNLOCK (pad);
     return;
   }
 pause:
   {
     GST_DEBUG_OBJECT (src, "pausing task");
-    gst_task_pause (GST_RPAD_TASK (pad));
-    GST_STREAM_UNLOCK (pad);
+    gst_pad_pause_task (pad);
     return;
   }
 }
@@ -733,17 +712,8 @@ gst_basesrc_activate (GstPad * pad, GstActivateMode mode)
   result = FALSE;
   switch (mode) {
     case GST_ACTIVATE_PUSH:
-      /* if we have a scheduler we can start the task */
-      if (GST_ELEMENT_SCHEDULER (basesrc)) {
-        GST_STREAM_LOCK (pad);
-        GST_RPAD_TASK (pad) =
-            gst_scheduler_create_task (GST_ELEMENT_SCHEDULER (basesrc),
-            (GstTaskFunction) gst_basesrc_loop, pad);
-
-        gst_task_start (GST_RPAD_TASK (pad));
-        GST_STREAM_UNLOCK (pad);
-        result = TRUE;
-      }
+      result =
+          gst_pad_start_task (pad, (GstTaskFunction) gst_basesrc_loop, pad);
       break;
     case GST_ACTIVATE_PULL:
       result = TRUE;
@@ -753,16 +723,7 @@ gst_basesrc_activate (GstPad * pad, GstActivateMode mode)
       gst_basesrc_unlock (basesrc);
 
       /* step 2, make sure streaming finishes */
-      GST_STREAM_LOCK (pad);
-      /* step 3, stop the task */
-      if (GST_RPAD_TASK (pad)) {
-        gst_task_stop (GST_RPAD_TASK (pad));
-        gst_object_unref (GST_OBJECT (GST_RPAD_TASK (pad)));
-        GST_RPAD_TASK (pad) = NULL;
-      }
-      GST_STREAM_UNLOCK (pad);
-
-      result = TRUE;
+      result = gst_pad_stop_task (pad);
       break;
   }
   return result;
index cd7213c..9806606 100644 (file)
@@ -370,27 +370,10 @@ gst_tee_sink_activate (GstPad * pad, GstActivateMode mode)
       break;
     case GST_ACTIVATE_PULL:
       g_return_val_if_fail (tee->has_sink_loop, FALSE);
-      if (GST_ELEMENT_SCHEDULER (tee)) {
-        GST_STREAM_LOCK (pad);
-        GST_RPAD_TASK (pad) =
-            gst_scheduler_create_task (GST_ELEMENT_SCHEDULER (tee),
-            (GstTaskFunction) gst_tee_loop, pad);
-
-        gst_pad_start_task (pad);
-        GST_STREAM_UNLOCK (pad);
-        result = TRUE;
-      }
+      result = gst_pad_start_task (pad, (GstTaskFunction) gst_tee_loop, pad);
       break;
     case GST_ACTIVATE_NONE:
-      GST_STREAM_LOCK (pad);
-      if (GST_RPAD_TASK (pad)) {
-        gst_pad_stop_task (pad);
-        gst_object_unref (GST_OBJECT (GST_RPAD_TASK (pad)));
-        GST_RPAD_TASK (pad) = NULL;
-      }
-      GST_STREAM_UNLOCK (pad);
-
-      result = TRUE;
+      result = gst_pad_stop_task (pad);
       break;
   }
   tee->sink_mode = mode;
index 8379f43..b8d4cbb 100644 (file)
@@ -126,7 +126,7 @@ gst_pad_init (GstPad * pad)
 static void
 gst_pad_dispose (GObject * object)
 {
-  GstPad *pad = GST_PAD (object);
+  GstPad *pad = GST_PAD_CAST (object);
 
   gst_pad_set_pad_template (pad, NULL);
   /* FIXME, we have links to many other things like caps
@@ -152,8 +152,7 @@ enum
 {
   REAL_ARG_0,
   REAL_ARG_CAPS,
-  REAL_ARG_ACTIVE
-      /* FILL ME */
+  /* FILL ME */
 };
 
 static void gst_real_pad_class_init (GstRealPadClass * klass);
@@ -219,9 +218,6 @@ gst_real_pad_class_init (GstRealPadClass * klass)
       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRealPadClass, request_link), NULL,
       NULL, gst_marshal_VOID__OBJECT, G_TYPE_NONE, 0);
 
-  g_object_class_install_property (G_OBJECT_CLASS (klass), REAL_ARG_ACTIVE,
-      g_param_spec_boolean ("active", "Active", "Whether the pad is active.",
-          TRUE, G_PARAM_READWRITE));
   g_object_class_install_property (G_OBJECT_CLASS (klass), REAL_ARG_CAPS,
       g_param_spec_boxed ("caps", "Caps", "The capabilities of the pad",
           GST_TYPE_CAPS, G_PARAM_READABLE));
@@ -251,7 +247,7 @@ gst_real_pad_init (GstRealPad * pad)
   pad->queryfunc = gst_pad_query_default;
   pad->intlinkfunc = gst_pad_get_internal_links_default;
 
-  GST_FLAG_UNSET (pad, GST_PAD_ACTIVE);
+  GST_RPAD_UNSET_FLUSHING (pad);
 
   pad->preroll_lock = g_mutex_new ();
   pad->preroll_cond = g_cond_new ();
@@ -271,10 +267,6 @@ gst_real_pad_set_property (GObject * object, guint prop_id,
   g_return_if_fail (GST_IS_PAD (object));
 
   switch (prop_id) {
-    case REAL_ARG_ACTIVE:
-      g_warning ("FIXME: not useful any more!!!");
-      gst_pad_set_active (GST_PAD (object), g_value_get_boolean (value));
-      break;
     default:
       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
       break;
@@ -288,9 +280,6 @@ gst_real_pad_get_property (GObject * object, guint prop_id,
   g_return_if_fail (GST_IS_PAD (object));
 
   switch (prop_id) {
-    case REAL_ARG_ACTIVE:
-      g_value_set_boolean (value, GST_FLAG_IS_SET (object, GST_PAD_ACTIVE));
-      break;
     case REAL_ARG_CAPS:
       g_value_set_boxed (value, GST_PAD_CAPS (GST_REAL_PAD (object)));
       break;
@@ -451,7 +440,7 @@ gboolean
 gst_pad_set_active (GstPad * pad, GstActivateMode mode)
 {
   GstRealPad *realpad;
-  gboolean old;
+  GstActivateMode old;
   GstPadActivateFunction activatefunc;
   gboolean active;
 
@@ -460,17 +449,17 @@ gst_pad_set_active (GstPad * pad, GstActivateMode mode)
   GST_PAD_REALIZE_AND_LOCK (pad, realpad, lost_ghostpad);
 
   active = GST_PAD_MODE_ACTIVATE (mode);
-  old = GST_PAD_IS_ACTIVE (realpad);
+  old = GST_RPAD_ACTIVATE_MODE (realpad);
 
   /* if nothing changed, we can just exit */
-  if (G_UNLIKELY (old == active))
+  if (G_UNLIKELY (old == mode))
     goto was_ok;
 
   /* make sure data is disallowed when going inactive */
   if (!active) {
     GST_CAT_DEBUG (GST_CAT_PADS, "de-activating pad %s:%s",
         GST_DEBUG_PAD_NAME (realpad));
-    GST_FLAG_UNSET (realpad, GST_PAD_ACTIVE);
+    GST_RPAD_SET_FLUSHING (realpad);
     /* unlock blocked pads so element can resume and stop */
     GST_PAD_BLOCK_SIGNAL (realpad);
   }
@@ -510,16 +499,24 @@ gst_pad_set_active (GstPad * pad, GstActivateMode mode)
     GST_LOCK (realpad);
     if (result == FALSE)
       goto activate_error;
+
+    /* store the mode */
+    GST_RPAD_ACTIVATE_MODE (realpad) = mode;
   }
 
   /* when going to active allow data passing now */
   if (active) {
     GST_CAT_DEBUG (GST_CAT_PADS, "activating pad %s:%s in mode %d",
         GST_DEBUG_PAD_NAME (realpad), mode);
-    GST_FLAG_SET (realpad, GST_PAD_ACTIVE);
-  }
-  GST_UNLOCK (realpad);
+    GST_RPAD_UNSET_FLUSHING (realpad);
+    GST_UNLOCK (realpad);
+  } else {
+    GST_UNLOCK (realpad);
 
+    /* and make streaming finish */
+    GST_STREAM_LOCK (realpad);
+    GST_STREAM_UNLOCK (realpad);
+  }
   return TRUE;
 
 was_ok:
@@ -607,7 +604,7 @@ gst_pad_is_active (GstPad * pad)
   g_return_val_if_fail (GST_IS_PAD (pad), FALSE);
 
   GST_PAD_REALIZE_AND_LOCK (pad, realpad, lost_ghostpad);
-  result = GST_FLAG_IS_SET (realpad, GST_PAD_ACTIVE);
+  result = GST_PAD_MODE_ACTIVATE (GST_RPAD_ACTIVATE_MODE (realpad));
   GST_UNLOCK (realpad);
 
   return result;
@@ -933,7 +930,7 @@ gst_pad_get_query_types (GstPad * pad)
   if (G_UNLIKELY ((func = GST_RPAD_QUERYTYPEFUNC (rpad)) == NULL))
     goto no_func;
 
-  return func (GST_PAD (rpad));
+  return func (GST_PAD_CAST (rpad));
 
 no_func:
   {
@@ -1187,10 +1184,10 @@ gst_pad_unlink (GstPad * srcpad, GstPad * sinkpad)
     goto not_linked_together;
 
   if (GST_RPAD_UNLINKFUNC (realsrc)) {
-    GST_RPAD_UNLINKFUNC (realsrc) (GST_PAD (realsrc));
+    GST_RPAD_UNLINKFUNC (realsrc) (GST_PAD_CAST (realsrc));
   }
   if (GST_RPAD_UNLINKFUNC (realsink)) {
-    GST_RPAD_UNLINKFUNC (realsink) (GST_PAD (realsink));
+    GST_RPAD_UNLINKFUNC (realsink) (GST_PAD_CAST (realsink));
   }
 
   /* first clear peers */
@@ -1329,7 +1326,8 @@ gst_pad_link_prepare (GstPad * srcpad, GstPad * sinkpad,
   if (G_UNLIKELY (GST_RPAD_PEER (realsink) != NULL))
     goto sink_was_linked;
 
-  if ((GST_PAD (realsrc) != srcpad) || (GST_PAD (realsink) != sinkpad)) {
+  if ((GST_PAD_CAST (realsrc) != srcpad)
+      || (GST_PAD_CAST (realsink) != sinkpad)) {
     GST_CAT_INFO (GST_CAT_PADS, "*actually* linking %s:%s and %s:%s",
         GST_DEBUG_PAD_NAME (realsrc), GST_DEBUG_PAD_NAME (realsink));
   }
@@ -1427,12 +1425,14 @@ gst_pad_link (GstPad * srcpad, GstPad * sinkpad)
   if (GST_RPAD_LINKFUNC (realsrc)) {
     /* this one will call the peer link function */
     result =
-        GST_RPAD_LINKFUNC (realsrc) (GST_PAD (realsrc), GST_PAD (realsink));
+        GST_RPAD_LINKFUNC (realsrc) (GST_PAD_CAST (realsrc),
+        GST_PAD_CAST (realsink));
   } else if (GST_RPAD_LINKFUNC (realsink)) {
     /* if no source link function, we need to call the sink link
      * function ourselves. */
     result =
-        GST_RPAD_LINKFUNC (realsink) (GST_PAD (realsink), GST_PAD (realsrc));
+        GST_RPAD_LINKFUNC (realsink) (GST_PAD_CAST (realsink),
+        GST_PAD_CAST (realsrc));
   } else {
     result = GST_PAD_LINK_OK;
   }
@@ -1530,7 +1530,7 @@ gst_pad_get_real_parent (GstPad * pad)
   GST_PAD_REALIZE_AND_LOCK (pad, realpad, lost_ghostpad);
   element = GST_PAD_PARENT (realpad);
   if (element)
-    gst_object_ref (GST_OBJECT (element));
+    gst_object_ref (GST_OBJECT_CAST (element));
   GST_UNLOCK (realpad);
 
   return element;
@@ -1586,7 +1586,7 @@ gst_real_pad_get_caps_unlocked (GstRealPad * realpad)
 
     GST_FLAG_SET (realpad, GST_PAD_IN_GETCAPS);
     GST_UNLOCK (realpad);
-    result = GST_RPAD_GETCAPSFUNC (realpad) (GST_PAD (realpad));
+    result = GST_RPAD_GETCAPSFUNC (realpad) (GST_PAD_CAST (realpad));
     GST_LOCK (realpad);
     GST_FLAG_UNSET (realpad, GST_PAD_IN_GETCAPS);
 
@@ -1725,12 +1725,12 @@ gst_pad_peer_get_caps (GstPad * pad)
   if (G_UNLIKELY (GST_RPAD_IS_IN_GETCAPS (peerpad)))
     goto was_dispatching;
 
-  gst_object_ref (GST_OBJECT (peerpad));
+  gst_object_ref (GST_OBJECT_CAST (peerpad));
   GST_UNLOCK (realpad);
 
   result = gst_pad_get_caps (GST_PAD_CAST (peerpad));
 
-  gst_object_unref (GST_OBJECT (peerpad));
+  gst_object_unref (GST_OBJECT_CAST (peerpad));
 
   return result;
 
@@ -2029,7 +2029,7 @@ gst_pad_get_peer (GstPad * pad)
   GST_PAD_REALIZE_AND_LOCK (pad, realpad, lost_ghostpad);
   result = GST_RPAD_PEER (realpad);
   if (result)
-    gst_object_ref (GST_OBJECT (result));
+    gst_object_ref (GST_OBJECT_CAST (result));
   GST_UNLOCK (realpad);
 
   return GST_PAD_CAST (result);
@@ -2072,11 +2072,11 @@ gst_pad_realize (GstPad * pad)
   GST_LOCK (pad);
   result = GST_PAD_REALIZE (pad);
   if (result && pad != GST_PAD_CAST (result)) {
-    gst_object_ref (GST_OBJECT (result));
+    gst_object_ref (GST_OBJECT_CAST (result));
     GST_UNLOCK (pad);
     /* no other thread could dispose this since we
      * hold at least one ref */
-    gst_object_unref (GST_OBJECT (pad));
+    gst_object_unref (GST_OBJECT_CAST (pad));
   } else {
     GST_UNLOCK (pad);
   }
@@ -2385,7 +2385,7 @@ gst_pad_event_default_dispatch (GstPad * pad, GstEvent * event)
   orig = pads = gst_pad_get_internal_links (pad);
 
   while (pads) {
-    GstPad *eventpad = GST_PAD (pads->data);
+    GstPad *eventpad = GST_PAD_CAST (pads->data);
 
     pads = g_list_next (pads);
 
@@ -2444,7 +2444,7 @@ gst_pad_event_default (GstPad * pad, GstEvent * event)
 
       if (GST_RPAD_TASK (rpad)) {
         GST_DEBUG_OBJECT (rpad, "pausing task because of eos");
-        gst_task_pause (GST_RPAD_TASK (rpad));
+        gst_pad_pause_task (GST_PAD_CAST (rpad));
       }
     }
     default:
@@ -2484,7 +2484,7 @@ gst_pad_dispatcher (GstPad * pad, GstPadDispatcherFunction dispatch,
     GstRealPad *int_peer = GST_RPAD_PEER (int_rpad);
 
     if (int_peer) {
-      res = dispatch (GST_PAD (int_peer), data);
+      res = dispatch (GST_PAD_CAST (int_peer), data);
       if (res)
         break;
     }
@@ -2561,7 +2561,7 @@ gst_real_pad_dispose (GObject * object)
   GstPad *pad;
   GstRealPad *rpad;
 
-  pad = GST_PAD (object);
+  pad = GST_PAD_CAST (object);
   rpad = GST_REAL_PAD (object);
 
   /* No linked pad can ever be disposed.
@@ -2582,7 +2582,7 @@ gst_real_pad_dispose (GObject * object)
     orig = ghostpads = g_list_copy (rpad->ghostpads);
 
     while (ghostpads) {
-      GstPad *ghostpad = GST_PAD (ghostpads->data);
+      GstPad *ghostpad = GST_PAD_CAST (ghostpads->data);
 
       if (GST_IS_ELEMENT (GST_OBJECT_PARENT (ghostpad))) {
         GstElement *parent = GST_ELEMENT (GST_OBJECT_PARENT (ghostpad));
@@ -2738,7 +2738,7 @@ gst_pad_save_thyself (GstObject * object, xmlNodePtr parent)
   if (GST_RPAD_PEER (realpad) != NULL) {
     gchar *content;
 
-    peer = GST_PAD (GST_RPAD_PEER (realpad));
+    peer = GST_PAD_CAST (GST_RPAD_PEER (realpad));
     /* first check to see if the peer's parent's parent is the same */
     /* we just save it off */
     content = g_strdup_printf ("%s.%s",
@@ -2793,7 +2793,7 @@ handle_pad_block (GstRealPad * pad)
       "signal block taken on pad %s:%s", GST_DEBUG_PAD_NAME (pad));
 
   /* need to grab extra ref for the callbacks */
-  gst_object_ref (GST_OBJECT (pad));
+  gst_object_ref (GST_OBJECT_CAST (pad));
 
   callback = pad->block_callback;
   if (callback) {
@@ -2820,115 +2820,88 @@ handle_pad_block (GstRealPad * pad)
     GST_PAD_BLOCK_SIGNAL (pad);
   }
 
-  gst_object_unref (GST_OBJECT (pad));
+  gst_object_unref (GST_OBJECT_CAST (pad));
 }
 
 /**********************************************************************
  * Data passing functions
  */
 
-
 /**
- * gst_pad_push:
- * @pad: a source #GstPad.
- * @buffer: the #GstBuffer to push.
+ * gst_pad_chain:
+ * @pad: a sink #GstPad.
+ * @buffer: the #GstBuffer to send.
  *
- * Pushes a buffer to the peer of @pad. @pad must be linked.
+ * Chain a buffer to @pad. The pad has to be a GstRealPad.
  *
- * Returns: a #GstFlowReturn from the peer pad.
+ * Returns: a #GstFlowReturn from the pad.
  *
  * MT safe.
  */
 GstFlowReturn
-gst_pad_push (GstPad * pad, GstBuffer * buffer)
+gst_pad_chain (GstPad * pad, GstBuffer * buffer)
 {
-  GstRealPad *peer;
-  GstFlowReturn ret;
-  GstPadChainFunction chainfunc;
   GstCaps *caps;
   gboolean caps_changed;
+  GstPadChainFunction chainfunc;
+  GstFlowReturn ret;
 
   g_return_val_if_fail (GST_IS_REAL_PAD (pad), GST_FLOW_ERROR);
-  g_return_val_if_fail (GST_RPAD_DIRECTION (pad) == GST_PAD_SRC,
+  g_return_val_if_fail (GST_RPAD_DIRECTION (pad) == GST_PAD_SINK,
       GST_FLOW_ERROR);
   g_return_val_if_fail (buffer != NULL, GST_FLOW_ERROR);
   g_return_val_if_fail (GST_IS_BUFFER (buffer), GST_FLOW_ERROR);
 
+  GST_STREAM_LOCK (pad);
 
   GST_LOCK (pad);
-  while (G_UNLIKELY (GST_RPAD_IS_BLOCKED (pad)))
-    handle_pad_block (GST_REAL_PAD_CAST (pad));
-
-  if (G_UNLIKELY ((peer = GST_RPAD_PEER (pad)) == NULL))
-    goto not_linked;
-
-  if (G_UNLIKELY (!GST_RPAD_IS_ACTIVE (peer)))
-    goto not_active;
-
-  if (G_UNLIKELY (GST_RPAD_IS_FLUSHING (peer)))
+  if (G_UNLIKELY (GST_RPAD_IS_FLUSHING (pad)))
     goto flushing;
 
-  gst_object_ref (GST_OBJECT_CAST (peer));
+  caps = GST_BUFFER_CAPS (buffer);
+  caps_changed = caps && caps != GST_RPAD_CAPS (pad);
   GST_UNLOCK (pad);
 
-  /* FIXME, move capnego this into a base class? */
-  caps = GST_BUFFER_CAPS (buffer);
-  caps_changed = caps && caps != GST_RPAD_CAPS (peer);
-  /* we got a new datatype on the peer pad, see if it can handle it */
+  /* we got a new datatype on the pad, see if it can handle it */
   if (G_UNLIKELY (caps_changed)) {
     GST_DEBUG ("caps changed to %" GST_PTR_FORMAT, caps);
-    if (G_UNLIKELY (!gst_pad_configure_sink (GST_PAD_CAST (peer), caps)))
+    if (G_UNLIKELY (!gst_pad_configure_sink (pad, caps)))
       goto not_negotiated;
   }
 
-  /* NOTE: we read the peer chainfunc unlocked. 
-   * we cannot hold the lock for the peer so we might send
+  /* NOTE: we read the chainfunc unlocked. 
+   * we cannot hold the lock for the pad so we might send
    * the data to the wrong function. This is not really a
    * problem since functions are assigned at creation time
    * and don't change that often... */
-  if (G_UNLIKELY ((chainfunc = peer->chainfunc) == NULL))
+  if (G_UNLIKELY ((chainfunc = GST_RPAD_CHAINFUNC (pad)) == NULL))
     goto no_function;
 
   GST_CAT_LOG_OBJECT (GST_CAT_SCHEDULING, pad,
-      "calling chainfunction &%s of peer pad %s:%s",
-      GST_DEBUG_FUNCPTR_NAME (chainfunc), GST_DEBUG_PAD_NAME (peer));
+      "calling chainfunction &%s of pad %s:%s",
+      GST_DEBUG_FUNCPTR_NAME (chainfunc), GST_DEBUG_PAD_NAME (pad));
 
-  ret = chainfunc (GST_PAD_CAST (peer), buffer);
-
-  gst_object_unref (GST_OBJECT_CAST (peer));
+  ret = chainfunc (pad, buffer);
+  GST_STREAM_UNLOCK (pad);
 
   return ret;
 
-  /* ERROR recovery here */
-not_linked:
-  {
-    gst_buffer_unref (buffer);
-    GST_CAT_LOG_OBJECT (GST_CAT_SCHEDULING, pad,
-        "pushing, but it was not linked");
-    GST_UNLOCK (pad);
-    return GST_FLOW_NOT_CONNECTED;
-  }
-not_active:
-  {
-    gst_buffer_unref (buffer);
-    GST_CAT_LOG_OBJECT (GST_CAT_SCHEDULING, pad,
-        "pushing, but it was inactive");
-    GST_UNLOCK (pad);
-    return GST_FLOW_WRONG_STATE;
-  }
+  /* ERRORS */
 flushing:
   {
     gst_buffer_unref (buffer);
     GST_CAT_LOG_OBJECT (GST_CAT_SCHEDULING, pad,
         "pushing, but pad was flushing");
     GST_UNLOCK (pad);
+    GST_STREAM_UNLOCK (pad);
     return GST_FLOW_UNEXPECTED;
   }
 not_negotiated:
   {
     gst_buffer_unref (buffer);
     GST_CAT_LOG_OBJECT (GST_CAT_SCHEDULING, pad,
-        "pushing buffer but peer did not accept");
+        "pushing buffer but pad did not accept");
+    GST_STREAM_UNLOCK (pad);
     return GST_FLOW_NOT_NEGOTIATED;
   }
 no_function:
@@ -2937,14 +2910,64 @@ no_function:
     GST_CAT_LOG_OBJECT (GST_CAT_SCHEDULING, pad,
         "pushing, but not chainhandler");
     GST_ELEMENT_ERROR (GST_PAD_PARENT (pad), CORE, PAD, (NULL),
-        ("push on pad %s:%s but the peer pad %s:%s has no chainfunction",
-            GST_DEBUG_PAD_NAME (pad), GST_DEBUG_PAD_NAME (peer)));
-    gst_object_unref (GST_OBJECT (peer));
+        ("push on pad %s:%s but it has no chainfunction",
+            GST_DEBUG_PAD_NAME (pad)));
+    GST_STREAM_UNLOCK (pad);
     return GST_FLOW_ERROR;
   }
 }
 
 /**
+ * gst_pad_push:
+ * @pad: a source #GstPad.
+ * @buffer: the #GstBuffer to push.
+ *
+ * Pushes a buffer to the peer of @pad. @pad must be linked.
+ *
+ * Returns: a #GstFlowReturn from the peer pad.
+ *
+ * MT safe.
+ */
+GstFlowReturn
+gst_pad_push (GstPad * pad, GstBuffer * buffer)
+{
+  GstRealPad *peer;
+  GstFlowReturn ret;
+
+  g_return_val_if_fail (GST_IS_REAL_PAD (pad), GST_FLOW_ERROR);
+  g_return_val_if_fail (GST_RPAD_DIRECTION (pad) == GST_PAD_SRC,
+      GST_FLOW_ERROR);
+  g_return_val_if_fail (buffer != NULL, GST_FLOW_ERROR);
+  g_return_val_if_fail (GST_IS_BUFFER (buffer), GST_FLOW_ERROR);
+
+  GST_LOCK (pad);
+  while (G_UNLIKELY (GST_RPAD_IS_BLOCKED (pad)))
+    handle_pad_block (GST_REAL_PAD_CAST (pad));
+
+  if (G_UNLIKELY ((peer = GST_RPAD_PEER (pad)) == NULL))
+    goto not_linked;
+
+  gst_object_ref (GST_OBJECT_CAST (peer));
+  GST_UNLOCK (pad);
+
+  ret = gst_pad_chain (GST_PAD_CAST (peer), buffer);
+
+  gst_object_unref (GST_OBJECT_CAST (peer));
+
+  return ret;
+
+  /* ERROR recovery here */
+not_linked:
+  {
+    gst_buffer_unref (buffer);
+    GST_CAT_LOG_OBJECT (GST_CAT_SCHEDULING, pad,
+        "pushing, but it was not linked");
+    GST_UNLOCK (pad);
+    return GST_FLOW_NOT_CONNECTED;
+  }
+}
+
+/**
  * gst_pad_check_pull_range:
  * @pad: a sink #GstRealPad.
  *
@@ -3010,6 +3033,73 @@ not_connected:
 }
 
 /**
+ * gst_pad_get_range:
+ * @pad: a src #GstPad.
+ * @buffer: a pointer to hold the #GstBuffer.
+ * @offset: The start offset of the buffer
+ * @length: The length of the buffer
+ *
+ * Calls the getrange function of @pad. 
+ *
+ * Returns: a #GstFlowReturn from the pad.
+ *
+ * MT safe.
+ */
+GstFlowReturn
+gst_pad_get_range (GstPad * pad, guint64 offset, guint size,
+    GstBuffer ** buffer)
+{
+  GstFlowReturn ret;
+  GstPadGetRangeFunction getrangefunc;
+
+  g_return_val_if_fail (GST_IS_REAL_PAD (pad), GST_FLOW_ERROR);
+  g_return_val_if_fail (GST_RPAD_DIRECTION (pad) == GST_PAD_SRC,
+      GST_FLOW_ERROR);
+  g_return_val_if_fail (buffer != NULL, GST_FLOW_ERROR);
+
+  GST_STREAM_LOCK (pad);
+
+  GST_LOCK (pad);
+  if (G_UNLIKELY (GST_RPAD_IS_FLUSHING (pad)))
+    goto flushing;
+  GST_UNLOCK (pad);
+
+  if (G_UNLIKELY ((getrangefunc = GST_RPAD_GETRANGEFUNC (pad)) == NULL))
+    goto no_function;
+
+  GST_CAT_LOG_OBJECT (GST_CAT_SCHEDULING, pad,
+      "calling getrangefunc %s of peer pad %s:%s, offset %"
+      G_GUINT64_FORMAT ", size %u",
+      GST_DEBUG_FUNCPTR_NAME (getrangefunc), GST_DEBUG_PAD_NAME (pad),
+      offset, size);
+
+  ret = getrangefunc (pad, offset, size, buffer);
+
+  GST_STREAM_UNLOCK (pad);
+
+  return ret;
+
+  /* ERRORS */
+flushing:
+  {
+    GST_CAT_LOG_OBJECT (GST_CAT_SCHEDULING, pad,
+        "pulling range, but pad was flushing");
+    GST_UNLOCK (pad);
+    GST_STREAM_UNLOCK (pad);
+    return GST_FLOW_UNEXPECTED;
+  }
+no_function:
+  {
+    GST_ELEMENT_ERROR (GST_PAD_PARENT (pad), CORE, PAD, (NULL),
+        ("pullrange on pad %s:%s but it has no getrangefunction",
+            GST_DEBUG_PAD_NAME (pad)));
+    GST_STREAM_UNLOCK (pad);
+    return GST_FLOW_ERROR;
+  }
+}
+
+
+/**
  * gst_pad_pull_range:
  * @pad: a sink #GstPad.
  * @buffer: a pointer to hold the #GstBuffer.
@@ -3028,7 +3118,6 @@ gst_pad_pull_range (GstPad * pad, guint64 offset, guint size,
 {
   GstRealPad *peer;
   GstFlowReturn ret;
-  GstPadGetRangeFunction getrangefunc;
 
   g_return_val_if_fail (GST_IS_REAL_PAD (pad), GST_FLOW_ERROR);
   g_return_val_if_fail (GST_RPAD_DIRECTION (pad) == GST_PAD_SINK,
@@ -3043,26 +3132,10 @@ gst_pad_pull_range (GstPad * pad, guint64 offset, guint size,
   if (G_UNLIKELY ((peer = GST_RPAD_PEER (pad)) == NULL))
     goto not_connected;
 
-  if (G_UNLIKELY (!GST_RPAD_IS_ACTIVE (peer)))
-    goto not_active;
-
-  if (G_UNLIKELY (GST_RPAD_IS_FLUSHING (peer)))
-    goto flushing;
-
   gst_object_ref (GST_OBJECT_CAST (peer));
   GST_UNLOCK (pad);
 
-  /* see note in above function */
-  if (G_UNLIKELY ((getrangefunc = peer->getrangefunc) == NULL))
-    goto no_function;
-
-  GST_CAT_LOG_OBJECT (GST_CAT_SCHEDULING, pad,
-      "calling getrangefunc %s of peer pad %s:%s, offset %"
-      G_GUINT64_FORMAT ", size %u",
-      GST_DEBUG_FUNCPTR_NAME (getrangefunc), GST_DEBUG_PAD_NAME (peer),
-      offset, size);
-
-  ret = getrangefunc (GST_PAD_CAST (peer), offset, size, buffer);
+  ret = gst_pad_get_range (GST_PAD_CAST (peer), offset, size, buffer);
 
   gst_object_unref (GST_OBJECT_CAST (peer));
 
@@ -3076,28 +3149,6 @@ not_connected:
     GST_UNLOCK (pad);
     return GST_FLOW_NOT_CONNECTED;
   }
-not_active:
-  {
-    GST_CAT_LOG_OBJECT (GST_CAT_SCHEDULING, pad,
-        "pulling range, but it was inactive");
-    GST_UNLOCK (pad);
-    return GST_FLOW_WRONG_STATE;
-  }
-flushing:
-  {
-    GST_CAT_LOG_OBJECT (GST_CAT_SCHEDULING, pad,
-        "pulling range, but pad was flushing");
-    GST_UNLOCK (pad);
-    return GST_FLOW_UNEXPECTED;
-  }
-no_function:
-  {
-    GST_ELEMENT_ERROR (GST_PAD_PARENT (pad), CORE, PAD, (NULL),
-        ("pullrange on pad %s:%s but the peer pad %s:%s has no getrangefunction",
-            GST_DEBUG_PAD_NAME (pad), GST_DEBUG_PAD_NAME (peer)));
-    gst_object_unref (GST_OBJECT (peer));
-    return GST_FLOW_ERROR;
-  }
 }
 
 /**
@@ -3105,7 +3156,9 @@ no_function:
  * @pad: a #GstPad to push the event to.
  * @event: the #GstEvent to send to the pad.
  *
- * Sends the event to the peer of the given pad.
+ * Sends the event to the peer of the given pad. This function is
+ * mainly used by elements to send events to their peer
+ * elements.
  *
  * Returns: TRUE if the event was handled.
  *
@@ -3157,47 +3210,73 @@ gboolean
 gst_pad_send_event (GstPad * pad, GstEvent * event)
 {
   gboolean result = FALSE;
-  GstRealPad *rpad;
+  GstRealPad *realpad;
   GstPadEventFunction eventfunc;
 
   g_return_val_if_fail (GST_IS_PAD (pad), FALSE);
   g_return_val_if_fail (event != NULL, FALSE);
 
-  rpad = GST_PAD_REALIZE (pad);
+  GST_PAD_REALIZE_AND_LOCK (pad, realpad, lost_ghostpad);
 
   if (GST_EVENT_SRC (event) == NULL)
-    GST_EVENT_SRC (event) = gst_object_ref (GST_OBJECT (rpad));
+    GST_EVENT_SRC (event) = gst_object_ref (GST_OBJECT_CAST (realpad));
 
   GST_CAT_DEBUG (GST_CAT_EVENT, "have event type %d on pad %s:%s",
-      GST_EVENT_TYPE (event), GST_DEBUG_PAD_NAME (rpad));
-
-  if (GST_PAD_IS_SINK (pad)) {
-    if (GST_EVENT_TYPE (event) == GST_EVENT_FLUSH) {
-      GST_CAT_DEBUG (GST_CAT_EVENT, "have flush event");
-      GST_LOCK (pad);
-      if (GST_EVENT_FLUSH_DONE (event)) {
-        GST_CAT_DEBUG (GST_CAT_EVENT, "clear flush flag");
-        GST_FLAG_UNSET (pad, GST_PAD_FLUSHING);
-      } else {
-        GST_CAT_DEBUG (GST_CAT_EVENT, "set flush flag");
-        GST_FLAG_SET (pad, GST_PAD_FLUSHING);
-      }
-      GST_UNLOCK (pad);
+      GST_EVENT_TYPE (event), GST_DEBUG_PAD_NAME (realpad));
+
+  if (GST_PAD_IS_SINK (realpad)) {
+    switch (GST_EVENT_TYPE (event)) {
+      case GST_EVENT_FLUSH:
+        GST_CAT_DEBUG (GST_CAT_EVENT, "have flush event");
+        if (GST_EVENT_FLUSH_DONE (event)) {
+          GST_RPAD_UNSET_FLUSHING (realpad);
+          GST_CAT_DEBUG (GST_CAT_EVENT, "cleared flush flag");
+        } else {
+          /* can't even accept a flush begin event when flushing */
+          if (GST_RPAD_IS_FLUSHING (realpad))
+            goto flushing;
+          GST_RPAD_SET_FLUSHING (realpad);
+          GST_CAT_DEBUG (GST_CAT_EVENT, "set flush flag");
+        }
+        break;
+      default:
+        if (GST_RPAD_IS_FLUSHING (realpad))
+          goto flushing;
+        break;
     }
   }
 
-  if ((eventfunc = GST_RPAD_EVENTFUNC (rpad)) == NULL)
+  if ((eventfunc = GST_RPAD_EVENTFUNC (realpad)) == NULL)
     goto no_function;
 
-  result = eventfunc (GST_PAD_CAST (rpad), event);
+  gst_object_ref (GST_OBJECT_CAST (realpad));
+  GST_UNLOCK (realpad);
+
+  result = eventfunc (GST_PAD_CAST (realpad), event);
+
+  gst_object_unref (GST_OBJECT_CAST (realpad));
 
   return result;
 
   /* ERROR handling */
+lost_ghostpad:
+  {
+    GST_CAT_DEBUG (GST_CAT_EVENT, "lost ghostpad");
+    gst_event_unref (event);
+    return FALSE;
+  }
 no_function:
   {
     g_warning ("pad %s:%s has no event handler, file a bug.",
-        GST_DEBUG_PAD_NAME (rpad));
+        GST_DEBUG_PAD_NAME (realpad));
+    GST_UNLOCK (realpad);
+    gst_event_unref (event);
+    return FALSE;
+  }
+flushing:
+  {
+    GST_UNLOCK (realpad);
+    GST_CAT_DEBUG (GST_CAT_EVENT, "received event on flushing pad");
     gst_event_unref (event);
     return FALSE;
   }
@@ -3444,24 +3523,107 @@ gst_pad_get_element_private (GstPad * pad)
 }
 
 gboolean
-gst_pad_start_task (GstPad * pad)
+gst_pad_start_task (GstPad * pad, GstTaskFunction func, gpointer data)
 {
-  g_warning ("implement gst_pad_start_task()");
-  return FALSE;
+  GstElement *parent;
+  GstScheduler *sched;
+  GstTask *task;
+
+  g_return_val_if_fail (GST_IS_PAD (pad), FALSE);
+  g_return_val_if_fail (func != NULL, FALSE);
+
+  GST_LOCK (pad);
+  parent = GST_PAD_PARENT (pad);
+
+  if (parent == NULL || !GST_IS_ELEMENT (parent))
+    goto no_parent;
+
+  sched = GST_ELEMENT_SCHEDULER (parent);
+  if (sched == NULL)
+    goto no_sched;
+
+  task = GST_RPAD_TASK (pad);
+  if (task == NULL) {
+    task = gst_scheduler_create_task (sched, func, data);
+    gst_task_set_lock (task, GST_STREAM_GET_LOCK (pad));
+
+    GST_RPAD_TASK (pad) = task;
+  }
+  GST_UNLOCK (pad);
+
+  gst_task_start (task);
+
+  return TRUE;
+
+  /* ERRORS */
+no_parent:
+  {
+    GST_UNLOCK (pad);
+    GST_DEBUG ("no parent");
+    return FALSE;
+  }
+no_sched:
+  {
+    GST_UNLOCK (pad);
+    GST_DEBUG ("no scheduler");
+    return FALSE;
+  }
 }
 
 gboolean
 gst_pad_pause_task (GstPad * pad)
 {
-  g_warning ("implement gst_pad_pause_task()");
-  return FALSE;
+  GstTask *task;
+
+  g_return_val_if_fail (GST_IS_PAD (pad), FALSE);
+
+  GST_LOCK (pad);
+  task = GST_RPAD_TASK (pad);
+  if (task == NULL)
+    goto no_task;
+  gst_task_pause (task);
+  GST_UNLOCK (pad);
+
+  GST_STREAM_LOCK (pad);
+  GST_STREAM_UNLOCK (pad);
+
+  return TRUE;
+
+no_task:
+  {
+    GST_UNLOCK (pad);
+    return TRUE;
+  }
 }
 
 gboolean
 gst_pad_stop_task (GstPad * pad)
 {
-  g_warning ("implement gst_pad_stop_task()");
-  return FALSE;
+  GstTask *task;
+
+  g_return_val_if_fail (GST_IS_PAD (pad), FALSE);
+
+  GST_LOCK (pad);
+  task = GST_RPAD_TASK (pad);
+  if (task == NULL)
+    goto no_task;
+  GST_RPAD_TASK (pad) = NULL;
+  GST_UNLOCK (pad);
+
+  gst_task_stop (task);
+
+  GST_STREAM_LOCK (pad);
+  GST_STREAM_UNLOCK (pad);
+
+  gst_object_unref (GST_OBJECT_CAST (task));
+
+  return TRUE;
+
+no_task:
+  {
+    GST_UNLOCK (pad);
+    return TRUE;
+  }
 }
 
 
index d8fcd9f..915db75 100644 (file)
@@ -174,8 +174,7 @@ typedef enum {
 } GstPadDirection;
 
 typedef enum {
-  GST_PAD_ACTIVE               = GST_OBJECT_FLAG_LAST,
-  GST_PAD_BLOCKED,
+  GST_PAD_BLOCKED              = GST_OBJECT_FLAG_LAST,
   GST_PAD_FLUSHING,
   GST_PAD_IN_GETCAPS,
   GST_PAD_IN_SETCAPS,
@@ -242,6 +241,8 @@ struct _GstRealPad {
   GstPadGetRangeFunction        getrangefunc;
   GstPadEventFunction           eventfunc;
 
+  GstActivateMode               mode;
+
   /* ghostpads */
   GList                        *ghostpads;
   guint32                       ghostpads_cookie;
@@ -304,6 +305,7 @@ struct _GstGhostPadClass {
 #define GST_RPAD_CHECKGETRANGEFUNC(pad)        (GST_REAL_PAD_CAST(pad)->checkgetrangefunc)
 #define GST_RPAD_GETRANGEFUNC(pad)     (GST_REAL_PAD_CAST(pad)->getrangefunc)
 #define GST_RPAD_EVENTFUNC(pad)                (GST_REAL_PAD_CAST(pad)->eventfunc)
+#define GST_RPAD_ACTIVATE_MODE(pad)    (GST_REAL_PAD_CAST(pad)->mode)
 #define GST_RPAD_QUERYTYPEFUNC(pad)    (GST_REAL_PAD_CAST(pad)->querytypefunc)
 #define GST_RPAD_QUERYFUNC(pad)                (GST_REAL_PAD_CAST(pad)->queryfunc)
 #define GST_RPAD_INTLINKFUNC(pad)      (GST_REAL_PAD_CAST(pad)->intlinkfunc)
@@ -321,16 +323,18 @@ struct _GstGhostPadClass {
 #define GST_RPAD_BUFFERALLOCFUNC(pad)  (GST_REAL_PAD_CAST(pad)->bufferallocfunc)
 
 #define GST_RPAD_IS_LINKED(pad)                (GST_RPAD_PEER(pad) != NULL)
-#define GST_RPAD_IS_ACTIVE(pad)                (GST_FLAG_IS_SET (pad, GST_PAD_ACTIVE))
 #define GST_RPAD_IS_BLOCKED(pad)       (GST_FLAG_IS_SET (pad, GST_PAD_BLOCKED))
 #define GST_RPAD_IS_FLUSHING(pad)      (GST_FLAG_IS_SET (pad, GST_PAD_FLUSHING))
 #define GST_RPAD_IS_IN_GETCAPS(pad)    (GST_FLAG_IS_SET (pad, GST_PAD_IN_GETCAPS))
 #define GST_RPAD_IS_IN_SETCAPS(pad)    (GST_FLAG_IS_SET (pad, GST_PAD_IN_SETCAPS))
 #define GST_RPAD_IS_USABLE(pad)                (GST_RPAD_IS_LINKED (pad) && \
-                                        GST_RPAD_IS_ACTIVE(pad) && GST_RPAD_IS_ACTIVE(GST_RPAD_PEER (pad)))
+                                        !GST_RPAD_IS_FLUSHING(pad) && !GST_RPAD_IS_FLUSHING(GST_RPAD_PEER (pad)))
 #define GST_RPAD_IS_SRC(pad)           (GST_RPAD_DIRECTION(pad) == GST_PAD_SRC)
 #define GST_RPAD_IS_SINK(pad)          (GST_RPAD_DIRECTION(pad) == GST_PAD_SINK)
 
+#define GST_RPAD_SET_FLUSHING(pad)     (GST_FLAG_SET (pad, GST_PAD_FLUSHING))
+#define GST_RPAD_UNSET_FLUSHING(pad)   (GST_FLAG_UNSET (pad, GST_PAD_FLUSHING))
+
 #define GST_STREAM_GET_LOCK(pad)        (GST_PAD_REALIZE(pad)->stream_rec_lock)
 #define GST_STREAM_LOCK(pad)            (g_static_rec_mutex_lock(GST_STREAM_GET_LOCK(pad)))
 #define GST_STREAM_TRYLOCK(pad)         (g_static_rec_mutex_trylock(GST_STREAM_GET_LOCK(pad)))
@@ -360,9 +364,10 @@ struct _GstGhostPadClass {
 #define GST_PAD_CAPS(pad)              GST_RPAD_CAPS(GST_PAD_REALIZE (pad))
 #define GST_PAD_PEER(pad)              GST_PAD_CAST(GST_RPAD_PEER(GST_PAD_REALIZE(pad)))
 
+#define GST_PAD_TASK(pad)              GST_RPAD_TASK(pad)
+
 /* Some check functions (unused?) */
 #define GST_PAD_IS_LINKED(pad)         (GST_RPAD_IS_LINKED(GST_PAD_REALIZE(pad)))
-#define GST_PAD_IS_ACTIVE(pad)         (GST_RPAD_IS_ACTIVE(GST_PAD_REALIZE(pad)))
 #define GST_PAD_IS_BLOCKED(pad)                (GST_RPAD_IS_BLOCKED(GST_PAD_REALIZE(pad)))
 #define GST_PAD_IS_FLUSHING(pad)       (GST_RPAD_IS_FLUSHING(GST_PAD_REALIZE(pad)))
 #define GST_PAD_IS_IN_GETCAPS(pad)     (GST_RPAD_IS_IN_GETCAPS(GST_PAD_REALIZE(pad)))
@@ -509,17 +514,23 @@ gboolean                  gst_pad_peer_accept_caps                (GstPad * pad, GstCaps *caps);
 GstCaps *              gst_pad_get_allowed_caps                (GstPad * srcpad);
 GstCaps *              gst_pad_get_negotiated_caps             (GstPad * pad);
 
-/* data passing functions */
+/* data passing functions to peer */
 GstFlowReturn          gst_pad_push                            (GstPad *pad, GstBuffer *buffer);
 gboolean               gst_pad_check_pull_range                (GstPad *pad);
 GstFlowReturn          gst_pad_pull_range                      (GstPad *pad, guint64 offset, guint size,
                                                                 GstBuffer **buffer);
 gboolean               gst_pad_push_event                      (GstPad *pad, GstEvent *event);
-gboolean               gst_pad_send_event                      (GstPad *pad, GstEvent *event);
 gboolean               gst_pad_event_default                   (GstPad *pad, GstEvent *event);
 
+/* data passing functions on pad */
+GstFlowReturn          gst_pad_chain                           (GstPad *pad, GstBuffer *buffer);
+GstFlowReturn          gst_pad_get_range                       (GstPad *pad, guint64 offset, guint size,
+                                                                GstBuffer **buffer);
+gboolean               gst_pad_send_event                      (GstPad *pad, GstEvent *event);
+
 /* pad tasks */
-gboolean               gst_pad_start_task                      (GstPad *pad);
+gboolean               gst_pad_start_task                      (GstPad *pad, GstTaskFunction func,
+                                                                gpointer data);
 gboolean               gst_pad_pause_task                      (GstPad *pad);
 gboolean               gst_pad_stop_task                       (GstPad *pad);
 
index 2fd8f6e..256d0d1 100644 (file)
@@ -483,9 +483,8 @@ gst_queue_handle_sink_event (GstPad * pad, GstEvent * event)
       /* forward event */
       gst_pad_event_default (pad, event);
       if (GST_EVENT_FLUSH_DONE (event)) {
-        GST_STREAM_LOCK (queue->srcpad);
-        gst_task_start (GST_RPAD_TASK (queue->srcpad));
-        GST_STREAM_UNLOCK (queue->srcpad);
+        gst_pad_start_task (queue->srcpad, (GstTaskFunction) gst_queue_loop,
+            queue->srcpad);
       } else {
         /* now unblock the chain function */
         GST_QUEUE_MUTEX_LOCK;
@@ -498,10 +497,8 @@ gst_queue_handle_sink_event (GstPad * pad, GstEvent * event)
         g_cond_signal (queue->item_add);
 
         /* make sure it stops */
-        GST_STREAM_LOCK (queue->srcpad);
-        gst_task_pause (GST_RPAD_TASK (queue->srcpad));
+        gst_pad_pause_task (queue->srcpad);
         GST_CAT_LOG_OBJECT (queue_dataflow, queue, "loop stopped");
-        GST_STREAM_UNLOCK (queue->srcpad);
       }
       goto done;
     case GST_EVENT_EOS:
@@ -555,8 +552,6 @@ gst_queue_chain (GstPad * pad, GstBuffer * buffer)
 
   queue = GST_QUEUE (GST_OBJECT_PARENT (pad));
 
-  GST_STREAM_LOCK (pad);
-
   /* we have to lock the queue since we span threads */
   GST_QUEUE_MUTEX_LOCK;
 
@@ -633,6 +628,9 @@ gst_queue_chain (GstPad * pad, GstBuffer * buffer)
           STATUS (queue, "waiting for item_del signal from thread using qlock");
           g_cond_wait (queue->item_del, queue->qlock);
 
+          if (GST_RPAD_IS_FLUSHING (pad))
+            goto out_flushing;
+
           /* if there's a pending state change for this queue
            * or its manager, switch back to iterator so bottom
            * half of state change executes */
@@ -663,13 +661,11 @@ gst_queue_chain (GstPad * pad, GstBuffer * buffer)
   GST_CAT_LOG_OBJECT (queue_dataflow, queue, "signalling item_add");
   g_cond_signal (queue->item_add);
   GST_QUEUE_MUTEX_UNLOCK;
-  GST_STREAM_UNLOCK (pad);
 
   return GST_FLOW_OK;
 
 out_unref:
   GST_QUEUE_MUTEX_UNLOCK;
-  GST_STREAM_UNLOCK (pad);
 
   gst_buffer_unref (buffer);
 
@@ -678,8 +674,7 @@ out_unref:
 out_flushing:
   GST_CAT_LOG_OBJECT (queue_dataflow, queue, "exit because of flush");
   GST_QUEUE_MUTEX_UNLOCK;
-  gst_task_pause (GST_RPAD_TASK (queue->srcpad));
-  GST_STREAM_UNLOCK (pad);
+  gst_pad_pause_task (queue->srcpad);
 
   gst_buffer_unref (buffer);
 
@@ -695,8 +690,6 @@ gst_queue_loop (GstPad * pad)
 
   queue = GST_QUEUE (GST_PAD_PARENT (pad));
 
-  GST_STREAM_LOCK (pad);
-
   /* have to lock for thread-safety */
   GST_QUEUE_MUTEX_LOCK;
 
@@ -770,14 +763,12 @@ restart:
   GST_CAT_LOG_OBJECT (queue_dataflow, queue, "signalling item_del");
   g_cond_signal (queue->item_del);
   GST_QUEUE_MUTEX_UNLOCK;
-  GST_STREAM_UNLOCK (pad);
   return;
 
 out_flushing:
   GST_CAT_LOG_OBJECT (queue_dataflow, queue, "exit because of flush");
-  gst_task_pause (GST_RPAD_TASK (pad));
+  gst_pad_pause_task (pad);
   GST_QUEUE_MUTEX_UNLOCK;
-  GST_STREAM_UNLOCK (pad);
   return;
 }
 
@@ -860,17 +851,7 @@ gst_queue_src_activate (GstPad * pad, GstActivateMode mode)
   queue = GST_QUEUE (GST_OBJECT_PARENT (pad));
 
   if (mode == GST_ACTIVATE_PUSH) {
-    /* if we have a scheduler we can start the task */
-    if (GST_ELEMENT_SCHEDULER (queue)) {
-      GST_STREAM_LOCK (pad);
-      GST_RPAD_TASK (pad) =
-          gst_scheduler_create_task (GST_ELEMENT_SCHEDULER (queue),
-          (GstTaskFunction) gst_queue_loop, pad);
-
-      gst_task_start (GST_RPAD_TASK (pad));
-      GST_STREAM_UNLOCK (pad);
-      result = TRUE;
-    }
+    result = gst_pad_start_task (pad, (GstTaskFunction) gst_queue_loop, pad);
   } else {
     /* step 1, unblock chain and loop functions */
     GST_QUEUE_MUTEX_LOCK;
@@ -879,13 +860,7 @@ gst_queue_src_activate (GstPad * pad, GstActivateMode mode)
     GST_QUEUE_MUTEX_UNLOCK;
 
     /* step 2, make sure streaming finishes */
-    GST_STREAM_LOCK (pad);
-    /* step 3, stop the task */
-    gst_task_stop (GST_RPAD_TASK (pad));
-    gst_object_unref (GST_OBJECT (GST_RPAD_TASK (pad)));
-    GST_STREAM_UNLOCK (pad);
-
-    result = TRUE;
+    result = gst_pad_stop_task (pad);
   }
   return result;
 }
index 31ea0f9..332724b 100644 (file)
@@ -72,6 +72,7 @@ gst_task_class_init (GstTaskClass * klass)
 static void
 gst_task_init (GstTask * task)
 {
+  task->lock = NULL;
   task->cond = g_cond_new ();
   task->state = GST_TASK_STOPPED;
 }
@@ -108,6 +109,24 @@ gst_task_create (GstTaskFunction func, gpointer data)
 }
 
 /**
+ * gst_task_set_lock:
+ * @task: The #GstTask to use
+ * @mutex: The GMutex to use
+ *
+ * Set the mutex used by the task.
+ *
+ * MT safe.
+ */
+void
+gst_task_set_lock (GstTask * task, GStaticRecMutex * mutex)
+{
+  GST_LOCK (task);
+  task->lock = mutex;
+  GST_UNLOCK (task);
+}
+
+
+/**
  * gst_task_get_state:
  * @task: The #GstTask to query
  *
index 935a4c7..6bed518 100644 (file)
@@ -54,15 +54,22 @@ typedef enum {
 #define GST_TASK_SIGNAL(task)          g_cond_signal(GST_TASK_GET_COND (task))
 #define GST_TASK_BROADCAST(task)       g_cond_breadcast(GST_TASK_GET_COND (task))
 
+#define GST_TASK_GET_LOCK(task)                (GST_TASK_CAST(task)->lock)
+#define GST_TASK_LOCK(task)            g_static_rec_mutex_lock(GST_TASK_GET_LOCK(task))
+#define GST_TASK_UNLOCK(task)          g_static_rec_mutex_unlock(GST_TASK_GET_LOCK(task))
+
 struct _GstTask {
   GstObject      object;
 
-  /*< public >*/ /* with TASK_LOCK */
-  GstTaskState state;
-  GCond *cond;
+
+  /*< public >*/ /* with LOCK */
+  GstTaskState     state;
+  GCond          *cond;
+
+  GStaticRecMutex *lock;
 
   GstTaskFunction func;
-  gpointer data;
+  gpointer      data;
 
   /*< private >*/
   gpointer _gst_reserved[GST_PADDING];
@@ -83,6 +90,7 @@ struct _GstTaskClass {
 GType           gst_task_get_type       (void);
 
 GstTask*       gst_task_create         (GstTaskFunction func, gpointer data);
+void           gst_task_set_lock       (GstTask *task, GStaticRecMutex *mutex);
 
 GstTaskState   gst_task_get_state      (GstTask *task);
 
index 75f68da..1ba67d8 100644 (file)
@@ -142,9 +142,16 @@ gst_thread_scheduler_task_start (GstTask * task)
   GstThreadScheduler *tsched =
       GST_THREAD_SCHEDULER (GST_OBJECT_PARENT (GST_OBJECT (task)));
   GstTaskState old;
+  GStaticRecMutex *lock;
 
   GST_DEBUG_OBJECT (task, "Starting task %p", task);
 
+  if ((lock = GST_TASK_GET_LOCK (task)) == NULL) {
+    lock = g_new (GStaticRecMutex, 1);
+    g_static_rec_mutex_init (lock);
+    GST_TASK_GET_LOCK (task) = lock;
+  }
+
   GST_LOCK (ttask);
   old = GST_TASK_CAST (ttask)->state;
   GST_TASK_CAST (ttask)->state = GST_TASK_STARTED;
@@ -269,11 +276,18 @@ gst_thread_scheduler_func (GstThreadSchedulerTask * ttask,
   GST_DEBUG_OBJECT (sched, "Entering task %p, thread %p", task,
       g_thread_self ());
 
+  /* locking order is TASK_LOCK, LOCK */
+  GST_TASK_LOCK (task);
   GST_LOCK (task);
   while (G_LIKELY (task->state != GST_TASK_STOPPED)) {
     while (G_UNLIKELY (task->state == GST_TASK_PAUSED)) {
+      GST_TASK_UNLOCK (task);
       GST_TASK_SIGNAL (task);
       GST_TASK_WAIT (task);
+      GST_UNLOCK (task);
+      /* locking order.. */
+      GST_TASK_LOCK (task);
+      GST_LOCK (task);
       if (task->state == GST_TASK_STOPPED)
         goto done;
     }
@@ -285,6 +299,7 @@ gst_thread_scheduler_func (GstThreadSchedulerTask * ttask,
   }
 done:
   GST_UNLOCK (task);
+  GST_TASK_UNLOCK (task);
 
   GST_DEBUG_OBJECT (sched, "Exit task %p, thread %p", task, g_thread_self ());
 
index 993c78c..09239d0 100644 (file)
@@ -160,7 +160,7 @@ gst_adapter_peek (GstAdapter * adapter, guint size)
 
   if (adapter->assembled_size < size) {
     adapter->assembled_size = (size / DEFAULT_SIZE + 1) * DEFAULT_SIZE;
-    GST_DEBUG_OBJECT (adapter, "setting size of internal buffer to %u\n",
+    GST_DEBUG_OBJECT (adapter, "setting size of internal buffer to %u",
         adapter->assembled_size);
     adapter->assembled_data =
         g_realloc (adapter->assembled_data, adapter->assembled_size);
@@ -198,7 +198,7 @@ gst_adapter_flush (GstAdapter * adapter, guint flush)
   g_return_if_fail (flush > 0);
   g_return_if_fail (flush <= adapter->size);
 
-  GST_LOG_OBJECT (adapter, "flushing %u bytes\n", flush);
+  GST_LOG_OBJECT (adapter, "flushing %u bytes", flush);
   adapter->size -= flush;
   adapter->assembled_len = 0;
   while (flush > 0) {
index e677f0e..b8604c8 100644 (file)
@@ -378,6 +378,8 @@ gst_basesink_preroll_queue_push (GstBaseSink * basesink, GstPad * pad,
   if (basesink->preroll_queue->length == 0) {
     GstBaseSinkClass *bclass = GST_BASESINK_GET_CLASS (basesink);
 
+    GST_DEBUG ("preroll buffer with TS: %" GST_TIME_FORMAT,
+        GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (buffer)));
     if (bclass->preroll)
       bclass->preroll (basesink, buffer);
   }
@@ -448,7 +450,7 @@ PrerollReturn
 gst_basesink_finish_preroll (GstBaseSink * basesink, GstPad * pad,
     GstBuffer * buffer)
 {
-  gboolean usable;
+  gboolean flushing;
 
   DEBUG ("finish preroll %p <\n", basesink);
   /* lock order is important */
@@ -461,13 +463,13 @@ gst_basesink_finish_preroll (GstBaseSink * basesink, GstPad * pad,
   gst_element_commit_state (GST_ELEMENT (basesink));
   GST_STATE_UNLOCK (basesink);
 
+  gst_basesink_preroll_queue_push (basesink, pad, buffer);
+
   GST_LOCK (pad);
-  usable = !GST_RPAD_IS_FLUSHING (pad) && GST_RPAD_IS_ACTIVE (pad);
+  flushing = GST_RPAD_IS_FLUSHING (pad);
   GST_UNLOCK (pad);
-  if (!usable)
-    goto unusable;
-
-  gst_basesink_preroll_queue_push (basesink, pad, buffer);
+  if (flushing)
+    goto flushing;
 
   if (basesink->need_preroll)
     goto still_queueing;
@@ -490,7 +492,7 @@ no_preroll:
     GST_STATE_UNLOCK (basesink);
     return PREROLL_PLAYING;
   }
-unusable:
+flushing:
   {
     GST_DEBUG ("pad is flushing");
     GST_PREROLL_UNLOCK (pad);
@@ -726,12 +728,8 @@ gst_basesink_chain (GstPad * pad, GstBuffer * buf)
   g_assert (GST_BASESINK (GST_OBJECT_PARENT (pad))->pad_mode ==
       GST_ACTIVATE_PUSH);
 
-  GST_STREAM_LOCK (pad);
-
   result = gst_basesink_chain_unlocked (pad, buf);
 
-  GST_STREAM_UNLOCK (pad);
-
   return result;
 }
 
@@ -748,8 +746,6 @@ gst_basesink_loop (GstPad * pad)
 
   g_assert (basesink->pad_mode == GST_ACTIVATE_PULL);
 
-  GST_STREAM_LOCK (pad);
-
   result = gst_pad_pull_range (pad, basesink->offset, DEFAULT_SIZE, &buf);
   if (result != GST_FLOW_OK)
     goto paused;
@@ -759,12 +755,10 @@ gst_basesink_loop (GstPad * pad)
     goto paused;
 
   /* default */
-  GST_STREAM_UNLOCK (pad);
   return;
 
 paused:
-  gst_task_pause (GST_RPAD_TASK (pad));
-  GST_STREAM_UNLOCK (pad);
+  gst_pad_pause_task (pad);
   return;
 }
 
@@ -787,16 +781,8 @@ gst_basesink_activate (GstPad * pad, GstActivateMode mode)
       /* if we have a scheduler we can start the task */
       g_return_val_if_fail (basesink->has_loop, FALSE);
       gst_pad_peer_set_active (pad, mode);
-      if (GST_ELEMENT_SCHEDULER (basesink)) {
-        GST_STREAM_LOCK (pad);
-        GST_RPAD_TASK (pad) =
-            gst_scheduler_create_task (GST_ELEMENT_SCHEDULER (basesink),
-            (GstTaskFunction) gst_basesink_loop, pad);
-
-        gst_task_start (GST_RPAD_TASK (pad));
-        GST_STREAM_UNLOCK (pad);
-        result = TRUE;
-      }
+      result =
+          gst_pad_start_task (pad, (GstTaskFunction) gst_basesink_loop, pad);
       break;
     case GST_ACTIVATE_NONE:
       /* step 1, unblock clock sync (if any) or any other blocking thing */
@@ -816,16 +802,7 @@ gst_basesink_activate (GstPad * pad, GstActivateMode mode)
       GST_PREROLL_UNLOCK (pad);
 
       /* step 2, make sure streaming finishes */
-      GST_STREAM_LOCK (pad);
-      /* step 3, stop the task */
-      if (GST_RPAD_TASK (pad)) {
-        gst_task_stop (GST_RPAD_TASK (pad));
-        gst_object_unref (GST_OBJECT (GST_RPAD_TASK (pad)));
-        GST_RPAD_TASK (pad) = NULL;
-      }
-      GST_STREAM_UNLOCK (pad);
-
-      result = TRUE;
+      result = gst_pad_stop_task (pad);
       break;
   }
   basesink->pad_mode = mode;
@@ -911,9 +888,6 @@ gst_basesink_change_state (GstElement * element)
       basesink->have_preroll = FALSE;
       GST_PREROLL_UNLOCK (basesink->sinkpad);
 
-      /* make sure the element is finished processing */
-      GST_STREAM_LOCK (basesink->sinkpad);
-      GST_STREAM_UNLOCK (basesink->sinkpad);
       /* clear EOS state */
       basesink->eos = FALSE;
       break;
index 43ddf5a..5ac5adb 100644 (file)
@@ -333,9 +333,8 @@ gst_basesrc_do_seek (GstBaseSrc * src, GstEvent * event)
   }
 
   /* and restart the task */
-  if (GST_RPAD_TASK (src->srcpad)) {
-    gst_task_start (GST_RPAD_TASK (src->srcpad));
-  }
+  gst_pad_start_task (src->srcpad, (GstTaskFunction) gst_basesrc_loop,
+      src->srcpad);
   GST_STREAM_UNLOCK (src->srcpad);
 
   gst_event_unref (event);
@@ -447,7 +446,7 @@ gst_basesrc_get_property (GObject * object, guint prop_id, GValue * value,
 }
 
 static GstFlowReturn
-gst_basesrc_get_range_unlocked (GstPad * pad, guint64 offset, guint length,
+gst_basesrc_get_range (GstPad * pad, guint64 offset, guint length,
     GstBuffer ** buf)
 {
   GstFlowReturn ret;
@@ -499,21 +498,6 @@ unexpected_length:
   }
 }
 
-static GstFlowReturn
-gst_basesrc_get_range (GstPad * pad, guint64 offset, guint length,
-    GstBuffer ** ret)
-{
-  GstFlowReturn fret;
-
-  GST_STREAM_LOCK (pad);
-
-  fret = gst_basesrc_get_range_unlocked (pad, offset, length, ret);
-
-  GST_STREAM_UNLOCK (pad);
-
-  return fret;
-}
-
 static gboolean
 gst_basesrc_check_get_range (GstPad * pad)
 {
@@ -538,9 +522,7 @@ gst_basesrc_loop (GstPad * pad)
 
   src = GST_BASESRC (GST_OBJECT_PARENT (pad));
 
-  GST_STREAM_LOCK (pad);
-
-  ret = gst_basesrc_get_range_unlocked (pad, src->offset, src->blocksize, &buf);
+  ret = gst_basesrc_get_range (pad, src->offset, src->blocksize, &buf);
   if (ret != GST_FLOW_OK)
     goto eos;
 
@@ -550,22 +532,19 @@ gst_basesrc_loop (GstPad * pad)
   if (ret != GST_FLOW_OK)
     goto pause;
 
-  GST_STREAM_UNLOCK (pad);
   return;
 
 eos:
   {
     GST_DEBUG_OBJECT (src, "going to EOS");
-    gst_task_pause (GST_RPAD_TASK (pad));
+    gst_pad_pause_task (pad);
     gst_pad_push_event (pad, gst_event_new (GST_EVENT_EOS));
-    GST_STREAM_UNLOCK (pad);
     return;
   }
 pause:
   {
     GST_DEBUG_OBJECT (src, "pausing task");
-    gst_task_pause (GST_RPAD_TASK (pad));
-    GST_STREAM_UNLOCK (pad);
+    gst_pad_pause_task (pad);
     return;
   }
 }
@@ -733,17 +712,8 @@ gst_basesrc_activate (GstPad * pad, GstActivateMode mode)
   result = FALSE;
   switch (mode) {
     case GST_ACTIVATE_PUSH:
-      /* if we have a scheduler we can start the task */
-      if (GST_ELEMENT_SCHEDULER (basesrc)) {
-        GST_STREAM_LOCK (pad);
-        GST_RPAD_TASK (pad) =
-            gst_scheduler_create_task (GST_ELEMENT_SCHEDULER (basesrc),
-            (GstTaskFunction) gst_basesrc_loop, pad);
-
-        gst_task_start (GST_RPAD_TASK (pad));
-        GST_STREAM_UNLOCK (pad);
-        result = TRUE;
-      }
+      result =
+          gst_pad_start_task (pad, (GstTaskFunction) gst_basesrc_loop, pad);
       break;
     case GST_ACTIVATE_PULL:
       result = TRUE;
@@ -753,16 +723,7 @@ gst_basesrc_activate (GstPad * pad, GstActivateMode mode)
       gst_basesrc_unlock (basesrc);
 
       /* step 2, make sure streaming finishes */
-      GST_STREAM_LOCK (pad);
-      /* step 3, stop the task */
-      if (GST_RPAD_TASK (pad)) {
-        gst_task_stop (GST_RPAD_TASK (pad));
-        gst_object_unref (GST_OBJECT (GST_RPAD_TASK (pad)));
-        GST_RPAD_TASK (pad) = NULL;
-      }
-      GST_STREAM_UNLOCK (pad);
-
-      result = TRUE;
+      result = gst_pad_stop_task (pad);
       break;
   }
   return result;
index 2fd8f6e..256d0d1 100644 (file)
@@ -483,9 +483,8 @@ gst_queue_handle_sink_event (GstPad * pad, GstEvent * event)
       /* forward event */
       gst_pad_event_default (pad, event);
       if (GST_EVENT_FLUSH_DONE (event)) {
-        GST_STREAM_LOCK (queue->srcpad);
-        gst_task_start (GST_RPAD_TASK (queue->srcpad));
-        GST_STREAM_UNLOCK (queue->srcpad);
+        gst_pad_start_task (queue->srcpad, (GstTaskFunction) gst_queue_loop,
+            queue->srcpad);
       } else {
         /* now unblock the chain function */
         GST_QUEUE_MUTEX_LOCK;
@@ -498,10 +497,8 @@ gst_queue_handle_sink_event (GstPad * pad, GstEvent * event)
         g_cond_signal (queue->item_add);
 
         /* make sure it stops */
-        GST_STREAM_LOCK (queue->srcpad);
-        gst_task_pause (GST_RPAD_TASK (queue->srcpad));
+        gst_pad_pause_task (queue->srcpad);
         GST_CAT_LOG_OBJECT (queue_dataflow, queue, "loop stopped");
-        GST_STREAM_UNLOCK (queue->srcpad);
       }
       goto done;
     case GST_EVENT_EOS:
@@ -555,8 +552,6 @@ gst_queue_chain (GstPad * pad, GstBuffer * buffer)
 
   queue = GST_QUEUE (GST_OBJECT_PARENT (pad));
 
-  GST_STREAM_LOCK (pad);
-
   /* we have to lock the queue since we span threads */
   GST_QUEUE_MUTEX_LOCK;
 
@@ -633,6 +628,9 @@ gst_queue_chain (GstPad * pad, GstBuffer * buffer)
           STATUS (queue, "waiting for item_del signal from thread using qlock");
           g_cond_wait (queue->item_del, queue->qlock);
 
+          if (GST_RPAD_IS_FLUSHING (pad))
+            goto out_flushing;
+
           /* if there's a pending state change for this queue
            * or its manager, switch back to iterator so bottom
            * half of state change executes */
@@ -663,13 +661,11 @@ gst_queue_chain (GstPad * pad, GstBuffer * buffer)
   GST_CAT_LOG_OBJECT (queue_dataflow, queue, "signalling item_add");
   g_cond_signal (queue->item_add);
   GST_QUEUE_MUTEX_UNLOCK;
-  GST_STREAM_UNLOCK (pad);
 
   return GST_FLOW_OK;
 
 out_unref:
   GST_QUEUE_MUTEX_UNLOCK;
-  GST_STREAM_UNLOCK (pad);
 
   gst_buffer_unref (buffer);
 
@@ -678,8 +674,7 @@ out_unref:
 out_flushing:
   GST_CAT_LOG_OBJECT (queue_dataflow, queue, "exit because of flush");
   GST_QUEUE_MUTEX_UNLOCK;
-  gst_task_pause (GST_RPAD_TASK (queue->srcpad));
-  GST_STREAM_UNLOCK (pad);
+  gst_pad_pause_task (queue->srcpad);
 
   gst_buffer_unref (buffer);
 
@@ -695,8 +690,6 @@ gst_queue_loop (GstPad * pad)
 
   queue = GST_QUEUE (GST_PAD_PARENT (pad));
 
-  GST_STREAM_LOCK (pad);
-
   /* have to lock for thread-safety */
   GST_QUEUE_MUTEX_LOCK;
 
@@ -770,14 +763,12 @@ restart:
   GST_CAT_LOG_OBJECT (queue_dataflow, queue, "signalling item_del");
   g_cond_signal (queue->item_del);
   GST_QUEUE_MUTEX_UNLOCK;
-  GST_STREAM_UNLOCK (pad);
   return;
 
 out_flushing:
   GST_CAT_LOG_OBJECT (queue_dataflow, queue, "exit because of flush");
-  gst_task_pause (GST_RPAD_TASK (pad));
+  gst_pad_pause_task (pad);
   GST_QUEUE_MUTEX_UNLOCK;
-  GST_STREAM_UNLOCK (pad);
   return;
 }
 
@@ -860,17 +851,7 @@ gst_queue_src_activate (GstPad * pad, GstActivateMode mode)
   queue = GST_QUEUE (GST_OBJECT_PARENT (pad));
 
   if (mode == GST_ACTIVATE_PUSH) {
-    /* if we have a scheduler we can start the task */
-    if (GST_ELEMENT_SCHEDULER (queue)) {
-      GST_STREAM_LOCK (pad);
-      GST_RPAD_TASK (pad) =
-          gst_scheduler_create_task (GST_ELEMENT_SCHEDULER (queue),
-          (GstTaskFunction) gst_queue_loop, pad);
-
-      gst_task_start (GST_RPAD_TASK (pad));
-      GST_STREAM_UNLOCK (pad);
-      result = TRUE;
-    }
+    result = gst_pad_start_task (pad, (GstTaskFunction) gst_queue_loop, pad);
   } else {
     /* step 1, unblock chain and loop functions */
     GST_QUEUE_MUTEX_LOCK;
@@ -879,13 +860,7 @@ gst_queue_src_activate (GstPad * pad, GstActivateMode mode)
     GST_QUEUE_MUTEX_UNLOCK;
 
     /* step 2, make sure streaming finishes */
-    GST_STREAM_LOCK (pad);
-    /* step 3, stop the task */
-    gst_task_stop (GST_RPAD_TASK (pad));
-    gst_object_unref (GST_OBJECT (GST_RPAD_TASK (pad)));
-    GST_STREAM_UNLOCK (pad);
-
-    result = TRUE;
+    result = gst_pad_stop_task (pad);
   }
   return result;
 }
index cd7213c..9806606 100644 (file)
@@ -370,27 +370,10 @@ gst_tee_sink_activate (GstPad * pad, GstActivateMode mode)
       break;
     case GST_ACTIVATE_PULL:
       g_return_val_if_fail (tee->has_sink_loop, FALSE);
-      if (GST_ELEMENT_SCHEDULER (tee)) {
-        GST_STREAM_LOCK (pad);
-        GST_RPAD_TASK (pad) =
-            gst_scheduler_create_task (GST_ELEMENT_SCHEDULER (tee),
-            (GstTaskFunction) gst_tee_loop, pad);
-
-        gst_pad_start_task (pad);
-        GST_STREAM_UNLOCK (pad);
-        result = TRUE;
-      }
+      result = gst_pad_start_task (pad, (GstTaskFunction) gst_tee_loop, pad);
       break;
     case GST_ACTIVATE_NONE:
-      GST_STREAM_LOCK (pad);
-      if (GST_RPAD_TASK (pad)) {
-        gst_pad_stop_task (pad);
-        gst_object_unref (GST_OBJECT (GST_RPAD_TASK (pad)));
-        GST_RPAD_TASK (pad) = NULL;
-      }
-      GST_STREAM_UNLOCK (pad);
-
-      result = TRUE;
+      result = gst_pad_stop_task (pad);
       break;
   }
   tee->sink_mode = mode;