*/
#include "client/handle-ext.h"
-#include <algorithm>
+#include <utility>
-#include "client/utils.h"
-#include "common/dispatcher.h"
#include "common/audit/logger.h"
+#include "common/exception.h"
namespace Csr {
namespace Client {
-HandleExt::HandleExt(ContextShPtr &&context) :
- Handle(std::move(context)),
- m_stop(false)
+HandleExt::HandleExt(SockId id, ContextShPtr &&context) :
+ Handle(id, std::move(context)), m_stop(false), m_isRunning(false)
{
}
HandleExt::~HandleExt()
{
- DEBUG("Destroying extended handle... join all workers...");
- eraseJoinableIf();
+ DEBUG("Destroying extended handle... join worker...");
+ if (this->m_worker.joinable())
+ this->m_worker.join();
}
-void HandleExt::stop()
+void HandleExt::setStopFunc(std::function<void()> &&func)
{
- DEBUG("Stop & join all workers...");
- m_stop = true;
- eraseJoinableIf();
+ std::lock_guard<std::mutex> l(this->m_flagMutex);
+ this->m_stopFunc = std::move(func);
}
-bool HandleExt::isStopped() const
+bool HandleExt::stop()
{
- return m_stop.load();
-}
+ DEBUG("Stop & join worker...");
-void HandleExt::eraseJoinableIf(std::function<bool(const WorkerMapPair &)> pred)
-{
- std::unique_lock<std::mutex> l(m_mutex);
- DEBUG("clean joinable workers! current worker map size: " <<
- m_workerMap.size());
- auto it = m_workerMap.begin();
+ {
+ std::lock_guard<std::mutex> l(this->m_flagMutex);
- while (it != m_workerMap.end()) {
- DEBUG("Worker map traversing to erase! current iter tid: " << it->first);
+ if (!this->m_isRunning || this->m_stop)
+ return false;
- if (!it->second.t.joinable())
- throw std::logic_error(FORMAT("All workers should be joinable "
- "but it isn't. tid: " << it->first));
+ this->m_stop = true;
- if (!pred(*it)) {
- ++it;
- continue;
- }
-
- DEBUG("Joining worker! tid:" << it->first);
- l.unlock();
- it->second.t.join(); // release lock for worker who calls done()
- l.lock();
- DEBUG("Joined worker! tid:" << it->first);
- it = m_workerMap.erase(it);
+ if (this->m_stopFunc != nullptr)
+ this->m_stopFunc();
}
+
+ if (this->m_worker.joinable())
+ this->m_worker.join();
+
+ return true;
}
-void HandleExt::done()
+bool HandleExt::isStopped() const
{
- std::lock_guard<std::mutex> l(m_mutex);
- auto it = m_workerMap.find(std::this_thread::get_id());
-
- if (it == m_workerMap.end())
- throw std::logic_error(FORMAT("worker done but it's not registered in map. "
- "tid: " << std::this_thread::get_id()));
+ std::lock_guard<std::mutex> l(this->m_flagMutex);
+ return this->m_stop;
+}
- it->second.isDone = true;
+bool HandleExt::isRunning() const
+{
+ std::lock_guard<std::mutex> l(this->m_flagMutex);
+ return this->m_isRunning;
}
-void HandleExt::dispatchAsync(const Task &f)
+void HandleExt::dispatchAsync(const std::shared_ptr<Task> &f)
{
- eraseJoinableIf([](const WorkerMapPair & pair) {
- return pair.second.isDone.load();
- });
- // TODO: how to handle exceptions in workers
- std::thread t([this, f] {
- DEBUG("client async thread dispatched! tid: " << std::this_thread::get_id());
+ std::lock_guard<std::mutex> l(this->m_flagMutex);
- f();
- done();
+ if (this->m_isRunning)
+ ThrowExc(CSR_ERROR_BUSY, "Worker is already running. Async is busy!");
- DEBUG("client async thread done! tid: " << std::this_thread::get_id());
- });
+ if (this->m_worker.joinable())
+ this->m_worker.join();
- {
- std::lock_guard<std::mutex> l(m_mutex);
- m_workerMap.emplace(t.get_id(), std::move(t));
- }
-}
+ this->m_isRunning = true;
+ this->m_stop = false;
-HandleExt::Worker::Worker() : isDone(false)
-{
- DEBUG("Worker default constructor called");
-}
+ this->m_worker = std::thread([this, f] {
+ DEBUG("client async thread dispatched! tid: " << std::this_thread::get_id());
-HandleExt::Worker::Worker(std::thread &&_t) :
- isDone(false),
- t(std::forward<std::thread>(_t))
-{
-}
+ {
+ // Wait for client lib API func returned & mutex freed by scoped-lock dtor
+ // This is for invoking registered callbacks follows returning API func
+ std::lock_guard<std::mutex> _l(this->m_dispatchMutex);
+ }
+ (*f)();
-HandleExt::Worker::Worker(HandleExt::Worker &&other) :
- isDone(other.isDone.load()),
- t(std::move(other.t))
-{
-}
+ {
+ std::lock_guard<std::mutex> _l(this->m_flagMutex);
+ this->m_isRunning = false;
+ }
-HandleExt::Worker &HandleExt::Worker::operator=(HandleExt::Worker &&other)
-{
- isDone = other.isDone.load();
- t = std::move(other.t);
- return *this;
+ DEBUG("client async thread done! tid: " << std::this_thread::get_id());
+ });
}
void HandleExt::add(ResultPtr &&ptr)
{
- std::lock_guard<std::mutex> l(m_resultsMutex);
- m_results.emplace_back(std::forward<ResultPtr>(ptr));
+ std::lock_guard<std::mutex> l(this->m_resultsMutex);
+
+ this->m_results.emplace_back(std::move(ptr));
}
void HandleExt::add(ResultListPtr &&ptr)
{
- std::lock_guard<std::mutex> l(m_resultsMutex);
- m_resultLists.emplace_back(std::forward<ResultListPtr>(ptr));
+ std::lock_guard<std::mutex> l(this->m_resultsMutex);
+
+ this->m_resultLists.emplace_back(std::move(ptr));
}
} // namespace Client