3 * Copyright 2015 gRPC authors.
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
19 #include "src/core/lib/surface/completion_queue.h"
21 #include <grpc/support/alloc.h>
22 #include <grpc/support/log.h>
23 #include <grpc/support/time.h>
24 #include "src/core/lib/gpr/useful.h"
25 #include "src/core/lib/gprpp/memory.h"
26 #include "src/core/lib/gprpp/sync.h"
27 #include "src/core/lib/iomgr/iomgr.h"
28 #include "test/core/util/test_config.h"
30 #define LOG_TEST(x) gpr_log(GPR_INFO, "%s", x)
32 static void* create_test_tag(void) {
33 static intptr_t i = 0;
34 return reinterpret_cast<void*>(++i);
37 /* helper for tests to shutdown correctly and tersely */
38 static void shutdown_and_destroy(grpc_completion_queue* cc) {
40 grpc_completion_queue_shutdown(cc);
42 switch (grpc_get_cq_completion_type(cc)) {
44 ev = grpc_completion_queue_next(cc, gpr_inf_past(GPR_CLOCK_REALTIME),
46 GPR_ASSERT(ev.type == GRPC_QUEUE_SHUTDOWN);
50 ev = grpc_completion_queue_pluck(
51 cc, create_test_tag(), gpr_inf_past(GPR_CLOCK_REALTIME), nullptr);
52 GPR_ASSERT(ev.type == GRPC_QUEUE_SHUTDOWN);
55 case GRPC_CQ_CALLBACK: {
56 // Nothing to do here. The shutdown callback will be invoked when
61 gpr_log(GPR_ERROR, "Unknown completion type");
66 grpc_completion_queue_destroy(cc);
69 /* ensure we can create and destroy a completion channel */
70 static void test_no_op(void) {
71 grpc_cq_completion_type completion_types[] = {GRPC_CQ_NEXT, GRPC_CQ_PLUCK};
72 grpc_cq_polling_type polling_types[] = {
73 GRPC_CQ_DEFAULT_POLLING, GRPC_CQ_NON_LISTENING, GRPC_CQ_NON_POLLING};
74 grpc_completion_queue_attributes attr;
75 LOG_TEST("test_no_op");
78 for (size_t i = 0; i < GPR_ARRAY_SIZE(completion_types); i++) {
79 for (size_t j = 0; j < GPR_ARRAY_SIZE(polling_types); j++) {
80 attr.cq_completion_type = completion_types[i];
81 attr.cq_polling_type = polling_types[j];
82 shutdown_and_destroy(grpc_completion_queue_create(
83 grpc_completion_queue_factory_lookup(&attr), &attr, nullptr));
88 static void test_pollset_conversion(void) {
89 grpc_cq_completion_type completion_types[] = {GRPC_CQ_NEXT, GRPC_CQ_PLUCK};
90 grpc_cq_polling_type polling_types[] = {GRPC_CQ_DEFAULT_POLLING,
91 GRPC_CQ_NON_LISTENING};
92 grpc_completion_queue* cq;
93 grpc_completion_queue_attributes attr;
95 LOG_TEST("test_pollset_conversion");
98 for (size_t i = 0; i < GPR_ARRAY_SIZE(completion_types); i++) {
99 for (size_t j = 0; j < GPR_ARRAY_SIZE(polling_types); j++) {
100 attr.cq_completion_type = completion_types[i];
101 attr.cq_polling_type = polling_types[j];
102 cq = grpc_completion_queue_create(
103 grpc_completion_queue_factory_lookup(&attr), &attr, nullptr);
104 GPR_ASSERT(grpc_cq_pollset(cq) != nullptr);
105 shutdown_and_destroy(cq);
110 static void test_wait_empty(void) {
111 grpc_cq_polling_type polling_types[] = {
112 GRPC_CQ_DEFAULT_POLLING, GRPC_CQ_NON_LISTENING, GRPC_CQ_NON_POLLING};
113 grpc_completion_queue* cc;
114 grpc_completion_queue_attributes attr;
117 LOG_TEST("test_wait_empty");
120 attr.cq_completion_type = GRPC_CQ_NEXT;
121 for (size_t i = 0; i < GPR_ARRAY_SIZE(polling_types); i++) {
122 attr.cq_polling_type = polling_types[i];
123 cc = grpc_completion_queue_create(
124 grpc_completion_queue_factory_lookup(&attr), &attr, nullptr);
126 grpc_completion_queue_next(cc, gpr_now(GPR_CLOCK_REALTIME), nullptr);
127 GPR_ASSERT(event.type == GRPC_QUEUE_TIMEOUT);
128 shutdown_and_destroy(cc);
132 static void do_nothing_end_completion(void* /*arg*/,
133 grpc_cq_completion* /*c*/) {}
135 static void test_cq_end_op(void) {
137 grpc_completion_queue* cc;
138 grpc_cq_completion completion;
139 grpc_cq_polling_type polling_types[] = {
140 GRPC_CQ_DEFAULT_POLLING, GRPC_CQ_NON_LISTENING, GRPC_CQ_NON_POLLING};
141 grpc_completion_queue_attributes attr;
142 void* tag = create_test_tag();
144 LOG_TEST("test_cq_end_op");
147 attr.cq_completion_type = GRPC_CQ_NEXT;
148 for (size_t i = 0; i < GPR_ARRAY_SIZE(polling_types); i++) {
149 grpc_core::ExecCtx exec_ctx;
150 attr.cq_polling_type = polling_types[i];
151 cc = grpc_completion_queue_create(
152 grpc_completion_queue_factory_lookup(&attr), &attr, nullptr);
154 GPR_ASSERT(grpc_cq_begin_op(cc, tag));
155 grpc_cq_end_op(cc, tag, GRPC_ERROR_NONE, do_nothing_end_completion, nullptr,
158 ev = grpc_completion_queue_next(cc, gpr_inf_past(GPR_CLOCK_REALTIME),
160 GPR_ASSERT(ev.type == GRPC_OP_COMPLETE);
161 GPR_ASSERT(ev.tag == tag);
162 GPR_ASSERT(ev.success);
164 shutdown_and_destroy(cc);
168 static void test_cq_tls_cache_full(void) {
170 grpc_completion_queue* cc;
171 grpc_cq_completion completion;
172 grpc_cq_polling_type polling_types[] = {
173 GRPC_CQ_DEFAULT_POLLING, GRPC_CQ_NON_LISTENING, GRPC_CQ_NON_POLLING};
174 grpc_completion_queue_attributes attr;
175 void* tag = create_test_tag();
179 LOG_TEST("test_cq_tls_cache_full");
182 attr.cq_completion_type = GRPC_CQ_NEXT;
183 for (size_t i = 0; i < GPR_ARRAY_SIZE(polling_types); i++) {
184 grpc_core::ExecCtx exec_ctx; // Reset exec_ctx
185 attr.cq_polling_type = polling_types[i];
186 cc = grpc_completion_queue_create(
187 grpc_completion_queue_factory_lookup(&attr), &attr, nullptr);
189 grpc_completion_queue_thread_local_cache_init(cc);
190 GPR_ASSERT(grpc_cq_begin_op(cc, tag));
191 grpc_cq_end_op(cc, tag, GRPC_ERROR_NONE, do_nothing_end_completion, nullptr,
194 ev = grpc_completion_queue_next(cc, gpr_inf_past(GPR_CLOCK_REALTIME),
196 GPR_ASSERT(ev.type == GRPC_QUEUE_TIMEOUT);
199 grpc_completion_queue_thread_local_cache_flush(cc, &res_tag, &ok) == 1);
200 GPR_ASSERT(res_tag == tag);
203 ev = grpc_completion_queue_next(cc, gpr_inf_past(GPR_CLOCK_REALTIME),
205 GPR_ASSERT(ev.type == GRPC_QUEUE_TIMEOUT);
207 shutdown_and_destroy(cc);
211 static void test_cq_tls_cache_empty(void) {
212 grpc_completion_queue* cc;
213 grpc_cq_polling_type polling_types[] = {
214 GRPC_CQ_DEFAULT_POLLING, GRPC_CQ_NON_LISTENING, GRPC_CQ_NON_POLLING};
215 grpc_completion_queue_attributes attr;
219 LOG_TEST("test_cq_tls_cache_empty");
222 attr.cq_completion_type = GRPC_CQ_NEXT;
223 for (size_t i = 0; i < GPR_ARRAY_SIZE(polling_types); i++) {
224 grpc_core::ExecCtx exec_ctx; // Reset exec_ctx
225 attr.cq_polling_type = polling_types[i];
226 cc = grpc_completion_queue_create(
227 grpc_completion_queue_factory_lookup(&attr), &attr, nullptr);
230 grpc_completion_queue_thread_local_cache_flush(cc, &res_tag, &ok) == 0);
231 grpc_completion_queue_thread_local_cache_init(cc);
233 grpc_completion_queue_thread_local_cache_flush(cc, &res_tag, &ok) == 0);
234 shutdown_and_destroy(cc);
238 static void test_shutdown_then_next_polling(void) {
239 grpc_cq_polling_type polling_types[] = {
240 GRPC_CQ_DEFAULT_POLLING, GRPC_CQ_NON_LISTENING, GRPC_CQ_NON_POLLING};
241 grpc_completion_queue* cc;
242 grpc_completion_queue_attributes attr;
244 LOG_TEST("test_shutdown_then_next_polling");
247 attr.cq_completion_type = GRPC_CQ_NEXT;
248 for (size_t i = 0; i < GPR_ARRAY_SIZE(polling_types); i++) {
249 attr.cq_polling_type = polling_types[i];
250 cc = grpc_completion_queue_create(
251 grpc_completion_queue_factory_lookup(&attr), &attr, nullptr);
252 grpc_completion_queue_shutdown(cc);
253 event = grpc_completion_queue_next(cc, gpr_inf_past(GPR_CLOCK_REALTIME),
255 GPR_ASSERT(event.type == GRPC_QUEUE_SHUTDOWN);
256 grpc_completion_queue_destroy(cc);
260 static void test_shutdown_then_next_with_timeout(void) {
261 grpc_cq_polling_type polling_types[] = {
262 GRPC_CQ_DEFAULT_POLLING, GRPC_CQ_NON_LISTENING, GRPC_CQ_NON_POLLING};
263 grpc_completion_queue* cc;
264 grpc_completion_queue_attributes attr;
266 LOG_TEST("test_shutdown_then_next_with_timeout");
269 attr.cq_completion_type = GRPC_CQ_NEXT;
270 for (size_t i = 0; i < GPR_ARRAY_SIZE(polling_types); i++) {
271 attr.cq_polling_type = polling_types[i];
272 cc = grpc_completion_queue_create(
273 grpc_completion_queue_factory_lookup(&attr), &attr, nullptr);
275 grpc_completion_queue_shutdown(cc);
276 event = grpc_completion_queue_next(cc, gpr_inf_future(GPR_CLOCK_REALTIME),
278 GPR_ASSERT(event.type == GRPC_QUEUE_SHUTDOWN);
279 grpc_completion_queue_destroy(cc);
283 static void test_pluck(void) {
285 grpc_completion_queue* cc;
287 grpc_cq_completion completions[GPR_ARRAY_SIZE(tags)];
288 grpc_cq_polling_type polling_types[] = {
289 GRPC_CQ_DEFAULT_POLLING, GRPC_CQ_NON_LISTENING, GRPC_CQ_NON_POLLING};
290 grpc_completion_queue_attributes attr;
293 LOG_TEST("test_pluck");
295 for (i = 0; i < GPR_ARRAY_SIZE(tags); i++) {
296 tags[i] = create_test_tag();
297 for (j = 0; j < i; j++) {
298 GPR_ASSERT(tags[i] != tags[j]);
303 attr.cq_completion_type = GRPC_CQ_PLUCK;
304 for (size_t pidx = 0; pidx < GPR_ARRAY_SIZE(polling_types); pidx++) {
305 grpc_core::ExecCtx exec_ctx; // reset exec_ctx
306 attr.cq_polling_type = polling_types[pidx];
307 cc = grpc_completion_queue_create(
308 grpc_completion_queue_factory_lookup(&attr), &attr, nullptr);
310 for (i = 0; i < GPR_ARRAY_SIZE(tags); i++) {
311 GPR_ASSERT(grpc_cq_begin_op(cc, tags[i]));
312 grpc_cq_end_op(cc, tags[i], GRPC_ERROR_NONE, do_nothing_end_completion,
313 nullptr, &completions[i]);
316 for (i = 0; i < GPR_ARRAY_SIZE(tags); i++) {
317 ev = grpc_completion_queue_pluck(
318 cc, tags[i], gpr_inf_past(GPR_CLOCK_REALTIME), nullptr);
319 GPR_ASSERT(ev.tag == tags[i]);
322 for (i = 0; i < GPR_ARRAY_SIZE(tags); i++) {
323 GPR_ASSERT(grpc_cq_begin_op(cc, tags[i]));
324 grpc_cq_end_op(cc, tags[i], GRPC_ERROR_NONE, do_nothing_end_completion,
325 nullptr, &completions[i]);
328 for (i = 0; i < GPR_ARRAY_SIZE(tags); i++) {
329 ev = grpc_completion_queue_pluck(cc, tags[GPR_ARRAY_SIZE(tags) - i - 1],
330 gpr_inf_past(GPR_CLOCK_REALTIME),
332 GPR_ASSERT(ev.tag == tags[GPR_ARRAY_SIZE(tags) - i - 1]);
335 shutdown_and_destroy(cc);
339 static void test_pluck_after_shutdown(void) {
340 grpc_cq_polling_type polling_types[] = {
341 GRPC_CQ_DEFAULT_POLLING, GRPC_CQ_NON_LISTENING, GRPC_CQ_NON_POLLING};
343 grpc_completion_queue* cc;
344 grpc_completion_queue_attributes attr;
346 LOG_TEST("test_pluck_after_shutdown");
349 attr.cq_completion_type = GRPC_CQ_PLUCK;
350 for (size_t i = 0; i < GPR_ARRAY_SIZE(polling_types); i++) {
351 attr.cq_polling_type = polling_types[i];
352 cc = grpc_completion_queue_create(
353 grpc_completion_queue_factory_lookup(&attr), &attr, nullptr);
354 grpc_completion_queue_shutdown(cc);
355 ev = grpc_completion_queue_pluck(
356 cc, nullptr, gpr_inf_future(GPR_CLOCK_REALTIME), nullptr);
357 GPR_ASSERT(ev.type == GRPC_QUEUE_SHUTDOWN);
358 grpc_completion_queue_destroy(cc);
362 static void test_callback(void) {
363 grpc_completion_queue* cc;
364 static void* tags[128];
365 grpc_cq_completion completions[GPR_ARRAY_SIZE(tags)];
366 grpc_cq_polling_type polling_types[] = {
367 GRPC_CQ_DEFAULT_POLLING, GRPC_CQ_NON_LISTENING, GRPC_CQ_NON_POLLING};
368 grpc_completion_queue_attributes attr;
370 static gpr_mu mu, shutdown_mu;
371 static gpr_cv cv, shutdown_cv;
372 static int cb_counter;
374 gpr_mu_init(&shutdown_mu);
376 gpr_cv_init(&shutdown_cv);
378 LOG_TEST("test_callback");
380 bool got_shutdown = false;
381 class ShutdownCallback : public grpc_completion_queue_functor {
383 explicit ShutdownCallback(bool* done) : done_(done) {
384 functor_run = &ShutdownCallback::Run;
387 ~ShutdownCallback() {}
388 static void Run(grpc_completion_queue_functor* cb, int ok) {
389 gpr_mu_lock(&shutdown_mu);
390 *static_cast<ShutdownCallback*>(cb)->done_ = static_cast<bool>(ok);
391 // Signal when the shutdown callback is completed.
392 gpr_cv_signal(&shutdown_cv);
393 gpr_mu_unlock(&shutdown_mu);
399 ShutdownCallback shutdown_cb(&got_shutdown);
402 attr.cq_completion_type = GRPC_CQ_CALLBACK;
403 attr.cq_shutdown_cb = &shutdown_cb;
405 for (size_t pidx = 0; pidx < GPR_ARRAY_SIZE(polling_types); pidx++) {
410 // reset exec_ctx types
411 grpc_core::ExecCtx exec_ctx;
412 attr.cq_polling_type = polling_types[pidx];
413 cc = grpc_completion_queue_create(
414 grpc_completion_queue_factory_lookup(&attr), &attr, nullptr);
416 class TagCallback : public grpc_completion_queue_functor {
418 TagCallback(int* counter, int tag) : counter_(counter), tag_(tag) {
419 functor_run = &TagCallback::Run;
420 // Inlineable should be false since this callback takes locks.
424 static void Run(grpc_completion_queue_functor* cb, int ok) {
425 GPR_ASSERT(static_cast<bool>(ok));
426 auto* callback = static_cast<TagCallback*>(cb);
429 *callback->counter_ += callback->tag_;
430 if (cb_counter == GPR_ARRAY_SIZE(tags)) {
442 for (i = 0; i < GPR_ARRAY_SIZE(tags); i++) {
443 tags[i] = static_cast<void*>(new TagCallback(&counter, i));
447 for (i = 0; i < GPR_ARRAY_SIZE(tags); i++) {
448 GPR_ASSERT(grpc_cq_begin_op(cc, tags[i]));
449 grpc_cq_end_op(cc, tags[i], GRPC_ERROR_NONE, do_nothing_end_completion,
450 nullptr, &completions[i]);
454 while (cb_counter != GPR_ARRAY_SIZE(tags)) {
455 // Wait for all the callbacks to complete.
456 gpr_cv_wait(&cv, &mu, gpr_inf_future(GPR_CLOCK_REALTIME));
460 shutdown_and_destroy(cc);
462 gpr_mu_lock(&shutdown_mu);
463 while (!got_shutdown) {
464 // Wait for the shutdown callback to complete.
465 gpr_cv_wait(&shutdown_cv, &shutdown_mu,
466 gpr_inf_future(GPR_CLOCK_REALTIME));
468 gpr_mu_unlock(&shutdown_mu);
471 // Run the assertions to check if the test ran successfully.
472 GPR_ASSERT(sumtags == counter);
473 GPR_ASSERT(got_shutdown);
474 got_shutdown = false;
478 gpr_cv_destroy(&shutdown_cv);
480 gpr_mu_destroy(&shutdown_mu);
483 struct thread_state {
484 grpc_completion_queue* cc;
488 int main(int argc, char** argv) {
489 grpc::testing::TestEnvironment env(argc, argv);
492 test_pollset_conversion();
494 test_shutdown_then_next_polling();
495 test_shutdown_then_next_with_timeout();
498 test_pluck_after_shutdown();
499 test_cq_tls_cache_full();
500 test_cq_tls_cache_empty();