gst/rtpmanager/gstrtpbin.c: Remove internal sync pad, use signals instead to get...
authorWim Taymans <wim.taymans@gmail.com>
Wed, 19 Nov 2008 09:06:29 +0000 (09:06 +0000)
committerTim-Philipp Müller <tim.muller@collabora.co.uk>
Tue, 11 Aug 2009 01:30:38 +0000 (02:30 +0100)
Original commit message from CVS:
* gst/rtpmanager/gstrtpbin.c: (gst_rtp_bin_associate),
(gst_rtp_bin_handle_sync), (create_stream), (free_stream),
(new_ssrc_pad_found):
Remove internal sync pad, use signals instead to get lip-sync
notifications.
* gst/rtpmanager/gstrtpjitterbuffer.c:
(gst_rtp_jitter_buffer_base_init),
(gst_rtp_jitter_buffer_class_init),
(gst_rtp_jitter_buffer_internal_links), (create_rtcp_sink),
(remove_rtcp_sink), (gst_rtp_jitter_buffer_request_new_pad),
(gst_rtp_jitter_buffer_release_pad),
(gst_rtp_jitter_buffer_sink_rtcp_event),
(gst_rtp_jitter_buffer_chain_rtcp),
(gst_rtp_jitter_buffer_get_property):
* gst/rtpmanager/gstrtpjitterbuffer.h:
Make it possible to send SR packets to the jitterbuffer.
Check if the SR timestamps are valid by comparing them to the RTP
timestamps.
Signal the SR packet and the timing information to listeners.
* gst/rtpmanager/gstrtpssrcdemux.c: (create_demux_pad_for_ssrc),
(gst_rtp_ssrc_demux_rtcp_chain), (gst_rtp_ssrc_demux_src_query):
Remove some unused code.
* gst/rtpmanager/rtpjitterbuffer.c: (rtp_jitter_buffer_reset_skew),
(calculate_skew), (rtp_jitter_buffer_get_sync):
* gst/rtpmanager/rtpjitterbuffer.h:
Keep track of the last seen RTP timestamp so that we can filter out
invalid SR packets.

gst/rtpmanager/gstrtpbin.c
gst/rtpmanager/gstrtpjitterbuffer.c
gst/rtpmanager/gstrtpjitterbuffer.h
gst/rtpmanager/gstrtpssrcdemux.c
gst/rtpmanager/rtpjitterbuffer.c
gst/rtpmanager/rtpjitterbuffer.h

index 197be52..11d14d5 100644 (file)
@@ -176,14 +176,6 @@ GST_STATIC_PAD_TEMPLATE ("send_rtp_src_%d",
     GST_STATIC_CAPS ("application/x-rtp")
     );
 
-/* padtemplate for the internal pad */
-static GstStaticPadTemplate rtpbin_sync_sink_template =
-GST_STATIC_PAD_TEMPLATE ("sink_%d",
-    GST_PAD_SINK,
-    GST_PAD_SOMETIMES,
-    GST_STATIC_CAPS ("application/x-rtcp")
-    );
-
 #define GST_RTP_BIN_GET_PRIVATE(obj)  \
    (G_TYPE_INSTANCE_GET_PRIVATE ((obj), GST_TYPE_RTP_BIN, GstRtpBinPrivate))
 
@@ -310,8 +302,7 @@ struct _GstRtpBinStream
   gulong demux_ptreq_sig;
   gulong demux_pt_change_sig;
 
-  /* the internal pad we use to get RTCP sync messages */
-  GstPad *sync_pad;
+  /* data for the RTCP sync signal */
   gboolean have_sync;
   guint64 last_unix;
   guint64 last_extrtptime;
@@ -818,7 +809,8 @@ free_client (GstRtpBinClient * client)
 }
 
 /* associate a stream to the given CNAME. This will make sure all streams for
- * that CNAME are synchronized together. */
+ * that CNAME are synchronized together.
+ * Must be called with GST_RTP_BIN_LOCK */
 static void
 gst_rtp_bin_associate (GstRtpBin * bin, GstRtpBinStream * stream, guint8 len,
     guint8 * data)
@@ -828,7 +820,6 @@ gst_rtp_bin_associate (GstRtpBin * bin, GstRtpBinStream * stream, guint8 len,
   GSList *walk;
 
   /* first find or create the CNAME */
-  GST_RTP_BIN_LOCK (bin);
   client = get_client (bin, len, data, &created);
 
   /* find stream in the client */
@@ -851,13 +842,6 @@ gst_rtp_bin_associate (GstRtpBin * bin, GstRtpBinStream * stream, guint8 len,
         stream->ssrc, client, client->cname);
   }
 
-  /* we can only continue if we know the local clock-base and clock-rate */
-  if (stream->clock_base == -1)
-    goto no_clock_base;
-
-  if (stream->clock_rate <= 0)
-    goto no_clock_rate;
-
   /* take the extended rtptime we found in the SR packet and map it to the
    * local rtptime. The local rtp time is used to construct timestamps on the
    * buffers. */
