RET_IF(callback_handle);
discovery_mq->SetWillInfo(DISCOVERY_TOPIC_BASE + id_, nullptr, 0, AITT_QOS_EXACTLY_ONCE, true);
+ discovery_mq->SetConnectionCallback([&](int status) {
+ if (status != AITT_CONNECTED)
+ return;
+
+ DBG("Discovery Connected");
+ callback_handle = discovery_mq->Subscribe(DISCOVERY_TOPIC_BASE + "+",
+ DiscoveryMessageCallback, static_cast<void *>(this), AITT_QOS_EXACTLY_ONCE);
+ discovery_mq->SetConnectionCallback(nullptr);
+ });
discovery_mq->Connect(host, port, username, password);
-
- callback_handle = discovery_mq->Subscribe(DISCOVERY_TOPIC_BASE + "+", DiscoveryMessageCallback,
- static_cast<void *>(this), AITT_QOS_EXACTLY_ONCE);
}
void AittDiscovery::Restart()
void AittDiscovery::Stop()
{
+ discovery_mq->SetConnectionCallback(nullptr);
discovery_mq->Unsubscribe(callback_handle);
discovery_mq->Publish(DISCOVERY_TOPIC_BASE + id_, nullptr, 0, AITT_QOS_EXACTLY_ONCE, true);
discovery_mq->Publish(DISCOVERY_TOPIC_BASE + id_, nullptr, 0, AITT_QOS_AT_MOST_ONCE, true);
#define STR_EQ 0
-#define AITT_MANAGED_TOPIC_PREFIX "/v1/custom/aitt/"
+#define AITT_MANAGED_TOPIC_PREFIX "/v1/custom/f5c7b34e48c1918f/"
#define DISCOVERY_TOPIC_BASE std::string(AITT_MANAGED_TOPIC_PREFIX "discovery/")
#define RESPONSE_POSTFIX "_AittRe_"
typedef enum AittError aitt_error_e;
/**
+ * @brief Specify the type of function passed to aitt_set_connect_callback().
+ * @details This is called when the mqtt broker is connected.
+ * @param[in] handle Handle of AITT service
+ * @param[in] status A value of @a AittConnectionState
+ * @param[in] user_data The user data to pass to the function
+ *
+ * @pre The callback must be registered using aitt_set_connect_callback()
+ *
+ * @see aitt_connect()
+ * @see aitt_connect_full()
+ */
+typedef void (*aitt_connect_cb)(aitt_h handle, int status, void *user_data);
+
+/**
* @brief Specify the type of function passed to aitt_subscribe().
* @details When the aitt get message, it is called, immediately.
* @param[in] msg_handle aitt message handle. The handle has topic name and so on. @c aitt_msg_h
int aitt_connect(aitt_h handle, const char *host, int port);
/**
+ * @brief Set the connect callback. The callback is called when the mqtt broker is connected.
+ * @privlevel public
+ * @param[in] handle Handle of AITT service
+ * @param[in] cb The callback function to invoke
+ * @param[in] user_data The user data to pass to the function
+ * @return @c 0 on success
+ * otherwise a negative error value
+ * @retval #AITT_ERROR_NONE Success
+ * @retval #AITT_ERROR_INVALID_PARAMETER Invalid parameter
+ * @retval #AITT_ERROR_SYSTEM System errors
+ */
+int aitt_set_connect_callback(aitt_h handle, aitt_connect_cb cb, void *user_data);
+
+/**
* @brief Connect to mqtt broker as aitt_connect(), but takes username and password.
* @privlevel public
* @param[in] handle Handle of AITT service
AITT::Impl::~Impl(void)
{
- if (false == mqtt_broker_ip_.empty()) {
+ if (mqtt_broker_ip_.empty() == false) {
try {
Disconnect();
} catch (std::exception &e) {
void AITT::Impl::SetConnectionCallback(ConnectionCallback cb, void *user_data)
{
- if (cb)
- mq->SetConnectionCallback(
- std::bind(&Impl::ConnectionCB, this, cb, user_data, std::placeholders::_1));
- else
+ if (cb) {
+ mq->SetConnectionCallback([&, cb, user_data](int status) {
+ auto idler_cb = std::bind(&Impl::ConnectionCB, this, cb, user_data, status,
+ std::placeholders::_1, std::placeholders::_2, std::placeholders::_3);
+ MainLoopHandler::AddIdle(&main_loop, idler_cb, nullptr);
+ });
+ } else {
mq->SetConnectionCallback(nullptr);
+ }
}
-void AITT::Impl::ConnectionCB(ConnectionCallback cb, void *user_data, int status)
+void AITT::Impl::ConnectionCB(ConnectionCallback cb, void *user_data, int status,
+ MainLoopHandler::MainLoopResult result, int fd, MainLoopHandler::MainLoopData *loop_data)
{
RET_IF(cb == nullptr);
{
std::unique_lock<std::mutex> lock(subscribed_list_mutex_);
+ DBG("Subscribed list %zu", subscribed_list.size());
+
for (auto subscribe_info : subscribed_list) {
switch (subscribe_info->first) {
case AITT_TYPE_MQTT:
using Blob = std::pair<const void *, int>;
using SubscribeInfo = std::pair<AittProtocol, void *>;
- void ConnectionCB(ConnectionCallback cb, void *user_data, int status);
+ void ConnectionCB(ConnectionCallback cb, void *user_data, int status,
+ MainLoopHandler::MainLoopResult result, int fd, MainLoopHandler::MainLoopData *loop_data);
AittSubscribeID SubscribeMQ(SubscribeInfo *info, MainLoopHandler *loop_handle,
const std::string &topic, const SubscribeCallback &cb, void *cbdata, AittQoS qos);
void DetachedCB(SubscribeCallback cb, MSG mq_msg, void *data, const int datalen, void *cbdata,
void *MosquittoMQ::Unsubscribe(void *sub_handle)
{
+ RETV_IF(nullptr == sub_handle, nullptr);
+
std::lock_guard<std::recursive_mutex> auto_lock(callback_lock);
auto it = std::find(subscribers.begin(), subscribers.end(),
static_cast<SubscribeData *>(sub_handle));
return aitt_connect_full(handle, host, port, NULL, NULL);
}
+API int aitt_set_connect_callback(aitt_h handle, aitt_connect_cb cb, void *user_data)
+{
+ RETV_IF(handle == nullptr, AITT_ERROR_INVALID_PARAMETER);
+ RETV_IF(handle->aitt == nullptr, AITT_ERROR_INVALID_PARAMETER);
+
+ try {
+ if (cb) {
+ handle->aitt->SetConnectionCallback(
+ [handle, cb, user_data](AITT &aitt, int status, void *data) {
+ cb(handle, status, user_data);
+ });
+ } else {
+ handle->aitt->SetConnectionCallback(nullptr);
+ }
+ } catch (std::exception &e) {
+ ERR("SetConnectionCallback() Fail(%s)", e.what());
+ return AITT_ERROR_SYSTEM;
+ }
+
+ return AITT_ERROR_NONE;
+}
+
API int aitt_connect_full(aitt_h handle, const char *host, int port, const char *username,
const char *password)
{
// Wait a few seconds until the AITT client gets a server list (discover devices)
DBG("Sleep %d ms", SLEEP_MS);
- usleep(SLEEP_MS * 1000);
+ usleep(100 * SLEEP_MS);
aitt.Publish("test/value1", dump_msg, 12, protocol);
aitt.Publish("test/value2", dump_msg, 1600, protocol);
// Wait a few seconds until the AITT client gets a server list (discover devices)
DBG("Sleep %d ms", SLEEP_MS);
- usleep(SLEEP_MS * 1000);
+ usleep(100 * SLEEP_MS);
aitt.Publish("test/step1/value1", dump_msg, 12, AITT_TYPE_TCP);
aitt.Publish("test/step2/value1", dump_msg, 1600, AITT_TYPE_TCP);
void SetUp() override { Init(); }
void TearDown() override { Deinit(); }
- void PubsubTemplate(const char *test_msg, AittProtocol protocol)
+ void PubSub(AITT &aitt, const char *test_msg, AittProtocol protocol, AITTTest *aitt_test)
+ {
+ aitt.Subscribe(
+ testTopic,
+ [](aitt::MSG *handle, const void *msg, const int szmsg, void *cbdata) -> void {
+ AITTTest *test = static_cast<AITTTest *>(cbdata);
+ if (msg)
+ DBG("Subscribe invoked: %s %d", static_cast<const char *>(msg), szmsg);
+ else
+ DBG("Subscribe invoked: zero size msg(%d)", szmsg);
+ test->StopEventLoop();
+ },
+ aitt_test, protocol);
+
+ // Wait a few seconds until the AITT client gets a server list (discover devices)
+ DBG("Sleep %d ms", SLEEP_MS);
+ usleep(100 * SLEEP_MS);
+
+ DBG("Publish(%s) : %s(%zu)", testTopic.c_str(), test_msg, strlen(test_msg));
+ aitt.Publish(testTopic, test_msg, strlen(test_msg), protocol);
+ }
+
+ void PubSubFull(const char *test_msg, AittProtocol protocol)
{
try {
AITT aitt(clientId, LOCAL_IP, AittOption(true, false));
- aitt.Connect();
- aitt.Subscribe(
- testTopic,
- [](aitt::MSG *handle, const void *msg, const int szmsg, void *cbdata) -> void {
- AITTTest *test = static_cast<AITTTest *>(cbdata);
- test->ToggleReady();
- if (msg)
- DBG("Subscribe invoked: %s %d", static_cast<const char *>(msg), szmsg);
- else
- DBG("Subscribe invoked: zero size msg(%d)", szmsg);
+ aitt.SetConnectionCallback(
+ [&, test_msg, protocol](AITT &handle, int status, void *user_data) {
+ if (status == AITT_CONNECTED)
+ PubSub(aitt, test_msg, protocol, this);
},
- static_cast<void *>(this), protocol);
-
- // Wait a few seconds until the AITT client gets a server list (discover devices)
- DBG("Sleep %d ms", SLEEP_MS);
- usleep(SLEEP_MS * 1000);
+ this);
- DBG("Publish(%s) : %s(%zu)", testTopic.c_str(), test_msg, strlen(test_msg));
- aitt.Publish(testTopic, test_msg, strlen(test_msg), protocol);
-
- g_timeout_add(10, AittTests::ReadyCheck, static_cast<AittTests *>(this));
+ aitt.Connect();
IterateEventLoop();
-
- ASSERT_TRUE(ready);
+ aitt.Disconnect();
} catch (std::exception &e) {
FAIL() << "Unexpected exception: " << e.what();
}
aitt1.Connect();
// Wait a few seconds to the AITT client gets server list (discover devices)
- usleep(SLEEP_MS * 1000);
+ usleep(100 * SLEEP_MS);
for (int i = 0; i < 10; i++) {
INFO("size = %zu", sizeof(dump_msg));
static_cast<void *>(this), protocol);
// Wait a few seconds to the AITT client gets server list (discover devices)
- usleep(SLEEP_MS * 1000);
+ usleep(100 * SLEEP_MS);
// NOTE:
// Select target peers and send the data through the specified protocol - TCP
static_cast<void *>(this), protocol);
// Wait a few seconds to the AITT client gets server list (discover devices)
- usleep(SLEEP_MS * 1000);
+ usleep(100 * SLEEP_MS);
// NOTE:
// Publish a message with the retained flag
}
}
+TEST_F(AITTTest, PubSubInConnectionCB_P_Anytime)
+{
+ try {
+ AITT aitt(clientId, LOCAL_IP, AittOption(true, false));
+ aitt.SetConnectionCallback(
+ [&](AITT &handle, int status, void *user_data) {
+ if (status == AITT_CONNECTED)
+ PubSub(aitt, TEST_MSG, AITT_TYPE_MQTT, this);
+ },
+ this);
+ aitt.Connect();
+
+ IterateEventLoop();
+ } catch (std::exception &e) {
+ FAIL() << "Unexpected exception: " << e.what();
+ }
+}
+
TEST_F(AITTTest, UnsetConnectionCallback_P_Anytime)
{
try {
TEST_F(AITTTest, PublishSubscribe_MQTT_P_Anytime)
{
- PubsubTemplate(TEST_MSG, AITT_TYPE_MQTT);
+ PubSubFull(TEST_MSG, AITT_TYPE_MQTT);
}
TEST_F(AITTTest, Publish_0_MQTT_P_Anytime)
{
- PubsubTemplate("", AITT_TYPE_MQTT);
+ PubSubFull("", AITT_TYPE_MQTT);
}
TEST_F(AITTTest, Unsubscribe_in_Subscribe_MQTT_P_Anytime)
TEST_F(AITTTest, PublishSubscribe_TCP_P_Anytime)
{
- PubsubTemplate(TEST_MSG, AITT_TYPE_TCP);
+ PubSubFull(TEST_MSG, AITT_TYPE_TCP);
}
TEST_F(AITTTest, PublishSubscribe_SECURE_TCP_P_Anytime)
{
- PubsubTemplate(TEST_MSG, AITT_TYPE_TCP_SECURE);
+ PubSubFull(TEST_MSG, AITT_TYPE_TCP_SECURE);
}
TEST_F(AITTTest, Publish_0_TCP_P_Anytime)
{
- PubsubTemplate("", AITT_TYPE_TCP);
+ PubSubFull("", AITT_TYPE_TCP);
}
TEST_F(AITTTest, Publish_0_SECURE_TCP_P_Anytime)
{
- PubsubTemplate("", AITT_TYPE_TCP_SECURE);
+ PubSubFull("", AITT_TYPE_TCP_SECURE);
}
TEST_F(AITTTest, PublishSubscribe_Multiple_Protocols_P_Anytime)
// Wait a few seconds to the AITT client gets server list (discover devices)
DBG("Sleep %d ms", SLEEP_MS);
- usleep(SLEEP_MS * 1000);
+ usleep(100 * SLEEP_MS);
DBG("Publish message to %s (%s) / %zu", testTopic.c_str(), TEST_MSG, sizeof(TEST_MSG));
aitt.Publish(testTopic, TEST_MSG, sizeof(TEST_MSG),
#define TEST_MSG "This is aitt test message"
#define TEST_MSG2 "This message is going to be delivered through a specified AittProtocol"
-#define SLEEP_MS 100
+#define SLEEP_MS 1000
class AittTests {
public:
return G_SOURCE_CONTINUE;
}
+ void StopEventLoop(void) { g_main_loop_quit(mainLoop); }
+
void IterateEventLoop(void)
{
g_main_loop_run(mainLoop);
sub_ok = reply1_ok = reply2_ok = false;
AITT sub_aitt(clientId + "sub", LOCAL_IP, AittOption(true, false));
- INFO("Constructor Success");
-
+ sub_aitt.SetConnectionCallback([&](AITT &handle, int status, void *user_data) {
+ if (status != AITT_CONNECTED)
+ return;
+ sub_aitt.Subscribe(rr_topic.c_str(),
+ [&](aitt::MSG *msg, const void *data, const int datalen, void *cbdata) {
+ CheckSubscribe(msg, data, datalen);
+ sub_aitt.SendReply(msg, reply.c_str(), reply.size());
+ sub_ok = true;
+ });
+ });
sub_aitt.Connect();
- INFO("Connected");
-
- sub_aitt.Subscribe(rr_topic.c_str(),
- [&](aitt::MSG *msg, const void *data, const int datalen, void *cbdata) {
- CheckSubscribe(msg, data, datalen);
- sub_aitt.SendReply(msg, reply.c_str(), reply.size());
- sub_ok = true;
- });
AITT aitt(clientId, LOCAL_IP, AittOption(true, false));
+ aitt.SetConnectionCallback(
+ [&](AITT &handle, int status, void *user_data) {
+ if (status != AITT_CONNECTED)
+ return;
+
+ using namespace std::placeholders;
+ auto replyCB = std::bind(&AITTRRTest::PublishSyncInCallback, GetHandle(), &handle,
+ &reply1_ok, &reply2_ok, _1, _2, _3, _4);
+
+ if (sync) {
+ aitt.PublishWithReplySync(rr_topic.c_str(), message.c_str(), message.size(),
+ AITT_TYPE_MQTT, AITT_QOS_AT_MOST_ONCE, false, replyCB, nullptr,
+ correlation);
+ } else {
+ aitt.PublishWithReply(rr_topic.c_str(), message.c_str(), message.size(),
+ AITT_TYPE_MQTT, AITT_QOS_AT_MOST_ONCE, false, replyCB, nullptr,
+ correlation);
+ }
+ },
+ this);
aitt.Connect();
- usleep(SLEEP_MS * 1000);
-
- using namespace std::placeholders;
- auto replyCB = std::bind(&AITTRRTest::PublishSyncInCallback, GetHandle(), &aitt, &reply1_ok,
- &reply2_ok, _1, _2, _3, _4);
-
- if (sync) {
- aitt.PublishWithReplySync(rr_topic.c_str(), message.c_str(), message.size(),
- AITT_TYPE_MQTT, AITT_QOS_AT_MOST_ONCE, false, replyCB, nullptr, correlation);
- } else {
- aitt.PublishWithReply(rr_topic.c_str(), message.c_str(), message.size(), AITT_TYPE_MQTT,
- AITT_QOS_AT_MOST_ONCE, false, replyCB, nullptr, correlation);
- }
g_timeout_add(10, AittTests::ReadyCheck, static_cast<AittTests *>(this));
IterateEventLoop();
#include "AittTests.h"
#include "aitt_internal.h"
+#define TEST_REPLY_MSG "hello reply message"
+#define TEST_CORRELATION "0001"
+
+class AITTCTest : public testing::Test, public AittTests {
+ protected:
+ void SetUp() override { Init(); }
+ void TearDown() override { Deinit(); }
+
+ aitt_h NewAitt()
+ {
+ aitt_option_h option = aitt_option_new();
+ EXPECT_NE(option, nullptr);
+
+ int ret = aitt_option_set(option, AITT_OPT_MY_IP, LOCAL_IP);
+ EXPECT_EQ(ret, AITT_ERROR_NONE);
+
+ aitt_h handle = aitt_new(clientId.c_str(), option);
+ aitt_option_destroy(option);
+
+ return handle;
+ }
+};
+
TEST(AITT_C_INTERFACE, new_P_Anytime)
{
aitt_option_h option = aitt_option_new();
aitt_destroy(handle);
}
-TEST(AITT_C_INTERFACE, pub_sub_P_Anytime)
+TEST_F(AITTCTest, pub_sub_P_Anytime)
{
int ret;
ret = aitt_connect(handle, LOCAL_IP, 1883);
ASSERT_EQ(ret, AITT_ERROR_NONE);
- GMainLoop *loop = g_main_loop_new(nullptr, FALSE);
aitt_sub_h sub_handle = nullptr;
ret = aitt_subscribe(
handle, TEST_C_TOPIC,
[](aitt_msg_h msg_handle, const void *msg, int msg_len, void *user_data) {
- GMainLoop *loop = static_cast<GMainLoop *>(user_data);
std::string received_data((const char *)msg, msg_len);
EXPECT_STREQ(received_data.c_str(), TEST_C_MSG);
EXPECT_STREQ(aitt_msg_get_topic(msg_handle), TEST_C_TOPIC);
- g_main_loop_quit(loop);
+ AITTCTest *test = static_cast<AITTCTest *>(user_data);
+ test->StopEventLoop();
},
- loop, &sub_handle);
+ this, &sub_handle);
ASSERT_EQ(ret, AITT_ERROR_NONE);
EXPECT_TRUE(sub_handle != nullptr);
ret = aitt_publish(handle, TEST_C_TOPIC, TEST_C_MSG, strlen(TEST_C_MSG));
ASSERT_EQ(ret, AITT_ERROR_NONE);
- g_main_loop_run(loop);
- g_main_loop_unref(loop);
+ IterateEventLoop();
ret = aitt_disconnect(handle);
EXPECT_EQ(ret, AITT_ERROR_NONE);
aitt_destroy(handle);
}
-#define reply_msg "hello reply message"
-#define test_correlation "0001"
-
-TEST(AITT_C_INTERFACE, pub_with_reply_send_reply_P_Anytime)
+TEST(AITT_C_INTERFACE, pub_with_reply_N_Anytime)
{
int ret;
ret = aitt_option_set(option, AITT_OPT_MY_IP, LOCAL_IP);
EXPECT_EQ(ret, AITT_ERROR_NONE);
- aitt_h handle = aitt_new("test11", option);
+ aitt_h handle = aitt_new("test12", option);
aitt_option_destroy(option);
ASSERT_NE(handle, nullptr);
ret = aitt_connect(handle, LOCAL_IP, 1883);
ASSERT_EQ(ret, AITT_ERROR_NONE);
- GMainLoop *loop = g_main_loop_new(nullptr, FALSE);
- aitt_sub_h sub_handle = nullptr;
- ret = aitt_subscribe(
- handle, TEST_C_TOPIC,
- [](aitt_msg_h msg_handle, const void *msg, int msg_len, void *user_data) {
- aitt_h handle = static_cast<aitt_h>(user_data);
- std::string received_data((const char *)msg, msg_len);
- EXPECT_STREQ(received_data.c_str(), TEST_C_MSG);
- EXPECT_STREQ(aitt_msg_get_topic(msg_handle), TEST_C_TOPIC);
- aitt_send_reply(handle, msg_handle, reply_msg, sizeof(reply_msg), true);
- },
- handle, &sub_handle);
- ASSERT_EQ(ret, AITT_ERROR_NONE);
+ ret = aitt_publish_with_reply(
+ nullptr, TEST_C_TOPIC, TEST_C_MSG, strlen(TEST_C_MSG), AITT_TYPE_MQTT,
+ AITT_QOS_AT_MOST_ONCE, TEST_CORRELATION,
+ [](aitt_msg_h msg_handle, const void *msg, int msg_len, void *user_data) {}, nullptr);
+ EXPECT_EQ(ret, AITT_ERROR_INVALID_PARAMETER);
ret = aitt_publish_with_reply(
- handle, TEST_C_TOPIC, TEST_C_MSG, strlen(TEST_C_MSG), AITT_TYPE_MQTT,
- AITT_QOS_AT_MOST_ONCE, test_correlation,
- [](aitt_msg_h msg_handle, const void *msg, int msg_len, void *user_data) {
- GMainLoop *loop = static_cast<GMainLoop *>(user_data);
- std::string received_data((const char *)msg, msg_len);
- EXPECT_STREQ(received_data.c_str(), reply_msg);
- g_main_loop_quit(loop);
- },
- loop);
+ handle, nullptr, TEST_C_MSG, strlen(TEST_C_MSG), AITT_TYPE_MQTT, AITT_QOS_AT_MOST_ONCE,
+ TEST_CORRELATION,
+ [](aitt_msg_h msg_handle, const void *msg, int msg_len, void *user_data) {}, nullptr);
+ EXPECT_EQ(ret, AITT_ERROR_INVALID_PARAMETER);
- g_main_loop_run(loop);
- g_main_loop_unref(loop);
+ ret = aitt_publish_with_reply(
+ handle, TEST_C_TOPIC, nullptr, 0, AITT_TYPE_MQTT, AITT_QOS_AT_MOST_ONCE, TEST_CORRELATION,
+ [](aitt_msg_h msg_handle, const void *msg, int msg_len, void *user_data) {}, nullptr);
+ EXPECT_EQ(ret, AITT_ERROR_INVALID_PARAMETER);
- ret = aitt_disconnect(handle);
- EXPECT_EQ(ret, AITT_ERROR_NONE);
+ ret = aitt_publish_with_reply(handle, TEST_C_TOPIC, TEST_C_MSG, strlen(TEST_C_MSG),
+ AITT_TYPE_MQTT, AITT_QOS_AT_MOST_ONCE, TEST_CORRELATION, nullptr, nullptr);
+ EXPECT_EQ(ret, AITT_ERROR_INVALID_PARAMETER);
aitt_destroy(handle);
}
-TEST(AITT_C_INTERFACE, pub_with_reply_N_Anytime)
+TEST(AITT_C_INTERFACE, will_set_N_Anytime)
{
int ret;
- aitt_option_h option = aitt_option_new();
- ASSERT_NE(option, nullptr);
+ ret = aitt_will_set(nullptr, "test/will_topic", "test", 4, AITT_QOS_AT_MOST_ONCE, false);
+ EXPECT_EQ(ret, AITT_ERROR_INVALID_PARAMETER);
+}
- ret = aitt_option_set(option, AITT_OPT_MY_IP, LOCAL_IP);
- EXPECT_EQ(ret, AITT_ERROR_NONE);
+TEST_F(AITTCTest, pub_with_reply_send_reply_P_Anytime)
+{
+ int ret;
- aitt_h handle = aitt_new("test12", option);
- aitt_option_destroy(option);
+ aitt_h handle = NewAitt();
ASSERT_NE(handle, nullptr);
ret = aitt_connect(handle, LOCAL_IP, 1883);
ASSERT_EQ(ret, AITT_ERROR_NONE);
- ret = aitt_publish_with_reply(
- nullptr, TEST_C_TOPIC, TEST_C_MSG, strlen(TEST_C_MSG), AITT_TYPE_MQTT,
- AITT_QOS_AT_MOST_ONCE, test_correlation,
- [](aitt_msg_h msg_handle, const void *msg, int msg_len, void *user_data) {}, nullptr);
- EXPECT_EQ(ret, AITT_ERROR_INVALID_PARAMETER);
+ aitt_sub_h sub_handle = nullptr;
+ ret = aitt_subscribe(
+ handle, TEST_C_TOPIC,
+ [](aitt_msg_h msg_handle, const void *msg, int msg_len, void *user_data) {
+ aitt_h handle = static_cast<aitt_h>(user_data);
+ std::string received_data((const char *)msg, msg_len);
+ EXPECT_STREQ(received_data.c_str(), TEST_C_MSG);
+ EXPECT_STREQ(aitt_msg_get_topic(msg_handle), TEST_C_TOPIC);
+ aitt_send_reply(handle, msg_handle, TEST_REPLY_MSG, sizeof(TEST_REPLY_MSG), true);
+ },
+ handle, &sub_handle);
+ ASSERT_EQ(ret, AITT_ERROR_NONE);
ret = aitt_publish_with_reply(
- handle, nullptr, TEST_C_MSG, strlen(TEST_C_MSG), AITT_TYPE_MQTT, AITT_QOS_AT_MOST_ONCE,
- test_correlation,
- [](aitt_msg_h msg_handle, const void *msg, int msg_len, void *user_data) {}, nullptr);
- EXPECT_EQ(ret, AITT_ERROR_INVALID_PARAMETER);
+ handle, TEST_C_TOPIC, TEST_C_MSG, strlen(TEST_C_MSG), AITT_TYPE_MQTT,
+ AITT_QOS_AT_MOST_ONCE, TEST_CORRELATION,
+ [](aitt_msg_h msg_handle, const void *msg, int msg_len, void *user_data) {
+ std::string received_data((const char *)msg, msg_len);
+ EXPECT_STREQ(received_data.c_str(), TEST_REPLY_MSG);
+ AITTCTest *test = static_cast<AITTCTest *>(user_data);
+ test->StopEventLoop();
+ },
+ this);
- ret = aitt_publish_with_reply(
- handle, TEST_C_TOPIC, nullptr, 0, AITT_TYPE_MQTT, AITT_QOS_AT_MOST_ONCE, test_correlation,
- [](aitt_msg_h msg_handle, const void *msg, int msg_len, void *user_data) {}, nullptr);
- EXPECT_EQ(ret, AITT_ERROR_INVALID_PARAMETER);
+ IterateEventLoop();
- ret = aitt_publish_with_reply(handle, TEST_C_TOPIC, TEST_C_MSG, strlen(TEST_C_MSG),
- AITT_TYPE_MQTT, AITT_QOS_AT_MOST_ONCE, test_correlation, nullptr, nullptr);
- EXPECT_EQ(ret, AITT_ERROR_INVALID_PARAMETER);
+ ret = aitt_disconnect(handle);
+ EXPECT_EQ(ret, AITT_ERROR_NONE);
aitt_destroy(handle);
}
-TEST(AITT_C_INTERFACE, sub_unsub_P_Anytime)
+TEST_F(AITTCTest, sub_unsub_P_Anytime)
{
int ret;
- aitt_option_h option = aitt_option_new();
- ASSERT_NE(option, nullptr);
-
- ret = aitt_option_set(option, AITT_OPT_MY_IP, LOCAL_IP);
- EXPECT_EQ(ret, AITT_ERROR_NONE);
-
- aitt_h handle = aitt_new("test13", option);
- aitt_option_destroy(option);
+ aitt_h handle = NewAitt();
ASSERT_NE(handle, nullptr);
ret = aitt_connect(handle, LOCAL_IP, 1883);
ASSERT_EQ(ret, AITT_ERROR_NONE);
static unsigned int sub_call_count = 0;
- GMainLoop *loop = g_main_loop_new(nullptr, FALSE);
static aitt_sub_h sub_handle = nullptr;
ret = aitt_subscribe(
handle, TEST_C_TOPIC,
ret = aitt_publish(handle, TEST_C_TOPIC, TEST_C_MSG, strlen(TEST_C_MSG));
EXPECT_EQ(ret, AITT_ERROR_NONE);
- return FALSE;
+ return G_SOURCE_REMOVE;
},
handle);
g_timeout_add(
2000,
[](gpointer data) -> gboolean {
- GMainLoop *loop = static_cast<GMainLoop *>(data);
EXPECT_EQ(sub_call_count, 1);
if (sub_call_count == 1) {
- g_main_loop_quit(loop);
+ AITTCTest *test = static_cast<AITTCTest *>(data);
+ test->StopEventLoop();
return FALSE;
}
return TRUE;
},
- loop);
+ this);
- g_main_loop_run(loop);
- g_main_loop_unref(loop);
+ IterateEventLoop();
ret = aitt_disconnect(handle);
EXPECT_EQ(ret, AITT_ERROR_NONE);
aitt_destroy(handle);
}
-TEST(AITT_C_INTERFACE, will_set_N_Anytime)
+TEST_F(AITTCTest, connect_cb_P_Anytime)
{
int ret;
- ret = aitt_will_set(nullptr, "test/will_topic", "test", 4, AITT_QOS_AT_MOST_ONCE, false);
- EXPECT_EQ(ret, AITT_ERROR_INVALID_PARAMETER);
+ aitt_h handle = NewAitt();
+ ASSERT_NE(handle, nullptr);
+
+ ret = aitt_set_connect_callback(
+ handle,
+ [](aitt_h aitt_handle, int status, void *user_data) {
+ AITTCTest *test = static_cast<AITTCTest *>(user_data);
+ EXPECT_EQ(status, AITT_CONNECTED);
+ test->StopEventLoop();
+ },
+ this);
+ ASSERT_EQ(ret, AITT_ERROR_NONE);
+
+ ret = aitt_connect(handle, LOCAL_IP, 1883);
+ ASSERT_EQ(ret, AITT_ERROR_NONE);
+
+ IterateEventLoop();
+
+ ret = aitt_set_connect_callback(handle, nullptr, nullptr);
+ ASSERT_EQ(ret, AITT_ERROR_NONE);
+
+ ret = aitt_disconnect(handle);
+ EXPECT_EQ(ret, AITT_ERROR_NONE);
+
+ aitt_destroy(handle);
}