Refactor cancel async scan architecture 06/78506/4
authorKyungwook Tak <k.tak@samsung.com>
Wed, 6 Jul 2016 01:00:19 +0000 (10:00 +0900)
committerKyungwook Tak <k.tak@samsung.com>
Wed, 6 Jul 2016 06:08:02 +0000 (15:08 +0900)
Before: Check cancelled flag on both of client/server side in loop of
        file visitor. It's hard to control response latency consistently.

After : Change client side connection to non-blocking to cancel it
        directly by sending signal to fd. Response latency is consistent
        because client just close connection.

Change-Id: If181eb9984357939b2845b7d03a17dac57a0b9d0
Signed-off-by: Kyungwook Tak <k.tak@samsung.com>
15 files changed:
src/CMakeLists.txt
src/framework/client/async-logic.cpp
src/framework/client/async-logic.h
src/framework/client/content-screening.cpp
src/framework/client/eventfd.cpp [new file with mode: 0644]
src/framework/client/eventfd.h [new file with mode: 0644]
src/framework/client/handle-ext.cpp
src/framework/client/handle-ext.h
src/framework/common/dispatcher.cpp
src/framework/common/dispatcher.h
src/framework/common/mainloop.h
src/framework/common/socket.cpp
src/framework/service/cs-logic.cpp
src/framework/service/exception.cpp
src/framework/service/server-service.cpp

