Add randomized socket manager stress test 35/251535/10
authorKrzysztof Jackiewicz <k.jackiewicz@samsung.com>
Thu, 14 Jan 2021 20:35:46 +0000 (21:35 +0100)
committerKrzysztof Jackiewicz <k.jackiewicz@samsung.com>
Fri, 29 Jan 2021 11:52:19 +0000 (12:52 +0100)
Registers several test services in the manager.
In a loop:
* Selects a random service
* Selects a random action:
  * Create a new connection
  * Disconnect a random existing connection
  * Send random data through a random connection
  * Try to receive data from a random connection

Change-Id: Id208e3a6ffbd1db82cd3389ba72bd0ff998d7c61

unit-tests/test_socket-manager.cpp

index b55a31d..f869595 100644 (file)
 
 #include <poll.h>
 
+#include <cstdlib>
+#include <ctime>
 #include <cerrno>
 
 #include <thread>
 #include <mutex>
 #include <condition_variable>
 #include <chrono>
-#include <unordered_map>
+#include <utility>
 #include <string>
 #include <sstream>
 #include <vector>
 #include <boost/test/test_tools.hpp>
 #include <boost_macros_wrapper.h>
 
+#include <test_common.h>
+#include <dpl/log/log.h>
+
 #include <client-common.h>
 #include <socket-manager.h>
