Switch from select() to epoll 57/315457/1
authorKrzysztof Malysa <k.malysa@samsung.com>
Wed, 30 Oct 2024 19:10:28 +0000 (20:10 +0100)
committerKrzysztof Malysa <k.malysa@samsung.com>
Mon, 2 Dec 2024 06:12:38 +0000 (07:12 +0100)
This removes the limit of max 1024 listening fds.
And fixes some bogous state transitions in SocketManager that could
result in using closed file descriptors, reusing wrong file descriptors
and daemon crash.

Change-Id: I68d7a62b0a2f7114e0a0a9b3108df10291352a77

24 files changed:
src/common/request/AdminCheckBatchRequest.h
src/common/request/AdminCheckRequest.h
src/common/request/AgentActionRequest.h
src/common/request/AgentRegisterRequest.h
src/common/request/CancelRequest.h
src/common/request/CheckRequest.h
src/common/request/DescriptionListRequest.h
src/common/request/EraseRequest.h
src/common/request/InsertOrUpdateBucketRequest.h
src/common/request/ListRequest.h
src/common/request/MonitorEntriesPutRequest.h
src/common/request/MonitorEntryPutRequest.h
src/common/request/MonitorGetEntriesRequest.h
src/common/request/MonitorGetFlushRequest.h
src/common/request/RemoveBucketRequest.h
src/common/request/Request.h
src/common/request/SetPoliciesRequest.h
src/common/request/SignalRequest.h
src/common/request/SimpleCheckRequest.h
src/common/utils/CallInDestructor.h [new file with mode: 0644]
src/service/logic/Logic.cpp
src/service/sockets/Epoll.h [new file with mode: 0644]
src/service/sockets/SocketManager.cpp
src/service/sockets/SocketManager.h

index e07b102e058fbde5671936276994ab34175bc74e..c4e7c8822398e5095734b2e11baaf62c24d3fa93 100644 (file)
@@ -46,6 +46,8 @@ public:
 
     virtual ~AdminCheckBatchRequest() = default;
 
+    const char* name() const noexcept override { return "AdminCheckBatchRequest"; }
+
     const std::vector<AdminBatchCheck>& checks() const noexcept {
         return m_checks;
     }
index 04150acaeac3fe76c0cdad1c8b29cbbed9fa4af1..47ee2387955d99f84e3c0dcc358c27bdf579b5b2 100644 (file)
@@ -48,7 +48,9 @@ public:
         Request(sequenceNumber), m_key(key), m_startBucket(startBucket), m_recursive(recursive) {
     }
 
