3 * Copyright 2018 gRPC authors.
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
25 #include <grpcpp/channel.h>
26 #include <grpcpp/client_context.h>
27 #include <grpcpp/create_channel.h>
28 #include <grpcpp/generic/generic_stub.h>
29 #include <grpcpp/impl/codegen/proto_utils.h>
30 #include <grpcpp/server.h>
31 #include <grpcpp/server_builder.h>
32 #include <grpcpp/server_context.h>
33 #include <grpcpp/support/client_callback.h>
35 #include "src/core/lib/iomgr/iomgr.h"
36 #include "src/proto/grpc/testing/echo.grpc.pb.h"
37 #include "test/core/util/port.h"
38 #include "test/core/util/test_config.h"
39 #include "test/cpp/end2end/interceptors_util.h"
40 #include "test/cpp/end2end/test_service_impl.h"
41 #include "test/cpp/util/byte_buffer_proto_helper.h"
42 #include "test/cpp/util/string_ref_helper.h"
43 #include "test/cpp/util/test_credentials_provider.h"
45 #include <gtest/gtest.h>
47 // MAYBE_SKIP_TEST is a macro to determine if this particular test configuration
48 // should be skipped based on a decision made at SetUp time. In particular, any
49 // callback tests can only be run if the iomgr can run in the background or if
50 // the transport is in-process.
51 #define MAYBE_SKIP_TEST \
62 enum class Protocol { INPROC, TCP };
66 TestScenario(bool serve_callback, Protocol protocol, bool intercept,
67 const grpc::string& creds_type)
68 : callback_server(serve_callback),
70 use_interceptors(intercept),
71 credentials_type(creds_type) {}
75 bool use_interceptors;
76 const grpc::string credentials_type;
// Streams a human-readable description of `scenario` to `out` and returns the
// stream. The output lists the callback_server flag, the protocol (printed as
// "INPROC" when it equals Protocol::INPROC and as "TCP" otherwise), the
// use_interceptors flag and the credentials type.
79 static std::ostream& operator<<(std::ostream& out,
80 const TestScenario& scenario) {
81 return out << "TestScenario{callback_server="
82 << (scenario.callback_server ? "true" : "false") << ",protocol="
83 << (scenario.protocol == Protocol::INPROC ? "INPROC" : "TCP")
84 << ",intercept=" << (scenario.use_interceptors ? "true" : "false")
85 << ",creds=" << scenario.credentials_type << "}";
88 void TestScenario::Log() const {
89 std::ostringstream out;
91 gpr_log(GPR_DEBUG, "%s", out.str().c_str());
94 class ClientCallbackEnd2endTest
95 : public ::testing::TestWithParam<TestScenario> {
97 ClientCallbackEnd2endTest() { GetParam().Log(); }
99 void SetUp() override {
100 ServerBuilder builder;
102 auto server_creds = GetCredentialsProvider()->GetServerCredentials(
103 GetParam().credentials_type);
104 // TODO(vjpai): Support testing of AuthMetadataProcessor
106 if (GetParam().protocol == Protocol::TCP) {
107 if (!grpc_iomgr_run_in_background()) {
111 picked_port_ = grpc_pick_unused_port_or_die();
112 server_address_ << "localhost:" << picked_port_;
113 builder.AddListeningPort(server_address_.str(), server_creds);
115 if (!GetParam().callback_server) {
116 builder.RegisterService(&service_);
118 builder.RegisterService(&callback_service_);
121 if (GetParam().use_interceptors) {
123 std::unique_ptr<experimental::ServerInterceptorFactoryInterface>>
125 // Add 20 dummy server interceptors
126 creators.reserve(20);
127 for (auto i = 0; i < 20; i++) {
128 creators.push_back(std::unique_ptr<DummyInterceptorFactory>(
129 new DummyInterceptorFactory()));
131 builder.experimental().SetInterceptorCreators(std::move(creators));
134 server_ = builder.BuildAndStart();
135 is_server_started_ = true;
139 ChannelArguments args;
140 auto channel_creds = GetCredentialsProvider()->GetChannelCredentials(
141 GetParam().credentials_type, &args);
142 switch (GetParam().protocol) {
144 if (!GetParam().use_interceptors) {
145 channel_ = ::grpc::CreateCustomChannel(server_address_.str(),
146 channel_creds, args);
148 channel_ = CreateCustomChannelWithInterceptors(
149 server_address_.str(), channel_creds, args,
150 CreateDummyClientInterceptors());
153 case Protocol::INPROC:
154 if (!GetParam().use_interceptors) {
155 channel_ = server_->InProcessChannel(args);
157 channel_ = server_->experimental().InProcessChannelWithInterceptors(
158 args, CreateDummyClientInterceptors());
164 stub_ = grpc::testing::EchoTestService::NewStub(channel_);
165 generic_stub_.reset(new GenericStub(channel_));
166 DummyInterceptor::Reset();
169 void TearDown() override {
170 if (is_server_started_) {
173 if (picked_port_ > 0) {
174 grpc_recycle_unused_port(picked_port_);
178 void SendRpcs(int num_rpcs, bool with_binary_metadata) {
179 grpc::string test_string("");
180 for (int i = 0; i < num_rpcs; i++) {
182 EchoResponse response;
183 ClientContext cli_ctx;
185 test_string += "Hello world. ";
186 request.set_message(test_string);
188 if (with_binary_metadata) {
189 request.mutable_param()->set_echo_metadata(true);
190 char bytes[8] = {'\0', '\1', '\2', '\3',
191 '\4', '\5', '\6', static_cast<char>(i)};
192 val = grpc::string(bytes, 8);
193 cli_ctx.AddMetadata("custom-bin", val);
196 cli_ctx.set_compression_algorithm(GRPC_COMPRESS_GZIP);
199 std::condition_variable cv;
201 stub_->experimental_async()->Echo(
202 &cli_ctx, &request, &response,
203 [&cli_ctx, &request, &response, &done, &mu, &cv, val,
204 with_binary_metadata](Status s) {
207 EXPECT_EQ(request.message(), response.message());
208 if (with_binary_metadata) {
210 1u, cli_ctx.GetServerTrailingMetadata().count("custom-bin"));
211 EXPECT_EQ(val, ToString(cli_ctx.GetServerTrailingMetadata()
215 std::lock_guard<std::mutex> l(mu);
219 std::unique_lock<std::mutex> l(mu);
226 void SendRpcsRawReq(int num_rpcs) {
227 grpc::string test_string("Hello raw world.");
229 request.set_message(test_string);
230 std::unique_ptr<ByteBuffer> send_buf = SerializeToByteBuffer(&request);
232 for (int i = 0; i < num_rpcs; i++) {
233 EchoResponse response;
234 ClientContext cli_ctx;
237 std::condition_variable cv;
239 stub_->experimental_async()->Echo(
240 &cli_ctx, send_buf.get(), &response,
241 [&request, &response, &done, &mu, &cv](Status s) {
244 EXPECT_EQ(request.message(), response.message());
245 std::lock_guard<std::mutex> l(mu);
249 std::unique_lock<std::mutex> l(mu);
256 void SendRpcsGeneric(int num_rpcs, bool maybe_except) {
257 const grpc::string kMethodName("/grpc.testing.EchoTestService/Echo");
258 grpc::string test_string("");
259 for (int i = 0; i < num_rpcs; i++) {
261 std::unique_ptr<ByteBuffer> send_buf;
263 ClientContext cli_ctx;
265 test_string += "Hello world. ";
266 request.set_message(test_string);
267 send_buf = SerializeToByteBuffer(&request);
270 std::condition_variable cv;
272 generic_stub_->experimental().UnaryCall(
273 &cli_ctx, kMethodName, send_buf.get(), &recv_buf,
274 [&request, &recv_buf, &done, &mu, &cv, maybe_except](Status s) {
277 EchoResponse response;
278 EXPECT_TRUE(ParseFromByteBuffer(&recv_buf, &response));
279 EXPECT_EQ(request.message(), response.message());
280 std::lock_guard<std::mutex> l(mu);
283 #if GRPC_ALLOW_EXCEPTIONS
288 GPR_ASSERT(!maybe_except);
291 std::unique_lock<std::mutex> l(mu);
298 void SendGenericEchoAsBidi(int num_rpcs, int reuses) {
299 const grpc::string kMethodName("/grpc.testing.EchoTestService/Echo");
300 grpc::string test_string("");
301 for (int i = 0; i < num_rpcs; i++) {
302 test_string += "Hello world. ";
303 class Client : public grpc::experimental::ClientBidiReactor<ByteBuffer,
306 Client(ClientCallbackEnd2endTest* test, const grpc::string& method_name,
307 const grpc::string& test_str, int reuses)
308 : reuses_remaining_(reuses) {
309 activate_ = [this, test, method_name, test_str] {
310 if (reuses_remaining_ > 0) {
311 cli_ctx_.reset(new ClientContext);
313 test->generic_stub_->experimental().PrepareBidiStreamingCall(
314 cli_ctx_.get(), method_name, this);
315 request_.set_message(test_str);
316 send_buf_ = SerializeToByteBuffer(&request_);
317 StartWrite(send_buf_.get());
318 StartRead(&recv_buf_);
321 std::unique_lock<std::mutex> l(mu_);
328 void OnWriteDone(bool ok) override { StartWritesDone(); }
329 void OnReadDone(bool ok) override {
330 EchoResponse response;
331 EXPECT_TRUE(ParseFromByteBuffer(&recv_buf_, &response));
332 EXPECT_EQ(request_.message(), response.message());
334 void OnDone(const Status& s) override {
339 std::unique_lock<std::mutex> l(mu_);
345 EchoRequest request_;
346 std::unique_ptr<ByteBuffer> send_buf_;
347 ByteBuffer recv_buf_;
348 std::unique_ptr<ClientContext> cli_ctx_;
349 int reuses_remaining_;
350 std::function<void()> activate_;
352 std::condition_variable cv_;
354 } rpc{this, kMethodName, test_string, reuses};
359 bool do_not_test_{false};
360 bool is_server_started_{false};
362 std::shared_ptr<Channel> channel_;
363 std::unique_ptr<grpc::testing::EchoTestService::Stub> stub_;
364 std::unique_ptr<grpc::GenericStub> generic_stub_;
365 TestServiceImpl service_;
366 CallbackTestServiceImpl callback_service_;
367 std::unique_ptr<Server> server_;
368 std::ostringstream server_address_;
371 TEST_P(ClientCallbackEnd2endTest, SimpleRpc) {
377 TEST_P(ClientCallbackEnd2endTest, SequentialRpcs) {
383 TEST_P(ClientCallbackEnd2endTest, SequentialRpcsRawReq) {
389 TEST_P(ClientCallbackEnd2endTest, SendClientInitialMetadata) {
392 SimpleRequest request;
393 SimpleResponse response;
394 ClientContext cli_ctx;
396 cli_ctx.AddMetadata(kCheckClientInitialMetadataKey,
397 kCheckClientInitialMetadataVal);
400 std::condition_variable cv;
402 stub_->experimental_async()->CheckClientInitialMetadata(
403 &cli_ctx, &request, &response, [&done, &mu, &cv](Status s) {
406 std::lock_guard<std::mutex> l(mu);
410 std::unique_lock<std::mutex> l(mu);
416 TEST_P(ClientCallbackEnd2endTest, SimpleRpcWithBinaryMetadata) {
422 TEST_P(ClientCallbackEnd2endTest, SequentialRpcsWithVariedBinaryMetadataValue) {
428 TEST_P(ClientCallbackEnd2endTest, SequentialGenericRpcs) {
431 SendRpcsGeneric(10, false);
434 TEST_P(ClientCallbackEnd2endTest, SequentialGenericRpcsAsBidi) {
437 SendGenericEchoAsBidi(10, 1);
440 TEST_P(ClientCallbackEnd2endTest, SequentialGenericRpcsAsBidiWithReactorReuse) {
443 SendGenericEchoAsBidi(10, 10);
446 #if GRPC_ALLOW_EXCEPTIONS
447 TEST_P(ClientCallbackEnd2endTest, ExceptingRpc) {
450 SendRpcsGeneric(10, true);
454 TEST_P(ClientCallbackEnd2endTest, MultipleRpcsWithVariedBinaryMetadataValue) {
457 std::vector<std::thread> threads;
459 for (int i = 0; i < 10; ++i) {
460 threads.emplace_back([this] { SendRpcs(10, true); });
462 for (int i = 0; i < 10; ++i) {
467 TEST_P(ClientCallbackEnd2endTest, MultipleRpcs) {
470 std::vector<std::thread> threads;
472 for (int i = 0; i < 10; ++i) {
473 threads.emplace_back([this] { SendRpcs(10, false); });
475 for (int i = 0; i < 10; ++i) {
480 TEST_P(ClientCallbackEnd2endTest, CancelRpcBeforeStart) {
484 EchoResponse response;
485 ClientContext context;
486 request.set_message("hello");
490 std::condition_variable cv;
492 stub_->experimental_async()->Echo(
493 &context, &request, &response, [&response, &done, &mu, &cv](Status s) {
494 EXPECT_EQ("", response.message());
495 EXPECT_EQ(grpc::StatusCode::CANCELLED, s.error_code());
496 std::lock_guard<std::mutex> l(mu);
500 std::unique_lock<std::mutex> l(mu);
504 if (GetParam().use_interceptors) {
505 EXPECT_EQ(20, DummyInterceptor::GetNumTimesCancel());
509 TEST_P(ClientCallbackEnd2endTest, RequestEchoServerCancel) {
513 EchoResponse response;
514 ClientContext context;
515 request.set_message("hello");
516 context.AddMetadata(kServerTryCancelRequest,
517 grpc::to_string(CANCEL_BEFORE_PROCESSING));
520 std::condition_variable cv;
522 stub_->experimental_async()->Echo(
523 &context, &request, &response, [&done, &mu, &cv](Status s) {
524 EXPECT_FALSE(s.ok());
525 EXPECT_EQ(grpc::StatusCode::CANCELLED, s.error_code());
526 std::lock_guard<std::mutex> l(mu);
530 std::unique_lock<std::mutex> l(mu);
// Describes whether a client reactor should cancel its RPC and, if so, after
// how many operations.
536 struct ClientCancelInfo {
// Operation count at which the client reactors in this file call TryCancel();
// they only consult it when `cancel` is true.
538 int ops_before_cancel;
// Default: no client-side cancellation.
// NOTE(review): this constructor does not initialize ops_before_cancel and no
// default member initializer is visible on its declaration, so the value is
// indeterminate when `cancel` is false. Consider giving it an initializer.
540 ClientCancelInfo() : cancel{false} {}
// Requests cancellation at an operation count of `ops`.
541 explicit ClientCancelInfo(int ops) : cancel{true}, ops_before_cancel{ops} {}
544 class WriteClient : public grpc::experimental::ClientWriteReactor<EchoRequest> {
546 WriteClient(grpc::testing::EchoTestService::Stub* stub,
547 ServerTryCancelRequestPhase server_try_cancel,
548 int num_msgs_to_send, ClientCancelInfo client_cancel = {})
549 : server_try_cancel_(server_try_cancel),
550 num_msgs_to_send_(num_msgs_to_send),
551 client_cancel_{client_cancel} {
552 grpc::string msg{"Hello server."};
553 for (int i = 0; i < num_msgs_to_send; i++) {
556 if (server_try_cancel != DO_NOT_CANCEL) {
557 // Send server_try_cancel value in the client metadata
558 context_.AddMetadata(kServerTryCancelRequest,
559 grpc::to_string(server_try_cancel));
561 context_.set_initial_metadata_corked(true);
562 stub->experimental_async()->RequestStream(&context_, &response_, this);
564 request_.set_message(msg);
567 void OnWriteDone(bool ok) override {
573 void OnDone(const Status& s) override {
574 gpr_log(GPR_INFO, "Sent %d messages", num_msgs_sent_);
576 (client_cancel_.cancel)
577 ? std::min(num_msgs_to_send_, client_cancel_.ops_before_cancel)
579 switch (server_try_cancel_) {
580 case CANCEL_BEFORE_PROCESSING:
581 case CANCEL_DURING_PROCESSING:
582 // If the RPC is canceled by server before / during messages from the
583 // client, it means that the client most likely did not get a chance to
584 // send all the messages it wanted to send. i.e. num_msgs_sent <=
586 EXPECT_LE(num_msgs_sent_, num_to_send);
589 case CANCEL_AFTER_PROCESSING:
590 // If the RPC was not canceled or canceled after all messages were read
591 // by the server, the client did get a chance to send all its messages
592 EXPECT_EQ(num_msgs_sent_, num_to_send);
598 if ((server_try_cancel_ == DO_NOT_CANCEL) && !client_cancel_.cancel) {
600 EXPECT_EQ(response_.message(), desired_);
602 EXPECT_FALSE(s.ok());
603 EXPECT_EQ(grpc::StatusCode::CANCELLED, s.error_code());
605 std::unique_lock<std::mutex> l(mu_);
610 std::unique_lock<std::mutex> l(mu_);
618 if (client_cancel_.cancel &&
619 num_msgs_sent_ == client_cancel_.ops_before_cancel) {
620 context_.TryCancel();
621 } else if (num_msgs_to_send_ > num_msgs_sent_ + 1) {
622 StartWrite(&request_);
623 } else if (num_msgs_to_send_ == num_msgs_sent_ + 1) {
624 StartWriteLast(&request_, WriteOptions());
627 EchoRequest request_;
628 EchoResponse response_;
629 ClientContext context_;
630 const ServerTryCancelRequestPhase server_try_cancel_;
631 int num_msgs_sent_{0};
632 const int num_msgs_to_send_;
633 grpc::string desired_;
634 const ClientCancelInfo client_cancel_;
636 std::condition_variable cv_;
640 TEST_P(ClientCallbackEnd2endTest, RequestStream) {
643 WriteClient test{stub_.get(), DO_NOT_CANCEL, 3};
645 // Make sure that the server interceptors were not notified to cancel
646 if (GetParam().use_interceptors) {
647 EXPECT_EQ(0, DummyInterceptor::GetNumTimesCancel());
651 TEST_P(ClientCallbackEnd2endTest, ClientCancelsRequestStream) {
654 WriteClient test{stub_.get(), DO_NOT_CANCEL, 3, ClientCancelInfo{2}};
656 // Make sure that the server interceptors got the cancel
657 if (GetParam().use_interceptors) {
658 EXPECT_EQ(20, DummyInterceptor::GetNumTimesCancel());
662 // Server to cancel before reading the request
663 TEST_P(ClientCallbackEnd2endTest, RequestStreamServerCancelBeforeReads) {
666 WriteClient test{stub_.get(), CANCEL_BEFORE_PROCESSING, 1};
668 // Make sure that the server interceptors were notified
669 if (GetParam().use_interceptors) {
670 EXPECT_EQ(20, DummyInterceptor::GetNumTimesCancel());
674 // Server to cancel while reading a request from the stream in parallel
675 TEST_P(ClientCallbackEnd2endTest, RequestStreamServerCancelDuringRead) {
678 WriteClient test{stub_.get(), CANCEL_DURING_PROCESSING, 10};
680 // Make sure that the server interceptors were notified
681 if (GetParam().use_interceptors) {
682 EXPECT_EQ(20, DummyInterceptor::GetNumTimesCancel());
686 // Server to cancel after reading all the requests but before returning to the
688 TEST_P(ClientCallbackEnd2endTest, RequestStreamServerCancelAfterReads) {
691 WriteClient test{stub_.get(), CANCEL_AFTER_PROCESSING, 4};
693 // Make sure that the server interceptors were notified
694 if (GetParam().use_interceptors) {
695 EXPECT_EQ(20, DummyInterceptor::GetNumTimesCancel());
699 TEST_P(ClientCallbackEnd2endTest, UnaryReactor) {
702 class UnaryClient : public grpc::experimental::ClientUnaryReactor {
704 UnaryClient(grpc::testing::EchoTestService::Stub* stub) {
705 cli_ctx_.AddMetadata("key1", "val1");
706 cli_ctx_.AddMetadata("key2", "val2");
707 request_.mutable_param()->set_echo_metadata_initially(true);
708 request_.set_message("Hello metadata");
709 stub->experimental_async()->Echo(&cli_ctx_, &request_, &response_, this);
712 void OnReadInitialMetadataDone(bool ok) override {
714 EXPECT_EQ(1u, cli_ctx_.GetServerInitialMetadata().count("key1"));
717 ToString(cli_ctx_.GetServerInitialMetadata().find("key1")->second));
718 EXPECT_EQ(1u, cli_ctx_.GetServerInitialMetadata().count("key2"));
721 ToString(cli_ctx_.GetServerInitialMetadata().find("key2")->second));
722 initial_metadata_done_ = true;
724 void OnDone(const Status& s) override {
725 EXPECT_TRUE(initial_metadata_done_);
726 EXPECT_EQ(0u, cli_ctx_.GetServerTrailingMetadata().size());
728 EXPECT_EQ(request_.message(), response_.message());
729 std::unique_lock<std::mutex> l(mu_);
734 std::unique_lock<std::mutex> l(mu_);
741 EchoRequest request_;
742 EchoResponse response_;
743 ClientContext cli_ctx_;
745 std::condition_variable cv_;
747 bool initial_metadata_done_{false};
750 UnaryClient test{stub_.get()};
752 // Make sure that the server interceptors were not notified of a cancel
753 if (GetParam().use_interceptors) {
754 EXPECT_EQ(0, DummyInterceptor::GetNumTimesCancel());
758 class ReadClient : public grpc::experimental::ClientReadReactor<EchoResponse> {
760 ReadClient(grpc::testing::EchoTestService::Stub* stub,
761 ServerTryCancelRequestPhase server_try_cancel,
762 ClientCancelInfo client_cancel = {})
763 : server_try_cancel_(server_try_cancel), client_cancel_{client_cancel} {
764 if (server_try_cancel_ != DO_NOT_CANCEL) {
765 // Send server_try_cancel value in the client metadata
766 context_.AddMetadata(kServerTryCancelRequest,
767 grpc::to_string(server_try_cancel));
769 request_.set_message("Hello client ");
770 stub->experimental_async()->ResponseStream(&context_, &request_, this);
771 if (client_cancel_.cancel &&
772 reads_complete_ == client_cancel_.ops_before_cancel) {
773 context_.TryCancel();
775 // Even if we cancel, read until failure because there might be responses
777 StartRead(&response_);
780 void OnReadDone(bool ok) override {
782 if (server_try_cancel_ == DO_NOT_CANCEL && !client_cancel_.cancel) {
783 EXPECT_EQ(reads_complete_, kServerDefaultResponseStreamsToSend);
786 EXPECT_LE(reads_complete_, kServerDefaultResponseStreamsToSend);
787 EXPECT_EQ(response_.message(),
788 request_.message() + grpc::to_string(reads_complete_));
790 if (client_cancel_.cancel &&
791 reads_complete_ == client_cancel_.ops_before_cancel) {
792 context_.TryCancel();
794 // Even if we cancel, read until failure because there might be responses
796 StartRead(&response_);
799 void OnDone(const Status& s) override {
800 gpr_log(GPR_INFO, "Read %d messages", reads_complete_);
801 switch (server_try_cancel_) {
803 if (!client_cancel_.cancel || client_cancel_.ops_before_cancel >
804 kServerDefaultResponseStreamsToSend) {
806 EXPECT_EQ(reads_complete_, kServerDefaultResponseStreamsToSend);
808 EXPECT_GE(reads_complete_, client_cancel_.ops_before_cancel);
809 EXPECT_LE(reads_complete_, kServerDefaultResponseStreamsToSend);
810 // Status might be ok or cancelled depending on whether server
811 // sent status before client cancel went through
813 EXPECT_EQ(grpc::StatusCode::CANCELLED, s.error_code());
817 case CANCEL_BEFORE_PROCESSING:
818 EXPECT_FALSE(s.ok());
819 EXPECT_EQ(grpc::StatusCode::CANCELLED, s.error_code());
820 EXPECT_EQ(reads_complete_, 0);
822 case CANCEL_DURING_PROCESSING:
823 case CANCEL_AFTER_PROCESSING:
824 // If server canceled while writing messages, client must have read
825 // less than or equal to the expected number of messages. Even if the
826 // server canceled after writing all messages, the RPC may be canceled
827 // before the Client got a chance to read all the messages.
828 EXPECT_FALSE(s.ok());
829 EXPECT_EQ(grpc::StatusCode::CANCELLED, s.error_code());
830 EXPECT_LE(reads_complete_, kServerDefaultResponseStreamsToSend);
835 std::unique_lock<std::mutex> l(mu_);
840 std::unique_lock<std::mutex> l(mu_);
847 EchoRequest request_;
848 EchoResponse response_;
849 ClientContext context_;
850 const ServerTryCancelRequestPhase server_try_cancel_;
851 int reads_complete_{0};
852 const ClientCancelInfo client_cancel_;
854 std::condition_variable cv_;
858 TEST_P(ClientCallbackEnd2endTest, ResponseStream) {
861 ReadClient test{stub_.get(), DO_NOT_CANCEL};
863 // Make sure that the server interceptors were not notified of a cancel
864 if (GetParam().use_interceptors) {
865 EXPECT_EQ(0, DummyInterceptor::GetNumTimesCancel());
869 TEST_P(ClientCallbackEnd2endTest, ClientCancelsResponseStream) {
872 ReadClient test{stub_.get(), DO_NOT_CANCEL, ClientCancelInfo{2}};
874 // Because cancel in this case races with server finish, we can't be sure that
875 // server interceptors even see cancellation
878 // Server to cancel before sending any response messages
879 TEST_P(ClientCallbackEnd2endTest, ResponseStreamServerCancelBefore) {
882 ReadClient test{stub_.get(), CANCEL_BEFORE_PROCESSING};
884 // Make sure that the server interceptors were notified
885 if (GetParam().use_interceptors) {
886 EXPECT_EQ(20, DummyInterceptor::GetNumTimesCancel());
890 // Server to cancel while writing a response to the stream in parallel
891 TEST_P(ClientCallbackEnd2endTest, ResponseStreamServerCancelDuring) {
894 ReadClient test{stub_.get(), CANCEL_DURING_PROCESSING};
896 // Make sure that the server interceptors were notified
897 if (GetParam().use_interceptors) {
898 EXPECT_EQ(20, DummyInterceptor::GetNumTimesCancel());
902 // Server to cancel after writing all the responses to the stream but before
903 // returning to the client
904 TEST_P(ClientCallbackEnd2endTest, ResponseStreamServerCancelAfter) {
907 ReadClient test{stub_.get(), CANCEL_AFTER_PROCESSING};
909 // Make sure that the server interceptors were notified
910 if (GetParam().use_interceptors) {
911 EXPECT_EQ(20, DummyInterceptor::GetNumTimesCancel());
916 : public grpc::experimental::ClientBidiReactor<EchoRequest, EchoResponse> {
918 BidiClient(grpc::testing::EchoTestService::Stub* stub,
919 ServerTryCancelRequestPhase server_try_cancel,
920 int num_msgs_to_send, ClientCancelInfo client_cancel = {})
921 : server_try_cancel_(server_try_cancel),
922 msgs_to_send_{num_msgs_to_send},
923 client_cancel_{client_cancel} {
924 if (server_try_cancel_ != DO_NOT_CANCEL) {
925 // Send server_try_cancel value in the client metadata
926 context_.AddMetadata(kServerTryCancelRequest,
927 grpc::to_string(server_try_cancel));
929 request_.set_message("Hello fren ");
930 stub->experimental_async()->BidiStream(&context_, this);
932 StartRead(&response_);
935 void OnReadDone(bool ok) override {
937 if (server_try_cancel_ == DO_NOT_CANCEL) {
938 if (!client_cancel_.cancel) {
939 EXPECT_EQ(reads_complete_, msgs_to_send_);
941 EXPECT_LE(reads_complete_, writes_complete_);
945 EXPECT_LE(reads_complete_, msgs_to_send_);
946 EXPECT_EQ(response_.message(), request_.message());
948 StartRead(&response_);
951 void OnWriteDone(bool ok) override {
952 if (server_try_cancel_ == DO_NOT_CANCEL) {
960 void OnDone(const Status& s) override {
961 gpr_log(GPR_INFO, "Sent %d messages", writes_complete_);
962 gpr_log(GPR_INFO, "Read %d messages", reads_complete_);
963 switch (server_try_cancel_) {
965 if (!client_cancel_.cancel ||
966 client_cancel_.ops_before_cancel > msgs_to_send_) {
968 EXPECT_EQ(writes_complete_, msgs_to_send_);
969 EXPECT_EQ(reads_complete_, writes_complete_);
971 EXPECT_FALSE(s.ok());
972 EXPECT_EQ(grpc::StatusCode::CANCELLED, s.error_code());
973 EXPECT_EQ(writes_complete_, client_cancel_.ops_before_cancel);
974 EXPECT_LE(reads_complete_, writes_complete_);
977 case CANCEL_BEFORE_PROCESSING:
978 EXPECT_FALSE(s.ok());
979 EXPECT_EQ(grpc::StatusCode::CANCELLED, s.error_code());
980 // The RPC is canceled before the server did any work or returned any
981 // reads, but it's possible that some writes took place first from the
983 EXPECT_LE(writes_complete_, msgs_to_send_);
984 EXPECT_EQ(reads_complete_, 0);
986 case CANCEL_DURING_PROCESSING:
987 EXPECT_FALSE(s.ok());
988 EXPECT_EQ(grpc::StatusCode::CANCELLED, s.error_code());
989 EXPECT_LE(writes_complete_, msgs_to_send_);
990 EXPECT_LE(reads_complete_, writes_complete_);
992 case CANCEL_AFTER_PROCESSING:
993 EXPECT_FALSE(s.ok());
994 EXPECT_EQ(grpc::StatusCode::CANCELLED, s.error_code());
995 EXPECT_EQ(writes_complete_, msgs_to_send_);
996 // The Server canceled after reading the last message and after writing
997 // the message to the client. However, the RPC cancellation might have
998 // taken effect before the client actually read the response.
999 EXPECT_LE(reads_complete_, writes_complete_);
1004 std::unique_lock<std::mutex> l(mu_);
1009 std::unique_lock<std::mutex> l(mu_);
1017 if (client_cancel_.cancel &&
1018 writes_complete_ == client_cancel_.ops_before_cancel) {
1019 context_.TryCancel();
1020 } else if (writes_complete_ == msgs_to_send_) {
1023 StartWrite(&request_);
1026 EchoRequest request_;
1027 EchoResponse response_;
1028 ClientContext context_;
1029 const ServerTryCancelRequestPhase server_try_cancel_;
1030 int reads_complete_{0};
1031 int writes_complete_{0};
1032 const int msgs_to_send_;
1033 const ClientCancelInfo client_cancel_;
1035 std::condition_variable cv_;
1039 TEST_P(ClientCallbackEnd2endTest, BidiStream) {
1042 BidiClient test{stub_.get(), DO_NOT_CANCEL,
1043 kServerDefaultResponseStreamsToSend};
1045 // Make sure that the server interceptors were not notified of a cancel
1046 if (GetParam().use_interceptors) {
1047 EXPECT_EQ(0, DummyInterceptor::GetNumTimesCancel());
1051 TEST_P(ClientCallbackEnd2endTest, ClientCancelsBidiStream) {
1054 BidiClient test{stub_.get(), DO_NOT_CANCEL,
1055 kServerDefaultResponseStreamsToSend, ClientCancelInfo{2}};
1057 // Make sure that the server interceptors were notified of a cancel
1058 if (GetParam().use_interceptors) {
1059 EXPECT_EQ(20, DummyInterceptor::GetNumTimesCancel());
1063 // Server to cancel before reading/writing any requests/responses on the stream
1064 TEST_P(ClientCallbackEnd2endTest, BidiStreamServerCancelBefore) {
1067 BidiClient test{stub_.get(), CANCEL_BEFORE_PROCESSING, 2};
1069 // Make sure that the server interceptors were notified
1070 if (GetParam().use_interceptors) {
1071 EXPECT_EQ(20, DummyInterceptor::GetNumTimesCancel());
1075 // Server to cancel while reading/writing requests/responses on the stream in
1077 TEST_P(ClientCallbackEnd2endTest, BidiStreamServerCancelDuring) {
1080 BidiClient test{stub_.get(), CANCEL_DURING_PROCESSING, 10};
1082 // Make sure that the server interceptors were notified
1083 if (GetParam().use_interceptors) {
1084 EXPECT_EQ(20, DummyInterceptor::GetNumTimesCancel());
1088 // Server to cancel after reading/writing all requests/responses on the stream
1089 // but before returning to the client
1090 TEST_P(ClientCallbackEnd2endTest, BidiStreamServerCancelAfter) {
1093 BidiClient test{stub_.get(), CANCEL_AFTER_PROCESSING, 5};
1095 // Make sure that the server interceptors were notified
1096 if (GetParam().use_interceptors) {
1097 EXPECT_EQ(20, DummyInterceptor::GetNumTimesCancel());
1101 TEST_P(ClientCallbackEnd2endTest, SimultaneousReadAndWritesDone) {
1104 class Client : public grpc::experimental::ClientBidiReactor<EchoRequest,
1107 Client(grpc::testing::EchoTestService::Stub* stub) {
1108 request_.set_message("Hello bidi ");
1109 stub->experimental_async()->BidiStream(&context_, this);
1110 StartWrite(&request_);
1113 void OnReadDone(bool ok) override {
1115 EXPECT_EQ(response_.message(), request_.message());
1117 void OnWriteDone(bool ok) override {
1119 // Now send out the simultaneous Read and WritesDone
1121 StartRead(&response_);
1123 void OnDone(const Status& s) override {
1124 EXPECT_TRUE(s.ok());
1125 EXPECT_EQ(response_.message(), request_.message());
1126 std::unique_lock<std::mutex> l(mu_);
1131 std::unique_lock<std::mutex> l(mu_);
1138 EchoRequest request_;
1139 EchoResponse response_;
1140 ClientContext context_;
1142 std::condition_variable cv_;
1144 } test{stub_.get()};
1149 TEST_P(ClientCallbackEnd2endTest, UnimplementedRpc) {
1151 ChannelArguments args;
1152 const auto& channel_creds = GetCredentialsProvider()->GetChannelCredentials(
1153 GetParam().credentials_type, &args);
1154 std::shared_ptr<Channel> channel =
1155 (GetParam().protocol == Protocol::TCP)
1156 ? ::grpc::CreateCustomChannel(server_address_.str(), channel_creds,
1158 : server_->InProcessChannel(args);
1159 std::unique_ptr<grpc::testing::UnimplementedEchoService::Stub> stub;
1160 stub = grpc::testing::UnimplementedEchoService::NewStub(channel);
1161 EchoRequest request;
1162 EchoResponse response;
1163 ClientContext cli_ctx;
1164 request.set_message("Hello world.");
1166 std::condition_variable cv;
1168 stub->experimental_async()->Unimplemented(
1169 &cli_ctx, &request, &response, [&done, &mu, &cv](Status s) {
1170 EXPECT_EQ(StatusCode::UNIMPLEMENTED, s.error_code());
1171 EXPECT_EQ("", s.error_message());
1173 std::lock_guard<std::mutex> l(mu);
1177 std::unique_lock<std::mutex> l(mu);
1183 TEST_P(ClientCallbackEnd2endTest,
1184 ResponseStreamExtraReactionFlowReadsUntilDone) {
1187 class ReadAllIncomingDataClient
1188 : public grpc::experimental::ClientReadReactor<EchoResponse> {
1190 ReadAllIncomingDataClient(grpc::testing::EchoTestService::Stub* stub) {
1191 request_.set_message("Hello client ");
1192 stub->experimental_async()->ResponseStream(&context_, &request_, this);
1194 bool WaitForReadDone() {
1195 std::unique_lock<std::mutex> l(mu_);
1196 while (!read_done_) {
1203 std::unique_lock<std::mutex> l(mu_);
1208 const Status& status() {
1209 std::unique_lock<std::mutex> l(mu_);
1214 void OnReadDone(bool ok) override {
1215 std::unique_lock<std::mutex> l(mu_);
1218 read_cv_.notify_one();
1220 void OnDone(const Status& s) override {
1221 std::unique_lock<std::mutex> l(mu_);
1224 done_cv_.notify_one();
1227 EchoRequest request_;
1228 EchoResponse response_;
1229 ClientContext context_;
1230 bool read_ok_ = false;
1231 bool read_done_ = false;
1233 std::condition_variable read_cv_;
1234 std::condition_variable done_cv_;
1237 } client{stub_.get()};
1239 int reads_complete = 0;
1243 EchoResponse response;
1244 bool read_ok = true;
1246 client.StartRead(&response);
1247 read_ok = client.WaitForReadDone();
1252 client.RemoveHold();
1255 EXPECT_EQ(kServerDefaultResponseStreamsToSend, reads_complete);
1256 EXPECT_EQ(client.status().error_code(), grpc::StatusCode::OK);
// Builds the list of TestScenario values used to parameterize the test suite.
// The credential types start from the secure types reported by the credentials
// provider; the insecure type is added only when `test_insecure` is true and
// the provider can supply insecure channel credentials.
1259 std::vector<TestScenario> CreateTestScenarios(bool test_insecure) {
1260 std::vector<TestScenario> scenarios;
1261 std::vector<grpc::string> credentials_types{
1262 GetCredentialsProvider()->GetSecureCredentialsTypeList()};
1263 auto insec_ok = [] {
1264 // Only allow insecure credentials type when it is registered with the
1265 // provider. User may create providers that do not have insecure.
1266 return GetCredentialsProvider()->GetChannelCredentials(
1267 kInsecureCredentialsType, nullptr) != nullptr;
1269 if (test_insecure && insec_ok()) {
1270 credentials_types.push_back(kInsecureCredentialsType);
// At least one credentials type is required to generate any scenario.
1272 GPR_ASSERT(!credentials_types.empty());
// The loops below iterate over protocol x credentials type x callback_server x
// use_interceptors, with `barr` supplying both values of each boolean flag.
1274 bool barr[]{false, true};
1275 Protocol parr[]{Protocol::INPROC, Protocol::TCP};
1276 for (Protocol p : parr) {
1277 for (const auto& cred : credentials_types) {
1278 // TODO(vjpai): Test inproc with secure credentials when feasible
// NOTE(review): the body of this branch is not visible here; presumably it
// skips INPROC combined with anything other than available insecure
// credentials -- confirm.
1279 if (p == Protocol::INPROC &&
1280 (cred != kInsecureCredentialsType || !insec_ok())) {
1283 for (bool callback_server : barr) {
1284 for (bool use_interceptors : barr) {
1285 scenarios.emplace_back(callback_server, p, use_interceptors, cred);
1293 INSTANTIATE_TEST_CASE_P(ClientCallbackEnd2endTest, ClientCallbackEnd2endTest,
1294 ::testing::ValuesIn(CreateTestScenarios(true)));
1297 } // namespace testing
// Test binary entry point: constructs a grpc::testing::TestEnvironment from
// the command-line arguments, initializes GoogleTest with them, and returns
// the result of RUN_ALL_TESTS() as the process exit code.
1300 int main(int argc, char** argv) {
1301 grpc::testing::TestEnvironment env(argc, argv);
1302 ::testing::InitGoogleTest(&argc, argv);
1303 return RUN_ALL_TESTS();