dcfa2dbf875aaea5e38fab7c0528e6c1d7b4fbf6
[platform/upstream/grpc.git] / test / cpp / qps / client_callback.cc
1 /*
2  *
3  * Copyright 2015 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18
19 #include <list>
20 #include <memory>
21 #include <mutex>
22 #include <sstream>
23 #include <string>
24 #include <thread>
25 #include <utility>
26 #include <vector>
27
28 #include <grpc/grpc.h>
29 #include <grpc/support/cpu.h>
30 #include <grpc/support/log.h>
31 #include <grpcpp/alarm.h>
32 #include <grpcpp/channel.h>
33 #include <grpcpp/client_context.h>
34
35 #include "src/proto/grpc/testing/benchmark_service.grpc.pb.h"
36 #include "test/cpp/qps/client.h"
37 #include "test/cpp/qps/usage_timer.h"
38
39 namespace grpc {
40 namespace testing {
41
42 /**
43  * Maintains context info per RPC
44  */
45 struct CallbackClientRpcContext {
46   CallbackClientRpcContext(BenchmarkService::Stub* stub) : stub_(stub) {}
47
48   ~CallbackClientRpcContext() {}
49
50   SimpleResponse response_;
51   ClientContext context_;
52   Alarm alarm_;
53   BenchmarkService::Stub* stub_;
54 };
55
56 static std::unique_ptr<BenchmarkService::Stub> BenchmarkStubCreator(
57     const std::shared_ptr<Channel>& ch) {
58   return BenchmarkService::NewStub(ch);
59 }
60
61 class CallbackClient
62     : public ClientImpl<BenchmarkService::Stub, SimpleRequest> {
63  public:
64   CallbackClient(const ClientConfig& config)
65       : ClientImpl<BenchmarkService::Stub, SimpleRequest>(
66             config, BenchmarkStubCreator) {
67     num_threads_ = NumThreads(config);
68     rpcs_done_ = 0;
69
70     //  Don't divide the fixed load among threads as the user threads
71     //  only bootstrap the RPCs
72     SetupLoadTest(config, 1);
73     total_outstanding_rpcs_ =
74         config.client_channels() * config.outstanding_rpcs_per_channel();
75   }
76
77   virtual ~CallbackClient() {}
78
79   /**
80    * The main thread of the benchmark will be waiting on DestroyMultithreading.
81    * Increment the rpcs_done_ variable to signify that the Callback RPC
82    * after thread completion is done. When the last outstanding rpc increments
83    * the counter it should also signal the main thread's conditional variable.
84    */
85   void NotifyMainThreadOfThreadCompletion() {
86     std::lock_guard<std::mutex> l(shutdown_mu_);
87     rpcs_done_++;
88     if (rpcs_done_ == total_outstanding_rpcs_) {
89       shutdown_cv_.notify_one();
90     }
91   }
92
93   gpr_timespec NextRPCIssueTime() {
94     std::lock_guard<std::mutex> l(next_issue_time_mu_);
95     return Client::NextIssueTime(0);
96   }
97
98  protected:
99   size_t num_threads_;
100   size_t total_outstanding_rpcs_;
101   // The below mutex and condition variable is used by main benchmark thread to
102   // wait on completion of all RPCs before shutdown
103   std::mutex shutdown_mu_;
104   std::condition_variable shutdown_cv_;
105   // Number of rpcs done after thread completion
106   size_t rpcs_done_;
107   // Vector of Context data pointers for running a RPC
108   std::vector<std::unique_ptr<CallbackClientRpcContext>> ctx_;
109
110   virtual void InitThreadFuncImpl(size_t thread_idx) = 0;
111   virtual bool ThreadFuncImpl(Thread* t, size_t thread_idx) = 0;
112
113   void ThreadFunc(size_t thread_idx, Thread* t) override {
114     InitThreadFuncImpl(thread_idx);
115     ThreadFuncImpl(t, thread_idx);
116   }
117
118  private:
119   std::mutex next_issue_time_mu_;  // Used by next issue time
120
121   int NumThreads(const ClientConfig& config) {
122     int num_threads = config.async_client_threads();
123     if (num_threads <= 0) {  // Use dynamic sizing
124       num_threads = cores_;
125       gpr_log(GPR_INFO, "Sizing callback client to %d threads", num_threads);
126     }
127     return num_threads;
128   }
129
130   /**
131    * Wait until all outstanding Callback RPCs are done
132    */
133   void DestroyMultithreading() final {
134     std::unique_lock<std::mutex> l(shutdown_mu_);
135     while (rpcs_done_ != total_outstanding_rpcs_) {
136       shutdown_cv_.wait(l);
137     }
138     EndThreads();
139   }
140 };
141
142 class CallbackUnaryClient final : public CallbackClient {
143  public:
144   CallbackUnaryClient(const ClientConfig& config) : CallbackClient(config) {
145     for (int ch = 0; ch < config.client_channels(); ch++) {
146       for (int i = 0; i < config.outstanding_rpcs_per_channel(); i++) {
147         ctx_.emplace_back(
148             new CallbackClientRpcContext(channels_[ch].get_stub()));
149       }
150     }
151     StartThreads(num_threads_);
152   }
153   ~CallbackUnaryClient() {}
154
155  protected:
156   bool ThreadFuncImpl(Thread* t, size_t thread_idx) override {
157     for (size_t vector_idx = thread_idx; vector_idx < total_outstanding_rpcs_;
158          vector_idx += num_threads_) {
159       ScheduleRpc(t, vector_idx);
160     }
161     return true;
162   }
163
164   void InitThreadFuncImpl(size_t thread_idx) override { return; }
165
166  private:
167   void ScheduleRpc(Thread* t, size_t vector_idx) {
168     if (!closed_loop_) {
169       gpr_timespec next_issue_time = NextRPCIssueTime();
170       // Start an alarm callback to run the internal callback after
171       // next_issue_time
172       ctx_[vector_idx]->alarm_.experimental().Set(
173           next_issue_time, [this, t, vector_idx](bool ok) {
174             IssueUnaryCallbackRpc(t, vector_idx);
175           });
176     } else {
177       IssueUnaryCallbackRpc(t, vector_idx);
178     }
179   }
180
181   void IssueUnaryCallbackRpc(Thread* t, size_t vector_idx) {
182     GPR_TIMER_SCOPE("CallbackUnaryClient::ThreadFunc", 0);
183     double start = UsageTimer::Now();
184     ctx_[vector_idx]->stub_->experimental_async()->UnaryCall(
185         (&ctx_[vector_idx]->context_), &request_, &ctx_[vector_idx]->response_,
186         [this, t, start, vector_idx](grpc::Status s) {
187           // Update Histogram with data from the callback run
188           HistogramEntry entry;
189           if (s.ok()) {
190             entry.set_value((UsageTimer::Now() - start) * 1e9);
191           }
192           entry.set_status(s.error_code());
193           t->UpdateHistogram(&entry);
194
195           if (ThreadCompleted() || !s.ok()) {
196             // Notify thread of completion
197             NotifyMainThreadOfThreadCompletion();
198           } else {
199             // Reallocate ctx for next RPC
200             ctx_[vector_idx].reset(
201                 new CallbackClientRpcContext(ctx_[vector_idx]->stub_));
202             // Schedule a new RPC
203             ScheduleRpc(t, vector_idx);
204           }
205         });
206   }
207 };
208
209 class CallbackStreamingClient : public CallbackClient {
210  public:
211   CallbackStreamingClient(const ClientConfig& config)
212       : CallbackClient(config),
213         messages_per_stream_(config.messages_per_stream()) {
214     for (int ch = 0; ch < config.client_channels(); ch++) {
215       for (int i = 0; i < config.outstanding_rpcs_per_channel(); i++) {
216         ctx_.emplace_back(
217             new CallbackClientRpcContext(channels_[ch].get_stub()));
218       }
219     }
220     StartThreads(num_threads_);
221   }
222   ~CallbackStreamingClient() {}
223
224   void AddHistogramEntry(double start, bool ok, Thread* thread_ptr) {
225     // Update Histogram with data from the callback run
226     HistogramEntry entry;
227     if (ok) {
228       entry.set_value((UsageTimer::Now() - start) * 1e9);
229     }
230     thread_ptr->UpdateHistogram(&entry);
231   }
232
233   int messages_per_stream() { return messages_per_stream_; }
234
235  protected:
236   const int messages_per_stream_;
237 };
238
239 class CallbackStreamingPingPongClient : public CallbackStreamingClient {
240  public:
241   CallbackStreamingPingPongClient(const ClientConfig& config)
242       : CallbackStreamingClient(config) {}
243   ~CallbackStreamingPingPongClient() {}
244 };
245
246 class CallbackStreamingPingPongReactor final
247     : public grpc::experimental::ClientBidiReactor<SimpleRequest,
248                                                    SimpleResponse> {
249  public:
250   CallbackStreamingPingPongReactor(
251       CallbackStreamingPingPongClient* client,
252       std::unique_ptr<CallbackClientRpcContext> ctx)
253       : client_(client), ctx_(std::move(ctx)), messages_issued_(0) {}
254
255   void StartNewRpc() {
256     ctx_->stub_->experimental_async()->StreamingCall(&(ctx_->context_), this);
257     write_time_ = UsageTimer::Now();
258     StartWrite(client_->request());
259     writes_done_started_.clear();
260     StartCall();
261   }
262
263   void OnWriteDone(bool ok) override {
264     if (!ok) {
265       gpr_log(GPR_ERROR, "Error writing RPC");
266     }
267     if ((!ok || client_->ThreadCompleted()) &&
268         !writes_done_started_.test_and_set()) {
269       StartWritesDone();
270     }
271     StartRead(&ctx_->response_);
272   }
273
274   void OnReadDone(bool ok) override {
275     client_->AddHistogramEntry(write_time_, ok, thread_ptr_);
276
277     if (client_->ThreadCompleted() || !ok ||
278         (client_->messages_per_stream() != 0 &&
279          ++messages_issued_ >= client_->messages_per_stream())) {
280       if (!ok) {
281         gpr_log(GPR_ERROR, "Error reading RPC");
282       }
283       if (!writes_done_started_.test_and_set()) {
284         StartWritesDone();
285       }
286       return;
287     }
288     if (!client_->IsClosedLoop()) {
289       gpr_timespec next_issue_time = client_->NextRPCIssueTime();
290       // Start an alarm callback to run the internal callback after
291       // next_issue_time
292       ctx_->alarm_.experimental().Set(next_issue_time, [this](bool ok) {
293         write_time_ = UsageTimer::Now();
294         StartWrite(client_->request());
295       });
296     } else {
297       write_time_ = UsageTimer::Now();
298       StartWrite(client_->request());
299     }
300   }
301
302   void OnDone(const Status& s) override {
303     if (client_->ThreadCompleted() || !s.ok()) {
304       client_->NotifyMainThreadOfThreadCompletion();
305       return;
306     }
307     ctx_.reset(new CallbackClientRpcContext(ctx_->stub_));
308     ScheduleRpc();
309   }
310
311   void ScheduleRpc() {
312     if (!client_->IsClosedLoop()) {
313       gpr_timespec next_issue_time = client_->NextRPCIssueTime();
314       // Start an alarm callback to run the internal callback after
315       // next_issue_time
316       ctx_->alarm_.experimental().Set(next_issue_time,
317                                       [this](bool ok) { StartNewRpc(); });
318     } else {
319       StartNewRpc();
320     }
321   }
322
323   void set_thread_ptr(Client::Thread* ptr) { thread_ptr_ = ptr; }
324
325   CallbackStreamingPingPongClient* client_;
326   std::unique_ptr<CallbackClientRpcContext> ctx_;
327   std::atomic_flag writes_done_started_;
328   Client::Thread* thread_ptr_;  // Needed to update histogram entries
329   double write_time_;           // Track ping-pong round start time
330   int messages_issued_;         // Messages issued by this stream
331 };
332
333 class CallbackStreamingPingPongClientImpl final
334     : public CallbackStreamingPingPongClient {
335  public:
336   CallbackStreamingPingPongClientImpl(const ClientConfig& config)
337       : CallbackStreamingPingPongClient(config) {
338     for (size_t i = 0; i < total_outstanding_rpcs_; i++)
339       reactor_.emplace_back(
340           new CallbackStreamingPingPongReactor(this, std::move(ctx_[i])));
341   }
342   ~CallbackStreamingPingPongClientImpl() {}
343
344   bool ThreadFuncImpl(Client::Thread* t, size_t thread_idx) override {
345     for (size_t vector_idx = thread_idx; vector_idx < total_outstanding_rpcs_;
346          vector_idx += num_threads_) {
347       reactor_[vector_idx]->set_thread_ptr(t);
348       reactor_[vector_idx]->ScheduleRpc();
349     }
350     return true;
351   }
352
353   void InitThreadFuncImpl(size_t thread_idx) override {}
354
355  private:
356   std::vector<std::unique_ptr<CallbackStreamingPingPongReactor>> reactor_;
357 };
358
359 // TODO(mhaidry) : Implement Streaming from client, server and both ways
360
361 std::unique_ptr<Client> CreateCallbackClient(const ClientConfig& config) {
362   switch (config.rpc_type()) {
363     case UNARY:
364       return std::unique_ptr<Client>(new CallbackUnaryClient(config));
365     case STREAMING:
366       return std::unique_ptr<Client>(
367           new CallbackStreamingPingPongClientImpl(config));
368     case STREAMING_FROM_CLIENT:
369     case STREAMING_FROM_SERVER:
370     case STREAMING_BOTH_WAYS:
371       assert(false);
372       return nullptr;
373     default:
374       assert(false);
375       return nullptr;
376   }
377 }
378
379 }  // namespace testing
380 }  // namespace grpc