[MQTT] Implement mqtt using mosquitto lib
authorgichan <gichan2.jang@samsung.com>
Mon, 17 Oct 2022 09:41:50 +0000 (18:41 +0900)
committerMyungJoo Ham <myungjoo.ham@samsung.com>
Mon, 31 Oct 2022 10:10:44 +0000 (19:10 +0900)
Implement mqtt using moosquitto lib.

Signed-off-by: gichan <gichan2.jang@samsung.com>
src/libnnstreamer-edge/nnstreamer-edge-mqtt-mosquitto.c
tests/unittest_nnstreamer-edge.cc

index 291a13e63eab89b581f480d08834202f0a8ab8ab..fa52859133d595263df18baf846be8584b22f688 100644 (file)
@@ -10,6 +10,7 @@
  * @bug    No known bugs except for NYI items
  */
 
+#include <mosquitto.h>
 #include "nnstreamer-edge-internal.h"
 #include "nnstreamer-edge-log.h"
 #include "nnstreamer-edge-util.h"
@@ -23,8 +24,102 @@ typedef struct
   void *mqtt_h;
   nns_edge_queue_h server_list;
   char *topic;
+  bool connected;
 } nns_edge_broker_s;
 
+/**
+ * @brief Callback function to be called when a message is arrived.
+ */
+static void
+on_message_callback (struct mosquitto *client, void *data,
+    const struct mosquitto_message *message)
+{
+  nns_edge_broker_s *bh = (nns_edge_broker_s *) data;
+  char *msg = NULL;
+
+  if (!bh) {
+    nns_edge_loge ("Invalid param, given broker handle is invalid.");
+    return;
+  }
+
+  if (0 >= message->payloadlen) {
+    nns_edge_logw ("Invalid payload lenth: %d", message->payloadlen);
+    return;
+  }
+
+  nns_edge_logd ("MQTT message is arrived (ID:%d, Topic:%s).",
+      message->mid, message->topic);
+
+  msg = nns_edge_memdup (message->payload, message->payloadlen);
+  if (msg)
+    nns_edge_queue_push (bh->server_list, msg, nns_edge_free);
+
+  return;
+}
+
+/**
+ * @brief Initializes MQTT object.
+ */
+static int
+_nns_edge_mqtt_init_client (nns_edge_handle_s * eh, const char *topic)
+{
+  nns_edge_broker_s *bh;
+  int mret;
+  char *client_id;
+  struct mosquitto *handle;
+  int ver = MQTT_PROTOCOL_V311;
+
+  mosquitto_lib_init ();
+
+  bh = (nns_edge_broker_s *) calloc (1, sizeof (nns_edge_broker_s));
+  if (!bh) {
+    nns_edge_loge ("Failed to allocate memory for broker handle.");
+    return NNS_EDGE_ERROR_OUT_OF_MEMORY;
+  }
+
+  client_id = nns_edge_strdup_printf ("nns_edge_%s_%u", eh->id, getpid ());
+
+  handle = mosquitto_new (client_id, TRUE, NULL);
+  SAFE_FREE (client_id);
+
+  if (!handle) {
+    nns_edge_loge ("Failed to create mosquitto client instance.");
+    SAFE_FREE (bh);
+    mosquitto_lib_cleanup ();
+    return NNS_EDGE_ERROR_UNKNOWN;
+  }
+
+  mosquitto_user_data_set (handle, bh);
+
+  mret = mosquitto_opts_set (handle, MOSQ_OPT_PROTOCOL_VERSION, &ver);
+  if (MOSQ_ERR_SUCCESS != mret) {
+    nns_edge_loge ("Failed to set MQTT protocol version 3.1.1.");
+    goto error;
+  }
+
+  mosquitto_message_callback_set (handle, on_message_callback);
+
+  mret = mosquitto_loop_start (handle);
+  if (mret != MOSQ_ERR_SUCCESS) {
+    nns_edge_loge ("Failed to start mosquitto loop.");
+    goto error;
+  }
+
+  bh->topic = nns_edge_strdup (topic);
+  bh->mqtt_h = handle;
+  bh->connected = false;
+  nns_edge_queue_create (&bh->server_list);
+  eh->broker_h = bh;
+
+  return NNS_EDGE_ERROR_NONE;
+
+error:
+  SAFE_FREE (bh);
+  mosquitto_destroy (handle);
+  mosquitto_lib_cleanup ();
+  return NNS_EDGE_ERROR_UNKNOWN;
+}
+
 /**
  * @brief Connect to MQTT.
  * @note This is internal function for MQTT broker. You should call this with edge-handle lock.
@@ -32,9 +127,47 @@ typedef struct
 int
 nns_edge_mqtt_connect (nns_edge_h edge_h, const char *topic)
 {
-  UNUSED (edge_h);
-  UNUSED (topic);
-  return NNS_EDGE_ERROR_NOT_SUPPORTED;
+  nns_edge_handle_s *eh;
+  nns_edge_broker_s *bh;
+  int ret = NNS_EDGE_ERROR_NONE;
+  struct mosquitto *handle;
+
+  if (!STR_IS_VALID (topic)) {
+    nns_edge_loge ("Invalid param, given topic is invalid.");
+    return NNS_EDGE_ERROR_INVALID_PARAMETER;
+  }
+
+  eh = (nns_edge_handle_s *) edge_h;
+
+  if (!NNS_EDGE_MAGIC_IS_VALID (eh)) {
+    nns_edge_loge ("Invalid param, given edge handle is invalid.");
+    return NNS_EDGE_ERROR_INVALID_PARAMETER;
+  }
+
+  nns_edge_logi ("Trying to connect MQTT (ID:%s, URL:%s:%d).",
+      eh->id, eh->dest_host, eh->dest_port);
+
+  if (NNS_EDGE_ERROR_NONE != _nns_edge_mqtt_init_client (eh, topic)) {
+    nns_edge_loge ("Failed to initialize the mqtt client objects.");
+    return NNS_EDGE_ERROR_CONNECTION_FAILURE;
+  }
+
+  bh = (nns_edge_broker_s *) eh->broker_h;
+  handle = bh->mqtt_h;
+
+  if (MOSQ_ERR_SUCCESS != mosquitto_connect (handle, eh->dest_host,
+          eh->dest_port, 60)) {
+    nns_edge_loge ("Failed to connect MQTT.");
+    ret = NNS_EDGE_ERROR_CONNECTION_FAILURE;
+    goto error;
+  }
+
+  bh->connected = true;
+  return NNS_EDGE_ERROR_NONE;
+
+error:
+  nns_edge_mqtt_close (eh);
+  return ret;
 }
 
 /**
@@ -44,8 +177,39 @@ nns_edge_mqtt_connect (nns_edge_h edge_h, const char *topic)
 int
 nns_edge_mqtt_close (nns_edge_h edge_h)
 {
-  UNUSED (edge_h);
-  return NNS_EDGE_ERROR_NOT_SUPPORTED;
+  nns_edge_handle_s *eh;
+  nns_edge_broker_s *bh;
+  struct mosquitto *handle;
+
+  eh = (nns_edge_handle_s *) edge_h;
+
+  if (!NNS_EDGE_MAGIC_IS_VALID (eh) || !eh->broker_h) {
+    nns_edge_loge ("Invalid param, given edge handle is invalid.");
+    return NNS_EDGE_ERROR_INVALID_PARAMETER;
+  }
+
+  bh = (nns_edge_broker_s *) eh->broker_h;
+  handle = bh->mqtt_h;
+
+  if (handle) {
+    nns_edge_logi ("Trying to disconnect MQTT (ID:%s, URL:%s:%d).",
+        eh->id, eh->dest_host, eh->dest_port);
+
+    /* Clear retained message */
+    mosquitto_publish (handle, NULL, bh->topic, 0, NULL, 1, true);
+
+    mosquitto_disconnect (handle);
+    mosquitto_destroy (handle);
+    mosquitto_lib_cleanup ();
+  }
+
+  nns_edge_queue_destroy (bh->server_list);
+  bh->server_list = NULL;
+  SAFE_FREE (bh->topic);
+  SAFE_FREE (bh);
+  eh->broker_h = NULL;
+
+  return NNS_EDGE_ERROR_NONE;
 }
 
 /**
@@ -55,10 +219,45 @@ nns_edge_mqtt_close (nns_edge_h edge_h)
 int
 nns_edge_mqtt_publish (nns_edge_h edge_h, const void *data, const int length)
 {
-  UNUSED (edge_h);
-  UNUSED (data);
-  UNUSED (length);
-  return NNS_EDGE_ERROR_NOT_SUPPORTED;
+  nns_edge_handle_s *eh;
+  nns_edge_broker_s *bh;
+  struct mosquitto *handle;
+  int ret;
+
+  eh = (nns_edge_handle_s *) edge_h;
+
+  if (!NNS_EDGE_MAGIC_IS_VALID (eh) || !eh->broker_h) {
+    nns_edge_loge ("Invalid param, given edge handle is invalid.");
+    return NNS_EDGE_ERROR_INVALID_PARAMETER;
+  }
+
+  if (!data || length <= 0) {
+    nns_edge_loge ("Invalid param, given data is invalid.");
+    return NNS_EDGE_ERROR_INVALID_PARAMETER;
+  }
+
+  bh = (nns_edge_broker_s *) eh->broker_h;
+  handle = bh->mqtt_h;
+
+  if (!handle) {
+    nns_edge_loge ("Invalid state, MQTT connection was not completed.");
+    return NNS_EDGE_ERROR_INVALID_PARAMETER;
+  }
+
+  if (!bh->connected) {
+    nns_edge_loge ("Failed to publish message, MQTT is not connected.");
+    return NNS_EDGE_ERROR_IO;
+  }
+
+  /* Publish a message (default QoS 1 - at least once and retained true). */
+  ret = mosquitto_publish (handle, NULL, bh->topic, length, data, 1, true);
+  if (MOSQ_ERR_SUCCESS != ret) {
+    nns_edge_loge ("Failed to publish a message (ID:%s, Topic:%s).",
+        eh->id, eh->topic);
+    return NNS_EDGE_ERROR_IO;
+  }
+
+  return NNS_EDGE_ERROR_NONE;
 }
 
 /**
@@ -68,27 +267,91 @@ nns_edge_mqtt_publish (nns_edge_h edge_h, const void *data, const int length)
 int
 nns_edge_mqtt_subscribe (nns_edge_h edge_h)
 {
-  UNUSED (edge_h);
-  return NNS_EDGE_ERROR_NOT_SUPPORTED;
+  nns_edge_handle_s *eh;
+  nns_edge_broker_s *bh;
+  void *handle;
+  int ret;
+
+  eh = (nns_edge_handle_s *) edge_h;
+
+  if (!NNS_EDGE_MAGIC_IS_VALID (eh) || !eh->broker_h) {
+    nns_edge_loge ("Invalid param, given edge handle is invalid.");
+    return NNS_EDGE_ERROR_INVALID_PARAMETER;
+  }
+
+  bh = (nns_edge_broker_s *) eh->broker_h;
+  handle = bh->mqtt_h;
+
+  if (!handle) {
+    nns_edge_loge ("Invalid state, MQTT connection was not completed.");
+    return NNS_EDGE_ERROR_INVALID_PARAMETER;
+  }
+
+  if (!bh->connected) {
+    nns_edge_loge ("Failed to subscribe, MQTT is not connected.");
+    return NNS_EDGE_ERROR_IO;
+  }
+
+  /* Subscribe a topic (default QoS 1 - at least once). */
+  ret = mosquitto_subscribe (handle, NULL, bh->topic, 1);
+  if (MOSQ_ERR_SUCCESS != ret) {
+    nns_edge_loge ("Failed to subscribe a topic (ID:%s, Topic:%s).",
+        eh->id, eh->topic);
+    return NNS_EDGE_ERROR_IO;
+  }
+
+  return NNS_EDGE_ERROR_NONE;
 }
 
 /**
- * @brief Check mqtt connection
+ * @brief Get message from mqtt broker.
  */