@@ -897,7 +881,7 @@ gst_rtp_bin_associate (GstRtpBin * bin, GstRtpBinStream * stream, guint8 len,
      * offsets to streams, delaying their playback instead of trying to speed up
      * other streams (which might be imposible when we have to create negative
      * latencies).
-     * The stream that has the smalest diff is selected as the reference stream,
+     * The stream that has the smallest diff is selected as the reference stream,
      * all other streams will have a positive offset to this difference. */
     min = G_MAXINT64;
     for (walk = client->streams; walk; walk = g_slist_next (walk)) {
@@ -955,22 +939,7 @@ gst_rtp_bin_associate (GstRtpBin * bin, GstRtpBinStream * stream, guint8 len,
           ostream->ssrc, ostream->ts_offset);
     }
   }
-  GST_RTP_BIN_UNLOCK (bin);
-
   return;
-
-no_clock_base:
-  {
-    GST_WARNING_OBJECT (bin, "we have no clock-base");
-    GST_RTP_BIN_UNLOCK (bin);
-    return;
-  }
-no_clock_rate:
-  {
-    GST_WARNING_OBJECT (bin, "we have no clock-rate");
-    GST_RTP_BIN_UNLOCK (bin);
-    return;
-  }
 }
 
 #define GST_RTCP_BUFFER_FOR_PACKETS(b,buffer,packet) \
@@ -985,44 +954,37 @@ no_clock_rate:
   for ((b) = gst_rtcp_packet_sdes_first_entry ((packet)); (b); \
           (b) = gst_rtcp_packet_sdes_next_entry ((packet)))
 
-static GstFlowReturn
-gst_rtp_bin_sync_chain (GstPad * pad, GstBuffer * buffer)
+static void
+gst_rtp_bin_handle_sync (GstElement * jitterbuffer, GstStructure * s,
+    GstRtpBinStream * stream)
 {
-  GstFlowReturn ret = GST_FLOW_OK;
-  GstRtpBinStream *stream;
   GstRtpBin *bin;
   GstRTCPPacket packet;
   guint32 ssrc;
   guint64 ntptime;
-  guint32 rtptime;
   gboolean have_sr, have_sdes;
   gboolean more;
   guint64 clock_base;
   guint64 clock_base_time;
   guint clock_rate;
+  guint64 extrtptime;
+  GstBuffer *buffer;
 
-  stream = gst_pad_get_element_private (pad);
   bin = stream->bin;
 
-  GST_DEBUG_OBJECT (bin, "received sync packet");
-
-  if (!gst_rtcp_buffer_validate (buffer))
-    goto invalid_rtcp;
+  GST_DEBUG_OBJECT (bin, "sync handler called");
 
   /* get the last relation between the rtp timestamps and the gstreamer
    * timestamps. We get this info directly from the jitterbuffer which
    * constructs gstreamer timestamps from rtp timestamps and so it know exactly
    * what the current situation is. */
-  gst_rtp_jitter_buffer_get_sync (GST_RTP_JITTER_BUFFER (stream->buffer),
-      &clock_base, &clock_base_time, &clock_rate);
-
-  /* clock base changes when there is a huge gap in the timestamps or seqnum.
-   * When this happens we don't want to calculate the extended timestamp based
-   * on the previous one but reset the calculation. */
-  if (stream->last_clock_base != clock_base) {
-    stream->last_extrtptime = -1;
-    stream->last_clock_base = clock_base;
-  }
+  clock_base = g_value_get_uint64 (gst_structure_get_value (s, "base-rtptime"));
+  clock_base_time =
+      g_value_get_uint64 (gst_structure_get_value (s, "base-time"));
+  clock_rate = g_value_get_uint (gst_structure_get_value (s, "clock-rate"));
+  extrtptime =
+      g_value_get_uint64 (gst_structure_get_value (s, "sr-ext-rtptime"));
+  buffer = gst_value_get_buffer (gst_structure_get_value (s, "sr-buffer"));
 
   have_sr = FALSE;
   have_sdes = FALSE;
@@ -1035,7 +997,7 @@ gst_rtp_bin_sync_chain (GstPad * pad, GstBuffer * buffer)
         if (have_sr)
           break;
         /* get NTP and RTP times */
-        gst_rtcp_packet_sr_get_sender_info (&packet, &ssrc, &ntptime, &rtptime,
+        gst_rtcp_packet_sr_get_sender_info (&packet, &ssrc, &ntptime, NULL,
             NULL, NULL);
 
         GST_DEBUG_OBJECT (bin, "received sync packet from SSRC %08x", ssrc);
@@ -1044,12 +1006,6 @@ gst_rtp_bin_sync_chain (GstPad * pad, GstBuffer * buffer)
           continue;
 
         have_sr = TRUE;
-
-        /* store values in the stream */
-        stream->have_sync = TRUE;
-        stream->last_unix = gst_rtcp_ntp_to_unix (ntptime);
-        /* use extended timestamp */
-        gst_rtp_buffer_ext_timestamp (&stream->last_extrtptime, rtptime);
         break;
       case GST_RTCP_TYPE_SDES:
       {
@@ -1075,11 +1031,17 @@ gst_rtp_bin_sync_chain (GstPad * pad, GstBuffer * buffer)
             gst_rtcp_packet_sdes_get_entry (&packet, &type, &len, &data);
 
             if (type == GST_RTCP_SDES_CNAME) {
+              GST_RTP_BIN_LOCK (bin);
+              /* store values in the stream */
+              stream->have_sync = TRUE;
+              stream->last_unix = gst_rtcp_ntp_to_unix (ntptime);
+              stream->last_extrtptime = extrtptime;
               stream->clock_base = clock_base;
               stream->clock_base_time = clock_base_time;
               stream->clock_rate = clock_rate;
               /* associate the stream to CNAME */
               gst_rtp_bin_associate (bin, stream, len, data);
+              GST_RTP_BIN_UNLOCK (bin);
             }
           }
         }
@@ -1091,20 +1053,6 @@ gst_rtp_bin_sync_chain (GstPad * pad, GstBuffer * buffer)
         break;
     }
   }
