return handle;
}
-std::shared_ptr<AittTransport> ModuleLoader::LoadTransport(void *handle, const std::string &ip,
+std::unique_ptr<AittTransport> ModuleLoader::LoadTransport(void *handle, const std::string &ip,
AittDiscovery &discovery)
{
if (handle == nullptr) {
ERR("handle is NULL");
- return std::shared_ptr<AittTransport>(new NullTransport(ip.c_str(), discovery));
+ return std::unique_ptr<AittTransport>(new NullTransport(ip.c_str(), discovery));
}
AittTransport::ModuleEntry get_instance_fn = reinterpret_cast<AittTransport::ModuleEntry>(
dlsym(handle, AittTransport::MODULE_ENTRY_NAME));
if (get_instance_fn == nullptr) {
ERR("dlsym: %s", dlerror());
- return std::shared_ptr<AittTransport>(new NullTransport(ip.c_str(), discovery));
+ return std::unique_ptr<AittTransport>(new NullTransport(ip.c_str(), discovery));
}
- std::shared_ptr<AittTransport> instance(
- static_cast<AittTransport *>(get_instance_fn(ip.c_str(), discovery)),
- [](const AittTransport *instance) -> void { delete instance; });
+ std::unique_ptr<AittTransport> instance(
+ static_cast<AittTransport *>(get_instance_fn(ip.c_str(), discovery)));
if (instance == nullptr) {
- ERR("Failed to create a new instance");
- return std::shared_ptr<AittTransport>(new NullTransport(ip.c_str(), discovery));
+ ERR("get_instance_fn(AittTransport) Fail");
+ return std::unique_ptr<AittTransport>(new NullTransport(ip.c_str(), discovery));
}
return instance;
}
-std::shared_ptr<MQ> ModuleLoader::LoadMqttClient(void *handle, const std::string &id,
+std::unique_ptr<MQ> ModuleLoader::LoadMqttClient(void *handle, const std::string &id,
bool clear_session)
{
MQ::ModuleEntry get_instance_fn =
throw AittException(AittException::SYSTEM_ERR);
}
- std::shared_ptr<MQ> instance(static_cast<MQ *>(get_instance_fn(id.c_str(), clear_session)),
- [](const MQ *instance) { delete instance; });
+ std::unique_ptr<MQ> instance(static_cast<MQ *>(get_instance_fn(id.c_str(), clear_session)));
if (instance == nullptr) {
- ERR("Failed to create a new instance");
+ ERR("get_instance_fn(MQ) Fail");
throw AittException(AittException::SYSTEM_ERR);
}
mq(new MQProxy(id, clear_session, custom_broker)),
discovery(id, custom_broker),
reply_id(0),
- modules{0}
+ transports{0}
{
// TODO: Validate my_ip
ModuleLoader loader;
for (ModuleLoader::Type i = ModuleLoader::TYPE_TCP; i < ModuleLoader::TYPE_TRANSPORT_MAX;
i = ModuleLoader::Type(i + 1)) {
- ModuleLoader::ModuleHandle handle = loader.OpenModule(i);
+ module_handles.push_back(loader.OpenModule(i));
+ const ModuleLoader::ModuleHandle &handle = module_handles.back();
if (handle == nullptr)
ERR("OpenModule() Fail");
- modules[i] = new ModuleObj(std::move(handle),
- loader.LoadTransport(handle.get(), my_ip, discovery));
+ transports[i] = loader.LoadTransport(handle.get(), my_ip, discovery);
}
aittThread = std::thread(&AITT::Impl::ThreadMain, this);
}
if (aittThread.joinable())
aittThread.join();
-
- for (ModuleLoader::Type i = ModuleLoader::TYPE_TCP; i < ModuleLoader::TYPE_TRANSPORT_MAX;
- i = ModuleLoader::Type(i + 1)) {
- delete modules[i];
- }
}
void AITT::Impl::ThreadMain(void)
mq->Unsubscribe(subscribe_info->second);
break;
case AITT_TYPE_TCP:
- GetTransport(ModuleLoader::TYPE_TCP)->Unsubscribe(subscribe_info->second);
+ transports[ModuleLoader::TYPE_TCP]->Unsubscribe(subscribe_info->second);
break;
case AITT_TYPE_WEBRTC:
- GetTransport(ModuleLoader::TYPE_WEBRTC)->Unsubscribe(subscribe_info->second);
+ transports[ModuleLoader::TYPE_WEBRTC]->Unsubscribe(subscribe_info->second);
break;
default:
subscribed_list.clear();
}
-inline std::shared_ptr<AittTransport> AITT::Impl::GetTransport(ModuleLoader::Type type)
-{
- return modules[type]->second;
-}
-
void AITT::Impl::ConfigureTransportModule(const std::string &key, const std::string &value,
AittProtocol protocols)
{
mq->Publish(topic, data, datalen, qos, retain);
if ((protocols & AITT_TYPE_TCP) == AITT_TYPE_TCP)
- GetTransport(ModuleLoader::TYPE_TCP)->Publish(topic, data, datalen, qos, retain);
+ transports[ModuleLoader::TYPE_TCP]->Publish(topic, data, datalen, qos, retain);
if ((protocols & AITT_TYPE_WEBRTC) == AITT_TYPE_WEBRTC)
PublishWebRtc(topic, data, datalen, qos, retain);
void AITT::Impl::PublishWebRtc(const std::string &topic, const void *data, const size_t datalen,
AittQoS qos, bool retain)
{
- auto webrtcModule = GetTransport(ModuleLoader::TYPE_WEBRTC);
flexbuffers::Builder fbb;
fbb.Map([=, &fbb]() {
fbb.String("Id", id_ + WEBRTC_ID_POSTFIX);
});
fbb.Finish();
auto buf = fbb.GetBuffer();
- webrtcModule->Publish(topic, buf.data(), buf.size(), qos, retain);
+ transports[ModuleLoader::TYPE_WEBRTC]->Publish(topic, buf.data(), buf.size(), qos, retain);
}
AittSubscribeID AITT::Impl::Subscribe(const std::string &topic, const AITT::SubscribeCallback &cb,
user_data = mq->Unsubscribe(found_info->second);
break;
case AITT_TYPE_TCP: {
- auto tcpModule = GetTransport(ModuleLoader::TYPE_TCP);
- user_data = tcpModule->Unsubscribe(found_info->second);
+ user_data = transports[ModuleLoader::TYPE_TCP]->Unsubscribe(found_info->second);
break;
}
case AITT_TYPE_WEBRTC: {
- auto webrtcModule = GetTransport(ModuleLoader::TYPE_WEBRTC);
- user_data = webrtcModule->Unsubscribe(found_info->second);
+ user_data = transports[ModuleLoader::TYPE_WEBRTC]->Unsubscribe(found_info->second);
break;
}
default:
void *AITT::Impl::SubscribeTCP(SubscribeInfo *handle, const std::string &topic,
const SubscribeCallback &cb, void *user_data, AittQoS qos)
{
- auto tcpModule = GetTransport(ModuleLoader::TYPE_TCP);
- return tcpModule->Subscribe(
+ return transports[ModuleLoader::TYPE_TCP]->Subscribe(
topic,
[handle, cb](const std::string &topic, const void *data, const size_t datalen,
void *user_data, const std::string &correlation) -> void {
void *AITT::Impl::SubscribeWebRtc(SubscribeInfo *handle, const std::string &topic,
const SubscribeCallback &cb, void *user_data, AittQoS qos)
{
- auto webrtc_module = GetTransport(ModuleLoader::TYPE_WEBRTC);
flexbuffers::Builder fbb;
fbb.Map([=, &fbb]() {
fbb.String("Id", id_ + WEBRTC_ID_POSTFIX);
fbb.Finish();
auto buf = fbb.GetBuffer();
- return webrtc_module->Subscribe(
+ return transports[ModuleLoader::TYPE_WEBRTC]->Subscribe(
topic,
[handle, cb](const std::string &topic, const void *data, const size_t datalen,
void *user_data, const std::string &correlation) -> void {
private:
using Blob = std::pair<const void *, int>;
using SubscribeInfo = std::pair<AittProtocol, void *>;
- using ModuleObj = std::pair<ModuleLoader::ModuleHandle, std::shared_ptr<AittTransport>>;
void ConnectionCB(ConnectionCallback cb, void *user_data, int status);
AittSubscribeID SubscribeMQ(SubscribeInfo *info, MainLoopHandler *loop_handle,
void PublishWebRtc(const std::string &topic, const void *data, const size_t datalen,
AittQoS qos, bool retain);
void UnsubscribeAll();
- std::shared_ptr<AittTransport> GetTransport(ModuleLoader::Type type);
AITT &public_api;
std::string id_;
std::unique_ptr<MQ> mq;
AittDiscovery discovery;
unsigned short reply_id;
- ModuleObj *modules[ModuleLoader::TYPE_TRANSPORT_MAX];
+ std::vector<ModuleLoader::ModuleHandle> module_handles;
+ std::unique_ptr<AittTransport> transports[ModuleLoader::TYPE_TRANSPORT_MAX];
MainLoopHandler main_loop;
void ThreadMain(void);
std::thread aittThread;