auto &desc = m_socketDescriptionVector[sock];
desc.isOpen = true;
- desc.timeout = monotonicCoarseNow() + SOCKET_TIMEOUT;
+ desc.isActiveThisGeneration = true;
desc.buffer.InitForInput();
- if (false == desc.isTimeout) {
- desc.isTimeout = true;
- Timeout tm;
- tm.time = desc.timeout;
- tm.sock = sock;
- m_timeoutQueue.push(tm);
- }
-
RegisterFdForReading(sock);
}
SocketManager::~SocketManager() {
delete m_service;
- for (size_t i = 0; i < m_socketDescriptionVector.size(); ++i)
+ for (int i = 0; i <= m_maxDesc; ++i)
if (m_socketDescriptionVector[i].isOpen)
close(i);
void SocketManager::ReadyForRead(int sock) {
auto &desc = m_socketDescriptionVector[sock];
+ desc.isActiveThisGeneration = true;
auto &buffer = desc.buffer;
- desc.timeout = monotonicCoarseNow() + SOCKET_TIMEOUT;
ssize_t size = read(sock, buffer.Ptr(), buffer.InputSize());
void SocketManager::ReadyForWrite(int sock) {
auto &desc = m_socketDescriptionVector[sock];
+ desc.isActiveThisGeneration = true;
auto &buffer = desc.buffer;
ssize_t result = write(sock, buffer.Ptr(), buffer.OutputSize());
if (result == -1) {
return; // We do not want to propagate error to next layer
}
- desc.timeout = monotonicCoarseNow() + SOCKET_TIMEOUT;
if (buffer.OutputDone(result))
CloseSocket(sock);
}
fd_set readSet = m_readSet;
fd_set writeSet = m_writeSet;
- timeval localTempTimeout;
- timeval *ptrTimeout = &localTempTimeout;
-
- // I need to extract timeout from priority_queue.
- // Timeout in priority_queue may be deprecated.
- // I need to find some actual one.
- while (!m_timeoutQueue.empty()) {
- auto &top = m_timeoutQueue.top();
- auto &desc = m_socketDescriptionVector[top.sock];
-
- if (top.time == desc.timeout) {
- // This timeout matches timeout from socket.
- // It can be used.
- break;
- } else {
- // This socket was used after timeout in priority queue was set up.
- // We need to update timeout and find some useable one.
- Timeout tm = { desc.timeout , top.sock};
- m_timeoutQueue.pop();
- m_timeoutQueue.push(tm);
- }
- }
-
- if (m_timeoutQueue.empty()) {
- LogDebug("No usable timeout found.");
- ptrTimeout = NULL; // select will wait without timeout
- } else {
- time_t currentTime = monotonicCoarseNow();
- auto &pqTimeout = m_timeoutQueue.top();
-
- // 0 means that select won't block and socket will be closed ;-)
- ptrTimeout->tv_sec =
- currentTime < pqTimeout.time ? pqTimeout.time - currentTime : 0;
- ptrTimeout->tv_usec = 0;
-// LogDebug("Set up timeout: " << (int)ptrTimeout->tv_sec
-// << " seconds. Socket: " << pqTimeout.sock);
- }
-
- int ret = select(m_maxDesc+1, &readSet, &writeSet, NULL, ptrTimeout);
-
- if (0 == ret) { // timeout
- Assert(!m_timeoutQueue.empty());
-
- Timeout pqTimeout = m_timeoutQueue.top();
- m_timeoutQueue.pop();
-
- auto &desc = m_socketDescriptionVector[pqTimeout.sock];
-
- if (!desc.isTimeout || !desc.isOpen) {
- // Connection was closed. Timeout is useless...
- desc.isTimeout = false;
- continue;
- }
-
- if (pqTimeout.time < desc.timeout) {
- // Is it possible?
- // This socket was used after timeout. We need to update timeout.
- pqTimeout.time = desc.timeout;
- m_timeoutQueue.push(pqTimeout);
- continue;
- }
-
- // timeout from m_timeoutQueue matches with socket.timeout
- // and connection is open. Time to close it!
- // Putting new timeout in queue here is pointless.
- desc.isTimeout = false;
- LogWarning("Closing socket because of timeout: " << pqTimeout.sock);
- CloseSocket(pqTimeout.sock);
-
- // All done. Now we should process next select ;-)
- continue;
- }
-
- if (-1 == ret) {
- switch (errno) {
- case EINTR:
- LogDebug("EINTR in select");
- break;
- default:
- int err = errno;
- LogError("Error in select: " << GetErrnoString(err));
- return;
- }
- continue;
+ int ret = TEMP_FAILURE_RETRY(select(m_maxDesc+1, &readSet, &writeSet, nullptr, nullptr));
+ if (ret < 0) {
+ int err = errno;
+ LogError("Error in select: " << GetErrnoString(err));
+ return;
}
if (FD_ISSET(m_signalFd, &readSet)) {
ret--;
}
- for (int i = 0; i < m_maxDesc+1 && ret; ++i) {
+ for (int i = 0; ret; ++i) {
if (FD_ISSET(i, &readSet)) {
ReadyForRead(i);
ret--;
ret--;
}
}
+
ProcessQueue();
+ ProcessTimeout();
}
}
continue;
}
+ desc.isActiveThisGeneration = true;
desc.buffer = std::move(buffer.buffer);
desc.buffer.ModeOutput();
}
}
+void SocketManager::ProcessTimeout() {
+ if (const auto now = monotonicCoarseNow(); now > m_nextGenerationStart) {
+ m_nextGenerationStart = now + SOCKET_TIMEOUT;
+ for (int i = 0; i <= m_maxDesc; ++i) {
+ auto &desc = m_socketDescriptionVector[i];
+ if (desc.isOpen) {
+ if (!desc.isActiveThisGeneration)
+ CloseSocket(i);
+ else
+ desc.isActiveThisGeneration = false;
+ }
+ }
+ }
+}
+
void SocketManager::CloseSocket(int sock) {
LogDebug("Closing socket: " << sock);
auto &desc = m_socketDescriptionVector[sock];