From ac0a215826052e4c0e1e88b446579aea841206ed Mon Sep 17 00:00:00 2001
From: Piotr Bartosiewicz
Date: Fri, 5 Dec 2014 15:57:49 +0100
Subject: [PATCH 01/16] Fix ipc threading issues
[Bug/Feature] N/A
[Cause] N/A
[Solution] N/A
[Verification] Build, install, run tests
Change-Id: I0f403bdb9dd535186a7c1fa10a486da265858ad7
---
common/ipc/internals/processor.cpp | 6 ++---
common/ipc/internals/processor.hpp | 2 +-
tests/unit_tests/ipc/ut-ipc.cpp | 48 ++++++++++++++++++++------------------
3 files changed, 29 insertions(+), 27 deletions(-)
diff --git a/common/ipc/internals/processor.cpp b/common/ipc/internals/processor.cpp
index 5565124..b134b08 100644
--- a/common/ipc/internals/processor.cpp
+++ b/common/ipc/internals/processor.cpp
@@ -325,18 +325,18 @@ bool Processor::handleInput(const PeerID peerID, const Socket& socket)
if (mMethodsCallbacks.count(methodID)) {
// Method
std::shared_ptr methodCallbacks = mMethodsCallbacks.at(methodID);
- mCallsMutex.unlock();
+ lock.unlock();
return onRemoteCall(peerID, socket, methodID, messageID, methodCallbacks);
} else if (mSignalsCallbacks.count(methodID)) {
// Signal
std::shared_ptr signalCallbacks = mSignalsCallbacks.at(methodID);
- mCallsMutex.unlock();
+ lock.unlock();
return onRemoteSignal(peerID, socket, methodID, messageID, signalCallbacks);
} else {
// Nothing
- mCallsMutex.unlock();
+ lock.unlock();
LOGW("No method or signal callback for methodID: " << methodID);
removePeerInternal(peerID, Status::NAUGHTY_PEER);
return true;
diff --git a/common/ipc/internals/processor.hpp b/common/ipc/internals/processor.hpp
index 8fc17fb..da2a5b9 100644
--- a/common/ipc/internals/processor.hpp
+++ b/common/ipc/internals/processor.hpp
@@ -243,7 +243,7 @@ public:
private:
typedef std::function& data)> SerializeCallback;
typedef std::function(int fd)> ParseCallback;
- typedef std::lock_guard Lock;
+ typedef std::unique_lock Lock;
struct EmptyData {
CONFIG_REGISTER_EMPTY
diff --git a/tests/unit_tests/ipc/ut-ipc.cpp b/tests/unit_tests/ipc/ut-ipc.cpp
index 88696fe..b8b9e95 100644
--- a/tests/unit_tests/ipc/ut-ipc.cpp
+++ b/tests/unit_tests/ipc/ut-ipc.cpp
@@ -102,14 +102,12 @@ struct ThrowOnAcceptData {
template
void accept(Visitor)
{
- LOGE("Serialization and parsing failed");
- throw std::exception();
+ throw std::runtime_error("intentional failure in accept");
}
template
void accept(Visitor) const
{
- LOGE("Const Serialization and parsing failed");
- throw std::exception();
+ throw std::runtime_error("intentional failure in accept const");
}
};
@@ -139,11 +137,11 @@ PeerID connect(Service& s, Client& c)
// Connects the Client to the Service and returns Clients PeerID
std::mutex mutex;
- std::unique_lock lock(mutex);
std::condition_variable cv;
- unsigned int peerID = 0;
- auto newPeerCallback = [&cv, &peerID](unsigned int newPeerID) {
+ PeerID peerID = 0;
+ auto newPeerCallback = [&cv, &peerID, &mutex](const PeerID newPeerID) {
+ std::unique_lock lock(mutex);
peerID = newPeerID;
cv.notify_one();
};
@@ -156,6 +154,7 @@ PeerID connect(Service& s, Client& c)
c.start();
+ std::unique_lock lock(mutex);
BOOST_CHECK(cv.wait_for(lock, std::chrono::milliseconds(1000), [&peerID]() {
return peerID != 0;
}));
@@ -322,22 +321,23 @@ BOOST_AUTO_TEST_CASE(AsyncClientToServiceEcho)
Client c(socketPath);
c.start();
- std::mutex mtx;
- std::unique_lock lck(mtx);
+ std::mutex mutex;
std::condition_variable cv;
//Async call
std::shared_ptr sentData(new SendData(34));
std::shared_ptr recvData;
- auto dataBack = [&cv, &recvData](ipc::Status status, std::shared_ptr& data) {
+ auto dataBack = [&cv, &recvData, &mutex](ipc::Status status, std::shared_ptr& data) {
BOOST_CHECK(status == ipc::Status::OK);
+ std::unique_lock lock(mutex);
recvData = data;
cv.notify_one();
};
c.callAsync(1, sentData, dataBack);
// Wait for the response
- BOOST_CHECK(cv.wait_for(lck, std::chrono::milliseconds(100), [&recvData]() {
+ std::unique_lock lock(mutex);
+ BOOST_CHECK(cv.wait_for(lock, std::chrono::milliseconds(100), [&recvData]() {
return static_cast(recvData);
}));
@@ -355,11 +355,11 @@ BOOST_AUTO_TEST_CASE(AsyncServiceToClientEcho)
std::shared_ptr sentData(new SendData(56));
std::shared_ptr recvData;
- std::mutex mtx;
- std::unique_lock lck(mtx);
+ std::mutex mutex;
std::condition_variable cv;
- auto dataBack = [&cv, &recvData](ipc::Status status, std::shared_ptr& data) {
+ auto dataBack = [&cv, &recvData, &mutex](ipc::Status status, std::shared_ptr& data) {
BOOST_CHECK(status == ipc::Status::OK);
+ std::unique_lock lock(mutex);
recvData = data;
cv.notify_one();
};
@@ -367,7 +367,8 @@ BOOST_AUTO_TEST_CASE(AsyncServiceToClientEcho)
s.callAsync(1, peerID, sentData, dataBack);
// Wait for the response
- BOOST_CHECK(cv.wait_for(lck, std::chrono::milliseconds(1000), [&recvData]() {
+ std::unique_lock lock(mutex);
+ BOOST_CHECK(cv.wait_for(lock, std::chrono::milliseconds(1000), [&recvData]() {
return recvData.get() != nullptr;
}));
@@ -386,7 +387,7 @@ BOOST_AUTO_TEST_CASE(SyncTimeout)
std::shared_ptr sentData(new SendData(78));
- BOOST_CHECK_THROW((c.callSync(1, sentData, 10)), IPCException);
+ BOOST_CHECK_THROW((c.callSync(1, sentData, 10)), IPCException); //TODO it fails from time to time
}
BOOST_AUTO_TEST_CASE(SerializationError)
@@ -432,12 +433,12 @@ BOOST_AUTO_TEST_CASE(DisconnectedPeerError)
Client c(socketPath);
c.start();
- std::mutex mtx;
- std::unique_lock lck(mtx);
+ std::mutex mutex;
std::condition_variable cv;
ipc::Status retStatus = ipc::Status::UNDEFINED;
- auto dataBack = [&cv, &retStatus](ipc::Status status, std::shared_ptr&) {
+ auto dataBack = [&cv, &retStatus, &mutex](ipc::Status status, std::shared_ptr&) {
+ std::unique_lock lock(mutex);
retStatus = status;
cv.notify_one();
};
@@ -446,10 +447,11 @@ BOOST_AUTO_TEST_CASE(DisconnectedPeerError)
c.callAsync(1, sentData, dataBack);
// Wait for the response
- BOOST_CHECK(cv.wait_for(lck, std::chrono::seconds(10), [&retStatus]() {
+ std::unique_lock lock(mutex);
+ BOOST_CHECK(cv.wait_for(lock, std::chrono::seconds(10), [&retStatus]() {
return retStatus != ipc::Status::UNDEFINED;
}));
- BOOST_CHECK(retStatus == ipc::Status::PEER_DISCONNECTED);
+ BOOST_CHECK(retStatus == ipc::Status::PEER_DISCONNECTED); //TODO it fails from time to time
}
@@ -515,7 +517,7 @@ BOOST_AUTO_TEST_CASE(AddSignalInRuntime)
s.signal(1, data);
// Wait for the signals to arrive
- std::this_thread::sleep_for(std::chrono::milliseconds(100));
+ std::this_thread::sleep_for(std::chrono::milliseconds(100)); //TODO wait_for
BOOST_CHECK(isHandlerACalled && isHandlerBCalled);
}
@@ -547,7 +549,7 @@ BOOST_AUTO_TEST_CASE(AddSignalOffline)
s.signal(1, data);
// Wait for the signals to arrive
- std::this_thread::sleep_for(std::chrono::milliseconds(100));
+ std::this_thread::sleep_for(std::chrono::milliseconds(100)); //TODO wait_for
BOOST_CHECK(isHandlerACalled && isHandlerBCalled);
}
--
2.7.4
From b550c251c29c804983f019305b6e7574b2f7b585 Mon Sep 17 00:00:00 2001
From: Piotr Bartosiewicz
Date: Mon, 8 Dec 2014 12:52:32 +0100
Subject: [PATCH 02/16] Fix some other threading issues
[Bug/Feature] N/A
[Cause] N/A
[Solution] N/A
[Verification] Build, install, run tests
Change-Id: I89b35811644f424773c007bb7d13d622ba57ac48
---
common/lxc/zone.cpp | 4 +-
common/utils/environment.cpp | 18 +++----
common/utils/execute.cpp | 17 ++++---
common/utils/execute.hpp | 3 ++
tests/unit_tests/server/ut-containers-manager.cpp | 60 +++++++++++------------
tests/unit_tests/utils/scoped-daemon.cpp | 37 ++++++++------
tests/unit_tests/utils/scoped-daemon.hpp | 2 +-
7 files changed, 78 insertions(+), 63 deletions(-)
diff --git a/common/lxc/zone.cpp b/common/lxc/zone.cpp
index c7736f5..f4ad3e4 100644
--- a/common/lxc/zone.cpp
+++ b/common/lxc/zone.cpp
@@ -34,8 +34,8 @@
#include "logger/logger.hpp"
#include "lxc/zone.hpp"
#include "lxc/exception.hpp"
-#ifdef USE_EXEC
#include "utils/execute.hpp"
+#ifdef USE_EXEC
#include "utils/c-array.hpp"
#endif
@@ -320,7 +320,7 @@ bool LxcZone::setRunLevel(int runLevel)
return false;
}
int status;
- if (waitpid(pid, &status, 0) < 0) {
+ if (!utils::waitPid(pid, status)) {
return false;
}
return status == 0;
diff --git a/common/utils/environment.cpp b/common/utils/environment.cpp
index 73c7057..aec70c1 100644
--- a/common/utils/environment.cpp
+++ b/common/utils/environment.cpp
@@ -25,6 +25,7 @@
#include "config.hpp"
#include "utils/environment.hpp"
+#include "utils/execute.hpp"
#include "logger/logger.hpp"
#include
@@ -96,29 +97,28 @@ bool launchAsRoot(const std::function& func)
if (pid == 0) {
if (::setuid(0) < 0) {
LOGW("Failed to become root: " << strerror(errno));
- ::exit(EXIT_FAILURE);
+ _exit(EXIT_FAILURE);
}
try {
if (!func()) {
LOGE("Failed to successfully execute func");
- ::exit(EXIT_FAILURE);
+ _exit(EXIT_FAILURE);
}
} catch (const std::exception& e) {
LOGE("Failed to successfully execute func: " << e.what());
- ::exit(EXIT_FAILURE);
+ _exit(EXIT_FAILURE);
}
- ::exit(EXIT_SUCCESS);
+ _exit(EXIT_SUCCESS);
}
- int result;
- if (::waitpid(pid, &result, 0) < 0) {
- LOGE("waitpid failed: " << strerror(errno));
+ int status;
+ if (!waitPid(pid, status)) {
return false;
}
- if (result != 0) {
- LOGE("Function launched as root failed with result " << result);
+ if (status != 0) {
+ LOGE("Function launched as root exited with status " << status);
return false;
}
diff --git a/common/utils/execute.cpp b/common/utils/execute.cpp
index 2f2e927..6511456 100644
--- a/common/utils/execute.cpp
+++ b/common/utils/execute.cpp
@@ -61,12 +61,7 @@ bool executeAndWait(const char* fname, const char* const* argv, int& status)
execv(fname, const_cast(argv));
_exit(EXIT_FAILURE);
}
- int ret = waitpid(pid, &status, 0);
- if (ret != pid) {
- LOGE("Waitpid failed");
- return false;
- }
- return true;
+ return waitPid(pid, status);
}
bool executeAndWait(const char* fname, const char* const* argv)
@@ -88,5 +83,15 @@ bool executeAndWait(const char* fname, const char* const* argv)
return true;
}
+bool waitPid(pid_t pid, int& status)
+{
+ while (waitpid(pid, &status, 0) == -1) {
+ if (errno != EINTR) {
+ return false;
+ }
+ }
+ return true;
+}
+
} // namespace utils
} // namespace security_containers
diff --git a/common/utils/execute.hpp b/common/utils/execute.hpp
index 1fc7b29..3256ee2 100644
--- a/common/utils/execute.hpp
+++ b/common/utils/execute.hpp
@@ -25,6 +25,7 @@
#ifndef COMMON_UTILS_EXECUTE_HPP
#define COMMON_UTILS_EXECUTE_HPP
+#include
namespace security_containers {
namespace utils {
@@ -33,6 +34,8 @@ bool executeAndWait(const char* fname, const char* const* argv);
bool executeAndWait(const char* fname, const char* const* argv, int& status);
+bool waitPid(pid_t pid, int& status);
+
} // namespace utils
} // namespace security_containers
diff --git a/tests/unit_tests/server/ut-containers-manager.cpp b/tests/unit_tests/server/ut-containers-manager.cpp
index 6d64c83..ac4a898 100644
--- a/tests/unit_tests/server/ut-containers-manager.cpp
+++ b/tests/unit_tests/server/ut-containers-manager.cpp
@@ -460,27 +460,6 @@ std::function expectedMessage(const std::string& me
};
}
-class FileCleanerRAII {
-public:
- FileCleanerRAII(const std::vector& filePathsToClean):
- mFilePathsToClean(filePathsToClean)
- { }
-
- ~FileCleanerRAII()
- {
- namespace fs = boost::filesystem;
- for (const auto& file : mFilePathsToClean) {
- fs::path f(file);
- if (fs::exists(f)) {
- fs::remove(f);
- }
- }
- }
-
-private:
- const std::vector mFilePathsToClean;
-};
-
struct Fixture {
security_containers::utils::ScopedGlibLoop mLoop;
@@ -1041,11 +1020,15 @@ BOOST_AUTO_TEST_CASE(SetActiveContainerTest)
BOOST_AUTO_TEST_CASE(CreateDestroyContainerTest)
{
- const std::string newContainerId = "test1234";
+ const std::string container1 = "test1";
+ const std::string container2 = "test2";
+ const std::string container3 = "test3";
ContainersManager cm(EMPTY_DBUS_CONFIG_PATH);
cm.startAll();
+ BOOST_CHECK_EQUAL(cm.getRunningForegroundContainerId(), "");
+
Latch callDone;
auto resultCallback = [&]() {
callDone.set();
@@ -1053,19 +1036,36 @@ BOOST_AUTO_TEST_CASE(CreateDestroyContainerTest)
DbusAccessory dbus(DbusAccessory::HOST_ID);
- // create new container
- dbus.callAsyncMethodCreateContainer(newContainerId, resultCallback);
+ // create container1
+ dbus.callAsyncMethodCreateContainer(container1, resultCallback);
BOOST_REQUIRE(callDone.wait(EVENT_TIMEOUT));
- // focus new container
- cm.focus(newContainerId);
- BOOST_CHECK(cm.getRunningForegroundContainerId() == newContainerId);
+ BOOST_CHECK_EQUAL(cm.getRunningForegroundContainerId(), container1);
- // destroy container
- dbus.callAsyncMethodDestroyContainer(newContainerId, resultCallback);
+ // create container2
+ dbus.callAsyncMethodCreateContainer(container2, resultCallback);
BOOST_REQUIRE(callDone.wait(EVENT_TIMEOUT));
+ BOOST_CHECK_EQUAL(cm.getRunningForegroundContainerId(), container2); //TODO is this valid?
- BOOST_CHECK(cm.getRunningForegroundContainerId() == "");
+ // create container3
+ dbus.callAsyncMethodCreateContainer(container3, resultCallback);
+ BOOST_REQUIRE(callDone.wait(EVENT_TIMEOUT));
+ BOOST_CHECK_EQUAL(cm.getRunningForegroundContainerId(), container3);
+
+ // destroy container2
+ dbus.callAsyncMethodDestroyContainer(container2, resultCallback);
+ BOOST_REQUIRE(callDone.wait(EVENT_TIMEOUT));
+ BOOST_CHECK_EQUAL(cm.getRunningForegroundContainerId(), container3);
+
+ // destroy container3
+ dbus.callAsyncMethodDestroyContainer(container3, resultCallback);
+ BOOST_REQUIRE(callDone.wait(EVENT_TIMEOUT));
+ //BOOST_CHECK_EQUAL(cm.getRunningForegroundContainerId(), container1);//TODO fix it
+
+ // destroy container1
+ dbus.callAsyncMethodDestroyContainer(container1, resultCallback);
+ BOOST_REQUIRE(callDone.wait(EVENT_TIMEOUT));
+ BOOST_CHECK_EQUAL(cm.getRunningForegroundContainerId(), "");
}
BOOST_AUTO_TEST_CASE(DeclareFile)
diff --git a/tests/unit_tests/utils/scoped-daemon.cpp b/tests/unit_tests/utils/scoped-daemon.cpp
index 21fea83..db85375 100644
--- a/tests/unit_tests/utils/scoped-daemon.cpp
+++ b/tests/unit_tests/utils/scoped-daemon.cpp
@@ -25,6 +25,7 @@
#include "config.hpp"
#include "utils/scoped-daemon.hpp"
+#include "utils/execute.hpp"
#include "logger/logger.hpp"
@@ -69,17 +70,17 @@ namespace {
volatile pid_t daemonPid = -1;// available in launcher process only;
-void startDaemon(const char* path, const char* const argv[])
+bool startDaemon(const char* path, const char* const argv[])
{
execv(path, const_cast(argv));
perror("exec failed");
+ return false;
}
-void waitForDaemon()
+bool waitForDaemon()
{
- if (waitpid(daemonPid, NULL, 0) == -1) {
- perror("wait for daemon failed");
- }
+ int status;
+ return waitPid(daemonPid, status);
}
void launcherSignalHandler(int sig)
@@ -108,21 +109,22 @@ void cleanupProcess()
signal(SIGHUP, SIG_DFL);
}
-void startByLauncher(const char* path, const char* const argv[])
+bool startByLauncher(const char* path, const char* const argv[])
{
cleanupProcess();
daemonPid = fork();
if (daemonPid == -1) {
perror("fork failed");
- return;
+ return false;
}
if (daemonPid == 0) {
- startDaemon(path, argv);
- _exit(1);
+ if (!startDaemon(path, argv)) {
+ return false;
+ }
}
registerLauncherSignalHandler();
registerParentDiedNotification();
- waitForDaemon();
+ return waitForDaemon();
}
} // namespace
@@ -147,12 +149,13 @@ void ScopedDaemon::start(const char* path, const char* const argv[], const bool
throw std::runtime_error("fork failed");
}
if (mPid == 0) {
+ bool ret;
if (useLauncher) {
- startByLauncher(path, argv);
+ ret = startByLauncher(path, argv);
} else {
- startDaemon(path, argv);
+ ret = startDaemon(path, argv);
}
- _exit(0);
+ _exit(ret ? EXIT_SUCCESS : EXIT_FAILURE);
}
}
@@ -164,8 +167,12 @@ void ScopedDaemon::stop()
if (kill(mPid, SIGTERM) == -1) {
LOGE("kill failed");
}
- if (waitpid(mPid, NULL, 0) == -1) {
- LOGE("waitpid failed");
+ int status;
+ if (!waitPid(mPid, status)) {
+ throw std::runtime_error("waitpid failed");
+ }
+ if (status != EXIT_SUCCESS) {
+ LOGW("process exit with status " << status);
}
mPid = -1;
}
diff --git a/tests/unit_tests/utils/scoped-daemon.hpp b/tests/unit_tests/utils/scoped-daemon.hpp
index 89e630c..b3eab29 100644
--- a/tests/unit_tests/utils/scoped-daemon.hpp
+++ b/tests/unit_tests/utils/scoped-daemon.hpp
@@ -50,7 +50,7 @@ public:
* @param argv arguments passed to the daemon
* @param useLauncher use additional launcher process
*/
- void start(const char* path, const char* const argv[], const bool useLauncher = true);
+ void start(const char* path, const char* const argv[], const bool useLauncher = false);
/**
* Stops a daemon by sending SIGTERM and waits for a process.
--
2.7.4
From 316605cca8c654878acfc67deb5a62103be2822b Mon Sep 17 00:00:00 2001
From: Jan Olszak
Date: Mon, 8 Dec 2014 13:41:39 +0100
Subject: [PATCH 03/16] IPC: Replace PeerID with peer's file descriptor
[Bug/Feature] N/A
[Cause] N/A
[Solution] N/A
[Verification] Build, install, run tests
Change-Id: I30203990d9c9c3a58515d2fe09b074072122c156
---
common/ipc/client.cpp | 2 +-
common/ipc/client.hpp | 6 +-
common/ipc/internals/call-queue.hpp | 14 ++--
common/ipc/internals/processor.cpp | 134 ++++++++++++++++--------------------
common/ipc/internals/processor.hpp | 100 +++++++++++++--------------
common/ipc/service.hpp | 18 ++---
common/ipc/types.hpp | 14 ++--
tests/unit_tests/ipc/ut-ipc.cpp | 54 +++++++--------
8 files changed, 165 insertions(+), 177 deletions(-)
diff --git a/common/ipc/client.cpp b/common/ipc/client.cpp
index c806e7b..8b0e458 100644
--- a/common/ipc/client.cpp
+++ b/common/ipc/client.cpp
@@ -55,7 +55,7 @@ void Client::start()
// Initialize the connection with the server
LOGD("Connecting to " + mSocketPath);
auto socketPtr = std::make_shared(Socket::connectSocket(mSocketPath));
- mServiceID = mProcessor.addPeer(socketPtr);
+ mServiceFD = mProcessor.addPeer(socketPtr);
// Start listening
mProcessor.start();
diff --git a/common/ipc/client.hpp b/common/ipc/client.hpp
index 3178474..6f8b049 100644
--- a/common/ipc/client.hpp
+++ b/common/ipc/client.hpp
@@ -156,7 +156,7 @@ public:
const std::shared_ptr& data);
private:
- PeerID mServiceID;
+ FileDescriptor mServiceFD;
Processor mProcessor;
std::string mSocketPath;
};
@@ -185,7 +185,7 @@ std::shared_ptr Client::callSync(const MethodID methodID,
unsigned int timeoutMS)
{
LOGD("Sync calling method: " << methodID);
- return mProcessor.callSync(methodID, mServiceID, data, timeoutMS);
+ return mProcessor.callSync(methodID, mServiceFD, data, timeoutMS);
}
template
@@ -196,7 +196,7 @@ void Client::callAsync(const MethodID methodID,
LOGD("Async calling method: " << methodID);
mProcessor.callAsync(methodID,
- mServiceID,
+ mServiceFD,
data,
resultCallback);
LOGD("Async called method: " << methodID);
diff --git a/common/ipc/internals/call-queue.hpp b/common/ipc/internals/call-queue.hpp
index 4d1ecf6..7911d7a 100644
--- a/common/ipc/internals/call-queue.hpp
+++ b/common/ipc/internals/call-queue.hpp
@@ -48,7 +48,7 @@ public:
Call() = default;
Call(Call&&) = default;
- PeerID peerID;
+ FileDescriptor peerFD;
MethodID methodID;
MessageID messageID;
std::shared_ptr data;
@@ -66,14 +66,14 @@ public:
template
MessageID push(const MethodID methodID,
- const PeerID peerID,
+ const FileDescriptor peerFD,
const std::shared_ptr& data,
const typename ResultHandler::type& process);
template
MessageID push(const MethodID methodID,
- const PeerID peerID,
+ const FileDescriptor peerFD,
const std::shared_ptr& data);
Call pop();
@@ -90,13 +90,13 @@ private:
template
MessageID CallQueue::push(const MethodID methodID,
- const PeerID peerID,
+ const FileDescriptor peerFD,
const std::shared_ptr& data,
const typename ResultHandler::type& process)
{
Call call;
call.methodID = methodID;
- call.peerID = peerID;
+ call.peerFD = peerFD;
call.data = data;
MessageID messageID = getNextMessageID();
@@ -124,12 +124,12 @@ MessageID CallQueue::push(const MethodID methodID,
template
MessageID CallQueue::push(const MethodID methodID,
- const PeerID peerID,
+ const FileDescriptor peerFD,
const std::shared_ptr& data)
{
Call call;
call.methodID = methodID;
- call.peerID = peerID;
+ call.peerFD = peerFD;
call.data = data;
MessageID messageID = getNextMessageID();
diff --git a/common/ipc/internals/processor.cpp b/common/ipc/internals/processor.cpp
index b134b08..7ac378e 100644
--- a/common/ipc/internals/processor.cpp
+++ b/common/ipc/internals/processor.cpp
@@ -55,8 +55,7 @@ Processor::Processor(const PeerCallback& newPeerCallback,
const unsigned int maxNumberOfPeers)
: mNewPeerCallback(newPeerCallback),
mRemovedPeerCallback(removedPeerCallback),
- mMaxNumberOfPeers(maxNumberOfPeers),
- mPeerIDCounter(0)
+ mMaxNumberOfPeers(maxNumberOfPeers)
{
LOGT("Creating Processor");
using namespace std::placeholders;
@@ -120,37 +119,37 @@ void Processor::removeMethod(const MethodID methodID)
mMethodsCallbacks.erase(methodID);
}
-PeerID Processor::addPeer(const std::shared_ptr& socketPtr)
+FileDescriptor Processor::addPeer(const std::shared_ptr& socketPtr)
{
LOGT("Adding socket");
- PeerID peerID;
+ FileDescriptor peerFD;
{
Lock lock(mSocketsMutex);
- peerID = getNextPeerID();
- SocketInfo socketInfo(peerID, std::move(socketPtr));
+ peerFD = socketPtr->getFD();
+ SocketInfo socketInfo(peerFD, std::move(socketPtr));
mNewSockets.push(std::move(socketInfo));
}
- LOGI("New peer added. Id: " << peerID);
+ LOGI("New peer added. Id: " << peerFD);
mEventQueue.send(Event::ADD_PEER);
- return peerID;
+ return peerFD;
}
-void Processor::removePeer(const PeerID peerID)
+void Processor::removePeer(const FileDescriptor peerFD)
{
std::shared_ptr conditionPtr(new std::condition_variable());
{
Lock lock(mSocketsMutex);
- RemovePeerRequest request(peerID, conditionPtr);
+ RemovePeerRequest request(peerFD, conditionPtr);
mPeersToDelete.push(std::move(request));
}
mEventQueue.send(Event::REMOVE_PEER);
- auto isPeerDeleted = [&peerID, this]()->bool {
+ auto isPeerDeleted = [&peerFD, this]()->bool {
Lock lock(mSocketsMutex);
- return mSockets.count(peerID) == 0;
+ return mSockets.count(peerFD) == 0;
};
std::mutex mutex;
@@ -158,16 +157,16 @@ void Processor::removePeer(const PeerID peerID)
conditionPtr->wait(lock, isPeerDeleted);
}
-void Processor::removePeerInternal(const PeerID peerID, Status status)
+void Processor::removePeerInternal(const FileDescriptor peerFD, Status status)
{
- LOGW("Removing peer. ID: " << peerID);
+ LOGW("Removing peer. ID: " << peerFD);
{
Lock lock(mSocketsMutex);
- mSockets.erase(peerID);
+ mSockets.erase(peerFD);
// Remove from signal addressees
for (auto it = mSignalsPeers.begin(); it != mSignalsPeers.end();) {
- it->second.remove(peerID);
+ it->second.remove(peerFD);
if (it->second.empty()) {
it = mSignalsPeers.erase(it);
} else {
@@ -182,7 +181,7 @@ void Processor::removePeerInternal(const PeerID peerID, Status status)
std::shared_ptr data;
for (auto it = mReturnCallbacks.begin(); it != mReturnCallbacks.end();) {
- if (it->second.peerID == peerID) {
+ if (it->second.peerFD == peerFD) {
IGNORE_EXCEPTIONS(it->second.process(status, data));
it = mReturnCallbacks.erase(it);
} else {
@@ -196,7 +195,7 @@ void Processor::removePeerInternal(const PeerID peerID, Status status)
Lock lock(mCallbacksMutex);
if (mRemovedPeerCallback) {
// Notify about the deletion
- mRemovedPeerCallback(peerID);
+ mRemovedPeerCallback(peerFD);
}
}
@@ -264,22 +263,21 @@ void Processor::run()
bool Processor::handleLostConnections()
{
- std::list peersToRemove;
+ std::vector peersToRemove;
{
Lock lock(mSocketsMutex);
- auto socketIt = mSockets.begin();
- for (unsigned int i = 1; i < mFDs.size(); ++i, ++socketIt) {
+ for (unsigned int i = 1; i < mFDs.size(); ++i) {
if (mFDs[i].revents & POLLHUP) {
- LOGI("Lost connection to peer: " << socketIt->first);
+ LOGI("Lost connection to peer: " << mFDs[i].fd);
mFDs[i].revents &= ~(POLLHUP);
- peersToRemove.push_back(socketIt->first);
+ peersToRemove.push_back(mFDs[i].fd);
}
}
}
- for (const PeerID peerID : peersToRemove) {
- removePeerInternal(peerID, Status::PEER_DISCONNECTED);
+ for (const FileDescriptor peerFD : peersToRemove) {
+ removePeerInternal(peerFD, Status::PEER_DISCONNECTED);
}
return !peersToRemove.empty();
@@ -287,27 +285,26 @@ bool Processor::handleLostConnections()
bool Processor::handleInputs()
{
- std::list> > peersWithInput;
+ std::vector> socketsWithInput;
{
Lock lock(mSocketsMutex);
- auto socketIt = mSockets.begin();
- for (unsigned int i = 1; i < mFDs.size(); ++i, ++socketIt) {
+ for (unsigned int i = 1; i < mFDs.size(); ++i) {
if (mFDs[i].revents & POLLIN) {
mFDs[i].revents &= ~(POLLIN);
- peersWithInput.push_back(*socketIt);
+ socketsWithInput.push_back(mSockets[mFDs[i].fd]);
}
}
}
bool pollChanged = false;
// Handle input outside the critical section
- for (const auto& peer : peersWithInput) {
- pollChanged = pollChanged || handleInput(peer.first, *peer.second);
+ for (const auto& socketPtr : socketsWithInput) {
+ pollChanged = pollChanged || handleInput(*socketPtr);
}
return pollChanged;
}
-bool Processor::handleInput(const PeerID peerID, const Socket& socket)
+bool Processor::handleInput(const Socket& socket)
{
LOGT("Handle incoming data");
MethodID methodID;
@@ -318,7 +315,7 @@ bool Processor::handleInput(const PeerID peerID, const Socket& socket)
socket.read(&messageID, sizeof(messageID));
if (methodID == RETURN_METHOD_ID) {
- return onReturnValue(peerID, socket, messageID);
+ return onReturnValue(socket, messageID);
} else {
Lock lock(mCallsMutex);
@@ -326,19 +323,19 @@ bool Processor::handleInput(const PeerID peerID, const Socket& socket)
// Method
std::shared_ptr methodCallbacks = mMethodsCallbacks.at(methodID);
lock.unlock();
- return onRemoteCall(peerID, socket, methodID, messageID, methodCallbacks);
+ return onRemoteCall(socket, methodID, messageID, methodCallbacks);
} else if (mSignalsCallbacks.count(methodID)) {
// Signal
std::shared_ptr signalCallbacks = mSignalsCallbacks.at(methodID);
lock.unlock();
- return onRemoteSignal(peerID, socket, methodID, messageID, signalCallbacks);
+ return onRemoteSignal(socket, methodID, messageID, signalCallbacks);
} else {
// Nothing
lock.unlock();
LOGW("No method or signal callback for methodID: " << methodID);
- removePeerInternal(peerID, Status::NAUGHTY_PEER);
+ removePeerInternal(socket.getFD(), Status::NAUGHTY_PEER);
return true;
}
}
@@ -347,20 +344,19 @@ bool Processor::handleInput(const PeerID peerID, const Socket& socket)
return false;
}
-std::shared_ptr Processor::onNewSignals(const PeerID peerID,
+std::shared_ptr Processor::onNewSignals(const FileDescriptor peerFD,
std::shared_ptr& data)
{
- LOGD("New signals for peer: " << peerID);
+ LOGD("New signals for peer: " << peerFD);
Lock lock(mSocketsMutex);
for (MethodID methodID : data->ids) {
- mSignalsPeers[methodID].push_back(peerID);
+ mSignalsPeers[methodID].push_back(peerFD);
}
return std::make_shared();
}
-bool Processor::onReturnValue(const PeerID peerID,
- const Socket& socket,
+bool Processor::onReturnValue(const Socket& socket,
const MessageID messageID)
{
LOGI("Return value for messageID: " << messageID);
@@ -372,7 +368,7 @@ bool Processor::onReturnValue(const PeerID peerID,
mReturnCallbacks.erase(messageID);
} catch (const std::out_of_range&) {
LOGW("No return callback for messageID: " << messageID);
- removePeerInternal(peerID, Status::NAUGHTY_PEER);
+ removePeerInternal(socket.getFD(), Status::NAUGHTY_PEER);
return true;
}
@@ -383,7 +379,7 @@ bool Processor::onReturnValue(const PeerID peerID,
} catch (const std::exception& e) {
LOGE("Exception during parsing: " << e.what());
IGNORE_EXCEPTIONS(returnCallbacks.process(Status::PARSING_ERROR, data));
- removePeerInternal(peerID, Status::PARSING_ERROR);
+ removePeerInternal(socket.getFD(), Status::PARSING_ERROR);
return true;
}
@@ -393,8 +389,7 @@ bool Processor::onReturnValue(const PeerID peerID,
return false;
}
-bool Processor::onRemoteSignal(const PeerID peerID,
- const Socket& socket,
+bool Processor::onRemoteSignal(const Socket& socket,
const MethodID methodID,
const MessageID messageID,
std::shared_ptr signalCallbacks)
@@ -407,24 +402,23 @@ bool Processor::onRemoteSignal(const PeerID peerID,
data = signalCallbacks->parse(socket.getFD());
} catch (const std::exception& e) {
LOGE("Exception during parsing: " << e.what());
- removePeerInternal(peerID, Status::PARSING_ERROR);
+ removePeerInternal(socket.getFD(), Status::PARSING_ERROR);
return true;
}
LOGT("Signal callback for methodID: " << methodID << "; messageID: " << messageID);
try {
- signalCallbacks->signal(peerID, data);
+ signalCallbacks->signal(socket.getFD(), data);
} catch (const std::exception& e) {
LOGE("Exception in method handler: " << e.what());
- removePeerInternal(peerID, Status::NAUGHTY_PEER);
+ removePeerInternal(socket.getFD(), Status::NAUGHTY_PEER);
return true;
}
return false;
}
-bool Processor::onRemoteCall(const PeerID peerID,
- const Socket& socket,
+bool Processor::onRemoteCall(const Socket& socket,
const MethodID methodID,
const MessageID messageID,
std::shared_ptr methodCallbacks)
@@ -437,17 +431,17 @@ bool Processor::onRemoteCall(const PeerID peerID,
data = methodCallbacks->parse(socket.getFD());
} catch (const std::exception& e) {
LOGE("Exception during parsing: " << e.what());
- removePeerInternal(peerID, Status::PARSING_ERROR);
+ removePeerInternal(socket.getFD(), Status::PARSING_ERROR);
return true;
}
LOGT("Process callback for methodID: " << methodID << "; messageID: " << messageID);
std::shared_ptr returnData;
try {
- returnData = methodCallbacks->method(peerID, data);
+ returnData = methodCallbacks->method(socket.getFD(), data);
} catch (const std::exception& e) {
LOGE("Exception in method handler: " << e.what());
- removePeerInternal(peerID, Status::NAUGHTY_PEER);
+ removePeerInternal(socket.getFD(), Status::NAUGHTY_PEER);
return true;
}
@@ -460,7 +454,7 @@ bool Processor::onRemoteCall(const PeerID peerID,
methodCallbacks->serialize(socket.getFD(), returnData);
} catch (const std::exception& e) {
LOGE("Exception during serialization: " << e.what());
- removePeerInternal(peerID, Status::SERIALIZATION_ERROR);
+ removePeerInternal(socket.getFD(), Status::SERIALIZATION_ERROR);
return true;
}
@@ -512,21 +506,21 @@ bool Processor::onNewPeer()
mNewSockets.pop();
if (mSockets.size() > mMaxNumberOfPeers) {
- LOGE("There are too many peers. I don't accept the connection with " << socketInfo.peerID);
+ LOGE("There are too many peers. I don't accept the connection with " << socketInfo.peerFD);
return false;
}
- if (mSockets.count(socketInfo.peerID) != 0) {
- LOGE("There already was a socket for peerID: " << socketInfo.peerID);
+ if (mSockets.count(socketInfo.peerFD) != 0) {
+ LOGE("There already was a socket for peerFD: " << socketInfo.peerFD);
return false;
}
- mSockets[socketInfo.peerID] = std::move(socketInfo.socketPtr);
+ mSockets[socketInfo.peerFD] = std::move(socketInfo.socketPtr);
}
// Broadcast the new signal to peers
LOGW("Sending handled signals");
- std::list peersIDs;
+ std::list peersIDs;
{
Lock lock(mSocketsMutex);
for (const auto kv : mSockets) {
@@ -543,9 +537,9 @@ bool Processor::onNewPeer()
}
auto data = std::make_shared(ids);
- for (const PeerID peerID : peersIDs) {
+ for (const FileDescriptor peerFD : peersIDs) {
callInternal(REGISTER_SIGNAL_METHOD_ID,
- peerID,
+ peerFD,
data,
discardResultHandler);
}
@@ -558,7 +552,7 @@ bool Processor::onNewPeer()
Lock lock(mCallbacksMutex);
if (mNewPeerCallback) {
// Notify about the new user.
- mNewPeerCallback(socketInfo.peerID);
+ mNewPeerCallback(socketInfo.peerFD);
}
}
return true;
@@ -573,17 +567,11 @@ bool Processor::onRemovePeer()
mPeersToDelete.pop();
}
- removePeerInternal(request.peerID, Status::REMOVED_PEER);
+ removePeerInternal(request.peerFD, Status::REMOVED_PEER);
request.conditionPtr->notify_all();
return true;
}
-PeerID Processor::getNextPeerID()
-{
- // TODO: This method of generating UIDs is buggy. To be changed.
- return ++mPeerIDCounter;
-}
-
CallQueue::Call Processor::getCall()
{
Lock lock(mCallsMutex);
@@ -599,9 +587,9 @@ bool Processor::onCall()
try {
// Get the peer's socket
Lock lock(mSocketsMutex);
- socketPtr = mSockets.at(call.peerID);
+ socketPtr = mSockets.at(call.peerFD);
} catch (const std::out_of_range&) {
- LOGE("Peer disconnected. No socket with a peerID: " << call.peerID);
+ LOGE("Peer disconnected. No socket with a peerFD: " << call.peerFD);
IGNORE_EXCEPTIONS(call.process(Status::PEER_DISCONNECTED, call.data));
return false;
}
@@ -612,7 +600,7 @@ bool Processor::onCall()
if (mReturnCallbacks.count(call.messageID) != 0) {
LOGE("There already was a return callback for messageID: " << call.messageID);
}
- mReturnCallbacks[call.messageID] = std::move(ReturnCallbacks(call.peerID,
+ mReturnCallbacks[call.messageID] = std::move(ReturnCallbacks(call.peerFD,
std::move(call.parse),
std::move(call.process)));
}
@@ -634,7 +622,7 @@ bool Processor::onCall()
mReturnCallbacks.erase(call.messageID);
}
- removePeerInternal(call.peerID, Status::SERIALIZATION_ERROR);
+ removePeerInternal(call.peerFD, Status::SERIALIZATION_ERROR);
return true;
}
diff --git a/common/ipc/internals/processor.hpp b/common/ipc/internals/processor.hpp
index da2a5b9..476e662 100644
--- a/common/ipc/internals/processor.hpp
+++ b/common/ipc/internals/processor.hpp
@@ -35,8 +35,6 @@
#include "logger/logger.hpp"
#include
-
-#include
#include
#include
#include
@@ -76,6 +74,9 @@ const unsigned int DEFAULT_METHOD_TIMEOUT = 1000;
* - helper function for removing from unordered map
* - new way to generate UIDs
* - callbacks for serialization/parsing
+* - store Sockets in a vector, maybe SocketStore?
+*
+*
*/
class Processor {
public:
@@ -141,16 +142,16 @@ public:
* Calls the newPeerCallback.
*
* @param socketPtr pointer to the new socket
- * @return peerID of the new socket
+ * @return peerFD of the new socket
*/
- PeerID addPeer(const std::shared_ptr& socketPtr);
+ FileDescriptor addPeer(const std::shared_ptr& socketPtr);
/**
* Request removing peer and wait
*
- * @param peerID id of the peer
+ * @param peerFD id of the peer
*/
- void removePeer(const PeerID peerID);
+ void removePeer(const FileDescriptor peerFD);
/**
* Saves the callbacks connected to the method id.
@@ -197,7 +198,7 @@ public:
* Synchronous method call.
*
* @param methodID API dependent id of the method
- * @param peerID id of the peer
+ * @param peerFD id of the peer
* @param data data to sent
* @param timeoutMS how long to wait for the return value before throw
* @tparam SentDataType data type to send
@@ -205,7 +206,7 @@ public:
*/
template
std::shared_ptr callSync(const MethodID methodID,
- const PeerID peerID,
+ const FileDescriptor peerFD,
const std::shared_ptr& data,
unsigned int timeoutMS = 500);
@@ -213,7 +214,7 @@ public:
* Asynchronous method call
*
* @param methodID API dependent id of the method
- * @param peerID id of the peer
+ * @param peerFD id of the peer
* @param data data to sent
* @param process callback processing the return data
* @tparam SentDataType data type to send
@@ -221,7 +222,7 @@ public:
*/
template
MessageID callAsync(const MethodID methodID,
- const PeerID peerID,
+ const FileDescriptor peerFD,
const std::shared_ptr& data,
const typename ResultHandler::type& process);
@@ -292,10 +293,10 @@ private:
ReturnCallbacks(ReturnCallbacks&&) = default;
ReturnCallbacks& operator=(ReturnCallbacks &&) = default;
- ReturnCallbacks(PeerID peerID, const ParseCallback& parse, const ResultHandler::type& process)
- : peerID(peerID), parse(parse), process(process) {}
+ ReturnCallbacks(FileDescriptor peerFD, const ParseCallback& parse, const ResultHandler::type& process)
+ : peerFD(peerFD), parse(parse), process(process) {}
- PeerID peerID;
+ FileDescriptor peerFD;
ParseCallback parse;
ResultHandler::type process;
};
@@ -307,10 +308,10 @@ private:
SocketInfo(SocketInfo&&) = default;
SocketInfo& operator=(SocketInfo &&) = default;
- SocketInfo(const PeerID peerID, const std::shared_ptr& socketPtr)
- : peerID(peerID), socketPtr(socketPtr) {}
+ SocketInfo(const FileDescriptor peerFD, const std::shared_ptr& socketPtr)
+ : peerFD(peerFD), socketPtr(socketPtr) {}
- PeerID peerID;
+ FileDescriptor peerFD;
std::shared_ptr socketPtr;
};
@@ -321,11 +322,11 @@ private:
RemovePeerRequest(RemovePeerRequest&&) = default;
RemovePeerRequest& operator=(RemovePeerRequest &&) = default;
- RemovePeerRequest(const PeerID peerID,
+ RemovePeerRequest(const FileDescriptor peerFD,
const std::shared_ptr& conditionPtr)
- : peerID(peerID), conditionPtr(conditionPtr) {}
+ : peerFD(peerFD), conditionPtr(conditionPtr) {}
- PeerID peerID;
+ FileDescriptor peerFD;
std::shared_ptr conditionPtr;
};
@@ -345,12 +346,13 @@ private:
CallQueue mCalls;
std::unordered_map> mMethodsCallbacks;
std::unordered_map> mSignalsCallbacks;
- std::unordered_map> mSignalsPeers;
+ std::unordered_map> mSignalsPeers;
// Mutex for changing mSockets map.
// Shouldn't be locked on any read/write, that could block. Just copy the ptr.
std::mutex mSocketsMutex;
- std::unordered_map > mSockets;
+ std::unordered_map > mSockets;
+ std::vector mFDs;
std::queue mNewSockets;
std::queue mPeersToDelete;
@@ -366,9 +368,6 @@ private:
unsigned int mMaxNumberOfPeers;
std::thread mThread;
- std::vector mFDs;
-
- std::atomic mPeerIDCounter;
template
void addMethodHandlerInternal(const MethodID methodID,
@@ -376,7 +375,7 @@ private:
template
MessageID callInternal(const MethodID methodID,
- const PeerID peerID,
+ const FileDescriptor peerFD,
const std::shared_ptr& data,
const typename ResultHandler::type& process);
@@ -390,26 +389,23 @@ private:
bool onRemovePeer();
bool handleLostConnections();
bool handleInputs();
- bool handleInput(const PeerID peerID, const Socket& socket);
- bool onReturnValue(const PeerID peerID,
- const Socket& socket,
+ bool handleInput(const Socket& socket);
+ bool onReturnValue(const Socket& socket,
const MessageID messageID);
- bool onRemoteCall(const PeerID peerID,
- const Socket& socket,
+ bool onRemoteCall(const Socket& socket,
const MethodID methodID,
const MessageID messageID,
std::shared_ptr methodCallbacks);
- bool onRemoteSignal(const PeerID peerID,
- const Socket& socket,
+ bool onRemoteSignal(const Socket& socket,
const MethodID methodID,
const MessageID messageID,
std::shared_ptr signalCallbacks);
void resetPolling();
- PeerID getNextPeerID();
+ FileDescriptor getNextFileDescriptor();
CallQueue::Call getCall();
- void removePeerInternal(const PeerID peerID, Status status);
+ void removePeerInternal(const FileDescriptor peerFD, Status status);
- std::shared_ptr onNewSignals(const PeerID peerID,
+ std::shared_ptr onNewSignals(const FileDescriptor peerFD,
std::shared_ptr& data);
@@ -432,9 +428,9 @@ void Processor::addMethodHandlerInternal(const MethodID methodID,
config::saveToFD(fd, *std::static_pointer_cast(data));
};
- methodCall.method = [method](const PeerID peerID, std::shared_ptr& data)->std::shared_ptr {
+ methodCall.method = [method](const FileDescriptor peerFD, std::shared_ptr& data)->std::shared_ptr {
std::shared_ptr tmpData = std::static_pointer_cast(data);
- return method(peerID, tmpData);
+ return method(peerFD, tmpData);
};
{
@@ -488,9 +484,9 @@ void Processor::addSignalHandler(const MethodID methodID,
return data;
};
- signalCall.signal = [handler](const PeerID peerID, std::shared_ptr& data) {
+ signalCall.signal = [handler](const FileDescriptor peerFD, std::shared_ptr& data) {
std::shared_ptr tmpData = std::static_pointer_cast(data);
- handler(peerID, tmpData);
+ handler(peerFD, tmpData);
};
{
@@ -503,7 +499,7 @@ void Processor::addSignalHandler(const MethodID methodID,
std::vector ids {methodID};
auto data = std::make_shared(ids);
- std::list peersIDs;
+ std::list peersIDs;
{
Lock lock(mSocketsMutex);
for (const auto kv : mSockets) {
@@ -511,9 +507,9 @@ void Processor::addSignalHandler(const MethodID methodID,
}
}
- for (const PeerID peerID : peersIDs) {
+ for (const FileDescriptor peerFD : peersIDs) {
callSync(REGISTER_SIGNAL_METHOD_ID,
- peerID,
+ peerFD,
data,
DEFAULT_METHOD_TIMEOUT);
}
@@ -522,12 +518,12 @@ void Processor::addSignalHandler(const MethodID methodID,
template
MessageID Processor::callInternal(const MethodID methodID,
- const PeerID peerID,
+ const FileDescriptor peerFD,
const std::shared_ptr& data,
const typename ResultHandler::type& process)
{
Lock lock(mCallsMutex);
- MessageID messageID = mCalls.push(methodID, peerID, data, process);
+ MessageID messageID = mCalls.push(methodID, peerFD, data, process);
mEventQueue.send(Event::CALL);
return messageID;
@@ -535,7 +531,7 @@ MessageID Processor::callInternal(const MethodID methodID,
template
MessageID Processor::callAsync(const MethodID methodID,
- const PeerID peerID,
+ const FileDescriptor peerFD,
const std::shared_ptr& data,
const typename ResultHandler::type& process)
{
@@ -544,13 +540,13 @@ MessageID Processor::callAsync(const MethodID methodID,
throw IPCException("The Processor thread is not started. Can't send any data.");
}
- return callInternal(methodID, peerID, data, process);
+ return callInternal(methodID, peerFD, data, process);
}
template
std::shared_ptr Processor::callSync(const MethodID methodID,
- const PeerID peerID,
+ const FileDescriptor peerFD,
const std::shared_ptr& data,
unsigned int timeoutMS)
{
@@ -568,7 +564,7 @@ std::shared_ptr Processor::callSync(const MethodID methodID,
};
MessageID messageID = callAsync(methodID,
- peerID,
+ peerFD,
data,
process);
@@ -586,7 +582,7 @@ std::shared_ptr Processor::callSync(const MethodID methodID,
}
}
if (isTimeout) {
- removePeer(peerID);
+ removePeer(peerFD);
LOGE("Function call timeout; methodID: " << methodID);
throw IPCTimeoutException("Function call timeout; methodID: " + std::to_string(methodID));
} else {
@@ -609,15 +605,15 @@ void Processor::signal(const MethodID methodID,
throw IPCException("The Processor thread is not started. Can't send any data.");
}
- std::list peersIDs;
+ std::list peersIDs;
{
Lock lock(mSocketsMutex);
peersIDs = mSignalsPeers[methodID];
}
- for (const PeerID peerID : peersIDs) {
+ for (const FileDescriptor peerFD : peersIDs) {
Lock lock(mCallsMutex);
- mCalls.push(methodID, peerID, data);
+ mCalls.push(methodID, peerFD, data);
mEventQueue.send(Event::CALL);
}
}
diff --git a/common/ipc/service.hpp b/common/ipc/service.hpp
index ac22eb2..317311d 100644
--- a/common/ipc/service.hpp
+++ b/common/ipc/service.hpp
@@ -129,7 +129,7 @@ public:
*/
template
std::shared_ptr callSync(const MethodID methodID,
- const PeerID peerID,
+ const FileDescriptor peerFD,
const std::shared_ptr& data,
unsigned int timeoutMS = 500);
@@ -144,7 +144,7 @@ public:
*/
template
void callAsync(const MethodID methodID,
- const PeerID peerID,
+ const FileDescriptor peerFD,
const std::shared_ptr& data,
const typename ResultHandler::type& resultCallback);
@@ -187,27 +187,27 @@ void Service::addSignalHandler(const MethodID methodID,
template
std::shared_ptr Service::callSync(const MethodID methodID,
- const PeerID peerID,
+ const FileDescriptor peerFD,
const std::shared_ptr& data,
unsigned int timeoutMS)
{
- LOGD("Sync calling method: " << methodID << " for user: " << peerID);
- return mProcessor.callSync(methodID, peerID, data, timeoutMS);
+ LOGD("Sync calling method: " << methodID << " for user: " << peerFD);
+ return mProcessor.callSync(methodID, peerFD, data, timeoutMS);
}
template
void Service::callAsync(const MethodID methodID,
- const PeerID peerID,
+ const FileDescriptor peerFD,
const std::shared_ptr& data,
const typename ResultHandler::type& resultCallback)
{
- LOGD("Async calling method: " << methodID << " for user: " << peerID);
+ LOGD("Async calling method: " << methodID << " for user: " << peerFD);
mProcessor.callAsync(methodID,
- peerID,
+ peerFD,
data,
resultCallback);
- LOGD("Async called method: " << methodID << "for user: " << peerID);
+ LOGD("Async called method: " << methodID << "for user: " << peerFD);
}
template
diff --git a/common/ipc/types.hpp b/common/ipc/types.hpp
index 6588fb0..5fe9188 100644
--- a/common/ipc/types.hpp
+++ b/common/ipc/types.hpp
@@ -34,11 +34,12 @@
namespace security_containers {
namespace ipc {
-typedef std::function PeerCallback;
-typedef unsigned int PeerID;
+typedef int FileDescriptor;
typedef unsigned int MethodID;
typedef unsigned int MessageID;
+typedef std::function PeerCallback;
+
enum class Status : int {
OK = 0,
PARSING_ERROR,
@@ -55,17 +56,20 @@ void throwOnError(const Status status);
template
struct MethodHandler {
- typedef std::function(PeerID, std::shared_ptr&)> type;
+ typedef std::function(FileDescriptor peerFD,
+ std::shared_ptr& data)> type;
};
template
struct SignalHandler {
- typedef std::function&)> type;
+ typedef std::function& data)> type;
};
template
struct ResultHandler {
- typedef std::function&)> type;
+ typedef std::function& resultData)> type;
};
} // namespace ipc
diff --git a/tests/unit_tests/ipc/ut-ipc.cpp b/tests/unit_tests/ipc/ut-ipc.cpp
index b8b9e95..679f3df 100644
--- a/tests/unit_tests/ipc/ut-ipc.cpp
+++ b/tests/unit_tests/ipc/ut-ipc.cpp
@@ -111,38 +111,38 @@ struct ThrowOnAcceptData {
}
};
-std::shared_ptr returnEmptyCallback(const PeerID, std::shared_ptr&)
+std::shared_ptr returnEmptyCallback(const FileDescriptor, std::shared_ptr&)
{
return std::shared_ptr(new EmptyData());
}
-std::shared_ptr returnDataCallback(const PeerID, std::shared_ptr&)
+std::shared_ptr returnDataCallback(const FileDescriptor, std::shared_ptr&)
{
return std::shared_ptr(new SendData(1));
}
-std::shared_ptr echoCallback(const PeerID, std::shared_ptr& data)
+std::shared_ptr echoCallback(const FileDescriptor, std::shared_ptr& data)
{
return data;
}
-std::shared_ptr longEchoCallback(const PeerID, std::shared_ptr& data)
+std::shared_ptr longEchoCallback(const FileDescriptor, std::shared_ptr& data)
{
std::this_thread::sleep_for(std::chrono::seconds(1));
return data;
}
-PeerID connect(Service& s, Client& c)
+FileDescriptor connect(Service& s, Client& c)
{
- // Connects the Client to the Service and returns Clients PeerID
+ // Connects the Client to the Service and returns Clients FileDescriptor
std::mutex mutex;
std::condition_variable cv;
- PeerID peerID = 0;
- auto newPeerCallback = [&cv, &peerID, &mutex](const PeerID newPeerID) {
+ FileDescriptor peerFD = 0;
+ auto newPeerCallback = [&cv, &peerFD, &mutex](const FileDescriptor newFileDescriptor) {
std::unique_lock lock(mutex);
- peerID = newPeerID;
+ peerFD = newFileDescriptor;
cv.notify_one();
};
@@ -155,11 +155,11 @@ PeerID connect(Service& s, Client& c)
c.start();
std::unique_lock lock(mutex);
- BOOST_CHECK(cv.wait_for(lock, std::chrono::milliseconds(1000), [&peerID]() {
- return peerID != 0;
+ BOOST_CHECK(cv.wait_for(lock, std::chrono::milliseconds(1000), [&peerFD]() {
+ return peerFD != 0;
}));
- return peerID;
+ return peerFD;
}
void testEcho(Client& c, const MethodID methodID)
@@ -169,10 +169,10 @@ void testEcho(Client& c, const MethodID methodID)
BOOST_CHECK_EQUAL(recvData->intVal, sentData->intVal);
}
-void testEcho(Service& s, const MethodID methodID, const PeerID peerID)
+void testEcho(Service& s, const MethodID methodID, const FileDescriptor peerFD)
{
std::shared_ptr sentData(new SendData(56));
- std::shared_ptr recvData = s.callSync(methodID, peerID, sentData);
+ std::shared_ptr recvData = s.callSync(methodID, peerFD, sentData);
BOOST_CHECK_EQUAL(recvData->intVal, sentData->intVal);
}
@@ -216,17 +216,17 @@ BOOST_AUTO_TEST_CASE(ClientAddRemoveMethod)
c.addMethodHandler(1, returnEmptyCallback);
c.addMethodHandler(1, returnDataCallback);
- PeerID peerID = connect(s, c);
+ FileDescriptor peerFD = connect(s, c);
c.addMethodHandler(1, echoCallback);
c.addMethodHandler(2, returnDataCallback);
- testEcho(s, 1, peerID);
+ testEcho(s, 1, peerFD);
c.removeMethod(1);
c.removeMethod(2);
- BOOST_CHECK_THROW(testEcho(s, 1, peerID), IPCException);
+ BOOST_CHECK_THROW(testEcho(s, 1, peerFD), IPCException);
}
BOOST_AUTO_TEST_CASE(ServiceStartStop)
@@ -305,10 +305,10 @@ BOOST_AUTO_TEST_CASE(SyncServiceToClientEcho)
Service s(socketPath);
Client c(socketPath);
c.addMethodHandler(1, echoCallback);
- PeerID peerID = connect(s, c);
+ FileDescriptor peerFD = connect(s, c);
std::shared_ptr sentData(new SendData(56));
- std::shared_ptr recvData = s.callSync(1, peerID, sentData);
+ std::shared_ptr recvData = s.callSync(1, peerFD, sentData);
BOOST_CHECK_EQUAL(recvData->intVal, sentData->intVal);
}
@@ -349,7 +349,7 @@ BOOST_AUTO_TEST_CASE(AsyncServiceToClientEcho)
Service s(socketPath);
Client c(socketPath);
c.addMethodHandler(1, echoCallback);
- PeerID peerID = connect(s, c);
+ FileDescriptor peerFD = connect(s, c);
// Async call
std::shared_ptr sentData(new SendData(56));
@@ -364,7 +364,7 @@ BOOST_AUTO_TEST_CASE(AsyncServiceToClientEcho)
cv.notify_one();
};
- s.callAsync(1, peerID, sentData, dataBack);
+ s.callAsync(1, peerFD, sentData, dataBack);
// Wait for the response
std::unique_lock lock(mutex);
@@ -422,7 +422,7 @@ BOOST_AUTO_TEST_CASE(DisconnectedPeerError)
{
Service s(socketPath);
- auto method = [](const PeerID, std::shared_ptr&) {
+ auto method = [](const FileDescriptor, std::shared_ptr&) {
return std::shared_ptr(new SendData(1));
};
@@ -458,7 +458,7 @@ BOOST_AUTO_TEST_CASE(DisconnectedPeerError)
BOOST_AUTO_TEST_CASE(ReadTimeout)
{
Service s(socketPath);
- auto longEchoCallback = [](const PeerID, std::shared_ptr& data) {
+ auto longEchoCallback = [](const FileDescriptor, std::shared_ptr& data) {
return std::shared_ptr(new LongSendData(data->intVal));
};
s.addMethodHandler(1, longEchoCallback);
@@ -500,12 +500,12 @@ BOOST_AUTO_TEST_CASE(AddSignalInRuntime)
connect(s, c);
std::atomic_bool isHandlerACalled(false);
- auto handlerA = [&isHandlerACalled](const PeerID, std::shared_ptr&) {
+ auto handlerA = [&isHandlerACalled](const FileDescriptor, std::shared_ptr&) {
isHandlerACalled = true;
};
std::atomic_bool isHandlerBCalled(false);
- auto handlerB = [&isHandlerBCalled](const PeerID, std::shared_ptr&) {
+ auto handlerB = [&isHandlerBCalled](const FileDescriptor, std::shared_ptr&) {
isHandlerBCalled = true;
};
@@ -528,12 +528,12 @@ BOOST_AUTO_TEST_CASE(AddSignalOffline)
Client c(socketPath);
std::atomic_bool isHandlerACalled(false);
- auto handlerA = [&isHandlerACalled](const PeerID, std::shared_ptr&) {
+ auto handlerA = [&isHandlerACalled](const FileDescriptor, std::shared_ptr&) {
isHandlerACalled = true;
};
std::atomic_bool isHandlerBCalled(false);
- auto handlerB = [&isHandlerBCalled](const PeerID, std::shared_ptr&) {
+ auto handlerB = [&isHandlerBCalled](const FileDescriptor, std::shared_ptr&) {
isHandlerBCalled = true;
};
--
2.7.4
From 59c2f3cfa7bd83eabf305cec961335153a815084 Mon Sep 17 00:00:00 2001
From: Mateusz Malicki
Date: Tue, 2 Dec 2014 16:17:13 +0100
Subject: [PATCH 04/16] Test ability to copy union and to set union type
[Bug/Feature] Test ability to copy union and to set union type
[Cause] Need to copy and add new union elements
[Solution] 1) Clear vector with unions; 2) set, copy and move elements
[Verification] Build, install, tests
Change-Id: Iea2d7eca36edcfd34f768d1e99a3da970300afd8
---
tests/unit_tests/config/ut-configuration.cpp | 11 ++++++++++-
1 file changed, 10 insertions(+), 1 deletion(-)
diff --git a/tests/unit_tests/config/ut-configuration.cpp b/tests/unit_tests/config/ut-configuration.cpp
index 7f9703b..8aab028 100644
--- a/tests/unit_tests/config/ut-configuration.cpp
+++ b/tests/unit_tests/config/ut-configuration.cpp
@@ -382,9 +382,18 @@ BOOST_AUTO_TEST_CASE(ConfigUnion)
BOOST_CHECK_EQUAL(subConfig.intVal, 54321);
BOOST_CHECK(testConfig.unions[0].is