class StubExt : public Stub, public Stub::IEventListener {
public:
- explicit StubExt(const std::string& port) : Stub(port) {}
+ explicit StubExt(const std::string& port) : Stub(port), destroying_(false) {}
virtual ~StubExt() = default;
void AddConnectedEventListener(rpc_port_stub_connected_event_cb cb,
void OnConnected(const std::string& sender,
const std::string& instance) override {
+ if (IsDestroying())
+ return;
+
for (auto& ev : connected_events_) {
ev->cb_(sender.c_str(), instance.c_str(), ev->user_data_);
}
void OnDisconnected(const std::string& sender,
const std::string& instance) override {
+ if (IsDestroying())
+ return;
+
for (auto& ev : disconnected_events_) {
ev->cb_(sender.c_str(), instance.c_str(), ev->user_data_);
}
int OnReceived(const std::string& sender,
const std::string& instance, Port* port) override {
+ if (IsDestroying())
+ return -1;
+
for (auto& ev : received_events_) {
int ret = ev->cb_(sender.c_str(), instance.c_str(), port,
ev->user_data_);
return 0;
}
+ void SetDestroying(bool destroying) {
+ destroying_ = destroying;
+ }
+
+ bool IsDestroying() {
+ return destroying_;
+ }
+
std::recursive_mutex& GetMutex() const {
return mutex_;
}
private:
+ std::atomic<bool> destroying_;
std::list<std::unique_ptr<Event<rpc_port_stub_connected_event_cb>>>
connected_events_;
std::list<std::unique_ptr<Event<rpc_port_stub_disconnected_event_cb>>>
_W("rpc_port_stub_destroy(%p)", h);
auto p = static_cast<::StubExt*>(h);
aul_rpc_port_usr_destroy(p->GetPortName().c_str(), rpc_port_get_target_uid());
- delete p;
+ p->Ignore();
+ p->SetDestroying(true);
+
+ g_idle_add_full(G_PRIORITY_HIGH,
+ [](gpointer data) -> gboolean {
+ auto p = static_cast<::StubExt*>(data);
+ _W("rpc_port_stub_destroy(%p)", p);
+ delete p;
+ return G_SOURCE_REMOVE;
+ }, h, nullptr);
return RPC_PORT_ERROR_NONE;
}
return server_->Listen();
}
+void Stub::Ignore() {
+ std::lock_guard<std::recursive_mutex> lock(GetMutex());
+ listener_ = nullptr;
+}
+
void Stub::AddPrivilege(const std::string& privilege) {
std::lock_guard<std::recursive_mutex> lock(GetMutex());
access_controller_.AddPrivilege(privilege);
}
}
-gboolean Stub::OnDataReceived(GIOChannel* channel, GIOCondition cond,
- gpointer user_data) {
- Stub* stub = static_cast<Stub*>(user_data);
+gboolean Stub::AcceptedPort::OnDataReceived(GIOChannel* channel,
+ GIOCondition cond, gpointer user_data) {
+ auto* stub = static_cast<Stub*>(user_data);
std::lock_guard<std::recursive_mutex> lock(stub->GetMutex());
auto* listener = stub->listener_;
if (listener == nullptr) {
listener->OnDisconnected(p->GetId(), p->GetInstance());
stub->RemoveAcceptedPorts(p->GetInstance());
Aul::NotifyRpcFinished();
- return G_SOURCE_REMOVE;
+ return G_SOURCE_CONTINUE;
}
int ret = stub->listener_->OnReceived(p->GetId(), p->GetInstance(),
listener->OnDisconnected(p->GetId(), p->GetInstance());
stub->RemoveAcceptedPorts(p->GetInstance());
Aul::NotifyRpcFinished();
- return G_SOURCE_REMOVE;
+ return G_SOURCE_CONTINUE;
}
break;
return G_SOURCE_CONTINUE;
}
-gboolean Stub::OnSocketDisconnected(GIOChannel* channel, GIOCondition cond,
- gpointer user_data) {
+gboolean Stub::AcceptedPort::OnSocketDisconnected(GIOChannel* channel,
+ GIOCondition cond, gpointer user_data) {
auto* stub = static_cast<Stub*>(user_data);
std::lock_guard<std::recursive_mutex> lock(stub->GetMutex());
auto* listener = stub->listener_;
}
Stub::AcceptedPort::~AcceptedPort() {
- if (disconn_src_ > 0)
- g_source_remove(disconn_src_);
+ if (disconn_source_ > 0)
+ g_source_remove(disconn_source_);
- if (src_ > 0)
- g_source_remove(src_);
+ if (source_ > 0)
+ g_source_remove(source_);
if (channel_ != nullptr)
g_io_channel_unref(channel_);
return -1;
}
- disconn_src_ = g_io_add_watch(channel_,
+ disconn_source_ = g_io_add_watch(channel_,
static_cast<GIOCondition>(G_IO_ERR | G_IO_HUP | G_IO_NVAL),
- Stub::OnSocketDisconnected, parent_);
- if (disconn_src_ == 0) {
+ OnSocketDisconnected, parent_);
+ if (disconn_source_ == 0) {
_E("g_io_add_watch() is failed");
return -1;
}
if (!receive)
return 0;
- src_ = g_io_add_watch(channel_, static_cast<GIOCondition>(G_IO_IN),
- Stub::OnDataReceived, parent_);
- if (src_ == 0) {
+ source_ = g_io_add_watch(channel_, static_cast<GIOCondition>(G_IO_IN),
+ OnDataReceived, parent_);
+ if (source_ == 0) {
_E("g_io_add_watch() is failed");
return -1;
}
gboolean Stub::Server::OnRequestReceived(GIOChannel* channel, GIOCondition cond,
gpointer user_data) {
- Stub* stub = static_cast<Stub*>(user_data);
+ auto* stub = static_cast<Stub*>(user_data);
std::lock_guard<std::recursive_mutex> lock(stub->GetMutex());
- if (stub->server_.get() == nullptr) {
+ if (stub->listener_ == nullptr) {
_E("Invalid context");
return G_SOURCE_REMOVE;
}
virtual ~Stub();
int Listen(IEventListener* ev, int fd);
+ void Ignore();
void AddPrivilege(const std::string& privilege);
void SetTrusted(const bool trusted);
std::shared_ptr<Port> FindPort(const std::string& instance) const;
private:
int Watch(bool receive);
+ static gboolean OnDataReceived(GIOChannel* channel, GIOCondition cond,
+ gpointer user_data);
+ static gboolean OnSocketDisconnected(GIOChannel* channel, GIOCondition cond,
+ gpointer user_data);
private:
GIOChannel* channel_ = nullptr;
- int disconn_src_ = 0;
- int src_ = 0;
+ guint disconn_source_ = 0;
+ guint source_ = 0;
Stub* parent_;
bool is_delegate_ = false;
};
guint source_ = 0;;
};
- static gboolean OnDataReceived(GIOChannel* channel, GIOCondition cond,
- gpointer user_data);
- static gboolean OnSocketDisconnected(GIOChannel* channel, GIOCondition cond,
- gpointer user_data);
void AddAcceptedPort(const std::string& sender_appid,
const std::string& instance, const std::string& port_type, int fd);
void RemoveAcceptedPorts(std::string instance);