/**
* SECTION:element-funnel
+ * @title: funnel
*
* Takes packets from various input sinks into one output source.
*
{
GstPad parent;
- GstSegment segment;
+ gboolean got_eos;
};
struct _GstFunnelPadClass
G_DEFINE_TYPE (GstFunnelPad, gst_funnel_pad, GST_TYPE_PAD);
-static void
-gst_funnel_pad_class_init (GstFunnelPadClass * klass)
+#define DEFAULT_FORWARD_STICKY_EVENTS TRUE
+
+enum
{
-}
+ PROP_0,
+ PROP_FORWARD_STICKY_EVENTS
+};
static void
-gst_funnel_pad_reset (GstFunnelPad * pad)
+gst_funnel_pad_class_init (GstFunnelPadClass * klass)
{
- gst_segment_init (&pad->segment, GST_FORMAT_UNDEFINED);
}
static void
gst_funnel_pad_init (GstFunnelPad * pad)
{
- gst_funnel_pad_reset (pad);
+ pad->got_eos = FALSE;
}
-static GstStaticPadTemplate funnel_sink_template =
-GST_STATIC_PAD_TEMPLATE ("sink_%u",
+static GstStaticPadTemplate sink_template = GST_STATIC_PAD_TEMPLATE ("sink_%u",
GST_PAD_SINK,
GST_PAD_REQUEST,
GST_STATIC_CAPS_ANY);
-static GstStaticPadTemplate funnel_src_template =
-GST_STATIC_PAD_TEMPLATE ("src",
+static GstStaticPadTemplate src_template = GST_STATIC_PAD_TEMPLATE ("src",
GST_PAD_SRC,
GST_PAD_ALWAYS,
GST_STATIC_CAPS_ANY);
static GstFlowReturn gst_funnel_sink_chain (GstPad * pad, GstObject * parent,
GstBuffer * buffer);
+static GstFlowReturn gst_funnel_sink_chain_list (GstPad * pad,
+ GstObject * parent, GstBufferList * list);
static gboolean gst_funnel_sink_event (GstPad * pad, GstObject * parent,
GstEvent * event);
-static gboolean gst_funnel_sink_query (GstPad * pad, GstObject * parent,
- GstQuery * query);
-static gboolean gst_funnel_src_event (GstPad * pad, GstObject * parent,
- GstEvent * event);
+static void
+gst_funnel_set_property (GObject * object, guint prop_id,
+ const GValue * value, GParamSpec * pspec)
+{
+ GstFunnel *funnel = GST_FUNNEL (object);
+
+ switch (prop_id) {
+ case PROP_FORWARD_STICKY_EVENTS:
+ funnel->forward_sticky_events = g_value_get_boolean (value);
+ break;
+ default:
+ G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
+ break;
+ }
+}
+
+static void
+gst_funnel_get_property (GObject * object, guint prop_id, GValue * value,
+ GParamSpec * pspec)
+{
+ GstFunnel *funnel = GST_FUNNEL (object);
+
+ switch (prop_id) {
+ case PROP_FORWARD_STICKY_EVENTS:
+ g_value_set_boolean (value, funnel->forward_sticky_events);
+ break;
+ default:
+ G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
+ break;
+ }
+}
static void
gst_funnel_dispose (GObject * object)
{
+ GstFunnel *funnel = GST_FUNNEL (object);
GList *item;
+ gst_object_replace ((GstObject **) & funnel->last_sinkpad, NULL);
+
restart:
for (item = GST_ELEMENT_PADS (object); item; item = g_list_next (item)) {
GstPad *pad = GST_PAD (item->data);
GObjectClass *gobject_class = G_OBJECT_CLASS (klass);
GstElementClass *gstelement_class = GST_ELEMENT_CLASS (klass);
+ gobject_class->set_property = gst_funnel_set_property;
+ gobject_class->get_property = gst_funnel_get_property;
gobject_class->dispose = GST_DEBUG_FUNCPTR (gst_funnel_dispose);
- gst_element_class_set_details_simple (gstelement_class,
+ g_object_class_install_property (gobject_class, PROP_FORWARD_STICKY_EVENTS,
+ g_param_spec_boolean ("forward-sticky-events", "Forward sticky events",
+ "Forward sticky events on stream changes",
+ DEFAULT_FORWARD_STICKY_EVENTS,
+ G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS |
+ GST_PARAM_MUTABLE_READY));
+
+ gst_element_class_set_static_metadata (gstelement_class,
"Funnel pipe fitting", "Generic", "N-to-1 pipe fitting",
"Olivier Crete <olivier.crete@collabora.co.uk>");
- gst_element_class_add_pad_template (gstelement_class,
- gst_static_pad_template_get (&funnel_sink_template));
- gst_element_class_add_pad_template (gstelement_class,
- gst_static_pad_template_get (&funnel_src_template));
+ gst_element_class_add_static_pad_template (gstelement_class, &sink_template);
+ gst_element_class_add_static_pad_template (gstelement_class, &src_template);
gstelement_class->request_new_pad =
GST_DEBUG_FUNCPTR (gst_funnel_request_new_pad);
static void
gst_funnel_init (GstFunnel * funnel)
{
- funnel->srcpad = gst_pad_new_from_static_template (&funnel_src_template,
- "src");
- gst_pad_set_event_function (funnel->srcpad, gst_funnel_src_event);
+ funnel->srcpad = gst_pad_new_from_static_template (&src_template, "src");
gst_pad_use_fixed_caps (funnel->srcpad);
+
gst_element_add_pad (GST_ELEMENT (funnel), funnel->srcpad);
+
+ funnel->forward_sticky_events = DEFAULT_FORWARD_STICKY_EVENTS;
}
static GstPad *
gst_pad_set_chain_function (sinkpad,
GST_DEBUG_FUNCPTR (gst_funnel_sink_chain));
+ gst_pad_set_chain_list_function (sinkpad,
+ GST_DEBUG_FUNCPTR (gst_funnel_sink_chain_list));
gst_pad_set_event_function (sinkpad,
GST_DEBUG_FUNCPTR (gst_funnel_sink_event));
- gst_pad_set_query_function (sinkpad,
- GST_DEBUG_FUNCPTR (gst_funnel_sink_query));
+
+ GST_OBJECT_FLAG_SET (sinkpad, GST_PAD_FLAG_PROXY_CAPS);
+ GST_OBJECT_FLAG_SET (sinkpad, GST_PAD_FLAG_PROXY_ALLOCATION);
gst_pad_set_active (sinkpad, TRUE);
gst_element_add_pad (element, sinkpad);
+ GST_DEBUG_OBJECT (element, "requested pad %s:%s",
+ GST_DEBUG_PAD_NAME (sinkpad));
+
return sinkpad;
}
+static gboolean
+gst_funnel_all_sinkpads_eos_unlocked (GstFunnel * funnel, GstPad * pad)
+{
+ GstElement *element = GST_ELEMENT_CAST (funnel);
+ GList *item;
+ gboolean all_eos = FALSE;
+
+
+ if (element->numsinkpads == 0)
+ goto done;
+
+ for (item = element->sinkpads; item != NULL; item = g_list_next (item)) {
+ GstFunnelPad *sinkpad = GST_FUNNEL_PAD_CAST (item->data);
+
+ if (!sinkpad->got_eos) {
+ return FALSE;
+ }
+ }
+
+ all_eos = TRUE;
+
+done:
+ return all_eos;
+}
+
static void
gst_funnel_release_pad (GstElement * element, GstPad * pad)
{
GstFunnel *funnel = GST_FUNNEL (element);
+ GstFunnelPad *fpad = GST_FUNNEL_PAD_CAST (pad);
+ gboolean got_eos;
+ gboolean send_eos = FALSE;
- GST_DEBUG_OBJECT (funnel, "releasing pad");
+ GST_DEBUG_OBJECT (funnel, "releasing pad %s:%s", GST_DEBUG_PAD_NAME (pad));
gst_pad_set_active (pad, FALSE);
+ got_eos = fpad->got_eos;
+
gst_element_remove_pad (GST_ELEMENT_CAST (funnel), pad);
+
+ GST_OBJECT_LOCK (funnel);
+ if (!got_eos && gst_funnel_all_sinkpads_eos_unlocked (funnel, NULL)) {
+ GST_DEBUG_OBJECT (funnel, "Pad removed. All others are EOS. Sending EOS");
+ send_eos = TRUE;
+ }
+ GST_OBJECT_UNLOCK (funnel);
+
+ if (send_eos)
+ if (!gst_pad_push_event (funnel->srcpad, gst_event_new_eos ()))
+ GST_WARNING_OBJECT (funnel, "Failure pushing EOS");
+}
+
+static gboolean
+forward_events (GstPad * pad, GstEvent ** event, gpointer user_data)
+{
+ GstPad *srcpad = user_data;
+
+ if (GST_EVENT_TYPE (*event) != GST_EVENT_EOS)
+ gst_pad_push_event (srcpad, gst_event_ref (*event));
+
+ return TRUE;
}
static GstFlowReturn
-gst_funnel_sink_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer)
+gst_funnel_sink_chain_object (GstPad * pad, GstFunnel * funnel,
+ gboolean is_list, GstMiniObject * obj)
{
GstFlowReturn res;
- GstFunnel *funnel = GST_FUNNEL (parent);
- GstFunnelPad *fpad = GST_FUNNEL_PAD_CAST (pad);
- GstEvent *event = NULL;
- GstClockTime newts;
-#if 0
- GstCaps *padcaps;
-#endif
- GST_DEBUG_OBJECT (funnel, "received buffer %p", buffer);
+ GST_DEBUG_OBJECT (pad, "received %" GST_PTR_FORMAT, obj);
- GST_OBJECT_LOCK (funnel);
- if (fpad->segment.format == GST_FORMAT_UNDEFINED) {
- GST_WARNING_OBJECT (funnel, "Got buffer without segment,"
- " setting segment [0,inf[");
- gst_segment_init (&fpad->segment, GST_FORMAT_TIME);
- }
+ GST_PAD_STREAM_LOCK (funnel->srcpad);
- if (GST_CLOCK_TIME_IS_VALID (GST_BUFFER_TIMESTAMP (buffer)))
- fpad->segment.position = GST_BUFFER_TIMESTAMP (buffer);
+ if ((funnel->last_sinkpad == NULL) || (funnel->forward_sticky_events
+ && (funnel->last_sinkpad != pad))) {
+ gst_object_replace ((GstObject **) & funnel->last_sinkpad,
+ GST_OBJECT (pad));
- newts = gst_segment_to_running_time (&fpad->segment,
- fpad->segment.format, GST_BUFFER_TIMESTAMP (buffer));
- if (newts != GST_BUFFER_TIMESTAMP (buffer)) {
- buffer = gst_buffer_make_writable (buffer);
- GST_BUFFER_TIMESTAMP (buffer) = newts;
+ GST_DEBUG_OBJECT (pad, "Forwarding sticky events");
+ gst_pad_sticky_events_foreach (pad, forward_events, funnel->srcpad);
}
- if (!funnel->has_segment) {
- GstSegment segment;
+ if (is_list)
+ res = gst_pad_push_list (funnel->srcpad, GST_BUFFER_LIST_CAST (obj));
+ else
+ res = gst_pad_push (funnel->srcpad, GST_BUFFER_CAST (obj));
- gst_segment_init (&segment, GST_FORMAT_TIME);
- event = gst_event_new_segment (&segment);
- funnel->has_segment = TRUE;
- }
- GST_OBJECT_UNLOCK (funnel);
+ GST_PAD_STREAM_UNLOCK (funnel->srcpad);
- if (event) {
- if (!gst_pad_push_event (funnel->srcpad, event))
- GST_WARNING_OBJECT (funnel, "Could not push out newsegment event");
- }
-#if 0
- GST_OBJECT_LOCK (pad);
- padcaps = GST_PAD_CAPS (funnel->srcpad);
- GST_OBJECT_UNLOCK (pad);
-
- if (GST_BUFFER_CAPS (buffer) && GST_BUFFER_CAPS (buffer) != padcaps) {
- if (!gst_pad_set_caps (funnel->srcpad, GST_BUFFER_CAPS (buffer))) {
- res = GST_FLOW_NOT_NEGOTIATED;
- goto out;
- }
- }
-#endif
+ GST_LOG_OBJECT (pad, "handled buffer%s %s", (is_list ? "list" : ""),
+ gst_flow_get_name (res));
- res = gst_pad_push (funnel->srcpad, buffer);
+ return res;
+}
- GST_LOG_OBJECT (funnel, "handled buffer %s", gst_flow_get_name (res));
+static GstFlowReturn
+gst_funnel_sink_chain_list (GstPad * pad, GstObject * parent,
+ GstBufferList * list)
+{
+ GstFunnel *funnel = GST_FUNNEL (parent);
-#if 0
-out:
-#endif
+ return gst_funnel_sink_chain_object (pad, funnel, TRUE,
+ GST_MINI_OBJECT_CAST (list));
+}
- return res;
+static GstFlowReturn
+gst_funnel_sink_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer)
+{
+ GstFunnel *funnel = GST_FUNNEL (parent);
+
+ return gst_funnel_sink_chain_object (pad, funnel, FALSE,
+ GST_MINI_OBJECT_CAST (buffer));
}
static gboolean
GstFunnelPad *fpad = GST_FUNNEL_PAD_CAST (pad);
gboolean forward = TRUE;
gboolean res = TRUE;
+ gboolean unlock = FALSE;
- switch (GST_EVENT_TYPE (event)) {
- case GST_EVENT_SEGMENT:
- {
+ GST_DEBUG_OBJECT (pad, "received event %" GST_PTR_FORMAT, event);
+
+ if (GST_EVENT_IS_STICKY (event)) {
+ unlock = TRUE;
+ GST_PAD_STREAM_LOCK (funnel->srcpad);
+
+ if (GST_EVENT_TYPE (event) == GST_EVENT_EOS) {
GST_OBJECT_LOCK (funnel);
- gst_event_copy_segment (event, &fpad->segment);
+ fpad->got_eos = TRUE;
+ if (!gst_funnel_all_sinkpads_eos_unlocked (funnel, pad)) {
+ forward = FALSE;
+ } else {
+ forward = TRUE;
+ }
GST_OBJECT_UNLOCK (funnel);
-
+ } else if (pad != funnel->last_sinkpad) {
forward = FALSE;
- break;
}
- case GST_EVENT_FLUSH_STOP:
- {
- GST_OBJECT_LOCK (funnel);
- gst_segment_init (&fpad->segment, GST_FORMAT_UNDEFINED);
- funnel->has_segment = FALSE;
- GST_OBJECT_UNLOCK (funnel);
- break;
+ } else if (GST_EVENT_TYPE (event) == GST_EVENT_FLUSH_STOP) {
+ unlock = TRUE;
+ GST_PAD_STREAM_LOCK (funnel->srcpad);
+ GST_OBJECT_LOCK (funnel);
+ fpad->got_eos = FALSE;
+ GST_OBJECT_UNLOCK (funnel);
+ }
+
+ if (forward && GST_EVENT_IS_SERIALIZED (event)) {
+ /* If no data is coming and we receive serialized event, need to forward all sticky events.
+ * Otherwise downstream has an inconsistent set of sticky events when
+ * handling the new event. */
+ if (!unlock) {
+ unlock = TRUE;
+ GST_PAD_STREAM_LOCK (funnel->srcpad);
+ }
+
+ if ((funnel->last_sinkpad == NULL) || (funnel->forward_sticky_events
+ && (funnel->last_sinkpad != pad))) {
+ gst_object_replace ((GstObject **) & funnel->last_sinkpad,
+ GST_OBJECT (pad));
+ gst_pad_sticky_events_foreach (pad, forward_events, funnel->srcpad);
}
- default:
- break;
}
if (forward)
else
gst_event_unref (event);
- return res;
-}
-
-static gboolean
-gst_funnel_sink_query (GstPad * pad, GstObject * parent, GstQuery * query)
-{
- GstFunnel *funnel = GST_FUNNEL (parent);
- gboolean forward = TRUE;
- gboolean res = TRUE;
-
- if (forward)
- res = gst_pad_peer_query (funnel->srcpad, query);
+ if (unlock)
+ GST_PAD_STREAM_UNLOCK (funnel->srcpad);
return res;
}
-static gboolean
-gst_funnel_src_event (GstPad * pad, GstObject * parent, GstEvent * event)
-{
- GstElement *funnel;
- GstIterator *iter;
- GstPad *sinkpad;
- gboolean result = FALSE;
- gboolean done = FALSE;
- GValue value = { 0, };
-
- funnel = GST_ELEMENT_CAST (parent);
-
- iter = gst_element_iterate_sink_pads (funnel);
-
- while (!done) {
- switch (gst_iterator_next (iter, &value)) {
- case GST_ITERATOR_OK:
- sinkpad = g_value_get_object (&value);
- gst_event_ref (event);
- result |= gst_pad_push_event (sinkpad, event);
- g_value_reset (&value);
- break;
- case GST_ITERATOR_RESYNC:
- gst_iterator_resync (iter);
- result = FALSE;
- break;
- case GST_ITERATOR_ERROR:
- GST_WARNING_OBJECT (funnel, "Error iterating sinkpads");
- case GST_ITERATOR_DONE:
- done = TRUE;
- break;
- }
- }
- g_value_unset (&value);
- gst_iterator_free (iter);
- gst_event_unref (event);
-
- return result;
-}
-
static void
reset_pad (const GValue * data, gpointer user_data)
{
GstPad *pad = g_value_get_object (data);
GstFunnelPad *fpad = GST_FUNNEL_PAD_CAST (pad);
+ GstFunnel *funnel = user_data;
- GST_OBJECT_LOCK (pad);
- gst_funnel_pad_reset (fpad);
- GST_OBJECT_UNLOCK (pad);
+ GST_OBJECT_LOCK (funnel);
+ fpad->got_eos = FALSE;
+ GST_OBJECT_UNLOCK (funnel);
}
static GstStateChangeReturn
gst_funnel_change_state (GstElement * element, GstStateChange transition)
{
- GstFunnel *funnel = GST_FUNNEL (element);
GstStateChangeReturn ret;
+ ret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
+ if (ret == GST_STATE_CHANGE_FAILURE)
+ return ret;
+
switch (transition) {
- case GST_STATE_CHANGE_READY_TO_PAUSED:
+ case GST_STATE_CHANGE_PAUSED_TO_READY:
{
GstIterator *iter = gst_element_iterate_sink_pads (element);
GstIteratorResult res;
do {
- res = gst_iterator_foreach (iter, reset_pad, NULL);
+ res = gst_iterator_foreach (iter, reset_pad, element);
+ if (res == GST_ITERATOR_RESYNC)
+ gst_iterator_resync (iter);
} while (res == GST_ITERATOR_RESYNC);
-
gst_iterator_free (iter);
if (res == GST_ITERATOR_ERROR)
return GST_STATE_CHANGE_FAILURE;
- GST_OBJECT_LOCK (funnel);
- funnel->has_segment = FALSE;
- GST_OBJECT_UNLOCK (funnel);
}
break;
default:
break;
}
- ret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
-
return ret;
}