Separate locking mutex 49/265749/1
authorHwankyu Jhun <h.jhun@samsung.com>
Thu, 28 Oct 2021 05:14:49 +0000 (14:14 +0900)
committerHwankyu Jhun <h.jhun@samsung.com>
Thu, 28 Oct 2021 05:14:49 +0000 (14:14 +0900)
Currently, the rpc_port_parcel_create_from_port() function and the
rpc_port_parcel_send() function use the mutex of the Port to locking the mutex.
After this patch is applied, a new mutex is added to the Port to avoid problems
that cannot be written.
And, the maximum queue size of the delayed messages is increased to 10MB.

Change-Id: I048adec69643b6cd975f7b2c3345e4babfb4a222
Signed-off-by: Hwankyu Jhun <h.jhun@samsung.com>
src/port-internal.cc
src/port-internal.hh

index aaccee8..aba4772 100644 (file)
@@ -35,7 +35,7 @@ namespace rpc_port {
 namespace internal {
 namespace {
 
-constexpr const int QUEUE_SIZE_MAX = 1024 * 1024;
+constexpr const int QUEUE_SIZE_MAX = 1024 * 1024 * 10;
 constexpr const int MAX_RETRY_CNT = 10;
 constexpr const int MAX_TIMEOUT = 1000;
 constexpr const int MIN_TIMEOUT = 50;
@@ -77,6 +77,7 @@ Port::Port(int fd, std::string id, std::string instance)
     : fd_(fd), id_(std::move(id)), instance_(std::move(instance)), seq_(0) {}
 
 Port::~Port() {
+  std::lock_guard<std::recursive_mutex> lock(mutex_);
   ClearQueue();
   Disconnect();
 }
@@ -84,7 +85,7 @@ Port::~Port() {
 void Port::Disconnect() {
   IgnoreIOEvent();
 
-  std::lock_guard<std::recursive_mutex> lock(mutex_);
+  std::lock_guard<std::recursive_mutex> lock(rw_mutex_);
   if (fd_ > 0) {
     _W("Close fd(%d)", fd_);
     close(fd_);
@@ -123,18 +124,22 @@ int Port::Read(void* buf, unsigned int size) {
   int timeout = MIN_TIMEOUT;
   int fd;
 
-  mutex_.lock();
-  fd = fd_;
-  mutex_.unlock();
+  {
+    std::lock_guard<std::recursive_mutex> lock(rw_mutex_);
+    fd = fd_;
+  }
+
   if (fd < 0 || fd >= sysconf(_SC_OPEN_MAX)) {
     _E("Invalid fd(%d)", fd);
     return RPC_PORT_ERROR_IO_ERROR;
   }
 
   while (left) {
-    mutex_.lock();
-    nb = read(fd_, buffer, left);
-    mutex_.unlock();
+    {
+      std::lock_guard<std::recursive_mutex> lock(rw_mutex_);
+      nb = read(fd_, buffer, left);
+    }
+
     if (nb == 0) {
       _E("read_socket: ...read EOF, socket closed %d: nb %zd\n", fd, nb);
       return RPC_PORT_ERROR_IO_ERROR;
@@ -143,9 +148,7 @@ int Port::Read(void* buf, unsigned int size) {
         bool can_read = false;
         while (!can_read && max_timeout > 0) {
           auto start = std::chrono::steady_clock::now();
-          mutex_.lock();
           can_read = CanRead(timeout);
-          mutex_.unlock();
           auto end = std::chrono::steady_clock::now();
           auto elapsed_time =
               std::chrono::duration_cast<std::chrono::milliseconds>(
@@ -212,7 +215,7 @@ bool Port::CanWrite() {
 int Port::Write(const void* buf, unsigned int size) {
   int sent_bytes = 0;
   int ret;
-  std::lock_guard<std::recursive_mutex> lock(mutex_);
+  std::lock_guard<std::recursive_mutex> lock(rw_mutex_);
 
   if (queue_.empty()) {
     ret = Write(buf, size, &sent_bytes);
@@ -296,7 +299,7 @@ gboolean Port::OnEventReceived(GIOChannel* io, GIOCondition condition,
     return G_SOURCE_CONTINUE;
   }
 
-  std::lock_guard<std::recursive_mutex> lock(port->mutex_);
+  std::lock_guard<std::recursive_mutex> lock(port->rw_mutex_);
   if (port->queue_.empty()) {
     port->IgnoreIOEvent();
     return G_SOURCE_CONTINUE;
@@ -313,7 +316,7 @@ gboolean Port::OnEventReceived(GIOChannel* io, GIOCondition condition,
 }
 
 void Port::ClearQueue() {
-  std::lock_guard<std::recursive_mutex> lock(mutex_);
+  std::lock_guard<std::recursive_mutex> lock(rw_mutex_);
 
   while (queue_.empty() == false)
     queue_.pop();
@@ -323,7 +326,7 @@ void Port::ClearQueue() {
 }
 
 void Port::IgnoreIOEvent() {
-  std::lock_guard<std::recursive_mutex> lock(mutex_);
+  std::lock_guard<std::recursive_mutex> lock(rw_mutex_);
   if (source_ != nullptr && !g_source_is_destroyed(source_))
     g_source_destroy(source_);
 
@@ -336,7 +339,7 @@ void Port::IgnoreIOEvent() {
 }
 
 int Port::ListenIOEvent() {
-  std::lock_guard<std::recursive_mutex> lock(mutex_);
+  std::lock_guard<std::recursive_mutex> lock(rw_mutex_);
   channel_ = g_io_channel_unix_new(fd_);
   if (channel_ == nullptr) {
     _E("Failed to create GIOChannel");
@@ -365,7 +368,7 @@ int Port::ListenIOEvent() {
 
 int Port::PopDelayedMessage() {
   int sent_bytes = 0;
-  std::lock_guard<std::recursive_mutex> lock(mutex_);
+  std::lock_guard<std::recursive_mutex> lock(rw_mutex_);
   auto dm = queue_.front();
 
   int ret = Write(dm->GetMessage(), dm->GetSize(), &sent_bytes);
@@ -378,13 +381,13 @@ int Port::PopDelayedMessage() {
     queue_.pop();
   }
 
-  _W("cache : count(%zu), delayed_message_size (%d), ret(%d)",
-      queue_.size(), delayed_message_size_, ret);
+  _W("cache : count(%zu), delayed_message_size(%d), ret(%d), sent_bytes(%d)",
+      queue_.size(), delayed_message_size_, ret, sent_bytes);
   return ret;
 }
 
 int Port::PushDelayedMessage(std::shared_ptr<DelayMessage> dm) {
-  std::lock_guard<std::recursive_mutex> lock(mutex_);
+  std::lock_guard<std::recursive_mutex> lock(rw_mutex_);
   if (queue_.empty()) {
     int ret = ListenIOEvent();
     if (ret != RPC_PORT_ERROR_NONE)
@@ -394,7 +397,7 @@ int Port::PushDelayedMessage(std::shared_ptr<DelayMessage> dm) {
   delayed_message_size_ += dm->GetOriginalSize();
   queue_.push(dm);
 
-  _W("cache : count(%zu), delayed_message_size (%d)",
+  _W("cache : count(%zu), delayed_message_size(%d)",
       queue_.size(), delayed_message_size_);
   return RPC_PORT_ERROR_NONE;
 }
index 853b27f..8924a8a 100644 (file)
@@ -104,6 +104,7 @@ class Port : public std::enable_shared_from_this<Port> {
   std::string instance_;
   std::atomic<uint32_t> seq_;
   mutable std::recursive_mutex mutex_;
+  mutable std::recursive_mutex rw_mutex_;
   std::queue<std::shared_ptr<DelayMessage>> queue_;
   int delayed_message_size_ = 0;
   GIOChannel* channel_ = nullptr;