Support AITT as edge connection type.
Signed-off-by: gichan <gichan2.jang@samsung.com>
SET(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -DENABLE_MQTT=1")
ENDIF()
+# AITT Library
+SET(ENABLE_AITT OFF)
+FIND_LIBRARY(AITT_LIB NAMES aitt)
+IF(NOT AITT_LIB)
+ MESSAGE("Cannot find AITT library.")
+ELSE()
+ MESSAGE("Found AITT library.")
+ SET(ENABLE_AITT ON)
+ SET(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -DENABLE_AITT=1")
+ SET(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -DENABLE_AITT=1")
+ SET(AITT_INCLUDE_DIR ${PREFIX}/include/aitt)
+ENDIF()
+
ADD_SUBDIRECTORY(src)
IF (ENABLE_TEST)
NNS_EDGE_CONNECT_TYPE_UDP,
NNS_EDGE_CONNECT_TYPE_MQTT,
NNS_EDGE_CONNECT_TYPE_HYBRID,
+ NNS_EDGE_CONNECT_TYPE_AITT,
NNS_EDGE_CONNECT_TYPE_UNKNOWN
} nns_edge_connect_type_e;
# Default features for Tizen releases
%define mqtt_support 1
+%define aitt_support 1
%bcond_with tizen
BuildRequires: pkgconfig(paho-mqtt-c)
%endif
+%if 0%{?aitt_support}
+BuildRequires: aitt-devel
+%endif
+
%if 0%{?unit_test}
BuildRequires: gtest-devel
BuildRequires: procps
SET(NNS_EDGE_SRCS ${NNS_EDGE_SRCS} ${NNS_EDGE_SRC_DIR}/nnstreamer-edge-mqtt.c)
ENDIF()
+IF(ENABLE_AITT)
+ SET(NNS_EDGE_SRCS ${NNS_EDGE_SRCS} ${NNS_EDGE_SRC_DIR}/nnstreamer-edge-aitt.c)
+ENDIF()
+
ADD_LIBRARY(${NNS_EDGE_LIB_NAME} SHARED ${NNS_EDGE_SRCS})
SET_TARGET_PROPERTIES(${NNS_EDGE_LIB_NAME} PROPERTIES VERSION ${SO_VERSION})
TARGET_INCLUDE_DIRECTORIES(${NNS_EDGE_LIB_NAME} PRIVATE ${INCLUDE_DIR} ${EDGE_REQUIRE_PKGS_INCLUDE_DIRS})
IF(ENABLE_PAHO_MQTT)
TARGET_LINK_LIBRARIES(${NNS_EDGE_LIB_NAME} ${PAHO_MQTT_LIB})
ENDIF()
+IF(ENABLE_AITT)
+ TARGET_LINK_LIBRARIES(${NNS_EDGE_LIB_NAME} ${AITT_LIB})
+ TARGET_INCLUDE_DIRECTORIES(${NNS_EDGE_LIB_NAME} PUBLIC ${AITT_INCLUDE_DIR})
+ENDIF()
INSTALL (TARGETS ${NNS_EDGE_LIB_NAME} DESTINATION ${CMAKE_INSTALL_LIBDIR})
INSTALL (FILES ${INCLUDE_DIR}/nnstreamer-edge.h DESTINATION ${INCLUDE_INSTALL_DIR})
* @bug No known bugs except for NYI items
*/
+
+#include <stdbool.h>
#include <aitt_c.h>
#include "nnstreamer-edge-common.h"
+#include "nnstreamer-edge-internal.h"
+#include "nnstreamer-edge-util.h"
+#include "nnstreamer-edge-log.h"
typedef void *nns_edge_aitt_h;
typedef void *nns_edge_aitt_msg_h;
*/
typedef struct
{
- nns_edge_connect_type_e connect_type;
- struct
- {
- nns_edge_aitt_h aitt_h;
- nns_edge_aitt_msg_h msg_h;
- nns_edge_aitt_sub_h sub_h;
- };
-} nns_edge_handle_s;
+ nns_edge_aitt_h aitt_h;
+ nns_edge_aitt_msg_h msg_h;
+ nns_edge_aitt_sub_h sub_h;
+} nns_edge_aitt_handle_s;
+
+
+/**
+ * @brief Create AITT handle and connect to AITT.
+ * @note This is internal function for AITT. You should call this with edge-handle lock.
+ */
+int
+nns_edge_aitt_connect (nns_edge_h edge_h)
+{
+ nns_edge_handle_s *eh;
+ nns_edge_aitt_handle_s *ah;
+ aitt_option_h option;
+
+ eh = (nns_edge_handle_s *) edge_h;
+
+ if (!NNS_EDGE_MAGIC_IS_VALID (eh)) {
+ nns_edge_loge ("Invalid param, given edge handle is invalid.");
+ return NNS_EDGE_ERROR_INVALID_PARAMETER;
+ }
+
+ nns_edge_logd ("Create AITT instance: My address: %s:%d", eh->host, eh->port);
+
+ ah = (nns_edge_aitt_handle_s *) calloc (1, sizeof (nns_edge_aitt_handle_s));
+ if (!ah) {
+ nns_edge_loge ("Failed to allocate memory for AITT handle.");
+ return NNS_EDGE_ERROR_OUT_OF_MEMORY;
+ }
+
+ option = aitt_option_new ();
+ aitt_option_set (option, AITT_OPT_MY_IP, eh->host);
+
+ ah->aitt_h = aitt_new (eh->id, option);
+ if (!ah->aitt_h) {
+ nns_edge_loge ("Failed to create AITT handle. AITT internal error.");
+ SAFE_FREE (ah);
+ return NNS_EDGE_ERROR_UNKNOWN;
+ }
+
+ if (AITT_ERROR_NONE != aitt_connect (ah->aitt_h, eh->dest_host,
+ eh->dest_port)) {
+ nns_edge_loge ("Failed to connect to AITT. IP:port = %s:%d", eh->dest_host,
+ eh->dest_port);
+ aitt_destroy (ah->aitt_h);
+ SAFE_FREE (ah);
+ return NNS_EDGE_ERROR_UNKNOWN;
+ }
+ eh->broker_h = ah;
+
+ return NNS_EDGE_ERROR_NONE;
+}
+
+/**
+ * @brief Release the AITT handle.
+ * @note This is internal function for AITT. You should call this with edge-handle lock.
+ */
+int
+nns_edge_aitt_close (nns_edge_h edge_h)
+{
+ nns_edge_handle_s *eh;
+ nns_edge_aitt_handle_s *ah;
+
+ eh = (nns_edge_handle_s *) edge_h;
+
+ if (!NNS_EDGE_MAGIC_IS_VALID (eh) || !eh->broker_h) {
+ nns_edge_loge ("Invalid param, given edge handle is invalid.");
+ return NNS_EDGE_ERROR_INVALID_PARAMETER;
+ }
+
+ ah = (nns_edge_aitt_handle_s *) eh->broker_h;
+ if (AITT_ERROR_NONE != aitt_disconnect (ah->aitt_h)) {
+ nns_edge_loge ("Failed to close AITT handle.");
+ return NNS_EDGE_ERROR_UNKNOWN;
+ }
+ aitt_destroy (ah->aitt_h);
+ ah->aitt_h = NULL;
+ SAFE_FREE (eh->broker_h);
+ eh->broker_h = NULL;
+
+ return NNS_EDGE_ERROR_NONE;
+}
+
+/**
+ * @brief Publish raw data.
+ * @note This is internal function forAITT. You should call this with edge-handle lock.
+ */
+int
+nns_edge_aitt_publish (nns_edge_h edge_h, const void *data, const int length)
+{
+ nns_edge_handle_s *eh;
+ nns_edge_aitt_handle_s *ah;
+
+ eh = (nns_edge_handle_s *) edge_h;
+
+ if (!NNS_EDGE_MAGIC_IS_VALID (eh) || !eh->broker_h) {
+ nns_edge_loge ("Invalid param, given edge handle is invalid.");
+ return NNS_EDGE_ERROR_INVALID_PARAMETER;
+ }
+
+ if (!data || length <= 0) {
+ nns_edge_loge ("Invalid param, given data is invalid.");
+ return NNS_EDGE_ERROR_INVALID_PARAMETER;
+ }
+
+ ah = (nns_edge_aitt_handle_s *) eh->broker_h;
+
+ if (AITT_ERROR_NONE != aitt_publish (ah->aitt_h, eh->topic, data, length)) {
+ nns_edge_loge ("Failed to publish the message. topic: %s", eh->topic);
+ return NNS_EDGE_ERROR_IO;
+ }
+
+ return NNS_EDGE_ERROR_NONE;
+}
+
+/**
+ * @brief Internal function to invoke event callback.
+ * @note This function should be called with handle lock.
+ */
+static int
+_nns_edge_invoke_event_cb (nns_edge_handle_s * eh, nns_edge_event_e event,
+ void *data, size_t data_len, nns_edge_data_destroy_cb destroy_cb)
+{
+ nns_edge_event_h event_h;
+ int ret;
+
+ /* If event callback is null, return ok. */
+ if (!eh->event_cb) {
+ nns_edge_logw ("AITT: The event callback is null, do nothing!");
+ return NNS_EDGE_ERROR_NONE;
+ }
+
+ ret = nns_edge_event_create (event, &event_h);
+ if (ret != NNS_EDGE_ERROR_NONE) {
+ nns_edge_loge ("Failed to create new edge event.");
+ return ret;
+ }
+
+ if (data) {
+ ret = nns_edge_event_set_data (event_h, data, data_len, destroy_cb);
+ if (ret != NNS_EDGE_ERROR_NONE) {
+ nns_edge_loge ("Failed to handle edge event due to invalid event data.");
+ goto error;
+ }
+ }
+
+ ret = eh->event_cb (event_h, eh->user_data);
+ if (ret != NNS_EDGE_ERROR_NONE) {
+ nns_edge_loge ("The event callback returns error.");
+ }
+
+error:
+ nns_edge_event_destroy (event_h);
+ return ret;
+}
+
+/**
+ * @brief Callback function to be called when a message is arrived.
+ */
+static void
+aitt_cb_message_arrived (aitt_msg_h msg_handle, const void *msg,
+ size_t msg_len, void *user_data)
+{
+ nns_edge_handle_s *eh;
+ nns_edge_data_h data_h;
+
+ eh = (nns_edge_handle_s *) user_data;
+
+ if (!NNS_EDGE_MAGIC_IS_VALID (eh)) {
+ nns_edge_loge ("Invalid param, given edge handle is invalid.");
+ return;
+ }
+
+ if (nns_edge_data_create (&data_h) != NNS_EDGE_ERROR_NONE) {
+ nns_edge_loge ("Failed to create data handle in msg thread.");
+ return;
+ }
+
+ /** @todo support multi memory chunk. Deserialize the received data. */
+ nns_edge_data_add (data_h, (void *) msg, msg_len, NULL);
+
+ _nns_edge_invoke_event_cb (eh, NNS_EDGE_EVENT_NEW_DATA_RECEIVED, data_h,
+ sizeof (nns_edge_data_h), NULL);
+
+ nns_edge_data_destroy (data_h);
+}
+
+/**
+ * @brief Subscribe a topic.
+ * @note This is internal function for AITT. You should call this with edge-handle lock.
+ */
+int
+nns_edge_aitt_subscribe (nns_edge_h edge_h)
+{
+ nns_edge_handle_s *eh;
+ nns_edge_aitt_handle_s *ah;
+
+ eh = (nns_edge_handle_s *) edge_h;
+
+ if (!NNS_EDGE_MAGIC_IS_VALID (eh) || !eh->broker_h) {
+ nns_edge_loge ("Invalid param, given edge handle is invalid.");
+ return NNS_EDGE_ERROR_INVALID_PARAMETER;
+ }
+
+ if (!eh->topic) {
+ nns_edge_loge ("Invalid param, topic cannot be NULL for AITT connection. "
+ "Please set topic using nns_edge_set_info()");
+ return NNS_EDGE_ERROR_INVALID_PARAMETER;
+ }
+
+ ah = (nns_edge_aitt_handle_s *) eh->broker_h;
+
+ if (AITT_ERROR_NONE != aitt_subscribe (ah->aitt_h, eh->topic,
+ aitt_cb_message_arrived, eh, &ah->msg_h)) {
+ nns_edge_loge ("Failed to subscribe the topoc: %s", eh->topic);
+ return NNS_EDGE_ERROR_UNKNOWN;
+ }
+ return NNS_EDGE_ERROR_NONE;
+}
return NNS_EDGE_ERROR_NONE;
}
+/**
+ * @brief Send edge command to connected device using AITT.
+ */
+static int
+_nns_edge_cmd_send_aitt (nns_edge_handle_s * eh, nns_edge_data_h * data_h)
+{
+ unsigned int n, num_mem;
+ int ret;
+ void *raw_data;
+ size_t size;
+
+ if (!eh) {
+ nns_edge_loge ("Failed to send command, edge handle is null.");
+ return NNS_EDGE_ERROR_INVALID_PARAMETER;
+ }
+
+ ret = nns_edge_data_get_count (data_h, &num_mem);
+ /** @todo Serialize the multi memory data. Now supporting one memory block */
+ for (n = 0; n < num_mem; n++) {
+ nns_edge_data_get (data_h, n, &raw_data, &size);
+ if (NNS_EDGE_ERROR_NONE != nns_edge_aitt_publish (eh, raw_data, size)) {
+ nns_edge_loge ("Failed to send %uth memory to socket.", n);
+ return NNS_EDGE_ERROR_IO;
+ }
+ }
+
+ return ret;
+}
+
/**
* @brief Receive edge command from connected device.
* @note Before calling this function, you should initialize edge-cmd by using _nns_edge_cmd_init().
while (nns_edge_queue_wait_pop (eh->send_queue, 0U, &data_h)) {
/* Send data to destination */
- ret = nns_edge_data_get_info (data_h, "client_id", &val);
- if (ret != NNS_EDGE_ERROR_NONE) {
- nns_edge_logd
- ("Cannot find client ID in edge data. Send to all connected nodes.");
-
- conn_data = (nns_edge_conn_data_s *) eh->connections;
- while (conn_data) {
- /** @todo update code for each connect type */
- conn = conn_data->sink_conn;
- _nns_edge_transfer_data (conn, data_h, conn_data->id);
-
- conn_data = conn_data->next;
- }
- } else {
- client_id = (int64_t) strtoll (val, NULL, 10);
- SAFE_FREE (val);
-
- conn_data = _nns_edge_get_connection (eh, client_id);
- if (conn_data) {
- conn = conn_data->sink_conn;
- _nns_edge_transfer_data (conn, data_h, client_id);
- } else {
- nns_edge_loge
- ("Cannot find connection, invalid client ID or connection closed.");
- }
+ switch (eh->connect_type) {
+ case NNS_EDGE_CONNECT_TYPE_TCP:
+ case NNS_EDGE_CONNECT_TYPE_HYBRID:
+ ret = nns_edge_data_get_info (data_h, "client_id", &val);
+ if (ret != NNS_EDGE_ERROR_NONE) {
+ nns_edge_logd
+ ("Cannot find client ID in edge data. Send to all connected nodes.");
+
+ conn_data = (nns_edge_conn_data_s *) eh->connections;
+ while (conn_data) {
+ /** @todo update code for each connect type */
+ conn = conn_data->sink_conn;
+ _nns_edge_transfer_data (conn, data_h, conn_data->id);
+
+ conn_data = conn_data->next;
+ }
+ } else {
+ client_id = (int64_t) strtoll (val, NULL, 10);
+ SAFE_FREE (val);
+
+ conn_data = _nns_edge_get_connection (eh, client_id);
+ if (conn_data) {
+ conn = conn_data->sink_conn;
+ _nns_edge_transfer_data (conn, data_h, client_id);
+ } else {
+ nns_edge_loge
+ ("Cannot find connection, invalid client ID or connection closed.");
+ }
+ }
+ break;
+ case NNS_EDGE_CONNECT_TYPE_AITT:
+ _nns_edge_cmd_send_aitt (eh, data_h);
+ break;
+ default:
+ break;
}
-
nns_edge_data_destroy (data_h);
}
if (NNS_EDGE_ERROR_NONE != ret) {
nns_edge_loge
("Failed to start nnstreamer-edge. Connection failure to broker.");
- goto error;
+ goto done;
}
msg = nns_edge_get_host_string (eh->host, eh->port);
if (NNS_EDGE_ERROR_NONE != ret) {
nns_edge_loge ("Failed to publish the meesage to broker.");
- goto error;
+ goto done;
+ }
+ } else if (NNS_EDGE_CONNECT_TYPE_AITT == eh->connect_type) {
+ ret = nns_edge_aitt_connect (eh);
+ if (NNS_EDGE_ERROR_NONE != ret) {
+ nns_edge_loge ("Failed to connect to AITT broker.");
+ goto done;
}
}
}
/* Start listener thread to accept socket. */
- if (!_nns_edge_create_socket_listener (eh)) {
- nns_edge_loge ("Failed to create socket listener.");
- ret = NNS_EDGE_ERROR_IO;
- goto error;
+ if (NNS_EDGE_CONNECT_TYPE_TCP == eh->connect_type ||
+ NNS_EDGE_CONNECT_TYPE_HYBRID == eh->connect_type) {
+ if (!_nns_edge_create_socket_listener (eh)) {
+ nns_edge_loge ("Failed to create socket listener.");
+ ret = NNS_EDGE_ERROR_IO;
+ goto done;
+ }
}
ret = _nns_edge_create_send_thread (eh);
-error:
+done:
nns_edge_unlock (eh);
return ret;
}
return NNS_EDGE_ERROR_INVALID_PARAMETER;
}
- if (nns_edge_mqtt_is_connected (eh)) {
- if (NNS_EDGE_ERROR_NONE != nns_edge_mqtt_close (eh)) {
- nns_edge_logw ("Failed to close mqtt connection.");
- }
+ switch (eh->connect_type) {
+ case NNS_EDGE_CONNECT_TYPE_HYBRID:
+ if (NNS_EDGE_ERROR_NONE != nns_edge_mqtt_close (eh)) {
+ nns_edge_logw ("Failed to close mqtt connection.");
+ }
+ break;
+ case NNS_EDGE_CONNECT_TYPE_AITT:
+ if (NNS_EDGE_ERROR_NONE != nns_edge_aitt_close (eh)) {
+ nns_edge_logw ("Failed to close AITT connection.");
+ }
+ break;
+ default:
+ break;
}
eh->magic = NNS_EDGE_MAGIC_DEAD;
break;
}
}
+ } else if (NNS_EDGE_CONNECT_TYPE_AITT == eh->connect_type) {
+ ret = nns_edge_aitt_connect (eh);
+ if (ret != NNS_EDGE_ERROR_NONE) {
+ nns_edge_loge ("Failed to connect to aitt broker. %s:%d", dest_host,
+ dest_port);
+ goto done;
+ }
+ ret = nns_edge_aitt_subscribe (eh);
+ if (NNS_EDGE_ERROR_NONE != ret) {
+ nns_edge_loge ("Failed to subscribe the topic using AITT: %s", eh->topic);
+ SAFE_FREE (eh->broker_h);
+ eh->broker_h = NULL;
+ goto done;
+ }
} else {
ret = _nns_edge_connect_to (eh, eh->client_id, dest_host, dest_port);
if (ret != NNS_EDGE_ERROR_NONE) {
}
}
+done:
nns_edge_unlock (eh);
return ret;
}
#define nns_edge_mqtt_get_message(...) (NNS_EDGE_ERROR_NOT_SUPPORTED)
#endif
+#if defined(ENABLE_AITT)
+/**
+ * @brief Create AITT handle and connect to AITT.
+ * @note This is internal function for AITT. You should call this with edge-handle lock.
+ */
+int nns_edge_aitt_connect (nns_edge_h edge_h);
+
+/**
+ * @brief Release the AITT handle.
+ * @note This is internal function for AITT. You should call this with edge-handle lock.
+ */
+int nns_edge_aitt_close (nns_edge_h edge_h);
+
+/**
+ * @brief Publish raw data.
+ * @note This is internal function forAITT. You should call this with edge-handle lock.
+ */
+int nns_edge_aitt_publish (nns_edge_h edge_h, const void *data, const int length);
+
+/**
+ * @brief Subscribe a topic.
+ * @note This is internal function for AITT. You should call this with edge-handle lock.
+ */
+int nns_edge_aitt_subscribe (nns_edge_h edge_h);
+#else
+#define nns_edge_aitt_connect(...) (NNS_EDGE_ERROR_NOT_SUPPORTED)
+#define nns_edge_aitt_close(...) (NNS_EDGE_ERROR_NOT_SUPPORTED)
+#define nns_edge_aitt_publish(...) (NNS_EDGE_ERROR_NOT_SUPPORTED)
+#define nns_edge_aitt_subscribe(...) (NNS_EDGE_ERROR_NOT_SUPPORTED)
+#endif
+
#ifdef __cplusplus
}
#endif /* __cplusplus */