/*
 *
 * Copyright 2015 gRPC authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */
#include <forward_list>
#include <functional>
#include <list>
#include <memory>
#include <mutex>
#include <string>
#include <thread>
#include <utility>
#include <vector>

#include <grpc/grpc.h>
#include <grpc/support/cpu.h>
#include <grpc/support/log.h>
#include <grpcpp/alarm.h>
#include <grpcpp/channel.h>
#include <grpcpp/client_context.h>
#include <grpcpp/generic/generic_stub.h>

#include "src/core/lib/surface/completion_queue.h"
#include "src/proto/grpc/testing/benchmark_service.grpc.pb.h"
#include "test/cpp/qps/client.h"
#include "test/cpp/qps/usage_timer.h"
#include "test/cpp/util/create_test_channel.h"

namespace grpc {
namespace testing {
class ClientRpcContext {
 public:
  ClientRpcContext() {}
  virtual ~ClientRpcContext() {}
  // Advance to the next state; return false if the RPC is done.
  // Collect stats into the histogram entry when appropriate.
  virtual bool RunNextState(bool, HistogramEntry* entry) = 0;
  virtual void StartNewClone(CompletionQueue* cq) = 0;
  static void* tag(ClientRpcContext* c) { return static_cast<void*>(c); }
  static ClientRpcContext* detag(void* t) {
    return static_cast<ClientRpcContext*>(t);
  }

  virtual void Start(CompletionQueue* cq, const ClientConfig& config) = 0;
  virtual void TryCancel() = 0;
};
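// Each ClientRpcContext is a small per-RPC state machine driven by
// completion-queue events: the context pointer itself serves as the CQ tag
// (via tag()/detag() above), so every event can be routed back to the state
// machine that issued the corresponding operation.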
template <class RequestType, class ResponseType>
class ClientRpcContextUnaryImpl : public ClientRpcContext {
 public:
  ClientRpcContextUnaryImpl(
      BenchmarkService::Stub* stub, const RequestType& req,
      std::function<gpr_timespec()> next_issue,
      std::function<
          std::unique_ptr<grpc::ClientAsyncResponseReader<ResponseType>>(
              BenchmarkService::Stub*, grpc::ClientContext*,
              const RequestType&, CompletionQueue*)>
          prepare_req,
      std::function<void(grpc::Status, ResponseType*, HistogramEntry*)> on_done)
      : context_(),
        stub_(stub),
        cq_(nullptr),
        req_(req),
        response_(),
        next_state_(State::READY),
        callback_(on_done),
        next_issue_(std::move(next_issue)),
        prepare_req_(prepare_req) {}
  ~ClientRpcContextUnaryImpl() override {}
  void Start(CompletionQueue* cq, const ClientConfig& config) override {
    GPR_ASSERT(!config.use_coalesce_api());  // not supported.
    StartInternal(cq);
  }
  bool RunNextState(bool ok, HistogramEntry* entry) override {
    switch (next_state_) {
      case State::READY:
        start_ = UsageTimer::Now();
        response_reader_ = prepare_req_(stub_, &context_, req_, cq_);
        response_reader_->StartCall();
        next_state_ = State::RESP_DONE;
        response_reader_->Finish(&response_, &status_,
                                 ClientRpcContext::tag(this));
        return true;
      case State::RESP_DONE:
        if (status_.ok()) {
          entry->set_value((UsageTimer::Now() - start_) * 1e9);
        }
        callback_(status_, &response_, entry);
        next_state_ = State::INVALID;
        return false;
      default:
        GPR_ASSERT(false);
        return false;
    }
  }
  void StartNewClone(CompletionQueue* cq) override {
    auto* clone = new ClientRpcContextUnaryImpl(stub_, req_, next_issue_,
                                                prepare_req_, callback_);
    clone->StartInternal(cq);
  }
  void TryCancel() override { context_.TryCancel(); }

 private:
  grpc::ClientContext context_;
  BenchmarkService::Stub* stub_;
  CompletionQueue* cq_;
  std::unique_ptr<Alarm> alarm_;
  const RequestType& req_;
  ResponseType response_;
  enum State { INVALID, READY, RESP_DONE };
  State next_state_;
  std::function<void(grpc::Status, ResponseType*, HistogramEntry*)> callback_;
  std::function<gpr_timespec()> next_issue_;
  std::function<std::unique_ptr<grpc::ClientAsyncResponseReader<ResponseType>>(
      BenchmarkService::Stub*, grpc::ClientContext*, const RequestType&,
      CompletionQueue*)>
      prepare_req_;
  grpc::Status status_;
  double start_;
  std::unique_ptr<grpc::ClientAsyncResponseReader<ResponseType>>
      response_reader_;

  void StartInternal(CompletionQueue* cq) {
    cq_ = cq;
    if (!next_issue_) {  // ready to issue
      RunNextState(true, nullptr);
    } else {  // wait for the issue time
      alarm_.reset(new Alarm);
      alarm_->Set(cq_, next_issue_(), ClientRpcContext::tag(this));
    }
  }
};
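// AsyncClient owns the completion queues and the worker threads that poll
// them. If next_issue is an empty function the client runs closed-loop,
// issuing each RPC as soon as the previous one completes; otherwise an Alarm
// is set on the CQ so the next issue fires at its scheduled time.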
template <class StubType, class RequestType>
class AsyncClient : public ClientImpl<StubType, RequestType> {
  // Specify which protected members we are using since there is no
  // member name resolution until the template types are fully resolved
 public:
  using Client::NextIssuer;
  using Client::SetupLoadTest;
  using Client::closed_loop_;
  using ClientImpl<StubType, RequestType>::cores_;
  using ClientImpl<StubType, RequestType>::channels_;
  using ClientImpl<StubType, RequestType>::request_;
  AsyncClient(const ClientConfig& config,
              std::function<ClientRpcContext*(
                  StubType*, std::function<gpr_timespec()> next_issue,
                  const RequestType&)>
                  setup_ctx,
              std::function<std::unique_ptr<StubType>(std::shared_ptr<Channel>)>
                  create_stub)
      : ClientImpl<StubType, RequestType>(config, create_stub),
        num_async_threads_(NumThreads(config)) {
    SetupLoadTest(config, num_async_threads_);

    int tpc = std::max(1, config.threads_per_cq());      // 1 if unspecified
    int num_cqs = (num_async_threads_ + tpc - 1) / tpc;  // ceiling division
    for (int i = 0; i < num_cqs; i++) {
      cli_cqs_.emplace_back(new CompletionQueue);
    }

    for (int i = 0; i < num_async_threads_; i++) {
      cq_.emplace_back(i % cli_cqs_.size());
      next_issuers_.emplace_back(NextIssuer(i));
      shutdown_state_.emplace_back(new PerThreadShutdownState());
    }

    int t = 0;
    for (int ch = 0; ch < config.client_channels(); ch++) {
      for (int i = 0; i < config.outstanding_rpcs_per_channel(); i++) {
        auto* cq = cli_cqs_[t].get();
        auto ctx =
            setup_ctx(channels_[ch].get_stub(), next_issuers_[t], request_);
        ctx->Start(cq, config);
      }
      t = (t + 1) % cli_cqs_.size();
    }
  }
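  // The destructor drains each completion queue: once the queues have been
  // shut down (see DestroyMultithreading below), Next() returns the
  // remaining tags, each a context pointer, and every context is deleted.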
  virtual ~AsyncClient() {
    for (auto cq = cli_cqs_.begin(); cq != cli_cqs_.end(); cq++) {
      void* got_tag;
      bool ok;
      while ((*cq)->Next(&got_tag, &ok)) {
        delete ClientRpcContext::detag(got_tag);
      }
    }
  }

  int GetPollCount() override {
    int count = 0;
    for (auto cq = cli_cqs_.begin(); cq != cli_cqs_.end(); cq++) {
      count += grpc_get_cq_poll_num((*cq)->cq());
    }
    return count;
  }
 protected:
  const int num_async_threads_;

 private:
  struct PerThreadShutdownState {
    mutable std::mutex mutex;
    bool shutdown;
    PerThreadShutdownState() : shutdown(false) {}
  };
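  // Shutdown is signaled per worker thread: DestroyMultithreading() sets
  // each thread's shutdown flag under its mutex and then shuts down the
  // queues, after which ProcessTag() cancels and deletes any contexts still
  // in flight instead of running them.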
  int NumThreads(const ClientConfig& config) {
    int num_threads = config.async_client_threads();
    if (num_threads <= 0) {  // Use dynamic sizing
      num_threads = cores_;
      gpr_log(GPR_INFO, "Sizing async client to %d threads", num_threads);
    }
    return num_threads;
  }
  void DestroyMultithreading() override final {
    for (auto ss = shutdown_state_.begin(); ss != shutdown_state_.end(); ++ss) {
      std::lock_guard<std::mutex> lock((*ss)->mutex);
      (*ss)->shutdown = true;
    }
    for (auto cq = cli_cqs_.begin(); cq != cli_cqs_.end(); cq++) {
      (*cq)->Shutdown();
    }
    this->EndThreads();  // explicit this-> needed for dependent name lookup
  }
  ClientRpcContext* ProcessTag(size_t thread_idx, void* tag) {
    ClientRpcContext* ctx = ClientRpcContext::detag(tag);
    if (shutdown_state_[thread_idx]->shutdown) {
      ctx->TryCancel();
      delete ctx;
      bool ok;
      while (cli_cqs_[cq_[thread_idx]]->Next(&tag, &ok)) {
        ctx = ClientRpcContext::detag(tag);
        ctx->TryCancel();
        delete ctx;
      }
      return nullptr;
    }
    return ctx;
  }
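  // ThreadFunc uses DoThenAsyncNext so that each state-machine step runs as
  // a closure inside the completion queue's polling loop; the per-thread
  // shutdown mutex is held across the step and released inside the closure
  // before the thread blocks for the next event.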
  void ThreadFunc(size_t thread_idx, Client::Thread* t) override final {
    void* got_tag;
    bool ok;

    HistogramEntry entry;
    HistogramEntry* entry_ptr = &entry;
    if (!cli_cqs_[cq_[thread_idx]]->Next(&got_tag, &ok)) {
      return;
    }
    std::mutex* shutdown_mu = &shutdown_state_[thread_idx]->mutex;
    shutdown_mu->lock();
    ClientRpcContext* ctx = ProcessTag(thread_idx, got_tag);
    if (ctx == nullptr) {
      shutdown_mu->unlock();
      return;
    }
    while (cli_cqs_[cq_[thread_idx]]->DoThenAsyncNext(
        [&, ctx, ok, entry_ptr, shutdown_mu]() {
          if (!ctx->RunNextState(ok, entry_ptr)) {
            // The RPC and callback are done, so clone the ctx
            // and kickstart the new one
            ctx->StartNewClone(cli_cqs_[cq_[thread_idx]].get());
            delete ctx;
          }
          shutdown_mu->unlock();
        },
        &got_tag, &ok, gpr_inf_future(GPR_CLOCK_REALTIME))) {
      t->UpdateHistogram(entry_ptr);
      entry = HistogramEntry();
      shutdown_mu->lock();
      ctx = ProcessTag(thread_idx, got_tag);
      if (ctx == nullptr) {
        shutdown_mu->unlock();
        return;
      }
    }
  }
  std::vector<std::unique_ptr<CompletionQueue>> cli_cqs_;
  std::vector<int> cq_;
  std::vector<std::function<gpr_timespec()>> next_issuers_;
  std::vector<std::unique_ptr<PerThreadShutdownState>> shutdown_state_;
};
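// Each concrete client below pairs a PrepareReq factory (which creates the
// appropriate async call object from the stub) with a CheckDone callback,
// and passes both to the matching ClientRpcContext implementation.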
static std::unique_ptr<BenchmarkService::Stub> BenchmarkStubCreator(
    const std::shared_ptr<Channel>& ch) {
  return BenchmarkService::NewStub(ch);
}

class AsyncUnaryClient final
    : public AsyncClient<BenchmarkService::Stub, SimpleRequest> {
 public:
  explicit AsyncUnaryClient(const ClientConfig& config)
      : AsyncClient<BenchmarkService::Stub, SimpleRequest>(
            config, SetupCtx, BenchmarkStubCreator) {
    StartThreads(num_async_threads_);
  }
  ~AsyncUnaryClient() override {}

 private:
  static void CheckDone(const grpc::Status& s, SimpleResponse* response,
                        HistogramEntry* entry) {
    entry->set_status(s.error_code());
  }
  static std::unique_ptr<grpc::ClientAsyncResponseReader<SimpleResponse>>
  PrepareReq(BenchmarkService::Stub* stub, grpc::ClientContext* ctx,
             const SimpleRequest& request, CompletionQueue* cq) {
    return stub->PrepareAsyncUnaryCall(ctx, request, cq);
  }
  static ClientRpcContext* SetupCtx(BenchmarkService::Stub* stub,
                                    std::function<gpr_timespec()> next_issue,
                                    const SimpleRequest& req) {
    return new ClientRpcContextUnaryImpl<SimpleRequest, SimpleResponse>(
        stub, req, std::move(next_issue), AsyncUnaryClient::PrepareReq,
        AsyncUnaryClient::CheckDone);
  }
};
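// The ping-pong context alternates Write and Read on one bidirectional
// stream, timing each request/response round trip. A nonzero
// messages_per_stream ends the stream with WritesDone/Finish after that many
// messages; with the coalescing API the final message is instead sent via
// WriteLast on a stream whose initial metadata is corked.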
template <class RequestType, class ResponseType>
class ClientRpcContextStreamingPingPongImpl : public ClientRpcContext {
 public:
  ClientRpcContextStreamingPingPongImpl(
      BenchmarkService::Stub* stub, const RequestType& req,
      std::function<gpr_timespec()> next_issue,
      std::function<std::unique_ptr<
          grpc::ClientAsyncReaderWriter<RequestType, ResponseType>>(
          BenchmarkService::Stub*, grpc::ClientContext*, CompletionQueue*)>
          prepare_req,
      std::function<void(grpc::Status, ResponseType*)> on_done)
      : context_(),
        stub_(stub),
        cq_(nullptr),
        req_(req),
        response_(),
        next_state_(State::INVALID),
        callback_(on_done),
        next_issue_(std::move(next_issue)),
        prepare_req_(prepare_req),
        coalesce_(false) {}
  ~ClientRpcContextStreamingPingPongImpl() override {}
  void Start(CompletionQueue* cq, const ClientConfig& config) override {
    StartInternal(cq, config.messages_per_stream(), config.use_coalesce_api());
  }
  bool RunNextState(bool ok, HistogramEntry* entry) override {
    while (true) {
      switch (next_state_) {
        case State::STREAM_IDLE:
          if (!next_issue_) {  // ready to issue
            next_state_ = State::READY_TO_WRITE;
          } else {
            next_state_ = State::WAIT;
          }
          break;  // loop around, don't return
        case State::WAIT:
          next_state_ = State::READY_TO_WRITE;
          alarm_.reset(new Alarm);
          alarm_->Set(cq_, next_issue_(), ClientRpcContext::tag(this));
          return true;
        case State::READY_TO_WRITE:
          if (!ok) {
            return false;
          }
          start_ = UsageTimer::Now();
          next_state_ = State::WRITE_DONE;
          if (coalesce_ && messages_issued_ == messages_per_stream_ - 1) {
            stream_->WriteLast(req_, WriteOptions(),
                               ClientRpcContext::tag(this));
          } else {
            stream_->Write(req_, ClientRpcContext::tag(this));
          }
          return true;
        case State::WRITE_DONE:
          if (!ok) {
            return false;
          }
          next_state_ = State::READ_DONE;
          stream_->Read(&response_, ClientRpcContext::tag(this));
          return true;
        case State::READ_DONE:
          entry->set_value((UsageTimer::Now() - start_) * 1e9);
          callback_(status_, &response_);
          if ((messages_per_stream_ != 0) &&
              (++messages_issued_ >= messages_per_stream_)) {
            next_state_ = State::WRITES_DONE_DONE;
            if (coalesce_) {
              // WritesDone should have been called on the last Write.
              // loop around to call Finish.
              break;
            }
            stream_->WritesDone(ClientRpcContext::tag(this));
            return true;
          }
          next_state_ = State::STREAM_IDLE;
          break;  // loop around
        case State::WRITES_DONE_DONE:
          next_state_ = State::FINISH_DONE;
          stream_->Finish(&status_, ClientRpcContext::tag(this));
          return true;
        case State::FINISH_DONE:
          next_state_ = State::INVALID;
          return false;
        default:
          GPR_ASSERT(false);
          return false;
      }
    }
  }
  void StartNewClone(CompletionQueue* cq) override {
    auto* clone = new ClientRpcContextStreamingPingPongImpl(
        stub_, req_, next_issue_, prepare_req_, callback_);
    clone->StartInternal(cq, messages_per_stream_, coalesce_);
  }
  void TryCancel() override { context_.TryCancel(); }
 private:
  grpc::ClientContext context_;
  BenchmarkService::Stub* stub_;
  CompletionQueue* cq_;
  std::unique_ptr<Alarm> alarm_;
  const RequestType& req_;
  ResponseType response_;
  enum State { INVALID, STREAM_IDLE, WAIT, READY_TO_WRITE, WRITE_DONE,
               READ_DONE, WRITES_DONE_DONE, FINISH_DONE };
  State next_state_;
  std::function<void(grpc::Status, ResponseType*)> callback_;
  std::function<gpr_timespec()> next_issue_;
  std::function<std::unique_ptr<
      grpc::ClientAsyncReaderWriter<RequestType, ResponseType>>(
      BenchmarkService::Stub*, grpc::ClientContext*, CompletionQueue*)>
      prepare_req_;
  grpc::Status status_;
  double start_;
  std::unique_ptr<grpc::ClientAsyncReaderWriter<RequestType, ResponseType>>
      stream_;
  // Allow a limit on number of messages in a stream
  int messages_per_stream_;
  int messages_issued_;
  // Whether to use coalescing API.
  bool coalesce_;
  void StartInternal(CompletionQueue* cq, int messages_per_stream,
                     bool coalesce) {
    cq_ = cq;
    messages_per_stream_ = messages_per_stream;
    messages_issued_ = 0;
    coalesce_ = coalesce;
    if (coalesce_) {
      GPR_ASSERT(messages_per_stream_ != 0);
      context_.set_initial_metadata_corked(true);
    }
    stream_ = prepare_req_(stub_, &context_, cq);
    next_state_ = State::STREAM_IDLE;
    stream_->StartCall(ClientRpcContext::tag(this));
    if (coalesce_) {
      // When the initial metadata is corked, the tag will not come back and
      // we need to manually drive the state machine.
      RunNextState(true, nullptr);
    }
  }
};
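// Client for the STREAMING (ping-pong) scenario: each outstanding RPC gets
// its own bidirectional stream, created via PrepareAsyncStreamingCall.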
class AsyncStreamingPingPongClient final
    : public AsyncClient<BenchmarkService::Stub, SimpleRequest> {
 public:
  explicit AsyncStreamingPingPongClient(const ClientConfig& config)
      : AsyncClient<BenchmarkService::Stub, SimpleRequest>(
            config, SetupCtx, BenchmarkStubCreator) {
    StartThreads(num_async_threads_);
  }
  ~AsyncStreamingPingPongClient() override {}

 private:
  static void CheckDone(const grpc::Status& s, SimpleResponse* response) {}
  static std::unique_ptr<
      grpc::ClientAsyncReaderWriter<SimpleRequest, SimpleResponse>>
  PrepareReq(BenchmarkService::Stub* stub, grpc::ClientContext* ctx,
             CompletionQueue* cq) {
    auto stream = stub->PrepareAsyncStreamingCall(ctx, cq);
    return stream;
  }
  static ClientRpcContext* SetupCtx(BenchmarkService::Stub* stub,
                                    std::function<gpr_timespec()> next_issue,
                                    const SimpleRequest& req) {
    return new ClientRpcContextStreamingPingPongImpl<SimpleRequest,
                                                     SimpleResponse>(
        stub, req, std::move(next_issue),
        AsyncStreamingPingPongClient::PrepareReq,
        AsyncStreamingPingPongClient::CheckDone);
  }
};
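// The client-streaming context repeatedly writes requests on an upload
// stream, timing each completed Write; the single response is delivered only
// when the stream eventually finishes.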
template <class RequestType, class ResponseType>
class ClientRpcContextStreamingFromClientImpl : public ClientRpcContext {
 public:
  ClientRpcContextStreamingFromClientImpl(
      BenchmarkService::Stub* stub, const RequestType& req,
      std::function<gpr_timespec()> next_issue,
      std::function<std::unique_ptr<grpc::ClientAsyncWriter<RequestType>>(
          BenchmarkService::Stub*, grpc::ClientContext*, ResponseType*,
          CompletionQueue*)>
          prepare_req,
      std::function<void(grpc::Status, ResponseType*)> on_done)
      : context_(),
        stub_(stub),
        cq_(nullptr),
        req_(req),
        response_(),
        next_state_(State::INVALID),
        callback_(on_done),
        next_issue_(std::move(next_issue)),
        prepare_req_(prepare_req) {}
  ~ClientRpcContextStreamingFromClientImpl() override {}
  void Start(CompletionQueue* cq, const ClientConfig& config) override {
    GPR_ASSERT(!config.use_coalesce_api());  // not supported yet.
    StartInternal(cq);
  }
  bool RunNextState(bool ok, HistogramEntry* entry) override {
    while (true) {
      switch (next_state_) {
        case State::STREAM_IDLE:
          if (!next_issue_) {  // ready to issue
            next_state_ = State::READY_TO_WRITE;
          } else {
            next_state_ = State::WAIT;
          }
          break;  // loop around, don't return
        case State::WAIT:
          alarm_.reset(new Alarm);
          alarm_->Set(cq_, next_issue_(), ClientRpcContext::tag(this));
          next_state_ = State::READY_TO_WRITE;
          return true;
        case State::READY_TO_WRITE:
          if (!ok) {
            return false;
          }
          start_ = UsageTimer::Now();
          next_state_ = State::WRITE_DONE;
          stream_->Write(req_, ClientRpcContext::tag(this));
          return true;
        case State::WRITE_DONE:
          if (!ok) {
            return false;
          }
          entry->set_value((UsageTimer::Now() - start_) * 1e9);
          next_state_ = State::STREAM_IDLE;
          break;  // loop around
        default:
          GPR_ASSERT(false);
          return false;
      }
    }
  }
  void StartNewClone(CompletionQueue* cq) override {
    auto* clone = new ClientRpcContextStreamingFromClientImpl(
        stub_, req_, next_issue_, prepare_req_, callback_);
    clone->StartInternal(cq);
  }
  void TryCancel() override { context_.TryCancel(); }

 private:
  grpc::ClientContext context_;
  BenchmarkService::Stub* stub_;
  CompletionQueue* cq_;
  std::unique_ptr<Alarm> alarm_;
  const RequestType& req_;
  ResponseType response_;
  enum State { INVALID, STREAM_IDLE, WAIT, READY_TO_WRITE, WRITE_DONE };
  State next_state_;
  std::function<void(grpc::Status, ResponseType*)> callback_;
  std::function<gpr_timespec()> next_issue_;
  std::function<std::unique_ptr<grpc::ClientAsyncWriter<RequestType>>(
      BenchmarkService::Stub*, grpc::ClientContext*, ResponseType*,
      CompletionQueue*)>
      prepare_req_;
  grpc::Status status_;
  double start_;
  std::unique_ptr<grpc::ClientAsyncWriter<RequestType>> stream_;

  void StartInternal(CompletionQueue* cq) {
    cq_ = cq;
    stream_ = prepare_req_(stub_, &context_, &response_, cq);
    next_state_ = State::STREAM_IDLE;
    stream_->StartCall(ClientRpcContext::tag(this));
  }
};
class AsyncStreamingFromClientClient final
    : public AsyncClient<BenchmarkService::Stub, SimpleRequest> {
 public:
  explicit AsyncStreamingFromClientClient(const ClientConfig& config)
      : AsyncClient<BenchmarkService::Stub, SimpleRequest>(
            config, SetupCtx, BenchmarkStubCreator) {
    StartThreads(num_async_threads_);
  }
  ~AsyncStreamingFromClientClient() override {}

 private:
  static void CheckDone(const grpc::Status& s, SimpleResponse* response) {}
  static std::unique_ptr<grpc::ClientAsyncWriter<SimpleRequest>> PrepareReq(
      BenchmarkService::Stub* stub, grpc::ClientContext* ctx,
      SimpleResponse* resp, CompletionQueue* cq) {
    auto stream = stub->PrepareAsyncStreamingFromClient(ctx, resp, cq);
    return stream;
  }
  static ClientRpcContext* SetupCtx(BenchmarkService::Stub* stub,
                                    std::function<gpr_timespec()> next_issue,
                                    const SimpleRequest& req) {
    return new ClientRpcContextStreamingFromClientImpl<SimpleRequest,
                                                       SimpleResponse>(
        stub, req, std::move(next_issue),
        AsyncStreamingFromClientClient::PrepareReq,
        AsyncStreamingFromClientClient::CheckDone);
  }
};
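// The server-streaming context sends one request up front and then times
// each Read on the resulting download stream. Issuance is not rate-paced
// here; see the TODO in StartInternal below.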
template <class RequestType, class ResponseType>
class ClientRpcContextStreamingFromServerImpl : public ClientRpcContext {
 public:
  ClientRpcContextStreamingFromServerImpl(
      BenchmarkService::Stub* stub, const RequestType& req,
      std::function<gpr_timespec()> next_issue,
      std::function<std::unique_ptr<grpc::ClientAsyncReader<ResponseType>>(
          BenchmarkService::Stub*, grpc::ClientContext*, const RequestType&,
          CompletionQueue*)>
          prepare_req,
      std::function<void(grpc::Status, ResponseType*)> on_done)
      : context_(),
        stub_(stub),
        cq_(nullptr),
        req_(req),
        response_(),
        next_state_(State::INVALID),
        callback_(on_done),
        next_issue_(std::move(next_issue)),
        prepare_req_(prepare_req) {}
  ~ClientRpcContextStreamingFromServerImpl() override {}
  void Start(CompletionQueue* cq, const ClientConfig& config) override {
    GPR_ASSERT(!config.use_coalesce_api());  // not supported
    StartInternal(cq);
  }
  bool RunNextState(bool ok, HistogramEntry* entry) override {
    while (true) {
      switch (next_state_) {
        case State::STREAM_IDLE:
          if (!ok) {
            return false;
          }
          start_ = UsageTimer::Now();
          next_state_ = State::READ_DONE;
          stream_->Read(&response_, ClientRpcContext::tag(this));
          return true;
        case State::READ_DONE:
          if (!ok) {
            return false;
          }
          entry->set_value((UsageTimer::Now() - start_) * 1e9);
          callback_(status_, &response_);
          next_state_ = State::STREAM_IDLE;
          break;  // loop around
        default:
          GPR_ASSERT(false);
          return false;
      }
    }
  }
  void StartNewClone(CompletionQueue* cq) override {
    auto* clone = new ClientRpcContextStreamingFromServerImpl(
        stub_, req_, next_issue_, prepare_req_, callback_);
    clone->StartInternal(cq);
  }
  void TryCancel() override { context_.TryCancel(); }

 private:
  grpc::ClientContext context_;
  BenchmarkService::Stub* stub_;
  CompletionQueue* cq_;
  std::unique_ptr<Alarm> alarm_;
  const RequestType& req_;
  ResponseType response_;
  enum State { INVALID, STREAM_IDLE, READ_DONE };
  State next_state_;
  std::function<void(grpc::Status, ResponseType*)> callback_;
  std::function<gpr_timespec()> next_issue_;
  std::function<std::unique_ptr<grpc::ClientAsyncReader<ResponseType>>(
      BenchmarkService::Stub*, grpc::ClientContext*, const RequestType&,
      CompletionQueue*)>
      prepare_req_;
  grpc::Status status_;
  double start_;
  std::unique_ptr<grpc::ClientAsyncReader<ResponseType>> stream_;

  void StartInternal(CompletionQueue* cq) {
    // TODO(vjpai): Add support to rate-pace this
    cq_ = cq;
    stream_ = prepare_req_(stub_, &context_, req_, cq);
    next_state_ = State::STREAM_IDLE;
    stream_->StartCall(ClientRpcContext::tag(this));
  }
};
class AsyncStreamingFromServerClient final
    : public AsyncClient<BenchmarkService::Stub, SimpleRequest> {
 public:
  explicit AsyncStreamingFromServerClient(const ClientConfig& config)
      : AsyncClient<BenchmarkService::Stub, SimpleRequest>(
            config, SetupCtx, BenchmarkStubCreator) {
    StartThreads(num_async_threads_);
  }
  ~AsyncStreamingFromServerClient() override {}

 private:
  static void CheckDone(const grpc::Status& s, SimpleResponse* response) {}
  static std::unique_ptr<grpc::ClientAsyncReader<SimpleResponse>> PrepareReq(
      BenchmarkService::Stub* stub, grpc::ClientContext* ctx,
      const SimpleRequest& req, CompletionQueue* cq) {
    auto stream = stub->PrepareAsyncStreamingFromServer(ctx, req, cq);
    return stream;
  }
  static ClientRpcContext* SetupCtx(BenchmarkService::Stub* stub,
                                    std::function<gpr_timespec()> next_issue,
                                    const SimpleRequest& req) {
    return new ClientRpcContextStreamingFromServerImpl<SimpleRequest,
                                                       SimpleResponse>(
        stub, req, std::move(next_issue),
        AsyncStreamingFromServerClient::PrepareReq,
        AsyncStreamingFromServerClient::CheckDone);
  }
};
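// The generic version of the streaming ping-pong context drives the same
// write/read state machine over grpc::GenericStub and raw ByteBuffer
// payloads, naming the target method by its full path rather than through
// generated stub code.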
class ClientRpcContextGenericStreamingImpl : public ClientRpcContext {
 public:
  ClientRpcContextGenericStreamingImpl(
      grpc::GenericStub* stub, const ByteBuffer& req,
      std::function<gpr_timespec()> next_issue,
      std::function<std::unique_ptr<grpc::GenericClientAsyncReaderWriter>(
          grpc::GenericStub*, grpc::ClientContext*,
          const grpc::string& method_name, CompletionQueue*)>
          prepare_req,
      std::function<void(grpc::Status, ByteBuffer*)> on_done)
      : context_(),
        stub_(stub),
        cq_(nullptr),
        req_(req),
        response_(),
        next_state_(State::INVALID),
        callback_(std::move(on_done)),
        next_issue_(std::move(next_issue)),
        prepare_req_(std::move(prepare_req)) {}
  ~ClientRpcContextGenericStreamingImpl() override {}
  void Start(CompletionQueue* cq, const ClientConfig& config) override {
    GPR_ASSERT(!config.use_coalesce_api());  // not supported yet.
    StartInternal(cq, config.messages_per_stream());
  }
  bool RunNextState(bool ok, HistogramEntry* entry) override {
    while (true) {
      switch (next_state_) {
        case State::STREAM_IDLE:
          if (!next_issue_) {  // ready to issue
            next_state_ = State::READY_TO_WRITE;
          } else {
            next_state_ = State::WAIT;
          }
          break;  // loop around, don't return
        case State::WAIT:
          next_state_ = State::READY_TO_WRITE;
          alarm_.reset(new Alarm);
          alarm_->Set(cq_, next_issue_(), ClientRpcContext::tag(this));
          return true;
        case State::READY_TO_WRITE:
          if (!ok) {
            return false;
          }
          start_ = UsageTimer::Now();
          next_state_ = State::WRITE_DONE;
          stream_->Write(req_, ClientRpcContext::tag(this));
          return true;
        case State::WRITE_DONE:
          if (!ok) {
            return false;
          }
          next_state_ = State::READ_DONE;
          stream_->Read(&response_, ClientRpcContext::tag(this));
          return true;
        case State::READ_DONE:
          entry->set_value((UsageTimer::Now() - start_) * 1e9);
          callback_(status_, &response_);
          if ((messages_per_stream_ != 0) &&
              (++messages_issued_ >= messages_per_stream_)) {
            next_state_ = State::WRITES_DONE_DONE;
            stream_->WritesDone(ClientRpcContext::tag(this));
            return true;
          }
          next_state_ = State::STREAM_IDLE;
          break;  // loop around
        case State::WRITES_DONE_DONE:
          next_state_ = State::FINISH_DONE;
          stream_->Finish(&status_, ClientRpcContext::tag(this));
          return true;
        case State::FINISH_DONE:
          next_state_ = State::INVALID;
          return false;
        default:
          GPR_ASSERT(false);
          return false;
      }
    }
  }
  void StartNewClone(CompletionQueue* cq) override {
    auto* clone = new ClientRpcContextGenericStreamingImpl(
        stub_, req_, next_issue_, prepare_req_, callback_);
    clone->StartInternal(cq, messages_per_stream_);
  }
  void TryCancel() override { context_.TryCancel(); }
 private:
  grpc::ClientContext context_;
  grpc::GenericStub* stub_;
  CompletionQueue* cq_;
  std::unique_ptr<Alarm> alarm_;
  ByteBuffer req_;
  ByteBuffer response_;
  enum State { INVALID, STREAM_IDLE, WAIT, READY_TO_WRITE, WRITE_DONE,
               READ_DONE, WRITES_DONE_DONE, FINISH_DONE };
  State next_state_;
  std::function<void(grpc::Status, ByteBuffer*)> callback_;
  std::function<gpr_timespec()> next_issue_;
  std::function<std::unique_ptr<grpc::GenericClientAsyncReaderWriter>(
      grpc::GenericStub*, grpc::ClientContext*, const grpc::string&,
      CompletionQueue*)>
      prepare_req_;
  grpc::Status status_;
  double start_;
  std::unique_ptr<grpc::GenericClientAsyncReaderWriter> stream_;

  // Allow a limit on number of messages in a stream
  int messages_per_stream_;
  int messages_issued_;
  void StartInternal(CompletionQueue* cq, int messages_per_stream) {
    cq_ = cq;
    const grpc::string kMethodName(
        "/grpc.testing.BenchmarkService/StreamingCall");
    messages_per_stream_ = messages_per_stream;
    messages_issued_ = 0;
    stream_ = prepare_req_(stub_, &context_, kMethodName, cq);
    next_state_ = State::STREAM_IDLE;
    stream_->StartCall(ClientRpcContext::tag(this));
  }
};
static std::unique_ptr<grpc::GenericStub> GenericStubCreator(
    const std::shared_ptr<Channel>& ch) {
  return std::unique_ptr<grpc::GenericStub>(new grpc::GenericStub(ch));
}

class GenericAsyncStreamingClient final
    : public AsyncClient<grpc::GenericStub, ByteBuffer> {
 public:
  explicit GenericAsyncStreamingClient(const ClientConfig& config)
      : AsyncClient<grpc::GenericStub, ByteBuffer>(config, SetupCtx,
                                                   GenericStubCreator) {
    StartThreads(num_async_threads_);
  }
  ~GenericAsyncStreamingClient() override {}

 private:
  static void CheckDone(const grpc::Status& s, ByteBuffer* response) {}
  static std::unique_ptr<grpc::GenericClientAsyncReaderWriter> PrepareReq(
      grpc::GenericStub* stub, grpc::ClientContext* ctx,
      const grpc::string& method_name, CompletionQueue* cq) {
    auto stream = stub->PrepareCall(ctx, method_name, cq);
    return stream;
  }
  static ClientRpcContext* SetupCtx(grpc::GenericStub* stub,
                                    std::function<gpr_timespec()> next_issue,
                                    const ByteBuffer& req) {
    return new ClientRpcContextGenericStreamingImpl(
        stub, req, std::move(next_issue),
        GenericAsyncStreamingClient::PrepareReq,
        GenericAsyncStreamingClient::CheckDone);
  }
};
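// Factory functions mapping the configured rpc_type to the matching client
// implementation. STREAMING_BOTH_WAYS is not yet implemented for the async
// API.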
std::unique_ptr<Client> CreateAsyncClient(const ClientConfig& config) {
  switch (config.rpc_type()) {
    case UNARY:
      return std::unique_ptr<Client>(new AsyncUnaryClient(config));
    case STREAMING:
      return std::unique_ptr<Client>(new AsyncStreamingPingPongClient(config));
    case STREAMING_FROM_CLIENT:
      return std::unique_ptr<Client>(
          new AsyncStreamingFromClientClient(config));
    case STREAMING_FROM_SERVER:
      return std::unique_ptr<Client>(
          new AsyncStreamingFromServerClient(config));
    case STREAMING_BOTH_WAYS:
      // TODO(vjpai): Implement this
      abort();
    default:
      assert(false);
      return nullptr;
  }
}
std::unique_ptr<Client> CreateGenericAsyncStreamingClient(
    const ClientConfig& args) {
  return std::unique_ptr<Client>(new GenericAsyncStreamingClient(args));
}

}  // namespace testing
}  // namespace grpc