[ml-service] Change node type
authorgichan2-jang <gichan2.jang@samsung.com>
Wed, 13 Mar 2024 05:19:42 +0000 (14:19 +0900)
committerjaeyun-jung <39614140+jaeyun-jung@users.noreply.github.com>
Thu, 4 Apr 2024 07:01:26 +0000 (16:01 +0900)
Change ml-service node type from pub/sub to query server/client to send
reply.

Signed-off-by: gichan2-jang <gichan2.jang@samsung.com>
c/src/ml-api-service-offloading.c
c/src/ml-api-service-offloading.h
c/src/ml-api-service-private.h
c/src/ml-api-service.c
tests/capi/unittest_capi_service_offloading.cc
tests/test_models/config/service_offloading_receiver.conf
tests/test_models/config/service_offloading_sender.conf

index 88d063f..f53997a 100644 (file)
@@ -38,6 +38,7 @@ typedef enum
   ML_SERVICE_OFFLOADING_TYPE_MODEL_URI,
   ML_SERVICE_OFFLOADING_TYPE_PIPELINE_RAW,
   ML_SERVICE_OFFLOADING_TYPE_PIPELINE_URI,
+  ML_SERVICE_OFFLOADING_TYPE_REPLY,
 
   ML_SERVICE_OFFLOADING_TYPE_MAX
 } ml_service_offloading_type_e;
@@ -82,9 +83,9 @@ _mlrs_get_node_type (const gchar * value)
     return node_type;
 
   if (g_ascii_strcasecmp (value, "sender") == 0) {
-    node_type = NNS_EDGE_NODE_TYPE_PUB;
+    node_type = NNS_EDGE_NODE_TYPE_QUERY_CLIENT;
   } else if (g_ascii_strcasecmp (value, "receiver") == 0) {
-    node_type = NNS_EDGE_NODE_TYPE_SUB;
+    node_type = NNS_EDGE_NODE_TYPE_QUERY_SERVER;
   } else {
     _ml_error_report ("Invalid node type: %s, Please check ml_option.", value);
   }
@@ -204,6 +205,8 @@ _mlrs_get_service_type (gchar * service_str)
     service_type = ML_SERVICE_OFFLOADING_TYPE_PIPELINE_RAW;
   } else if (g_ascii_strcasecmp (service_str, "pipeline_uri") == 0) {
     service_type = ML_SERVICE_OFFLOADING_TYPE_PIPELINE_URI;
+  } else if (g_ascii_strcasecmp (service_str, "reply") == 0) {
+    service_type = ML_SERVICE_OFFLOADING_TYPE_REPLY;
   } else {
     _ml_error_report ("Invalid service type: %s, Please check service type.",
         service_str);
@@ -365,6 +368,7 @@ _mlrs_process_service_offloading (nns_edge_data_h data_h, void *user_data)
   _ml_service_offloading_s *offloading_s =
       (_ml_service_offloading_s *) mls->priv;
   ml_service_event_e event_type = ML_SERVICE_EVENT_UNKNOWN;
+  ml_information_h info_h = NULL;
 
   ret = nns_edge_data_get (data_h, 0, &data, &data_len);
   if (NNS_EDGE_ERROR_NONE != ret) {
@@ -455,6 +459,20 @@ _mlrs_process_service_offloading (nns_edge_data_h data_h, void *user_data)
         event_type = ML_SERVICE_EVENT_PIPELINE_REGISTERED;
       }
       break;
+    case ML_SERVICE_OFFLOADING_TYPE_REPLY:
+    {
+      ret = _ml_information_create (&info_h);
+      if (ML_ERROR_NONE != ret) {
+        _ml_error_report_return (ret, "Failed to create information handle. ");
+      }
+      ret = _ml_information_set (info_h, "data", (void *) data, NULL);
+      if (ML_ERROR_NONE != ret) {
+        _ml_error_report ("Failed to set data information.");
+        goto done;
+      }
+      event_type = ML_SERVICE_EVENT_REPLY;
+      break;
+    }
     default:
       _ml_error_report ("Unknown service type or not supported yet. "
           "Service num: %d", service_type);
@@ -463,10 +481,16 @@ _mlrs_process_service_offloading (nns_edge_data_h data_h, void *user_data)
 
   if (mls && event_type != ML_SERVICE_EVENT_UNKNOWN) {
     if (mls->cb_info.cb) {
-      mls->cb_info.cb (event_type, NULL, mls->cb_info.pdata);
+      mls->cb_info.cb (event_type, info_h, mls->cb_info.pdata);
     }
   }
 
+done:
+  if (info_h) {
+    ret = ml_information_destroy (info_h);
+    _ml_error_report ("Failed to destroy service info handle.");
+  }
+
   return ret;
 }
 