-
-  gst_buffer_unref (buffer);
-
-  return ret;
-
-  /* ERRORS */
-invalid_rtcp:
-  {
-    /* this is fatal and should be filtered earlier */
-    GST_ELEMENT_ERROR (bin, STREAM, DECODE, (NULL),
-        ("invalid RTCP packet received"));
-    gst_buffer_unref (buffer);
-    return GST_FLOW_ERROR;
-  }
 }
 
 /* create a new stream with @ssrc in @session. Must be called with
@@ -1114,8 +1062,6 @@ create_stream (GstRtpBinSession * session, guint32 ssrc)
 {
   GstElement *buffer, *demux;
   GstRtpBinStream *stream;
-  GstPadTemplate *templ;
-  gchar *padname;
 
   if (!(buffer = gst_element_factory_make ("gstrtpjitterbuffer", NULL)))
     goto no_jitterbuffer;
@@ -1134,19 +1080,6 @@ create_stream (GstRtpBinSession * session, guint32 ssrc)
   stream->have_sync = FALSE;
   session->streams = g_slist_prepend (session->streams, stream);
 
-  /* make an internal sinkpad for RTCP sync packets. Take ownership of the
-   * pad. We will link this pad later. */
-  padname = g_strdup_printf ("sync_%d", ssrc);
-  templ = gst_static_pad_template_get (&rtpbin_sync_sink_template);
-  stream->sync_pad = gst_pad_new_from_template (templ, padname);
-  gst_object_unref (templ);
-  g_free (padname);
-  gst_object_ref (stream->sync_pad);
-  gst_object_sink (stream->sync_pad);
-  gst_pad_set_element_private (stream->sync_pad, stream);
-  gst_pad_set_chain_function (stream->sync_pad, gst_rtp_bin_sync_chain);
-  gst_pad_set_active (stream->sync_pad, TRUE);
-
   /* provide clock_rate to the jitterbuffer when needed */
   g_signal_connect (buffer, "request-pt-map",
       (GCallback) pt_map_requested, session);
@@ -1192,8 +1125,6 @@ free_stream (GstRtpBinStream * stream)
   gst_bin_remove (GST_BIN_CAST (session->bin), stream->buffer);
   gst_bin_remove (GST_BIN_CAST (session->bin), stream->demux);
 
-  gst_object_unref (stream->sync_pad);
-
   session->streams = g_slist_remove (session->streams, stream);
 
   g_free (stream);
@@ -1985,7 +1916,7 @@ new_ssrc_pad_found (GstElement * element, guint ssrc, GstPad * pad,
   }
 
   /* get pad and link */
-  GST_DEBUG_OBJECT (rtpbin, "linking jitterbuffer");
+  GST_DEBUG_OBJECT (rtpbin, "linking jitterbuffer RTP");
   padname = g_strdup_printf ("src_%d", ssrc);
   srcpad = gst_element_get_static_pad (element, padname);
   g_free (padname);
