gst/gstpad.c: emit have-data before checking for peers. This allows for probe handle...
authorThomas Vander Stichele <thomas@apestaart.org>
Wed, 5 Oct 2005 16:16:58 +0000 (16:16 +0000)
committerThomas Vander Stichele <thomas@apestaart.org>
Wed, 5 Oct 2005 16:16:58 +0000 (16:16 +0000)
Original commit message from CVS:
* gst/gstpad.c: (gst_pad_push), (gst_pad_push_event):
emit have-data before checking for peers.  This allows
for probe handlers to connect elements.  This helps autopluggers.
* check/gst/gstpad.c: (GST_START_TEST), (_probe_handler),
(gst_pad_suite):
add six checks, linked/unlinked with no/true/false probe

ChangeLog
check/gst/gstpad.c
gst/gstpad.c
tests/check/gst/gstpad.c

index 32ab61e..a050eb9 100644 (file)
--- a/ChangeLog
+++ b/ChangeLog
@@ -1,3 +1,12 @@
+2005-10-05  Thomas Vander Stichele  <thomas at apestaart dot org>
+
+       * gst/gstpad.c: (gst_pad_push), (gst_pad_push_event):
+         emit have-data before checking for peers.  This allows
+         for probe handlers to connect elements.  This helps autopluggers.
+       * check/gst/gstpad.c: (GST_START_TEST), (_probe_handler),
+       (gst_pad_suite):
+         add six checks, linked/unlinked with no/true/false probe
+
 2005-10-04  Wim Taymans  <wim@fluendo.com>
 
        * gst/elements/gstfakesink.c: (gst_fake_sink_get_property),
index 4c172a0..5f14d49 100644 (file)
@@ -227,6 +227,157 @@ GST_START_TEST (test_name_is_valid)
 
 GST_END_TEST;
 
