Imported Upstream version 1.41.0
[platform/upstream/grpc.git] / src / core / lib / iomgr / ev_poll_posix.cc
1 /*
2  *
3  * Copyright 2016 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18
19 #include <grpc/support/port_platform.h>
20
21 #include "src/core/lib/iomgr/port.h"
22
23 #ifdef GRPC_POSIX_SOCKET_EV_POLL
24
25 #include <assert.h>
26 #include <errno.h>
27 #include <limits.h>
28 #include <poll.h>
29 #include <string.h>
30 #include <sys/socket.h>
31 #include <unistd.h>
32
33 #include <string>
34
35 #include "absl/strings/str_cat.h"
36
37 #include <grpc/support/alloc.h>
38 #include <grpc/support/log.h>
39
40 #include "src/core/lib/debug/stats.h"
41 #include "src/core/lib/gpr/murmur_hash.h"
42 #include "src/core/lib/gpr/tls.h"
43 #include "src/core/lib/gpr/useful.h"
44 #include "src/core/lib/gprpp/thd.h"
45 #include "src/core/lib/iomgr/block_annotate.h"
46 #include "src/core/lib/iomgr/ev_poll_posix.h"
47 #include "src/core/lib/iomgr/iomgr_internal.h"
48 #include "src/core/lib/iomgr/wakeup_fd_posix.h"
49 #include "src/core/lib/profiling/timers.h"
50
51 #define GRPC_POLLSET_KICK_BROADCAST ((grpc_pollset_worker*)1)
52
53 /*******************************************************************************
54  * FD declarations
55  */
56 typedef struct grpc_fd_watcher {
57   struct grpc_fd_watcher* next;
58   struct grpc_fd_watcher* prev;
59   grpc_pollset* pollset;
60   grpc_pollset_worker* worker;
61   grpc_fd* fd;
62 } grpc_fd_watcher;
63
64 typedef struct grpc_cached_wakeup_fd grpc_cached_wakeup_fd;
65
66 /* Only used when GRPC_ENABLE_FORK_SUPPORT=1 */
67 struct grpc_fork_fd_list {
68   /* Only one of fd or cached_wakeup_fd will be set. The unused field will be
69   set to nullptr. */
70   grpc_fd* fd;
71   grpc_cached_wakeup_fd* cached_wakeup_fd;
72
73   grpc_fork_fd_list* next;
74   grpc_fork_fd_list* prev;
75 };
76
77 struct grpc_fd {
78   int fd;
79   /* refst format:
80      bit0:   1=active/0=orphaned
81      bit1-n: refcount
82      meaning that mostly we ref by two to avoid altering the orphaned bit,
83      and just unref by 1 when we're ready to flag the object as orphaned */
84   gpr_atm refst;
85
86   gpr_mu mu;
87   int shutdown;
88   int closed;
89   int released;
90   gpr_atm pollhup;
91   grpc_error_handle shutdown_error;
92
93   /* The watcher list.
94
95      The following watcher related fields are protected by watcher_mu.
96
97      An fd_watcher is an ephemeral object created when an fd wants to
98      begin polling, and destroyed after the poll.
99
100      It denotes the fd's interest in whether to read poll or write poll
101      or both or neither on this fd.
102
103      If a watcher is asked to poll for reads or writes, the read_watcher
104      or write_watcher fields are set respectively. A watcher may be asked
105      to poll for both, in which case both fields will be set.
106
107      read_watcher and write_watcher may be NULL if no watcher has been
108      asked to poll for reads or writes.
109
110      If an fd_watcher is not asked to poll for reads or writes, it's added
111      to a linked list of inactive watchers, rooted at inactive_watcher_root.
112      If at a later time there becomes need of a poller to poll, one of
113      the inactive pollers may be kicked out of their poll loops to take
114      that responsibility. */
115   grpc_fd_watcher inactive_watcher_root;
116   grpc_fd_watcher* read_watcher;
117   grpc_fd_watcher* write_watcher;
118
119   grpc_closure* read_closure;
120   grpc_closure* write_closure;
121
122   grpc_closure* on_done_closure;
123
124   grpc_iomgr_object iomgr_object;
125
126   /* Only used when GRPC_ENABLE_FORK_SUPPORT=1 */
127   grpc_fork_fd_list* fork_fd_list;
128 };
129
130 /* True when GRPC_ENABLE_FORK_SUPPORT=1. */
131 static bool track_fds_for_fork = false;
132
133 /* Only used when GRPC_ENABLE_FORK_SUPPORT=1 */
134 static grpc_fork_fd_list* fork_fd_list_head = nullptr;
135 static gpr_mu fork_fd_list_mu;
136
137 /* Begin polling on an fd.
138    Registers that the given pollset is interested in this fd - so that if read
139    or writability interest changes, the pollset can be kicked to pick up that
140    new interest.
141    Return value is:
142      (fd_needs_read? read_mask : 0) | (fd_needs_write? write_mask : 0)
143    i.e. a combination of read_mask and write_mask determined by the fd's current
144    interest in said events.
145    Polling strategies that do not need to alter their behavior depending on the
146    fd's current interest (such as epoll) do not need to call this function.
147    MUST NOT be called with a pollset lock taken */
148 static uint32_t fd_begin_poll(grpc_fd* fd, grpc_pollset* pollset,
149                               grpc_pollset_worker* worker, uint32_t read_mask,
150                               uint32_t write_mask, grpc_fd_watcher* watcher);
151 /* Complete polling previously started with fd_begin_poll
152    MUST NOT be called with a pollset lock taken
153    if got_read or got_write are 1, also does the become_{readable,writable} as
154    appropriate. */
155 static void fd_end_poll(grpc_fd_watcher* watcher, int got_read, int got_write);
156
157 /* Return 1 if this fd is orphaned, 0 otherwise */
158 static bool fd_is_orphaned(grpc_fd* fd);
159
160 #ifndef NDEBUG
161 static void fd_ref(grpc_fd* fd, const char* reason, const char* file, int line);
162 static void fd_unref(grpc_fd* fd, const char* reason, const char* file,
163                      int line);
164 #define GRPC_FD_REF(fd, reason) fd_ref(fd, reason, __FILE__, __LINE__)
165 #define GRPC_FD_UNREF(fd, reason) fd_unref(fd, reason, __FILE__, __LINE__)
166 #else
167 static void fd_ref(grpc_fd* fd);
168 static void fd_unref(grpc_fd* fd);
169 #define GRPC_FD_REF(fd, reason) fd_ref(fd)
170 #define GRPC_FD_UNREF(fd, reason) fd_unref(fd)
171 #endif
172
173 #define CLOSURE_NOT_READY ((grpc_closure*)0)
174 #define CLOSURE_READY ((grpc_closure*)1)
175
176 /*******************************************************************************
177  * pollset declarations
178  */
179
180 typedef struct grpc_cached_wakeup_fd {
181   grpc_wakeup_fd fd;
182   struct grpc_cached_wakeup_fd* next;
183
184   /* Only used when GRPC_ENABLE_FORK_SUPPORT=1 */
185   grpc_fork_fd_list* fork_fd_list;
186 } grpc_cached_wakeup_fd;
187
188 struct grpc_pollset_worker {
189   grpc_cached_wakeup_fd* wakeup_fd;
190   int reevaluate_polling_on_wakeup;
191   int kicked_specifically;
192   struct grpc_pollset_worker* next;
193   struct grpc_pollset_worker* prev;
194 };
195
196 struct grpc_pollset {
197   gpr_mu mu;
198   grpc_pollset_worker root_worker;
199   int shutting_down;
200   int called_shutdown;
201   int kicked_without_pollers;
202   grpc_closure* shutdown_done;
203   int pollset_set_count;
204   /* all polled fds */
205   size_t fd_count;
206   size_t fd_capacity;
207   grpc_fd** fds;
208   /* Local cache of eventfds for workers */
209   grpc_cached_wakeup_fd* local_wakeup_cache;
210 };
211
212 /* Add an fd to a pollset */
213 static void pollset_add_fd(grpc_pollset* pollset, struct grpc_fd* fd);
214
215 static void pollset_set_add_fd(grpc_pollset_set* pollset_set, grpc_fd* fd);
216
217 /* Convert a timespec to milliseconds:
218    - very small or negative poll times are clamped to zero to do a
219      non-blocking poll (which becomes spin polling)
220    - other small values are rounded up to one millisecond
221    - longer than a millisecond polls are rounded up to the next nearest
222      millisecond to avoid spinning
223    - infinite timeouts are converted to -1 */
224 static int poll_deadline_to_millis_timeout(grpc_millis deadline);
225
226 /* Allow kick to wakeup the currently polling worker */
227 #define GRPC_POLLSET_CAN_KICK_SELF 1
228 /* Force the wakee to repoll when awoken */
229 #define GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP 2
230 /* As per pollset_kick, with an extended set of flags (defined above)
231    -- mostly for fd_posix's use. */
232 static grpc_error_handle pollset_kick_ext(grpc_pollset* p,
233                                           grpc_pollset_worker* specific_worker,
234                                           uint32_t flags) GRPC_MUST_USE_RESULT;
235
236 /* Return 1 if the pollset has active threads in pollset_work (pollset must
237  * be locked) */
238 static bool pollset_has_workers(grpc_pollset* pollset);
239
240 /*******************************************************************************
241  * pollset_set definitions
242  */
243
244 struct grpc_pollset_set {
245   gpr_mu mu;
246
247   size_t pollset_count;
248   size_t pollset_capacity;
249   grpc_pollset** pollsets;
250
251   size_t pollset_set_count;
252   size_t pollset_set_capacity;
253   struct grpc_pollset_set** pollset_sets;
254
255   size_t fd_count;
256   size_t fd_capacity;
257   grpc_fd** fds;
258 };
259
260 /*******************************************************************************
261  * functions to track opened fds. No-ops unless track_fds_for_fork is true.
262  */
263
264 static void fork_fd_list_remove_node(grpc_fork_fd_list* node) {
265   if (track_fds_for_fork) {
266     gpr_mu_lock(&fork_fd_list_mu);
267     if (fork_fd_list_head == node) {
268       fork_fd_list_head = node->next;
269     }
270     if (node->prev != nullptr) {
271       node->prev->next = node->next;
272     }
273     if (node->next != nullptr) {
274       node->next->prev = node->prev;
275     }
276     gpr_free(node);
277     gpr_mu_unlock(&fork_fd_list_mu);
278   }
279 }
280
281 static void fork_fd_list_add_node(grpc_fork_fd_list* node) {
282   gpr_mu_lock(&fork_fd_list_mu);
283   node->next = fork_fd_list_head;
284   node->prev = nullptr;
285   if (fork_fd_list_head != nullptr) {
286     fork_fd_list_head->prev = node;
287   }
288   fork_fd_list_head = node;
289   gpr_mu_unlock(&fork_fd_list_mu);
290 }
291
292 static void fork_fd_list_add_grpc_fd(grpc_fd* fd) {
293   if (track_fds_for_fork) {
294     fd->fork_fd_list =
295         static_cast<grpc_fork_fd_list*>(gpr_malloc(sizeof(grpc_fork_fd_list)));
296     fd->fork_fd_list->fd = fd;
297     fd->fork_fd_list->cached_wakeup_fd = nullptr;
298     fork_fd_list_add_node(fd->fork_fd_list);
299   }
300 }
301
302 static void fork_fd_list_add_wakeup_fd(grpc_cached_wakeup_fd* fd) {
303   if (track_fds_for_fork) {
304     fd->fork_fd_list =
305         static_cast<grpc_fork_fd_list*>(gpr_malloc(sizeof(grpc_fork_fd_list)));
306     fd->fork_fd_list->cached_wakeup_fd = fd;
307     fd->fork_fd_list->fd = nullptr;
308     fork_fd_list_add_node(fd->fork_fd_list);
309   }
310 }
311
312 /*******************************************************************************
313  * fd_posix.c
314  */
315
316 #ifndef NDEBUG
317 #define REF_BY(fd, n, reason) ref_by(fd, n, reason, __FILE__, __LINE__)
318 #define UNREF_BY(fd, n, reason) unref_by(fd, n, reason, __FILE__, __LINE__)
319 static void ref_by(grpc_fd* fd, int n, const char* reason, const char* file,
320                    int line) {
321   if (GRPC_TRACE_FLAG_ENABLED(grpc_trace_fd_refcount)) {
322     gpr_log(GPR_DEBUG,
323             "FD %d %p   ref %d %" PRIdPTR " -> %" PRIdPTR " [%s; %s:%d]",
324             fd->fd, fd, n, gpr_atm_no_barrier_load(&fd->refst),
325             gpr_atm_no_barrier_load(&fd->refst) + n, reason, file, line);
326   }
327 #else
328 #define REF_BY(fd, n, reason) \
329   do {                        \
330     ref_by(fd, n);            \
331     (void)(reason);           \
332   } while (0)
333 #define UNREF_BY(fd, n, reason) \
334   do {                          \
335     unref_by(fd, n);            \
336     (void)(reason);             \
337   } while (0)
338 static void ref_by(grpc_fd* fd, int n) {
339 #endif
340   GPR_ASSERT(gpr_atm_no_barrier_fetch_add(&fd->refst, n) > 0);
341 }
342
343 #ifndef NDEBUG
344 static void unref_by(grpc_fd* fd, int n, const char* reason, const char* file,
345                      int line) {
346   if (GRPC_TRACE_FLAG_ENABLED(grpc_trace_fd_refcount)) {
347     gpr_log(GPR_DEBUG,
348             "FD %d %p unref %d %" PRIdPTR " -> %" PRIdPTR " [%s; %s:%d]",
349             fd->fd, fd, n, gpr_atm_no_barrier_load(&fd->refst),
350             gpr_atm_no_barrier_load(&fd->refst) - n, reason, file, line);
351   }
352 #else
353 static void unref_by(grpc_fd* fd, int n) {
354 #endif
355   gpr_atm old = gpr_atm_full_fetch_add(&fd->refst, -n);
356   if (old == n) {
357     gpr_mu_destroy(&fd->mu);
358     grpc_iomgr_unregister_object(&fd->iomgr_object);
359     fork_fd_list_remove_node(fd->fork_fd_list);
360     if (fd->shutdown) GRPC_ERROR_UNREF(fd->shutdown_error);
361     gpr_free(fd);
362   } else {
363     GPR_ASSERT(old > n);
364   }
365 }
366
367 static grpc_fd* fd_create(int fd, const char* name, bool track_err) {
368   // Avoid unused-parameter warning for debug-only parameter
369   (void)track_err;
370   GPR_DEBUG_ASSERT(track_err == false);
371   grpc_fd* r = static_cast<grpc_fd*>(gpr_malloc(sizeof(*r)));
372   gpr_mu_init(&r->mu);
373   gpr_atm_rel_store(&r->refst, 1);
374   r->shutdown = 0;
375   r->read_closure = CLOSURE_NOT_READY;
376   r->write_closure = CLOSURE_NOT_READY;
377   r->fd = fd;
378   r->inactive_watcher_root.next = r->inactive_watcher_root.prev =
379       &r->inactive_watcher_root;
380   r->read_watcher = r->write_watcher = nullptr;
381   r->on_done_closure = nullptr;
382   r->closed = 0;
383   r->released = 0;
384   gpr_atm_no_barrier_store(&r->pollhup, 0);
385
386   std::string name2 = absl::StrCat(name, " fd=", fd);
387   grpc_iomgr_register_object(&r->iomgr_object, name2.c_str());
388   fork_fd_list_add_grpc_fd(r);
389   return r;
390 }
391
392 static bool fd_is_orphaned(grpc_fd* fd) {
393   return (gpr_atm_acq_load(&fd->refst) & 1) == 0;
394 }
395
396 static grpc_error_handle pollset_kick_locked(grpc_fd_watcher* watcher) {
397   gpr_mu_lock(&watcher->pollset->mu);
398   GPR_ASSERT(watcher->worker);
399   grpc_error_handle err =
400       pollset_kick_ext(watcher->pollset, watcher->worker,
401                        GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP);
402   gpr_mu_unlock(&watcher->pollset->mu);
403   return err;
404 }
405
406 static void maybe_wake_one_watcher_locked(grpc_fd* fd) {
407   if (fd->inactive_watcher_root.next != &fd->inactive_watcher_root) {
408     pollset_kick_locked(fd->inactive_watcher_root.next);
409   } else if (fd->read_watcher) {
410     pollset_kick_locked(fd->read_watcher);
411   } else if (fd->write_watcher) {
412     pollset_kick_locked(fd->write_watcher);
413   }
414 }
415
416 static void wake_all_watchers_locked(grpc_fd* fd) {
417   grpc_fd_watcher* watcher;
418   for (watcher = fd->inactive_watcher_root.next;
419        watcher != &fd->inactive_watcher_root; watcher = watcher->next) {
420     pollset_kick_locked(watcher);
421   }
422   if (fd->read_watcher) {
423     pollset_kick_locked(fd->read_watcher);
424   }
425   if (fd->write_watcher && fd->write_watcher != fd->read_watcher) {
426     pollset_kick_locked(fd->write_watcher);
427   }
428 }
429
430 static int has_watchers(grpc_fd* fd) {
431   return fd->read_watcher != nullptr || fd->write_watcher != nullptr ||
432          fd->inactive_watcher_root.next != &fd->inactive_watcher_root;
433 }
434
435 static void close_fd_locked(grpc_fd* fd) {
436   fd->closed = 1;
437   if (!fd->released) {
438     close(fd->fd);
439   }
440   grpc_core::ExecCtx::Run(DEBUG_LOCATION, fd->on_done_closure, GRPC_ERROR_NONE);
441 }
442
443 static int fd_wrapped_fd(grpc_fd* fd) {
444   if (fd->released || fd->closed) {
445     return -1;
446   } else {
447     return fd->fd;
448   }
449 }
450
451 static void fd_orphan(grpc_fd* fd, grpc_closure* on_done, int* release_fd,
452                       const char* reason) {
453   fd->on_done_closure = on_done;
454   fd->released = release_fd != nullptr;
455   if (release_fd != nullptr) {
456     *release_fd = fd->fd;
457     fd->released = true;
458   }
459   gpr_mu_lock(&fd->mu);
460   REF_BY(fd, 1, reason); /* remove active status, but keep referenced */
461   if (!has_watchers(fd)) {
462     close_fd_locked(fd);
463   } else {
464     wake_all_watchers_locked(fd);
465   }
466   gpr_mu_unlock(&fd->mu);
467   UNREF_BY(fd, 2, reason); /* drop the reference */
468 }
469
470 /* increment refcount by two to avoid changing the orphan bit */
471 #ifndef NDEBUG
472 static void fd_ref(grpc_fd* fd, const char* reason, const char* file,
473                    int line) {
474   ref_by(fd, 2, reason, file, line);
475 }
476
477 static void fd_unref(grpc_fd* fd, const char* reason, const char* file,
478                      int line) {
479   unref_by(fd, 2, reason, file, line);
480 }
481 #else
482 static void fd_ref(grpc_fd* fd) { ref_by(fd, 2); }
483
484 static void fd_unref(grpc_fd* fd) { unref_by(fd, 2); }
485 #endif
486
487 static grpc_error_handle fd_shutdown_error(grpc_fd* fd) {
488   if (!fd->shutdown) {
489     return GRPC_ERROR_NONE;
490   } else {
491     return grpc_error_set_int(GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
492                                   "FD shutdown", &fd->shutdown_error, 1),
493                               GRPC_ERROR_INT_GRPC_STATUS,
494                               GRPC_STATUS_UNAVAILABLE);
495   }
496 }
497
498 static void notify_on_locked(grpc_fd* fd, grpc_closure** st,
499                              grpc_closure* closure) {
500   if (fd->shutdown || gpr_atm_no_barrier_load(&fd->pollhup)) {
501     grpc_core::ExecCtx::Run(
502         DEBUG_LOCATION, closure,
503         grpc_error_set_int(GRPC_ERROR_CREATE_FROM_STATIC_STRING("FD shutdown"),
504                            GRPC_ERROR_INT_GRPC_STATUS,
505                            GRPC_STATUS_UNAVAILABLE));
506   } else if (*st == CLOSURE_NOT_READY) {
507     /* not ready ==> switch to a waiting state by setting the closure */
508     *st = closure;
509   } else if (*st == CLOSURE_READY) {
510     /* already ready ==> queue the closure to run immediately */
511     *st = CLOSURE_NOT_READY;
512     grpc_core::ExecCtx::Run(DEBUG_LOCATION, closure, fd_shutdown_error(fd));
513     maybe_wake_one_watcher_locked(fd);
514   } else {
515     /* upcallptr was set to a different closure.  This is an error! */
516     gpr_log(GPR_ERROR,
517             "User called a notify_on function with a previous callback still "
518             "pending");
519     abort();
520   }
521 }
522
523 /* returns 1 if state becomes not ready */
524 static int set_ready_locked(grpc_fd* fd, grpc_closure** st) {
525   if (*st == CLOSURE_READY) {
526     /* duplicate ready ==> ignore */
527     return 0;
528   } else if (*st == CLOSURE_NOT_READY) {
529     /* not ready, and not waiting ==> flag ready */
530     *st = CLOSURE_READY;
531     return 0;
532   } else {
533     /* waiting ==> queue closure */
534     grpc_core::ExecCtx::Run(DEBUG_LOCATION, *st, fd_shutdown_error(fd));
535     *st = CLOSURE_NOT_READY;
536     return 1;
537   }
538 }
539
540 static void fd_shutdown(grpc_fd* fd, grpc_error_handle why) {
541   gpr_mu_lock(&fd->mu);
542   /* only shutdown once */
543   if (!fd->shutdown) {
544     fd->shutdown = 1;
545     fd->shutdown_error = why;
546     /* signal read/write closed to OS so that future operations fail */
547     shutdown(fd->fd, SHUT_RDWR);
548     set_ready_locked(fd, &fd->read_closure);
549     set_ready_locked(fd, &fd->write_closure);
550   } else {
551     GRPC_ERROR_UNREF(why);
552   }
553   gpr_mu_unlock(&fd->mu);
554 }
555
556 static bool fd_is_shutdown(grpc_fd* fd) {
557   gpr_mu_lock(&fd->mu);
558   bool r = fd->shutdown;
559   gpr_mu_unlock(&fd->mu);
560   return r;
561 }
562
563 static void fd_notify_on_read(grpc_fd* fd, grpc_closure* closure) {
564   gpr_mu_lock(&fd->mu);
565   notify_on_locked(fd, &fd->read_closure, closure);
566   gpr_mu_unlock(&fd->mu);
567 }
568
569 static void fd_notify_on_write(grpc_fd* fd, grpc_closure* closure) {
570   gpr_mu_lock(&fd->mu);
571   notify_on_locked(fd, &fd->write_closure, closure);
572   gpr_mu_unlock(&fd->mu);
573 }
574
575 static void fd_notify_on_error(grpc_fd* /*fd*/, grpc_closure* closure) {
576   if (GRPC_TRACE_FLAG_ENABLED(grpc_polling_trace)) {
577     gpr_log(GPR_ERROR, "Polling engine does not support tracking errors.");
578   }
579   grpc_core::ExecCtx::Run(DEBUG_LOCATION, closure, GRPC_ERROR_CANCELLED);
580 }
581
582 static void fd_set_readable(grpc_fd* fd) {
583   gpr_mu_lock(&fd->mu);
584   set_ready_locked(fd, &fd->read_closure);
585   gpr_mu_unlock(&fd->mu);
586 }
587
588 static void fd_set_writable(grpc_fd* fd) {
589   gpr_mu_lock(&fd->mu);
590   set_ready_locked(fd, &fd->write_closure);
591   gpr_mu_unlock(&fd->mu);
592 }
593
594 static void fd_set_error(grpc_fd* /*fd*/) {
595   if (GRPC_TRACE_FLAG_ENABLED(grpc_polling_trace)) {
596     gpr_log(GPR_ERROR, "Polling engine does not support tracking errors.");
597   }
598 }
599
600 static uint32_t fd_begin_poll(grpc_fd* fd, grpc_pollset* pollset,
601                               grpc_pollset_worker* worker, uint32_t read_mask,
602                               uint32_t write_mask, grpc_fd_watcher* watcher) {
603   uint32_t mask = 0;
604   grpc_closure* cur;
605   int requested;
606   /* keep track of pollers that have requested our events, in case they change
607    */
608   GRPC_FD_REF(fd, "poll");
609
610   gpr_mu_lock(&fd->mu);
611
612   /* if we are shutdown, then don't add to the watcher set */
613   if (fd->shutdown) {
614     watcher->fd = nullptr;
615     watcher->pollset = nullptr;
616     watcher->worker = nullptr;
617     gpr_mu_unlock(&fd->mu);
618     GRPC_FD_UNREF(fd, "poll");
619     return 0;
620   }
621
622   /* if there is nobody polling for read, but we need to, then start doing so */
623   cur = fd->read_closure;
624   requested = cur != CLOSURE_READY;
625   if (read_mask && fd->read_watcher == nullptr && requested) {
626     fd->read_watcher = watcher;
627     mask |= read_mask;
628   }
629   /* if there is nobody polling for write, but we need to, then start doing so
630    */
631   cur = fd->write_closure;
632   requested = cur != CLOSURE_READY;
633   if (write_mask && fd->write_watcher == nullptr && requested) {
634     fd->write_watcher = watcher;
635     mask |= write_mask;
636   }
637   /* if not polling, remember this watcher in case we need someone to later */
638   if (mask == 0 && worker != nullptr) {
639     watcher->next = &fd->inactive_watcher_root;
640     watcher->prev = watcher->next->prev;
641     watcher->next->prev = watcher->prev->next = watcher;
642   }
643   watcher->pollset = pollset;
644   watcher->worker = worker;
645   watcher->fd = fd;
646   gpr_mu_unlock(&fd->mu);
647
648   return mask;
649 }
650
651 static void fd_end_poll(grpc_fd_watcher* watcher, int got_read, int got_write) {
652   int was_polling = 0;
653   int kick = 0;
654   grpc_fd* fd = watcher->fd;
655
656   if (fd == nullptr) {
657     return;
658   }
659
660   gpr_mu_lock(&fd->mu);
661
662   if (watcher == fd->read_watcher) {
663     /* remove read watcher, kick if we still need a read */
664     was_polling = 1;
665     if (!got_read) {
666       kick = 1;
667     }
668     fd->read_watcher = nullptr;
669   }
670   if (watcher == fd->write_watcher) {
671     /* remove write watcher, kick if we still need a write */
672     was_polling = 1;
673     if (!got_write) {
674       kick = 1;
675     }
676     fd->write_watcher = nullptr;
677   }
678   if (!was_polling && watcher->worker != nullptr) {
679     /* remove from inactive list */
680     watcher->next->prev = watcher->prev;
681     watcher->prev->next = watcher->next;
682   }
683   if (got_read) {
684     if (set_ready_locked(fd, &fd->read_closure)) {
685       kick = 1;
686     }
687   }
688   if (got_write) {
689     if (set_ready_locked(fd, &fd->write_closure)) {
690       kick = 1;
691     }
692   }
693   if (kick) {
694     maybe_wake_one_watcher_locked(fd);
695   }
696   if (fd_is_orphaned(fd) && !has_watchers(fd) && !fd->closed) {
697     close_fd_locked(fd);
698   }
699   gpr_mu_unlock(&fd->mu);
700
701   GRPC_FD_UNREF(fd, "poll");
702 }
703
704 /*******************************************************************************
705  * pollset_posix.c
706  */
707
708 static GPR_THREAD_LOCAL(grpc_pollset*) g_current_thread_poller;
709 static GPR_THREAD_LOCAL(grpc_pollset_worker*) g_current_thread_worker;
710
711 static void remove_worker(grpc_pollset* /*p*/, grpc_pollset_worker* worker) {
712   worker->prev->next = worker->next;
713   worker->next->prev = worker->prev;
714 }
715
716 static bool pollset_has_workers(grpc_pollset* p) {
717   return p->root_worker.next != &p->root_worker;
718 }
719
720 static bool pollset_in_pollset_sets(grpc_pollset* p) {
721   return p->pollset_set_count;
722 }
723
724 static bool pollset_has_observers(grpc_pollset* p) {
725   return pollset_has_workers(p) || pollset_in_pollset_sets(p);
726 }
727
728 static grpc_pollset_worker* pop_front_worker(grpc_pollset* p) {
729   if (pollset_has_workers(p)) {
730     grpc_pollset_worker* w = p->root_worker.next;
731     remove_worker(p, w);
732     return w;
733   } else {
734     return nullptr;
735   }
736 }
737
738 static void push_back_worker(grpc_pollset* p, grpc_pollset_worker* worker) {
739   worker->next = &p->root_worker;
740   worker->prev = worker->next->prev;
741   worker->prev->next = worker->next->prev = worker;
742 }
743
744 static void push_front_worker(grpc_pollset* p, grpc_pollset_worker* worker) {
745   worker->prev = &p->root_worker;
746   worker->next = worker->prev->next;
747   worker->prev->next = worker->next->prev = worker;
748 }
749
750 static void kick_append_error(grpc_error_handle* composite,
751                               grpc_error_handle error) {
752   if (error == GRPC_ERROR_NONE) return;
753   if (*composite == GRPC_ERROR_NONE) {
754     *composite = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Kick Failure");
755   }
756   *composite = grpc_error_add_child(*composite, error);
757 }
758
759 static grpc_error_handle pollset_kick_ext(grpc_pollset* p,
760                                           grpc_pollset_worker* specific_worker,
761                                           uint32_t flags) {
762   GPR_TIMER_SCOPE("pollset_kick_ext", 0);
763   grpc_error_handle error = GRPC_ERROR_NONE;
764   GRPC_STATS_INC_POLLSET_KICK();
765
766   /* pollset->mu already held */
767   if (specific_worker != nullptr) {
768     if (specific_worker == GRPC_POLLSET_KICK_BROADCAST) {
769       GPR_TIMER_SCOPE("pollset_kick_ext.broadcast", 0);
770       GPR_ASSERT((flags & GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP) == 0);
771       for (specific_worker = p->root_worker.next;
772            specific_worker != &p->root_worker;
773            specific_worker = specific_worker->next) {
774         kick_append_error(
775             &error, grpc_wakeup_fd_wakeup(&specific_worker->wakeup_fd->fd));
776       }
777       p->kicked_without_pollers = true;
778     } else if (g_current_thread_worker != specific_worker) {
779       GPR_TIMER_MARK("different_thread_worker", 0);
780       if ((flags & GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP) != 0) {
781         specific_worker->reevaluate_polling_on_wakeup = true;
782       }
783       specific_worker->kicked_specifically = true;
784       kick_append_error(&error,
785                         grpc_wakeup_fd_wakeup(&specific_worker->wakeup_fd->fd));
786     } else if ((flags & GRPC_POLLSET_CAN_KICK_SELF) != 0) {
787       GPR_TIMER_MARK("kick_yoself", 0);
788       if ((flags & GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP) != 0) {
789         specific_worker->reevaluate_polling_on_wakeup = true;
790       }
791       specific_worker->kicked_specifically = true;
792       kick_append_error(&error,
793                         grpc_wakeup_fd_wakeup(&specific_worker->wakeup_fd->fd));
794     }
795   } else if (g_current_thread_poller != p) {
796     GPR_ASSERT((flags & GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP) == 0);
797     GPR_TIMER_MARK("kick_anonymous", 0);
798     specific_worker = pop_front_worker(p);
799     if (specific_worker != nullptr) {
800       if (g_current_thread_worker == specific_worker) {
801         GPR_TIMER_MARK("kick_anonymous_not_self", 0);
802         push_back_worker(p, specific_worker);
803         specific_worker = pop_front_worker(p);
804         if ((flags & GRPC_POLLSET_CAN_KICK_SELF) == 0 &&
805             g_current_thread_worker == specific_worker) {
806           push_back_worker(p, specific_worker);
807           specific_worker = nullptr;
808         }
809       }
810       if (specific_worker != nullptr) {
811         GPR_TIMER_MARK("finally_kick", 0);
812         push_back_worker(p, specific_worker);
813         kick_append_error(
814             &error, grpc_wakeup_fd_wakeup(&specific_worker->wakeup_fd->fd));
815       }
816     } else {
817       GPR_TIMER_MARK("kicked_no_pollers", 0);
818       p->kicked_without_pollers = true;
819     }
820   }
821
822   GRPC_LOG_IF_ERROR("pollset_kick_ext", GRPC_ERROR_REF(error));
823   return error;
824 }
825
826 static grpc_error_handle pollset_kick(grpc_pollset* p,
827                                       grpc_pollset_worker* specific_worker) {
828   return pollset_kick_ext(p, specific_worker, 0);
829 }
830
831 /* global state management */
832
833 static grpc_error_handle pollset_global_init(void) { return GRPC_ERROR_NONE; }
834
835 static void pollset_global_shutdown(void) {}
836
837 /* main interface */
838
839 static void pollset_init(grpc_pollset* pollset, gpr_mu** mu) {
840   gpr_mu_init(&pollset->mu);
841   *mu = &pollset->mu;
842   pollset->root_worker.next = pollset->root_worker.prev = &pollset->root_worker;
843   pollset->shutting_down = 0;
844   pollset->called_shutdown = 0;
845   pollset->kicked_without_pollers = 0;
846   pollset->local_wakeup_cache = nullptr;
847   pollset->kicked_without_pollers = 0;
848   pollset->fd_count = 0;
849   pollset->fd_capacity = 0;
850   pollset->fds = nullptr;
851   pollset->pollset_set_count = 0;
852 }
853
854 static void pollset_destroy(grpc_pollset* pollset) {
855   GPR_ASSERT(!pollset_has_workers(pollset));
856   while (pollset->local_wakeup_cache) {
857     grpc_cached_wakeup_fd* next = pollset->local_wakeup_cache->next;
858     fork_fd_list_remove_node(pollset->local_wakeup_cache->fork_fd_list);
859     grpc_wakeup_fd_destroy(&pollset->local_wakeup_cache->fd);
860     gpr_free(pollset->local_wakeup_cache);
861     pollset->local_wakeup_cache = next;
862   }
863   gpr_free(pollset->fds);
864   gpr_mu_destroy(&pollset->mu);
865 }
866
867 static void pollset_add_fd(grpc_pollset* pollset, grpc_fd* fd) {
868   gpr_mu_lock(&pollset->mu);
869   size_t i;
870   /* TODO(ctiller): this is O(num_fds^2); maybe switch to a hash set here */
871   for (i = 0; i < pollset->fd_count; i++) {
872     if (pollset->fds[i] == fd) goto exit;
873   }
874   if (pollset->fd_count == pollset->fd_capacity) {
875     pollset->fd_capacity =
876         GPR_MAX(pollset->fd_capacity + 8, pollset->fd_count * 3 / 2);
877     pollset->fds = static_cast<grpc_fd**>(
878         gpr_realloc(pollset->fds, sizeof(grpc_fd*) * pollset->fd_capacity));
879   }
880   pollset->fds[pollset->fd_count++] = fd;
881   GRPC_FD_REF(fd, "multipoller");
882   pollset_kick(pollset, nullptr);
883 exit:
884   gpr_mu_unlock(&pollset->mu);
885 }
886
887 static void finish_shutdown(grpc_pollset* pollset) {
888   size_t i;
889   for (i = 0; i < pollset->fd_count; i++) {
890     GRPC_FD_UNREF(pollset->fds[i], "multipoller");
891   }
892   pollset->fd_count = 0;
893   grpc_core::ExecCtx::Run(DEBUG_LOCATION, pollset->shutdown_done,
894                           GRPC_ERROR_NONE);
895 }
896
897 static void work_combine_error(grpc_error_handle* composite,
898                                grpc_error_handle error) {
899   if (error == GRPC_ERROR_NONE) return;
900   if (*composite == GRPC_ERROR_NONE) {
901     *composite = GRPC_ERROR_CREATE_FROM_STATIC_STRING("pollset_work");
902   }
903   *composite = grpc_error_add_child(*composite, error);
904 }
905
906 static grpc_error_handle pollset_work(grpc_pollset* pollset,
907                                       grpc_pollset_worker** worker_hdl,
908                                       grpc_millis deadline) {
909   GPR_TIMER_SCOPE("pollset_work", 0);
910   grpc_pollset_worker worker;
911   if (worker_hdl) *worker_hdl = &worker;
912   grpc_error_handle error = GRPC_ERROR_NONE;
913
914   /* Avoid malloc for small number of elements. */
915   enum { inline_elements = 96 };
916   struct pollfd pollfd_space[inline_elements];
917   struct grpc_fd_watcher watcher_space[inline_elements];
918
919   /* pollset->mu already held */
920   int added_worker = 0;
921   int locked = 1;
922   int queued_work = 0;
923   int keep_polling = 0;
924   /* this must happen before we (potentially) drop pollset->mu */
925   worker.next = worker.prev = nullptr;
926   worker.reevaluate_polling_on_wakeup = 0;
927   if (pollset->local_wakeup_cache != nullptr) {
928     worker.wakeup_fd = pollset->local_wakeup_cache;
929     pollset->local_wakeup_cache = worker.wakeup_fd->next;
930   } else {
931     worker.wakeup_fd = static_cast<grpc_cached_wakeup_fd*>(
932         gpr_malloc(sizeof(*worker.wakeup_fd)));
933     error = grpc_wakeup_fd_init(&worker.wakeup_fd->fd);
934     fork_fd_list_add_wakeup_fd(worker.wakeup_fd);
935     if (error != GRPC_ERROR_NONE) {
936       GRPC_LOG_IF_ERROR("pollset_work", GRPC_ERROR_REF(error));
937       return error;
938     }
939   }
940   worker.kicked_specifically = 0;
941   /* If we're shutting down then we don't execute any extended work */
942   if (pollset->shutting_down) {
943     GPR_TIMER_MARK("pollset_work.shutting_down", 0);
944     goto done;
945   }
946   /* Start polling, and keep doing so while we're being asked to
947      re-evaluate our pollers (this allows poll() based pollers to
948      ensure they don't miss wakeups) */
949   keep_polling = 1;
950   g_current_thread_poller = pollset;
951   while (keep_polling) {
952     keep_polling = 0;
953     if (!pollset->kicked_without_pollers ||
954         deadline <= grpc_core::ExecCtx::Get()->Now()) {
955       if (!added_worker) {
956         push_front_worker(pollset, &worker);
957         added_worker = 1;
958         g_current_thread_worker = &worker;
959       }
960       GPR_TIMER_SCOPE("maybe_work_and_unlock", 0);
961 #define POLLOUT_CHECK (POLLOUT | POLLHUP | POLLERR)
962 #define POLLIN_CHECK (POLLIN | POLLHUP | POLLERR)
963
964       int timeout;
965       int r;
966       size_t i, fd_count;
967       nfds_t pfd_count;
968       grpc_fd_watcher* watchers;
969       struct pollfd* pfds;
970
971       timeout = poll_deadline_to_millis_timeout(deadline);
972
973       if (pollset->fd_count + 2 <= inline_elements) {
974         pfds = pollfd_space;
975         watchers = watcher_space;
976       } else {
977         /* Allocate one buffer to hold both pfds and watchers arrays */
978         const size_t pfd_size = sizeof(*pfds) * (pollset->fd_count + 2);
979         const size_t watch_size = sizeof(*watchers) * (pollset->fd_count + 2);
980         void* buf = gpr_malloc(pfd_size + watch_size);
981         pfds = static_cast<struct pollfd*>(buf);
982         watchers = static_cast<grpc_fd_watcher*>(
983             static_cast<void*>((static_cast<char*>(buf) + pfd_size)));
984       }
985
986       fd_count = 0;
987       pfd_count = 1;
988       pfds[0].fd = GRPC_WAKEUP_FD_GET_READ_FD(&worker.wakeup_fd->fd);
989       pfds[0].events = POLLIN;
990       pfds[0].revents = 0;
991       for (i = 0; i < pollset->fd_count; i++) {
992         if (fd_is_orphaned(pollset->fds[i]) ||
993             gpr_atm_no_barrier_load(&pollset->fds[i]->pollhup) == 1) {
994           GRPC_FD_UNREF(pollset->fds[i], "multipoller");
995         } else {
996           pollset->fds[fd_count++] = pollset->fds[i];
997           watchers[pfd_count].fd = pollset->fds[i];
998           GRPC_FD_REF(watchers[pfd_count].fd, "multipoller_start");
999           pfds[pfd_count].fd = pollset->fds[i]->fd;
1000           pfds[pfd_count].revents = 0;
1001           pfd_count++;
1002         }
1003       }
1004       pollset->fd_count = fd_count;
1005       gpr_mu_unlock(&pollset->mu);
1006
1007       for (i = 1; i < pfd_count; i++) {
1008         grpc_fd* fd = watchers[i].fd;
1009         pfds[i].events = static_cast<short>(
1010             fd_begin_poll(fd, pollset, &worker, POLLIN, POLLOUT, &watchers[i]));
1011         GRPC_FD_UNREF(fd, "multipoller_start");
1012       }
1013
1014       /* TODO(vpai): Consider first doing a 0 timeout poll here to avoid
1015          even going into the blocking annotation if possible */
1016       GRPC_SCHEDULING_START_BLOCKING_REGION;
1017       GRPC_STATS_INC_SYSCALL_POLL();
1018       r = grpc_poll_function(pfds, pfd_count, timeout);
1019       GRPC_SCHEDULING_END_BLOCKING_REGION;
1020
1021       if (GRPC_TRACE_FLAG_ENABLED(grpc_polling_trace)) {
1022         gpr_log(GPR_INFO, "%p poll=%d", pollset, r);
1023       }
1024
1025       if (r < 0) {
1026         if (errno != EINTR) {
1027           work_combine_error(&error, GRPC_OS_ERROR(errno, "poll"));
1028         }
1029
1030         for (i = 1; i < pfd_count; i++) {
1031           if (watchers[i].fd == nullptr) {
1032             fd_end_poll(&watchers[i], 0, 0);
1033           } else {
1034             // Wake up all the file descriptors, if we have an invalid one
1035             // we can identify it on the next pollset_work()
1036             fd_end_poll(&watchers[i], 1, 1);
1037           }
1038         }
1039       } else if (r == 0) {
1040         for (i = 1; i < pfd_count; i++) {
1041           fd_end_poll(&watchers[i], 0, 0);
1042         }
1043       } else {
1044         if (pfds[0].revents & POLLIN_CHECK) {
1045           if (GRPC_TRACE_FLAG_ENABLED(grpc_polling_trace)) {
1046             gpr_log(GPR_INFO, "%p: got_wakeup", pollset);
1047           }
1048           work_combine_error(
1049               &error, grpc_wakeup_fd_consume_wakeup(&worker.wakeup_fd->fd));
1050         }
1051         for (i = 1; i < pfd_count; i++) {
1052           if (watchers[i].fd == nullptr) {
1053             fd_end_poll(&watchers[i], 0, 0);
1054           } else {
1055             if (GRPC_TRACE_FLAG_ENABLED(grpc_polling_trace)) {
1056               gpr_log(GPR_INFO, "%p got_event: %d r:%d w:%d [%d]", pollset,
1057                       pfds[i].fd, (pfds[i].revents & POLLIN_CHECK) != 0,
1058                       (pfds[i].revents & POLLOUT_CHECK) != 0, pfds[i].revents);
1059             }
1060             /* This is a mitigation to prevent poll() from spinning on a
1061              ** POLLHUP https://github.com/grpc/grpc/pull/13665
1062              */
1063             if (pfds[i].revents & POLLHUP) {
1064               gpr_atm_no_barrier_store(&watchers[i].fd->pollhup, 1);
1065             }
1066             fd_end_poll(&watchers[i], pfds[i].revents & POLLIN_CHECK,
1067                         pfds[i].revents & POLLOUT_CHECK);
1068           }
1069         }
1070       }
1071
1072       if (pfds != pollfd_space) {
1073         /* pfds and watchers are in the same memory block pointed to by pfds */
1074         gpr_free(pfds);
1075       }
1076
1077       locked = 0;
1078     } else {
1079       GPR_TIMER_MARK("pollset_work.kicked_without_pollers", 0);
1080       pollset->kicked_without_pollers = 0;
1081     }
1082   /* Finished execution - start cleaning up.
1083      Note that we may arrive here from outside the enclosing while() loop.
1084      In that case we won't loop though as we haven't added worker to the
1085      worker list, which means nobody could ask us to re-evaluate polling). */
1086   done:
1087     if (!locked) {
1088       queued_work |= grpc_core::ExecCtx::Get()->Flush();
1089       gpr_mu_lock(&pollset->mu);
1090       locked = 1;
1091     }
1092     /* If we're forced to re-evaluate polling (via pollset_kick with
1093        GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP) then we land here and force
1094        a loop */
1095     if (worker.reevaluate_polling_on_wakeup && error == GRPC_ERROR_NONE) {
1096       worker.reevaluate_polling_on_wakeup = 0;
1097       pollset->kicked_without_pollers = 0;
1098       if (queued_work || worker.kicked_specifically) {
1099         /* If there's queued work on the list, then set the deadline to be
1100            immediate so we get back out of the polling loop quickly */
1101         deadline = 0;
1102       }
1103       keep_polling = 1;
1104     }
1105   }
1106   g_current_thread_poller = nullptr;
1107   if (added_worker) {
1108     remove_worker(pollset, &worker);
1109     g_current_thread_worker = nullptr;
1110   }
1111   /* release wakeup fd to the local pool */
1112   worker.wakeup_fd->next = pollset->local_wakeup_cache;
1113   pollset->local_wakeup_cache = worker.wakeup_fd;
1114   /* check shutdown conditions */
1115   if (pollset->shutting_down) {
1116     if (pollset_has_workers(pollset)) {
1117       pollset_kick(pollset, nullptr);
1118     } else if (!pollset->called_shutdown && !pollset_has_observers(pollset)) {
1119       pollset->called_shutdown = 1;
1120       gpr_mu_unlock(&pollset->mu);
1121       finish_shutdown(pollset);
1122       grpc_core::ExecCtx::Get()->Flush();
1123       /* Continuing to access pollset here is safe -- it is the caller's
1124        * responsibility to not destroy when it has outstanding calls to
1125        * pollset_work.
1126        * TODO(dklempner): Can we refactor the shutdown logic to avoid this? */
1127       gpr_mu_lock(&pollset->mu);
1128     }
1129   }
1130   if (worker_hdl) *worker_hdl = nullptr;
1131   GRPC_LOG_IF_ERROR("pollset_work", GRPC_ERROR_REF(error));
1132   return error;
1133 }
1134
1135 static void pollset_shutdown(grpc_pollset* pollset, grpc_closure* closure) {
1136   GPR_ASSERT(!pollset->shutting_down);
1137   pollset->shutting_down = 1;
1138   pollset->shutdown_done = closure;
1139   pollset_kick(pollset, GRPC_POLLSET_KICK_BROADCAST);
1140   if (!pollset->called_shutdown && !pollset_has_observers(pollset)) {
1141     pollset->called_shutdown = 1;
1142     finish_shutdown(pollset);
1143   }
1144 }
1145
1146 static int poll_deadline_to_millis_timeout(grpc_millis deadline) {
1147   if (deadline == GRPC_MILLIS_INF_FUTURE) return -1;
1148   if (deadline == 0) return 0;
1149   grpc_millis n = deadline - grpc_core::ExecCtx::Get()->Now();
1150   if (n < 0) return 0;
1151   if (n > INT_MAX) return -1;
1152   return static_cast<int>(n);
1153 }
1154
1155 /*******************************************************************************
1156  * pollset_set_posix.c
1157  */
1158
1159 static grpc_pollset_set* pollset_set_create(void) {
1160   grpc_pollset_set* pollset_set =
1161       static_cast<grpc_pollset_set*>(gpr_zalloc(sizeof(*pollset_set)));
1162   gpr_mu_init(&pollset_set->mu);
1163   return pollset_set;
1164 }
1165
1166 static void pollset_set_destroy(grpc_pollset_set* pollset_set) {
1167   size_t i;
1168   gpr_mu_destroy(&pollset_set->mu);
1169   for (i = 0; i < pollset_set->fd_count; i++) {
1170     GRPC_FD_UNREF(pollset_set->fds[i], "pollset_set");
1171   }
1172   for (i = 0; i < pollset_set->pollset_count; i++) {
1173     grpc_pollset* pollset = pollset_set->pollsets[i];
1174     gpr_mu_lock(&pollset->mu);
1175     pollset->pollset_set_count--;
1176     /* check shutdown */
1177     if (pollset->shutting_down && !pollset->called_shutdown &&
1178         !pollset_has_observers(pollset)) {
1179       pollset->called_shutdown = 1;
1180       gpr_mu_unlock(&pollset->mu);
1181       finish_shutdown(pollset);
1182     } else {
1183       gpr_mu_unlock(&pollset->mu);
1184     }
1185   }
1186   gpr_free(pollset_set->pollsets);
1187   gpr_free(pollset_set->pollset_sets);
1188   gpr_free(pollset_set->fds);
1189   gpr_free(pollset_set);
1190 }
1191
1192 static void pollset_set_add_pollset(grpc_pollset_set* pollset_set,
1193                                     grpc_pollset* pollset) {
1194   size_t i, j;
1195   gpr_mu_lock(&pollset->mu);
1196   pollset->pollset_set_count++;
1197   gpr_mu_unlock(&pollset->mu);
1198   gpr_mu_lock(&pollset_set->mu);
1199   if (pollset_set->pollset_count == pollset_set->pollset_capacity) {
1200     pollset_set->pollset_capacity =
1201         GPR_MAX(8, 2 * pollset_set->pollset_capacity);
1202     pollset_set->pollsets = static_cast<grpc_pollset**>(gpr_realloc(
1203         pollset_set->pollsets,
1204         pollset_set->pollset_capacity * sizeof(*pollset_set->pollsets)));
1205   }
1206   pollset_set->pollsets[pollset_set->pollset_count++] = pollset;
1207   for (i = 0, j = 0; i < pollset_set->fd_count; i++) {
1208     if (fd_is_orphaned(pollset_set->fds[i])) {
1209       GRPC_FD_UNREF(pollset_set->fds[i], "pollset_set");
1210     } else {
1211       pollset_add_fd(pollset, pollset_set->fds[i]);
1212       pollset_set->fds[j++] = pollset_set->fds[i];
1213     }
1214   }
1215   pollset_set->fd_count = j;
1216   gpr_mu_unlock(&pollset_set->mu);
1217 }
1218
1219 static void pollset_set_del_pollset(grpc_pollset_set* pollset_set,
1220                                     grpc_pollset* pollset) {
1221   size_t i;
1222   gpr_mu_lock(&pollset_set->mu);
1223   for (i = 0; i < pollset_set->pollset_count; i++) {
1224     if (pollset_set->pollsets[i] == pollset) {
1225       pollset_set->pollset_count--;
1226       GPR_SWAP(grpc_pollset*, pollset_set->pollsets[i],
1227                pollset_set->pollsets[pollset_set->pollset_count]);
1228       break;
1229     }
1230   }
1231   gpr_mu_unlock(&pollset_set->mu);
1232   gpr_mu_lock(&pollset->mu);
1233   pollset->pollset_set_count--;
1234   /* check shutdown */
1235   if (pollset->shutting_down && !pollset->called_shutdown &&
1236       !pollset_has_observers(pollset)) {
1237     pollset->called_shutdown = 1;
1238     gpr_mu_unlock(&pollset->mu);
1239     finish_shutdown(pollset);
1240   } else {
1241     gpr_mu_unlock(&pollset->mu);
1242   }
1243 }
1244
1245 static void pollset_set_add_pollset_set(grpc_pollset_set* bag,
1246                                         grpc_pollset_set* item) {
1247   size_t i, j;
1248   gpr_mu_lock(&bag->mu);
1249   if (bag->pollset_set_count == bag->pollset_set_capacity) {
1250     bag->pollset_set_capacity = GPR_MAX(8, 2 * bag->pollset_set_capacity);
1251     bag->pollset_sets = static_cast<grpc_pollset_set**>(
1252         gpr_realloc(bag->pollset_sets,
1253                     bag->pollset_set_capacity * sizeof(*bag->pollset_sets)));
1254   }
1255   bag->pollset_sets[bag->pollset_set_count++] = item;
1256   for (i = 0, j = 0; i < bag->fd_count; i++) {
1257     if (fd_is_orphaned(bag->fds[i])) {
1258       GRPC_FD_UNREF(bag->fds[i], "pollset_set");
1259     } else {
1260       pollset_set_add_fd(item, bag->fds[i]);
1261       bag->fds[j++] = bag->fds[i];
1262     }
1263   }
1264   bag->fd_count = j;
1265   gpr_mu_unlock(&bag->mu);
1266 }
1267
1268 static void pollset_set_del_pollset_set(grpc_pollset_set* bag,
1269                                         grpc_pollset_set* item) {
1270   size_t i;
1271   gpr_mu_lock(&bag->mu);
1272   for (i = 0; i < bag->pollset_set_count; i++) {
1273     if (bag->pollset_sets[i] == item) {
1274       bag->pollset_set_count--;
1275       GPR_SWAP(grpc_pollset_set*, bag->pollset_sets[i],
1276                bag->pollset_sets[bag->pollset_set_count]);
1277       break;
1278     }
1279   }
1280   gpr_mu_unlock(&bag->mu);
1281 }
1282
1283 static void pollset_set_add_fd(grpc_pollset_set* pollset_set, grpc_fd* fd) {
1284   size_t i;
1285   gpr_mu_lock(&pollset_set->mu);
1286   if (pollset_set->fd_count == pollset_set->fd_capacity) {
1287     pollset_set->fd_capacity = GPR_MAX(8, 2 * pollset_set->fd_capacity);
1288     pollset_set->fds = static_cast<grpc_fd**>(
1289         gpr_realloc(pollset_set->fds,
1290                     pollset_set->fd_capacity * sizeof(*pollset_set->fds)));
1291   }
1292   GRPC_FD_REF(fd, "pollset_set");
1293   pollset_set->fds[pollset_set->fd_count++] = fd;
1294   for (i = 0; i < pollset_set->pollset_count; i++) {
1295     pollset_add_fd(pollset_set->pollsets[i], fd);
1296   }
1297   for (i = 0; i < pollset_set->pollset_set_count; i++) {
1298     pollset_set_add_fd(pollset_set->pollset_sets[i], fd);
1299   }
1300   gpr_mu_unlock(&pollset_set->mu);
1301 }
1302
1303 static void pollset_set_del_fd(grpc_pollset_set* pollset_set, grpc_fd* fd) {
1304   size_t i;
1305   gpr_mu_lock(&pollset_set->mu);
1306   for (i = 0; i < pollset_set->fd_count; i++) {
1307     if (pollset_set->fds[i] == fd) {
1308       pollset_set->fd_count--;
1309       GPR_SWAP(grpc_fd*, pollset_set->fds[i],
1310                pollset_set->fds[pollset_set->fd_count]);
1311       GRPC_FD_UNREF(fd, "pollset_set");
1312       break;
1313     }
1314   }
1315   for (i = 0; i < pollset_set->pollset_set_count; i++) {
1316     pollset_set_del_fd(pollset_set->pollset_sets[i], fd);
1317   }
1318   gpr_mu_unlock(&pollset_set->mu);
1319 }
1320
1321 /*******************************************************************************
1322  * event engine binding
1323  */
1324
1325 static bool is_any_background_poller_thread(void) { return false; }
1326
1327 static void shutdown_background_closure(void) {}
1328
1329 static bool add_closure_to_background_poller(grpc_closure* /*closure*/,
1330                                              grpc_error_handle /*error*/) {
1331   return false;
1332 }
1333
1334 static void shutdown_engine(void) {
1335   pollset_global_shutdown();
1336   if (track_fds_for_fork) {
1337     gpr_mu_destroy(&fork_fd_list_mu);
1338     grpc_core::Fork::SetResetChildPollingEngineFunc(nullptr);
1339   }
1340 }
1341
1342 static const grpc_event_engine_vtable vtable = {
1343     sizeof(grpc_pollset),
1344     false,
1345     false,
1346
1347     fd_create,
1348     fd_wrapped_fd,
1349     fd_orphan,
1350     fd_shutdown,
1351     fd_notify_on_read,
1352     fd_notify_on_write,
1353     fd_notify_on_error,
1354     fd_set_readable,
1355     fd_set_writable,
1356     fd_set_error,
1357     fd_is_shutdown,
1358
1359     pollset_init,
1360     pollset_shutdown,
1361     pollset_destroy,
1362     pollset_work,
1363     pollset_kick,
1364     pollset_add_fd,
1365
1366     pollset_set_create,
1367     pollset_set_destroy,
1368     pollset_set_add_pollset,
1369     pollset_set_del_pollset,
1370     pollset_set_add_pollset_set,
1371     pollset_set_del_pollset_set,
1372     pollset_set_add_fd,
1373     pollset_set_del_fd,
1374
1375     is_any_background_poller_thread,
1376     shutdown_background_closure,
1377     shutdown_engine,
1378     add_closure_to_background_poller,
1379 };
1380
1381 /* Called by the child process's post-fork handler to close open fds, including
1382  * worker wakeup fds. This allows gRPC to shutdown in the child process without
1383  * interfering with connections or RPCs ongoing in the parent. */
1384 static void reset_event_manager_on_fork() {
1385   gpr_mu_lock(&fork_fd_list_mu);
1386   while (fork_fd_list_head != nullptr) {
1387     if (fork_fd_list_head->fd != nullptr) {
1388       if (!fork_fd_list_head->fd->closed) {
1389         close(fork_fd_list_head->fd->fd);
1390       }
1391       fork_fd_list_head->fd->fd = -1;
1392     } else {
1393       close(fork_fd_list_head->cached_wakeup_fd->fd.read_fd);
1394       fork_fd_list_head->cached_wakeup_fd->fd.read_fd = -1;
1395       close(fork_fd_list_head->cached_wakeup_fd->fd.write_fd);
1396       fork_fd_list_head->cached_wakeup_fd->fd.write_fd = -1;
1397     }
1398     fork_fd_list_head = fork_fd_list_head->next;
1399   }
1400   gpr_mu_unlock(&fork_fd_list_mu);
1401 }
1402
1403 const grpc_event_engine_vtable* grpc_init_poll_posix(
1404     bool /*explicit_request*/) {
1405   if (!grpc_has_wakeup_fd()) {
1406     gpr_log(GPR_ERROR, "Skipping poll because of no wakeup fd.");
1407     return nullptr;
1408   }
1409   if (!GRPC_LOG_IF_ERROR("pollset_global_init", pollset_global_init())) {
1410     return nullptr;
1411   }
1412   if (grpc_core::Fork::Enabled()) {
1413     track_fds_for_fork = true;
1414     gpr_mu_init(&fork_fd_list_mu);
1415     grpc_core::Fork::SetResetChildPollingEngineFunc(
1416         reset_event_manager_on_fork);
1417   }
1418   return &vtable;
1419 }
1420
1421 #endif /* GRPC_POSIX_SOCKET_EV_POLL */