d57a021cee7fa5c1138ca1a139bb9582a423968b
[platform/upstream/grpc.git] / test / cpp / microbenchmarks / bm_cq_multiple_threads.cc
1 /*
2  *
3  * Copyright 2017 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18
19 #include <benchmark/benchmark.h>
20 #include <string.h>
21 #include <atomic>
22
23 #include <grpc/grpc.h>
24 #include <grpc/support/alloc.h>
25 #include <grpc/support/log.h>
26
27 #include "src/core/lib/iomgr/ev_posix.h"
28 #include "src/core/lib/iomgr/port.h"
29 #include "src/core/lib/surface/completion_queue.h"
30 #include "test/core/util/test_config.h"
31 #include "test/cpp/microbenchmarks/helpers.h"
32 #include "test/cpp/util/test_config.h"
33
34 struct grpc_pollset {
35   gpr_mu mu;
36 };
37
38 static gpr_mu g_mu;
39 static gpr_cv g_cv;
40 static int g_threads_active;
41 static bool g_active;
42
43 namespace grpc {
44 namespace testing {
45 static grpc_completion_queue* g_cq;
46 static grpc_event_engine_vtable g_vtable;
47
48 static void pollset_shutdown(grpc_pollset* /*ps*/, grpc_closure* closure) {
49   grpc_core::ExecCtx::Run(DEBUG_LOCATION, closure, GRPC_ERROR_NONE);
50 }
51
52 static void pollset_init(grpc_pollset* ps, gpr_mu** mu) {
53   gpr_mu_init(&ps->mu);
54   *mu = &ps->mu;
55 }
56
57 static void pollset_destroy(grpc_pollset* ps) { gpr_mu_destroy(&ps->mu); }
58
59 static grpc_error_handle pollset_kick(grpc_pollset* /*p*/,
60                                       grpc_pollset_worker* /*worker*/) {
61   return GRPC_ERROR_NONE;
62 }
63
64 /* Callback when the tag is dequeued from the completion queue. Does nothing */
65 static void cq_done_cb(void* /*done_arg*/, grpc_cq_completion* cq_completion) {
66   gpr_free(cq_completion);
67 }
68
69 /* Queues a completion tag if deadline is > 0.
70  * Does nothing if deadline is 0 (i.e gpr_time_0(GPR_CLOCK_MONOTONIC)) */
71 static grpc_error_handle pollset_work(grpc_pollset* ps,
72                                       grpc_pollset_worker** /*worker*/,
73                                       grpc_millis deadline) {
74   if (deadline == 0) {
75     gpr_log(GPR_DEBUG, "no-op");
76     return GRPC_ERROR_NONE;
77   }
78
79   gpr_mu_unlock(&ps->mu);
80
81   void* tag = reinterpret_cast<void*>(10);  // Some random number
82   GPR_ASSERT(grpc_cq_begin_op(g_cq, tag));
83   grpc_cq_end_op(
84       g_cq, tag, GRPC_ERROR_NONE, cq_done_cb, nullptr,
85       static_cast<grpc_cq_completion*>(gpr_malloc(sizeof(grpc_cq_completion))));
86   grpc_core::ExecCtx::Get()->Flush();
87   gpr_mu_lock(&ps->mu);
88   return GRPC_ERROR_NONE;
89 }
90
91 static const grpc_event_engine_vtable* init_engine_vtable(bool) {
92   memset(&g_vtable, 0, sizeof(g_vtable));
93
94   g_vtable.pollset_size = sizeof(grpc_pollset);
95   g_vtable.pollset_init = pollset_init;
96   g_vtable.pollset_shutdown = pollset_shutdown;
97   g_vtable.pollset_destroy = pollset_destroy;
98   g_vtable.pollset_work = pollset_work;
99   g_vtable.pollset_kick = pollset_kick;
100   g_vtable.is_any_background_poller_thread = [] { return false; };
101   g_vtable.add_closure_to_background_poller = [](grpc_closure* /*closure*/,
102                                                  grpc_error_handle /*error*/) {
103     return false;
104   };
105   g_vtable.shutdown_background_closure = [] {};
106   g_vtable.shutdown_engine = [] {};
107
108   return &g_vtable;
109 }
110
111 static void setup() {
112   // This test should only ever be run with a non or any polling engine
113   // Override the polling engine for the non-polling engine
114   // and add a custom polling engine
115   grpc_register_event_engine_factory("none", init_engine_vtable, false);
116   grpc_register_event_engine_factory("bm_cq_multiple_threads",
117                                      init_engine_vtable, true);
118
119   grpc_init();
120   GPR_ASSERT(strcmp(grpc_get_poll_strategy_name(), "none") == 0 ||
121              strcmp(grpc_get_poll_strategy_name(), "bm_cq_multiple_threads") ==
122                  0);
123
124   g_cq = grpc_completion_queue_create_for_next(nullptr);
125 }
126
127 static void teardown() {
128   grpc_completion_queue_shutdown(g_cq);
129
130   /* Drain any events */
131   gpr_timespec deadline = gpr_time_0(GPR_CLOCK_MONOTONIC);
132   while (grpc_completion_queue_next(g_cq, deadline, nullptr).type !=
133          GRPC_QUEUE_SHUTDOWN) {
134     /* Do nothing */
135   }
136
137   grpc_completion_queue_destroy(g_cq);
138   grpc_shutdown();
139 }
140
141 /* A few notes about Multi-threaded benchmarks:
142
143  Setup:
144   The benchmark framework ensures that none of the threads proceed beyond the
145   state.KeepRunning() call unless all the threads have called state.keepRunning
146   at least once.  So it is safe to do the initialization in one of the threads
147   before state.KeepRunning() is called.
148
149  Teardown:
150   The benchmark framework also ensures that no thread is running the benchmark
151   code (i.e the code between two successive calls of state.KeepRunning()) if
152   state.KeepRunning() returns false. So it is safe to do the teardown in one
153   of the threads after state.keepRunning() returns false.
154
155  However, our use requires synchronization because we do additional work at
156  each thread that requires specific ordering (TrackCounters must be constructed
157  after grpc_init because it needs the number of cores, initialized by grpc,
158  and its Finish call must take place before grpc_shutdown so that it can use
159  grpc_stats).
160 */
161 static void BM_Cq_Throughput(benchmark::State& state) {
162   gpr_timespec deadline = gpr_inf_future(GPR_CLOCK_MONOTONIC);
163   auto thd_idx = state.thread_index;
164
165   gpr_mu_lock(&g_mu);
166   g_threads_active++;
167   if (thd_idx == 0) {
168     setup();
169     g_active = true;
170     gpr_cv_broadcast(&g_cv);
171   } else {
172     while (!g_active) {
173       gpr_cv_wait(&g_cv, &g_mu, deadline);
174     }
175   }
176   gpr_mu_unlock(&g_mu);
177
178   // Use a TrackCounters object to monitor the gRPC performance statistics
179   // (optionally including low-level counters) before and after the test
180   TrackCounters track_counters;
181
182   for (auto _ : state) {
183     GPR_ASSERT(grpc_completion_queue_next(g_cq, deadline, nullptr).type ==
184                GRPC_OP_COMPLETE);
185   }
186
187   state.SetItemsProcessed(state.iterations());
188   track_counters.Finish(state);
189
190   gpr_mu_lock(&g_mu);
191   g_threads_active--;
192   if (g_threads_active == 0) {
193     gpr_cv_broadcast(&g_cv);
194   } else {
195     while (g_threads_active > 0) {
196       gpr_cv_wait(&g_cv, &g_mu, deadline);
197     }
198   }
199   gpr_mu_unlock(&g_mu);
200
201   if (thd_idx == 0) {
202     teardown();
203     g_active = false;
204   }
205 }
206
207 BENCHMARK(BM_Cq_Throughput)->ThreadRange(1, 16)->UseRealTime();
208
209 }  // namespace testing
210 }  // namespace grpc
211
212 // Some distros have RunSpecifiedBenchmarks under the benchmark namespace,
213 // and others do not. This allows us to support both modes.
214 namespace benchmark {
215 void RunTheBenchmarksNamespaced() { RunSpecifiedBenchmarks(); }
216 }  // namespace benchmark
217
218 int main(int argc, char** argv) {
219   grpc::testing::TestEnvironment env(argc, argv);
220   gpr_mu_init(&g_mu);
221   gpr_cv_init(&g_cv);
222   ::benchmark::Initialize(&argc, argv);
223   ::grpc::testing::InitTest(&argc, &argv, false);
224   benchmark::RunTheBenchmarksNamespaced();
225   return 0;
226 }