Wait fd event to read data 76/264476/6
authorHwankyu Jhun <h.jhun@samsung.com>
Thu, 23 Sep 2021 02:02:57 +0000 (11:02 +0900)
committerHwankyu Jhun <h.jhun@samsung.com>
Thu, 23 Sep 2021 05:23:22 +0000 (14:23 +0900)
This patch uses poll() instead of nanosleep(). If the pol() returns a
positive number, the Read() method tries to read the data from the file
descriptor immediately.

Change-Id: I4d0f3e548ba038538a2463b17a5dbff64e4cf8a2
Signed-off-by: Hwankyu Jhun <h.jhun@samsung.com>
src/port-internal.cc
src/port-internal.hh

index 7e006a37032e3c429c8cb39ec2be91e70483b04f..c7c572682a0172a42aa3a5e17d93a21225d46934 100644 (file)
@@ -23,6 +23,7 @@
 #include <unistd.h>
 #include <uuid/uuid.h>
 
+#include <chrono>
 #include <utility>
 
 #include "include/rpc-port.h"
 #include "message-sending-thread-internal.hh"
 #include "port-internal.hh"
 
-#define MAX_CNT 100
-#define MAX_SLEEP 100
-#define MIN_SLEEP 5
-#define BASE_SLEEP 1000 * 1000
-#define QUEUE_SIZE_MAX (1024 * 1024) /* 1MB */
-#define MAX_RETRY_CNT 10
-
 namespace rpc_port {
 namespace internal {
+namespace {
+
+constexpr const int QUEUE_SIZE_MAX = 1024 * 1024;
+constexpr const int MAX_RETRY_CNT = 10;
+constexpr const int MAX_TIMEOUT = 1000;
+constexpr const int MIN_TIMEOUT = 50;
+
+}  // namespace
 
 Port::DelayMessage::DelayMessage(const char* msg, int index, int size)
     : message_(msg, msg + size), index_(index), size_(size) {
@@ -115,10 +117,10 @@ int Port::UnsetPrivateSharing() {
 int Port::Read(void* buf, unsigned int size) {
   unsigned int left = size;
   ssize_t nb;
-  struct timespec TRY_SLEEP_TIME = { 0, MIN_SLEEP * BASE_SLEEP};
   int bytes_read = 0;
   char* buffer = static_cast<char*>(buf);
-  int max_timeout = MAX_CNT * MAX_SLEEP; /* 10 sec */
+  int max_timeout = MAX_TIMEOUT * MAX_RETRY_CNT;
+  int timeout = MIN_TIMEOUT;
   int fd;
 
   mutex_.lock();
@@ -138,16 +140,30 @@ int Port::Read(void* buf, unsigned int size) {
       return RPC_PORT_ERROR_IO_ERROR;
     } else if (nb == -1) {
       if (errno == EINTR || errno == EAGAIN || errno == EWOULDBLOCK) {
-        LOGI("read_socket: %d errno, sleep and retry ...", errno);
-        nanosleep(&TRY_SLEEP_TIME, 0);
-        max_timeout -= (TRY_SLEEP_TIME.tv_nsec / (BASE_SLEEP));
-        if (max_timeout <= 0) {
+        LOGI("read_socket: %d errno, wait and retry ...", errno);
+        bool can_read = false;
+        while (!can_read && max_timeout > 0) {
+          auto start = std::chrono::steady_clock::now();
+          mutex_.lock();
+          can_read = CanRead(timeout);
+          mutex_.unlock();
+          auto end = std::chrono::steady_clock::now();
+          auto elapsed_time =
+              std::chrono::duration_cast<std::chrono::milliseconds>(
+                  end - start);
+
+          max_timeout -= elapsed_time.count();
+
+          timeout *= 2;
+          if (timeout > MAX_TIMEOUT)
+            timeout = MAX_TIMEOUT;
+        }
+
+        if (!can_read) {
           _E("read_socket: ...timed out fd %d: errno %d", fd, errno);
           return RPC_PORT_ERROR_IO_ERROR;
         }
-        TRY_SLEEP_TIME.tv_nsec *= 2;
-        if (TRY_SLEEP_TIME.tv_nsec > (MAX_SLEEP * BASE_SLEEP))
-          TRY_SLEEP_TIME.tv_nsec = MAX_SLEEP * BASE_SLEEP;
+
         continue;
       }
 
@@ -158,19 +174,34 @@ int Port::Read(void* buf, unsigned int size) {
     left -= nb;
     buffer += nb;
     bytes_read += nb;
-    TRY_SLEEP_TIME.tv_nsec = MIN_SLEEP * BASE_SLEEP;
+    timeout = MIN_TIMEOUT;
   }
 
   return RPC_PORT_ERROR_NONE;
 }
 
+bool Port::CanRead(int timeout) {
+  struct pollfd fds[1];
+  fds[0].fd = fd_;
+  fds[0].events = POLLIN;
+  fds[0].revents = 0;
+  int ret = poll(fds, 1, timeout);
+  if (ret <= 0) {
+    _W("poll() is failed. fd(%d), error(%s)",
+        fd_, ret == 0 ? "timed out" : std::to_string(-errno).c_str());
+    return false;
+  }
+
+  return true;
+}
+
 bool Port::CanWrite() {
   struct pollfd fds[1];
   fds[0].fd = fd_;
   fds[0].events = POLLOUT;
   fds[0].revents = 0;
   int ret = poll(fds, 1, 100);
-  if (ret == 0 || ret < 0) {
+  if (ret <= 0) {
     _W("poll() is failed. fd(%d), error(%s)",
         fd_, ret == 0 ? "timed out" : std::to_string(-errno).c_str());
     return false;
index ba8e66c3267f8c1b770878dbed1a00edebbc9e50..853b27fe6a008d0f7441b3396976bb40fc0e3500 100644 (file)
@@ -67,6 +67,7 @@ class Port : public std::enable_shared_from_this<Port> {
   }
 
  private:
+  bool CanRead(int timeout);
   bool CanWrite();
   void IgnoreIOEvent();
   int ListenIOEvent();