void Mainloop::addEventSource(int fd, uint32_t event, Callback &&callback)
{
+ std::lock_guard<std::mutex> l(this->m_mutex);
+
if (this->m_callbacks.count(fd) != 0)
ThrowExc(CSR_ERROR_SERVER, "event source on fd[" << fd << "] already added!");
void Mainloop::removeEventSource(int fd)
{
+ std::lock_guard<std::mutex> l(this->m_mutex);
+
if (this->m_callbacks.count(fd) == 0)
ThrowExc(CSR_ERROR_SERVER, "event source on fd[" << fd << "] isn't added at all");
DEBUG("Remove event source on fd[" << fd << "]");
- {
- this->m_callbacks.erase(fd);
+ this->m_callbacks.erase(fd);
- if (::epoll_ctl(m_pollfd, EPOLL_CTL_DEL, fd, nullptr) == -1) {
- if (errno == ENOENT)
- ThrowExc(CSR_ERROR_SERVER, "Tried to delete epoll item which wasn't added");
- else
- throw std::system_error(
- std::error_code(errno, std::generic_category()),
- "epoll_ctl failed to EPOLL_CTL_DEL.");
- }
+ if (::epoll_ctl(m_pollfd, EPOLL_CTL_DEL, fd, nullptr) == -1) {
+ if (errno == ENOENT)
+ ThrowExc(CSR_ERROR_SERVER, "Tried to delete epoll item which wasn't added");
+ else
+ throw std::system_error(
+ std::error_code(errno, std::generic_category()),
+ "epoll_ctl failed to EPOLL_CTL_DEL.");
}
}
+size_t Mainloop::countEventSource() const
+{
+ std::lock_guard<std::mutex> l(this->m_mutex);
+ return this->m_callbacks.size();
+}
+
void Mainloop::dispatch(int timeout)
{
int nfds = -1;
namespace Csr {
-class Mainloop {
+class API Mainloop {
public:
using Callback = std::function<void(uint32_t event)>;
void addEventSource(int fd, uint32_t event, Callback &&callback);
void removeEventSource(int fd);
+ size_t countEventSource(void) const;
void setIdleChecker(std::function<bool()> &&idleChecker);
bool m_isTimedOut;
int m_pollfd;
- std::mutex m_mutex;
+ mutable std::mutex m_mutex;
std::unordered_map<int, Callback> m_callbacks;
std::function<bool()> m_isIdle;
virtual void start(int timeout) final;
protected:
- void setIdleChecker(std::function<bool()> &&idleChecker);
+ Mainloop m_loop;
private:
virtual void onMessageProcess(const ConnShPtr &) = 0;
mutable std::mutex m_crMtx;
std::unordered_map<int, ConnShPtr> m_connectionRegistry;
- Mainloop m_loop;
std::set<SockId> m_sockIds;
};
this->add(SockId::ADMIN);
// if task is not running in workqueue, it's idle.
- this->setIdleChecker([this]() { return !this->m_workqueue.isTaskRunning(); });
+ this->m_loop.setIdleChecker([this]()->bool {
+ return (!this->m_workqueue.isTaskRunning() && this->m_loop.countEventSource() == 3);
+ });
}
RawBuffer ServerService::processCs(const ConnShPtr &conn, RawBuffer &data)