Apply delayed message queue 35/237335/4
authorJusung Son <jusung07.son@samsung.com>
Mon, 29 Jun 2020 07:52:26 +0000 (16:52 +0900)
committerJusung Son <jusung07.son@samsung.com>
Thu, 2 Jul 2020 07:52:59 +0000 (16:52 +0900)
Change-Id: I2f450be650e7cbf9ecb12404a79437ae0bfd765e
Signed-off-by: Jusung Son <jusung07.son@samsung.com>
src/port-internal.cc
src/port-internal.h

index 6e653a237948505847437c32e29c7591ff3026c1..54605eb86a790353cc6990e55979abc7fb8f508d 100644 (file)
@@ -26,6 +26,7 @@
 #include <uuid/uuid.h>
 #include <dlog.h>
 #include <aul_rpc_port.h>
+#include <glib-unix.h>
 
 #include "rpc-port.h"
 #include "port-internal.h"
 #define MAX_SLEEP 100
 #define MIN_SLEEP 5
 #define BASE_SLEEP 1000 * 1000
+#define QUEUE_SIZE_MAX (1024 * 1024) /* 1MB */
+#define MAX_RETRY_CNT 10
 
 namespace rpc_port {
 namespace internal {
 
+Port::DelayMessage::DelayMessage(const char* msg, int index, int size)
+    : message_(msg, msg + size), index_(index), size_(size) {
+}
+
+void Port::DelayMessage::SetIndex(int index) {
+  index_ += index;
+}
+
+int Port::DelayMessage::GetSize() {
+  return size_ - index_;
+}
+
+int Port::DelayMessage::GetOriginalSize() {
+  return size_;
+}
+
+char* Port::DelayMessage::GetMessage() {
+  char* ptr = reinterpret_cast<char*>(message_.data());
+  ptr += index_;
+  return ptr;
+}
+
 Port::Port(int fd, std::string id)
     : fd_(fd), id_(std::move(id)), instance_(""), seq_(0) {
   char uuid[37];
@@ -57,6 +82,7 @@ Port::Port(int fd, std::string id, std::string instance)
     : fd_(fd), id_(std::move(id)), instance_(std::move(instance)), seq_(0) {}
 
 Port::~Port() {
+  ClearQueue();
   close(fd_);
 }
 
@@ -130,69 +156,137 @@ int Port::Read(void* buf, unsigned int size) {
 }
 
 int Port::Write(const void* buf, unsigned int size) {
-  unsigned int left = size;
-  ssize_t nb;
-  struct timespec TRY_SLEEP_TIME = { 0, MIN_SLEEP * BASE_SLEEP };
-  struct pollfd fds[1];
+  int sent_bytes = 0;
   int ret;
-  int bytes_write = 0;
-  const char* buffer = static_cast<const char*>(buf);
-  int max_timeout = MAX_CNT * MAX_SLEEP; /* 10 sec */
-  struct timespec start_time = { 0, };
-  struct timespec end_time = { 0, };
   std::lock_guard<std::recursive_mutex> lock(mutex_);
 
-  if (fd_ < 0 || fd_ >= sysconf(_SC_OPEN_MAX)) {
-    LOGE("Invalid fd(%d)", fd_);
-    return RPC_PORT_ERROR_IO_ERROR;
+  if (queue_.empty()) {
+    ret = Write(buf, size, &sent_bytes);
+    if (ret == PORT_STATUS_ERROR_NONE)
+      return RPC_PORT_ERROR_NONE;
+    else if (ret == PORT_STATUS_ERROR_IO_ERROR)
+      return RPC_PORT_ERROR_IO_ERROR;
   }
 
-  fds[0].fd = fd_;
-  fds[0].events = POLLOUT;
-  fds[0].revents = 0;
-
-  clock_gettime(CLOCK_MONOTONIC, &start_time);
-  ret = poll(fds, 1, MAX_SLEEP * MAX_CNT);
-  clock_gettime(CLOCK_MONOTONIC, &end_time);
-  if (ret == 0) {
-    LOGE("write_socket: : fd %d poll timeout", fd_);
+  if (delayed_message_size_ > QUEUE_SIZE_MAX) {
+    LOGE("cache fail : delayed_message_size (%d), count(%d)",
+                            delayed_message_size_, queue_.size());
     return RPC_PORT_ERROR_IO_ERROR;
   }
 
-  max_timeout -= (((end_time.tv_sec - start_time.tv_sec) * 1000) +
-      ((end_time.tv_nsec - start_time.tv_nsec) / (BASE_SLEEP)));
-  if (max_timeout <= 0) {
-    LOGE("write_socket: ...timed out fd %d: errno %d", fd_, errno);
-    return RPC_PORT_ERROR_IO_ERROR;
+  ret = PushDelayedMessage(
+          std::make_shared<DelayMessage>(static_cast<const char*>(buf),
+          sent_bytes, size));
+  return ret;
+}
+
+int Port::Write(const void* buf, unsigned int size, int* sent_bytes) {
+  unsigned int left = size;
+  ssize_t nb;
+  int retry_cnt = 0;
+  const char* buffer = static_cast<const char*>(buf);
+
+  if (fd_ < 0 || fd_ >= sysconf(_SC_OPEN_MAX)) {
+    LOGE("Invalid fd(%d)", fd_);
+    return PORT_STATUS_ERROR_IO_ERROR;
   }
 
-  while (left) {
+  while (left && (retry_cnt < MAX_RETRY_CNT)) {
     nb = send(fd_, buffer, left, MSG_NOSIGNAL);
     if (nb == -1) {
-      if (errno == EINTR || errno == EAGAIN || errno == EWOULDBLOCK) {
-        LOGI("write_socket: %d errno, sleep and retry ...", errno);
-        nanosleep(&TRY_SLEEP_TIME, 0);
-        max_timeout -= (TRY_SLEEP_TIME.tv_nsec / (BASE_SLEEP));
-        if (max_timeout <= 0) {
-          LOGE("write_socket: ...timed out fd %d: errno %d", fd_, errno);
-          return RPC_PORT_ERROR_IO_ERROR;
-        }
-        TRY_SLEEP_TIME.tv_nsec *= 2;
-        if (TRY_SLEEP_TIME.tv_nsec > (MAX_SLEEP * BASE_SLEEP))
-          TRY_SLEEP_TIME.tv_nsec = MAX_SLEEP * BASE_SLEEP;
+      if (errno == EINTR) {
+        LOGI("write_socket: EINTR continue ...");
+        retry_cnt++;
         continue;
       }
 
+    if (errno == EAGAIN || errno == EWOULDBLOCK)
+      return PORT_STATUS_ERROR_RESOURCE_UNAVAILABLE;
+
       LOGE("write_socket: ...error fd %d: errno %d\n", fd_, errno);
-      return RPC_PORT_ERROR_IO_ERROR;
+      return PORT_STATUS_ERROR_IO_ERROR;
     }
 
     left -= nb;
     buffer += nb;
-    bytes_write += nb;
-    TRY_SLEEP_TIME.tv_nsec = MIN_SLEEP * BASE_SLEEP;
+    *sent_bytes += nb;
+  }
+
+  if (left != 0) {
+    LOGE("error fd %d: retry_cnt %d", fd_, retry_cnt);
+    return PORT_STATUS_ERROR_IO_ERROR;
+  }
+
+  return PORT_STATUS_ERROR_NONE;
+}
+
+gboolean Port::OnEventReceived(gint fd, GIOCondition cond,
+                               gpointer user_data) {
+  Port* port = static_cast<Port*>(user_data);
+  std::lock_guard<std::recursive_mutex> lock(port->mutex_);
+  int ret;
+  if (port->queue_.empty()) {
+    port->delay_src_id_ = 0;
+    port->delayed_message_size_ = 0;
+    return G_SOURCE_REMOVE;
   }
 
+  ret = port->PopDelayedMessage();
+  if (ret == PORT_STATUS_ERROR_IO_ERROR)
+    return G_SOURCE_REMOVE;
+
+  return G_SOURCE_CONTINUE;
+}
+
+void Port::ClearQueue() {
+  std::lock_guard<std::recursive_mutex> lock(mutex_);
+
+  while(queue_.empty() == false)
+    queue_.pop();
+
+  if (delay_src_id_ != 0) {
+    g_source_remove(delay_src_id_);
+    delay_src_id_ = 0;
+  }
+
+  delayed_message_size_ = 0;
+}
+
+int Port::PopDelayedMessage() {
+  int sent_bytes = 0;
+  int ret;
+  std::lock_guard<std::recursive_mutex> lock(mutex_);
+  auto dm = queue_.front();
+
+  ret = Write(dm->GetMessage(), dm->GetSize(), &sent_bytes);
+  if (ret == PORT_STATUS_ERROR_RESOURCE_UNAVAILABLE) {
+    dm->SetIndex(sent_bytes);
+  } else if(ret == PORT_STATUS_ERROR_IO_ERROR) {
+    ClearQueue();
+  } else {
+    delayed_message_size_ -= dm->GetOriginalSize();
+    queue_.pop();
+  }
+
+  LOGW("cache : count(%d), delayed_message_size (%d), ret(%d)",
+      queue_.size(), delayed_message_size_, ret);
+  return ret;
+}
+
+int Port::PushDelayedMessage(std::shared_ptr<DelayMessage> dm) {
+  if (queue_.empty()) {
+    delay_src_id_ = g_unix_fd_add(fd_, G_IO_OUT, OnEventReceived, this);
+    if (delay_src_id_ == 0) {
+      LOGE("Failed to add watch on socket");
+      return RPC_PORT_ERROR_IO_ERROR;
+    }
+  }
+
+  delayed_message_size_ += dm->GetOriginalSize();
+  queue_.push(dm);
+
+  LOGW("cache : count(%d), delayed_message_size (%d)",
+      queue_.size(), delayed_message_size_);
   return RPC_PORT_ERROR_NONE;
 }
 
index 5c4fb1f51003f003ecd19b855a86782590ed82ab..4cab83b24f74ac1e382fec2600feeab5eed842f2 100644 (file)
 #include <string>
 #include <memory>
 #include <atomic>
+#include <vector>
+#include <queue>
+#include <algorithm>
+#include <gio/gio.h>
 
 namespace rpc_port {
 namespace internal {
@@ -38,6 +42,7 @@ class Port {
 
   int Read(void* buf, unsigned int size);
   int Write(const void* buf, unsigned int size);
+  int Write(const void* buf, unsigned int size, int* sent_bytes);
   int GetFd() const {
     return fd_;
   }
@@ -59,11 +64,41 @@ class Port {
   }
 
  private:
+   class DelayMessage {
+    public:
+     DelayMessage(const char* msg, int start_index, int size);
+     ~DelayMessage() = default;
+     void SetIndex(int index);
+     int GetSize();
+     int GetOriginalSize();
+     char* GetMessage();
+
+    private:
+     std::vector<unsigned char> message_;
+     int index_;
+     int size_;
+   };
+
+   enum PortStatus {
+    PORT_STATUS_ERROR_NONE,
+    PORT_STATUS_ERROR_IO_ERROR,
+    PORT_STATUS_ERROR_RESOURCE_UNAVAILABLE
+  };
+
+  int PushDelayedMessage(std::shared_ptr<DelayMessage> dm);
+  int PopDelayedMessage();
+  static gboolean OnEventReceived(gint fd,
+                                  GIOCondition cond, gpointer user_data);
+  void ClearQueue();
+
   int fd_;
   std::string id_;
   std::string instance_;
   std::atomic<uint32_t> seq_;
   mutable std::recursive_mutex mutex_;
+  std::queue<std::shared_ptr<DelayMessage>> queue_;
+  int delayed_message_size_ = 0;
+  int delay_src_id_ = 0;
 };
 
 }  // namespace internal