apply MQ modularization
authorYoungjae Shin <yj99.shin@samsung.com>
Mon, 22 Aug 2022 01:29:53 +0000 (10:29 +0900)
committerYoungjae Shin <yj99.shin@samsung.com>
Thu, 15 Sep 2022 05:29:05 +0000 (14:29 +0900)
- apply MQProxy

21 files changed:
common/AittDiscovery.cc
common/AittDiscovery.h
common/MQ.cc [deleted file]
common/MQ.h [deleted file]
common/MQProxy.cc [new file with mode: 0644]
common/MQProxy.h [new file with mode: 0644]
common/ModuleLoader.cc
common/ModuleLoader.h
common/MosquittoMQ.cc [new file with mode: 0644]
common/MosquittoMQ.h [new file with mode: 0644]
include/MQ.h [new file with mode: 0644]
modules/webrtc/MqttServer.cc
modules/webrtc/MqttServer.h
src/AITTImpl.cc
src/AITTImpl.h
tests/CMakeLists.txt
tests/MQ_mocktest.cc [deleted file]
tests/MQ_test.cc [deleted file]
tests/ModuleLoader_test.cc
tests/MosquittoMQ_mocktest.cc [new file with mode: 0644]
tests/MosquittoMQ_test.cc [new file with mode: 0644]

index 8f751033de3355f9bae44b92908ace9d3d0c4e37..3d808640981dc714b848782c7296860872fc7210 100644 (file)
 #include <atomic>
 
 #include "AittException.h"
+#include "MQProxy.h"
 #include "aitt_internal.h"
 
 namespace aitt {
 
 AittDiscovery::AittDiscovery(const std::string &id, bool custom_broker)
-      : id_(id), discovery_mq(id + "d", true), callback_handle(nullptr)
+      : id_(id), discovery_mq(new MQProxy(id + "d", true, custom_broker)), callback_handle(nullptr)
 {
 }
 
@@ -34,19 +35,19 @@ void AittDiscovery::Start(const std::string &host, int port, const std::string &
 {
     RET_IF(callback_handle);
 
-    discovery_mq.SetWillInfo(DISCOVERY_TOPIC_BASE + id_, nullptr, 0, AITT_QOS_EXACTLY_ONCE, true);
-    discovery_mq.Connect(host, port, username, password);
+    discovery_mq->SetWillInfo(DISCOVERY_TOPIC_BASE + id_, nullptr, 0, AITT_QOS_EXACTLY_ONCE, true);
+    discovery_mq->Connect(host, port, username, password);
 
-    callback_handle = discovery_mq.Subscribe(DISCOVERY_TOPIC_BASE + "+", DiscoveryMessageCallback,
+    callback_handle = discovery_mq->Subscribe(DISCOVERY_TOPIC_BASE + "+", DiscoveryMessageCallback,
           static_cast<void *>(this), AITT_QOS_EXACTLY_ONCE);
 }
 
 void AittDiscovery::Stop()
 {
-    discovery_mq.Publish(DISCOVERY_TOPIC_BASE + id_, nullptr, 0, AITT_QOS_EXACTLY_ONCE, true);
-    discovery_mq.Unsubscribe(callback_handle);
+    discovery_mq->Publish(DISCOVERY_TOPIC_BASE + id_, nullptr, 0, AITT_QOS_EXACTLY_ONCE, true);
+    discovery_mq->Unsubscribe(callback_handle);
     callback_handle = nullptr;
-    discovery_mq.Disconnect();
+    discovery_mq->Disconnect();
 }
 
 void AittDiscovery::UpdateDiscoveryMsg(AittProtocol protocol, const void *msg, size_t length)
@@ -139,7 +140,7 @@ void AittDiscovery::PublishDiscoveryMsg()
     fbb.Finish();
 
     auto buf = fbb.GetBuffer();
-    discovery_mq.Publish(DISCOVERY_TOPIC_BASE + id_, buf.data(), buf.size(), AITT_QOS_EXACTLY_ONCE,
+    discovery_mq->Publish(DISCOVERY_TOPIC_BASE + id_, buf.data(), buf.size(), AITT_QOS_EXACTLY_ONCE,
           true);
 }
 
index fd6ba1df47d36696754ac9cc4249a5e6c4edb75c..4a3797d1e3043f710432498c8c56396dd62e2406 100644 (file)
@@ -56,7 +56,7 @@ class AittDiscovery {
     AittProtocol GetProtocol(const std::string &protocol_str);
 
     std::string id_;
-    MQ discovery_mq;
+    std::unique_ptr<MQ> discovery_mq;
     void *callback_handle;
     std::map<AittProtocol, DiscoveryBlob> discovery_map;
     std::map<int, std::pair<AittProtocol, DiscoveryCallback>> callbacks;
diff --git a/common/MQ.cc b/common/MQ.cc
deleted file mode 100644 (file)
index 951b9f1..0000000
+++ /dev/null
@@ -1,399 +0,0 @@
-/*
- * Copyright (c) 2021-2022 Samsung Electronics Co., Ltd All Rights Reserved
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#include "MQ.h"
-
-#include <mqtt_protocol.h>
-#include <sys/types.h>
-#include <unistd.h>
-
-#include <algorithm>
-#include <cerrno>
-#include <stdexcept>
-#include <thread>
-
-#include "AittException.h"
-#include "AittTypes.h"
-#include "aitt_internal.h"
-
-namespace aitt {
-
-const std::string MQ::REPLY_SEQUENCE_NUM_KEY = "sequenceNum";
-const std::string MQ::REPLY_IS_END_SEQUENCE_KEY = "isEndSequence";
-
-MQ::MQ(const std::string &id, bool clear_session)
-      : handle(nullptr),
-        keep_alive(60),
-        subscribers_iterating(false),
-        subscriber_iterator_updated(false),
-        connect_cb(nullptr)
-{
-    do {
-        int ret = mosquitto_lib_init();
-        if (ret != MOSQ_ERR_SUCCESS) {
-            ERR("mosquitto_lib_init() Fail(%s)", mosquitto_strerror(ret));
-            break;
-        }
-
-        handle = mosquitto_new(id.c_str(), clear_session, this);
-        if (handle == nullptr) {
-            ERR("mosquitto_new(%s, %d) Fail", id.c_str(), clear_session);
-            break;
-        }
-
-        ret = mosquitto_int_option(handle, MOSQ_OPT_PROTOCOL_VERSION, MQTT_PROTOCOL_V5);
-        if (ret != MOSQ_ERR_SUCCESS) {
-            ERR("mosquitto_int_option() Fail(%s)", mosquitto_strerror(ret));
-            break;
-        }
-
-        mosquitto_message_v5_callback_set(handle, MessageCallback);
-        mosquitto_connect_v5_callback_set(handle, ConnectCallback);
-        mosquitto_disconnect_v5_callback_set(handle, DisconnectCallback);
-
-        ret = mosquitto_loop_start(handle);
-        if (ret != MOSQ_ERR_SUCCESS) {
-            ERR("mosquitto_loop_start() Fail(%s)", mosquitto_strerror(ret));
-            break;
-        }
-
-        return;
-    } while (0);
-
-    mosquitto_destroy(handle);
-    mosquitto_lib_cleanup();
-    throw AittException(AittException::MQTT_ERR, std::string("MQ Constructor Error"));
-}
-
-MQ::~MQ(void)
-{
-    int ret;
-    INFO("Destructor");
-
-    ret = mosquitto_loop_stop(handle, true);
-    if (ret != MOSQ_ERR_SUCCESS)
-        ERR("mosquitto_loop_stop() Fail(%s)", mosquitto_strerror(ret));
-
-    callback_lock.lock();
-    connect_cb = nullptr;
-    subscribers.clear();
-    callback_lock.unlock();
-
-    mosquitto_destroy(handle);
-
-    ret = mosquitto_lib_cleanup();
-    if (ret != MOSQ_ERR_SUCCESS)
-        ERR("mosquitto_lib_cleanup() Fail(%s)", mosquitto_strerror(ret));
-}
-
-void MQ::SetConnectionCallback(const MQConnectionCallback &cb)
-{
-    std::lock_guard<std::recursive_mutex> lock_from_here(callback_lock);
-    connect_cb = cb;
-}
-
-void MQ::ConnectCallback(struct mosquitto *mosq, void *obj, int rc, int flag,
-      const mosquitto_property *props)
-{
-    RET_IF(obj == nullptr);
-    MQ *mq = static_cast<MQ *>(obj);
-
-    INFO("Connected : rc(%d), flag(%d)", rc, flag);
-
-    std::lock_guard<std::recursive_mutex> lock_from_here(mq->callback_lock);
-    if (mq->connect_cb)
-        mq->connect_cb(AITT_CONNECTED);
-}
-
-void MQ::DisconnectCallback(struct mosquitto *mosq, void *obj, int rc,
-      const mosquitto_property *props)
-{
-    RET_IF(obj == nullptr);
-    MQ *mq = static_cast<MQ *>(obj);
-
-    INFO("Disconnected : rc(%d)", rc);
-
-    std::lock_guard<std::recursive_mutex> lock_from_here(mq->callback_lock);
-    if (mq->connect_cb)
-        mq->connect_cb(AITT_DISCONNECTED);
-}
-
-void MQ::Connect(const std::string &host, int port, const std::string &username,
-      const std::string &password)
-{
-    int ret;
-
-    if (username.empty() == false) {
-        ret = mosquitto_username_pw_set(handle, username.c_str(), password.c_str());
-        if (ret != MOSQ_ERR_SUCCESS) {
-            ERR("mosquitto_username_pw_set(%s, %s) Fail(%s)", username.c_str(), password.c_str(),
-                  mosquitto_strerror(ret));
-            throw AittException(AittException::MQTT_ERR);
-        }
-    }
-
-    ret = mosquitto_connect(handle, host.c_str(), port, keep_alive);
-    if (ret != MOSQ_ERR_SUCCESS) {
-        ERR("mosquitto_connect(%s, %d) Fail(%s)", host.c_str(), port, mosquitto_strerror(ret));
-        throw AittException(AittException::MQTT_ERR);
-    }
-}
-
-void MQ::SetWillInfo(const std::string &topic, const void *msg, size_t szmsg, int qos, bool retain)
-{
-    int ret = mosquitto_will_set(handle, topic.c_str(), szmsg, msg, qos, retain);
-    if (ret != MOSQ_ERR_SUCCESS) {
-        ERR("mosquitto_will_set(%s) Fail(%s)", topic.c_str(), mosquitto_strerror(ret));
-        throw AittException(AittException::MQTT_ERR);
-    }
-}
-
-void MQ::Disconnect(void)
-{
-    int ret = mosquitto_disconnect(handle);
-    if (ret != MOSQ_ERR_SUCCESS) {
-        ERR("mosquitto_disconnect() Fail(%s)", mosquitto_strerror(ret));
-        throw AittException(AittException::MQTT_ERR);
-    }
-
-    mosquitto_will_clear(handle);
-}
-
-void MQ::MessageCallback(mosquitto *handle, void *obj, const mosquitto_message *msg,
-      const mosquitto_property *props)
-{
-    RET_IF(obj == nullptr);
-    MQ *mq = static_cast<MQ *>(obj);
-
-    std::lock_guard<std::recursive_mutex> auto_lock(mq->callback_lock);
-    mq->subscribers_iterating = true;
-    mq->subscriber_iterator = mq->subscribers.begin();
-    while (mq->subscriber_iterator != mq->subscribers.end()) {
-        auto subscribe_data = *(mq->subscriber_iterator);
-        if (nullptr == subscribe_data)
-            ERR("end() is not valid because elements were added.");
-
-        bool result = CompareTopic(subscribe_data->topic.c_str(), msg->topic);
-        if (result)
-            mq->InvokeCallback(msg, props);
-
-        if (!mq->subscriber_iterator_updated)
-            mq->subscriber_iterator++;
-        else
-            mq->subscriber_iterator_updated = false;
-    }
-    mq->subscribers_iterating = false;
-    mq->subscribers.insert(mq->subscribers.end(), mq->new_subscribers.begin(),
-          mq->new_subscribers.end());
-    mq->new_subscribers.clear();
-}
-
-void MQ::InvokeCallback(const mosquitto_message *msg, const mosquitto_property *props)
-{
-    MSG mq_msg;
-    mq_msg.SetTopic(msg->topic);
-    if (props) {
-        const mosquitto_property *prop;
-
-        char *response_topic = nullptr;
-        prop = mosquitto_property_read_string(props, MQTT_PROP_RESPONSE_TOPIC, &response_topic,
-              false);
-        if (prop) {
-            mq_msg.SetResponseTopic(response_topic);
-            free(response_topic);
-        }
-
-        void *correlation = nullptr;
-        uint16_t correlation_size = 0;
-        prop = mosquitto_property_read_binary(props, MQTT_PROP_CORRELATION_DATA, &correlation,
-              &correlation_size, false);
-        if (prop == nullptr || correlation == nullptr)
-            ERR("No Correlation Data");
-
-        mq_msg.SetCorrelation(std::string((char *)correlation, correlation_size));
-        if (correlation)
-            free(correlation);
-
-        char *name = nullptr;
-        char *value = nullptr;
-        prop = mosquitto_property_read_string_pair(props, MQTT_PROP_USER_PROPERTY, &name, &value,
-              false);
-        while (prop) {
-            if (REPLY_SEQUENCE_NUM_KEY == name) {
-                mq_msg.SetSequence(std::stoi(value));
-            } else if (REPLY_IS_END_SEQUENCE_KEY == name) {
-                mq_msg.SetEndSequence(std::stoi(value) == 1);
-            } else {
-                ERR("Unsupported property(%s, %s)", name, value);
-            }
-            free(name);
-            free(value);
-
-            prop = mosquitto_property_read_string_pair(prop, MQTT_PROP_USER_PROPERTY, &name, &value,
-                  true);
-        }
-    }
-
-    SubscribeData *cb_info = *subscriber_iterator;
-    cb_info->cb(&mq_msg, msg->topic, msg->payload, msg->payloadlen, cb_info->user_data);
-}
-
-void MQ::Publish(const std::string &topic, const void *data, const size_t datalen, int qos,
-      bool retain)
-{
-    int mid = -1;
-    int ret = mosquitto_publish(handle, &mid, topic.c_str(), datalen, data, qos, retain);
-    if (ret != MOSQ_ERR_SUCCESS) {
-        ERR("mosquitto_publish(%s) Fail(%s)", topic.c_str(), mosquitto_strerror(ret));
-        throw AittException(AittException::MQTT_ERR);
-    }
-}
-
-void MQ::PublishWithReply(const std::string &topic, const void *data, const size_t datalen, int qos,
-      bool retain, const std::string &reply_topic, const std::string &correlation)
-{
-    int ret;
-    int mid = -1;
-    mosquitto_property *props = nullptr;
-
-    ret = mosquitto_property_add_string(&props, MQTT_PROP_RESPONSE_TOPIC, reply_topic.c_str());
-    if (ret != MOSQ_ERR_SUCCESS) {
-        ERR("mosquitto_property_add_string(response-topic) Fail(%s)", mosquitto_strerror(ret));
-        throw AittException(AittException::MQTT_ERR);
-    }
-
-    ret = mosquitto_property_add_binary(&props, MQTT_PROP_CORRELATION_DATA, correlation.c_str(),
-          correlation.size());
-    if (ret != MOSQ_ERR_SUCCESS) {
-        ERR("mosquitto_property_add_binary(correlation) Fail(%s)", mosquitto_strerror(ret));
-        throw AittException(AittException::MQTT_ERR);
-    }
-    ret = mosquitto_publish_v5(handle, &mid, topic.c_str(), datalen, data, qos, retain, props);
-    if (ret != MOSQ_ERR_SUCCESS) {
-        ERR("mosquitto_publish_v5(%s) Fail(%s)", topic.c_str(), mosquitto_strerror(ret));
-        throw AittException(AittException::MQTT_ERR);
-    }
-}
-
-void MQ::SendReply(MSG *msg, const void *data, const size_t datalen, int qos, bool retain)
-{
-    RET_IF(msg == nullptr);
-
-    int ret;
-    int mId = -1;
-    mosquitto_property *props = nullptr;
-
-    ret = mosquitto_property_add_binary(&props, MQTT_PROP_CORRELATION_DATA,
-          msg->GetCorrelation().c_str(), msg->GetCorrelation().size());
-    if (ret != MOSQ_ERR_SUCCESS) {
-        ERR("mosquitto_property_add_binary(correlation) Fail(%s)", mosquitto_strerror(ret));
-        throw AittException(AittException::MQTT_ERR);
-    }
-
-    ret = mosquitto_property_add_string_pair(&props, MQTT_PROP_USER_PROPERTY,
-          REPLY_SEQUENCE_NUM_KEY.c_str(), std::to_string(msg->GetSequence()).c_str());
-    if (ret != MOSQ_ERR_SUCCESS) {
-        ERR("mosquitto_property_add_string_pair(squenceNum) Fail(%s)", mosquitto_strerror(ret));
-        throw AittException(AittException::MQTT_ERR);
-    }
-
-    ret = mosquitto_property_add_string_pair(&props, MQTT_PROP_USER_PROPERTY,
-          REPLY_IS_END_SEQUENCE_KEY.c_str(), std::to_string(msg->IsEndSequence()).c_str());
-    if (ret != MOSQ_ERR_SUCCESS) {
-        ERR("mosquitto_property_add_string_pair(IsEndSequence) Fail(%s)", mosquitto_strerror(ret));
-        throw AittException(AittException::MQTT_ERR);
-    }
-
-    ret = mosquitto_publish_v5(handle, &mId, msg->GetResponseTopic().c_str(), datalen, data, qos,
-          retain, props);
-    if (ret != MOSQ_ERR_SUCCESS) {
-        ERR("mosquitto_publish_v5(%s) Fail(%s)", msg->GetResponseTopic().c_str(),
-              mosquitto_strerror(ret));
-        throw AittException(AittException::MQTT_ERR);
-    }
-}
-
-void *MQ::Subscribe(const std::string &topic, const SubscribeCallback &cb, void *user_data, int qos)
-{
-    int mid = -1;
-    int ret = mosquitto_subscribe(handle, &mid, topic.c_str(), qos);
-    if (ret != MOSQ_ERR_SUCCESS) {
-        ERR("mosquitto_subscribe(%s) Fail(%s)", topic.c_str(), mosquitto_strerror(ret));
-        throw AittException(AittException::MQTT_ERR);
-    }
-
-    std::lock_guard<std::recursive_mutex> lock_from_here(callback_lock);
-    SubscribeData *data = new SubscribeData(topic, cb, user_data);
-    if (subscribers_iterating)
-        new_subscribers.push_back(data);
-    else
-        subscribers.push_back(data);
-
-    return static_cast<void *>(data);
-}
-
-void *MQ::Unsubscribe(void *sub_handle)
-{
-    std::lock_guard<std::recursive_mutex> auto_lock(callback_lock);
-    auto it = std::find(subscribers.begin(), subscribers.end(),
-          static_cast<SubscribeData *>(sub_handle));
-
-    if (it == subscribers.end()) {
-        ERR("No Subscription(%p)", sub_handle);
-        throw AittException(AittException::NO_DATA_ERR);
-    }
-
-    SubscribeData *data = static_cast<SubscribeData *>(sub_handle);
-
-    if (subscriber_iterator == it) {
-        subscriber_iterator = subscribers.erase(it);
-        subscriber_iterator_updated = true;
-    } else {
-        subscribers.erase(it);
-    }
-
-    void *user_data = data->user_data;
-    std::string topic = data->topic;
-    delete data;
-
-    int mid = -1;
-    int ret = mosquitto_unsubscribe(handle, &mid, topic.c_str());
-    if (ret != MOSQ_ERR_SUCCESS) {
-        ERR("mosquitto_unsubscribe(%s) Fail(%d)", topic.c_str(), ret);
-        throw AittException(AittException::MQTT_ERR);
-    }
-
-    return user_data;
-}
-
-bool MQ::CompareTopic(const std::string &left, const std::string &right)
-{
-    bool result = false;
-    int ret = mosquitto_topic_matches_sub(left.c_str(), right.c_str(), &result);
-    if (ret != MOSQ_ERR_SUCCESS) {
-        ERR("mosquitto_topic_matches_sub(%s, %s) Fail(%s)", left.c_str(), right.c_str(),
-              mosquitto_strerror(ret));
-        throw AittException(AittException::MQTT_ERR);
-    }
-    return result;
-}
-
-MQ::SubscribeData::SubscribeData(const std::string &in_topic, const SubscribeCallback &in_cb,
-      void *in_user_data)
-      : topic(in_topic), cb(in_cb), user_data(in_user_data)
-{
-}
-
-}  // namespace aitt
diff --git a/common/MQ.h b/common/MQ.h
deleted file mode 100644 (file)
index bdcf939..0000000
+++ /dev/null
@@ -1,87 +0,0 @@
-/*
- * Copyright (c) 2021-2022 Samsung Electronics Co., Ltd All Rights Reserved
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#pragma once
-
-#include <mosquitto.h>
-
-#include <functional>
-#include <mutex>
-#include <string>
-#include <vector>
-
-#include "MSG.h"
-
-#define MQTT_LOCALHOST "127.0.0.1"
-#define MQTT_PORT 1883
-
-namespace aitt {
-
-class MQ {
-  public:
-    using SubscribeCallback = std::function<void(MSG *msg, const std::string &topic,
-          const void *data, const size_t datalen, void *user_data)>;
-    using MQConnectionCallback = std::function<void(int)>;
-
-    explicit MQ(const std::string &id, bool clear_session = false);
-    virtual ~MQ(void);
-
-    static bool CompareTopic(const std::string &left, const std::string &right);
-
-    void SetConnectionCallback(const MQConnectionCallback &cb);
-    void Connect(const std::string &host, int port, const std::string &username,
-          const std::string &password);
-    void SetWillInfo(const std::string &topic, const void *msg, size_t szmsg, int qos, bool retain);
-    void Disconnect(void);
-    void Publish(const std::string &topic, const void *data, const size_t datalen, int qos = 0,
-          bool retain = false);
-    void PublishWithReply(const std::string &topic, const void *data, const size_t datalen, int qos,
-          bool retain, const std::string &reply_topic, const std::string &correlation);
-    void SendReply(MSG *msg, const void *data, const size_t datalen, int qos, bool retain);
-    void *Subscribe(const std::string &topic, const SubscribeCallback &cb,
-          void *user_data = nullptr, int qos = 0);
-    void *Unsubscribe(void *handle);
-
-  private:
-    struct SubscribeData {
-        SubscribeData(const std::string &topic, const SubscribeCallback &cb, void *user_data);
-        std::string topic;
-        SubscribeCallback cb;
-        void *user_data;
-    };
-
-    static void ConnectCallback(mosquitto *mosq, void *obj, int rc, int flag,
-          const mosquitto_property *props);
-    static void DisconnectCallback(mosquitto *mosq, void *obj, int rc,
-          const mosquitto_property *props);
-    static void MessageCallback(mosquitto *, void *, const mosquitto_message *,
-          const mosquitto_property *);
-    void InvokeCallback(const mosquitto_message *msg, const mosquitto_property *props);
-
-    static const std::string REPLY_SEQUENCE_NUM_KEY;
-    static const std::string REPLY_IS_END_SEQUENCE_KEY;
-
-    mosquitto *handle;
-    const int keep_alive;
-    std::vector<SubscribeData *> subscribers;
-    bool subscribers_iterating;
-    std::vector<SubscribeData *> new_subscribers;
-    std::vector<SubscribeData *>::iterator subscriber_iterator;
-    bool subscriber_iterator_updated;
-    std::recursive_mutex callback_lock;
-    MQConnectionCallback connect_cb;
-};
-
-}  // namespace aitt
diff --git a/common/MQProxy.cc b/common/MQProxy.cc
new file mode 100644 (file)
index 0000000..0c259de
--- /dev/null
@@ -0,0 +1,86 @@
+/*
+ * Copyright (c) 2022 Samsung Electronics Co., Ltd All Rights Reserved
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "MQProxy.h"
+
+#include "ModuleLoader.h"
+#include "MosquittoMQ.h"
+
+namespace aitt {
+
+MQProxy::MQProxy(const std::string &id, bool clear_session, bool is_custom_broker)
+      : handle(nullptr, nullptr)
+{
+    if (is_custom_broker) {
+        ModuleLoader loader;
+        handle = loader.OpenModule(ModuleLoader::TYPE_CUSTOM_MQTT);
+
+        mq = loader.LoadMqttClient(handle.get(), "test", true);
+    } else {
+        mq = std::shared_ptr<MQ>(new MosquittoMQ(id, clear_session));
+    }
+}
+
+void MQProxy::SetConnectionCallback(const MQConnectionCallback &cb)
+{
+    mq->SetConnectionCallback(cb);
+}
+
+void MQProxy::Connect(const std::string &host, int port, const std::string &username,
+      const std::string &password)
+{
+    mq->Connect(host, port, username, password);
+}
+
+void MQProxy::SetWillInfo(const std::string &topic, const void *msg, size_t szmsg, int qos,
+      bool retain)
+{
+    mq->SetWillInfo(topic, msg, szmsg, qos, retain);
+}
+
+void MQProxy::Disconnect(void)
+{
+    mq->Disconnect();
+}
+
+void MQProxy::Publish(const std::string &topic, const void *data, const size_t datalen, int qos,
+      bool retain)
+{
+    mq->Publish(topic, data, datalen, qos, retain);
+}
+
+void MQProxy::PublishWithReply(const std::string &topic, const void *data, const size_t datalen,
+      int qos, bool retain, const std::string &reply_topic, const std::string &correlation)
+{
+    mq->PublishWithReply(topic, data, datalen, qos, retain, reply_topic, correlation);
+}
+
+void MQProxy::SendReply(MSG *msg, const void *data, const size_t datalen, int qos, bool retain)
+{
+    mq->SendReply(msg, data, datalen, qos, retain);
+}
+
+void *MQProxy::Subscribe(const std::string &topic, const SubscribeCallback &cb, void *user_data,
+      int qos)
+{
+    return mq->Subscribe(topic, cb, user_data, qos);
+}
+
+void *MQProxy::Unsubscribe(void *handle)
+{
+    return mq->Unsubscribe(handle);
+}
+
+}  // namespace aitt
diff --git a/common/MQProxy.h b/common/MQProxy.h
new file mode 100644 (file)
index 0000000..68a30f2
--- /dev/null
@@ -0,0 +1,48 @@
+/*
+ * Copyright (c) 2022 Samsung Electronics Co., Ltd All Rights Reserved
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#pragma once
+
+#include <memory>
+
+#include "MQ.h"
+
+namespace aitt {
+
+class MQProxy : public MQ {
+  public:
+    explicit MQProxy(const std::string &id, bool clear_session, bool custom_broker);
+    virtual ~MQProxy() = default;
+
+    void SetConnectionCallback(const MQConnectionCallback &cb);
+    void Connect(const std::string &host, int port, const std::string &username,
+          const std::string &password);
+    void SetWillInfo(const std::string &topic, const void *msg, size_t szmsg, int qos, bool retain);
+    void Disconnect(void);
+    void Publish(const std::string &topic, const void *data, const size_t datalen, int qos = 0,
+          bool retain = false);
+    void PublishWithReply(const std::string &topic, const void *data, const size_t datalen, int qos,
+          bool retain, const std::string &reply_topic, const std::string &correlation);
+    void SendReply(MSG *msg, const void *data, const size_t datalen, int qos, bool retain);
+    void *Subscribe(const std::string &topic, const SubscribeCallback &cb,
+          void *user_data = nullptr, int qos = 0);
+    void *Unsubscribe(void *handle);
+
+  private:
+    std::unique_ptr<void, void (*)(const void *)> handle;
+    std::shared_ptr<MQ> mq;
+};
+
+}  // namespace aitt
index c62229debe132ffa84a4e6e05491519a0c3c0bf5..51cdc78506c5675db5851f83fc2366f08ee9c19a 100644 (file)
@@ -31,6 +31,8 @@ std::string ModuleLoader::GetModuleFilename(Type type)
         return "libaitt-transport-tcp.so";
     if (type == TYPE_WEBRTC)
         return "libaitt-transport-webrtc.so";
+    if (type == TYPE_CUSTOM_MQTT)
+        return "libaitt-st-broker.so";
 
     return std::string("Unknown");
 }
@@ -76,4 +78,24 @@ std::shared_ptr<AittTransport> ModuleLoader::LoadTransport(void *handle, const s
     return instance;
 }
 
+std::shared_ptr<MQ> ModuleLoader::LoadMqttClient(void *handle, const std::string &id,
+      bool clear_session)
+{
+    MQ::ModuleEntry get_instance_fn =
+          reinterpret_cast<MQ::ModuleEntry>(dlsym(handle, MQ::MODULE_ENTRY_NAME));
+    if (get_instance_fn == nullptr) {
+        ERR("dlsym: %s", dlerror());
+        throw AittException(AittException::SYSTEM_ERR);
+    }
+
+    std::shared_ptr<MQ> instance(static_cast<MQ *>(get_instance_fn(id.c_str(), clear_session)),
+          [](const MQ *instance) { delete instance; });
+    if (instance == nullptr) {
+        ERR("Failed to create a new instance");
+        throw AittException(AittException::SYSTEM_ERR);
+    }
+
+    return instance;
+}
+
 }  // namespace aitt
index b1ecfb38be5439f61827a15dd50f256f98ba2d11..66e339d09de49cee7a7e76f603780809e2d4ad7c 100644 (file)
@@ -32,6 +32,7 @@ class ModuleLoader {
         TYPE_WEBRTC,
         TYPE_RTSP,
         TYPE_TRANSPORT_MAX,
+        TYPE_CUSTOM_MQTT,
     };
 
     using ModuleHandle = std::unique_ptr<void, void (*)(const void *)>;
@@ -42,6 +43,7 @@ class ModuleLoader {
     ModuleHandle OpenModule(Type type);
     std::shared_ptr<AittTransport> LoadTransport(void *handle, const std::string &ip,
           AittDiscovery &discovery);
+    std::shared_ptr<MQ> LoadMqttClient(void *handle, const std::string &id, bool clear_session);
 
   private:
     std::string GetModuleFilename(Type type);
diff --git a/common/MosquittoMQ.cc b/common/MosquittoMQ.cc
new file mode 100644 (file)
index 0000000..f10532e
--- /dev/null
@@ -0,0 +1,390 @@
+/*
+ * Copyright (c) 2021-2022 Samsung Electronics Co., Ltd All Rights Reserved
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "MosquittoMQ.h"
+
+#include <mqtt_protocol.h>
+#include <sys/types.h>
+#include <unistd.h>
+
+#include <algorithm>
+#include <cerrno>
+#include <stdexcept>
+#include <thread>
+
+#include "AittException.h"
+#include "AittTypes.h"
+#include "AittUtil.h"
+#include "aitt_internal.h"
+
+namespace aitt {
+
+const std::string MosquittoMQ::REPLY_SEQUENCE_NUM_KEY = "sequenceNum";
+const std::string MosquittoMQ::REPLY_IS_END_SEQUENCE_KEY = "isEndSequence";
+
+MosquittoMQ::MosquittoMQ(const std::string &id, bool clear_session)
+      : handle(nullptr),
+        keep_alive(60),
+        subscribers_iterating(false),
+        subscriber_iterator_updated(false),
+        connect_cb(nullptr)
+{
+    do {
+        int ret = mosquitto_lib_init();
+        if (ret != MOSQ_ERR_SUCCESS) {
+            ERR("mosquitto_lib_init() Fail(%s)", mosquitto_strerror(ret));
+            break;
+        }
+
+        handle = mosquitto_new(id.c_str(), clear_session, this);
+        if (handle == nullptr) {
+            ERR("mosquitto_new(%s, %d) Fail", id.c_str(), clear_session);
+            break;
+        }
+
+        ret = mosquitto_int_option(handle, MOSQ_OPT_PROTOCOL_VERSION, MQTT_PROTOCOL_V5);
+        if (ret != MOSQ_ERR_SUCCESS) {
+            ERR("mosquitto_int_option() Fail(%s)", mosquitto_strerror(ret));
+            break;
+        }
+
+        mosquitto_message_v5_callback_set(handle, MessageCallback);
+        mosquitto_connect_v5_callback_set(handle, ConnectCallback);
+        mosquitto_disconnect_v5_callback_set(handle, DisconnectCallback);
+
+        ret = mosquitto_loop_start(handle);
+        if (ret != MOSQ_ERR_SUCCESS) {
+            ERR("mosquitto_loop_start() Fail(%s)", mosquitto_strerror(ret));
+            break;
+        }
+
+        return;
+    } while (0);
+
+    mosquitto_destroy(handle);
+    mosquitto_lib_cleanup();
+    throw AittException(AittException::MQTT_ERR, std::string("MosquittoMQ Constructor Error"));
+}
+
+MosquittoMQ::~MosquittoMQ(void)
+{
+    int ret;
+    INFO("Destructor");
+
+    ret = mosquitto_loop_stop(handle, true);
+    if (ret != MOSQ_ERR_SUCCESS)
+        ERR("mosquitto_loop_stop() Fail(%s)", mosquitto_strerror(ret));
+
+    callback_lock.lock();
+    connect_cb = nullptr;
+    subscribers.clear();
+    callback_lock.unlock();
+
+    mosquitto_destroy(handle);
+
+    ret = mosquitto_lib_cleanup();
+    if (ret != MOSQ_ERR_SUCCESS)
+        ERR("mosquitto_lib_cleanup() Fail(%s)", mosquitto_strerror(ret));
+}
+
+void MosquittoMQ::SetConnectionCallback(const MQConnectionCallback &cb)
+{
+    std::lock_guard<std::recursive_mutex> lock_from_here(callback_lock);
+    connect_cb = cb;
+}
+
+void MosquittoMQ::ConnectCallback(struct mosquitto *mosq, void *obj, int rc, int flag,
+      const mosquitto_property *props)
+{
+    RET_IF(obj == nullptr);
+    MosquittoMQ *mq = static_cast<MosquittoMQ *>(obj);
+
+    INFO("Connected : rc(%d), flag(%d)", rc, flag);
+
+    std::lock_guard<std::recursive_mutex> lock_from_here(mq->callback_lock);
+    if (mq->connect_cb)
+        mq->connect_cb(AITT_CONNECTED);
+}
+
+void MosquittoMQ::DisconnectCallback(struct mosquitto *mosq, void *obj, int rc,
+      const mosquitto_property *props)
+{
+    RET_IF(obj == nullptr);
+    MosquittoMQ *mq = static_cast<MosquittoMQ *>(obj);
+
+    INFO("Disconnected : rc(%d)", rc);
+
+    std::lock_guard<std::recursive_mutex> lock_from_here(mq->callback_lock);
+    if (mq->connect_cb)
+        mq->connect_cb(AITT_DISCONNECTED);
+}
+
+void MosquittoMQ::Connect(const std::string &host, int port, const std::string &username,
+      const std::string &password)
+{
+    int ret;
+
+    if (username.empty() == false) {
+        ret = mosquitto_username_pw_set(handle, username.c_str(), password.c_str());
+        if (ret != MOSQ_ERR_SUCCESS) {
+            ERR("mosquitto_username_pw_set(%s, %s) Fail(%s)", username.c_str(), password.c_str(),
+                  mosquitto_strerror(ret));
+            throw AittException(AittException::MQTT_ERR);
+        }
+    }
+
+    ret = mosquitto_connect(handle, host.c_str(), port, keep_alive);
+    if (ret != MOSQ_ERR_SUCCESS) {
+        ERR("mosquitto_connect(%s, %d) Fail(%s)", host.c_str(), port, mosquitto_strerror(ret));
+        throw AittException(AittException::MQTT_ERR);
+    }
+}
+
+void MosquittoMQ::SetWillInfo(const std::string &topic, const void *msg, size_t szmsg, int qos,
+      bool retain)
+{
+    int ret = mosquitto_will_set(handle, topic.c_str(), szmsg, msg, qos, retain);
+    if (ret != MOSQ_ERR_SUCCESS) {
+        ERR("mosquitto_will_set(%s) Fail(%s)", topic.c_str(), mosquitto_strerror(ret));
+        throw AittException(AittException::MQTT_ERR);
+    }
+}
+
+void MosquittoMQ::Disconnect(void)
+{
+    int ret = mosquitto_disconnect(handle);
+    if (ret != MOSQ_ERR_SUCCESS) {
+        ERR("mosquitto_disconnect() Fail(%s)", mosquitto_strerror(ret));
+        throw AittException(AittException::MQTT_ERR);
+    }
+
+    mosquitto_will_clear(handle);
+}
+
+void MosquittoMQ::MessageCallback(mosquitto *handle, void *obj, const mosquitto_message *msg,
+      const mosquitto_property *props)
+{
+    RET_IF(obj == nullptr);
+    MosquittoMQ *mq = static_cast<MosquittoMQ *>(obj);
+
+    std::lock_guard<std::recursive_mutex> auto_lock(mq->callback_lock);
+    mq->subscribers_iterating = true;
+    mq->subscriber_iterator = mq->subscribers.begin();
+    while (mq->subscriber_iterator != mq->subscribers.end()) {
+        auto subscribe_data = *(mq->subscriber_iterator);
+        if (nullptr == subscribe_data)
+            ERR("end() is not valid because elements were added.");
+
+        bool result = AittUtil::CompareTopic(subscribe_data->topic.c_str(), msg->topic);
+        if (result)
+            mq->InvokeCallback(msg, props);
+
+        if (!mq->subscriber_iterator_updated)
+            mq->subscriber_iterator++;
+        else
+            mq->subscriber_iterator_updated = false;
+    }
+    mq->subscribers_iterating = false;
+    mq->subscribers.insert(mq->subscribers.end(), mq->new_subscribers.begin(),
+          mq->new_subscribers.end());
+    mq->new_subscribers.clear();
+}
+
+void MosquittoMQ::InvokeCallback(const mosquitto_message *msg, const mosquitto_property *props)
+{
+    MSG mq_msg;
+    mq_msg.SetTopic(msg->topic);
+    if (props) {
+        const mosquitto_property *prop;
+
+        char *response_topic = nullptr;
+        prop = mosquitto_property_read_string(props, MQTT_PROP_RESPONSE_TOPIC, &response_topic,
+              false);
+        if (prop) {
+            mq_msg.SetResponseTopic(response_topic);
+            free(response_topic);
+        }
+
+        void *correlation = nullptr;
+        uint16_t correlation_size = 0;
+        prop = mosquitto_property_read_binary(props, MQTT_PROP_CORRELATION_DATA, &correlation,
+              &correlation_size, false);
+        if (prop == nullptr || correlation == nullptr)
+            ERR("No Correlation Data");
+
+        mq_msg.SetCorrelation(std::string((char *)correlation, correlation_size));
+        if (correlation)
+            free(correlation);
+
+        char *name = nullptr;
+        char *value = nullptr;
+        prop = mosquitto_property_read_string_pair(props, MQTT_PROP_USER_PROPERTY, &name, &value,
+              false);
+        while (prop) {
+            if (REPLY_SEQUENCE_NUM_KEY == name) {
+                mq_msg.SetSequence(std::stoi(value));
+            } else if (REPLY_IS_END_SEQUENCE_KEY == name) {
+                mq_msg.SetEndSequence(std::stoi(value) == 1);
+            } else {
+                ERR("Unsupported property(%s, %s)", name, value);
+            }
+            free(name);
+            free(value);
+
+            prop = mosquitto_property_read_string_pair(prop, MQTT_PROP_USER_PROPERTY, &name, &value,
+                  true);
+        }
+    }
+
+    SubscribeData *cb_info = *subscriber_iterator;
+    cb_info->cb(&mq_msg, msg->topic, msg->payload, msg->payloadlen, cb_info->user_data);
+}
+
+void MosquittoMQ::Publish(const std::string &topic, const void *data, const size_t datalen, int qos,
+      bool retain)
+{
+    int mid = -1;
+    int ret = mosquitto_publish(handle, &mid, topic.c_str(), datalen, data, qos, retain);
+    if (ret != MOSQ_ERR_SUCCESS) {
+        ERR("mosquitto_publish(%s) Fail(%s)", topic.c_str(), mosquitto_strerror(ret));
+        throw AittException(AittException::MQTT_ERR);
+    }
+}
+
+void MosquittoMQ::PublishWithReply(const std::string &topic, const void *data, const size_t datalen,
+      int qos, bool retain, const std::string &reply_topic, const std::string &correlation)
+{
+    int ret;
+    int mid = -1;
+    mosquitto_property *props = nullptr;
+
+    ret = mosquitto_property_add_string(&props, MQTT_PROP_RESPONSE_TOPIC, reply_topic.c_str());
+    if (ret != MOSQ_ERR_SUCCESS) {
+        ERR("mosquitto_property_add_string(response-topic) Fail(%s)", mosquitto_strerror(ret));
+        throw AittException(AittException::MQTT_ERR);
+    }
+
+    ret = mosquitto_property_add_binary(&props, MQTT_PROP_CORRELATION_DATA, correlation.c_str(),
+          correlation.size());
+    if (ret != MOSQ_ERR_SUCCESS) {
+        ERR("mosquitto_property_add_binary(correlation) Fail(%s)", mosquitto_strerror(ret));
+        throw AittException(AittException::MQTT_ERR);
+    }
+    ret = mosquitto_publish_v5(handle, &mid, topic.c_str(), datalen, data, qos, retain, props);
+    if (ret != MOSQ_ERR_SUCCESS) {
+        ERR("mosquitto_publish_v5(%s) Fail(%s)", topic.c_str(), mosquitto_strerror(ret));
+        throw AittException(AittException::MQTT_ERR);
+    }
+}
+
+void MosquittoMQ::SendReply(MSG *msg, const void *data, const size_t datalen, int qos, bool retain)
+{
+    RET_IF(msg == nullptr);
+
+    int ret;
+    int mId = -1;
+    mosquitto_property *props = nullptr;
+
+    ret = mosquitto_property_add_binary(&props, MQTT_PROP_CORRELATION_DATA,
+          msg->GetCorrelation().c_str(), msg->GetCorrelation().size());
+    if (ret != MOSQ_ERR_SUCCESS) {
+        ERR("mosquitto_property_add_binary(correlation) Fail(%s)", mosquitto_strerror(ret));
+        throw AittException(AittException::MQTT_ERR);
+    }
+
+    ret = mosquitto_property_add_string_pair(&props, MQTT_PROP_USER_PROPERTY,
+          REPLY_SEQUENCE_NUM_KEY.c_str(), std::to_string(msg->GetSequence()).c_str());
+    if (ret != MOSQ_ERR_SUCCESS) {
+        ERR("mosquitto_property_add_string_pair(squenceNum) Fail(%s)", mosquitto_strerror(ret));
+        throw AittException(AittException::MQTT_ERR);
+    }
+
+    ret = mosquitto_property_add_string_pair(&props, MQTT_PROP_USER_PROPERTY,
+          REPLY_IS_END_SEQUENCE_KEY.c_str(), std::to_string(msg->IsEndSequence()).c_str());
+    if (ret != MOSQ_ERR_SUCCESS) {
+        ERR("mosquitto_property_add_string_pair(IsEndSequence) Fail(%s)", mosquitto_strerror(ret));
+        throw AittException(AittException::MQTT_ERR);
+    }
+
+    ret = mosquitto_publish_v5(handle, &mId, msg->GetResponseTopic().c_str(), datalen, data, qos,
+          retain, props);
+    if (ret != MOSQ_ERR_SUCCESS) {
+        ERR("mosquitto_publish_v5(%s) Fail(%s)", msg->GetResponseTopic().c_str(),
+              mosquitto_strerror(ret));
+        throw AittException(AittException::MQTT_ERR);
+    }
+}
+
+void *MosquittoMQ::Subscribe(const std::string &topic, const SubscribeCallback &cb, void *user_data,
+      int qos)
+{
+    int mid = -1;
+    int ret = mosquitto_subscribe(handle, &mid, topic.c_str(), qos);
+    if (ret != MOSQ_ERR_SUCCESS) {
+        ERR("mosquitto_subscribe(%s) Fail(%s)", topic.c_str(), mosquitto_strerror(ret));
+        throw AittException(AittException::MQTT_ERR);
+    }
+
+    std::lock_guard<std::recursive_mutex> lock_from_here(callback_lock);
+    SubscribeData *data = new SubscribeData(topic, cb, user_data);
+    if (subscribers_iterating)
+        new_subscribers.push_back(data);
+    else
+        subscribers.push_back(data);
+
+    return static_cast<void *>(data);
+}
+
+void *MosquittoMQ::Unsubscribe(void *sub_handle)
+{
+    std::lock_guard<std::recursive_mutex> auto_lock(callback_lock);
+    auto it = std::find(subscribers.begin(), subscribers.end(),
+          static_cast<SubscribeData *>(sub_handle));
+
+    if (it == subscribers.end()) {
+        ERR("No Subscription(%p)", sub_handle);
+        throw AittException(AittException::NO_DATA_ERR);
+    }
+
+    SubscribeData *data = static_cast<SubscribeData *>(sub_handle);
+
+    if (subscriber_iterator == it) {
+        subscriber_iterator = subscribers.erase(it);
+        subscriber_iterator_updated = true;
+    } else {
+        subscribers.erase(it);
+    }
+
+    void *user_data = data->user_data;
+    std::string topic = data->topic;
+    delete data;
+
+    int mid = -1;
+    int ret = mosquitto_unsubscribe(handle, &mid, topic.c_str());
+    if (ret != MOSQ_ERR_SUCCESS) {
+        ERR("mosquitto_unsubscribe(%s) Fail(%d)", topic.c_str(), ret);
+        throw AittException(AittException::MQTT_ERR);
+    }
+
+    return user_data;
+}
+
+MosquittoMQ::SubscribeData::SubscribeData(const std::string &in_topic,
+      const SubscribeCallback &in_cb, void *in_user_data)
+      : topic(in_topic), cb(in_cb), user_data(in_user_data)
+{
+}
+
+}  // namespace aitt
diff --git a/common/MosquittoMQ.h b/common/MosquittoMQ.h
new file mode 100644 (file)
index 0000000..6c93809
--- /dev/null
@@ -0,0 +1,82 @@
+/*
+ * Copyright (c) 2021-2022 Samsung Electronics Co., Ltd All Rights Reserved
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#pragma once
+
+#include <mosquitto.h>
+
+#include <functional>
+#include <mutex>
+#include <string>
+#include <vector>
+
+#include "MQ.h"
+#include "MSG.h"
+
+#define MQTT_LOCALHOST "127.0.0.1"
+#define MQTT_PORT 1883
+
+namespace aitt {
+
+class MosquittoMQ : public MQ {
+  public:
+    explicit MosquittoMQ(const std::string &id, bool clear_session = false);
+    virtual ~MosquittoMQ(void);
+
+    void SetConnectionCallback(const MQConnectionCallback &cb);
+    void Connect(const std::string &host, int port, const std::string &username,
+          const std::string &password);
+    void SetWillInfo(const std::string &topic, const void *msg, size_t szmsg, int qos, bool retain);
+    void Disconnect(void);
+    void Publish(const std::string &topic, const void *data, const size_t datalen, int qos = 0,
+          bool retain = false);
+    void PublishWithReply(const std::string &topic, const void *data, const size_t datalen, int qos,
+          bool retain, const std::string &reply_topic, const std::string &correlation);
+    void SendReply(MSG *msg, const void *data, const size_t datalen, int qos, bool retain);
+    void *Subscribe(const std::string &topic, const SubscribeCallback &cb,
+          void *user_data = nullptr, int qos = 0);
+    void *Unsubscribe(void *handle);
+
+  private:
+    struct SubscribeData {
+        SubscribeData(const std::string &topic, const SubscribeCallback &cb, void *user_data);
+        std::string topic;
+        SubscribeCallback cb;
+        void *user_data;
+    };
+
+    static void ConnectCallback(mosquitto *mosq, void *obj, int rc, int flag,
+          const mosquitto_property *props);
+    static void DisconnectCallback(mosquitto *mosq, void *obj, int rc,
+          const mosquitto_property *props);
+    static void MessageCallback(mosquitto *, void *, const mosquitto_message *,
+          const mosquitto_property *);
+    void InvokeCallback(const mosquitto_message *msg, const mosquitto_property *props);
+
+    static const std::string REPLY_SEQUENCE_NUM_KEY;
+    static const std::string REPLY_IS_END_SEQUENCE_KEY;
+
+    mosquitto *handle;
+    const int keep_alive;
+    std::vector<SubscribeData *> subscribers;
+    bool subscribers_iterating;
+    std::vector<SubscribeData *> new_subscribers;
+    std::vector<SubscribeData *>::iterator subscriber_iterator;
+    bool subscriber_iterator_updated;
+    std::recursive_mutex callback_lock;
+    MQConnectionCallback connect_cb;
+};
+
+}  // namespace aitt
diff --git a/include/MQ.h b/include/MQ.h
new file mode 100644 (file)
index 0000000..644c8d1
--- /dev/null
@@ -0,0 +1,62 @@
+/*
+ * Copyright (c) 2021-2022 Samsung Electronics Co., Ltd All Rights Reserved
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#pragma once
+
+#include <MSG.h>
+
+#include <functional>
+#include <string>
+
+#define AITT_MQ_NEW aitt_mq_new
+#define TO_STR(s) #s
+#define DEFINE_TO_STR(x) TO_STR(x)
+
+namespace aitt {
+
+class MQ {
+  public:
+    typedef void *(*ModuleEntry)(const char *id, bool clear_session);
+
+    using SubscribeCallback = std::function<void(MSG *msg, const std::string &topic,
+          const void *data, const size_t datalen, void *user_data)>;
+    using MQConnectionCallback = std::function<void(int)>;
+
+    static constexpr const char *const MODULE_ENTRY_NAME = DEFINE_TO_STR(AITT_MQ_NEW);
+
+    MQ() = default;
+    virtual ~MQ() = default;
+
+    virtual void SetConnectionCallback(const MQConnectionCallback &cb) = 0;
+    virtual void Connect(const std::string &host, int port, const std::string &username,
+          const std::string &password) = 0;
+    virtual void SetWillInfo(const std::string &topic, const void *msg, size_t szmsg, int qos,
+          bool retain) = 0;
+    virtual void Disconnect(void) = 0;
+    virtual void Publish(const std::string &topic, const void *data, const size_t datalen,
+          int qos = 0, bool retain = false) = 0;
+    virtual void PublishWithReply(const std::string &topic, const void *data, const size_t datalen,
+          int qos, bool retain, const std::string &reply_topic, const std::string &correlation) = 0;
+    virtual void SendReply(MSG *msg, const void *data, const size_t datalen, int qos,
+          bool retain) = 0;
+    virtual void *Subscribe(const std::string &topic, const SubscribeCallback &cb,
+          void *user_data = nullptr, int qos = 0) = 0;
+    virtual void *Unsubscribe(void *handle) = 0;
+};
+
+}  // namespace aitt
+
+#undef TO_STR
+#undef DEFINE_TO_STR
index 70a07edea34bc952d941109b0c07c0298f06afff..a0afae56ea147942d0b263131b91bcaade3da3b2 100644 (file)
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 #include "MqttServer.h"
 
+#include "MQProxy.h"
 #include "aitt_internal.h"
 
 #define MQTT_HANDLER_MSG_QOS 1
 #define MQTT_HANDLER_MGMT_QOS 2
 
-MqttServer::MqttServer(const Config &config) : mq(config.GetLocalId(), true)
+MqttServer::MqttServer(const Config &config)
+      : mq(new aitt::MQProxy(config.GetLocalId(), true, false))
 {
     broker_ip_ = config.GetBrokerIp();
     broker_port_ = config.GetBrokerPort();
@@ -33,7 +34,7 @@ MqttServer::MqttServer(const Config &config) : mq(config.GetLocalId(), true)
     DBG("ID[%s] BROKER IP[%s] BROKER PORT [%d] ROOM[%s] %s", id_.c_str(), broker_ip_.c_str(),
           broker_port_, room_id_.c_str(), is_publisher_ ? "Publisher" : "Subscriber");
 
-    mq.SetConnectionCallback(std::bind(&MqttServer::ConnectCallBack, this, std::placeholders::_1));
+    mq->SetConnectionCallback(std::bind(&MqttServer::ConnectCallBack, this, std::placeholders::_1));
 }
 
 MqttServer::~MqttServer()
@@ -96,10 +97,10 @@ void MqttServer::RegisterWithServer(void)
     // Notify Who is source?
     std::string source_topic = room_id_ + std::string("/source");
     if (is_publisher_) {
-        mq.Publish(source_topic, id_.c_str(), id_.size(), AITT_QOS_EXACTLY_ONCE, true);
+        mq->Publish(source_topic, id_.c_str(), id_.size(), AITT_QOS_EXACTLY_ONCE, true);
         SetConnectionState(ConnectionState::Registered);
     } else {
-        mq.Subscribe(source_topic,
+        mq->Subscribe(source_topic,
               std::bind(&MqttServer::HandleSourceTopic, this, std::placeholders::_1,
                     std::placeholders::_2, std::placeholders::_3, std::placeholders::_4,
                     std::placeholders::_5),
@@ -136,11 +137,11 @@ bool MqttServer::IsConnected(void)
 int MqttServer::Connect(void)
 {
     std::string will_message = std::string("ROOM_PEER_LEFT ") + id_;
-    mq.SetWillInfo(room_id_, will_message.c_str(), will_message.size(), AITT_QOS_EXACTLY_ONCE,
+    mq->SetWillInfo(room_id_, will_message.c_str(), will_message.size(), AITT_QOS_EXACTLY_ONCE,
           false);
 
     SetConnectionState(ConnectionState::Connecting);
-    mq.Connect(broker_ip_, broker_port_, std::string(), std::string());
+    mq->Connect(broker_ip_, broker_port_, std::string(), std::string());
 
     return 0;
 }
@@ -150,13 +151,13 @@ int MqttServer::Disconnect(void)
     if (is_publisher_) {
         INFO("remove retained");
         std::string source_topic = room_id_ + std::string("/source");
-        mq.Publish(source_topic, nullptr, 0, AITT_QOS_AT_LEAST_ONCE, true);
+        mq->Publish(source_topic, nullptr, 0, AITT_QOS_AT_LEAST_ONCE, true);
     }
 
     std::string left_message = std::string("ROOM_PEER_LEFT ") + id_;
-    mq.Publish(room_id_, left_message.c_str(), left_message.size(), AITT_QOS_AT_LEAST_ONCE, false);
+    mq->Publish(room_id_, left_message.c_str(), left_message.size(), AITT_QOS_AT_LEAST_ONCE, false);
 
-    mq.Disconnect();
+    mq->Disconnect();
 
     room_id_ = std::string("");
 
@@ -177,7 +178,7 @@ int MqttServer::SendMessage(const std::string &peer_id, const std::string &msg)
 
     std::string receiver_topic = room_id_ + std::string("/") + peer_id;
     std::string server_formatted_msg = "ROOM_PEER_MSG " + id_ + " " + msg;
-    mq.Publish(receiver_topic, server_formatted_msg.c_str(), server_formatted_msg.size(),
+    mq->Publish(receiver_topic, server_formatted_msg.c_str(), server_formatted_msg.size(),
           AITT_QOS_AT_LEAST_ONCE);
 
     return 0;
@@ -220,7 +221,7 @@ void MqttServer::JoinRoom(const std::string &room_id)
     }
 
     // Subscribe PEER_JOIN PEER_LEFT
-    mq.Subscribe(room_id_,
+    mq->Subscribe(room_id_,
           std::bind(&MqttServer::HandleRoomTopic, this, std::placeholders::_1,
                 std::placeholders::_2, std::placeholders::_3, std::placeholders::_4,
                 std::placeholders::_5),
@@ -228,7 +229,7 @@ void MqttServer::JoinRoom(const std::string &room_id)
 
     // Subscribe PEER_MSG
     std::string receiving_topic = room_id + std::string("/") + id_;
-    mq.Subscribe(receiving_topic,
+    mq->Subscribe(receiving_topic,
           std::bind(&MqttServer::HandleMessageTopic, this, std::placeholders::_1,
                 std::placeholders::_2, std::placeholders::_3, std::placeholders::_4,
                 std::placeholders::_5),
@@ -238,7 +239,7 @@ void MqttServer::JoinRoom(const std::string &room_id)
 
     if (!is_publisher_) {
         std::string join_message = std::string("ROOM_PEER_JOINED ") + id_;
-        mq.Publish(room_id_, join_message.c_str(), join_message.size(), AITT_QOS_EXACTLY_ONCE);
+        mq->Publish(room_id_, join_message.c_str(), join_message.size(), AITT_QOS_EXACTLY_ONCE);
     }
 }
 
index ad2a3d616470b5d85a2aac3a563cbe67ce94ee73..49ab5e908754b6205aa9c221efa211798295a14c 100644 (file)
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 #pragma once
 
-#include <MQ.h>
+#include <functional>
+#include <memory>
+#include <string>
 
 #include "Config.h"
 #include "IfaceServer.h"
+#include "MQ.h"
 
 class MqttServer : public IfaceServer {
   public:
@@ -70,7 +72,7 @@ class MqttServer : public IfaceServer {
     std::string room_id_;
     std::string source_id_;
     bool is_publisher_;
-    aitt::MQ mq;
+    std::unique_ptr<aitt::MQ> mq;
 
     ConnectionState connection_state_;
     std::function<void(ConnectionState)> connection_state_changed_cb_;
index 18c5a387fba9589101ce9997d31838c4957e45b2..ac1945759378111651b1894ee104e7a3b511c20a 100644 (file)
@@ -23,6 +23,7 @@
 #include <memory>
 #include <stdexcept>
 
+#include "MQProxy.h"
 #include "aitt_internal.h"
 
 #define WEBRTC_ROOM_ID_PREFIX std::string(AITT_MANAGED_TOPIC_PREFIX "webrtc/room/Room.webrtc")
@@ -34,7 +35,7 @@ AITT::Impl::Impl(AITT &parent, const std::string &id, const std::string &my_ip,
       bool custom_broker)
       : public_api(parent),
         id_(id),
-        mq(id, clear_session),
+        mq(new MQProxy(id, clear_session, custom_broker)),
         discovery(id, custom_broker),
         reply_id(0),
         modules{0}
@@ -81,16 +82,16 @@ void AITT::Impl::ThreadMain(void)
 void AITT::Impl::SetWillInfo(const std::string &topic, const void *data, const size_t datalen,
       AittQoS qos, bool retain)
 {
-    mq.SetWillInfo(topic, data, datalen, qos, retain);
+    mq->SetWillInfo(topic, data, datalen, qos, retain);
 }
 
 void AITT::Impl::SetConnectionCallback(ConnectionCallback cb, void *user_data)
 {
     if (cb)
-        mq.SetConnectionCallback(
+        mq->SetConnectionCallback(
               std::bind(&Impl::ConnectionCB, this, cb, user_data, std::placeholders::_1));
     else
-        mq.SetConnectionCallback(nullptr);
+        mq->SetConnectionCallback(nullptr);
 }
 
 void AITT::Impl::ConnectionCB(ConnectionCallback cb, void *user_data, int status)
@@ -104,7 +105,7 @@ void AITT::Impl::Connect(const std::string &host, int port, const std::string &u
       const std::string &password)
 {
     discovery.Start(host, port, username, password);
-    mq.Connect(host, port, username, password);
+    mq->Connect(host, port, username, password);
 
     mqtt_broker_ip_ = host;
     mqtt_broker_port_ = port;
@@ -117,7 +118,7 @@ void AITT::Impl::Disconnect(void)
     mqtt_broker_ip_.clear();
     mqtt_broker_port_ = -1;
 
-    mq.Disconnect();
+    mq->Disconnect();
     discovery.Stop();
 }
 
@@ -128,7 +129,7 @@ void AITT::Impl::UnsubscribeAll()
     for (auto subscribe_info : subscribed_list) {
         switch (subscribe_info->first) {
         case AITT_TYPE_MQTT:
-            mq.Unsubscribe(subscribe_info->second);
+            mq->Unsubscribe(subscribe_info->second);
             break;
         case AITT_TYPE_TCP:
             GetTransport(ModuleLoader::TYPE_TCP)->Unsubscribe(subscribe_info->second);
@@ -161,7 +162,7 @@ void AITT::Impl::Publish(const std::string &topic, const void *data, const size_
       AittProtocol protocols, AittQoS qos, bool retain)
 {
     if ((protocols & AITT_TYPE_MQTT) == AITT_TYPE_MQTT)
-        mq.Publish(topic, data, datalen, qos, retain);
+        mq->Publish(topic, data, datalen, qos, retain);
 
     if ((protocols & AITT_TYPE_TCP) == AITT_TYPE_TCP)
         GetTransport(ModuleLoader::TYPE_TCP)->Publish(topic, data, datalen, qos, retain);
@@ -224,7 +225,7 @@ AittSubscribeID AITT::Impl::Subscribe(const std::string &topic, const AITT::Subs
 AittSubscribeID AITT::Impl::SubscribeMQ(SubscribeInfo *handle, MainLoopHandler *loop_handle,
       const std::string &topic, const SubscribeCallback &cb, void *user_data, AittQoS qos)
 {
-    return mq.Subscribe(
+    return mq->Subscribe(
           topic,
           [this, handle, loop_handle, cb](MSG *msg, const std::string &topic, const void *data,
                 const size_t datalen, void *mq_user_data) {
@@ -269,7 +270,7 @@ void *AITT::Impl::Unsubscribe(AittSubscribeID subscribe_id)
     SubscribeInfo *found_info = *it;
     switch (found_info->first) {
     case AITT_TYPE_MQTT:
-        user_data = mq.Unsubscribe(found_info->second);
+        user_data = mq->Unsubscribe(found_info->second);
         break;
     case AITT_TYPE_TCP: {
         auto tcpModule = GetTransport(ModuleLoader::TYPE_TCP);
@@ -316,7 +317,7 @@ int AITT::Impl::PublishWithReply(const std::string &topic, const void *data, con
           },
           user_data, protocol, qos);
 
-    mq.PublishWithReply(topic, data, datalen, qos, false, replyTopic, correlation);
+    mq->PublishWithReply(topic, data, datalen, qos, false, replyTopic, correlation);
     return 0;
 }
 
@@ -362,7 +363,7 @@ int AITT::Impl::PublishWithReplySync(const std::string &topic, const void *data,
         subscribed_list.push_back(info);
     }
 
-    mq.PublishWithReply(topic, data, datalen, qos, false, replyTopic, correlation);
+    mq->PublishWithReply(topic, data, datalen, qos, false, replyTopic, correlation);
     if (timeout_ms)
         HandleTimeout(timeout_ms, timeout_id, sync_loop, is_timeout);
 
@@ -398,7 +399,7 @@ void AITT::Impl::SendReply(MSG *msg, const void *data, const int datalen, bool e
         msg->IncreaseSequence();
     msg->SetEndSequence(end);
 
-    mq.SendReply(msg, data, datalen, AITT_QOS_AT_MOST_ONCE, false);
+    mq->SendReply(msg, data, datalen, AITT_QOS_AT_MOST_ONCE, false);
 }
 
 void *AITT::Impl::SubscribeTCP(SubscribeInfo *handle, const std::string &topic,
index 6c47bc1ee4b04efee111b5447a5372f3e6ac2cd0..aa26e3ba18453373e01857853675523f97df7443 100644 (file)
@@ -89,7 +89,7 @@ class AITT::Impl {
     std::string id_;
     std::string mqtt_broker_ip_;
     int mqtt_broker_port_;
-    MQ mq;
+    std::unique_ptr<MQ> mq;
     AittDiscovery discovery;
     unsigned short reply_id;
     ModuleObj *modules[ModuleLoader::TYPE_TRANSPORT_MAX];
index 96dda23508c47ce94ac2a09f6c08546a6b0e6b8d..e81fb68538310c7b6c5a7098d2e28c4eacfbb6c1 100644 (file)
@@ -7,7 +7,7 @@ INCLUDE_DIRECTORIES(${UT_NEEDS_INCLUDE_DIRS})
 LINK_DIRECTORIES(${UT_NEEDS_LIBRARY_DIRS})
 
 ###########################################################################
-SET(AITT_UT_SRC AITT_test.cc RequestResponse_test.cc MainLoopHandler_test.cc aitt_c_test.cc AITT_TCP_test.cc MQ_test.cc)
+SET(AITT_UT_SRC AITT_test.cc RequestResponse_test.cc MainLoopHandler_test.cc aitt_c_test.cc AITT_TCP_test.cc MosquittoMQ_test.cc)
 ADD_EXECUTABLE(${AITT_UT} ${AITT_UT_SRC})
 TARGET_LINK_LIBRARIES(${AITT_UT} Threads::Threads ${UT_NEEDS_LIBRARIES} ${PROJECT_NAME})
 
@@ -31,8 +31,8 @@ INSTALL(TARGETS ${AITT_UT}_manual DESTINATION ${AITT_TEST_BINDIR})
 
 ###########################################################################
 AUX_SOURCE_DIRECTORY(../mock MOCK_SRC)
-ADD_EXECUTABLE(${AITT_UT}_mq MQ_mocktest.cc ${MOCK_SRC})
-TARGET_LINK_LIBRARIES(${AITT_UT}_mq ${UT_NEEDS_LIBRARIES} Threads::Threads ${AITT_NEEDS_LIBRARIES} ${AITT_COMMON})
+ADD_EXECUTABLE(${AITT_UT}_mq MosquittoMQ_mocktest.cc ${MOCK_SRC})
+TARGET_LINK_LIBRARIES(${AITT_UT}_mq ${UT_NEEDS_LIBRARIES} Threads::Threads ${AITT_COMMON})
 TARGET_INCLUDE_DIRECTORIES(${AITT_UT}_mq PRIVATE ../mock)
 INSTALL(TARGETS ${AITT_UT}_mq DESTINATION ${AITT_TEST_BINDIR})
 
diff --git a/tests/MQ_mocktest.cc b/tests/MQ_mocktest.cc
deleted file mode 100644 (file)
index 4132e5a..0000000
+++ /dev/null
@@ -1,250 +0,0 @@
-/*
- * Copyright (c) 2021-2022 Samsung Electronics Co., Ltd All Rights Reserved
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#include <gmock/gmock.h>
-#include <gtest/gtest.h>
-
-#include <condition_variable>
-#include <mutex>
-
-#include "AittTypes.h"
-#include "MQ.h"
-#include "MQMockTest.h"
-#include "MQTTMock.h"
-
-using ::testing::Return;
-
-#define TEST_TOPIC "Test/Topic"
-#define TEST_PAYLOAD "The last will is ..."
-#define TEST_CLIENT_ID "testClient"
-#define TEST_PORT 8123
-#define TEST_HOST "localhost"
-#define TEST_HANDLE reinterpret_cast<mosquitto *>(0xbeefbeef)
-
-TEST_F(MQMockTest, Negative_Create_lib_init_Anytime)
-{
-    EXPECT_CALL(GetMock(), mosquitto_lib_init()).WillOnce(Return(MOSQ_ERR_NOT_SUPPORTED));
-    EXPECT_CALL(GetMock(), mosquitto_destroy(nullptr)).WillOnce(Return());
-    EXPECT_CALL(GetMock(), mosquitto_lib_cleanup()).WillOnce(Return(MOSQ_ERR_SUCCESS));
-
-    try {
-        aitt::MQ mq(TEST_CLIENT_ID, true);
-        FAIL() << "lib_init must be failed";
-    } catch (std::exception &e) {
-        ASSERT_STREQ(e.what(), "MQTT failure : MQ Constructor Error");
-    }
-}
-
-TEST_F(MQMockTest, Negative_Create_new_Anytime)
-{
-    EXPECT_CALL(GetMock(), mosquitto_lib_init()).WillOnce(Return(MOSQ_ERR_SUCCESS));
-    EXPECT_CALL(GetMock(), mosquitto_new(testing::StrEq(TEST_CLIENT_ID), true, testing::_))
-          .WillOnce(Return(nullptr));
-    EXPECT_CALL(GetMock(), mosquitto_destroy(nullptr)).Times(1);
-    EXPECT_CALL(GetMock(), mosquitto_lib_cleanup()).WillOnce(Return(MOSQ_ERR_SUCCESS));
-
-    try {
-        aitt::MQ mq(TEST_CLIENT_ID, true);
-        FAIL() << "lib_init must be failed";
-    } catch (std::exception &e) {
-        ASSERT_STREQ(e.what(), "MQTT failure : MQ Constructor Error");
-    }
-}
-
-TEST_F(MQMockTest, Positive_Publish_Anytime)
-{
-    EXPECT_CALL(GetMock(), mosquitto_lib_init()).WillOnce(Return(MOSQ_ERR_SUCCESS));
-    EXPECT_CALL(GetMock(), mosquitto_new(testing::StrEq(TEST_CLIENT_ID), true, testing::_))
-          .WillOnce(Return(TEST_HANDLE));
-    EXPECT_CALL(GetMock(),
-          mosquitto_int_option(TEST_HANDLE, MOSQ_OPT_PROTOCOL_VERSION, MQTT_PROTOCOL_V5))
-          .WillOnce(Return(MOSQ_ERR_SUCCESS));
-    EXPECT_CALL(GetMock(), mosquitto_message_v5_callback_set(TEST_HANDLE, testing::_)).Times(1);
-    EXPECT_CALL(GetMock(), mosquitto_loop_start(TEST_HANDLE)).WillOnce(Return(MOSQ_ERR_SUCCESS));
-    EXPECT_CALL(GetMock(), mosquitto_connect(TEST_HANDLE, testing::StrEq(TEST_HOST), TEST_PORT, 60))
-          .WillOnce(Return(MOSQ_ERR_SUCCESS));
-    EXPECT_CALL(GetMock(), mosquitto_publish(TEST_HANDLE, testing::_, testing::StrEq(TEST_TOPIC),
-                                 sizeof(TEST_PAYLOAD), TEST_PAYLOAD, AITT_QOS_AT_MOST_ONCE, false))
-          .WillOnce(Return(MOSQ_ERR_SUCCESS));
-    EXPECT_CALL(GetMock(), mosquitto_destroy(TEST_HANDLE)).Times(1);
-    EXPECT_CALL(GetMock(), mosquitto_lib_cleanup()).WillOnce(Return(MOSQ_ERR_SUCCESS));
-
-    try {
-        aitt::MQ mq(TEST_CLIENT_ID, true);
-        mq.Connect(TEST_HOST, TEST_PORT, "", "");
-        mq.Publish(TEST_TOPIC, TEST_PAYLOAD, sizeof(TEST_PAYLOAD));
-    } catch (std::exception &e) {
-        FAIL() << "Unexpected exception: " << e.what();
-    }
-}
-
-TEST_F(MQMockTest, Positive_Subscribe_Anytime)
-{
-    EXPECT_CALL(GetMock(), mosquitto_lib_init()).WillOnce(Return(MOSQ_ERR_SUCCESS));
-    EXPECT_CALL(GetMock(), mosquitto_new(testing::StrEq(TEST_CLIENT_ID), true, testing::_))
-          .WillOnce(Return(TEST_HANDLE));
-    EXPECT_CALL(GetMock(), mosquitto_message_v5_callback_set(TEST_HANDLE, testing::_)).Times(1);
-    EXPECT_CALL(GetMock(), mosquitto_loop_start(TEST_HANDLE)).WillOnce(Return(MOSQ_ERR_SUCCESS));
-    EXPECT_CALL(GetMock(), mosquitto_connect(TEST_HANDLE, testing::StrEq(TEST_HOST), TEST_PORT, 60))
-          .WillOnce(Return(MOSQ_ERR_SUCCESS));
-    EXPECT_CALL(GetMock(), mosquitto_subscribe(TEST_HANDLE, testing::_, testing::StrEq(TEST_TOPIC),
-                                 AITT_QOS_AT_MOST_ONCE))
-          .WillOnce(Return(MOSQ_ERR_SUCCESS));
-    EXPECT_CALL(GetMock(), mosquitto_destroy(TEST_HANDLE)).Times(1);
-    EXPECT_CALL(GetMock(), mosquitto_lib_cleanup()).WillOnce(Return(MOSQ_ERR_SUCCESS));
-
-    try {
-        aitt::MQ mq(TEST_CLIENT_ID, true);
-        mq.Connect(TEST_HOST, TEST_PORT, "", "");
-        mq.Subscribe(
-              TEST_TOPIC,
-              [](aitt::MSG *info, const std::string &topic, const void *msg, const int szmsg,
-                    const void *cbdata) -> void {},
-              nullptr, AITT_QOS_AT_MOST_ONCE);
-    } catch (std::exception &e) {
-        FAIL() << "Unexpected exception: " << e.what();
-    }
-}
-
-TEST_F(MQMockTest, Positive_Unsubscribe_Anytime)
-{
-    EXPECT_CALL(GetMock(), mosquitto_lib_init()).WillOnce(Return(MOSQ_ERR_SUCCESS));
-    EXPECT_CALL(GetMock(), mosquitto_new(testing::StrEq(TEST_CLIENT_ID), true, testing::_))
-          .WillOnce(Return(TEST_HANDLE));
-    EXPECT_CALL(GetMock(), mosquitto_message_v5_callback_set(TEST_HANDLE, testing::_)).Times(1);
-    EXPECT_CALL(GetMock(), mosquitto_loop_start(TEST_HANDLE)).WillOnce(Return(MOSQ_ERR_SUCCESS));
-    EXPECT_CALL(GetMock(), mosquitto_connect(TEST_HANDLE, testing::StrEq(TEST_HOST), TEST_PORT, 60))
-          .WillOnce(Return(MOSQ_ERR_SUCCESS));
-    EXPECT_CALL(GetMock(),
-          mosquitto_subscribe(TEST_HANDLE, testing::_, testing::StrEq(TEST_TOPIC), 0))
-          .WillOnce(Return(MOSQ_ERR_SUCCESS));
-    EXPECT_CALL(GetMock(),
-          mosquitto_unsubscribe(TEST_HANDLE, testing::_, testing::StrEq(TEST_TOPIC)))
-          .WillOnce(Return(MOSQ_ERR_SUCCESS));
-    EXPECT_CALL(GetMock(), mosquitto_destroy(TEST_HANDLE)).Times(1);
-    EXPECT_CALL(GetMock(), mosquitto_lib_cleanup()).WillOnce(Return(MOSQ_ERR_SUCCESS));
-
-    try {
-        aitt::MQ mq(TEST_CLIENT_ID, true);
-        mq.Connect(TEST_HOST, TEST_PORT, "", "");
-        void *handle = mq.Subscribe(
-              TEST_TOPIC,
-              [](aitt::MSG *info, const std::string &topic, const void *msg, const int szmsg,
-                    const void *cbdata) -> void {},
-              nullptr, AITT_QOS_AT_MOST_ONCE);
-        mq.Unsubscribe(handle);
-    } catch (std::exception &e) {
-        FAIL() << "Unexpected exception: " << e.what();
-    }
-}
-
-TEST_F(MQMockTest, Positive_Create_Anytime)
-{
-    EXPECT_CALL(GetMock(), mosquitto_lib_init()).WillOnce(Return(MOSQ_ERR_SUCCESS));
-    EXPECT_CALL(GetMock(), mosquitto_new(testing::StrEq(TEST_CLIENT_ID), true, testing::_))
-          .WillOnce(Return(TEST_HANDLE));
-    EXPECT_CALL(GetMock(),
-          mosquitto_int_option(TEST_HANDLE, MOSQ_OPT_PROTOCOL_VERSION, MQTT_PROTOCOL_V5))
-          .Times(1);
-    EXPECT_CALL(GetMock(), mosquitto_message_v5_callback_set(TEST_HANDLE, testing::_)).Times(1);
-    EXPECT_CALL(GetMock(), mosquitto_destroy(TEST_HANDLE)).Times(1);
-    EXPECT_CALL(GetMock(), mosquitto_lib_cleanup()).WillOnce(Return(MOSQ_ERR_SUCCESS));
-
-    try {
-        aitt::MQ mq(TEST_CLIENT_ID, true);
-    } catch (std::exception &e) {
-        FAIL() << "Unexpected exception occurred";
-    }
-}
-
-TEST_F(MQMockTest, Negative_Connect_will_set_Anytime)
-{
-    EXPECT_CALL(GetMock(), mosquitto_lib_init()).WillOnce(Return(MOSQ_ERR_SUCCESS));
-    EXPECT_CALL(GetMock(), mosquitto_new(testing::StrEq(TEST_CLIENT_ID), true, testing::_))
-          .WillOnce(Return(TEST_HANDLE));
-    EXPECT_CALL(GetMock(), mosquitto_message_v5_callback_set(TEST_HANDLE, testing::_)).Times(1);
-    EXPECT_CALL(GetMock(), mosquitto_will_set(TEST_HANDLE, testing::StrEq("lastWill"),
-                                 sizeof(TEST_PAYLOAD), TEST_PAYLOAD, AITT_QOS_AT_MOST_ONCE, true))
-          .WillOnce(Return(MOSQ_ERR_NOMEM));
-    EXPECT_CALL(GetMock(), mosquitto_destroy(TEST_HANDLE)).Times(1);
-    EXPECT_CALL(GetMock(), mosquitto_lib_cleanup()).WillOnce(Return(MOSQ_ERR_SUCCESS));
-    try {
-        aitt::MQ mq(TEST_CLIENT_ID, true);
-        mq.SetWillInfo("lastWill", TEST_PAYLOAD, sizeof(TEST_PAYLOAD), AITT_QOS_AT_MOST_ONCE, true);
-        mq.Connect(TEST_HOST, TEST_PORT, "", "");
-        FAIL() << "Connect() must be failed";
-    } catch (std::exception &e) {
-        ASSERT_STREQ(e.what(), "MQTT failure");
-    }
-}
-
-TEST_F(MQMockTest, Positive_Connect_Anytime)
-{
-    EXPECT_CALL(GetMock(), mosquitto_lib_init()).WillOnce(Return(MOSQ_ERR_SUCCESS));
-    EXPECT_CALL(GetMock(), mosquitto_new(testing::StrEq(TEST_CLIENT_ID), true, testing::_))
-          .WillOnce(Return(TEST_HANDLE));
-    EXPECT_CALL(GetMock(), mosquitto_message_v5_callback_set(TEST_HANDLE, testing::_)).Times(1);
-    EXPECT_CALL(GetMock(), mosquitto_connect(TEST_HANDLE, testing::StrEq(TEST_HOST), TEST_PORT, 60))
-          .WillOnce(Return(MOSQ_ERR_SUCCESS));
-    EXPECT_CALL(GetMock(), mosquitto_destroy(TEST_HANDLE)).Times(1);
-    EXPECT_CALL(GetMock(), mosquitto_lib_cleanup()).WillOnce(Return(MOSQ_ERR_SUCCESS));
-    try {
-        aitt::MQ mq(TEST_CLIENT_ID, true);
-        mq.Connect(TEST_HOST, TEST_PORT, "", "");
-    } catch (std::exception &e) {
-        FAIL() << "Unepxected exception: " << e.what();
-    }
-}
-
-TEST_F(MQMockTest, Positive_Connect_User_Anytime)
-{
-    std::string username = "test";
-    std::string password = "test";
-    EXPECT_CALL(GetMock(), mosquitto_lib_init()).WillOnce(Return(MOSQ_ERR_SUCCESS));
-    EXPECT_CALL(GetMock(), mosquitto_new(testing::StrEq(TEST_CLIENT_ID), true, testing::_))
-          .WillOnce(Return(TEST_HANDLE));
-    EXPECT_CALL(GetMock(), mosquitto_message_v5_callback_set(TEST_HANDLE, testing::_)).Times(1);
-    EXPECT_CALL(GetMock(),
-          mosquitto_username_pw_set(TEST_HANDLE, username.c_str(), password.c_str()))
-          .Times(1);
-    EXPECT_CALL(GetMock(), mosquitto_connect(TEST_HANDLE, testing::StrEq(TEST_HOST), TEST_PORT, 60))
-          .WillOnce(Return(MOSQ_ERR_SUCCESS));
-    EXPECT_CALL(GetMock(), mosquitto_destroy(TEST_HANDLE)).Times(1);
-    EXPECT_CALL(GetMock(), mosquitto_lib_cleanup()).WillOnce(Return(MOSQ_ERR_SUCCESS));
-    try {
-        aitt::MQ mq(TEST_CLIENT_ID, true);
-        mq.Connect(TEST_HOST, TEST_PORT, username, password);
-    } catch (std::exception &e) {
-        FAIL() << "Unepxected exception: " << e.what();
-    }
-}
-
-TEST_F(MQMockTest, Positive_Disconnect_Anytime)
-{
-    EXPECT_CALL(GetMock(), mosquitto_lib_init()).WillOnce(Return(MOSQ_ERR_SUCCESS));
-    EXPECT_CALL(GetMock(), mosquitto_new(testing::StrEq(TEST_CLIENT_ID), true, testing::_))
-          .WillOnce(Return(TEST_HANDLE));
-    EXPECT_CALL(GetMock(), mosquitto_message_v5_callback_set(TEST_HANDLE, testing::_)).Times(1);
-    EXPECT_CALL(GetMock(), mosquitto_disconnect(testing::_)).WillOnce(Return(MOSQ_ERR_SUCCESS));
-    EXPECT_CALL(GetMock(), mosquitto_will_clear(TEST_HANDLE)).WillOnce(Return(MOSQ_ERR_SUCCESS));
-    EXPECT_CALL(GetMock(), mosquitto_destroy(TEST_HANDLE)).Times(1);
-    EXPECT_CALL(GetMock(), mosquitto_lib_cleanup()).WillOnce(Return(MOSQ_ERR_SUCCESS));
-    try {
-        aitt::MQ mq(TEST_CLIENT_ID, true);
-        mq.Disconnect();
-    } catch (std::exception &e) {
-        FAIL() << "Unexpected exception: " << e.what();
-    }
-}
diff --git a/tests/MQ_test.cc b/tests/MQ_test.cc
deleted file mode 100644 (file)
index 4ff554b..0000000
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- * Copyright (c) 2022 Samsung Electronics Co., Ltd All Rights Reserved
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#include "MQ.h"
-
-#include <gtest/gtest.h>
-
-#include "aitt_internal.h"
-#include "aitt_tests.h"
-
-using MQ = aitt::MQ;
-
-class MQTest : public testing::Test, public AittTests {
-  protected:
-    void SetUp() override { Init(); }
-    void TearDown() override { Deinit(); }
-};
-
-TEST_F(MQTest, Positve_Subscribe_in_Subscribe_Anytime)
-{
-    try {
-        MQ mq("MQ_TEST_ID");
-        mq.Connect(LOCAL_IP, 1883, "", "");
-        mq.Subscribe(
-              "MQ_TEST_TOPIC1",
-              [&](aitt::MSG *handle, const std::string &topic, const void *data,
-                    const size_t datalen, void *user_data) {
-                  DBG("Subscribe invoked: %s %zu", static_cast<const char *>(data), datalen);
-
-                  mq.Subscribe(
-                        "topic1InCallback",
-                        [](aitt::MSG *handle, const std::string &topic, const void *msg,
-                              const size_t szmsg, void *cbdata) {},
-                        user_data);
-
-                  mq.Subscribe(
-                        "topic2InCallback",
-                        [](aitt::MSG *handle, const std::string &topic, const void *msg,
-                              const size_t szmsg, void *cbdata) {},
-                        user_data);
-                  g_timeout_add(
-                        100,
-                        [](gpointer cbdata) -> gboolean {
-                            MQTest *test = static_cast<MQTest *>(cbdata);
-                            test->ToggleReady();
-                            return G_SOURCE_REMOVE;
-                        },
-                        user_data);
-              },
-              static_cast<void *>(this));
-
-        DBG("Publish message to %s (%s)", "MQ_TEST_TOPIC1", TEST_MSG);
-        mq.Publish("MQ_TEST_TOPIC1", TEST_MSG, sizeof(TEST_MSG));
-
-        g_timeout_add(10, AittTests::ReadyCheck, static_cast<AittTests *>(this));
-
-        IterateEventLoop();
-
-        ASSERT_TRUE(ready);
-    } catch (std::exception &e) {
-        FAIL() << "Unexpected exception: " << e.what();
-    }
-}
index 12437f3f73e058d2d8f09519137c5c6008a1d4cb..ccaefdc6c352a84b56d24f55b40ab320ddad2ed7 100644 (file)
@@ -54,3 +54,24 @@ TEST_F(ModuleLoaderTest, LoadTransport_N_Anytime)
     auto module = loader.LoadTransport(handle.get(), LOCAL_IP, discovery);
     ASSERT_NE(module, nullptr);
 }
+
+TEST_F(ModuleLoaderTest, LoadMqttClient_P_Anytime)
+{
+    ModuleLoader::ModuleHandle handle = loader.OpenModule(ModuleLoader::TYPE_CUSTOM_MQTT);
+    if (handle) {
+        EXPECT_NO_THROW({
+            auto module = loader.LoadMqttClient(handle.get(), "test", false);
+            ASSERT_NE(module, nullptr);
+        });
+    }
+}
+
+TEST_F(ModuleLoaderTest, LoadMqttClient_N_Anytime)
+{
+    EXPECT_THROW(
+          {
+              loader.LoadMqttClient(nullptr, "test", false);
+              FAIL() << "Should not be called";
+          },
+          aitt::AittException);
+}
diff --git a/tests/MosquittoMQ_mocktest.cc b/tests/MosquittoMQ_mocktest.cc
new file mode 100644 (file)
index 0000000..21140a2
--- /dev/null
@@ -0,0 +1,250 @@
+/*
+ * Copyright (c) 2021-2022 Samsung Electronics Co., Ltd All Rights Reserved
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include <gmock/gmock.h>
+#include <gtest/gtest.h>
+
+#include <condition_variable>
+#include <mutex>
+
+#include "AittTypes.h"
+#include "MQMockTest.h"
+#include "MQTTMock.h"
+#include "MosquittoMQ.h"
+
+using ::testing::Return;
+
+#define TEST_TOPIC "Test/Topic"
+#define TEST_PAYLOAD "The last will is ..."
+#define TEST_CLIENT_ID "testClient"
+#define TEST_PORT 8123
+#define TEST_HOST "localhost"
+#define TEST_HANDLE reinterpret_cast<mosquitto *>(0xbeefbeef)
+
+TEST_F(MQMockTest, Negative_Create_lib_init_Anytime)
+{
+    EXPECT_CALL(GetMock(), mosquitto_lib_init()).WillOnce(Return(MOSQ_ERR_NOT_SUPPORTED));
+    EXPECT_CALL(GetMock(), mosquitto_destroy(nullptr)).WillOnce(Return());
+    EXPECT_CALL(GetMock(), mosquitto_lib_cleanup()).WillOnce(Return(MOSQ_ERR_SUCCESS));
+
+    try {
+        aitt::MosquittoMQ mq(TEST_CLIENT_ID, true);
+        FAIL() << "lib_init must be failed";
+    } catch (std::exception &e) {
+        ASSERT_STREQ(e.what(), "MQTT failure : MosquittoMQ Constructor Error");
+    }
+}
+
+TEST_F(MQMockTest, Negative_Create_new_Anytime)
+{
+    EXPECT_CALL(GetMock(), mosquitto_lib_init()).WillOnce(Return(MOSQ_ERR_SUCCESS));
+    EXPECT_CALL(GetMock(), mosquitto_new(testing::StrEq(TEST_CLIENT_ID), true, testing::_))
+          .WillOnce(Return(nullptr));
+    EXPECT_CALL(GetMock(), mosquitto_destroy(nullptr)).Times(1);
+    EXPECT_CALL(GetMock(), mosquitto_lib_cleanup()).WillOnce(Return(MOSQ_ERR_SUCCESS));
+
+    try {
+        aitt::MosquittoMQ mq(TEST_CLIENT_ID, true);
+        FAIL() << "lib_init must be failed";
+    } catch (std::exception &e) {
+        ASSERT_STREQ(e.what(), "MQTT failure : MosquittoMQ Constructor Error");
+    }
+}
+
+TEST_F(MQMockTest, Positive_Publish_Anytime)
+{
+    EXPECT_CALL(GetMock(), mosquitto_lib_init()).WillOnce(Return(MOSQ_ERR_SUCCESS));
+    EXPECT_CALL(GetMock(), mosquitto_new(testing::StrEq(TEST_CLIENT_ID), true, testing::_))
+          .WillOnce(Return(TEST_HANDLE));
+    EXPECT_CALL(GetMock(),
+          mosquitto_int_option(TEST_HANDLE, MOSQ_OPT_PROTOCOL_VERSION, MQTT_PROTOCOL_V5))
+          .WillOnce(Return(MOSQ_ERR_SUCCESS));
+    EXPECT_CALL(GetMock(), mosquitto_message_v5_callback_set(TEST_HANDLE, testing::_)).Times(1);
+    EXPECT_CALL(GetMock(), mosquitto_loop_start(TEST_HANDLE)).WillOnce(Return(MOSQ_ERR_SUCCESS));
+    EXPECT_CALL(GetMock(), mosquitto_connect(TEST_HANDLE, testing::StrEq(TEST_HOST), TEST_PORT, 60))
+          .WillOnce(Return(MOSQ_ERR_SUCCESS));
+    EXPECT_CALL(GetMock(), mosquitto_publish(TEST_HANDLE, testing::_, testing::StrEq(TEST_TOPIC),
+                                 sizeof(TEST_PAYLOAD), TEST_PAYLOAD, AITT_QOS_AT_MOST_ONCE, false))
+          .WillOnce(Return(MOSQ_ERR_SUCCESS));
+    EXPECT_CALL(GetMock(), mosquitto_destroy(TEST_HANDLE)).Times(1);
+    EXPECT_CALL(GetMock(), mosquitto_lib_cleanup()).WillOnce(Return(MOSQ_ERR_SUCCESS));
+
+    try {
+        aitt::MosquittoMQ mq(TEST_CLIENT_ID, true);
+        mq.Connect(TEST_HOST, TEST_PORT, "", "");
+        mq.Publish(TEST_TOPIC, TEST_PAYLOAD, sizeof(TEST_PAYLOAD));
+    } catch (std::exception &e) {
+        FAIL() << "Unexpected exception: " << e.what();
+    }
+}
+
+TEST_F(MQMockTest, Positive_Subscribe_Anytime)
+{
+    EXPECT_CALL(GetMock(), mosquitto_lib_init()).WillOnce(Return(MOSQ_ERR_SUCCESS));
+    EXPECT_CALL(GetMock(), mosquitto_new(testing::StrEq(TEST_CLIENT_ID), true, testing::_))
+          .WillOnce(Return(TEST_HANDLE));
+    EXPECT_CALL(GetMock(), mosquitto_message_v5_callback_set(TEST_HANDLE, testing::_)).Times(1);
+    EXPECT_CALL(GetMock(), mosquitto_loop_start(TEST_HANDLE)).WillOnce(Return(MOSQ_ERR_SUCCESS));
+    EXPECT_CALL(GetMock(), mosquitto_connect(TEST_HANDLE, testing::StrEq(TEST_HOST), TEST_PORT, 60))
+          .WillOnce(Return(MOSQ_ERR_SUCCESS));
+    EXPECT_CALL(GetMock(), mosquitto_subscribe(TEST_HANDLE, testing::_, testing::StrEq(TEST_TOPIC),
+                                 AITT_QOS_AT_MOST_ONCE))
+          .WillOnce(Return(MOSQ_ERR_SUCCESS));
+    EXPECT_CALL(GetMock(), mosquitto_destroy(TEST_HANDLE)).Times(1);
+    EXPECT_CALL(GetMock(), mosquitto_lib_cleanup()).WillOnce(Return(MOSQ_ERR_SUCCESS));
+
+    try {
+        aitt::MosquittoMQ mq(TEST_CLIENT_ID, true);
+        mq.Connect(TEST_HOST, TEST_PORT, "", "");
+        mq.Subscribe(
+              TEST_TOPIC,
+              [](aitt::MSG *info, const std::string &topic, const void *msg, const int szmsg,
+                    const void *cbdata) -> void {},
+              nullptr, AITT_QOS_AT_MOST_ONCE);
+    } catch (std::exception &e) {
+        FAIL() << "Unexpected exception: " << e.what();
+    }
+}
+
+TEST_F(MQMockTest, Positive_Unsubscribe_Anytime)
+{
+    EXPECT_CALL(GetMock(), mosquitto_lib_init()).WillOnce(Return(MOSQ_ERR_SUCCESS));
+    EXPECT_CALL(GetMock(), mosquitto_new(testing::StrEq(TEST_CLIENT_ID), true, testing::_))
+          .WillOnce(Return(TEST_HANDLE));
+    EXPECT_CALL(GetMock(), mosquitto_message_v5_callback_set(TEST_HANDLE, testing::_)).Times(1);
+    EXPECT_CALL(GetMock(), mosquitto_loop_start(TEST_HANDLE)).WillOnce(Return(MOSQ_ERR_SUCCESS));
+    EXPECT_CALL(GetMock(), mosquitto_connect(TEST_HANDLE, testing::StrEq(TEST_HOST), TEST_PORT, 60))
+          .WillOnce(Return(MOSQ_ERR_SUCCESS));
+    EXPECT_CALL(GetMock(),
+          mosquitto_subscribe(TEST_HANDLE, testing::_, testing::StrEq(TEST_TOPIC), 0))
+          .WillOnce(Return(MOSQ_ERR_SUCCESS));
+    EXPECT_CALL(GetMock(),
+          mosquitto_unsubscribe(TEST_HANDLE, testing::_, testing::StrEq(TEST_TOPIC)))
+          .WillOnce(Return(MOSQ_ERR_SUCCESS));
+    EXPECT_CALL(GetMock(), mosquitto_destroy(TEST_HANDLE)).Times(1);
+    EXPECT_CALL(GetMock(), mosquitto_lib_cleanup()).WillOnce(Return(MOSQ_ERR_SUCCESS));
+
+    try {
+        aitt::MosquittoMQ mq(TEST_CLIENT_ID, true);
+        mq.Connect(TEST_HOST, TEST_PORT, "", "");
+        void *handle = mq.Subscribe(
+              TEST_TOPIC,
+              [](aitt::MSG *info, const std::string &topic, const void *msg, const int szmsg,
+                    const void *cbdata) -> void {},
+              nullptr, AITT_QOS_AT_MOST_ONCE);
+        mq.Unsubscribe(handle);
+    } catch (std::exception &e) {
+        FAIL() << "Unexpected exception: " << e.what();
+    }
+}
+
+TEST_F(MQMockTest, Positive_Create_Anytime)
+{
+    EXPECT_CALL(GetMock(), mosquitto_lib_init()).WillOnce(Return(MOSQ_ERR_SUCCESS));
+    EXPECT_CALL(GetMock(), mosquitto_new(testing::StrEq(TEST_CLIENT_ID), true, testing::_))
+          .WillOnce(Return(TEST_HANDLE));
+    EXPECT_CALL(GetMock(),
+          mosquitto_int_option(TEST_HANDLE, MOSQ_OPT_PROTOCOL_VERSION, MQTT_PROTOCOL_V5))
+          .Times(1);
+    EXPECT_CALL(GetMock(), mosquitto_message_v5_callback_set(TEST_HANDLE, testing::_)).Times(1);
+    EXPECT_CALL(GetMock(), mosquitto_destroy(TEST_HANDLE)).Times(1);
+    EXPECT_CALL(GetMock(), mosquitto_lib_cleanup()).WillOnce(Return(MOSQ_ERR_SUCCESS));
+
+    try {
+        aitt::MosquittoMQ mq(TEST_CLIENT_ID, true);
+    } catch (std::exception &e) {
+        FAIL() << "Unexpected exception occurred";
+    }
+}
+
+TEST_F(MQMockTest, Negative_Connect_will_set_Anytime)
+{
+    EXPECT_CALL(GetMock(), mosquitto_lib_init()).WillOnce(Return(MOSQ_ERR_SUCCESS));
+    EXPECT_CALL(GetMock(), mosquitto_new(testing::StrEq(TEST_CLIENT_ID), true, testing::_))
+          .WillOnce(Return(TEST_HANDLE));
+    EXPECT_CALL(GetMock(), mosquitto_message_v5_callback_set(TEST_HANDLE, testing::_)).Times(1);
+    EXPECT_CALL(GetMock(), mosquitto_will_set(TEST_HANDLE, testing::StrEq("lastWill"),
+                                 sizeof(TEST_PAYLOAD), TEST_PAYLOAD, AITT_QOS_AT_MOST_ONCE, true))
+          .WillOnce(Return(MOSQ_ERR_NOMEM));
+    EXPECT_CALL(GetMock(), mosquitto_destroy(TEST_HANDLE)).Times(1);
+    EXPECT_CALL(GetMock(), mosquitto_lib_cleanup()).WillOnce(Return(MOSQ_ERR_SUCCESS));
+    try {
+        aitt::MosquittoMQ mq(TEST_CLIENT_ID, true);
+        mq.SetWillInfo("lastWill", TEST_PAYLOAD, sizeof(TEST_PAYLOAD), AITT_QOS_AT_MOST_ONCE, true);
+        mq.Connect(TEST_HOST, TEST_PORT, "", "");
+        FAIL() << "Connect() must be failed";
+    } catch (std::exception &e) {
+        ASSERT_STREQ(e.what(), "MQTT failure");
+    }
+}
+
+TEST_F(MQMockTest, Positive_Connect_Anytime)
+{
+    EXPECT_CALL(GetMock(), mosquitto_lib_init()).WillOnce(Return(MOSQ_ERR_SUCCESS));
+    EXPECT_CALL(GetMock(), mosquitto_new(testing::StrEq(TEST_CLIENT_ID), true, testing::_))
+          .WillOnce(Return(TEST_HANDLE));
+    EXPECT_CALL(GetMock(), mosquitto_message_v5_callback_set(TEST_HANDLE, testing::_)).Times(1);
+    EXPECT_CALL(GetMock(), mosquitto_connect(TEST_HANDLE, testing::StrEq(TEST_HOST), TEST_PORT, 60))
+          .WillOnce(Return(MOSQ_ERR_SUCCESS));
+    EXPECT_CALL(GetMock(), mosquitto_destroy(TEST_HANDLE)).Times(1);
+    EXPECT_CALL(GetMock(), mosquitto_lib_cleanup()).WillOnce(Return(MOSQ_ERR_SUCCESS));
+    try {
+        aitt::MosquittoMQ mq(TEST_CLIENT_ID, true);
+        mq.Connect(TEST_HOST, TEST_PORT, "", "");
+    } catch (std::exception &e) {
+        FAIL() << "Unepxected exception: " << e.what();
+    }
+}
+
+TEST_F(MQMockTest, Positive_Connect_User_Anytime)
+{
+    std::string username = "test";
+    std::string password = "test";
+    EXPECT_CALL(GetMock(), mosquitto_lib_init()).WillOnce(Return(MOSQ_ERR_SUCCESS));
+    EXPECT_CALL(GetMock(), mosquitto_new(testing::StrEq(TEST_CLIENT_ID), true, testing::_))
+          .WillOnce(Return(TEST_HANDLE));
+    EXPECT_CALL(GetMock(), mosquitto_message_v5_callback_set(TEST_HANDLE, testing::_)).Times(1);
+    EXPECT_CALL(GetMock(),
+          mosquitto_username_pw_set(TEST_HANDLE, username.c_str(), password.c_str()))
+          .Times(1);
+    EXPECT_CALL(GetMock(), mosquitto_connect(TEST_HANDLE, testing::StrEq(TEST_HOST), TEST_PORT, 60))
+          .WillOnce(Return(MOSQ_ERR_SUCCESS));
+    EXPECT_CALL(GetMock(), mosquitto_destroy(TEST_HANDLE)).Times(1);
+    EXPECT_CALL(GetMock(), mosquitto_lib_cleanup()).WillOnce(Return(MOSQ_ERR_SUCCESS));
+    try {
+        aitt::MosquittoMQ mq(TEST_CLIENT_ID, true);
+        mq.Connect(TEST_HOST, TEST_PORT, username, password);
+    } catch (std::exception &e) {
+        FAIL() << "Unepxected exception: " << e.what();
+    }
+}
+
+TEST_F(MQMockTest, Positive_Disconnect_Anytime)
+{
+    EXPECT_CALL(GetMock(), mosquitto_lib_init()).WillOnce(Return(MOSQ_ERR_SUCCESS));
+    EXPECT_CALL(GetMock(), mosquitto_new(testing::StrEq(TEST_CLIENT_ID), true, testing::_))
+          .WillOnce(Return(TEST_HANDLE));
+    EXPECT_CALL(GetMock(), mosquitto_message_v5_callback_set(TEST_HANDLE, testing::_)).Times(1);
+    EXPECT_CALL(GetMock(), mosquitto_disconnect(testing::_)).WillOnce(Return(MOSQ_ERR_SUCCESS));
+    EXPECT_CALL(GetMock(), mosquitto_will_clear(TEST_HANDLE)).WillOnce(Return(MOSQ_ERR_SUCCESS));
+    EXPECT_CALL(GetMock(), mosquitto_destroy(TEST_HANDLE)).Times(1);
+    EXPECT_CALL(GetMock(), mosquitto_lib_cleanup()).WillOnce(Return(MOSQ_ERR_SUCCESS));
+    try {
+        aitt::MosquittoMQ mq(TEST_CLIENT_ID, true);
+        mq.Disconnect();
+    } catch (std::exception &e) {
+        FAIL() << "Unexpected exception: " << e.what();
+    }
+}
diff --git a/tests/MosquittoMQ_test.cc b/tests/MosquittoMQ_test.cc
new file mode 100644 (file)
index 0000000..d0937b2
--- /dev/null
@@ -0,0 +1,75 @@
+/*
+ * Copyright (c) 2022 Samsung Electronics Co., Ltd All Rights Reserved
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "MosquittoMQ.h"
+
+#include <gtest/gtest.h>
+
+#include "aitt_internal.h"
+#include "aitt_tests.h"
+
+using MosquittoMQ = aitt::MosquittoMQ;
+
+class MQTest : public testing::Test, public AittTests {
+  protected:
+    void SetUp() override { Init(); }
+    void TearDown() override { Deinit(); }
+};
+
+TEST_F(MQTest, Positve_Subscribe_in_Subscribe_Anytime)
+{
+    try {
+        MosquittoMQ mq("MQ_TEST_ID");
+        mq.Connect(LOCAL_IP, 1883, "", "");
+        mq.Subscribe(
+              "MQ_TEST_TOPIC1",
+              [&](aitt::MSG *handle, const std::string &topic, const void *data,
+                    const size_t datalen, void *user_data) {
+                  DBG("Subscribe invoked: %s %zu", static_cast<const char *>(data), datalen);
+
+                  mq.Subscribe(
+                        "topic1InCallback",
+                        [](aitt::MSG *handle, const std::string &topic, const void *msg,
+                              const size_t szmsg, void *cbdata) {},
+                        user_data);
+
+                  mq.Subscribe(
+                        "topic2InCallback",
+                        [](aitt::MSG *handle, const std::string &topic, const void *msg,
+                              const size_t szmsg, void *cbdata) {},
+                        user_data);
+                  g_timeout_add(
+                        100,
+                        [](gpointer cbdata) -> gboolean {
+                            MQTest *test = static_cast<MQTest *>(cbdata);
+                            test->ToggleReady();
+                            return G_SOURCE_REMOVE;
+                        },
+                        user_data);
+              },
+              static_cast<void *>(this));
+
+        DBG("Publish message to %s (%s)", "MQ_TEST_TOPIC1", TEST_MSG);
+        mq.Publish("MQ_TEST_TOPIC1", TEST_MSG, sizeof(TEST_MSG));
+
+        g_timeout_add(10, AittTests::ReadyCheck, static_cast<AittTests *>(this));
+
+        IterateEventLoop();
+
+        ASSERT_TRUE(ready);
+    } catch (std::exception &e) {
+        FAIL() << "Unexpected exception: " << e.what();
+    }
+}