--- /dev/null
+/*
+ * Copyright (c) 2020 Samsung Electronics Co., Ltd All Rights Reserved
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License
+ */
+
+#pragma once
+
+#include <vist/logger.hpp>
+#include <vist/rmi/impl/ondemand/connection.hpp>
+
+namespace vist {
+namespace rmi {
+namespace impl {
+namespace ondemand {
+
+class Client {
+public:
+ Client(const std::string& path) : connection(path)
+ {
+ DEBUG(VIST) << "Success to connect to : " << path << " by fd[" << connection.getFd() << "]";
+ }
+
+ inline Message request(Message& message)
+ {
+ return this->connection.request(message);
+ }
+
+private:
+ Connection connection;
+};
+
+} // namespace ondemand
+} // namespace impl
+} // namespace rmi
+} // namespace vist
Connection::Connection(const std::string& path) :
socket(Socket::connect(path))
{
+ DEBUG(VIST) << "Connect to " << path << " by fd: " << socket.getFd();
}
void Connection::send(Message& message)
{
- DEBUG(VIST) << "Send to socket[" << this->socket.getFd()
- << "] Header: " << message.signature;
std::lock_guard<std::mutex> lock(this->sendMutex);
message.header.id = this->sequence++;
this->socket.recv(message.getBuffer().data(), message.size());
message.disclose(message.signature);
- DEBUG(VIST) << "Send to socket[" << this->socket.getFd()
- << "] Header: " << message.signature;
return message;
}
auto handler = std::make_pair(std::move(onEventPtr), std::move(onErrorPtr));
this->listener.insert({fd, handler});
+ DEBUG(VIST) << "FD[" << fd << "] listens to events.";
}
void Mainloop::removeHandler(const int fd)
{
auto wakeup = [this]() {
this->wakeupSignal.receive();
- this->removeHandler(wakeupSignal.getFd());
+ this->removeHandler(this->wakeupSignal.getFd());
this->stopped = true;
};
+ DEBUG(VIST) << "Add eventfd to mainloop for wakeup: " << this->wakeupSignal.getFd();
this->addHandler(this->wakeupSignal.getFd(), wakeup);
}
try {
if ((event[i].events & (EPOLLHUP | EPOLLRDHUP))) {
+ WARN(VIST) << "Connected client might be disconnected.";
if (onError != nullptr)
(*onError)();
} else {
(*onEvent)();
}
- } catch (std::exception& e) {
+ } catch (const std::exception& e) {
ERROR(VIST) << e.what();
}
}
--- /dev/null
+/*
+ * Copyright (c) 2018-present Samsung Electronics Co., Ltd All Rights Reserved
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License
+ */
+
+#pragma once
+
+#include <vist/rmi/impl/server.hpp>
+#include <vist/rmi/impl/ondemand/connection.hpp>
+#include <vist/rmi/impl/ondemand/mainloop.hpp>
+#include <vist/rmi/impl/ondemand/socket.hpp>
+
+#include <vist/exception.hpp>
+#include <vist/logger.hpp>
+
+#include <memory>
+#include <mutex>
+#include <set>
+#include <string>
+#include <unordered_map>
+
+namespace vist {
+namespace rmi {
+namespace impl {
+namespace ondemand {
+
+class Server : public impl::Server {
+public:
+ Server(const std::string& path, const Task& task) : impl::Server(path, task), socket(path)
+ {
+ this->accept(task);
+ }
+
+ virtual ~Server() = default;
+
+ Server(const Server&) = delete;
+ Server& operator=(const Server&) = delete;
+
+ Server(Server&&) = default;
+ Server& operator=(Server&&) = default;
+
+ void run(void) override
+ {
+ this->mainloop.run();
+ }
+
+ void stop(void) override
+ {
+ this->mainloop.removeHandler(this->socket.getFd());
+ this->mainloop.stop();
+ }
+
+private:
+ void accept(const Task& task) override
+ {
+ auto handler = [this, task]() {
+ DEBUG(VIST) << "New session is accepted.";
+
+ auto connection = std::make_shared<Connection>(this->socket.accept());
+ auto onRead = [connection, task]() {
+ Message request = connection->recv();
+ DEBUG(VIST) << "Session header: " << request.signature;
+
+ try {
+ Message reply = task(request);
+ connection->send(reply);
+ } catch (const std::exception& e) {
+ ERROR(VIST) << e.what();
+ }
+ };
+
+ auto onClose = [this, connection]() {
+ DEBUG(VIST) << "Connection closed. fd: " << connection->getFd();
+ this->mainloop.removeHandler(connection->getFd());
+ };
+
+ this->mainloop.addHandler(connection->getFd(),
+ std::move(onRead), std::move(onClose));
+ };
+
+ INFO(VIST) << "Ready for new connection.";
+ this->mainloop.addHandler(this->socket.getFd(), std::move(handler));
+ }
+
+ Socket socket;
+ Mainloop mainloop;
+};
+
+} // namespace ondemand
+} // namespace impl
+} // namespace rmi
+} // namespace vist
#include "socket.hpp"
+#include <vist/logger.hpp>
+
#include <fstream>
#include <iostream>
#include <fcntl.h>
void set_cloexec(int fd)
{
if (::fcntl(fd, F_SETFD, FD_CLOEXEC) == -1)
- throw std::runtime_error("Failed to set CLOSEXEC.");
+ THROW(ErrCode::RuntimeError) << "Failed to set CLOSEXEC.";
}
} // anonymous namespace
Socket::Socket(const std::string& path)
{
if (path.size() >= sizeof(::sockaddr_un::sun_path))
- throw std::invalid_argument("Socket path size is wrong.");
+ THROW(ErrCode::LogicError) << "Socket path size is wrong.";
int fd = ::socket(AF_UNIX, SOCK_STREAM, 0);
if (fd == -1)
- throw std::runtime_error("Failed to create socket.");
+ THROW(ErrCode::RuntimeError) << "Failed to create socket.";
set_cloexec(fd);
struct stat buf;
if (::stat(path.c_str(), &buf) == 0)
if (::unlink(path.c_str()) == -1)
- throw std::runtime_error("Failed to remove exist socket.");
+ THROW(ErrCode::RuntimeError) << "Failed to remove exist socket.";
if (::bind(fd, reinterpret_cast<::sockaddr*>(&addr), sizeof(::sockaddr_un)) == -1) {
::close(fd);
- throw std::runtime_error("Failed to bind.");
+ THROW(ErrCode::RuntimeError) << "Failed to bind.";
}
if (::listen(fd, MAX_BACKLOG_SIZE) == -1) {
::close(fd);
- throw std::runtime_error("Failed to liten.");
+ THROW(ErrCode::RuntimeError) << "Failed to liten.";
}
this->fd = fd;
+
+ DEBUG(VIST) << "Socket is created: " << path << ", and is listening.. fd[" << fd << "]";
}
Socket::Socket(Socket&& that) : fd(that.fd)
Socket Socket::connect(const std::string& path)
{
if (path.size() >= sizeof(::sockaddr_un::sun_path))
- throw std::invalid_argument("Socket path size is wrong.");
+ THROW(ErrCode::LogicError) << "Socket path size is wrong.";
int fd = ::socket(AF_UNIX, SOCK_STREAM, 0);
if (fd == -1)
if (addr.sun_path[0] == '@')
addr.sun_path[0] = '\0';
+ DEBUG(VIST) << "Start to connect: " << path;
errno = 0;
if (::connect(fd, reinterpret_cast<::sockaddr*>(&addr), sizeof(sockaddr_un)) == -1) {
::close(fd);
- THROW(ErrCode::RuntimeError) << "Failed to read connect to: " << path
+ ERROR(VIST) << "Failed to connect to: " << path;
+ THROW(ErrCode::RuntimeError) << "Failed to connect to: " << path
<< ", with: " << errno;
}
Message msg(Message::Type::Signal, requestSignature);
msg.enclose(request1, request2, request3);
- int repeat = 3;
- while (repeat--) {
+ /// Do not request multiple times. (The above sever only processes once.)
+ {
Message reply = conn.request(msg);
EXPECT_EQ(reply.signature, responseSignature);
--- /dev/null
+/*
+ * Copyright (c) 2019 Samsung Electronics Co., Ltd All Rights Reserved
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License
+ */
+
+#include <vist/rmi/message.hpp>
+#include <vist/rmi/impl/ondemand/server.hpp>
+#include <vist/rmi/impl/ondemand/client.hpp>
+
+#include <string>
+#include <thread>
+
+#include <gtest/gtest.h>
+
+using namespace vist::rmi;
+using namespace vist::rmi::impl::ondemand;
+
+namespace {
+
+/// Request variables
+std::string requestSignature = "request signature";
+int request1 = 100;
+bool request2 = true;
+std::string request3 = "request argument";
+
+/// Response variables
+std::string responseSignature = "response signature";
+int response1 = 300;
+bool response2 = false;
+std::string response3 = "response argument";
+
+} // anonymous namespace
+
+TEST(ServerClientTests, ondemand)
+{
+ std::string sockPath = "./vist-test.sock";
+
+ auto task = [&](Message& message) -> Message {
+ EXPECT_EQ(message.signature, requestSignature);
+
+ int recv1;
+ bool recv2;
+ std::string recv3;
+ message.disclose(recv1, recv2, recv3);
+ EXPECT_EQ(request1, recv1);
+ EXPECT_EQ(request2, recv2);
+ EXPECT_EQ(request3, recv3);
+
+ Message reply(Message::Type::Reply, responseSignature);
+ reply.enclose(response1, response2, response3);
+ return reply;
+ };
+
+ Server server(sockPath, task);
+ auto serverThread = std::thread([&]() {
+ server.run();
+ });
+
+ { /// Client configuration
+ auto clientClosure = [&]() {
+ Client client(sockPath);
+
+ Message message(Message::Type::MethodCall, requestSignature);
+ message.enclose(request1, request2, request3);
+
+ for (int i = 0; i < 3; i++) {
+ auto reply = client.request(message);
+ EXPECT_EQ(reply.signature, responseSignature);
+
+ int recv1;
+ bool recv2;
+ std::string recv3;
+ reply.disclose(recv1, recv2, recv3);
+ EXPECT_EQ(response1, recv1);
+ EXPECT_EQ(response2, recv2);
+ EXPECT_EQ(response3, recv3);
+ }
+ };
+
+ for (int i = 0; i < 3; i++)
+ clientClosure();
+ }
+
+ server.stop();
+
+ if (serverThread.joinable())
+ serverThread.join();
+}