@@ -1994,14 +1925,20 @@ new_ssrc_pad_found (GstElement * element, guint ssrc, GstPad * pad,
   gst_object_unref (sinkpad);
   gst_object_unref (srcpad);
 
-  /* get the RTCP sync pad */
-  GST_DEBUG_OBJECT (rtpbin, "linking sync pad");
+  GST_DEBUG_OBJECT (rtpbin, "linking jitterbuffer RTCP");
   padname = g_strdup_printf ("rtcp_src_%d", ssrc);
   srcpad = gst_element_get_static_pad (element, padname);
   g_free (padname);
-  gst_pad_link (srcpad, stream->sync_pad);
+  sinkpad = gst_element_get_request_pad (stream->buffer, "sink_rtcp");
+  gst_pad_link (srcpad, sinkpad);
+  gst_object_unref (sinkpad);
   gst_object_unref (srcpad);
 
+  /* connect to the RTCP sync signal from the jitterbuffer */
+  GST_DEBUG_OBJECT (rtpbin, "connecting sync signal");
+  g_signal_connect (stream->buffer,
+      "handle-sync", (GCallback) gst_rtp_bin_handle_sync, stream);
+
   /* connect to the new-pad signal of the payload demuxer, this will expose the
    * new pad by ghosting it. */
   stream->demux_newpad_sig = g_signal_connect (stream->demux,
index bd47bde..e9c2f79 100644 (file)
@@ -87,6 +87,7 @@ enum
 {
   SIGNAL_REQUEST_PT_MAP,
   SIGNAL_CLEAR_PT_MAP,
+  SIGNAL_HANDLE_SYNC,
   LAST_SIGNAL
 };
 
@@ -127,6 +128,7 @@ enum
 struct _GstRtpJitterBufferPrivate
 {
   GstPad *sinkpad, *srcpad;
+  GstPad *rtcpsinkpad;
 
   RTPJitterBuffer *jbuf;
   GMutex *jbuf_lock;
@@ -190,6 +192,13 @@ GST_STATIC_PAD_TEMPLATE ("sink",
          */ )
     );
 
+static GstStaticPadTemplate gst_rtp_jitter_buffer_sink_rtcp_template =
+GST_STATIC_PAD_TEMPLATE ("sink_rtcp",
+    GST_PAD_SINK,
+    GST_PAD_REQUEST,
+    GST_STATIC_CAPS ("application/x-rtcp")
+    );
+
 static GstStaticPadTemplate gst_rtp_jitter_buffer_src_template =
 GST_STATIC_PAD_TEMPLATE ("src",
     GST_PAD_SRC,
@@ -216,20 +225,30 @@ static void gst_rtp_jitter_buffer_finalize (GObject * object);
 /* element overrides */
 static GstStateChangeReturn gst_rtp_jitter_buffer_change_state (GstElement
     * element, GstStateChange transition);
+static GstPad *gst_rtp_jitter_buffer_request_new_pad (GstElement * element,
+    GstPadTemplate * templ, const gchar * name);
+static void gst_rtp_jitter_buffer_release_pad (GstElement * element,
+    GstPad * pad);
 
 /* pad overrides */
 static GstCaps *gst_rtp_jitter_buffer_getcaps (GstPad * pad);
+static GList *gst_rtp_jitter_buffer_internal_links (GstPad * pad);
 
 /* sinkpad overrides */
 static gboolean gst_jitter_buffer_sink_setcaps (GstPad * pad, GstCaps * caps);
-static gboolean gst_rtp_jitter_buffer_src_event (GstPad * pad,
-    GstEvent * event);
 static gboolean gst_rtp_jitter_buffer_sink_event (GstPad * pad,
     GstEvent * event);
 static GstFlowReturn gst_rtp_jitter_buffer_chain (GstPad * pad,
     GstBuffer * buffer);
 
+static gboolean gst_rtp_jitter_buffer_sink_rtcp_event (GstPad * pad,
+    GstEvent * event);
+static GstFlowReturn gst_rtp_jitter_buffer_chain_rtcp (GstPad * pad,
+    GstBuffer * buffer);
+
 /* srcpad overrides */
+static gboolean gst_rtp_jitter_buffer_src_event (GstPad * pad,
+    GstEvent * event);
 static gboolean
 gst_rtp_jitter_buffer_src_activate_push (GstPad * pad, gboolean active);
 static void gst_rtp_jitter_buffer_loop (GstRtpJitterBuffer * jitterbuffer);
@@ -247,6 +266,9 @@ gst_rtp_jitter_buffer_base_init (gpointer klass)
       gst_static_pad_template_get (&gst_rtp_jitter_buffer_src_template));
   gst_element_class_add_pad_template (element_class,
       gst_static_pad_template_get (&gst_rtp_jitter_buffer_sink_template));
+  gst_element_class_add_pad_template (element_class,
+      gst_static_pad_template_get (&gst_rtp_jitter_buffer_sink_rtcp_template));
+
   gst_element_class_set_details (element_class, &gst_rtp_jitter_buffer_details);
 }
 
@@ -321,6 +343,19 @@ gst_rtp_jitter_buffer_class_init (GstRtpJitterBufferClass * klass)
           request_pt_map), NULL, NULL, gst_rtp_bin_marshal_BOXED__UINT,
       GST_TYPE_CAPS, 1, G_TYPE_UINT);
   /**
+   * GstRtpJitterBuffer::handle-sync:
+   * @buffer: the object which received the signal
+   * @struct: a GstStructure containing sync values.
+   *
+   * Be notified of new sync values.
+   */
+  gst_rtp_jitter_buffer_signals[SIGNAL_HANDLE_SYNC] =
+      g_signal_new ("handle-sync", G_TYPE_FROM_CLASS (klass),
+      G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpJitterBufferClass,
+          handle_sync), NULL, NULL, g_cclosure_marshal_VOID__BOXED,
+      G_TYPE_NONE, 1, GST_TYPE_STRUCTURE);
+
+  /**
    * GstRtpJitterBuffer::clear-pt-map:
    * @buffer: the object which received the signal
    *
@@ -329,11 +364,16 @@ gst_rtp_jitter_buffer_class_init (GstRtpJitterBufferClass * klass)
    */
   gst_rtp_jitter_buffer_signals[SIGNAL_CLEAR_PT_MAP] =
       g_signal_new ("clear-pt-map", G_TYPE_FROM_CLASS (klass),
-      G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpJitterBufferClass,
-          clear_pt_map), NULL, NULL, g_cclosure_marshal_VOID__VOID,
-      G_TYPE_NONE, 0, G_TYPE_NONE);
+      G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION,
+      G_STRUCT_OFFSET (GstRtpJitterBufferClass, clear_pt_map), NULL, NULL,
+      g_cclosure_marshal_VOID__VOID, G_TYPE_NONE, 0, G_TYPE_NONE);
 
-  gstelement_class->change_state = gst_rtp_jitter_buffer_change_state;
+  gstelement_class->change_state =
+      GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_change_state);
+  gstelement_class->request_new_pad =
+      GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_request_new_pad);
+  gstelement_class->release_pad =
+      GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_release_pad);
 
   klass->clear_pt_map = GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_clear_pt_map);
 
