From d759af0916a6b21708ea5ccd858e72bfb5e386ec Mon Sep 17 00:00:00 2001 From: Jaeyun Date: Tue, 3 Jan 2023 19:33:00 +0900 Subject: [PATCH] [Connect] flag to check sending thread 1. Add flag to check sending thread. 2. Init data ptr and size in the queue. Signed-off-by: Jaeyun --- src/libnnstreamer-edge/nnstreamer-edge-internal.c | 14 +++++++++++++- src/libnnstreamer-edge/nnstreamer-edge-queue.c | 8 ++++++++ 2 files changed, 21 insertions(+), 1 deletion(-) diff --git a/src/libnnstreamer-edge/nnstreamer-edge-internal.c b/src/libnnstreamer-edge/nnstreamer-edge-internal.c index 29e490d..f19a74a 100644 --- a/src/libnnstreamer-edge/nnstreamer-edge-internal.c +++ b/src/libnnstreamer-edge/nnstreamer-edge-internal.c @@ -66,6 +66,7 @@ typedef struct pthread_t listener_thread; /* thread and queue to send data */ + bool sending; nns_edge_queue_h send_queue; pthread_t send_thread; @@ -813,7 +814,14 @@ _nns_edge_send_thread (void *thread_data) char *val; int ret; - while (nns_edge_queue_wait_pop (eh->send_queue, 0U, &data_h, &data_size)) { + eh->sending = true; + while (eh->sending && + nns_edge_queue_wait_pop (eh->send_queue, 0U, &data_h, &data_size)) { + if (!eh->sending) { + nns_edge_data_destroy (data_h); + break; + } + /* Send data to destination */ switch (eh->connect_type) { case NNS_EDGE_CONNECT_TYPE_TCP: @@ -859,6 +867,8 @@ _nns_edge_send_thread (void *thread_data) } nns_edge_data_destroy (data_h); } + eh->sending = false; + return NULL; } @@ -1231,6 +1241,7 @@ nns_edge_create_handle (const char *id, nns_edge_connect_type_e connect_type, eh->broker_h = NULL; eh->connections = NULL; eh->listening = false; + eh->sending = false; eh->listener_fd = -1; nns_edge_metadata_create (&eh->metadata); nns_edge_queue_create (&eh->send_queue); @@ -1382,6 +1393,7 @@ nns_edge_release_handle (nns_edge_h edge_h) eh->send_queue = NULL; if (eh->send_thread) { + eh->sending = false; pthread_join (eh->send_thread, NULL); eh->send_thread = 0; } diff --git a/src/libnnstreamer-edge/nnstreamer-edge-queue.c b/src/libnnstreamer-edge/nnstreamer-edge-queue.c index ea806c1..4a3fcb8 100644 --- a/src/libnnstreamer-edge/nnstreamer-edge-queue.c +++ b/src/libnnstreamer-edge/nnstreamer-edge-queue.c @@ -262,6 +262,10 @@ nns_edge_queue_pop (nns_edge_queue_h handle, void **data, nns_size_t * size) return false; } + /* init data */ + *data = NULL; + *size = 0U; + nns_edge_lock (q); popped = _pop_data (q, false, data, size); nns_edge_unlock (q); @@ -294,6 +298,10 @@ nns_edge_queue_wait_pop (nns_edge_queue_h handle, unsigned int timeout, return false; } + /* init data */ + *data = NULL; + *size = 0U; + nns_edge_lock (q); if (q->length == 0U) { if (timeout > 0) { -- 2.34.1