Fix Write() implemenation 81/253881/3
authorHwankyu Jhun <h.jhun@samsung.com>
Fri, 19 Feb 2021 04:09:02 +0000 (13:09 +0900)
committerHwankyu Jhun <h.jhun@samsung.com>
Fri, 19 Feb 2021 05:26:26 +0000 (14:26 +0900)
After queuing the message, rpc-port checks whether writing is now possible
or not.

Change-Id: I765a9b02f67755c2838fe81149d71817aa7f9763
Signed-off-by: Hwankyu Jhun <h.jhun@samsung.com>
src/port-internal.cc
src/port-internal.hh

index 0b61a27..1c939da 100644 (file)
@@ -158,6 +158,21 @@ int Port::Read(void* buf, unsigned int size) {
   return RPC_PORT_ERROR_NONE;
 }
 
+bool Port::CanWrite() {
+  struct pollfd fds[1];
+  fds[0].fd = fd_;
+  fds[0].events = POLLOUT;
+  fds[0].revents = 0;
+  int ret = poll(fds, 1, 100);
+  if (ret == 0 || ret < 0) {
+    _W("poll() is failed. fd(%d), error(%s)",
+        fd_, ret == 0 ? "timed out" : std::to_string(-errno).c_str());
+    return false;
+  }
+
+  return true;
+}
+
 int Port::Write(const void* buf, unsigned int size) {
   int sent_bytes = 0;
   int ret;
@@ -180,6 +195,19 @@ int Port::Write(const void* buf, unsigned int size) {
   ret = PushDelayedMessage(
           std::make_shared<DelayMessage>(static_cast<const char*>(buf),
           sent_bytes, size));
+
+  if (CanWrite()) {
+    while (!queue_.empty()) {
+      int port_status = PopDelayedMessage();
+      if (port_status != PORT_STATUS_ERROR_NONE) {
+        if (port_status == PORT_STATUS_ERROR_IO_ERROR)
+          return RPC_PORT_ERROR_IO_ERROR;
+
+        break;
+      }
+    }
+  }
+
   return ret;
 }
 
@@ -257,14 +285,13 @@ void Port::ClearQueue() {
 
 int Port::PopDelayedMessage() {
   int sent_bytes = 0;
-  int ret;
   std::lock_guard<std::recursive_mutex> lock(mutex_);
   auto dm = queue_.front();
 
-  ret = Write(dm->GetMessage(), dm->GetSize(), &sent_bytes);
+  int ret = Write(dm->GetMessage(), dm->GetSize(), &sent_bytes);
   if (ret == PORT_STATUS_ERROR_RESOURCE_UNAVAILABLE) {
     dm->SetIndex(sent_bytes);
-  } else if(ret == PORT_STATUS_ERROR_IO_ERROR) {
+  } else if (ret == PORT_STATUS_ERROR_IO_ERROR) {
     ClearQueue();
   } else {
     delayed_message_size_ -= dm->GetOriginalSize();
index 96c31a5..0286304 100644 (file)
@@ -65,6 +65,8 @@ class Port {
   }
 
  private:
+  bool CanWrite();
+
   class DelayMessage {
    public:
     DelayMessage(const char* msg, int start_index, int size);