+static gboolean
+_probe_handler (GstPad * pad, GstBuffer * buffer, gpointer userdata)
+{
+  gint ret = GPOINTER_TO_INT (userdata);
+
+  if (ret == 1)
+    return TRUE;
+  return FALSE;
+}
+
+GST_START_TEST (test_push_unlinked)
+{
+  GstPad *src;
+  GstCaps *caps;
+  GstBuffer *buffer;
+  gulong id;
+
+  src = gst_pad_new ("src", GST_PAD_SRC);
+  fail_if (src == NULL);
+  caps = gst_pad_get_allowed_caps (src);
+  fail_unless (caps == NULL);
+
+  caps = gst_caps_from_string ("foo/bar");
+
+  gst_pad_set_caps (src, caps);
+  ASSERT_CAPS_REFCOUNT (caps, "caps", 2);
+
+  /* pushing on an unlinked pad will drop the buffer */
+  buffer = gst_buffer_new ();
+  gst_buffer_ref (buffer);
+  fail_unless (gst_pad_push (src, buffer) == GST_FLOW_NOT_LINKED);
+  ASSERT_MINI_OBJECT_REFCOUNT (buffer, "buffer", 1);
+  gst_buffer_unref (buffer);
+
+  /* adding a probe that returns FALSE will drop the buffer without trying
+   * to chain */
+  id = gst_pad_add_buffer_probe (src, (GCallback) _probe_handler,
+      GINT_TO_POINTER (0));
+  buffer = gst_buffer_new ();
+  gst_buffer_ref (buffer);
+  fail_unless (gst_pad_push (src, buffer) == GST_FLOW_OK);
+  ASSERT_MINI_OBJECT_REFCOUNT (buffer, "buffer", 1);
+  gst_buffer_unref (buffer);
+  gst_pad_remove_buffer_probe (src, id);
+
+  /* adding a probe that returns TRUE will still chain the buffer,
+   * and hence drop because pad is unlinked */
+  id = gst_pad_add_buffer_probe (src, (GCallback) _probe_handler,
+      GINT_TO_POINTER (1));
+  buffer = gst_buffer_new ();
+  gst_buffer_ref (buffer);
+  fail_unless (gst_pad_push (src, buffer) == GST_FLOW_NOT_LINKED);
+  ASSERT_MINI_OBJECT_REFCOUNT (buffer, "buffer", 1);
+  gst_buffer_unref (buffer);
+  gst_pad_remove_buffer_probe (src, id);
+
+
+  /* cleanup */
+  ASSERT_CAPS_REFCOUNT (caps, "caps", 2);
+  ASSERT_OBJECT_REFCOUNT (src, "src", 1);
+
+  gst_object_unref (src);
+
+  ASSERT_CAPS_REFCOUNT (caps, "caps", 1);
+  gst_caps_unref (caps);
+}
+
+GST_END_TEST;
+
+GST_START_TEST (test_push_linked)
+{
+  GstPad *src, *sink;
+  GstPadLinkReturn plr;
+  GstCaps *caps;
+  GstBuffer *buffer;
+  ulong id;
+
+  /* setup */
+  sink = gst_pad_new ("sink", GST_PAD_SINK);
+  fail_if (sink == NULL);
+  gst_pad_set_chain_function (sink, gst_check_chain_func);
+
+  src = gst_pad_new ("src", GST_PAD_SRC);
+  fail_if (src == NULL);
+
+  caps = gst_caps_from_string ("foo/bar");
+  /* one for me */
+  ASSERT_CAPS_REFCOUNT (caps, "caps", 1);
+
+  gst_pad_set_caps (src, caps);
+  gst_pad_set_caps (sink, caps);
+  /* one for me and one for each set_caps */
+  ASSERT_CAPS_REFCOUNT (caps, "caps", 3);
+
+  plr = gst_pad_link (src, sink);
+  fail_unless (GST_PAD_LINK_SUCCESSFUL (plr));
+  ASSERT_CAPS_REFCOUNT (caps, "caps", 3);
+
+  /* test */
+  /* pushing on a linked pad will drop the ref to the buffer */
+  buffer = gst_buffer_new ();
+  gst_buffer_ref (buffer);
+  fail_unless (gst_pad_push (src, buffer) == GST_FLOW_OK);
+  ASSERT_MINI_OBJECT_REFCOUNT (buffer, "buffer", 2);
+  gst_buffer_unref (buffer);
+  fail_unless_equals_int (g_list_length (buffers), 1);
+  buffer = GST_BUFFER (buffers->data);
+  ASSERT_MINI_OBJECT_REFCOUNT (buffer, "buffer", 1);
+  g_list_free (buffers);
+  buffers = NULL;
+
+  /* adding a probe that returns FALSE will drop the buffer without trying
+   * to chain */
+  id = gst_pad_add_buffer_probe (src, (GCallback) _probe_handler,
+      GINT_TO_POINTER (0));
+  buffer = gst_buffer_new ();
+  gst_buffer_ref (buffer);
+  fail_unless (gst_pad_push (src, buffer) == GST_FLOW_OK);
+  ASSERT_MINI_OBJECT_REFCOUNT (buffer, "buffer", 1);
+  gst_buffer_unref (buffer);
+  gst_pad_remove_buffer_probe (src, id);
+  fail_unless_equals_int (g_list_length (buffers), 0);
+
+  /* adding a probe that returns TRUE will still chain the buffer */
+  id = gst_pad_add_buffer_probe (src, (GCallback) _probe_handler,
+      GINT_TO_POINTER (1));
+  buffer = gst_buffer_new ();
+  gst_buffer_ref (buffer);
+  fail_unless (gst_pad_push (src, buffer) == GST_FLOW_OK);
+  gst_pad_remove_buffer_probe (src, id);
+
+  ASSERT_MINI_OBJECT_REFCOUNT (buffer, "buffer", 2);
+  gst_buffer_unref (buffer);
+  fail_unless_equals_int (g_list_length (buffers), 1);
+  buffer = GST_BUFFER (buffers->data);
+  ASSERT_MINI_OBJECT_REFCOUNT (buffer, "buffer", 1);
+  g_list_free (buffers);
+  buffers = NULL;
+
+  /* teardown */
+  gst_pad_unlink (src, sink);
+  ASSERT_CAPS_REFCOUNT (caps, "caps", 3);
+  gst_object_unref (src);
+  gst_object_unref (sink);
+  ASSERT_CAPS_REFCOUNT (caps, "caps", 1);
+
+  gst_caps_unref (caps);
+}
+
+GST_END_TEST;
+
 
 Suite *
 gst_pad_suite (void)
@@ -243,6 +394,8 @@ gst_pad_suite (void)
   tcase_add_test (tc_chain, test_get_allowed_caps);
   tcase_add_test (tc_chain, test_link_unlink_threaded);
   tcase_add_test (tc_chain, test_name_is_valid);
+  tcase_add_test (tc_chain, test_push_unlinked);
+  tcase_add_test (tc_chain, test_push_linked);
   return s;
 }
 
