[Gst/MQTT] Introduce the Unix epoch synchronization using NTP
authorWook Song <wook16.song@samsung.com>
Wed, 28 Jul 2021 09:30:57 +0000 (18:30 +0900)
committerMyungJoo Ham <myungjoo.ham@samsung.com>
Thu, 30 Sep 2021 01:40:23 +0000 (10:40 +0900)
This patch introduces the self-synchronization mechanism of the Unix
epoch timestamps. Note that if the 'ntp-sync' property is set to true
(which means that if this feature is enabled), the pipeline would be
stuck after a while. This is because most of the public NTP servers
restrict clients not to poll too frequently. To fix it, polling
intervals and caching mechanisms should be applied to the NPT utility
functions.

Signed-off-by: Wook Song <wook16.song@samsung.com>
gst/mqtt/meson.build
gst/mqtt/mqttcommon.h
gst/mqtt/mqttsink.c
gst/mqtt/mqttsink.h

index e27065e..6f1537a 100644 (file)
@@ -5,6 +5,7 @@ mqtt_plugin_srcs = [
   join_paths(meson.current_source_dir(), 'mqttsink.c'),
   join_paths(meson.current_source_dir(), 'mqttsrc.c'),
   join_paths(meson.current_source_dir(), 'mqttelements.c'),
+  join_paths(meson.current_source_dir(), 'ntputil.c'),
 ]
 
 gstmqtt_shared = shared_library('gstmqtt',
index fc92aee..c906503 100644 (file)
 
 #ifndef __GST_MQTT_COMMON_H__
 #define __GST_MQTT_COMMON_H__
+#include <stdint.h>
+
+#ifndef UNUSED
+#define UNUSED(expr) do { (void)(expr); } while (0)
+#endif /* UNUSED */
 
 #ifndef GST_MQTT_PACKAGE
 #define GST_MQTT_PACKAGE "GStreamer MQTT Plugins"
@@ -55,4 +60,19 @@ typedef struct _GstMQTTMessageHdr {
   };
 } GstMQTTMessageHdr;
 
+typedef int64_t (*mqtt_get_unix_epoch)(uint32_t, char **, uint16_t *);
+
+/**
+ * @brief A wrapper function of g_get_real_time () to assign it to the function
+ * pointer, mqtt_get_unix_epoch
+ */
+static inline int64_t default_mqtt_get_unix_epoch (uint32_t hnum, char **hnames,
+    uint16_t *hports)
+{
+  UNUSED (hnum);
+  UNUSED (hnames);
+  UNUSED (hports);
+  return g_get_real_time ();
+}
+
 #endif /* !__GST_MQTT_COMMON_H__ */
index 3692f7e..0fde444 100644 (file)
@@ -27,6 +27,7 @@
 #include <nnstreamer_util.h>
 
 #include "mqttsink.h"
+#include "ntputil.h"
 
 static GstStaticPadTemplate sink_pad_template = GST_STATIC_PAD_TEMPLATE ("sink",
     GST_PAD_SINK, GST_PAD_ALWAYS, GST_STATIC_CAPS_ANY);
@@ -52,6 +53,8 @@ enum
   PROP_NUM_BUFFERS,
   PROP_MAX_MSG_BUF_SIZE,
   PROP_MQTT_QOS,
+  PROP_MQTT_NTP_SYNC,
+  PROP_MQTT_NTP_SRVS,
 
   PROP_LAST
 };
@@ -68,6 +71,8 @@ enum
   DEFAULT_MQTT_PUB_WAIT_TIMEOUT = 1,    /* 1 secs */
   DEFAULT_MAX_MSG_BUF_SIZE = 0, /* Buffer size is not fixed */
   DEFAULT_MQTT_QOS = 0,         /* fire and forget */
+  DEFAULT_MQTT_NTP_SYNC = FALSE,
+  MAX_LEN_PROP_NTP_SRVS = 4096,
 };
 
 static guint8 sink_client_id = 0;
