rtpbin: proxy new "add-reference-timestamp-meta" property from rtpjitterbuffer
[platform/upstream/gstreamer.git] / subprojects / gst-plugins-good / gst / rtpmanager / gstrtpbin.c
index 6708fd2..d578c03 100644 (file)
@@ -350,10 +350,12 @@ enum
 #define DEFAULT_MAX_DROPOUT_TIME     60000
 #define DEFAULT_MAX_MISORDER_TIME    2000
 #define DEFAULT_RFC7273_SYNC         FALSE
+#define DEFAULT_ADD_REFERENCE_TIMESTAMP_META FALSE
 #define DEFAULT_MAX_STREAMS          G_MAXUINT
 #define DEFAULT_MAX_TS_OFFSET_ADJUSTMENT G_GUINT64_CONSTANT(0)
 #define DEFAULT_MAX_TS_OFFSET        G_GINT64_CONSTANT(3000000000)
 #define DEFAULT_MIN_TS_OFFSET        MIN_TS_OFFSET_ROUND_OFF_COMP
+#define DEFAULT_TS_OFFSET_SMOOTHING_FACTOR  0
 
 enum
 {
@@ -378,10 +380,12 @@ enum
   PROP_MAX_DROPOUT_TIME,
   PROP_MAX_MISORDER_TIME,
   PROP_RFC7273_SYNC,
+  PROP_ADD_REFERENCE_TIMESTAMP_META,
   PROP_MAX_STREAMS,
   PROP_MAX_TS_OFFSET_ADJUSTMENT,
   PROP_MAX_TS_OFFSET,
   PROP_MIN_TS_OFFSET,
+  PROP_TS_OFFSET_SMOOTHING_FACTOR,
   PROP_FEC_DECODERS,
   PROP_FEC_ENCODERS,
 };
@@ -1347,11 +1351,35 @@ stream_set_ts_offset (GstRtpBin * bin, GstRtpBinStream * stream,
     return;
   }
 
