Cancel async operation on both of server/client side 30/76430/2
authorKyungwook Tak <k.tak@samsung.com>
Fri, 24 Jun 2016 02:17:10 +0000 (11:17 +0900)
committerKyungwook Tak <k.tak@samsung.com>
Fri, 24 Jun 2016 04:36:36 +0000 (13:36 +0900)
Change-Id: I5c5ee328c2ef7e18287b0ab298620cc3df97d4c7
Signed-off-by: Kyungwook Tak <k.tak@samsung.com>
16 files changed:
src/framework/client/async-logic.cpp
src/framework/client/async-logic.h
src/framework/client/content-screening.cpp
src/framework/client/handle-ext.cpp
src/framework/client/handle-ext.h
src/framework/client/handle.cpp
src/framework/client/handle.h
src/framework/common/command-id.h
src/framework/common/dispatcher.cpp
src/framework/common/dispatcher.h
src/framework/common/socket.cpp
src/framework/service/cs-logic.cpp
src/framework/service/cs-logic.h
src/framework/service/server-service.cpp
src/framework/service/server-service.h
test/test-api-content-screening-async.cpp

index 42d417a..0dec60a 100644 (file)
@@ -35,8 +35,7 @@ AsyncLogic::AsyncLogic(HandleExt *handle, void *userdata) :
        m_handle(handle),
        m_ctx(new CsContext),
        m_cb(handle->m_cb),
