3 * Copyright 2017 gRPC authors.
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
19 #include <grpc/support/port_platform.h>
21 #include "src/core/lib/gprpp/fork.h"
25 #include <grpc/support/alloc.h>
26 #include <grpc/support/sync.h>
27 #include <grpc/support/time.h>
29 #include "src/core/lib/gpr/useful.h"
30 #include "src/core/lib/gprpp/global_config.h"
31 #include "src/core/lib/gprpp/memory.h"
34 * NOTE: FORKING IS NOT GENERALLY SUPPORTED, THIS IS ONLY INTENDED TO WORK
35 * AROUND VERY SPECIFIC USE CASES.
// Compile-time default for the grpc_enable_fork_support configuration value:
// defined as `true` when GRPC_ENABLE_FORK_SUPPORT is defined at build time.
// NOTE(review): no `#else` is visible between the two #define lines in this
// listing; presumably the `false` definition is the fallback branch — confirm
// against the real file.
38 #ifdef GRPC_ENABLE_FORK_SUPPORT
39 #define GRPC_ENABLE_FORK_SUPPORT_DEFAULT true
41 #define GRPC_ENABLE_FORK_SUPPORT_DEFAULT false
42 #endif // GRPC_ENABLE_FORK_SUPPORT
44 GPR_GLOBAL_CONFIG_DEFINE_BOOL(grpc_enable_fork_support,
45 GRPC_ENABLE_FORK_SUPPORT_DEFAULT,
46 "Enable folk support");
50 // The exec_ctx_count has 2 modes, blocked and unblocked.
51 // When unblocked, the count is 2-indexed; exec_ctx_count=2 indicates
52 // 0 active ExecCtxs, exec_ctx_count=3 indicates 1 active ExecCtx...
54 // When blocked, the exec_ctx_count is 0-indexed. Note that ExecCtx
55 // creation can only be blocked if there is exactly 1 outstanding ExecCtx,
56 // meaning that BLOCKED and UNBLOCKED counts partition the integers
// Encodes `n` active ExecCtxs in the unblocked range, i.e. offset by 2.
// The argument is parenthesized so that an expression argument whose operator
// binds more loosely than `+` (shifts, comparisons, ?:) still expands to
// "value of the argument, plus 2".
#define UNBLOCKED(n) ((n) + 2)
// Encodes `n` in the blocked range, i.e. with no offset.
#define BLOCKED(n) (n)
// Constructs the ExecCtx bookkeeping state: no fork is in progress
// (fork_complete_ starts as true) and the counter starts at UNBLOCKED(0).
// NOTE(review): initialization of mu_ / cv_ and the closing brace are not
// visible in this listing — confirm against the real file.
62 ExecCtxState() : fork_complete_(true) {
65 gpr_atm_no_barrier_store(&count_, UNBLOCKED(0));
// Accounts for one more active ExecCtx.
// When the counter is in the blocked range (<= BLOCKED(1)) the caller waits on
// cv_ until fork_complete_ becomes true; otherwise it tries to bump the
// counter with a compare-and-swap from the value it last observed.
// NOTE(review): the surrounding retry loop, the mu_ lock/unlock calls, the
// loop exit and the closing braces are not visible in this listing — confirm
// against the real file.
68 void IncExecCtxCount() {
69 gpr_atm count = gpr_atm_no_barrier_load(&count_);
71 if (count <= BLOCKED(1)) {
72 // This only occurs if we are trying to fork. Wait until the fork()
73 // operation completes before allowing new ExecCtxs.
// The counter is re-read before waiting, so a state change that happened
// since the first load is not missed.
75 if (gpr_atm_no_barrier_load(&count_) <= BLOCKED(1)) {
76 while (!fork_complete_) {
77 gpr_cv_wait(&cv_, &mu_, gpr_inf_future(GPR_CLOCK_REALTIME));
// CAS only succeeds if count_ still holds the value read into `count`.
81 } else if (gpr_atm_no_barrier_cas(&count_, count, count + 1)) {
// Refresh the observed value for the next attempt.
84 count = gpr_atm_no_barrier_load(&count_);
88 void DecExecCtxCount() { gpr_atm_no_barrier_fetch_add(&count_, -1); }
// NOTE(review): this looks like the body of ExecCtxState::BlockExecCtx (it is
// what Fork::BlockExecCtx forwards to); the signature line, the return
// statements and the closing braces are not visible in this listing.
91 // Assumes there is an active ExecCtx when this function is called
// Atomically switches the counter from UNBLOCKED(1) to BLOCKED(1); only when
// that swap succeeds is the fork marked as in progress (fork_complete_ = false).
92 if (gpr_atm_no_barrier_cas(&count_, UNBLOCKED(1), BLOCKED(1))) {
94 fork_complete_ = false;
// Re-enables ExecCtx creation: resets the counter to UNBLOCKED(0), marks the
// fork as complete and wakes every waiter on cv_ (the waiters in
// IncExecCtxCount loop on fork_complete_).
// NOTE(review): any mu_ lock/unlock around these statements and the closing
// brace are not visible in this listing — confirm against the real file.
101 void AllowExecCtx() {
103 gpr_atm_no_barrier_store(&count_, UNBLOCKED(0));
104 fork_complete_ = true;
105 gpr_cv_broadcast(&cv_);
// Tears down the mutex and condition variable used by this state object.
// NOTE(review): presumably the body of ~ExecCtxState(); the signature line is
// not visible in this listing — confirm against the real file.
110 gpr_mu_destroy(&mu_);
111 gpr_cv_destroy(&cv_);
// Constructs the thread bookkeeping state: nobody is awaiting threads, threads
// are not marked done, and the thread count starts at zero.
// NOTE(review): the constructor body (presumably mu_/cv_ initialization) is
// not visible in this listing — confirm against the real file.
123 ThreadState() : awaiting_threads_(false), threads_done_(false), count_(0) {
// Accounts for one more tracked thread.
// NOTE(review): the body is not visible in this listing; presumably it
// increments count_ — confirm against the real file.
128 void IncThreadCount() {
// Accounts for one tracked thread finishing. If AwaitThreads() has registered
// interest (awaiting_threads_) and count_ is zero, threads_done_ is set, which
// is the condition the waiter in AwaitThreads() loops on.
// NOTE(review): the decrement of count_, the mu_ lock/unlock and the
// condition-variable notification are not visible in this listing — confirm
// against the real file.
134 void DecThreadCount() {
137 if (awaiting_threads_ && count_ == 0) {
138 threads_done_ = true;
// Waits until no tracked threads remain.
// Registers the waiter (awaiting_threads_ = true), treats the wait as already
// satisfied when count_ is zero, and otherwise waits on cv_ with no deadline
// until threads_done_ is set.
// NOTE(review): the mu_ lock/unlock and closing braces are not visible in this
// listing — confirm against the real file.
143 void AwaitThreads() {
145 awaiting_threads_ = true;
146 threads_done_ = (count_ == 0);
147 while (!threads_done_) {
148 gpr_cv_wait(&cv_, &mu_, gpr_inf_future(GPR_CLOCK_REALTIME));
// NOTE(review): this re-assigns the value already set before the loop, so it
// has no effect as written; it looks like it was meant to reset the flag to
// false once waiting is over — confirm intent before changing.
150 awaiting_threads_ = true;
// Tears down the mutex and condition variable used by this state object.
// NOTE(review): presumably the body of ~ThreadState(); the signature line is
// not visible in this listing — confirm against the real file.
155 gpr_mu_destroy(&mu_);
156 gpr_cv_destroy(&cv_);
// True once AwaitThreads() has been called; DecThreadCount() only marks
// threads_done_ when this flag is set.
160 bool awaiting_threads_;
167 } // namespace internal
169 void Fork::GlobalInit() {
170 if (!override_enabled_) {
171 support_enabled_.Store(GPR_GLOBAL_CONFIG_GET(grpc_enable_fork_support),
172 MemoryOrder::RELAXED);
174 if (support_enabled_.Load(MemoryOrder::RELAXED)) {
175 exec_ctx_state_ = new internal::ExecCtxState();
176 thread_state_ = new internal::ThreadState();
180 void Fork::GlobalShutdown() {
181 if (support_enabled_.Load(MemoryOrder::RELAXED)) {
182 delete exec_ctx_state_;
183 delete thread_state_;
187 bool Fork::Enabled() { return support_enabled_.Load(MemoryOrder::RELAXED); }
190 void Fork::Enable(bool enable) {
191 override_enabled_ = true;
192 support_enabled_.Store(enable, MemoryOrder::RELAXED);
195 void Fork::DoIncExecCtxCount() { exec_ctx_state_->IncExecCtxCount(); }
197 void Fork::DoDecExecCtxCount() { exec_ctx_state_->DecExecCtxCount(); }
199 void Fork::SetResetChildPollingEngineFunc(
200 Fork::child_postfork_func reset_child_polling_engine) {
201 reset_child_polling_engine_ = reset_child_polling_engine;
203 Fork::child_postfork_func Fork::GetResetChildPollingEngineFunc() {
204 return reset_child_polling_engine_;
207 bool Fork::BlockExecCtx() {
208 if (support_enabled_.Load(MemoryOrder::RELAXED)) {
209 return exec_ctx_state_->BlockExecCtx();
214 void Fork::AllowExecCtx() {
215 if (support_enabled_.Load(MemoryOrder::RELAXED)) {
216 exec_ctx_state_->AllowExecCtx();
220 void Fork::IncThreadCount() {
221 if (support_enabled_.Load(MemoryOrder::RELAXED)) {
222 thread_state_->IncThreadCount();
226 void Fork::DecThreadCount() {
227 if (support_enabled_.Load(MemoryOrder::RELAXED)) {
228 thread_state_->DecThreadCount();
231 void Fork::AwaitThreads() {
232 if (support_enabled_.Load(MemoryOrder::RELAXED)) {
233 thread_state_->AwaitThreads();
// Definitions of Fork's static data members.
// ExecCtx bookkeeping; allocated in GlobalInit() when fork support is enabled.
237 internal::ExecCtxState* Fork::exec_ctx_state_ = nullptr;
// Thread bookkeeping; allocated in GlobalInit() when fork support is enabled.
238 internal::ThreadState* Fork::thread_state_ = nullptr;
// Whether fork support is on; starts out false and is set by GlobalInit() or
// Enable().
239 Atomic<bool> Fork::support_enabled_(false);
// Set by Enable(); when true, GlobalInit() does not consult the
// grpc_enable_fork_support configuration value.
240 bool Fork::override_enabled_ = false;
// Callback stored by SetResetChildPollingEngineFunc(); null until one is set.
241 Fork::child_postfork_func Fork::reset_child_polling_engine_ = nullptr;
242 } // namespace grpc_core