this->m_loop.addEventSource(fd, EPOLLIN | EPOLLHUP | EPOLLRDHUP,
[&, fd](uint32_t event) {
- std::unique_lock<std::mutex> lock(this->m_crMtx);
+ std::lock_guard<std::mutex> lock(this->m_crMtx);
DEBUG("read event comes in to fd[" << fd << "]");
return;
}
- lock.unlock();
-
DEBUG("Start message process on fd[" << fd << "]");
onMessageProcess(conn);
});
- {
- std::lock_guard<std::mutex> l(this->m_crMtx);
- this->m_connectionRegistry[fd] = std::move(connection);
- }
+ std::lock_guard<std::mutex> lock(this->m_crMtx);
+ this->m_connectionRegistry[fd] = std::move(connection);
}
void Service::onCloseConnection(const ConnShPtr &connection)
this->m_connectionRegistry.erase(fd);
}
-bool Service::isConnectionValid(int fd) const
-{
- std::lock_guard<std::mutex> l(this->m_crMtx);
-
- return this->m_connectionRegistry.count(fd) != 0;
-}
-
}
auto inbufPtr = std::make_shared<RawBuffer>(connection->receive());
- auto fd = connection->getFd();
-
- this->m_workqueue.submit([this, &connection, fd, process, inbufPtr]() {
- auto outbuf = (*process)(connection, *inbufPtr);
-
- CpuUsageManager::reset();
-
- if (!this->isConnectionValid(fd)) {
- ERROR("Connection for fd[] is closed while task is in processing...");
- return;
+ this->m_workqueue.submit([this, connection, process, inbufPtr]() {
+ try {
+ auto outbuf = (*process)(connection, *inbufPtr);
+
+ CpuUsageManager::reset();
+
+ connection->send(outbuf);
+ } catch (const std::exception &e) {
+ ERROR("exception on workqueue task: " << e.what());
+ try {
+ connection->send(BinaryQueue::Serialize(CSR_ERROR_SYSTEM).pop());
+ } catch (const std::exception &e) {
+ ERROR("The connection is abnormally closed by the peer: " << e.what());
+ }
}
-
- connection->send(outbuf);
});
}