3 * Copyright 2016 gRPC authors.
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
19 #include <grpc/support/port_platform.h>
21 #include "src/core/lib/iomgr/port.h"
23 #ifdef GRPC_POSIX_SOCKET_EV_POLL
30 #include <sys/socket.h>
35 #include "absl/strings/str_cat.h"
37 #include <grpc/support/alloc.h>
38 #include <grpc/support/log.h>
40 #include "src/core/lib/debug/stats.h"
41 #include "src/core/lib/gpr/murmur_hash.h"
42 #include "src/core/lib/gpr/tls.h"
43 #include "src/core/lib/gpr/useful.h"
44 #include "src/core/lib/gprpp/thd.h"
45 #include "src/core/lib/iomgr/block_annotate.h"
46 #include "src/core/lib/iomgr/ev_poll_posix.h"
47 #include "src/core/lib/iomgr/iomgr_internal.h"
48 #include "src/core/lib/iomgr/wakeup_fd_posix.h"
49 #include "src/core/lib/profiling/timers.h"
51 #define GRPC_POLLSET_KICK_BROADCAST ((grpc_pollset_worker*)1)
53 /*******************************************************************************
// Bookkeeping record for one poller watching one fd. The next and prev links
// let fd_begin_poll park an idle watcher on the inactive list of the fd;
// pollset and worker identify who to kick when polling interest changes.
// NOTE(review): the remaining members and the closing line of this struct are
// not visible in this excerpt.
56 typedef struct grpc_fd_watcher {
57 struct grpc_fd_watcher* next;
58 struct grpc_fd_watcher* prev;
59 grpc_pollset* pollset;
60 grpc_pollset_worker* worker;
64 typedef struct grpc_cached_wakeup_fd grpc_cached_wakeup_fd;
66 /* Only used when GRPC_ENABLE_FORK_SUPPORT=1 */
// Node of the global doubly linked list of descriptors tracked for fork
// handling; the list is rooted at fork_fd_list_head.
// NOTE(review): the fd member used by the fork_fd_list_add helpers is declared
// on a line that is not visible in this excerpt.
67 struct grpc_fork_fd_list {
68 /* Only one of fd or cached_wakeup_fd will be set. The unused field will be
71 grpc_cached_wakeup_fd* cached_wakeup_fd;
73 grpc_fork_fd_list* next;
74 grpc_fork_fd_list* prev;
80 bit0: 1=active/0=orphaned
82 meaning that mostly we ref by two to avoid altering the orphaned bit,
83 and just unref by 1 when we're ready to flag the object as orphaned */
91 grpc_error_handle shutdown_error;
95 The following watcher related fields are protected by watcher_mu.
97 An fd_watcher is an ephemeral object created when an fd wants to
98 begin polling, and destroyed after the poll.
100 It denotes the fd's interest in whether to read poll or write poll
101 or both or neither on this fd.
103 If a watcher is asked to poll for reads or writes, the read_watcher
104 or write_watcher fields are set respectively. A watcher may be asked
105 to poll for both, in which case both fields will be set.
107 read_watcher and write_watcher may be NULL if no watcher has been
108 asked to poll for reads or writes.
110 If an fd_watcher is not asked to poll for reads or writes, it's added
111 to a linked list of inactive watchers, rooted at inactive_watcher_root.
112 If at a later time there becomes need of a poller to poll, one of
113 the inactive pollers may be kicked out of their poll loops to take
114 that responsibility. */
115 grpc_fd_watcher inactive_watcher_root;
116 grpc_fd_watcher* read_watcher;
117 grpc_fd_watcher* write_watcher;
119 grpc_closure* read_closure;
120 grpc_closure* write_closure;
122 grpc_closure* on_done_closure;
124 grpc_iomgr_object iomgr_object;
126 /* Only used when GRPC_ENABLE_FORK_SUPPORT=1 */
127 grpc_fork_fd_list* fork_fd_list;
130 /* True when GRPC_ENABLE_FORK_SUPPORT=1. */
131 static bool track_fds_for_fork = false;
133 /* Only used when GRPC_ENABLE_FORK_SUPPORT=1 */
134 static grpc_fork_fd_list* fork_fd_list_head = nullptr;
135 static gpr_mu fork_fd_list_mu;
137 /* Begin polling on an fd.
138 Registers that the given pollset is interested in this fd - so that if read
139 or writability interest changes, the pollset can be kicked to pick up that
142 (fd_needs_read? read_mask : 0) | (fd_needs_write? write_mask : 0)
143 i.e. a combination of read_mask and write_mask determined by the fd's current
144 interest in said events.
145 Polling strategies that do not need to alter their behavior depending on the
146 fd's current interest (such as epoll) do not need to call this function.
147 MUST NOT be called with a pollset lock taken */
148 static uint32_t fd_begin_poll(grpc_fd* fd, grpc_pollset* pollset,
149 grpc_pollset_worker* worker, uint32_t read_mask,
150 uint32_t write_mask, grpc_fd_watcher* watcher);
151 /* Complete polling previously started with fd_begin_poll
152 MUST NOT be called with a pollset lock taken
153 if got_read or got_write are 1, also does the become_{readable,writable} as
155 static void fd_end_poll(grpc_fd_watcher* watcher, int got_read, int got_write);
157 /* Return 1 if this fd is orphaned, 0 otherwise */
158 static bool fd_is_orphaned(grpc_fd* fd);
161 static void fd_ref(grpc_fd* fd, const char* reason, const char* file, int line);
162 static void fd_unref(grpc_fd* fd, const char* reason, const char* file,
164 #define GRPC_FD_REF(fd, reason) fd_ref(fd, reason, __FILE__, __LINE__)
165 #define GRPC_FD_UNREF(fd, reason) fd_unref(fd, reason, __FILE__, __LINE__)
167 static void fd_ref(grpc_fd* fd);
168 static void fd_unref(grpc_fd* fd);
169 #define GRPC_FD_REF(fd, reason) fd_ref(fd)
170 #define GRPC_FD_UNREF(fd, reason) fd_unref(fd)
173 #define CLOSURE_NOT_READY ((grpc_closure*)0)
174 #define CLOSURE_READY ((grpc_closure*)1)
176 /*******************************************************************************
177 * pollset declarations
// Wakeup fd wrapper that can be chained through next; pollset_work pops one
// from pollset->local_wakeup_cache and pushes it back when it finishes.
// NOTE(review): the wrapped wakeup fd member itself is declared on a line that
// is not visible in this excerpt.
180 typedef struct grpc_cached_wakeup_fd {
182 struct grpc_cached_wakeup_fd* next;
184 /* Only used when GRPC_ENABLE_FORK_SUPPORT=1 */
185 grpc_fork_fd_list* fork_fd_list;
186 } grpc_cached_wakeup_fd;
// State for one thread running pollset_work on a pollset.
//  - wakeup_fd: descriptor written by pollset_kick_ext to wake this worker.
//  - reevaluate_polling_on_wakeup: set by a kick that carries
//    GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP.
//  - kicked_specifically: set when a kick names this worker explicitly.
//  - next, prev: links in the worker ring rooted at pollset root_worker.
188 struct grpc_pollset_worker {
189 grpc_cached_wakeup_fd* wakeup_fd;
190 int reevaluate_polling_on_wakeup;
191 int kicked_specifically;
192 struct grpc_pollset_worker* next;
193 struct grpc_pollset_worker* prev;
// A set of fds polled together by the workers linked from root_worker.
//  - kicked_without_pollers: records a kick that found no worker to wake.
//  - shutdown_done: closure scheduled by finish_shutdown.
//  - pollset_set_count: nonzero makes pollset_in_pollset_sets report true.
// NOTE(review): several members used elsewhere in this file (mu, fds,
// fd_count, fd_capacity, shutting_down, called_shutdown) are declared on lines
// that are not visible in this excerpt.
196 struct grpc_pollset {
198 grpc_pollset_worker root_worker;
201 int kicked_without_pollers;
202 grpc_closure* shutdown_done;
203 int pollset_set_count;
208 /* Local cache of eventfds for workers */
209 grpc_cached_wakeup_fd* local_wakeup_cache;
212 /* Add an fd to a pollset */
213 static void pollset_add_fd(grpc_pollset* pollset, struct grpc_fd* fd);
215 static void pollset_set_add_fd(grpc_pollset_set* pollset_set, grpc_fd* fd);
217 /* Convert a timespec to milliseconds:
218 - very small or negative poll times are clamped to zero to do a
219 non-blocking poll (which becomes spin polling)
220 - other small values are rounded up to one millisecond
221 - longer than a millisecond polls are rounded up to the next nearest
222 millisecond to avoid spinning
223 - infinite timeouts are converted to -1 */
224 static int poll_deadline_to_millis_timeout(grpc_millis deadline);
226 /* Allow kick to wakeup the currently polling worker */
227 #define GRPC_POLLSET_CAN_KICK_SELF 1
228 /* Force the wakee to repoll when awoken */
229 #define GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP 2
230 /* As per pollset_kick, with an extended set of flags (defined above)
231 -- mostly for fd_posix's use. */
232 static grpc_error_handle pollset_kick_ext(grpc_pollset* p,
233 grpc_pollset_worker* specific_worker,
234 uint32_t flags) GRPC_MUST_USE_RESULT;
236 /* Return 1 if the pollset has active threads in pollset_work (pollset must
238 static bool pollset_has_workers(grpc_pollset* pollset);
240 /*******************************************************************************
241 * pollset_set definitions
// A collection of pollsets and nested pollset sets, each kept as a
// count, capacity and array triple.
// NOTE(review): further members (for example the mutex initialised by
// pollset_set_create) are declared on lines not visible in this excerpt.
244 struct grpc_pollset_set {
247 size_t pollset_count;
248 size_t pollset_capacity;
249 grpc_pollset** pollsets;
251 size_t pollset_set_count;
252 size_t pollset_set_capacity;
253 struct grpc_pollset_set** pollset_sets;
260 /*******************************************************************************
261 * functions to track opened fds. No-ops unless track_fds_for_fork is true.
// Unlinks node from the global fork fd list while holding fork_fd_list_mu.
// Does nothing unless track_fds_for_fork is set.
// NOTE(review): one source line between the unlink steps and the unlock is
// missing from this excerpt; confirm whether the node is released there, since
// the add helpers allocate it with gpr_malloc.
264 static void fork_fd_list_remove_node(grpc_fork_fd_list* node) {
265 if (track_fds_for_fork) {
266 gpr_mu_lock(&fork_fd_list_mu);
// Advance the list head when the node being removed is the current head.
267 if (fork_fd_list_head == node) {
268 fork_fd_list_head = node->next;
// Splice the neighbours of node together.
270 if (node->prev != nullptr) {
271 node->prev->next = node->next;
273 if (node->next != nullptr) {
274 node->next->prev = node->prev;
277 gpr_mu_unlock(&fork_fd_list_mu);
281 static void fork_fd_list_add_node(grpc_fork_fd_list* node) {
282 gpr_mu_lock(&fork_fd_list_mu);
283 node->next = fork_fd_list_head;
284 node->prev = nullptr;
285 if (fork_fd_list_head != nullptr) {
286 fork_fd_list_head->prev = node;
288 fork_fd_list_head = node;
289 gpr_mu_unlock(&fork_fd_list_mu);
// Registers a grpc_fd in the global fork fd list when track_fds_for_fork is
// set: a list node is allocated with gpr_malloc, pointed at fd, and linked in
// through fork_fd_list_add_node.
// NOTE(review): the line holding the assignment target for the allocation is
// missing from this excerpt; presumably it stores into fd->fork_fd_list.
292 static void fork_fd_list_add_grpc_fd(grpc_fd* fd) {
293 if (track_fds_for_fork) {
295 static_cast<grpc_fork_fd_list*>(gpr_malloc(sizeof(grpc_fork_fd_list)));
296 fd->fork_fd_list->fd = fd;
297 fd->fork_fd_list->cached_wakeup_fd = nullptr;
298 fork_fd_list_add_node(fd->fork_fd_list);
// Registers a cached wakeup fd in the global fork fd list when
// track_fds_for_fork is set. The node records the wakeup fd and leaves its
// grpc_fd pointer null.
// NOTE(review): the line holding the assignment target for the allocation is
// missing from this excerpt; presumably it stores into fd->fork_fd_list.
302 static void fork_fd_list_add_wakeup_fd(grpc_cached_wakeup_fd* fd) {
303 if (track_fds_for_fork) {
305 static_cast<grpc_fork_fd_list*>(gpr_malloc(sizeof(grpc_fork_fd_list)));
306 fd->fork_fd_list->cached_wakeup_fd = fd;
307 fd->fork_fd_list->fd = nullptr;
308 fork_fd_list_add_node(fd->fork_fd_list);
312 /*******************************************************************************
// Reference counting primitives for grpc_fd, in two variants: one that takes
// reason, file and line and logs the transition when the grpc_trace_fd_refcount
// flag is enabled, and one without those extra arguments.
// NOTE(review): the preprocessor conditionals that select between the two
// variants, and several statement lines, are missing from this excerpt.
317 #define REF_BY(fd, n, reason) ref_by(fd, n, reason, __FILE__, __LINE__)
318 #define UNREF_BY(fd, n, reason) unref_by(fd, n, reason, __FILE__, __LINE__)
319 static void ref_by(grpc_fd* fd, int n, const char* reason, const char* file,
321 if (GRPC_TRACE_FLAG_ENABLED(grpc_trace_fd_refcount)) {
323 "FD %d %p ref %d %" PRIdPTR " -> %" PRIdPTR " [%s; %s:%d]",
324 fd->fd, fd, n, gpr_atm_no_barrier_load(&fd->refst),
325 gpr_atm_no_barrier_load(&fd->refst) + n, reason, file, line);
328 #define REF_BY(fd, n, reason) \
333 #define UNREF_BY(fd, n, reason) \
338 static void ref_by(grpc_fd* fd, int n) {
340 GPR_ASSERT(gpr_atm_no_barrier_fetch_add(&fd->refst, n) > 0);
// unref_by subtracts n from refst. The lines that follow the fetch_add tear
// the fd down: its mutex is destroyed, it is unregistered from iomgr, removed
// from the fork fd list, and any stored shutdown error is released.
// NOTE(review): the condition guarding that teardown is on a missing line;
// presumably it fires when the count drops to zero.
344 static void unref_by(grpc_fd* fd, int n, const char* reason, const char* file,
346 if (GRPC_TRACE_FLAG_ENABLED(grpc_trace_fd_refcount)) {
348 "FD %d %p unref %d %" PRIdPTR " -> %" PRIdPTR " [%s; %s:%d]",
349 fd->fd, fd, n, gpr_atm_no_barrier_load(&fd->refst),
350 gpr_atm_no_barrier_load(&fd->refst) - n, reason, file, line);
353 static void unref_by(grpc_fd* fd, int n) {
355 gpr_atm old = gpr_atm_full_fetch_add(&fd->refst, -n);
357 gpr_mu_destroy(&fd->mu);
358 grpc_iomgr_unregister_object(&fd->iomgr_object);
359 fork_fd_list_remove_node(fd->fork_fd_list);
360 if (fd->shutdown) GRPC_ERROR_UNREF(fd->shutdown_error);
// Allocates and initialises a grpc_fd wrapper for the descriptor fd.
//  - name is combined with the descriptor number to label the iomgr object.
//  - track_err must be false; this is only checked by a debug assertion.
// The new object starts with refst equal to 1, both closures in the
// CLOSURE_NOT_READY state, an empty inactive watcher ring, and no read or
// write watcher. It is registered with iomgr and with the fork fd list.
// NOTE(review): several initialisation lines and the return statement are
// missing from this excerpt.
367 static grpc_fd* fd_create(int fd, const char* name, bool track_err) {
368 // Avoid unused-parameter warning for debug-only parameter
370 GPR_DEBUG_ASSERT(track_err == false);
371 grpc_fd* r = static_cast<grpc_fd*>(gpr_malloc(sizeof(*r)));
373 gpr_atm_rel_store(&r->refst, 1);
375 r->read_closure = CLOSURE_NOT_READY;
376 r->write_closure = CLOSURE_NOT_READY;
// An empty ring is represented by the root pointing at itself.
378 r->inactive_watcher_root.next = r->inactive_watcher_root.prev =
379 &r->inactive_watcher_root;
380 r->read_watcher = r->write_watcher = nullptr;
381 r->on_done_closure = nullptr;
384 gpr_atm_no_barrier_store(&r->pollhup, 0);
386 std::string name2 = absl::StrCat(name, " fd=", fd);
387 grpc_iomgr_register_object(&r->iomgr_object, name2.c_str());
388 fork_fd_list_add_grpc_fd(r);
392 static bool fd_is_orphaned(grpc_fd* fd) {
393 return (gpr_atm_acq_load(&fd->refst) & 1) == 0;
396 static grpc_error_handle pollset_kick_locked(grpc_fd_watcher* watcher) {
397 gpr_mu_lock(&watcher->pollset->mu);
398 GPR_ASSERT(watcher->worker);
399 grpc_error_handle err =
400 pollset_kick_ext(watcher->pollset, watcher->worker,
401 GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP);
402 gpr_mu_unlock(&watcher->pollset->mu);
406 static void maybe_wake_one_watcher_locked(grpc_fd* fd) {
407 if (fd->inactive_watcher_root.next != &fd->inactive_watcher_root) {
408 pollset_kick_locked(fd->inactive_watcher_root.next);
409 } else if (fd->read_watcher) {
410 pollset_kick_locked(fd->read_watcher);
411 } else if (fd->write_watcher) {
412 pollset_kick_locked(fd->write_watcher);
416 static void wake_all_watchers_locked(grpc_fd* fd) {
417 grpc_fd_watcher* watcher;
418 for (watcher = fd->inactive_watcher_root.next;
419 watcher != &fd->inactive_watcher_root; watcher = watcher->next) {
420 pollset_kick_locked(watcher);
422 if (fd->read_watcher) {
423 pollset_kick_locked(fd->read_watcher);
425 if (fd->write_watcher && fd->write_watcher != fd->read_watcher) {
426 pollset_kick_locked(fd->write_watcher);
430 static int has_watchers(grpc_fd* fd) {
431 return fd->read_watcher != nullptr || fd->write_watcher != nullptr ||
432 fd->inactive_watcher_root.next != &fd->inactive_watcher_root;
// Finishes closing an fd and schedules fd->on_done_closure with
// GRPC_ERROR_NONE on the current ExecCtx.
// NOTE(review): the lines before the closure is scheduled are missing from
// this excerpt; presumably they mark the fd closed and close the descriptor
// unless it was released. Confirm against the full file.
435 static void close_fd_locked(grpc_fd* fd) {
440 grpc_core::ExecCtx::Run(DEBUG_LOCATION, fd->on_done_closure, GRPC_ERROR_NONE);
// Accessor for the descriptor wrapped by fd, with a special case for an fd
// that has been released or closed.
// NOTE(review): both return statements are missing from this excerpt, so the
// exact values returned in each case cannot be confirmed here.
443 static int fd_wrapped_fd(grpc_fd* fd) {
444 if (fd->released || fd->closed) {
// Orphans fd: records on_done as the closure to run once the fd is done and,
// when release_fd is non-null, marks the fd released and hands the raw
// descriptor back through release_fd.
// Refcount protocol visible here: one unit is added, which flips the active
// bit while keeping the object referenced, and two units are dropped at the
// end after fd->mu has been unlocked.
// NOTE(review): the branch taken when there are no watchers is on missing
// lines; only the wake_all_watchers_locked call is visible.
451 static void fd_orphan(grpc_fd* fd, grpc_closure* on_done, int* release_fd,
452 const char* reason) {
453 fd->on_done_closure = on_done;
454 fd->released = release_fd != nullptr;
455 if (release_fd != nullptr) {
456 *release_fd = fd->fd;
459 gpr_mu_lock(&fd->mu);
460 REF_BY(fd, 1, reason); /* remove active status, but keep referenced */
461 if (!has_watchers(fd)) {
464 wake_all_watchers_locked(fd);
466 gpr_mu_unlock(&fd->mu);
467 UNREF_BY(fd, 2, reason); /* drop the reference */
470 /* increment refcount by two to avoid changing the orphan bit */
// Tracing variant of fd_ref: takes one logical reference by adding 2 to the
// refcount and forwards reason, file and line to ref_by.
// NOTE(review): the tail of the parameter list is on a missing line.
472 static void fd_ref(grpc_fd* fd, const char* reason, const char* file,
474 ref_by(fd, 2, reason, file, line);
// Tracing variant of fd_unref: drops one logical reference by subtracting 2
// from the refcount and forwards reason, file and line to unref_by.
// NOTE(review): the tail of the parameter list is on a missing line.
477 static void fd_unref(grpc_fd* fd, const char* reason, const char* file,
479 unref_by(fd, 2, reason, file, line);
482 static void fd_ref(grpc_fd* fd) { ref_by(fd, 2); }
484 static void fd_unref(grpc_fd* fd) { unref_by(fd, 2); }
// Builds the error passed to closures run on behalf of fd. Two outcomes are
// visible: GRPC_ERROR_NONE, or an error that references the stored
// shutdown_error and carries the GRPC_STATUS_UNAVAILABLE status.
// NOTE(review): the condition choosing between the two is on a missing line;
// presumably it tests fd->shutdown.
487 static grpc_error_handle fd_shutdown_error(grpc_fd* fd) {
489 return GRPC_ERROR_NONE;
491 return grpc_error_set_int(GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
492 "FD shutdown", &fd->shutdown_error, 1),
493 GRPC_ERROR_INT_GRPC_STATUS,
494 GRPC_STATUS_UNAVAILABLE);
// Arms one direction (read or write) of fd with closure. st points at the
// per-direction state slot, which holds CLOSURE_NOT_READY, CLOSURE_READY, or a
// pending closure pointer. Callers in this file hold fd->mu.
//  - fd shut down or pollhup set: closure runs with an UNAVAILABLE error.
//  - slot is CLOSURE_NOT_READY: switch to the waiting state.
//  - slot is CLOSURE_READY: consume the readiness, run closure, and wake one
//    watcher so polling interest is re-evaluated.
//  - anything else: a callback is already pending, which is reported as a
//    usage error.
// NOTE(review): the statement that stores the closure and the tail of the
// error branch are on lines missing from this excerpt.
498 static void notify_on_locked(grpc_fd* fd, grpc_closure** st,
499 grpc_closure* closure) {
500 if (fd->shutdown || gpr_atm_no_barrier_load(&fd->pollhup)) {
501 grpc_core::ExecCtx::Run(
502 DEBUG_LOCATION, closure,
503 grpc_error_set_int(GRPC_ERROR_CREATE_FROM_STATIC_STRING("FD shutdown"),
504 GRPC_ERROR_INT_GRPC_STATUS,
505 GRPC_STATUS_UNAVAILABLE));
506 } else if (*st == CLOSURE_NOT_READY) {
507 /* not ready ==> switch to a waiting state by setting the closure */
509 } else if (*st == CLOSURE_READY) {
510 /* already ready ==> queue the closure to run immediately */
511 *st = CLOSURE_NOT_READY;
512 grpc_core::ExecCtx::Run(DEBUG_LOCATION, closure, fd_shutdown_error(fd));
513 maybe_wake_one_watcher_locked(fd);
515 /* upcallptr was set to a different closure. This is an error! */
517 "User called a notify_on function with a previous callback still "
523 /* returns 1 if state becomes not ready */
// Marks one direction of fd as ready. st points at the per-direction state
// slot. Callers in this file hold fd->mu.
//  - slot already CLOSURE_READY: nothing to do.
//  - slot CLOSURE_NOT_READY: nobody is waiting, so readiness is recorded.
//  - otherwise a closure is waiting: it is scheduled with fd_shutdown_error
//    and the slot goes back to CLOSURE_NOT_READY.
// NOTE(review): the return statements and the store for the second case are
// on lines missing from this excerpt.
524 static int set_ready_locked(grpc_fd* fd, grpc_closure** st) {
525 if (*st == CLOSURE_READY) {
526 /* duplicate ready ==> ignore */
528 } else if (*st == CLOSURE_NOT_READY) {
529 /* not ready, and not waiting ==> flag ready */
533 /* waiting ==> queue closure */
534 grpc_core::ExecCtx::Run(DEBUG_LOCATION, *st, fd_shutdown_error(fd));
535 *st = CLOSURE_NOT_READY;
// Shuts fd down with the error why, under fd->mu. The visible steps store why
// as shutdown_error, call shutdown with SHUT_RDWR on the descriptor, and mark
// both directions ready so that pending closures get scheduled. A
// GRPC_ERROR_UNREF of why is also visible.
// NOTE(review): the once-only guard and its else branch are on missing lines;
// presumably the unref belongs to the already-shut-down path.
540 static void fd_shutdown(grpc_fd* fd, grpc_error_handle why) {
541 gpr_mu_lock(&fd->mu);
542 /* only shutdown once */
545 fd->shutdown_error = why;
546 /* signal read/write closed to OS so that future operations fail */
547 shutdown(fd->fd, SHUT_RDWR);
548 set_ready_locked(fd, &fd->read_closure);
549 set_ready_locked(fd, &fd->write_closure);
551 GRPC_ERROR_UNREF(why);
553 gpr_mu_unlock(&fd->mu);
556 static bool fd_is_shutdown(grpc_fd* fd) {
557 gpr_mu_lock(&fd->mu);
558 bool r = fd->shutdown;
559 gpr_mu_unlock(&fd->mu);
563 static void fd_notify_on_read(grpc_fd* fd, grpc_closure* closure) {
564 gpr_mu_lock(&fd->mu);
565 notify_on_locked(fd, &fd->read_closure, closure);
566 gpr_mu_unlock(&fd->mu);
569 static void fd_notify_on_write(grpc_fd* fd, grpc_closure* closure) {
570 gpr_mu_lock(&fd->mu);
571 notify_on_locked(fd, &fd->write_closure, closure);
572 gpr_mu_unlock(&fd->mu);
575 static void fd_notify_on_error(grpc_fd* /*fd*/, grpc_closure* closure) {
576 if (GRPC_TRACE_FLAG_ENABLED(grpc_polling_trace)) {
577 gpr_log(GPR_ERROR, "Polling engine does not support tracking errors.");
579 grpc_core::ExecCtx::Run(DEBUG_LOCATION, closure, GRPC_ERROR_CANCELLED);
582 static void fd_set_readable(grpc_fd* fd) {
583 gpr_mu_lock(&fd->mu);
584 set_ready_locked(fd, &fd->read_closure);
585 gpr_mu_unlock(&fd->mu);
588 static void fd_set_writable(grpc_fd* fd) {
589 gpr_mu_lock(&fd->mu);
590 set_ready_locked(fd, &fd->write_closure);
591 gpr_mu_unlock(&fd->mu);
594 static void fd_set_error(grpc_fd* /*fd*/) {
595 if (GRPC_TRACE_FLAG_ENABLED(grpc_polling_trace)) {
596 gpr_log(GPR_ERROR, "Polling engine does not support tracking errors.");
// Registers watcher as a poller of fd on behalf of worker in pollset.
// A poll reference is taken on the fd for the duration of the poll.
// Under fd->mu the watcher becomes the read watcher and/or the write watcher
// when the matching mask is nonzero, no watcher holds that role yet, and the
// direction is not already CLOSURE_READY. A watcher that ends up with no role
// and has a worker is appended to the inactive watcher ring of the fd.
// On the shutdown path the watcher fields are cleared and the poll reference
// is dropped again.
// NOTE(review): the local declarations, the mask updates, the shutdown test
// and the return statements are on lines missing from this excerpt.
600 static uint32_t fd_begin_poll(grpc_fd* fd, grpc_pollset* pollset,
601 grpc_pollset_worker* worker, uint32_t read_mask,
602 uint32_t write_mask, grpc_fd_watcher* watcher) {
606 /* keep track of pollers that have requested our events, in case they change
608 GRPC_FD_REF(fd, "poll");
610 gpr_mu_lock(&fd->mu);
612 /* if we are shutdown, then don't add to the watcher set */
614 watcher->fd = nullptr;
615 watcher->pollset = nullptr;
616 watcher->worker = nullptr;
617 gpr_mu_unlock(&fd->mu);
618 GRPC_FD_UNREF(fd, "poll");
622 /* if there is nobody polling for read, but we need to, then start doing so */
623 cur = fd->read_closure;
624 requested = cur != CLOSURE_READY;
625 if (read_mask && fd->read_watcher == nullptr && requested) {
626 fd->read_watcher = watcher;
629 /* if there is nobody polling for write, but we need to, then start doing so
631 cur = fd->write_closure;
632 requested = cur != CLOSURE_READY;
633 if (write_mask && fd->write_watcher == nullptr && requested) {
634 fd->write_watcher = watcher;
637 /* if not polling, remember this watcher in case we need someone to later */
638 if (mask == 0 && worker != nullptr) {
639 watcher->next = &fd->inactive_watcher_root;
640 watcher->prev = watcher->next->prev;
641 watcher->next->prev = watcher->prev->next = watcher;
643 watcher->pollset = pollset;
644 watcher->worker = worker;
646 gpr_mu_unlock(&fd->mu);
// Ends a poll started with fd_begin_poll. Under fd->mu the watcher gives up
// its read and write roles, or is unlinked from the inactive ring when it held
// neither role and has a worker. The read and write closures are made ready
// through set_ready_locked, one watcher may be woken to take over polling, and
// an orphaned fd with no remaining watchers that is not yet closed gets
// special handling. The poll reference is dropped after unlocking.
// NOTE(review): the conditions around set_ready_locked and the wake call, the
// bookkeeping flags, and the body of the orphaned branch are on lines missing
// from this excerpt.
651 static void fd_end_poll(grpc_fd_watcher* watcher, int got_read, int got_write) {
654 grpc_fd* fd = watcher->fd;
660 gpr_mu_lock(&fd->mu);
662 if (watcher == fd->read_watcher) {
663 /* remove read watcher, kick if we still need a read */
668 fd->read_watcher = nullptr;
670 if (watcher == fd->write_watcher) {
671 /* remove write watcher, kick if we still need a write */
676 fd->write_watcher = nullptr;
678 if (!was_polling && watcher->worker != nullptr) {
679 /* remove from inactive list */
680 watcher->next->prev = watcher->prev;
681 watcher->prev->next = watcher->next;
684 if (set_ready_locked(fd, &fd->read_closure)) {
689 if (set_ready_locked(fd, &fd->write_closure)) {
694 maybe_wake_one_watcher_locked(fd);
696 if (fd_is_orphaned(fd) && !has_watchers(fd) && !fd->closed) {
699 gpr_mu_unlock(&fd->mu);
701 GRPC_FD_UNREF(fd, "poll");
704 /*******************************************************************************
708 static GPR_THREAD_LOCAL(grpc_pollset*) g_current_thread_poller;
709 static GPR_THREAD_LOCAL(grpc_pollset_worker*) g_current_thread_worker;
711 static void remove_worker(grpc_pollset* /*p*/, grpc_pollset_worker* worker) {
712 worker->prev->next = worker->next;
713 worker->next->prev = worker->prev;
716 static bool pollset_has_workers(grpc_pollset* p) {
717 return p->root_worker.next != &p->root_worker;
720 static bool pollset_in_pollset_sets(grpc_pollset* p) {
721 return p->pollset_set_count;
724 static bool pollset_has_observers(grpc_pollset* p) {
725 return pollset_has_workers(p) || pollset_in_pollset_sets(p);
// Takes the first worker of the worker ring of p, if there is one.
// NOTE(review): the unlink step and both return statements are on lines
// missing from this excerpt; callers in this file compare the result with
// nullptr, so presumably nullptr is returned for an empty ring.
728 static grpc_pollset_worker* pop_front_worker(grpc_pollset* p) {
729 if (pollset_has_workers(p)) {
730 grpc_pollset_worker* w = p->root_worker.next;
738 static void push_back_worker(grpc_pollset* p, grpc_pollset_worker* worker) {
739 worker->next = &p->root_worker;
740 worker->prev = worker->next->prev;
741 worker->prev->next = worker->next->prev = worker;
744 static void push_front_worker(grpc_pollset* p, grpc_pollset_worker* worker) {
745 worker->prev = &p->root_worker;
746 worker->next = worker->prev->next;
747 worker->prev->next = worker->next->prev = worker;
750 static void kick_append_error(grpc_error_handle* composite,
751 grpc_error_handle error) {
752 if (error == GRPC_ERROR_NONE) return;
753 if (*composite == GRPC_ERROR_NONE) {
754 *composite = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Kick Failure");
756 *composite = grpc_error_add_child(*composite, error);
// Wakes workers of pollset p by writing to their wakeup fds. The pollset mutex
// is expected to be held by the caller.
//  - specific_worker equal to GRPC_POLLSET_KICK_BROADCAST: every worker on the
//    ring is woken and kicked_without_pollers is set; the reevaluate flag is
//    not allowed in this mode.
//  - specific_worker naming another thread worker: that worker is flagged as
//    kicked specifically (and for re-evaluation when requested) and woken.
//  - specific_worker naming the calling thread worker: the same, but only when
//    GRPC_POLLSET_CAN_KICK_SELF is set in flags.
//  - no specific worker and the calling thread is not polling p: the front
//    worker is rotated to the back and woken, skipping the calling thread
//    worker unless self kicks are allowed; with no worker available the kick
//    is remembered in kicked_without_pollers.
// Wakeup failures are accumulated with kick_append_error and logged.
// NOTE(review): the tail of the signature, several branch lines and the final
// return are on lines missing from this excerpt.
759 static grpc_error_handle pollset_kick_ext(grpc_pollset* p,
760 grpc_pollset_worker* specific_worker,
762 GPR_TIMER_SCOPE("pollset_kick_ext", 0);
763 grpc_error_handle error = GRPC_ERROR_NONE;
764 GRPC_STATS_INC_POLLSET_KICK();
766 /* pollset->mu already held */
767 if (specific_worker != nullptr) {
768 if (specific_worker == GRPC_POLLSET_KICK_BROADCAST) {
769 GPR_TIMER_SCOPE("pollset_kick_ext.broadcast", 0);
770 GPR_ASSERT((flags & GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP) == 0);
771 for (specific_worker = p->root_worker.next;
772 specific_worker != &p->root_worker;
773 specific_worker = specific_worker->next) {
775 &error, grpc_wakeup_fd_wakeup(&specific_worker->wakeup_fd->fd));
777 p->kicked_without_pollers = true;
778 } else if (g_current_thread_worker != specific_worker) {
779 GPR_TIMER_MARK("different_thread_worker", 0);
780 if ((flags & GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP) != 0) {
781 specific_worker->reevaluate_polling_on_wakeup = true;
783 specific_worker->kicked_specifically = true;
784 kick_append_error(&error,
785 grpc_wakeup_fd_wakeup(&specific_worker->wakeup_fd->fd));
786 } else if ((flags & GRPC_POLLSET_CAN_KICK_SELF) != 0) {
787 GPR_TIMER_MARK("kick_yoself", 0);
788 if ((flags & GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP) != 0) {
789 specific_worker->reevaluate_polling_on_wakeup = true;
791 specific_worker->kicked_specifically = true;
792 kick_append_error(&error,
793 grpc_wakeup_fd_wakeup(&specific_worker->wakeup_fd->fd));
795 } else if (g_current_thread_poller != p) {
796 GPR_ASSERT((flags & GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP) == 0);
797 GPR_TIMER_MARK("kick_anonymous", 0);
798 specific_worker = pop_front_worker(p);
799 if (specific_worker != nullptr) {
800 if (g_current_thread_worker == specific_worker) {
801 GPR_TIMER_MARK("kick_anonymous_not_self", 0);
802 push_back_worker(p, specific_worker);
803 specific_worker = pop_front_worker(p);
804 if ((flags & GRPC_POLLSET_CAN_KICK_SELF) == 0 &&
805 g_current_thread_worker == specific_worker) {
806 push_back_worker(p, specific_worker);
807 specific_worker = nullptr;
810 if (specific_worker != nullptr) {
811 GPR_TIMER_MARK("finally_kick", 0);
812 push_back_worker(p, specific_worker);
814 &error, grpc_wakeup_fd_wakeup(&specific_worker->wakeup_fd->fd));
817 GPR_TIMER_MARK("kicked_no_pollers", 0);
818 p->kicked_without_pollers = true;
// A separate reference is logged here, so the caller still owns error.
822 GRPC_LOG_IF_ERROR("pollset_kick_ext", GRPC_ERROR_REF(error));
826 static grpc_error_handle pollset_kick(grpc_pollset* p,
827 grpc_pollset_worker* specific_worker) {
828 return pollset_kick_ext(p, specific_worker, 0);
831 /* global state management */
833 static grpc_error_handle pollset_global_init(void) { return GRPC_ERROR_NONE; }
/* This engine has no global pollset state to tear down. */
static void pollset_global_shutdown(void) {
}
// Initialises pollset in place: its mutex, an empty worker ring, cleared
// shutdown and kick flags, an empty wakeup fd cache and an empty fd array.
// NOTE(review): the statement that fills the mu out-parameter is on a line
// missing from this excerpt.
// NOTE(review): kicked_without_pollers is cleared twice below; the second
// store is redundant and could be removed.
839 static void pollset_init(grpc_pollset* pollset, gpr_mu** mu) {
840 gpr_mu_init(&pollset->mu);
842 pollset->root_worker.next = pollset->root_worker.prev = &pollset->root_worker;
843 pollset->shutting_down = 0;
844 pollset->called_shutdown = 0;
845 pollset->kicked_without_pollers = 0;
846 pollset->local_wakeup_cache = nullptr;
847 pollset->kicked_without_pollers = 0;
848 pollset->fd_count = 0;
849 pollset->fd_capacity = 0;
850 pollset->fds = nullptr;
851 pollset->pollset_set_count = 0;
854 static void pollset_destroy(grpc_pollset* pollset) {
855 GPR_ASSERT(!pollset_has_workers(pollset));
856 while (pollset->local_wakeup_cache) {
857 grpc_cached_wakeup_fd* next = pollset->local_wakeup_cache->next;
858 fork_fd_list_remove_node(pollset->local_wakeup_cache->fork_fd_list);
859 grpc_wakeup_fd_destroy(&pollset->local_wakeup_cache->fd);
860 gpr_free(pollset->local_wakeup_cache);
861 pollset->local_wakeup_cache = next;
863 gpr_free(pollset->fds);
864 gpr_mu_destroy(&pollset->mu);
// Adds fd to pollset under the pollset mutex. An fd that is already present
// is not added again. The fd array grows by at least 8 slots, or by half its
// current size, when full. The pollset takes a multipoller reference on the
// fd and then kicks itself so that a worker picks up the new fd.
// NOTE(review): the index declaration and the exit label are on lines missing
// from this excerpt.
// NOTE(review): the error handle returned by pollset_kick is not consumed
// here; confirm whether it should be released.
867 static void pollset_add_fd(grpc_pollset* pollset, grpc_fd* fd) {
868 gpr_mu_lock(&pollset->mu);
870 /* TODO(ctiller): this is O(num_fds^2); maybe switch to a hash set here */
871 for (i = 0; i < pollset->fd_count; i++) {
872 if (pollset->fds[i] == fd) goto exit;
874 if (pollset->fd_count == pollset->fd_capacity) {
875 pollset->fd_capacity =
876 GPR_MAX(pollset->fd_capacity + 8, pollset->fd_count * 3 / 2);
877 pollset->fds = static_cast<grpc_fd**>(
878 gpr_realloc(pollset->fds, sizeof(grpc_fd*) * pollset->fd_capacity));
880 pollset->fds[pollset->fd_count++] = fd;
881 GRPC_FD_REF(fd, "multipoller");
882 pollset_kick(pollset, nullptr);
884 gpr_mu_unlock(&pollset->mu);
// Completes pollset shutdown: drops the multipoller reference held on every
// fd in the pollset, empties the fd array, and schedules the shutdown_done
// closure on the current ExecCtx.
// NOTE(review): the index declaration and the final argument of the Run call
// are on lines missing from this excerpt.
887 static void finish_shutdown(grpc_pollset* pollset) {
889 for (i = 0; i < pollset->fd_count; i++) {
890 GRPC_FD_UNREF(pollset->fds[i], "multipoller");
892 pollset->fd_count = 0;
893 grpc_core::ExecCtx::Run(DEBUG_LOCATION, pollset->shutdown_done,
897 static void work_combine_error(grpc_error_handle* composite,
898 grpc_error_handle error) {
899 if (error == GRPC_ERROR_NONE) return;
900 if (*composite == GRPC_ERROR_NONE) {
901 *composite = GRPC_ERROR_CREATE_FROM_STATIC_STRING("pollset_work");
903 *composite = grpc_error_add_child(*composite, error);
// Runs one round of polling for pollset on the calling thread. The pollset
// mutex is expected to be held on entry.
// Outline of the visible steps:
//  1. A stack allocated worker is published through worker_hdl and given a
//     wakeup fd, taken from the pollset cache or freshly created.
//  2. Unless the pollset is shutting down, the worker joins the worker ring
//     and the thread local poller and worker markers are set.
//  3. A pollfd array is built with the worker wakeup fd in slot 0, followed by
//     every fd that is neither orphaned nor flagged with pollhup. Stack
//     buffers are used for small sets; larger sets share one heap buffer.
//  4. The pollset mutex is released, fd_begin_poll computes the event mask of
//     each fd, and grpc_poll_function blocks until an event or the timeout.
//  5. Results are handed back through fd_end_poll; a poll failure other than
//     EINTR is recorded in the returned error.
//  6. The ExecCtx is flushed, the mutex is retaken, and the loop may repeat
//     when the worker was asked to re-evaluate polling.
//  7. The worker leaves the ring, its wakeup fd goes back to the cache, and
//     pending pollset shutdown is completed when no observers remain.
// NOTE(review): many lines of this function are missing from this excerpt,
// including local declarations, else branches and the return statements.
906 static grpc_error_handle pollset_work(grpc_pollset* pollset,
907 grpc_pollset_worker** worker_hdl,
908 grpc_millis deadline) {
909 GPR_TIMER_SCOPE("pollset_work", 0);
910 grpc_pollset_worker worker;
911 if (worker_hdl) *worker_hdl = &worker;
912 grpc_error_handle error = GRPC_ERROR_NONE;
914 /* Avoid malloc for small number of elements. */
915 enum { inline_elements = 96 };
916 struct pollfd pollfd_space[inline_elements];
917 struct grpc_fd_watcher watcher_space[inline_elements];
919 /* pollset->mu already held */
920 int added_worker = 0;
923 int keep_polling = 0;
924 /* this must happen before we (potentially) drop pollset->mu */
925 worker.next = worker.prev = nullptr;
926 worker.reevaluate_polling_on_wakeup = 0;
// Reuse a cached wakeup fd when one is available, otherwise create one.
927 if (pollset->local_wakeup_cache != nullptr) {
928 worker.wakeup_fd = pollset->local_wakeup_cache;
929 pollset->local_wakeup_cache = worker.wakeup_fd->next;
931 worker.wakeup_fd = static_cast<grpc_cached_wakeup_fd*>(
932 gpr_malloc(sizeof(*worker.wakeup_fd)));
933 error = grpc_wakeup_fd_init(&worker.wakeup_fd->fd);
934 fork_fd_list_add_wakeup_fd(worker.wakeup_fd);
// NOTE(review): on this failure path no visible line frees the new wakeup_fd
// or returns it to the cache; confirm it is not leaked.
935 if (error != GRPC_ERROR_NONE) {
936 GRPC_LOG_IF_ERROR("pollset_work", GRPC_ERROR_REF(error));
940 worker.kicked_specifically = 0;
941 /* If we're shutting down then we don't execute any extended work */
942 if (pollset->shutting_down) {
943 GPR_TIMER_MARK("pollset_work.shutting_down", 0);
946 /* Start polling, and keep doing so while we're being asked to
947 re-evaluate our pollers (this allows poll() based pollers to
948 ensure they don't miss wakeups) */
950 g_current_thread_poller = pollset;
951 while (keep_polling) {
953 if (!pollset->kicked_without_pollers ||
954 deadline <= grpc_core::ExecCtx::Get()->Now()) {
956 push_front_worker(pollset, &worker);
958 g_current_thread_worker = &worker;
960 GPR_TIMER_SCOPE("maybe_work_and_unlock", 0);
961 #define POLLOUT_CHECK (POLLOUT | POLLHUP | POLLERR)
962 #define POLLIN_CHECK (POLLIN | POLLHUP | POLLERR)
968 grpc_fd_watcher* watchers;
971 timeout = poll_deadline_to_millis_timeout(deadline);
973 if (pollset->fd_count + 2 <= inline_elements) {
975 watchers = watcher_space;
977 /* Allocate one buffer to hold both pfds and watchers arrays */
978 const size_t pfd_size = sizeof(*pfds) * (pollset->fd_count + 2);
979 const size_t watch_size = sizeof(*watchers) * (pollset->fd_count + 2);
980 void* buf = gpr_malloc(pfd_size + watch_size);
981 pfds = static_cast<struct pollfd*>(buf);
982 watchers = static_cast<grpc_fd_watcher*>(
983 static_cast<void*>((static_cast<char*>(buf) + pfd_size)));
// Slot 0 always watches the wakeup fd of this worker.
988 pfds[0].fd = GRPC_WAKEUP_FD_GET_READ_FD(&worker.wakeup_fd->fd);
989 pfds[0].events = POLLIN;
// Compact the fd array in place, dropping orphaned and hung up fds.
991 for (i = 0; i < pollset->fd_count; i++) {
992 if (fd_is_orphaned(pollset->fds[i]) ||
993 gpr_atm_no_barrier_load(&pollset->fds[i]->pollhup) == 1) {
994 GRPC_FD_UNREF(pollset->fds[i], "multipoller");
996 pollset->fds[fd_count++] = pollset->fds[i];
997 watchers[pfd_count].fd = pollset->fds[i];
998 GRPC_FD_REF(watchers[pfd_count].fd, "multipoller_start");
999 pfds[pfd_count].fd = pollset->fds[i]->fd;
1000 pfds[pfd_count].revents = 0;
1004 pollset->fd_count = fd_count;
1005 gpr_mu_unlock(&pollset->mu);
1007 for (i = 1; i < pfd_count; i++) {
1008 grpc_fd* fd = watchers[i].fd;
1009 pfds[i].events = static_cast<short>(
1010 fd_begin_poll(fd, pollset, &worker, POLLIN, POLLOUT, &watchers[i]));
1011 GRPC_FD_UNREF(fd, "multipoller_start");
1014 /* TODO(vpai): Consider first doing a 0 timeout poll here to avoid
1015 even going into the blocking annotation if possible */
1016 GRPC_SCHEDULING_START_BLOCKING_REGION;
1017 GRPC_STATS_INC_SYSCALL_POLL();
1018 r = grpc_poll_function(pfds, pfd_count, timeout);
1019 GRPC_SCHEDULING_END_BLOCKING_REGION;
1021 if (GRPC_TRACE_FLAG_ENABLED(grpc_polling_trace)) {
1022 gpr_log(GPR_INFO, "%p poll=%d", pollset, r);
1026 if (errno != EINTR) {
1027 work_combine_error(&error, GRPC_OS_ERROR(errno, "poll"));
1030 for (i = 1; i < pfd_count; i++) {
1031 if (watchers[i].fd == nullptr) {
1032 fd_end_poll(&watchers[i], 0, 0);
1034 // Wake up all the file descriptors, if we have an invalid one
1035 // we can identify it on the next pollset_work()
1036 fd_end_poll(&watchers[i], 1, 1);
1039 } else if (r == 0) {
1040 for (i = 1; i < pfd_count; i++) {
1041 fd_end_poll(&watchers[i], 0, 0);
1044 if (pfds[0].revents & POLLIN_CHECK) {
1045 if (GRPC_TRACE_FLAG_ENABLED(grpc_polling_trace)) {
1046 gpr_log(GPR_INFO, "%p: got_wakeup", pollset);
1049 &error, grpc_wakeup_fd_consume_wakeup(&worker.wakeup_fd->fd));
1051 for (i = 1; i < pfd_count; i++) {
1052 if (watchers[i].fd == nullptr) {
1053 fd_end_poll(&watchers[i], 0, 0);
1055 if (GRPC_TRACE_FLAG_ENABLED(grpc_polling_trace)) {
1056 gpr_log(GPR_INFO, "%p got_event: %d r:%d w:%d [%d]", pollset,
1057 pfds[i].fd, (pfds[i].revents & POLLIN_CHECK) != 0,
1058 (pfds[i].revents & POLLOUT_CHECK) != 0, pfds[i].revents);
1060 /* This is a mitigation to prevent poll() from spinning on a
1061 ** POLLHUP https://github.com/grpc/grpc/pull/13665
1063 if (pfds[i].revents & POLLHUP) {
1064 gpr_atm_no_barrier_store(&watchers[i].fd->pollhup, 1);
1066 fd_end_poll(&watchers[i], pfds[i].revents & POLLIN_CHECK,
1067 pfds[i].revents & POLLOUT_CHECK);
1072 if (pfds != pollfd_space) {
1073 /* pfds and watchers are in the same memory block pointed to by pfds */
1079 GPR_TIMER_MARK("pollset_work.kicked_without_pollers", 0);
1080 pollset->kicked_without_pollers = 0;
1082 /* Finished execution - start cleaning up.
1083 Note that we may arrive here from outside the enclosing while() loop.
1084 In that case we won't loop though as we haven't added worker to the
1085 worker list, which means nobody could ask us to re-evaluate polling). */
1088 queued_work |= grpc_core::ExecCtx::Get()->Flush();
1089 gpr_mu_lock(&pollset->mu);
1092 /* If we're forced to re-evaluate polling (via pollset_kick with
1093 GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP) then we land here and force
1095 if (worker.reevaluate_polling_on_wakeup && error == GRPC_ERROR_NONE) {
1096 worker.reevaluate_polling_on_wakeup = 0;
1097 pollset->kicked_without_pollers = 0;
1098 if (queued_work || worker.kicked_specifically) {
1099 /* If there's queued work on the list, then set the deadline to be
1100 immediate so we get back out of the polling loop quickly */
1106 g_current_thread_poller = nullptr;
1108 remove_worker(pollset, &worker);
1109 g_current_thread_worker = nullptr;
1111 /* release wakeup fd to the local pool */
1112 worker.wakeup_fd->next = pollset->local_wakeup_cache;
1113 pollset->local_wakeup_cache = worker.wakeup_fd;
1114 /* check shutdown conditions */
1115 if (pollset->shutting_down) {
1116 if (pollset_has_workers(pollset)) {
1117 pollset_kick(pollset, nullptr);
1118 } else if (!pollset->called_shutdown && !pollset_has_observers(pollset)) {
1119 pollset->called_shutdown = 1;
1120 gpr_mu_unlock(&pollset->mu);
1121 finish_shutdown(pollset);
1122 grpc_core::ExecCtx::Get()->Flush();
1123 /* Continuing to access pollset here is safe -- it is the caller's
1124 * responsibility to not destroy when it has outstanding calls to
1126 * TODO(dklempner): Can we refactor the shutdown logic to avoid this? */
1127 gpr_mu_lock(&pollset->mu);
1130 if (worker_hdl) *worker_hdl = nullptr;
1131 GRPC_LOG_IF_ERROR("pollset_work", GRPC_ERROR_REF(error));
/* Starts shutting down `pollset`. `closure` is stored in
   pollset->shutdown_done. Asserts that shutdown has not already been started
   on this pollset.
   NOTE(review): no locking is performed here; presumably the caller holds
   pollset->mu -- confirm against callers. */
1135 static void pollset_shutdown(grpc_pollset* pollset, grpc_closure* closure) {
1136 GPR_ASSERT(!pollset->shutting_down);
1137 pollset->shutting_down = 1;
1138 pollset->shutdown_done = closure;
/* Kick with the broadcast sentinel (GRPC_POLLSET_KICK_BROADCAST) instead of a
   specific worker pointer. */
1139 pollset_kick(pollset, GRPC_POLLSET_KICK_BROADCAST);
/* Finish immediately only when shutdown has not been finished before and the
   pollset has no observers. The same check is repeated in pollset_set_destroy
   and pollset_set_del_pollset after they decrement pollset_set_count. */
1140 if (!pollset->called_shutdown && !pollset_has_observers(pollset)) {
1141 pollset->called_shutdown = 1;
1142 finish_shutdown(pollset);
/* Converts the absolute `deadline` into a relative timeout in milliseconds:
     - GRPC_MILLIS_INF_FUTURE        -> -1
     - 0                             -> 0
     - deadline already in the past  -> 0
     - difference larger than INT_MAX -> -1
     - otherwise                     -> deadline - ExecCtx::Get()->Now()
   NOTE(review): -1 and 0 look like the poll(2) timeout conventions ("block
   indefinitely" and "return immediately") -- confirm at the call site. */
1146 static int poll_deadline_to_millis_timeout(grpc_millis deadline) {
1147 if (deadline == GRPC_MILLIS_INF_FUTURE) return -1;
1148 if (deadline == 0) return 0;
1149 grpc_millis n = deadline - grpc_core::ExecCtx::Get()->Now();
1150 if (n < 0) return 0;
/* The difference does not fit in the int return type; -1 is returned, the
   same value used for GRPC_MILLIS_INF_FUTURE above. */
1151 if (n > INT_MAX) return -1;
1152 return static_cast<int>(n);
1155 /*******************************************************************************
1156 * pollset_set_posix.c
/* Allocates a new grpc_pollset_set with gpr_zalloc and initializes its
   mutex.
   NOTE(review): gpr_zalloc presumably zero-fills the allocation, which is what
   lets the count/capacity fields and the pollsets, pollset_sets and fds
   arrays start out empty -- confirm. */
1159 static grpc_pollset_set* pollset_set_create(void) {
1160 grpc_pollset_set* pollset_set =
1161 static_cast<grpc_pollset_set*>(gpr_zalloc(sizeof(*pollset_set)));
1162 gpr_mu_init(&pollset_set->mu);
/* Tears down `pollset_set`: destroys its mutex, drops the "pollset_set" ref
   held on every tracked fd, decrements pollset_set_count on every member
   pollset, and frees the pollsets, pollset_sets and fds arrays followed by the
   set itself. */
1166 static void pollset_set_destroy(grpc_pollset_set* pollset_set) {
/* The set's mutex is destroyed first; the loops below read the set's fields
   without it.
   NOTE(review): presumably no other thread can still reach the set at this
   point -- confirm against callers. */
1168 gpr_mu_destroy(&pollset_set->mu);
/* Release the reference that pollset_set_add_fd took for each stored fd. */
1169 for (i = 0; i < pollset_set->fd_count; i++) {
1170 GRPC_FD_UNREF(pollset_set->fds[i], "pollset_set");
/* Each member pollset is updated under its own mutex. */
1172 for (i = 0; i < pollset_set->pollset_count; i++) {
1173 grpc_pollset* pollset = pollset_set->pollsets[i];
1174 gpr_mu_lock(&pollset->mu);
1175 pollset->pollset_set_count--;
1176 /* check shutdown */
/* A pollset that is shutting down, has not had its shutdown finished yet and
   has no observers left is finished here. pollset->mu is released before
   finish_shutdown is called. */
1177 if (pollset->shutting_down && !pollset->called_shutdown &&
1178 !pollset_has_observers(pollset)) {
1179 pollset->called_shutdown = 1;
1180 gpr_mu_unlock(&pollset->mu);
1181 finish_shutdown(pollset);
1183 gpr_mu_unlock(&pollset->mu);
1186 gpr_free(pollset_set->pollsets);
1187 gpr_free(pollset_set->pollset_sets);
1188 gpr_free(pollset_set->fds);
1189 gpr_free(pollset_set);
/* Adds `pollset` to `pollset_set` and hands the fds already tracked by the set
   to the new pollset through pollset_add_fd. */
1192 static void pollset_set_add_pollset(grpc_pollset_set* pollset_set,
1193 grpc_pollset* pollset) {
/* Record the membership on the pollset side first. pollset->mu is released
   before pollset_set->mu is taken, so the two locks are not held together
   in this function. */
1195 gpr_mu_lock(&pollset->mu);
1196 pollset->pollset_set_count++;
1197 gpr_mu_unlock(&pollset->mu);
1198 gpr_mu_lock(&pollset_set->mu);
/* Array full: grow it to max(8, 2 * capacity) entries. */
1199 if (pollset_set->pollset_count == pollset_set->pollset_capacity) {
1200 pollset_set->pollset_capacity =
1201 GPR_MAX(8, 2 * pollset_set->pollset_capacity);
1202 pollset_set->pollsets = static_cast<grpc_pollset**>(gpr_realloc(
1203 pollset_set->pollsets,
1204 pollset_set->pollset_capacity * sizeof(*pollset_set->pollsets)));
1206 pollset_set->pollsets[pollset_set->pollset_count++] = pollset;
/* Sweep the tracked fds: an orphaned fd has its "pollset_set" ref dropped;
   an fd that is kept is passed to pollset_add_fd and packed toward the front
   of the array at index j. */
1207 for (i = 0, j = 0; i < pollset_set->fd_count; i++) {
1208 if (fd_is_orphaned(pollset_set->fds[i])) {
1209 GRPC_FD_UNREF(pollset_set->fds[i], "pollset_set");
1211 pollset_add_fd(pollset, pollset_set->fds[i]);
1212 pollset_set->fds[j++] = pollset_set->fds[i];
/* j is the number of fds written back into the compacted prefix. */
1215 pollset_set->fd_count = j;
1216 gpr_mu_unlock(&pollset_set->mu);
/* Removes `pollset` from `pollset_set`, then decrements the pollset's
   pollset_set_count and re-runs the pollset's shutdown check. */
1219 static void pollset_set_del_pollset(grpc_pollset_set* pollset_set,
1220 grpc_pollset* pollset) {
1222 gpr_mu_lock(&pollset_set->mu);
/* Unordered removal: on a match the count is reduced and the matching slot is
   swapped with the slot at the new count (the former last entry), so the
   removed pointer ends up just past the live range. */
1223 for (i = 0; i < pollset_set->pollset_count; i++) {
1224 if (pollset_set->pollsets[i] == pollset) {
1225 pollset_set->pollset_count--;
1226 GPR_SWAP(grpc_pollset*, pollset_set->pollsets[i],
1227 pollset_set->pollsets[pollset_set->pollset_count]);
/* The set's mutex is released before the pollset's mutex is taken. */
1231 gpr_mu_unlock(&pollset_set->mu);
1232 gpr_mu_lock(&pollset->mu);
1233 pollset->pollset_set_count--;
1234 /* check shutdown */
/* A pollset that is shutting down, has not had its shutdown finished yet and
   has no observers left is finished here. pollset->mu is released before
   finish_shutdown is called. */
1235 if (pollset->shutting_down && !pollset->called_shutdown &&
1236 !pollset_has_observers(pollset)) {
1237 pollset->called_shutdown = 1;
1238 gpr_mu_unlock(&pollset->mu);
1239 finish_shutdown(pollset);
1241 gpr_mu_unlock(&pollset->mu);
/* Adds the pollset_set `item` as a child of `bag` and passes the fds already
   tracked by `bag` on to `item` through pollset_set_add_fd. */
1245 static void pollset_set_add_pollset_set(grpc_pollset_set* bag,
1246 grpc_pollset_set* item) {
1248 gpr_mu_lock(&bag->mu);
/* Array full: grow it to max(8, 2 * capacity) entries. */
1249 if (bag->pollset_set_count == bag->pollset_set_capacity) {
1250 bag->pollset_set_capacity = GPR_MAX(8, 2 * bag->pollset_set_capacity);
1251 bag->pollset_sets = static_cast<grpc_pollset_set**>(
1252 gpr_realloc(bag->pollset_sets,
1253 bag->pollset_set_capacity * sizeof(*bag->pollset_sets)));
1255 bag->pollset_sets[bag->pollset_set_count++] = item;
/* Sweep the tracked fds: an orphaned fd has its "pollset_set" ref dropped;
   an fd that is kept is passed to `item` and packed toward the front of
   bag->fds at index j. pollset_set_add_fd locks item->mu, so that lock is
   acquired while bag->mu is still held. */
1256 for (i = 0, j = 0; i < bag->fd_count; i++) {
1257 if (fd_is_orphaned(bag->fds[i])) {
1258 GRPC_FD_UNREF(bag->fds[i], "pollset_set");
1260 pollset_set_add_fd(item, bag->fds[i]);
1261 bag->fds[j++] = bag->fds[i];
1265 gpr_mu_unlock(&bag->mu);
/* Removes the child pollset_set `item` from `bag`'s pollset_sets array. The
   search and removal are done under bag->mu. */
1268 static void pollset_set_del_pollset_set(grpc_pollset_set* bag,
1269 grpc_pollset_set* item) {
1271 gpr_mu_lock(&bag->mu);
/* Unordered removal: on a match the count is reduced and the matching slot is
   swapped with the slot at the new count (the former last entry). */
1272 for (i = 0; i < bag->pollset_set_count; i++) {
1273 if (bag->pollset_sets[i] == item) {
1274 bag->pollset_set_count--;
1275 GPR_SWAP(grpc_pollset_set*, bag->pollset_sets[i],
1276 bag->pollset_sets[bag->pollset_set_count]);
1280 gpr_mu_unlock(&bag->mu);
/* Starts tracking `fd` in `pollset_set`: stores it in the fds array, adds it
   to every member pollset, and recursively adds it to every child
   pollset_set. All of this happens under pollset_set->mu. */
1283 static void pollset_set_add_fd(grpc_pollset_set* pollset_set, grpc_fd* fd) {
1285 gpr_mu_lock(&pollset_set->mu);
/* Array full: grow it to max(8, 2 * capacity) entries. */
1286 if (pollset_set->fd_count == pollset_set->fd_capacity) {
1287 pollset_set->fd_capacity = GPR_MAX(8, 2 * pollset_set->fd_capacity);
1288 pollset_set->fds = static_cast<grpc_fd**>(
1289 gpr_realloc(pollset_set->fds,
1290 pollset_set->fd_capacity * sizeof(*pollset_set->fds)));
/* The set takes its own reference for the stored pointer. It is released with
   the same "pollset_set" tag by pollset_set_del_fd, by pollset_set_destroy,
   and by the orphan sweeps in pollset_set_add_pollset and
   pollset_set_add_pollset_set. */
1292 GRPC_FD_REF(fd, "pollset_set");
1293 pollset_set->fds[pollset_set->fd_count++] = fd;
1294 for (i = 0; i < pollset_set->pollset_count; i++) {
1295 pollset_add_fd(pollset_set->pollsets[i], fd);
/* Recursive call: each child's mutex is taken while this set's mutex is still
   held. */
1297 for (i = 0; i < pollset_set->pollset_set_count; i++) {
1298 pollset_set_add_fd(pollset_set->pollset_sets[i], fd);
1300 gpr_mu_unlock(&pollset_set->mu);
/* Stops tracking `fd` in `pollset_set` and, recursively, in every child
   pollset_set. Only the fds array and the child sets are visited; the member
   pollsets array is not touched by this function. */
1303 static void pollset_set_del_fd(grpc_pollset_set* pollset_set, grpc_fd* fd) {
1305 gpr_mu_lock(&pollset_set->mu);
/* Unordered removal: on a match the count is reduced, the matching slot is
   swapped with the slot at the new count (the former last entry), and the
   "pollset_set" reference taken by pollset_set_add_fd is released. */
1306 for (i = 0; i < pollset_set->fd_count; i++) {
1307 if (pollset_set->fds[i] == fd) {
1308 pollset_set->fd_count--;
1309 GPR_SWAP(grpc_fd*, pollset_set->fds[i],
1310 pollset_set->fds[pollset_set->fd_count]);
1311 GRPC_FD_UNREF(fd, "pollset_set");
/* Recursive call: each child's mutex is taken while this set's mutex is still
   held. */
1315 for (i = 0; i < pollset_set->pollset_set_count; i++) {
1316 pollset_set_del_fd(pollset_set->pollset_sets[i], fd);
1318 gpr_mu_unlock(&pollset_set->mu);
1321 /*******************************************************************************
1322 * event engine binding
/* Always returns false. Installed in the engine vtable below. */
1325 static bool is_any_background_poller_thread(void) { return false; }
/* Intentionally empty: does nothing. Installed in the engine vtable below. */
1327 static void shutdown_background_closure(void) {}
/* Vtable hook for handing a closure to a background poller. Both parameter
   names are commented out, so neither argument is used by the body. */
1329 static bool add_closure_to_background_poller(grpc_closure* /*closure*/,
1330 grpc_error_handle /*error*/) {
/* Shuts the engine down: runs pollset_global_shutdown() and, when fork
   tracking was switched on, undoes the fork setup. */
1334 static void shutdown_engine(void) {
1335 pollset_global_shutdown();
/* Mirrors grpc_init_poll_posix, which initializes fork_fd_list_mu and
   registers reset_event_manager_on_fork when it sets track_fds_for_fork. */
1336 if (track_fds_for_fork) {
1337 gpr_mu_destroy(&fork_fd_list_mu);
1338 grpc_core::Fork::SetResetChildPollingEngineFunc(nullptr);
/* Event engine vtable for this poller. The initializer is positional (no
   designators), so every entry must stay in the order in which the fields are
   declared in grpc_event_engine_vtable. */
1342 static const grpc_event_engine_vtable vtable = {
/* NOTE(review): presumably the pollset size that generic code allocates on
   behalf of this engine -- confirm against the struct definition. */
1343 sizeof(grpc_pollset),
/* pollset_set operations defined earlier in this file. */
1367 pollset_set_destroy,
1368 pollset_set_add_pollset,
1369 pollset_set_del_pollset,
1370 pollset_set_add_pollset_set,
1371 pollset_set_del_pollset_set,
/* Background-poller hooks defined just above the vtable. */
1375 is_any_background_poller_thread,
1376 shutdown_background_closure,
1378 add_closure_to_background_poller,
1381 /* Called by the child process's post-fork handler to close open fds, including
1382 * worker wakeup fds. This allows gRPC to shut down in the child process without
1383 * interfering with connections or RPCs ongoing in the parent. */
1384 static void reset_event_manager_on_fork() {
/* The whole walk runs under fork_fd_list_mu. fork_fd_list_head itself is
   advanced, so the loop ends with the head pointer set to nullptr. */
1385 gpr_mu_lock(&fork_fd_list_mu);
1386 while (fork_fd_list_head != nullptr) {
/* A list node carries either an fd or a cached wakeup fd (see the
   grpc_fork_fd_list declaration). */
1387 if (fork_fd_list_head->fd != nullptr) {
/* Close the descriptor only if the fd is not already marked closed. */
1388 if (!fork_fd_list_head->fd->closed) {
1389 close(fork_fd_list_head->fd->fd);
/* Overwrite the stored descriptor number with -1. */
1391 fork_fd_list_head->fd->fd = -1;
/* Cached wakeup fd: close both its read and write descriptors and overwrite
   each with -1. */
1393 close(fork_fd_list_head->cached_wakeup_fd->fd.read_fd);
1394 fork_fd_list_head->cached_wakeup_fd->fd.read_fd = -1;
1395 close(fork_fd_list_head->cached_wakeup_fd->fd.write_fd);
1396 fork_fd_list_head->cached_wakeup_fd->fd.write_fd = -1;
1398 fork_fd_list_head = fork_fd_list_head->next;
1400 gpr_mu_unlock(&fork_fd_list_mu);
/* Initializes this polling engine. The `explicit_request` argument is unused
   (its name is commented out). Two preconditions are checked before any fork
   setup: wakeup fds must be available and pollset_global_init() must
   succeed. */
1403 const grpc_event_engine_vtable* grpc_init_poll_posix(
1404 bool /*explicit_request*/) {
/* Without wakeup fd support the engine is skipped and an error is logged. */
1405 if (!grpc_has_wakeup_fd()) {
1406 gpr_log(GPR_ERROR, "Skipping poll because of no wakeup fd.");
/* NOTE(review): GRPC_LOG_IF_ERROR presumably logs the error under the given
   tag and evaluates to false when pollset_global_init() fails -- confirm. */
1409 if (!GRPC_LOG_IF_ERROR("pollset_global_init", pollset_global_init())) {
/* Fork support: switch on fd tracking, create the mutex guarding the fork fd
   list, and register the handler that closes the tracked fds. shutdown_engine
   reverses the last two steps. */
1412 if (grpc_core::Fork::Enabled()) {
1413 track_fds_for_fork = true;
1414 gpr_mu_init(&fork_fd_list_mu);
1415 grpc_core::Fork::SetResetChildPollingEngineFunc(
1416 reset_event_manager_on_fork);
1421 #endif /* GRPC_POSIX_SOCKET_EV_POLL */