index e7b6b6d..f50cb8b 100644 (file)
@@ -766,13 +766,16 @@ gst_pad_is_active (GstPad * pad)
 /**
  * gst_pad_set_blocked_async:
  * @pad: the #GstPad to block or unblock
- * @blocked: boolean indicating we should block or unblock
+ * @blocked: boolean indicating whether the pad should be blocked or unblocked
  * @callback: #GstPadBlockCallback that will be called when the
- *            operation succeeds.
+ *            operation succeeds
  * @user_data: user data passed to the callback
  *
  * Blocks or unblocks the dataflow on a pad. The provided callback
- * is called when the operation succeeds. This can take a while as
+ * is called when the operation succeeds; this happens right before the next
+ * attempt at pushing a buffer on the pad.
+ *
+ * This can take a while as
  * the pad can only become blocked when real dataflow is happening.
  * When the pipeline is stalled, for example in PAUSED, this can
  * take an indeterminate amount of time.
@@ -781,7 +784,7 @@ gst_pad_is_active (GstPad * pad)
  * reasons stated above.
  *
  * Returns: TRUE if the pad could be blocked. This function can fail
- *   if wrong parameters were passed or the pad was already in the 
+ *   if wrong parameters were passed or the pad was already in the
  *   requested state.
  *
  * MT safe.
@@ -3075,7 +3078,8 @@ no_function:
  * @pad: a source #GstPad.
  * @buffer: the #GstBuffer to push.
  *
- * Pushes a buffer to the peer of @pad. @pad must be linked.
+ * Pushes a buffer to the peer of @pad.
+ * buffer probes will be triggered before the buffer gets pushed.
  *
  * Returns: a #GstFlowReturn from the peer pad.
  *
@@ -3087,6 +3091,7 @@ gst_pad_push (GstPad * pad, GstBuffer * buffer)
   GstPad *peer;
   GstFlowReturn ret;
   gboolean emit_signal;
+  gboolean signal_ret = TRUE;
 
   g_return_val_if_fail (GST_IS_PAD (pad), GST_FLOW_ERROR);
   g_return_val_if_fail (GST_PAD_DIRECTION (pad) == GST_PAD_SRC, GST_FLOW_ERROR);
@@ -3094,24 +3099,36 @@ gst_pad_push (GstPad * pad, GstBuffer * buffer)
   g_return_val_if_fail (GST_IS_BUFFER (buffer), GST_FLOW_ERROR);
 
   GST_LOCK (pad);
+
+  /* FIXME: this check can go away; pad_set_blocked could be implemented with
+   * probes completely */
   while (G_UNLIKELY (GST_PAD_IS_BLOCKED (pad)))
     handle_pad_block (pad);
 
-  if (G_UNLIKELY ((peer = GST_PAD_PEER (pad)) == NULL))
-    goto not_linked;
-
-  /* we emit signals on the pad areg, the peer will have a chance to
+  /* we emit signals on the pad arg, the peer will have a chance to
    * emit in the _chain() function */
   emit_signal = GST_PAD_DO_BUFFER_SIGNALS (pad) > 0;
-
-  gst_object_ref (peer);
   GST_UNLOCK (pad);
 
   if (G_UNLIKELY (emit_signal)) {
-    if (!gst_pad_emit_have_data_signal (pad, GST_MINI_OBJECT (buffer)))
-      goto dropping;
+    signal_ret = gst_pad_emit_have_data_signal (pad, GST_MINI_OBJECT (buffer));
   }
 
+  /* if the signal handler returned FALSE, it means we should just drop the
+   * buffer */
+  if (signal_ret == FALSE) {
+    gst_buffer_unref (buffer);
+    GST_DEBUG_OBJECT (pad, "Dropping buffer due to FALSE probe return");
+    return GST_FLOW_OK;
+  }
+
+  GST_LOCK (pad);
+
+  if (G_UNLIKELY ((peer = GST_PAD_PEER (pad)) == NULL))
+    goto not_linked;
+  gst_object_ref (peer);
+  GST_UNLOCK (pad);
+
   ret = gst_pad_chain (peer, buffer);
 
   gst_object_unref (peer);
@@ -3127,13 +3144,6 @@ not_linked:
     GST_UNLOCK (pad);
     return GST_FLOW_NOT_LINKED;
   }