@@ -403,6 +443,139 @@ gst_rtp_jitter_buffer_finalize (GObject * object)
   G_OBJECT_CLASS (parent_class)->finalize (object);
 }
 
+static GList *
+gst_rtp_jitter_buffer_internal_links (GstPad * pad)
+{
+  GstRtpJitterBuffer *jitterbuffer;
+  GstRtpJitterBufferPrivate *priv;
+  GList *res = NULL;
+
+  jitterbuffer = GST_RTP_JITTER_BUFFER (gst_pad_get_parent (pad));
+  priv = jitterbuffer->priv;
+
+  if (pad == priv->sinkpad) {
+    res = g_list_prepend (res, priv->srcpad);
+  } else if (pad == priv->srcpad) {
+    res = g_list_prepend (res, priv->sinkpad);
+  } else if (pad == priv->rtcpsinkpad) {
+    res = NULL;
+  }
+
+  gst_object_unref (jitterbuffer);
+
+  return res;
+}
+
+static GstPad *
+create_rtcp_sink (GstRtpJitterBuffer * jitterbuffer)
+{
+  GstRtpJitterBufferPrivate *priv;
+
+  priv = jitterbuffer->priv;
+
+  GST_DEBUG_OBJECT (jitterbuffer, "creating RTCP sink pad");
+
+  priv->rtcpsinkpad =
+      gst_pad_new_from_static_template
+      (&gst_rtp_jitter_buffer_sink_rtcp_template, "sink_rtcp");
+  gst_pad_set_chain_function (priv->rtcpsinkpad,
+      gst_rtp_jitter_buffer_chain_rtcp);
+  gst_pad_set_event_function (priv->rtcpsinkpad,
+      (GstPadEventFunction) gst_rtp_jitter_buffer_sink_rtcp_event);
+  gst_pad_set_internal_link_function (priv->rtcpsinkpad,
+      gst_rtp_jitter_buffer_internal_links);
+  gst_pad_set_active (priv->rtcpsinkpad, TRUE);
+  gst_element_add_pad (GST_ELEMENT_CAST (jitterbuffer), priv->rtcpsinkpad);
+
+  return priv->rtcpsinkpad;
+}
+
+static void
+remove_rtcp_sink (GstRtpJitterBuffer * jitterbuffer)
+{
+  GstRtpJitterBufferPrivate *priv;
+
+  priv = jitterbuffer->priv;
+
+  GST_DEBUG_OBJECT (jitterbuffer, "removing RTCP sink pad");
+
+  gst_pad_set_active (priv->rtcpsinkpad, FALSE);
+
+  gst_element_remove_pad (GST_ELEMENT_CAST (jitterbuffer), priv->rtcpsinkpad);
+  priv->rtcpsinkpad = NULL;
+}
+
+static GstPad *
+gst_rtp_jitter_buffer_request_new_pad (GstElement * element,
+    GstPadTemplate * templ, const gchar * name)
+{
+  GstRtpJitterBuffer *jitterbuffer;
+  GstElementClass *klass;
+  GstPad *result;
+  GstRtpJitterBufferPrivate *priv;
+
+  g_return_val_if_fail (templ != NULL, NULL);
+  g_return_val_if_fail (GST_IS_RTP_JITTER_BUFFER (element), NULL);
+
+  jitterbuffer = GST_RTP_JITTER_BUFFER (element);
+  priv = jitterbuffer->priv;
+  klass = GST_ELEMENT_GET_CLASS (element);
+
+  GST_DEBUG_OBJECT (element, "requesting pad %s", GST_STR_NULL (name));
+
+  /* figure out the template */
+  if (templ == gst_element_class_get_pad_template (klass, "sink_rtcp")) {
+    if (priv->rtcpsinkpad != NULL)
+      goto exists;
+
+    result = create_rtcp_sink (jitterbuffer);
+  } else
+    goto wrong_template;
+
+  return result;
+
+  /* ERRORS */
+wrong_template:
+  {
+    g_warning ("gstrtpjitterbuffer: this is not our template");
+    return NULL;
+  }
+exists:
+  {
+    g_warning ("gstrtpjitterbuffer: pad already requested");
+    return NULL;
+  }
+}
+
+static void
+gst_rtp_jitter_buffer_release_pad (GstElement * element, GstPad * pad)
+{
+  GstRtpJitterBuffer *jitterbuffer;
+  GstRtpJitterBufferPrivate *priv;
+
+  g_return_if_fail (GST_IS_RTP_JITTER_BUFFER (element));
+  g_return_if_fail (GST_IS_PAD (pad));
+
+  jitterbuffer = GST_RTP_JITTER_BUFFER (element);
+  priv = jitterbuffer->priv;
+
+  GST_DEBUG_OBJECT (element, "releasing pad %s:%s", GST_DEBUG_PAD_NAME (pad));
+
+  if (priv->rtcpsinkpad == pad) {
+    remove_rtcp_sink (jitterbuffer);
+  } else
+    goto wrong_pad;
+
+  return;
+
+  /* ERRORS */
+wrong_pad:
+  {
+    g_warning ("gstjitterbuffer: asked to release an unknown pad");
+    return;
+  }
+}
+
 static void
 gst_rtp_jitter_buffer_clear_pt_map (GstRtpJitterBuffer * jitterbuffer)
 {
@@ -787,6 +960,31 @@ newseg_wrong_format:
 }
 
 static gboolean
