rtpsession: Take session lock when creating stats
[platform/upstream/gst-plugins-good.git] / gst / rtpmanager / gstrtpssrcdemux.c
index 540558f..c4a3c43 100644 (file)
@@ -20,9 +20,9 @@
  */
 
 /**
- * SECTION:element-gstrtpssrcdemux
+ * SECTION:element-rtpssrcdemux
  *
- * gstrtpssrcdemux acts as a demuxer for RTP packets based on the SSRC of the
+ * rtpssrcdemux acts as a demuxer for RTP packets based on the SSRC of the
  * packets. Its main purpose is to allow an application to easily receive and
  * decode an RTP stream with multiple SSRCs.
  * 
  * <refsect2>
  * <title>Example pipelines</title>
  * |[
- * gst-launch-1.0 udpsrc caps="application/x-rtp" ! gstrtpssrcdemux ! fakesink
+ * gst-launch-1.0 udpsrc caps="application/x-rtp" ! rtpssrcdemux ! fakesink
  * ]| Takes an RTP stream and send the RTP packets with the first detected SSRC
  * to fakesink, discarding the other SSRCs.
  * </refsect2>
- *
- * Last reviewed on 2007-05-28 (0.10.5)
  */
 
 #ifdef HAVE_CONFIG_H
@@ -48,7 +46,6 @@
 #include <gst/rtp/gstrtpbuffer.h>
 #include <gst/rtp/gstrtcpbuffer.h>
 
-#include "gstrtpbin-marshal.h"
 #include "gstrtpssrcdemux.h"
 
 GST_DEBUG_CATEGORY_STATIC (gst_rtp_ssrc_demux_debug);
