init_log();
// Increase the soft limit of open file descriptors
+ size_t openFdsLimit = 1024;
rlimit limit;
if (prlimit(0, RLIMIT_NOFILE, nullptr, &limit)) {
LOGE("prlimit() failed - %s", Cynara::safeStrError(errno).c_str());
} else {
+ openFdsLimit = limit.rlim_cur;
LOGW("Open fd limit: %zu", static_cast<size_t>(limit.rlim_cur));
if (limit.rlim_cur < limit.rlim_max) {
limit.rlim_cur = limit.rlim_max;
LOGW("Increasing open fd limit to %zu", static_cast<size_t>(limit.rlim_max));
if (prlimit(0, RLIMIT_NOFILE, &limit, nullptr)) {
LOGE("prlimit() failed - %s", Cynara::safeStrError(errno).c_str());
+ } else {
+ openFdsLimit = limit.rlim_cur;
}
}
}
Cynara::Cynara cynara;
LOGI("Cynara service is starting ...");
- cynara.init();
+ cynara.init(openFdsLimit);
LOGI("Cynara service is started");
#ifdef BUILD_WITH_SYSTEMD_DAEMON
* @brief This file implements socket layer manager for cynara
*/
+#include <atomic>
#include <csignal>
#include <cstdint>
#include <cstring>
namespace Cynara {
-SocketManager::SocketManager() : m_working(false), m_maxDesc(-1) {
+SocketManager::SocketManager(size_t openFdsLimit) : m_openFdsLimit{openFdsLimit}, m_working(false),
+ m_maxDesc(-1) {
FD_ZERO(&m_readSet);
FD_ZERO(&m_writeSet);
}
mainLoop();
}
+void SocketManager::logStats() const {
+ LOGW("stats: pendingNonReadOnlyRequests: %llu",
+ static_cast<unsigned long long>(m_stats.pendingNonReadOnlyRequests));
+ LOGW("stats: clients: %zu, clients without requests: %zu", m_stats.clients.fds.size(),
+ m_stats.clients.fdsYetWithoutRequest.size());
+ LOGW("stats: admins: %zu, admins without requests: %zu", m_stats.admins.fds.size(),
+ m_stats.admins.fdsYetWithoutRequest.size());
+ LOGW("stats: agents: %zu, agents without requests: %zu", m_stats.agents.fds.size(),
+ m_stats.agents.fdsYetWithoutRequest.size());
+ LOGW("stats: monitors: %zu monitors without requests: %zu", m_stats.monitors.fds.size(),
+ m_stats.monitors.fdsYetWithoutRequest.size());
+}
+
void SocketManager::init(void) {
LOGI("SocketManger init start");
const mode_t clientSocketUMask(0);
const mode_t agentSocketUMask(0);
const mode_t monitorSocketUMask(0);
- createDomainSocket(std::make_shared<ProtocolClient>(), PathConfig::SocketPath::client,
- clientSocketUMask, true);
- createDomainSocket(std::make_shared<ProtocolAdmin>(), PathConfig::SocketPath::admin,
- adminSocketUMask0077, false);
- createDomainSocket(std::make_shared<ProtocolAgent>(), PathConfig::SocketPath::agent,
- agentSocketUMask, false);
- createDomainSocket(std::make_shared<ProtocolMonitorGet>(), PathConfig::SocketPath::monitorGet,
- monitorSocketUMask, false);
+ m_forStats.clientSocketFd = createDomainSocket(std::make_shared<ProtocolClient>(),
+ PathConfig::SocketPath::client, clientSocketUMask,
+ true);
+ m_forStats.adminSocketFd = createDomainSocket(std::make_shared<ProtocolAdmin>(),
+ PathConfig::SocketPath::admin, adminSocketUMask0077,
+ false);
+ m_forStats.agentSocketFd = createDomainSocket(std::make_shared<ProtocolAgent>(),
+ PathConfig::SocketPath::agent, agentSocketUMask,
+ false);
+ m_forStats.monitorSocketFd = createDomainSocket(std::make_shared<ProtocolMonitorGet>(),
+ PathConfig::SocketPath::monitorGet,
+ monitorSocketUMask, false);
createSignalSocket(std::make_shared<ProtocolSignal>());
createNonReadOnlyRequestResultsNumEventFd();
// Initialize RO logic
LOGD("non-read-only logic worker thread: handling request with socket fd [%i] with"
" generation [%i] and sequence number [%i]", req.socketFd,
req.socketFdGeneration, static_cast<int>(req.request->sequenceNumber()));
+ m_stats.pendingNonReadOnlyRequests.fetch_sub(1, std::memory_order_relaxed);
// Execute the request.
auto context = RequestContext(req.protocol, req.writeQueue, req.socketFd);
req.request->execute(*m_logic, context);
fd_set readSet = m_readSet;
fd_set writeSet = m_writeSet;
+#if BUILD_TYPE_DEBUG
+ logStats();
+#else
+ if (fdsUsageIsHigh())
+ logStats();
+#endif
int ret = select(m_maxDesc + 1, &readSet, &writeSet, nullptr, nullptr);
if (ret < 0) {
}
LOGD("Accept on sock [%d]. New client socket opened [%d]", fd, clientFd);
+ if (fd == m_forStats.clientSocketFd) {
+ m_stats.clients.newFd(clientFd);
+ } else if (fd == m_forStats.adminSocketFd) {
+ m_stats.admins.newFd(clientFd);
+ } else if (fd == m_forStats.agentSocketFd) {
+ m_stats.agents.newFd(clientFd);
+ } else if (fd == m_forStats.monitorSocketFd) {
+ m_stats.monitors.newFd(clientFd);
+ }
+#if BUILD_TYPE_DEBUG
+ logStats();
+#else
+ if (fdsUsageIsHigh())
+ logStats();
+#endif
+
auto &desc = createDescriptor(clientFd, m_fds[fd].isClient());
desc.setListen(false);
desc.setProtocol(m_fds[fd].protocol()->clone());
fd,
desc.writeQueue(),
});
+
+ // Statistics
+ m_stats.closeFd(fd);
+
removeReadSocket(fd);
removeWriteSocket(fd);
desc.clear();
if (!req) // not enough data to build request yet
break;
LOGD("request extracted");
+ m_stats.seenRequestFor(fd);
auto guard = std::unique_lock{m_readOnlyLogicLock};
if (req->canBeExecutedReadOnly(*m_readOnlyLogic)) {
LOGD("Passing request to the non-read-only logic");
// Pass the request to the m_nonReadOnlyWorkerThread.
+ m_stats.pendingNonReadOnlyRequests.fetch_add(1, std::memory_order_relaxed);
m_nonReadOnlyRequests.send(NonReadOnlyRequest{
fd,
desc.getGeneration(),
return true;
}
-void SocketManager::createDomainSocket(ProtocolPtr protocol, const std::string &path, mode_t mask,
- bool client) {
+int SocketManager::createDomainSocket(ProtocolPtr protocol, const std::string &path, mode_t mask,
+ bool client) {
int fd;
#ifdef BUILD_WITH_SYSTEMD_DAEMON
fd = getSocketFromSystemD(path);
addReadSocket(fd);
LOGD("Domain socket: [%d] added.", fd);
+ return fd;
}
int SocketManager::createDomainSocketHelp(const std::string &path, mode_t mask) {
#ifndef SRC_SERVICE_SOCKETS_SOCKETMANAGER_H_
#define SRC_SERVICE_SOCKETS_SOCKETMANAGER_H_
+#include <atomic>
#include <cstdio>
#include <memory>
#include <thread>
class SocketManager {
public:
- SocketManager();
+ explicit SocketManager(size_t openFdsLimit);
~SocketManager();
void run(void);
bool m_needToDisconnectAllClients = false;
bool m_needToStopMainLoop = false;
+ size_t m_openFdsLimit;
typedef std::vector<Descriptor> FDVector;
FDVector m_fds;
fd_set m_writeSet;
int m_maxDesc;
+ struct ForStats {
+ int clientSocketFd;
+ int adminSocketFd;
+ int agentSocketFd;
+ int monitorSocketFd;
+ } m_forStats;
+
+ struct Stats {
+ struct ProtocolStats {
+ std::set<int> fds;
+ std::set<int> fdsYetWithoutRequest;
+
+ void newFd(int fd) {
+ fds.emplace(fd);
+ fdsYetWithoutRequest.emplace(fd);
+ }
+
+ void closeFd(int fd) {
+ fds.erase(fd);
+ fdsYetWithoutRequest.erase(fd);
+ }
+ };
+
+ std::atomic_uint64_t pendingNonReadOnlyRequests = 0;
+ ProtocolStats clients;
+ ProtocolStats admins;
+ ProtocolStats agents;
+ ProtocolStats monitors;
+
+ size_t openConnections() const noexcept {
+ return clients.fds.size() + admins.fds.size() + agents.fds.size() + monitors.fds.size();
+ }
+
+ void seenRequestFor(int fd) {
+ clients.fdsYetWithoutRequest.erase(fd);
+ admins.fdsYetWithoutRequest.erase(fd);
+ agents.fdsYetWithoutRequest.erase(fd);
+ monitors.fdsYetWithoutRequest.erase(fd);
+ }
+
+ void closeFd(int fd) {
+ clients.closeFd(fd);
+ admins.closeFd(fd);
+ agents.closeFd(fd);
+ monitors.closeFd(fd);
+ }
+ } m_stats;
+
+ void logStats() const;
+
+ bool fdsUsageIsHigh() const noexcept {
+ // System becomes unresponsive when number of clients gets close to 1024, no matter if the
+ // open fd limit is higher or not. So we cap it at around 800.
+ return m_stats.openConnections() > std::min<size_t>(800, m_openFdsLimit * 7 / 8);
+ }
+
void init(void);
void mainLoop(void);
void closeSocket(int fd);
bool handleRead(int fd, const RawBuffer &readbuffer);
- void createDomainSocket(ProtocolPtr protocol, const std::string &path, mode_t mask,
- bool client);
+ int createDomainSocket(ProtocolPtr protocol, const std::string &path, mode_t mask,
+ bool client);
static int createDomainSocketHelp(const std::string &path, mode_t mask);
#ifdef BUILD_WITH_SYSTEMD_DAEMON
static int getSocketFromSystemD(const std::string &path);