namespace rpc_port {
namespace internal {
-Port::Port(int fd, const std::string& id)
- : fd_(fd), id_(id) {
+Port::Port(int fd, std::string id)
+ : fd_(fd), id_(std::move(id)) {
char uuid[37];
uuid_t u;
uuid_generate(u);
instance_ = std::string(uuid) + ":" + id;
}
+Port::Port(int fd, std::string id, std::string instance)
+ : fd_(fd), id_(std::move(id)), instance_(std::move(instance)) {}
+
Port::~Port() {
close(fd_);
}
class Port {
public:
- Port(int fd, const std::string& id);
+ Port(int fd, std::string id, std::string instance);
+ Port(int fd, std::string id);
virtual ~Port();
int Read(void* buf, unsigned int size);
int fd = g_io_channel_unix_get_fd(gio);
char buffer[4];
- if (proxy->main_port_.get()->GetFd() == fd) {
+ if (proxy->delegate_port_.get()->GetFd() == fd) {
if (recv(fd, buffer, sizeof(buffer), MSG_PEEK | MSG_DONTWAIT) == 0) {
LOGW("Socket was disconnected by stub. fd(%d)", fd);
IEventListener* listener = proxy->listener_;
proxy->listener_ = nullptr;
- proxy->main_port_.get()->SetSource(0);
+ proxy->delegate_port_.get()->SetSource(0);
if (listener)
listener->OnDisconnected(proxy->target_appid_);
return FALSE;
}
LOGW("[__OnPortAppeared__] fds[0]: %d, fds[1]: %d", fds[0], fds[1]);
- main_port_.reset(new ProxyPort(this, fds[0], appid));
+ main_port_.reset(new ProxyPort(this, fds[0], appid, false));
delegate_port_.reset(new ProxyPort(this, fds[1], appid));
listener_->OnConnected(appid, main_port_.get());
}
return RPC_PORT_ERROR_NONE;
}
-Proxy::ProxyPort::ProxyPort(Proxy* parent, int fd, const std::string& id)
- : Port(fd, id), parent_(parent) {
- Watch();
+Proxy::ProxyPort::ProxyPort(Proxy* parent, int fd, const std::string& id,
+ bool receive) : Port(fd, id), parent_(parent) {
+ Watch(receive);
}
Proxy::ProxyPort::~ProxyPort() {
g_io_channel_unref(gioc_);
}
-int Proxy::ProxyPort::Watch() {
+int Proxy::ProxyPort::Watch(bool receive) {
char buf[1024];
int fd = GetFd();
return -1;
}
+ if (!receive)
+ return 0;
+
src_ = g_io_add_watch(gioc_,
(GIOCondition)(G_IO_IN),
Proxy::OnDataReceived, parent_);
private:
class ProxyPort : public Port {
public:
- ProxyPort(Proxy* parent, int fd, const std::string& id);
+ ProxyPort(Proxy* parent, int fd, const std::string& id, bool receive = true);
virtual ~ProxyPort();
void SetDisconnectedSource(int sourceId);
void SetSource(int sourceId);
private:
- int Watch();
+ int Watch(bool receive);
private:
GIOChannel* gioc_ = nullptr;
return {};
}
-
gboolean Stub::OnDataReceived(GIOChannel *gio, GIOCondition cond,
gpointer data) {
Stub* stub = static_cast<Stub*>(data);
void Stub::OnFdReceived(const std::string& sender, int fds[2]) {
LOGW("[__OnFdReceived__] fds[0]: %d, fds[1]: %d", fds[0], fds[1]);
- ports_.emplace_back(new AcceptedPort(this, false, fds[0], sender));
- ports_.emplace_back(new AcceptedPort(this, true, fds[1], sender));
+ auto* main_port = new AcceptedPort(this, false, fds[0], sender, true);
+ ports_.emplace_back(main_port);
+ ports_.emplace_back(new AcceptedPort(this, true, fds[1], sender,
+ main_port->GetInstance(), false));
for (auto& p : ports_) {
if (p->GetFd() == fds[0]) {
listener_->OnConnected(p->GetId(), p->GetInstance());
}
}
-Stub::AcceptedPort::AcceptedPort(Stub* parent, bool isDelegate,
- int fd, const std::string& id)
- : Port(fd, id), parent_(parent), is_delegate_(isDelegate) {
- Watch();
+Stub::AcceptedPort::AcceptedPort(Stub* parent, bool isDelegate, int fd,
+ std::string id, std::string inst, bool watch)
+ : Port(fd, std::move(id), std::move(inst)), parent_(parent),
+ is_delegate_(isDelegate) {
+ Watch(watch);
+}
+
+Stub::AcceptedPort::AcceptedPort(Stub* parent, bool isDelegate, int fd,
+ std::string id, bool watch)
+ : Port(fd, std::move(id)), parent_(parent),
+ is_delegate_(isDelegate) {
+ Watch(watch);
}
Stub::AcceptedPort::~AcceptedPort() {
g_io_channel_unref(gioc_);
}
-int Stub::AcceptedPort::Watch() {
+int Stub::AcceptedPort::Watch(bool receive) {
char buf[1024];
int fd = GetFd();
return -1;
}
+ if (!receive)
+ return 0;
+
src_ = g_io_add_watch(gioc_,
(GIOCondition)(G_IO_IN),
Stub::OnDataReceived, parent_);
private:
class AcceptedPort : public Port {
public:
- AcceptedPort(Stub* parent, bool isDelegate, int fd, const std::string& id);
+ AcceptedPort(Stub* parent, bool isDelegate, int fd, std::string id,
+ std::string inst, bool receive);
+ AcceptedPort(Stub* parent, bool isDelegate, int fd, std::string id,
+ bool receive);
virtual ~AcceptedPort();
bool IsDelegate() const {
return is_delegate_;
}
private:
- int Watch();
+ int Watch(bool receive);
private:
GIOChannel* gioc_ = nullptr;
rpc_port_h port, void *data) -> int {
RpcPortConnection* p = static_cast<RpcPortConnection*>(data);
p->stub_port_ = port;
+ rpc_port_stub_get_port(p->stub_handle_, RPC_PORT_PORT_CALLBACK,
+ instance, &p->stub_callback_port_);
+
p->Finish();
return 0;
}, this);
[](const char *ep, const char *port_name, rpc_port_h port, void *data) {
RpcPortConnection* p = static_cast<RpcPortConnection*>(data);
p->proxy_port_ = port;
+ rpc_port_proxy_get_port(p->proxy_handle_, RPC_PORT_PORT_CALLBACK,
+ &p->proxy_callback_port_);
}, this);
ASSERT_EQ(ret, 0);
}
rpc_port_h proxy_port_ = nullptr;
+ rpc_port_h proxy_callback_port_ = nullptr;
rpc_port_h stub_port_ = nullptr;
+ rpc_port_h stub_callback_port_ = nullptr;
bool touch_proxy_disconnected_event_cb_ = false;
bool touch_stub_disconnected_event_cb_ = false;
bool touch_proxy_received_event_cb_ = false;
RunMainLoop();
ASSERT_NE(stub_port_, nullptr);
+ ASSERT_NE(stub_callback_port_, nullptr);
- ret = rpc_port_write(stub_port_, res, sizeof(res));
+ ret = rpc_port_write(stub_callback_port_, res, sizeof(res));
ASSERT_EQ(ret, 0);
+
RunMainLoop();
ASSERT_TRUE(touch_proxy_received_event_cb_);
- ret = rpc_port_read(proxy_port_, r_buf, sizeof(res));
+ ret = rpc_port_read(proxy_callback_port_, r_buf, sizeof(res));
ASSERT_EQ(ret, 0);
ASSERT_STREQ(res, r_buf);
touch_proxy_received_event_cb_ = false;
- ret = rpc_port_write(stub_port_, res, sizeof(res));
+ ret = rpc_port_write(stub_callback_port_, res, sizeof(res));
ASSERT_EQ(ret, 0);
+
RunMainLoop();
ASSERT_TRUE(touch_proxy_received_event_cb_);
- ret = rpc_port_read(proxy_port_, r_buf, sizeof(res));
+ ret = rpc_port_read(proxy_callback_port_, r_buf, sizeof(res));
ASSERT_EQ(ret, 0);
ASSERT_STREQ(res, r_buf);
}