gst/: Removed atomic operations, use existing LOCK.
authorWim Taymans <wim.taymans@gmail.com>
Thu, 30 Jun 2005 09:23:54 +0000 (09:23 +0000)
committerWim Taymans <wim.taymans@gmail.com>
Thu, 30 Jun 2005 09:23:54 +0000 (09:23 +0000)
Original commit message from CVS:
* gst/gstpad.c: (_gst_do_pass_data_accumulator),
(default_have_data), (gst_pad_class_init), (gst_pad_init),
(gst_pad_emit_have_data_signal), (gst_pad_chain), (gst_pad_push),
(gst_pad_check_pull_range), (gst_pad_get_range),
(gst_pad_pull_range), (gst_pad_push_event), (gst_pad_send_event):
* gst/gstpad.h:
* gst/gstutils.c: (gst_atomic_int_set), (gst_pad_add_data_probe),
(gst_pad_add_event_probe), (gst_pad_add_buffer_probe),
(gst_pad_remove_data_probe), (gst_pad_remove_event_probe),
(gst_pad_remove_buffer_probe):
Removed atomic operations, use existing LOCK.
Move exception handling out of main code path.

ChangeLog
gst/gstpad.c
gst/gstpad.h
gst/gstutils.c

index 4d64eab..3084148 100644 (file)
--- a/ChangeLog
+++ b/ChangeLog
@@ -1,3 +1,18 @@
+2005-06-30  Wim Taymans  <wim@fluendo.com>
+
+       * gst/gstpad.c: (_gst_do_pass_data_accumulator),
+       (default_have_data), (gst_pad_class_init), (gst_pad_init),
+       (gst_pad_emit_have_data_signal), (gst_pad_chain), (gst_pad_push),
+       (gst_pad_check_pull_range), (gst_pad_get_range),
+       (gst_pad_pull_range), (gst_pad_push_event), (gst_pad_send_event):
+       * gst/gstpad.h:
+       * gst/gstutils.c: (gst_atomic_int_set), (gst_pad_add_data_probe),
+       (gst_pad_add_event_probe), (gst_pad_add_buffer_probe),
+       (gst_pad_remove_data_probe), (gst_pad_remove_event_probe),
+       (gst_pad_remove_buffer_probe):
+       Removed atomic operations, use existing LOCK.
+       Move exception handling out of main code path.
+
 2005-06-29  Ronald S. Bultje  <rbultje@ronald.bitfreak.net>
 
        * gst/gstpad.c: (_gst_do_pass_data_accumulator),
index 38bf4f6..67e9519 100644 (file)
@@ -125,16 +125,16 @@ static gboolean
 _gst_do_pass_data_accumulator (GSignalInvocationHint * ihint,
     GValue * return_accu, const GValue * handler_return, gpointer dummy)
 {
-  if (!g_value_get_boolean (handler_return)) {
-    g_value_set_boolean (return_accu, FALSE);
-    return FALSE;
-  }
+  gboolean ret = g_value_get_boolean (handler_return);
 
-  return TRUE;
+  GST_DEBUG ("accumulated %d", ret);
+  g_value_set_boolean (return_accu, ret);
+
+  return ret;
 }
 
 static gboolean
-silly_return_true_function (GstPad * pad, GstMiniObject * o)
+default_have_data (GstPad * pad, GstMiniObject * o)
 {
   return TRUE;
 }
@@ -174,7 +174,8 @@ gst_pad_class_init (GstPadClass * klass)
       g_signal_new ("have-data", G_TYPE_FROM_CLASS (klass),
       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstPadClass, have_data),
       _gst_do_pass_data_accumulator,
