ADD_DEFINITIONS(-DPLATFORM=${PLATFORM})
INCLUDE(${PROJECT_ROOT_DIR}/cmake/aitt_android_flatbuffers.cmake)
INCLUDE(${PROJECT_ROOT_DIR}/cmake/aitt_android_mosquitto.cmake)
- SET(AITT_NEEDS_LIBRARIES ${GLIB_LIBRARIES} ${MOSQUITTO_LIBRARY} ${FLATBUFFERS_LIBRARY} ${LOG_LIBRARIES})
+ SET(AITT_NEEDS_LIBRARIES ${MOSQUITTO_LIBRARY} ${FLATBUFFERS_LIBRARY} ${LOG_LIBRARIES})
ELSE(PLATFORM STREQUAL "android")
IF(PLATFORM STREQUAL "tizen")
ADD_DEFINITIONS(-DTIZEN)
ADD_DEFINITIONS(-DPLATFORM=${PLATFORM})
SET(ADDITIONAL_OPT "-DTIZEN")
- SET(TIZEN_LOG_PKG dlog)
+ SET(ADDITION_PKG dlog)
ENDIF(PLATFORM STREQUAL "tizen")
SET(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wall -Werror -Wno-psabi -fdiagnostics-color -fvisibility=hidden")
SET(CMAKE_EXE_LINKER_FLAGS "${CMAKE_EXE_LINKER_FLAGS} -pie")
IF(USE_GLIB)
- PKG_CHECK_MODULES(AITT_NEEDS REQUIRED ${TIZEN_LOG_PKG} libmosquitto flatbuffers glib-2.0)
- ELSE(USE_GLIB)
- PKG_CHECK_MODULES(AITT_NEEDS REQUIRED ${TIZEN_LOG_PKG} libmosquitto flatbuffers)
+ SET(ADDITION_PKG "${ADDITION_PKG} glib-2.0")
ENDIF(USE_GLIB)
+ PKG_CHECK_MODULES(AITT_NEEDS REQUIRED ${ADDITION_PKG} libmosquitto flatbuffers)
INCLUDE_DIRECTORIES(${AITT_NEEDS_INCLUDE_DIRS})
LINK_DIRECTORIES(${AITT_NEEDS_LIBRARY_DIRS})
ENDIF(PLATFORM STREQUAL "android")
INCLUDE_DIRECTORIES(include common)
-IF(USE_GLIB)
- INCLUDE_DIRECTORIES(common/glib)
-ELSE(USE_GLIB)
- INCLUDE_DIRECTORIES(common/posix)
-ENDIF(USE_GLIB)
-
AUX_SOURCE_DIRECTORY(src AITT_SRC)
SET(AITT_INTERNAL_SRC src/ModuleManager.cc src/NullTransport.cc src/MosquittoMQ.cc)
FILE(GLOB COMMON_SRCS ${CMAKE_CURRENT_SOURCE_DIR}/*.cc)
IF(USE_GLIB)
- ADD_LIBRARY(${AITT_COMMON} SHARED ${COMMON_SRCS} ${CMAKE_CURRENT_SOURCE_DIR}/glib/MainLoopHandler.cc)
+ ADD_DEFINITIONS(-DUSE_GLIB)
+ LIST(REMOVE_ITEM COMMON_SRCS ${CMAKE_CURRENT_SOURCE_DIR}/PosixMainLoop.cc)
ELSE(USE_GLIB)
- ADD_LIBRARY(${AITT_COMMON} SHARED ${COMMON_SRCS} ${CMAKE_CURRENT_SOURCE_DIR}/posix/MainLoopHandler.cc)
+ LIST(REMOVE_ITEM COMMON_SRCS ${CMAKE_CURRENT_SOURCE_DIR}/GlibMainLoop.cc)
IF(NOT PLATFORM STREQUAL "android")
- TARGET_LINK_LIBRARIES(${AITT_COMMON} rt)
+ SET(ADDITION_LIB rt)
ENDIF()
ENDIF(USE_GLIB)
-TARGET_LINK_LIBRARIES(${AITT_COMMON} ${AITT_NEEDS_LIBRARIES} Threads::Threads)
+ADD_LIBRARY(${AITT_COMMON} SHARED ${COMMON_SRCS})
+TARGET_LINK_LIBRARIES(${AITT_COMMON} ${AITT_NEEDS_LIBRARIES} ${ADDITION_LIB} Threads::Threads)
TARGET_COMPILE_OPTIONS(${AITT_COMMON} PRIVATE ${AITT_NEEDS_CFLAGS_OTHER} "-fvisibility=default")
IF(VERSIONING)
SET_TARGET_PROPERTIES(${AITT_COMMON} PROPERTIES
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-#include "MainLoopHandler.h"
+#include "GlibMainLoop.h"
#include <glib.h>
namespace aitt {
-MainLoopHandler::MainLoopHandler()
+GlibMainLoop::GlibMainLoop()
{
GMainContext *ctx = g_main_context_new();
if (ctx == nullptr)
- throw std::runtime_error("Failed to create a context");
+ throw std::runtime_error("g_main_context_new() Fail");
loop = g_main_loop_new(ctx, FALSE);
if (loop == nullptr) {
g_main_context_unref(ctx);
- throw std::runtime_error("Failed to create a loop");
+ throw std::runtime_error("g_main_loop_new() Fail");
}
g_main_context_unref(ctx);
}
-MainLoopHandler::~MainLoopHandler()
+GlibMainLoop::~GlibMainLoop()
{
g_main_loop_unref(loop);
}
-void MainLoopHandler::Run()
+void GlibMainLoop::Run()
{
g_main_loop_run(loop);
}
-bool MainLoopHandler::Quit()
+bool GlibMainLoop::Quit()
{
if (g_main_loop_is_running(loop) == FALSE) {
ERR("main loop is not running");
return true;
}
-void MainLoopHandler::AddWatch(int fd, const mainLoopCB &cb, MainLoopData *user_data)
+void GlibMainLoop::AddIdle(const mainLoopCB &cb, MainLoopData *user_data)
+{
+ MainLoopCbData *cb_data = new MainLoopCbData();
+ cb_data->cb = cb;
+ cb_data->data = user_data;
+ cb_data->ctx = g_main_loop_get_context(loop);
+
+ GSource *source = g_idle_source_new();
+ g_source_set_priority(source, G_PRIORITY_HIGH);
+ g_source_set_callback(source, CallbackHandler, cb_data, DestroyNotify);
+ g_source_attach(source, cb_data->ctx);
+ g_source_unref(source);
+}
+
+void GlibMainLoop::AddWatch(int fd, const mainLoopCB &cb, MainLoopData *user_data)
{
MainLoopCbData *cb_data = new MainLoopCbData();
GMainContext *ctx = g_main_loop_get_context(loop);
callback_table_lock.unlock();
}
-MainLoopHandler::MainLoopData *MainLoopHandler::RemoveWatch(int fd)
+GlibMainLoop::MainLoopData *GlibMainLoop::RemoveWatch(int fd)
{
GSource *source;
MainLoopData *user_data = nullptr;
return user_data;
}
-unsigned int MainLoopHandler::AddTimeout(int interval, const mainLoopCB &cb, MainLoopData *data)
+unsigned int GlibMainLoop::AddTimeout(int interval, const mainLoopCB &cb, MainLoopData *data)
{
MainLoopCbData *cb_data = new MainLoopCbData();
GMainContext *ctx = g_main_loop_get_context(loop);
cb_data->data = data;
GSource *source = g_timeout_source_new(interval);
- g_source_set_callback(source, IdlerHandler, cb_data, DestroyNotify);
+ g_source_set_callback(source, CallbackHandler, cb_data, DestroyNotify);
unsigned int id = g_source_attach(source, cb_data->ctx);
g_source_unref(source);
return id;
}
-void MainLoopHandler::RemoveTimeout(unsigned int id)
+void GlibMainLoop::RemoveTimeout(unsigned int id)
{
GSource *source;
source = g_main_context_find_source_by_id(g_main_loop_get_context(loop), id);
g_source_destroy(source);
}
-void MainLoopHandler::AddIdle(MainLoopHandler *handle, const mainLoopCB &cb,
- MainLoopData *user_data)
-{
- RET_IF(handle == nullptr);
-
- MainLoopCbData *cb_data = new MainLoopCbData();
- cb_data->cb = cb;
- cb_data->data = user_data;
- cb_data->ctx = g_main_loop_get_context(handle->loop);
-
- AddIdle(cb_data, DestroyNotify);
-}
-
-void MainLoopHandler::AddIdle(MainLoopCbData *cb_data, GDestroyNotify destroy)
-{
- RET_IF(cb_data->ctx == nullptr);
-
- GSource *source = g_idle_source_new();
- g_source_set_priority(source, G_PRIORITY_HIGH);
- g_source_set_callback(source, IdlerHandler, cb_data, destroy);
- g_source_attach(source, cb_data->ctx);
- g_source_unref(source);
-}
-
-gboolean MainLoopHandler::IdlerHandler(gpointer user_data)
+gboolean GlibMainLoop::CallbackHandler(gpointer user_data)
{
RETV_IF(user_data == nullptr, FALSE);
MainLoopCbData *cb_data = static_cast<MainLoopCbData *>(user_data);
- cb_data->cb(cb_data->result, cb_data->fd, cb_data->data);
-
- return FALSE;
+ return cb_data->cb(cb_data->result, cb_data->fd, cb_data->data);
}
-gboolean MainLoopHandler::EventHandler(GIOChannel *src, GIOCondition condition, gpointer user_data)
+gboolean GlibMainLoop::EventHandler(GIOChannel *src, GIOCondition condition, gpointer user_data)
{
RETV_IF(user_data == nullptr, FALSE);
ret = FALSE;
}
- cb_data->cb(cb_data->result, cb_data->fd, cb_data->data);
+ ret &= cb_data->cb(cb_data->result, cb_data->fd, cb_data->data);
return ret;
}
-void MainLoopHandler::DestroyNotify(gpointer data)
+void GlibMainLoop::DestroyNotify(gpointer data)
{
MainLoopCbData *cb_data = static_cast<MainLoopCbData *>(data);
delete cb_data;
}
-MainLoopHandler::MainLoopCbData::MainLoopCbData() : data(nullptr), result(OK), fd(-1), ctx(nullptr)
+GlibMainLoop::MainLoopCbData::MainLoopCbData() : data(nullptr), result(OK), fd(-1), ctx(nullptr)
{
}
*/
#pragma once
-#include <AittTypes.h>
#include <glib.h>
-#include <functional>
#include <map>
#include <mutex>
+#include "MainLoopIface.h"
+
namespace aitt {
-class MainLoopHandler {
+class GlibMainLoop : public MainLoopIface {
public:
- enum MainLoopResult {
- OK,
- ERROR,
- REMOVED,
- HANGUP,
- };
- struct MainLoopData {
- virtual ~MainLoopData() = default;
- };
- using mainLoopCB = std::function<void(MainLoopResult result, int fd, MainLoopData *data)>;
-
- MainLoopHandler();
- ~MainLoopHandler();
-
- static void AddIdle(MainLoopHandler *handle, const mainLoopCB &cb, MainLoopData *user_data);
+ GlibMainLoop();
+ ~GlibMainLoop();
- void Run();
- bool Quit();
- void AddWatch(int fd, const mainLoopCB &cb, MainLoopData *user_data);
- MainLoopData *RemoveWatch(int fd);
- unsigned int AddTimeout(int interval, const mainLoopCB &cb, MainLoopData *user_data);
- void RemoveTimeout(unsigned int id);
+ void Run() override;
+ bool Quit() override;
+ void AddIdle(const mainLoopCB &cb, MainLoopData *user_data) override;
+ void AddWatch(int fd, const mainLoopCB &cb, MainLoopData *user_data) override;
+ MainLoopData *RemoveWatch(int fd) override;
+ unsigned int AddTimeout(int interval, const mainLoopCB &cb, MainLoopData *user_data) override;
+ void RemoveTimeout(unsigned int id) override;
private:
struct MainLoopCbData {
};
using CallbackMap = std::map<int, std::pair<GSource *, MainLoopCbData *>>;
- static void AddIdle(MainLoopCbData *, GDestroyNotify);
- static gboolean IdlerHandler(gpointer user_data);
+ static gboolean CallbackHandler(gpointer user_data);
static gboolean EventHandler(GIOChannel *src, GIOCondition cond, gpointer user_data);
static void DestroyNotify(gpointer data);
--- /dev/null
+/*
+ * Copyright (c) 2022 Samsung Electronics Co., Ltd All Rights Reserved
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "MainLoopHandler.h"
+
+#ifdef USE_GLIB
+#include "GlibMainLoop.h"
+#else
+#include "PosixMainLoop.h"
+#endif
+#include "aitt_internal.h"
+
+namespace aitt {
+
+MainLoopHandler::MainLoopHandler()
+ :
+#ifdef USE_GLIB
+ loop(new GlibMainLoop())
+#else
+ loop(new PosixMainLoop())
+#endif
+{
+}
+
+void MainLoopHandler::AddIdle(MainLoopHandler *handle, const mainLoopCB &cb,
+ MainLoopData *user_data)
+{
+ RET_IF(handle == nullptr);
+
+ handle->loop->AddIdle(cb, user_data);
+}
+
+void MainLoopHandler::Run()
+{
+ return loop->Run();
+}
+
+bool MainLoopHandler::Quit()
+{
+ return loop->Quit();
+}
+
+void MainLoopHandler::AddWatch(int fd, const mainLoopCB &cb, MainLoopData *user_data)
+{
+ return loop->AddWatch(fd, cb, user_data);
+}
+
+void MainLoopHandler::AddIdle(const mainLoopCB &cb, MainLoopData *user_data)
+{
+ return loop->AddIdle(cb, user_data);
+}
+
+MainLoopIface::MainLoopData *MainLoopHandler::RemoveWatch(int fd)
+{
+ return loop->RemoveWatch(fd);
+}
+
+unsigned int MainLoopHandler::AddTimeout(int interval, const mainLoopCB &cb,
+ MainLoopData *user_data)
+{
+ return loop->AddTimeout(interval, cb, user_data);
+}
+
+void MainLoopHandler::RemoveTimeout(unsigned int id)
+{
+ return loop->RemoveTimeout(id);
+}
+
+} // namespace aitt
--- /dev/null
+/*
+ * Copyright (c) 2022 Samsung Electronics Co., Ltd All Rights Reserved
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#pragma once
+
+#include <memory>
+
+#include "MainLoopIface.h"
+
+namespace aitt {
+
+class MainLoopHandler : public MainLoopIface {
+ public:
+ MainLoopHandler();
+ virtual ~MainLoopHandler() = default;
+
+ static void AddIdle(MainLoopHandler *handle, const mainLoopCB &cb,
+ MainLoopData *user_data = nullptr);
+
+ void Run() override;
+ bool Quit() override;
+ void AddIdle(const mainLoopCB &cb, MainLoopData *user_data = nullptr) override;
+ void AddWatch(int fd, const mainLoopCB &cb, MainLoopData *user_data = nullptr) override;
+ MainLoopData *RemoveWatch(int fd) override;
+ unsigned int AddTimeout(int interval, const mainLoopCB &cb,
+ MainLoopData *user_data = nullptr) override;
+ void RemoveTimeout(unsigned int id) override;
+
+ private:
+ std::unique_ptr<MainLoopIface> loop;
+};
+
+} // namespace aitt
--- /dev/null
+/*
+ * Copyright (c) 2022 Samsung Electronics Co., Ltd All Rights Reserved
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#pragma once
+
+#include <functional>
+#include <map>
+#include <mutex>
+
+#define AITT_LOOP_EVENT_REMOVE 0
+#define AITT_LOOP_EVENT_CONTINUE 1
+
+namespace aitt {
+
+class MainLoopIface {
+ public:
+ enum MainLoopResult {
+ OK,
+ ERROR,
+ REMOVED,
+ HANGUP,
+ };
+ struct MainLoopData {
+ virtual ~MainLoopData() = default;
+ };
+ using mainLoopCB = std::function<int(MainLoopResult result, int fd, MainLoopData *data)>;
+
+ MainLoopIface() = default;
+ virtual ~MainLoopIface() = default;
+
+ virtual void Run() = 0;
+ virtual bool Quit() = 0;
+ virtual void AddIdle(const mainLoopCB &cb, MainLoopData *user_data) = 0;
+ virtual void AddWatch(int fd, const mainLoopCB &cb, MainLoopData *user_data) = 0;
+ virtual MainLoopData *RemoveWatch(int fd) = 0;
+ virtual unsigned int AddTimeout(int interval, const mainLoopCB &cb,
+ MainLoopData *user_data) = 0;
+ virtual void RemoveTimeout(unsigned int id) = 0;
+};
+
+} // namespace aitt
--- /dev/null
+/*
+ * Copyright (c) 2022 Samsung Electronics Co., Ltd All Rights Reserved
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "PosixMainLoop.h"
+
+#include <fcntl.h>
+#include <poll.h>
+#include <signal.h>
+#include <string.h>
+#include <time.h>
+#include <unistd.h>
+
+#include "aitt_internal.h"
+
+namespace aitt {
+
+PosixMainLoop::PosixMainLoop() : timeout_pipe{}, idle_pipe{}, is_running(false)
+{
+ if (pipe(idle_pipe) == -1 || pipe(timeout_pipe) == -1) {
+ ERR("pipe() Fail(%d)", errno);
+ throw std::runtime_error("PosixMainLoop() Fail");
+ }
+ if (fcntl(idle_pipe[0], F_SETFL, O_NONBLOCK) == -1
+ || fcntl(timeout_pipe[0], F_SETFL, O_NONBLOCK) == -1) {
+ ERR("fcntl(O_NONBLOCK) Fail(%d)", errno);
+ throw std::runtime_error("PosixMainLoop() Fail");
+ }
+
+ struct sigaction sa;
+ sa.sa_flags = SA_SIGINFO;
+ sa.sa_sigaction = TimerHandler;
+ sigemptyset(&sa.sa_mask);
+ if (sigaction(SIGUSR1, &sa, NULL) == -1) {
+ ERR("sigaction() Fail(%d)", errno);
+ throw std::runtime_error("PosixMainLoop() Fail");
+ }
+}
+
+PosixMainLoop::~PosixMainLoop()
+{
+ if (is_running)
+ Quit();
+
+ std::lock_guard<std::mutex> lock(table_lock);
+ for (const auto &pair : timeout_table)
+ delete pair.second;
+
+ for (const auto &pair : watch_table)
+ delete pair.second;
+
+ for (const auto &data : idle_table)
+ delete data;
+
+ timeout_table.clear();
+ watch_table.clear();
+ idle_table.clear();
+
+ for (int iter = 0; iter < 2; iter++) {
+ close(timeout_pipe[iter]);
+ close(idle_pipe[iter]);
+ }
+}
+
+void PosixMainLoop::Run()
+{
+ is_running = true;
+
+ while (is_running) {
+ nfds_t nfds = 0;
+
+ table_lock.lock();
+ struct pollfd pfds[watch_table.size() + 2];
+
+ // for Watch
+ for (auto iter = watch_table.begin(); iter != watch_table.end(); iter++) {
+ pfds[nfds++] = {iter->second->fd, POLLIN | POLLHUP | POLLERR, 0};
+ }
+ table_lock.unlock();
+
+ // for idle
+ pfds[nfds++] = {idle_pipe[0], POLLIN, 0};
+ // for timeout
+ pfds[nfds++] = {timeout_pipe[0], POLLIN, 0};
+
+ if (0 < poll(pfds, nfds, -1)) {
+ bool handled = false;
+ handled |= CheckTimeout(pfds[nfds - 1], POLLIN);
+ handled |= CheckWatch(pfds, nfds - 2, POLLIN | POLLHUP | POLLERR);
+ if (!handled)
+ CheckIdle(pfds[nfds - 2], POLLIN);
+ }
+ if (false == idle_table.empty())
+ WriteToPipe(idle_pipe[1], IDLE);
+ }
+}
+
+bool PosixMainLoop::Quit()
+{
+ if (is_running == false) {
+ ERR("main loop is not running");
+ return false;
+ }
+
+ is_running = false;
+ WriteToPipe(timeout_pipe[1], INVALID);
+ return true;
+}
+
+void PosixMainLoop::AddIdle(const mainLoopCB &cb, MainLoopData *user_data)
+{
+ MainLoopCbData *cb_data = new MainLoopCbData();
+ cb_data->cb = cb;
+ cb_data->data = user_data;
+
+ std::lock_guard<std::mutex> lock(table_lock);
+ idle_table.push_back(cb_data);
+ WriteToPipe(idle_pipe[1], IDLE);
+}
+
+void PosixMainLoop::AddWatch(int fd, const mainLoopCB &cb, MainLoopData *user_data)
+{
+ MainLoopCbData *cb_data = new MainLoopCbData();
+ cb_data->cb = cb;
+ cb_data->data = user_data;
+ cb_data->fd = fd;
+
+ std::lock_guard<std::mutex> lock(table_lock);
+ watch_table.insert(WatchMap::value_type(cb_data->fd, cb_data));
+ WriteToPipe(idle_pipe[1], INVALID);
+}
+
+PosixMainLoop::MainLoopData *PosixMainLoop::RemoveWatch(int fd)
+{
+ std::lock_guard<std::mutex> lock(table_lock);
+ WatchMap::iterator iter = watch_table.find(fd);
+ if (iter == watch_table.end()) {
+ ERR("Unknown fd(%d)", fd);
+ return nullptr;
+ }
+ MainLoopData *user_data = iter->second->data;
+ delete iter->second;
+ watch_table.erase(iter);
+
+ return user_data;
+}
+
+unsigned int PosixMainLoop::AddTimeout(int interval, const mainLoopCB &cb, MainLoopData *data)
+{
+ static int identifier = TIMEOUT_START;
+
+ if (interval < 0) {
+ ERR("Invalid : interval(%d) < 0", interval);
+ return 0;
+ }
+
+ if (0 == SetTimer(interval, identifier)) {
+ MainLoopCbData *cb_data = new MainLoopCbData();
+ cb_data->cb = cb;
+ cb_data->data = data;
+ cb_data->timeout_interval = interval;
+ TimeoutTableInsert(identifier, cb_data);
+ } else {
+ ERR("SetTimer() Fail");
+ return 0;
+ }
+
+ return identifier++;
+}
+
+void PosixMainLoop::RemoveTimeout(unsigned int id)
+{
+ std::lock_guard<std::mutex> lock(table_lock);
+ TimeoutMap::iterator iter = timeout_table.find(id);
+ if (iter != timeout_table.end()) {
+ delete iter->second;
+ timeout_table.erase(iter);
+ }
+}
+
+void PosixMainLoop::WriteToPipe(int pipe_fd, unsigned int identifier)
+{
+ ssize_t ret = write(pipe_fd, (void *)&identifier, sizeof(identifier));
+ if (ret != sizeof(identifier)) {
+ ERR("write() Fail(%zd)", ret);
+ }
+}
+
+void PosixMainLoop::TimerHandler(int sig, siginfo_t *si, void *uc)
+{
+ TimeoutData *data = static_cast<TimeoutData *>(si->si_value.sival_ptr);
+ if (data == NULL) {
+ ERR("sival_ptr is NULL");
+ return;
+ }
+
+ WriteToPipe(data->pipe_fd, data->timeout_id);
+ delete data;
+}
+
+void PosixMainLoop::TimeoutTableInsert(unsigned int identifier, MainLoopCbData *cb_data)
+{
+ std::lock_guard<std::mutex> lock(table_lock);
+ timeout_table.insert(TimeoutMap::value_type(identifier, cb_data));
+}
+
+bool PosixMainLoop::CheckWatch(pollfd *pfds, nfds_t nfds, short int event)
+{
+ bool handled = false;
+ for (nfds_t idx = 0; idx < nfds; idx++) {
+ if (false == (pfds[idx].revents & event)) {
+ DBG("Unknown Event(%d)", pfds[idx].revents);
+ continue;
+ }
+
+ table_lock.lock();
+ auto iter = watch_table.find(pfds[idx].fd);
+ MainLoopCbData *cb_data = (watch_table.end() == iter) ? NULL : iter->second;
+ table_lock.unlock();
+
+ if (cb_data) {
+ if (pfds[idx].revents & (POLLHUP | POLLERR))
+ cb_data->result = (POLLHUP & pfds[idx].revents) ? HANGUP : ERROR;
+
+ int ret = cb_data->cb(cb_data->result, cb_data->fd, cb_data->data);
+ handled = true;
+
+ if (AITT_LOOP_EVENT_REMOVE == ret)
+ RemoveWatch(pfds[idx].fd);
+ }
+ }
+ return handled;
+}
+
+bool PosixMainLoop::CheckTimeout(pollfd pfd, short int event)
+{
+ if (false == (pfd.revents & event)) {
+ DBG("Unknown Event(%d)", pfd.revents);
+ return false;
+ }
+
+ bool handled = false;
+ unsigned int identifier = INVALID;
+ while (read(pfd.fd, &identifier, sizeof(identifier)) == sizeof(identifier)) {
+ if (identifier == INVALID) {
+ INFO("Terminating");
+ return true;
+ }
+
+ table_lock.lock();
+ TimeoutMap::iterator iter = timeout_table.find(identifier);
+ MainLoopCbData *cb_data = iter == timeout_table.end() ? NULL : iter->second;
+ table_lock.unlock();
+
+ if (cb_data) {
+ int ret = cb_data->cb(cb_data->result, cb_data->fd, cb_data->data);
+ handled = true;
+
+ if (AITT_LOOP_EVENT_REMOVE == ret)
+ RemoveTimeout(identifier);
+ else
+ SetTimer(cb_data->timeout_interval, identifier);
+ }
+ }
+ return handled;
+}
+
+void PosixMainLoop::CheckIdle(pollfd pfd, short int event)
+{
+ if (false == (pfd.revents & event)) {
+ DBG("Unknown Event(%d)", pfd.revents);
+ return;
+ }
+
+ unsigned int identifier = INVALID;
+ if (read(pfd.fd, &identifier, sizeof(identifier)) == sizeof(identifier)) {
+ if (identifier == IDLE) {
+ table_lock.lock();
+ MainLoopCbData *cb_data = idle_table.empty() ? NULL : idle_table.front();
+ table_lock.unlock();
+
+ if (cb_data) {
+ int ret = cb_data->cb(cb_data->result, cb_data->fd, cb_data->data);
+
+ if (AITT_LOOP_EVENT_REMOVE == ret) {
+ table_lock.lock();
+ idle_table.pop_front();
+ delete cb_data;
+ table_lock.unlock();
+ }
+ }
+ }
+ }
+}
+
+int PosixMainLoop::SetTimer(int interval, unsigned int timeout_id)
+{
+ struct sigevent se;
+ timer_t timer;
+ struct itimerspec its;
+
+ TimeoutData *data = new TimeoutData();
+ data->pipe_fd = timeout_pipe[1];
+ data->timeout_id = timeout_id;
+
+ se.sigev_notify = SIGEV_SIGNAL;
+ se.sigev_signo = SIGUSR1;
+ se.sigev_value.sival_ptr = static_cast<void *>(data);
+
+ long sec = interval / 1000;
+ long msec = interval % 1000;
+
+ memset(&its, 0, sizeof(its));
+ its.it_value.tv_sec = sec;
+ its.it_value.tv_nsec = msec * 1000 * 1000;
+
+ if (timer_create(CLOCK_MONOTONIC, &se, &timer) == -1) {
+ ERR("timer_create() Fail(%d)", errno);
+ return -1;
+ }
+ if (timer_settime(timer, 0, &its, NULL) == -1) {
+ ERR("timer_settime() Fail(%d)", errno);
+ return -1;
+ }
+
+ return 0;
+}
+
+PosixMainLoop::MainLoopCbData::MainLoopCbData()
+ : data(nullptr), result(OK), fd(IDLE), timeout_interval(0)
+{
+}
+
+} // namespace aitt
*/
#pragma once
-#include <fcntl.h>
+#include <poll.h>
#include <signal.h>
-#include <string.h>
-#include <sys/poll.h>
-#include <time.h>
-#include <unistd.h>
#include <deque>
-#include <functional>
#include <map>
#include <mutex>
-#include "aitt_internal.h"
-
-#define BUF_SIZE 10
+#include "MainLoopIface.h"
namespace aitt {
-class MainLoopHandler {
+class PosixMainLoop : public MainLoopIface {
public:
- enum MainLoopResult {
- OK,
- ERROR,
- REMOVED,
- HANGUP,
- };
+ PosixMainLoop();
+ virtual ~PosixMainLoop();
+
+ void Run() override;
+ bool Quit() override;
+ void AddIdle(const mainLoopCB &cb, MainLoopData *user_data) override;
+ void AddWatch(int fd, const mainLoopCB &cb, MainLoopData *user_data) override;
+ MainLoopData *RemoveWatch(int fd) override;
+ unsigned int AddTimeout(int interval, const mainLoopCB &cb, MainLoopData *user_data) override;
+ void RemoveTimeout(unsigned int id) override;
+ private:
enum PipeValue {
- INVALID = -3,
- TIMEOUT = -2,
- IDLE = -1,
+ INVALID = 0,
+ IDLE = 1,
+ TIMEOUT_START = 2,
};
struct TimeoutData {
int pipe_fd;
};
- struct MainLoopData {
- virtual ~MainLoopData() = default;
- };
- using mainLoopCB = std::function<void(MainLoopResult result, int fd, MainLoopData *data)>;
-
- MainLoopHandler();
- ~MainLoopHandler();
-
- static void AddIdle(MainLoopHandler *handle, const mainLoopCB &cb, MainLoopData *user_data);
- static void WriteToPipe(int pipe_fd, int identifier);
-
- void Run();
- bool Quit();
- void AddWatch(int fd, const mainLoopCB &cb, MainLoopData *user_data);
- MainLoopData *RemoveWatch(int fd);
- unsigned int AddTimeout(int interval, const mainLoopCB &cb, MainLoopData *user_data);
- void RemoveTimeout(unsigned int id);
-
- private:
struct MainLoopCbData {
MainLoopCbData();
mainLoopCB cb;
MainLoopData *data;
MainLoopResult result;
int fd;
- unsigned int timeout_id;
+ int timeout_interval;
};
- using WatchMap = std::multimap<int, MainLoopCbData *>;
+ using WatchMap = std::map<int, MainLoopCbData *>;
using TimeoutMap = std::map<unsigned int, MainLoopCbData *>;
using IdleQueue = std::deque<MainLoopCbData *>;
- void WatchTableInsert(int identifier, MainLoopCbData *cb_data);
+ static void WriteToPipe(int pipe_fd, unsigned int identifier);
+ static void TimerHandler(int sig, siginfo_t *si, void *uc);
+
void TimeoutTableInsert(unsigned int identifier, MainLoopCbData *cb_data);
- void IdleTableInsert(MainLoopCbData *cb_data);
- void CheckWatch(pollfd *pfds, nfds_t nfds, short int event);
- void CheckTimeout(pollfd pfd, short int event);
+ bool CheckWatch(pollfd *pfds, nfds_t nfds, short int event);
+ bool CheckTimeout(pollfd pfd, short int event);
void CheckIdle(pollfd pfd, short int event);
- void SetTimer(int interval, unsigned int timeout_id);
+ int SetTimer(int interval, unsigned int timeout_id);
WatchMap watch_table;
TimeoutMap timeout_table;
int idle_pipe[2];
bool is_running;
- bool is_idle;
};
} // namespace aitt
+++ /dev/null
-/*
- * Copyright (c) 2022 Samsung Electronics Co., Ltd All Rights Reserved
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#include "MainLoopHandler.h"
-
-namespace aitt {
-
-static void TimerHandler(int sig, siginfo_t *si, void *uc)
-{
- struct MainLoopHandler::TimeoutData *data =
- (MainLoopHandler::TimeoutData *)si->si_value.sival_ptr;
- if (data == NULL)
- ERR("sival_ptr is null");
- MainLoopHandler::WriteToPipe(data->pipe_fd, data->timeout_id);
- delete data;
-}
-
-MainLoopHandler::MainLoopHandler() : timeout_pipe{}, idle_pipe{}, is_running(false), is_idle(false)
-{
- if (pipe(idle_pipe) == -1 || pipe(timeout_pipe) == -1) {
- ERR("failed to create pipe");
- }
- if (fcntl(idle_pipe[0], F_SETFL, O_NONBLOCK) == -1
- || fcntl(timeout_pipe[0], F_SETFL, O_NONBLOCK) == -1) {
- ERR("failed to set NonBlock");
- }
-
- struct sigaction sa;
- sa.sa_flags = SA_SIGINFO;
- sa.sa_sigaction = TimerHandler;
- sigemptyset(&sa.sa_mask);
- if (sigaction(SIGUSR1, &sa, NULL) == -1)
- ERR("failed to invoke sigaction..");
-}
-
-MainLoopHandler::~MainLoopHandler()
-{
- if (is_running)
- Quit();
-
- std::lock_guard<std::mutex> lock(table_lock);
- for (auto iter = timeout_table.begin(); iter != timeout_table.end(); iter++) {
- delete iter->second;
- }
- for (auto iter = watch_table.begin(); iter != watch_table.end(); iter++) {
- delete iter->second;
- }
- for (auto iter = idle_table.begin(); iter != idle_table.end(); iter++) {
- delete *iter;
- }
- timeout_table.clear();
- watch_table.clear();
- idle_table.clear();
-
- for (int iter = 0; iter < 2; iter++) {
- close(timeout_pipe[iter]);
- close(idle_pipe[iter]);
- }
-}
-
-void MainLoopHandler::Run()
-{
- is_running = true;
-
- while (is_running) {
- table_lock.lock();
- is_idle = true;
- struct pollfd pfds[watch_table.size() + 2];
- nfds_t nfds = 0;
-
- // for Watch
- for (auto iter = watch_table.begin(); iter != watch_table.end(); iter++) {
- pfds[nfds++] = {iter->second->fd, POLLIN | POLLHUP | POLLERR, 0};
- }
- // for idle
- pfds[nfds++] = {idle_pipe[0], POLLIN, 0};
- // for timeout
- pfds[nfds++] = {timeout_pipe[0], POLLIN, 0};
- table_lock.unlock();
-
- if (0 < poll(pfds, nfds, -1)) {
- CheckTimeout(pfds[nfds - 1], POLLIN);
- CheckWatch(pfds, nfds - 2, POLLIN | POLLHUP | POLLERR);
- if (is_idle)
- CheckIdle(pfds[nfds - 2], POLLIN);
- }
- }
-}
-
-bool MainLoopHandler::Quit()
-{
- if (is_running == false) {
- ERR("main loop is not running");
- return false;
- }
-
- is_running = false;
- WriteToPipe(timeout_pipe[1], INVALID);
- return true;
-}
-
-void MainLoopHandler::AddWatch(int fd, const mainLoopCB &cb, MainLoopData *user_data)
-{
- MainLoopCbData *cb_data = new MainLoopCbData();
- cb_data->cb = cb;
- cb_data->data = user_data;
- cb_data->fd = fd;
-
- WatchTableInsert(cb_data->fd, cb_data);
-}
-
-MainLoopHandler::MainLoopData *MainLoopHandler::RemoveWatch(int fd)
-{
- MainLoopData *user_data = nullptr;
-
- std::lock_guard<std::mutex> lock(table_lock);
- WatchMap::iterator iter = watch_table.find(fd);
- if (iter == watch_table.end())
- return user_data;
- user_data = iter->second->data;
- watch_table.erase(iter);
-
- return user_data;
-}
-
-unsigned int MainLoopHandler::AddTimeout(int interval, const mainLoopCB &cb, MainLoopData *data)
-{
- if (interval < 0)
- ERR("interval must be greater then or equal to zero");
- static unsigned int identifier = 1;
-
- MainLoopCbData *cb_data = new MainLoopCbData();
- cb_data->cb = cb;
- cb_data->data = data;
- cb_data->fd = PipeValue::TIMEOUT;
- cb_data->timeout_id = identifier++;
-
- TimeoutTableInsert(cb_data->timeout_id, cb_data);
- SetTimer(interval, cb_data->timeout_id);
- return cb_data->timeout_id;
-}
-
-void MainLoopHandler::RemoveTimeout(unsigned int id)
-{
- std::lock_guard<std::mutex> lock(table_lock);
- TimeoutMap::iterator iter = timeout_table.find(id);
- if (iter != timeout_table.end()) {
- delete iter->second;
- timeout_table.erase(iter);
- }
-}
-
-void MainLoopHandler::AddIdle(MainLoopHandler *handle, const mainLoopCB &cb,
- MainLoopData *user_data)
-{
- RET_IF(handle == nullptr);
-
- MainLoopCbData *cb_data = new MainLoopCbData();
- cb_data->cb = cb;
- cb_data->data = user_data;
-
- handle->IdleTableInsert(cb_data);
-}
-
-void MainLoopHandler::WriteToPipe(int pipe_fd, int identifier)
-{
- if (write(pipe_fd, (const void *)(&identifier), sizeof(int)) != sizeof(int)) {
- ERR("write fail");
- }
-}
-
-void MainLoopHandler::WatchTableInsert(int identifier, MainLoopCbData *cb_data)
-{
- std::lock_guard<std::mutex> lock(table_lock);
- watch_table.insert(WatchMap::value_type(identifier, cb_data));
- WriteToPipe(idle_pipe[1], INVALID);
-}
-
-void MainLoopHandler::TimeoutTableInsert(unsigned int identifier, MainLoopCbData *cb_data)
-{
- std::lock_guard<std::mutex> lock(table_lock);
- timeout_table.insert(TimeoutMap::value_type(identifier, cb_data));
-}
-
-void MainLoopHandler::IdleTableInsert(MainLoopCbData *cb_data)
-{
- std::lock_guard<std::mutex> lock(table_lock);
- idle_table.push_back(cb_data);
- WriteToPipe(idle_pipe[1], IDLE);
-}
-
-void MainLoopHandler::CheckWatch(pollfd *pfds, nfds_t nfds, short int event)
-{
- for (unsigned long int idx = 0; idx < nfds; idx++) {
- if (pfds[idx].revents & event) {
- table_lock.lock();
- WatchMap::iterator wt_map = watch_table.find(pfds[idx].fd);
- MainLoopCbData *cb_data = wt_map == watch_table.end() ? NULL : wt_map->second;
- table_lock.unlock();
-
- if (cb_data) {
- if (pfds[idx].revents & (POLLHUP | POLLERR)) {
- cb_data->result = (POLLHUP & pfds[idx].revents) ? HANGUP : ERROR;
- }
- cb_data->cb(cb_data->result, cb_data->fd, cb_data->data);
- is_idle = false;
- }
- }
- }
-}
-
-void MainLoopHandler::CheckTimeout(pollfd pfd, short int event)
-{
- if (pfd.revents & event) {
- int identifier = INVALID;
- while (read(pfd.fd, &identifier, sizeof(int)) == sizeof(int)) {
- if (identifier == INVALID)
- continue;
-
- table_lock.lock();
- TimeoutMap::iterator tm_map = timeout_table.find((unsigned int)(identifier));
- MainLoopCbData *cb_data = tm_map == timeout_table.end() ? NULL : tm_map->second;
- table_lock.unlock();
-
- if (cb_data) {
- cb_data->cb(cb_data->result, cb_data->fd, cb_data->data);
- is_idle = false;
-
- table_lock.lock();
- tm_map = timeout_table.find((unsigned int)(identifier));
- if (tm_map != timeout_table.end()) {
- delete tm_map->second;
- timeout_table.erase(tm_map);
- }
- table_lock.unlock();
- }
- }
- }
-}
-
-void MainLoopHandler::CheckIdle(pollfd pfd, short int event)
-{
- if (pfd.revents & event) {
- int identifier = INVALID;
- if (read(pfd.fd, &identifier, sizeof(int)) == sizeof(int)) {
- if (identifier == IDLE) {
- table_lock.lock();
- MainLoopCbData *cb_data = idle_table.empty() ? NULL : idle_table.front();
- table_lock.unlock();
-
- if (cb_data) {
- cb_data->cb(cb_data->result, cb_data->fd, cb_data->data);
-
- table_lock.lock();
- idle_table.pop_front();
- delete cb_data;
- table_lock.unlock();
- }
- }
- }
- }
-}
-
-void MainLoopHandler::SetTimer(int interval, unsigned int timeout_id)
-{
- struct sigevent se;
- timer_t timer;
- struct itimerspec its;
-
- struct TimeoutData *data = new TimeoutData();
- data->pipe_fd = timeout_pipe[1];
- data->timeout_id = timeout_id;
-
- se.sigev_notify = SIGEV_SIGNAL;
- se.sigev_signo = SIGUSR1;
- se.sigev_value.sival_ptr = (void *)(data);
-
- long sec = interval / 1000;
- long msec = interval % 1000;
-
- memset(&its, 0, sizeof(its));
- its.it_value.tv_sec = sec;
- its.it_value.tv_nsec = msec * 1000 * 1000;
-
- if (timer_create(CLOCK_MONOTONIC, &se, &timer) == -1)
- ERR("failed to create timer");
- if (timer_settime(timer, 0, &its, NULL) == -1)
- ERR("failed to set timer");
-}
-
-MainLoopHandler::MainLoopCbData::MainLoopCbData()
- : data(nullptr), result(OK), fd(IDLE), timeout_id(0)
-{
-}
-
-} // namespace aitt
INSTALL(TARGETS ${AITT_TCP} DESTINATION ${CMAKE_INSTALL_LIBDIR})
IF(BUILD_TESTING)
- ADD_SUBDIRECTORY(samples)
ADD_SUBDIRECTORY(tests)
+ IF(USE_GLIB)
+ ADD_SUBDIRECTORY(samples)
+ ENDIF(USE_GLIB)
ENDIF(BUILD_TESTING)
discovery.UpdateDiscoveryMsg(NAME[secure], buf.data(), buf.size());
}
-void Module::ReceiveData(MainLoopHandler::MainLoopResult result, int handle,
+int Module::ReceiveData(MainLoopHandler::MainLoopResult result, int handle,
MainLoopHandler::MainLoopData *user_data)
{
TCPData *tcp_data = dynamic_cast<TCPData *>(user_data);
- RET_IF(tcp_data == nullptr);
+ RETV_IF(tcp_data == nullptr, AITT_LOOP_EVENT_REMOVE);
TCPServerData *parent_info = tcp_data->parent;
- RET_IF(parent_info == nullptr);
+ RETV_IF(parent_info == nullptr, AITT_LOOP_EVENT_REMOVE);
Module *impl = parent_info->impl;
- RET_IF(impl == nullptr);
+ RETV_IF(impl == nullptr, AITT_LOOP_EVENT_REMOVE);
if (result == MainLoopHandler::HANGUP) {
ERR("The main loop hung up. Disconnect the client.");
topic = impl->GetTopicName(tcp_data);
if (topic.empty()) {
ERR("A topic is empty.");
- return;
+ return AITT_LOOP_EVENT_CONTINUE;
}
szmsg = tcp_data->client->RecvSizedData((void **)&msg);
} catch (std::exception &e) {
ERR("An exception(%s) occurs", e.what());
free(msg);
- return;
+ return AITT_LOOP_EVENT_CONTINUE;
}
std::string correlation;
parent_info->cb(topic, msg, szmsg, parent_info->cbdata, correlation);
free(msg);
+
+ return AITT_LOOP_EVENT_CONTINUE;
}
-void Module::HandleClientDisconnect(int handle)
+int Module::HandleClientDisconnect(int handle)
{
TCPData *tcp_data = dynamic_cast<TCPData *>(main_loop.RemoveWatch(handle));
if (tcp_data == nullptr) {
ERR("No watch data");
- return;
+ return AITT_LOOP_EVENT_REMOVE;
}
tcp_data->parent->client_lock.lock();
auto it = std::find(tcp_data->parent->client_list.begin(), tcp_data->parent->client_list.end(),
tcp_data->parent->client_lock.unlock();
delete tcp_data;
+ return AITT_LOOP_EVENT_REMOVE;
}
std::string Module::GetTopicName(Module::TCPData *tcp_data)
return topic;
}
-void Module::AcceptConnection(MainLoopHandler::MainLoopResult result, int handle,
+int Module::AcceptConnection(MainLoopHandler::MainLoopResult result, int handle,
MainLoopHandler::MainLoopData *user_data)
{
TCPServerData *listen_info = dynamic_cast<TCPServerData *>(user_data);
- RET_IF(listen_info == nullptr);
+ RETV_IF(listen_info == nullptr, AITT_LOOP_EVENT_REMOVE);
Module *impl = listen_info->impl;
- RET_IF(impl == nullptr);
+ RETV_IF(impl == nullptr, AITT_LOOP_EVENT_REMOVE);
std::unique_ptr<TCP> client;
{
auto clientIt = impl->subscribeTable.find(listen_info->topic);
if (clientIt == impl->subscribeTable.end())
- return;
+ return AITT_LOOP_EVENT_REMOVE;
client = clientIt->second->AcceptPeer();
}
if (client == nullptr) {
ERR("Unable to accept a peer"); // NOTE: FATAL ERROR
- return;
+ return AITT_LOOP_EVENT_CONTINUE;
}
int client_handle = client->GetHandle();
ecd->client = std::move(client);
impl->main_loop.AddWatch(client_handle, ReceiveData, ecd);
+ return AITT_LOOP_EVENT_CONTINUE;
}
void Module::UpdatePublishTable(const std::string &topic, const std::string &clientId,
using HostMap = std::map<std::string /* clientId */, PortMap>;
using PublishMap = std::map<std::string /* topic */, HostMap>;
- static void AcceptConnection(MainLoopHandler::MainLoopResult result, int handle,
+ static int AcceptConnection(MainLoopHandler::MainLoopResult result, int handle,
MainLoopHandler::MainLoopData *watchData);
void DiscoveryMessageCallback(const std::string &clientId, const std::string &status,
const void *msg, const int szmsg);
void UpdateDiscoveryMsg();
- static void ReceiveData(MainLoopHandler::MainLoopResult result, int handle,
+ static int ReceiveData(MainLoopHandler::MainLoopResult result, int handle,
MainLoopHandler::MainLoopData *watchData);
- void HandleClientDisconnect(int handle);
+ int HandleClientDisconnect(int handle);
std::string GetTopicName(TCPData *connect_info);
void ThreadMain(void);
void UpdatePublishTable(const std::string &topic, const std::string &host,
-PKG_CHECK_MODULES(SAMPLE_NEEDS REQUIRED glib-2.0 ${TIZEN_LOG_PKG})
+PKG_CHECK_MODULES(SAMPLE_NEEDS REQUIRED ${ADDITION_PKG})
INCLUDE_DIRECTORIES(${SAMPLE_NEEDS_INCLUDE_DIRS})
LINK_DIRECTORIES(${SAMPLE_NEEDS_LIBRARY_DIRS})
-PKG_CHECK_MODULES(UT_NEEDS REQUIRED gmock_main ${TIZEN_LOG_PKG})
+PKG_CHECK_MODULES(UT_NEEDS REQUIRED gmock_main ${ADDITION_PKG})
INCLUDE_DIRECTORIES(${UT_NEEDS_INCLUDE_DIRS})
LINK_DIRECTORIES(${UT_NEEDS_LIBRARY_DIRS})
* limitations under the License.
*/
#include <assert.h>
+#include <string.h>
#include <string>
}
}
-void AITT::Impl::ConnectionCB(ConnectionCallback cb, void *user_data, int status,
+int AITT::Impl::ConnectionCB(ConnectionCallback cb, void *user_data, int status,
MainLoopHandler::MainLoopResult result, int fd, MainLoopHandler::MainLoopData *loop_data)
{
- RET_IF(cb == nullptr);
+ RETV_IF(cb == nullptr, AITT_LOOP_EVENT_REMOVE);
cb(public_api, status, user_data);
+
+ return AITT_LOOP_EVENT_REMOVE;
}
void AITT::Impl::Connect(const std::string &host, int port, const std::string &username,
user_data, qos);
}
-void AITT::Impl::DetachedCB(SubscribeCallback cb, MSG msg, void *data, const int datalen,
+int AITT::Impl::DetachedCB(SubscribeCallback cb, MSG msg, void *data, const int datalen,
void *user_data, MainLoopHandler::MainLoopResult result, int fd,
MainLoopHandler::MainLoopData *loop_data)
{
- RET_IF(cb == nullptr);
+ RETV_IF(cb == nullptr, AITT_LOOP_EVENT_REMOVE);
cb(&msg, data, datalen, user_data);
free(data);
+ return AITT_LOOP_EVENT_REMOVE;
}
void *AITT::Impl::Unsubscribe(AittSubscribeID subscribe_id)
timeout_id = sync_loop.AddTimeout(
timeout_ms,
[&, timeout_ms](MainLoopHandler::MainLoopResult result, int fd,
- MainLoopHandler::MainLoopData *data) {
+ MainLoopHandler::MainLoopData *data) -> int {
ERR("PublishWithReplySync() timeout(%d)", timeout_ms);
sync_loop.Quit();
is_timeout = true;
+ return AITT_LOOP_EVENT_REMOVE;
},
nullptr);
}
using Blob = std::pair<const void *, int>;
using SubscribeInfo = std::pair<AittProtocol, void *>;
- void ConnectionCB(ConnectionCallback cb, void *user_data, int status,
+ int ConnectionCB(ConnectionCallback cb, void *user_data, int status,
MainLoopHandler::MainLoopResult result, int fd, MainLoopHandler::MainLoopData *loop_data);
AittSubscribeID SubscribeMQ(SubscribeInfo *info, MainLoopHandler *loop_handle,
const std::string &topic, const SubscribeCallback &cb, void *cbdata, AittQoS qos);
- void DetachedCB(SubscribeCallback cb, MSG mq_msg, void *data, const int datalen, void *cbdata,
+ int DetachedCB(SubscribeCallback cb, MSG mq_msg, void *data, const int datalen, void *cbdata,
MainLoopHandler::MainLoopResult result, int fd, MainLoopHandler::MainLoopData *loop_data);
void *SubscribeTCP(SubscribeInfo *, const std::string &topic, const SubscribeCallback &cb,
void *cbdata, AittQoS qos);
aitt.Publish("test/value2", dump_msg, 1600, protocol);
aitt.Publish("test/value3", dump_msg, 1600, protocol);
- mainLoop.AddTimeout(
- 100,
+ mainLoop.AddTimeout(CHECK_INTERVAL,
[&](MainLoopHandler::MainLoopResult result, int fd,
- MainLoopHandler::MainLoopData *data) {
- ReadyCheck(static_cast<AittTests *>(this));
- },
- nullptr);
+ MainLoopHandler::MainLoopData *data) -> int {
+ return ReadyCheck(static_cast<AittTests *>(this));
+ });
IterateEventLoop();
ASSERT_TRUE(ready);
aitt.Publish("test/step2/value1", dump_msg, 1600, AITT_TYPE_TCP);
aitt.Publish("test/step2/value1", dump_msg, 1600, AITT_TYPE_TCP);
- mainLoop.AddTimeout(
- 100,
+ mainLoop.AddTimeout(CHECK_INTERVAL,
[&](MainLoopHandler::MainLoopResult result, int fd,
- MainLoopHandler::MainLoopData *data) {
- ReadyCheck(static_cast<AittTests *>(this));
- },
- nullptr);
+ MainLoopHandler::MainLoopData *data) -> int {
+ return ReadyCheck(static_cast<AittTests *>(this));
+ });
IterateEventLoop();
ASSERT_TRUE(ready);
sleep(1);
kill(pid, SIGKILL);
mainLoop.AddTimeout(
- 10,
+ CHECK_INTERVAL,
[&](MainLoopHandler::MainLoopResult result, int fd,
- MainLoopHandler::MainLoopData *data) {
- ReadyCheck(static_cast<AittTests *>(this));
+ MainLoopHandler::MainLoopData *data) -> int {
+ return ReadyCheck(static_cast<AittTests *>(this));
},
nullptr);
IterateEventLoop();
aitt1.Publish(TEST_STRESS_TOPIC, dump_msg, sizeof(dump_msg), protocol,
AITT_QOS_AT_MOST_ONCE);
}
- mainLoop.AddTimeout(
- 100,
+ mainLoop.AddTimeout(CHECK_INTERVAL,
[&](MainLoopHandler::MainLoopResult result, int fd,
- MainLoopHandler::MainLoopData *data) {
- ReadyCheck(static_cast<AittTests *>(this));
- },
- nullptr);
+ MainLoopHandler::MainLoopData *data) -> int {
+ return ReadyCheck(static_cast<AittTests *>(this));
+ });
IterateEventLoop();
}
aitt_retry.Publish(TEST_STRESS_TOPIC, dump_msg, sizeof(dump_msg), protocol,
AITT_QOS_AT_MOST_ONCE);
- mainLoop.AddTimeout(
- 100,
+ mainLoop.AddTimeout(CHECK_INTERVAL,
[&](MainLoopHandler::MainLoopResult result, int fd,
- MainLoopHandler::MainLoopData *data) {
- ReadyCheck(static_cast<AittTests *>(this));
- },
- nullptr);
+ MainLoopHandler::MainLoopData *data) -> int {
+ return ReadyCheck(static_cast<AittTests *>(this));
+ });
IterateEventLoop();
// Publish message through the specified protocol - TCP
aitt.Publish(testTopic, TEST_MSG2, sizeof(TEST_MSG2), protocol);
- mainLoop.AddTimeout(
- 100,
+ mainLoop.AddTimeout(CHECK_INTERVAL,
[&](MainLoopHandler::MainLoopResult result, int fd,
- MainLoopHandler::MainLoopData *data) {
- ReadyCheck(static_cast<AittTests *>(this));
- },
- nullptr);
+ MainLoopHandler::MainLoopData *data) -> int {
+ return ReadyCheck(static_cast<AittTests *>(this));
+ });
IterateEventLoop();
aitt.Publish(testTopic, TEST_MSG2, sizeof(TEST_MSG2), protocol);
- mainLoop.AddTimeout(
- 100,
+ mainLoop.AddTimeout(CHECK_INTERVAL,
[&](MainLoopHandler::MainLoopResult result, int fd,
- MainLoopHandler::MainLoopData *data) {
- ReadyCheck(static_cast<AittTests *>(this));
- },
- nullptr);
+ MainLoopHandler::MainLoopData *data) -> int {
+ return ReadyCheck(static_cast<AittTests *>(this));
+ });
IterateEventLoop();
this);
aitt.Connect();
- mainLoop.AddTimeout(
- 100,
+ mainLoop.AddTimeout(CHECK_INTERVAL,
[&](MainLoopHandler::MainLoopResult result, int fd,
- MainLoopHandler::MainLoopData *data) {
- ReadyCheck(static_cast<AittTests *>(this));
- },
- nullptr);
+ MainLoopHandler::MainLoopData *data) -> int {
+ return ReadyCheck(static_cast<AittTests *>(this));
+ });
IterateEventLoop();
ASSERT_TRUE(ready);
this);
aitt.Connect();
- mainLoop.AddTimeout(
- 100,
+ mainLoop.AddTimeout(CHECK_INTERVAL,
[&](MainLoopHandler::MainLoopResult result, int fd,
- MainLoopHandler::MainLoopData *data) {
- ReadyCheck(static_cast<AittTests *>(this));
- },
- nullptr);
+ MainLoopHandler::MainLoopData *data) -> int {
+ return ReadyCheck(static_cast<AittTests *>(this));
+ });
IterateEventLoop();
sleep(1);
DBG("Publish message to %s (%s)", testTopic.c_str(), TEST_MSG);
aitt.Publish(testTopic, TEST_MSG, sizeof(TEST_MSG));
- mainLoop.AddTimeout(
- 100,
+ mainLoop.AddTimeout(CHECK_INTERVAL,
[&](MainLoopHandler::MainLoopResult result, int fd,
- MainLoopHandler::MainLoopData *data) {
- ReadyCheck(static_cast<AittTests *>(this));
- },
- nullptr);
+ MainLoopHandler::MainLoopData *data) -> int {
+ return ReadyCheck(static_cast<AittTests *>(this));
+ });
IterateEventLoop();
cbdata);
DBG("Ready flag is toggled");
- mainLoop.AddTimeout(
- 10,
+ mainLoop.AddTimeout(CHECK_INTERVAL,
[&](MainLoopHandler::MainLoopResult result, int fd,
- MainLoopHandler::MainLoopData *data) {
+ MainLoopHandler::MainLoopData *data) -> int {
AITTTest *test = static_cast<AITTTest *>(cbdata);
test->ToggleReady();
- },
- nullptr);
+ return AITT_LOOP_EVENT_REMOVE;
+ });
},
static_cast<void *>(this));
DBG("Publish message to %s (%s)", testTopic.c_str(), TEST_MSG);
aitt.Publish(testTopic, TEST_MSG, sizeof(TEST_MSG));
- mainLoop.AddTimeout(
- 100,
+ mainLoop.AddTimeout(CHECK_INTERVAL,
[&](MainLoopHandler::MainLoopResult result, int fd,
- MainLoopHandler::MainLoopData *data) {
- ReadyCheck(static_cast<AittTests *>(this));
- },
- nullptr);
+ MainLoopHandler::MainLoopData *data) -> int {
+ return ReadyCheck(static_cast<AittTests *>(this));
+ });
IterateEventLoop();
aitt.Publish(testTopic, TEST_MSG, sizeof(TEST_MSG),
(AittProtocol)(AITT_TYPE_MQTT | AITT_TYPE_TCP));
- mainLoop.AddTimeout(
- 100,
+ mainLoop.AddTimeout(CHECK_INTERVAL,
[&](MainLoopHandler::MainLoopResult result, int fd,
- MainLoopHandler::MainLoopData *data) {
- ReadyCheck(static_cast<AittTests *>(this));
- },
- nullptr);
+ MainLoopHandler::MainLoopData *data) -> int {
+ return ReadyCheck(static_cast<AittTests *>(this));
+ });
IterateEventLoop();
},
nullptr);
- main_loop.AddTimeout(
- 3000,
+ main_loop.AddTimeout(3000,
[&](MainLoopHandler::MainLoopResult result, int fd,
- MainLoopHandler::MainLoopData *data) { subscriber->Start(); },
- nullptr);
+ MainLoopHandler::MainLoopData *data) -> int {
+ subscriber->Start();
+ return AITT_LOOP_EVENT_REMOVE;
+ });
main_loop.Run();
} catch (std::exception &e) {
FAIL() << "Unexpected exception: " << e.what();
#define TEST_MSG2 "This message is going to be delivered through a specified AittProtocol"
#define SLEEP_MS 1000
+#define CHECK_INTERVAL 10
+
using aitt::MainLoopHandler;
class AittTests {
void ToggleReady() { ready = true; }
void ToggleReady2() { ready2 = true; }
- void ReadyCheck(void *data)
+ int ReadyCheck(void *data)
{
AittTests *test = static_cast<AittTests *>(data);
if (test->ready) {
test->StopEventLoop();
+ return AITT_LOOP_EVENT_REMOVE;
}
+
+ return AITT_LOOP_EVENT_CONTINUE;
}
void StopEventLoop(void) { mainLoop.Quit(); }
#include "aitt_internal.h"
+using aitt::MainLoopHandler;
+
class MainLoopTest : public testing::Test {
protected:
void SetUp() override
struct sockaddr_un addr;
std::thread my_thread;
+ protected:
+ int CheckCount(MainLoopHandler &handler, int &count)
+ {
+ switch (count) {
+ case 0:
+ count++;
+ return AITT_LOOP_EVENT_CONTINUE;
+ case 1:
+ std::thread([&]() {
+ std::this_thread::sleep_for(std::chrono::milliseconds(500));
+ handler.Quit();
+ }).detach();
+ return AITT_LOOP_EVENT_REMOVE;
+ default:
+ ADD_FAILURE() << "Should not be called";
+ return AITT_LOOP_EVENT_REMOVE;
+ }
+ }
+
private:
void eventWriter()
{
ret = connect(fd, (struct sockaddr *)&addr, sizeof(struct sockaddr_un));
ASSERT_NE(ret, -1);
- std::this_thread::sleep_for(std::chrono::milliseconds(500));
+ std::this_thread::sleep_for(std::chrono::milliseconds(200));
ret = write(fd, "1", 1);
ASSERT_NE(ret, -1);
- std::this_thread::sleep_for(std::chrono::milliseconds(500));
+ std::this_thread::sleep_for(std::chrono::milliseconds(200));
close(fd);
}
};
-using aitt::MainLoopHandler;
-
TEST_F(MainLoopTest, Normal_Anytime)
{
MainLoopHandler handler;
handler.AddWatch(
server_fd,
- [&](MainLoopHandler::MainLoopResult result, int fd, MainLoopHandler::MainLoopData *data) {
+ [&](MainLoopHandler::MainLoopResult result, int fd,
+ MainLoopHandler::MainLoopData *data) -> int {
int client_fd = accept(server_fd, 0, 0);
EXPECT_NE(client_fd, -1);
handler.AddWatch(
client_fd,
[&](MainLoopHandler::MainLoopResult result, int fd,
- MainLoopHandler::MainLoopData *data) {
+ MainLoopHandler::MainLoopData *data) -> int {
EXPECT_EQ(result, MainLoopHandler::OK);
char buf[2] = {0};
EXPECT_EQ(read(fd, buf, 1), 1);
EXPECT_STREQ(buf, "1");
handler.Quit();
ret = true;
+ return AITT_LOOP_EVENT_REMOVE;
},
nullptr);
+ return AITT_LOOP_EVENT_REMOVE;
},
nullptr);
handler.Run();
handler.AddWatch(
server_fd,
- [&](MainLoopHandler::MainLoopResult result, int fd, MainLoopHandler::MainLoopData *data) {
+ [&](MainLoopHandler::MainLoopResult result, int fd,
+ MainLoopHandler::MainLoopData *data) -> int {
int client_fd = accept(server_fd, 0, 0);
EXPECT_NE(client_fd, -1);
handler.AddWatch(
client_fd,
[&](MainLoopHandler::MainLoopResult result, int fd,
- MainLoopHandler::MainLoopData *data) {
+ MainLoopHandler::MainLoopData *data) -> int {
if (result == MainLoopHandler::OK) {
char buf[2] = {0};
EXPECT_EQ(read(fd, buf, 1), 1);
EXPECT_STREQ(buf, "1");
- return;
+ return AITT_LOOP_EVENT_CONTINUE;
}
EXPECT_EQ(result, MainLoopHandler::HANGUP);
handler.Quit();
ret = true;
+ return AITT_LOOP_EVENT_REMOVE;
},
nullptr);
+ return AITT_LOOP_EVENT_REMOVE;
},
nullptr);
handler.AddWatch(
server_fd,
- [&](MainLoopHandler::MainLoopResult result, int fd, MainLoopHandler::MainLoopData *data) {
- FAIL() << "It's removed";
+ [&](MainLoopHandler::MainLoopResult result, int fd,
+ MainLoopHandler::MainLoopData *data) -> int {
+ ADD_FAILURE() << "It's removed";
+ return AITT_LOOP_EVENT_REMOVE;
},
&test_data);
MainLoopHandler::MainLoopData *check_data = handler.RemoveWatch(server_fd);
handler.AddWatch(
server_fd,
- [&](MainLoopHandler::MainLoopResult result, int fd, MainLoopHandler::MainLoopData *data) {
+ [&](MainLoopHandler::MainLoopResult result, int fd,
+ MainLoopHandler::MainLoopData *data) -> int {
EXPECT_EQ(data, &test_data);
handler.Quit();
ret = true;
+ return AITT_LOOP_EVENT_REMOVE;
},
&test_data);
handler.AddIdle(
&handler,
- [&](MainLoopHandler::MainLoopResult result, int fd, MainLoopHandler::MainLoopData *data) {
+ [&](MainLoopHandler::MainLoopResult result, int fd,
+ MainLoopHandler::MainLoopData *data) -> int {
EXPECT_EQ(data, &test_data);
handler.Quit();
ret = true;
+ return AITT_LOOP_EVENT_REMOVE;
},
&test_data);
TEST_F(MainLoopTest, AddTimeout_Anytime)
{
bool ret = false;
- int interval = 1000;
+ int interval = 100;
MainLoopHandler handler;
struct timespec ts_start, ts_end;
MainLoopHandler::MainLoopData test_data;
handler.AddTimeout(
interval,
- [&](MainLoopHandler::MainLoopResult result, int fd, MainLoopHandler::MainLoopData *data) {
+ [&](MainLoopHandler::MainLoopResult result, int fd,
+ MainLoopHandler::MainLoopData *data) -> int {
EXPECT_EQ(data, &test_data);
clock_gettime(CLOCK_MONOTONIC, &ts_end);
double diff = 1000.0 * ts_end.tv_sec + 1e-6 * ts_end.tv_nsec
EXPECT_GE(diff, interval);
handler.Quit();
ret = true;
+ return AITT_LOOP_EVENT_REMOVE;
},
&test_data);
EXPECT_TRUE(ret);
}
+
+TEST_F(MainLoopTest, Watch_Event_CB_Return_P_Anytime)
+{
+ MainLoopHandler handler;
+
+ handler.AddWatch(server_fd,
+ [&](MainLoopHandler::MainLoopResult result, int fd,
+ MainLoopHandler::MainLoopData *data) -> int {
+ int client_fd = accept(server_fd, 0, 0);
+ EXPECT_NE(client_fd, -1);
+ handler.AddWatch(
+ client_fd,
+ [&](MainLoopHandler::MainLoopResult result, int fd,
+ MainLoopHandler::MainLoopData *data) -> int {
+ static int count = 0;
+ EXPECT_EQ(result, MainLoopHandler::OK);
+ return CheckCount(handler, count);
+ },
+ nullptr);
+ return AITT_LOOP_EVENT_REMOVE;
+ });
+ handler.Run();
+}
+
+TEST_F(MainLoopTest, IDLE_CB_Return_P_Anytime)
+{
+ MainLoopHandler handler;
+
+ handler.AddIdle([&](MainLoopHandler::MainLoopResult result, int fd,
+ MainLoopHandler::MainLoopData *data) -> int {
+ static int count = 0;
+ EXPECT_EQ(result, MainLoopHandler::OK);
+ return CheckCount(handler, count);
+ });
+
+ handler.Run();
+}
+
+TEST_F(MainLoopTest, Timeout_CB_Return_P_Anytime)
+{
+ MainLoopHandler handler;
+
+ handler.AddTimeout(200,
+ [&](MainLoopHandler::MainLoopResult result, int fd,
+ MainLoopHandler::MainLoopData *data) -> int {
+ static int count = 0;
+ EXPECT_EQ(result, MainLoopHandler::OK);
+ return CheckCount(handler, count);
+ });
+
+ handler.Run();
+}
[](aitt::MSG *handle, const std::string &topic, const void *msg,
const int szmsg, void *cbdata) {},
user_data);
- mainLoop.AddTimeout(
- 10,
- [&](MainLoopHandler::MainLoopResult result, int fd,
- MainLoopHandler::MainLoopData *data) {
- MQTest *test = static_cast<MQTest *>(user_data);
- test->ToggleReady();
- },
- nullptr);
+
+ MQTest *test = static_cast<MQTest *>(user_data);
+ test->ToggleReady();
},
static_cast<void *>(this));
mq.Publish("MQ_TEST_TOPIC1", TEST_MSG, sizeof(TEST_MSG));
mainLoop.AddTimeout(
- 100,
+ CHECK_INTERVAL,
[&](MainLoopHandler::MainLoopResult result, int fd,
- MainLoopHandler::MainLoopData *data) {
- ReadyCheck(static_cast<AittTests *>(this));
+ MainLoopHandler::MainLoopData *data) -> int {
+ return ReadyCheck(static_cast<AittTests *>(this));
},
nullptr);
}
}
- mainLoop.AddTimeout(
- 100,
+ mainLoop.AddTimeout(CHECK_INTERVAL,
[&](MainLoopHandler::MainLoopResult result, int fd,
- MainLoopHandler::MainLoopData *data) {
- ReadyCheck(static_cast<AittTests *>(this));
- },
- nullptr);
+ MainLoopHandler::MainLoopData *data) -> int {
+ return ReadyCheck(static_cast<AittTests *>(this));
+ });
IterateEventLoop();
aitt.Disconnect();
bool sub_ok, reply1_ok, reply2_ok;
sub_ok = reply1_ok = reply2_ok = false;
+ bool connect1 = false;
+
AITT sub_aitt(clientId + "sub", LOCAL_IP, AittOption(true, false));
sub_aitt.SetConnectionCallback([&](AITT &handle, int status, void *user_data) {
if (status != AITT_CONNECTED)
sub_aitt.SendReply(msg, reply.c_str(), reply.size());
sub_ok = true;
});
+ connect1 = true;
});
sub_aitt.Connect();
if (status != AITT_CONNECTED)
return;
+ usleep(CHECK_INTERVAL * SLEEP_MS);
+ while (!connect1) {
+ usleep(SLEEP_MS);
+ }
using namespace std::placeholders;
auto replyCB = std::bind(&AITTRRTest::PublishSyncInCallback, GetHandle(), &handle,
&reply1_ok, &reply2_ok, _1, _2, _3, _4);
this);
aitt.Connect();
- mainLoop.AddTimeout(
- 150,
+ mainLoop.AddTimeout(CHECK_INTERVAL,
[&](MainLoopHandler::MainLoopResult result, int fd,
- MainLoopHandler::MainLoopData *data) {
- ReadyCheck(static_cast<AittTests *>(this));
- },
- nullptr);
+ MainLoopHandler::MainLoopData *data) -> int {
+ return ReadyCheck(static_cast<AittTests *>(this));
+ });
IterateEventLoop();
aitt.Disconnect();
std::placeholders::_4),
nullptr, correlation);
- mainLoop.AddTimeout(
- 100,
+ mainLoop.AddTimeout(CHECK_INTERVAL,
[&](MainLoopHandler::MainLoopResult result, int fd,
- MainLoopHandler::MainLoopData *data) {
- ReadyCheck(static_cast<AittTests *>(this));
- },
- nullptr);
+ MainLoopHandler::MainLoopData *data) -> int {
+ return ReadyCheck(static_cast<AittTests *>(this));
+ });
IterateEventLoop();
EXPECT_TRUE(sub_ok);
},
nullptr, correlation);
- mainLoop.AddTimeout(
- 100,
+ mainLoop.AddTimeout(CHECK_INTERVAL,
[&](MainLoopHandler::MainLoopResult result, int fd,
- MainLoopHandler::MainLoopData *data) {
- ReadyCheck(static_cast<AittTests *>(this));
- },
- nullptr);
+ MainLoopHandler::MainLoopData *data) -> int {
+ return ReadyCheck(static_cast<AittTests *>(this));
+ });
IterateEventLoop();
EXPECT_TRUE(sub_ok);
sleep(1);
kill(pid, SIGKILL);
handler.AddTimeout(
- 10,
+ CHECK_INTERVAL,
[&](MainLoopHandler::MainLoopResult result, int fd,
- MainLoopHandler::MainLoopData *data) {
+ MainLoopHandler::MainLoopData *data) -> int {
if (sub_called) {
handler.Quit();
}
+ return AITT_LOOP_EVENT_REMOVE;
},
nullptr);
ASSERT_EQ(ret, AITT_ERROR_NONE);
mainLoop.AddTimeout(
- 1000,
- [&](MainLoopHandler::MainLoopResult result, int fd, MainLoopHandler::MainLoopData *data) {
+ 200,
+ [&](MainLoopHandler::MainLoopResult result, int fd,
+ MainLoopHandler::MainLoopData *data) -> int {
int ret = aitt_unsubscribe(handle, sub_handle);
EXPECT_EQ(ret, AITT_ERROR_NONE);
sub_handle = nullptr;
ret = aitt_publish(handle, TEST_C_TOPIC, TEST_C_MSG, strlen(TEST_C_MSG));
EXPECT_EQ(ret, AITT_ERROR_NONE);
+ return AITT_LOOP_EVENT_REMOVE;
},
nullptr);
mainLoop.AddTimeout(
- 2000,
- [&](MainLoopHandler::MainLoopResult result, int fd, MainLoopHandler::MainLoopData *data) {
- EXPECT_EQ(sub_call_count, 1);
+ CHECK_INTERVAL,
+ [&](MainLoopHandler::MainLoopResult result, int fd,
+ MainLoopHandler::MainLoopData *data) -> int {
if (sub_call_count == 1) {
StopEventLoop();
+ return AITT_LOOP_EVENT_REMOVE;
}
+ return AITT_LOOP_EVENT_CONTINUE;
},
nullptr);
EXPECT_EQ(ret, AITT_ERROR_NONE);
aitt_destroy(handle);
+ EXPECT_EQ(sub_call_count, 1);
}
TEST_F(AITTCTest, connect_cb_P_Anytime)