for (const auto &pair : timeout_table)
delete pair.second;
- for (const auto &pair : watch_table)
- delete pair.second;
-
for (const auto &data : idle_table)
delete data;
table_lock.lock();
struct pollfd pfds[watch_table.size() + 2];
-
- // for Watch
- for (auto iter = watch_table.begin(); iter != watch_table.end(); iter++) {
+ for (auto iter = watch_table.begin(); iter != watch_table.end(); iter++)
pfds[nfds++] = {iter->second->fd, POLLIN | POLLHUP | POLLERR, 0};
- }
table_lock.unlock();
- // for idle
pfds[nfds++] = {idle_pipe[0], POLLIN, 0};
- // for timeout
pfds[nfds++] = {timeout_pipe[0], POLLIN, 0};
if (0 < poll(pfds, nfds, -1)) {
bool handled = false;
- handled |= CheckTimeout(pfds[nfds - 1], POLLIN);
+ int ret = CheckTimeout(pfds[nfds - 1], POLLIN);
+ if (ret < 0)
+ break;
+ handled |= !!ret;
handled |= CheckWatch(pfds, nfds - 2, POLLIN | POLLHUP | POLLERR);
if (!handled)
CheckIdle(pfds[nfds - 2], POLLIN);
}
is_running = false;
- WriteToPipe(timeout_pipe[1], INVALID);
+ WriteToPipe(timeout_pipe[1], QUIT);
return true;
}
std::lock_guard<std::mutex> lock(table_lock);
watch_table.insert(WatchMap::value_type(cb_data->fd, cb_data));
- WriteToPipe(idle_pipe[1], INVALID);
+ WriteToPipe(idle_pipe[1], PING);
}
PosixMainLoop::MainLoopData *PosixMainLoop::RemoveWatch(int fd)
return nullptr;
}
MainLoopData *user_data = iter->second->data;
- delete iter->second;
watch_table.erase(iter);
return user_data;
{
bool handled = false;
for (nfds_t idx = 0; idx < nfds; idx++) {
- if (false == (pfds[idx].revents & event)) {
- DBG("Unknown Event(%d)", pfds[idx].revents);
+ if (false == (pfds[idx].revents & event))
continue;
- }
table_lock.lock();
auto iter = watch_table.find(pfds[idx].fd);
- MainLoopCbData *cb_data = (watch_table.end() == iter) ? NULL : iter->second;
+ auto cb_data = (watch_table.end() == iter) ? NULL : iter->second;
table_lock.unlock();
if (cb_data) {
return handled;
}
-bool PosixMainLoop::CheckTimeout(pollfd pfd, short int event)
+int PosixMainLoop::CheckTimeout(pollfd pfd, short int event)
{
- if (false == (pfd.revents & event)) {
- DBG("Unknown Event(%d)", pfd.revents);
+ if (false == (pfd.revents & event))
return false;
- }
bool handled = false;
- unsigned int identifier = INVALID;
+ int identifier = PING;
while (read(pfd.fd, &identifier, sizeof(identifier)) == sizeof(identifier)) {
- if (identifier == INVALID) {
+ if (identifier == QUIT) {
INFO("Terminating");
- return true;
+ return -1;
}
table_lock.lock();
void PosixMainLoop::CheckIdle(pollfd pfd, short int event)
{
- if (false == (pfd.revents & event)) {
- DBG("Unknown Event(%d)", pfd.revents);
+ if (false == (pfd.revents & event))
return;
- }
- unsigned int identifier = INVALID;
+ int identifier = PING;
if (read(pfd.fd, &identifier, sizeof(identifier)) == sizeof(identifier)) {
if (identifier == IDLE) {
table_lock.lock();
#include <deque>
#include <map>
+#include <memory>
#include <mutex>
#include "MainLoopIface.h"
private:
enum PipeValue {
- INVALID = 0,
+ QUIT = -1,
+ PING = 0,
IDLE = 1,
TIMEOUT_START = 2,
};
int timeout_interval;
};
- using WatchMap = std::map<int, MainLoopCbData *>;
+ using WatchMap = std::map<int, std::shared_ptr<MainLoopCbData>>;
using TimeoutMap = std::map<unsigned int, MainLoopCbData *>;
using IdleQueue = std::deque<MainLoopCbData *>;
void TimeoutTableInsert(unsigned int identifier, MainLoopCbData *cb_data);
bool CheckWatch(pollfd *pfds, nfds_t nfds, short int event);
- bool CheckTimeout(pollfd pfd, short int event);
+ int CheckTimeout(pollfd pfd, short int event);
void CheckIdle(pollfd pfd, short int event);
int SetTimer(int interval, unsigned int timeout_id);
std::mutex table_lock;
int timeout_pipe[2];
int idle_pipe[2];
-
bool is_running;
};
+
} // namespace aitt
-DCMAKE_VERBOSE_MAKEFILE:BOOL=OFF \
-DBUILD_TESTING:BOOL=${TEST} \
-DCOVERAGE_TEST:BOOL=${COVERAGE} \
- -DWITH_MBEDTLS:BOOL=${MBEDTLS}; \
+ -DWITH_MBEDTLS:BOOL=${MBEDTLS} \
-DUSE_GLIB:BOOL=${GLIB}; \
cd -
void *Module::Unsubscribe(void *handlePtr)
{
+ std::lock_guard<std::mutex> autoLock(subscribeTableLock);
+
int handle = static_cast<int>(reinterpret_cast<intptr_t>(handlePtr));
TCPServerData *listen_info = dynamic_cast<TCPServerData *>(main_loop.RemoveWatch(handle));
if (!listen_info)
return nullptr;
- {
- std::lock_guard<std::mutex> autoLock(subscribeTableLock);
- auto it = subscribeTable.find(listen_info->topic);
- if (it == subscribeTable.end())
- throw std::runtime_error("Service is not registered: " + listen_info->topic);
+ auto it = subscribeTable.find(listen_info->topic);
+ if (it == subscribeTable.end())
+ throw std::runtime_error("Service is not registered: " + listen_info->topic);
- subscribeTable.erase(it);
+ subscribeTable.erase(it);
- UpdateDiscoveryMsg();
- }
+ UpdateDiscoveryMsg();
void *cbdata = listen_info->cbdata;
- listen_info->client_lock.lock();
for (auto fd : listen_info->client_list) {
TCPData *tcp_data = dynamic_cast<TCPData *>(main_loop.RemoveWatch(fd));
delete tcp_data;
}
listen_info->client_list.clear();
- listen_info->client_lock.unlock();
delete listen_info;
return cbdata;
int Module::HandleClientDisconnect(int handle)
{
+ std::lock_guard<std::mutex> autoLock(subscribeTableLock);
TCPData *tcp_data = dynamic_cast<TCPData *>(main_loop.RemoveWatch(handle));
if (tcp_data == nullptr) {
ERR("No watch data");
return AITT_LOOP_EVENT_REMOVE;
}
- tcp_data->parent->client_lock.lock();
auto it = std::find(tcp_data->parent->client_list.begin(), tcp_data->parent->client_list.end(),
handle);
tcp_data->parent->client_list.erase(it);
- tcp_data->parent->client_lock.unlock();
delete tcp_data;
return AITT_LOOP_EVENT_REMOVE;
void *cbdata;
std::string topic;
std::vector<int> client_list;
- std::mutex client_lock;
};
struct TCPData : public MainLoopHandler::MainLoopData {
void SetUp() override { Init(); }
void TearDown() override { Deinit(); }
- void TCPWildcardsTopicTemplate(AittProtocol protocol)
+ void TCPWildcardsTopicTemplate(AittProtocol protocol, bool single_level)
{
try {
char dump_msg[204800];
+ std::string sub_topic = "test/" + std::string(single_level ? "+" : "#");
AITT aitt(clientId, LOCAL_IP);
- aitt.Connect();
int cnt = 0;
- aitt.Subscribe(
- "test/+",
- [&](aitt::MSG *handle, const void *msg, const int szmsg, void *cbdata) -> void {
- AITTTCPTest *test = static_cast<AITTTCPTest *>(cbdata);
- INFO("Got Message(Topic:%s, size:%d)", handle->GetTopic().c_str(), szmsg);
- ++cnt;
-
- std::stringstream ss;
- ss << "test/value" << cnt;
- EXPECT_EQ(ss.str(), handle->GetTopic());
-
- if (cnt == 3)
- test->ToggleReady();
- },
- static_cast<void *>(this), protocol);
-
- // Wait a few seconds until the AITT client gets a server list (discover devices)
- DBG("Sleep %d ms", SLEEP_MS);
- usleep(100 * SLEEP_MS);
-
- aitt.Publish("test/value1", dump_msg, 12, protocol);
- aitt.Publish("test/value2", dump_msg, 1600, protocol);
- aitt.Publish("test/value3", dump_msg, 1600, protocol);
+ aitt.SetConnectionCallback([&](AITT &handle, int status, void *user_data) {
+ if (status != AITT_CONNECTED)
+ return;
+ aitt.Subscribe(
+ sub_topic,
+ [&](aitt::MSG *handle, const void *msg, const int szmsg,
+ void *cbdata) -> void {
+ AITTTCPTest *test = static_cast<AITTTCPTest *>(cbdata);
+ INFO("Got Message(Topic:%s, size:%d)", handle->GetTopic().c_str(), szmsg);
+ ++cnt;
+
+ if (cnt == 3)
+ test->ToggleReady();
+ },
+ static_cast<void *>(this), protocol);
+
+ // Wait a few seconds until the AITT client gets a server list (discover devices)
+ DBG("Sleep %d ms", SLEEP_MS);
+ usleep(100 * SLEEP_MS);
+
+ aitt.Publish("test/value1", dump_msg, 12, protocol);
+ if (single_level) {
+ aitt.Publish("test/value2", dump_msg, 1600, protocol);
+ aitt.Publish("test/value3", dump_msg, 1600, protocol);
+ } else {
+ aitt.Publish("test/step1/value1", dump_msg, 1600, protocol);
+ aitt.Publish("test/step2/value1", dump_msg, 1600, protocol);
+ }
+ });
+ aitt.Connect();
mainLoop.AddTimeout(CHECK_INTERVAL,
[&](MainLoopHandler::MainLoopResult result, int fd,
}
};
-TEST_F(AITTTCPTest, TCP_Wildcards1_Anytime)
+TEST_F(AITTTCPTest, TCP_Wildcard_single_Anytime)
{
- try {
- char dump_msg[204800];
-
- AITT aitt(clientId, LOCAL_IP);
- aitt.Connect();
-
- aitt.Subscribe(
- "test/#",
- [&](aitt::MSG *handle, const void *msg, const int szmsg, void *cbdata) -> void {
- AITTTCPTest *test = static_cast<AITTTCPTest *>(cbdata);
- INFO("Got Message(Topic:%s, size:%d)", handle->GetTopic().c_str(), szmsg);
- static int cnt = 0;
- ++cnt;
- if (cnt == 3)
- test->ToggleReady();
- },
- static_cast<void *>(this), AITT_TYPE_TCP);
-
- // Wait a few seconds until the AITT client gets a server list (discover devices)
- DBG("Sleep %d ms", SLEEP_MS);
- usleep(100 * SLEEP_MS);
-
- aitt.Publish("test/step1/value1", dump_msg, 12, AITT_TYPE_TCP);
- aitt.Publish("test/step2/value1", dump_msg, 1600, AITT_TYPE_TCP);
- aitt.Publish("test/step2/value1", dump_msg, 1600, AITT_TYPE_TCP);
-
- mainLoop.AddTimeout(CHECK_INTERVAL,
- [&](MainLoopHandler::MainLoopResult result, int fd,
- MainLoopHandler::MainLoopData *data) -> int {
- return ReadyCheck(static_cast<AittTests *>(this));
- });
- IterateEventLoop();
+ TCPWildcardsTopicTemplate(AITT_TYPE_TCP, true);
+}
- ASSERT_TRUE(ready);
- } catch (std::exception &e) {
- FAIL() << "Unexpected exception: " << e.what();
- }
+TEST_F(AITTTCPTest, SECURE_TCP_Wildcard_single_Anytime)
+{
+ TCPWildcardsTopicTemplate(AITT_TYPE_TCP_SECURE, true);
}
-TEST_F(AITTTCPTest, TCP_Wildcards2_Anytime)
+TEST_F(AITTTCPTest, TCP_Wildcard_multi_Anytime)
{
- TCPWildcardsTopicTemplate(AITT_TYPE_TCP);
+ TCPWildcardsTopicTemplate(AITT_TYPE_TCP, false);
}
-TEST_F(AITTTCPTest, SECURE_TCP_Wildcards_Anytime)
+TEST_F(AITTTCPTest, SECURE_TCP_Wildcard_multi_Anytime)
{
- TCPWildcardsTopicTemplate(AITT_TYPE_TCP_SECURE);
+ TCPWildcardsTopicTemplate(AITT_TYPE_TCP_SECURE, false);
}
TEST_F(AITTTCPTest, SECURE_TCP_various_msg_Anytime)