From 1561b08eb5472884251d0746da28afb779b9a95b Mon Sep 17 00:00:00 2001 From: Changgyu Choi Date: Tue, 31 Aug 2021 17:52:53 +0900 Subject: [PATCH] Add port message sending thread Change-Id: I548ce7bf15613e32cf15397b8fe388efd07252f0 Signed-off-by: Changgyu Choi --- src/message_sending_thread.cc | 99 +++++++++++++++++++++++++++++++++++++++++++ src/message_sending_thread.hh | 57 +++++++++++++++++++++++++ src/port-internal.cc | 83 +++++++++++++++++++++++++++--------- src/port-internal.hh | 12 ++++-- 4 files changed, 227 insertions(+), 24 deletions(-) create mode 100644 src/message_sending_thread.cc create mode 100644 src/message_sending_thread.hh diff --git a/src/message_sending_thread.cc b/src/message_sending_thread.cc new file mode 100644 index 0000000..ef80356 --- /dev/null +++ b/src/message_sending_thread.cc @@ -0,0 +1,99 @@ +/* + * Copyright (c) 2021 Samsung Electronics Co., Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include + +#include + +#include "log-private.hh" +#include "message_sending_thread.hh" + +namespace rpc_port { +namespace internal { + +MessageSendingThread& MessageSendingThread::GetInst() { + static MessageSendingThread inst; + return inst; +} + +MessageSendingThread::MessageSendingThread() { + context_ = g_main_context_new(); + loop_ = g_main_loop_new(context_, false); + std::unique_lock lock(mutex_); + thread_ = std::make_unique([&]() -> void { ThreadLoop(); }); + cond_.wait(lock, [&] { return thread_ok_; }); +} + +MessageSendingThread::~MessageSendingThread() { + Dispose(); +} + +gboolean MessageSendingThread::NotifyOne(gpointer data) { + auto* sender = static_cast(data); + std::unique_lock lock(sender->mutex_); + sender->thread_ok_ = true; + sender->cond_.notify_one(); + return G_SOURCE_REMOVE; +} + +void MessageSendingThread::ThreadLoop() { + { + std::unique_lock lock(mutex_); + GSource* source = g_idle_source_new(); + if (source == nullptr) { + _E("Failed to create GSource"); + cond_.notify_one(); + return; + } + + g_source_set_callback(source, NotifyOne, this, nullptr); + g_source_set_priority(source, G_PRIORITY_HIGH); + g_source_attach(source, context_); + g_source_unref(source); + + g_main_context_push_thread_default(context_); + } + + g_main_loop_run(loop_); + + g_main_context_pop_thread_default(context_); + _W("Shutdown message sending thread"); +} + +void MessageSendingThread::Dispose() { + std::unique_lock lock(mutex_); + if (g_main_loop_is_running(loop_)) + g_main_loop_quit(loop_); + else + _E("GMainLoop is not running"); + + thread_->join(); + + g_main_loop_unref(loop_); + loop_ = nullptr; + + g_main_context_unref(context_); + context_ = nullptr; +} + +GMainContext* MessageSendingThread::GetContext() { + return context_; +} + +} // namespace internal +} // namespace rpc_port + diff --git a/src/message_sending_thread.hh b/src/message_sending_thread.hh new file mode 100644 index 0000000..b71b136 --- /dev/null +++ b/src/message_sending_thread.hh @@ -0,0 +1,57 @@ +/* + * Copyright (c) 2021 Samsung Electronics Co., Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef MESSAGE_SENDING_THREAD_HH_ +#define MESSAGE_SENDING_THREAD_HH_ + +#include +#include + +#include +#include +#include +#include +#include + +#include "port-internal.hh" + +namespace rpc_port { +namespace internal { + +class MessageSendingThread { + public: + static MessageSendingThread& GetInst(); + ~MessageSendingThread(); + GMainContext* GetContext(); + + private: + MessageSendingThread(); + void ThreadLoop(); + void Dispose(); + static gboolean NotifyOne(gpointer data); + + GMainContext* context_ = nullptr; + GMainLoop* loop_ = nullptr; + bool thread_ok_ = false; + std::unique_ptr thread_; + std::mutex mutex_; + std::condition_variable cond_; +}; + +} // namespace internal +} // namespace rpc_port + +#endif // MESSAGE_SENDING_THREAD_HH_ diff --git a/src/port-internal.cc b/src/port-internal.cc index 9ae6802..4cceada 100644 --- a/src/port-internal.cc +++ b/src/port-internal.cc @@ -16,7 +16,6 @@ #include #include -#include #include #include #include @@ -28,6 +27,7 @@ #include "include/rpc-port.h" #include "log-private.hh" +#include "message_sending_thread.hh" #include "port-internal.hh" #define MAX_CNT 100 @@ -249,21 +249,28 @@ int Port::Write(const void* buf, unsigned int size, int* sent_bytes) { return PORT_STATUS_ERROR_NONE; } -gboolean Port::OnEventReceived(gint fd, GIOCondition cond, - gpointer user_data) { - Port* port = static_cast(user_data); +gboolean Port::OnEventReceived(GIOChannel* io, GIOCondition condition, + gpointer data) { + auto* ptr = static_cast*>(data); + auto port = ptr->lock(); + if (port == nullptr) { + _E("port is destructed"); + return G_SOURCE_CONTINUE; + } + std::lock_guard lock(port->mutex_); - int ret; if (port->queue_.empty()) { - port->delay_src_id_ = 0; - port->delayed_message_size_ = 0; - return G_SOURCE_REMOVE; + port->IgnoreIOEvent(); + return G_SOURCE_CONTINUE; } - ret = port->PopDelayedMessage(); - if (ret == PORT_STATUS_ERROR_IO_ERROR) - return G_SOURCE_REMOVE; + if (port->source_ == nullptr || g_source_is_destroyed(port->source_)) { + _E("GSource(%p) is destroyed", port->source_); + port->IgnoreIOEvent(); + return G_SOURCE_CONTINUE; + } + port->PopDelayedMessage(); return G_SOURCE_CONTINUE; } @@ -273,12 +280,49 @@ void Port::ClearQueue() { while (queue_.empty() == false) queue_.pop(); - if (delay_src_id_ != 0) { - g_source_remove(delay_src_id_); - delay_src_id_ = 0; + IgnoreIOEvent(); + delayed_message_size_ = 0; +} + +void Port::IgnoreIOEvent() { + std::lock_guard lock(mutex_); + if (source_ != nullptr && !g_source_is_destroyed(source_)) + g_source_destroy(source_); + + source_ = nullptr; + + if (channel_ != nullptr) { + g_io_channel_unref(channel_); + channel_ = nullptr; + } +} + +int Port::ListenIOEvent() { + std::lock_guard lock(mutex_); + channel_ = g_io_channel_unix_new(fd_); + if (channel_ == nullptr) { + _E("Failed to create GIOChannel"); + return RPC_PORT_ERROR_OUT_OF_MEMORY; + } + + source_ = g_io_create_watch(channel_, static_cast(G_IO_OUT)); + if (source_ == nullptr) { + _E("Failed to create GSource"); + IgnoreIOEvent(); + return RPC_PORT_ERROR_OUT_OF_MEMORY; } - delayed_message_size_ = 0; + auto* ptr = new (std::nothrow) std::weak_ptr(shared_from_this()); + g_source_set_callback(source_, reinterpret_cast(OnEventReceived), + static_cast(ptr), [](gpointer ptr) { + auto* port = static_cast*>(ptr); + delete port; + }); + g_source_set_priority(source_, G_PRIORITY_DEFAULT); + g_source_attach(source_, MessageSendingThread::GetInst().GetContext()); + g_source_unref(source_); + + return RPC_PORT_ERROR_NONE; } int Port::PopDelayedMessage() { @@ -302,12 +346,11 @@ int Port::PopDelayedMessage() { } int Port::PushDelayedMessage(std::shared_ptr dm) { + std::lock_guard lock(mutex_); if (queue_.empty()) { - delay_src_id_ = g_unix_fd_add(fd_, G_IO_OUT, OnEventReceived, this); - if (delay_src_id_ == 0) { - _E("Failed to add watch on socket"); - return RPC_PORT_ERROR_IO_ERROR; - } + int ret = ListenIOEvent(); + if (ret != RPC_PORT_ERROR_NONE) + return ret; } delayed_message_size_ += dm->GetOriginalSize(); diff --git a/src/port-internal.hh b/src/port-internal.hh index bda4d41..ba8e66c 100644 --- a/src/port-internal.hh +++ b/src/port-internal.hh @@ -18,6 +18,7 @@ #define PORT_INTERNAL_HH_ #include +#include #include #include @@ -31,7 +32,7 @@ namespace rpc_port { namespace internal { -class Port { +class Port : public std::enable_shared_from_this { public: Port(int fd, std::string id, std::string instance); Port(int fd, std::string id); @@ -67,6 +68,8 @@ class Port { private: bool CanWrite(); + void IgnoreIOEvent(); + int ListenIOEvent(); class DelayMessage { public: @@ -91,8 +94,8 @@ class Port { int PushDelayedMessage(std::shared_ptr dm); int PopDelayedMessage(); - static gboolean OnEventReceived(gint fd, - GIOCondition cond, gpointer user_data); + static gboolean OnEventReceived(GIOChannel* io, + GIOCondition condition, gpointer data); void ClearQueue(); int fd_; @@ -102,7 +105,8 @@ class Port { mutable std::recursive_mutex mutex_; std::queue> queue_; int delayed_message_size_ = 0; - int delay_src_id_ = 0; + GIOChannel* channel_ = nullptr; + GSource* source_ = nullptr; }; } // namespace internal -- 2.7.4