#include <sys/types.h>
#include <sys/socket.h>
#include <aul.h>
+#include <aul_rpc_port.h>
#include <dlog.h>
#include "fdbroker-internal.h"
return 0;
}
+int FdBroker::DBusMock::Watch(FdBroker::IEventWatcher* watcher,
+ const std::string& target_appid,
+ const std::string& port_name) {
+ watcher->OnPortAppeared(target_appid, port_name);
+ return 0;
+}
+
void FdBroker::DBusMock::Dispose() {
ports_.clear();
}
return socketpair(AF_UNIX, SOCK_STREAM | SOCK_CLOEXEC, 0, socks_);
}
- if (aul_request_message_port_socket_pair(socks_) != AUL_R_OK) {
+ if (aul_rpc_port_create_socket_pair(&socks_) != AUL_R_OK) {
LOGE("error create socket pair");
return -1;
}
}
FdBroker::~FdBroker() {
+ if (watcher_id_ > 0)
+ g_bus_unwatch_name(watcher_id_);
+
if (registration_id_ > 0) {
g_dbus_connection_unregister_object(
DBusConnectionManager::GetInst().GetConnection(),
return ac_;
}
+void FdBroker::OnNameAppeared(GDBusConnection *connection,
+ const gchar *name,
+ const gchar *name_owner,
+ gpointer user_data) {
+ FdBroker* broker = static_cast<FdBroker*>(user_data);
+ broker->watcher_->OnPortAppeared(broker->watch_appid_,
+ broker->watch_port_name_);
+}
+
+void FdBroker::OnNameVanished(GDBusConnection *connection,
+ const gchar *name,
+ gpointer user_data) {
+ FdBroker* broker = static_cast<FdBroker*>(user_data);
+ broker->watcher_->OnPortVanished(broker->watch_appid_,
+ broker->watch_port_name_);
+}
+
+int FdBroker::Watch(IEventWatcher* ev, const std::string& target_appid,
+ const std::string& port_name) {
+ int r;
+
+ if (watcher_ != nullptr)
+ return -1;
+
+ if (ev == nullptr)
+ return -1;
+
+ if (!mock_) {
+ r = aul_rpc_port_prepare_stub(target_appid.c_str(), port_name.c_str());
+ if (r != AUL_R_OK) {
+ LOGE("Failed to prepare stub %s:%s",
+ target_appid.c_str(), port_name.c_str());
+ return -1;
+ }
+ }
+
+ watcher_ = ev;
+ watch_appid_ = target_appid;
+ watch_port_name_ = port_name;
+
+ if (mock_) {
+ r = DBusMock::GetInst().Watch(ev, target_appid, port_name);
+ if (r < 0)
+ return -1;
+
+ return 0;
+ }
+
+ std::string interface_name = GetInterfaceName(target_appid, port_name);
+ watcher_id_ = g_bus_watch_name_on_connection(
+ DBusConnectionManager::GetInst().GetConnection(),
+ interface_name.c_str(),
+ G_BUS_NAME_WATCHER_FLAGS_NONE,
+ OnNameAppeared,
+ OnNameVanished,
+ this,
+ NULL);
+ if (watcher_id_ == 0) {
+ LOGE("Failed to watch connection(%s)", interface_name.c_str());
+ watcher_ = nullptr;
+ return -1;
+ }
+
+ return 0;
+}
+
} // namespace internal
} // namespace rpc_port
virtual void OnFdReceived(const std::string& sender, int fd) = 0;
};
+ class IEventWatcher {
+ public:
+ virtual void OnPortAppeared(const std::string& appid,
+ const std::string& port_name) = 0;
+ virtual void OnPortVanished(const std::string& appid,
+ const std::string& port_name) = 0;
+ };
+
FdBroker(bool mock = false) : mock_(mock) {}
~FdBroker();
int Send(const std::string& target_appid, const std::string& port_name);
int Listen(IEventListener* ev, const std::string& port_name);
AccessController& GetAccessController();
+ int Watch(IEventWatcher* ev, const std::string& target_appid,
+ const std::string& port_name);
private:
class DBusConnectionManager {
int Send(const std::string& sender, const std::string& port, int fd);
int AddListener(const std::string& port,
FdBroker::IEventListener* listener);
+ int Watch(FdBroker::IEventWatcher* watcher, const std::string& target_appid,
+ const std::string& port_name);
void Dispose();
private:
void ReceiveMessage(GVariant* parameters, GDBusMethodInvocation* invocation);
std::string GetInterfaceName(const std::string& target_appid,
const std::string& port_name);
+ static void OnNameAppeared(GDBusConnection *connection,
+ const gchar *name,
+ const gchar *name_owner,
+ gpointer user_data);
+ static void OnNameVanished(GDBusConnection *connection,
+ const gchar *name,
+ gpointer user_data);
private:
IEventListener* listener_ = nullptr;
int registration_id_ = 0;
bool mock_;
AccessController ac_;
+ IEventWatcher* watcher_ = nullptr;
+ guint watcher_id_ = 0;
+ std::string watch_appid_;
+ std::string watch_port_name_;
};
} // namespace internal
return 0;
}
-void Proxy::Connect(const std::string appid, const std::string& port_name,
- IEventListener* ev) {
- if (ev == nullptr || listener_ != nullptr)
+void Proxy::OnPortAppeared(const std::string& appid,
+ const std::string& port_name) {
+ LOGD("endpoint(%s), port_name(%s)", appid.c_str(), port_name.c_str());
+
+ if (listener_ == nullptr)
return;
- listener_ = ev;
- target_appid_ = appid;
- port_name_ = port_name;
int fd = fd_broker_.Send(appid, port_name);
-
if (fd <= 0) {
listener_->OnRejected(appid);
listener_ = nullptr;
Watch(fd);
}
+void Proxy::OnPortVanished(const std::string& appid,
+ const std::string& port_name) {
+ LOGD("endpoint(%s), port_name(%s)", appid.c_str(), port_name.c_str());
+}
+
+void Proxy::Connect(const std::string appid, const std::string& port_name,
+ IEventListener* ev) {
+ if (ev == nullptr || listener_ != nullptr)
+ return;
+
+ listener_ = ev;
+ target_appid_ = appid;
+ port_name_ = port_name;
+ int r = fd_broker_.Watch(this, appid, port_name);
+ if (r < 0)
+ listener_ = nullptr;
+}
+
} // namespace internal
-} // namespace rpc_port
\ No newline at end of file
+} // namespace rpc_port
namespace rpc_port {
namespace internal {
-class Proxy {
+class Proxy : public FdBroker::IEventWatcher {
public:
Proxy(bool mock = false);
virtual ~Proxy();
static gboolean OnDataReceived(GIOChannel *gio, GIOCondition cond,
gpointer data);
int Watch(int fd);
+ void OnPortAppeared(const std::string& appid,
+ const std::string& port_name) override;
+ void OnPortVanished(const std::string& appid,
+ const std::string& port_name) override;
private:
std::string port_name_;
#include <sys/types.h>
#include <sys/socket.h>
#include <dlog.h>
+#include <aul.h>
+#include <aul_rpc_port.h>
#include "stub-internal.h"
LOGW("Socket was disconnected from proxy");
stub->listener_->OnDisconnected(p->GetId());
stub->ports_.remove(p);
+
+ if (aul_rpc_port_notify_rpc_finished() != AUL_R_OK)
+ LOGW("Failed to notify rpc finished");
+
return FALSE;
}
LOGW("Invalid protocol");
stub->listener_->OnDisconnected(p->GetId());
stub->ports_.remove(p);
+
+ if (aul_rpc_port_notify_rpc_finished() != AUL_R_OK)
+ LOGW("Failed to notify rpc finished");
+
return FALSE;
}
}
}
+ if (aul_rpc_port_notify_rpc_finished() != AUL_R_OK)
+ LOGW("Failed to notify rpc finished");
+
return FALSE;
}