edge-sensor pkg is to provide a library for publishing data using mqtt.
Now we implemented new interfaces, so remove previous pkg.
Signed-off-by: Jaeyun <jy1210.jung@samsung.com>
SET(SOURCE_DIR ${CMAKE_CURRENT_SOURCE_DIR}/src)
SET(INCLUDE_DIR ${CMAKE_CURRENT_SOURCE_DIR}/include)
SET(NNS_EDGE_SRC_DIR ${SOURCE_DIR}/libnnstreamer-edge)
-SET(SENSOR_SRC_DIR ${SOURCE_DIR}/libsensor)
# Check requires packages
# TODO FIXME remove glib dependency
IF (ENABLE_TEST)
SET(TEST_REQUIRES_LIST gtest)
- INCLUDE(FindPkgConfig)
PKG_CHECK_MODULES(TEST_REQUIRE_PKGS REQUIRED ${TEST_REQUIRES_LIST})
ADD_SUBDIRECTORY(tests)
ENDIF()
CONFIGURE_FILE(nnstreamer-edge.pc.in nnstreamer-edge.pc @ONLY)
INSTALL(FILES ${CMAKE_CURRENT_BINARY_DIR}/nnstreamer-edge.pc
DESTINATION ${CMAKE_INSTALL_LIBDIR}/pkgconfig)
-
-CONFIGURE_FILE(nnstreamer-edge-sensor.pc.in nnstreamer-edge-sensor.pc @ONLY)
-INSTALL(FILES ${CMAKE_CURRENT_BINARY_DIR}/nnstreamer-edge-sensor.pc
- DESTINATION ${CMAKE_INSTALL_LIBDIR}/pkgconfig)
Depends: nnstreamer-edge
Description: development package for nnstreamer-edge
It is a development package for nnstreamer-edge.
-
-Package: nnstreamer-edge-sensor
-Architecture: any
-Multi-Arch: same
-Depends: ${shlibs:Depends}, ${misc:Depends}
-Description: communication library for edge sensor
- It is a communication library for edge sensor devices.
- This library supports publishing the sensor data to the GStreamer pipeline without GStreamer / Glib dependency.
-
-Package: nnstreamer-edge-sensor-dev
-Architecture: any
-Multi-Arch: same
-Depends: nnstreamer-edge-sensor
-Description: development package for nnstreamer-edge-sensor
- It is a development package for nnstreamer-edge-sensor.
+++ /dev/null
-/usr/include/edge_sensor.h
-/usr/lib/*/pkgconfig/nnstreamer-edge-sensor.pc
-/usr/lib/*/libedge-sensor.so
+++ /dev/null
-/usr/lib/*/libedge-sensor.so.*
+++ /dev/null
-/* SPDX-License-Identifier: Apache-2.0 */
-/**
- * Copyright (C) 2021 Sangjung Woo <sangjung.woo@samsung.com>
- */
-/**
- * @file edge_sensor.h
- * @date 05 July 2021
- * @brief Edge APIs for publishing the any type of data as a MQTT topic
- * @see https://github.com/nnstreamer/nnstreamer-edge
- * @author Sangjung Woo <sangjung.woo@samsung.com>
- * @bug No known bugs except for NYI items
- */
-#ifndef _EDGE_SENSOR_H__
-#define _EDGE_SENSOR_H__
-
-#include <stdint.h>
-
-#ifdef __cplusplus
-extern "C" {
-#endif /* __cplusplus */
-
-typedef enum _edge_mqtt_state_t {
- MQTT_CONNECTION_LOST = -3, /**< MQTT connection is lost */
- MQTT_CONNECT_FAILURE = -2, /**< MQTT connection is failed */
- MQTT_INITIALIZING = -1, /**< Not connected yet */
- MQTT_CONNECTED, /**< Connected to server successfully */
- MQTT_REQUEST_STOP, /**< User request to disconnect */
- MQTT_SENDING, /**< MQTT message is sent but ACK is not received */
- MQTT_DELIVERY_COMPLETE, /**< All message is successfully delivered */
- MQTT_SEND_ERROR, /**< Failed to sent the message */
- MQTT_DISCONNECTED, /**< MQTT connection is closed */
- MQTT_DISCONNECT_FAILED, /**< Disconnect is failed */
-} edge_mqtt_state_t;
-
-typedef void *edge_h;
-
-/**
- * @brief Callback function for MQTT state change
- * @param[in,out] user_data User data that provided when calling edge_open_connection()
- * @param[in] state The new state of MQTT
- */
-typedef void (*edge_state_change_cb) (void *user_data, edge_mqtt_state_t state);
-
-/**
- * @brief Open the MQTT connection with specific options
- * @return @c 0 on success. Otherwise a negative error value.
- * @param[in,out] handle MQTT handle to connect
- * @param[in] host_address Host address to connect to. If NULL, then 'tcp://localhost' is used.
- * @param[in] host_port Network port of host to connect to. If NULL, then '1883' is used.
- * @param[in] topic_name Topic name to publish, If NULL, then 'edge_sensor_$PID_$SEQ/topic' is used.
- * @param[in] base_time_stamp Base time stamp in usec. If 0, then current time stamp is set.
- * @param[in] duration Duration time for payload in nanosec. If 0, then GST_CLOCK_TIME_NONE value is set.
- * @param[in] gst_caps_string GStreamer cap string for payload.
- * @param[in] callback State change callback for MQTT. If NULL, callback event is not used.
- * @param[in] user_data User data for callback function. If @callback is NULL, then user_data is ignored.
- */
-int
-edge_open_connection (edge_h *handle,
- char *host_address, char *host_port, char *topic_name,
- int64_t base_time_stamp, uint64_t duration, char *gst_caps_string,
- edge_state_change_cb callback, void *user_data);
-
-
-/**
- * @brief Publish the single message that contains only one record.
- * @return @c 0 on success. Otherwise a negative error value.
- * @param[in] handle MQTT handle to publish message
- * @param[in] buffer Payload to publish
- * @param[in] size The payload size to publish
- */
-int
-edge_publish_single_msg (edge_h handle,
- void *buffer, uint64_t size);
-
-/**
- * @brief Close the MQTT connection and release the allocated memory space.
- * @return @c 0 on success. Otherwise a negative error value.
- * @param[in] handle MQTT handle to close.
- */
-int
-edge_close_connection (edge_h handle);
-
-#ifdef __cplusplus
-}
-#endif /* __cplusplus */
-#endif /* _EDGE_SENSOR_H__ */
+++ /dev/null
-
-prefix=@PREFIX@
-exec_prefix=@EXEC_PREFIX@
-libdir=@LIB_INSTALL_DIR@
-includedir=@INCLUDE_INSTALL_DIR@
-
-Name: nnstreamer-edge-sensor
-Description: NNStreamer edge sensor library
-Version: @VERSION@
-Requires:
-Libs: -L${libdir} -lnnstreamer-edge-sensor
-Cflags: -I${includedir}
BuildRequires: pkgconfig(paho-mqtt-c)
# TODO remove glib
BuildRequires: glib2-devel
+
%if 0%{?unit_test}
BuildRequires: gtest-devel
-%endif
%if 0%{?testcoverage}
BuildRequires: lcov
%endif
+%endif
%description
nnstreamer-edge provides remote source nodes for NNStreamer pipelines without GStreamer dependencies.
%description devel
It is a development package for nnstreamer-edge.
-%package sensor
-Summary: communication library for edge sensor
-%description sensor
-It is a communication library for edge sensor devices.
-This library supports publishing the sensor data to the GStreamer pipeline without GStreamer / Glib dependency.
-
-%package sensor-devel
-Summary: development package for nnstreamer-edge-sensor
-Requires: nnstreamer-edge = %{version}-%{release}
-%description sensor-devel
-It is a development package for nnstreamer-edge-sensor.
-
+%if 0%{?unit_test}
%package unittest
Summary: test program for nnstreamer-edge library
%description unittest
%description unittest-coverage
HTML pages of lcov results of nnstreamer-edge generated during rpm build
%endif
+%endif
# TODO FIXME enable unittest after migration
%if 0%{?unit_test}
%build
+%if 0%{?unit_test}
%if 0%{?testcoverage}
# To test coverage, disable optimizations (and should unset _FORTIFY_SOURCE to use -O0)
CFLAGS=`echo $CFLAGS | sed -e "s|-O[1-9]|-O0|g"`
export FFLAGS+=" -fprofile-arcs -ftest-coverage -g"
export LDFLAGS+=" -lgcov"
%endif
+%endif # unittest
mkdir -p build
pushd build
popd
%if 0%{?unit_test}
-LD_LIBRARY_PATH=./src bash %{test_script} ./tests/unittest_edge_sensor
-%endif
+#LD_LIBRARY_PATH=./src bash %{test_script} ./tests/unittest_edge_sensor
%if 0%{?testcoverage}
# 'lcov' generates the date format with UTC time zone by default. Let's replace UTC with KST.
mkdir -p %{buildroot}%{_datadir}/nnstreamer-edge/unittest/
cp -r result %{buildroot}%{_datadir}/nnstreamer-edge/unittest/
-%endif # test coverage
+%endif # testcoverage
+%endif # unittest
%clean
rm -rf %{buildroot}
%{_includedir}/nnstreamer-edge.h
%{_libdir}/pkgconfig/nnstreamer-edge.pc
-%files sensor
-%manifest nnstreamer-edge.manifest
-%defattr(-,root,root,-)
-%{_libdir}/libedge-sensor.so*
-
-%files sensor-devel
-%{_includedir}/edge_sensor.h
-%{_libdir}/pkgconfig/nnstreamer-edge-sensor.pc
-
%if 0%{?unit_test}
%files unittest
%manifest nnstreamer-edge.manifest
%defattr(-,root,root,-)
%{_bindir}/test_edge_sensor
-%endif
%if 0%{?testcoverage}
%files unittest-coverage
%{_datadir}/nnstreamer-edge/unittest/*
%endif
+%endif # unittest
INSTALL (TARGETS ${NNS_EDGE_LIB_NAME} DESTINATION ${CMAKE_INSTALL_LIBDIR})
INSTALL (FILES ${INCLUDE_DIR}/nnstreamer-edge.h DESTINATION ${INCLUDE_INSTALL_DIR})
-
-# Edge Sensor
-SET(SENSOR_SRCS ${SENSOR_SRC_DIR}/edge_sensor.c)
-ADD_LIBRARY(edge-sensor SHARED ${SENSOR_SRCS})
-SET_TARGET_PROPERTIES(edge-sensor PROPERTIES VERSION ${SO_VERSION})
-TARGET_INCLUDE_DIRECTORIES(edge-sensor PRIVATE ${INCLUDE_DIR})
-TARGET_LINK_LIBRARIES(edge-sensor ${EDGE_REQUIRE_PKGS_LDFLAGS})
-
-INSTALL (TARGETS edge-sensor DESTINATION ${CMAKE_INSTALL_LIBDIR})
-INSTALL (FILES ${INCLUDE_DIR}/edge_sensor.h DESTINATION ${INCLUDE_INSTALL_DIR})
+++ /dev/null
-# nnstreamer-edge-sensor
-
-`nnstreamer-edge-sensor` is a utility library for publishing any type of data as an MQTT message.
-Without GStreamer dependencies, it can provide remote nodes with stream input data such as camera image, temperature, or humidity.
-`mqttsrc` element of NNStreamer can access these data and exploit them on pipeline stream.
-
-`nnstreamer-edge-sensor` is designed not to depend on GStreamer/GLib so targets to be adopted by general RTOS or Lightweight devices.
-
-
-## Getting Started
-
-### Prerequisites
-
-To build `nnstreamer-edge-sensor` in the Ubuntu/Debian environment, the following packages are needed.
-
-```bash
-$ sudo apt install cmake libpaho-mqtt-dev libpaho-mqtt1.3 mosquitto mosquitto-clients
-```
-
-### How to build
-
-When building `nnstreamer-edge-sensor`, you can set the version number of nnstreamer-edge-sensor or enable test program.
-In this example, the test program is enabled for a future test. (`-DENABLE_TEST=ON`)
-
-```bash
-$ git clone https://github.com/nnstreamer/nnstreamer-edge.git
-$ cd nnstreamer-edge
-$ mkdir build
-$ cd build
-...
-# In this example, test is enabled.
-$ cmake .. -DCMAKE_INSTALL_PREFIX=/usr -DENABLE_TEST=ON
-$ make
-```
-
-Note that, to install the built library, you need the `root` permission. You can make an rpm or debian package and install it.
-
-
-## How to use
-
-`nnstreamer-edge-sensor` provides a devel package and it contains `pkg-config` file. So you can use it when building your own program.
-
-```bash
-$ pkg-config --libs nnstreamer-edge-sensor
--L -lnnstreamer-edge-sensor
-```
-
-## How to test
-
-In this example, we publish a jpeg image as `TestTopic` name on the client-side. On the server-side, we subscribe to the `TestTopic` and show received images.
-
-### Check the mqttsrc and mqttsink are available
-
-Before executing the server script, you have to check the `mqttsrc` and `mqttsink` are available as below.
-
-```bash
-$ gst-inspect-1.0 --gst-plugin-path="build" | grep mqtt
-mqtt: mqttsink: MQTT sink
-mqtt: mqttsrc: MQTT source
-```
-
-If `mqtt` element is not available in your environment, you can build the NNStreamer from the source code after enabling the mqtt-support option.
-
-```bash
-$ git clone https://github.com/nnstreamer/nnstreamer.git
-$ cd nnstreamer
-
-# Enable the mqtt-support option as below
-# meson_options.txt
-# +option('mqtt-support', type: 'feature', value: 'enabled')
-
-$ meson build
-$ ninja -C build
-
-# If you want to install, add `install` option as below
-$ ninja -C build install
-```
-
-### Server side
-
-With `mqttsrc` element, the below command subscribes `TestTopic` message which contains one jpeg image. If the message is received, then it is shown on the screen. In this example, we built the nnstreamer in the `build` directory so we use `--gst-plugin-path` option to set the mqtt plugin path.
-
-```bash
-$ gst-launch-1.0 --gst-plugin-path="build" mqttsrc debug=1 sub-topic=TestTopic host=localhost sub-timeout=9223372036854775807 ! jpegdec ! video/x-raw,framerate=0/1 ! videoscale ! videoconvert ! ximagesink qos=0
-```
-
-### Publish single data
-
-If you want to publish a single jpeg file, you can use `-f [file_path]` option. Then the received image is shown on another window.
-
-```bash
-$ pwd
-[your_home]/nnstreamer-edge/build
-$ ./tests/test_edge_sensor
-Usage: ./tests/test_edge_sensor [-f jpeg_file] [-d directory] [-c count]
-$ ./tests/test_edge_sensor -f ../tests/res/1.jpg
-```
-
-
-
-### Publish multiple data
-If you want to publish multiple jpeg files, you can use `[-d directory] [-c count]` options.
-As soon as the topic is received, the image is shown as below.
-
-```bash
-$ pwd
-[your_home]/nnstreamer-edge/build
-$ ./tests/test_edge_sensor -d ../tests/res -c 10
-```
-
+++ /dev/null
-/* SPDX-License-Identifier: Apache-2.0 */
-/**
- * Copyright (C) 2021 Sangjung Woo <sangjung.woo@samsung.com>
- */
-/**
- * @file edge_sensor.c
- * @date 05 July 2021
- * @brief Implementation of Edge APIs for publishing the any type of data as a MQTT topic
- * @see https://github.com/nnstreamer/nnstreamer-edge
- * @author Sangjung Woo <sangjung.woo@samsung.com>
- * @bug No known bugs except for NYI items
- */
-#include "mqttcommon.h"
-#include "edge_sensor.h"
-
-#include <MQTTAsync.h>
-#include <sys/time.h>
-#include <unistd.h>
-#include <string.h>
-#include <stdlib.h>
-
-#define debug_print(fmt, ...) \
- do { if (DEBUG) fprintf(stderr, "%s:%d:%s(): " fmt "\n", __FILE__, \
- __LINE__, __func__, ##__VA_ARGS__); } while (0)
-
-#define GST_CLOCK_TIME_NONE ((uint64_t) -1)
-
-/**
- * @brief Structures for edge info.
- */
-typedef struct _EdgeInfo
-{
- char *mqtt_host_address;
- char *mqtt_host_port;
- char *mqtt_client_id;
- char *mqtt_topic;
-
- edge_mqtt_state_t mqtt_state;
-
- GstMQTTMessageHdr mqtt_msg_hdr;
- uint64_t mqtt_msg_buf_size;
-
- edge_state_change_cb state_change_cb;
- void *user_data;
-
- MQTTAsync mqtt_client;
- MQTTAsync_connectOptions mqtt_conn_opts;
- MQTTAsync_responseOptions mqtt_respn_opts;
- MQTTAsync_disconnectOptions mqtt_disconn_opts;
- MQTTAsync_token mqtt_last_sent_token;
-
- void *mqtt_msg_buf;
-} EdgeInfo;
-
-static uint8_t sink_client_id = 0;
-static const char DEFAULT_MQTT_HOST_ADDRESS[] = "tcp://localhost";
-static const char DEFAULT_MQTT_HOST_PORT[] = "1883";
-static const int DEFAULT_MQTT_QOS = 1;
-static const char DEFAULT_MQTT_PUB_TOPIC_FORMAT[] = "%s/topic";
-static const int BUFFER_SIZE = 1024;
-static const unsigned long DEFAULT_MQTT_DISCONNECT_TIMEOUT = 3 * 1000; /* 3 secs */
-
-/**
- * @brief Callback function to be called if MQTT connection is established.
- * @note If user register the state change callback, then the new state is set to users.
- */
-static void
-cb_mqtt_on_connect (void *context, MQTTAsync_successData * response)
-{
- EdgeInfo *info = (EdgeInfo *) context;
-
- info->mqtt_state = MQTT_CONNECTED;
- debug_print ("Info: MQTT connectioned");
-
- if (info->state_change_cb) {
- info->state_change_cb (info->user_data, info->mqtt_state);
- }
-}
-
-/**
- * @brief Callback function to be called if the connect fails.
- * @note If user register the state change callback, then the new state is set to users.
- */
-static void
-cb_mqtt_on_connect_failure (void *context, MQTTAsync_failureData * response)
-{
- EdgeInfo *info = (EdgeInfo *) context;
-
- info->mqtt_state = MQTT_CONNECT_FAILURE;
- debug_print ("Error: MQTT connection failure");
-
- if (info->state_change_cb) {
- info->state_change_cb (info->user_data, info->mqtt_state);
- }
-}
-
-/**
- * @brief Callback function to be called if the connection to the server is lost.
- * @note If user register the state change callback, then the new state is set to users.
- */
-static void
-cb_mqtt_on_connection_lost (void *context, char *cause)
-{
- EdgeInfo *info = (EdgeInfo *) context;
-
- info->mqtt_state = MQTT_CONNECTION_LOST;
- debug_print ("Error: MQTT connection lost");
-
- if (info->state_change_cb) {
- info->state_change_cb (info->user_data, info->mqtt_state);
- }
-}
-
-/**
- * @brief Callback function to be called the message is completed to deliver to the server.
- */
-static void
-cb_mqtt_on_delivery_complete (void *context, MQTTAsync_token token)
-{
- EdgeInfo *info = (EdgeInfo *) context;
- debug_print ("Info: Message token %d is completed", token);
-
- if (info->mqtt_last_sent_token >= token) {
- info->mqtt_state = MQTT_DELIVERY_COMPLETE;
- }
-}
-
-/**
- * @brief Callback function to be called the message is sent successfully.
- */
-static void
-cb_mqtt_on_send_success (void *context, MQTTAsync_successData * response)
-{
- EdgeInfo *info = (EdgeInfo *) context;
-
- info->mqtt_state = MQTT_SENDING;
- info->mqtt_last_sent_token = response->token;
- debug_print ("Info: Message token %d is sent", response->token);
-}
-
-/**
- * @brief Callback function to be called the message is failed to send.
- */
-static void
-cb_mqtt_on_send_failure (void *context, MQTTAsync_failureData * response)
-{
- EdgeInfo *info = (EdgeInfo *) context;
-
- info->mqtt_state = MQTT_SEND_ERROR;
- debug_print ("Error: Message token %d is failed to send", response->token);
-}
-
-/**
- * @brief Callback function to be called if the disconnect successfully completes.
- * @note If user register the state change callback, then the new state is set to users.
- */
-static void
-cb_mqtt_on_disconnect (void *context, MQTTAsync_successData * response)
-{
- EdgeInfo *info = (EdgeInfo *) context;
-
- info->mqtt_state = MQTT_DISCONNECTED;
- debug_print ("Info: MQTT connection is closed");
-
- if (info->state_change_cb) {
- info->state_change_cb (info->user_data, info->mqtt_state);
- }
-}
-
-/**
- * @brief Callback function to be called if the disconnect fails.
- * @note If user register the state change callback, then the new state is set to users.
- * If the state is MQTT_SENDING or MQTT_DISCONNECTED, then MQTT_DISCONNECT_FAILED is not sent to users.
- */
-static void
-cb_mqtt_on_disconnect_failure (void *context, MQTTAsync_failureData * response)
-{
- EdgeInfo *info = (EdgeInfo *) context;
-
- if ((info->mqtt_state == MQTT_SENDING) ||
- (info->mqtt_state == MQTT_DISCONNECTED)) {
- debug_print ("state: %d", info->mqtt_state);
- return;
- }
-
- info->mqtt_state = MQTT_DISCONNECT_FAILED;
- debug_print ("Info: MQTT connection is failed to close");
- if (info->state_change_cb) {
- info->state_change_cb (info->user_data, info->mqtt_state);
- }
-}
-
-/**
- * @brief Get the system time in microseconds format.
- * @return the number of microseconds since January 1, 1970 UTC.
- */
-static int64_t
-get_time_stamp ()
-{
- const int64_t GST_SEC_TO_US_MULTIPLIER = 1000000;
- struct timeval tv;
- gettimeofday (&tv, NULL);
- return tv.tv_sec * (int64_t) GST_SEC_TO_US_MULTIPLIER + tv.tv_usec;
-}
-
-/**
- * @brief Initialize the EdgeInfo structure.
- * @param[in,out] info The pointer of EdgeInfo structure.
- */
-static void
-init_edge_info (EdgeInfo * info)
-{
- MQTTAsync_connectOptions conn_opts = MQTTAsync_connectOptions_initializer;
- MQTTAsync_responseOptions respn_opts = MQTTAsync_responseOptions_initializer;
- MQTTAsync_disconnectOptions disconn_opts =
- MQTTAsync_disconnectOptions_initializer;
-
- /* initialize GstMQTTMessageHdr */
- info->mqtt_msg_hdr.num_mems = 0;
- info->mqtt_msg_hdr.base_time_epoch = 0;
- info->mqtt_msg_hdr.sent_time_epoch = 0;
- info->mqtt_msg_hdr.duration = GST_CLOCK_TIME_NONE;
- info->mqtt_msg_hdr.dts = GST_CLOCK_TIME_NONE;
- info->mqtt_msg_hdr.pts = GST_CLOCK_TIME_NONE;
- memset (info->mqtt_msg_hdr.gst_caps_str, 0x00,
- sizeof (info->mqtt_msg_hdr.gst_caps_str));
-
- /* initialize EdgeInfo */
- info->mqtt_host_address = NULL;
- info->mqtt_host_port = NULL;
- info->mqtt_client_id = NULL;
- info->mqtt_topic = NULL;
-
- info->mqtt_state = MQTT_INITIALIZING;
- info->mqtt_msg_buf = NULL;
- info->mqtt_msg_buf_size = 0;
- info->mqtt_last_sent_token = -1;
-
- info->state_change_cb = NULL;
- info->user_data = NULL;
-
- /* MQTT connection option */
- info->mqtt_conn_opts = conn_opts;
- info->mqtt_conn_opts.cleansession = 1;
- info->mqtt_conn_opts.keepAliveInterval = 6;
- info->mqtt_conn_opts.onSuccess = cb_mqtt_on_connect;
- info->mqtt_conn_opts.onFailure = cb_mqtt_on_connect_failure;
- info->mqtt_conn_opts.context = info;
-
- /* MQTT response option */
- info->mqtt_respn_opts = respn_opts;
- info->mqtt_respn_opts.context = info;
- info->mqtt_respn_opts.onSuccess = cb_mqtt_on_send_success;
- info->mqtt_respn_opts.onFailure = cb_mqtt_on_send_failure;
-
- /* MQTT disconnect option */
- info->mqtt_disconn_opts = disconn_opts;
- info->mqtt_disconn_opts.context = info;
- info->mqtt_disconn_opts.onSuccess = cb_mqtt_on_disconnect;
- info->mqtt_disconn_opts.onFailure = cb_mqtt_on_disconnect_failure;
-}
-
-/**
- * @brief Release the allocated memory in the EdgeInfo structure.
- * @param[in] info The pointer of EdgeInfo structure.
- */
-static void
-finalize_edge_info (EdgeInfo * info)
-{
- if (info->mqtt_host_address) {
- free (info->mqtt_host_address);
- info->mqtt_host_address = NULL;
- }
-
- if (info->mqtt_host_port) {
- free (info->mqtt_host_port);
- info->mqtt_host_port = NULL;
- }
-
- if (info->mqtt_client_id) {
- free (info->mqtt_client_id);
- info->mqtt_client_id = NULL;
- }
-
- if (info->mqtt_topic) {
- free (info->mqtt_topic);
- info->mqtt_topic = NULL;
- }
-
- if (info->mqtt_msg_buf) {
- free (info->mqtt_msg_buf);
- info->mqtt_msg_buf = NULL;
- }
-}
-
-/**
- * @brief Close the MQTT connection and release the allocated memory space.
- */
-int
-edge_close_connection (edge_h handle)
-{
- EdgeInfo *info = (EdgeInfo *) handle;
- int ret;
-
- if (!handle) {
- debug_print ("Error: Invalid Param: handle is NULL");
- return -1;
- }
-
- /* check the current state */
- if ((info->mqtt_state != MQTT_CONNECTED) &&
- (info->mqtt_state != MQTT_DELIVERY_COMPLETE) &&
- (info->mqtt_state != MQTT_SENDING)) {
- debug_print ("Error: mqtt_state is invalid!: %d", info->mqtt_state);
- return -1;
- }
-
- /* support QoS */
- if (info->mqtt_state == MQTT_SENDING) {
- debug_print ("Info: MQTTAsync_waitForCompletion(): %d",
- info->mqtt_last_sent_token);
- MQTTAsync_waitForCompletion (info->mqtt_client, info->mqtt_last_sent_token,
- DEFAULT_MQTT_DISCONNECT_TIMEOUT);
- }
- /* In case of MQTT_REQUEST_STOP state, publishing new message is denied. */
- info->mqtt_state = MQTT_REQUEST_STOP;
-
- /* try to disconnect */
- while (MQTTAsync_isConnected (info->mqtt_client)) {
- ret = MQTTAsync_disconnect (info->mqtt_client, &info->mqtt_disconn_opts);
- if (ret != MQTTASYNC_SUCCESS) {
- debug_print ("Error: failed to MQTTAsync_disconnect()");
- return -1;
- }
- }
-
- /* cleanup resources */
- MQTTAsync_destroy (&info->mqtt_client);
- finalize_edge_info (info);
- free (info);
-
- return 0;
-}
-
-/**
- * @brief Open the MQTT connection with specific options
- */
-int
-edge_open_connection (edge_h * handle,
- char *host_address, char *host_port, char *topic_name,
- int64_t base_time_stamp, uint64_t duration, char *gst_caps_string,
- edge_state_change_cb callback, void *user_data)
-{
- EdgeInfo *info = NULL;
- char server_url[BUFFER_SIZE];
- char client_id[BUFFER_SIZE];
- char topic[BUFFER_SIZE];
- int ret = 0;
-
- if (!handle) {
- debug_print ("Error: Invalid Param: handle is NULL");
- return -1;
- }
-
- info = (EdgeInfo *) malloc (sizeof (EdgeInfo));
- if (!info) {
- debug_print ("Error: failed to malloc()");
- return -1;
- }
- init_edge_info (info);
-
- /* Set parameter */
- info->mqtt_host_address =
- host_address ? strdup (host_address) : strdup (DEFAULT_MQTT_HOST_ADDRESS);
- info->mqtt_host_port =
- host_port ? strdup (host_port) : strdup (DEFAULT_MQTT_HOST_PORT);
-
- snprintf (client_id, BUFFER_SIZE, "edge_sensor_%u_%u", getpid (),
- sink_client_id++);
- info->mqtt_client_id = strdup (client_id);
-
- if (topic_name) {
- info->mqtt_topic = strdup (topic_name);
- } else {
- snprintf (topic, BUFFER_SIZE, DEFAULT_MQTT_PUB_TOPIC_FORMAT,
- info->mqtt_client_id);
- info->mqtt_topic = strdup (topic);
- }
-
- /* Set base time stamp if user is provided. If not, current time stamp is used */
- if (base_time_stamp == 0) {
- info->mqtt_msg_hdr.base_time_epoch =
- get_time_stamp () * GST_US_TO_NS_MULTIPLIER;
- } else {
- info->mqtt_msg_hdr.base_time_epoch = base_time_stamp;
- }
-
- /* Set state change callback */
- if (callback) {
- info->state_change_cb = callback;
- info->user_data = user_data;
- }
-
- /* Create MQTT Client object */
- snprintf (server_url, BUFFER_SIZE, "%s:%s", info->mqtt_host_address,
- info->mqtt_host_port);
- ret =
- MQTTAsync_create (&info->mqtt_client, server_url, info->mqtt_client_id,
- MQTTCLIENT_PERSISTENCE_NONE, NULL);
- if (ret != MQTTASYNC_SUCCESS) {
- debug_print ("Error: failed MQTTAsync_create(): %d", ret);
- goto error_handling;
- }
-
- /* Attempt to connect MQTT Server */
- MQTTAsync_setCallbacks (info->mqtt_client, info,
- cb_mqtt_on_connection_lost, NULL, cb_mqtt_on_delivery_complete);
- ret = MQTTAsync_connect (info->mqtt_client, &info->mqtt_conn_opts);
- if (ret != MQTTASYNC_SUCCESS) {
- debug_print ("Error: failed MQTTAsync_connect()");
- goto error_handling;
- }
- *handle = info;
- return 0;
-
-error_handling:
- finalize_edge_info (info);
- free (info);
- return -1;
-}
-
-/**
- * @brief Publish the single message that contains only one record.
- *
- */
-int
-edge_publish_single_msg (edge_h handle, void *buffer, uint64_t payload_size)
-{
- EdgeInfo *info = (EdgeInfo *) handle;
- int ret = 0;
- uint8_t *msg_pub;
-
- /* check the input parameters */
- if (!handle) {
- debug_print ("Error: Invalid Param: handle is NULL");
- return -1;
- }
-
- if (!buffer) {
- debug_print ("Error: Invalid Param: buffer is NULL");
- return -1;
- }
-
- if (payload_size == 0) {
- debug_print ("Error: Invalid Param: payload_size should be bigger than 0.");
- return -1;
- }
-
- /* check the current state */
- if ((info->mqtt_state != MQTT_CONNECTED) &&
- (info->mqtt_state != MQTT_DELIVERY_COMPLETE) &&
- (info->mqtt_state != MQTT_SENDING)) {
- debug_print ("Error: mqtt_state is invalid!: %d", info->mqtt_state);
- return -1;
- }
-
- /* alocate the MQTT buffer memory */
- if (payload_size > info->mqtt_msg_buf_size) {
- info->mqtt_msg_buf_size = payload_size + GST_MQTT_LEN_MSG_HDR;;
- free (info->mqtt_msg_buf);
- info->mqtt_msg_buf = NULL;
- }
- if (!info->mqtt_msg_buf) {
- info->mqtt_msg_buf = malloc (info->mqtt_msg_buf_size);
- if (!info->mqtt_msg_buf) {
- debug_print ("Error: failed malloc()");
- return -1;
- }
- }
-
- /* setup header info */
- info->mqtt_msg_hdr.num_mems = 1;
- info->mqtt_msg_hdr.size_mems[0] = payload_size;
-
- /* set timestamp */
- info->mqtt_msg_hdr.sent_time_epoch =
- get_time_stamp () * GST_US_TO_NS_MULTIPLIER;
- info->mqtt_msg_hdr.pts =
- info->mqtt_msg_hdr.base_time_epoch - info->mqtt_msg_hdr.sent_time_epoch;
- info->mqtt_msg_hdr.dts = GST_CLOCK_TIME_NONE;
- info->mqtt_msg_hdr.duration = GST_CLOCK_TIME_NONE;
-
- /* copy header and payload */
- msg_pub = info->mqtt_msg_buf;
- memcpy (msg_pub, &info->mqtt_msg_hdr, sizeof (info->mqtt_msg_hdr));
- memcpy (&msg_pub[sizeof (info->mqtt_msg_hdr)], buffer, payload_size);
-
- /* send message to the MQTT server */
- ret = MQTTAsync_send (info->mqtt_client, info->mqtt_topic,
- info->mqtt_msg_buf_size, info->mqtt_msg_buf,
- DEFAULT_MQTT_QOS, 1, &info->mqtt_respn_opts);
- if (ret != MQTTASYNC_SUCCESS) {
- debug_print ("Error: failed MQTTAsync_send()");
- return -1;
- }
- return ret;
-}
+++ /dev/null
-/* SPDX-License-Identifier: Apache-2.0 */
-/**
- * @file mqttcommon.h
- * @date 08 Mar 2021
- * @brief Common macros and utility functions for GStreamer MQTT plugins
- * @see https://github.com/nnstreamer/nnstreamer
- * @see https://github.com/nnstreamer/nnstreamer-edge
- * @author Wook Song <wook16.song@samsung.com>
- * Sangjung Woo <sangjung.woo@samsung.com>
- * @bug No known bugs except for NYI items
- * @note To remove glib dependency, this file uses fixed width integer types (i.e. stdint.h) instead of GLib types.
- */
-#ifndef __EDGE_MQTT_COMMON_H__
-#define __EDGE_MQTT_COMMON_H__
-
-#include <stdint.h>
-
-#define GST_MQTT_MAX_NUM_MEMS 16
-#define GST_MQTT_LEN_MSG_HDR 1024
-#define GST_MQTT_MAX_LEN_GST_CPAS_STR 512
-
-#define GST_US_TO_NS_MULTIPLIER 1000
-
-typedef struct _GstMQTTMessageHdr {
- union {
- struct {
- uint32_t num_mems;
- uint64_t size_mems[GST_MQTT_MAX_NUM_MEMS];
-
- int64_t base_time_epoch;
- int64_t sent_time_epoch;
-
- uint64_t duration;
- uint64_t dts;
- uint64_t pts;
- char gst_caps_str[GST_MQTT_MAX_LEN_GST_CPAS_STR];
- };
- uint8_t _reserved_hdr[GST_MQTT_LEN_MSG_HDR];
- };
-} GstMQTTMessageHdr;
-
-#endif /* __EDGE_MQTT_COMMON_H__ */
-# Edge Sensor test
-ADD_EXECUTABLE(test_edge_sensor test_edge_sensor.c)
-TARGET_INCLUDE_DIRECTORIES(test_edge_sensor PRIVATE ${EDGE_REQUIRE_PKGS_INCLUDE_DIRS} ${INCLUDE_DIR})
-TARGET_LINK_LIBRARIES(test_edge_sensor -L${PROJECT_BINARY_DIR}/src edge-sensor)
-INSTALL (TARGETS test_edge_sensor DESTINATION ${BIN_INSTALL_DIR})
-
-# Unittest
-ADD_EXECUTABLE(unittest_edge_sensor unittest_edge_sensor.cc)
-TARGET_INCLUDE_DIRECTORIES(unittest_edge_sensor PRIVATE ${INCLUDE_DIR})
-TARGET_LINK_LIBRARIES(unittest_edge_sensor ${TEST_REQUIRE_PKGS_LDFLAGS} edge-sensor)
+# nnstreamer-edge test
+# TODO upload unittest for nnstreamer-edge later
+++ /dev/null
-
-/* SPDX-License-Identifier: Apache-2.0 */
-/**
- * @file test_edge_sensor.c
- * @date 05 July 2021
- * @brief Publish the jpeg data as "TestTopic" topic name 10 times
- *
- * @details Usage examples
- * This test case publishes the jpeg data as "TestTopic" topic name 10 times.
- * If data is successfully received, then the image is shown on the server-side.
- *
- * Server Side:
- * gst-launch-1.0 \
- * mqttsrc debug=1 sub-topic=TestTopic host=localhost sub-timeout=9223372036854775807 ! \
- * jpegdec ! video/x-raw,framerate=0/1 ! \
- * videoscale ! videoconvert ! ximagesink qos=0
- *
- * Client Side:
- * $ ./test_edge_sensor
- * Usage: ./tests/test_edge_sensor [-f jpeg_file] [-d directory] [-c count]
- * # Publish single image
- * $ ./test_edge_sensor -f ./0.jpg
- * # Publish multiple images 10 times
- * $ ./tests/test_edge_sensor -d ../tests/res -c 10
- */
-#include <stdio.h>
-#include <stdbool.h>
-#include <stdlib.h>
-#include <unistd.h>
-#include <string.h>
-#include "edge_sensor.h"
-
-/**
- * @brief Get jpeg data and its size from the file path
- * @param[in] path jpeg file path
- * @param[out] buf image data of the input jpeg file
- * @param[out] buf_size image size of the input jpeg file
- */
-int
-read_jpeg_file (const char *path, uint8_t **buf, uint64_t *buf_size)
-{
- uint8_t *out_buf;
- uint64_t curr_size = 0;
- uint64_t total_size = 0;
-
- FILE *fr = fopen(path,"rb");
-
- /* get file size */
- fseek(fr, 0, SEEK_END);
- *buf_size = (uint64_t)ftell(fr);
-
- /* allocate the memory space */
- out_buf = (uint8_t*)malloc(*buf_size);
- if (!out_buf) {
- printf ("Fail to malloc()\n");
- return -1;
- }
-
- /* get the data from file */
- fseek(fr, 0, SEEK_SET);
- while ((curr_size = fread (&out_buf[total_size], sizeof(uint8_t), *buf_size - total_size, fr)) > 0) {
- total_size += curr_size;
- }
-
- if (total_size != *buf_size) {
- printf ("Fail to fread(): different size\n");
- return -1;
- }
-
- fclose (fr);
- *buf = out_buf;
- return 0;
-}
-
-/**
- * @brief Callback function to be called the connection state is changed.
- * @param[in,out] user_data User data that provided when calling edge_open_connection()
- * @param[in] state The new state of MQTT connection
- */
-void
-state_change_cb (void *user_data, edge_mqtt_state_t state)
-{
- edge_mqtt_state_t *s = (edge_mqtt_state_t*)user_data;
- switch (state) {
- case MQTT_CONNECTED:
- *s = MQTT_CONNECTED;
- printf(" - User: State change: MQTT_CONNECTED\n");
- break;
-
- case MQTT_DISCONNECTED:
- *s = MQTT_DISCONNECTED;
- printf(" - User: State change: MQTT_DISCONNECTED\n");
- break;
-
- default:
- printf(" - User: State change: Default\n");
- break;
- }
- return ;
-}
-
-int
-main(int argc, char* argv[])
-{
- int opt;
- edge_h handle;
- edge_mqtt_state_t state = MQTT_CONNECTION_LOST;
- char path[512] = { 0 };
- uint8_t *buf;
- uint64_t buf_size;
- int curr_count = 1;
- int max_count = 0;
- bool flag_single_file = false;
-
- if (argc == 1) {
- fprintf(stderr, "Usage: %s [-f jpeg_file] [-d directory] [-c count]\n", argv[0]);
- exit (0);
- }
-
- while ((opt = getopt(argc, argv, "f:c:d:")) != -1) {
- switch (opt) {
- case 'f':
- /* Single image file */
- flag_single_file = true;
- memcpy (path, optarg, strlen (optarg));
- break;
- case 'c':
- /* Max Count */
- max_count = atoi (optarg);
- break;
- case 'd':
- flag_single_file = false;
- memcpy (path, optarg, strlen (optarg));
- break;
-
- default:
- fprintf(stderr, "Usage: %s [-f jpeg_file] [-d directory] [-c count]\n", argv[0]);
- exit (0);
- }
- }
-
- int ret = edge_open_connection (&handle,
- NULL, NULL, "TestTopic",
- 0, 0, "", state_change_cb, (void*)&state);
- if (ret != 0) {
- printf ("Error: edge_open_connection() ret: %d\n", ret);
- return -1;
- }
- usleep(200000L);
-
- while (1) {
- if (flag_single_file) {
- read_jpeg_file (path, &buf, &buf_size);
- } else {
- char local_path[1024] = { 0 };
- snprintf (local_path, 1024, "%s/%d.jpg", path, curr_count%3);
- read_jpeg_file (local_path, &buf, &buf_size);
- }
- ret = edge_publish_single_msg (handle, buf, buf_size);
- if (ret != 0) {
- printf ("Error: edge_publish_single_msg() ret: %d\n", ret);
- return -1;
- }
-
- free(buf);
- usleep(200000L);
- if (++curr_count > max_count)
- break;
- }
-
- ret = edge_close_connection (handle);
- if (ret != 0) {
- printf ("Error: edge_close_connection() ret: %d\n", ret);
- return -1;
- }
- return 0;
-}
+++ /dev/null
-/**\r
- * @file unittest_edge_sensor.cc\r
- * @date 15 Sep 2021\r
- * @brief Unit test for edge sensor.\r
- * @see https://github.com/nnstreamer/nnstreamer-edge\r
- * @author Gichan Jang <gichan2.jang@samsung.com>\r
- * @bug No known bugs\r
- */\r
-\r
-#include <gtest/gtest.h>\r
-#include "edge_sensor.h"\r
-#include <string.h>\r
-#include <stdlib.h>\r
-\r
-/**\r
- * @brief Callback function to be called the connection state is changed.\r
- * @param[in,out] user_data User data that provided when calling edge_open_connection()\r
- * @param[in] state The new state of MQTT connection\r
- */\r
-static void\r
-state_change_cb (void *user_data, edge_mqtt_state_t state)\r
-{\r
- edge_mqtt_state_t *s = (edge_mqtt_state_t*)user_data;\r
- switch (state) {\r
- case MQTT_CONNECTED:\r
- *s = MQTT_CONNECTED;\r
- printf(" - User: State change: MQTT_CONNECTED\n");\r
- break;\r
- case MQTT_DISCONNECTED:\r
- *s = MQTT_DISCONNECTED;\r
- printf(" - User: State change: MQTT_DISCONNECTED\n");\r
- break;\r
- default:\r
- printf(" - User: State change: Default\n");\r
- break;\r
- }\r
- return ;\r
-}\r
-\r
-/**\r
- * @brief Publish single message.\r
- */\r
-TEST(edgeSensor, publishSingleMessage0) {\r
- edge_h handle;\r
- edge_mqtt_state_t state = MQTT_CONNECTION_LOST;\r
- char * topic_str = strdup ("TestTopic");\r
- char * caps_str = strdup ("");\r
- uint64_t buf_size = 10;\r
- uint8_t * buf = (uint8_t *) malloc (buf_size * sizeof(uint8_t));\r
-\r
- ASSERT_EQ (0, edge_open_connection (&handle, NULL, NULL, topic_str,\r
- 0, 0, caps_str, state_change_cb, (void*) &state));\r
- usleep(200000L);\r
-\r
- EXPECT_EQ (0, edge_publish_single_msg (handle, buf, buf_size));\r
- usleep(200000L);\r
-\r
- EXPECT_EQ (0, edge_close_connection (handle));\r
-\r
- free (buf);\r
- free (topic_str);\r
- free (caps_str);\r
-}\r
-\r
-/**\r
- * @brief Publish single message with invalid param.\r
- */\r
-TEST(edgeSensor, publishSingleMessage1_n) {\r
- uint64_t buf_size = 10;\r
- uint8_t * buf = (uint8_t *) malloc (buf_size * sizeof(uint8_t));\r
-\r
- EXPECT_NE (0, edge_publish_single_msg (NULL, buf, buf_size));\r
- usleep(200000L);\r
-\r
- free (buf);\r
-}\r
-\r
-/**\r
- * @brief Open connection with invalud param.\r
- */\r
-TEST(edgeSensor, openConnection0_n) {\r
- edge_mqtt_state_t state = MQTT_CONNECTION_LOST;\r
- char * topic_str = strdup ("TestTopic");\r
- char * caps_str = strdup ("");\r
-\r
- ASSERT_NE (0, edge_open_connection (NULL, NULL, NULL, topic_str,\r
- 0, 0, caps_str, state_change_cb, (void*) &state));\r
- usleep(200000L);\r
-\r
- free (topic_str);\r
- free (caps_str);\r
-}\r
-\r
-/**\r
- * @brief Publish single message with invalid param.\r
- */\r
-TEST(edgeSensor, publish2_n) {\r
- edge_h handle;\r
- edge_mqtt_state_t state = MQTT_CONNECTION_LOST;\r
- char * topic_str = strdup ("TestTopic");\r
- char * caps_str = strdup ("");\r
- uint64_t buf_size = 10;\r
-\r
- ASSERT_EQ (0, edge_open_connection (&handle, NULL, NULL, topic_str,\r
- 0, 0, caps_str, state_change_cb, (void*) &state));\r
- usleep(200000L);\r
-\r
- EXPECT_NE (0, edge_publish_single_msg (handle, NULL, buf_size));\r
- usleep(200000L);\r
-\r
- EXPECT_EQ (0, edge_close_connection (handle));\r
-\r
- free (topic_str);\r
- free (caps_str);\r
-}\r
-\r
-/**\r
- * @brief Publish single message with invalid param.\r
- */\r
-TEST(edgeSensor, publish3_n) {\r
- edge_h handle;\r
- edge_mqtt_state_t state = MQTT_CONNECTION_LOST;\r
- char * topic_str = strdup ("TestTopic");\r
- char * caps_str = strdup ("");\r
- uint64_t buf_size = 0;\r
- uint8_t * buf = (uint8_t *) malloc (buf_size * sizeof(uint8_t));\r
-\r
- ASSERT_EQ (0, edge_open_connection (&handle, NULL, NULL, topic_str,\r
- 0, 0, caps_str, state_change_cb, (void*) &state));\r
- usleep(200000L);\r
-\r
- EXPECT_NE (0, edge_publish_single_msg (handle, buf, buf_size));\r
- usleep(200000L);\r
-\r
- EXPECT_EQ (0, edge_close_connection (handle));\r
-\r
- free (buf);\r
- free (topic_str);\r
- free (caps_str);\r
-}\r
-\r
-/**\r
- * @brief Close connection with invalid param.\r
- */\r
-TEST(edgeSensor, closeConnection0_n) {\r
- EXPECT_NE (0, edge_close_connection (NULL));\r
-}\r
-\r
-/**\r
- * @brief Main gtest\r
- */\r
-int\r
-main (int argc, char **argv)\r
-{\r
- int result = -1;\r
-\r
- try {\r
- testing::InitGoogleTest (&argc, argv);\r
- } catch (...) {\r
- fprintf(stderr, "catch 'testing::internal::<unnamed>::ClassUniqueToAlwaysTrue'");\r
- }\r
-\r
- try {\r
- result = RUN_ALL_TESTS ();\r
- } catch (...) {\r
- fprintf(stderr, "catch `testing::internal::GoogleTestFailureException`");\r
- }\r
-\r
- return result;\r
-}\r