gstpad: Add a new GST_PROBE_HANDLED return value for probes
authorEdward Hervey <edward@centricular.com>
Wed, 29 Apr 2015 13:49:17 +0000 (15:49 +0200)
committerEdward Hervey <bilboed@bilboed.com>
Sat, 15 Aug 2015 15:00:12 +0000 (17:00 +0200)
In some cases, probes might want to handle the buffer/event/query
themselves and stop the data from travelling further downstream.

While this was somewhat possible with buffer/events and using
GST_PROBE_DROP, it was not applicable to queries, and would result
in the query failing.

With this new GST_PROBE_HANDLED value, the buffer/event/query will
be considered as successfully handled, will not be pushed further
and the appropriate return value (TRUE or GST_FLOW_OK) will be returned

This also allows probes to return a non-default GstFlowReturn when dealing
with buffer push. This can be done by setting the
GST_PAD_PROBE_INFO_FLOW_RETURN() field accordingly

https://bugzilla.gnome.org/show_bug.cgi?id=748643

gst/gstpad.c
gst/gstpad.h
tests/check/gst/gstpad.c

index 49a3e954bf0a571e832ac1e555f942a258a6a397..5a7fc0ad264a9d4b996cdd88d2fd046f8f7ec114 100644 (file)
@@ -163,6 +163,7 @@ typedef struct
   GstPadProbeInfo *info;
   gboolean dropped;
   gboolean pass;
+  gboolean handled;
   gboolean marshalled;
   guint cookie;
 } ProbeMarshall;
@@ -1420,6 +1421,9 @@ gst_pad_add_probe (GstPad * pad, GstPadProbeType mask,
         case GST_PAD_PROBE_OK:
           GST_DEBUG_OBJECT (pad, "probe returned OK");
           break;
+        case GST_PAD_PROBE_HANDLED:
+          GST_DEBUG_OBJECT (pad, "probe handled the data");
+          break;
         default:
           GST_DEBUG_OBJECT (pad, "probe returned %d", ret);
           break;
@@ -3363,6 +3367,10 @@ probe_hook_marshal (GHook * hook, ProbeMarshall * data)
       info->type = GST_PAD_PROBE_TYPE_INVALID;
       data->dropped = TRUE;
       break;
+    case GST_PAD_PROBE_HANDLED:
+      GST_DEBUG_OBJECT (pad, "probe handled data");
+      data->handled = TRUE;
+      break;
     case GST_PAD_PROBE_PASS:
       /* inform the pad block to let things pass */
       GST_DEBUG_OBJECT (pad, "asked to pass item");
@@ -3392,31 +3400,40 @@ no_match:
     if (G_UNLIKELY (pad->num_probes)) {                                \
       GstFlowReturn pval = defaultval;                         \
       /* pass NULL as the data item */                          \
-      GstPadProbeInfo info = { mask, 0, NULL, 0, 0 };           \
+      GstPadProbeInfo info = { mask, 0, NULL, 0, 0 };          \
+      info.ABI.abi.flow_ret = defaultval;                      \
       ret = do_probe_callbacks (pad, &info, defaultval);       \
       if (G_UNLIKELY (ret != pval && ret != GST_FLOW_OK))      \
         goto label;                                            \
     }                                                          \
   } G_STMT_END
 
