Modify Port implementation 25/315425/4
authorHwankyu Jhun <h.jhun@samsung.com>
Thu, 1 Aug 2024 01:10:55 +0000 (10:10 +0900)
committerHwankyu Jhun <h.jhun@samsung.com>
Thu, 1 Aug 2024 02:57:24 +0000 (11:57 +0900)
This patch separates the fd for use respectively for read & write purposes.

Change-Id: I35e51f6af0e91b0f830901a67dd815c01b1d3645
Signed-off-by: Hwankyu Jhun <h.jhun@samsung.com>
15 files changed:
src/rpc-port/accepted-port-internal.cc
src/rpc-port/accepted-port-internal.hh
src/rpc-port/debug-port-internal.cc
src/rpc-port/debug-port-internal.hh
src/rpc-port/port-internal.cc
src/rpc-port/port-internal.hh
src/rpc-port/proxy-internal.cc
src/rpc-port/proxy-internal.hh
src/rpc-port/proxy-port-internal.cc
src/rpc-port/proxy-port-internal.hh
src/rpc-port/rpc-port-internal.cc
src/rpc-port/rpc-port-parcel.cc
src/rpc-port/rpc-port.cc
src/rpc-port/stub-internal.cc
src/rpc-port/stub-internal.hh

index 3711e8e43072ae43759e3d88a2471b8ad93d116f..4ae733e302615eb4d72dc4349386343c6f9dcc4c 100644 (file)
 namespace rpc_port {
 namespace internal {
 
-AcceptedPort::AcceptedPort(int fd, std::string id, std::string inst,
-                           IEvent* listener, bool is_delegate)
-    : Port(fd, std::move(id), std::move(inst)),
+AcceptedPort::AcceptedPort(int read_fd, int write_fd, std::string id,
+                           std::string inst, IEvent* listener, bool is_delegate)
+    : Port(read_fd, write_fd, std::move(id), std::move(inst)),
       listener_(listener),
-      is_delegate_(is_delegate) {
-  Watch();
-}
+      is_delegate_(is_delegate) {}
 
-AcceptedPort::AcceptedPort(int fd, std::string id, IEvent* listener,
-                           bool is_delegate)
-    : Port(fd, std::move(id)), listener_(listener), is_delegate_(is_delegate) {
-  Watch();
-}
+AcceptedPort::AcceptedPort(int read_fd, int write_fd, std::string id,
+                           IEvent* listener, bool is_delegate)
+    : Port(read_fd, write_fd, std::move(id)),
+      listener_(listener),
+      is_delegate_(is_delegate) {}
 
 AcceptedPort::~AcceptedPort() {
   if (source_ && !g_source_is_destroyed(source_)) g_source_destroy(source_);
@@ -53,11 +51,16 @@ void AcceptedPort::Disconnect() {
   source_ = nullptr;
 }
 
+void AcceptedPort::SetReadFd(int read_fd) {
+  Port::SetReadFd(read_fd);
+  Watch();
+}
+
 void AcceptedPort::Watch() {
   int cond = G_IO_ERR | G_IO_HUP | G_IO_NVAL;
   if (!IsDelegate()) cond |= G_IO_IN;
 
-  source_ = g_unix_fd_source_new(GetFd(), static_cast<GIOCondition>(cond));
+  source_ = g_unix_fd_source_new(GetReadFd(), static_cast<GIOCondition>(cond));
   if (source_ == nullptr) {
     // LCOV_EXCL_START
     _E("g_unix_fd_source_new() is failed");
index 88d4b72ebb04f601e124cfbaf5d737ca6798d974..89cd5962f266ad2a573adcf5b52ee46d17bd47da 100644 (file)
@@ -35,14 +35,17 @@ class AcceptedPort : public Port {
     virtual void OnSocketDisconnected(int fd) = 0;
   };
 
-  AcceptedPort(int fd, std::string id, std::string inst, IEvent* listener,
+  AcceptedPort(int read_fd, int write_fd, std::string id, std::string inst,
+               IEvent* listener, bool is_delegate);
+  AcceptedPort(int read_fd, int write_fd, std::string id, IEvent* listener,
                bool is_delegate);
-  AcceptedPort(int fd, std::string id, IEvent* listener, bool is_delegate);
   virtual ~AcceptedPort();
 
   bool IsDelegate() const;
   void Disconnect() override;
 
+  void SetReadFd(int read_fd) override;
+
  private:
   void Watch();
   static gboolean UnixFdSourceFunc(gint fd, GIOCondition cond,
index eabe4328eb0a57a6a1de521fc9dae439e462c605..42144956a58e88cac280c0e5c5329cc5cdd1c9b2 100644 (file)
@@ -49,35 +49,29 @@ constexpr const char KEY_PORT_NAME[] = "__K_PORT_NAME__";
 
 class Session {
  public:
-  Session(std::string port_name, std::string destination,
-      int main_port, int delegate_port)
+  Session(std::string port_name, std::string destination, int main_read_fd,
+          int main_write_fd, int delegate_read_fd, int delegate_write_fd)
       : port_name_(std::move(port_name)),
         destination_(std::move(destination)),
-        main_port_(main_port),
-        delegate_port_(delegate_port) {
-  }
-
-  const std::string& GetPortName() const {
-    return port_name_;
-  }
-
-  const std::string& GetDestination() const {
-    return destination_;
-  }
-
-  int GetMainPort() const {
-    return main_port_;
-  }
-
-  int GetDelegatePort() const {
-    return delegate_port_;
-  }
+        main_read_fd_(main_read_fd),
+        main_write_fd_(main_write_fd),
+        delegate_read_fd_(delegate_read_fd),
+        delegate_write_fd_(delegate_write_fd) {}
+
+  const std::string& GetPortName() const { return port_name_; }
+  const std::string& GetDestination() const { return destination_; }
+  int GetMainReadFd() const { return main_read_fd_; }
+  int GetMainWriteFd() const { return main_write_fd_; }
+  int GetDelegateReadFd() const { return delegate_read_fd_; }
+  int GetDelegateWriteFd() const { return delegate_write_fd_; }
 
  private:
   std::string port_name_;
   std::string destination_;
-  int main_port_;
-  int delegate_port_;
+  int main_read_fd_;
+  int main_write_fd_;
+  int delegate_read_fd_;
+  int delegate_write_fd_;
 };
 
 class DebugPortImpl {
@@ -88,7 +82,8 @@ class DebugPortImpl {
   void Dispose();
   bool IsConnected() const;
   void AddSession(std::string port_name, std::string destination,
-      int main_port, int delegate_port);
+                  int main_read_fd, int main_write_fd, int delegate_read_fd,
+                  int delegate_write_fd);
   void RemoveSession(int port);
   int Send(int port, bool is_read, uint32_t seq,
       const void* buf, unsigned int size);
@@ -149,30 +144,34 @@ bool DebugPortImpl::IsConnected() const {
 }
 
 void DebugPortImpl::AddSession(std::string port_name, std::string destination,
-    int main_port, int delegate_port) {
+                               int main_read_fd, int main_write_fd,
+                               int delegate_read_fd, int delegate_write_fd) {
   std::lock_guard<std::recursive_mutex> lock(GetMutex());
   sessions_.emplace_back(
-      new Session(std::move(port_name), std::move(destination),
-        main_port, delegate_port));
+      new Session(std::move(port_name), std::move(destination), main_read_fd,
+                  main_write_fd, delegate_read_fd, delegate_write_fd));
 }
 
 void DebugPortImpl::RemoveSession(int port) {
   std::lock_guard<std::recursive_mutex> lock(GetMutex());
   auto iter = std::find_if(sessions_.begin(), sessions_.end(),
-      [port](std::shared_ptr<Session>& sess) -> bool {
-        return sess->GetMainPort() == port || sess->GetDelegatePort() == port;
-      });
-
+                           [port](std::shared_ptr<Session>& sess) -> bool {
+                             return sess->GetMainReadFd() == port ||
+                                    sess->GetMainWriteFd() == port ||
+                                    sess->GetDelegateReadFd() == port ||
+                                    sess->GetDelegateWriteFd() == port;
+                           });
   if (iter != sessions_.end()) {
     _W("Remove session. port(%d)", port);
     iter = sessions_.erase(iter);
   }
 }
 
-std::shared_ptr<Session> DebugPortImpl::FindSession(int port) {
+std::shared_ptr<Session> DebugPortImpl::FindSession(int fd) {
   std::lock_guard<std::recursive_mutex> lock(GetMutex());
   for (auto& s : sessions_) {
-    if (s->GetMainPort() == port || s->GetDelegatePort() == port)
+    if (s->GetMainReadFd() == fd || s->GetMainWriteFd() == fd ||
+        s->GetDelegateReadFd() == fd || s->GetDelegateWriteFd() == fd)
       return s;
   }
 
@@ -206,7 +205,8 @@ int DebugPortImpl::Send(int port, bool is_read, uint32_t seq,
   parcel.WriteInt64(time(nullptr));
   parcel.WriteString(session->GetPortName().c_str());
   parcel.WriteString(session->GetDestination().c_str());
-  parcel.WriteBool(session->GetDelegatePort() == port);
+  parcel.WriteBool(session->GetDelegateReadFd() == port ||
+                   session->GetDelegateWriteFd() == port);
   parcel.WriteInt32(port);
   parcel.WriteBool(is_read);
   parcel.WriteInt32(seq);
@@ -232,7 +232,7 @@ void DebugPortImpl::Init() {
     if (fd < 0)
       break;
 
-    port_.reset(new Port(fd, "Debug"));
+    port_.reset(new Port(-1, fd, "Debug"));
     if (Watch(fd) < 0)
       break;
 
@@ -385,7 +385,7 @@ int DebugPortImpl::AppComCb(const char* endpoint, aul_app_com_result_e result,
       return -1;
 
     std::lock_guard<std::recursive_mutex> lock(handle->GetMutex());
-    handle->port_.reset(new Port(fd, "Debug"));
+    handle->port_.reset(new Port(-1, fd, "Debug"));
     int ret = handle->Watch(fd);
     if (ret < 0)
       return -1;
@@ -408,10 +408,12 @@ bool DebugPort::IsConnected() {
 }
 
 void DebugPort::AddSession(std::string port_name, std::string destination,
-    int main_port, int delegate_port) {
+                           int main_read_fd, int main_write_fd,
+                           int delegate_read_fd, int delegate_write_fd) {
   impl.Init();
   return impl.AddSession(std::move(port_name), std::move(destination),
-      main_port, delegate_port);
+                         main_read_fd, main_write_fd, delegate_read_fd,
+                         delegate_write_fd);
 }
 
 void DebugPort::RemoveSession(int port) {
index 9b68c6587b1dd9ae8193c7a14f193b7e7ce43c64..080fc4a2c36d061c0586e1f230a59337c3542a97 100644 (file)
@@ -27,7 +27,8 @@ class DebugPort {
  public:
   static bool IsConnected();
   static void AddSession(std::string port_name, std::string destination,
-      int main_port, int delegate_port);
+                         int main_read_fd, int main_write_fd,
+                         int delegate_read_fd, int delegate_write_fd);
   static void RemoveSession(int port);
   static int Send(int port, bool is_read, uint32_t seq,
       const void* buf, unsigned int size);
index 3860ef719aace6ecf37c0e0a08162167b2edece9..e52d4543207b55fd63c17459a650900997a8ca26 100644 (file)
@@ -49,8 +49,12 @@ enum PortStatus {
 
 }  // namespace
 
-Port::Port(int fd, std::string id)
-    : fd_(fd), id_(std::move(id)), instance_(""), seq_(0) {
+Port::Port(int read_fd, int write_fd, std::string id)
+    : read_fd_(read_fd),
+      write_fd_(write_fd),
+      id_(std::move(id)),
+      instance_(""),
+      seq_(0) {
   char uuid[37];
   uuid_t u;
   uuid_generate(u);
@@ -59,8 +63,12 @@ Port::Port(int fd, std::string id)
   SetReceiveTimeout(10000);
 }
 
-Port::Port(int fd, std::string id, std::string instance)
-    : fd_(fd), id_(std::move(id)), instance_(std::move(instance)), seq_(0) {
+Port::Port(int read_fd, int write_fd, std::string id, std::string instance)
+    : read_fd_(read_fd),
+      write_fd_(write_fd),
+      id_(std::move(id)),
+      instance_(std::move(instance)),
+      seq_(0) {
   SetReceiveTimeout(10000);
 }
 
@@ -73,11 +81,22 @@ Port::~Port() {
 void Port::Disconnect() {
   IgnoreIOEvent();
 
-  std::lock_guard<std::recursive_mutex> lock(rw_mutex_);
-  if (fd_ > 0) {
-    _W("Close fd(%d)", fd_);
-    close(fd_);
-    fd_ = -1;
+  {
+    std::lock_guard<std::recursive_mutex> lock(read_mutex_);
+    if (read_fd_ > 0) {
+      _W("Close read_fd(%d)", read_fd_);
+      close(read_fd_);
+      read_fd_ = -1;
+    }
+  }
+
+  {
+    std::lock_guard<std::recursive_mutex> lock(write_mutex_);
+    if (write_fd_ > 0) {
+      _W("Close write_fd(%d)", write_fd_);
+      close(write_fd_);
+      write_fd_ = -1;
+    }
   }
 }
 
@@ -111,20 +130,20 @@ int Port::Read(void* buf, unsigned int size) {
   char* buffer = static_cast<char*>(buf);
   int flags;
 
-  std::lock_guard<std::recursive_mutex> lock(rw_mutex_);
-  if (fd_ < 0 || fd_ >= sysconf(_SC_OPEN_MAX)) {
-    _E("Invalid fd(%d)", fd_);  // LCOV_EXCL_LINE
+  std::lock_guard<std::recursive_mutex> lock(read_mutex_);
+  if (read_fd_ < 0 || read_fd_ >= sysconf(_SC_OPEN_MAX)) {
+    _E("Invalid fd(%d)", read_fd_);  // LCOV_EXCL_LINE
     return RPC_PORT_ERROR_IO_ERROR;  // LCOV_EXCL_LINE
   }
 
-  flags = fcntl(fd_, F_GETFL, 0);
-  fcntl(fd_, F_SETFL, flags & ~O_NONBLOCK);
+  flags = fcntl(read_fd_, F_GETFL, 0);
+  fcntl(read_fd_, F_SETFL, flags & ~O_NONBLOCK);
 
   while (left) {
-    nb = read(fd_, buffer, left);
+    nb = read(read_fd_, buffer, left);
     if (nb == 0) {
-      _E("read_socket: ...read EOF, socket closed %d: nb %zd\n", fd_, nb);
-      fcntl(fd_, F_SETFL, flags);
+      _E("read_socket: ...read EOF, socket closed %d: nb %zd", read_fd_, nb);
+      fcntl(read_fd_, F_SETFL, flags);
       return RPC_PORT_ERROR_IO_ERROR;
     }
 
@@ -134,8 +153,8 @@ int Port::Read(void* buf, unsigned int size) {
         continue;
       }
 
-      _E("read_socket: ...error fd %d: errno %d\n", fd_, errno);
-      fcntl(fd_, F_SETFL, flags);
+      _E("read_socket: ...error fd %d: errno %d", read_fd_, errno);
+      fcntl(read_fd_, F_SETFL, flags);
       return RPC_PORT_ERROR_IO_ERROR;
     }
 
@@ -144,7 +163,7 @@ int Port::Read(void* buf, unsigned int size) {
     bytes_read += nb;
   }
 
-  fcntl(fd_, F_SETFL, flags);
+  fcntl(read_fd_, F_SETFL, flags);
   return RPC_PORT_ERROR_NONE;
 }
 
@@ -164,7 +183,7 @@ int Port::SetReceiveTimeout(int timeout) {
     .tv_usec = static_cast<suseconds_t>((timeout % 1000) * 1000)
   };
   socklen_t len = static_cast<socklen_t>(sizeof(struct timeval));
-  int ret = setsockopt(fd_, SOL_SOCKET, SO_RCVTIMEO, &tv, len);
+  int ret = setsockopt(read_fd_, SOL_SOCKET, SO_RCVTIMEO, &tv, len);
   if (ret < 0) {
     ret = -errno;
     _E("setsockopt() is failed. errno(%d)", errno);
@@ -175,13 +194,13 @@ int Port::SetReceiveTimeout(int timeout) {
 
 bool Port::CanWrite() {
   struct pollfd fds[1];
-  fds[0].fd = fd_;
+  fds[0].fd = write_fd_;
   fds[0].events = POLLOUT;
   fds[0].revents = 0;
   int ret = poll(fds, 1, 100);
   if (ret <= 0) {
-    _W("poll() is failed. fd(%d), error(%s)",
-        fd_, ret == 0 ? "timed out" : std::to_string(-errno).c_str());
+    _W("poll() is failed. write_fd(%d), error(%s)",
+        write_fd_, ret == 0 ? "timed out" : std::to_string(-errno).c_str());
     return false;
   }
 
@@ -192,7 +211,7 @@ bool Port::CanWrite() {
 int Port::Write(const void* buf, unsigned int size) {
   int sent_bytes = 0;
   int ret;
-  std::lock_guard<std::recursive_mutex> lock(rw_mutex_);
+  std::lock_guard<std::recursive_mutex> lock(write_mutex_);
 
   if (queue_.empty()) {
     ret = Write(buf, size, &sent_bytes);
@@ -232,13 +251,13 @@ int Port::Write(const void* buf, unsigned int size, int* sent_bytes) {
   int retry_cnt = 0;
   const char* buffer = static_cast<const char*>(buf);
 
-  if (fd_ < 0 || fd_ >= sysconf(_SC_OPEN_MAX)) {
-    _E("Invalid fd(%d)", fd_);  // LCOV_EXCL_LINE
+  if (write_fd_ < 0 || write_fd_ >= sysconf(_SC_OPEN_MAX)) {
+    _E("Invalid write_fd(%d)", write_fd_);  // LCOV_EXCL_LINE
     return PORT_STATUS_ERROR_IO_ERROR;  // LCOV_EXCL_LINE
   }
 
   while (left && (retry_cnt < MAX_RETRY_CNT)) {
-    nb = send(fd_, buffer, left, MSG_NOSIGNAL);
+    nb = send(write_fd_, buffer, left, MSG_NOSIGNAL);
     if (nb == -1) {
       if (errno == EINTR) {
         // LCOV_EXCL_START
@@ -251,7 +270,7 @@ int Port::Write(const void* buf, unsigned int size, int* sent_bytes) {
       if (errno == EAGAIN || errno == EWOULDBLOCK)
         return PORT_STATUS_ERROR_RESOURCE_UNAVAILABLE;
 
-      _E("write_socket: ...error fd: %d, errno: %d", fd_, errno);
+      _E("write_socket: ...error fd: %d, errno: %d", write_fd_, errno);
       return PORT_STATUS_ERROR_IO_ERROR;
     }
 
@@ -261,19 +280,29 @@ int Port::Write(const void* buf, unsigned int size, int* sent_bytes) {
   }
 
   if (left != 0) {
-    _E("error fd %d: retry_cnt %d", fd_, retry_cnt);
+    _E("error fd %d: retry_cnt %d", write_fd_, retry_cnt);
     return PORT_STATUS_ERROR_IO_ERROR;
   }
 
   return PORT_STATUS_ERROR_NONE;
 }
 
-int Port::GetFd() const { return fd_; }
+void Port::SetReadFd(int read_fd) { read_fd_ = read_fd; }
+
+int Port::GetReadFd() const { return read_fd_; }
+
+void Port::SetWriteFd(int write_fd) { write_fd_ = write_fd; }
+
+int Port::GetWriteFd() const { return write_fd_; }
 
 const std::string& Port::GetId() const { return id_; }
 
 std::recursive_mutex& Port::GetMutex() const { return mutex_; }
 
+std::recursive_mutex& Port::GetReadMutex() const { return read_mutex_; }
+
+std::recursive_mutex& Port::GetWriteMutex() const { return write_mutex_; }
+
 const std::string& Port::GetInstance() const { return instance_; }
 
 uint32_t Port::GetSeq() { return ++seq_; }
@@ -288,9 +317,8 @@ gboolean Port::UnixFdSourceFunc(gint fd, GIOCondition condition,
     return G_SOURCE_REMOVE;
   }
 
-  _W("Writing is now possible. fd: %d, id: %s",
-      port->GetFd(), port->GetId().c_str());
-  std::lock_guard<std::recursive_mutex> lock(port->rw_mutex_);
+  _W("Writing is now possible. fd: %d, id: %s", fd, port->GetId().c_str());
+  std::lock_guard<std::recursive_mutex> lock(port->write_mutex_);
   if (port->source_ == nullptr) {
     _E("GSource is destroyed");
     return G_SOURCE_REMOVE;
@@ -307,7 +335,7 @@ gboolean Port::UnixFdSourceFunc(gint fd, GIOCondition condition,
 // LCOV_EXCL_STOP
 
 void Port::ClearQueue() {
-  std::lock_guard<std::recursive_mutex> lock(rw_mutex_);
+  std::lock_guard<std::recursive_mutex> lock(write_mutex_);
 
   while (queue_.empty() == false)
     queue_.pop();
@@ -318,7 +346,7 @@ void Port::ClearQueue() {
 
 // LCOV_EXCL_START
 void Port::IgnoreIOEvent() {
-  std::lock_guard<std::recursive_mutex> lock(rw_mutex_);
+  std::lock_guard<std::recursive_mutex> lock(write_mutex_);
   if (source_ && !g_source_is_destroyed(source_)) {
     g_source_destroy(source_);
     source_ = nullptr;
@@ -326,8 +354,8 @@ void Port::IgnoreIOEvent() {
 }
 
 int Port::ListenIOEvent() {
-  std::lock_guard<std::recursive_mutex> lock(rw_mutex_);
-  source_ = g_unix_fd_source_new(fd_, G_IO_OUT);
+  std::lock_guard<std::recursive_mutex> lock(write_mutex_);
+  source_ = g_unix_fd_source_new(write_fd_, G_IO_OUT);
   if (source_ == nullptr) {
     _E("g_unix_fd_source_new() is failed");
     return RPC_PORT_ERROR_OUT_OF_MEMORY;
@@ -348,7 +376,7 @@ int Port::ListenIOEvent() {
 
 int Port::PopDelayedMessage() {
   int sent_bytes = 0;
-  std::lock_guard<std::recursive_mutex> lock(rw_mutex_);
+  std::lock_guard<std::recursive_mutex> lock(write_mutex_);
   auto dm = queue_.front();
 
   int ret = Write(dm->GetMessage(), dm->GetSize(), &sent_bytes);
@@ -367,7 +395,7 @@ int Port::PopDelayedMessage() {
 }
 
 int Port::PushDelayedMessage(std::shared_ptr<DelayedMessage> dm) {
-  std::lock_guard<std::recursive_mutex> lock(rw_mutex_);
+  std::lock_guard<std::recursive_mutex> lock(write_mutex_);
   if (queue_.empty()) {
     int ret = ListenIOEvent();
     if (ret != RPC_PORT_ERROR_NONE)
index d0f20137b4a84654684436e827a031ca8e2bd0c8..24cbf85fca96d80e3dbf6b54c05eb5ed8ac40b2e 100644 (file)
@@ -36,8 +36,8 @@ namespace internal {
 
 class Port : public std::enable_shared_from_this<Port> {
  public:
-  Port(int fd, std::string id, std::string instance);
-  Port(int fd, std::string id);
+  Port(int read_fd, int write_fd, std::string id, std::string instance);
+  Port(int read_fd, int write_fd, std::string id);
   virtual ~Port();
 
   virtual void Disconnect();
@@ -48,9 +48,14 @@ class Port : public std::enable_shared_from_this<Port> {
   int Read(void* buf, unsigned int size);
   int Write(const void* buf, unsigned int size);
   int Write(const void* buf, unsigned int size, int* sent_bytes);
-  int GetFd() const;
+  virtual void SetReadFd(int read_fd);
+  int GetReadFd() const;
+  virtual void SetWriteFd(int write_fd);
+  int GetWriteFd() const;
   const std::string& GetId() const;
   std::recursive_mutex& GetMutex() const;
+  std::recursive_mutex& GetReadMutex() const;
+  std::recursive_mutex& GetWriteMutex() const;
   const std::string& GetInstance() const;
   uint32_t GetSeq();
 
@@ -68,12 +73,14 @@ class Port : public std::enable_shared_from_this<Port> {
   // LCOV_EXCL_STOP
 
  private:
-  int fd_;
+  int read_fd_;
+  int write_fd_;
   std::string id_;
   std::string instance_;
   std::atomic<uint32_t> seq_;
   mutable std::recursive_mutex mutex_;
-  mutable std::recursive_mutex rw_mutex_;
+  mutable std::recursive_mutex read_mutex_;
+  mutable std::recursive_mutex write_mutex_;
   std::queue<std::shared_ptr<DelayedMessage>> queue_;
   size_t delayed_message_size_ = 0;
   GSource* source_ = nullptr;
index ce4dcc8c2a7411cdb0305b0f0083d67526b4513c..1bc692db701efd91b6c30290f0b7d011d0f9706b 100644 (file)
@@ -40,8 +40,10 @@ namespace rpc_port {
 namespace internal {
 namespace {
 
-constexpr const char kPortTypeMain[] = "main";
-constexpr const char kPortTypeDelegate[] = "delegate";
+constexpr const char kPortTypeMainRead[] = "main-0";
+constexpr const char kPortTypeMainWrite[] = "main-1";
+constexpr const char kPortTypeDelegateRead[] = "delegate-0";
+constexpr const char kPortTypeDelegateWrite[] = "delegate-1";
 constexpr const char kDPrefix[] = "d::";
 constexpr const char kUdPrefix[] = "ud::";
 
@@ -73,7 +75,7 @@ Proxy::~Proxy() {
   if (context_) g_main_context_unref(context_);
 
   if (main_port_.get() != nullptr)
-    DebugPort::RemoveSession(main_port_->GetFd());  // LCOV_EXCL_LINE
+    DebugPort::RemoveSession(main_port_->GetReadFd());  // LCOV_EXCL_LINE
 
   listener_ = nullptr;
   UnsetIdler();
@@ -81,85 +83,65 @@ Proxy::~Proxy() {
   Cancel();
 }
 
-int Proxy::MainPortConnect(const std::string& instance, bool sync) {
-  std::lock_guard<std::recursive_mutex> lock(GetMutex());
-  fds_[0] = 0;
-  main_client_.reset(ClientChannel::Create(this, port_path_));
-  if (main_client_.get() == nullptr)
-    return RPC_PORT_ERROR_IO_ERROR;  // LCOV_EXCL_LINE
+int Proxy::CreateClientChannel(std::string instance, std::string port_type,
+                               bool sync,
+                               std::unique_ptr<ClientChannel>* channel) {
+  auto client = ClientChannel::Create(this, port_path_);
+  if (!client) return RPC_PORT_ERROR_IO_ERROR;
 
-  Request request(instance.c_str(), kPortTypeMain);
-  int ret = main_client_->Send(request);
-  if (ret != 0) return RPC_PORT_ERROR_IO_ERROR;  // LCOV_EXCL_LINE
+  Request request(std::move(instance), std::move(port_type));
+  int ret = client->Send(request);
+  if (ret != 0) return RPC_PORT_ERROR_IO_ERROR;
 
-  main_client_->SetNonblock();
+  client->SetNonblock();
   if (sync) {
     Response response;
-    ret = main_client_->Receive(&response);
-    if (ret != 0) return RPC_PORT_ERROR_IO_ERROR;  // LCOV_EXCL_LINE
+    ret = client->Receive(&response);
+    if (ret != 0) return RPC_PORT_ERROR_IO_ERROR;
 
-    if (response.GetResult() != 0) {
-      // LCOV_EXCL_START
+    if (response.GetResult() != 0) {  // LCOV_EXCL_START
       _E("Permission denied");
       return RPC_PORT_ERROR_PERMISSION_DENIED;
-      // LCOV_EXCL_STOP
-    }
-
-    fds_[0] = main_client_->RemoveFd();
+    }  // LCOV_EXCL_STOP
   } else {
-    ret = main_client_->Watch();
-    if (ret != 0) return RPC_PORT_ERROR_IO_ERROR;  // LCOV_EXCL_LINE
-  }
-
-  return RPC_PORT_ERROR_NONE;
-}
-
-int Proxy::DelegatePortConnect(const std::string& instance, bool sync) {
-  std::lock_guard<std::recursive_mutex> lock(GetMutex());
-  fds_[1] = 0;
-  delegate_client_.reset(ClientChannel::Create(this, port_path_));
-  if (delegate_client_.get() == nullptr)
-    return RPC_PORT_ERROR_IO_ERROR;  // LCOV_EXCL_LINE
-
-  Request request(instance.c_str(), kPortTypeDelegate);
-  int ret = delegate_client_->Send(request);
-  if (ret != 0) return RPC_PORT_ERROR_IO_ERROR;  // LCOV_EXCL_LINE
-
-  delegate_client_->SetNonblock();
-  if (sync) {
-    Response response;
-    ret = delegate_client_->Receive(&response);
-    if (ret != 0) return RPC_PORT_ERROR_IO_ERROR;  // LCOV_EXCL_LINE
-
-    if (response.GetResult() != 0) {
-      // LCOV_EXCL_START
-      _E("Permission denied");
-      return RPC_PORT_ERROR_PERMISSION_DENIED;
-      // LCOV_EXCL_STOP
-    }
-
-    fds_[1] = delegate_client_->RemoveFd();
-  } else {
-    ret = delegate_client_->Watch();
-    if (ret != 0) return RPC_PORT_ERROR_IO_ERROR;  // LCOV_EXCL_LINE
+    ret = client->Watch();
+    if (ret != 0) return RPC_PORT_ERROR_IO_ERROR;
   }
 
+  channel->reset(client);
   return RPC_PORT_ERROR_NONE;
 }
 
 int Proxy::Connect(bool sync) {
   std::lock_guard<std::recursive_mutex> lock(GetMutex());
   std::string instance = GenInstance();
-  int ret = MainPortConnect(instance, sync);
+  int ret =
+      CreateClientChannel(instance, kPortTypeMainRead, sync, &main_client_[0]);
   if (ret != RPC_PORT_ERROR_NONE) return ret;
 
-  ret = DelegatePortConnect(instance, sync);
+  ret =
+      CreateClientChannel(instance, kPortTypeMainWrite, sync, &main_client_[1]);
+  if (ret != RPC_PORT_ERROR_NONE) return ret;
+
+  ret = CreateClientChannel(instance, kPortTypeDelegateRead, sync,
+                            &delegate_client_[0]);
+  if (ret != RPC_PORT_ERROR_NONE) return ret;
+
+  ret = CreateClientChannel(instance, kPortTypeDelegateWrite, sync,
+                            &delegate_client_[1]);
   if (ret != RPC_PORT_ERROR_NONE) return ret;
 
   if (sync) {
-    main_port_.reset(new ProxyPort(fds_[0], target_appid_, this, false));
-    delegate_port_.reset(new ProxyPort(fds_[1], target_appid_, this));
-    DebugPort::AddSession(port_name_, target_appid_, fds_[0], fds_[1]);
+    int main_read_fd = main_client_[0]->RemoveFd();
+    int main_write_fd = main_client_[1]->RemoveFd();
+    main_port_.reset(
+        new ProxyPort(main_read_fd, main_write_fd, target_appid_, this, false));
+    int delegate_read_fd = delegate_client_[0]->RemoveFd();
+    int delegate_write_fd = delegate_client_[1]->RemoveFd();
+    delegate_port_.reset(new ProxyPort(delegate_read_fd, delegate_write_fd,
+                                       target_appid_, this));
+    DebugPort::AddSession(port_name_, target_appid_, main_read_fd,
+                          main_write_fd, delegate_read_fd, delegate_write_fd);
     listener_->OnConnected(target_appid_, main_port_.get());
   }
 
@@ -280,7 +262,7 @@ bool Proxy::WaitUntilPortCreation() {
 void Proxy::DisconnectPort() {
   std::lock_guard<std::recursive_mutex> lock(GetMutex());
   if (main_port_.get() != nullptr) {
-    DebugPort::RemoveSession(main_port_->GetFd());
+    DebugPort::RemoveSession(main_port_->GetReadFd());
     main_port_.reset();
   }
 }
@@ -484,8 +466,10 @@ gboolean Proxy::OnTimedOut(gpointer user_data) {
   }
 
   proxy->Cancel();
-  proxy->main_client_.reset();
-  proxy->delegate_client_.reset();
+  proxy->main_client_[0].reset();
+  proxy->main_client_[1].reset();
+  proxy->delegate_client_[0].reset();
+  proxy->delegate_client_[1].reset();
   DestroyWeakPtr(proxy->conn_timer_data_);
   proxy->conn_timer_data_ = nullptr;
 
@@ -560,14 +544,14 @@ void Proxy::OnDataReceived(int fd) {
     // LCOV_EXCL_STOP
   }
 
-  if (!delegate_port_ || delegate_port_->GetFd() != fd) return;
+  if (!delegate_port_ || delegate_port_->GetReadFd() != fd) return;
 
   char buffer[4];
   if (recv(fd, buffer, sizeof(buffer), MSG_PEEK | MSG_DONTWAIT) == 0) {
     _W("Socket was disconnected by stub. fd(%d)", fd);
     listener_ = nullptr;
     if (main_port_.get() != nullptr) {
-      DebugPort::RemoveSession(main_port_->GetFd());
+      DebugPort::RemoveSession(fd);
       main_port_.reset();
     }
 
@@ -589,8 +573,10 @@ void Proxy::OnChannelDisconnected(int fd) {
   }
 
   _W("Socket was disconnected. fd(%d)", fd);
-  main_client_.reset();
-  delegate_client_.reset();
+  main_client_[0].reset();
+  main_client_[1].reset();
+  delegate_client_[0].reset();
+  delegate_client_[1].reset();
   auto* listener = listener_;
   listener_ = nullptr;
   listener->OnDisconnected(target_appid_);
@@ -600,21 +586,18 @@ void Proxy::OnChannelDisconnected(int fd) {
 void Proxy::OnResponseReceived(int fd) {
   std::lock_guard<std::recursive_mutex> lock(GetMutex());
   UnsetConnTimer();
-  if (listener_ == nullptr) {
-    // LCOV_EXCL_START
+  if (listener_ == nullptr) {  // LCOV_EXCL_START
     _E("Invalid context");
     return;
-    // LCOV_EXCL_STOP
-  }
+  }  // LCOV_EXCL_STOP
 
+  bool is_read = false;
   bool is_delegate = false;
-  std::unique_ptr<ClientChannel> client = GetClient(fd, &is_delegate);
-  if (!client) {
-    // LCOV_EXCL_START
+  std::unique_ptr<ClientChannel> client = GetClient(fd, &is_read, &is_delegate);
+  if (!client) {  // LCOV_EXCL_START
     _E("Unknown fd(%d)", fd);
     return;
-    // LCOV_EXCL_STOP
-  }
+  }  // LCOV_EXCL_STOP
 
   Response response;
   int ret = client->Receive(&response);
@@ -622,15 +605,17 @@ void Proxy::OnResponseReceived(int fd) {
     _E("Permission denied");
     auto* listener = listener_;
     listener_ = nullptr;
-    main_client_.reset();
-    delegate_client_.reset();
+    main_client_[0].reset();
+    main_client_[1].reset();
+    delegate_client_[0].reset();
+    delegate_client_[1].reset();
     listener->OnRejected(target_appid_, RPC_PORT_ERROR_PERMISSION_DENIED);
     return;
   }
 
   client->SetNonblock();
   int client_fd = client->RemoveFd();
-  SetPort(client_fd, is_delegate);
+  SetPort(client_fd, is_read, is_delegate);
 }
 
 std::shared_ptr<Proxy> Proxy::GetSharedPtr() { return shared_from_this(); }
@@ -647,37 +632,69 @@ void Proxy::DestroyWeakPtr(gpointer data) {
 
 bool Proxy::HasRequested() const {
   return listener_ != nullptr &&
-         (!main_port_ || !delegate_port_ || main_port_->GetFd() > 0);
+         (!main_port_ || !delegate_port_ || main_port_->GetReadFd() > 0);
 }
 
-std::unique_ptr<ClientChannel> Proxy::GetClient(int fd, bool* is_delegate) {
+std::unique_ptr<ClientChannel> Proxy::GetClient(int fd, bool* is_read,
+                                                bool* is_delegate) {
   std::unique_ptr<ClientChannel> client;
-  if (main_client_ && main_client_->GetFd() == fd) {
-    client.reset(main_client_.release());
+  if (main_client_[0] && main_client_[0]->GetFd() == fd) {
+    *is_read = true;
     *is_delegate = false;
-  } else if (delegate_client_ && delegate_client_->GetFd() == fd) {
-    client.reset(delegate_client_.release());
+    client = std::move(main_client_[0]);
+  } else if (main_client_[1] && main_client_[1]->GetFd() == fd) {
+    *is_read = false;
+    *is_delegate = false;
+    client = std::move(main_client_[1]);
+  } else if (delegate_client_[0] && delegate_client_[0]->GetFd() == fd) {
+    *is_read = true;
+    *is_delegate = true;
+    client = std::move(delegate_client_[0]);
+  } else if (delegate_client_[1] && delegate_client_[1]->GetFd() == fd) {
+    *is_read = false;
     *is_delegate = true;
+    client = std::move(delegate_client_[1]);
   }
 
   return client;
 }
 
-void Proxy::SetPort(int fd, bool is_delegate) {
+void Proxy::SetPort(int fd, bool is_read, bool is_delegate) {
   if (is_delegate) {
-    _W("[Delegate] fd=%d", fd);
-    fds_[1] = fd;
-    delegate_port_.reset(new ProxyPort(fds_[1], target_appid_, this));
+    if (!delegate_port_)
+      delegate_port_.reset(new ProxyPort(-1, -1, target_appid_, this));
+
+    if (is_read) {
+      _W("[DELEGATE] read_fd=%d", fd);
+      delegate_port_->SetReadFd(fd);
+    } else {
+      _W("[DELEGATE] write_fd=%d", fd);
+      delegate_port_->SetWriteFd(fd);
+    }
   } else {
-    _W("[Main] fd=%d", fd);
-    fds_[0] = fd;
-    main_port_.reset(new ProxyPort(fds_[0], target_appid_, this, false));
+    if (!main_port_)
+      main_port_.reset(new ProxyPort(-1, -1, target_appid_, this, false));
+
+    if (is_read) {
+      _W("[MAIN] read_fd=%d", fd);
+      main_port_->SetReadFd(fd);
+    } else {
+      _W("[MAIN] write_fd=%d", fd);
+      main_port_->SetWriteFd(fd);
+    }
   }
 
-  if (main_port_ && delegate_port_) {
-    _W("target_appid=%s, port_name=%s, main_fd=%d, delegate_fd=%d",
-       target_appid_.c_str(), port_name_.c_str(), fds_[0], fds_[1]);
-    DebugPort::AddSession(port_name_, target_appid_, fds_[0], fds_[1]);
+  if (main_port_ && main_port_->GetReadFd() > 0 &&
+      main_port_->GetWriteFd() > 0 && delegate_port_ &&
+      delegate_port_->GetReadFd() > 0 && delegate_port_->GetWriteFd() > 0) {
+    _W("[CONNECTED] target_appid=%s, port_name=%s, main_fd=%d:%d, "
+       "delegate_fd=%d:%d",
+       target_appid_.c_str(), port_name_.c_str(), main_port_->GetReadFd(),
+       main_port_->GetWriteFd(), delegate_port_->GetReadFd(),
+       delegate_port_->GetWriteFd());
+    DebugPort::AddSession(port_name_, target_appid_, main_port_->GetReadFd(),
+                          main_port_->GetWriteFd(), delegate_port_->GetReadFd(),
+                          delegate_port_->GetWriteFd());
     listener_->OnConnected(target_appid_, main_port_.get());
   }
 }
index f5a49dbcd078f854ccf491df69fd9aefaad3cc2d..4543be5284d0716a5035d2ce6b19ba0b328add6b 100644 (file)
@@ -72,8 +72,8 @@ class Proxy : public std::enable_shared_from_this<Proxy>,
 
   void SetRealAppId(const std::string& alias_appid);
   std::recursive_mutex& GetMutex() const;
-  int MainPortConnect(const std::string& instance, bool sync);
-  int DelegatePortConnect(const std::string& instance, bool sync);
+  int CreateClientChannel(std::string instance, std::string port_type,
+                          bool sync, std::unique_ptr<ClientChannel>* channel);
   int Connect(bool sync);
   bool WaitUntilPortCreation();
   int Watch();
@@ -87,8 +87,9 @@ class Proxy : public std::enable_shared_from_this<Proxy>,
   gpointer CreateWeakPtr();
   static void DestroyWeakPtr(gpointer data);
   bool HasRequested() const;
-  std::unique_ptr<ClientChannel> GetClient(int fd, bool* is_delegate);
-  void SetPort(int fd, bool is_delegate);
+  std::unique_ptr<ClientChannel> GetClient(int fd, bool* is_ready,
+                                           bool* is_delegate);
+  void SetPort(int fd, bool is_read, bool is_delegate);
 
  private:
   std::string port_name_;
@@ -98,9 +99,8 @@ class Proxy : public std::enable_shared_from_this<Proxy>,
   IEventListener* listener_ = nullptr;
   std::string target_appid_;
   std::string real_appid_;
-  int fds_[2];
-  std::unique_ptr<ClientChannel> main_client_;
-  std::unique_ptr<ClientChannel> delegate_client_;
+  std::unique_ptr<ClientChannel> main_client_[2];
+  std::unique_ptr<ClientChannel> delegate_client_[2];
   gpointer conn_timer_data_ = nullptr;
   gpointer idler_data_ = nullptr;
   mutable std::recursive_mutex mutex_;
index 7e8f147a2135ad44b2170970c28ec1ff0c85f166..aca2fec3fe27a6b7f3e805311a8351893e3d7f90 100644 (file)
 namespace rpc_port {
 namespace internal {
 
-ProxyPort::ProxyPort(int fd, std::string id, IEvent* listener, bool is_delegate)
-    : Port(fd, std::move(id)), listener_(listener) {
-  Watch(is_delegate);
-}
+ProxyPort::ProxyPort(int read_fd, int write_fd, std::string id,
+                     IEvent* listener, bool is_delegate)
+    : Port(read_fd, write_fd, std::move(id)),
+      listener_(listener),
+      is_delegate_(is_delegate) {}
 
 ProxyPort::~ProxyPort() {
   if (source_ && !g_source_is_destroyed(source_)) g_source_destroy(source_);
 }
 
-void ProxyPort::Watch(bool is_delegate) {
+void ProxyPort::Watch(int fd) {
   int cond = G_IO_ERR | G_IO_HUP | G_IO_NVAL;
-  if (is_delegate) cond |= G_IO_IN;
+  if (IsDelegate() && GetReadFd() == fd) cond |= G_IO_IN;
 
-  source_ = g_unix_fd_source_new(GetFd(), static_cast<GIOCondition>(cond));
+  source_ = g_unix_fd_source_new(fd, static_cast<GIOCondition>(cond));
   if (source_ == nullptr) {
     // LCOV_EXCL_START
     _E("g_unix_fd_source_new() is failed");
@@ -53,6 +54,13 @@ void ProxyPort::Watch(bool is_delegate) {
   g_source_unref(source_);
 }
 
+bool ProxyPort::IsDelegate() const { return is_delegate_; }
+
+void ProxyPort::SetReadFd(int read_fd) {
+  Port::SetReadFd(read_fd);
+  Watch(read_fd);
+}
+
 void ProxyPort::Disconnect() {
   Port::Disconnect();
   if (source_ && !g_source_is_destroyed(source_)) g_source_destroy(source_);
index ab4fdbbf29f893979d6f32af10bd99fe158ef28e..e7a47b1386c418f3f1d050428c80d7a7782d0a76 100644 (file)
@@ -35,18 +35,22 @@ class ProxyPort : public Port {
     virtual void OnSocketDisconnected(int fd) = 0;
   };
 
-  ProxyPort(int fd, std::string id, IEvent* listener, bool is_delegate = true);
+  ProxyPort(int read_fd, int write_fd, std::string id, IEvent* listener,
+            bool is_delegate = true);
   virtual ~ProxyPort();
 
+  bool IsDelegate() const;
   void Disconnect() override;
+  void SetReadFd(int read_fd) override;
 
  private:
-  void Watch(bool is_delegate);
+  void Watch(int fd);
   static gboolean UnixFdSourceFunc(gint fd, GIOCondition cond,
                                    gpointer user_data);
 
  private:
   IEvent* listener_;
+  bool is_delegate_;
   GSource* source_ = nullptr;
 };
 
index ba78616e58eec0b020ad98abd212eca2ad9880ed..a54b8eb3bedbe99ce9e9d4cde7b559e1df8cd515 100644 (file)
@@ -73,7 +73,7 @@ RPC_API int  rpc_port_get_peer_info(rpc_port_h h, pid_t* pid, uid_t* uid) {
     return RPC_PORT_ERROR_INVALID_PARAMETER;
 
   auto port = static_cast<Port*>(h);
-  std::shared_ptr<PeerCred> cred(PeerCred::Get(port->GetFd()));
+  std::shared_ptr<PeerCred> cred(PeerCred::Get(port->GetReadFd()));
   if (cred.get() == nullptr)
     return RPC_PORT_ERROR_IO_ERROR;
 
index e78b6da58cc606c4b1ab3b56c3212b82baf099a2..c2710c6597e9b87511fcdebafedd7c599df6df2f 100644 (file)
@@ -55,7 +55,7 @@ RPC_API int rpc_port_parcel_create_from_port(rpc_port_parcel_h* h,
 
   internal::Port* pt = static_cast<internal::Port*>(port);
   {
-    std::lock_guard<std::recursive_mutex> lock(pt->GetMutex());
+    std::lock_guard<std::recursive_mutex> lock(pt->GetReadMutex());
     int ret = rpc_port_read(port, &len, 4);
     if (ret != 0)
       return ret;
@@ -105,7 +105,7 @@ RPC_API int rpc_port_parcel_send(rpc_port_parcel_h h, rpc_port_h port) {
 
   internal::Port* pt = static_cast<internal::Port*>(port);
   {
-    std::lock_guard<std::recursive_mutex> lock(pt->GetMutex());
+    std::lock_guard<std::recursive_mutex> lock(pt->GetWriteMutex());
     int ret = rpc_port_write(port, &len, sizeof(len));
     if (ret != 0)
       return ret;
index 991ed07eed822dc46ccab6c2fde6050e95fdfa2c..0e8fdd7540a7b9e3a9c67faf11947f50dda719f3 100644 (file)
@@ -243,7 +243,7 @@ RPC_API int rpc_port_read(rpc_port_h h, void* buf, unsigned int size) {
   }
 
   if (DebugPort::IsConnected())
-    DebugPort::Send(port->GetFd(), true, seq, buf, size);
+    DebugPort::Send(port->GetReadFd(), true, seq, buf, size);
 
   return RPC_PORT_ERROR_NONE;
 }
@@ -265,7 +265,7 @@ RPC_API int rpc_port_write(rpc_port_h h, const void* buf, unsigned int size) {
     return ret;
 
   if (DebugPort::IsConnected())
-    DebugPort::Send(port->GetFd(), false, seq, buf, size);
+    DebugPort::Send(port->GetWriteFd(), false, seq, buf, size);
 
   return RPC_PORT_ERROR_NONE;
 }
index 22f1060ac4d0ac33a2d090d5bbcfc7b58bd39d00..dc705f9da2534ea89dfc44dd97b7513575d50773 100644 (file)
@@ -40,8 +40,10 @@ namespace rpc_port {
 namespace internal {
 namespace {
 
-constexpr const char kPortTypeMain[] = "main";
-constexpr const char kPortTypeDelegate[] = "delegate";
+constexpr const char kPortTypeMainWrite[] = "main-0";
+constexpr const char kPortTypeMainRead[] = "main-1";
+constexpr const char kPortTypeDelegateWrite[] = "delegate-0";
+constexpr const char kPortTypeDelegateRead[] = "delegate-1";
 constexpr uid_t kRegularUidMin = 5000;
 
 }  // namespace
@@ -60,7 +62,7 @@ Stub::~Stub() {
 
   for (auto& p : ports_) {
     if (!p->IsDelegate())
-      DebugPort::RemoveSession(p->GetFd());
+      DebugPort::RemoveSession(p->GetReadFd());
   }
 
   listener_ = nullptr;
@@ -133,13 +135,13 @@ const std::string& Stub::GetPortName() const {
   return port_name_;
 }
 
-void Stub::RemoveAcceptedPorts(std::string instance) {
+void Stub::RemoveAcceptedPorts(const std::string& instance) {
   std::lock_guard<std::recursive_mutex> lock(GetMutex());
   auto iter = ports_.begin();
   while (iter != ports_.end()) {
     if ((*iter)->GetInstance().compare(instance) == 0) {
-      LOGI("Close: fd(%d)", (*iter)->GetFd());
-      DebugPort::RemoveSession((*iter)->GetFd());
+      LOGI("Close: fd(%d)", (*iter)->GetReadFd());
+      DebugPort::RemoveSession((*iter)->GetReadFd());
       iter = ports_.erase(iter);
     } else {
       iter++;
@@ -212,7 +214,7 @@ void Stub::OnDataReceived(int fd) {
   }
 
   for (auto& p : ports_) {
-    if (p->GetFd() == fd && !p->IsDelegate()) {
+    if (p->GetReadFd() == fd && !p->IsDelegate()) {
       char buffer[4];
       if (recv(fd, buffer, sizeof(buffer), MSG_PEEK | MSG_DONTWAIT) == 0) {
         _W("Socket was disconnected from proxy. fd(%d)", fd);
@@ -247,7 +249,7 @@ void Stub::OnSocketDisconnected(gint fd) {
 
   _W("Socket was disconnected. fd(%d)", fd);
   for (auto& p : ports_) {
-    if (p->GetFd() == fd) {
+    if (p->GetReadFd() == fd || p->GetWriteFd() == fd) {
       listener_->OnDisconnected(p->GetId(), p->GetInstance());
       RemoveAcceptedPorts(p->GetInstance());
       Aul::NotifyRpcFinished();
@@ -259,26 +261,37 @@ void Stub::OnSocketDisconnected(gint fd) {
 void Stub::AddAcceptedPort(const std::string& sender_appid,
     const std::string& instance, const std::string& port_type, int fd) {
   std::lock_guard<std::recursive_mutex> lock(GetMutex());
-  if (port_type == kPortTypeMain) {
+  if (port_type == kPortTypeMainWrite) {
     ports_.emplace_back(
-        new AcceptedPort(fd, sender_appid, instance, this, false));
-    return;
+        new AcceptedPort(-1, fd, sender_appid, instance, this, false));
+  } else if (port_type == kPortTypeMainRead) {
+    auto port = FindPort(instance);
+    if (port) port->SetReadFd(fd);
+  } else if (port_type == kPortTypeDelegateWrite) {
+    ports_.emplace_back(
+        new AcceptedPort(-1, fd, sender_appid, instance, this, true));
+  } else if (port_type == kPortTypeDelegateRead) {
+    auto port = FindDelegatePort(instance);
+    if (port) port->SetReadFd(fd);
   }
 
-  ports_.emplace_back(new AcceptedPort(fd, sender_appid, instance, this, true));
+  auto main_port = FindPort(instance);
+  if (!main_port || main_port->GetReadFd() < 0 || main_port->GetWriteFd() < 0)
+    return;
 
-  int main_fd = -1;
-  for (auto& p : ports_) {
-    if (p->GetId() == sender_appid && p->GetInstance() == instance &&
-        p->GetFd() != fd) {
-      main_fd = p->GetFd();
-      break;
-    }
-  }
+  auto delegate_port = FindDelegatePort(instance);
+  if (!delegate_port || delegate_port->GetReadFd() < 0 ||
+      delegate_port->GetWriteFd() < 0)
+    return;
 
-  _W("sender_appid(%s), instance(%s), main_fd(%d), delegate_fd(%d)",
-      sender_appid.c_str(), instance.c_str(), main_fd, fd);
-  DebugPort::AddSession(port_name_, sender_appid, main_fd, fd);
+  _W("[CONNECTED] sender_appid(%s), instance(%s), main_fd(%d:%d), "
+     "delegate_fd(%d:%d)",
+     sender_appid.c_str(), instance.c_str(), main_port->GetReadFd(),
+     main_port->GetWriteFd(), delegate_port->GetReadFd(),
+     delegate_port->GetWriteFd());
+  DebugPort::AddSession(port_name_, sender_appid, main_port->GetReadFd(),
+                        main_port->GetWriteFd(), delegate_port->GetReadFd(),
+                        delegate_port->GetWriteFd());
   listener_->OnConnected(sender_appid, instance);
 }
 
index af15f68916efd54f6845974a3a8625fb26ba9465..f4818a2e90fb497046989ca545b16272d06a6f00 100644 (file)
@@ -67,7 +67,7 @@ class Stub : public Server::IEvent, public AcceptedPort::IEvent {
  private:
   void AddAcceptedPort(const std::string& sender_appid,
       const std::string& instance, const std::string& port_type, int fd);
-  void RemoveAcceptedPorts(std::string instance);
+  void RemoveAcceptedPorts(const std::string& instance);
   std::recursive_mutex& GetMutex() const;
   int GetFdFromSystemd();
   int CreateServerSocket();