remote.cpp
message.cpp
impl/general/protocol.cpp
- impl/ondemand/socket.cpp)
+ impl/ondemand/socket.cpp
+ impl/ondemand/eventfd.cpp
+ impl/ondemand/mainloop.cpp
+ impl/ondemand/connection.cpp)
ADD_LIBRARY(${TARGET} SHARED ${${TARGET}_SRCS})
SET_TARGET_PROPERTIES(${TARGET} PROPERTIES COMPILE_FLAGS "-fvisibility=hidden")
--- /dev/null
+/*
+ * Copyright (c) 2018-present Samsung Electronics Co., Ltd All Rights Reserved
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License
+ */
+
+#include "connection.hpp"
+
+#include <vist/logger.hpp>
+
+#include <utility>
+
+namespace vist {
+namespace rmi {
+namespace impl {
+namespace ondemand {
+
+Connection::Connection(Socket&& socket) noexcept : socket(std::move(socket))
+{
+}
+
+Connection::Connection(const std::string& path) :
+ socket(Socket::connect(path))
+{
+}
+
+void Connection::send(Message& message)
+{
+ DEBUG(VIST) << "Send to socket[" << this->socket.getFd()
+ << "] Header: " << message.signature;
+ std::lock_guard<std::mutex> lock(this->sendMutex);
+
+ message.header.id = this->sequence++;
+ this->socket.send(&message.header);
+
+ this->socket.send(message.getBuffer().data(), message.header.length);
+}
+
+Message Connection::recv(void) const
+{
+ std::lock_guard<std::mutex> lock(this->recvMutex);
+ Message::Header header;
+ this->socket.recv(&header);
+
+ Message message(header);
+ this->socket.recv(message.getBuffer().data(), message.size());
+ message.disclose(message.signature);
+
+ DEBUG(VIST) << "Send to socket[" << this->socket.getFd()
+ << "] Header: " << message.signature;
+ return message;
+}
+
+Message Connection::request(Message& message)
+{
+ this->send(message);
+ return this->recv();
+}
+
+int Connection::getFd(void) const noexcept
+{
+ return this->socket.getFd();
+}
+
+} // namespace ondemand
+} // namespace impl
+} // namespace rmi
+} // namespace vist
--- /dev/null
+/*
+ * Copyright (c) 2018-present Samsung Electronics Co., Ltd All Rights Reserved
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License
+ */
+
+#pragma once
+
+#include <vist/rmi/message.hpp>
+#include <vist/rmi/impl/ondemand/socket.hpp>
+
+#include <mutex>
+#include <utility>
+
+namespace vist {
+namespace rmi {
+namespace impl {
+namespace ondemand {
+
+class Connection {
+public:
+ explicit Connection(Socket&& socket) noexcept;
+ explicit Connection(const std::string& path);
+ virtual ~Connection() = default;
+
+ Connection(const Connection&) = delete;
+ Connection& operator=(const Connection&) = delete;
+
+ Connection(Connection&&) = default;
+ Connection& operator=(Connection&&) = default;
+
+ // server-side
+ void send(Message& message);
+ Message recv(void) const;
+
+ // client-side
+ Message request(Message& message);
+
+ int getFd(void) const noexcept;
+
+private:
+ Socket socket;
+
+ // SOCK_STREAM are full-duplex byte streams
+ mutable std::mutex sendMutex;
+ mutable std::mutex recvMutex;
+
+ unsigned int sequence = 0;
+};
+
+} // namespace ondemand
+} // namespace impl
+} // namespace rmi
+} // namespace vist
--- /dev/null
+/*
+ * Copyright (c) 2018-present Samsung Electronics Co., Ltd All Rights Reserved
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License
+ */
+
+#include "eventfd.hpp"
+
+#include <vist/exception.hpp>
+
+#include <errno.h>
+#include <sys/types.h>
+#include <unistd.h>
+
+#include <cstdint>
+
+namespace vist {
+namespace rmi {
+namespace impl {
+namespace ondemand {
+
+EventFD::EventFD(unsigned int initval, int flags)
+ : fd(::eventfd(initval, flags))
+{
+ if (this->fd == -1)
+ THROW(ErrCode::RuntimeError) << "Failed to create eventfd.";
+}
+
+EventFD::~EventFD()
+{
+ ::close(fd);
+}
+
+void EventFD::send(void)
+{
+ const std::uint64_t val = 1;
+ errno = 0;
+ if (::write(this->fd, &val, sizeof(val)) == -1)
+ THROW(ErrCode::RuntimeError) << "Failed to write to eventfd: " << errno;
+}
+
+void EventFD::receive(void)
+{
+ std::uint64_t val = 0;
+ errno = 0;
+ if (::read(this->fd, &val, sizeof(val)) == -1)
+ THROW(ErrCode::RuntimeError) << "Failed to read from eventfd: " << errno;
+}
+
+int EventFD::getFd(void) const noexcept
+{
+ return this->fd;
+}
+
+} // namespace ondemand
+} // namespace impl
+} // namespace rmi
+} // namespace vist
--- /dev/null
+/*
+ * Copyright (c) 2018-present Samsung Electronics Co., Ltd All Rights Reserved
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License
+ */
+
+#pragma once
+
+#include <sys/eventfd.h>
+
+namespace vist {
+namespace rmi {
+namespace impl {
+namespace ondemand {
+
+class EventFD final {
+public:
+ explicit EventFD(unsigned int initval = 0, int flags = EFD_SEMAPHORE | EFD_CLOEXEC);
+ ~EventFD();
+
+ EventFD(const EventFD&) = delete;
+ EventFD& operator=(const EventFD&) = delete;
+
+ EventFD(EventFD&&) = delete;
+ EventFD& operator=(EventFD&&) = delete;
+
+ void send(void);
+ void receive(void);
+
+ int getFd(void) const noexcept;
+
+private:
+ int fd;
+};
+
+} // namespace ondemand
+} // namespace impl
+} // namespace rmi
+} // namespace vist
--- /dev/null
+/*
+ * Copyright (c) 2018 Samsung Electronics Co., Ltd All Rights Reserved
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License
+ */
+
+#include "mainloop.hpp"
+
+#include <vist/exception.hpp>
+#include <vist/logger.hpp>
+
+#include <cstring>
+
+#include <unistd.h>
+#include <errno.h>
+
+namespace vist {
+namespace rmi {
+namespace impl {
+namespace ondemand {
+
+Mainloop::Mainloop() :
+ epollFd(::epoll_create1(EPOLL_CLOEXEC)),
+ stopped(false)
+{
+ if (epollFd == -1)
+ THROW(ErrCode::RuntimeError) << "Failed to create epoll instance.";
+}
+
+Mainloop::~Mainloop()
+{
+ ::close(this->epollFd);
+}
+
+void Mainloop::addHandler(const int fd, OnEvent&& onEvent, OnError&& onError)
+{
+ std::lock_guard<Mutex> lock(mutex);
+
+ if (this->listener.find(fd) != this->listener.end())
+ THROW(ErrCode::RuntimeError) << "Event is already registered.";
+
+ ::epoll_event event;
+ std::memset(&event, 0, sizeof(epoll_event));
+
+ event.events = EPOLLIN | EPOLLHUP | EPOLLRDHUP;
+ event.data.fd = fd;
+
+ if (::epoll_ctl(this->epollFd, EPOLL_CTL_ADD, fd, &event) == -1)
+ THROW(ErrCode::RuntimeError) << "Failed to add event source.";
+
+ auto onEventPtr = std::make_shared<OnEvent>(onEvent);
+ auto onErrorPtr = (onError != nullptr) ? std::make_shared<OnError>(onError) : nullptr;
+
+ auto handler = std::make_pair(std::move(onEventPtr), std::move(onErrorPtr));
+
+ this->listener.insert({fd, handler});
+}
+
+void Mainloop::removeHandler(const int fd)
+{
+ std::lock_guard<Mutex> lock(mutex);
+
+ auto iter = this->listener.find(fd);
+ if (iter == this->listener.end())
+ return;
+
+ this->listener.erase(iter);
+
+ ::epoll_ctl(epollFd, EPOLL_CTL_DEL, fd, NULL);
+}
+
+bool Mainloop::prepare(void)
+{
+ auto wakeup = [this]() {
+ this->wakeupSignal.receive();
+ this->removeHandler(wakeupSignal.getFd());
+ this->stopped = true;
+ };
+
+ this->addHandler(this->wakeupSignal.getFd(), wakeup);
+}
+
+bool Mainloop::dispatch(int timeout) noexcept
+{
+ int nfds;
+ ::epoll_event event[MAX_EPOLL_EVENTS];
+
+ do {
+ errno = 0;
+ nfds = ::epoll_wait(epollFd, event, MAX_EPOLL_EVENTS, timeout);
+ } while ((nfds == -1) && (errno == EINTR));
+
+ if (nfds <= 0)
+ return false;
+
+ for (int i = 0; i < nfds; i++) {
+ std::shared_ptr<OnEvent> onEvent;
+ std::shared_ptr<OnError> onError;
+
+ {
+ std::lock_guard<Mutex> lock(mutex);
+
+ auto iter = this->listener.find(event[i].data.fd);
+ if (iter == this->listener.end())
+ continue;
+
+ onEvent = iter->second.first;
+ onError = iter->second.second;
+ }
+
+ try {
+ if ((event[i].events & (EPOLLHUP | EPOLLRDHUP))) {
+ if (onError != nullptr)
+ (*onError)();
+ } else {
+ (*onEvent)();
+ }
+
+ } catch (std::exception& e) {
+ ERROR(VIST) << e.what();
+ }
+ }
+
+ return true;
+}
+
+void Mainloop::run(int timeout)
+{
+ bool done = false;
+ this->stopped = false;
+
+ this->prepare();
+
+ while (!this->stopped && !done)
+ done = !dispatch(timeout);
+}
+
+void Mainloop::stop(void)
+{
+ this->wakeupSignal.send();
+}
+
+} // namespace ondemand
+} // namespace impl
+} // namespace rmi
+} // namespace vist
--- /dev/null
+/*
+ * Copyright (c) 2018-present Samsung Electronics Co., Ltd All Rights Reserved
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License
+ */
+
+#pragma once
+
+#include <vist/rmi/impl/ondemand/eventfd.hpp>
+
+#include <atomic>
+#include <functional>
+#include <memory>
+#include <mutex>
+#include <stdexcept>
+#include <string>
+#include <unordered_map>
+
+#include <sys/epoll.h>
+
+namespace vist {
+namespace rmi {
+namespace impl {
+namespace ondemand {
+
+class Mainloop {
+public:
+ using OnEvent = std::function<void(void)>;
+ using OnError = std::function<void(void)>;
+
+ Mainloop();
+ virtual ~Mainloop();
+
+ Mainloop(const Mainloop&) = delete;
+ Mainloop& operator=(const Mainloop&) = delete;
+
+ Mainloop(Mainloop&&) = delete;
+ Mainloop& operator=(Mainloop&&) = delete;
+
+ void addHandler(const int fd, OnEvent&& onEvent, OnError&& = nullptr);
+ void removeHandler(const int fd);
+
+ void run(int timeout = -1);
+ void stop(void);
+
+private:
+ // recursive_mutex makes additional calls to lock in calling thread.
+ // And other threads will block (for calls to lock).
+ // So, addHandler() can be called during dispatch().
+ using Mutex = std::recursive_mutex;
+ using Handler = std::pair<std::shared_ptr<OnEvent>, std::shared_ptr<OnError>>;
+ using Listener = std::unordered_map<int, Handler>;
+
+ bool prepare(void);
+
+ bool dispatch(const int timeout) noexcept;
+
+ Mutex mutex;
+ Listener listener;
+ EventFD wakeupSignal;
+
+ int epollFd;
+ std::atomic<bool> stopped;
+
+ const int MAX_EPOLL_EVENTS = 16;
+};
+
+} // namespace ondemand
+} // namespace impl
+} // namespace rmi
+} // namespace vist
--- /dev/null
+/*
+ * Copyright (c) 2020 Samsung Electronics Co., Ltd All Rights Reserved
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License
+ */
+
+#include <vist/rmi/message.hpp>
+#include <vist/rmi/impl/ondemand/connection.hpp>
+#include <vist/rmi/impl/ondemand/socket.hpp>
+#include <vist/rmi/impl/ondemand/mainloop.hpp>
+
+#include <string>
+#include <thread>
+
+#include <gtest/gtest.h>
+
+using namespace vist::rmi;
+using namespace vist::rmi::impl::ondemand;
+
+TEST(ConnectionTests, socket_communication)
+{
+ std::string sockPath = ("@vist-test.sock");
+
+ // server-side
+ Mainloop mainloop;
+ Socket socket(sockPath);
+
+ std::string requestSignature = "request signature";
+ int request1 = 100;
+ bool request2 = true;
+ std::string request3 = "request argument";
+
+ std::string responseSignature = "response signature";
+ int response1 = 300;
+ bool response2 = false;
+ std::string response3 = "response argument";
+
+ auto onAccept = [&]() {
+ Connection conn(socket.accept());
+ Message request = conn.recv();
+ EXPECT_EQ(requestSignature, request.signature);
+
+ int recv1;
+ bool recv2;
+ std::string recv3;
+ request.disclose(recv1, recv2, recv3);
+ EXPECT_EQ(request1, recv1);
+ EXPECT_EQ(request2, recv2);
+ EXPECT_EQ(request3, recv3);
+
+ Message reply(Message::Type::Reply, responseSignature);
+ reply.enclose(response1, response2, response3);
+ conn.send(reply);
+
+ mainloop.removeHandler(socket.getFd());
+ mainloop.stop();
+ };
+
+ mainloop.addHandler(socket.getFd(), std::move(onAccept));
+ auto serverThread = std::thread([&]() { mainloop.run(); });
+
+ // client-side
+ Connection conn(sockPath);
+ Message msg(Message::Type::Signal, requestSignature);
+ msg.enclose(request1, request2, request3);
+
+ int repeat = 3;
+ while (repeat--) {
+ Message reply = conn.request(msg);
+ EXPECT_EQ(reply.signature, responseSignature);
+
+ int recv1;
+ bool recv2;
+ std::string recv3;
+ reply.disclose(recv1, recv2, recv3);
+ EXPECT_EQ(response1, recv1);
+ EXPECT_EQ(response2, recv2);
+ EXPECT_EQ(response3, recv3);
+ }
+
+ if (serverThread.joinable())
+ serverThread.join();
+}