@@ -485,7 +509,8 @@ _mlrs_edge_event_cb (nns_edge_event_h event_h, void *user_data)
     return ret;
 
   switch (event) {
-    case NNS_EDGE_EVENT_NEW_DATA_RECEIVED:{
+    case NNS_EDGE_EVENT_NEW_DATA_RECEIVED:
+    {
       ret = nns_edge_event_parse_new_data (event_h, &data_h);
       if (NNS_EDGE_ERROR_NONE != ret)
         return ret;
@@ -538,7 +563,7 @@ _mlrs_create_edge_handle (ml_service_s * mls, edge_info_s * edge_info)
     return ret;
   }
 
-  if (edge_info->node_type == NNS_EDGE_NODE_TYPE_SUB) {
+  if (edge_info->node_type == NNS_EDGE_NODE_TYPE_QUERY_CLIENT) {
     ret = nns_edge_connect (edge_h, edge_info->dest_host, edge_info->dest_port);
 
     if (NNS_EDGE_ERROR_NONE != ret) {
@@ -581,10 +606,11 @@ ml_service_offloading_release_internal (ml_service_s * mls)
 }
 
 /**
- * @brief Set value in ml-service remote handle.
+ * @brief Set value in ml-service offloading handle.
  */
 int
-ml_service_remote_set_information (ml_service_h handle, const gchar * name, const gchar * value)
+ml_service_offloading_set_information (ml_service_h handle, const gchar * name,
+    const gchar * value)
 {
   ml_service_s *mls = (ml_service_s *) handle;
   _ml_service_offloading_s *mlrs = (_ml_service_offloading_s *) mls->priv;
@@ -646,7 +672,7 @@ ml_service_offloading_create (ml_service_h handle, ml_option_h option)
   }
 
   if (ML_ERROR_NONE == ml_option_get (option, "path", (void **) (&_path))) {
-    ret = ml_service_remote_set_information (mls, "path", _path);
+    ret = ml_service_offloading_set_information (mls, "path", _path);
     if (ML_ERROR_NONE != ret) {
       _ml_error_report_return (ret,
           "Failed to set path in ml-service offloading handle.");
index 300d1c8..3b2ca13 100644 (file)
@@ -72,7 +72,7 @@ int ml_service_offloading_set_service (ml_service_h handle, const char *name, co
  * @retval #ML_ERROR_NOT_SUPPORTED Not supported.
  * @retval #ML_ERROR_INVALID_PARAMETER Given parameter is invalid.
  */
-int ml_service_remote_set_information (ml_service_h handle, const char *name, const char *value);
+int ml_service_offloading_set_information (ml_service_h handle, const char *name, const char *value);
 
 #ifdef __cplusplus
 }
index a7d6a52..8220c85 100644 (file)
@@ -31,6 +31,7 @@ extern "C" {
  */
 #define ML_SERVICE_EVENT_MODEL_REGISTERED 2
 #define ML_SERVICE_EVENT_PIPELINE_REGISTERED 3
+#define ML_SERVICE_EVENT_REPLY 4
 
 /**
  * @brief Enumeration for ml-service type.
index 275eeea..4fef6fe 100644 (file)
@@ -70,7 +70,7 @@ _ml_service_set_information_internal (ml_service_s * mls, const char *name,
       break;
     case ML_SERVICE_TYPE_OFFLOADING:
     {
-      status = ml_service_remote_set_information (mls, name, value);
+      status = ml_service_offloading_set_information (mls, name, value);
       break;
     }
     default:
index 804d2ff..1afb92b 100644 (file)
 #include "ml-api-service-offloading.h"
 
 /**
+ * @brief Structure for ml-service event callback.
+ */
+typedef struct {
+  ml_service_h handle;
+  void *data;
+  gboolean received_reply;
+} _ml_service_test_data_s;
+
+/**
  * @brief Internal function to get the config file path.
  */
 static gchar *
@@ -48,6 +57,7 @@ class MLOffloadingService : public ::testing::Test
   int status;
   ml_service_h client_h;
   ml_service_h server_h;
+  _ml_service_test_data_s test_data;
 
   public:
   /**
@@ -65,14 +75,15 @@ class MLOffloadingService : public ::testing::Test
 
     g_test_dbus_up (dbus);
 
-    g_autofree gchar *sender_config = _get_config_path ("service_offloading_sender.conf");
-    status = ml_service_new (sender_config, &client_h);
-    ASSERT_EQ (status, ML_ERROR_NONE);
-
     g_autofree gchar *receiver_config
         = _get_config_path ("service_offloading_receiver.conf");
     status = ml_service_new (receiver_config, &server_h);
     ASSERT_EQ (status, ML_ERROR_NONE);
+    test_data.handle = server_h;
+
+    g_autofree gchar *sender_config = _get_config_path ("service_offloading_sender.conf");
+    status = ml_service_new (sender_config, &client_h);
+    ASSERT_EQ (status, ML_ERROR_NONE);
   }
 
   /**
@@ -129,6 +140,7 @@ static void
 _ml_service_event_cb (ml_service_event_e event, ml_information_h event_data, void *user_data)
 {
   int status;
+  _ml_service_test_data_s *test_data = (_ml_service_test_data_s *) user_data;
 
   /** @todo remove typecast to int after new event type is added. */
   switch ((int) event) {
@@ -138,7 +150,7 @@ _ml_service_event_cb (ml_service_event_e event, ml_information_h event_data, voi
         const gchar *service_key = "pipeline_registration_test_key";
         status = ml_service_pipeline_get (service_key, &ret_pipeline);
         EXPECT_EQ (ML_ERROR_NONE, status);
-        EXPECT_STREQ ((gchar *) user_data, ret_pipeline);
+        EXPECT_STREQ ((gchar *) test_data->data, ret_pipeline);
         break;
       }
     case ML_SERVICE_EVENT_MODEL_REGISTERED:
@@ -159,7 +171,8 @@ _ml_service_event_cb (ml_service_event_e event, ml_information_h event_data, voi
         gsize activated_model_len = 0;
         EXPECT_TRUE (g_file_get_contents (activated_model_path,
             &activated_model_contents, &activated_model_len, NULL));
-        EXPECT_EQ (memcmp ((gchar *) user_data, activated_model_contents, activated_model_len), 0);
+        EXPECT_EQ (memcmp ((gchar *) test_data->data, activated_model_contents, activated_model_len),
+            0);
 
         status = g_remove (activated_model_path);
         EXPECT_TRUE (status == 0);
@@ -182,7 +195,8 @@ TEST_F (MLOffloadingService, registerPipeline)
   ml_tensor_dimension in_dim = { 0 };
 
   gchar *pipeline_desc = g_strdup ("fakesrc ! fakesink");
-  status = ml_service_set_event_cb (server_h, _ml_service_event_cb, pipeline_desc);
+  test_data.data = pipeline_desc;
+  status = ml_service_set_event_cb (server_h, _ml_service_event_cb, &test_data);
   EXPECT_EQ (status, ML_ERROR_NONE);
 
   status = ml_tensors_info_create (&in_info);
@@ -224,7 +238,8 @@ TEST_F (MLOffloadingService, registerPipelineURI)
   ml_tensor_dimension in_dim = { 0 };
 
   g_autofree gchar *pipeline_desc = g_strdup ("fakesrc ! fakesink");
-  status = ml_service_set_event_cb (server_h, _ml_service_event_cb, pipeline_desc);
+  test_data.data = pipeline_desc;
+  status = ml_service_set_event_cb (server_h, _ml_service_event_cb, &test_data);
   EXPECT_EQ (status, ML_ERROR_NONE);
 
   gchar *current_dir = g_get_current_dir ();
@@ -345,7 +360,8 @@ TEST_F (MLOffloadingService, registerModel)
   gsize len = 0;
   EXPECT_TRUE (g_file_get_contents (test_model, &contents, &len, NULL));
 
-  status = ml_service_set_event_cb (server_h, _ml_service_event_cb, contents);
+  test_data.data = contents;
+  status = ml_service_set_event_cb (server_h, _ml_service_event_cb, &test_data);
   EXPECT_EQ (status, ML_ERROR_NONE);
 
   status = ml_tensors_info_create (&in_info);
@@ -395,7 +411,8 @@ TEST_F (MLOffloadingService, registerModelURI)
   gsize len = 0;
   EXPECT_TRUE (g_file_get_contents (test_model_path, &contents, &len, NULL));
 
-  status = ml_service_set_event_cb (server_h, _ml_service_event_cb, contents);
+  test_data.data = contents;
+  status = ml_service_set_event_cb (server_h, _ml_service_event_cb, &test_data);
   EXPECT_EQ (status, ML_ERROR_NONE);
 
   g_autofree gchar *model_uri = g_strdup_printf ("file://%s", test_model_path);
@@ -455,7 +472,8 @@ TEST_F (MLOffloadingService, registerModelPath)
   gsize len = 0;
   EXPECT_TRUE (g_file_get_contents (test_model, &contents, &len, NULL));
 
-  status = ml_service_set_event_cb (server_h, _ml_service_event_cb, contents);
+  test_data.data = contents;
+  status = ml_service_set_event_cb (server_h, _ml_service_event_cb, &test_data);
   EXPECT_EQ (status, ML_ERROR_NONE);
 
   status = ml_tensors_info_create (&in_info);
@@ -518,6 +536,93 @@ TEST_F (MLOffloadingService, requestInvalidParam_n)
   EXPECT_EQ (ML_ERROR_NONE, status);
 }
 
+
+/**
+ * @brief Callback function for reply test.
+ */
+static void
+_ml_service_reply_test_cb (ml_service_event_e event, ml_information_h event_data, void *user_data)
+{
+  int status;
+
+  switch ((int) event) {
+    case ML_SERVICE_EVENT_PIPELINE_REGISTERED:
+      {
+        _ml_service_test_data_s *test_data = (_ml_service_test_data_s *) user_data;
+        g_autofree gchar *ret_pipeline = NULL;
+        void *_data;
+        size_t _size;
+        const gchar *service_key = "pipeline_registration_test_key";
+        status = ml_service_pipeline_get (service_key, &ret_pipeline);
+        EXPECT_EQ (ML_ERROR_NONE, status);
+        status = ml_tensors_data_get_tensor_data (test_data->data, 0, &_data, &_size);
+        EXPECT_EQ (ML_ERROR_NONE, status);
+        EXPECT_STREQ ((gchar *) _data, ret_pipeline);
+
+        ml_service_request (test_data->handle, "reply_to_client", test_data->data);
+        break;
+      }
+    case ML_SERVICE_EVENT_REPLY:
+      {
+        gint *received = (gint *) user_data;
+        (*received)++;
+        break;
+      }
+    default:
+      break;
+  }
+}
+
+/**
+ * @brief use case of replying to client.
+ */
+TEST_F (MLOffloadingService, replyToClient)
+{
+  ml_tensors_data_h input = NULL;
+  ml_tensors_info_h in_info = NULL;
+  ml_tensor_dimension in_dim = { 0 };
+  gint received = 0;
+
+  gchar *pipeline_desc = g_strdup ("fakesrc ! fakesink");
+
+  status = ml_tensors_info_create (&in_info);
+  EXPECT_EQ (ML_ERROR_NONE, status);
+  ml_tensors_info_set_count (in_info, 1);
+  ml_tensors_info_set_tensor_type (in_info, 0, ML_TENSOR_TYPE_UINT8);
+  in_dim[0] = strlen (pipeline_desc) + 1;
+  ml_tensors_info_set_tensor_dimension (in_info, 0, in_dim);
+  status = ml_tensors_data_create (in_info, &input);
+  EXPECT_EQ (ML_ERROR_NONE, status);
+  status = ml_tensors_data_set_tensor_data (
+      input, 0, pipeline_desc, strlen (pipeline_desc) + 1);
+  EXPECT_EQ (ML_ERROR_NONE, status);
+
+  test_data.data = input;
+  status = ml_service_set_event_cb (server_h, _ml_service_reply_test_cb, &test_data);
+  EXPECT_EQ (status, ML_ERROR_NONE);
+
+  status = ml_service_set_event_cb (client_h, _ml_service_reply_test_cb, &received);
+  EXPECT_EQ (status, ML_ERROR_NONE);
+
+  status = ml_service_request (client_h, "pipeline_registration_raw", input);
+  EXPECT_EQ (ML_ERROR_NONE, status);
+
+  /* Wait for the server to register and check the result. */
+  g_usleep (1000000);
+
+  EXPECT_GT (received, 0);
+
+  status = ml_service_pipeline_delete ("pipeline_registration_test_key");
+  EXPECT_TRUE (status == ML_ERROR_NONE);
+
+  status = ml_tensors_info_destroy (in_info);
+  EXPECT_EQ (ML_ERROR_NONE, status);
+  status = ml_tensors_data_destroy (input);
+  EXPECT_EQ (ML_ERROR_NONE, status);
+
+  g_free (pipeline_desc);
+}
+
 /**
  * @brief Main gtest
  */
index 5303487..1f34497 100644 (file)
@@ -2,9 +2,17 @@
     "offloading" :
     {
         "node-type" : "receiver",
-        "dest-host" : "127.0.0.1",
-        "dest-port" : "3001",
+        "host" : "127.0.0.1",
+        "port" : "3000",
         "connect-type" : "TCP",
         "topic" : "offloading_service_test_topic"
+    },
+    "services" :
+    {
+        "reply_to_client" :
+        {
+            "service-type" : "reply",
+            "service-key" : "reply_to_client"
+        }
     }
 }
index 337ae8e..1245451 100644 (file)
@@ -2,8 +2,8 @@
     "offloading" :
     {
         "node-type" : "sender",
-        "host" : "127.0.0.1",
-        "port" : "3001",
+        "dest-host" : "127.0.0.1",
+        "dest-port" : "3000",
         "connect-type" : "TCP",
         "topic" : "offloading_service_test_topic"
     },