-bool
-nns_edge_mqtt_is_connected (nns_edge_h edge_h)
+int
+nns_edge_mqtt_get_message (nns_edge_h edge_h, char **msg)
 {
-  UNUSED (edge_h);
-  return false;
+  nns_edge_handle_s *eh;
+  nns_edge_broker_s *bh;
+
+  eh = (nns_edge_handle_s *) edge_h;
+
+  if (!NNS_EDGE_MAGIC_IS_VALID (eh) || !eh->broker_h) {
+    nns_edge_loge ("Invalid param, given edge handle is invalid.");
+    return NNS_EDGE_ERROR_INVALID_PARAMETER;
+  }
+
+  if (!msg) {
+    nns_edge_loge ("Invalid param, given msg param is invalid.");
+    return NNS_EDGE_ERROR_INVALID_PARAMETER;
+  }
+
+  bh = (nns_edge_broker_s *) eh->broker_h;
+
+  /* Wait for 1 second */
+  if (!nns_edge_queue_wait_pop (bh->server_list, 1000U, (void **) msg)) {
+    nns_edge_loge ("Failed to get message from mqtt broker within timeout.");
+    return NNS_EDGE_ERROR_UNKNOWN;
+  }
+
+  return NNS_EDGE_ERROR_NONE;
 }
 
 /**
- * @brief Get message from mqtt broker.
+ * @brief Check mqtt connection
  */
