From d1bf683eee3be4f9994e7503aaad6feb113684c0 Mon Sep 17 00:00:00 2001 From: Hyoung Joo Ahn Date: Thu, 13 May 2021 19:28:24 +0900 Subject: [PATCH] [MQTTsrc] add mqtt QoS as a property MQTTsrc will return ack to server when it receives messages according to QoS level. Signed-off-by: Hyoung Joo Ahn --- gst/mqtt/mqttsrc.c | 41 +++++++++++++++++++++++++++++++++++++++-- gst/mqtt/mqttsrc.h | 1 + 2 files changed, 40 insertions(+), 2 deletions(-) diff --git a/gst/mqtt/mqttsrc.c b/gst/mqtt/mqttsrc.c index 9e00e1e..74a7a86 100644 --- a/gst/mqtt/mqttsrc.c +++ b/gst/mqtt/mqttsrc.c @@ -49,6 +49,7 @@ enum PROP_MQTT_SUB_TIMEOUT, PROP_MQTT_OPT_CLEANSESSION, PROP_MQTT_OPT_KEEP_ALIVE_INTERVAL, + PROP_MQTT_QOS, PROP_LAST }; @@ -61,6 +62,7 @@ enum DEFAULT_MQTT_OPT_KEEP_ALIVE_INTERVAL = 60, /* 1 minute */ DEFAULT_MQTT_SUB_TIMEOUT = 10000000, /* 10 seconds */ DEFAULT_MQTT_SUB_TIMEOUT_MIN = 1000000, /* 1 seconds */ + DEFAULT_MQTT_QOS = 2, /* Once and one only */ }; static guint8 src_client_id = 0; @@ -118,6 +120,8 @@ static void gst_mqtt_src_set_opt_cleansession (GstMqttSrc * self, static gint gst_mqtt_src_get_opt_keep_alive_interval (GstMqttSrc * self); static void gst_mqtt_src_set_opt_keep_alive_interval (GstMqttSrc * self, const gint num); +static gint gst_mqtt_src_get_mqtt_qos (GstMqttSrc * self); +static void gst_mqtt_src_set_mqtt_qos (GstMqttSrc * self, const gint qos); static void cb_mqtt_on_connection_lost (void *context, char *cause); static int cb_mqtt_on_message_arrived (void *context, char *topic_name, @@ -190,6 +194,7 @@ gst_mqtt_src_init (GstMqttSrc * self) self->mqtt_respn_opts.onSuccess = NULL; self->mqtt_respn_opts.onFailure = NULL; self->mqtt_respn_opts.context = self; + self->mqtt_qos = DEFAULT_MQTT_QOS; /** init private member variables */ self->err = NULL; @@ -272,6 +277,15 @@ gst_mqtt_src_class_init (GstMqttSrcClass * klass) 1, G_MAXINT32, DEFAULT_MQTT_OPT_KEEP_ALIVE_INTERVAL, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); + g_object_class_install_property (gobject_class, PROP_MQTT_QOS, + g_param_spec_int ("mqtt-qos", "mqtt QoS level", + "The QoS level of MQTT.\n" + "\t\t\t 0: At most once\n" + "\t\t\t 1: At least once\n" + "\t\t\t 2: Exactly once\n" + "\t\t\tsee also: https://www.eclipse.org/paho/files/mqttdoc/MQTTAsync/html/qos.html", + 0, 2, DEFAULT_MQTT_QOS, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); + gstelement_class->change_state = GST_DEBUG_FUNCPTR (gst_mqtt_src_change_state); @@ -328,6 +342,9 @@ gst_mqtt_src_set_property (GObject * object, guint prop_id, case PROP_MQTT_OPT_KEEP_ALIVE_INTERVAL: gst_mqtt_src_set_opt_keep_alive_interval (self, g_value_get_int (value)); break; + case PROP_MQTT_QOS: + gst_mqtt_src_set_mqtt_qos (self, g_value_get_int (value)); + break; default: G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); break; @@ -371,6 +388,9 @@ gst_mqtt_src_get_property (GObject * object, guint prop_id, case PROP_MQTT_OPT_KEEP_ALIVE_INTERVAL: g_value_set_int (value, gst_mqtt_src_get_opt_keep_alive_interval (self)); break; + case PROP_MQTT_QOS: + g_value_set_int (value, gst_mqtt_src_get_mqtt_qos (self)); + break; default: G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); break; @@ -978,6 +998,24 @@ gst_mqtt_src_set_opt_keep_alive_interval (GstMqttSrc * self, const gint num) } /** + * @brief Getter for the 'mqtt-qos' property + */ +static gint +gst_mqtt_src_get_mqtt_qos (GstMqttSrc * self) +{ + return self->mqtt_qos; +} + +/** + * @brief Setter for the 'mqtt-qos' property + */ +static void +gst_mqtt_src_set_mqtt_qos (GstMqttSrc * self, const gint qos) +{ + self->mqtt_qos = qos; +} + +/** * @brief A callback to handle the connection lost to the broker */ static void @@ -1243,9 +1281,8 @@ _subscribe (GstMqttSrc * self) opts.onFailure = cb_mqtt_on_subscribe_failure; opts.subscribeOptions.retainHandling = 1; - /** @todo Support QoS option */ mqttasync_ret = MQTTAsync_subscribe (self->mqtt_client_handle, - self->mqtt_topic, 0, &opts); + self->mqtt_topic, self->mqtt_qos, &opts); if (mqttasync_ret != MQTTASYNC_SUCCESS) return FALSE; return TRUE; diff --git a/gst/mqtt/mqttsrc.h b/gst/mqtt/mqttsrc.h index 7cdd450..1be507e 100644 --- a/gst/mqtt/mqttsrc.h +++ b/gst/mqtt/mqttsrc.h @@ -58,6 +58,7 @@ struct _GstMqttSrc { gboolean debug; gboolean is_live; guint64 num_dumped; + gint mqtt_qos; GAsyncQueue *aqueue; GMutex mqtt_src_mutex; -- 2.7.4