-  if (!stream->is_initialized) {
-    stream->avg_ts_offset = ts_offset;
-    stream->is_initialized = TRUE;
+  if (bin->ts_offset_smoothing_factor > 0) {
+    if (!stream->is_initialized) {
+      stream->avg_ts_offset = ts_offset;
+      stream->is_initialized = TRUE;
+    } else {
+      /* RMA algorithm using smoothing factor is following, but split into
+       * parts to check for overflows:
+       * stream->avg_ts_offset =
+       *   ((bin->ts_offset_smoothing_factor - 1) * stream->avg_ts_offset
+       *    + ts_offset) / bin->ts_offset_smoothing_factor
+       */
+      guint64 max_possible_smoothing_factor =
+          G_MAXINT64 / ABS (stream->avg_ts_offset);
+      gint64 cur_avg_product =
+          (bin->ts_offset_smoothing_factor - 1) * stream->avg_ts_offset;
+
+      if ((max_possible_smoothing_factor < bin->ts_offset_smoothing_factor) ||
+          (cur_avg_product > 0 && G_MAXINT64 - cur_avg_product < ts_offset) ||
+          (cur_avg_product < 0 && G_MININT64 - cur_avg_product > ts_offset)) {
+        GST_WARNING_OBJECT (bin,
+            "ts-offset-smoothing-factor calculation overflow, fallback to using ts-offset directly");
+        stream->avg_ts_offset = ts_offset;
+      } else {
+        stream->avg_ts_offset =
+            (cur_avg_product + ts_offset) / bin->ts_offset_smoothing_factor;
+      }
+    }
   } else {
-    stream->avg_ts_offset = (9 * stream->avg_ts_offset + ts_offset) / 10;
+    stream->avg_ts_offset = ts_offset;
   }
 
   g_object_get (stream->buffer, "ts-offset", &prev_ts_offset, NULL);
@@ -1879,6 +1907,9 @@ create_stream (GstRtpBinSession * session, guint32 ssrc)
     g_object_set (buffer, "max-misorder-time", rtpbin->max_misorder_time, NULL);
   if (g_object_class_find_property (jb_class, "rfc7273-sync"))
     g_object_set (buffer, "rfc7273-sync", rtpbin->rfc7273_sync, NULL);
+  if (g_object_class_find_property (jb_class, "add-reference-timestamp-meta"))
+    g_object_set (buffer, "add-reference-timestamp-meta",
+        rtpbin->add_reference_timestamp_meta, NULL);
   if (g_object_class_find_property (jb_class, "max-ts-offset-adjustment"))
     g_object_set (buffer, "max-ts-offset-adjustment",
         rtpbin->max_ts_offset_adjustment, NULL);
@@ -2737,6 +2768,23 @@ gst_rtp_bin_class_init (GstRtpBinClass * klass)
           "(requires clock and offset to be provided)", DEFAULT_RFC7273_SYNC,
           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
 
+  /**
+   * GstRtpBin:add-reference-timestamp-meta:
+   *
+   * When syncing to a RFC7273 clock, add #GstReferenceTimestampMeta
+   * to buffers with the original reconstructed reference clock timestamp.
+   *
+   * Since: 1.22
+   */
+  g_object_class_install_property (gobject_class,
+      PROP_ADD_REFERENCE_TIMESTAMP_META,
+      g_param_spec_boolean ("add-reference-timestamp-meta",
+          "Add Reference Timestamp Meta",
+          "Add Reference Timestamp Meta to buffers with the original clock timestamp "
+          "before any adjustments when syncing to an RFC7273 clock.",
+          DEFAULT_ADD_REFERENCE_TIMESTAMP_META,
+          G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
+
   g_object_class_install_property (gobject_class, PROP_MAX_STREAMS,
       g_param_spec_uint ("max-streams", "Max Streams",
           "The maximum number of streams to create for one session",
@@ -2800,6 +2848,30 @@ gst_rtp_bin_class_init (GstRtpBinClass * klass)
           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
 
   /**
+   * GstRtpBin:ts-offset-smoothing-factor:
+   *
+   * Controls the weighting between previous and current timestamp offsets in
+   * a running moving average (RMA):
+   * ts_offset_average(n) =
+   *   ((ts-offset-smoothing-factor - 1) * ts_offset_average(n - 1) + ts_offset(n)) /
+   *   ts-offset-smoothing-factor
+   *
+   * This can stabilize the timestamp offset and prevent unnecessary skew
+   * corrections due to jitter introduced by network or system load.
+   *
+   * Since: 1.22
+   */
+  g_object_class_install_property (gobject_class,
+      PROP_TS_OFFSET_SMOOTHING_FACTOR,
+      g_param_spec_uint ("ts-offset-smoothing-factor",
+          "Timestamp Offset Smoothing Factor",
+          "Sets a smoothing factor for the timestamp offset in number of "
+          "values for a calculated running moving average. "
+          "(0 = no smoothing factor)", 0, G_MAXUINT,
+          DEFAULT_TS_OFFSET_SMOOTHING_FACTOR,
+          G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
+
+  /**
    * GstRtpBin:fec-decoders:
    *
    * Used to provide a factory used to build the FEC decoder for a
@@ -2917,12 +2989,14 @@ gst_rtp_bin_init (GstRtpBin * rtpbin)
   rtpbin->max_dropout_time = DEFAULT_MAX_DROPOUT_TIME;
   rtpbin->max_misorder_time = DEFAULT_MAX_MISORDER_TIME;
   rtpbin->rfc7273_sync = DEFAULT_RFC7273_SYNC;
+  rtpbin->add_reference_timestamp_meta = DEFAULT_ADD_REFERENCE_TIMESTAMP_META;
   rtpbin->max_streams = DEFAULT_MAX_STREAMS;
   rtpbin->max_ts_offset_adjustment = DEFAULT_MAX_TS_OFFSET_ADJUSTMENT;
   rtpbin->max_ts_offset = DEFAULT_MAX_TS_OFFSET;
   rtpbin->max_ts_offset_is_set = FALSE;
   rtpbin->min_ts_offset = DEFAULT_MIN_TS_OFFSET;
   rtpbin->min_ts_offset_is_set = FALSE;
+  rtpbin->ts_offset_smoothing_factor = DEFAULT_TS_OFFSET_SMOOTHING_FACTOR;
 
   /* some default SDES entries */
   cname = g_strdup_printf ("user%u@host-%x", g_random_int (), g_random_int ());
@@ -3232,6 +3306,11 @@ gst_rtp_bin_set_property (GObject * object, guint prop_id,
       gst_rtp_bin_propagate_property_to_jitterbuffer (rtpbin,
           "rfc7273-sync", value);
       break;
+    case PROP_ADD_REFERENCE_TIMESTAMP_META:
+      rtpbin->add_reference_timestamp_meta = g_value_get_boolean (value);
+      gst_rtp_bin_propagate_property_to_jitterbuffer (rtpbin,
+          "add-reference-timestamp-meta", value);
+      break;
     case PROP_MAX_STREAMS:
       rtpbin->max_streams = g_value_get_uint (value);
       break;
@@ -3248,6 +3327,9 @@ gst_rtp_bin_set_property (GObject * object, guint prop_id,
       rtpbin->min_ts_offset = g_value_get_uint64 (value);
       rtpbin->min_ts_offset_is_set = TRUE;
       break;
+    case PROP_TS_OFFSET_SMOOTHING_FACTOR:
+      rtpbin->ts_offset_smoothing_factor = g_value_get_uint (value);
+      break;
     case PROP_FEC_DECODERS:
       gst_rtp_bin_set_fec_decoders_struct (rtpbin, g_value_get_boxed (value));
       break;
@@ -3339,6 +3421,9 @@ gst_rtp_bin_get_property (GObject * object, guint prop_id,
     case PROP_RFC7273_SYNC:
       g_value_set_boolean (value, rtpbin->rfc7273_sync);
       break;
+    case PROP_ADD_REFERENCE_TIMESTAMP_META:
+      g_value_set_boolean (value, rtpbin->add_reference_timestamp_meta);
+      break;
     case PROP_MAX_STREAMS:
       g_value_set_uint (value, rtpbin->max_streams);
       break;
@@ -3351,6 +3436,9 @@ gst_rtp_bin_get_property (GObject * object, guint prop_id,
     case PROP_MIN_TS_OFFSET:
       g_value_set_uint64 (value, rtpbin->min_ts_offset);
       break;
+    case PROP_TS_OFFSET_SMOOTHING_FACTOR:
+      g_value_set_uint (value, rtpbin->ts_offset_smoothing_factor);
+      break;
     case PROP_FEC_DECODERS:
       g_value_take_boxed (value, gst_rtp_bin_get_fec_decoders_struct (rtpbin));
       break;
@@ -4319,20 +4407,47 @@ remove_recv_rtp (GstRtpBin * rtpbin, GstRtpBinSession * session)
   }
 }
 
+static gint
+fec_sinkpad_find (const GValue * item, gchar * padname)
+{
+  GstPad *pad = g_value_get_object (item);
+  return g_strcmp0 (GST_PAD_NAME (pad), padname);
+}
+
 static GstPad *
 complete_session_fec (GstRtpBin * rtpbin, GstRtpBinSession * session,
     guint fec_idx)
 {
+  gboolean have_static_pad;
   gchar *padname;
+
   GstPad *ret;
+  GstIterator *it;
+  GValue item = { 0, };
 
   if (!ensure_early_fec_decoder (rtpbin, session))
     goto no_decoder;
 
-  GST_DEBUG_OBJECT (rtpbin, "getting FEC sink pad");
   padname = g_strdup_printf ("fec_%u", fec_idx);
-  ret = gst_element_request_pad_simple (session->early_fec_decoder, padname);
+
+  GST_DEBUG_OBJECT (rtpbin, "getting FEC sink pad %s", padname);
+
+  /* First try to find the decoder static pad that matches the padname */
+  it = gst_element_iterate_sink_pads (session->early_fec_decoder);
+  have_static_pad =
+      gst_iterator_find_custom (it, (GCompareFunc) fec_sinkpad_find, &item,
+      padname);
+
+  if (have_static_pad) {
+    ret = g_value_get_object (&item);
+    gst_object_ref (ret);
+    g_value_unset (&item);
+  } else {
+    ret = gst_element_request_pad_simple (session->early_fec_decoder, padname);
+  }
+
   g_free (padname);
+  gst_iterator_free (it);
 
   if (ret == NULL)
     goto pad_failed;
@@ -4590,9 +4705,24 @@ remove_recv_fec_for_pad (GstRtpBin * rtpbin, GstRtpBinSession * session,
   if (target) {
     item = g_slist_find (session->recv_fec_sinks, target);
     if (item) {
-      gst_element_release_request_pad (session->early_fec_decoder, item->data);
+      GstPadTemplate *templ;
+      GstPad *pad;
+
+      pad = item->data;
+      templ = gst_pad_get_pad_template (pad);
+
+      if (GST_PAD_TEMPLATE_PRESENCE (templ) == GST_PAD_REQUEST) {
+        GST_DEBUG_OBJECT (rtpbin,
+            "Releasing FEC decoder pad %" GST_PTR_FORMAT, pad);
+        gst_element_release_request_pad (session->early_fec_decoder, pad);
+      } else {
+        gst_object_unref (pad);
+      }
+
       session->recv_fec_sinks =
           g_slist_delete_link (session->recv_fec_sinks, item);
+
+      gst_object_unref (templ);
     }
     gst_object_unref (target);
   }
@@ -4810,8 +4940,7 @@ setup_aux_sender (GstRtpBin * rtpbin, GstRtpBinSession * session,
 }
 
 static void
-fec_encoder_pad_added_cb (GstElement * encoder, GstPad * pad,
-    GstRtpBinSession * session)
+fec_encoder_add_pad_unlocked (GstPad * pad, GstRtpBinSession * session)
 {
   GstElementClass *klass;
   gchar *gname;
@@ -4829,7 +4958,6 @@ fec_encoder_pad_added_cb (GstElement * encoder, GstPad * pad,
   GST_INFO_OBJECT (session->bin, "FEC encoder for session %u exposed new pad",
       session->id);
 
-  GST_RTP_BIN_LOCK (session->bin);
   klass = GST_ELEMENT_GET_CLASS (session->bin);
   gname = g_strdup_printf ("send_fec_src_%u_%u", session->id, fec_idx);
   templ = gst_element_class_get_pad_template (klass, "send_fec_src_%u_%u");
@@ -4840,12 +4968,50 @@ fec_encoder_pad_added_cb (GstElement * encoder, GstPad * pad,
   gst_pad_sticky_events_foreach (pad, copy_sticky_events, ghost);
   gst_element_add_pad (GST_ELEMENT (session->bin), ghost);
   g_free (gname);
-  GST_RTP_BIN_UNLOCK (session->bin);
 
 done:
   return;
 }
 
+static void
+fec_encoder_add_pad (GstPad * pad, GstRtpBinSession * session)
+{
+  GST_RTP_BIN_LOCK (session->bin);
+  fec_encoder_add_pad_unlocked (pad, session);
+  GST_RTP_BIN_UNLOCK (session->bin);
+}
+
+static gint
+fec_srcpad_iterator_filter (const GValue * item, GValue * unused)
+{
+  guint fec_idx;
+  GstPad *pad = g_value_get_object (item);
+  GstPadTemplate *templ = gst_pad_get_pad_template (pad);
+
+  gint have_static_pad =
+      (GST_PAD_TEMPLATE_PRESENCE (templ) == GST_PAD_ALWAYS) &&
+      (sscanf (GST_PAD_NAME (pad), "fec_%u", &fec_idx) == 1);
+
+  gst_object_unref (templ);
+
+  /* return 0 to retain pad in filtered iterator */
+  return !have_static_pad;
+}
+
+static void
+fec_srcpad_iterator_foreach (const GValue * item, GstRtpBinSession * session)
+{
+  GstPad *pad = g_value_get_object (item);
+  fec_encoder_add_pad_unlocked (pad, session);
+}
+
+static void
+fec_encoder_pad_added_cb (GstElement * encoder, GstPad * pad,
+    GstRtpBinSession * session)
+{
+  fec_encoder_add_pad (pad, session);
+}
+
 static GstElement *
 request_fec_encoder (GstRtpBin * rtpbin, GstRtpBinSession * session,
     guint sessid)
@@ -4883,6 +5049,28 @@ request_fec_encoder (GstRtpBin * rtpbin, GstRtpBinSession * session,
     ret = session_request_element (session, SIGNAL_REQUEST_FEC_ENCODER);
 
   if (ret) {
+    /* First, add encoder pads that match fec_% template and are already present */
+    GstIterator *it, *filter;
+    GstIteratorResult it_ret = GST_ITERATOR_OK;
+
+    it = gst_element_iterate_src_pads (ret);
+    filter =
+        gst_iterator_filter (it, (GCompareFunc) fec_srcpad_iterator_filter,
+        NULL);
+
+    while (it_ret == GST_ITERATOR_OK || it_ret == GST_ITERATOR_RESYNC) {
+      it_ret =
+          gst_iterator_foreach (filter,
+          (GstIteratorForeachFunction) fec_srcpad_iterator_foreach, session);
+
+      if (it_ret == GST_ITERATOR_RESYNC)
+        gst_iterator_resync (filter);
+    }
+
+    gst_iterator_free (filter);
+
+    /* Finally, connect to pad-added signal if any of the encoder pads are
+     * added later */
     g_signal_connect (ret, "pad-added", G_CALLBACK (fec_encoder_pad_added_cb),
         session);
   }