[AITT] Support AITT as edge connection type
authorgichan <gichan2.jang@samsung.com>
Mon, 29 Aug 2022 07:24:56 +0000 (16:24 +0900)
committerMyungJoo Ham <myungjoo.ham@samsung.com>
Tue, 6 Sep 2022 02:42:44 +0000 (11:42 +0900)
Support AITT as edge connection type.

Signed-off-by: gichan <gichan2.jang@samsung.com>
CMakeLists.txt
include/nnstreamer-edge.h
packaging/nnstreamer-edge.spec
src/CMakeLists.txt
src/libnnstreamer-edge/nnstreamer-edge-aitt.c
src/libnnstreamer-edge/nnstreamer-edge-internal.c
src/libnnstreamer-edge/nnstreamer-edge-internal.h

index 8f7325a74e8f5451a55514c4637b01ad7d9de793..f5d6bb7ce0cb3c4c39eee66a32f616e28fb37545 100644 (file)
@@ -61,6 +61,19 @@ IF(ENABLE_PAHO_MQTT)
     SET(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -DENABLE_MQTT=1")
 ENDIF()
 
+# AITT Library
+SET(ENABLE_AITT OFF)
+FIND_LIBRARY(AITT_LIB NAMES aitt)
+IF(NOT AITT_LIB)
+    MESSAGE("Cannot find AITT library.")
+ELSE()
+    MESSAGE("Found AITT library.")
+    SET(ENABLE_AITT ON)
+    SET(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -DENABLE_AITT=1")
+    SET(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -DENABLE_AITT=1")
+    SET(AITT_INCLUDE_DIR  ${PREFIX}/include/aitt)
+ENDIF()
+
 ADD_SUBDIRECTORY(src)
 
 IF (ENABLE_TEST)
index 5ee202ed05d8841ad127c3e590308afaa9d39626..82a51bcaff50be9269077713e5926323542da3e3 100644 (file)
@@ -69,6 +69,7 @@ typedef enum {
   NNS_EDGE_CONNECT_TYPE_UDP,
   NNS_EDGE_CONNECT_TYPE_MQTT,
   NNS_EDGE_CONNECT_TYPE_HYBRID,
+  NNS_EDGE_CONNECT_TYPE_AITT,
 
   NNS_EDGE_CONNECT_TYPE_UNKNOWN
 } nns_edge_connect_type_e;
index 9adaf625437974acb1fef68913178048a8b660e8..3d5ae48550ecee885677eba26f1c719d69531a8b 100644 (file)
@@ -2,6 +2,7 @@
 
 # Default features for Tizen releases
 %define                mqtt_support 1
+%define     aitt_support 1
 
 %bcond_with tizen
 
@@ -29,6 +30,10 @@ BuildRequires:  pkgconfig(dlog)
 BuildRequires:  pkgconfig(paho-mqtt-c)
 %endif
 
+%if 0%{?aitt_support}
+BuildRequires:  aitt-devel
+%endif
+
 %if 0%{?unit_test}
 BuildRequires:  gtest-devel
 BuildRequires:  procps
index 54ac18f9b2bd602eff4ae9f53dc49a2174a37ab1..76b6bdf14425ccbe324d9b4c51d93a8853e3a585 100644 (file)
@@ -11,6 +11,10 @@ IF(ENABLE_PAHO_MQTT)
     SET(NNS_EDGE_SRCS ${NNS_EDGE_SRCS} ${NNS_EDGE_SRC_DIR}/nnstreamer-edge-mqtt.c)
 ENDIF()
 
+IF(ENABLE_AITT)
+    SET(NNS_EDGE_SRCS ${NNS_EDGE_SRCS} ${NNS_EDGE_SRC_DIR}/nnstreamer-edge-aitt.c)
+ENDIF()
+
 ADD_LIBRARY(${NNS_EDGE_LIB_NAME} SHARED ${NNS_EDGE_SRCS})
 SET_TARGET_PROPERTIES(${NNS_EDGE_LIB_NAME} PROPERTIES VERSION ${SO_VERSION})
 TARGET_INCLUDE_DIRECTORIES(${NNS_EDGE_LIB_NAME} PRIVATE ${INCLUDE_DIR} ${EDGE_REQUIRE_PKGS_INCLUDE_DIRS})
@@ -18,6 +22,10 @@ TARGET_LINK_LIBRARIES(${NNS_EDGE_LIB_NAME} ${EDGE_REQUIRE_PKGS_LDFLAGS})
 IF(ENABLE_PAHO_MQTT)
     TARGET_LINK_LIBRARIES(${NNS_EDGE_LIB_NAME} ${PAHO_MQTT_LIB})
 ENDIF()
+IF(ENABLE_AITT)
+    TARGET_LINK_LIBRARIES(${NNS_EDGE_LIB_NAME} ${AITT_LIB})
+    TARGET_INCLUDE_DIRECTORIES(${NNS_EDGE_LIB_NAME} PUBLIC ${AITT_INCLUDE_DIR})
+ENDIF()
 
 INSTALL (TARGETS ${NNS_EDGE_LIB_NAME} DESTINATION ${CMAKE_INSTALL_LIBDIR})
 INSTALL (FILES ${INCLUDE_DIR}/nnstreamer-edge.h DESTINATION ${INCLUDE_INSTALL_DIR})
index 3012406142db07928ba310925d333118dd677e38..64fc986717b24509f115ec3273e70a80c3dea5fe 100644 (file)
  * @bug    No known bugs except for NYI items
  */
 
+
+#include <stdbool.h>
 #include <aitt_c.h>
 #include "nnstreamer-edge-common.h"
+#include "nnstreamer-edge-internal.h"
+#include "nnstreamer-edge-util.h"
+#include "nnstreamer-edge-log.h"
 
 typedef void *nns_edge_aitt_h;
 typedef void *nns_edge_aitt_msg_h;
@@ -23,11 +28,224 @@ typedef void *nns_edge_aitt_sub_h;
  */
 typedef struct
 {
-  nns_edge_connect_type_e connect_type;
-  struct
-  {
-    nns_edge_aitt_h aitt_h;
-    nns_edge_aitt_msg_h msg_h;
-    nns_edge_aitt_sub_h sub_h;
-  };
-} nns_edge_handle_s;
+  nns_edge_aitt_h aitt_h;
+  nns_edge_aitt_msg_h msg_h;
+  nns_edge_aitt_sub_h sub_h;
+} nns_edge_aitt_handle_s;
+
+
+/**
+ * @brief Create AITT handle and connect to AITT.
+ * @note This is internal function for AITT. You should call this with edge-handle lock.
+ */
+int
+nns_edge_aitt_connect (nns_edge_h edge_h)
+{
+  nns_edge_handle_s *eh;
+  nns_edge_aitt_handle_s *ah;
+  aitt_option_h option;
+
+  eh = (nns_edge_handle_s *) edge_h;
+
+  if (!NNS_EDGE_MAGIC_IS_VALID (eh)) {
+    nns_edge_loge ("Invalid param, given edge handle is invalid.");
+    return NNS_EDGE_ERROR_INVALID_PARAMETER;
+  }
+
+  nns_edge_logd ("Create AITT instance: My address: %s:%d", eh->host, eh->port);
+
+  ah = (nns_edge_aitt_handle_s *) calloc (1, sizeof (nns_edge_aitt_handle_s));
+  if (!ah) {
+    nns_edge_loge ("Failed to allocate memory for AITT handle.");
+    return NNS_EDGE_ERROR_OUT_OF_MEMORY;
+  }
+
+  option = aitt_option_new ();
+  aitt_option_set (option, AITT_OPT_MY_IP, eh->host);
+
+  ah->aitt_h = aitt_new (eh->id, option);
+  if (!ah->aitt_h) {
+    nns_edge_loge ("Failed to create AITT handle. AITT internal error.");
+    SAFE_FREE (ah);
+    return NNS_EDGE_ERROR_UNKNOWN;
+  }
+
+  if (AITT_ERROR_NONE != aitt_connect (ah->aitt_h, eh->dest_host,
+          eh->dest_port)) {
+    nns_edge_loge ("Failed to connect to AITT. IP:port = %s:%d", eh->dest_host,
+        eh->dest_port);
+    aitt_destroy (ah->aitt_h);
+    SAFE_FREE (ah);
+    return NNS_EDGE_ERROR_UNKNOWN;
+  }
+  eh->broker_h = ah;
+
+  return NNS_EDGE_ERROR_NONE;
+}
+
+/**
+ * @brief Release the AITT handle.
+ * @note This is internal function for AITT. You should call this with edge-handle lock.
+ */
+int
+nns_edge_aitt_close (nns_edge_h edge_h)
+{
+  nns_edge_handle_s *eh;
+  nns_edge_aitt_handle_s *ah;
+
+  eh = (nns_edge_handle_s *) edge_h;
+
+  if (!NNS_EDGE_MAGIC_IS_VALID (eh) || !eh->broker_h) {
+    nns_edge_loge ("Invalid param, given edge handle is invalid.");
+    return NNS_EDGE_ERROR_INVALID_PARAMETER;
+  }
+
+  ah = (nns_edge_aitt_handle_s *) eh->broker_h;
+  if (AITT_ERROR_NONE != aitt_disconnect (ah->aitt_h)) {
+    nns_edge_loge ("Failed to close AITT handle.");
+    return NNS_EDGE_ERROR_UNKNOWN;
+  }
+  aitt_destroy (ah->aitt_h);
+  ah->aitt_h = NULL;
+  SAFE_FREE (eh->broker_h);
+  eh->broker_h = NULL;
+
+  return NNS_EDGE_ERROR_NONE;
+}
+
+/**
+ * @brief Publish raw data.
+ * @note This is internal function forAITT. You should call this with edge-handle lock.
+ */
+int
+nns_edge_aitt_publish (nns_edge_h edge_h, const void *data, const int length)
+{
+  nns_edge_handle_s *eh;
+  nns_edge_aitt_handle_s *ah;
+
+  eh = (nns_edge_handle_s *) edge_h;
+
+  if (!NNS_EDGE_MAGIC_IS_VALID (eh) || !eh->broker_h) {
+    nns_edge_loge ("Invalid param, given edge handle is invalid.");
+    return NNS_EDGE_ERROR_INVALID_PARAMETER;
+  }
+
+  if (!data || length <= 0) {
+    nns_edge_loge ("Invalid param, given data is invalid.");
+    return NNS_EDGE_ERROR_INVALID_PARAMETER;
+  }
+
+  ah = (nns_edge_aitt_handle_s *) eh->broker_h;
+
+  if (AITT_ERROR_NONE != aitt_publish (ah->aitt_h, eh->topic, data, length)) {
+    nns_edge_loge ("Failed to publish the message. topic: %s", eh->topic);
+    return NNS_EDGE_ERROR_IO;
+  }
+
+  return NNS_EDGE_ERROR_NONE;
+}
+
+/**
+ * @brief Internal function to invoke event callback.
+ * @note This function should be called with handle lock.
+ */
+static int
+_nns_edge_invoke_event_cb (nns_edge_handle_s * eh, nns_edge_event_e event,
+    void *data, size_t data_len, nns_edge_data_destroy_cb destroy_cb)
+{
+  nns_edge_event_h event_h;
+  int ret;
+
+  /* If event callback is null, return ok. */
+  if (!eh->event_cb) {
+    nns_edge_logw ("AITT: The event callback is null, do nothing!");
+    return NNS_EDGE_ERROR_NONE;
+  }
+
+  ret = nns_edge_event_create (event, &event_h);
+  if (ret != NNS_EDGE_ERROR_NONE) {
+    nns_edge_loge ("Failed to create new edge event.");
+    return ret;
+  }
+
+  if (data) {
+    ret = nns_edge_event_set_data (event_h, data, data_len, destroy_cb);
+    if (ret != NNS_EDGE_ERROR_NONE) {
+      nns_edge_loge ("Failed to handle edge event due to invalid event data.");
+      goto error;
+    }
+  }
+
+  ret = eh->event_cb (event_h, eh->user_data);
+  if (ret != NNS_EDGE_ERROR_NONE) {
+    nns_edge_loge ("The event callback returns error.");
+  }
+
+error:
+  nns_edge_event_destroy (event_h);
+  return ret;
+}
+
+/**
+ * @brief Callback function to be called when a message is arrived.
+ */
+static void
+aitt_cb_message_arrived (aitt_msg_h msg_handle, const void *msg,
+    size_t msg_len, void *user_data)
+{
+  nns_edge_handle_s *eh;
+  nns_edge_data_h data_h;
+
+  eh = (nns_edge_handle_s *) user_data;
+
+  if (!NNS_EDGE_MAGIC_IS_VALID (eh)) {
+    nns_edge_loge ("Invalid param, given edge handle is invalid.");
+    return;
+  }
+
+  if (nns_edge_data_create (&data_h) != NNS_EDGE_ERROR_NONE) {
+    nns_edge_loge ("Failed to create data handle in msg thread.");
+    return;
+  }
+
+  /** @todo support multi memory chunk. Deserialize the received data. */
+  nns_edge_data_add (data_h, (void *) msg, msg_len, NULL);
+
+  _nns_edge_invoke_event_cb (eh, NNS_EDGE_EVENT_NEW_DATA_RECEIVED, data_h,
+      sizeof (nns_edge_data_h), NULL);
+
+  nns_edge_data_destroy (data_h);
+}
+
+/**
+ * @brief Subscribe a topic.
+ * @note This is internal function for AITT. You should call this with edge-handle lock.
+ */
+int
+nns_edge_aitt_subscribe (nns_edge_h edge_h)
+{
+  nns_edge_handle_s *eh;
+  nns_edge_aitt_handle_s *ah;
+
+  eh = (nns_edge_handle_s *) edge_h;
+
+  if (!NNS_EDGE_MAGIC_IS_VALID (eh) || !eh->broker_h) {
+    nns_edge_loge ("Invalid param, given edge handle is invalid.");
+    return NNS_EDGE_ERROR_INVALID_PARAMETER;
+  }
+
+  if (!eh->topic) {
+    nns_edge_loge ("Invalid param, topic cannot be NULL for AITT connection. "
+        "Please set topic using nns_edge_set_info()");
+    return NNS_EDGE_ERROR_INVALID_PARAMETER;
+  }
+
+  ah = (nns_edge_aitt_handle_s *) eh->broker_h;
+
+  if (AITT_ERROR_NONE != aitt_subscribe (ah->aitt_h, eh->topic,
+          aitt_cb_message_arrived, eh, &ah->msg_h)) {
+    nns_edge_loge ("Failed to subscribe the topoc: %s", eh->topic);
+    return NNS_EDGE_ERROR_UNKNOWN;
+  }
+  return NNS_EDGE_ERROR_NONE;
+}
index e9b31cc2b4756073ba9eb356d8686ba9932ceee8..12e9013f0b34db64e25ee4fda74ef3c0de76a628 100644 (file)
@@ -314,6 +314,35 @@ _nns_edge_cmd_send (nns_edge_conn_s * conn, nns_edge_cmd_s * cmd)
   return NNS_EDGE_ERROR_NONE;
 }
 
+/**
+ * @brief Send edge command to connected device using AITT.
+ */
+static int
+_nns_edge_cmd_send_aitt (nns_edge_handle_s * eh, nns_edge_data_h * data_h)
+{
+  unsigned int n, num_mem;
+  int ret;
+  void *raw_data;
+  size_t size;
+
+  if (!eh) {
+    nns_edge_loge ("Failed to send command, edge handle is null.");
+    return NNS_EDGE_ERROR_INVALID_PARAMETER;
+  }
+
+  ret = nns_edge_data_get_count (data_h, &num_mem);
+  /** @todo Serialize the multi memory data. Now supporting one memory block */
+  for (n = 0; n < num_mem; n++) {
+    nns_edge_data_get (data_h, n, &raw_data, &size);
+    if (NNS_EDGE_ERROR_NONE != nns_edge_aitt_publish (eh, raw_data, size)) {
+      nns_edge_loge ("Failed to send %uth memory to socket.", n);
+      return NNS_EDGE_ERROR_IO;
+    }
+  }
+
+  return ret;
+}
+
 /**
  * @brief Receive edge command from connected device.
  * @note Before calling this function, you should initialize edge-cmd by using _nns_edge_cmd_init().
@@ -888,33 +917,42 @@ _nns_edge_send_thread (void *thread_data)
 
   while (nns_edge_queue_wait_pop (eh->send_queue, 0U, &data_h)) {
     /* Send data to destination */
-    ret = nns_edge_data_get_info (data_h, "client_id", &val);
-    if (ret != NNS_EDGE_ERROR_NONE) {
-      nns_edge_logd
-          ("Cannot find client ID in edge data. Send to all connected nodes.");
-
-      conn_data = (nns_edge_conn_data_s *) eh->connections;
-      while (conn_data) {
-        /** @todo update code for each connect type */
-        conn = conn_data->sink_conn;
-        _nns_edge_transfer_data (conn, data_h, conn_data->id);
-
-        conn_data = conn_data->next;
-      }
-    } else {
-      client_id = (int64_t) strtoll (val, NULL, 10);
-      SAFE_FREE (val);
-
-      conn_data = _nns_edge_get_connection (eh, client_id);
-      if (conn_data) {
-        conn = conn_data->sink_conn;
-        _nns_edge_transfer_data (conn, data_h, client_id);
-      } else {
-        nns_edge_loge
-            ("Cannot find connection, invalid client ID or connection closed.");
-      }
+    switch (eh->connect_type) {
+      case NNS_EDGE_CONNECT_TYPE_TCP:
+      case NNS_EDGE_CONNECT_TYPE_HYBRID:
+        ret = nns_edge_data_get_info (data_h, "client_id", &val);
+        if (ret != NNS_EDGE_ERROR_NONE) {
+          nns_edge_logd
+              ("Cannot find client ID in edge data. Send to all connected nodes.");
+
+          conn_data = (nns_edge_conn_data_s *) eh->connections;
+          while (conn_data) {
+            /** @todo update code for each connect type */
+            conn = conn_data->sink_conn;
+            _nns_edge_transfer_data (conn, data_h, conn_data->id);
+
+            conn_data = conn_data->next;
+          }
+        } else {
+          client_id = (int64_t) strtoll (val, NULL, 10);
+          SAFE_FREE (val);
+
+          conn_data = _nns_edge_get_connection (eh, client_id);
+          if (conn_data) {
+            conn = conn_data->sink_conn;
+            _nns_edge_transfer_data (conn, data_h, client_id);
+          } else {
+            nns_edge_loge
+                ("Cannot find connection, invalid client ID or connection closed.");
+          }
+        }
+        break;
+      case NNS_EDGE_CONNECT_TYPE_AITT:
+        _nns_edge_cmd_send_aitt (eh, data_h);
+        break;
+      default:
+        break;
     }
-
     nns_edge_data_destroy (data_h);
   }
 
@@ -1238,7 +1276,7 @@ nns_edge_start (nns_edge_h edge_h)
       if (NNS_EDGE_ERROR_NONE != ret) {
         nns_edge_loge
             ("Failed to start nnstreamer-edge. Connection failure to broker.");
-        goto error;
+        goto done;
       }
 
       msg = nns_edge_get_host_string (eh->host, eh->port);
@@ -1248,21 +1286,30 @@ nns_edge_start (nns_edge_h edge_h)
 
       if (NNS_EDGE_ERROR_NONE != ret) {
         nns_edge_loge ("Failed to publish the meesage to broker.");
-        goto error;
+        goto done;
+      }
+    } else if (NNS_EDGE_CONNECT_TYPE_AITT == eh->connect_type) {
+      ret = nns_edge_aitt_connect (eh);
+      if (NNS_EDGE_ERROR_NONE != ret) {
+        nns_edge_loge ("Failed to connect to AITT broker.");
+        goto done;
       }
     }
   }
 
   /* Start listener thread to accept socket. */
