m_handle(handle),
m_ctx(new CsContext),
m_cb(handle->m_cb),
- m_userdata(userdata),
- m_dispatcher(new Dispatcher(SockId::CS))
+ m_userdata(userdata)
{
// disable ask user option for async request for now
copyKvp<int>(CsContext::Key::CoreUsage);
{
auto startTime = ::time(nullptr);
+ if (this->m_handle->isStopped())
+ ThrowExcInfo(-999, "Async operation cancelled!");
+
// Already scanned files are included in history. it'll be skipped later
// on server side by every single scan_file request.
- auto retFiles = this->m_dispatcher->methodCall<std::pair<int, std::shared_ptr<StrSet>>>(
+ auto retFiles = this->m_handle->dispatch<std::pair<int, std::shared_ptr<StrSet>>>(
CommandId::GET_SCANNABLE_FILES, dir);
- if (retFiles.first != CSR_ERROR_NONE)
+ if (retFiles.first == -999) {
+ ThrowExcInfo(-999, "Async op cancelled!");
+ } else if (retFiles.first != CSR_ERROR_NONE) {
ThrowExc(retFiles.first, "Error to get scannalbe files. "
"dir: " << dir << " ret: " << retFiles.first);
+ }
+
+ if (retFiles.second == nullptr) {
+ INFO("No scannable file exist on dir: " << dir);
+ return;
+ }
#ifdef TIZEN_DEBUG_ENABLE
DEBUG("scannable file list in dir[" << dir <<
auto ts64 = static_cast<int64_t>(startTime);
- auto ret = this->m_dispatcher->methodCall<int>(CommandId::SET_DIR_TIMESTAMP, dir, ts64);
+ auto ret = this->m_handle->dispatch<int>(CommandId::SET_DIR_TIMESTAMP, dir, ts64);
if (ret != CSR_ERROR_NONE)
ERROR("Failed to set dir timestamp after scan dir[" << dir << "] with "
"ec[" << ret << "] This is server error and not affects to "
if (this->m_handle->isStopped())
ThrowExcInfo(-999, "Async op cancelled!");
- auto ret = this->m_dispatcher->methodCall<std::pair<int, CsDetected *>>(
+ auto ret = this->m_handle->dispatch<std::pair<int, CsDetected *>>(
CommandId::SCAN_FILE, this->m_ctx, file);
// for auto memory deleting in case of exception
#include "common/types.h"
#include "common/cs-context.h"
-#include "common/dispatcher.h"
#include "client/callback.h"
#include "client/handle-ext.h"
Callback m_cb;
void *m_userdata;
-
- std::unique_ptr<Dispatcher> m_dispatcher;
};
template<typename T>
auto task = std::make_shared<Task>([hExt, user_data, fileSet] {
EXCEPTION_ASYNC_SAFE_START(hExt->m_cb, user_data)
+ if (hExt->isStopped())
+ ThrowExcInfo(-999, "Async operation cancelled!");
+
auto ret = hExt->dispatch<std::pair<int, std::shared_ptr<StrSet>>>(
CommandId::CANONICALIZE_PATHS, *fileSet);
auto task = std::make_shared<Task>([hExt, user_data, dirSet] {
EXCEPTION_ASYNC_SAFE_START(hExt->m_cb, user_data)
+ if (hExt->isStopped())
+ ThrowExcInfo(-999, "Async operation cancelled!");
+
auto ret = hExt->dispatch<std::pair<int, std::shared_ptr<StrSet>>>(
CommandId::CANONICALIZE_PATHS, *dirSet);
if (!hExt->isRunning() || hExt->isStopped())
return CSR_ERROR_NO_TASK;
+ hExt->turnOnStopFlagOnly();
+
+ hExt->ping(CommandId::CANCEL_OPERATION);
+
hExt->stop();
return CSR_ERROR_NONE;
this->m_worker.join();
}
+void HandleExt::turnOnStopFlagOnly()
+{
+ std::lock_guard<std::mutex> l(this->m_flagMutex);
+ this->m_stop = true;
+}
+
void HandleExt::stop()
{
DEBUG("Stop & join worker...");
- {
- std::lock_guard<std::mutex> l(this->m_flagMutex);
- this->m_stop = true;
- }
+ this->turnOnStopFlagOnly();
if (this->m_worker.joinable())
this->m_worker.join();
virtual ~HandleExt();
void dispatchAsync(const std::shared_ptr<Task> &task);
+ void turnOnStopFlagOnly(void);
void stop(void);
bool isStopped(void) const;
bool isRunning(void) const;
namespace Client {
Handle::Handle(SockId id, ContextShPtr &&context) :
- m_sockId(id), m_ctx(std::forward<ContextShPtr>(context))
+ m_sockId(id), m_ctx(std::forward<ContextShPtr>(context)),
+ m_dispatcher(new Dispatcher(id))
{
if (!m_ctx)
throw std::invalid_argument("context shouldn't be null");
template<typename Type, typename ...Args>
Type dispatch(Args &&...);
+ template<typename ...Args>
+ void ping(Args &&...);
+
virtual void add(ResultPtr &&);
virtual void add(ResultListPtr &&);
private:
SockId m_sockId;
- std::unique_ptr<Dispatcher> m_dispatcher;
ContextShPtr m_ctx;
+ std::unique_ptr<Dispatcher> m_dispatcher;
};
template<typename Type, typename ...Args>
Type Handle::dispatch(Args &&...args)
{
- if (m_dispatcher == nullptr)
- m_dispatcher.reset(new Dispatcher(m_sockId));
+ return this->m_dispatcher->methodCall<Type>(std::forward<Args>(args)...);
+}
- return m_dispatcher->methodCall<Type>(std::forward<Args>(args)...);
+template<typename ...Args>
+void Handle::ping(Args &&...args)
+{
+ this->m_dispatcher->methodPing(std::forward<Args>(args)...);
}
} // namespace Client
GET_SCANNABLE_FILES = 0x1105,
CANONICALIZE_PATHS = 0x1106,
SET_DIR_TIMESTAMP = 0x1107,
+ CANCEL_OPERATION = 0x1108,
// handle result
JUDGE_STATUS = 0x1201,
void Dispatcher::connect()
{
- this->m_connection = std::make_shared<Connection>(
- Socket::create(this->m_sockId, Socket::Type::CLIENT));
-}
-
-bool Dispatcher::isConnected() const noexcept
-{
- return this->m_connection ? true : false;
+ std::lock_guard<std::mutex> l(this->m_connMutex);
+ if (this->m_connection == nullptr)
+ this->m_connection = std::make_shared<Connection>(
+ Socket::create(this->m_sockId, Socket::Type::CLIENT));
}
} // namespace Csr
#pragma once
#include <string>
+#include <mutex>
#include "common/macros.h"
#include "common/connection.h"
template<typename Type, typename ...Args>
Type methodCall(Args &&...args);
+ template<typename ...Args>
+ void methodPing(Args &&...args);
+
private:
- bool isConnected(void) const noexcept;
void connect(void);
SockId m_sockId;
ConnShPtr m_connection;
+ std::mutex m_connMutex;
};
template<typename Type, typename ...Args>
Type Dispatcher::methodCall(Args &&...args)
{
- if (!this->isConnected())
- this->connect();
+ this->connect();
this->m_connection->send(BinaryQueue::Serialize(std::forward<Args>(args)...).pop());
return response;
}
+template<typename ...Args>
+void Dispatcher::methodPing(Args &&...args)
+{
+ this->connect();
+
+ this->m_connection->send(BinaryQueue::Serialize(std::forward<Args>(args)...).pop());
+}
+
}
if (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR)
continue;
else
- ThrowExc(CSR_ERROR_SOCKET, "Socket read failed with errno: " << errno);
+ ThrowExc(CSR_ERROR_SOCKET, "Socket read failed on fd[" << this->m_fd <<
+ "] with errno: " << errno);
}
total += bytes;
if (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR)
continue;
else
- ThrowExc(CSR_ERROR_SOCKET, "Socket write failed with errno: " << errno);
+ ThrowExc(CSR_ERROR_SOCKET, "Socket write failed on fd[" << this->m_fd <<
+ "] with errno: " << errno);
}
total += bytes;
// 4) /opt/usr/apps/org.tizen.message (app base directory path)
// 5) /opt/usr/apps/org.tizen.flash (app base directory path)
// % items which has detected history is included in list as well.
-RawBuffer CsLogic::getScannableFiles(const std::string &dir)
+RawBuffer CsLogic::getScannableFiles(const std::string &dir, const std::function<void()> &isCancelled)
{
if (this->m_db->getEngineState(CSR_ENGINE_CS) != CSR_STATE_ENABLE)
ThrowExc(CSR_ERROR_ENGINE_DISABLED, "engine is disabled");
auto visitor = FsVisitor::create(dir, lastScanTime);
+ isCancelled();
+
StrSet fileset;
while (auto file = visitor->next()) {
+ isCancelled();
+
if (file->isInApp()) {
DEBUG("Scannable app: " << file->getAppPkgPath());
fileset.insert(file->getAppPkgPath());
#include <memory>
#include <string>
+#include <functional>
#include <ctime>
#include "common/types.h"
RawBuffer scanData(const CsContext &context, const RawBuffer &data);
RawBuffer scanFile(const CsContext &context, const std::string &filepath);
- RawBuffer getScannableFiles(const std::string &dir);
+ RawBuffer getScannableFiles(const std::string &dir, const std::function<void()> &isCancelled);
RawBuffer canonicalizePaths(const StrSet &paths);
RawBuffer setDirTimestamp(const std::string &dir, time_t ts);
RawBuffer judgeStatus(const std::string &filepath, csr_cs_action_e action);
CID_TOSTRING(GET_SCANNABLE_FILES);
CID_TOSTRING(CANONICALIZE_PATHS);
CID_TOSTRING(SET_DIR_TIMESTAMP);
+ CID_TOSTRING(CANCEL_OPERATION);
CID_TOSTRING(JUDGE_STATUS);
CID_TOSTRING(CHECK_URL);
std::string dir;
q.Deserialize(dir);
- return this->m_cslogic->getScannableFiles(dir);
+ auto fd = conn->getFd();
+
+ {
+ std::lock_guard<std::mutex> l(this->m_cancelledMutex);
+ this->m_isCancelled[fd] = false;
+ INFO("Turn off cancelled flag before scannable files start on fd: " << fd);
+ }
+
+ Closer closer([this, fd]() {
+ std::lock_guard<std::mutex> l(this->m_cancelledMutex);
+ this->m_isCancelled.erase(fd);
+ INFO("Erase cancelled flag in closer on fd: " << fd);
+ });
+
+ return this->m_cslogic->getScannableFiles(dir, [this, fd]() {
+ std::lock_guard<std::mutex> l(this->m_cancelledMutex);
+ if (this->m_isCancelled.count(fd) == 1 && this->m_isCancelled[fd])
+ ThrowExcInfo(-999, "operation cancelled on fd: " << fd);
+ });
+ }
+
+ case CommandId::CANCEL_OPERATION: {
+ std::lock_guard<std::mutex> l(this->m_cancelledMutex);
+ auto fd = conn->getFd();
+ if (this->m_isCancelled.count(fd) == 1) {
+ this->m_isCancelled[fd] = true;
+ INFO("Trun on cancelled flag of fd: " << fd);
+ } else {
+ WARN("Nothing to cancel on getting scannable list! fd: " << fd);
+ }
+
+ return RawBuffer();
}
case CommandId::CANONICALIZE_PATHS: {
StrSet paths;
q.Deserialize(paths);
+ auto fd = conn->getFd();
+
+ {
+ std::lock_guard<std::mutex> l(this->m_cancelledMutex);
+ this->m_isCancelled[fd] = false;
+ INFO("Turn off cancelled flag before canonicalize paths start on fd: " << fd);
+ }
+
+ Closer closer([this, fd]() {
+ std::lock_guard<std::mutex> l(this->m_cancelledMutex);
+ this->m_isCancelled.erase(fd);
+ INFO("Erase cancelled flag in closer of canonicalize paths on fd: " << fd);
+ });
+
return this->m_cslogic->canonicalizePaths(paths);
}
CpuUsageManager::reset();
- connection->send(outbuf);
+ if (!outbuf.empty())
+ connection->send(outbuf);
} catch (const std::exception &e) {
ERROR("exception on workqueue task: " << e.what());
try {
#pragma once
#include <memory>
+#include <map>
+#include <mutex>
#include "common/service.h"
#include "common/types.h"
std::unique_ptr<CsLogic> m_cslogic;
std::unique_ptr<WpLogic> m_wplogic;
std::unique_ptr<EmLogic> m_emlogic;
+
+ std::map<int, bool> m_isCancelled;
+ std::mutex m_cancelledMutex;
+
+ struct Closer {
+ std::function<void()> func;
+ Closer(std::function<void()> &&_func) : func(std::move(_func)) {}
+ ~Closer() {
+ if (func != nullptr)
+ func();
+ }
+ };
};
}
#include <condition_variable>
#include <thread>
+#include <chrono>
#include <mutex>
#include <vector>
#include <boost/test/unit_test.hpp>
EXCEPTION_GUARD_END
}
+BOOST_AUTO_TEST_CASE(scan_cancel_positive)
+{
+ EXCEPTION_GUARD_START
+
+ auto c = Test::Context<csr_cs_context_h>();
+ auto context = c.get();
+
+ set_default_callback(context);
+
+ AsyncTestContext testCtx;
+
+ const char *dirs[1] = {
+ "/"
+ };
+
+ // touch a file : in case of no target file to scan, we cannot cancel to scan.
+ Test::touch_file(TEST_FILE_HIGH);
+
+ ASSERT_SUCCESS(csr_cs_scan_dirs_async(context, dirs, 1, &testCtx));
+ ASSERT_SUCCESS(csr_cs_cancel_scanning(context));
+
+ //scanned, detected, completed, cancelled, error
+ ASSERT_CALLBACK(testCtx, -1, -1, 0, 1, 0);
+
+ EXCEPTION_GUARD_END
+}
-BOOST_AUTO_TEST_CASE(scan_cancel_positiive)
+BOOST_AUTO_TEST_CASE(scan_cancel_positive_100)
{
EXCEPTION_GUARD_START
Test::touch_file(TEST_FILE_HIGH);
ASSERT_SUCCESS(csr_cs_scan_dirs_async(context, dirs, 1, &testCtx));
+
+ std::this_thread::sleep_for(std::chrono::milliseconds(100));
+
+ ASSERT_SUCCESS(csr_cs_cancel_scanning(context));
+
+ //scanned, detected, completed, cancelled, error
+ ASSERT_CALLBACK(testCtx, -1, -1, 0, 1, 0);
+
+ EXCEPTION_GUARD_END
+}
+
+BOOST_AUTO_TEST_CASE(scan_cancel_positive_500)
+{
+ EXCEPTION_GUARD_START
+
+ auto c = Test::Context<csr_cs_context_h>();
+ auto context = c.get();
+
+ set_default_callback(context);
+
+ AsyncTestContext testCtx;
+
+ const char *dirs[1] = {
+ "/"
+ };
+
+ // touch a file : in case of no target file to scan, we cannot cancel to scan.
+ Test::touch_file(TEST_FILE_HIGH);
+
+ ASSERT_SUCCESS(csr_cs_scan_dirs_async(context, dirs, 1, &testCtx));
+
+ std::this_thread::sleep_for(std::chrono::milliseconds(500));
+
ASSERT_SUCCESS(csr_cs_cancel_scanning(context));
//scanned, detected, completed, cancelled, error