Add logging some statistics about current connections 03/319403/7
authorKrzysztof Malysa <k.malysa@samsung.com>
Tue, 22 Oct 2024 15:32:59 +0000 (17:32 +0200)
committerKrzysztof Malysa <k.malysa@samsung.com>
Thu, 24 Oct 2024 14:47:28 +0000 (16:47 +0200)
Change-Id: I92ebd3c1338fa9d25e8dafcb271608fe83c666a9

src/service/main/Cynara.cpp
src/service/main/Cynara.h
src/service/main/main.cpp
src/service/sockets/SocketManager.cpp
src/service/sockets/SocketManager.h

index 31a9801f3e4175aea98a9e804c3c8ccbe1cb000d..e3959d71f52d6dbb4d50562eba55941db4f9dabd 100644 (file)
@@ -55,11 +55,11 @@ Cynara::~Cynara() {
     finalize();
 }
 
-void Cynara::init(void) {
+void Cynara::init(size_t openFdsLimit) {
     m_agentManager = std::make_shared<AgentManager>();
     m_logic = std::make_shared<Logic>();
     m_pluginManager = std::make_shared<PluginManager>(PathConfig::PluginPath::serviceDir);
-    m_socketManager = std::make_shared<SocketManager>();
+    m_socketManager = std::make_shared<SocketManager>(openFdsLimit);
     m_backup = std::make_shared<EmergencyBackup>(PathConfig::StoragePath::dbDir, PathConfig::BackupPath::dbRODir);
     m_storageBackend = std::make_shared<InMemoryStorageBackend>(PathConfig::StoragePath::dbDir);
     m_storage = std::make_shared<Storage>(*m_storageBackend);
index f639caa91d0bd728ee9dfd8f93983ccb492a74c7..03ead60d4e5db27a061912d10b4780e46bffaafb 100644 (file)
@@ -40,7 +40,7 @@ public:
     Cynara();
     ~Cynara();
 
-    void init(void);
+    void init(size_t openFdsLimit);
     void run(void);
     void finalize(void);
 
index a91aad5c4c0403ea7506366a1c1629a90d030cdb..6d8c48915cb005462737fc81872f99f8eb9e0952 100644 (file)
@@ -117,23 +117,27 @@ int main(int argc, char **argv) {
         init_log();
 
         // Increase the soft limit of open file descriptors
+        size_t openFdsLimit = 1024;
         rlimit limit;
         if (prlimit(0, RLIMIT_NOFILE, nullptr, &limit)) {
             LOGE("prlimit() failed - %s", Cynara::safeStrError(errno).c_str());
         } else {
+            openFdsLimit = limit.rlim_cur;
             LOGW("Open fd limit: %zu", static_cast<size_t>(limit.rlim_cur));
             if (limit.rlim_cur < limit.rlim_max) {
                 limit.rlim_cur = limit.rlim_max;
                 LOGW("Increasing open fd limit to %zu", static_cast<size_t>(limit.rlim_max));
                 if (prlimit(0, RLIMIT_NOFILE, &limit, nullptr)) {
                     LOGE("prlimit() failed - %s", Cynara::safeStrError(errno).c_str());
+                } else {
+                    openFdsLimit = limit.rlim_cur;
                 }
             }
         }
 
         Cynara::Cynara cynara;
         LOGI("Cynara service is starting ...");
-        cynara.init();
+        cynara.init(openFdsLimit);
         LOGI("Cynara service is started");
 
 #ifdef BUILD_WITH_SYSTEMD_DAEMON
index b69cc91d8e4d6c129ccb3d707034625fe65dea04..7be676be77e38f8c744ec1d18273d1aa46f51428 100644 (file)
@@ -26,6 +26,7 @@
  * @brief       This file implements socket layer manager for cynara
  */
 
+#include <atomic>
 #include <csignal>
 #include <cstdint>
 #include <cstring>
@@ -75,7 +76,8 @@
 
 namespace Cynara {
 
-SocketManager::SocketManager() : m_working(false), m_maxDesc(-1) {
+SocketManager::SocketManager(size_t openFdsLimit) : m_openFdsLimit{openFdsLimit}, m_working(false),
+                                                    m_maxDesc(-1) {
     FD_ZERO(&m_readSet);
     FD_ZERO(&m_writeSet);
 }
@@ -88,6 +90,19 @@ void SocketManager::run(void) {
     mainLoop();
 }
 
+void SocketManager::logStats() const {
+    LOGW("stats: pendingNonReadOnlyRequests: %llu",
+         static_cast<unsigned long long>(m_stats.pendingNonReadOnlyRequests));
+    LOGW("stats: clients: %zu, clients without requests: %zu", m_stats.clients.fds.size(),
+         m_stats.clients.fdsYetWithoutRequest.size());
+    LOGW("stats: admins: %zu, admins without requests: %zu", m_stats.admins.fds.size(),
+         m_stats.admins.fdsYetWithoutRequest.size());
+    LOGW("stats: agents: %zu, agents without requests: %zu", m_stats.agents.fds.size(),
+         m_stats.agents.fdsYetWithoutRequest.size());
+    LOGW("stats: monitors: %zu monitors without requests: %zu", m_stats.monitors.fds.size(),
+         m_stats.monitors.fdsYetWithoutRequest.size());
+}
+
 void SocketManager::init(void) {
     LOGI("SocketManger init start");
     const mode_t clientSocketUMask(0);
@@ -95,14 +110,18 @@ void SocketManager::init(void) {
     const mode_t agentSocketUMask(0);
     const mode_t monitorSocketUMask(0);
 
-    createDomainSocket(std::make_shared<ProtocolClient>(), PathConfig::SocketPath::client,
-                       clientSocketUMask, true);
-    createDomainSocket(std::make_shared<ProtocolAdmin>(), PathConfig::SocketPath::admin,
-                       adminSocketUMask0077, false);
-    createDomainSocket(std::make_shared<ProtocolAgent>(), PathConfig::SocketPath::agent,
-                       agentSocketUMask, false);
-    createDomainSocket(std::make_shared<ProtocolMonitorGet>(), PathConfig::SocketPath::monitorGet,
-                       monitorSocketUMask, false);
+    m_forStats.clientSocketFd = createDomainSocket(std::make_shared<ProtocolClient>(),
+                                                   PathConfig::SocketPath::client, clientSocketUMask,
+                                                   true);
+    m_forStats.adminSocketFd = createDomainSocket(std::make_shared<ProtocolAdmin>(),
+                                                  PathConfig::SocketPath::admin, adminSocketUMask0077,
+                                                  false);
+    m_forStats.agentSocketFd = createDomainSocket(std::make_shared<ProtocolAgent>(),
+                                                  PathConfig::SocketPath::agent, agentSocketUMask,
+                                                  false);
+    m_forStats.monitorSocketFd = createDomainSocket(std::make_shared<ProtocolMonitorGet>(),
+                                                    PathConfig::SocketPath::monitorGet,
+                                                    monitorSocketUMask, false);
     createSignalSocket(std::make_shared<ProtocolSignal>());
     createNonReadOnlyRequestResultsNumEventFd();
     // Initialize RO logic
@@ -144,6 +163,7 @@ void SocketManager::mainLoop(void) {
                 LOGD("non-read-only logic worker thread: handling request with socket fd [%i] with"
                      " generation [%i] and sequence number [%i]", req.socketFd,
                      req.socketFdGeneration, static_cast<int>(req.request->sequenceNumber()));
+                m_stats.pendingNonReadOnlyRequests.fetch_sub(1, std::memory_order_relaxed);
                 // Execute the request.
                 auto context = RequestContext(req.protocol, req.writeQueue, req.socketFd);
                 req.request->execute(*m_logic, context);
@@ -177,6 +197,12 @@ void SocketManager::mainLoop(void) {
         fd_set readSet = m_readSet;
         fd_set writeSet = m_writeSet;
 
+#if BUILD_TYPE_DEBUG
+        logStats();
+#else
+        if (fdsUsageIsHigh())
+            logStats();
+#endif
         int ret = select(m_maxDesc + 1, &readSet, &writeSet, nullptr, nullptr);
 
         if (ret < 0) {
@@ -349,6 +375,22 @@ void SocketManager::readyForAccept(int fd) {
     }
     LOGD("Accept on sock [%d]. New client socket opened [%d]", fd, clientFd);
 
+    if (fd == m_forStats.clientSocketFd) {
+        m_stats.clients.newFd(clientFd);
+    } else if (fd == m_forStats.adminSocketFd) {
+        m_stats.admins.newFd(clientFd);
+    } else if (fd == m_forStats.agentSocketFd) {
+        m_stats.agents.newFd(clientFd);
+    } else if (fd == m_forStats.monitorSocketFd) {
+        m_stats.monitors.newFd(clientFd);
+    }
+#if BUILD_TYPE_DEBUG
+        logStats();
+#else
+        if (fdsUsageIsHigh())
+            logStats();
+#endif
+
     auto &desc = createDescriptor(clientFd, m_fds[fd].isClient());
     desc.setListen(false);
     desc.setProtocol(m_fds[fd].protocol()->clone());
@@ -364,6 +406,10 @@ void SocketManager::closeSocket(int fd) {
         fd,
         desc.writeQueue(),
     });
+
+    // Statistics
+    m_stats.closeFd(fd);
+
     removeReadSocket(fd);
     removeWriteSocket(fd);
     desc.clear();
@@ -383,6 +429,7 @@ bool SocketManager::handleRead(int fd, const RawBuffer &readbuffer) {
             if (!req)   // not enough data to build request yet
                 break;
             LOGD("request extracted");
+            m_stats.seenRequestFor(fd);
 
             auto guard = std::unique_lock{m_readOnlyLogicLock};
             if (req->canBeExecutedReadOnly(*m_readOnlyLogic)) {
@@ -404,6 +451,7 @@ bool SocketManager::handleRead(int fd, const RawBuffer &readbuffer) {
 
                 LOGD("Passing request to the non-read-only logic");
                 // Pass the request to the m_nonReadOnlyWorkerThread.
+                m_stats.pendingNonReadOnlyRequests.fetch_add(1, std::memory_order_relaxed);
                 m_nonReadOnlyRequests.send(NonReadOnlyRequest{
                     fd,
                     desc.getGeneration(),
@@ -422,8 +470,8 @@ bool SocketManager::handleRead(int fd, const RawBuffer &readbuffer) {
     return true;
 }
 
-void SocketManager::createDomainSocket(ProtocolPtr protocol, const std::string &path, mode_t mask,
-                                       bool client) {
+int SocketManager::createDomainSocket(ProtocolPtr protocol, const std::string &path, mode_t mask,
+                                      bool client) {
     int fd;
 #ifdef BUILD_WITH_SYSTEMD_DAEMON
     fd = getSocketFromSystemD(path);
@@ -438,6 +486,7 @@ void SocketManager::createDomainSocket(ProtocolPtr protocol, const std::string &
     addReadSocket(fd);
 
     LOGD("Domain socket: [%d] added.", fd);
+    return fd;
 }
 
 int SocketManager::createDomainSocketHelp(const std::string &path, mode_t mask) {
index 0860ae78da10ecb69763132bd3b34c1f307ad38b..8c6be64e9f45e10371f8531fd43389d64c23de25 100644 (file)
@@ -28,6 +28,7 @@
 #ifndef SRC_SERVICE_SOCKETS_SOCKETMANAGER_H_
 #define SRC_SERVICE_SOCKETS_SOCKETMANAGER_H_
 
+#include <atomic>
 #include <cstdio>
 #include <memory>
 #include <thread>
@@ -51,7 +52,7 @@ const size_t DEFAULT_BUFFER_SIZE = BUFSIZ;
 
 class SocketManager {
 public:
-    SocketManager();
+    explicit SocketManager(size_t openFdsLimit);
     ~SocketManager();
 
     void run(void);
@@ -113,6 +114,7 @@ private:
     bool m_needToDisconnectAllClients = false;
     bool m_needToStopMainLoop = false;
 
+    size_t m_openFdsLimit;
     typedef std::vector<Descriptor> FDVector;
     FDVector m_fds;
 
@@ -122,6 +124,62 @@ private:
     fd_set m_writeSet;
     int m_maxDesc;
 
+    struct ForStats {
+        int clientSocketFd;
+        int adminSocketFd;
+        int agentSocketFd;
+        int monitorSocketFd;
+    } m_forStats;
+
+    struct Stats {
+        struct ProtocolStats {
+            std::set<int> fds;
+            std::set<int> fdsYetWithoutRequest;
+
+            void newFd(int fd) {
+                fds.emplace(fd);
+                fdsYetWithoutRequest.emplace(fd);
+            }
+
+            void closeFd(int fd) {
+                fds.erase(fd);
+                fdsYetWithoutRequest.erase(fd);
+            }
+        };
+
+        std::atomic_uint64_t pendingNonReadOnlyRequests = 0;
+        ProtocolStats clients;
+        ProtocolStats admins;
+        ProtocolStats agents;
+        ProtocolStats monitors;
+
+        size_t openConnections() const noexcept {
+            return clients.fds.size() + admins.fds.size() + agents.fds.size() + monitors.fds.size();
+        }
+
+        void seenRequestFor(int fd) {
+            clients.fdsYetWithoutRequest.erase(fd);
+            admins.fdsYetWithoutRequest.erase(fd);
+            agents.fdsYetWithoutRequest.erase(fd);
+            monitors.fdsYetWithoutRequest.erase(fd);
+        }
+
+        void closeFd(int fd) {
+            clients.closeFd(fd);
+            admins.closeFd(fd);
+            agents.closeFd(fd);
+            monitors.closeFd(fd);
+        }
+    } m_stats;
+
+    void logStats() const;
+
+    bool fdsUsageIsHigh() const noexcept {
+        // System becomes unresponsive when number of clients gets close to 1024, no matter if the
+        // open fd limit is higher or not. So we cap it at around 800.
+        return m_stats.openConnections() > std::min<size_t>(800, m_openFdsLimit * 7 / 8);
+    }
+
     void init(void);
     void mainLoop(void);
 
@@ -133,8 +191,8 @@ private:
     void closeSocket(int fd);
     bool handleRead(int fd, const RawBuffer &readbuffer);
 
-    void createDomainSocket(ProtocolPtr protocol, const std::string &path, mode_t mask,
-                            bool client);
+    int createDomainSocket(ProtocolPtr protocol, const std::string &path, mode_t mask,
+                           bool client);
     static int createDomainSocketHelp(const std::string &path, mode_t mask);
 #ifdef BUILD_WITH_SYSTEMD_DAEMON
     static int getSocketFromSystemD(const std::string &path);