GstPadProbeInfo *info;
gboolean dropped;
gboolean pass;
+ gboolean handled;
gboolean marshalled;
guint cookie;
} ProbeMarshall;
case GST_PAD_PROBE_OK:
GST_DEBUG_OBJECT (pad, "probe returned OK");
break;
+ case GST_PAD_PROBE_HANDLED:
+ GST_DEBUG_OBJECT (pad, "probe handled the data");
+ break;
default:
GST_DEBUG_OBJECT (pad, "probe returned %d", ret);
break;
info->type = GST_PAD_PROBE_TYPE_INVALID;
data->dropped = TRUE;
break;
+ case GST_PAD_PROBE_HANDLED:
+ GST_DEBUG_OBJECT (pad, "probe handled data");
+ data->handled = TRUE;
+ break;
case GST_PAD_PROBE_PASS:
/* inform the pad block to let things pass */
GST_DEBUG_OBJECT (pad, "asked to pass item");
if (G_UNLIKELY (pad->num_probes)) { \
GstFlowReturn pval = defaultval; \
/* pass NULL as the data item */ \
- GstPadProbeInfo info = { mask, 0, NULL, 0, 0 }; \
+ GstPadProbeInfo info = { mask, 0, NULL, 0, 0 }; \
+ info.ABI.abi.flow_ret = defaultval; \
ret = do_probe_callbacks (pad, &info, defaultval); \
if (G_UNLIKELY (ret != pval && ret != GST_FLOW_OK)) \
goto label; \
} \
} G_STMT_END
-#define PROBE_FULL(pad,mask,data,offs,size,label) \
- G_STMT_START { \
- if (G_UNLIKELY (pad->num_probes)) { \
- /* pass the data item */ \
- GstPadProbeInfo info = { mask, 0, data, offs, size }; \
- ret = do_probe_callbacks (pad, &info, GST_FLOW_OK); \
- /* store the possibly updated data item */ \
- data = GST_PAD_PROBE_INFO_DATA (&info); \
- /* if something went wrong, exit */ \
- if (G_UNLIKELY (ret != GST_FLOW_OK)) \
- goto label; \
- } \
+#define PROBE_FULL(pad,mask,data,offs,size,label,handleable,handle_label) \
+ G_STMT_START { \
+ if (G_UNLIKELY (pad->num_probes)) { \
+ /* pass the data item */ \
+ GstPadProbeInfo info = { mask, 0, data, offs, size }; \
+ info.ABI.abi.flow_ret = GST_FLOW_OK; \
+ ret = do_probe_callbacks (pad, &info, GST_FLOW_OK); \
+ /* store the possibly updated data item */ \
+ data = GST_PAD_PROBE_INFO_DATA (&info); \
+ /* if something went wrong, exit */ \
+ if (G_UNLIKELY (ret != GST_FLOW_OK)) { \
+ if (handleable && ret == GST_FLOW_CUSTOM_SUCCESS_1) { \
+ ret = info.ABI.abi.flow_ret; \
+ goto handle_label; \
+ } \
+ goto label; \
+ } \
+ } \
} G_STMT_END
-#define PROBE_PUSH(pad,mask,data,label) \
- PROBE_FULL(pad, mask, data, -1, -1, label);
-#define PROBE_PULL(pad,mask,data,offs,size,label) \
- PROBE_FULL(pad, mask, data, offs, size, label);
+#define PROBE_PUSH(pad,mask,data,label) \
+ PROBE_FULL(pad, mask, data, -1, -1, label, FALSE, label);
+#define PROBE_HANDLE(pad,mask,data,label,handle_label) \
+ PROBE_FULL(pad, mask, data, -1, -1, label, TRUE, handle_label);
+#define PROBE_PULL(pad,mask,data,offs,size,label) \
+ PROBE_FULL(pad, mask, data, offs, size, label, FALSE, label);
static GstFlowReturn
do_pad_idle_probe_wait (GstPad * pad)
data.pad = pad;
data.info = info;
data.pass = FALSE;
+ data.handled = FALSE;
data.marshalled = FALSE;
data.dropped = FALSE;
data.cookie = ++pad->priv->probe_cookie;
if (data.dropped)
goto dropped;
+ /* If one handler took care of it, let the the item pass */
+ if (data.handled) {
+ goto handled;
+ }
+
/* if no handler matched and we are blocking, let the item pass */
if (!data.marshalled && is_block)
goto passed;
GST_DEBUG_OBJECT (pad, "data is passed");
return GST_FLOW_OK;
}
+handled:
+ {
+ GST_DEBUG_OBJECT (pad, "data was handled");
+ return GST_FLOW_CUSTOM_SUCCESS_1;
+ }
}
/* pad offsets */
if (G_UNLIKELY (serialized))
GST_PAD_STREAM_UNLOCK (pad);
- /* if a probe dropped, we don't sent it further but assume that the probe
- * did not answer the query and return FALSE */
- res = FALSE;
+ /* if a probe dropped without handling, we don't sent it further but assume
+ * that the probe did not answer the query and return FALSE */
+ if (ret != GST_FLOW_CUSTOM_SUCCESS_1)
+ res = FALSE;
+ else
+ res = TRUE;
return res;
}
GST_DEBUG_OBJECT (pad, "probe stopped: %s", gst_flow_get_name (ret));
GST_OBJECT_UNLOCK (pad);
- /* if a probe dropped, we don't sent it further but assume that the probe
- * did not answer the query and return FALSE */
- res = FALSE;
+ /* if a probe dropped without handling, we don't sent it further but
+ * assume that the probe did not answer the query and return FALSE */
+ if (ret != GST_FLOW_CUSTOM_SUCCESS_1)
+ res = FALSE;
+ else
+ res = TRUE;
return res;
}
{
GstFlowReturn ret;
GstObject *parent;
+ gboolean handled = FALSE;
GST_PAD_STREAM_LOCK (pad);
}
#endif
- PROBE_PUSH (pad, type | GST_PAD_PROBE_TYPE_BLOCK, data, probe_stopped);
+ PROBE_HANDLE (pad, type | GST_PAD_PROBE_TYPE_BLOCK, data, probe_stopped,
+ probe_handled);
- PROBE_PUSH (pad, type, data, probe_stopped);
+ PROBE_HANDLE (pad, type, data, probe_stopped, probe_handled);
ACQUIRE_PARENT (pad, parent, no_parent);
GST_OBJECT_UNLOCK (pad);
gst_mini_object_unref (GST_MINI_OBJECT_CAST (data));
return GST_FLOW_ERROR;
}
+probe_handled:
+ handled = TRUE;
+ /* PASSTHROUGH */
probe_stopped:
{
GST_OBJECT_UNLOCK (pad);
GST_PAD_STREAM_UNLOCK (pad);
- gst_mini_object_unref (GST_MINI_OBJECT_CAST (data));
+ /* We unref the buffer, except if the probe handled it (CUSTOM_SUCCESS_1) */
+ if (!handled)
+ gst_mini_object_unref (GST_MINI_OBJECT_CAST (data));
switch (ret) {
case GST_FLOW_CUSTOM_SUCCESS:
- GST_DEBUG_OBJECT (pad, "dropped buffer");
+ case GST_FLOW_CUSTOM_SUCCESS_1:
+ GST_DEBUG_OBJECT (pad, "dropped or handled buffer");
ret = GST_FLOW_OK;
break;
default:
{
GstPad *peer;
GstFlowReturn ret;
+ gboolean handled = FALSE;
GST_OBJECT_LOCK (pad);
if (G_UNLIKELY (GST_PAD_IS_FLUSHING (pad)))
goto events_error;
/* do block probes */
- PROBE_PUSH (pad, type | GST_PAD_PROBE_TYPE_BLOCK, data, probe_stopped);
+ PROBE_HANDLE (pad, type | GST_PAD_PROBE_TYPE_BLOCK, data, probe_stopped,
+ probe_handled);
/* recheck sticky events because the probe might have cause a relink */
if (G_UNLIKELY ((ret = check_sticky (pad, NULL))) != GST_FLOW_OK)
goto events_error;
/* do post-blocking probes */
- PROBE_PUSH (pad, type, data, probe_stopped);
+ PROBE_HANDLE (pad, type, data, probe_stopped, probe_handled);
if (G_UNLIKELY ((peer = GST_PAD_PEER (pad)) == NULL))
goto not_linked;
gst_mini_object_unref (GST_MINI_OBJECT_CAST (data));
return ret;
}
+probe_handled:
+ handled = TRUE;
+ /* PASSTHROUGH */
probe_stopped:
{
GST_OBJECT_UNLOCK (pad);
- pad->ABI.abi.last_flowret =
- ret == GST_FLOW_CUSTOM_SUCCESS ? GST_FLOW_OK : ret;
- if (data != NULL)
+ if (data != NULL && !handled)
gst_mini_object_unref (GST_MINI_OBJECT_CAST (data));
switch (ret) {
case GST_FLOW_CUSTOM_SUCCESS:
- GST_DEBUG_OBJECT (pad, "dropped buffer");
+ case GST_FLOW_CUSTOM_SUCCESS_1:
+ GST_DEBUG_OBJECT (pad, "dropped or handled buffer");
ret = GST_FLOW_OK;
break;
default:
GST_DEBUG_OBJECT (pad, "an error occurred %s", gst_flow_get_name (ret));
break;
}
+ pad->ABI.abi.last_flowret = ret;
return ret;
}
not_linked:
probe_stopped:
{
GST_OBJECT_FLAG_SET (pad, GST_PAD_FLAG_PENDING_EVENTS);
- gst_event_unref (event);
+ if (ret != GST_FLOW_CUSTOM_SUCCESS_1)
+ gst_event_unref (event);
switch (ret) {
+ case GST_FLOW_CUSTOM_SUCCESS_1:
+ GST_DEBUG_OBJECT (pad, "handled event");
+ break;
case GST_FLOW_CUSTOM_SUCCESS:
GST_DEBUG_OBJECT (pad, "dropped event");
break;
/* other events are pushed right away */
ret = gst_pad_push_event_unchecked (pad, event, type);
/* dropped events by a probe are not an error */
- res = (ret == GST_FLOW_OK || ret == GST_FLOW_CUSTOM_SUCCESS);
+ res = (ret == GST_FLOW_OK || ret == GST_FLOW_CUSTOM_SUCCESS
+ || ret == GST_FLOW_CUSTOM_SUCCESS_1);
} else {
/* Errors in sticky event pushing are no problem and ignored here
* as they will cause more meaningful errors during data flow.
GST_OBJECT_UNLOCK (pad);
if (need_unlock)
GST_PAD_STREAM_UNLOCK (pad);
- gst_event_unref (event);
+ /* Only unref if unhandled */
+ if (ret != GST_FLOW_CUSTOM_SUCCESS_1)
+ gst_event_unref (event);
switch (ret) {
+ case GST_FLOW_CUSTOM_SUCCESS_1:
case GST_FLOW_CUSTOM_SUCCESS:
- GST_DEBUG_OBJECT (pad, "dropped event");
+ GST_DEBUG_OBJECT (pad, "dropped or handled event");
ret = GST_FLOW_OK;
break;
default:
static GstPadProbeReturn
_probe_handler (GstPad * pad, GstPadProbeInfo * info, gpointer userdata)
{
- gint ret = GPOINTER_TO_INT (userdata);
+ GstPadProbeReturn ret = (GstPadProbeReturn) GPOINTER_TO_INT (userdata);
- if (ret == 1)
- return GST_PAD_PROBE_OK;
+ /* If we are handling the data, we unref it */
+ if (ret == GST_PAD_PROBE_HANDLED
+ && !(GST_PAD_PROBE_INFO_TYPE (info) & GST_PAD_PROBE_TYPE_QUERY_BOTH)) {
+ GST_DEBUG_OBJECT (pad, "Unreffing data");
+ gst_mini_object_unref (info->data);
+ }
+ return ret;
+}
+
+static GstPadProbeReturn
+_handled_probe_handler (GstPad * pad, GstPadProbeInfo * info, gpointer userdata)
+{
+ GstFlowReturn customflow = (GstFlowReturn) GPOINTER_TO_INT (userdata);
+
+ /* We are handling the data, we unref it */
+ if (!(GST_PAD_PROBE_INFO_TYPE (info) & GST_PAD_PROBE_TYPE_QUERY_BOTH))
+ gst_mini_object_unref (info->data);
+ GST_PAD_PROBE_INFO_FLOW_RETURN (info) = customflow;
+
+ return GST_PAD_PROBE_HANDLED;
+}
+
+
+
+GST_START_TEST (test_events_query_unlinked)
+{
+ GstPad *src;
+ GstCaps *caps;
+ gulong id;
+ GstQuery *query;
+
+ src = gst_pad_new ("src", GST_PAD_SRC);
+ fail_if (src == NULL);
+ caps = gst_pad_get_allowed_caps (src);
+ fail_unless (caps == NULL);
+
+ caps = gst_caps_from_string ("foo/bar");
+
+ gst_pad_set_active (src, TRUE);
+ fail_unless (gst_pad_push_event (src,
+ gst_event_new_stream_start ("test")) == TRUE);
+ gst_pad_set_caps (src, caps);
+ ASSERT_CAPS_REFCOUNT (caps, "caps", 2);
+ fail_unless (gst_pad_push_event (src,
+ gst_event_new_segment (&dummy_segment)) == TRUE);
+ ASSERT_CAPS_REFCOUNT (caps, "caps", 2);
+
+ /* Doing a query on an unlinked pad will return FALSE */
+ query = gst_query_new_duration (GST_FORMAT_TIME);
+ fail_unless (gst_pad_peer_query (src, query) == FALSE);
+ ASSERT_MINI_OBJECT_REFCOUNT (query, "query", 1);
+ gst_query_unref (query);
+
+ /* Add a probe that returns _DROP will make the event push return TRUE
+ * even if not linked */
+ GST_DEBUG ("event/query DROP");
+ id = gst_pad_add_probe (src,
+ GST_PAD_PROBE_TYPE_EVENT_BOTH | GST_PAD_PROBE_TYPE_QUERY_BOTH,
+ _probe_handler, GINT_TO_POINTER (GST_PAD_PROBE_DROP), NULL);
+ fail_unless (gst_pad_push_event (src,
+ gst_event_new_segment (&dummy_segment)) == TRUE);
+ /* Queries should stil fail */
+ query = gst_query_new_duration (GST_FORMAT_TIME);
+ fail_unless (gst_pad_peer_query (src, query) == FALSE);
+ ASSERT_MINI_OBJECT_REFCOUNT (query, "query", 1);
+ gst_query_unref (query);
+ gst_pad_remove_probe (src, id);
+
+ /* Add a probe that returns _HANDLED will make the event push return TRUE
+ * even if not linked */
+ GST_DEBUG ("event/query HANDLED");
+ id = gst_pad_add_probe (src,
+ GST_PAD_PROBE_TYPE_EVENT_BOTH | GST_PAD_PROBE_TYPE_QUERY_BOTH,
+ _probe_handler, GINT_TO_POINTER (GST_PAD_PROBE_HANDLED), NULL);
+ fail_unless (gst_pad_push_event (src,
+ gst_event_new_segment (&dummy_segment)) == TRUE);
- return GST_PAD_PROBE_DROP;
+ /* Queries will succeed */
+ query = gst_query_new_duration (GST_FORMAT_TIME);
+ fail_unless (gst_pad_peer_query (src, query) == TRUE);
+ ASSERT_MINI_OBJECT_REFCOUNT (query, "query", 1);
+ gst_query_unref (query);
+ gst_pad_remove_probe (src, id);
+
+ /* cleanup */
+ ASSERT_CAPS_REFCOUNT (caps, "caps", 2);
+ ASSERT_OBJECT_REFCOUNT (src, "src", 1);
+
+ gst_object_unref (src);
+
+ ASSERT_CAPS_REFCOUNT (caps, "caps", 1);
+ gst_caps_unref (caps);
}
+GST_END_TEST;
+
GST_START_TEST (test_push_unlinked)
{
GstPad *src;
GstCaps *caps;
GstBuffer *buffer;
gulong id;
+ GstFlowReturn fl;
src = gst_pad_new ("src", GST_PAD_SRC);
fail_if (src == NULL);
* to chain */
GST_DEBUG ("push buffer drop");
id = gst_pad_add_probe (src, GST_PAD_PROBE_TYPE_BUFFER,
- _probe_handler, GINT_TO_POINTER (0), NULL);
+ _probe_handler, GINT_TO_POINTER (GST_PAD_PROBE_DROP), NULL);
+ buffer = gst_buffer_new ();
+ gst_buffer_ref (buffer);
+ fail_unless (gst_pad_push (src, buffer) == GST_FLOW_OK);
+ ASSERT_MINI_OBJECT_REFCOUNT (buffer, "buffer", 1);
+ gst_buffer_unref (buffer);
+ gst_pad_remove_probe (src, id);
+
+ /* adding a probe that returns _HANDLED will drop the buffer without trying
+ * to chain */
+ GST_DEBUG ("push buffer handled");
+ id = gst_pad_add_probe (src, GST_PAD_PROBE_TYPE_BUFFER,
+ _probe_handler, GINT_TO_POINTER (GST_PAD_PROBE_HANDLED), NULL);
buffer = gst_buffer_new ();
gst_buffer_ref (buffer);
fail_unless (gst_pad_push (src, buffer) == GST_FLOW_OK);
* and hence drop because pad is unlinked */
GST_DEBUG ("push buffer ok");
id = gst_pad_add_probe (src, GST_PAD_PROBE_TYPE_BUFFER,
- _probe_handler, GINT_TO_POINTER (1), NULL);
+ _probe_handler, GINT_TO_POINTER (GST_PAD_PROBE_OK), NULL);
buffer = gst_buffer_new ();
gst_buffer_ref (buffer);
fail_unless (gst_pad_push (src, buffer) == GST_FLOW_NOT_LINKED);
gst_buffer_unref (buffer);
gst_pad_remove_probe (src, id);
+ GST_DEBUG ("push buffer handled and custom return");
+ for (fl = GST_FLOW_NOT_SUPPORTED; fl <= GST_FLOW_OK; fl += 1) {
+ GST_DEBUG ("Testing with %s", gst_flow_get_name (fl));
+ id = gst_pad_add_probe (src, GST_PAD_PROBE_TYPE_BUFFER,
+ _handled_probe_handler, GINT_TO_POINTER (fl), NULL);
+ buffer = gst_buffer_new ();
+ gst_buffer_ref (buffer);
+ fail_unless (gst_pad_push (src, buffer) == fl);
+ ASSERT_MINI_OBJECT_REFCOUNT (buffer, "buffer", 1);
+ gst_buffer_unref (buffer);
+ gst_pad_remove_probe (src, id);
+
+ }
+
/* cleanup */
ASSERT_CAPS_REFCOUNT (caps, "caps", 2);
g_list_free (buffers);
buffers = NULL;
- /* adding a probe that returns FALSE will drop the buffer without trying
+ /* adding a probe that returns _DROP will drop the buffer without trying
* to chain */
id = gst_pad_add_probe (src, GST_PAD_PROBE_TYPE_BUFFER,
- _probe_handler, GINT_TO_POINTER (0), NULL);
+ _probe_handler, GINT_TO_POINTER (GST_PAD_PROBE_DROP), NULL);
buffer = gst_buffer_new ();
gst_buffer_ref (buffer);
fail_unless (gst_pad_push (src, buffer) == GST_FLOW_OK);
gst_pad_remove_probe (src, id);
fail_unless_equals_int (g_list_length (buffers), 0);
- /* adding a probe that returns TRUE will still chain the buffer */
+ /* adding a probe that returns _OK will still chain the buffer */
id = gst_pad_add_probe (src, GST_PAD_PROBE_TYPE_BUFFER,
- _probe_handler, GINT_TO_POINTER (1), NULL);
+ _probe_handler, GINT_TO_POINTER (GST_PAD_PROBE_OK), NULL);
buffer = gst_buffer_new ();
gst_buffer_ref (buffer);
fail_unless (gst_pad_push (src, buffer) == GST_FLOW_OK);
g_list_free (buffers);
buffers = NULL;
+ /* adding a probe that returns _HANDLED will not chain the buffer */
+ id = gst_pad_add_probe (src, GST_PAD_PROBE_TYPE_BUFFER,
+ _probe_handler, GINT_TO_POINTER (GST_PAD_PROBE_HANDLED), NULL);
+ buffer = gst_buffer_new ();
+ gst_buffer_ref (buffer);
+ fail_unless (gst_pad_push (src, buffer) == GST_FLOW_OK);
+ gst_pad_remove_probe (src, id);
+
+ ASSERT_MINI_OBJECT_REFCOUNT (buffer, "buffer", 1);
+ gst_buffer_unref (buffer);
+ fail_unless_equals_int (g_list_length (buffers), 0);
+ g_list_free (buffers);
+ buffers = NULL;
+
/* teardown */
gst_pad_unlink (src, sink);
ASSERT_CAPS_REFCOUNT (caps, "caps", 2);
gst_pad_set_caps (src, caps);
fail_unless (gst_pad_push_event (src,
gst_event_new_segment (&dummy_segment)) == TRUE);
- /* adding a probe that returns FALSE will drop the buffer without trying
+ /* adding a probe that returns _DROP will drop the buffer without trying
* to chain */
id = gst_pad_add_probe (src, GST_PAD_PROBE_TYPE_BUFFER, _probe_handler,
- GINT_TO_POINTER (0), NULL);
+ GINT_TO_POINTER (GST_PAD_PROBE_DROP), NULL);
buffer = gst_buffer_new ();
gst_buffer_ref (buffer);
fail_unless (gst_pad_push (src, buffer) == GST_FLOW_FLUSHING);
gst_buffer_unref (buffer);
gst_pad_remove_probe (src, id);
- /* adding a probe that returns TRUE will still chain the buffer,
+ /* adding a probe that returns _OK will still chain the buffer,
* and hence drop because pad is flushing */
id = gst_pad_add_probe (src, GST_PAD_PROBE_TYPE_BUFFER, _probe_handler,
- GINT_TO_POINTER (1), NULL);
+ GINT_TO_POINTER (GST_PAD_PROBE_OK), NULL);
buffer = gst_buffer_new ();
gst_buffer_ref (buffer);
fail_unless (gst_pad_push (src, buffer) == GST_FLOW_FLUSHING);
tcase_add_test (tc_chain, test_pad_probe_block_add_remove);
tcase_add_test (tc_chain, test_pad_probe_block_and_drop_buffer);
tcase_add_test (tc_chain, test_pad_probe_flush_events);
+ tcase_add_test (tc_chain, test_events_query_unlinked);
tcase_add_test (tc_chain, test_queue_src_caps_notify_linked);
tcase_add_test (tc_chain, test_queue_src_caps_notify_not_linked);
#if 0