ede24f333ba7556212c79822f9d6a17d5a74cc3c
[platform/upstream/grpc.git] / test / cpp / end2end / client_callback_end2end_test.cc
1 /*
2  *
3  * Copyright 2018 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18
19 #include <algorithm>
20 #include <functional>
21 #include <mutex>
22 #include <sstream>
23 #include <thread>
24
25 #include <grpcpp/channel.h>
26 #include <grpcpp/client_context.h>
27 #include <grpcpp/create_channel.h>
28 #include <grpcpp/generic/generic_stub.h>
29 #include <grpcpp/impl/codegen/proto_utils.h>
30 #include <grpcpp/server.h>
31 #include <grpcpp/server_builder.h>
32 #include <grpcpp/server_context.h>
33 #include <grpcpp/support/client_callback.h>
34
35 #include "src/core/lib/iomgr/iomgr.h"
36 #include "src/proto/grpc/testing/echo.grpc.pb.h"
37 #include "test/core/util/port.h"
38 #include "test/core/util/test_config.h"
39 #include "test/cpp/end2end/interceptors_util.h"
40 #include "test/cpp/end2end/test_service_impl.h"
41 #include "test/cpp/util/byte_buffer_proto_helper.h"
42 #include "test/cpp/util/string_ref_helper.h"
43 #include "test/cpp/util/test_credentials_provider.h"
44
45 #include <gtest/gtest.h>
46
// MAYBE_SKIP_TEST returns from the enclosing test body when the fixture's
// do_not_test_ flag is set. SetUp raises that flag for configurations that
// cannot be exercised: callback tests need either an iomgr that can run in the
// background or the in-process transport.
#define MAYBE_SKIP_TEST       \
  do {                        \
    if (do_not_test_) return; \
  } while (0)
57
58 namespace grpc {
59 namespace testing {
60 namespace {
61
// Transport used between the test client and the test server.
enum class Protocol {
  INPROC,  // channel obtained directly from the server (in-process)
  TCP      // channel connected to a listening port on localhost
};
63
64 class TestScenario {
65  public:
66   TestScenario(bool serve_callback, Protocol protocol, bool intercept,
67                const grpc::string& creds_type)
68       : callback_server(serve_callback),
69         protocol(protocol),
70         use_interceptors(intercept),
71         credentials_type(creds_type) {}
72   void Log() const;
73   bool callback_server;
74   Protocol protocol;
75   bool use_interceptors;
76   const grpc::string credentials_type;
77 };
78
79 static std::ostream& operator<<(std::ostream& out,
80                                 const TestScenario& scenario) {
81   return out << "TestScenario{callback_server="
82              << (scenario.callback_server ? "true" : "false") << ",protocol="
83              << (scenario.protocol == Protocol::INPROC ? "INPROC" : "TCP")
84              << ",intercept=" << (scenario.use_interceptors ? "true" : "false")
85              << ",creds=" << scenario.credentials_type << "}";
86 }
87
88 void TestScenario::Log() const {
89   std::ostringstream out;
90   out << *this;
91   gpr_log(GPR_DEBUG, "%s", out.str().c_str());
92 }
93
94 class ClientCallbackEnd2endTest
95     : public ::testing::TestWithParam<TestScenario> {
96  protected:
97   ClientCallbackEnd2endTest() { GetParam().Log(); }
98
99   void SetUp() override {
100     ServerBuilder builder;
101
102     auto server_creds = GetCredentialsProvider()->GetServerCredentials(
103         GetParam().credentials_type);
104     // TODO(vjpai): Support testing of AuthMetadataProcessor
105
106     if (GetParam().protocol == Protocol::TCP) {
107       if (!grpc_iomgr_run_in_background()) {
108         do_not_test_ = true;
109         return;
110       }
111       picked_port_ = grpc_pick_unused_port_or_die();
112       server_address_ << "localhost:" << picked_port_;
113       builder.AddListeningPort(server_address_.str(), server_creds);
114     }
115     if (!GetParam().callback_server) {
116       builder.RegisterService(&service_);
117     } else {
118       builder.RegisterService(&callback_service_);
119     }
120
121     if (GetParam().use_interceptors) {
122       std::vector<
123           std::unique_ptr<experimental::ServerInterceptorFactoryInterface>>
124           creators;
125       // Add 20 dummy server interceptors
126       creators.reserve(20);
127       for (auto i = 0; i < 20; i++) {
128         creators.push_back(std::unique_ptr<DummyInterceptorFactory>(
129             new DummyInterceptorFactory()));
130       }
131       builder.experimental().SetInterceptorCreators(std::move(creators));
132     }
133
134     server_ = builder.BuildAndStart();
135     is_server_started_ = true;
136   }
137
138   void ResetStub() {
139     ChannelArguments args;
140     auto channel_creds = GetCredentialsProvider()->GetChannelCredentials(
141         GetParam().credentials_type, &args);
142     switch (GetParam().protocol) {
143       case Protocol::TCP:
144         if (!GetParam().use_interceptors) {
145           channel_ = ::grpc::CreateCustomChannel(server_address_.str(),
146                                                  channel_creds, args);
147         } else {
148           channel_ = CreateCustomChannelWithInterceptors(
149               server_address_.str(), channel_creds, args,
150               CreateDummyClientInterceptors());
151         }
152         break;
153       case Protocol::INPROC:
154         if (!GetParam().use_interceptors) {
155           channel_ = server_->InProcessChannel(args);
156         } else {
157           channel_ = server_->experimental().InProcessChannelWithInterceptors(
158               args, CreateDummyClientInterceptors());
159         }
160         break;
161       default:
162         assert(false);
163     }
164     stub_ = grpc::testing::EchoTestService::NewStub(channel_);
165     generic_stub_.reset(new GenericStub(channel_));
166     DummyInterceptor::Reset();
167   }
168
169   void TearDown() override {
170     if (is_server_started_) {
171       server_->Shutdown();
172     }
173     if (picked_port_ > 0) {
174       grpc_recycle_unused_port(picked_port_);
175     }
176   }
177
178   void SendRpcs(int num_rpcs, bool with_binary_metadata) {
179     grpc::string test_string("");
180     for (int i = 0; i < num_rpcs; i++) {
181       EchoRequest request;
182       EchoResponse response;
183       ClientContext cli_ctx;
184
185       test_string += "Hello world. ";
186       request.set_message(test_string);
187       grpc::string val;
188       if (with_binary_metadata) {
189         request.mutable_param()->set_echo_metadata(true);
190         char bytes[8] = {'\0', '\1', '\2', '\3',
191                          '\4', '\5', '\6', static_cast<char>(i)};
192         val = grpc::string(bytes, 8);
193         cli_ctx.AddMetadata("custom-bin", val);
194       }
195
196       cli_ctx.set_compression_algorithm(GRPC_COMPRESS_GZIP);
197
198       std::mutex mu;
199       std::condition_variable cv;
200       bool done = false;
201       stub_->experimental_async()->Echo(
202           &cli_ctx, &request, &response,
203           [&cli_ctx, &request, &response, &done, &mu, &cv, val,
204            with_binary_metadata](Status s) {
205             GPR_ASSERT(s.ok());
206
207             EXPECT_EQ(request.message(), response.message());
208             if (with_binary_metadata) {
209               EXPECT_EQ(
210                   1u, cli_ctx.GetServerTrailingMetadata().count("custom-bin"));
211               EXPECT_EQ(val, ToString(cli_ctx.GetServerTrailingMetadata()
212                                           .find("custom-bin")
213                                           ->second));
214             }
215             std::lock_guard<std::mutex> l(mu);
216             done = true;
217             cv.notify_one();
218           });
219       std::unique_lock<std::mutex> l(mu);
220       while (!done) {
221         cv.wait(l);
222       }
223     }
224   }
225
226   void SendRpcsRawReq(int num_rpcs) {
227     grpc::string test_string("Hello raw world.");
228     EchoRequest request;
229     request.set_message(test_string);
230     std::unique_ptr<ByteBuffer> send_buf = SerializeToByteBuffer(&request);
231
232     for (int i = 0; i < num_rpcs; i++) {
233       EchoResponse response;
234       ClientContext cli_ctx;
235
236       std::mutex mu;
237       std::condition_variable cv;
238       bool done = false;
239       stub_->experimental_async()->Echo(
240           &cli_ctx, send_buf.get(), &response,
241           [&request, &response, &done, &mu, &cv](Status s) {
242             GPR_ASSERT(s.ok());
243
244             EXPECT_EQ(request.message(), response.message());
245             std::lock_guard<std::mutex> l(mu);
246             done = true;
247             cv.notify_one();
248           });
249       std::unique_lock<std::mutex> l(mu);
250       while (!done) {
251         cv.wait(l);
252       }
253     }
254   }
255
256   void SendRpcsGeneric(int num_rpcs, bool maybe_except) {
257     const grpc::string kMethodName("/grpc.testing.EchoTestService/Echo");
258     grpc::string test_string("");
259     for (int i = 0; i < num_rpcs; i++) {
260       EchoRequest request;
261       std::unique_ptr<ByteBuffer> send_buf;
262       ByteBuffer recv_buf;
263       ClientContext cli_ctx;
264
265       test_string += "Hello world. ";
266       request.set_message(test_string);
267       send_buf = SerializeToByteBuffer(&request);
268
269       std::mutex mu;
270       std::condition_variable cv;
271       bool done = false;
272       generic_stub_->experimental().UnaryCall(
273           &cli_ctx, kMethodName, send_buf.get(), &recv_buf,
274           [&request, &recv_buf, &done, &mu, &cv, maybe_except](Status s) {
275             GPR_ASSERT(s.ok());
276
277             EchoResponse response;
278             EXPECT_TRUE(ParseFromByteBuffer(&recv_buf, &response));
279             EXPECT_EQ(request.message(), response.message());
280             std::lock_guard<std::mutex> l(mu);
281             done = true;
282             cv.notify_one();
283 #if GRPC_ALLOW_EXCEPTIONS
284             if (maybe_except) {
285               throw - 1;
286             }
287 #else
288             GPR_ASSERT(!maybe_except);
289 #endif
290           });
291       std::unique_lock<std::mutex> l(mu);
292       while (!done) {
293         cv.wait(l);
294       }
295     }
296   }
297
298   void SendGenericEchoAsBidi(int num_rpcs, int reuses) {
299     const grpc::string kMethodName("/grpc.testing.EchoTestService/Echo");
300     grpc::string test_string("");
301     for (int i = 0; i < num_rpcs; i++) {
302       test_string += "Hello world. ";
303       class Client : public grpc::experimental::ClientBidiReactor<ByteBuffer,
304                                                                   ByteBuffer> {
305        public:
306         Client(ClientCallbackEnd2endTest* test, const grpc::string& method_name,
307                const grpc::string& test_str, int reuses)
308             : reuses_remaining_(reuses) {
309           activate_ = [this, test, method_name, test_str] {
310             if (reuses_remaining_ > 0) {
311               cli_ctx_.reset(new ClientContext);
312               reuses_remaining_--;
313               test->generic_stub_->experimental().PrepareBidiStreamingCall(
314                   cli_ctx_.get(), method_name, this);
315               request_.set_message(test_str);
316               send_buf_ = SerializeToByteBuffer(&request_);
317               StartWrite(send_buf_.get());
318               StartRead(&recv_buf_);
319               StartCall();
320             } else {
321               std::unique_lock<std::mutex> l(mu_);
322               done_ = true;
323               cv_.notify_one();
324             }
325           };
326           activate_();
327         }
328         void OnWriteDone(bool ok) override { StartWritesDone(); }
329         void OnReadDone(bool ok) override {
330           EchoResponse response;
331           EXPECT_TRUE(ParseFromByteBuffer(&recv_buf_, &response));
332           EXPECT_EQ(request_.message(), response.message());
333         };
334         void OnDone(const Status& s) override {
335           EXPECT_TRUE(s.ok());
336           activate_();
337         }
338         void Await() {
339           std::unique_lock<std::mutex> l(mu_);
340           while (!done_) {
341             cv_.wait(l);
342           }
343         }
344
345         EchoRequest request_;
346         std::unique_ptr<ByteBuffer> send_buf_;
347         ByteBuffer recv_buf_;
348         std::unique_ptr<ClientContext> cli_ctx_;
349         int reuses_remaining_;
350         std::function<void()> activate_;
351         std::mutex mu_;
352         std::condition_variable cv_;
353         bool done_ = false;
354       } rpc{this, kMethodName, test_string, reuses};
355
356       rpc.Await();
357     }
358   }
359   bool do_not_test_{false};
360   bool is_server_started_{false};
361   int picked_port_{0};
362   std::shared_ptr<Channel> channel_;
363   std::unique_ptr<grpc::testing::EchoTestService::Stub> stub_;
364   std::unique_ptr<grpc::GenericStub> generic_stub_;
365   TestServiceImpl service_;
366   CallbackTestServiceImpl callback_service_;
367   std::unique_ptr<Server> server_;
368   std::ostringstream server_address_;
369 };
370
371 TEST_P(ClientCallbackEnd2endTest, SimpleRpc) {
372   MAYBE_SKIP_TEST;
373   ResetStub();
374   SendRpcs(1, false);
375 }
376
377 TEST_P(ClientCallbackEnd2endTest, SequentialRpcs) {
378   MAYBE_SKIP_TEST;
379   ResetStub();
380   SendRpcs(10, false);
381 }
382
383 TEST_P(ClientCallbackEnd2endTest, SequentialRpcsRawReq) {
384   MAYBE_SKIP_TEST;
385   ResetStub();
386   SendRpcsRawReq(10);
387 }
388
389 TEST_P(ClientCallbackEnd2endTest, SendClientInitialMetadata) {
390   MAYBE_SKIP_TEST;
391   ResetStub();
392   SimpleRequest request;
393   SimpleResponse response;
394   ClientContext cli_ctx;
395
396   cli_ctx.AddMetadata(kCheckClientInitialMetadataKey,
397                       kCheckClientInitialMetadataVal);
398
399   std::mutex mu;
400   std::condition_variable cv;
401   bool done = false;
402   stub_->experimental_async()->CheckClientInitialMetadata(
403       &cli_ctx, &request, &response, [&done, &mu, &cv](Status s) {
404         GPR_ASSERT(s.ok());
405
406         std::lock_guard<std::mutex> l(mu);
407         done = true;
408         cv.notify_one();
409       });
410   std::unique_lock<std::mutex> l(mu);
411   while (!done) {
412     cv.wait(l);
413   }
414 }
415
416 TEST_P(ClientCallbackEnd2endTest, SimpleRpcWithBinaryMetadata) {
417   MAYBE_SKIP_TEST;
418   ResetStub();
419   SendRpcs(1, true);
420 }
421
422 TEST_P(ClientCallbackEnd2endTest, SequentialRpcsWithVariedBinaryMetadataValue) {
423   MAYBE_SKIP_TEST;
424   ResetStub();
425   SendRpcs(10, true);
426 }
427
428 TEST_P(ClientCallbackEnd2endTest, SequentialGenericRpcs) {
429   MAYBE_SKIP_TEST;
430   ResetStub();
431   SendRpcsGeneric(10, false);
432 }
433
434 TEST_P(ClientCallbackEnd2endTest, SequentialGenericRpcsAsBidi) {
435   MAYBE_SKIP_TEST;
436   ResetStub();
437   SendGenericEchoAsBidi(10, 1);
438 }
439
440 TEST_P(ClientCallbackEnd2endTest, SequentialGenericRpcsAsBidiWithReactorReuse) {
441   MAYBE_SKIP_TEST;
442   ResetStub();
443   SendGenericEchoAsBidi(10, 10);
444 }
445
446 #if GRPC_ALLOW_EXCEPTIONS
447 TEST_P(ClientCallbackEnd2endTest, ExceptingRpc) {
448   MAYBE_SKIP_TEST;
449   ResetStub();
450   SendRpcsGeneric(10, true);
451 }
452 #endif
453
454 TEST_P(ClientCallbackEnd2endTest, MultipleRpcsWithVariedBinaryMetadataValue) {
455   MAYBE_SKIP_TEST;
456   ResetStub();
457   std::vector<std::thread> threads;
458   threads.reserve(10);
459   for (int i = 0; i < 10; ++i) {
460     threads.emplace_back([this] { SendRpcs(10, true); });
461   }
462   for (int i = 0; i < 10; ++i) {
463     threads[i].join();
464   }
465 }
466
467 TEST_P(ClientCallbackEnd2endTest, MultipleRpcs) {
468   MAYBE_SKIP_TEST;
469   ResetStub();
470   std::vector<std::thread> threads;
471   threads.reserve(10);
472   for (int i = 0; i < 10; ++i) {
473     threads.emplace_back([this] { SendRpcs(10, false); });
474   }
475   for (int i = 0; i < 10; ++i) {
476     threads[i].join();
477   }
478 }
479
480 TEST_P(ClientCallbackEnd2endTest, CancelRpcBeforeStart) {
481   MAYBE_SKIP_TEST;
482   ResetStub();
483   EchoRequest request;
484   EchoResponse response;
485   ClientContext context;
486   request.set_message("hello");
487   context.TryCancel();
488
489   std::mutex mu;
490   std::condition_variable cv;
491   bool done = false;
492   stub_->experimental_async()->Echo(
493       &context, &request, &response, [&response, &done, &mu, &cv](Status s) {
494         EXPECT_EQ("", response.message());
495         EXPECT_EQ(grpc::StatusCode::CANCELLED, s.error_code());
496         std::lock_guard<std::mutex> l(mu);
497         done = true;
498         cv.notify_one();
499       });
500   std::unique_lock<std::mutex> l(mu);
501   while (!done) {
502     cv.wait(l);
503   }
504   if (GetParam().use_interceptors) {
505     EXPECT_EQ(20, DummyInterceptor::GetNumTimesCancel());
506   }
507 }
508
509 TEST_P(ClientCallbackEnd2endTest, RequestEchoServerCancel) {
510   MAYBE_SKIP_TEST;
511   ResetStub();
512   EchoRequest request;
513   EchoResponse response;
514   ClientContext context;
515   request.set_message("hello");
516   context.AddMetadata(kServerTryCancelRequest,
517                       grpc::to_string(CANCEL_BEFORE_PROCESSING));
518
519   std::mutex mu;
520   std::condition_variable cv;
521   bool done = false;
522   stub_->experimental_async()->Echo(
523       &context, &request, &response, [&done, &mu, &cv](Status s) {
524         EXPECT_FALSE(s.ok());
525         EXPECT_EQ(grpc::StatusCode::CANCELLED, s.error_code());
526         std::lock_guard<std::mutex> l(mu);
527         done = true;
528         cv.notify_one();
529       });
530   std::unique_lock<std::mutex> l(mu);
531   while (!done) {
532     cv.wait(l);
533   }
534 }
535
// Describes whether (and when) a streaming test client cancels its own RPC.
//
// A default-constructed instance means "never cancel". Constructing from an
// operation count means "cancel once that many operations have completed".
struct ClientCancelInfo {
  // True when the client is expected to cancel the RPC.
  bool cancel{false};
  // Number of completed operations after which the client cancels. Only
  // meaningful when |cancel| is true. It is value-initialized so that copying
  // a default-constructed instance (as the reactor constructors do) never
  // reads an indeterminate value.
  int ops_before_cancel{0};

  ClientCancelInfo() : cancel{false}, ops_before_cancel{0} {}
  explicit ClientCancelInfo(int ops) : cancel{true}, ops_before_cancel{ops} {}
};
543
544 class WriteClient : public grpc::experimental::ClientWriteReactor<EchoRequest> {
545  public:
546   WriteClient(grpc::testing::EchoTestService::Stub* stub,
547               ServerTryCancelRequestPhase server_try_cancel,
548               int num_msgs_to_send, ClientCancelInfo client_cancel = {})
549       : server_try_cancel_(server_try_cancel),
550         num_msgs_to_send_(num_msgs_to_send),
551         client_cancel_{client_cancel} {
552     grpc::string msg{"Hello server."};
553     for (int i = 0; i < num_msgs_to_send; i++) {
554       desired_ += msg;
555     }
556     if (server_try_cancel != DO_NOT_CANCEL) {
557       // Send server_try_cancel value in the client metadata
558       context_.AddMetadata(kServerTryCancelRequest,
559                            grpc::to_string(server_try_cancel));
560     }
561     context_.set_initial_metadata_corked(true);
562     stub->experimental_async()->RequestStream(&context_, &response_, this);
563     StartCall();
564     request_.set_message(msg);
565     MaybeWrite();
566   }
567   void OnWriteDone(bool ok) override {
568     if (ok) {
569       num_msgs_sent_++;
570       MaybeWrite();
571     }
572   }
573   void OnDone(const Status& s) override {
574     gpr_log(GPR_INFO, "Sent %d messages", num_msgs_sent_);
575     int num_to_send =
576         (client_cancel_.cancel)
577             ? std::min(num_msgs_to_send_, client_cancel_.ops_before_cancel)
578             : num_msgs_to_send_;
579     switch (server_try_cancel_) {
580       case CANCEL_BEFORE_PROCESSING:
581       case CANCEL_DURING_PROCESSING:
582         // If the RPC is canceled by server before / during messages from the
583         // client, it means that the client most likely did not get a chance to
584         // send all the messages it wanted to send. i.e num_msgs_sent <=
585         // num_msgs_to_send
586         EXPECT_LE(num_msgs_sent_, num_to_send);
587         break;
588       case DO_NOT_CANCEL:
589       case CANCEL_AFTER_PROCESSING:
590         // If the RPC was not canceled or canceled after all messages were read
591         // by the server, the client did get a chance to send all its messages
592         EXPECT_EQ(num_msgs_sent_, num_to_send);
593         break;
594       default:
595         assert(false);
596         break;
597     }
598     if ((server_try_cancel_ == DO_NOT_CANCEL) && !client_cancel_.cancel) {
599       EXPECT_TRUE(s.ok());
600       EXPECT_EQ(response_.message(), desired_);
601     } else {
602       EXPECT_FALSE(s.ok());
603       EXPECT_EQ(grpc::StatusCode::CANCELLED, s.error_code());
604     }
605     std::unique_lock<std::mutex> l(mu_);
606     done_ = true;
607     cv_.notify_one();
608   }
609   void Await() {
610     std::unique_lock<std::mutex> l(mu_);
611     while (!done_) {
612       cv_.wait(l);
613     }
614   }
615
616  private:
617   void MaybeWrite() {
618     if (client_cancel_.cancel &&
619         num_msgs_sent_ == client_cancel_.ops_before_cancel) {
620       context_.TryCancel();
621     } else if (num_msgs_to_send_ > num_msgs_sent_ + 1) {
622       StartWrite(&request_);
623     } else if (num_msgs_to_send_ == num_msgs_sent_ + 1) {
624       StartWriteLast(&request_, WriteOptions());
625     }
626   }
627   EchoRequest request_;
628   EchoResponse response_;
629   ClientContext context_;
630   const ServerTryCancelRequestPhase server_try_cancel_;
631   int num_msgs_sent_{0};
632   const int num_msgs_to_send_;
633   grpc::string desired_;
634   const ClientCancelInfo client_cancel_;
635   std::mutex mu_;
636   std::condition_variable cv_;
637   bool done_ = false;
638 };
639
640 TEST_P(ClientCallbackEnd2endTest, RequestStream) {
641   MAYBE_SKIP_TEST;
642   ResetStub();
643   WriteClient test{stub_.get(), DO_NOT_CANCEL, 3};
644   test.Await();
645   // Make sure that the server interceptors were not notified to cancel
646   if (GetParam().use_interceptors) {
647     EXPECT_EQ(0, DummyInterceptor::GetNumTimesCancel());
648   }
649 }
650
651 TEST_P(ClientCallbackEnd2endTest, ClientCancelsRequestStream) {
652   MAYBE_SKIP_TEST;
653   ResetStub();
654   WriteClient test{stub_.get(), DO_NOT_CANCEL, 3, ClientCancelInfo{2}};
655   test.Await();
656   // Make sure that the server interceptors got the cancel
657   if (GetParam().use_interceptors) {
658     EXPECT_EQ(20, DummyInterceptor::GetNumTimesCancel());
659   }
660 }
661
662 // Server to cancel before doing reading the request
663 TEST_P(ClientCallbackEnd2endTest, RequestStreamServerCancelBeforeReads) {
664   MAYBE_SKIP_TEST;
665   ResetStub();
666   WriteClient test{stub_.get(), CANCEL_BEFORE_PROCESSING, 1};
667   test.Await();
668   // Make sure that the server interceptors were notified
669   if (GetParam().use_interceptors) {
670     EXPECT_EQ(20, DummyInterceptor::GetNumTimesCancel());
671   }
672 }
673
674 // Server to cancel while reading a request from the stream in parallel
675 TEST_P(ClientCallbackEnd2endTest, RequestStreamServerCancelDuringRead) {
676   MAYBE_SKIP_TEST;
677   ResetStub();
678   WriteClient test{stub_.get(), CANCEL_DURING_PROCESSING, 10};
679   test.Await();
680   // Make sure that the server interceptors were notified
681   if (GetParam().use_interceptors) {
682     EXPECT_EQ(20, DummyInterceptor::GetNumTimesCancel());
683   }
684 }
685
686 // Server to cancel after reading all the requests but before returning to the
687 // client
688 TEST_P(ClientCallbackEnd2endTest, RequestStreamServerCancelAfterReads) {
689   MAYBE_SKIP_TEST;
690   ResetStub();
691   WriteClient test{stub_.get(), CANCEL_AFTER_PROCESSING, 4};
692   test.Await();
693   // Make sure that the server interceptors were notified
694   if (GetParam().use_interceptors) {
695     EXPECT_EQ(20, DummyInterceptor::GetNumTimesCancel());
696   }
697 }
698
699 TEST_P(ClientCallbackEnd2endTest, UnaryReactor) {
700   MAYBE_SKIP_TEST;
701   ResetStub();
702   class UnaryClient : public grpc::experimental::ClientUnaryReactor {
703    public:
704     UnaryClient(grpc::testing::EchoTestService::Stub* stub) {
705       cli_ctx_.AddMetadata("key1", "val1");
706       cli_ctx_.AddMetadata("key2", "val2");
707       request_.mutable_param()->set_echo_metadata_initially(true);
708       request_.set_message("Hello metadata");
709       stub->experimental_async()->Echo(&cli_ctx_, &request_, &response_, this);
710       StartCall();
711     }
712     void OnReadInitialMetadataDone(bool ok) override {
713       EXPECT_TRUE(ok);
714       EXPECT_EQ(1u, cli_ctx_.GetServerInitialMetadata().count("key1"));
715       EXPECT_EQ(
716           "val1",
717           ToString(cli_ctx_.GetServerInitialMetadata().find("key1")->second));
718       EXPECT_EQ(1u, cli_ctx_.GetServerInitialMetadata().count("key2"));
719       EXPECT_EQ(
720           "val2",
721           ToString(cli_ctx_.GetServerInitialMetadata().find("key2")->second));
722       initial_metadata_done_ = true;
723     }
724     void OnDone(const Status& s) override {
725       EXPECT_TRUE(initial_metadata_done_);
726       EXPECT_EQ(0u, cli_ctx_.GetServerTrailingMetadata().size());
727       EXPECT_TRUE(s.ok());
728       EXPECT_EQ(request_.message(), response_.message());
729       std::unique_lock<std::mutex> l(mu_);
730       done_ = true;
731       cv_.notify_one();
732     }
733     void Await() {
734       std::unique_lock<std::mutex> l(mu_);
735       while (!done_) {
736         cv_.wait(l);
737       }
738     }
739
740    private:
741     EchoRequest request_;
742     EchoResponse response_;
743     ClientContext cli_ctx_;
744     std::mutex mu_;
745     std::condition_variable cv_;
746     bool done_{false};
747     bool initial_metadata_done_{false};
748   };
749
750   UnaryClient test{stub_.get()};
751   test.Await();
752   // Make sure that the server interceptors were not notified of a cancel
753   if (GetParam().use_interceptors) {
754     EXPECT_EQ(0, DummyInterceptor::GetNumTimesCancel());
755   }
756 }
757
758 class ReadClient : public grpc::experimental::ClientReadReactor<EchoResponse> {
759  public:
760   ReadClient(grpc::testing::EchoTestService::Stub* stub,
761              ServerTryCancelRequestPhase server_try_cancel,
762              ClientCancelInfo client_cancel = {})
763       : server_try_cancel_(server_try_cancel), client_cancel_{client_cancel} {
764     if (server_try_cancel_ != DO_NOT_CANCEL) {
765       // Send server_try_cancel value in the client metadata
766       context_.AddMetadata(kServerTryCancelRequest,
767                            grpc::to_string(server_try_cancel));
768     }
769     request_.set_message("Hello client ");
770     stub->experimental_async()->ResponseStream(&context_, &request_, this);
771     if (client_cancel_.cancel &&
772         reads_complete_ == client_cancel_.ops_before_cancel) {
773       context_.TryCancel();
774     }
775     // Even if we cancel, read until failure because there might be responses
776     // pending
777     StartRead(&response_);
778     StartCall();
779   }
780   void OnReadDone(bool ok) override {
781     if (!ok) {
782       if (server_try_cancel_ == DO_NOT_CANCEL && !client_cancel_.cancel) {
783         EXPECT_EQ(reads_complete_, kServerDefaultResponseStreamsToSend);
784       }
785     } else {
786       EXPECT_LE(reads_complete_, kServerDefaultResponseStreamsToSend);
787       EXPECT_EQ(response_.message(),
788                 request_.message() + grpc::to_string(reads_complete_));
789       reads_complete_++;
790       if (client_cancel_.cancel &&
791           reads_complete_ == client_cancel_.ops_before_cancel) {
792         context_.TryCancel();
793       }
794       // Even if we cancel, read until failure because there might be responses
795       // pending
796       StartRead(&response_);
797     }
798   }
799   void OnDone(const Status& s) override {
800     gpr_log(GPR_INFO, "Read %d messages", reads_complete_);
801     switch (server_try_cancel_) {
802       case DO_NOT_CANCEL:
803         if (!client_cancel_.cancel || client_cancel_.ops_before_cancel >
804                                           kServerDefaultResponseStreamsToSend) {
805           EXPECT_TRUE(s.ok());
806           EXPECT_EQ(reads_complete_, kServerDefaultResponseStreamsToSend);
807         } else {
808           EXPECT_GE(reads_complete_, client_cancel_.ops_before_cancel);
809           EXPECT_LE(reads_complete_, kServerDefaultResponseStreamsToSend);
810           // Status might be ok or cancelled depending on whether server
811           // sent status before client cancel went through
812           if (!s.ok()) {
813             EXPECT_EQ(grpc::StatusCode::CANCELLED, s.error_code());
814           }
815         }
816         break;
817       case CANCEL_BEFORE_PROCESSING:
818         EXPECT_FALSE(s.ok());
819         EXPECT_EQ(grpc::StatusCode::CANCELLED, s.error_code());
820         EXPECT_EQ(reads_complete_, 0);
821         break;
822       case CANCEL_DURING_PROCESSING:
823       case CANCEL_AFTER_PROCESSING:
824         // If server canceled while writing messages, client must have read
825         // less than or equal to the expected number of messages. Even if the
826         // server canceled after writing all messages, the RPC may be canceled
827         // before the Client got a chance to read all the messages.
828         EXPECT_FALSE(s.ok());
829         EXPECT_EQ(grpc::StatusCode::CANCELLED, s.error_code());
830         EXPECT_LE(reads_complete_, kServerDefaultResponseStreamsToSend);
831         break;
832       default:
833         assert(false);
834     }
835     std::unique_lock<std::mutex> l(mu_);
836     done_ = true;
837     cv_.notify_one();
838   }
839   void Await() {
840     std::unique_lock<std::mutex> l(mu_);
841     while (!done_) {
842       cv_.wait(l);
843     }
844   }
845
846  private:
847   EchoRequest request_;
848   EchoResponse response_;
849   ClientContext context_;
850   const ServerTryCancelRequestPhase server_try_cancel_;
851   int reads_complete_{0};
852   const ClientCancelInfo client_cancel_;
853   std::mutex mu_;
854   std::condition_variable cv_;
855   bool done_ = false;
856 };
857
858 TEST_P(ClientCallbackEnd2endTest, ResponseStream) {
859   MAYBE_SKIP_TEST;
860   ResetStub();
861   ReadClient test{stub_.get(), DO_NOT_CANCEL};
862   test.Await();
863   // Make sure that the server interceptors were not notified of a cancel
864   if (GetParam().use_interceptors) {
865     EXPECT_EQ(0, DummyInterceptor::GetNumTimesCancel());
866   }
867 }
868
869 TEST_P(ClientCallbackEnd2endTest, ClientCancelsResponseStream) {
870   MAYBE_SKIP_TEST;
871   ResetStub();
872   ReadClient test{stub_.get(), DO_NOT_CANCEL, ClientCancelInfo{2}};
873   test.Await();
874   // Because cancel in this case races with server finish, we can't be sure that
875   // server interceptors even see cancellation
876 }
877
878 // Server to cancel before sending any response messages
879 TEST_P(ClientCallbackEnd2endTest, ResponseStreamServerCancelBefore) {
880   MAYBE_SKIP_TEST;
881   ResetStub();
882   ReadClient test{stub_.get(), CANCEL_BEFORE_PROCESSING};
883   test.Await();
884   // Make sure that the server interceptors were notified
885   if (GetParam().use_interceptors) {
886     EXPECT_EQ(20, DummyInterceptor::GetNumTimesCancel());
887   }
888 }
889
890 // Server to cancel while writing a response to the stream in parallel
891 TEST_P(ClientCallbackEnd2endTest, ResponseStreamServerCancelDuring) {
892   MAYBE_SKIP_TEST;
893   ResetStub();
894   ReadClient test{stub_.get(), CANCEL_DURING_PROCESSING};
895   test.Await();
896   // Make sure that the server interceptors were notified
897   if (GetParam().use_interceptors) {
898     EXPECT_EQ(20, DummyInterceptor::GetNumTimesCancel());
899   }
900 }
901
902 // Server to cancel after writing all the respones to the stream but before
903 // returning to the client
904 TEST_P(ClientCallbackEnd2endTest, ResponseStreamServerCancelAfter) {
905   MAYBE_SKIP_TEST;
906   ResetStub();
907   ReadClient test{stub_.get(), CANCEL_AFTER_PROCESSING};
908   test.Await();
909   // Make sure that the server interceptors were notified
910   if (GetParam().use_interceptors) {
911     EXPECT_EQ(20, DummyInterceptor::GetNumTimesCancel());
912   }
913 }
914
915 class BidiClient
916     : public grpc::experimental::ClientBidiReactor<EchoRequest, EchoResponse> {
917  public:
918   BidiClient(grpc::testing::EchoTestService::Stub* stub,
919              ServerTryCancelRequestPhase server_try_cancel,
920              int num_msgs_to_send, ClientCancelInfo client_cancel = {})
921       : server_try_cancel_(server_try_cancel),
922         msgs_to_send_{num_msgs_to_send},
923         client_cancel_{client_cancel} {
924     if (server_try_cancel_ != DO_NOT_CANCEL) {
925       // Send server_try_cancel value in the client metadata
926       context_.AddMetadata(kServerTryCancelRequest,
927                            grpc::to_string(server_try_cancel));
928     }
929     request_.set_message("Hello fren ");
930     stub->experimental_async()->BidiStream(&context_, this);
931     MaybeWrite();
932     StartRead(&response_);
933     StartCall();
934   }
935   void OnReadDone(bool ok) override {
936     if (!ok) {
937       if (server_try_cancel_ == DO_NOT_CANCEL) {
938         if (!client_cancel_.cancel) {
939           EXPECT_EQ(reads_complete_, msgs_to_send_);
940         } else {
941           EXPECT_LE(reads_complete_, writes_complete_);
942         }
943       }
944     } else {
945       EXPECT_LE(reads_complete_, msgs_to_send_);
946       EXPECT_EQ(response_.message(), request_.message());
947       reads_complete_++;
948       StartRead(&response_);
949     }
950   }
951   void OnWriteDone(bool ok) override {
952     if (server_try_cancel_ == DO_NOT_CANCEL) {
953       EXPECT_TRUE(ok);
954     } else if (!ok) {
955       return;
956     }
957     writes_complete_++;
958     MaybeWrite();
959   }
960   void OnDone(const Status& s) override {
961     gpr_log(GPR_INFO, "Sent %d messages", writes_complete_);
962     gpr_log(GPR_INFO, "Read %d messages", reads_complete_);
963     switch (server_try_cancel_) {
964       case DO_NOT_CANCEL:
965         if (!client_cancel_.cancel ||
966             client_cancel_.ops_before_cancel > msgs_to_send_) {
967           EXPECT_TRUE(s.ok());
968           EXPECT_EQ(writes_complete_, msgs_to_send_);
969           EXPECT_EQ(reads_complete_, writes_complete_);
970         } else {
971           EXPECT_FALSE(s.ok());
972           EXPECT_EQ(grpc::StatusCode::CANCELLED, s.error_code());
973           EXPECT_EQ(writes_complete_, client_cancel_.ops_before_cancel);
974           EXPECT_LE(reads_complete_, writes_complete_);
975         }
976         break;
977       case CANCEL_BEFORE_PROCESSING:
978         EXPECT_FALSE(s.ok());
979         EXPECT_EQ(grpc::StatusCode::CANCELLED, s.error_code());
980         // The RPC is canceled before the server did any work or returned any
981         // reads, but it's possible that some writes took place first from the
982         // client
983         EXPECT_LE(writes_complete_, msgs_to_send_);
984         EXPECT_EQ(reads_complete_, 0);
985         break;
986       case CANCEL_DURING_PROCESSING:
987         EXPECT_FALSE(s.ok());
988         EXPECT_EQ(grpc::StatusCode::CANCELLED, s.error_code());
989         EXPECT_LE(writes_complete_, msgs_to_send_);
990         EXPECT_LE(reads_complete_, writes_complete_);
991         break;
992       case CANCEL_AFTER_PROCESSING:
993         EXPECT_FALSE(s.ok());
994         EXPECT_EQ(grpc::StatusCode::CANCELLED, s.error_code());
995         EXPECT_EQ(writes_complete_, msgs_to_send_);
996         // The Server canceled after reading the last message and after writing
997         // the message to the client. However, the RPC cancellation might have
998         // taken effect before the client actually read the response.
999         EXPECT_LE(reads_complete_, writes_complete_);
1000         break;
1001       default:
1002         assert(false);
1003     }
1004     std::unique_lock<std::mutex> l(mu_);
1005     done_ = true;
1006     cv_.notify_one();
1007   }
1008   void Await() {
1009     std::unique_lock<std::mutex> l(mu_);
1010     while (!done_) {
1011       cv_.wait(l);
1012     }
1013   }
1014
1015  private:
1016   void MaybeWrite() {
1017     if (client_cancel_.cancel &&
1018         writes_complete_ == client_cancel_.ops_before_cancel) {
1019       context_.TryCancel();
1020     } else if (writes_complete_ == msgs_to_send_) {
1021       StartWritesDone();
1022     } else {
1023       StartWrite(&request_);
1024     }
1025   }
1026   EchoRequest request_;
1027   EchoResponse response_;
1028   ClientContext context_;
1029   const ServerTryCancelRequestPhase server_try_cancel_;
1030   int reads_complete_{0};
1031   int writes_complete_{0};
1032   const int msgs_to_send_;
1033   const ClientCancelInfo client_cancel_;
1034   std::mutex mu_;
1035   std::condition_variable cv_;
1036   bool done_ = false;
1037 };
1038
1039 TEST_P(ClientCallbackEnd2endTest, BidiStream) {
1040   MAYBE_SKIP_TEST;
1041   ResetStub();
1042   BidiClient test{stub_.get(), DO_NOT_CANCEL,
1043                   kServerDefaultResponseStreamsToSend};
1044   test.Await();
1045   // Make sure that the server interceptors were not notified of a cancel
1046   if (GetParam().use_interceptors) {
1047     EXPECT_EQ(0, DummyInterceptor::GetNumTimesCancel());
1048   }
1049 }
1050
1051 TEST_P(ClientCallbackEnd2endTest, ClientCancelsBidiStream) {
1052   MAYBE_SKIP_TEST;
1053   ResetStub();
1054   BidiClient test{stub_.get(), DO_NOT_CANCEL,
1055                   kServerDefaultResponseStreamsToSend, ClientCancelInfo{2}};
1056   test.Await();
1057   // Make sure that the server interceptors were notified of a cancel
1058   if (GetParam().use_interceptors) {
1059     EXPECT_EQ(20, DummyInterceptor::GetNumTimesCancel());
1060   }
1061 }
1062
1063 // Server to cancel before reading/writing any requests/responses on the stream
1064 TEST_P(ClientCallbackEnd2endTest, BidiStreamServerCancelBefore) {
1065   MAYBE_SKIP_TEST;
1066   ResetStub();
1067   BidiClient test{stub_.get(), CANCEL_BEFORE_PROCESSING, 2};
1068   test.Await();
1069   // Make sure that the server interceptors were notified
1070   if (GetParam().use_interceptors) {
1071     EXPECT_EQ(20, DummyInterceptor::GetNumTimesCancel());
1072   }
1073 }
1074
1075 // Server to cancel while reading/writing requests/responses on the stream in
1076 // parallel
1077 TEST_P(ClientCallbackEnd2endTest, BidiStreamServerCancelDuring) {
1078   MAYBE_SKIP_TEST;
1079   ResetStub();
1080   BidiClient test{stub_.get(), CANCEL_DURING_PROCESSING, 10};
1081   test.Await();
1082   // Make sure that the server interceptors were notified
1083   if (GetParam().use_interceptors) {
1084     EXPECT_EQ(20, DummyInterceptor::GetNumTimesCancel());
1085   }
1086 }
1087
1088 // Server to cancel after reading/writing all requests/responses on the stream
1089 // but before returning to the client
1090 TEST_P(ClientCallbackEnd2endTest, BidiStreamServerCancelAfter) {
1091   MAYBE_SKIP_TEST;
1092   ResetStub();
1093   BidiClient test{stub_.get(), CANCEL_AFTER_PROCESSING, 5};
1094   test.Await();
1095   // Make sure that the server interceptors were notified
1096   if (GetParam().use_interceptors) {
1097     EXPECT_EQ(20, DummyInterceptor::GetNumTimesCancel());
1098   }
1099 }
1100
1101 TEST_P(ClientCallbackEnd2endTest, SimultaneousReadAndWritesDone) {
1102   MAYBE_SKIP_TEST;
1103   ResetStub();
1104   class Client : public grpc::experimental::ClientBidiReactor<EchoRequest,
1105                                                               EchoResponse> {
1106    public:
1107     Client(grpc::testing::EchoTestService::Stub* stub) {
1108       request_.set_message("Hello bidi ");
1109       stub->experimental_async()->BidiStream(&context_, this);
1110       StartWrite(&request_);
1111       StartCall();
1112     }
1113     void OnReadDone(bool ok) override {
1114       EXPECT_TRUE(ok);
1115       EXPECT_EQ(response_.message(), request_.message());
1116     }
1117     void OnWriteDone(bool ok) override {
1118       EXPECT_TRUE(ok);
1119       // Now send out the simultaneous Read and WritesDone
1120       StartWritesDone();
1121       StartRead(&response_);
1122     }
1123     void OnDone(const Status& s) override {
1124       EXPECT_TRUE(s.ok());
1125       EXPECT_EQ(response_.message(), request_.message());
1126       std::unique_lock<std::mutex> l(mu_);
1127       done_ = true;
1128       cv_.notify_one();
1129     }
1130     void Await() {
1131       std::unique_lock<std::mutex> l(mu_);
1132       while (!done_) {
1133         cv_.wait(l);
1134       }
1135     }
1136
1137    private:
1138     EchoRequest request_;
1139     EchoResponse response_;
1140     ClientContext context_;
1141     std::mutex mu_;
1142     std::condition_variable cv_;
1143     bool done_ = false;
1144   } test{stub_.get()};
1145
1146   test.Await();
1147 }
1148
1149 TEST_P(ClientCallbackEnd2endTest, UnimplementedRpc) {
1150   MAYBE_SKIP_TEST;
1151   ChannelArguments args;
1152   const auto& channel_creds = GetCredentialsProvider()->GetChannelCredentials(
1153       GetParam().credentials_type, &args);
1154   std::shared_ptr<Channel> channel =
1155       (GetParam().protocol == Protocol::TCP)
1156           ? ::grpc::CreateCustomChannel(server_address_.str(), channel_creds,
1157                                         args)
1158           : server_->InProcessChannel(args);
1159   std::unique_ptr<grpc::testing::UnimplementedEchoService::Stub> stub;
1160   stub = grpc::testing::UnimplementedEchoService::NewStub(channel);
1161   EchoRequest request;
1162   EchoResponse response;
1163   ClientContext cli_ctx;
1164   request.set_message("Hello world.");
1165   std::mutex mu;
1166   std::condition_variable cv;
1167   bool done = false;
1168   stub->experimental_async()->Unimplemented(
1169       &cli_ctx, &request, &response, [&done, &mu, &cv](Status s) {
1170         EXPECT_EQ(StatusCode::UNIMPLEMENTED, s.error_code());
1171         EXPECT_EQ("", s.error_message());
1172
1173         std::lock_guard<std::mutex> l(mu);
1174         done = true;
1175         cv.notify_one();
1176       });
1177   std::unique_lock<std::mutex> l(mu);
1178   while (!done) {
1179     cv.wait(l);
1180   }
1181 }
1182
1183 TEST_P(ClientCallbackEnd2endTest,
1184        ResponseStreamExtraReactionFlowReadsUntilDone) {
1185   MAYBE_SKIP_TEST;
1186   ResetStub();
1187   class ReadAllIncomingDataClient
1188       : public grpc::experimental::ClientReadReactor<EchoResponse> {
1189    public:
1190     ReadAllIncomingDataClient(grpc::testing::EchoTestService::Stub* stub) {
1191       request_.set_message("Hello client ");
1192       stub->experimental_async()->ResponseStream(&context_, &request_, this);
1193     }
1194     bool WaitForReadDone() {
1195       std::unique_lock<std::mutex> l(mu_);
1196       while (!read_done_) {
1197         read_cv_.wait(l);
1198       }
1199       read_done_ = false;
1200       return read_ok_;
1201     }
1202     void Await() {
1203       std::unique_lock<std::mutex> l(mu_);
1204       while (!done_) {
1205         done_cv_.wait(l);
1206       }
1207     }
1208     const Status& status() {
1209       std::unique_lock<std::mutex> l(mu_);
1210       return status_;
1211     }
1212
1213    private:
1214     void OnReadDone(bool ok) override {
1215       std::unique_lock<std::mutex> l(mu_);
1216       read_ok_ = ok;
1217       read_done_ = true;
1218       read_cv_.notify_one();
1219     }
1220     void OnDone(const Status& s) override {
1221       std::unique_lock<std::mutex> l(mu_);
1222       done_ = true;
1223       status_ = s;
1224       done_cv_.notify_one();
1225     }
1226
1227     EchoRequest request_;
1228     EchoResponse response_;
1229     ClientContext context_;
1230     bool read_ok_ = false;
1231     bool read_done_ = false;
1232     std::mutex mu_;
1233     std::condition_variable read_cv_;
1234     std::condition_variable done_cv_;
1235     bool done_ = false;
1236     Status status_;
1237   } client{stub_.get()};
1238
1239   int reads_complete = 0;
1240   client.AddHold();
1241   client.StartCall();
1242
1243   EchoResponse response;
1244   bool read_ok = true;
1245   while (read_ok) {
1246     client.StartRead(&response);
1247     read_ok = client.WaitForReadDone();
1248     if (read_ok) {
1249       ++reads_complete;
1250     }
1251   }
1252   client.RemoveHold();
1253   client.Await();
1254
1255   EXPECT_EQ(kServerDefaultResponseStreamsToSend, reads_complete);
1256   EXPECT_EQ(client.status().error_code(), grpc::StatusCode::OK);
1257 }
1258
1259 std::vector<TestScenario> CreateTestScenarios(bool test_insecure) {
1260   std::vector<TestScenario> scenarios;
1261   std::vector<grpc::string> credentials_types{
1262       GetCredentialsProvider()->GetSecureCredentialsTypeList()};
1263   auto insec_ok = [] {
1264     // Only allow insecure credentials type when it is registered with the
1265     // provider. User may create providers that do not have insecure.
1266     return GetCredentialsProvider()->GetChannelCredentials(
1267                kInsecureCredentialsType, nullptr) != nullptr;
1268   };
1269   if (test_insecure && insec_ok()) {
1270     credentials_types.push_back(kInsecureCredentialsType);
1271   }
1272   GPR_ASSERT(!credentials_types.empty());
1273
1274   bool barr[]{false, true};
1275   Protocol parr[]{Protocol::INPROC, Protocol::TCP};
1276   for (Protocol p : parr) {
1277     for (const auto& cred : credentials_types) {
1278       // TODO(vjpai): Test inproc with secure credentials when feasible
1279       if (p == Protocol::INPROC &&
1280           (cred != kInsecureCredentialsType || !insec_ok())) {
1281         continue;
1282       }
1283       for (bool callback_server : barr) {
1284         for (bool use_interceptors : barr) {
1285           scenarios.emplace_back(callback_server, p, use_interceptors, cred);
1286         }
1287       }
1288     }
1289   }
1290   return scenarios;
1291 }
1292
1293 INSTANTIATE_TEST_CASE_P(ClientCallbackEnd2endTest, ClientCallbackEnd2endTest,
1294                         ::testing::ValuesIn(CreateTestScenarios(true)));
1295
1296 }  // namespace
1297 }  // namespace testing
1298 }  // namespace grpc
1299
1300 int main(int argc, char** argv) {
1301   grpc::testing::TestEnvironment env(argc, argv);
1302   ::testing::InitGoogleTest(&argc, argv);
1303   return RUN_ALL_TESTS();
1304 }