Fix Port::Read() method 66/308666/3
authorHwankyu Jhun <h.jhun@samsung.com>
Thu, 28 Mar 2024 11:58:36 +0000 (20:58 +0900)
committerHwanKyu Jhun <h.jhun@samsung.com>
Thu, 28 Mar 2024 23:07:29 +0000 (23:07 +0000)
While calling Read() method, the fd is changed to blocking mode.
Before returning the method, the fd is changed to nonblocking mode.

Change-Id: I251855505e90804ee08a0f129cf9b95f198decd7
Signed-off-by: Hwankyu Jhun <h.jhun@samsung.com>
src/port-internal.cc
src/port-internal.hh

index ecb6aea..a8c6ec7 100644 (file)
@@ -16,6 +16,7 @@
 
 #include <aul_rpc_port.h>
 #include <dlog.h>
+#include <fcntl.h>
 #include <poll.h>
 #include <stdio.h>
 #include <stdlib.h>
@@ -73,10 +74,13 @@ Port::Port(int fd, std::string id)
   uuid_generate(u);
   uuid_unparse(u, uuid);
   instance_ = std::string(uuid) + ":" + id_;
+  SetReceiveTimeout(10000);
 }
 
 Port::Port(int fd, std::string id, std::string instance)
-    : fd_(fd), id_(std::move(id)), instance_(std::move(instance)), seq_(0) {}
+    : fd_(fd), id_(std::move(id)), instance_(std::move(instance)), seq_(0) {
+  SetReceiveTimeout(10000);
+}
 
 Port::~Port() {
   std::lock_guard<std::recursive_mutex> lock(mutex_);
@@ -123,84 +127,69 @@ int Port::Read(void* buf, unsigned int size) {
   ssize_t nb;
   int bytes_read = 0;
   char* buffer = static_cast<char*>(buf);
-  int max_timeout = MAX_TIMEOUT * MAX_RETRY_CNT;
-  int timeout = MIN_TIMEOUT;
-  int fd;
-
-  {
-    std::lock_guard<std::recursive_mutex> lock(rw_mutex_);
-    fd = fd_;
-  }
+  int flags;
 
-  if (fd < 0 || fd >= sysconf(_SC_OPEN_MAX)) {
-    _E("Invalid fd(%d)", fd);  // LCOV_EXCL_LINE
+  std::lock_guard<std::recursive_mutex> lock(rw_mutex_);
+  if (fd_ < 0 || fd_ >= sysconf(_SC_OPEN_MAX)) {
+    _E("Invalid fd(%d)", fd_);  // LCOV_EXCL_LINE
     return RPC_PORT_ERROR_IO_ERROR;  // LCOV_EXCL_LINE
   }
 
-  while (left) {
-    {
-      std::lock_guard<std::recursive_mutex> lock(rw_mutex_);
-      nb = read(fd_, buffer, left);
-    }
+  flags = fcntl(fd_, F_GETFL, 0);
+  fcntl(fd_, F_SETFL, flags & ~O_NONBLOCK);
 
+  while (left) {
+    nb = read(fd_, buffer, left);
     if (nb == 0) {
-      _E("read_socket: ...read EOF, socket closed %d: nb %zd\n", fd, nb);
+      _E("read_socket: ...read EOF, socket closed %d: nb %zd\n", fd_, nb);
+      fcntl(fd_, F_SETFL, flags);
       return RPC_PORT_ERROR_IO_ERROR;
-    } else if (nb == -1) {
-      if (errno == EINTR || errno == EAGAIN || errno == EWOULDBLOCK) {
-        bool can_read = false;
-        while (!can_read && max_timeout > 0) {
-          auto start = std::chrono::steady_clock::now();
-          can_read = CanRead(timeout);
-          auto end = std::chrono::steady_clock::now();
-          auto elapsed_time =
-              std::chrono::duration_cast<std::chrono::milliseconds>(
-                  end - start);
-
-          max_timeout -= elapsed_time.count();
-
-          timeout *= 2;
-          if (timeout > MAX_TIMEOUT)
-            timeout = MAX_TIMEOUT;
-        }
-
-        if (!can_read) {
-          _E("read_socket: ...timed out fd %d: errno %d", fd, errno);  // LCOV_EXCL_LINE
-          return RPC_PORT_ERROR_IO_ERROR;  // LCOV_EXCL_LINE
-        }
+    }
 
-        continue;
-      }
+    if (nb == -1) {
+      if (errno == EINTR) continue;
 
-      _E("read_socket: ...error fd %d: errno %d\n", fd, errno);
+      _E("read_socket: ...error fd %d: errno %d\n", fd_, errno);
+      fcntl(fd_, F_SETFL, flags);
       return RPC_PORT_ERROR_IO_ERROR;
     }
 
     left -= nb;
     buffer += nb;
     bytes_read += nb;
-    timeout = MIN_TIMEOUT;
   }
 
+  fcntl(fd_, F_SETFL, flags);
   return RPC_PORT_ERROR_NONE;
 }
 
-bool Port::CanRead(int timeout) {
-  struct pollfd fds[1];
-  fds[0].fd = fd_;
-  fds[0].events = POLLIN;
-  fds[0].revents = 0;
-  int ret = poll(fds, 1, timeout);
-  if (ret <= 0) {
-    _W("poll() is failed. fd(%d), error(%s)",
-        fd_, ret == 0 ? "timed out" : std::to_string(-errno).c_str());
-    return false;
+// LCOV_EXCL_START
+int Port::SetReceiveTimeout(int timeout) {
+   if (timeout == INT_MAX)
+    return -EINVAL;
+
+  if (timeout == -1)
+    timeout = 10000;
+
+  if (timeout < 0) {
+    _E("Invalid parameter");
+    return -EINVAL;
   }
 
-  return true;
+  struct timeval tv = {
+    .tv_sec = static_cast<time_t>(timeout / 1000),
+    .tv_usec = static_cast<suseconds_t>((timeout % 1000) * 1000)
+  };
+  socklen_t len = static_cast<socklen_t>(sizeof(struct timeval));
+  int ret = setsockopt(fd_, SOL_SOCKET, SO_RCVTIMEO, &tv, len);
+  if (ret < 0) {
+    ret = -errno;
+    _E("setsockopt() is failed. errno(%d)", errno);
+  }
+
+  return ret;
 }
 
-// LCOV_EXCL_START
 bool Port::CanWrite() {
   struct pollfd fds[1];
   fds[0].fd = fd_;
index ee25d78..64d33ec 100644 (file)
@@ -68,7 +68,7 @@ class Port : public std::enable_shared_from_this<Port> {
 
  private:
   // LCOV_EXCL_START
-  bool CanRead(int timeout);
+  int SetReceiveTimeout(int timeout);
   bool CanWrite();
   void IgnoreIOEvent();
   int ListenIOEvent();