From 061c7ad902c5b4d88bd3dcbf29abdd6f87336d36 Mon Sep 17 00:00:00 2001 From: Jaeyun Date: Wed, 11 May 2022 16:29:20 +0900 Subject: [PATCH] [Edge] add mqtt functions Add internal functions to handle mqtt message. This is initial commit for mqtt-hybrid feature after separating the edge library. Signed-off-by: Jaeyun --- gst/nnstreamer/tensor_query/meson.build | 4 + gst/nnstreamer/tensor_query/nnstreamer_edge.h | 5 +- .../tensor_query/nnstreamer_edge_internal.c | 16 +- .../tensor_query/nnstreamer_edge_internal.h | 80 +++++ gst/nnstreamer/tensor_query/nnstreamer_edge_mqtt.c | 334 +++++++++++++++++++++ 5 files changed, 422 insertions(+), 17 deletions(-) create mode 100644 gst/nnstreamer/tensor_query/nnstreamer_edge_internal.h create mode 100644 gst/nnstreamer/tensor_query/nnstreamer_edge_mqtt.c diff --git a/gst/nnstreamer/tensor_query/meson.build b/gst/nnstreamer/tensor_query/meson.build index edd9be1..128555a 100644 --- a/gst/nnstreamer/tensor_query/meson.build +++ b/gst/nnstreamer/tensor_query/meson.build @@ -31,6 +31,10 @@ if aitt_support_is_available nnstreamer_edge_deps += aitt_support_deps else nnstreamer_edge_sources += join_paths(meson.current_source_dir(), 'nnstreamer_edge_internal.c') + if mqtt_support_is_available + nnstreamer_edge_sources += join_paths(meson.current_source_dir(), 'nnstreamer_edge_mqtt.c') + nnstreamer_edge_deps += [pahomqttc_dep, thread_dep] + endif endif nns_edge_lib = shared_library('nnstreamer-edge', diff --git a/gst/nnstreamer/tensor_query/nnstreamer_edge.h b/gst/nnstreamer/tensor_query/nnstreamer_edge.h index eb7d599..1e85235 100644 --- a/gst/nnstreamer/tensor_query/nnstreamer_edge.h +++ b/gst/nnstreamer/tensor_query/nnstreamer_edge.h @@ -41,7 +41,7 @@ typedef void *nns_edge_data_h; /** * @brief Enumeration for the error codes of nnstreamer-edge. - * @todo define detailed error code later (linux standard error code) + * @todo define detailed error code later (linux standard error, sync with tizen error code) */ typedef enum { NNS_EDGE_ERROR_NONE = 0, @@ -49,7 +49,8 @@ typedef enum { NNS_EDGE_ERROR_OUT_OF_MEMORY = -ENOMEM, NNS_EDGE_ERROR_IO = -EIO, NNS_EDGE_ERROR_CONNECTION_FAILURE = -ECONNREFUSED, - NNS_EDGE_ERROR_UNKNOWN = -(INT_MIN / 2), + NNS_EDGE_ERROR_UNKNOWN = (-1073741824LL), + NNS_EDGE_ERROR_NOT_SUPPORTED = (NNS_EDGE_ERROR_UNKNOWN + 2), } nns_edge_error_e; typedef enum { diff --git a/gst/nnstreamer/tensor_query/nnstreamer_edge_internal.c b/gst/nnstreamer/tensor_query/nnstreamer_edge_internal.c index f9108a5..1015c0d 100644 --- a/gst/nnstreamer/tensor_query/nnstreamer_edge_internal.c +++ b/gst/nnstreamer/tensor_query/nnstreamer_edge_internal.c @@ -11,21 +11,7 @@ */ #include "nnstreamer_edge_common.h" - -/** - * @brief Data structure for edge handle. - */ -typedef struct -{ - unsigned int magic; - char *id; - char *topic; - nns_edge_protocol_e protocol; - char *ip; - int port; - nns_edge_event_cb event_cb; - void *user_data; -} nns_edge_handle_s; +#include "nnstreamer_edge_internal.h" /** * @brief Check network connection. diff --git a/gst/nnstreamer/tensor_query/nnstreamer_edge_internal.h b/gst/nnstreamer/tensor_query/nnstreamer_edge_internal.h new file mode 100644 index 0000000..da6b575 --- /dev/null +++ b/gst/nnstreamer/tensor_query/nnstreamer_edge_internal.h @@ -0,0 +1,80 @@ +/* SPDX-License-Identifier: LGPL-2.1-only */ +/** + * Copyright (C) 2022 Samsung Electronics Co., Ltd. All Rights Reserved. + * + * @file nnstreamer_edge_internal.h + * @date 11 May 2022 + * @brief Internal functions to support communication among devices. + * @see https://github.com/nnstreamer/nnstreamer + * @author Gichan Jang + * @bug No known bugs except for NYI items + * @note This file is internal header for nnstreamer edge. DO NOT export this file. + */ + +#ifndef __NNSTREAMER_EDGE_INTERNAL_H__ +#define __NNSTREAMER_EDGE_INTERNAL_H__ + +#ifdef __cplusplus +extern "C" { +#endif /* __cplusplus */ + +#include "nnstreamer_edge.h" + +/** + * @brief Data structure for edge handle. + * @todo Implement mutex lock. + */ +typedef struct { + unsigned int magic; + char *id; + char *topic; + nns_edge_protocol_e protocol; + char *ip; + int port; + nns_edge_event_cb event_cb; + void *user_data; + + /* MQTT */ + void *mqtt_handle; +} nns_edge_handle_s; + +#if defined(ENABLE_MQTT) +/** + * @brief Connect to MQTT. + */ +int nns_edge_mqtt_connect (nns_edge_h edge_h); + +/** + * @brief Close the connection to MQTT. + */ +int nns_edge_mqtt_close (nns_edge_h edge_h); + +/** + * @brief Publish raw data. + */ +int nns_edge_mqtt_publish (nns_edge_h edge_h, const void *data, const int length); + +/** + * @brief Subscribe a topic. + */ +int nns_edge_mqtt_subscribe (nns_edge_h edge_h); +#else +/** + * @todo consider to change code style later. + * If MQTT is disabled, nnstreamer does not include nnstreamer_edge_mqtt.c, and changing code style will make error as it is not used function now. + * + * static int nns_edge_mqtt_publish (nns_edge_h edge_h, const void *data, const int length) + * { + * return NNS_EDGE_ERROR_NOT_SUPPORTED; + * } + */ +#define nns_edge_mqtt_connect(...) (NNS_EDGE_ERROR_NOT_SUPPORTED) +#define nns_edge_mqtt_close(...) (NNS_EDGE_ERROR_NOT_SUPPORTED) +#define nns_edge_mqtt_publish(...) (NNS_EDGE_ERROR_NOT_SUPPORTED) +#define nns_edge_mqtt_subscribe(...) (NNS_EDGE_ERROR_NOT_SUPPORTED) +#endif + +#ifdef __cplusplus +} +#endif /* __cplusplus */ +#endif /* __NNSTREAMER_EDGE_INTERNAL_H__ */ diff --git a/gst/nnstreamer/tensor_query/nnstreamer_edge_mqtt.c b/gst/nnstreamer/tensor_query/nnstreamer_edge_mqtt.c new file mode 100644 index 0000000..7c720ea --- /dev/null +++ b/gst/nnstreamer/tensor_query/nnstreamer_edge_mqtt.c @@ -0,0 +1,334 @@ +/* SPDX-License-Identifier: LGPL-2.1-only */ +/** + * Copyright (C) 2022 Samsung Electronics Co., Ltd. All Rights Reserved. + * + * @file nnstreamer_edge_mqtt.c + * @date 11 May 2022 + * @brief Internal functions to support MQTT protocol (Paho Asynchronous MQTT C Client Library). + * @see https://github.com/nnstreamer/nnstreamer + * @author Sangjung Woo + * @bug No known bugs except for NYI items + */ + +#if !defined(ENABLE_MQTT) +#error "This file can be built with Paho MQTT library." +#endif + +#include +#include +#include "nnstreamer_edge_common.h" +#include "nnstreamer_edge_internal.h" + +/** + * @brief Callback function to be called when the connection is lost. + */ +static void +mqtt_cb_connection_lost (void *context, char *cause) +{ + nns_edge_handle_s *eh; + + eh = (nns_edge_handle_s *) context; + + if (!NNS_EDGE_MAGIC_IS_VALID (eh)) { + nns_edge_loge ("Invalid param, given edge handle is invalid."); + return; + } + + nns_edge_logw ("MQTT connection is lost (ID:%s, Cause:%s).", eh->id, cause); + if (eh->event_cb) { + /** @todo send new event (MQTT disconnected) */ + } +} + +/** + * @brief Callback function to be called when the connection is completed. + */ +static void +mqtt_cb_connection_success (void *context, MQTTAsync_successData * response) +{ + nns_edge_handle_s *eh; + + UNUSED (response); + eh = (nns_edge_handle_s *) context; + + if (!NNS_EDGE_MAGIC_IS_VALID (eh)) { + nns_edge_loge ("Invalid param, given edge handle is invalid."); + return; + } + + nns_edge_logi ("MQTT connection is completed (ID:%s).", eh->id); + if (eh->event_cb) { + /** @todo send new event (MQTT connected) */ + } +} + +/** + * @brief Callback function to be called when the connection is failed. + */ +static void +mqtt_cb_connection_failure (void *context, MQTTAsync_failureData * response) +{ + nns_edge_handle_s *eh; + + UNUSED (response); + eh = (nns_edge_handle_s *) context; + + if (!NNS_EDGE_MAGIC_IS_VALID (eh)) { + nns_edge_loge ("Invalid param, given edge handle is invalid."); + return; + } + + nns_edge_logw ("MQTT connection is failed (ID:%s).", eh->id); + if (eh->event_cb) { + /** @todo send new event (MQTT connection failure) */ + } +} + +/** + * @brief Callback function to be called when the disconnection is completed. + */ +static void +mqtt_cb_disconnection_success (void *context, MQTTAsync_successData * response) +{ + nns_edge_handle_s *eh; + + UNUSED (response); + eh = (nns_edge_handle_s *) context; + + if (!NNS_EDGE_MAGIC_IS_VALID (eh)) { + nns_edge_loge ("Invalid param, given edge handle is invalid."); + return; + } + + nns_edge_logi ("MQTT disconnection is completed (ID:%s).", eh->id); + if (eh->event_cb) { + /** @todo send new event (MQTT disconnected) */ + } +} + +/** + * @brief Callback function to be called when the disconnection is failed. + */ +static void +mqtt_cb_disconnection_failure (void *context, MQTTAsync_failureData * response) +{ + nns_edge_handle_s *eh; + + UNUSED (response); + eh = (nns_edge_handle_s *) context; + + if (!NNS_EDGE_MAGIC_IS_VALID (eh)) { + nns_edge_loge ("Invalid param, given edge handle is invalid."); + return; + } + + nns_edge_logw ("MQTT disconnection is failed (ID:%s).", eh->id); + if (eh->event_cb) { + /** @todo send new event (MQTT disconnection failure) */ + } +} + +/** + * @brief Callback function to be called when a message is arrived. + */ +static int +mqtt_cb_message_arrived (void *context, char *topic, int topic_len, + MQTTAsync_message * message) +{ + nns_edge_handle_s *eh; + + UNUSED (topic); + UNUSED (topic_len); + UNUSED (message); + eh = (nns_edge_handle_s *) context; + + if (!NNS_EDGE_MAGIC_IS_VALID (eh)) { + nns_edge_loge ("Invalid param, given edge handle is invalid."); + return NNS_EDGE_ERROR_INVALID_PARAMETER; + } + + nns_edge_logd ("MQTT message is arrived (ID:%s, Topic:%s).", + eh->id, eh->topic); + if (eh->event_cb) { + /** @todo send new event (message arrived) */ + } + + return TRUE; +} + +/** + * @brief Connect to MQTT. + */ +int +nns_edge_mqtt_connect (nns_edge_h edge_h) +{ + nns_edge_handle_s *eh; + MQTTAsync handle; + MQTTAsync_connectOptions options = MQTTAsync_connectOptions_initializer; + char *url; + char *client_id; + int ret; + + eh = (nns_edge_handle_s *) edge_h; + + if (!NNS_EDGE_MAGIC_IS_VALID (eh)) { + nns_edge_loge ("Invalid param, given edge handle is invalid."); + return NNS_EDGE_ERROR_INVALID_PARAMETER; + } + + nns_edge_logi ("Trying to connect MQTT (ID:%s, URL:%s:%d).", + eh->id, eh->ip, eh->port); + + url = g_strdup_printf ("%s:%d", eh->ip, eh->port); + client_id = g_strdup_printf ("nns_edge_%s_%u", eh->id, getpid ()); + + ret = MQTTAsync_create (&handle, url, client_id, + MQTTCLIENT_PERSISTENCE_NONE, NULL); + if (MQTTASYNC_SUCCESS != ret) { + nns_edge_loge ("Failed to create MQTT handle."); + ret = NNS_EDGE_ERROR_CONNECTION_FAILURE; + goto error; + } + + MQTTAsync_setCallbacks (handle, edge_h, + mqtt_cb_connection_lost, mqtt_cb_message_arrived, NULL); + + options.cleansession = 1; + options.keepAliveInterval = 6; + options.onSuccess = mqtt_cb_connection_success; + options.onFailure = mqtt_cb_connection_failure; + options.context = edge_h; + + if (MQTTAsync_connect (handle, &options) != MQTTASYNC_SUCCESS) { + nns_edge_loge ("Failed to connect MQTT."); + ret = NNS_EDGE_ERROR_CONNECTION_FAILURE; + goto error; + } + + eh->mqtt_handle = handle; + ret = NNS_EDGE_ERROR_NONE; + +error: + g_free (url); + g_free (client_id); + return ret; +} + +/** + * @brief Close the connection to MQTT. + */ +int +nns_edge_mqtt_close (nns_edge_h edge_h) +{ + nns_edge_handle_s *eh; + MQTTAsync handle; + MQTTAsync_disconnectOptions options = MQTTAsync_disconnectOptions_initializer; + + eh = (nns_edge_handle_s *) edge_h; + + if (!NNS_EDGE_MAGIC_IS_VALID (eh)) { + nns_edge_loge ("Invalid param, given edge handle is invalid."); + return NNS_EDGE_ERROR_INVALID_PARAMETER; + } + + handle = eh->mqtt_handle; + + if (!handle) { + nns_edge_loge ("Invalid state, MQTT connection was not completed."); + return NNS_EDGE_ERROR_IO; + } + + nns_edge_logi ("Trying to disconnect MQTT (ID:%s, URL:%s:%d).", + eh->id, eh->ip, eh->port); + + options.onSuccess = mqtt_cb_disconnection_success; + options.onFailure = mqtt_cb_disconnection_failure; + options.context = edge_h; + + while (MQTTAsync_isConnected (handle)) { + if (MQTTAsync_disconnect (handle, &options) != MQTTASYNC_SUCCESS) { + nns_edge_loge ("Failed to disconnect MQTT."); + return NNS_EDGE_ERROR_IO; + } + } + + MQTTAsync_destroy (&handle); + eh->mqtt_handle = NULL; + + return NNS_EDGE_ERROR_NONE; +} + +/** + * @brief Publish raw data. + */ +int +nns_edge_mqtt_publish (nns_edge_h edge_h, const void *data, const int length) +{ + nns_edge_handle_s *eh; + MQTTAsync handle; + int ret; + + eh = (nns_edge_handle_s *) edge_h; + + if (!NNS_EDGE_MAGIC_IS_VALID (eh)) { + nns_edge_loge ("Invalid param, given edge handle is invalid."); + return NNS_EDGE_ERROR_INVALID_PARAMETER; + } + + if (!data || length <= 0) { + nns_edge_loge ("Invalid param, given data is invalid."); + return NNS_EDGE_ERROR_INVALID_PARAMETER; + } + + handle = eh->mqtt_handle; + + if (!handle || MQTTAsync_isConnected (handle)) { + nns_edge_loge ("Invalid state, MQTT connection was not completed."); + return NNS_EDGE_ERROR_IO; + } + + /* Publish a message (default QoS 1 - at least once and retained true). */ + ret = MQTTAsync_send (handle, eh->topic, length, data, 1, 1, NULL); + if (ret != MQTTASYNC_SUCCESS) { + nns_edge_loge ("Failed to publish a message (ID:%s, Topic:%s).", + eh->id, eh->topic); + return NNS_EDGE_ERROR_IO; + } + + return NNS_EDGE_ERROR_NONE; +} + +/** + * @brief Subscribe a topic. + */ +int +nns_edge_mqtt_subscribe (nns_edge_h edge_h) +{ + nns_edge_handle_s *eh; + MQTTAsync handle; + int ret; + + eh = (nns_edge_handle_s *) edge_h; + + if (!NNS_EDGE_MAGIC_IS_VALID (eh)) { + nns_edge_loge ("Invalid param, given edge handle is invalid."); + return NNS_EDGE_ERROR_INVALID_PARAMETER; + } + + handle = eh->mqtt_handle; + + if (!handle || MQTTAsync_isConnected (handle)) { + nns_edge_loge ("Invalid state, MQTT connection was not completed."); + return NNS_EDGE_ERROR_IO; + } + + /* Subscribe a topic (default QoS 1 - at least once). */ + ret = MQTTAsync_subscribe (handle, eh->topic, 1, NULL); + if (ret != MQTTASYNC_SUCCESS) { + nns_edge_loge ("Failed to subscribe a topic (ID:%s, Topic:%s).", + eh->id, eh->topic); + return NNS_EDGE_ERROR_IO; + } + + return NNS_EDGE_ERROR_NONE; +} -- 2.7.4