3 * Copyright 2017 gRPC authors.
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
19 #include <benchmark/benchmark.h>
23 #include <grpc/grpc.h>
24 #include <grpc/support/alloc.h>
25 #include <grpc/support/log.h>
27 #include "src/core/lib/iomgr/ev_posix.h"
28 #include "src/core/lib/iomgr/port.h"
29 #include "src/core/lib/surface/completion_queue.h"
30 #include "test/core/util/test_config.h"
31 #include "test/cpp/microbenchmarks/helpers.h"
32 #include "test/cpp/util/test_config.h"
// Thread counter consulted at the end of BM_Cq_Throughput: a thread that sees
// it at 0 broadcasts g_cv, and threads wait on g_cv while it is still > 0.
40 static int g_threads_active;
// Completion queue shared by every benchmark thread. Created in setup(),
// fed by pollset_work(), polled in BM_Cq_Throughput and torn down in
// teardown().
45 static grpc_completion_queue* g_cq;
// Event-engine vtable filled in by init_engine_vtable() with the stub
// pollset hooks defined in this file.
46 static grpc_event_engine_vtable g_vtable;
// Pollset shutdown hook installed in g_vtable. The pollset argument is
// ignored; the hook only hands `closure` to grpc_core::ExecCtx::Run with
// GRPC_ERROR_NONE.
48 static void pollset_shutdown(grpc_pollset* /*ps*/, grpc_closure* closure) {
49 grpc_core::ExecCtx::Run(DEBUG_LOCATION, closure, GRPC_ERROR_NONE);
// Pollset init hook installed in g_vtable.
// NOTE(review): presumably initializes ps->mu (the mutex pollset_destroy
// destroys) and returns its address through `mu` -- confirm against the body.
52 static void pollset_init(grpc_pollset* ps, gpr_mu** mu) {
// Pollset destroy hook installed in g_vtable: destroys the pollset's mutex.
57 static void pollset_destroy(grpc_pollset* ps) { gpr_mu_destroy(&ps->mu); }
// Pollset kick hook installed in g_vtable. Both arguments are ignored and
// GRPC_ERROR_NONE is returned.
59 static grpc_error_handle pollset_kick(grpc_pollset* /*p*/,
60 grpc_pollset_worker* /*worker*/) {
61 return GRPC_ERROR_NONE;
64 /* Callback when the tag is dequeued from the completion queue. Frees the
 * grpc_cq_completion that pollset_work allocated with gpr_malloc; the
 * done_arg is unused. */
65 static void cq_done_cb(void* /*done_arg*/, grpc_cq_completion* cq_completion) {
66 gpr_free(cq_completion);
69 /* Queues a completion tag if deadline is > 0.
70 * Does nothing if deadline is 0 (i.e. gpr_time_0(GPR_CLOCK_MONOTONIC)) */
// Installed as g_vtable.pollset_work. The worker out-parameter is unused.
// Returns GRPC_ERROR_NONE on both paths.
71 static grpc_error_handle pollset_work(grpc_pollset* ps,
72 grpc_pollset_worker** /*worker*/,
73 grpc_millis deadline) {
// Early-out path: log and return without touching g_cq.
75 gpr_log(GPR_DEBUG, "no-op");
76 return GRPC_ERROR_NONE;
// Drop ps->mu before operating on the completion queue.
79 gpr_mu_unlock(&ps->mu);
// The completion storage is heap-allocated here and released by cq_done_cb
// once the tag has been dequeued.
81 void* tag = reinterpret_cast<void*>(10); // Some random number
82 GPR_ASSERT(grpc_cq_begin_op(g_cq, tag));
84 g_cq, tag, GRPC_ERROR_NONE, cq_done_cb, nullptr,
85 static_cast<grpc_cq_completion*>(gpr_malloc(sizeof(grpc_cq_completion))));
86 grpc_core::ExecCtx::Get()->Flush();
88 return GRPC_ERROR_NONE;
// Event-engine factory passed to grpc_register_event_engine_factory() in
// setup(). Zeroes g_vtable, then installs the stub pollset hooks from this
// file together with trivial background-poller and shutdown callbacks. The
// bool argument is ignored.
// NOTE(review): presumably returns &g_vtable -- confirm against the full body.
91 static const grpc_event_engine_vtable* init_engine_vtable(bool) {
92 memset(&g_vtable, 0, sizeof(g_vtable));
94 g_vtable.pollset_size = sizeof(grpc_pollset);
95 g_vtable.pollset_init = pollset_init;
96 g_vtable.pollset_shutdown = pollset_shutdown;
97 g_vtable.pollset_destroy = pollset_destroy;
98 g_vtable.pollset_work = pollset_work;
99 g_vtable.pollset_kick = pollset_kick;
// This stub engine reports that no thread is a background poller thread.
100 g_vtable.is_any_background_poller_thread = [] { return false; };
101 g_vtable.add_closure_to_background_poller = [](grpc_closure* /*closure*/,
102 grpc_error_handle /*error*/) {
105 g_vtable.shutdown_background_closure = [] {};
106 g_vtable.shutdown_engine = [] {};
// Benchmark setup: registers the stub event engine under two names, checks
// which poll strategy ended up selected, and creates the shared completion
// queue g_cq.
111 static void setup() {
112 // This test should only ever be run with the "none" or any polling engine.
113 // Override the "none" polling engine with the stub vtable from this file
114 // and also register it as a custom "bm_cq_multiple_threads" engine.
115 grpc_register_event_engine_factory("none", init_engine_vtable, false);
116 grpc_register_event_engine_factory("bm_cq_multiple_threads",
117 init_engine_vtable, true);
// The selected strategy must be one of the two names registered above.
120 GPR_ASSERT(strcmp(grpc_get_poll_strategy_name(), "none") == 0 ||
121 strcmp(grpc_get_poll_strategy_name(), "bm_cq_multiple_threads") ==
124 g_cq = grpc_completion_queue_create_for_next(nullptr);
// Benchmark teardown: shuts g_cq down, polls it with a zero deadline until
// it reports GRPC_QUEUE_SHUTDOWN, then destroys it.
127 static void teardown() {
128 grpc_completion_queue_shutdown(g_cq);
130 /* Drain any events */
131 gpr_timespec deadline = gpr_time_0(GPR_CLOCK_MONOTONIC);
132 while (grpc_completion_queue_next(g_cq, deadline, nullptr).type !=
133 GRPC_QUEUE_SHUTDOWN) {
137 grpc_completion_queue_destroy(g_cq);
141 /* A few notes about Multi-threaded benchmarks:
144 The benchmark framework ensures that none of the threads proceed beyond the
145 state.KeepRunning() call unless all the threads have called state.KeepRunning
146 at least once. So it is safe to do the initialization in one of the threads
147 before state.KeepRunning() is called.
150 The benchmark framework also ensures that no thread is running the benchmark
151 code (i.e. the code between two successive calls of state.KeepRunning()) if
152 state.KeepRunning() returns false. So it is safe to do the teardown in one
153 of the threads after state.KeepRunning() returns false.
155 However, our use requires synchronization because we do additional work at
156 each thread that requires specific ordering (TrackCounters must be constructed
157 after grpc_init because it needs the number of cores, initialized by grpc,
158 and its Finish call must take place before grpc_shutdown so that it can use
// Benchmark body, run concurrently by the threads configured in the
// BENCHMARK registration. Each timed iteration pulls one event from the shared
// g_cq with an infinite deadline and asserts on the event type. The timed
// loop is bracketed by rendezvous phases on g_mu/g_cv.
161 static void BM_Cq_Throughput(benchmark::State& state) {
// Infinite deadline, used for both the condvar waits and the queue polls.
162 gpr_timespec deadline = gpr_inf_future(GPR_CLOCK_MONOTONIC);
163 auto thd_idx = state.thread_index;
// Start-up rendezvous on g_cv.
// NOTE(review): presumably one thread runs setup() and wakes the others --
// confirm against the full body.
170 gpr_cv_broadcast(&g_cv);
173 gpr_cv_wait(&g_cv, &g_mu, deadline);
176 gpr_mu_unlock(&g_mu);
178 // Use a TrackCounters object to monitor the gRPC performance statistics
179 // (optionally including low-level counters) before and after the test
180 TrackCounters track_counters;
182 for (auto _ : state) {
183 GPR_ASSERT(grpc_completion_queue_next(g_cq, deadline, nullptr).type ==
// One item is reported per benchmark iteration.
187 state.SetItemsProcessed(state.iterations());
188 track_counters.Finish(state);
// Shutdown rendezvous: a thread that finds no active threads left wakes the
// waiters; the others block on g_cv until g_threads_active is no longer > 0.
192 if (g_threads_active == 0) {
193 gpr_cv_broadcast(&g_cv);
195 while (g_threads_active > 0) {
196 gpr_cv_wait(&g_cv, &g_mu, deadline);
199 gpr_mu_unlock(&g_mu);
// Register the benchmark over a thread-count range of 1 to 16, reported in
// real (wall-clock) time.
207 BENCHMARK(BM_Cq_Throughput)->ThreadRange(1, 16)->UseRealTime();
209 } // namespace testing
212 // Some distros have RunSpecifiedBenchmarks under the benchmark namespace,
213 // and others do not. This allows us to support both modes.
// The call below is unqualified and sits inside namespace benchmark, so name
// lookup finds RunSpecifiedBenchmarks whether it is declared in that
// namespace or at global scope.
214 namespace benchmark {
215 void RunTheBenchmarksNamespaced() { RunSpecifiedBenchmarks(); }
216 } // namespace benchmark
// Entry point: constructs the gRPC TestEnvironment, passes the command-line
// arguments to ::benchmark::Initialize and ::grpc::testing::InitTest, then
// runs the registered benchmarks through RunTheBenchmarksNamespaced().
218 int main(int argc, char** argv) {
219 grpc::testing::TestEnvironment env(argc, argv);
222 ::benchmark::Initialize(&argc, argv);
223 ::grpc::testing::InitTest(&argc, &argv, false);
224 benchmark::RunTheBenchmarksNamespaced();