Add port message sending thread 70/263070/17
authorChanggyu Choi <changyu.choi@samsung.com>
Tue, 31 Aug 2021 08:52:53 +0000 (17:52 +0900)
committerChanggyu Choi <changyu.choi@samsung.com>
Tue, 7 Sep 2021 06:30:20 +0000 (15:30 +0900)
Change-Id: I548ce7bf15613e32cf15397b8fe388efd07252f0
Signed-off-by: Changgyu Choi <changyu.choi@samsung.com>
src/message_sending_thread.cc [new file with mode: 0644]
src/message_sending_thread.hh [new file with mode: 0644]
src/port-internal.cc
src/port-internal.hh

diff --git a/src/message_sending_thread.cc b/src/message_sending_thread.cc
new file mode 100644 (file)
index 0000000..ef80356
--- /dev/null
@@ -0,0 +1,99 @@
+/*
+ * Copyright (c) 2021 Samsung Electronics Co., Ltd.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <gio/gio.h>
+#include <glib-unix.h>
+
+#include <memory>
+
+#include "log-private.hh"
+#include "message_sending_thread.hh"
+
+namespace rpc_port {
+namespace internal {
+
+MessageSendingThread& MessageSendingThread::GetInst() {
+  static MessageSendingThread inst;
+  return inst;
+}
+
+MessageSendingThread::MessageSendingThread() {
+  context_ = g_main_context_new();
+  loop_ = g_main_loop_new(context_, false);
+  std::unique_lock<std::mutex> lock(mutex_);
+  thread_ = std::make_unique<std::thread>([&]() -> void { ThreadLoop(); });
+  cond_.wait(lock, [&] { return thread_ok_; });
+}
+
+MessageSendingThread::~MessageSendingThread() {
+  Dispose();
+}
+
+gboolean MessageSendingThread::NotifyOne(gpointer data) {
+  auto* sender = static_cast<MessageSendingThread*>(data);
+  std::unique_lock<std::mutex> lock(sender->mutex_);
+  sender->thread_ok_ = true;
+  sender->cond_.notify_one();
+  return G_SOURCE_REMOVE;
+}
+
+void MessageSendingThread::ThreadLoop() {
+  {
+    std::unique_lock<std::mutex> lock(mutex_);
+    GSource* source = g_idle_source_new();
+    if (source == nullptr) {
+      _E("Failed to create GSource");
+      cond_.notify_one();
+      return;
+    }
+
+    g_source_set_callback(source, NotifyOne, this, nullptr);
+    g_source_set_priority(source, G_PRIORITY_HIGH);
+    g_source_attach(source, context_);
+    g_source_unref(source);
+
+    g_main_context_push_thread_default(context_);
+  }
+
+  g_main_loop_run(loop_);
+
+  g_main_context_pop_thread_default(context_);
+  _W("Shutdown message sending thread");
+}
+
+void MessageSendingThread::Dispose() {
+  std::unique_lock<std::mutex> lock(mutex_);
+  if (g_main_loop_is_running(loop_))
+    g_main_loop_quit(loop_);
+  else
+    _E("GMainLoop is not running");
+
+  thread_->join();
+
+  g_main_loop_unref(loop_);
+  loop_ = nullptr;
+
+  g_main_context_unref(context_);
+  context_ = nullptr;
+}
+
+GMainContext* MessageSendingThread::GetContext() {
+  return context_;
+}
+
+}  // namespace internal
+}  // namespace rpc_port
+
diff --git a/src/message_sending_thread.hh b/src/message_sending_thread.hh
new file mode 100644 (file)
index 0000000..b71b136
--- /dev/null
@@ -0,0 +1,57 @@
+/*
+ * Copyright (c) 2021 Samsung Electronics Co., Ltd.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef MESSAGE_SENDING_THREAD_HH_
+#define MESSAGE_SENDING_THREAD_HH_
+
+#include <gio/gio.h>
+#include <glib-unix.h>
+
+#include <condition_variable>
+#include <functional>
+#include <memory>
+#include <mutex>
+#include <thread>
+
+#include "port-internal.hh"
+
+namespace rpc_port {
+namespace internal {
+
+class MessageSendingThread {
+ public:
+  static MessageSendingThread& GetInst();
+  ~MessageSendingThread();
+  GMainContext* GetContext();
+
+ private:
+  MessageSendingThread();
+  void ThreadLoop();
+  void Dispose();
+  static gboolean NotifyOne(gpointer data);
+
+  GMainContext* context_ = nullptr;
+  GMainLoop* loop_ = nullptr;
+  bool thread_ok_ = false;
+  std::unique_ptr<std::thread> thread_;
+  std::mutex mutex_;
+  std::condition_variable cond_;
+};
+
+}  // namespace internal
+}  // namespace rpc_port
+
+#endif  // MESSAGE_SENDING_THREAD_HH_
index 9ae6802..4cceada 100644 (file)
@@ -16,7 +16,6 @@
 
 #include <aul_rpc_port.h>
 #include <dlog.h>
-#include <glib-unix.h>
 #include <poll.h>
 #include <stdio.h>
 #include <stdlib.h>
@@ -28,6 +27,7 @@
 
 #include "include/rpc-port.h"
 #include "log-private.hh"
+#include "message_sending_thread.hh"
 #include "port-internal.hh"
 
 #define MAX_CNT 100
@@ -249,21 +249,28 @@ int Port::Write(const void* buf, unsigned int size, int* sent_bytes) {
   return PORT_STATUS_ERROR_NONE;
 }
 
-gboolean Port::OnEventReceived(gint fd, GIOCondition cond,
-                               gpointer user_data) {
-  Port* port = static_cast<Port*>(user_data);
+gboolean Port::OnEventReceived(GIOChannel* io, GIOCondition condition,
+                               gpointer data) {
+  auto* ptr = static_cast<std::weak_ptr<Port>*>(data);
+  auto port = ptr->lock();
+  if (port == nullptr) {
+    _E("port is destructed");
+    return G_SOURCE_CONTINUE;
+  }
+
   std::lock_guard<std::recursive_mutex> lock(port->mutex_);
-  int ret;
   if (port->queue_.empty()) {
-    port->delay_src_id_ = 0;
-    port->delayed_message_size_ = 0;
-    return G_SOURCE_REMOVE;
+    port->IgnoreIOEvent();
+    return G_SOURCE_CONTINUE;
   }
 
-  ret = port->PopDelayedMessage();
-  if (ret == PORT_STATUS_ERROR_IO_ERROR)
-    return G_SOURCE_REMOVE;
+  if (port->source_ == nullptr || g_source_is_destroyed(port->source_)) {
+    _E("GSource(%p) is destroyed", port->source_);
+    port->IgnoreIOEvent();
+    return G_SOURCE_CONTINUE;
+  }
 
+  port->PopDelayedMessage();
   return G_SOURCE_CONTINUE;
 }
 
@@ -273,12 +280,49 @@ void Port::ClearQueue() {
   while (queue_.empty() == false)
     queue_.pop();
 
-  if (delay_src_id_ != 0) {
-    g_source_remove(delay_src_id_);
-    delay_src_id_ = 0;
+  IgnoreIOEvent();
+  delayed_message_size_ = 0;
+}
+
+void Port::IgnoreIOEvent() {
+  std::lock_guard<std::recursive_mutex> lock(mutex_);
+  if (source_ != nullptr && !g_source_is_destroyed(source_))
+    g_source_destroy(source_);
+
+  source_ = nullptr;
+
+  if (channel_ != nullptr) {
+    g_io_channel_unref(channel_);
+    channel_ = nullptr;
+  }
+}
+
+int Port::ListenIOEvent() {
+  std::lock_guard<std::recursive_mutex> lock(mutex_);
+  channel_ = g_io_channel_unix_new(fd_);
+  if (channel_ == nullptr) {
+    _E("Failed to create GIOChannel");
+    return RPC_PORT_ERROR_OUT_OF_MEMORY;
+  }
+
+  source_ = g_io_create_watch(channel_, static_cast<GIOCondition>(G_IO_OUT));
+  if (source_ == nullptr) {
+    _E("Failed to create GSource");
+    IgnoreIOEvent();
+    return RPC_PORT_ERROR_OUT_OF_MEMORY;
   }
 
-  delayed_message_size_ = 0;
+  auto* ptr = new (std::nothrow) std::weak_ptr<Port>(shared_from_this());
+  g_source_set_callback(source_, reinterpret_cast<GSourceFunc>(OnEventReceived),
+                        static_cast<gpointer>(ptr), [](gpointer ptr) {
+                          auto* port = static_cast<std::weak_ptr<Port>*>(ptr);
+                          delete port;
+                        });
+  g_source_set_priority(source_, G_PRIORITY_DEFAULT);
+  g_source_attach(source_, MessageSendingThread::GetInst().GetContext());
+  g_source_unref(source_);
+
+  return RPC_PORT_ERROR_NONE;
 }
 
 int Port::PopDelayedMessage() {
@@ -302,12 +346,11 @@ int Port::PopDelayedMessage() {
 }
 
 int Port::PushDelayedMessage(std::shared_ptr<DelayMessage> dm) {
+  std::lock_guard<std::recursive_mutex> lock(mutex_);
   if (queue_.empty()) {
-    delay_src_id_ = g_unix_fd_add(fd_, G_IO_OUT, OnEventReceived, this);
-    if (delay_src_id_ == 0) {
-      _E("Failed to add watch on socket");
-      return RPC_PORT_ERROR_IO_ERROR;
-    }
+    int ret = ListenIOEvent();
+    if (ret != RPC_PORT_ERROR_NONE)
+      return ret;
   }
 
   delayed_message_size_ += dm->GetOriginalSize();
index bda4d41..ba8e66c 100644 (file)
@@ -18,6 +18,7 @@
 #define PORT_INTERNAL_HH_
 
 #include <gio/gio.h>
+#include <glib.h>
 
 #include <algorithm>
 #include <atomic>
@@ -31,7 +32,7 @@
 namespace rpc_port {
 namespace internal {
 
-class Port {
+class Port : public std::enable_shared_from_this<Port> {
  public:
   Port(int fd, std::string id, std::string instance);
   Port(int fd, std::string id);
@@ -67,6 +68,8 @@ class Port {
 
  private:
   bool CanWrite();
+  void IgnoreIOEvent();
+  int ListenIOEvent();
 
   class DelayMessage {
    public:
@@ -91,8 +94,8 @@ class Port {
 
   int PushDelayedMessage(std::shared_ptr<DelayMessage> dm);
   int PopDelayedMessage();
-  static gboolean OnEventReceived(gint fd,
-                                  GIOCondition cond, gpointer user_data);
+  static gboolean OnEventReceived(GIOChannel* io,
+      GIOCondition condition, gpointer data);
   void ClearQueue();
 
   int fd_;
@@ -102,7 +105,8 @@ class Port {
   mutable std::recursive_mutex mutex_;
   std::queue<std::shared_ptr<DelayMessage>> queue_;
   int delayed_message_size_ = 0;
-  int delay_src_id_ = 0;
+  GIOChannel* channel_ = nullptr;
+  GSource* source_ = nullptr;
 };
 
 }  // namespace internal