-#define PROBE_FULL(pad,mask,data,offs,size,label)               \
-  G_STMT_START {                                               \
-    if (G_UNLIKELY (pad->num_probes)) {                                \
-      /* pass the data item */                                  \
-      GstPadProbeInfo info = { mask, 0, data, offs, size };     \
-      ret = do_probe_callbacks (pad, &info, GST_FLOW_OK);      \
-      /* store the possibly updated data item */                \
-      data = GST_PAD_PROBE_INFO_DATA (&info);                   \
-      /* if something went wrong, exit */                       \
-      if (G_UNLIKELY (ret != GST_FLOW_OK))                     \
-        goto label;                                            \
-    }                                                          \
+#define PROBE_FULL(pad,mask,data,offs,size,label,handleable,handle_label) \
+  G_STMT_START {                                                       \
+    if (G_UNLIKELY (pad->num_probes)) {                                        \
+      /* pass the data item */                                         \
+      GstPadProbeInfo info = { mask, 0, data, offs, size };            \
+      info.ABI.abi.flow_ret = GST_FLOW_OK;                             \
+      ret = do_probe_callbacks (pad, &info, GST_FLOW_OK);              \
+      /* store the possibly updated data item */                       \
+      data = GST_PAD_PROBE_INFO_DATA (&info);                          \
+      /* if something went wrong, exit */                              \
+      if (G_UNLIKELY (ret != GST_FLOW_OK)) {                           \
+       if (handleable && ret == GST_FLOW_CUSTOM_SUCCESS_1) {           \
+         ret = info.ABI.abi.flow_ret;                                          \
+         goto handle_label;                                            \
+       }                                                               \
+       goto label;                                                     \
+      }                                                                        \
+    }                                                                  \
   } G_STMT_END
 
-#define PROBE_PUSH(pad,mask,data,label)                                 \
-  PROBE_FULL(pad, mask, data, -1, -1, label);
-#define PROBE_PULL(pad,mask,data,offs,size,label)               \
-  PROBE_FULL(pad, mask, data, offs, size, label);
+#define PROBE_PUSH(pad,mask,data,label)                \
+  PROBE_FULL(pad, mask, data, -1, -1, label, FALSE, label);
+#define PROBE_HANDLE(pad,mask,data,label,handle_label) \
+  PROBE_FULL(pad, mask, data, -1, -1, label, TRUE, handle_label);
+#define PROBE_PULL(pad,mask,data,offs,size,label)              \
+  PROBE_FULL(pad, mask, data, offs, size, label, FALSE, label);
 
 static GstFlowReturn
 do_pad_idle_probe_wait (GstPad * pad)
@@ -3461,6 +3478,7 @@ do_probe_callbacks (GstPad * pad, GstPadProbeInfo * info,
   data.pad = pad;
   data.info = info;
   data.pass = FALSE;
+  data.handled = FALSE;
   data.marshalled = FALSE;
   data.dropped = FALSE;
   data.cookie = ++pad->priv->probe_cookie;
@@ -3493,6 +3511,11 @@ again:
   if (data.dropped)
     goto dropped;
 
+  /* If one handler took care of it, let the the item pass */
+  if (data.handled) {
+    goto handled;
+  }
+
   /* if no handler matched and we are blocking, let the item pass */
   if (!data.marshalled && is_block)
     goto passed;
@@ -3554,6 +3577,11 @@ passed:
     GST_DEBUG_OBJECT (pad, "data is passed");
     return GST_FLOW_OK;
   }
+handled:
+  {
+    GST_DEBUG_OBJECT (pad, "data was handled");
+    return GST_FLOW_CUSTOM_SUCCESS_1;
+  }
 }
 
 /* pad offsets */
@@ -3855,9 +3883,12 @@ probe_stopped:
     if (G_UNLIKELY (serialized))
       GST_PAD_STREAM_UNLOCK (pad);
 
-    /* if a probe dropped, we don't sent it further but assume that the probe
-     * did not answer the query and return FALSE */
-    res = FALSE;
+    /* if a probe dropped without handling, we don't sent it further but assume
+     * that the probe did not answer the query and return FALSE */
+    if (ret != GST_FLOW_CUSTOM_SUCCESS_1)
+      res = FALSE;
+    else
+      res = TRUE;
 
     return res;
   }
@@ -3969,9 +4000,12 @@ probe_stopped:
     GST_DEBUG_OBJECT (pad, "probe stopped: %s", gst_flow_get_name (ret));
     GST_OBJECT_UNLOCK (pad);
 
-    /* if a probe dropped, we don't sent it further but assume that the probe
-     * did not answer the query and return FALSE */
-    res = FALSE;
+    /* if a probe dropped without handling, we don't sent it further but
+     * assume that the probe did not answer the query and return FALSE */
+    if (ret != GST_FLOW_CUSTOM_SUCCESS_1)
+      res = FALSE;
+    else
+      res = TRUE;
 
     return res;
   }
@@ -3989,6 +4023,7 @@ gst_pad_chain_data_unchecked (GstPad * pad, GstPadProbeType type, void *data)
 {
   GstFlowReturn ret;
   GstObject *parent;
+  gboolean handled = FALSE;
 
   GST_PAD_STREAM_LOCK (pad);
 
@@ -4018,9 +4053,10 @@ gst_pad_chain_data_unchecked (GstPad * pad, GstPadProbeType type, void *data)
   }
 #endif
 
