integrate subscribe callbacks
authorYoungjae Shin <yj99.shin@samsung.com>
Thu, 19 Jan 2023 05:14:26 +0000 (14:14 +0900)
committerYoungjae Shin <yj99.shin@samsung.com>
Thu, 9 Mar 2023 02:19:33 +0000 (11:19 +0900)
- revise AittMsg also

common/AittDiscovery.cc
common/AittDiscovery.h
common/AittMsg.cc
common/AittTransport.h
include/AITT.h
include/AittMsg.h
include/MQ.h
modules/tcp/Module.cc
src/AITTImpl.cc
src/MosquittoMQ.cc
tests/MosquittoMQ_test.cc

index fd018669d63e12f90515e92d62f5d2f2b2b08867..db41be4b7250c9244eb94456654f3a9d57f4e041 100644 (file)
@@ -109,15 +109,16 @@ bool AittDiscovery::CompareTopic(const std::string &left, const std::string &rig
     return discovery_mq->CompareTopic(left, right);
 }
 
-void AittDiscovery::DiscoveryMessageCallback(AittMsg *mq, const std::string &topic, const void *msg,
-      const int szmsg, void *user_data)
+void AittDiscovery::DiscoveryMessageCallback(AittMsg *info, const void *msg, const int szmsg,
+      void *user_data)
 {
     RET_IF(user_data == nullptr);
+    RET_IF(info == nullptr);
 
     AittDiscovery *discovery = static_cast<AittDiscovery *>(user_data);
 
-    size_t end = topic.find("/", DISCOVERY_TOPIC_BASE.length());
-    std::string clientId = topic.substr(DISCOVERY_TOPIC_BASE.length(), end);
+    size_t end = info->GetTopic().find("/", DISCOVERY_TOPIC_BASE.length());
+    std::string clientId = info->GetTopic().substr(DISCOVERY_TOPIC_BASE.length(), end);
     if (clientId.empty()) {
         ERR("ClientId is empty");
         return;
index 8ae2020f2cf2ce02255bc8fe2a08c4da0e4b0a87..4a956cee0428cd35110fdca68bd1bf3972f2a117 100644 (file)
@@ -55,8 +55,8 @@ class AittDiscovery {
         std::shared_ptr<char> data;
     };
 
-    static void DiscoveryMessageCallback(AittMsg *mq, const std::string &topic, const void *msg,
-          const int szmsg, void *user_data);
+    static void DiscoveryMessageCallback(AittMsg *info, const void *msg, const int szmsg,
+          void *user_data);
     void PublishDiscoveryMsg();
 
     std::string id_;
index ed12c329900293c1011501042a5fd0bb0c97e289..449d978234ff9dd11cd5a9570d3362352c6d467c 100644 (file)
@@ -15,7 +15,7 @@
  */
 #include "AittMsg.h"
 
-AittMsg::AittMsg() : sequence(0), end_sequence(true), id_(nullptr), protocols_(AITT_TYPE_MQTT)
+AittMsg::AittMsg() : sequence(0), end_sequence(true), id_(nullptr), protocol_(AITT_TYPE_MQTT)
 {
 }
 
@@ -24,7 +24,7 @@ void AittMsg::SetID(AittSubscribeID id)
     id_ = id;
 }
 
-AittSubscribeID AittMsg::GetID()
+AittSubscribeID AittMsg::GetID() const
 {
     return id_;
 }
@@ -34,7 +34,7 @@ void AittMsg::SetTopic(const std::string& topic)
     topic_ = topic;
 }
 
-const std::string& AittMsg::GetTopic()
+const std::string& AittMsg::GetTopic() const
 {
     return topic_;
 }
@@ -44,7 +44,7 @@ void AittMsg::SetCorrelation(const std::string& correlation)
     correlation_ = correlation;
 }
 
-const std::string& AittMsg::GetCorrelation()
+const std::string& AittMsg::GetCorrelation() const
 {
     return correlation_;
 }
@@ -54,7 +54,7 @@ void AittMsg::SetResponseTopic(const std::string& replyTopic)
     reply_topic_ = replyTopic;
 }
 
-const std::string& AittMsg::GetResponseTopic()
+const std::string& AittMsg::GetResponseTopic() const
 {
     return reply_topic_;
 }
@@ -69,7 +69,7 @@ void AittMsg::IncreaseSequence()
     sequence++;
 }
 
-int AittMsg::GetSequence()
+int AittMsg::GetSequence() const
 {
     return sequence;
 }
@@ -79,17 +79,17 @@ void AittMsg::SetEndSequence(bool end)
     end_sequence = end;
 }
 
-bool AittMsg::IsEndSequence()
+bool AittMsg::IsEndSequence() const
 {
     return end_sequence;
 }
 
-void AittMsg::SetProtocols(AittProtocol protocols)
+void AittMsg::SetProtocol(AittProtocol protocol)
 {
-    protocols_ = protocols;
+    protocol_ = protocol;
 }
 