+gst_rtp_jitter_buffer_sink_rtcp_event (GstPad * pad, GstEvent * event)
+{
+  GstRtpJitterBuffer *jitterbuffer;
+  GstRtpJitterBufferPrivate *priv;
+
+  jitterbuffer = GST_RTP_JITTER_BUFFER (gst_pad_get_parent (pad));
+  priv = jitterbuffer->priv;
+
+  GST_DEBUG_OBJECT (jitterbuffer, "received %s", GST_EVENT_TYPE_NAME (event));
+
+  switch (GST_EVENT_TYPE (event)) {
+    case GST_EVENT_FLUSH_START:
+      break;
+    case GST_EVENT_FLUSH_STOP:
+      break;
+    default:
+      break;
+  }
+  gst_event_unref (event);
+  gst_object_unref (jitterbuffer);
+
+  return TRUE;
+}
+
+static gboolean
 gst_rtp_jitter_buffer_get_clock_rate (GstRtpJitterBuffer * jitterbuffer,
     guint8 pt)
 {
@@ -1335,6 +1533,124 @@ pause:
   }
 }
 
+static GstFlowReturn
+gst_rtp_jitter_buffer_chain_rtcp (GstPad * pad, GstBuffer * buffer)
+{
+  GstRtpJitterBuffer *jitterbuffer;
+  GstRtpJitterBufferPrivate *priv;
+  GstFlowReturn ret;
+  guint64 base_rtptime, timestamp;
+  guint32 clock_rate;
+  guint64 last_rtptime;
+  guint32 ssrc;
+  GstRTCPPacket packet;
+  guint64 ext_rtptime, diff;
+  guint32 rtptime;
+  gboolean drop = FALSE;
+
+  jitterbuffer = GST_RTP_JITTER_BUFFER (gst_pad_get_parent (pad));
+
+  if (G_UNLIKELY (!gst_rtcp_buffer_validate (buffer)))
+    goto invalid_buffer;
+
+  priv = jitterbuffer->priv;
+
+  if (!gst_rtcp_buffer_get_first_packet (buffer, &packet))
+    goto invalid_buffer;
+
+  /* first packet must be SR or RR or else the validate would have failed */
+  switch (gst_rtcp_packet_get_type (&packet)) {
+    case GST_RTCP_TYPE_SR:
+      gst_rtcp_packet_sr_get_sender_info (&packet, &ssrc, NULL, &rtptime,
+          NULL, NULL);
+      break;
+    default:
+      goto ignore_buffer;
+  }
+
+  GST_DEBUG_OBJECT (jitterbuffer, "received RTCP of SSRC %08x", ssrc);
+
+  JBUF_LOCK (priv);
+  /* convert the RTP timestamp to our extended timestamp, using the same offset
+   * we used in the jitterbuffer */
+  ext_rtptime = priv->jbuf->ext_rtptime;
+  ext_rtptime = gst_rtp_buffer_ext_timestamp (&ext_rtptime, rtptime);
+
+  /* get the last values from the jitterbuffer */
+  rtp_jitter_buffer_get_sync (priv->jbuf, &base_rtptime, &timestamp,
+      &clock_rate, &last_rtptime);
+
+  GST_DEBUG_OBJECT (jitterbuffer, "ext SR %" G_GUINT64_FORMAT ", base %"
+      G_GUINT64_FORMAT ", clock-rate %" G_GUINT32_FORMAT,
+      ext_rtptime, base_rtptime, clock_rate);
+
+  if (base_rtptime == -1 || clock_rate == -1 || timestamp == -1) {
+    GST_DEBUG_OBJECT (jitterbuffer, "dropping, no RTP values");
+    drop = TRUE;
+  } else {
+    /* we can't accept anything that happened before we did the last resync */
+    if (base_rtptime > ext_rtptime) {
+      GST_DEBUG_OBJECT (jitterbuffer, "dropping, older than base time");
+      drop = TRUE;
+    } else {
+      /* the SR RTP timestamp must be something close to what we last observed
+       * in the jitterbuffer */
+      if (ext_rtptime > last_rtptime) {
+        /* check how far ahead it is to our RTP timestamps */
+        diff = ext_rtptime - last_rtptime;
+        /* if bigger than 1 second, we drop it */
+        if (diff > clock_rate) {
+          GST_DEBUG_OBJECT (jitterbuffer, "dropping, too far ahead");
+          drop = TRUE;
+        }
+        GST_DEBUG_OBJECT (jitterbuffer, "ext last %" G_GUINT64_FORMAT ", diff %"
+            G_GUINT64_FORMAT, last_rtptime, diff);
+      }
+    }
+  }
+  JBUF_UNLOCK (priv);
+
+  if (!drop) {
+    GstStructure *s;
+
+    s = gst_structure_new ("application/x-rtp-sync",
+        "base-rtptime", G_TYPE_UINT64, base_rtptime,
+        "base-time", G_TYPE_UINT64, timestamp,
+        "clock-rate", G_TYPE_UINT, clock_rate,
+        "sr-ext-rtptime", G_TYPE_UINT64, ext_rtptime,
+        "sr-buffer", GST_TYPE_BUFFER, buffer, NULL);
+
+    GST_DEBUG_OBJECT (jitterbuffer, "signaling sync");
+    g_signal_emit (jitterbuffer,
+        gst_rtp_jitter_buffer_signals[SIGNAL_HANDLE_SYNC], 0, s);
+    gst_structure_free (s);
+  } else {
+    GST_DEBUG_OBJECT (jitterbuffer, "dropping RTCP packet");
+    ret = GST_FLOW_OK;
+  }
+
+done:
+  gst_buffer_unref (buffer);
+  gst_object_unref (jitterbuffer);
+
+  return ret;
+
+invalid_buffer:
+  {
+    /* this is not fatal but should be filtered earlier */
+    GST_ELEMENT_WARNING (jitterbuffer, STREAM, DECODE, (NULL),
+        ("Received invalid RTCP payload, dropping"));
+    ret = GST_FLOW_OK;
+    goto done;
+  }
+ignore_buffer:
+  {
+    GST_DEBUG_OBJECT (jitterbuffer, "ignoring RTCP packet");
+    ret = GST_FLOW_OK;
+    goto done;
+  }
+}
+
 static gboolean
 gst_rtp_jitter_buffer_query (GstPad * pad, GstQuery * query)
 {
@@ -1485,18 +1801,3 @@ gst_rtp_jitter_buffer_get_property (GObject * object,
       break;
   }
 }