@@ -78,6 +83,7 @@ static const gchar DEFAULT_MQTT_CLIENT_ID[] = "$HOST_$PID_^[0-9][0-9]?$|^255$";
 static const gchar DEFAULT_MQTT_CLIENT_ID_FORMAT[] = "%s_%u_sink%u";
 static const gchar DEFAULT_MQTT_PUB_TOPIC[] = "$client-id/topic";
 static const gchar DEFAULT_MQTT_PUB_TOPIC_FORMAT[] = "%s/topic";
+static const gchar DEFAULT_MQTT_NTP_SERVERS[] = "pool.ntp.org:123";
 
 /** Function prototype declarations */
 static void
@@ -131,6 +137,12 @@ static gint gst_mqtt_sink_get_num_buffers (GstMqttSink * self);
 static void gst_mqtt_sink_set_num_buffers (GstMqttSink * self, const gint num);
 static gint gst_mqtt_sink_get_mqtt_qos (GstMqttSink * self);
 static void gst_mqtt_sink_set_mqtt_qos (GstMqttSink * self, const gint qos);
+static gboolean gst_mqtt_sink_get_mqtt_ntp_sync (GstMqttSink * self);
+static void gst_mqtt_sink_set_mqtt_ntp_sync (GstMqttSink * self,
+    const gboolean flag);
+static gchar *gst_mqtt_sink_get_mqtt_ntp_srvs (GstMqttSink * self);
+static void gst_mqtt_sink_set_mqtt_ntp_srvs (GstMqttSink * self,
+    const gchar * pairs);
 
 static void cb_mqtt_on_connect (void *context,
     MQTTAsync_successData * response);
@@ -193,6 +205,12 @@ gst_mqtt_sink_init (GstMqttSink * self)
   self->mqtt_conn_opts.cleansession = DEFAULT_MQTT_OPT_CLEANSESSION;
   self->mqtt_conn_opts.keepAliveInterval = DEFAULT_MQTT_OPT_KEEP_ALIVE_INTERVAL;
   self->mqtt_qos = DEFAULT_MQTT_QOS;
+  self->mqtt_ntp_sync = DEFAULT_MQTT_NTP_SYNC;
+  self->mqtt_ntp_srvs = g_strdup (DEFAULT_MQTT_NTP_SERVERS);
+  self->mqtt_ntp_hnames = NULL;
+  self->mqtt_ntp_ports = NULL;
+  self->mqtt_ntp_num_srvs = 0;
+  self->get_epoch_func = default_mqtt_get_unix_epoch;
 
   /** init basesink properties */
   gst_base_sink_set_qos_enabled (basesink, DEFAULT_QOS);
@@ -236,6 +254,18 @@ gst_mqtt_sink_class_init (GstMqttSinkClass * klass)
           "Network port of host (broker) to connect to", DEFAULT_MQTT_HOST_PORT,
           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
 