-  PROBE_PUSH (pad, type | GST_PAD_PROBE_TYPE_BLOCK, data, probe_stopped);
+  PROBE_HANDLE (pad, type | GST_PAD_PROBE_TYPE_BLOCK, data, probe_stopped,
+      probe_handled);
 
-  PROBE_PUSH (pad, type, data, probe_stopped);
+  PROBE_HANDLE (pad, type, data, probe_stopped, probe_handled);
 
   ACQUIRE_PARENT (pad, parent, no_parent);
   GST_OBJECT_UNLOCK (pad);
@@ -4095,15 +4131,21 @@ wrong_mode:
     gst_mini_object_unref (GST_MINI_OBJECT_CAST (data));
     return GST_FLOW_ERROR;
   }
+probe_handled:
+  handled = TRUE;
+  /* PASSTHROUGH */
 probe_stopped:
   {
     GST_OBJECT_UNLOCK (pad);
     GST_PAD_STREAM_UNLOCK (pad);
-    gst_mini_object_unref (GST_MINI_OBJECT_CAST (data));
+    /* We unref the buffer, except if the probe handled it (CUSTOM_SUCCESS_1) */
+    if (!handled)
+      gst_mini_object_unref (GST_MINI_OBJECT_CAST (data));
 
     switch (ret) {
       case GST_FLOW_CUSTOM_SUCCESS:
-        GST_DEBUG_OBJECT (pad, "dropped buffer");
+      case GST_FLOW_CUSTOM_SUCCESS_1:
+        GST_DEBUG_OBJECT (pad, "dropped or handled buffer");
         ret = GST_FLOW_OK;
         break;
       default:
@@ -4236,6 +4278,7 @@ gst_pad_push_data (GstPad * pad, GstPadProbeType type, void *data)
 {
   GstPad *peer;
   GstFlowReturn ret;
+  gboolean handled = FALSE;
 
   GST_OBJECT_LOCK (pad);
   if (G_UNLIKELY (GST_PAD_IS_FLUSHING (pad)))
@@ -4267,14 +4310,15 @@ gst_pad_push_data (GstPad * pad, GstPadProbeType type, void *data)
     goto events_error;
 
   /* do block probes */
-  PROBE_PUSH (pad, type | GST_PAD_PROBE_TYPE_BLOCK, data, probe_stopped);
+  PROBE_HANDLE (pad, type | GST_PAD_PROBE_TYPE_BLOCK, data, probe_stopped,
+      probe_handled);
 
   /* recheck sticky events because the probe might have cause a relink */
   if (G_UNLIKELY ((ret = check_sticky (pad, NULL))) != GST_FLOW_OK)
     goto events_error;
 
   /* do post-blocking probes */
-  PROBE_PUSH (pad, type, data, probe_stopped);
+  PROBE_HANDLE (pad, type, data, probe_stopped, probe_handled);
 
   if (G_UNLIKELY ((peer = GST_PAD_PEER (pad)) == NULL))
     goto not_linked;
@@ -4338,23 +4382,26 @@ events_error:
     gst_mini_object_unref (GST_MINI_OBJECT_CAST (data));
     return ret;
   }
+probe_handled:
+  handled = TRUE;
+  /* PASSTHROUGH */
 probe_stopped:
   {
     GST_OBJECT_UNLOCK (pad);
-    pad->ABI.abi.last_flowret =
-        ret == GST_FLOW_CUSTOM_SUCCESS ? GST_FLOW_OK : ret;
-    if (data != NULL)
+    if (data != NULL && !handled)
       gst_mini_object_unref (GST_MINI_OBJECT_CAST (data));
 
     switch (ret) {
       case GST_FLOW_CUSTOM_SUCCESS:
-        GST_DEBUG_OBJECT (pad, "dropped buffer");
+      case GST_FLOW_CUSTOM_SUCCESS_1:
+        GST_DEBUG_OBJECT (pad, "dropped or handled buffer");
         ret = GST_FLOW_OK;
         break;
       default:
         GST_DEBUG_OBJECT (pad, "an error occurred %s", gst_flow_get_name (ret));
         break;
     }
+    pad->ABI.abi.last_flowret = ret;
     return ret;
   }
 not_linked:
@@ -5102,9 +5149,13 @@ inactive:
 probe_stopped:
   {
     GST_OBJECT_FLAG_SET (pad, GST_PAD_FLAG_PENDING_EVENTS);
-    gst_event_unref (event);
+    if (ret != GST_FLOW_CUSTOM_SUCCESS_1)
+      gst_event_unref (event);
 
     switch (ret) {
+      case GST_FLOW_CUSTOM_SUCCESS_1:
+        GST_DEBUG_OBJECT (pad, "handled event");
+        break;
       case GST_FLOW_CUSTOM_SUCCESS:
         GST_DEBUG_OBJECT (pad, "dropped event");
         break;
@@ -5200,7 +5251,8 @@ gst_pad_push_event (GstPad * pad, GstEvent * event)
     /* other events are pushed right away */
     ret = gst_pad_push_event_unchecked (pad, event, type);
     /* dropped events by a probe are not an error */
-    res = (ret == GST_FLOW_OK || ret == GST_FLOW_CUSTOM_SUCCESS);
+    res = (ret == GST_FLOW_OK || ret == GST_FLOW_CUSTOM_SUCCESS
+        || ret == GST_FLOW_CUSTOM_SUCCESS_1);
   } else {
     /* Errors in sticky event pushing are no problem and ignored here
      * as they will cause more meaningful errors during data flow.
@@ -5467,11 +5519,14 @@ probe_stopped:
     GST_OBJECT_UNLOCK (pad);
     if (need_unlock)
       GST_PAD_STREAM_UNLOCK (pad);
-    gst_event_unref (event);
+    /* Only unref if unhandled */
+    if (ret != GST_FLOW_CUSTOM_SUCCESS_1)
+      gst_event_unref (event);
 
     switch (ret) {
+      case GST_FLOW_CUSTOM_SUCCESS_1:
       case GST_FLOW_CUSTOM_SUCCESS:
-        GST_DEBUG_OBJECT (pad, "dropped event");
+        GST_DEBUG_OBJECT (pad, "dropped or handled event");
         ret = GST_FLOW_OK;
         break;
       default:
index 89a24d42a3ac44d9834d2b95b82d55cd43c5b5ac..157d147b77c52a91ad3eb4be87cc327afc23e0d4 100644 (file)
@@ -514,6 +514,14 @@ typedef enum
  * @GST_PAD_PROBE_REMOVE: remove this probe.
  * @GST_PAD_PROBE_PASS: pass the data item in the block probe and block on the
  *        next item.
+ * @GST_PAD_PROBE_HANDLED: Data has been handled in the probe and will not be
+ *        forwarded further. For events and buffers this is the same behaviour as
+ *        @GST_PAD_PROBE_DROP (except that in this case you need to unref the buffer
+ *        or event yourself). For queries it will also return %TRUE to the caller.
+ *        The probe can also modify the #GstFlowReturn value by using the
+ *        #GST_PAD_PROBE_INFO_FLOW_RETURN() accessor.
+ *        Note that the resulting query must contain valid entries.
+ *        Since: 1.6
  *
  * Different return values for the #GstPadProbeCallback.
  */
@@ -523,6 +531,7 @@ typedef enum
   GST_PAD_PROBE_OK,
   GST_PAD_PROBE_REMOVE,
   GST_PAD_PROBE_PASS,
+  GST_PAD_PROBE_HANDLED
 } GstPadProbeReturn;
 
 
@@ -548,12 +557,18 @@ struct _GstPadProbeInfo
   guint size;
 
   /*< private >*/
-  gpointer _gst_reserved[GST_PADDING];
+  union {
+    gpointer _gst_reserved[GST_PADDING];
+    struct {
+      GstFlowReturn flow_ret;
+    } abi;
+  } ABI;
 };
 
 #define GST_PAD_PROBE_INFO_TYPE(d)         ((d)->type)
 #define GST_PAD_PROBE_INFO_ID(d)           ((d)->id)
 #define GST_PAD_PROBE_INFO_DATA(d)         ((d)->data)
+#define GST_PAD_PROBE_INFO_FLOW_RETURN(d)  ((d)->ABI.abi.flow_ret)
 
 #define GST_PAD_PROBE_INFO_BUFFER(d)       GST_BUFFER_CAST(GST_PAD_PROBE_INFO_DATA(d))
 #define GST_PAD_PROBE_INFO_BUFFER_LIST(d)  GST_BUFFER_LIST_CAST(GST_PAD_PROBE_INFO_DATA(d))
index 544744499941ed0bf5f78514d43dac7b281d62cf..f74318f92b63e6693f2fa9d6763fd5d241cbed27 100644 (file)
@@ -595,20 +595,111 @@ GST_END_TEST;
 static GstPadProbeReturn
 _probe_handler (GstPad * pad, GstPadProbeInfo * info, gpointer userdata)
 {
-  gint ret = GPOINTER_TO_INT (userdata);
+  GstPadProbeReturn ret = (GstPadProbeReturn) GPOINTER_TO_INT (userdata);
 
-  if (ret == 1)
-    return GST_PAD_PROBE_OK;
+  /* If we are handling the data, we unref it */
+  if (ret == GST_PAD_PROBE_HANDLED
+      && !(GST_PAD_PROBE_INFO_TYPE (info) & GST_PAD_PROBE_TYPE_QUERY_BOTH)) {
+    GST_DEBUG_OBJECT (pad, "Unreffing data");
+    gst_mini_object_unref (info->data);
+  }
+  return ret;
+}
+
+static GstPadProbeReturn
+_handled_probe_handler (GstPad * pad, GstPadProbeInfo * info, gpointer userdata)
+{
+  GstFlowReturn customflow = (GstFlowReturn) GPOINTER_TO_INT (userdata);
+
+  /* We are handling the data, we unref it */
+  if (!(GST_PAD_PROBE_INFO_TYPE (info) & GST_PAD_PROBE_TYPE_QUERY_BOTH))
+    gst_mini_object_unref (info->data);
+  GST_PAD_PROBE_INFO_FLOW_RETURN (info) = customflow;
+
+  return GST_PAD_PROBE_HANDLED;
+}
+
+
+
+GST_START_TEST (test_events_query_unlinked)
+{
+  GstPad *src;
+  GstCaps *caps;
+  gulong id;
+  GstQuery *query;
+
+  src = gst_pad_new ("src", GST_PAD_SRC);
+  fail_if (src == NULL);
+  caps = gst_pad_get_allowed_caps (src);
+  fail_unless (caps == NULL);
+
+  caps = gst_caps_from_string ("foo/bar");
+
+  gst_pad_set_active (src, TRUE);
+  fail_unless (gst_pad_push_event (src,
+          gst_event_new_stream_start ("test")) == TRUE);
+  gst_pad_set_caps (src, caps);
+  ASSERT_CAPS_REFCOUNT (caps, "caps", 2);
+  fail_unless (gst_pad_push_event (src,
+          gst_event_new_segment (&dummy_segment)) == TRUE);
+  ASSERT_CAPS_REFCOUNT (caps, "caps", 2);
+
+  /* Doing a query on an unlinked pad will return FALSE */
+  query = gst_query_new_duration (GST_FORMAT_TIME);
+  fail_unless (gst_pad_peer_query (src, query) == FALSE);
+  ASSERT_MINI_OBJECT_REFCOUNT (query, "query", 1);
+  gst_query_unref (query);
+
+  /* Add a probe that returns _DROP will make the event push return TRUE
+   * even if not linked */
+  GST_DEBUG ("event/query DROP");
+  id = gst_pad_add_probe (src,
+      GST_PAD_PROBE_TYPE_EVENT_BOTH | GST_PAD_PROBE_TYPE_QUERY_BOTH,
+      _probe_handler, GINT_TO_POINTER (GST_PAD_PROBE_DROP), NULL);
+  fail_unless (gst_pad_push_event (src,
+          gst_event_new_segment (&dummy_segment)) == TRUE);
+  /* Queries should stil fail */
+  query = gst_query_new_duration (GST_FORMAT_TIME);
+  fail_unless (gst_pad_peer_query (src, query) == FALSE);
+  ASSERT_MINI_OBJECT_REFCOUNT (query, "query", 1);
+  gst_query_unref (query);
+  gst_pad_remove_probe (src, id);
+
+  /* Add a probe that returns _HANDLED will make the event push return TRUE
+   * even if not linked */
+  GST_DEBUG ("event/query HANDLED");
+  id = gst_pad_add_probe (src,
+      GST_PAD_PROBE_TYPE_EVENT_BOTH | GST_PAD_PROBE_TYPE_QUERY_BOTH,
+      _probe_handler, GINT_TO_POINTER (GST_PAD_PROBE_HANDLED), NULL);
+  fail_unless (gst_pad_push_event (src,
+          gst_event_new_segment (&dummy_segment)) == TRUE);
 
-  return GST_PAD_PROBE_DROP;
+  /* Queries will succeed */
+  query = gst_query_new_duration (GST_FORMAT_TIME);
+  fail_unless (gst_pad_peer_query (src, query) == TRUE);
+  ASSERT_MINI_OBJECT_REFCOUNT (query, "query", 1);
+  gst_query_unref (query);
+  gst_pad_remove_probe (src, id);
+
+  /* cleanup */
+  ASSERT_CAPS_REFCOUNT (caps, "caps", 2);
+  ASSERT_OBJECT_REFCOUNT (src, "src", 1);
+
+  gst_object_unref (src);
+
+  ASSERT_CAPS_REFCOUNT (caps, "caps", 1);
+  gst_caps_unref (caps);
 }
 
+GST_END_TEST;
+
 GST_START_TEST (test_push_unlinked)
 {
   GstPad *src;
   GstCaps *caps;
   GstBuffer *buffer;
   gulong id;
+  GstFlowReturn fl;
 
   src = gst_pad_new ("src", GST_PAD_SRC);
   fail_if (src == NULL);
@@ -646,7 +737,19 @@ GST_START_TEST (test_push_unlinked)
    * to chain */
   GST_DEBUG ("push buffer drop");
   id = gst_pad_add_probe (src, GST_PAD_PROBE_TYPE_BUFFER,
-      _probe_handler, GINT_TO_POINTER (0), NULL);
+      _probe_handler, GINT_TO_POINTER (GST_PAD_PROBE_DROP), NULL);
+  buffer = gst_buffer_new ();
+  gst_buffer_ref (buffer);
+  fail_unless (gst_pad_push (src, buffer) == GST_FLOW_OK);
+  ASSERT_MINI_OBJECT_REFCOUNT (buffer, "buffer", 1);
+  gst_buffer_unref (buffer);
+  gst_pad_remove_probe (src, id);
+
+  /* adding a probe that returns _HANDLED will drop the buffer without trying
+   * to chain */
+  GST_DEBUG ("push buffer handled");
+  id = gst_pad_add_probe (src, GST_PAD_PROBE_TYPE_BUFFER,
+      _probe_handler, GINT_TO_POINTER (GST_PAD_PROBE_HANDLED), NULL);
   buffer = gst_buffer_new ();
   gst_buffer_ref (buffer);
   fail_unless (gst_pad_push (src, buffer) == GST_FLOW_OK);
@@ -658,7 +761,7 @@ GST_START_TEST (test_push_unlinked)
    * and hence drop because pad is unlinked */
   GST_DEBUG ("push buffer ok");
   id = gst_pad_add_probe (src, GST_PAD_PROBE_TYPE_BUFFER,
-      _probe_handler, GINT_TO_POINTER (1), NULL);
+      _probe_handler, GINT_TO_POINTER (GST_PAD_PROBE_OK), NULL);
   buffer = gst_buffer_new ();
   gst_buffer_ref (buffer);
   fail_unless (gst_pad_push (src, buffer) == GST_FLOW_NOT_LINKED);
@@ -666,6 +769,20 @@ GST_START_TEST (test_push_unlinked)
   gst_buffer_unref (buffer);
   gst_pad_remove_probe (src, id);
 
+  GST_DEBUG ("push buffer handled and custom return");
+  for (fl = GST_FLOW_NOT_SUPPORTED; fl <= GST_FLOW_OK; fl += 1) {
+    GST_DEBUG ("Testing with %s", gst_flow_get_name (fl));
+    id = gst_pad_add_probe (src, GST_PAD_PROBE_TYPE_BUFFER,
+        _handled_probe_handler, GINT_TO_POINTER (fl), NULL);
+    buffer = gst_buffer_new ();
+    gst_buffer_ref (buffer);
+    fail_unless (gst_pad_push (src, buffer) == fl);
+    ASSERT_MINI_OBJECT_REFCOUNT (buffer, "buffer", 1);
+    gst_buffer_unref (buffer);
+    gst_pad_remove_probe (src, id);
+
+  }
+
 
   /* cleanup */
   ASSERT_CAPS_REFCOUNT (caps, "caps", 2);
@@ -732,10 +849,10 @@ GST_START_TEST (test_push_linked)
   g_list_free (buffers);
   buffers = NULL;
 
-  /* adding a probe that returns FALSE will drop the buffer without trying
+  /* adding a probe that returns _DROP will drop the buffer without trying
    * to chain */
   id = gst_pad_add_probe (src, GST_PAD_PROBE_TYPE_BUFFER,
-      _probe_handler, GINT_TO_POINTER (0), NULL);
+      _probe_handler, GINT_TO_POINTER (GST_PAD_PROBE_DROP), NULL);
   buffer = gst_buffer_new ();
   gst_buffer_ref (buffer);
   fail_unless (gst_pad_push (src, buffer) == GST_FLOW_OK);
@@ -744,9 +861,9 @@ GST_START_TEST (test_push_linked)
   gst_pad_remove_probe (src, id);
   fail_unless_equals_int (g_list_length (buffers), 0);
 
-  /* adding a probe that returns TRUE will still chain the buffer */
+  /* adding a probe that returns _OK will still chain the buffer */
   id = gst_pad_add_probe (src, GST_PAD_PROBE_TYPE_BUFFER,
-      _probe_handler, GINT_TO_POINTER (1), NULL);
+      _probe_handler, GINT_TO_POINTER (GST_PAD_PROBE_OK), NULL);
   buffer = gst_buffer_new ();
   gst_buffer_ref (buffer);
   fail_unless (gst_pad_push (src, buffer) == GST_FLOW_OK);
