// Some sockets might have been marked readable again and may already have serialized
// requests in the buffer.
for (int fd : m_fdsToCheckForReadButNotProcessedRequests) {
- switch (handleRead(fd, RawBuffer{})) {
- case HandleReadResult::NEED_MORE_DATA:
- case HandleReadResult::SUSPENDED_AND_MORE_REQUESTS_MAY_BE_IN_THE_BUFFER:
- break;
- case HandleReadResult::ERROR:
- // We assume we can drop already generated but not yet sent responses to previous
- // requests.
- closeSocket(fd);
- break;
+ // fd might become closed due to hangup event for it or write error.
+ if (isFdUsed(fd)) {
+ switch (handleRead(fd, RawBuffer{})) {
+ case HandleReadResult::NEED_MORE_DATA:
+ case HandleReadResult::SUSPENDED_AND_MORE_REQUESTS_MAY_BE_IN_THE_BUFFER:
+ break;
+ case HandleReadResult::ERROR:
+ // We assume we can drop already generated but not yet sent responses to
+ // previous requests.
+ closeSocket(fd);
+ break;
+ }
}
}
m_fdsToCheckForReadButNotProcessedRequests.clear();
// will be used somewhere else in this function.
if (m_needToDisconnectAllClients.exchange(false)) {
LOGD("SocketManager disconnecting all clients");
+ // m_fds.size() may change during iteration of the loop (closeSocket() calls
+ // shrinkFds())
for (size_t fd = 0; fd < m_fds.size(); ++fd) {
- const auto& desc = m_fds[fd];
+ const auto& desc = m_fds[fd]; // safe as fd < m_fds.size()
if (desc.isUsed() && desc.isClient() && !desc.isListen())
closeSocket(fd);
}
- shrinkFds();
}
}
}
auto& res = std::get<NonReadOnlyRequestResult>(resV);
LOGD("main thread: handling NonReadOnlyRequestResult for socket fd [%i] with generation"
" [%i]", res.socketFd, res.socketFdGeneration);
- auto& desc = m_fds[res.socketFd];
- if (desc.isUsed() && desc.getGeneration() == res.socketFdGeneration) {
+ if (isFdUsed(res.socketFd) &&
+ m_fds[res.socketFd].getGeneration() == res.socketFdGeneration) {
// Descriptor was not closed and was not reused.
// There may be some requests from the socket that are already read into
// the buffer but not processed. Schedule reading them.
LOGD("SocketManager readyForRead on fd [%d] end", fd);
}};
+ assert(isFdUsed(fd));
+
if (fd == m_nonReadOnlyRequestResultsNumEventFd) {
LOGD("SocketManager m_nonReadOnlyRequestResultsNumEventFd is ready for read");
switch (handleNonReadOnlyRequestResults()) {
}
}
- auto &desc = m_fds[fd];
- if (desc.isListen()) {
+ if (m_fds[fd].isListen()) {
readyForAccept(fd);
return ReadingResult::OK;
}
LOGD("SocketManager readyForWrite on fd [%d] end", fd);
}};
- auto &desc = m_fds[fd];
- auto &buffer = desc.prepareWriteBuffer();
+ assert(isFdUsed(fd));
+
+ auto &buffer = m_fds[fd].prepareWriteBuffer();
size_t size = buffer.size();
ssize_t result = send(fd, buffer.data(), size, MSG_NOSIGNAL);
if (result == -1) {
LOGD("SocketManager readyForAccept on fd [%d] end", fd);
}};
+ assert(isFdUsed(fd));
+
if (fdsUsageIsHigh()) {
LOGD("SocketManager readyForAccept on fd [%d]: high memory usage -> stop listening", fd);
m_listenSocketsDisabledBecauseOfHighFdUsage.emplace(fd);
logStats();
#endif
- auto &desc = createDescriptorWatchedForRead(clientFd, m_fds[fd].isClient());
+ auto &desc = createDescriptorWatchedForRead(clientFd, m_fds[fd].isClient()); // changes m_fds
desc.setListen(false);
desc.setProtocol(m_fds[fd].protocol()->clone());
desc.setReadOnlyProtocol(m_fds[fd].protocol()->clone());
LOGD("SocketManager closeSocket fd [%d] end", fd);
}};
+ assert(isFdUsed(fd));
+
m_epoll.stopWatching(fd);
Descriptor &desc = m_fds[fd];
LOGD("SocketManager handleRead on fd [%d] end", fd);
}};
+ assert(isFdUsed(fd));
auto &desc = m_fds[fd];
desc.pushReadBuffer(readBuffer);