index 8470339..a17ad25 100755 (executable)
@@ -103,6 +103,7 @@ SET(${TARGET_CSR_CLIENT}_SRCS
        framework/client/canonicalize.cpp
        framework/client/content-screening.cpp
        framework/client/engine-manager.cpp
+       framework/client/eventfd.cpp
        framework/client/handle.cpp
        framework/client/handle-ext.cpp
        framework/client/utils.cpp
index 5ed5192..43ca501 100644 (file)
 
 #include <cstdint>
 #include <utility>
+#include <sys/epoll.h>
 
 #include "common/exception.h"
 #include "common/cs-detected.h"
+#include "common/connection.h"
 #include "common/async-protocol.h"
 #include "common/audit/logger.h"
 
@@ -33,12 +35,15 @@ namespace Csr {
 namespace Client {
 
 AsyncLogic::AsyncLogic(HandleExt *handle, void *userdata) :
-       m_handle(handle), m_userdata(userdata)
+       m_handle(handle), m_userdata(userdata), m_dispatcherAsync(new Dispatcher(SockId::CS))
 {
 }
 
-AsyncLogic::~AsyncLogic()
+void AsyncLogic::stop(void)
 {
+       INFO("async logic stop called! Let's send cancel signal to loop");
+       this->m_dispatcherAsync->methodPing(CommandId::CANCEL_OPERATION);
+       this->m_cancelSignal.send();
 }
 
 void AsyncLogic::scanDirs(const StrSet &dirs)
@@ -53,22 +58,31 @@ void AsyncLogic::scanFiles(const StrSet &files)
 
 void AsyncLogic::scanHelper(const CommandId &id, const StrSet &s)
 {
-       auto ret = this->m_handle->dispatch<int>(id, *(this->m_handle->getContext()), s);
+       this->m_dispatcherAsync->methodPing(id, *(this->m_handle->getContext()), s);
 
-       if (ret != ASYNC_EVENT_START)
-               ThrowExc(ret, "Error on async scan. ret: " << ret);
+       auto fd = this->m_dispatcherAsync->getFd();
+       auto cancelEventFd = this->m_cancelSignal.getFd();
 
-       DEBUG("loop for waiting server event start!!");
+       this->m_loop.addEventSource(cancelEventFd, EPOLLIN,
+       [&](uint32_t) {
+               this->m_cancelSignal.receive();
+               ThrowExcInfo(ASYNC_EVENT_CANCEL, "Async event cancelled on fd: " << fd);
+       });
 
-       while (true) {
-               auto event = this->m_handle->revent<int>();
+       this->m_loop.addEventSource(fd, EPOLLIN | EPOLLHUP | EPOLLRDHUP,
+       [&](uint32_t e) {
+               if (e & (EPOLLHUP | EPOLLRDHUP))
+                       ThrowExc(CSR_ERROR_SOCKET, "csr-server might be crashed. Finish async client loop");
+
+               // read event
+               auto event = this->m_dispatcherAsync->receiveEvent<int>();
 
                DEBUG("event received: " << event);
 
                switch (event) {
                case ASYNC_EVENT_MALWARE_NONE: {
                        DEBUG("ASYNC_EVENT_MALWARE_NONE comes in!");
-                       auto targetName = this->m_handle->revent<std::string>();
+                       auto targetName = this->m_dispatcherAsync->receiveEvent<std::string>();
 
                        if (targetName.empty()) {
                                ERROR("scanned event received but target name is empty");
@@ -83,7 +97,7 @@ void AsyncLogic::scanHelper(const CommandId &id, const StrSet &s)
 
                case ASYNC_EVENT_MALWARE_DETECTED: {
                        DEBUG("ASYNC_EVENT_MALWARE_DETECTED comes in!");
-                       auto malware = this->m_handle->revent<CsDetected *>();
+                       auto malware = this->m_dispatcherAsync->receiveEvent<CsDetected *>();
 
                        if (malware == nullptr) {
                                ERROR("malware detected event received but handle is null");
@@ -101,21 +115,23 @@ void AsyncLogic::scanHelper(const CommandId &id, const StrSet &s)
                        break;
                }
 
-               case ASYNC_EVENT_COMPLETE: {
-                       DEBUG("Async operation completed");
-                       return;
-               }
-
-               case ASYNC_EVENT_CANCEL: {
-                       ThrowExcInfo(ASYNC_EVENT_CANCEL, "Async operation cancelled!");
-               }
-
                default:
-                       ThrowExc(event, "Error on async scan! ec: " << event);
+                       ThrowExcInfo(event, "Async event loop terminated by event: " << event);
                }
+       });
+
+       try {
+               while (true)
+                       this->m_loop.dispatch(-1);
+       } catch (const Exception &e) {
+               switch (e.error()) {
+                       case ASYNC_EVENT_COMPLETE:
+                               INFO("Async operation completed");
+                               break;
 
-               if (this->m_handle->isStopped())
-                       ThrowExcInfo(ASYNC_EVENT_CANCEL, "Async op cancelled!");
+                       default:
+                               throw;
+               }
        }
 }
 
index 0136074..7e82857 100644 (file)
  */
 #pragma once
 
+#include <memory>
+
 #include "common/types.h"
 #include "common/command-id.h"
+#include "common/dispatcher.h"
+#include "common/mainloop.h"
 #include "client/handle-ext.h"
+#include "client/eventfd.h"
 
 namespace Csr {
 namespace Client {
@@ -31,7 +36,7 @@ namespace Client {
 class AsyncLogic {
 public:
        AsyncLogic(HandleExt *handle, void *userdata);
-       virtual ~AsyncLogic();
+       virtual ~AsyncLogic() = default;
 
        void scanFiles(const StrSet &files);
        void scanDirs(const StrSet &dirs);
@@ -42,8 +47,10 @@ private:
        void scanHelper(const CommandId &id, const StrSet &s);
 
        HandleExt *m_handle; // for registering results for auto-release
-
        void *m_userdata;
+       Mainloop m_loop;
+       EventFd m_cancelSignal;
+       std::unique_ptr<Dispatcher> m_dispatcherAsync;
 };
 
 }
index 61684c0..17d518a 100644 (file)
@@ -400,11 +400,13 @@ int csr_cs_scan_files_async(csr_cs_context_h handle, const char *file_paths[],
        auto task = std::make_shared<Task>([hExt, user_data, fileSet] {
                EXCEPTION_ASYNC_SAFE_START(hExt->m_cb, user_data)
 
+               Client::AsyncLogic l(hExt, user_data);
+
+               hExt->setStopFunc([&l]() { l.stop(); });
+
                if (hExt->isStopped())
                        ThrowExcInfo(ASYNC_EVENT_CANCEL, "Async operation cancelled!");
 
-               Client::AsyncLogic l(hExt, user_data);
-
                l.scanFiles(*fileSet);
 
                EXCEPTION_SAFE_END
@@ -458,11 +460,13 @@ int csr_cs_scan_dirs_async(csr_cs_context_h handle, const char *dir_paths[],
        auto task = std::make_shared<Task>([hExt, user_data, dirSet] {
                EXCEPTION_ASYNC_SAFE_START(hExt->m_cb, user_data)
 
+               Client::AsyncLogic l(hExt, user_data);
+
+               hExt->setStopFunc([&l]() { l.stop(); });
+
                if (hExt->isStopped())
                        ThrowExcInfo(ASYNC_EVENT_CANCEL, "Async operation cancelled!");
 
-               Client::AsyncLogic l(hExt, user_data);
-
                l.scanDirs(*dirSet);
 
                EXCEPTION_SAFE_END
@@ -490,10 +494,6 @@ int csr_cs_cancel_scanning(csr_cs_context_h handle)
        if (!hExt->isRunning() || hExt->isStopped())
                return CSR_ERROR_NO_TASK;
 
-       hExt->turnOnStopFlagOnly();
-
-       hExt->ping(CommandId::CANCEL_OPERATION);
-
        hExt->stop();
 
        return CSR_ERROR_NONE;
diff --git a/src/framework/client/eventfd.cpp b/src/framework/client/eventfd.cpp
new file mode 100644 (file)
index 0000000..ec3d9ed
--- /dev/null
@@ -0,0 +1,61 @@
+/*
+ *  Copyright (c) 2016 Samsung Electronics Co., Ltd All Rights Reserved
+ *
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License
+ */
+/*
+ * @file        eventfd.cpp
+ * @author      Jaemin Ryu (jm77.ryu@samsung.com)
+ * @version     1.0
+ * @brief
+ */
+#include "client/eventfd.h"
+
+#include <sys/types.h>
+#include <unistd.h>
+#include <cstdint>
+
+#include "common/exception.h"
+
+namespace Csr {
+namespace Client {
+
+EventFd::EventFd(unsigned int initval, int flags) :
+       m_fd(::eventfd(initval, flags))
+{
+       if (this->m_fd == -1)
+               ThrowExc(CSR_ERROR_SERVER, "Eventfd from constructor is failed!");
+}
+
+EventFd::~EventFd()
+{
+       if (this->m_fd != -1)
+               ::close(this->m_fd);
+}
+
+void EventFd::send()
+{
+       const std::uint64_t val = 1;
+       if (::write(this->m_fd, &val, sizeof(val)) == -1)
+               ThrowExc(CSR_ERROR_SOCKET, "EventFd send to fd[" << this->m_fd << "] is failed!");
+}
+
+void EventFd::receive()
+{
+       std::uint64_t val;
+       if (::read(this->m_fd, &val, sizeof(val)) == -1)
+               ThrowExc(CSR_ERROR_SOCKET, "EventFd receive from fd[" << this->m_fd << "] is failed!");
+}
+
+}
+}
diff --git a/src/framework/client/eventfd.h b/src/framework/client/eventfd.h
new file mode 100644 (file)
index 0000000..b6719d1
--- /dev/null
@@ -0,0 +1,50 @@
+/*
+ *  Copyright (c) 2016 Samsung Electronics Co., Ltd All Rights Reserved
+ *
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License
+ */
+/*
+ * @file        eventfd.h
+ * @author      Jaemin Ryu (jm77.ryu@samsung.com)
+ * @version     1.0
+ * @brief
+ */
+#pragma once
+
+#include <sys/eventfd.h>
+
+namespace Csr {
+namespace Client {
+
+class EventFd {
+public:
+       EventFd(unsigned int initval = 0, int flags = EFD_SEMAPHORE | EFD_CLOEXEC);
+       ~EventFd();
+
+       EventFd(const EventFd &) = delete;
+       EventFd &operator=(const EventFd &) = delete;
+
+       void send();
+       void receive();
+
+       inline int getFd() const noexcept
+       {
+               return this->m_fd;
+       }
+
+private:
+       int m_fd;
+};
+
+}
+}
index d0e6fda..66a9273 100644 (file)
@@ -41,17 +41,23 @@ HandleExt::~HandleExt()
                this->m_worker.join();
 }
 
-void HandleExt::turnOnStopFlagOnly()
+void HandleExt::setStopFunc(std::function<void()> &&func)
 {
        std::lock_guard<std::mutex> l(this->m_flagMutex);
-       this->m_stop = true;
+       this->m_stopFunc = std::move(func);
 }
 
 void HandleExt::stop()
 {
        DEBUG("Stop & join worker...");
 
-       this->turnOnStopFlagOnly();
+       {
+               std::lock_guard<std::mutex> l(this->m_flagMutex);
+               this->m_stop = true;
+
+               if (this->m_stopFunc != nullptr)
+                       this->m_stopFunc();
+       }
 
        if (this->m_worker.joinable())
                this->m_worker.join();
index d9789a0..31efdf9 100644 (file)
@@ -37,7 +37,7 @@ public:
        virtual ~HandleExt();
 
        void dispatchAsync(const std::shared_ptr<Task> &task);
-       void turnOnStopFlagOnly(void);
+       void setStopFunc(std::function<void()> &&func);
        void stop(void);
        bool isStopped(void) const;
        bool isRunning(void) const;
@@ -52,6 +52,7 @@ private:
        bool m_stop;
        bool m_isRunning;
        std::thread m_worker;
+       std::function<void()> m_stopFunc;
        mutable std::mutex m_resultsMutex;
        mutable std::mutex m_flagMutex;
 };
index 9f8e023..2982a23 100644 (file)
@@ -39,4 +39,12 @@ void Dispatcher::connect()
                                Socket::create(this->m_sockId, Socket::Type::CLIENT));
 }
 
+int Dispatcher::getFd() const noexcept
+{
+       if (this->m_connection == nullptr)
+               return -1;
+       else
+               return this->m_connection->getFd();
+}
+
 } // namespace Csr
index 3b534db..08a91ee 100644 (file)
@@ -40,6 +40,8 @@ public:
        Dispatcher(Dispatcher &&) = delete;
        Dispatcher &operator=(Dispatcher &&) = delete;
 
+       int getFd(void) const noexcept;
+
        template<typename Type, typename ...Args>
        Type methodCall(Args &&...args);
 
index 7a16bab..18b04d2 100644 (file)
@@ -45,6 +45,9 @@ public:
        // if timeout is negative value, no timeout on idle.
        void run(int timeout);
 
+       // Moved to public to customize stop condition
+       void dispatch(int timeout);
+
        void addEventSource(int fd, uint32_t event, Callback &&callback);
        void removeEventSource(int fd);
        size_t countEventSource(void) const;
@@ -52,7 +55,6 @@ public:
        void setIdleChecker(std::function<bool()> &&idleChecker);
 
 private:
-       void dispatch(int timeout);
 
        bool m_isTimedOut;
        int m_pollfd;
index 0b5c499..f698772 100644 (file)
@@ -200,7 +200,7 @@ void Socket::write(const RawBuffer &data) const
 
        auto bytes = ::write(this->m_fd, &size, sizeof(size));
        if (bytes < 0)
-               ThrowExc(CSR_ERROR_SOCKET, "Socket data size write failed on fd[" << this->m_fd <<
+               ThrowExcWarn(CSR_ERROR_SOCKET, "Socket data size write failed on fd[" << this->m_fd <<
                                 "] with errno: " << errno);
 
        while (total < size) {
@@ -210,7 +210,7 @@ void Socket::write(const RawBuffer &data) const
                        if (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR)
                                continue;
                        else
-                               ThrowExc(CSR_ERROR_SOCKET, "Socket write failed on fd[" << this->m_fd <<
+                               ThrowExcWarn(CSR_ERROR_SOCKET, "Socket write failed on fd[" << this->m_fd <<
                                                 "] with errno: " << errno);
                }
 
index f650644..ed0542e 100644 (file)
@@ -305,7 +305,7 @@ Db::Cache CsLogic::scanAppDelta(const FilePtr &pkgPtr, const CancelChecker &isCa
 }
 
 int CsLogic::scanApp(const CsContext &context, const FilePtr &pkgPtr,
-                                        CsDetectedPtr &malware, const std::function<void()> &isCancelled)
+                                        CsDetectedPtr &malware, const CancelChecker &isCancelled)
 {
        const auto &pkgPath = pkgPtr->getName();
        const auto &pkgId = pkgPtr->getAppPkgId();
@@ -463,7 +463,7 @@ CsLogic::ScanStage CsLogic::judgeScanStage(const CsDetectedPtr &riskiest,
 }
 
 int CsLogic::scanFileInternal(const CsContext &context, const FilePtr &target,
-                                                         CsDetectedPtr &malware, const std::function<void()> &isCancelled)
+                                                         CsDetectedPtr &malware, const CancelChecker &isCancelled)
 {
        if (target->isInApp())
                return this->scanApp(context, target, malware, isCancelled);
@@ -525,13 +525,11 @@ RawBuffer CsLogic::scanFile(const CsContext &context, const std::string &filepat
 }
 
 RawBuffer CsLogic::scanFilesAsync(const ConnShPtr &conn, const CsContext &context,
-                                                                 StrSet &paths, const std::function<void()> &isCancelled)
+                                                                 StrSet &paths, const CancelChecker &isCancelled)
 {
        if (this->m_db->getEngineState(CSR_ENGINE_CS) != CSR_STATE_ENABLE)
                ThrowExc(CSR_ERROR_ENGINE_DISABLED, "engine is disabled");
 
-       conn->send(BinaryQueue::Serialize(ASYNC_EVENT_START).pop());
-
        StrSet canonicalized;
 
        for (const auto &path : paths) {
@@ -588,7 +586,7 @@ RawBuffer CsLogic::scanFilesAsync(const ConnShPtr &conn, const CsContext &contex
 }
 
 RawBuffer CsLogic::scanDirsAsync(const ConnShPtr &conn, const CsContext &context,
-                                                                StrSet &paths, const std::function<void()> &isCancelled)
+                                                                StrSet &paths, const CancelChecker &isCancelled)
 {
        if (this->m_db->getEngineState(CSR_ENGINE_CS) != CSR_STATE_ENABLE)
                ThrowExc(CSR_ERROR_ENGINE_DISABLED, "engine is disabled");
@@ -596,6 +594,8 @@ RawBuffer CsLogic::scanDirsAsync(const ConnShPtr &conn, const CsContext &context
        StrSet dirs;
 
        for (const auto &path : paths) {
+               isCancelled();
+
                try {
                        auto target = canonicalizePath(path, true);
 
@@ -621,20 +621,14 @@ RawBuffer CsLogic::scanDirsAsync(const ConnShPtr &conn, const CsContext &context
 
        eraseSubdirectories(dirs);
 
-       DEBUG("send error none to client before starting scanning");
-
-       conn->send(BinaryQueue::Serialize(ASYNC_EVENT_START).pop());
-
        CsEngineContext engineContext(this->m_loader);
        auto t = this->m_loader->getEngineLatestUpdateTime(engineContext.get());
 
-       DEBUG("Start async scanning!!!!!");
+       INFO("Start async scanning!!!!!");
 
        StrSet malwareList;
        for (const auto &dir : dirs) {
-               isCancelled();
-
-               DEBUG("Start async scanning for dir: " << dir);
+               INFO("Start async scanning for dir: " << dir);
 
                for (auto &row : this->m_db->getDetectedAllByNameOnDir(dir, t)) {
                        isCancelled();
@@ -643,7 +637,7 @@ RawBuffer CsLogic::scanDirsAsync(const ConnShPtr &conn, const CsContext &context
                                auto fileptr = File::create(row->targetName, nullptr);
 
                                CsDetectedPtr malware;
-                               auto retcode = this->scanFileInternal(context, fileptr, malware);
+                               auto retcode = this->scanFileInternal(context, fileptr, malware, isCancelled);
 
                                switch (retcode) {
                                case CSR_ERROR_NONE:
@@ -677,13 +671,15 @@ RawBuffer CsLogic::scanDirsAsync(const ConnShPtr &conn, const CsContext &context
                        }
                }
 
+               INFO("detected malwares rescanning done from db.");
+
                auto startTime = ::time(nullptr);
                auto lastScanTime = this->m_db->getLastScanTime(dir, t);
                auto visitor = FsVisitor::create([&](const FilePtr &file) {
                        isCancelled();
 
                        CsDetectedPtr malware;
-                       auto retcode = this->scanFileInternal(context, file, malware);
+                       auto retcode = this->scanFileInternal(context, file, malware, isCancelled);
 
                        DEBUG("scanFileInternal done. file: " << file->getName() <<
                                  " retcode: " << retcode);
index f395e10..5019fde 100644 (file)
@@ -37,11 +37,12 @@ RawBuffer exceptionGuard(const std::function<RawBuffer()> &func)
        try {
                return func();
        } catch (const Exception &e) {
-               if (e.error() == ASYNC_EVENT_CANCEL)
+               if (e.error() == CSR_ERROR_SOCKET)
+                       WARN("Socket error. Client might cancel async scan or crashed: " << e.what());
+               else if (e.error() == ASYNC_EVENT_CANCEL)
                        INFO("Async operation cancel exception: " << e.what());
                else
                        ERROR("Exception caught. code: " << e.error() << " message: " << e.what());
-
                return BinaryQueue::Serialize(e.error()).pop();
        } catch (const std::invalid_argument &e) {
                ERROR("Invalid argument: " << e.what());
index 38bd875..24bd1bf 100644 (file)
@@ -428,6 +428,12 @@ void ServerService::onMessageProcess(const ConnShPtr &connection)
 
                        if (!outbuf.empty())
                                connection->send(outbuf);
+               } catch (const Exception &e) {
+                       if (e.error() == CSR_ERROR_SOCKET)
+                               WARN("The connection is closed by the peer. Client might cancel async "
+                                        "scanning or crashed: " << e.what());
+                       else
+                               throw;
                } catch (const std::exception &e) {
                        ERROR("exception on workqueue task: " << e.what());
                        try {