From 8363a26c15beeaa473d5a5e3fb79e4b3194e1c49 Mon Sep 17 00:00:00 2001 From: Hwankyu Jhun Date: Thu, 28 Oct 2021 14:14:49 +0900 Subject: [PATCH] Separate locking mutex Currently, the rpc_port_parcel_create_from_port() function and the rpc_port_parcel_send() function use the mutex of the Port to locking the mutex. After this patch is applied, a new mutex is added to the Port to avoid problems that cannot be written. And, the maximum queue size of the delayed messages is increased to 10MB. Change-Id: I048adec69643b6cd975f7b2c3345e4babfb4a222 Signed-off-by: Hwankyu Jhun --- src/port-internal.cc | 43 +++++++++++++++++++++++-------------------- src/port-internal.hh | 1 + 2 files changed, 24 insertions(+), 20 deletions(-) diff --git a/src/port-internal.cc b/src/port-internal.cc index aaccee8..aba4772 100644 --- a/src/port-internal.cc +++ b/src/port-internal.cc @@ -35,7 +35,7 @@ namespace rpc_port { namespace internal { namespace { -constexpr const int QUEUE_SIZE_MAX = 1024 * 1024; +constexpr const int QUEUE_SIZE_MAX = 1024 * 1024 * 10; constexpr const int MAX_RETRY_CNT = 10; constexpr const int MAX_TIMEOUT = 1000; constexpr const int MIN_TIMEOUT = 50; @@ -77,6 +77,7 @@ Port::Port(int fd, std::string id, std::string instance) : fd_(fd), id_(std::move(id)), instance_(std::move(instance)), seq_(0) {} Port::~Port() { + std::lock_guard lock(mutex_); ClearQueue(); Disconnect(); } @@ -84,7 +85,7 @@ Port::~Port() { void Port::Disconnect() { IgnoreIOEvent(); - std::lock_guard lock(mutex_); + std::lock_guard lock(rw_mutex_); if (fd_ > 0) { _W("Close fd(%d)", fd_); close(fd_); @@ -123,18 +124,22 @@ int Port::Read(void* buf, unsigned int size) { int timeout = MIN_TIMEOUT; int fd; - mutex_.lock(); - fd = fd_; - mutex_.unlock(); + { + std::lock_guard lock(rw_mutex_); + fd = fd_; + } + if (fd < 0 || fd >= sysconf(_SC_OPEN_MAX)) { _E("Invalid fd(%d)", fd); return RPC_PORT_ERROR_IO_ERROR; } while (left) { - mutex_.lock(); - nb = read(fd_, buffer, left); - mutex_.unlock(); + { + std::lock_guard lock(rw_mutex_); + nb = read(fd_, buffer, left); + } + if (nb == 0) { _E("read_socket: ...read EOF, socket closed %d: nb %zd\n", fd, nb); return RPC_PORT_ERROR_IO_ERROR; @@ -143,9 +148,7 @@ int Port::Read(void* buf, unsigned int size) { bool can_read = false; while (!can_read && max_timeout > 0) { auto start = std::chrono::steady_clock::now(); - mutex_.lock(); can_read = CanRead(timeout); - mutex_.unlock(); auto end = std::chrono::steady_clock::now(); auto elapsed_time = std::chrono::duration_cast( @@ -212,7 +215,7 @@ bool Port::CanWrite() { int Port::Write(const void* buf, unsigned int size) { int sent_bytes = 0; int ret; - std::lock_guard lock(mutex_); + std::lock_guard lock(rw_mutex_); if (queue_.empty()) { ret = Write(buf, size, &sent_bytes); @@ -296,7 +299,7 @@ gboolean Port::OnEventReceived(GIOChannel* io, GIOCondition condition, return G_SOURCE_CONTINUE; } - std::lock_guard lock(port->mutex_); + std::lock_guard lock(port->rw_mutex_); if (port->queue_.empty()) { port->IgnoreIOEvent(); return G_SOURCE_CONTINUE; @@ -313,7 +316,7 @@ gboolean Port::OnEventReceived(GIOChannel* io, GIOCondition condition, } void Port::ClearQueue() { - std::lock_guard lock(mutex_); + std::lock_guard lock(rw_mutex_); while (queue_.empty() == false) queue_.pop(); @@ -323,7 +326,7 @@ void Port::ClearQueue() { } void Port::IgnoreIOEvent() { - std::lock_guard lock(mutex_); + std::lock_guard lock(rw_mutex_); if (source_ != nullptr && !g_source_is_destroyed(source_)) g_source_destroy(source_); @@ -336,7 +339,7 @@ void Port::IgnoreIOEvent() { } int Port::ListenIOEvent() { - std::lock_guard lock(mutex_); + std::lock_guard lock(rw_mutex_); channel_ = g_io_channel_unix_new(fd_); if (channel_ == nullptr) { _E("Failed to create GIOChannel"); @@ -365,7 +368,7 @@ int Port::ListenIOEvent() { int Port::PopDelayedMessage() { int sent_bytes = 0; - std::lock_guard lock(mutex_); + std::lock_guard lock(rw_mutex_); auto dm = queue_.front(); int ret = Write(dm->GetMessage(), dm->GetSize(), &sent_bytes); @@ -378,13 +381,13 @@ int Port::PopDelayedMessage() { queue_.pop(); } - _W("cache : count(%zu), delayed_message_size (%d), ret(%d)", - queue_.size(), delayed_message_size_, ret); + _W("cache : count(%zu), delayed_message_size(%d), ret(%d), sent_bytes(%d)", + queue_.size(), delayed_message_size_, ret, sent_bytes); return ret; } int Port::PushDelayedMessage(std::shared_ptr dm) { - std::lock_guard lock(mutex_); + std::lock_guard lock(rw_mutex_); if (queue_.empty()) { int ret = ListenIOEvent(); if (ret != RPC_PORT_ERROR_NONE) @@ -394,7 +397,7 @@ int Port::PushDelayedMessage(std::shared_ptr dm) { delayed_message_size_ += dm->GetOriginalSize(); queue_.push(dm); - _W("cache : count(%zu), delayed_message_size (%d)", + _W("cache : count(%zu), delayed_message_size(%d)", queue_.size(), delayed_message_size_); return RPC_PORT_ERROR_NONE; } diff --git a/src/port-internal.hh b/src/port-internal.hh index 853b27f..8924a8a 100644 --- a/src/port-internal.hh +++ b/src/port-internal.hh @@ -104,6 +104,7 @@ class Port : public std::enable_shared_from_this { std::string instance_; std::atomic seq_; mutable std::recursive_mutex mutex_; + mutable std::recursive_mutex rw_mutex_; std::queue> queue_; int delayed_message_size_ = 0; GIOChannel* channel_ = nullptr; -- 2.7.4