@@ -761,6 +878,20 @@ GST_START_TEST (test_push_linked)
   g_list_free (buffers);
   buffers = NULL;
 
+  /* adding a probe that returns _HANDLED will not chain the buffer */
+  id = gst_pad_add_probe (src, GST_PAD_PROBE_TYPE_BUFFER,
+      _probe_handler, GINT_TO_POINTER (GST_PAD_PROBE_HANDLED), NULL);
+  buffer = gst_buffer_new ();
+  gst_buffer_ref (buffer);
+  fail_unless (gst_pad_push (src, buffer) == GST_FLOW_OK);
+  gst_pad_remove_probe (src, id);
+
+  ASSERT_MINI_OBJECT_REFCOUNT (buffer, "buffer", 1);
+  gst_buffer_unref (buffer);
+  fail_unless_equals_int (g_list_length (buffers), 0);
+  g_list_free (buffers);
+  buffers = NULL;
+
   /* teardown */
   gst_pad_unlink (src, sink);
   ASSERT_CAPS_REFCOUNT (caps, "caps", 2);
@@ -832,10 +963,10 @@ GST_START_TEST (test_push_linked_flushing)
   gst_pad_set_caps (src, caps);
   fail_unless (gst_pad_push_event (src,
           gst_event_new_segment (&dummy_segment)) == TRUE);
