e09f54dcc3f766ac79e1af41c4fca7d660c510dc
[platform/upstream/grpc.git] / test / cpp / end2end / async_end2end_test.cc
1 /*
2  *
3  * Copyright 2015 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18
19 #include <cinttypes>
20 #include <memory>
21 #include <thread>
22
23 #include <grpc/grpc.h>
24 #include <grpc/support/alloc.h>
25 #include <grpc/support/log.h>
26 #include <grpc/support/time.h>
27 #include <grpcpp/channel.h>
28 #include <grpcpp/client_context.h>
29 #include <grpcpp/create_channel.h>
30 #include <grpcpp/ext/health_check_service_server_builder_option.h>
31 #include <grpcpp/server.h>
32 #include <grpcpp/server_builder.h>
33 #include <grpcpp/server_context.h>
34
35 #include "src/core/lib/gpr/env.h"
36 #include "src/core/lib/gpr/tls.h"
37 #include "src/core/lib/iomgr/port.h"
38 #include "src/proto/grpc/health/v1/health.grpc.pb.h"
39 #include "src/proto/grpc/testing/duplicate/echo_duplicate.grpc.pb.h"
40 #include "src/proto/grpc/testing/echo.grpc.pb.h"
41 #include "test/core/util/port.h"
42 #include "test/core/util/test_config.h"
43 #include "test/cpp/util/string_ref_helper.h"
44 #include "test/cpp/util/test_credentials_provider.h"
45
46 #include <gtest/gtest.h>
47
48 using grpc::testing::EchoRequest;
49 using grpc::testing::EchoResponse;
50 using grpc::testing::kTlsCredentialsType;
51 using std::chrono::system_clock;
52
53 namespace grpc {
54 namespace testing {
55
56 namespace {
57
58 void* tag(int i) { return (void*)static_cast<intptr_t>(i); }
59 int detag(void* p) { return static_cast<int>(reinterpret_cast<intptr_t>(p)); }
60
61 class Verifier {
62  public:
63   Verifier() : lambda_run_(false) {}
64   // Expect sets the expected ok value for a specific tag
65   Verifier& Expect(int i, bool expect_ok) {
66     return ExpectUnless(i, expect_ok, false);
67   }
68   // ExpectUnless sets the expected ok value for a specific tag
69   // unless the tag was already marked seen (as a result of ExpectMaybe)
70   Verifier& ExpectUnless(int i, bool expect_ok, bool seen) {
71     if (!seen) {
72       expectations_[tag(i)] = expect_ok;
73     }
74     return *this;
75   }
76   // ExpectMaybe sets the expected ok value for a specific tag, but does not
77   // require it to appear
78   // If it does, sets *seen to true
79   Verifier& ExpectMaybe(int i, bool expect_ok, bool* seen) {
80     if (!*seen) {
81       maybe_expectations_[tag(i)] = MaybeExpect{expect_ok, seen};
82     }
83     return *this;
84   }
85
86   // Next waits for 1 async tag to complete, checks its
87   // expectations, and returns the tag
88   int Next(CompletionQueue* cq, bool ignore_ok) {
89     bool ok;
90     void* got_tag;
91     EXPECT_TRUE(cq->Next(&got_tag, &ok));
92     GotTag(got_tag, ok, ignore_ok);
93     return detag(got_tag);
94   }
95
96   template <typename T>
97   CompletionQueue::NextStatus DoOnceThenAsyncNext(
98       CompletionQueue* cq, void** got_tag, bool* ok, T deadline,
99       std::function<void(void)> lambda) {
100     if (lambda_run_) {
101       return cq->AsyncNext(got_tag, ok, deadline);
102     } else {
103       lambda_run_ = true;
104       return cq->DoThenAsyncNext(lambda, got_tag, ok, deadline);
105     }
106   }
107
108   // Verify keeps calling Next until all currently set
109   // expected tags are complete
110   void Verify(CompletionQueue* cq) { Verify(cq, false); }
111
112   // This version of Verify allows optionally ignoring the
113   // outcome of the expectation
114   void Verify(CompletionQueue* cq, bool ignore_ok) {
115     GPR_ASSERT(!expectations_.empty() || !maybe_expectations_.empty());
116     while (!expectations_.empty()) {
117       Next(cq, ignore_ok);
118     }
119   }
120
121   // This version of Verify stops after a certain deadline
122   void Verify(CompletionQueue* cq,
123               std::chrono::system_clock::time_point deadline) {
124     if (expectations_.empty()) {
125       bool ok;
126       void* got_tag;
127       EXPECT_EQ(cq->AsyncNext(&got_tag, &ok, deadline),
128                 CompletionQueue::TIMEOUT);
129     } else {
130       while (!expectations_.empty()) {
131         bool ok;
132         void* got_tag;
133         EXPECT_EQ(cq->AsyncNext(&got_tag, &ok, deadline),
134                   CompletionQueue::GOT_EVENT);
135         GotTag(got_tag, ok, false);
136       }
137     }
138   }
139
140   // This version of Verify stops after a certain deadline, and uses the
141   // DoThenAsyncNext API
142   // to call the lambda
143   void Verify(CompletionQueue* cq,
144               std::chrono::system_clock::time_point deadline,
145               const std::function<void(void)>& lambda) {
146     if (expectations_.empty()) {
147       bool ok;
148       void* got_tag;
149       EXPECT_EQ(DoOnceThenAsyncNext(cq, &got_tag, &ok, deadline, lambda),
150                 CompletionQueue::TIMEOUT);
151     } else {
152       while (!expectations_.empty()) {
153         bool ok;
154         void* got_tag;
155         EXPECT_EQ(DoOnceThenAsyncNext(cq, &got_tag, &ok, deadline, lambda),
156                   CompletionQueue::GOT_EVENT);
157         GotTag(got_tag, ok, false);
158       }
159     }
160   }
161
162  private:
163   void GotTag(void* got_tag, bool ok, bool ignore_ok) {
164     auto it = expectations_.find(got_tag);
165     if (it != expectations_.end()) {
166       if (!ignore_ok) {
167         EXPECT_EQ(it->second, ok);
168       }
169       expectations_.erase(it);
170     } else {
171       auto it2 = maybe_expectations_.find(got_tag);
172       if (it2 != maybe_expectations_.end()) {
173         if (it2->second.seen != nullptr) {
174           EXPECT_FALSE(*it2->second.seen);
175           *it2->second.seen = true;
176         }
177         if (!ignore_ok) {
178           EXPECT_EQ(it2->second.ok, ok);
179         }
180       } else {
181         gpr_log(GPR_ERROR, "Unexpected tag: %p", got_tag);
182         abort();
183       }
184     }
185   }
186
187   struct MaybeExpect {
188     bool ok;
189     bool* seen;
190   };
191
192   std::map<void*, bool> expectations_;
193   std::map<void*, MaybeExpect> maybe_expectations_;
194   bool lambda_run_;
195 };
196
197 bool plugin_has_sync_methods(std::unique_ptr<ServerBuilderPlugin>& plugin) {
198   return plugin->has_sync_methods();
199 }
200
201 // This class disables the server builder plugins that may add sync services to
202 // the server. If there are sync services, UnimplementedRpc test will triger
203 // the sync unknown rpc routine on the server side, rather than the async one
204 // that needs to be tested here.
205 class ServerBuilderSyncPluginDisabler : public ::grpc::ServerBuilderOption {
206  public:
207   void UpdateArguments(ChannelArguments* arg) override {}
208
209   void UpdatePlugins(
210       std::vector<std::unique_ptr<ServerBuilderPlugin>>* plugins) override {
211     plugins->erase(std::remove_if(plugins->begin(), plugins->end(),
212                                   plugin_has_sync_methods),
213                    plugins->end());
214   }
215 };
216
217 class TestScenario {
218  public:
219   TestScenario(bool inproc_stub, const grpc::string& creds_type, bool hcs,
220                const grpc::string& content)
221       : inproc(inproc_stub),
222         health_check_service(hcs),
223         credentials_type(creds_type),
224         message_content(content) {}
225   void Log() const;
226   bool inproc;
227   bool health_check_service;
228   const grpc::string credentials_type;
229   const grpc::string message_content;
230 };
231
232 static std::ostream& operator<<(std::ostream& out,
233                                 const TestScenario& scenario) {
234   return out << "TestScenario{inproc=" << (scenario.inproc ? "true" : "false")
235              << ", credentials='" << scenario.credentials_type
236              << ", health_check_service="
237              << (scenario.health_check_service ? "true" : "false")
238              << "', message_size=" << scenario.message_content.size() << "}";
239 }
240
241 void TestScenario::Log() const {
242   std::ostringstream out;
243   out << *this;
244   gpr_log(GPR_DEBUG, "%s", out.str().c_str());
245 }
246
247 class HealthCheck : public health::v1::Health::Service {};
248
249 class AsyncEnd2endTest : public ::testing::TestWithParam<TestScenario> {
250  protected:
251   AsyncEnd2endTest() { GetParam().Log(); }
252
253   void SetUp() override {
254     port_ = grpc_pick_unused_port_or_die();
255     server_address_ << "localhost:" << port_;
256
257     // Setup server
258     BuildAndStartServer();
259   }
260
261   void TearDown() override {
262     server_->Shutdown();
263     void* ignored_tag;
264     bool ignored_ok;
265     cq_->Shutdown();
266     while (cq_->Next(&ignored_tag, &ignored_ok))
267       ;
268     stub_.reset();
269     grpc_recycle_unused_port(port_);
270   }
271
272   void BuildAndStartServer() {
273     ServerBuilder builder;
274     auto server_creds = GetCredentialsProvider()->GetServerCredentials(
275         GetParam().credentials_type);
276     builder.AddListeningPort(server_address_.str(), server_creds);
277     service_.reset(new grpc::testing::EchoTestService::AsyncService());
278     builder.RegisterService(service_.get());
279     if (GetParam().health_check_service) {
280       builder.RegisterService(&health_check_);
281     }
282     cq_ = builder.AddCompletionQueue();
283
284     // TODO(zyc): make a test option to choose wheather sync plugins should be
285     // deleted
286     std::unique_ptr<ServerBuilderOption> sync_plugin_disabler(
287         new ServerBuilderSyncPluginDisabler());
288     builder.SetOption(move(sync_plugin_disabler));
289     server_ = builder.BuildAndStart();
290   }
291
292   void ResetStub() {
293     ChannelArguments args;
294     auto channel_creds = GetCredentialsProvider()->GetChannelCredentials(
295         GetParam().credentials_type, &args);
296     std::shared_ptr<Channel> channel =
297         !(GetParam().inproc)
298             ? CreateCustomChannel(server_address_.str(), channel_creds, args)
299             : server_->InProcessChannel(args);
300     stub_ = grpc::testing::EchoTestService::NewStub(channel);
301   }
302
303   void SendRpc(int num_rpcs) {
304     for (int i = 0; i < num_rpcs; i++) {
305       EchoRequest send_request;
306       EchoRequest recv_request;
307       EchoResponse send_response;
308       EchoResponse recv_response;
309       Status recv_status;
310
311       ClientContext cli_ctx;
312       ServerContext srv_ctx;
313       grpc::ServerAsyncResponseWriter<EchoResponse> response_writer(&srv_ctx);
314
315       send_request.set_message(GetParam().message_content);
316       std::unique_ptr<ClientAsyncResponseReader<EchoResponse>> response_reader(
317           stub_->AsyncEcho(&cli_ctx, send_request, cq_.get()));
318
319       service_->RequestEcho(&srv_ctx, &recv_request, &response_writer,
320                             cq_.get(), cq_.get(), tag(2));
321
322       response_reader->Finish(&recv_response, &recv_status, tag(4));
323
324       Verifier().Expect(2, true).Verify(cq_.get());
325       EXPECT_EQ(send_request.message(), recv_request.message());
326
327       send_response.set_message(recv_request.message());
328       response_writer.Finish(send_response, Status::OK, tag(3));
329       Verifier().Expect(3, true).Expect(4, true).Verify(cq_.get());
330
331       EXPECT_EQ(send_response.message(), recv_response.message());
332       EXPECT_TRUE(recv_status.ok());
333     }
334   }
335
336   std::unique_ptr<ServerCompletionQueue> cq_;
337   std::unique_ptr<grpc::testing::EchoTestService::Stub> stub_;
338   std::unique_ptr<Server> server_;
339   std::unique_ptr<grpc::testing::EchoTestService::AsyncService> service_;
340   HealthCheck health_check_;
341   std::ostringstream server_address_;
342   int port_;
343 };
344
345 TEST_P(AsyncEnd2endTest, SimpleRpc) {
346   ResetStub();
347   SendRpc(1);
348 }
349
350 TEST_P(AsyncEnd2endTest, SequentialRpcs) {
351   ResetStub();
352   SendRpc(10);
353 }
354
355 TEST_P(AsyncEnd2endTest, ReconnectChannel) {
356   // GRPC_CLIENT_CHANNEL_BACKUP_POLL_INTERVAL_MS is set to 100ms in main()
357   if (GetParam().inproc) {
358     return;
359   }
360   int poller_slowdown_factor = 1;
361   // It needs 2 pollset_works to reconnect the channel with polling engine
362   // "poll"
363   char* s = gpr_getenv("GRPC_POLL_STRATEGY");
364   if (s != nullptr && 0 == strcmp(s, "poll")) {
365     poller_slowdown_factor = 2;
366   }
367   gpr_free(s);
368   ResetStub();
369   SendRpc(1);
370   server_->Shutdown();
371   void* ignored_tag;
372   bool ignored_ok;
373   cq_->Shutdown();
374   while (cq_->Next(&ignored_tag, &ignored_ok))
375     ;
376   BuildAndStartServer();
377   // It needs more than GRPC_CLIENT_CHANNEL_BACKUP_POLL_INTERVAL_MS time to
378   // reconnect the channel.
379   gpr_sleep_until(gpr_time_add(
380       gpr_now(GPR_CLOCK_REALTIME),
381       gpr_time_from_millis(
382           300 * poller_slowdown_factor * grpc_test_slowdown_factor(),
383           GPR_TIMESPAN)));
384   SendRpc(1);
385 }
386
387 // We do not need to protect notify because the use is synchronized.
388 void ServerWait(Server* server, int* notify) {
389   server->Wait();
390   *notify = 1;
391 }
392 TEST_P(AsyncEnd2endTest, WaitAndShutdownTest) {
393   int notify = 0;
394   std::thread wait_thread(&ServerWait, server_.get(), &notify);
395   ResetStub();
396   SendRpc(1);
397   EXPECT_EQ(0, notify);
398   server_->Shutdown();
399   wait_thread.join();
400   EXPECT_EQ(1, notify);
401 }
402
403 TEST_P(AsyncEnd2endTest, ShutdownThenWait) {
404   ResetStub();
405   SendRpc(1);
406   std::thread t([this]() { server_->Shutdown(); });
407   server_->Wait();
408   t.join();
409 }
410
411 // Test a simple RPC using the async version of Next
412 TEST_P(AsyncEnd2endTest, AsyncNextRpc) {
413   ResetStub();
414
415   EchoRequest send_request;
416   EchoRequest recv_request;
417   EchoResponse send_response;
418   EchoResponse recv_response;
419   Status recv_status;
420
421   ClientContext cli_ctx;
422   ServerContext srv_ctx;
423   grpc::ServerAsyncResponseWriter<EchoResponse> response_writer(&srv_ctx);
424
425   send_request.set_message(GetParam().message_content);
426   std::unique_ptr<ClientAsyncResponseReader<EchoResponse>> response_reader(
427       stub_->AsyncEcho(&cli_ctx, send_request, cq_.get()));
428
429   std::chrono::system_clock::time_point time_now(
430       std::chrono::system_clock::now());
431   std::chrono::system_clock::time_point time_limit(
432       std::chrono::system_clock::now() + std::chrono::seconds(10));
433   Verifier().Verify(cq_.get(), time_now);
434   Verifier().Verify(cq_.get(), time_now);
435
436   service_->RequestEcho(&srv_ctx, &recv_request, &response_writer, cq_.get(),
437                         cq_.get(), tag(2));
438   response_reader->Finish(&recv_response, &recv_status, tag(4));
439
440   Verifier().Expect(2, true).Verify(cq_.get(), time_limit);
441   EXPECT_EQ(send_request.message(), recv_request.message());
442
443   send_response.set_message(recv_request.message());
444   response_writer.Finish(send_response, Status::OK, tag(3));
445   Verifier().Expect(3, true).Expect(4, true).Verify(
446       cq_.get(), std::chrono::system_clock::time_point::max());
447
448   EXPECT_EQ(send_response.message(), recv_response.message());
449   EXPECT_TRUE(recv_status.ok());
450 }
451
452 // Test a simple RPC using the async version of Next
453 TEST_P(AsyncEnd2endTest, DoThenAsyncNextRpc) {
454   ResetStub();
455
456   EchoRequest send_request;
457   EchoRequest recv_request;
458   EchoResponse send_response;
459   EchoResponse recv_response;
460   Status recv_status;
461
462   ClientContext cli_ctx;
463   ServerContext srv_ctx;
464   grpc::ServerAsyncResponseWriter<EchoResponse> response_writer(&srv_ctx);
465
466   send_request.set_message(GetParam().message_content);
467   std::unique_ptr<ClientAsyncResponseReader<EchoResponse>> response_reader(
468       stub_->AsyncEcho(&cli_ctx, send_request, cq_.get()));
469
470   std::chrono::system_clock::time_point time_now(
471       std::chrono::system_clock::now());
472   std::chrono::system_clock::time_point time_limit(
473       std::chrono::system_clock::now() + std::chrono::seconds(10));
474   Verifier().Verify(cq_.get(), time_now);
475   Verifier().Verify(cq_.get(), time_now);
476
477   auto resp_writer_ptr = &response_writer;
478   auto lambda_2 = [&, this, resp_writer_ptr]() {
479     service_->RequestEcho(&srv_ctx, &recv_request, resp_writer_ptr, cq_.get(),
480                           cq_.get(), tag(2));
481   };
482   response_reader->Finish(&recv_response, &recv_status, tag(4));
483
484   Verifier().Expect(2, true).Verify(cq_.get(), time_limit, lambda_2);
485   EXPECT_EQ(send_request.message(), recv_request.message());
486
487   send_response.set_message(recv_request.message());
488   auto lambda_3 = [resp_writer_ptr, send_response]() {
489     resp_writer_ptr->Finish(send_response, Status::OK, tag(3));
490   };
491   Verifier().Expect(3, true).Expect(4, true).Verify(
492       cq_.get(), std::chrono::system_clock::time_point::max(), lambda_3);
493
494   EXPECT_EQ(send_response.message(), recv_response.message());
495   EXPECT_TRUE(recv_status.ok());
496 }
497
498 // Two pings and a final pong.
499 TEST_P(AsyncEnd2endTest, SimpleClientStreaming) {
500   ResetStub();
501
502   EchoRequest send_request;
503   EchoRequest recv_request;
504   EchoResponse send_response;
505   EchoResponse recv_response;
506   Status recv_status;
507   ClientContext cli_ctx;
508   ServerContext srv_ctx;
509   ServerAsyncReader<EchoResponse, EchoRequest> srv_stream(&srv_ctx);
510
511   send_request.set_message(GetParam().message_content);
512   std::unique_ptr<ClientAsyncWriter<EchoRequest>> cli_stream(
513       stub_->AsyncRequestStream(&cli_ctx, &recv_response, cq_.get(), tag(1)));
514
515   service_->RequestRequestStream(&srv_ctx, &srv_stream, cq_.get(), cq_.get(),
516                                  tag(2));
517
518   Verifier().Expect(2, true).Expect(1, true).Verify(cq_.get());
519
520   cli_stream->Write(send_request, tag(3));
521   srv_stream.Read(&recv_request, tag(4));
522   Verifier().Expect(3, true).Expect(4, true).Verify(cq_.get());
523   EXPECT_EQ(send_request.message(), recv_request.message());
524
525   cli_stream->Write(send_request, tag(5));
526   srv_stream.Read(&recv_request, tag(6));
527   Verifier().Expect(5, true).Expect(6, true).Verify(cq_.get());
528
529   EXPECT_EQ(send_request.message(), recv_request.message());
530   cli_stream->WritesDone(tag(7));
531   srv_stream.Read(&recv_request, tag(8));
532   Verifier().Expect(7, true).Expect(8, false).Verify(cq_.get());
533
534   send_response.set_message(recv_request.message());
535   srv_stream.Finish(send_response, Status::OK, tag(9));
536   cli_stream->Finish(&recv_status, tag(10));
537   Verifier().Expect(9, true).Expect(10, true).Verify(cq_.get());
538
539   EXPECT_EQ(send_response.message(), recv_response.message());
540   EXPECT_TRUE(recv_status.ok());
541 }
542
543 // Two pings and a final pong.
544 TEST_P(AsyncEnd2endTest, SimpleClientStreamingWithCoalescingApi) {
545   ResetStub();
546
547   EchoRequest send_request;
548   EchoRequest recv_request;
549   EchoResponse send_response;
550   EchoResponse recv_response;
551   Status recv_status;
552   ClientContext cli_ctx;
553   ServerContext srv_ctx;
554   ServerAsyncReader<EchoResponse, EchoRequest> srv_stream(&srv_ctx);
555
556   send_request.set_message(GetParam().message_content);
557   cli_ctx.set_initial_metadata_corked(true);
558   // tag:1 never comes up since no op is performed
559   std::unique_ptr<ClientAsyncWriter<EchoRequest>> cli_stream(
560       stub_->AsyncRequestStream(&cli_ctx, &recv_response, cq_.get(), tag(1)));
561
562   service_->RequestRequestStream(&srv_ctx, &srv_stream, cq_.get(), cq_.get(),
563                                  tag(2));
564
565   cli_stream->Write(send_request, tag(3));
566
567   bool seen3 = false;
568
569   Verifier().Expect(2, true).ExpectMaybe(3, true, &seen3).Verify(cq_.get());
570
571   srv_stream.Read(&recv_request, tag(4));
572
573   Verifier().ExpectUnless(3, true, seen3).Expect(4, true).Verify(cq_.get());
574
575   EXPECT_EQ(send_request.message(), recv_request.message());
576
577   cli_stream->WriteLast(send_request, WriteOptions(), tag(5));
578   srv_stream.Read(&recv_request, tag(6));
579   Verifier().Expect(5, true).Expect(6, true).Verify(cq_.get());
580   EXPECT_EQ(send_request.message(), recv_request.message());
581
582   srv_stream.Read(&recv_request, tag(7));
583   Verifier().Expect(7, false).Verify(cq_.get());
584
585   send_response.set_message(recv_request.message());
586   srv_stream.Finish(send_response, Status::OK, tag(8));
587   cli_stream->Finish(&recv_status, tag(9));
588   Verifier().Expect(8, true).Expect(9, true).Verify(cq_.get());
589
590   EXPECT_EQ(send_response.message(), recv_response.message());
591   EXPECT_TRUE(recv_status.ok());
592 }
593
594 // One ping, two pongs.
595 TEST_P(AsyncEnd2endTest, SimpleServerStreaming) {
596   ResetStub();
597
598   EchoRequest send_request;
599   EchoRequest recv_request;
600   EchoResponse send_response;
601   EchoResponse recv_response;
602   Status recv_status;
603   ClientContext cli_ctx;
604   ServerContext srv_ctx;
605   ServerAsyncWriter<EchoResponse> srv_stream(&srv_ctx);
606
607   send_request.set_message(GetParam().message_content);
608   std::unique_ptr<ClientAsyncReader<EchoResponse>> cli_stream(
609       stub_->AsyncResponseStream(&cli_ctx, send_request, cq_.get(), tag(1)));
610
611   service_->RequestResponseStream(&srv_ctx, &recv_request, &srv_stream,
612                                   cq_.get(), cq_.get(), tag(2));
613
614   Verifier().Expect(1, true).Expect(2, true).Verify(cq_.get());
615   EXPECT_EQ(send_request.message(), recv_request.message());
616
617   send_response.set_message(recv_request.message());
618   srv_stream.Write(send_response, tag(3));
619   cli_stream->Read(&recv_response, tag(4));
620   Verifier().Expect(3, true).Expect(4, true).Verify(cq_.get());
621   EXPECT_EQ(send_response.message(), recv_response.message());
622
623   srv_stream.Write(send_response, tag(5));
624   cli_stream->Read(&recv_response, tag(6));
625   Verifier().Expect(5, true).Expect(6, true).Verify(cq_.get());
626   EXPECT_EQ(send_response.message(), recv_response.message());
627
628   srv_stream.Finish(Status::OK, tag(7));
629   cli_stream->Read(&recv_response, tag(8));
630   Verifier().Expect(7, true).Expect(8, false).Verify(cq_.get());
631
632   cli_stream->Finish(&recv_status, tag(9));
633   Verifier().Expect(9, true).Verify(cq_.get());
634
635   EXPECT_TRUE(recv_status.ok());
636 }
637
638 // One ping, two pongs. Using WriteAndFinish API
639 TEST_P(AsyncEnd2endTest, SimpleServerStreamingWithCoalescingApiWAF) {
640   ResetStub();
641
642   EchoRequest send_request;
643   EchoRequest recv_request;
644   EchoResponse send_response;
645   EchoResponse recv_response;
646   Status recv_status;
647   ClientContext cli_ctx;
648   ServerContext srv_ctx;
649   ServerAsyncWriter<EchoResponse> srv_stream(&srv_ctx);
650
651   send_request.set_message(GetParam().message_content);
652   std::unique_ptr<ClientAsyncReader<EchoResponse>> cli_stream(
653       stub_->AsyncResponseStream(&cli_ctx, send_request, cq_.get(), tag(1)));
654
655   service_->RequestResponseStream(&srv_ctx, &recv_request, &srv_stream,
656                                   cq_.get(), cq_.get(), tag(2));
657
658   Verifier().Expect(1, true).Expect(2, true).Verify(cq_.get());
659   EXPECT_EQ(send_request.message(), recv_request.message());
660
661   send_response.set_message(recv_request.message());
662   srv_stream.Write(send_response, tag(3));
663   cli_stream->Read(&recv_response, tag(4));
664   Verifier().Expect(3, true).Expect(4, true).Verify(cq_.get());
665   EXPECT_EQ(send_response.message(), recv_response.message());
666
667   srv_stream.WriteAndFinish(send_response, WriteOptions(), Status::OK, tag(5));
668   cli_stream->Read(&recv_response, tag(6));
669   Verifier().Expect(5, true).Expect(6, true).Verify(cq_.get());
670   EXPECT_EQ(send_response.message(), recv_response.message());
671
672   cli_stream->Read(&recv_response, tag(7));
673   Verifier().Expect(7, false).Verify(cq_.get());
674
675   cli_stream->Finish(&recv_status, tag(8));
676   Verifier().Expect(8, true).Verify(cq_.get());
677
678   EXPECT_TRUE(recv_status.ok());
679 }
680
681 // One ping, two pongs. Using WriteLast API
682 TEST_P(AsyncEnd2endTest, SimpleServerStreamingWithCoalescingApiWL) {
683   ResetStub();
684
685   EchoRequest send_request;
686   EchoRequest recv_request;
687   EchoResponse send_response;
688   EchoResponse recv_response;
689   Status recv_status;
690   ClientContext cli_ctx;
691   ServerContext srv_ctx;
692   ServerAsyncWriter<EchoResponse> srv_stream(&srv_ctx);
693
694   send_request.set_message(GetParam().message_content);
695   std::unique_ptr<ClientAsyncReader<EchoResponse>> cli_stream(
696       stub_->AsyncResponseStream(&cli_ctx, send_request, cq_.get(), tag(1)));
697
698   service_->RequestResponseStream(&srv_ctx, &recv_request, &srv_stream,
699                                   cq_.get(), cq_.get(), tag(2));
700
701   Verifier().Expect(1, true).Expect(2, true).Verify(cq_.get());
702   EXPECT_EQ(send_request.message(), recv_request.message());
703
704   send_response.set_message(recv_request.message());
705   srv_stream.Write(send_response, tag(3));
706   cli_stream->Read(&recv_response, tag(4));
707   Verifier().Expect(3, true).Expect(4, true).Verify(cq_.get());
708   EXPECT_EQ(send_response.message(), recv_response.message());
709
710   srv_stream.WriteLast(send_response, WriteOptions(), tag(5));
711   cli_stream->Read(&recv_response, tag(6));
712   srv_stream.Finish(Status::OK, tag(7));
713   Verifier().Expect(5, true).Expect(6, true).Expect(7, true).Verify(cq_.get());
714   EXPECT_EQ(send_response.message(), recv_response.message());
715
716   cli_stream->Read(&recv_response, tag(8));
717   Verifier().Expect(8, false).Verify(cq_.get());
718
719   cli_stream->Finish(&recv_status, tag(9));
720   Verifier().Expect(9, true).Verify(cq_.get());
721
722   EXPECT_TRUE(recv_status.ok());
723 }
724
725 // One ping, one pong.
726 TEST_P(AsyncEnd2endTest, SimpleBidiStreaming) {
727   ResetStub();
728
729   EchoRequest send_request;
730   EchoRequest recv_request;
731   EchoResponse send_response;
732   EchoResponse recv_response;
733   Status recv_status;
734   ClientContext cli_ctx;
735   ServerContext srv_ctx;
736   ServerAsyncReaderWriter<EchoResponse, EchoRequest> srv_stream(&srv_ctx);
737
738   send_request.set_message(GetParam().message_content);
739   std::unique_ptr<ClientAsyncReaderWriter<EchoRequest, EchoResponse>>
740       cli_stream(stub_->AsyncBidiStream(&cli_ctx, cq_.get(), tag(1)));
741
742   service_->RequestBidiStream(&srv_ctx, &srv_stream, cq_.get(), cq_.get(),
743                               tag(2));
744
745   Verifier().Expect(1, true).Expect(2, true).Verify(cq_.get());
746
747   cli_stream->Write(send_request, tag(3));
748   srv_stream.Read(&recv_request, tag(4));
749   Verifier().Expect(3, true).Expect(4, true).Verify(cq_.get());
750   EXPECT_EQ(send_request.message(), recv_request.message());
751
752   send_response.set_message(recv_request.message());
753   srv_stream.Write(send_response, tag(5));
754   cli_stream->Read(&recv_response, tag(6));
755   Verifier().Expect(5, true).Expect(6, true).Verify(cq_.get());
756   EXPECT_EQ(send_response.message(), recv_response.message());
757
758   cli_stream->WritesDone(tag(7));
759   srv_stream.Read(&recv_request, tag(8));
760   Verifier().Expect(7, true).Expect(8, false).Verify(cq_.get());
761
762   srv_stream.Finish(Status::OK, tag(9));
763   cli_stream->Finish(&recv_status, tag(10));
764   Verifier().Expect(9, true).Expect(10, true).Verify(cq_.get());
765
766   EXPECT_TRUE(recv_status.ok());
767 }
768
769 // One ping, one pong. Using server:WriteAndFinish api
770 TEST_P(AsyncEnd2endTest, SimpleBidiStreamingWithCoalescingApiWAF) {
771   ResetStub();
772
773   EchoRequest send_request;
774   EchoRequest recv_request;
775   EchoResponse send_response;
776   EchoResponse recv_response;
777   Status recv_status;
778   ClientContext cli_ctx;
779   ServerContext srv_ctx;
780   ServerAsyncReaderWriter<EchoResponse, EchoRequest> srv_stream(&srv_ctx);
781
782   send_request.set_message(GetParam().message_content);
783   cli_ctx.set_initial_metadata_corked(true);
784   std::unique_ptr<ClientAsyncReaderWriter<EchoRequest, EchoResponse>>
785       cli_stream(stub_->AsyncBidiStream(&cli_ctx, cq_.get(), tag(1)));
786
787   service_->RequestBidiStream(&srv_ctx, &srv_stream, cq_.get(), cq_.get(),
788                               tag(2));
789
790   cli_stream->WriteLast(send_request, WriteOptions(), tag(3));
791
792   bool seen3 = false;
793
794   Verifier().Expect(2, true).ExpectMaybe(3, true, &seen3).Verify(cq_.get());
795
796   srv_stream.Read(&recv_request, tag(4));
797
798   Verifier().ExpectUnless(3, true, seen3).Expect(4, true).Verify(cq_.get());
799   EXPECT_EQ(send_request.message(), recv_request.message());
800
801   srv_stream.Read(&recv_request, tag(5));
802   Verifier().Expect(5, false).Verify(cq_.get());
803
804   send_response.set_message(recv_request.message());
805   srv_stream.WriteAndFinish(send_response, WriteOptions(), Status::OK, tag(6));
806   cli_stream->Read(&recv_response, tag(7));
807   Verifier().Expect(6, true).Expect(7, true).Verify(cq_.get());
808   EXPECT_EQ(send_response.message(), recv_response.message());
809
810   cli_stream->Finish(&recv_status, tag(8));
811   Verifier().Expect(8, true).Verify(cq_.get());
812
813   EXPECT_TRUE(recv_status.ok());
814 }
815
816 // One ping, one pong. Using server:WriteLast api
817 TEST_P(AsyncEnd2endTest, SimpleBidiStreamingWithCoalescingApiWL) {
818   ResetStub();
819
820   EchoRequest send_request;
821   EchoRequest recv_request;
822   EchoResponse send_response;
823   EchoResponse recv_response;
824   Status recv_status;
825   ClientContext cli_ctx;
826   ServerContext srv_ctx;
827   ServerAsyncReaderWriter<EchoResponse, EchoRequest> srv_stream(&srv_ctx);
828
829   send_request.set_message(GetParam().message_content);
830   cli_ctx.set_initial_metadata_corked(true);
831   std::unique_ptr<ClientAsyncReaderWriter<EchoRequest, EchoResponse>>
832       cli_stream(stub_->AsyncBidiStream(&cli_ctx, cq_.get(), tag(1)));
833
834   service_->RequestBidiStream(&srv_ctx, &srv_stream, cq_.get(), cq_.get(),
835                               tag(2));
836
837   cli_stream->WriteLast(send_request, WriteOptions(), tag(3));
838
839   bool seen3 = false;
840
841   Verifier().Expect(2, true).ExpectMaybe(3, true, &seen3).Verify(cq_.get());
842
843   srv_stream.Read(&recv_request, tag(4));
844
845   Verifier().ExpectUnless(3, true, seen3).Expect(4, true).Verify(cq_.get());
846   EXPECT_EQ(send_request.message(), recv_request.message());
847
848   srv_stream.Read(&recv_request, tag(5));
849   Verifier().Expect(5, false).Verify(cq_.get());
850
851   send_response.set_message(recv_request.message());
852   srv_stream.WriteLast(send_response, WriteOptions(), tag(6));
853   srv_stream.Finish(Status::OK, tag(7));
854   cli_stream->Read(&recv_response, tag(8));
855   Verifier().Expect(6, true).Expect(7, true).Expect(8, true).Verify(cq_.get());
856   EXPECT_EQ(send_response.message(), recv_response.message());
857
858   cli_stream->Finish(&recv_status, tag(9));
859   Verifier().Expect(9, true).Verify(cq_.get());
860
861   EXPECT_TRUE(recv_status.ok());
862 }
863
864 // Metadata tests
865 TEST_P(AsyncEnd2endTest, ClientInitialMetadataRpc) {
866   ResetStub();
867
868   EchoRequest send_request;
869   EchoRequest recv_request;
870   EchoResponse send_response;
871   EchoResponse recv_response;
872   Status recv_status;
873
874   ClientContext cli_ctx;
875   ServerContext srv_ctx;
876   grpc::ServerAsyncResponseWriter<EchoResponse> response_writer(&srv_ctx);
877
878   send_request.set_message(GetParam().message_content);
879   std::pair<grpc::string, grpc::string> meta1("key1", "val1");
880   std::pair<grpc::string, grpc::string> meta2("key2", "val2");
881   std::pair<grpc::string, grpc::string> meta3("g.r.d-bin", "xyz");
882   cli_ctx.AddMetadata(meta1.first, meta1.second);
883   cli_ctx.AddMetadata(meta2.first, meta2.second);
884   cli_ctx.AddMetadata(meta3.first, meta3.second);
885
886   std::unique_ptr<ClientAsyncResponseReader<EchoResponse>> response_reader(
887       stub_->AsyncEcho(&cli_ctx, send_request, cq_.get()));
888   response_reader->Finish(&recv_response, &recv_status, tag(4));
889
890   service_->RequestEcho(&srv_ctx, &recv_request, &response_writer, cq_.get(),
891                         cq_.get(), tag(2));
892   Verifier().Expect(2, true).Verify(cq_.get());
893   EXPECT_EQ(send_request.message(), recv_request.message());
894   const auto& client_initial_metadata = srv_ctx.client_metadata();
895   EXPECT_EQ(meta1.second,
896             ToString(client_initial_metadata.find(meta1.first)->second));
897   EXPECT_EQ(meta2.second,
898             ToString(client_initial_metadata.find(meta2.first)->second));
899   EXPECT_EQ(meta3.second,
900             ToString(client_initial_metadata.find(meta3.first)->second));
901   EXPECT_GE(client_initial_metadata.size(), static_cast<size_t>(2));
902
903   send_response.set_message(recv_request.message());
904   response_writer.Finish(send_response, Status::OK, tag(3));
905   Verifier().Expect(3, true).Expect(4, true).Verify(cq_.get());
906
907   EXPECT_EQ(send_response.message(), recv_response.message());
908   EXPECT_TRUE(recv_status.ok());
909 }
910
911 TEST_P(AsyncEnd2endTest, ServerInitialMetadataRpc) {
912   ResetStub();
913
914   EchoRequest send_request;
915   EchoRequest recv_request;
916   EchoResponse send_response;
917   EchoResponse recv_response;
918   Status recv_status;
919
920   ClientContext cli_ctx;
921   ServerContext srv_ctx;
922   grpc::ServerAsyncResponseWriter<EchoResponse> response_writer(&srv_ctx);
923
924   send_request.set_message(GetParam().message_content);
925   std::pair<grpc::string, grpc::string> meta1("key1", "val1");
926   std::pair<grpc::string, grpc::string> meta2("key2", "val2");
927
928   std::unique_ptr<ClientAsyncResponseReader<EchoResponse>> response_reader(
929       stub_->AsyncEcho(&cli_ctx, send_request, cq_.get()));
930   response_reader->ReadInitialMetadata(tag(4));
931
932   service_->RequestEcho(&srv_ctx, &recv_request, &response_writer, cq_.get(),
933                         cq_.get(), tag(2));
934   Verifier().Expect(2, true).Verify(cq_.get());
935   EXPECT_EQ(send_request.message(), recv_request.message());
936   srv_ctx.AddInitialMetadata(meta1.first, meta1.second);
937   srv_ctx.AddInitialMetadata(meta2.first, meta2.second);
938   response_writer.SendInitialMetadata(tag(3));
939   Verifier().Expect(3, true).Expect(4, true).Verify(cq_.get());
940   const auto& server_initial_metadata = cli_ctx.GetServerInitialMetadata();
941   EXPECT_EQ(meta1.second,
942             ToString(server_initial_metadata.find(meta1.first)->second));
943   EXPECT_EQ(meta2.second,
944             ToString(server_initial_metadata.find(meta2.first)->second));
945   EXPECT_EQ(static_cast<size_t>(2), server_initial_metadata.size());
946
947   send_response.set_message(recv_request.message());
948   response_writer.Finish(send_response, Status::OK, tag(5));
949   response_reader->Finish(&recv_response, &recv_status, tag(6));
950   Verifier().Expect(5, true).Expect(6, true).Verify(cq_.get());
951
952   EXPECT_EQ(send_response.message(), recv_response.message());
953   EXPECT_TRUE(recv_status.ok());
954 }
955
956 // 1 ping, 2 pongs.
957 TEST_P(AsyncEnd2endTest, ServerInitialMetadataServerStreaming) {
958   ResetStub();
959   EchoRequest send_request;
960   EchoRequest recv_request;
961   EchoResponse send_response;
962   EchoResponse recv_response;
963   Status recv_status;
964   ClientContext cli_ctx;
965   ServerContext srv_ctx;
966   ServerAsyncWriter<EchoResponse> srv_stream(&srv_ctx);
967
968   std::pair<::grpc::string, ::grpc::string> meta1("key1", "val1");
969   std::pair<::grpc::string, ::grpc::string> meta2("key2", "val2");
970
971   std::unique_ptr<ClientAsyncReader<EchoResponse>> cli_stream(
972       stub_->AsyncResponseStream(&cli_ctx, send_request, cq_.get(), tag(1)));
973   cli_stream->ReadInitialMetadata(tag(11));
974   service_->RequestResponseStream(&srv_ctx, &recv_request, &srv_stream,
975                                   cq_.get(), cq_.get(), tag(2));
976
977   Verifier().Expect(1, true).Expect(2, true).Verify(cq_.get());
978
979   srv_ctx.AddInitialMetadata(meta1.first, meta1.second);
980   srv_ctx.AddInitialMetadata(meta2.first, meta2.second);
981   srv_stream.SendInitialMetadata(tag(10));
982   Verifier().Expect(10, true).Expect(11, true).Verify(cq_.get());
983   auto server_initial_metadata = cli_ctx.GetServerInitialMetadata();
984   EXPECT_EQ(meta1.second,
985             ToString(server_initial_metadata.find(meta1.first)->second));
986   EXPECT_EQ(meta2.second,
987             ToString(server_initial_metadata.find(meta2.first)->second));
988   EXPECT_EQ(static_cast<size_t>(2), server_initial_metadata.size());
989
990   srv_stream.Write(send_response, tag(3));
991
992   cli_stream->Read(&recv_response, tag(4));
993   Verifier().Expect(3, true).Expect(4, true).Verify(cq_.get());
994
995   srv_stream.Write(send_response, tag(5));
996   cli_stream->Read(&recv_response, tag(6));
997   Verifier().Expect(5, true).Expect(6, true).Verify(cq_.get());
998
999   srv_stream.Finish(Status::OK, tag(7));
1000   cli_stream->Read(&recv_response, tag(8));
1001   Verifier().Expect(7, true).Expect(8, false).Verify(cq_.get());
1002
1003   cli_stream->Finish(&recv_status, tag(9));
1004   Verifier().Expect(9, true).Verify(cq_.get());
1005
1006   EXPECT_TRUE(recv_status.ok());
1007 }
1008
1009 // 1 ping, 2 pongs.
1010 // Test for server initial metadata being sent implicitly
1011 TEST_P(AsyncEnd2endTest, ServerInitialMetadataServerStreamingImplicit) {
1012   ResetStub();
1013   EchoRequest send_request;
1014   EchoRequest recv_request;
1015   EchoResponse send_response;
1016   EchoResponse recv_response;
1017   Status recv_status;
1018   ClientContext cli_ctx;
1019   ServerContext srv_ctx;
1020   ServerAsyncWriter<EchoResponse> srv_stream(&srv_ctx);
1021
1022   send_request.set_message(GetParam().message_content);
1023   std::pair<::grpc::string, ::grpc::string> meta1("key1", "val1");
1024   std::pair<::grpc::string, ::grpc::string> meta2("key2", "val2");
1025
1026   std::unique_ptr<ClientAsyncReader<EchoResponse>> cli_stream(
1027       stub_->AsyncResponseStream(&cli_ctx, send_request, cq_.get(), tag(1)));
1028   service_->RequestResponseStream(&srv_ctx, &recv_request, &srv_stream,
1029                                   cq_.get(), cq_.get(), tag(2));
1030
1031   Verifier().Expect(1, true).Expect(2, true).Verify(cq_.get());
1032   EXPECT_EQ(send_request.message(), recv_request.message());
1033
1034   srv_ctx.AddInitialMetadata(meta1.first, meta1.second);
1035   srv_ctx.AddInitialMetadata(meta2.first, meta2.second);
1036   send_response.set_message(recv_request.message());
1037   srv_stream.Write(send_response, tag(3));
1038
1039   cli_stream->Read(&recv_response, tag(4));
1040   Verifier().Expect(3, true).Expect(4, true).Verify(cq_.get());
1041   EXPECT_EQ(send_response.message(), recv_response.message());
1042
1043   auto server_initial_metadata = cli_ctx.GetServerInitialMetadata();
1044   EXPECT_EQ(meta1.second,
1045             ToString(server_initial_metadata.find(meta1.first)->second));
1046   EXPECT_EQ(meta2.second,
1047             ToString(server_initial_metadata.find(meta2.first)->second));
1048   EXPECT_EQ(static_cast<size_t>(2), server_initial_metadata.size());
1049
1050   srv_stream.Write(send_response, tag(5));
1051   cli_stream->Read(&recv_response, tag(6));
1052   Verifier().Expect(5, true).Expect(6, true).Verify(cq_.get());
1053
1054   srv_stream.Finish(Status::OK, tag(7));
1055   cli_stream->Read(&recv_response, tag(8));
1056   Verifier().Expect(7, true).Expect(8, false).Verify(cq_.get());
1057
1058   cli_stream->Finish(&recv_status, tag(9));
1059   Verifier().Expect(9, true).Verify(cq_.get());
1060
1061   EXPECT_TRUE(recv_status.ok());
1062 }
1063
1064 TEST_P(AsyncEnd2endTest, ServerTrailingMetadataRpc) {
1065   ResetStub();
1066
1067   EchoRequest send_request;
1068   EchoRequest recv_request;
1069   EchoResponse send_response;
1070   EchoResponse recv_response;
1071   Status recv_status;
1072
1073   ClientContext cli_ctx;
1074   ServerContext srv_ctx;
1075   grpc::ServerAsyncResponseWriter<EchoResponse> response_writer(&srv_ctx);
1076
1077   send_request.set_message(GetParam().message_content);
1078   std::pair<grpc::string, grpc::string> meta1("key1", "val1");
1079   std::pair<grpc::string, grpc::string> meta2("key2", "val2");
1080
1081   std::unique_ptr<ClientAsyncResponseReader<EchoResponse>> response_reader(
1082       stub_->AsyncEcho(&cli_ctx, send_request, cq_.get()));
1083   response_reader->Finish(&recv_response, &recv_status, tag(5));
1084
1085   service_->RequestEcho(&srv_ctx, &recv_request, &response_writer, cq_.get(),
1086                         cq_.get(), tag(2));
1087   Verifier().Expect(2, true).Verify(cq_.get());
1088   EXPECT_EQ(send_request.message(), recv_request.message());
1089   response_writer.SendInitialMetadata(tag(3));
1090   Verifier().Expect(3, true).Verify(cq_.get());
1091
1092   send_response.set_message(recv_request.message());
1093   srv_ctx.AddTrailingMetadata(meta1.first, meta1.second);
1094   srv_ctx.AddTrailingMetadata(meta2.first, meta2.second);
1095   response_writer.Finish(send_response, Status::OK, tag(4));
1096
1097   Verifier().Expect(4, true).Expect(5, true).Verify(cq_.get());
1098
1099   EXPECT_EQ(send_response.message(), recv_response.message());
1100   EXPECT_TRUE(recv_status.ok());
1101   const auto& server_trailing_metadata = cli_ctx.GetServerTrailingMetadata();
1102   EXPECT_EQ(meta1.second,
1103             ToString(server_trailing_metadata.find(meta1.first)->second));
1104   EXPECT_EQ(meta2.second,
1105             ToString(server_trailing_metadata.find(meta2.first)->second));
1106   EXPECT_EQ(static_cast<size_t>(2), server_trailing_metadata.size());
1107 }
1108
1109 TEST_P(AsyncEnd2endTest, MetadataRpc) {
1110   ResetStub();
1111
1112   EchoRequest send_request;
1113   EchoRequest recv_request;
1114   EchoResponse send_response;
1115   EchoResponse recv_response;
1116   Status recv_status;
1117
1118   ClientContext cli_ctx;
1119   ServerContext srv_ctx;
1120   grpc::ServerAsyncResponseWriter<EchoResponse> response_writer(&srv_ctx);
1121
1122   send_request.set_message(GetParam().message_content);
1123   std::pair<grpc::string, grpc::string> meta1("key1", "val1");
1124   std::pair<grpc::string, grpc::string> meta2(
1125       "key2-bin",
1126       grpc::string("\xc0\xc1\xc2\xc3\xc4\xc5\xc6\xc7\xc8\xc9\xca\xcb\xcc", 13));
1127   std::pair<grpc::string, grpc::string> meta3("key3", "val3");
1128   std::pair<grpc::string, grpc::string> meta6(
1129       "key4-bin",
1130       grpc::string("\x10\x11\x12\x13\x14\x15\x16\x17\x18\x19\x1a\x1b\x1c\x1d",
1131                    14));
1132   std::pair<grpc::string, grpc::string> meta5("key5", "val5");
1133   std::pair<grpc::string, grpc::string> meta4(
1134       "key6-bin",
1135       grpc::string(
1136           "\xe0\xe1\xe2\xe3\xe4\xe5\xe6\xe7\xe8\xe9\xea\xeb\xec\xed\xee", 15));
1137
1138   cli_ctx.AddMetadata(meta1.first, meta1.second);
1139   cli_ctx.AddMetadata(meta2.first, meta2.second);
1140
1141   std::unique_ptr<ClientAsyncResponseReader<EchoResponse>> response_reader(
1142       stub_->AsyncEcho(&cli_ctx, send_request, cq_.get()));
1143   response_reader->ReadInitialMetadata(tag(4));
1144
1145   service_->RequestEcho(&srv_ctx, &recv_request, &response_writer, cq_.get(),
1146                         cq_.get(), tag(2));
1147   Verifier().Expect(2, true).Verify(cq_.get());
1148   EXPECT_EQ(send_request.message(), recv_request.message());
1149   const auto& client_initial_metadata = srv_ctx.client_metadata();
1150   EXPECT_EQ(meta1.second,
1151             ToString(client_initial_metadata.find(meta1.first)->second));
1152   EXPECT_EQ(meta2.second,
1153             ToString(client_initial_metadata.find(meta2.first)->second));
1154   EXPECT_GE(client_initial_metadata.size(), static_cast<size_t>(2));
1155
1156   srv_ctx.AddInitialMetadata(meta3.first, meta3.second);
1157   srv_ctx.AddInitialMetadata(meta4.first, meta4.second);
1158   response_writer.SendInitialMetadata(tag(3));
1159   Verifier().Expect(3, true).Expect(4, true).Verify(cq_.get());
1160   const auto& server_initial_metadata = cli_ctx.GetServerInitialMetadata();
1161   EXPECT_EQ(meta3.second,
1162             ToString(server_initial_metadata.find(meta3.first)->second));
1163   EXPECT_EQ(meta4.second,
1164             ToString(server_initial_metadata.find(meta4.first)->second));
1165   EXPECT_GE(server_initial_metadata.size(), static_cast<size_t>(2));
1166
1167   send_response.set_message(recv_request.message());
1168   srv_ctx.AddTrailingMetadata(meta5.first, meta5.second);
1169   srv_ctx.AddTrailingMetadata(meta6.first, meta6.second);
1170   response_writer.Finish(send_response, Status::OK, tag(5));
1171   response_reader->Finish(&recv_response, &recv_status, tag(6));
1172
1173   Verifier().Expect(5, true).Expect(6, true).Verify(cq_.get());
1174
1175   EXPECT_EQ(send_response.message(), recv_response.message());
1176   EXPECT_TRUE(recv_status.ok());
1177   const auto& server_trailing_metadata = cli_ctx.GetServerTrailingMetadata();
1178   EXPECT_EQ(meta5.second,
1179             ToString(server_trailing_metadata.find(meta5.first)->second));
1180   EXPECT_EQ(meta6.second,
1181             ToString(server_trailing_metadata.find(meta6.first)->second));
1182   EXPECT_GE(server_trailing_metadata.size(), static_cast<size_t>(2));
1183 }
1184
1185 // Server uses AsyncNotifyWhenDone API to check for cancellation
1186 TEST_P(AsyncEnd2endTest, ServerCheckCancellation) {
1187   ResetStub();
1188
1189   EchoRequest send_request;
1190   EchoRequest recv_request;
1191   EchoResponse send_response;
1192   EchoResponse recv_response;
1193   Status recv_status;
1194
1195   ClientContext cli_ctx;
1196   ServerContext srv_ctx;
1197   grpc::ServerAsyncResponseWriter<EchoResponse> response_writer(&srv_ctx);
1198
1199   send_request.set_message(GetParam().message_content);
1200   std::unique_ptr<ClientAsyncResponseReader<EchoResponse>> response_reader(
1201       stub_->AsyncEcho(&cli_ctx, send_request, cq_.get()));
1202   response_reader->Finish(&recv_response, &recv_status, tag(4));
1203
1204   srv_ctx.AsyncNotifyWhenDone(tag(5));
1205   service_->RequestEcho(&srv_ctx, &recv_request, &response_writer, cq_.get(),
1206                         cq_.get(), tag(2));
1207
1208   Verifier().Expect(2, true).Verify(cq_.get());
1209   EXPECT_EQ(send_request.message(), recv_request.message());
1210
1211   cli_ctx.TryCancel();
1212   Verifier().Expect(5, true).Expect(4, true).Verify(cq_.get());
1213   EXPECT_TRUE(srv_ctx.IsCancelled());
1214
1215   EXPECT_EQ(StatusCode::CANCELLED, recv_status.error_code());
1216 }
1217
1218 // Server uses AsyncNotifyWhenDone API to check for normal finish
1219 TEST_P(AsyncEnd2endTest, ServerCheckDone) {
1220   ResetStub();
1221
1222   EchoRequest send_request;
1223   EchoRequest recv_request;
1224   EchoResponse send_response;
1225   EchoResponse recv_response;
1226   Status recv_status;
1227
1228   ClientContext cli_ctx;
1229   ServerContext srv_ctx;
1230   grpc::ServerAsyncResponseWriter<EchoResponse> response_writer(&srv_ctx);
1231
1232   send_request.set_message(GetParam().message_content);
1233   std::unique_ptr<ClientAsyncResponseReader<EchoResponse>> response_reader(
1234       stub_->AsyncEcho(&cli_ctx, send_request, cq_.get()));
1235   response_reader->Finish(&recv_response, &recv_status, tag(4));
1236
1237   srv_ctx.AsyncNotifyWhenDone(tag(5));
1238   service_->RequestEcho(&srv_ctx, &recv_request, &response_writer, cq_.get(),
1239                         cq_.get(), tag(2));
1240
1241   Verifier().Expect(2, true).Verify(cq_.get());
1242   EXPECT_EQ(send_request.message(), recv_request.message());
1243
1244   send_response.set_message(recv_request.message());
1245   response_writer.Finish(send_response, Status::OK, tag(3));
1246   Verifier().Expect(3, true).Expect(4, true).Expect(5, true).Verify(cq_.get());
1247   EXPECT_FALSE(srv_ctx.IsCancelled());
1248
1249   EXPECT_EQ(send_response.message(), recv_response.message());
1250   EXPECT_TRUE(recv_status.ok());
1251 }
1252
1253 TEST_P(AsyncEnd2endTest, UnimplementedRpc) {
1254   ChannelArguments args;
1255   const auto& channel_creds = GetCredentialsProvider()->GetChannelCredentials(
1256       GetParam().credentials_type, &args);
1257   std::shared_ptr<Channel> channel =
1258       !(GetParam().inproc)
1259           ? CreateCustomChannel(server_address_.str(), channel_creds, args)
1260           : server_->InProcessChannel(args);
1261   std::unique_ptr<grpc::testing::UnimplementedEchoService::Stub> stub;
1262   stub = grpc::testing::UnimplementedEchoService::NewStub(channel);
1263   EchoRequest send_request;
1264   EchoResponse recv_response;
1265   Status recv_status;
1266
1267   ClientContext cli_ctx;
1268   send_request.set_message(GetParam().message_content);
1269   std::unique_ptr<ClientAsyncResponseReader<EchoResponse>> response_reader(
1270       stub->AsyncUnimplemented(&cli_ctx, send_request, cq_.get()));
1271
1272   response_reader->Finish(&recv_response, &recv_status, tag(4));
1273   Verifier().Expect(4, true).Verify(cq_.get());
1274
1275   EXPECT_EQ(StatusCode::UNIMPLEMENTED, recv_status.error_code());
1276   EXPECT_EQ("", recv_status.error_message());
1277 }
1278
1279 // This class is for testing scenarios where RPCs are cancelled on the server
1280 // by calling ServerContext::TryCancel(). Server uses AsyncNotifyWhenDone
1281 // API to check for cancellation
1282 class AsyncEnd2endServerTryCancelTest : public AsyncEnd2endTest {
1283  protected:
1284   typedef enum {
1285     DO_NOT_CANCEL = 0,
1286     CANCEL_BEFORE_PROCESSING,
1287     CANCEL_DURING_PROCESSING,
1288     CANCEL_AFTER_PROCESSING
1289   } ServerTryCancelRequestPhase;
1290
1291   // Helper for testing client-streaming RPCs which are cancelled on the server.
1292   // Depending on the value of server_try_cancel parameter, this will test one
1293   // of the following three scenarios:
1294   //   CANCEL_BEFORE_PROCESSING: Rpc is cancelled by the server before reading
1295   //   any messages from the client
1296   //
1297   //   CANCEL_DURING_PROCESSING: Rpc is cancelled by the server while reading
1298   //   messages from the client
1299   //
1300   //   CANCEL_AFTER PROCESSING: Rpc is cancelled by server after reading all
1301   //   messages from the client (but before sending any status back to the
1302   //   client)
1303   void TestClientStreamingServerCancel(
1304       ServerTryCancelRequestPhase server_try_cancel) {
1305     ResetStub();
1306
1307     EchoRequest recv_request;
1308     EchoResponse send_response;
1309     EchoResponse recv_response;
1310     Status recv_status;
1311
1312     ClientContext cli_ctx;
1313     ServerContext srv_ctx;
1314     ServerAsyncReader<EchoResponse, EchoRequest> srv_stream(&srv_ctx);
1315
1316     // Initiate the 'RequestStream' call on client
1317     CompletionQueue cli_cq;
1318
1319     std::unique_ptr<ClientAsyncWriter<EchoRequest>> cli_stream(
1320         stub_->AsyncRequestStream(&cli_ctx, &recv_response, &cli_cq, tag(1)));
1321
1322     // On the server, request to be notified of 'RequestStream' calls
1323     // and receive the 'RequestStream' call just made by the client
1324     srv_ctx.AsyncNotifyWhenDone(tag(11));
1325     service_->RequestRequestStream(&srv_ctx, &srv_stream, cq_.get(), cq_.get(),
1326                                    tag(2));
1327     std::thread t1([&cli_cq] { Verifier().Expect(1, true).Verify(&cli_cq); });
1328     Verifier().Expect(2, true).Verify(cq_.get());
1329     t1.join();
1330
1331     bool expected_server_cq_result = true;
1332     bool expected_client_cq_result = true;
1333
1334     if (server_try_cancel == CANCEL_BEFORE_PROCESSING) {
1335       srv_ctx.TryCancel();
1336       Verifier().Expect(11, true).Verify(cq_.get());
1337       EXPECT_TRUE(srv_ctx.IsCancelled());
1338
1339       // Since cancellation is done before server reads any results, we know
1340       // for sure that all server cq results will return false from this
1341       // point forward
1342       expected_server_cq_result = false;
1343       expected_client_cq_result = false;
1344     }
1345
1346     bool ignore_client_cq_result =
1347         (server_try_cancel == CANCEL_DURING_PROCESSING) ||
1348         (server_try_cancel == CANCEL_BEFORE_PROCESSING);
1349
1350     std::thread cli_thread([&cli_cq, &cli_stream, &expected_client_cq_result,
1351                             &ignore_client_cq_result] {
1352       EchoRequest send_request;
1353       // Client sends 3 messages (tags 3, 4 and 5)
1354       for (int tag_idx = 3; tag_idx <= 5; tag_idx++) {
1355         send_request.set_message("Ping " + grpc::to_string(tag_idx));
1356         cli_stream->Write(send_request, tag(tag_idx));
1357         Verifier()
1358             .Expect(tag_idx, expected_client_cq_result)
1359             .Verify(&cli_cq, ignore_client_cq_result);
1360       }
1361       cli_stream->WritesDone(tag(6));
1362       // Ignore ok on WritesDone since cancel can affect it
1363       Verifier()
1364           .Expect(6, expected_client_cq_result)
1365           .Verify(&cli_cq, ignore_client_cq_result);
1366     });
1367
1368     bool ignore_cq_result = false;
1369     bool want_done_tag = false;
1370     std::thread* server_try_cancel_thd = nullptr;
1371
1372     auto verif = Verifier();
1373
1374     if (server_try_cancel == CANCEL_DURING_PROCESSING) {
1375       server_try_cancel_thd =
1376           new std::thread(&ServerContext::TryCancel, &srv_ctx);
1377       // Server will cancel the RPC in a parallel thread while reading the
1378       // requests from the client. Since the cancellation can happen at anytime,
1379       // some of the cq results (i.e those until cancellation) might be true but
1380       // its non deterministic. So better to ignore the cq results
1381       ignore_cq_result = true;
1382       // Expect that we might possibly see the done tag that
1383       // indicates cancellation completion in this case
1384       want_done_tag = true;
1385       verif.Expect(11, true);
1386     }
1387
1388     // Server reads 3 messages (tags 6, 7 and 8)
1389     // But if want_done_tag is true, we might also see tag 11
1390     for (int tag_idx = 6; tag_idx <= 8; tag_idx++) {
1391       srv_stream.Read(&recv_request, tag(tag_idx));
1392       // Note that we'll add something to the verifier and verify that
1393       // something was seen, but it might be tag 11 and not what we
1394       // just added
1395       int got_tag = verif.Expect(tag_idx, expected_server_cq_result)
1396                         .Next(cq_.get(), ignore_cq_result);
1397       GPR_ASSERT((got_tag == tag_idx) || (got_tag == 11 && want_done_tag));
1398       if (got_tag == 11) {
1399         EXPECT_TRUE(srv_ctx.IsCancelled());
1400         want_done_tag = false;
1401         // Now get the other entry that we were waiting on
1402         EXPECT_EQ(verif.Next(cq_.get(), ignore_cq_result), tag_idx);
1403       }
1404     }
1405
1406     cli_thread.join();
1407
1408     if (server_try_cancel_thd != nullptr) {
1409       server_try_cancel_thd->join();
1410       delete server_try_cancel_thd;
1411     }
1412
1413     if (server_try_cancel == CANCEL_AFTER_PROCESSING) {
1414       srv_ctx.TryCancel();
1415       want_done_tag = true;
1416       verif.Expect(11, true);
1417     }
1418
1419     if (want_done_tag) {
1420       verif.Verify(cq_.get());
1421       EXPECT_TRUE(srv_ctx.IsCancelled());
1422       want_done_tag = false;
1423     }
1424
1425     // The RPC has been cancelled at this point for sure (i.e irrespective of
1426     // the value of `server_try_cancel` is). So, from this point forward, we
1427     // know that cq results are supposed to return false on server.
1428
1429     // Server sends the final message and cancelled status (but the RPC is
1430     // already cancelled at this point. So we expect the operation to fail)
1431     srv_stream.Finish(send_response, Status::CANCELLED, tag(9));
1432     Verifier().Expect(9, false).Verify(cq_.get());
1433
1434     // Client will see the cancellation
1435     cli_stream->Finish(&recv_status, tag(10));
1436     Verifier().Expect(10, true).Verify(&cli_cq);
1437     EXPECT_FALSE(recv_status.ok());
1438     EXPECT_EQ(::grpc::StatusCode::CANCELLED, recv_status.error_code());
1439
1440     cli_cq.Shutdown();
1441     void* dummy_tag;
1442     bool dummy_ok;
1443     while (cli_cq.Next(&dummy_tag, &dummy_ok)) {
1444     }
1445   }
1446
1447   // Helper for testing server-streaming RPCs which are cancelled on the server.
1448   // Depending on the value of server_try_cancel parameter, this will test one
1449   // of the following three scenarios:
1450   //   CANCEL_BEFORE_PROCESSING: Rpc is cancelled by the server before sending
1451   //   any messages to the client
1452   //
1453   //   CANCEL_DURING_PROCESSING: Rpc is cancelled by the server while sending
1454   //   messages to the client
1455   //
1456   //   CANCEL_AFTER PROCESSING: Rpc is cancelled by server after sending all
1457   //   messages to the client (but before sending any status back to the
1458   //   client)
1459   void TestServerStreamingServerCancel(
1460       ServerTryCancelRequestPhase server_try_cancel) {
1461     ResetStub();
1462
1463     EchoRequest send_request;
1464     EchoRequest recv_request;
1465     EchoResponse send_response;
1466     Status recv_status;
1467     ClientContext cli_ctx;
1468     ServerContext srv_ctx;
1469     ServerAsyncWriter<EchoResponse> srv_stream(&srv_ctx);
1470
1471     send_request.set_message("Ping");
1472     // Initiate the 'ResponseStream' call on the client
1473     CompletionQueue cli_cq;
1474     std::unique_ptr<ClientAsyncReader<EchoResponse>> cli_stream(
1475         stub_->AsyncResponseStream(&cli_ctx, send_request, &cli_cq, tag(1)));
1476     // On the server, request to be notified of 'ResponseStream' calls and
1477     // receive the call just made by the client
1478     srv_ctx.AsyncNotifyWhenDone(tag(11));
1479     service_->RequestResponseStream(&srv_ctx, &recv_request, &srv_stream,
1480                                     cq_.get(), cq_.get(), tag(2));
1481
1482     std::thread t1([&cli_cq] { Verifier().Expect(1, true).Verify(&cli_cq); });
1483     Verifier().Expect(2, true).Verify(cq_.get());
1484     t1.join();
1485
1486     EXPECT_EQ(send_request.message(), recv_request.message());
1487
1488     bool expected_cq_result = true;
1489     bool ignore_cq_result = false;
1490     bool want_done_tag = false;
1491     bool expected_client_cq_result = true;
1492     bool ignore_client_cq_result =
1493         (server_try_cancel != CANCEL_BEFORE_PROCESSING);
1494
1495     if (server_try_cancel == CANCEL_BEFORE_PROCESSING) {
1496       srv_ctx.TryCancel();
1497       Verifier().Expect(11, true).Verify(cq_.get());
1498       EXPECT_TRUE(srv_ctx.IsCancelled());
1499
1500       // We know for sure that all cq results will be false from this point
1501       // since the server cancelled the RPC
1502       expected_cq_result = false;
1503       expected_client_cq_result = false;
1504     }
1505
1506     std::thread cli_thread([&cli_cq, &cli_stream, &expected_client_cq_result,
1507                             &ignore_client_cq_result] {
1508       // Client attempts to read the three messages from the server
1509       for (int tag_idx = 6; tag_idx <= 8; tag_idx++) {
1510         EchoResponse recv_response;
1511         cli_stream->Read(&recv_response, tag(tag_idx));
1512         Verifier()
1513             .Expect(tag_idx, expected_client_cq_result)
1514             .Verify(&cli_cq, ignore_client_cq_result);
1515       }
1516     });
1517
1518     std::thread* server_try_cancel_thd = nullptr;
1519
1520     auto verif = Verifier();
1521
1522     if (server_try_cancel == CANCEL_DURING_PROCESSING) {
1523       server_try_cancel_thd =
1524           new std::thread(&ServerContext::TryCancel, &srv_ctx);
1525
1526       // Server will cancel the RPC in a parallel thread while writing responses
1527       // to the client. Since the cancellation can happen at anytime, some of
1528       // the cq results (i.e those until cancellation) might be true but it is
1529       // non deterministic. So better to ignore the cq results
1530       ignore_cq_result = true;
1531       // Expect that we might possibly see the done tag that
1532       // indicates cancellation completion in this case
1533       want_done_tag = true;
1534       verif.Expect(11, true);
1535     }
1536
1537     // Server sends three messages (tags 3, 4 and 5)
1538     // But if want_done tag is true, we might also see tag 11
1539     for (int tag_idx = 3; tag_idx <= 5; tag_idx++) {
1540       send_response.set_message("Pong " + grpc::to_string(tag_idx));
1541       srv_stream.Write(send_response, tag(tag_idx));
1542       // Note that we'll add something to the verifier and verify that
1543       // something was seen, but it might be tag 11 and not what we
1544       // just added
1545       int got_tag = verif.Expect(tag_idx, expected_cq_result)
1546                         .Next(cq_.get(), ignore_cq_result);
1547       GPR_ASSERT((got_tag == tag_idx) || (got_tag == 11 && want_done_tag));
1548       if (got_tag == 11) {
1549         EXPECT_TRUE(srv_ctx.IsCancelled());
1550         want_done_tag = false;
1551         // Now get the other entry that we were waiting on
1552         EXPECT_EQ(verif.Next(cq_.get(), ignore_cq_result), tag_idx);
1553       }
1554     }
1555
1556     if (server_try_cancel_thd != nullptr) {
1557       server_try_cancel_thd->join();
1558       delete server_try_cancel_thd;
1559     }
1560
1561     if (server_try_cancel == CANCEL_AFTER_PROCESSING) {
1562       srv_ctx.TryCancel();
1563       want_done_tag = true;
1564       verif.Expect(11, true);
1565     }
1566
1567     if (want_done_tag) {
1568       verif.Verify(cq_.get());
1569       EXPECT_TRUE(srv_ctx.IsCancelled());
1570       want_done_tag = false;
1571     }
1572
1573     cli_thread.join();
1574
1575     // The RPC has been cancelled at this point for sure (i.e irrespective of
1576     // the value of `server_try_cancel` is). So, from this point forward, we
1577     // know that cq results are supposed to return false on server.
1578
1579     // Server finishes the stream (but the RPC is already cancelled)
1580     srv_stream.Finish(Status::CANCELLED, tag(9));
1581     Verifier().Expect(9, false).Verify(cq_.get());
1582
1583     // Client will see the cancellation
1584     cli_stream->Finish(&recv_status, tag(10));
1585     Verifier().Expect(10, true).Verify(&cli_cq);
1586     EXPECT_FALSE(recv_status.ok());
1587     EXPECT_EQ(::grpc::StatusCode::CANCELLED, recv_status.error_code());
1588
1589     cli_cq.Shutdown();
1590     void* dummy_tag;
1591     bool dummy_ok;
1592     while (cli_cq.Next(&dummy_tag, &dummy_ok)) {
1593     }
1594   }
1595
1596   // Helper for testing bidirectinal-streaming RPCs which are cancelled on the
1597   // server.
1598   //
1599   // Depending on the value of server_try_cancel parameter, this will
1600   // test one of the following three scenarios:
1601   //   CANCEL_BEFORE_PROCESSING: Rpc is cancelled by the server before reading/
1602   //   writing any messages from/to the client
1603   //
1604   //   CANCEL_DURING_PROCESSING: Rpc is cancelled by the server while reading
1605   //   messages from the client
1606   //
1607   //   CANCEL_AFTER PROCESSING: Rpc is cancelled by server after reading all
1608   //   messages from the client (but before sending any status back to the
1609   //   client)
1610   void TestBidiStreamingServerCancel(
1611       ServerTryCancelRequestPhase server_try_cancel) {
1612     ResetStub();
1613
1614     EchoRequest send_request;
1615     EchoRequest recv_request;
1616     EchoResponse send_response;
1617     EchoResponse recv_response;
1618     Status recv_status;
1619     ClientContext cli_ctx;
1620     ServerContext srv_ctx;
1621     ServerAsyncReaderWriter<EchoResponse, EchoRequest> srv_stream(&srv_ctx);
1622
1623     // Initiate the call from the client side
1624     std::unique_ptr<ClientAsyncReaderWriter<EchoRequest, EchoResponse>>
1625         cli_stream(stub_->AsyncBidiStream(&cli_ctx, cq_.get(), tag(1)));
1626
1627     // On the server, request to be notified of the 'BidiStream' call and
1628     // receive the call just made by the client
1629     srv_ctx.AsyncNotifyWhenDone(tag(11));
1630     service_->RequestBidiStream(&srv_ctx, &srv_stream, cq_.get(), cq_.get(),
1631                                 tag(2));
1632     Verifier().Expect(1, true).Expect(2, true).Verify(cq_.get());
1633
1634     auto verif = Verifier();
1635
1636     // Client sends the first and the only message
1637     send_request.set_message("Ping");
1638     cli_stream->Write(send_request, tag(3));
1639     verif.Expect(3, true);
1640
1641     bool expected_cq_result = true;
1642     bool ignore_cq_result = false;
1643     bool want_done_tag = false;
1644
1645     int got_tag, got_tag2;
1646     bool tag_3_done = false;
1647
1648     if (server_try_cancel == CANCEL_BEFORE_PROCESSING) {
1649       srv_ctx.TryCancel();
1650       verif.Expect(11, true);
1651       // We know for sure that all server cq results will be false from
1652       // this point since the server cancelled the RPC. However, we can't
1653       // say for sure about the client
1654       expected_cq_result = false;
1655       ignore_cq_result = true;
1656
1657       do {
1658         got_tag = verif.Next(cq_.get(), ignore_cq_result);
1659         GPR_ASSERT(((got_tag == 3) && !tag_3_done) || (got_tag == 11));
1660         if (got_tag == 3) {
1661           tag_3_done = true;
1662         }
1663       } while (got_tag != 11);
1664       EXPECT_TRUE(srv_ctx.IsCancelled());
1665     }
1666
1667     std::thread* server_try_cancel_thd = nullptr;
1668
1669     if (server_try_cancel == CANCEL_DURING_PROCESSING) {
1670       server_try_cancel_thd =
1671           new std::thread(&ServerContext::TryCancel, &srv_ctx);
1672
1673       // Since server is going to cancel the RPC in a parallel thread, some of
1674       // the cq results (i.e those until the cancellation) might be true. Since
1675       // that number is non-deterministic, it is better to ignore the cq results
1676       ignore_cq_result = true;
1677       // Expect that we might possibly see the done tag that
1678       // indicates cancellation completion in this case
1679       want_done_tag = true;
1680       verif.Expect(11, true);
1681     }
1682
1683     srv_stream.Read(&recv_request, tag(4));
1684     verif.Expect(4, expected_cq_result);
1685     got_tag = tag_3_done ? 3 : verif.Next(cq_.get(), ignore_cq_result);
1686     got_tag2 = verif.Next(cq_.get(), ignore_cq_result);
1687     GPR_ASSERT((got_tag == 3) || (got_tag == 4) ||
1688                (got_tag == 11 && want_done_tag));
1689     GPR_ASSERT((got_tag2 == 3) || (got_tag2 == 4) ||
1690                (got_tag2 == 11 && want_done_tag));
1691     // If we get 3 and 4, we don't need to wait for 11, but if
1692     // we get 11, we should also clear 3 and 4
1693     if (got_tag + got_tag2 != 7) {
1694       EXPECT_TRUE(srv_ctx.IsCancelled());
1695       want_done_tag = false;
1696       got_tag = verif.Next(cq_.get(), ignore_cq_result);
1697       GPR_ASSERT((got_tag == 3) || (got_tag == 4));
1698     }
1699
1700     send_response.set_message("Pong");
1701     srv_stream.Write(send_response, tag(5));
1702     verif.Expect(5, expected_cq_result);
1703
1704     cli_stream->Read(&recv_response, tag(6));
1705     verif.Expect(6, expected_cq_result);
1706     got_tag = verif.Next(cq_.get(), ignore_cq_result);
1707     got_tag2 = verif.Next(cq_.get(), ignore_cq_result);
1708     GPR_ASSERT((got_tag == 5) || (got_tag == 6) ||
1709                (got_tag == 11 && want_done_tag));
1710     GPR_ASSERT((got_tag2 == 5) || (got_tag2 == 6) ||
1711                (got_tag2 == 11 && want_done_tag));
1712     // If we get 5 and 6, we don't need to wait for 11, but if
1713     // we get 11, we should also clear 5 and 6
1714     if (got_tag + got_tag2 != 11) {
1715       EXPECT_TRUE(srv_ctx.IsCancelled());
1716       want_done_tag = false;
1717       got_tag = verif.Next(cq_.get(), ignore_cq_result);
1718       GPR_ASSERT((got_tag == 5) || (got_tag == 6));
1719     }
1720
1721     // This is expected to succeed in all cases
1722     cli_stream->WritesDone(tag(7));
1723     verif.Expect(7, true);
1724     // TODO(vjpai): Consider whether the following is too flexible
1725     // or whether it should just be reset to ignore_cq_result
1726     bool ignore_cq_wd_result =
1727         ignore_cq_result || (server_try_cancel == CANCEL_BEFORE_PROCESSING);
1728     got_tag = verif.Next(cq_.get(), ignore_cq_wd_result);
1729     GPR_ASSERT((got_tag == 7) || (got_tag == 11 && want_done_tag));
1730     if (got_tag == 11) {
1731       EXPECT_TRUE(srv_ctx.IsCancelled());
1732       want_done_tag = false;
1733       // Now get the other entry that we were waiting on
1734       EXPECT_EQ(verif.Next(cq_.get(), ignore_cq_wd_result), 7);
1735     }
1736
1737     // This is expected to fail in all cases i.e for all values of
1738     // server_try_cancel. This is because at this point, either there are no
1739     // more msgs from the client (because client called WritesDone) or the RPC
1740     // is cancelled on the server
1741     srv_stream.Read(&recv_request, tag(8));
1742     verif.Expect(8, false);
1743     got_tag = verif.Next(cq_.get(), ignore_cq_result);
1744     GPR_ASSERT((got_tag == 8) || (got_tag == 11 && want_done_tag));
1745     if (got_tag == 11) {
1746       EXPECT_TRUE(srv_ctx.IsCancelled());
1747       want_done_tag = false;
1748       // Now get the other entry that we were waiting on
1749       EXPECT_EQ(verif.Next(cq_.get(), ignore_cq_result), 8);
1750     }
1751
1752     if (server_try_cancel_thd != nullptr) {
1753       server_try_cancel_thd->join();
1754       delete server_try_cancel_thd;
1755     }
1756
1757     if (server_try_cancel == CANCEL_AFTER_PROCESSING) {
1758       srv_ctx.TryCancel();
1759       want_done_tag = true;
1760       verif.Expect(11, true);
1761     }
1762
1763     if (want_done_tag) {
1764       verif.Verify(cq_.get());
1765       EXPECT_TRUE(srv_ctx.IsCancelled());
1766       want_done_tag = false;
1767     }
1768
1769     // The RPC has been cancelled at this point for sure (i.e irrespective of
1770     // the value of `server_try_cancel` is). So, from this point forward, we
1771     // know that cq results are supposed to return false on server.
1772
1773     srv_stream.Finish(Status::CANCELLED, tag(9));
1774     Verifier().Expect(9, false).Verify(cq_.get());
1775
1776     cli_stream->Finish(&recv_status, tag(10));
1777     Verifier().Expect(10, true).Verify(cq_.get());
1778     EXPECT_FALSE(recv_status.ok());
1779     EXPECT_EQ(grpc::StatusCode::CANCELLED, recv_status.error_code());
1780   }
1781 };
1782
1783 TEST_P(AsyncEnd2endServerTryCancelTest, ClientStreamingServerTryCancelBefore) {
1784   TestClientStreamingServerCancel(CANCEL_BEFORE_PROCESSING);
1785 }
1786
1787 TEST_P(AsyncEnd2endServerTryCancelTest, ClientStreamingServerTryCancelDuring) {
1788   TestClientStreamingServerCancel(CANCEL_DURING_PROCESSING);
1789 }
1790
1791 TEST_P(AsyncEnd2endServerTryCancelTest, ClientStreamingServerTryCancelAfter) {
1792   TestClientStreamingServerCancel(CANCEL_AFTER_PROCESSING);
1793 }
1794
1795 TEST_P(AsyncEnd2endServerTryCancelTest, ServerStreamingServerTryCancelBefore) {
1796   TestServerStreamingServerCancel(CANCEL_BEFORE_PROCESSING);
1797 }
1798
1799 TEST_P(AsyncEnd2endServerTryCancelTest, ServerStreamingServerTryCancelDuring) {
1800   TestServerStreamingServerCancel(CANCEL_DURING_PROCESSING);
1801 }
1802
1803 TEST_P(AsyncEnd2endServerTryCancelTest, ServerStreamingServerTryCancelAfter) {
1804   TestServerStreamingServerCancel(CANCEL_AFTER_PROCESSING);
1805 }
1806
1807 TEST_P(AsyncEnd2endServerTryCancelTest, ServerBidiStreamingTryCancelBefore) {
1808   TestBidiStreamingServerCancel(CANCEL_BEFORE_PROCESSING);
1809 }
1810
1811 TEST_P(AsyncEnd2endServerTryCancelTest, ServerBidiStreamingTryCancelDuring) {
1812   TestBidiStreamingServerCancel(CANCEL_DURING_PROCESSING);
1813 }
1814
1815 TEST_P(AsyncEnd2endServerTryCancelTest, ServerBidiStreamingTryCancelAfter) {
1816   TestBidiStreamingServerCancel(CANCEL_AFTER_PROCESSING);
1817 }
1818
1819 std::vector<TestScenario> CreateTestScenarios(bool test_secure,
1820                                               bool test_message_size_limit) {
1821   std::vector<TestScenario> scenarios;
1822   std::vector<grpc::string> credentials_types;
1823   std::vector<grpc::string> messages;
1824
1825   auto insec_ok = [] {
1826     // Only allow insecure credentials type when it is registered with the
1827     // provider. User may create providers that do not have insecure.
1828     return GetCredentialsProvider()->GetChannelCredentials(
1829                kInsecureCredentialsType, nullptr) != nullptr;
1830   };
1831
1832   if (insec_ok()) {
1833     credentials_types.push_back(kInsecureCredentialsType);
1834   }
1835   auto sec_list = GetCredentialsProvider()->GetSecureCredentialsTypeList();
1836   for (auto sec = sec_list.begin(); sec != sec_list.end(); sec++) {
1837     credentials_types.push_back(*sec);
1838   }
1839   GPR_ASSERT(!credentials_types.empty());
1840
1841   messages.push_back("Hello");
1842   if (test_message_size_limit) {
1843     for (size_t k = 1; k < GRPC_DEFAULT_MAX_RECV_MESSAGE_LENGTH / 1024;
1844          k *= 32) {
1845       grpc::string big_msg;
1846       for (size_t i = 0; i < k * 1024; ++i) {
1847         char c = 'a' + (i % 26);
1848         big_msg += c;
1849       }
1850       messages.push_back(big_msg);
1851     }
1852     messages.push_back(
1853         grpc::string(GRPC_DEFAULT_MAX_RECV_MESSAGE_LENGTH - 10, 'a'));
1854   }
1855
1856   // TODO (sreek) Renable tests with health check service after the issue
1857   // https://github.com/grpc/grpc/issues/11223 is resolved
1858   for (auto health_check_service : {false}) {
1859     for (auto msg = messages.begin(); msg != messages.end(); msg++) {
1860       for (auto cred = credentials_types.begin();
1861            cred != credentials_types.end(); ++cred) {
1862         scenarios.emplace_back(false, *cred, health_check_service, *msg);
1863       }
1864       if (insec_ok()) {
1865         scenarios.emplace_back(true, kInsecureCredentialsType,
1866                                health_check_service, *msg);
1867       }
1868     }
1869   }
1870   return scenarios;
1871 }
1872
1873 INSTANTIATE_TEST_CASE_P(AsyncEnd2end, AsyncEnd2endTest,
1874                         ::testing::ValuesIn(CreateTestScenarios(true, true)));
1875 INSTANTIATE_TEST_CASE_P(AsyncEnd2endServerTryCancel,
1876                         AsyncEnd2endServerTryCancelTest,
1877                         ::testing::ValuesIn(CreateTestScenarios(false, false)));
1878
1879 }  // namespace
1880 }  // namespace testing
1881 }  // namespace grpc
1882
1883 int main(int argc, char** argv) {
1884   // Change the backup poll interval from 5s to 100ms to speed up the
1885   // ReconnectChannel test
1886   gpr_setenv("GRPC_CLIENT_CHANNEL_BACKUP_POLL_INTERVAL_MS", "100");
1887   grpc::testing::TestEnvironment env(argc, argv);
1888   ::testing::InitGoogleTest(&argc, argv);
1889   int ret = RUN_ALL_TESTS();
1890   return ret;
1891 }