[Custom] Implement custom connection
authorgichan2-jang <gichan2.jang@samsung.com>
Fri, 16 Aug 2024 01:08:10 +0000 (10:08 +0900)
committerjaeyun-jung <39614140+jaeyun-jung@users.noreply.github.com>
Wed, 21 Aug 2024 05:32:25 +0000 (14:32 +0900)
Implement custom connection
@todo: Add unit test case

Signed-off-by: gichan2-jang <gichan2.jang@samsung.com>
include/nnstreamer-edge-custom.h
include/nnstreamer-edge.h
src/libnnstreamer-edge/nnstreamer-edge-internal.c

index 09d6c2fa0e461eb18bc1d3a707efd4f21d3725d6..5c9959f347870eab8bc6d786547a6de6d85db7f1 100644 (file)
@@ -32,7 +32,7 @@ typedef struct _NnsEdgeCustomDef
   int (*nns_edge_custom_set_event_cb) (void *priv, nns_edge_event_cb cb, void *user_data);
   int (*nns_edge_custom_send_data) (void *priv, nns_edge_data_h data_h);
   int (*nns_edge_custom_set_option) (void *priv, const char *key, const char *value);
-  const char *(*nns_edge_custom_get_option) (void *priv, const char *key);
+  char *(*nns_edge_custom_get_option) (void *priv, const char *key);
 } NnsEdgeCustomDef;
 
 void* nns_edge_custom_get_handle ();
index d546df4f473707c68652b3f464d916555da61c16..605c31030bf7bd35d61811be6a6665a17458ba07 100644 (file)
@@ -61,7 +61,6 @@ typedef enum {
   NNS_EDGE_CONNECT_TYPE_MQTT,
   NNS_EDGE_CONNECT_TYPE_HYBRID,
   NNS_EDGE_CONNECT_TYPE_AITT,
-
   NNS_EDGE_CONNECT_TYPE_CUSTOM,
 
   NNS_EDGE_CONNECT_TYPE_UNKNOWN
@@ -179,6 +178,11 @@ typedef void (*nns_edge_data_destroy_cb) (void *data);
  */
 int nns_edge_create_handle (const char *id, nns_edge_connect_type_e connect_type, nns_edge_node_type_e node_type, nns_edge_h *edge_h);
 
+/**
+ * @brief Create edge custom handle.
+ */
+int nns_edge_custom_create_handle (const char *id, const char *lib_path, nns_edge_node_type_e node_type, nns_edge_h *edge_h);
+
 /**
  * @brief Start the nnstreamer edge. After the start, the edge can accept a new connection or request a connection.
  * @param[in] edge_h The edge handle.
@@ -191,6 +195,18 @@ int nns_edge_create_handle (const char *id, nns_edge_connect_type_e connect_type
  */
 int nns_edge_start (nns_edge_h edge_h);
 
+/**
+ * @brief Stop the nnstreamer edges.
+ * @param[in] edge_h The edge handle.
+ * @return 0 on success. Otherwise a negative error value.
+ * @retval #NNS_EDGE_ERROR_NONE Successful.
+ * @retval #NNS_EDGE_ERROR_NOT_SUPPORTED Not supported.
+ * @retval #NNS_EDGE_ERROR_OUT_OF_MEMORY Failed to allocate required memory.
+ * @retval #NNS_EDGE_ERROR_INVALID_PARAMETER Given parameter is invalid.
+ * @retval #NNS_EDGE_ERROR_CONNECTION_FAILURE Failed to get socket address.
+ */
+int nns_edge_stop (nns_edge_h edge_h);
+
 /**
  * @brief Release the given edge handle. All the connections are disconnected.
  * @param[in] edge_h The edge handle.
index bf79851c4e45a2f17b6418b972d3a8f83b03a9cd..f9fdfb038c534af82ec960854ee1c12f5a33e0dc 100644 (file)
@@ -13,6 +13,7 @@
 #include <arpa/inet.h>
 #include <netdb.h>
 #include <poll.h>
+#include <dlfcn.h>
 
 #include "nnstreamer-edge-data.h"
 #include "nnstreamer-edge-event.h"
@@ -73,6 +74,7 @@ typedef struct
 
   /* MQTT or AITT handle */
   void *broker_h;
+  void *custom_h;
 } nns_edge_handle_s;
 
 /**
@@ -920,6 +922,14 @@ _nns_edge_send_thread (void *thread_data)
         if (NNS_EDGE_ERROR_NONE != ret)
           nns_edge_loge ("Failed to send data via MQTT connection.");
         break;
+      case NNS_EDGE_CONNECT_TYPE_CUSTOM:
+      {
+        NnsEdgeCustomDef *custom_h = (NnsEdgeCustomDef *) eh->custom_h;
+        ret = custom_h->nns_edge_custom_send_data (eh->broker_h, data_h);
+        if (NNS_EDGE_ERROR_NONE != ret)
+          nns_edge_loge ("Failed to send data via MCF connection.");
+        break;
+      }
       default:
         break;
     }
@@ -1255,34 +1265,15 @@ error:
 }
 
 /**
- * @brief Create edge handle.
+ * @brief Internal function to create edge handle.
  */
