this->m_isRunning = true;
this->m_stop = false;
- // TODO: how to handle exceptions in workers
this->m_worker = std::thread([this, f] {
DEBUG("client async thread dispatched! tid: " << std::this_thread::get_id());
void Dispatcher::connect()
{
- this->m_connection = std::make_shared<Connection>(Socket::connect(this->m_sockId));
+ this->m_connection = std::make_shared<Connection>(
+ Socket::create(this->m_sockId, Socket::Type::CLIENT));
}
bool Dispatcher::isConnected() const noexcept
template<typename Type, typename ...Args>
Type Dispatcher::methodCall(Args &&...args)
{
- if (!isConnected())
- connect();
+ if (!this->isConnected())
+ this->connect();
- m_connection->send(BinaryQueue::Serialize(std::forward<Args>(args)...).pop());
+ this->m_connection->send(BinaryQueue::Serialize(std::forward<Args>(args)...).pop());
BinaryQueue q;
- q.push(m_connection->receive());
+ q.push(this->m_connection->receive());
Type response;
q.Deserialize(response);
void Mainloop::addEventSource(int fd, uint32_t event, Callback &&callback)
{
- /* TODO: use scoped-lock to thread safe on member variables */
if (this->m_callbacks.count(fd) != 0)
ThrowExc(CSR_ERROR_SERVER, "event source on fd[" << fd << "] already added!");
void Mainloop::removeEventSource(int fd)
{
- /* TODO: use scoped-lock to thread safe on member variables */
if (this->m_callbacks.count(fd) == 0)
ThrowExc(CSR_ERROR_SERVER, "event source on fd[" << fd << "] isn't added at all");
}
for (int i = 0; i < nfds; i++) {
- /* TODO: use scoped-lock to thread safe on member variables */
int fd = event[i].data.fd;
if (this->m_callbacks.count(fd) == 0)
INFO("Service start!");
for (const auto &id : this->m_sockIds) {
- auto socket = std::make_shared<Socket>(id);
+ auto socket = std::make_shared<Socket>(Socket::create(id, Socket::Type::SERVER));
DEBUG("Get systemd socket[" << socket->getFd() <<
"] for sock id: " << static_cast<int>(id));
this->m_loop.run(timeout);
}
-void Service::setNewConnectionCallback(const ConnCallback &/*callback*/)
+void Service::setNewConnectionCallback(const ConnCallback &callback)
{
- /* TODO: scoped-lock */
- this->m_onNewConnection = [&](const ConnShPtr & connection) {
+ this->m_onNewConnection = [this, &callback](const ConnShPtr &connection) {
if (!connection)
ThrowExc(CSR_ERROR_SERVER, "onNewConnection called but ConnShPtr is nullptr.");
INFO("welcome! accepted client socket fd[" << fd << "]");
- /*
- // TODO: disable temporarily
- if (callback)
- callback(connection);
- */
+ if (callback)
+ callback(connection);
this->m_loop.addEventSource(fd, EPOLLIN | EPOLLHUP | EPOLLRDHUP,
- [ &, fd](uint32_t event) {
+ [&, fd](uint32_t event) {
DEBUG("read event comes in to fd[" << fd << "]");
if (this->m_connectionRegistry.count(fd) == 0)
};
}
-void Service::setCloseConnectionCallback(const ConnCallback &/*callback*/)
+void Service::setCloseConnectionCallback(const ConnCallback &callback)
{
- /* TODO: scoped-lock */
- this->m_onCloseConnection = [&](const ConnShPtr & connection) {
+ this->m_onCloseConnection = [this, &callback](const ConnShPtr &connection) {
if (!connection)
ThrowExc(CSR_ERROR_SERVER, "no connection to close");
INFO("good-bye! close socket fd[" << fd << "]");
this->m_loop.removeEventSource(fd);
- this->m_connectionRegistry.erase(fd); /* scoped-lock needed? */
+ this->m_connectionRegistry.erase(fd);
- /*
- // TODO: disable temporarily
- if (callback)
- callback(connection);
- */
+ if (callback)
+ callback(connection);
};
}
} // namespace anonymous
+Socket Socket::create(SockId sockId, Socket::Type type)
+{
+ switch (type) {
+ case Socket::Type::SERVER:
+ return Socket(sockId);
+ case Socket::Type::CLIENT:
+ return Socket::connect(sockId);
+ default:
+ ThrowExc(CSR_ERROR_SOCKET, "Invalid type to Socket::create");
+ }
+}
+
Socket::Socket(SockId sockId, int fd) : m_sockId(sockId), m_fd(fd)
{
if (this->m_fd < 0)
class API Socket {
public:
- // Socket with accepted / connected
- Socket(SockId sockId, int fd);
+ enum class Type : int {
+ SERVER = 0x01,
+ CLIENT = 0x02
+ };
- // Create systemd socket
- Socket(SockId sockId);
+ static Socket create(SockId, Socket::Type);
Socket(const Socket &) = delete;
Socket &operator=(const Socket &) = delete;
RawBuffer read(void) const;
void write(const RawBuffer &data) const;
- /* TODO: can it be constructor? */
+private:
static Socket connect(SockId);
+ Socket(SockId sockId, int fd);
+ Socket(SockId sockId);
-private:
SockId m_sockId;
int m_fd;
};