GQueue *result_queue;
GQueue *request_queue;
GRecMutex mutex;
+ GMainContext *thread_context;
} rpc_port_proxy_lem_s;
typedef rpc_port_proxy_lem_s *rpc_port_proxy_lem_h;
if (handle->port_name)
free(handle->port_name);
+ if (handle->thread_context)
+ g_main_context_unref(handle->thread_context);
+
free(handle);
}
return nullptr;
}
+ handle->thread_context = g_main_context_ref_thread_default();
+
return handle;
}
R"__c_cb(
EXPORT_API int rpc_port_proxy_<INPUT_FILE>_lem_<IFACE_NAME>_connect(void *h, bool sync)
{
+ rpc_port_proxy_lem_h handle = h;
+ GSource* source;
+
if (h == nullptr) {
_E("Invalid parameter");
return RPC_PORT_ERROR_INVALID_PARAMETER;
}
- if (sync)
+ if (sync) {
rpc_port_proxy_lem_on_connected(h);
- else
- g_idle_add(rpc_port_proxy_lem_on_connected, h);
+ return RPC_PORT_ERROR_NONE;
+ }
+
+ source = g_idle_source_new();
+ if (source == nullptr) {
+ _E("Failed to create idle source");
+ return RPC_PORT_ERROR_OUT_OF_MEMORY;
+ }
+ g_source_set_callback(source, (GSourceFunc)rpc_port_proxy_lem_on_connected, h, nullptr);
+ g_source_attach(source, handle->thread_context);
+ g_source_unref(source);
return RPC_PORT_ERROR_NONE;
}
EXPORT_API int rpc_port_proxy_<INPUT_FILE>_lem_<IFACE_NAME>_disconnect(void* h)
{
+ rpc_port_proxy_lem_h handle = h;
+ GSource* source;
+
if (h == nullptr) {
_E("Invalid parameter");
return RPC_PORT_ERROR_INVALID_PARAMETER;
}
- g_idle_add(rpc_port_proxy_lem_on_disconnected, h);
+ source = g_idle_source_new();
+ if (source == nullptr) {
+ _E("Failed to create idle source");
+ return RPC_PORT_ERROR_OUT_OF_MEMORY;
+ }
+
+ g_source_set_callback(source, (GSourceFunc)rpc_port_proxy_lem_on_disconnected, h, nullptr);
+ g_source_attach(source, handle->thread_context);
+ g_source_unref(source);
return RPC_PORT_ERROR_NONE;
}
{
rpc_port_proxy_lem_h handle = h;
rpc_port_parcel_h cloned_parcel;
+ GSource* source;
if (h == nullptr || parcel == nullptr) {
_E("Invalid parameter");
cloned_parcel = __rpc_port_parcel_clone(parcel);
rpc_port_proxy_lem_request_queue_push(handle, cloned_parcel);
- g_idle_add(rpc_port_proxy_lem_on_received, h);
+
+ source = g_idle_source_new();
+ if (source == nullptr) {
+ _E("Failed to create idle source");
+ return RPC_PORT_ERROR_OUT_OF_MEMORY;
+ }
+
+ g_source_set_callback(source, (GSourceFunc)rpc_port_proxy_lem_on_received, handle, nullptr);
+ g_source_attach(source, handle->thread_context);
+ g_source_unref(source);
return RPC_PORT_ERROR_NONE;
}
GQueue *disconnected_queue;
GQueue *received_queue;
GRecMutex mutex;
+ GMainContext *context;
} rpc_port_stub_lem_s;
typedef rpc_port_stub_lem_s *rpc_port_stub_lem_h;
if (handle->port_name)
free(handle->port_name);
+ if (handle->context)
+ g_main_context_unref(handle->context);
+
free(handle);
}
return nullptr;
}
+ handle->context = g_main_context_ref_thread_default();
handle->callback = *callback;
rpc_port_stub_lem_load_symbols(handle);
{
rpc_port_stub_lem_h handle = __<IFACE_NAME>_context;
rpc_port_stub_lem_data_h data;
+ GSource *source;
if (!rpc_port_stub_lem_is_listening(handle)) {
_E("Server is not ready");
return RPC_PORT_ERROR_IO_ERROR;
}
- if (gettid() != getpid()) {
- data = rpc_port_stub_lem_data_create(context, sender, instance, sync);
- if (data == nullptr) {
- _E("Ouf of memory");
- return RPC_PORT_ERROR_OUT_OF_MEMORY;
- }
+ data = rpc_port_stub_lem_data_create(context, sender, instance, sync);
+ if (data == nullptr) {
+ _E("Ouf of memory");
+ return RPC_PORT_ERROR_OUT_OF_MEMORY;
+ }
- rpc_port_stub_lem_connected_data_push(handle, data);
- g_idle_add(rpc_port_stub_lem_on_connected, handle);
- return RPC_PORT_ERROR_NONE;
+ rpc_port_stub_lem_connected_data_push(handle, data);
+ source = g_idle_source_new();
+ if (source == nullptr) {
+ _E("Failed to create idle source");
+ return RPC_PORT_ERROR_OUT_OF_MEMORY;
}
- if (handle->callback.connected)
- handle->callback.connected(context, sender, instance, sync);
+ g_source_set_callback(source, (GSourceFunc)rpc_port_stub_lem_on_connected, handle, nullptr);
+ g_source_attach(source, handle->context);
+ g_source_unref(source);
return RPC_PORT_ERROR_NONE;
}
{
rpc_port_stub_lem_h handle = __<IFACE_NAME>_context;
rpc_port_stub_lem_data_h data;
+ GSource *source;
if (!rpc_port_stub_lem_is_listening(handle)) {
_E("Server is not ready");
return RPC_PORT_ERROR_IO_ERROR;
}
- if (gettid() != getpid()) {
- data = rpc_port_stub_lem_data_create(context, sender, instance, false);
- if (data == nullptr) {
- _E("Ouf of memory");
- return RPC_PORT_ERROR_OUT_OF_MEMORY;
- }
-
- rpc_port_stub_lem_disconnected_data_push(handle, data);
- g_idle_add(rpc_port_stub_lem_on_disconnected, handle);
- return RPC_PORT_ERROR_NONE;
+ data = rpc_port_stub_lem_data_create(context, sender, instance, false);
+ if (data == nullptr) {
+ _E("Ouf of memory");
+ return RPC_PORT_ERROR_OUT_OF_MEMORY;
}
- if (handle->callback.disconnected)
- handle->callback.disconnected(context, sender, instance);
+ rpc_port_stub_lem_disconnected_data_push(handle, data);
+ source = g_idle_source_new();
+ if (source == nullptr) {
+ _E("Failed to create idle source");
+ return RPC_PORT_ERROR_OUT_OF_MEMORY;
+ }
+ g_source_set_callback(source, (GSourceFunc)rpc_port_stub_lem_on_disconnected, handle, nullptr);
+ g_source_attach(source, handle->context);
+ g_source_unref(source);
return RPC_PORT_ERROR_NONE;
}
{
rpc_port_stub_lem_h handle = __<IFACE_NAME>_context;
rpc_port_stub_lem_data_h data;
+ GSource *source;
if (!rpc_port_stub_lem_is_listening(handle)) {
_E("Server is not ready");
return RPC_PORT_ERROR_IO_ERROR;
}
- if (gettid() != getpid()) {
- data = rpc_port_stub_lem_data_create_with_parcel(context, sender, instance, parcel);
- if (data == nullptr) {
- _E("Ouf of memory");
- return RPC_PORT_ERROR_OUT_OF_MEMORY;
- }
-
- rpc_port_stub_lem_received_data_push(handle, data);
- g_idle_add(rpc_port_stub_lem_on_received, handle);
- return RPC_PORT_ERROR_NONE;
+ data = rpc_port_stub_lem_data_create_with_parcel(context, sender, instance, parcel);
+ if (data == nullptr) {
+ _E("Ouf of memory");
+ return RPC_PORT_ERROR_OUT_OF_MEMORY;
}
- if (handle->callback.received)
- handle->callback.received(context, sender, instance, parcel);
+ rpc_port_stub_lem_received_data_push(handle, data);
+ source = g_idle_source_new();
+ if (source == nullptr) {
+ _E("Failed to create idle source");
+ return RPC_PORT_ERROR_OUT_OF_MEMORY;
+ }
+ g_source_set_callback(source, (GSourceFunc)rpc_port_stub_lem_on_received, handle, nullptr);
+ g_source_attach(source, handle->context);
+ g_source_unref(source);
return RPC_PORT_ERROR_NONE;
}
)__c_cb";
#include <bundle.h>
#include <rpc-port-parcel.h>
#include <rpc-port.h>
+#include <glib.h>
#include <atomic>
#include <functional>
*/
constexpr const char CB_LEM_BASE[] =
R"__cpp_cb(
-LocalExecution::LocalExecution(std::string port_name, LocalExecution::IEvent* listener) : port_name_(std::move(port_name)), listener_(listener) {
+LocalExecution::LocalExecution(std::string port_name, LocalExecution::IEvent* listener) : port_name_(std::move(port_name)), listener_(listener), context_(g_main_context_ref_thread_default(), g_main_context_unref) {
instance_ = GetAppId() + "::" + std::to_string(seq_++);
}
request_queue_.pop();
return parcel;
}
+
+GMainContext* LocalExecution::GetContext() const { return context_.get(); }
)__cpp_cb";
/**
}
auto* ptr = new std::weak_ptr<rpc_port::<FILE_NAMESPACE>::LocalExecution>(handle->shared_from_this());
- g_idle_add([](gpointer user_data) {
+ auto* source = g_idle_source_new();
+ if (source == nullptr) {
+ _E("Failed to create idle source");
+ delete ptr;
+ return RPC_PORT_ERROR_OUT_OF_MEMORY;
+ }
+
+ g_source_set_callback(source, static_cast<GSourceFunc>(
+ [](gpointer user_data) {
auto* wp = static_cast<std::weak_ptr<rpc_port::<FILE_NAMESPACE>::LocalExecution>*>(user_data);
auto p = wp->lock();
if (p != nullptr)
delete wp;
return G_SOURCE_REMOVE;
- }, ptr);
+ }), ptr, nullptr);
+ g_source_attach(source, handle->GetContext());
+ g_source_unref(source);
return RPC_PORT_ERROR_NONE;
}
EXPORT_API int rpc_port_proxy_<INPUT_FILE>_lem_<CLS_NAME>_disconnect(void* h) {
auto* handle = static_cast<rpc_port::<FILE_NAMESPACE>::LocalExecution*>(h);
auto* ptr = new std::weak_ptr<rpc_port::<FILE_NAMESPACE>::LocalExecution>(handle->shared_from_this());
- g_idle_add([](gpointer user_data) {
+ auto* source = g_idle_source_new();
+ if (source == nullptr) {
+ _E("Failed to create idle source");
+ delete ptr;
+ return RPC_PORT_ERROR_OUT_OF_MEMORY;
+ }
+
+ g_source_set_callback(source, static_cast<GSourceFunc>(
+ [](gpointer user_data) {
auto* wp = static_cast<std::weak_ptr<rpc_port::<FILE_NAMESPACE>::LocalExecution>*>(user_data);
auto p = wp->lock();
if (p != nullptr)
delete wp;
return G_SOURCE_REMOVE;
- }, ptr);
+ }), ptr, nullptr);
+ g_source_attach(source, handle->GetContext());
+ g_source_unref(source);
return RPC_PORT_ERROR_NONE;
}
auto* ptr = new std::weak_ptr<rpc_port::<FILE_NAMESPACE>::LocalExecution>(handle->shared_from_this());
rpc_port_parcel_h cloned_parcel = ::Clone(parcel);
handle->RequestQueuePush(cloned_parcel);
- g_idle_add([](gpointer user_data) {
+ auto* source = g_idle_source_new();
+ if (source == nullptr) {
+ _E("Failed to create idle source");
+ delete ptr;
+ return RPC_PORT_ERROR_OUT_OF_MEMORY;
+ }
+
+ g_source_set_callback(source, static_cast<GSourceFunc>(
+ [](gpointer user_data) {
auto* wp = static_cast<std::weak_ptr<rpc_port::<FILE_NAMESPACE>::LocalExecution>*>(user_data);
auto p = wp->lock();
if (p != nullptr) {
delete wp;
return G_SOURCE_REMOVE;
- }, ptr);
+ }), ptr, nullptr);
+ g_source_attach(source, handle->GetContext());
+ g_source_unref(source);
return RPC_PORT_ERROR_NONE;
}
void RequestQueuePush(rpc_port_parcel_h parcel);
rpc_port_parcel_h RequestQueuePop();
-
+ GMainContext* GetContext() const;
private:
using StubConnectFunc = int (*)(void*, const char*, const char*, bool);
std::queue<rpc_port_parcel_h> result_queue_;
std::queue<rpc_port_parcel_h> request_queue_;
mutable std::recursive_mutex mutex_;
+ std::unique_ptr<GMainContext, decltype(g_main_context_unref)*> context_;
};
)__cpp_cb";
return handle;
}
-void IdleAddOnce(std::function<void()>* func) {
- g_idle_add([](gpointer user_data) {
+bool IdleAddOnce(GMainContext* context, std::function<void()>* func) {
+ auto* source = g_idle_source_new();
+ if (source == nullptr) {
+ _E("Failed to create idle source");
+ return false;
+ }
+
+ g_source_set_callback(source, static_cast<GSourceFunc>(
+ [](gpointer user_data) {
auto* cb = static_cast<std::function<void()>*>(user_data);
(*cb)();
delete cb;
return G_SOURCE_REMOVE;
- }, func);
+ }), func, nullptr);
+ g_source_attach(source, context);
+ g_source_unref(source);
+ return true;
}
<LEM_CONTEXT>
*/
constexpr const char CB_LEM_BASE[] =
R"__cpp_cb(
-LocalExecution::LocalExecution(std::string port_name, LocalExecution::IEvent* listener) : port_name_(std::move(port_name)), listener_(listener) {
+LocalExecution::LocalExecution(std::string port_name, LocalExecution::IEvent* listener) : port_name_(std::move(port_name)), listener_(listener), context_(g_main_context_ref_thread_default(), g_main_context_unref) {
LoadSymbols();
}
bool LocalExecution::IsListening() const {
return listening_;
}
+
+GMainContext* LocalExecution::GetContext() const { return context_.get(); }
)__cpp_cb";
/**
return RPC_PORT_ERROR_IO_ERROR;
}
- if (gettid() != getpid()) {
- std::string sender_str(sender);
- std::string instance_str(instance);
- auto* func = new std::function<void()>([context, sender_str, instance_str, sync] {
- <CLS_NAME>_context_->OnConnected(context, sender_str, instance_str, sync);
- });
- IdleAddOnce(func);
- } else {
- <CLS_NAME>_context_->OnConnected(context, sender, instance, sync);
+ std::string sender_str(sender);
+ std::string instance_str(instance);
+ auto* func = new std::function<void()>([context, sender_str, instance_str, sync] {
+ <CLS_NAME>_context_->OnConnected(context, sender_str, instance_str, sync);
+ });
+ if (!IdleAddOnce(<CLS_NAME>_context_->GetContext(), func)) {
+ delete func;
+ return RPC_PORT_ERROR_OUT_OF_MEMORY;
}
return RPC_PORT_ERROR_NONE;
return RPC_PORT_ERROR_IO_ERROR;
}
- if (gettid() != getpid()) {
- std::string sender_str(sender);
- std::string instance_str(instance);
- auto* func = new std::function<void()>([context, sender_str, instance_str] {
- <CLS_NAME>_context_->OnDisconnected(context, sender_str, instance_str);
- });
- IdleAddOnce(func);
- } else {
- <CLS_NAME>_context_->OnDisconnected(context, sender, instance);
+ std::string sender_str(sender);
+ std::string instance_str(instance);
+ auto* func = new std::function<void()>([context, sender_str, instance_str] {
+ <CLS_NAME>_context_->OnDisconnected(context, sender_str, instance_str);
+ });
+ if (!IdleAddOnce(<CLS_NAME>_context_->GetContext(), func)) {
+ delete func;
+ return RPC_PORT_ERROR_OUT_OF_MEMORY;
}
return RPC_PORT_ERROR_NONE;
}
rpc_port_parcel_h cloned_parcel = Clone(parcel);
- if (getpid() != getpid()) {
- auto* func = new std::function<void()>([context, cloned_parcel] {
- <CLS_NAME>_context_->OnReceived(context, cloned_parcel);
- });
- IdleAddOnce(func);
- } else {
- <CLS_NAME>_context_->OnReceived(context, cloned_parcel);
+ auto* func = new std::function<void()>([context, cloned_parcel] {
+ <CLS_NAME>_context_->OnReceived(context, cloned_parcel);
+ });
+ if (!IdleAddOnce(<CLS_NAME>_context_->GetContext(), func)) {
+ delete func;
+ return RPC_PORT_ERROR_OUT_OF_MEMORY;
}
return RPC_PORT_ERROR_NONE;
bool IsListening() const;
bool LoadSymbols();
+ GMainContext* GetContext() const;
private:
using ProxyConnectFunc = int (*)(void*, bool);
ProxyDisconnectFunc disconnect_func_ = nullptr;
ProxySendResultFunc send_result_func_ = nullptr;
ProxyInvokeCallbackFunc invoke_callback_func_ = nullptr;
+ std::unique_ptr<GMainContext, decltype(g_main_context_unref)*> context_;
};
)__cpp_cb";