Added ThreadPool 25/209225/3
authoradam.b <adam.b@samsung.com>
Wed, 3 Jul 2019 15:05:49 +0000 (16:05 +0100)
committeradam.b <adam.b@samsung.com>
Thu, 4 Jul 2019 10:52:00 +0000 (11:52 +0100)
ThreadPool and relevant test cases have been added to the devel-api.

Change-Id: I45814c947c7d9206cac22f5b5e856eea0a0d1751

automated-tests/src/dali/CMakeLists.txt
automated-tests/src/dali/utc-Dali-ThreadPool.cpp [new file with mode: 0644]
dali/devel-api/file.list
dali/devel-api/threading/thread-pool.cpp [new file with mode: 0644]
dali/devel-api/threading/thread-pool.h [new file with mode: 0644]

index 8267394..17c4767 100644 (file)
@@ -93,6 +93,7 @@ SET(TC_SOURCES
         utc-Dali-Texture.cpp
         utc-Dali-TextureSet.cpp
         utc-Dali-Thread.cpp
+        utc-Dali-ThreadPool.cpp
         utc-Dali-TouchEventCombiner.cpp
         utc-Dali-TouchProcessing.cpp
         utc-Dali-TouchDataProcessing.cpp
diff --git a/automated-tests/src/dali/utc-Dali-ThreadPool.cpp b/automated-tests/src/dali/utc-Dali-ThreadPool.cpp
new file mode 100644 (file)
index 0000000..c7c6335
--- /dev/null
@@ -0,0 +1,185 @@
+/*
+ * Copyright (c) 2017 Samsung Electronics Co., Ltd.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include <iostream>
+#include <stdlib.h>
+#include <unistd.h>
+#include <type_traits>
+#include <dali-test-suite-utils.h>
+#include <dali/devel-api/threading/thread-pool.h>
+
+namespace
+{
+Dali::ThreadPool gThreadPool;
+
+// Helper function dividing workload into N batches
+// the loop lambda contains
+Dali::UniqueFutureGroup ForEachMT( Dali::ThreadPool* pThreadPool,
+                                   uint32_t first,
+                                   uint32_t size,
+                                   std::function<void( uint32_t, uint32_t, uint32_t )> task )
+{
+  uint32_t i = 0;
+  uint32_t j = 0;
+  const auto workerCount = uint32_t(pThreadPool->GetWorkerCount());
+  const auto step = size / workerCount;
+  j = workerCount + step;
+
+  std::vector<Dali::Task> tasks;
+  tasks.reserve( workerCount );
+
+  for( auto threadIndex = 0u; threadIndex < workerCount; ++threadIndex )
+  {
+    Dali::Task lambda = [task, i, j]( int workerIndex )
+    {
+      task( uint32_t(workerIndex), i, j );
+    };
+    tasks.emplace_back( lambda );
+    i = j;
+    j = i + step;
+    if( j > size )
+      j = size;
+  }
+  return pThreadPool->SubmitTasks( tasks, workerCount );
+}
+
+}
+
+int UtcDaliThreadPoolMultipleTasks(void)
+{
+  // initialise global thread pool
+  if( !gThreadPool.GetWorkerCount() )
+  {
+    gThreadPool.Initialize( 0u );
+  }
+
+  // populate inputs
+  std::array<int, 8192> inputs;
+  int checksum = 0;
+  for( auto i = 0; i < decltype(i)(inputs.size()); ++i )
+  {
+    inputs[i] = i;
+    checksum += i;
+  }
+
+  // allocate outputs ( number of outputs equals number of worker threads
+  auto workerCount = gThreadPool.GetWorkerCount();
+
+  std::vector<int> outputs;
+  outputs.resize( workerCount );
+  std::fill( outputs.begin(), outputs.end(), 0 );
+
+  // submit
+  auto future = ForEachMT( &gThreadPool, 0, inputs.size(), [&inputs, &outputs]( uint32_t workerIndex, uint32_t begin, uint32_t end )
+  {
+    for( auto i = begin; i < end; ++i )
+    {
+      outputs[workerIndex] += inputs[i];
+    }
+  });
+
+  future->Wait();
+
+  // check outputs
+  int checksum2 = 0;
+  for( auto output : outputs )
+  {
+    checksum2 += output;
+  }
+
+  printf("sum: %d, sum2: %d\n", checksum, checksum2);
+
+
+  DALI_TEST_EQUALS( checksum, checksum2, TEST_LOCATION );
+
+  END_TEST;
+}
+
+int UtcDaliThreadPoolSingleTask(void)
+{
+  // initialise global thread pool
+  if( !gThreadPool.GetWorkerCount() )
+  {
+    gThreadPool.Initialize( 0u );
+  }
+
+  // some long lasting task
+  int counter = 0;
+  auto task = [&counter]( int workerIndex ){
+    for( int i = 0; i < 10; ++i )
+    {
+      counter++;
+      usleep( 16 * 1000 );
+    }
+  };
+
+  auto future = gThreadPool.SubmitTask( 0, task );
+  future->Wait();
+  DALI_TEST_EQUALS( counter, 10, TEST_LOCATION );
+
+  END_TEST;
+}
+
+int UtcDaliThreadPoolSubmitTasksCopyArray(void)
+{
+  // initialise global thread pool
+  if( !gThreadPool.GetWorkerCount() )
+  {
+    gThreadPool.Initialize( 0u );
+  }
+
+  std::array<uint8_t, 1024*1024> dataSrc;
+  for( auto i = 0; i < decltype(i)(dataSrc.size()); ++i)
+  {
+    dataSrc[i] = (std::rand() % 0xff);
+  }
+
+  std::array<uint8_t, 1024*1024> dataDst;
+
+  // each task copies 1kb od data
+  std::vector<Dali::Task> tasks;
+  for( int i = 0; i < 1024; ++i )
+  {
+    auto task = [&dataSrc, &dataDst, i ]( int workerIndex )
+    {
+      for( int k = 0; k < 1024; ++k )
+      {
+        dataDst[i*1024+k] = dataSrc[i*1024+k];
+      }
+    };
+    tasks.push_back( task );
+  }
+
+  DALI_TEST_EQUALS( 1024, tasks.size(), TEST_LOCATION );
+
+  gThreadPool.SubmitTasks( tasks );
+
+  // wait for pool to finish
+  gThreadPool.Wait();
+
+  // compare arrays
+  for( auto i = 0; i < decltype(i)(dataSrc.size()); ++i )
+  {
+    DALI_TEST_EQUALS( dataSrc[i], dataDst[i], TEST_LOCATION );
+    if( dataSrc[i] != dataDst[i]  )
+    {
+      break;
+    }
+  }
+
+  END_TEST;
+}
\ No newline at end of file
index 06b2fb7..42b3256 100755 (executable)
@@ -21,6 +21,7 @@ devel_api_src_files = \
   $(devel_api_src_dir)/threading/conditional-wait.cpp \
   $(devel_api_src_dir)/threading/mutex.cpp \
   $(devel_api_src_dir)/threading/thread.cpp \
+  $(devel_api_src_dir)/threading/thread-pool.cpp \
   $(devel_api_src_dir)/update/frame-callback-interface.cpp \
   $(devel_api_src_dir)/update/update-proxy.cpp
 
@@ -77,7 +78,8 @@ devel_api_core_scripting_header_files = \
 devel_api_core_threading_header_files = \
   $(devel_api_src_dir)/threading/conditional-wait.h \
   $(devel_api_src_dir)/threading/mutex.h \
-  $(devel_api_src_dir)/threading/thread.h
+  $(devel_api_src_dir)/threading/thread.h \
+  $(devel_api_src_dir)/threading/thread-pool.h
 
 devel_api_core_update_header_files = \
   $(devel_api_src_dir)/update/frame-callback-interface.h \
diff --git a/dali/devel-api/threading/thread-pool.cpp b/dali/devel-api/threading/thread-pool.cpp
new file mode 100644 (file)
index 0000000..bf55715
--- /dev/null
@@ -0,0 +1,307 @@
+/*
+ * Copyright (c) 2018 Samsung Electronics Co., Ltd.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include "thread-pool.h"
+#include <cmath>
+
+
+
+namespace Dali
+{
+namespace
+{
+template<typename T, typename... Args>
+std::unique_ptr<T> make_unique(Args&&... args)
+{
+  return std::unique_ptr<T>(new T(std::forward<Args>(args)...));
+}
+}
+
+/**
+ * WorkerThread executes tasks submitted to the pool
+ */
+class WorkerThread
+{
+public:
+
+  /**
+   * @brief Constructor of worker thread
+   * @param index Thread index assigned to the object during pool initialisation
+   */
+  explicit WorkerThread( uint32_t index );
+
+  /**
+   * @brief Destructor of the worker thread
+   */
+  ~WorkerThread();
+
+  WorkerThread(const WorkerThread &other) = delete;
+  WorkerThread &operator=(const WorkerThread &other) = delete;
+
+  /**
+   * @brief Adds task to the task queue
+   * @param task Task to be executed by the thread
+   */
+  void AddTask( Task task );
+
+  /**
+   * @brief Wakes up thread.
+   */
+  void Notify();
+
+  /**
+   * @brief Waits for the thread to complete all the tasks currently in the queue.
+   */
+  void Wait();
+
+private:
+
+  /**
+   * @brief Internal thread loop function
+   */
+  void WaitAndExecute();
+
+  std::thread   mWorker;
+  uint32_t      mIndex;
+  TaskQueue     mTaskQueue;
+  std::mutex    mTaskQueueMutex;
+  std::condition_variable mConditionVariable;
+
+  bool mTerminating {false} ;
+};
+
+void WorkerThread::WaitAndExecute()
+{
+  while( true )
+  {
+    Task task;
+
+    {
+      std::unique_lock< std::mutex > lock{ mTaskQueueMutex };
+
+      mConditionVariable.wait( lock, [ this ]() -> bool {
+        return !mTaskQueue.empty() || mTerminating;
+      } );
+
+      if( mTerminating )
+      {
+        break;
+      }
+
+      task = mTaskQueue.front();
+    }
+
+    task( mIndex );
+
+    {
+      std::lock_guard< std::mutex > lock{ mTaskQueueMutex };
+
+      mTaskQueue.pop();
+
+      mConditionVariable.notify_one();
+    }
+  }
+}
+
+WorkerThread::WorkerThread(uint32_t index) : mIndex( index )
+{
+  // Have to pass "this" as an argument because WaitAndExecute is a member function.
+  mWorker = std::thread{ &WorkerThread::WaitAndExecute, this };
+}
+
+WorkerThread::~WorkerThread()
+{
+  if( mWorker.joinable() )
+  {
+    Notify();
+    Wait();
+
+    {
+      std::lock_guard< std::mutex > lock{ mTaskQueueMutex };
+      mTerminating = true;
+      mConditionVariable.notify_one();
+    }
+
+    mWorker.join();
+  }
+}
+
+void WorkerThread::AddTask( Task task )
+{
+  std::lock_guard< std::mutex > lock{ mTaskQueueMutex };
+  mTaskQueue.push( std::move( task ) );
+  mConditionVariable.notify_one();
+}
+
+void WorkerThread::Notify()
+{
+  std::lock_guard< std::mutex > lock{ mTaskQueueMutex };
+  mConditionVariable.notify_one();
+}
+
+void WorkerThread::Wait()
+{
+  std::unique_lock< std::mutex > lock{ mTaskQueueMutex };
+  mConditionVariable.wait( lock, [ this ]() -> bool {
+    return mTaskQueue.empty();
+  } );
+}
+
+// ThreadPool -----------------------------------------------------------------------------------------------
+
+struct ThreadPool::Impl
+{
+  std::vector<std::unique_ptr<WorkerThread>> mWorkers;
+  uint32_t                                   mWorkerIndex{ 0u };
+};
+
+ThreadPool::ThreadPool()
+{
+  mImpl = make_unique<Impl>();
+}
+
+ThreadPool::~ThreadPool() = default;
+
+bool ThreadPool::Initialize( uint32_t threadCount )
+{
+  /**
+   * Get the system's supported thread count.
+   */
+  auto thread_count = threadCount + 1;
+  if( !threadCount )
+  {
+    thread_count = std::thread::hardware_concurrency();
+    if( !thread_count )
+    {
+      return false;
+    }
+  }
+
+  /**
+   * Spawn the worker threads.
+   */
+  for( auto i = 0u; i < thread_count - 1; i++ )
+  {
+    /**
+    * The workers will execute an infinite loop function
+    * and will wait for a job to enter the job queue. Once a job is in the the queue
+    * the threads will wake up to acquire and execute it.
+    */
+    mImpl->mWorkers.push_back( make_unique< WorkerThread >( i ) );
+  }
+
+  return true;
+}
+
+
+void ThreadPool::Wait()
+{
+  for( auto& worker : mImpl->mWorkers )
+  {
+    worker->Wait();
+  }
+}
+
+SharedFuture ThreadPool::SubmitTask( uint32_t workerIndex, const Task& task )
+{
+  auto future = std::shared_ptr< Future< void > >( new Future< void > );
+  mImpl->mWorkers[workerIndex]->AddTask( [task, future]( uint32_t index )
+                                  {
+                                    task( index );
+
+                                    future->mPromise.set_value();
+                                  });
+
+  return future;
+}
+
+SharedFuture ThreadPool::SubmitTasks( const std::vector< Task >& tasks )
+{
+  auto future = std::shared_ptr< Future< void > >( new Future< void > );
+
+  mImpl->mWorkers[ mImpl->mWorkerIndex++ % static_cast< uint32_t >( mImpl->mWorkers.size() )]->AddTask(
+    [ future, tasks ]( uint32_t index ) {
+      for( auto& task : tasks )
+      {
+        task( index );
+      }
+
+      future->mPromise.set_value();
+
+    } );
+
+  return future;
+}
+
+UniqueFutureGroup ThreadPool::SubmitTasks( const std::vector< Task >& tasks, uint32_t threadMask )
+{
+  auto retval = make_unique<FutureGroup<void>>();
+
+  /**
+   * Use square root of number of sumbitted tasks to estimate optimal number of threads
+   * used to execute jobs
+   */
+  auto threads = uint32_t(std::log2(float(tasks.size())));
+
+  if( threadMask != 0 )
+  {
+    threads = threadMask;
+  }
+
+  if( threads > mImpl->mWorkers.size() )
+  {
+    threads = uint32_t(mImpl->mWorkers.size());
+  }
+  else if( !threads )
+  {
+    threads = 1;
+  }
+
+  auto payloadPerThread = uint32_t(tasks.size() / threads);
+  auto remaining = uint32_t(tasks.size() % threads);
+
+  uint32_t taskIndex = 0;
+  uint32_t taskSize = uint32_t(remaining + payloadPerThread); // add 'remaining' tasks to the very first job list
+
+  for( auto wt = 0u; wt < threads; ++wt )
+  {
+    auto future = std::shared_ptr< Future< void > >( new Future< void > );
+    retval->mFutures.emplace_back( future );
+    mImpl->mWorkers[ mImpl->mWorkerIndex++ % static_cast< uint32_t >( mImpl->mWorkers.size() )]->AddTask(
+      [ future, tasks, taskIndex, taskSize ]( uint32_t index ) {
+        auto begin = tasks.begin() + int(taskIndex);
+        auto end = begin + int(taskSize);
+        for( auto it = begin; it < end; ++it )
+        {
+          (*it)( index );
+        }
+        future->mPromise.set_value();
+      } );
+
+    taskIndex += taskSize;
+    taskSize = payloadPerThread;
+  }
+
+  return retval;
+}
+
+size_t ThreadPool::GetWorkerCount() const
+{
+  return mImpl->mWorkers.size();
+}
+
+} //namespace Dali
diff --git a/dali/devel-api/threading/thread-pool.h b/dali/devel-api/threading/thread-pool.h
new file mode 100644 (file)
index 0000000..b90fb28
--- /dev/null
@@ -0,0 +1,213 @@
+#ifndef DALI_THREAD_POOL_H
+#define DALI_THREAD_POOL_H
+
+/*
+ * Copyright (c) 2019 Samsung Electronics Co., Ltd.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+// INTERNAL INCLUDES
+#include <dali/public-api/common/dali-common.h>
+
+// EXTERNAL INCLUDES
+#include <thread>
+#include <functional>
+#include <mutex>
+#include <queue>
+#include <condition_variable>
+#include <future>
+#include <algorithm>
+#include <iostream>
+#include <memory>
+
+namespace Dali
+{
+using Task = std::function<void(uint32_t)>;
+
+using TaskQueue = std::queue<Task>;
+
+/**
+ * Future contains the result of submitted task. When queried
+ * it applies internal synchronization mechanism to make sure
+ * the value is available.
+ */
+template<typename T>
+class DALI_INTERNAL Future final
+{
+  friend class ThreadPool;
+
+public:
+
+  /**
+   * @brief Constructor of Future
+   */
+  Future()
+  {
+    mFuture = mPromise.get_future();
+  }
+
+  /**
+   * @brief Destructor of Future
+   */
+  ~Future()
+  {
+    Wait();
+  }
+
+  /**
+   * @brief Returns value of future, blocks if needed.
+   * @return Value stored by the future
+   */
+  T Get() const
+  {
+    return mFuture.get();
+  }
+
+  /**
+   * @brief Waits until the value of future is ready. This function
+   * is a fencing mechanism.
+   */
+  void Wait() const
+  {
+    if( IsValid() )
+    {
+      mFuture.wait();
+    }
+  }
+
+  /**
+   * @brief Tests whether the future is valid
+   * @return True if valid, False otherwise
+   */
+  bool IsValid() const
+  {
+    return mFuture.valid();
+  }
+
+  /**
+   * @brief Resets the future bringing it to the initial state.
+   * It's required in order to reuse the same Future object.
+   */
+  void Reset()
+  {
+    mPromise = std::promise<T>();
+    mFuture  = mPromise.get_future();
+  }
+
+private:
+
+  std::promise<T> mPromise{};
+  std::future<T>  mFuture{};
+};
+
+using SharedFuture = std::shared_ptr<Future<void>>;
+
+/**
+ * FutureGroup binds many Future objects and applies synchronization.
+ */
+template<typename T>
+class FutureGroup final
+{
+  friend class ThreadPool;
+
+public:
+
+  /**
+   * Waits for all the Futures to complete.
+   */
+  void Wait()
+  {
+    for (auto &future : mFutures)
+    {
+      future->Wait();
+    }
+  }
+
+private:
+
+  std::vector<std::shared_ptr<Future<T> > > mFutures;
+};
+
+using UniqueFutureGroup = std::unique_ptr<FutureGroup<void>>;
+
+
+
+/**
+ * ThreadPool creates and manages worker threads and tasks submitted for execution.
+ */
+class DALI_CORE_API ThreadPool final
+{
+public:
+
+  /**
+   * @brief Constructor of thread pool.
+   */
+  ThreadPool();
+
+  /**
+   * @brief Destructor of thread pool.
+   */
+  ~ThreadPool();
+
+  /**
+   * @brief Intializes thread pool
+   * @param threadCount Number of worker threads to use. If 0 then thread count equals hardware thread count.
+   * @return True if success
+   */
+  bool Initialize( uint32_t threadCount = 0u );
+
+  /**
+   * @brief Waits until all threads finish execution and go back to the idle state.
+   */
+  void Wait();
+
+  /**
+   * @brief Submits a single task to specified ( by the index ) worker thread.
+   * @param workerIndex Index of thread to be used
+   * @param task Task submitted for execution
+   * @return Shared pointer to the Future object
+   */
+  SharedFuture SubmitTask(uint32_t workerIndex, const Task &task);
+
+  /**
+   * @brief Submits vector of tasks to the pool
+   * @param tasks Vector containing tasks to be executed
+   * @return Shared pointer to the Future object
+   */
+  SharedFuture SubmitTasks(const std::vector<Task>& tasks);
+
+  /**
+   * @brief Submits tasks to threads specified by thread mask.
+   * @param tasks Vector of tasks
+   * @param threadMask Mask of threads to be used or 0 to use all available threads
+   * @return Unique pointer to the FutureGroup object
+   */
+  UniqueFutureGroup SubmitTasks(const std::vector<Task>& tasks, uint32_t threadMask);
+
+  /**
+   * @brief Returns number of worker threads
+   * @return Number of worker threads
+   */
+  size_t GetWorkerCount() const;
+
+private:
+
+  struct Impl;
+  std::unique_ptr<Impl> mImpl;
+};
+
+} //namespace Dali
+
+#endif // DALI_THREAD_POOL_H