-    virtual ~AdminCheckRequest() {};
+    virtual ~AdminCheckRequest() = default;
+
+    const char* name() const noexcept override { return "AdminCheckRequest"; }
 
     const PolicyKey &key(void) const {
         return m_key;
index 030349fc88b59979bbc60b94b2e7c3f29215aabf..f6fbcc9b5b474d925dfb2693edb3442dbfda3c6f 100644 (file)
@@ -43,7 +43,9 @@ public:
                        m_type(type), m_data(data) {
     }
 
-    virtual ~AgentActionRequest() {};
+    virtual ~AgentActionRequest() = default;
+
+    const char* name() const noexcept override { return "AgentActionRequest"; }
 
     const RawBuffer &data(void) const {
         return m_data;
index 150efae9adb9fa326ed9330308db49fe67c6a221..b71beaefa1b25356b7c5b41bde4020cec5046f32 100644 (file)
@@ -41,7 +41,9 @@ public:
         Request(sequenceNumber), m_agentType(agentType) {
     }
 
-    virtual ~AgentRegisterRequest() {};
+    virtual ~AgentRegisterRequest() = default;
+
+    const char* name() const noexcept override { return "AgentRegisterRequest"; }
 
     const AgentType &agentType(void) const {
         return m_agentType;
index 0014d1a945f9859b6b453e3802c48f0637c8cf56..bc32553a4f34c39f53f91b18824bcaeb54c0f6c1 100644 (file)
@@ -38,7 +38,9 @@ public:
     CancelRequest(ProtocolFrameSequenceNumber sequenceNumber) : Request(sequenceNumber) {
     }
 
-    virtual ~CancelRequest() {};
+    virtual ~CancelRequest() = default;
+
+    const char* name() const noexcept override { return "CancelRequest"; }
 
     virtual void execute(RequestTaker &taker, const RequestContext &context) const;
 
index faeb49c29034c355f600ce0c5f8bfc5145ccefeb..b9a66eceecd6bb324fcb879f1af2b84b08224e45 100644 (file)
@@ -44,7 +44,9 @@ public:
         Request(sequenceNumber), m_key(key) {
     }
 
-    virtual ~CheckRequest() {};
+    virtual ~CheckRequest() = default;
+
+    const char* name() const noexcept override { return "CheckRequest"; }
 
     const PolicyKey &key(void) const {
         return m_key;
index c6b258ff3ee5a60e099bb3f0c59b691fafe283c0..1ab765edbfd74cd3a61a3c26d4eed7f111a1b88f 100644 (file)
@@ -39,7 +39,9 @@ public:
         Request(sequenceNumber) {
     }
 
-    virtual ~DescriptionListRequest() {};
+    virtual ~DescriptionListRequest() = default;
+
+    const char* name() const noexcept override { return "DescriptionListRequest"; }
 
     virtual void execute(RequestTaker &taker, const RequestContext &context) const;
 
index a188c26a5c0566a3a7a7dcb4c2cb9b735cae3dfa..27583acc6a38fab04f4971b0dd6e0f48e2d2d65e 100644 (file)
@@ -44,7 +44,9 @@ public:
         m_filter(filter) {
     }
 
-    virtual ~EraseRequest() {};
+    virtual ~EraseRequest() = default;
+
+    const char* name() const noexcept override { return "EraseRequest"; }
 
     const PolicyBucketId &startBucket(void) const {
         return m_startBucket;
index 8dd16d0da04aa517675cce07db572264ced56eb9..8369bd501109bd720e018d29606945837a4e2ce3 100644 (file)
@@ -47,7 +47,9 @@ public:
         Request(sequenceNumber), m_bucketId(bucketId), m_result(result) {
     }
 
-    virtual ~InsertOrUpdateBucketRequest() {};
+    virtual ~InsertOrUpdateBucketRequest() = default;
+
+    const char* name() const noexcept override { return "InsertOrUpdateBucketRequest"; }
 
     const PolicyBucketId &bucketId(void) const {
         return m_bucketId;
index ad8e263c8f1bff92120b12203c17e244dc1cc59e..880ad517538d383f9e5be171a708349ae1306997 100644 (file)
@@ -47,7 +47,9 @@ public:
         Request(sequenceNumber), m_bucket(bucket), m_filter(filter) {
     }
 
-    virtual ~ListRequest() {};
+    virtual ~ListRequest() = default;
+
+    const char* name() const noexcept override { return "ListRequest"; }
 
     const PolicyBucketId &bucket(void) const {
         return m_bucket;
index afc902ff0849552fe24566f5739991f401a43705..5f202fea1051f139d4dc261250da4d1ec4688acf 100644 (file)
@@ -46,6 +46,8 @@ public:
 
     virtual ~MonitorEntriesPutRequest() = default;
 
+    const char* name() const noexcept override { return "MonitorEntriesPutRequest"; }
+
     virtual void execute(RequestTaker &taker, const RequestContext &context) const;
 
     bool canBeExecutedReadOnly(const ReadOnlyRequestTaker& taker) const override;
index eb567b4a7628d4d938f9f74bf5bf018259caa2fe..649c28a67efdd95438524982944c1816dbbbd9ac 100644 (file)
@@ -44,6 +44,8 @@ public:
 
     virtual ~MonitorEntryPutRequest() = default;
 
+    const char* name() const noexcept override { return "MonitorEntryPutRequest"; }
+
     virtual void execute(RequestTaker &taker, const RequestContext &context) const;
 
     bool canBeExecutedReadOnly(const ReadOnlyRequestTaker& taker) const override;
index 7eb9b3e121f558d23c7e85c2124dfe8b5b72997b..05fb141b8fda084ba750656fe5f83efb6fa02da7 100644 (file)
@@ -40,6 +40,8 @@ public:
           m_bufferSize(bufferSize)
     {}
 
+    const char* name() const noexcept override { return "MonitorGetEntriesRequest"; }
+
     size_t bufferSize(void) const {
         return m_bufferSize;
     }
index eeaa09d90bb18721d836374e69121d1abdd05807..3d1b9d2741256e118fb39ffdd1fa4d0eed3c84f2 100644 (file)
@@ -39,7 +39,9 @@ public:
         : Request(sequenceNumber)
     {}
 
-    virtual ~MonitorGetFlushRequest() {};
+    virtual ~MonitorGetFlushRequest() = default;
+
+    const char* name() const noexcept override { return "MonitorGetFlushRequest"; }
 
     virtual void execute(RequestTaker &taker, const RequestContext &context) const;
 
index ebf3c3cd4be938486fd761ce57b89ea306af985b..7df1af0e15e07f831b20fe93495769223fe3e241 100644 (file)
@@ -44,7 +44,9 @@ public:
         : Request(sequenceNumber), m_bucketId(bucketId) {
     }
 
-    virtual ~RemoveBucketRequest() {};
+    virtual ~RemoveBucketRequest() = default;
+
+    const char* name() const noexcept override { return "RemoveBucketRequest"; }
 
     const PolicyBucketId &bucketId(void) const {
         return m_bucketId;
index 71ca16a5740a6bb7d63d6962caee421c440c8457..fab4a84b339ccd3213af5281642d14c5a607a442 100644 (file)
@@ -41,6 +41,8 @@ public:
     }
     virtual ~Request() {};
 
+    virtual const char* name() const noexcept = 0;
+
     virtual void execute(RequestTaker &taker, const RequestContext &context) const = 0;
 
     virtual bool canBeExecutedReadOnly(const ReadOnlyRequestTaker& taker) const = 0;
index d93b12378836b1e403515746d9eafd220da06d63..528fa63c08d6d44d7615acb7d981a7ce69e80125 100644 (file)
@@ -53,7 +53,9 @@ public:
         m_removePolicies(removePolicies) {
     }
 
-    virtual ~SetPoliciesRequest() {};
+    virtual ~SetPoliciesRequest() = default;
+
+    const char* name() const noexcept override { return "SetPoliciesRequest"; }
 
     const std::map<PolicyBucketId, std::vector<Policy>> &policiesToBeInsertedOrUpdated(void) const {
         return m_insertOrUpdatePolicies;
index 2c89e97f4a149ae5716c9f956f02477f2ecd0cdb..25ae08c60f86d5d4b65bea378da0536e24547456 100644 (file)
@@ -43,7 +43,9 @@ public:
     SignalRequest(struct signalfd_siginfo sigInfo) : Request(0), m_sigInfo(sigInfo) {
     }
 
-    virtual ~SignalRequest() {};
+    virtual ~SignalRequest() = default;
+
+    const char* name() const noexcept override { return "SignalRequest"; }
 
     virtual void execute(RequestTaker &taker, const RequestContext &context) const;
 
@@ -53,7 +55,7 @@ public:
         return m_sigInfo.ssi_signo;
     }
 
-    struct signalfd_siginfo &sigInfo(void) {
+    const struct signalfd_siginfo &sigInfo() const {
         return m_sigInfo;
     }
 };
index a8f2860571a63814bddd63978f5ede52ccadce2d..86b412cf0491b34821d8196c8aba9795ed956658 100644 (file)
@@ -43,7 +43,9 @@ public:
         Request(sequenceNumber), m_key(key) {
     }
 
-    virtual ~SimpleCheckRequest() {};
+    virtual ~SimpleCheckRequest() = default;
+
+    const char* name() const noexcept override { return "SimpleCheckRequest"; }
 
     const PolicyKey &key(void) const {
         return m_key;
diff --git a/src/common/utils/CallInDestructor.h b/src/common/utils/CallInDestructor.h
new file mode 100644 (file)
index 0000000..5d8f8b3
--- /dev/null
@@ -0,0 +1,52 @@
+/*
+ * Copyright (c) 2024 Samsung Electronics Co., Ltd. All rights reserved.
+ *
+ * This file is licensed under the terms of MIT License or the Apache License
+ * Version 2.0 of your choice. See the LICENSE.MIT file for MIT license details.
+ * See the LICENSE file or the notice below for Apache License Version 2.0
+ * details.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * @file        src/common/utils/CallInDestructor.h
+ * @author      Krzysztof MaÅ‚ysa <k.malysa@samsung.com>
+ * @version     1.0
+ * @brief       This file is the header and implementation of guard that calls function in
+                destructor.
+ */
+
+#pragma once
+
+#include <utility>
+
+namespace Cynara {
+
+template<class Func>
+class CallInDestructor {
+    Func m_func;
+
+public:
+    explicit CallInDestructor(Func func) : m_func{std::move(func)} {}
+
+    CallInDestructor(const CallInDestructor&) = delete;
+    CallInDestructor(CallInDestructor&&) = delete;
+    CallInDestructor& operator=(const CallInDestructor&) = delete;
+    CallInDestructor& operator=(CallInDestructor&&) = delete;
+
+    ~CallInDestructor() noexcept(false) {
+        m_func();
+    }
+};
+
+} // namespace Cynara
index cf350250d6d9977fa16cc6a499600538061280ee..cdcd00e03b20c8326b527f6f764cf9cd860400b9 100644 (file)
@@ -100,7 +100,8 @@ Logic::~Logic() {
 }
 
 void Logic::execute(const RequestContext &context UNUSED, const SignalRequest &request) {
-    LOGD("Processing signal: [%d]", request.signalNumber());
+    LOGD("Processing signal: [%d] from PID: %i", request.signalNumber(),
+         static_cast<int>(request.sigInfo().ssi_pid));
 
     switch (request.signalNumber()) {
     case SIGTERM:
diff --git a/src/service/sockets/Epoll.h b/src/service/sockets/Epoll.h
new file mode 100644 (file)
index 0000000..fb7e94a
--- /dev/null
@@ -0,0 +1,203 @@
+/*
+ * Copyright (c) 2024 Samsung Electronics Co., Ltd All Rights Reserved
+ *
+ * This file is licensed under the terms of MIT License or the Apache License
+ * Version 2.0 of your choice. See the LICENSE.MIT file for MIT license details.
+ * See the LICENSE file or the notice below for Apache License Version 2.0
+ * details.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * @file        src/service/sockets/Epoll.h
+ * @author      Krzysztof Malysa <k.malysa@samsung.com>
+ * @version     1.0
+ * @brief       This file declares and implements epoll() abstraction class
+ */
+
+#include <array>
+#include <cstdint>
+#include <cstring>
+#include <error/SafeStrError.h>
+#include <exceptions/UnexpectedErrorException.h>
+#include <log/log.h>
+#include <sys/epoll.h>
+#include <unistd.h>
+#include <vector>
+
+namespace Cynara {
+
+class Epoll {
+public:
+    Epoll() : m_epollFd{epoll_create1(EPOLL_CLOEXEC)} {
+        if (m_epollFd < 0) {
+            int err = errno;
+            LOGE("Failed to create epoll fd <%s>", safeStrError(err).c_str());
+            throw UnexpectedErrorException(err, safeStrError(err));
+        }
+    }
+
+    ~Epoll() { (void)close(m_epollFd); }
+
+    Epoll(const Epoll &) = delete;
+    Epoll(Epoll &&) = delete;
+    Epoll &operator=(const Epoll &) = delete;
+    Epoll &operator=(Epoll &&) = delete;
+
+    struct Events {
+        bool readable : 1;
+        bool writable : 1;
+        bool error : 1;
+        bool hangup : 1;
+    };
+
+    struct FdWithEvents : public Events {
+        int fd;
+    };
+
+    struct EventsToWatch {
+        bool readable : 1;
+        bool writable : 1;
+    };
+
+    void startWatching(int fd, EventsToWatch eventsToWatch) {
+        size_t minRequiredSize = static_cast<size_t>(fd) * 2 + 2;
+        if (m_eventsToWatch.size() < minRequiredSize) {
+            m_eventsToWatch.resize(minRequiredSize);
+        }
+
+        m_eventsToWatch[fd << 1] = eventsToWatch.readable;
+        m_eventsToWatch[fd << 1 | 1] = eventsToWatch.writable;
+
+        epoll_event ev;
+        ev.events = eventsToWatchToEpollEvents(eventsToWatch);
+        std::memset(&ev.data, 0, sizeof(ev.data));
+        ev.data.fd = fd;
+        if (epoll_ctl(m_epollFd, EPOLL_CTL_ADD, fd, &ev)) {
+            int err = errno;
+            LOGE("epoll_ctl(ADD, %i) failed <%s>", fd, safeStrError(err).c_str());
+            throw UnexpectedErrorException(err, safeStrError(err));
+        }
+        LOGD("epoll: started watching fd [%i] for %s", fd, eventsToWatchToStr(eventsToWatch));
+    }
+
+    void stopWatching(int fd) {
+        if (epoll_ctl(m_epollFd, EPOLL_CTL_DEL, fd, nullptr)) {
+            int err = errno;
+            LOGE("epoll_ctl(DEL, %i) failed <%s>", fd, safeStrError(err).c_str());
+            throw UnexpectedErrorException(err, safeStrError(err));
+        }
+        // We don't care about m_eventsToWatch, since they are reset upon reuse
+        LOGD("epoll: stopped watching fd [%i]", fd);
+    }
+
+    EventsToWatch getWatchedEvents(int fd) const noexcept {
+        return EventsToWatch{m_eventsToWatch[fd << 1], m_eventsToWatch[fd << 1 | 1]};
+    }
+
+    void watchReadEvents(int fd, bool watch) {
+        auto watchedEvents = getWatchedEvents(fd);
+        watchedEvents.readable = watch;
+        changeWatchedEvents(fd, watchedEvents);
+    }
+
+    void watchWriteEvents(int fd, bool watch) {
+        auto watchedEvents = getWatchedEvents(fd);
+        watchedEvents.writable = watch;
+        changeWatchedEvents(fd, watchedEvents);
+    }
+
+    void changeWatchedEvents(int fd, EventsToWatch newEventsToWatch) {
+        epoll_event ev;
+        ev.events = eventsToWatchToEpollEvents(newEventsToWatch);
+        std::memset(&ev.data, 0, sizeof(ev.data));
+        ev.data.fd = fd;
+        if (epoll_ctl(m_epollFd, EPOLL_CTL_MOD, fd, &ev)) {
+            int err = errno;
+            LOGE("epoll_ctl(MOD, %i) failed <%s>", fd, safeStrError(err).c_str());
+            throw UnexpectedErrorException(err, safeStrError(err));
+        }
+        // Needs to be done after successful epoll_ctl()
+        m_eventsToWatch[fd << 1] = newEventsToWatch.readable;
+        m_eventsToWatch[fd << 1 | 1] = newEventsToWatch.writable;
+        LOGD("epoll: changed watched events on fd [%i] to %s",
+             fd,
+             eventsToWatchToStr(newEventsToWatch));
+    }
+
+    template <size_t MAX_EVENTS_NUM>
+    std::vector<FdWithEvents> awaitEvents() {
+        std::array<epoll_event, MAX_EVENTS_NUM> events;
+        int eventsNum;
+        for (;;) {
+            eventsNum = epoll_wait(m_epollFd, events.data(), events.size(), -1);
+            if (eventsNum < 0) {
+                if (errno == EINTR)
+                    continue;
+                int err = errno;
+                LOGE("epoll_wait() failed: <%s>", safeStrError(err).c_str());
+                throw UnexpectedErrorException(err, safeStrError(err));
+            }
+            if (eventsNum == 0) {
+                LOGE("epoll_wait() unexpectedly returned 0");
+                throw UnexpectedErrorException{"epoll_wait() unexpectedly returned 0"};
+            }
+            break;
+        }
+
+        std::vector<FdWithEvents> res;
+        res.reserve(eventsNum);
+        for (int i = 0; i < eventsNum; ++i) {
+            const auto &event = events[i];
+            res.emplace_back(FdWithEvents{
+                {
+                    static_cast<bool>(event.events & EPOLLIN),
+                    static_cast<bool>(event.events & EPOLLOUT),
+                    static_cast<bool>(event.events & EPOLLERR),
+                    static_cast<bool>(event.events & EPOLLHUP),
+                },
+                event.data.fd,
+            });
+        }
+        return res;
+    }
+
+private:
+    [[nodiscard]] static decltype(epoll_event::events)
+    eventsToWatchToEpollEvents(EventsToWatch eventsToWatch) noexcept {
+        decltype(epoll_event::events) res = 0;
+        if (eventsToWatch.readable)
+            res |= EPOLLIN;
+        if (eventsToWatch.writable)
+            res |= EPOLLOUT;
+        return res;
+    }
+
+    [[nodiscard]] static const char *eventsToWatchToStr(EventsToWatch eventsToWatch) noexcept {
+        if (eventsToWatch.readable) {
+            if (eventsToWatch.writable) {
+                return "READ & WRITE";
+            }
+            return "READ";
+        }
+        if (eventsToWatch.writable) {
+            return "WRITE";
+        }
+        return "just errors";
+    }
+
+    int m_epollFd;
+    std::vector<bool> m_eventsToWatch; // memory-optimal way of storing the RW bits
+};
+
+} // namespace Cynara
index e17529b8fad82faef0d39f2c733fcde5b0e6ba6c..64ee2f21be245557ddaa56bd87066d0d115ddb60 100644 (file)
  * @file        src/service/sockets/SocketManager.cpp
  * @author      Lukasz Wojciechowski <l.wojciechow@partner.samsung.com>
  * @author      Adam Malinowski <a.malinowsk2@partner.samsung.com>
+ * @author      Krzysztof Malysa <k.malysa@samsung.com>
  * @version     1.0
  * @brief       This file implements socket layer manager for cynara
  */
 
+#include <array>
 #include <atomic>
+#include <cassert>
+#include <cinttypes>
 #include <csignal>
 #include <cstdint>
 #include <cstring>
@@ -35,7 +39,6 @@
 #include <memory>
 #include <mutex>
 #include <sys/eventfd.h>
-#include <sys/select.h>
 #include <sys/signalfd.h>
 #include <sys/socket.h>
 #include <sys/stat.h>
 #include <attributes/attributes.h>
 #include <common.h>
 #include <config/PathConfig.h>
+#include <containers/BinaryQueue.h>
+#include <containers/MutexedBinaryQueue.h>
+#include <containers/RawBuffer.h>
 #include <error/SafeStrError.h>
 #include <exceptions/DescriptorNotExistsException.h>
 #include <exceptions/InitException.h>
 #include <exceptions/UnexpectedErrorException.h>
 #include <log/log.h>
-
-#include <containers/BinaryQueue.h>
-#include <containers/MutexedBinaryQueue.h>
-#include <containers/RawBuffer.h>
 #include <logic/Logic.h>
 #include <main/Cynara.h>
 #include <protocol/ProtocolAdmin.h>
 #include <request/pointers.h>
 #include <request/RequestContext.h>
 #include <response/pointers.h>
+#include <utils/CallInDestructor.h>
 
 #include "SocketManager.h"
 
 namespace Cynara {
 
-SocketManager::SocketManager(size_t openFdsLimit) : m_openFdsLimit{openFdsLimit}, m_working(false),
-                                                    m_maxDesc(-1) {
-    FD_ZERO(&m_readSet);
-    FD_ZERO(&m_writeSet);
-}
-
-SocketManager::~SocketManager() {
-}
-
-void SocketManager::run(void) {
+void SocketManager::run() {
     init();
     mainLoop();
 }
@@ -103,8 +97,12 @@ void SocketManager::logStats() const {
          m_stats.monitors.fdsYetWithoutRequest.size());
 }
 
-void SocketManager::init(void) {
-    LOGI("SocketManger init start");
+void SocketManager::init() {
+    LOGI("SocketManager init start");
+    auto functionDoneLogger = CallInDestructor{[] {
+        LOGI("SocketManager init end");
+    }};
+
     const mode_t clientSocketUMask(0);
     const mode_t adminSocketUMask0077(S_IRWXG | S_IRWXO);
     const mode_t agentSocketUMask(0);
@@ -126,12 +124,13 @@ void SocketManager::init(void) {
     createNonReadOnlyRequestResultsNumEventFd();
     // Initialize RO logic
     m_readOnlyLogic = m_logic->createReadOnlyCopy();
-
-    LOGI("SocketManger init done");
 }
 
-void SocketManager::mainLoop(void) {
-    LOGI("SocketManger mainLoop start");
+void SocketManager::mainLoop() {
+    LOGI("SocketManager mainLoop start");
+    auto functionDoneLogger = CallInDestructor{[] {
+        LOGI("SocketManager mainLoop end");
+    }};
 
     m_nonReadOnlyWorkerThread = std::thread([this] {
         for (;;) {
@@ -167,7 +166,7 @@ void SocketManager::mainLoop(void) {
                 // Execute the request.
                 auto context = RequestContext(req.protocol, req.writeQueue, req.socketFd);
                 req.request->execute(*m_logic, context);
-                LOGD("Response size: [%i]",
+                LOGD("non-read-only logic worker thread: response size: [%i]",
                      static_cast<int>(req.writeQueue->lock()->size()));
                 // Install new RO logic
                 {
@@ -176,7 +175,7 @@ void SocketManager::mainLoop(void) {
                     m_readOnlyLogic = std::move(readOnlyLogic);
                 }
                 LOGD("non-read-only logic worker thread: sending response to request with socket fd"
-                     " [%i] with generation [%i] and sequence number [%i] of size [%i]",
+                     " [%i] with generation [%" PRIu64 "] and sequence number [%i] of size [%i]",
                      req.socketFd, req.socketFdGeneration,
                      static_cast<int>(req.request->sequenceNumber()),
                      static_cast<int>(req.writeQueue->lock()->size()));
@@ -184,7 +183,6 @@ void SocketManager::mainLoop(void) {
                 m_nonReadOnlyRequestResults.send(NonReadOnlyRequestResult{
                     req.socketFd,
                     req.socketFdGeneration,
-                    std::exchange(m_needToDisconnectAllClients, false),
                     std::exchange(m_needToStopMainLoop, false),
                 });
                 notifyTheMainThread();
@@ -192,128 +190,177 @@ void SocketManager::mainLoop(void) {
         }
     });
 
-    m_working  = true;
-    while (m_working) {
-        fd_set readSet = m_readSet;
-        fd_set writeSet = m_writeSet;
+    // End the other thread on loop exit or stack unwinding.
+    auto threadGuard = CallInDestructor{[this] {
+        m_nonReadOnlyRequests.send(NoMoreRequests{});
+        m_nonReadOnlyWorkerThread.join();
+    }};
 
+    for (;;) {
 #if BUILD_TYPE_DEBUG
         logStats();
 #else
         if (fdsUsageIsHigh())
             logStats();
 #endif
-        int ret = select(m_maxDesc + 1, &readSet, &writeSet, nullptr, nullptr);
-
-        if (ret < 0) {
-            switch (errno) {
-            case EINTR:
+        auto events = m_epoll.awaitEvents<32>();
+        // We do not have to worry about outdated events (a scenario where an event is reported for
+        // a file descriptor but processing earlier events in this batch caused the file descriptor
+        // to be closed and potentially other file descriptor might be opened with the same number
+        // before we started processing the event for it) because we close the file descriptor only
+        // after processing event reported for it is done.
+        for (const auto& event : events) {
+            // There are some cases where client writes a request and closes the connection and the
+            // request is valid and should be processed. This is the case with
+            // MonitorEntriesPutRequest. So we have to process readability event after the socket
+            // closes.
+            if ((event.error || event.hangup) && !event.readable) {
+                LOGN("Socket [%i] closed on the other end without becoming readable", event.fd);
+                closeSocket(event.fd);
                 continue;
-            default:
-                int err = errno;
-                throw UnexpectedErrorException(err, safeStrError(err));
             }
-        } else if (ret > 0) {
-            // First, write responses
-            int readyForReadNum = 0;
-            for (int i = 0; i < m_maxDesc + 1 && ret; ++i) {
-                if (FD_ISSET(i, &writeSet)) {
-                    readyForWrite(i);
-                    --ret;
-                }
-                if (FD_ISSET(i, &readSet)) {
-                    --ret;
-                    ++readyForReadNum;
+            // Write first.
+            if (event.writable) {
+                switch (readyForWrite(event.fd)) {
+                case WritingResult::THERE_STILL_IS_DATA_TO_WRITE: break;
+                case WritingResult::WRITTEN_ALL_DATA:
+                    m_epoll.watchWriteEvents(event.fd, false);
+                    break;
+                case WritingResult::ERROR:
+                    closeSocket(event.fd);
+                    continue;
                 }
             }
-            // Then accept new connections and requests
-            for (int i = 0; i < m_maxDesc + 1 && readyForReadNum; ++i) {
-                if (FD_ISSET(i, &readSet)) {
-                    readyForRead(i);
-                    --readyForReadNum;
+            // File descriptor might have been marked to ignore reads temporarily.
+            if (event.readable && m_epoll.getWatchedEvents(event.fd).readable) {
+                switch (readyForRead(event.fd)) {
+                case ReadingResult::OK: break;
+                case ReadingResult::STOP_MAIN_LOOP: return;
+                case ReadingResult::ERROR:
+                    // We assume we can drop already generated but not yet sent responses to
+                    // previous requests.
+                case ReadingResult::GOT_EOF:
+                    // We assume (client is implemented in such a way) that client will not read
+                    // from socket after closing it for writing, so we can close both ends after we
+                    // see the client closed their write end.
+                    closeSocket(event.fd);
+                    continue;
                 }
             }
+        }
 
-            LOGD("checking sockets <= %i for data to write", m_maxDesc);
-            for (int i = 0; i < m_maxDesc + 1; ++i) {
-                if (m_fds[i].isUsed() && m_fds[i].hasDataToWrite()) {
-                    LOGD("socket [%i] has data to write", i);
-                    addWriteSocket(i);
-                }
+        // Some sockets might have been marked readable again and may already have serialized
+        // requests in the buffer.
+        for (int fd : m_fdsToCheckForReadButNotProcessedRequests) {
+            switch (handleRead(fd, RawBuffer{})) {
+            case HandleReadResult::NEED_MORE_DATA:
+            case HandleReadResult::SUSPENDED_AND_MORE_REQUESTS_MAY_BE_IN_THE_BUFFER:
+                break;
+            case HandleReadResult::ERROR:
+                // We assume we can drop already generated but not yet sent responses to previous
+                // requests.
+                closeSocket(fd);
+                break;
+            }
+        }
+        m_fdsToCheckForReadButNotProcessedRequests.clear();
+
+        // TODO: do it more optimally by some marking which fds have data to write
+        LOGD("checking sockets < %zu for data to write", m_fds.size());
+        for (size_t fd = 0; fd < m_fds.size(); ++fd) {
+            if (m_fds[fd].isUsed() && m_fds[fd].hasDataToWrite()) {
+                LOGD("socket [%i] has data to write", fd);
+                m_epoll.watchWriteEvents(fd, true);
             }
         }
-    }
-
-    m_nonReadOnlyRequests.send(NoMoreRequests{});
-    m_nonReadOnlyWorkerThread.join();
 
-    LOGI("SocketManger mainLoop done");
+        // If we noticed that some socket has data to write. We have to check
+        // m_needToDisconnectAllClients and disconnect all clients if true BEFORE writing
+        // the response. All this to retain soundness of the client and admin API.
+        // Now we are safe to disconnect all clients without invalidating any file descriptor that
+        // will be used somewhere else in this function.
+        if (m_needToDisconnectAllClients.exchange(false)) {
+            LOGD("SocketManager disconnecting all clients");
+            for (size_t fd = 0; fd < m_fds.size(); ++fd) {
+                const auto& desc = m_fds[fd];
+                if (desc.isUsed() && desc.isClient() && !desc.isListen())
+                    closeSocket(fd);
+            }
+            shrinkFds();
+        }
+    }
 }
 
-void SocketManager::mainLoopStop(void) {
-    m_working = false;
+SocketManager::HandleNonReadOnlyRequestResultsResult
+SocketManager::handleNonReadOnlyRequestResults() {
+    uint64_t resNum = 0;
+    if (read(m_nonReadOnlyRequestResultsNumEventFd, &resNum, sizeof(resNum)) != sizeof(resNum)) {
+        int err = errno;
+        LOGE("Failed to read from eventfd <%s>", safeStrError(err).c_str());
+        throw UnexpectedErrorException(err, safeStrError(err));
+    }
+    for (uint64_t i = 0; i < resNum; ++i) {
+        auto resV = m_nonReadOnlyRequestResults.recv();
+        if (std::holds_alternative<CloseContextResult>(resV))
+            continue;
+
+        if (std::holds_alternative<NonReadOnlyRequestResult>(resV)) {
+            auto& res = std::get<NonReadOnlyRequestResult>(resV);
+            LOGD("main thread: handling NonReadOnlyRequestResult for socket fd [%i] with generation"
+                 " [%i]", res.socketFd, res.socketFdGeneration);
+            auto& desc = m_fds[res.socketFd];
+            if (desc.isUsed() && desc.getGeneration() == res.socketFdGeneration) {
+                // Descriptor was not closed and was not reused.
+                // There may be some requests from the socket that are already read into
+                // the buffer but not processed. Schedule reading them.
+                m_epoll.watchReadEvents(res.socketFd, true);
+                m_fdsToCheckForReadButNotProcessedRequests.emplace_back(res.socketFd);
+            }
+
+            if (res.stopMainLoop)
+                return HandleNonReadOnlyRequestResultsResult::STOP_MAIN_LOOP;
+        }
+    }
+    return HandleNonReadOnlyRequestResultsResult::OK;
 }
 
-void SocketManager::readyForRead(int fd) {
-    LOGD("SocketManger readyForRead on fd [%d] start", fd);
+SocketManager::ReadingResult SocketManager::readyForRead(int fd) {
+    LOGD("SocketManager readyForRead on fd [%d] start", fd);
+    auto functionDoneLogger = CallInDestructor{[fd] {
+        LOGD("SocketManager readyForRead on fd [%d] end", fd);
+    }};
 
     if (fd == m_nonReadOnlyRequestResultsNumEventFd) {
         LOGD("SocketManager m_nonReadOnlyRequestResultsNumEventFd is ready for read");
-        uint64_t resNum = 0;
-        if (read(fd, &resNum, sizeof(resNum)) != sizeof(resNum)) {
-            int err = errno;
-            LOGE("Failed to read from eventfd <%s>", safeStrError(err).c_str());
-            throw UnexpectedErrorException(err, safeStrError(err));
+        switch (handleNonReadOnlyRequestResults()) {
+        case HandleNonReadOnlyRequestResultsResult::OK: return ReadingResult::OK;
+        case HandleNonReadOnlyRequestResultsResult::STOP_MAIN_LOOP:
+            return ReadingResult::STOP_MAIN_LOOP;
         }
-        for (uint64_t i = 0; i < resNum; ++i) {
-            auto resV = m_nonReadOnlyRequestResults.recv();
-            if (std::holds_alternative<CloseContextResult>(resV))
-                continue;
-
-            if (std::holds_alternative<NonReadOnlyRequestResult>(resV)) {
-                auto& res = std::get<NonReadOnlyRequestResult>(resV);
-                LOGD("main thread: handling response to request with socket fd [%i] with"
-                    " generation [%i]",
-                    res.socketFd, res.socketFdGeneration);
-                if (res.socketFd == -1)
-                    continue;
-                // Handle the response
-                auto& desc = m_fds[res.socketFd];
-                if (desc.isUsed() && desc.getGeneration() == res.socketFdGeneration) {
-                    // Descriptor was not closed and was not reused.
-                    // Now we can safely read and process the other requests from the socket.
-                    addReadSocket(res.socketFd);
-                    // Process next requests if there are any on this socket
-                    handleRead(res.socketFd, RawBuffer{});
-                }
-                // Process extra events
-                if (res.disconnectAllClients)
-                    disconnectAllClients();
-                if (res.stopMainLoop)
-                    mainLoopStop();
-            }
-        }
-        return;
     }
 
     auto &desc = m_fds[fd];
     if (desc.isListen()) {
         readyForAccept(fd);
-        return;
+        return ReadingResult::OK;
     }
 
     RawBuffer readBuffer(DEFAULT_BUFFER_SIZE);
     ssize_t size = read(fd, readBuffer.data(), DEFAULT_BUFFER_SIZE);
-
     if (size > 0) {
         LOGD("read [%zd] bytes", size);
         readBuffer.resize(size);
-        if (handleRead(fd, readBuffer)) {
-            LOGD("SocketManger readyForRead on fd [%d] successfully done", fd);
-            return;
+        switch (handleRead(fd, readBuffer)) {
+        case HandleReadResult::NEED_MORE_DATA:
+        case HandleReadResult::SUSPENDED_AND_MORE_REQUESTS_MAY_BE_IN_THE_BUFFER:
+            return ReadingResult::OK;
+        case HandleReadResult::ERROR:
+            LOGI("interpreting buffer read from [%d] failed", fd);
+            return ReadingResult::ERROR;
         }
-        LOGI("interpreting buffer read from [%d] failed", fd);
-    } else if (size < 0) {
+    }
+
+    if (size < 0) {
         int err = errno;
         switch (err) {
             case EAGAIN:
@@ -321,20 +368,24 @@ void SocketManager::readyForRead(int fd) {
             case EWOULDBLOCK:
 #endif
             case EINTR:
-                return;
+                return ReadingResult::OK; // Reading will be tried again later.
             default:
-                LOGW("While reading from [%d] socket, error [%d]:<%s>",
-                     fd, err, safeStrError(err).c_str());
+                LOGW("While reading from [%d] socket, error [%d]: <%s>", fd, err,
+                     safeStrError(err).c_str());
+                return ReadingResult::ERROR;
         }
-    } else {
-        LOGN("Socket [%d] closed on other end", fd);
     }
-    closeSocket(fd);
-    LOGD("SocketManger readyForRead on fd [%d] done", fd);
+
+    LOGN("Socket [%d] closed on other end", fd);
+    return ReadingResult::GOT_EOF;
 }
 
-void SocketManager::readyForWrite(int fd) {
-    LOGD("SocketManger readyForWrite on fd [%d] start", fd);
+SocketManager::WritingResult SocketManager::readyForWrite(int fd) {
+    LOGD("SocketManager readyForWrite on fd [%d] start", fd);
+    auto functionDoneLogger = CallInDestructor{[fd] {
+        LOGD("SocketManager readyForWrite on fd [%d] end", fd);
+    }};
+
     auto &desc = m_fds[fd];
     auto &buffer = desc.prepareWriteBuffer();
     size_t size = buffer.size();
@@ -344,34 +395,36 @@ void SocketManager::readyForWrite(int fd) {
         switch (err) {
         case EAGAIN:
         case EINTR:
-            // select will trigger write once again, nothing to do
-            break;
+            // epoll will trigger write once again, nothing to do
+            return WritingResult::THERE_STILL_IS_DATA_TO_WRITE;
         case EPIPE:
         default:
-            LOGD("Error during write to fd [%d]:<%s> ", fd, safeStrError(err).c_str());
-            closeSocket(fd);
-            break;
+            LOGD("Error during write to fd [%d]: <%s> ", fd, safeStrError(err).c_str());
+            return WritingResult::ERROR;
         }
-        return; // We do not want to propagate error to next layer
     }
 
     LOGD("written [%zd] bytes", result);
     buffer.erase(buffer.begin(), buffer.begin() + result);
 
-    if (buffer.empty())
-        removeWriteSocket(fd);
-    LOGD("SocketManger readyForWrite on fd [%d] done", fd);
+    return buffer.empty() ? WritingResult::WRITTEN_ALL_DATA :
+                            WritingResult::THERE_STILL_IS_DATA_TO_WRITE;
 }
 
 void SocketManager::readyForAccept(int fd) {
+    LOGD("SocketManager readyForAccept on fd [%d] start", fd);
+    auto functionDoneLogger = CallInDestructor{[fd] {
+        LOGD("SocketManager readyForAccept on fd [%d] end", fd);
+    }};
+
     if (fdsUsageIsHigh()) {
-        LOGD("SocketManger readyForAccept on fd [%d]: high memory usage -> stop listening", fd);
+        LOGD("SocketManager readyForAccept on fd [%d]: high memory usage -> stop listening", fd);
         m_listenSocketsDisabledBecauseOfHighFdUsage.emplace(fd);
-        removeReadSocket(fd);
+
+        m_epoll.watchReadEvents(fd, false);
         return;
     }
 
-    LOGD("SocketManger readyForAccept on fd [%d] start", fd);
     struct sockaddr_un clientAddr;
     unsigned int clientLen = sizeof(clientAddr);
     int clientFd = accept4(fd, (struct sockaddr*) &clientAddr, &clientLen, SOCK_NONBLOCK);
@@ -398,16 +451,20 @@ void SocketManager::readyForAccept(int fd) {
             logStats();
 #endif
 
-    auto &desc = createDescriptor(clientFd, m_fds[fd].isClient());
+    auto &desc = createDescriptorWatchedForRead(clientFd, m_fds[fd].isClient());
     desc.setListen(false);
     desc.setProtocol(m_fds[fd].protocol()->clone());
     desc.setReadOnlyProtocol(m_fds[fd].protocol()->clone());
-    addReadSocket(clientFd);
-    LOGD("SocketManger readyForAccept on fd [%d] done", fd);
 }
 
 void SocketManager::closeSocket(int fd) {
-    LOGD("SocketManger closeSocket fd [%d] start", fd);
+    LOGD("SocketManager closeSocket fd [%d] start", fd);
+    auto functionDoneLogger = CallInDestructor{[fd] {
+        LOGD("SocketManager closeSocket fd [%d] end", fd);
+    }};
+
+    m_epoll.stopWatching(fd);
+
     Descriptor &desc = m_fds[fd];
     m_nonReadOnlyRequests.send(CloseContext{
         fd,
@@ -418,35 +475,39 @@ void SocketManager::closeSocket(int fd) {
     // Statistics
     m_stats.closeFd(fd);
 
-    removeReadSocket(fd);
-    removeWriteSocket(fd);
     desc.clear();
+    shrinkFds();
+
     close(fd);
 
     // Restore sockets that were disabled due to high fd usage
     if (fdUsageWasHigh && !fdsUsageIsHigh()) {
         for (auto fd : m_listenSocketsDisabledBecauseOfHighFdUsage) {
-            LOGD("SocketManger: start listening again on fd [%d]", fd);
-            addReadSocket(fd);
+            LOGD("SocketManager: start listening again on fd [%d]", fd);
+            m_epoll.watchReadEvents(fd, true);
         }
         m_listenSocketsDisabledBecauseOfHighFdUsage.clear();
     }
-
-    LOGD("SocketManger closeSocket fd [%d] done", fd);
 }
 
-bool SocketManager::handleRead(int fd, const RawBuffer &readbuffer) {
-    LOGD("SocketManger handleRead on fd [%d] start", fd);
+SocketManager::HandleReadResult SocketManager::handleRead(int fd, const RawBuffer &readBuffer) {
+    LOGD("SocketManager handleRead on fd [%d] start", fd);
+    auto functionDoneLogger = CallInDestructor{[fd] {
+        LOGD("SocketManager handleRead on fd [%d] end", fd);
+    }};
+
     auto &desc = m_fds[fd];
-    desc.pushReadBuffer(readbuffer);
+    desc.pushReadBuffer(readBuffer);
 
     try {
         while(true) {
-            //try extract request from binary data received on socket
+            // Try extract request from binary data received on socket.
             auto req = desc.extractRequest();
-            if (!req)   // not enough data to build request yet
-                break;
-            LOGD("request extracted");
+            if (!req) {
+                // Not enough data to build request yet.
+                return HandleReadResult::NEED_MORE_DATA;
+            }
+            LOGD("request extracted: %s", req->name());
             m_stats.seenRequestFor(fd);
 
             auto guard = std::unique_lock{m_readOnlyLogicLock};
@@ -465,7 +526,7 @@ bool SocketManager::handleRead(int fd, const RawBuffer &readbuffer) {
                 // request comes after the write request, but we handle it before the write request
                 // finishes and return the answer based on the old policy. Or use desc.writeQueue()
                 // by read-only logic.
-                removeReadSocket(fd);
+                m_epoll.watchReadEvents(fd, false);
 
                 LOGD("Passing request to the non-read-only logic");
                 // Pass the request to the m_nonReadOnlyWorkerThread.
@@ -477,15 +538,13 @@ bool SocketManager::handleRead(int fd, const RawBuffer &readbuffer) {
                     desc.protocol(),
                     desc.writeQueue(),
                 });
-                break;
+                return HandleReadResult::SUSPENDED_AND_MORE_REQUESTS_MAY_BE_IN_THE_BUFFER;
             }
         }
     } catch (const Exception &ex) {
         LOGE("Error handling request <%s>. Closing socket", ex.what());
-        return false;
+        return HandleReadResult::ERROR;
     }
-    LOGD("SocketManger handleRead on fd [%d] done", fd);
-    return true;
 }
 
 int SocketManager::createDomainSocket(ProtocolPtr protocol, const std::string &path, mode_t mask,
@@ -497,12 +556,10 @@ int SocketManager::createDomainSocket(ProtocolPtr protocol, const std::string &p
 #endif
         fd = createDomainSocketHelp(path, mask);
 
-    auto &desc = createDescriptor(fd, client);
+    auto &desc = createDescriptorWatchedForRead(fd, client);
     desc.setListen(true);
     desc.setProtocol(protocol);
     desc.setReadOnlyProtocol(protocol->clone());
-    addReadSocket(fd);
-
     LOGD("Domain socket: [%d] added.", fd);
     return fd;
 }
@@ -573,8 +630,8 @@ int SocketManager::getSocketFromSystemD(const std::string &path) {
 
     for (int fd = SD_LISTEN_FDS_START; fd < SD_LISTEN_FDS_START + n; ++fd) {
         if (sd_is_socket_unix(fd, SOCK_STREAM, 1, path.c_str(), 0) > 0) {
-            LOGI("Useable socket <%s> was passed by SystemD under descriptor [%d]",
-                    path.c_str(), fd);
+            LOGI("Useable socket <%s> was passed by SystemD under descriptor [%d]", path.c_str(),
+                 fd);
             return fd;
         }
     }
@@ -602,12 +659,10 @@ void SocketManager::createSignalSocket(ProtocolPtr protocol) {
         return;
     }
 
-    auto &desc = createDescriptor(fd, false);
+    auto &desc = createDescriptorWatchedForRead(fd, false);
     desc.setListen(false);
     desc.setProtocol(protocol);
     desc.setReadOnlyProtocol(protocol->clone());
-    addReadSocket(fd);
-
     LOGD("Signal socket: [%d] added.", fd);
 }
 
@@ -620,48 +675,33 @@ void SocketManager::createNonReadOnlyRequestResultsNumEventFd() {
     }
     m_nonReadOnlyRequestResultsNumEventFd = efd;
     // Mark the efd to be listened on for read events
-    auto& desc = createDescriptor(efd, false);
+    auto& desc = createDescriptorWatchedForRead(efd, false);
     desc.setListen(false);
     desc.setProtocol(nullptr);
     desc.setReadOnlyProtocol(nullptr);
-    addReadSocket(efd);
-    LOGD("SocketManger created nonReadOnlyRequestResultsNumEventFd [%d]", efd);
+    LOGD("SocketManager created nonReadOnlyRequestResultsNumEventFd [%d]", efd);
 }
 
-Descriptor &SocketManager::createDescriptor(int fd, bool client) {
-    if (fd > m_maxDesc) {
-        m_maxDesc = fd;
-        if (fd >= static_cast<int>(m_fds.size()))
-            m_fds.resize(fd + 20);
-    }
+Descriptor &SocketManager::createDescriptorWatchedForRead(int fd, bool client) {
+    assert(fd >= 0);
+    if (static_cast<size_t>(fd) >= m_fds.size())
+        m_fds.resize(fd + 1);
     auto &desc = m_fds[fd];
-    desc.setUsed(true);
-    desc.setClient(client);
-    return desc;
-}
 
-void SocketManager::addReadSocket(int fd) {
-    FD_SET(fd, &m_readSet);
-}
-
-void SocketManager::removeReadSocket(int fd) {
-    FD_CLR(fd, &m_readSet);
-}
+    m_epoll.startWatching(fd, Epoll::EventsToWatch{true, false});
 
-void SocketManager::addWriteSocket(int fd) {
-    FD_SET(fd, &m_writeSet);
-}
-
-void SocketManager::removeWriteSocket(int fd) {
-    FD_CLR(fd, &m_writeSet);
+    desc.setUsed(true); // Needs to be done after potential exceptions from epoll.
+    desc.setClient(client);
+    return desc;
 }
 
-void SocketManager::disconnectAllClients(void) {
-    for(int i = 0; i <= m_maxDesc; ++i) {
-        auto &desc = m_fds[i];
-        if(desc.isUsed() && desc.isClient() && !desc.isListen())
-            closeSocket(i);
+void SocketManager::shrinkFds() {
+    // Reduce size of m_fds to speed up processing
+    size_t newSize = m_fds.size();
+    while (newSize > 0 && !m_fds[newSize - 1].isUsed()) {
+        --newSize;
     }
+    m_fds.resize(newSize);
 }
 
 } // namespace Cynara
index 3063d0761bbdc94f839fe911f4bbcf329dafca9b..4f4a9549c68b1ebcdbb26654f2a720835863cd6c 100644 (file)
@@ -33,6 +33,7 @@
 #include <cstdio>
 #include <memory>
 #include <thread>
+#include <unordered_map>
 #include <variant>
 #include <vector>
 
@@ -44,6 +45,7 @@
 #include <protocol/Protocol.h>
 #include <request/RequestTaker.h>
 #include "Descriptor.h"
+#include "Epoll.h"
 #include "logic/Logic.h"
 #include "utils/Channel.h"
 
@@ -53,16 +55,16 @@ const size_t DEFAULT_BUFFER_SIZE = BUFSIZ;
 
 class SocketManager {
 public:
-    explicit SocketManager(size_t openFdsLimit);
-    ~SocketManager();
+    explicit SocketManager(size_t openFdsLimit) : m_openFdsLimit{openFdsLimit} {}
+    ~SocketManager() = default;
 
-    void run(void);
+    void run();
 
     void bindLogic(LogicPtr logic) {
         m_logic = logic;
     }
 
-    void unbindAll(void) {
+    void unbindAll() {
         m_logic.reset();
     }
 
@@ -70,7 +72,7 @@ public:
     void signalDisconnectAllClients() {
         if (std::this_thread::get_id() != m_nonReadOnlyWorkerThread.get_id())
             throw UnexpectedErrorException{"signalDisconnectAllClients() call in the wrong thread"};
-        m_needToDisconnectAllClients = true;
+        m_needToDisconnectAllClients.store(true);
     }
 
     // Only safe to call from m_nonReadOnlyWorkerThread
@@ -97,7 +99,6 @@ private:
     struct NonReadOnlyRequestResult {
         int socketFd;
         uint64_t socketFdGeneration;
-        bool disconnectAllClients;
         bool stopMainLoop;
     };
     // Used to break the event loop and write the created responses
@@ -112,20 +113,24 @@ private:
     Channel<std::variant<NonReadOnlyRequestResult, CloseContextResult>> m_nonReadOnlyRequestResults;
     int m_nonReadOnlyRequestResultsNumEventFd = -1;
     // Only accessed from m_nonReadOnlyWorkerThread
-    bool m_needToDisconnectAllClients = false;
     bool m_needToStopMainLoop = false;
+    // Set in m_nonReadOnlyWorkerThread, read and reset in the main thread.
+    // The problem is that handling requests in the m_nonReadOnlyWorkerThread may write to buffers
+    // in m_fds, so that the main thread will write responses before the main thread will receive
+    // notification through m_nonReadOnlyRequestResultsNumEventFd about the request being completed.
+    // But to avoid situation where clients are still connected after the response was received we
+    // have to disconnect them before writing of the response happens, so we need "faster"
+    // notification method. That is why this is used instead of signaling the need to disconnect all
+    // clients through NonReadOnlyRequestResult.
+    std::atomic_bool m_needToDisconnectAllClients = false;
 
     size_t m_openFdsLimit;
-    typedef std::vector<Descriptor> FDVector;
-    FDVector m_fds;
 
-    std::set<int> m_listenSocketsDisabledBecauseOfHighFdUsage;
-
-    bool m_working;
+    std::vector<Descriptor> m_fds;
+    Epoll m_epoll;
+    std::vector<int> m_fdsToCheckForReadButNotProcessedRequests;
 
-    fd_set m_readSet;
-    fd_set m_writeSet;
-    int m_maxDesc;
+    std::set<int> m_listenSocketsDisabledBecauseOfHighFdUsage;
 
     struct ForStats {
         int clientSocketFd = -1;
@@ -175,24 +180,47 @@ private:
         }
     } m_stats;
 
+    enum class [[nodiscard]] WritingResult {
+        WRITTEN_ALL_DATA,
+        THERE_STILL_IS_DATA_TO_WRITE,
+        ERROR,
+    };
+
+    enum class [[nodiscard]] ReadingResult {
+        OK,
+        STOP_MAIN_LOOP,
+        GOT_EOF,
+        ERROR,
+    };
+
+    enum class [[nodiscard]] HandleReadResult {
+        NEED_MORE_DATA,
+        SUSPENDED_AND_MORE_REQUESTS_MAY_BE_IN_THE_BUFFER,
+        ERROR,
+    };
+
+    enum class [[nodiscard]] HandleNonReadOnlyRequestResultsResult {
+        OK,
+        STOP_MAIN_LOOP,
+    };
+
     void logStats() const;
 
     bool fdsUsageIsHigh() const noexcept {
-        // System becomes unresponsive when number of clients gets close to 1024, no matter if the
-        // open fd limit is higher or not. So we cap it at around 800.
-        return m_stats.openConnections() > std::min<size_t>(800, m_openFdsLimit * 7 / 8);
+        // 50 is a safe margin for number of file descriptors used by other parts of the code
+        return m_stats.openConnections() > std::max<size_t>( m_openFdsLimit, 70) - 50;
     }
 
-    void init(void);
-    void mainLoop(void);
+    void init();
+    void mainLoop();
 
-    void mainLoopStop(void);
+    HandleNonReadOnlyRequestResultsResult handleNonReadOnlyRequestResults();
 
-    void readyForRead(int fd);
-    void readyForWrite(int fd);
+    ReadingResult readyForRead(int fd);
+    WritingResult readyForWrite(int fd);
     void readyForAccept(int fd);
     void closeSocket(int fd);
-    bool handleRead(int fd, const RawBuffer &readbuffer);
+    HandleReadResult handleRead(int fd, const RawBuffer &readBuffer);
 
     int createDomainSocket(ProtocolPtr protocol, const std::string &path, mode_t mask,
                            bool client);
@@ -203,13 +231,9 @@ private:
     void createSignalSocket(ProtocolPtr protocol);
     void createNonReadOnlyRequestResultsNumEventFd();
 
-    Descriptor &createDescriptor(int fd, bool client);
+    Descriptor &createDescriptorWatchedForRead(int fd, bool client);
 
-    void addReadSocket(int fd);
-    void removeReadSocket(int fd);
-    void addWriteSocket(int fd);
-    void removeWriteSocket(int fd);
-    void disconnectAllClients(void);
+    void shrinkFds();
 };
 
 } // namespace Cynara