3 * Copyright 2016 gRPC authors.
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
19 /* Benchmark gRPC end2end in various configurations */
21 #ifndef TEST_CPP_MICROBENCHMARKS_FULLSTACK_STREAMING_PING_PONG_H
22 #define TEST_CPP_MICROBENCHMARKS_FULLSTACK_STREAMING_PING_PONG_H
24 #include <benchmark/benchmark.h>
26 #include "src/core/lib/profiling/timers.h"
27 #include "src/proto/grpc/testing/echo.grpc.pb.h"
28 #include "test/cpp/microbenchmarks/fullstack_context_mutators.h"
29 #include "test/cpp/microbenchmarks/fullstack_fixtures.h"
34 /*******************************************************************************
35 * BENCHMARKING KERNELS
// Wraps an integer in the void* form that this file passes as a
// completion-queue tag; the benchmarks below recover the integer again with
// reinterpret_cast<intptr_t>.
38 static void* tag(intptr_t x) { return reinterpret_cast<void*>(x); }
40 // Repeatedly makes Streaming Bidi calls (exchanging a configurable number of
41 // messages in each call) in a loop on a single channel.
43 // First parameter (i.e. state.range(0)): Message size (in bytes) to use
44 // Second parameter (i.e. state.range(1)): Number of ping pong messages.
45 // Note: One ping-pong means two messages (one from client to server and
46 // the other from server to client).
//
// Template parameters: Fixture is constructed from the async service and is
// used here for channel(), cq() and Finish(state); ClientContextMutator and
// ServerContextMutator are constructed on the client/server context of every
// call.
47 template <class Fixture, class ClientContextMutator, class ServerContextMutator>
48 static void BM_StreamingPingPong(benchmark::State& state) {
49 const int msg_size = state.range(0);
50 const int max_ping_pongs = state.range(1);
52 EchoTestService::AsyncService service;
53 std::unique_ptr<Fixture> fixture(new Fixture(&service));
// send_* carry the payloads written by each side; recv_* are the buffers the
// opposite side reads into.
55 EchoResponse send_response;
56 EchoResponse recv_response;
57 EchoRequest send_request;
58 EchoRequest recv_request;
// Payloads are msg_size bytes of 'a' (request) and 'b' (response).
61 send_request.set_message(std::string(msg_size, 'a'));
62 send_response.set_message(std::string(msg_size, 'b'));
65 std::unique_ptr<EchoTestService::Stub> stub(
66 EchoTestService::NewStub(fixture->channel()));
// Every benchmark iteration sets up, runs and finishes one bidi call.
68 for (auto _ : state) {
69 ServerContext svr_ctx;
70 ServerContextMutator svr_ctx_mut(&svr_ctx);
71 ServerAsyncReaderWriter<EchoResponse, EchoRequest> response_rw(&svr_ctx);
// Server side: request an incoming bidi stream; completion is reported as
// tag 0 on the fixture's completion queue.
72 service.RequestBidiStream(&svr_ctx, &response_rw, fixture->cq(),
73 fixture->cq(), tag(0));
75 ClientContext cli_ctx;
76 ClientContextMutator cli_ctx_mut(&cli_ctx);
// Client side: start the call; completion is reported as tag 1.
77 auto request_rw = stub->AsyncBidiStream(&cli_ctx, fixture->cq(), tag(1));
79 // Establish async stream between client side and server side
// need_tags is a bitmask: bit i stays set while tag i is still outstanding.
82 int need_tags = (1 << 0) | (1 << 1);
84 GPR_ASSERT(fixture->cq()->Next(&t, &ok));
// Turn the returned tag back into its integer, check that it was expected,
// then clear its bit.
86 int i = static_cast<int>(reinterpret_cast<intptr_t>(t));
87 GPR_ASSERT(need_tags & (1 << i));
88 need_tags &= ~(1 << i);
91 // Send 'max_ping_pongs' number of ping pong messages
92 int ping_pong_cnt = 0;
93 while (ping_pong_cnt < max_ping_pongs) {
94 request_rw->Write(send_request, tag(0)); // Start client send
95 response_rw.Read(&recv_request, tag(1)); // Start server recv
96 request_rw->Read(&recv_response, tag(2)); // Start client recv
// Bits 0-2 are the three operations started above; bit 3 is the server write
// (tag 3) that is issued below once the server receive has completed.
98 need_tags = (1 << 0) | (1 << 1) | (1 << 2) | (1 << 3);
100 GPR_ASSERT(fixture->cq()->Next(&t, &ok));
102 int i = static_cast<int>(reinterpret_cast<intptr_t>(t));
104 // If server recv is complete, start the server send operation
106 response_rw.Write(send_response, tag(3));
109 GPR_ASSERT(need_tags & (1 << i));
110 need_tags &= ~(1 << i);
// Tear the call down: client WritesDone (tag 0), server Finish with
// Status::OK (tag 1) and client Finish, which fills recv_status (tag 2).
116 request_rw->WritesDone(tag(0));
117 response_rw.Finish(Status::OK, tag(1));
120 request_rw->Finish(&recv_status, tag(2));
122 need_tags = (1 << 0) | (1 << 1) | (1 << 2);
124 GPR_ASSERT(fixture->cq()->Next(&t, &ok));
125 int i = static_cast<int>(reinterpret_cast<intptr_t>(t));
126 GPR_ASSERT(need_tags & (1 << i));
127 need_tags &= ~(1 << i);
// The status received by the client must be OK.
130 GPR_ASSERT(recv_status.ok());
134 fixture->Finish(state);
// Each ping-pong moves msg_size bytes in both directions, hence the factor 2.
136 state.SetBytesProcessed(msg_size * state.iterations() * max_ping_pongs * 2);
139 // Repeatedly sends ping pong messages in a single streaming Bidi call in a loop
140 // First parameter (i.e. state.range(0)): Message size (in bytes) to use
//
// Unlike the per-call benchmarks in this file, the bidi call is set up once,
// before the benchmark loop, and each benchmark iteration is one ping-pong on
// that call.
141 template <class Fixture, class ClientContextMutator, class ServerContextMutator>
142 static void BM_StreamingPingPongMsgs(benchmark::State& state) {
143 const int msg_size = state.range(0);
145 EchoTestService::AsyncService service;
146 std::unique_ptr<Fixture> fixture(new Fixture(&service));
// send_* carry the payloads written by each side; recv_* are the buffers the
// opposite side reads into.
148 EchoResponse send_response;
149 EchoResponse recv_response;
150 EchoRequest send_request;
151 EchoRequest recv_request;
// Payloads are msg_size bytes of 'a' (request) and 'b' (response).
154 send_request.set_message(std::string(msg_size, 'a'));
155 send_response.set_message(std::string(msg_size, 'b'));
158 std::unique_ptr<EchoTestService::Stub> stub(
159 EchoTestService::NewStub(fixture->channel()));
161 ServerContext svr_ctx;
162 ServerContextMutator svr_ctx_mut(&svr_ctx);
163 ServerAsyncReaderWriter<EchoResponse, EchoRequest> response_rw(&svr_ctx);
// Server side: request an incoming bidi stream; completion is reported as
// tag 0 on the fixture's completion queue.
164 service.RequestBidiStream(&svr_ctx, &response_rw, fixture->cq(),
165 fixture->cq(), tag(0));
167 ClientContext cli_ctx;
168 ClientContextMutator cli_ctx_mut(&cli_ctx);
// Client side: start the call; completion is reported as tag 1.
169 auto request_rw = stub->AsyncBidiStream(&cli_ctx, fixture->cq(), tag(1));
171 // Establish async stream between client side and server side
// need_tags is a bitmask: bit i stays set while tag i is still outstanding.
174 int need_tags = (1 << 0) | (1 << 1);
176 GPR_ASSERT(fixture->cq()->Next(&t, &ok));
// Turn the returned tag back into its integer, check that it was expected,
// then clear its bit.
178 int i = static_cast<int>(reinterpret_cast<intptr_t>(t));
179 GPR_ASSERT(need_tags & (1 << i));
180 need_tags &= ~(1 << i);
// Measured region: one client->server message and one server->client message
// per benchmark iteration.
183 for (auto _ : state) {
184 GPR_TIMER_SCOPE("BenchmarkCycle", 0);
185 request_rw->Write(send_request, tag(0)); // Start client send
186 response_rw.Read(&recv_request, tag(1)); // Start server recv
187 request_rw->Read(&recv_response, tag(2)); // Start client recv
// Bits 0-2 are the three operations started above; bit 3 is the server write
// (tag 3) that is issued below once the server receive has completed.
189 need_tags = (1 << 0) | (1 << 1) | (1 << 2) | (1 << 3);
191 GPR_ASSERT(fixture->cq()->Next(&t, &ok));
193 int i = static_cast<int>(reinterpret_cast<intptr_t>(t));
195 // If server recv is complete, start the server send operation
197 response_rw.Write(send_response, tag(3));
200 GPR_ASSERT(need_tags & (1 << i));
201 need_tags &= ~(1 << i);
// Tear the call down: client WritesDone (tag 0), server Finish with
// Status::OK (tag 1) and client Finish, which fills recv_status (tag 2).
205 request_rw->WritesDone(tag(0));
206 response_rw.Finish(Status::OK, tag(1));
208 request_rw->Finish(&recv_status, tag(2));
210 need_tags = (1 << 0) | (1 << 1) | (1 << 2);
212 GPR_ASSERT(fixture->cq()->Next(&t, &ok));
213 int i = static_cast<int>(reinterpret_cast<intptr_t>(t));
214 GPR_ASSERT(need_tags & (1 << i));
215 need_tags &= ~(1 << i);
// The status received by the client must be OK.
218 GPR_ASSERT(recv_status.ok());
221 fixture->Finish(state);
// One iteration is one ping-pong: msg_size bytes in each direction, hence the
// factor 2.
223 state.SetBytesProcessed(msg_size * state.iterations() * 2);
226 // Repeatedly makes Streaming Bidi calls (exchanging a configurable number of
227 // messages in each call) in a loop on a single channel. Different from
228 // BM_StreamingPingPong we are using stream coalescing api, e.g. WriteLast,
229 // WriteAndFinish, set_initial_metadata_corked. These apis aim at saving
230 // sendmsg syscalls for streaming by coalescing 1. initial metadata with first
231 // message; 2. final streaming message with trailing metadata.
233 // First parameter (i.e. state.range(0)): Message size (in bytes) to use
234 // Second parameter (i.e. state.range(1)): Number of ping pong messages.
235 // Note: One ping-pong means two messages (one from client to server and
236 // the other from server to client).
237 // Third parameter (i.e. state.range(2)): Switch between using WriteAndFinish
238 // API and WriteLast API for server.
//
// Tags used in this function: 0 server RequestBidiStream, 1 client
// AsyncBidiStream, 2 client Write/WriteLast, 3 server Read, 4 client Read,
// 5 server Write/WriteLast, 6 client WritesDone, 7 server Finish,
// 8 client Finish.
239 template <class Fixture, class ClientContextMutator, class ServerContextMutator>
240 static void BM_StreamingPingPongWithCoalescingApi(benchmark::State& state) {
241 const int msg_size = state.range(0);
242 const int max_ping_pongs = state.range(1);
243 // This option is used to test out the server APIs WriteLast and WriteAndFinish
244 // respectively, since we can not use both of them on server side at the same
245 // time. Value 1 means we are testing out the WriteAndFinish API, and
246 // otherwise we are testing out the WriteLast API.
247 const int write_and_finish = state.range(2);
249 EchoTestService::AsyncService service;
250 std::unique_ptr<Fixture> fixture(new Fixture(&service));
// send_* carry the payloads written by each side; recv_* are the buffers the
// opposite side reads into.
252 EchoResponse send_response;
253 EchoResponse recv_response;
254 EchoRequest send_request;
255 EchoRequest recv_request;
// Payloads are msg_size bytes of 'a' (request) and 'b' (response).
258 send_request.set_message(std::string(msg_size, 'a'));
259 send_response.set_message(std::string(msg_size, 'b'));
262 std::unique_ptr<EchoTestService::Stub> stub(
263 EchoTestService::NewStub(fixture->channel()));
// Every benchmark iteration sets up, runs and finishes one bidi call.
265 for (auto _ : state) {
266 ServerContext svr_ctx;
267 ServerContextMutator svr_ctx_mut(&svr_ctx);
268 ServerAsyncReaderWriter<EchoResponse, EchoRequest> response_rw(&svr_ctx);
// Server side: request an incoming bidi stream; completion is reported as
// tag 0 on the fixture's completion queue.
269 service.RequestBidiStream(&svr_ctx, &response_rw, fixture->cq(),
270 fixture->cq(), tag(0));
272 ClientContext cli_ctx;
273 ClientContextMutator cli_ctx_mut(&cli_ctx);
274 cli_ctx.set_initial_metadata_corked(true);
275 // tag:1 here will never come up, since we are not performing any op due
276 // to initial metadata coalescing.
277 auto request_rw = stub->AsyncBidiStream(&cli_ctx, fixture->cq(), tag(1));
283 // Send 'max_ping_pongs' number of ping pong messages
284 int ping_pong_cnt = 0;
285 while (ping_pong_cnt < max_ping_pongs) {
// The final client message of the call goes out with WriteLast, earlier ones
// with Write; both complete as tag 2.
286 if (ping_pong_cnt == max_ping_pongs - 1) {
287 request_rw->WriteLast(send_request, WriteOptions(), tag(2));
289 request_rw->Write(send_request, tag(2)); // Start client send
// await_tags: bitmask of tags that must be drained before this ping-pong
// round is considered complete.
292 int await_tags = (1 << 2);
294 if (ping_pong_cnt == 0) {
295 // wait for the server call structure (call_hook, etc.) to be
296 // initialized (async stream between client side and server side
297 // established). It is necessary when client init metadata is
299 GPR_ASSERT(fixture->cq()->Next(&t, &ok));
300 while (static_cast<int>(reinterpret_cast<intptr_t>(t)) != 0) {
301 // In some cases tag:2 comes before tag:0 (write tag comes out
302 // first), this while loop is to make sure we get tag:0.
303 int i = static_cast<int>(reinterpret_cast<intptr_t>(t));
304 GPR_ASSERT(await_tags & (1 << i));
305 await_tags &= ~(1 << i);
306 GPR_ASSERT(fixture->cq()->Next(&t, &ok));
310 response_rw.Read(&recv_request, tag(3)); // Start server recv
311 request_rw->Read(&recv_response, tag(4)); // Start client recv
// Tags 3 and 4 are now outstanding as well. expect_tags snapshots the tags
// that may legitimately complete right now; tag 5 (server write) is awaited
// too, but it is only added to expect_tags once that write has been issued.
313 await_tags |= (1 << 3) | (1 << 4);
314 expect_tags = await_tags;
315 await_tags |= (1 << 5);
317 while (await_tags != 0) {
318 GPR_ASSERT(fixture->cq()->Next(&t, &ok));
320 int i = static_cast<int>(reinterpret_cast<intptr_t>(t));
322 // If server recv is complete, start the server send operation
324 if (ping_pong_cnt == max_ping_pongs - 1) {
325 if (write_and_finish == 1) {
326 response_rw.WriteAndFinish(send_response, WriteOptions(),
328 expect_tags |= (1 << 5);
330 response_rw.WriteLast(send_response, WriteOptions(), tag(5));
331 // WriteLast buffers the write, so it's possible neither server
332 // write op nor client read op will finish inside the while
// Hence stop awaiting tags 4 and 5 in this loop; whatever is still set in
// expect_tags is drained by the final loop over expect_tags below.
334 await_tags &= ~(1 << 4);
335 await_tags &= ~(1 << 5);
336 expect_tags |= (1 << 5);
339 response_rw.Write(send_response, tag(5));
340 expect_tags |= (1 << 5);
// Check that the completed tag was expected, then clear it from both masks.
344 GPR_ASSERT(expect_tags & (1 << i));
345 expect_tags &= ~(1 << i);
346 await_tags &= ~(1 << i);
// Add the shutdown tags that still have to be waited for: 6, 7 and 8 when no
// message was exchanged; only 8 on the WriteAndFinish path; 7 and 8 on the
// WriteLast path.
352 if (max_ping_pongs == 0) {
353 expect_tags |= (1 << 6) | (1 << 7) | (1 << 8);
355 if (write_and_finish == 1) {
356 expect_tags |= (1 << 8);
358 // server's buffered write and the client's read of the buffered write
359 // tags should come up.
360 expect_tags |= (1 << 7) | (1 << 8);
364 // No message write or initial metadata write happened yet.
365 if (max_ping_pongs == 0) {
366 request_rw->WritesDone(tag(6));
367 // wait for server call data structure(call_hook, etc.) to be
368 // initialized, since initial metadata is corked.
369 GPR_ASSERT(fixture->cq()->Next(&t, &ok));
// Consume any other expected tag that shows up before tag 0.
370 while (static_cast<int>(reinterpret_cast<intptr_t>(t)) != 0) {
371 int i = static_cast<int>(reinterpret_cast<intptr_t>(t));
372 GPR_ASSERT(expect_tags & (1 << i));
373 expect_tags &= ~(1 << i);
374 GPR_ASSERT(fixture->cq()->Next(&t, &ok));
376 response_rw.Finish(Status::OK, tag(7));
// A separate server Finish (tag 7) is issued only when WriteAndFinish was not
// used for the last server message.
378 if (write_and_finish != 1) {
379 response_rw.Finish(Status::OK, tag(7));
// Client Finish fills recv_status and completes as tag 8.
384 request_rw->Finish(&recv_status, tag(8));
// Drain every tag that is still expected for this call.
386 while (expect_tags) {
387 GPR_ASSERT(fixture->cq()->Next(&t, &ok));
388 int i = static_cast<int>(reinterpret_cast<intptr_t>(t));
389 GPR_ASSERT(expect_tags & (1 << i));
390 expect_tags &= ~(1 << i);
// The status received by the client must be OK.
393 GPR_ASSERT(recv_status.ok());
397 fixture->Finish(state);
// Each ping-pong moves msg_size bytes in both directions, hence the factor 2.
399 state.SetBytesProcessed(msg_size * state.iterations() * max_ping_pongs * 2);
401 } // namespace testing
404 #endif // TEST_CPP_MICROBENCHMARKS_FULLSTACK_STREAMING_PING_PONG_H