Imported Upstream version 1.34.0
[platform/upstream/grpc.git] / test / cpp / qps / client_callback.cc
1 /*
2  *
3  * Copyright 2015 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18
19 #include <list>
20 #include <memory>
21 #include <mutex>
22 #include <sstream>
23 #include <string>
24 #include <thread>
25 #include <utility>
26 #include <vector>
27
28 #include <grpc/grpc.h>
29 #include <grpc/support/cpu.h>
30 #include <grpc/support/log.h>
31 #include <grpcpp/alarm.h>
32 #include <grpcpp/channel.h>
33 #include <grpcpp/client_context.h>
34
35 #include "absl/memory/memory.h"
36
37 #include "src/proto/grpc/testing/benchmark_service.grpc.pb.h"
38 #include "test/cpp/qps/client.h"
39 #include "test/cpp/qps/usage_timer.h"
40
41 namespace grpc {
42 namespace testing {
43
44 /**
45  * Maintains context info per RPC
46  */
47 struct CallbackClientRpcContext {
48   CallbackClientRpcContext(BenchmarkService::Stub* stub)
49       : alarm_(nullptr), stub_(stub) {}
50
51   ~CallbackClientRpcContext() {}
52
53   SimpleResponse response_;
54   ClientContext context_;
55   std::unique_ptr<Alarm> alarm_;
56   BenchmarkService::Stub* stub_;
57 };
58
59 static std::unique_ptr<BenchmarkService::Stub> BenchmarkStubCreator(
60     const std::shared_ptr<Channel>& ch) {
61   return BenchmarkService::NewStub(ch);
62 }
63
64 class CallbackClient
65     : public ClientImpl<BenchmarkService::Stub, SimpleRequest> {
66  public:
67   CallbackClient(const ClientConfig& config)
68       : ClientImpl<BenchmarkService::Stub, SimpleRequest>(
69             config, BenchmarkStubCreator) {
70     num_threads_ = NumThreads(config);
71     rpcs_done_ = 0;
72
73     //  Don't divide the fixed load among threads as the user threads
74     //  only bootstrap the RPCs
75     SetupLoadTest(config, 1);
76     total_outstanding_rpcs_ =
77         config.client_channels() * config.outstanding_rpcs_per_channel();
78   }
79
80   ~CallbackClient() override {}
81
82   /**
83    * The main thread of the benchmark will be waiting on DestroyMultithreading.
84    * Increment the rpcs_done_ variable to signify that the Callback RPC
85    * after thread completion is done. When the last outstanding rpc increments
86    * the counter it should also signal the main thread's conditional variable.
87    */
88   void NotifyMainThreadOfThreadCompletion() {
89     std::lock_guard<std::mutex> l(shutdown_mu_);
90     rpcs_done_++;
91     if (rpcs_done_ == total_outstanding_rpcs_) {
92       shutdown_cv_.notify_one();
93     }
94   }
95
96   gpr_timespec NextRPCIssueTime() {
97     std::lock_guard<std::mutex> l(next_issue_time_mu_);
98     return Client::NextIssueTime(0);
99   }
100
101  protected:
102   size_t num_threads_;
103   size_t total_outstanding_rpcs_;
104   // The below mutex and condition variable is used by main benchmark thread to
105   // wait on completion of all RPCs before shutdown
106   std::mutex shutdown_mu_;
107   std::condition_variable shutdown_cv_;
108   // Number of rpcs done after thread completion
109   size_t rpcs_done_;
110   // Vector of Context data pointers for running a RPC
111   std::vector<std::unique_ptr<CallbackClientRpcContext>> ctx_;
112
113   virtual void InitThreadFuncImpl(size_t thread_idx) = 0;
114   virtual bool ThreadFuncImpl(Thread* t, size_t thread_idx) = 0;
115
116   void ThreadFunc(size_t thread_idx, Thread* t) override {
117     InitThreadFuncImpl(thread_idx);
118     ThreadFuncImpl(t, thread_idx);
119   }
120
121  private:
122   std::mutex next_issue_time_mu_;  // Used by next issue time
123
124   int NumThreads(const ClientConfig& config) {
125     int num_threads = config.async_client_threads();
126     if (num_threads <= 0) {  // Use dynamic sizing
127       num_threads = cores_;
128       gpr_log(GPR_INFO, "Sizing callback client to %d threads", num_threads);
129     }
130     return num_threads;
131   }
132
133   /**
134    * Wait until all outstanding Callback RPCs are done
135    */
136   void DestroyMultithreading() final {
137     std::unique_lock<std::mutex> l(shutdown_mu_);
138     while (rpcs_done_ != total_outstanding_rpcs_) {
139       shutdown_cv_.wait(l);
140     }
141     EndThreads();
142   }
143 };
144
145 class CallbackUnaryClient final : public CallbackClient {
146  public:
147   CallbackUnaryClient(const ClientConfig& config) : CallbackClient(config) {
148     for (int ch = 0; ch < config.client_channels(); ch++) {
149       for (int i = 0; i < config.outstanding_rpcs_per_channel(); i++) {
150         ctx_.emplace_back(
151             new CallbackClientRpcContext(channels_[ch].get_stub()));
152       }
153     }
154     StartThreads(num_threads_);
155   }
156   ~CallbackUnaryClient() override {}
157
158  protected:
159   bool ThreadFuncImpl(Thread* t, size_t thread_idx) override {
160     for (size_t vector_idx = thread_idx; vector_idx < total_outstanding_rpcs_;
161          vector_idx += num_threads_) {
162       ScheduleRpc(t, vector_idx);
163     }
164     return true;
165   }
166
167   void InitThreadFuncImpl(size_t /*thread_idx*/) override {}
168
169  private:
170   void ScheduleRpc(Thread* t, size_t vector_idx) {
171     if (!closed_loop_) {
172       gpr_timespec next_issue_time = NextRPCIssueTime();
173       // Start an alarm callback to run the internal callback after
174       // next_issue_time
175       if (ctx_[vector_idx]->alarm_ == nullptr) {
176         ctx_[vector_idx]->alarm_ = absl::make_unique<Alarm>();
177       }
178       ctx_[vector_idx]->alarm_->experimental().Set(
179           next_issue_time, [this, t, vector_idx](bool /*ok*/) {
180             IssueUnaryCallbackRpc(t, vector_idx);
181           });
182     } else {
183       IssueUnaryCallbackRpc(t, vector_idx);
184     }
185   }
186
187   void IssueUnaryCallbackRpc(Thread* t, size_t vector_idx) {
188     GPR_TIMER_SCOPE("CallbackUnaryClient::ThreadFunc", 0);
189     double start = UsageTimer::Now();
190     ctx_[vector_idx]->stub_->experimental_async()->UnaryCall(
191         (&ctx_[vector_idx]->context_), &request_, &ctx_[vector_idx]->response_,
192         [this, t, start, vector_idx](grpc::Status s) {
193           // Update Histogram with data from the callback run
194           HistogramEntry entry;
195           if (s.ok()) {
196             entry.set_value((UsageTimer::Now() - start) * 1e9);
197           }
198           entry.set_status(s.error_code());
199           t->UpdateHistogram(&entry);
200
201           if (ThreadCompleted() || !s.ok()) {
202             // Notify thread of completion
203             NotifyMainThreadOfThreadCompletion();
204           } else {
205             // Reallocate ctx for next RPC
206             ctx_[vector_idx] = absl::make_unique<CallbackClientRpcContext>(
207                 ctx_[vector_idx]->stub_);
208             // Schedule a new RPC
209             ScheduleRpc(t, vector_idx);
210           }
211         });
212   }
213 };
214
215 class CallbackStreamingClient : public CallbackClient {
216  public:
217   CallbackStreamingClient(const ClientConfig& config)
218       : CallbackClient(config),
219         messages_per_stream_(config.messages_per_stream()) {
220     for (int ch = 0; ch < config.client_channels(); ch++) {
221       for (int i = 0; i < config.outstanding_rpcs_per_channel(); i++) {
222         ctx_.emplace_back(
223             new CallbackClientRpcContext(channels_[ch].get_stub()));
224       }
225     }
226     StartThreads(num_threads_);
227   }
228   ~CallbackStreamingClient() override {}
229
230   void AddHistogramEntry(double start, bool ok, Thread* thread_ptr) {
231     // Update Histogram with data from the callback run
232     HistogramEntry entry;
233     if (ok) {
234       entry.set_value((UsageTimer::Now() - start) * 1e9);
235     }
236     thread_ptr->UpdateHistogram(&entry);
237   }
238
239   int messages_per_stream() { return messages_per_stream_; }
240
241  protected:
242   const int messages_per_stream_;
243 };
244
245 class CallbackStreamingPingPongClient : public CallbackStreamingClient {
246  public:
247   CallbackStreamingPingPongClient(const ClientConfig& config)
248       : CallbackStreamingClient(config) {}
249   ~CallbackStreamingPingPongClient() override {}
250 };
251
252 class CallbackStreamingPingPongReactor final
253     : public grpc::experimental::ClientBidiReactor<SimpleRequest,
254                                                    SimpleResponse> {
255  public:
256   CallbackStreamingPingPongReactor(
257       CallbackStreamingPingPongClient* client,
258       std::unique_ptr<CallbackClientRpcContext> ctx)
259       : client_(client), ctx_(std::move(ctx)), messages_issued_(0) {}
260
261   void StartNewRpc() {
262     ctx_->stub_->experimental_async()->StreamingCall(&(ctx_->context_), this);
263     write_time_ = UsageTimer::Now();
264     StartWrite(client_->request());
265     writes_done_started_.clear();
266     StartCall();
267   }
268
269   void OnWriteDone(bool ok) override {
270     if (!ok) {
271       gpr_log(GPR_ERROR, "Error writing RPC");
272     }
273     if ((!ok || client_->ThreadCompleted()) &&
274         !writes_done_started_.test_and_set()) {
275       StartWritesDone();
276     }
277     StartRead(&ctx_->response_);
278   }
279
280   void OnReadDone(bool ok) override {
281     client_->AddHistogramEntry(write_time_, ok, thread_ptr_);
282
283     if (client_->ThreadCompleted() || !ok ||
284         (client_->messages_per_stream() != 0 &&
285          ++messages_issued_ >= client_->messages_per_stream())) {
286       if (!ok) {
287         gpr_log(GPR_ERROR, "Error reading RPC");
288       }
289       if (!writes_done_started_.test_and_set()) {
290         StartWritesDone();
291       }
292       return;
293     }
294     if (!client_->IsClosedLoop()) {
295       gpr_timespec next_issue_time = client_->NextRPCIssueTime();
296       // Start an alarm callback to run the internal callback after
297       // next_issue_time
298       ctx_->alarm_->experimental().Set(next_issue_time, [this](bool /*ok*/) {
299         write_time_ = UsageTimer::Now();
300         StartWrite(client_->request());
301       });
302     } else {
303       write_time_ = UsageTimer::Now();
304       StartWrite(client_->request());
305     }
306   }
307
308   void OnDone(const Status& s) override {
309     if (client_->ThreadCompleted() || !s.ok()) {
310       client_->NotifyMainThreadOfThreadCompletion();
311       return;
312     }
313     ctx_ = absl::make_unique<CallbackClientRpcContext>(ctx_->stub_);
314     ScheduleRpc();
315   }
316
317   void ScheduleRpc() {
318     if (!client_->IsClosedLoop()) {
319       gpr_timespec next_issue_time = client_->NextRPCIssueTime();
320       // Start an alarm callback to run the internal callback after
321       // next_issue_time
322       if (ctx_->alarm_ == nullptr) {
323         ctx_->alarm_ = absl::make_unique<Alarm>();
324       }
325       ctx_->alarm_->experimental().Set(next_issue_time,
326                                        [this](bool /*ok*/) { StartNewRpc(); });
327     } else {
328       StartNewRpc();
329     }
330   }
331
332   void set_thread_ptr(Client::Thread* ptr) { thread_ptr_ = ptr; }
333
334   CallbackStreamingPingPongClient* client_;
335   std::unique_ptr<CallbackClientRpcContext> ctx_;
336   std::atomic_flag writes_done_started_;
337   Client::Thread* thread_ptr_;  // Needed to update histogram entries
338   double write_time_;           // Track ping-pong round start time
339   int messages_issued_;         // Messages issued by this stream
340 };
341
342 class CallbackStreamingPingPongClientImpl final
343     : public CallbackStreamingPingPongClient {
344  public:
345   CallbackStreamingPingPongClientImpl(const ClientConfig& config)
346       : CallbackStreamingPingPongClient(config) {
347     for (size_t i = 0; i < total_outstanding_rpcs_; i++) {
348       reactor_.emplace_back(
349           new CallbackStreamingPingPongReactor(this, std::move(ctx_[i])));
350     }
351   }
352   ~CallbackStreamingPingPongClientImpl() override {}
353
354   bool ThreadFuncImpl(Client::Thread* t, size_t thread_idx) override {
355     for (size_t vector_idx = thread_idx; vector_idx < total_outstanding_rpcs_;
356          vector_idx += num_threads_) {
357       reactor_[vector_idx]->set_thread_ptr(t);
358       reactor_[vector_idx]->ScheduleRpc();
359     }
360     return true;
361   }
362
363   void InitThreadFuncImpl(size_t /*thread_idx*/) override {}
364
365  private:
366   std::vector<std::unique_ptr<CallbackStreamingPingPongReactor>> reactor_;
367 };
368
369 // TODO(mhaidry) : Implement Streaming from client, server and both ways
370
371 std::unique_ptr<Client> CreateCallbackClient(const ClientConfig& config) {
372   switch (config.rpc_type()) {
373     case UNARY:
374       return std::unique_ptr<Client>(new CallbackUnaryClient(config));
375     case STREAMING:
376       return std::unique_ptr<Client>(
377           new CallbackStreamingPingPongClientImpl(config));
378     case STREAMING_FROM_CLIENT:
379     case STREAMING_FROM_SERVER:
380     case STREAMING_BOTH_WAYS:
381       assert(false);
382       return nullptr;
383     default:
384       assert(false);
385       return nullptr;
386   }
387 }
388
389 }  // namespace testing
390 }  // namespace grpc