@@ -123,8 +120,6 @@ static gboolean gst_rtp_ssrc_demux_sink_event (GstPad * pad, GstObject * parent,
 
 static GstFlowReturn gst_rtp_ssrc_demux_rtcp_chain (GstPad * pad,
     GstObject * parent, GstBuffer * buf);
-static gboolean gst_rtp_ssrc_demux_rtcp_sink_event (GstPad * pad,
-    GstObject * parent, GstEvent * event);
 static GstIterator *gst_rtp_ssrc_demux_iterate_internal_links_sink (GstPad *
     pad, GstObject * parent);
 
@@ -147,6 +142,9 @@ struct _GstRtpSsrcDemuxPad
   GstPad *rtp_pad;
   GstCaps *caps;
   GstPad *rtcp_pad;
+
+  gboolean pushed_initial_rtp_events;
+  gboolean pushed_initial_rtcp_events;
 };
 
 /* find a src pad for a given SSRC, returns NULL if the SSRC was not found
@@ -193,7 +191,7 @@ add_ssrc_and_ref (GstEvent * event, guint32 ssrc)
   return event;
 }
 
-struct ForwardEventData
+struct ForwardStickyEventData
 {
   GstPad *pad;
   guint32 ssrc;
@@ -202,7 +200,7 @@ struct ForwardEventData
 static gboolean
 forward_sticky_events (GstPad * pad, GstEvent ** event, gpointer user_data)
 {
-  struct ForwardEventData *data = user_data;
+  struct ForwardStickyEventData *data = user_data;
   GstEvent *newevent;
 
   newevent = add_ssrc_and_ref (*event, data->ssrc);
@@ -212,6 +210,25 @@ forward_sticky_events (GstPad * pad, GstEvent ** event, gpointer user_data)
   return TRUE;
 }
 
+static void
+forward_initial_events (GstRtpSsrcDemux * demux, guint32 ssrc, GstPad * pad,
+    PadType padtype)
+{
+  struct ForwardStickyEventData fdata;
+  GstPad *sinkpad = NULL;
+
+  if (padtype == RTP_PAD)
+    sinkpad = demux->rtp_sink;
+  else if (padtype == RTCP_PAD)
+    sinkpad = demux->rtcp_sink;
+  else
+    g_assert_not_reached ();
+
+  fdata.ssrc = ssrc;
+  fdata.pad = pad;
+
+  gst_pad_sticky_events_foreach (sinkpad, forward_sticky_events, &fdata);
+}
 
 static GstPad *
 find_or_create_demux_pad_for_ssrc (GstRtpSsrcDemux * demux, guint32 ssrc,
@@ -222,32 +239,44 @@ find_or_create_demux_pad_for_ssrc (GstRtpSsrcDemux * demux, guint32 ssrc,
   GstPadTemplate *templ;
   gchar *padname;
   GstRtpSsrcDemuxPad *demuxpad;
-  GstCaps *caps;
-  struct ForwardEventData fdata;
   GstPad *retpad;
   gulong rtp_block, rtcp_block;
 
-  GST_DEBUG_OBJECT (demux, "creating pad for SSRC %08x", ssrc);
-
   GST_PAD_LOCK (demux);
 
   demuxpad = find_demux_pad_for_ssrc (demux, ssrc);
   if (demuxpad != NULL) {
+    gboolean forward = FALSE;
+
     switch (padtype) {
       case RTP_PAD:
         retpad = gst_object_ref (demuxpad->rtp_pad);
+        if (!demuxpad->pushed_initial_rtp_events) {
+          forward = TRUE;
+          demuxpad->pushed_initial_rtp_events = TRUE;
+        }
         break;
       case RTCP_PAD:
         retpad = gst_object_ref (demuxpad->rtcp_pad);
+        if (!demuxpad->pushed_initial_rtcp_events) {
+          forward = TRUE;
+          demuxpad->pushed_initial_rtcp_events = TRUE;
+        }
         break;
       default:
         retpad = NULL;
         g_assert_not_reached ();
     }
+
     GST_PAD_UNLOCK (demux);
+
+    if (forward)
+      forward_initial_events (demux, ssrc, retpad, padtype);
     return retpad;
   }
 
+  GST_DEBUG_OBJECT (demux, "creating new pad for SSRC %08x", ssrc);
+
   klass = GST_ELEMENT_GET_CLASS (demux);
   templ = gst_element_class_get_pad_template (klass, "src_%u");
   padname = g_strdup_printf ("src_%u", ssrc);
@@ -265,8 +294,6 @@ find_or_create_demux_pad_for_ssrc (GstRtpSsrcDemux * demux, guint32 ssrc,
   demuxpad->rtp_pad = rtp_pad;
   demuxpad->rtcp_pad = rtcp_pad;
 
-  fdata.ssrc = ssrc;
-
   gst_pad_set_element_private (rtp_pad, demuxpad);
   gst_pad_set_element_private (rtcp_pad, demuxpad);
 
@@ -278,28 +305,23 @@ find_or_create_demux_pad_for_ssrc (GstRtpSsrcDemux * demux, guint32 ssrc,
   gst_pad_set_event_function (rtp_pad, gst_rtp_ssrc_demux_src_event);
   gst_pad_use_fixed_caps (rtp_pad);
   gst_pad_set_active (rtp_pad, TRUE);
-  fdata.pad = rtp_pad;
-  gst_pad_sticky_events_foreach (demux->rtp_sink, forward_sticky_events,
-      &fdata);
 
   gst_pad_set_event_function (rtcp_pad, gst_rtp_ssrc_demux_src_event);
   gst_pad_set_iterate_internal_links_function (rtcp_pad,
       gst_rtp_ssrc_demux_iterate_internal_links_src);
   gst_pad_use_fixed_caps (rtcp_pad);
   gst_pad_set_active (rtcp_pad, TRUE);
-  fdata.pad = rtcp_pad;
-  gst_pad_sticky_events_foreach (demux->rtcp_sink, forward_sticky_events,
-      &fdata);
-
-  /* copy caps from input */
-  if ((caps = gst_pad_get_current_caps (demux->rtp_sink))) {
-    gst_pad_set_caps (rtp_pad, caps);
-    gst_caps_unref (caps);
-  }
-  if ((caps = gst_pad_get_current_caps (demux->rtcp_sink))) {
-    gst_pad_set_caps (rtcp_pad, caps);
-    gst_caps_unref (caps);
+
+  if (padtype == RTP_PAD) {
+    demuxpad->pushed_initial_rtp_events = TRUE;
+    forward_initial_events (demux, ssrc, rtp_pad, padtype);
+  } else if (padtype == RTCP_PAD) {
+    demuxpad->pushed_initial_rtcp_events = TRUE;
+    forward_initial_events (demux, ssrc, rtcp_pad, padtype);
+  } else {
+    g_assert_not_reached ();
   }
+
   gst_element_add_pad (GST_ELEMENT_CAST (demux), rtp_pad);
   gst_element_add_pad (GST_ELEMENT_CAST (demux), rtcp_pad);
 
@@ -363,8 +385,8 @@ gst_rtp_ssrc_demux_class_init (GstRtpSsrcDemuxClass * klass)
       g_signal_new ("new-ssrc-pad",
       G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_LAST,
       G_STRUCT_OFFSET (GstRtpSsrcDemuxClass, new_ssrc_pad),
-      NULL, NULL, gst_rtp_bin_marshal_VOID__UINT_OBJECT,
-      G_TYPE_NONE, 2, G_TYPE_UINT, GST_TYPE_PAD);
+      NULL, NULL, g_cclosure_marshal_generic, G_TYPE_NONE, 2, G_TYPE_UINT,
+      GST_TYPE_PAD);
 
   /**
    * GstRtpSsrcDemux::removed-ssrc-pad:
@@ -378,8 +400,8 @@ gst_rtp_ssrc_demux_class_init (GstRtpSsrcDemuxClass * klass)
       g_signal_new ("removed-ssrc-pad",
       G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_LAST,
       G_STRUCT_OFFSET (GstRtpSsrcDemuxClass, removed_ssrc_pad),
-      NULL, NULL, gst_rtp_bin_marshal_VOID__UINT_OBJECT,
-      G_TYPE_NONE, 2, G_TYPE_UINT, GST_TYPE_PAD);
+      NULL, NULL, g_cclosure_marshal_generic, G_TYPE_NONE, 2, G_TYPE_UINT,
+      GST_TYPE_PAD);
 
   /**
    * GstRtpSsrcDemux::clear-ssrc:
@@ -392,21 +414,21 @@ gst_rtp_ssrc_demux_class_init (GstRtpSsrcDemuxClass * klass)
       g_signal_new ("clear-ssrc",
       G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION,
       G_STRUCT_OFFSET (GstRtpSsrcDemuxClass, clear_ssrc),
-      NULL, NULL, gst_rtp_bin_marshal_VOID__UINT, G_TYPE_NONE, 1, G_TYPE_UINT);
+      NULL, NULL, g_cclosure_marshal_generic, G_TYPE_NONE, 1, G_TYPE_UINT);
 
   gstelement_klass->change_state =
       GST_DEBUG_FUNCPTR (gst_rtp_ssrc_demux_change_state);
   gstrtpssrcdemux_klass->clear_ssrc =
       GST_DEBUG_FUNCPTR (gst_rtp_ssrc_demux_clear_ssrc);
 
-  gst_element_class_add_pad_template (gstelement_klass,
-      gst_static_pad_template_get (&rtp_ssrc_demux_sink_template));
-  gst_element_class_add_pad_template (gstelement_klass,
-      gst_static_pad_template_get (&rtp_ssrc_demux_rtcp_sink_template));
-  gst_element_class_add_pad_template (gstelement_klass,
-      gst_static_pad_template_get (&rtp_ssrc_demux_src_template));
-  gst_element_class_add_pad_template (gstelement_klass,
-      gst_static_pad_template_get (&rtp_ssrc_demux_rtcp_src_template));
+  gst_element_class_add_static_pad_template (gstelement_klass,
+      &rtp_ssrc_demux_sink_template);
+  gst_element_class_add_static_pad_template (gstelement_klass,
+      &rtp_ssrc_demux_rtcp_sink_template);
+  gst_element_class_add_static_pad_template (gstelement_klass,
+      &rtp_ssrc_demux_src_template);
+  gst_element_class_add_static_pad_template (gstelement_klass,
+      &rtp_ssrc_demux_rtcp_src_template);
 
   gst_element_class_set_static_metadata (gstelement_klass, "RTP SSRC Demux",
       "Demux/Network/RTP",
@@ -435,15 +457,12 @@ gst_rtp_ssrc_demux_init (GstRtpSsrcDemux * demux)
       gst_pad_new_from_template (gst_element_class_get_pad_template (klass,
           "rtcp_sink"), "rtcp_sink");
   gst_pad_set_chain_function (demux->rtcp_sink, gst_rtp_ssrc_demux_rtcp_chain);
-  gst_pad_set_event_function (demux->rtcp_sink,
-      gst_rtp_ssrc_demux_rtcp_sink_event);
+  gst_pad_set_event_function (demux->rtcp_sink, gst_rtp_ssrc_demux_sink_event);
   gst_pad_set_iterate_internal_links_function (demux->rtcp_sink,
       gst_rtp_ssrc_demux_iterate_internal_links_sink);
   gst_element_add_pad (GST_ELEMENT_CAST (demux), demux->rtcp_sink);
 
   g_rec_mutex_init (&demux->padlock);
-
-  gst_segment_init (&demux->segment, GST_FORMAT_UNDEFINED);
 }
 
 static void
@@ -527,95 +546,62 @@ unknown_pad:
   }
 }
 
-static gboolean
-gst_rtp_ssrc_demux_sink_event (GstPad * pad, GstObject * parent,
-    GstEvent * event)
+struct ForwardEventData
 {
   GstRtpSsrcDemux *demux;
-  gboolean res = FALSE;
-
-  demux = GST_RTP_SSRC_DEMUX (parent);
-
-  switch (GST_EVENT_TYPE (event)) {
-    case GST_EVENT_FLUSH_STOP:
-      gst_segment_init (&demux->segment, GST_FORMAT_UNDEFINED);
-      /* fallthrough */
-    default:
-    {
-      GSList *walk;
-      GSList *pads = NULL;
-
-      res = TRUE;
-      /* need local snapshot of pads;
-       * should not push downstream while holding lock as that might deadlock
-       * with stuff traveling upstream tyring to get this lock while holding
-       * other (stream)lock */
-      GST_PAD_LOCK (demux);
-      for (walk = demux->srcpads; walk; walk = g_slist_next (walk)) {
-        GstRtpSsrcDemuxPad *pad = (GstRtpSsrcDemuxPad *) walk->data;
-
-        pad = g_slice_dup (GstRtpSsrcDemuxPad, pad);
-        gst_object_ref (pad->rtp_pad);
-
-        pads = g_slist_prepend (pads, pad);
-      }
-      GST_PAD_UNLOCK (demux);
+  GstEvent *event;
+  gboolean res;
+  GstPad *pad;
+};
 
