"Erik Walthinsen <omega@cse.ogi.edu>, "
"Wim \"Tim\" Taymans <wim@fluendo.com>");
+#define GST_TYPE_TEE_PULL_MODE (gst_tee_pull_mode_get_type())
+static GType
+gst_tee_pull_mode_get_type (void)
+{
+ static GType type = 0;
+ static const GEnumValue data[] = {
+ {GST_TEE_PULL_MODE_NEVER, "Never activate in pull mode", "never"},
+ {GST_TEE_PULL_MODE_SINGLE, "Only one src pad can be active in pull mode",
+ "single"},
+ {0, NULL, NULL},
+ };
+
+ if (!type) {
+ type = g_enum_register_static ("GstTeePullMode", data);
+ }
+ return type;
+}
+
#define DEFAULT_PROP_NUM_SRC_PADS 0
#define DEFAULT_PROP_HAS_SINK_LOOP FALSE
#define DEFAULT_PROP_HAS_CHAIN TRUE
#define DEFAULT_PROP_SILENT TRUE
#define DEFAULT_PROP_LAST_MESSAGE NULL
+#define DEFAULT_PULL_MODE GST_TEE_PULL_MODE_NEVER
enum
{
PROP_HAS_SINK_LOOP,
PROP_HAS_CHAIN,
PROP_SILENT,
- PROP_LAST_MESSAGE
- /* FILL ME */
+ PROP_LAST_MESSAGE,
+ PROP_PULL_MODE,
};
GstStaticPadTemplate tee_src_template = GST_STATIC_PAD_TEMPLATE ("src%d",
static GstFlowReturn gst_tee_chain (GstPad * pad, GstBuffer * buffer);
static GstFlowReturn gst_tee_buffer_alloc (GstPad * pad, guint64 offset,
guint size, GstCaps * caps, GstBuffer ** buf);
-static void gst_tee_loop (GstPad * pad);
static gboolean gst_tee_sink_activate_push (GstPad * pad, gboolean active);
-static gboolean gst_tee_sink_activate_pull (GstPad * pad, gboolean active);
+static gboolean gst_tee_src_check_get_range (GstPad * pad);
+static gboolean gst_tee_src_activate_pull (GstPad * pad, gboolean active);
+static GstFlowReturn gst_tee_src_get_range (GstPad * pad, guint64 offset,
+ guint length, GstBuffer ** buf);
static void
G_PARAM_READABLE));
g_object_class_install_property (gobject_class, PROP_HAS_SINK_LOOP,
g_param_spec_boolean ("has-sink-loop", "Has Sink Loop",
- "If the element can operate in pull mode (unimplemented)",
+ "If the element should spawn a thread (unimplemented and deprecated)",
DEFAULT_PROP_HAS_SINK_LOOP, G_PARAM_CONSTRUCT | G_PARAM_READWRITE));
g_object_class_install_property (gobject_class, PROP_HAS_CHAIN,
g_param_spec_boolean ("has-chain", "Has Chain",
- "If the element can operate in push mode", DEFAULT_PROP_HAS_CHAIN,
- G_PARAM_CONSTRUCT | G_PARAM_READWRITE));
+ "If the element can operate in push mode",
+ DEFAULT_PROP_HAS_CHAIN, G_PARAM_CONSTRUCT | G_PARAM_READWRITE));
g_object_class_install_property (gobject_class, PROP_SILENT,
g_param_spec_boolean ("silent", "Silent",
"Don't produce last_message events", DEFAULT_PROP_SILENT,
g_param_spec_string ("last_message", "Last Message",
"The message describing current status", DEFAULT_PROP_LAST_MESSAGE,
G_PARAM_READABLE));
+ g_object_class_install_property (gobject_class, PROP_PULL_MODE,
+ g_param_spec_enum ("pull-mode", "Pull mode",
+ "Behavior of tee in pull mode", GST_TYPE_TEE_PULL_MODE,
+ DEFAULT_PULL_MODE, G_PARAM_CONSTRUCT | G_PARAM_READWRITE));
gstelement_class->request_new_pad =
GST_DEBUG_FUNCPTR (gst_tee_request_new_pad);
gst_tee_init (GstTee * tee, GstTeeClass * g_class)
{
tee->sinkpad = gst_pad_new_from_static_template (&sinktemplate, "sink");
+ tee->sink_mode = GST_ACTIVATE_NONE;
+
gst_pad_set_setcaps_function (tee->sinkpad,
GST_DEBUG_FUNCPTR (gst_pad_proxy_setcaps));
gst_pad_set_getcaps_function (tee->sinkpad,
GST_DEBUG_FUNCPTR (gst_pad_proxy_getcaps));
gst_pad_set_bufferalloc_function (tee->sinkpad,
GST_DEBUG_FUNCPTR (gst_tee_buffer_alloc));
+ gst_pad_set_activatepush_function (tee->sinkpad,
+ GST_DEBUG_FUNCPTR (gst_tee_sink_activate_push));
+ gst_pad_set_chain_function (tee->sinkpad, GST_DEBUG_FUNCPTR (gst_tee_chain));
gst_element_add_pad (GST_ELEMENT (tee), tee->sinkpad);
tee->last_message = NULL;
}
-static void
-gst_tee_update_pad_functions (GstTee * tee)
-{
- gst_pad_set_activatepush_function (tee->sinkpad,
- GST_DEBUG_FUNCPTR (gst_tee_sink_activate_push));
- gst_pad_set_activatepull_function (tee->sinkpad,
- GST_DEBUG_FUNCPTR (gst_tee_sink_activate_pull));
-
- if (tee->has_chain)
- gst_pad_set_chain_function (tee->sinkpad,
- GST_DEBUG_FUNCPTR (gst_tee_chain));
- else
- gst_pad_set_chain_function (tee->sinkpad, NULL);
-}
-
static GstPad *
gst_tee_request_new_pad (GstElement * element, GstPadTemplate * templ,
const gchar * unused)
mode = tee->sink_mode;
GST_OBJECT_UNLOCK (tee);
- /* activate the pad in the right mode */
- if (mode == GST_ACTIVATE_PULL)
- res = gst_pad_activate_pull (srcpad, TRUE);
- if (mode == GST_ACTIVATE_PUSH)
- res = gst_pad_activate_push (srcpad, TRUE);
- else
- res = TRUE;
+ switch (mode) {
+ case GST_ACTIVATE_PULL:
+ /* we already have a src pad in pull mode, and our pull mode can only be
+ SINGLE, so fall through to activate this new pad in push mode */
+ case GST_ACTIVATE_PUSH:
+ res = gst_pad_activate_push (srcpad, TRUE);
+ break;
+ default:
+ res = TRUE;
+ break;
+ }
if (!res)
goto activate_failed;
GST_DEBUG_FUNCPTR (gst_pad_proxy_setcaps));
gst_pad_set_getcaps_function (srcpad,
GST_DEBUG_FUNCPTR (gst_pad_proxy_getcaps));
+ gst_pad_set_activatepull_function (srcpad,
+ GST_DEBUG_FUNCPTR (gst_tee_src_activate_pull));
+ gst_pad_set_checkgetrange_function (srcpad,
+ GST_DEBUG_FUNCPTR (gst_tee_src_check_get_range));
+ gst_pad_set_getrange_function (srcpad,
+ GST_DEBUG_FUNCPTR (gst_tee_src_get_range));
gst_element_add_pad (GST_ELEMENT_CAST (tee), srcpad);
return srcpad;
tee->allocpad = NULL;
GST_OBJECT_UNLOCK (tee);
- /* deactivate the pad before removing */
gst_pad_set_active (pad, FALSE);
gst_element_remove_pad (GST_ELEMENT_CAST (tee), pad);
switch (prop_id) {
case PROP_HAS_SINK_LOOP:
tee->has_sink_loop = g_value_get_boolean (value);
- gst_tee_update_pad_functions (tee);
+ if (tee->has_sink_loop) {
+ g_warning ("tee will never implement has-sink-loop==TRUE");
+ }
break;
case PROP_HAS_CHAIN:
tee->has_chain = g_value_get_boolean (value);
- gst_tee_update_pad_functions (tee);
break;
case PROP_SILENT:
tee->silent = g_value_get_boolean (value);
break;
+ case PROP_PULL_MODE:
+ tee->pull_mode = g_value_get_enum (value);
+ break;
default:
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
break;
case PROP_LAST_MESSAGE:
g_value_set_string (value, tee->last_message);
break;
+ case PROP_PULL_MODE:
+ g_value_set_enum (value, tee->pull_mode);
+ break;
default:
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
break;
}
/* Push */
- res = gst_pad_push (pad, gst_buffer_ref (data->buffer));
- GST_LOG_OBJECT (tee, "Pushing buffer %p to %" GST_PTR_FORMAT
- " yielded result=%d", data->buffer, pad, res);
+ if (pad == data->tee->pull_pad) {
+ res = GST_FLOW_OK;
+ } else {
+ res = gst_pad_push (pad, gst_buffer_ref (data->buffer));
+ GST_LOG_OBJECT (tee, "Pushing buffer %p to %" GST_PTR_FORMAT
+ " yielded result=%d", data->buffer, pad, res);
+ }
/* If it's fatal or OK, or if ret is currently
* not-linked, we overwrite the previous value */
GstFlowReturn res;
GstTee *tee;
- tee = GST_TEE (GST_PAD_PARENT (pad));
-
- res = gst_tee_handle_buffer (tee, buffer);
-
- return res;
-}
-
-#define DEFAULT_SIZE 1024
-
-static void
-gst_tee_loop (GstPad * pad)
-{
- GstBuffer *buffer;
- GstFlowReturn res;
- GstTee *tee;
-
- tee = GST_TEE (GST_PAD_PARENT (pad));
-
- res = gst_pad_pull_range (pad, tee->offset, DEFAULT_SIZE, &buffer);
- if (res != GST_FLOW_OK)
- goto pause_task;
+ tee = GST_TEE (gst_pad_get_parent (pad));
res = gst_tee_handle_buffer (tee, buffer);
- if (res != GST_FLOW_OK)
- goto pause_task;
- return;
+ gst_object_unref (tee);
-pause_task:
- {
- gst_pad_pause_task (pad);
- return;
- }
+ return res;
}
static gboolean
no_chain:
{
GST_OBJECT_UNLOCK (tee);
- GST_ELEMENT_ERROR (tee, CORE, FAILED, (NULL),
- ("Tee cannot operate in push mode, set ::has-chain to TRUE."));
+ GST_INFO_OBJECT (tee,
+ "Tee cannot operate in push mode with has-chain==FALSE");
return FALSE;
}
}
-/* won't be called until we implement an activate function */
static gboolean
-gst_tee_sink_activate_pull (GstPad * pad, gboolean active)
+gst_tee_src_activate_pull (GstPad * pad, gboolean active)
{
GstTee *tee;
gboolean res;
+ GstPad *sinkpad;
- tee = GST_TEE (GST_OBJECT_PARENT (pad));
+ tee = GST_TEE (gst_pad_get_parent (pad));
GST_OBJECT_LOCK (tee);
- tee->sink_mode = active && GST_ACTIVATE_PULL;
+ if (tee->pull_mode == GST_TEE_PULL_MODE_NEVER)
+ goto cannot_pull;
+
+ if (tee->pull_mode == GST_TEE_PULL_MODE_SINGLE && active && tee->pull_pad)
+ goto cannot_pull_multiple_srcs;
+
+ sinkpad = gst_object_ref (tee->sinkpad);
+
+ GST_OBJECT_UNLOCK (tee);
+
+ res = gst_pad_activate_pull (sinkpad, active);
+ gst_object_unref (sinkpad);
+
+ if (!res)
+ goto sink_activate_failed;
+
+ GST_OBJECT_LOCK (tee);
if (active) {
- if (!tee->has_sink_loop)
- goto no_loop;
- GST_OBJECT_UNLOCK (tee);
- res = gst_pad_start_task (pad, (GstTaskFunction) gst_tee_loop, pad);
+ if (tee->pull_mode == GST_TEE_PULL_MODE_SINGLE)
+ tee->pull_pad = pad;
} else {
+ if (pad == tee->pull_pad)
+ tee->pull_pad = NULL;
+ }
+ tee->sink_mode = active && GST_ACTIVATE_PULL;
+ GST_OBJECT_UNLOCK (tee);
+
+ gst_object_unref (tee);
+
+ return res;
+
+ /* ERRORS */
+cannot_pull:
+ {
GST_OBJECT_UNLOCK (tee);
- res = gst_pad_stop_task (pad);
+ GST_INFO_OBJECT (tee, "Cannot activate in pull mode, pull-mode "
+ "set to NEVER");
+ gst_object_unref (tee);
+ return FALSE;
}
+cannot_pull_multiple_srcs:
+ {
+ GST_OBJECT_UNLOCK (tee);
+ GST_INFO_OBJECT (tee, "Cannot activate multiple src pads in pull mode, "
+ "pull-mode set to SINGLE");
+ gst_object_unref (tee);
+ return FALSE;
+ }
+sink_activate_failed:
+ {
+ GST_INFO_OBJECT (tee, "Failed to %sactivate sink pad in pull mode",
+ active ? "" : "de");
+ gst_object_unref (tee);
+ return FALSE;
+ }
+}
+
+static gboolean
+gst_tee_src_check_get_range (GstPad * pad)
+{
+ GstTee *tee;
+ gboolean res;
+ GstPad *sinkpad;
+
+ tee = GST_TEE (gst_pad_get_parent (pad));
+
+ GST_OBJECT_LOCK (tee);
+
+ if (tee->pull_mode == GST_TEE_PULL_MODE_NEVER)
+ goto cannot_pull;
+
+ if (tee->pull_mode == GST_TEE_PULL_MODE_SINGLE && tee->pull_pad)
+ goto cannot_pull_multiple_srcs;
+
+ sinkpad = gst_object_ref (tee->sinkpad);
+
+ GST_OBJECT_UNLOCK (tee);
+
+ res = gst_pad_check_pull_range (sinkpad);
+ gst_object_unref (sinkpad);
+
+ gst_object_unref (tee);
+
return res;
/* ERRORS */
-no_loop:
+cannot_pull:
{
GST_OBJECT_UNLOCK (tee);
- GST_ELEMENT_ERROR (tee, CORE, FAILED, (NULL),
- ("Tee cannot operate in pull mode, set ::has-sink-loop to TRUE."));
+ GST_INFO_OBJECT (tee, "Cannot activate in pull mode, pull-mode "
+ "set to NEVER");
+ gst_object_unref (tee);
return FALSE;
}
+cannot_pull_multiple_srcs:
+ {
+ GST_OBJECT_UNLOCK (tee);
+ GST_INFO_OBJECT (tee, "Cannot activate multiple src pads in pull mode, "
+ "pull-mode set to SINGLE");
+ gst_object_unref (tee);
+ return FALSE;
+ }
+}
+
+static void
+gst_tee_push_eos (GstPad * pad, GstTee * tee)
+{
+ if (pad != tee->pull_pad)
+ gst_pad_push_event (pad, gst_event_new_eos ());
+ gst_object_unref (pad);
+}
+
+static void
+gst_tee_pull_eos (GstTee * tee)
+{
+ GstIterator *iter;
+
+ iter = gst_element_iterate_src_pads (GST_ELEMENT (tee));
+ gst_iterator_foreach (iter, (GFunc) gst_tee_push_eos, tee);
+ gst_iterator_free (iter);
+}
+
+static GstFlowReturn
+gst_tee_src_get_range (GstPad * pad, guint64 offset, guint length,
+ GstBuffer ** buf)
+{
+ GstTee *tee;
+ GstFlowReturn ret;
+
+ tee = GST_TEE (gst_pad_get_parent (pad));
+
+ ret = gst_pad_pull_range (tee->sinkpad, offset, length, buf);
+
+ if (ret == GST_FLOW_OK)
+ ret = gst_tee_handle_buffer (tee, gst_buffer_ref (*buf));
+ else if (ret == GST_FLOW_UNEXPECTED)
+ gst_tee_pull_eos (tee);
+
+ gst_object_unref (tee);
+
+ return ret;
}
GST_END_TEST;
+GST_START_TEST (test_tee)
+{
+ gchar *s;
+
+ s = "fakesrc can-activate-push=true ! tee ! fakesink can-activate-push=true";
+ run_pipeline (setup_pipeline (s), s,
+ GST_MESSAGE_NEW_CLOCK | GST_MESSAGE_STATE_CHANGED, GST_MESSAGE_UNKNOWN);
+
+ s = "fakesrc can-activate-push=true num-buffers=10 ! tee ! fakesink can-activate-push=true";
+ run_pipeline (setup_pipeline (s), s,
+ GST_MESSAGE_NEW_CLOCK | GST_MESSAGE_STATE_CHANGED, GST_MESSAGE_EOS);
+
+ s = "fakesrc can-activate-push=false can-activate-pull=true ! tee ! fakesink can-activate-pull=true";
+ ASSERT_CRITICAL (run_pipeline (setup_pipeline (s), s,
+ GST_MESSAGE_NEW_CLOCK | GST_MESSAGE_STATE_CHANGED,
+ GST_MESSAGE_UNKNOWN));
+
+ s = "fakesrc can-activate-push=false can-activate-pull=true "
+ "! tee pull-mode=single ! fakesink can-activate-pull=true";
+ run_pipeline (setup_pipeline (s), s,
+ GST_MESSAGE_NEW_CLOCK | GST_MESSAGE_STATE_CHANGED, GST_MESSAGE_UNKNOWN);
+
+ s = "fakesrc can-activate-push=false can-activate-pull=true num-buffers=10 "
+ "! tee pull-mode=single ! fakesink can-activate-pull=true";
+ run_pipeline (setup_pipeline (s), s,
+ GST_MESSAGE_NEW_CLOCK | GST_MESSAGE_STATE_CHANGED, GST_MESSAGE_EOS);
+
+ s = "fakesrc can-activate-push=false can-activate-pull=true "
+ "! tee name=t pull-mode=single ! fakesink can-activate-pull=true "
+ "t. ! queue ! fakesink can-activate-pull=true can-activate-push=false";
+ ASSERT_CRITICAL (run_pipeline (setup_pipeline (s), s,
+ GST_MESSAGE_NEW_CLOCK | GST_MESSAGE_STATE_CHANGED,
+ GST_MESSAGE_UNKNOWN));
+
+ s = "fakesrc can-activate-push=false can-activate-pull=true "
+ "! tee name=t pull-mode=single ! fakesink can-activate-pull=true "
+ "t. ! queue ! fakesink";
+ run_pipeline (setup_pipeline (s), s,
+ GST_MESSAGE_NEW_CLOCK | GST_MESSAGE_STATE_CHANGED, GST_MESSAGE_UNKNOWN);
+
+ s = "fakesrc can-activate-push=false can-activate-pull=true num-buffers=10 "
+ "! tee name=t pull-mode=single ! fakesink can-activate-pull=true "
+ "t. ! queue ! fakesink";
+ run_pipeline (setup_pipeline (s), s,
+ GST_MESSAGE_NEW_CLOCK | GST_MESSAGE_STATE_CHANGED, GST_MESSAGE_EOS);
+}
+
+GST_END_TEST;
+
static void
got_handoff (GstElement * sink, GstBuffer * buf, GstPad * pad, gpointer unused)
{
suite_add_tcase (s, tc_chain);
tcase_add_test (tc_chain, test_2_elements);
+ tcase_add_test (tc_chain, test_tee);
tcase_add_test (tc_chain, test_stop_from_app);
return s;
}