3 * Copyright 2019 gRPC authors.
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
19 #ifndef GRPC_CORE_LIB_IOMGR_EXECUTOR_THREADPOOL_H
20 #define GRPC_CORE_LIB_IOMGR_EXECUTOR_THREADPOOL_H
22 #include <grpc/support/port_platform.h>
24 #include <grpc/grpc.h>
26 #include "src/core/lib/gprpp/thd.h"
27 #include "src/core/lib/iomgr/executor/mpmcqueue.h"
31 // A base abstract base class for threadpool.
32 // Threadpool is an executor that maintains a pool of threads sitting around
33 // and waiting for closures. A threadpool also maintains a queue of pending
34 // closures, when closures appearing in the queue, the threads in pool will
35 // pull them out and execute them.
36 class ThreadPoolInterface {
38 // Waits for all pending closures to complete, then shuts down thread pool.
39 virtual ~ThreadPoolInterface() {}
41 // Schedules a given closure for execution later.
42 // Depending on specific subclass implementation, this routine might cause
43 // current thread to be blocked (in case of unable to schedule).
44 // Closure should contain a function pointer and arguments it will take, more
45 // details for closure struct at /grpc/include/grpc/impl/codegen/grpc_types.h
46 virtual void Add(grpc_experimental_completion_queue_functor* closure) = 0;
48 // Returns the current number of pending closures
49 virtual int num_pending_closures() const = 0;
51 // Returns the capacity of pool (number of worker threads in pool)
52 virtual int pool_capacity() const = 0;
54 // Thread option accessor
55 virtual const Thread::Options& thread_options() const = 0;
57 // Returns the thread name for threads in this ThreadPool.
58 virtual const char* thread_name() const = 0;
61 // Worker thread for threadpool. Executes closures in the queue, until getting a
63 class ThreadPoolWorker {
65 ThreadPoolWorker(const char* thd_name, MPMCQueueInterface* queue,
66 Thread::Options& options, int index)
67 : queue_(queue), thd_name_(thd_name), index_(index) {
68 thd_ = Thread(thd_name,
69 [](void* th) { static_cast<ThreadPoolWorker*>(th)->Run(); },
70 this, nullptr, options);
73 ~ThreadPoolWorker() {}
75 void Start() { thd_.Start(); }
76 void Join() { thd_.Join(); }
79 // struct for tracking stats of thread
81 gpr_timespec sleep_time;
82 Stats() { sleep_time = gpr_time_0(GPR_TIMESPAN); }
85 void Run(); // Pulls closures from queue and executes them
87 MPMCQueueInterface* queue_; // Queue in thread pool to pull closures from
88 Thread thd_; // Thread wrapped in
89 Stats stats_; // Stats to be collected in run time
90 const char* thd_name_; // Name of thread
91 int index_; // Index in thread pool
94 // A fixed size thread pool implementation of abstract thread pool interface.
95 // In this implementation, the number of threads in pool is fixed, but the
96 // capacity of closure queue is unlimited.
97 class ThreadPool : public ThreadPoolInterface {
99 // Creates a thread pool with size of "num_threads", with default thread name
100 // "ThreadPoolWorker" and all thread options set to default. If the given size
101 // is 0 or less, there will be 1 worker thread created inside pool.
102 ThreadPool(int num_threads);
104 // Same as ThreadPool(int num_threads) constructor, except
105 // that it also sets "thd_name" as the name of all threads in the thread pool.
106 ThreadPool(int num_threads, const char* thd_name);
108 // Same as ThreadPool(const char *thd_name, int num_threads) constructor,
109 // except that is also set thread_options for threads.
110 // Notes for stack size:
111 // If the stack size field of the passed in Thread::Options is set to default
112 // value 0, default ThreadPool stack size will be used. The current default
113 // stack size of this implementation is 1952K for mobile platform and 64K for
115 ThreadPool(int num_threads, const char* thd_name,
116 const Thread::Options& thread_options);
118 // Waits for all pending closures to complete, then shuts down thread pool.
119 ~ThreadPool() override;
121 // Adds given closure into pending queue immediately. Since closure queue has
122 // infinite length, this routine will not block.
123 void Add(grpc_experimental_completion_queue_functor* closure) override;
125 int num_pending_closures() const override;
126 int pool_capacity() const override;
127 const Thread::Options& thread_options() const override;
128 const char* thread_name() const override;
131 int num_threads_ = 0;
132 const char* thd_name_ = nullptr;
133 Thread::Options thread_options_;
134 ThreadPoolWorker** threads_ = nullptr; // Array of worker threads
135 MPMCQueueInterface* queue_ = nullptr; // Closure queue
137 Atomic<bool> shut_down_{false}; // Destructor has been called if set to true
139 void SharedThreadPoolConstructor();
140 // For ThreadPool, default stack size for mobile platform is 1952K. for other
142 size_t DefaultStackSize();
143 // Internal Use Only for debug checking.
144 void AssertHasNotBeenShutDown();
147 } // namespace grpc_core
149 #endif /* GRPC_CORE_LIB_IOMGR_EXECUTOR_THREADPOOL_H */