media: implement ssrc-multiplexed retransmission support
authorMatthew Waters <matthew@centricular.com>
Wed, 26 Nov 2014 14:12:36 +0000 (01:12 +1100)
committerSebastian Dröge <sebastian@centricular.com>
Tue, 16 Dec 2014 15:41:08 +0000 (16:41 +0100)
based off RFC 4588 and the server-rtpaux example in -good

docs/libs/gst-rtsp-server-sections.txt
gst/rtsp-server/rtsp-media-factory.c
gst/rtsp-server/rtsp-media-factory.h
gst/rtsp-server/rtsp-media.c
gst/rtsp-server/rtsp-media.h
gst/rtsp-server/rtsp-sdp.c
gst/rtsp-server/rtsp-stream.c
gst/rtsp-server/rtsp-stream.h

index e832c4e..fe38081 100644 (file)
@@ -171,6 +171,9 @@ gst_rtsp_media_get_address_pool
 gst_rtsp_media_set_buffer_size
 gst_rtsp_media_get_buffer_size
 
+gst_rtsp_media_set_retransmission_time
+gst_rtsp_media_get_retransmission_time
+
 gst_rtsp_media_setup_sdp
 
 <SUBSECTION MediaPrepare>
@@ -257,6 +260,9 @@ gst_rtsp_media_factory_set_buffer_size
 gst_rtsp_media_factory_get_suspend_mode
 gst_rtsp_media_factory_set_suspend_mode
 
+gst_rtsp_media_factory_set_retransmission_time
+gst_rtsp_media_factory_get_retransmission_time
+
 gst_rtsp_media_factory_construct
 gst_rtsp_media_factory_create_element
 
@@ -541,6 +547,9 @@ gst_rtsp_stream_get_profiles
 gst_rtsp_stream_get_protocols
 gst_rtsp_stream_set_protocols
 
+gst_rtsp_stream_get_retransmission_time
+gst_rtsp_stream_set_retransmission_time
+
 gst_rtsp_stream_is_transport_supported
 
 gst_rtsp_stream_get_address_pool
index 7d85dcc..6e0da84 100644 (file)
@@ -58,6 +58,8 @@ struct _GstRTSPMediaFactoryPrivate
   guint buffer_size;
   GstRTSPAddressPool *pool;
 
+  GstClockTime rtx_time;
+
   GMutex medias_lock;
   GHashTable *medias;           /* protected by medias_lock */
 };
@@ -822,6 +824,55 @@ gst_rtsp_media_factory_get_protocols (GstRTSPMediaFactory * factory)
   return res;
 }
 
