#include <nnstreamer_util.h>
#include "mqttsink.h"
+#include "ntputil.h"
static GstStaticPadTemplate sink_pad_template = GST_STATIC_PAD_TEMPLATE ("sink",
GST_PAD_SINK, GST_PAD_ALWAYS, GST_STATIC_CAPS_ANY);
PROP_NUM_BUFFERS,
PROP_MAX_MSG_BUF_SIZE,
PROP_MQTT_QOS,
+ PROP_MQTT_NTP_SYNC,
+ PROP_MQTT_NTP_SRVS,
PROP_LAST
};
DEFAULT_MQTT_PUB_WAIT_TIMEOUT = 1, /* 1 secs */
DEFAULT_MAX_MSG_BUF_SIZE = 0, /* Buffer size is not fixed */
DEFAULT_MQTT_QOS = 0, /* fire and forget */
+ DEFAULT_MQTT_NTP_SYNC = FALSE,
+ MAX_LEN_PROP_NTP_SRVS = 4096,
};
static guint8 sink_client_id = 0;
static const gchar DEFAULT_MQTT_CLIENT_ID_FORMAT[] = "%s_%u_sink%u";
static const gchar DEFAULT_MQTT_PUB_TOPIC[] = "$client-id/topic";
static const gchar DEFAULT_MQTT_PUB_TOPIC_FORMAT[] = "%s/topic";
+static const gchar DEFAULT_MQTT_NTP_SERVERS[] = "pool.ntp.org:123";
/** Function prototype declarations */
static void
static void gst_mqtt_sink_set_num_buffers (GstMqttSink * self, const gint num);
static gint gst_mqtt_sink_get_mqtt_qos (GstMqttSink * self);
static void gst_mqtt_sink_set_mqtt_qos (GstMqttSink * self, const gint qos);
+static gboolean gst_mqtt_sink_get_mqtt_ntp_sync (GstMqttSink * self);
+static void gst_mqtt_sink_set_mqtt_ntp_sync (GstMqttSink * self,
+ const gboolean flag);
+static gchar *gst_mqtt_sink_get_mqtt_ntp_srvs (GstMqttSink * self);
+static void gst_mqtt_sink_set_mqtt_ntp_srvs (GstMqttSink * self,
+ const gchar * pairs);
static void cb_mqtt_on_connect (void *context,
MQTTAsync_successData * response);
self->mqtt_conn_opts.cleansession = DEFAULT_MQTT_OPT_CLEANSESSION;
self->mqtt_conn_opts.keepAliveInterval = DEFAULT_MQTT_OPT_KEEP_ALIVE_INTERVAL;
self->mqtt_qos = DEFAULT_MQTT_QOS;
+ self->mqtt_ntp_sync = DEFAULT_MQTT_NTP_SYNC;
+ self->mqtt_ntp_srvs = g_strdup (DEFAULT_MQTT_NTP_SERVERS);
+ self->mqtt_ntp_hnames = NULL;
+ self->mqtt_ntp_ports = NULL;
+ self->mqtt_ntp_num_srvs = 0;
+ self->get_epoch_func = default_mqtt_get_unix_epoch;
/** init basesink properties */
gst_base_sink_set_qos_enabled (basesink, DEFAULT_QOS);
"Network port of host (broker) to connect to", DEFAULT_MQTT_HOST_PORT,
G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
+ g_object_class_install_property (gobject_class, PROP_MQTT_NTP_SYNC,
+ g_param_spec_boolean ("ntp-sync", "NTP Synchronization",
+ "Synchronize received streams to the NTP clock",
+ DEFAULT_MQTT_NTP_SYNC, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
+
+ g_object_class_install_property (gobject_class, PROP_MQTT_NTP_SRVS,
+ g_param_spec_string ("ntp-srvs", "NTP Server Host Name and Port Pairs",
+ "NTP Servers' HOST_NAME:PORT pairs to use (valid only if ntp-sync is true)\n"
+ "\t\t\tUse ',' to separate each pair if there are more pairs than one",
+ DEFAULT_MQTT_NTP_SERVERS,
+ G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
+
g_object_class_install_property (gobject_class, PROP_MQTT_PUB_TOPIC,
g_param_spec_string ("pub-topic", "Topic to Publish",
"The topic's name to publish", NULL,
case PROP_MQTT_QOS:
gst_mqtt_sink_set_mqtt_qos (self, g_value_get_int (value));
break;
+ case PROP_MQTT_NTP_SYNC:
+ gst_mqtt_sink_set_mqtt_ntp_sync (self, g_value_get_boolean (value));
+ break;
+ case PROP_MQTT_NTP_SRVS:
+ gst_mqtt_sink_set_mqtt_ntp_srvs (self, g_value_get_string (value));
+ break;
default:
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
break;
case PROP_MQTT_QOS:
g_value_set_int (value, gst_mqtt_sink_get_mqtt_qos (self));
break;
+ case PROP_MQTT_NTP_SYNC:
+ g_value_set_boolean (value, gst_mqtt_sink_get_mqtt_ntp_sync (self));
+ break;
+ case PROP_MQTT_NTP_SRVS:
+ g_value_set_string (value, gst_mqtt_sink_get_mqtt_ntp_srvs (self));
+ break;
default:
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
break;
self->mqtt_topic = NULL;
gst_caps_replace (&self->in_caps, NULL);
g_free (self->mqtt_msg_buf);
+ g_free (self->mqtt_ntp_srvs);
+ self->mqtt_ntp_srvs = NULL;
+ self->mqtt_ntp_num_srvs = 0;
+ g_strfreev (self->mqtt_ntp_hnames);
+ self->mqtt_ntp_hnames = NULL;
+ g_free (self->mqtt_ntp_ports);
+ self->mqtt_ntp_ports = NULL;
if (self->err)
g_error_free (self->err);
GST_INFO_OBJECT (self, "GST_STATE_CHANGE_READY_TO_PAUSED");
break;
case GST_STATE_CHANGE_PAUSED_TO_PLAYING:
+ if (self->mqtt_ntp_sync)
+ self->get_epoch_func = ntputil_get_epoch;
self->base_time_epoch = GST_CLOCK_TIME_NONE;
elem_clock = gst_element_get_clock (element);
if (!elem_clock)
gst_object_unref (elem_clock);
diff = GST_CLOCK_DIFF (base_time, cur_time);
self->base_time_epoch =
- g_get_real_time () * GST_US_TO_NS_MULTIPLIER - diff;
+ self->get_epoch_func (self->mqtt_ntp_num_srvs, self->mqtt_ntp_hnames,
+ self->mqtt_ntp_ports) * GST_US_TO_NS_MULTIPLIER - diff;
GST_INFO_OBJECT (self, "GST_STATE_CHANGE_PAUSED_TO_PLAYING");
break;
default:
GstMQTTMessageHdr * hdr)
{
hdr->base_time_epoch = self->base_time_epoch;
- hdr->sent_time_epoch = g_get_real_time () * GST_US_TO_NS_MULTIPLIER;
+ hdr->sent_time_epoch = self->get_epoch_func (self->mqtt_ntp_num_srvs,
+ self->mqtt_ntp_hnames, self->mqtt_ntp_ports) * GST_US_TO_NS_MULTIPLIER;
hdr->duration = GST_BUFFER_DURATION_IS_VALID (gst_buf) ?
GST_BUFFER_DURATION (gst_buf) : GST_CLOCK_TIME_NONE;
self->mqtt_qos = qos;
}
+/**
+ * @brief Getter for the 'ntp-sync' property.
+ */
+static gboolean
+gst_mqtt_sink_get_mqtt_ntp_sync (GstMqttSink * self)
+{
+ return self->mqtt_ntp_sync;
+}
+
+/**
+ * @brief Setter for the 'ntp-sync' property
+ */
+static void
+gst_mqtt_sink_set_mqtt_ntp_sync (GstMqttSink * self, const gboolean flag)
+{
+ self->mqtt_ntp_sync = flag;
+}
+
+/**
+ * @brief Getter for the 'ntp-srvs' property.
+ */
+static gchar *
+gst_mqtt_sink_get_mqtt_ntp_srvs (GstMqttSink * self)
+{
+ return self->mqtt_ntp_srvs;
+}
+
+/**
+ * @brief Setter for the 'ntp-srvs' property
+ */
+static void
+gst_mqtt_sink_set_mqtt_ntp_srvs (GstMqttSink * self, const gchar * pairs)
+{
+ gchar **pair_arrs = NULL;
+ guint hnum = 0;
+ gchar *pair;
+ guint i, j;
+
+ if (g_strcmp0 (self->mqtt_ntp_srvs, pairs) == 0)
+ return;
+
+ g_free (self->mqtt_ntp_srvs);
+ self->mqtt_ntp_srvs = g_strdup (pairs);
+
+ pair_arrs = g_strsplit (pairs, ",", -1);
+ if (pair_arrs == NULL)
+ return;
+
+ hnum = g_strv_length (pair_arrs);
+ if (hnum == 0)
+ goto err_free_pair_arrs;
+
+ g_free (self->mqtt_ntp_hnames);
+ self->mqtt_ntp_hnames =
+ g_try_malloc0 (hnum * sizeof (*self->mqtt_ntp_hnames));
+ if (!self->mqtt_ntp_hnames)
+ goto err_free_pair_arrs;
+
+ g_free (self->mqtt_ntp_ports);
+ self->mqtt_ntp_ports = g_try_malloc0 (hnum * sizeof (self->mqtt_ntp_ports));
+ if (!self->mqtt_ntp_ports)
+ goto err_free_mqtt_ntp_hnames;
+
+ self->mqtt_ntp_num_srvs = hnum;
+ for (i = 0, j = 0; i < hnum; i++) {
+ gchar **hname_port;
+ gchar *hname;
+ gchar *eport;
+ gulong port_ul;
+
+ pair = pair_arrs[i];
+ hname_port = g_strsplit (pair, ":", 2);
+ hname = hname_port[0];
+ port_ul = strtoul (hname_port[1], &eport, 10);
+ if ((port_ul == 0) || (port_ul > UINT16_MAX)) {
+ self->mqtt_ntp_num_srvs--;
+ } else {
+ self->mqtt_ntp_hnames[j] = g_strdup (hname);
+ self->mqtt_ntp_ports[j] = (uint16_t) port_ul;
+ ++j;
+ }
+
+ g_strfreev (hname_port);
+ }
+
+ g_strfreev (pair_arrs);
+ return;
+
+err_free_mqtt_ntp_hnames:
+ g_strfreev (self->mqtt_ntp_hnames);
+ self->mqtt_ntp_hnames = NULL;
+
+err_free_pair_arrs:
+ g_strfreev (pair_arrs);
+
+ return;
+}
+
/** Callback function definitions */
/**
* @brief A callback function corresponding to MQTTAsync_connectOptions's