Imported Upstream version 1.41.0
[platform/upstream/grpc.git] / src / core / lib / gprpp / fork.cc
1 /*
2  *
3  * Copyright 2017 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18
19 #include <grpc/support/port_platform.h>
20
21 #include "src/core/lib/gprpp/fork.h"
22
23 #include <string.h>
24
25 #include <grpc/support/alloc.h>
26 #include <grpc/support/sync.h>
27 #include <grpc/support/time.h>
28
29 #include "src/core/lib/gpr/useful.h"
30 #include "src/core/lib/gprpp/global_config.h"
31 #include "src/core/lib/gprpp/memory.h"
32
33 /*
34  * NOTE: FORKING IS NOT GENERALLY SUPPORTED, THIS IS ONLY INTENDED TO WORK
35  *       AROUND VERY SPECIFIC USE CASES.
36  */
37
38 #ifdef GRPC_ENABLE_FORK_SUPPORT
39 #define GRPC_ENABLE_FORK_SUPPORT_DEFAULT true
40 #else
41 #define GRPC_ENABLE_FORK_SUPPORT_DEFAULT false
42 #endif  // GRPC_ENABLE_FORK_SUPPORT
43
44 GPR_GLOBAL_CONFIG_DEFINE_BOOL(grpc_enable_fork_support,
45                               GRPC_ENABLE_FORK_SUPPORT_DEFAULT,
46                               "Enable fork support");
47
48 namespace grpc_core {
49 namespace internal {
50 // The exec_ctx_count has 2 modes, blocked and unblocked.
51 // When unblocked, the count is 2-indexed; exec_ctx_count=2 indicates
52 // 0 active ExecCtxs, exex_ctx_count=3 indicates 1 active ExecCtxs...
53
54 // When blocked, the exec_ctx_count is 0-indexed.  Note that ExecCtx
55 // creation can only be blocked if there is exactly 1 outstanding ExecCtx,
56 // meaning that BLOCKED and UNBLOCKED counts partition the integers
57 #define UNBLOCKED(n) ((n) + 2)
58 #define BLOCKED(n) (n)
59
60 class ExecCtxState {
61  public:
62   ExecCtxState() : fork_complete_(true) {
63     gpr_mu_init(&mu_);
64     gpr_cv_init(&cv_);
65     gpr_atm_no_barrier_store(&count_, UNBLOCKED(0));
66   }
67
68   void IncExecCtxCount() {
69     gpr_atm count = gpr_atm_no_barrier_load(&count_);
70     while (true) {
71       if (count <= BLOCKED(1)) {
72         // This only occurs if we are trying to fork.  Wait until the fork()
73         // operation completes before allowing new ExecCtxs.
74         gpr_mu_lock(&mu_);
75         if (gpr_atm_no_barrier_load(&count_) <= BLOCKED(1)) {
76           while (!fork_complete_) {
77             gpr_cv_wait(&cv_, &mu_, gpr_inf_future(GPR_CLOCK_REALTIME));
78           }
79         }
80         gpr_mu_unlock(&mu_);
81       } else if (gpr_atm_no_barrier_cas(&count_, count, count + 1)) {
82         break;
83       }
84       count = gpr_atm_no_barrier_load(&count_);
85     }
86   }
87
88   void DecExecCtxCount() { gpr_atm_no_barrier_fetch_add(&count_, -1); }
89
90   bool BlockExecCtx() {
91     // Assumes there is an active ExecCtx when this function is called
92     if (gpr_atm_no_barrier_cas(&count_, UNBLOCKED(1), BLOCKED(1))) {
93       gpr_mu_lock(&mu_);
94       fork_complete_ = false;
95       gpr_mu_unlock(&mu_);
96       return true;
97     }
98     return false;
99   }
100
101   void AllowExecCtx() {
102     gpr_mu_lock(&mu_);
103     gpr_atm_no_barrier_store(&count_, UNBLOCKED(0));
104     fork_complete_ = true;
105     gpr_cv_broadcast(&cv_);
106     gpr_mu_unlock(&mu_);
107   }
108
109   ~ExecCtxState() {
110     gpr_mu_destroy(&mu_);
111     gpr_cv_destroy(&cv_);
112   }
113
114  private:
115   bool fork_complete_;
116   gpr_mu mu_;
117   gpr_cv cv_;
118   gpr_atm count_;
119 };
120
121 class ThreadState {
122  public:
123   ThreadState() : awaiting_threads_(false), threads_done_(false), count_(0) {
124     gpr_mu_init(&mu_);
125     gpr_cv_init(&cv_);
126   }
127
128   void IncThreadCount() {
129     gpr_mu_lock(&mu_);
130     count_++;
131     gpr_mu_unlock(&mu_);
132   }
133
134   void DecThreadCount() {
135     gpr_mu_lock(&mu_);
136     count_--;
137     if (awaiting_threads_ && count_ == 0) {
138       threads_done_ = true;
139       gpr_cv_signal(&cv_);
140     }
141     gpr_mu_unlock(&mu_);
142   }
143   void AwaitThreads() {
144     gpr_mu_lock(&mu_);
145     awaiting_threads_ = true;
146     threads_done_ = (count_ == 0);
147     while (!threads_done_) {
148       gpr_cv_wait(&cv_, &mu_, gpr_inf_future(GPR_CLOCK_REALTIME));
149     }
150     awaiting_threads_ = true;
151     gpr_mu_unlock(&mu_);
152   }
153
154   ~ThreadState() {
155     gpr_mu_destroy(&mu_);
156     gpr_cv_destroy(&cv_);
157   }
158
159  private:
160   bool awaiting_threads_;
161   bool threads_done_;
162   gpr_mu mu_;
163   gpr_cv cv_;
164   int count_;
165 };
166
167 }  // namespace internal
168
169 void Fork::GlobalInit() {
170   if (!override_enabled_) {
171     support_enabled_.store(GPR_GLOBAL_CONFIG_GET(grpc_enable_fork_support),
172                            std::memory_order_relaxed);
173   }
174   if (support_enabled_.load(std::memory_order_relaxed)) {
175     exec_ctx_state_ = new internal::ExecCtxState();
176     thread_state_ = new internal::ThreadState();
177   }
178 }
179
180 void Fork::GlobalShutdown() {
181   if (support_enabled_.load(std::memory_order_relaxed)) {
182     delete exec_ctx_state_;
183     delete thread_state_;
184   }
185 }
186
187 bool Fork::Enabled() {
188   return support_enabled_.load(std::memory_order_relaxed);
189 }
190
191 // Testing Only
192 void Fork::Enable(bool enable) {
193   override_enabled_ = true;
194   support_enabled_.store(enable, std::memory_order_relaxed);
195 }
196
197 void Fork::DoIncExecCtxCount() { exec_ctx_state_->IncExecCtxCount(); }
198
199 void Fork::DoDecExecCtxCount() { exec_ctx_state_->DecExecCtxCount(); }
200
201 void Fork::SetResetChildPollingEngineFunc(
202     Fork::child_postfork_func reset_child_polling_engine) {
203   reset_child_polling_engine_ = reset_child_polling_engine;
204 }
205 Fork::child_postfork_func Fork::GetResetChildPollingEngineFunc() {
206   return reset_child_polling_engine_;
207 }
208
209 bool Fork::BlockExecCtx() {
210   if (support_enabled_.load(std::memory_order_relaxed)) {
211     return exec_ctx_state_->BlockExecCtx();
212   }
213   return false;
214 }
215
216 void Fork::AllowExecCtx() {
217   if (support_enabled_.load(std::memory_order_relaxed)) {
218     exec_ctx_state_->AllowExecCtx();
219   }
220 }
221
222 void Fork::IncThreadCount() {
223   if (support_enabled_.load(std::memory_order_relaxed)) {
224     thread_state_->IncThreadCount();
225   }
226 }
227
228 void Fork::DecThreadCount() {
229   if (support_enabled_.load(std::memory_order_relaxed)) {
230     thread_state_->DecThreadCount();
231   }
232 }
233 void Fork::AwaitThreads() {
234   if (support_enabled_.load(std::memory_order_relaxed)) {
235     thread_state_->AwaitThreads();
236   }
237 }
238
239 internal::ExecCtxState* Fork::exec_ctx_state_ = nullptr;
240 internal::ThreadState* Fork::thread_state_ = nullptr;
241 std::atomic<bool> Fork::support_enabled_(false);
242 bool Fork::override_enabled_ = false;
243 Fork::child_postfork_func Fork::reset_child_polling_engine_ = nullptr;
244 }  // namespace grpc_core