Allocate server session per thread
authorSangwan Kwon <sangwan.kwon@samsung.com>
Wed, 15 Jan 2020 08:17:55 +0000 (17:17 +0900)
committer권상완/Security 2Lab(SR)/Engineer/삼성전자 <sangwan.kwon@samsung.com>
Fri, 17 Jan 2020 06:24:01 +0000 (15:24 +0900)
Signed-off-by: Sangwan Kwon <sangwan.kwon@samsung.com>
src/vist/common/tests/thread-pool.cpp
src/vist/common/thread-pool.cpp
src/vist/rmi/CMakeLists.txt
src/vist/rmi/gateway.cpp
src/vist/rmi/impl/server.cpp [new file with mode: 0644]
src/vist/rmi/impl/server.hpp

index 0061c0d..22396da 100644 (file)
 
 using namespace vist;
 
-TEST(ThreadPoolTests, submit)
+TEST(ThreadPoolTests, once)
 {
        int count = 0;
+       auto task = [&count]{ count++; };
 
        ThreadPool worker(5);
-       auto task = [&count]() {
-               count++;
-       };
+       worker.submit(task);
 
+       std::this_thread::sleep_for(std::chrono::seconds(1));
+       EXPECT_EQ(count, 1);
+}
+
+TEST(ThreadPoolTests, multiple)
+{
+       int count = 0;
+       auto task = [&count]{ count++; };
+
+       ThreadPool worker(5);
        std::size_t repeat = 10;
        while (repeat--)
                worker.submit(task);
index 0d8bfdc..b138aca 100644 (file)
  * limitations under the License.
  */
 
+#include <vist/thread-pool.hpp>
+
+#include <vist/logger.hpp>
+
 #include <sys/types.h>
 #include <unistd.h>
 
-#include <vist/thread-pool.hpp>
-
 #define __BEGIN_CRITICAL__  { std::unique_lock<std::mutex> lock(this->queueMutex);
 #define __END_CRITICAL__    }
 
@@ -37,6 +39,8 @@ ThreadPool::ThreadPool(std::size_t threads)
                                        return;
                                }
 
+                               DEBUG(VIST) << "Thread is waked..";
+
                                task = std::move(tasks.front());
                                tasks.pop_front();
                                __END_CRITICAL__