-int
-nns_edge_mqtt_get_message (nns_edge_h edge_h, char **msg)
+bool
+nns_edge_mqtt_is_connected (nns_edge_h edge_h)
 {
-  UNUSED (edge_h);
-  UNUSED (msg);
-  return NNS_EDGE_ERROR_NOT_SUPPORTED;
+  nns_edge_handle_s *eh;
+  nns_edge_broker_s *bh;
+
+  eh = (nns_edge_handle_s *) edge_h;
+
+  if (!NNS_EDGE_MAGIC_IS_VALID (eh) || !eh->broker_h) {
+    nns_edge_loge ("Invalid param, given edge handle is invalid.");
+    return false;
+  }
+
+  bh = (nns_edge_broker_s *) eh->broker_h;
+
+  return bh->connected;
 }
index 8748969310eec4d51283a3a41a00dbd354f0f92d..3ceabd040257330ed0d43e1b306fa5e2505aab4a 100644 (file)
@@ -3548,7 +3548,7 @@ TEST(edgeMqtt, connectLocal)
   nns_edge_set_event_callback (server_h, _test_edge_hybrid_event_cb, _td_server);
   nns_edge_set_info (server_h, "HOST", "localhost");
   nns_edge_set_info (server_h, "PORT", "0");
-  nns_edge_set_info (server_h, "DEST_HOST", "tcp://localhost");
+  nns_edge_set_info (server_h, "DEST_HOST", "127.0.0.1");
   nns_edge_set_info (server_h, "DEST_PORT", "1883");
   nns_edge_set_info (server_h, "TOPIC", "temp-mqtt-topic");
   nns_edge_set_info (server_h, "CAPS", "test server");
