revise Posix Mainloop
authorYoungjae Shin <yj99.shin@samsung.com>
Mon, 12 Dec 2022 05:17:26 +0000 (14:17 +0900)
committerYoungjae Shin <yj99.shin@samsung.com>
Mon, 12 Dec 2022 09:35:28 +0000 (18:35 +0900)
[Problem] cb data is double freed
[Solution] use shared_ptr
- revise TCP test and lock

common/PosixMainLoop.cc
common/PosixMainLoop.h
debian/rules
modules/tcp/Module.cc
modules/tcp/Module.h
tests/AITT_TCP_test.cc

index be495f1..12e323e 100644 (file)
@@ -57,9 +57,6 @@ PosixMainLoop::~PosixMainLoop()
     for (const auto &pair : timeout_table)
         delete pair.second;
 
-    for (const auto &pair : watch_table)
-        delete pair.second;
-
     for (const auto &data : idle_table)
         delete data;
 
@@ -82,21 +79,19 @@ void PosixMainLoop::Run()
 
         table_lock.lock();
         struct pollfd pfds[watch_table.size() + 2];
-
-        // for Watch
-        for (auto iter = watch_table.begin(); iter != watch_table.end(); iter++) {
+        for (auto iter = watch_table.begin(); iter != watch_table.end(); iter++)
             pfds[nfds++] = {iter->second->fd, POLLIN | POLLHUP | POLLERR, 0};
-        }
         table_lock.unlock();
 
-        // for idle
         pfds[nfds++] = {idle_pipe[0], POLLIN, 0};
-        // for timeout
         pfds[nfds++] = {timeout_pipe[0], POLLIN, 0};
 
         if (0 < poll(pfds, nfds, -1)) {
             bool handled = false;
-            handled |= CheckTimeout(pfds[nfds - 1], POLLIN);
+            int ret = CheckTimeout(pfds[nfds - 1], POLLIN);
+            if (ret < 0)
+                break;
+            handled |= !!ret;
             handled |= CheckWatch(pfds, nfds - 2, POLLIN | POLLHUP | POLLERR);
             if (!handled)
                 CheckIdle(pfds[nfds - 2], POLLIN);
@@ -114,7 +109,7 @@ bool PosixMainLoop::Quit()
     }
 
     is_running = false;
-    WriteToPipe(timeout_pipe[1], INVALID);
+    WriteToPipe(timeout_pipe[1], QUIT);
     return true;
 }
 
@@ -138,7 +133,7 @@ void PosixMainLoop::AddWatch(int fd, const mainLoopCB &cb, MainLoopData *user_da
 
     std::lock_guard<std::mutex> lock(table_lock);
     watch_table.insert(WatchMap::value_type(cb_data->fd, cb_data));
-    WriteToPipe(idle_pipe[1], INVALID);
+    WriteToPipe(idle_pipe[1], PING);
 }
 
 PosixMainLoop::MainLoopData *PosixMainLoop::RemoveWatch(int fd)
@@ -150,7 +145,6 @@ PosixMainLoop::MainLoopData *PosixMainLoop::RemoveWatch(int fd)
         return nullptr;
     }
     MainLoopData *user_data = iter->second->data;
-    delete iter->second;
     watch_table.erase(iter);
 
     return user_data;
