#include <atomic>
#include "AittException.h"
+#include "MQProxy.h"
#include "aitt_internal.h"
namespace aitt {
AittDiscovery::AittDiscovery(const std::string &id, bool custom_broker)
- : id_(id), discovery_mq(id + "d", true), callback_handle(nullptr)
+ : id_(id), discovery_mq(new MQProxy(id + "d", true, custom_broker)), callback_handle(nullptr)
{
}
{
RET_IF(callback_handle);
- discovery_mq.SetWillInfo(DISCOVERY_TOPIC_BASE + id_, nullptr, 0, AITT_QOS_EXACTLY_ONCE, true);
- discovery_mq.Connect(host, port, username, password);
+ discovery_mq->SetWillInfo(DISCOVERY_TOPIC_BASE + id_, nullptr, 0, AITT_QOS_EXACTLY_ONCE, true);
+ discovery_mq->Connect(host, port, username, password);
- callback_handle = discovery_mq.Subscribe(DISCOVERY_TOPIC_BASE + "+", DiscoveryMessageCallback,
+ callback_handle = discovery_mq->Subscribe(DISCOVERY_TOPIC_BASE + "+", DiscoveryMessageCallback,
static_cast<void *>(this), AITT_QOS_EXACTLY_ONCE);
}
void AittDiscovery::Stop()
{
- discovery_mq.Publish(DISCOVERY_TOPIC_BASE + id_, nullptr, 0, AITT_QOS_EXACTLY_ONCE, true);
- discovery_mq.Unsubscribe(callback_handle);
+ discovery_mq->Publish(DISCOVERY_TOPIC_BASE + id_, nullptr, 0, AITT_QOS_EXACTLY_ONCE, true);
+ discovery_mq->Unsubscribe(callback_handle);
callback_handle = nullptr;
- discovery_mq.Disconnect();
+ discovery_mq->Disconnect();
}
void AittDiscovery::UpdateDiscoveryMsg(AittProtocol protocol, const void *msg, size_t length)
fbb.Finish();
auto buf = fbb.GetBuffer();
- discovery_mq.Publish(DISCOVERY_TOPIC_BASE + id_, buf.data(), buf.size(), AITT_QOS_EXACTLY_ONCE,
+ discovery_mq->Publish(DISCOVERY_TOPIC_BASE + id_, buf.data(), buf.size(), AITT_QOS_EXACTLY_ONCE,
true);
}
AittProtocol GetProtocol(const std::string &protocol_str);
std::string id_;
- MQ discovery_mq;
+ std::unique_ptr<MQ> discovery_mq;
void *callback_handle;
std::map<AittProtocol, DiscoveryBlob> discovery_map;
std::map<int, std::pair<AittProtocol, DiscoveryCallback>> callbacks;
--- /dev/null
+/*
+ * Copyright (c) 2022 Samsung Electronics Co., Ltd All Rights Reserved
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "MQProxy.h"
+
+#include "ModuleLoader.h"
+#include "MosquittoMQ.h"
+
+namespace aitt {
+
+MQProxy::MQProxy(const std::string &id, bool clear_session, bool is_custom_broker)
+ : handle(nullptr, nullptr)
+{
+ if (is_custom_broker) {
+ ModuleLoader loader;
+ handle = loader.OpenModule(ModuleLoader::TYPE_CUSTOM_MQTT);
+
+ mq = loader.LoadMqttClient(handle.get(), "test", true);
+ } else {
+ mq = std::shared_ptr<MQ>(new MosquittoMQ(id, clear_session));
+ }
+}
+
+void MQProxy::SetConnectionCallback(const MQConnectionCallback &cb)
+{
+ mq->SetConnectionCallback(cb);
+}
+
+void MQProxy::Connect(const std::string &host, int port, const std::string &username,
+ const std::string &password)
+{
+ mq->Connect(host, port, username, password);
+}
+
+void MQProxy::SetWillInfo(const std::string &topic, const void *msg, size_t szmsg, int qos,
+ bool retain)
+{
+ mq->SetWillInfo(topic, msg, szmsg, qos, retain);
+}
+
+void MQProxy::Disconnect(void)
+{
+ mq->Disconnect();
+}
+
+void MQProxy::Publish(const std::string &topic, const void *data, const size_t datalen, int qos,
+ bool retain)
+{
+ mq->Publish(topic, data, datalen, qos, retain);
+}
+
+void MQProxy::PublishWithReply(const std::string &topic, const void *data, const size_t datalen,
+ int qos, bool retain, const std::string &reply_topic, const std::string &correlation)
+{
+ mq->PublishWithReply(topic, data, datalen, qos, retain, reply_topic, correlation);
+}
+
+void MQProxy::SendReply(MSG *msg, const void *data, const size_t datalen, int qos, bool retain)
+{
+ mq->SendReply(msg, data, datalen, qos, retain);
+}
+
+void *MQProxy::Subscribe(const std::string &topic, const SubscribeCallback &cb, void *user_data,
+ int qos)
+{
+ return mq->Subscribe(topic, cb, user_data, qos);
+}
+
+void *MQProxy::Unsubscribe(void *handle)
+{
+ return mq->Unsubscribe(handle);
+}
+
+} // namespace aitt
--- /dev/null
+/*
+ * Copyright (c) 2022 Samsung Electronics Co., Ltd All Rights Reserved
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#pragma once
+
+#include <memory>
+
+#include "MQ.h"
+
+namespace aitt {
+
+class MQProxy : public MQ {
+ public:
+ explicit MQProxy(const std::string &id, bool clear_session, bool custom_broker);
+ virtual ~MQProxy() = default;
+
+ void SetConnectionCallback(const MQConnectionCallback &cb);
+ void Connect(const std::string &host, int port, const std::string &username,
+ const std::string &password);
+ void SetWillInfo(const std::string &topic, const void *msg, size_t szmsg, int qos, bool retain);
+ void Disconnect(void);
+ void Publish(const std::string &topic, const void *data, const size_t datalen, int qos = 0,
+ bool retain = false);
+ void PublishWithReply(const std::string &topic, const void *data, const size_t datalen, int qos,
+ bool retain, const std::string &reply_topic, const std::string &correlation);
+ void SendReply(MSG *msg, const void *data, const size_t datalen, int qos, bool retain);
+ void *Subscribe(const std::string &topic, const SubscribeCallback &cb,
+ void *user_data = nullptr, int qos = 0);
+ void *Unsubscribe(void *handle);
+
+ private:
+ std::unique_ptr<void, void (*)(const void *)> handle;
+ std::shared_ptr<MQ> mq;
+};
+
+} // namespace aitt
return "libaitt-transport-tcp.so";
if (type == TYPE_WEBRTC)
return "libaitt-transport-webrtc.so";
+ if (type == TYPE_CUSTOM_MQTT)
+ return "libaitt-st-broker.so";
return std::string("Unknown");
}
return instance;
}
+std::shared_ptr<MQ> ModuleLoader::LoadMqttClient(void *handle, const std::string &id,
+ bool clear_session)
+{
+ MQ::ModuleEntry get_instance_fn =
+ reinterpret_cast<MQ::ModuleEntry>(dlsym(handle, MQ::MODULE_ENTRY_NAME));
+ if (get_instance_fn == nullptr) {
+ ERR("dlsym: %s", dlerror());
+ throw AittException(AittException::SYSTEM_ERR);
+ }
+
+ std::shared_ptr<MQ> instance(static_cast<MQ *>(get_instance_fn(id.c_str(), clear_session)),
+ [](const MQ *instance) { delete instance; });
+ if (instance == nullptr) {
+ ERR("Failed to create a new instance");
+ throw AittException(AittException::SYSTEM_ERR);
+ }
+
+ return instance;
+}
+
} // namespace aitt
TYPE_WEBRTC,
TYPE_RTSP,
TYPE_TRANSPORT_MAX,
+ TYPE_CUSTOM_MQTT,
};
using ModuleHandle = std::unique_ptr<void, void (*)(const void *)>;
ModuleHandle OpenModule(Type type);
std::shared_ptr<AittTransport> LoadTransport(void *handle, const std::string &ip,
AittDiscovery &discovery);
+ std::shared_ptr<MQ> LoadMqttClient(void *handle, const std::string &id, bool clear_session);
private:
std::string GetModuleFilename(Type type);
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-#include "MQ.h"
+#include "MosquittoMQ.h"
#include <mqtt_protocol.h>
#include <sys/types.h>
#include "AittException.h"
#include "AittTypes.h"
+#include "AittUtil.h"
#include "aitt_internal.h"
namespace aitt {
-const std::string MQ::REPLY_SEQUENCE_NUM_KEY = "sequenceNum";
-const std::string MQ::REPLY_IS_END_SEQUENCE_KEY = "isEndSequence";
+const std::string MosquittoMQ::REPLY_SEQUENCE_NUM_KEY = "sequenceNum";
+const std::string MosquittoMQ::REPLY_IS_END_SEQUENCE_KEY = "isEndSequence";
-MQ::MQ(const std::string &id, bool clear_session)
+MosquittoMQ::MosquittoMQ(const std::string &id, bool clear_session)
: handle(nullptr),
keep_alive(60),
subscribers_iterating(false),
mosquitto_destroy(handle);
mosquitto_lib_cleanup();
- throw AittException(AittException::MQTT_ERR, std::string("MQ Constructor Error"));
+ throw AittException(AittException::MQTT_ERR, std::string("MosquittoMQ Constructor Error"));
}
-MQ::~MQ(void)
+MosquittoMQ::~MosquittoMQ(void)
{
int ret;
INFO("Destructor");
ERR("mosquitto_lib_cleanup() Fail(%s)", mosquitto_strerror(ret));
}
-void MQ::SetConnectionCallback(const MQConnectionCallback &cb)
+void MosquittoMQ::SetConnectionCallback(const MQConnectionCallback &cb)
{
std::lock_guard<std::recursive_mutex> lock_from_here(callback_lock);
connect_cb = cb;
}
-void MQ::ConnectCallback(struct mosquitto *mosq, void *obj, int rc, int flag,
+void MosquittoMQ::ConnectCallback(struct mosquitto *mosq, void *obj, int rc, int flag,
const mosquitto_property *props)
{
RET_IF(obj == nullptr);
- MQ *mq = static_cast<MQ *>(obj);
+ MosquittoMQ *mq = static_cast<MosquittoMQ *>(obj);
INFO("Connected : rc(%d), flag(%d)", rc, flag);
mq->connect_cb(AITT_CONNECTED);
}
-void MQ::DisconnectCallback(struct mosquitto *mosq, void *obj, int rc,
+void MosquittoMQ::DisconnectCallback(struct mosquitto *mosq, void *obj, int rc,
const mosquitto_property *props)
{
RET_IF(obj == nullptr);
- MQ *mq = static_cast<MQ *>(obj);
+ MosquittoMQ *mq = static_cast<MosquittoMQ *>(obj);
INFO("Disconnected : rc(%d)", rc);
mq->connect_cb(AITT_DISCONNECTED);
}
-void MQ::Connect(const std::string &host, int port, const std::string &username,
+void MosquittoMQ::Connect(const std::string &host, int port, const std::string &username,
const std::string &password)
{
int ret;
}
}
-void MQ::SetWillInfo(const std::string &topic, const void *msg, size_t szmsg, int qos, bool retain)
+void MosquittoMQ::SetWillInfo(const std::string &topic, const void *msg, size_t szmsg, int qos,
+ bool retain)
{
int ret = mosquitto_will_set(handle, topic.c_str(), szmsg, msg, qos, retain);
if (ret != MOSQ_ERR_SUCCESS) {
}
}
-void MQ::Disconnect(void)
+void MosquittoMQ::Disconnect(void)
{
int ret = mosquitto_disconnect(handle);
if (ret != MOSQ_ERR_SUCCESS) {
mosquitto_will_clear(handle);
}
-void MQ::MessageCallback(mosquitto *handle, void *obj, const mosquitto_message *msg,
+void MosquittoMQ::MessageCallback(mosquitto *handle, void *obj, const mosquitto_message *msg,
const mosquitto_property *props)
{
RET_IF(obj == nullptr);
- MQ *mq = static_cast<MQ *>(obj);
+ MosquittoMQ *mq = static_cast<MosquittoMQ *>(obj);
std::lock_guard<std::recursive_mutex> auto_lock(mq->callback_lock);
mq->subscribers_iterating = true;
if (nullptr == subscribe_data)
ERR("end() is not valid because elements were added.");
- bool result = CompareTopic(subscribe_data->topic.c_str(), msg->topic);
+ bool result = AittUtil::CompareTopic(subscribe_data->topic.c_str(), msg->topic);
if (result)
mq->InvokeCallback(msg, props);
mq->new_subscribers.clear();
}
-void MQ::InvokeCallback(const mosquitto_message *msg, const mosquitto_property *props)
+void MosquittoMQ::InvokeCallback(const mosquitto_message *msg, const mosquitto_property *props)
{
MSG mq_msg;
mq_msg.SetTopic(msg->topic);
cb_info->cb(&mq_msg, msg->topic, msg->payload, msg->payloadlen, cb_info->user_data);
}
-void MQ::Publish(const std::string &topic, const void *data, const size_t datalen, int qos,
+void MosquittoMQ::Publish(const std::string &topic, const void *data, const size_t datalen, int qos,
bool retain)
{
int mid = -1;
}
}
-void MQ::PublishWithReply(const std::string &topic, const void *data, const size_t datalen, int qos,
- bool retain, const std::string &reply_topic, const std::string &correlation)
+void MosquittoMQ::PublishWithReply(const std::string &topic, const void *data, const size_t datalen,
+ int qos, bool retain, const std::string &reply_topic, const std::string &correlation)
{
int ret;
int mid = -1;
}
}
-void MQ::SendReply(MSG *msg, const void *data, const size_t datalen, int qos, bool retain)
+void MosquittoMQ::SendReply(MSG *msg, const void *data, const size_t datalen, int qos, bool retain)
{
RET_IF(msg == nullptr);
}
}
-void *MQ::Subscribe(const std::string &topic, const SubscribeCallback &cb, void *user_data, int qos)
+void *MosquittoMQ::Subscribe(const std::string &topic, const SubscribeCallback &cb, void *user_data,
+ int qos)
{
int mid = -1;
int ret = mosquitto_subscribe(handle, &mid, topic.c_str(), qos);
return static_cast<void *>(data);
}
-void *MQ::Unsubscribe(void *sub_handle)
+void *MosquittoMQ::Unsubscribe(void *sub_handle)
{
std::lock_guard<std::recursive_mutex> auto_lock(callback_lock);
auto it = std::find(subscribers.begin(), subscribers.end(),
return user_data;
}
-bool MQ::CompareTopic(const std::string &left, const std::string &right)
-{
- bool result = false;
- int ret = mosquitto_topic_matches_sub(left.c_str(), right.c_str(), &result);
- if (ret != MOSQ_ERR_SUCCESS) {
- ERR("mosquitto_topic_matches_sub(%s, %s) Fail(%s)", left.c_str(), right.c_str(),
- mosquitto_strerror(ret));
- throw AittException(AittException::MQTT_ERR);
- }
- return result;
-}
-
-MQ::SubscribeData::SubscribeData(const std::string &in_topic, const SubscribeCallback &in_cb,
- void *in_user_data)
+MosquittoMQ::SubscribeData::SubscribeData(const std::string &in_topic,
+ const SubscribeCallback &in_cb, void *in_user_data)
: topic(in_topic), cb(in_cb), user_data(in_user_data)
{
}
#include <string>
#include <vector>
+#include "MQ.h"
#include "MSG.h"
#define MQTT_LOCALHOST "127.0.0.1"
namespace aitt {
-class MQ {
+class MosquittoMQ : public MQ {
public:
- using SubscribeCallback = std::function<void(MSG *msg, const std::string &topic,
- const void *data, const size_t datalen, void *user_data)>;
- using MQConnectionCallback = std::function<void(int)>;
-
- explicit MQ(const std::string &id, bool clear_session = false);
- virtual ~MQ(void);
-
- static bool CompareTopic(const std::string &left, const std::string &right);
+ explicit MosquittoMQ(const std::string &id, bool clear_session = false);
+ virtual ~MosquittoMQ(void);
void SetConnectionCallback(const MQConnectionCallback &cb);
void Connect(const std::string &host, int port, const std::string &username,
--- /dev/null
+/*
+ * Copyright (c) 2021-2022 Samsung Electronics Co., Ltd All Rights Reserved
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#pragma once
+
+#include <MSG.h>
+
+#include <functional>
+#include <string>
+
+#define AITT_MQ_NEW aitt_mq_new
+#define TO_STR(s) #s
+#define DEFINE_TO_STR(x) TO_STR(x)
+
+namespace aitt {
+
+class MQ {
+ public:
+ typedef void *(*ModuleEntry)(const char *id, bool clear_session);
+
+ using SubscribeCallback = std::function<void(MSG *msg, const std::string &topic,
+ const void *data, const size_t datalen, void *user_data)>;
+ using MQConnectionCallback = std::function<void(int)>;
+
+ static constexpr const char *const MODULE_ENTRY_NAME = DEFINE_TO_STR(AITT_MQ_NEW);
+
+ MQ() = default;
+ virtual ~MQ() = default;
+
+ virtual void SetConnectionCallback(const MQConnectionCallback &cb) = 0;
+ virtual void Connect(const std::string &host, int port, const std::string &username,
+ const std::string &password) = 0;
+ virtual void SetWillInfo(const std::string &topic, const void *msg, size_t szmsg, int qos,
+ bool retain) = 0;
+ virtual void Disconnect(void) = 0;
+ virtual void Publish(const std::string &topic, const void *data, const size_t datalen,
+ int qos = 0, bool retain = false) = 0;
+ virtual void PublishWithReply(const std::string &topic, const void *data, const size_t datalen,
+ int qos, bool retain, const std::string &reply_topic, const std::string &correlation) = 0;
+ virtual void SendReply(MSG *msg, const void *data, const size_t datalen, int qos,
+ bool retain) = 0;
+ virtual void *Subscribe(const std::string &topic, const SubscribeCallback &cb,
+ void *user_data = nullptr, int qos = 0) = 0;
+ virtual void *Unsubscribe(void *handle) = 0;
+};
+
+} // namespace aitt
+
+#undef TO_STR
+#undef DEFINE_TO_STR
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
#include "MqttServer.h"
+#include "MQProxy.h"
#include "aitt_internal.h"
#define MQTT_HANDLER_MSG_QOS 1
#define MQTT_HANDLER_MGMT_QOS 2
-MqttServer::MqttServer(const Config &config) : mq(config.GetLocalId(), true)
+MqttServer::MqttServer(const Config &config)
+ : mq(new aitt::MQProxy(config.GetLocalId(), true, false))
{
broker_ip_ = config.GetBrokerIp();
broker_port_ = config.GetBrokerPort();
DBG("ID[%s] BROKER IP[%s] BROKER PORT [%d] ROOM[%s] %s", id_.c_str(), broker_ip_.c_str(),
broker_port_, room_id_.c_str(), is_publisher_ ? "Publisher" : "Subscriber");
- mq.SetConnectionCallback(std::bind(&MqttServer::ConnectCallBack, this, std::placeholders::_1));
+ mq->SetConnectionCallback(std::bind(&MqttServer::ConnectCallBack, this, std::placeholders::_1));
}
MqttServer::~MqttServer()
// Notify Who is source?
std::string source_topic = room_id_ + std::string("/source");
if (is_publisher_) {
- mq.Publish(source_topic, id_.c_str(), id_.size(), AITT_QOS_EXACTLY_ONCE, true);
+ mq->Publish(source_topic, id_.c_str(), id_.size(), AITT_QOS_EXACTLY_ONCE, true);
SetConnectionState(ConnectionState::Registered);
} else {
- mq.Subscribe(source_topic,
+ mq->Subscribe(source_topic,
std::bind(&MqttServer::HandleSourceTopic, this, std::placeholders::_1,
std::placeholders::_2, std::placeholders::_3, std::placeholders::_4,
std::placeholders::_5),
int MqttServer::Connect(void)
{
std::string will_message = std::string("ROOM_PEER_LEFT ") + id_;
- mq.SetWillInfo(room_id_, will_message.c_str(), will_message.size(), AITT_QOS_EXACTLY_ONCE,
+ mq->SetWillInfo(room_id_, will_message.c_str(), will_message.size(), AITT_QOS_EXACTLY_ONCE,
false);
SetConnectionState(ConnectionState::Connecting);
- mq.Connect(broker_ip_, broker_port_, std::string(), std::string());
+ mq->Connect(broker_ip_, broker_port_, std::string(), std::string());
return 0;
}
if (is_publisher_) {
INFO("remove retained");
std::string source_topic = room_id_ + std::string("/source");
- mq.Publish(source_topic, nullptr, 0, AITT_QOS_AT_LEAST_ONCE, true);
+ mq->Publish(source_topic, nullptr, 0, AITT_QOS_AT_LEAST_ONCE, true);
}
std::string left_message = std::string("ROOM_PEER_LEFT ") + id_;
- mq.Publish(room_id_, left_message.c_str(), left_message.size(), AITT_QOS_AT_LEAST_ONCE, false);
+ mq->Publish(room_id_, left_message.c_str(), left_message.size(), AITT_QOS_AT_LEAST_ONCE, false);
- mq.Disconnect();
+ mq->Disconnect();
room_id_ = std::string("");
std::string receiver_topic = room_id_ + std::string("/") + peer_id;
std::string server_formatted_msg = "ROOM_PEER_MSG " + id_ + " " + msg;
- mq.Publish(receiver_topic, server_formatted_msg.c_str(), server_formatted_msg.size(),
+ mq->Publish(receiver_topic, server_formatted_msg.c_str(), server_formatted_msg.size(),
AITT_QOS_AT_LEAST_ONCE);
return 0;
}
// Subscribe PEER_JOIN PEER_LEFT
- mq.Subscribe(room_id_,
+ mq->Subscribe(room_id_,
std::bind(&MqttServer::HandleRoomTopic, this, std::placeholders::_1,
std::placeholders::_2, std::placeholders::_3, std::placeholders::_4,
std::placeholders::_5),
// Subscribe PEER_MSG
std::string receiving_topic = room_id + std::string("/") + id_;
- mq.Subscribe(receiving_topic,
+ mq->Subscribe(receiving_topic,
std::bind(&MqttServer::HandleMessageTopic, this, std::placeholders::_1,
std::placeholders::_2, std::placeholders::_3, std::placeholders::_4,
std::placeholders::_5),
if (!is_publisher_) {
std::string join_message = std::string("ROOM_PEER_JOINED ") + id_;
- mq.Publish(room_id_, join_message.c_str(), join_message.size(), AITT_QOS_EXACTLY_ONCE);
+ mq->Publish(room_id_, join_message.c_str(), join_message.size(), AITT_QOS_EXACTLY_ONCE);
}
}
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
#pragma once
-#include <MQ.h>
+#include <functional>
+#include <memory>
+#include <string>
#include "Config.h"
#include "IfaceServer.h"
+#include "MQ.h"
class MqttServer : public IfaceServer {
public:
std::string room_id_;
std::string source_id_;
bool is_publisher_;
- aitt::MQ mq;
+ std::unique_ptr<aitt::MQ> mq;
ConnectionState connection_state_;
std::function<void(ConnectionState)> connection_state_changed_cb_;
#include <memory>
#include <stdexcept>
+#include "MQProxy.h"
#include "aitt_internal.h"
#define WEBRTC_ROOM_ID_PREFIX std::string(AITT_MANAGED_TOPIC_PREFIX "webrtc/room/Room.webrtc")
bool custom_broker)
: public_api(parent),
id_(id),
- mq(id, clear_session),
+ mq(new MQProxy(id, clear_session, custom_broker)),
discovery(id, custom_broker),
reply_id(0),
modules{0}
void AITT::Impl::SetWillInfo(const std::string &topic, const void *data, const size_t datalen,
AittQoS qos, bool retain)
{
- mq.SetWillInfo(topic, data, datalen, qos, retain);
+ mq->SetWillInfo(topic, data, datalen, qos, retain);
}
void AITT::Impl::SetConnectionCallback(ConnectionCallback cb, void *user_data)
{
if (cb)
- mq.SetConnectionCallback(
+ mq->SetConnectionCallback(
std::bind(&Impl::ConnectionCB, this, cb, user_data, std::placeholders::_1));
else
- mq.SetConnectionCallback(nullptr);
+ mq->SetConnectionCallback(nullptr);
}
void AITT::Impl::ConnectionCB(ConnectionCallback cb, void *user_data, int status)
const std::string &password)
{
discovery.Start(host, port, username, password);
- mq.Connect(host, port, username, password);
+ mq->Connect(host, port, username, password);
mqtt_broker_ip_ = host;
mqtt_broker_port_ = port;
mqtt_broker_ip_.clear();
mqtt_broker_port_ = -1;
- mq.Disconnect();
+ mq->Disconnect();
discovery.Stop();
}
for (auto subscribe_info : subscribed_list) {
switch (subscribe_info->first) {
case AITT_TYPE_MQTT:
- mq.Unsubscribe(subscribe_info->second);
+ mq->Unsubscribe(subscribe_info->second);
break;
case AITT_TYPE_TCP:
GetTransport(ModuleLoader::TYPE_TCP)->Unsubscribe(subscribe_info->second);
AittProtocol protocols, AittQoS qos, bool retain)
{
if ((protocols & AITT_TYPE_MQTT) == AITT_TYPE_MQTT)
- mq.Publish(topic, data, datalen, qos, retain);
+ mq->Publish(topic, data, datalen, qos, retain);
if ((protocols & AITT_TYPE_TCP) == AITT_TYPE_TCP)
GetTransport(ModuleLoader::TYPE_TCP)->Publish(topic, data, datalen, qos, retain);
AittSubscribeID AITT::Impl::SubscribeMQ(SubscribeInfo *handle, MainLoopHandler *loop_handle,
const std::string &topic, const SubscribeCallback &cb, void *user_data, AittQoS qos)
{
- return mq.Subscribe(
+ return mq->Subscribe(
topic,
[this, handle, loop_handle, cb](MSG *msg, const std::string &topic, const void *data,
const size_t datalen, void *mq_user_data) {
SubscribeInfo *found_info = *it;
switch (found_info->first) {
case AITT_TYPE_MQTT:
- user_data = mq.Unsubscribe(found_info->second);
+ user_data = mq->Unsubscribe(found_info->second);
break;
case AITT_TYPE_TCP: {
auto tcpModule = GetTransport(ModuleLoader::TYPE_TCP);
},
user_data, protocol, qos);
- mq.PublishWithReply(topic, data, datalen, qos, false, replyTopic, correlation);
+ mq->PublishWithReply(topic, data, datalen, qos, false, replyTopic, correlation);
return 0;
}
subscribed_list.push_back(info);
}
- mq.PublishWithReply(topic, data, datalen, qos, false, replyTopic, correlation);
+ mq->PublishWithReply(topic, data, datalen, qos, false, replyTopic, correlation);
if (timeout_ms)
HandleTimeout(timeout_ms, timeout_id, sync_loop, is_timeout);
msg->IncreaseSequence();
msg->SetEndSequence(end);
- mq.SendReply(msg, data, datalen, AITT_QOS_AT_MOST_ONCE, false);
+ mq->SendReply(msg, data, datalen, AITT_QOS_AT_MOST_ONCE, false);
}
void *AITT::Impl::SubscribeTCP(SubscribeInfo *handle, const std::string &topic,
std::string id_;
std::string mqtt_broker_ip_;
int mqtt_broker_port_;
- MQ mq;
+ std::unique_ptr<MQ> mq;
AittDiscovery discovery;
unsigned short reply_id;
ModuleObj *modules[ModuleLoader::TYPE_TRANSPORT_MAX];
LINK_DIRECTORIES(${UT_NEEDS_LIBRARY_DIRS})
###########################################################################
-SET(AITT_UT_SRC AITT_test.cc RequestResponse_test.cc MainLoopHandler_test.cc aitt_c_test.cc AITT_TCP_test.cc MQ_test.cc)
+SET(AITT_UT_SRC AITT_test.cc RequestResponse_test.cc MainLoopHandler_test.cc aitt_c_test.cc AITT_TCP_test.cc MosquittoMQ_test.cc)
ADD_EXECUTABLE(${AITT_UT} ${AITT_UT_SRC})
TARGET_LINK_LIBRARIES(${AITT_UT} Threads::Threads ${UT_NEEDS_LIBRARIES} ${PROJECT_NAME})
###########################################################################
AUX_SOURCE_DIRECTORY(../mock MOCK_SRC)
-ADD_EXECUTABLE(${AITT_UT}_mq MQ_mocktest.cc ${MOCK_SRC})
-TARGET_LINK_LIBRARIES(${AITT_UT}_mq ${UT_NEEDS_LIBRARIES} Threads::Threads ${AITT_NEEDS_LIBRARIES} ${AITT_COMMON})
+ADD_EXECUTABLE(${AITT_UT}_mq MosquittoMQ_mocktest.cc ${MOCK_SRC})
+TARGET_LINK_LIBRARIES(${AITT_UT}_mq ${UT_NEEDS_LIBRARIES} Threads::Threads ${AITT_COMMON})
TARGET_INCLUDE_DIRECTORIES(${AITT_UT}_mq PRIVATE ../mock)
INSTALL(TARGETS ${AITT_UT}_mq DESTINATION ${AITT_TEST_BINDIR})
auto module = loader.LoadTransport(handle.get(), LOCAL_IP, discovery);
ASSERT_NE(module, nullptr);
}
+
+TEST_F(ModuleLoaderTest, LoadMqttClient_P_Anytime)
+{
+ ModuleLoader::ModuleHandle handle = loader.OpenModule(ModuleLoader::TYPE_CUSTOM_MQTT);
+ if (handle) {
+ EXPECT_NO_THROW({
+ auto module = loader.LoadMqttClient(handle.get(), "test", false);
+ ASSERT_NE(module, nullptr);
+ });
+ }
+}
+
+TEST_F(ModuleLoaderTest, LoadMqttClient_N_Anytime)
+{
+ EXPECT_THROW(
+ {
+ loader.LoadMqttClient(nullptr, "test", false);
+ FAIL() << "Should not be called";
+ },
+ aitt::AittException);
+}
#include <mutex>
#include "AittTypes.h"
-#include "MQ.h"
#include "MQMockTest.h"
#include "MQTTMock.h"
+#include "MosquittoMQ.h"
using ::testing::Return;
EXPECT_CALL(GetMock(), mosquitto_lib_cleanup()).WillOnce(Return(MOSQ_ERR_SUCCESS));
try {
- aitt::MQ mq(TEST_CLIENT_ID, true);
+ aitt::MosquittoMQ mq(TEST_CLIENT_ID, true);
FAIL() << "lib_init must be failed";
} catch (std::exception &e) {
- ASSERT_STREQ(e.what(), "MQTT failure : MQ Constructor Error");
+ ASSERT_STREQ(e.what(), "MQTT failure : MosquittoMQ Constructor Error");
}
}
EXPECT_CALL(GetMock(), mosquitto_lib_cleanup()).WillOnce(Return(MOSQ_ERR_SUCCESS));
try {
- aitt::MQ mq(TEST_CLIENT_ID, true);
+ aitt::MosquittoMQ mq(TEST_CLIENT_ID, true);
FAIL() << "lib_init must be failed";
} catch (std::exception &e) {
- ASSERT_STREQ(e.what(), "MQTT failure : MQ Constructor Error");
+ ASSERT_STREQ(e.what(), "MQTT failure : MosquittoMQ Constructor Error");
}
}
EXPECT_CALL(GetMock(), mosquitto_lib_cleanup()).WillOnce(Return(MOSQ_ERR_SUCCESS));
try {
- aitt::MQ mq(TEST_CLIENT_ID, true);
+ aitt::MosquittoMQ mq(TEST_CLIENT_ID, true);
mq.Connect(TEST_HOST, TEST_PORT, "", "");
mq.Publish(TEST_TOPIC, TEST_PAYLOAD, sizeof(TEST_PAYLOAD));
} catch (std::exception &e) {
EXPECT_CALL(GetMock(), mosquitto_lib_cleanup()).WillOnce(Return(MOSQ_ERR_SUCCESS));
try {
- aitt::MQ mq(TEST_CLIENT_ID, true);
+ aitt::MosquittoMQ mq(TEST_CLIENT_ID, true);
mq.Connect(TEST_HOST, TEST_PORT, "", "");
mq.Subscribe(
TEST_TOPIC,
EXPECT_CALL(GetMock(), mosquitto_lib_cleanup()).WillOnce(Return(MOSQ_ERR_SUCCESS));
try {
- aitt::MQ mq(TEST_CLIENT_ID, true);
+ aitt::MosquittoMQ mq(TEST_CLIENT_ID, true);
mq.Connect(TEST_HOST, TEST_PORT, "", "");
void *handle = mq.Subscribe(
TEST_TOPIC,
EXPECT_CALL(GetMock(), mosquitto_lib_cleanup()).WillOnce(Return(MOSQ_ERR_SUCCESS));
try {
- aitt::MQ mq(TEST_CLIENT_ID, true);
+ aitt::MosquittoMQ mq(TEST_CLIENT_ID, true);
} catch (std::exception &e) {
FAIL() << "Unexpected exception occurred";
}
EXPECT_CALL(GetMock(), mosquitto_destroy(TEST_HANDLE)).Times(1);
EXPECT_CALL(GetMock(), mosquitto_lib_cleanup()).WillOnce(Return(MOSQ_ERR_SUCCESS));
try {
- aitt::MQ mq(TEST_CLIENT_ID, true);
+ aitt::MosquittoMQ mq(TEST_CLIENT_ID, true);
mq.SetWillInfo("lastWill", TEST_PAYLOAD, sizeof(TEST_PAYLOAD), AITT_QOS_AT_MOST_ONCE, true);
mq.Connect(TEST_HOST, TEST_PORT, "", "");
FAIL() << "Connect() must be failed";
EXPECT_CALL(GetMock(), mosquitto_destroy(TEST_HANDLE)).Times(1);
EXPECT_CALL(GetMock(), mosquitto_lib_cleanup()).WillOnce(Return(MOSQ_ERR_SUCCESS));
try {
- aitt::MQ mq(TEST_CLIENT_ID, true);
+ aitt::MosquittoMQ mq(TEST_CLIENT_ID, true);
mq.Connect(TEST_HOST, TEST_PORT, "", "");
} catch (std::exception &e) {
FAIL() << "Unepxected exception: " << e.what();
EXPECT_CALL(GetMock(), mosquitto_destroy(TEST_HANDLE)).Times(1);
EXPECT_CALL(GetMock(), mosquitto_lib_cleanup()).WillOnce(Return(MOSQ_ERR_SUCCESS));
try {
- aitt::MQ mq(TEST_CLIENT_ID, true);
+ aitt::MosquittoMQ mq(TEST_CLIENT_ID, true);
mq.Connect(TEST_HOST, TEST_PORT, username, password);
} catch (std::exception &e) {
FAIL() << "Unepxected exception: " << e.what();
EXPECT_CALL(GetMock(), mosquitto_destroy(TEST_HANDLE)).Times(1);
EXPECT_CALL(GetMock(), mosquitto_lib_cleanup()).WillOnce(Return(MOSQ_ERR_SUCCESS));
try {
- aitt::MQ mq(TEST_CLIENT_ID, true);
+ aitt::MosquittoMQ mq(TEST_CLIENT_ID, true);
mq.Disconnect();
} catch (std::exception &e) {
FAIL() << "Unexpected exception: " << e.what();
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-#include "MQ.h"
+#include "MosquittoMQ.h"
#include <gtest/gtest.h>
#include "aitt_internal.h"
#include "aitt_tests.h"
-using MQ = aitt::MQ;
+using MosquittoMQ = aitt::MosquittoMQ;
class MQTest : public testing::Test, public AittTests {
protected:
TEST_F(MQTest, Positve_Subscribe_in_Subscribe_Anytime)
{
try {
- MQ mq("MQ_TEST_ID");
+ MosquittoMQ mq("MQ_TEST_ID");
mq.Connect(LOCAL_IP, 1883, "", "");
mq.Subscribe(
"MQ_TEST_TOPIC1",