@@ -3571,7 +3571,7 @@ TEST(edgeMqtt, connectLocal)
 
   usleep (200000);
 
-  ret = nns_edge_connect (client_h, "tcp://localhost", 1883);
+  ret = nns_edge_connect (client_h, "127.0.0.1", 1883);
   EXPECT_EQ (ret, NNS_EDGE_ERROR_NONE);
   usleep (10000);
 
@@ -3655,7 +3655,7 @@ TEST(edgeMqtt, connectInvalidParam2_n)
   ret = nns_edge_create_handle ("temp-server", NNS_EDGE_CONNECT_TYPE_HYBRID,
       NNS_EDGE_NODE_TYPE_QUERY_SERVER, &edge_h);
   EXPECT_EQ (ret, NNS_EDGE_ERROR_NONE);
-  nns_edge_set_info (edge_h, "DEST_HOST", "tcp://localhost");
+  nns_edge_set_info (edge_h, "DEST_HOST", "127.0.0.1");
   nns_edge_set_info (edge_h, "DEST_PORT", "1883");
 
   ret = nns_edge_mqtt_connect (edge_h, NULL);
@@ -3679,7 +3679,7 @@ TEST(edgeMqtt, connectInvalidParam3_n)
   ret = nns_edge_create_handle ("temp-server", NNS_EDGE_CONNECT_TYPE_HYBRID,
       NNS_EDGE_NODE_TYPE_QUERY_SERVER, &edge_h);
   EXPECT_EQ (ret, NNS_EDGE_ERROR_NONE);
