auto hExt = reinterpret_cast<Client::HandleExt *>(handle);
- if (hExt->hasRunning()) {
+ if (hExt->isRunning()) {
ERROR("Async scanning already running with this handle.");
return CSR_ERROR_BUSY;
}
fileSet->emplace(Client::getAbsolutePath(file_paths[i]));
}
- hExt->dispatchAsync([hExt, user_data, fileSet] {
+ auto task = std::make_shared<Task>([hExt, user_data, fileSet] {
auto ret = hExt->dispatch<std::pair<int, std::shared_ptr<StrSet>>>(
CommandId::CANONICALIZE_PATHS, *fileSet);
l.scanFiles(*canonicalizedFiles).second();
});
+ hExt->dispatchAsync(task);
+
return CSR_ERROR_NONE;
EXCEPTION_SAFE_END
auto hExt = reinterpret_cast<Client::HandleExt *>(handle);
- if (hExt->hasRunning()) {
+ if (hExt->isRunning()) {
ERROR("Async scanning already running with this handle.");
return CSR_ERROR_BUSY;
}
auto dir = std::make_shared<std::string>(Client::getAbsolutePath(dir_path));
- hExt->dispatchAsync([hExt, user_data, dir] {
+ auto task = std::make_shared<Task>([hExt, user_data, dir] {
Client::AsyncLogic l(hExt, user_data, [&hExt] { return hExt->isStopped(); });
l.scanDir(*dir).second();
});
+ hExt->dispatchAsync(task);
+
return CSR_ERROR_NONE;
EXCEPTION_SAFE_END
auto hExt = reinterpret_cast<Client::HandleExt *>(handle);
- if (hExt->hasRunning()) {
+ if (hExt->isRunning()) {
ERROR("Async scanning already running with this handle.");
return CSR_ERROR_BUSY;
}
dirSet->insert(Client::getAbsolutePath(dir_paths[i]));
}
- hExt->dispatchAsync([hExt, user_data, dirSet] {
+ auto task = std::make_shared<Task>([hExt, user_data, dirSet] {
auto ret = hExt->dispatch<std::pair<int, std::shared_ptr<StrSet>>>(
CommandId::CANONICALIZE_PATHS, *dirSet);
l.scanDirs(*canonicalizedDirs).second();
});
+ hExt->dispatchAsync(task);
+
return CSR_ERROR_NONE;
EXCEPTION_SAFE_END
auto hExt = reinterpret_cast<Client::HandleExt *>(handle);
- if (!hExt->hasRunning() || hExt->isStopped())
+ if (!hExt->isRunning() || hExt->isStopped())
return CSR_ERROR_NO_TASK;
hExt->stop();
*/
#include "client/handle-ext.h"
-#include <algorithm>
+#include <utility>
-#include "client/utils.h"
-#include "common/dispatcher.h"
#include "common/audit/logger.h"
#include "common/exception.h"
namespace Client {
HandleExt::HandleExt(SockId id, ContextShPtr &&context) :
- Handle(id, std::move(context)),
- m_stop(false)
+ Handle(id, std::move(context)), m_stop(false), m_isRunning(false)
{
}
HandleExt::~HandleExt()
{
- DEBUG("Destroying extended handle... join all workers...");
- eraseJoinableIf();
+ DEBUG("Destroying extended handle... join worker...");
+ if (this->m_worker.joinable())
+ this->m_worker.join();
}
void HandleExt::stop()
{
- DEBUG("Stop & join all workers...");
- m_stop = true;
- eraseJoinableIf();
-}
-
-bool HandleExt::isStopped() const noexcept
-{
- return m_stop.load();
-}
-
-bool HandleExt::hasRunning()
-{
- std::unique_lock<std::mutex> l(m_mutex);
- auto it = m_workerMap.begin();
+ DEBUG("Stop & join worker...");
- while (it != m_workerMap.end()) {
- if (!it->second.isDone.load())
- return true;
+ {
+ std::lock_guard<std::mutex> l(this->m_flagMutex);
+ this->m_stop = true;
}
- return false;
+ if (this->m_worker.joinable())
+ this->m_worker.join();
}
-void HandleExt::eraseJoinableIf(std::function<bool(const WorkerMapPair &)> pred)
+bool HandleExt::isStopped() const
{
- std::unique_lock<std::mutex> l(m_mutex);
- DEBUG("clean joinable workers! current worker map size: " <<
- m_workerMap.size());
- auto it = m_workerMap.begin();
-
- while (it != m_workerMap.end()) {
- DEBUG("Worker map traversing to erase! current iter tid: " << it->first);
-
- if (!it->second.t.joinable())
- ThrowExc(InternalError, "All workers should be joinable but it isn't. "
- "tid: " << it->first);
-
- if (!pred(*it)) {
- ++it;
- continue;
- }
-
- DEBUG("Joining worker! tid:" << it->first);
- l.unlock();
- it->second.t.join(); // release lock for worker who calls done()
- l.lock();
- DEBUG("Joined worker! tid:" << it->first);
- it = m_workerMap.erase(it);
- }
+ std::lock_guard<std::mutex> l(this->m_flagMutex);
+ return this->m_stop;
}
-void HandleExt::done()
+bool HandleExt::isRunning() const
{
- std::lock_guard<std::mutex> l(m_mutex);
- auto it = m_workerMap.find(std::this_thread::get_id());
-
- if (it == m_workerMap.end())
- ThrowExc(InternalError, "worker done but it's not registered in map. "
- "tid: " << std::this_thread::get_id());
-
- it->second.isDone = true;
+ std::lock_guard<std::mutex> l(this->m_flagMutex);
+ return this->m_isRunning;
}
-void HandleExt::dispatchAsync(const Task &f)
+void HandleExt::dispatchAsync(const std::shared_ptr<Task> &f)
{
- eraseJoinableIf([](const WorkerMapPair & pair) {
- return pair.second.isDone.load();
- });
- // TODO: how to handle exceptions in workers
- std::thread t([this, f] {
- DEBUG("client async thread dispatched! tid: " << std::this_thread::get_id());
- m_stop = false;
+ std::lock_guard<std::mutex> l(this->m_flagMutex);
- f();
- done();
+ if (this->m_isRunning)
+ ThrowExc(BusyError, "Worker is already running. Async is busy!");
- DEBUG("client async thread done! tid: " << std::this_thread::get_id());
- });
+ if (this->m_worker.joinable())
+ this->m_worker.join();
- {
- std::lock_guard<std::mutex> l(m_mutex);
- m_workerMap.emplace(t.get_id(), std::move(t));
- }
-}
+ this->m_isRunning = true;
+ this->m_stop = false;
-HandleExt::Worker::Worker() : isDone(false)
-{
- DEBUG("Worker default constructor called");
-}
+ // TODO: how to handle exceptions in workers
+ this->m_worker = std::thread([this, f] {
+ DEBUG("client async thread dispatched! tid: " << std::this_thread::get_id());
-HandleExt::Worker::Worker(std::thread &&_t) :
- isDone(false),
- t(std::forward<std::thread>(_t))
-{
-}
+ (*f)();
-HandleExt::Worker::Worker(HandleExt::Worker &&other) :
- isDone(other.isDone.load()),
- t(std::move(other.t))
-{
-}
+ {
+ std::lock_guard<std::mutex> _l(this->m_flagMutex);
+ this->m_isRunning = false;
+ }
-HandleExt::Worker &HandleExt::Worker::operator=(HandleExt::Worker &&other)
-{
- isDone = other.isDone.load();
- t = std::move(other.t);
- return *this;
+ DEBUG("client async thread done! tid: " << std::this_thread::get_id());
+ });
}
void HandleExt::add(ResultPtr &&ptr)
{
- std::lock_guard<std::mutex> l(m_resultsMutex);
- m_results.emplace_back(std::forward<ResultPtr>(ptr));
+ std::lock_guard<std::mutex> l(this->m_resultsMutex);
+
+ this->m_results.emplace_back(std::move(ptr));
}
void HandleExt::add(ResultListPtr &&ptr)
{
- std::lock_guard<std::mutex> l(m_resultsMutex);
- m_resultLists.emplace_back(std::forward<ResultListPtr>(ptr));
+ std::lock_guard<std::mutex> l(this->m_resultsMutex);
+
+ this->m_resultLists.emplace_back(std::move(ptr));
}
} // namespace Client
#include <mutex>
#include <thread>
-#include <atomic>
-
-#include <string>
-#include <map>
-#include <set>
-#include <utility>
+#include <memory>
#include "client/handle.h"
#include "client/callback.h"
-#include "common/icontext.h"
namespace Csr {
namespace Client {
explicit HandleExt(SockId id, ContextShPtr &&);
virtual ~HandleExt();
- void dispatchAsync(const Task &task);
+ void dispatchAsync(const std::shared_ptr<Task> &task);
void stop(void);
- bool isStopped(void) const noexcept;
- bool hasRunning(void);
+ bool isStopped(void) const;
+ bool isRunning(void) const;
Callback m_cb; // TODO: to refine..
virtual void add(ResultListPtr &&) override;
private:
- struct Worker {
- std::atomic<bool> isDone;
- std::thread t;
-
- Worker();
- Worker(const std::thread &_t) = delete; // to prevent thread instance copied
- Worker(std::thread &&_t);
- Worker(Worker &&other);
- Worker &operator=(Worker &&other);
- };
-
- using WorkerMapPair = std::pair<const std::thread::id, Worker>;
-
- void eraseJoinableIf(std::function<bool(const WorkerMapPair &)>
- = [](const WorkerMapPair &)
- {
- return true;
- });
- void done(void);
-
- std::atomic<bool> m_stop;
- std::mutex m_mutex;
- std::mutex m_resultsMutex;
- std::map<std::thread::id, Worker> m_workerMap;
+ bool m_stop;
+ bool m_isRunning;
+ std::thread m_worker;
+ mutable std::mutex m_resultsMutex;
+ mutable std::mutex m_flagMutex;
};
} // namespace Client