index 329367e..9997628 100644 (file)
@@ -16,6 +16,7 @@ SET(TARGET vist-rmi)
 SET(${TARGET}_SRCS gateway.cpp
                                   remote.cpp
                                   message.cpp
+                                  impl/server.cpp
                                   impl/socket.cpp
                                   impl/eventfd.cpp
                                   impl/mainloop.cpp
index 1bbb640..f12e42d 100644 (file)
@@ -17,6 +17,7 @@
 #include "gateway.hpp"
 
 #include <vist/exception.hpp>
+#include <vist/logger.hpp>
 #include <vist/rmi/message.hpp>
 #include <vist/rmi/impl/server.hpp>
 
diff --git a/src/vist/rmi/impl/server.cpp b/src/vist/rmi/impl/server.cpp
new file mode 100644 (file)
index 0000000..76abbd1
--- /dev/null
@@ -0,0 +1,94 @@
+/*
+ *  Copyright (c) 2018-present Samsung Electronics Co., Ltd All Rights Reserved
+ *
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License
+ */
+
+#include <vist/rmi/impl/server.hpp>
+#include <vist/rmi/impl/connection.hpp>
+#include <vist/rmi/impl/systemd-socket.hpp>
+
+#include <vist/exception.hpp>
+#include <vist/logger.hpp>
+
+#include <thread>
+
+namespace vist {
+namespace rmi {
+namespace impl {
+
+Server::Server(const std::string& path, const Task& task, ServiceType type) : worker(5)
+{
+       switch (type) {
+       case ServiceType::OnDemand:
+               this->socket = std::make_unique<Socket>(SystemdSocket::Create(path));
+               break;
+       case ServiceType::General: /// fall through
+       default:
+               this->socket = std::make_unique<Socket>(path);
+               break;
+       }
+
+       this->accept(task);
+}
+
+void Server::run(int timeout, Stopper stopper)
+{
+       this->mainloop.run(timeout, stopper);
+}
+
+void Server::stop(void)
+{
+       this->mainloop.removeHandler(this->socket->getFd());
+       this->mainloop.stop();
+}
+
+void Server::accept(const Task& task)
+{
+       auto handler = [this, task]() {
+               DEBUG(VIST) << "New session is accepted.";
+               auto connection = std::make_shared<Connection>(this->socket->accept());
+
+               /// process task per thread
+               this->worker.submit([this, connection, task]{
+                       auto onRead = [connection, task]() {
+                               Message request = connection->recv();
+                               DEBUG(VIST) << "Session header: " << request.signature;
+
+                               try {
+                                       Message reply = task(request);
+                                       connection->send(reply);
+                               } catch (const std::exception& e) {
+                                       ERROR(VIST) << e.what();
+                                       Message reply = Message(Message::Type::Error, e.what());
+                                       connection->send(reply);
+                               }
+                       };
+
+                       auto onClose = [this, connection]() {
+                               DEBUG(VIST) << "Connection closed. fd: " << connection->getFd();
+                               this->mainloop.removeHandler(connection->getFd());
+                       };
+
+                       this->mainloop.addHandler(connection->getFd(),
+                                                                       std::move(onRead), std::move(onClose));
+               });
+       };
+
+       INFO(VIST) << "Ready for new connection.";
+       this->mainloop.addHandler(this->socket->getFd(), std::move(handler));
+}
+
+} // namespace impl
+} // namespace rmi
+} // namespace vist
index e8b4604..1625bd3 100644 (file)
 #pragma once
 
 #include <vist/rmi/gateway.hpp>
-#include <vist/rmi/impl/connection.hpp>
+#include <vist/rmi/message.hpp>
 #include <vist/rmi/impl/mainloop.hpp>
 #include <vist/rmi/impl/socket.hpp>
-#include <vist/rmi/impl/systemd-socket.hpp>
-
-#include <vist/exception.hpp>
-#include <vist/logger.hpp>
+#include <vist/thread-pool.hpp>
 
 #include <memory>
-#include <mutex>
-#include <set>
 #include <string>
-#include <unordered_map>
 #include <functional>
 
 namespace vist {
@@ -42,21 +36,7 @@ using ServiceType = Gateway::ServiceType;
 
 class Server {
 public:
-       Server(const std::string& path, const Task& task, ServiceType type = ServiceType::General)
-       {
-               switch (type) {
-               case ServiceType::OnDemand:
-                       this->socket = std::make_unique<Socket>(SystemdSocket::Create(path));
-                       break;
-               case ServiceType::General: /// fall through
-               default:
-                       this->socket = std::make_unique<Socket>(path);
-                       break;
-               }
-
-               this->accept(task);
-       }
-
+       Server(const std::string& path, const Task& task, ServiceType type = ServiceType::General);
        virtual ~Server() = default;
 
        Server(const Server&) = delete;
@@ -65,55 +45,18 @@ public:
        Server(Server&&) = default;
        Server& operator=(Server&&) = default;
 
-       void run(int timeout = -1, Stopper stopper = nullptr)
-       {
-               this->mainloop.run(timeout, stopper);
-       }
-
-       void stop(void)
-       {
-               this->mainloop.removeHandler(this->socket->getFd());
-               this->mainloop.stop();
-       }
+       void run(int timeout = -1, Stopper stopper = nullptr);
+       void stop(void);
 
 private:
-       void accept(const Task& task)
-       {
-               auto handler = [this, task]() {
-                       DEBUG(VIST) << "New session is accepted.";
-
-                       auto connection = std::make_shared<Connection>(this->socket->accept());
-                       auto onRead = [connection, task]() {
-                               Message request = connection->recv();
-                               DEBUG(VIST) << "Session header: " << request.signature;
-
-                               try {
-                                       Message reply = task(request);
-                                       connection->send(reply);
-                               } catch (const std::exception& e) {
-                                       ERROR(VIST) << e.what();
-                                       Message reply = Message(Message::Type::Error, e.what());
-                                       connection->send(reply);
-                               }
-                       };
-
-                       auto onClose = [this, connection]() {
-                               DEBUG(VIST) << "Connection closed. fd: " << connection->getFd();
-                               this->mainloop.removeHandler(connection->getFd());
-                       };
-
-                       this->mainloop.addHandler(connection->getFd(),
-                                                                       std::move(onRead), std::move(onClose));
-               };
-
-               INFO(VIST) << "Ready for new connection.";
-               this->mainloop.addHandler(this->socket->getFd(), std::move(handler));
-       }
+       void accept(const Task& task);
 
        std::unique_ptr<Socket> socket;
        Mainloop mainloop;
+       ThreadPool worker;
 };
 
 } // namespace impl
 } // namespace rmi
 } // namespace vist
+