Imported Upstream version 1.22.0
[platform/upstream/grpc.git] / src / core / lib / surface / completion_queue.cc
1 /*
2  *
3  * Copyright 2015-2016 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18 #include <grpc/support/port_platform.h>
19
20 #include "src/core/lib/surface/completion_queue.h"
21
22 #include <inttypes.h>
23 #include <stdio.h>
24 #include <string.h>
25
26 #include <grpc/support/alloc.h>
27 #include <grpc/support/atm.h>
28 #include <grpc/support/log.h>
29 #include <grpc/support/string_util.h>
30 #include <grpc/support/time.h>
31
32 #include "src/core/lib/debug/stats.h"
33 #include "src/core/lib/gpr/spinlock.h"
34 #include "src/core/lib/gpr/string.h"
35 #include "src/core/lib/gpr/tls.h"
36 #include "src/core/lib/gprpp/atomic.h"
37 #include "src/core/lib/iomgr/executor.h"
38 #include "src/core/lib/iomgr/pollset.h"
39 #include "src/core/lib/iomgr/timer.h"
40 #include "src/core/lib/profiling/timers.h"
41 #include "src/core/lib/surface/api_trace.h"
42 #include "src/core/lib/surface/call.h"
43 #include "src/core/lib/surface/event_string.h"
44
45 grpc_core::TraceFlag grpc_trace_operation_failures(false, "op_failure");
46 grpc_core::DebugOnlyTraceFlag grpc_trace_pending_tags(false, "pending_tags");
47 grpc_core::DebugOnlyTraceFlag grpc_trace_cq_refcount(false, "cq_refcount");
48
49 namespace {
50
51 // Specifies a cq thread local cache.
52 // The first event that occurs on a thread
53 // with a cq cache will go into that cache, and
54 // will only be returned on the thread that initialized the cache.
55 // NOTE: Only one event will ever be cached.
56 GPR_TLS_DECL(g_cached_event);
57 GPR_TLS_DECL(g_cached_cq);
58
59 typedef struct {
60   grpc_pollset_worker** worker;
61   void* tag;
62 } plucker;
63
64 typedef struct {
65   bool can_get_pollset;
66   bool can_listen;
67   size_t (*size)(void);
68   void (*init)(grpc_pollset* pollset, gpr_mu** mu);
69   grpc_error* (*kick)(grpc_pollset* pollset,
70                       grpc_pollset_worker* specific_worker);
71   grpc_error* (*work)(grpc_pollset* pollset, grpc_pollset_worker** worker,
72                       grpc_millis deadline);
73   void (*shutdown)(grpc_pollset* pollset, grpc_closure* closure);
74   void (*destroy)(grpc_pollset* pollset);
75 } cq_poller_vtable;
76
77 typedef struct non_polling_worker {
78   gpr_cv cv;
79   bool kicked;
80   struct non_polling_worker* next;
81   struct non_polling_worker* prev;
82 } non_polling_worker;
83
84 typedef struct {
85   gpr_mu mu;
86   bool kicked_without_poller;
87   non_polling_worker* root;
88   grpc_closure* shutdown;
89 } non_polling_poller;
90
91 size_t non_polling_poller_size(void) { return sizeof(non_polling_poller); }
92
93 void non_polling_poller_init(grpc_pollset* pollset, gpr_mu** mu) {
94   non_polling_poller* npp = reinterpret_cast<non_polling_poller*>(pollset);
95   gpr_mu_init(&npp->mu);
96   *mu = &npp->mu;
97 }
98
99 void non_polling_poller_destroy(grpc_pollset* pollset) {
100   non_polling_poller* npp = reinterpret_cast<non_polling_poller*>(pollset);
101   gpr_mu_destroy(&npp->mu);
102 }
103
104 grpc_error* non_polling_poller_work(grpc_pollset* pollset,
105                                     grpc_pollset_worker** worker,
106                                     grpc_millis deadline) {
107   non_polling_poller* npp = reinterpret_cast<non_polling_poller*>(pollset);
108   if (npp->shutdown) return GRPC_ERROR_NONE;
109   if (npp->kicked_without_poller) {
110     npp->kicked_without_poller = false;
111     return GRPC_ERROR_NONE;
112   }
113   non_polling_worker w;
114   gpr_cv_init(&w.cv);
115   if (worker != nullptr) *worker = reinterpret_cast<grpc_pollset_worker*>(&w);
116   if (npp->root == nullptr) {
117     npp->root = w.next = w.prev = &w;
118   } else {
119     w.next = npp->root;
120     w.prev = w.next->prev;
121     w.next->prev = w.prev->next = &w;
122   }
123   w.kicked = false;
124   gpr_timespec deadline_ts =
125       grpc_millis_to_timespec(deadline, GPR_CLOCK_MONOTONIC);
126   while (!npp->shutdown && !w.kicked &&
127          !gpr_cv_wait(&w.cv, &npp->mu, deadline_ts))
128     ;
129   grpc_core::ExecCtx::Get()->InvalidateNow();
130   if (&w == npp->root) {
131     npp->root = w.next;
132     if (&w == npp->root) {
133       if (npp->shutdown) {
134         GRPC_CLOSURE_SCHED(npp->shutdown, GRPC_ERROR_NONE);
135       }
136       npp->root = nullptr;
137     }
138   }
139   w.next->prev = w.prev;
140   w.prev->next = w.next;
141   gpr_cv_destroy(&w.cv);
142   if (worker != nullptr) *worker = nullptr;
143   return GRPC_ERROR_NONE;
144 }
145
146 grpc_error* non_polling_poller_kick(grpc_pollset* pollset,
147                                     grpc_pollset_worker* specific_worker) {
148   non_polling_poller* p = reinterpret_cast<non_polling_poller*>(pollset);
149   if (specific_worker == nullptr)
150     specific_worker = reinterpret_cast<grpc_pollset_worker*>(p->root);
151   if (specific_worker != nullptr) {
152     non_polling_worker* w =
153         reinterpret_cast<non_polling_worker*>(specific_worker);
154     if (!w->kicked) {
155       w->kicked = true;
156       gpr_cv_signal(&w->cv);
157     }
158   } else {
159     p->kicked_without_poller = true;
160   }
161   return GRPC_ERROR_NONE;
162 }
163
164 void non_polling_poller_shutdown(grpc_pollset* pollset, grpc_closure* closure) {
165   non_polling_poller* p = reinterpret_cast<non_polling_poller*>(pollset);
166   GPR_ASSERT(closure != nullptr);
167   p->shutdown = closure;
168   if (p->root == nullptr) {
169     GRPC_CLOSURE_SCHED(closure, GRPC_ERROR_NONE);
170   } else {
171     non_polling_worker* w = p->root;
172     do {
173       gpr_cv_signal(&w->cv);
174       w = w->next;
175     } while (w != p->root);
176   }
177 }
178
179 const cq_poller_vtable g_poller_vtable_by_poller_type[] = {
180     /* GRPC_CQ_DEFAULT_POLLING */
181     {true, true, grpc_pollset_size, grpc_pollset_init, grpc_pollset_kick,
182      grpc_pollset_work, grpc_pollset_shutdown, grpc_pollset_destroy},
183     /* GRPC_CQ_NON_LISTENING */
184     {true, false, grpc_pollset_size, grpc_pollset_init, grpc_pollset_kick,
185      grpc_pollset_work, grpc_pollset_shutdown, grpc_pollset_destroy},
186     /* GRPC_CQ_NON_POLLING */
187     {false, false, non_polling_poller_size, non_polling_poller_init,
188      non_polling_poller_kick, non_polling_poller_work,
189      non_polling_poller_shutdown, non_polling_poller_destroy},
190 };
191
192 }  // namespace
193
194 struct cq_vtable {
195   grpc_cq_completion_type cq_completion_type;
196   size_t data_size;
197   void (*init)(void* data,
198                grpc_experimental_completion_queue_functor* shutdown_callback);
199   void (*shutdown)(grpc_completion_queue* cq);
200   void (*destroy)(void* data);
201   bool (*begin_op)(grpc_completion_queue* cq, void* tag);
202   void (*end_op)(grpc_completion_queue* cq, void* tag, grpc_error* error,
203                  void (*done)(void* done_arg, grpc_cq_completion* storage),
204                  void* done_arg, grpc_cq_completion* storage, bool internal);
205   grpc_event (*next)(grpc_completion_queue* cq, gpr_timespec deadline,
206                      void* reserved);
207   grpc_event (*pluck)(grpc_completion_queue* cq, void* tag,
208                       gpr_timespec deadline, void* reserved);
209 };
210
211 namespace {
212
213 /* Queue that holds the cq_completion_events. Internally uses gpr_mpscq queue
214  * (a lockfree multiproducer single consumer queue). It uses a queue_lock
215  * to support multiple consumers.
216  * Only used in completion queues whose completion_type is GRPC_CQ_NEXT */
217 class CqEventQueue {
218  public:
219   CqEventQueue() { gpr_mpscq_init(&queue_); }
220   ~CqEventQueue() { gpr_mpscq_destroy(&queue_); }
221
222   /* Note: The counter is not incremented/decremented atomically with push/pop.
223    * The count is only eventually consistent */
224   intptr_t num_items() const {
225     return num_queue_items_.Load(grpc_core::MemoryOrder::RELAXED);
226   }
227
228   bool Push(grpc_cq_completion* c);
229   grpc_cq_completion* Pop();
230
231  private:
232   /* Spinlock to serialize consumers i.e pop() operations */
233   gpr_spinlock queue_lock_ = GPR_SPINLOCK_INITIALIZER;
234
235   gpr_mpscq queue_;
236
237   /* A lazy counter of number of items in the queue. This is NOT atomically
238      incremented/decremented along with push/pop operations and hence is only
239      eventually consistent */
240   grpc_core::Atomic<intptr_t> num_queue_items_{0};
241 };
242
243 struct cq_next_data {
244   ~cq_next_data() { GPR_ASSERT(queue.num_items() == 0); }
245
246   /** Completed events for completion-queues of type GRPC_CQ_NEXT */
247   CqEventQueue queue;
248
249   /** Counter of how many things have ever been queued on this completion queue
250       useful for avoiding locks to check the queue */
251   grpc_core::Atomic<intptr_t> things_queued_ever{0};
252
253   /** Number of outstanding events (+1 if not shut down)
254       Initial count is dropped by grpc_completion_queue_shutdown */
255   grpc_core::Atomic<intptr_t> pending_events{1};
256
257   /** 0 initially. 1 once we initiated shutdown */
258   bool shutdown_called = false;
259 };
260
261 struct cq_pluck_data {
262   cq_pluck_data() {
263     completed_tail = &completed_head;
264     completed_head.next = reinterpret_cast<uintptr_t>(completed_tail);
265   }
266
267   ~cq_pluck_data() {
268     GPR_ASSERT(completed_head.next ==
269                reinterpret_cast<uintptr_t>(&completed_head));
270   }
271
272   /** Completed events for completion-queues of type GRPC_CQ_PLUCK */
273   grpc_cq_completion completed_head;
274   grpc_cq_completion* completed_tail;
275
276   /** Number of pending events (+1 if we're not shutdown).
277       Initial count is dropped by grpc_completion_queue_shutdown. */
278   grpc_core::Atomic<intptr_t> pending_events{1};
279
280   /** Counter of how many things have ever been queued on this completion queue
281       useful for avoiding locks to check the queue */
282   grpc_core::Atomic<intptr_t> things_queued_ever{0};
283
284   /** 0 initially. 1 once we completed shutting */
285   /* TODO: (sreek) This is not needed since (shutdown == 1) if and only if
286    * (pending_events == 0). So consider removing this in future and use
287    * pending_events */
288   grpc_core::Atomic<bool> shutdown{false};
289
290   /** 0 initially. 1 once we initiated shutdown */
291   bool shutdown_called = false;
292
293   int num_pluckers = 0;
294   plucker pluckers[GRPC_MAX_COMPLETION_QUEUE_PLUCKERS];
295 };
296
297 struct cq_callback_data {
298   cq_callback_data(
299       grpc_experimental_completion_queue_functor* shutdown_callback)
300       : shutdown_callback(shutdown_callback) {}
301   /** No actual completed events queue, unlike other types */
302
303   /** Number of pending events (+1 if we're not shutdown).
304       Initial count is dropped by grpc_completion_queue_shutdown. */
305   grpc_core::Atomic<intptr_t> pending_events{1};
306
307   /** Counter of how many things have ever been queued on this completion queue
308       useful for avoiding locks to check the queue */
309   grpc_core::Atomic<intptr_t> things_queued_ever{0};
310
311   /** 0 initially. 1 once we initiated shutdown */
312   bool shutdown_called = false;
313
314   /** A callback that gets invoked when the CQ completes shutdown */
315   grpc_experimental_completion_queue_functor* shutdown_callback;
316 };
317
318 }  // namespace
319
320 /* Completion queue structure */
321 struct grpc_completion_queue {
322   /** Once owning_refs drops to zero, we will destroy the cq */
323   gpr_refcount owning_refs;
324
325   gpr_mu* mu;
326
327   const cq_vtable* vtable;
328   const cq_poller_vtable* poller_vtable;
329
330 #ifndef NDEBUG
331   void** outstanding_tags;
332   size_t outstanding_tag_count;
333   size_t outstanding_tag_capacity;
334 #endif
335
336   grpc_closure pollset_shutdown_done;
337   int num_polls;
338 };
339
340 /* Forward declarations */
341 static void cq_finish_shutdown_next(grpc_completion_queue* cq);
342 static void cq_finish_shutdown_pluck(grpc_completion_queue* cq);
343 static void cq_finish_shutdown_callback(grpc_completion_queue* cq);
344 static void cq_shutdown_next(grpc_completion_queue* cq);
345 static void cq_shutdown_pluck(grpc_completion_queue* cq);
346 static void cq_shutdown_callback(grpc_completion_queue* cq);
347
348 static bool cq_begin_op_for_next(grpc_completion_queue* cq, void* tag);
349 static bool cq_begin_op_for_pluck(grpc_completion_queue* cq, void* tag);
350 static bool cq_begin_op_for_callback(grpc_completion_queue* cq, void* tag);
351
352 // A cq_end_op function is called when an operation on a given CQ with
353 // a given tag has completed. The storage argument is a reference to the
354 // space reserved for this completion as it is placed into the corresponding
355 // queue. The done argument is a callback that will be invoked when it is
356 // safe to free up that storage. The storage MUST NOT be freed until the
357 // done callback is invoked.
358 static void cq_end_op_for_next(
359     grpc_completion_queue* cq, void* tag, grpc_error* error,
360     void (*done)(void* done_arg, grpc_cq_completion* storage), void* done_arg,
361     grpc_cq_completion* storage, bool internal);
362
363 static void cq_end_op_for_pluck(
364     grpc_completion_queue* cq, void* tag, grpc_error* error,
365     void (*done)(void* done_arg, grpc_cq_completion* storage), void* done_arg,
366     grpc_cq_completion* storage, bool internal);
367
368 static void cq_end_op_for_callback(
369     grpc_completion_queue* cq, void* tag, grpc_error* error,
370     void (*done)(void* done_arg, grpc_cq_completion* storage), void* done_arg,
371     grpc_cq_completion* storage, bool internal);
372
373 static grpc_event cq_next(grpc_completion_queue* cq, gpr_timespec deadline,
374                           void* reserved);
375
376 static grpc_event cq_pluck(grpc_completion_queue* cq, void* tag,
377                            gpr_timespec deadline, void* reserved);
378
379 // Note that cq_init_next and cq_init_pluck do not use the shutdown_callback
380 static void cq_init_next(
381     void* data, grpc_experimental_completion_queue_functor* shutdown_callback);
382 static void cq_init_pluck(
383     void* data, grpc_experimental_completion_queue_functor* shutdown_callback);
384 static void cq_init_callback(
385     void* data, grpc_experimental_completion_queue_functor* shutdown_callback);
386 static void cq_destroy_next(void* data);
387 static void cq_destroy_pluck(void* data);
388 static void cq_destroy_callback(void* data);
389
390 /* Completion queue vtables based on the completion-type */
391 static const cq_vtable g_cq_vtable[] = {
392     /* GRPC_CQ_NEXT */
393     {GRPC_CQ_NEXT, sizeof(cq_next_data), cq_init_next, cq_shutdown_next,
394      cq_destroy_next, cq_begin_op_for_next, cq_end_op_for_next, cq_next,
395      nullptr},
396     /* GRPC_CQ_PLUCK */
397     {GRPC_CQ_PLUCK, sizeof(cq_pluck_data), cq_init_pluck, cq_shutdown_pluck,
398      cq_destroy_pluck, cq_begin_op_for_pluck, cq_end_op_for_pluck, nullptr,
399      cq_pluck},
400     /* GRPC_CQ_CALLBACK */
401     {GRPC_CQ_CALLBACK, sizeof(cq_callback_data), cq_init_callback,
402      cq_shutdown_callback, cq_destroy_callback, cq_begin_op_for_callback,
403      cq_end_op_for_callback, nullptr, nullptr},
404 };
405
406 #define DATA_FROM_CQ(cq) ((void*)(cq + 1))
407 #define POLLSET_FROM_CQ(cq) \
408   ((grpc_pollset*)(cq->vtable->data_size + (char*)DATA_FROM_CQ(cq)))
409
410 grpc_core::TraceFlag grpc_cq_pluck_trace(false, "queue_pluck");
411
412 #define GRPC_SURFACE_TRACE_RETURNED_EVENT(cq, event)      \
413   do {                                                    \
414     if (GRPC_TRACE_FLAG_ENABLED(grpc_api_trace) &&        \
415         (GRPC_TRACE_FLAG_ENABLED(grpc_cq_pluck_trace) ||  \
416          (event)->type != GRPC_QUEUE_TIMEOUT)) {          \
417       char* _ev = grpc_event_string(event);               \
418       gpr_log(GPR_INFO, "RETURN_EVENT[%p]: %s", cq, _ev); \
419       gpr_free(_ev);                                      \
420     }                                                     \
421   } while (0)
422
423 static void on_pollset_shutdown_done(void* cq, grpc_error* error);
424
425 void grpc_cq_global_init() {
426   gpr_tls_init(&g_cached_event);
427   gpr_tls_init(&g_cached_cq);
428 }
429
430 void grpc_completion_queue_thread_local_cache_init(grpc_completion_queue* cq) {
431   if ((grpc_completion_queue*)gpr_tls_get(&g_cached_cq) == nullptr) {
432     gpr_tls_set(&g_cached_event, (intptr_t)0);
433     gpr_tls_set(&g_cached_cq, (intptr_t)cq);
434   }
435 }
436
437 int grpc_completion_queue_thread_local_cache_flush(grpc_completion_queue* cq,
438                                                    void** tag, int* ok) {
439   grpc_cq_completion* storage =
440       (grpc_cq_completion*)gpr_tls_get(&g_cached_event);
441   int ret = 0;
442   if (storage != nullptr &&
443       (grpc_completion_queue*)gpr_tls_get(&g_cached_cq) == cq) {
444     *tag = storage->tag;
445     grpc_core::ExecCtx exec_ctx;
446     *ok = (storage->next & static_cast<uintptr_t>(1)) == 1;
447     storage->done(storage->done_arg, storage);
448     ret = 1;
449     cq_next_data* cqd = static_cast<cq_next_data*> DATA_FROM_CQ(cq);
450     if (cqd->pending_events.FetchSub(1, grpc_core::MemoryOrder::ACQ_REL) == 1) {
451       GRPC_CQ_INTERNAL_REF(cq, "shutting_down");
452       gpr_mu_lock(cq->mu);
453       cq_finish_shutdown_next(cq);
454       gpr_mu_unlock(cq->mu);
455       GRPC_CQ_INTERNAL_UNREF(cq, "shutting_down");
456     }
457   }
458   gpr_tls_set(&g_cached_event, (intptr_t)0);
459   gpr_tls_set(&g_cached_cq, (intptr_t)0);
460
461   return ret;
462 }
463
464 bool CqEventQueue::Push(grpc_cq_completion* c) {
465   gpr_mpscq_push(&queue_, reinterpret_cast<gpr_mpscq_node*>(c));
466   return num_queue_items_.FetchAdd(1, grpc_core::MemoryOrder::RELAXED) == 0;
467 }
468
469 grpc_cq_completion* CqEventQueue::Pop() {
470   grpc_cq_completion* c = nullptr;
471
472   if (gpr_spinlock_trylock(&queue_lock_)) {
473     GRPC_STATS_INC_CQ_EV_QUEUE_TRYLOCK_SUCCESSES();
474
475     bool is_empty = false;
476     c = reinterpret_cast<grpc_cq_completion*>(
477         gpr_mpscq_pop_and_check_end(&queue_, &is_empty));
478     gpr_spinlock_unlock(&queue_lock_);
479
480     if (c == nullptr && !is_empty) {
481       GRPC_STATS_INC_CQ_EV_QUEUE_TRANSIENT_POP_FAILURES();
482     }
483   } else {
484     GRPC_STATS_INC_CQ_EV_QUEUE_TRYLOCK_FAILURES();
485   }
486
487   if (c) {
488     num_queue_items_.FetchSub(1, grpc_core::MemoryOrder::RELAXED);
489   }
490
491   return c;
492 }
493
494 grpc_completion_queue* grpc_completion_queue_create_internal(
495     grpc_cq_completion_type completion_type, grpc_cq_polling_type polling_type,
496     grpc_experimental_completion_queue_functor* shutdown_callback) {
497   GPR_TIMER_SCOPE("grpc_completion_queue_create_internal", 0);
498
499   grpc_completion_queue* cq;
500
501   GRPC_API_TRACE(
502       "grpc_completion_queue_create_internal(completion_type=%d, "
503       "polling_type=%d)",
504       2, (completion_type, polling_type));
505
506   const cq_vtable* vtable = &g_cq_vtable[completion_type];
507   const cq_poller_vtable* poller_vtable =
508       &g_poller_vtable_by_poller_type[polling_type];
509
510   grpc_core::ExecCtx exec_ctx;
511   GRPC_STATS_INC_CQS_CREATED();
512
513   cq = static_cast<grpc_completion_queue*>(
514       gpr_zalloc(sizeof(grpc_completion_queue) + vtable->data_size +
515                  poller_vtable->size()));
516
517   cq->vtable = vtable;
518   cq->poller_vtable = poller_vtable;
519
520   /* One for destroy(), one for pollset_shutdown */
521   gpr_ref_init(&cq->owning_refs, 2);
522
523   poller_vtable->init(POLLSET_FROM_CQ(cq), &cq->mu);
524   vtable->init(DATA_FROM_CQ(cq), shutdown_callback);
525
526   GRPC_CLOSURE_INIT(&cq->pollset_shutdown_done, on_pollset_shutdown_done, cq,
527                     grpc_schedule_on_exec_ctx);
528   return cq;
529 }
530
531 static void cq_init_next(
532     void* data, grpc_experimental_completion_queue_functor* shutdown_callback) {
533   new (data) cq_next_data();
534 }
535
536 static void cq_destroy_next(void* data) {
537   cq_next_data* cqd = static_cast<cq_next_data*>(data);
538   cqd->~cq_next_data();
539 }
540
541 static void cq_init_pluck(
542     void* data, grpc_experimental_completion_queue_functor* shutdown_callback) {
543   new (data) cq_pluck_data();
544 }
545
546 static void cq_destroy_pluck(void* data) {
547   cq_pluck_data* cqd = static_cast<cq_pluck_data*>(data);
548   cqd->~cq_pluck_data();
549 }
550
551 static void cq_init_callback(
552     void* data, grpc_experimental_completion_queue_functor* shutdown_callback) {
553   new (data) cq_callback_data(shutdown_callback);
554 }
555
556 static void cq_destroy_callback(void* data) {
557   cq_callback_data* cqd = static_cast<cq_callback_data*>(data);
558   cqd->~cq_callback_data();
559 }
560
561 grpc_cq_completion_type grpc_get_cq_completion_type(grpc_completion_queue* cq) {
562   return cq->vtable->cq_completion_type;
563 }
564
565 int grpc_get_cq_poll_num(grpc_completion_queue* cq) {
566   int cur_num_polls;
567   gpr_mu_lock(cq->mu);
568   cur_num_polls = cq->num_polls;
569   gpr_mu_unlock(cq->mu);
570   return cur_num_polls;
571 }
572
573 #ifndef NDEBUG
574 void grpc_cq_internal_ref(grpc_completion_queue* cq, const char* reason,
575                           const char* file, int line) {
576   if (GRPC_TRACE_FLAG_ENABLED(grpc_trace_cq_refcount)) {
577     gpr_atm val = gpr_atm_no_barrier_load(&cq->owning_refs.count);
578     gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG,
579             "CQ:%p   ref %" PRIdPTR " -> %" PRIdPTR " %s", cq, val, val + 1,
580             reason);
581   }
582 #else
583 void grpc_cq_internal_ref(grpc_completion_queue* cq) {
584 #endif
585   gpr_ref(&cq->owning_refs);
586 }
587
588 static void on_pollset_shutdown_done(void* arg, grpc_error* error) {
589   grpc_completion_queue* cq = static_cast<grpc_completion_queue*>(arg);
590   GRPC_CQ_INTERNAL_UNREF(cq, "pollset_destroy");
591 }
592
593 #ifndef NDEBUG
594 void grpc_cq_internal_unref(grpc_completion_queue* cq, const char* reason,
595                             const char* file, int line) {
596   if (GRPC_TRACE_FLAG_ENABLED(grpc_trace_cq_refcount)) {
597     gpr_atm val = gpr_atm_no_barrier_load(&cq->owning_refs.count);
598     gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG,
599             "CQ:%p unref %" PRIdPTR " -> %" PRIdPTR " %s", cq, val, val - 1,
600             reason);
601   }
602 #else
603 void grpc_cq_internal_unref(grpc_completion_queue* cq) {
604 #endif
605   if (gpr_unref(&cq->owning_refs)) {
606     cq->vtable->destroy(DATA_FROM_CQ(cq));
607     cq->poller_vtable->destroy(POLLSET_FROM_CQ(cq));
608 #ifndef NDEBUG
609     gpr_free(cq->outstanding_tags);
610 #endif
611     gpr_free(cq);
612   }
613 }
614
615 #ifndef NDEBUG
616 static void cq_check_tag(grpc_completion_queue* cq, void* tag, bool lock_cq) {
617   int found = 0;
618   if (lock_cq) {
619     gpr_mu_lock(cq->mu);
620   }
621
622   for (int i = 0; i < static_cast<int>(cq->outstanding_tag_count); i++) {
623     if (cq->outstanding_tags[i] == tag) {
624       cq->outstanding_tag_count--;
625       GPR_SWAP(void*, cq->outstanding_tags[i],
626                cq->outstanding_tags[cq->outstanding_tag_count]);
627       found = 1;
628       break;
629     }
630   }
631
632   if (lock_cq) {
633     gpr_mu_unlock(cq->mu);
634   }
635
636   GPR_ASSERT(found);
637 }
638 #else
639 static void cq_check_tag(grpc_completion_queue* cq, void* tag, bool lock_cq) {}
640 #endif
641
642 static bool cq_begin_op_for_next(grpc_completion_queue* cq, void* tag) {
643   cq_next_data* cqd = static_cast<cq_next_data*> DATA_FROM_CQ(cq);
644   return cqd->pending_events.IncrementIfNonzero();
645 }
646
647 static bool cq_begin_op_for_pluck(grpc_completion_queue* cq, void* tag) {
648   cq_pluck_data* cqd = static_cast<cq_pluck_data*> DATA_FROM_CQ(cq);
649   return cqd->pending_events.IncrementIfNonzero();
650 }
651
652 static bool cq_begin_op_for_callback(grpc_completion_queue* cq, void* tag) {
653   cq_callback_data* cqd = static_cast<cq_callback_data*> DATA_FROM_CQ(cq);
654   return cqd->pending_events.IncrementIfNonzero();
655 }
656
657 bool grpc_cq_begin_op(grpc_completion_queue* cq, void* tag) {
658 #ifndef NDEBUG
659   gpr_mu_lock(cq->mu);
660   if (cq->outstanding_tag_count == cq->outstanding_tag_capacity) {
661     cq->outstanding_tag_capacity = GPR_MAX(4, 2 * cq->outstanding_tag_capacity);
662     cq->outstanding_tags = static_cast<void**>(gpr_realloc(
663         cq->outstanding_tags,
664         sizeof(*cq->outstanding_tags) * cq->outstanding_tag_capacity));
665   }
666   cq->outstanding_tags[cq->outstanding_tag_count++] = tag;
667   gpr_mu_unlock(cq->mu);
668 #endif
669   return cq->vtable->begin_op(cq, tag);
670 }
671
672 /* Queue a GRPC_OP_COMPLETED operation to a completion queue (with a
673  * completion
674  * type of GRPC_CQ_NEXT) */
675 static void cq_end_op_for_next(
676     grpc_completion_queue* cq, void* tag, grpc_error* error,
677     void (*done)(void* done_arg, grpc_cq_completion* storage), void* done_arg,
678     grpc_cq_completion* storage, bool internal) {
679   GPR_TIMER_SCOPE("cq_end_op_for_next", 0);
680
681   if (GRPC_TRACE_FLAG_ENABLED(grpc_api_trace) ||
682       (GRPC_TRACE_FLAG_ENABLED(grpc_trace_operation_failures) &&
683        error != GRPC_ERROR_NONE)) {
684     const char* errmsg = grpc_error_string(error);
685     GRPC_API_TRACE(
686         "cq_end_op_for_next(cq=%p, tag=%p, error=%s, "
687         "done=%p, done_arg=%p, storage=%p)",
688         6, (cq, tag, errmsg, done, done_arg, storage));
689     if (GRPC_TRACE_FLAG_ENABLED(grpc_trace_operation_failures) &&
690         error != GRPC_ERROR_NONE) {
691       gpr_log(GPR_ERROR, "Operation failed: tag=%p, error=%s", tag, errmsg);
692     }
693   }
694   cq_next_data* cqd = static_cast<cq_next_data*> DATA_FROM_CQ(cq);
695   int is_success = (error == GRPC_ERROR_NONE);
696
697   storage->tag = tag;
698   storage->done = done;
699   storage->done_arg = done_arg;
700   storage->next = static_cast<uintptr_t>(is_success);
701
702   cq_check_tag(cq, tag, true); /* Used in debug builds only */
703
704   if ((grpc_completion_queue*)gpr_tls_get(&g_cached_cq) == cq &&
705       (grpc_cq_completion*)gpr_tls_get(&g_cached_event) == nullptr) {
706     gpr_tls_set(&g_cached_event, (intptr_t)storage);
707   } else {
708     /* Add the completion to the queue */
709     bool is_first = cqd->queue.Push(storage);
710     cqd->things_queued_ever.FetchAdd(1, grpc_core::MemoryOrder::RELAXED);
711     /* Since we do not hold the cq lock here, it is important to do an 'acquire'
712        load here (instead of a 'no_barrier' load) to match with the release
713        store
714        (done via pending_events.FetchSub(1, ACQ_REL)) in cq_shutdown_next
715        */
716     if (cqd->pending_events.Load(grpc_core::MemoryOrder::ACQUIRE) != 1) {
717       /* Only kick if this is the first item queued */
718       if (is_first) {
719         gpr_mu_lock(cq->mu);
720         grpc_error* kick_error =
721             cq->poller_vtable->kick(POLLSET_FROM_CQ(cq), nullptr);
722         gpr_mu_unlock(cq->mu);
723
724         if (kick_error != GRPC_ERROR_NONE) {
725           const char* msg = grpc_error_string(kick_error);
726           gpr_log(GPR_ERROR, "Kick failed: %s", msg);
727           GRPC_ERROR_UNREF(kick_error);
728         }
729       }
730       if (cqd->pending_events.FetchSub(1, grpc_core::MemoryOrder::ACQ_REL) ==
731           1) {
732         GRPC_CQ_INTERNAL_REF(cq, "shutting_down");
733         gpr_mu_lock(cq->mu);
734         cq_finish_shutdown_next(cq);
735         gpr_mu_unlock(cq->mu);
736         GRPC_CQ_INTERNAL_UNREF(cq, "shutting_down");
737       }
738     } else {
739       GRPC_CQ_INTERNAL_REF(cq, "shutting_down");
740       cqd->pending_events.Store(0, grpc_core::MemoryOrder::RELEASE);
741       gpr_mu_lock(cq->mu);
742       cq_finish_shutdown_next(cq);
743       gpr_mu_unlock(cq->mu);
744       GRPC_CQ_INTERNAL_UNREF(cq, "shutting_down");
745     }
746   }
747
748   GRPC_ERROR_UNREF(error);
749 }
750
751 /* Queue a GRPC_OP_COMPLETED operation to a completion queue (with a
752  * completion
753  * type of GRPC_CQ_PLUCK) */
754 static void cq_end_op_for_pluck(
755     grpc_completion_queue* cq, void* tag, grpc_error* error,
756     void (*done)(void* done_arg, grpc_cq_completion* storage), void* done_arg,
757     grpc_cq_completion* storage, bool internal) {
758   GPR_TIMER_SCOPE("cq_end_op_for_pluck", 0);
759
760   cq_pluck_data* cqd = static_cast<cq_pluck_data*> DATA_FROM_CQ(cq);
761   int is_success = (error == GRPC_ERROR_NONE);
762
763   if (GRPC_TRACE_FLAG_ENABLED(grpc_api_trace) ||
764       (GRPC_TRACE_FLAG_ENABLED(grpc_trace_operation_failures) &&
765        error != GRPC_ERROR_NONE)) {
766     const char* errmsg = grpc_error_string(error);
767     GRPC_API_TRACE(
768         "cq_end_op_for_pluck(cq=%p, tag=%p, error=%s, "
769         "done=%p, done_arg=%p, storage=%p)",
770         6, (cq, tag, errmsg, done, done_arg, storage));
771     if (GRPC_TRACE_FLAG_ENABLED(grpc_trace_operation_failures) &&
772         error != GRPC_ERROR_NONE) {
773       gpr_log(GPR_ERROR, "Operation failed: tag=%p, error=%s", tag, errmsg);
774     }
775   }
776
777   storage->tag = tag;
778   storage->done = done;
779   storage->done_arg = done_arg;
780   storage->next =
781       ((uintptr_t)&cqd->completed_head) | (static_cast<uintptr_t>(is_success));
782
783   gpr_mu_lock(cq->mu);
784   cq_check_tag(cq, tag, false); /* Used in debug builds only */
785
786   /* Add to the list of completions */
787   cqd->things_queued_ever.FetchAdd(1, grpc_core::MemoryOrder::RELAXED);
788   cqd->completed_tail->next =
789       ((uintptr_t)storage) | (1u & cqd->completed_tail->next);
790   cqd->completed_tail = storage;
791
792   if (cqd->pending_events.FetchSub(1, grpc_core::MemoryOrder::ACQ_REL) == 1) {
793     cq_finish_shutdown_pluck(cq);
794     gpr_mu_unlock(cq->mu);
795   } else {
796     grpc_pollset_worker* pluck_worker = nullptr;
797     for (int i = 0; i < cqd->num_pluckers; i++) {
798       if (cqd->pluckers[i].tag == tag) {
799         pluck_worker = *cqd->pluckers[i].worker;
800         break;
801       }
802     }
803
804     grpc_error* kick_error =
805         cq->poller_vtable->kick(POLLSET_FROM_CQ(cq), pluck_worker);
806
807     gpr_mu_unlock(cq->mu);
808
809     if (kick_error != GRPC_ERROR_NONE) {
810       const char* msg = grpc_error_string(kick_error);
811       gpr_log(GPR_ERROR, "Kick failed: %s", msg);
812
813       GRPC_ERROR_UNREF(kick_error);
814     }
815   }
816
817   GRPC_ERROR_UNREF(error);
818 }
819
820 static void functor_callback(void* arg, grpc_error* error) {
821   auto* functor = static_cast<grpc_experimental_completion_queue_functor*>(arg);
822   functor->functor_run(functor, error == GRPC_ERROR_NONE);
823 }
824
825 /* Complete an event on a completion queue of type GRPC_CQ_CALLBACK */
826 static void cq_end_op_for_callback(
827     grpc_completion_queue* cq, void* tag, grpc_error* error,
828     void (*done)(void* done_arg, grpc_cq_completion* storage), void* done_arg,
829     grpc_cq_completion* storage, bool internal) {
830   GPR_TIMER_SCOPE("cq_end_op_for_callback", 0);
831
832   cq_callback_data* cqd = static_cast<cq_callback_data*> DATA_FROM_CQ(cq);
833
834   if (GRPC_TRACE_FLAG_ENABLED(grpc_api_trace) ||
835       (GRPC_TRACE_FLAG_ENABLED(grpc_trace_operation_failures) &&
836        error != GRPC_ERROR_NONE)) {
837     const char* errmsg = grpc_error_string(error);
838     GRPC_API_TRACE(
839         "cq_end_op_for_callback(cq=%p, tag=%p, error=%s, "
840         "done=%p, done_arg=%p, storage=%p)",
841         6, (cq, tag, errmsg, done, done_arg, storage));
842     if (GRPC_TRACE_FLAG_ENABLED(grpc_trace_operation_failures) &&
843         error != GRPC_ERROR_NONE) {
844       gpr_log(GPR_ERROR, "Operation failed: tag=%p, error=%s", tag, errmsg);
845     }
846   }
847
848   // The callback-based CQ isn't really a queue at all and thus has no need
849   // for reserved storage. Invoke the done callback right away to release it.
850   done(done_arg, storage);
851
852   cq_check_tag(cq, tag, true); /* Used in debug builds only */
853
854   cqd->things_queued_ever.FetchAdd(1, grpc_core::MemoryOrder::RELAXED);
855   if (cqd->pending_events.FetchSub(1, grpc_core::MemoryOrder::ACQ_REL) == 1) {
856     cq_finish_shutdown_callback(cq);
857   }
858
859   auto* functor = static_cast<grpc_experimental_completion_queue_functor*>(tag);
860   if (internal) {
861     grpc_core::ApplicationCallbackExecCtx::Enqueue(functor,
862                                                    (error == GRPC_ERROR_NONE));
863     GRPC_ERROR_UNREF(error);
864   } else {
865     GRPC_CLOSURE_SCHED(
866         GRPC_CLOSURE_CREATE(
867             functor_callback, functor,
868             grpc_core::Executor::Scheduler(grpc_core::ExecutorJobType::SHORT)),
869         error);
870   }
871 }
872
873 void grpc_cq_end_op(grpc_completion_queue* cq, void* tag, grpc_error* error,
874                     void (*done)(void* done_arg, grpc_cq_completion* storage),
875                     void* done_arg, grpc_cq_completion* storage,
876                     bool internal) {
877   cq->vtable->end_op(cq, tag, error, done, done_arg, storage, internal);
878 }
879
880 typedef struct {
881   gpr_atm last_seen_things_queued_ever;
882   grpc_completion_queue* cq;
883   grpc_millis deadline;
884   grpc_cq_completion* stolen_completion;
885   void* tag; /* for pluck */
886   bool first_loop;
887 } cq_is_finished_arg;
888
889 class ExecCtxNext : public grpc_core::ExecCtx {
890  public:
891   ExecCtxNext(void* arg) : ExecCtx(0), check_ready_to_finish_arg_(arg) {}
892
893   bool CheckReadyToFinish() override {
894     cq_is_finished_arg* a =
895         static_cast<cq_is_finished_arg*>(check_ready_to_finish_arg_);
896     grpc_completion_queue* cq = a->cq;
897     cq_next_data* cqd = static_cast<cq_next_data*> DATA_FROM_CQ(cq);
898     GPR_ASSERT(a->stolen_completion == nullptr);
899
900     intptr_t current_last_seen_things_queued_ever =
901         cqd->things_queued_ever.Load(grpc_core::MemoryOrder::RELAXED);
902
903     if (current_last_seen_things_queued_ever !=
904         a->last_seen_things_queued_ever) {
905       a->last_seen_things_queued_ever =
906           cqd->things_queued_ever.Load(grpc_core::MemoryOrder::RELAXED);
907
908       /* Pop a cq_completion from the queue. Returns NULL if the queue is empty
909        * might return NULL in some cases even if the queue is not empty; but
910        * that
911        * is ok and doesn't affect correctness. Might effect the tail latencies a
912        * bit) */
913       a->stolen_completion = cqd->queue.Pop();
914       if (a->stolen_completion != nullptr) {
915         return true;
916       }
917     }
918     return !a->first_loop && a->deadline < grpc_core::ExecCtx::Get()->Now();
919   }
920
921  private:
922   void* check_ready_to_finish_arg_;
923 };
924
925 #ifndef NDEBUG
926 static void dump_pending_tags(grpc_completion_queue* cq) {
927   if (!GRPC_TRACE_FLAG_ENABLED(grpc_trace_pending_tags)) return;
928
929   gpr_strvec v;
930   gpr_strvec_init(&v);
931   gpr_strvec_add(&v, gpr_strdup("PENDING TAGS:"));
932   gpr_mu_lock(cq->mu);
933   for (size_t i = 0; i < cq->outstanding_tag_count; i++) {
934     char* s;
935     gpr_asprintf(&s, " %p", cq->outstanding_tags[i]);
936     gpr_strvec_add(&v, s);
937   }
938   gpr_mu_unlock(cq->mu);
939   char* out = gpr_strvec_flatten(&v, nullptr);
940   gpr_strvec_destroy(&v);
941   gpr_log(GPR_DEBUG, "%s", out);
942   gpr_free(out);
943 }
944 #else
945 static void dump_pending_tags(grpc_completion_queue* cq) {}
946 #endif
947
948 static grpc_event cq_next(grpc_completion_queue* cq, gpr_timespec deadline,
949                           void* reserved) {
950   GPR_TIMER_SCOPE("grpc_completion_queue_next", 0);
951
952   grpc_event ret;
953   cq_next_data* cqd = static_cast<cq_next_data*> DATA_FROM_CQ(cq);
954
955   GRPC_API_TRACE(
956       "grpc_completion_queue_next("
957       "cq=%p, "
958       "deadline=gpr_timespec { tv_sec: %" PRId64
959       ", tv_nsec: %d, clock_type: %d }, "
960       "reserved=%p)",
961       5,
962       (cq, deadline.tv_sec, deadline.tv_nsec, (int)deadline.clock_type,
963        reserved));
964   GPR_ASSERT(!reserved);
965
966   dump_pending_tags(cq);
967
968   GRPC_CQ_INTERNAL_REF(cq, "next");
969
970   grpc_millis deadline_millis = grpc_timespec_to_millis_round_up(deadline);
971   cq_is_finished_arg is_finished_arg = {
972       cqd->things_queued_ever.Load(grpc_core::MemoryOrder::RELAXED),
973       cq,
974       deadline_millis,
975       nullptr,
976       nullptr,
977       true};
978   ExecCtxNext exec_ctx(&is_finished_arg);
979   for (;;) {
980     grpc_millis iteration_deadline = deadline_millis;
981
982     if (is_finished_arg.stolen_completion != nullptr) {
983       grpc_cq_completion* c = is_finished_arg.stolen_completion;
984       is_finished_arg.stolen_completion = nullptr;
985       ret.type = GRPC_OP_COMPLETE;
986       ret.success = c->next & 1u;
987       ret.tag = c->tag;
988       c->done(c->done_arg, c);
989       break;
990     }
991
992     grpc_cq_completion* c = cqd->queue.Pop();
993
994     if (c != nullptr) {
995       ret.type = GRPC_OP_COMPLETE;
996       ret.success = c->next & 1u;
997       ret.tag = c->tag;
998       c->done(c->done_arg, c);
999       break;
1000     } else {
1001       /* If c == NULL it means either the queue is empty OR in an transient
1002          inconsistent state. If it is the latter, we shold do a 0-timeout poll
1003          so that the thread comes back quickly from poll to make a second
1004          attempt at popping. Not doing this can potentially deadlock this
1005          thread forever (if the deadline is infinity) */
1006       if (cqd->queue.num_items() > 0) {
1007         iteration_deadline = 0;
1008       }
1009     }
1010
1011     if (cqd->pending_events.Load(grpc_core::MemoryOrder::ACQUIRE) == 0) {
1012       /* Before returning, check if the queue has any items left over (since
1013          gpr_mpscq_pop() can sometimes return NULL even if the queue is not
1014          empty. If so, keep retrying but do not return GRPC_QUEUE_SHUTDOWN */
1015       if (cqd->queue.num_items() > 0) {
1016         /* Go to the beginning of the loop. No point doing a poll because
1017            (cq->shutdown == true) is only possible when there is no pending
1018            work (i.e cq->pending_events == 0) and any outstanding completion
1019            events should have already been queued on this cq */
1020         continue;
1021       }
1022
1023       ret.type = GRPC_QUEUE_SHUTDOWN;
1024       ret.success = 0;
1025       break;
1026     }
1027
1028     if (!is_finished_arg.first_loop &&
1029         grpc_core::ExecCtx::Get()->Now() >= deadline_millis) {
1030       ret.type = GRPC_QUEUE_TIMEOUT;
1031       ret.success = 0;
1032       dump_pending_tags(cq);
1033       break;
1034     }
1035
1036     /* The main polling work happens in grpc_pollset_work */
1037     gpr_mu_lock(cq->mu);
1038     cq->num_polls++;
1039     grpc_error* err = cq->poller_vtable->work(POLLSET_FROM_CQ(cq), nullptr,
1040                                               iteration_deadline);
1041     gpr_mu_unlock(cq->mu);
1042
1043     if (err != GRPC_ERROR_NONE) {
1044       const char* msg = grpc_error_string(err);
1045       gpr_log(GPR_ERROR, "Completion queue next failed: %s", msg);
1046
1047       GRPC_ERROR_UNREF(err);
1048       ret.type = GRPC_QUEUE_TIMEOUT;
1049       ret.success = 0;
1050       dump_pending_tags(cq);
1051       break;
1052     }
1053     is_finished_arg.first_loop = false;
1054   }
1055
1056   if (cqd->queue.num_items() > 0 &&
1057       cqd->pending_events.Load(grpc_core::MemoryOrder::ACQUIRE) > 0) {
1058     gpr_mu_lock(cq->mu);
1059     cq->poller_vtable->kick(POLLSET_FROM_CQ(cq), nullptr);
1060     gpr_mu_unlock(cq->mu);
1061   }
1062
1063   GRPC_SURFACE_TRACE_RETURNED_EVENT(cq, &ret);
1064   GRPC_CQ_INTERNAL_UNREF(cq, "next");
1065
1066   GPR_ASSERT(is_finished_arg.stolen_completion == nullptr);
1067
1068   return ret;
1069 }
1070
1071 /* Finishes the completion queue shutdown. This means that there are no more
1072    completion events / tags expected from the completion queue
1073    - Must be called under completion queue lock
1074    - Must be called only once in completion queue's lifetime
1075    - grpc_completion_queue_shutdown() MUST have been called before calling
1076    this function */
1077 static void cq_finish_shutdown_next(grpc_completion_queue* cq) {
1078   cq_next_data* cqd = static_cast<cq_next_data*> DATA_FROM_CQ(cq);
1079
1080   GPR_ASSERT(cqd->shutdown_called);
1081   GPR_ASSERT(cqd->pending_events.Load(grpc_core::MemoryOrder::RELAXED) == 0);
1082
1083   cq->poller_vtable->shutdown(POLLSET_FROM_CQ(cq), &cq->pollset_shutdown_done);
1084 }
1085
1086 static void cq_shutdown_next(grpc_completion_queue* cq) {
1087   cq_next_data* cqd = static_cast<cq_next_data*> DATA_FROM_CQ(cq);
1088
1089   /* Need an extra ref for cq here because:
1090    * We call cq_finish_shutdown_next() below, that would call pollset shutdown.
1091    * Pollset shutdown decrements the cq ref count which can potentially destroy
1092    * the cq (if that happens to be the last ref).
1093    * Creating an extra ref here prevents the cq from getting destroyed while
1094    * this function is still active */
1095   GRPC_CQ_INTERNAL_REF(cq, "shutting_down");
1096   gpr_mu_lock(cq->mu);
1097   if (cqd->shutdown_called) {
1098     gpr_mu_unlock(cq->mu);
1099     GRPC_CQ_INTERNAL_UNREF(cq, "shutting_down");
1100     return;
1101   }
1102   cqd->shutdown_called = true;
1103   /* Doing acq/release FetchSub here to match with
1104    * cq_begin_op_for_next and cq_end_op_for_next functions which read/write
1105    * on this counter without necessarily holding a lock on cq */
1106   if (cqd->pending_events.FetchSub(1, grpc_core::MemoryOrder::ACQ_REL) == 1) {
1107     cq_finish_shutdown_next(cq);
1108   }
1109   gpr_mu_unlock(cq->mu);
1110   GRPC_CQ_INTERNAL_UNREF(cq, "shutting_down");
1111 }
1112
1113 grpc_event grpc_completion_queue_next(grpc_completion_queue* cq,
1114                                       gpr_timespec deadline, void* reserved) {
1115   return cq->vtable->next(cq, deadline, reserved);
1116 }
1117
1118 static int add_plucker(grpc_completion_queue* cq, void* tag,
1119                        grpc_pollset_worker** worker) {
1120   cq_pluck_data* cqd = static_cast<cq_pluck_data*> DATA_FROM_CQ(cq);
1121   if (cqd->num_pluckers == GRPC_MAX_COMPLETION_QUEUE_PLUCKERS) {
1122     return 0;
1123   }
1124   cqd->pluckers[cqd->num_pluckers].tag = tag;
1125   cqd->pluckers[cqd->num_pluckers].worker = worker;
1126   cqd->num_pluckers++;
1127   return 1;
1128 }
1129
1130 static void del_plucker(grpc_completion_queue* cq, void* tag,
1131                         grpc_pollset_worker** worker) {
1132   cq_pluck_data* cqd = static_cast<cq_pluck_data*> DATA_FROM_CQ(cq);
1133   for (int i = 0; i < cqd->num_pluckers; i++) {
1134     if (cqd->pluckers[i].tag == tag && cqd->pluckers[i].worker == worker) {
1135       cqd->num_pluckers--;
1136       GPR_SWAP(plucker, cqd->pluckers[i], cqd->pluckers[cqd->num_pluckers]);
1137       return;
1138     }
1139   }
1140   GPR_UNREACHABLE_CODE(return );
1141 }
1142
1143 class ExecCtxPluck : public grpc_core::ExecCtx {
1144  public:
1145   ExecCtxPluck(void* arg) : ExecCtx(0), check_ready_to_finish_arg_(arg) {}
1146
1147   bool CheckReadyToFinish() override {
1148     cq_is_finished_arg* a =
1149         static_cast<cq_is_finished_arg*>(check_ready_to_finish_arg_);
1150     grpc_completion_queue* cq = a->cq;
1151     cq_pluck_data* cqd = static_cast<cq_pluck_data*> DATA_FROM_CQ(cq);
1152
1153     GPR_ASSERT(a->stolen_completion == nullptr);
1154     gpr_atm current_last_seen_things_queued_ever =
1155         cqd->things_queued_ever.Load(grpc_core::MemoryOrder::RELAXED);
1156     if (current_last_seen_things_queued_ever !=
1157         a->last_seen_things_queued_ever) {
1158       gpr_mu_lock(cq->mu);
1159       a->last_seen_things_queued_ever =
1160           cqd->things_queued_ever.Load(grpc_core::MemoryOrder::RELAXED);
1161       grpc_cq_completion* c;
1162       grpc_cq_completion* prev = &cqd->completed_head;
1163       while ((c = (grpc_cq_completion*)(prev->next &
1164                                         ~static_cast<uintptr_t>(1))) !=
1165              &cqd->completed_head) {
1166         if (c->tag == a->tag) {
1167           prev->next = (prev->next & static_cast<uintptr_t>(1)) |
1168                        (c->next & ~static_cast<uintptr_t>(1));
1169           if (c == cqd->completed_tail) {
1170             cqd->completed_tail = prev;
1171           }
1172           gpr_mu_unlock(cq->mu);
1173           a->stolen_completion = c;
1174           return true;
1175         }
1176         prev = c;
1177       }
1178       gpr_mu_unlock(cq->mu);
1179     }
1180     return !a->first_loop && a->deadline < grpc_core::ExecCtx::Get()->Now();
1181   }
1182
1183  private:
1184   void* check_ready_to_finish_arg_;
1185 };
1186
1187 static grpc_event cq_pluck(grpc_completion_queue* cq, void* tag,
1188                            gpr_timespec deadline, void* reserved) {
1189   GPR_TIMER_SCOPE("grpc_completion_queue_pluck", 0);
1190
1191   grpc_event ret;
1192   grpc_cq_completion* c;
1193   grpc_cq_completion* prev;
1194   grpc_pollset_worker* worker = nullptr;
1195   cq_pluck_data* cqd = static_cast<cq_pluck_data*> DATA_FROM_CQ(cq);
1196
1197   if (GRPC_TRACE_FLAG_ENABLED(grpc_cq_pluck_trace)) {
1198     GRPC_API_TRACE(
1199         "grpc_completion_queue_pluck("
1200         "cq=%p, tag=%p, "
1201         "deadline=gpr_timespec { tv_sec: %" PRId64
1202         ", tv_nsec: %d, clock_type: %d }, "
1203         "reserved=%p)",
1204         6,
1205         (cq, tag, deadline.tv_sec, deadline.tv_nsec, (int)deadline.clock_type,
1206          reserved));
1207   }
1208   GPR_ASSERT(!reserved);
1209
1210   dump_pending_tags(cq);
1211
1212   GRPC_CQ_INTERNAL_REF(cq, "pluck");
1213   gpr_mu_lock(cq->mu);
1214   grpc_millis deadline_millis = grpc_timespec_to_millis_round_up(deadline);
1215   cq_is_finished_arg is_finished_arg = {
1216       cqd->things_queued_ever.Load(grpc_core::MemoryOrder::RELAXED),
1217       cq,
1218       deadline_millis,
1219       nullptr,
1220       tag,
1221       true};
1222   ExecCtxPluck exec_ctx(&is_finished_arg);
1223   for (;;) {
1224     if (is_finished_arg.stolen_completion != nullptr) {
1225       gpr_mu_unlock(cq->mu);
1226       c = is_finished_arg.stolen_completion;
1227       is_finished_arg.stolen_completion = nullptr;
1228       ret.type = GRPC_OP_COMPLETE;
1229       ret.success = c->next & 1u;
1230       ret.tag = c->tag;
1231       c->done(c->done_arg, c);
1232       break;
1233     }
1234     prev = &cqd->completed_head;
1235     while (
1236         (c = (grpc_cq_completion*)(prev->next & ~static_cast<uintptr_t>(1))) !=
1237         &cqd->completed_head) {
1238       if (c->tag == tag) {
1239         prev->next = (prev->next & static_cast<uintptr_t>(1)) |
1240                      (c->next & ~static_cast<uintptr_t>(1));
1241         if (c == cqd->completed_tail) {
1242           cqd->completed_tail = prev;
1243         }
1244         gpr_mu_unlock(cq->mu);
1245         ret.type = GRPC_OP_COMPLETE;
1246         ret.success = c->next & 1u;
1247         ret.tag = c->tag;
1248         c->done(c->done_arg, c);
1249         goto done;
1250       }
1251       prev = c;
1252     }
1253     if (cqd->shutdown.Load(grpc_core::MemoryOrder::RELAXED)) {
1254       gpr_mu_unlock(cq->mu);
1255       ret.type = GRPC_QUEUE_SHUTDOWN;
1256       ret.success = 0;
1257       break;
1258     }
1259     if (!add_plucker(cq, tag, &worker)) {
1260       gpr_log(GPR_DEBUG,
1261               "Too many outstanding grpc_completion_queue_pluck calls: maximum "
1262               "is %d",
1263               GRPC_MAX_COMPLETION_QUEUE_PLUCKERS);
1264       gpr_mu_unlock(cq->mu);
1265       /* TODO(ctiller): should we use a different result here */
1266       ret.type = GRPC_QUEUE_TIMEOUT;
1267       ret.success = 0;
1268       dump_pending_tags(cq);
1269       break;
1270     }
1271     if (!is_finished_arg.first_loop &&
1272         grpc_core::ExecCtx::Get()->Now() >= deadline_millis) {
1273       del_plucker(cq, tag, &worker);
1274       gpr_mu_unlock(cq->mu);
1275       ret.type = GRPC_QUEUE_TIMEOUT;
1276       ret.success = 0;
1277       dump_pending_tags(cq);
1278       break;
1279     }
1280     cq->num_polls++;
1281     grpc_error* err =
1282         cq->poller_vtable->work(POLLSET_FROM_CQ(cq), &worker, deadline_millis);
1283     if (err != GRPC_ERROR_NONE) {
1284       del_plucker(cq, tag, &worker);
1285       gpr_mu_unlock(cq->mu);
1286       const char* msg = grpc_error_string(err);
1287       gpr_log(GPR_ERROR, "Completion queue pluck failed: %s", msg);
1288
1289       GRPC_ERROR_UNREF(err);
1290       ret.type = GRPC_QUEUE_TIMEOUT;
1291       ret.success = 0;
1292       dump_pending_tags(cq);
1293       break;
1294     }
1295     is_finished_arg.first_loop = false;
1296     del_plucker(cq, tag, &worker);
1297   }
1298 done:
1299   GRPC_SURFACE_TRACE_RETURNED_EVENT(cq, &ret);
1300   GRPC_CQ_INTERNAL_UNREF(cq, "pluck");
1301
1302   GPR_ASSERT(is_finished_arg.stolen_completion == nullptr);
1303
1304   return ret;
1305 }
1306
1307 grpc_event grpc_completion_queue_pluck(grpc_completion_queue* cq, void* tag,
1308                                        gpr_timespec deadline, void* reserved) {
1309   return cq->vtable->pluck(cq, tag, deadline, reserved);
1310 }
1311
1312 static void cq_finish_shutdown_pluck(grpc_completion_queue* cq) {
1313   cq_pluck_data* cqd = static_cast<cq_pluck_data*> DATA_FROM_CQ(cq);
1314
1315   GPR_ASSERT(cqd->shutdown_called);
1316   GPR_ASSERT(!cqd->shutdown.Load(grpc_core::MemoryOrder::RELAXED));
1317   cqd->shutdown.Store(1, grpc_core::MemoryOrder::RELAXED);
1318
1319   cq->poller_vtable->shutdown(POLLSET_FROM_CQ(cq), &cq->pollset_shutdown_done);
1320 }
1321
1322 /* NOTE: This function is almost exactly identical to cq_shutdown_next() but
1323  * merging them is a bit tricky and probably not worth it */
1324 static void cq_shutdown_pluck(grpc_completion_queue* cq) {
1325   cq_pluck_data* cqd = static_cast<cq_pluck_data*> DATA_FROM_CQ(cq);
1326
1327   /* Need an extra ref for cq here because:
1328    * We call cq_finish_shutdown_pluck() below, that would call pollset shutdown.
1329    * Pollset shutdown decrements the cq ref count which can potentially destroy
1330    * the cq (if that happens to be the last ref).
1331    * Creating an extra ref here prevents the cq from getting destroyed while
1332    * this function is still active */
1333   GRPC_CQ_INTERNAL_REF(cq, "shutting_down (pluck cq)");
1334   gpr_mu_lock(cq->mu);
1335   if (cqd->shutdown_called) {
1336     gpr_mu_unlock(cq->mu);
1337     GRPC_CQ_INTERNAL_UNREF(cq, "shutting_down (pluck cq)");
1338     return;
1339   }
1340   cqd->shutdown_called = true;
1341   if (cqd->pending_events.FetchSub(1, grpc_core::MemoryOrder::ACQ_REL) == 1) {
1342     cq_finish_shutdown_pluck(cq);
1343   }
1344   gpr_mu_unlock(cq->mu);
1345   GRPC_CQ_INTERNAL_UNREF(cq, "shutting_down (pluck cq)");
1346 }
1347
1348 static void cq_finish_shutdown_callback(grpc_completion_queue* cq) {
1349   cq_callback_data* cqd = static_cast<cq_callback_data*> DATA_FROM_CQ(cq);
1350   auto* callback = cqd->shutdown_callback;
1351
1352   GPR_ASSERT(cqd->shutdown_called);
1353
1354   cq->poller_vtable->shutdown(POLLSET_FROM_CQ(cq), &cq->pollset_shutdown_done);
1355   GRPC_CLOSURE_SCHED(
1356       GRPC_CLOSURE_CREATE(
1357           functor_callback, callback,
1358           grpc_core::Executor::Scheduler(grpc_core::ExecutorJobType::SHORT)),
1359       GRPC_ERROR_NONE);
1360 }
1361
1362 static void cq_shutdown_callback(grpc_completion_queue* cq) {
1363   cq_callback_data* cqd = static_cast<cq_callback_data*> DATA_FROM_CQ(cq);
1364
1365   /* Need an extra ref for cq here because:
1366    * We call cq_finish_shutdown_callback() below, which calls pollset shutdown.
1367    * Pollset shutdown decrements the cq ref count which can potentially destroy
1368    * the cq (if that happens to be the last ref).
1369    * Creating an extra ref here prevents the cq from getting destroyed while
1370    * this function is still active */
1371   GRPC_CQ_INTERNAL_REF(cq, "shutting_down (callback cq)");
1372   gpr_mu_lock(cq->mu);
1373   if (cqd->shutdown_called) {
1374     gpr_mu_unlock(cq->mu);
1375     GRPC_CQ_INTERNAL_UNREF(cq, "shutting_down (callback cq)");
1376     return;
1377   }
1378   cqd->shutdown_called = true;
1379   if (cqd->pending_events.FetchSub(1, grpc_core::MemoryOrder::ACQ_REL) == 1) {
1380     gpr_mu_unlock(cq->mu);
1381     cq_finish_shutdown_callback(cq);
1382   } else {
1383     gpr_mu_unlock(cq->mu);
1384   }
1385   GRPC_CQ_INTERNAL_UNREF(cq, "shutting_down (callback cq)");
1386 }
1387
1388 /* Shutdown simply drops a ref that we reserved at creation time; if we drop
1389    to zero here, then enter shutdown mode and wake up any waiters */
1390 void grpc_completion_queue_shutdown(grpc_completion_queue* cq) {
1391   GPR_TIMER_SCOPE("grpc_completion_queue_shutdown", 0);
1392   grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
1393   grpc_core::ExecCtx exec_ctx;
1394   GRPC_API_TRACE("grpc_completion_queue_shutdown(cq=%p)", 1, (cq));
1395   cq->vtable->shutdown(cq);
1396 }
1397
1398 void grpc_completion_queue_destroy(grpc_completion_queue* cq) {
1399   GPR_TIMER_SCOPE("grpc_completion_queue_destroy", 0);
1400   GRPC_API_TRACE("grpc_completion_queue_destroy(cq=%p)", 1, (cq));
1401   grpc_completion_queue_shutdown(cq);
1402
1403   grpc_core::ExecCtx exec_ctx;
1404   GRPC_CQ_INTERNAL_UNREF(cq, "destroy");
1405 }
1406
1407 grpc_pollset* grpc_cq_pollset(grpc_completion_queue* cq) {
1408   return cq->poller_vtable->can_get_pollset ? POLLSET_FROM_CQ(cq) : nullptr;
1409 }
1410
1411 bool grpc_cq_can_listen(grpc_completion_queue* cq) {
1412   return cq->poller_vtable->can_listen;
1413 }