void Service::setNewConnectionCallback(const ConnCallback &callback)
{
- this->m_onNewConnection = [this, &callback](const ConnShPtr &connection) {
+ this->m_onNewConnection = [this, &callback](ConnShPtr &&connection) {
if (!connection)
ThrowExc(CSR_ERROR_SERVER, "onNewConnection called but ConnShPtr is nullptr.");
- int fd = connection->getFd();
+ auto fd = connection->getFd();
INFO("welcome! accepted client socket fd[" << fd << "]");
this->m_loop.addEventSource(fd, EPOLLIN | EPOLLHUP | EPOLLRDHUP,
[&, fd](uint32_t event) {
+ std::unique_lock<std::mutex> lock(this->m_crMtx);
+
DEBUG("read event comes in to fd[" << fd << "]");
if (this->m_connectionRegistry.count(fd) == 0)
return;
}
+ lock.unlock();
+
DEBUG("Start message process on fd[" << fd << "]");
onMessageProcess(conn);
});
- this->m_connectionRegistry[fd] = connection;
+ {
+ std::lock_guard<std::mutex> l(this->m_crMtx);
+ this->m_connectionRegistry[fd] = std::move(connection);
+ }
};
}
if (!connection)
ThrowExc(CSR_ERROR_SERVER, "no connection to close");
- int fd = connection->getFd();
+ auto fd = connection->getFd();
if (this->m_connectionRegistry.count(fd) == 0)
ThrowExc(CSR_ERROR_SERVER, "no connection in registry to remove "
INFO("good-bye! close socket fd[" << fd << "]");
this->m_loop.removeEventSource(fd);
+
this->m_connectionRegistry.erase(fd);
if (callback)
};
}
+bool Service::isConnectionValid(int fd) const
+{
+ std::lock_guard<std::mutex> l(this->m_crMtx);
+
+ return this->m_connectionRegistry.count(fd) != 0;
+}
+
}
#include <string>
#include <functional>
#include <set>
+#include <mutex>
#include "common/macros.h"
#include "common/connection.h"
protected:
void setIdleChecker(std::function<bool()> &&idleChecker);
+ bool isConnectionValid(int fd) const;
private:
virtual void onMessageProcess(const ConnShPtr &) = 0;
- ConnCallback m_onNewConnection;
+ std::function<void(ConnShPtr &&)> m_onNewConnection;
ConnCallback m_onCloseConnection;
+ mutable std::mutex m_crMtx;
std::unordered_map<int, ConnShPtr> m_connectionRegistry;
Mainloop m_loop;
auto inbufPtr = std::make_shared<RawBuffer>(connection->receive());
- this->m_workqueue.submit([this, &connection, process, inbufPtr]() {
- auto outbuf = (*process)(connection, *inbufPtr);
+ auto fd = connection->getFd();
- connection->send(outbuf);
+ this->m_workqueue.submit([this, &connection, fd, process, inbufPtr]() {
+ auto outbuf = (*process)(connection, *inbufPtr);
CpuUsageManager::reset();
+
+ if (!this->isConnectionValid(fd)) {
+ ERROR("Connection for fd[] is closed while task is in processing...");
+ return;
+ }
+
+ connection->send(outbuf);
});
}