+/**
+ * gst_rtsp_media_factory_set_retransmission_time:
+ * @factory: a #GstRTSPMediaFactory
+ * @time: a #GstClockTime
+ *
+ * Configure the time to store for possible retransmission
+ */
+void
+gst_rtsp_media_factory_set_retransmission_time (GstRTSPMediaFactory * factory,
+    GstClockTime time)
+{
+  GstRTSPMediaFactoryPrivate *priv;
+
+  g_return_if_fail (GST_IS_RTSP_MEDIA_FACTORY (factory));
+
+  priv = factory->priv;
+
+  GST_DEBUG_OBJECT (factory, "retransmission time %" G_GUINT64_FORMAT, time);
+
+  GST_RTSP_MEDIA_FACTORY_LOCK (factory);
+  priv->rtx_time = time;
+  GST_RTSP_MEDIA_FACTORY_UNLOCK (factory);
+}
+
+/**
+ * gst_rtsp_media_factory_get_retransmission_time:
+ * @factory: a #GstRTSPMediaFactory
+ *
+ * Get the time that is stored for retransmission purposes
+ *
+ * Returns: a #GstClockTime
+ */
+GstClockTime
+gst_rtsp_media_factory_get_retransmission_time (GstRTSPMediaFactory * factory)
+{
+  GstRTSPMediaFactoryPrivate *priv;
+  GstClockTime res;
+
+  g_return_val_if_fail (GST_IS_RTSP_MEDIA_FACTORY (factory), 0);
+
+  priv = factory->priv;
+
+  GST_RTSP_MEDIA_FACTORY_LOCK (factory);
+  res = priv->rtx_time;
+  GST_RTSP_MEDIA_FACTORY_UNLOCK (factory);
+
+  return res;
+}
+
 static gboolean
 compare_media (gpointer key, GstRTSPMedia * media1, GstRTSPMedia * media2)
 {
@@ -1084,6 +1135,7 @@ default_configure (GstRTSPMediaFactory * factory, GstRTSPMedia * media)
   GstRTSPLowerTrans protocols;
   GstRTSPAddressPool *pool;
   GstRTSPPermissions *perms;
+  GstClockTime rtx_time;
 
   /* configure the sharedness */
   GST_RTSP_MEDIA_FACTORY_LOCK (factory);
@@ -1093,6 +1145,7 @@ default_configure (GstRTSPMediaFactory * factory, GstRTSPMedia * media)
   size = priv->buffer_size;
   profiles = priv->profiles;
   protocols = priv->protocols;
+  rtx_time = priv->rtx_time;
   GST_RTSP_MEDIA_FACTORY_UNLOCK (factory);
 
   gst_rtsp_media_set_suspend_mode (media, suspend_mode);
@@ -1101,6 +1154,7 @@ default_configure (GstRTSPMediaFactory * factory, GstRTSPMedia * media)
   gst_rtsp_media_set_buffer_size (media, size);
   gst_rtsp_media_set_profiles (media, profiles);
   gst_rtsp_media_set_protocols (media, protocols);
+  gst_rtsp_media_set_retransmission_time (media, rtx_time);
 
   if ((pool = gst_rtsp_media_factory_get_address_pool (factory))) {
     gst_rtsp_media_set_address_pool (media, pool);
index bb6aec3..ce069d7 100644 (file)
@@ -141,6 +141,9 @@ GstRTSPAddressPool *  gst_rtsp_media_factory_get_address_pool (GstRTSPMediaFacto
 void                  gst_rtsp_media_factory_set_buffer_size  (GstRTSPMediaFactory * factory,
                                                                guint size);
 guint                 gst_rtsp_media_factory_get_buffer_size  (GstRTSPMediaFactory * factory);
+void                  gst_rtsp_media_factory_set_retransmission_time (GstRTSPMediaFactory * factory,
+                                                                      GstClockTime time);
+GstClockTime          gst_rtsp_media_factory_get_retransmission_time (GstRTSPMediaFactory * factory);
 
 /* creating the media from the factory and a url */
 GstRTSPMedia *        gst_rtsp_media_factory_construct        (GstRTSPMediaFactory *factory,
index 85010dc..0c48c5c 100644 (file)
@@ -121,6 +121,9 @@ struct _GstRTSPMediaPrivate
   GstRTSPTimeRange range;       /* protected by lock */
   GstClockTime range_start;
   GstClockTime range_stop;
+
+  GList *payloads;              /* protected by lock */
+  GstClockTime rtx_time;        /* protected by lock */
 };
 
 #define DEFAULT_SHARED          FALSE
@@ -366,6 +369,8 @@ gst_rtsp_media_finalize (GObject * obj)
   gst_object_unref (priv->element);
   if (priv->pool)
     g_object_unref (priv->pool);
+  if (priv->payloads)
+    g_list_free (priv->payloads);
   g_mutex_clear (&priv->lock);
   g_cond_clear (&priv->cond);
   g_rec_mutex_clear (&priv->state_lock);
@@ -1091,6 +1096,63 @@ gst_rtsp_media_get_buffer_size (GstRTSPMedia * media)
 }
 
 /**
+ * gst_rtsp_media_set_retransmission_time:
+ * @media: a #GstRTSPMedia
+ * @time: the new value
+ *
+ * Set the amount of time to store retransmission packets.
+ */
+void
+gst_rtsp_media_set_retransmission_time (GstRTSPMedia * media, GstClockTime time)
+{
+  GstRTSPMediaPrivate *priv;
+  guint i;
+
+  g_return_if_fail (GST_IS_RTSP_MEDIA (media));
+
+  GST_LOG_OBJECT (media, "set retransmission time %" G_GUINT64_FORMAT, time);
+
+  priv = media->priv;
+
+  g_mutex_lock (&priv->lock);
+  priv->rtx_time = time;
+  for (i = 0; i < priv->streams->len; i++) {
+    GstRTSPStream *stream = g_ptr_array_index (priv->streams, i);
+
+    gst_rtsp_stream_set_retransmission_time (stream, time);
+  }
+
+  if (priv->rtpbin)
+    g_object_set (priv->rtpbin, "do-retransmission", time > 0, NULL);
+  g_mutex_unlock (&priv->lock);
+}
+
+/**
+ * gst_rtsp_media_get_retransmission_time:
+ * @media: a #GstRTSPMedia
+ *
+ * Get the amount of time to store retransmission data.
+ *
+ * Returns: the amount of time to store retransmission data.
+ */
+GstClockTime
+gst_rtsp_media_get_retransmission_time (GstRTSPMedia * media)
+{
+  GstRTSPMediaPrivate *priv;
+  GstClockTime res;
+
+  g_return_val_if_fail (GST_IS_RTSP_MEDIA (media), FALSE);
+
+  priv = media->priv;
+
+  g_mutex_unlock (&priv->lock);
+  res = priv->rtx_time;
+  g_mutex_unlock (&priv->lock);
+
+  return res;
+}
+
+/**
  * gst_rtsp_media_use_time_provider:
  * @media: a #GstRTSPMedia
  * @time_provider: if a #GstNetTimeProvider should be used
@@ -1198,6 +1260,37 @@ gst_rtsp_media_get_address_pool (GstRTSPMedia * media)
   return result;
 }
 
+static GList *
+_find_payload_types (GstRTSPMedia * media)
+{
+  GList *ret = NULL;
+  gint i, n;
+
+  n = media->priv->streams->len;
+  for (i = 0; i < n; i++) {
+    GstRTSPStream *stream = g_ptr_array_index (media->priv->streams, i);
+    guint pt = gst_rtsp_stream_get_pt (stream);
+
+    ret = g_list_append (ret, GUINT_TO_POINTER (pt));
+  }
+
+  return ret;
+}
+
+static guint
+_next_available_pt (GList * payloads)
+{
+  guint i;
+
+  for (i = 96; i <= 127; i++) {
+    GList *iter = g_list_find (payloads, GINT_TO_POINTER (i));
+    if (!iter)
+      return GPOINTER_TO_UINT (i);
+  }
+
+  return 0;
+}
+
 /**
  * gst_rtsp_media_collect_streams:
  * @media: a #GstRTSPMedia
@@ -1279,6 +1372,7 @@ gst_rtsp_media_create_stream (GstRTSPMedia * media, GstElement * payloader,
   GstPad *srcpad;
   gchar *name;
   gint idx;
+  gint i, n;
 
   g_return_val_if_fail (GST_IS_RTSP_MEDIA (media), NULL);
   g_return_val_if_fail (GST_IS_ELEMENT (payloader), NULL);
@@ -1303,8 +1397,28 @@ gst_rtsp_media_create_stream (GstRTSPMedia * media, GstElement * payloader,
     gst_rtsp_stream_set_address_pool (stream, priv->pool);
   gst_rtsp_stream_set_profiles (stream, priv->profiles);
   gst_rtsp_stream_set_protocols (stream, priv->protocols);
+  gst_rtsp_stream_set_retransmission_time (stream, priv->rtx_time);
 
   g_ptr_array_add (priv->streams, stream);
+
+  if (priv->payloads)
+    g_list_free (priv->payloads);
+  priv->payloads = _find_payload_types (media);
+
+  n = priv->streams->len;
+  for (i = 0; i < n; i++) {
+    GstRTSPStream *stream = g_ptr_array_index (priv->streams, i);
+    guint rtx_pt = _next_available_pt (priv->payloads);
+
+    if (rtx_pt == 0) {
+      /* FIXME: ran out of space of dynamic payload types */
+      break;
+    }
+
+    gst_rtsp_stream_set_retransmission_pt (stream, rtx_pt);
+
+    priv->payloads = g_list_append (priv->payloads, GUINT_TO_POINTER (rtx_pt));
+  }
   g_mutex_unlock (&priv->lock);
 
   g_signal_emit (media, gst_rtsp_media_signals[SIGNAL_NEW_STREAM], 0, stream,
@@ -2149,6 +2263,9 @@ start_prepare (GstRTSPMedia * media)
     }
   }
 