@@ -219,14 +213,12 @@ bool PosixMainLoop::CheckWatch(pollfd *pfds, nfds_t nfds, short int event)
 {
     bool handled = false;
     for (nfds_t idx = 0; idx < nfds; idx++) {
-        if (false == (pfds[idx].revents & event)) {
-            DBG("Unknown Event(%d)", pfds[idx].revents);
+        if (false == (pfds[idx].revents & event))
             continue;
-        }
 
         table_lock.lock();
         auto iter = watch_table.find(pfds[idx].fd);
-        MainLoopCbData *cb_data = (watch_table.end() == iter) ? NULL : iter->second;
+        auto cb_data = (watch_table.end() == iter) ? NULL : iter->second;
         table_lock.unlock();
 
         if (cb_data) {
@@ -243,19 +235,17 @@ bool PosixMainLoop::CheckWatch(pollfd *pfds, nfds_t nfds, short int event)
     return handled;
 }
 
-bool PosixMainLoop::CheckTimeout(pollfd pfd, short int event)
+int PosixMainLoop::CheckTimeout(pollfd pfd, short int event)
 {
-    if (false == (pfd.revents & event)) {
-        DBG("Unknown Event(%d)", pfd.revents);
+    if (false == (pfd.revents & event))
         return false;
-    }
 
     bool handled = false;
-    unsigned int identifier = INVALID;
+    int identifier = PING;
     while (read(pfd.fd, &identifier, sizeof(identifier)) == sizeof(identifier)) {
-        if (identifier == INVALID) {
+        if (identifier == QUIT) {
             INFO("Terminating");
-            return true;
+            return -1;
         }
 
         table_lock.lock();
@@ -278,12 +268,10 @@ bool PosixMainLoop::CheckTimeout(pollfd pfd, short int event)
 
 void PosixMainLoop::CheckIdle(pollfd pfd, short int event)
 {
-    if (false == (pfd.revents & event)) {
-        DBG("Unknown Event(%d)", pfd.revents);
+    if (false == (pfd.revents & event))
         return;
-    }
 
-    unsigned int identifier = INVALID;
+    int identifier = PING;
     if (read(pfd.fd, &identifier, sizeof(identifier)) == sizeof(identifier)) {
         if (identifier == IDLE) {
             table_lock.lock();
index 3325692..4ae3593 100644 (file)
@@ -20,6 +20,7 @@
 
 #include <deque>
 #include <map>
+#include <memory>
 #include <mutex>
 
 #include "MainLoopIface.h"
@@ -41,7 +42,8 @@ class PosixMainLoop : public MainLoopIface {
 
   private:
     enum PipeValue {
-        INVALID = 0,
+        QUIT = -1,
+        PING = 0,
         IDLE = 1,
         TIMEOUT_START = 2,
     };
@@ -60,7 +62,7 @@ class PosixMainLoop : public MainLoopIface {
         int timeout_interval;
     };
 
-    using WatchMap = std::map<int, MainLoopCbData *>;
+    using WatchMap = std::map<int, std::shared_ptr<MainLoopCbData>>;
     using TimeoutMap = std::map<unsigned int, MainLoopCbData *>;
     using IdleQueue = std::deque<MainLoopCbData *>;
 
@@ -69,7 +71,7 @@ class PosixMainLoop : public MainLoopIface {
 
     void TimeoutTableInsert(unsigned int identifier, MainLoopCbData *cb_data);
     bool CheckWatch(pollfd *pfds, nfds_t nfds, short int event);
-    bool CheckTimeout(pollfd pfd, short int event);
+    int CheckTimeout(pollfd pfd, short int event);
     void CheckIdle(pollfd pfd, short int event);
     int SetTimer(int interval, unsigned int timeout_id);
 
@@ -79,7 +81,7 @@ class PosixMainLoop : public MainLoopIface {
     std::mutex table_lock;
     int timeout_pipe[2];
     int idle_pipe[2];
-
     bool is_running;
 };
+
 }  // namespace aitt
index dcfe88d..c021c28 100755 (executable)
@@ -41,7 +41,7 @@ override_dh_auto_configure:
                -DCMAKE_VERBOSE_MAKEFILE:BOOL=OFF \
                -DBUILD_TESTING:BOOL=${TEST} \
                -DCOVERAGE_TEST:BOOL=${COVERAGE} \
-               -DWITH_MBEDTLS:BOOL=${MBEDTLS}; \
+               -DWITH_MBEDTLS:BOOL=${MBEDTLS} \
                -DUSE_GLIB:BOOL=${GLIB}; \
        cd -
 
index f2265d3..27b80bc 100644 (file)
@@ -167,30 +167,27 @@ void *Module::Subscribe(const std::string &topic, const AittTransport::Subscribe
 
 void *Module::Unsubscribe(void *handlePtr)
 {
+    std::lock_guard<std::mutex> autoLock(subscribeTableLock);
+
     int handle = static_cast<int>(reinterpret_cast<intptr_t>(handlePtr));
     TCPServerData *listen_info = dynamic_cast<TCPServerData *>(main_loop.RemoveWatch(handle));
     if (!listen_info)
         return nullptr;
 
-    {
-        std::lock_guard<std::mutex> autoLock(subscribeTableLock);
-        auto it = subscribeTable.find(listen_info->topic);
-        if (it == subscribeTable.end())
-            throw std::runtime_error("Service is not registered: " + listen_info->topic);
+    auto it = subscribeTable.find(listen_info->topic);
+    if (it == subscribeTable.end())
+        throw std::runtime_error("Service is not registered: " + listen_info->topic);
 
-        subscribeTable.erase(it);
+    subscribeTable.erase(it);
 
-        UpdateDiscoveryMsg();
-    }
+    UpdateDiscoveryMsg();
 
     void *cbdata = listen_info->cbdata;
-    listen_info->client_lock.lock();
     for (auto fd : listen_info->client_list) {
         TCPData *tcp_data = dynamic_cast<TCPData *>(main_loop.RemoveWatch(fd));
         delete tcp_data;
     }
     listen_info->client_list.clear();
-    listen_info->client_lock.unlock();
     delete listen_info;
 
     return cbdata;
@@ -353,16 +350,15 @@ int Module::ReceiveData(MainLoopHandler::MainLoopResult result, int handle,
 
 int Module::HandleClientDisconnect(int handle)
 {
+    std::lock_guard<std::mutex> autoLock(subscribeTableLock);
     TCPData *tcp_data = dynamic_cast<TCPData *>(main_loop.RemoveWatch(handle));
     if (tcp_data == nullptr) {
         ERR("No watch data");
         return AITT_LOOP_EVENT_REMOVE;
     }
-    tcp_data->parent->client_lock.lock();
     auto it = std::find(tcp_data->parent->client_list.begin(), tcp_data->parent->client_list.end(),
           handle);
     tcp_data->parent->client_list.erase(it);
-    tcp_data->parent->client_lock.unlock();
 
     delete tcp_data;
     return AITT_LOOP_EVENT_REMOVE;
index 0d3700e..028d407 100644 (file)
@@ -56,7 +56,6 @@ class Module : public AittTransport {
         void *cbdata;
         std::string topic;
         std::vector<int> client_list;
-        std::mutex client_lock;
     };
 
     struct TCPData : public MainLoopHandler::MainLoopData {
index 6b2ea09..a72f16d 100644 (file)
@@ -30,38 +30,45 @@ class AITTTCPTest : public testing::Test, public AittTests {
     void SetUp() override { Init(); }
     void TearDown() override { Deinit(); }
 
-    void TCPWildcardsTopicTemplate(AittProtocol protocol)
+    void TCPWildcardsTopicTemplate(AittProtocol protocol, bool single_level)
     {
         try {
             char dump_msg[204800];
+            std::string sub_topic = "test/" + std::string(single_level ? "+" : "#");
 
             AITT aitt(clientId, LOCAL_IP);
-            aitt.Connect();
 
             int cnt = 0;
-            aitt.Subscribe(
-                  "test/+",
-                  [&](aitt::MSG *handle, const void *msg, const int szmsg, void *cbdata) -> void {
-                      AITTTCPTest *test = static_cast<AITTTCPTest *>(cbdata);
-                      INFO("Got Message(Topic:%s, size:%d)", handle->GetTopic().c_str(), szmsg);
-                      ++cnt;
-
-                      std::stringstream ss;
-                      ss << "test/value" << cnt;
-                      EXPECT_EQ(ss.str(), handle->GetTopic());
-
-                      if (cnt == 3)
-                          test->ToggleReady();
-                  },
-                  static_cast<void *>(this), protocol);
-
-            // Wait a few seconds until the AITT client gets a server list (discover devices)
-            DBG("Sleep %d ms", SLEEP_MS);
-            usleep(100 * SLEEP_MS);
-
-            aitt.Publish("test/value1", dump_msg, 12, protocol);
-            aitt.Publish("test/value2", dump_msg, 1600, protocol);
-            aitt.Publish("test/value3", dump_msg, 1600, protocol);
+            aitt.SetConnectionCallback([&](AITT &handle, int status, void *user_data) {
+                if (status != AITT_CONNECTED)
+                    return;
+                aitt.Subscribe(
+                      sub_topic,
+                      [&](aitt::MSG *handle, const void *msg, const int szmsg,
+                            void *cbdata) -> void {
+                          AITTTCPTest *test = static_cast<AITTTCPTest *>(cbdata);
+                          INFO("Got Message(Topic:%s, size:%d)", handle->GetTopic().c_str(), szmsg);
+                          ++cnt;
+
+                          if (cnt == 3)
+                              test->ToggleReady();
+                      },
+                      static_cast<void *>(this), protocol);
+
+                // Wait a few seconds until the AITT client gets a server list (discover devices)
+                DBG("Sleep %d ms", SLEEP_MS);
+                usleep(100 * SLEEP_MS);
+
+                aitt.Publish("test/value1", dump_msg, 12, protocol);
+                if (single_level) {
+                    aitt.Publish("test/value2", dump_msg, 1600, protocol);
+                    aitt.Publish("test/value3", dump_msg, 1600, protocol);
+                } else {
+                    aitt.Publish("test/step1/value1", dump_msg, 1600, protocol);
+                    aitt.Publish("test/step2/value1", dump_msg, 1600, protocol);
+                }
+            });
+            aitt.Connect();
 
             mainLoop.AddTimeout(CHECK_INTERVAL,
                   [&](MainLoopHandler::MainLoopResult result, int fd,
@@ -77,55 +84,24 @@ class AITTTCPTest : public testing::Test, public AittTests {
     }
 };
 
-TEST_F(AITTTCPTest, TCP_Wildcards1_Anytime)
+TEST_F(AITTTCPTest, TCP_Wildcard_single_Anytime)
 {
-    try {
-        char dump_msg[204800];
-
-        AITT aitt(clientId, LOCAL_IP);
-        aitt.Connect();
-
-        aitt.Subscribe(
-              "test/#",
-              [&](aitt::MSG *handle, const void *msg, const int szmsg, void *cbdata) -> void {
-                  AITTTCPTest *test = static_cast<AITTTCPTest *>(cbdata);
-                  INFO("Got Message(Topic:%s, size:%d)", handle->GetTopic().c_str(), szmsg);
-                  static int cnt = 0;
-                  ++cnt;
-                  if (cnt == 3)
-                      test->ToggleReady();
-              },
-              static_cast<void *>(this), AITT_TYPE_TCP);
-
-        // Wait a few seconds until the AITT client gets a server list (discover devices)
-        DBG("Sleep %d ms", SLEEP_MS);
-        usleep(100 * SLEEP_MS);
-
-        aitt.Publish("test/step1/value1", dump_msg, 12, AITT_TYPE_TCP);
-        aitt.Publish("test/step2/value1", dump_msg, 1600, AITT_TYPE_TCP);
-        aitt.Publish("test/step2/value1", dump_msg, 1600, AITT_TYPE_TCP);
-
-        mainLoop.AddTimeout(CHECK_INTERVAL,
-              [&](MainLoopHandler::MainLoopResult result, int fd,
-                    MainLoopHandler::MainLoopData *data) -> int {
-                  return ReadyCheck(static_cast<AittTests *>(this));
-              });
-        IterateEventLoop();
+    TCPWildcardsTopicTemplate(AITT_TYPE_TCP, true);
+}
 
-        ASSERT_TRUE(ready);
-    } catch (std::exception &e) {
-        FAIL() << "Unexpected exception: " << e.what();
-    }
+TEST_F(AITTTCPTest, SECURE_TCP_Wildcard_single_Anytime)
+{
+    TCPWildcardsTopicTemplate(AITT_TYPE_TCP_SECURE, true);
 }
 
-TEST_F(AITTTCPTest, TCP_Wildcards2_Anytime)
+TEST_F(AITTTCPTest, TCP_Wildcard_multi_Anytime)
 {
-    TCPWildcardsTopicTemplate(AITT_TYPE_TCP);
+    TCPWildcardsTopicTemplate(AITT_TYPE_TCP, false);
 }
 
-TEST_F(AITTTCPTest, SECURE_TCP_Wildcards_Anytime)
+TEST_F(AITTTCPTest, SECURE_TCP_Wildcard_multi_Anytime)
 {
-    TCPWildcardsTopicTemplate(AITT_TYPE_TCP_SECURE);
+    TCPWildcardsTopicTemplate(AITT_TYPE_TCP_SECURE, false);
 }
 
 TEST_F(AITTTCPTest, SECURE_TCP_various_msg_Anytime)