-  /* adding a probe that returns FALSE will drop the buffer without trying
+  /* adding a probe that returns _DROP will drop the buffer without trying
    * to chain */
   id = gst_pad_add_probe (src, GST_PAD_PROBE_TYPE_BUFFER, _probe_handler,
-      GINT_TO_POINTER (0), NULL);
+      GINT_TO_POINTER (GST_PAD_PROBE_DROP), NULL);
   buffer = gst_buffer_new ();
   gst_buffer_ref (buffer);
   fail_unless (gst_pad_push (src, buffer) == GST_FLOW_FLUSHING);
@@ -844,10 +975,10 @@ GST_START_TEST (test_push_linked_flushing)
   gst_buffer_unref (buffer);
   gst_pad_remove_probe (src, id);
 
-  /* adding a probe that returns TRUE will still chain the buffer,
+  /* adding a probe that returns _OK will still chain the buffer,
    * and hence drop because pad is flushing */
   id = gst_pad_add_probe (src, GST_PAD_PROBE_TYPE_BUFFER, _probe_handler,
-      GINT_TO_POINTER (1), NULL);
+      GINT_TO_POINTER (GST_PAD_PROBE_OK), NULL);
   buffer = gst_buffer_new ();
   gst_buffer_ref (buffer);
   fail_unless (gst_pad_push (src, buffer) == GST_FLOW_FLUSHING);
@@ -2350,6 +2481,7 @@ gst_pad_suite (void)
   tcase_add_test (tc_chain, test_pad_probe_block_add_remove);
   tcase_add_test (tc_chain, test_pad_probe_block_and_drop_buffer);
   tcase_add_test (tc_chain, test_pad_probe_flush_events);
+  tcase_add_test (tc_chain, test_events_query_unlinked);
   tcase_add_test (tc_chain, test_queue_src_caps_notify_linked);
   tcase_add_test (tc_chain, test_queue_src_caps_notify_not_linked);
 #if 0