[Transfer] thread to send data
authorJaeyun <jy1210.jung@samsung.com>
Fri, 26 Aug 2022 06:42:04 +0000 (15:42 +0900)
committerjaeyun-jung <39614140+jaeyun-jung@users.noreply.github.com>
Wed, 31 Aug 2022 02:57:06 +0000 (11:57 +0900)
Add new thread to send data asynchronously.

Signed-off-by: Jaeyun <jy1210.jung@samsung.com>
src/libnnstreamer-edge/nnstreamer-edge-internal.c
src/libnnstreamer-edge/nnstreamer-edge-internal.h

index c67b60adc24a2877e483a4f7ca00fe925e77a25b..810d718b3d0c70bf0e47eb9af812b4a8c97b8b3f 100644 (file)
@@ -406,6 +406,11 @@ _nns_edge_transfer_data (nns_edge_conn_s * conn, nns_edge_data_h data_h,
   ret = _nns_edge_cmd_send (conn, &cmd);
   SAFE_FREE (cmd.meta);
 
+  if (ret != NNS_EDGE_ERROR_NONE) {
+    nns_edge_loge ("Failed to send edge data to destination (%s:%d).",
+        conn->host, conn->port);
+  }
+
   return ret;
 }
 
@@ -814,6 +819,99 @@ _nns_edge_create_message_thread (nns_edge_handle_s * eh, nns_edge_conn_s * conn,
   return NNS_EDGE_ERROR_NONE;
 }
 
+/**
+ * @brief Thread to send data.
+ */
+static void *
+_nns_edge_send_thread (void *thread_data)
+{
+  nns_edge_handle_s *eh = (nns_edge_handle_s *) thread_data;
+  nns_edge_conn_data_s *conn_data;
+  nns_edge_data_h data_h;
+  int64_t *client_ids;
+  char *val;
+  unsigned int i, length;
+  int ret;
+
+  while (nns_edge_queue_wait_pop (eh->send_queue, 0U, &data_h)) {
+    /* Send data to destination */
+    ret = nns_edge_data_get_info (data_h, "client_id", &val);
+    if (ret != NNS_EDGE_ERROR_NONE) {
+      nns_edge_logd
+          ("Cannot find client ID in edge data. Send to all connected nodes.");
+      client_ids = (int64_t *) g_hash_table_get_keys_as_array (eh->conn_table,
+          &length);
+    } else {
+      length = 1U;
+
+      client_ids = (int64_t *) malloc (sizeof (int64_t));
+      if (client_ids)
+        *client_ids = strtoll (val, NULL, 10);
+
+      SAFE_FREE (val);
+    }
+
+    if (!client_ids) {
+      nns_edge_loge ("Failed to get client ID, cannot send edge data.");
+      goto done;
+    }
+
+    /** @todo update code for each connect type */
+    switch (eh->connect_type) {
+      case NNS_EDGE_CONNECT_TYPE_TCP:
+      case NNS_EDGE_CONNECT_TYPE_HYBRID:
+        for (i = 0; i < length; i++) {
+          conn_data = _nns_edge_get_connection (eh, client_ids[i]);
+          if (!conn_data) {
+            nns_edge_loge
+                ("Cannot find connection, invalid client ID or connection closed.");
+            goto done;
+          }
+
+          ret = _nns_edge_transfer_data (conn_data->sink_conn, data_h,
+              client_ids[i]);
+          if (ret != NNS_EDGE_ERROR_NONE) {
+            nns_edge_loge ("Failed to send edge data.");
+            goto done;
+          }
+        }
+        break;
+      default:
+        break;
+    }
+
+  done:
+    SAFE_FREE (client_ids);
+    nns_edge_data_destroy (data_h);
+  }
+
+  return NULL;
+}
+
+/**
+ * @brief Create thread to send data.
+ */
+static int
+_nns_edge_create_send_thread (nns_edge_handle_s * eh)
+{
+  pthread_attr_t attr;
+  int tid;
+
+  pthread_attr_init (&attr);
+  pthread_attr_setdetachstate (&attr, PTHREAD_CREATE_JOINABLE);
+
+  tid = pthread_create (&eh->send_thread, &attr, _nns_edge_send_thread, eh);
+  pthread_attr_destroy (&attr);
+
+  if (tid < 0) {
+    nns_edge_loge ("Failed to create sender thread.");
+    eh->send_thread = 0;
+    return NNS_EDGE_ERROR_IO;
+  }
+
+  return NNS_EDGE_ERROR_NONE;
+}
+
 /**
  * @brief Accept socket and create message thread in socket listener thread.
  */
