From 13ebfb965aed36feab5ff7a846fa888482daed79 Mon Sep 17 00:00:00 2001 From: gichan2-jang Date: Fri, 16 Aug 2024 10:08:10 +0900 Subject: [PATCH] [Custom] Implement custom connection Implement custom connection @todo: Add unit test case Signed-off-by: gichan2-jang --- include/nnstreamer-edge-custom.h | 2 +- include/nnstreamer-edge.h | 18 +- .../nnstreamer-edge-internal.c | 232 ++++++++++++++++-- 3 files changed, 224 insertions(+), 28 deletions(-) diff --git a/include/nnstreamer-edge-custom.h b/include/nnstreamer-edge-custom.h index 09d6c2f..5c9959f 100644 --- a/include/nnstreamer-edge-custom.h +++ b/include/nnstreamer-edge-custom.h @@ -32,7 +32,7 @@ typedef struct _NnsEdgeCustomDef int (*nns_edge_custom_set_event_cb) (void *priv, nns_edge_event_cb cb, void *user_data); int (*nns_edge_custom_send_data) (void *priv, nns_edge_data_h data_h); int (*nns_edge_custom_set_option) (void *priv, const char *key, const char *value); - const char *(*nns_edge_custom_get_option) (void *priv, const char *key); + char *(*nns_edge_custom_get_option) (void *priv, const char *key); } NnsEdgeCustomDef; void* nns_edge_custom_get_handle (); diff --git a/include/nnstreamer-edge.h b/include/nnstreamer-edge.h index d546df4..605c310 100644 --- a/include/nnstreamer-edge.h +++ b/include/nnstreamer-edge.h @@ -61,7 +61,6 @@ typedef enum { NNS_EDGE_CONNECT_TYPE_MQTT, NNS_EDGE_CONNECT_TYPE_HYBRID, NNS_EDGE_CONNECT_TYPE_AITT, - NNS_EDGE_CONNECT_TYPE_CUSTOM, NNS_EDGE_CONNECT_TYPE_UNKNOWN @@ -179,6 +178,11 @@ typedef void (*nns_edge_data_destroy_cb) (void *data); */ int nns_edge_create_handle (const char *id, nns_edge_connect_type_e connect_type, nns_edge_node_type_e node_type, nns_edge_h *edge_h); +/** + * @brief Create edge custom handle. + */ +int nns_edge_custom_create_handle (const char *id, const char *lib_path, nns_edge_node_type_e node_type, nns_edge_h *edge_h); + /** * @brief Start the nnstreamer edge. After the start, the edge can accept a new connection or request a connection. * @param[in] edge_h The edge handle. @@ -191,6 +195,18 @@ int nns_edge_create_handle (const char *id, nns_edge_connect_type_e connect_type */ int nns_edge_start (nns_edge_h edge_h); +/** + * @brief Stop the nnstreamer edges. + * @param[in] edge_h The edge handle. + * @return 0 on success. Otherwise a negative error value. + * @retval #NNS_EDGE_ERROR_NONE Successful. + * @retval #NNS_EDGE_ERROR_NOT_SUPPORTED Not supported. + * @retval #NNS_EDGE_ERROR_OUT_OF_MEMORY Failed to allocate required memory. + * @retval #NNS_EDGE_ERROR_INVALID_PARAMETER Given parameter is invalid. + * @retval #NNS_EDGE_ERROR_CONNECTION_FAILURE Failed to get socket address. + */ +int nns_edge_stop (nns_edge_h edge_h); + /** * @brief Release the given edge handle. All the connections are disconnected. * @param[in] edge_h The edge handle. diff --git a/src/libnnstreamer-edge/nnstreamer-edge-internal.c b/src/libnnstreamer-edge/nnstreamer-edge-internal.c index bf79851..f9fdfb0 100644 --- a/src/libnnstreamer-edge/nnstreamer-edge-internal.c +++ b/src/libnnstreamer-edge/nnstreamer-edge-internal.c @@ -13,6 +13,7 @@ #include #include #include +#include #include "nnstreamer-edge-data.h" #include "nnstreamer-edge-event.h" @@ -73,6 +74,7 @@ typedef struct /* MQTT or AITT handle */ void *broker_h; + void *custom_h; } nns_edge_handle_s; /** @@ -920,6 +922,14 @@ _nns_edge_send_thread (void *thread_data) if (NNS_EDGE_ERROR_NONE != ret) nns_edge_loge ("Failed to send data via MQTT connection."); break; + case NNS_EDGE_CONNECT_TYPE_CUSTOM: + { + NnsEdgeCustomDef *custom_h = (NnsEdgeCustomDef *) eh->custom_h; + ret = custom_h->nns_edge_custom_send_data (eh->broker_h, data_h); + if (NNS_EDGE_ERROR_NONE != ret) + nns_edge_loge ("Failed to send data via MCF connection."); + break; + } default: break; } @@ -1255,34 +1265,15 @@ error: } /** - * @brief Create edge handle. + * @brief Internal function to create edge handle. */ -int -nns_edge_create_handle (const char *id, nns_edge_connect_type_e connect_type, - nns_edge_node_type_e node_type, nns_edge_h * edge_h) +static int +_nns_edge_create_handle (const char *id, nns_edge_node_type_e node_type, + nns_edge_h * edge_h) { int ret = NNS_EDGE_ERROR_NONE; nns_edge_handle_s *eh; - if (connect_type < 0 || connect_type >= NNS_EDGE_CONNECT_TYPE_UNKNOWN) { - nns_edge_loge ("Invalid param, set valid connect type."); - return NNS_EDGE_ERROR_INVALID_PARAMETER; - } - - /** - * @todo handle flag (receive | send) - * e.g., send only case: listener is unnecessary. - */ - if (node_type < 0 || node_type >= NNS_EDGE_NODE_TYPE_UNKNOWN) { - nns_edge_loge ("Invalid param, set exact node type."); - return NNS_EDGE_ERROR_INVALID_PARAMETER; - } - - if (!edge_h) { - nns_edge_loge ("Invalid param, edge_h should not be null."); - return NNS_EDGE_ERROR_INVALID_PARAMETER; - } - eh = (nns_edge_handle_s *) calloc (1, sizeof (nns_edge_handle_s)); if (!eh) { nns_edge_loge ("Failed to allocate memory for edge handle."); @@ -1294,7 +1285,6 @@ nns_edge_create_handle (const char *id, nns_edge_connect_type_e connect_type, nns_edge_handle_set_magic (eh, NNS_EDGE_MAGIC); eh->id = STR_IS_VALID (id) ? nns_edge_strdup (id) : nns_edge_strdup_printf ("%lld", (long long) nns_edge_generate_id ()); - eh->connect_type = connect_type; eh->host = nns_edge_strdup ("localhost"); eh->port = 0; eh->dest_host = nns_edge_strdup ("localhost"); @@ -1307,6 +1297,7 @@ nns_edge_create_handle (const char *id, nns_edge_connect_type_e connect_type, eh->sending = false; eh->listener_fd = -1; eh->caps_str = nns_edge_strdup (""); + eh->custom_h = NULL; ret = nns_edge_metadata_create (&eh->metadata); if (ret != NNS_EDGE_ERROR_NONE) { @@ -1320,6 +1311,111 @@ nns_edge_create_handle (const char *id, nns_edge_connect_type_e connect_type, goto error; } + *edge_h = eh; + return NNS_EDGE_ERROR_NONE; + +error: + nns_edge_release_handle (eh); + return ret; +} + +/** + * @brief Create edge custom handle. + */ +int +nns_edge_custom_create_handle (const char *id, const char *lib_path, + nns_edge_node_type_e node_type, nns_edge_h * edge_h) +{ + int ret = NNS_EDGE_ERROR_NONE; + nns_edge_handle_s *eh; + void *custom_handle; + NnsEdgeCustomDef *custom_h; + + if (node_type < 0 || node_type >= NNS_EDGE_NODE_TYPE_UNKNOWN) { + nns_edge_loge ("Invalid param, set exact node type."); + return NNS_EDGE_ERROR_INVALID_PARAMETER; + } + + if (!STR_IS_VALID (lib_path)) { + nns_edge_loge ("Invalid param, given custom lib path is invalid."); + return NNS_EDGE_ERROR_INVALID_PARAMETER; + } + + if (!edge_h) { + nns_edge_loge ("Invalid param, edge_h should not be null."); + return NNS_EDGE_ERROR_INVALID_PARAMETER; + } + + ret = _nns_edge_create_handle (id, node_type, edge_h); + if (ret != NNS_EDGE_ERROR_NONE) { + nns_edge_loge ("Failed to create edge handle."); + return ret; + } + eh = (nns_edge_handle_s *) * edge_h; + eh->connect_type = NNS_EDGE_CONNECT_TYPE_CUSTOM; + + custom_handle = dlopen (lib_path, RTLD_LAZY); + if (custom_handle) { + void *(*getCustomHandle) () = + (void *(*)()) dlsym (custom_handle, "nns_edge_custom_get_handle"); + if (!getCustomHandle) { + nns_edge_loge ("Failed to find nns_edge_custom_get_handle: %s", + dlerror ()); + return NNS_EDGE_ERROR_UNKNOWN; + } + eh->custom_h = getCustomHandle (); + } else { + nns_edge_loge ("Failed to open custom handle: %s]\n", dlerror ()); + return NNS_EDGE_ERROR_INVALID_PARAMETER;; + } + + custom_h = (NnsEdgeCustomDef *) eh->custom_h; + ret = custom_h->nns_edge_custom_create (&eh->broker_h); + if (NNS_EDGE_ERROR_NONE != ret) { + nns_edge_loge ("Failed to create custom connection handle."); + } + + return ret; +} + +/** + * @brief Create edge handle. + */ +int +nns_edge_create_handle (const char *id, nns_edge_connect_type_e connect_type, + nns_edge_node_type_e node_type, nns_edge_h * edge_h) +{ + int ret = NNS_EDGE_ERROR_NONE; + nns_edge_handle_s *eh; + + if (connect_type < 0 || connect_type >= NNS_EDGE_CONNECT_TYPE_UNKNOWN) { + nns_edge_loge ("Invalid param, set valid connect type."); + return NNS_EDGE_ERROR_INVALID_PARAMETER; + } + + /** + * @todo handle flag (receive | send) + * e.g., send only case: listener is unnecessary. + */ + if (node_type < 0 || node_type >= NNS_EDGE_NODE_TYPE_UNKNOWN) { + nns_edge_loge ("Invalid param, set exact node type."); + return NNS_EDGE_ERROR_INVALID_PARAMETER; + } + + if (!edge_h) { + nns_edge_loge ("Invalid param, edge_h should not be null."); + return NNS_EDGE_ERROR_INVALID_PARAMETER; + } + + ret = _nns_edge_create_handle (id, node_type, edge_h); + if (ret != NNS_EDGE_ERROR_NONE) { + nns_edge_loge ("Failed to create edge handle."); + return ret; + } + eh = (nns_edge_handle_s *) * edge_h; + + eh->connect_type = connect_type; + if (NNS_EDGE_CONNECT_TYPE_AITT == connect_type) { ret = nns_edge_aitt_create (&eh->broker_h); if (NNS_EDGE_ERROR_NONE != ret) { @@ -1328,7 +1424,6 @@ nns_edge_create_handle (const char *id, nns_edge_connect_type_e connect_type, } } - *edge_h = eh; return NNS_EDGE_ERROR_NONE; error: @@ -1358,6 +1453,22 @@ nns_edge_start (nns_edge_h edge_h) nns_edge_lock (eh); + if (NNS_EDGE_CONNECT_TYPE_CUSTOM == eh->connect_type) { + NnsEdgeCustomDef *custom_h = (NnsEdgeCustomDef *) eh->custom_h; + ret = custom_h->nns_edge_custom_start (eh->broker_h); + if (NNS_EDGE_ERROR_NONE != ret) { + nns_edge_loge ("Failed to start edge custom connection"); + } + ret = custom_h->nns_edge_custom_set_event_cb (eh->broker_h, eh->event_cb, + eh->user_data); + if (NNS_EDGE_ERROR_NONE != ret) { + nns_edge_loge ("Failed to set event callback to custom connection."); + goto done; + } + ret = _nns_edge_create_send_thread (eh); + goto done; + } + if (eh->port <= 0) { eh->port = nns_edge_get_available_port (); if (eh->port <= 0) { @@ -1445,6 +1556,41 @@ done: return ret; } +/** + * @brief Stop the nnstreamer edge. + */ +int +nns_edge_stop (nns_edge_h edge_h) +{ + nns_edge_handle_s *eh; + int ret = NNS_EDGE_ERROR_NONE; + + eh = (nns_edge_handle_s *) edge_h; + if (!eh) { + nns_edge_loge ("Invalid param, given edge handle is null."); + return NNS_EDGE_ERROR_INVALID_PARAMETER; + } + + if (!nns_edge_handle_is_valid (eh)) { + nns_edge_loge ("Invalid param, given edge handle is invalid."); + return NNS_EDGE_ERROR_INVALID_PARAMETER; + } + + nns_edge_lock (eh); + + if (NNS_EDGE_CONNECT_TYPE_CUSTOM == eh->connect_type) { + NnsEdgeCustomDef *custom_h = (NnsEdgeCustomDef *) eh->custom_h; + ret = custom_h->nns_edge_custom_stop (eh->broker_h); + if (NNS_EDGE_ERROR_NONE != ret) { + nns_edge_loge ("Failed to stop MCF"); + } + } + + eh->is_started = FALSE; + nns_edge_unlock (eh); + return ret; +} + /** * @brief Release the given handle. */ @@ -1464,6 +1610,9 @@ nns_edge_release_handle (nns_edge_h edge_h) return NNS_EDGE_ERROR_INVALID_PARAMETER; } + if (eh->is_started) + nns_edge_stop (eh); + nns_edge_lock (eh); switch (eh->connect_type) { @@ -1478,6 +1627,16 @@ nns_edge_release_handle (nns_edge_h edge_h) nns_edge_logw ("Failed to close AITT connection."); } break; + case NNS_EDGE_CONNECT_TYPE_CUSTOM: + { + NnsEdgeCustomDef *custom_h = (NnsEdgeCustomDef *) eh->custom_h; + int ret = custom_h->nns_edge_custom_close (eh->broker_h); + if (NNS_EDGE_ERROR_NONE != ret) { + nns_edge_loge ("Failed to stop MCF"); + } + dlclose (eh->custom_h); + break; + } default: break; } @@ -1487,7 +1646,6 @@ nns_edge_release_handle (nns_edge_h edge_h) eh->event_cb = NULL; eh->user_data = NULL; eh->broker_h = NULL; - eh->is_started = false; nns_edge_queue_clear (eh->send_queue); if (eh->send_thread) { @@ -1720,6 +1878,13 @@ nns_edge_connect (nns_edge_h edge_h, const char *dest_host, int dest_port) nns_edge_loge ("Failed to subscribe the topic using AITT: %s", eh->topic); goto done; } + } else if (NNS_EDGE_CONNECT_TYPE_CUSTOM == eh->connect_type) { + NnsEdgeCustomDef *custom_h = (NnsEdgeCustomDef *) eh->custom_h; + ret = custom_h->nns_edge_custom_connect (eh->broker_h); + if (ret != NNS_EDGE_ERROR_NONE) { + nns_edge_loge ("Failed to connect to custom connection."); + goto done; + } } else { ret = _nns_edge_connect_to (eh, eh->client_id, dest_host, dest_port); if (ret != NNS_EDGE_ERROR_NONE) { @@ -1786,6 +1951,11 @@ nns_edge_is_connected (nns_edge_h edge_h) nns_edge_mqtt_is_connected (eh->broker_h)) return NNS_EDGE_ERROR_NONE; + if (NNS_EDGE_CONNECT_TYPE_CUSTOM == eh->connect_type) { + NnsEdgeCustomDef *custom_h = (NnsEdgeCustomDef *) eh->custom_h; + return custom_h->nns_edge_custom_is_connected (eh->broker_h); + } + conn_data = (nns_edge_conn_data_s *) eh->connections; while (conn_data) { conn = conn_data->sink_conn; @@ -1956,6 +2126,11 @@ nns_edge_set_info (nns_edge_h edge_h, const char *key, const char *value) ret = nns_edge_metadata_set (eh->metadata, key, value); } + if (NNS_EDGE_CONNECT_TYPE_CUSTOM == eh->connect_type) { + NnsEdgeCustomDef *custom_h = (NnsEdgeCustomDef *) eh->custom_h; + ret = custom_h->nns_edge_custom_set_option (eh->broker_h, key, value); + } + nns_edge_unlock (eh); return ret; } @@ -2019,6 +2194,11 @@ nns_edge_get_info (nns_edge_h edge_h, const char *key, char **value) ret = nns_edge_metadata_get (eh->metadata, key, value); } + if (NNS_EDGE_CONNECT_TYPE_CUSTOM == eh->connect_type) { + NnsEdgeCustomDef *custom_h = (NnsEdgeCustomDef *) eh->custom_h; + *value = custom_h->nns_edge_custom_get_option (eh->broker_h, key); + } + nns_edge_unlock (eh); return ret; } -- 2.34.1