-AittProtocol AittMsg::GetProtocols()
+AittProtocol AittMsg::GetProtocol() const
 {
-    return protocols_;
+    return protocol_;
 }
index 1946dee4e57c91085417e8ce67b96e333121d9b0..0c03f5d24d5d9b3624603a26dd6e55261321fdc4 100644 (file)
@@ -16,9 +16,9 @@
 #pragma once
 
 #include <AittDiscovery.h>
+#include <AittMsg.h>
 #include <AittTypes.h>
 
-#include <functional>
 #include <string>
 
 #define AITT_TRANSPORT_NEW aitt_transport_new
@@ -31,8 +31,7 @@ class AittTransport {
   public:
     typedef void *(
           *ModuleEntry)(AittProtocol type, AittDiscovery &discovery, const std::string &my_ip);
-    using SubscribeCallback = std::function<void(const std::string &topic, const void *msg,
-          const int szmsg, void *cbdata, const std::string &correlation)>;
+    using SubscribeCallback = AittMsgCB;
 
     static constexpr const char *const MODULE_ENTRY_NAME = DEFINE_TO_STR(AITT_TRANSPORT_NEW);
 
index feed1adf60519c372283db96d914dff3f4b14168..c79703e7f3cda231145918337059abf1fccb752d 100644 (file)
@@ -33,8 +33,7 @@ namespace aitt {
 
 class API AITT {
   public:
-    using SubscribeCallback =
-          std::function<void(AittMsg *msg, const void *data, const int datalen, void *user_data)>;
+    using SubscribeCallback = AittMsgCB;
     using ConnectionCallback = std::function<void(AITT &, int, void *user_data)>;
 
     explicit AITT(const std::string notice);
index 654e5fd40d5347aa36def4b80fa2fb2460913fb1..aa41623d95c4a93b1b246f0667bc45c0058bae90 100644 (file)
@@ -17,6 +17,7 @@
 
 #include <AittTypes.h>
 
+#include <functional>
 #include <string>
 
 class API AittMsg {
@@ -24,27 +25,30 @@ class API AittMsg {
     AittMsg();
 
     void SetID(AittSubscribeID id);
-    AittSubscribeID GetID();
+    AittSubscribeID GetID() const;
     void SetTopic(const std::string &topic);
-    const std::string &GetTopic();
+    const std::string &GetTopic() const;
     void SetCorrelation(const std::string &correlation);
-    const std::string &GetCorrelation();
+    const std::string &GetCorrelation() const;
     void SetResponseTopic(const std::string &reply_topic);
-    const std::string &GetResponseTopic();
+    const std::string &GetResponseTopic() const;
     void SetSequence(int num);
     void IncreaseSequence();
-    int GetSequence();
+    int GetSequence() const;
     void SetEndSequence(bool end);
-    bool IsEndSequence();
-    void SetProtocols(AittProtocol protocols);
-    AittProtocol GetProtocols();
+    bool IsEndSequence() const;
+    void SetProtocol(AittProtocol protocol);
+    AittProtocol GetProtocol() const;
 
-  protected:
+  private:
     std::string topic_;
     std::string correlation_;
     std::string reply_topic_;
     int sequence;
     bool end_sequence;
     AittSubscribeID id_;
-    AittProtocol protocols_;
+    AittProtocol protocol_;
 };
+
+using AittMsgCB =
+      std::function<void(AittMsg *msg, const void *data, const int data_len, void *user_data)>;
index dad16e6b47b6298ebb2d8789307c1207454707ee..09a70e4077b3cc34f5c23ad4f11d0fb0d873cde9 100644 (file)
@@ -31,8 +31,7 @@ class MQ {
   public:
     typedef void *(*ModuleEntry)(const char *id, const AittOption &option);
 
-    using SubscribeCallback = std::function<void(AittMsg *msg, const std::string &topic,
-          const void *data, const int datalen, void *user_data)>;
+    using SubscribeCallback = AittMsgCB;
     using MQConnectionCallback = std::function<void(int)>;
 
     static constexpr const char *const MODULE_ENTRY_NAME = DEFINE_TO_STR(AITT_MQ_NEW);
index 27b80bc9fb1e429ff2a9c9bef28d4ee29a5a4ba1..407d92dade35add0581117f489d0bc7cc63b7f90 100644 (file)
@@ -320,6 +320,7 @@ int Module::ReceiveData(MainLoopHandler::MainLoopResult result, int handle,
     int32_t szmsg = 0;
     char *msg = nullptr;
     std::string topic;
+    AittMsg msg_info;
 
     try {
         topic = impl->GetTopicName(tcp_data);
@@ -327,6 +328,7 @@ int Module::ReceiveData(MainLoopHandler::MainLoopResult result, int handle,
             ERR("A topic is empty.");
             return AITT_LOOP_EVENT_CONTINUE;
         }
+               msg_info.SetTopic(topic);
 
         szmsg = tcp_data->client->RecvSizedData((void **)&msg);
         if (szmsg < 0) {
@@ -339,10 +341,8 @@ int Module::ReceiveData(MainLoopHandler::MainLoopResult result, int handle,
         return AITT_LOOP_EVENT_CONTINUE;
     }
 
-    std::string correlation;
-    // TODO: Correlation data (string) should be filled
-
-    parent_info->cb(topic, msg, szmsg, parent_info->cbdata, correlation);
+    auto callback = parent_info->cb;
+    callback(&msg_info, msg, szmsg, parent_info->cbdata);
     free(msg);
 
     return AITT_LOOP_EVENT_CONTINUE;
index 5342ab85136b1ce7309826601e67a6e675ea535d..40a986255523d324b910cb36b65b1f35c2ba256f 100644 (file)
@@ -208,8 +208,8 @@ AittSubscribeID AITT::Impl::SubscribeMQ(SubscribeInfo *handle, MainLoopHandler *
 {
     return mq->Subscribe(
           topic,
-          [this, handle, loop_handle, cb](AittMsg *msg, const std::string &topic, const void *data,
-                const int datalen, void *mq_user_data) {
+          [this, handle, loop_handle, cb](AittMsg *msg, const void *data, const int datalen,
+                void *mq_user_data) {
               void *delivery = malloc(datalen);
               if (delivery)
                   memcpy(delivery, data, datalen);
@@ -370,7 +370,7 @@ void AITT::Impl::SendReply(AittMsg *msg, const void *data, const int datalen, bo
 {
     RET_IF(msg == nullptr);
 
-    if ((msg->GetProtocols() & AITT_TYPE_MQTT) != AITT_TYPE_MQTT)
+    if ((msg->GetProtocol() & AITT_TYPE_MQTT) != AITT_TYPE_MQTT)
         return;  // not yet support
 
     if (end == false || msg->GetSequence())
@@ -383,20 +383,17 @@ void AITT::Impl::SendReply(AittMsg *msg, const void *data, const int datalen, bo
 void *AITT::Impl::SubscribeTCP(SubscribeInfo *handle, const std::string &topic,
       const SubscribeCallback &cb, void *user_data, AittQoS qos)
 {
-    return modules.Get(handle->first)
-          .Subscribe(
-                topic,
-                [handle, cb](const std::string &topic, const void *data, const int datalen,
-                      void *user_data, const std::string &correlation) -> void {
-                    AittMsg msg;
-                    msg.SetID(handle);
-                    msg.SetTopic(topic);
-                    msg.SetCorrelation(correlation);
-                    msg.SetProtocols(handle->first);
-
-                    return cb(&msg, data, datalen, user_data);
-                },
-                user_data, qos);
+    auto protocol = handle->first;
+    return modules.Get(protocol).Subscribe(
+          topic,
+          [handle, cb, protocol](AittMsg *msg, const void *data, const int datalen,
+                void *user_data) {
+              msg->SetID(handle);
+              msg->SetProtocol(protocol);
+
+              return cb(msg, data, datalen, user_data);
+          },
+          user_data, qos);
 }
 
 AittStream *AITT::Impl::CreateStream(AittStreamProtocol type, const std::string &topic,
index 6bd9e23a00607704c9a21e459ca74f3e0298366c..79ccd6cf38ed7e6ee793408710d84f077d523ec3 100644 (file)
@@ -256,7 +256,7 @@ void MosquittoMQ::InvokeCallback(SubscribeData *subscriber, const mosquitto_mess
         }
     }
 
-    subscriber->cb(&mq_msg, msg->topic, msg->payload, msg->payloadlen, subscriber->user_data);
+    subscriber->cb(&mq_msg, msg->payload, msg->payloadlen, subscriber->user_data);
 }
 
 void MosquittoMQ::Publish(const std::string &topic, const void *data, const int datalen, int qos,
index 61ae4622909d48ccaa06d67705f972caf8397aeb..bd03d89d78f6a4cf694a25bdd14394ac43922b2b 100644 (file)
@@ -35,20 +35,17 @@ TEST_F(MQTest, Subscribe_in_Subscribe_MQTT_P_Anytime)
         mq.Connect(LOCAL_IP, 1883, "", "");
         mq.Subscribe(
               "MQ_TEST_TOPIC1",
-              [&](AittMsg *handle, const std::string &topic, const void *data, const int datalen,
-                    void *user_data) {
+              [&](AittMsg *handle, const void *data, const int datalen, void *user_data) {
                   DBG("Subscribe invoked: %s %d", static_cast<const char *>(data), datalen);
 
                   mq.Subscribe(
                         "topic1InCallback",
-                        [](AittMsg *handle, const std::string &topic, const void *msg,
-                              const int szmsg, void *cbdata) {},
+                        [](AittMsg *handle, const void *msg, const int szmsg, void *cbdata) {},
                         user_data);
 
                   mq.Subscribe(
                         "topic2InCallback",
-                        [](AittMsg *handle, const std::string &topic, const void *msg,
-                              const int szmsg, void *cbdata) {},
+                        [](AittMsg *handle, const void *msg, const int szmsg, void *cbdata) {},
                         user_data);
 
                   MQTest *test = static_cast<MQTest *>(user_data);