Imported Upstream version 1.33.1
[platform/upstream/grpc.git] / src / core / lib / iomgr / executor / threadpool.h
1 /*
2  *
3  * Copyright 2019 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18
19 #ifndef GRPC_CORE_LIB_IOMGR_EXECUTOR_THREADPOOL_H
20 #define GRPC_CORE_LIB_IOMGR_EXECUTOR_THREADPOOL_H
21
22 #include <grpc/support/port_platform.h>
23
24 #include <grpc/grpc.h>
25
26 #include "src/core/lib/gprpp/thd.h"
27 #include "src/core/lib/iomgr/executor/mpmcqueue.h"
28
29 namespace grpc_core {
30
31 // A base abstract base class for threadpool.
32 // Threadpool is an executor that maintains a pool of threads sitting around
33 // and waiting for closures. A threadpool also maintains a queue of pending
34 // closures, when closures appearing in the queue, the threads in pool will
35 // pull them out and execute them.
36 class ThreadPoolInterface {
37  public:
38   // Waits for all pending closures to complete, then shuts down thread pool.
39   virtual ~ThreadPoolInterface() {}
40
41   // Schedules a given closure for execution later.
42   // Depending on specific subclass implementation, this routine might cause
43   // current thread to be blocked (in case of unable to schedule).
44   // Closure should contain a function pointer and arguments it will take, more
45   // details for closure struct at /grpc/include/grpc/impl/codegen/grpc_types.h
46   virtual void Add(grpc_experimental_completion_queue_functor* closure) = 0;
47
48   // Returns the current number of pending closures
49   virtual int num_pending_closures() const = 0;
50
51   // Returns the capacity of pool (number of worker threads in pool)
52   virtual int pool_capacity() const = 0;
53
54   // Thread option accessor
55   virtual const Thread::Options& thread_options() const = 0;
56
57   // Returns the thread name for threads in this ThreadPool.
58   virtual const char* thread_name() const = 0;
59 };
60
61 // Worker thread for threadpool. Executes closures in the queue, until getting a
62 // NULL closure.
63 class ThreadPoolWorker {
64  public:
65   ThreadPoolWorker(const char* thd_name, MPMCQueueInterface* queue,
66                    Thread::Options& options, int index)
67       : queue_(queue), thd_name_(thd_name), index_(index) {
68     thd_ = Thread(thd_name,
69                   [](void* th) { static_cast<ThreadPoolWorker*>(th)->Run(); },
70                   this, nullptr, options);
71   }
72
73   ~ThreadPoolWorker() {}
74
75   void Start() { thd_.Start(); }
76   void Join() { thd_.Join(); }
77
78  private:
79   // struct for tracking stats of thread
80   struct Stats {
81     gpr_timespec sleep_time;
82     Stats() { sleep_time = gpr_time_0(GPR_TIMESPAN); }
83   };
84
85   void Run();  // Pulls closures from queue and executes them
86
87   MPMCQueueInterface* queue_;  // Queue in thread pool to pull closures from
88   Thread thd_;                 // Thread wrapped in
89   Stats stats_;                // Stats to be collected in run time
90   const char* thd_name_;       // Name of thread
91   int index_;                  // Index in thread pool
92 };
93
94 // A fixed size thread pool implementation of abstract thread pool interface.
95 // In this implementation, the number of threads in pool is fixed, but the
96 // capacity of closure queue is unlimited.
97 class ThreadPool : public ThreadPoolInterface {
98  public:
99   // Creates a thread pool with size of "num_threads", with default thread name
100   // "ThreadPoolWorker" and all thread options set to default. If the given size
101   // is 0 or less, there will be 1 worker thread created inside pool.
102   ThreadPool(int num_threads);
103
104   // Same as ThreadPool(int num_threads) constructor, except
105   // that it also sets "thd_name" as the name of all threads in the thread pool.
106   ThreadPool(int num_threads, const char* thd_name);
107
108   // Same as ThreadPool(const char *thd_name, int num_threads) constructor,
109   // except that is also set thread_options for threads.
110   // Notes for stack size:
111   // If the stack size field of the passed in Thread::Options is set to default
112   // value 0, default ThreadPool stack size will be used. The current default
113   // stack size of this implementation is 1952K for mobile platform and 64K for
114   // all others.
115   ThreadPool(int num_threads, const char* thd_name,
116              const Thread::Options& thread_options);
117
118   // Waits for all pending closures to complete, then shuts down thread pool.
119   ~ThreadPool() override;
120
121   // Adds given closure into pending queue immediately. Since closure queue has
122   // infinite length, this routine will not block.
123   void Add(grpc_experimental_completion_queue_functor* closure) override;
124
125   int num_pending_closures() const override;
126   int pool_capacity() const override;
127   const Thread::Options& thread_options() const override;
128   const char* thread_name() const override;
129
130  private:
131   int num_threads_ = 0;
132   const char* thd_name_ = nullptr;
133   Thread::Options thread_options_;
134   ThreadPoolWorker** threads_ = nullptr;  // Array of worker threads
135   MPMCQueueInterface* queue_ = nullptr;   // Closure queue
136
137   Atomic<bool> shut_down_{false};  // Destructor has been called if set to true
138
139   void SharedThreadPoolConstructor();
140   // For ThreadPool, default stack size for mobile platform is 1952K. for other
141   // platforms is 64K.
142   size_t DefaultStackSize();
143   // Internal Use Only for debug checking.
144   void AssertHasNotBeenShutDown();
145 };
146
147 }  // namespace grpc_core
148
149 #endif /* GRPC_CORE_LIB_IOMGR_EXECUTOR_THREADPOOL_H */