- add sources.
[platform/framework/web/crosswalk.git] / src / native_client_sdk / src / libraries / sdk_util / thread_pool.cc
1 // Copyright (c) 2013 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "sdk_util/thread_pool.h"
6
7 #include <pthread.h>
8 #include <semaphore.h>
9 #include <stdio.h>
10 #include <stdlib.h>
11
12 #include "sdk_util/auto_lock.h"
13
14 namespace sdk_util {
15
16 // Initializes mutex, semaphores and a pool of threads.  If 0 is passed for
17 // num_threads, all work will be performed on the dispatch thread.
18 ThreadPool::ThreadPool(int num_threads)
19     : threads_(NULL), counter_(0), num_threads_(num_threads), exiting_(false),
20       user_data_(NULL), user_work_function_(NULL) {
21   if (num_threads_ > 0) {
22     int status;
23     status = sem_init(&work_sem_, 0, 0);
24     if (-1 == status) {
25       fprintf(stderr, "Failed to initialize semaphore!\n");
26       exit(-1);
27     }
28     status = sem_init(&done_sem_, 0, 0);
29     if (-1 == status) {
30       fprintf(stderr, "Failed to initialize semaphore!\n");
31       exit(-1);
32     }
33     threads_ = new pthread_t[num_threads_];
34     for (int i = 0; i < num_threads_; i++) {
35       status = pthread_create(&threads_[i], NULL, WorkerThreadEntry, this);
36       if (0 != status) {
37         fprintf(stderr, "Failed to create thread!\n");
38         exit(-1);
39       }
40     }
41   }
42 }
43
44 // Post exit request, wait for all threads to join, and cleanup.
45 ThreadPool::~ThreadPool() {
46   if (num_threads_ > 0) {
47     PostExitAndJoinAll();
48     delete[] threads_;
49     sem_destroy(&done_sem_);
50     sem_destroy(&work_sem_);
51   }
52 }
53
54 // Setup work parameters.  This function is called from the dispatch thread,
55 // when all worker threads are sleeping.
56 void ThreadPool::Setup(int counter, WorkFunction work, void *data) {
57   counter_ = counter;
58   user_work_function_ = work;
59   user_data_ = data;
60 }
61
62 // Return decremented task counter.  This function
63 // can be called from multiple threads at any given time.
64 int ThreadPool::DecCounter() {
65   return AtomicAddFetch(&counter_, -1);
66 }
67
68 // Set exit flag, post and join all the threads in the pool.  This function is
69 // called only from the dispatch thread, and only when all worker threads are
70 // sleeping.
71 void ThreadPool::PostExitAndJoinAll() {
72   exiting_ = true;
73   // Wake up all the sleeping worker threads.
74   for (int i = 0; i < num_threads_; ++i)
75     sem_post(&work_sem_);
76   void* retval;
77   for (int i = 0; i < num_threads_; ++i)
78     pthread_join(threads_[i], &retval);
79 }
80
81 // Main work loop - one for each worker thread.
82 void ThreadPool::WorkLoop() {
83   while (true) {
84     // Wait for work. If no work is availble, this thread will sleep here.
85     sem_wait(&work_sem_);
86     if (exiting_) break;
87     while (true) {
88       // Grab a task index to work on from the counter.
89       int task_index = DecCounter();
90       if (task_index < 0)
91         break;
92       user_work_function_(task_index, user_data_);
93     }
94     // Post to dispatch thread work is done.
95     sem_post(&done_sem_);
96   }
97 }
98
99 // pthread entry point for a worker thread.
100 void* ThreadPool::WorkerThreadEntry(void* thiz) {
101   static_cast<ThreadPool*>(thiz)->WorkLoop();
102   return NULL;
103 }
104
105 // DispatchMany() will dispatch a set of tasks across worker threads.
106 // Note: This function will block until all work has completed.
107 void ThreadPool::DispatchMany(int num_tasks, WorkFunction work, void* data) {
108   // On entry, all worker threads are sleeping.
109   Setup(num_tasks, work, data);
110
111   // Wake up the worker threads & have them process tasks.
112   for (int i = 0; i < num_threads_; i++)
113     sem_post(&work_sem_);
114
115   // Worker threads are now awake and busy.
116
117   // This dispatch thread will now sleep-wait for the worker threads to finish.
118   for (int i = 0; i < num_threads_; i++)
119     sem_wait(&done_sem_);
120   // On exit, all tasks are done and all worker threads are sleeping again.
121 }
122
123 //  DispatchHere will dispatch all tasks on this thread.
124 void ThreadPool::DispatchHere(int num_tasks, WorkFunction work, void* data) {
125   for (int i = 0; i < num_tasks; i++)
126     work(i, data);
127 }
128
129 // Dispatch() will invoke the user supplied work function across
130 // one or more threads for each task.
131 // Note: This function will block until all work has completed.
132 void ThreadPool::Dispatch(int num_tasks, WorkFunction work, void* data) {
133   if (num_threads_ > 0)
134     DispatchMany(num_tasks, work, data);
135   else
136     DispatchHere(num_tasks, work, data);
137 }
138
139 }  // namespace sdk_util
140