From: Sangwan Kwon Date: Wed, 15 Jan 2020 08:17:55 +0000 (+0900) Subject: Allocate server session per thread X-Git-Tag: submit/tizen/20200810.073515~95 X-Git-Url: http://review.tizen.org/git/?a=commitdiff_plain;h=6723aaea4b15738b7a0513c926a2ce7c4bf0378c;p=platform%2Fcore%2Fsecurity%2Fvist.git Allocate server session per thread Signed-off-by: Sangwan Kwon --- diff --git a/src/vist/common/tests/thread-pool.cpp b/src/vist/common/tests/thread-pool.cpp index 0061c0d..22396da 100644 --- a/src/vist/common/tests/thread-pool.cpp +++ b/src/vist/common/tests/thread-pool.cpp @@ -24,15 +24,24 @@ using namespace vist; -TEST(ThreadPoolTests, submit) +TEST(ThreadPoolTests, once) { int count = 0; + auto task = [&count]{ count++; }; ThreadPool worker(5); - auto task = [&count]() { - count++; - }; + worker.submit(task); + std::this_thread::sleep_for(std::chrono::seconds(1)); + EXPECT_EQ(count, 1); +} + +TEST(ThreadPoolTests, multiple) +{ + int count = 0; + auto task = [&count]{ count++; }; + + ThreadPool worker(5); std::size_t repeat = 10; while (repeat--) worker.submit(task); diff --git a/src/vist/common/thread-pool.cpp b/src/vist/common/thread-pool.cpp index 0d8bfdc..b138aca 100644 --- a/src/vist/common/thread-pool.cpp +++ b/src/vist/common/thread-pool.cpp @@ -14,11 +14,13 @@ * limitations under the License. */ +#include + +#include + #include #include -#include - #define __BEGIN_CRITICAL__ { std::unique_lock lock(this->queueMutex); #define __END_CRITICAL__ } @@ -37,6 +39,8 @@ ThreadPool::ThreadPool(std::size_t threads) return; } + DEBUG(VIST) << "Thread is waked.."; + task = std::move(tasks.front()); tasks.pop_front(); __END_CRITICAL__ diff --git a/src/vist/rmi/CMakeLists.txt b/src/vist/rmi/CMakeLists.txt index 329367e..9997628 100644 --- a/src/vist/rmi/CMakeLists.txt +++ b/src/vist/rmi/CMakeLists.txt @@ -16,6 +16,7 @@ SET(TARGET vist-rmi) SET(${TARGET}_SRCS gateway.cpp remote.cpp message.cpp + impl/server.cpp impl/socket.cpp impl/eventfd.cpp impl/mainloop.cpp diff --git a/src/vist/rmi/gateway.cpp b/src/vist/rmi/gateway.cpp index 1bbb640..f12e42d 100644 --- a/src/vist/rmi/gateway.cpp +++ b/src/vist/rmi/gateway.cpp @@ -17,6 +17,7 @@ #include "gateway.hpp" #include +#include #include #include diff --git a/src/vist/rmi/impl/server.cpp b/src/vist/rmi/impl/server.cpp new file mode 100644 index 0000000..76abbd1 --- /dev/null +++ b/src/vist/rmi/impl/server.cpp @@ -0,0 +1,94 @@ +/* + * Copyright (c) 2018-present Samsung Electronics Co., Ltd All Rights Reserved + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License + */ + +#include +#include +#include + +#include +#include + +#include + +namespace vist { +namespace rmi { +namespace impl { + +Server::Server(const std::string& path, const Task& task, ServiceType type) : worker(5) +{ + switch (type) { + case ServiceType::OnDemand: + this->socket = std::make_unique(SystemdSocket::Create(path)); + break; + case ServiceType::General: /// fall through + default: + this->socket = std::make_unique(path); + break; + } + + this->accept(task); +} + +void Server::run(int timeout, Stopper stopper) +{ + this->mainloop.run(timeout, stopper); +} + +void Server::stop(void) +{ + this->mainloop.removeHandler(this->socket->getFd()); + this->mainloop.stop(); +} + +void Server::accept(const Task& task) +{ + auto handler = [this, task]() { + DEBUG(VIST) << "New session is accepted."; + auto connection = std::make_shared(this->socket->accept()); + + /// process task per thread + this->worker.submit([this, connection, task]{ + auto onRead = [connection, task]() { + Message request = connection->recv(); + DEBUG(VIST) << "Session header: " << request.signature; + + try { + Message reply = task(request); + connection->send(reply); + } catch (const std::exception& e) { + ERROR(VIST) << e.what(); + Message reply = Message(Message::Type::Error, e.what()); + connection->send(reply); + } + }; + + auto onClose = [this, connection]() { + DEBUG(VIST) << "Connection closed. fd: " << connection->getFd(); + this->mainloop.removeHandler(connection->getFd()); + }; + + this->mainloop.addHandler(connection->getFd(), + std::move(onRead), std::move(onClose)); + }); + }; + + INFO(VIST) << "Ready for new connection."; + this->mainloop.addHandler(this->socket->getFd(), std::move(handler)); +} + +} // namespace impl +} // namespace rmi +} // namespace vist diff --git a/src/vist/rmi/impl/server.hpp b/src/vist/rmi/impl/server.hpp index e8b4604..1625bd3 100644 --- a/src/vist/rmi/impl/server.hpp +++ b/src/vist/rmi/impl/server.hpp @@ -17,19 +17,13 @@ #pragma once #include -#include +#include #include #include -#include - -#include -#include +#include #include -#include -#include #include -#include #include namespace vist { @@ -42,21 +36,7 @@ using ServiceType = Gateway::ServiceType; class Server { public: - Server(const std::string& path, const Task& task, ServiceType type = ServiceType::General) - { - switch (type) { - case ServiceType::OnDemand: - this->socket = std::make_unique(SystemdSocket::Create(path)); - break; - case ServiceType::General: /// fall through - default: - this->socket = std::make_unique(path); - break; - } - - this->accept(task); - } - + Server(const std::string& path, const Task& task, ServiceType type = ServiceType::General); virtual ~Server() = default; Server(const Server&) = delete; @@ -65,55 +45,18 @@ public: Server(Server&&) = default; Server& operator=(Server&&) = default; - void run(int timeout = -1, Stopper stopper = nullptr) - { - this->mainloop.run(timeout, stopper); - } - - void stop(void) - { - this->mainloop.removeHandler(this->socket->getFd()); - this->mainloop.stop(); - } + void run(int timeout = -1, Stopper stopper = nullptr); + void stop(void); private: - void accept(const Task& task) - { - auto handler = [this, task]() { - DEBUG(VIST) << "New session is accepted."; - - auto connection = std::make_shared(this->socket->accept()); - auto onRead = [connection, task]() { - Message request = connection->recv(); - DEBUG(VIST) << "Session header: " << request.signature; - - try { - Message reply = task(request); - connection->send(reply); - } catch (const std::exception& e) { - ERROR(VIST) << e.what(); - Message reply = Message(Message::Type::Error, e.what()); - connection->send(reply); - } - }; - - auto onClose = [this, connection]() { - DEBUG(VIST) << "Connection closed. fd: " << connection->getFd(); - this->mainloop.removeHandler(connection->getFd()); - }; - - this->mainloop.addHandler(connection->getFd(), - std::move(onRead), std::move(onClose)); - }; - - INFO(VIST) << "Ready for new connection."; - this->mainloop.addHandler(this->socket->getFd(), std::move(handler)); - } + void accept(const Task& task); std::unique_ptr socket; Mainloop mainloop; + ThreadPool worker; }; } // namespace impl } // namespace rmi } // namespace vist +