-      for (walk = pads; walk; walk = g_slist_next (walk)) {
-        GstRtpSsrcDemuxPad *dpad = walk->data;
-        GstEvent *newevent;
+static gboolean
+forward_event (GstPad * pad, gpointer user_data)
+{
+  struct ForwardEventData *fdata = user_data;
+  GSList *walk = NULL;
+  GstEvent *newevent = NULL;
 
-        newevent = add_ssrc_and_ref (event, dpad->ssrc);
+  GST_PAD_LOCK (fdata->demux);
+  for (walk = fdata->demux->srcpads; walk; walk = walk->next) {
+    GstRtpSsrcDemuxPad *dpad = (GstRtpSsrcDemuxPad *) walk->data;
 
-        res &= gst_pad_push_event (dpad->rtp_pad, newevent);
-        gst_object_unref (dpad->rtp_pad);
-        g_slice_free (GstRtpSsrcDemuxPad, dpad);
-      }
-      g_slist_free (pads);
-      gst_event_unref (event);
+    /* Only forward the event if the initial events have been through first,
+     * the initial events should be forwarded before any other event
+     * or buffer is pushed */
+    if ((pad == dpad->rtp_pad && dpad->pushed_initial_rtp_events) ||
+        (pad == dpad->rtcp_pad && dpad->pushed_initial_rtcp_events)) {
+      newevent = add_ssrc_and_ref (fdata->event, dpad->ssrc);
       break;
     }
   }
+  GST_PAD_UNLOCK (fdata->demux);
 
-  return res;
+  if (newevent)
+    fdata->res &= gst_pad_push_event (pad, newevent);
+
+  return TRUE;
 }
 