-  if (!_nns_edge_create_socket_listener (eh)) {
-    nns_edge_loge ("Failed to create socket listener.");
-    ret = NNS_EDGE_ERROR_IO;
-    goto error;
+  if (NNS_EDGE_CONNECT_TYPE_TCP == eh->connect_type ||
+      NNS_EDGE_CONNECT_TYPE_HYBRID == eh->connect_type) {
+    if (!_nns_edge_create_socket_listener (eh)) {
+      nns_edge_loge ("Failed to create socket listener.");
+      ret = NNS_EDGE_ERROR_IO;
+      goto done;
+    }
   }
 
   ret = _nns_edge_create_send_thread (eh);
 
-error:
+done:
   nns_edge_unlock (eh);
   return ret;
 }
@@ -1289,10 +1336,19 @@ nns_edge_release_handle (nns_edge_h edge_h)
     return NNS_EDGE_ERROR_INVALID_PARAMETER;
   }
 
-  if (nns_edge_mqtt_is_connected (eh)) {
-    if (NNS_EDGE_ERROR_NONE != nns_edge_mqtt_close (eh)) {
-      nns_edge_logw ("Failed to close mqtt connection.");
-    }
+  switch (eh->connect_type) {
+    case NNS_EDGE_CONNECT_TYPE_HYBRID:
+      if (NNS_EDGE_ERROR_NONE != nns_edge_mqtt_close (eh)) {
+        nns_edge_logw ("Failed to close mqtt connection.");
+      }
+      break;
+    case NNS_EDGE_CONNECT_TYPE_AITT:
+      if (NNS_EDGE_ERROR_NONE != nns_edge_aitt_close (eh)) {
+        nns_edge_logw ("Failed to close AITT connection.");
+      }
+      break;
+    default:
+      break;
   }
 
   eh->magic = NNS_EDGE_MAGIC_DEAD;
@@ -1459,6 +1515,20 @@ nns_edge_connect (nns_edge_h edge_h, const char *dest_host, int dest_port)
         break;
       }
     }
