From 438f8ba8013305b357a016d048264d58058637ea Mon Sep 17 00:00:00 2001 From: gichan Date: Mon, 29 Aug 2022 16:24:56 +0900 Subject: [PATCH] [AITT] Support AITT as edge connection type Support AITT as edge connection type. Signed-off-by: gichan --- CMakeLists.txt | 13 + include/nnstreamer-edge.h | 1 + packaging/nnstreamer-edge.spec | 5 + src/CMakeLists.txt | 8 + src/libnnstreamer-edge/nnstreamer-edge-aitt.c | 234 +++++++++++++++++- .../nnstreamer-edge-internal.c | 145 ++++++++--- .../nnstreamer-edge-internal.h | 31 +++ 7 files changed, 392 insertions(+), 45 deletions(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index 8f7325a..f5d6bb7 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -61,6 +61,19 @@ IF(ENABLE_PAHO_MQTT) SET(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -DENABLE_MQTT=1") ENDIF() +# AITT Library +SET(ENABLE_AITT OFF) +FIND_LIBRARY(AITT_LIB NAMES aitt) +IF(NOT AITT_LIB) + MESSAGE("Cannot find AITT library.") +ELSE() + MESSAGE("Found AITT library.") + SET(ENABLE_AITT ON) + SET(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -DENABLE_AITT=1") + SET(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -DENABLE_AITT=1") + SET(AITT_INCLUDE_DIR ${PREFIX}/include/aitt) +ENDIF() + ADD_SUBDIRECTORY(src) IF (ENABLE_TEST) diff --git a/include/nnstreamer-edge.h b/include/nnstreamer-edge.h index 5ee202e..82a51bc 100644 --- a/include/nnstreamer-edge.h +++ b/include/nnstreamer-edge.h @@ -69,6 +69,7 @@ typedef enum { NNS_EDGE_CONNECT_TYPE_UDP, NNS_EDGE_CONNECT_TYPE_MQTT, NNS_EDGE_CONNECT_TYPE_HYBRID, + NNS_EDGE_CONNECT_TYPE_AITT, NNS_EDGE_CONNECT_TYPE_UNKNOWN } nns_edge_connect_type_e; diff --git a/packaging/nnstreamer-edge.spec b/packaging/nnstreamer-edge.spec index 9adaf62..3d5ae48 100644 --- a/packaging/nnstreamer-edge.spec +++ b/packaging/nnstreamer-edge.spec @@ -2,6 +2,7 @@ # Default features for Tizen releases %define mqtt_support 1 +%define aitt_support 1 %bcond_with tizen @@ -29,6 +30,10 @@ BuildRequires: pkgconfig(dlog) BuildRequires: pkgconfig(paho-mqtt-c) %endif +%if 0%{?aitt_support} +BuildRequires: aitt-devel +%endif + %if 0%{?unit_test} BuildRequires: gtest-devel BuildRequires: procps diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 54ac18f..76b6bdf 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -11,6 +11,10 @@ IF(ENABLE_PAHO_MQTT) SET(NNS_EDGE_SRCS ${NNS_EDGE_SRCS} ${NNS_EDGE_SRC_DIR}/nnstreamer-edge-mqtt.c) ENDIF() +IF(ENABLE_AITT) + SET(NNS_EDGE_SRCS ${NNS_EDGE_SRCS} ${NNS_EDGE_SRC_DIR}/nnstreamer-edge-aitt.c) +ENDIF() + ADD_LIBRARY(${NNS_EDGE_LIB_NAME} SHARED ${NNS_EDGE_SRCS}) SET_TARGET_PROPERTIES(${NNS_EDGE_LIB_NAME} PROPERTIES VERSION ${SO_VERSION}) TARGET_INCLUDE_DIRECTORIES(${NNS_EDGE_LIB_NAME} PRIVATE ${INCLUDE_DIR} ${EDGE_REQUIRE_PKGS_INCLUDE_DIRS}) @@ -18,6 +22,10 @@ TARGET_LINK_LIBRARIES(${NNS_EDGE_LIB_NAME} ${EDGE_REQUIRE_PKGS_LDFLAGS}) IF(ENABLE_PAHO_MQTT) TARGET_LINK_LIBRARIES(${NNS_EDGE_LIB_NAME} ${PAHO_MQTT_LIB}) ENDIF() +IF(ENABLE_AITT) + TARGET_LINK_LIBRARIES(${NNS_EDGE_LIB_NAME} ${AITT_LIB}) + TARGET_INCLUDE_DIRECTORIES(${NNS_EDGE_LIB_NAME} PUBLIC ${AITT_INCLUDE_DIR}) +ENDIF() INSTALL (TARGETS ${NNS_EDGE_LIB_NAME} DESTINATION ${CMAKE_INSTALL_LIBDIR}) INSTALL (FILES ${INCLUDE_DIR}/nnstreamer-edge.h DESTINATION ${INCLUDE_INSTALL_DIR}) diff --git a/src/libnnstreamer-edge/nnstreamer-edge-aitt.c b/src/libnnstreamer-edge/nnstreamer-edge-aitt.c index 3012406..64fc986 100644 --- a/src/libnnstreamer-edge/nnstreamer-edge-aitt.c +++ b/src/libnnstreamer-edge/nnstreamer-edge-aitt.c @@ -10,8 +10,13 @@ * @bug No known bugs except for NYI items */ + +#include #include #include "nnstreamer-edge-common.h" +#include "nnstreamer-edge-internal.h" +#include "nnstreamer-edge-util.h" +#include "nnstreamer-edge-log.h" typedef void *nns_edge_aitt_h; typedef void *nns_edge_aitt_msg_h; @@ -23,11 +28,224 @@ typedef void *nns_edge_aitt_sub_h; */ typedef struct { - nns_edge_connect_type_e connect_type; - struct - { - nns_edge_aitt_h aitt_h; - nns_edge_aitt_msg_h msg_h; - nns_edge_aitt_sub_h sub_h; - }; -} nns_edge_handle_s; + nns_edge_aitt_h aitt_h; + nns_edge_aitt_msg_h msg_h; + nns_edge_aitt_sub_h sub_h; +} nns_edge_aitt_handle_s; + + +/** + * @brief Create AITT handle and connect to AITT. + * @note This is internal function for AITT. You should call this with edge-handle lock. + */ +int +nns_edge_aitt_connect (nns_edge_h edge_h) +{ + nns_edge_handle_s *eh; + nns_edge_aitt_handle_s *ah; + aitt_option_h option; + + eh = (nns_edge_handle_s *) edge_h; + + if (!NNS_EDGE_MAGIC_IS_VALID (eh)) { + nns_edge_loge ("Invalid param, given edge handle is invalid."); + return NNS_EDGE_ERROR_INVALID_PARAMETER; + } + + nns_edge_logd ("Create AITT instance: My address: %s:%d", eh->host, eh->port); + + ah = (nns_edge_aitt_handle_s *) calloc (1, sizeof (nns_edge_aitt_handle_s)); + if (!ah) { + nns_edge_loge ("Failed to allocate memory for AITT handle."); + return NNS_EDGE_ERROR_OUT_OF_MEMORY; + } + + option = aitt_option_new (); + aitt_option_set (option, AITT_OPT_MY_IP, eh->host); + + ah->aitt_h = aitt_new (eh->id, option); + if (!ah->aitt_h) { + nns_edge_loge ("Failed to create AITT handle. AITT internal error."); + SAFE_FREE (ah); + return NNS_EDGE_ERROR_UNKNOWN; + } + + if (AITT_ERROR_NONE != aitt_connect (ah->aitt_h, eh->dest_host, + eh->dest_port)) { + nns_edge_loge ("Failed to connect to AITT. IP:port = %s:%d", eh->dest_host, + eh->dest_port); + aitt_destroy (ah->aitt_h); + SAFE_FREE (ah); + return NNS_EDGE_ERROR_UNKNOWN; + } + eh->broker_h = ah; + + return NNS_EDGE_ERROR_NONE; +} + +/** + * @brief Release the AITT handle. + * @note This is internal function for AITT. You should call this with edge-handle lock. + */ +int +nns_edge_aitt_close (nns_edge_h edge_h) +{ + nns_edge_handle_s *eh; + nns_edge_aitt_handle_s *ah; + + eh = (nns_edge_handle_s *) edge_h; + + if (!NNS_EDGE_MAGIC_IS_VALID (eh) || !eh->broker_h) { + nns_edge_loge ("Invalid param, given edge handle is invalid."); + return NNS_EDGE_ERROR_INVALID_PARAMETER; + } + + ah = (nns_edge_aitt_handle_s *) eh->broker_h; + if (AITT_ERROR_NONE != aitt_disconnect (ah->aitt_h)) { + nns_edge_loge ("Failed to close AITT handle."); + return NNS_EDGE_ERROR_UNKNOWN; + } + aitt_destroy (ah->aitt_h); + ah->aitt_h = NULL; + SAFE_FREE (eh->broker_h); + eh->broker_h = NULL; + + return NNS_EDGE_ERROR_NONE; +} + +/** + * @brief Publish raw data. + * @note This is internal function forAITT. You should call this with edge-handle lock. + */ +int +nns_edge_aitt_publish (nns_edge_h edge_h, const void *data, const int length) +{ + nns_edge_handle_s *eh; + nns_edge_aitt_handle_s *ah; + + eh = (nns_edge_handle_s *) edge_h; + + if (!NNS_EDGE_MAGIC_IS_VALID (eh) || !eh->broker_h) { + nns_edge_loge ("Invalid param, given edge handle is invalid."); + return NNS_EDGE_ERROR_INVALID_PARAMETER; + } + + if (!data || length <= 0) { + nns_edge_loge ("Invalid param, given data is invalid."); + return NNS_EDGE_ERROR_INVALID_PARAMETER; + } + + ah = (nns_edge_aitt_handle_s *) eh->broker_h; + + if (AITT_ERROR_NONE != aitt_publish (ah->aitt_h, eh->topic, data, length)) { + nns_edge_loge ("Failed to publish the message. topic: %s", eh->topic); + return NNS_EDGE_ERROR_IO; + } + + return NNS_EDGE_ERROR_NONE; +} + +/** + * @brief Internal function to invoke event callback. + * @note This function should be called with handle lock. + */ +static int +_nns_edge_invoke_event_cb (nns_edge_handle_s * eh, nns_edge_event_e event, + void *data, size_t data_len, nns_edge_data_destroy_cb destroy_cb) +{ + nns_edge_event_h event_h; + int ret; + + /* If event callback is null, return ok. */ + if (!eh->event_cb) { + nns_edge_logw ("AITT: The event callback is null, do nothing!"); + return NNS_EDGE_ERROR_NONE; + } + + ret = nns_edge_event_create (event, &event_h); + if (ret != NNS_EDGE_ERROR_NONE) { + nns_edge_loge ("Failed to create new edge event."); + return ret; + } + + if (data) { + ret = nns_edge_event_set_data (event_h, data, data_len, destroy_cb); + if (ret != NNS_EDGE_ERROR_NONE) { + nns_edge_loge ("Failed to handle edge event due to invalid event data."); + goto error; + } + } + + ret = eh->event_cb (event_h, eh->user_data); + if (ret != NNS_EDGE_ERROR_NONE) { + nns_edge_loge ("The event callback returns error."); + } + +error: + nns_edge_event_destroy (event_h); + return ret; +} + +/** + * @brief Callback function to be called when a message is arrived. + */ +static void +aitt_cb_message_arrived (aitt_msg_h msg_handle, const void *msg, + size_t msg_len, void *user_data) +{ + nns_edge_handle_s *eh; + nns_edge_data_h data_h; + + eh = (nns_edge_handle_s *) user_data; + + if (!NNS_EDGE_MAGIC_IS_VALID (eh)) { + nns_edge_loge ("Invalid param, given edge handle is invalid."); + return; + } + + if (nns_edge_data_create (&data_h) != NNS_EDGE_ERROR_NONE) { + nns_edge_loge ("Failed to create data handle in msg thread."); + return; + } + + /** @todo support multi memory chunk. Deserialize the received data. */ + nns_edge_data_add (data_h, (void *) msg, msg_len, NULL); + + _nns_edge_invoke_event_cb (eh, NNS_EDGE_EVENT_NEW_DATA_RECEIVED, data_h, + sizeof (nns_edge_data_h), NULL); + + nns_edge_data_destroy (data_h); +} + +/** + * @brief Subscribe a topic. + * @note This is internal function for AITT. You should call this with edge-handle lock. + */ +int +nns_edge_aitt_subscribe (nns_edge_h edge_h) +{ + nns_edge_handle_s *eh; + nns_edge_aitt_handle_s *ah; + + eh = (nns_edge_handle_s *) edge_h; + + if (!NNS_EDGE_MAGIC_IS_VALID (eh) || !eh->broker_h) { + nns_edge_loge ("Invalid param, given edge handle is invalid."); + return NNS_EDGE_ERROR_INVALID_PARAMETER; + } + + if (!eh->topic) { + nns_edge_loge ("Invalid param, topic cannot be NULL for AITT connection. " + "Please set topic using nns_edge_set_info()"); + return NNS_EDGE_ERROR_INVALID_PARAMETER; + } + + ah = (nns_edge_aitt_handle_s *) eh->broker_h; + + if (AITT_ERROR_NONE != aitt_subscribe (ah->aitt_h, eh->topic, + aitt_cb_message_arrived, eh, &ah->msg_h)) { + nns_edge_loge ("Failed to subscribe the topoc: %s", eh->topic); + return NNS_EDGE_ERROR_UNKNOWN; + } + return NNS_EDGE_ERROR_NONE; +} diff --git a/src/libnnstreamer-edge/nnstreamer-edge-internal.c b/src/libnnstreamer-edge/nnstreamer-edge-internal.c index e9b31cc..12e9013 100644 --- a/src/libnnstreamer-edge/nnstreamer-edge-internal.c +++ b/src/libnnstreamer-edge/nnstreamer-edge-internal.c @@ -314,6 +314,35 @@ _nns_edge_cmd_send (nns_edge_conn_s * conn, nns_edge_cmd_s * cmd) return NNS_EDGE_ERROR_NONE; } +/** + * @brief Send edge command to connected device using AITT. + */ +static int +_nns_edge_cmd_send_aitt (nns_edge_handle_s * eh, nns_edge_data_h * data_h) +{ + unsigned int n, num_mem; + int ret; + void *raw_data; + size_t size; + + if (!eh) { + nns_edge_loge ("Failed to send command, edge handle is null."); + return NNS_EDGE_ERROR_INVALID_PARAMETER; + } + + ret = nns_edge_data_get_count (data_h, &num_mem); + /** @todo Serialize the multi memory data. Now supporting one memory block */ + for (n = 0; n < num_mem; n++) { + nns_edge_data_get (data_h, n, &raw_data, &size); + if (NNS_EDGE_ERROR_NONE != nns_edge_aitt_publish (eh, raw_data, size)) { + nns_edge_loge ("Failed to send %uth memory to socket.", n); + return NNS_EDGE_ERROR_IO; + } + } + + return ret; +} + /** * @brief Receive edge command from connected device. * @note Before calling this function, you should initialize edge-cmd by using _nns_edge_cmd_init(). @@ -888,33 +917,42 @@ _nns_edge_send_thread (void *thread_data) while (nns_edge_queue_wait_pop (eh->send_queue, 0U, &data_h)) { /* Send data to destination */ - ret = nns_edge_data_get_info (data_h, "client_id", &val); - if (ret != NNS_EDGE_ERROR_NONE) { - nns_edge_logd - ("Cannot find client ID in edge data. Send to all connected nodes."); - - conn_data = (nns_edge_conn_data_s *) eh->connections; - while (conn_data) { - /** @todo update code for each connect type */ - conn = conn_data->sink_conn; - _nns_edge_transfer_data (conn, data_h, conn_data->id); - - conn_data = conn_data->next; - } - } else { - client_id = (int64_t) strtoll (val, NULL, 10); - SAFE_FREE (val); - - conn_data = _nns_edge_get_connection (eh, client_id); - if (conn_data) { - conn = conn_data->sink_conn; - _nns_edge_transfer_data (conn, data_h, client_id); - } else { - nns_edge_loge - ("Cannot find connection, invalid client ID or connection closed."); - } + switch (eh->connect_type) { + case NNS_EDGE_CONNECT_TYPE_TCP: + case NNS_EDGE_CONNECT_TYPE_HYBRID: + ret = nns_edge_data_get_info (data_h, "client_id", &val); + if (ret != NNS_EDGE_ERROR_NONE) { + nns_edge_logd + ("Cannot find client ID in edge data. Send to all connected nodes."); + + conn_data = (nns_edge_conn_data_s *) eh->connections; + while (conn_data) { + /** @todo update code for each connect type */ + conn = conn_data->sink_conn; + _nns_edge_transfer_data (conn, data_h, conn_data->id); + + conn_data = conn_data->next; + } + } else { + client_id = (int64_t) strtoll (val, NULL, 10); + SAFE_FREE (val); + + conn_data = _nns_edge_get_connection (eh, client_id); + if (conn_data) { + conn = conn_data->sink_conn; + _nns_edge_transfer_data (conn, data_h, client_id); + } else { + nns_edge_loge + ("Cannot find connection, invalid client ID or connection closed."); + } + } + break; + case NNS_EDGE_CONNECT_TYPE_AITT: + _nns_edge_cmd_send_aitt (eh, data_h); + break; + default: + break; } - nns_edge_data_destroy (data_h); } @@ -1238,7 +1276,7 @@ nns_edge_start (nns_edge_h edge_h) if (NNS_EDGE_ERROR_NONE != ret) { nns_edge_loge ("Failed to start nnstreamer-edge. Connection failure to broker."); - goto error; + goto done; } msg = nns_edge_get_host_string (eh->host, eh->port); @@ -1248,21 +1286,30 @@ nns_edge_start (nns_edge_h edge_h) if (NNS_EDGE_ERROR_NONE != ret) { nns_edge_loge ("Failed to publish the meesage to broker."); - goto error; + goto done; + } + } else if (NNS_EDGE_CONNECT_TYPE_AITT == eh->connect_type) { + ret = nns_edge_aitt_connect (eh); + if (NNS_EDGE_ERROR_NONE != ret) { + nns_edge_loge ("Failed to connect to AITT broker."); + goto done; } } } /* Start listener thread to accept socket. */ - if (!_nns_edge_create_socket_listener (eh)) { - nns_edge_loge ("Failed to create socket listener."); - ret = NNS_EDGE_ERROR_IO; - goto error; + if (NNS_EDGE_CONNECT_TYPE_TCP == eh->connect_type || + NNS_EDGE_CONNECT_TYPE_HYBRID == eh->connect_type) { + if (!_nns_edge_create_socket_listener (eh)) { + nns_edge_loge ("Failed to create socket listener."); + ret = NNS_EDGE_ERROR_IO; + goto done; + } } ret = _nns_edge_create_send_thread (eh); -error: +done: nns_edge_unlock (eh); return ret; } @@ -1289,10 +1336,19 @@ nns_edge_release_handle (nns_edge_h edge_h) return NNS_EDGE_ERROR_INVALID_PARAMETER; } - if (nns_edge_mqtt_is_connected (eh)) { - if (NNS_EDGE_ERROR_NONE != nns_edge_mqtt_close (eh)) { - nns_edge_logw ("Failed to close mqtt connection."); - } + switch (eh->connect_type) { + case NNS_EDGE_CONNECT_TYPE_HYBRID: + if (NNS_EDGE_ERROR_NONE != nns_edge_mqtt_close (eh)) { + nns_edge_logw ("Failed to close mqtt connection."); + } + break; + case NNS_EDGE_CONNECT_TYPE_AITT: + if (NNS_EDGE_ERROR_NONE != nns_edge_aitt_close (eh)) { + nns_edge_logw ("Failed to close AITT connection."); + } + break; + default: + break; } eh->magic = NNS_EDGE_MAGIC_DEAD; @@ -1459,6 +1515,20 @@ nns_edge_connect (nns_edge_h edge_h, const char *dest_host, int dest_port) break; } } + } else if (NNS_EDGE_CONNECT_TYPE_AITT == eh->connect_type) { + ret = nns_edge_aitt_connect (eh); + if (ret != NNS_EDGE_ERROR_NONE) { + nns_edge_loge ("Failed to connect to aitt broker. %s:%d", dest_host, + dest_port); + goto done; + } + ret = nns_edge_aitt_subscribe (eh); + if (NNS_EDGE_ERROR_NONE != ret) { + nns_edge_loge ("Failed to subscribe the topic using AITT: %s", eh->topic); + SAFE_FREE (eh->broker_h); + eh->broker_h = NULL; + goto done; + } } else { ret = _nns_edge_connect_to (eh, eh->client_id, dest_host, dest_port); if (ret != NNS_EDGE_ERROR_NONE) { @@ -1466,6 +1536,7 @@ nns_edge_connect (nns_edge_h edge_h, const char *dest_host, int dest_port) } } +done: nns_edge_unlock (eh); return ret; } diff --git a/src/libnnstreamer-edge/nnstreamer-edge-internal.h b/src/libnnstreamer-edge/nnstreamer-edge-internal.h index 0e44731..c24142e 100644 --- a/src/libnnstreamer-edge/nnstreamer-edge-internal.h +++ b/src/libnnstreamer-edge/nnstreamer-edge-internal.h @@ -114,6 +114,37 @@ int nns_edge_mqtt_get_message (nns_edge_h edge_h, char **msg); #define nns_edge_mqtt_get_message(...) (NNS_EDGE_ERROR_NOT_SUPPORTED) #endif +#if defined(ENABLE_AITT) +/** + * @brief Create AITT handle and connect to AITT. + * @note This is internal function for AITT. You should call this with edge-handle lock. + */ +int nns_edge_aitt_connect (nns_edge_h edge_h); + +/** + * @brief Release the AITT handle. + * @note This is internal function for AITT. You should call this with edge-handle lock. + */ +int nns_edge_aitt_close (nns_edge_h edge_h); + +/** + * @brief Publish raw data. + * @note This is internal function forAITT. You should call this with edge-handle lock. + */ +int nns_edge_aitt_publish (nns_edge_h edge_h, const void *data, const int length); + +/** + * @brief Subscribe a topic. + * @note This is internal function for AITT. You should call this with edge-handle lock. + */ +int nns_edge_aitt_subscribe (nns_edge_h edge_h); +#else +#define nns_edge_aitt_connect(...) (NNS_EDGE_ERROR_NOT_SUPPORTED) +#define nns_edge_aitt_close(...) (NNS_EDGE_ERROR_NOT_SUPPORTED) +#define nns_edge_aitt_publish(...) (NNS_EDGE_ERROR_NOT_SUPPORTED) +#define nns_edge_aitt_subscribe(...) (NNS_EDGE_ERROR_NOT_SUPPORTED) +#endif + #ifdef __cplusplus } #endif /* __cplusplus */ -- 2.34.1