+
 static gboolean
-gst_rtp_ssrc_demux_rtcp_sink_event (GstPad * pad, GstObject * parent,
+gst_rtp_ssrc_demux_sink_event (GstPad * pad, GstObject * parent,
     GstEvent * event)
 {
   GstRtpSsrcDemux *demux;
-  gboolean res = TRUE;
-  GSList *walk;
-  GSList *pads = NULL;
+  struct ForwardEventData fdata;
 
   demux = GST_RTP_SSRC_DEMUX (parent);
 
-  GST_PAD_LOCK (demux);
-  for (walk = demux->srcpads; walk; walk = g_slist_next (walk)) {
-    GstRtpSsrcDemuxPad *pad = (GstRtpSsrcDemuxPad *) walk->data;
-
-    pad = g_slice_dup (GstRtpSsrcDemuxPad, pad);
-    gst_object_ref (pad->rtcp_pad);
+  fdata.demux = demux;
+  fdata.pad = pad;
+  fdata.event = event;
+  fdata.res = TRUE;
 
-    pads = g_slist_prepend (pads, pad);
-  }
-  GST_PAD_UNLOCK (demux);
-
-  for (walk = pads; walk; walk = g_slist_next (walk)) {
-    GstRtpSsrcDemuxPad *dpad = walk->data;
-    GstEvent *newevent;
-
-    newevent = add_ssrc_and_ref (event, dpad->ssrc);
+  gst_pad_forward (pad, forward_event, &fdata);
 
-    res &= gst_pad_push_event (dpad->rtcp_pad, newevent);
-    gst_object_unref (dpad->rtcp_pad);
-    g_slice_free (GstRtpSsrcDemuxPad, dpad);
-  }
-  g_slist_free (pads);
   gst_event_unref (event);
 
-  return res;
+  return fdata.res;
 }
 
 static GstFlowReturn
@@ -692,7 +678,7 @@ gst_rtp_ssrc_demux_rtcp_chain (GstPad * pad, GstObject * parent,
 
   demux = GST_RTP_SSRC_DEMUX (parent);
 
-  if (!gst_rtcp_buffer_validate (buf))
+  if (!gst_rtcp_buffer_validate_reduced (buf))
     goto invalid_rtcp;
 
   gst_rtcp_buffer_map (buf, GST_MAP_READ, &rtcp);
@@ -701,13 +687,23 @@ gst_rtp_ssrc_demux_rtcp_chain (GstPad * pad, GstObject * parent,
     goto invalid_rtcp;
   }
 
-  /* first packet must be SR or RR or else the validate would have failed */
+  /* first packet must be SR or RR, or in case of a reduced size RTCP packet
+   * it must be APP, RTPFB or PSFB feeadback, or else the validate would
+   * have failed */
   switch (gst_rtcp_packet_get_type (&packet)) {
     case GST_RTCP_TYPE_SR:
       /* get the ssrc so that we can route it to the right source pad */
       gst_rtcp_packet_sr_get_sender_info (&packet, &ssrc, NULL, NULL, NULL,
           NULL);
       break;
+    case GST_RTCP_TYPE_RR:
+      ssrc = gst_rtcp_packet_rr_get_ssrc (&packet);
+      break;
+    case GST_RTCP_TYPE_APP:
+    case GST_RTCP_TYPE_RTPFB:
+    case GST_RTCP_TYPE_PSFB:
+      ssrc = gst_rtcp_packet_fb_get_sender_ssrc (&packet);
+      break;
     default:
       goto unexpected_rtcp;
   }
@@ -852,10 +848,12 @@ src_pad_compare_func (gconstpointer a, gconstpointer b)
 {
   GstPad *pad = GST_PAD (g_value_get_object (a));
   const gchar *prefix = g_value_get_string (b);
-  gint res = 1;
+  gint res;
 
+  /* 0 means equal means we accept the pad, accepted if there is a name
+   * and it starts with the prefix */
   GST_OBJECT_LOCK (pad);
-  res = !GST_PAD_NAME (pad) || g_str_has_prefix (GST_PAD_NAME (pad), prefix);
+  res = !GST_PAD_NAME (pad) || !g_str_has_prefix (GST_PAD_NAME (pad), prefix);
   GST_OBJECT_UNLOCK (pad);
 
   return res;