From: Youngjae Shin Date: Mon, 12 Dec 2022 05:17:26 +0000 (+0900) Subject: revise Posix Mainloop X-Git-Tag: accepted/tizen/unified/20230309.161434~26 X-Git-Url: http://review.tizen.org/git/?a=commitdiff_plain;h=c89f3809aeb4ce2a852f79438bf02553ff02910d;p=platform%2Fcore%2Fml%2Faitt.git revise Posix Mainloop [Problem] cb data is double freed [Solution] use shared_ptr - revise TCP test and lock --- diff --git a/common/PosixMainLoop.cc b/common/PosixMainLoop.cc index be495f1..12e323e 100644 --- a/common/PosixMainLoop.cc +++ b/common/PosixMainLoop.cc @@ -57,9 +57,6 @@ PosixMainLoop::~PosixMainLoop() for (const auto &pair : timeout_table) delete pair.second; - for (const auto &pair : watch_table) - delete pair.second; - for (const auto &data : idle_table) delete data; @@ -82,21 +79,19 @@ void PosixMainLoop::Run() table_lock.lock(); struct pollfd pfds[watch_table.size() + 2]; - - // for Watch - for (auto iter = watch_table.begin(); iter != watch_table.end(); iter++) { + for (auto iter = watch_table.begin(); iter != watch_table.end(); iter++) pfds[nfds++] = {iter->second->fd, POLLIN | POLLHUP | POLLERR, 0}; - } table_lock.unlock(); - // for idle pfds[nfds++] = {idle_pipe[0], POLLIN, 0}; - // for timeout pfds[nfds++] = {timeout_pipe[0], POLLIN, 0}; if (0 < poll(pfds, nfds, -1)) { bool handled = false; - handled |= CheckTimeout(pfds[nfds - 1], POLLIN); + int ret = CheckTimeout(pfds[nfds - 1], POLLIN); + if (ret < 0) + break; + handled |= !!ret; handled |= CheckWatch(pfds, nfds - 2, POLLIN | POLLHUP | POLLERR); if (!handled) CheckIdle(pfds[nfds - 2], POLLIN); @@ -114,7 +109,7 @@ bool PosixMainLoop::Quit() } is_running = false; - WriteToPipe(timeout_pipe[1], INVALID); + WriteToPipe(timeout_pipe[1], QUIT); return true; } @@ -138,7 +133,7 @@ void PosixMainLoop::AddWatch(int fd, const mainLoopCB &cb, MainLoopData *user_da std::lock_guard lock(table_lock); watch_table.insert(WatchMap::value_type(cb_data->fd, cb_data)); - WriteToPipe(idle_pipe[1], INVALID); + WriteToPipe(idle_pipe[1], PING); } PosixMainLoop::MainLoopData *PosixMainLoop::RemoveWatch(int fd) @@ -150,7 +145,6 @@ PosixMainLoop::MainLoopData *PosixMainLoop::RemoveWatch(int fd) return nullptr; } MainLoopData *user_data = iter->second->data; - delete iter->second; watch_table.erase(iter); return user_data; @@ -219,14 +213,12 @@ bool PosixMainLoop::CheckWatch(pollfd *pfds, nfds_t nfds, short int event) { bool handled = false; for (nfds_t idx = 0; idx < nfds; idx++) { - if (false == (pfds[idx].revents & event)) { - DBG("Unknown Event(%d)", pfds[idx].revents); + if (false == (pfds[idx].revents & event)) continue; - } table_lock.lock(); auto iter = watch_table.find(pfds[idx].fd); - MainLoopCbData *cb_data = (watch_table.end() == iter) ? NULL : iter->second; + auto cb_data = (watch_table.end() == iter) ? NULL : iter->second; table_lock.unlock(); if (cb_data) { @@ -243,19 +235,17 @@ bool PosixMainLoop::CheckWatch(pollfd *pfds, nfds_t nfds, short int event) return handled; } -bool PosixMainLoop::CheckTimeout(pollfd pfd, short int event) +int PosixMainLoop::CheckTimeout(pollfd pfd, short int event) { - if (false == (pfd.revents & event)) { - DBG("Unknown Event(%d)", pfd.revents); + if (false == (pfd.revents & event)) return false; - } bool handled = false; - unsigned int identifier = INVALID; + int identifier = PING; while (read(pfd.fd, &identifier, sizeof(identifier)) == sizeof(identifier)) { - if (identifier == INVALID) { + if (identifier == QUIT) { INFO("Terminating"); - return true; + return -1; } table_lock.lock(); @@ -278,12 +268,10 @@ bool PosixMainLoop::CheckTimeout(pollfd pfd, short int event) void PosixMainLoop::CheckIdle(pollfd pfd, short int event) { - if (false == (pfd.revents & event)) { - DBG("Unknown Event(%d)", pfd.revents); + if (false == (pfd.revents & event)) return; - } - unsigned int identifier = INVALID; + int identifier = PING; if (read(pfd.fd, &identifier, sizeof(identifier)) == sizeof(identifier)) { if (identifier == IDLE) { table_lock.lock(); diff --git a/common/PosixMainLoop.h b/common/PosixMainLoop.h index 3325692..4ae3593 100644 --- a/common/PosixMainLoop.h +++ b/common/PosixMainLoop.h @@ -20,6 +20,7 @@ #include #include +#include #include #include "MainLoopIface.h" @@ -41,7 +42,8 @@ class PosixMainLoop : public MainLoopIface { private: enum PipeValue { - INVALID = 0, + QUIT = -1, + PING = 0, IDLE = 1, TIMEOUT_START = 2, }; @@ -60,7 +62,7 @@ class PosixMainLoop : public MainLoopIface { int timeout_interval; }; - using WatchMap = std::map; + using WatchMap = std::map>; using TimeoutMap = std::map; using IdleQueue = std::deque; @@ -69,7 +71,7 @@ class PosixMainLoop : public MainLoopIface { void TimeoutTableInsert(unsigned int identifier, MainLoopCbData *cb_data); bool CheckWatch(pollfd *pfds, nfds_t nfds, short int event); - bool CheckTimeout(pollfd pfd, short int event); + int CheckTimeout(pollfd pfd, short int event); void CheckIdle(pollfd pfd, short int event); int SetTimer(int interval, unsigned int timeout_id); @@ -79,7 +81,7 @@ class PosixMainLoop : public MainLoopIface { std::mutex table_lock; int timeout_pipe[2]; int idle_pipe[2]; - bool is_running; }; + } // namespace aitt diff --git a/debian/rules b/debian/rules index dcfe88d..c021c28 100755 --- a/debian/rules +++ b/debian/rules @@ -41,7 +41,7 @@ override_dh_auto_configure: -DCMAKE_VERBOSE_MAKEFILE:BOOL=OFF \ -DBUILD_TESTING:BOOL=${TEST} \ -DCOVERAGE_TEST:BOOL=${COVERAGE} \ - -DWITH_MBEDTLS:BOOL=${MBEDTLS}; \ + -DWITH_MBEDTLS:BOOL=${MBEDTLS} \ -DUSE_GLIB:BOOL=${GLIB}; \ cd - diff --git a/modules/tcp/Module.cc b/modules/tcp/Module.cc index f2265d3..27b80bc 100644 --- a/modules/tcp/Module.cc +++ b/modules/tcp/Module.cc @@ -167,30 +167,27 @@ void *Module::Subscribe(const std::string &topic, const AittTransport::Subscribe void *Module::Unsubscribe(void *handlePtr) { + std::lock_guard autoLock(subscribeTableLock); + int handle = static_cast(reinterpret_cast(handlePtr)); TCPServerData *listen_info = dynamic_cast(main_loop.RemoveWatch(handle)); if (!listen_info) return nullptr; - { - std::lock_guard autoLock(subscribeTableLock); - auto it = subscribeTable.find(listen_info->topic); - if (it == subscribeTable.end()) - throw std::runtime_error("Service is not registered: " + listen_info->topic); + auto it = subscribeTable.find(listen_info->topic); + if (it == subscribeTable.end()) + throw std::runtime_error("Service is not registered: " + listen_info->topic); - subscribeTable.erase(it); + subscribeTable.erase(it); - UpdateDiscoveryMsg(); - } + UpdateDiscoveryMsg(); void *cbdata = listen_info->cbdata; - listen_info->client_lock.lock(); for (auto fd : listen_info->client_list) { TCPData *tcp_data = dynamic_cast(main_loop.RemoveWatch(fd)); delete tcp_data; } listen_info->client_list.clear(); - listen_info->client_lock.unlock(); delete listen_info; return cbdata; @@ -353,16 +350,15 @@ int Module::ReceiveData(MainLoopHandler::MainLoopResult result, int handle, int Module::HandleClientDisconnect(int handle) { + std::lock_guard autoLock(subscribeTableLock); TCPData *tcp_data = dynamic_cast(main_loop.RemoveWatch(handle)); if (tcp_data == nullptr) { ERR("No watch data"); return AITT_LOOP_EVENT_REMOVE; } - tcp_data->parent->client_lock.lock(); auto it = std::find(tcp_data->parent->client_list.begin(), tcp_data->parent->client_list.end(), handle); tcp_data->parent->client_list.erase(it); - tcp_data->parent->client_lock.unlock(); delete tcp_data; return AITT_LOOP_EVENT_REMOVE; diff --git a/modules/tcp/Module.h b/modules/tcp/Module.h index 0d3700e..028d407 100644 --- a/modules/tcp/Module.h +++ b/modules/tcp/Module.h @@ -56,7 +56,6 @@ class Module : public AittTransport { void *cbdata; std::string topic; std::vector client_list; - std::mutex client_lock; }; struct TCPData : public MainLoopHandler::MainLoopData { diff --git a/tests/AITT_TCP_test.cc b/tests/AITT_TCP_test.cc index 6b2ea09..a72f16d 100644 --- a/tests/AITT_TCP_test.cc +++ b/tests/AITT_TCP_test.cc @@ -30,38 +30,45 @@ class AITTTCPTest : public testing::Test, public AittTests { void SetUp() override { Init(); } void TearDown() override { Deinit(); } - void TCPWildcardsTopicTemplate(AittProtocol protocol) + void TCPWildcardsTopicTemplate(AittProtocol protocol, bool single_level) { try { char dump_msg[204800]; + std::string sub_topic = "test/" + std::string(single_level ? "+" : "#"); AITT aitt(clientId, LOCAL_IP); - aitt.Connect(); int cnt = 0; - aitt.Subscribe( - "test/+", - [&](aitt::MSG *handle, const void *msg, const int szmsg, void *cbdata) -> void { - AITTTCPTest *test = static_cast(cbdata); - INFO("Got Message(Topic:%s, size:%d)", handle->GetTopic().c_str(), szmsg); - ++cnt; - - std::stringstream ss; - ss << "test/value" << cnt; - EXPECT_EQ(ss.str(), handle->GetTopic()); - - if (cnt == 3) - test->ToggleReady(); - }, - static_cast(this), protocol); - - // Wait a few seconds until the AITT client gets a server list (discover devices) - DBG("Sleep %d ms", SLEEP_MS); - usleep(100 * SLEEP_MS); - - aitt.Publish("test/value1", dump_msg, 12, protocol); - aitt.Publish("test/value2", dump_msg, 1600, protocol); - aitt.Publish("test/value3", dump_msg, 1600, protocol); + aitt.SetConnectionCallback([&](AITT &handle, int status, void *user_data) { + if (status != AITT_CONNECTED) + return; + aitt.Subscribe( + sub_topic, + [&](aitt::MSG *handle, const void *msg, const int szmsg, + void *cbdata) -> void { + AITTTCPTest *test = static_cast(cbdata); + INFO("Got Message(Topic:%s, size:%d)", handle->GetTopic().c_str(), szmsg); + ++cnt; + + if (cnt == 3) + test->ToggleReady(); + }, + static_cast(this), protocol); + + // Wait a few seconds until the AITT client gets a server list (discover devices) + DBG("Sleep %d ms", SLEEP_MS); + usleep(100 * SLEEP_MS); + + aitt.Publish("test/value1", dump_msg, 12, protocol); + if (single_level) { + aitt.Publish("test/value2", dump_msg, 1600, protocol); + aitt.Publish("test/value3", dump_msg, 1600, protocol); + } else { + aitt.Publish("test/step1/value1", dump_msg, 1600, protocol); + aitt.Publish("test/step2/value1", dump_msg, 1600, protocol); + } + }); + aitt.Connect(); mainLoop.AddTimeout(CHECK_INTERVAL, [&](MainLoopHandler::MainLoopResult result, int fd, @@ -77,55 +84,24 @@ class AITTTCPTest : public testing::Test, public AittTests { } }; -TEST_F(AITTTCPTest, TCP_Wildcards1_Anytime) +TEST_F(AITTTCPTest, TCP_Wildcard_single_Anytime) { - try { - char dump_msg[204800]; - - AITT aitt(clientId, LOCAL_IP); - aitt.Connect(); - - aitt.Subscribe( - "test/#", - [&](aitt::MSG *handle, const void *msg, const int szmsg, void *cbdata) -> void { - AITTTCPTest *test = static_cast(cbdata); - INFO("Got Message(Topic:%s, size:%d)", handle->GetTopic().c_str(), szmsg); - static int cnt = 0; - ++cnt; - if (cnt == 3) - test->ToggleReady(); - }, - static_cast(this), AITT_TYPE_TCP); - - // Wait a few seconds until the AITT client gets a server list (discover devices) - DBG("Sleep %d ms", SLEEP_MS); - usleep(100 * SLEEP_MS); - - aitt.Publish("test/step1/value1", dump_msg, 12, AITT_TYPE_TCP); - aitt.Publish("test/step2/value1", dump_msg, 1600, AITT_TYPE_TCP); - aitt.Publish("test/step2/value1", dump_msg, 1600, AITT_TYPE_TCP); - - mainLoop.AddTimeout(CHECK_INTERVAL, - [&](MainLoopHandler::MainLoopResult result, int fd, - MainLoopHandler::MainLoopData *data) -> int { - return ReadyCheck(static_cast(this)); - }); - IterateEventLoop(); + TCPWildcardsTopicTemplate(AITT_TYPE_TCP, true); +} - ASSERT_TRUE(ready); - } catch (std::exception &e) { - FAIL() << "Unexpected exception: " << e.what(); - } +TEST_F(AITTTCPTest, SECURE_TCP_Wildcard_single_Anytime) +{ + TCPWildcardsTopicTemplate(AITT_TYPE_TCP_SECURE, true); } -TEST_F(AITTTCPTest, TCP_Wildcards2_Anytime) +TEST_F(AITTTCPTest, TCP_Wildcard_multi_Anytime) { - TCPWildcardsTopicTemplate(AITT_TYPE_TCP); + TCPWildcardsTopicTemplate(AITT_TYPE_TCP, false); } -TEST_F(AITTTCPTest, SECURE_TCP_Wildcards_Anytime) +TEST_F(AITTTCPTest, SECURE_TCP_Wildcard_multi_Anytime) { - TCPWildcardsTopicTemplate(AITT_TYPE_TCP_SECURE); + TCPWildcardsTopicTemplate(AITT_TYPE_TCP_SECURE, false); } TEST_F(AITTTCPTest, SECURE_TCP_various_msg_Anytime)