From 2add5bd98c5d053e0df928564741d0d3acba712f Mon Sep 17 00:00:00 2001 From: Jaeyun Date: Fri, 26 Aug 2022 15:42:04 +0900 Subject: [PATCH] [Transfer] thread to send data Add new thread to send data asynchronously. Signed-off-by: Jaeyun --- .../nnstreamer-edge-internal.c | 155 ++++++++++++++---- .../nnstreamer-edge-internal.h | 5 + 2 files changed, 129 insertions(+), 31 deletions(-) diff --git a/src/libnnstreamer-edge/nnstreamer-edge-internal.c b/src/libnnstreamer-edge/nnstreamer-edge-internal.c index c67b60a..810d718 100644 --- a/src/libnnstreamer-edge/nnstreamer-edge-internal.c +++ b/src/libnnstreamer-edge/nnstreamer-edge-internal.c @@ -406,6 +406,11 @@ _nns_edge_transfer_data (nns_edge_conn_s * conn, nns_edge_data_h data_h, ret = _nns_edge_cmd_send (conn, &cmd); SAFE_FREE (cmd.meta); + if (ret != NNS_EDGE_ERROR_NONE) { + nns_edge_loge ("Failed to send edge data to destination (%s:%d).", + conn->host, conn->port); + } + return ret; } @@ -814,6 +819,99 @@ _nns_edge_create_message_thread (nns_edge_handle_s * eh, nns_edge_conn_s * conn, return NNS_EDGE_ERROR_NONE; } +/** + * @brief Thread to send data. + */ +static void * +_nns_edge_send_thread (void *thread_data) +{ + nns_edge_handle_s *eh = (nns_edge_handle_s *) thread_data; + nns_edge_conn_data_s *conn_data; + nns_edge_data_h data_h; + int64_t *client_ids; + char *val; + unsigned int i, length; + int ret; + + while (nns_edge_queue_wait_pop (eh->send_queue, 0U, &data_h)) { + /* Send data to destination */ + ret = nns_edge_data_get_info (data_h, "client_id", &val); + if (ret != NNS_EDGE_ERROR_NONE) { + nns_edge_logd + ("Cannot find client ID in edge data. Send to all connected nodes."); + client_ids = (int64_t *) g_hash_table_get_keys_as_array (eh->conn_table, + &length); + } else { + length = 1U; + + client_ids = (int64_t *) malloc (sizeof (int64_t)); + if (client_ids) + *client_ids = strtoll (val, NULL, 10); + + SAFE_FREE (val); + } + + if (!client_ids) { + nns_edge_loge ("Failed to get client ID, cannot send edge data."); + goto done; + } + + /** @todo update code for each connect type */ + switch (eh->connect_type) { + case NNS_EDGE_CONNECT_TYPE_TCP: + case NNS_EDGE_CONNECT_TYPE_HYBRID: + for (i = 0; i < length; i++) { + conn_data = _nns_edge_get_connection (eh, client_ids[i]); + if (!conn_data) { + nns_edge_loge + ("Cannot find connection, invalid client ID or connection closed."); + goto done; + } + + ret = _nns_edge_transfer_data (conn_data->sink_conn, data_h, + client_ids[i]); + if (ret != NNS_EDGE_ERROR_NONE) { + nns_edge_loge ("Failed to send edge data."); + goto done; + } + } + break; + default: + break; + } + + done: + SAFE_FREE (client_ids); + nns_edge_data_destroy (data_h); + } + + return NULL; +} + +/** + * @brief Create thread to send data. + */ +static int +_nns_edge_create_send_thread (nns_edge_handle_s * eh) +{ + pthread_attr_t attr; + int tid; + + pthread_attr_init (&attr); + pthread_attr_setdetachstate (&attr, PTHREAD_CREATE_JOINABLE); + + tid = pthread_create (&eh->send_thread, &attr, _nns_edge_send_thread, eh); + pthread_attr_destroy (&attr); + + if (tid < 0) { + nns_edge_loge ("Failed to create sender thread."); + eh->send_thread = 0; + return NNS_EDGE_ERROR_IO; + } + + return NNS_EDGE_ERROR_NONE; +} + /** * @brief Accept socket and create message thread in socket listener thread. */ @@ -1051,6 +1149,7 @@ nns_edge_create_handle (const char *id, nns_edge_connect_type_e connect_type, nns_edge_metadata_init (&eh->meta); eh->listening = false; eh->listener_fd = -1; + nns_edge_queue_create (&eh->send_queue); /* Connection data for each client ID. */ eh->conn_table = g_hash_table_new_full (g_direct_hash, g_direct_equal, NULL, @@ -1131,6 +1230,8 @@ nns_edge_start (nns_edge_h edge_h) goto error; } + ret = _nns_edge_create_send_thread (eh); + error: nns_edge_unlock (eh); return ret; @@ -1168,6 +1269,14 @@ nns_edge_release_handle (nns_edge_h edge_h) eh->event_cb = NULL; eh->user_data = NULL; + nns_edge_queue_destroy (eh->send_queue); + eh->send_queue = NULL; + + if (eh->send_thread) { + pthread_join (eh->send_thread, NULL); + eh->send_thread = 0; + } + if (eh->listener_thread) { if (eh->listening) { pthread_cancel (eh->listener_thread); @@ -1367,10 +1476,7 @@ int nns_edge_send (nns_edge_h edge_h, nns_edge_data_h data_h) { nns_edge_handle_s *eh; - nns_edge_conn_data_s *conn_data; - int64_t client_id; - char *val; - int ret = NNS_EDGE_ERROR_NONE; + nns_edge_data_h new_data_h; eh = (nns_edge_handle_s *) edge_h; if (!eh) { @@ -1383,15 +1489,6 @@ nns_edge_send (nns_edge_h edge_h, nns_edge_data_h data_h) return NNS_EDGE_ERROR_INVALID_PARAMETER; } - ret = nns_edge_data_get_info (data_h, "client_id", &val); - if (ret != NNS_EDGE_ERROR_NONE) { - nns_edge_loge ("Cannot find client ID in edge data."); - return NNS_EDGE_ERROR_INVALID_PARAMETER; - } - - client_id = strtoll (val, NULL, 10); - SAFE_FREE (val); - nns_edge_lock (eh); if (!NNS_EDGE_MAGIC_IS_VALID (eh)) { @@ -1400,28 +1497,24 @@ nns_edge_send (nns_edge_h edge_h, nns_edge_data_h data_h) return NNS_EDGE_ERROR_INVALID_PARAMETER; } - /** @todo update code for each connect type */ - switch (eh->connect_type) { - case NNS_EDGE_CONNECT_TYPE_TCP: - case NNS_EDGE_CONNECT_TYPE_HYBRID: - conn_data = _nns_edge_get_connection (eh, client_id); - if (!conn_data) { - nns_edge_loge - ("Cannot find connection, invalid client ID or connection closed."); - ret = NNS_EDGE_ERROR_INVALID_PARAMETER; - break; - } + if (!eh->send_thread) { + nns_edge_loge ("Invalid state, start edge before sending a data."); + nns_edge_unlock (eh); + return NNS_EDGE_ERROR_IO; + } - ret = _nns_edge_transfer_data (conn_data->sink_conn, data_h, client_id); - if (ret != NNS_EDGE_ERROR_NONE) - nns_edge_loge ("Failed to send edge data."); - break; - default: - break; + /* Create new data handle and push it into send-queue. */ + nns_edge_data_copy (data_h, &new_data_h); + + if (!nns_edge_queue_push (eh->send_queue, new_data_h, + (nns_edge_queue_data_destroy_cb) nns_edge_data_destroy)) { + nns_edge_loge ("Failed to send data, cannot push data into queue."); + nns_edge_unlock (eh); + return NNS_EDGE_ERROR_IO; } nns_edge_unlock (eh); - return ret; + return NNS_EDGE_ERROR_NONE; } /** diff --git a/src/libnnstreamer-edge/nnstreamer-edge-internal.h b/src/libnnstreamer-edge/nnstreamer-edge-internal.h index 698b58c..7a8d4cc 100644 --- a/src/libnnstreamer-edge/nnstreamer-edge-internal.h +++ b/src/libnnstreamer-edge/nnstreamer-edge-internal.h @@ -20,6 +20,7 @@ extern "C" { #include "nnstreamer-edge.h" #include "nnstreamer-edge-common.h" +#include "nnstreamer-edge-queue.h" /** * @brief Data structure for edge handle. @@ -51,6 +52,10 @@ typedef struct { int listener_fd; pthread_t listener_thread; + /* thread and queue to send data */ + nns_edge_queue_h send_queue; + pthread_t send_thread; + /* MQTT */ nns_edge_broker_h broker_h; } nns_edge_handle_s; -- 2.34.1