+  } else if (NNS_EDGE_CONNECT_TYPE_AITT == eh->connect_type) {
+    ret = nns_edge_aitt_connect (eh);
+    if (ret != NNS_EDGE_ERROR_NONE) {
+      nns_edge_loge ("Failed to connect to aitt broker. %s:%d", dest_host,
+          dest_port);
+      goto done;
+    }
+    ret = nns_edge_aitt_subscribe (eh);
+    if (NNS_EDGE_ERROR_NONE != ret) {
+      nns_edge_loge ("Failed to subscribe the topic using AITT: %s", eh->topic);
+      SAFE_FREE (eh->broker_h);
+      eh->broker_h = NULL;
+      goto done;
+    }
   } else {
     ret = _nns_edge_connect_to (eh, eh->client_id, dest_host, dest_port);
     if (ret != NNS_EDGE_ERROR_NONE) {
@@ -1466,6 +1536,7 @@ nns_edge_connect (nns_edge_h edge_h, const char *dest_host, int dest_port)
     }
   }
 
+done:
   nns_edge_unlock (eh);
   return ret;
 }
index 0e4473137d00bffec595ffaa6256410a824561d8..c24142e22487fd7a8fbecc478e824f0a14580741 100644 (file)
@@ -114,6 +114,37 @@ int nns_edge_mqtt_get_message (nns_edge_h edge_h, char **msg);
 #define nns_edge_mqtt_get_message(...) (NNS_EDGE_ERROR_NOT_SUPPORTED)
 #endif
 
