*
* You should have received a copy of the GNU Library General Public
* License along with this library; if not, write to the
- * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
- * Boston, MA 02111-1307, USA.
+ * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
+ * Boston, MA 02110-1301, USA.
*/
/**
- * SECTION:element-gstrtpssrcdemux
+ * SECTION:element-rtpssrcdemux
+ * @title: rtpssrcdemux
*
- * gstrtpssrcdemux acts as a demuxer for RTP packets based on the SSRC of the
+ * rtpssrcdemux acts as a demuxer for RTP packets based on the SSRC of the
* packets. Its main purpose is to allow an application to easily receive and
* decode an RTP stream with multiple SSRCs.
- *
+ *
* For each SSRC that is detected, a new pad will be created and the
- * #GstRtpSsrcDemux::new-ssrc-pad signal will be emitted.
- *
- * <refsect2>
- * <title>Example pipelines</title>
+ * #GstRtpSsrcDemux::new-ssrc-pad signal will be emitted.
+ *
+ * ## Example pipelines
* |[
- * gst-launch udpsrc caps="application/x-rtp" ! gstrtpssrcdemux ! fakesink
+ * gst-launch-1.0 udpsrc caps="application/x-rtp" ! rtpssrcdemux ! fakesink
* ]| Takes an RTP stream and send the RTP packets with the first detected SSRC
* to fakesink, discarding the other SSRCs.
- * </refsect2>
*
- * Last reviewed on 2007-05-28 (0.10.5)
*/
#ifdef HAVE_CONFIG_H
#include <gst/rtp/gstrtpbuffer.h>
#include <gst/rtp/gstrtcpbuffer.h>
-#include "gstrtpbin-marshal.h"
#include "gstrtpssrcdemux.h"
GST_DEBUG_CATEGORY_STATIC (gst_rtp_ssrc_demux_debug);
);
static GstStaticPadTemplate rtp_ssrc_demux_src_template =
-GST_STATIC_PAD_TEMPLATE ("src_%d",
+GST_STATIC_PAD_TEMPLATE ("src_%u",
GST_PAD_SRC,
GST_PAD_SOMETIMES,
GST_STATIC_CAPS ("application/x-rtp")
);
static GstStaticPadTemplate rtp_ssrc_demux_rtcp_src_template =
-GST_STATIC_PAD_TEMPLATE ("rtcp_src_%d",
+GST_STATIC_PAD_TEMPLATE ("rtcp_src_%u",
GST_PAD_SRC,
GST_PAD_SOMETIMES,
GST_STATIC_CAPS ("application/x-rtcp")
);
-#define GST_PAD_LOCK(obj) (g_static_rec_mutex_lock (&(obj)->padlock))
-#define GST_PAD_UNLOCK(obj) (g_static_rec_mutex_unlock (&(obj)->padlock))
+#define INTERNAL_STREAM_LOCK(obj) (g_rec_mutex_lock (&(obj)->padlock))
+#define INTERNAL_STREAM_UNLOCK(obj) (g_rec_mutex_unlock (&(obj)->padlock))
+
+typedef enum
+{
+ RTP_PAD,
+ RTCP_PAD
+} PadType;
+
+#define DEFAULT_MAX_STREAMS G_MAXUINT
+enum
+{
+ PROP_0,
+ PROP_MAX_STREAMS
+};
/* signals */
enum
guint32 ssrc);
/* sinkpad stuff */
-static GstFlowReturn gst_rtp_ssrc_demux_chain (GstPad * pad, GstBuffer * buf);
-static gboolean gst_rtp_ssrc_demux_sink_event (GstPad * pad, GstEvent * event);
-
-static GstFlowReturn gst_rtp_ssrc_demux_rtcp_chain (GstPad * pad,
+static GstFlowReturn gst_rtp_ssrc_demux_chain (GstPad * pad, GstObject * parent,
GstBuffer * buf);
-static gboolean gst_rtp_ssrc_demux_rtcp_sink_event (GstPad * pad,
+static gboolean gst_rtp_ssrc_demux_sink_event (GstPad * pad, GstObject * parent,
GstEvent * event);
+
+static GstFlowReturn gst_rtp_ssrc_demux_rtcp_chain (GstPad * pad,
+ GstObject * parent, GstBuffer * buf);
static GstIterator *gst_rtp_ssrc_demux_iterate_internal_links_sink (GstPad *
- pad);
+ pad, GstObject * parent);
/* srcpad stuff */
-static gboolean gst_rtp_ssrc_demux_src_event (GstPad * pad, GstEvent * event);
-static GstIterator *gst_rtp_ssrc_demux_iterate_internal_links_src (GstPad *
- pad);
-static gboolean gst_rtp_ssrc_demux_src_query (GstPad * pad, GstQuery * query);
+static gboolean gst_rtp_ssrc_demux_src_event (GstPad * pad, GstObject * parent,
+ GstEvent * event);
+static GstIterator *gst_rtp_ssrc_demux_iterate_internal_links_src (GstPad * pad,
+ GstObject * parent);
+static gboolean gst_rtp_ssrc_demux_src_query (GstPad * pad, GstObject * parent,
+ GstQuery * query);
static guint gst_rtp_ssrc_demux_signals[LAST_SIGNAL] = { 0 };
};
/* find a src pad for a given SSRC, returns NULL if the SSRC was not found
+ * MUST be called with object lock
*/
static GstRtpSsrcDemuxPad *
find_demux_pad_for_ssrc (GstRtpSsrcDemux * demux, guint32 ssrc)
return NULL;
}
-/* with PAD_LOCK */
-static GstRtpSsrcDemuxPad *
-find_or_create_demux_pad_for_ssrc (GstRtpSsrcDemux * demux, guint32 ssrc)
+/* returns a reference to the pad if found, %NULL otherwise */
+static GstPad *
+get_demux_pad_for_ssrc (GstRtpSsrcDemux * demux, guint32 ssrc, PadType padtype)
+{
+ GstRtpSsrcDemuxPad *demuxpad;
+ GstPad *retpad;
+
+ GST_OBJECT_LOCK (demux);
+
+ demuxpad = find_demux_pad_for_ssrc (demux, ssrc);
+ if (!demuxpad) {
+ GST_OBJECT_UNLOCK (demux);
+ return NULL;
+ }
+
+ switch (padtype) {
+ case RTP_PAD:
+ retpad = gst_object_ref (demuxpad->rtp_pad);
+ break;
+ case RTCP_PAD:
+ retpad = gst_object_ref (demuxpad->rtcp_pad);
+ break;
+ default:
+ retpad = NULL;
+ g_assert_not_reached ();
+ }
+
+ GST_OBJECT_UNLOCK (demux);
+
+ return retpad;
+}
+
+static GstEvent *
+add_ssrc_and_ref (GstEvent * event, guint32 ssrc)
+{
+ /* Set the ssrc on the output caps */
+ switch (GST_EVENT_TYPE (event)) {
+ case GST_EVENT_CAPS:
+ {
+ GstCaps *caps;
+ GstCaps *newcaps;
+ GstStructure *s;
+
+ gst_event_parse_caps (event, &caps);
+ newcaps = gst_caps_copy (caps);
+
+ s = gst_caps_get_structure (newcaps, 0);
+ gst_structure_set (s, "ssrc", G_TYPE_UINT, ssrc, NULL);
+ event = gst_event_new_caps (newcaps);
+ gst_caps_unref (newcaps);
+ break;
+ }
+ default:
+ gst_event_ref (event);
+ break;
+ }
+
+ return event;
+}
+
+struct ForwardStickyEventData
+{
+ GstPad *pad;
+ guint32 ssrc;
+};
+
+/* With internal stream lock held */
+static gboolean
+forward_sticky_events (GstPad * pad, GstEvent ** event, gpointer user_data)
+{
+ struct ForwardStickyEventData *data = user_data;
+ GstEvent *newevent;
+
+ newevent = add_ssrc_and_ref (*event, data->ssrc);
+
+ gst_pad_push_event (data->pad, newevent);
+
+ return TRUE;
+}
+
+/* With internal stream lock held */
+static void
+forward_initial_events (GstRtpSsrcDemux * demux, guint32 ssrc, GstPad * pad,
+ PadType padtype)
+{
+ struct ForwardStickyEventData fdata;
+ GstPad *sinkpad = NULL;
+
+ if (padtype == RTP_PAD)
+ sinkpad = demux->rtp_sink;
+ else if (padtype == RTCP_PAD)
+ sinkpad = demux->rtcp_sink;
+ else
+ g_assert_not_reached ();
+
+ fdata.ssrc = ssrc;
+ fdata.pad = pad;
+
+ gst_pad_sticky_events_foreach (sinkpad, forward_sticky_events, &fdata);
+}
+
+/* MUST only be called from streaming thread */
+static GstPad *
+find_or_create_demux_pad_for_ssrc (GstRtpSsrcDemux * demux, guint32 ssrc,
+ PadType padtype)
{
GstPad *rtp_pad, *rtcp_pad;
GstElementClass *klass;
GstPadTemplate *templ;
gchar *padname;
GstRtpSsrcDemuxPad *demuxpad;
- GstCaps *caps;
+ GstPad *retpad;
+ guint num_streams;
- GST_DEBUG_OBJECT (demux, "creating pad for SSRC %08x", ssrc);
+ INTERNAL_STREAM_LOCK (demux);
- demuxpad = find_demux_pad_for_ssrc (demux, ssrc);
- if (demuxpad != NULL) {
- return demuxpad;
+ retpad = get_demux_pad_for_ssrc (demux, ssrc, padtype);
+ if (retpad != NULL) {
+ INTERNAL_STREAM_UNLOCK (demux);
+ return retpad;
+ }
+ /* We create 2 src pads per ssrc (RTP & RTCP). Checking if we are allowed
+ to create 2 more pads */
+ num_streams = (GST_ELEMENT_CAST (demux)->numsrcpads) >> 1;
+ if (num_streams >= demux->max_streams) {
+ INTERNAL_STREAM_UNLOCK (demux);
+ return NULL;
}
+ GST_DEBUG_OBJECT (demux, "creating new pad for SSRC %08x", ssrc);
+
klass = GST_ELEMENT_GET_CLASS (demux);
- templ = gst_element_class_get_pad_template (klass, "src_%d");
- padname = g_strdup_printf ("src_%d", ssrc);
+ templ = gst_element_class_get_pad_template (klass, "src_%u");
+ padname = g_strdup_printf ("src_%u", ssrc);
rtp_pad = gst_pad_new_from_template (templ, padname);
g_free (padname);
- templ = gst_element_class_get_pad_template (klass, "rtcp_src_%d");
- padname = g_strdup_printf ("rtcp_src_%d", ssrc);
+ templ = gst_element_class_get_pad_template (klass, "rtcp_src_%u");
+ padname = g_strdup_printf ("rtcp_src_%u", ssrc);
rtcp_pad = gst_pad_new_from_template (templ, padname);
g_free (padname);
gst_pad_set_element_private (rtp_pad, demuxpad);
gst_pad_set_element_private (rtcp_pad, demuxpad);
+ GST_OBJECT_LOCK (demux);
demux->srcpads = g_slist_prepend (demux->srcpads, demuxpad);
+ GST_OBJECT_UNLOCK (demux);
- /* copy caps from input */
- caps = gst_pad_get_current_caps (demux->rtp_sink);
- gst_pad_set_caps (rtp_pad, caps);
- gst_caps_unref (caps);
- gst_pad_use_fixed_caps (rtp_pad);
- caps = gst_pad_get_current_caps (demux->rtcp_sink);
- gst_pad_set_caps (rtcp_pad, caps);
- gst_caps_unref (caps);
- gst_pad_use_fixed_caps (rtcp_pad);
-
- gst_pad_set_event_function (rtp_pad, gst_rtp_ssrc_demux_src_event);
gst_pad_set_query_function (rtp_pad, gst_rtp_ssrc_demux_src_query);
gst_pad_set_iterate_internal_links_function (rtp_pad,
gst_rtp_ssrc_demux_iterate_internal_links_src);
+ gst_pad_set_event_function (rtp_pad, gst_rtp_ssrc_demux_src_event);
+ gst_pad_use_fixed_caps (rtp_pad);
gst_pad_set_active (rtp_pad, TRUE);
gst_pad_set_event_function (rtcp_pad, gst_rtp_ssrc_demux_src_event);
gst_pad_set_iterate_internal_links_function (rtcp_pad,
gst_rtp_ssrc_demux_iterate_internal_links_src);
+ gst_pad_use_fixed_caps (rtcp_pad);
gst_pad_set_active (rtcp_pad, TRUE);
+ forward_initial_events (demux, ssrc, rtp_pad, RTP_PAD);
+ forward_initial_events (demux, ssrc, rtcp_pad, RTCP_PAD);
+
gst_element_add_pad (GST_ELEMENT_CAST (demux), rtp_pad);
gst_element_add_pad (GST_ELEMENT_CAST (demux), rtcp_pad);
+ switch (padtype) {
+ case RTP_PAD:
+ retpad = gst_object_ref (demuxpad->rtp_pad);
+ break;
+ case RTCP_PAD:
+ retpad = gst_object_ref (demuxpad->rtcp_pad);
+ break;
+ default:
+ retpad = NULL;
+ g_assert_not_reached ();
+ }
+
g_signal_emit (G_OBJECT (demux),
gst_rtp_ssrc_demux_signals[SIGNAL_NEW_SSRC_PAD], 0, ssrc, rtp_pad);
- return demuxpad;
+ INTERNAL_STREAM_UNLOCK (demux);
+
+ return retpad;
+}
+
+static void
+gst_rtp_ssrc_demux_set_property (GObject * object, guint prop_id,
+ const GValue * value, GParamSpec * pspec)
+{
+ GstRtpSsrcDemux *demux;
+
+ demux = GST_RTP_SSRC_DEMUX (object);
+ switch (prop_id) {
+ case PROP_MAX_STREAMS:
+ demux->max_streams = g_value_get_uint (value);
+ break;
+ default:
+ G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
+ break;
+ }
+}
+
+static void
+gst_rtp_ssrc_demux_get_property (GObject * object, guint prop_id,
+ GValue * value, GParamSpec * pspec)
+{
+ GstRtpSsrcDemux *demux;
+
+ demux = GST_RTP_SSRC_DEMUX (object);
+ switch (prop_id) {
+ case PROP_MAX_STREAMS:
+ g_value_set_uint (value, demux->max_streams);
+ break;
+ default:
+ G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
+ break;
+ }
}
static void
gobject_klass->dispose = gst_rtp_ssrc_demux_dispose;
gobject_klass->finalize = gst_rtp_ssrc_demux_finalize;
+ gobject_klass->set_property = gst_rtp_ssrc_demux_set_property;
+ gobject_klass->get_property = gst_rtp_ssrc_demux_get_property;
+
+ g_object_class_install_property (gobject_klass, PROP_MAX_STREAMS,
+ g_param_spec_uint ("max-streams", "Max Streams",
+ "The maximum number of streams allowed",
+ 0, G_MAXUINT, DEFAULT_MAX_STREAMS,
+ G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
/**
* GstRtpSsrcDemux::new-ssrc-pad:
* @ssrc: the SSRC of the pad
* @pad: the new pad.
*
- * Emited when a new SSRC pad has been created.
+ * Emitted when a new SSRC pad has been created.
*/
gst_rtp_ssrc_demux_signals[SIGNAL_NEW_SSRC_PAD] =
g_signal_new ("new-ssrc-pad",
G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_LAST,
G_STRUCT_OFFSET (GstRtpSsrcDemuxClass, new_ssrc_pad),
- NULL, NULL, gst_rtp_bin_marshal_VOID__UINT_OBJECT,
- G_TYPE_NONE, 2, G_TYPE_UINT, GST_TYPE_PAD);
+ NULL, NULL, NULL, G_TYPE_NONE, 2, G_TYPE_UINT, GST_TYPE_PAD);
/**
* GstRtpSsrcDemux::removed-ssrc-pad:
* @ssrc: the SSRC of the pad
* @pad: the removed pad.
*
- * Emited when a SSRC pad has been removed.
+ * Emitted when a SSRC pad has been removed.
*/
gst_rtp_ssrc_demux_signals[SIGNAL_REMOVED_SSRC_PAD] =
g_signal_new ("removed-ssrc-pad",
G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_LAST,
G_STRUCT_OFFSET (GstRtpSsrcDemuxClass, removed_ssrc_pad),
- NULL, NULL, gst_rtp_bin_marshal_VOID__UINT_OBJECT,
- G_TYPE_NONE, 2, G_TYPE_UINT, GST_TYPE_PAD);
+ NULL, NULL, NULL, G_TYPE_NONE, 2, G_TYPE_UINT, GST_TYPE_PAD);
/**
* GstRtpSsrcDemux::clear-ssrc:
g_signal_new ("clear-ssrc",
G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION,
G_STRUCT_OFFSET (GstRtpSsrcDemuxClass, clear_ssrc),
- NULL, NULL, gst_rtp_bin_marshal_VOID__UINT, G_TYPE_NONE, 1, G_TYPE_UINT);
+ NULL, NULL, NULL, G_TYPE_NONE, 1, G_TYPE_UINT);
gstelement_klass->change_state =
GST_DEBUG_FUNCPTR (gst_rtp_ssrc_demux_change_state);
gstrtpssrcdemux_klass->clear_ssrc =
GST_DEBUG_FUNCPTR (gst_rtp_ssrc_demux_clear_ssrc);
- gst_element_class_add_pad_template (gstelement_klass,
- gst_static_pad_template_get (&rtp_ssrc_demux_sink_template));
- gst_element_class_add_pad_template (gstelement_klass,
- gst_static_pad_template_get (&rtp_ssrc_demux_rtcp_sink_template));
- gst_element_class_add_pad_template (gstelement_klass,
- gst_static_pad_template_get (&rtp_ssrc_demux_src_template));
- gst_element_class_add_pad_template (gstelement_klass,
- gst_static_pad_template_get (&rtp_ssrc_demux_rtcp_src_template));
+ gst_element_class_add_static_pad_template (gstelement_klass,
+ &rtp_ssrc_demux_sink_template);
+ gst_element_class_add_static_pad_template (gstelement_klass,
+ &rtp_ssrc_demux_rtcp_sink_template);
+ gst_element_class_add_static_pad_template (gstelement_klass,
+ &rtp_ssrc_demux_src_template);
+ gst_element_class_add_static_pad_template (gstelement_klass,
+ &rtp_ssrc_demux_rtcp_src_template);
- gst_element_class_set_details_simple (gstelement_klass, "RTP SSRC Demux",
+ gst_element_class_set_static_metadata (gstelement_klass, "RTP SSRC Demux",
"Demux/Network/RTP",
"Splits RTP streams based on the SSRC",
"Wim Taymans <wim.taymans@gmail.com>");
GST_DEBUG_CATEGORY_INIT (gst_rtp_ssrc_demux_debug,
"rtpssrcdemux", 0, "RTP SSRC demuxer");
+
+ GST_DEBUG_REGISTER_FUNCPTR (gst_rtp_ssrc_demux_chain);
+ GST_DEBUG_REGISTER_FUNCPTR (gst_rtp_ssrc_demux_rtcp_chain);
}
static void
gst_pad_new_from_template (gst_element_class_get_pad_template (klass,
"rtcp_sink"), "rtcp_sink");
gst_pad_set_chain_function (demux->rtcp_sink, gst_rtp_ssrc_demux_rtcp_chain);
- gst_pad_set_event_function (demux->rtcp_sink,
- gst_rtp_ssrc_demux_rtcp_sink_event);
+ gst_pad_set_event_function (demux->rtcp_sink, gst_rtp_ssrc_demux_sink_event);
gst_pad_set_iterate_internal_links_function (demux->rtcp_sink,
gst_rtp_ssrc_demux_iterate_internal_links_sink);
gst_element_add_pad (GST_ELEMENT_CAST (demux), demux->rtcp_sink);
- g_static_rec_mutex_init (&demux->padlock);
+ demux->max_streams = DEFAULT_MAX_STREAMS;
- gst_segment_init (&demux->segment, GST_FORMAT_UNDEFINED);
+ g_rec_mutex_init (&demux->padlock);
}
static void
GstRtpSsrcDemux *demux;
demux = GST_RTP_SSRC_DEMUX (object);
- g_static_rec_mutex_free (&demux->padlock);
+ g_rec_mutex_clear (&demux->padlock);
G_OBJECT_CLASS (parent_class)->finalize (object);
}
{
GstRtpSsrcDemuxPad *dpad;
- GST_PAD_LOCK (demux);
+ GST_OBJECT_LOCK (demux);
dpad = find_demux_pad_for_ssrc (demux, ssrc);
if (dpad == NULL) {
- GST_PAD_UNLOCK (demux);
+ GST_OBJECT_UNLOCK (demux);
goto unknown_pad;
}
GST_DEBUG_OBJECT (demux, "clearing pad for SSRC %08x", ssrc);
demux->srcpads = g_slist_remove (demux->srcpads, dpad);
- GST_PAD_UNLOCK (demux);
+ GST_OBJECT_UNLOCK (demux);
gst_pad_set_active (dpad->rtp_pad, FALSE);
gst_pad_set_active (dpad->rtcp_pad, FALSE);
}
}
-static gboolean
-gst_rtp_ssrc_demux_sink_event (GstPad * pad, GstEvent * event)
+struct ForwardEventData
{
GstRtpSsrcDemux *demux;
- gboolean res = FALSE;
+ GstEvent *event;
+ gboolean res;
+ GstPad *pad;
+};
- demux = GST_RTP_SSRC_DEMUX (gst_pad_get_parent (pad));
- if (G_UNLIKELY (demux == NULL)) {
- gst_event_unref (event);
- return FALSE;
- }
+static gboolean
+forward_event (GstPad * pad, gpointer user_data)
+{
+ struct ForwardEventData *fdata = user_data;
+ GSList *walk = NULL;
+ GstEvent *newevent = NULL;
- switch (GST_EVENT_TYPE (event)) {
- case GST_EVENT_FLUSH_STOP:
- gst_segment_init (&demux->segment, GST_FORMAT_UNDEFINED);
- default:
- {
- GSList *walk;
- GSList *pads = NULL;
-
- res = TRUE;
- /* need local snapshot of pads;
- * should not push downstream while holding lock as that might deadlock
- * with stuff traveling upstream tyring to get this lock while holding
- * other (stream)lock */
- GST_PAD_LOCK (demux);
- for (walk = demux->srcpads; walk; walk = g_slist_next (walk)) {
- GstRtpSsrcDemuxPad *pad = (GstRtpSsrcDemuxPad *) walk->data;
-
- pads = g_slist_prepend (pads, gst_object_ref (pad->rtp_pad));
- }
- GST_PAD_UNLOCK (demux);
- for (walk = pads; walk; walk = g_slist_next (walk)) {
- GstPad *pad = (GstPad *) walk->data;
+ GST_OBJECT_LOCK (fdata->demux);
+ for (walk = fdata->demux->srcpads; walk; walk = walk->next) {
+ GstRtpSsrcDemuxPad *dpad = (GstRtpSsrcDemuxPad *) walk->data;
- gst_event_ref (event);
- res &= gst_pad_push_event (pad, event);
- gst_object_unref (pad);
- }
- g_slist_free (pads);
- gst_event_unref (event);
+ if (pad == dpad->rtp_pad || pad == dpad->rtcp_pad) {
+ newevent = add_ssrc_and_ref (fdata->event, dpad->ssrc);
break;
}
}
+ GST_OBJECT_UNLOCK (fdata->demux);
- gst_object_unref (demux);
- return res;
+ if (newevent)
+ fdata->res &= gst_pad_push_event (pad, newevent);
+
+ return FALSE;
}
+
static gboolean
-gst_rtp_ssrc_demux_rtcp_sink_event (GstPad * pad, GstEvent * event)
+gst_rtp_ssrc_demux_sink_event (GstPad * pad, GstObject * parent,
+ GstEvent * event)
{
GstRtpSsrcDemux *demux;
- gboolean res = FALSE;
+ struct ForwardEventData fdata;
- demux = GST_RTP_SSRC_DEMUX (gst_pad_get_parent (pad));
+ demux = GST_RTP_SSRC_DEMUX (parent);
- switch (GST_EVENT_TYPE (event)) {
- default:
- {
- GSList *walk;
- GSList *pads = NULL;
+ fdata.demux = demux;
+ fdata.pad = pad;
+ fdata.event = event;
+ fdata.res = TRUE;
- res = TRUE;
- GST_PAD_LOCK (demux);
- for (walk = demux->srcpads; walk; walk = g_slist_next (walk)) {
- GstRtpSsrcDemuxPad *pad = (GstRtpSsrcDemuxPad *) walk->data;
+ gst_pad_forward (pad, forward_event, &fdata);
- pads = g_slist_prepend (pads, gst_object_ref (pad->rtcp_pad));
- }
- GST_PAD_UNLOCK (demux);
- for (walk = pads; walk; walk = g_slist_next (walk)) {
- GstPad *pad = (GstPad *) walk->data;
+ gst_event_unref (event);
- gst_event_ref (event);
- res &= gst_pad_push_event (pad, event);
- gst_object_unref (pad);
- }
- g_slist_free (pads);
- gst_event_unref (event);
- break;
- }
- }
- gst_object_unref (demux);
- return res;
+ return fdata.res;
}
static GstFlowReturn
-gst_rtp_ssrc_demux_chain (GstPad * pad, GstBuffer * buf)
+gst_rtp_ssrc_demux_chain (GstPad * pad, GstObject * parent, GstBuffer * buf)
{
GstFlowReturn ret;
GstRtpSsrcDemux *demux;
guint32 ssrc;
- GstRtpSsrcDemuxPad *dpad;
- GstRTPBuffer rtp;
+ GstRTPBuffer rtp = { NULL };
GstPad *srcpad;
- demux = GST_RTP_SSRC_DEMUX (GST_OBJECT_PARENT (pad));
+ demux = GST_RTP_SSRC_DEMUX (parent);
- if (!gst_rtp_buffer_validate (buf))
+ if (!gst_rtp_buffer_map (buf, GST_MAP_READ, &rtp))
goto invalid_payload;
- gst_rtp_buffer_map (buf, GST_MAP_READ, &rtp);
ssrc = gst_rtp_buffer_get_ssrc (&rtp);
gst_rtp_buffer_unmap (&rtp);
GST_DEBUG_OBJECT (demux, "received buffer of SSRC %08x", ssrc);
- GST_PAD_LOCK (demux);
- dpad = find_or_create_demux_pad_for_ssrc (demux, ssrc);
- if (dpad == NULL) {
- GST_PAD_UNLOCK (demux);
+ srcpad = find_or_create_demux_pad_for_ssrc (demux, ssrc, RTP_PAD);
+ if (srcpad == NULL)
goto create_failed;
- }
- srcpad = gst_object_ref (dpad->rtp_pad);
- GST_PAD_UNLOCK (demux);
/* push to srcpad */
ret = gst_pad_push (srcpad, buf);
+ if (ret != GST_FLOW_OK) {
+ GstPad *active_pad;
+
+ /* check if the ssrc still there, may have been removed */
+ active_pad = get_demux_pad_for_ssrc (demux, ssrc, RTP_PAD);
+
+ if (active_pad == NULL || active_pad != srcpad) {
+ /* SSRC was removed during the push ... ignore the error */
+ ret = GST_FLOW_OK;
+ }
+
+ g_clear_object (&active_pad);
+ }
+
gst_object_unref (srcpad);
return ret;
/* ERRORS */
invalid_payload:
{
- /* this is fatal and should be filtered earlier */
- GST_ELEMENT_ERROR (demux, STREAM, DECODE, (NULL),
- ("Dropping invalid RTP payload"));
+ GST_DEBUG_OBJECT (demux, "Dropping invalid RTP packet");
gst_buffer_unref (buf);
- return GST_FLOW_ERROR;
+ return GST_FLOW_OK;
}
create_failed:
{
- GST_ELEMENT_ERROR (demux, STREAM, DECODE, (NULL),
- ("Could not create new pad"));
gst_buffer_unref (buf);
- return GST_FLOW_ERROR;
+ GST_WARNING_OBJECT (demux,
+ "Dropping buffer SSRC %08x. "
+ "Max streams number reached (%u)", ssrc, demux->max_streams);
+ return GST_FLOW_OK;
}
}
static GstFlowReturn
-gst_rtp_ssrc_demux_rtcp_chain (GstPad * pad, GstBuffer * buf)
+gst_rtp_ssrc_demux_rtcp_chain (GstPad * pad, GstObject * parent,
+ GstBuffer * buf)
{
GstFlowReturn ret;
GstRtpSsrcDemux *demux;
guint32 ssrc;
- GstRtpSsrcDemuxPad *dpad;
GstRTCPPacket packet;
- GstRTCPBuffer rtcp;
+ GstRTCPBuffer rtcp = { NULL, };
GstPad *srcpad;
- demux = GST_RTP_SSRC_DEMUX (GST_OBJECT_PARENT (pad));
+ demux = GST_RTP_SSRC_DEMUX (parent);
- if (!gst_rtcp_buffer_validate (buf))
+ if (!gst_rtcp_buffer_validate_reduced (buf))
goto invalid_rtcp;
gst_rtcp_buffer_map (buf, GST_MAP_READ, &rtcp);
goto invalid_rtcp;
}
- /* first packet must be SR or RR or else the validate would have failed */
+ /* first packet must be SR or RR, or in case of a reduced size RTCP packet
+ * it must be APP, RTPFB or PSFB feeadback, or else the validate would
+ * have failed */
switch (gst_rtcp_packet_get_type (&packet)) {
case GST_RTCP_TYPE_SR:
/* get the ssrc so that we can route it to the right source pad */
gst_rtcp_packet_sr_get_sender_info (&packet, &ssrc, NULL, NULL, NULL,
NULL);
break;
+ case GST_RTCP_TYPE_RR:
+ ssrc = gst_rtcp_packet_rr_get_ssrc (&packet);
+ break;
+ case GST_RTCP_TYPE_APP:
+ ssrc = gst_rtcp_packet_app_get_ssrc (&packet);
+ break;
+ case GST_RTCP_TYPE_RTPFB:
+ case GST_RTCP_TYPE_PSFB:
+ ssrc = gst_rtcp_packet_fb_get_sender_ssrc (&packet);
+ break;
default:
goto unexpected_rtcp;
}
GST_DEBUG_OBJECT (demux, "received RTCP of SSRC %08x", ssrc);
- GST_PAD_LOCK (demux);
- dpad = find_or_create_demux_pad_for_ssrc (demux, ssrc);
- if (dpad == NULL) {
- GST_PAD_UNLOCK (demux);
+ srcpad = find_or_create_demux_pad_for_ssrc (demux, ssrc, RTCP_PAD);
+ if (srcpad == NULL)
goto create_failed;
- }
- srcpad = gst_object_ref (dpad->rtcp_pad);
- GST_PAD_UNLOCK (demux);
/* push to srcpad */
ret = gst_pad_push (srcpad, buf);
+ if (ret != GST_FLOW_OK) {
+ GstPad *active_pad;
+
+ /* check if the ssrc still there, may have been removed */
+ active_pad = get_demux_pad_for_ssrc (demux, ssrc, RTCP_PAD);
+ if (active_pad == NULL || active_pad != srcpad) {
+ /* SSRC was removed during the push ... ignore the error */
+ ret = GST_FLOW_OK;
+ }
+
+ g_clear_object (&active_pad);
+ }
+
gst_object_unref (srcpad);
return ret;
/* ERRORS */
invalid_rtcp:
{
- /* this is fatal and should be filtered earlier */
- GST_ELEMENT_ERROR (demux, STREAM, DECODE, (NULL),
- ("Dropping invalid RTCP packet"));
+ GST_DEBUG_OBJECT (demux, "Dropping invalid RTCP packet");
gst_buffer_unref (buf);
- return GST_FLOW_ERROR;
+ return GST_FLOW_OK;
}
unexpected_rtcp:
{
}
create_failed:
{
- GST_ELEMENT_ERROR (demux, STREAM, DECODE, (NULL),
- ("Could not create new pad"));
gst_buffer_unref (buf);
- return GST_FLOW_ERROR;
+ GST_WARNING_OBJECT (demux,
+ "Dropping buffer SSRC %08x. "
+ "Max streams number reached (%u)", ssrc, demux->max_streams);
+ return GST_FLOW_OK;
}
}
+static GstRtpSsrcDemuxPad *
+find_demux_pad_for_pad (GstRtpSsrcDemux * demux, GstPad * pad)
+{
+ GSList *walk;
+
+ for (walk = demux->srcpads; walk; walk = g_slist_next (walk)) {
+ GstRtpSsrcDemuxPad *dpad = (GstRtpSsrcDemuxPad *) walk->data;
+ if (dpad->rtp_pad == pad || dpad->rtcp_pad == pad) {
+ return dpad;
+ }
+ }
+
+ return NULL;
+}
+
+
static gboolean
-gst_rtp_ssrc_demux_src_event (GstPad * pad, GstEvent * event)
+gst_rtp_ssrc_demux_src_event (GstPad * pad, GstObject * parent,
+ GstEvent * event)
{
GstRtpSsrcDemux *demux;
const GstStructure *s;
- demux = GST_RTP_SSRC_DEMUX (gst_pad_get_parent (pad));
+ demux = GST_RTP_SSRC_DEMUX (parent);
switch (GST_EVENT_TYPE (event)) {
case GST_EVENT_CUSTOM_UPSTREAM:
case GST_EVENT_CUSTOM_BOTH_OOB:
s = gst_event_get_structure (event);
if (s && !gst_structure_has_field (s, "ssrc")) {
- GSList *walk;
-
- for (walk = demux->srcpads; walk; walk = g_slist_next (walk)) {
- GstRtpSsrcDemuxPad *dpad = (GstRtpSsrcDemuxPad *) walk->data;
+ GstRtpSsrcDemuxPad *dpad = find_demux_pad_for_pad (demux, pad);
- if (dpad->rtp_pad == pad || dpad->rtcp_pad == pad) {
- GstStructure *ws;
+ if (dpad) {
+ GstStructure *ws;
- event =
- GST_EVENT_CAST (gst_mini_object_make_writable
- (GST_MINI_OBJECT_CAST (event)));
- ws = gst_event_writable_structure (event);
- gst_structure_set (ws, "ssrc", G_TYPE_UINT, dpad->ssrc, NULL);
- break;
- }
+ event = gst_event_make_writable (event);
+ ws = gst_event_writable_structure (event);
+ gst_structure_set (ws, "ssrc", G_TYPE_UINT, dpad->ssrc, NULL);
}
}
break;
break;
}
- gst_object_unref (demux);
-
- return gst_pad_event_default (pad, event);
+ return gst_pad_event_default (pad, parent, event);
}
static GstIterator *
-gst_rtp_ssrc_demux_iterate_internal_links_src (GstPad * pad)
+gst_rtp_ssrc_demux_iterate_internal_links_src (GstPad * pad, GstObject * parent)
{
GstRtpSsrcDemux *demux;
GstPad *otherpad = NULL;
GstIterator *it = NULL;
GSList *current;
- demux = GST_RTP_SSRC_DEMUX (gst_pad_get_parent (pad));
-
- if (!demux)
- return NULL;
+ demux = GST_RTP_SSRC_DEMUX (parent);
- GST_PAD_LOCK (demux);
+ GST_OBJECT_LOCK (demux);
for (current = demux->srcpads; current; current = g_slist_next (current)) {
GstRtpSsrcDemuxPad *dpad = (GstRtpSsrcDemuxPad *) current->data;
g_value_set_object (&val, otherpad);
it = gst_iterator_new_single (GST_TYPE_PAD, &val);
g_value_unset (&val);
- gst_object_unref (otherpad);
+
}
- GST_PAD_UNLOCK (demux);
+ GST_OBJECT_UNLOCK (demux);
- gst_object_unref (demux);
return it;
}
static gint
src_pad_compare_func (gconstpointer a, gconstpointer b)
{
- GstPad *pad = GST_PAD (a);
- const gchar *prefix = b;
- gint res = 1;
+ GstPad *pad = GST_PAD (g_value_get_object (a));
+ const gchar *prefix = g_value_get_string (b);
+ gint res;
+ /* 0 means equal means we accept the pad, accepted if there is a name
+ * and it starts with the prefix */
GST_OBJECT_LOCK (pad);
- res = !GST_PAD_NAME (pad) || g_str_has_prefix (GST_PAD_NAME (pad), prefix);
+ res = !GST_PAD_NAME (pad) || !g_str_has_prefix (GST_PAD_NAME (pad), prefix);
GST_OBJECT_UNLOCK (pad);
return res;
}
static GstIterator *
-gst_rtp_ssrc_demux_iterate_internal_links_sink (GstPad * pad)
+gst_rtp_ssrc_demux_iterate_internal_links_sink (GstPad * pad,
+ GstObject * parent)
{
GstRtpSsrcDemux *demux;
GstIterator *it = NULL;
GValue gval = { 0, };
- demux = GST_RTP_SSRC_DEMUX (gst_pad_get_parent (pad));
-
- if (!demux)
- return NULL;
+ demux = GST_RTP_SSRC_DEMUX (parent);
g_value_init (&gval, G_TYPE_STRING);
if (pad == demux->rtp_sink)
else
g_assert_not_reached ();
- it = gst_element_iterate_src_pads (GST_ELEMENT (demux));
-
+ it = gst_element_iterate_src_pads (GST_ELEMENT_CAST (demux));
it = gst_iterator_filter (it, src_pad_compare_func, &gval);
- gst_object_unref (demux);
-
return it;
}
static gboolean
-gst_rtp_ssrc_demux_src_query (GstPad * pad, GstQuery * query)
+gst_rtp_ssrc_demux_src_query (GstPad * pad, GstObject * parent,
+ GstQuery * query)
{
GstRtpSsrcDemux *demux;
gboolean res = FALSE;
- demux = GST_RTP_SSRC_DEMUX (gst_pad_get_parent (pad));
- if (G_UNLIKELY (demux == NULL))
- return FALSE;
+ demux = GST_RTP_SSRC_DEMUX (parent);
switch (GST_QUERY_TYPE (query)) {
case GST_QUERY_LATENCY:
break;
}
default:
- res = gst_pad_query_default (pad, query);
+ res = gst_pad_query_default (pad, parent, query);
break;
}
- gst_object_unref (demux);
return res;
}