This patch separates the fd for use respectively for read & write purposes.
Change-Id: I35e51f6af0e91b0f830901a67dd815c01b1d3645
Signed-off-by: Hwankyu Jhun <h.jhun@samsung.com>
namespace rpc_port {
namespace internal {
-AcceptedPort::AcceptedPort(int fd, std::string id, std::string inst,
- IEvent* listener, bool is_delegate)
- : Port(fd, std::move(id), std::move(inst)),
+AcceptedPort::AcceptedPort(int read_fd, int write_fd, std::string id,
+ std::string inst, IEvent* listener, bool is_delegate)
+ : Port(read_fd, write_fd, std::move(id), std::move(inst)),
listener_(listener),
- is_delegate_(is_delegate) {
- Watch();
-}
+ is_delegate_(is_delegate) {}
-AcceptedPort::AcceptedPort(int fd, std::string id, IEvent* listener,
- bool is_delegate)
- : Port(fd, std::move(id)), listener_(listener), is_delegate_(is_delegate) {
- Watch();
-}
+AcceptedPort::AcceptedPort(int read_fd, int write_fd, std::string id,
+ IEvent* listener, bool is_delegate)
+ : Port(read_fd, write_fd, std::move(id)),
+ listener_(listener),
+ is_delegate_(is_delegate) {}
AcceptedPort::~AcceptedPort() {
if (source_ && !g_source_is_destroyed(source_)) g_source_destroy(source_);
source_ = nullptr;
}
+void AcceptedPort::SetReadFd(int read_fd) {
+ Port::SetReadFd(read_fd);
+ Watch();
+}
+
void AcceptedPort::Watch() {
int cond = G_IO_ERR | G_IO_HUP | G_IO_NVAL;
if (!IsDelegate()) cond |= G_IO_IN;
- source_ = g_unix_fd_source_new(GetFd(), static_cast<GIOCondition>(cond));
+ source_ = g_unix_fd_source_new(GetReadFd(), static_cast<GIOCondition>(cond));
if (source_ == nullptr) {
// LCOV_EXCL_START
_E("g_unix_fd_source_new() is failed");
virtual void OnSocketDisconnected(int fd) = 0;
};
- AcceptedPort(int fd, std::string id, std::string inst, IEvent* listener,
+ AcceptedPort(int read_fd, int write_fd, std::string id, std::string inst,
+ IEvent* listener, bool is_delegate);
+ AcceptedPort(int read_fd, int write_fd, std::string id, IEvent* listener,
bool is_delegate);
- AcceptedPort(int fd, std::string id, IEvent* listener, bool is_delegate);
virtual ~AcceptedPort();
bool IsDelegate() const;
void Disconnect() override;
+ void SetReadFd(int read_fd) override;
+
private:
void Watch();
static gboolean UnixFdSourceFunc(gint fd, GIOCondition cond,
class Session {
public:
- Session(std::string port_name, std::string destination,
- int main_port, int delegate_port)
+ Session(std::string port_name, std::string destination, int main_read_fd,
+ int main_write_fd, int delegate_read_fd, int delegate_write_fd)
: port_name_(std::move(port_name)),
destination_(std::move(destination)),
- main_port_(main_port),
- delegate_port_(delegate_port) {
- }
-
- const std::string& GetPortName() const {
- return port_name_;
- }
-
- const std::string& GetDestination() const {
- return destination_;
- }
-
- int GetMainPort() const {
- return main_port_;
- }
-
- int GetDelegatePort() const {
- return delegate_port_;
- }
+ main_read_fd_(main_read_fd),
+ main_write_fd_(main_write_fd),
+ delegate_read_fd_(delegate_read_fd),
+ delegate_write_fd_(delegate_write_fd) {}
+
+ const std::string& GetPortName() const { return port_name_; }
+ const std::string& GetDestination() const { return destination_; }
+ int GetMainReadFd() const { return main_read_fd_; }
+ int GetMainWriteFd() const { return main_write_fd_; }
+ int GetDelegateReadFd() const { return delegate_read_fd_; }
+ int GetDelegateWriteFd() const { return delegate_write_fd_; }
private:
std::string port_name_;
std::string destination_;
- int main_port_;
- int delegate_port_;
+ int main_read_fd_;
+ int main_write_fd_;
+ int delegate_read_fd_;
+ int delegate_write_fd_;
};
class DebugPortImpl {
void Dispose();
bool IsConnected() const;
void AddSession(std::string port_name, std::string destination,
- int main_port, int delegate_port);
+ int main_read_fd, int main_write_fd, int delegate_read_fd,
+ int delegate_write_fd);
void RemoveSession(int port);
int Send(int port, bool is_read, uint32_t seq,
const void* buf, unsigned int size);
}
void DebugPortImpl::AddSession(std::string port_name, std::string destination,
- int main_port, int delegate_port) {
+ int main_read_fd, int main_write_fd,
+ int delegate_read_fd, int delegate_write_fd) {
std::lock_guard<std::recursive_mutex> lock(GetMutex());
sessions_.emplace_back(
- new Session(std::move(port_name), std::move(destination),
- main_port, delegate_port));
+ new Session(std::move(port_name), std::move(destination), main_read_fd,
+ main_write_fd, delegate_read_fd, delegate_write_fd));
}
void DebugPortImpl::RemoveSession(int port) {
std::lock_guard<std::recursive_mutex> lock(GetMutex());
auto iter = std::find_if(sessions_.begin(), sessions_.end(),
- [port](std::shared_ptr<Session>& sess) -> bool {
- return sess->GetMainPort() == port || sess->GetDelegatePort() == port;
- });
-
+ [port](std::shared_ptr<Session>& sess) -> bool {
+ return sess->GetMainReadFd() == port ||
+ sess->GetMainWriteFd() == port ||
+ sess->GetDelegateReadFd() == port ||
+ sess->GetDelegateWriteFd() == port;
+ });
if (iter != sessions_.end()) {
_W("Remove session. port(%d)", port);
iter = sessions_.erase(iter);
}
}
-std::shared_ptr<Session> DebugPortImpl::FindSession(int port) {
+std::shared_ptr<Session> DebugPortImpl::FindSession(int fd) {
std::lock_guard<std::recursive_mutex> lock(GetMutex());
for (auto& s : sessions_) {
- if (s->GetMainPort() == port || s->GetDelegatePort() == port)
+ if (s->GetMainReadFd() == fd || s->GetMainWriteFd() == fd ||
+ s->GetDelegateReadFd() == fd || s->GetDelegateWriteFd() == fd)
return s;
}
parcel.WriteInt64(time(nullptr));
parcel.WriteString(session->GetPortName().c_str());
parcel.WriteString(session->GetDestination().c_str());
- parcel.WriteBool(session->GetDelegatePort() == port);
+ parcel.WriteBool(session->GetDelegateReadFd() == port ||
+ session->GetDelegateWriteFd() == port);
parcel.WriteInt32(port);
parcel.WriteBool(is_read);
parcel.WriteInt32(seq);
if (fd < 0)
break;
- port_.reset(new Port(fd, "Debug"));
+ port_.reset(new Port(-1, fd, "Debug"));
if (Watch(fd) < 0)
break;
return -1;
std::lock_guard<std::recursive_mutex> lock(handle->GetMutex());
- handle->port_.reset(new Port(fd, "Debug"));
+ handle->port_.reset(new Port(-1, fd, "Debug"));
int ret = handle->Watch(fd);
if (ret < 0)
return -1;
}
void DebugPort::AddSession(std::string port_name, std::string destination,
- int main_port, int delegate_port) {
+ int main_read_fd, int main_write_fd,
+ int delegate_read_fd, int delegate_write_fd) {
impl.Init();
return impl.AddSession(std::move(port_name), std::move(destination),
- main_port, delegate_port);
+ main_read_fd, main_write_fd, delegate_read_fd,
+ delegate_write_fd);
}
void DebugPort::RemoveSession(int port) {
public:
static bool IsConnected();
static void AddSession(std::string port_name, std::string destination,
- int main_port, int delegate_port);
+ int main_read_fd, int main_write_fd,
+ int delegate_read_fd, int delegate_write_fd);
static void RemoveSession(int port);
static int Send(int port, bool is_read, uint32_t seq,
const void* buf, unsigned int size);
} // namespace
-Port::Port(int fd, std::string id)
- : fd_(fd), id_(std::move(id)), instance_(""), seq_(0) {
+Port::Port(int read_fd, int write_fd, std::string id)
+ : read_fd_(read_fd),
+ write_fd_(write_fd),
+ id_(std::move(id)),
+ instance_(""),
+ seq_(0) {
char uuid[37];
uuid_t u;
uuid_generate(u);
SetReceiveTimeout(10000);
}
-Port::Port(int fd, std::string id, std::string instance)
- : fd_(fd), id_(std::move(id)), instance_(std::move(instance)), seq_(0) {
+Port::Port(int read_fd, int write_fd, std::string id, std::string instance)
+ : read_fd_(read_fd),
+ write_fd_(write_fd),
+ id_(std::move(id)),
+ instance_(std::move(instance)),
+ seq_(0) {
SetReceiveTimeout(10000);
}
void Port::Disconnect() {
IgnoreIOEvent();
- std::lock_guard<std::recursive_mutex> lock(rw_mutex_);
- if (fd_ > 0) {
- _W("Close fd(%d)", fd_);
- close(fd_);
- fd_ = -1;
+ {
+ std::lock_guard<std::recursive_mutex> lock(read_mutex_);
+ if (read_fd_ > 0) {
+ _W("Close read_fd(%d)", read_fd_);
+ close(read_fd_);
+ read_fd_ = -1;
+ }
+ }
+
+ {
+ std::lock_guard<std::recursive_mutex> lock(write_mutex_);
+ if (write_fd_ > 0) {
+ _W("Close write_fd(%d)", write_fd_);
+ close(write_fd_);
+ write_fd_ = -1;
+ }
}
}
char* buffer = static_cast<char*>(buf);
int flags;
- std::lock_guard<std::recursive_mutex> lock(rw_mutex_);
- if (fd_ < 0 || fd_ >= sysconf(_SC_OPEN_MAX)) {
- _E("Invalid fd(%d)", fd_); // LCOV_EXCL_LINE
+ std::lock_guard<std::recursive_mutex> lock(read_mutex_);
+ if (read_fd_ < 0 || read_fd_ >= sysconf(_SC_OPEN_MAX)) {
+ _E("Invalid fd(%d)", read_fd_); // LCOV_EXCL_LINE
return RPC_PORT_ERROR_IO_ERROR; // LCOV_EXCL_LINE
}
- flags = fcntl(fd_, F_GETFL, 0);
- fcntl(fd_, F_SETFL, flags & ~O_NONBLOCK);
+ flags = fcntl(read_fd_, F_GETFL, 0);
+ fcntl(read_fd_, F_SETFL, flags & ~O_NONBLOCK);
while (left) {
- nb = read(fd_, buffer, left);
+ nb = read(read_fd_, buffer, left);
if (nb == 0) {
- _E("read_socket: ...read EOF, socket closed %d: nb %zd\n", fd_, nb);
- fcntl(fd_, F_SETFL, flags);
+ _E("read_socket: ...read EOF, socket closed %d: nb %zd", read_fd_, nb);
+ fcntl(read_fd_, F_SETFL, flags);
return RPC_PORT_ERROR_IO_ERROR;
}
continue;
}
- _E("read_socket: ...error fd %d: errno %d\n", fd_, errno);
- fcntl(fd_, F_SETFL, flags);
+ _E("read_socket: ...error fd %d: errno %d", read_fd_, errno);
+ fcntl(read_fd_, F_SETFL, flags);
return RPC_PORT_ERROR_IO_ERROR;
}
bytes_read += nb;
}
- fcntl(fd_, F_SETFL, flags);
+ fcntl(read_fd_, F_SETFL, flags);
return RPC_PORT_ERROR_NONE;
}
.tv_usec = static_cast<suseconds_t>((timeout % 1000) * 1000)
};
socklen_t len = static_cast<socklen_t>(sizeof(struct timeval));
- int ret = setsockopt(fd_, SOL_SOCKET, SO_RCVTIMEO, &tv, len);
+ int ret = setsockopt(read_fd_, SOL_SOCKET, SO_RCVTIMEO, &tv, len);
if (ret < 0) {
ret = -errno;
_E("setsockopt() is failed. errno(%d)", errno);
bool Port::CanWrite() {
struct pollfd fds[1];
- fds[0].fd = fd_;
+ fds[0].fd = write_fd_;
fds[0].events = POLLOUT;
fds[0].revents = 0;
int ret = poll(fds, 1, 100);
if (ret <= 0) {
- _W("poll() is failed. fd(%d), error(%s)",
- fd_, ret == 0 ? "timed out" : std::to_string(-errno).c_str());
+ _W("poll() is failed. write_fd(%d), error(%s)",
+ write_fd_, ret == 0 ? "timed out" : std::to_string(-errno).c_str());
return false;
}
int Port::Write(const void* buf, unsigned int size) {
int sent_bytes = 0;
int ret;
- std::lock_guard<std::recursive_mutex> lock(rw_mutex_);
+ std::lock_guard<std::recursive_mutex> lock(write_mutex_);
if (queue_.empty()) {
ret = Write(buf, size, &sent_bytes);
int retry_cnt = 0;
const char* buffer = static_cast<const char*>(buf);
- if (fd_ < 0 || fd_ >= sysconf(_SC_OPEN_MAX)) {
- _E("Invalid fd(%d)", fd_); // LCOV_EXCL_LINE
+ if (write_fd_ < 0 || write_fd_ >= sysconf(_SC_OPEN_MAX)) {
+ _E("Invalid write_fd(%d)", write_fd_); // LCOV_EXCL_LINE
return PORT_STATUS_ERROR_IO_ERROR; // LCOV_EXCL_LINE
}
while (left && (retry_cnt < MAX_RETRY_CNT)) {
- nb = send(fd_, buffer, left, MSG_NOSIGNAL);
+ nb = send(write_fd_, buffer, left, MSG_NOSIGNAL);
if (nb == -1) {
if (errno == EINTR) {
// LCOV_EXCL_START
if (errno == EAGAIN || errno == EWOULDBLOCK)
return PORT_STATUS_ERROR_RESOURCE_UNAVAILABLE;
- _E("write_socket: ...error fd: %d, errno: %d", fd_, errno);
+ _E("write_socket: ...error fd: %d, errno: %d", write_fd_, errno);
return PORT_STATUS_ERROR_IO_ERROR;
}
}
if (left != 0) {
- _E("error fd %d: retry_cnt %d", fd_, retry_cnt);
+ _E("error fd %d: retry_cnt %d", write_fd_, retry_cnt);
return PORT_STATUS_ERROR_IO_ERROR;
}
return PORT_STATUS_ERROR_NONE;
}
-int Port::GetFd() const { return fd_; }
+void Port::SetReadFd(int read_fd) { read_fd_ = read_fd; }
+
+int Port::GetReadFd() const { return read_fd_; }
+
+void Port::SetWriteFd(int write_fd) { write_fd_ = write_fd; }
+
+int Port::GetWriteFd() const { return write_fd_; }
const std::string& Port::GetId() const { return id_; }
std::recursive_mutex& Port::GetMutex() const { return mutex_; }
+std::recursive_mutex& Port::GetReadMutex() const { return read_mutex_; }
+
+std::recursive_mutex& Port::GetWriteMutex() const { return write_mutex_; }
+
const std::string& Port::GetInstance() const { return instance_; }
uint32_t Port::GetSeq() { return ++seq_; }
return G_SOURCE_REMOVE;
}
- _W("Writing is now possible. fd: %d, id: %s",
- port->GetFd(), port->GetId().c_str());
- std::lock_guard<std::recursive_mutex> lock(port->rw_mutex_);
+ _W("Writing is now possible. fd: %d, id: %s", fd, port->GetId().c_str());
+ std::lock_guard<std::recursive_mutex> lock(port->write_mutex_);
if (port->source_ == nullptr) {
_E("GSource is destroyed");
return G_SOURCE_REMOVE;
// LCOV_EXCL_STOP
void Port::ClearQueue() {
- std::lock_guard<std::recursive_mutex> lock(rw_mutex_);
+ std::lock_guard<std::recursive_mutex> lock(write_mutex_);
while (queue_.empty() == false)
queue_.pop();
// LCOV_EXCL_START
void Port::IgnoreIOEvent() {
- std::lock_guard<std::recursive_mutex> lock(rw_mutex_);
+ std::lock_guard<std::recursive_mutex> lock(write_mutex_);
if (source_ && !g_source_is_destroyed(source_)) {
g_source_destroy(source_);
source_ = nullptr;
}
int Port::ListenIOEvent() {
- std::lock_guard<std::recursive_mutex> lock(rw_mutex_);
- source_ = g_unix_fd_source_new(fd_, G_IO_OUT);
+ std::lock_guard<std::recursive_mutex> lock(write_mutex_);
+ source_ = g_unix_fd_source_new(write_fd_, G_IO_OUT);
if (source_ == nullptr) {
_E("g_unix_fd_source_new() is failed");
return RPC_PORT_ERROR_OUT_OF_MEMORY;
int Port::PopDelayedMessage() {
int sent_bytes = 0;
- std::lock_guard<std::recursive_mutex> lock(rw_mutex_);
+ std::lock_guard<std::recursive_mutex> lock(write_mutex_);
auto dm = queue_.front();
int ret = Write(dm->GetMessage(), dm->GetSize(), &sent_bytes);
}
int Port::PushDelayedMessage(std::shared_ptr<DelayedMessage> dm) {
- std::lock_guard<std::recursive_mutex> lock(rw_mutex_);
+ std::lock_guard<std::recursive_mutex> lock(write_mutex_);
if (queue_.empty()) {
int ret = ListenIOEvent();
if (ret != RPC_PORT_ERROR_NONE)
class Port : public std::enable_shared_from_this<Port> {
public:
- Port(int fd, std::string id, std::string instance);
- Port(int fd, std::string id);
+ Port(int read_fd, int write_fd, std::string id, std::string instance);
+ Port(int read_fd, int write_fd, std::string id);
virtual ~Port();
virtual void Disconnect();
int Read(void* buf, unsigned int size);
int Write(const void* buf, unsigned int size);
int Write(const void* buf, unsigned int size, int* sent_bytes);
- int GetFd() const;
+ virtual void SetReadFd(int read_fd);
+ int GetReadFd() const;
+ virtual void SetWriteFd(int write_fd);
+ int GetWriteFd() const;
const std::string& GetId() const;
std::recursive_mutex& GetMutex() const;
+ std::recursive_mutex& GetReadMutex() const;
+ std::recursive_mutex& GetWriteMutex() const;
const std::string& GetInstance() const;
uint32_t GetSeq();
// LCOV_EXCL_STOP
private:
- int fd_;
+ int read_fd_;
+ int write_fd_;
std::string id_;
std::string instance_;
std::atomic<uint32_t> seq_;
mutable std::recursive_mutex mutex_;
- mutable std::recursive_mutex rw_mutex_;
+ mutable std::recursive_mutex read_mutex_;
+ mutable std::recursive_mutex write_mutex_;
std::queue<std::shared_ptr<DelayedMessage>> queue_;
size_t delayed_message_size_ = 0;
GSource* source_ = nullptr;
namespace internal {
namespace {
-constexpr const char kPortTypeMain[] = "main";
-constexpr const char kPortTypeDelegate[] = "delegate";
+constexpr const char kPortTypeMainRead[] = "main-0";
+constexpr const char kPortTypeMainWrite[] = "main-1";
+constexpr const char kPortTypeDelegateRead[] = "delegate-0";
+constexpr const char kPortTypeDelegateWrite[] = "delegate-1";
constexpr const char kDPrefix[] = "d::";
constexpr const char kUdPrefix[] = "ud::";
if (context_) g_main_context_unref(context_);
if (main_port_.get() != nullptr)
- DebugPort::RemoveSession(main_port_->GetFd()); // LCOV_EXCL_LINE
+ DebugPort::RemoveSession(main_port_->GetReadFd()); // LCOV_EXCL_LINE
listener_ = nullptr;
UnsetIdler();
Cancel();
}
-int Proxy::MainPortConnect(const std::string& instance, bool sync) {
- std::lock_guard<std::recursive_mutex> lock(GetMutex());
- fds_[0] = 0;
- main_client_.reset(ClientChannel::Create(this, port_path_));
- if (main_client_.get() == nullptr)
- return RPC_PORT_ERROR_IO_ERROR; // LCOV_EXCL_LINE
+int Proxy::CreateClientChannel(std::string instance, std::string port_type,
+ bool sync,
+ std::unique_ptr<ClientChannel>* channel) {
+ auto client = ClientChannel::Create(this, port_path_);
+ if (!client) return RPC_PORT_ERROR_IO_ERROR;
- Request request(instance.c_str(), kPortTypeMain);
- int ret = main_client_->Send(request);
- if (ret != 0) return RPC_PORT_ERROR_IO_ERROR; // LCOV_EXCL_LINE
+ Request request(std::move(instance), std::move(port_type));
+ int ret = client->Send(request);
+ if (ret != 0) return RPC_PORT_ERROR_IO_ERROR;
- main_client_->SetNonblock();
+ client->SetNonblock();
if (sync) {
Response response;
- ret = main_client_->Receive(&response);
- if (ret != 0) return RPC_PORT_ERROR_IO_ERROR; // LCOV_EXCL_LINE
+ ret = client->Receive(&response);
+ if (ret != 0) return RPC_PORT_ERROR_IO_ERROR;
- if (response.GetResult() != 0) {
- // LCOV_EXCL_START
+ if (response.GetResult() != 0) { // LCOV_EXCL_START
_E("Permission denied");
return RPC_PORT_ERROR_PERMISSION_DENIED;
- // LCOV_EXCL_STOP
- }
-
- fds_[0] = main_client_->RemoveFd();
+ } // LCOV_EXCL_STOP
} else {
- ret = main_client_->Watch();
- if (ret != 0) return RPC_PORT_ERROR_IO_ERROR; // LCOV_EXCL_LINE
- }
-
- return RPC_PORT_ERROR_NONE;
-}
-
-int Proxy::DelegatePortConnect(const std::string& instance, bool sync) {
- std::lock_guard<std::recursive_mutex> lock(GetMutex());
- fds_[1] = 0;
- delegate_client_.reset(ClientChannel::Create(this, port_path_));
- if (delegate_client_.get() == nullptr)
- return RPC_PORT_ERROR_IO_ERROR; // LCOV_EXCL_LINE
-
- Request request(instance.c_str(), kPortTypeDelegate);
- int ret = delegate_client_->Send(request);
- if (ret != 0) return RPC_PORT_ERROR_IO_ERROR; // LCOV_EXCL_LINE
-
- delegate_client_->SetNonblock();
- if (sync) {
- Response response;
- ret = delegate_client_->Receive(&response);
- if (ret != 0) return RPC_PORT_ERROR_IO_ERROR; // LCOV_EXCL_LINE
-
- if (response.GetResult() != 0) {
- // LCOV_EXCL_START
- _E("Permission denied");
- return RPC_PORT_ERROR_PERMISSION_DENIED;
- // LCOV_EXCL_STOP
- }
-
- fds_[1] = delegate_client_->RemoveFd();
- } else {
- ret = delegate_client_->Watch();
- if (ret != 0) return RPC_PORT_ERROR_IO_ERROR; // LCOV_EXCL_LINE
+ ret = client->Watch();
+ if (ret != 0) return RPC_PORT_ERROR_IO_ERROR;
}
+ channel->reset(client);
return RPC_PORT_ERROR_NONE;
}
int Proxy::Connect(bool sync) {
std::lock_guard<std::recursive_mutex> lock(GetMutex());
std::string instance = GenInstance();
- int ret = MainPortConnect(instance, sync);
+ int ret =
+ CreateClientChannel(instance, kPortTypeMainRead, sync, &main_client_[0]);
if (ret != RPC_PORT_ERROR_NONE) return ret;
- ret = DelegatePortConnect(instance, sync);
+ ret =
+ CreateClientChannel(instance, kPortTypeMainWrite, sync, &main_client_[1]);
+ if (ret != RPC_PORT_ERROR_NONE) return ret;
+
+ ret = CreateClientChannel(instance, kPortTypeDelegateRead, sync,
+ &delegate_client_[0]);
+ if (ret != RPC_PORT_ERROR_NONE) return ret;
+
+ ret = CreateClientChannel(instance, kPortTypeDelegateWrite, sync,
+ &delegate_client_[1]);
if (ret != RPC_PORT_ERROR_NONE) return ret;
if (sync) {
- main_port_.reset(new ProxyPort(fds_[0], target_appid_, this, false));
- delegate_port_.reset(new ProxyPort(fds_[1], target_appid_, this));
- DebugPort::AddSession(port_name_, target_appid_, fds_[0], fds_[1]);
+ int main_read_fd = main_client_[0]->RemoveFd();
+ int main_write_fd = main_client_[1]->RemoveFd();
+ main_port_.reset(
+ new ProxyPort(main_read_fd, main_write_fd, target_appid_, this, false));
+ int delegate_read_fd = delegate_client_[0]->RemoveFd();
+ int delegate_write_fd = delegate_client_[1]->RemoveFd();
+ delegate_port_.reset(new ProxyPort(delegate_read_fd, delegate_write_fd,
+ target_appid_, this));
+ DebugPort::AddSession(port_name_, target_appid_, main_read_fd,
+ main_write_fd, delegate_read_fd, delegate_write_fd);
listener_->OnConnected(target_appid_, main_port_.get());
}
void Proxy::DisconnectPort() {
std::lock_guard<std::recursive_mutex> lock(GetMutex());
if (main_port_.get() != nullptr) {
- DebugPort::RemoveSession(main_port_->GetFd());
+ DebugPort::RemoveSession(main_port_->GetReadFd());
main_port_.reset();
}
}
}
proxy->Cancel();
- proxy->main_client_.reset();
- proxy->delegate_client_.reset();
+ proxy->main_client_[0].reset();
+ proxy->main_client_[1].reset();
+ proxy->delegate_client_[0].reset();
+ proxy->delegate_client_[1].reset();
DestroyWeakPtr(proxy->conn_timer_data_);
proxy->conn_timer_data_ = nullptr;
// LCOV_EXCL_STOP
}
- if (!delegate_port_ || delegate_port_->GetFd() != fd) return;
+ if (!delegate_port_ || delegate_port_->GetReadFd() != fd) return;
char buffer[4];
if (recv(fd, buffer, sizeof(buffer), MSG_PEEK | MSG_DONTWAIT) == 0) {
_W("Socket was disconnected by stub. fd(%d)", fd);
listener_ = nullptr;
if (main_port_.get() != nullptr) {
- DebugPort::RemoveSession(main_port_->GetFd());
+ DebugPort::RemoveSession(fd);
main_port_.reset();
}
}
_W("Socket was disconnected. fd(%d)", fd);
- main_client_.reset();
- delegate_client_.reset();
+ main_client_[0].reset();
+ main_client_[1].reset();
+ delegate_client_[0].reset();
+ delegate_client_[1].reset();
auto* listener = listener_;
listener_ = nullptr;
listener->OnDisconnected(target_appid_);
void Proxy::OnResponseReceived(int fd) {
std::lock_guard<std::recursive_mutex> lock(GetMutex());
UnsetConnTimer();
- if (listener_ == nullptr) {
- // LCOV_EXCL_START
+ if (listener_ == nullptr) { // LCOV_EXCL_START
_E("Invalid context");
return;
- // LCOV_EXCL_STOP
- }
+ } // LCOV_EXCL_STOP
+ bool is_read = false;
bool is_delegate = false;
- std::unique_ptr<ClientChannel> client = GetClient(fd, &is_delegate);
- if (!client) {
- // LCOV_EXCL_START
+ std::unique_ptr<ClientChannel> client = GetClient(fd, &is_read, &is_delegate);
+ if (!client) { // LCOV_EXCL_START
_E("Unknown fd(%d)", fd);
return;
- // LCOV_EXCL_STOP
- }
+ } // LCOV_EXCL_STOP
Response response;
int ret = client->Receive(&response);
_E("Permission denied");
auto* listener = listener_;
listener_ = nullptr;
- main_client_.reset();
- delegate_client_.reset();
+ main_client_[0].reset();
+ main_client_[1].reset();
+ delegate_client_[0].reset();
+ delegate_client_[1].reset();
listener->OnRejected(target_appid_, RPC_PORT_ERROR_PERMISSION_DENIED);
return;
}
client->SetNonblock();
int client_fd = client->RemoveFd();
- SetPort(client_fd, is_delegate);
+ SetPort(client_fd, is_read, is_delegate);
}
std::shared_ptr<Proxy> Proxy::GetSharedPtr() { return shared_from_this(); }
bool Proxy::HasRequested() const {
return listener_ != nullptr &&
- (!main_port_ || !delegate_port_ || main_port_->GetFd() > 0);
+ (!main_port_ || !delegate_port_ || main_port_->GetReadFd() > 0);
}
-std::unique_ptr<ClientChannel> Proxy::GetClient(int fd, bool* is_delegate) {
+std::unique_ptr<ClientChannel> Proxy::GetClient(int fd, bool* is_read,
+ bool* is_delegate) {
std::unique_ptr<ClientChannel> client;
- if (main_client_ && main_client_->GetFd() == fd) {
- client.reset(main_client_.release());
+ if (main_client_[0] && main_client_[0]->GetFd() == fd) {
+ *is_read = true;
*is_delegate = false;
- } else if (delegate_client_ && delegate_client_->GetFd() == fd) {
- client.reset(delegate_client_.release());
+ client = std::move(main_client_[0]);
+ } else if (main_client_[1] && main_client_[1]->GetFd() == fd) {
+ *is_read = false;
+ *is_delegate = false;
+ client = std::move(main_client_[1]);
+ } else if (delegate_client_[0] && delegate_client_[0]->GetFd() == fd) {
+ *is_read = true;
+ *is_delegate = true;
+ client = std::move(delegate_client_[0]);
+ } else if (delegate_client_[1] && delegate_client_[1]->GetFd() == fd) {
+ *is_read = false;
*is_delegate = true;
+ client = std::move(delegate_client_[1]);
}
return client;
}
-void Proxy::SetPort(int fd, bool is_delegate) {
+void Proxy::SetPort(int fd, bool is_read, bool is_delegate) {
if (is_delegate) {
- _W("[Delegate] fd=%d", fd);
- fds_[1] = fd;
- delegate_port_.reset(new ProxyPort(fds_[1], target_appid_, this));
+ if (!delegate_port_)
+ delegate_port_.reset(new ProxyPort(-1, -1, target_appid_, this));
+
+ if (is_read) {
+ _W("[DELEGATE] read_fd=%d", fd);
+ delegate_port_->SetReadFd(fd);
+ } else {
+ _W("[DELEGATE] write_fd=%d", fd);
+ delegate_port_->SetWriteFd(fd);
+ }
} else {
- _W("[Main] fd=%d", fd);
- fds_[0] = fd;
- main_port_.reset(new ProxyPort(fds_[0], target_appid_, this, false));
+ if (!main_port_)
+ main_port_.reset(new ProxyPort(-1, -1, target_appid_, this, false));
+
+ if (is_read) {
+ _W("[MAIN] read_fd=%d", fd);
+ main_port_->SetReadFd(fd);
+ } else {
+ _W("[MAIN] write_fd=%d", fd);
+ main_port_->SetWriteFd(fd);
+ }
}
- if (main_port_ && delegate_port_) {
- _W("target_appid=%s, port_name=%s, main_fd=%d, delegate_fd=%d",
- target_appid_.c_str(), port_name_.c_str(), fds_[0], fds_[1]);
- DebugPort::AddSession(port_name_, target_appid_, fds_[0], fds_[1]);
+ if (main_port_ && main_port_->GetReadFd() > 0 &&
+ main_port_->GetWriteFd() > 0 && delegate_port_ &&
+ delegate_port_->GetReadFd() > 0 && delegate_port_->GetWriteFd() > 0) {
+ _W("[CONNECTED] target_appid=%s, port_name=%s, main_fd=%d:%d, "
+ "delegate_fd=%d:%d",
+ target_appid_.c_str(), port_name_.c_str(), main_port_->GetReadFd(),
+ main_port_->GetWriteFd(), delegate_port_->GetReadFd(),
+ delegate_port_->GetWriteFd());
+ DebugPort::AddSession(port_name_, target_appid_, main_port_->GetReadFd(),
+ main_port_->GetWriteFd(), delegate_port_->GetReadFd(),
+ delegate_port_->GetWriteFd());
listener_->OnConnected(target_appid_, main_port_.get());
}
}
void SetRealAppId(const std::string& alias_appid);
std::recursive_mutex& GetMutex() const;
- int MainPortConnect(const std::string& instance, bool sync);
- int DelegatePortConnect(const std::string& instance, bool sync);
+ int CreateClientChannel(std::string instance, std::string port_type,
+ bool sync, std::unique_ptr<ClientChannel>* channel);
int Connect(bool sync);
bool WaitUntilPortCreation();
int Watch();
gpointer CreateWeakPtr();
static void DestroyWeakPtr(gpointer data);
bool HasRequested() const;
- std::unique_ptr<ClientChannel> GetClient(int fd, bool* is_delegate);
- void SetPort(int fd, bool is_delegate);
+ std::unique_ptr<ClientChannel> GetClient(int fd, bool* is_ready,
+ bool* is_delegate);
+ void SetPort(int fd, bool is_read, bool is_delegate);
private:
std::string port_name_;
IEventListener* listener_ = nullptr;
std::string target_appid_;
std::string real_appid_;
- int fds_[2];
- std::unique_ptr<ClientChannel> main_client_;
- std::unique_ptr<ClientChannel> delegate_client_;
+ std::unique_ptr<ClientChannel> main_client_[2];
+ std::unique_ptr<ClientChannel> delegate_client_[2];
gpointer conn_timer_data_ = nullptr;
gpointer idler_data_ = nullptr;
mutable std::recursive_mutex mutex_;
namespace rpc_port {
namespace internal {
-ProxyPort::ProxyPort(int fd, std::string id, IEvent* listener, bool is_delegate)
- : Port(fd, std::move(id)), listener_(listener) {
- Watch(is_delegate);
-}
+ProxyPort::ProxyPort(int read_fd, int write_fd, std::string id,
+ IEvent* listener, bool is_delegate)
+ : Port(read_fd, write_fd, std::move(id)),
+ listener_(listener),
+ is_delegate_(is_delegate) {}
ProxyPort::~ProxyPort() {
if (source_ && !g_source_is_destroyed(source_)) g_source_destroy(source_);
}
-void ProxyPort::Watch(bool is_delegate) {
+void ProxyPort::Watch(int fd) {
int cond = G_IO_ERR | G_IO_HUP | G_IO_NVAL;
- if (is_delegate) cond |= G_IO_IN;
+ if (IsDelegate() && GetReadFd() == fd) cond |= G_IO_IN;
- source_ = g_unix_fd_source_new(GetFd(), static_cast<GIOCondition>(cond));
+ source_ = g_unix_fd_source_new(fd, static_cast<GIOCondition>(cond));
if (source_ == nullptr) {
// LCOV_EXCL_START
_E("g_unix_fd_source_new() is failed");
g_source_unref(source_);
}
+bool ProxyPort::IsDelegate() const { return is_delegate_; }
+
+void ProxyPort::SetReadFd(int read_fd) {
+ Port::SetReadFd(read_fd);
+ Watch(read_fd);
+}
+
void ProxyPort::Disconnect() {
Port::Disconnect();
if (source_ && !g_source_is_destroyed(source_)) g_source_destroy(source_);
virtual void OnSocketDisconnected(int fd) = 0;
};
- ProxyPort(int fd, std::string id, IEvent* listener, bool is_delegate = true);
+ ProxyPort(int read_fd, int write_fd, std::string id, IEvent* listener,
+ bool is_delegate = true);
virtual ~ProxyPort();
+ bool IsDelegate() const;
void Disconnect() override;
+ void SetReadFd(int read_fd) override;
private:
- void Watch(bool is_delegate);
+ void Watch(int fd);
static gboolean UnixFdSourceFunc(gint fd, GIOCondition cond,
gpointer user_data);
private:
IEvent* listener_;
+ bool is_delegate_;
GSource* source_ = nullptr;
};
return RPC_PORT_ERROR_INVALID_PARAMETER;
auto port = static_cast<Port*>(h);
- std::shared_ptr<PeerCred> cred(PeerCred::Get(port->GetFd()));
+ std::shared_ptr<PeerCred> cred(PeerCred::Get(port->GetReadFd()));
if (cred.get() == nullptr)
return RPC_PORT_ERROR_IO_ERROR;
internal::Port* pt = static_cast<internal::Port*>(port);
{
- std::lock_guard<std::recursive_mutex> lock(pt->GetMutex());
+ std::lock_guard<std::recursive_mutex> lock(pt->GetReadMutex());
int ret = rpc_port_read(port, &len, 4);
if (ret != 0)
return ret;
internal::Port* pt = static_cast<internal::Port*>(port);
{
- std::lock_guard<std::recursive_mutex> lock(pt->GetMutex());
+ std::lock_guard<std::recursive_mutex> lock(pt->GetWriteMutex());
int ret = rpc_port_write(port, &len, sizeof(len));
if (ret != 0)
return ret;
}
if (DebugPort::IsConnected())
- DebugPort::Send(port->GetFd(), true, seq, buf, size);
+ DebugPort::Send(port->GetReadFd(), true, seq, buf, size);
return RPC_PORT_ERROR_NONE;
}
return ret;
if (DebugPort::IsConnected())
- DebugPort::Send(port->GetFd(), false, seq, buf, size);
+ DebugPort::Send(port->GetWriteFd(), false, seq, buf, size);
return RPC_PORT_ERROR_NONE;
}
namespace internal {
namespace {
-constexpr const char kPortTypeMain[] = "main";
-constexpr const char kPortTypeDelegate[] = "delegate";
+constexpr const char kPortTypeMainWrite[] = "main-0";
+constexpr const char kPortTypeMainRead[] = "main-1";
+constexpr const char kPortTypeDelegateWrite[] = "delegate-0";
+constexpr const char kPortTypeDelegateRead[] = "delegate-1";
constexpr uid_t kRegularUidMin = 5000;
} // namespace
for (auto& p : ports_) {
if (!p->IsDelegate())
- DebugPort::RemoveSession(p->GetFd());
+ DebugPort::RemoveSession(p->GetReadFd());
}
listener_ = nullptr;
return port_name_;
}
-void Stub::RemoveAcceptedPorts(std::string instance) {
+void Stub::RemoveAcceptedPorts(const std::string& instance) {
std::lock_guard<std::recursive_mutex> lock(GetMutex());
auto iter = ports_.begin();
while (iter != ports_.end()) {
if ((*iter)->GetInstance().compare(instance) == 0) {
- LOGI("Close: fd(%d)", (*iter)->GetFd());
- DebugPort::RemoveSession((*iter)->GetFd());
+ LOGI("Close: fd(%d)", (*iter)->GetReadFd());
+ DebugPort::RemoveSession((*iter)->GetReadFd());
iter = ports_.erase(iter);
} else {
iter++;
}
for (auto& p : ports_) {
- if (p->GetFd() == fd && !p->IsDelegate()) {
+ if (p->GetReadFd() == fd && !p->IsDelegate()) {
char buffer[4];
if (recv(fd, buffer, sizeof(buffer), MSG_PEEK | MSG_DONTWAIT) == 0) {
_W("Socket was disconnected from proxy. fd(%d)", fd);
_W("Socket was disconnected. fd(%d)", fd);
for (auto& p : ports_) {
- if (p->GetFd() == fd) {
+ if (p->GetReadFd() == fd || p->GetWriteFd() == fd) {
listener_->OnDisconnected(p->GetId(), p->GetInstance());
RemoveAcceptedPorts(p->GetInstance());
Aul::NotifyRpcFinished();
void Stub::AddAcceptedPort(const std::string& sender_appid,
const std::string& instance, const std::string& port_type, int fd) {
std::lock_guard<std::recursive_mutex> lock(GetMutex());
- if (port_type == kPortTypeMain) {
+ if (port_type == kPortTypeMainWrite) {
ports_.emplace_back(
- new AcceptedPort(fd, sender_appid, instance, this, false));
- return;
+ new AcceptedPort(-1, fd, sender_appid, instance, this, false));
+ } else if (port_type == kPortTypeMainRead) {
+ auto port = FindPort(instance);
+ if (port) port->SetReadFd(fd);
+ } else if (port_type == kPortTypeDelegateWrite) {
+ ports_.emplace_back(
+ new AcceptedPort(-1, fd, sender_appid, instance, this, true));
+ } else if (port_type == kPortTypeDelegateRead) {
+ auto port = FindDelegatePort(instance);
+ if (port) port->SetReadFd(fd);
}
- ports_.emplace_back(new AcceptedPort(fd, sender_appid, instance, this, true));
+ auto main_port = FindPort(instance);
+ if (!main_port || main_port->GetReadFd() < 0 || main_port->GetWriteFd() < 0)
+ return;
- int main_fd = -1;
- for (auto& p : ports_) {
- if (p->GetId() == sender_appid && p->GetInstance() == instance &&
- p->GetFd() != fd) {
- main_fd = p->GetFd();
- break;
- }
- }
+ auto delegate_port = FindDelegatePort(instance);
+ if (!delegate_port || delegate_port->GetReadFd() < 0 ||
+ delegate_port->GetWriteFd() < 0)
+ return;
- _W("sender_appid(%s), instance(%s), main_fd(%d), delegate_fd(%d)",
- sender_appid.c_str(), instance.c_str(), main_fd, fd);
- DebugPort::AddSession(port_name_, sender_appid, main_fd, fd);
+ _W("[CONNECTED] sender_appid(%s), instance(%s), main_fd(%d:%d), "
+ "delegate_fd(%d:%d)",
+ sender_appid.c_str(), instance.c_str(), main_port->GetReadFd(),
+ main_port->GetWriteFd(), delegate_port->GetReadFd(),
+ delegate_port->GetWriteFd());
+ DebugPort::AddSession(port_name_, sender_appid, main_port->GetReadFd(),
+ main_port->GetWriteFd(), delegate_port->GetReadFd(),
+ delegate_port->GetWriteFd());
listener_->OnConnected(sender_appid, instance);
}
private:
void AddAcceptedPort(const std::string& sender_appid,
const std::string& instance, const std::string& port_type, int fd);
- void RemoveAcceptedPorts(std::string instance);
+ void RemoveAcceptedPorts(const std::string& instance);
std::recursive_mutex& GetMutex() const;
int GetFdFromSystemd();
int CreateServerSocket();