From: Jaeyun Date: Mon, 27 Jun 2022 10:45:49 +0000 (+0900) Subject: [Edge] migrate nnstreamer-edge X-Git-Tag: submit/tizen/20220701.024407~1 X-Git-Url: http://review.tizen.org/git/?a=commitdiff_plain;h=5989cfd1a424cc9aee37f8885f4c23864c68e359;p=platform%2Fupstream%2Fnnstreamer-edge.git [Edge] migrate nnstreamer-edge Migrate nnstreamer-edge library from nnstreamer repo. Signed-off-by: Jaeyun --- diff --git a/CMakeLists.txt b/CMakeLists.txt index cd05304..604ea83 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -8,9 +8,15 @@ IF (NOT DEFINED VERSION) SET(VERSION 0.0.1) ENDIF() +# GoogleTest requires at least C++11 and match the nnstreamer cpp version. +SET(CMAKE_CXX_STANDARD 14) + # Set CFLAGS SET(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -Wall -Werror") SET(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} ${EXTRA_CFLAGS} -pthread -fPIE -fPIC -g") +SET(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -DENABLE_MQTT=1") +# TODO FIXME remove glib dependency +SET(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -DGLIB_USE_G_MEMDUP2=1") IF (ENABLE_DEBUG) SET(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -DDEBUG=1") @@ -25,21 +31,22 @@ SET(CMAKE_EXE_LINKER_FLAGS "${CMAKE_EXE_LINKER_FLAGS} -Wl,--as-needed") SET(SO_VERSION ${VERSION}) SET(PREFIX ${CMAKE_INSTALL_PREFIX}) SET(EXEC_PREFIX ${PREFIX}/bin) +SET(LIB_INSTALL_DIR ${CMAKE_INSTALL_LIBDIR}) SET(INCLUDE_INSTALL_DIR "${PREFIX}/include") SET(SOURCE_DIR ${CMAKE_CURRENT_SOURCE_DIR}/src) SET(INCLUDE_DIR ${CMAKE_CURRENT_SOURCE_DIR}/include) +SET(NNS_EDGE_SRC_DIR ${SOURCE_DIR}/libnnstreamer-edge) SET(SENSOR_SRC_DIR ${SOURCE_DIR}/libsensor) # Check requires packages -SET(REQUIRES_LIST - paho-mqtt-c -) +# TODO FIXME remove glib dependency +SET(REQUIRES_LIST "paho-mqtt-c glib-2.0 gio-2.0") INCLUDE(FindPkgConfig) PKG_CHECK_MODULES(EDGE_REQUIRE_PKGS REQUIRED ${REQUIRES_LIST}) -FOREACH(flag ${EDGE__REQUIRE_PKGS_CFLAGS}) +FOREACH(flag ${EDGE_REQUIRE_PKGS_CFLAGS}) SET(EXTRA_CFLAGS "${EXTRA_CFLAGS} ${flag}") ENDFOREACH(flag) @@ -53,6 +60,10 @@ IF (ENABLE_TEST) ENDIF() # pkgconfig file +CONFIGURE_FILE(nnstreamer-edge.pc.in nnstreamer-edge.pc @ONLY) +INSTALL(FILES ${CMAKE_CURRENT_BINARY_DIR}/nnstreamer-edge.pc + DESTINATION ${CMAKE_INSTALL_LIBDIR}/pkgconfig) + CONFIGURE_FILE(nnstreamer-edge-sensor.pc.in nnstreamer-edge-sensor.pc @ONLY) INSTALL(FILES ${CMAKE_CURRENT_BINARY_DIR}/nnstreamer-edge-sensor.pc DESTINATION ${CMAKE_INSTALL_LIBDIR}/pkgconfig) diff --git a/debian/control b/debian/control index 249a39a..4ef5b8d 100644 --- a/debian/control +++ b/debian/control @@ -7,6 +7,20 @@ Build-Depends: gcc-9 | gcc-8 | gcc-7 | gcc-6 | gcc-5 (>=5.4), Standards-Version: 0.0.1 Homepage: https://github.com/nnstreamer/nnstreamer-edge +Package: nnstreamer-edge +Architecture: any +Multi-Arch: same +Depends: ${shlibs:Depends}, ${misc:Depends} +Description: nnstreamer-edge library + It provides interfaces to support among-device feature, publishing or subscribing raw data and specific messages. + +Package: nnstreamer-edge-dev +Architecture: any +Multi-Arch: same +Depends: nnstreamer-edge +Description: development package for nnstreamer-edge + It is a development package for nnstreamer-edge. + Package: nnstreamer-edge-sensor Architecture: any Multi-Arch: same diff --git a/debian/nnstreamer-edge-dev.install b/debian/nnstreamer-edge-dev.install new file mode 100644 index 0000000..c47950c --- /dev/null +++ b/debian/nnstreamer-edge-dev.install @@ -0,0 +1,3 @@ +/usr/include/nnstreamer-edge.h +/usr/lib/*/pkgconfig/nnstreamer-edge.pc +/usr/lib/*/libnnstreamer-edge.so diff --git a/debian/nnstreamer-edge.install b/debian/nnstreamer-edge.install new file mode 100644 index 0000000..838ac9a --- /dev/null +++ b/debian/nnstreamer-edge.install @@ -0,0 +1 @@ +/usr/lib/*/libnnstreamer-edge.so.* diff --git a/include/nnstreamer-edge.h b/include/nnstreamer-edge.h new file mode 100644 index 0000000..5c4fd9c --- /dev/null +++ b/include/nnstreamer-edge.h @@ -0,0 +1,217 @@ +/* SPDX-License-Identifier: Apache-2.0 */ +/** + * Copyright (C) 2022 Gichan Jang + * + * @file nnstreamer-edge.h + * @date 25 Mar 2022 + * @brief Common library to support communication among devices. + * @see https://github.com/nnstreamer/nnstreamer + * @author Gichan Jang + * @bug No known bugs except for NYI items + * + * @note This file will be moved to nnstreamer-edge repo. (https://github.com/nnstreamer/nnstreamer-edge) + * + * @todo Update document and sample code. + * 1. Add sample code when the 1st API set is complete - connection, pub/sub, request, ... + * 2. Update license when migrating this into edge repo. (Apache-2.0) + */ + +#ifndef __NNSTREAMER_EDGE_H__ +#define __NNSTREAMER_EDGE_H__ + +#include +#include +#include +#include +#include +#include + +#ifdef __cplusplus +extern "C" { +#endif /* __cplusplus */ + +typedef void *nns_edge_h; +typedef void *nns_edge_event_h; +typedef void *nns_edge_data_h; + +/** + * @brief The maximum number of data instances that nnstreamer-edge data may have. + */ +#define NNS_EDGE_DATA_LIMIT (16) + +/** + * @brief Enumeration for the error codes of nnstreamer-edge. + * @todo define detailed error code later (linux standard error, sync with tizen error code) + */ +typedef enum { + NNS_EDGE_ERROR_NONE = 0, + NNS_EDGE_ERROR_INVALID_PARAMETER = -EINVAL, + NNS_EDGE_ERROR_OUT_OF_MEMORY = -ENOMEM, + NNS_EDGE_ERROR_IO = -EIO, + NNS_EDGE_ERROR_CONNECTION_FAILURE = -ECONNREFUSED, + NNS_EDGE_ERROR_UNKNOWN = (-1073741824LL), + NNS_EDGE_ERROR_NOT_SUPPORTED = (NNS_EDGE_ERROR_UNKNOWN + 2), +} nns_edge_error_e; + +typedef enum { + NNS_EDGE_EVENT_UNKNOWN = 0, + NNS_EDGE_EVENT_CAPABILITY, + NNS_EDGE_EVENT_NEW_DATA_RECEIVED, + NNS_EDGE_EVENT_CALLBACK_RELEASED, + + NNS_EDGE_EVENT_CUSTOM = 0x01000000 +} nns_edge_event_e; + +typedef enum { + NNS_EDGE_PROTOCOL_TCP = 0, + NNS_EDGE_PROTOCOL_MQTT, + NNS_EDGE_PROTOCOL_AITT, + NNS_EDGE_PROTOCOL_AITT_TCP, + + NNS_EDGE_PROTOCOL_MAX +} nns_edge_protocol_e; + +/** + * @brief Callback for the nnstreamer edge event. + * @note This callback will suspend data stream. Do not spend too much time in the callback. + * @return User should return NNS_EDGE_ERROR_NONE if an event is successfully handled. + */ +typedef int (*nns_edge_event_cb) (nns_edge_event_h event_h, void *user_data); + +/** + * @brief Callback called when nnstreamer-edge data is released. + */ +typedef void (*nns_edge_data_destroy_cb) (void *data); + +/** + * @brief Get registered handle. If not registered, create new handle and register it. + */ +int nns_edge_create_handle (const char *id, const char *topic, nns_edge_h *edge_h); + +/** + * @brief Start the nnstreamer edge. + */ +int nns_edge_start (nns_edge_h edge_h, bool is_server); + +/** + * @brief Release the given handle. + */ +int nns_edge_release_handle (nns_edge_h edge_h); + +/** + * @brief Set the event callback. + */ +int nns_edge_set_event_callback (nns_edge_h edge_h, nns_edge_event_cb cb, void *user_data); + +/** + * @brief Connect to the destination node. + */ +int nns_edge_connect (nns_edge_h edge_h, nns_edge_protocol_e protocol, const char *ip, int port); + +/** + * @brief Disconnect from the destination node. + */ +int nns_edge_disconnect (nns_edge_h edge_h); + +/** + * @brief Publish a message to a given topic. + */ +int nns_edge_publish (nns_edge_h edge_h, nns_edge_data_h data_h); + +/** + * @brief Request result to the server. + */ +int nns_edge_request (nns_edge_h edge_h, nns_edge_data_h data_h, void *user_data); + +/** + * @brief Respond to a request. + */ +int nns_edge_respond (nns_edge_h edge_h, nns_edge_data_h data_h); + +/** + * @brief Subscribe a message to a given topic. + */ +int nns_edge_subscribe (nns_edge_h edge_h, nns_edge_data_h data_h, void *user_data); + +/** + * @brief Unsubscribe a message to a given topic. + */ +int nns_edge_unsubscribe (nns_edge_h edge_h); + +/** + * @brief Get the topic of edge handle. Caller should release returned string using free(). + */ +int nns_edge_get_topic (nns_edge_h edge_h, char **topic); + +/** + * @brief Set nnstreamer edge info. + */ +int nns_edge_set_info (nns_edge_h edge_h, const char *key, const char *value); + +/** + * @brief Get the nnstreamer edge event type. + */ +int nns_edge_event_get_type (nns_edge_event_h event_h, nns_edge_event_e *event); + +/** + * @brief Parse edge event (NNS_EDGE_EVENT_NEW_DATA_RECEIVED) and get received data. + * @note Caller should release returned edge data using nns_edge_data_destroy(). + */ +int nns_edge_event_parse_new_data (nns_edge_event_h event_h, nns_edge_data_h *data_h); + +/** + * @brief Parse edge event (NNS_EDGE_EVENT_CAPABILITY) and get capability string. + * @note Caller should release returned string using free(). + */ +int nns_edge_event_parse_capability (nns_edge_event_h event_h, char **capability); + +/** + * @brief Create nnstreamer edge data. + */ +int nns_edge_data_create (nns_edge_data_h *data_h); + +/** + * @brief Destroy nnstreamer edge data. + */ +int nns_edge_data_destroy (nns_edge_data_h data_h); + +/** + * @brief Validate edge data handle. + */ +int nns_edge_data_is_valid (nns_edge_data_h data_h); + +/** + * @brief Copy edge data and return new handle. + */ +int nns_edge_data_copy (nns_edge_data_h data_h, nns_edge_data_h *new_data_h); + +/** + * @brief Add raw data into nnstreamer edge data. + */ +int nns_edge_data_add (nns_edge_data_h data_h, void *data, size_t data_len, nns_edge_data_destroy_cb destroy_cb); + +/** + * @brief Get the nnstreamer edge data. + * @note DO NOT release returned data. You should copy the data to another buffer if the returned data is necessary. + */ +int nns_edge_data_get (nns_edge_data_h data_h, unsigned int index, void **data, size_t *data_len); + +/** + * @brief Get the number of nnstreamer edge data. + */ +int nns_edge_data_get_count (nns_edge_data_h data_h, unsigned int *count); + +/** + * @brief Set the information of edge data. + */ +int nns_edge_data_set_info (nns_edge_data_h data_h, const char *key, const char *value); + +/** + * @brief Get the information of edge data. Caller should release the returned value using free(). + */ +int nns_edge_data_get_info (nns_edge_data_h data_h, const char *key, char **value); + +#ifdef __cplusplus +} +#endif /* __cplusplus */ +#endif /* __NNSTREAMER_EDGE_H__ */ diff --git a/nnstreamer-edge.pc.in b/nnstreamer-edge.pc.in new file mode 100644 index 0000000..cf31b4c --- /dev/null +++ b/nnstreamer-edge.pc.in @@ -0,0 +1,13 @@ +# Package information for pkg-config + +prefix=@PREFIX@ +exec_prefix=@EXEC_PREFIX@ +libdir=@LIB_INSTALL_DIR@ +includedir=@INCLUDE_INSTALL_DIR@ + +Name: nnstreamer-edge +Description: NNStreamer-Edge library +Version: @VERSION@ +Requires: @REQUIRES_LIST@ +Libs: -L${libdir} -lnnstreamer-edge +Cflags: -I${includedir} -pthread diff --git a/packaging/nnstreamer-edge.spec b/packaging/nnstreamer-edge.spec index 888e540..5b2e1f8 100644 --- a/packaging/nnstreamer-edge.spec +++ b/packaging/nnstreamer-edge.spec @@ -12,6 +12,8 @@ Source1001: nnstreamer-edge.manifest BuildRequires: cmake BuildRequires: pkgconfig(paho-mqtt-c) +# TODO remove glib +BuildRequires: glib2-devel %if 0%{?sensor_test} BuildRequires: gtest-devel %endif @@ -22,7 +24,13 @@ BuildRequires: lcov %description nnstreamer-edge provides remote source nodes for NNStreamer pipelines without GStreamer dependencies. -It also contains communicaton library for sharing server node information & status +It also contains communicaton library for sharing server node information & status. + +%package devel +Summary: development package for nnstreamer-edge +Requires: nnstreamer-edge = %{version}-%{release} +%description devel +It is a development package for nnstreamer-edge. %package sensor Summary: communication library for edge sensor @@ -121,6 +129,15 @@ cp -r result %{buildroot}%{_datadir}/nnstreamer-edge/unittest/ %clean rm -rf %{buildroot} +%files +%manifest nnstreamer-edge.manifest +%defattr(-,root,root,-) +%{_libdir}/libnnstreamer-edge.so* + +%files devel +%{_includedir}/nnstreamer-edge.h +%{_libdir}/pkgconfig/nnstreamer-edge.pc + %files sensor %manifest nnstreamer-edge.manifest %defattr(-,root,root,-) diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 1dbd8d0..8f820e9 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -1,3 +1,19 @@ +# NNStreamer-Edge library +SET(NNS_EDGE_LIB_NAME nnstreamer-edge) +SET(NNS_EDGE_SRCS + ${NNS_EDGE_SRC_DIR}/nnstreamer-edge-common.c + ${NNS_EDGE_SRC_DIR}/nnstreamer-edge-internal.c + ${NNS_EDGE_SRC_DIR}/nnstreamer-edge-mqtt.c +) + +ADD_LIBRARY(${NNS_EDGE_LIB_NAME} SHARED ${NNS_EDGE_SRCS}) +SET_TARGET_PROPERTIES(${NNS_EDGE_LIB_NAME} PROPERTIES VERSION ${SO_VERSION}) +TARGET_INCLUDE_DIRECTORIES(${NNS_EDGE_LIB_NAME} PRIVATE ${INCLUDE_DIR} ${EDGE_REQUIRE_PKGS_INCLUDE_DIRS}) +TARGET_LINK_LIBRARIES(${NNS_EDGE_LIB_NAME} ${EDGE_REQUIRE_PKGS_LDFLAGS}) + +INSTALL (TARGETS ${NNS_EDGE_LIB_NAME} DESTINATION ${CMAKE_INSTALL_LIBDIR}) +INSTALL (FILES ${INCLUDE_DIR}/nnstreamer-edge.h DESTINATION ${INCLUDE_INSTALL_DIR}) + # Edge Sensor SET(SENSOR_SRCS ${SENSOR_SRC_DIR}/edge_sensor.c) ADD_LIBRARY(edge-sensor SHARED ${SENSOR_SRCS}) diff --git a/src/libnnstreamer-edge/nnstreamer-edge-aitt.c b/src/libnnstreamer-edge/nnstreamer-edge-aitt.c new file mode 100644 index 0000000..1a1b024 --- /dev/null +++ b/src/libnnstreamer-edge/nnstreamer-edge-aitt.c @@ -0,0 +1,33 @@ +/* SPDX-License-Identifier: Apache-2.0 */ +/** + * Copyright (C) 2022 Gichan Jang + * + * @file nnstreamer-edge-aitt.c + * @date 28 Mar 2022 + * @brief Common library to support communication among devices using aitt. + * @see https://github.com/nnstreamer/nnstreamer + * @author Gichan Jang + * @bug No known bugs except for NYI items + */ + +#include +#include "nnstreamer-edge-common.h" + +typedef void *nns_edge_aitt_h; +typedef void *nns_edge_aitt_msg_h; +typedef void *nns_edge_aitt_sub_h; + +/** + * @brief Data structure for aitt handle. + * @todo Update AITT-related handle later. This is internal struct to manage edge-handle with AITT. + */ +typedef struct +{ + nns_edge_protocol_e protocol; + struct + { + nns_edge_aitt_h aitt_h; + nns_edge_aitt_msg_h msg_h; + nns_edge_aitt_sub_h sub_h; + }; +} nns_edge_handle_s; diff --git a/src/libnnstreamer-edge/nnstreamer-edge-common.c b/src/libnnstreamer-edge/nnstreamer-edge-common.c new file mode 100644 index 0000000..b3eba68 --- /dev/null +++ b/src/libnnstreamer-edge/nnstreamer-edge-common.c @@ -0,0 +1,472 @@ +/* SPDX-License-Identifier: Apache-2.0 */ +/** + * Copyright (C) 2022 Samsung Electronics Co., Ltd. All Rights Reserved. + * + * @file nnstreamer-edge-common.c + * @date 6 April 2022 + * @brief Common util functions for nnstreamer edge. + * @see https://github.com/nnstreamer/nnstreamer + * @author Gichan Jang + * @bug No known bugs except for NYI items + */ + +#include "nnstreamer-edge-common.h" + +/** + * @brief Create nnstreamer edge event. + */ +int +nns_edge_event_create (nns_edge_event_e event, nns_edge_event_h * event_h) +{ + nns_edge_event_s *ee; + + if (!event_h) { + nns_edge_loge ("Invalid param, event_h should not be null."); + return NNS_EDGE_ERROR_INVALID_PARAMETER; + } + + if (event <= NNS_EDGE_EVENT_UNKNOWN) { + nns_edge_loge ("Invalid param, given event type is invalid."); + return NNS_EDGE_ERROR_INVALID_PARAMETER; + } + + ee = (nns_edge_event_s *) malloc (sizeof (nns_edge_event_s)); + if (!ee) { + nns_edge_loge ("Failed to allocate memory for edge event."); + return NNS_EDGE_ERROR_OUT_OF_MEMORY; + } + + memset (ee, 0, sizeof (nns_edge_event_s)); + ee->magic = NNS_EDGE_MAGIC; + ee->event = event; + + *event_h = ee; + return NNS_EDGE_ERROR_NONE; +} + +/** + * @brief Destroy nnstreamer edge event. + */ +int +nns_edge_event_destroy (nns_edge_event_h event_h) +{ + nns_edge_event_s *ee; + + ee = (nns_edge_event_s *) event_h; + + if (!NNS_EDGE_MAGIC_IS_VALID (ee)) { + nns_edge_loge ("Invalid param, given edge event is invalid."); + return NNS_EDGE_ERROR_INVALID_PARAMETER; + } + + ee->magic = NNS_EDGE_MAGIC_DEAD; + + if (ee->data.destroy_cb) + ee->data.destroy_cb (ee->data.data); + + g_free (ee); + return NNS_EDGE_ERROR_NONE; +} + +/** + * @brief Set event data. + */ +int +nns_edge_event_set_data (nns_edge_event_h event_h, void *data, size_t data_len, + nns_edge_data_destroy_cb destroy_cb) +{ + nns_edge_event_s *ee; + + ee = (nns_edge_event_s *) event_h; + + if (!NNS_EDGE_MAGIC_IS_VALID (ee)) { + nns_edge_loge ("Invalid param, given edge event is invalid."); + return NNS_EDGE_ERROR_INVALID_PARAMETER; + } + + if (!data || data_len <= 0) { + nns_edge_loge ("Invalid param, data should not be null."); + return NNS_EDGE_ERROR_INVALID_PARAMETER; + } + + /* Clear old data and set new one. */ + if (ee->data.destroy_cb) + ee->data.destroy_cb (ee->data.data); + + ee->data.data = data; + ee->data.data_len = data_len; + ee->data.destroy_cb = destroy_cb; + + return NNS_EDGE_ERROR_NONE; +} + +/** + * @brief Get the nnstreamer edge event type. + */ +int +nns_edge_event_get_type (nns_edge_event_h event_h, nns_edge_event_e * event) +{ + nns_edge_event_s *ee; + + ee = (nns_edge_event_s *) event_h; + + if (!NNS_EDGE_MAGIC_IS_VALID (ee)) { + nns_edge_loge ("Invalid param, given edge event is invalid."); + return NNS_EDGE_ERROR_INVALID_PARAMETER; + } + + if (!event) { + nns_edge_loge ("Invalid param, event should not be null."); + return NNS_EDGE_ERROR_INVALID_PARAMETER; + } + + *event = ee->event; + return NNS_EDGE_ERROR_NONE; +} + +/** + * @brief Parse edge event (NNS_EDGE_EVENT_NEW_DATA_RECEIVED) and get received data. + * @note Caller should release returned edge data using nns_edge_data_destroy(). + */ +int +nns_edge_event_parse_new_data (nns_edge_event_h event_h, + nns_edge_data_h * data_h) +{ + nns_edge_event_s *ee; + + ee = (nns_edge_event_s *) event_h; + + if (!NNS_EDGE_MAGIC_IS_VALID (ee)) { + nns_edge_loge ("Invalid param, given edge event is invalid."); + return NNS_EDGE_ERROR_INVALID_PARAMETER; + } + + if (!data_h) { + nns_edge_loge ("Invalid param, data_h should not be null."); + return NNS_EDGE_ERROR_INVALID_PARAMETER; + } + + if (ee->event != NNS_EDGE_EVENT_NEW_DATA_RECEIVED) { + nns_edge_loge ("The edge event has invalid event type."); + return NNS_EDGE_ERROR_INVALID_PARAMETER; + } + + return nns_edge_data_copy ((nns_edge_data_h) ee->data.data, data_h); +} + +/** + * @brief Parse edge event (NNS_EDGE_EVENT_CAPABILITY) and get capability string. + * @note Caller should release returned string using free(). + */ +int +nns_edge_event_parse_capability (nns_edge_event_h event_h, char **capability) +{ + nns_edge_event_s *ee; + + ee = (nns_edge_event_s *) event_h; + + if (!NNS_EDGE_MAGIC_IS_VALID (ee)) { + nns_edge_loge ("Invalid param, given edge event is invalid."); + return NNS_EDGE_ERROR_INVALID_PARAMETER; + } + + if (!capability) { + nns_edge_loge ("Invalid param, capability should not be null."); + return NNS_EDGE_ERROR_INVALID_PARAMETER; + } + + if (ee->event != NNS_EDGE_EVENT_CAPABILITY) { + nns_edge_loge ("The edge event has invalid event type."); + return NNS_EDGE_ERROR_INVALID_PARAMETER; + } + + *capability = g_strdup (ee->data.data); + + return NNS_EDGE_ERROR_NONE; +} + +/** + * @brief Create nnstreamer edge data. + */ +int +nns_edge_data_create (nns_edge_data_h * data_h) +{ + nns_edge_data_s *ed; + + if (!data_h) { + nns_edge_loge ("Invalid param, data_h should not be null."); + return NNS_EDGE_ERROR_INVALID_PARAMETER; + } + + ed = (nns_edge_data_s *) malloc (sizeof (nns_edge_data_s)); + if (!ed) { + nns_edge_loge ("Failed to allocate memory for edge data."); + return NNS_EDGE_ERROR_OUT_OF_MEMORY; + } + + memset (ed, 0, sizeof (nns_edge_data_s)); + ed->magic = NNS_EDGE_MAGIC; + ed->info_table = + g_hash_table_new_full (g_str_hash, g_str_equal, g_free, g_free); + + *data_h = ed; + return NNS_EDGE_ERROR_NONE; +} + +/** + * @brief Destroy nnstreamer edge data. + */ +int +nns_edge_data_destroy (nns_edge_data_h data_h) +{ + nns_edge_data_s *ed; + unsigned int i; + + ed = (nns_edge_data_s *) data_h; + + if (!NNS_EDGE_MAGIC_IS_VALID (ed)) { + nns_edge_loge ("Invalid param, given edge data is invalid."); + return NNS_EDGE_ERROR_INVALID_PARAMETER; + } + + ed->magic = NNS_EDGE_MAGIC_DEAD; + + for (i = 0; i < ed->num; i++) { + if (ed->data[i].destroy_cb) + ed->data[i].destroy_cb (ed->data[i].data); + } + + g_hash_table_destroy (ed->info_table); + + g_free (ed); + return NNS_EDGE_ERROR_NONE; +} + +/** + * @brief Validate edge data handle. + */ +int +nns_edge_data_is_valid (nns_edge_data_h data_h) +{ + nns_edge_data_s *ed; + + ed = (nns_edge_data_s *) data_h; + + if (!NNS_EDGE_MAGIC_IS_VALID (ed)) { + nns_edge_loge ("Invalid param, edge data handle is invalid."); + return NNS_EDGE_ERROR_INVALID_PARAMETER; + } + + return NNS_EDGE_ERROR_NONE; +} + +/** + * @brief Copy edge data and return new handle. + */ +int +nns_edge_data_copy (nns_edge_data_h data_h, nns_edge_data_h * new_data_h) +{ + nns_edge_data_s *ed; + nns_edge_data_s *copied; + GHashTableIter iter; + gpointer key, value; + unsigned int i; + int ret; + + ed = (nns_edge_data_s *) data_h; + + if (!NNS_EDGE_MAGIC_IS_VALID (ed)) { + nns_edge_loge ("Invalid param, edge data handle is invalid."); + return NNS_EDGE_ERROR_INVALID_PARAMETER; + } + + if (!new_data_h) { + nns_edge_loge ("Invalid param, new_data_h should not be null."); + return NNS_EDGE_ERROR_INVALID_PARAMETER; + } + + ret = nns_edge_data_create (new_data_h); + if (ret != NNS_EDGE_ERROR_NONE) { + nns_edge_loge ("Failed to create new data handle."); + return ret; + } + + copied = (nns_edge_data_s *) (*new_data_h); + + copied->num = ed->num; + for (i = 0; i < ed->num; i++) { + copied->data[i].data = _g_memdup (ed->data[i].data, ed->data[i].data_len); + copied->data[i].data_len = ed->data[i].data_len; + copied->data[i].destroy_cb = g_free; + } + + g_hash_table_iter_init (&iter, ed->info_table); + while (g_hash_table_iter_next (&iter, &key, &value)) { + g_hash_table_insert (copied->info_table, g_strdup (key), g_strdup (value)); + } + + return NNS_EDGE_ERROR_NONE; +} + +/** + * @brief Add raw data into nnstreamer edge data. + */ +int +nns_edge_data_add (nns_edge_data_h data_h, void *data, size_t data_len, + nns_edge_data_destroy_cb destroy_cb) +{ + nns_edge_data_s *ed; + + ed = (nns_edge_data_s *) data_h; + + if (!NNS_EDGE_MAGIC_IS_VALID (ed)) { + nns_edge_loge ("Invalid param, given edge data is invalid."); + return NNS_EDGE_ERROR_INVALID_PARAMETER; + } + + if (ed->num >= NNS_EDGE_DATA_LIMIT) { + nns_edge_loge ("Cannot add data, the maximum number of edge data is %d.", + NNS_EDGE_DATA_LIMIT); + return NNS_EDGE_ERROR_INVALID_PARAMETER; + } + + if (!data || data_len <= 0) { + nns_edge_loge ("Invalid param, data should not be null."); + return NNS_EDGE_ERROR_INVALID_PARAMETER; + } + + ed->data[ed->num].data = data; + ed->data[ed->num].data_len = data_len; + ed->data[ed->num].destroy_cb = destroy_cb; + ed->num++; + + return NNS_EDGE_ERROR_NONE; +} + +/** + * @brief Get the nnstreamer edge data. + * @note DO NOT release returned data. You should copy the data to another buffer if the returned data is necessary. + */ +int +nns_edge_data_get (nns_edge_data_h data_h, unsigned int index, void **data, + size_t *data_len) +{ + nns_edge_data_s *ed; + + ed = (nns_edge_data_s *) data_h; + + if (!NNS_EDGE_MAGIC_IS_VALID (ed)) { + nns_edge_loge ("Invalid param, given edge data is invalid."); + return NNS_EDGE_ERROR_INVALID_PARAMETER; + } + + if (!data || !data_len) { + nns_edge_loge ("Invalid param, data and len should not be null."); + return NNS_EDGE_ERROR_INVALID_PARAMETER; + } + + if (index >= ed->num) { + nns_edge_loge + ("Invalid param, the number of edge data is %u but requested %uth data.", + ed->num, index); + return NNS_EDGE_ERROR_INVALID_PARAMETER; + } + + *data = ed->data[index].data; + *data_len = ed->data[index].data_len; + + return NNS_EDGE_ERROR_NONE; +} + +/** + * @brief Get the number of nnstreamer edge data. + */ +int +nns_edge_data_get_count (nns_edge_data_h data_h, unsigned int *count) +{ + nns_edge_data_s *ed; + + ed = (nns_edge_data_s *) data_h; + + if (!NNS_EDGE_MAGIC_IS_VALID (ed)) { + nns_edge_loge ("Invalid param, given edge data is invalid."); + return NNS_EDGE_ERROR_INVALID_PARAMETER; + } + + if (!count) { + nns_edge_loge ("Invalid param, count should not be null."); + return NNS_EDGE_ERROR_INVALID_PARAMETER; + } + + *count = ed->num; + + return NNS_EDGE_ERROR_NONE; +} + +/** + * @brief Set the information of edge data. + */ +int +nns_edge_data_set_info (nns_edge_data_h data_h, const char *key, + const char *value) +{ + nns_edge_data_s *ed; + + ed = (nns_edge_data_s *) data_h; + + if (!NNS_EDGE_MAGIC_IS_VALID (ed)) { + nns_edge_loge ("Invalid param, given edge data is invalid."); + return NNS_EDGE_ERROR_INVALID_PARAMETER; + } + + if (!key || *key == '\0') { + nns_edge_loge ("Invalid param, given key is invalid."); + return NNS_EDGE_ERROR_INVALID_PARAMETER; + } + + if (!value || *value == '\0') { + nns_edge_loge ("Invalid param, given value is invalid."); + return NNS_EDGE_ERROR_INVALID_PARAMETER; + } + + g_hash_table_insert (ed->info_table, g_strdup (key), g_strdup (value)); + + return NNS_EDGE_ERROR_NONE; +} + +/** + * @brief Get the information of edge data. Caller should release the returned value using free(). + */ +int +nns_edge_data_get_info (nns_edge_data_h data_h, const char *key, char **value) +{ + nns_edge_data_s *ed; + char *val; + + ed = (nns_edge_data_s *) data_h; + + if (!NNS_EDGE_MAGIC_IS_VALID (ed)) { + nns_edge_loge ("Invalid param, given edge data is invalid."); + return NNS_EDGE_ERROR_INVALID_PARAMETER; + } + + if (!key || *key == '\0') { + nns_edge_loge ("Invalid param, given key is invalid."); + return NNS_EDGE_ERROR_INVALID_PARAMETER; + } + + if (!value) { + nns_edge_loge ("Invalid param, value should not be null."); + return NNS_EDGE_ERROR_INVALID_PARAMETER; + } + + val = g_hash_table_lookup (ed->info_table, key); + if (!val) { + nns_edge_loge ("Invalid param, cannot find info about '%s'.", key); + return NNS_EDGE_ERROR_INVALID_PARAMETER; + } + + *value = g_strdup (val); + + return NNS_EDGE_ERROR_NONE; +} diff --git a/src/libnnstreamer-edge/nnstreamer-edge-common.h b/src/libnnstreamer-edge/nnstreamer-edge-common.h new file mode 100644 index 0000000..892ef4a --- /dev/null +++ b/src/libnnstreamer-edge/nnstreamer-edge-common.h @@ -0,0 +1,111 @@ +/* SPDX-License-Identifier: Apache-2.0 */ +/** + * Copyright (C) 2022 Samsung Electronics Co., Ltd. All Rights Reserved. + * + * @file nnstreamer-edge-common.h + * @date 6 April 2022 + * @brief Common util functions for nnstreamer edge. + * @see https://github.com/nnstreamer/nnstreamer + * @author Gichan Jang + * @bug No known bugs except for NYI items + * @note This file is internal header for nnstreamer edge utils. DO NOT export this file. + */ + +#ifndef __NNSTREAMER_EDGE_COMMON_H__ +#define __NNSTREAMER_EDGE_COMMON_H__ + +#include /** @todo remove glib */ +#include +#include "nnstreamer-edge.h" + +#ifdef __cplusplus +extern "C" { +#endif /* __cplusplus */ + +/** + * @brief Utility to silence unused parameter warning for intentionally unused parameters (e.g., callback functions of a framework) + */ +#ifndef UNUSED +#define UNUSED(expr) do { (void)(expr); } while (0) +#endif + +/** + * @brief g_memdup() function replaced by g_memdup2() in glib version >= 2.68 + */ +#if GLIB_USE_G_MEMDUP2 +#define _g_memdup g_memdup2 +#else +#define _g_memdup g_memdup +#endif + +#define NNS_EDGE_MAGIC 0xfeedfeed +#define NNS_EDGE_MAGIC_DEAD 0xdeaddead +#define NNS_EDGE_MAGIC_IS_VALID(h) ((h) && (h)->magic == NNS_EDGE_MAGIC) + +#define nns_edge_lock_init(h) do { pthread_mutex_init (&(h)->lock, NULL); } while (0) +#define nns_edge_lock_destroy(h) do { pthread_mutex_destroy (&(h)->lock); } while (0) +#define nns_edge_lock(h) do { pthread_mutex_lock (&(h)->lock); } while (0) +#define nns_edge_unlock(h) do { pthread_mutex_unlock (&(h)->lock); } while (0) + +/** + * @brief Internal data structure for raw data. + */ +typedef struct { + void *data; + size_t data_len; + nns_edge_data_destroy_cb destroy_cb; +} nns_edge_raw_data_s; + +/** + * @brief Internal data structure for edge data. + */ +typedef struct { + unsigned int magic; + unsigned int num; + nns_edge_raw_data_s data[NNS_EDGE_DATA_LIMIT]; + GHashTable *info_table; +} nns_edge_data_s; + +/** + * @brief Internal data structure for edge event. + */ +typedef struct { + unsigned int magic; + nns_edge_event_e event; + nns_edge_raw_data_s data; +} nns_edge_event_s; + +/** + * @todo add log util for nnstreamer-edge. + * 1. define tag (e.g., "nnstreamer-edge"). + * 2. consider macros to print function and line. + * 3. new API to get last error. + */ +#define nns_edge_logi g_info +#define nns_edge_logw g_warning +#define nns_edge_loge g_critical +#define nns_edge_logd g_debug +#define nns_edge_logf g_error + +/** + * @brief Create nnstreamer edge event. + * @note This is internal function for edge event. + */ +int nns_edge_event_create (nns_edge_event_e event, nns_edge_event_h * event_h); + +/** + * @brief Destroy nnstreamer edge event. + * @note This is internal function for edge event. + */ +int nns_edge_event_destroy (nns_edge_event_h event_h); + +/** + * @brief Set event data. + * @note This is internal function for edge event. + */ +int nns_edge_event_set_data (nns_edge_event_h event_h, void *data, size_t data_len, nns_edge_data_destroy_cb destroy_cb); + +#ifdef __cplusplus +} +#endif /* __cplusplus */ +#endif /* __NNSTREAMER_EDGE_COMMON_H__ */ diff --git a/src/libnnstreamer-edge/nnstreamer-edge-internal.c b/src/libnnstreamer-edge/nnstreamer-edge-internal.c new file mode 100644 index 0000000..f507f08 --- /dev/null +++ b/src/libnnstreamer-edge/nnstreamer-edge-internal.c @@ -0,0 +1,1437 @@ +/* SPDX-License-Identifier: Apache-2.0 */ +/** + * Copyright (C) 2022 Samsung Electronics Co., Ltd. All Rights Reserved. + * + * @file nnstreamer-edge-internal.c + * @date 6 April 2022 + * @brief Common library to support communication among devices. + * @see https://github.com/nnstreamer/nnstreamer + * @author Gichan Jang + * @bug No known bugs except for NYI items + */ + +#include "nnstreamer-edge-common.h" +#include "nnstreamer-edge-internal.h" + +#define N_BACKLOG 10 +#define DEFAULT_TIMEOUT_SEC 10 +#define _STR_NULL(str) ((str) ? (str) : "(NULL)") + +/** + * @brief enum for nnstreamer edge query commands. + */ +typedef enum +{ + _NNS_EDGE_CMD_ERROR = 0, + _NNS_EDGE_CMD_TRANSFER_DATA, + _NNS_EDGE_CMD_HOST_INFO, + _NNS_EDGE_CMD_CAPABILITY, + _NNS_EDGE_CMD_END +} nns_edge_cmd_e; + +/** + * @brief Structure for edge command info. It should be fixed size. + */ +typedef struct +{ + nns_edge_cmd_e cmd; + int64_t client_id; + + /* memory info */ + uint32_t num; + size_t mem_size[NNS_EDGE_DATA_LIMIT]; +} nns_edge_cmd_info_s; + +/** + * @brief Structure for edge command and buffers. + */ +typedef struct +{ + nns_edge_cmd_info_s info; + void *mem[NNS_EDGE_DATA_LIMIT]; +} nns_edge_cmd_s; + +/** + * @brief Data structure for edge connection. + */ +typedef struct +{ + char *ip; + int port; + int8_t running; + pthread_t msg_thread; + GSocket *socket; + GCancellable *cancellable; +} nns_edge_conn_s; + +/** + * @brief Data structure for connection data. + */ +typedef struct +{ + nns_edge_conn_s *src_conn; + nns_edge_conn_s *sink_conn; + int64_t id; +} nns_edge_conn_data_s; + +/** + * @brief Structures for thread data of message handling. + */ +typedef struct +{ + nns_edge_handle_s *eh; + int64_t client_id; + nns_edge_conn_s *conn; +} nns_edge_thread_data_s; + +/** + * @brief Send data to connected socket. + */ +static bool +_send_raw_data (GSocket * socket, void *data, size_t size, + GCancellable * cancellable) +{ + size_t bytes_sent = 0; + ssize_t rret; + GError *err = NULL; + + while (bytes_sent < size) { + rret = g_socket_send (socket, (char *) data + bytes_sent, + size - bytes_sent, cancellable, &err); + + if (rret == 0) { + nns_edge_loge ("Connection closed."); + return false; + } + + if (rret < 0) { + nns_edge_loge ("Error while sending data (%s).", err->message); + g_clear_error (&err); + return false; + } + + bytes_sent += rret; + } + + return true; +} + +/** + * @brief Receive data from connected socket. + */ +static bool +_receive_raw_data (GSocket * socket, void *data, size_t size, + GCancellable * cancellable) +{ + size_t bytes_received = 0; + ssize_t rret; + GError *err = NULL; + + while (bytes_received < size) { + rret = g_socket_receive (socket, (char *) data + bytes_received, + size - bytes_received, cancellable, &err); + + if (rret == 0) { + nns_edge_loge ("Connection closed."); + return false; + } + + if (rret < 0) { + nns_edge_loge ("Failed to read from socket (%s).", err->message); + g_clear_error (&err); + return false; + } + + bytes_received += rret; + } + + return true; +} + +/** + * @brief Parse string and get host IP:port. + */ +static void +_parse_host_str (const char *host, char **ip, int *port) +{ + char *p = g_strrstr (host, ":"); + + if (p) { + *ip = g_strndup (host, (p - host)); + *port = (int) g_ascii_strtoll (p + 1, NULL, 10); + } +} + +/** + * @brief Get host string (IP:port). + */ +static void +_get_host_str (const char *ip, const int port, char **host) +{ + *host = g_strdup_printf ("%s:%d", ip, port); +} + +/** + * @brief Get available port number. + */ +static int +_get_available_port (void) +{ + struct sockaddr_in sin; + int port = 0, sock; + socklen_t len = sizeof (struct sockaddr); + + sin.sin_family = AF_INET; + sin.sin_addr.s_addr = INADDR_ANY; + sock = socket (AF_INET, SOCK_STREAM, 0); + sin.sin_port = port; + if (bind (sock, (struct sockaddr *) &sin, sizeof (struct sockaddr)) == 0) { + getsockname (sock, (struct sockaddr *) &sin, &len); + port = ntohs (sin.sin_port); + nns_edge_logi ("Available port number: %d", port); + } + close (sock); + + return port; +} + +/** + * @brief initialize edge command. + */ +static void +_nns_edge_cmd_init (nns_edge_cmd_s * cmd, nns_edge_cmd_e c, int64_t cid) +{ + if (!cmd) + return; + + memset (cmd, 0, sizeof (nns_edge_cmd_s)); + cmd->info.cmd = c; + cmd->info.client_id = cid; +} + +/** + * @brief Clear allocated memory in edge command. + */ +static void +_nns_edge_cmd_clear (nns_edge_cmd_s * cmd) +{ + unsigned int i; + + if (!cmd) + return; + + for (i = 0; i < cmd->info.num; i++) { + if (cmd->mem[i]) + free (cmd->mem[i]); + cmd->mem[i] = NULL; + } +} + +/** + * @brief Send edge command to connected device. + */ +static int +_nns_edge_cmd_send (nns_edge_conn_s * conn, nns_edge_cmd_s * cmd) +{ + unsigned int n; + + if (!conn || !cmd) + return NNS_EDGE_ERROR_INVALID_PARAMETER; + + if (!_send_raw_data (conn->socket, &cmd->info, + sizeof (nns_edge_cmd_info_s), conn->cancellable)) { + nns_edge_loge ("Failed to send command to socket."); + return NNS_EDGE_ERROR_IO; + } + + for (n = 0; n < cmd->info.num; n++) { + if (!_send_raw_data (conn->socket, cmd->mem[n], + cmd->info.mem_size[n], conn->cancellable)) { + nns_edge_loge ("Failed to send %uth memory to socket.", n); + return NNS_EDGE_ERROR_IO; + } + } + + return NNS_EDGE_ERROR_NONE; +} + +/** + * @brief Receive edge command from connected device. + */ +static int +_nns_edge_cmd_receive (nns_edge_conn_s * conn, nns_edge_cmd_s * cmd) +{ + unsigned int i, n; + int ret = NNS_EDGE_ERROR_NONE; + + if (!conn || !cmd) + return NNS_EDGE_ERROR_INVALID_PARAMETER; + + if (!_receive_raw_data (conn->socket, &cmd->info, + sizeof (nns_edge_cmd_info_s), conn->cancellable)) { + nns_edge_loge ("Failed to receive command from socket."); + return NNS_EDGE_ERROR_IO; + } + + nns_edge_logd ("Received command:%d (num:%u)", cmd->info.cmd, cmd->info.num); + + for (n = 0; n < cmd->info.num; n++) { + cmd->mem[n] = malloc (cmd->info.mem_size[n]); + if (!cmd->mem[n]) { + nns_edge_loge ("Failed to allocate memory to receive data from socket."); + ret = NNS_EDGE_ERROR_OUT_OF_MEMORY; + break; + } + + if (!_receive_raw_data (conn->socket, cmd->mem[n], + cmd->info.mem_size[n], conn->cancellable)) { + nns_edge_loge ("Failed to receive %uth memory from socket.", n++); + ret = NNS_EDGE_ERROR_IO; + break; + } + } + + if (ret != NNS_EDGE_ERROR_NONE) { + for (i = 0; i < n; i++) { + free (cmd->mem[i]); + cmd->mem[i] = NULL; + } + } + + return ret; +} + +/** + * @brief Internal function to invoke event callback. + * @note This function should be called with handle lock. + */ +static int +_nns_edge_invoke_event_cb (nns_edge_handle_s * eh, nns_edge_event_e event, + void *data, size_t data_len, nns_edge_data_destroy_cb destroy_cb) +{ + nns_edge_event_h event_h; + int ret; + + if (!eh) { + nns_edge_loge ("Invalid param, given edge handle is null."); + return NNS_EDGE_ERROR_INVALID_PARAMETER; + } + + /* If event callback is null, return ok. */ + if (!eh->event_cb) { + nns_edge_logw ("The event callback is null, do nothing!"); + return NNS_EDGE_ERROR_NONE; + } + + ret = nns_edge_event_create (event, &event_h); + if (ret != NNS_EDGE_ERROR_NONE) { + nns_edge_loge ("Failed to create new edge event."); + return ret; + } + + if (data) { + ret = nns_edge_event_set_data (event_h, data, data_len, destroy_cb); + if (ret != NNS_EDGE_ERROR_NONE) { + nns_edge_loge ("Failed to handle edge event due to invalid event data."); + goto error; + } + } + + ret = eh->event_cb (event_h, eh->user_data); + if (ret != NNS_EDGE_ERROR_NONE) { + nns_edge_loge ("The event callback returns error."); + } + +error: + nns_edge_event_destroy (event_h); + return ret; +} + +/** + * @brief Close connection + */ +static bool +_nns_edge_close_connection (nns_edge_conn_s * conn) +{ + GError *err = NULL; + + if (!conn) + return false; + + if (conn->running) { + conn->running = 0; + pthread_join (conn->msg_thread, NULL); + } + + if (conn->socket) { + if (!g_socket_close (conn->socket, &err)) { + nns_edge_loge ("Failed to close socket: %s", err->message); + g_clear_error (&err); + return false; + } + g_object_unref (conn->socket); + conn->socket = NULL; + } + + if (conn->cancellable) { + g_object_unref (conn->cancellable); + conn->cancellable = NULL; + } + + g_free (conn->ip); + g_free (conn); + return true; +} + +/** + * @brief Get nnstreamer-edge connection data. + * @note This function should be called with handle lock. + */ +static nns_edge_conn_data_s * +_nns_edge_get_connection (nns_edge_handle_s * eh, int64_t client_id) +{ + if (!NNS_EDGE_MAGIC_IS_VALID (eh)) { + nns_edge_loge ("Invalid param, given edge handle is invalid."); + return NULL; + } + + return g_hash_table_lookup (eh->conn_table, GUINT_TO_POINTER (client_id)); +} + +/** + * @brief Get nnstreamer-edge connection data. + * @note This function should be called with handle lock. + */ +static nns_edge_conn_data_s * +_nns_edge_add_connection (nns_edge_handle_s * eh, int64_t client_id) +{ + nns_edge_conn_data_s *data = NULL; + + if (!NNS_EDGE_MAGIC_IS_VALID (eh)) { + nns_edge_loge ("Invalid param, given edge handle is invalid."); + return NULL; + } + + data = g_hash_table_lookup (eh->conn_table, GUINT_TO_POINTER (client_id)); + + if (NULL == data) { + data = (nns_edge_conn_data_s *) malloc (sizeof (nns_edge_conn_data_s)); + if (NULL == data) { + nns_edge_loge ("Failed to allocate memory for connection data."); + return NULL; + } + + memset (data, 0, sizeof (nns_edge_conn_data_s)); + data->id = client_id; + + g_hash_table_insert (eh->conn_table, GUINT_TO_POINTER (client_id), data); + } + + return data; +} + +/** + * @brief Remove nnstreamer-edge connection data. This will be called when removing connection data from hash table. + */ +static void +_nns_edge_remove_connection (gpointer data) +{ + nns_edge_conn_data_s *cdata = (nns_edge_conn_data_s *) data; + + if (cdata) { + _nns_edge_close_connection (cdata->src_conn); + _nns_edge_close_connection (cdata->sink_conn); + + g_free (cdata); + } +} + +/** + * @brief Internal function to check connection. + */ +static bool +_nns_edge_check_connection (nns_edge_conn_s * conn) +{ + size_t size; + GIOCondition condition; + + if (!conn) + return false; + + condition = g_socket_condition_check (conn->socket, + G_IO_IN | G_IO_PRI | G_IO_ERR | G_IO_HUP); + size = g_socket_get_available_bytes (conn->socket); + + if (condition && size <= 0) { + nns_edge_logw ("Socket is not available, possibly EOS."); + return false; + } + + return true; +} + +/** + * @brief Get socket address + */ +static bool +_nns_edge_get_saddr (const char *ip, const int port, + GCancellable * cancellable, GSocketAddress ** saddr) +{ + GError *err = NULL; + GInetAddress *addr; + + /* look up name if we need to */ + addr = g_inet_address_new_from_string (ip); + if (!addr) { + GList *results; + GResolver *resolver; + resolver = g_resolver_get_default (); + results = g_resolver_lookup_by_name (resolver, ip, cancellable, &err); + if (!results) { + if (g_error_matches (err, G_IO_ERROR, G_IO_ERROR_CANCELLED)) { + nns_edge_loge ("Failed to resolve ip, name resolver is cancelled."); + } else { + nns_edge_loge ("Failed to resolve ip '%s': %s", ip, err->message); + } + g_clear_error (&err); + g_object_unref (resolver); + return false; + } + /** @todo Try with the second address if the first fails */ + addr = G_INET_ADDRESS (g_object_ref (results->data)); + g_resolver_free_addresses (results); + g_object_unref (resolver); + } + + *saddr = g_inet_socket_address_new (addr, port); + g_object_unref (addr); + + return true; +} + +/** + * @brief Connect to requested socket. + */ +static bool +_nns_edge_connect_socket (nns_edge_conn_s * conn) +{ + GError *err = NULL; + GSocketAddress *saddr = NULL; + bool ret = false; + + if (!_nns_edge_get_saddr (conn->ip, conn->port, conn->cancellable, &saddr)) { + nns_edge_loge ("Failed to get socket address"); + return ret; + } + + /* create sending client socket */ + conn->socket = + g_socket_new (g_socket_address_get_family (saddr), G_SOCKET_TYPE_STREAM, + G_SOCKET_PROTOCOL_TCP, &err); + + if (!conn->socket) { + nns_edge_loge ("Failed to create new socket"); + goto done; + } + + /* setting TCP_NODELAY to true in order to avoid packet batching as known as Nagle's algorithm */ + if (!g_socket_set_option (conn->socket, IPPROTO_TCP, TCP_NODELAY, true, &err)) { + nns_edge_loge ("Failed to set socket TCP_NODELAY option: %s", err->message); + goto done; + } + + if (!g_socket_connect (conn->socket, saddr, conn->cancellable, &err)) { + if (g_error_matches (err, G_IO_ERROR, G_IO_ERROR_CANCELLED)) { + nns_edge_logd ("Cancelled connecting"); + } else { + nns_edge_loge ("Failed to connect to host, %s:%d", conn->ip, conn->port); + } + goto done; + } + + /* now connected to the requested socket */ + ret = true; + +done: + g_object_unref (saddr); + g_clear_error (&err); + return ret; +} + +/** + * @brief Connect to the destination node. + */ +static int +_nns_edge_connect_to (nns_edge_handle_s * eh, const char *ip, int port) +{ + nns_edge_conn_s *conn = NULL; + nns_edge_conn_data_s *conn_data; + nns_edge_cmd_s cmd; + char *host; + int64_t client_id; + bool done = false; + int ret; + + conn = (nns_edge_conn_s *) malloc (sizeof (nns_edge_conn_s)); + if (!conn) { + nns_edge_loge ("Failed to allocate client data."); + goto error; + } + + memset (conn, 0, sizeof (nns_edge_conn_s)); + conn->ip = g_strdup (ip); + conn->port = port; + conn->cancellable = g_cancellable_new (); + + if (!_nns_edge_connect_socket (conn)) { + goto error; + } + + /* Get destination capability. */ + ret = _nns_edge_cmd_receive (conn, &cmd); + if (ret != NNS_EDGE_ERROR_NONE) { + nns_edge_loge ("Failed to receive capability."); + goto error; + } + + if (cmd.info.cmd != _NNS_EDGE_CMD_CAPABILITY) { + nns_edge_loge ("Failed to get capability."); + _nns_edge_cmd_clear (&cmd); + goto error; + } + + client_id = eh->client_id = cmd.info.client_id; + + /* Check compatibility. */ + ret = _nns_edge_invoke_event_cb (eh, NNS_EDGE_EVENT_CAPABILITY, + cmd.mem[0], cmd.info.mem_size[0], NULL); + _nns_edge_cmd_clear (&cmd); + + if (ret != NNS_EDGE_ERROR_NONE) { + nns_edge_loge ("The event returns error, capability is not acceptable."); + _nns_edge_cmd_init (&cmd, _NNS_EDGE_CMD_ERROR, client_id); + } else { + /* Send ip and port to destination. */ + _nns_edge_cmd_init (&cmd, _NNS_EDGE_CMD_HOST_INFO, client_id); + + _get_host_str (eh->recv_ip, eh->recv_port, &host); + cmd.info.num = 1; + cmd.info.mem_size[0] = strlen (host) + 1; + cmd.mem[0] = host; + } + + ret = _nns_edge_cmd_send (conn, &cmd); + _nns_edge_cmd_clear (&cmd); + + if (ret != NNS_EDGE_ERROR_NONE) { + nns_edge_loge ("Failed to send host info."); + goto error; + } + + conn_data = _nns_edge_add_connection (eh, client_id); + if (conn_data) { + /* Close old connection and set new one. */ + _nns_edge_close_connection (conn_data->sink_conn); + conn_data->sink_conn = conn; + done = true; + } + +error: + if (!done) { + _nns_edge_close_connection (conn); + return NNS_EDGE_ERROR_CONNECTION_FAILURE; + } + + return NNS_EDGE_ERROR_NONE; +} + +/** + * @brief Message thread, receive buffer from the client. + */ +static void * +_nns_edge_message_handler (void *thread_data) +{ + nns_edge_thread_data_s *_tdata = (nns_edge_thread_data_s *) thread_data; + nns_edge_handle_s *eh; + nns_edge_conn_s *conn; + nns_edge_cmd_s cmd; + int64_t client_id; + char *val; + int ret; + + if (!_tdata) { + nns_edge_loge ("Internal error, thread data is null."); + return NULL; + } + + eh = (nns_edge_handle_s *) _tdata->eh; + conn = _tdata->conn; + client_id = _tdata->client_id; + g_free (_tdata); + + while (conn->running) { + nns_edge_data_h data_h; + unsigned int i; + + /* Validate edge handle */ + if (!NNS_EDGE_MAGIC_IS_VALID (eh)) { + nns_edge_loge ("The edge handle is invalid, it would be expired."); + break; + } + + if (!_nns_edge_check_connection (conn)) + break; + + /** Receive data from the client */ + ret = _nns_edge_cmd_receive (conn, &cmd); + if (ret != NNS_EDGE_ERROR_NONE) { + nns_edge_loge ("Failed to receive data from the connected node."); + break; + } + + if (cmd.info.cmd == _NNS_EDGE_CMD_ERROR) { + nns_edge_loge ("Received error, stop msg thread."); + _nns_edge_cmd_clear (&cmd); + break; + } else if (cmd.info.cmd != _NNS_EDGE_CMD_TRANSFER_DATA) { + /** @todo handle other cmd later */ + _nns_edge_cmd_clear (&cmd); + continue; + } + + ret = nns_edge_data_create (&data_h); + if (ret != NNS_EDGE_ERROR_NONE) { + nns_edge_loge ("Failed to create data handle in msg thread."); + _nns_edge_cmd_clear (&cmd); + continue; + } + + /* Set client ID in edge data */ + val = g_strdup_printf ("%ld", (long int) client_id); + nns_edge_data_set_info (data_h, "client_id", val); + g_free (val); + + for (i = 0; i < cmd.info.num; i++) { + nns_edge_data_add (data_h, cmd.mem[i], cmd.info.mem_size[i], NULL); + } + + ret = _nns_edge_invoke_event_cb (eh, NNS_EDGE_EVENT_NEW_DATA_RECEIVED, + data_h, sizeof (data_h), NULL); + if (ret != NNS_EDGE_ERROR_NONE) { + /* Try to get next request if server does not accept data from client. */ + nns_edge_logw ("The server does not accept data from client."); + } + + nns_edge_data_destroy (data_h); + _nns_edge_cmd_clear (&cmd); + } + + conn->running = 0; + return NULL; +} + +/** + * @brief Create message handle thread. + */ +static int +_nns_edge_create_message_thread (nns_edge_handle_s * eh, nns_edge_conn_s * conn, + int64_t client_id) +{ + pthread_attr_t attr; + int tid; + nns_edge_thread_data_s *thread_data = NULL; + + thread_data = + (nns_edge_thread_data_s *) malloc (sizeof (nns_edge_thread_data_s)); + if (!thread_data) { + nns_edge_loge ("Failed to allocate edge thread data."); + return NNS_EDGE_ERROR_OUT_OF_MEMORY; + } + + /** Create message receving thread */ + pthread_attr_init (&attr); + pthread_attr_setdetachstate (&attr, PTHREAD_CREATE_DETACHED); + conn->running = 1; + thread_data->eh = eh; + thread_data->conn = conn; + thread_data->client_id = client_id; + + tid = pthread_create (&conn->msg_thread, &attr, _nns_edge_message_handler, + thread_data); + pthread_attr_destroy (&attr); + + if (tid < 0) { + nns_edge_loge ("Failed to create message handler thread."); + conn->running = 0; + g_free (thread_data); + return NNS_EDGE_ERROR_IO; + } + + return NNS_EDGE_ERROR_NONE; +} + +/** + * @brief Callback for socket listener, accept socket and create message thread. + */ +static void +_nns_edge_accept_socket_async_cb (GObject * source, GAsyncResult * result, + gpointer user_data) +{ + GSocketListener *socket_listener = G_SOCKET_LISTENER (source); + GSocket *socket = NULL; + GError *err = NULL; + nns_edge_handle_s *eh = (nns_edge_handle_s *) user_data; + nns_edge_conn_s *conn = NULL; + nns_edge_cmd_s cmd; + bool done = false; + char *connected_ip = NULL; + int connected_port = 0; + nns_edge_conn_data_s *conn_data = NULL; + int64_t client_id; + int ret; + + socket = + g_socket_listener_accept_socket_finish (socket_listener, result, NULL, + &err); + + if (!socket) { + nns_edge_loge ("Failed to get socket: %s", err->message); + g_clear_error (&err); + goto error; + } + g_socket_set_timeout (socket, DEFAULT_TIMEOUT_SEC); + + /* create socket with connection */ + conn = (nns_edge_conn_s *) malloc (sizeof (nns_edge_conn_s)); + if (!conn) { + nns_edge_loge ("Failed to allocate edge connection"); + goto error; + } + + memset (conn, 0, sizeof (nns_edge_conn_s)); + conn->socket = socket; + conn->cancellable = g_cancellable_new (); + + /* setting TCP_NODELAY to true in order to avoid packet batching as known as Nagle's algorithm */ + if (!g_socket_set_option (socket, IPPROTO_TCP, TCP_NODELAY, true, &err)) { + nns_edge_loge ("Failed to set socket TCP_NODELAY option: %s", err->message); + g_clear_error (&err); + goto error; + } + + /* Send capability and info to check compatibility. */ + client_id = eh->is_server ? g_get_monotonic_time () : eh->client_id; + + _nns_edge_cmd_init (&cmd, _NNS_EDGE_CMD_CAPABILITY, client_id); + cmd.info.num = 1; + cmd.info.mem_size[0] = strlen (eh->caps_str) + 1; + cmd.mem[0] = eh->caps_str; + + ret = _nns_edge_cmd_send (conn, &cmd); + if (ret != NNS_EDGE_ERROR_NONE) { + nns_edge_loge ("Failed to send capability."); + goto error; + } + + /* Receive ip and port from destination. */ + ret = _nns_edge_cmd_receive (conn, &cmd); + if (ret != NNS_EDGE_ERROR_NONE) { + nns_edge_loge ("Failed to receive node info."); + goto error; + } + + if (cmd.info.cmd != _NNS_EDGE_CMD_HOST_INFO) { + nns_edge_loge ("Failed to get host info."); + _nns_edge_cmd_clear (&cmd); + goto error; + } + + _parse_host_str (cmd.mem[0], &connected_ip, &connected_port); + _nns_edge_cmd_clear (&cmd); + + ret = _nns_edge_create_message_thread (eh, conn, client_id); + if (ret != NNS_EDGE_ERROR_NONE) { + nns_edge_loge ("Failed to create message handle thread."); + goto error; + } + + conn_data = _nns_edge_add_connection (eh, client_id); + if (conn_data) { + /* Close old connection and set new one. */ + _nns_edge_close_connection (conn_data->src_conn); + conn_data->src_conn = conn; + done = true; + } + +error: + if (done) { + if (eh->is_server) { + _nns_edge_connect_to (eh, connected_ip, connected_port); + } + } else { + _nns_edge_close_connection (conn); + } + + g_socket_listener_accept_socket_async (socket_listener, eh->cancellable, + (GAsyncReadyCallback) _nns_edge_accept_socket_async_cb, eh); + + g_free (connected_ip); +} + +/** + * @brief Get registered handle. If not registered, create new handle and register it. + */ +int +nns_edge_create_handle (const char *id, const char *topic, nns_edge_h * edge_h) +{ + nns_edge_handle_s *eh; + + if (!id || *id == '\0') { + nns_edge_loge ("Invalid param, given ID is invalid."); + return NNS_EDGE_ERROR_INVALID_PARAMETER; + } + + if (!topic || *topic == '\0') { + nns_edge_loge ("Invalid param, given topic is invalid."); + return NNS_EDGE_ERROR_INVALID_PARAMETER; + } + + if (!edge_h) { + nns_edge_loge ("Invalid param, edge_h should not be null."); + return NNS_EDGE_ERROR_INVALID_PARAMETER; + } + + /** + * @todo manage edge handles + * 1. consider adding hash table or list to manage edge handles. + * 2. compare topic and return error if existing topic in handle is different. + */ + eh = (nns_edge_handle_s *) malloc (sizeof (nns_edge_handle_s)); + if (!eh) { + nns_edge_loge ("Failed to allocate memory for edge handle."); + return NNS_EDGE_ERROR_OUT_OF_MEMORY; + } + + memset (eh, 0, sizeof (nns_edge_handle_s)); + nns_edge_lock_init (eh); + eh->magic = NNS_EDGE_MAGIC; + eh->id = g_strdup (id); + eh->topic = g_strdup (topic); + eh->protocol = NNS_EDGE_PROTOCOL_TCP; + eh->is_server = true; + eh->recv_ip = g_strdup ("localhost"); + eh->recv_port = 0; + eh->caps_str = NULL; + + /* Connection data for each client ID. */ + eh->conn_table = g_hash_table_new_full (g_direct_hash, g_direct_equal, NULL, + _nns_edge_remove_connection); + + *edge_h = eh; + return NNS_EDGE_ERROR_NONE; +} + +/** + * @brief Initialize the nnstreamer edge handle. + */ +int +nns_edge_start (nns_edge_h edge_h, bool is_server) +{ + GSocketAddress *saddr = NULL; + GError *err = NULL; + int ret = 0; + nns_edge_handle_s *eh; + + eh = (nns_edge_handle_s *) edge_h; + if (!eh) { + nns_edge_loge ("Invalid param, given edge handle is null."); + return NNS_EDGE_ERROR_INVALID_PARAMETER; + } + + nns_edge_lock (eh); + + if (!NNS_EDGE_MAGIC_IS_VALID (eh)) { + nns_edge_loge ("Invalid param, given edge handle is invalid."); + nns_edge_unlock (eh); + return NNS_EDGE_ERROR_INVALID_PARAMETER; + } + + eh->is_server = is_server; + if (!is_server && 0 == eh->recv_port) + eh->recv_port = _get_available_port (); + + /** Initialize server src data. */ + eh->cancellable = g_cancellable_new (); + eh->listener = g_socket_listener_new (); + g_socket_listener_set_backlog (eh->listener, N_BACKLOG); + + if (!_nns_edge_get_saddr (eh->recv_ip, eh->recv_port, eh->cancellable, + &saddr)) { + nns_edge_loge ("Failed to get socket address"); + ret = NNS_EDGE_ERROR_CONNECTION_FAILURE; + goto error; + } + if (!g_socket_listener_add_address (eh->listener, saddr, + G_SOCKET_TYPE_STREAM, G_SOCKET_PROTOCOL_TCP, NULL, NULL, &err)) { + nns_edge_loge ("Failed to add address: %s", err->message); + g_clear_error (&err); + ret = NNS_EDGE_ERROR_CONNECTION_FAILURE; + goto error; + } + g_object_unref (saddr); + saddr = NULL; + + g_socket_listener_accept_socket_async (eh->listener, eh->cancellable, + (GAsyncReadyCallback) _nns_edge_accept_socket_async_cb, eh); + +error: + if (saddr) + g_object_unref (saddr); + + nns_edge_unlock (eh); + return ret; +} + +/** + * @brief Release the given handle. + */ +int +nns_edge_release_handle (nns_edge_h edge_h) +{ + nns_edge_handle_s *eh; + + eh = (nns_edge_handle_s *) edge_h; + if (!eh) { + nns_edge_loge ("Invalid param, given edge handle is null."); + return NNS_EDGE_ERROR_INVALID_PARAMETER; + } + + nns_edge_lock (eh); + + if (!NNS_EDGE_MAGIC_IS_VALID (eh)) { + nns_edge_loge ("Invalid param, given edge handle is invalid."); + nns_edge_unlock (eh); + return NNS_EDGE_ERROR_INVALID_PARAMETER; + } + + eh->magic = NNS_EDGE_MAGIC_DEAD; + eh->event_cb = NULL; + eh->user_data = NULL; + g_free (eh->id); + g_free (eh->topic); + g_free (eh->ip); + g_free (eh->recv_ip); + g_hash_table_destroy (eh->conn_table); + + nns_edge_unlock (eh); + nns_edge_lock_destroy (eh); + g_free (eh); + + return NNS_EDGE_ERROR_NONE; +} + +/** + * @brief Set the event callback. + */ +int +nns_edge_set_event_callback (nns_edge_h edge_h, nns_edge_event_cb cb, + void *user_data) +{ + nns_edge_handle_s *eh; + int ret; + + eh = (nns_edge_handle_s *) edge_h; + if (!eh) { + nns_edge_loge ("Invalid param, given edge handle is null."); + return NNS_EDGE_ERROR_INVALID_PARAMETER; + } + + nns_edge_lock (eh); + + if (!NNS_EDGE_MAGIC_IS_VALID (eh)) { + nns_edge_loge ("Invalid param, given edge handle is invalid."); + nns_edge_unlock (eh); + return NNS_EDGE_ERROR_INVALID_PARAMETER; + } + + ret = _nns_edge_invoke_event_cb (eh, NNS_EDGE_EVENT_CALLBACK_RELEASED, + NULL, 0, NULL); + if (ret != NNS_EDGE_ERROR_NONE) { + nns_edge_loge ("Failed to set new event callback."); + nns_edge_unlock (eh); + return ret; + } + + eh->event_cb = cb; + eh->user_data = user_data; + + nns_edge_unlock (eh); + return NNS_EDGE_ERROR_NONE; +} + +/** + * @brief Connect to the destination node. + */ +int +nns_edge_connect (nns_edge_h edge_h, nns_edge_protocol_e protocol, + const char *ip, int port) +{ + nns_edge_handle_s *eh; + int ret; + + eh = (nns_edge_handle_s *) edge_h; + if (!eh) { + nns_edge_loge ("Invalid param, given edge handle is null."); + return NNS_EDGE_ERROR_INVALID_PARAMETER; + } + + if (!ip || *ip == '\0') { + nns_edge_loge ("Invalid param, given IP is invalid."); + return NNS_EDGE_ERROR_INVALID_PARAMETER; + } + + nns_edge_lock (eh); + + if (!NNS_EDGE_MAGIC_IS_VALID (eh)) { + nns_edge_loge ("Invalid param, given edge handle is invalid."); + nns_edge_unlock (eh); + return NNS_EDGE_ERROR_INVALID_PARAMETER; + } + + if (!eh->event_cb) { + nns_edge_loge ("NNStreamer-edge event callback is not registered."); + nns_edge_unlock (eh); + return NNS_EDGE_ERROR_CONNECTION_FAILURE; + } + + eh->is_server = false; + eh->protocol = protocol; + + /** Connect to info channel. */ + ret = _nns_edge_connect_to (eh, ip, port); + if (ret != NNS_EDGE_ERROR_NONE) { + nns_edge_loge ("Failed to connect to %s:%d", ip, port); + } + + nns_edge_unlock (eh); + return ret; +} + +/** + * @brief Disconnect from the destination node. + */ +int +nns_edge_disconnect (nns_edge_h edge_h) +{ + nns_edge_handle_s *eh; + + eh = (nns_edge_handle_s *) edge_h; + if (!eh) { + nns_edge_loge ("Invalid param, given edge handle is null."); + return NNS_EDGE_ERROR_INVALID_PARAMETER; + } + + nns_edge_lock (eh); + + if (!NNS_EDGE_MAGIC_IS_VALID (eh)) { + nns_edge_loge ("Invalid param, given edge handle is invalid."); + nns_edge_unlock (eh); + return NNS_EDGE_ERROR_INVALID_PARAMETER; + } + + g_hash_table_remove_all (eh->conn_table); + + nns_edge_unlock (eh); + return NNS_EDGE_ERROR_NONE; +} + +/** + * @brief Publish a message to a given topic. + */ +int +nns_edge_publish (nns_edge_h edge_h, nns_edge_data_h data_h) +{ + nns_edge_handle_s *eh; + + eh = (nns_edge_handle_s *) edge_h; + if (!eh) { + nns_edge_loge ("Invalid param, given edge handle is null."); + return NNS_EDGE_ERROR_INVALID_PARAMETER; + } + + if (nns_edge_data_is_valid (data_h) != NNS_EDGE_ERROR_NONE) { + nns_edge_loge ("Invalid param, given edge data is invalid."); + return NNS_EDGE_ERROR_INVALID_PARAMETER; + } + + nns_edge_lock (eh); + + if (!NNS_EDGE_MAGIC_IS_VALID (eh)) { + nns_edge_loge ("Invalid param, given edge handle is invalid."); + nns_edge_unlock (eh); + return NNS_EDGE_ERROR_INVALID_PARAMETER; + } + + /** @todo update code (publish data) */ + + nns_edge_unlock (eh); + return NNS_EDGE_ERROR_NONE; +} + +/** + * @brief Request result to the server. + */ +int +nns_edge_request (nns_edge_h edge_h, nns_edge_data_h data_h, void *user_data) +{ + nns_edge_handle_s *eh; + nns_edge_conn_data_s *conn_data; + nns_edge_cmd_s cmd; + int ret; + unsigned int i; + + UNUSED (user_data); + eh = (nns_edge_handle_s *) edge_h; + if (!eh) { + nns_edge_loge ("Invalid param, given edge handle is null."); + return NNS_EDGE_ERROR_INVALID_PARAMETER; + } + + if (nns_edge_data_is_valid (data_h) != NNS_EDGE_ERROR_NONE) { + nns_edge_loge ("Invalid param, given edge data is invalid."); + return NNS_EDGE_ERROR_INVALID_PARAMETER; + } + + nns_edge_lock (eh); + + if (!NNS_EDGE_MAGIC_IS_VALID (eh)) { + nns_edge_loge ("Invalid param, given edge handle is invalid."); + nns_edge_unlock (eh); + return NNS_EDGE_ERROR_INVALID_PARAMETER; + } + + conn_data = _nns_edge_get_connection (eh, eh->client_id); + if (!_nns_edge_check_connection (conn_data->sink_conn)) { + nns_edge_loge ("Failed to request, connection failure."); + nns_edge_unlock (eh); + return NNS_EDGE_ERROR_CONNECTION_FAILURE; + } + + _nns_edge_cmd_init (&cmd, _NNS_EDGE_CMD_TRANSFER_DATA, eh->client_id); + + nns_edge_data_get_count (data_h, &cmd.info.num); + for (i = 0; i < cmd.info.num; i++) { + nns_edge_data_get (data_h, i, &cmd.mem[i], &cmd.info.mem_size[i]); + } + + ret = _nns_edge_cmd_send (conn_data->sink_conn, &cmd); + + nns_edge_unlock (eh); + return ret; +} + +/** + * @brief Subscribe a message to a given topic. + */ +int +nns_edge_subscribe (nns_edge_h edge_h, nns_edge_data_h data_h, void *user_data) +{ + nns_edge_handle_s *eh; + + UNUSED (user_data); + eh = (nns_edge_handle_s *) edge_h; + if (!eh) { + nns_edge_loge ("Invalid param, given edge handle is null."); + return NNS_EDGE_ERROR_INVALID_PARAMETER; + } + + if (nns_edge_data_is_valid (data_h) != NNS_EDGE_ERROR_NONE) { + nns_edge_loge ("Invalid param, given edge data is invalid."); + return NNS_EDGE_ERROR_INVALID_PARAMETER; + } + + nns_edge_lock (eh); + + if (!NNS_EDGE_MAGIC_IS_VALID (eh)) { + nns_edge_loge ("Invalid param, given edge handle is invalid."); + nns_edge_unlock (eh); + return NNS_EDGE_ERROR_INVALID_PARAMETER; + } + + /** @todo update code (subscribe) */ + + nns_edge_unlock (eh); + return NNS_EDGE_ERROR_NONE; +} + +/** + * @brief Unsubscribe a message to a given topic. + */ +int +nns_edge_unsubscribe (nns_edge_h edge_h) +{ + nns_edge_handle_s *eh; + + eh = (nns_edge_handle_s *) edge_h; + if (!eh) { + nns_edge_loge ("Invalid param, given edge handle is null."); + return NNS_EDGE_ERROR_INVALID_PARAMETER; + } + + nns_edge_lock (eh); + + if (!NNS_EDGE_MAGIC_IS_VALID (eh)) { + nns_edge_loge ("Invalid param, given edge handle is invalid."); + nns_edge_unlock (eh); + return NNS_EDGE_ERROR_INVALID_PARAMETER; + } + + /** @todo update code (unsubscribe) */ + + nns_edge_unlock (eh); + return NNS_EDGE_ERROR_NONE; +} + +/** + * @brief Get the topic of edge handle. Caller should release returned string using free(). + * @todo is this necessary? + */ +int +nns_edge_get_topic (nns_edge_h edge_h, char **topic) +{ + nns_edge_handle_s *eh; + + eh = (nns_edge_handle_s *) edge_h; + if (!eh) { + nns_edge_loge ("Invalid param, given edge handle is null."); + return NNS_EDGE_ERROR_INVALID_PARAMETER; + } + + if (!topic) { + nns_edge_loge ("Invalid param, topic should not be null."); + return NNS_EDGE_ERROR_INVALID_PARAMETER; + } + + nns_edge_lock (eh); + + if (!NNS_EDGE_MAGIC_IS_VALID (eh)) { + nns_edge_loge ("Invalid param, given edge handle is invalid."); + nns_edge_unlock (eh); + return NNS_EDGE_ERROR_INVALID_PARAMETER; + } + + *topic = g_strdup (eh->topic); + + nns_edge_unlock (eh); + return NNS_EDGE_ERROR_NONE; +} + +/** + * @brief Set nnstreamer edge info. + */ +int +nns_edge_set_info (nns_edge_h edge_h, const char *key, const char *value) +{ + nns_edge_handle_s *eh; + char *ret_str = NULL; + + eh = (nns_edge_handle_s *) edge_h; + if (!eh) { + nns_edge_loge ("Invalid param, given edge handle is null."); + return NNS_EDGE_ERROR_INVALID_PARAMETER; + } + + nns_edge_lock (eh); + + if (!NNS_EDGE_MAGIC_IS_VALID (eh)) { + nns_edge_loge ("Invalid param, given edge handle is invalid."); + nns_edge_unlock (eh); + return NNS_EDGE_ERROR_INVALID_PARAMETER; + } + + /** + * @todo User handles (replace or append) the capability of edge handle. + * @todo Change key-value set as json or hash table. + */ + if (0 == g_ascii_strcasecmp (key, "CAPS")) { + ret_str = g_strdup_printf ("%s%s", _STR_NULL (eh->caps_str), value); + g_free (eh->caps_str); + eh->caps_str = ret_str; + } else if (0 == g_ascii_strcasecmp (key, "IP")) { + g_free (eh->recv_ip); + eh->recv_ip = g_strdup (value); + } else if (0 == g_ascii_strcasecmp (key, "PORT")) { + eh->recv_port = g_ascii_strtoll (value, NULL, 10); + } else if (0 == g_ascii_strcasecmp (key, "TOPIC")) { + g_free (eh->topic); + eh->topic = g_strdup (value); + } else { + nns_edge_logw ("Failed to set edge info. Unknown key: %s", key); + } + + nns_edge_unlock (eh); + return NNS_EDGE_ERROR_NONE; +} + +/** + * @brief Respond to a request. + */ +int +nns_edge_respond (nns_edge_h edge_h, nns_edge_data_h data_h) +{ + nns_edge_handle_s *eh; + nns_edge_conn_data_s *conn_data; + nns_edge_cmd_s cmd; + int64_t client_id; + char *val; + int ret; + unsigned int i; + + eh = (nns_edge_handle_s *) edge_h; + if (!eh) { + nns_edge_loge ("Invalid param, given edge handle is null."); + return NNS_EDGE_ERROR_INVALID_PARAMETER; + } + + if (nns_edge_data_is_valid (data_h) != NNS_EDGE_ERROR_NONE) { + nns_edge_loge ("Invalid param, given edge data is invalid."); + return NNS_EDGE_ERROR_INVALID_PARAMETER; + } + + nns_edge_lock (eh); + + if (!NNS_EDGE_MAGIC_IS_VALID (eh)) { + nns_edge_loge ("Invalid param, given edge handle is invalid."); + nns_edge_unlock (eh); + return NNS_EDGE_ERROR_INVALID_PARAMETER; + } + + ret = nns_edge_data_get_info (data_h, "client_id", &val); + if (ret != NNS_EDGE_ERROR_NONE) { + nns_edge_loge ("Cannot find client ID in edge data."); + nns_edge_unlock (eh); + return NNS_EDGE_ERROR_INVALID_PARAMETER; + } + + client_id = g_ascii_strtoll (val, NULL, 10); + g_free (val); + + conn_data = _nns_edge_get_connection (eh, client_id); + if (!conn_data) { + nns_edge_loge ("Cannot find connection, invalid client ID."); + nns_edge_unlock (eh); + return NNS_EDGE_ERROR_INVALID_PARAMETER; + } + + _nns_edge_cmd_init (&cmd, _NNS_EDGE_CMD_TRANSFER_DATA, client_id); + + nns_edge_data_get_count (data_h, &cmd.info.num); + for (i = 0; i < cmd.info.num; i++) { + nns_edge_data_get (data_h, i, &cmd.mem[i], &cmd.info.mem_size[i]); + } + + ret = _nns_edge_cmd_send (conn_data->sink_conn, &cmd); + + nns_edge_unlock (eh); + return ret; +} diff --git a/src/libnnstreamer-edge/nnstreamer-edge-internal.h b/src/libnnstreamer-edge/nnstreamer-edge-internal.h new file mode 100644 index 0000000..8e3a928 --- /dev/null +++ b/src/libnnstreamer-edge/nnstreamer-edge-internal.h @@ -0,0 +1,99 @@ +/* SPDX-License-Identifier: Apache-2.0 */ +/** + * Copyright (C) 2022 Samsung Electronics Co., Ltd. All Rights Reserved. + * + * @file nnstreamer-edge-internal.h + * @date 11 May 2022 + * @brief Internal functions to support communication among devices. + * @see https://github.com/nnstreamer/nnstreamer + * @author Gichan Jang + * @bug No known bugs except for NYI items + * @note This file is internal header for nnstreamer edge. DO NOT export this file. + */ + +#ifndef __NNSTREAMER_EDGE_INTERNAL_H__ +#define __NNSTREAMER_EDGE_INTERNAL_H__ + +#ifdef __cplusplus +extern "C" { +#endif /* __cplusplus */ + +#include "nnstreamer-edge.h" +#include +#include +#include + +/** + * @brief Data structure for edge handle. + */ +typedef struct { + unsigned int magic; + pthread_mutex_t lock; + char *id; + char *topic; + nns_edge_protocol_e protocol; + char *ip; + int port; + + /* Edge event callback and user data */ + nns_edge_event_cb event_cb; + void *user_data; + + bool is_server; + int64_t client_id; + char *caps_str; + char *recv_ip; + int recv_port; + GHashTable *conn_table; + + GSocketListener *listener; + GCancellable *cancellable; + + /* MQTT */ + void *mqtt_handle; +} nns_edge_handle_s; + +#if defined(ENABLE_MQTT) +/** + * @brief Connect to MQTT. + * @note This is internal function for MQTT broker. You should call this with edge-handle lock. + */ +int nns_edge_mqtt_connect (nns_edge_h edge_h); + +/** + * @brief Close the connection to MQTT. + * @note This is internal function for MQTT broker. You should call this with edge-handle lock. + */ +int nns_edge_mqtt_close (nns_edge_h edge_h); + +/** + * @brief Publish raw data. + * @note This is internal function for MQTT broker. You should call this with edge-handle lock. + */ +int nns_edge_mqtt_publish (nns_edge_h edge_h, const void *data, const int length); + +/** + * @brief Subscribe a topic. + * @note This is internal function for MQTT broker. You should call this with edge-handle lock. + */ +int nns_edge_mqtt_subscribe (nns_edge_h edge_h); +#else +/** + * @todo consider to change code style later. + * If MQTT is disabled, nnstreamer does not include nnstreamer_edge_mqtt.c, and changing code style will make error as it is not used function now. + * + * static int nns_edge_mqtt_publish (nns_edge_h edge_h, const void *data, const int length) + * { + * return NNS_EDGE_ERROR_NOT_SUPPORTED; + * } + */ +#define nns_edge_mqtt_connect(...) (NNS_EDGE_ERROR_NOT_SUPPORTED) +#define nns_edge_mqtt_close(...) (NNS_EDGE_ERROR_NOT_SUPPORTED) +#define nns_edge_mqtt_publish(...) (NNS_EDGE_ERROR_NOT_SUPPORTED) +#define nns_edge_mqtt_subscribe(...) (NNS_EDGE_ERROR_NOT_SUPPORTED) +#endif + +#ifdef __cplusplus +} +#endif /* __cplusplus */ +#endif /* __NNSTREAMER_EDGE_INTERNAL_H__ */ diff --git a/src/libnnstreamer-edge/nnstreamer-edge-mqtt.c b/src/libnnstreamer-edge/nnstreamer-edge-mqtt.c new file mode 100644 index 0000000..3443672 --- /dev/null +++ b/src/libnnstreamer-edge/nnstreamer-edge-mqtt.c @@ -0,0 +1,340 @@ +/* SPDX-License-Identifier: Apache-2.0 */ +/** + * Copyright (C) 2022 Samsung Electronics Co., Ltd. All Rights Reserved. + * + * @file nnstreamer-edge-mqtt.c + * @date 11 May 2022 + * @brief Internal functions to support MQTT protocol (Paho Asynchronous MQTT C Client Library). + * @see https://github.com/nnstreamer/nnstreamer + * @author Sangjung Woo + * @bug No known bugs except for NYI items + */ + +#if !defined(ENABLE_MQTT) +#error "This file can be built with Paho MQTT library." +#endif + +#include +#include +#include "nnstreamer-edge-common.h" +#include "nnstreamer-edge-internal.h" + +/** + * @brief Callback function to be called when the connection is lost. + */ +static void +mqtt_cb_connection_lost (void *context, char *cause) +{ + nns_edge_handle_s *eh; + + eh = (nns_edge_handle_s *) context; + + if (!NNS_EDGE_MAGIC_IS_VALID (eh)) { + nns_edge_loge ("Invalid param, given edge handle is invalid."); + return; + } + + nns_edge_logw ("MQTT connection is lost (ID:%s, Cause:%s).", eh->id, cause); + if (eh->event_cb) { + /** @todo send new event (MQTT disconnected) */ + } +} + +/** + * @brief Callback function to be called when the connection is completed. + */ +static void +mqtt_cb_connection_success (void *context, MQTTAsync_successData * response) +{ + nns_edge_handle_s *eh; + + UNUSED (response); + eh = (nns_edge_handle_s *) context; + + if (!NNS_EDGE_MAGIC_IS_VALID (eh)) { + nns_edge_loge ("Invalid param, given edge handle is invalid."); + return; + } + + nns_edge_logi ("MQTT connection is completed (ID:%s).", eh->id); + if (eh->event_cb) { + /** @todo send new event (MQTT connected) */ + } +} + +/** + * @brief Callback function to be called when the connection is failed. + */ +static void +mqtt_cb_connection_failure (void *context, MQTTAsync_failureData * response) +{ + nns_edge_handle_s *eh; + + UNUSED (response); + eh = (nns_edge_handle_s *) context; + + if (!NNS_EDGE_MAGIC_IS_VALID (eh)) { + nns_edge_loge ("Invalid param, given edge handle is invalid."); + return; + } + + nns_edge_logw ("MQTT connection is failed (ID:%s).", eh->id); + if (eh->event_cb) { + /** @todo send new event (MQTT connection failure) */ + } +} + +/** + * @brief Callback function to be called when the disconnection is completed. + */ +static void +mqtt_cb_disconnection_success (void *context, MQTTAsync_successData * response) +{ + nns_edge_handle_s *eh; + + UNUSED (response); + eh = (nns_edge_handle_s *) context; + + if (!NNS_EDGE_MAGIC_IS_VALID (eh)) { + nns_edge_loge ("Invalid param, given edge handle is invalid."); + return; + } + + nns_edge_logi ("MQTT disconnection is completed (ID:%s).", eh->id); + if (eh->event_cb) { + /** @todo send new event (MQTT disconnected) */ + } +} + +/** + * @brief Callback function to be called when the disconnection is failed. + */ +static void +mqtt_cb_disconnection_failure (void *context, MQTTAsync_failureData * response) +{ + nns_edge_handle_s *eh; + + UNUSED (response); + eh = (nns_edge_handle_s *) context; + + if (!NNS_EDGE_MAGIC_IS_VALID (eh)) { + nns_edge_loge ("Invalid param, given edge handle is invalid."); + return; + } + + nns_edge_logw ("MQTT disconnection is failed (ID:%s).", eh->id); + if (eh->event_cb) { + /** @todo send new event (MQTT disconnection failure) */ + } +} + +/** + * @brief Callback function to be called when a message is arrived. + * @return Return TRUE to prevent delivering the message again. + */ +static int +mqtt_cb_message_arrived (void *context, char *topic, int topic_len, + MQTTAsync_message * message) +{ + nns_edge_handle_s *eh; + + UNUSED (topic); + UNUSED (topic_len); + UNUSED (message); + eh = (nns_edge_handle_s *) context; + + if (!NNS_EDGE_MAGIC_IS_VALID (eh)) { + nns_edge_loge ("Invalid param, given edge handle is invalid."); + return TRUE; + } + + nns_edge_logd ("MQTT message is arrived (ID:%s, Topic:%s).", + eh->id, eh->topic); + if (eh->event_cb) { + /** @todo send new event (message arrived) */ + } + + return TRUE; +} + +/** + * @brief Connect to MQTT. + * @note This is internal function for MQTT broker. You should call this with edge-handle lock. + */ +int +nns_edge_mqtt_connect (nns_edge_h edge_h) +{ + nns_edge_handle_s *eh; + MQTTAsync handle; + MQTTAsync_connectOptions options = MQTTAsync_connectOptions_initializer; + char *url; + char *client_id; + int ret; + + eh = (nns_edge_handle_s *) edge_h; + + if (!NNS_EDGE_MAGIC_IS_VALID (eh)) { + nns_edge_loge ("Invalid param, given edge handle is invalid."); + return NNS_EDGE_ERROR_INVALID_PARAMETER; + } + + nns_edge_logi ("Trying to connect MQTT (ID:%s, URL:%s:%d).", + eh->id, eh->ip, eh->port); + + url = g_strdup_printf ("%s:%d", eh->ip, eh->port); + client_id = g_strdup_printf ("nns_edge_%s_%u", eh->id, getpid ()); + + ret = MQTTAsync_create (&handle, url, client_id, + MQTTCLIENT_PERSISTENCE_NONE, NULL); + if (MQTTASYNC_SUCCESS != ret) { + nns_edge_loge ("Failed to create MQTT handle."); + ret = NNS_EDGE_ERROR_CONNECTION_FAILURE; + goto error; + } + + MQTTAsync_setCallbacks (handle, edge_h, + mqtt_cb_connection_lost, mqtt_cb_message_arrived, NULL); + + options.cleansession = 1; + options.keepAliveInterval = 6; + options.onSuccess = mqtt_cb_connection_success; + options.onFailure = mqtt_cb_connection_failure; + options.context = edge_h; + + if (MQTTAsync_connect (handle, &options) != MQTTASYNC_SUCCESS) { + nns_edge_loge ("Failed to connect MQTT."); + MQTTAsync_destroy (&handle); + ret = NNS_EDGE_ERROR_CONNECTION_FAILURE; + goto error; + } + + eh->mqtt_handle = handle; + ret = NNS_EDGE_ERROR_NONE; + +error: + g_free (url); + g_free (client_id); + return ret; +} + +/** + * @brief Close the connection to MQTT. + * @note This is internal function for MQTT broker. You should call this with edge-handle lock. + */ +int +nns_edge_mqtt_close (nns_edge_h edge_h) +{ + nns_edge_handle_s *eh; + MQTTAsync handle; + MQTTAsync_disconnectOptions options = MQTTAsync_disconnectOptions_initializer; + + eh = (nns_edge_handle_s *) edge_h; + + if (!NNS_EDGE_MAGIC_IS_VALID (eh)) { + nns_edge_loge ("Invalid param, given edge handle is invalid."); + return NNS_EDGE_ERROR_INVALID_PARAMETER; + } + + handle = eh->mqtt_handle; + + if (!handle) { + nns_edge_loge ("Invalid state, MQTT connection was not completed."); + return NNS_EDGE_ERROR_IO; + } + + nns_edge_logi ("Trying to disconnect MQTT (ID:%s, URL:%s:%d).", + eh->id, eh->ip, eh->port); + + options.onSuccess = mqtt_cb_disconnection_success; + options.onFailure = mqtt_cb_disconnection_failure; + options.context = edge_h; + + while (MQTTAsync_isConnected (handle)) { + if (MQTTAsync_disconnect (handle, &options) != MQTTASYNC_SUCCESS) { + nns_edge_loge ("Failed to disconnect MQTT."); + return NNS_EDGE_ERROR_IO; + } + } + + MQTTAsync_destroy (&handle); + eh->mqtt_handle = NULL; + + return NNS_EDGE_ERROR_NONE; +} + +/** + * @brief Publish raw data. + * @note This is internal function for MQTT broker. You should call this with edge-handle lock. + */ +int +nns_edge_mqtt_publish (nns_edge_h edge_h, const void *data, const int length) +{ + nns_edge_handle_s *eh; + MQTTAsync handle; + int ret; + + eh = (nns_edge_handle_s *) edge_h; + + if (!NNS_EDGE_MAGIC_IS_VALID (eh)) { + nns_edge_loge ("Invalid param, given edge handle is invalid."); + return NNS_EDGE_ERROR_INVALID_PARAMETER; + } + + if (!data || length <= 0) { + nns_edge_loge ("Invalid param, given data is invalid."); + return NNS_EDGE_ERROR_INVALID_PARAMETER; + } + + handle = eh->mqtt_handle; + + if (!handle || !MQTTAsync_isConnected (handle)) { + nns_edge_loge ("Invalid state, MQTT connection was not completed."); + return NNS_EDGE_ERROR_IO; + } + + /* Publish a message (default QoS 1 - at least once and retained true). */ + ret = MQTTAsync_send (handle, eh->topic, length, data, 1, 1, NULL); + if (ret != MQTTASYNC_SUCCESS) { + nns_edge_loge ("Failed to publish a message (ID:%s, Topic:%s).", + eh->id, eh->topic); + return NNS_EDGE_ERROR_IO; + } + + return NNS_EDGE_ERROR_NONE; +} + +/** + * @brief Subscribe a topic. + * @note This is internal function for MQTT broker. You should call this with edge-handle lock. + */ +int +nns_edge_mqtt_subscribe (nns_edge_h edge_h) +{ + nns_edge_handle_s *eh; + MQTTAsync handle; + int ret; + + eh = (nns_edge_handle_s *) edge_h; + + if (!NNS_EDGE_MAGIC_IS_VALID (eh)) { + nns_edge_loge ("Invalid param, given edge handle is invalid."); + return NNS_EDGE_ERROR_INVALID_PARAMETER; + } + + handle = eh->mqtt_handle; + + if (!handle || !MQTTAsync_isConnected (handle)) { + nns_edge_loge ("Invalid state, MQTT connection was not completed."); + return NNS_EDGE_ERROR_IO; + } + + /* Subscribe a topic (default QoS 1 - at least once). */ + ret = MQTTAsync_subscribe (handle, eh->topic, 1, NULL); + if (ret != MQTTASYNC_SUCCESS) { + nns_edge_loge ("Failed to subscribe a topic (ID:%s, Topic:%s).", + eh->id, eh->topic); + return NNS_EDGE_ERROR_IO; + } + + return NNS_EDGE_ERROR_NONE; +}