auto port = ptr->lock();
if (port == nullptr) {
_E("port is destructed");
- return G_SOURCE_CONTINUE;
+ return G_SOURCE_REMOVE;
}
std::lock_guard<std::recursive_mutex> lock(port->rw_mutex_);
- if (port->queue_.empty()) {
- port->IgnoreIOEvent();
- return G_SOURCE_CONTINUE;
+ if (port->source_id_ == 0) {
+ _E("GSource is destroyed");
+ return G_SOURCE_REMOVE;
}
- if (port->source_ == nullptr || g_source_is_destroyed(port->source_)) {
- _E("GSource(%p) is destroyed", port->source_);
+ if (port->queue_.empty()) {
port->IgnoreIOEvent();
return G_SOURCE_CONTINUE;
}
void Port::IgnoreIOEvent() {
std::lock_guard<std::recursive_mutex> lock(rw_mutex_);
- if (source_ != nullptr && !g_source_is_destroyed(source_))
- g_source_destroy(source_);
+ if (source_id_ != 0) {
+ GSource* source = g_main_context_find_source_by_id(
+ MessageSendingThread::GetInst().GetContext(), source_id_);
+ if (source != nullptr && !g_source_is_destroyed(source))
+ g_source_destroy(source);
- source_ = nullptr;
+ source_id_ = 0;
+ }
if (channel_ != nullptr) {
g_io_channel_unref(channel_);
return RPC_PORT_ERROR_OUT_OF_MEMORY;
}
- source_ = g_io_create_watch(channel_, static_cast<GIOCondition>(G_IO_OUT));
- if (source_ == nullptr) {
+ GSource* source = g_io_create_watch(channel_,
+ static_cast<GIOCondition>(G_IO_OUT));
+ if (source == nullptr) {
_E("Failed to create GSource");
IgnoreIOEvent();
return RPC_PORT_ERROR_OUT_OF_MEMORY;
}
auto* ptr = new (std::nothrow) std::weak_ptr<Port>(shared_from_this());
- g_source_set_callback(source_, reinterpret_cast<GSourceFunc>(OnEventReceived),
+ g_source_set_callback(source, reinterpret_cast<GSourceFunc>(OnEventReceived),
static_cast<gpointer>(ptr), [](gpointer ptr) {
auto* port = static_cast<std::weak_ptr<Port>*>(ptr);
delete port;
});
- g_source_set_priority(source_, G_PRIORITY_DEFAULT);
- g_source_attach(source_, MessageSendingThread::GetInst().GetContext());
- g_source_unref(source_);
+ g_source_set_priority(source, G_PRIORITY_DEFAULT);
+ source_id_ = g_source_attach(source,
+ MessageSendingThread::GetInst().GetContext());
+ g_source_unref(source);
return RPC_PORT_ERROR_NONE;
}