-
-void
-gst_rtp_jitter_buffer_get_sync (GstRtpJitterBuffer * buffer, guint64 * rtptime,
-    guint64 * timestamp, guint32 * clock_rate)
-{
-  GstRtpJitterBufferPrivate *priv;
-
-  g_return_if_fail (GST_IS_RTP_JITTER_BUFFER (buffer));
-
-  priv = buffer->priv;
-
-  JBUF_LOCK (priv);
-  rtp_jitter_buffer_get_sync (priv->jbuf, rtptime, timestamp, clock_rate);
-  JBUF_UNLOCK (priv);
-}
index 40908ea..45e6897 100644 (file)
@@ -71,6 +71,9 @@ struct _GstRtpJitterBufferClass
   /* signals */
   GstCaps* (*request_pt_map) (GstRtpJitterBuffer *buffer, guint pt);
 
+  void     (*handle_sync)    (GstRtpJitterBuffer *buffer, GstStructure *s);
+
+  /* actions */
   void     (*clear_pt_map)   (GstRtpJitterBuffer *buffer);
 
   /*< private > */
@@ -79,10 +82,6 @@ struct _GstRtpJitterBufferClass
 
 GType gst_rtp_jitter_buffer_get_type (void);
 
-void        gst_rtp_jitter_buffer_get_sync            (GstRtpJitterBuffer *buffer,
-                                                       guint64 *rtptime, guint64 *timestamp,
-                                                      guint32 *clock_rate);
-
 G_END_DECLS
 
 #endif /* __GST_RTP_JITTER_BUFFER_H__ */
index e5b989f..64394c4 100644 (file)
@@ -137,7 +137,6 @@ struct _GstRtpSsrcDemuxPad
   GstPad *rtp_pad;
   GstCaps *caps;
   GstPad *rtcp_pad;
-  GstClockTime first_ts;
 };
 
 /* find a src pad for a given SSRC, returns NULL if the SSRC was not found
@@ -190,7 +189,6 @@ create_demux_pad_for_ssrc (GstRtpSsrcDemux * demux, guint32 ssrc,
   demuxpad->ssrc = ssrc;
   demuxpad->rtp_pad = rtp_pad;
   demuxpad->rtcp_pad = rtcp_pad;
-  demuxpad->first_ts = timestamp;
 
   GST_DEBUG_OBJECT (demux, "first timestamp %" GST_TIME_FORMAT,
       GST_TIME_ARGS (timestamp));
@@ -484,9 +482,6 @@ gst_rtp_ssrc_demux_rtcp_chain (GstPad * pad, GstBuffer * buf)
       gst_rtcp_packet_sr_get_sender_info (&packet, &ssrc, NULL, NULL, NULL,
           NULL);
       break;
-    case GST_RTCP_TYPE_RR:
-      ssrc = gst_rtcp_packet_rr_get_ssrc (&packet);
-      break;
     default:
       goto invalid_rtcp;
   }
@@ -599,9 +594,7 @@ gst_rtp_ssrc_demux_src_query (GstPad * pad, GstQuery * query)
         GST_DEBUG_OBJECT (demux, "peer min latency %" GST_TIME_FORMAT,
             GST_TIME_ARGS (min_latency));
 
-        GST_DEBUG_OBJECT (demux,
-            "latency for SSRC %08x, latency %" GST_TIME_FORMAT, demuxpad->ssrc,
-            GST_TIME_ARGS (demuxpad->first_ts));
+        GST_DEBUG_OBJECT (demux, "latency for SSRC %08x", demuxpad->ssrc);
 
         gst_query_set_latency (query, live, min_latency, max_latency);
       }
index 6cfbab6..d027dc2 100644 (file)
@@ -107,6 +107,7 @@ rtp_jitter_buffer_reset_skew (RTPJitterBuffer * jbuf)
   jbuf->base_extrtp = -1;
   jbuf->clock_rate = -1;
   jbuf->ext_rtptime = -1;
+  jbuf->last_rtptime = -1;
   jbuf->window_pos = 0;
   jbuf->window_filling = TRUE;
   jbuf->window_min = 0;
@@ -188,11 +189,15 @@ calculate_skew (RTPJitterBuffer * jbuf, guint32 rtptime, GstClockTime time,
 
   gstrtptime = gst_util_uint64_scale_int (ext_rtptime, GST_SECOND, clock_rate);
 
-  if (jbuf->clock_rate != -1 && jbuf->clock_rate != clock_rate) {
-    GST_DEBUG ("Clock rate changed from %" G_GUINT32_FORMAT " to %"
+  /* keep track of the last extended rtptime */
+  jbuf->last_rtptime = ext_rtptime;
+
+  if (jbuf->clock_rate != clock_rate) {
+    GST_WARNING ("Clock rate changed from %" G_GUINT32_FORMAT " to %"
         G_GUINT32_FORMAT, jbuf->clock_rate, clock_rate);
     jbuf->base_time = -1;
     jbuf->base_rtptime = -1;
+    jbuf->clock_rate = clock_rate;
   }
 
   /* first time, lock on to time and gstrtptime */
