RTCP_PAD
} PadType;
+#define DEFAULT_MAX_STREAMS G_MAXUINT
+enum
+{
+ PROP_0,
+ PROP_MAX_STREAMS
+};
+
/* signals */
enum
{
gchar *padname;
GstRtpSsrcDemuxPad *demuxpad;
GstPad *retpad;
+ guint num_streams;
INTERNAL_STREAM_LOCK (demux);
INTERNAL_STREAM_UNLOCK (demux);
return retpad;
}
+ /* We create 2 src pads per ssrc (RTP & RTCP). Checking if we are allowed
+ to create 2 more pads */
+ num_streams = (GST_ELEMENT_CAST (demux)->numsrcpads) >> 1;
+ if (num_streams >= demux->max_streams) {
+ INTERNAL_STREAM_UNLOCK (demux);
+ return NULL;
+ }
GST_DEBUG_OBJECT (demux, "creating new pad for SSRC %08x", ssrc);
}
static void
+gst_rtp_ssrc_demux_set_property (GObject * object, guint prop_id,
+ const GValue * value, GParamSpec * pspec)
+{
+ GstRtpSsrcDemux *demux;
+
+ demux = GST_RTP_SSRC_DEMUX (object);
+ switch (prop_id) {
+ case PROP_MAX_STREAMS:
+ demux->max_streams = g_value_get_uint (value);
+ break;
+ default:
+ G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
+ break;
+ }
+}
+
+static void
+gst_rtp_ssrc_demux_get_property (GObject * object, guint prop_id,
+ GValue * value, GParamSpec * pspec)
+{
+ GstRtpSsrcDemux *demux;
+
+ demux = GST_RTP_SSRC_DEMUX (object);
+ switch (prop_id) {
+ case PROP_MAX_STREAMS:
+ g_value_set_uint (value, demux->max_streams);
+ break;
+ default:
+ G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
+ break;
+ }
+}
+
+static void
gst_rtp_ssrc_demux_class_init (GstRtpSsrcDemuxClass * klass)
{
GObjectClass *gobject_klass;
gobject_klass->dispose = gst_rtp_ssrc_demux_dispose;
gobject_klass->finalize = gst_rtp_ssrc_demux_finalize;
+ gobject_klass->set_property = gst_rtp_ssrc_demux_set_property;
+ gobject_klass->get_property = gst_rtp_ssrc_demux_get_property;
+
+ g_object_class_install_property (gobject_klass, PROP_MAX_STREAMS,
+ g_param_spec_uint ("max-streams", "Max Streams",
+ "The maximum number of streams allowed",
+ 0, G_MAXUINT, DEFAULT_MAX_STREAMS,
+ G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
/**
* GstRtpSsrcDemux::new-ssrc-pad:
gst_rtp_ssrc_demux_iterate_internal_links_sink);
gst_element_add_pad (GST_ELEMENT_CAST (demux), demux->rtcp_sink);
+ demux->max_streams = DEFAULT_MAX_STREAMS;
+
g_rec_mutex_init (&demux->padlock);
}
}
create_failed:
{
- GST_ELEMENT_ERROR (demux, STREAM, DECODE, (NULL),
- ("Could not create new pad"));
gst_buffer_unref (buf);
- return GST_FLOW_ERROR;
+ GST_WARNING_OBJECT (demux,
+ "Dropping buffer SSRC %08x. "
+ "Max streams number reached (%u)", ssrc, demux->max_streams);
+ return GST_FLOW_OK;
}
}
}
create_failed:
{
- GST_ELEMENT_ERROR (demux, STREAM, DECODE, (NULL),
- ("Could not create new pad"));
gst_buffer_unref (buf);
- return GST_FLOW_ERROR;
+ GST_WARNING_OBJECT (demux,
+ "Dropping buffer SSRC %08x. "
+ "Max streams number reached (%u)", ssrc, demux->max_streams);
+ return GST_FLOW_OK;
}
}
*
* Copyright (C) 2018 Collabora Ltd.
* Author: Nicolas Dufresne <nicolas.dufresne@collabora.com>
+ * Copyright (C) 2019 Pexip
+ * Author: Havard Graff <havard@pexip.com>
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Library General Public
GST_END_TEST;
+
+static void
+new_ssrc_pad_found (GstElement * element, G_GNUC_UNUSED guint ssrc,
+ GstPad * pad, GSList ** src_h)
+{
+ GstHarness *h = gst_harness_new_with_element (element, NULL, NULL);
+ gst_harness_add_element_src_pad (h, pad);
+ *src_h = g_slist_prepend (*src_h, h);
+}
+
+GST_START_TEST (test_rtpssrcdemux_max_streams)
+{
+ GstHarness *h = gst_harness_new_with_padnames ("rtpssrcdemux", "sink", NULL);
+ GSList *src_h = NULL;
+ gint i;
+
+ g_object_set (h->element, "max-streams", 64, NULL);
+ gst_harness_set_src_caps_str (h, "application/x-rtp");
+ g_signal_connect (h->element,
+ "new-ssrc-pad", (GCallback) new_ssrc_pad_found, &src_h);
+ gst_harness_play (h);
+
+ for (i = 0; i < 128; ++i) {
+ fail_unless_equals_int (GST_FLOW_OK,
+ gst_harness_push (h, create_buffer (0, i)));
+ }
+
+ fail_unless_equals_int (g_slist_length (src_h), 64);
+ g_slist_free_full (src_h, (GDestroyNotify) gst_harness_teardown);
+ gst_harness_teardown (h);
+}
+
+GST_END_TEST;
+
static Suite *
rtpssrcdemux_suite (void)
{
suite_add_tcase (s, tc_chain);
tcase_add_test (tc_chain, test_event_forwarding);
tcase_add_test (tc_chain, test_oob_event_locking);
+ tcase_add_test (tc_chain, test_rtpssrcdemux_max_streams);
return s;
}