--- /dev/null
+/*
+ * Copyright (c) 2021 Samsung Electronics Co., Ltd.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <gio/gio.h>
+#include <glib-unix.h>
+
+#include <memory>
+
+#include "log-private.hh"
+#include "message_sending_thread.hh"
+
+namespace rpc_port {
+namespace internal {
+
+MessageSendingThread& MessageSendingThread::GetInst() {
+ static MessageSendingThread inst;
+ return inst;
+}
+
+MessageSendingThread::MessageSendingThread() {
+ context_ = g_main_context_new();
+ loop_ = g_main_loop_new(context_, false);
+ std::unique_lock<std::mutex> lock(mutex_);
+ thread_ = std::make_unique<std::thread>([&]() -> void { ThreadLoop(); });
+ cond_.wait(lock, [&] { return thread_ok_; });
+}
+
+MessageSendingThread::~MessageSendingThread() {
+ Dispose();
+}
+
+gboolean MessageSendingThread::NotifyOne(gpointer data) {
+ auto* sender = static_cast<MessageSendingThread*>(data);
+ std::unique_lock<std::mutex> lock(sender->mutex_);
+ sender->thread_ok_ = true;
+ sender->cond_.notify_one();
+ return G_SOURCE_REMOVE;
+}
+
+void MessageSendingThread::ThreadLoop() {
+ {
+ std::unique_lock<std::mutex> lock(mutex_);
+ GSource* source = g_idle_source_new();
+ if (source == nullptr) {
+ _E("Failed to create GSource");
+ cond_.notify_one();
+ return;
+ }
+
+ g_source_set_callback(source, NotifyOne, this, nullptr);
+ g_source_set_priority(source, G_PRIORITY_HIGH);
+ g_source_attach(source, context_);
+ g_source_unref(source);
+
+ g_main_context_push_thread_default(context_);
+ }
+
+ g_main_loop_run(loop_);
+
+ g_main_context_pop_thread_default(context_);
+ _W("Shutdown message sending thread");
+}
+
+void MessageSendingThread::Dispose() {
+ std::unique_lock<std::mutex> lock(mutex_);
+ if (g_main_loop_is_running(loop_))
+ g_main_loop_quit(loop_);
+ else
+ _E("GMainLoop is not running");
+
+ thread_->join();
+
+ g_main_loop_unref(loop_);
+ loop_ = nullptr;
+
+ g_main_context_unref(context_);
+ context_ = nullptr;
+}
+
+GMainContext* MessageSendingThread::GetContext() {
+ return context_;
+}
+
+} // namespace internal
+} // namespace rpc_port
+
--- /dev/null
+/*
+ * Copyright (c) 2021 Samsung Electronics Co., Ltd.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef MESSAGE_SENDING_THREAD_HH_
+#define MESSAGE_SENDING_THREAD_HH_
+
+#include <gio/gio.h>
+#include <glib-unix.h>
+
+#include <condition_variable>
+#include <functional>
+#include <memory>
+#include <mutex>
+#include <thread>
+
+#include "port-internal.hh"
+
+namespace rpc_port {
+namespace internal {
+
+class MessageSendingThread {
+ public:
+ static MessageSendingThread& GetInst();
+ ~MessageSendingThread();
+ GMainContext* GetContext();
+
+ private:
+ MessageSendingThread();
+ void ThreadLoop();
+ void Dispose();
+ static gboolean NotifyOne(gpointer data);
+
+ GMainContext* context_ = nullptr;
+ GMainLoop* loop_ = nullptr;
+ bool thread_ok_ = false;
+ std::unique_ptr<std::thread> thread_;
+ std::mutex mutex_;
+ std::condition_variable cond_;
+};
+
+} // namespace internal
+} // namespace rpc_port
+
+#endif // MESSAGE_SENDING_THREAD_HH_
#include <aul_rpc_port.h>
#include <dlog.h>
-#include <glib-unix.h>
#include <poll.h>
#include <stdio.h>
#include <stdlib.h>
#include "include/rpc-port.h"
#include "log-private.hh"
+#include "message_sending_thread.hh"
#include "port-internal.hh"
#define MAX_CNT 100
return PORT_STATUS_ERROR_NONE;
}
-gboolean Port::OnEventReceived(gint fd, GIOCondition cond,
- gpointer user_data) {
- Port* port = static_cast<Port*>(user_data);
+gboolean Port::OnEventReceived(GIOChannel* io, GIOCondition condition,
+ gpointer data) {
+ auto* ptr = static_cast<std::weak_ptr<Port>*>(data);
+ auto port = ptr->lock();
+ if (port == nullptr) {
+ _E("port is destructed");
+ return G_SOURCE_CONTINUE;
+ }
+
std::lock_guard<std::recursive_mutex> lock(port->mutex_);
- int ret;
if (port->queue_.empty()) {
- port->delay_src_id_ = 0;
- port->delayed_message_size_ = 0;
- return G_SOURCE_REMOVE;
+ port->IgnoreIOEvent();
+ return G_SOURCE_CONTINUE;
}
- ret = port->PopDelayedMessage();
- if (ret == PORT_STATUS_ERROR_IO_ERROR)
- return G_SOURCE_REMOVE;
+ if (port->source_ == nullptr || g_source_is_destroyed(port->source_)) {
+ _E("GSource(%p) is destroyed", port->source_);
+ port->IgnoreIOEvent();
+ return G_SOURCE_CONTINUE;
+ }
+ port->PopDelayedMessage();
return G_SOURCE_CONTINUE;
}
while (queue_.empty() == false)
queue_.pop();
- if (delay_src_id_ != 0) {
- g_source_remove(delay_src_id_);
- delay_src_id_ = 0;
+ IgnoreIOEvent();
+ delayed_message_size_ = 0;
+}
+
+void Port::IgnoreIOEvent() {
+ std::lock_guard<std::recursive_mutex> lock(mutex_);
+ if (source_ != nullptr && !g_source_is_destroyed(source_))
+ g_source_destroy(source_);
+
+ source_ = nullptr;
+
+ if (channel_ != nullptr) {
+ g_io_channel_unref(channel_);
+ channel_ = nullptr;
+ }
+}
+
+int Port::ListenIOEvent() {
+ std::lock_guard<std::recursive_mutex> lock(mutex_);
+ channel_ = g_io_channel_unix_new(fd_);
+ if (channel_ == nullptr) {
+ _E("Failed to create GIOChannel");
+ return RPC_PORT_ERROR_OUT_OF_MEMORY;
+ }
+
+ source_ = g_io_create_watch(channel_, static_cast<GIOCondition>(G_IO_OUT));
+ if (source_ == nullptr) {
+ _E("Failed to create GSource");
+ IgnoreIOEvent();
+ return RPC_PORT_ERROR_OUT_OF_MEMORY;
}
- delayed_message_size_ = 0;
+ auto* ptr = new (std::nothrow) std::weak_ptr<Port>(shared_from_this());
+ g_source_set_callback(source_, reinterpret_cast<GSourceFunc>(OnEventReceived),
+ static_cast<gpointer>(ptr), [](gpointer ptr) {
+ auto* port = static_cast<std::weak_ptr<Port>*>(ptr);
+ delete port;
+ });
+ g_source_set_priority(source_, G_PRIORITY_DEFAULT);
+ g_source_attach(source_, MessageSendingThread::GetInst().GetContext());
+ g_source_unref(source_);
+
+ return RPC_PORT_ERROR_NONE;
}
int Port::PopDelayedMessage() {
}
int Port::PushDelayedMessage(std::shared_ptr<DelayMessage> dm) {
+ std::lock_guard<std::recursive_mutex> lock(mutex_);
if (queue_.empty()) {
- delay_src_id_ = g_unix_fd_add(fd_, G_IO_OUT, OnEventReceived, this);
- if (delay_src_id_ == 0) {
- _E("Failed to add watch on socket");
- return RPC_PORT_ERROR_IO_ERROR;
- }
+ int ret = ListenIOEvent();
+ if (ret != RPC_PORT_ERROR_NONE)
+ return ret;
}
delayed_message_size_ += dm->GetOriginalSize();
#define PORT_INTERNAL_HH_
#include <gio/gio.h>
+#include <glib.h>
#include <algorithm>
#include <atomic>
namespace rpc_port {
namespace internal {
-class Port {
+class Port : public std::enable_shared_from_this<Port> {
public:
Port(int fd, std::string id, std::string instance);
Port(int fd, std::string id);
private:
bool CanWrite();
+ void IgnoreIOEvent();
+ int ListenIOEvent();
class DelayMessage {
public:
int PushDelayedMessage(std::shared_ptr<DelayMessage> dm);
int PopDelayedMessage();
- static gboolean OnEventReceived(gint fd,
- GIOCondition cond, gpointer user_data);
+ static gboolean OnEventReceived(GIOChannel* io,
+ GIOCondition condition, gpointer data);
void ClearQueue();
int fd_;
mutable std::recursive_mutex mutex_;
std::queue<std::shared_ptr<DelayMessage>> queue_;
int delayed_message_size_ = 0;
- int delay_src_id_ = 0;
+ GIOChannel* channel_ = nullptr;
+ GSource* source_ = nullptr;
};
} // namespace internal