Separate locking mutex 64/265864/1
authorHwankyu Jhun <h.jhun@samsung.com>
Mon, 1 Nov 2021 06:05:26 +0000 (15:05 +0900)
committerHwankyu Jhun <h.jhun@samsung.com>
Mon, 1 Nov 2021 06:05:26 +0000 (15:05 +0900)
Currently, the rpc_port_parcel_create_from_port() function and the
rpc_port_parcel_send() function use the mutex of the Port to locking the mutex.
After this patch is applied, a new mutex is added to the Port to avoid problems
that cannot be written.
And, the maximum queue size of the delayed messages is increased to 10MB.

Change-Id: If2fd5c5208ee8f0d44389107b915f485f45ac0c6
Signed-off-by: Hwankyu Jhun <h.jhun@samsung.com>
src/port-internal.cc
src/port-internal.h

index a33698a2c560a90eba299af97b104d7a66c1f5fd..f8aca3f155a9362c354e7a107e3bbce80815ceee 100644 (file)
@@ -33,7 +33,7 @@
 #define MAX_SLEEP 100
 #define MIN_SLEEP 5
 #define BASE_SLEEP 1000 * 1000
-#define QUEUE_SIZE_MAX (1024 * 1024) /* 1MB */
+#define QUEUE_SIZE_MAX (1024 * 1024 * 10) /* 10 MB */
 #define MAX_RETRY_CNT 10
 
 namespace rpc_port {
@@ -74,6 +74,7 @@ Port::Port(int fd, std::string id, std::string instance)
     : fd_(fd), id_(std::move(id)), instance_(std::move(instance)), seq_(0) {}
 
 Port::~Port() {
+  std::lock_guard<std::recursive_mutex> lock(mutex_);
   ClearQueue();
   close(fd_);
 }
@@ -101,7 +102,7 @@ int Port::UnsetPrivateSharing() {
 }
 
 void Port::Disconnect() {
-  std::lock_guard<std::recursive_mutex> lock(mutex_);
+  std::lock_guard<std::recursive_mutex> lock(rw_mutex_);
   if (fd_ > 0) {
     _W("Close fd(%d)", fd_);
     close(fd_);
@@ -119,7 +120,7 @@ int Port::Read(void* buf, unsigned int size) {
   int fd;
 
   {
-    std::lock_guard<std::recursive_mutex> lock(mutex_);
+    std::lock_guard<std::recursive_mutex> lock(rw_mutex_);
     fd = fd_;
     if (fd < 0 || fd >= sysconf(_SC_OPEN_MAX)) {
       _E("Invalid fd(%d)", fd);
@@ -129,7 +130,7 @@ int Port::Read(void* buf, unsigned int size) {
 
   while (left) {
     {
-      std::lock_guard<std::recursive_mutex> lock(mutex_);
+      std::lock_guard<std::recursive_mutex> lock(rw_mutex_);
       nb = read(fd, buffer, left);
     }
 
@@ -167,7 +168,7 @@ int Port::Read(void* buf, unsigned int size) {
 int Port::Write(const void* buf, unsigned int size) {
   int sent_bytes = 0;
   int ret;
-  std::lock_guard<std::recursive_mutex> lock(mutex_);
+  std::lock_guard<std::recursive_mutex> lock(rw_mutex_);
 
   if (queue_.empty()) {
     ret = Write(buf, size, &sent_bytes);
@@ -238,7 +239,7 @@ gboolean Port::OnEventReceived(GIOChannel* io, GIOCondition condition,
     return G_SOURCE_CONTINUE;
   }
 
-  std::lock_guard<std::recursive_mutex> lock(port->mutex_);
+  std::lock_guard<std::recursive_mutex> lock(port->rw_mutex_);
   if (port->queue_.empty()) {
     port->IgnoreIOEvent();
     return G_SOURCE_CONTINUE;
@@ -255,7 +256,7 @@ gboolean Port::OnEventReceived(GIOChannel* io, GIOCondition condition,
 }
 
 void Port::ClearQueue() {
-  std::lock_guard<std::recursive_mutex> lock(mutex_);
+  std::lock_guard<std::recursive_mutex> lock(rw_mutex_);
 
   while(queue_.empty() == false)
     queue_.pop();
@@ -265,7 +266,7 @@ void Port::ClearQueue() {
 }
 
 void Port::IgnoreIOEvent() {
-  std::lock_guard<std::recursive_mutex> lock(mutex_);
+  std::lock_guard<std::recursive_mutex> lock(rw_mutex_);
   if (source_ != nullptr && !g_source_is_destroyed(source_))
     g_source_destroy(source_);
 
@@ -278,7 +279,7 @@ void Port::IgnoreIOEvent() {
 }
 
 int Port::ListenIOEvent() {
-  std::lock_guard<std::recursive_mutex> lock(mutex_);
+  std::lock_guard<std::recursive_mutex> lock(rw_mutex_);
   channel_ = g_io_channel_unix_new(fd_);
   if (channel_ == nullptr) {
     _E("Failed to create GIOChannel");
@@ -308,7 +309,7 @@ int Port::ListenIOEvent() {
 int Port::PopDelayedMessage() {
   int sent_bytes = 0;
   int ret;
-  std::lock_guard<std::recursive_mutex> lock(mutex_);
+  std::lock_guard<std::recursive_mutex> lock(rw_mutex_);
   auto dm = queue_.front();
 
   ret = Write(dm->GetMessage(), dm->GetSize(), &sent_bytes);
@@ -321,13 +322,13 @@ int Port::PopDelayedMessage() {
     queue_.pop();
   }
 
-  _W("cache : count(%zu), delayed_message_size (%d), ret(%d)",
-      queue_.size(), delayed_message_size_, ret);
+  _W("cache : count(%zu), delayed_message_size(%d), ret(%d), sent_bytes(%d)",
+      queue_.size(), delayed_message_size_, ret, sent_bytes);
   return ret;
 }
 
 int Port::PushDelayedMessage(std::shared_ptr<DelayMessage> dm) {
-  std::lock_guard<std::recursive_mutex> lock(mutex_);
+  std::lock_guard<std::recursive_mutex> lock(rw_mutex_);
   if (queue_.empty()) {
     int ret = ListenIOEvent();
     if (ret != RPC_PORT_ERROR_NONE)
@@ -337,7 +338,7 @@ int Port::PushDelayedMessage(std::shared_ptr<DelayMessage> dm) {
   delayed_message_size_ += dm->GetOriginalSize();
   queue_.push(dm);
 
-  _W("cache : count(%zu), delayed_message_size (%d)",
+  _W("cache : count(%zu), delayed_message_size(%d)",
       queue_.size(), delayed_message_size_);
   return RPC_PORT_ERROR_NONE;
 }
index 948131658f76d1617c48eab9e93d8224ab69dc77..4743df91bb75c1a7ebafae6f765601e2aa54dce8 100644 (file)
@@ -101,6 +101,7 @@ class Port : public std::enable_shared_from_this<Port> {
   std::string instance_;
   std::atomic<uint32_t> seq_;
   mutable std::recursive_mutex mutex_;
+  mutable std::recursive_mutex rw_mutex_;
   std::queue<std::shared_ptr<DelayMessage>> queue_;
   int delayed_message_size_ = 0;
   GIOChannel* channel_ = nullptr;