Use thread default context for local execution mode 72/314372/7
authorHwankyu Jhun <h.jhun@samsung.com>
Thu, 11 Jul 2024 02:17:40 +0000 (11:17 +0900)
committerHwankyu Jhun <h.jhun@samsung.com>
Thu, 11 Jul 2024 05:51:46 +0000 (14:51 +0900)
To invoke the event callback properly, the generated code should use
the thread default context instead of the default context.

Change-Id: Ie692be7e320c2e4545f56a7307a08d99c91d0db1
Signed-off-by: Hwankyu Jhun <h.jhun@samsung.com>
idlc/gen/version2/c_proxy_body_generator_cb.hh
idlc/gen/version2/c_stub_body_generator_cb.hh
idlc/gen/version2/cpp_generator_base_cb.hh
idlc/gen/version2/cpp_proxy_body_generator_cb.hh
idlc/gen/version2/cpp_proxy_header_generator_cb.hh
idlc/gen/version2/cpp_stub_body_generator_cb.hh
idlc/gen/version2/cpp_stub_header_generator_cb.hh

index 40602f53bb9bed709234e45baea1930e47696a38..ed3a7c3b645d431d91ca564ec8b74784f5679a11 100644 (file)
@@ -1301,6 +1301,7 @@ typedef struct {
   GQueue *result_queue;
   GQueue *request_queue;
   GRecMutex mutex;
+  GMainContext *thread_context;
 } rpc_port_proxy_lem_s;
 
 typedef rpc_port_proxy_lem_s *rpc_port_proxy_lem_h;
@@ -1364,6 +1365,9 @@ static void rpc_port_proxy_lem_destroy(rpc_port_proxy_lem_h handle)
   if (handle->port_name)
     free(handle->port_name);
 
+  if (handle->thread_context)
+    g_main_context_unref(handle->thread_context);
+
   free(handle);
 }
 
@@ -1417,6 +1421,8 @@ static rpc_port_proxy_lem_h rpc_port_proxy_lem_create(void *context, const char
     return nullptr;
   }
 
+  handle->thread_context = g_main_context_ref_thread_default();
+
   return handle;
 }
 
@@ -1667,27 +1673,50 @@ constexpr const char CB_LEM_PROXY_API[] =
 R"__c_cb(
 EXPORT_API int rpc_port_proxy_<INPUT_FILE>_lem_<IFACE_NAME>_connect(void *h, bool sync)
 {
+  rpc_port_proxy_lem_h handle = h;
+  GSource* source;
+
   if (h == nullptr) {
     _E("Invalid parameter");
     return RPC_PORT_ERROR_INVALID_PARAMETER;
   }
 
-  if (sync)
+  if (sync) {
     rpc_port_proxy_lem_on_connected(h);
-  else
-    g_idle_add(rpc_port_proxy_lem_on_connected, h);
+    return RPC_PORT_ERROR_NONE;
+  }
+
+  source = g_idle_source_new();
+  if (source == nullptr) {
+    _E("Failed to create idle source");
+    return RPC_PORT_ERROR_OUT_OF_MEMORY;
+  }
 
+  g_source_set_callback(source, (GSourceFunc)rpc_port_proxy_lem_on_connected, h, nullptr);
+  g_source_attach(source, handle->thread_context);
+  g_source_unref(source);
   return RPC_PORT_ERROR_NONE;
 }
 
 EXPORT_API int rpc_port_proxy_<INPUT_FILE>_lem_<IFACE_NAME>_disconnect(void* h)
 {
+  rpc_port_proxy_lem_h handle = h;
+  GSource* source;
+
   if (h == nullptr) {
     _E("Invalid parameter");
     return RPC_PORT_ERROR_INVALID_PARAMETER;
   }
 
-  g_idle_add(rpc_port_proxy_lem_on_disconnected, h);
+  source = g_idle_source_new();
+  if (source == nullptr) {
+    _E("Failed to create idle source");
+    return RPC_PORT_ERROR_OUT_OF_MEMORY;
+  }
+
+  g_source_set_callback(source, (GSourceFunc)rpc_port_proxy_lem_on_disconnected, h, nullptr);
+  g_source_attach(source, handle->thread_context);
+  g_source_unref(source);
   return RPC_PORT_ERROR_NONE;
 }
 
