#include <atomic>
#include "AittException.h"
+#include "MQProxy.h"
#include "aitt_internal.h"
namespace aitt {
AittDiscovery::AittDiscovery(const std::string &id, bool custom_broker)
- : id_(id), discovery_mq(id + "d", true), callback_handle(nullptr)
+ : id_(id), discovery_mq(new MQProxy(id + "d", true, custom_broker)), callback_handle(nullptr)
{
}
{
RET_IF(callback_handle);
- discovery_mq.SetWillInfo(DISCOVERY_TOPIC_BASE + id_, nullptr, 0, AITT_QOS_EXACTLY_ONCE, true);
- discovery_mq.Connect(host, port, username, password);
+ discovery_mq->SetWillInfo(DISCOVERY_TOPIC_BASE + id_, nullptr, 0, AITT_QOS_EXACTLY_ONCE, true);
+ discovery_mq->Connect(host, port, username, password);
- callback_handle = discovery_mq.Subscribe(DISCOVERY_TOPIC_BASE + "+", DiscoveryMessageCallback,
+ callback_handle = discovery_mq->Subscribe(DISCOVERY_TOPIC_BASE + "+", DiscoveryMessageCallback,
static_cast<void *>(this), AITT_QOS_EXACTLY_ONCE);
}
void AittDiscovery::Stop()
{
- discovery_mq.Publish(DISCOVERY_TOPIC_BASE + id_, nullptr, 0, AITT_QOS_EXACTLY_ONCE, true);
- discovery_mq.Unsubscribe(callback_handle);
+ discovery_mq->Publish(DISCOVERY_TOPIC_BASE + id_, nullptr, 0, AITT_QOS_EXACTLY_ONCE, true);
+ discovery_mq->Unsubscribe(callback_handle);
callback_handle = nullptr;
- discovery_mq.Disconnect();
+ discovery_mq->Disconnect();
}
void AittDiscovery::UpdateDiscoveryMsg(AittProtocol protocol, const void *msg, size_t length)
fbb.Finish();
auto buf = fbb.GetBuffer();
- discovery_mq.Publish(DISCOVERY_TOPIC_BASE + id_, buf.data(), buf.size(), AITT_QOS_EXACTLY_ONCE,
+ discovery_mq->Publish(DISCOVERY_TOPIC_BASE + id_, buf.data(), buf.size(), AITT_QOS_EXACTLY_ONCE,
true);
}
AittProtocol GetProtocol(const std::string &protocol_str);
std::string id_;
- MQ discovery_mq;
+ std::unique_ptr<MQ> discovery_mq;
void *callback_handle;
std::map<AittProtocol, DiscoveryBlob> discovery_map;
std::map<int, std::pair<AittProtocol, DiscoveryCallback>> callbacks;
+++ /dev/null
-/*
- * Copyright (c) 2021-2022 Samsung Electronics Co., Ltd All Rights Reserved
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#include "MQ.h"
-
-#include <mqtt_protocol.h>
-#include <sys/types.h>
-#include <unistd.h>
-
-#include <algorithm>
-#include <cerrno>
-#include <stdexcept>
-#include <thread>
-
-#include "AittException.h"
-#include "AittTypes.h"
-#include "aitt_internal.h"
-
-namespace aitt {
-
-const std::string MQ::REPLY_SEQUENCE_NUM_KEY = "sequenceNum";
-const std::string MQ::REPLY_IS_END_SEQUENCE_KEY = "isEndSequence";
-
-MQ::MQ(const std::string &id, bool clear_session)
- : handle(nullptr),
- keep_alive(60),
- subscribers_iterating(false),
- subscriber_iterator_updated(false),
- connect_cb(nullptr)
-{
- do {
- int ret = mosquitto_lib_init();
- if (ret != MOSQ_ERR_SUCCESS) {
- ERR("mosquitto_lib_init() Fail(%s)", mosquitto_strerror(ret));
- break;
- }
-
- handle = mosquitto_new(id.c_str(), clear_session, this);
- if (handle == nullptr) {
- ERR("mosquitto_new(%s, %d) Fail", id.c_str(), clear_session);
- break;
- }
-
- ret = mosquitto_int_option(handle, MOSQ_OPT_PROTOCOL_VERSION, MQTT_PROTOCOL_V5);
- if (ret != MOSQ_ERR_SUCCESS) {
- ERR("mosquitto_int_option() Fail(%s)", mosquitto_strerror(ret));
- break;
- }
-
- mosquitto_message_v5_callback_set(handle, MessageCallback);
- mosquitto_connect_v5_callback_set(handle, ConnectCallback);
- mosquitto_disconnect_v5_callback_set(handle, DisconnectCallback);
-
- ret = mosquitto_loop_start(handle);
- if (ret != MOSQ_ERR_SUCCESS) {
- ERR("mosquitto_loop_start() Fail(%s)", mosquitto_strerror(ret));
- break;
- }
-
- return;
- } while (0);
-
- mosquitto_destroy(handle);
- mosquitto_lib_cleanup();
- throw AittException(AittException::MQTT_ERR, std::string("MQ Constructor Error"));
-}
-
-MQ::~MQ(void)
-{
- int ret;
- INFO("Destructor");
-
- ret = mosquitto_loop_stop(handle, true);
- if (ret != MOSQ_ERR_SUCCESS)
- ERR("mosquitto_loop_stop() Fail(%s)", mosquitto_strerror(ret));
-
- callback_lock.lock();
- connect_cb = nullptr;
- subscribers.clear();
- callback_lock.unlock();
-
- mosquitto_destroy(handle);
-
- ret = mosquitto_lib_cleanup();
- if (ret != MOSQ_ERR_SUCCESS)
- ERR("mosquitto_lib_cleanup() Fail(%s)", mosquitto_strerror(ret));
-}
-
-void MQ::SetConnectionCallback(const MQConnectionCallback &cb)
-{
- std::lock_guard<std::recursive_mutex> lock_from_here(callback_lock);
- connect_cb = cb;
-}
-
-void MQ::ConnectCallback(struct mosquitto *mosq, void *obj, int rc, int flag,
- const mosquitto_property *props)
-{
- RET_IF(obj == nullptr);
- MQ *mq = static_cast<MQ *>(obj);
-
- INFO("Connected : rc(%d), flag(%d)", rc, flag);
-
- std::lock_guard<std::recursive_mutex> lock_from_here(mq->callback_lock);
- if (mq->connect_cb)
- mq->connect_cb(AITT_CONNECTED);
-}
-
-void MQ::DisconnectCallback(struct mosquitto *mosq, void *obj, int rc,
- const mosquitto_property *props)
-{
- RET_IF(obj == nullptr);
- MQ *mq = static_cast<MQ *>(obj);
-
- INFO("Disconnected : rc(%d)", rc);
-
- std::lock_guard<std::recursive_mutex> lock_from_here(mq->callback_lock);
- if (mq->connect_cb)
- mq->connect_cb(AITT_DISCONNECTED);
-}
-
-void MQ::Connect(const std::string &host, int port, const std::string &username,
- const std::string &password)
-{
- int ret;
-
- if (username.empty() == false) {
- ret = mosquitto_username_pw_set(handle, username.c_str(), password.c_str());
- if (ret != MOSQ_ERR_SUCCESS) {
- ERR("mosquitto_username_pw_set(%s, %s) Fail(%s)", username.c_str(), password.c_str(),
- mosquitto_strerror(ret));
- throw AittException(AittException::MQTT_ERR);
- }
- }
-
- ret = mosquitto_connect(handle, host.c_str(), port, keep_alive);
- if (ret != MOSQ_ERR_SUCCESS) {
- ERR("mosquitto_connect(%s, %d) Fail(%s)", host.c_str(), port, mosquitto_strerror(ret));
- throw AittException(AittException::MQTT_ERR);
- }
-}
-
-void MQ::SetWillInfo(const std::string &topic, const void *msg, size_t szmsg, int qos, bool retain)
-{
- int ret = mosquitto_will_set(handle, topic.c_str(), szmsg, msg, qos, retain);
- if (ret != MOSQ_ERR_SUCCESS) {
- ERR("mosquitto_will_set(%s) Fail(%s)", topic.c_str(), mosquitto_strerror(ret));
- throw AittException(AittException::MQTT_ERR);
- }
-}
-
-void MQ::Disconnect(void)
-{
- int ret = mosquitto_disconnect(handle);
- if (ret != MOSQ_ERR_SUCCESS) {
- ERR("mosquitto_disconnect() Fail(%s)", mosquitto_strerror(ret));
- throw AittException(AittException::MQTT_ERR);
- }
-
- mosquitto_will_clear(handle);
-}
-
-void MQ::MessageCallback(mosquitto *handle, void *obj, const mosquitto_message *msg,
- const mosquitto_property *props)
-{
- RET_IF(obj == nullptr);
- MQ *mq = static_cast<MQ *>(obj);
-
- std::lock_guard<std::recursive_mutex> auto_lock(mq->callback_lock);
- mq->subscribers_iterating = true;
- mq->subscriber_iterator = mq->subscribers.begin();
- while (mq->subscriber_iterator != mq->subscribers.end()) {
- auto subscribe_data = *(mq->subscriber_iterator);
- if (nullptr == subscribe_data)
- ERR("end() is not valid because elements were added.");
-
- bool result = CompareTopic(subscribe_data->topic.c_str(), msg->topic);
- if (result)
- mq->InvokeCallback(msg, props);
-
- if (!mq->subscriber_iterator_updated)
- mq->subscriber_iterator++;
- else
- mq->subscriber_iterator_updated = false;
- }
- mq->subscribers_iterating = false;
- mq->subscribers.insert(mq->subscribers.end(), mq->new_subscribers.begin(),
- mq->new_subscribers.end());
- mq->new_subscribers.clear();
-}
-
-void MQ::InvokeCallback(const mosquitto_message *msg, const mosquitto_property *props)
-{
- MSG mq_msg;
- mq_msg.SetTopic(msg->topic);
- if (props) {
- const mosquitto_property *prop;
-
- char *response_topic = nullptr;
- prop = mosquitto_property_read_string(props, MQTT_PROP_RESPONSE_TOPIC, &response_topic,
- false);
- if (prop) {
- mq_msg.SetResponseTopic(response_topic);
- free(response_topic);
- }
-
- void *correlation = nullptr;
- uint16_t correlation_size = 0;
- prop = mosquitto_property_read_binary(props, MQTT_PROP_CORRELATION_DATA, &correlation,
- &correlation_size, false);
- if (prop == nullptr || correlation == nullptr)
- ERR("No Correlation Data");
-
- mq_msg.SetCorrelation(std::string((char *)correlation, correlation_size));
- if (correlation)
- free(correlation);
-
- char *name = nullptr;
- char *value = nullptr;
- prop = mosquitto_property_read_string_pair(props, MQTT_PROP_USER_PROPERTY, &name, &value,
- false);
- while (prop) {
- if (REPLY_SEQUENCE_NUM_KEY == name) {
- mq_msg.SetSequence(std::stoi(value));
- } else if (REPLY_IS_END_SEQUENCE_KEY == name) {
- mq_msg.SetEndSequence(std::stoi(value) == 1);
- } else {
- ERR("Unsupported property(%s, %s)", name, value);
- }
- free(name);
- free(value);
-
- prop = mosquitto_property_read_string_pair(prop, MQTT_PROP_USER_PROPERTY, &name, &value,
- true);
- }
- }
-
- SubscribeData *cb_info = *subscriber_iterator;
- cb_info->cb(&mq_msg, msg->topic, msg->payload, msg->payloadlen, cb_info->user_data);
-}
-
-void MQ::Publish(const std::string &topic, const void *data, const size_t datalen, int qos,
- bool retain)
-{
- int mid = -1;
- int ret = mosquitto_publish(handle, &mid, topic.c_str(), datalen, data, qos, retain);
- if (ret != MOSQ_ERR_SUCCESS) {
- ERR("mosquitto_publish(%s) Fail(%s)", topic.c_str(), mosquitto_strerror(ret));
- throw AittException(AittException::MQTT_ERR);
- }
-}
-
-void MQ::PublishWithReply(const std::string &topic, const void *data, const size_t datalen, int qos,
- bool retain, const std::string &reply_topic, const std::string &correlation)
-{
- int ret;
- int mid = -1;
- mosquitto_property *props = nullptr;
-
- ret = mosquitto_property_add_string(&props, MQTT_PROP_RESPONSE_TOPIC, reply_topic.c_str());
- if (ret != MOSQ_ERR_SUCCESS) {
- ERR("mosquitto_property_add_string(response-topic) Fail(%s)", mosquitto_strerror(ret));
- throw AittException(AittException::MQTT_ERR);
- }
-
- ret = mosquitto_property_add_binary(&props, MQTT_PROP_CORRELATION_DATA, correlation.c_str(),
- correlation.size());
- if (ret != MOSQ_ERR_SUCCESS) {
- ERR("mosquitto_property_add_binary(correlation) Fail(%s)", mosquitto_strerror(ret));
- throw AittException(AittException::MQTT_ERR);
- }
- ret = mosquitto_publish_v5(handle, &mid, topic.c_str(), datalen, data, qos, retain, props);
- if (ret != MOSQ_ERR_SUCCESS) {
- ERR("mosquitto_publish_v5(%s) Fail(%s)", topic.c_str(), mosquitto_strerror(ret));
- throw AittException(AittException::MQTT_ERR);
- }
-}
-
-void MQ::SendReply(MSG *msg, const void *data, const size_t datalen, int qos, bool retain)
-{
- RET_IF(msg == nullptr);
-
- int ret;
- int mId = -1;
- mosquitto_property *props = nullptr;
-
- ret = mosquitto_property_add_binary(&props, MQTT_PROP_CORRELATION_DATA,
- msg->GetCorrelation().c_str(), msg->GetCorrelation().size());
- if (ret != MOSQ_ERR_SUCCESS) {
- ERR("mosquitto_property_add_binary(correlation) Fail(%s)", mosquitto_strerror(ret));
- throw AittException(AittException::MQTT_ERR);
- }
-
- ret = mosquitto_property_add_string_pair(&props, MQTT_PROP_USER_PROPERTY,
- REPLY_SEQUENCE_NUM_KEY.c_str(), std::to_string(msg->GetSequence()).c_str());
- if (ret != MOSQ_ERR_SUCCESS) {
- ERR("mosquitto_property_add_string_pair(squenceNum) Fail(%s)", mosquitto_strerror(ret));
- throw AittException(AittException::MQTT_ERR);
- }
-
- ret = mosquitto_property_add_string_pair(&props, MQTT_PROP_USER_PROPERTY,
- REPLY_IS_END_SEQUENCE_KEY.c_str(), std::to_string(msg->IsEndSequence()).c_str());
- if (ret != MOSQ_ERR_SUCCESS) {
- ERR("mosquitto_property_add_string_pair(IsEndSequence) Fail(%s)", mosquitto_strerror(ret));
- throw AittException(AittException::MQTT_ERR);
- }
-
- ret = mosquitto_publish_v5(handle, &mId, msg->GetResponseTopic().c_str(), datalen, data, qos,
- retain, props);
- if (ret != MOSQ_ERR_SUCCESS) {
- ERR("mosquitto_publish_v5(%s) Fail(%s)", msg->GetResponseTopic().c_str(),
- mosquitto_strerror(ret));
- throw AittException(AittException::MQTT_ERR);
- }
-}
-
-void *MQ::Subscribe(const std::string &topic, const SubscribeCallback &cb, void *user_data, int qos)
-{
- int mid = -1;
- int ret = mosquitto_subscribe(handle, &mid, topic.c_str(), qos);
- if (ret != MOSQ_ERR_SUCCESS) {
- ERR("mosquitto_subscribe(%s) Fail(%s)", topic.c_str(), mosquitto_strerror(ret));
- throw AittException(AittException::MQTT_ERR);
- }
-
- std::lock_guard<std::recursive_mutex> lock_from_here(callback_lock);
- SubscribeData *data = new SubscribeData(topic, cb, user_data);
- if (subscribers_iterating)
- new_subscribers.push_back(data);
- else
- subscribers.push_back(data);
-
- return static_cast<void *>(data);
-}
-
-void *MQ::Unsubscribe(void *sub_handle)
-{
- std::lock_guard<std::recursive_mutex> auto_lock(callback_lock);
- auto it = std::find(subscribers.begin(), subscribers.end(),
- static_cast<SubscribeData *>(sub_handle));
-
- if (it == subscribers.end()) {
- ERR("No Subscription(%p)", sub_handle);
- throw AittException(AittException::NO_DATA_ERR);
- }
-
- SubscribeData *data = static_cast<SubscribeData *>(sub_handle);
-
- if (subscriber_iterator == it) {
- subscriber_iterator = subscribers.erase(it);
- subscriber_iterator_updated = true;
- } else {
- subscribers.erase(it);
- }
-
- void *user_data = data->user_data;
- std::string topic = data->topic;
- delete data;
-
- int mid = -1;
- int ret = mosquitto_unsubscribe(handle, &mid, topic.c_str());
- if (ret != MOSQ_ERR_SUCCESS) {
- ERR("mosquitto_unsubscribe(%s) Fail(%d)", topic.c_str(), ret);
- throw AittException(AittException::MQTT_ERR);
- }
-
- return user_data;
-}
-
-bool MQ::CompareTopic(const std::string &left, const std::string &right)
-{
- bool result = false;
- int ret = mosquitto_topic_matches_sub(left.c_str(), right.c_str(), &result);
- if (ret != MOSQ_ERR_SUCCESS) {
- ERR("mosquitto_topic_matches_sub(%s, %s) Fail(%s)", left.c_str(), right.c_str(),
- mosquitto_strerror(ret));
- throw AittException(AittException::MQTT_ERR);
- }
- return result;
-}
-
-MQ::SubscribeData::SubscribeData(const std::string &in_topic, const SubscribeCallback &in_cb,
- void *in_user_data)
- : topic(in_topic), cb(in_cb), user_data(in_user_data)
-{
-}
-
-} // namespace aitt
+++ /dev/null
-/*
- * Copyright (c) 2021-2022 Samsung Electronics Co., Ltd All Rights Reserved
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#pragma once
-
-#include <mosquitto.h>
-
-#include <functional>
-#include <mutex>
-#include <string>
-#include <vector>
-
-#include "MSG.h"
-
-#define MQTT_LOCALHOST "127.0.0.1"
-#define MQTT_PORT 1883
-
-namespace aitt {
-
-class MQ {
- public:
- using SubscribeCallback = std::function<void(MSG *msg, const std::string &topic,
- const void *data, const size_t datalen, void *user_data)>;
- using MQConnectionCallback = std::function<void(int)>;
-
- explicit MQ(const std::string &id, bool clear_session = false);
- virtual ~MQ(void);
-
- static bool CompareTopic(const std::string &left, const std::string &right);
-
- void SetConnectionCallback(const MQConnectionCallback &cb);
- void Connect(const std::string &host, int port, const std::string &username,
- const std::string &password);
- void SetWillInfo(const std::string &topic, const void *msg, size_t szmsg, int qos, bool retain);
- void Disconnect(void);
- void Publish(const std::string &topic, const void *data, const size_t datalen, int qos = 0,
- bool retain = false);
- void PublishWithReply(const std::string &topic, const void *data, const size_t datalen, int qos,
- bool retain, const std::string &reply_topic, const std::string &correlation);
- void SendReply(MSG *msg, const void *data, const size_t datalen, int qos, bool retain);
- void *Subscribe(const std::string &topic, const SubscribeCallback &cb,
- void *user_data = nullptr, int qos = 0);
- void *Unsubscribe(void *handle);
-
- private:
- struct SubscribeData {
- SubscribeData(const std::string &topic, const SubscribeCallback &cb, void *user_data);
- std::string topic;
- SubscribeCallback cb;
- void *user_data;
- };
-
- static void ConnectCallback(mosquitto *mosq, void *obj, int rc, int flag,
- const mosquitto_property *props);
- static void DisconnectCallback(mosquitto *mosq, void *obj, int rc,
- const mosquitto_property *props);
- static void MessageCallback(mosquitto *, void *, const mosquitto_message *,
- const mosquitto_property *);
- void InvokeCallback(const mosquitto_message *msg, const mosquitto_property *props);
-
- static const std::string REPLY_SEQUENCE_NUM_KEY;
- static const std::string REPLY_IS_END_SEQUENCE_KEY;
-
- mosquitto *handle;
- const int keep_alive;
- std::vector<SubscribeData *> subscribers;
- bool subscribers_iterating;
- std::vector<SubscribeData *> new_subscribers;
- std::vector<SubscribeData *>::iterator subscriber_iterator;
- bool subscriber_iterator_updated;
- std::recursive_mutex callback_lock;
- MQConnectionCallback connect_cb;
-};
-
-} // namespace aitt
--- /dev/null
+/*
+ * Copyright (c) 2022 Samsung Electronics Co., Ltd All Rights Reserved
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "MQProxy.h"
+
+#include "ModuleLoader.h"
+#include "MosquittoMQ.h"
+
+namespace aitt {
+
+MQProxy::MQProxy(const std::string &id, bool clear_session, bool is_custom_broker)
+ : handle(nullptr, nullptr)
+{
+ if (is_custom_broker) {
+ ModuleLoader loader;
+ handle = loader.OpenModule(ModuleLoader::TYPE_CUSTOM_MQTT);
+
+ mq = loader.LoadMqttClient(handle.get(), "test", true);
+ } else {
+ mq = std::shared_ptr<MQ>(new MosquittoMQ(id, clear_session));
+ }
+}
+
+void MQProxy::SetConnectionCallback(const MQConnectionCallback &cb)
+{
+ mq->SetConnectionCallback(cb);
+}
+
+void MQProxy::Connect(const std::string &host, int port, const std::string &username,
+ const std::string &password)
+{
+ mq->Connect(host, port, username, password);
+}
+
+void MQProxy::SetWillInfo(const std::string &topic, const void *msg, size_t szmsg, int qos,
+ bool retain)
+{
+ mq->SetWillInfo(topic, msg, szmsg, qos, retain);
+}
+
+void MQProxy::Disconnect(void)
+{
+ mq->Disconnect();
+}
+
+void MQProxy::Publish(const std::string &topic, const void *data, const size_t datalen, int qos,
+ bool retain)
+{
+ mq->Publish(topic, data, datalen, qos, retain);
+}
+
+void MQProxy::PublishWithReply(const std::string &topic, const void *data, const size_t datalen,
+ int qos, bool retain, const std::string &reply_topic, const std::string &correlation)
+{
+ mq->PublishWithReply(topic, data, datalen, qos, retain, reply_topic, correlation);
+}
+
+void MQProxy::SendReply(MSG *msg, const void *data, const size_t datalen, int qos, bool retain)
+{
+ mq->SendReply(msg, data, datalen, qos, retain);
+}
+
+void *MQProxy::Subscribe(const std::string &topic, const SubscribeCallback &cb, void *user_data,
+ int qos)
+{
+ return mq->Subscribe(topic, cb, user_data, qos);
+}
+
+void *MQProxy::Unsubscribe(void *handle)
+{
+ return mq->Unsubscribe(handle);
+}
+
+} // namespace aitt
--- /dev/null
+/*
+ * Copyright (c) 2022 Samsung Electronics Co., Ltd All Rights Reserved
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#pragma once
+
+#include <memory>
+
+#include "MQ.h"
+
+namespace aitt {
+
+class MQProxy : public MQ {
+ public:
+ explicit MQProxy(const std::string &id, bool clear_session, bool custom_broker);
+ virtual ~MQProxy() = default;
+
+ void SetConnectionCallback(const MQConnectionCallback &cb);
+ void Connect(const std::string &host, int port, const std::string &username,
+ const std::string &password);
+ void SetWillInfo(const std::string &topic, const void *msg, size_t szmsg, int qos, bool retain);
+ void Disconnect(void);
+ void Publish(const std::string &topic, const void *data, const size_t datalen, int qos = 0,
+ bool retain = false);
+ void PublishWithReply(const std::string &topic, const void *data, const size_t datalen, int qos,
+ bool retain, const std::string &reply_topic, const std::string &correlation);
+ void SendReply(MSG *msg, const void *data, const size_t datalen, int qos, bool retain);
+ void *Subscribe(const std::string &topic, const SubscribeCallback &cb,
+ void *user_data = nullptr, int qos = 0);
+ void *Unsubscribe(void *handle);
+
+ private:
+ std::unique_ptr<void, void (*)(const void *)> handle;
+ std::shared_ptr<MQ> mq;
+};
+
+} // namespace aitt
return "libaitt-transport-tcp.so";
if (type == TYPE_WEBRTC)
return "libaitt-transport-webrtc.so";
+ if (type == TYPE_CUSTOM_MQTT)
+ return "libaitt-st-broker.so";
return std::string("Unknown");
}
return instance;
}
+std::shared_ptr<MQ> ModuleLoader::LoadMqttClient(void *handle, const std::string &id,
+ bool clear_session)
+{
+ MQ::ModuleEntry get_instance_fn =
+ reinterpret_cast<MQ::ModuleEntry>(dlsym(handle, MQ::MODULE_ENTRY_NAME));
+ if (get_instance_fn == nullptr) {
+ ERR("dlsym: %s", dlerror());
+ throw AittException(AittException::SYSTEM_ERR);
+ }
+
+ std::shared_ptr<MQ> instance(static_cast<MQ *>(get_instance_fn(id.c_str(), clear_session)),
+ [](const MQ *instance) { delete instance; });
+ if (instance == nullptr) {
+ ERR("Failed to create a new instance");
+ throw AittException(AittException::SYSTEM_ERR);
+ }
+
+ return instance;
+}
+
} // namespace aitt
TYPE_WEBRTC,
TYPE_RTSP,
TYPE_TRANSPORT_MAX,
+ TYPE_CUSTOM_MQTT,
};
using ModuleHandle = std::unique_ptr<void, void (*)(const void *)>;
ModuleHandle OpenModule(Type type);
std::shared_ptr<AittTransport> LoadTransport(void *handle, const std::string &ip,
AittDiscovery &discovery);
+ std::shared_ptr<MQ> LoadMqttClient(void *handle, const std::string &id, bool clear_session);
private:
std::string GetModuleFilename(Type type);
--- /dev/null
+/*
+ * Copyright (c) 2021-2022 Samsung Electronics Co., Ltd All Rights Reserved
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "MosquittoMQ.h"
+
+#include <mqtt_protocol.h>
+#include <sys/types.h>
+#include <unistd.h>
+
+#include <algorithm>
+#include <cerrno>
+#include <stdexcept>
+#include <thread>
+
+#include "AittException.h"
+#include "AittTypes.h"
+#include "AittUtil.h"
+#include "aitt_internal.h"
+
+namespace aitt {
+
+const std::string MosquittoMQ::REPLY_SEQUENCE_NUM_KEY = "sequenceNum";
+const std::string MosquittoMQ::REPLY_IS_END_SEQUENCE_KEY = "isEndSequence";
+
+MosquittoMQ::MosquittoMQ(const std::string &id, bool clear_session)
+ : handle(nullptr),
+ keep_alive(60),
+ subscribers_iterating(false),
+ subscriber_iterator_updated(false),
+ connect_cb(nullptr)
+{
+ do {
+ int ret = mosquitto_lib_init();
+ if (ret != MOSQ_ERR_SUCCESS) {
+ ERR("mosquitto_lib_init() Fail(%s)", mosquitto_strerror(ret));
+ break;
+ }
+
+ handle = mosquitto_new(id.c_str(), clear_session, this);
+ if (handle == nullptr) {
+ ERR("mosquitto_new(%s, %d) Fail", id.c_str(), clear_session);
+ break;
+ }
+
+ ret = mosquitto_int_option(handle, MOSQ_OPT_PROTOCOL_VERSION, MQTT_PROTOCOL_V5);
+ if (ret != MOSQ_ERR_SUCCESS) {
+ ERR("mosquitto_int_option() Fail(%s)", mosquitto_strerror(ret));
+ break;
+ }
+
+ mosquitto_message_v5_callback_set(handle, MessageCallback);
+ mosquitto_connect_v5_callback_set(handle, ConnectCallback);
+ mosquitto_disconnect_v5_callback_set(handle, DisconnectCallback);
+
+ ret = mosquitto_loop_start(handle);
+ if (ret != MOSQ_ERR_SUCCESS) {
+ ERR("mosquitto_loop_start() Fail(%s)", mosquitto_strerror(ret));
+ break;
+ }
+
+ return;
+ } while (0);
+
+ mosquitto_destroy(handle);
+ mosquitto_lib_cleanup();
+ throw AittException(AittException::MQTT_ERR, std::string("MosquittoMQ Constructor Error"));
+}
+
+MosquittoMQ::~MosquittoMQ(void)
+{
+ int ret;
+ INFO("Destructor");
+
+ ret = mosquitto_loop_stop(handle, true);
+ if (ret != MOSQ_ERR_SUCCESS)
+ ERR("mosquitto_loop_stop() Fail(%s)", mosquitto_strerror(ret));
+
+ callback_lock.lock();
+ connect_cb = nullptr;
+ subscribers.clear();
+ callback_lock.unlock();
+
+ mosquitto_destroy(handle);
+
+ ret = mosquitto_lib_cleanup();
+ if (ret != MOSQ_ERR_SUCCESS)
+ ERR("mosquitto_lib_cleanup() Fail(%s)", mosquitto_strerror(ret));
+}
+
+void MosquittoMQ::SetConnectionCallback(const MQConnectionCallback &cb)
+{
+ std::lock_guard<std::recursive_mutex> lock_from_here(callback_lock);
+ connect_cb = cb;
+}
+
+void MosquittoMQ::ConnectCallback(struct mosquitto *mosq, void *obj, int rc, int flag,
+ const mosquitto_property *props)
+{
+ RET_IF(obj == nullptr);
+ MosquittoMQ *mq = static_cast<MosquittoMQ *>(obj);
+
+ INFO("Connected : rc(%d), flag(%d)", rc, flag);
+
+ std::lock_guard<std::recursive_mutex> lock_from_here(mq->callback_lock);
+ if (mq->connect_cb)
+ mq->connect_cb(AITT_CONNECTED);
+}
+
+void MosquittoMQ::DisconnectCallback(struct mosquitto *mosq, void *obj, int rc,
+ const mosquitto_property *props)
+{
+ RET_IF(obj == nullptr);
+ MosquittoMQ *mq = static_cast<MosquittoMQ *>(obj);
+
+ INFO("Disconnected : rc(%d)", rc);
+
+ std::lock_guard<std::recursive_mutex> lock_from_here(mq->callback_lock);
+ if (mq->connect_cb)
+ mq->connect_cb(AITT_DISCONNECTED);
+}
+
+void MosquittoMQ::Connect(const std::string &host, int port, const std::string &username,
+ const std::string &password)
+{
+ int ret;
+
+ if (username.empty() == false) {
+ ret = mosquitto_username_pw_set(handle, username.c_str(), password.c_str());
+ if (ret != MOSQ_ERR_SUCCESS) {
+ ERR("mosquitto_username_pw_set(%s, %s) Fail(%s)", username.c_str(), password.c_str(),
+ mosquitto_strerror(ret));
+ throw AittException(AittException::MQTT_ERR);
+ }
+ }
+
+ ret = mosquitto_connect(handle, host.c_str(), port, keep_alive);
+ if (ret != MOSQ_ERR_SUCCESS) {
+ ERR("mosquitto_connect(%s, %d) Fail(%s)", host.c_str(), port, mosquitto_strerror(ret));
+ throw AittException(AittException::MQTT_ERR);
+ }
+}
+
+void MosquittoMQ::SetWillInfo(const std::string &topic, const void *msg, size_t szmsg, int qos,
+ bool retain)
+{
+ int ret = mosquitto_will_set(handle, topic.c_str(), szmsg, msg, qos, retain);
+ if (ret != MOSQ_ERR_SUCCESS) {
+ ERR("mosquitto_will_set(%s) Fail(%s)", topic.c_str(), mosquitto_strerror(ret));
+ throw AittException(AittException::MQTT_ERR);
+ }
+}
+
+void MosquittoMQ::Disconnect(void)
+{
+ int ret = mosquitto_disconnect(handle);
+ if (ret != MOSQ_ERR_SUCCESS) {
+ ERR("mosquitto_disconnect() Fail(%s)", mosquitto_strerror(ret));
+ throw AittException(AittException::MQTT_ERR);
+ }
+
+ mosquitto_will_clear(handle);
+}
+
+void MosquittoMQ::MessageCallback(mosquitto *handle, void *obj, const mosquitto_message *msg,
+ const mosquitto_property *props)
+{
+ RET_IF(obj == nullptr);
+ MosquittoMQ *mq = static_cast<MosquittoMQ *>(obj);
+
+ std::lock_guard<std::recursive_mutex> auto_lock(mq->callback_lock);
+ mq->subscribers_iterating = true;
+ mq->subscriber_iterator = mq->subscribers.begin();
+ while (mq->subscriber_iterator != mq->subscribers.end()) {
+ auto subscribe_data = *(mq->subscriber_iterator);
+ if (nullptr == subscribe_data)
+ ERR("end() is not valid because elements were added.");
+
+ bool result = AittUtil::CompareTopic(subscribe_data->topic.c_str(), msg->topic);
+ if (result)
+ mq->InvokeCallback(msg, props);
+
+ if (!mq->subscriber_iterator_updated)
+ mq->subscriber_iterator++;
+ else
+ mq->subscriber_iterator_updated = false;
+ }
+ mq->subscribers_iterating = false;
+ mq->subscribers.insert(mq->subscribers.end(), mq->new_subscribers.begin(),
+ mq->new_subscribers.end());
+ mq->new_subscribers.clear();
+}
+
+void MosquittoMQ::InvokeCallback(const mosquitto_message *msg, const mosquitto_property *props)
+{
+ MSG mq_msg;
+ mq_msg.SetTopic(msg->topic);
+ if (props) {
+ const mosquitto_property *prop;
+
+ char *response_topic = nullptr;
+ prop = mosquitto_property_read_string(props, MQTT_PROP_RESPONSE_TOPIC, &response_topic,
+ false);
+ if (prop) {
+ mq_msg.SetResponseTopic(response_topic);
+ free(response_topic);
+ }
+
+ void *correlation = nullptr;
+ uint16_t correlation_size = 0;
+ prop = mosquitto_property_read_binary(props, MQTT_PROP_CORRELATION_DATA, &correlation,
+ &correlation_size, false);
+ if (prop == nullptr || correlation == nullptr)
+ ERR("No Correlation Data");
+
+ mq_msg.SetCorrelation(std::string((char *)correlation, correlation_size));
+ if (correlation)
+ free(correlation);
+
+ char *name = nullptr;
+ char *value = nullptr;
+ prop = mosquitto_property_read_string_pair(props, MQTT_PROP_USER_PROPERTY, &name, &value,
+ false);
+ while (prop) {
+ if (REPLY_SEQUENCE_NUM_KEY == name) {
+ mq_msg.SetSequence(std::stoi(value));
+ } else if (REPLY_IS_END_SEQUENCE_KEY == name) {
+ mq_msg.SetEndSequence(std::stoi(value) == 1);
+ } else {
+ ERR("Unsupported property(%s, %s)", name, value);
+ }
+ free(name);
+ free(value);
+
+ prop = mosquitto_property_read_string_pair(prop, MQTT_PROP_USER_PROPERTY, &name, &value,
+ true);
+ }
+ }
+
+ SubscribeData *cb_info = *subscriber_iterator;
+ cb_info->cb(&mq_msg, msg->topic, msg->payload, msg->payloadlen, cb_info->user_data);
+}
+
+void MosquittoMQ::Publish(const std::string &topic, const void *data, const size_t datalen, int qos,
+ bool retain)
+{
+ int mid = -1;
+ int ret = mosquitto_publish(handle, &mid, topic.c_str(), datalen, data, qos, retain);
+ if (ret != MOSQ_ERR_SUCCESS) {
+ ERR("mosquitto_publish(%s) Fail(%s)", topic.c_str(), mosquitto_strerror(ret));
+ throw AittException(AittException::MQTT_ERR);
+ }
+}
+
+void MosquittoMQ::PublishWithReply(const std::string &topic, const void *data, const size_t datalen,
+ int qos, bool retain, const std::string &reply_topic, const std::string &correlation)
+{
+ int ret;
+ int mid = -1;
+ mosquitto_property *props = nullptr;
+
+ ret = mosquitto_property_add_string(&props, MQTT_PROP_RESPONSE_TOPIC, reply_topic.c_str());
+ if (ret != MOSQ_ERR_SUCCESS) {
+ ERR("mosquitto_property_add_string(response-topic) Fail(%s)", mosquitto_strerror(ret));
+ throw AittException(AittException::MQTT_ERR);
+ }
+
+ ret = mosquitto_property_add_binary(&props, MQTT_PROP_CORRELATION_DATA, correlation.c_str(),
+ correlation.size());
+ if (ret != MOSQ_ERR_SUCCESS) {
+ ERR("mosquitto_property_add_binary(correlation) Fail(%s)", mosquitto_strerror(ret));
+ throw AittException(AittException::MQTT_ERR);
+ }
+ ret = mosquitto_publish_v5(handle, &mid, topic.c_str(), datalen, data, qos, retain, props);
+ if (ret != MOSQ_ERR_SUCCESS) {
+ ERR("mosquitto_publish_v5(%s) Fail(%s)", topic.c_str(), mosquitto_strerror(ret));
+ throw AittException(AittException::MQTT_ERR);
+ }
+}
+
+void MosquittoMQ::SendReply(MSG *msg, const void *data, const size_t datalen, int qos, bool retain)
+{
+ RET_IF(msg == nullptr);
+
+ int ret;
+ int mId = -1;
+ mosquitto_property *props = nullptr;
+
+ ret = mosquitto_property_add_binary(&props, MQTT_PROP_CORRELATION_DATA,
+ msg->GetCorrelation().c_str(), msg->GetCorrelation().size());
+ if (ret != MOSQ_ERR_SUCCESS) {
+ ERR("mosquitto_property_add_binary(correlation) Fail(%s)", mosquitto_strerror(ret));
+ throw AittException(AittException::MQTT_ERR);
+ }
+
+ ret = mosquitto_property_add_string_pair(&props, MQTT_PROP_USER_PROPERTY,
+ REPLY_SEQUENCE_NUM_KEY.c_str(), std::to_string(msg->GetSequence()).c_str());
+ if (ret != MOSQ_ERR_SUCCESS) {
+ ERR("mosquitto_property_add_string_pair(squenceNum) Fail(%s)", mosquitto_strerror(ret));
+ throw AittException(AittException::MQTT_ERR);
+ }
+
+ ret = mosquitto_property_add_string_pair(&props, MQTT_PROP_USER_PROPERTY,
+ REPLY_IS_END_SEQUENCE_KEY.c_str(), std::to_string(msg->IsEndSequence()).c_str());
+ if (ret != MOSQ_ERR_SUCCESS) {
+ ERR("mosquitto_property_add_string_pair(IsEndSequence) Fail(%s)", mosquitto_strerror(ret));
+ throw AittException(AittException::MQTT_ERR);
+ }
+
+ ret = mosquitto_publish_v5(handle, &mId, msg->GetResponseTopic().c_str(), datalen, data, qos,
+ retain, props);
+ if (ret != MOSQ_ERR_SUCCESS) {
+ ERR("mosquitto_publish_v5(%s) Fail(%s)", msg->GetResponseTopic().c_str(),
+ mosquitto_strerror(ret));
+ throw AittException(AittException::MQTT_ERR);
+ }
+}
+
+void *MosquittoMQ::Subscribe(const std::string &topic, const SubscribeCallback &cb, void *user_data,
+ int qos)
+{
+ int mid = -1;
+ int ret = mosquitto_subscribe(handle, &mid, topic.c_str(), qos);
+ if (ret != MOSQ_ERR_SUCCESS) {
+ ERR("mosquitto_subscribe(%s) Fail(%s)", topic.c_str(), mosquitto_strerror(ret));
+ throw AittException(AittException::MQTT_ERR);
+ }
+
+ std::lock_guard<std::recursive_mutex> lock_from_here(callback_lock);
+ SubscribeData *data = new SubscribeData(topic, cb, user_data);
+ if (subscribers_iterating)
+ new_subscribers.push_back(data);
+ else
+ subscribers.push_back(data);
+
+ return static_cast<void *>(data);
+}
+
+void *MosquittoMQ::Unsubscribe(void *sub_handle)
+{
+ std::lock_guard<std::recursive_mutex> auto_lock(callback_lock);
+ auto it = std::find(subscribers.begin(), subscribers.end(),
+ static_cast<SubscribeData *>(sub_handle));
+
+ if (it == subscribers.end()) {
+ ERR("No Subscription(%p)", sub_handle);
+ throw AittException(AittException::NO_DATA_ERR);
+ }
+
+ SubscribeData *data = static_cast<SubscribeData *>(sub_handle);
+
+ if (subscriber_iterator == it) {
+ subscriber_iterator = subscribers.erase(it);
+ subscriber_iterator_updated = true;
+ } else {
+ subscribers.erase(it);
+ }
+
+ void *user_data = data->user_data;
+ std::string topic = data->topic;
+ delete data;
+
+ int mid = -1;
+ int ret = mosquitto_unsubscribe(handle, &mid, topic.c_str());
+ if (ret != MOSQ_ERR_SUCCESS) {
+ ERR("mosquitto_unsubscribe(%s) Fail(%d)", topic.c_str(), ret);
+ throw AittException(AittException::MQTT_ERR);
+ }
+
+ return user_data;
+}
+
+MosquittoMQ::SubscribeData::SubscribeData(const std::string &in_topic,
+ const SubscribeCallback &in_cb, void *in_user_data)
+ : topic(in_topic), cb(in_cb), user_data(in_user_data)
+{
+}
+
+} // namespace aitt
--- /dev/null
+/*
+ * Copyright (c) 2021-2022 Samsung Electronics Co., Ltd All Rights Reserved
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#pragma once
+
+#include <mosquitto.h>
+
+#include <functional>
+#include <mutex>
+#include <string>
+#include <vector>
+
+#include "MQ.h"
+#include "MSG.h"
+
+#define MQTT_LOCALHOST "127.0.0.1"
+#define MQTT_PORT 1883
+
+namespace aitt {
+
+class MosquittoMQ : public MQ {
+ public:
+ explicit MosquittoMQ(const std::string &id, bool clear_session = false);
+ virtual ~MosquittoMQ(void);
+
+ void SetConnectionCallback(const MQConnectionCallback &cb);
+ void Connect(const std::string &host, int port, const std::string &username,
+ const std::string &password);
+ void SetWillInfo(const std::string &topic, const void *msg, size_t szmsg, int qos, bool retain);
+ void Disconnect(void);
+ void Publish(const std::string &topic, const void *data, const size_t datalen, int qos = 0,
+ bool retain = false);
+ void PublishWithReply(const std::string &topic, const void *data, const size_t datalen, int qos,
+ bool retain, const std::string &reply_topic, const std::string &correlation);
+ void SendReply(MSG *msg, const void *data, const size_t datalen, int qos, bool retain);
+ void *Subscribe(const std::string &topic, const SubscribeCallback &cb,
+ void *user_data = nullptr, int qos = 0);
+ void *Unsubscribe(void *handle);
+
+ private:
+ struct SubscribeData {
+ SubscribeData(const std::string &topic, const SubscribeCallback &cb, void *user_data);
+ std::string topic;
+ SubscribeCallback cb;
+ void *user_data;
+ };
+
+ static void ConnectCallback(mosquitto *mosq, void *obj, int rc, int flag,
+ const mosquitto_property *props);
+ static void DisconnectCallback(mosquitto *mosq, void *obj, int rc,
+ const mosquitto_property *props);
+ static void MessageCallback(mosquitto *, void *, const mosquitto_message *,
+ const mosquitto_property *);
+ void InvokeCallback(const mosquitto_message *msg, const mosquitto_property *props);
+
+ static const std::string REPLY_SEQUENCE_NUM_KEY;
+ static const std::string REPLY_IS_END_SEQUENCE_KEY;
+
+ mosquitto *handle;
+ const int keep_alive;
+ std::vector<SubscribeData *> subscribers;
+ bool subscribers_iterating;
+ std::vector<SubscribeData *> new_subscribers;
+ std::vector<SubscribeData *>::iterator subscriber_iterator;
+ bool subscriber_iterator_updated;
+ std::recursive_mutex callback_lock;
+ MQConnectionCallback connect_cb;
+};
+
+} // namespace aitt
--- /dev/null
+/*
+ * Copyright (c) 2021-2022 Samsung Electronics Co., Ltd All Rights Reserved
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#pragma once
+
+#include <MSG.h>
+
+#include <functional>
+#include <string>
+
+#define AITT_MQ_NEW aitt_mq_new
+#define TO_STR(s) #s
+#define DEFINE_TO_STR(x) TO_STR(x)
+
+namespace aitt {
+
+class MQ {
+ public:
+ typedef void *(*ModuleEntry)(const char *id, bool clear_session);
+
+ using SubscribeCallback = std::function<void(MSG *msg, const std::string &topic,
+ const void *data, const size_t datalen, void *user_data)>;
+ using MQConnectionCallback = std::function<void(int)>;
+
+ static constexpr const char *const MODULE_ENTRY_NAME = DEFINE_TO_STR(AITT_MQ_NEW);
+
+ MQ() = default;
+ virtual ~MQ() = default;
+
+ virtual void SetConnectionCallback(const MQConnectionCallback &cb) = 0;
+ virtual void Connect(const std::string &host, int port, const std::string &username,
+ const std::string &password) = 0;
+ virtual void SetWillInfo(const std::string &topic, const void *msg, size_t szmsg, int qos,
+ bool retain) = 0;
+ virtual void Disconnect(void) = 0;
+ virtual void Publish(const std::string &topic, const void *data, const size_t datalen,
+ int qos = 0, bool retain = false) = 0;
+ virtual void PublishWithReply(const std::string &topic, const void *data, const size_t datalen,
+ int qos, bool retain, const std::string &reply_topic, const std::string &correlation) = 0;
+ virtual void SendReply(MSG *msg, const void *data, const size_t datalen, int qos,
+ bool retain) = 0;
+ virtual void *Subscribe(const std::string &topic, const SubscribeCallback &cb,
+ void *user_data = nullptr, int qos = 0) = 0;
+ virtual void *Unsubscribe(void *handle) = 0;
+};
+
+} // namespace aitt
+
+#undef TO_STR
+#undef DEFINE_TO_STR
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
#include "MqttServer.h"
+#include "MQProxy.h"
#include "aitt_internal.h"
#define MQTT_HANDLER_MSG_QOS 1
#define MQTT_HANDLER_MGMT_QOS 2
-MqttServer::MqttServer(const Config &config) : mq(config.GetLocalId(), true)
+MqttServer::MqttServer(const Config &config)
+ : mq(new aitt::MQProxy(config.GetLocalId(), true, false))
{
broker_ip_ = config.GetBrokerIp();
broker_port_ = config.GetBrokerPort();
DBG("ID[%s] BROKER IP[%s] BROKER PORT [%d] ROOM[%s] %s", id_.c_str(), broker_ip_.c_str(),
broker_port_, room_id_.c_str(), is_publisher_ ? "Publisher" : "Subscriber");
- mq.SetConnectionCallback(std::bind(&MqttServer::ConnectCallBack, this, std::placeholders::_1));
+ mq->SetConnectionCallback(std::bind(&MqttServer::ConnectCallBack, this, std::placeholders::_1));
}
MqttServer::~MqttServer()
// Notify Who is source?
std::string source_topic = room_id_ + std::string("/source");
if (is_publisher_) {
- mq.Publish(source_topic, id_.c_str(), id_.size(), AITT_QOS_EXACTLY_ONCE, true);
+ mq->Publish(source_topic, id_.c_str(), id_.size(), AITT_QOS_EXACTLY_ONCE, true);
SetConnectionState(ConnectionState::Registered);
} else {
- mq.Subscribe(source_topic,
+ mq->Subscribe(source_topic,
std::bind(&MqttServer::HandleSourceTopic, this, std::placeholders::_1,
std::placeholders::_2, std::placeholders::_3, std::placeholders::_4,
std::placeholders::_5),
int MqttServer::Connect(void)
{
std::string will_message = std::string("ROOM_PEER_LEFT ") + id_;
- mq.SetWillInfo(room_id_, will_message.c_str(), will_message.size(), AITT_QOS_EXACTLY_ONCE,
+ mq->SetWillInfo(room_id_, will_message.c_str(), will_message.size(), AITT_QOS_EXACTLY_ONCE,
false);
SetConnectionState(ConnectionState::Connecting);
- mq.Connect(broker_ip_, broker_port_, std::string(), std::string());
+ mq->Connect(broker_ip_, broker_port_, std::string(), std::string());
return 0;
}
if (is_publisher_) {
INFO("remove retained");
std::string source_topic = room_id_ + std::string("/source");
- mq.Publish(source_topic, nullptr, 0, AITT_QOS_AT_LEAST_ONCE, true);
+ mq->Publish(source_topic, nullptr, 0, AITT_QOS_AT_LEAST_ONCE, true);
}
std::string left_message = std::string("ROOM_PEER_LEFT ") + id_;
- mq.Publish(room_id_, left_message.c_str(), left_message.size(), AITT_QOS_AT_LEAST_ONCE, false);
+ mq->Publish(room_id_, left_message.c_str(), left_message.size(), AITT_QOS_AT_LEAST_ONCE, false);
- mq.Disconnect();
+ mq->Disconnect();
room_id_ = std::string("");
std::string receiver_topic = room_id_ + std::string("/") + peer_id;
std::string server_formatted_msg = "ROOM_PEER_MSG " + id_ + " " + msg;
- mq.Publish(receiver_topic, server_formatted_msg.c_str(), server_formatted_msg.size(),
+ mq->Publish(receiver_topic, server_formatted_msg.c_str(), server_formatted_msg.size(),
AITT_QOS_AT_LEAST_ONCE);
return 0;
}
// Subscribe PEER_JOIN PEER_LEFT
- mq.Subscribe(room_id_,
+ mq->Subscribe(room_id_,
std::bind(&MqttServer::HandleRoomTopic, this, std::placeholders::_1,
std::placeholders::_2, std::placeholders::_3, std::placeholders::_4,
std::placeholders::_5),
// Subscribe PEER_MSG
std::string receiving_topic = room_id + std::string("/") + id_;
- mq.Subscribe(receiving_topic,
+ mq->Subscribe(receiving_topic,
std::bind(&MqttServer::HandleMessageTopic, this, std::placeholders::_1,
std::placeholders::_2, std::placeholders::_3, std::placeholders::_4,
std::placeholders::_5),
if (!is_publisher_) {
std::string join_message = std::string("ROOM_PEER_JOINED ") + id_;
- mq.Publish(room_id_, join_message.c_str(), join_message.size(), AITT_QOS_EXACTLY_ONCE);
+ mq->Publish(room_id_, join_message.c_str(), join_message.size(), AITT_QOS_EXACTLY_ONCE);
}
}
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
#pragma once
-#include <MQ.h>
+#include <functional>
+#include <memory>
+#include <string>
#include "Config.h"
#include "IfaceServer.h"
+#include "MQ.h"
class MqttServer : public IfaceServer {
public:
std::string room_id_;
std::string source_id_;
bool is_publisher_;
- aitt::MQ mq;
+ std::unique_ptr<aitt::MQ> mq;
ConnectionState connection_state_;
std::function<void(ConnectionState)> connection_state_changed_cb_;
#include <memory>
#include <stdexcept>
+#include "MQProxy.h"
#include "aitt_internal.h"
#define WEBRTC_ROOM_ID_PREFIX std::string(AITT_MANAGED_TOPIC_PREFIX "webrtc/room/Room.webrtc")
bool custom_broker)
: public_api(parent),
id_(id),
- mq(id, clear_session),
+ mq(new MQProxy(id, clear_session, custom_broker)),
discovery(id, custom_broker),
reply_id(0),
modules{0}
void AITT::Impl::SetWillInfo(const std::string &topic, const void *data, const size_t datalen,
AittQoS qos, bool retain)
{
- mq.SetWillInfo(topic, data, datalen, qos, retain);
+ mq->SetWillInfo(topic, data, datalen, qos, retain);
}
void AITT::Impl::SetConnectionCallback(ConnectionCallback cb, void *user_data)
{
if (cb)
- mq.SetConnectionCallback(
+ mq->SetConnectionCallback(
std::bind(&Impl::ConnectionCB, this, cb, user_data, std::placeholders::_1));
else
- mq.SetConnectionCallback(nullptr);
+ mq->SetConnectionCallback(nullptr);
}
void AITT::Impl::ConnectionCB(ConnectionCallback cb, void *user_data, int status)
const std::string &password)
{
discovery.Start(host, port, username, password);
- mq.Connect(host, port, username, password);
+ mq->Connect(host, port, username, password);
mqtt_broker_ip_ = host;
mqtt_broker_port_ = port;
mqtt_broker_ip_.clear();
mqtt_broker_port_ = -1;
- mq.Disconnect();
+ mq->Disconnect();
discovery.Stop();
}
for (auto subscribe_info : subscribed_list) {
switch (subscribe_info->first) {
case AITT_TYPE_MQTT:
- mq.Unsubscribe(subscribe_info->second);
+ mq->Unsubscribe(subscribe_info->second);
break;
case AITT_TYPE_TCP:
GetTransport(ModuleLoader::TYPE_TCP)->Unsubscribe(subscribe_info->second);
AittProtocol protocols, AittQoS qos, bool retain)
{
if ((protocols & AITT_TYPE_MQTT) == AITT_TYPE_MQTT)
- mq.Publish(topic, data, datalen, qos, retain);
+ mq->Publish(topic, data, datalen, qos, retain);
if ((protocols & AITT_TYPE_TCP) == AITT_TYPE_TCP)
GetTransport(ModuleLoader::TYPE_TCP)->Publish(topic, data, datalen, qos, retain);
AittSubscribeID AITT::Impl::SubscribeMQ(SubscribeInfo *handle, MainLoopHandler *loop_handle,
const std::string &topic, const SubscribeCallback &cb, void *user_data, AittQoS qos)
{
- return mq.Subscribe(
+ return mq->Subscribe(
topic,
[this, handle, loop_handle, cb](MSG *msg, const std::string &topic, const void *data,
const size_t datalen, void *mq_user_data) {
SubscribeInfo *found_info = *it;
switch (found_info->first) {
case AITT_TYPE_MQTT:
- user_data = mq.Unsubscribe(found_info->second);
+ user_data = mq->Unsubscribe(found_info->second);
break;
case AITT_TYPE_TCP: {
auto tcpModule = GetTransport(ModuleLoader::TYPE_TCP);
},
user_data, protocol, qos);
- mq.PublishWithReply(topic, data, datalen, qos, false, replyTopic, correlation);
+ mq->PublishWithReply(topic, data, datalen, qos, false, replyTopic, correlation);
return 0;
}
subscribed_list.push_back(info);
}
- mq.PublishWithReply(topic, data, datalen, qos, false, replyTopic, correlation);
+ mq->PublishWithReply(topic, data, datalen, qos, false, replyTopic, correlation);
if (timeout_ms)
HandleTimeout(timeout_ms, timeout_id, sync_loop, is_timeout);
msg->IncreaseSequence();
msg->SetEndSequence(end);
- mq.SendReply(msg, data, datalen, AITT_QOS_AT_MOST_ONCE, false);
+ mq->SendReply(msg, data, datalen, AITT_QOS_AT_MOST_ONCE, false);
}
void *AITT::Impl::SubscribeTCP(SubscribeInfo *handle, const std::string &topic,
std::string id_;
std::string mqtt_broker_ip_;
int mqtt_broker_port_;
- MQ mq;
+ std::unique_ptr<MQ> mq;
AittDiscovery discovery;
unsigned short reply_id;
ModuleObj *modules[ModuleLoader::TYPE_TRANSPORT_MAX];
LINK_DIRECTORIES(${UT_NEEDS_LIBRARY_DIRS})
###########################################################################
-SET(AITT_UT_SRC AITT_test.cc RequestResponse_test.cc MainLoopHandler_test.cc aitt_c_test.cc AITT_TCP_test.cc MQ_test.cc)
+SET(AITT_UT_SRC AITT_test.cc RequestResponse_test.cc MainLoopHandler_test.cc aitt_c_test.cc AITT_TCP_test.cc MosquittoMQ_test.cc)
ADD_EXECUTABLE(${AITT_UT} ${AITT_UT_SRC})
TARGET_LINK_LIBRARIES(${AITT_UT} Threads::Threads ${UT_NEEDS_LIBRARIES} ${PROJECT_NAME})
###########################################################################
AUX_SOURCE_DIRECTORY(../mock MOCK_SRC)
-ADD_EXECUTABLE(${AITT_UT}_mq MQ_mocktest.cc ${MOCK_SRC})
-TARGET_LINK_LIBRARIES(${AITT_UT}_mq ${UT_NEEDS_LIBRARIES} Threads::Threads ${AITT_NEEDS_LIBRARIES} ${AITT_COMMON})
+ADD_EXECUTABLE(${AITT_UT}_mq MosquittoMQ_mocktest.cc ${MOCK_SRC})
+TARGET_LINK_LIBRARIES(${AITT_UT}_mq ${UT_NEEDS_LIBRARIES} Threads::Threads ${AITT_COMMON})
TARGET_INCLUDE_DIRECTORIES(${AITT_UT}_mq PRIVATE ../mock)
INSTALL(TARGETS ${AITT_UT}_mq DESTINATION ${AITT_TEST_BINDIR})
+++ /dev/null
-/*
- * Copyright (c) 2021-2022 Samsung Electronics Co., Ltd All Rights Reserved
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#include <gmock/gmock.h>
-#include <gtest/gtest.h>
-
-#include <condition_variable>
-#include <mutex>
-
-#include "AittTypes.h"
-#include "MQ.h"
-#include "MQMockTest.h"
-#include "MQTTMock.h"
-
-using ::testing::Return;
-
-#define TEST_TOPIC "Test/Topic"
-#define TEST_PAYLOAD "The last will is ..."
-#define TEST_CLIENT_ID "testClient"
-#define TEST_PORT 8123
-#define TEST_HOST "localhost"
-#define TEST_HANDLE reinterpret_cast<mosquitto *>(0xbeefbeef)
-
-TEST_F(MQMockTest, Negative_Create_lib_init_Anytime)
-{
- EXPECT_CALL(GetMock(), mosquitto_lib_init()).WillOnce(Return(MOSQ_ERR_NOT_SUPPORTED));
- EXPECT_CALL(GetMock(), mosquitto_destroy(nullptr)).WillOnce(Return());
- EXPECT_CALL(GetMock(), mosquitto_lib_cleanup()).WillOnce(Return(MOSQ_ERR_SUCCESS));
-
- try {
- aitt::MQ mq(TEST_CLIENT_ID, true);
- FAIL() << "lib_init must be failed";
- } catch (std::exception &e) {
- ASSERT_STREQ(e.what(), "MQTT failure : MQ Constructor Error");
- }
-}
-
-TEST_F(MQMockTest, Negative_Create_new_Anytime)
-{
- EXPECT_CALL(GetMock(), mosquitto_lib_init()).WillOnce(Return(MOSQ_ERR_SUCCESS));
- EXPECT_CALL(GetMock(), mosquitto_new(testing::StrEq(TEST_CLIENT_ID), true, testing::_))
- .WillOnce(Return(nullptr));
- EXPECT_CALL(GetMock(), mosquitto_destroy(nullptr)).Times(1);
- EXPECT_CALL(GetMock(), mosquitto_lib_cleanup()).WillOnce(Return(MOSQ_ERR_SUCCESS));
-
- try {
- aitt::MQ mq(TEST_CLIENT_ID, true);
- FAIL() << "lib_init must be failed";
- } catch (std::exception &e) {
- ASSERT_STREQ(e.what(), "MQTT failure : MQ Constructor Error");
- }
-}
-
-TEST_F(MQMockTest, Positive_Publish_Anytime)
-{
- EXPECT_CALL(GetMock(), mosquitto_lib_init()).WillOnce(Return(MOSQ_ERR_SUCCESS));
- EXPECT_CALL(GetMock(), mosquitto_new(testing::StrEq(TEST_CLIENT_ID), true, testing::_))
- .WillOnce(Return(TEST_HANDLE));
- EXPECT_CALL(GetMock(),
- mosquitto_int_option(TEST_HANDLE, MOSQ_OPT_PROTOCOL_VERSION, MQTT_PROTOCOL_V5))
- .WillOnce(Return(MOSQ_ERR_SUCCESS));
- EXPECT_CALL(GetMock(), mosquitto_message_v5_callback_set(TEST_HANDLE, testing::_)).Times(1);
- EXPECT_CALL(GetMock(), mosquitto_loop_start(TEST_HANDLE)).WillOnce(Return(MOSQ_ERR_SUCCESS));
- EXPECT_CALL(GetMock(), mosquitto_connect(TEST_HANDLE, testing::StrEq(TEST_HOST), TEST_PORT, 60))
- .WillOnce(Return(MOSQ_ERR_SUCCESS));
- EXPECT_CALL(GetMock(), mosquitto_publish(TEST_HANDLE, testing::_, testing::StrEq(TEST_TOPIC),
- sizeof(TEST_PAYLOAD), TEST_PAYLOAD, AITT_QOS_AT_MOST_ONCE, false))
- .WillOnce(Return(MOSQ_ERR_SUCCESS));
- EXPECT_CALL(GetMock(), mosquitto_destroy(TEST_HANDLE)).Times(1);
- EXPECT_CALL(GetMock(), mosquitto_lib_cleanup()).WillOnce(Return(MOSQ_ERR_SUCCESS));
-
- try {
- aitt::MQ mq(TEST_CLIENT_ID, true);
- mq.Connect(TEST_HOST, TEST_PORT, "", "");
- mq.Publish(TEST_TOPIC, TEST_PAYLOAD, sizeof(TEST_PAYLOAD));
- } catch (std::exception &e) {
- FAIL() << "Unexpected exception: " << e.what();
- }
-}
-
-TEST_F(MQMockTest, Positive_Subscribe_Anytime)
-{
- EXPECT_CALL(GetMock(), mosquitto_lib_init()).WillOnce(Return(MOSQ_ERR_SUCCESS));
- EXPECT_CALL(GetMock(), mosquitto_new(testing::StrEq(TEST_CLIENT_ID), true, testing::_))
- .WillOnce(Return(TEST_HANDLE));
- EXPECT_CALL(GetMock(), mosquitto_message_v5_callback_set(TEST_HANDLE, testing::_)).Times(1);
- EXPECT_CALL(GetMock(), mosquitto_loop_start(TEST_HANDLE)).WillOnce(Return(MOSQ_ERR_SUCCESS));
- EXPECT_CALL(GetMock(), mosquitto_connect(TEST_HANDLE, testing::StrEq(TEST_HOST), TEST_PORT, 60))
- .WillOnce(Return(MOSQ_ERR_SUCCESS));
- EXPECT_CALL(GetMock(), mosquitto_subscribe(TEST_HANDLE, testing::_, testing::StrEq(TEST_TOPIC),
- AITT_QOS_AT_MOST_ONCE))
- .WillOnce(Return(MOSQ_ERR_SUCCESS));
- EXPECT_CALL(GetMock(), mosquitto_destroy(TEST_HANDLE)).Times(1);
- EXPECT_CALL(GetMock(), mosquitto_lib_cleanup()).WillOnce(Return(MOSQ_ERR_SUCCESS));
-
- try {
- aitt::MQ mq(TEST_CLIENT_ID, true);
- mq.Connect(TEST_HOST, TEST_PORT, "", "");
- mq.Subscribe(
- TEST_TOPIC,
- [](aitt::MSG *info, const std::string &topic, const void *msg, const int szmsg,
- const void *cbdata) -> void {},
- nullptr, AITT_QOS_AT_MOST_ONCE);
- } catch (std::exception &e) {
- FAIL() << "Unexpected exception: " << e.what();
- }
-}
-
-TEST_F(MQMockTest, Positive_Unsubscribe_Anytime)
-{
- EXPECT_CALL(GetMock(), mosquitto_lib_init()).WillOnce(Return(MOSQ_ERR_SUCCESS));
- EXPECT_CALL(GetMock(), mosquitto_new(testing::StrEq(TEST_CLIENT_ID), true, testing::_))
- .WillOnce(Return(TEST_HANDLE));
- EXPECT_CALL(GetMock(), mosquitto_message_v5_callback_set(TEST_HANDLE, testing::_)).Times(1);
- EXPECT_CALL(GetMock(), mosquitto_loop_start(TEST_HANDLE)).WillOnce(Return(MOSQ_ERR_SUCCESS));
- EXPECT_CALL(GetMock(), mosquitto_connect(TEST_HANDLE, testing::StrEq(TEST_HOST), TEST_PORT, 60))
- .WillOnce(Return(MOSQ_ERR_SUCCESS));
- EXPECT_CALL(GetMock(),
- mosquitto_subscribe(TEST_HANDLE, testing::_, testing::StrEq(TEST_TOPIC), 0))
- .WillOnce(Return(MOSQ_ERR_SUCCESS));
- EXPECT_CALL(GetMock(),
- mosquitto_unsubscribe(TEST_HANDLE, testing::_, testing::StrEq(TEST_TOPIC)))
- .WillOnce(Return(MOSQ_ERR_SUCCESS));
- EXPECT_CALL(GetMock(), mosquitto_destroy(TEST_HANDLE)).Times(1);
- EXPECT_CALL(GetMock(), mosquitto_lib_cleanup()).WillOnce(Return(MOSQ_ERR_SUCCESS));
-
- try {
- aitt::MQ mq(TEST_CLIENT_ID, true);
- mq.Connect(TEST_HOST, TEST_PORT, "", "");
- void *handle = mq.Subscribe(
- TEST_TOPIC,
- [](aitt::MSG *info, const std::string &topic, const void *msg, const int szmsg,
- const void *cbdata) -> void {},
- nullptr, AITT_QOS_AT_MOST_ONCE);
- mq.Unsubscribe(handle);
- } catch (std::exception &e) {
- FAIL() << "Unexpected exception: " << e.what();
- }
-}
-
-TEST_F(MQMockTest, Positive_Create_Anytime)
-{
- EXPECT_CALL(GetMock(), mosquitto_lib_init()).WillOnce(Return(MOSQ_ERR_SUCCESS));
- EXPECT_CALL(GetMock(), mosquitto_new(testing::StrEq(TEST_CLIENT_ID), true, testing::_))
- .WillOnce(Return(TEST_HANDLE));
- EXPECT_CALL(GetMock(),
- mosquitto_int_option(TEST_HANDLE, MOSQ_OPT_PROTOCOL_VERSION, MQTT_PROTOCOL_V5))
- .Times(1);
- EXPECT_CALL(GetMock(), mosquitto_message_v5_callback_set(TEST_HANDLE, testing::_)).Times(1);
- EXPECT_CALL(GetMock(), mosquitto_destroy(TEST_HANDLE)).Times(1);
- EXPECT_CALL(GetMock(), mosquitto_lib_cleanup()).WillOnce(Return(MOSQ_ERR_SUCCESS));
-
- try {
- aitt::MQ mq(TEST_CLIENT_ID, true);
- } catch (std::exception &e) {
- FAIL() << "Unexpected exception occurred";
- }
-}
-
-TEST_F(MQMockTest, Negative_Connect_will_set_Anytime)
-{
- EXPECT_CALL(GetMock(), mosquitto_lib_init()).WillOnce(Return(MOSQ_ERR_SUCCESS));
- EXPECT_CALL(GetMock(), mosquitto_new(testing::StrEq(TEST_CLIENT_ID), true, testing::_))
- .WillOnce(Return(TEST_HANDLE));
- EXPECT_CALL(GetMock(), mosquitto_message_v5_callback_set(TEST_HANDLE, testing::_)).Times(1);
- EXPECT_CALL(GetMock(), mosquitto_will_set(TEST_HANDLE, testing::StrEq("lastWill"),
- sizeof(TEST_PAYLOAD), TEST_PAYLOAD, AITT_QOS_AT_MOST_ONCE, true))
- .WillOnce(Return(MOSQ_ERR_NOMEM));
- EXPECT_CALL(GetMock(), mosquitto_destroy(TEST_HANDLE)).Times(1);
- EXPECT_CALL(GetMock(), mosquitto_lib_cleanup()).WillOnce(Return(MOSQ_ERR_SUCCESS));
- try {
- aitt::MQ mq(TEST_CLIENT_ID, true);
- mq.SetWillInfo("lastWill", TEST_PAYLOAD, sizeof(TEST_PAYLOAD), AITT_QOS_AT_MOST_ONCE, true);
- mq.Connect(TEST_HOST, TEST_PORT, "", "");
- FAIL() << "Connect() must be failed";
- } catch (std::exception &e) {
- ASSERT_STREQ(e.what(), "MQTT failure");
- }
-}
-
-TEST_F(MQMockTest, Positive_Connect_Anytime)
-{
- EXPECT_CALL(GetMock(), mosquitto_lib_init()).WillOnce(Return(MOSQ_ERR_SUCCESS));
- EXPECT_CALL(GetMock(), mosquitto_new(testing::StrEq(TEST_CLIENT_ID), true, testing::_))
- .WillOnce(Return(TEST_HANDLE));
- EXPECT_CALL(GetMock(), mosquitto_message_v5_callback_set(TEST_HANDLE, testing::_)).Times(1);
- EXPECT_CALL(GetMock(), mosquitto_connect(TEST_HANDLE, testing::StrEq(TEST_HOST), TEST_PORT, 60))
- .WillOnce(Return(MOSQ_ERR_SUCCESS));
- EXPECT_CALL(GetMock(), mosquitto_destroy(TEST_HANDLE)).Times(1);
- EXPECT_CALL(GetMock(), mosquitto_lib_cleanup()).WillOnce(Return(MOSQ_ERR_SUCCESS));
- try {
- aitt::MQ mq(TEST_CLIENT_ID, true);
- mq.Connect(TEST_HOST, TEST_PORT, "", "");
- } catch (std::exception &e) {
- FAIL() << "Unepxected exception: " << e.what();
- }
-}
-
-TEST_F(MQMockTest, Positive_Connect_User_Anytime)
-{
- std::string username = "test";
- std::string password = "test";
- EXPECT_CALL(GetMock(), mosquitto_lib_init()).WillOnce(Return(MOSQ_ERR_SUCCESS));
- EXPECT_CALL(GetMock(), mosquitto_new(testing::StrEq(TEST_CLIENT_ID), true, testing::_))
- .WillOnce(Return(TEST_HANDLE));
- EXPECT_CALL(GetMock(), mosquitto_message_v5_callback_set(TEST_HANDLE, testing::_)).Times(1);
- EXPECT_CALL(GetMock(),
- mosquitto_username_pw_set(TEST_HANDLE, username.c_str(), password.c_str()))
- .Times(1);
- EXPECT_CALL(GetMock(), mosquitto_connect(TEST_HANDLE, testing::StrEq(TEST_HOST), TEST_PORT, 60))
- .WillOnce(Return(MOSQ_ERR_SUCCESS));
- EXPECT_CALL(GetMock(), mosquitto_destroy(TEST_HANDLE)).Times(1);
- EXPECT_CALL(GetMock(), mosquitto_lib_cleanup()).WillOnce(Return(MOSQ_ERR_SUCCESS));
- try {
- aitt::MQ mq(TEST_CLIENT_ID, true);
- mq.Connect(TEST_HOST, TEST_PORT, username, password);
- } catch (std::exception &e) {
- FAIL() << "Unepxected exception: " << e.what();
- }
-}
-
-TEST_F(MQMockTest, Positive_Disconnect_Anytime)
-{
- EXPECT_CALL(GetMock(), mosquitto_lib_init()).WillOnce(Return(MOSQ_ERR_SUCCESS));
- EXPECT_CALL(GetMock(), mosquitto_new(testing::StrEq(TEST_CLIENT_ID), true, testing::_))
- .WillOnce(Return(TEST_HANDLE));
- EXPECT_CALL(GetMock(), mosquitto_message_v5_callback_set(TEST_HANDLE, testing::_)).Times(1);
- EXPECT_CALL(GetMock(), mosquitto_disconnect(testing::_)).WillOnce(Return(MOSQ_ERR_SUCCESS));
- EXPECT_CALL(GetMock(), mosquitto_will_clear(TEST_HANDLE)).WillOnce(Return(MOSQ_ERR_SUCCESS));
- EXPECT_CALL(GetMock(), mosquitto_destroy(TEST_HANDLE)).Times(1);
- EXPECT_CALL(GetMock(), mosquitto_lib_cleanup()).WillOnce(Return(MOSQ_ERR_SUCCESS));
- try {
- aitt::MQ mq(TEST_CLIENT_ID, true);
- mq.Disconnect();
- } catch (std::exception &e) {
- FAIL() << "Unexpected exception: " << e.what();
- }
-}
+++ /dev/null
-/*
- * Copyright (c) 2022 Samsung Electronics Co., Ltd All Rights Reserved
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#include "MQ.h"
-
-#include <gtest/gtest.h>
-
-#include "aitt_internal.h"
-#include "aitt_tests.h"
-
-using MQ = aitt::MQ;
-
-class MQTest : public testing::Test, public AittTests {
- protected:
- void SetUp() override { Init(); }
- void TearDown() override { Deinit(); }
-};
-
-TEST_F(MQTest, Positve_Subscribe_in_Subscribe_Anytime)
-{
- try {
- MQ mq("MQ_TEST_ID");
- mq.Connect(LOCAL_IP, 1883, "", "");
- mq.Subscribe(
- "MQ_TEST_TOPIC1",
- [&](aitt::MSG *handle, const std::string &topic, const void *data,
- const size_t datalen, void *user_data) {
- DBG("Subscribe invoked: %s %zu", static_cast<const char *>(data), datalen);
-
- mq.Subscribe(
- "topic1InCallback",
- [](aitt::MSG *handle, const std::string &topic, const void *msg,
- const size_t szmsg, void *cbdata) {},
- user_data);
-
- mq.Subscribe(
- "topic2InCallback",
- [](aitt::MSG *handle, const std::string &topic, const void *msg,
- const size_t szmsg, void *cbdata) {},
- user_data);
- g_timeout_add(
- 100,
- [](gpointer cbdata) -> gboolean {
- MQTest *test = static_cast<MQTest *>(cbdata);
- test->ToggleReady();
- return G_SOURCE_REMOVE;
- },
- user_data);
- },
- static_cast<void *>(this));
-
- DBG("Publish message to %s (%s)", "MQ_TEST_TOPIC1", TEST_MSG);
- mq.Publish("MQ_TEST_TOPIC1", TEST_MSG, sizeof(TEST_MSG));
-
- g_timeout_add(10, AittTests::ReadyCheck, static_cast<AittTests *>(this));
-
- IterateEventLoop();
-
- ASSERT_TRUE(ready);
- } catch (std::exception &e) {
- FAIL() << "Unexpected exception: " << e.what();
- }
-}
auto module = loader.LoadTransport(handle.get(), LOCAL_IP, discovery);
ASSERT_NE(module, nullptr);
}
+
+TEST_F(ModuleLoaderTest, LoadMqttClient_P_Anytime)
+{
+ ModuleLoader::ModuleHandle handle = loader.OpenModule(ModuleLoader::TYPE_CUSTOM_MQTT);
+ if (handle) {
+ EXPECT_NO_THROW({
+ auto module = loader.LoadMqttClient(handle.get(), "test", false);
+ ASSERT_NE(module, nullptr);
+ });
+ }
+}
+
+TEST_F(ModuleLoaderTest, LoadMqttClient_N_Anytime)
+{
+ EXPECT_THROW(
+ {
+ loader.LoadMqttClient(nullptr, "test", false);
+ FAIL() << "Should not be called";
+ },
+ aitt::AittException);
+}
--- /dev/null
+/*
+ * Copyright (c) 2021-2022 Samsung Electronics Co., Ltd All Rights Reserved
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include <gmock/gmock.h>
+#include <gtest/gtest.h>
+
+#include <condition_variable>
+#include <mutex>
+
+#include "AittTypes.h"
+#include "MQMockTest.h"
+#include "MQTTMock.h"
+#include "MosquittoMQ.h"
+
+using ::testing::Return;
+
+#define TEST_TOPIC "Test/Topic"
+#define TEST_PAYLOAD "The last will is ..."
+#define TEST_CLIENT_ID "testClient"
+#define TEST_PORT 8123
+#define TEST_HOST "localhost"
+#define TEST_HANDLE reinterpret_cast<mosquitto *>(0xbeefbeef)
+
+TEST_F(MQMockTest, Negative_Create_lib_init_Anytime)
+{
+ EXPECT_CALL(GetMock(), mosquitto_lib_init()).WillOnce(Return(MOSQ_ERR_NOT_SUPPORTED));
+ EXPECT_CALL(GetMock(), mosquitto_destroy(nullptr)).WillOnce(Return());
+ EXPECT_CALL(GetMock(), mosquitto_lib_cleanup()).WillOnce(Return(MOSQ_ERR_SUCCESS));
+
+ try {
+ aitt::MosquittoMQ mq(TEST_CLIENT_ID, true);
+ FAIL() << "lib_init must be failed";
+ } catch (std::exception &e) {
+ ASSERT_STREQ(e.what(), "MQTT failure : MosquittoMQ Constructor Error");
+ }
+}
+
+TEST_F(MQMockTest, Negative_Create_new_Anytime)
+{
+ EXPECT_CALL(GetMock(), mosquitto_lib_init()).WillOnce(Return(MOSQ_ERR_SUCCESS));
+ EXPECT_CALL(GetMock(), mosquitto_new(testing::StrEq(TEST_CLIENT_ID), true, testing::_))
+ .WillOnce(Return(nullptr));
+ EXPECT_CALL(GetMock(), mosquitto_destroy(nullptr)).Times(1);
+ EXPECT_CALL(GetMock(), mosquitto_lib_cleanup()).WillOnce(Return(MOSQ_ERR_SUCCESS));
+
+ try {
+ aitt::MosquittoMQ mq(TEST_CLIENT_ID, true);
+ FAIL() << "lib_init must be failed";
+ } catch (std::exception &e) {
+ ASSERT_STREQ(e.what(), "MQTT failure : MosquittoMQ Constructor Error");
+ }
+}
+
+TEST_F(MQMockTest, Positive_Publish_Anytime)
+{
+ EXPECT_CALL(GetMock(), mosquitto_lib_init()).WillOnce(Return(MOSQ_ERR_SUCCESS));
+ EXPECT_CALL(GetMock(), mosquitto_new(testing::StrEq(TEST_CLIENT_ID), true, testing::_))
+ .WillOnce(Return(TEST_HANDLE));
+ EXPECT_CALL(GetMock(),
+ mosquitto_int_option(TEST_HANDLE, MOSQ_OPT_PROTOCOL_VERSION, MQTT_PROTOCOL_V5))
+ .WillOnce(Return(MOSQ_ERR_SUCCESS));
+ EXPECT_CALL(GetMock(), mosquitto_message_v5_callback_set(TEST_HANDLE, testing::_)).Times(1);
+ EXPECT_CALL(GetMock(), mosquitto_loop_start(TEST_HANDLE)).WillOnce(Return(MOSQ_ERR_SUCCESS));
+ EXPECT_CALL(GetMock(), mosquitto_connect(TEST_HANDLE, testing::StrEq(TEST_HOST), TEST_PORT, 60))
+ .WillOnce(Return(MOSQ_ERR_SUCCESS));
+ EXPECT_CALL(GetMock(), mosquitto_publish(TEST_HANDLE, testing::_, testing::StrEq(TEST_TOPIC),
+ sizeof(TEST_PAYLOAD), TEST_PAYLOAD, AITT_QOS_AT_MOST_ONCE, false))
+ .WillOnce(Return(MOSQ_ERR_SUCCESS));
+ EXPECT_CALL(GetMock(), mosquitto_destroy(TEST_HANDLE)).Times(1);
+ EXPECT_CALL(GetMock(), mosquitto_lib_cleanup()).WillOnce(Return(MOSQ_ERR_SUCCESS));
+
+ try {
+ aitt::MosquittoMQ mq(TEST_CLIENT_ID, true);
+ mq.Connect(TEST_HOST, TEST_PORT, "", "");
+ mq.Publish(TEST_TOPIC, TEST_PAYLOAD, sizeof(TEST_PAYLOAD));
+ } catch (std::exception &e) {
+ FAIL() << "Unexpected exception: " << e.what();
+ }
+}
+
+TEST_F(MQMockTest, Positive_Subscribe_Anytime)
+{
+ EXPECT_CALL(GetMock(), mosquitto_lib_init()).WillOnce(Return(MOSQ_ERR_SUCCESS));
+ EXPECT_CALL(GetMock(), mosquitto_new(testing::StrEq(TEST_CLIENT_ID), true, testing::_))
+ .WillOnce(Return(TEST_HANDLE));
+ EXPECT_CALL(GetMock(), mosquitto_message_v5_callback_set(TEST_HANDLE, testing::_)).Times(1);
+ EXPECT_CALL(GetMock(), mosquitto_loop_start(TEST_HANDLE)).WillOnce(Return(MOSQ_ERR_SUCCESS));
+ EXPECT_CALL(GetMock(), mosquitto_connect(TEST_HANDLE, testing::StrEq(TEST_HOST), TEST_PORT, 60))
+ .WillOnce(Return(MOSQ_ERR_SUCCESS));
+ EXPECT_CALL(GetMock(), mosquitto_subscribe(TEST_HANDLE, testing::_, testing::StrEq(TEST_TOPIC),
+ AITT_QOS_AT_MOST_ONCE))
+ .WillOnce(Return(MOSQ_ERR_SUCCESS));
+ EXPECT_CALL(GetMock(), mosquitto_destroy(TEST_HANDLE)).Times(1);
+ EXPECT_CALL(GetMock(), mosquitto_lib_cleanup()).WillOnce(Return(MOSQ_ERR_SUCCESS));
+
+ try {
+ aitt::MosquittoMQ mq(TEST_CLIENT_ID, true);
+ mq.Connect(TEST_HOST, TEST_PORT, "", "");
+ mq.Subscribe(
+ TEST_TOPIC,
+ [](aitt::MSG *info, const std::string &topic, const void *msg, const int szmsg,
+ const void *cbdata) -> void {},
+ nullptr, AITT_QOS_AT_MOST_ONCE);
+ } catch (std::exception &e) {
+ FAIL() << "Unexpected exception: " << e.what();
+ }
+}
+
+TEST_F(MQMockTest, Positive_Unsubscribe_Anytime)
+{
+ EXPECT_CALL(GetMock(), mosquitto_lib_init()).WillOnce(Return(MOSQ_ERR_SUCCESS));
+ EXPECT_CALL(GetMock(), mosquitto_new(testing::StrEq(TEST_CLIENT_ID), true, testing::_))
+ .WillOnce(Return(TEST_HANDLE));
+ EXPECT_CALL(GetMock(), mosquitto_message_v5_callback_set(TEST_HANDLE, testing::_)).Times(1);
+ EXPECT_CALL(GetMock(), mosquitto_loop_start(TEST_HANDLE)).WillOnce(Return(MOSQ_ERR_SUCCESS));
+ EXPECT_CALL(GetMock(), mosquitto_connect(TEST_HANDLE, testing::StrEq(TEST_HOST), TEST_PORT, 60))
+ .WillOnce(Return(MOSQ_ERR_SUCCESS));
+ EXPECT_CALL(GetMock(),
+ mosquitto_subscribe(TEST_HANDLE, testing::_, testing::StrEq(TEST_TOPIC), 0))
+ .WillOnce(Return(MOSQ_ERR_SUCCESS));
+ EXPECT_CALL(GetMock(),
+ mosquitto_unsubscribe(TEST_HANDLE, testing::_, testing::StrEq(TEST_TOPIC)))
+ .WillOnce(Return(MOSQ_ERR_SUCCESS));
+ EXPECT_CALL(GetMock(), mosquitto_destroy(TEST_HANDLE)).Times(1);
+ EXPECT_CALL(GetMock(), mosquitto_lib_cleanup()).WillOnce(Return(MOSQ_ERR_SUCCESS));
+
+ try {
+ aitt::MosquittoMQ mq(TEST_CLIENT_ID, true);
+ mq.Connect(TEST_HOST, TEST_PORT, "", "");
+ void *handle = mq.Subscribe(
+ TEST_TOPIC,
+ [](aitt::MSG *info, const std::string &topic, const void *msg, const int szmsg,
+ const void *cbdata) -> void {},
+ nullptr, AITT_QOS_AT_MOST_ONCE);
+ mq.Unsubscribe(handle);
+ } catch (std::exception &e) {
+ FAIL() << "Unexpected exception: " << e.what();
+ }
+}
+
+TEST_F(MQMockTest, Positive_Create_Anytime)
+{
+ EXPECT_CALL(GetMock(), mosquitto_lib_init()).WillOnce(Return(MOSQ_ERR_SUCCESS));
+ EXPECT_CALL(GetMock(), mosquitto_new(testing::StrEq(TEST_CLIENT_ID), true, testing::_))
+ .WillOnce(Return(TEST_HANDLE));
+ EXPECT_CALL(GetMock(),
+ mosquitto_int_option(TEST_HANDLE, MOSQ_OPT_PROTOCOL_VERSION, MQTT_PROTOCOL_V5))
+ .Times(1);
+ EXPECT_CALL(GetMock(), mosquitto_message_v5_callback_set(TEST_HANDLE, testing::_)).Times(1);
+ EXPECT_CALL(GetMock(), mosquitto_destroy(TEST_HANDLE)).Times(1);
+ EXPECT_CALL(GetMock(), mosquitto_lib_cleanup()).WillOnce(Return(MOSQ_ERR_SUCCESS));
+
+ try {
+ aitt::MosquittoMQ mq(TEST_CLIENT_ID, true);
+ } catch (std::exception &e) {
+ FAIL() << "Unexpected exception occurred";
+ }
+}
+
+TEST_F(MQMockTest, Negative_Connect_will_set_Anytime)
+{
+ EXPECT_CALL(GetMock(), mosquitto_lib_init()).WillOnce(Return(MOSQ_ERR_SUCCESS));
+ EXPECT_CALL(GetMock(), mosquitto_new(testing::StrEq(TEST_CLIENT_ID), true, testing::_))
+ .WillOnce(Return(TEST_HANDLE));
+ EXPECT_CALL(GetMock(), mosquitto_message_v5_callback_set(TEST_HANDLE, testing::_)).Times(1);
+ EXPECT_CALL(GetMock(), mosquitto_will_set(TEST_HANDLE, testing::StrEq("lastWill"),
+ sizeof(TEST_PAYLOAD), TEST_PAYLOAD, AITT_QOS_AT_MOST_ONCE, true))
+ .WillOnce(Return(MOSQ_ERR_NOMEM));
+ EXPECT_CALL(GetMock(), mosquitto_destroy(TEST_HANDLE)).Times(1);
+ EXPECT_CALL(GetMock(), mosquitto_lib_cleanup()).WillOnce(Return(MOSQ_ERR_SUCCESS));
+ try {
+ aitt::MosquittoMQ mq(TEST_CLIENT_ID, true);
+ mq.SetWillInfo("lastWill", TEST_PAYLOAD, sizeof(TEST_PAYLOAD), AITT_QOS_AT_MOST_ONCE, true);
+ mq.Connect(TEST_HOST, TEST_PORT, "", "");
+ FAIL() << "Connect() must be failed";
+ } catch (std::exception &e) {
+ ASSERT_STREQ(e.what(), "MQTT failure");
+ }
+}
+
+TEST_F(MQMockTest, Positive_Connect_Anytime)
+{
+ EXPECT_CALL(GetMock(), mosquitto_lib_init()).WillOnce(Return(MOSQ_ERR_SUCCESS));
+ EXPECT_CALL(GetMock(), mosquitto_new(testing::StrEq(TEST_CLIENT_ID), true, testing::_))
+ .WillOnce(Return(TEST_HANDLE));
+ EXPECT_CALL(GetMock(), mosquitto_message_v5_callback_set(TEST_HANDLE, testing::_)).Times(1);
+ EXPECT_CALL(GetMock(), mosquitto_connect(TEST_HANDLE, testing::StrEq(TEST_HOST), TEST_PORT, 60))
+ .WillOnce(Return(MOSQ_ERR_SUCCESS));
+ EXPECT_CALL(GetMock(), mosquitto_destroy(TEST_HANDLE)).Times(1);
+ EXPECT_CALL(GetMock(), mosquitto_lib_cleanup()).WillOnce(Return(MOSQ_ERR_SUCCESS));
+ try {
+ aitt::MosquittoMQ mq(TEST_CLIENT_ID, true);
+ mq.Connect(TEST_HOST, TEST_PORT, "", "");
+ } catch (std::exception &e) {
+ FAIL() << "Unepxected exception: " << e.what();
+ }
+}
+
+TEST_F(MQMockTest, Positive_Connect_User_Anytime)
+{
+ std::string username = "test";
+ std::string password = "test";
+ EXPECT_CALL(GetMock(), mosquitto_lib_init()).WillOnce(Return(MOSQ_ERR_SUCCESS));
+ EXPECT_CALL(GetMock(), mosquitto_new(testing::StrEq(TEST_CLIENT_ID), true, testing::_))
+ .WillOnce(Return(TEST_HANDLE));
+ EXPECT_CALL(GetMock(), mosquitto_message_v5_callback_set(TEST_HANDLE, testing::_)).Times(1);
+ EXPECT_CALL(GetMock(),
+ mosquitto_username_pw_set(TEST_HANDLE, username.c_str(), password.c_str()))
+ .Times(1);
+ EXPECT_CALL(GetMock(), mosquitto_connect(TEST_HANDLE, testing::StrEq(TEST_HOST), TEST_PORT, 60))
+ .WillOnce(Return(MOSQ_ERR_SUCCESS));
+ EXPECT_CALL(GetMock(), mosquitto_destroy(TEST_HANDLE)).Times(1);
+ EXPECT_CALL(GetMock(), mosquitto_lib_cleanup()).WillOnce(Return(MOSQ_ERR_SUCCESS));
+ try {
+ aitt::MosquittoMQ mq(TEST_CLIENT_ID, true);
+ mq.Connect(TEST_HOST, TEST_PORT, username, password);
+ } catch (std::exception &e) {
+ FAIL() << "Unepxected exception: " << e.what();
+ }
+}
+
+TEST_F(MQMockTest, Positive_Disconnect_Anytime)
+{
+ EXPECT_CALL(GetMock(), mosquitto_lib_init()).WillOnce(Return(MOSQ_ERR_SUCCESS));
+ EXPECT_CALL(GetMock(), mosquitto_new(testing::StrEq(TEST_CLIENT_ID), true, testing::_))
+ .WillOnce(Return(TEST_HANDLE));
+ EXPECT_CALL(GetMock(), mosquitto_message_v5_callback_set(TEST_HANDLE, testing::_)).Times(1);
+ EXPECT_CALL(GetMock(), mosquitto_disconnect(testing::_)).WillOnce(Return(MOSQ_ERR_SUCCESS));
+ EXPECT_CALL(GetMock(), mosquitto_will_clear(TEST_HANDLE)).WillOnce(Return(MOSQ_ERR_SUCCESS));
+ EXPECT_CALL(GetMock(), mosquitto_destroy(TEST_HANDLE)).Times(1);
+ EXPECT_CALL(GetMock(), mosquitto_lib_cleanup()).WillOnce(Return(MOSQ_ERR_SUCCESS));
+ try {
+ aitt::MosquittoMQ mq(TEST_CLIENT_ID, true);
+ mq.Disconnect();
+ } catch (std::exception &e) {
+ FAIL() << "Unexpected exception: " << e.what();
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2022 Samsung Electronics Co., Ltd All Rights Reserved
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "MosquittoMQ.h"
+
+#include <gtest/gtest.h>
+
+#include "aitt_internal.h"
+#include "aitt_tests.h"
+
+using MosquittoMQ = aitt::MosquittoMQ;
+
+class MQTest : public testing::Test, public AittTests {
+ protected:
+ void SetUp() override { Init(); }
+ void TearDown() override { Deinit(); }
+};
+
+TEST_F(MQTest, Positve_Subscribe_in_Subscribe_Anytime)
+{
+ try {
+ MosquittoMQ mq("MQ_TEST_ID");
+ mq.Connect(LOCAL_IP, 1883, "", "");
+ mq.Subscribe(
+ "MQ_TEST_TOPIC1",
+ [&](aitt::MSG *handle, const std::string &topic, const void *data,
+ const size_t datalen, void *user_data) {
+ DBG("Subscribe invoked: %s %zu", static_cast<const char *>(data), datalen);
+
+ mq.Subscribe(
+ "topic1InCallback",
+ [](aitt::MSG *handle, const std::string &topic, const void *msg,
+ const size_t szmsg, void *cbdata) {},
+ user_data);
+
+ mq.Subscribe(
+ "topic2InCallback",
+ [](aitt::MSG *handle, const std::string &topic, const void *msg,
+ const size_t szmsg, void *cbdata) {},
+ user_data);
+ g_timeout_add(
+ 100,
+ [](gpointer cbdata) -> gboolean {
+ MQTest *test = static_cast<MQTest *>(cbdata);
+ test->ToggleReady();
+ return G_SOURCE_REMOVE;
+ },
+ user_data);
+ },
+ static_cast<void *>(this));
+
+ DBG("Publish message to %s (%s)", "MQ_TEST_TOPIC1", TEST_MSG);
+ mq.Publish("MQ_TEST_TOPIC1", TEST_MSG, sizeof(TEST_MSG));
+
+ g_timeout_add(10, AittTests::ReadyCheck, static_cast<AittTests *>(this));
+
+ IterateEventLoop();
+
+ ASSERT_TRUE(ready);
+ } catch (std::exception &e) {
+ FAIL() << "Unexpected exception: " << e.what();
+ }
+}