From 53a45b12224ef9e4017199f6f0dacbd83207016d Mon Sep 17 00:00:00 2001 From: Havard Graff Date: Wed, 18 Oct 2017 11:14:36 +0200 Subject: [PATCH] Initial commit of GstRtpFunnel For funneling together rtp-streams into a single session. Use-cases include multiplexing and bundle. --- gst/rtpmanager/Makefile.am | 7 +- gst/rtpmanager/gstrtpfunnel.c | 468 +++++++++++++++++++++++++++++++++++++++ gst/rtpmanager/gstrtpfunnel.h | 40 ++++ gst/rtpmanager/gstrtpmanager.c | 5 + gst/rtpmanager/meson.build | 1 + tests/check/Makefile.am | 6 +- tests/check/elements/rtpfunnel.c | 193 ++++++++++++++++ tests/check/meson.build | 1 + 8 files changed, 718 insertions(+), 3 deletions(-) create mode 100644 gst/rtpmanager/gstrtpfunnel.c create mode 100644 gst/rtpmanager/gstrtpfunnel.h create mode 100644 tests/check/elements/rtpfunnel.c diff --git a/gst/rtpmanager/Makefile.am b/gst/rtpmanager/Makefile.am index 598e61f..0defa28 100644 --- a/gst/rtpmanager/Makefile.am +++ b/gst/rtpmanager/Makefile.am @@ -14,7 +14,8 @@ libgstrtpmanager_la_SOURCES = gstrtpmanager.c \ rtpsession.c \ rtpsource.c \ rtpstats.c \ - gstrtpsession.c + gstrtpsession.c \ + gstrtpfunnel.c noinst_HEADERS = gstrtpbin.h \ gstrtpdtmfmux.h \ @@ -29,7 +30,9 @@ noinst_HEADERS = gstrtpbin.h \ rtpsession.h \ rtpsource.h \ rtpstats.h \ - gstrtpsession.h + gstrtpsession.h \ + gstrtpfunnel.h + libgstrtpmanager_la_CFLAGS = $(GST_PLUGINS_BASE_CFLAGS) $(GST_CFLAGS) \ $(GST_NET_CFLAGS) $(WARNING_CFLAGS) $(ERROR_CFLAGS) diff --git a/gst/rtpmanager/gstrtpfunnel.c b/gst/rtpmanager/gstrtpfunnel.c new file mode 100644 index 0000000..13c753a --- /dev/null +++ b/gst/rtpmanager/gstrtpfunnel.c @@ -0,0 +1,468 @@ +/* RTP funnel element for GStreamer + * + * gstrtpfunnel.c: + * + * Copyright (C) <2017> Pexip. + * Contact: Havard Graff + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Library General Public + * License as published by the Free Software Foundation; either + * version 2 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Library General Public License for more details. + * + * You should have received a copy of the GNU Library General Public + * License along with this library; if not, write to the + * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor, + * Boston, MA 02110-1301, USA. + */ +#ifdef HAVE_CONFIG_H +#include "config.h" +#endif + +#include "gstrtpfunnel.h" + +GST_DEBUG_CATEGORY_STATIC (gst_rtp_funnel_debug); +#define GST_CAT_DEFAULT gst_rtp_funnel_debug + +struct _GstRtpFunnelPad +{ + GstPad pad; + guint32 ssrc; +}; + +enum +{ + PROP_0, + PROP_COMMON_TS_OFFSET, +}; + +#define DEFAULT_COMMON_TS_OFFSET -1 + +G_DEFINE_TYPE (GstRtpFunnelPad, gst_rtp_funnel_pad, GST_TYPE_PAD); + +static void +gst_rtp_funnel_pad_class_init (GstRtpFunnelPadClass * klass) +{ + (void) klass; +} + +static void +gst_rtp_funnel_pad_init (GstRtpFunnelPad * pad) +{ + (void) pad; +} + +struct _GstRtpFunnel +{ + GstElement element; + + GstPad *srcpad; + GstCaps *srccaps; + gboolean send_sticky_events; + GHashTable *ssrc_to_pad; + + /* properties */ + gint common_ts_offset; +}; + +#define RTP_CAPS "application/x-rtp" + +static GstStaticPadTemplate sink_template = GST_STATIC_PAD_TEMPLATE ("sink_%u", + GST_PAD_SINK, + GST_PAD_REQUEST, + GST_STATIC_CAPS (RTP_CAPS)); + +static GstStaticPadTemplate src_template = GST_STATIC_PAD_TEMPLATE ("src", + GST_PAD_SRC, + GST_PAD_ALWAYS, + GST_STATIC_CAPS (RTP_CAPS)); + +#define gst_rtp_funnel_parent_class parent_class +G_DEFINE_TYPE (GstRtpFunnel, gst_rtp_funnel, GST_TYPE_ELEMENT); + + +static gboolean +gst_rtp_funnel_send_sticky (GstRtpFunnel * funnel, GstPad * pad) +{ + GstEvent *stream_start; + GstEvent *caps; + GstEvent *segment; + + if (!funnel->send_sticky_events) + goto done; + + stream_start = gst_pad_get_sticky_event (pad, GST_EVENT_STREAM_START, 0); + if (stream_start && !gst_pad_push_event (funnel->srcpad, stream_start)) { + GST_ERROR_OBJECT (funnel, "Could not push stream start"); + goto done; + } + + caps = gst_event_new_caps (funnel->srccaps); + if (caps && !gst_pad_push_event (funnel->srcpad, caps)) { + GST_ERROR_OBJECT (funnel, "Could not push caps"); + goto done; + } + + segment = gst_pad_get_sticky_event (pad, GST_EVENT_SEGMENT, 0); + if (segment && !gst_pad_push_event (funnel->srcpad, segment)) { + GST_ERROR_OBJECT (funnel, "Could not push segment"); + goto done; + } + + funnel->send_sticky_events = FALSE; + +done: + return !funnel->send_sticky_events; +} + +static GstFlowReturn +gst_rtp_funnel_sink_chain_object (GstPad * pad, GstRtpFunnel * funnel, + gboolean is_list, GstMiniObject * obj) +{ + GstFlowReturn res; + + GST_DEBUG_OBJECT (pad, "received %" GST_PTR_FORMAT, obj); + + GST_PAD_STREAM_LOCK (funnel->srcpad); + + if (!gst_rtp_funnel_send_sticky (funnel, pad)) { + GST_PAD_STREAM_UNLOCK (funnel->srcpad); + gst_mini_object_unref (obj); + return GST_FLOW_OK; + } + + if (is_list) + res = gst_pad_push_list (funnel->srcpad, GST_BUFFER_LIST_CAST (obj)); + else + res = gst_pad_push (funnel->srcpad, GST_BUFFER_CAST (obj)); + + GST_PAD_STREAM_UNLOCK (funnel->srcpad); + + return res; +} + +static GstFlowReturn +gst_rtp_funnel_sink_chain_list (GstPad * pad, GstObject * parent, + GstBufferList * list) +{ + GstRtpFunnel *funnel = GST_RTP_FUNNEL_CAST (parent); + + return gst_rtp_funnel_sink_chain_object (pad, funnel, TRUE, + GST_MINI_OBJECT_CAST (list)); +} + +static GstFlowReturn +gst_rtp_funnel_sink_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer) +{ + GstRtpFunnel *funnel = GST_RTP_FUNNEL_CAST (parent); + + return gst_rtp_funnel_sink_chain_object (pad, funnel, FALSE, + GST_MINI_OBJECT_CAST (buffer)); +} + +static gboolean +gst_rtp_funnel_sink_event (GstPad * pad, GstObject * parent, GstEvent * event) +{ + GstRtpFunnel *funnel = GST_RTP_FUNNEL_CAST (parent); + gboolean forward = TRUE; + gboolean ret = TRUE; + + GST_DEBUG_OBJECT (pad, "received event %" GST_PTR_FORMAT, event); + + switch (GST_EVENT_TYPE (event)) { + case GST_EVENT_STREAM_START: + case GST_EVENT_SEGMENT: + forward = FALSE; + break; + case GST_EVENT_CAPS: + { + GstCaps *caps; + GstStructure *s; + guint ssrc; + gst_event_parse_caps (event, &caps); + + if (!gst_caps_can_intersect (funnel->srccaps, caps)) { + GST_ERROR_OBJECT (funnel, "Can't intersect with caps %" GST_PTR_FORMAT, + caps); + g_assert_not_reached (); + } + + s = gst_caps_get_structure (caps, 0); + if (gst_structure_get_uint (s, "ssrc", &ssrc)) { + GstRtpFunnelPad *fpad = GST_RTP_FUNNEL_PAD_CAST (pad); + fpad->ssrc = ssrc; + GST_DEBUG_OBJECT (pad, "Got ssrc: %u", ssrc); + GST_OBJECT_LOCK (funnel); + g_hash_table_insert (funnel->ssrc_to_pad, GUINT_TO_POINTER (ssrc), pad); + GST_OBJECT_UNLOCK (funnel); + } + + forward = FALSE; + break; + } + default: + break; + } + + if (forward) { + ret = gst_pad_event_default (pad, parent, event); + } else { + gst_event_unref (event); + } + + return ret; +} + +static gboolean +gst_rtp_funnel_sink_query (GstPad * pad, GstObject * parent, GstQuery * query) +{ + GstRtpFunnel *funnel = GST_RTP_FUNNEL_CAST (parent); + gboolean res = FALSE; + (void) funnel; + + switch (GST_QUERY_TYPE (query)) { + case GST_QUERY_CAPS: + { + GstCaps *filter_caps; + GstCaps *new_caps; + + gst_query_parse_caps (query, &filter_caps); + + if (filter_caps) { + new_caps = gst_caps_intersect_full (funnel->srccaps, filter_caps, + GST_CAPS_INTERSECT_FIRST); + } else { + new_caps = gst_caps_copy (funnel->srccaps); + } + + if (funnel->common_ts_offset >= 0) + gst_caps_set_simple (new_caps, "timestamp-offset", G_TYPE_UINT, + (guint) funnel->common_ts_offset, NULL); + + gst_query_set_caps_result (query, new_caps); + GST_DEBUG_OBJECT (pad, "Answering caps-query with caps: %" + GST_PTR_FORMAT, new_caps); + gst_caps_unref (new_caps); + res = TRUE; + break; + } + default: + res = gst_pad_query_default (pad, parent, query); + break; + } + + return res; +} + +static gboolean +gst_rtp_funnel_src_event (GstPad * pad, GstObject * parent, GstEvent * event) +{ + GstRtpFunnel *funnel = GST_RTP_FUNNEL_CAST (parent); + gboolean handled = FALSE; + gboolean ret = TRUE; + + GST_DEBUG_OBJECT (pad, "received event %" GST_PTR_FORMAT, event); + + if (GST_EVENT_TYPE (event) == GST_EVENT_CUSTOM_UPSTREAM) { + const GstStructure *s = gst_event_get_structure (event); + GstPad *fpad; + guint ssrc; + if (s && gst_structure_get_uint (s, "ssrc", &ssrc)) { + handled = TRUE; + + GST_OBJECT_LOCK (funnel); + fpad = g_hash_table_lookup (funnel->ssrc_to_pad, GUINT_TO_POINTER (ssrc)); + if (fpad) + gst_object_ref (fpad); + GST_OBJECT_UNLOCK (funnel); + + if (fpad) { + GST_INFO_OBJECT (pad, "Sending %" GST_PTR_FORMAT " to %" GST_PTR_FORMAT, + event, fpad); + ret = gst_pad_push_event (fpad, event); + gst_object_unref (fpad); + } else { + gst_event_unref (event); + } + } + } + + if (!handled) { + gst_pad_event_default (pad, parent, event); + } + + return ret; +} + +static GstPad * +gst_rtp_funnel_request_new_pad (GstElement * element, GstPadTemplate * templ, + const gchar * name, const GstCaps * caps) +{ + GstPad *sinkpad; + (void) caps; + + GST_DEBUG_OBJECT (element, "requesting pad"); + + sinkpad = GST_PAD_CAST (g_object_new (GST_TYPE_RTP_FUNNEL_PAD, + "name", name, "direction", templ->direction, "template", templ, + NULL)); + + gst_pad_set_chain_function (sinkpad, + GST_DEBUG_FUNCPTR (gst_rtp_funnel_sink_chain)); + gst_pad_set_chain_list_function (sinkpad, + GST_DEBUG_FUNCPTR (gst_rtp_funnel_sink_chain_list)); + gst_pad_set_event_function (sinkpad, + GST_DEBUG_FUNCPTR (gst_rtp_funnel_sink_event)); + gst_pad_set_query_function (sinkpad, + GST_DEBUG_FUNCPTR (gst_rtp_funnel_sink_query)); + + GST_OBJECT_FLAG_SET (sinkpad, GST_PAD_FLAG_PROXY_CAPS); + GST_OBJECT_FLAG_SET (sinkpad, GST_PAD_FLAG_PROXY_ALLOCATION); + + gst_pad_set_active (sinkpad, TRUE); + + gst_element_add_pad (element, sinkpad); + + GST_DEBUG_OBJECT (element, "requested pad %s:%s", + GST_DEBUG_PAD_NAME (sinkpad)); + + return sinkpad; +} + +static void +gst_rtp_funnel_set_property (GObject * object, guint prop_id, + const GValue * value, GParamSpec * pspec) +{ + GstRtpFunnel *funnel = GST_RTP_FUNNEL_CAST (object); + + switch (prop_id) { + case PROP_COMMON_TS_OFFSET: + funnel->common_ts_offset = g_value_get_int (value); + break; + default: + G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); + break; + } +} + +static void +gst_rtp_funnel_get_property (GObject * object, guint prop_id, GValue * value, + GParamSpec * pspec) +{ + GstRtpFunnel *funnel = GST_RTP_FUNNEL_CAST (object); + + switch (prop_id) { + case PROP_COMMON_TS_OFFSET: + g_value_set_int (value, funnel->common_ts_offset); + break; + default: + G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); + break; + } +} + +static GstStateChangeReturn +gst_rtp_funnel_change_state (GstElement * element, GstStateChange transition) +{ + GstRtpFunnel *funnel = GST_RTP_FUNNEL_CAST (element); + GstStateChangeReturn ret; + + ret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition); + + switch (transition) { + case GST_STATE_CHANGE_PAUSED_TO_READY: + funnel->send_sticky_events = TRUE; + break; + default: + break; + } + + return ret; +} + +static gboolean +_remove_pad_func (gpointer key, gpointer value, gpointer user_data) +{ + (void) key; + if (GST_PAD_CAST (value) == GST_PAD_CAST (user_data)) + return TRUE; + return FALSE; +} + +static void +gst_rtp_funnel_release_pad (GstElement * element, GstPad * pad) +{ + GstRtpFunnel *funnel = GST_RTP_FUNNEL_CAST (element); + + GST_DEBUG_OBJECT (funnel, "releasing pad %s:%s", GST_DEBUG_PAD_NAME (pad)); + + g_hash_table_foreach_remove (funnel->ssrc_to_pad, _remove_pad_func, pad); + + gst_pad_set_active (pad, FALSE); + gst_element_remove_pad (GST_ELEMENT_CAST (funnel), pad); +} + +static void +gst_rtp_funnel_dispose (GObject * object) +{ + GstRtpFunnel *funnel = GST_RTP_FUNNEL_CAST (object); + + g_assert (GST_ELEMENT_CAST (object)->numsinkpads == 0); + gst_caps_unref (funnel->srccaps); + g_hash_table_destroy (funnel->ssrc_to_pad); + + G_OBJECT_CLASS (parent_class)->dispose (object); +} + +static void +gst_rtp_funnel_class_init (GstRtpFunnelClass * klass) +{ + GObjectClass *gobject_class = G_OBJECT_CLASS (klass); + GstElementClass *gstelement_class = GST_ELEMENT_CLASS (klass); + + gobject_class->dispose = GST_DEBUG_FUNCPTR (gst_rtp_funnel_dispose); + gobject_class->get_property = GST_DEBUG_FUNCPTR (gst_rtp_funnel_get_property); + gobject_class->set_property = GST_DEBUG_FUNCPTR (gst_rtp_funnel_set_property); + gstelement_class->request_new_pad = + GST_DEBUG_FUNCPTR (gst_rtp_funnel_request_new_pad); + gstelement_class->release_pad = + GST_DEBUG_FUNCPTR (gst_rtp_funnel_release_pad); + gstelement_class->change_state = + GST_DEBUG_FUNCPTR (gst_rtp_funnel_change_state); + + gst_element_class_set_static_metadata (gstelement_class, "RTP funnel", + "RTP Funneling", + "Funnel RTP buffers together for multiplexing", + "Havard Graff "); + + gst_element_class_add_static_pad_template (gstelement_class, &sink_template); + gst_element_class_add_static_pad_template (gstelement_class, &src_template); + + g_object_class_install_property (gobject_class, PROP_COMMON_TS_OFFSET, + g_param_spec_int ("common-ts-offset", "Common Timestamp Offset", + "Use the same RTP timestamp offset for all sinkpads (-1 = disable)", + -1, G_MAXINT32, DEFAULT_COMMON_TS_OFFSET, + G_PARAM_CONSTRUCT | G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); + + GST_DEBUG_CATEGORY_INIT (gst_rtp_funnel_debug, + "gstrtpfunnel", 0, "funnel element"); +} + +static void +gst_rtp_funnel_init (GstRtpFunnel * funnel) +{ + funnel->srcpad = gst_pad_new_from_static_template (&src_template, "src"); + gst_pad_use_fixed_caps (funnel->srcpad); + gst_pad_set_event_function (funnel->srcpad, + GST_DEBUG_FUNCPTR (gst_rtp_funnel_src_event)); + gst_element_add_pad (GST_ELEMENT (funnel), funnel->srcpad); + + funnel->send_sticky_events = TRUE; + funnel->srccaps = gst_caps_new_empty_simple (RTP_CAPS); + funnel->ssrc_to_pad = g_hash_table_new (NULL, NULL); +} diff --git a/gst/rtpmanager/gstrtpfunnel.h b/gst/rtpmanager/gstrtpfunnel.h new file mode 100644 index 0000000..26f6230 --- /dev/null +++ b/gst/rtpmanager/gstrtpfunnel.h @@ -0,0 +1,40 @@ +/* RTP funnel element for GStreamer + * + * gstrtpfunnel.h: + * + * Copyright (C) <2017> Pexip. + * Contact: Havard Graff + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Library General Public + * License as published by the Free Software Foundation; either + * version 2 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Library General Public License for more details. + * + * You should have received a copy of the GNU Library General Public + * License along with this library; if not, write to the + * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor, + * Boston, MA 02110-1301, USA. + */ +#ifndef __GST_RTP_FUNNEL_H__ +#define __GST_RTP_FUNNEL_H__ + +#include + +G_BEGIN_DECLS + +G_DECLARE_FINAL_TYPE (GstRtpFunnel, gst_rtp_funnel, GST, RTP_FUNNEL, GstElement) +#define GST_TYPE_RTP_FUNNEL (gst_rtp_funnel_get_type()) +#define GST_RTP_FUNNEL_CAST(obj) ((GstRtpFunnel *)(obj)) + +G_DECLARE_FINAL_TYPE (GstRtpFunnelPad, gst_rtp_funnel_pad, GST, RTP_FUNNEL_PAD, GstPad) +#define GST_TYPE_RTP_FUNNEL_PAD (gst_rtp_funnel_pad_get_type()) +#define GST_RTP_FUNNEL_PAD_CAST(obj) ((GstRtpFunnelPad *)(obj)) + +G_END_DECLS + +#endif /* __GST_RTP_FUNNEL_H__ */ diff --git a/gst/rtpmanager/gstrtpmanager.c b/gst/rtpmanager/gstrtpmanager.c index 426b0b7..4ba624f 100644 --- a/gst/rtpmanager/gstrtpmanager.c +++ b/gst/rtpmanager/gstrtpmanager.c @@ -31,6 +31,7 @@ #include "gstrtpssrcdemux.h" #include "gstrtpdtmfmux.h" #include "gstrtpmux.h" +#include "gstrtpfunnel.h" static gboolean plugin_init (GstPlugin * plugin) @@ -69,6 +70,10 @@ plugin_init (GstPlugin * plugin) if (!gst_rtp_dtmf_mux_plugin_init (plugin)) return FALSE; + if (!gst_element_register (plugin, "rtpfunnel", GST_RANK_NONE, + GST_TYPE_RTP_FUNNEL)) + return FALSE; + return TRUE; } diff --git a/gst/rtpmanager/meson.build b/gst/rtpmanager/meson.build index 1c9fa45..c524ff4 100644 --- a/gst/rtpmanager/meson.build +++ b/gst/rtpmanager/meson.build @@ -14,6 +14,7 @@ rtpmanager_sources = [ 'rtpsource.c', 'rtpstats.c', 'gstrtpsession.c', + 'gstrtpfunnel.c', ] gstrtpmanager = library('gstrtpmanager', diff --git a/tests/check/Makefile.am b/tests/check/Makefile.am index 8339457..5d9e74b 100644 --- a/tests/check/Makefile.am +++ b/tests/check/Makefile.am @@ -254,7 +254,8 @@ check_rtpmanager = \ elements/rtpsession \ elements/rtpstorage \ elements/rtpred \ - elements/rtpulpfec + elements/rtpulpfec \ + elements/rtpfunnel else check_rtpmanager = endif @@ -605,6 +606,9 @@ elements_rtprtx_LDADD = $(GST_PLUGINS_BASE_LIBS) -lgstrtp-$(GST_API_VERSION) $(L elements_rtpsession_CFLAGS = $(GST_PLUGINS_BASE_CFLAGS) $(CFLAGS) $(AM_CFLAGS) elements_rtpsession_LDADD = $(GST_PLUGINS_BASE_LIBS) $(GST_NET_LIBS) -lgstrtp-$(GST_API_VERSION) -lgstvideo-$(GST_API_VERSION) $(GIO_LIBS) $(LDADD) +elements_rtpfunnel_CFLAGS = $(GST_PLUGINS_BASE_CFLAGS) $(GST_BASE_CFLAGS) $(AM_CFLAGS) +elements_rtpfunnel_LDADD = $(GST_PLUGINS_BASE_LIBS) -lgstrtp-$(GST_API_VERSION) $(GST_BASE_LIBS) $(LDADD) + elements_rtpcollision_CFLAGS = $(GST_PLUGINS_BASE_CFLAGS) $(CFLAGS) $(AM_CFLAGS) elements_rtpcollision_LDADD = $(GST_PLUGINS_BASE_LIBS) $(GST_NET_LIBS) -lgstrtp-$(GST_API_VERSION) $(GIO_LIBS) $(LDADD) diff --git a/tests/check/elements/rtpfunnel.c b/tests/check/elements/rtpfunnel.c new file mode 100644 index 0000000..5c690e2 --- /dev/null +++ b/tests/check/elements/rtpfunnel.c @@ -0,0 +1,193 @@ +/* GStreamer + * + * unit test for rtpfunnel + * + * Copyright (C) <2017> Pexip. + * Contact: Havard Graff + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Library General Public + * License as published by the Free Software Foundation; either + * version 2 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Library General Public License for more details. + * + * You should have received a copy of the GNU Library General Public + * License along with this library; if not, write to the + * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor, + * Boston, MA 02110-1301, USA. + */ +#include +#include + +GST_START_TEST (rtpfunnel_ssrc_demuxing) +{ + GstHarness *h = gst_harness_new_with_padnames ("rtpfunnel", NULL, "src"); + GstHarness *h0 = gst_harness_new_with_element (h->element, "sink_0", NULL); + GstHarness *h1 = gst_harness_new_with_element (h->element, "sink_1", NULL); + + gst_harness_set_src_caps_str (h0, "application/x-rtp, ssrc=(uint)123"); + gst_harness_set_src_caps_str (h1, "application/x-rtp, ssrc=(uint)321"); + + /* unref latency events */ + gst_event_unref (gst_harness_pull_upstream_event (h0)); + gst_event_unref (gst_harness_pull_upstream_event (h1)); + fail_unless_equals_int (1, gst_harness_upstream_events_received (h0)); + fail_unless_equals_int (1, gst_harness_upstream_events_received (h1)); + + /* send to pad 0 */ + gst_harness_push_upstream_event (h, + gst_event_new_custom (GST_EVENT_CUSTOM_UPSTREAM, + gst_structure_new ("GstForceKeyUnit", + "ssrc", G_TYPE_UINT, 123, NULL))); + fail_unless_equals_int (2, gst_harness_upstream_events_received (h0)); + fail_unless_equals_int (1, gst_harness_upstream_events_received (h1)); + + /* send to pad 1 */ + gst_harness_push_upstream_event (h, + gst_event_new_custom (GST_EVENT_CUSTOM_UPSTREAM, + gst_structure_new ("GstForceKeyUnit", + "ssrc", G_TYPE_UINT, 321, NULL))); + fail_unless_equals_int (2, gst_harness_upstream_events_received (h0)); + fail_unless_equals_int (2, gst_harness_upstream_events_received (h1)); + + /* unknown ssrc, we drop it */ + gst_harness_push_upstream_event (h, + gst_event_new_custom (GST_EVENT_CUSTOM_UPSTREAM, + gst_structure_new ("GstForceKeyUnit", + "ssrc", G_TYPE_UINT, 666, NULL))); + fail_unless_equals_int (2, gst_harness_upstream_events_received (h0)); + fail_unless_equals_int (2, gst_harness_upstream_events_received (h1)); + + /* no ssrc, we send to all */ + gst_harness_push_upstream_event (h, + gst_event_new_custom (GST_EVENT_CUSTOM_UPSTREAM, + gst_structure_new_empty ("GstForceKeyUnit"))); + fail_unless_equals_int (3, gst_harness_upstream_events_received (h0)); + fail_unless_equals_int (3, gst_harness_upstream_events_received (h1)); + + /* remove pad 0, and send an event referencing the now dead ssrc */ + gst_harness_teardown (h0); + gst_harness_push_upstream_event (h, + gst_event_new_custom (GST_EVENT_CUSTOM_UPSTREAM, + gst_structure_new ("GstForceKeyUnit", + "ssrc", G_TYPE_UINT, 123, NULL))); + fail_unless_equals_int (3, gst_harness_upstream_events_received (h1)); + + gst_harness_teardown (h); + gst_harness_teardown (h1); +} + +GST_END_TEST +GST_START_TEST (rtpfunnel_ssrc_downstream_not_leaking_through) +{ + GstHarness *h = gst_harness_new_with_padnames ("rtpfunnel", + "sink_0", "src"); + GstCaps *caps; + const GstStructure *s; + + gst_harness_set_sink_caps_str (h, "application/x-rtp, ssrc=(uint)123"); + + caps = gst_pad_peer_query_caps (h->srcpad, NULL); + s = gst_caps_get_structure (caps, 0); + + fail_unless (!gst_structure_has_field (s, "ssrc")); + + gst_caps_unref (caps); + gst_harness_teardown (h); +} + +GST_END_TEST +GST_START_TEST (rtpfunnel_common_ts_offset) +{ + GstHarness *h = gst_harness_new_with_padnames ("rtpfunnel", + "sink_0", "src"); + GstCaps *caps; + const GstStructure *s; + const guint expected_ts_offset = 12345; + guint ts_offset; + + g_object_set (h->element, "common-ts-offset", expected_ts_offset, NULL); + + caps = gst_pad_peer_query_caps (h->srcpad, NULL); + s = gst_caps_get_structure (caps, 0); + + fail_unless (gst_structure_get_uint (s, "timestamp-offset", &ts_offset)); + fail_unless_equals_int (expected_ts_offset, ts_offset); + + gst_caps_unref (caps); + gst_harness_teardown (h); +} + +GST_END_TEST +GST_START_TEST (rtpfunnel_stress) +{ + GstHarness *h = gst_harness_new_with_padnames ("rtpfunnel", + "sink_0", "src"); + GstHarness *h1 = gst_harness_new_with_element (h->element, "sink_1", NULL); + + GstPadTemplate *templ = + gst_element_class_get_pad_template (GST_ELEMENT_GET_CLASS (h->element), + "sink_%u"); + GstCaps *caps = gst_caps_from_string ("application/x-rtp, ssrc=(uint)123"); + GstBuffer *buf = gst_buffer_new_allocate (NULL, 0, NULL); + GstSegment segment; + GstHarnessThread *statechange, *push, *req, *push1; + + gst_check_add_log_filter ("GStreamer", G_LOG_LEVEL_WARNING, + g_regex_new ("Got data flow before (stream-start|segment) event", + (GRegexCompileFlags) 0, (GRegexMatchFlags) 0, NULL), + NULL, NULL, NULL); + gst_check_add_log_filter ("GStreamer", G_LOG_LEVEL_WARNING, + g_regex_new ("Sticky event misordering", + (GRegexCompileFlags) 0, (GRegexMatchFlags) 0, NULL), + NULL, NULL, NULL); + + + gst_segment_init (&segment, GST_FORMAT_TIME); + + statechange = gst_harness_stress_statechange_start (h); + push = gst_harness_stress_push_buffer_start (h, caps, &segment, buf); + req = gst_harness_stress_requestpad_start (h, templ, NULL, NULL, TRUE); + push1 = gst_harness_stress_push_buffer_start (h1, caps, &segment, buf); + + gst_caps_unref (caps); + gst_buffer_unref (buf); + + /* test-length */ + g_usleep (G_USEC_PER_SEC * 1); + + gst_harness_stress_thread_stop (push1); + gst_harness_stress_thread_stop (req); + gst_harness_stress_thread_stop (push); + gst_harness_stress_thread_stop (statechange); + + gst_harness_teardown (h1); + gst_harness_teardown (h); + + gst_check_clear_log_filter (); +} + +GST_END_TEST; + +static Suite * +rtpfunnel_suite (void) +{ + Suite *s = suite_create ("rtpfunnel"); + TCase *tc_chain = tcase_create ("general"); + + suite_add_tcase (s, tc_chain); + + tcase_add_test (tc_chain, rtpfunnel_ssrc_demuxing); + tcase_add_test (tc_chain, rtpfunnel_ssrc_downstream_not_leaking_through); + tcase_add_test (tc_chain, rtpfunnel_common_ts_offset); + + tcase_add_test (tc_chain, rtpfunnel_stress); + + return s; +} + +GST_CHECK_MAIN (rtpfunnel) diff --git a/tests/check/meson.build b/tests/check/meson.build index cc2407d..ef0f447 100644 --- a/tests/check/meson.build +++ b/tests/check/meson.build @@ -75,6 +75,7 @@ good_tests = [ [ 'elements/rtpbin_buffer_list' ], [ 'elements/rtpbundle' ], [ 'elements/rtpcollision' ], + [ 'elements/rtpfunnel' ], [ 'elements/rtpjitterbuffer' ], [ 'elements/rtpmux' ], [ 'elements/rtprtx' ], -- 2.7.4