e3001c7a8d5fba2234218e866b05e19eedc90759
[platform/upstream/grpc.git] / test / core / surface / completion_queue_test.cc
1 /*
2  *
3  * Copyright 2015 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18
19 #include "src/core/lib/surface/completion_queue.h"
20
21 #include <grpc/support/alloc.h>
22 #include <grpc/support/log.h>
23 #include <grpc/support/time.h>
24 #include "src/core/lib/gpr/useful.h"
25 #include "src/core/lib/gprpp/memory.h"
26 #include "src/core/lib/gprpp/sync.h"
27 #include "src/core/lib/iomgr/iomgr.h"
28 #include "test/core/util/test_config.h"
29
30 #define LOG_TEST(x) gpr_log(GPR_INFO, "%s", x)
31
32 static void* create_test_tag(void) {
33   static intptr_t i = 0;
34   return reinterpret_cast<void*>(++i);
35 }
36
37 /* helper for tests to shutdown correctly and tersely */
38 static void shutdown_and_destroy(grpc_completion_queue* cc) {
39   grpc_event ev;
40   grpc_completion_queue_shutdown(cc);
41
42   switch (grpc_get_cq_completion_type(cc)) {
43     case GRPC_CQ_NEXT: {
44       ev = grpc_completion_queue_next(cc, gpr_inf_past(GPR_CLOCK_REALTIME),
45                                       nullptr);
46       GPR_ASSERT(ev.type == GRPC_QUEUE_SHUTDOWN);
47       break;
48     }
49     case GRPC_CQ_PLUCK: {
50       ev = grpc_completion_queue_pluck(
51           cc, create_test_tag(), gpr_inf_past(GPR_CLOCK_REALTIME), nullptr);
52       GPR_ASSERT(ev.type == GRPC_QUEUE_SHUTDOWN);
53       break;
54     }
55     case GRPC_CQ_CALLBACK: {
56       // Nothing to do here. The shutdown callback will be invoked when
57       // possible.
58       break;
59     }
60     default: {
61       gpr_log(GPR_ERROR, "Unknown completion type");
62       break;
63     }
64   }
65
66   grpc_completion_queue_destroy(cc);
67 }
68
69 /* ensure we can create and destroy a completion channel */
70 static void test_no_op(void) {
71   grpc_cq_completion_type completion_types[] = {GRPC_CQ_NEXT, GRPC_CQ_PLUCK};
72   grpc_cq_polling_type polling_types[] = {
73       GRPC_CQ_DEFAULT_POLLING, GRPC_CQ_NON_LISTENING, GRPC_CQ_NON_POLLING};
74   grpc_completion_queue_attributes attr;
75   LOG_TEST("test_no_op");
76
77   attr.version = 1;
78   for (size_t i = 0; i < GPR_ARRAY_SIZE(completion_types); i++) {
79     for (size_t j = 0; j < GPR_ARRAY_SIZE(polling_types); j++) {
80       attr.cq_completion_type = completion_types[i];
81       attr.cq_polling_type = polling_types[j];
82       shutdown_and_destroy(grpc_completion_queue_create(
83           grpc_completion_queue_factory_lookup(&attr), &attr, nullptr));
84     }
85   }
86 }
87
88 static void test_pollset_conversion(void) {
89   grpc_cq_completion_type completion_types[] = {GRPC_CQ_NEXT, GRPC_CQ_PLUCK};
90   grpc_cq_polling_type polling_types[] = {GRPC_CQ_DEFAULT_POLLING,
91                                           GRPC_CQ_NON_LISTENING};
92   grpc_completion_queue* cq;
93   grpc_completion_queue_attributes attr;
94
95   LOG_TEST("test_pollset_conversion");
96
97   attr.version = 1;
98   for (size_t i = 0; i < GPR_ARRAY_SIZE(completion_types); i++) {
99     for (size_t j = 0; j < GPR_ARRAY_SIZE(polling_types); j++) {
100       attr.cq_completion_type = completion_types[i];
101       attr.cq_polling_type = polling_types[j];
102       cq = grpc_completion_queue_create(
103           grpc_completion_queue_factory_lookup(&attr), &attr, nullptr);
104       GPR_ASSERT(grpc_cq_pollset(cq) != nullptr);
105       shutdown_and_destroy(cq);
106     }
107   }
108 }
109
110 static void test_wait_empty(void) {
111   grpc_cq_polling_type polling_types[] = {
112       GRPC_CQ_DEFAULT_POLLING, GRPC_CQ_NON_LISTENING, GRPC_CQ_NON_POLLING};
113   grpc_completion_queue* cc;
114   grpc_completion_queue_attributes attr;
115   grpc_event event;
116
117   LOG_TEST("test_wait_empty");
118
119   attr.version = 1;
120   attr.cq_completion_type = GRPC_CQ_NEXT;
121   for (size_t i = 0; i < GPR_ARRAY_SIZE(polling_types); i++) {
122     attr.cq_polling_type = polling_types[i];
123     cc = grpc_completion_queue_create(
124         grpc_completion_queue_factory_lookup(&attr), &attr, nullptr);
125     event =
126         grpc_completion_queue_next(cc, gpr_now(GPR_CLOCK_REALTIME), nullptr);
127     GPR_ASSERT(event.type == GRPC_QUEUE_TIMEOUT);
128     shutdown_and_destroy(cc);
129   }
130 }
131
132 static void do_nothing_end_completion(void* /*arg*/,
133                                       grpc_cq_completion* /*c*/) {}
134
135 static void test_cq_end_op(void) {
136   grpc_event ev;
137   grpc_completion_queue* cc;
138   grpc_cq_completion completion;
139   grpc_cq_polling_type polling_types[] = {
140       GRPC_CQ_DEFAULT_POLLING, GRPC_CQ_NON_LISTENING, GRPC_CQ_NON_POLLING};
141   grpc_completion_queue_attributes attr;
142   void* tag = create_test_tag();
143
144   LOG_TEST("test_cq_end_op");
145
146   attr.version = 1;
147   attr.cq_completion_type = GRPC_CQ_NEXT;
148   for (size_t i = 0; i < GPR_ARRAY_SIZE(polling_types); i++) {
149     grpc_core::ExecCtx exec_ctx;
150     attr.cq_polling_type = polling_types[i];
151     cc = grpc_completion_queue_create(
152         grpc_completion_queue_factory_lookup(&attr), &attr, nullptr);
153
154     GPR_ASSERT(grpc_cq_begin_op(cc, tag));
155     grpc_cq_end_op(cc, tag, GRPC_ERROR_NONE, do_nothing_end_completion, nullptr,
156                    &completion);
157
158     ev = grpc_completion_queue_next(cc, gpr_inf_past(GPR_CLOCK_REALTIME),
159                                     nullptr);
160     GPR_ASSERT(ev.type == GRPC_OP_COMPLETE);
161     GPR_ASSERT(ev.tag == tag);
162     GPR_ASSERT(ev.success);
163
164     shutdown_and_destroy(cc);
165   }
166 }
167
168 static void test_cq_tls_cache_full(void) {
169   grpc_event ev;
170   grpc_completion_queue* cc;
171   grpc_cq_completion completion;
172   grpc_cq_polling_type polling_types[] = {
173       GRPC_CQ_DEFAULT_POLLING, GRPC_CQ_NON_LISTENING, GRPC_CQ_NON_POLLING};
174   grpc_completion_queue_attributes attr;
175   void* tag = create_test_tag();
176   void* res_tag;
177   int ok;
178
179   LOG_TEST("test_cq_tls_cache_full");
180
181   attr.version = 1;
182   attr.cq_completion_type = GRPC_CQ_NEXT;
183   for (size_t i = 0; i < GPR_ARRAY_SIZE(polling_types); i++) {
184     grpc_core::ExecCtx exec_ctx;  // Reset exec_ctx
185     attr.cq_polling_type = polling_types[i];
186     cc = grpc_completion_queue_create(
187         grpc_completion_queue_factory_lookup(&attr), &attr, nullptr);
188
189     grpc_completion_queue_thread_local_cache_init(cc);
190     GPR_ASSERT(grpc_cq_begin_op(cc, tag));
191     grpc_cq_end_op(cc, tag, GRPC_ERROR_NONE, do_nothing_end_completion, nullptr,
192                    &completion);
193
194     ev = grpc_completion_queue_next(cc, gpr_inf_past(GPR_CLOCK_REALTIME),
195                                     nullptr);
196     GPR_ASSERT(ev.type == GRPC_QUEUE_TIMEOUT);
197
198     GPR_ASSERT(
199         grpc_completion_queue_thread_local_cache_flush(cc, &res_tag, &ok) == 1);
200     GPR_ASSERT(res_tag == tag);
201     GPR_ASSERT(ok);
202
203     ev = grpc_completion_queue_next(cc, gpr_inf_past(GPR_CLOCK_REALTIME),
204                                     nullptr);
205     GPR_ASSERT(ev.type == GRPC_QUEUE_TIMEOUT);
206
207     shutdown_and_destroy(cc);
208   }
209 }
210
211 static void test_cq_tls_cache_empty(void) {
212   grpc_completion_queue* cc;
213   grpc_cq_polling_type polling_types[] = {
214       GRPC_CQ_DEFAULT_POLLING, GRPC_CQ_NON_LISTENING, GRPC_CQ_NON_POLLING};
215   grpc_completion_queue_attributes attr;
216   void* res_tag;
217   int ok;
218
219   LOG_TEST("test_cq_tls_cache_empty");
220
221   attr.version = 1;
222   attr.cq_completion_type = GRPC_CQ_NEXT;
223   for (size_t i = 0; i < GPR_ARRAY_SIZE(polling_types); i++) {
224     grpc_core::ExecCtx exec_ctx;  // Reset exec_ctx
225     attr.cq_polling_type = polling_types[i];
226     cc = grpc_completion_queue_create(
227         grpc_completion_queue_factory_lookup(&attr), &attr, nullptr);
228
229     GPR_ASSERT(
230         grpc_completion_queue_thread_local_cache_flush(cc, &res_tag, &ok) == 0);
231     grpc_completion_queue_thread_local_cache_init(cc);
232     GPR_ASSERT(
233         grpc_completion_queue_thread_local_cache_flush(cc, &res_tag, &ok) == 0);
234     shutdown_and_destroy(cc);
235   }
236 }
237
238 static void test_shutdown_then_next_polling(void) {
239   grpc_cq_polling_type polling_types[] = {
240       GRPC_CQ_DEFAULT_POLLING, GRPC_CQ_NON_LISTENING, GRPC_CQ_NON_POLLING};
241   grpc_completion_queue* cc;
242   grpc_completion_queue_attributes attr;
243   grpc_event event;
244   LOG_TEST("test_shutdown_then_next_polling");
245
246   attr.version = 1;
247   attr.cq_completion_type = GRPC_CQ_NEXT;
248   for (size_t i = 0; i < GPR_ARRAY_SIZE(polling_types); i++) {
249     attr.cq_polling_type = polling_types[i];
250     cc = grpc_completion_queue_create(
251         grpc_completion_queue_factory_lookup(&attr), &attr, nullptr);
252     grpc_completion_queue_shutdown(cc);
253     event = grpc_completion_queue_next(cc, gpr_inf_past(GPR_CLOCK_REALTIME),
254                                        nullptr);
255     GPR_ASSERT(event.type == GRPC_QUEUE_SHUTDOWN);
256     grpc_completion_queue_destroy(cc);
257   }
258 }
259
260 static void test_shutdown_then_next_with_timeout(void) {
261   grpc_cq_polling_type polling_types[] = {
262       GRPC_CQ_DEFAULT_POLLING, GRPC_CQ_NON_LISTENING, GRPC_CQ_NON_POLLING};
263   grpc_completion_queue* cc;
264   grpc_completion_queue_attributes attr;
265   grpc_event event;
266   LOG_TEST("test_shutdown_then_next_with_timeout");
267
268   attr.version = 1;
269   attr.cq_completion_type = GRPC_CQ_NEXT;
270   for (size_t i = 0; i < GPR_ARRAY_SIZE(polling_types); i++) {
271     attr.cq_polling_type = polling_types[i];
272     cc = grpc_completion_queue_create(
273         grpc_completion_queue_factory_lookup(&attr), &attr, nullptr);
274
275     grpc_completion_queue_shutdown(cc);
276     event = grpc_completion_queue_next(cc, gpr_inf_future(GPR_CLOCK_REALTIME),
277                                        nullptr);
278     GPR_ASSERT(event.type == GRPC_QUEUE_SHUTDOWN);
279     grpc_completion_queue_destroy(cc);
280   }
281 }
282
283 static void test_pluck(void) {
284   grpc_event ev;
285   grpc_completion_queue* cc;
286   void* tags[128];
287   grpc_cq_completion completions[GPR_ARRAY_SIZE(tags)];
288   grpc_cq_polling_type polling_types[] = {
289       GRPC_CQ_DEFAULT_POLLING, GRPC_CQ_NON_LISTENING, GRPC_CQ_NON_POLLING};
290   grpc_completion_queue_attributes attr;
291   unsigned i, j;
292
293   LOG_TEST("test_pluck");
294
295   for (i = 0; i < GPR_ARRAY_SIZE(tags); i++) {
296     tags[i] = create_test_tag();
297     for (j = 0; j < i; j++) {
298       GPR_ASSERT(tags[i] != tags[j]);
299     }
300   }
301
302   attr.version = 1;
303   attr.cq_completion_type = GRPC_CQ_PLUCK;
304   for (size_t pidx = 0; pidx < GPR_ARRAY_SIZE(polling_types); pidx++) {
305     grpc_core::ExecCtx exec_ctx;  // reset exec_ctx
306     attr.cq_polling_type = polling_types[pidx];
307     cc = grpc_completion_queue_create(
308         grpc_completion_queue_factory_lookup(&attr), &attr, nullptr);
309
310     for (i = 0; i < GPR_ARRAY_SIZE(tags); i++) {
311       GPR_ASSERT(grpc_cq_begin_op(cc, tags[i]));
312       grpc_cq_end_op(cc, tags[i], GRPC_ERROR_NONE, do_nothing_end_completion,
313                      nullptr, &completions[i]);
314     }
315
316     for (i = 0; i < GPR_ARRAY_SIZE(tags); i++) {
317       ev = grpc_completion_queue_pluck(
318           cc, tags[i], gpr_inf_past(GPR_CLOCK_REALTIME), nullptr);
319       GPR_ASSERT(ev.tag == tags[i]);
320     }
321
322     for (i = 0; i < GPR_ARRAY_SIZE(tags); i++) {
323       GPR_ASSERT(grpc_cq_begin_op(cc, tags[i]));
324       grpc_cq_end_op(cc, tags[i], GRPC_ERROR_NONE, do_nothing_end_completion,
325                      nullptr, &completions[i]);
326     }
327
328     for (i = 0; i < GPR_ARRAY_SIZE(tags); i++) {
329       ev = grpc_completion_queue_pluck(cc, tags[GPR_ARRAY_SIZE(tags) - i - 1],
330                                        gpr_inf_past(GPR_CLOCK_REALTIME),
331                                        nullptr);
332       GPR_ASSERT(ev.tag == tags[GPR_ARRAY_SIZE(tags) - i - 1]);
333     }
334
335     shutdown_and_destroy(cc);
336   }
337 }
338
339 static void test_pluck_after_shutdown(void) {
340   grpc_cq_polling_type polling_types[] = {
341       GRPC_CQ_DEFAULT_POLLING, GRPC_CQ_NON_LISTENING, GRPC_CQ_NON_POLLING};
342   grpc_event ev;
343   grpc_completion_queue* cc;
344   grpc_completion_queue_attributes attr;
345
346   LOG_TEST("test_pluck_after_shutdown");
347
348   attr.version = 1;
349   attr.cq_completion_type = GRPC_CQ_PLUCK;
350   for (size_t i = 0; i < GPR_ARRAY_SIZE(polling_types); i++) {
351     attr.cq_polling_type = polling_types[i];
352     cc = grpc_completion_queue_create(
353         grpc_completion_queue_factory_lookup(&attr), &attr, nullptr);
354     grpc_completion_queue_shutdown(cc);
355     ev = grpc_completion_queue_pluck(
356         cc, nullptr, gpr_inf_future(GPR_CLOCK_REALTIME), nullptr);
357     GPR_ASSERT(ev.type == GRPC_QUEUE_SHUTDOWN);
358     grpc_completion_queue_destroy(cc);
359   }
360 }
361
362 static void test_callback(void) {
363   grpc_completion_queue* cc;
364   static void* tags[128];
365   grpc_cq_completion completions[GPR_ARRAY_SIZE(tags)];
366   grpc_cq_polling_type polling_types[] = {
367       GRPC_CQ_DEFAULT_POLLING, GRPC_CQ_NON_LISTENING, GRPC_CQ_NON_POLLING};
368   grpc_completion_queue_attributes attr;
369   unsigned i;
370   static gpr_mu mu, shutdown_mu;
371   static gpr_cv cv, shutdown_cv;
372   static int cb_counter;
373   gpr_mu_init(&mu);
374   gpr_mu_init(&shutdown_mu);
375   gpr_cv_init(&cv);
376   gpr_cv_init(&shutdown_cv);
377
378   LOG_TEST("test_callback");
379
380   bool got_shutdown = false;
381   class ShutdownCallback : public grpc_completion_queue_functor {
382    public:
383     explicit ShutdownCallback(bool* done) : done_(done) {
384       functor_run = &ShutdownCallback::Run;
385       inlineable = false;
386     }
387     ~ShutdownCallback() {}
388     static void Run(grpc_completion_queue_functor* cb, int ok) {
389       gpr_mu_lock(&shutdown_mu);
390       *static_cast<ShutdownCallback*>(cb)->done_ = static_cast<bool>(ok);
391       // Signal when the shutdown callback is completed.
392       gpr_cv_signal(&shutdown_cv);
393       gpr_mu_unlock(&shutdown_mu);
394     }
395
396    private:
397     bool* done_;
398   };
399   ShutdownCallback shutdown_cb(&got_shutdown);
400
401   attr.version = 2;
402   attr.cq_completion_type = GRPC_CQ_CALLBACK;
403   attr.cq_shutdown_cb = &shutdown_cb;
404
405   for (size_t pidx = 0; pidx < GPR_ARRAY_SIZE(polling_types); pidx++) {
406     int sumtags = 0;
407     int counter = 0;
408     cb_counter = 0;
409     {
410       // reset exec_ctx types
411       grpc_core::ExecCtx exec_ctx;
412       attr.cq_polling_type = polling_types[pidx];
413       cc = grpc_completion_queue_create(
414           grpc_completion_queue_factory_lookup(&attr), &attr, nullptr);
415
416       class TagCallback : public grpc_completion_queue_functor {
417        public:
418         TagCallback(int* counter, int tag) : counter_(counter), tag_(tag) {
419           functor_run = &TagCallback::Run;
420           // Inlineable should be false since this callback takes locks.
421           inlineable = false;
422         }
423         ~TagCallback() {}
424         static void Run(grpc_completion_queue_functor* cb, int ok) {
425           GPR_ASSERT(static_cast<bool>(ok));
426           auto* callback = static_cast<TagCallback*>(cb);
427           gpr_mu_lock(&mu);
428           cb_counter++;
429           *callback->counter_ += callback->tag_;
430           if (cb_counter == GPR_ARRAY_SIZE(tags)) {
431             gpr_cv_signal(&cv);
432           }
433           gpr_mu_unlock(&mu);
434           delete callback;
435         };
436
437        private:
438         int* counter_;
439         int tag_;
440       };
441
442       for (i = 0; i < GPR_ARRAY_SIZE(tags); i++) {
443         tags[i] = static_cast<void*>(new TagCallback(&counter, i));
444         sumtags += i;
445       }
446
447       for (i = 0; i < GPR_ARRAY_SIZE(tags); i++) {
448         GPR_ASSERT(grpc_cq_begin_op(cc, tags[i]));
449         grpc_cq_end_op(cc, tags[i], GRPC_ERROR_NONE, do_nothing_end_completion,
450                        nullptr, &completions[i]);
451       }
452
453       gpr_mu_lock(&mu);
454       while (cb_counter != GPR_ARRAY_SIZE(tags)) {
455         // Wait for all the callbacks to complete.
456         gpr_cv_wait(&cv, &mu, gpr_inf_future(GPR_CLOCK_REALTIME));
457       }
458       gpr_mu_unlock(&mu);
459
460       shutdown_and_destroy(cc);
461
462       gpr_mu_lock(&shutdown_mu);
463       while (!got_shutdown) {
464         // Wait for the shutdown callback to complete.
465         gpr_cv_wait(&shutdown_cv, &shutdown_mu,
466                     gpr_inf_future(GPR_CLOCK_REALTIME));
467       }
468       gpr_mu_unlock(&shutdown_mu);
469     }
470
471     // Run the assertions to check if the test ran successfully.
472     GPR_ASSERT(sumtags == counter);
473     GPR_ASSERT(got_shutdown);
474     got_shutdown = false;
475   }
476
477   gpr_cv_destroy(&cv);
478   gpr_cv_destroy(&shutdown_cv);
479   gpr_mu_destroy(&mu);
480   gpr_mu_destroy(&shutdown_mu);
481 }
482
483 struct thread_state {
484   grpc_completion_queue* cc;
485   void* tag;
486 };
487
488 int main(int argc, char** argv) {
489   grpc::testing::TestEnvironment env(argc, argv);
490   grpc_init();
491   test_no_op();
492   test_pollset_conversion();
493   test_wait_empty();
494   test_shutdown_then_next_polling();
495   test_shutdown_then_next_with_timeout();
496   test_cq_end_op();
497   test_pluck();
498   test_pluck_after_shutdown();
499   test_cq_tls_cache_full();
500   test_cq_tls_cache_empty();
501   test_callback();
502   grpc_shutdown();
503   return 0;
504 }