ThreadPool and relevant test cases have been added to the devel-api.
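
A minimal usage sketch (illustrative; see the added test cases for full coverage):

  Dali::ThreadPool pool;
  pool.Initialize(); // 0 workers requested: derive the count from the hardware

  // Run a task on worker 0 and block until it has finished
  auto future = pool.SubmitTask( 0, []( uint32_t workerIndex ){ /* work */ } );
  future->Wait();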
Change-Id: I45814c947c7d9206cac22f5b5e856eea0a0d1751
utc-Dali-Texture.cpp
utc-Dali-TextureSet.cpp
utc-Dali-Thread.cpp
+ utc-Dali-ThreadPool.cpp
utc-Dali-TouchEventCombiner.cpp
utc-Dali-TouchProcessing.cpp
utc-Dali-TouchDataProcessing.cpp
--- /dev/null
+/*
+ * Copyright (c) 2017 Samsung Electronics Co., Ltd.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include <iostream>
+#include <stdlib.h>
+#include <unistd.h>
+#include <algorithm>
+#include <array>
+#include <dali-test-suite-utils.h>
+#include <dali/devel-api/threading/thread-pool.h>
+
+namespace
+{
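+// Shared thread pool, reused across the test cases below and initialised lazily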
+Dali::ThreadPool gThreadPool;
+
+// Helper function dividing the range [first, size) into one batch per worker
+// thread; 'task' receives the worker index and the [begin, end) sub-range of
+// the batch it should process.
+Dali::UniqueFutureGroup ForEachMT( Dali::ThreadPool* pThreadPool,
+ uint32_t first,
+ uint32_t size,
+ std::function<void( uint32_t, uint32_t, uint32_t )> task )
+{
+  const auto workerCount = uint32_t( pThreadPool->GetWorkerCount() );
+  const auto step = size / workerCount;
+  uint32_t i = first;
+  uint32_t j = i + step;
+
+ std::vector<Dali::Task> tasks;
+ tasks.reserve( workerCount );
+
+  for( auto threadIndex = 0u; threadIndex < workerCount; ++threadIndex )
+  {
+    // the last batch also absorbs the remainder left by the integer division above
+    if( threadIndex == workerCount - 1 )
+    {
+      j = size;
+    }
+    Dali::Task lambda = [task, i, j]( uint32_t workerIndex )
+    {
+      task( workerIndex, i, j );
+    };
+    tasks.emplace_back( lambda );
+    i = j;
+    j = i + step;
+  }
+ return pThreadPool->SubmitTasks( tasks, workerCount );
+}
+
+}
+
+int UtcDaliThreadPoolMultipleTasks(void)
+{
+ // initialise global thread pool
+ if( !gThreadPool.GetWorkerCount() )
+ {
+ gThreadPool.Initialize( 0u );
+ }
+
+ // populate inputs
+ std::array<int, 8192> inputs;
+ int checksum = 0;
+  for( auto i = 0; i < int( inputs.size() ); ++i )
+ {
+ inputs[i] = i;
+ checksum += i;
+ }
+
+  // allocate outputs (one output slot per worker thread)
+ auto workerCount = gThreadPool.GetWorkerCount();
+
+ std::vector<int> outputs;
+ outputs.resize( workerCount );
+ std::fill( outputs.begin(), outputs.end(), 0 );
+
+ // submit
+ auto future = ForEachMT( &gThreadPool, 0, inputs.size(), [&inputs, &outputs]( uint32_t workerIndex, uint32_t begin, uint32_t end )
+ {
+ for( auto i = begin; i < end; ++i )
+ {
+ outputs[workerIndex] += inputs[i];
+ }
+ });
+
+ future->Wait();
+
+ // check outputs
+ int checksum2 = 0;
+ for( auto output : outputs )
+ {
+ checksum2 += output;
+ }
+
+  printf( "checksum: %d, checksum2: %d\n", checksum, checksum2 );
+
+ DALI_TEST_EQUALS( checksum, checksum2, TEST_LOCATION );
+
+ END_TEST;
+}
+
+int UtcDaliThreadPoolSingleTask(void)
+{
+ // initialise global thread pool
+ if( !gThreadPool.GetWorkerCount() )
+ {
+ gThreadPool.Initialize( 0u );
+ }
+
+ // some long lasting task
+ int counter = 0;
+ auto task = [&counter]( int workerIndex ){
+ for( int i = 0; i < 10; ++i )
+ {
+ counter++;
+ usleep( 16 * 1000 );
+ }
+ };
+
+ auto future = gThreadPool.SubmitTask( 0, task );
+ future->Wait();
+ DALI_TEST_EQUALS( counter, 10, TEST_LOCATION );
+
+ END_TEST;
+}
+
+int UtcDaliThreadPoolSubmitTasksCopyArray(void)
+{
+ // initialise global thread pool
+ if( !gThreadPool.GetWorkerCount() )
+ {
+ gThreadPool.Initialize( 0u );
+ }
+
+ std::array<uint8_t, 1024*1024> dataSrc;
+  for( auto i = 0u; i < dataSrc.size(); ++i )
+ {
+ dataSrc[i] = (std::rand() % 0xff);
+ }
+
+ std::array<uint8_t, 1024*1024> dataDst;
+
+  // each task copies 1KB of data
+ std::vector<Dali::Task> tasks;
+ for( int i = 0; i < 1024; ++i )
+ {
+ auto task = [&dataSrc, &dataDst, i ]( int workerIndex )
+ {
+ for( int k = 0; k < 1024; ++k )
+ {
+ dataDst[i*1024+k] = dataSrc[i*1024+k];
+ }
+ };
+ tasks.push_back( task );
+ }
+
+ DALI_TEST_EQUALS( 1024, tasks.size(), TEST_LOCATION );
+
+ gThreadPool.SubmitTasks( tasks );
+
+ // wait for pool to finish
+ gThreadPool.Wait();
+
+ // compare arrays
+  for( auto i = 0u; i < dataSrc.size(); ++i )
+ {
+ DALI_TEST_EQUALS( dataSrc[i], dataDst[i], TEST_LOCATION );
+ if( dataSrc[i] != dataDst[i] )
+ {
+ break;
+ }
+ }
+
+ END_TEST;
+}
\ No newline at end of file
$(devel_api_src_dir)/threading/conditional-wait.cpp \
$(devel_api_src_dir)/threading/mutex.cpp \
$(devel_api_src_dir)/threading/thread.cpp \
+ $(devel_api_src_dir)/threading/thread-pool.cpp \
$(devel_api_src_dir)/update/frame-callback-interface.cpp \
$(devel_api_src_dir)/update/update-proxy.cpp
devel_api_core_threading_header_files = \
$(devel_api_src_dir)/threading/conditional-wait.h \
$(devel_api_src_dir)/threading/mutex.h \
- $(devel_api_src_dir)/threading/thread.h
+ $(devel_api_src_dir)/threading/thread.h \
+ $(devel_api_src_dir)/threading/thread-pool.h
devel_api_core_update_header_files = \
$(devel_api_src_dir)/update/frame-callback-interface.h \
--- /dev/null
+/*
+ * Copyright (c) 2018 Samsung Electronics Co., Ltd.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include "thread-pool.h"
+#include <cmath>
+
+namespace Dali
+{
+namespace
+{
+template<typename T, typename... Args>
+std::unique_ptr<T> make_unique(Args&&... args)
+{
+ return std::unique_ptr<T>(new T(std::forward<Args>(args)...));
+}
+}
+
+/**
+ * WorkerThread executes tasks submitted to the pool
+ */
+class WorkerThread
+{
+public:
+
+ /**
+ * @brief Constructor of worker thread
+ * @param index Thread index assigned to the object during pool initialisation
+ */
+ explicit WorkerThread( uint32_t index );
+
+ /**
+ * @brief Destructor of the worker thread
+ */
+ ~WorkerThread();
+
+ WorkerThread(const WorkerThread &other) = delete;
+ WorkerThread &operator=(const WorkerThread &other) = delete;
+
+ /**
+ * @brief Adds task to the task queue
+ * @param task Task to be executed by the thread
+ */
+ void AddTask( Task task );
+
+ /**
+ * @brief Wakes up thread.
+ */
+ void Notify();
+
+ /**
+ * @brief Waits for the thread to complete all the tasks currently in the queue.
+ */
+ void Wait();
+
+private:
+
+ /**
+ * @brief Internal thread loop function
+ */
+ void WaitAndExecute();
+
+ std::thread mWorker;
+ uint32_t mIndex;
+ TaskQueue mTaskQueue;
+ std::mutex mTaskQueueMutex;
+ std::condition_variable mConditionVariable;
+
+  bool mTerminating{ false };
+};
+
+void WorkerThread::WaitAndExecute()
+{
+ while( true )
+ {
+    Task task;
+
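+    // Fetch the next task under the lock, but execute it with the mutex
+    // released, so AddTask() can queue more work while the task runs.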
+ {
+ std::unique_lock< std::mutex > lock{ mTaskQueueMutex };
+
+ mConditionVariable.wait( lock, [ this ]() -> bool {
+ return !mTaskQueue.empty() || mTerminating;
+ } );
+
+ if( mTerminating )
+ {
+ break;
+ }
+
+ task = mTaskQueue.front();
+ }
+
+ task( mIndex );
+
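+    // Pop the task only after it has executed, so Wait()'s empty-queue
+    // predicate also covers the task that is currently running.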
+ {
+ std::lock_guard< std::mutex > lock{ mTaskQueueMutex };
+
+ mTaskQueue.pop();
+
+ mConditionVariable.notify_one();
+ }
+ }
+}
+
+WorkerThread::WorkerThread(uint32_t index) : mIndex( index )
+{
+ // Have to pass "this" as an argument because WaitAndExecute is a member function.
+ mWorker = std::thread{ &WorkerThread::WaitAndExecute, this };
+}
+
+WorkerThread::~WorkerThread()
+{
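+  // Wake the worker, let it drain any queued tasks, then request
+  // termination and join the thread.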
+ if( mWorker.joinable() )
+ {
+ Notify();
+ Wait();
+
+ {
+ std::lock_guard< std::mutex > lock{ mTaskQueueMutex };
+ mTerminating = true;
+ mConditionVariable.notify_one();
+ }
+
+ mWorker.join();
+ }
+}
+
+void WorkerThread::AddTask( Task task )
+{
+ std::lock_guard< std::mutex > lock{ mTaskQueueMutex };
+ mTaskQueue.push( std::move( task ) );
+ mConditionVariable.notify_one();
+}
+
+void WorkerThread::Notify()
+{
+ std::lock_guard< std::mutex > lock{ mTaskQueueMutex };
+ mConditionVariable.notify_one();
+}
+
+void WorkerThread::Wait()
+{
+ std::unique_lock< std::mutex > lock{ mTaskQueueMutex };
+ mConditionVariable.wait( lock, [ this ]() -> bool {
+ return mTaskQueue.empty();
+ } );
+}
+
+// ThreadPool -----------------------------------------------------------------------------------------------
+
+struct ThreadPool::Impl
+{
+ std::vector<std::unique_ptr<WorkerThread>> mWorkers;
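+  // Round-robin cursor selecting the worker that receives the next batch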
+ uint32_t mWorkerIndex{ 0u };
+};
+
+ThreadPool::ThreadPool()
+{
+ mImpl = make_unique<Impl>();
+}
+
+ThreadPool::~ThreadPool() = default;
+
+bool ThreadPool::Initialize( uint32_t threadCount )
+{
+  /**
+   * Determine how many threads to use: an explicit, non-zero threadCount wins;
+   * otherwise fall back to the hardware concurrency, keeping one thread
+   * (the caller's) out of the pool.
+   */
+  auto thread_count = threadCount + 1;
+ if( !threadCount )
+ {
+ thread_count = std::thread::hardware_concurrency();
+ if( !thread_count )
+ {
+ return false;
+ }
+ }
+
+ /**
+ * Spawn the worker threads.
+ */
+ for( auto i = 0u; i < thread_count - 1; i++ )
+ {
+    /**
+     * Each worker runs an infinite loop, waiting for a job to enter its
+     * queue. Once a job is queued, the thread wakes up to acquire and
+     * execute it.
+     */
+ mImpl->mWorkers.push_back( make_unique< WorkerThread >( i ) );
+ }
+
+ return true;
+}
+
+
+void ThreadPool::Wait()
+{
+ for( auto& worker : mImpl->mWorkers )
+ {
+ worker->Wait();
+ }
+}
+
+SharedFuture ThreadPool::SubmitTask( uint32_t workerIndex, const Task& task )
+{
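+  // Wrap the task so the future's promise is fulfilled once it has executed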
+ auto future = std::shared_ptr< Future< void > >( new Future< void > );
+ mImpl->mWorkers[workerIndex]->AddTask( [task, future]( uint32_t index )
+ {
+ task( index );
+
+ future->mPromise.set_value();
+ });
+
+ return future;
+}
+
+SharedFuture ThreadPool::SubmitTasks( const std::vector< Task >& tasks )
+{
+ auto future = std::shared_ptr< Future< void > >( new Future< void > );
+
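+  // Dispatch the whole batch to a single worker, chosen round-robin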
+ mImpl->mWorkers[ mImpl->mWorkerIndex++ % static_cast< uint32_t >( mImpl->mWorkers.size() )]->AddTask(
+ [ future, tasks ]( uint32_t index ) {
+ for( auto& task : tasks )
+ {
+ task( index );
+ }
+
+ future->mPromise.set_value();
+
+ } );
+
+ return future;
+}
+
+UniqueFutureGroup ThreadPool::SubmitTasks( const std::vector< Task >& tasks, uint32_t threadMask )
+{
+ auto retval = make_unique<FutureGroup<void>>();
+
+  /**
+   * Use the base-2 logarithm of the number of submitted tasks to estimate
+   * the optimal number of threads for executing the jobs.
+   */
+ auto threads = uint32_t(std::log2(float(tasks.size())));
+
+ if( threadMask != 0 )
+ {
+ threads = threadMask;
+ }
+
+ if( threads > mImpl->mWorkers.size() )
+ {
+ threads = uint32_t(mImpl->mWorkers.size());
+ }
+ else if( !threads )
+ {
+ threads = 1;
+ }
+
+ auto payloadPerThread = uint32_t(tasks.size() / threads);
+ auto remaining = uint32_t(tasks.size() % threads);
+
+ uint32_t taskIndex = 0;
+ uint32_t taskSize = uint32_t(remaining + payloadPerThread); // add 'remaining' tasks to the very first job list
+
+ for( auto wt = 0u; wt < threads; ++wt )
+ {
+ auto future = std::shared_ptr< Future< void > >( new Future< void > );
+ retval->mFutures.emplace_back( future );
+ mImpl->mWorkers[ mImpl->mWorkerIndex++ % static_cast< uint32_t >( mImpl->mWorkers.size() )]->AddTask(
+ [ future, tasks, taskIndex, taskSize ]( uint32_t index ) {
+ auto begin = tasks.begin() + int(taskIndex);
+ auto end = begin + int(taskSize);
+ for( auto it = begin; it < end; ++it )
+ {
+ (*it)( index );
+ }
+ future->mPromise.set_value();
+ } );
+
+ taskIndex += taskSize;
+ taskSize = payloadPerThread;
+ }
+
+ return retval;
+}
+
+size_t ThreadPool::GetWorkerCount() const
+{
+ return mImpl->mWorkers.size();
+}
+
+} //namespace Dali
--- /dev/null
+#ifndef DALI_THREAD_POOL_H
+#define DALI_THREAD_POOL_H
+
+/*
+ * Copyright (c) 2019 Samsung Electronics Co., Ltd.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+// INTERNAL INCLUDES
+#include <dali/public-api/common/dali-common.h>
+
+// EXTERNAL INCLUDES
+#include <thread>
+#include <functional>
+#include <mutex>
+#include <queue>
+#include <condition_variable>
+#include <future>
+#include <algorithm>
+#include <memory>
+#include <cstdint>
+#include <vector>
+
+namespace Dali
+{
+using Task = std::function<void(uint32_t)>;
+
+using TaskQueue = std::queue<Task>;
+
+/**
+ * Future contains the result of a submitted task. When queried, it applies
+ * an internal synchronisation mechanism to make sure the value is available.
+ */
+template<typename T>
+class DALI_INTERNAL Future final
+{
+ friend class ThreadPool;
+
+public:
+
+ /**
+ * @brief Constructor of Future
+ */
+ Future()
+ {
+ mFuture = mPromise.get_future();
+ }
+
+ /**
+ * @brief Destructor of Future
+ */
+ ~Future()
+ {
+ Wait();
+ }
+
+ /**
+ * @brief Returns value of future, blocks if needed.
+ * @return Value stored by the future
+ */
+ T Get() const
+ {
+ return mFuture.get();
+ }
+
+ /**
+ * @brief Waits until the value of future is ready. This function
+ * is a fencing mechanism.
+ */
+ void Wait() const
+ {
+ if( IsValid() )
+ {
+ mFuture.wait();
+ }
+ }
+
+ /**
+ * @brief Tests whether the future is valid
+ * @return True if valid, False otherwise
+ */
+ bool IsValid() const
+ {
+ return mFuture.valid();
+ }
+
+ /**
+ * @brief Resets the future bringing it to the initial state.
+ * It's required in order to reuse the same Future object.
+ */
+ void Reset()
+ {
+ mPromise = std::promise<T>();
+ mFuture = mPromise.get_future();
+ }
+
+private:
+
+ std::promise<T> mPromise{};
+ std::future<T> mFuture{};
+};
+
+using SharedFuture = std::shared_ptr<Future<void>>;
+
+/**
+ * FutureGroup binds many Future objects and synchronises on the completion
+ * of all of them.
+ */
+template<typename T>
+class FutureGroup final
+{
+ friend class ThreadPool;
+
+public:
+
+ /**
+ * Waits for all the Futures to complete.
+ */
+ void Wait()
+ {
+ for (auto &future : mFutures)
+ {
+ future->Wait();
+ }
+ }
+
+private:
+
+ std::vector<std::shared_ptr<Future<T> > > mFutures;
+};
+
+using UniqueFutureGroup = std::unique_ptr<FutureGroup<void>>;
+
+/**
+ * ThreadPool creates and manages worker threads and tasks submitted for execution.
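+ *
+ * A minimal usage sketch, where DoWork stands for any user callback:
+ * @code
+ * ThreadPool pool;
+ * pool.Initialize(); // 0 workers requested: derive the count from the hardware
+ *
+ * auto future = pool.SubmitTask( 0, []( uint32_t workerIndex ){ DoWork( workerIndex ); } );
+ * future->Wait(); // block until the task has executed
+ * @endcode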
+ */
+class DALI_CORE_API ThreadPool final
+{
+public:
+
+ /**
+ * @brief Constructor of thread pool.
+ */
+ ThreadPool();
+
+ /**
+ * @brief Destructor of thread pool.
+ */
+ ~ThreadPool();
+
+ /**
+   * @brief Initializes the thread pool
+ * @param threadCount Number of worker threads to use. If 0 then thread count equals hardware thread count.
+ * @return True if success
+ */
+ bool Initialize( uint32_t threadCount = 0u );
+
+ /**
+ * @brief Waits until all threads finish execution and go back to the idle state.
+ */
+ void Wait();
+
+ /**
+   * @brief Submits a single task to the worker thread specified by index.
+ * @param workerIndex Index of thread to be used
+ * @param task Task submitted for execution
+ * @return Shared pointer to the Future object
+ */
+ SharedFuture SubmitTask(uint32_t workerIndex, const Task &task);
+
+ /**
+ * @brief Submits vector of tasks to the pool
+ * @param tasks Vector containing tasks to be executed
+ * @return Shared pointer to the Future object
+ */
+ SharedFuture SubmitTasks(const std::vector<Task>& tasks);
+
+ /**
+   * @brief Submits tasks to the pool, spreading them across several workers.
+   * @param tasks Vector of tasks
+   * @param threadMask Number of threads to use, or 0 to derive it from the number of tasks
+ * @return Unique pointer to the FutureGroup object
+ */
+ UniqueFutureGroup SubmitTasks(const std::vector<Task>& tasks, uint32_t threadMask);
+
+ /**
+ * @brief Returns number of worker threads
+ * @return Number of worker threads
+ */
+ size_t GetWorkerCount() const;
+
+private:
+
+ struct Impl;
+ std::unique_ptr<Impl> mImpl;
+};
+
+} //namespace Dali
+
+#endif // DALI_THREAD_POOL_H