--- /dev/null
+/*
+ * Copyright (c) 2024 Samsung Electronics Co., Ltd All Rights Reserved
+ *
+ * This file is licensed under the terms of MIT License or the Apache License
+ * Version 2.0 of your choice. See the LICENSE.MIT file for MIT license details.
+ * See the LICENSE file or the notice below for Apache License Version 2.0
+ * details.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * @file src/service/sockets/Epoll.h
+ * @author Krzysztof Malysa <k.malysa@samsung.com>
+ * @version 1.0
+ * @brief This file declares and implements epoll() abstraction class
+ */
+
+#include <array>
+#include <cstdint>
+#include <cstring>
+#include <error/SafeStrError.h>
+#include <exceptions/UnexpectedErrorException.h>
+#include <log/log.h>
+#include <sys/epoll.h>
+#include <unistd.h>
+#include <vector>
+
+namespace Cynara {
+
+class Epoll {
+public:
+ Epoll() : m_epollFd{epoll_create1(EPOLL_CLOEXEC)} {
+ if (m_epollFd < 0) {
+ int err = errno;
+ LOGE("Failed to create epoll fd <%s>", safeStrError(err).c_str());
+ throw UnexpectedErrorException(err, safeStrError(err));
+ }
+ }
+
+ ~Epoll() { (void)close(m_epollFd); }
+
+ Epoll(const Epoll &) = delete;
+ Epoll(Epoll &&) = delete;
+ Epoll &operator=(const Epoll &) = delete;
+ Epoll &operator=(Epoll &&) = delete;
+
+ struct Events {
+ bool readable : 1;
+ bool writable : 1;
+ bool error : 1;
+ bool hangup : 1;
+ };
+
+ struct FdWithEvents : public Events {
+ int fd;
+ };
+
+ struct EventsToWatch {
+ bool readable : 1;
+ bool writable : 1;
+ };
+
+ void startWatching(int fd, EventsToWatch eventsToWatch) {
+ size_t minRequiredSize = static_cast<size_t>(fd) * 2 + 2;
+ if (m_eventsToWatch.size() < minRequiredSize) {
+ m_eventsToWatch.resize(minRequiredSize);
+ }
+
+ m_eventsToWatch[fd << 1] = eventsToWatch.readable;
+ m_eventsToWatch[fd << 1 | 1] = eventsToWatch.writable;
+
+ epoll_event ev;
+ ev.events = eventsToWatchToEpollEvents(eventsToWatch);
+ std::memset(&ev.data, 0, sizeof(ev.data));
+ ev.data.fd = fd;
+ if (epoll_ctl(m_epollFd, EPOLL_CTL_ADD, fd, &ev)) {
+ int err = errno;
+ LOGE("epoll_ctl(ADD, %i) failed <%s>", fd, safeStrError(err).c_str());
+ throw UnexpectedErrorException(err, safeStrError(err));
+ }
+ LOGD("epoll: started watching fd [%i] for %s", fd, eventsToWatchToStr(eventsToWatch));
+ }
+
+ void stopWatching(int fd) {
+ if (epoll_ctl(m_epollFd, EPOLL_CTL_DEL, fd, nullptr)) {
+ int err = errno;
+ LOGE("epoll_ctl(DEL, %i) failed <%s>", fd, safeStrError(err).c_str());
+ throw UnexpectedErrorException(err, safeStrError(err));
+ }
+ // We don't care about m_eventsToWatch, since they are reset upon reuse
+ LOGD("epoll: stopped watching fd [%i]", fd);
+ }
+
+ EventsToWatch getWatchedEvents(int fd) const noexcept {
+ return EventsToWatch{m_eventsToWatch[fd << 1], m_eventsToWatch[fd << 1 | 1]};
+ }
+
+ void watchReadEvents(int fd, bool watch) {
+ auto watchedEvents = getWatchedEvents(fd);
+ watchedEvents.readable = watch;
+ changeWatchedEvents(fd, watchedEvents);
+ }
+
+ void watchWriteEvents(int fd, bool watch) {
+ auto watchedEvents = getWatchedEvents(fd);
+ watchedEvents.writable = watch;
+ changeWatchedEvents(fd, watchedEvents);
+ }
+
+ void changeWatchedEvents(int fd, EventsToWatch newEventsToWatch) {
+ epoll_event ev;
+ ev.events = eventsToWatchToEpollEvents(newEventsToWatch);
+ std::memset(&ev.data, 0, sizeof(ev.data));
+ ev.data.fd = fd;
+ if (epoll_ctl(m_epollFd, EPOLL_CTL_MOD, fd, &ev)) {
+ int err = errno;
+ LOGE("epoll_ctl(MOD, %i) failed <%s>", fd, safeStrError(err).c_str());
+ throw UnexpectedErrorException(err, safeStrError(err));
+ }
+ // Needs to be done after successful epoll_ctl()
+ m_eventsToWatch[fd << 1] = newEventsToWatch.readable;
+ m_eventsToWatch[fd << 1 | 1] = newEventsToWatch.writable;
+ LOGD("epoll: changed watched events on fd [%i] to %s",
+ fd,
+ eventsToWatchToStr(newEventsToWatch));
+ }
+
+ template <size_t MAX_EVENTS_NUM>
+ std::vector<FdWithEvents> awaitEvents() {
+ std::array<epoll_event, MAX_EVENTS_NUM> events;
+ int eventsNum;
+ for (;;) {
+ eventsNum = epoll_wait(m_epollFd, events.data(), events.size(), -1);
+ if (eventsNum < 0) {
+ if (errno == EINTR)
+ continue;
+ int err = errno;
+ LOGE("epoll_wait() failed: <%s>", safeStrError(err).c_str());
+ throw UnexpectedErrorException(err, safeStrError(err));
+ }
+ if (eventsNum == 0) {
+ LOGE("epoll_wait() unexpectedly returned 0");
+ throw UnexpectedErrorException{"epoll_wait() unexpectedly returned 0"};
+ }
+ break;
+ }
+
+ std::vector<FdWithEvents> res;
+ res.reserve(eventsNum);
+ for (int i = 0; i < eventsNum; ++i) {
+ const auto &event = events[i];
+ res.emplace_back(FdWithEvents{
+ {
+ static_cast<bool>(event.events & EPOLLIN),
+ static_cast<bool>(event.events & EPOLLOUT),
+ static_cast<bool>(event.events & EPOLLERR),
+ static_cast<bool>(event.events & EPOLLHUP),
+ },
+ event.data.fd,
+ });
+ }
+ return res;
+ }
+
+private:
+ [[nodiscard]] static decltype(epoll_event::events)
+ eventsToWatchToEpollEvents(EventsToWatch eventsToWatch) noexcept {
+ decltype(epoll_event::events) res = 0;
+ if (eventsToWatch.readable)
+ res |= EPOLLIN;
+ if (eventsToWatch.writable)
+ res |= EPOLLOUT;
+ return res;
+ }
+
+ [[nodiscard]] static const char *eventsToWatchToStr(EventsToWatch eventsToWatch) noexcept {
+ if (eventsToWatch.readable) {
+ if (eventsToWatch.writable) {
+ return "READ & WRITE";
+ }
+ return "READ";
+ }
+ if (eventsToWatch.writable) {
+ return "WRITE";
+ }
+ return "just errors";
+ }
+
+ int m_epollFd;
+ std::vector<bool> m_eventsToWatch; // memory-optimal way of storing the RW bits
+};
+
+} // namespace Cynara
* @file src/service/sockets/SocketManager.cpp
* @author Lukasz Wojciechowski <l.wojciechow@partner.samsung.com>
* @author Adam Malinowski <a.malinowsk2@partner.samsung.com>
+ * @author Krzysztof Malysa <k.malysa@samsung.com>
* @version 1.0
* @brief This file implements socket layer manager for cynara
*/
+#include <array>
#include <atomic>
+#include <cassert>
+#include <cinttypes>
#include <csignal>
#include <cstdint>
#include <cstring>
#include <memory>
#include <mutex>
#include <sys/eventfd.h>
-#include <sys/select.h>
#include <sys/signalfd.h>
#include <sys/socket.h>
#include <sys/stat.h>
#include <attributes/attributes.h>
#include <common.h>
#include <config/PathConfig.h>
+#include <containers/BinaryQueue.h>
+#include <containers/MutexedBinaryQueue.h>
+#include <containers/RawBuffer.h>
#include <error/SafeStrError.h>
#include <exceptions/DescriptorNotExistsException.h>
#include <exceptions/InitException.h>
#include <exceptions/UnexpectedErrorException.h>
#include <log/log.h>
-
-#include <containers/BinaryQueue.h>
-#include <containers/MutexedBinaryQueue.h>
-#include <containers/RawBuffer.h>
#include <logic/Logic.h>
#include <main/Cynara.h>
#include <protocol/ProtocolAdmin.h>
#include <request/pointers.h>
#include <request/RequestContext.h>
#include <response/pointers.h>
+#include <utils/CallInDestructor.h>
#include "SocketManager.h"
namespace Cynara {
-SocketManager::SocketManager(size_t openFdsLimit) : m_openFdsLimit{openFdsLimit}, m_working(false),
- m_maxDesc(-1) {
- FD_ZERO(&m_readSet);
- FD_ZERO(&m_writeSet);
-}
-
-SocketManager::~SocketManager() {
-}
-
-void SocketManager::run(void) {
+void SocketManager::run() {
init();
mainLoop();
}
m_stats.monitors.fdsYetWithoutRequest.size());
}
-void SocketManager::init(void) {
- LOGI("SocketManger init start");
+void SocketManager::init() {
+ LOGI("SocketManager init start");
+ auto functionDoneLogger = CallInDestructor{[] {
+ LOGI("SocketManager init end");
+ }};
+
const mode_t clientSocketUMask(0);
const mode_t adminSocketUMask0077(S_IRWXG | S_IRWXO);
const mode_t agentSocketUMask(0);
createNonReadOnlyRequestResultsNumEventFd();
// Initialize RO logic
m_readOnlyLogic = m_logic->createReadOnlyCopy();
-
- LOGI("SocketManger init done");
}
-void SocketManager::mainLoop(void) {
- LOGI("SocketManger mainLoop start");
+void SocketManager::mainLoop() {
+ LOGI("SocketManager mainLoop start");
+ auto functionDoneLogger = CallInDestructor{[] {
+ LOGI("SocketManager mainLoop end");
+ }};
m_nonReadOnlyWorkerThread = std::thread([this] {
for (;;) {
// Execute the request.
auto context = RequestContext(req.protocol, req.writeQueue, req.socketFd);
req.request->execute(*m_logic, context);
- LOGD("Response size: [%i]",
+ LOGD("non-read-only logic worker thread: response size: [%i]",
static_cast<int>(req.writeQueue->lock()->size()));
// Install new RO logic
{
m_readOnlyLogic = std::move(readOnlyLogic);
}
LOGD("non-read-only logic worker thread: sending response to request with socket fd"
- " [%i] with generation [%i] and sequence number [%i] of size [%i]",
+ " [%i] with generation [%" PRIu64 "] and sequence number [%i] of size [%i]",
req.socketFd, req.socketFdGeneration,
static_cast<int>(req.request->sequenceNumber()),
static_cast<int>(req.writeQueue->lock()->size()));
m_nonReadOnlyRequestResults.send(NonReadOnlyRequestResult{
req.socketFd,
req.socketFdGeneration,
- std::exchange(m_needToDisconnectAllClients, false),
std::exchange(m_needToStopMainLoop, false),
});
notifyTheMainThread();
}
});
- m_working = true;
- while (m_working) {
- fd_set readSet = m_readSet;
- fd_set writeSet = m_writeSet;
+ // End the other thread on loop exit or stack unwinding.
+ auto threadGuard = CallInDestructor{[this] {
+ m_nonReadOnlyRequests.send(NoMoreRequests{});
+ m_nonReadOnlyWorkerThread.join();
+ }};
+ for (;;) {
#if BUILD_TYPE_DEBUG
logStats();
#else
if (fdsUsageIsHigh())
logStats();
#endif
- int ret = select(m_maxDesc + 1, &readSet, &writeSet, nullptr, nullptr);
-
- if (ret < 0) {
- switch (errno) {
- case EINTR:
+ auto events = m_epoll.awaitEvents<32>();
+ // We do not have to worry about outdated events (a scenario where an event is reported for
+ // a file descriptor but processing earlier events in this batch caused the file descriptor
+ // to be closed and potentially other file descriptor might be opened with the same number
+ // before we started processing the event for it) because we close the file descriptor only
+ // after processing event reported for it is done.
+ for (const auto& event : events) {
+ // There are some cases where client writes a request and closes the connection and the
+ // request is valid and should be processed. This is the case with
+ // MonitorEntriesPutRequest. So we have to process readability event after the socket
+ // closes.
+ if ((event.error || event.hangup) && !event.readable) {
+ LOGN("Socket [%i] closed on the other end without becoming readable", event.fd);
+ closeSocket(event.fd);
continue;
- default:
- int err = errno;
- throw UnexpectedErrorException(err, safeStrError(err));
}
- } else if (ret > 0) {
- // First, write responses
- int readyForReadNum = 0;
- for (int i = 0; i < m_maxDesc + 1 && ret; ++i) {
- if (FD_ISSET(i, &writeSet)) {
- readyForWrite(i);
- --ret;
- }
- if (FD_ISSET(i, &readSet)) {
- --ret;
- ++readyForReadNum;
+ // Write first.
+ if (event.writable) {
+ switch (readyForWrite(event.fd)) {
+ case WritingResult::THERE_STILL_IS_DATA_TO_WRITE: break;
+ case WritingResult::WRITTEN_ALL_DATA:
+ m_epoll.watchWriteEvents(event.fd, false);
+ break;
+ case WritingResult::ERROR:
+ closeSocket(event.fd);
+ continue;
}
}
- // Then accept new connections and requests
- for (int i = 0; i < m_maxDesc + 1 && readyForReadNum; ++i) {
- if (FD_ISSET(i, &readSet)) {
- readyForRead(i);
- --readyForReadNum;
+ // File descriptor might have been marked to ignore reads temporarily.
+ if (event.readable && m_epoll.getWatchedEvents(event.fd).readable) {
+ switch (readyForRead(event.fd)) {
+ case ReadingResult::OK: break;
+ case ReadingResult::STOP_MAIN_LOOP: return;
+ case ReadingResult::ERROR:
+ // We assume we can drop already generated but not yet sent responses to
+ // previous requests.
+ case ReadingResult::GOT_EOF:
+ // We assume (client is implemented in such a way) that client will not read
+ // from socket after closing it for writing, so we can close both ends after we
+ // see the client closed their write end.
+ closeSocket(event.fd);
+ continue;
}
}
+ }
- LOGD("checking sockets <= %i for data to write", m_maxDesc);
- for (int i = 0; i < m_maxDesc + 1; ++i) {
- if (m_fds[i].isUsed() && m_fds[i].hasDataToWrite()) {
- LOGD("socket [%i] has data to write", i);
- addWriteSocket(i);
- }
+ // Some sockets might have been marked readable again and may already have serialized
+ // requests in the buffer.
+ for (int fd : m_fdsToCheckForReadButNotProcessedRequests) {
+ switch (handleRead(fd, RawBuffer{})) {
+ case HandleReadResult::NEED_MORE_DATA:
+ case HandleReadResult::SUSPENDED_AND_MORE_REQUESTS_MAY_BE_IN_THE_BUFFER:
+ break;
+ case HandleReadResult::ERROR:
+ // We assume we can drop already generated but not yet sent responses to previous
+ // requests.
+ closeSocket(fd);
+ break;
+ }
+ }
+ m_fdsToCheckForReadButNotProcessedRequests.clear();
+
+ // TODO: do it more optimally by some marking which fds have data to write
+ LOGD("checking sockets < %zu for data to write", m_fds.size());
+ for (size_t fd = 0; fd < m_fds.size(); ++fd) {
+ if (m_fds[fd].isUsed() && m_fds[fd].hasDataToWrite()) {
+ LOGD("socket [%i] has data to write", fd);
+ m_epoll.watchWriteEvents(fd, true);
}
}
- }
-
- m_nonReadOnlyRequests.send(NoMoreRequests{});
- m_nonReadOnlyWorkerThread.join();
- LOGI("SocketManger mainLoop done");
+ // If we noticed that some socket has data to write. We have to check
+ // m_needToDisconnectAllClients and disconnect all clients if true BEFORE writing
+ // the response. All this to retain soundness of the client and admin API.
+ // Now we are safe to disconnect all clients without invalidating any file descriptor that
+ // will be used somewhere else in this function.
+ if (m_needToDisconnectAllClients.exchange(false)) {
+ LOGD("SocketManager disconnecting all clients");
+ for (size_t fd = 0; fd < m_fds.size(); ++fd) {
+ const auto& desc = m_fds[fd];
+ if (desc.isUsed() && desc.isClient() && !desc.isListen())
+ closeSocket(fd);
+ }
+ shrinkFds();
+ }
+ }
}
-void SocketManager::mainLoopStop(void) {
- m_working = false;
+SocketManager::HandleNonReadOnlyRequestResultsResult
+SocketManager::handleNonReadOnlyRequestResults() {
+ uint64_t resNum = 0;
+ if (read(m_nonReadOnlyRequestResultsNumEventFd, &resNum, sizeof(resNum)) != sizeof(resNum)) {
+ int err = errno;
+ LOGE("Failed to read from eventfd <%s>", safeStrError(err).c_str());
+ throw UnexpectedErrorException(err, safeStrError(err));
+ }
+ for (uint64_t i = 0; i < resNum; ++i) {
+ auto resV = m_nonReadOnlyRequestResults.recv();
+ if (std::holds_alternative<CloseContextResult>(resV))
+ continue;
+
+ if (std::holds_alternative<NonReadOnlyRequestResult>(resV)) {
+ auto& res = std::get<NonReadOnlyRequestResult>(resV);
+ LOGD("main thread: handling NonReadOnlyRequestResult for socket fd [%i] with generation"
+ " [%i]", res.socketFd, res.socketFdGeneration);
+ auto& desc = m_fds[res.socketFd];
+ if (desc.isUsed() && desc.getGeneration() == res.socketFdGeneration) {
+ // Descriptor was not closed and was not reused.
+ // There may be some requests from the socket that are already read into
+ // the buffer but not processed. Schedule reading them.
+ m_epoll.watchReadEvents(res.socketFd, true);
+ m_fdsToCheckForReadButNotProcessedRequests.emplace_back(res.socketFd);
+ }
+
+ if (res.stopMainLoop)
+ return HandleNonReadOnlyRequestResultsResult::STOP_MAIN_LOOP;
+ }
+ }
+ return HandleNonReadOnlyRequestResultsResult::OK;
}
-void SocketManager::readyForRead(int fd) {
- LOGD("SocketManger readyForRead on fd [%d] start", fd);
+SocketManager::ReadingResult SocketManager::readyForRead(int fd) {
+ LOGD("SocketManager readyForRead on fd [%d] start", fd);
+ auto functionDoneLogger = CallInDestructor{[fd] {
+ LOGD("SocketManager readyForRead on fd [%d] end", fd);
+ }};
if (fd == m_nonReadOnlyRequestResultsNumEventFd) {
LOGD("SocketManager m_nonReadOnlyRequestResultsNumEventFd is ready for read");
- uint64_t resNum = 0;
- if (read(fd, &resNum, sizeof(resNum)) != sizeof(resNum)) {
- int err = errno;
- LOGE("Failed to read from eventfd <%s>", safeStrError(err).c_str());
- throw UnexpectedErrorException(err, safeStrError(err));
+ switch (handleNonReadOnlyRequestResults()) {
+ case HandleNonReadOnlyRequestResultsResult::OK: return ReadingResult::OK;
+ case HandleNonReadOnlyRequestResultsResult::STOP_MAIN_LOOP:
+ return ReadingResult::STOP_MAIN_LOOP;
}
- for (uint64_t i = 0; i < resNum; ++i) {
- auto resV = m_nonReadOnlyRequestResults.recv();
- if (std::holds_alternative<CloseContextResult>(resV))
- continue;
-
- if (std::holds_alternative<NonReadOnlyRequestResult>(resV)) {
- auto& res = std::get<NonReadOnlyRequestResult>(resV);
- LOGD("main thread: handling response to request with socket fd [%i] with"
- " generation [%i]",
- res.socketFd, res.socketFdGeneration);
- if (res.socketFd == -1)
- continue;
- // Handle the response
- auto& desc = m_fds[res.socketFd];
- if (desc.isUsed() && desc.getGeneration() == res.socketFdGeneration) {
- // Descriptor was not closed and was not reused.
- // Now we can safely read and process the other requests from the socket.
- addReadSocket(res.socketFd);
- // Process next requests if there are any on this socket
- handleRead(res.socketFd, RawBuffer{});
- }
- // Process extra events
- if (res.disconnectAllClients)
- disconnectAllClients();
- if (res.stopMainLoop)
- mainLoopStop();
- }
- }
- return;
}
auto &desc = m_fds[fd];
if (desc.isListen()) {
readyForAccept(fd);
- return;
+ return ReadingResult::OK;
}
RawBuffer readBuffer(DEFAULT_BUFFER_SIZE);
ssize_t size = read(fd, readBuffer.data(), DEFAULT_BUFFER_SIZE);
-
if (size > 0) {
LOGD("read [%zd] bytes", size);
readBuffer.resize(size);
- if (handleRead(fd, readBuffer)) {
- LOGD("SocketManger readyForRead on fd [%d] successfully done", fd);
- return;
+ switch (handleRead(fd, readBuffer)) {
+ case HandleReadResult::NEED_MORE_DATA:
+ case HandleReadResult::SUSPENDED_AND_MORE_REQUESTS_MAY_BE_IN_THE_BUFFER:
+ return ReadingResult::OK;
+ case HandleReadResult::ERROR:
+ LOGI("interpreting buffer read from [%d] failed", fd);
+ return ReadingResult::ERROR;
}
- LOGI("interpreting buffer read from [%d] failed", fd);
- } else if (size < 0) {
+ }
+
+ if (size < 0) {
int err = errno;
switch (err) {
case EAGAIN:
case EWOULDBLOCK:
#endif
case EINTR:
- return;
+ return ReadingResult::OK; // Reading will be tried again later.
default:
- LOGW("While reading from [%d] socket, error [%d]:<%s>",
- fd, err, safeStrError(err).c_str());
+ LOGW("While reading from [%d] socket, error [%d]: <%s>", fd, err,
+ safeStrError(err).c_str());
+ return ReadingResult::ERROR;
}
- } else {
- LOGN("Socket [%d] closed on other end", fd);
}
- closeSocket(fd);
- LOGD("SocketManger readyForRead on fd [%d] done", fd);
+
+ LOGN("Socket [%d] closed on other end", fd);
+ return ReadingResult::GOT_EOF;
}
-void SocketManager::readyForWrite(int fd) {
- LOGD("SocketManger readyForWrite on fd [%d] start", fd);
+SocketManager::WritingResult SocketManager::readyForWrite(int fd) {
+ LOGD("SocketManager readyForWrite on fd [%d] start", fd);
+ auto functionDoneLogger = CallInDestructor{[fd] {
+ LOGD("SocketManager readyForWrite on fd [%d] end", fd);
+ }};
+
auto &desc = m_fds[fd];
auto &buffer = desc.prepareWriteBuffer();
size_t size = buffer.size();
switch (err) {
case EAGAIN:
case EINTR:
- // select will trigger write once again, nothing to do
- break;
+ // epoll will trigger write once again, nothing to do
+ return WritingResult::THERE_STILL_IS_DATA_TO_WRITE;
case EPIPE:
default:
- LOGD("Error during write to fd [%d]:<%s> ", fd, safeStrError(err).c_str());
- closeSocket(fd);
- break;
+ LOGD("Error during write to fd [%d]: <%s> ", fd, safeStrError(err).c_str());
+ return WritingResult::ERROR;
}
- return; // We do not want to propagate error to next layer
}
LOGD("written [%zd] bytes", result);
buffer.erase(buffer.begin(), buffer.begin() + result);
- if (buffer.empty())
- removeWriteSocket(fd);
- LOGD("SocketManger readyForWrite on fd [%d] done", fd);
+ return buffer.empty() ? WritingResult::WRITTEN_ALL_DATA :
+ WritingResult::THERE_STILL_IS_DATA_TO_WRITE;
}
void SocketManager::readyForAccept(int fd) {
+ LOGD("SocketManager readyForAccept on fd [%d] start", fd);
+ auto functionDoneLogger = CallInDestructor{[fd] {
+ LOGD("SocketManager readyForAccept on fd [%d] end", fd);
+ }};
+
if (fdsUsageIsHigh()) {
- LOGD("SocketManger readyForAccept on fd [%d]: high memory usage -> stop listening", fd);
+ LOGD("SocketManager readyForAccept on fd [%d]: high memory usage -> stop listening", fd);
m_listenSocketsDisabledBecauseOfHighFdUsage.emplace(fd);
- removeReadSocket(fd);
+
+ m_epoll.watchReadEvents(fd, false);
return;
}
- LOGD("SocketManger readyForAccept on fd [%d] start", fd);
struct sockaddr_un clientAddr;
unsigned int clientLen = sizeof(clientAddr);
int clientFd = accept4(fd, (struct sockaddr*) &clientAddr, &clientLen, SOCK_NONBLOCK);
logStats();
#endif
- auto &desc = createDescriptor(clientFd, m_fds[fd].isClient());
+ auto &desc = createDescriptorWatchedForRead(clientFd, m_fds[fd].isClient());
desc.setListen(false);
desc.setProtocol(m_fds[fd].protocol()->clone());
desc.setReadOnlyProtocol(m_fds[fd].protocol()->clone());
- addReadSocket(clientFd);
- LOGD("SocketManger readyForAccept on fd [%d] done", fd);
}
void SocketManager::closeSocket(int fd) {
- LOGD("SocketManger closeSocket fd [%d] start", fd);
+ LOGD("SocketManager closeSocket fd [%d] start", fd);
+ auto functionDoneLogger = CallInDestructor{[fd] {
+ LOGD("SocketManager closeSocket fd [%d] end", fd);
+ }};
+
+ m_epoll.stopWatching(fd);
+
Descriptor &desc = m_fds[fd];
m_nonReadOnlyRequests.send(CloseContext{
fd,
// Statistics
m_stats.closeFd(fd);
- removeReadSocket(fd);
- removeWriteSocket(fd);
desc.clear();
+ shrinkFds();
+
close(fd);
// Restore sockets that were disabled due to high fd usage
if (fdUsageWasHigh && !fdsUsageIsHigh()) {
for (auto fd : m_listenSocketsDisabledBecauseOfHighFdUsage) {
- LOGD("SocketManger: start listening again on fd [%d]", fd);
- addReadSocket(fd);
+ LOGD("SocketManager: start listening again on fd [%d]", fd);
+ m_epoll.watchReadEvents(fd, true);
}
m_listenSocketsDisabledBecauseOfHighFdUsage.clear();
}
-
- LOGD("SocketManger closeSocket fd [%d] done", fd);
}
-bool SocketManager::handleRead(int fd, const RawBuffer &readbuffer) {
- LOGD("SocketManger handleRead on fd [%d] start", fd);
+SocketManager::HandleReadResult SocketManager::handleRead(int fd, const RawBuffer &readBuffer) {
+ LOGD("SocketManager handleRead on fd [%d] start", fd);
+ auto functionDoneLogger = CallInDestructor{[fd] {
+ LOGD("SocketManager handleRead on fd [%d] end", fd);
+ }};
+
auto &desc = m_fds[fd];
- desc.pushReadBuffer(readbuffer);
+ desc.pushReadBuffer(readBuffer);
try {
while(true) {
- //try extract request from binary data received on socket
+ // Try extract request from binary data received on socket.
auto req = desc.extractRequest();
- if (!req) // not enough data to build request yet
- break;
- LOGD("request extracted");
+ if (!req) {
+ // Not enough data to build request yet.
+ return HandleReadResult::NEED_MORE_DATA;
+ }
+ LOGD("request extracted: %s", req->name());
m_stats.seenRequestFor(fd);
auto guard = std::unique_lock{m_readOnlyLogicLock};
// request comes after the write request, but we handle it before the write request
// finishes and return the answer based on the old policy. Or use desc.writeQueue()
// by read-only logic.
- removeReadSocket(fd);
+ m_epoll.watchReadEvents(fd, false);
LOGD("Passing request to the non-read-only logic");
// Pass the request to the m_nonReadOnlyWorkerThread.
desc.protocol(),
desc.writeQueue(),
});
- break;
+ return HandleReadResult::SUSPENDED_AND_MORE_REQUESTS_MAY_BE_IN_THE_BUFFER;
}
}
} catch (const Exception &ex) {
LOGE("Error handling request <%s>. Closing socket", ex.what());
- return false;
+ return HandleReadResult::ERROR;
}
- LOGD("SocketManger handleRead on fd [%d] done", fd);
- return true;
}
int SocketManager::createDomainSocket(ProtocolPtr protocol, const std::string &path, mode_t mask,
#endif
fd = createDomainSocketHelp(path, mask);
- auto &desc = createDescriptor(fd, client);
+ auto &desc = createDescriptorWatchedForRead(fd, client);
desc.setListen(true);
desc.setProtocol(protocol);
desc.setReadOnlyProtocol(protocol->clone());
- addReadSocket(fd);
-
LOGD("Domain socket: [%d] added.", fd);
return fd;
}
for (int fd = SD_LISTEN_FDS_START; fd < SD_LISTEN_FDS_START + n; ++fd) {
if (sd_is_socket_unix(fd, SOCK_STREAM, 1, path.c_str(), 0) > 0) {
- LOGI("Useable socket <%s> was passed by SystemD under descriptor [%d]",
- path.c_str(), fd);
+ LOGI("Useable socket <%s> was passed by SystemD under descriptor [%d]", path.c_str(),
+ fd);
return fd;
}
}
return;
}
- auto &desc = createDescriptor(fd, false);
+ auto &desc = createDescriptorWatchedForRead(fd, false);
desc.setListen(false);
desc.setProtocol(protocol);
desc.setReadOnlyProtocol(protocol->clone());
- addReadSocket(fd);
-
LOGD("Signal socket: [%d] added.", fd);
}
}
m_nonReadOnlyRequestResultsNumEventFd = efd;
// Mark the efd to be listened on for read events
- auto& desc = createDescriptor(efd, false);
+ auto& desc = createDescriptorWatchedForRead(efd, false);
desc.setListen(false);
desc.setProtocol(nullptr);
desc.setReadOnlyProtocol(nullptr);
- addReadSocket(efd);
- LOGD("SocketManger created nonReadOnlyRequestResultsNumEventFd [%d]", efd);
+ LOGD("SocketManager created nonReadOnlyRequestResultsNumEventFd [%d]", efd);
}
-Descriptor &SocketManager::createDescriptor(int fd, bool client) {
- if (fd > m_maxDesc) {
- m_maxDesc = fd;
- if (fd >= static_cast<int>(m_fds.size()))
- m_fds.resize(fd + 20);
- }
+Descriptor &SocketManager::createDescriptorWatchedForRead(int fd, bool client) {
+ assert(fd >= 0);
+ if (static_cast<size_t>(fd) >= m_fds.size())
+ m_fds.resize(fd + 1);
auto &desc = m_fds[fd];
- desc.setUsed(true);
- desc.setClient(client);
- return desc;
-}
-void SocketManager::addReadSocket(int fd) {
- FD_SET(fd, &m_readSet);
-}
-
-void SocketManager::removeReadSocket(int fd) {
- FD_CLR(fd, &m_readSet);
-}
+ m_epoll.startWatching(fd, Epoll::EventsToWatch{true, false});
-void SocketManager::addWriteSocket(int fd) {
- FD_SET(fd, &m_writeSet);
-}
-
-void SocketManager::removeWriteSocket(int fd) {
- FD_CLR(fd, &m_writeSet);
+ desc.setUsed(true); // Needs to be done after potential exceptions from epoll.
+ desc.setClient(client);
+ return desc;
}
-void SocketManager::disconnectAllClients(void) {
- for(int i = 0; i <= m_maxDesc; ++i) {
- auto &desc = m_fds[i];
- if(desc.isUsed() && desc.isClient() && !desc.isListen())
- closeSocket(i);
+void SocketManager::shrinkFds() {
+ // Reduce size of m_fds to speed up processing
+ size_t newSize = m_fds.size();
+ while (newSize > 0 && !m_fds[newSize - 1].isUsed()) {
+ --newSize;
}
+ m_fds.resize(newSize);
}
} // namespace Cynara