X-Git-Url: http://review.tizen.org/git/?a=blobdiff_plain;f=gst%2Frtpmanager%2Fgstrtpssrcdemux.c;h=a1fee7b77a6d3b781e575603883f58c1c3a95261;hb=deeb3be3ec26feef48f277d85bf55e816a228d4e;hp=0f4b23cae17be68658761e9d5c44953d0f497304;hpb=bbca040336a1477032f2baf675c5750cc7b21960;p=platform%2Fupstream%2Fgst-plugins-good.git diff --git a/gst/rtpmanager/gstrtpssrcdemux.c b/gst/rtpmanager/gstrtpssrcdemux.c index 0f4b23c..a1fee7b 100644 --- a/gst/rtpmanager/gstrtpssrcdemux.c +++ b/gst/rtpmanager/gstrtpssrcdemux.c @@ -21,21 +21,21 @@ /** * SECTION:element-rtpssrcdemux + * @title: rtpssrcdemux * * rtpssrcdemux acts as a demuxer for RTP packets based on the SSRC of the * packets. Its main purpose is to allow an application to easily receive and * decode an RTP stream with multiple SSRCs. - * + * * For each SSRC that is detected, a new pad will be created and the - * #GstRtpSsrcDemux::new-ssrc-pad signal will be emitted. - * - * - * Example pipelines + * #GstRtpSsrcDemux::new-ssrc-pad signal will be emitted. + * + * ## Example pipelines * |[ * gst-launch-1.0 udpsrc caps="application/x-rtp" ! rtpssrcdemux ! fakesink * ]| Takes an RTP stream and send the RTP packets with the first detected SSRC * to fakesink, discarding the other SSRCs. - * + * */ #ifdef HAVE_CONFIG_H @@ -80,8 +80,8 @@ GST_STATIC_PAD_TEMPLATE ("rtcp_src_%u", GST_STATIC_CAPS ("application/x-rtcp") ); -#define GST_PAD_LOCK(obj) (g_rec_mutex_lock (&(obj)->padlock)) -#define GST_PAD_UNLOCK(obj) (g_rec_mutex_unlock (&(obj)->padlock)) +#define INTERNAL_STREAM_LOCK(obj) (g_rec_mutex_lock (&(obj)->padlock)) +#define INTERNAL_STREAM_UNLOCK(obj) (g_rec_mutex_unlock (&(obj)->padlock)) typedef enum { @@ -89,6 +89,13 @@ typedef enum RTCP_PAD } PadType; +#define DEFAULT_MAX_STREAMS G_MAXUINT +enum +{ + PROP_0, + PROP_MAX_STREAMS +}; + /* signals */ enum { @@ -142,12 +149,10 @@ struct _GstRtpSsrcDemuxPad GstPad *rtp_pad; GstCaps *caps; GstPad *rtcp_pad; - - gboolean pushed_initial_rtp_events; - gboolean pushed_initial_rtcp_events; }; /* find a src pad for a given SSRC, returns NULL if the SSRC was not found + * MUST be called with object lock */ static GstRtpSsrcDemuxPad * find_demux_pad_for_ssrc (GstRtpSsrcDemux * demux, guint32 ssrc) @@ -163,6 +168,38 @@ find_demux_pad_for_ssrc (GstRtpSsrcDemux * demux, guint32 ssrc) return NULL; } +/* returns a reference to the pad if found, %NULL otherwise */ +static GstPad * +get_demux_pad_for_ssrc (GstRtpSsrcDemux * demux, guint32 ssrc, PadType padtype) +{ + GstRtpSsrcDemuxPad *demuxpad; + GstPad *retpad; + + GST_OBJECT_LOCK (demux); + + demuxpad = find_demux_pad_for_ssrc (demux, ssrc); + if (!demuxpad) { + GST_OBJECT_UNLOCK (demux); + return NULL; + } + + switch (padtype) { + case RTP_PAD: + retpad = gst_object_ref (demuxpad->rtp_pad); + break; + case RTCP_PAD: + retpad = gst_object_ref (demuxpad->rtcp_pad); + break; + default: + retpad = NULL; + g_assert_not_reached (); + } + + GST_OBJECT_UNLOCK (demux); + + return retpad; +} + static GstEvent * add_ssrc_and_ref (GstEvent * event, guint32 ssrc) { @@ -197,6 +234,7 @@ struct ForwardStickyEventData guint32 ssrc; }; +/* With internal stream lock held */ static gboolean forward_sticky_events (GstPad * pad, GstEvent ** event, gpointer user_data) { @@ -210,6 +248,7 @@ forward_sticky_events (GstPad * pad, GstEvent ** event, gpointer user_data) return TRUE; } +/* With internal stream lock held */ static void forward_initial_events (GstRtpSsrcDemux * demux, guint32 ssrc, GstPad * pad, PadType padtype) @@ -230,6 +269,7 @@ forward_initial_events (GstRtpSsrcDemux * demux, guint32 ssrc, GstPad * pad, gst_pad_sticky_events_foreach (sinkpad, forward_sticky_events, &fdata); } +/* MUST only be called from streaming thread */ static GstPad * find_or_create_demux_pad_for_ssrc (GstRtpSsrcDemux * demux, guint32 ssrc, PadType padtype) @@ -240,40 +280,22 @@ find_or_create_demux_pad_for_ssrc (GstRtpSsrcDemux * demux, guint32 ssrc, gchar *padname; GstRtpSsrcDemuxPad *demuxpad; GstPad *retpad; - gulong rtp_block, rtcp_block; - - GST_PAD_LOCK (demux); - - demuxpad = find_demux_pad_for_ssrc (demux, ssrc); - if (demuxpad != NULL) { - gboolean forward = FALSE; - - switch (padtype) { - case RTP_PAD: - retpad = gst_object_ref (demuxpad->rtp_pad); - if (!demuxpad->pushed_initial_rtp_events) { - forward = TRUE; - demuxpad->pushed_initial_rtp_events = TRUE; - } - break; - case RTCP_PAD: - retpad = gst_object_ref (demuxpad->rtcp_pad); - if (!demuxpad->pushed_initial_rtcp_events) { - forward = TRUE; - demuxpad->pushed_initial_rtcp_events = TRUE; - } - break; - default: - retpad = NULL; - g_assert_not_reached (); - } + guint num_streams; - GST_PAD_UNLOCK (demux); + INTERNAL_STREAM_LOCK (demux); - if (forward) - forward_initial_events (demux, ssrc, retpad, padtype); + retpad = get_demux_pad_for_ssrc (demux, ssrc, padtype); + if (retpad != NULL) { + INTERNAL_STREAM_UNLOCK (demux); return retpad; } + /* We create 2 src pads per ssrc (RTP & RTCP). Checking if we are allowed + to create 2 more pads */ + num_streams = (GST_ELEMENT_CAST (demux)->numsrcpads) >> 1; + if (num_streams >= demux->max_streams) { + INTERNAL_STREAM_UNLOCK (demux); + return NULL; + } GST_DEBUG_OBJECT (demux, "creating new pad for SSRC %08x", ssrc); @@ -297,7 +319,9 @@ find_or_create_demux_pad_for_ssrc (GstRtpSsrcDemux * demux, guint32 ssrc, gst_pad_set_element_private (rtp_pad, demuxpad); gst_pad_set_element_private (rtcp_pad, demuxpad); + GST_OBJECT_LOCK (demux); demux->srcpads = g_slist_prepend (demux->srcpads, demuxpad); + GST_OBJECT_UNLOCK (demux); gst_pad_set_query_function (rtp_pad, gst_rtp_ssrc_demux_src_query); gst_pad_set_iterate_internal_links_function (rtp_pad, @@ -312,15 +336,8 @@ find_or_create_demux_pad_for_ssrc (GstRtpSsrcDemux * demux, guint32 ssrc, gst_pad_use_fixed_caps (rtcp_pad); gst_pad_set_active (rtcp_pad, TRUE); - if (padtype == RTP_PAD) { - demuxpad->pushed_initial_rtp_events = TRUE; - forward_initial_events (demux, ssrc, rtp_pad, padtype); - } else if (padtype == RTCP_PAD) { - demuxpad->pushed_initial_rtcp_events = TRUE; - forward_initial_events (demux, ssrc, rtcp_pad, padtype); - } else { - g_assert_not_reached (); - } + forward_initial_events (demux, ssrc, rtp_pad, RTP_PAD); + forward_initial_events (demux, ssrc, rtcp_pad, RTCP_PAD); gst_element_add_pad (GST_ELEMENT_CAST (demux), rtp_pad); gst_element_add_pad (GST_ELEMENT_CAST (demux), rtcp_pad); @@ -337,26 +354,46 @@ find_or_create_demux_pad_for_ssrc (GstRtpSsrcDemux * demux, guint32 ssrc, g_assert_not_reached (); } - gst_object_ref (rtp_pad); - gst_object_ref (rtcp_pad); + g_signal_emit (G_OBJECT (demux), + gst_rtp_ssrc_demux_signals[SIGNAL_NEW_SSRC_PAD], 0, ssrc, rtp_pad); - rtp_block = gst_pad_add_probe (rtp_pad, GST_PAD_PROBE_TYPE_BLOCK_DOWNSTREAM, - NULL, NULL, NULL); - rtcp_block = gst_pad_add_probe (rtcp_pad, GST_PAD_PROBE_TYPE_BLOCK_DOWNSTREAM, - NULL, NULL, NULL); + INTERNAL_STREAM_UNLOCK (demux); - GST_PAD_UNLOCK (demux); + return retpad; +} - g_signal_emit (G_OBJECT (demux), - gst_rtp_ssrc_demux_signals[SIGNAL_NEW_SSRC_PAD], 0, ssrc, rtp_pad); +static void +gst_rtp_ssrc_demux_set_property (GObject * object, guint prop_id, + const GValue * value, GParamSpec * pspec) +{ + GstRtpSsrcDemux *demux; - gst_pad_remove_probe (rtp_pad, rtp_block); - gst_pad_remove_probe (rtcp_pad, rtcp_block); + demux = GST_RTP_SSRC_DEMUX (object); + switch (prop_id) { + case PROP_MAX_STREAMS: + demux->max_streams = g_value_get_uint (value); + break; + default: + G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); + break; + } +} - gst_object_unref (rtp_pad); - gst_object_unref (rtcp_pad); +static void +gst_rtp_ssrc_demux_get_property (GObject * object, guint prop_id, + GValue * value, GParamSpec * pspec) +{ + GstRtpSsrcDemux *demux; - return retpad; + demux = GST_RTP_SSRC_DEMUX (object); + switch (prop_id) { + case PROP_MAX_STREAMS: + g_value_set_uint (value, demux->max_streams); + break; + default: + G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); + break; + } } static void @@ -372,6 +409,14 @@ gst_rtp_ssrc_demux_class_init (GstRtpSsrcDemuxClass * klass) gobject_klass->dispose = gst_rtp_ssrc_demux_dispose; gobject_klass->finalize = gst_rtp_ssrc_demux_finalize; + gobject_klass->set_property = gst_rtp_ssrc_demux_set_property; + gobject_klass->get_property = gst_rtp_ssrc_demux_get_property; + + g_object_class_install_property (gobject_klass, PROP_MAX_STREAMS, + g_param_spec_uint ("max-streams", "Max Streams", + "The maximum number of streams allowed", + 0, G_MAXUINT, DEFAULT_MAX_STREAMS, + G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); /** * GstRtpSsrcDemux::new-ssrc-pad: @@ -379,14 +424,13 @@ gst_rtp_ssrc_demux_class_init (GstRtpSsrcDemuxClass * klass) * @ssrc: the SSRC of the pad * @pad: the new pad. * - * Emited when a new SSRC pad has been created. + * Emitted when a new SSRC pad has been created. */ gst_rtp_ssrc_demux_signals[SIGNAL_NEW_SSRC_PAD] = g_signal_new ("new-ssrc-pad", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpSsrcDemuxClass, new_ssrc_pad), - NULL, NULL, g_cclosure_marshal_generic, G_TYPE_NONE, 2, G_TYPE_UINT, - GST_TYPE_PAD); + NULL, NULL, NULL, G_TYPE_NONE, 2, G_TYPE_UINT, GST_TYPE_PAD); /** * GstRtpSsrcDemux::removed-ssrc-pad: @@ -394,14 +438,13 @@ gst_rtp_ssrc_demux_class_init (GstRtpSsrcDemuxClass * klass) * @ssrc: the SSRC of the pad * @pad: the removed pad. * - * Emited when a SSRC pad has been removed. + * Emitted when a SSRC pad has been removed. */ gst_rtp_ssrc_demux_signals[SIGNAL_REMOVED_SSRC_PAD] = g_signal_new ("removed-ssrc-pad", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpSsrcDemuxClass, removed_ssrc_pad), - NULL, NULL, g_cclosure_marshal_generic, G_TYPE_NONE, 2, G_TYPE_UINT, - GST_TYPE_PAD); + NULL, NULL, NULL, G_TYPE_NONE, 2, G_TYPE_UINT, GST_TYPE_PAD); /** * GstRtpSsrcDemux::clear-ssrc: @@ -414,21 +457,21 @@ gst_rtp_ssrc_demux_class_init (GstRtpSsrcDemuxClass * klass) g_signal_new ("clear-ssrc", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION, G_STRUCT_OFFSET (GstRtpSsrcDemuxClass, clear_ssrc), - NULL, NULL, g_cclosure_marshal_generic, G_TYPE_NONE, 1, G_TYPE_UINT); + NULL, NULL, NULL, G_TYPE_NONE, 1, G_TYPE_UINT); gstelement_klass->change_state = GST_DEBUG_FUNCPTR (gst_rtp_ssrc_demux_change_state); gstrtpssrcdemux_klass->clear_ssrc = GST_DEBUG_FUNCPTR (gst_rtp_ssrc_demux_clear_ssrc); - gst_element_class_add_pad_template (gstelement_klass, - gst_static_pad_template_get (&rtp_ssrc_demux_sink_template)); - gst_element_class_add_pad_template (gstelement_klass, - gst_static_pad_template_get (&rtp_ssrc_demux_rtcp_sink_template)); - gst_element_class_add_pad_template (gstelement_klass, - gst_static_pad_template_get (&rtp_ssrc_demux_src_template)); - gst_element_class_add_pad_template (gstelement_klass, - gst_static_pad_template_get (&rtp_ssrc_demux_rtcp_src_template)); + gst_element_class_add_static_pad_template (gstelement_klass, + &rtp_ssrc_demux_sink_template); + gst_element_class_add_static_pad_template (gstelement_klass, + &rtp_ssrc_demux_rtcp_sink_template); + gst_element_class_add_static_pad_template (gstelement_klass, + &rtp_ssrc_demux_src_template); + gst_element_class_add_static_pad_template (gstelement_klass, + &rtp_ssrc_demux_rtcp_src_template); gst_element_class_set_static_metadata (gstelement_klass, "RTP SSRC Demux", "Demux/Network/RTP", @@ -437,6 +480,9 @@ gst_rtp_ssrc_demux_class_init (GstRtpSsrcDemuxClass * klass) GST_DEBUG_CATEGORY_INIT (gst_rtp_ssrc_demux_debug, "rtpssrcdemux", 0, "RTP SSRC demuxer"); + + GST_DEBUG_REGISTER_FUNCPTR (gst_rtp_ssrc_demux_chain); + GST_DEBUG_REGISTER_FUNCPTR (gst_rtp_ssrc_demux_rtcp_chain); } static void @@ -462,6 +508,8 @@ gst_rtp_ssrc_demux_init (GstRtpSsrcDemux * demux) gst_rtp_ssrc_demux_iterate_internal_links_sink); gst_element_add_pad (GST_ELEMENT_CAST (demux), demux->rtcp_sink); + demux->max_streams = DEFAULT_MAX_STREAMS; + g_rec_mutex_init (&demux->padlock); } @@ -512,17 +560,17 @@ gst_rtp_ssrc_demux_clear_ssrc (GstRtpSsrcDemux * demux, guint32 ssrc) { GstRtpSsrcDemuxPad *dpad; - GST_PAD_LOCK (demux); + GST_OBJECT_LOCK (demux); dpad = find_demux_pad_for_ssrc (demux, ssrc); if (dpad == NULL) { - GST_PAD_UNLOCK (demux); + GST_OBJECT_UNLOCK (demux); goto unknown_pad; } GST_DEBUG_OBJECT (demux, "clearing pad for SSRC %08x", ssrc); demux->srcpads = g_slist_remove (demux->srcpads, dpad); - GST_PAD_UNLOCK (demux); + GST_OBJECT_UNLOCK (demux); gst_pad_set_active (dpad->rtp_pad, FALSE); gst_pad_set_active (dpad->rtcp_pad, FALSE); @@ -561,25 +609,21 @@ forward_event (GstPad * pad, gpointer user_data) GSList *walk = NULL; GstEvent *newevent = NULL; - GST_PAD_LOCK (fdata->demux); + GST_OBJECT_LOCK (fdata->demux); for (walk = fdata->demux->srcpads; walk; walk = walk->next) { GstRtpSsrcDemuxPad *dpad = (GstRtpSsrcDemuxPad *) walk->data; - /* Only forward the event if the initial events have been through first, - * the initial events should be forwarded before any other event - * or buffer is pushed */ - if ((pad == dpad->rtp_pad && dpad->pushed_initial_rtp_events) || - (pad == dpad->rtcp_pad && dpad->pushed_initial_rtcp_events)) { + if (pad == dpad->rtp_pad || pad == dpad->rtcp_pad) { newevent = add_ssrc_and_ref (fdata->event, dpad->ssrc); break; } } - GST_PAD_UNLOCK (fdata->demux); + GST_OBJECT_UNLOCK (fdata->demux); if (newevent) fdata->res &= gst_pad_push_event (pad, newevent); - return TRUE; + return FALSE; } @@ -612,7 +656,6 @@ gst_rtp_ssrc_demux_chain (GstPad * pad, GstObject * parent, GstBuffer * buf) guint32 ssrc; GstRTPBuffer rtp = { NULL }; GstPad *srcpad; - GstRtpSsrcDemuxPad *dpad; demux = GST_RTP_SSRC_DEMUX (parent); @@ -632,14 +675,17 @@ gst_rtp_ssrc_demux_chain (GstPad * pad, GstObject * parent, GstBuffer * buf) ret = gst_pad_push (srcpad, buf); if (ret != GST_FLOW_OK) { + GstPad *active_pad; + /* check if the ssrc still there, may have been removed */ - GST_PAD_LOCK (demux); - dpad = find_demux_pad_for_ssrc (demux, ssrc); - if (dpad == NULL || dpad->rtp_pad != srcpad) { + active_pad = get_demux_pad_for_ssrc (demux, ssrc, RTP_PAD); + + if (active_pad == NULL || active_pad != srcpad) { /* SSRC was removed during the push ... ignore the error */ ret = GST_FLOW_OK; } - GST_PAD_UNLOCK (demux); + + g_clear_object (&active_pad); } gst_object_unref (srcpad); @@ -649,18 +695,17 @@ gst_rtp_ssrc_demux_chain (GstPad * pad, GstObject * parent, GstBuffer * buf) /* ERRORS */ invalid_payload: { - /* this is fatal and should be filtered earlier */ - GST_ELEMENT_ERROR (demux, STREAM, DECODE, (NULL), - ("Dropping invalid RTP payload")); + GST_DEBUG_OBJECT (demux, "Dropping invalid RTP packet"); gst_buffer_unref (buf); - return GST_FLOW_ERROR; + return GST_FLOW_OK; } create_failed: { - GST_ELEMENT_ERROR (demux, STREAM, DECODE, (NULL), - ("Could not create new pad")); gst_buffer_unref (buf); - return GST_FLOW_ERROR; + GST_WARNING_OBJECT (demux, + "Dropping buffer SSRC %08x. " + "Max streams number reached (%u)", ssrc, demux->max_streams); + return GST_FLOW_OK; } } @@ -674,11 +719,10 @@ gst_rtp_ssrc_demux_rtcp_chain (GstPad * pad, GstObject * parent, GstRTCPPacket packet; GstRTCPBuffer rtcp = { NULL, }; GstPad *srcpad; - GstRtpSsrcDemuxPad *dpad; demux = GST_RTP_SSRC_DEMUX (parent); - if (!gst_rtcp_buffer_validate (buf)) + if (!gst_rtcp_buffer_validate_reduced (buf)) goto invalid_rtcp; gst_rtcp_buffer_map (buf, GST_MAP_READ, &rtcp); @@ -687,7 +731,9 @@ gst_rtp_ssrc_demux_rtcp_chain (GstPad * pad, GstObject * parent, goto invalid_rtcp; } - /* first packet must be SR or RR or else the validate would have failed */ + /* first packet must be SR or RR, or in case of a reduced size RTCP packet + * it must be APP, RTPFB or PSFB feeadback, or else the validate would + * have failed */ switch (gst_rtcp_packet_get_type (&packet)) { case GST_RTCP_TYPE_SR: /* get the ssrc so that we can route it to the right source pad */ @@ -697,6 +743,13 @@ gst_rtp_ssrc_demux_rtcp_chain (GstPad * pad, GstObject * parent, case GST_RTCP_TYPE_RR: ssrc = gst_rtcp_packet_rr_get_ssrc (&packet); break; + case GST_RTCP_TYPE_APP: + ssrc = gst_rtcp_packet_app_get_ssrc (&packet); + break; + case GST_RTCP_TYPE_RTPFB: + case GST_RTCP_TYPE_PSFB: + ssrc = gst_rtcp_packet_fb_get_sender_ssrc (&packet); + break; default: goto unexpected_rtcp; } @@ -712,14 +765,16 @@ gst_rtp_ssrc_demux_rtcp_chain (GstPad * pad, GstObject * parent, ret = gst_pad_push (srcpad, buf); if (ret != GST_FLOW_OK) { + GstPad *active_pad; + /* check if the ssrc still there, may have been removed */ - GST_PAD_LOCK (demux); - dpad = find_demux_pad_for_ssrc (demux, ssrc); - if (dpad == NULL || dpad->rtcp_pad != srcpad) { + active_pad = get_demux_pad_for_ssrc (demux, ssrc, RTCP_PAD); + if (active_pad == NULL || active_pad != srcpad) { /* SSRC was removed during the push ... ignore the error */ ret = GST_FLOW_OK; } - GST_PAD_UNLOCK (demux); + + g_clear_object (&active_pad); } gst_object_unref (srcpad); @@ -729,11 +784,9 @@ gst_rtp_ssrc_demux_rtcp_chain (GstPad * pad, GstObject * parent, /* ERRORS */ invalid_rtcp: { - /* this is fatal and should be filtered earlier */ - GST_ELEMENT_ERROR (demux, STREAM, DECODE, (NULL), - ("Dropping invalid RTCP packet")); + GST_DEBUG_OBJECT (demux, "Dropping invalid RTCP packet"); gst_buffer_unref (buf); - return GST_FLOW_ERROR; + return GST_FLOW_OK; } unexpected_rtcp: { @@ -743,10 +796,11 @@ unexpected_rtcp: } create_failed: { - GST_ELEMENT_ERROR (demux, STREAM, DECODE, (NULL), - ("Could not create new pad")); gst_buffer_unref (buf); - return GST_FLOW_ERROR; + GST_WARNING_OBJECT (demux, + "Dropping buffer SSRC %08x. " + "Max streams number reached (%u)", ssrc, demux->max_streams); + return GST_FLOW_OK; } } @@ -809,7 +863,7 @@ gst_rtp_ssrc_demux_iterate_internal_links_src (GstPad * pad, GstObject * parent) demux = GST_RTP_SSRC_DEMUX (parent); - GST_PAD_LOCK (demux); + GST_OBJECT_LOCK (demux); for (current = demux->srcpads; current; current = g_slist_next (current)) { GstRtpSsrcDemuxPad *dpad = (GstRtpSsrcDemuxPad *) current->data; @@ -830,7 +884,7 @@ gst_rtp_ssrc_demux_iterate_internal_links_src (GstPad * pad, GstObject * parent) g_value_unset (&val); } - GST_PAD_UNLOCK (demux); + GST_OBJECT_UNLOCK (demux); return it; }