From d2d3b4c97832c901f152a303371be272f205b602 Mon Sep 17 00:00:00 2001 From: Youngjae Shin Date: Wed, 26 Oct 2022 10:04:20 +0900 Subject: [PATCH] revise connection callback - add C API for setting connection callback - support calling mosquitto APIs in callback --- common/AittDiscovery.cc | 13 ++- common/aitt_internal_definitions.h | 2 +- include/aitt_c.h | 28 ++++++ src/AITTImpl.cc | 19 ++-- src/AITTImpl.h | 3 +- src/MosquittoMQ.cc | 2 + src/aitt_c.cc | 22 +++++ tests/AITT_TCP_test.cc | 4 +- tests/AITT_test.cc | 90 ++++++++++------- tests/AittTests.h | 4 +- tests/RequestResponse_test.cc | 53 +++++----- tests/aitt_c_test.cc | 191 +++++++++++++++++++++---------------- 12 files changed, 282 insertions(+), 149 deletions(-) diff --git a/common/AittDiscovery.cc b/common/AittDiscovery.cc index 8d9996c..2195143 100644 --- a/common/AittDiscovery.cc +++ b/common/AittDiscovery.cc @@ -44,10 +44,16 @@ void AittDiscovery::Start(const std::string &host, int port, const std::string & RET_IF(callback_handle); discovery_mq->SetWillInfo(DISCOVERY_TOPIC_BASE + id_, nullptr, 0, AITT_QOS_EXACTLY_ONCE, true); + discovery_mq->SetConnectionCallback([&](int status) { + if (status != AITT_CONNECTED) + return; + + DBG("Discovery Connected"); + callback_handle = discovery_mq->Subscribe(DISCOVERY_TOPIC_BASE + "+", + DiscoveryMessageCallback, static_cast(this), AITT_QOS_EXACTLY_ONCE); + discovery_mq->SetConnectionCallback(nullptr); + }); discovery_mq->Connect(host, port, username, password); - - callback_handle = discovery_mq->Subscribe(DISCOVERY_TOPIC_BASE + "+", DiscoveryMessageCallback, - static_cast(this), AITT_QOS_EXACTLY_ONCE); } void AittDiscovery::Restart() @@ -60,6 +66,7 @@ void AittDiscovery::Restart() void AittDiscovery::Stop() { + discovery_mq->SetConnectionCallback(nullptr); discovery_mq->Unsubscribe(callback_handle); discovery_mq->Publish(DISCOVERY_TOPIC_BASE + id_, nullptr, 0, AITT_QOS_EXACTLY_ONCE, true); discovery_mq->Publish(DISCOVERY_TOPIC_BASE + id_, nullptr, 0, AITT_QOS_AT_MOST_ONCE, true); diff --git a/common/aitt_internal_definitions.h b/common/aitt_internal_definitions.h index 10cde41..6a29dae 100644 --- a/common/aitt_internal_definitions.h +++ b/common/aitt_internal_definitions.h @@ -22,7 +22,7 @@ #define STR_EQ 0 -#define AITT_MANAGED_TOPIC_PREFIX "/v1/custom/aitt/" +#define AITT_MANAGED_TOPIC_PREFIX "/v1/custom/f5c7b34e48c1918f/" #define DISCOVERY_TOPIC_BASE std::string(AITT_MANAGED_TOPIC_PREFIX "discovery/") #define RESPONSE_POSTFIX "_AittRe_" diff --git a/include/aitt_c.h b/include/aitt_c.h index a323c3e..ee77742 100644 --- a/include/aitt_c.h +++ b/include/aitt_c.h @@ -69,6 +69,20 @@ typedef enum AittQoS aitt_qos_e; typedef enum AittError aitt_error_e; /** + * @brief Specify the type of function passed to aitt_set_connect_callback(). + * @details This is called when the mqtt broker is connected. + * @param[in] handle Handle of AITT service + * @param[in] status A value of @a AittConnectionState + * @param[in] user_data The user data to pass to the function + * + * @pre The callback must be registered using aitt_set_connect_callback() + * + * @see aitt_connect() + * @see aitt_connect_full() + */ +typedef void (*aitt_connect_cb)(aitt_h handle, int status, void *user_data); + +/** * @brief Specify the type of function passed to aitt_subscribe(). * @details When the aitt get message, it is called, immediately. * @param[in] msg_handle aitt message handle. The handle has topic name and so on. @c aitt_msg_h @@ -198,6 +212,20 @@ int aitt_will_set(aitt_h handle, const char *topic, const void *msg, const int m int aitt_connect(aitt_h handle, const char *host, int port); /** + * @brief Set the connect callback. The callback is called when the mqtt broker is connected. + * @privlevel public + * @param[in] handle Handle of AITT service + * @param[in] cb The callback function to invoke + * @param[in] user_data The user data to pass to the function + * @return @c 0 on success + * otherwise a negative error value + * @retval #AITT_ERROR_NONE Success + * @retval #AITT_ERROR_INVALID_PARAMETER Invalid parameter + * @retval #AITT_ERROR_SYSTEM System errors + */ +int aitt_set_connect_callback(aitt_h handle, aitt_connect_cb cb, void *user_data); + +/** * @brief Connect to mqtt broker as aitt_connect(), but takes username and password. * @privlevel public * @param[in] handle Handle of AITT service diff --git a/src/AITTImpl.cc b/src/AITTImpl.cc index 025f2fd..efd4ab8 100644 --- a/src/AITTImpl.cc +++ b/src/AITTImpl.cc @@ -50,7 +50,7 @@ AITT::Impl::Impl(AITT &parent, const std::string &id, const std::string &my_ip, AITT::Impl::~Impl(void) { - if (false == mqtt_broker_ip_.empty()) { + if (mqtt_broker_ip_.empty() == false) { try { Disconnect(); } catch (std::exception &e) { @@ -83,14 +83,19 @@ void AITT::Impl::SetWillInfo(const std::string &topic, const void *data, const i void AITT::Impl::SetConnectionCallback(ConnectionCallback cb, void *user_data) { - if (cb) - mq->SetConnectionCallback( - std::bind(&Impl::ConnectionCB, this, cb, user_data, std::placeholders::_1)); - else + if (cb) { + mq->SetConnectionCallback([&, cb, user_data](int status) { + auto idler_cb = std::bind(&Impl::ConnectionCB, this, cb, user_data, status, + std::placeholders::_1, std::placeholders::_2, std::placeholders::_3); + MainLoopHandler::AddIdle(&main_loop, idler_cb, nullptr); + }); + } else { mq->SetConnectionCallback(nullptr); + } } -void AITT::Impl::ConnectionCB(ConnectionCallback cb, void *user_data, int status) +void AITT::Impl::ConnectionCB(ConnectionCallback cb, void *user_data, int status, + MainLoopHandler::MainLoopResult result, int fd, MainLoopHandler::MainLoopData *loop_data) { RET_IF(cb == nullptr); @@ -126,6 +131,8 @@ void AITT::Impl::UnsubscribeAll() { std::unique_lock lock(subscribed_list_mutex_); + DBG("Subscribed list %zu", subscribed_list.size()); + for (auto subscribe_info : subscribed_list) { switch (subscribe_info->first) { case AITT_TYPE_MQTT: diff --git a/src/AITTImpl.h b/src/AITTImpl.h index b902e50..b42bfd6 100644 --- a/src/AITTImpl.h +++ b/src/AITTImpl.h @@ -69,7 +69,8 @@ class AITT::Impl { using Blob = std::pair; using SubscribeInfo = std::pair; - void ConnectionCB(ConnectionCallback cb, void *user_data, int status); + void ConnectionCB(ConnectionCallback cb, void *user_data, int status, + MainLoopHandler::MainLoopResult result, int fd, MainLoopHandler::MainLoopData *loop_data); AittSubscribeID SubscribeMQ(SubscribeInfo *info, MainLoopHandler *loop_handle, const std::string &topic, const SubscribeCallback &cb, void *cbdata, AittQoS qos); void DetachedCB(SubscribeCallback cb, MSG mq_msg, void *data, const int datalen, void *cbdata, diff --git a/src/MosquittoMQ.cc b/src/MosquittoMQ.cc index ada7de3..57c6daa 100644 --- a/src/MosquittoMQ.cc +++ b/src/MosquittoMQ.cc @@ -356,6 +356,8 @@ void *MosquittoMQ::Subscribe(const std::string &topic, const SubscribeCallback & void *MosquittoMQ::Unsubscribe(void *sub_handle) { + RETV_IF(nullptr == sub_handle, nullptr); + std::lock_guard auto_lock(callback_lock); auto it = std::find(subscribers.begin(), subscribers.end(), static_cast(sub_handle)); diff --git a/src/aitt_c.cc b/src/aitt_c.cc index ce25391..8596681 100644 --- a/src/aitt_c.cc +++ b/src/aitt_c.cc @@ -199,6 +199,28 @@ API int aitt_connect(aitt_h handle, const char *host, int port) return aitt_connect_full(handle, host, port, NULL, NULL); } +API int aitt_set_connect_callback(aitt_h handle, aitt_connect_cb cb, void *user_data) +{ + RETV_IF(handle == nullptr, AITT_ERROR_INVALID_PARAMETER); + RETV_IF(handle->aitt == nullptr, AITT_ERROR_INVALID_PARAMETER); + + try { + if (cb) { + handle->aitt->SetConnectionCallback( + [handle, cb, user_data](AITT &aitt, int status, void *data) { + cb(handle, status, user_data); + }); + } else { + handle->aitt->SetConnectionCallback(nullptr); + } + } catch (std::exception &e) { + ERR("SetConnectionCallback() Fail(%s)", e.what()); + return AITT_ERROR_SYSTEM; + } + + return AITT_ERROR_NONE; +} + API int aitt_connect_full(aitt_h handle, const char *host, int port, const char *username, const char *password) { diff --git a/tests/AITT_TCP_test.cc b/tests/AITT_TCP_test.cc index 4442817..b907621 100644 --- a/tests/AITT_TCP_test.cc +++ b/tests/AITT_TCP_test.cc @@ -56,7 +56,7 @@ class AITTTCPTest : public testing::Test, public AittTests { // Wait a few seconds until the AITT client gets a server list (discover devices) DBG("Sleep %d ms", SLEEP_MS); - usleep(SLEEP_MS * 1000); + usleep(100 * SLEEP_MS); aitt.Publish("test/value1", dump_msg, 12, protocol); aitt.Publish("test/value2", dump_msg, 1600, protocol); @@ -95,7 +95,7 @@ TEST_F(AITTTCPTest, TCP_Wildcards1_Anytime) // Wait a few seconds until the AITT client gets a server list (discover devices) DBG("Sleep %d ms", SLEEP_MS); - usleep(SLEEP_MS * 1000); + usleep(100 * SLEEP_MS); aitt.Publish("test/step1/value1", dump_msg, 12, AITT_TYPE_TCP); aitt.Publish("test/step2/value1", dump_msg, 1600, AITT_TYPE_TCP); diff --git a/tests/AITT_test.cc b/tests/AITT_test.cc index eda737f..bf5225d 100644 --- a/tests/AITT_test.cc +++ b/tests/AITT_test.cc @@ -30,35 +30,43 @@ class AITTTest : public testing::Test, public AittTests { void SetUp() override { Init(); } void TearDown() override { Deinit(); } - void PubsubTemplate(const char *test_msg, AittProtocol protocol) + void PubSub(AITT &aitt, const char *test_msg, AittProtocol protocol, AITTTest *aitt_test) + { + aitt.Subscribe( + testTopic, + [](aitt::MSG *handle, const void *msg, const int szmsg, void *cbdata) -> void { + AITTTest *test = static_cast(cbdata); + if (msg) + DBG("Subscribe invoked: %s %d", static_cast(msg), szmsg); + else + DBG("Subscribe invoked: zero size msg(%d)", szmsg); + test->StopEventLoop(); + }, + aitt_test, protocol); + + // Wait a few seconds until the AITT client gets a server list (discover devices) + DBG("Sleep %d ms", SLEEP_MS); + usleep(100 * SLEEP_MS); + + DBG("Publish(%s) : %s(%zu)", testTopic.c_str(), test_msg, strlen(test_msg)); + aitt.Publish(testTopic, test_msg, strlen(test_msg), protocol); + } + + void PubSubFull(const char *test_msg, AittProtocol protocol) { try { AITT aitt(clientId, LOCAL_IP, AittOption(true, false)); - aitt.Connect(); - aitt.Subscribe( - testTopic, - [](aitt::MSG *handle, const void *msg, const int szmsg, void *cbdata) -> void { - AITTTest *test = static_cast(cbdata); - test->ToggleReady(); - if (msg) - DBG("Subscribe invoked: %s %d", static_cast(msg), szmsg); - else - DBG("Subscribe invoked: zero size msg(%d)", szmsg); + aitt.SetConnectionCallback( + [&, test_msg, protocol](AITT &handle, int status, void *user_data) { + if (status == AITT_CONNECTED) + PubSub(aitt, test_msg, protocol, this); }, - static_cast(this), protocol); - - // Wait a few seconds until the AITT client gets a server list (discover devices) - DBG("Sleep %d ms", SLEEP_MS); - usleep(SLEEP_MS * 1000); + this); - DBG("Publish(%s) : %s(%zu)", testTopic.c_str(), test_msg, strlen(test_msg)); - aitt.Publish(testTopic, test_msg, strlen(test_msg), protocol); - - g_timeout_add(10, AittTests::ReadyCheck, static_cast(this)); + aitt.Connect(); IterateEventLoop(); - - ASSERT_TRUE(ready); + aitt.Disconnect(); } catch (std::exception &e) { FAIL() << "Unexpected exception: " << e.what(); } @@ -109,7 +117,7 @@ class AITTTest : public testing::Test, public AittTests { aitt1.Connect(); // Wait a few seconds to the AITT client gets server list (discover devices) - usleep(SLEEP_MS * 1000); + usleep(100 * SLEEP_MS); for (int i = 0; i < 10; i++) { INFO("size = %zu", sizeof(dump_msg)); @@ -171,7 +179,7 @@ class AITTTest : public testing::Test, public AittTests { static_cast(this), protocol); // Wait a few seconds to the AITT client gets server list (discover devices) - usleep(SLEEP_MS * 1000); + usleep(100 * SLEEP_MS); // NOTE: // Select target peers and send the data through the specified protocol - TCP @@ -210,7 +218,7 @@ class AITTTest : public testing::Test, public AittTests { static_cast(this), protocol); // Wait a few seconds to the AITT client gets server list (discover devices) - usleep(SLEEP_MS * 1000); + usleep(100 * SLEEP_MS); // NOTE: // Publish a message with the retained flag @@ -272,6 +280,24 @@ TEST_F(AITTTest, SetConnectionCallback_P_Anytime) } } +TEST_F(AITTTest, PubSubInConnectionCB_P_Anytime) +{ + try { + AITT aitt(clientId, LOCAL_IP, AittOption(true, false)); + aitt.SetConnectionCallback( + [&](AITT &handle, int status, void *user_data) { + if (status == AITT_CONNECTED) + PubSub(aitt, TEST_MSG, AITT_TYPE_MQTT, this); + }, + this); + aitt.Connect(); + + IterateEventLoop(); + } catch (std::exception &e) { + FAIL() << "Unexpected exception: " << e.what(); + } +} + TEST_F(AITTTest, UnsetConnectionCallback_P_Anytime) { try { @@ -443,12 +469,12 @@ TEST_F(AITTTest, Unsubscribe_SECURE_TCP_P_Anytime) TEST_F(AITTTest, PublishSubscribe_MQTT_P_Anytime) { - PubsubTemplate(TEST_MSG, AITT_TYPE_MQTT); + PubSubFull(TEST_MSG, AITT_TYPE_MQTT); } TEST_F(AITTTest, Publish_0_MQTT_P_Anytime) { - PubsubTemplate("", AITT_TYPE_MQTT); + PubSubFull("", AITT_TYPE_MQTT); } TEST_F(AITTTest, Unsubscribe_in_Subscribe_MQTT_P_Anytime) @@ -537,22 +563,22 @@ TEST_F(AITTTest, Subscribe_in_Subscribe_MQTT_P_Anytime) TEST_F(AITTTest, PublishSubscribe_TCP_P_Anytime) { - PubsubTemplate(TEST_MSG, AITT_TYPE_TCP); + PubSubFull(TEST_MSG, AITT_TYPE_TCP); } TEST_F(AITTTest, PublishSubscribe_SECURE_TCP_P_Anytime) { - PubsubTemplate(TEST_MSG, AITT_TYPE_TCP_SECURE); + PubSubFull(TEST_MSG, AITT_TYPE_TCP_SECURE); } TEST_F(AITTTest, Publish_0_TCP_P_Anytime) { - PubsubTemplate("", AITT_TYPE_TCP); + PubSubFull("", AITT_TYPE_TCP); } TEST_F(AITTTest, Publish_0_SECURE_TCP_P_Anytime) { - PubsubTemplate("", AITT_TYPE_TCP_SECURE); + PubSubFull("", AITT_TYPE_TCP_SECURE); } TEST_F(AITTTest, PublishSubscribe_Multiple_Protocols_P_Anytime) @@ -580,7 +606,7 @@ TEST_F(AITTTest, PublishSubscribe_Multiple_Protocols_P_Anytime) // Wait a few seconds to the AITT client gets server list (discover devices) DBG("Sleep %d ms", SLEEP_MS); - usleep(SLEEP_MS * 1000); + usleep(100 * SLEEP_MS); DBG("Publish message to %s (%s) / %zu", testTopic.c_str(), TEST_MSG, sizeof(TEST_MSG)); aitt.Publish(testTopic, TEST_MSG, sizeof(TEST_MSG), diff --git a/tests/AittTests.h b/tests/AittTests.h index 0274fb0..073fc97 100644 --- a/tests/AittTests.h +++ b/tests/AittTests.h @@ -28,7 +28,7 @@ #define TEST_MSG "This is aitt test message" #define TEST_MSG2 "This message is going to be delivered through a specified AittProtocol" -#define SLEEP_MS 100 +#define SLEEP_MS 1000 class AittTests { public: @@ -63,6 +63,8 @@ class AittTests { return G_SOURCE_CONTINUE; } + void StopEventLoop(void) { g_main_loop_quit(mainLoop); } + void IterateEventLoop(void) { g_main_loop_run(mainLoop); diff --git a/tests/RequestResponse_test.cc b/tests/RequestResponse_test.cc index c20c4a8..0ef8b29 100644 --- a/tests/RequestResponse_test.cc +++ b/tests/RequestResponse_test.cc @@ -124,33 +124,40 @@ class AITTRRTest : public testing::Test, public AittTests { sub_ok = reply1_ok = reply2_ok = false; AITT sub_aitt(clientId + "sub", LOCAL_IP, AittOption(true, false)); - INFO("Constructor Success"); - + sub_aitt.SetConnectionCallback([&](AITT &handle, int status, void *user_data) { + if (status != AITT_CONNECTED) + return; + sub_aitt.Subscribe(rr_topic.c_str(), + [&](aitt::MSG *msg, const void *data, const int datalen, void *cbdata) { + CheckSubscribe(msg, data, datalen); + sub_aitt.SendReply(msg, reply.c_str(), reply.size()); + sub_ok = true; + }); + }); sub_aitt.Connect(); - INFO("Connected"); - - sub_aitt.Subscribe(rr_topic.c_str(), - [&](aitt::MSG *msg, const void *data, const int datalen, void *cbdata) { - CheckSubscribe(msg, data, datalen); - sub_aitt.SendReply(msg, reply.c_str(), reply.size()); - sub_ok = true; - }); AITT aitt(clientId, LOCAL_IP, AittOption(true, false)); + aitt.SetConnectionCallback( + [&](AITT &handle, int status, void *user_data) { + if (status != AITT_CONNECTED) + return; + + using namespace std::placeholders; + auto replyCB = std::bind(&AITTRRTest::PublishSyncInCallback, GetHandle(), &handle, + &reply1_ok, &reply2_ok, _1, _2, _3, _4); + + if (sync) { + aitt.PublishWithReplySync(rr_topic.c_str(), message.c_str(), message.size(), + AITT_TYPE_MQTT, AITT_QOS_AT_MOST_ONCE, false, replyCB, nullptr, + correlation); + } else { + aitt.PublishWithReply(rr_topic.c_str(), message.c_str(), message.size(), + AITT_TYPE_MQTT, AITT_QOS_AT_MOST_ONCE, false, replyCB, nullptr, + correlation); + } + }, + this); aitt.Connect(); - usleep(SLEEP_MS * 1000); - - using namespace std::placeholders; - auto replyCB = std::bind(&AITTRRTest::PublishSyncInCallback, GetHandle(), &aitt, &reply1_ok, - &reply2_ok, _1, _2, _3, _4); - - if (sync) { - aitt.PublishWithReplySync(rr_topic.c_str(), message.c_str(), message.size(), - AITT_TYPE_MQTT, AITT_QOS_AT_MOST_ONCE, false, replyCB, nullptr, correlation); - } else { - aitt.PublishWithReply(rr_topic.c_str(), message.c_str(), message.size(), AITT_TYPE_MQTT, - AITT_QOS_AT_MOST_ONCE, false, replyCB, nullptr, correlation); - } g_timeout_add(10, AittTests::ReadyCheck, static_cast(this)); IterateEventLoop(); diff --git a/tests/aitt_c_test.cc b/tests/aitt_c_test.cc index 5a43eff..0a7ff63 100644 --- a/tests/aitt_c_test.cc +++ b/tests/aitt_c_test.cc @@ -21,6 +21,29 @@ #include "AittTests.h" #include "aitt_internal.h" +#define TEST_REPLY_MSG "hello reply message" +#define TEST_CORRELATION "0001" + +class AITTCTest : public testing::Test, public AittTests { + protected: + void SetUp() override { Init(); } + void TearDown() override { Deinit(); } + + aitt_h NewAitt() + { + aitt_option_h option = aitt_option_new(); + EXPECT_NE(option, nullptr); + + int ret = aitt_option_set(option, AITT_OPT_MY_IP, LOCAL_IP); + EXPECT_EQ(ret, AITT_ERROR_NONE); + + aitt_h handle = aitt_new(clientId.c_str(), option); + aitt_option_destroy(option); + + return handle; + } +}; + TEST(AITT_C_INTERFACE, new_P_Anytime) { aitt_option_h option = aitt_option_new(); @@ -170,7 +193,7 @@ TEST(AITT_C_INTERFACE, disconnect_N_Anytime) aitt_destroy(handle); } -TEST(AITT_C_INTERFACE, pub_sub_P_Anytime) +TEST_F(AITTCTest, pub_sub_P_Anytime) { int ret; @@ -187,26 +210,24 @@ TEST(AITT_C_INTERFACE, pub_sub_P_Anytime) ret = aitt_connect(handle, LOCAL_IP, 1883); ASSERT_EQ(ret, AITT_ERROR_NONE); - GMainLoop *loop = g_main_loop_new(nullptr, FALSE); aitt_sub_h sub_handle = nullptr; ret = aitt_subscribe( handle, TEST_C_TOPIC, [](aitt_msg_h msg_handle, const void *msg, int msg_len, void *user_data) { - GMainLoop *loop = static_cast(user_data); std::string received_data((const char *)msg, msg_len); EXPECT_STREQ(received_data.c_str(), TEST_C_MSG); EXPECT_STREQ(aitt_msg_get_topic(msg_handle), TEST_C_TOPIC); - g_main_loop_quit(loop); + AITTCTest *test = static_cast(user_data); + test->StopEventLoop(); }, - loop, &sub_handle); + this, &sub_handle); ASSERT_EQ(ret, AITT_ERROR_NONE); EXPECT_TRUE(sub_handle != nullptr); ret = aitt_publish(handle, TEST_C_TOPIC, TEST_C_MSG, strlen(TEST_C_MSG)); ASSERT_EQ(ret, AITT_ERROR_NONE); - g_main_loop_run(loop); - g_main_loop_unref(loop); + IterateEventLoop(); ret = aitt_disconnect(handle); EXPECT_EQ(ret, AITT_ERROR_NONE); @@ -291,10 +312,7 @@ TEST(AITT_C_INTERFACE, sub_N_Anytime) aitt_destroy(handle); } -#define reply_msg "hello reply message" -#define test_correlation "0001" - -TEST(AITT_C_INTERFACE, pub_with_reply_send_reply_P_Anytime) +TEST(AITT_C_INTERFACE, pub_with_reply_N_Anytime) { int ret; @@ -304,107 +322,98 @@ TEST(AITT_C_INTERFACE, pub_with_reply_send_reply_P_Anytime) ret = aitt_option_set(option, AITT_OPT_MY_IP, LOCAL_IP); EXPECT_EQ(ret, AITT_ERROR_NONE); - aitt_h handle = aitt_new("test11", option); + aitt_h handle = aitt_new("test12", option); aitt_option_destroy(option); ASSERT_NE(handle, nullptr); ret = aitt_connect(handle, LOCAL_IP, 1883); ASSERT_EQ(ret, AITT_ERROR_NONE); - GMainLoop *loop = g_main_loop_new(nullptr, FALSE); - aitt_sub_h sub_handle = nullptr; - ret = aitt_subscribe( - handle, TEST_C_TOPIC, - [](aitt_msg_h msg_handle, const void *msg, int msg_len, void *user_data) { - aitt_h handle = static_cast(user_data); - std::string received_data((const char *)msg, msg_len); - EXPECT_STREQ(received_data.c_str(), TEST_C_MSG); - EXPECT_STREQ(aitt_msg_get_topic(msg_handle), TEST_C_TOPIC); - aitt_send_reply(handle, msg_handle, reply_msg, sizeof(reply_msg), true); - }, - handle, &sub_handle); - ASSERT_EQ(ret, AITT_ERROR_NONE); + ret = aitt_publish_with_reply( + nullptr, TEST_C_TOPIC, TEST_C_MSG, strlen(TEST_C_MSG), AITT_TYPE_MQTT, + AITT_QOS_AT_MOST_ONCE, TEST_CORRELATION, + [](aitt_msg_h msg_handle, const void *msg, int msg_len, void *user_data) {}, nullptr); + EXPECT_EQ(ret, AITT_ERROR_INVALID_PARAMETER); ret = aitt_publish_with_reply( - handle, TEST_C_TOPIC, TEST_C_MSG, strlen(TEST_C_MSG), AITT_TYPE_MQTT, - AITT_QOS_AT_MOST_ONCE, test_correlation, - [](aitt_msg_h msg_handle, const void *msg, int msg_len, void *user_data) { - GMainLoop *loop = static_cast(user_data); - std::string received_data((const char *)msg, msg_len); - EXPECT_STREQ(received_data.c_str(), reply_msg); - g_main_loop_quit(loop); - }, - loop); + handle, nullptr, TEST_C_MSG, strlen(TEST_C_MSG), AITT_TYPE_MQTT, AITT_QOS_AT_MOST_ONCE, + TEST_CORRELATION, + [](aitt_msg_h msg_handle, const void *msg, int msg_len, void *user_data) {}, nullptr); + EXPECT_EQ(ret, AITT_ERROR_INVALID_PARAMETER); - g_main_loop_run(loop); - g_main_loop_unref(loop); + ret = aitt_publish_with_reply( + handle, TEST_C_TOPIC, nullptr, 0, AITT_TYPE_MQTT, AITT_QOS_AT_MOST_ONCE, TEST_CORRELATION, + [](aitt_msg_h msg_handle, const void *msg, int msg_len, void *user_data) {}, nullptr); + EXPECT_EQ(ret, AITT_ERROR_INVALID_PARAMETER); - ret = aitt_disconnect(handle); - EXPECT_EQ(ret, AITT_ERROR_NONE); + ret = aitt_publish_with_reply(handle, TEST_C_TOPIC, TEST_C_MSG, strlen(TEST_C_MSG), + AITT_TYPE_MQTT, AITT_QOS_AT_MOST_ONCE, TEST_CORRELATION, nullptr, nullptr); + EXPECT_EQ(ret, AITT_ERROR_INVALID_PARAMETER); aitt_destroy(handle); } -TEST(AITT_C_INTERFACE, pub_with_reply_N_Anytime) +TEST(AITT_C_INTERFACE, will_set_N_Anytime) { int ret; - aitt_option_h option = aitt_option_new(); - ASSERT_NE(option, nullptr); + ret = aitt_will_set(nullptr, "test/will_topic", "test", 4, AITT_QOS_AT_MOST_ONCE, false); + EXPECT_EQ(ret, AITT_ERROR_INVALID_PARAMETER); +} - ret = aitt_option_set(option, AITT_OPT_MY_IP, LOCAL_IP); - EXPECT_EQ(ret, AITT_ERROR_NONE); +TEST_F(AITTCTest, pub_with_reply_send_reply_P_Anytime) +{ + int ret; - aitt_h handle = aitt_new("test12", option); - aitt_option_destroy(option); + aitt_h handle = NewAitt(); ASSERT_NE(handle, nullptr); ret = aitt_connect(handle, LOCAL_IP, 1883); ASSERT_EQ(ret, AITT_ERROR_NONE); - ret = aitt_publish_with_reply( - nullptr, TEST_C_TOPIC, TEST_C_MSG, strlen(TEST_C_MSG), AITT_TYPE_MQTT, - AITT_QOS_AT_MOST_ONCE, test_correlation, - [](aitt_msg_h msg_handle, const void *msg, int msg_len, void *user_data) {}, nullptr); - EXPECT_EQ(ret, AITT_ERROR_INVALID_PARAMETER); + aitt_sub_h sub_handle = nullptr; + ret = aitt_subscribe( + handle, TEST_C_TOPIC, + [](aitt_msg_h msg_handle, const void *msg, int msg_len, void *user_data) { + aitt_h handle = static_cast(user_data); + std::string received_data((const char *)msg, msg_len); + EXPECT_STREQ(received_data.c_str(), TEST_C_MSG); + EXPECT_STREQ(aitt_msg_get_topic(msg_handle), TEST_C_TOPIC); + aitt_send_reply(handle, msg_handle, TEST_REPLY_MSG, sizeof(TEST_REPLY_MSG), true); + }, + handle, &sub_handle); + ASSERT_EQ(ret, AITT_ERROR_NONE); ret = aitt_publish_with_reply( - handle, nullptr, TEST_C_MSG, strlen(TEST_C_MSG), AITT_TYPE_MQTT, AITT_QOS_AT_MOST_ONCE, - test_correlation, - [](aitt_msg_h msg_handle, const void *msg, int msg_len, void *user_data) {}, nullptr); - EXPECT_EQ(ret, AITT_ERROR_INVALID_PARAMETER); + handle, TEST_C_TOPIC, TEST_C_MSG, strlen(TEST_C_MSG), AITT_TYPE_MQTT, + AITT_QOS_AT_MOST_ONCE, TEST_CORRELATION, + [](aitt_msg_h msg_handle, const void *msg, int msg_len, void *user_data) { + std::string received_data((const char *)msg, msg_len); + EXPECT_STREQ(received_data.c_str(), TEST_REPLY_MSG); + AITTCTest *test = static_cast(user_data); + test->StopEventLoop(); + }, + this); - ret = aitt_publish_with_reply( - handle, TEST_C_TOPIC, nullptr, 0, AITT_TYPE_MQTT, AITT_QOS_AT_MOST_ONCE, test_correlation, - [](aitt_msg_h msg_handle, const void *msg, int msg_len, void *user_data) {}, nullptr); - EXPECT_EQ(ret, AITT_ERROR_INVALID_PARAMETER); + IterateEventLoop(); - ret = aitt_publish_with_reply(handle, TEST_C_TOPIC, TEST_C_MSG, strlen(TEST_C_MSG), - AITT_TYPE_MQTT, AITT_QOS_AT_MOST_ONCE, test_correlation, nullptr, nullptr); - EXPECT_EQ(ret, AITT_ERROR_INVALID_PARAMETER); + ret = aitt_disconnect(handle); + EXPECT_EQ(ret, AITT_ERROR_NONE); aitt_destroy(handle); } -TEST(AITT_C_INTERFACE, sub_unsub_P_Anytime) +TEST_F(AITTCTest, sub_unsub_P_Anytime) { int ret; - aitt_option_h option = aitt_option_new(); - ASSERT_NE(option, nullptr); - - ret = aitt_option_set(option, AITT_OPT_MY_IP, LOCAL_IP); - EXPECT_EQ(ret, AITT_ERROR_NONE); - - aitt_h handle = aitt_new("test13", option); - aitt_option_destroy(option); + aitt_h handle = NewAitt(); ASSERT_NE(handle, nullptr); ret = aitt_connect(handle, LOCAL_IP, 1883); ASSERT_EQ(ret, AITT_ERROR_NONE); static unsigned int sub_call_count = 0; - GMainLoop *loop = g_main_loop_new(nullptr, FALSE); static aitt_sub_h sub_handle = nullptr; ret = aitt_subscribe( handle, TEST_C_TOPIC, @@ -428,27 +437,26 @@ TEST(AITT_C_INTERFACE, sub_unsub_P_Anytime) ret = aitt_publish(handle, TEST_C_TOPIC, TEST_C_MSG, strlen(TEST_C_MSG)); EXPECT_EQ(ret, AITT_ERROR_NONE); - return FALSE; + return G_SOURCE_REMOVE; }, handle); g_timeout_add( 2000, [](gpointer data) -> gboolean { - GMainLoop *loop = static_cast(data); EXPECT_EQ(sub_call_count, 1); if (sub_call_count == 1) { - g_main_loop_quit(loop); + AITTCTest *test = static_cast(data); + test->StopEventLoop(); return FALSE; } return TRUE; }, - loop); + this); - g_main_loop_run(loop); - g_main_loop_unref(loop); + IterateEventLoop(); ret = aitt_disconnect(handle); EXPECT_EQ(ret, AITT_ERROR_NONE); @@ -456,10 +464,33 @@ TEST(AITT_C_INTERFACE, sub_unsub_P_Anytime) aitt_destroy(handle); } -TEST(AITT_C_INTERFACE, will_set_N_Anytime) +TEST_F(AITTCTest, connect_cb_P_Anytime) { int ret; - ret = aitt_will_set(nullptr, "test/will_topic", "test", 4, AITT_QOS_AT_MOST_ONCE, false); - EXPECT_EQ(ret, AITT_ERROR_INVALID_PARAMETER); + aitt_h handle = NewAitt(); + ASSERT_NE(handle, nullptr); + + ret = aitt_set_connect_callback( + handle, + [](aitt_h aitt_handle, int status, void *user_data) { + AITTCTest *test = static_cast(user_data); + EXPECT_EQ(status, AITT_CONNECTED); + test->StopEventLoop(); + }, + this); + ASSERT_EQ(ret, AITT_ERROR_NONE); + + ret = aitt_connect(handle, LOCAL_IP, 1883); + ASSERT_EQ(ret, AITT_ERROR_NONE); + + IterateEventLoop(); + + ret = aitt_set_connect_callback(handle, nullptr, nullptr); + ASSERT_EQ(ret, AITT_ERROR_NONE); + + ret = aitt_disconnect(handle); + EXPECT_EQ(ret, AITT_ERROR_NONE); + + aitt_destroy(handle); } -- 2.7.4