-
-#include <dpl/log/log.h>
+#include <message-buffer.h>
 
 using namespace CKM;
 
 namespace {
 
+size_t Random(size_t max)
+{
+       static unsigned int seed = ::time(nullptr);
+       return ::rand_r(&seed) % max;
+}
+
 constexpr char SERVICE_SOCKET_TEST[] = "/tmp/.central-key-manager-test.sock";
 constexpr CKM::InterfaceID SOCKET_ID_TEST = 42;
 constexpr std::chrono::seconds CV_TIMEOUT(10);
 
-struct TestSocketManager : public SocketManager {
+struct TestSocketManager final : public SocketManager {
        size_t TimeoutQueueSize() const { return m_timeoutQueue.size(); }
 };
 
@@ -58,128 +68,64 @@ struct TestSocketManager : public SocketManager {
 
 #define THREAD_REQUIRE(test) THREAD_REQUIRE_MESSAGE(test, "")
 
-class TestService : public GenericSocketService {
-public:
-       ServiceDescriptionVector GetServiceDescription() override {
-               return ServiceDescriptionVector {
-                       {SERVICE_SOCKET_TEST, "", SOCKET_ID_TEST}
-               };
-       }
-       void Event(const AcceptEvent &e) override {
-               std::unique_lock<std::mutex> lock(m_mutex);
-
-               THREAD_REQUIRE_MESSAGE(m_connections.empty() || m_connections.back().client != -1,
-                                      "Unexpected server entry waiting for client match " <<
-                                      m_connections.back().server);
-
-               m_connections.push_back({-1 , e.connectionID.sock});
-
-               LogDebug("AcceptEvent. Added: ? <=>" << e.connectionID.sock);
-
-               CompareSizes();
-
-               lock.unlock();
-               m_cv.notify_one();
-       }
+class NoOpService : public GenericSocketService {
+       void Event(const AcceptEvent &) override {}
        void Event(const WriteEvent &) override {}
        void Event(const ReadEvent &) override {}
-       void Event(const CloseEvent &e) override {
-               std::unique_lock<std::mutex> lock(m_mutex);
-               THREAD_REQUIRE(!m_connections.empty());
-
-               auto serverMatch = [&](const SocketPair& pair){
-                       return pair.server == e.connectionID.sock;
-               };
-               auto it = std::find_if(m_connections.begin(), m_connections.end(), serverMatch);
-
-               THREAD_REQUIRE_MESSAGE(it != m_connections.end(),
-                                      "Can't find connection for server socket = " << e.connectionID.sock);
-
-               LogDebug("CloseEvent. Removing: " << it->client << "<=>" << it->server);
-               THREAD_REQUIRE(it->client != -1);
-
-               m_connections.erase(it);
-
-               CompareSizes();
-
-               lock.unlock();
-               m_cv.notify_one();
-       }
+       void Event(const CloseEvent &) override {}
        void Event(const SecurityEvent &) override {}
 
        void Start() override {}
        void Stop() override {}
+};
 
-       void ConnectAndWait(SockRAII& client) {
-               std::unique_lock<std::mutex> lock(m_mutex);
-
-               THREAD_REQUIRE_MESSAGE(m_connections.empty() || m_connections.back().client != -1,
-                                      "Unexpected server entry waiting for client match " <<
-                                      m_connections.back().server);
-
-               int ret = client.connect(GetServiceDescription()[0].serviceHandlerPath.c_str());
-               BOOST_REQUIRE(ret == CKM_API_SUCCESS);
-
-               LogDebug("Connected. Waiting for AcceptEvent for: " << client.get() << "<=> ?");
-
-               BOOST_REQUIRE(m_cv.wait_for(lock, CV_TIMEOUT, [&]{ return AcceptEventArrived(); }));
-
-               m_connections.back().client = client.get();
-
-               LogDebug("Accepted. Matched client & server: " << m_connections.back().client << "<=>" <<
-                        m_connections.back().server);
-       }
-
-       void DisconnectAndWait(SockRAII& client) {
-               int sock = client.get();
-               client.disconnect();
-
-               LogDebug("Disconnected. Waiting for CloseEvent for: " << sock << "<=> ?");
-
-               std::unique_lock<std::mutex> lock(m_mutex);
-               BOOST_REQUIRE(m_cv.wait_for(lock, CV_TIMEOUT, [&]{ return ClientAbsent(sock); }));
+class TestConnection final : public ServiceConnection {
+public:
+       explicit TestConnection(const std::string& socketPath) :
+               ServiceConnection(socketPath.c_str()),
+               m_id(m_counter++) {
+               auto ret = prepareConnection();
+               BOOST_REQUIRE_MESSAGE(ret == CKM_API_SUCCESS, "ret = " << ret);
        }
 
-       void WaitForRemainingClosures() {
-               std::unique_lock<std::mutex> lock(m_mutex);
-               if (!m_connections.empty())
-                       BOOST_TEST_MESSAGE("Waiting for remaining " << m_connections.size() << " to close.");
-
-               BOOST_REQUIRE(m_cv.wait_for(lock, std::chrono::seconds(OVERRIDE_SOCKET_TIMEOUT + 2), [&]{
-                       return m_connections.empty();
-               }));
-
-               CompareSizes();
+       int Send(const CKM::RawBuffer &send_buf) {
+               m_sent += send_buf.size();
+               return ServiceConnection::send(SerializeMessage(send_buf));
        }
 
-private:
-       bool ClientAbsent(int client) const {
-               auto it = std::find_if(m_connections.begin(),
-                                      m_connections.end(), [&](const SocketPair& pair){
-                       return pair.client == client;
-               });
-               return it == m_connections.end();
-       }
+       template <typename T>
+       void Receive(const T& logReceived) {
+               if (m_sent == 0) {
+                       // expect timeout
+                       auto ret = m_socket.waitForSocket(POLLIN, 100);
+                       BOOST_REQUIRE_MESSAGE(ret == 0, "ret = " << ret);
+                       logReceived(0);
+                       return;
+               }
 
-       bool AcceptEventArrived() const {
-               return !m_connections.empty() && m_connections.back().client == -1;
+               int ret = ServiceConnection::receive(m_recv);
+               BOOST_REQUIRE_MESSAGE(ret == CKM_API_SUCCESS, "ret = " << ret);
+               BOOST_REQUIRE(m_recv.Ready());
+               while (m_recv.Ready()) {
+                       RawBuffer tmp;
+                       m_recv.Deserialize(tmp);
+                       const auto size = tmp.size();
+                       BOOST_REQUIRE_MESSAGE(size <= m_sent, size << ">" << m_sent);
+                       logReceived(size);
+                       m_sent -= size;
+               }
        }
 
-       void CompareSizes() const {
-               auto manager = static_cast<TestSocketManager*>(m_serviceManager);
-               THREAD_REQUIRE(m_connections.size() == manager->TimeoutQueueSize());
-       }
+       size_t GetId() const { return m_id; }
 
-       std::mutex m_mutex;
-       struct SocketPair {
-               int client;
-               int server;
-       };
-       std::vector<SocketPair> m_connections;
-       std::condition_variable m_cv;
+private:
+       size_t m_sent = 0;
+       size_t m_id;
+       MessageBuffer m_recv;
+       static inline size_t m_counter = 0;
 };
 
-class SocketManagerLoop {
+class SocketManagerLoop final {
 public:
        explicit SocketManagerLoop(SocketManager& manager) :
                m_manager(manager),
@@ -199,6 +145,7 @@ public:
        ~SocketManagerLoop() {
                m_manager.MainLoopStop();
                m_thread.join();
+
                BOOST_CHECK_MESSAGE(!m_exception, m_what);
        }
 
@@ -209,6 +156,16 @@ private:
        std::thread m_thread;
 };
 
+std::string Id2SockPath(int id) {
+       return std::string(SERVICE_SOCKET_TEST) + std::to_string(id);
+}
+
+void unlinkIfExists(const char* path) {
+       int ret = unlink(path);
+       int err = errno;
+       BOOST_REQUIRE(ret == 0 || (ret == -1 && err == ENOENT));
+}
+
 } // namespace
 
 BOOST_AUTO_TEST_SUITE(SOCKET_MANAGER_TEST)
@@ -219,9 +176,124 @@ POSITIVE_TEST_CASE(StressTestGrowingTimeoutQueue)
        constexpr unsigned REPEATS = 100000;
        constexpr auto INTERVAL = REPEATS/10;
 
-       int ret = unlink(SERVICE_SOCKET_TEST);
-       int err = errno;
-       BOOST_REQUIRE(ret == 0 || (ret == -1 && err == ENOENT));
+       class TestService final : public NoOpService {
+       public:
+               ServiceDescriptionVector GetServiceDescription() override {
+                       return ServiceDescriptionVector {
+                               {SERVICE_SOCKET_TEST, "", SOCKET_ID_TEST}
+                       };
+               }
+               void Event(const AcceptEvent &e) override {
+                       std::unique_lock<std::mutex> lock(m_mutex);
+
+                       THREAD_REQUIRE_MESSAGE(m_connections.empty() || m_connections.back().client != -1,
+                                              "Unexpected server entry waiting for client match " <<
+                                              m_connections.back().server);
+
+                       m_connections.push_back({-1 , e.connectionID.sock});
+
+                       LogDebug("AcceptEvent. Added: ? <=>" << e.connectionID.sock);
+
+                       CompareSizes();
+
+                       lock.unlock();
+                       m_cv.notify_one();
+               }
+               void Event(const CloseEvent &e) override {
+                       std::unique_lock<std::mutex> lock(m_mutex);
+                       THREAD_REQUIRE(!m_connections.empty());
+
+                       auto serverMatch = [&](const SocketPair& pair){
+                               return pair.server == e.connectionID.sock;
+                       };
+                       auto it = std::find_if(m_connections.begin(), m_connections.end(), serverMatch);
+
+                       THREAD_REQUIRE_MESSAGE(it != m_connections.end(),
+                                              "Can't find connection for server socket = " <<
+                                              e.connectionID.sock);
+
+                       LogDebug("CloseEvent. Removing: " << it->client << "<=>" << it->server);
+                       THREAD_REQUIRE(it->client != -1);
+
+                       m_connections.erase(it);
+
+                       CompareSizes();
+
+                       lock.unlock();
+                       m_cv.notify_one();
+               }
+
+               void ConnectAndWait(SockRAII& client) {
+                       std::unique_lock<std::mutex> lock(m_mutex);
+
+                       THREAD_REQUIRE_MESSAGE(m_connections.empty() || m_connections.back().client != -1,
+                                              "Unexpected server entry waiting for client match " <<
+                                              m_connections.back().server);
+
+                       int ret = client.connect(GetServiceDescription()[0].serviceHandlerPath.c_str());
+                       BOOST_REQUIRE(ret == CKM_API_SUCCESS);
+
+                       LogDebug("Connected. Waiting for AcceptEvent for: " << client.get() << "<=> ?");
+
+                       BOOST_REQUIRE(m_cv.wait_for(lock, CV_TIMEOUT, [&]{ return AcceptEventArrived(); }));
+
+                       m_connections.back().client = client.get();
+
+                       LogDebug("Accepted. Matched client & server: " << m_connections.back().client <<
+                                "<=>" << m_connections.back().server);
+               }
+
+               void DisconnectAndWait(SockRAII& client) {
+                       int sock = client.get();
+                       client.disconnect();
+
+                       LogDebug("Disconnected. Waiting for CloseEvent for: " << sock << "<=> ?");
+
+                       std::unique_lock<std::mutex> lock(m_mutex);
+                       BOOST_REQUIRE(m_cv.wait_for(lock, CV_TIMEOUT, [&]{ return ClientAbsent(sock); }));
+               }
+
+               void WaitForRemainingClosures() {
+                       std::unique_lock<std::mutex> lock(m_mutex);
+                       if (!m_connections.empty())
+                               BOOST_TEST_MESSAGE("Waiting for remaining " << m_connections.size() <<
+                                                  " to close.");
+
+                       BOOST_REQUIRE(m_cv.wait_for(lock,
+                                                   std::chrono::seconds(OVERRIDE_SOCKET_TIMEOUT + 2),
+                                                   [&]{ return m_connections.empty(); }));
+
+                       CompareSizes();
+               }
+
+       private:
+               bool ClientAbsent(int client) const {
+                       auto it = std::find_if(m_connections.begin(),
+                                              m_connections.end(), [&](const SocketPair& pair){
+                               return pair.client == client;
+                       });
+                       return it == m_connections.end();
+               }
+
+               bool AcceptEventArrived() const {
+                       return !m_connections.empty() && m_connections.back().client == -1;
+               }
+
+               void CompareSizes() const {
+                       auto manager = static_cast<TestSocketManager*>(m_serviceManager);
+                       THREAD_REQUIRE(m_connections.size() == manager->TimeoutQueueSize());
+               }
+
+               std::mutex m_mutex;
+               struct SocketPair {
+                       int client;
+                       int server;
+               };
+               std::vector<SocketPair> m_connections;
+               std::condition_variable m_cv;
+       };
+
+       unlinkIfExists(SERVICE_SOCKET_TEST);
 
        TestSocketManager manager;
        auto service = new TestService();
@@ -250,4 +322,136 @@ POSITIVE_TEST_CASE(StressTestGrowingTimeoutQueue)
        }
 }
 
+POSITIVE_TEST_CASE(StressTestRandomSocketEvents)
+{
+       // Too many services or connections may trigger server side timeouts (OVERRIDE_SOCKET_TIMEOUT)
+       constexpr int SERVICES = 4;
+       constexpr int INTERVAL = 1000;
+       constexpr int REPEATS = 10000;
+       constexpr int MAX_CONNECTIONS = 4;
+       // client and server read 2048B and 4096B at once respectively
+       constexpr size_t MAX_BUF_SIZE = 5000;
+
+       enum Event {
+               CONNECT,
+               DISCONNECT,
+               SEND,
+               RECEIVE,
+
+               CNT
+       };
+
+       class TestService final : public NoOpService {
+       public:
+               explicit TestService(int id) :
+                       m_desc({{Id2SockPath(id).c_str(), "", SOCKET_ID_TEST + id}}),
+                       m_id(id) {
+
+                       unlinkIfExists(GetSocketPath().c_str());
+               }
+
+               ServiceDescriptionVector GetServiceDescription() override { return m_desc; }
+
+               void Event(const ReadEvent &e) override {
+                       LogDebug(e.connectionID.sock << ":" << e.connectionID.counter << " Received " <<
+                                e.rawBuffer.size() << "B");
+                       m_serviceManager->Write(e.connectionID, e.rawBuffer);
+               }
+
+               size_t GetConnectionCount() const { return m_connections.size(); }
+
+               void AddConnection() {
+                       m_connections.emplace_back(new TestConnection(GetSocketPath()));
+                       LogDebug(Prefix(m_connections.back()->GetId()) << "Connected");
+               }
+
+               void Disconnect(size_t idx) {
+                       auto it = m_connections.begin() + idx;
+                       auto cid = (*it)->GetId();
+                       if (idx != m_connections.size() - 1)
+                               *it = std::move(m_connections.back());
+                       m_connections.pop_back();
+                       LogDebug(Prefix(cid) << "Disconnected");
+               }
+
+               void Send(size_t idx) {
+                       auto buffer = createRandom(Random(MAX_BUF_SIZE) + 1);
+                       auto& conn = m_connections.at(idx);
+                       auto ret = conn->Send(buffer);
+                       BOOST_REQUIRE_MESSAGE(ret == CKM_API_SUCCESS, "ret = " << ret);
+                       LogDebug(Prefix(conn->GetId())<< "Sent " << buffer.size() << "B");
+               }
+
+               void Receive(size_t idx) {
+                       auto& conn = m_connections.at(idx);
+                       conn->Receive([&](const size_t received) {
+                               LogDebug(Prefix(conn->GetId()) << "Received " << received << "B");
+                       });
+               }
+
+       private:
+               const std::string& GetSocketPath() const {
+                       return m_desc.at(0).serviceHandlerPath;
+               }
+
+               std::string Prefix(size_t idx) const {
+                       return std::string(" ") + std::to_string(m_id) + ":" + std::to_string(idx) + " ";
+               }
+
+               ServiceDescriptionVector m_desc;
+               int m_id;
+               std::vector<std::unique_ptr<TestConnection>> m_connections;
+       };
+
+       SocketManager manager;
+       TestService* services[SERVICES];
+
+       for (int i = 0;i<SERVICES;i++) {
+               services[i] = new TestService(i);
+               manager.RegisterSocketService(services[i]);
+       }
+
+       SocketManagerLoop loop(manager);
+
+       for (unsigned i = 0;i < REPEATS; i++) {
+               // random service
+               auto service = services[Random(SERVICES)];
+
+               // always connect if there are no active connections
+               auto eIdx = CONNECT;
+               auto cIdx = 0;
+               size_t cCnt = service->GetConnectionCount();
+               if (cCnt > 0) {
+                       cIdx = Random(cCnt);
+                       eIdx = static_cast<Event>(Random(CNT));
+                       if (eIdx == CONNECT && cCnt == MAX_CONNECTIONS)
+                               eIdx = SEND; // don't connect if there are too many
+               }
+
+               switch (eIdx) {
+               case CONNECT:
+                       service->AddConnection();
+                       break;
+
+               case DISCONNECT:
+                       service->Disconnect(cIdx);
+                       break;
+
+               case SEND:
+                       service->Send(cIdx);
+                       break;
+
+               case RECEIVE:
+                       service->Receive(cIdx);
+                       break;
+
+               default:
+                       BOOST_FAIL("Unexpected event");
+               }
+
+               if ((i + 1) % INTERVAL == 0)
+                       BOOST_TEST_MESSAGE("Executing random socket actions: " << i + 1 << "/" << REPEATS);
+       }
+}
+
 BOOST_AUTO_TEST_SUITE_END()