-  nns_edge_set_info (edge_h, "DEST_HOST", "tcp://localhost");
+  nns_edge_set_info (edge_h, "DEST_HOST", "127.0.0.1");
   nns_edge_set_info (edge_h, "DEST_PORT", "1883");
 
   ret = nns_edge_mqtt_connect (edge_h, "");
@@ -3779,7 +3779,7 @@ TEST(edgeMqtt, publishInvalidParam2_n)
   ret = nns_edge_create_handle ("temp-server", NNS_EDGE_CONNECT_TYPE_HYBRID,
       NNS_EDGE_NODE_TYPE_QUERY_SERVER, &edge_h);
   EXPECT_EQ (ret, NNS_EDGE_ERROR_NONE);
-  nns_edge_set_info (edge_h, "DEST_HOST", "tcp://localhost");
+  nns_edge_set_info (edge_h, "DEST_HOST", "127.0.0.1");
   nns_edge_set_info (edge_h, "DEST_PORT", "1883");
 
   ret = nns_edge_start (edge_h);
@@ -3808,7 +3808,7 @@ TEST(edgeMqtt, publishInvalidParam3_n)
   ret = nns_edge_create_handle ("temp-server", NNS_EDGE_CONNECT_TYPE_HYBRID,
       NNS_EDGE_NODE_TYPE_QUERY_SERVER, &edge_h);
   EXPECT_EQ (ret, NNS_EDGE_ERROR_NONE);
-  nns_edge_set_info (edge_h, "DEST_HOST", "tcp://localhost");
+  nns_edge_set_info (edge_h, "DEST_HOST", "127.0.0.1");
   nns_edge_set_info (edge_h, "DEST_PORT", "1883");
 
   ret = nns_edge_start (edge_h);
@@ -3837,7 +3837,7 @@ TEST(edgeMqtt, publishInvalidParam4_n)
   ret = nns_edge_create_handle ("temp-server", NNS_EDGE_CONNECT_TYPE_HYBRID,
       NNS_EDGE_NODE_TYPE_QUERY_SERVER, &edge_h);
   EXPECT_EQ (ret, NNS_EDGE_ERROR_NONE);
-  nns_edge_set_info (edge_h, "DEST_HOST", "tcp://localhost");
+  nns_edge_set_info (edge_h, "DEST_HOST", "127.0.0.1");
   nns_edge_set_info (edge_h, "DEST_PORT", "1883");
 
   ret = nns_edge_mqtt_publish (edge_h, msg, strlen (msg) + 1);
@@ -3912,7 +3912,7 @@ TEST(edgeMqtt, getMessageInvalidParam2_n)
   ret = nns_edge_create_handle ("temp-server", NNS_EDGE_CONNECT_TYPE_HYBRID,
       NNS_EDGE_NODE_TYPE_QUERY_SERVER, &edge_h);
   EXPECT_EQ (ret, NNS_EDGE_ERROR_NONE);
-  nns_edge_set_info (edge_h, "DEST_HOST", "tcp://localhost");
+  nns_edge_set_info (edge_h, "DEST_HOST", "127.0.0.1");
   nns_edge_set_info (edge_h, "DEST_PORT", "1883");
 
   ret = nns_edge_start (edge_h);
@@ -3940,7 +3940,7 @@ TEST(edgeMqtt, getMessageWithinTimeout_n)
   ret = nns_edge_create_handle ("temp-server", NNS_EDGE_CONNECT_TYPE_HYBRID,
       NNS_EDGE_NODE_TYPE_QUERY_SERVER, &edge_h);
   EXPECT_EQ (ret, NNS_EDGE_ERROR_NONE);
-  nns_edge_set_info (edge_h, "DEST_HOST", "tcp://localhost");
+  nns_edge_set_info (edge_h, "DEST_HOST", "127.0.0.1");
   nns_edge_set_info (edge_h, "DEST_PORT", "1883");
 
   ret = nns_edge_mqtt_connect (edge_h, "temp-mqtt-topic");