3 * Copyright 2015 gRPC authors.
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
27 #include <grpc/grpc.h>
28 #include <grpc/support/alloc.h>
29 #include <grpc/support/log.h>
30 #include <grpc/support/time.h>
31 #include <grpcpp/channel.h>
32 #include <grpcpp/client_context.h>
33 #include <grpcpp/server.h>
34 #include <grpcpp/server_builder.h>
36 #include "src/core/lib/profiling/timers.h"
37 #include "src/proto/grpc/testing/benchmark_service.grpc.pb.h"
38 #include "test/cpp/qps/client.h"
39 #include "test/cpp/qps/interarrival.h"
40 #include "test/cpp/qps/usage_timer.h"
// Stub factory handed to ClientImpl by SynchronousClient's constructor:
// forwards the given channel to BenchmarkService::NewStub and returns the
// resulting stub.
45 static std::unique_ptr<BenchmarkService::Stub> BenchmarkStubCreator(
46 const std::shared_ptr<Channel>& ch) {
47 return BenchmarkService::NewStub(ch);
// Common base of the blocking benchmark clients in this file. It keeps one
// SimpleResponse per thread index (responses_), provides the per-thread
// driver (ThreadFunc) and the pacing helper (WaitToIssue), and leaves the
// actual RPC work to the InitThreadFuncImpl / ThreadFuncImpl hooks.
50 class SynchronousClient
51 : public ClientImpl<BenchmarkService::Stub, SimpleRequest> {
// Hands BenchmarkStubCreator to ClientImpl, sizes responses_ to num_threads_
// and calls SetupLoadTest with that same thread count.
// NOTE(review): the product of outstanding_rpcs_per_channel() and
// client_channels() below is presumably what num_threads_ is set to -- the
// start of that statement is not visible here; confirm.
53 SynchronousClient(const ClientConfig& config)
54 : ClientImpl<BenchmarkService::Stub, SimpleRequest>(
55 config, BenchmarkStubCreator) {
57 config.outstanding_rpcs_per_channel() * config.client_channels();
58 responses_.resize(num_threads_);
59 SetupLoadTest(config, num_threads_);
62 ~SynchronousClient() override {}
// Hooks implemented by subclasses. ThreadFunc calls InitThreadFuncImpl first
// and treats the bool returned by ThreadFuncImpl as "this thread is still ok".
64 virtual bool InitThreadFuncImpl(size_t thread_idx) = 0;
65 virtual bool ThreadFuncImpl(HistogramEntry* entry, size_t thread_idx) = 0;
// Per-thread driver: checks the result of InitThreadFuncImpl, then calls
// ThreadFuncImpl with a histogram entry, hands that entry to
// t->UpdateHistogram, and tests whether the thread failed or
// ThreadCompleted() reports completion.
67 void ThreadFunc(size_t thread_idx, Thread* t) override {
68 if (!InitThreadFuncImpl(thread_idx)) {
74 const bool thread_still_ok = ThreadFuncImpl(&entry, thread_idx);
// The entry is passed to the histogram before the termination check below.
75 t->UpdateHistogram(&entry);
76 if (!thread_still_ok || ThreadCompleted()) {
83 // WaitToIssue returns false if we realize that we need to break out
// Sleeps towards NextIssueTime(thread_idx). If that time is no later than
// "now + 1 second" on the monotonic clock it sleeps until the issue time;
// otherwise it sleeps only until the one-second mark and then looks at
// thread_pool_done_.
// NOTE(review): thread_idx is an int here, while the hooks above (and the
// callers in this file) use size_t -- consider aligning the types.
84 bool WaitToIssue(int thread_idx) {
86 const gpr_timespec next_issue_time = NextIssueTime(thread_idx);
87 // Avoid sleeping for too long continuously because we might
88 // need to terminate before then. This is an issue since
89 // exponential distribution can occasionally produce bad outliers
91 const gpr_timespec one_sec_delay =
92 gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC),
93 gpr_time_from_seconds(1, GPR_TIMESPAN));
94 if (gpr_time_cmp(next_issue_time, one_sec_delay) <= 0) {
95 gpr_sleep_until(next_issue_time);
98 gpr_sleep_until(one_sec_delay);
// Acquire-load of thread_pool_done_; a non-zero value is what this check
// reacts to after a capped sleep.
99 if (gpr_atm_acq_load(&thread_pool_done_) != static_cast<gpr_atm>(0)) {
// One response object per thread index (resized to num_threads_ in the
// constructor); subclasses pass &responses_[thread_idx] to their RPC calls.
109 std::vector<SimpleResponse> responses_;
// Blocking client that issues UnaryCall RPCs through the benchmark stub.
112 class SynchronousUnaryClient final : public SynchronousClient {
// Builds the base client, then starts num_threads_ worker threads.
114 SynchronousUnaryClient(const ClientConfig& config)
115 : SynchronousClient(config) {
116 StartThreads(num_threads_);
118 ~SynchronousUnaryClient() override {}
// No per-thread setup is needed for unary calls; always reports success.
120 bool InitThreadFuncImpl(size_t /*thread_idx*/) override { return true; }
// One unary RPC: waits for the issue slot via WaitToIssue, picks the stub of
// channel (thread_idx % channels_.size()), performs UnaryCall with a fresh
// ClientContext, and writes the elapsed time and the status code of `s` into
// `entry`.
// NOTE(review): the elapsed time is multiplied by 1e9 -- presumably seconds
// to nanoseconds; confirm against UsageTimer::Now().
122 bool ThreadFuncImpl(HistogramEntry* entry, size_t thread_idx) override {
123 if (!WaitToIssue(thread_idx)) {
126 auto* stub = channels_[thread_idx % channels_.size()].get_stub();
// Timing starts here, i.e. after any pacing sleep in WaitToIssue.
127 double start = UsageTimer::Now();
128 GPR_TIMER_SCOPE("SynchronousUnaryClient::ThreadFunc", 0);
129 grpc::ClientContext context;
131 stub->UnaryCall(&context, request_, &responses_[thread_idx]);
133 entry->set_value((UsageTimer::Now() - start) * 1e9);
135 entry->set_status(s.error_code());
// Tears down the worker threads by delegating to EndThreads().
140 void DestroyMultithreading() final { EndThreads(); }
// Shared machinery for the blocking streaming clients. For every thread index
// it keeps a ClientContext, a stream object of type StreamType, a mutex, a
// shutdown flag and a message counter, plus helpers to finish a stream
// (FinishStream) and to shut all streams down (CleanupAllStreams).
143 template <class StreamType>
144 class SynchronousStreamingClient : public SynchronousClient {
// Sizes every per-thread vector to num_threads_, copies
// config.messages_per_stream(), then starts num_threads_ worker threads.
146 SynchronousStreamingClient(const ClientConfig& config)
147 : SynchronousClient(config),
148 context_(num_threads_),
149 stream_(num_threads_),
150 stream_mu_(num_threads_),
151 shutdown_(num_threads_),
152 messages_per_stream_(config.messages_per_stream()),
153 messages_issued_(num_threads_) {
154 StartThreads(num_threads_);
// Runs CleanupAllStreams with a cleaner that calls Finish() on the stream of
// the given thread index and explicitly ignores the returned status.
156 ~SynchronousStreamingClient() override {
157 CleanupAllStreams([this](size_t thread_idx) {
158 // Don't log any kind of error since we may have canceled this
159 stream_[thread_idx]->Finish().IgnoreError();
// Per-thread-index client contexts; FinishStream re-creates an element in
// place after a stream has finished.
164 std::vector<grpc::ClientContext> context_;
// Per-thread-index stream objects; subclasses assign and reset these.
165 std::vector<std::unique_ptr<StreamType>> stream_;
166 // stream_mu_ is only needed when changing an element of stream_ or context_
167 std::vector<std::mutex> stream_mu_;
168 // use struct Bool rather than bool because vector<bool> is not concurrent
// Default constructor of the Bool wrapper: the flag starts out false.
171 Bool() : val(false) {}
// Per-thread-index shutdown flags; CleanupAllStreams sets them to true while
// holding the matching stream_mu_ entry.
173 std::vector<Bool> shutdown_;
// Value of config.messages_per_stream(), fixed at construction.
174 const int messages_per_stream_;
// Per-thread-index count of messages issued on the current stream.
175 std::vector<int> messages_issued_;
// Finishes the stream of `thread_idx`, stores the resulting status code in
// `entry`, logs the error message when the shutdown flag is not set, and then
// replaces context_[thread_idx] with a freshly constructed ClientContext.
177 void FinishStream(HistogramEntry* entry, size_t thread_idx) {
178 Status s = stream_[thread_idx]->Finish();
179 // don't set the value since the stream is failed and shouldn't be timed
180 entry->set_status(s.error_code());
// The shutdown flag is read while holding this thread index's mutex.
182 std::lock_guard<std::mutex> l(stream_mu_[thread_idx]);
183 if (!shutdown_[thread_idx].val) {
184 gpr_log(GPR_ERROR, "Stream %" PRIuPTR " received an error %s",
185 thread_idx, s.error_message().c_str());
188 // Lock the stream_mu_ now because the client context could change
189 std::lock_guard<std::mutex> l(stream_mu_[thread_idx]);
// Re-initialise the context in place: explicit destructor call followed by
// placement new on the same vector element.
190 context_[thread_idx].~ClientContext();
191 new (&context_[thread_idx]) ClientContext();
// Creates one std::thread per thread index. Each thread locks stream_mu_[i]
// and sets shutdown_[i].val to true; `cleaner` is captured by value in each
// thread's lambda. A second loop then walks over all cleanup threads.
// NOTE(review): how `cleaner` is invoked and what the second loop does with
// each thread (presumably join) is not visible here -- confirm.
194 void CleanupAllStreams(const std::function<void(size_t)>& cleaner) {
195 std::vector<std::thread> cleanup_threads;
196 for (size_t i = 0; i < num_threads_; i++) {
197 cleanup_threads.emplace_back([this, i, cleaner] {
198 std::lock_guard<std::mutex> l(stream_mu_[i]);
199 shutdown_[i].val = true;
205 for (auto& th : cleanup_threads) {
// Supplies a per-thread callback that calls TryCancel() on the thread
// index's ClientContext.
// NOTE(review): the function receiving this callback is not visible here --
// presumably CleanupAllStreams; confirm.
211 void DestroyMultithreading() final {
213 [this](size_t thread_idx) { context_[thread_idx].TryCancel(); });
// Streaming client over a ClientReaderWriter<SimpleRequest, SimpleResponse>:
// each iteration writes one request and reads one response on the thread's
// StreamingCall stream.
218 class SynchronousStreamingPingPongClient final
219 : public SynchronousStreamingClient<
220 grpc::ClientReaderWriter<SimpleRequest, SimpleResponse>> {
222 SynchronousStreamingPingPongClient(const ClientConfig& config)
223 : SynchronousStreamingClient(config) {}
// Supplies a per-thread callback that calls WritesDone() on the thread
// index's stream.
// NOTE(review): the function receiving this callback is not visible here --
// presumably CleanupAllStreams; confirm.
224 ~SynchronousStreamingPingPongClient() override {
226 [this](size_t thread_idx) { stream_[thread_idx]->WritesDone(); });
// Per-thread setup: under stream_mu_[thread_idx], opens a StreamingCall
// stream on the thread's channel when the shutdown flag is not set, and
// resets the thread's issued-message counter to 0.
230 bool InitThreadFuncImpl(size_t thread_idx) override {
231 auto* stub = channels_[thread_idx % channels_.size()].get_stub();
232 std::lock_guard<std::mutex> l(stream_mu_[thread_idx]);
233 if (!shutdown_[thread_idx].val) {
234 stream_[thread_idx] = stub->StreamingCall(&context_[thread_idx]);
238 messages_issued_[thread_idx] = 0;
// One ping-pong round trip. After pacing via WaitToIssue, a successful
// Write + Read stores the elapsed time (scaled by 1e9) in `entry`. The
// branches that follow compare the per-stream message counter with
// messages_per_stream_, where a value of 0 is tested separately. The code
// after that calls WritesDone(), FinishStream(), and then either opens a new
// StreamingCall stream or resets the stream pointer, depending on the
// shutdown flag, before clearing the message counter.
242 bool ThreadFuncImpl(HistogramEntry* entry, size_t thread_idx) override {
243 if (!WaitToIssue(thread_idx)) {
246 GPR_TIMER_SCOPE("SynchronousStreamingPingPongClient::ThreadFunc", 0);
247 double start = UsageTimer::Now();
248 if (stream_[thread_idx]->Write(request_) &&
249 stream_[thread_idx]->Read(&responses_[thread_idx])) {
250 entry->set_value((UsageTimer::Now() - start) * 1e9);
251 // don't set the status since there isn't one yet
252 if ((messages_per_stream_ != 0) &&
253 (++messages_issued_[thread_idx] < messages_per_stream_)) {
255 } else if (messages_per_stream_ == 0) {
258 // Fall through to the below resetting code after finish
261 stream_[thread_idx]->WritesDone();
262 FinishStream(entry, thread_idx);
263 auto* stub = channels_[thread_idx % channels_.size()].get_stub();
// The stream pointer is only replaced or reset while holding the mutex.
264 std::lock_guard<std::mutex> l(stream_mu_[thread_idx]);
265 if (!shutdown_[thread_idx].val) {
266 stream_[thread_idx] = stub->StreamingCall(&context_[thread_idx]);
268 stream_[thread_idx].reset();
271 messages_issued_[thread_idx] = 0;
// Streaming client over a ClientWriter<SimpleRequest>: each iteration writes
// one request on the thread's StreamingFromClient stream and records the time
// since the previous write.
276 class SynchronousStreamingFromClientClient final
277 : public SynchronousStreamingClient<grpc::ClientWriter<SimpleRequest>> {
// Builds the streaming base and sizes last_issue_ to num_threads_.
279 SynchronousStreamingFromClientClient(const ClientConfig& config)
280 : SynchronousStreamingClient(config), last_issue_(num_threads_) {}
// Supplies a per-thread callback that calls WritesDone() on the thread
// index's stream.
// NOTE(review): the function receiving this callback is not visible here --
// presumably CleanupAllStreams; confirm.
281 ~SynchronousStreamingFromClientClient() override {
283 [this](size_t thread_idx) { stream_[thread_idx]->WritesDone(); });
// Per-thread-index timestamp (from UsageTimer::Now()) of the most recent
// write, or of stream setup.
287 std::vector<double> last_issue_;
// Per-thread setup: under stream_mu_[thread_idx], opens a StreamingFromClient
// stream (with the thread's response slot as output) when the shutdown flag
// is not set, and stamps last_issue_ with the current time.
289 bool InitThreadFuncImpl(size_t thread_idx) override {
290 auto* stub = channels_[thread_idx % channels_.size()].get_stub();
291 std::lock_guard<std::mutex> l(stream_mu_[thread_idx]);
292 if (!shutdown_[thread_idx].val) {
293 stream_[thread_idx] = stub->StreamingFromClient(&context_[thread_idx],
294 &responses_[thread_idx]);
298 last_issue_[thread_idx] = UsageTimer::Now();
// One write. After pacing via WaitToIssue, a successful Write stores the time
// since the previous issue (scaled by 1e9) in `entry` and updates
// last_issue_. The code after that calls WritesDone(), FinishStream(), and
// then either opens a new StreamingFromClient stream or resets the stream
// pointer, depending on the shutdown flag.
302 bool ThreadFuncImpl(HistogramEntry* entry, size_t thread_idx) override {
303 // Figure out how to make histogram sensible if this is rate-paced
304 if (!WaitToIssue(thread_idx)) {
307 GPR_TIMER_SCOPE("SynchronousStreamingFromClientClient::ThreadFunc", 0);
308 if (stream_[thread_idx]->Write(request_)) {
309 double now = UsageTimer::Now();
310 entry->set_value((now - last_issue_[thread_idx]) * 1e9);
311 last_issue_[thread_idx] = now;
314 stream_[thread_idx]->WritesDone();
315 FinishStream(entry, thread_idx);
316 auto* stub = channels_[thread_idx % channels_.size()].get_stub();
// The stream pointer is only replaced or reset while holding the mutex.
317 std::lock_guard<std::mutex> l(stream_mu_[thread_idx]);
318 if (!shutdown_[thread_idx].val) {
319 stream_[thread_idx] = stub->StreamingFromClient(&context_[thread_idx],
320 &responses_[thread_idx]);
322 stream_[thread_idx].reset();
// Streaming client over a ClientReader<SimpleResponse>: each iteration reads
// one response from the thread's StreamingFromServer stream and records the
// time since the previous read.
329 class SynchronousStreamingFromServerClient final
330 : public SynchronousStreamingClient<grpc::ClientReader<SimpleResponse>> {
// Builds the streaming base and sizes last_recv_ to num_threads_.
332 SynchronousStreamingFromServerClient(const ClientConfig& config)
333 : SynchronousStreamingClient(config), last_recv_(num_threads_) {}
334 ~SynchronousStreamingFromServerClient() override {}
// Per-thread-index timestamp (from UsageTimer::Now()) of the most recent
// read, or of stream setup.
337 std::vector<double> last_recv_;
// Per-thread setup: under stream_mu_[thread_idx], opens a StreamingFromServer
// stream with request_ when the shutdown flag is not set, and stamps
// last_recv_ with the current time.
339 bool InitThreadFuncImpl(size_t thread_idx) override {
340 auto* stub = channels_[thread_idx % channels_.size()].get_stub();
341 std::lock_guard<std::mutex> l(stream_mu_[thread_idx]);
342 if (!shutdown_[thread_idx].val) {
343 stream_[thread_idx] =
344 stub->StreamingFromServer(&context_[thread_idx], request_);
348 last_recv_[thread_idx] = UsageTimer::Now();
// One read. No WaitToIssue call is visible in this method. A successful Read
// stores the time since the previous receive (scaled by 1e9) in `entry` and
// updates last_recv_. The code after that calls FinishStream() and then
// either opens a new StreamingFromServer stream or resets the stream pointer,
// depending on the shutdown flag.
352 bool ThreadFuncImpl(HistogramEntry* entry, size_t thread_idx) override {
353 GPR_TIMER_SCOPE("SynchronousStreamingFromServerClient::ThreadFunc", 0);
354 if (stream_[thread_idx]->Read(&responses_[thread_idx])) {
355 double now = UsageTimer::Now();
356 entry->set_value((now - last_recv_[thread_idx]) * 1e9);
357 last_recv_[thread_idx] = now;
360 FinishStream(entry, thread_idx);
361 auto* stub = channels_[thread_idx % channels_.size()].get_stub();
// The stream pointer is only replaced or reset while holding the mutex.
362 std::lock_guard<std::mutex> l(stream_mu_[thread_idx]);
363 if (!shutdown_[thread_idx].val) {
364 stream_[thread_idx] =
365 stub->StreamingFromServer(&context_[thread_idx], request_);
367 stream_[thread_idx].reset();
// Streaming client over a ClientReaderWriter<SimpleRequest, SimpleResponse>
// using the StreamingBothWays RPC. Only stream setup and teardown are visible
// here; the per-iteration work is still marked TODO.
374 class SynchronousStreamingBothWaysClient final
375 : public SynchronousStreamingClient<
376 grpc::ClientReaderWriter<SimpleRequest, SimpleResponse>> {
378 SynchronousStreamingBothWaysClient(const ClientConfig& config)
379 : SynchronousStreamingClient(config) {}
// Supplies a per-thread callback that calls WritesDone() on the thread
// index's stream.
// NOTE(review): the function receiving this callback is not visible here --
// presumably CleanupAllStreams; confirm.
380 ~SynchronousStreamingBothWaysClient() override {
382 [this](size_t thread_idx) { stream_[thread_idx]->WritesDone(); });
// Per-thread setup: under stream_mu_[thread_idx], opens a StreamingBothWays
// stream on the thread's channel when the shutdown flag is not set.
386 bool InitThreadFuncImpl(size_t thread_idx) override {
387 auto* stub = channels_[thread_idx % channels_.size()].get_stub();
388 std::lock_guard<std::mutex> l(stream_mu_[thread_idx]);
389 if (!shutdown_[thread_idx].val) {
390 stream_[thread_idx] = stub->StreamingBothWays(&context_[thread_idx]);
// Per-iteration hook; both parameters are unused and the body is a TODO.
397 bool ThreadFuncImpl(HistogramEntry* /*entry*/,
398 size_t /*thread_idx*/) override {
399 // TODO (vjpai): Do this
// Factory for the synchronous benchmark clients. Asserts that the coalesce
// API was not requested, then switches on config.rpc_type() and returns a
// newly allocated client of the matching class, owned by a
// std::unique_ptr<Client>.
// NOTE(review): the case labels for the first two returns and any default
// branch are not visible here.
404 std::unique_ptr<Client> CreateSynchronousClient(const ClientConfig& config) {
405 GPR_ASSERT(!config.use_coalesce_api()); // not supported yet.
406 switch (config.rpc_type()) {
408 return std::unique_ptr<Client>(new SynchronousUnaryClient(config));
410 return std::unique_ptr<Client>(
411 new SynchronousStreamingPingPongClient(config));
412 case STREAMING_FROM_CLIENT:
413 return std::unique_ptr<Client>(
414 new SynchronousStreamingFromClientClient(config));
415 case STREAMING_FROM_SERVER:
416 return std::unique_ptr<Client>(
417 new SynchronousStreamingFromServerClient(config));
418 case STREAMING_BOTH_WAYS:
419 return std::unique_ptr<Client>(
420 new SynchronousStreamingBothWaysClient(config));
427 } // namespace testing