@@ -202,7 +207,6 @@ calculate_skew (RTPJitterBuffer * jbuf, guint32 rtptime, GstClockTime time,
   }
   if (G_UNLIKELY (jbuf->base_rtptime == -1)) {
     jbuf->base_rtptime = gstrtptime;
-    jbuf->clock_rate = clock_rate;
     jbuf->base_extrtp = ext_rtptime;
     GST_DEBUG ("Taking new base rtptime %" GST_TIME_FORMAT,
         GST_TIME_ARGS (gstrtptime));
@@ -213,10 +217,9 @@ calculate_skew (RTPJitterBuffer * jbuf, guint32 rtptime, GstClockTime time,
   else {
     /* elapsed time at sender, timestamps can go backwards and thus be smaller
      * than our base time, take a new base time in that case. */
-    GST_DEBUG ("backward timestamps at server, taking new base time");
+    GST_WARNING ("backward timestamps at server, taking new base time");
     jbuf->base_time = time;
     jbuf->base_rtptime = gstrtptime;
-    jbuf->clock_rate = clock_rate;
     jbuf->base_extrtp = ext_rtptime;
     send_diff = 0;
   }
@@ -245,12 +248,11 @@ calculate_skew (RTPJitterBuffer * jbuf, guint32 rtptime, GstClockTime time,
    * changed too quickly we have to resync because the server likely restarted
    * its timestamps. */
   if (ABS (delta - jbuf->skew) > GST_SECOND) {
-    GST_DEBUG ("delta %" GST_TIME_FORMAT " too big, reset skew",
+    GST_WARNING ("delta %" GST_TIME_FORMAT " too big, reset skew",
         GST_TIME_ARGS (delta - jbuf->skew));
     jbuf->base_time = time;
     jbuf->base_rtptime = gstrtptime;
     jbuf->base_extrtp = ext_rtptime;
-    jbuf->clock_rate = clock_rate;
     send_diff = 0;
     delta = 0;
   }
@@ -536,13 +538,20 @@ rtp_jitter_buffer_get_ts_diff (RTPJitterBuffer * jbuf)
  * @rtptime: result RTP time
  * @timestamp: result GStreamer timestamp
  * @clock_rate: clock-rate of @rtptime
+ * @last_rtptime: last seen rtptime.
  *
  * Returns the relation between the RTP timestamp and the GStreamer timestamp
  * used for constructing timestamps.
+ *
+ * For extended RTP timestamp @rtptime with a clock-rate of @clock_rate,
+ * the GStreamer timestamp is currently @timestamp.
+ *
+ * The last seen extended RTP timestamp with clock-rate @clock-rate is returned in
+ * @last_rtptime.
  */
 void
 rtp_jitter_buffer_get_sync (RTPJitterBuffer * jbuf, guint64 * rtptime,
-    guint64 * timestamp, guint32 * clock_rate)
+    guint64 * timestamp, guint32 * clock_rate, guint64 * last_rtptime)
 {
   if (rtptime)
     *rtptime = jbuf->base_extrtp;
@@ -550,4 +559,6 @@ rtp_jitter_buffer_get_sync (RTPJitterBuffer * jbuf, guint64 * rtptime,
     *timestamp = jbuf->base_time + jbuf->skew;
   if (clock_rate)
     *clock_rate = jbuf->clock_rate;
+  if (last_rtptime)
+    *last_rtptime = jbuf->last_rtptime;
 }
index 325f8f7..aa00919 100644 (file)
@@ -59,6 +59,7 @@ struct _RTPJitterBuffer {
   guint32        clock_rate;
   GstClockTime   base_extrtp;
   guint64        ext_rtptime;
+  guint64        last_rtptime;
   gint64         window[RTP_JITTER_BUFFER_MAX_WINDOW];
   guint          window_pos;
   guint          window_size;
@@ -92,7 +93,8 @@ guint                 rtp_jitter_buffer_num_packets      (RTPJitterBuffer *jbuf)
 guint32               rtp_jitter_buffer_get_ts_diff      (RTPJitterBuffer *jbuf);
 
 void                  rtp_jitter_buffer_get_sync         (RTPJitterBuffer *jbuf, guint64 *rtptime,
-                                                          guint64 *timestamp, guint32 *clock_rate);
+                                                          guint64 *timestamp, guint32 *clock_rate,
+                                                         guint64 *last_rtptime);
 
 
 #endif /* __RTP_JITTER_BUFFER_H__ */