/*
 *
 * Copyright 2020 gRPC authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */
/// Event engine based on Apple's CFRunLoop API family. If the CFRunLoop engine
/// is enabled (see iomgr_posix_cfstream.cc), a global thread is started to
/// handle and trigger all the CFStream events. The CFStream streams register
/// themselves with the run loop with functions grpc_apple_register_read_stream
/// and grpc_apple_register_write_stream. Pollsets are dummy and block on a
/// condition variable in pollset_work().
#include <grpc/support/port_platform.h>

#include "src/core/lib/iomgr/port.h"

#include <CoreFoundation/CoreFoundation.h>

#include <list>

#include "absl/time/time.h"

#include "src/core/lib/gprpp/thd.h"
#include "src/core/lib/gprpp/time_util.h"
#include "src/core/lib/iomgr/ev_apple.h"
42 grpc_core::DebugOnlyTraceFlag grpc_apple_polling_trace(false, "apple_polling");
/// GRPC_POLLING_TRACE(format, ...) logs a "(polling) "-prefixed debug message
/// through gpr_log when the grpc_apple_polling_trace flag is enabled; the
/// second definition compiles every call away.
///
/// Both forms expand to a single `do { } while (0)` statement so that
/// `if (x) GRPC_POLLING_TRACE(...); else ...` parses the way it reads.
///
/// NOTE(review): two definitions of the same macro imply a preprocessor
/// conditional; NDEBUG is assumed here because the flag is a
/// DebugOnlyTraceFlag -- confirm against the build configuration.
#ifndef NDEBUG
#define GRPC_POLLING_TRACE(format, ...)                      \
  do {                                                       \
    if (GRPC_TRACE_FLAG_ENABLED(grpc_apple_polling_trace)) { \
      gpr_log(GPR_DEBUG, "(polling) " format, __VA_ARGS__);  \
    }                                                        \
  } while (0)
#else
#define GRPC_POLLING_TRACE(...) \
  do {                          \
  } while (0)