+  if (priv->rtpbin)
+    g_object_set (priv->rtpbin, "do-retransmission", priv->rtx_time > 0, NULL);
+
   for (walk = priv->dynamic; walk; walk = g_list_next (walk)) {
     GstElement *elem = walk->data;
     DynPaySignalHandlers *handlers = g_slice_new (DynPaySignalHandlers);
index a78650d..7c7515b 100644 (file)
@@ -188,6 +188,9 @@ GstRTSPAddressPool *  gst_rtsp_media_get_address_pool (GstRTSPMedia *media);
 void                  gst_rtsp_media_set_buffer_size  (GstRTSPMedia *media, guint size);
 guint                 gst_rtsp_media_get_buffer_size  (GstRTSPMedia *media);
 
+void                  gst_rtsp_media_set_retransmission_time  (GstRTSPMedia *media, GstClockTime time);
+GstClockTime          gst_rtsp_media_get_retransmission_time  (GstRTSPMedia *media);
+
 void                  gst_rtsp_media_use_time_provider (GstRTSPMedia *media, gboolean time_provider);
 gboolean              gst_rtsp_media_is_time_provider  (GstRTSPMedia *media);
 GstNetTimeProvider *  gst_rtsp_media_get_time_provider (GstRTSPMedia *media,
index d2551c0..dfc54be 100644 (file)
@@ -120,6 +120,7 @@ make_media (GstSDPMessage * sdp, GstSDPInfo * info, GstRTSPMedia * media,
   const gchar *addrtype, *proto;
   gchar *address;
   guint ttl;
+  GstClockTime rtx_time;
 
   gst_sdp_media_new (&smedia);
 
@@ -340,6 +341,9 @@ make_media (GstSDPMessage * sdp, GstSDPInfo * info, GstRTSPMedia * media,
       continue;
     if (g_str_has_prefix (fname, "srtcp-"))
       continue;
+    /* handled later */
+    if (g_str_has_prefix (fname, "x-gst-rtsp-server-rtx-time"))
+      continue;
 
     if (g_str_has_prefix (fname, "a-")) {
       /* attribute */
@@ -359,6 +363,7 @@ make_media (GstSDPMessage * sdp, GstSDPInfo * info, GstRTSPMedia * media,
       first = FALSE;
     }
   }
+
   if (!first) {
     tmp = g_string_free (fmtp, FALSE);
     gst_sdp_media_add_attribute (smedia, "fmtp", tmp);
@@ -369,6 +374,32 @@ make_media (GstSDPMessage * sdp, GstSDPInfo * info, GstRTSPMedia * media,
 
   update_sdp_from_tags (stream, smedia);
 
+  if ((rtx_time = gst_rtsp_stream_get_retransmission_time (stream))) {
+    /* ssrc multiplexed retransmit functionality */
+    guint rtx_pt = gst_rtsp_stream_get_retransmission_pt (stream);
+
+    if (rtx_pt == 0) {
+      g_warning ("failed to find an available dynamic payload type. "
+          "Not adding retransmission");
+    } else {
+      gchar *tmp;
+
+      tmp = g_strdup_printf ("%d", rtx_pt);
+      gst_sdp_media_add_format (smedia, tmp);
+      g_free (tmp);
+
+      tmp = g_strdup_printf ("%d rtx/%d", rtx_pt, caps_rate);
+      gst_sdp_media_add_attribute (smedia, "rtpmap", tmp);
+      g_free (tmp);
+
+      tmp =
+          g_strdup_printf ("%d apt=%d;rtx-time=%" G_GINT64_FORMAT, rtx_pt,
+          caps_pt, GST_TIME_AS_MSECONDS (rtx_time));
+      gst_sdp_media_add_attribute (smedia, "fmtp", tmp);
+      g_free (tmp);
+    }
+  }
+
   gst_sdp_message_add_media (sdp, smedia);
   gst_sdp_media_free (smedia);
 
index f2ac3db..24569dd 100644 (file)
@@ -111,6 +111,11 @@ struct _GstRTSPStreamPrivate
   GstElement *tee[2];
   GstElement *funnel[2];
 
+  /* retransmission */
+  GstElement *rtxsend;
+  guint rtx_pt;
+  GstClockTime rtx_time;
+
   /* server ports for sending/receiving over ipv4 */
   GstRTSPRange server_port_v4;
   GstRTSPAddress *server_addr_v4;
@@ -271,6 +276,9 @@ gst_rtsp_stream_finalize (GObject * obj)
     gst_rtsp_address_free (priv->server_addr_v6);
   if (priv->pool)
     g_object_unref (priv->pool);
+  if (priv->rtxsend)
+    g_object_unref (priv->rtxsend);
+
   gst_object_unref (priv->payloader);
   gst_object_unref (priv->srcpad);
   g_free (priv->control);
@@ -1306,6 +1314,82 @@ gst_rtsp_stream_get_ssrc (GstRTSPStream * stream, guint * ssrc)
   g_mutex_unlock (&priv->lock);
 }
 
+/**
+ * gst_rtsp_stream_set_retransmission_time:
+ * @stream: a #GstRTSPStream
+ * @time: a #GstClockTime
+ *
+ * Set the amount of time to store retransmission packets.
+ */
+void
+gst_rtsp_stream_set_retransmission_time (GstRTSPStream * stream,
+    GstClockTime time)
+{
+  GST_DEBUG_OBJECT (stream, "set retransmission time %" G_GUINT64_FORMAT, time);
+
+  g_mutex_lock (&stream->priv->lock);
+  stream->priv->rtx_time = time;
+  if (stream->priv->rtxsend)
+    g_object_set (stream->priv->rtxsend, "max-size-time",
+        GST_TIME_AS_MSECONDS (time), NULL);
+  g_mutex_unlock (&stream->priv->lock);
+}
+
+/**
+ * gst_rtsp_media_get_retransmission_time:
+ * @media: a #GstRTSPMedia
+ *
+ * Get the amount of time to store retransmission data.
+ *
+ * Returns: the amount of time to store retransmission data.
+ */
+GstClockTime
+gst_rtsp_stream_get_retransmission_time (GstRTSPStream * stream)
+{
+  GstClockTime ret;
+
+  g_return_if_fail (GST_IS_RTSP_STREAM (stream));
+
+  g_mutex_lock (&stream->priv->lock);
+  ret = stream->priv->rtx_time;
+  g_mutex_unlock (&stream->priv->lock);
+
+  return ret;
+}
+
+void
+gst_rtsp_stream_set_retransmission_pt (GstRTSPStream * stream, guint rtx_pt)
+{
+  g_return_if_fail (GST_IS_RTSP_STREAM (stream));
+
+  GST_DEBUG_OBJECT (stream, "set retransmission pt %u", rtx_pt);
+
+  g_mutex_lock (&stream->priv->lock);
+  stream->priv->rtx_pt = rtx_pt;
+  if (stream->priv->rtxsend) {
+    guint pt = gst_rtsp_stream_get_pt (stream);
+    gchar *pt_s = g_strdup_printf ("%d", pt);
+    GstStructure *rtx_pt_map = gst_structure_new ("application/x-rtp-pt-map",
+        pt_s, G_TYPE_UINT, rtx_pt, NULL);
+    g_object_set (stream->priv->rtxsend, "payload-type-map", rtx_pt_map, NULL);
+  }
+  g_mutex_unlock (&stream->priv->lock);
+}
+
+guint
+gst_rtsp_stream_get_retransmission_pt (GstRTSPStream * stream)
+{
+  guint rtx_pt;
+
+  g_return_if_fail (GST_IS_RTSP_STREAM (stream));
+
+  g_mutex_lock (&stream->priv->lock);
+  rtx_pt = stream->priv->rtx_pt;
+  g_mutex_unlock (&stream->priv->lock);
+
+  return rtx_pt;
+}
+
 /* executed from streaming thread */
 static void
 caps_notify (GstPad * pad, GParamSpec * unused, GstRTSPStream * stream)
@@ -1657,6 +1741,46 @@ request_rtcp_decoder (GstElement * rtpbin, guint session,
   return gst_object_ref (priv->srtpdec);
 }
 
+static GstElement *
+request_aux_sender (GstElement * rtpbin, guint sessid, GstRTSPStream * stream)
+{
+  GstElement *bin;
+  GstPad *pad;
+  GstStructure *pt_map;
+  gchar *name;
+  guint pt, rtx_pt;
+  gchar *pt_s;
+
+  pt = gst_rtsp_stream_get_pt (stream);
+  pt_s = g_strdup_printf ("%u", pt);
+  rtx_pt = stream->priv->rtx_pt;
+
+  GST_INFO ("creating rtxsend with pt %u to %u", pt, rtx_pt);
+
+  bin = gst_bin_new (NULL);
+  stream->priv->rtxsend = gst_element_factory_make ("rtprtxsend", NULL);
+  pt_map = gst_structure_new ("application/x-rtp-pt-map",
+      pt_s, G_TYPE_UINT, rtx_pt, NULL);
+  g_object_set (stream->priv->rtxsend, "payload-type-map", pt_map,
+      "max-size-time", GST_TIME_AS_MSECONDS (stream->priv->rtx_time), NULL);
+  gst_structure_free (pt_map);
+  gst_bin_add (GST_BIN (bin), gst_object_ref (stream->priv->rtxsend));
+
+  pad = gst_element_get_static_pad (stream->priv->rtxsend, "src");
+  name = g_strdup_printf ("src_%u", sessid);
+  gst_element_add_pad (bin, gst_ghost_pad_new (name, pad));
+  g_free (name);
+  gst_object_unref (pad);
+
+  pad = gst_element_get_static_pad (stream->priv->rtxsend, "sink");
+  name = g_strdup_printf ("sink_%u", sessid);
+  gst_element_add_pad (bin, gst_ghost_pad_new (name, pad));
+  g_free (name);
+  gst_object_unref (pad);
+
+  return bin;
+}
+
 /**
  * gst_rtsp_stream_join_bin:
  * @stream: a #GstRTSPStream
@@ -1714,6 +1838,12 @@ gst_rtsp_stream_join_bin (GstRTSPStream * stream, GstBin * bin,
         (GCallback) request_rtcp_decoder, stream);
   }
 
+  if (priv->rtx_time > 0) {
+    /* enable retransmission by setting rtprtxsend as the "aux" element of rtpbin */
+    g_signal_connect (rtpbin, "request-aux-sender",
+        (GCallback) request_aux_sender, stream);
+  }
+
   /* get a pad for sending RTP */
   name = g_strdup_printf ("send_rtp_sink_%u", idx);
   priv->send_rtp_sink = gst_element_get_request_pad (rtpbin, name);
index 135689e..4fb4195 100644 (file)
@@ -147,7 +147,6 @@ GSocket *         gst_rtsp_stream_get_rtcp_socket  (GstRTSPStream *stream,
 gboolean          gst_rtsp_stream_update_crypto    (GstRTSPStream * stream,
                                                     guint ssrc, GstCaps * crypto);
 
-
 gboolean          gst_rtsp_stream_query_position   (GstRTSPStream * stream,
                                                     gint64 * position);
 gboolean          gst_rtsp_stream_query_stop       (GstRTSPStream * stream,
@@ -155,6 +154,11 @@ gboolean          gst_rtsp_stream_query_stop       (GstRTSPStream * stream,
 
 void              gst_rtsp_stream_set_seqnum_offset          (GstRTSPStream *stream, guint16 seqnum);
 guint16           gst_rtsp_stream_get_current_seqnum          (GstRTSPStream *stream);
+void              gst_rtsp_stream_set_retransmission_time     (GstRTSPStream *stream, GstClockTime time);
+GstClockTime      gst_rtsp_stream_get_retransmission_time     (GstRTSPStream *stream);
+guint             gst_rtsp_stream_get_retransmission_pt       (GstRTSPStream * stream);
+void              gst_rtsp_stream_set_retransmission_pt       (GstRTSPStream * stream,
+                                                               guint rtx_pt);
 
 /**
  * GstRTSPStreamTransportFilterFunc: