apply MQ modularization
authorYoungjae Shin <yj99.shin@samsung.com>
Mon, 22 Aug 2022 01:29:53 +0000 (10:29 +0900)
committerYoungjae Shin <yj99.shin@samsung.com>
Thu, 15 Sep 2022 05:29:05 +0000 (14:29 +0900)
- apply MQProxy

17 files changed:
common/AittDiscovery.cc
common/AittDiscovery.h
common/MQProxy.cc [new file with mode: 0644]
common/MQProxy.h [new file with mode: 0644]
common/ModuleLoader.cc
common/ModuleLoader.h
common/MosquittoMQ.cc [moved from common/MQ.cc with 84% similarity]
common/MosquittoMQ.h [moved from common/MQ.h with 87% similarity]
include/MQ.h [new file with mode: 0644]
modules/webrtc/MqttServer.cc
modules/webrtc/MqttServer.h
src/AITTImpl.cc
src/AITTImpl.h
tests/CMakeLists.txt
tests/ModuleLoader_test.cc
tests/MosquittoMQ_mocktest.cc [moved from tests/MQ_mocktest.cc with 93% similarity]
tests/MosquittoMQ_test.cc [moved from tests/MQ_test.cc with 96% similarity]

index 8f75103..3d80864 100644 (file)
 #include <atomic>
 
 #include "AittException.h"
+#include "MQProxy.h"
 #include "aitt_internal.h"
 
 namespace aitt {
 
 AittDiscovery::AittDiscovery(const std::string &id, bool custom_broker)
-      : id_(id), discovery_mq(id + "d", true), callback_handle(nullptr)
+      : id_(id), discovery_mq(new MQProxy(id + "d", true, custom_broker)), callback_handle(nullptr)
 {
 }
 
@@ -34,19 +35,19 @@ void AittDiscovery::Start(const std::string &host, int port, const std::string &
 {
     RET_IF(callback_handle);
 
-    discovery_mq.SetWillInfo(DISCOVERY_TOPIC_BASE + id_, nullptr, 0, AITT_QOS_EXACTLY_ONCE, true);
-    discovery_mq.Connect(host, port, username, password);
+    discovery_mq->SetWillInfo(DISCOVERY_TOPIC_BASE + id_, nullptr, 0, AITT_QOS_EXACTLY_ONCE, true);
+    discovery_mq->Connect(host, port, username, password);
 
-    callback_handle = discovery_mq.Subscribe(DISCOVERY_TOPIC_BASE + "+", DiscoveryMessageCallback,
+    callback_handle = discovery_mq->Subscribe(DISCOVERY_TOPIC_BASE + "+", DiscoveryMessageCallback,
           static_cast<void *>(this), AITT_QOS_EXACTLY_ONCE);
 }
 
 void AittDiscovery::Stop()
 {
-    discovery_mq.Publish(DISCOVERY_TOPIC_BASE + id_, nullptr, 0, AITT_QOS_EXACTLY_ONCE, true);
-    discovery_mq.Unsubscribe(callback_handle);
+    discovery_mq->Publish(DISCOVERY_TOPIC_BASE + id_, nullptr, 0, AITT_QOS_EXACTLY_ONCE, true);
+    discovery_mq->Unsubscribe(callback_handle);
     callback_handle = nullptr;
-    discovery_mq.Disconnect();
+    discovery_mq->Disconnect();
 }
 
 void AittDiscovery::UpdateDiscoveryMsg(AittProtocol protocol, const void *msg, size_t length)
@@ -139,7 +140,7 @@ void AittDiscovery::PublishDiscoveryMsg()
     fbb.Finish();
 
     auto buf = fbb.GetBuffer();
-    discovery_mq.Publish(DISCOVERY_TOPIC_BASE + id_, buf.data(), buf.size(), AITT_QOS_EXACTLY_ONCE,
+    discovery_mq->Publish(DISCOVERY_TOPIC_BASE + id_, buf.data(), buf.size(), AITT_QOS_EXACTLY_ONCE,
           true);
 }
 
index fd6ba1d..4a3797d 100644 (file)
@@ -56,7 +56,7 @@ class AittDiscovery {
     AittProtocol GetProtocol(const std::string &protocol_str);
 
     std::string id_;
-    MQ discovery_mq;
+    std::unique_ptr<MQ> discovery_mq;
     void *callback_handle;
     std::map<AittProtocol, DiscoveryBlob> discovery_map;
     std::map<int, std::pair<AittProtocol, DiscoveryCallback>> callbacks;
diff --git a/common/MQProxy.cc b/common/MQProxy.cc
new file mode 100644 (file)
index 0000000..0c259de
--- /dev/null
@@ -0,0 +1,86 @@
+/*
+ * Copyright (c) 2022 Samsung Electronics Co., Ltd All Rights Reserved
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "MQProxy.h"
+
+#include "ModuleLoader.h"
+#include "MosquittoMQ.h"
+
+namespace aitt {
+
+MQProxy::MQProxy(const std::string &id, bool clear_session, bool is_custom_broker)
+      : handle(nullptr, nullptr)
+{
+    if (is_custom_broker) {
+        ModuleLoader loader;
+        handle = loader.OpenModule(ModuleLoader::TYPE_CUSTOM_MQTT);
+
+        mq = loader.LoadMqttClient(handle.get(), "test", true);
+    } else {
+        mq = std::shared_ptr<MQ>(new MosquittoMQ(id, clear_session));
+    }
+}
+
+void MQProxy::SetConnectionCallback(const MQConnectionCallback &cb)
+{
+    mq->SetConnectionCallback(cb);
+}
+
+void MQProxy::Connect(const std::string &host, int port, const std::string &username,
+      const std::string &password)
+{
+    mq->Connect(host, port, username, password);
+}
+
+void MQProxy::SetWillInfo(const std::string &topic, const void *msg, size_t szmsg, int qos,
+      bool retain)
+{
+    mq->SetWillInfo(topic, msg, szmsg, qos, retain);
+}
+
+void MQProxy::Disconnect(void)
+{
+    mq->Disconnect();
+}
+
+void MQProxy::Publish(const std::string &topic, const void *data, const size_t datalen, int qos,
+      bool retain)
+{
+    mq->Publish(topic, data, datalen, qos, retain);
+}
+
+void MQProxy::PublishWithReply(const std::string &topic, const void *data, const size_t datalen,
+      int qos, bool retain, const std::string &reply_topic, const std::string &correlation)
+{
+    mq->PublishWithReply(topic, data, datalen, qos, retain, reply_topic, correlation);
+}
+
+void MQProxy::SendReply(MSG *msg, const void *data, const size_t datalen, int qos, bool retain)
+{
+    mq->SendReply(msg, data, datalen, qos, retain);
+}
+
+void *MQProxy::Subscribe(const std::string &topic, const SubscribeCallback &cb, void *user_data,
+      int qos)
+{
+    return mq->Subscribe(topic, cb, user_data, qos);
+}
+
+void *MQProxy::Unsubscribe(void *handle)
+{
+    return mq->Unsubscribe(handle);
+}
+
+}  // namespace aitt
diff --git a/common/MQProxy.h b/common/MQProxy.h
new file mode 100644 (file)
index 0000000..68a30f2
--- /dev/null
@@ -0,0 +1,48 @@
+/*
+ * Copyright (c) 2022 Samsung Electronics Co., Ltd All Rights Reserved
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#pragma once
+
+#include <memory>
+
+#include "MQ.h"
+
+namespace aitt {
+
+class MQProxy : public MQ {
+  public:
+    explicit MQProxy(const std::string &id, bool clear_session, bool custom_broker);
+    virtual ~MQProxy() = default;
+
+    void SetConnectionCallback(const MQConnectionCallback &cb);
+    void Connect(const std::string &host, int port, const std::string &username,
+          const std::string &password);
+    void SetWillInfo(const std::string &topic, const void *msg, size_t szmsg, int qos, bool retain);
+    void Disconnect(void);
+    void Publish(const std::string &topic, const void *data, const size_t datalen, int qos = 0,
+          bool retain = false);
+    void PublishWithReply(const std::string &topic, const void *data, const size_t datalen, int qos,
+          bool retain, const std::string &reply_topic, const std::string &correlation);
+    void SendReply(MSG *msg, const void *data, const size_t datalen, int qos, bool retain);
+    void *Subscribe(const std::string &topic, const SubscribeCallback &cb,
+          void *user_data = nullptr, int qos = 0);
+    void *Unsubscribe(void *handle);
+
+  private:
+    std::unique_ptr<void, void (*)(const void *)> handle;
+    std::shared_ptr<MQ> mq;
+};
+
+}  // namespace aitt
index c62229d..51cdc78 100644 (file)
@@ -31,6 +31,8 @@ std::string ModuleLoader::GetModuleFilename(Type type)
         return "libaitt-transport-tcp.so";
     if (type == TYPE_WEBRTC)
         return "libaitt-transport-webrtc.so";
+    if (type == TYPE_CUSTOM_MQTT)
+        return "libaitt-st-broker.so";
 
     return std::string("Unknown");
 }
@@ -76,4 +78,24 @@ std::shared_ptr<AittTransport> ModuleLoader::LoadTransport(void *handle, const s
     return instance;
 }
 
+std::shared_ptr<MQ> ModuleLoader::LoadMqttClient(void *handle, const std::string &id,
+      bool clear_session)
+{
+    MQ::ModuleEntry get_instance_fn =
+          reinterpret_cast<MQ::ModuleEntry>(dlsym(handle, MQ::MODULE_ENTRY_NAME));
+    if (get_instance_fn == nullptr) {
+        ERR("dlsym: %s", dlerror());
+        throw AittException(AittException::SYSTEM_ERR);
+    }
+
+    std::shared_ptr<MQ> instance(static_cast<MQ *>(get_instance_fn(id.c_str(), clear_session)),
+          [](const MQ *instance) { delete instance; });
+    if (instance == nullptr) {
+        ERR("Failed to create a new instance");
+        throw AittException(AittException::SYSTEM_ERR);
+    }
+
+    return instance;
+}
+
 }  // namespace aitt
index b1ecfb3..66e339d 100644 (file)
@@ -32,6 +32,7 @@ class ModuleLoader {
         TYPE_WEBRTC,
         TYPE_RTSP,
         TYPE_TRANSPORT_MAX,
+        TYPE_CUSTOM_MQTT,
     };
 
     using ModuleHandle = std::unique_ptr<void, void (*)(const void *)>;
@@ -42,6 +43,7 @@ class ModuleLoader {
     ModuleHandle OpenModule(Type type);
     std::shared_ptr<AittTransport> LoadTransport(void *handle, const std::string &ip,
           AittDiscovery &discovery);
+    std::shared_ptr<MQ> LoadMqttClient(void *handle, const std::string &id, bool clear_session);
 
   private:
     std::string GetModuleFilename(Type type);
similarity index 84%
rename from common/MQ.cc
rename to common/MosquittoMQ.cc
index 951b9f1..f10532e 100644 (file)
@@ -13,7 +13,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-#include "MQ.h"
+#include "MosquittoMQ.h"
 
 #include <mqtt_protocol.h>
 #include <sys/types.h>
 
 #include "AittException.h"
 #include "AittTypes.h"
+#include "AittUtil.h"
 #include "aitt_internal.h"
 
 namespace aitt {
 
-const std::string MQ::REPLY_SEQUENCE_NUM_KEY = "sequenceNum";
-const std::string MQ::REPLY_IS_END_SEQUENCE_KEY = "isEndSequence";
+const std::string MosquittoMQ::REPLY_SEQUENCE_NUM_KEY = "sequenceNum";
+const std::string MosquittoMQ::REPLY_IS_END_SEQUENCE_KEY = "isEndSequence";
 
-MQ::MQ(const std::string &id, bool clear_session)
+MosquittoMQ::MosquittoMQ(const std::string &id, bool clear_session)
       : handle(nullptr),
         keep_alive(60),
         subscribers_iterating(false),
@@ -74,10 +75,10 @@ MQ::MQ(const std::string &id, bool clear_session)
 
     mosquitto_destroy(handle);
     mosquitto_lib_cleanup();
-    throw AittException(AittException::MQTT_ERR, std::string("MQ Constructor Error"));
+    throw AittException(AittException::MQTT_ERR, std::string("MosquittoMQ Constructor Error"));
 }
 
-MQ::~MQ(void)
+MosquittoMQ::~MosquittoMQ(void)
 {
     int ret;
     INFO("Destructor");
@@ -98,17 +99,17 @@ MQ::~MQ(void)
         ERR("mosquitto_lib_cleanup() Fail(%s)", mosquitto_strerror(ret));
 }
 
-void MQ::SetConnectionCallback(const MQConnectionCallback &cb)
+void MosquittoMQ::SetConnectionCallback(const MQConnectionCallback &cb)
 {
     std::lock_guard<std::recursive_mutex> lock_from_here(callback_lock);
     connect_cb = cb;
 }
 
-void MQ::ConnectCallback(struct mosquitto *mosq, void *obj, int rc, int flag,
+void MosquittoMQ::ConnectCallback(struct mosquitto *mosq, void *obj, int rc, int flag,
       const mosquitto_property *props)
 {
     RET_IF(obj == nullptr);
-    MQ *mq = static_cast<MQ *>(obj);
+    MosquittoMQ *mq = static_cast<MosquittoMQ *>(obj);
 
     INFO("Connected : rc(%d), flag(%d)", rc, flag);
 
@@ -117,11 +118,11 @@ void MQ::ConnectCallback(struct mosquitto *mosq, void *obj, int rc, int flag,
         mq->connect_cb(AITT_CONNECTED);
 }
 
-void MQ::DisconnectCallback(struct mosquitto *mosq, void *obj, int rc,
+void MosquittoMQ::DisconnectCallback(struct mosquitto *mosq, void *obj, int rc,
       const mosquitto_property *props)
 {
     RET_IF(obj == nullptr);
-    MQ *mq = static_cast<MQ *>(obj);
+    MosquittoMQ *mq = static_cast<MosquittoMQ *>(obj);
 
     INFO("Disconnected : rc(%d)", rc);
 
@@ -130,7 +131,7 @@ void MQ::DisconnectCallback(struct mosquitto *mosq, void *obj, int rc,
         mq->connect_cb(AITT_DISCONNECTED);
 }
 
-void MQ::Connect(const std::string &host, int port, const std::string &username,
+void MosquittoMQ::Connect(const std::string &host, int port, const std::string &username,
       const std::string &password)
 {
     int ret;
@@ -151,7 +152,8 @@ void MQ::Connect(const std::string &host, int port, const std::string &username,
     }
 }
 
-void MQ::SetWillInfo(const std::string &topic, const void *msg, size_t szmsg, int qos, bool retain)
+void MosquittoMQ::SetWillInfo(const std::string &topic, const void *msg, size_t szmsg, int qos,
+      bool retain)
 {
     int ret = mosquitto_will_set(handle, topic.c_str(), szmsg, msg, qos, retain);
     if (ret != MOSQ_ERR_SUCCESS) {
@@ -160,7 +162,7 @@ void MQ::SetWillInfo(const std::string &topic, const void *msg, size_t szmsg, in
     }
 }
 
-void MQ::Disconnect(void)
+void MosquittoMQ::Disconnect(void)
 {
     int ret = mosquitto_disconnect(handle);
     if (ret != MOSQ_ERR_SUCCESS) {
@@ -171,11 +173,11 @@ void MQ::Disconnect(void)
     mosquitto_will_clear(handle);
 }
 
-void MQ::MessageCallback(mosquitto *handle, void *obj, const mosquitto_message *msg,
+void MosquittoMQ::MessageCallback(mosquitto *handle, void *obj, const mosquitto_message *msg,
       const mosquitto_property *props)
 {
     RET_IF(obj == nullptr);
-    MQ *mq = static_cast<MQ *>(obj);
+    MosquittoMQ *mq = static_cast<MosquittoMQ *>(obj);
 
     std::lock_guard<std::recursive_mutex> auto_lock(mq->callback_lock);
     mq->subscribers_iterating = true;
@@ -185,7 +187,7 @@ void MQ::MessageCallback(mosquitto *handle, void *obj, const mosquitto_message *
         if (nullptr == subscribe_data)
             ERR("end() is not valid because elements were added.");
 
-        bool result = CompareTopic(subscribe_data->topic.c_str(), msg->topic);
+        bool result = AittUtil::CompareTopic(subscribe_data->topic.c_str(), msg->topic);
         if (result)
             mq->InvokeCallback(msg, props);
 
@@ -200,7 +202,7 @@ void MQ::MessageCallback(mosquitto *handle, void *obj, const mosquitto_message *
     mq->new_subscribers.clear();
 }
 
-void MQ::InvokeCallback(const mosquitto_message *msg, const mosquitto_property *props)
+void MosquittoMQ::InvokeCallback(const mosquitto_message *msg, const mosquitto_property *props)
 {
     MSG mq_msg;
     mq_msg.SetTopic(msg->topic);
@@ -250,7 +252,7 @@ void MQ::InvokeCallback(const mosquitto_message *msg, const mosquitto_property *
     cb_info->cb(&mq_msg, msg->topic, msg->payload, msg->payloadlen, cb_info->user_data);
 }
 
-void MQ::Publish(const std::string &topic, const void *data, const size_t datalen, int qos,
+void MosquittoMQ::Publish(const std::string &topic, const void *data, const size_t datalen, int qos,
       bool retain)
 {
     int mid = -1;
@@ -261,8 +263,8 @@ void MQ::Publish(const std::string &topic, const void *data, const size_t datale
     }
 }
 
-void MQ::PublishWithReply(const std::string &topic, const void *data, const size_t datalen, int qos,
-      bool retain, const std::string &reply_topic, const std::string &correlation)
+void MosquittoMQ::PublishWithReply(const std::string &topic, const void *data, const size_t datalen,
+      int qos, bool retain, const std::string &reply_topic, const std::string &correlation)
 {
     int ret;
     int mid = -1;
@@ -287,7 +289,7 @@ void MQ::PublishWithReply(const std::string &topic, const void *data, const size
     }
 }
 
-void MQ::SendReply(MSG *msg, const void *data, const size_t datalen, int qos, bool retain)
+void MosquittoMQ::SendReply(MSG *msg, const void *data, const size_t datalen, int qos, bool retain)
 {
     RET_IF(msg == nullptr);
 
@@ -325,7 +327,8 @@ void MQ::SendReply(MSG *msg, const void *data, const size_t datalen, int qos, bo
     }
 }
 
-void *MQ::Subscribe(const std::string &topic, const SubscribeCallback &cb, void *user_data, int qos)
+void *MosquittoMQ::Subscribe(const std::string &topic, const SubscribeCallback &cb, void *user_data,
+      int qos)
 {
     int mid = -1;
     int ret = mosquitto_subscribe(handle, &mid, topic.c_str(), qos);
@@ -344,7 +347,7 @@ void *MQ::Subscribe(const std::string &topic, const SubscribeCallback &cb, void
     return static_cast<void *>(data);
 }
 
-void *MQ::Unsubscribe(void *sub_handle)
+void *MosquittoMQ::Unsubscribe(void *sub_handle)
 {
     std::lock_guard<std::recursive_mutex> auto_lock(callback_lock);
     auto it = std::find(subscribers.begin(), subscribers.end(),
@@ -378,20 +381,8 @@ void *MQ::Unsubscribe(void *sub_handle)
     return user_data;
 }
 
-bool MQ::CompareTopic(const std::string &left, const std::string &right)
-{
-    bool result = false;
-    int ret = mosquitto_topic_matches_sub(left.c_str(), right.c_str(), &result);
-    if (ret != MOSQ_ERR_SUCCESS) {
-        ERR("mosquitto_topic_matches_sub(%s, %s) Fail(%s)", left.c_str(), right.c_str(),
-              mosquitto_strerror(ret));
-        throw AittException(AittException::MQTT_ERR);
-    }
-    return result;
-}
-
-MQ::SubscribeData::SubscribeData(const std::string &in_topic, const SubscribeCallback &in_cb,
-      void *in_user_data)
+MosquittoMQ::SubscribeData::SubscribeData(const std::string &in_topic,
+      const SubscribeCallback &in_cb, void *in_user_data)
       : topic(in_topic), cb(in_cb), user_data(in_user_data)
 {
 }
similarity index 87%
rename from common/MQ.h
rename to common/MosquittoMQ.h
index bdcf939..6c93809 100644 (file)
@@ -22,6 +22,7 @@
 #include <string>
 #include <vector>
 
+#include "MQ.h"
 #include "MSG.h"
 
 #define MQTT_LOCALHOST "127.0.0.1"
 
 namespace aitt {
 
-class MQ {
+class MosquittoMQ : public MQ {
   public:
-    using SubscribeCallback = std::function<void(MSG *msg, const std::string &topic,
-          const void *data, const size_t datalen, void *user_data)>;
-    using MQConnectionCallback = std::function<void(int)>;
-
-    explicit MQ(const std::string &id, bool clear_session = false);
-    virtual ~MQ(void);
-
-    static bool CompareTopic(const std::string &left, const std::string &right);
+    explicit MosquittoMQ(const std::string &id, bool clear_session = false);
+    virtual ~MosquittoMQ(void);
 
     void SetConnectionCallback(const MQConnectionCallback &cb);
     void Connect(const std::string &host, int port, const std::string &username,
diff --git a/include/MQ.h b/include/MQ.h
new file mode 100644 (file)
index 0000000..644c8d1
--- /dev/null
@@ -0,0 +1,62 @@
+/*
+ * Copyright (c) 2021-2022 Samsung Electronics Co., Ltd All Rights Reserved
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#pragma once
+
+#include <MSG.h>
+
+#include <functional>
+#include <string>
+
+#define AITT_MQ_NEW aitt_mq_new
+#define TO_STR(s) #s
+#define DEFINE_TO_STR(x) TO_STR(x)
+
+namespace aitt {
+
+class MQ {
+  public:
+    typedef void *(*ModuleEntry)(const char *id, bool clear_session);
+
+    using SubscribeCallback = std::function<void(MSG *msg, const std::string &topic,
+          const void *data, const size_t datalen, void *user_data)>;
+    using MQConnectionCallback = std::function<void(int)>;
+
+    static constexpr const char *const MODULE_ENTRY_NAME = DEFINE_TO_STR(AITT_MQ_NEW);
+
+    MQ() = default;
+    virtual ~MQ() = default;
+
+    virtual void SetConnectionCallback(const MQConnectionCallback &cb) = 0;
+    virtual void Connect(const std::string &host, int port, const std::string &username,
+          const std::string &password) = 0;
+    virtual void SetWillInfo(const std::string &topic, const void *msg, size_t szmsg, int qos,
+          bool retain) = 0;
+    virtual void Disconnect(void) = 0;
+    virtual void Publish(const std::string &topic, const void *data, const size_t datalen,
+          int qos = 0, bool retain = false) = 0;
+    virtual void PublishWithReply(const std::string &topic, const void *data, const size_t datalen,
+          int qos, bool retain, const std::string &reply_topic, const std::string &correlation) = 0;
+    virtual void SendReply(MSG *msg, const void *data, const size_t datalen, int qos,
+          bool retain) = 0;
+    virtual void *Subscribe(const std::string &topic, const SubscribeCallback &cb,
+          void *user_data = nullptr, int qos = 0) = 0;
+    virtual void *Unsubscribe(void *handle) = 0;
+};
+
+}  // namespace aitt
+
+#undef TO_STR
+#undef DEFINE_TO_STR
index 70a07ed..a0afae5 100644 (file)
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 #include "MqttServer.h"
 
+#include "MQProxy.h"
 #include "aitt_internal.h"
 
 #define MQTT_HANDLER_MSG_QOS 1
 #define MQTT_HANDLER_MGMT_QOS 2
 
-MqttServer::MqttServer(const Config &config) : mq(config.GetLocalId(), true)
+MqttServer::MqttServer(const Config &config)
+      : mq(new aitt::MQProxy(config.GetLocalId(), true, false))
 {
     broker_ip_ = config.GetBrokerIp();
     broker_port_ = config.GetBrokerPort();
@@ -33,7 +34,7 @@ MqttServer::MqttServer(const Config &config) : mq(config.GetLocalId(), true)
     DBG("ID[%s] BROKER IP[%s] BROKER PORT [%d] ROOM[%s] %s", id_.c_str(), broker_ip_.c_str(),
           broker_port_, room_id_.c_str(), is_publisher_ ? "Publisher" : "Subscriber");
 
-    mq.SetConnectionCallback(std::bind(&MqttServer::ConnectCallBack, this, std::placeholders::_1));
+    mq->SetConnectionCallback(std::bind(&MqttServer::ConnectCallBack, this, std::placeholders::_1));
 }
 
 MqttServer::~MqttServer()
@@ -96,10 +97,10 @@ void MqttServer::RegisterWithServer(void)
     // Notify Who is source?
     std::string source_topic = room_id_ + std::string("/source");
     if (is_publisher_) {
-        mq.Publish(source_topic, id_.c_str(), id_.size(), AITT_QOS_EXACTLY_ONCE, true);
+        mq->Publish(source_topic, id_.c_str(), id_.size(), AITT_QOS_EXACTLY_ONCE, true);
         SetConnectionState(ConnectionState::Registered);
     } else {
-        mq.Subscribe(source_topic,
+        mq->Subscribe(source_topic,
               std::bind(&MqttServer::HandleSourceTopic, this, std::placeholders::_1,
                     std::placeholders::_2, std::placeholders::_3, std::placeholders::_4,
                     std::placeholders::_5),
@@ -136,11 +137,11 @@ bool MqttServer::IsConnected(void)
 int MqttServer::Connect(void)
 {
     std::string will_message = std::string("ROOM_PEER_LEFT ") + id_;
-    mq.SetWillInfo(room_id_, will_message.c_str(), will_message.size(), AITT_QOS_EXACTLY_ONCE,
+    mq->SetWillInfo(room_id_, will_message.c_str(), will_message.size(), AITT_QOS_EXACTLY_ONCE,
           false);
 
     SetConnectionState(ConnectionState::Connecting);
-    mq.Connect(broker_ip_, broker_port_, std::string(), std::string());
+    mq->Connect(broker_ip_, broker_port_, std::string(), std::string());
 
     return 0;
 }
@@ -150,13 +151,13 @@ int MqttServer::Disconnect(void)
     if (is_publisher_) {
         INFO("remove retained");
         std::string source_topic = room_id_ + std::string("/source");
-        mq.Publish(source_topic, nullptr, 0, AITT_QOS_AT_LEAST_ONCE, true);
+        mq->Publish(source_topic, nullptr, 0, AITT_QOS_AT_LEAST_ONCE, true);
     }
 
     std::string left_message = std::string("ROOM_PEER_LEFT ") + id_;
-    mq.Publish(room_id_, left_message.c_str(), left_message.size(), AITT_QOS_AT_LEAST_ONCE, false);
+    mq->Publish(room_id_, left_message.c_str(), left_message.size(), AITT_QOS_AT_LEAST_ONCE, false);
 
-    mq.Disconnect();
+    mq->Disconnect();
 
     room_id_ = std::string("");
 
@@ -177,7 +178,7 @@ int MqttServer::SendMessage(const std::string &peer_id, const std::string &msg)
 
     std::string receiver_topic = room_id_ + std::string("/") + peer_id;
     std::string server_formatted_msg = "ROOM_PEER_MSG " + id_ + " " + msg;
-    mq.Publish(receiver_topic, server_formatted_msg.c_str(), server_formatted_msg.size(),
+    mq->Publish(receiver_topic, server_formatted_msg.c_str(), server_formatted_msg.size(),
           AITT_QOS_AT_LEAST_ONCE);
 
     return 0;
@@ -220,7 +221,7 @@ void MqttServer::JoinRoom(const std::string &room_id)
     }
 
     // Subscribe PEER_JOIN PEER_LEFT
-    mq.Subscribe(room_id_,
+    mq->Subscribe(room_id_,
           std::bind(&MqttServer::HandleRoomTopic, this, std::placeholders::_1,
                 std::placeholders::_2, std::placeholders::_3, std::placeholders::_4,
                 std::placeholders::_5),
@@ -228,7 +229,7 @@ void MqttServer::JoinRoom(const std::string &room_id)
 
     // Subscribe PEER_MSG
     std::string receiving_topic = room_id + std::string("/") + id_;
-    mq.Subscribe(receiving_topic,
+    mq->Subscribe(receiving_topic,
           std::bind(&MqttServer::HandleMessageTopic, this, std::placeholders::_1,
                 std::placeholders::_2, std::placeholders::_3, std::placeholders::_4,
                 std::placeholders::_5),
@@ -238,7 +239,7 @@ void MqttServer::JoinRoom(const std::string &room_id)
 
     if (!is_publisher_) {
         std::string join_message = std::string("ROOM_PEER_JOINED ") + id_;
-        mq.Publish(room_id_, join_message.c_str(), join_message.size(), AITT_QOS_EXACTLY_ONCE);
+        mq->Publish(room_id_, join_message.c_str(), join_message.size(), AITT_QOS_EXACTLY_ONCE);
     }
 }
 
index ad2a3d6..49ab5e9 100644 (file)
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 #pragma once
 
-#include <MQ.h>
+#include <functional>
+#include <memory>
+#include <string>
 
 #include "Config.h"
 #include "IfaceServer.h"
+#include "MQ.h"
 
 class MqttServer : public IfaceServer {
   public:
@@ -70,7 +72,7 @@ class MqttServer : public IfaceServer {
     std::string room_id_;
     std::string source_id_;
     bool is_publisher_;
-    aitt::MQ mq;
+    std::unique_ptr<aitt::MQ> mq;
 
     ConnectionState connection_state_;
     std::function<void(ConnectionState)> connection_state_changed_cb_;
index 18c5a38..ac19457 100644 (file)
@@ -23,6 +23,7 @@
 #include <memory>
 #include <stdexcept>
 
+#include "MQProxy.h"
 #include "aitt_internal.h"
 
 #define WEBRTC_ROOM_ID_PREFIX std::string(AITT_MANAGED_TOPIC_PREFIX "webrtc/room/Room.webrtc")
@@ -34,7 +35,7 @@ AITT::Impl::Impl(AITT &parent, const std::string &id, const std::string &my_ip,
       bool custom_broker)
       : public_api(parent),
         id_(id),
-        mq(id, clear_session),
+        mq(new MQProxy(id, clear_session, custom_broker)),
         discovery(id, custom_broker),
         reply_id(0),
         modules{0}
@@ -81,16 +82,16 @@ void AITT::Impl::ThreadMain(void)
 void AITT::Impl::SetWillInfo(const std::string &topic, const void *data, const size_t datalen,
       AittQoS qos, bool retain)
 {
-    mq.SetWillInfo(topic, data, datalen, qos, retain);
+    mq->SetWillInfo(topic, data, datalen, qos, retain);
 }
 
 void AITT::Impl::SetConnectionCallback(ConnectionCallback cb, void *user_data)
 {
     if (cb)
-        mq.SetConnectionCallback(
+        mq->SetConnectionCallback(
               std::bind(&Impl::ConnectionCB, this, cb, user_data, std::placeholders::_1));
     else
-        mq.SetConnectionCallback(nullptr);
+        mq->SetConnectionCallback(nullptr);
 }
 
 void AITT::Impl::ConnectionCB(ConnectionCallback cb, void *user_data, int status)
@@ -104,7 +105,7 @@ void AITT::Impl::Connect(const std::string &host, int port, const std::string &u
       const std::string &password)
 {
     discovery.Start(host, port, username, password);
-    mq.Connect(host, port, username, password);
+    mq->Connect(host, port, username, password);
 
     mqtt_broker_ip_ = host;
     mqtt_broker_port_ = port;
@@ -117,7 +118,7 @@ void AITT::Impl::Disconnect(void)
     mqtt_broker_ip_.clear();
     mqtt_broker_port_ = -1;
 
-    mq.Disconnect();
+    mq->Disconnect();
     discovery.Stop();
 }
 
@@ -128,7 +129,7 @@ void AITT::Impl::UnsubscribeAll()
     for (auto subscribe_info : subscribed_list) {
         switch (subscribe_info->first) {
         case AITT_TYPE_MQTT:
-            mq.Unsubscribe(subscribe_info->second);
+            mq->Unsubscribe(subscribe_info->second);
             break;
         case AITT_TYPE_TCP:
             GetTransport(ModuleLoader::TYPE_TCP)->Unsubscribe(subscribe_info->second);
@@ -161,7 +162,7 @@ void AITT::Impl::Publish(const std::string &topic, const void *data, const size_
       AittProtocol protocols, AittQoS qos, bool retain)
 {
     if ((protocols & AITT_TYPE_MQTT) == AITT_TYPE_MQTT)
-        mq.Publish(topic, data, datalen, qos, retain);
+        mq->Publish(topic, data, datalen, qos, retain);
 
     if ((protocols & AITT_TYPE_TCP) == AITT_TYPE_TCP)
         GetTransport(ModuleLoader::TYPE_TCP)->Publish(topic, data, datalen, qos, retain);
@@ -224,7 +225,7 @@ AittSubscribeID AITT::Impl::Subscribe(const std::string &topic, const AITT::Subs
 AittSubscribeID AITT::Impl::SubscribeMQ(SubscribeInfo *handle, MainLoopHandler *loop_handle,
       const std::string &topic, const SubscribeCallback &cb, void *user_data, AittQoS qos)
 {
-    return mq.Subscribe(
+    return mq->Subscribe(
           topic,
           [this, handle, loop_handle, cb](MSG *msg, const std::string &topic, const void *data,
                 const size_t datalen, void *mq_user_data) {
@@ -269,7 +270,7 @@ void *AITT::Impl::Unsubscribe(AittSubscribeID subscribe_id)
     SubscribeInfo *found_info = *it;
     switch (found_info->first) {
     case AITT_TYPE_MQTT:
-        user_data = mq.Unsubscribe(found_info->second);
+        user_data = mq->Unsubscribe(found_info->second);
         break;
     case AITT_TYPE_TCP: {
         auto tcpModule = GetTransport(ModuleLoader::TYPE_TCP);
@@ -316,7 +317,7 @@ int AITT::Impl::PublishWithReply(const std::string &topic, const void *data, con
           },
           user_data, protocol, qos);
 
-    mq.PublishWithReply(topic, data, datalen, qos, false, replyTopic, correlation);
+    mq->PublishWithReply(topic, data, datalen, qos, false, replyTopic, correlation);
     return 0;
 }
 
@@ -362,7 +363,7 @@ int AITT::Impl::PublishWithReplySync(const std::string &topic, const void *data,
         subscribed_list.push_back(info);
     }
 
-    mq.PublishWithReply(topic, data, datalen, qos, false, replyTopic, correlation);
+    mq->PublishWithReply(topic, data, datalen, qos, false, replyTopic, correlation);
     if (timeout_ms)
         HandleTimeout(timeout_ms, timeout_id, sync_loop, is_timeout);
 
@@ -398,7 +399,7 @@ void AITT::Impl::SendReply(MSG *msg, const void *data, const int datalen, bool e
         msg->IncreaseSequence();
     msg->SetEndSequence(end);
 
-    mq.SendReply(msg, data, datalen, AITT_QOS_AT_MOST_ONCE, false);
+    mq->SendReply(msg, data, datalen, AITT_QOS_AT_MOST_ONCE, false);
 }
 
 void *AITT::Impl::SubscribeTCP(SubscribeInfo *handle, const std::string &topic,
index 6c47bc1..aa26e3b 100644 (file)
@@ -89,7 +89,7 @@ class AITT::Impl {
     std::string id_;
     std::string mqtt_broker_ip_;
     int mqtt_broker_port_;
-    MQ mq;
+    std::unique_ptr<MQ> mq;
     AittDiscovery discovery;
     unsigned short reply_id;
     ModuleObj *modules[ModuleLoader::TYPE_TRANSPORT_MAX];
index 96dda23..e81fb68 100644 (file)
@@ -7,7 +7,7 @@ INCLUDE_DIRECTORIES(${UT_NEEDS_INCLUDE_DIRS})
 LINK_DIRECTORIES(${UT_NEEDS_LIBRARY_DIRS})
 
 ###########################################################################
-SET(AITT_UT_SRC AITT_test.cc RequestResponse_test.cc MainLoopHandler_test.cc aitt_c_test.cc AITT_TCP_test.cc MQ_test.cc)
+SET(AITT_UT_SRC AITT_test.cc RequestResponse_test.cc MainLoopHandler_test.cc aitt_c_test.cc AITT_TCP_test.cc MosquittoMQ_test.cc)
 ADD_EXECUTABLE(${AITT_UT} ${AITT_UT_SRC})
 TARGET_LINK_LIBRARIES(${AITT_UT} Threads::Threads ${UT_NEEDS_LIBRARIES} ${PROJECT_NAME})
 
@@ -31,8 +31,8 @@ INSTALL(TARGETS ${AITT_UT}_manual DESTINATION ${AITT_TEST_BINDIR})
 
 ###########################################################################
 AUX_SOURCE_DIRECTORY(../mock MOCK_SRC)
-ADD_EXECUTABLE(${AITT_UT}_mq MQ_mocktest.cc ${MOCK_SRC})
-TARGET_LINK_LIBRARIES(${AITT_UT}_mq ${UT_NEEDS_LIBRARIES} Threads::Threads ${AITT_NEEDS_LIBRARIES} ${AITT_COMMON})
+ADD_EXECUTABLE(${AITT_UT}_mq MosquittoMQ_mocktest.cc ${MOCK_SRC})
+TARGET_LINK_LIBRARIES(${AITT_UT}_mq ${UT_NEEDS_LIBRARIES} Threads::Threads ${AITT_COMMON})
 TARGET_INCLUDE_DIRECTORIES(${AITT_UT}_mq PRIVATE ../mock)
 INSTALL(TARGETS ${AITT_UT}_mq DESTINATION ${AITT_TEST_BINDIR})
 
index 12437f3..ccaefdc 100644 (file)
@@ -54,3 +54,24 @@ TEST_F(ModuleLoaderTest, LoadTransport_N_Anytime)
     auto module = loader.LoadTransport(handle.get(), LOCAL_IP, discovery);
     ASSERT_NE(module, nullptr);
 }
+
+TEST_F(ModuleLoaderTest, LoadMqttClient_P_Anytime)
+{
+    ModuleLoader::ModuleHandle handle = loader.OpenModule(ModuleLoader::TYPE_CUSTOM_MQTT);
+    if (handle) {
+        EXPECT_NO_THROW({
+            auto module = loader.LoadMqttClient(handle.get(), "test", false);
+            ASSERT_NE(module, nullptr);
+        });
+    }
+}
+
+TEST_F(ModuleLoaderTest, LoadMqttClient_N_Anytime)
+{
+    EXPECT_THROW(
+          {
+              loader.LoadMqttClient(nullptr, "test", false);
+              FAIL() << "Should not be called";
+          },
+          aitt::AittException);
+}
similarity index 93%
rename from tests/MQ_mocktest.cc
rename to tests/MosquittoMQ_mocktest.cc
index 4132e5a..21140a2 100644 (file)
@@ -20,9 +20,9 @@
 #include <mutex>
 
 #include "AittTypes.h"
-#include "MQ.h"
 #include "MQMockTest.h"
 #include "MQTTMock.h"
+#include "MosquittoMQ.h"
 
 using ::testing::Return;
 
@@ -40,10 +40,10 @@ TEST_F(MQMockTest, Negative_Create_lib_init_Anytime)
     EXPECT_CALL(GetMock(), mosquitto_lib_cleanup()).WillOnce(Return(MOSQ_ERR_SUCCESS));
 
     try {
-        aitt::MQ mq(TEST_CLIENT_ID, true);
+        aitt::MosquittoMQ mq(TEST_CLIENT_ID, true);
         FAIL() << "lib_init must be failed";
     } catch (std::exception &e) {
-        ASSERT_STREQ(e.what(), "MQTT failure : MQ Constructor Error");
+        ASSERT_STREQ(e.what(), "MQTT failure : MosquittoMQ Constructor Error");
     }
 }
 
@@ -56,10 +56,10 @@ TEST_F(MQMockTest, Negative_Create_new_Anytime)
     EXPECT_CALL(GetMock(), mosquitto_lib_cleanup()).WillOnce(Return(MOSQ_ERR_SUCCESS));
 
     try {
-        aitt::MQ mq(TEST_CLIENT_ID, true);
+        aitt::MosquittoMQ mq(TEST_CLIENT_ID, true);
         FAIL() << "lib_init must be failed";
     } catch (std::exception &e) {
-        ASSERT_STREQ(e.what(), "MQTT failure : MQ Constructor Error");
+        ASSERT_STREQ(e.what(), "MQTT failure : MosquittoMQ Constructor Error");
     }
 }
 
@@ -82,7 +82,7 @@ TEST_F(MQMockTest, Positive_Publish_Anytime)
     EXPECT_CALL(GetMock(), mosquitto_lib_cleanup()).WillOnce(Return(MOSQ_ERR_SUCCESS));
 
     try {
-        aitt::MQ mq(TEST_CLIENT_ID, true);
+        aitt::MosquittoMQ mq(TEST_CLIENT_ID, true);
         mq.Connect(TEST_HOST, TEST_PORT, "", "");
         mq.Publish(TEST_TOPIC, TEST_PAYLOAD, sizeof(TEST_PAYLOAD));
     } catch (std::exception &e) {
@@ -106,7 +106,7 @@ TEST_F(MQMockTest, Positive_Subscribe_Anytime)
     EXPECT_CALL(GetMock(), mosquitto_lib_cleanup()).WillOnce(Return(MOSQ_ERR_SUCCESS));
 
     try {
-        aitt::MQ mq(TEST_CLIENT_ID, true);
+        aitt::MosquittoMQ mq(TEST_CLIENT_ID, true);
         mq.Connect(TEST_HOST, TEST_PORT, "", "");
         mq.Subscribe(
               TEST_TOPIC,
@@ -137,7 +137,7 @@ TEST_F(MQMockTest, Positive_Unsubscribe_Anytime)
     EXPECT_CALL(GetMock(), mosquitto_lib_cleanup()).WillOnce(Return(MOSQ_ERR_SUCCESS));
 
     try {
-        aitt::MQ mq(TEST_CLIENT_ID, true);
+        aitt::MosquittoMQ mq(TEST_CLIENT_ID, true);
         mq.Connect(TEST_HOST, TEST_PORT, "", "");
         void *handle = mq.Subscribe(
               TEST_TOPIC,
@@ -163,7 +163,7 @@ TEST_F(MQMockTest, Positive_Create_Anytime)
     EXPECT_CALL(GetMock(), mosquitto_lib_cleanup()).WillOnce(Return(MOSQ_ERR_SUCCESS));
 
     try {
-        aitt::MQ mq(TEST_CLIENT_ID, true);
+        aitt::MosquittoMQ mq(TEST_CLIENT_ID, true);
     } catch (std::exception &e) {
         FAIL() << "Unexpected exception occurred";
     }
@@ -181,7 +181,7 @@ TEST_F(MQMockTest, Negative_Connect_will_set_Anytime)
     EXPECT_CALL(GetMock(), mosquitto_destroy(TEST_HANDLE)).Times(1);
     EXPECT_CALL(GetMock(), mosquitto_lib_cleanup()).WillOnce(Return(MOSQ_ERR_SUCCESS));
     try {
-        aitt::MQ mq(TEST_CLIENT_ID, true);
+        aitt::MosquittoMQ mq(TEST_CLIENT_ID, true);
         mq.SetWillInfo("lastWill", TEST_PAYLOAD, sizeof(TEST_PAYLOAD), AITT_QOS_AT_MOST_ONCE, true);
         mq.Connect(TEST_HOST, TEST_PORT, "", "");
         FAIL() << "Connect() must be failed";
@@ -201,7 +201,7 @@ TEST_F(MQMockTest, Positive_Connect_Anytime)
     EXPECT_CALL(GetMock(), mosquitto_destroy(TEST_HANDLE)).Times(1);
     EXPECT_CALL(GetMock(), mosquitto_lib_cleanup()).WillOnce(Return(MOSQ_ERR_SUCCESS));
     try {
-        aitt::MQ mq(TEST_CLIENT_ID, true);
+        aitt::MosquittoMQ mq(TEST_CLIENT_ID, true);
         mq.Connect(TEST_HOST, TEST_PORT, "", "");
     } catch (std::exception &e) {
         FAIL() << "Unepxected exception: " << e.what();
@@ -224,7 +224,7 @@ TEST_F(MQMockTest, Positive_Connect_User_Anytime)
     EXPECT_CALL(GetMock(), mosquitto_destroy(TEST_HANDLE)).Times(1);
     EXPECT_CALL(GetMock(), mosquitto_lib_cleanup()).WillOnce(Return(MOSQ_ERR_SUCCESS));
     try {
-        aitt::MQ mq(TEST_CLIENT_ID, true);
+        aitt::MosquittoMQ mq(TEST_CLIENT_ID, true);
         mq.Connect(TEST_HOST, TEST_PORT, username, password);
     } catch (std::exception &e) {
         FAIL() << "Unepxected exception: " << e.what();
@@ -242,7 +242,7 @@ TEST_F(MQMockTest, Positive_Disconnect_Anytime)
     EXPECT_CALL(GetMock(), mosquitto_destroy(TEST_HANDLE)).Times(1);
     EXPECT_CALL(GetMock(), mosquitto_lib_cleanup()).WillOnce(Return(MOSQ_ERR_SUCCESS));
     try {
-        aitt::MQ mq(TEST_CLIENT_ID, true);
+        aitt::MosquittoMQ mq(TEST_CLIENT_ID, true);
         mq.Disconnect();
     } catch (std::exception &e) {
         FAIL() << "Unexpected exception: " << e.what();
similarity index 96%
rename from tests/MQ_test.cc
rename to tests/MosquittoMQ_test.cc
index 4ff554b..d0937b2 100644 (file)
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-#include "MQ.h"
+#include "MosquittoMQ.h"
 
 #include <gtest/gtest.h>
 
 #include "aitt_internal.h"
 #include "aitt_tests.h"
 
-using MQ = aitt::MQ;
+using MosquittoMQ = aitt::MosquittoMQ;
 
 class MQTest : public testing::Test, public AittTests {
   protected:
@@ -31,7 +31,7 @@ class MQTest : public testing::Test, public AittTests {
 TEST_F(MQTest, Positve_Subscribe_in_Subscribe_Anytime)
 {
     try {
-        MQ mq("MQ_TEST_ID");
+        MosquittoMQ mq("MQ_TEST_ID");
         mq.Connect(LOCAL_IP, 1883, "", "");
         mq.Subscribe(
               "MQ_TEST_TOPIC1",