-       m_userdata(userdata),
-       m_dispatcher(new Dispatcher(SockId::CS))
+       m_userdata(userdata)
 {
        // disable ask user option for async request for now
        copyKvp<int>(CsContext::Key::CoreUsage);
@@ -60,14 +59,25 @@ void AsyncLogic::scanDir(const std::string &dir)
 {
        auto startTime = ::time(nullptr);
 
+       if (this->m_handle->isStopped())
+               ThrowExcInfo(-999, "Async operation cancelled!");
+
        // Already scanned files are included in history. it'll be skipped later
        // on server side by every single scan_file request.
-       auto retFiles = this->m_dispatcher->methodCall<std::pair<int, std::shared_ptr<StrSet>>>(
+       auto retFiles = this->m_handle->dispatch<std::pair<int, std::shared_ptr<StrSet>>>(
                                                CommandId::GET_SCANNABLE_FILES, dir);
 
-       if (retFiles.first != CSR_ERROR_NONE)
+       if (retFiles.first == -999) {
+               ThrowExcInfo(-999, "Async op cancelled!");
+       } else if (retFiles.first != CSR_ERROR_NONE) {
                ThrowExc(retFiles.first, "Error to get scannalbe files. "
                                 "dir: " << dir << " ret: " << retFiles.first);
+       }
+
+       if (retFiles.second == nullptr) {
+               INFO("No scannable file exist on dir: " << dir);
+               return;
+       }
 
 #ifdef TIZEN_DEBUG_ENABLE
        DEBUG("scannable file list in dir[" << dir <<
@@ -82,7 +92,7 @@ void AsyncLogic::scanDir(const std::string &dir)
 
        auto ts64 = static_cast<int64_t>(startTime);
 
-       auto ret = this->m_dispatcher->methodCall<int>(CommandId::SET_DIR_TIMESTAMP, dir, ts64);
+       auto ret = this->m_handle->dispatch<int>(CommandId::SET_DIR_TIMESTAMP, dir, ts64);
        if (ret != CSR_ERROR_NONE)
                ERROR("Failed to set dir timestamp after scan dir[" << dir << "] with "
                          "ec[" << ret << "] This is server error and not affects to "
@@ -96,7 +106,7 @@ void AsyncLogic::scanFiles(const StrSet &fileSet)
                if (this->m_handle->isStopped())
                        ThrowExcInfo(-999, "Async op cancelled!");
 
-               auto ret = this->m_dispatcher->methodCall<std::pair<int, CsDetected *>>(
+               auto ret = this->m_handle->dispatch<std::pair<int, CsDetected *>>(
                                           CommandId::SCAN_FILE, this->m_ctx, file);
 
                // for auto memory deleting in case of exception
index f245674..a1294ec 100644 (file)
@@ -26,7 +26,6 @@
 
 #include "common/types.h"
 #include "common/cs-context.h"
-#include "common/dispatcher.h"
 #include "client/callback.h"
 #include "client/handle-ext.h"
 
@@ -55,8 +54,6 @@ private:
 
        Callback m_cb;
        void *m_userdata;
-
-       std::unique_ptr<Dispatcher> m_dispatcher;
 };
 
 template<typename T>
index 712538f..98f9a5c 100644 (file)
@@ -390,6 +390,9 @@ int csr_cs_scan_files_async(csr_cs_context_h handle, const char *file_paths[],
        auto task = std::make_shared<Task>([hExt, user_data, fileSet] {
                EXCEPTION_ASYNC_SAFE_START(hExt->m_cb, user_data)
 
+               if (hExt->isStopped())
+                       ThrowExcInfo(-999, "Async operation cancelled!");
+
                auto ret = hExt->dispatch<std::pair<int, std::shared_ptr<StrSet>>>(
                                        CommandId::CANONICALIZE_PATHS, *fileSet);
 
@@ -489,6 +492,9 @@ int csr_cs_scan_dirs_async(csr_cs_context_h handle, const char *dir_paths[],
        auto task = std::make_shared<Task>([hExt, user_data, dirSet] {
                EXCEPTION_ASYNC_SAFE_START(hExt->m_cb, user_data)
 
+               if (hExt->isStopped())
+                       ThrowExcInfo(-999, "Async operation cancelled!");
+
                auto ret = hExt->dispatch<std::pair<int, std::shared_ptr<StrSet>>>(
                                        CommandId::CANONICALIZE_PATHS, *dirSet);
 
@@ -534,6 +540,10 @@ int csr_cs_cancel_scanning(csr_cs_context_h handle)
        if (!hExt->isRunning() || hExt->isStopped())
                return CSR_ERROR_NO_TASK;
 
+       hExt->turnOnStopFlagOnly();
+
+       hExt->ping(CommandId::CANCEL_OPERATION);
+
        hExt->stop();
 
        return CSR_ERROR_NONE;
index 0b89eee..d0e6fda 100644 (file)
@@ -41,14 +41,17 @@ HandleExt::~HandleExt()
                this->m_worker.join();
 }
 
+void HandleExt::turnOnStopFlagOnly()
+{
+       std::lock_guard<std::mutex> l(this->m_flagMutex);
+       this->m_stop = true;
+}
+
 void HandleExt::stop()
 {
        DEBUG("Stop & join worker...");
 
-       {
-               std::lock_guard<std::mutex> l(this->m_flagMutex);
-               this->m_stop = true;
-       }
+       this->turnOnStopFlagOnly();
 
        if (this->m_worker.joinable())
                this->m_worker.join();
index e885d7e..d9789a0 100644 (file)
@@ -37,6 +37,7 @@ public:
        virtual ~HandleExt();
 
        void dispatchAsync(const std::shared_ptr<Task> &task);
+       void turnOnStopFlagOnly(void);
        void stop(void);
        bool isStopped(void) const;
        bool isRunning(void) const;
index 1553e1c..41f1e44 100644 (file)
@@ -30,7 +30,8 @@ namespace Csr {
 namespace Client {
 
 Handle::Handle(SockId id, ContextShPtr &&context) :
-       m_sockId(id), m_ctx(std::forward<ContextShPtr>(context))
+       m_sockId(id), m_ctx(std::forward<ContextShPtr>(context)),
+       m_dispatcher(new Dispatcher(id))
 {
        if (!m_ctx)
                throw std::invalid_argument("context shouldn't be null");
index 6d86645..a398c0a 100644 (file)
@@ -40,6 +40,9 @@ public:
        template<typename Type, typename ...Args>
        Type dispatch(Args &&...);
 
+       template<typename ...Args>
+       void ping(Args &&...);
+
        virtual void add(ResultPtr &&);
        virtual void add(ResultListPtr &&);
 
@@ -52,17 +55,20 @@ protected:
 
 private:
        SockId m_sockId;
-       std::unique_ptr<Dispatcher> m_dispatcher;
        ContextShPtr m_ctx;
+       std::unique_ptr<Dispatcher> m_dispatcher;
 };
 
 template<typename Type, typename ...Args>
 Type Handle::dispatch(Args &&...args)
 {
-       if (m_dispatcher == nullptr)
-               m_dispatcher.reset(new Dispatcher(m_sockId));
+       return this->m_dispatcher->methodCall<Type>(std::forward<Args>(args)...);
+}
 
-       return m_dispatcher->methodCall<Type>(std::forward<Args>(args)...);
+template<typename ...Args>
+void Handle::ping(Args &&...args)
+{
+       this->m_dispatcher->methodPing(std::forward<Args>(args)...);
 }
 
 } // namespace Client
index ee64d25..c4b0936 100644 (file)
@@ -36,6 +36,7 @@ enum class CommandId : int {
        GET_SCANNABLE_FILES = 0x1105,
        CANONICALIZE_PATHS  = 0x1106,
        SET_DIR_TIMESTAMP   = 0x1107,
+       CANCEL_OPERATION    = 0x1108,
        // handle result
        JUDGE_STATUS        = 0x1201,
 
index ea15f9c..9f8e023 100644 (file)
@@ -33,13 +33,10 @@ Dispatcher::Dispatcher(SockId sockId) noexcept : m_sockId(sockId)
 
 void Dispatcher::connect()
 {
-       this->m_connection = std::make_shared<Connection>(
-                       Socket::create(this->m_sockId, Socket::Type::CLIENT));
-}
-
-bool Dispatcher::isConnected() const noexcept
-{
-       return this->m_connection ? true : false;
+       std::lock_guard<std::mutex> l(this->m_connMutex);
+       if (this->m_connection == nullptr)
+               this->m_connection = std::make_shared<Connection>(
+                               Socket::create(this->m_sockId, Socket::Type::CLIENT));
 }
 
 } // namespace Csr
index 211d3e2..9b4f0fb 100644 (file)
@@ -22,6 +22,7 @@
 #pragma once
 
 #include <string>
+#include <mutex>
 
 #include "common/macros.h"
 #include "common/connection.h"
@@ -42,19 +43,21 @@ public:
        template<typename Type, typename ...Args>
        Type methodCall(Args &&...args);
 
+       template<typename ...Args>
+       void methodPing(Args &&...args);
+
 private:
-       bool isConnected(void) const noexcept;
        void connect(void);
 
        SockId m_sockId;
        ConnShPtr m_connection;
+       std::mutex m_connMutex;
 };
 
 template<typename Type, typename ...Args>
 Type Dispatcher::methodCall(Args &&...args)
 {
-       if (!this->isConnected())
-               this->connect();
+       this->connect();
 
        this->m_connection->send(BinaryQueue::Serialize(std::forward<Args>(args)...).pop());
 
@@ -67,4 +70,12 @@ Type Dispatcher::methodCall(Args &&...args)
        return response;
 }
 
+template<typename ...Args>
+void Dispatcher::methodPing(Args &&...args)
+{
+       this->connect();
+
+       this->m_connection->send(BinaryQueue::Serialize(std::forward<Args>(args)...).pop());
+}
+
 }
index d60e1e2..ceacba8 100644 (file)
@@ -177,7 +177,8 @@ RawBuffer Socket::read(void) const
                        if (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR)
                                continue;
                        else
-                               ThrowExc(CSR_ERROR_SOCKET, "Socket read failed with errno: " << errno);
+                               ThrowExc(CSR_ERROR_SOCKET, "Socket read failed on fd[" << this->m_fd <<
+                                                "] with errno: " << errno);
                }
 
                total += bytes;
@@ -211,7 +212,8 @@ void Socket::write(const RawBuffer &data) const
                        if (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR)
                                continue;
                        else
-                               ThrowExc(CSR_ERROR_SOCKET, "Socket write failed with errno: " << errno);
+                               ThrowExc(CSR_ERROR_SOCKET, "Socket write failed on fd[" << this->m_fd <<
+                                                "] with errno: " << errno);
                }
 
                total += bytes;
index b4da90d..032e45b 100644 (file)
@@ -477,7 +477,7 @@ RawBuffer CsLogic::scanFile(const CsContext &context, const std::string &filepat
 //           4) /opt/usr/apps/org.tizen.message  (app base directory path)
 //           5) /opt/usr/apps/org.tizen.flash    (app base directory path)
 //           % items which has detected history is included in list as well.
-RawBuffer CsLogic::getScannableFiles(const std::string &dir)
+RawBuffer CsLogic::getScannableFiles(const std::string &dir, const std::function<void()> &isCancelled)
 {
        if (this->m_db->getEngineState(CSR_ENGINE_CS) != CSR_STATE_ENABLE)
                ThrowExc(CSR_ERROR_ENGINE_DISABLED, "engine is disabled");
@@ -486,9 +486,13 @@ RawBuffer CsLogic::getScannableFiles(const std::string &dir)
 
        auto visitor = FsVisitor::create(dir, lastScanTime);
 
+       isCancelled();
+
        StrSet fileset;
 
        while (auto file = visitor->next()) {
+               isCancelled();
+
                if (file->isInApp()) {
                        DEBUG("Scannable app: " << file->getAppPkgPath());
                        fileset.insert(file->getAppPkgPath());
index db6a258..ca9ef58 100644 (file)
@@ -23,6 +23,7 @@
 
 #include <memory>
 #include <string>
+#include <functional>
 #include <ctime>
 
 #include "common/types.h"
@@ -45,7 +46,7 @@ public:
 
        RawBuffer scanData(const CsContext &context, const RawBuffer &data);
        RawBuffer scanFile(const CsContext &context, const std::string &filepath);
-       RawBuffer getScannableFiles(const std::string &dir);
+       RawBuffer getScannableFiles(const std::string &dir, const std::function<void()> &isCancelled);
        RawBuffer canonicalizePaths(const StrSet &paths);
        RawBuffer setDirTimestamp(const std::string &dir, time_t ts);
        RawBuffer judgeStatus(const std::string &filepath, csr_cs_action_e action);
index b60f3bb..6b1ac4f 100644 (file)
@@ -56,6 +56,7 @@ std::string cidToString(const CommandId &cid)
        CID_TOSTRING(GET_SCANNABLE_FILES);
        CID_TOSTRING(CANONICALIZE_PATHS);
        CID_TOSTRING(SET_DIR_TIMESTAMP);
+       CID_TOSTRING(CANCEL_OPERATION);
        CID_TOSTRING(JUDGE_STATUS);
 
        CID_TOSTRING(CHECK_URL);
@@ -158,7 +159,38 @@ RawBuffer ServerService::processCs(const ConnShPtr &conn, RawBuffer &data)
                std::string dir;
                q.Deserialize(dir);
 
-               return this->m_cslogic->getScannableFiles(dir);
+               auto fd = conn->getFd();
+
+               {
+                       std::lock_guard<std::mutex> l(this->m_cancelledMutex);
+                       this->m_isCancelled[fd] = false;
+                       INFO("Turn off cancelled flag before scannable files start on fd: " << fd);
+               }
+
+               Closer closer([this, fd]() {
+                       std::lock_guard<std::mutex> l(this->m_cancelledMutex);
+                       this->m_isCancelled.erase(fd);
+                       INFO("Erase cancelled flag in closer on fd: " << fd);
+               });
+
+               return this->m_cslogic->getScannableFiles(dir, [this, fd]() {
+                       std::lock_guard<std::mutex> l(this->m_cancelledMutex);
+                       if (this->m_isCancelled.count(fd) == 1 && this->m_isCancelled[fd])
+                               ThrowExcInfo(-999, "operation cancelled on fd: " << fd);
+               });
+       }
+
+       case CommandId::CANCEL_OPERATION: {
+               std::lock_guard<std::mutex> l(this->m_cancelledMutex);
+               auto fd = conn->getFd();
+               if (this->m_isCancelled.count(fd) == 1) {
+                       this->m_isCancelled[fd] = true;
+                       INFO("Trun on cancelled flag of fd: " << fd);
+               } else {
+                       WARN("Nothing to cancel on getting scannable list! fd: " << fd);
+               }
+
+               return RawBuffer();
        }
 
        case CommandId::CANONICALIZE_PATHS: {
@@ -167,6 +199,20 @@ RawBuffer ServerService::processCs(const ConnShPtr &conn, RawBuffer &data)
                StrSet paths;
                q.Deserialize(paths);
 
+               auto fd = conn->getFd();
+
+               {
+                       std::lock_guard<std::mutex> l(this->m_cancelledMutex);
+                       this->m_isCancelled[fd] = false;
+                       INFO("Turn off cancelled flag before canonicalize paths start on fd: " << fd);
+               }
+
+               Closer closer([this, fd]() {
+                       std::lock_guard<std::mutex> l(this->m_cancelledMutex);
+                       this->m_isCancelled.erase(fd);
+                       INFO("Erase cancelled flag in closer of canonicalize paths on fd: " << fd);
+               });
+
                return this->m_cslogic->canonicalizePaths(paths);
        }
 
@@ -382,7 +428,8 @@ void ServerService::onMessageProcess(const ConnShPtr &connection)
 
                        CpuUsageManager::reset();
 
-                       connection->send(outbuf);
+                       if (!outbuf.empty())
+                               connection->send(outbuf);
                } catch (const std::exception &e) {
                        ERROR("exception on workqueue task: " << e.what());
                        try {
index eb01ac9..2930c47 100644 (file)
@@ -22,6 +22,8 @@
 #pragma once
 
 #include <memory>
+#include <map>
+#include <mutex>
 
 #include "common/service.h"
 #include "common/types.h"
@@ -56,6 +58,18 @@ private:
        std::unique_ptr<CsLogic> m_cslogic;
        std::unique_ptr<WpLogic> m_wplogic;
        std::unique_ptr<EmLogic> m_emlogic;
+
+       std::map<int, bool> m_isCancelled;
+       std::mutex m_cancelledMutex;
+
+       struct Closer {
+               std::function<void()> func;
+               Closer(std::function<void()> &&_func) : func(std::move(_func)) {}
+               ~Closer() {
+                       if (func != nullptr)
+                               func();
+               }
+       };
 };
 
 }
index b2e5005..23d15e7 100644 (file)
@@ -23,6 +23,7 @@
 
 #include <condition_variable>
 #include <thread>
+#include <chrono>
 #include <mutex>
 #include <vector>
 #include <boost/test/unit_test.hpp>
@@ -598,8 +599,34 @@ BOOST_AUTO_TEST_CASE(scan_dirs_negative)
        EXCEPTION_GUARD_END
 }
 
+BOOST_AUTO_TEST_CASE(scan_cancel_positive)
+{
+       EXCEPTION_GUARD_START
+
+       auto c = Test::Context<csr_cs_context_h>();
+       auto context = c.get();
+
+       set_default_callback(context);
+
+       AsyncTestContext testCtx;
+
+       const char *dirs[1] = {
+               "/"
+       };
+
+       // touch a file : in case of no target file to scan, we cannot cancel to scan.
+       Test::touch_file(TEST_FILE_HIGH);
+
+       ASSERT_SUCCESS(csr_cs_scan_dirs_async(context, dirs, 1, &testCtx));
+       ASSERT_SUCCESS(csr_cs_cancel_scanning(context));
+
+       //scanned, detected, completed, cancelled, error
+       ASSERT_CALLBACK(testCtx, -1, -1, 0, 1, 0);
+
+       EXCEPTION_GUARD_END
+}
 
-BOOST_AUTO_TEST_CASE(scan_cancel_positiive)
+BOOST_AUTO_TEST_CASE(scan_cancel_positive_100)
 {
        EXCEPTION_GUARD_START
 
@@ -618,6 +645,39 @@ BOOST_AUTO_TEST_CASE(scan_cancel_positiive)
        Test::touch_file(TEST_FILE_HIGH);
 
        ASSERT_SUCCESS(csr_cs_scan_dirs_async(context, dirs, 1, &testCtx));
+
+       std::this_thread::sleep_for(std::chrono::milliseconds(100));
+
+       ASSERT_SUCCESS(csr_cs_cancel_scanning(context));
+
+       //scanned, detected, completed, cancelled, error
+       ASSERT_CALLBACK(testCtx, -1, -1, 0, 1, 0);
+
+       EXCEPTION_GUARD_END
+}
+
+BOOST_AUTO_TEST_CASE(scan_cancel_positive_500)
+{
+       EXCEPTION_GUARD_START
+
+       auto c = Test::Context<csr_cs_context_h>();
+       auto context = c.get();
+
+       set_default_callback(context);
+
+       AsyncTestContext testCtx;
+
+       const char *dirs[1] = {
+               "/"
+       };
+
+       // touch a file : in case of no target file to scan, we cannot cancel to scan.
+       Test::touch_file(TEST_FILE_HIGH);
+
+       ASSERT_SUCCESS(csr_cs_scan_dirs_async(context, dirs, 1, &testCtx));
+
+       std::this_thread::sleep_for(std::chrono::milliseconds(500));
+
        ASSERT_SUCCESS(csr_cs_cancel_scanning(context));
 
        //scanned, detected, completed, cancelled, error