From 1030bb012847bcdcf5e254cca0eb652b75151d1f Mon Sep 17 00:00:00 2001 From: Sangwan Kwon Date: Thu, 2 Jan 2020 15:58:09 +0900 Subject: [PATCH] Add socket connection Signed-off-by: Sangwan Kwon --- src/vist/rmi/CMakeLists.txt | 5 +- src/vist/rmi/impl/ondemand/connection.cpp | 78 +++++++++ src/vist/rmi/impl/ondemand/connection.hpp | 64 +++++++ src/vist/rmi/impl/ondemand/eventfd.cpp | 68 ++++++++ src/vist/rmi/impl/ondemand/eventfd.hpp | 49 ++++++ src/vist/rmi/impl/ondemand/mainloop.cpp | 156 ++++++++++++++++++ src/vist/rmi/impl/ondemand/mainloop.hpp | 81 +++++++++ .../rmi/impl/ondemand/tests/connection.cpp | 93 +++++++++++ 8 files changed, 593 insertions(+), 1 deletion(-) create mode 100644 src/vist/rmi/impl/ondemand/connection.cpp create mode 100644 src/vist/rmi/impl/ondemand/connection.hpp create mode 100644 src/vist/rmi/impl/ondemand/eventfd.cpp create mode 100644 src/vist/rmi/impl/ondemand/eventfd.hpp create mode 100644 src/vist/rmi/impl/ondemand/mainloop.cpp create mode 100644 src/vist/rmi/impl/ondemand/mainloop.hpp create mode 100644 src/vist/rmi/impl/ondemand/tests/connection.cpp diff --git a/src/vist/rmi/CMakeLists.txt b/src/vist/rmi/CMakeLists.txt index dbd06e6..d06d07d 100644 --- a/src/vist/rmi/CMakeLists.txt +++ b/src/vist/rmi/CMakeLists.txt @@ -17,7 +17,10 @@ SET(${TARGET}_SRCS gateway.cpp remote.cpp message.cpp impl/general/protocol.cpp - impl/ondemand/socket.cpp) + impl/ondemand/socket.cpp + impl/ondemand/eventfd.cpp + impl/ondemand/mainloop.cpp + impl/ondemand/connection.cpp) ADD_LIBRARY(${TARGET} SHARED ${${TARGET}_SRCS}) SET_TARGET_PROPERTIES(${TARGET} PROPERTIES COMPILE_FLAGS "-fvisibility=hidden") diff --git a/src/vist/rmi/impl/ondemand/connection.cpp b/src/vist/rmi/impl/ondemand/connection.cpp new file mode 100644 index 0000000..2a49ff5 --- /dev/null +++ b/src/vist/rmi/impl/ondemand/connection.cpp @@ -0,0 +1,78 @@ +/* + * Copyright (c) 2018-present Samsung Electronics Co., Ltd All Rights Reserved + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License + */ + +#include "connection.hpp" + +#include + +#include + +namespace vist { +namespace rmi { +namespace impl { +namespace ondemand { + +Connection::Connection(Socket&& socket) noexcept : socket(std::move(socket)) +{ +} + +Connection::Connection(const std::string& path) : + socket(Socket::connect(path)) +{ +} + +void Connection::send(Message& message) +{ + DEBUG(VIST) << "Send to socket[" << this->socket.getFd() + << "] Header: " << message.signature; + std::lock_guard lock(this->sendMutex); + + message.header.id = this->sequence++; + this->socket.send(&message.header); + + this->socket.send(message.getBuffer().data(), message.header.length); +} + +Message Connection::recv(void) const +{ + std::lock_guard lock(this->recvMutex); + Message::Header header; + this->socket.recv(&header); + + Message message(header); + this->socket.recv(message.getBuffer().data(), message.size()); + message.disclose(message.signature); + + DEBUG(VIST) << "Send to socket[" << this->socket.getFd() + << "] Header: " << message.signature; + return message; +} + +Message Connection::request(Message& message) +{ + this->send(message); + return this->recv(); +} + +int Connection::getFd(void) const noexcept +{ + return this->socket.getFd(); +} + +} // namespace ondemand +} // namespace impl +} // namespace rmi +} // namespace vist diff --git a/src/vist/rmi/impl/ondemand/connection.hpp b/src/vist/rmi/impl/ondemand/connection.hpp new file mode 100644 index 0000000..080391d --- /dev/null +++ b/src/vist/rmi/impl/ondemand/connection.hpp @@ -0,0 +1,64 @@ +/* + * Copyright (c) 2018-present Samsung Electronics Co., Ltd All Rights Reserved + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License + */ + +#pragma once + +#include +#include + +#include +#include + +namespace vist { +namespace rmi { +namespace impl { +namespace ondemand { + +class Connection { +public: + explicit Connection(Socket&& socket) noexcept; + explicit Connection(const std::string& path); + virtual ~Connection() = default; + + Connection(const Connection&) = delete; + Connection& operator=(const Connection&) = delete; + + Connection(Connection&&) = default; + Connection& operator=(Connection&&) = default; + + // server-side + void send(Message& message); + Message recv(void) const; + + // client-side + Message request(Message& message); + + int getFd(void) const noexcept; + +private: + Socket socket; + + // SOCK_STREAM are full-duplex byte streams + mutable std::mutex sendMutex; + mutable std::mutex recvMutex; + + unsigned int sequence = 0; +}; + +} // namespace ondemand +} // namespace impl +} // namespace rmi +} // namespace vist diff --git a/src/vist/rmi/impl/ondemand/eventfd.cpp b/src/vist/rmi/impl/ondemand/eventfd.cpp new file mode 100644 index 0000000..e3ec163 --- /dev/null +++ b/src/vist/rmi/impl/ondemand/eventfd.cpp @@ -0,0 +1,68 @@ +/* + * Copyright (c) 2018-present Samsung Electronics Co., Ltd All Rights Reserved + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License + */ + +#include "eventfd.hpp" + +#include + +#include +#include +#include + +#include + +namespace vist { +namespace rmi { +namespace impl { +namespace ondemand { + +EventFD::EventFD(unsigned int initval, int flags) + : fd(::eventfd(initval, flags)) +{ + if (this->fd == -1) + THROW(ErrCode::RuntimeError) << "Failed to create eventfd."; +} + +EventFD::~EventFD() +{ + ::close(fd); +} + +void EventFD::send(void) +{ + const std::uint64_t val = 1; + errno = 0; + if (::write(this->fd, &val, sizeof(val)) == -1) + THROW(ErrCode::RuntimeError) << "Failed to write to eventfd: " << errno; +} + +void EventFD::receive(void) +{ + std::uint64_t val = 0; + errno = 0; + if (::read(this->fd, &val, sizeof(val)) == -1) + THROW(ErrCode::RuntimeError) << "Failed to read from eventfd: " << errno; +} + +int EventFD::getFd(void) const noexcept +{ + return this->fd; +} + +} // namespace ondemand +} // namespace impl +} // namespace rmi +} // namespace vist diff --git a/src/vist/rmi/impl/ondemand/eventfd.hpp b/src/vist/rmi/impl/ondemand/eventfd.hpp new file mode 100644 index 0000000..c017fe3 --- /dev/null +++ b/src/vist/rmi/impl/ondemand/eventfd.hpp @@ -0,0 +1,49 @@ +/* + * Copyright (c) 2018-present Samsung Electronics Co., Ltd All Rights Reserved + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License + */ + +#pragma once + +#include + +namespace vist { +namespace rmi { +namespace impl { +namespace ondemand { + +class EventFD final { +public: + explicit EventFD(unsigned int initval = 0, int flags = EFD_SEMAPHORE | EFD_CLOEXEC); + ~EventFD(); + + EventFD(const EventFD&) = delete; + EventFD& operator=(const EventFD&) = delete; + + EventFD(EventFD&&) = delete; + EventFD& operator=(EventFD&&) = delete; + + void send(void); + void receive(void); + + int getFd(void) const noexcept; + +private: + int fd; +}; + +} // namespace ondemand +} // namespace impl +} // namespace rmi +} // namespace vist diff --git a/src/vist/rmi/impl/ondemand/mainloop.cpp b/src/vist/rmi/impl/ondemand/mainloop.cpp new file mode 100644 index 0000000..2927aa1 --- /dev/null +++ b/src/vist/rmi/impl/ondemand/mainloop.cpp @@ -0,0 +1,156 @@ +/* + * Copyright (c) 2018 Samsung Electronics Co., Ltd All Rights Reserved + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License + */ + +#include "mainloop.hpp" + +#include +#include + +#include + +#include +#include + +namespace vist { +namespace rmi { +namespace impl { +namespace ondemand { + +Mainloop::Mainloop() : + epollFd(::epoll_create1(EPOLL_CLOEXEC)), + stopped(false) +{ + if (epollFd == -1) + THROW(ErrCode::RuntimeError) << "Failed to create epoll instance."; +} + +Mainloop::~Mainloop() +{ + ::close(this->epollFd); +} + +void Mainloop::addHandler(const int fd, OnEvent&& onEvent, OnError&& onError) +{ + std::lock_guard lock(mutex); + + if (this->listener.find(fd) != this->listener.end()) + THROW(ErrCode::RuntimeError) << "Event is already registered."; + + ::epoll_event event; + std::memset(&event, 0, sizeof(epoll_event)); + + event.events = EPOLLIN | EPOLLHUP | EPOLLRDHUP; + event.data.fd = fd; + + if (::epoll_ctl(this->epollFd, EPOLL_CTL_ADD, fd, &event) == -1) + THROW(ErrCode::RuntimeError) << "Failed to add event source."; + + auto onEventPtr = std::make_shared(onEvent); + auto onErrorPtr = (onError != nullptr) ? std::make_shared(onError) : nullptr; + + auto handler = std::make_pair(std::move(onEventPtr), std::move(onErrorPtr)); + + this->listener.insert({fd, handler}); +} + +void Mainloop::removeHandler(const int fd) +{ + std::lock_guard lock(mutex); + + auto iter = this->listener.find(fd); + if (iter == this->listener.end()) + return; + + this->listener.erase(iter); + + ::epoll_ctl(epollFd, EPOLL_CTL_DEL, fd, NULL); +} + +bool Mainloop::prepare(void) +{ + auto wakeup = [this]() { + this->wakeupSignal.receive(); + this->removeHandler(wakeupSignal.getFd()); + this->stopped = true; + }; + + this->addHandler(this->wakeupSignal.getFd(), wakeup); +} + +bool Mainloop::dispatch(int timeout) noexcept +{ + int nfds; + ::epoll_event event[MAX_EPOLL_EVENTS]; + + do { + errno = 0; + nfds = ::epoll_wait(epollFd, event, MAX_EPOLL_EVENTS, timeout); + } while ((nfds == -1) && (errno == EINTR)); + + if (nfds <= 0) + return false; + + for (int i = 0; i < nfds; i++) { + std::shared_ptr onEvent; + std::shared_ptr onError; + + { + std::lock_guard lock(mutex); + + auto iter = this->listener.find(event[i].data.fd); + if (iter == this->listener.end()) + continue; + + onEvent = iter->second.first; + onError = iter->second.second; + } + + try { + if ((event[i].events & (EPOLLHUP | EPOLLRDHUP))) { + if (onError != nullptr) + (*onError)(); + } else { + (*onEvent)(); + } + + } catch (std::exception& e) { + ERROR(VIST) << e.what(); + } + } + + return true; +} + +void Mainloop::run(int timeout) +{ + bool done = false; + this->stopped = false; + + this->prepare(); + + while (!this->stopped && !done) + done = !dispatch(timeout); +} + +void Mainloop::stop(void) +{ + this->wakeupSignal.send(); +} + +} // namespace ondemand +} // namespace impl +} // namespace rmi +} // namespace vist diff --git a/src/vist/rmi/impl/ondemand/mainloop.hpp b/src/vist/rmi/impl/ondemand/mainloop.hpp new file mode 100644 index 0000000..6c76d9a --- /dev/null +++ b/src/vist/rmi/impl/ondemand/mainloop.hpp @@ -0,0 +1,81 @@ +/* + * Copyright (c) 2018-present Samsung Electronics Co., Ltd All Rights Reserved + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License + */ + +#pragma once + +#include + +#include +#include +#include +#include +#include +#include +#include + +#include + +namespace vist { +namespace rmi { +namespace impl { +namespace ondemand { + +class Mainloop { +public: + using OnEvent = std::function; + using OnError = std::function; + + Mainloop(); + virtual ~Mainloop(); + + Mainloop(const Mainloop&) = delete; + Mainloop& operator=(const Mainloop&) = delete; + + Mainloop(Mainloop&&) = delete; + Mainloop& operator=(Mainloop&&) = delete; + + void addHandler(const int fd, OnEvent&& onEvent, OnError&& = nullptr); + void removeHandler(const int fd); + + void run(int timeout = -1); + void stop(void); + +private: + // recursive_mutex makes additional calls to lock in calling thread. + // And other threads will block (for calls to lock). + // So, addHandler() can be called during dispatch(). + using Mutex = std::recursive_mutex; + using Handler = std::pair, std::shared_ptr>; + using Listener = std::unordered_map; + + bool prepare(void); + + bool dispatch(const int timeout) noexcept; + + Mutex mutex; + Listener listener; + EventFD wakeupSignal; + + int epollFd; + std::atomic stopped; + + const int MAX_EPOLL_EVENTS = 16; +}; + +} // namespace ondemand +} // namespace impl +} // namespace rmi +} // namespace vist diff --git a/src/vist/rmi/impl/ondemand/tests/connection.cpp b/src/vist/rmi/impl/ondemand/tests/connection.cpp new file mode 100644 index 0000000..bc51510 --- /dev/null +++ b/src/vist/rmi/impl/ondemand/tests/connection.cpp @@ -0,0 +1,93 @@ +/* + * Copyright (c) 2020 Samsung Electronics Co., Ltd All Rights Reserved + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License + */ + +#include +#include +#include +#include + +#include +#include + +#include + +using namespace vist::rmi; +using namespace vist::rmi::impl::ondemand; + +TEST(ConnectionTests, socket_communication) +{ + std::string sockPath = ("@vist-test.sock"); + + // server-side + Mainloop mainloop; + Socket socket(sockPath); + + std::string requestSignature = "request signature"; + int request1 = 100; + bool request2 = true; + std::string request3 = "request argument"; + + std::string responseSignature = "response signature"; + int response1 = 300; + bool response2 = false; + std::string response3 = "response argument"; + + auto onAccept = [&]() { + Connection conn(socket.accept()); + Message request = conn.recv(); + EXPECT_EQ(requestSignature, request.signature); + + int recv1; + bool recv2; + std::string recv3; + request.disclose(recv1, recv2, recv3); + EXPECT_EQ(request1, recv1); + EXPECT_EQ(request2, recv2); + EXPECT_EQ(request3, recv3); + + Message reply(Message::Type::Reply, responseSignature); + reply.enclose(response1, response2, response3); + conn.send(reply); + + mainloop.removeHandler(socket.getFd()); + mainloop.stop(); + }; + + mainloop.addHandler(socket.getFd(), std::move(onAccept)); + auto serverThread = std::thread([&]() { mainloop.run(); }); + + // client-side + Connection conn(sockPath); + Message msg(Message::Type::Signal, requestSignature); + msg.enclose(request1, request2, request3); + + int repeat = 3; + while (repeat--) { + Message reply = conn.request(msg); + EXPECT_EQ(reply.signature, responseSignature); + + int recv1; + bool recv2; + std::string recv3; + reply.disclose(recv1, recv2, recv3); + EXPECT_EQ(response1, recv1); + EXPECT_EQ(response2, recv2); + EXPECT_EQ(response3, recv3); + } + + if (serverThread.joinable()) + serverThread.join(); +} -- 2.34.1