Simplify socket-manager timeout logic 24/279124/3
authorKonrad Lipinski <k.lipinski2@samsung.com>
Mon, 18 Jul 2022 08:14:44 +0000 (10:14 +0200)
committerKonrad Lipinski <k.lipinski2@samsung.com>
Tue, 2 Aug 2022 10:54:56 +0000 (12:54 +0200)
The intention of the timeout logic is to close stale sockets (ones that
have been inactive for SOCKET_TIMEOUT seconds). The closure doesn't
really have to happen immediately after that, as long as it happens
eventually when, say, security-manager's IO thread wakes up.

* use select() without timeout
* replace timeout priority queue with generation-based management
* each generation lasts at least SOCKET_TIMEOUT seconds
* maintain per-socket activity booleans for the current generation
* a socket becomes active when performing or getting primed for IO
* when a new generation begins, loop through all sockets, time out all
  inactive ones, set all remaining to inactive

Change-Id: I50a06f1566806fa9d7d69fe2367d6ade0f93acf5

src/server/main/include/socket-manager.h
src/server/main/socket-manager.cpp

index f5683de..a9e9e7e 100644 (file)
@@ -75,6 +75,7 @@ private:
     void ReadyForAccept();
     bool GotSigTerm() const;
     void ProcessQueue(void);
+    void ProcessTimeout();
     void NotifyMe(void);
     void CloseSocket(int sock);
 
@@ -82,9 +83,8 @@ private:
 
     struct SocketDescription {
         bool isOpen = false;
-        bool isTimeout = false;
+        bool isActiveThisGeneration;
         unsigned counter = 0;
-        time_t timeout = 0;
         MessageBuffer buffer;
     };
 
@@ -97,24 +97,16 @@ private:
         MessageBuffer buffer;
     };
 
-    struct Timeout {
-        time_t time;
-        int sock;
-        bool operator<(const Timeout &second) const {
-            return time > second.time; // mininum first!
-        }
-    };
-
     SocketDescriptionVector m_socketDescriptionVector;
     Service *m_service = nullptr;
     fd_set m_readSet;
     fd_set m_writeSet;
     int m_maxDesc = 0;
     int m_signalFd, m_listenSock = -1, m_notifyMe;
+    time_t m_nextGenerationStart = 0;
     std::mutex m_eventQueueMutex;
     std::queue<WriteBuffer> m_writeBufferQueue;
     std::queue<ConnectionID> m_closeQueue;
-    std::priority_queue<Timeout> m_timeoutQueue;
 };
 
 } // namespace SecurityManager
index 97b50d4..7fbdb2c 100644 (file)
@@ -75,17 +75,9 @@ void SocketManager::CreateDefaultReadSocketDescription(int sock)
 
     auto &desc = m_socketDescriptionVector[sock];
     desc.isOpen = true;
-    desc.timeout = monotonicCoarseNow() + SOCKET_TIMEOUT;
+    desc.isActiveThisGeneration = true;
     desc.buffer.InitForInput();
 
-    if (false == desc.isTimeout) {
-        desc.isTimeout = true;
-        Timeout tm;
-        tm.time = desc.timeout;
-        tm.sock = sock;
-        m_timeoutQueue.push(tm);
-    }
-
     RegisterFdForReading(sock);
 }
 