-      NULL, gst_marshal_BOOLEAN__POINTER, G_TYPE_BOOLEAN, 1, G_TYPE_POINTER);
+      NULL, gst_marshal_BOOLEAN__POINTER, G_TYPE_BOOLEAN, 1,
+      GST_TYPE_MINI_OBJECT);
 
   g_object_class_install_property (G_OBJECT_CLASS (klass), PAD_PROP_CAPS,
       g_param_spec_boxed ("caps", "Caps", "The capabilities of the pad",
@@ -192,7 +193,8 @@ gst_pad_class_init (GstPadClass * klass)
   gstobject_class->save_thyself = GST_DEBUG_FUNCPTR (gst_pad_save_thyself);
 #endif
   gstobject_class->path_string_separator = ".";
-  klass->have_data = silly_return_true_function;
+
+  klass->have_data = default_have_data;
 }
 
 static void
@@ -214,7 +216,8 @@ gst_pad_init (GstPad * pad)
   pad->queryfunc = gst_pad_query_default;
   pad->intlinkfunc = gst_pad_get_internal_links_default;
 
-  pad->emit_buffer_signals = pad->emit_event_signals = 0;
+  pad->do_buffer_signals = 0;
+  pad->do_event_signals = 0;
 
   GST_PAD_UNSET_FLUSHING (pad);
 
@@ -2713,9 +2716,10 @@ GstFlowReturn
 gst_pad_chain (GstPad * pad, GstBuffer * buffer)
 {
   GstCaps *caps;
-  gboolean caps_changed, do_pass = TRUE;
+  gboolean caps_changed;
   GstPadChainFunction chainfunc;
   GstFlowReturn ret;
+  gboolean emit_signal;
 
   g_return_val_if_fail (GST_IS_PAD (pad), GST_FLOW_ERROR);
   g_return_val_if_fail (GST_PAD_DIRECTION (pad) == GST_PAD_SINK,
@@ -2731,8 +2735,17 @@ gst_pad_chain (GstPad * pad, GstBuffer * buffer)
 
   caps = GST_BUFFER_CAPS (buffer);
   caps_changed = caps && caps != GST_PAD_CAPS (pad);
+
+  emit_signal = GST_PAD_DO_BUFFER_SIGNALS (pad) > 0;
   GST_UNLOCK (pad);
 
+  /* see if the signal should be emited, we emit before caps nego as
+   * we might drop the buffer and do capsnego for nothing. */
+  if (G_UNLIKELY (emit_signal)) {
+    if (!gst_pad_emit_have_data_signal (pad, GST_MINI_OBJECT (buffer)))
+      goto dropping;
+  }
+
   /* we got a new datatype on the pad, see if it can handle it */
   if (G_UNLIKELY (caps_changed)) {
     GST_DEBUG ("caps changed to %" GST_PTR_FORMAT, caps);
@@ -2748,21 +2761,11 @@ gst_pad_chain (GstPad * pad, GstBuffer * buffer)
   if (G_UNLIKELY ((chainfunc = GST_PAD_CHAINFUNC (pad)) == NULL))
     goto no_function;
 
-  if (g_atomic_int_get (&pad->emit_buffer_signals) >= 1) {
-    do_pass = gst_pad_emit_have_data_signal (pad, GST_MINI_OBJECT (buffer));
-  }
+  GST_CAT_LOG_OBJECT (GST_CAT_SCHEDULING, pad,
+      "calling chainfunction &%s of pad %s:%s",
+      GST_DEBUG_FUNCPTR_NAME (chainfunc), GST_DEBUG_PAD_NAME (pad));
 
-  if (do_pass) {
-    GST_CAT_LOG_OBJECT (GST_CAT_SCHEDULING, pad,
-        "calling chainfunction &%s of pad %s:%s",
-        GST_DEBUG_FUNCPTR_NAME (chainfunc), GST_DEBUG_PAD_NAME (pad));
-
-    ret = chainfunc (pad, buffer);
-  } else {
-    GST_DEBUG ("Dropping buffer due to FALSE probe return");
-    gst_buffer_unref (buffer);
-    ret = GST_FLOW_UNEXPECTED;
-  }
+  ret = chainfunc (pad, buffer);
 
   GST_STREAM_UNLOCK (pad);
 
@@ -2778,6 +2781,14 @@ flushing:
     GST_STREAM_UNLOCK (pad);
     return GST_FLOW_UNEXPECTED;
   }
+dropping:
+  {
+    gst_buffer_unref (buffer);
+    GST_DEBUG ("Dropping buffer due to FALSE probe return");
+    GST_STREAM_UNLOCK (pad);
+    /* FIXME, failure? */
+    return GST_FLOW_UNEXPECTED;
+  }
 not_negotiated:
   {
     gst_buffer_unref (buffer);
@@ -2815,7 +2826,7 @@ gst_pad_push (GstPad * pad, GstBuffer * buffer)
 {
   GstPad *peer;
   GstFlowReturn ret;
-  gboolean do_pass = TRUE;
+  gboolean emit_signal;
 
   g_return_val_if_fail (GST_IS_PAD (pad), GST_FLOW_ERROR);
   g_return_val_if_fail (GST_PAD_DIRECTION (pad) == GST_PAD_SRC, GST_FLOW_ERROR);
@@ -2829,20 +2840,19 @@ gst_pad_push (GstPad * pad, GstBuffer * buffer)
   if (G_UNLIKELY ((peer = GST_PAD_PEER (pad)) == NULL))
     goto not_linked;
 
+  /* we emit signals on the pad areg, the peer will have a chance to
+   * emit in the _chain() function */
+  emit_signal = GST_PAD_DO_BUFFER_SIGNALS (pad) > 0;
+
   gst_object_ref (peer);
   GST_UNLOCK (pad);
 
-  if (g_atomic_int_get (&pad->emit_buffer_signals) >= 1) {
-    do_pass = gst_pad_emit_have_data_signal (pad, GST_MINI_OBJECT (buffer));
+  if (G_UNLIKELY (emit_signal)) {
+    if (!gst_pad_emit_have_data_signal (pad, GST_MINI_OBJECT (buffer)))
+      goto dropping;
   }
 
-  if (do_pass) {
-    ret = gst_pad_chain (peer, buffer);
-  } else {
-    GST_DEBUG ("Dropping buffer due to FALSE probe return");
-    gst_buffer_unref (buffer);
-    ret = GST_FLOW_UNEXPECTED;
-  }
+  ret = gst_pad_chain (peer, buffer);
 
   gst_object_unref (peer);
 
@@ -2857,6 +2867,14 @@ not_linked:
     GST_UNLOCK (pad);
     return GST_FLOW_NOT_LINKED;
   }
+dropping:
+  {
+    gst_buffer_unref (buffer);
+    gst_object_unref (peer);
+    GST_DEBUG ("Dropping buffer due to FALSE probe return");
+    /* FIXME, failure? */
+    return GST_FLOW_UNEXPECTED;
+  }
 }
 
 /**
@@ -2896,6 +2914,7 @@ gst_pad_check_pull_range (GstPad * pad)
 
   /* see note in above function */
   if (G_LIKELY ((checkgetrangefunc = peer->checkgetrangefunc) == NULL)) {
+    /* FIXME, kindoff ghetto */
     ret = GST_PAD_GETRANGEFUNC (peer) != NULL;
   } else {
     GST_CAT_LOG_OBJECT (GST_CAT_SCHEDULING, pad,
@@ -2943,6 +2962,7 @@ gst_pad_get_range (GstPad * pad, guint64 offset, guint size,
 {
   GstFlowReturn ret;
   GstPadGetRangeFunction getrangefunc;
+  gboolean emit_signal;
 
   g_return_val_if_fail (GST_IS_PAD (pad), GST_FLOW_ERROR);
   g_return_val_if_fail (GST_PAD_DIRECTION (pad) == GST_PAD_SRC, GST_FLOW_ERROR);
@@ -2953,6 +2973,8 @@ gst_pad_get_range (GstPad * pad, guint64 offset, guint size,
   GST_LOCK (pad);
   if (G_UNLIKELY (GST_PAD_IS_FLUSHING (pad)))
     goto flushing;
+
+  emit_signal = GST_PAD_DO_BUFFER_SIGNALS (pad) > 0;
   GST_UNLOCK (pad);
 
   if (G_UNLIKELY ((getrangefunc = GST_PAD_GETRANGEFUNC (pad)) == NULL))
@@ -2966,20 +2988,14 @@ gst_pad_get_range (GstPad * pad, guint64 offset, guint size,
 
   ret = getrangefunc (pad, offset, size, buffer);
 
-  GST_STREAM_UNLOCK (pad);
-
-  if (ret == GST_FLOW_OK && g_atomic_int_get (&pad->emit_buffer_signals) >= 1) {
-    gboolean do_pass = TRUE;
-
-    do_pass = gst_pad_emit_have_data_signal (pad, GST_MINI_OBJECT (*buffer));
-    if (!do_pass) {
-      GST_DEBUG ("Dropping data after FALSE probe return");
-      gst_buffer_unref (*buffer);
-      *buffer = NULL;
-      ret = GST_FLOW_UNEXPECTED;
-    }
+  /* can only fire the signal if we have a valid buffer */
+  if (G_UNLIKELY (emit_signal) && (ret == GST_FLOW_OK)) {
+    if (!gst_pad_emit_have_data_signal (pad, GST_MINI_OBJECT (*buffer)))
+      goto dropping;
   }
 
+  GST_STREAM_UNLOCK (pad);
+
   return ret;
 
   /* ERRORS */
@@ -2999,6 +3015,14 @@ no_function:
     GST_STREAM_UNLOCK (pad);
     return GST_FLOW_ERROR;
   }
+dropping:
+  {
+    GST_DEBUG ("Dropping data after FALSE probe return");
+    GST_STREAM_UNLOCK (pad);
+    gst_buffer_unref (*buffer);
+    *buffer = NULL;
+    return GST_FLOW_UNEXPECTED;
+  }
 }
 
 
@@ -3021,6 +3045,7 @@ gst_pad_pull_range (GstPad * pad, guint64 offset, guint size,
 {
   GstPad *peer;
   GstFlowReturn ret;
+  gboolean emit_signal;
 
   g_return_val_if_fail (GST_IS_PAD (pad), GST_FLOW_ERROR);
   g_return_val_if_fail (GST_PAD_DIRECTION (pad) == GST_PAD_SINK,
@@ -3035,6 +3060,10 @@ gst_pad_pull_range (GstPad * pad, guint64 offset, guint size,
   if (G_UNLIKELY ((peer = GST_PAD_PEER (pad)) == NULL))
     goto not_connected;
 
+  /* signal emision for the pad, peer has chance to emit when
+   * we call _get_range() */
+  emit_signal = GST_PAD_DO_BUFFER_SIGNALS (pad) > 0;
+
   gst_object_ref (peer);
   GST_UNLOCK (pad);
 
@@ -3042,18 +3071,11 @@ gst_pad_pull_range (GstPad * pad, guint64 offset, guint size,
 
   gst_object_unref (peer);
 
-  if (ret == GST_FLOW_OK && g_atomic_int_get (&pad->emit_buffer_signals) >= 1) {
-    gboolean do_pass = TRUE;
-
-    do_pass = gst_pad_emit_have_data_signal (pad, GST_MINI_OBJECT (*buffer));
-    if (!do_pass) {
-      GST_DEBUG ("Dropping data after FALSE probe return");
-      gst_buffer_unref (*buffer);
-      *buffer = NULL;
-      ret = GST_FLOW_UNEXPECTED;
-    }
+  /* can only fire the signal if we have a valid buffer */
+  if (G_UNLIKELY (emit_signal) && (ret == GST_FLOW_OK)) {
+    if (!gst_pad_emit_have_data_signal (pad, GST_MINI_OBJECT (*buffer)))
+      goto dropping;
   }
-
   return ret;
 
   /* ERROR recovery here */
@@ -3064,6 +3086,13 @@ not_connected:
     GST_UNLOCK (pad);
     return GST_FLOW_NOT_LINKED;
   }
+dropping:
+  {
+    GST_DEBUG ("Dropping data after FALSE probe return");
+    gst_buffer_unref (*buffer);
+    *buffer = NULL;
+    return GST_FLOW_UNEXPECTED;
+  }
 }
 
 /**
@@ -3083,7 +3112,8 @@ gboolean
 gst_pad_push_event (GstPad * pad, GstEvent * event)
 {
   GstPad *peerpad;
-  gboolean result, do_pass = TRUE;
+  gboolean result;
+  gboolean emit_signal;
 
   g_return_val_if_fail (GST_IS_PAD (pad), FALSE);
   g_return_val_if_fail (event != NULL, FALSE);
@@ -3093,20 +3123,17 @@ gst_pad_push_event (GstPad * pad, GstEvent * event)
   if (peerpad == NULL)
     goto not_linked;
 
+  emit_signal = GST_PAD_DO_EVENT_SIGNALS (pad) > 0;
+
   gst_object_ref (peerpad);
   GST_UNLOCK (pad);
 
-  if (g_atomic_int_get (&pad->emit_event_signals) >= 1) {
-    do_pass = gst_pad_emit_have_data_signal (pad, GST_MINI_OBJECT (event));
+  if (G_UNLIKELY (emit_signal)) {
+    if (!gst_pad_emit_have_data_signal (pad, GST_MINI_OBJECT (event)))
+      goto dropping;
   }
 
-  if (do_pass) {
-    result = gst_pad_send_event (peerpad, event);
-  } else {
-    GST_DEBUG ("Dropping event after FALSE probe return");
-    gst_event_unref (event);
-    result = FALSE;
-  }
+  result = gst_pad_send_event (peerpad, event);
 
   gst_object_unref (peerpad);
 
@@ -3119,6 +3146,13 @@ not_linked:
     GST_UNLOCK (pad);
     return FALSE;
   }
+dropping:
+  {
+    GST_DEBUG ("Dropping event after FALSE probe return");
+    gst_object_unref (peerpad);
+    gst_event_unref (event);
+    return FALSE;
+  }
 }
 
 /**
@@ -3134,8 +3168,9 @@ not_linked:
 gboolean
 gst_pad_send_event (GstPad * pad, GstEvent * event)
 {
-  gboolean result = FALSE, do_pass = TRUE;
+  gboolean result = FALSE;
   GstPadEventFunction eventfunc;
+  gboolean emit_signal;
 
   g_return_val_if_fail (GST_IS_PAD (pad), FALSE);
   g_return_val_if_fail (event != NULL, FALSE);
@@ -3173,20 +3208,17 @@ gst_pad_send_event (GstPad * pad, GstEvent * event)
   if ((eventfunc = GST_PAD_EVENTFUNC (pad)) == NULL)
     goto no_function;
 
+  emit_signal = GST_PAD_DO_EVENT_SIGNALS (pad) > 0;
+
   gst_object_ref (pad);
   GST_UNLOCK (pad);
 
-  if (g_atomic_int_get (&pad->emit_event_signals) >= 1) {
-    do_pass = gst_pad_emit_have_data_signal (pad, GST_MINI_OBJECT (event));
+  if (G_UNLIKELY (emit_signal)) {
+    if (!gst_pad_emit_have_data_signal (pad, GST_MINI_OBJECT (event)))
+      goto dropping;
   }
 
-  if (do_pass) {
-    result = eventfunc (GST_PAD_CAST (pad), event);
-  } else {
-    GST_DEBUG ("Dropping event after FALSE probe return");
-    gst_event_unref (event);
-    result = FALSE;
-  }
+  result = eventfunc (GST_PAD_CAST (pad), event);
 
   gst_object_unref (pad);
 
@@ -3208,6 +3240,13 @@ flushing:
     gst_event_unref (event);
     return FALSE;
   }
+dropping:
+  {
+    GST_DEBUG ("Dropping event after FALSE probe return");
+    gst_object_unref (pad);
+    gst_event_unref (event);
+    return FALSE;
+  }
 }
 
 /************************************************************************
index 6b889c2..29c2c1a 100644 (file)
@@ -199,8 +199,10 @@ struct _GstPad {
 
   GstPadBufferAllocFunction      bufferallocfunc;
 
-  /* whether to emit signals for have-data */
-  gint                          emit_buffer_signals, emit_event_signals;
+  /* whether to emit signals for have-data. counts number
+   * of handlers attached. */
+  gint                          do_buffer_signals;
+  gint                          do_event_signals;
 
   /*< private >*/
   gpointer _gst_reserved[GST_PADDING];
@@ -253,8 +255,11 @@ struct _GstPadClass {
 
 #define GST_PAD_BUFFERALLOCFUNC(pad)   (GST_PAD_CAST(pad)->bufferallocfunc)
 
+#define GST_PAD_DO_BUFFER_SIGNALS(pad)         (GST_PAD_CAST(pad)->do_buffer_signals)
+#define GST_PAD_DO_EVENT_SIGNALS(pad)  (GST_PAD_CAST(pad)->do_event_signals)
+
 #define GST_PAD_IS_LINKED(pad)         (GST_PAD_PEER(pad) != NULL)
-#define GST_PAD_IS_BLOCKED(pad)        (GST_FLAG_IS_SET (pad, GST_PAD_BLOCKED))
+#define GST_PAD_IS_BLOCKED(pad)                (GST_FLAG_IS_SET (pad, GST_PAD_BLOCKED))
 #define GST_PAD_IS_FLUSHING(pad)       (GST_FLAG_IS_SET (pad, GST_PAD_FLUSHING))
 #define GST_PAD_IS_IN_GETCAPS(pad)     (GST_FLAG_IS_SET (pad, GST_PAD_IN_GETCAPS))
 #define GST_PAD_IS_IN_SETCAPS(pad)     (GST_FLAG_IS_SET (pad, GST_PAD_IN_SETCAPS))
index 32c35bc..c0a5481 100644 (file)
@@ -1802,9 +1802,9 @@ gst_buffer_merge (GstBuffer * buf1, GstBuffer * buf2)
 }
 
 /**
- * gst_buffer_merge:
- * @buf1: a first source #GstBuffer to merge.
- * @buf2: the second source #GstBuffer to merge.
+ * gst_buffer_join:
+ * @buf1: a first source #GstBuffer to join.
+ * @buf2: the second source #GstBuffer to join.
  *
  * Create a new buffer that is the concatenation of the two source
  * buffers, and takes ownership of the original source buffers.
@@ -2070,6 +2070,7 @@ gst_atomic_int_set (gint * atomic_int, gint value)
   int ignore;
 
   *atomic_int = value;
+  /* read acts as a memory barrier */
   ignore = g_atomic_int_get (atomic_int);
 }
 
@@ -2080,7 +2081,7 @@ gst_atomic_int_set (gint * atomic_int, gint value)
  * @data: data to pass along with the handler
  *
  * Connects a signal handler to the pad's have-data signal, and increases
- * the emit_{buffer,event}_signals atomic number on the pads so that those
+ * the do_{buffer,event}_signals number on the pads so that those
  * signals are actually fired.
  */
 
@@ -2090,9 +2091,14 @@ gst_pad_add_data_probe (GstPad * pad, GCallback handler, gpointer data)
   g_return_if_fail (GST_IS_PAD (pad));
   g_return_if_fail (handler != NULL);
 
+  GST_LOCK (pad);
   g_signal_connect (pad, "have-data", handler, data);
-  g_atomic_int_inc (&pad->emit_event_signals);
-  g_atomic_int_inc (&pad->emit_buffer_signals);
+  GST_PAD_DO_EVENT_SIGNALS (pad)++;
+  GST_PAD_DO_BUFFER_SIGNALS (pad)++;
+  GST_DEBUG ("adding data probe to pad %s:%s, now %d data, %d event probes",
+      GST_DEBUG_PAD_NAME (pad),
+      GST_PAD_DO_BUFFER_SIGNALS (pad), GST_PAD_DO_EVENT_SIGNALS (pad));
+  GST_UNLOCK (pad);
 }
 
 /**
@@ -2102,7 +2108,7 @@ gst_pad_add_data_probe (GstPad * pad, GCallback handler, gpointer data)
  * @data: data to pass along with the handler
  *
  * Connects a signal handler to the pad's have-data signal, and increases
- * the emit_event_signals atomic number on the pads so that this signal
+ * the do_event_signals number on the pads so that this signal
  * is actually fired.
  */
 
@@ -2112,8 +2118,12 @@ gst_pad_add_event_probe (GstPad * pad, GCallback handler, gpointer data)
   g_return_if_fail (GST_IS_PAD (pad));
   g_return_if_fail (handler != NULL);
 
+  GST_LOCK (pad);
   g_signal_connect (pad, "have-data", handler, data);
-  g_atomic_int_inc (&pad->emit_event_signals);
+  GST_PAD_DO_EVENT_SIGNALS (pad)++;
+  GST_DEBUG ("adding event probe to pad %s:%s, now %d probes",
+      GST_DEBUG_PAD_NAME (pad), GST_PAD_DO_EVENT_SIGNALS (pad));
+  GST_UNLOCK (pad);
 }
 
 /**
@@ -2123,7 +2133,7 @@ gst_pad_add_event_probe (GstPad * pad, GCallback handler, gpointer data)
  * @data: data to pass along with the handler
  *
  * Connects a signal handler to the pad's have-data signal, and increases
- * the emit_buffer_signals atomic number on the pads so that this signal
+ * the do_buffer_signals number on the pads so that this signal
  * is actually fired.
  */
 
@@ -2133,12 +2143,14 @@ gst_pad_add_buffer_probe (GstPad * pad, GCallback handler, gpointer data)
   g_return_if_fail (GST_IS_PAD (pad));
   g_return_if_fail (handler != NULL);
 
+  GST_LOCK (pad);
   g_signal_connect (pad, "have-data", handler, data);
-  g_atomic_int_inc (&pad->emit_buffer_signals);
+  GST_PAD_DO_BUFFER_SIGNALS (pad)++;
+  GST_DEBUG ("adding buffer probe to pad %s:%s, now %d probes",
+      GST_DEBUG_PAD_NAME (pad), GST_PAD_DO_BUFFER_SIGNALS (pad));
+  GST_UNLOCK (pad);
 }
 
-#define g_atomic_int_dec(atomic) (g_atomic_int_add ((atomic), -1))
-
 /**
  * gst_pad_remove_data_probe:
  * @pad: pad to remove the data probe handler from
@@ -2146,19 +2158,27 @@ gst_pad_add_buffer_probe (GstPad * pad, GCallback handler, gpointer data)
  * @data: data that was assigned to the signal handler
  *
  * Unconnects a signal handler to the pad's have-data signal, and decreases
- * the emit_{buffer,event}_signals atomic number on the pads so that those
+ * the do_{buffer,event}_signals number on the pads so that those
  * signals are actually no more fired if no signals are connected.
  */
 
 void
 gst_pad_remove_data_probe (GstPad * pad, GCallback handler, gpointer data)
 {
+  guint count;
+
   g_return_if_fail (GST_IS_PAD (pad));
   g_return_if_fail (handler != NULL);
 
-  g_signal_handlers_disconnect_by_func (pad, handler, data);
-  g_atomic_int_dec (&pad->emit_event_signals);
-  g_atomic_int_dec (&pad->emit_buffer_signals);
+  GST_LOCK (pad);
+  count = g_signal_handlers_disconnect_by_func (pad, handler, data);
+  GST_PAD_DO_BUFFER_SIGNALS (pad) -= count;
+  GST_PAD_DO_EVENT_SIGNALS (pad) -= count;
+  GST_DEBUG
+      ("removing %d data probes from pad %s:%s, now %d event, %d buffer probes",
+      count, GST_DEBUG_PAD_NAME (pad), GST_PAD_DO_EVENT_SIGNALS (pad),
+      GST_PAD_DO_BUFFER_SIGNALS (pad));
+  GST_UNLOCK (pad);
 }
 
 /**
@@ -2168,18 +2188,24 @@ gst_pad_remove_data_probe (GstPad * pad, GCallback handler, gpointer data)
  * @data: data that was assigned to the signal handler
  *
  * Unconnects a signal handler to the pad's have-data signal, and decreases
- * the emit_event_signals atomic number on the pads so that this signal is
+ * the do_event_signals number on the pads so that this signal is
  * actually no more fired if no signals are connected.
  */
 
 void
 gst_pad_remove_event_probe (GstPad * pad, GCallback handler, gpointer data)
 {
+  guint count;
+
   g_return_if_fail (GST_IS_PAD (pad));
   g_return_if_fail (handler != NULL);
 
-  g_signal_handlers_disconnect_by_func (pad, handler, data);
-  g_atomic_int_dec (&pad->emit_event_signals);
+  GST_LOCK (pad);
+  count = g_signal_handlers_disconnect_by_func (pad, handler, data);
+  GST_PAD_DO_EVENT_SIGNALS (pad) -= count;
+  GST_DEBUG ("removing %d event probes from pad %s:%s, now %d event probes",
+      count, GST_DEBUG_PAD_NAME (pad), GST_PAD_DO_EVENT_SIGNALS (pad));
+  GST_UNLOCK (pad);
 }
 
 /**
@@ -2189,16 +2215,22 @@ gst_pad_remove_event_probe (GstPad * pad, GCallback handler, gpointer data)
  * @data: data that was assigned to the signal handler
  *
  * Unconnects a signal handler to the pad's have-data signal, and decreases
- * the emit_buffer_signals atomic number on the pads so that this signal is
+ * the emit_buffer_signals number on the pads so that this signal is
  * actually no more fired if no signals are connected.
  */
 
 void
 gst_pad_remove_buffer_probe (GstPad * pad, GCallback handler, gpointer data)
 {
+  guint count;
+
   g_return_if_fail (GST_IS_PAD (pad));
   g_return_if_fail (handler != NULL);
 
-  g_signal_handlers_disconnect_by_func (pad, handler, data);
-  g_atomic_int_dec (&pad->emit_buffer_signals);
+  GST_LOCK (pad);
+  count = g_signal_handlers_disconnect_by_func (pad, handler, data);
+  GST_PAD_DO_BUFFER_SIGNALS (pad) -= count;
+  GST_DEBUG ("removing %d buffer probes from pad %s:%s, now %d buffer probes",
+      count, GST_DEBUG_PAD_NAME (pad), GST_PAD_DO_BUFFER_SIGNALS (pad));
+  GST_UNLOCK (pad);
 }