[MQTTsrc] add mqtt QoS as a property
authorHyoung Joo Ahn <hello.ahn@samsung.com>
Thu, 13 May 2021 10:28:24 +0000 (19:28 +0900)
committerwooksong <wook16.song@samsung.com>
Tue, 25 May 2021 02:40:40 +0000 (11:40 +0900)
MQTTsrc will return ack to server when it receives messages according to QoS level.

Signed-off-by: Hyoung Joo Ahn <hello.ahn@samsung.com>
gst/mqtt/mqttsrc.c
gst/mqtt/mqttsrc.h

index 9e00e1e..74a7a86 100644 (file)
@@ -49,6 +49,7 @@ enum
   PROP_MQTT_SUB_TIMEOUT,
   PROP_MQTT_OPT_CLEANSESSION,
   PROP_MQTT_OPT_KEEP_ALIVE_INTERVAL,
+  PROP_MQTT_QOS,
 
   PROP_LAST
 };
@@ -61,6 +62,7 @@ enum
   DEFAULT_MQTT_OPT_KEEP_ALIVE_INTERVAL = 60,    /* 1 minute */
   DEFAULT_MQTT_SUB_TIMEOUT = 10000000,  /* 10 seconds */
   DEFAULT_MQTT_SUB_TIMEOUT_MIN = 1000000,       /* 1 seconds */
+  DEFAULT_MQTT_QOS = 2,         /* Once and one only */
 };
 
 static guint8 src_client_id = 0;
@@ -118,6 +120,8 @@ static void gst_mqtt_src_set_opt_cleansession (GstMqttSrc * self,
 static gint gst_mqtt_src_get_opt_keep_alive_interval (GstMqttSrc * self);
 static void gst_mqtt_src_set_opt_keep_alive_interval (GstMqttSrc * self,
     const gint num);
+static gint gst_mqtt_src_get_mqtt_qos (GstMqttSrc * self);
+static void gst_mqtt_src_set_mqtt_qos (GstMqttSrc * self, const gint qos);
 
 static void cb_mqtt_on_connection_lost (void *context, char *cause);
 static int cb_mqtt_on_message_arrived (void *context, char *topic_name,
@@ -190,6 +194,7 @@ gst_mqtt_src_init (GstMqttSrc * self)
   self->mqtt_respn_opts.onSuccess = NULL;
   self->mqtt_respn_opts.onFailure = NULL;
   self->mqtt_respn_opts.context = self;
+  self->mqtt_qos = DEFAULT_MQTT_QOS;
 
   /** init private member variables */
   self->err = NULL;
@@ -272,6 +277,15 @@ gst_mqtt_src_class_init (GstMqttSrcClass * klass)
           1, G_MAXINT32, DEFAULT_MQTT_OPT_KEEP_ALIVE_INTERVAL,
           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
 
+  g_object_class_install_property (gobject_class, PROP_MQTT_QOS,
+      g_param_spec_int ("mqtt-qos", "mqtt QoS level",
+          "The QoS level of MQTT.\n"
+          "\t\t\t  0: At most once\n"
+          "\t\t\t  1: At least once\n"
+          "\t\t\t  2: Exactly once\n"
+          "\t\t\tsee also: https://www.eclipse.org/paho/files/mqttdoc/MQTTAsync/html/qos.html",
+          0, 2, DEFAULT_MQTT_QOS, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
+
   gstelement_class->change_state =
       GST_DEBUG_FUNCPTR (gst_mqtt_src_change_state);
 
@@ -328,6 +342,9 @@ gst_mqtt_src_set_property (GObject * object, guint prop_id,
     case PROP_MQTT_OPT_KEEP_ALIVE_INTERVAL:
       gst_mqtt_src_set_opt_keep_alive_interval (self, g_value_get_int (value));
       break;
+    case PROP_MQTT_QOS:
+      gst_mqtt_src_set_mqtt_qos (self, g_value_get_int (value));
+      break;
     default:
       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
       break;
@@ -371,6 +388,9 @@ gst_mqtt_src_get_property (GObject * object, guint prop_id,
     case PROP_MQTT_OPT_KEEP_ALIVE_INTERVAL:
       g_value_set_int (value, gst_mqtt_src_get_opt_keep_alive_interval (self));
       break;
+    case PROP_MQTT_QOS:
+      g_value_set_int (value, gst_mqtt_src_get_mqtt_qos (self));
+      break;
     default:
       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
       break;
@@ -978,6 +998,24 @@ gst_mqtt_src_set_opt_keep_alive_interval (GstMqttSrc * self, const gint num)
 }
 
 /**
+ * @brief Getter for the 'mqtt-qos' property
+ */
+static gint
+gst_mqtt_src_get_mqtt_qos (GstMqttSrc * self)
+{
+  return self->mqtt_qos;
+}
+
+/**
+ * @brief Setter for the 'mqtt-qos' property
+ */
+static void
+gst_mqtt_src_set_mqtt_qos (GstMqttSrc * self, const gint qos)
+{
+  self->mqtt_qos = qos;
+}
+
+/**
   * @brief A callback to handle the connection lost to the broker
   */
 static void
@@ -1243,9 +1281,8 @@ _subscribe (GstMqttSrc * self)
   opts.onFailure = cb_mqtt_on_subscribe_failure;
   opts.subscribeOptions.retainHandling = 1;
 
-  /** @todo Support QoS option */
   mqttasync_ret = MQTTAsync_subscribe (self->mqtt_client_handle,
-      self->mqtt_topic, 0, &opts);
+      self->mqtt_topic, self->mqtt_qos, &opts);
   if (mqttasync_ret != MQTTASYNC_SUCCESS)
     return FALSE;
   return TRUE;
index 7cdd450..1be507e 100644 (file)
@@ -58,6 +58,7 @@ struct _GstMqttSrc {
   gboolean debug;
   gboolean is_live;
   guint64 num_dumped;
+  gint mqtt_qos;
 
   GAsyncQueue *aqueue;
   GMutex mqtt_src_mutex;