@@ -120,7 +112,7 @@ SocketManager::SocketManager()
 SocketManager::~SocketManager() {
     delete m_service;
 
-    for (size_t i = 0; i < m_socketDescriptionVector.size(); ++i)
+    for (int i = 0; i <= m_maxDesc; ++i)
         if (m_socketDescriptionVector[i].isOpen)
             close(i);
 
@@ -171,8 +163,8 @@ bool SocketManager::GotSigTerm() const {
 
 void SocketManager::ReadyForRead(int sock) {
     auto &desc = m_socketDescriptionVector[sock];
+    desc.isActiveThisGeneration = true;
     auto &buffer = desc.buffer;
-    desc.timeout = monotonicCoarseNow() + SOCKET_TIMEOUT;
 
     ssize_t size = read(sock, buffer.Ptr(), buffer.InputSize());
 
@@ -225,6 +217,7 @@ close:
 
 void SocketManager::ReadyForWrite(int sock) {
     auto &desc = m_socketDescriptionVector[sock];
+    desc.isActiveThisGeneration = true;
     auto &buffer = desc.buffer;
     ssize_t result = write(sock, buffer.Ptr(), buffer.OutputSize());
     if (result == -1) {
@@ -242,7 +235,6 @@ void SocketManager::ReadyForWrite(int sock) {
         return; // We do not want to propagate error to next layer
     }
 
-    desc.timeout = monotonicCoarseNow() + SOCKET_TIMEOUT;
     if (buffer.OutputDone(result))
         CloseSocket(sock);
 }
@@ -258,90 +250,11 @@ void SocketManager::MainLoop() {
         fd_set readSet = m_readSet;
         fd_set writeSet = m_writeSet;
 
-        timeval localTempTimeout;
-        timeval *ptrTimeout = &localTempTimeout;
-
-        // I need to extract timeout from priority_queue.
-        // Timeout in priority_queue may be deprecated.
-        // I need to find some actual one.
-        while (!m_timeoutQueue.empty()) {
-            auto &top = m_timeoutQueue.top();
-            auto &desc = m_socketDescriptionVector[top.sock];
-
-            if (top.time == desc.timeout) {
-                // This timeout matches timeout from socket.
-                // It can be used.
-                break;
-            } else {
-                // This socket was used after timeout in priority queue was set up.
-                // We need to update timeout and find some useable one.
-                Timeout tm = { desc.timeout , top.sock};
-                m_timeoutQueue.pop();
-                m_timeoutQueue.push(tm);
-            }
-        }
-
-        if (m_timeoutQueue.empty()) {
-            LogDebug("No usable timeout found.");
-            ptrTimeout = NULL; // select will wait without timeout
-        } else {
-            time_t currentTime = monotonicCoarseNow();
-            auto &pqTimeout = m_timeoutQueue.top();
-
-            // 0 means that select won't block and socket will be closed ;-)
-            ptrTimeout->tv_sec =
-              currentTime < pqTimeout.time ? pqTimeout.time - currentTime : 0;
-            ptrTimeout->tv_usec = 0;
-//            LogDebug("Set up timeout: " << (int)ptrTimeout->tv_sec
-//                << " seconds. Socket: " << pqTimeout.sock);
-        }
-
-        int ret = select(m_maxDesc+1, &readSet, &writeSet, NULL, ptrTimeout);
-
-        if (0 == ret) { // timeout
-            Assert(!m_timeoutQueue.empty());
-
-            Timeout pqTimeout = m_timeoutQueue.top();
-            m_timeoutQueue.pop();
-
-            auto &desc = m_socketDescriptionVector[pqTimeout.sock];
-
-            if (!desc.isTimeout || !desc.isOpen) {
-                // Connection was closed. Timeout is useless...
-                desc.isTimeout = false;
-                continue;
-            }
-
-            if (pqTimeout.time < desc.timeout) {
-                // Is it possible?
-                // This socket was used after timeout. We need to update timeout.
-                pqTimeout.time = desc.timeout;
-                m_timeoutQueue.push(pqTimeout);
-                continue;
-            }
-
-            // timeout from m_timeoutQueue matches with socket.timeout
-            // and connection is open. Time to close it!
-            // Putting new timeout in queue here is pointless.
-            desc.isTimeout = false;
-            LogWarning("Closing socket because of timeout: " << pqTimeout.sock);
-            CloseSocket(pqTimeout.sock);
-
-            // All done. Now we should process next select ;-)
-            continue;
-        }
-
-        if (-1 == ret) {
-            switch (errno) {
-            case EINTR:
-                LogDebug("EINTR in select");
-                break;
-            default:
-                int err = errno;
-                LogError("Error in select: " << GetErrnoString(err));
-                return;
-            }
-            continue;
+        int ret = TEMP_FAILURE_RETRY(select(m_maxDesc+1, &readSet, &writeSet, nullptr, nullptr));
+        if (ret < 0) {
+            int err = errno;
+            LogError("Error in select: " << GetErrnoString(err));
+            return;
         }
 
         if (FD_ISSET(m_signalFd, &readSet)) {
@@ -362,7 +275,7 @@ void SocketManager::MainLoop() {
             ret--;
         }
 
-        for (int i = 0; i < m_maxDesc+1 && ret; ++i) {
+        for (int i = 0; ret; ++i) {
             if (FD_ISSET(i, &readSet)) {
                 ReadyForRead(i);
                 ret--;
@@ -371,7 +284,9 @@ void SocketManager::MainLoop() {
                 ret--;
             }
         }
+
         ProcessQueue();
+        ProcessTimeout();
     }
 }
 
@@ -525,6 +440,7 @@ void SocketManager::ProcessQueue() {
                 continue;
             }
 
+            desc.isActiveThisGeneration = true;
             desc.buffer = std::move(buffer.buffer);
 
             desc.buffer.ModeOutput();
@@ -549,6 +465,21 @@ void SocketManager::ProcessQueue() {
     }
 }
 
+void SocketManager::ProcessTimeout() {
+    if (const auto now = monotonicCoarseNow(); now > m_nextGenerationStart) {
+        m_nextGenerationStart = now + SOCKET_TIMEOUT;
+        for (int i = 0; i <= m_maxDesc; ++i) {
+            auto &desc = m_socketDescriptionVector[i];
+            if (desc.isOpen) {
+                if (!desc.isActiveThisGeneration)
+                    CloseSocket(i);
+                else
+                    desc.isActiveThisGeneration = false;
+            }
+        }
+    }
+}
+
 void SocketManager::CloseSocket(int sock) {
     LogDebug("Closing socket: " << sock);
     auto &desc = m_socketDescriptionVector[sock];