+#if defined(ENABLE_AITT)
+/**
+ * @brief Create AITT handle and connect to AITT.
+ * @note This is internal function for AITT. You should call this with edge-handle lock.
+ */
+int nns_edge_aitt_connect (nns_edge_h edge_h);
+
+/**
+ * @brief Release the AITT handle.
+ * @note This is internal function for AITT. You should call this with edge-handle lock.
+ */
+int nns_edge_aitt_close (nns_edge_h edge_h);
+
+/**
+ * @brief Publish raw data.
+ * @note This is internal function forAITT. You should call this with edge-handle lock.
+ */
+int nns_edge_aitt_publish (nns_edge_h edge_h, const void *data, const int length);
+
+/**
+ * @brief Subscribe a topic.
+ * @note This is internal function for AITT. You should call this with edge-handle lock.
+ */
+int nns_edge_aitt_subscribe (nns_edge_h edge_h);
+#else
+#define nns_edge_aitt_connect(...) (NNS_EDGE_ERROR_NOT_SUPPORTED)
+#define nns_edge_aitt_close(...) (NNS_EDGE_ERROR_NOT_SUPPORTED)
+#define nns_edge_aitt_publish(...) (NNS_EDGE_ERROR_NOT_SUPPORTED)
+#define nns_edge_aitt_subscribe(...) (NNS_EDGE_ERROR_NOT_SUPPORTED)
+#endif
+
 #ifdef __cplusplus
 }
 #endif /* __cplusplus */