revise connection callback
authorYoungjae Shin <yj99.shin@samsung.com>
Wed, 26 Oct 2022 01:04:20 +0000 (10:04 +0900)
committerYoungjae Shin <yj99.shin@samsung.com>
Wed, 9 Nov 2022 08:21:13 +0000 (17:21 +0900)
- add C API for setting connection callback
- support calling mosquitto APIs in callback

12 files changed:
common/AittDiscovery.cc
common/aitt_internal_definitions.h
include/aitt_c.h
src/AITTImpl.cc
src/AITTImpl.h
src/MosquittoMQ.cc
src/aitt_c.cc
tests/AITT_TCP_test.cc
tests/AITT_test.cc
tests/AittTests.h
tests/RequestResponse_test.cc
tests/aitt_c_test.cc

index 8d9996c..2195143 100644 (file)
@@ -44,10 +44,16 @@ void AittDiscovery::Start(const std::string &host, int port, const std::string &
     RET_IF(callback_handle);
 
     discovery_mq->SetWillInfo(DISCOVERY_TOPIC_BASE + id_, nullptr, 0, AITT_QOS_EXACTLY_ONCE, true);
+    discovery_mq->SetConnectionCallback([&](int status) {
+        if (status != AITT_CONNECTED)
+            return;
+
+        DBG("Discovery Connected");
+        callback_handle = discovery_mq->Subscribe(DISCOVERY_TOPIC_BASE + "+",
+              DiscoveryMessageCallback, static_cast<void *>(this), AITT_QOS_EXACTLY_ONCE);
+        discovery_mq->SetConnectionCallback(nullptr);
+    });
     discovery_mq->Connect(host, port, username, password);
-
-    callback_handle = discovery_mq->Subscribe(DISCOVERY_TOPIC_BASE + "+", DiscoveryMessageCallback,
-          static_cast<void *>(this), AITT_QOS_EXACTLY_ONCE);
 }
 
 void AittDiscovery::Restart()
