3 * Copyright 2015 gRPC authors.
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
23 #include "absl/memory/memory.h"
25 #include <grpc/grpc.h>
26 #include <grpc/support/alloc.h>
27 #include <grpc/support/log.h>
28 #include <grpc/support/time.h>
29 #include <grpcpp/channel.h>
30 #include <grpcpp/client_context.h>
31 #include <grpcpp/create_channel.h>
32 #include <grpcpp/ext/health_check_service_server_builder_option.h>
33 #include <grpcpp/server.h>
34 #include <grpcpp/server_builder.h>
35 #include <grpcpp/server_context.h>
37 #include "src/core/ext/filters/client_channel/backup_poller.h"
38 #include "src/core/lib/gpr/tls.h"
39 #include "src/core/lib/iomgr/port.h"
40 #include "src/proto/grpc/health/v1/health.grpc.pb.h"
41 #include "src/proto/grpc/testing/duplicate/echo_duplicate.grpc.pb.h"
42 #include "src/proto/grpc/testing/echo.grpc.pb.h"
43 #include "test/core/util/port.h"
44 #include "test/core/util/test_config.h"
45 #include "test/cpp/util/string_ref_helper.h"
46 #include "test/cpp/util/test_credentials_provider.h"
48 #ifdef GRPC_POSIX_SOCKET_EV
49 #include "src/core/lib/iomgr/ev_posix.h"
50 #endif // GRPC_POSIX_SOCKET_EV
52 #include <gtest/gtest.h>
54 using grpc::testing::EchoRequest;
55 using grpc::testing::EchoResponse;
56 using std::chrono::system_clock;
// Packs a small integer into an opaque completion-queue tag. The value is
// widened through intptr_t first so the integer-to-pointer conversion is
// size-matched on 64-bit targets, mirroring detag() below.
void* tag(int t) { return reinterpret_cast<void*>(static_cast<intptr_t>(t)); }