+  g_object_class_install_property (gobject_class, PROP_MQTT_NTP_SYNC,
+      g_param_spec_boolean ("ntp-sync", "NTP Synchronization",
+          "Synchronize received streams to the NTP clock",
+          DEFAULT_MQTT_NTP_SYNC, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
+
+  g_object_class_install_property (gobject_class, PROP_MQTT_NTP_SRVS,
+      g_param_spec_string ("ntp-srvs", "NTP Server Host Name and Port Pairs",
+          "NTP Servers' HOST_NAME:PORT pairs to use (valid only if ntp-sync is true)\n"
+          "\t\t\tUse ',' to separate each pair if there are more pairs than one",
+          DEFAULT_MQTT_NTP_SERVERS,
+          G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
+
   g_object_class_install_property (gobject_class, PROP_MQTT_PUB_TOPIC,
       g_param_spec_string ("pub-topic", "Topic to Publish",
           "The topic's name to publish", NULL,
@@ -345,6 +375,12 @@ gst_mqtt_sink_set_property (GObject * object, guint prop_id,
     case PROP_MQTT_QOS:
       gst_mqtt_sink_set_mqtt_qos (self, g_value_get_int (value));
       break;
+    case PROP_MQTT_NTP_SYNC:
+      gst_mqtt_sink_set_mqtt_ntp_sync (self, g_value_get_boolean (value));
+      break;
+    case PROP_MQTT_NTP_SRVS:
+      gst_mqtt_sink_set_mqtt_ntp_srvs (self, g_value_get_string (value));
+      break;
     default:
       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
       break;
@@ -394,6 +430,12 @@ gst_mqtt_sink_get_property (GObject * object, guint prop_id,
     case PROP_MQTT_QOS:
       g_value_set_int (value, gst_mqtt_sink_get_mqtt_qos (self));
       break;
+    case PROP_MQTT_NTP_SYNC:
+      g_value_set_boolean (value, gst_mqtt_sink_get_mqtt_ntp_sync (self));
+      break;
+    case PROP_MQTT_NTP_SRVS:
+      g_value_set_string (value, gst_mqtt_sink_get_mqtt_ntp_srvs (self));
+      break;
     default:
       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
       break;
@@ -422,6 +464,13 @@ gst_mqtt_sink_class_finalize (GObject * object)
   self->mqtt_topic = NULL;
   gst_caps_replace (&self->in_caps, NULL);
   g_free (self->mqtt_msg_buf);
+  g_free (self->mqtt_ntp_srvs);
+  self->mqtt_ntp_srvs = NULL;
+  self->mqtt_ntp_num_srvs = 0;
+  g_strfreev (self->mqtt_ntp_hnames);
+  self->mqtt_ntp_hnames = NULL;
+  g_free (self->mqtt_ntp_ports);
+  self->mqtt_ntp_ports = NULL;
 
   if (self->err)
     g_error_free (self->err);
@@ -455,6 +504,8 @@ gst_mqtt_sink_change_state (GstElement * element, GstStateChange transition)
       GST_INFO_OBJECT (self, "GST_STATE_CHANGE_READY_TO_PAUSED");
       break;
     case GST_STATE_CHANGE_PAUSED_TO_PLAYING:
+      if (self->mqtt_ntp_sync)
+        self->get_epoch_func = ntputil_get_epoch;
       self->base_time_epoch = GST_CLOCK_TIME_NONE;
       elem_clock = gst_element_get_clock (element);
       if (!elem_clock)
@@ -464,7 +515,8 @@ gst_mqtt_sink_change_state (GstElement * element, GstStateChange transition)
       gst_object_unref (elem_clock);
       diff = GST_CLOCK_DIFF (base_time, cur_time);
       self->base_time_epoch =
-          g_get_real_time () * GST_US_TO_NS_MULTIPLIER - diff;
+          self->get_epoch_func (self->mqtt_ntp_num_srvs, self->mqtt_ntp_hnames,
+          self->mqtt_ntp_ports) * GST_US_TO_NS_MULTIPLIER - diff;
       GST_INFO_OBJECT (self, "GST_STATE_CHANGE_PAUSED_TO_PLAYING");
       break;
     default:
@@ -609,7 +661,8 @@ _put_timestamp_to_msg_buf_hdr (GstMqttSink * self, GstBuffer * gst_buf,
     GstMQTTMessageHdr * hdr)
 {
   hdr->base_time_epoch = self->base_time_epoch;
-  hdr->sent_time_epoch = g_get_real_time () * GST_US_TO_NS_MULTIPLIER;
+  hdr->sent_time_epoch = self->get_epoch_func (self->mqtt_ntp_num_srvs,
+      self->mqtt_ntp_hnames, self->mqtt_ntp_ports) * GST_US_TO_NS_MULTIPLIER;
 
   hdr->duration = GST_BUFFER_DURATION_IS_VALID (gst_buf) ?
       GST_BUFFER_DURATION (gst_buf) : GST_CLOCK_TIME_NONE;
@@ -1067,6 +1120,104 @@ gst_mqtt_sink_set_mqtt_qos (GstMqttSink * self, const gint qos)
   self->mqtt_qos = qos;
 }
 
+/**
+ * @brief Getter for the 'ntp-sync' property.
+ */
+static gboolean
+gst_mqtt_sink_get_mqtt_ntp_sync (GstMqttSink * self)
+{
+  return self->mqtt_ntp_sync;
+}
+
+/**
+ * @brief Setter for the 'ntp-sync' property
+ */
+static void
+gst_mqtt_sink_set_mqtt_ntp_sync (GstMqttSink * self, const gboolean flag)
+{
+  self->mqtt_ntp_sync = flag;
+}
+
+/**
+ * @brief Getter for the 'ntp-srvs' property.
+ */
+static gchar *
+gst_mqtt_sink_get_mqtt_ntp_srvs (GstMqttSink * self)
+{
+  return self->mqtt_ntp_srvs;
+}
+
+/**
+ * @brief Setter for the 'ntp-srvs' property
+ */
+static void
+gst_mqtt_sink_set_mqtt_ntp_srvs (GstMqttSink * self, const gchar * pairs)
+{
+  gchar **pair_arrs = NULL;
+  guint hnum = 0;
+  gchar *pair;
+  guint i, j;
+
+  if (g_strcmp0 (self->mqtt_ntp_srvs, pairs) == 0)
+    return;
+
+  g_free (self->mqtt_ntp_srvs);
+  self->mqtt_ntp_srvs = g_strdup (pairs);
+
+  pair_arrs = g_strsplit (pairs, ",", -1);
+  if (pair_arrs == NULL)
+    return;
+
+  hnum = g_strv_length (pair_arrs);
+  if (hnum == 0)
+    goto err_free_pair_arrs;
+
+  g_free (self->mqtt_ntp_hnames);
+  self->mqtt_ntp_hnames =
+      g_try_malloc0 (hnum * sizeof (*self->mqtt_ntp_hnames));
+  if (!self->mqtt_ntp_hnames)
+    goto err_free_pair_arrs;
+
+  g_free (self->mqtt_ntp_ports);
+  self->mqtt_ntp_ports = g_try_malloc0 (hnum * sizeof (self->mqtt_ntp_ports));
+  if (!self->mqtt_ntp_ports)
+    goto err_free_mqtt_ntp_hnames;
+
+  self->mqtt_ntp_num_srvs = hnum;
+  for (i = 0, j = 0; i < hnum; i++) {
+    gchar **hname_port;
+    gchar *hname;
+    gchar *eport;
+    gulong port_ul;
+
+    pair = pair_arrs[i];
+    hname_port = g_strsplit (pair, ":", 2);
+    hname = hname_port[0];
+    port_ul = strtoul (hname_port[1], &eport, 10);
+    if ((port_ul == 0) || (port_ul > UINT16_MAX)) {
+      self->mqtt_ntp_num_srvs--;
+    } else {
+      self->mqtt_ntp_hnames[j] = g_strdup (hname);
+      self->mqtt_ntp_ports[j] = (uint16_t) port_ul;
+      ++j;
+    }
+
+    g_strfreev (hname_port);
+  }
+
+  g_strfreev (pair_arrs);
+  return;
+
+err_free_mqtt_ntp_hnames:
+  g_strfreev (self->mqtt_ntp_hnames);
+  self->mqtt_ntp_hnames = NULL;
+
+err_free_pair_arrs:
+  g_strfreev (pair_arrs);
+
+  return;
+}
+
 /** Callback function definitions */
 /**
  * @brief A callback function corresponding to MQTTAsync_connectOptions's
index 78d9ec6..f5f3482 100644 (file)
@@ -75,6 +75,13 @@ struct _GstMqttSink {
   mqtt_sink_state_t mqtt_sink_state;
   gboolean debug;
   gint mqtt_qos;
+  gboolean mqtt_ntp_sync;
+  guint mqtt_ntp_num_srvs;
+  gchar *mqtt_ntp_srvs;
+  gchar **mqtt_ntp_hnames;
+  guint16 *mqtt_ntp_ports;
+
+  mqtt_get_unix_epoch get_epoch_func;
 
   GstMQTTMessageHdr mqtt_msg_hdr;
   gpointer mqtt_msg_buf;