return discovery_mq->CompareTopic(left, right);
}
-void AittDiscovery::DiscoveryMessageCallback(AittMsg *mq, const std::string &topic, const void *msg,
- const int szmsg, void *user_data)
+void AittDiscovery::DiscoveryMessageCallback(AittMsg *info, const void *msg, const int szmsg,
+ void *user_data)
{
RET_IF(user_data == nullptr);
+ RET_IF(info == nullptr);
AittDiscovery *discovery = static_cast<AittDiscovery *>(user_data);
- size_t end = topic.find("/", DISCOVERY_TOPIC_BASE.length());
- std::string clientId = topic.substr(DISCOVERY_TOPIC_BASE.length(), end);
+ size_t end = info->GetTopic().find("/", DISCOVERY_TOPIC_BASE.length());
+ std::string clientId = info->GetTopic().substr(DISCOVERY_TOPIC_BASE.length(), end);
if (clientId.empty()) {
ERR("ClientId is empty");
return;
std::shared_ptr<char> data;
};
- static void DiscoveryMessageCallback(AittMsg *mq, const std::string &topic, const void *msg,
- const int szmsg, void *user_data);
+ static void DiscoveryMessageCallback(AittMsg *info, const void *msg, const int szmsg,
+ void *user_data);
void PublishDiscoveryMsg();
std::string id_;
*/
#include "AittMsg.h"
-AittMsg::AittMsg() : sequence(0), end_sequence(true), id_(nullptr), protocols_(AITT_TYPE_MQTT)
+AittMsg::AittMsg() : sequence(0), end_sequence(true), id_(nullptr), protocol_(AITT_TYPE_MQTT)
{
}
id_ = id;
}
-AittSubscribeID AittMsg::GetID()
+AittSubscribeID AittMsg::GetID() const
{
return id_;
}
topic_ = topic;
}
-const std::string& AittMsg::GetTopic()
+const std::string& AittMsg::GetTopic() const
{
return topic_;
}
correlation_ = correlation;
}
-const std::string& AittMsg::GetCorrelation()
+const std::string& AittMsg::GetCorrelation() const
{
return correlation_;
}
reply_topic_ = replyTopic;
}
-const std::string& AittMsg::GetResponseTopic()
+const std::string& AittMsg::GetResponseTopic() const
{
return reply_topic_;
}
sequence++;
}
-int AittMsg::GetSequence()
+int AittMsg::GetSequence() const
{
return sequence;
}
end_sequence = end;
}
-bool AittMsg::IsEndSequence()
+bool AittMsg::IsEndSequence() const
{
return end_sequence;
}
-void AittMsg::SetProtocols(AittProtocol protocols)
+void AittMsg::SetProtocol(AittProtocol protocol)
{
- protocols_ = protocols;
+ protocol_ = protocol;
}
-AittProtocol AittMsg::GetProtocols()
+AittProtocol AittMsg::GetProtocol() const
{
- return protocols_;
+ return protocol_;
}
#pragma once
#include <AittDiscovery.h>
+#include <AittMsg.h>
#include <AittTypes.h>
-#include <functional>
#include <string>
#define AITT_TRANSPORT_NEW aitt_transport_new
public:
typedef void *(
*ModuleEntry)(AittProtocol type, AittDiscovery &discovery, const std::string &my_ip);
- using SubscribeCallback = std::function<void(const std::string &topic, const void *msg,
- const int szmsg, void *cbdata, const std::string &correlation)>;
+ using SubscribeCallback = AittMsgCB;
static constexpr const char *const MODULE_ENTRY_NAME = DEFINE_TO_STR(AITT_TRANSPORT_NEW);
class API AITT {
public:
- using SubscribeCallback =
- std::function<void(AittMsg *msg, const void *data, const int datalen, void *user_data)>;
+ using SubscribeCallback = AittMsgCB;
using ConnectionCallback = std::function<void(AITT &, int, void *user_data)>;
explicit AITT(const std::string notice);
#include <AittTypes.h>
+#include <functional>
#include <string>
class API AittMsg {
AittMsg();
void SetID(AittSubscribeID id);
- AittSubscribeID GetID();
+ AittSubscribeID GetID() const;
void SetTopic(const std::string &topic);
- const std::string &GetTopic();
+ const std::string &GetTopic() const;
void SetCorrelation(const std::string &correlation);
- const std::string &GetCorrelation();
+ const std::string &GetCorrelation() const;
void SetResponseTopic(const std::string &reply_topic);
- const std::string &GetResponseTopic();
+ const std::string &GetResponseTopic() const;
void SetSequence(int num);
void IncreaseSequence();
- int GetSequence();
+ int GetSequence() const;
void SetEndSequence(bool end);
- bool IsEndSequence();
- void SetProtocols(AittProtocol protocols);
- AittProtocol GetProtocols();
+ bool IsEndSequence() const;
+ void SetProtocol(AittProtocol protocol);
+ AittProtocol GetProtocol() const;
- protected:
+ private:
std::string topic_;
std::string correlation_;
std::string reply_topic_;
int sequence;
bool end_sequence;
AittSubscribeID id_;
- AittProtocol protocols_;
+ AittProtocol protocol_;
};
+
+using AittMsgCB =
+ std::function<void(AittMsg *msg, const void *data, const int data_len, void *user_data)>;
public:
typedef void *(*ModuleEntry)(const char *id, const AittOption &option);
- using SubscribeCallback = std::function<void(AittMsg *msg, const std::string &topic,
- const void *data, const int datalen, void *user_data)>;
+ using SubscribeCallback = AittMsgCB;
using MQConnectionCallback = std::function<void(int)>;
static constexpr const char *const MODULE_ENTRY_NAME = DEFINE_TO_STR(AITT_MQ_NEW);
int32_t szmsg = 0;
char *msg = nullptr;
std::string topic;
+ AittMsg msg_info;
try {
topic = impl->GetTopicName(tcp_data);
ERR("A topic is empty.");
return AITT_LOOP_EVENT_CONTINUE;
}
+ msg_info.SetTopic(topic);
szmsg = tcp_data->client->RecvSizedData((void **)&msg);
if (szmsg < 0) {
return AITT_LOOP_EVENT_CONTINUE;
}
- std::string correlation;
- // TODO: Correlation data (string) should be filled
-
- parent_info->cb(topic, msg, szmsg, parent_info->cbdata, correlation);
+ auto callback = parent_info->cb;
+ callback(&msg_info, msg, szmsg, parent_info->cbdata);
free(msg);
return AITT_LOOP_EVENT_CONTINUE;
{
return mq->Subscribe(
topic,
- [this, handle, loop_handle, cb](AittMsg *msg, const std::string &topic, const void *data,
- const int datalen, void *mq_user_data) {
+ [this, handle, loop_handle, cb](AittMsg *msg, const void *data, const int datalen,
+ void *mq_user_data) {
void *delivery = malloc(datalen);
if (delivery)
memcpy(delivery, data, datalen);
{
RET_IF(msg == nullptr);
- if ((msg->GetProtocols() & AITT_TYPE_MQTT) != AITT_TYPE_MQTT)
+ if ((msg->GetProtocol() & AITT_TYPE_MQTT) != AITT_TYPE_MQTT)
return; // not yet support
if (end == false || msg->GetSequence())
void *AITT::Impl::SubscribeTCP(SubscribeInfo *handle, const std::string &topic,
const SubscribeCallback &cb, void *user_data, AittQoS qos)
{
- return modules.Get(handle->first)
- .Subscribe(
- topic,
- [handle, cb](const std::string &topic, const void *data, const int datalen,
- void *user_data, const std::string &correlation) -> void {
- AittMsg msg;
- msg.SetID(handle);
- msg.SetTopic(topic);
- msg.SetCorrelation(correlation);
- msg.SetProtocols(handle->first);
-
- return cb(&msg, data, datalen, user_data);
- },
- user_data, qos);
+ auto protocol = handle->first;
+ return modules.Get(protocol).Subscribe(
+ topic,
+ [handle, cb, protocol](AittMsg *msg, const void *data, const int datalen,
+ void *user_data) {
+ msg->SetID(handle);
+ msg->SetProtocol(protocol);
+
+ return cb(msg, data, datalen, user_data);
+ },
+ user_data, qos);
}
AittStream *AITT::Impl::CreateStream(AittStreamProtocol type, const std::string &topic,
}
}
- subscriber->cb(&mq_msg, msg->topic, msg->payload, msg->payloadlen, subscriber->user_data);
+ subscriber->cb(&mq_msg, msg->payload, msg->payloadlen, subscriber->user_data);
}
void MosquittoMQ::Publish(const std::string &topic, const void *data, const int datalen, int qos,
mq.Connect(LOCAL_IP, 1883, "", "");
mq.Subscribe(
"MQ_TEST_TOPIC1",
- [&](AittMsg *handle, const std::string &topic, const void *data, const int datalen,
- void *user_data) {
+ [&](AittMsg *handle, const void *data, const int datalen, void *user_data) {
DBG("Subscribe invoked: %s %d", static_cast<const char *>(data), datalen);
mq.Subscribe(
"topic1InCallback",
- [](AittMsg *handle, const std::string &topic, const void *msg,
- const int szmsg, void *cbdata) {},
+ [](AittMsg *handle, const void *msg, const int szmsg, void *cbdata) {},
user_data);
mq.Subscribe(
"topic2InCallback",
- [](AittMsg *handle, const std::string &topic, const void *msg,
- const int szmsg, void *cbdata) {},
+ [](AittMsg *handle, const void *msg, const int szmsg, void *cbdata) {},
user_data);
MQTest *test = static_cast<MQTest *>(user_data);