@@ -1051,6 +1149,7 @@ nns_edge_create_handle (const char *id, nns_edge_connect_type_e connect_type,
   nns_edge_metadata_init (&eh->meta);
   eh->listening = false;
   eh->listener_fd = -1;
+  nns_edge_queue_create (&eh->send_queue);
 
   /* Connection data for each client ID. */
   eh->conn_table = g_hash_table_new_full (g_direct_hash, g_direct_equal, NULL,
@@ -1131,6 +1230,8 @@ nns_edge_start (nns_edge_h edge_h)
     goto error;
   }
 
+  ret = _nns_edge_create_send_thread (eh);
+
 error:
   nns_edge_unlock (eh);
   return ret;
@@ -1168,6 +1269,14 @@ nns_edge_release_handle (nns_edge_h edge_h)
   eh->event_cb = NULL;
   eh->user_data = NULL;
 
+  nns_edge_queue_destroy (eh->send_queue);
+  eh->send_queue = NULL;
+
+  if (eh->send_thread) {
+    pthread_join (eh->send_thread, NULL);
+    eh->send_thread = 0;
+  }
+
   if (eh->listener_thread) {
     if (eh->listening) {
       pthread_cancel (eh->listener_thread);
@@ -1367,10 +1476,7 @@ int
 nns_edge_send (nns_edge_h edge_h, nns_edge_data_h data_h)
 {
   nns_edge_handle_s *eh;
-  nns_edge_conn_data_s *conn_data;
-  int64_t client_id;
-  char *val;
-  int ret = NNS_EDGE_ERROR_NONE;
+  nns_edge_data_h new_data_h;
 
   eh = (nns_edge_handle_s *) edge_h;
   if (!eh) {
@@ -1383,15 +1489,6 @@ nns_edge_send (nns_edge_h edge_h, nns_edge_data_h data_h)
     return NNS_EDGE_ERROR_INVALID_PARAMETER;
   }
 
-  ret = nns_edge_data_get_info (data_h, "client_id", &val);
-  if (ret != NNS_EDGE_ERROR_NONE) {
-    nns_edge_loge ("Cannot find client ID in edge data.");
-    return NNS_EDGE_ERROR_INVALID_PARAMETER;
-  }
-
-  client_id = strtoll (val, NULL, 10);
-  SAFE_FREE (val);
-
   nns_edge_lock (eh);
 
   if (!NNS_EDGE_MAGIC_IS_VALID (eh)) {
@@ -1400,28 +1497,24 @@ nns_edge_send (nns_edge_h edge_h, nns_edge_data_h data_h)
     return NNS_EDGE_ERROR_INVALID_PARAMETER;
   }
 
-  /** @todo update code for each connect type */
-  switch (eh->connect_type) {
-    case NNS_EDGE_CONNECT_TYPE_TCP:
-    case NNS_EDGE_CONNECT_TYPE_HYBRID:
-      conn_data = _nns_edge_get_connection (eh, client_id);
-      if (!conn_data) {
-        nns_edge_loge
-            ("Cannot find connection, invalid client ID or connection closed.");
-        ret = NNS_EDGE_ERROR_INVALID_PARAMETER;
-        break;
-      }
+  if (!eh->send_thread) {
+    nns_edge_loge ("Invalid state, start edge before sending a data.");
+    nns_edge_unlock (eh);
+    return NNS_EDGE_ERROR_IO;
+  }
 
-      ret = _nns_edge_transfer_data (conn_data->sink_conn, data_h, client_id);
-      if (ret != NNS_EDGE_ERROR_NONE)
-        nns_edge_loge ("Failed to send edge data.");
-      break;
-    default:
-      break;
+  /* Create new data handle and push it into send-queue. */
+  nns_edge_data_copy (data_h, &new_data_h);
+
+  if (!nns_edge_queue_push (eh->send_queue, new_data_h,
+          (nns_edge_queue_data_destroy_cb) nns_edge_data_destroy)) {
+    nns_edge_loge ("Failed to send data, cannot push data into queue.");
+    nns_edge_unlock (eh);
+    return NNS_EDGE_ERROR_IO;
   }
 
   nns_edge_unlock (eh);
-  return ret;
+  return NNS_EDGE_ERROR_NONE;
 }
 
 /**
index 698b58c73f6b1ed502ebb8e9b34a1fff0fba76a3..7a8d4ccb054074c37d2fd0dcb1ca2bf079d5859c 100644 (file)
@@ -20,6 +20,7 @@ extern "C" {
 
 #include "nnstreamer-edge.h"
 #include "nnstreamer-edge-common.h"
+#include "nnstreamer-edge-queue.h"
 
 /**
  * @brief Data structure for edge handle.
@@ -51,6 +52,10 @@ typedef struct {
   int listener_fd;
   pthread_t listener_thread;
 
+  /* thread and queue to send data */
+  nns_edge_queue_h send_queue;
+  pthread_t send_thread;
+
   /* MQTT */
   nns_edge_broker_h broker_h;
 } nns_edge_handle_s;