#endif  // NDEBUG
/// Sentinel worker value: passing it to pollset_kick() kicks every worker
/// currently tracked by the pollset. It is never dereferenced.
#define GRPC_POLLSET_KICK_BROADCAST (reinterpret_cast<grpc_pollset_worker*>(1))
55 struct GlobalRunLoopContext {
56 grpc_core::CondVar init_cv;
57 grpc_core::CondVar input_source_cv;
61 // Whether an input source registration is pending. Protected by mu.
62 bool input_source_registered = false;
64 // The reference to the global run loop object. Protected by mu.
65 CFRunLoopRef run_loop;
67 // Whether the pollset has been globally shut down. Protected by mu.
68 bool is_shutdown = false;
71 struct GrpcAppleWorker {
72 // The condition varible to kick the worker. Works with the pollset's lock
73 // (GrpcApplePollset.mu).
74 grpc_core::CondVar cv;
76 // Whether the worker is kicked. Protected by the pollset's lock
77 // (GrpcApplePollset.mu).
81 struct GrpcApplePollset {
84 // Tracks the current workers in the pollset. Protected by mu.
85 std::list<GrpcAppleWorker*> workers;
87 // Whether the pollset is shut down. Protected by mu.
88 bool is_shutdown = false;
90 // Closure to call when shutdown is done. Protected by mu.
91 grpc_closure* shutdown_closure;
93 // Whether there's an outstanding kick that was not processed. Protected by
95 bool kicked_without_poller = false;
98 static GlobalRunLoopContext* gGlobalRunLoopContext = nullptr;
99 static grpc_core::Thread* gGlobalRunLoopThread = nullptr;
101 /// Register the stream with the dispatch queue. Callbacks of the stream will be
102 /// issued to the dispatch queue when a network event happens and will be
103 /// managed by Grand Central Dispatch.
104 static void grpc_apple_register_read_stream_queue(
105 CFReadStreamRef read_stream, dispatch_queue_t dispatch_queue) {
106 CFReadStreamSetDispatchQueue(read_stream, dispatch_queue);
109 /// Register the stream with the dispatch queue. Callbacks of the stream will be
110 /// issued to the dispatch queue when a network event happens and will be
111 /// managed by Grand Central Dispatch.
112 static void grpc_apple_register_write_stream_queue(
113 CFWriteStreamRef write_stream, dispatch_queue_t dispatch_queue) {
114 CFWriteStreamSetDispatchQueue(write_stream, dispatch_queue);
117 /// Register the stream with the global run loop. Callbacks of the stream will
118 /// be issued to the run loop when a network event happens and will be driven by
119 /// the global run loop thread gGlobalRunLoopThread.
120 static void grpc_apple_register_read_stream_run_loop(
121 CFReadStreamRef read_stream, dispatch_queue_t dispatch_queue) {
122 GRPC_POLLING_TRACE("Register read stream: %p", read_stream);
123 grpc_core::MutexLock lock(&gGlobalRunLoopContext->mu);
124 CFReadStreamScheduleWithRunLoop(read_stream, gGlobalRunLoopContext->run_loop,
125 kCFRunLoopDefaultMode);
126 gGlobalRunLoopContext->input_source_registered = true;
127 gGlobalRunLoopContext->input_source_cv.Signal();
130 /// Register the stream with the global run loop. Callbacks of the stream will
131 /// be issued to the run loop when a network event happens, and will be driven
132 /// by the global run loop thread gGlobalRunLoopThread.
133 static void grpc_apple_register_write_stream_run_loop(
134 CFWriteStreamRef write_stream, dispatch_queue_t dispatch_queue) {
135 GRPC_POLLING_TRACE("Register write stream: %p", write_stream);
136 grpc_core::MutexLock lock(&gGlobalRunLoopContext->mu);
137 CFWriteStreamScheduleWithRunLoop(
138 write_stream, gGlobalRunLoopContext->run_loop, kCFRunLoopDefaultMode);
139 gGlobalRunLoopContext->input_source_registered = true;
140 gGlobalRunLoopContext->input_source_cv.Signal();
143 /// The default implementation of stream registration is to register the stream
144 /// to a dispatch queue. However, if the CFRunLoop based pollset is enabled (by
145 /// macro and environment variable, see docs in iomgr_posix_cfstream.cc), the
146 /// CFStream streams are registered with the global run loop instead (see
147 /// pollset_global_init below).
148 static void (*grpc_apple_register_read_stream_impl)(
149 CFReadStreamRef, dispatch_queue_t) = grpc_apple_register_read_stream_queue;
150 static void (*grpc_apple_register_write_stream_impl)(CFWriteStreamRef,
152 grpc_apple_register_write_stream_queue;
154 void grpc_apple_register_read_stream(CFReadStreamRef read_stream,
155 dispatch_queue_t dispatch_queue) {
156 grpc_apple_register_read_stream_impl(read_stream, dispatch_queue);
159 void grpc_apple_register_write_stream(CFWriteStreamRef write_stream,
160 dispatch_queue_t dispatch_queue) {
161 grpc_apple_register_write_stream_impl(write_stream, dispatch_queue);
164 /// Drive the run loop in a global singleton thread until the global run loop is
166 static void GlobalRunLoopFunc(void* arg) {
167 grpc_core::LockableAndReleasableMutexLock lock(&gGlobalRunLoopContext->mu);
168 gGlobalRunLoopContext->run_loop = CFRunLoopGetCurrent();
169 gGlobalRunLoopContext->init_cv.Signal();
171 while (!gGlobalRunLoopContext->is_shutdown) {
172 // CFRunLoopRun() will return immediately if no stream is registered on it.
173 // So we wait on a conditional variable until a stream is registered;
174 // otherwise we'll be running a spinning loop.
175 while (!gGlobalRunLoopContext->input_source_registered) {
176 gGlobalRunLoopContext->input_source_cv.Wait(&gGlobalRunLoopContext->mu);
178 gGlobalRunLoopContext->input_source_registered = false;
186 // pollset implementation
188 static void pollset_global_init(void) {
189 gGlobalRunLoopContext = new GlobalRunLoopContext;
191 grpc_apple_register_read_stream_impl =
192 grpc_apple_register_read_stream_run_loop;
193 grpc_apple_register_write_stream_impl =
194 grpc_apple_register_write_stream_run_loop;
196 grpc_core::MutexLock lock(&gGlobalRunLoopContext->mu);
197 gGlobalRunLoopThread =
198 new grpc_core::Thread("apple_ev", GlobalRunLoopFunc, nullptr);
199 gGlobalRunLoopThread->Start();
200 while (gGlobalRunLoopContext->run_loop == NULL)
201 gGlobalRunLoopContext->init_cv.Wait(&gGlobalRunLoopContext->mu);
204 static void pollset_global_shutdown(void) {
206 grpc_core::MutexLock lock(&gGlobalRunLoopContext->mu);
207 gGlobalRunLoopContext->is_shutdown = true;
208 CFRunLoopStop(gGlobalRunLoopContext->run_loop);
210 gGlobalRunLoopThread->Join();
211 delete gGlobalRunLoopThread;
212 delete gGlobalRunLoopContext;
215 /// The caller must acquire the lock GrpcApplePollset.mu before calling this
216 /// function. The lock may be temporarily released when waiting on the condition
217 /// variable but will be re-acquired before the function returns.
219 /// The Apple pollset simply waits on a condition variable until it is kicked.
220 /// The network events are handled in the global run loop thread. Processing of
221 /// these events will eventually trigger the kick.
222 static grpc_error* pollset_work(grpc_pollset* pollset,
223 grpc_pollset_worker** worker,
224 grpc_millis deadline) {
225 GRPC_POLLING_TRACE("pollset work: %p, worker: %p, deadline: %" PRIu64,
226 pollset, worker, deadline);
227 GrpcApplePollset* apple_pollset =
228 reinterpret_cast<GrpcApplePollset*>(pollset);
229 GrpcAppleWorker actual_worker;
231 *worker = reinterpret_cast<grpc_pollset_worker*>(&actual_worker);
234 if (apple_pollset->kicked_without_poller) {
235 // Process the outstanding kick and reset the flag. Do not block.
236 apple_pollset->kicked_without_poller = false;
238 // Block until kicked, timed out, or the pollset shuts down.
239 apple_pollset->workers.push_front(&actual_worker);
240 auto it = apple_pollset->workers.begin();
242 while (!actual_worker.kicked && !apple_pollset->is_shutdown) {
243 if (actual_worker.cv.WaitWithDeadline(
244 &apple_pollset->mu, grpc_core::ToAbslTime(grpc_millis_to_timespec(
245 deadline, GPR_CLOCK_REALTIME)))) {
251 apple_pollset->workers.erase(it);
253 // If the pollset is shut down asynchronously and this is the last pending
254 // worker, the shutdown process is complete at this moment and the shutdown
255 // callback will be called.
256 if (apple_pollset->is_shutdown && apple_pollset->workers.empty()) {
257 grpc_core::ExecCtx::Run(DEBUG_LOCATION, apple_pollset->shutdown_closure,
262 return GRPC_ERROR_NONE;
265 /// Kick a specific worker. The caller must acquire the lock GrpcApplePollset.mu
266 /// before calling this function.
267 static void kick_worker(GrpcAppleWorker* worker) {
268 worker->kicked = true;
272 /// The caller must acquire the lock GrpcApplePollset.mu before calling this
273 /// function. The kick action simply signals the condition variable of the
275 static grpc_error* pollset_kick(grpc_pollset* pollset,
276 grpc_pollset_worker* specific_worker) {
277 GrpcApplePollset* apple_pollset =
278 reinterpret_cast<GrpcApplePollset*>(pollset);
280 GRPC_POLLING_TRACE("pollset kick: %p, worker:%p", pollset, specific_worker);
282 if (specific_worker == nullptr) {
283 if (apple_pollset->workers.empty()) {
284 apple_pollset->kicked_without_poller = true;
286 GrpcAppleWorker* actual_worker = apple_pollset->workers.front();
287 kick_worker(actual_worker);
289 } else if (specific_worker == GRPC_POLLSET_KICK_BROADCAST) {
290 for (auto& actual_worker : apple_pollset->workers) {
291 kick_worker(actual_worker);
294 GrpcAppleWorker* actual_worker =
295 reinterpret_cast<GrpcAppleWorker*>(specific_worker);
296 kick_worker(actual_worker);
299 return GRPC_ERROR_NONE;
302 static void pollset_init(grpc_pollset* pollset, gpr_mu** mu) {
303 GRPC_POLLING_TRACE("pollset init: %p", pollset);
304 GrpcApplePollset* apple_pollset = new (pollset) GrpcApplePollset();
305 *mu = grpc_core::GetUnderlyingGprMu(&apple_pollset->mu);
308 /// The caller must acquire the lock GrpcApplePollset.mu before calling this
310 static void pollset_shutdown(grpc_pollset* pollset, grpc_closure* closure) {
311 GRPC_POLLING_TRACE("pollset shutdown: %p", pollset);
313 GrpcApplePollset* apple_pollset =
314 reinterpret_cast<GrpcApplePollset*>(pollset);
315 apple_pollset->is_shutdown = true;
316 pollset_kick(pollset, GRPC_POLLSET_KICK_BROADCAST);
318 // If there is any worker blocked, shutdown will be done asynchronously.
319 if (apple_pollset->workers.empty()) {
320 grpc_core::ExecCtx::Run(DEBUG_LOCATION, closure, GRPC_ERROR_NONE);
322 apple_pollset->shutdown_closure = closure;
326 static void pollset_destroy(grpc_pollset* pollset) {
327 GRPC_POLLING_TRACE("pollset destroy: %p", pollset);
328 GrpcApplePollset* apple_pollset =
329 reinterpret_cast<GrpcApplePollset*>(pollset);
330 apple_pollset->~GrpcApplePollset();
333 size_t pollset_size(void) { return sizeof(GrpcApplePollset); }
335 grpc_pollset_vtable grpc_apple_pollset_vtable = {
336 pollset_global_init, pollset_global_shutdown,
337 pollset_init, pollset_shutdown,
338 pollset_destroy, pollset_work,
339 pollset_kick, pollset_size};
341 // pollset_set implementation
343 grpc_pollset_set* pollset_set_create(void) { return nullptr; }
344 void pollset_set_destroy(grpc_pollset_set* pollset_set) {}
345 void pollset_set_add_pollset(grpc_pollset_set* pollset_set,
346 grpc_pollset* pollset) {}
347 void pollset_set_del_pollset(grpc_pollset_set* pollset_set,
348 grpc_pollset* pollset) {}
349 void pollset_set_add_pollset_set(grpc_pollset_set* bag,
350 grpc_pollset_set* item) {}
351 void pollset_set_del_pollset_set(grpc_pollset_set* bag,
352 grpc_pollset_set* item) {}
354 grpc_pollset_set_vtable grpc_apple_pollset_set_vtable = {
355 pollset_set_create, pollset_set_destroy,
356 pollset_set_add_pollset, pollset_set_del_pollset,
357 pollset_set_add_pollset_set, pollset_set_del_pollset_set};