@@ -60,6 +66,7 @@ void AittDiscovery::Restart()
 
 void AittDiscovery::Stop()
 {
+    discovery_mq->SetConnectionCallback(nullptr);
     discovery_mq->Unsubscribe(callback_handle);
     discovery_mq->Publish(DISCOVERY_TOPIC_BASE + id_, nullptr, 0, AITT_QOS_EXACTLY_ONCE, true);
     discovery_mq->Publish(DISCOVERY_TOPIC_BASE + id_, nullptr, 0, AITT_QOS_AT_MOST_ONCE, true);
index 10cde41..6a29dae 100644 (file)
@@ -22,7 +22,7 @@
 
 #define STR_EQ 0
 
-#define AITT_MANAGED_TOPIC_PREFIX "/v1/custom/aitt/"
+#define AITT_MANAGED_TOPIC_PREFIX "/v1/custom/f5c7b34e48c1918f/"
 #define DISCOVERY_TOPIC_BASE std::string(AITT_MANAGED_TOPIC_PREFIX "discovery/")
 #define RESPONSE_POSTFIX "_AittRe_"
 
index a323c3e..ee77742 100644 (file)
@@ -69,6 +69,20 @@ typedef enum AittQoS aitt_qos_e;
 typedef enum AittError aitt_error_e;
 
 /**
+ * @brief Specify the type of function passed to aitt_set_connect_callback().
+ * @details This is called when the mqtt broker is connected.
+ * @param[in] handle Handle of AITT service
+ * @param[in] status A value of @a AittConnectionState
+ * @param[in] user_data The user data to pass to the function
+ *
+ * @pre The callback must be registered using aitt_set_connect_callback()
+ *
+ * @see aitt_connect()
+ * @see aitt_connect_full()
+ */
+typedef void (*aitt_connect_cb)(aitt_h handle, int status, void *user_data);
+
+/**
  * @brief Specify the type of function passed to aitt_subscribe().
  * @details When the aitt get message, it is called, immediately.
  * @param[in] msg_handle aitt message handle. The handle has topic name and so on. @c aitt_msg_h
@@ -198,6 +212,20 @@ int aitt_will_set(aitt_h handle, const char *topic, const void *msg, const int m
 int aitt_connect(aitt_h handle, const char *host, int port);
 
 /**
+ * @brief Set the connect callback. The callback is called when the mqtt broker is connected.
+ * @privlevel public
+ * @param[in] handle Handle of AITT service
+ * @param[in] cb The callback function to invoke
+ * @param[in] user_data The user data to pass to the function
+ * @return @c 0 on success
+ *         otherwise a negative error value
+ * @retval #AITT_ERROR_NONE  Success
+ * @retval #AITT_ERROR_INVALID_PARAMETER Invalid parameter
+ * @retval #AITT_ERROR_SYSTEM System errors
+ */
+int aitt_set_connect_callback(aitt_h handle, aitt_connect_cb cb, void *user_data);
+
+/**
  * @brief Connect to mqtt broker as aitt_connect(), but takes username and password.
  * @privlevel public
  * @param[in] handle Handle of AITT service
index 025f2fd..efd4ab8 100644 (file)
@@ -50,7 +50,7 @@ AITT::Impl::Impl(AITT &parent, const std::string &id, const std::string &my_ip,
 
 AITT::Impl::~Impl(void)
 {
-    if (false == mqtt_broker_ip_.empty()) {
+    if (mqtt_broker_ip_.empty() == false) {
         try {
             Disconnect();
         } catch (std::exception &e) {
@@ -83,14 +83,19 @@ void AITT::Impl::SetWillInfo(const std::string &topic, const void *data, const i
 
 void AITT::Impl::SetConnectionCallback(ConnectionCallback cb, void *user_data)
 {
-    if (cb)
-        mq->SetConnectionCallback(
-              std::bind(&Impl::ConnectionCB, this, cb, user_data, std::placeholders::_1));
-    else
+    if (cb) {
+        mq->SetConnectionCallback([&, cb, user_data](int status) {
+            auto idler_cb = std::bind(&Impl::ConnectionCB, this, cb, user_data, status,
+                  std::placeholders::_1, std::placeholders::_2, std::placeholders::_3);
+            MainLoopHandler::AddIdle(&main_loop, idler_cb, nullptr);
+        });
+    } else {
         mq->SetConnectionCallback(nullptr);
+    }
 }
 
-void AITT::Impl::ConnectionCB(ConnectionCallback cb, void *user_data, int status)
+void AITT::Impl::ConnectionCB(ConnectionCallback cb, void *user_data, int status,
+      MainLoopHandler::MainLoopResult result, int fd, MainLoopHandler::MainLoopData *loop_data)
 {
     RET_IF(cb == nullptr);
 
@@ -126,6 +131,8 @@ void AITT::Impl::UnsubscribeAll()
 {
     std::unique_lock<std::mutex> lock(subscribed_list_mutex_);
 
+    DBG("Subscribed list %zu", subscribed_list.size());
+
     for (auto subscribe_info : subscribed_list) {
         switch (subscribe_info->first) {
         case AITT_TYPE_MQTT:
index b902e50..b42bfd6 100644 (file)
@@ -69,7 +69,8 @@ class AITT::Impl {
     using Blob = std::pair<const void *, int>;
     using SubscribeInfo = std::pair<AittProtocol, void *>;
 
-    void ConnectionCB(ConnectionCallback cb, void *user_data, int status);
+    void ConnectionCB(ConnectionCallback cb, void *user_data, int status,
+          MainLoopHandler::MainLoopResult result, int fd, MainLoopHandler::MainLoopData *loop_data);
     AittSubscribeID SubscribeMQ(SubscribeInfo *info, MainLoopHandler *loop_handle,
           const std::string &topic, const SubscribeCallback &cb, void *cbdata, AittQoS qos);
     void DetachedCB(SubscribeCallback cb, MSG mq_msg, void *data, const int datalen, void *cbdata,
index ada7de3..57c6daa 100644 (file)
@@ -356,6 +356,8 @@ void *MosquittoMQ::Subscribe(const std::string &topic, const SubscribeCallback &
 
 void *MosquittoMQ::Unsubscribe(void *sub_handle)
 {
+    RETV_IF(nullptr == sub_handle, nullptr);
+
     std::lock_guard<std::recursive_mutex> auto_lock(callback_lock);
     auto it = std::find(subscribers.begin(), subscribers.end(),
           static_cast<SubscribeData *>(sub_handle));
index ce25391..8596681 100644 (file)
@@ -199,6 +199,28 @@ API int aitt_connect(aitt_h handle, const char *host, int port)
     return aitt_connect_full(handle, host, port, NULL, NULL);
 }
 
+API int aitt_set_connect_callback(aitt_h handle, aitt_connect_cb cb, void *user_data)
+{
+    RETV_IF(handle == nullptr, AITT_ERROR_INVALID_PARAMETER);
+    RETV_IF(handle->aitt == nullptr, AITT_ERROR_INVALID_PARAMETER);
+
+    try {
+        if (cb) {
+            handle->aitt->SetConnectionCallback(
+                  [handle, cb, user_data](AITT &aitt, int status, void *data) {
+                      cb(handle, status, user_data);
+                  });
+        } else {
+            handle->aitt->SetConnectionCallback(nullptr);
+        }
+    } catch (std::exception &e) {
+        ERR("SetConnectionCallback() Fail(%s)", e.what());
+        return AITT_ERROR_SYSTEM;
+    }
+
+    return AITT_ERROR_NONE;
+}
+
 API int aitt_connect_full(aitt_h handle, const char *host, int port, const char *username,
       const char *password)
 {
index 4442817..b907621 100644 (file)
@@ -56,7 +56,7 @@ class AITTTCPTest : public testing::Test, public AittTests {
 
             // Wait a few seconds until the AITT client gets a server list (discover devices)
             DBG("Sleep %d ms", SLEEP_MS);
-            usleep(SLEEP_MS * 1000);
+            usleep(100 * SLEEP_MS);
 
             aitt.Publish("test/value1", dump_msg, 12, protocol);
             aitt.Publish("test/value2", dump_msg, 1600, protocol);
@@ -95,7 +95,7 @@ TEST_F(AITTTCPTest, TCP_Wildcards1_Anytime)
 
         // Wait a few seconds until the AITT client gets a server list (discover devices)
         DBG("Sleep %d ms", SLEEP_MS);
-        usleep(SLEEP_MS * 1000);
+        usleep(100 * SLEEP_MS);
 
         aitt.Publish("test/step1/value1", dump_msg, 12, AITT_TYPE_TCP);
         aitt.Publish("test/step2/value1", dump_msg, 1600, AITT_TYPE_TCP);
index eda737f..bf5225d 100644 (file)
@@ -30,35 +30,43 @@ class AITTTest : public testing::Test, public AittTests {
     void SetUp() override { Init(); }
     void TearDown() override { Deinit(); }
 
-    void PubsubTemplate(const char *test_msg, AittProtocol protocol)
+    void PubSub(AITT &aitt, const char *test_msg, AittProtocol protocol, AITTTest *aitt_test)
+    {
+        aitt.Subscribe(
+              testTopic,
+              [](aitt::MSG *handle, const void *msg, const int szmsg, void *cbdata) -> void {
+                  AITTTest *test = static_cast<AITTTest *>(cbdata);
+                  if (msg)
+                      DBG("Subscribe invoked: %s %d", static_cast<const char *>(msg), szmsg);
+                  else
+                      DBG("Subscribe invoked: zero size msg(%d)", szmsg);
+                  test->StopEventLoop();
+              },
+              aitt_test, protocol);
+
+        // Wait a few seconds until the AITT client gets a server list (discover devices)
+        DBG("Sleep %d ms", SLEEP_MS);
+        usleep(100 * SLEEP_MS);
+
+        DBG("Publish(%s) : %s(%zu)", testTopic.c_str(), test_msg, strlen(test_msg));
+        aitt.Publish(testTopic, test_msg, strlen(test_msg), protocol);
+    }
+
+    void PubSubFull(const char *test_msg, AittProtocol protocol)
     {
         try {
             AITT aitt(clientId, LOCAL_IP, AittOption(true, false));
-            aitt.Connect();
-            aitt.Subscribe(
-                  testTopic,
-                  [](aitt::MSG *handle, const void *msg, const int szmsg, void *cbdata) -> void {
-                      AITTTest *test = static_cast<AITTTest *>(cbdata);
-                      test->ToggleReady();
-                      if (msg)
-                          DBG("Subscribe invoked: %s %d", static_cast<const char *>(msg), szmsg);
-                      else
-                          DBG("Subscribe invoked: zero size msg(%d)", szmsg);
+            aitt.SetConnectionCallback(
+                  [&, test_msg, protocol](AITT &handle, int status, void *user_data) {
+                      if (status == AITT_CONNECTED)
+                          PubSub(aitt, test_msg, protocol, this);
                   },
-                  static_cast<void *>(this), protocol);
-
-            // Wait a few seconds until the AITT client gets a server list (discover devices)
-            DBG("Sleep %d ms", SLEEP_MS);
-            usleep(SLEEP_MS * 1000);
+                  this);
 
-            DBG("Publish(%s) : %s(%zu)", testTopic.c_str(), test_msg, strlen(test_msg));
-            aitt.Publish(testTopic, test_msg, strlen(test_msg), protocol);
-
-            g_timeout_add(10, AittTests::ReadyCheck, static_cast<AittTests *>(this));
+            aitt.Connect();
 
             IterateEventLoop();
-
-            ASSERT_TRUE(ready);
+            aitt.Disconnect();
         } catch (std::exception &e) {
             FAIL() << "Unexpected exception: " << e.what();
         }
@@ -109,7 +117,7 @@ class AITTTest : public testing::Test, public AittTests {
                 aitt1.Connect();
 
                 // Wait a few seconds to the AITT client gets server list (discover devices)
-                usleep(SLEEP_MS * 1000);
+                usleep(100 * SLEEP_MS);
 
                 for (int i = 0; i < 10; i++) {
                     INFO("size = %zu", sizeof(dump_msg));
@@ -171,7 +179,7 @@ class AITTTest : public testing::Test, public AittTests {
                   static_cast<void *>(this), protocol);
 
             // Wait a few seconds to the AITT client gets server list (discover devices)
-            usleep(SLEEP_MS * 1000);
+            usleep(100 * SLEEP_MS);
 
             // NOTE:
             // Select target peers and send the data through the specified protocol - TCP
@@ -210,7 +218,7 @@ class AITTTest : public testing::Test, public AittTests {
                   static_cast<void *>(this), protocol);
 
             // Wait a few seconds to the AITT client gets server list (discover devices)
-            usleep(SLEEP_MS * 1000);
+            usleep(100 * SLEEP_MS);
 
             // NOTE:
             // Publish a message with the retained flag
@@ -272,6 +280,24 @@ TEST_F(AITTTest, SetConnectionCallback_P_Anytime)
     }
 }
 
+TEST_F(AITTTest, PubSubInConnectionCB_P_Anytime)
+{
+    try {
+        AITT aitt(clientId, LOCAL_IP, AittOption(true, false));
+        aitt.SetConnectionCallback(
+              [&](AITT &handle, int status, void *user_data) {
+                  if (status == AITT_CONNECTED)
+                      PubSub(aitt, TEST_MSG, AITT_TYPE_MQTT, this);
+              },
+              this);
+        aitt.Connect();
+
+        IterateEventLoop();
+    } catch (std::exception &e) {
+        FAIL() << "Unexpected exception: " << e.what();
+    }
+}
+
 TEST_F(AITTTest, UnsetConnectionCallback_P_Anytime)
 {
     try {
@@ -443,12 +469,12 @@ TEST_F(AITTTest, Unsubscribe_SECURE_TCP_P_Anytime)
 
 TEST_F(AITTTest, PublishSubscribe_MQTT_P_Anytime)
 {
-    PubsubTemplate(TEST_MSG, AITT_TYPE_MQTT);
+    PubSubFull(TEST_MSG, AITT_TYPE_MQTT);
 }
 
 TEST_F(AITTTest, Publish_0_MQTT_P_Anytime)
 {
-    PubsubTemplate("", AITT_TYPE_MQTT);
+    PubSubFull("", AITT_TYPE_MQTT);
 }
 
 TEST_F(AITTTest, Unsubscribe_in_Subscribe_MQTT_P_Anytime)
@@ -537,22 +563,22 @@ TEST_F(AITTTest, Subscribe_in_Subscribe_MQTT_P_Anytime)
 
 TEST_F(AITTTest, PublishSubscribe_TCP_P_Anytime)
 {
-    PubsubTemplate(TEST_MSG, AITT_TYPE_TCP);
+    PubSubFull(TEST_MSG, AITT_TYPE_TCP);
 }
 
 TEST_F(AITTTest, PublishSubscribe_SECURE_TCP_P_Anytime)
 {
-    PubsubTemplate(TEST_MSG, AITT_TYPE_TCP_SECURE);
+    PubSubFull(TEST_MSG, AITT_TYPE_TCP_SECURE);
 }
 
 TEST_F(AITTTest, Publish_0_TCP_P_Anytime)
 {
-    PubsubTemplate("", AITT_TYPE_TCP);
+    PubSubFull("", AITT_TYPE_TCP);
 }
 
 TEST_F(AITTTest, Publish_0_SECURE_TCP_P_Anytime)
 {
-    PubsubTemplate("", AITT_TYPE_TCP_SECURE);
+    PubSubFull("", AITT_TYPE_TCP_SECURE);
 }
 
 TEST_F(AITTTest, PublishSubscribe_Multiple_Protocols_P_Anytime)
@@ -580,7 +606,7 @@ TEST_F(AITTTest, PublishSubscribe_Multiple_Protocols_P_Anytime)
 
         // Wait a few seconds to the AITT client gets server list (discover devices)
         DBG("Sleep %d ms", SLEEP_MS);
-        usleep(SLEEP_MS * 1000);
+        usleep(100 * SLEEP_MS);
 
         DBG("Publish message to %s (%s) / %zu", testTopic.c_str(), TEST_MSG, sizeof(TEST_MSG));
         aitt.Publish(testTopic, TEST_MSG, sizeof(TEST_MSG),
index 0274fb0..073fc97 100644 (file)
@@ -28,7 +28,7 @@
 
 #define TEST_MSG "This is aitt test message"
 #define TEST_MSG2 "This message is going to be delivered through a specified AittProtocol"
-#define SLEEP_MS 100
+#define SLEEP_MS 1000
 
 class AittTests {
   public:
@@ -63,6 +63,8 @@ class AittTests {
         return G_SOURCE_CONTINUE;
     }
 
+    void StopEventLoop(void) { g_main_loop_quit(mainLoop); }
+
     void IterateEventLoop(void)
     {
         g_main_loop_run(mainLoop);
index c20c4a8..0ef8b29 100644 (file)
@@ -124,33 +124,40 @@ class AITTRRTest : public testing::Test, public AittTests {
         sub_ok = reply1_ok = reply2_ok = false;
 
         AITT sub_aitt(clientId + "sub", LOCAL_IP, AittOption(true, false));
-        INFO("Constructor Success");
-
+        sub_aitt.SetConnectionCallback([&](AITT &handle, int status, void *user_data) {
+            if (status != AITT_CONNECTED)
+                return;
+            sub_aitt.Subscribe(rr_topic.c_str(),
+                  [&](aitt::MSG *msg, const void *data, const int datalen, void *cbdata) {
+                      CheckSubscribe(msg, data, datalen);
+                      sub_aitt.SendReply(msg, reply.c_str(), reply.size());
+                      sub_ok = true;
+                  });
+        });
         sub_aitt.Connect();
-        INFO("Connected");
-
-        sub_aitt.Subscribe(rr_topic.c_str(),
-              [&](aitt::MSG *msg, const void *data, const int datalen, void *cbdata) {
-                  CheckSubscribe(msg, data, datalen);
-                  sub_aitt.SendReply(msg, reply.c_str(), reply.size());
-                  sub_ok = true;
-              });
 
         AITT aitt(clientId, LOCAL_IP, AittOption(true, false));
+        aitt.SetConnectionCallback(
+              [&](AITT &handle, int status, void *user_data) {
+                  if (status != AITT_CONNECTED)
+                      return;
+
+                  using namespace std::placeholders;
+                  auto replyCB = std::bind(&AITTRRTest::PublishSyncInCallback, GetHandle(), &handle,
+                        &reply1_ok, &reply2_ok, _1, _2, _3, _4);
+
+                  if (sync) {
+                      aitt.PublishWithReplySync(rr_topic.c_str(), message.c_str(), message.size(),
+                            AITT_TYPE_MQTT, AITT_QOS_AT_MOST_ONCE, false, replyCB, nullptr,
+                            correlation);
+                  } else {
+                      aitt.PublishWithReply(rr_topic.c_str(), message.c_str(), message.size(),
+                            AITT_TYPE_MQTT, AITT_QOS_AT_MOST_ONCE, false, replyCB, nullptr,
+                            correlation);
+                  }
+              },
+              this);
         aitt.Connect();
-        usleep(SLEEP_MS * 1000);
-
-        using namespace std::placeholders;
-        auto replyCB = std::bind(&AITTRRTest::PublishSyncInCallback, GetHandle(), &aitt, &reply1_ok,
-              &reply2_ok, _1, _2, _3, _4);
-
-        if (sync) {
-            aitt.PublishWithReplySync(rr_topic.c_str(), message.c_str(), message.size(),
-                  AITT_TYPE_MQTT, AITT_QOS_AT_MOST_ONCE, false, replyCB, nullptr, correlation);
-        } else {
-            aitt.PublishWithReply(rr_topic.c_str(), message.c_str(), message.size(), AITT_TYPE_MQTT,
-                  AITT_QOS_AT_MOST_ONCE, false, replyCB, nullptr, correlation);
-        }
 
         g_timeout_add(10, AittTests::ReadyCheck, static_cast<AittTests *>(this));
         IterateEventLoop();
index 5a43eff..0a7ff63 100644 (file)
 #include "AittTests.h"
 #include "aitt_internal.h"
 
+#define TEST_REPLY_MSG "hello reply message"
+#define TEST_CORRELATION "0001"
+
+class AITTCTest : public testing::Test, public AittTests {
+  protected:
+    void SetUp() override { Init(); }
+    void TearDown() override { Deinit(); }
+
+    aitt_h NewAitt()
+    {
+        aitt_option_h option = aitt_option_new();
+        EXPECT_NE(option, nullptr);
+
+        int ret = aitt_option_set(option, AITT_OPT_MY_IP, LOCAL_IP);
+        EXPECT_EQ(ret, AITT_ERROR_NONE);
+
+        aitt_h handle = aitt_new(clientId.c_str(), option);
+        aitt_option_destroy(option);
+
+        return handle;
+    }
+};
+
 TEST(AITT_C_INTERFACE, new_P_Anytime)
 {
     aitt_option_h option = aitt_option_new();
@@ -170,7 +193,7 @@ TEST(AITT_C_INTERFACE, disconnect_N_Anytime)
     aitt_destroy(handle);
 }
 
-TEST(AITT_C_INTERFACE, pub_sub_P_Anytime)
+TEST_F(AITTCTest, pub_sub_P_Anytime)
 {
     int ret;
 
@@ -187,26 +210,24 @@ TEST(AITT_C_INTERFACE, pub_sub_P_Anytime)
     ret = aitt_connect(handle, LOCAL_IP, 1883);
     ASSERT_EQ(ret, AITT_ERROR_NONE);
 
-    GMainLoop *loop = g_main_loop_new(nullptr, FALSE);
     aitt_sub_h sub_handle = nullptr;
     ret = aitt_subscribe(
           handle, TEST_C_TOPIC,
           [](aitt_msg_h msg_handle, const void *msg, int msg_len, void *user_data) {
-              GMainLoop *loop = static_cast<GMainLoop *>(user_data);
               std::string received_data((const char *)msg, msg_len);
               EXPECT_STREQ(received_data.c_str(), TEST_C_MSG);
               EXPECT_STREQ(aitt_msg_get_topic(msg_handle), TEST_C_TOPIC);
-              g_main_loop_quit(loop);
+              AITTCTest *test = static_cast<AITTCTest *>(user_data);
+              test->StopEventLoop();
           },
-          loop, &sub_handle);
+          this, &sub_handle);
     ASSERT_EQ(ret, AITT_ERROR_NONE);
     EXPECT_TRUE(sub_handle != nullptr);
 
     ret = aitt_publish(handle, TEST_C_TOPIC, TEST_C_MSG, strlen(TEST_C_MSG));
     ASSERT_EQ(ret, AITT_ERROR_NONE);
 
-    g_main_loop_run(loop);
-    g_main_loop_unref(loop);
+    IterateEventLoop();
 
     ret = aitt_disconnect(handle);
     EXPECT_EQ(ret, AITT_ERROR_NONE);
@@ -291,10 +312,7 @@ TEST(AITT_C_INTERFACE, sub_N_Anytime)
     aitt_destroy(handle);
 }
 
-#define reply_msg "hello reply message"
-#define test_correlation "0001"
-
-TEST(AITT_C_INTERFACE, pub_with_reply_send_reply_P_Anytime)
+TEST(AITT_C_INTERFACE, pub_with_reply_N_Anytime)
 {
     int ret;
 
@@ -304,107 +322,98 @@ TEST(AITT_C_INTERFACE, pub_with_reply_send_reply_P_Anytime)
     ret = aitt_option_set(option, AITT_OPT_MY_IP, LOCAL_IP);
     EXPECT_EQ(ret, AITT_ERROR_NONE);
 
-    aitt_h handle = aitt_new("test11", option);
+    aitt_h handle = aitt_new("test12", option);
     aitt_option_destroy(option);
     ASSERT_NE(handle, nullptr);
 
     ret = aitt_connect(handle, LOCAL_IP, 1883);
     ASSERT_EQ(ret, AITT_ERROR_NONE);
 
-    GMainLoop *loop = g_main_loop_new(nullptr, FALSE);
-    aitt_sub_h sub_handle = nullptr;
-    ret = aitt_subscribe(
-          handle, TEST_C_TOPIC,
-          [](aitt_msg_h msg_handle, const void *msg, int msg_len, void *user_data) {
-              aitt_h handle = static_cast<aitt_h>(user_data);
-              std::string received_data((const char *)msg, msg_len);
-              EXPECT_STREQ(received_data.c_str(), TEST_C_MSG);
-              EXPECT_STREQ(aitt_msg_get_topic(msg_handle), TEST_C_TOPIC);
-              aitt_send_reply(handle, msg_handle, reply_msg, sizeof(reply_msg), true);
-          },
-          handle, &sub_handle);
-    ASSERT_EQ(ret, AITT_ERROR_NONE);
+    ret = aitt_publish_with_reply(
+          nullptr, TEST_C_TOPIC, TEST_C_MSG, strlen(TEST_C_MSG), AITT_TYPE_MQTT,
+          AITT_QOS_AT_MOST_ONCE, TEST_CORRELATION,
+          [](aitt_msg_h msg_handle, const void *msg, int msg_len, void *user_data) {}, nullptr);
+    EXPECT_EQ(ret, AITT_ERROR_INVALID_PARAMETER);
 
     ret = aitt_publish_with_reply(
-          handle, TEST_C_TOPIC, TEST_C_MSG, strlen(TEST_C_MSG), AITT_TYPE_MQTT,
-          AITT_QOS_AT_MOST_ONCE, test_correlation,
-          [](aitt_msg_h msg_handle, const void *msg, int msg_len, void *user_data) {
-              GMainLoop *loop = static_cast<GMainLoop *>(user_data);
-              std::string received_data((const char *)msg, msg_len);
-              EXPECT_STREQ(received_data.c_str(), reply_msg);
-              g_main_loop_quit(loop);
-          },
-          loop);
+          handle, nullptr, TEST_C_MSG, strlen(TEST_C_MSG), AITT_TYPE_MQTT, AITT_QOS_AT_MOST_ONCE,
+          TEST_CORRELATION,
+          [](aitt_msg_h msg_handle, const void *msg, int msg_len, void *user_data) {}, nullptr);
+    EXPECT_EQ(ret, AITT_ERROR_INVALID_PARAMETER);
 
-    g_main_loop_run(loop);
-    g_main_loop_unref(loop);
+    ret = aitt_publish_with_reply(
+          handle, TEST_C_TOPIC, nullptr, 0, AITT_TYPE_MQTT, AITT_QOS_AT_MOST_ONCE, TEST_CORRELATION,
+          [](aitt_msg_h msg_handle, const void *msg, int msg_len, void *user_data) {}, nullptr);
+    EXPECT_EQ(ret, AITT_ERROR_INVALID_PARAMETER);
 
-    ret = aitt_disconnect(handle);
-    EXPECT_EQ(ret, AITT_ERROR_NONE);
+    ret = aitt_publish_with_reply(handle, TEST_C_TOPIC, TEST_C_MSG, strlen(TEST_C_MSG),
+          AITT_TYPE_MQTT, AITT_QOS_AT_MOST_ONCE, TEST_CORRELATION, nullptr, nullptr);
+    EXPECT_EQ(ret, AITT_ERROR_INVALID_PARAMETER);
 
     aitt_destroy(handle);
 }
 
-TEST(AITT_C_INTERFACE, pub_with_reply_N_Anytime)
+TEST(AITT_C_INTERFACE, will_set_N_Anytime)
 {
     int ret;
 
-    aitt_option_h option = aitt_option_new();
-    ASSERT_NE(option, nullptr);
+    ret = aitt_will_set(nullptr, "test/will_topic", "test", 4, AITT_QOS_AT_MOST_ONCE, false);
+    EXPECT_EQ(ret, AITT_ERROR_INVALID_PARAMETER);
+}
 
-    ret = aitt_option_set(option, AITT_OPT_MY_IP, LOCAL_IP);
-    EXPECT_EQ(ret, AITT_ERROR_NONE);
+TEST_F(AITTCTest, pub_with_reply_send_reply_P_Anytime)
+{
+    int ret;
 
-    aitt_h handle = aitt_new("test12", option);
-    aitt_option_destroy(option);
+    aitt_h handle = NewAitt();
     ASSERT_NE(handle, nullptr);
 
     ret = aitt_connect(handle, LOCAL_IP, 1883);
     ASSERT_EQ(ret, AITT_ERROR_NONE);
 
-    ret = aitt_publish_with_reply(
-          nullptr, TEST_C_TOPIC, TEST_C_MSG, strlen(TEST_C_MSG), AITT_TYPE_MQTT,
-          AITT_QOS_AT_MOST_ONCE, test_correlation,
-          [](aitt_msg_h msg_handle, const void *msg, int msg_len, void *user_data) {}, nullptr);
-    EXPECT_EQ(ret, AITT_ERROR_INVALID_PARAMETER);
+    aitt_sub_h sub_handle = nullptr;
+    ret = aitt_subscribe(
+          handle, TEST_C_TOPIC,
+          [](aitt_msg_h msg_handle, const void *msg, int msg_len, void *user_data) {
+              aitt_h handle = static_cast<aitt_h>(user_data);
+              std::string received_data((const char *)msg, msg_len);
+              EXPECT_STREQ(received_data.c_str(), TEST_C_MSG);
+              EXPECT_STREQ(aitt_msg_get_topic(msg_handle), TEST_C_TOPIC);
+              aitt_send_reply(handle, msg_handle, TEST_REPLY_MSG, sizeof(TEST_REPLY_MSG), true);
+          },
+          handle, &sub_handle);
+    ASSERT_EQ(ret, AITT_ERROR_NONE);
 
     ret = aitt_publish_with_reply(
-          handle, nullptr, TEST_C_MSG, strlen(TEST_C_MSG), AITT_TYPE_MQTT, AITT_QOS_AT_MOST_ONCE,
-          test_correlation,
-          [](aitt_msg_h msg_handle, const void *msg, int msg_len, void *user_data) {}, nullptr);
-    EXPECT_EQ(ret, AITT_ERROR_INVALID_PARAMETER);
+          handle, TEST_C_TOPIC, TEST_C_MSG, strlen(TEST_C_MSG), AITT_TYPE_MQTT,
+          AITT_QOS_AT_MOST_ONCE, TEST_CORRELATION,
+          [](aitt_msg_h msg_handle, const void *msg, int msg_len, void *user_data) {
+              std::string received_data((const char *)msg, msg_len);
+              EXPECT_STREQ(received_data.c_str(), TEST_REPLY_MSG);
+              AITTCTest *test = static_cast<AITTCTest *>(user_data);
+              test->StopEventLoop();
+          },
+          this);
 
-    ret = aitt_publish_with_reply(
-          handle, TEST_C_TOPIC, nullptr, 0, AITT_TYPE_MQTT, AITT_QOS_AT_MOST_ONCE, test_correlation,
-          [](aitt_msg_h msg_handle, const void *msg, int msg_len, void *user_data) {}, nullptr);
-    EXPECT_EQ(ret, AITT_ERROR_INVALID_PARAMETER);
+    IterateEventLoop();
 
-    ret = aitt_publish_with_reply(handle, TEST_C_TOPIC, TEST_C_MSG, strlen(TEST_C_MSG),
-          AITT_TYPE_MQTT, AITT_QOS_AT_MOST_ONCE, test_correlation, nullptr, nullptr);
-    EXPECT_EQ(ret, AITT_ERROR_INVALID_PARAMETER);
+    ret = aitt_disconnect(handle);
+    EXPECT_EQ(ret, AITT_ERROR_NONE);
 
     aitt_destroy(handle);
 }
 
-TEST(AITT_C_INTERFACE, sub_unsub_P_Anytime)
+TEST_F(AITTCTest, sub_unsub_P_Anytime)
 {
     int ret;
 
-    aitt_option_h option = aitt_option_new();
-    ASSERT_NE(option, nullptr);
-
-    ret = aitt_option_set(option, AITT_OPT_MY_IP, LOCAL_IP);
-    EXPECT_EQ(ret, AITT_ERROR_NONE);
-
-    aitt_h handle = aitt_new("test13", option);
-    aitt_option_destroy(option);
+    aitt_h handle = NewAitt();
     ASSERT_NE(handle, nullptr);
 
     ret = aitt_connect(handle, LOCAL_IP, 1883);
     ASSERT_EQ(ret, AITT_ERROR_NONE);
 
     static unsigned int sub_call_count = 0;
-    GMainLoop *loop = g_main_loop_new(nullptr, FALSE);
     static aitt_sub_h sub_handle = nullptr;
     ret = aitt_subscribe(
           handle, TEST_C_TOPIC,
@@ -428,27 +437,26 @@ TEST(AITT_C_INTERFACE, sub_unsub_P_Anytime)
 
               ret = aitt_publish(handle, TEST_C_TOPIC, TEST_C_MSG, strlen(TEST_C_MSG));
               EXPECT_EQ(ret, AITT_ERROR_NONE);
-              return FALSE;
+              return G_SOURCE_REMOVE;
           },
           handle);
 
     g_timeout_add(
           2000,
           [](gpointer data) -> gboolean {
-              GMainLoop *loop = static_cast<GMainLoop *>(data);
               EXPECT_EQ(sub_call_count, 1);
 
               if (sub_call_count == 1) {
-                  g_main_loop_quit(loop);
+                  AITTCTest *test = static_cast<AITTCTest *>(data);
+                  test->StopEventLoop();
                   return FALSE;
               }
 
               return TRUE;
           },
-          loop);
+          this);
 
-    g_main_loop_run(loop);
-    g_main_loop_unref(loop);
+    IterateEventLoop();
 
     ret = aitt_disconnect(handle);
     EXPECT_EQ(ret, AITT_ERROR_NONE);
@@ -456,10 +464,33 @@ TEST(AITT_C_INTERFACE, sub_unsub_P_Anytime)
     aitt_destroy(handle);
 }
 
-TEST(AITT_C_INTERFACE, will_set_N_Anytime)
+TEST_F(AITTCTest, connect_cb_P_Anytime)
 {
     int ret;
 
-    ret = aitt_will_set(nullptr, "test/will_topic", "test", 4, AITT_QOS_AT_MOST_ONCE, false);
-    EXPECT_EQ(ret, AITT_ERROR_INVALID_PARAMETER);
+    aitt_h handle = NewAitt();
+    ASSERT_NE(handle, nullptr);
+
+    ret = aitt_set_connect_callback(
+          handle,
+          [](aitt_h aitt_handle, int status, void *user_data) {
+              AITTCTest *test = static_cast<AITTCTest *>(user_data);
+              EXPECT_EQ(status, AITT_CONNECTED);
+              test->StopEventLoop();
+          },
+          this);
+    ASSERT_EQ(ret, AITT_ERROR_NONE);
+
+    ret = aitt_connect(handle, LOCAL_IP, 1883);
+    ASSERT_EQ(ret, AITT_ERROR_NONE);
+
+    IterateEventLoop();
+
+    ret = aitt_set_connect_callback(handle, nullptr, nullptr);
+    ASSERT_EQ(ret, AITT_ERROR_NONE);
+
+    ret = aitt_disconnect(handle);
+    EXPECT_EQ(ret, AITT_ERROR_NONE);
+
+    aitt_destroy(handle);
 }