-dropping:
-  {
-    gst_buffer_unref (buffer);
-    gst_object_unref (peer);
-    GST_DEBUG ("Dropping buffer due to FALSE probe return");
-    return GST_FLOW_OK;
-  }
 }
 
 /**
@@ -3374,25 +3384,31 @@ gst_pad_push_event (GstPad * pad, GstEvent * event)
   GstPad *peerpad;
   gboolean result;
   gboolean emit_signal;
+  gboolean signal_ret = TRUE;
 
   g_return_val_if_fail (GST_IS_PAD (pad), FALSE);
   g_return_val_if_fail (event != NULL, FALSE);
 
+  emit_signal = GST_PAD_DO_EVENT_SIGNALS (pad) > 0;
+
+  if (G_UNLIKELY (emit_signal)) {
+    signal_ret = gst_pad_emit_have_data_signal (pad, GST_MINI_OBJECT (event));
+  }
+
+  if (signal_ret == FALSE) {
+    GST_DEBUG_OBJECT (pad, "Dropping event after FALSE probe return");
+    gst_event_unref (event);
+    return FALSE;
+  }
+
   GST_LOCK (pad);
   peerpad = GST_PAD_PEER (pad);
   if (peerpad == NULL)
     goto not_linked;
 
-  emit_signal = GST_PAD_DO_EVENT_SIGNALS (pad) > 0;
-
   gst_object_ref (peerpad);
   GST_UNLOCK (pad);
 
-  if (G_UNLIKELY (emit_signal)) {
-    if (!gst_pad_emit_have_data_signal (pad, GST_MINI_OBJECT (event)))
-      goto dropping;
-  }
-
   result = gst_pad_send_event (peerpad, event);
 
   gst_object_unref (peerpad);
@@ -3406,13 +3422,6 @@ not_linked:
     GST_UNLOCK (pad);
     return FALSE;
   }
-dropping:
-  {
-    GST_DEBUG ("Dropping event after FALSE probe return");
-    gst_object_unref (peerpad);
-    gst_event_unref (event);
-    return FALSE;
-  }
 }
 
 /**
index 4c172a0..5f14d49 100644 (file)
@@ -227,6 +227,157 @@ GST_START_TEST (test_name_is_valid)
 
 GST_END_TEST;
 
+static gboolean
+_probe_handler (GstPad * pad, GstBuffer * buffer, gpointer userdata)
+{
+  gint ret = GPOINTER_TO_INT (userdata);
+
+  if (ret == 1)
+    return TRUE;
+  return FALSE;
+}
+
+GST_START_TEST (test_push_unlinked)
+{
+  GstPad *src;
+  GstCaps *caps;
+  GstBuffer *buffer;
+  gulong id;
+
+  src = gst_pad_new ("src", GST_PAD_SRC);
+  fail_if (src == NULL);
+  caps = gst_pad_get_allowed_caps (src);
+  fail_unless (caps == NULL);
+
+  caps = gst_caps_from_string ("foo/bar");
+
+  gst_pad_set_caps (src, caps);
+  ASSERT_CAPS_REFCOUNT (caps, "caps", 2);
+
+  /* pushing on an unlinked pad will drop the buffer */
+  buffer = gst_buffer_new ();
+  gst_buffer_ref (buffer);
+  fail_unless (gst_pad_push (src, buffer) == GST_FLOW_NOT_LINKED);
+  ASSERT_MINI_OBJECT_REFCOUNT (buffer, "buffer", 1);
+  gst_buffer_unref (buffer);
+
+  /* adding a probe that returns FALSE will drop the buffer without trying
+   * to chain */
+  id = gst_pad_add_buffer_probe (src, (GCallback) _probe_handler,
+      GINT_TO_POINTER (0));
+  buffer = gst_buffer_new ();
+  gst_buffer_ref (buffer);
+  fail_unless (gst_pad_push (src, buffer) == GST_FLOW_OK);
+  ASSERT_MINI_OBJECT_REFCOUNT (buffer, "buffer", 1);
+  gst_buffer_unref (buffer);
+  gst_pad_remove_buffer_probe (src, id);
+
+  /* adding a probe that returns TRUE will still chain the buffer,
+   * and hence drop because pad is unlinked */
+  id = gst_pad_add_buffer_probe (src, (GCallback) _probe_handler,
+      GINT_TO_POINTER (1));
+  buffer = gst_buffer_new ();
+  gst_buffer_ref (buffer);
+  fail_unless (gst_pad_push (src, buffer) == GST_FLOW_NOT_LINKED);
+  ASSERT_MINI_OBJECT_REFCOUNT (buffer, "buffer", 1);
+  gst_buffer_unref (buffer);
+  gst_pad_remove_buffer_probe (src, id);
+
+
+  /* cleanup */
+  ASSERT_CAPS_REFCOUNT (caps, "caps", 2);
+  ASSERT_OBJECT_REFCOUNT (src, "src", 1);
+
+  gst_object_unref (src);
+
+  ASSERT_CAPS_REFCOUNT (caps, "caps", 1);
+  gst_caps_unref (caps);
+}
+
+GST_END_TEST;
+
+GST_START_TEST (test_push_linked)
+{
+  GstPad *src, *sink;
+  GstPadLinkReturn plr;
+  GstCaps *caps;
+  GstBuffer *buffer;
+  ulong id;
+
+  /* setup */
+  sink = gst_pad_new ("sink", GST_PAD_SINK);
+  fail_if (sink == NULL);
+  gst_pad_set_chain_function (sink, gst_check_chain_func);
+
+  src = gst_pad_new ("src", GST_PAD_SRC);
+  fail_if (src == NULL);
+
+  caps = gst_caps_from_string ("foo/bar");
+  /* one for me */
+  ASSERT_CAPS_REFCOUNT (caps, "caps", 1);
+
+  gst_pad_set_caps (src, caps);
+  gst_pad_set_caps (sink, caps);
+  /* one for me and one for each set_caps */
+  ASSERT_CAPS_REFCOUNT (caps, "caps", 3);
+
+  plr = gst_pad_link (src, sink);
+  fail_unless (GST_PAD_LINK_SUCCESSFUL (plr));
+  ASSERT_CAPS_REFCOUNT (caps, "caps", 3);
+
+  /* test */
+  /* pushing on a linked pad will drop the ref to the buffer */
+  buffer = gst_buffer_new ();
+  gst_buffer_ref (buffer);
+  fail_unless (gst_pad_push (src, buffer) == GST_FLOW_OK);
+  ASSERT_MINI_OBJECT_REFCOUNT (buffer, "buffer", 2);
+  gst_buffer_unref (buffer);
+  fail_unless_equals_int (g_list_length (buffers), 1);
+  buffer = GST_BUFFER (buffers->data);
+  ASSERT_MINI_OBJECT_REFCOUNT (buffer, "buffer", 1);
+  g_list_free (buffers);
+  buffers = NULL;
+
+  /* adding a probe that returns FALSE will drop the buffer without trying
+   * to chain */
+  id = gst_pad_add_buffer_probe (src, (GCallback) _probe_handler,
+      GINT_TO_POINTER (0));
+  buffer = gst_buffer_new ();
+  gst_buffer_ref (buffer);
+  fail_unless (gst_pad_push (src, buffer) == GST_FLOW_OK);
+  ASSERT_MINI_OBJECT_REFCOUNT (buffer, "buffer", 1);
+  gst_buffer_unref (buffer);
+  gst_pad_remove_buffer_probe (src, id);
+  fail_unless_equals_int (g_list_length (buffers), 0);
+
+  /* adding a probe that returns TRUE will still chain the buffer */
+  id = gst_pad_add_buffer_probe (src, (GCallback) _probe_handler,
+      GINT_TO_POINTER (1));
+  buffer = gst_buffer_new ();
+  gst_buffer_ref (buffer);
+  fail_unless (gst_pad_push (src, buffer) == GST_FLOW_OK);
+  gst_pad_remove_buffer_probe (src, id);
+
+  ASSERT_MINI_OBJECT_REFCOUNT (buffer, "buffer", 2);
+  gst_buffer_unref (buffer);
+  fail_unless_equals_int (g_list_length (buffers), 1);
+  buffer = GST_BUFFER (buffers->data);
+  ASSERT_MINI_OBJECT_REFCOUNT (buffer, "buffer", 1);
+  g_list_free (buffers);
+  buffers = NULL;
+
+  /* teardown */
+  gst_pad_unlink (src, sink);
+  ASSERT_CAPS_REFCOUNT (caps, "caps", 3);
+  gst_object_unref (src);
+  gst_object_unref (sink);
+  ASSERT_CAPS_REFCOUNT (caps, "caps", 1);
+
+  gst_caps_unref (caps);
+}
+
+GST_END_TEST;
+
 
 Suite *
 gst_pad_suite (void)
@@ -243,6 +394,8 @@ gst_pad_suite (void)
   tcase_add_test (tc_chain, test_get_allowed_caps);
   tcase_add_test (tc_chain, test_link_unlink_threaded);
   tcase_add_test (tc_chain, test_name_is_valid);
+  tcase_add_test (tc_chain, test_push_unlinked);
+  tcase_add_test (tc_chain, test_push_linked);
   return s;
 }