// Recovers the integer that tag() packed into a completion-queue tag.
int detag(void* p) { return static_cast<int>(reinterpret_cast<intptr_t>(p)); }
// Default constructor: only initializes lambda_run_ to false.
// NOTE(review): lambda_run_ is presumably consulted by DoOnceThenAsyncNext,
// but the lines that would read or set it are not present in this listing.
68 Verifier() : lambda_run_(false) {}
69 // Expect sets the expected ok value for a specific tag
70 Verifier& Expect(int i, bool expect_ok) {
// Forwards to ExpectUnless with seen == false, i.e. the unconditional form.
71 return ExpectUnless(i, expect_ok, false);
73 // ExpectUnless sets the expected ok value for a specific tag
74 // unless the tag was already marked seen (as a result of ExpectMaybe)
75 Verifier& ExpectUnless(int i, bool expect_ok, bool seen) {
// Records the expected `ok` value under the tag derived from `i`.
// NOTE(review): listing lines 76 and 78-80 are absent here, so the guard on
// `seen` that the comment above describes and the return statement are not
// visible -- confirm against the complete file.
77 expectations_[tag(i)] = expect_ok;
81 // ExpectMaybe sets the expected ok value for a specific tag, but does not
82 // require it to appear
83 // If it does, sets *seen to true
84 Verifier& ExpectMaybe(int i, bool expect_ok, bool* seen) {
// Stores an optional expectation: the expected `ok` value together with the
// caller-owned `seen` flag, which GotTag sets to true if the tag shows up.
// NOTE(review): listing lines 85 and 87-90 are absent; any guard around this
// assignment and the return statement are not visible here.
86 maybe_expectations_[tag(i)] = MaybeExpect{expect_ok, seen};
91 // Next waits for 1 async tag to complete, checks its
92 // expectations, and returns the tag
93 int Next(CompletionQueue* cq, bool ignore_ok) {
// Wait for one event; cq->Next returning false is recorded as a test failure.
// (`got_tag` and `ok` are declared on listing lines absent from this view.)
96 EXPECT_TRUE(cq->Next(&got_tag, &ok));
// Check the event against the recorded expectations.
97 GotTag(got_tag, ok, ignore_ok);
// Hand the tag back to the caller as the integer it was created from.
98 return detag(got_tag);
// Polls `cq` until `deadline`, through either plain AsyncNext or
// DoThenAsyncNext (which is additionally handed `lambda`). The result of the
// chosen call is returned unchanged.
// NOTE(review): the condition choosing between the two calls sits on listing
// lines that are absent here (105, 107-108). Judging by the function name and
// the lambda_run_ member, the lambda is presumably run only on the first
// call -- confirm against the complete file.
101 template <typename T>
102 CompletionQueue::NextStatus DoOnceThenAsyncNext(
103 CompletionQueue* cq, void** got_tag, bool* ok, T deadline,
104 std::function<void(void)> lambda) {
106 return cq->AsyncNext(got_tag, ok, deadline);
109 return cq->DoThenAsyncNext(lambda, got_tag, ok, deadline);
113 // Verify keeps calling Next until all currently set
114 // expected tags are complete
// Convenience overload: same as Verify(cq, /*ignore_ok=*/false).
115 void Verify(CompletionQueue* cq) { Verify(cq, false); }
117 // This version of Verify allows optionally ignoring the
118 // outcome of the expectation
119 void Verify(CompletionQueue* cq, bool ignore_ok) {
// Calling Verify with nothing registered at all aborts via GPR_ASSERT.
120 GPR_ASSERT(!expectations_.empty() || !maybe_expectations_.empty());
// Keep going until every required tag has been retired. The loop body is on
// listing lines absent from this view (presumably a call to Next -- confirm).
121 while (!expectations_.empty()) {
// Optional expectations that never fired are dropped.
124 maybe_expectations_.clear();
127 // This version of Verify stops after a certain deadline
128 void Verify(CompletionQueue* cq,
129 std::chrono::system_clock::time_point deadline) {
// With no required tags registered, AsyncNext is expected to time out.
130 if (expectations_.empty()) {
133 EXPECT_EQ(cq->AsyncNext(&got_tag, &ok, deadline),
134 CompletionQueue::TIMEOUT);
// Presumably the else branch (listing line 135 is absent): each required tag
// must arrive as a real event before `deadline` and is then checked with
// ignore_ok == false.
136 while (!expectations_.empty()) {
139 EXPECT_EQ(cq->AsyncNext(&got_tag, &ok, deadline),
140 CompletionQueue::GOT_EVENT);
141 GotTag(got_tag, ok, false);
// Optional expectations that never fired are dropped.
144 maybe_expectations_.clear();
147 // This version of Verify stops after a certain deadline, and uses the
148 // DoThenAsyncNext API
149 // to call the lambda
150 void Verify(CompletionQueue* cq,
151 std::chrono::system_clock::time_point deadline,
152 const std::function<void(void)>& lambda) {
// With no required tags registered, the poll is expected to time out.
153 if (expectations_.empty()) {
156 EXPECT_EQ(DoOnceThenAsyncNext(cq, &got_tag, &ok, deadline, lambda),
157 CompletionQueue::TIMEOUT);
// Presumably the else branch (listing line 158 is absent): poll through
// DoOnceThenAsyncNext until every required tag has arrived as a real event,
// checking each one with ignore_ok == false.
159 while (!expectations_.empty()) {
162 EXPECT_EQ(DoOnceThenAsyncNext(cq, &got_tag, &ok, deadline, lambda),
163 CompletionQueue::GOT_EVENT);
164 GotTag(got_tag, ok, false);
// Optional expectations that never fired are dropped.
167 maybe_expectations_.clear();
// Matches a completed tag against the required and optional expectation maps.
// NOTE(review): `ignore_ok` is not referenced on any visible line; the guards
// around the EXPECT_EQ checks are presumably on listing lines absent from this
// view (174, 185), as are the else branches -- confirm.
171 void GotTag(void* got_tag, bool ok, bool ignore_ok) {
// Required expectation: compare `ok` with the recorded value, then retire it.
172 auto it = expectations_.find(got_tag);
173 if (it != expectations_.end()) {
175 EXPECT_EQ(it->second, ok);
177 expectations_.erase(it);
// Optional expectation: if the caller supplied a `seen` flag it must still be
// false (the tag may fire only once) and is set to true; then `ok` is compared
// and the entry is retired.
179 auto it2 = maybe_expectations_.find(got_tag);
180 if (it2 != maybe_expectations_.end()) {
181 if (it2->second.seen != nullptr) {
182 EXPECT_FALSE(*it2->second.seen);
183 *it2->second.seen = true;
186 EXPECT_EQ(it2->second.ok, ok);
188 maybe_expectations_.erase(it2);
// Presumably reached only when the tag is in neither map.
190 gpr_log(GPR_ERROR, "Unexpected tag: %p", got_tag);
201 std::map<void*, bool> expectations_;
202 std::map<void*, MaybeExpect> maybe_expectations_;
// Predicate used with std::remove_if in ServerBuilderSyncPluginDisabler:
// returns whatever the plugin's has_sync_methods() reports.
206 bool plugin_has_sync_methods(std::unique_ptr<ServerBuilderPlugin>& plugin) {
207 return plugin->has_sync_methods();
210 // This class disables the server builder plugins that may add sync services to
211 // the server. If there are sync services, UnimplementedRpc test will trigger
212 // the sync unknown rpc routine on the server side, rather than the async one
213 // that needs to be tested here.
214 class ServerBuilderSyncPluginDisabler : public ::grpc::ServerBuilderOption {
// This option leaves the channel arguments untouched.
216 void UpdateArguments(ChannelArguments* /*arg*/) override {}
// The override whose parameter list ends on the next line (its name is on a
// listing line absent from this view) removes every plugin for which
// plugin_has_sync_methods returns true, using erase + std::remove_if.
// The end-iterator argument of erase is likewise on an absent line.
219 std::vector<std::unique_ptr<ServerBuilderPlugin>>* plugins) override {
220 plugins->erase(std::remove_if(plugins->begin(), plugins->end(),
221 plugin_has_sync_methods),
// Captures one parameter combination for the test suite:
//   inproc_stub - selects the in-process channel instead of a custom channel
//                 to the server address (see the channel setup in the fixture)
//   creds_type  - credentials type handed to the credentials provider
//   hcs         - whether the HealthCheck service is registered on the server
//   content     - payload placed in each request message
228 TestScenario(bool inproc_stub, const std::string& creds_type, bool hcs,
229 const std::string& content)
230 : inproc(inproc_stub),
231 health_check_service(hcs),
232 credentials_type(creds_type),
233 message_content(content) {}
236 bool health_check_service;
237 const std::string credentials_type;
238 const std::string message_content;
241 static std::ostream& operator<<(std::ostream& out,
242 const TestScenario& scenario) {
243 return out << "TestScenario{inproc=" << (scenario.inproc ? "true" : "false")
244 << ", credentials='" << scenario.credentials_type
245 << ", health_check_service="
246 << (scenario.health_check_service ? "true" : "false")
247 << "', message_size=" << scenario.message_content.size() << "}";
250 void TestScenario::Log() const {
251 std::ostringstream out;
253 gpr_log(GPR_DEBUG, "%s", out.str().c_str());
// Health service with no overrides of its own; the fixture registers an
// instance of it when the scenario's health_check_service flag is set.
256 class HealthCheck : public health::v1::Health::Service {};
// Value-parameterized fixture: each test runs once per TestScenario. It owns
// the server, its completion queue, the async Echo service and the client
// stub that the tests below drive by hand.
// NOTE(review): this listing omits a number of short lines (access
// specifiers, closing braces, some declarations and at least one method
// signature), so the comments below only describe what is visible.
258 class AsyncEnd2endTest : public ::testing::TestWithParam<TestScenario> {
// Logs the scenario this instance runs with.
260 AsyncEnd2endTest() { GetParam().Log(); }
// Picks a free port, builds the "localhost:<port>" address and starts the server.
262 void SetUp() override {
263 port_ = grpc_pick_unused_port_or_die();
264 server_address_ << "localhost:" << port_;
267 BuildAndStartServer();
270 void TearDown() override {
// Drain the completion queue until Next returns false, then give the port back.
// (`ignored_tag`/`ignored_ok` and any shutdown calls are on absent lines.)
275 while (cq_->Next(&ignored_tag, &ignored_ok)) {
278 grpc_recycle_unused_port(port_);
// Builds the server: listening port with the scenario's credentials, the async
// Echo service, optionally the health-check service, and one completion queue.
281 void BuildAndStartServer() {
282 ServerBuilder builder;
283 auto server_creds = GetCredentialsProvider()->GetServerCredentials(
284 GetParam().credentials_type);
285 builder.AddListeningPort(server_address_.str(), server_creds);
// The assignment target of this expression is on an absent line (presumably service_).
287 absl::make_unique<grpc::testing::EchoTestService::AsyncService>();
288 builder.RegisterService(service_.get());
289 if (GetParam().health_check_service) {
290 builder.RegisterService(&health_check_);
292 cq_ = builder.AddCompletionQueue();
294 // TODO(zyc): make a test option to choose whether sync plugins should be
// NOTE(review): the rest of the TODO sentence above is missing from this listing.
// NOTE(review): `move` below is unqualified and resolves only through ADL;
// std::move would state the intent explicitly.
296 std::unique_ptr<ServerBuilderOption> sync_plugin_disabler(
297 new ServerBuilderSyncPluginDisabler());
298 builder.SetOption(move(sync_plugin_disabler));
299 server_ = builder.BuildAndStart();
// NOTE(review): listing lines 300-302 are absent before this point, so the
// statements below presumably belong to a separate stub-creation helper whose
// signature is not visible -- confirm. They create the client channel
// (custom channel to server_address_, or the server's in-process channel when
// the scenario's inproc flag is set) and a fresh Echo stub on it.
303 ChannelArguments args;
304 auto channel_creds = GetCredentialsProvider()->GetChannelCredentials(
305 GetParam().credentials_type, &args);
306 std::shared_ptr<Channel> channel =
307 !(GetParam().inproc) ? ::grpc::CreateCustomChannel(
308 server_address_.str(), channel_creds, args)
309 : server_->InProcessChannel(args);
310 stub_ = grpc::testing::EchoTestService::NewStub(channel);
// Runs `num_rpcs` unary Echo calls one after another, driving both the client
// and the server side from this thread through the shared completion queue.
313 void SendRpc(int num_rpcs) {
314 for (int i = 0; i < num_rpcs; i++) {
315 EchoRequest send_request;
316 EchoRequest recv_request;
317 EchoResponse send_response;
318 EchoResponse recv_response;
321 ClientContext cli_ctx;
322 ServerContext srv_ctx;
323 grpc::ServerAsyncResponseWriter<EchoResponse> response_writer(&srv_ctx);
325 send_request.set_message(GetParam().message_content);
326 std::unique_ptr<ClientAsyncResponseReader<EchoResponse>> response_reader(
327 stub_->AsyncEcho(&cli_ctx, send_request, cq_.get()));
// Tag 2: the server has received the request.
329 service_->RequestEcho(&srv_ctx, &recv_request, &response_writer,
330 cq_.get(), cq_.get(), tag(2));
// Tag 4: the client has the response and status (`recv_status` is declared on
// an absent line).
332 response_reader->Finish(&recv_response, &recv_status, tag(4));
334 Verifier().Expect(2, true).Verify(cq_.get());
335 EXPECT_EQ(send_request.message(), recv_request.message());
// Tag 3: the server has finished sending its reply.
337 send_response.set_message(recv_request.message());
338 response_writer.Finish(send_response, Status::OK, tag(3));
339 Verifier().Expect(3, true).Expect(4, true).Verify(cq_.get());
341 EXPECT_EQ(send_response.message(), recv_response.message());
342 EXPECT_TRUE(recv_status.ok());
// Fixture state shared by the helpers above.
346 std::unique_ptr<ServerCompletionQueue> cq_;
347 std::unique_ptr<grpc::testing::EchoTestService::Stub> stub_;
348 std::unique_ptr<Server> server_;
349 std::unique_ptr<grpc::testing::EchoTestService::AsyncService> service_;
350 HealthCheck health_check_;
351 std::ostringstream server_address_;
// NOTE(review): the body of this test falls on listing lines that are absent
// from this view (356-358), so what it exercises cannot be described here.
355 TEST_P(AsyncEnd2endTest, SimpleRpc) {
// Unary Echo in which the request carries an expected_error; the server
// finishes the call with a status built from that field and the client must
// observe the same code and message together with an empty response.
360 TEST_P(AsyncEnd2endTest, SimpleRpcWithExpectedError) {
363 EchoRequest send_request;
364 EchoRequest recv_request;
365 EchoResponse send_response;
366 EchoResponse recv_response;
369 ClientContext cli_ctx;
370 ServerContext srv_ctx;
371 grpc::ServerAsyncResponseWriter<EchoResponse> response_writer(&srv_ctx);
372 ErrorStatus error_status;
374 send_request.set_message(GetParam().message_content);
375 error_status.set_code(1); // CANCELLED
376 error_status.set_error_message("cancel error message");
377 *send_request.mutable_param()->mutable_expected_error() = error_status;
379 std::unique_ptr<ClientAsyncResponseReader<EchoResponse>> response_reader(
380 stub_->AsyncEcho(&cli_ctx, send_request, cq_.get()));
// Tag 5 is requested as the server-side "call done" notification.
382 srv_ctx.AsyncNotifyWhenDone(tag(5));
// The remaining arguments of this call are on listing lines absent from this
// view; the Verify below waits for tag 2.
383 service_->RequestEcho(&srv_ctx, &recv_request, &response_writer, cq_.get(),
386 response_reader->Finish(&recv_response, &recv_status, tag(4));
388 Verifier().Expect(2, true).Verify(cq_.get());
389 EXPECT_EQ(send_request.message(), recv_request.message());
391 send_response.set_message(recv_request.message());
// Finish with the status the request asked for (the leading and trailing
// arguments of this call are on absent lines; the Verify below waits for tag 3).
392 response_writer.Finish(
395 static_cast<StatusCode>(recv_request.param().expected_error().code()),
396 recv_request.param().expected_error().error_message()),
398 Verifier().Expect(3, true).Expect(4, true).Expect(5, true).Verify(cq_.get());
// The error status reaches the client, the response body does not.
400 EXPECT_EQ(recv_response.message(), "");
401 EXPECT_EQ(recv_status.error_code(), error_status.code());
402 EXPECT_EQ(recv_status.error_message(), error_status.error_message());
// Finishing with an error status must not be reported as a cancellation.
403 EXPECT_FALSE(srv_ctx.IsCancelled());
// NOTE(review): the body of this test falls on listing lines that are absent
// from this view (407-409), so what it exercises cannot be described here.
406 TEST_P(AsyncEnd2endTest, SequentialRpcs) {
// Rebuilds the server on the same address after draining the completion
// queue, then sleeps long enough for the client channel to reconnect.
// NOTE(review): the RPCs issued before and after the restart, and the server
// shutdown itself, are on listing lines absent from this view.
411 TEST_P(AsyncEnd2endTest, ReconnectChannel) {
412 // GRPC_CLIENT_CHANNEL_BACKUP_POLL_INTERVAL_MS is set to 100ms in main()
// In-process scenarios take this branch; its body is on an absent line
// (presumably an early return -- confirm).
413 if (GetParam().inproc) {
416 int poller_slowdown_factor = 1;
417 #ifdef GRPC_POSIX_SOCKET_EV
418 // It needs 2 pollset_works to reconnect the channel with polling engine
// Double the wait when the configured poll strategy is exactly "poll".
420 grpc_core::UniquePtr<char> poller = GPR_GLOBAL_CONFIG_GET(grpc_poll_strategy);
421 if (0 == strcmp(poller.get(), "poll")) {
422 poller_slowdown_factor = 2;
424 #endif // GRPC_POSIX_SOCKET_EV
// Drain the completion queue until Next returns false before restarting.
431 while (cq_->Next(&ignored_tag, &ignored_ok)) {
433 BuildAndStartServer();
434 // It needs more than GRPC_CLIENT_CHANNEL_BACKUP_POLL_INTERVAL_MS time to
435 // reconnect the channel.
// Sleep for 300 ms scaled by the poller factor and the global test slowdown factor.
436 gpr_sleep_until(gpr_time_add(
437 gpr_now(GPR_CLOCK_REALTIME),
438 gpr_time_from_millis(
439 300 * poller_slowdown_factor * grpc_test_slowdown_factor(),
444 // We do not need to protect notify because the use is synchronized.
// Thread entry point used by WaitAndShutdownTest, which passes the fixture's
// server and the address of its `notify` counter.
// NOTE(review): the body (listing lines 446-447) is absent from this view.
445 void ServerWait(Server* server, int* notify) {
// Runs ServerWait on a helper thread and checks that `notify` is 0 at the
// first check and 1 at the second.
// NOTE(review): the declaration of `notify` and the statements between the
// two checks are on listing lines absent from this view (450, 452-453,
// 455-456).
449 TEST_P(AsyncEnd2endTest, WaitAndShutdownTest) {
// The third argument is the address of `notify`; the listing had corrupted
// "&notify" into a mis-decoded HTML entity.
451 std::thread wait_thread(&ServerWait, server_.get(), &notify);
454 EXPECT_EQ(0, notify);
457 EXPECT_EQ(1, notify);
// Issues Server::Shutdown from a helper thread.
// NOTE(review): the rest of the test (listing lines 461-462, 464-466) is
// absent from this view, including whatever waits on the server and joins `t`.
460 TEST_P(AsyncEnd2endTest, ShutdownThenWait) {
463 std::thread t([this]() { server_->Shutdown(); });
468 // Test a simple RPC using the async version of Next
// Unary Echo driven through the deadline-taking Verify overload, which polls
// with CompletionQueue::AsyncNext instead of the blocking Next.
469 TEST_P(AsyncEnd2endTest, AsyncNextRpc) {
472 EchoRequest send_request;
473 EchoRequest recv_request;
474 EchoResponse send_response;
475 EchoResponse recv_response;
478 ClientContext cli_ctx;
479 ServerContext srv_ctx;
480 grpc::ServerAsyncResponseWriter<EchoResponse> response_writer(&srv_ctx);
482 send_request.set_message(GetParam().message_content);
483 std::unique_ptr<ClientAsyncResponseReader<EchoResponse>> response_reader(
484 stub_->AsyncEcho(&cli_ctx, send_request, cq_.get()));
486 std::chrono::system_clock::time_point time_now(
487 std::chrono::system_clock::now());
488 std::chrono::system_clock::time_point time_limit(
489 std::chrono::system_clock::now() + std::chrono::seconds(10));
// No expectations are registered, so each call asserts that AsyncNext with an
// already-reached deadline reports TIMEOUT.
490 Verifier().Verify(cq_.get(), time_now);
491 Verifier().Verify(cq_.get(), time_now);
// The remaining arguments of this call are on an absent line; the Verify
// below waits for tag 2.
493 service_->RequestEcho(&srv_ctx, &recv_request, &response_writer, cq_.get(),
495 response_reader->Finish(&recv_response, &recv_status, tag(4));
// The request must reach the server within the 10 second limit.
497 Verifier().Expect(2, true).Verify(cq_.get(), time_limit);
498 EXPECT_EQ(send_request.message(), recv_request.message());
500 send_response.set_message(recv_request.message());
501 response_writer.Finish(send_response, Status::OK, tag(3));
// time_point::max() exercises AsyncNext with the largest possible deadline.
502 Verifier().Expect(3, true).Expect(4, true).Verify(
503 cq_.get(), std::chrono::system_clock::time_point::max());
505 EXPECT_EQ(send_response.message(), recv_response.message());
506 EXPECT_TRUE(recv_status.ok());
509 // Test a simple RPC using the DoThenAsyncNext API
// Unary Echo in which the server-side operations are wrapped in lambdas and
// handed to the lambda-taking Verify overload, which forwards them to
// DoOnceThenAsyncNext.
510 TEST_P(AsyncEnd2endTest, DoThenAsyncNextRpc) {
513 EchoRequest send_request;
514 EchoRequest recv_request;
515 EchoResponse send_response;
516 EchoResponse recv_response;
519 ClientContext cli_ctx;
520 ServerContext srv_ctx;
521 grpc::ServerAsyncResponseWriter<EchoResponse> response_writer(&srv_ctx);
523 send_request.set_message(GetParam().message_content);
524 std::unique_ptr<ClientAsyncResponseReader<EchoResponse>> response_reader(
525 stub_->AsyncEcho(&cli_ctx, send_request, cq_.get()));
527 std::chrono::system_clock::time_point time_now(
528 std::chrono::system_clock::now());
529 std::chrono::system_clock::time_point time_limit(
530 std::chrono::system_clock::now() + std::chrono::seconds(10));
// No expectations are registered, so each call asserts that the poll reports
// TIMEOUT.
531 Verifier().Verify(cq_.get(), time_now);
532 Verifier().Verify(cq_.get(), time_now);
534 auto resp_writer_ptr = &response_writer;
// lambda_2 issues the server-side RequestEcho.
// NOTE(review): the end of the call and of the lambda are on listing lines
// absent from this view (537-538); the Finish below is presumably outside
// the lambda -- confirm.
535 auto lambda_2 = [&, this, resp_writer_ptr]() {
536 service_->RequestEcho(&srv_ctx, &recv_request, resp_writer_ptr, cq_.get(),
539 response_reader->Finish(&recv_response, &recv_status, tag(4));
541 Verifier().Expect(2, true).Verify(cq_.get(), time_limit, lambda_2);
542 EXPECT_EQ(send_request.message(), recv_request.message());
544 send_response.set_message(recv_request.message());
// lambda_3 captures send_response by value and sends it as the reply.
545 auto lambda_3 = [resp_writer_ptr, send_response]() {
546 resp_writer_ptr->Finish(send_response, Status::OK, tag(3));
548 Verifier().Expect(3, true).Expect(4, true).Verify(
549 cq_.get(), std::chrono::system_clock::time_point::max(), lambda_3);
551 EXPECT_EQ(send_response.message(), recv_response.message());
552 EXPECT_TRUE(recv_status.ok());
555 // Two pings and a final pong.
// Client-streaming call: the client writes the same request twice, signals
// WritesDone, and the server answers with a single response.
556 TEST_P(AsyncEnd2endTest, SimpleClientStreaming) {
559 EchoRequest send_request;
560 EchoRequest recv_request;
561 EchoResponse send_response;
562 EchoResponse recv_response;
564 ClientContext cli_ctx;
565 ServerContext srv_ctx;
566 ServerAsyncReader<EchoResponse, EchoRequest> srv_stream(&srv_ctx);
568 send_request.set_message(GetParam().message_content);
// Tag 1: client stream started. The tag of the server-side request is on an
// absent line; the Verify below waits for tags 1 and 2.
569 std::unique_ptr<ClientAsyncWriter<EchoRequest>> cli_stream(
570 stub_->AsyncRequestStream(&cli_ctx, &recv_response, cq_.get(), tag(1)));
572 service_->RequestRequestStream(&srv_ctx, &srv_stream, cq_.get(), cq_.get(),
575 Verifier().Expect(2, true).Expect(1, true).Verify(cq_.get());
// First ping.
577 cli_stream->Write(send_request, tag(3));
578 srv_stream.Read(&recv_request, tag(4));
579 Verifier().Expect(3, true).Expect(4, true).Verify(cq_.get());
580 EXPECT_EQ(send_request.message(), recv_request.message());
// Second ping.
582 cli_stream->Write(send_request, tag(5));
583 srv_stream.Read(&recv_request, tag(6));
584 Verifier().Expect(5, true).Expect(6, true).Verify(cq_.get());
586 EXPECT_EQ(send_request.message(), recv_request.message());
// After WritesDone the server's next read is expected to complete with
// ok == false (tag 8).
587 cli_stream->WritesDone(tag(7));
588 srv_stream.Read(&recv_request, tag(8));
589 Verifier().Expect(7, true).Expect(8, false).Verify(cq_.get());
// Final pong: the server finishes with a response, the client collects the status.
591 send_response.set_message(recv_request.message());
592 srv_stream.Finish(send_response, Status::OK, tag(9));
593 cli_stream->Finish(&recv_status, tag(10));
594 Verifier().Expect(9, true).Expect(10, true).Verify(cq_.get());
596 EXPECT_EQ(send_response.message(), recv_response.message());
597 EXPECT_TRUE(recv_status.ok());
600 // Two pings and a final pong.
// Client-streaming call with corked initial metadata and WriteLast for the
// final message, so the client never waits on tag 1 and never calls WritesDone.
601 TEST_P(AsyncEnd2endTest, SimpleClientStreamingWithCoalescingApi) {
604 EchoRequest send_request;
605 EchoRequest recv_request;
606 EchoResponse send_response;
607 EchoResponse recv_response;
609 ClientContext cli_ctx;
610 ServerContext srv_ctx;
611 ServerAsyncReader<EchoResponse, EchoRequest> srv_stream(&srv_ctx);
613 send_request.set_message(GetParam().message_content);
614 cli_ctx.set_initial_metadata_corked(true);
615 // tag:1 never comes up since no op is performed
616 std::unique_ptr<ClientAsyncWriter<EchoRequest>> cli_stream(
617 stub_->AsyncRequestStream(&cli_ctx, &recv_response, cq_.get(), tag(1)));
// The tag of the server-side request is on an absent line; the Verify below
// waits for tag 2.
619 service_->RequestRequestStream(&srv_ctx, &srv_stream, cq_.get(), cq_.get(),
622 cli_stream->Write(send_request, tag(3));
// Tag 3 may or may not have completed by the time tag 2 arrives; `seen3`
// (declared on an absent line) records which, so the next Verify only waits
// for tag 3 if it has not been seen yet.
626 Verifier().Expect(2, true).ExpectMaybe(3, true, &seen3).Verify(cq_.get());
628 srv_stream.Read(&recv_request, tag(4));
630 Verifier().ExpectUnless(3, true, seen3).Expect(4, true).Verify(cq_.get());
632 EXPECT_EQ(send_request.message(), recv_request.message());
// Second and last ping, sent with WriteLast.
634 cli_stream->WriteLast(send_request, WriteOptions(), tag(5));
635 srv_stream.Read(&recv_request, tag(6));
636 Verifier().Expect(5, true).Expect(6, true).Verify(cq_.get());
637 EXPECT_EQ(send_request.message(), recv_request.message());
// A further server read is expected to complete with ok == false.
639 srv_stream.Read(&recv_request, tag(7));
640 Verifier().Expect(7, false).Verify(cq_.get());
642 send_response.set_message(recv_request.message());
643 srv_stream.Finish(send_response, Status::OK, tag(8));
644 cli_stream->Finish(&recv_status, tag(9));
645 Verifier().Expect(8, true).Expect(9, true).Verify(cq_.get());
647 EXPECT_EQ(send_response.message(), recv_response.message());
648 EXPECT_TRUE(recv_status.ok());
651 // One ping, two pongs.
// Server-streaming call: one request, two identical responses written by the
// server, then Finish.
652 TEST_P(AsyncEnd2endTest, SimpleServerStreaming) {
655 EchoRequest send_request;
656 EchoRequest recv_request;
657 EchoResponse send_response;
658 EchoResponse recv_response;
660 ClientContext cli_ctx;
661 ServerContext srv_ctx;
662 ServerAsyncWriter<EchoResponse> srv_stream(&srv_ctx);
664 send_request.set_message(GetParam().message_content);
// Tag 1: client stream started; tag 2: server received the request.
665 std::unique_ptr<ClientAsyncReader<EchoResponse>> cli_stream(
666 stub_->AsyncResponseStream(&cli_ctx, send_request, cq_.get(), tag(1)));
668 service_->RequestResponseStream(&srv_ctx, &recv_request, &srv_stream,
669 cq_.get(), cq_.get(), tag(2));
671 Verifier().Expect(1, true).Expect(2, true).Verify(cq_.get());
672 EXPECT_EQ(send_request.message(), recv_request.message());
// First pong.
674 send_response.set_message(recv_request.message());
675 srv_stream.Write(send_response, tag(3));
676 cli_stream->Read(&recv_response, tag(4));
677 Verifier().Expect(3, true).Expect(4, true).Verify(cq_.get());
678 EXPECT_EQ(send_response.message(), recv_response.message());
// Second pong.
680 srv_stream.Write(send_response, tag(5));
681 cli_stream->Read(&recv_response, tag(6));
682 Verifier().Expect(5, true).Expect(6, true).Verify(cq_.get());
683 EXPECT_EQ(send_response.message(), recv_response.message());
// Once the server has finished, the client's next read is expected to
// complete with ok == false (tag 8).
685 srv_stream.Finish(Status::OK, tag(7));
686 cli_stream->Read(&recv_response, tag(8));
687 Verifier().Expect(7, true).Expect(8, false).Verify(cq_.get());
689 cli_stream->Finish(&recv_status, tag(9));
690 Verifier().Expect(9, true).Verify(cq_.get());
692 EXPECT_TRUE(recv_status.ok());
695 // One ping, two pongs. Using WriteAndFinish API
// Server-streaming call in which the second response and the final status are
// sent together through WriteAndFinish (tag 5).
696 TEST_P(AsyncEnd2endTest, SimpleServerStreamingWithCoalescingApiWAF) {
699 EchoRequest send_request;
700 EchoRequest recv_request;
701 EchoResponse send_response;
702 EchoResponse recv_response;
704 ClientContext cli_ctx;
705 ServerContext srv_ctx;
706 ServerAsyncWriter<EchoResponse> srv_stream(&srv_ctx);
708 send_request.set_message(GetParam().message_content);
709 std::unique_ptr<ClientAsyncReader<EchoResponse>> cli_stream(
710 stub_->AsyncResponseStream(&cli_ctx, send_request, cq_.get(), tag(1)));
712 service_->RequestResponseStream(&srv_ctx, &recv_request, &srv_stream,
713 cq_.get(), cq_.get(), tag(2));
715 Verifier().Expect(1, true).Expect(2, true).Verify(cq_.get());
716 EXPECT_EQ(send_request.message(), recv_request.message());
// First pong, sent with a plain Write.
718 send_response.set_message(recv_request.message());
719 srv_stream.Write(send_response, tag(3));
720 cli_stream->Read(&recv_response, tag(4));
721 Verifier().Expect(3, true).Expect(4, true).Verify(cq_.get());
722 EXPECT_EQ(send_response.message(), recv_response.message());
// Second pong and status in one operation.
724 srv_stream.WriteAndFinish(send_response, WriteOptions(), Status::OK, tag(5));
725 cli_stream->Read(&recv_response, tag(6));
726 Verifier().Expect(5, true).Expect(6, true).Verify(cq_.get());
727 EXPECT_EQ(send_response.message(), recv_response.message());
// A further client read is expected to complete with ok == false.
729 cli_stream->Read(&recv_response, tag(7));
730 Verifier().Expect(7, false).Verify(cq_.get());
732 cli_stream->Finish(&recv_status, tag(8));
733 Verifier().Expect(8, true).Verify(cq_.get());
735 EXPECT_TRUE(recv_status.ok());
738 // One ping, two pongs. Using WriteLast API
// Server-streaming call in which the second response is sent with WriteLast
// (tag 5) and the status with a separate Finish (tag 7).
739 TEST_P(AsyncEnd2endTest, SimpleServerStreamingWithCoalescingApiWL) {
742 EchoRequest send_request;
743 EchoRequest recv_request;
744 EchoResponse send_response;
745 EchoResponse recv_response;
747 ClientContext cli_ctx;
748 ServerContext srv_ctx;
749 ServerAsyncWriter<EchoResponse> srv_stream(&srv_ctx);
751 send_request.set_message(GetParam().message_content);
752 std::unique_ptr<ClientAsyncReader<EchoResponse>> cli_stream(
753 stub_->AsyncResponseStream(&cli_ctx, send_request, cq_.get(), tag(1)));
755 service_->RequestResponseStream(&srv_ctx, &recv_request, &srv_stream,
756 cq_.get(), cq_.get(), tag(2));
758 Verifier().Expect(1, true).Expect(2, true).Verify(cq_.get());
759 EXPECT_EQ(send_request.message(), recv_request.message());
// First pong, sent with a plain Write.
761 send_response.set_message(recv_request.message());
762 srv_stream.Write(send_response, tag(3));
763 cli_stream->Read(&recv_response, tag(4));
764 Verifier().Expect(3, true).Expect(4, true).Verify(cq_.get());
765 EXPECT_EQ(send_response.message(), recv_response.message());
// Second pong via WriteLast; Finish is issued before waiting, and tags 5, 6
// and 7 are all awaited in the same Verify.
767 srv_stream.WriteLast(send_response, WriteOptions(), tag(5));
768 cli_stream->Read(&recv_response, tag(6));
769 srv_stream.Finish(Status::OK, tag(7));
770 Verifier().Expect(5, true).Expect(6, true).Expect(7, true).Verify(cq_.get());
771 EXPECT_EQ(send_response.message(), recv_response.message());
// A further client read is expected to complete with ok == false.
773 cli_stream->Read(&recv_response, tag(8));
774 Verifier().Expect(8, false).Verify(cq_.get());
776 cli_stream->Finish(&recv_status, tag(9));
777 Verifier().Expect(9, true).Verify(cq_.get());
779 EXPECT_TRUE(recv_status.ok());
782 // One ping, one pong.
// Bidirectional streaming call: one request from the client, one response
// from the server, then both sides close.
783 TEST_P(AsyncEnd2endTest, SimpleBidiStreaming) {
786 EchoRequest send_request;
787 EchoRequest recv_request;
788 EchoResponse send_response;
789 EchoResponse recv_response;
791 ClientContext cli_ctx;
792 ServerContext srv_ctx;
793 ServerAsyncReaderWriter<EchoResponse, EchoRequest> srv_stream(&srv_ctx);
795 send_request.set_message(GetParam().message_content);
// Tag 1: client stream started. The tag of the server-side request is on an
// absent line; the Verify below waits for tags 1 and 2.
796 std::unique_ptr<ClientAsyncReaderWriter<EchoRequest, EchoResponse>>
797 cli_stream(stub_->AsyncBidiStream(&cli_ctx, cq_.get(), tag(1)));
799 service_->RequestBidiStream(&srv_ctx, &srv_stream, cq_.get(), cq_.get(),
802 Verifier().Expect(1, true).Expect(2, true).Verify(cq_.get());
// Ping.
804 cli_stream->Write(send_request, tag(3));
805 srv_stream.Read(&recv_request, tag(4));
806 Verifier().Expect(3, true).Expect(4, true).Verify(cq_.get());
807 EXPECT_EQ(send_request.message(), recv_request.message());
// Pong.
809 send_response.set_message(recv_request.message());
810 srv_stream.Write(send_response, tag(5));
811 cli_stream->Read(&recv_response, tag(6));
812 Verifier().Expect(5, true).Expect(6, true).Verify(cq_.get());
813 EXPECT_EQ(send_response.message(), recv_response.message());
// After WritesDone the server's next read is expected to complete with
// ok == false (tag 8).
815 cli_stream->WritesDone(tag(7));
816 srv_stream.Read(&recv_request, tag(8));
817 Verifier().Expect(7, true).Expect(8, false).Verify(cq_.get());
819 srv_stream.Finish(Status::OK, tag(9));
820 cli_stream->Finish(&recv_status, tag(10));
821 Verifier().Expect(9, true).Expect(10, true).Verify(cq_.get());
823 EXPECT_TRUE(recv_status.ok());
826 // One ping, one pong. Using server:WriteAndFinish api
// Bidirectional streaming with corked client metadata: the client sends its
// only message with WriteLast and the server answers with WriteAndFinish.
827 TEST_P(AsyncEnd2endTest, SimpleBidiStreamingWithCoalescingApiWAF) {
830 EchoRequest send_request;
831 EchoRequest recv_request;
832 EchoResponse send_response;
833 EchoResponse recv_response;
835 ClientContext cli_ctx;
836 ServerContext srv_ctx;
837 ServerAsyncReaderWriter<EchoResponse, EchoRequest> srv_stream(&srv_ctx);
839 send_request.set_message(GetParam().message_content);
// Initial metadata is corked; tag 1 is never waited for in this test.
840 cli_ctx.set_initial_metadata_corked(true);
841 std::unique_ptr<ClientAsyncReaderWriter<EchoRequest, EchoResponse>>
842 cli_stream(stub_->AsyncBidiStream(&cli_ctx, cq_.get(), tag(1)));
// The tag of the server-side request is on an absent line; the Verify below
// waits for tag 2.
844 service_->RequestBidiStream(&srv_ctx, &srv_stream, cq_.get(), cq_.get(),
847 cli_stream->WriteLast(send_request, WriteOptions(), tag(3));
// Tag 3 may or may not have completed by the time tag 2 arrives; `seen3`
// (declared on an absent line) records which, so the next Verify only waits
// for tag 3 if it has not been seen yet.
851 Verifier().Expect(2, true).ExpectMaybe(3, true, &seen3).Verify(cq_.get());
853 srv_stream.Read(&recv_request, tag(4));
855 Verifier().ExpectUnless(3, true, seen3).Expect(4, true).Verify(cq_.get());
856 EXPECT_EQ(send_request.message(), recv_request.message());
// A further server read is expected to complete with ok == false.
858 srv_stream.Read(&recv_request, tag(5));
859 Verifier().Expect(5, false).Verify(cq_.get());
// Response and status in one operation.
861 send_response.set_message(recv_request.message());
862 srv_stream.WriteAndFinish(send_response, WriteOptions(), Status::OK, tag(6));
863 cli_stream->Read(&recv_response, tag(7));
864 Verifier().Expect(6, true).Expect(7, true).Verify(cq_.get());
865 EXPECT_EQ(send_response.message(), recv_response.message());
867 cli_stream->Finish(&recv_status, tag(8));
868 Verifier().Expect(8, true).Verify(cq_.get());
870 EXPECT_TRUE(recv_status.ok());
873 // One ping, one pong. Using server:WriteLast api
// Bidirectional streaming with corked client metadata: the client sends its
// only message with WriteLast; the server answers with WriteLast followed by
// a separate Finish.
874 TEST_P(AsyncEnd2endTest, SimpleBidiStreamingWithCoalescingApiWL) {
877 EchoRequest send_request;
878 EchoRequest recv_request;
879 EchoResponse send_response;
880 EchoResponse recv_response;
882 ClientContext cli_ctx;
883 ServerContext srv_ctx;
884 ServerAsyncReaderWriter<EchoResponse, EchoRequest> srv_stream(&srv_ctx);
886 send_request.set_message(GetParam().message_content);
// Initial metadata is corked; tag 1 is never waited for in this test.
887 cli_ctx.set_initial_metadata_corked(true);
888 std::unique_ptr<ClientAsyncReaderWriter<EchoRequest, EchoResponse>>
889 cli_stream(stub_->AsyncBidiStream(&cli_ctx, cq_.get(), tag(1)));
// The tag of the server-side request is on an absent line; the Verify below
// waits for tag 2.
891 service_->RequestBidiStream(&srv_ctx, &srv_stream, cq_.get(), cq_.get(),
894 cli_stream->WriteLast(send_request, WriteOptions(), tag(3));
// Tag 3 may or may not have completed by the time tag 2 arrives; `seen3`
// (declared on an absent line) records which, so the next Verify only waits
// for tag 3 if it has not been seen yet.
898 Verifier().Expect(2, true).ExpectMaybe(3, true, &seen3).Verify(cq_.get());
900 srv_stream.Read(&recv_request, tag(4));
902 Verifier().ExpectUnless(3, true, seen3).Expect(4, true).Verify(cq_.get());
903 EXPECT_EQ(send_request.message(), recv_request.message());
// A further server read is expected to complete with ok == false.
905 srv_stream.Read(&recv_request, tag(5));
906 Verifier().Expect(5, false).Verify(cq_.get());
// WriteLast and Finish are both issued before waiting; tags 6, 7 and 8 are
// awaited in the same Verify.
908 send_response.set_message(recv_request.message());
909 srv_stream.WriteLast(send_response, WriteOptions(), tag(6));
910 srv_stream.Finish(Status::OK, tag(7));
911 cli_stream->Read(&recv_response, tag(8));
912 Verifier().Expect(6, true).Expect(7, true).Expect(8, true).Verify(cq_.get());
913 EXPECT_EQ(send_response.message(), recv_response.message());
915 cli_stream->Finish(&recv_status, tag(9));
916 Verifier().Expect(9, true).Verify(cq_.get());
918 EXPECT_TRUE(recv_status.ok());
// Unary Echo that attaches three metadata entries on the client (one with a
// "-bin" key) and checks that the server context exposes the same values.
922 TEST_P(AsyncEnd2endTest, ClientInitialMetadataRpc) {
925 EchoRequest send_request;
926 EchoRequest recv_request;
927 EchoResponse send_response;
928 EchoResponse recv_response;
931 ClientContext cli_ctx;
932 ServerContext srv_ctx;
933 grpc::ServerAsyncResponseWriter<EchoResponse> response_writer(&srv_ctx);
935 send_request.set_message(GetParam().message_content);
936 std::pair<std::string, std::string> meta1("key1", "val1");
937 std::pair<std::string, std::string> meta2("key2", "val2");
938 std::pair<std::string, std::string> meta3("g.r.d-bin", "xyz");
939 cli_ctx.AddMetadata(meta1.first, meta1.second);
940 cli_ctx.AddMetadata(meta2.first, meta2.second);
941 cli_ctx.AddMetadata(meta3.first, meta3.second);
943 std::unique_ptr<ClientAsyncResponseReader<EchoResponse>> response_reader(
944 stub_->AsyncEcho(&cli_ctx, send_request, cq_.get()));
945 response_reader->Finish(&recv_response, &recv_status, tag(4));
// The remaining arguments of this call are on an absent line; the Verify
// below waits for tag 2.
947 service_->RequestEcho(&srv_ctx, &recv_request, &response_writer, cq_.get(),
949 Verifier().Expect(2, true).Verify(cq_.get());
950 EXPECT_EQ(send_request.message(), recv_request.message());
// NOTE(review): the find() results below are dereferenced without comparing
// against end(); a missing key would be undefined behaviour rather than a
// clean test failure.
951 const auto& client_initial_metadata = srv_ctx.client_metadata();
952 EXPECT_EQ(meta1.second,
953 ToString(client_initial_metadata.find(meta1.first)->second));
954 EXPECT_EQ(meta2.second,
955 ToString(client_initial_metadata.find(meta2.first)->second));
956 EXPECT_EQ(meta3.second,
957 ToString(client_initial_metadata.find(meta3.first)->second));
// NOTE(review): three entries are added above but only a lower bound of 2 is
// asserted here -- confirm whether 3 was intended.
958 EXPECT_GE(client_initial_metadata.size(), static_cast<size_t>(2));
960 send_response.set_message(recv_request.message());
961 response_writer.Finish(send_response, Status::OK, tag(3));
962 Verifier().Expect(3, true).Expect(4, true).Verify(cq_.get());
964 EXPECT_EQ(send_response.message(), recv_response.message());
965 EXPECT_TRUE(recv_status.ok());
// Unary Echo in which the server sends two initial-metadata entries ahead of
// the response and the client reads them through ReadInitialMetadata.
968 TEST_P(AsyncEnd2endTest, ServerInitialMetadataRpc) {
971 EchoRequest send_request;
972 EchoRequest recv_request;
973 EchoResponse send_response;
974 EchoResponse recv_response;
977 ClientContext cli_ctx;
978 ServerContext srv_ctx;
979 grpc::ServerAsyncResponseWriter<EchoResponse> response_writer(&srv_ctx);
981 send_request.set_message(GetParam().message_content);
982 std::pair<std::string, std::string> meta1("key1", "val1");
983 std::pair<std::string, std::string> meta2("key2", "val2");
985 std::unique_ptr<ClientAsyncResponseReader<EchoResponse>> response_reader(
986 stub_->AsyncEcho(&cli_ctx, send_request, cq_.get()));
// Tag 4: the client has received the server's initial metadata.
987 response_reader->ReadInitialMetadata(tag(4));
// The remaining arguments of this call are on an absent line; the Verify
// below waits for tag 2.
989 service_->RequestEcho(&srv_ctx, &recv_request, &response_writer, cq_.get(),
991 Verifier().Expect(2, true).Verify(cq_.get());
992 EXPECT_EQ(send_request.message(), recv_request.message());
// Tag 3: the server has sent its initial metadata.
993 srv_ctx.AddInitialMetadata(meta1.first, meta1.second);
994 srv_ctx.AddInitialMetadata(meta2.first, meta2.second);
995 response_writer.SendInitialMetadata(tag(3));
996 Verifier().Expect(3, true).Expect(4, true).Verify(cq_.get());
// Exactly the two entries added above must be visible on the client.
997 const auto& server_initial_metadata = cli_ctx.GetServerInitialMetadata();
998 EXPECT_EQ(meta1.second,
999 ToString(server_initial_metadata.find(meta1.first)->second));
1000 EXPECT_EQ(meta2.second,
1001 ToString(server_initial_metadata.find(meta2.first)->second));
1002 EXPECT_EQ(static_cast<size_t>(2), server_initial_metadata.size());
// The response and status follow as a separate step (tags 5 and 6).
1004 send_response.set_message(recv_request.message());
1005 response_writer.Finish(send_response, Status::OK, tag(5));
1006 response_reader->Finish(&recv_response, &recv_status, tag(6));
1007 Verifier().Expect(5, true).Expect(6, true).Verify(cq_.get());
1009 EXPECT_EQ(send_response.message(), recv_response.message());
1010 EXPECT_TRUE(recv_status.ok());
// Server-streaming variant of the explicit initial-metadata test: the server
// adds two entries, sends them with SendInitialMetadata() (tag 10) while the
// client waits on ReadInitialMetadata() (tag 11), then two responses are
// written/read (tags 3-6) and the stream is finished (tags 7-9).
1014 TEST_P(AsyncEnd2endTest, ServerInitialMetadataServerStreaming) {
1016 EchoRequest send_request;
1017 EchoRequest recv_request;
1018 EchoResponse send_response;
1019 EchoResponse recv_response;
1021 ClientContext cli_ctx;
1022 ServerContext srv_ctx;
1023 ServerAsyncWriter<EchoResponse> srv_stream(&srv_ctx);
1025 std::pair<::std::string, ::std::string> meta1("key1", "val1");
1026 std::pair<::std::string, ::std::string> meta2("key2", "val2");
1028 std::unique_ptr<ClientAsyncReader<EchoResponse>> cli_stream(
1029 stub_->AsyncResponseStream(&cli_ctx, send_request, cq_.get(), tag(1)));
1030 cli_stream->ReadInitialMetadata(tag(11));
1031 service_->RequestResponseStream(&srv_ctx, &recv_request, &srv_stream,
1032 cq_.get(), cq_.get(), tag(2));
// Tag 1 = client call started, tag 2 = server received the call.
1034 Verifier().Expect(1, true).Expect(2, true).Verify(cq_.get());
1036 srv_ctx.AddInitialMetadata(meta1.first, meta1.second);
1037 srv_ctx.AddInitialMetadata(meta2.first, meta2.second);
1038 srv_stream.SendInitialMetadata(tag(10));
1039 Verifier().Expect(10, true).Expect(11, true).Verify(cq_.get());
// NOTE(review): plain `auto` is used here while ServerInitialMetadataRpc binds
// the same call with `const auto&`; if GetServerInitialMetadata() returns a
// reference this makes a copy of the container.
1040 auto server_initial_metadata = cli_ctx.GetServerInitialMetadata();
1041 EXPECT_EQ(meta1.second,
1042 ToString(server_initial_metadata.find(meta1.first)->second));
1043 EXPECT_EQ(meta2.second,
1044 ToString(server_initial_metadata.find(meta2.first)->second));
1045 EXPECT_EQ(static_cast<size_t>(2), server_initial_metadata.size());
// First response message.
1047 srv_stream.Write(send_response, tag(3));
1049 cli_stream->Read(&recv_response, tag(4));
1050 Verifier().Expect(3, true).Expect(4, true).Verify(cq_.get());
// Second response message.
1052 srv_stream.Write(send_response, tag(5));
1053 cli_stream->Read(&recv_response, tag(6));
1054 Verifier().Expect(5, true).Expect(6, true).Verify(cq_.get());
// Server finishes; the client's next Read (tag 8) is expected to complete
// with ok == false because no further message follows.
1056 srv_stream.Finish(Status::OK, tag(7));
1057 cli_stream->Read(&recv_response, tag(8));
1058 Verifier().Expect(7, true).Expect(8, false).Verify(cq_.get());
1060 cli_stream->Finish(&recv_status, tag(9));
1061 Verifier().Expect(9, true).Verify(cq_.get());
1063 EXPECT_TRUE(recv_status.ok());
1067 // Test for server initial metadata being sent implicitly
// Unlike ServerInitialMetadataServerStreaming, the server never calls
// SendInitialMetadata() here: it only adds the two entries to srv_ctx and then
// writes the first response. The client checks the metadata after the first
// Write/Read pair (tags 3 and 4) has completed.
1068 TEST_P(AsyncEnd2endTest, ServerInitialMetadataServerStreamingImplicit) {
1070 EchoRequest send_request;
1071 EchoRequest recv_request;
1072 EchoResponse send_response;
1073 EchoResponse recv_response;
1075 ClientContext cli_ctx;
1076 ServerContext srv_ctx;
1077 ServerAsyncWriter<EchoResponse> srv_stream(&srv_ctx);
1079 send_request.set_message(GetParam().message_content);
1080 std::pair<::std::string, ::std::string> meta1("key1", "val1");
1081 std::pair<::std::string, ::std::string> meta2("key2", "val2");
1083 std::unique_ptr<ClientAsyncReader<EchoResponse>> cli_stream(
1084 stub_->AsyncResponseStream(&cli_ctx, send_request, cq_.get(), tag(1)));
1085 service_->RequestResponseStream(&srv_ctx, &recv_request, &srv_stream,
1086 cq_.get(), cq_.get(), tag(2));
// Tag 1 = client call started, tag 2 = server received the call.
1088 Verifier().Expect(1, true).Expect(2, true).Verify(cq_.get());
1089 EXPECT_EQ(send_request.message(), recv_request.message());
// Metadata is only queued on the context; no explicit send follows.
1091 srv_ctx.AddInitialMetadata(meta1.first, meta1.second);
1092 srv_ctx.AddInitialMetadata(meta2.first, meta2.second);
1093 send_response.set_message(recv_request.message());
1094 srv_stream.Write(send_response, tag(3));
1096 cli_stream->Read(&recv_response, tag(4));
1097 Verifier().Expect(3, true).Expect(4, true).Verify(cq_.get());
1098 EXPECT_EQ(send_response.message(), recv_response.message());
// After the first message has been read, both entries must be visible on the
// client and there must be exactly two of them.
1100 auto server_initial_metadata = cli_ctx.GetServerInitialMetadata();
1101 EXPECT_EQ(meta1.second,
1102 ToString(server_initial_metadata.find(meta1.first)->second));
1103 EXPECT_EQ(meta2.second,
1104 ToString(server_initial_metadata.find(meta2.first)->second));
1105 EXPECT_EQ(static_cast<size_t>(2), server_initial_metadata.size());
// Second response message.
1107 srv_stream.Write(send_response, tag(5));
1108 cli_stream->Read(&recv_response, tag(6));
1109 Verifier().Expect(5, true).Expect(6, true).Verify(cq_.get());
// Server finishes; the trailing client Read (tag 8) is expected to report
// ok == false.
1111 srv_stream.Finish(Status::OK, tag(7));
1112 cli_stream->Read(&recv_response, tag(8));
1113 Verifier().Expect(7, true).Expect(8, false).Verify(cq_.get());
1115 cli_stream->Finish(&recv_status, tag(9));
1116 Verifier().Expect(9, true).Verify(cq_.get());
1118 EXPECT_TRUE(recv_status.ok());
// Unary Echo in which the server attaches two trailing-metadata entries to
// srv_ctx before calling Finish(). After the call completes the client checks
// that exactly those two entries are present in its trailing metadata.
// Tags used: 2 = completion awaited after RequestEcho, 3 = server
// SendInitialMetadata, 4 = server Finish, 5 = client Finish.
1121 TEST_P(AsyncEnd2endTest, ServerTrailingMetadataRpc) {
1124 EchoRequest send_request;
1125 EchoRequest recv_request;
1126 EchoResponse send_response;
1127 EchoResponse recv_response;
1130 ClientContext cli_ctx;
1131 ServerContext srv_ctx;
1132 grpc::ServerAsyncResponseWriter<EchoResponse> response_writer(&srv_ctx);
1134 send_request.set_message(GetParam().message_content);
1135 std::pair<std::string, std::string> meta1("key1", "val1");
1136 std::pair<std::string, std::string> meta2("key2", "val2");
1138 std::unique_ptr<ClientAsyncResponseReader<EchoResponse>> response_reader(
1139 stub_->AsyncEcho(&cli_ctx, send_request, cq_.get()));
// The client's Finish is issued up front; its tag (5) is only awaited at the
// end, together with the server's Finish (4).
1140 response_reader->Finish(&recv_response, &recv_status, tag(5));
1142 service_->RequestEcho(&srv_ctx, &recv_request, &response_writer, cq_.get(),
1144 Verifier().Expect(2, true).Verify(cq_.get());
1145 EXPECT_EQ(send_request.message(), recv_request.message());
// Initial metadata is sent explicitly even though none was added to srv_ctx
// in this test.
1146 response_writer.SendInitialMetadata(tag(3));
1147 Verifier().Expect(3, true).Verify(cq_.get());
1149 send_response.set_message(recv_request.message());
// Trailing metadata must be added before Finish() is called.
1150 srv_ctx.AddTrailingMetadata(meta1.first, meta1.second);
1151 srv_ctx.AddTrailingMetadata(meta2.first, meta2.second);
1152 response_writer.Finish(send_response, Status::OK, tag(4));
1154 Verifier().Expect(4, true).Expect(5, true).Verify(cq_.get());
1156 EXPECT_EQ(send_response.message(), recv_response.message());
1157 EXPECT_TRUE(recv_status.ok());
1158 const auto& server_trailing_metadata = cli_ctx.GetServerTrailingMetadata();
1159 EXPECT_EQ(meta1.second,
1160 ToString(server_trailing_metadata.find(meta1.first)->second));
1161 EXPECT_EQ(meta2.second,
1162 ToString(server_trailing_metadata.find(meta2.first)->second));
// Exactly two trailing entries are expected.
1163 EXPECT_EQ(static_cast<size_t>(2), server_trailing_metadata.size());
// Exercises all three metadata directions in a single unary Echo:
//   - client initial metadata:  meta1, meta2 (added to cli_ctx)
//   - server initial metadata:  meta3, meta4 (added to srv_ctx, sent explicitly)
//   - server trailing metadata: meta5, meta6 (added to srv_ctx before Finish)
// meta2, meta4 and meta6 carry values built from escaped byte sequences with
// an explicit length, i.e. values that are not plain printable text.
// All size checks use ">= 2", so additional entries are tolerated.
1166 TEST_P(AsyncEnd2endTest, MetadataRpc) {
1169 EchoRequest send_request;
1170 EchoRequest recv_request;
1171 EchoResponse send_response;
1172 EchoResponse recv_response;
1175 ClientContext cli_ctx;
1176 ServerContext srv_ctx;
1177 grpc::ServerAsyncResponseWriter<EchoResponse> response_writer(&srv_ctx);
1179 send_request.set_message(GetParam().message_content);
1180 std::pair<std::string, std::string> meta1("key1", "val1");
1181 std::pair<std::string, std::string> meta2(
1183 std::string("\xc0\xc1\xc2\xc3\xc4\xc5\xc6\xc7\xc8\xc9\xca\xcb\xcc", 13));
1184 std::pair<std::string, std::string> meta3("key3", "val3");
1185 std::pair<std::string, std::string> meta6(
1187 std::string("\x10\x11\x12\x13\x14\x15\x16\x17\x18\x19\x1a\x1b\x1c\x1d",
1189 std::pair<std::string, std::string> meta5("key5", "val5");
1190 std::pair<std::string, std::string> meta4(
1193 "\xe0\xe1\xe2\xe3\xe4\xe5\xe6\xe7\xe8\xe9\xea\xeb\xec\xed\xee", 15));
// Client -> server initial metadata.
1195 cli_ctx.AddMetadata(meta1.first, meta1.second);
1196 cli_ctx.AddMetadata(meta2.first, meta2.second);
1198 std::unique_ptr<ClientAsyncResponseReader<EchoResponse>> response_reader(
1199 stub_->AsyncEcho(&cli_ctx, send_request, cq_.get()));
// Tag 4 is awaited later, together with the server's SendInitialMetadata.
1200 response_reader->ReadInitialMetadata(tag(4));
1202 service_->RequestEcho(&srv_ctx, &recv_request, &response_writer, cq_.get(),
1204 Verifier().Expect(2, true).Verify(cq_.get());
1205 EXPECT_EQ(send_request.message(), recv_request.message());
// Server side: verify the metadata the client attached.
1206 const auto& client_initial_metadata = srv_ctx.client_metadata();
1207 EXPECT_EQ(meta1.second,
1208 ToString(client_initial_metadata.find(meta1.first)->second));
1209 EXPECT_EQ(meta2.second,
1210 ToString(client_initial_metadata.find(meta2.first)->second));
1211 EXPECT_GE(client_initial_metadata.size(), static_cast<size_t>(2));
// Server -> client initial metadata (tag 3 = send, tag 4 = client read).
1213 srv_ctx.AddInitialMetadata(meta3.first, meta3.second);
1214 srv_ctx.AddInitialMetadata(meta4.first, meta4.second);
1215 response_writer.SendInitialMetadata(tag(3));
1216 Verifier().Expect(3, true).Expect(4, true).Verify(cq_.get());
1217 const auto& server_initial_metadata = cli_ctx.GetServerInitialMetadata();
1218 EXPECT_EQ(meta3.second,
1219 ToString(server_initial_metadata.find(meta3.first)->second));
1220 EXPECT_EQ(meta4.second,
1221 ToString(server_initial_metadata.find(meta4.first)->second));
1222 EXPECT_GE(server_initial_metadata.size(), static_cast<size_t>(2));
// Server -> client trailing metadata, attached before Finish (tag 5); the
// client's Finish is tag 6.
1224 send_response.set_message(recv_request.message());
1225 srv_ctx.AddTrailingMetadata(meta5.first, meta5.second);
1226 srv_ctx.AddTrailingMetadata(meta6.first, meta6.second);
1227 response_writer.Finish(send_response, Status::OK, tag(5));
1228 response_reader->Finish(&recv_response, &recv_status, tag(6));
1230 Verifier().Expect(5, true).Expect(6, true).Verify(cq_.get());
1232 EXPECT_EQ(send_response.message(), recv_response.message());
1233 EXPECT_TRUE(recv_status.ok());
1234 const auto& server_trailing_metadata = cli_ctx.GetServerTrailingMetadata();
1235 EXPECT_EQ(meta5.second,
1236 ToString(server_trailing_metadata.find(meta5.first)->second));
1237 EXPECT_EQ(meta6.second,
1238 ToString(server_trailing_metadata.find(meta6.first)->second));
1239 EXPECT_GE(server_trailing_metadata.size(), static_cast<size_t>(2));
1242 // Server uses AsyncNotifyWhenDone API to check for cancellation
// Flow: the client starts a unary Echo and issues Finish (tag 4); the server
// registers AsyncNotifyWhenDone (tag 5) before requesting the call; once the
// server holds the request the client calls TryCancel(). Both tag 5 and tag 4
// are then expected, srv_ctx must report IsCancelled(), and the client status
// must be CANCELLED. The server never calls Finish in this test.
1243 TEST_P(AsyncEnd2endTest, ServerCheckCancellation) {
1246 EchoRequest send_request;
1247 EchoRequest recv_request;
1248 EchoResponse send_response;
1249 EchoResponse recv_response;
1252 ClientContext cli_ctx;
1253 ServerContext srv_ctx;
1254 grpc::ServerAsyncResponseWriter<EchoResponse> response_writer(&srv_ctx);
1256 send_request.set_message(GetParam().message_content);
1257 std::unique_ptr<ClientAsyncResponseReader<EchoResponse>> response_reader(
1258 stub_->AsyncEcho(&cli_ctx, send_request, cq_.get()));
1259 response_reader->Finish(&recv_response, &recv_status, tag(4));
// Registered before the call is requested from the service.
1261 srv_ctx.AsyncNotifyWhenDone(tag(5));
1262 service_->RequestEcho(&srv_ctx, &recv_request, &response_writer, cq_.get(),
1265 Verifier().Expect(2, true).Verify(cq_.get());
1266 EXPECT_EQ(send_request.message(), recv_request.message());
// Cancel from the client side; the done notification (5) and the client's
// Finish (4) both complete with ok == true.
1268 cli_ctx.TryCancel();
1269 Verifier().Expect(5, true).Expect(4, true).Verify(cq_.get());
1270 EXPECT_TRUE(srv_ctx.IsCancelled());
1272 EXPECT_EQ(StatusCode::CANCELLED, recv_status.error_code());
1275 // Server uses AsyncNotifyWhenDone API to check for normal finish
// Same setup as ServerCheckCancellation, but nobody cancels: the server
// finishes with Status::OK (tag 3). The done notification (tag 5) must still
// be delivered, and srv_ctx.IsCancelled() must be false afterwards.
1276 TEST_P(AsyncEnd2endTest, ServerCheckDone) {
1279 EchoRequest send_request;
1280 EchoRequest recv_request;
1281 EchoResponse send_response;
1282 EchoResponse recv_response;
1285 ClientContext cli_ctx;
1286 ServerContext srv_ctx;
1287 grpc::ServerAsyncResponseWriter<EchoResponse> response_writer(&srv_ctx);
1289 send_request.set_message(GetParam().message_content);
1290 std::unique_ptr<ClientAsyncResponseReader<EchoResponse>> response_reader(
1291 stub_->AsyncEcho(&cli_ctx, send_request, cq_.get()));
1292 response_reader->Finish(&recv_response, &recv_status, tag(4));
// Registered before the call is requested from the service.
1294 srv_ctx.AsyncNotifyWhenDone(tag(5));
1295 service_->RequestEcho(&srv_ctx, &recv_request, &response_writer, cq_.get(),
1298 Verifier().Expect(2, true).Verify(cq_.get());
1299 EXPECT_EQ(send_request.message(), recv_request.message());
1301 send_response.set_message(recv_request.message());
1302 response_writer.Finish(send_response, Status::OK, tag(3));
// Tag 3 = server Finish, tag 4 = client Finish, tag 5 = done notification.
1303 Verifier().Expect(3, true).Expect(4, true).Expect(5, true).Verify(cq_.get());
1304 EXPECT_FALSE(srv_ctx.IsCancelled());
1306 EXPECT_EQ(send_response.message(), recv_response.message());
1307 EXPECT_TRUE(recv_status.ok());
// Calls a method through an UnimplementedEchoService stub and expects the call
// to fail with UNIMPLEMENTED and an empty error message. The test builds its
// own channel rather than using stub_: a custom channel to server_address_
// when GetParam().inproc is false, otherwise an in-process channel obtained
// from server_.
// NOTE(review): presumably the server under test does not register
// UnimplementedEchoService — confirm against the fixture setup.
1310 TEST_P(AsyncEnd2endTest, UnimplementedRpc) {
1311 ChannelArguments args;
1312 const auto& channel_creds = GetCredentialsProvider()->GetChannelCredentials(
1313 GetParam().credentials_type, &args);
1314 std::shared_ptr<Channel> channel =
1315 !(GetParam().inproc) ? ::grpc::CreateCustomChannel(server_address_.str(),
1316 channel_creds, args)
1317 : server_->InProcessChannel(args);
1318 std::unique_ptr<grpc::testing::UnimplementedEchoService::Stub> stub;
1319 stub = grpc::testing::UnimplementedEchoService::NewStub(channel);
1320 EchoRequest send_request;
1321 EchoResponse recv_response;
1324 ClientContext cli_ctx;
1325 send_request.set_message(GetParam().message_content);
1326 std::unique_ptr<ClientAsyncResponseReader<EchoResponse>> response_reader(
1327 stub->AsyncUnimplemented(&cli_ctx, send_request, cq_.get()));
// Only the client side is driven; no server-side request is posted.
1329 response_reader->Finish(&recv_response, &recv_status, tag(4));
1330 Verifier().Expect(4, true).Verify(cq_.get());
1332 EXPECT_EQ(StatusCode::UNIMPLEMENTED, recv_status.error_code());
1333 EXPECT_EQ("", recv_status.error_message());
1336 // This class is for testing scenarios where RPCs are cancelled on the server
1337 // by calling ServerContext::TryCancel(). Server uses AsyncNotifyWhenDone
1338 // API to check for cancellation
1339 class AsyncEnd2endServerTryCancelTest : public AsyncEnd2endTest {
// ServerTryCancelRequestPhase selects the point, relative to message
// processing, at which the server calls TryCancel(). It is the parameter type
// of the Test*ServerCancel helpers of this class.
1343 CANCEL_BEFORE_PROCESSING,
1344 CANCEL_DURING_PROCESSING,
1345 CANCEL_AFTER_PROCESSING
1346 } ServerTryCancelRequestPhase;
1348 // Helper for testing client-streaming RPCs which are cancelled on the server.
1349 // Depending on the value of server_try_cancel parameter, this will test one
1350 // of the following three scenarios:
1351 // CANCEL_BEFORE_PROCESSING: Rpc is cancelled by the server before reading
1352 // any messages from the client
1354 // CANCEL_DURING_PROCESSING: Rpc is cancelled by the server while reading
1355 // messages from the client
1357 // CANCEL_AFTER_PROCESSING: Rpc is cancelled by server after reading all
1358 // messages from the client (but before sending any status back to the
//
// Queues and tags: client operations complete on the local queue cli_cq,
// server operations on cq_. Client side: 1 = call started, 3-5 = Writes,
// 6 = WritesDone, 10 = Finish. Server side: 2 = call received, 6-8 = Reads,
// 9 = Finish, 11 = AsyncNotifyWhenDone. Tag 6 is used on both sides, which is
// unambiguous only because the two sides use different queues.
1360 void TestClientStreamingServerCancel(
1361 ServerTryCancelRequestPhase server_try_cancel) {
1364 EchoRequest recv_request;
1365 EchoResponse send_response;
1366 EchoResponse recv_response;
1369 ClientContext cli_ctx;
1370 ServerContext srv_ctx;
1371 ServerAsyncReader<EchoResponse, EchoRequest> srv_stream(&srv_ctx);
1373 // Initiate the 'RequestStream' call on client
1374 CompletionQueue cli_cq;
1376 std::unique_ptr<ClientAsyncWriter<EchoRequest>> cli_stream(
1377 stub_->AsyncRequestStream(&cli_ctx, &recv_response, &cli_cq, tag(1)));
1379 // On the server, request to be notified of 'RequestStream' calls
1380 // and receive the 'RequestStream' call just made by the client
1381 srv_ctx.AsyncNotifyWhenDone(tag(11));
1382 service_->RequestRequestStream(&srv_ctx, &srv_stream, cq_.get(), cq_.get(),
// Tag 1 is awaited on cli_cq in a helper thread while this thread waits for
// tag 2 on cq_.
1384 std::thread t1([&cli_cq] { Verifier().Expect(1, true).Verify(&cli_cq); });
1385 Verifier().Expect(2, true).Verify(cq_.get());
1388 bool expected_server_cq_result = true;
1389 bool expected_client_cq_result = true;
1391 if (server_try_cancel == CANCEL_BEFORE_PROCESSING) {
1392 srv_ctx.TryCancel();
1393 Verifier().Expect(11, true).Verify(cq_.get());
1394 EXPECT_TRUE(srv_ctx.IsCancelled());
1396 // Since cancellation is done before server reads any results, we know
1397 // for sure that all server cq results will return false from this
1399 expected_server_cq_result = false;
1400 expected_client_cq_result = false;
// Client-side ok values are only checked strictly in the
// CANCEL_AFTER_PROCESSING case; in the other two phases they are ignored.
1403 bool ignore_client_cq_result =
1404 (server_try_cancel == CANCEL_DURING_PROCESSING) ||
1405 (server_try_cancel == CANCEL_BEFORE_PROCESSING);
// The client's writes run in their own thread. The two flags are captured by
// reference and are fully assigned before the thread is started.
1407 std::thread cli_thread([&cli_cq, &cli_stream, &expected_client_cq_result,
1408 &ignore_client_cq_result] {
1409 EchoRequest send_request;
1410 // Client sends 3 messages (tags 3, 4 and 5)
1411 for (int tag_idx = 3; tag_idx <= 5; tag_idx++) {
1412 send_request.set_message("Ping " + std::to_string(tag_idx));
1413 cli_stream->Write(send_request, tag(tag_idx));
1415 .Expect(tag_idx, expected_client_cq_result)
1416 .Verify(&cli_cq, ignore_client_cq_result);
1418 cli_stream->WritesDone(tag(6));
1419 // Ignore ok on WritesDone since cancel can affect it
1421 .Expect(6, expected_client_cq_result)
1422 .Verify(&cli_cq, ignore_client_cq_result);
1425 bool ignore_cq_result = false;
1426 bool want_done_tag = false;
// NOTE(review): the cancel thread is managed with raw new/delete; it is
// joined and deleted further below when non-null.
1427 std::thread* server_try_cancel_thd = nullptr;
// One Verifier is reused for the server side so that a pending expectation
// for tag 11 can be carried across several Next() calls.
1429 auto verif = Verifier();
1431 if (server_try_cancel == CANCEL_DURING_PROCESSING) {
1432 server_try_cancel_thd =
1433 new std::thread([&srv_ctx] { srv_ctx.TryCancel(); });
1434 // Server will cancel the RPC in a parallel thread while reading the
1435 // requests from the client. Since the cancellation can happen at anytime,
1436 // some of the cq results (i.e those until cancellation) might be true but
1437 // its non deterministic. So better to ignore the cq results
1438 ignore_cq_result = true;
1439 // Expect that we might possibly see the done tag that
1440 // indicates cancellation completion in this case
1441 want_done_tag = true;
1442 verif.Expect(11, true);
1445 // Server reads 3 messages (tags 6, 7 and 8)
1446 // But if want_done_tag is true, we might also see tag 11
1447 for (int tag_idx = 6; tag_idx <= 8; tag_idx++) {
1448 srv_stream.Read(&recv_request, tag(tag_idx));
1449 // Note that we'll add something to the verifier and verify that
1450 // something was seen, but it might be tag 11 and not what we
1452 int got_tag = verif.Expect(tag_idx, expected_server_cq_result)
1453 .Next(cq_.get(), ignore_cq_result);
1454 GPR_ASSERT((got_tag == tag_idx) || (got_tag == 11 && want_done_tag));
1455 if (got_tag == 11) {
1456 EXPECT_TRUE(srv_ctx.IsCancelled());
1457 want_done_tag = false;
1458 // Now get the other entry that we were waiting on
1459 EXPECT_EQ(verif.Next(cq_.get(), ignore_cq_result), tag_idx);
1465 if (server_try_cancel_thd != nullptr) {
1466 server_try_cancel_thd->join();
1467 delete server_try_cancel_thd;
1470 if (server_try_cancel == CANCEL_AFTER_PROCESSING) {
1471 srv_ctx.TryCancel();
1472 want_done_tag = true;
1473 verif.Expect(11, true);
// If the done tag has not been consumed yet (either phase), wait for it now.
1476 if (want_done_tag) {
1477 verif.Verify(cq_.get());
1478 EXPECT_TRUE(srv_ctx.IsCancelled());
1479 want_done_tag = false;
1482 // The RPC has been cancelled at this point for sure (i.e irrespective of
1483 // the value of `server_try_cancel` is). So, from this point forward, we
1484 // know that cq results are supposed to return false on server.
1486 // Server sends the final message and cancelled status (but the RPC is
1487 // already cancelled at this point. So we expect the operation to fail)
1488 srv_stream.Finish(send_response, Status::CANCELLED, tag(9));
1489 Verifier().Expect(9, false).Verify(cq_.get());
1491 // Client will see the cancellation
1492 cli_stream->Finish(&recv_status, tag(10));
1493 Verifier().Expect(10, true).Verify(&cli_cq);
1494 EXPECT_FALSE(recv_status.ok());
1495 EXPECT_EQ(::grpc::StatusCode::CANCELLED, recv_status.error_code());
// Consume whatever is still queued on the local client queue until Next()
// returns false.
1500 while (cli_cq.Next(&phony_tag, &phony_ok)) {
1504 // Helper for testing server-streaming RPCs which are cancelled on the server.
1505 // Depending on the value of server_try_cancel parameter, this will test one
1506 // of the following three scenarios:
1507 // CANCEL_BEFORE_PROCESSING: Rpc is cancelled by the server before sending
1508 // any messages to the client
1510 // CANCEL_DURING_PROCESSING: Rpc is cancelled by the server while sending
1511 // messages to the client
1513 // CANCEL_AFTER_PROCESSING: Rpc is cancelled by server after sending all
1514 // messages to the client (but before sending any status back to the
//
// Queues and tags: client operations complete on the local queue cli_cq,
// server operations on cq_. Client side: 1 = call started, 6-8 = Reads,
// 10 = Finish. Server side: 2 = call received, 3-5 = Writes, 9 = Finish,
// 11 = AsyncNotifyWhenDone.
1516 void TestServerStreamingServerCancel(
1517 ServerTryCancelRequestPhase server_try_cancel) {
1520 EchoRequest send_request;
1521 EchoRequest recv_request;
1522 EchoResponse send_response;
1524 ClientContext cli_ctx;
1525 ServerContext srv_ctx;
1526 ServerAsyncWriter<EchoResponse> srv_stream(&srv_ctx);
1528 send_request.set_message("Ping");
1529 // Initiate the 'ResponseStream' call on the client
1530 CompletionQueue cli_cq;
1531 std::unique_ptr<ClientAsyncReader<EchoResponse>> cli_stream(
1532 stub_->AsyncResponseStream(&cli_ctx, send_request, &cli_cq, tag(1)));
1533 // On the server, request to be notified of 'ResponseStream' calls and
1534 // receive the call just made by the client
1535 srv_ctx.AsyncNotifyWhenDone(tag(11));
1536 service_->RequestResponseStream(&srv_ctx, &recv_request, &srv_stream,
1537 cq_.get(), cq_.get(), tag(2));
// Tag 1 is awaited on cli_cq in a helper thread while this thread waits for
// tag 2 on cq_.
1539 std::thread t1([&cli_cq] { Verifier().Expect(1, true).Verify(&cli_cq); });
1540 Verifier().Expect(2, true).Verify(cq_.get());
1543 EXPECT_EQ(send_request.message(), recv_request.message());
1545 bool expected_cq_result = true;
1546 bool ignore_cq_result = false;
1547 bool want_done_tag = false;
1548 bool expected_client_cq_result = true;
// Client-side ok values are only checked strictly when the server cancels
// before processing (where they are expected to be false); otherwise they are
// ignored.
1549 bool ignore_client_cq_result =
1550 (server_try_cancel != CANCEL_BEFORE_PROCESSING);
1552 if (server_try_cancel == CANCEL_BEFORE_PROCESSING) {
1553 srv_ctx.TryCancel();
1554 Verifier().Expect(11, true).Verify(cq_.get());
1555 EXPECT_TRUE(srv_ctx.IsCancelled());
1557 // We know for sure that all cq results will be false from this point
1558 // since the server cancelled the RPC
1559 expected_cq_result = false;
1560 expected_client_cq_result = false;
// The client's reads run in their own thread. The two flags are captured by
// reference and are fully assigned before the thread is started.
1563 std::thread cli_thread([&cli_cq, &cli_stream, &expected_client_cq_result,
1564 &ignore_client_cq_result] {
1565 // Client attempts to read the three messages from the server
1566 for (int tag_idx = 6; tag_idx <= 8; tag_idx++) {
1567 EchoResponse recv_response;
1568 cli_stream->Read(&recv_response, tag(tag_idx));
1570 .Expect(tag_idx, expected_client_cq_result)
1571 .Verify(&cli_cq, ignore_client_cq_result);
// NOTE(review): the cancel thread is managed with raw new/delete; it is
// joined and deleted further below when non-null.
1575 std::thread* server_try_cancel_thd = nullptr;
// One Verifier is reused for the server side so that a pending expectation
// for tag 11 can be carried across several Next() calls.
1577 auto verif = Verifier();
1579 if (server_try_cancel == CANCEL_DURING_PROCESSING) {
1580 server_try_cancel_thd =
1581 new std::thread([&srv_ctx] { srv_ctx.TryCancel(); });
1583 // Server will cancel the RPC in a parallel thread while writing responses
1584 // to the client. Since the cancellation can happen at anytime, some of
1585 // the cq results (i.e those until cancellation) might be true but it is
1586 // non deterministic. So better to ignore the cq results
1587 ignore_cq_result = true;
1588 // Expect that we might possibly see the done tag that
1589 // indicates cancellation completion in this case
1590 want_done_tag = true;
1591 verif.Expect(11, true);
1594 // Server sends three messages (tags 3, 4 and 5)
1595 // But if want_done tag is true, we might also see tag 11
1596 for (int tag_idx = 3; tag_idx <= 5; tag_idx++) {
1597 send_response.set_message("Pong " + std::to_string(tag_idx));
1598 srv_stream.Write(send_response, tag(tag_idx));
1599 // Note that we'll add something to the verifier and verify that
1600 // something was seen, but it might be tag 11 and not what we
1602 int got_tag = verif.Expect(tag_idx, expected_cq_result)
1603 .Next(cq_.get(), ignore_cq_result);
1604 GPR_ASSERT((got_tag == tag_idx) || (got_tag == 11 && want_done_tag));
1605 if (got_tag == 11) {
1606 EXPECT_TRUE(srv_ctx.IsCancelled());
1607 want_done_tag = false;
1608 // Now get the other entry that we were waiting on
1609 EXPECT_EQ(verif.Next(cq_.get(), ignore_cq_result), tag_idx);
1613 if (server_try_cancel_thd != nullptr) {
1614 server_try_cancel_thd->join();
1615 delete server_try_cancel_thd;
1618 if (server_try_cancel == CANCEL_AFTER_PROCESSING) {
1619 srv_ctx.TryCancel();
1620 want_done_tag = true;
1621 verif.Expect(11, true);
// If the done tag has not been consumed yet (either phase), wait for it now.
1624 if (want_done_tag) {
1625 verif.Verify(cq_.get());
1626 EXPECT_TRUE(srv_ctx.IsCancelled());
1627 want_done_tag = false;
1632 // The RPC has been cancelled at this point for sure (i.e irrespective of
1633 // the value of `server_try_cancel` is). So, from this point forward, we
1634 // know that cq results are supposed to return false on server.
1636 // Server finishes the stream (but the RPC is already cancelled)
1637 srv_stream.Finish(Status::CANCELLED, tag(9));
1638 Verifier().Expect(9, false).Verify(cq_.get());
1640 // Client will see the cancellation
1641 cli_stream->Finish(&recv_status, tag(10));
1642 Verifier().Expect(10, true).Verify(&cli_cq);
1643 EXPECT_FALSE(recv_status.ok());
1644 EXPECT_EQ(::grpc::StatusCode::CANCELLED, recv_status.error_code());
// Consume whatever is still queued on the local client queue until Next()
// returns false.
1649 while (cli_cq.Next(&phony_tag, &phony_ok)) {
1653 // Helper for testing bidirectional-streaming RPCs which are cancelled on the
1656 // Depending on the value of server_try_cancel parameter, this will
1657 // test one of the following three scenarios:
1658 // CANCEL_BEFORE_PROCESSING: Rpc is cancelled by the server before reading/
1659 // writing any messages from/to the client
1661 // CANCEL_DURING_PROCESSING: Rpc is cancelled by the server while reading
1662 // messages from the client
1664 // CANCEL_AFTER_PROCESSING: Rpc is cancelled by server after reading all
1665 // messages from the client (but before sending any status back to the
//
// Unlike the client- and server-streaming helpers, client and server share
// the single queue cq_ here, so one Verifier (verif) tracks expectations for
// both sides. Tags: 1 = client call started, 2 = server call received,
// 3 = client Write, 4 = server Read, 5 = server Write, 6 = client Read,
// 7 = client WritesDone, 8 = server Read (expected to fail), 9 = server
// Finish, 10 = client Finish, 11 = AsyncNotifyWhenDone.
1667 void TestBidiStreamingServerCancel(
1668 ServerTryCancelRequestPhase server_try_cancel) {
1671 EchoRequest send_request;
1672 EchoRequest recv_request;
1673 EchoResponse send_response;
1674 EchoResponse recv_response;
1676 ClientContext cli_ctx;
1677 ServerContext srv_ctx;
1678 ServerAsyncReaderWriter<EchoResponse, EchoRequest> srv_stream(&srv_ctx);
1680 // Initiate the call from the client side
1681 std::unique_ptr<ClientAsyncReaderWriter<EchoRequest, EchoResponse>>
1682 cli_stream(stub_->AsyncBidiStream(&cli_ctx, cq_.get(), tag(1)));
1684 // On the server, request to be notified of the 'BidiStream' call and
1685 // receive the call just made by the client
1686 srv_ctx.AsyncNotifyWhenDone(tag(11));
1687 service_->RequestBidiStream(&srv_ctx, &srv_stream, cq_.get(), cq_.get(),
1689 Verifier().Expect(1, true).Expect(2, true).Verify(cq_.get());
1691 auto verif = Verifier();
1693 // Client sends the first and the only message
1694 send_request.set_message("Ping");
1695 cli_stream->Write(send_request, tag(3));
1696 verif.Expect(3, true);
1698 bool expected_cq_result = true;
1699 bool ignore_cq_result = false;
1700 bool want_done_tag = false;
1702 int got_tag, got_tag2;
// Set when tag 3 (the client's Write) is consumed while waiting for the done
// tag in the CANCEL_BEFORE_PROCESSING branch, so it is not awaited again.
1703 bool tag_3_done = false;
1705 if (server_try_cancel == CANCEL_BEFORE_PROCESSING) {
1706 srv_ctx.TryCancel();
1707 verif.Expect(11, true);
1708 // We know for sure that all server cq results will be false from
1709 // this point since the server cancelled the RPC. However, we can't
1710 // say for sure about the client
1711 expected_cq_result = false;
1712 ignore_cq_result = true;
// Keep pulling completions until the done tag 11 shows up; the only other
// completion allowed here is tag 3, and at most once.
1715 got_tag = verif.Next(cq_.get(), ignore_cq_result);
1716 GPR_ASSERT(((got_tag == 3) && !tag_3_done) || (got_tag == 11));
1720 } while (got_tag != 11);
1721 EXPECT_TRUE(srv_ctx.IsCancelled());
// NOTE(review): the cancel thread is managed with raw new/delete; it is
// joined and deleted further below when non-null.
1724 std::thread* server_try_cancel_thd = nullptr;
1726 if (server_try_cancel == CANCEL_DURING_PROCESSING) {
1727 server_try_cancel_thd =
1728 new std::thread([&srv_ctx] { srv_ctx.TryCancel(); });
1730 // Since server is going to cancel the RPC in a parallel thread, some of
1731 // the cq results (i.e those until the cancellation) might be true. Since
1732 // that number is non-deterministic, it is better to ignore the cq results
1733 ignore_cq_result = true;
1734 // Expect that we might possibly see the done tag that
1735 // indicates cancellation completion in this case
1736 want_done_tag = true;
1737 verif.Expect(11, true);
1740 srv_stream.Read(&recv_request, tag(4));
1741 verif.Expect(4, expected_cq_result);
// If tag 3 was already consumed above, count it as the first completion
// instead of waiting on the queue for it again.
1742 got_tag = tag_3_done ? 3 : verif.Next(cq_.get(), ignore_cq_result);
1743 got_tag2 = verif.Next(cq_.get(), ignore_cq_result);
1744 GPR_ASSERT((got_tag == 3) || (got_tag == 4) ||
1745 (got_tag == 11 && want_done_tag));
1746 GPR_ASSERT((got_tag2 == 3) || (got_tag2 == 4) ||
1747 (got_tag2 == 11 && want_done_tag));
1748 // If we get 3 and 4, we don't need to wait for 11, but if
1749 // we get 11, we should also clear 3 and 4
// 3 + 4 == 7: given the asserts above, any other sum means one of the two
// completions was the done tag 11.
1750 if (got_tag + got_tag2 != 7) {
1751 EXPECT_TRUE(srv_ctx.IsCancelled());
1752 want_done_tag = false;
1753 got_tag = verif.Next(cq_.get(), ignore_cq_result);
1754 GPR_ASSERT((got_tag == 3) || (got_tag == 4));
1757 send_response.set_message("Pong");
1758 srv_stream.Write(send_response, tag(5));
1759 verif.Expect(5, expected_cq_result);
1761 cli_stream->Read(&recv_response, tag(6));
1762 verif.Expect(6, expected_cq_result);
1763 got_tag = verif.Next(cq_.get(), ignore_cq_result);
1764 got_tag2 = verif.Next(cq_.get(), ignore_cq_result);
1765 GPR_ASSERT((got_tag == 5) || (got_tag == 6) ||
1766 (got_tag == 11 && want_done_tag));
1767 GPR_ASSERT((got_tag2 == 5) || (got_tag2 == 6) ||
1768 (got_tag2 == 11 && want_done_tag));
1769 // If we get 5 and 6, we don't need to wait for 11, but if
1770 // we get 11, we should also clear 5 and 6
// The 11 below is the sum 5 + 6, not the done tag: any other sum means one of
// the two completions was the done tag 11.
1771 if (got_tag + got_tag2 != 11) {
1772 EXPECT_TRUE(srv_ctx.IsCancelled());
1773 want_done_tag = false;
1774 got_tag = verif.Next(cq_.get(), ignore_cq_result);
1775 GPR_ASSERT((got_tag == 5) || (got_tag == 6));
1778 // This is expected to succeed in all cases
1779 cli_stream->WritesDone(tag(7));
1780 verif.Expect(7, true);
1781 // TODO(vjpai): Consider whether the following is too flexible
1782 // or whether it should just be reset to ignore_cq_result
1783 bool ignore_cq_wd_result =
1784 ignore_cq_result || (server_try_cancel == CANCEL_BEFORE_PROCESSING);
1785 got_tag = verif.Next(cq_.get(), ignore_cq_wd_result);
1786 GPR_ASSERT((got_tag == 7) || (got_tag == 11 && want_done_tag));
1787 if (got_tag == 11) {
1788 EXPECT_TRUE(srv_ctx.IsCancelled());
1789 want_done_tag = false;
1790 // Now get the other entry that we were waiting on
1791 EXPECT_EQ(verif.Next(cq_.get(), ignore_cq_wd_result), 7);
1794 // This is expected to fail in all cases i.e for all values of
1795 // server_try_cancel. This is because at this point, either there are no
1796 // more msgs from the client (because client called WritesDone) or the RPC
1797 // is cancelled on the server
1798 srv_stream.Read(&recv_request, tag(8));
1799 verif.Expect(8, false);
1800 got_tag = verif.Next(cq_.get(), ignore_cq_result);
1801 GPR_ASSERT((got_tag == 8) || (got_tag == 11 && want_done_tag));
1802 if (got_tag == 11) {
1803 EXPECT_TRUE(srv_ctx.IsCancelled());
1804 want_done_tag = false;
1805 // Now get the other entry that we were waiting on
1806 EXPECT_EQ(verif.Next(cq_.get(), ignore_cq_result), 8);
1809 if (server_try_cancel_thd != nullptr) {
1810 server_try_cancel_thd->join();
1811 delete server_try_cancel_thd;
1814 if (server_try_cancel == CANCEL_AFTER_PROCESSING) {
1815 srv_ctx.TryCancel();
1816 want_done_tag = true;
1817 verif.Expect(11, true);
// If the done tag has not been consumed yet (either phase), wait for it now.
1820 if (want_done_tag) {
1821 verif.Verify(cq_.get());
1822 EXPECT_TRUE(srv_ctx.IsCancelled());
1823 want_done_tag = false;
1826 // The RPC has been cancelled at this point for sure (i.e irrespective of
1827 // the value of `server_try_cancel` is). So, from this point forward, we
1828 // know that cq results are supposed to return false on server.
1830 srv_stream.Finish(Status::CANCELLED, tag(9));
1831 Verifier().Expect(9, false).Verify(cq_.get());
// The client's Finish itself completes with ok == true; the cancellation is
// reported through recv_status.
1833 cli_stream->Finish(&recv_status, tag(10));
1834 Verifier().Expect(10, true).Verify(cq_.get());
1835 EXPECT_FALSE(recv_status.ok());
1836 EXPECT_EQ(grpc::StatusCode::CANCELLED, recv_status.error_code());
// Client-streaming RPC; server calls TryCancel() before reading any message.
1840 TEST_P(AsyncEnd2endServerTryCancelTest, ClientStreamingServerTryCancelBefore) {
1841 TestClientStreamingServerCancel(CANCEL_BEFORE_PROCESSING);
// Client-streaming RPC; server calls TryCancel() from a parallel thread while
// it is reading messages.
1844 TEST_P(AsyncEnd2endServerTryCancelTest, ClientStreamingServerTryCancelDuring) {
1845 TestClientStreamingServerCancel(CANCEL_DURING_PROCESSING);
// Client-streaming RPC; server calls TryCancel() after reading all messages
// but before finishing the call.
1848 TEST_P(AsyncEnd2endServerTryCancelTest, ClientStreamingServerTryCancelAfter) {
1849 TestClientStreamingServerCancel(CANCEL_AFTER_PROCESSING);
// Server-streaming RPC; server calls TryCancel() before writing any message.
1852 TEST_P(AsyncEnd2endServerTryCancelTest, ServerStreamingServerTryCancelBefore) {
1853 TestServerStreamingServerCancel(CANCEL_BEFORE_PROCESSING);
// Server-streaming RPC; server calls TryCancel() from a parallel thread while
// it is writing messages.
1856 TEST_P(AsyncEnd2endServerTryCancelTest, ServerStreamingServerTryCancelDuring) {
1857 TestServerStreamingServerCancel(CANCEL_DURING_PROCESSING);
// Server-streaming RPC; server calls TryCancel() after writing all messages
// but before finishing the call.
1860 TEST_P(AsyncEnd2endServerTryCancelTest, ServerStreamingServerTryCancelAfter) {
1861 TestServerStreamingServerCancel(CANCEL_AFTER_PROCESSING);
// Bidi-streaming RPC; server calls TryCancel() before reading or writing any
// message.
1864 TEST_P(AsyncEnd2endServerTryCancelTest, ServerBidiStreamingTryCancelBefore) {
1865 TestBidiStreamingServerCancel(CANCEL_BEFORE_PROCESSING);
// Bidi-streaming RPC; server calls TryCancel() from a parallel thread while
// messages are being exchanged.
1868 TEST_P(AsyncEnd2endServerTryCancelTest, ServerBidiStreamingTryCancelDuring) {
1869 TestBidiStreamingServerCancel(CANCEL_DURING_PROCESSING);
// Bidi-streaming RPC; server calls TryCancel() after the message exchange but
// before finishing the call.
1872 TEST_P(AsyncEnd2endServerTryCancelTest, ServerBidiStreamingTryCancelAfter) {
1873 TestBidiStreamingServerCancel(CANCEL_AFTER_PROCESSING);
// Builds the list of TestScenario values used to parameterize the test suites.
// A scenario is created for every (message, credentials type) combination,
// plus one extra scenario per message that uses kInsecureCredentialsType with
// `true` as the first TestScenario argument.
//
// The first parameter is unused (its name is commented out).
// test_message_size_limit: when true, large payloads are added to the message
//   list in addition to the short "Hello" message.
// Returns the populated scenario vector (named `scenarios`).
1876 std::vector<TestScenario> CreateTestScenarios(bool /*test_secure*/,
1877 bool test_message_size_limit) {
1878 std::vector<TestScenario> scenarios;
1879 std::vector<std::string> credentials_types;
1880 std::vector<std::string> messages;
// True when the credentials provider returns non-null channel credentials for
// kInsecureCredentialsType.
1882 auto insec_ok = [] {
1883 // Only allow insecure credentials type when it is registered with the
1884 // provider. User may create providers that do not have insecure.
1885 return GetCredentialsProvider()->GetChannelCredentials(
1886 kInsecureCredentialsType, nullptr) != nullptr;
1890 credentials_types.push_back(kInsecureCredentialsType);
// Append every secure credentials type known to the provider.
1892 auto sec_list = GetCredentialsProvider()->GetSecureCredentialsTypeList();
1893 for (auto sec = sec_list.begin(); sec != sec_list.end(); sec++) {
1894 credentials_types.push_back(*sec);
1896 GPR_ASSERT(!credentials_types.empty());
1898 messages.push_back("Hello");
1899 if (test_message_size_limit) {
// Payloads of k * 1024 characters cycling through 'a'..'z', for k below
// GRPC_DEFAULT_MAX_RECV_MESSAGE_LENGTH / 1024.
1900 for (size_t k = 1; k < GRPC_DEFAULT_MAX_RECV_MESSAGE_LENGTH / 1024;
1902 std::string big_msg;
1903 for (size_t i = 0; i < k * 1024; ++i) {
1904 char c = 'a' + (i % 26);
1907 messages.push_back(big_msg);
1909 if (!BuiltUnderMsan()) {
1910 // 4MB message processing with SSL is very slow under msan
1911 // (causes timeouts) and doesn't really increase the signal from tests.
1912 // Reserve 100 bytes for other fields of the message proto.
1914 std::string(GRPC_DEFAULT_MAX_RECV_MESSAGE_LENGTH - 100, 'a'));
1918 // TODO (sreek) Re-enable tests with health check service after the issue
1919 // https://github.com/grpc/grpc/issues/11223 is resolved
// Only `false` is iterated, so health_check_service is false in every scenario.
1920 for (auto health_check_service : {false}) {
1921 for (auto msg = messages.begin(); msg != messages.end(); msg++) {
1922 for (auto cred = credentials_types.begin();
1923 cred != credentials_types.end(); ++cred) {
// NOTE(review): the first argument is presumably the `inproc` flag read via
// GetParam().inproc in the tests — confirm against TestScenario's constructor.
1924 scenarios.emplace_back(false, *cred, health_check_service, *msg);
1927 scenarios.emplace_back(true, kInsecureCredentialsType,
1928 health_check_service, *msg);
// Instantiates AsyncEnd2endTest under the "AsyncEnd2end" prefix, once per
// scenario returned by CreateTestScenarios(true, true); the second argument
// enables the extra large-message scenarios.
1935 INSTANTIATE_TEST_SUITE_P(AsyncEnd2end, AsyncEnd2endTest,
1936 ::testing::ValuesIn(CreateTestScenarios(true, true)));
// Instantiates AsyncEnd2endServerTryCancelTest under the
// "AsyncEnd2endServerTryCancel" prefix with scenarios produced by
// CreateTestScenarios.
1937 INSTANTIATE_TEST_SUITE_P(AsyncEnd2endServerTryCancel,
1938 AsyncEnd2endServerTryCancelTest,
1939 ::testing::ValuesIn(CreateTestScenarios(false,
1943 } // namespace testing
1946 int main(int argc, char** argv) {
1947 // Change the backup poll interval from 5s to 100ms to speed up the
1948 // ReconnectChannel test
1949 GPR_GLOBAL_CONFIG_SET(grpc_client_channel_backup_poll_interval_ms, 100);
1950 grpc::testing::TestEnvironment env(argc, argv);
1951 ::testing::InitGoogleTest(&argc, argv);
1952 int ret = RUN_ALL_TESTS();