Imported Upstream version 1.34.0
[platform/upstream/grpc.git] / test / cpp / qps / client_sync.cc
1 /*
2  *
3  * Copyright 2015 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18
19 #include <chrono>
20 #include <memory>
21 #include <mutex>
22 #include <sstream>
23 #include <string>
24 #include <thread>
25 #include <vector>
26
27 #include <grpc/grpc.h>
28 #include <grpc/support/alloc.h>
29 #include <grpc/support/log.h>
30 #include <grpc/support/time.h>
31 #include <grpcpp/channel.h>
32 #include <grpcpp/client_context.h>
33 #include <grpcpp/server.h>
34 #include <grpcpp/server_builder.h>
35
36 #include "src/core/lib/profiling/timers.h"
37 #include "src/proto/grpc/testing/benchmark_service.grpc.pb.h"
38 #include "test/cpp/qps/client.h"
39 #include "test/cpp/qps/interarrival.h"
40 #include "test/cpp/qps/usage_timer.h"
41
42 namespace grpc {
43 namespace testing {
44
45 static std::unique_ptr<BenchmarkService::Stub> BenchmarkStubCreator(
46     const std::shared_ptr<Channel>& ch) {
47   return BenchmarkService::NewStub(ch);
48 }
49
50 class SynchronousClient
51     : public ClientImpl<BenchmarkService::Stub, SimpleRequest> {
52  public:
53   SynchronousClient(const ClientConfig& config)
54       : ClientImpl<BenchmarkService::Stub, SimpleRequest>(
55             config, BenchmarkStubCreator) {
56     num_threads_ =
57         config.outstanding_rpcs_per_channel() * config.client_channels();
58     responses_.resize(num_threads_);
59     SetupLoadTest(config, num_threads_);
60   }
61
62   ~SynchronousClient() override {}
63
64   virtual bool InitThreadFuncImpl(size_t thread_idx) = 0;
65   virtual bool ThreadFuncImpl(HistogramEntry* entry, size_t thread_idx) = 0;
66
67   void ThreadFunc(size_t thread_idx, Thread* t) override {
68     if (!InitThreadFuncImpl(thread_idx)) {
69       return;
70     }
71     for (;;) {
72       // run the loop body
73       HistogramEntry entry;
74       const bool thread_still_ok = ThreadFuncImpl(&entry, thread_idx);
75       t->UpdateHistogram(&entry);
76       if (!thread_still_ok || ThreadCompleted()) {
77         return;
78       }
79     }
80   }
81
82  protected:
83   // WaitToIssue returns false if we realize that we need to break out
84   bool WaitToIssue(int thread_idx) {
85     if (!closed_loop_) {
86       const gpr_timespec next_issue_time = NextIssueTime(thread_idx);
87       // Avoid sleeping for too long continuously because we might
88       // need to terminate before then. This is an issue since
89       // exponential distribution can occasionally produce bad outliers
90       while (true) {
91         const gpr_timespec one_sec_delay =
92             gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC),
93                          gpr_time_from_seconds(1, GPR_TIMESPAN));
94         if (gpr_time_cmp(next_issue_time, one_sec_delay) <= 0) {
95           gpr_sleep_until(next_issue_time);
96           return true;
97         } else {
98           gpr_sleep_until(one_sec_delay);
99           if (gpr_atm_acq_load(&thread_pool_done_) != static_cast<gpr_atm>(0)) {
100             return false;
101           }
102         }
103       }
104     }
105     return true;
106   }
107
108   size_t num_threads_;
109   std::vector<SimpleResponse> responses_;
110 };
111
112 class SynchronousUnaryClient final : public SynchronousClient {
113  public:
114   SynchronousUnaryClient(const ClientConfig& config)
115       : SynchronousClient(config) {
116     StartThreads(num_threads_);
117   }
118   ~SynchronousUnaryClient() override {}
119
120   bool InitThreadFuncImpl(size_t /*thread_idx*/) override { return true; }
121
122   bool ThreadFuncImpl(HistogramEntry* entry, size_t thread_idx) override {
123     if (!WaitToIssue(thread_idx)) {
124       return true;
125     }
126     auto* stub = channels_[thread_idx % channels_.size()].get_stub();
127     double start = UsageTimer::Now();
128     GPR_TIMER_SCOPE("SynchronousUnaryClient::ThreadFunc", 0);
129     grpc::ClientContext context;
130     grpc::Status s =
131         stub->UnaryCall(&context, request_, &responses_[thread_idx]);
132     if (s.ok()) {
133       entry->set_value((UsageTimer::Now() - start) * 1e9);
134     }
135     entry->set_status(s.error_code());
136     return true;
137   }
138
139  private:
140   void DestroyMultithreading() final { EndThreads(); }
141 };
142
143 template <class StreamType>
144 class SynchronousStreamingClient : public SynchronousClient {
145  public:
146   SynchronousStreamingClient(const ClientConfig& config)
147       : SynchronousClient(config),
148         context_(num_threads_),
149         stream_(num_threads_),
150         stream_mu_(num_threads_),
151         shutdown_(num_threads_),
152         messages_per_stream_(config.messages_per_stream()),
153         messages_issued_(num_threads_) {
154     StartThreads(num_threads_);
155   }
156   ~SynchronousStreamingClient() override {
157     CleanupAllStreams([this](size_t thread_idx) {
158       // Don't log any kind of error since we may have canceled this
159       stream_[thread_idx]->Finish().IgnoreError();
160     });
161   }
162
163  protected:
164   std::vector<grpc::ClientContext> context_;
165   std::vector<std::unique_ptr<StreamType>> stream_;
166   // stream_mu_ is only needed when changing an element of stream_ or context_
167   std::vector<std::mutex> stream_mu_;
168   // use struct Bool rather than bool because vector<bool> is not concurrent
169   struct Bool {
170     bool val;
171     Bool() : val(false) {}
172   };
173   std::vector<Bool> shutdown_;
174   const int messages_per_stream_;
175   std::vector<int> messages_issued_;
176
177   void FinishStream(HistogramEntry* entry, size_t thread_idx) {
178     Status s = stream_[thread_idx]->Finish();
179     // don't set the value since the stream is failed and shouldn't be timed
180     entry->set_status(s.error_code());
181     if (!s.ok()) {
182       std::lock_guard<std::mutex> l(stream_mu_[thread_idx]);
183       if (!shutdown_[thread_idx].val) {
184         gpr_log(GPR_ERROR, "Stream %" PRIuPTR " received an error %s",
185                 thread_idx, s.error_message().c_str());
186       }
187     }
188     // Lock the stream_mu_ now because the client context could change
189     std::lock_guard<std::mutex> l(stream_mu_[thread_idx]);
190     context_[thread_idx].~ClientContext();
191     new (&context_[thread_idx]) ClientContext();
192   }
193
194   void CleanupAllStreams(const std::function<void(size_t)>& cleaner) {
195     std::vector<std::thread> cleanup_threads;
196     for (size_t i = 0; i < num_threads_; i++) {
197       cleanup_threads.emplace_back([this, i, cleaner] {
198         std::lock_guard<std::mutex> l(stream_mu_[i]);
199         shutdown_[i].val = true;
200         if (stream_[i]) {
201           cleaner(i);
202         }
203       });
204     }
205     for (auto& th : cleanup_threads) {
206       th.join();
207     }
208   }
209
210  private:
211   void DestroyMultithreading() final {
212     CleanupAllStreams(
213         [this](size_t thread_idx) { context_[thread_idx].TryCancel(); });
214     EndThreads();
215   }
216 };
217
218 class SynchronousStreamingPingPongClient final
219     : public SynchronousStreamingClient<
220           grpc::ClientReaderWriter<SimpleRequest, SimpleResponse>> {
221  public:
222   SynchronousStreamingPingPongClient(const ClientConfig& config)
223       : SynchronousStreamingClient(config) {}
224   ~SynchronousStreamingPingPongClient() override {
225     CleanupAllStreams(
226         [this](size_t thread_idx) { stream_[thread_idx]->WritesDone(); });
227   }
228
229  private:
230   bool InitThreadFuncImpl(size_t thread_idx) override {
231     auto* stub = channels_[thread_idx % channels_.size()].get_stub();
232     std::lock_guard<std::mutex> l(stream_mu_[thread_idx]);
233     if (!shutdown_[thread_idx].val) {
234       stream_[thread_idx] = stub->StreamingCall(&context_[thread_idx]);
235     } else {
236       return false;
237     }
238     messages_issued_[thread_idx] = 0;
239     return true;
240   }
241
242   bool ThreadFuncImpl(HistogramEntry* entry, size_t thread_idx) override {
243     if (!WaitToIssue(thread_idx)) {
244       return true;
245     }
246     GPR_TIMER_SCOPE("SynchronousStreamingPingPongClient::ThreadFunc", 0);
247     double start = UsageTimer::Now();
248     if (stream_[thread_idx]->Write(request_) &&
249         stream_[thread_idx]->Read(&responses_[thread_idx])) {
250       entry->set_value((UsageTimer::Now() - start) * 1e9);
251       // don't set the status since there isn't one yet
252       if ((messages_per_stream_ != 0) &&
253           (++messages_issued_[thread_idx] < messages_per_stream_)) {
254         return true;
255       } else if (messages_per_stream_ == 0) {
256         return true;
257       } else {
258         // Fall through to the below resetting code after finish
259       }
260     }
261     stream_[thread_idx]->WritesDone();
262     FinishStream(entry, thread_idx);
263     auto* stub = channels_[thread_idx % channels_.size()].get_stub();
264     std::lock_guard<std::mutex> l(stream_mu_[thread_idx]);
265     if (!shutdown_[thread_idx].val) {
266       stream_[thread_idx] = stub->StreamingCall(&context_[thread_idx]);
267     } else {
268       stream_[thread_idx].reset();
269       return false;
270     }
271     messages_issued_[thread_idx] = 0;
272     return true;
273   }
274 };
275
276 class SynchronousStreamingFromClientClient final
277     : public SynchronousStreamingClient<grpc::ClientWriter<SimpleRequest>> {
278  public:
279   SynchronousStreamingFromClientClient(const ClientConfig& config)
280       : SynchronousStreamingClient(config), last_issue_(num_threads_) {}
281   ~SynchronousStreamingFromClientClient() override {
282     CleanupAllStreams(
283         [this](size_t thread_idx) { stream_[thread_idx]->WritesDone(); });
284   }
285
286  private:
287   std::vector<double> last_issue_;
288
289   bool InitThreadFuncImpl(size_t thread_idx) override {
290     auto* stub = channels_[thread_idx % channels_.size()].get_stub();
291     std::lock_guard<std::mutex> l(stream_mu_[thread_idx]);
292     if (!shutdown_[thread_idx].val) {
293       stream_[thread_idx] = stub->StreamingFromClient(&context_[thread_idx],
294                                                       &responses_[thread_idx]);
295     } else {
296       return false;
297     }
298     last_issue_[thread_idx] = UsageTimer::Now();
299     return true;
300   }
301
302   bool ThreadFuncImpl(HistogramEntry* entry, size_t thread_idx) override {
303     // Figure out how to make histogram sensible if this is rate-paced
304     if (!WaitToIssue(thread_idx)) {
305       return true;
306     }
307     GPR_TIMER_SCOPE("SynchronousStreamingFromClientClient::ThreadFunc", 0);
308     if (stream_[thread_idx]->Write(request_)) {
309       double now = UsageTimer::Now();
310       entry->set_value((now - last_issue_[thread_idx]) * 1e9);
311       last_issue_[thread_idx] = now;
312       return true;
313     }
314     stream_[thread_idx]->WritesDone();
315     FinishStream(entry, thread_idx);
316     auto* stub = channels_[thread_idx % channels_.size()].get_stub();
317     std::lock_guard<std::mutex> l(stream_mu_[thread_idx]);
318     if (!shutdown_[thread_idx].val) {
319       stream_[thread_idx] = stub->StreamingFromClient(&context_[thread_idx],
320                                                       &responses_[thread_idx]);
321     } else {
322       stream_[thread_idx].reset();
323       return false;
324     }
325     return true;
326   }
327 };
328
329 class SynchronousStreamingFromServerClient final
330     : public SynchronousStreamingClient<grpc::ClientReader<SimpleResponse>> {
331  public:
332   SynchronousStreamingFromServerClient(const ClientConfig& config)
333       : SynchronousStreamingClient(config), last_recv_(num_threads_) {}
334   ~SynchronousStreamingFromServerClient() override {}
335
336  private:
337   std::vector<double> last_recv_;
338
339   bool InitThreadFuncImpl(size_t thread_idx) override {
340     auto* stub = channels_[thread_idx % channels_.size()].get_stub();
341     std::lock_guard<std::mutex> l(stream_mu_[thread_idx]);
342     if (!shutdown_[thread_idx].val) {
343       stream_[thread_idx] =
344           stub->StreamingFromServer(&context_[thread_idx], request_);
345     } else {
346       return false;
347     }
348     last_recv_[thread_idx] = UsageTimer::Now();
349     return true;
350   }
351
352   bool ThreadFuncImpl(HistogramEntry* entry, size_t thread_idx) override {
353     GPR_TIMER_SCOPE("SynchronousStreamingFromServerClient::ThreadFunc", 0);
354     if (stream_[thread_idx]->Read(&responses_[thread_idx])) {
355       double now = UsageTimer::Now();
356       entry->set_value((now - last_recv_[thread_idx]) * 1e9);
357       last_recv_[thread_idx] = now;
358       return true;
359     }
360     FinishStream(entry, thread_idx);
361     auto* stub = channels_[thread_idx % channels_.size()].get_stub();
362     std::lock_guard<std::mutex> l(stream_mu_[thread_idx]);
363     if (!shutdown_[thread_idx].val) {
364       stream_[thread_idx] =
365           stub->StreamingFromServer(&context_[thread_idx], request_);
366     } else {
367       stream_[thread_idx].reset();
368       return false;
369     }
370     return true;
371   }
372 };
373
374 class SynchronousStreamingBothWaysClient final
375     : public SynchronousStreamingClient<
376           grpc::ClientReaderWriter<SimpleRequest, SimpleResponse>> {
377  public:
378   SynchronousStreamingBothWaysClient(const ClientConfig& config)
379       : SynchronousStreamingClient(config) {}
380   ~SynchronousStreamingBothWaysClient() override {
381     CleanupAllStreams(
382         [this](size_t thread_idx) { stream_[thread_idx]->WritesDone(); });
383   }
384
385  private:
386   bool InitThreadFuncImpl(size_t thread_idx) override {
387     auto* stub = channels_[thread_idx % channels_.size()].get_stub();
388     std::lock_guard<std::mutex> l(stream_mu_[thread_idx]);
389     if (!shutdown_[thread_idx].val) {
390       stream_[thread_idx] = stub->StreamingBothWays(&context_[thread_idx]);
391     } else {
392       return false;
393     }
394     return true;
395   }
396
397   bool ThreadFuncImpl(HistogramEntry* /*entry*/,
398                       size_t /*thread_idx*/) override {
399     // TODO (vjpai): Do this
400     return true;
401   }
402 };
403
404 std::unique_ptr<Client> CreateSynchronousClient(const ClientConfig& config) {
405   GPR_ASSERT(!config.use_coalesce_api());  // not supported yet.
406   switch (config.rpc_type()) {
407     case UNARY:
408       return std::unique_ptr<Client>(new SynchronousUnaryClient(config));
409     case STREAMING:
410       return std::unique_ptr<Client>(
411           new SynchronousStreamingPingPongClient(config));
412     case STREAMING_FROM_CLIENT:
413       return std::unique_ptr<Client>(
414           new SynchronousStreamingFromClientClient(config));
415     case STREAMING_FROM_SERVER:
416       return std::unique_ptr<Client>(
417           new SynchronousStreamingFromServerClient(config));
418     case STREAMING_BOTH_WAYS:
419       return std::unique_ptr<Client>(
420           new SynchronousStreamingBothWaysClient(config));
421     default:
422       assert(false);
423       return nullptr;
424   }
425 }
426
427 }  // namespace testing
428 }  // namespace grpc