@@ -1695,6 +1724,7 @@ EXPORT_API int rpc_port_proxy_<INPUT_FILE>_lem_<IFACE_NAME>_invoke_callback(void
 {
   rpc_port_proxy_lem_h handle = h;
   rpc_port_parcel_h cloned_parcel;
+  GSource* source;
 
   if (h == nullptr || parcel == nullptr) {
     _E("Invalid parameter");
@@ -1703,7 +1733,16 @@ EXPORT_API int rpc_port_proxy_<INPUT_FILE>_lem_<IFACE_NAME>_invoke_callback(void
 
   cloned_parcel = __rpc_port_parcel_clone(parcel);
   rpc_port_proxy_lem_request_queue_push(handle, cloned_parcel);
-  g_idle_add(rpc_port_proxy_lem_on_received, h);
+
+  source = g_idle_source_new();
+  if (source == nullptr) {
+    _E("Failed to create idle source");
+    return RPC_PORT_ERROR_OUT_OF_MEMORY;
+  }
+
+  g_source_set_callback(source, (GSourceFunc)rpc_port_proxy_lem_on_received, handle, nullptr);
+  g_source_attach(source, handle->thread_context);
+  g_source_unref(source);
   return RPC_PORT_ERROR_NONE;
 }
 
index 748217f1acdc6eead11be542e077a6561df575b4..cc4a53548c4c4995d3f0a831cbc5610382fb8431 100644 (file)
@@ -1804,6 +1804,7 @@ typedef struct {
   GQueue *disconnected_queue;
   GQueue *received_queue;
   GRecMutex mutex;
+  GMainContext *context;
 } rpc_port_stub_lem_s;
 
 typedef rpc_port_stub_lem_s *rpc_port_stub_lem_h;
@@ -1997,6 +1998,9 @@ static void rpc_port_stub_lem_destroy(rpc_port_stub_lem_h handle)
   if (handle->port_name)
     free(handle->port_name);
 
+  if (handle->context)
+    g_main_context_unref(handle->context);
+
   free(handle);
 }
 
@@ -2044,6 +2048,7 @@ static rpc_port_stub_lem_h rpc_port_stub_lem_create(const char *port_name, rpc_p
     return nullptr;
   }
 
+  handle->context = g_main_context_ref_thread_default();
   handle->callback = *callback;
   rpc_port_stub_lem_load_symbols(handle);
 
@@ -2239,26 +2244,29 @@ EXPORT_API int rpc_port_stub_<INPUT_FILE>_lem_<IFACE_NAME>_connect(void *context
 {
   rpc_port_stub_lem_h handle = __<IFACE_NAME>_context;
   rpc_port_stub_lem_data_h data;
+  GSource *source;
 
   if (!rpc_port_stub_lem_is_listening(handle)) {
     _E("Server is not ready");
     return RPC_PORT_ERROR_IO_ERROR;
   }
 
-  if (gettid() != getpid()) {
-    data = rpc_port_stub_lem_data_create(context, sender, instance, sync);
-    if (data == nullptr) {
-      _E("Ouf of memory");
-      return RPC_PORT_ERROR_OUT_OF_MEMORY;
-    }
+  data = rpc_port_stub_lem_data_create(context, sender, instance, sync);
+  if (data == nullptr) {
+    _E("Ouf of memory");
+    return RPC_PORT_ERROR_OUT_OF_MEMORY;
+  }
 
-    rpc_port_stub_lem_connected_data_push(handle, data);
-    g_idle_add(rpc_port_stub_lem_on_connected, handle);
-    return RPC_PORT_ERROR_NONE;
+  rpc_port_stub_lem_connected_data_push(handle, data);
+  source = g_idle_source_new();
+  if (source == nullptr) {
+    _E("Failed to create idle source");
+    return RPC_PORT_ERROR_OUT_OF_MEMORY;
   }
 
-  if (handle->callback.connected)
-    handle->callback.connected(context, sender, instance, sync);
+  g_source_set_callback(source, (GSourceFunc)rpc_port_stub_lem_on_connected, handle, nullptr);
+  g_source_attach(source, handle->context);
+  g_source_unref(source);
 
   return RPC_PORT_ERROR_NONE;
 }
@@ -2267,27 +2275,29 @@ EXPORT_API int rpc_port_stub_<INPUT_FILE>_lem_<IFACE_NAME>_disconnect(void *cont
 {
   rpc_port_stub_lem_h handle = __<IFACE_NAME>_context;
   rpc_port_stub_lem_data_h data;
+  GSource *source;
 
   if (!rpc_port_stub_lem_is_listening(handle)) {
     _E("Server is not ready");
     return RPC_PORT_ERROR_IO_ERROR;
   }
 
-  if (gettid() != getpid()) {
-    data = rpc_port_stub_lem_data_create(context, sender, instance, false);
-    if (data == nullptr) {
-      _E("Ouf of memory");
-      return RPC_PORT_ERROR_OUT_OF_MEMORY;
-    }
-
-    rpc_port_stub_lem_disconnected_data_push(handle, data);
-    g_idle_add(rpc_port_stub_lem_on_disconnected, handle);
-    return RPC_PORT_ERROR_NONE;
+  data = rpc_port_stub_lem_data_create(context, sender, instance, false);
+  if (data == nullptr) {
+    _E("Ouf of memory");
+    return RPC_PORT_ERROR_OUT_OF_MEMORY;
   }
 
-  if (handle->callback.disconnected)
-    handle->callback.disconnected(context, sender, instance);
+  rpc_port_stub_lem_disconnected_data_push(handle, data);
+  source = g_idle_source_new();
+  if (source == nullptr) {
+    _E("Failed to create idle source");
+    return RPC_PORT_ERROR_OUT_OF_MEMORY;
+  }
 
+  g_source_set_callback(source, (GSourceFunc)rpc_port_stub_lem_on_disconnected, handle, nullptr);
+  g_source_attach(source, handle->context);
+  g_source_unref(source);
   return RPC_PORT_ERROR_NONE;
 }
 
@@ -2295,27 +2305,29 @@ EXPORT_API int rpc_port_stub_<INPUT_FILE>_lem_<IFACE_NAME>_send(void *context, c
 {
   rpc_port_stub_lem_h handle = __<IFACE_NAME>_context;
   rpc_port_stub_lem_data_h data;
+  GSource *source;
 
   if (!rpc_port_stub_lem_is_listening(handle)) {
     _E("Server is not ready");
     return RPC_PORT_ERROR_IO_ERROR;
   }
 
-  if (gettid() != getpid()) {
-    data = rpc_port_stub_lem_data_create_with_parcel(context, sender, instance, parcel);
-    if (data == nullptr) {
-      _E("Ouf of memory");
-      return RPC_PORT_ERROR_OUT_OF_MEMORY;
-    }
-
-    rpc_port_stub_lem_received_data_push(handle, data);
-    g_idle_add(rpc_port_stub_lem_on_received, handle);
-    return RPC_PORT_ERROR_NONE;
+  data = rpc_port_stub_lem_data_create_with_parcel(context, sender, instance, parcel);
+  if (data == nullptr) {
+    _E("Ouf of memory");
+    return RPC_PORT_ERROR_OUT_OF_MEMORY;
   }
 
-  if (handle->callback.received)
-    handle->callback.received(context, sender, instance, parcel);
+  rpc_port_stub_lem_received_data_push(handle, data);
+  source = g_idle_source_new();
+  if (source == nullptr) {
+    _E("Failed to create idle source");
+    return RPC_PORT_ERROR_OUT_OF_MEMORY;
+  }
 
+  g_source_set_callback(source, (GSourceFunc)rpc_port_stub_lem_on_received, handle, nullptr);
+  g_source_attach(source, handle->context);
+  g_source_unref(source);
   return RPC_PORT_ERROR_NONE;
 }
 )__c_cb";
index 99703bdc4a30f515d2128dc6835b4a0bd6a86090..a8c753c42219fffe8a5d553609a0b93ef45a8193 100644 (file)
@@ -60,6 +60,7 @@ R"__cpp_cb(
 #include <bundle.h>
 #include <rpc-port-parcel.h>
 #include <rpc-port.h>
+#include <glib.h>
 
 #include <atomic>
 #include <functional>
index 302ca1475e88469545c365423a8ca969a408921e..57d83e3f3321cb649b627183cdd4f30f3a5bac6a 100644 (file)
@@ -489,7 +489,7 @@ rpc_port_parcel_h Clone(rpc_port_parcel_h parcel) {
  */
 constexpr const char CB_LEM_BASE[] =
 R"__cpp_cb(
-LocalExecution::LocalExecution(std::string port_name, LocalExecution::IEvent* listener) : port_name_(std::move(port_name)), listener_(listener) {
+LocalExecution::LocalExecution(std::string port_name, LocalExecution::IEvent* listener) : port_name_(std::move(port_name)), listener_(listener), context_(g_main_context_ref_thread_default(), g_main_context_unref) {
   instance_ = GetAppId() + "::" + std::to_string(seq_++);
 }
 
@@ -642,6 +642,8 @@ rpc_port_parcel_h LocalExecution::RequestQueuePop() {
   request_queue_.pop();
   return parcel;
 }
+
+GMainContext* LocalExecution::GetContext() const { return context_.get(); }
 )__cpp_cb";
 
 /**
@@ -659,7 +661,15 @@ EXPORT_API int rpc_port_proxy_<INPUT_FILE>_lem_<CLS_NAME>_connect(void* h, bool
   }
 
   auto* ptr = new std::weak_ptr<rpc_port::<FILE_NAMESPACE>::LocalExecution>(handle->shared_from_this());
-  g_idle_add([](gpointer user_data) {
+  auto* source = g_idle_source_new();
+  if (source == nullptr) {
+    _E("Failed to create idle source");
+    delete ptr;
+    return RPC_PORT_ERROR_OUT_OF_MEMORY;
+  }
+
+  g_source_set_callback(source, static_cast<GSourceFunc>(
+      [](gpointer user_data) {
         auto* wp = static_cast<std::weak_ptr<rpc_port::<FILE_NAMESPACE>::LocalExecution>*>(user_data);
         auto p = wp->lock();
         if (p != nullptr)
@@ -667,14 +677,24 @@ EXPORT_API int rpc_port_proxy_<INPUT_FILE>_lem_<CLS_NAME>_connect(void* h, bool
 
         delete wp;
         return G_SOURCE_REMOVE;
-      }, ptr);
+      }), ptr, nullptr);
+  g_source_attach(source, handle->GetContext());
+  g_source_unref(source);
   return RPC_PORT_ERROR_NONE;
 }
 
 EXPORT_API int rpc_port_proxy_<INPUT_FILE>_lem_<CLS_NAME>_disconnect(void* h) {
   auto* handle = static_cast<rpc_port::<FILE_NAMESPACE>::LocalExecution*>(h);
   auto* ptr = new std::weak_ptr<rpc_port::<FILE_NAMESPACE>::LocalExecution>(handle->shared_from_this());
-  g_idle_add([](gpointer user_data) {
+  auto* source = g_idle_source_new();
+  if (source == nullptr) {
+    _E("Failed to create idle source");
+    delete ptr;
+    return RPC_PORT_ERROR_OUT_OF_MEMORY;
+  }
+
+  g_source_set_callback(source, static_cast<GSourceFunc>(
+      [](gpointer user_data) {
         auto* wp = static_cast<std::weak_ptr<rpc_port::<FILE_NAMESPACE>::LocalExecution>*>(user_data);
         auto p = wp->lock();
         if (p != nullptr)
@@ -682,7 +702,9 @@ EXPORT_API int rpc_port_proxy_<INPUT_FILE>_lem_<CLS_NAME>_disconnect(void* h) {
 
         delete wp;
         return G_SOURCE_REMOVE;
-      }, ptr);
+      }), ptr, nullptr);
+  g_source_attach(source, handle->GetContext());
+  g_source_unref(source);
   return RPC_PORT_ERROR_NONE;
 }
 
@@ -691,7 +713,15 @@ EXPORT_API int rpc_port_proxy_<INPUT_FILE>_lem_<CLS_NAME>_invoke_callback(void*
   auto* ptr = new std::weak_ptr<rpc_port::<FILE_NAMESPACE>::LocalExecution>(handle->shared_from_this());
   rpc_port_parcel_h cloned_parcel = ::Clone(parcel);
   handle->RequestQueuePush(cloned_parcel);
-  g_idle_add([](gpointer user_data) {
+  auto* source = g_idle_source_new();
+  if (source == nullptr) {
+    _E("Failed to create idle source");
+    delete ptr;
+    return RPC_PORT_ERROR_OUT_OF_MEMORY;
+  }
+
+  g_source_set_callback(source, static_cast<GSourceFunc>(
+      [](gpointer user_data) {
         auto* wp = static_cast<std::weak_ptr<rpc_port::<FILE_NAMESPACE>::LocalExecution>*>(user_data);
         auto p = wp->lock();
         if (p != nullptr) {
@@ -702,7 +732,9 @@ EXPORT_API int rpc_port_proxy_<INPUT_FILE>_lem_<CLS_NAME>_invoke_callback(void*
 
         delete wp;
         return G_SOURCE_REMOVE;
-      }, ptr);
+      }), ptr, nullptr);
+  g_source_attach(source, handle->GetContext());
+  g_source_unref(source);
   return RPC_PORT_ERROR_NONE;
 }
 
index 098431f864b02e8618b82d00d4f018bb3b74d5fc..fcd08ef8cf03759c7466bfa41d1c2f8bc7cda184 100644 (file)
@@ -214,7 +214,7 @@ class LocalExecution : public std::enable_shared_from_this<LocalExecution> {
 
   void RequestQueuePush(rpc_port_parcel_h parcel);
   rpc_port_parcel_h RequestQueuePop();
-
+  GMainContext* GetContext() const;
 
  private:
   using StubConnectFunc = int (*)(void*, const char*, const char*, bool);
@@ -234,6 +234,7 @@ class LocalExecution : public std::enable_shared_from_this<LocalExecution> {
   std::queue<rpc_port_parcel_h> result_queue_;
   std::queue<rpc_port_parcel_h> request_queue_;
   mutable std::recursive_mutex mutex_;
+  std::unique_ptr<GMainContext, decltype(g_main_context_unref)*> context_;
 };
 )__cpp_cb";
 
index 42810f10332e9aa40aca7e340422c08cab56ac2c..03ee218d57a3837890dc315d4142fc2b6620827a 100644 (file)
@@ -576,13 +576,23 @@ rpc_port_parcel_h Clone(rpc_port_parcel_h parcel) {
   return handle;
 }
 
-void IdleAddOnce(std::function<void()>* func) {
-  g_idle_add([](gpointer user_data) {
+bool IdleAddOnce(GMainContext* context, std::function<void()>* func) {
+  auto* source = g_idle_source_new();
+  if (source == nullptr) {
+    _E("Failed to create idle source");
+    return false;
+  }
+
+  g_source_set_callback(source, static_cast<GSourceFunc>(
+      [](gpointer user_data) {
         auto* cb = static_cast<std::function<void()>*>(user_data);
         (*cb)();
         delete cb;
         return G_SOURCE_REMOVE;
-      }, func);
+      }), func, nullptr);
+  g_source_attach(source, context);
+  g_source_unref(source);
+  return true;
 }
 
 <LEM_CONTEXT>
@@ -604,7 +614,7 @@ std::shared_ptr<rpc_port::<FILE_NAMESPACE>::LocalExecution> <IFACE_NAME>_context
  */
 constexpr const char CB_LEM_BASE[] =
 R"__cpp_cb(
-LocalExecution::LocalExecution(std::string port_name, LocalExecution::IEvent* listener) : port_name_(std::move(port_name)), listener_(listener) {
+LocalExecution::LocalExecution(std::string port_name, LocalExecution::IEvent* listener) : port_name_(std::move(port_name)), listener_(listener), context_(g_main_context_ref_thread_default(), g_main_context_unref) {
   LoadSymbols();
 }
 
@@ -686,6 +696,8 @@ void LocalExecution::SetListening(bool listening) {
 bool LocalExecution::IsListening() const {
   return listening_;
 }
+
+GMainContext* LocalExecution::GetContext() const { return context_.get(); }
 )__cpp_cb";
 
 /**
@@ -700,15 +712,14 @@ EXPORT_API int rpc_port_stub_<INPUT_FILE>_lem_<CLS_NAME>_connect(void* context,
     return RPC_PORT_ERROR_IO_ERROR;
   }
 
-  if (gettid() != getpid()) {
-    std::string sender_str(sender);
-    std::string instance_str(instance);
-    auto* func = new std::function<void()>([context, sender_str, instance_str, sync] {
-          <CLS_NAME>_context_->OnConnected(context, sender_str, instance_str, sync);
-        });
-    IdleAddOnce(func);
-  } else {
-    <CLS_NAME>_context_->OnConnected(context, sender, instance, sync);
+  std::string sender_str(sender);
+  std::string instance_str(instance);
+  auto* func = new std::function<void()>([context, sender_str, instance_str, sync] {
+        <CLS_NAME>_context_->OnConnected(context, sender_str, instance_str, sync);
+      });
+  if (!IdleAddOnce(<CLS_NAME>_context_->GetContext(), func)) {
+    delete func;
+    return RPC_PORT_ERROR_OUT_OF_MEMORY;
   }
 
   return RPC_PORT_ERROR_NONE;
@@ -720,15 +731,14 @@ EXPORT_API int rpc_port_stub_<INPUT_FILE>_lem_<CLS_NAME>_disconnect(void* contex
     return RPC_PORT_ERROR_IO_ERROR;
   }
 
-  if (gettid() != getpid()) {
-    std::string sender_str(sender);
-    std::string instance_str(instance);
-    auto* func = new std::function<void()>([context, sender_str, instance_str] {
-          <CLS_NAME>_context_->OnDisconnected(context, sender_str, instance_str);
-        });
-    IdleAddOnce(func);
-  } else {
-    <CLS_NAME>_context_->OnDisconnected(context, sender, instance);
+  std::string sender_str(sender);
+  std::string instance_str(instance);
+  auto* func = new std::function<void()>([context, sender_str, instance_str] {
+        <CLS_NAME>_context_->OnDisconnected(context, sender_str, instance_str);
+      });
+  if (!IdleAddOnce(<CLS_NAME>_context_->GetContext(), func)) {
+    delete func;
+    return RPC_PORT_ERROR_OUT_OF_MEMORY;
   }
 
   return RPC_PORT_ERROR_NONE;
@@ -741,13 +751,12 @@ EXPORT_API int rpc_port_stub_<INPUT_FILE>_lem_<CLS_NAME>_send(void* context, rpc
   }
 
   rpc_port_parcel_h cloned_parcel = Clone(parcel);
-  if (getpid() != getpid()) {
-    auto* func = new std::function<void()>([context, cloned_parcel] {
-          <CLS_NAME>_context_->OnReceived(context, cloned_parcel);
-        });
-    IdleAddOnce(func);
-  } else {
-    <CLS_NAME>_context_->OnReceived(context, cloned_parcel);
+  auto* func = new std::function<void()>([context, cloned_parcel] {
+        <CLS_NAME>_context_->OnReceived(context, cloned_parcel);
+      });
+  if (!IdleAddOnce(<CLS_NAME>_context_->GetContext(), func)) {
+    delete func;
+    return RPC_PORT_ERROR_OUT_OF_MEMORY;
   }
 
   return RPC_PORT_ERROR_NONE;
index 7049129de179ba9ec80e54173beef2c1bb047d6c..4eea2d66b7072f8da2668ac536b65371646a3e54 100644 (file)
@@ -423,6 +423,7 @@ class LocalExecution {
   bool IsListening() const;
 
   bool LoadSymbols();
+  GMainContext* GetContext() const;
 
  private:
   using ProxyConnectFunc = int (*)(void*, bool);
@@ -439,6 +440,7 @@ class LocalExecution {
   ProxyDisconnectFunc disconnect_func_ = nullptr;
   ProxySendResultFunc send_result_func_ = nullptr;
   ProxyInvokeCallbackFunc invoke_callback_func_ = nullptr;
+  std::unique_ptr<GMainContext, decltype(g_main_context_unref)*> context_;
 };
 )__cpp_cb";