-int
-nns_edge_create_handle (const char *id, nns_edge_connect_type_e connect_type,
-    nns_edge_node_type_e node_type, nns_edge_h * edge_h)
+static int
+_nns_edge_create_handle (const char *id, nns_edge_node_type_e node_type,
+    nns_edge_h * edge_h)
 {
   int ret = NNS_EDGE_ERROR_NONE;
   nns_edge_handle_s *eh;
 
-  if (connect_type < 0 || connect_type >= NNS_EDGE_CONNECT_TYPE_UNKNOWN) {
-    nns_edge_loge ("Invalid param, set valid connect type.");
-    return NNS_EDGE_ERROR_INVALID_PARAMETER;
-  }
-
-  /**
-   * @todo handle flag (receive | send)
-   * e.g., send only case: listener is unnecessary.
-   */
-  if (node_type < 0 || node_type >= NNS_EDGE_NODE_TYPE_UNKNOWN) {
-    nns_edge_loge ("Invalid param, set exact node type.");
-    return NNS_EDGE_ERROR_INVALID_PARAMETER;
-  }
-
-  if (!edge_h) {
-    nns_edge_loge ("Invalid param, edge_h should not be null.");
-    return NNS_EDGE_ERROR_INVALID_PARAMETER;
-  }
-
   eh = (nns_edge_handle_s *) calloc (1, sizeof (nns_edge_handle_s));
   if (!eh) {
     nns_edge_loge ("Failed to allocate memory for edge handle.");
@@ -1294,7 +1285,6 @@ nns_edge_create_handle (const char *id, nns_edge_connect_type_e connect_type,
   nns_edge_handle_set_magic (eh, NNS_EDGE_MAGIC);
   eh->id = STR_IS_VALID (id) ? nns_edge_strdup (id) :
       nns_edge_strdup_printf ("%lld", (long long) nns_edge_generate_id ());
-  eh->connect_type = connect_type;
   eh->host = nns_edge_strdup ("localhost");
   eh->port = 0;
   eh->dest_host = nns_edge_strdup ("localhost");
@@ -1307,6 +1297,7 @@ nns_edge_create_handle (const char *id, nns_edge_connect_type_e connect_type,
   eh->sending = false;
   eh->listener_fd = -1;
   eh->caps_str = nns_edge_strdup ("");
+  eh->custom_h = NULL;
 
   ret = nns_edge_metadata_create (&eh->metadata);
   if (ret != NNS_EDGE_ERROR_NONE) {
@@ -1320,6 +1311,111 @@ nns_edge_create_handle (const char *id, nns_edge_connect_type_e connect_type,
     goto error;
   }
 
+  *edge_h = eh;
+  return NNS_EDGE_ERROR_NONE;
+
+error:
+  nns_edge_release_handle (eh);
+  return ret;
+}
+
+/**
+ * @brief Create edge custom handle.
+ */
+int
+nns_edge_custom_create_handle (const char *id, const char *lib_path,
+    nns_edge_node_type_e node_type, nns_edge_h * edge_h)
+{
+  int ret = NNS_EDGE_ERROR_NONE;
+  nns_edge_handle_s *eh;
+  void *custom_handle;
+  NnsEdgeCustomDef *custom_h;
+
+  if (node_type < 0 || node_type >= NNS_EDGE_NODE_TYPE_UNKNOWN) {
+    nns_edge_loge ("Invalid param, set exact node type.");
+    return NNS_EDGE_ERROR_INVALID_PARAMETER;
+  }
+
+  if (!STR_IS_VALID (lib_path)) {
+    nns_edge_loge ("Invalid param, given custom lib path is invalid.");
+    return NNS_EDGE_ERROR_INVALID_PARAMETER;
+  }
+
+  if (!edge_h) {
+    nns_edge_loge ("Invalid param, edge_h should not be null.");
+    return NNS_EDGE_ERROR_INVALID_PARAMETER;
+  }
+
+  ret = _nns_edge_create_handle (id, node_type, edge_h);
+  if (ret != NNS_EDGE_ERROR_NONE) {
+    nns_edge_loge ("Failed to create edge handle.");
+    return ret;
+  }
+  eh = (nns_edge_handle_s *) * edge_h;
+  eh->connect_type = NNS_EDGE_CONNECT_TYPE_CUSTOM;
+
+  custom_handle = dlopen (lib_path, RTLD_LAZY);
+  if (custom_handle) {
+    void *(*getCustomHandle) () =
+        (void *(*)()) dlsym (custom_handle, "nns_edge_custom_get_handle");
+    if (!getCustomHandle) {
+      nns_edge_loge ("Failed to find nns_edge_custom_get_handle: %s",
+          dlerror ());
+      return NNS_EDGE_ERROR_UNKNOWN;
+    }
+    eh->custom_h = getCustomHandle ();
+  } else {
+    nns_edge_loge ("Failed to open custom handle: %s]\n", dlerror ());
+    return NNS_EDGE_ERROR_INVALID_PARAMETER;;
+  }
+
+  custom_h = (NnsEdgeCustomDef *) eh->custom_h;
+  ret = custom_h->nns_edge_custom_create (&eh->broker_h);
+  if (NNS_EDGE_ERROR_NONE != ret) {
+    nns_edge_loge ("Failed to create custom connection handle.");
+  }
+
+  return ret;
+}
+
+/**
+ * @brief Create edge handle.
+ */
+int
+nns_edge_create_handle (const char *id, nns_edge_connect_type_e connect_type,
+    nns_edge_node_type_e node_type, nns_edge_h * edge_h)
+{
+  int ret = NNS_EDGE_ERROR_NONE;
+  nns_edge_handle_s *eh;
+
+  if (connect_type < 0 || connect_type >= NNS_EDGE_CONNECT_TYPE_UNKNOWN) {
+    nns_edge_loge ("Invalid param, set valid connect type.");
+    return NNS_EDGE_ERROR_INVALID_PARAMETER;
+  }
+
+  /**
+   * @todo handle flag (receive | send)
+   * e.g., send only case: listener is unnecessary.
+   */
+  if (node_type < 0 || node_type >= NNS_EDGE_NODE_TYPE_UNKNOWN) {
+    nns_edge_loge ("Invalid param, set exact node type.");
+    return NNS_EDGE_ERROR_INVALID_PARAMETER;
+  }
+
+  if (!edge_h) {
+    nns_edge_loge ("Invalid param, edge_h should not be null.");
+    return NNS_EDGE_ERROR_INVALID_PARAMETER;
+  }
+
+  ret = _nns_edge_create_handle (id, node_type, edge_h);
+  if (ret != NNS_EDGE_ERROR_NONE) {
+    nns_edge_loge ("Failed to create edge handle.");
+    return ret;
+  }
+  eh = (nns_edge_handle_s *) * edge_h;
+
+  eh->connect_type = connect_type;
+
   if (NNS_EDGE_CONNECT_TYPE_AITT == connect_type) {
     ret = nns_edge_aitt_create (&eh->broker_h);
     if (NNS_EDGE_ERROR_NONE != ret) {
@@ -1328,7 +1424,6 @@ nns_edge_create_handle (const char *id, nns_edge_connect_type_e connect_type,
     }
   }
 
-  *edge_h = eh;
   return NNS_EDGE_ERROR_NONE;
 
 error:
@@ -1358,6 +1453,22 @@ nns_edge_start (nns_edge_h edge_h)
 
   nns_edge_lock (eh);
 
+  if (NNS_EDGE_CONNECT_TYPE_CUSTOM == eh->connect_type) {
+    NnsEdgeCustomDef *custom_h = (NnsEdgeCustomDef *) eh->custom_h;
+    ret = custom_h->nns_edge_custom_start (eh->broker_h);
+    if (NNS_EDGE_ERROR_NONE != ret) {
+      nns_edge_loge ("Failed to start edge custom connection");
+    }
+    ret = custom_h->nns_edge_custom_set_event_cb (eh->broker_h, eh->event_cb,
+        eh->user_data);
+    if (NNS_EDGE_ERROR_NONE != ret) {
+      nns_edge_loge ("Failed to set event callback to custom connection.");
+      goto done;
+    }
+    ret = _nns_edge_create_send_thread (eh);
+    goto done;
+  }
+
   if (eh->port <= 0) {
     eh->port = nns_edge_get_available_port ();
     if (eh->port <= 0) {
@@ -1445,6 +1556,41 @@ done:
   return ret;
 }
 
+/**
+ * @brief Stop the nnstreamer edge.
+ */
+int
+nns_edge_stop (nns_edge_h edge_h)
+{
+  nns_edge_handle_s *eh;
+  int ret = NNS_EDGE_ERROR_NONE;
+
+  eh = (nns_edge_handle_s *) edge_h;
+  if (!eh) {
+    nns_edge_loge ("Invalid param, given edge handle is null.");
+    return NNS_EDGE_ERROR_INVALID_PARAMETER;
+  }
+
+  if (!nns_edge_handle_is_valid (eh)) {
+    nns_edge_loge ("Invalid param, given edge handle is invalid.");
+    return NNS_EDGE_ERROR_INVALID_PARAMETER;
+  }
+
+  nns_edge_lock (eh);
+
+  if (NNS_EDGE_CONNECT_TYPE_CUSTOM == eh->connect_type) {
+    NnsEdgeCustomDef *custom_h = (NnsEdgeCustomDef *) eh->custom_h;
+    ret = custom_h->nns_edge_custom_stop (eh->broker_h);
+    if (NNS_EDGE_ERROR_NONE != ret) {
+      nns_edge_loge ("Failed to stop MCF");
+    }
+  }
+
+  eh->is_started = FALSE;
+  nns_edge_unlock (eh);
+  return ret;
+}
+
 /**
  * @brief Release the given handle.
  */
@@ -1464,6 +1610,9 @@ nns_edge_release_handle (nns_edge_h edge_h)
     return NNS_EDGE_ERROR_INVALID_PARAMETER;
   }
 
+  if (eh->is_started)
+    nns_edge_stop (eh);
+
   nns_edge_lock (eh);
 
   switch (eh->connect_type) {
@@ -1478,6 +1627,16 @@ nns_edge_release_handle (nns_edge_h edge_h)
         nns_edge_logw ("Failed to close AITT connection.");
       }
       break;
+    case NNS_EDGE_CONNECT_TYPE_CUSTOM:
+    {
+      NnsEdgeCustomDef *custom_h = (NnsEdgeCustomDef *) eh->custom_h;
+      int ret = custom_h->nns_edge_custom_close (eh->broker_h);
+      if (NNS_EDGE_ERROR_NONE != ret) {
+        nns_edge_loge ("Failed to stop MCF");
+      }
+      dlclose (eh->custom_h);
+      break;
+    }
     default:
       break;
   }
@@ -1487,7 +1646,6 @@ nns_edge_release_handle (nns_edge_h edge_h)
   eh->event_cb = NULL;
   eh->user_data = NULL;
   eh->broker_h = NULL;
-  eh->is_started = false;
 
   nns_edge_queue_clear (eh->send_queue);
   if (eh->send_thread) {
@@ -1720,6 +1878,13 @@ nns_edge_connect (nns_edge_h edge_h, const char *dest_host, int dest_port)
       nns_edge_loge ("Failed to subscribe the topic using AITT: %s", eh->topic);
       goto done;
     }
+  } else if (NNS_EDGE_CONNECT_TYPE_CUSTOM == eh->connect_type) {
+    NnsEdgeCustomDef *custom_h = (NnsEdgeCustomDef *) eh->custom_h;
+    ret = custom_h->nns_edge_custom_connect (eh->broker_h);
+    if (ret != NNS_EDGE_ERROR_NONE) {
+      nns_edge_loge ("Failed to connect to custom connection.");
+      goto done;
+    }
   } else {
     ret = _nns_edge_connect_to (eh, eh->client_id, dest_host, dest_port);
     if (ret != NNS_EDGE_ERROR_NONE) {
@@ -1786,6 +1951,11 @@ nns_edge_is_connected (nns_edge_h edge_h)
       nns_edge_mqtt_is_connected (eh->broker_h))
     return NNS_EDGE_ERROR_NONE;
 
+  if (NNS_EDGE_CONNECT_TYPE_CUSTOM == eh->connect_type) {
+    NnsEdgeCustomDef *custom_h = (NnsEdgeCustomDef *) eh->custom_h;
+    return custom_h->nns_edge_custom_is_connected (eh->broker_h);
+  }
+
   conn_data = (nns_edge_conn_data_s *) eh->connections;
   while (conn_data) {
     conn = conn_data->sink_conn;
@@ -1956,6 +2126,11 @@ nns_edge_set_info (nns_edge_h edge_h, const char *key, const char *value)
     ret = nns_edge_metadata_set (eh->metadata, key, value);
   }
 
+  if (NNS_EDGE_CONNECT_TYPE_CUSTOM == eh->connect_type) {
+    NnsEdgeCustomDef *custom_h = (NnsEdgeCustomDef *) eh->custom_h;
+    ret = custom_h->nns_edge_custom_set_option (eh->broker_h, key, value);
+  }
+
   nns_edge_unlock (eh);
   return ret;
 }
@@ -2019,6 +2194,11 @@ nns_edge_get_info (nns_edge_h edge_h, const char *key, char **value)
     ret = nns_edge_metadata_get (eh->metadata, key, value);
   }
 
+  if (NNS_EDGE_CONNECT_TYPE_CUSTOM == eh->connect_type) {
+    NnsEdgeCustomDef *custom_h = (NnsEdgeCustomDef *) eh->custom_h;
+    *value = custom_h->nns_edge_custom_get_option (eh->broker_h, key);
+  }
+
   nns_edge_unlock (eh);
   return ret;
 }