#include <poll.h>
+#include <cstdlib>
+#include <ctime>
#include <cerrno>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <chrono>
-#include <unordered_map>
+#include <utility>
#include <string>
#include <sstream>
#include <vector>
#include <boost/test/test_tools.hpp>
#include <boost_macros_wrapper.h>
+#include <test_common.h>
+#include <dpl/log/log.h>
+
#include <client-common.h>
#include <socket-manager.h>
-
-#include <dpl/log/log.h>
+#include <message-buffer.h>
using namespace CKM;
namespace {
+size_t Random(size_t max)
+{
+ static unsigned int seed = ::time(nullptr);
+ return ::rand_r(&seed) % max;
+}
+
constexpr char SERVICE_SOCKET_TEST[] = "/tmp/.central-key-manager-test.sock";
constexpr CKM::InterfaceID SOCKET_ID_TEST = 42;
constexpr std::chrono::seconds CV_TIMEOUT(10);
-struct TestSocketManager : public SocketManager {
+struct TestSocketManager final : public SocketManager {
size_t TimeoutQueueSize() const { return m_timeoutQueue.size(); }
};
#define THREAD_REQUIRE(test) THREAD_REQUIRE_MESSAGE(test, "")
-class TestService : public GenericSocketService {
-public:
- ServiceDescriptionVector GetServiceDescription() override {
- return ServiceDescriptionVector {
- {SERVICE_SOCKET_TEST, "", SOCKET_ID_TEST}
- };
- }
- void Event(const AcceptEvent &e) override {
- std::unique_lock<std::mutex> lock(m_mutex);
-
- THREAD_REQUIRE_MESSAGE(m_connections.empty() || m_connections.back().client != -1,
- "Unexpected server entry waiting for client match " <<
- m_connections.back().server);
-
- m_connections.push_back({-1 , e.connectionID.sock});
-
- LogDebug("AcceptEvent. Added: ? <=>" << e.connectionID.sock);
-
- CompareSizes();
-
- lock.unlock();
- m_cv.notify_one();
- }
+class NoOpService : public GenericSocketService {
+ void Event(const AcceptEvent &) override {}
void Event(const WriteEvent &) override {}
void Event(const ReadEvent &) override {}
- void Event(const CloseEvent &e) override {
- std::unique_lock<std::mutex> lock(m_mutex);
- THREAD_REQUIRE(!m_connections.empty());
-
- auto serverMatch = [&](const SocketPair& pair){
- return pair.server == e.connectionID.sock;
- };
- auto it = std::find_if(m_connections.begin(), m_connections.end(), serverMatch);
-
- THREAD_REQUIRE_MESSAGE(it != m_connections.end(),
- "Can't find connection for server socket = " << e.connectionID.sock);
-
- LogDebug("CloseEvent. Removing: " << it->client << "<=>" << it->server);
- THREAD_REQUIRE(it->client != -1);
-
- m_connections.erase(it);
-
- CompareSizes();
-
- lock.unlock();
- m_cv.notify_one();
- }
+ void Event(const CloseEvent &) override {}
void Event(const SecurityEvent &) override {}
void Start() override {}
void Stop() override {}
+};
- void ConnectAndWait(SockRAII& client) {
- std::unique_lock<std::mutex> lock(m_mutex);
-
- THREAD_REQUIRE_MESSAGE(m_connections.empty() || m_connections.back().client != -1,
- "Unexpected server entry waiting for client match " <<
- m_connections.back().server);
-
- int ret = client.connect(GetServiceDescription()[0].serviceHandlerPath.c_str());
- BOOST_REQUIRE(ret == CKM_API_SUCCESS);
-
- LogDebug("Connected. Waiting for AcceptEvent for: " << client.get() << "<=> ?");
-
- BOOST_REQUIRE(m_cv.wait_for(lock, CV_TIMEOUT, [&]{ return AcceptEventArrived(); }));
-
- m_connections.back().client = client.get();
-
- LogDebug("Accepted. Matched client & server: " << m_connections.back().client << "<=>" <<
- m_connections.back().server);
- }
-
- void DisconnectAndWait(SockRAII& client) {
- int sock = client.get();
- client.disconnect();
-
- LogDebug("Disconnected. Waiting for CloseEvent for: " << sock << "<=> ?");
-
- std::unique_lock<std::mutex> lock(m_mutex);
- BOOST_REQUIRE(m_cv.wait_for(lock, CV_TIMEOUT, [&]{ return ClientAbsent(sock); }));
+class TestConnection final : public ServiceConnection {
+public:
+ explicit TestConnection(const std::string& socketPath) :
+ ServiceConnection(socketPath.c_str()),
+ m_id(m_counter++) {
+ auto ret = prepareConnection();
+ BOOST_REQUIRE_MESSAGE(ret == CKM_API_SUCCESS, "ret = " << ret);
}
- void WaitForRemainingClosures() {
- std::unique_lock<std::mutex> lock(m_mutex);
- if (!m_connections.empty())
- BOOST_TEST_MESSAGE("Waiting for remaining " << m_connections.size() << " to close.");
-
- BOOST_REQUIRE(m_cv.wait_for(lock, std::chrono::seconds(OVERRIDE_SOCKET_TIMEOUT + 2), [&]{
- return m_connections.empty();
- }));
-
- CompareSizes();
+ int Send(const CKM::RawBuffer &send_buf) {
+ m_sent += send_buf.size();
+ return ServiceConnection::send(SerializeMessage(send_buf));
}
-private:
- bool ClientAbsent(int client) const {
- auto it = std::find_if(m_connections.begin(),
- m_connections.end(), [&](const SocketPair& pair){
- return pair.client == client;
- });
- return it == m_connections.end();
- }
+ template <typename T>
+ void Receive(const T& logReceived) {
+ if (m_sent == 0) {
+ // expect timeout
+ auto ret = m_socket.waitForSocket(POLLIN, 100);
+ BOOST_REQUIRE_MESSAGE(ret == 0, "ret = " << ret);
+ logReceived(0);
+ return;
+ }
- bool AcceptEventArrived() const {
- return !m_connections.empty() && m_connections.back().client == -1;
+ int ret = ServiceConnection::receive(m_recv);
+ BOOST_REQUIRE_MESSAGE(ret == CKM_API_SUCCESS, "ret = " << ret);
+ BOOST_REQUIRE(m_recv.Ready());
+ while (m_recv.Ready()) {
+ RawBuffer tmp;
+ m_recv.Deserialize(tmp);
+ const auto size = tmp.size();
+ BOOST_REQUIRE_MESSAGE(size <= m_sent, size << ">" << m_sent);
+ logReceived(size);
+ m_sent -= size;
+ }
}
- void CompareSizes() const {
- auto manager = static_cast<TestSocketManager*>(m_serviceManager);
- THREAD_REQUIRE(m_connections.size() == manager->TimeoutQueueSize());
- }
+ size_t GetId() const { return m_id; }
- std::mutex m_mutex;
- struct SocketPair {
- int client;
- int server;
- };
- std::vector<SocketPair> m_connections;
- std::condition_variable m_cv;
+private:
+ size_t m_sent = 0;
+ size_t m_id;
+ MessageBuffer m_recv;
+ static inline size_t m_counter = 0;
};
-class SocketManagerLoop {
+class SocketManagerLoop final {
public:
explicit SocketManagerLoop(SocketManager& manager) :
m_manager(manager),
~SocketManagerLoop() {
m_manager.MainLoopStop();
m_thread.join();
+
BOOST_CHECK_MESSAGE(!m_exception, m_what);
}
std::thread m_thread;
};
+std::string Id2SockPath(int id) {
+ return std::string(SERVICE_SOCKET_TEST) + std::to_string(id);
+}
+
+void unlinkIfExists(const char* path) {
+ int ret = unlink(path);
+ int err = errno;
+ BOOST_REQUIRE(ret == 0 || (ret == -1 && err == ENOENT));
+}
+
} // namespace
BOOST_AUTO_TEST_SUITE(SOCKET_MANAGER_TEST)
constexpr unsigned REPEATS = 100000;
constexpr auto INTERVAL = REPEATS/10;
- int ret = unlink(SERVICE_SOCKET_TEST);
- int err = errno;
- BOOST_REQUIRE(ret == 0 || (ret == -1 && err == ENOENT));
+ class TestService final : public NoOpService {
+ public:
+ ServiceDescriptionVector GetServiceDescription() override {
+ return ServiceDescriptionVector {
+ {SERVICE_SOCKET_TEST, "", SOCKET_ID_TEST}
+ };
+ }
+ void Event(const AcceptEvent &e) override {
+ std::unique_lock<std::mutex> lock(m_mutex);
+
+ THREAD_REQUIRE_MESSAGE(m_connections.empty() || m_connections.back().client != -1,
+ "Unexpected server entry waiting for client match " <<
+ m_connections.back().server);
+
+ m_connections.push_back({-1 , e.connectionID.sock});
+
+ LogDebug("AcceptEvent. Added: ? <=>" << e.connectionID.sock);
+
+ CompareSizes();
+
+ lock.unlock();
+ m_cv.notify_one();
+ }
+ void Event(const CloseEvent &e) override {
+ std::unique_lock<std::mutex> lock(m_mutex);
+ THREAD_REQUIRE(!m_connections.empty());
+
+ auto serverMatch = [&](const SocketPair& pair){
+ return pair.server == e.connectionID.sock;
+ };
+ auto it = std::find_if(m_connections.begin(), m_connections.end(), serverMatch);
+
+ THREAD_REQUIRE_MESSAGE(it != m_connections.end(),
+ "Can't find connection for server socket = " <<
+ e.connectionID.sock);
+
+ LogDebug("CloseEvent. Removing: " << it->client << "<=>" << it->server);
+ THREAD_REQUIRE(it->client != -1);
+
+ m_connections.erase(it);
+
+ CompareSizes();
+
+ lock.unlock();
+ m_cv.notify_one();
+ }
+
+ void ConnectAndWait(SockRAII& client) {
+ std::unique_lock<std::mutex> lock(m_mutex);
+
+ THREAD_REQUIRE_MESSAGE(m_connections.empty() || m_connections.back().client != -1,
+ "Unexpected server entry waiting for client match " <<
+ m_connections.back().server);
+
+ int ret = client.connect(GetServiceDescription()[0].serviceHandlerPath.c_str());
+ BOOST_REQUIRE(ret == CKM_API_SUCCESS);
+
+ LogDebug("Connected. Waiting for AcceptEvent for: " << client.get() << "<=> ?");
+
+ BOOST_REQUIRE(m_cv.wait_for(lock, CV_TIMEOUT, [&]{ return AcceptEventArrived(); }));
+
+ m_connections.back().client = client.get();
+
+ LogDebug("Accepted. Matched client & server: " << m_connections.back().client <<
+ "<=>" << m_connections.back().server);
+ }
+
+ void DisconnectAndWait(SockRAII& client) {
+ int sock = client.get();
+ client.disconnect();
+
+ LogDebug("Disconnected. Waiting for CloseEvent for: " << sock << "<=> ?");
+
+ std::unique_lock<std::mutex> lock(m_mutex);
+ BOOST_REQUIRE(m_cv.wait_for(lock, CV_TIMEOUT, [&]{ return ClientAbsent(sock); }));
+ }
+
+ void WaitForRemainingClosures() {
+ std::unique_lock<std::mutex> lock(m_mutex);
+ if (!m_connections.empty())
+ BOOST_TEST_MESSAGE("Waiting for remaining " << m_connections.size() <<
+ " to close.");
+
+ BOOST_REQUIRE(m_cv.wait_for(lock,
+ std::chrono::seconds(OVERRIDE_SOCKET_TIMEOUT + 2),
+ [&]{ return m_connections.empty(); }));
+
+ CompareSizes();
+ }
+
+ private:
+ bool ClientAbsent(int client) const {
+ auto it = std::find_if(m_connections.begin(),
+ m_connections.end(), [&](const SocketPair& pair){
+ return pair.client == client;
+ });
+ return it == m_connections.end();
+ }
+
+ bool AcceptEventArrived() const {
+ return !m_connections.empty() && m_connections.back().client == -1;
+ }
+
+ void CompareSizes() const {
+ auto manager = static_cast<TestSocketManager*>(m_serviceManager);
+ THREAD_REQUIRE(m_connections.size() == manager->TimeoutQueueSize());
+ }
+
+ std::mutex m_mutex;
+ struct SocketPair {
+ int client;
+ int server;
+ };
+ std::vector<SocketPair> m_connections;
+ std::condition_variable m_cv;
+ };
+
+ unlinkIfExists(SERVICE_SOCKET_TEST);
TestSocketManager manager;
auto service = new TestService();
}
}
+POSITIVE_TEST_CASE(StressTestRandomSocketEvents)
+{
+ // Too many services or connections may trigger server side timeouts (OVERRIDE_SOCKET_TIMEOUT)
+ constexpr int SERVICES = 4;
+ constexpr int INTERVAL = 1000;
+ constexpr int REPEATS = 10000;
+ constexpr int MAX_CONNECTIONS = 4;
+ // client and server read 2048B and 4096B at once respectively
+ constexpr size_t MAX_BUF_SIZE = 5000;
+
+ enum Event {
+ CONNECT,
+ DISCONNECT,
+ SEND,
+ RECEIVE,
+
+ CNT
+ };
+
+ class TestService final : public NoOpService {
+ public:
+ explicit TestService(int id) :
+ m_desc({{Id2SockPath(id).c_str(), "", SOCKET_ID_TEST + id}}),
+ m_id(id) {
+
+ unlinkIfExists(GetSocketPath().c_str());
+ }
+
+ ServiceDescriptionVector GetServiceDescription() override { return m_desc; }
+
+ void Event(const ReadEvent &e) override {
+ LogDebug(e.connectionID.sock << ":" << e.connectionID.counter << " Received " <<
+ e.rawBuffer.size() << "B");
+ m_serviceManager->Write(e.connectionID, e.rawBuffer);
+ }
+
+ size_t GetConnectionCount() const { return m_connections.size(); }
+
+ void AddConnection() {
+ m_connections.emplace_back(new TestConnection(GetSocketPath()));
+ LogDebug(Prefix(m_connections.back()->GetId()) << "Connected");
+ }
+
+ void Disconnect(size_t idx) {
+ auto it = m_connections.begin() + idx;
+ auto cid = (*it)->GetId();
+ if (idx != m_connections.size() - 1)
+ *it = std::move(m_connections.back());
+ m_connections.pop_back();
+ LogDebug(Prefix(cid) << "Disconnected");
+ }
+
+ void Send(size_t idx) {
+ auto buffer = createRandom(Random(MAX_BUF_SIZE) + 1);
+ auto& conn = m_connections.at(idx);
+ auto ret = conn->Send(buffer);
+ BOOST_REQUIRE_MESSAGE(ret == CKM_API_SUCCESS, "ret = " << ret);
+ LogDebug(Prefix(conn->GetId())<< "Sent " << buffer.size() << "B");
+ }
+
+ void Receive(size_t idx) {
+ auto& conn = m_connections.at(idx);
+ conn->Receive([&](const size_t received) {
+ LogDebug(Prefix(conn->GetId()) << "Received " << received << "B");
+ });
+ }
+
+ private:
+ const std::string& GetSocketPath() const {
+ return m_desc.at(0).serviceHandlerPath;
+ }
+
+ std::string Prefix(size_t idx) const {
+ return std::string(" ") + std::to_string(m_id) + ":" + std::to_string(idx) + " ";
+ }
+
+ ServiceDescriptionVector m_desc;
+ int m_id;
+ std::vector<std::unique_ptr<TestConnection>> m_connections;
+ };
+
+ SocketManager manager;
+ TestService* services[SERVICES];
+
+ for (int i = 0;i<SERVICES;i++) {
+ services[i] = new TestService(i);
+ manager.RegisterSocketService(services[i]);
+ }
+
+ SocketManagerLoop loop(manager);
+
+ for (unsigned i = 0;i < REPEATS; i++) {
+ // random service
+ auto service = services[Random(SERVICES)];
+
+ // always connect if there are no active connections
+ auto eIdx = CONNECT;
+ auto cIdx = 0;
+ size_t cCnt = service->GetConnectionCount();
+ if (cCnt > 0) {
+ cIdx = Random(cCnt);
+ eIdx = static_cast<Event>(Random(CNT));
+ if (eIdx == CONNECT && cCnt == MAX_CONNECTIONS)
+ eIdx = SEND; // don't connect if there are too many
+ }
+
+ switch (eIdx) {
+ case CONNECT:
+ service->AddConnection();
+ break;
+
+ case DISCONNECT:
+ service->Disconnect(cIdx);
+ break;
+
+ case SEND:
+ service->Send(cIdx);
+ break;
+
+ case RECEIVE:
+ service->Receive(cIdx);
+ break;
+
+ default:
+ BOOST_FAIL("Unexpected event");
+ }
+
+ if ((i + 1) % INTERVAL == 0)
+ BOOST_TEST_MESSAGE("Executing random socket actions: " << i + 1 << "/" << REPEATS);
+ }
+}
+
BOOST_AUTO_TEST_SUITE_END()