40023c72f62e91cd861620152ff9d6af2777ea01
[platform/upstream/grpc.git] / test / cpp / end2end / end2end_test.cc
1 /*
2  *
3  * Copyright 2015 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18
19 #include <mutex>
20 #include <thread>
21
22 #include <grpc/grpc.h>
23 #include <grpc/support/alloc.h>
24 #include <grpc/support/log.h>
25 #include <grpc/support/time.h>
26 #include <grpcpp/channel.h>
27 #include <grpcpp/client_context.h>
28 #include <grpcpp/create_channel.h>
29 #include <grpcpp/resource_quota.h>
30 #include <grpcpp/security/auth_metadata_processor.h>
31 #include <grpcpp/security/credentials.h>
32 #include <grpcpp/security/server_credentials.h>
33 #include <grpcpp/server.h>
34 #include <grpcpp/server_builder.h>
35 #include <grpcpp/server_context.h>
36
37 #include "src/core/lib/gpr/env.h"
38 #include "src/core/lib/iomgr/iomgr.h"
39 #include "src/core/lib/security/credentials/credentials.h"
40 #include "src/proto/grpc/testing/duplicate/echo_duplicate.grpc.pb.h"
41 #include "src/proto/grpc/testing/echo.grpc.pb.h"
42 #include "test/core/util/port.h"
43 #include "test/core/util/test_config.h"
44 #include "test/cpp/end2end/interceptors_util.h"
45 #include "test/cpp/end2end/test_service_impl.h"
46 #include "test/cpp/util/string_ref_helper.h"
47 #include "test/cpp/util/test_credentials_provider.h"
48
49 #include <gtest/gtest.h>
50
51 using grpc::testing::EchoRequest;
52 using grpc::testing::EchoResponse;
53 using grpc::testing::kTlsCredentialsType;
54 using std::chrono::system_clock;
55
56 // MAYBE_SKIP_TEST is a macro to determine if this particular test configuration
57 // should be skipped based on a decision made at SetUp time. In particular,
58 // tests that use the callback server can only be run if the iomgr can run in
59 // the background or if the transport is in-process.
60 #define MAYBE_SKIP_TEST \
61   do {                  \
62     if (do_not_test_) { \
63       return;           \
64     }                   \
65   } while (0)
66
67 namespace grpc {
68 namespace testing {
69 namespace {
70
71 bool CheckIsLocalhost(const grpc::string& addr) {
72   const grpc::string kIpv6("ipv6:[::1]:");
73   const grpc::string kIpv4MappedIpv6("ipv6:[::ffff:127.0.0.1]:");
74   const grpc::string kIpv4("ipv4:127.0.0.1:");
75   return addr.substr(0, kIpv4.size()) == kIpv4 ||
76          addr.substr(0, kIpv4MappedIpv6.size()) == kIpv4MappedIpv6 ||
77          addr.substr(0, kIpv6.size()) == kIpv6;
78 }
79
80 const char kTestCredsPluginErrorMsg[] = "Could not find plugin metadata.";
81
82 class TestMetadataCredentialsPlugin : public MetadataCredentialsPlugin {
83  public:
84   static const char kGoodMetadataKey[];
85   static const char kBadMetadataKey[];
86
87   TestMetadataCredentialsPlugin(const grpc::string_ref& metadata_key,
88                                 const grpc::string_ref& metadata_value,
89                                 bool is_blocking, bool is_successful)
90       : metadata_key_(metadata_key.data(), metadata_key.length()),
91         metadata_value_(metadata_value.data(), metadata_value.length()),
92         is_blocking_(is_blocking),
93         is_successful_(is_successful) {}
94
95   bool IsBlocking() const override { return is_blocking_; }
96
97   Status GetMetadata(
98       grpc::string_ref service_url, grpc::string_ref method_name,
99       const grpc::AuthContext& channel_auth_context,
100       std::multimap<grpc::string, grpc::string>* metadata) override {
101     EXPECT_GT(service_url.length(), 0UL);
102     EXPECT_GT(method_name.length(), 0UL);
103     EXPECT_TRUE(channel_auth_context.IsPeerAuthenticated());
104     EXPECT_TRUE(metadata != nullptr);
105     if (is_successful_) {
106       metadata->insert(std::make_pair(metadata_key_, metadata_value_));
107       return Status::OK;
108     } else {
109       return Status(StatusCode::NOT_FOUND, kTestCredsPluginErrorMsg);
110     }
111   }
112
113  private:
114   grpc::string metadata_key_;
115   grpc::string metadata_value_;
116   bool is_blocking_;
117   bool is_successful_;
118 };
119
120 const char TestMetadataCredentialsPlugin::kBadMetadataKey[] =
121     "TestPluginMetadata";
122 const char TestMetadataCredentialsPlugin::kGoodMetadataKey[] =
123     "test-plugin-metadata";
124
125 class TestAuthMetadataProcessor : public AuthMetadataProcessor {
126  public:
127   static const char kGoodGuy[];
128
129   TestAuthMetadataProcessor(bool is_blocking) : is_blocking_(is_blocking) {}
130
131   std::shared_ptr<CallCredentials> GetCompatibleClientCreds() {
132     return MetadataCredentialsFromPlugin(
133         std::unique_ptr<MetadataCredentialsPlugin>(
134             new TestMetadataCredentialsPlugin(
135                 TestMetadataCredentialsPlugin::kGoodMetadataKey, kGoodGuy,
136                 is_blocking_, true)));
137   }
138
139   std::shared_ptr<CallCredentials> GetIncompatibleClientCreds() {
140     return MetadataCredentialsFromPlugin(
141         std::unique_ptr<MetadataCredentialsPlugin>(
142             new TestMetadataCredentialsPlugin(
143                 TestMetadataCredentialsPlugin::kGoodMetadataKey, "Mr Hyde",
144                 is_blocking_, true)));
145   }
146
147   // Interface implementation
148   bool IsBlocking() const override { return is_blocking_; }
149
150   Status Process(const InputMetadata& auth_metadata, AuthContext* context,
151                  OutputMetadata* consumed_auth_metadata,
152                  OutputMetadata* response_metadata) override {
153     EXPECT_TRUE(consumed_auth_metadata != nullptr);
154     EXPECT_TRUE(context != nullptr);
155     EXPECT_TRUE(response_metadata != nullptr);
156     auto auth_md =
157         auth_metadata.find(TestMetadataCredentialsPlugin::kGoodMetadataKey);
158     EXPECT_NE(auth_md, auth_metadata.end());
159     string_ref auth_md_value = auth_md->second;
160     if (auth_md_value == kGoodGuy) {
161       context->AddProperty(kIdentityPropName, kGoodGuy);
162       context->SetPeerIdentityPropertyName(kIdentityPropName);
163       consumed_auth_metadata->insert(std::make_pair(
164           string(auth_md->first.data(), auth_md->first.length()),
165           string(auth_md->second.data(), auth_md->second.length())));
166       return Status::OK;
167     } else {
168       return Status(StatusCode::UNAUTHENTICATED,
169                     string("Invalid principal: ") +
170                         string(auth_md_value.data(), auth_md_value.length()));
171     }
172   }
173
174  private:
175   static const char kIdentityPropName[];
176   bool is_blocking_;
177 };
178
179 const char TestAuthMetadataProcessor::kGoodGuy[] = "Dr Jekyll";
180 const char TestAuthMetadataProcessor::kIdentityPropName[] = "novel identity";
181
182 class Proxy : public ::grpc::testing::EchoTestService::Service {
183  public:
184   Proxy(const std::shared_ptr<Channel>& channel)
185       : stub_(grpc::testing::EchoTestService::NewStub(channel)) {}
186
187   Status Echo(ServerContext* server_context, const EchoRequest* request,
188               EchoResponse* response) override {
189     std::unique_ptr<ClientContext> client_context =
190         ClientContext::FromServerContext(*server_context);
191     return stub_->Echo(client_context.get(), *request, response);
192   }
193
194  private:
195   std::unique_ptr<::grpc::testing::EchoTestService::Stub> stub_;
196 };
197
198 class TestServiceImplDupPkg
199     : public ::grpc::testing::duplicate::EchoTestService::Service {
200  public:
201   Status Echo(ServerContext* context, const EchoRequest* request,
202               EchoResponse* response) override {
203     response->set_message("no package");
204     return Status::OK;
205   }
206 };
207
208 class TestScenario {
209  public:
210   TestScenario(bool interceptors, bool proxy, bool inproc_stub,
211                const grpc::string& creds_type, bool use_callback_server)
212       : use_interceptors(interceptors),
213         use_proxy(proxy),
214         inproc(inproc_stub),
215         credentials_type(creds_type),
216         callback_server(use_callback_server) {}
217   void Log() const;
218   bool use_interceptors;
219   bool use_proxy;
220   bool inproc;
221   const grpc::string credentials_type;
222   bool callback_server;
223 };
224
225 static std::ostream& operator<<(std::ostream& out,
226                                 const TestScenario& scenario) {
227   return out << "TestScenario{use_interceptors="
228              << (scenario.use_interceptors ? "true" : "false")
229              << ", use_proxy=" << (scenario.use_proxy ? "true" : "false")
230              << ", inproc=" << (scenario.inproc ? "true" : "false")
231              << ", server_type="
232              << (scenario.callback_server ? "callback" : "sync")
233              << ", credentials='" << scenario.credentials_type << "'}";
234 }
235
236 void TestScenario::Log() const {
237   std::ostringstream out;
238   out << *this;
239   gpr_log(GPR_DEBUG, "%s", out.str().c_str());
240 }
241
242 class End2endTest : public ::testing::TestWithParam<TestScenario> {
243  protected:
244   End2endTest()
245       : is_server_started_(false),
246         kMaxMessageSize_(8192),
247         special_service_("special"),
248         first_picked_port_(0) {
249     GetParam().Log();
250   }
251
252   void SetUp() override {
253     if (GetParam().callback_server && !GetParam().inproc &&
254         !grpc_iomgr_run_in_background()) {
255       do_not_test_ = true;
256       return;
257     }
258   }
259
260   void TearDown() override {
261     if (is_server_started_) {
262       server_->Shutdown();
263       if (proxy_server_) proxy_server_->Shutdown();
264     }
265     if (first_picked_port_ > 0) {
266       grpc_recycle_unused_port(first_picked_port_);
267     }
268   }
269
270   void StartServer(const std::shared_ptr<AuthMetadataProcessor>& processor) {
271     int port = grpc_pick_unused_port_or_die();
272     first_picked_port_ = port;
273     server_address_ << "127.0.0.1:" << port;
274     // Setup server
275     BuildAndStartServer(processor);
276   }
277
278   void RestartServer(const std::shared_ptr<AuthMetadataProcessor>& processor) {
279     if (is_server_started_) {
280       server_->Shutdown();
281       BuildAndStartServer(processor);
282     }
283   }
284
285   void BuildAndStartServer(
286       const std::shared_ptr<AuthMetadataProcessor>& processor) {
287     ServerBuilder builder;
288     ConfigureServerBuilder(&builder);
289     auto server_creds = GetCredentialsProvider()->GetServerCredentials(
290         GetParam().credentials_type);
291     if (GetParam().credentials_type != kInsecureCredentialsType) {
292       server_creds->SetAuthMetadataProcessor(processor);
293     }
294     if (GetParam().use_interceptors) {
295       std::vector<
296           std::unique_ptr<experimental::ServerInterceptorFactoryInterface>>
297           creators;
298       // Add 20 dummy server interceptors
299       creators.reserve(20);
300       for (auto i = 0; i < 20; i++) {
301         creators.push_back(std::unique_ptr<DummyInterceptorFactory>(
302             new DummyInterceptorFactory()));
303       }
304       builder.experimental().SetInterceptorCreators(std::move(creators));
305     }
306     builder.AddListeningPort(server_address_.str(), server_creds);
307     if (!GetParam().callback_server) {
308       builder.RegisterService(&service_);
309     } else {
310       builder.RegisterService(&callback_service_);
311     }
312     builder.RegisterService("foo.test.youtube.com", &special_service_);
313     builder.RegisterService(&dup_pkg_service_);
314
315     builder.SetSyncServerOption(ServerBuilder::SyncServerOption::NUM_CQS, 4);
316     builder.SetSyncServerOption(
317         ServerBuilder::SyncServerOption::CQ_TIMEOUT_MSEC, 10);
318
319     server_ = builder.BuildAndStart();
320     is_server_started_ = true;
321   }
322
323   virtual void ConfigureServerBuilder(ServerBuilder* builder) {
324     builder->SetMaxMessageSize(
325         kMaxMessageSize_);  // For testing max message size.
326   }
327
328   void ResetChannel() {
329     if (!is_server_started_) {
330       StartServer(std::shared_ptr<AuthMetadataProcessor>());
331     }
332     EXPECT_TRUE(is_server_started_);
333     ChannelArguments args;
334     auto channel_creds = GetCredentialsProvider()->GetChannelCredentials(
335         GetParam().credentials_type, &args);
336     if (!user_agent_prefix_.empty()) {
337       args.SetUserAgentPrefix(user_agent_prefix_);
338     }
339     args.SetString(GRPC_ARG_SECONDARY_USER_AGENT_STRING, "end2end_test");
340
341     if (!GetParam().inproc) {
342       if (!GetParam().use_interceptors) {
343         channel_ =
344             CreateCustomChannel(server_address_.str(), channel_creds, args);
345       } else {
346         channel_ = CreateCustomChannelWithInterceptors(
347             server_address_.str(), channel_creds, args,
348             CreateDummyClientInterceptors());
349       }
350     } else {
351       if (!GetParam().use_interceptors) {
352         channel_ = server_->InProcessChannel(args);
353       } else {
354         channel_ = server_->experimental().InProcessChannelWithInterceptors(
355             args, CreateDummyClientInterceptors());
356       }
357     }
358   }
359
360   void ResetStub() {
361     ResetChannel();
362     if (GetParam().use_proxy) {
363       proxy_service_.reset(new Proxy(channel_));
364       int port = grpc_pick_unused_port_or_die();
365       std::ostringstream proxyaddr;
366       proxyaddr << "localhost:" << port;
367       ServerBuilder builder;
368       builder.AddListeningPort(proxyaddr.str(), InsecureServerCredentials());
369       builder.RegisterService(proxy_service_.get());
370
371       builder.SetSyncServerOption(ServerBuilder::SyncServerOption::NUM_CQS, 4);
372       builder.SetSyncServerOption(
373           ServerBuilder::SyncServerOption::CQ_TIMEOUT_MSEC, 10);
374
375       proxy_server_ = builder.BuildAndStart();
376
377       channel_ = CreateChannel(proxyaddr.str(), InsecureChannelCredentials());
378     }
379
380     stub_ = grpc::testing::EchoTestService::NewStub(channel_);
381     DummyInterceptor::Reset();
382   }
383
384   bool do_not_test_{false};
385   bool is_server_started_;
386   std::shared_ptr<Channel> channel_;
387   std::unique_ptr<grpc::testing::EchoTestService::Stub> stub_;
388   std::unique_ptr<Server> server_;
389   std::unique_ptr<Server> proxy_server_;
390   std::unique_ptr<Proxy> proxy_service_;
391   std::ostringstream server_address_;
392   const int kMaxMessageSize_;
393   TestServiceImpl service_;
394   CallbackTestServiceImpl callback_service_;
395   TestServiceImpl special_service_;
396   TestServiceImplDupPkg dup_pkg_service_;
397   grpc::string user_agent_prefix_;
398   int first_picked_port_;
399 };
400
401 static void SendRpc(grpc::testing::EchoTestService::Stub* stub, int num_rpcs,
402                     bool with_binary_metadata) {
403   EchoRequest request;
404   EchoResponse response;
405   request.set_message("Hello hello hello hello");
406
407   for (int i = 0; i < num_rpcs; ++i) {
408     ClientContext context;
409     if (with_binary_metadata) {
410       char bytes[8] = {'\0', '\1', '\2', '\3',
411                        '\4', '\5', '\6', static_cast<char>(i)};
412       context.AddMetadata("custom-bin", grpc::string(bytes, 8));
413     }
414     context.set_compression_algorithm(GRPC_COMPRESS_GZIP);
415     Status s = stub->Echo(&context, request, &response);
416     EXPECT_EQ(response.message(), request.message());
417     EXPECT_TRUE(s.ok());
418   }
419 }
420
421 // This class is for testing scenarios where RPCs are cancelled on the server
422 // by calling ServerContext::TryCancel()
423 class End2endServerTryCancelTest : public End2endTest {
424  protected:
425   // Helper for testing client-streaming RPCs which are cancelled on the server.
426   // Depending on the value of server_try_cancel parameter, this will test one
427   // of the following three scenarios:
428   //   CANCEL_BEFORE_PROCESSING: Rpc is cancelled by the server before reading
429   //   any messages from the client
430   //
431   //   CANCEL_DURING_PROCESSING: Rpc is cancelled by the server while reading
432   //   messages from the client
433   //
434   //   CANCEL_AFTER PROCESSING: Rpc is cancelled by server after reading all
435   //   the messages from the client
436   //
437   // NOTE: Do not call this function with server_try_cancel == DO_NOT_CANCEL.
438   void TestRequestStreamServerCancel(
439       ServerTryCancelRequestPhase server_try_cancel, int num_msgs_to_send) {
440     MAYBE_SKIP_TEST;
441     RestartServer(std::shared_ptr<AuthMetadataProcessor>());
442     ResetStub();
443     EchoRequest request;
444     EchoResponse response;
445     ClientContext context;
446
447     // Send server_try_cancel value in the client metadata
448     context.AddMetadata(kServerTryCancelRequest,
449                         grpc::to_string(server_try_cancel));
450
451     auto stream = stub_->RequestStream(&context, &response);
452
453     int num_msgs_sent = 0;
454     while (num_msgs_sent < num_msgs_to_send) {
455       request.set_message("hello");
456       if (!stream->Write(request)) {
457         break;
458       }
459       num_msgs_sent++;
460     }
461     gpr_log(GPR_INFO, "Sent %d messages", num_msgs_sent);
462
463     stream->WritesDone();
464     Status s = stream->Finish();
465
466     // At this point, we know for sure that RPC was cancelled by the server
467     // since we passed server_try_cancel value in the metadata. Depending on the
468     // value of server_try_cancel, the RPC might have been cancelled by the
469     // server at different stages. The following validates our expectations of
470     // number of messages sent in various cancellation scenarios:
471
472     switch (server_try_cancel) {
473       case CANCEL_BEFORE_PROCESSING:
474       case CANCEL_DURING_PROCESSING:
475         // If the RPC is cancelled by server before / during messages from the
476         // client, it means that the client most likely did not get a chance to
477         // send all the messages it wanted to send. i.e num_msgs_sent <=
478         // num_msgs_to_send
479         EXPECT_LE(num_msgs_sent, num_msgs_to_send);
480         break;
481
482       case CANCEL_AFTER_PROCESSING:
483         // If the RPC was cancelled after all messages were read by the server,
484         // the client did get a chance to send all its messages
485         EXPECT_EQ(num_msgs_sent, num_msgs_to_send);
486         break;
487
488       default:
489         gpr_log(GPR_ERROR, "Invalid server_try_cancel value: %d",
490                 server_try_cancel);
491         EXPECT_TRUE(server_try_cancel > DO_NOT_CANCEL &&
492                     server_try_cancel <= CANCEL_AFTER_PROCESSING);
493         break;
494     }
495
496     EXPECT_FALSE(s.ok());
497     EXPECT_EQ(grpc::StatusCode::CANCELLED, s.error_code());
498     // Make sure that the server interceptors were notified
499     if (GetParam().use_interceptors) {
500       EXPECT_EQ(20, DummyInterceptor::GetNumTimesCancel());
501     }
502   }
503
504   // Helper for testing server-streaming RPCs which are cancelled on the server.
505   // Depending on the value of server_try_cancel parameter, this will test one
506   // of the following three scenarios:
507   //   CANCEL_BEFORE_PROCESSING: Rpc is cancelled by the server before writing
508   //   any messages to the client
509   //
510   //   CANCEL_DURING_PROCESSING: Rpc is cancelled by the server while writing
511   //   messages to the client
512   //
513   //   CANCEL_AFTER PROCESSING: Rpc is cancelled by server after writing all
514   //   the messages to the client
515   //
516   // NOTE: Do not call this function with server_try_cancel == DO_NOT_CANCEL.
517   void TestResponseStreamServerCancel(
518       ServerTryCancelRequestPhase server_try_cancel) {
519     MAYBE_SKIP_TEST;
520     RestartServer(std::shared_ptr<AuthMetadataProcessor>());
521     ResetStub();
522     EchoRequest request;
523     EchoResponse response;
524     ClientContext context;
525
526     // Send server_try_cancel in the client metadata
527     context.AddMetadata(kServerTryCancelRequest,
528                         grpc::to_string(server_try_cancel));
529
530     request.set_message("hello");
531     auto stream = stub_->ResponseStream(&context, request);
532
533     int num_msgs_read = 0;
534     while (num_msgs_read < kServerDefaultResponseStreamsToSend) {
535       if (!stream->Read(&response)) {
536         break;
537       }
538       EXPECT_EQ(response.message(),
539                 request.message() + grpc::to_string(num_msgs_read));
540       num_msgs_read++;
541     }
542     gpr_log(GPR_INFO, "Read %d messages", num_msgs_read);
543
544     Status s = stream->Finish();
545
546     // Depending on the value of server_try_cancel, the RPC might have been
547     // cancelled by the server at different stages. The following validates our
548     // expectations of number of messages read in various cancellation
549     // scenarios:
550     switch (server_try_cancel) {
551       case CANCEL_BEFORE_PROCESSING:
552         // Server cancelled before sending any messages. Which means the client
553         // wouldn't have read any
554         EXPECT_EQ(num_msgs_read, 0);
555         break;
556
557       case CANCEL_DURING_PROCESSING:
558         // Server cancelled while writing messages. Client must have read less
559         // than or equal to the expected number of messages
560         EXPECT_LE(num_msgs_read, kServerDefaultResponseStreamsToSend);
561         break;
562
563       case CANCEL_AFTER_PROCESSING:
564         // Even though the Server cancelled after writing all messages, the RPC
565         // may be cancelled before the Client got a chance to read all the
566         // messages.
567         EXPECT_LE(num_msgs_read, kServerDefaultResponseStreamsToSend);
568         break;
569
570       default: {
571         gpr_log(GPR_ERROR, "Invalid server_try_cancel value: %d",
572                 server_try_cancel);
573         EXPECT_TRUE(server_try_cancel > DO_NOT_CANCEL &&
574                     server_try_cancel <= CANCEL_AFTER_PROCESSING);
575         break;
576       }
577     }
578
579     EXPECT_FALSE(s.ok());
580     // Make sure that the server interceptors were notified
581     if (GetParam().use_interceptors) {
582       EXPECT_EQ(20, DummyInterceptor::GetNumTimesCancel());
583     }
584   }
585
586   // Helper for testing bidirectional-streaming RPCs which are cancelled on the
587   // server. Depending on the value of server_try_cancel parameter, this will
588   // test one of the following three scenarios:
589   //   CANCEL_BEFORE_PROCESSING: Rpc is cancelled by the server before reading/
590   //   writing any messages from/to the client
591   //
592   //   CANCEL_DURING_PROCESSING: Rpc is cancelled by the server while reading/
593   //   writing messages from/to the client
594   //
595   //   CANCEL_AFTER PROCESSING: Rpc is cancelled by server after reading/writing
596   //   all the messages from/to the client
597   //
598   // NOTE: Do not call this function with server_try_cancel == DO_NOT_CANCEL.
599   void TestBidiStreamServerCancel(ServerTryCancelRequestPhase server_try_cancel,
600                                   int num_messages) {
601     MAYBE_SKIP_TEST;
602     RestartServer(std::shared_ptr<AuthMetadataProcessor>());
603     ResetStub();
604     EchoRequest request;
605     EchoResponse response;
606     ClientContext context;
607
608     // Send server_try_cancel in the client metadata
609     context.AddMetadata(kServerTryCancelRequest,
610                         grpc::to_string(server_try_cancel));
611
612     auto stream = stub_->BidiStream(&context);
613
614     int num_msgs_read = 0;
615     int num_msgs_sent = 0;
616     while (num_msgs_sent < num_messages) {
617       request.set_message("hello " + grpc::to_string(num_msgs_sent));
618       if (!stream->Write(request)) {
619         break;
620       }
621       num_msgs_sent++;
622
623       if (!stream->Read(&response)) {
624         break;
625       }
626       num_msgs_read++;
627
628       EXPECT_EQ(response.message(), request.message());
629     }
630     gpr_log(GPR_INFO, "Sent %d messages", num_msgs_sent);
631     gpr_log(GPR_INFO, "Read %d messages", num_msgs_read);
632
633     stream->WritesDone();
634     Status s = stream->Finish();
635
636     // Depending on the value of server_try_cancel, the RPC might have been
637     // cancelled by the server at different stages. The following validates our
638     // expectations of number of messages read in various cancellation
639     // scenarios:
640     switch (server_try_cancel) {
641       case CANCEL_BEFORE_PROCESSING:
642         EXPECT_EQ(num_msgs_read, 0);
643         break;
644
645       case CANCEL_DURING_PROCESSING:
646         EXPECT_LE(num_msgs_sent, num_messages);
647         EXPECT_LE(num_msgs_read, num_msgs_sent);
648         break;
649
650       case CANCEL_AFTER_PROCESSING:
651         EXPECT_EQ(num_msgs_sent, num_messages);
652
653         // The Server cancelled after reading the last message and after writing
654         // the message to the client. However, the RPC cancellation might have
655         // taken effect before the client actually read the response.
656         EXPECT_LE(num_msgs_read, num_msgs_sent);
657         break;
658
659       default:
660         gpr_log(GPR_ERROR, "Invalid server_try_cancel value: %d",
661                 server_try_cancel);
662         EXPECT_TRUE(server_try_cancel > DO_NOT_CANCEL &&
663                     server_try_cancel <= CANCEL_AFTER_PROCESSING);
664         break;
665     }
666
667     EXPECT_FALSE(s.ok());
668     EXPECT_EQ(grpc::StatusCode::CANCELLED, s.error_code());
669     // Make sure that the server interceptors were notified
670     if (GetParam().use_interceptors) {
671       EXPECT_EQ(20, DummyInterceptor::GetNumTimesCancel());
672     }
673   }
674 };
675
676 TEST_P(End2endServerTryCancelTest, RequestEchoServerCancel) {
677   MAYBE_SKIP_TEST;
678   ResetStub();
679   EchoRequest request;
680   EchoResponse response;
681   ClientContext context;
682
683   context.AddMetadata(kServerTryCancelRequest,
684                       grpc::to_string(CANCEL_BEFORE_PROCESSING));
685   Status s = stub_->Echo(&context, request, &response);
686   EXPECT_FALSE(s.ok());
687   EXPECT_EQ(grpc::StatusCode::CANCELLED, s.error_code());
688 }
689
690 // Server to cancel before doing reading the request
691 TEST_P(End2endServerTryCancelTest, RequestStreamServerCancelBeforeReads) {
692   TestRequestStreamServerCancel(CANCEL_BEFORE_PROCESSING, 1);
693 }
694
695 // Server to cancel while reading a request from the stream in parallel
696 TEST_P(End2endServerTryCancelTest, RequestStreamServerCancelDuringRead) {
697   TestRequestStreamServerCancel(CANCEL_DURING_PROCESSING, 10);
698 }
699
700 // Server to cancel after reading all the requests but before returning to the
701 // client
702 TEST_P(End2endServerTryCancelTest, RequestStreamServerCancelAfterReads) {
703   TestRequestStreamServerCancel(CANCEL_AFTER_PROCESSING, 4);
704 }
705
706 // Server to cancel before sending any response messages
707 TEST_P(End2endServerTryCancelTest, ResponseStreamServerCancelBefore) {
708   TestResponseStreamServerCancel(CANCEL_BEFORE_PROCESSING);
709 }
710
711 // Server to cancel while writing a response to the stream in parallel
712 TEST_P(End2endServerTryCancelTest, ResponseStreamServerCancelDuring) {
713   TestResponseStreamServerCancel(CANCEL_DURING_PROCESSING);
714 }
715
716 // Server to cancel after writing all the respones to the stream but before
717 // returning to the client
718 TEST_P(End2endServerTryCancelTest, ResponseStreamServerCancelAfter) {
719   TestResponseStreamServerCancel(CANCEL_AFTER_PROCESSING);
720 }
721
722 // Server to cancel before reading/writing any requests/responses on the stream
723 TEST_P(End2endServerTryCancelTest, BidiStreamServerCancelBefore) {
724   TestBidiStreamServerCancel(CANCEL_BEFORE_PROCESSING, 2);
725 }
726
727 // Server to cancel while reading/writing requests/responses on the stream in
728 // parallel
729 TEST_P(End2endServerTryCancelTest, BidiStreamServerCancelDuring) {
730   TestBidiStreamServerCancel(CANCEL_DURING_PROCESSING, 10);
731 }
732
733 // Server to cancel after reading/writing all requests/responses on the stream
734 // but before returning to the client
735 TEST_P(End2endServerTryCancelTest, BidiStreamServerCancelAfter) {
736   TestBidiStreamServerCancel(CANCEL_AFTER_PROCESSING, 5);
737 }
738
739 TEST_P(End2endTest, SimpleRpcWithCustomUserAgentPrefix) {
740   MAYBE_SKIP_TEST;
741   // User-Agent is an HTTP header for HTTP transports only
742   if (GetParam().inproc) {
743     return;
744   }
745   user_agent_prefix_ = "custom_prefix";
746   ResetStub();
747   EchoRequest request;
748   EchoResponse response;
749   request.set_message("Hello hello hello hello");
750   request.mutable_param()->set_echo_metadata(true);
751
752   ClientContext context;
753   Status s = stub_->Echo(&context, request, &response);
754   EXPECT_EQ(response.message(), request.message());
755   EXPECT_TRUE(s.ok());
756   const auto& trailing_metadata = context.GetServerTrailingMetadata();
757   auto iter = trailing_metadata.find("user-agent");
758   EXPECT_TRUE(iter != trailing_metadata.end());
759   grpc::string expected_prefix = user_agent_prefix_ + " grpc-c++/";
760   EXPECT_TRUE(iter->second.starts_with(expected_prefix)) << iter->second;
761 }
762
763 TEST_P(End2endTest, MultipleRpcsWithVariedBinaryMetadataValue) {
764   MAYBE_SKIP_TEST;
765   ResetStub();
766   std::vector<std::thread> threads;
767   threads.reserve(10);
768   for (int i = 0; i < 10; ++i) {
769     threads.emplace_back(SendRpc, stub_.get(), 10, true);
770   }
771   for (int i = 0; i < 10; ++i) {
772     threads[i].join();
773   }
774 }
775
776 TEST_P(End2endTest, MultipleRpcs) {
777   MAYBE_SKIP_TEST;
778   ResetStub();
779   std::vector<std::thread> threads;
780   threads.reserve(10);
781   for (int i = 0; i < 10; ++i) {
782     threads.emplace_back(SendRpc, stub_.get(), 10, false);
783   }
784   for (int i = 0; i < 10; ++i) {
785     threads[i].join();
786   }
787 }
788
789 TEST_P(End2endTest, EmptyBinaryMetadata) {
790   MAYBE_SKIP_TEST;
791   ResetStub();
792   EchoRequest request;
793   EchoResponse response;
794   request.set_message("Hello hello hello hello");
795   ClientContext context;
796   context.AddMetadata("custom-bin", "");
797   Status s = stub_->Echo(&context, request, &response);
798   EXPECT_EQ(response.message(), request.message());
799   EXPECT_TRUE(s.ok());
800 }
801
802 TEST_P(End2endTest, ReconnectChannel) {
803   MAYBE_SKIP_TEST;
804   if (GetParam().inproc) {
805     return;
806   }
807   int poller_slowdown_factor = 1;
808   // It needs 2 pollset_works to reconnect the channel with polling engine
809   // "poll"
810   char* s = gpr_getenv("GRPC_POLL_STRATEGY");
811   if (s != nullptr && 0 == strcmp(s, "poll")) {
812     poller_slowdown_factor = 2;
813   }
814   gpr_free(s);
815   ResetStub();
816   SendRpc(stub_.get(), 1, false);
817   RestartServer(std::shared_ptr<AuthMetadataProcessor>());
818   // It needs more than GRPC_CLIENT_CHANNEL_BACKUP_POLL_INTERVAL_MS time to
819   // reconnect the channel.
820   gpr_sleep_until(gpr_time_add(
821       gpr_now(GPR_CLOCK_REALTIME),
822       gpr_time_from_millis(
823           300 * poller_slowdown_factor * grpc_test_slowdown_factor(),
824           GPR_TIMESPAN)));
825   SendRpc(stub_.get(), 1, false);
826 }
827
828 TEST_P(End2endTest, RequestStreamOneRequest) {
829   MAYBE_SKIP_TEST;
830   ResetStub();
831   EchoRequest request;
832   EchoResponse response;
833   ClientContext context;
834
835   auto stream = stub_->RequestStream(&context, &response);
836   request.set_message("hello");
837   EXPECT_TRUE(stream->Write(request));
838   stream->WritesDone();
839   Status s = stream->Finish();
840   EXPECT_EQ(response.message(), request.message());
841   EXPECT_TRUE(s.ok());
842   EXPECT_TRUE(context.debug_error_string().empty());
843 }
844
845 TEST_P(End2endTest, RequestStreamOneRequestWithCoalescingApi) {
846   MAYBE_SKIP_TEST;
847   ResetStub();
848   EchoRequest request;
849   EchoResponse response;
850   ClientContext context;
851
852   context.set_initial_metadata_corked(true);
853   auto stream = stub_->RequestStream(&context, &response);
854   request.set_message("hello");
855   stream->WriteLast(request, WriteOptions());
856   Status s = stream->Finish();
857   EXPECT_EQ(response.message(), request.message());
858   EXPECT_TRUE(s.ok());
859 }
860
861 TEST_P(End2endTest, RequestStreamTwoRequests) {
862   MAYBE_SKIP_TEST;
863   ResetStub();
864   EchoRequest request;
865   EchoResponse response;
866   ClientContext context;
867
868   auto stream = stub_->RequestStream(&context, &response);
869   request.set_message("hello");
870   EXPECT_TRUE(stream->Write(request));
871   EXPECT_TRUE(stream->Write(request));
872   stream->WritesDone();
873   Status s = stream->Finish();
874   EXPECT_EQ(response.message(), "hellohello");
875   EXPECT_TRUE(s.ok());
876 }
877
878 TEST_P(End2endTest, RequestStreamTwoRequestsWithWriteThrough) {
879   MAYBE_SKIP_TEST;
880   ResetStub();
881   EchoRequest request;
882   EchoResponse response;
883   ClientContext context;
884
885   auto stream = stub_->RequestStream(&context, &response);
886   request.set_message("hello");
887   EXPECT_TRUE(stream->Write(request, WriteOptions().set_write_through()));
888   EXPECT_TRUE(stream->Write(request, WriteOptions().set_write_through()));
889   stream->WritesDone();
890   Status s = stream->Finish();
891   EXPECT_EQ(response.message(), "hellohello");
892   EXPECT_TRUE(s.ok());
893 }
894
895 TEST_P(End2endTest, RequestStreamTwoRequestsWithCoalescingApi) {
896   MAYBE_SKIP_TEST;
897   ResetStub();
898   EchoRequest request;
899   EchoResponse response;
900   ClientContext context;
901
902   context.set_initial_metadata_corked(true);
903   auto stream = stub_->RequestStream(&context, &response);
904   request.set_message("hello");
905   EXPECT_TRUE(stream->Write(request));
906   stream->WriteLast(request, WriteOptions());
907   Status s = stream->Finish();
908   EXPECT_EQ(response.message(), "hellohello");
909   EXPECT_TRUE(s.ok());
910 }
911
912 TEST_P(End2endTest, ResponseStream) {
913   MAYBE_SKIP_TEST;
914   ResetStub();
915   EchoRequest request;
916   EchoResponse response;
917   ClientContext context;
918   request.set_message("hello");
919
920   auto stream = stub_->ResponseStream(&context, request);
921   for (int i = 0; i < kServerDefaultResponseStreamsToSend; ++i) {
922     EXPECT_TRUE(stream->Read(&response));
923     EXPECT_EQ(response.message(), request.message() + grpc::to_string(i));
924   }
925   EXPECT_FALSE(stream->Read(&response));
926
927   Status s = stream->Finish();
928   EXPECT_TRUE(s.ok());
929 }
930
931 TEST_P(End2endTest, ResponseStreamWithCoalescingApi) {
932   MAYBE_SKIP_TEST;
933   ResetStub();
934   EchoRequest request;
935   EchoResponse response;
936   ClientContext context;
937   request.set_message("hello");
938   context.AddMetadata(kServerUseCoalescingApi, "1");
939
940   auto stream = stub_->ResponseStream(&context, request);
941   for (int i = 0; i < kServerDefaultResponseStreamsToSend; ++i) {
942     EXPECT_TRUE(stream->Read(&response));
943     EXPECT_EQ(response.message(), request.message() + grpc::to_string(i));
944   }
945   EXPECT_FALSE(stream->Read(&response));
946
947   Status s = stream->Finish();
948   EXPECT_TRUE(s.ok());
949 }
950
951 // This was added to prevent regression from issue:
952 // https://github.com/grpc/grpc/issues/11546
953 TEST_P(End2endTest, ResponseStreamWithEverythingCoalesced) {
954   MAYBE_SKIP_TEST;
955   ResetStub();
956   EchoRequest request;
957   EchoResponse response;
958   ClientContext context;
959   request.set_message("hello");
960   context.AddMetadata(kServerUseCoalescingApi, "1");
961   // We will only send one message, forcing everything (init metadata, message,
962   // trailing) to be coalesced together.
963   context.AddMetadata(kServerResponseStreamsToSend, "1");
964
965   auto stream = stub_->ResponseStream(&context, request);
966   EXPECT_TRUE(stream->Read(&response));
967   EXPECT_EQ(response.message(), request.message() + "0");
968
969   EXPECT_FALSE(stream->Read(&response));
970
971   Status s = stream->Finish();
972   EXPECT_TRUE(s.ok());
973 }
974
975 TEST_P(End2endTest, BidiStream) {
976   MAYBE_SKIP_TEST;
977   ResetStub();
978   EchoRequest request;
979   EchoResponse response;
980   ClientContext context;
981   grpc::string msg("hello");
982
983   auto stream = stub_->BidiStream(&context);
984
985   for (int i = 0; i < kServerDefaultResponseStreamsToSend; ++i) {
986     request.set_message(msg + grpc::to_string(i));
987     EXPECT_TRUE(stream->Write(request));
988     EXPECT_TRUE(stream->Read(&response));
989     EXPECT_EQ(response.message(), request.message());
990   }
991
992   stream->WritesDone();
993   EXPECT_FALSE(stream->Read(&response));
994   EXPECT_FALSE(stream->Read(&response));
995
996   Status s = stream->Finish();
997   EXPECT_TRUE(s.ok());
998 }
999
1000 TEST_P(End2endTest, BidiStreamWithCoalescingApi) {
1001   MAYBE_SKIP_TEST;
1002   ResetStub();
1003   EchoRequest request;
1004   EchoResponse response;
1005   ClientContext context;
1006   context.AddMetadata(kServerFinishAfterNReads, "3");
1007   context.set_initial_metadata_corked(true);
1008   grpc::string msg("hello");
1009
1010   auto stream = stub_->BidiStream(&context);
1011
1012   request.set_message(msg + "0");
1013   EXPECT_TRUE(stream->Write(request));
1014   EXPECT_TRUE(stream->Read(&response));
1015   EXPECT_EQ(response.message(), request.message());
1016
1017   request.set_message(msg + "1");
1018   EXPECT_TRUE(stream->Write(request));
1019   EXPECT_TRUE(stream->Read(&response));
1020   EXPECT_EQ(response.message(), request.message());
1021
1022   request.set_message(msg + "2");
1023   stream->WriteLast(request, WriteOptions());
1024   EXPECT_TRUE(stream->Read(&response));
1025   EXPECT_EQ(response.message(), request.message());
1026
1027   EXPECT_FALSE(stream->Read(&response));
1028   EXPECT_FALSE(stream->Read(&response));
1029
1030   Status s = stream->Finish();
1031   EXPECT_TRUE(s.ok());
1032 }
1033
1034 // This was added to prevent regression from issue:
1035 // https://github.com/grpc/grpc/issues/11546
1036 TEST_P(End2endTest, BidiStreamWithEverythingCoalesced) {
1037   MAYBE_SKIP_TEST;
1038   ResetStub();
1039   EchoRequest request;
1040   EchoResponse response;
1041   ClientContext context;
1042   context.AddMetadata(kServerFinishAfterNReads, "1");
1043   context.set_initial_metadata_corked(true);
1044   grpc::string msg("hello");
1045
1046   auto stream = stub_->BidiStream(&context);
1047
1048   request.set_message(msg + "0");
1049   stream->WriteLast(request, WriteOptions());
1050   EXPECT_TRUE(stream->Read(&response));
1051   EXPECT_EQ(response.message(), request.message());
1052
1053   EXPECT_FALSE(stream->Read(&response));
1054   EXPECT_FALSE(stream->Read(&response));
1055
1056   Status s = stream->Finish();
1057   EXPECT_TRUE(s.ok());
1058 }
1059
1060 // Talk to the two services with the same name but different package names.
1061 // The two stubs are created on the same channel.
1062 TEST_P(End2endTest, DiffPackageServices) {
1063   MAYBE_SKIP_TEST;
1064   ResetStub();
1065   EchoRequest request;
1066   EchoResponse response;
1067   request.set_message("Hello");
1068
1069   ClientContext context;
1070   Status s = stub_->Echo(&context, request, &response);
1071   EXPECT_EQ(response.message(), request.message());
1072   EXPECT_TRUE(s.ok());
1073
1074   std::unique_ptr<grpc::testing::duplicate::EchoTestService::Stub> dup_pkg_stub(
1075       grpc::testing::duplicate::EchoTestService::NewStub(channel_));
1076   ClientContext context2;
1077   s = dup_pkg_stub->Echo(&context2, request, &response);
1078   EXPECT_EQ("no package", response.message());
1079   EXPECT_TRUE(s.ok());
1080 }
1081
1082 template <class ServiceType>
1083 void CancelRpc(ClientContext* context, int delay_us, ServiceType* service) {
1084   gpr_sleep_until(gpr_time_add(gpr_now(GPR_CLOCK_REALTIME),
1085                                gpr_time_from_micros(delay_us, GPR_TIMESPAN)));
1086   while (!service->signal_client()) {
1087   }
1088   context->TryCancel();
1089 }
1090
1091 TEST_P(End2endTest, CancelRpcBeforeStart) {
1092   MAYBE_SKIP_TEST;
1093   ResetStub();
1094   EchoRequest request;
1095   EchoResponse response;
1096   ClientContext context;
1097   request.set_message("hello");
1098   context.TryCancel();
1099   Status s = stub_->Echo(&context, request, &response);
1100   EXPECT_EQ("", response.message());
1101   EXPECT_EQ(grpc::StatusCode::CANCELLED, s.error_code());
1102   if (GetParam().use_interceptors) {
1103     EXPECT_EQ(20, DummyInterceptor::GetNumTimesCancel());
1104   }
1105 }
1106
1107 // Client cancels request stream after sending two messages
1108 TEST_P(End2endTest, ClientCancelsRequestStream) {
1109   MAYBE_SKIP_TEST;
1110   ResetStub();
1111   EchoRequest request;
1112   EchoResponse response;
1113   ClientContext context;
1114   request.set_message("hello");
1115
1116   auto stream = stub_->RequestStream(&context, &response);
1117   EXPECT_TRUE(stream->Write(request));
1118   EXPECT_TRUE(stream->Write(request));
1119
1120   context.TryCancel();
1121
1122   Status s = stream->Finish();
1123   EXPECT_EQ(grpc::StatusCode::CANCELLED, s.error_code());
1124
1125   EXPECT_EQ(response.message(), "");
1126   if (GetParam().use_interceptors) {
1127     EXPECT_EQ(20, DummyInterceptor::GetNumTimesCancel());
1128   }
1129 }
1130
1131 // Client cancels server stream after sending some messages
1132 TEST_P(End2endTest, ClientCancelsResponseStream) {
1133   MAYBE_SKIP_TEST;
1134   ResetStub();
1135   EchoRequest request;
1136   EchoResponse response;
1137   ClientContext context;
1138   request.set_message("hello");
1139
1140   auto stream = stub_->ResponseStream(&context, request);
1141
1142   EXPECT_TRUE(stream->Read(&response));
1143   EXPECT_EQ(response.message(), request.message() + "0");
1144   EXPECT_TRUE(stream->Read(&response));
1145   EXPECT_EQ(response.message(), request.message() + "1");
1146
1147   context.TryCancel();
1148
1149   // The cancellation races with responses, so there might be zero or
1150   // one responses pending, read till failure
1151
1152   if (stream->Read(&response)) {
1153     EXPECT_EQ(response.message(), request.message() + "2");
1154     // Since we have cancelled, we expect the next attempt to read to fail
1155     EXPECT_FALSE(stream->Read(&response));
1156   }
1157
1158   Status s = stream->Finish();
1159   // The final status could be either of CANCELLED or OK depending on
1160   // who won the race.
1161   EXPECT_GE(grpc::StatusCode::CANCELLED, s.error_code());
1162   if (GetParam().use_interceptors) {
1163     EXPECT_EQ(20, DummyInterceptor::GetNumTimesCancel());
1164   }
1165 }
1166
1167 // Client cancels bidi stream after sending some messages
1168 TEST_P(End2endTest, ClientCancelsBidi) {
1169   MAYBE_SKIP_TEST;
1170   ResetStub();
1171   EchoRequest request;
1172   EchoResponse response;
1173   ClientContext context;
1174   grpc::string msg("hello");
1175
1176   auto stream = stub_->BidiStream(&context);
1177
1178   request.set_message(msg + "0");
1179   EXPECT_TRUE(stream->Write(request));
1180   EXPECT_TRUE(stream->Read(&response));
1181   EXPECT_EQ(response.message(), request.message());
1182
1183   request.set_message(msg + "1");
1184   EXPECT_TRUE(stream->Write(request));
1185
1186   context.TryCancel();
1187
1188   // The cancellation races with responses, so there might be zero or
1189   // one responses pending, read till failure
1190
1191   if (stream->Read(&response)) {
1192     EXPECT_EQ(response.message(), request.message());
1193     // Since we have cancelled, we expect the next attempt to read to fail
1194     EXPECT_FALSE(stream->Read(&response));
1195   }
1196
1197   Status s = stream->Finish();
1198   EXPECT_EQ(grpc::StatusCode::CANCELLED, s.error_code());
1199   if (GetParam().use_interceptors) {
1200     EXPECT_EQ(20, DummyInterceptor::GetNumTimesCancel());
1201   }
1202 }
1203
1204 TEST_P(End2endTest, RpcMaxMessageSize) {
1205   MAYBE_SKIP_TEST;
1206   ResetStub();
1207   EchoRequest request;
1208   EchoResponse response;
1209   request.set_message(string(kMaxMessageSize_ * 2, 'a'));
1210   request.mutable_param()->set_server_die(true);
1211
1212   ClientContext context;
1213   Status s = stub_->Echo(&context, request, &response);
1214   EXPECT_FALSE(s.ok());
1215 }
1216
1217 void ReaderThreadFunc(ClientReaderWriter<EchoRequest, EchoResponse>* stream,
1218                       gpr_event* ev) {
1219   EchoResponse resp;
1220   gpr_event_set(ev, (void*)1);
1221   while (stream->Read(&resp)) {
1222     gpr_log(GPR_INFO, "Read message");
1223   }
1224 }
1225
1226 // Run a Read and a WritesDone simultaneously.
1227 TEST_P(End2endTest, SimultaneousReadWritesDone) {
1228   MAYBE_SKIP_TEST;
1229   ResetStub();
1230   ClientContext context;
1231   gpr_event ev;
1232   gpr_event_init(&ev);
1233   auto stream = stub_->BidiStream(&context);
1234   std::thread reader_thread(ReaderThreadFunc, stream.get(), &ev);
1235   gpr_event_wait(&ev, gpr_inf_future(GPR_CLOCK_REALTIME));
1236   stream->WritesDone();
1237   reader_thread.join();
1238   Status s = stream->Finish();
1239   EXPECT_TRUE(s.ok());
1240 }
1241
1242 TEST_P(End2endTest, ChannelState) {
1243   MAYBE_SKIP_TEST;
1244   if (GetParam().inproc) {
1245     return;
1246   }
1247
1248   ResetStub();
1249   // Start IDLE
1250   EXPECT_EQ(GRPC_CHANNEL_IDLE, channel_->GetState(false));
1251
1252   // Did not ask to connect, no state change.
1253   CompletionQueue cq;
1254   std::chrono::system_clock::time_point deadline =
1255       std::chrono::system_clock::now() + std::chrono::milliseconds(10);
1256   channel_->NotifyOnStateChange(GRPC_CHANNEL_IDLE, deadline, &cq, nullptr);
1257   void* tag;
1258   bool ok = true;
1259   cq.Next(&tag, &ok);
1260   EXPECT_FALSE(ok);
1261
1262   EXPECT_EQ(GRPC_CHANNEL_IDLE, channel_->GetState(true));
1263   EXPECT_TRUE(channel_->WaitForStateChange(GRPC_CHANNEL_IDLE,
1264                                            gpr_inf_future(GPR_CLOCK_REALTIME)));
1265   auto state = channel_->GetState(false);
1266   EXPECT_TRUE(state == GRPC_CHANNEL_CONNECTING || state == GRPC_CHANNEL_READY);
1267 }
1268
1269 // Takes 10s.
1270 TEST_P(End2endTest, ChannelStateTimeout) {
1271   if ((GetParam().credentials_type != kInsecureCredentialsType) ||
1272       GetParam().inproc) {
1273     return;
1274   }
1275   int port = grpc_pick_unused_port_or_die();
1276   std::ostringstream server_address;
1277   server_address << "127.0.0.1:" << port;
1278   // Channel to non-existing server
1279   auto channel =
1280       CreateChannel(server_address.str(), InsecureChannelCredentials());
1281   // Start IDLE
1282   EXPECT_EQ(GRPC_CHANNEL_IDLE, channel->GetState(true));
1283
1284   auto state = GRPC_CHANNEL_IDLE;
1285   for (int i = 0; i < 10; i++) {
1286     channel->WaitForStateChange(
1287         state, std::chrono::system_clock::now() + std::chrono::seconds(1));
1288     state = channel->GetState(false);
1289   }
1290 }
1291
1292 // Talking to a non-existing service.
1293 TEST_P(End2endTest, NonExistingService) {
1294   MAYBE_SKIP_TEST;
1295   ResetChannel();
1296   std::unique_ptr<grpc::testing::UnimplementedEchoService::Stub> stub;
1297   stub = grpc::testing::UnimplementedEchoService::NewStub(channel_);
1298
1299   EchoRequest request;
1300   EchoResponse response;
1301   request.set_message("Hello");
1302
1303   ClientContext context;
1304   Status s = stub->Unimplemented(&context, request, &response);
1305   EXPECT_EQ(StatusCode::UNIMPLEMENTED, s.error_code());
1306   EXPECT_EQ("", s.error_message());
1307 }
1308
1309 // Ask the server to send back a serialized proto in trailer.
1310 // This is an example of setting error details.
1311 TEST_P(End2endTest, BinaryTrailerTest) {
1312   MAYBE_SKIP_TEST;
1313   ResetStub();
1314   EchoRequest request;
1315   EchoResponse response;
1316   ClientContext context;
1317
1318   request.mutable_param()->set_echo_metadata(true);
1319   DebugInfo* info = request.mutable_param()->mutable_debug_info();
1320   info->add_stack_entries("stack_entry_1");
1321   info->add_stack_entries("stack_entry_2");
1322   info->add_stack_entries("stack_entry_3");
1323   info->set_detail("detailed debug info");
1324   grpc::string expected_string = info->SerializeAsString();
1325   request.set_message("Hello");
1326
1327   Status s = stub_->Echo(&context, request, &response);
1328   EXPECT_FALSE(s.ok());
1329   auto trailers = context.GetServerTrailingMetadata();
1330   EXPECT_EQ(1u, trailers.count(kDebugInfoTrailerKey));
1331   auto iter = trailers.find(kDebugInfoTrailerKey);
1332   EXPECT_EQ(expected_string, iter->second);
1333   // Parse the returned trailer into a DebugInfo proto.
1334   DebugInfo returned_info;
1335   EXPECT_TRUE(returned_info.ParseFromString(ToString(iter->second)));
1336 }
1337
1338 TEST_P(End2endTest, ExpectErrorTest) {
1339   MAYBE_SKIP_TEST;
1340   ResetStub();
1341
1342   std::vector<ErrorStatus> expected_status;
1343   expected_status.emplace_back();
1344   expected_status.back().set_code(13);  // INTERNAL
1345   // No Error message or details
1346
1347   expected_status.emplace_back();
1348   expected_status.back().set_code(13);  // INTERNAL
1349   expected_status.back().set_error_message("text error message");
1350   expected_status.back().set_binary_error_details("text error details");
1351
1352   expected_status.emplace_back();
1353   expected_status.back().set_code(13);  // INTERNAL
1354   expected_status.back().set_error_message("text error message");
1355   expected_status.back().set_binary_error_details(
1356       "\x0\x1\x2\x3\x4\x5\x6\x8\x9\xA\xB");
1357
1358   for (auto iter = expected_status.begin(); iter != expected_status.end();
1359        ++iter) {
1360     EchoRequest request;
1361     EchoResponse response;
1362     ClientContext context;
1363     request.set_message("Hello");
1364     auto* error = request.mutable_param()->mutable_expected_error();
1365     error->set_code(iter->code());
1366     error->set_error_message(iter->error_message());
1367     error->set_binary_error_details(iter->binary_error_details());
1368
1369     Status s = stub_->Echo(&context, request, &response);
1370     EXPECT_FALSE(s.ok());
1371     EXPECT_EQ(iter->code(), s.error_code());
1372     EXPECT_EQ(iter->error_message(), s.error_message());
1373     EXPECT_EQ(iter->binary_error_details(), s.error_details());
1374     EXPECT_TRUE(context.debug_error_string().find("created") !=
1375                 std::string::npos);
1376     EXPECT_TRUE(context.debug_error_string().find("file") != std::string::npos);
1377     EXPECT_TRUE(context.debug_error_string().find("line") != std::string::npos);
1378     EXPECT_TRUE(context.debug_error_string().find("status") !=
1379                 std::string::npos);
1380     EXPECT_TRUE(context.debug_error_string().find("13") != std::string::npos);
1381   }
1382 }
1383
1384 TEST_P(End2endTest, DelayedRpcEarlyCanceledUsingCancelCallback) {
1385   MAYBE_SKIP_TEST;
1386   // This test case is only relevant with callback server.
1387   // Additionally, using interceptors makes this test subject to
1388   // timing-dependent failures if the interceptors take too long to run.
1389   if (!GetParam().callback_server || GetParam().use_interceptors) {
1390     return;
1391   }
1392
1393   ResetStub();
1394   ClientContext context;
1395   context.AddMetadata(kServerUseCancelCallback,
1396                       grpc::to_string(MAYBE_USE_CALLBACK_EARLY_CANCEL));
1397   EchoRequest request;
1398   EchoResponse response;
1399   request.set_message("Hello");
1400   request.mutable_param()->set_skip_cancelled_check(true);
1401   context.TryCancel();
1402   Status s = stub_->Echo(&context, request, &response);
1403   EXPECT_EQ(StatusCode::CANCELLED, s.error_code());
1404 }
1405
1406 TEST_P(End2endTest, DelayedRpcLateCanceledUsingCancelCallback) {
1407   MAYBE_SKIP_TEST;
1408   // This test case is only relevant with callback server.
1409   // Additionally, using interceptors makes this test subject to
1410   // timing-dependent failures if the interceptors take too long to run.
1411   if (!GetParam().callback_server || GetParam().use_interceptors) {
1412     return;
1413   }
1414
1415   ResetStub();
1416   ClientContext context;
1417   context.AddMetadata(kServerUseCancelCallback,
1418                       grpc::to_string(MAYBE_USE_CALLBACK_LATE_CANCEL));
1419   EchoRequest request;
1420   EchoResponse response;
1421   request.set_message("Hello");
1422   request.mutable_param()->set_skip_cancelled_check(true);
1423   // Let server sleep for 80 ms first to give the cancellation a chance.
1424   // This is split into 40 ms to start the cancel and 40 ms extra time for
1425   // it to make it to the server, to make it highly probable that the server
1426   // RPC would have already started by the time the cancellation is sent
1427   // and the server-side gets enough time to react to it.
1428   request.mutable_param()->set_server_sleep_us(80 * 1000);
1429
1430   std::thread echo_thread{[this, &context, &request, &response] {
1431     Status s = stub_->Echo(&context, request, &response);
1432     EXPECT_EQ(StatusCode::CANCELLED, s.error_code());
1433   }};
1434   std::this_thread::sleep_for(std::chrono::microseconds(40000));
1435   context.TryCancel();
1436   echo_thread.join();
1437 }
1438
1439 TEST_P(End2endTest, DelayedRpcNonCanceledUsingCancelCallback) {
1440   MAYBE_SKIP_TEST;
1441   if (!GetParam().callback_server) {
1442     return;
1443   }
1444
1445   ResetStub();
1446   EchoRequest request;
1447   EchoResponse response;
1448   request.set_message("Hello");
1449
1450   ClientContext context;
1451   context.AddMetadata(kServerUseCancelCallback,
1452                       grpc::to_string(MAYBE_USE_CALLBACK_NO_CANCEL));
1453
1454   Status s = stub_->Echo(&context, request, &response);
1455   EXPECT_TRUE(s.ok());
1456 }
1457
1458 //////////////////////////////////////////////////////////////////////////
1459 // Test with and without a proxy.
1460 class ProxyEnd2endTest : public End2endTest {
1461  protected:
1462 };
1463
1464 TEST_P(ProxyEnd2endTest, SimpleRpc) {
1465   MAYBE_SKIP_TEST;
1466   ResetStub();
1467   SendRpc(stub_.get(), 1, false);
1468 }
1469
1470 TEST_P(ProxyEnd2endTest, SimpleRpcWithEmptyMessages) {
1471   MAYBE_SKIP_TEST;
1472   ResetStub();
1473   EchoRequest request;
1474   EchoResponse response;
1475
1476   ClientContext context;
1477   Status s = stub_->Echo(&context, request, &response);
1478   EXPECT_TRUE(s.ok());
1479 }
1480
1481 TEST_P(ProxyEnd2endTest, MultipleRpcs) {
1482   MAYBE_SKIP_TEST;
1483   ResetStub();
1484   std::vector<std::thread> threads;
1485   threads.reserve(10);
1486   for (int i = 0; i < 10; ++i) {
1487     threads.emplace_back(SendRpc, stub_.get(), 10, false);
1488   }
1489   for (int i = 0; i < 10; ++i) {
1490     threads[i].join();
1491   }
1492 }
1493
1494 // Set a 10us deadline and make sure proper error is returned.
1495 TEST_P(ProxyEnd2endTest, RpcDeadlineExpires) {
1496   MAYBE_SKIP_TEST;
1497   ResetStub();
1498   EchoRequest request;
1499   EchoResponse response;
1500   request.set_message("Hello");
1501   request.mutable_param()->set_skip_cancelled_check(true);
1502   // Let server sleep for 40 ms first to guarantee expiry.
1503   // 40 ms might seem a bit extreme but the timer manager would have been just
1504   // initialized (when ResetStub() was called) and there are some warmup costs
1505   // i.e the timer thread many not have even started. There might also be other
1506   // delays in the timer manager thread (in acquiring locks, timer data
1507   // structure manipulations, starting backup timer threads) that add to the
1508   // delays. 40ms is still not enough in some cases but this significantly
1509   // reduces the test flakes
1510   request.mutable_param()->set_server_sleep_us(40 * 1000);
1511
1512   ClientContext context;
1513   std::chrono::system_clock::time_point deadline =
1514       std::chrono::system_clock::now() + std::chrono::milliseconds(1);
1515   context.set_deadline(deadline);
1516   Status s = stub_->Echo(&context, request, &response);
1517   EXPECT_EQ(StatusCode::DEADLINE_EXCEEDED, s.error_code());
1518 }
1519
1520 // Set a long but finite deadline.
1521 TEST_P(ProxyEnd2endTest, RpcLongDeadline) {
1522   MAYBE_SKIP_TEST;
1523   ResetStub();
1524   EchoRequest request;
1525   EchoResponse response;
1526   request.set_message("Hello");
1527
1528   ClientContext context;
1529   std::chrono::system_clock::time_point deadline =
1530       std::chrono::system_clock::now() + std::chrono::hours(1);
1531   context.set_deadline(deadline);
1532   Status s = stub_->Echo(&context, request, &response);
1533   EXPECT_EQ(response.message(), request.message());
1534   EXPECT_TRUE(s.ok());
1535 }
1536
1537 // Ask server to echo back the deadline it sees.
1538 TEST_P(ProxyEnd2endTest, EchoDeadline) {
1539   MAYBE_SKIP_TEST;
1540   ResetStub();
1541   EchoRequest request;
1542   EchoResponse response;
1543   request.set_message("Hello");
1544   request.mutable_param()->set_echo_deadline(true);
1545
1546   ClientContext context;
1547   std::chrono::system_clock::time_point deadline =
1548       std::chrono::system_clock::now() + std::chrono::seconds(100);
1549   context.set_deadline(deadline);
1550   Status s = stub_->Echo(&context, request, &response);
1551   EXPECT_EQ(response.message(), request.message());
1552   EXPECT_TRUE(s.ok());
1553   gpr_timespec sent_deadline;
1554   Timepoint2Timespec(deadline, &sent_deadline);
1555   // We want to allow some reasonable error given:
1556   // - request_deadline() only has 1sec resolution so the best we can do is +-1
1557   // - if sent_deadline.tv_nsec is very close to the next second's boundary we
1558   // can end up being off by 2 in one direction.
1559   EXPECT_LE(response.param().request_deadline() - sent_deadline.tv_sec, 2);
1560   EXPECT_GE(response.param().request_deadline() - sent_deadline.tv_sec, -1);
1561 }
1562
1563 // Ask server to echo back the deadline it sees. The rpc has no deadline.
1564 TEST_P(ProxyEnd2endTest, EchoDeadlineForNoDeadlineRpc) {
1565   MAYBE_SKIP_TEST;
1566   ResetStub();
1567   EchoRequest request;
1568   EchoResponse response;
1569   request.set_message("Hello");
1570   request.mutable_param()->set_echo_deadline(true);
1571
1572   ClientContext context;
1573   Status s = stub_->Echo(&context, request, &response);
1574   EXPECT_EQ(response.message(), request.message());
1575   EXPECT_TRUE(s.ok());
1576   EXPECT_EQ(response.param().request_deadline(),
1577             gpr_inf_future(GPR_CLOCK_REALTIME).tv_sec);
1578 }
1579
1580 TEST_P(ProxyEnd2endTest, UnimplementedRpc) {
1581   MAYBE_SKIP_TEST;
1582   ResetStub();
1583   EchoRequest request;
1584   EchoResponse response;
1585   request.set_message("Hello");
1586
1587   ClientContext context;
1588   Status s = stub_->Unimplemented(&context, request, &response);
1589   EXPECT_FALSE(s.ok());
1590   EXPECT_EQ(s.error_code(), grpc::StatusCode::UNIMPLEMENTED);
1591   EXPECT_EQ(s.error_message(), "");
1592   EXPECT_EQ(response.message(), "");
1593 }
1594
1595 // Client cancels rpc after 10ms
1596 TEST_P(ProxyEnd2endTest, ClientCancelsRpc) {
1597   MAYBE_SKIP_TEST;
1598   ResetStub();
1599   EchoRequest request;
1600   EchoResponse response;
1601   request.set_message("Hello");
1602   const int kCancelDelayUs = 10 * 1000;
1603   request.mutable_param()->set_client_cancel_after_us(kCancelDelayUs);
1604
1605   ClientContext context;
1606   std::thread cancel_thread;
1607   if (!GetParam().callback_server) {
1608     cancel_thread = std::thread(
1609         [&context, this](int delay) { CancelRpc(&context, delay, &service_); },
1610         kCancelDelayUs);
1611     // Note: the unusual pattern above (and below) is caused by a conflict
1612     // between two sets of compiler expectations. clang allows const to be
1613     // captured without mention, so there is no need to capture kCancelDelayUs
1614     // (and indeed clang-tidy complains if you do so). OTOH, a Windows compiler
1615     // in our tests requires an explicit capture even for const. We square this
1616     // circle by passing the const value in as an argument to the lambda.
1617   } else {
1618     cancel_thread = std::thread(
1619         [&context, this](int delay) {
1620           CancelRpc(&context, delay, &callback_service_);
1621         },
1622         kCancelDelayUs);
1623   }
1624   Status s = stub_->Echo(&context, request, &response);
1625   cancel_thread.join();
1626   EXPECT_EQ(StatusCode::CANCELLED, s.error_code());
1627   EXPECT_EQ(s.error_message(), "Cancelled");
1628 }
1629
1630 // Server cancels rpc after 1ms
1631 TEST_P(ProxyEnd2endTest, ServerCancelsRpc) {
1632   MAYBE_SKIP_TEST;
1633   ResetStub();
1634   EchoRequest request;
1635   EchoResponse response;
1636   request.set_message("Hello");
1637   request.mutable_param()->set_server_cancel_after_us(1000);
1638
1639   ClientContext context;
1640   Status s = stub_->Echo(&context, request, &response);
1641   EXPECT_EQ(StatusCode::CANCELLED, s.error_code());
1642   EXPECT_TRUE(s.error_message().empty());
1643 }
1644
1645 // Make the response larger than the flow control window.
1646 TEST_P(ProxyEnd2endTest, HugeResponse) {
1647   MAYBE_SKIP_TEST;
1648   ResetStub();
1649   EchoRequest request;
1650   EchoResponse response;
1651   request.set_message("huge response");
1652   const size_t kResponseSize = 1024 * (1024 + 10);
1653   request.mutable_param()->set_response_message_length(kResponseSize);
1654
1655   ClientContext context;
1656   std::chrono::system_clock::time_point deadline =
1657       std::chrono::system_clock::now() + std::chrono::seconds(20);
1658   context.set_deadline(deadline);
1659   Status s = stub_->Echo(&context, request, &response);
1660   EXPECT_EQ(kResponseSize, response.message().size());
1661   EXPECT_TRUE(s.ok());
1662 }
1663
1664 TEST_P(ProxyEnd2endTest, Peer) {
1665   MAYBE_SKIP_TEST;
1666   // Peer is not meaningful for inproc
1667   if (GetParam().inproc) {
1668     return;
1669   }
1670   ResetStub();
1671   EchoRequest request;
1672   EchoResponse response;
1673   request.set_message("hello");
1674   request.mutable_param()->set_echo_peer(true);
1675
1676   ClientContext context;
1677   Status s = stub_->Echo(&context, request, &response);
1678   EXPECT_EQ(response.message(), request.message());
1679   EXPECT_TRUE(s.ok());
1680   EXPECT_TRUE(CheckIsLocalhost(response.param().peer()));
1681   EXPECT_TRUE(CheckIsLocalhost(context.peer()));
1682 }
1683
1684 //////////////////////////////////////////////////////////////////////////
1685 class SecureEnd2endTest : public End2endTest {
1686  protected:
1687   SecureEnd2endTest() {
1688     GPR_ASSERT(!GetParam().use_proxy);
1689     GPR_ASSERT(GetParam().credentials_type != kInsecureCredentialsType);
1690   }
1691 };
1692
1693 TEST_P(SecureEnd2endTest, SimpleRpcWithHost) {
1694   MAYBE_SKIP_TEST;
1695   ResetStub();
1696
1697   EchoRequest request;
1698   EchoResponse response;
1699   request.set_message("Hello");
1700
1701   ClientContext context;
1702   context.set_authority("foo.test.youtube.com");
1703   Status s = stub_->Echo(&context, request, &response);
1704   EXPECT_EQ(response.message(), request.message());
1705   EXPECT_TRUE(response.has_param());
1706   EXPECT_EQ("special", response.param().host());
1707   EXPECT_TRUE(s.ok());
1708 }
1709
1710 bool MetadataContains(
1711     const std::multimap<grpc::string_ref, grpc::string_ref>& metadata,
1712     const grpc::string& key, const grpc::string& value) {
1713   int count = 0;
1714
1715   for (std::multimap<grpc::string_ref, grpc::string_ref>::const_iterator iter =
1716            metadata.begin();
1717        iter != metadata.end(); ++iter) {
1718     if (ToString(iter->first) == key && ToString(iter->second) == value) {
1719       count++;
1720     }
1721   }
1722   return count == 1;
1723 }
1724
1725 TEST_P(SecureEnd2endTest, BlockingAuthMetadataPluginAndProcessorSuccess) {
1726   MAYBE_SKIP_TEST;
1727   auto* processor = new TestAuthMetadataProcessor(true);
1728   StartServer(std::shared_ptr<AuthMetadataProcessor>(processor));
1729   ResetStub();
1730   EchoRequest request;
1731   EchoResponse response;
1732   ClientContext context;
1733   context.set_credentials(processor->GetCompatibleClientCreds());
1734   request.set_message("Hello");
1735   request.mutable_param()->set_echo_metadata(true);
1736   request.mutable_param()->set_expected_client_identity(
1737       TestAuthMetadataProcessor::kGoodGuy);
1738   request.mutable_param()->set_expected_transport_security_type(
1739       GetParam().credentials_type);
1740
1741   Status s = stub_->Echo(&context, request, &response);
1742   EXPECT_EQ(request.message(), response.message());
1743   EXPECT_TRUE(s.ok());
1744
1745   // Metadata should have been consumed by the processor.
1746   EXPECT_FALSE(MetadataContains(
1747       context.GetServerTrailingMetadata(), GRPC_AUTHORIZATION_METADATA_KEY,
1748       grpc::string("Bearer ") + TestAuthMetadataProcessor::kGoodGuy));
1749 }
1750
1751 TEST_P(SecureEnd2endTest, BlockingAuthMetadataPluginAndProcessorFailure) {
1752   MAYBE_SKIP_TEST;
1753   auto* processor = new TestAuthMetadataProcessor(true);
1754   StartServer(std::shared_ptr<AuthMetadataProcessor>(processor));
1755   ResetStub();
1756   EchoRequest request;
1757   EchoResponse response;
1758   ClientContext context;
1759   context.set_credentials(processor->GetIncompatibleClientCreds());
1760   request.set_message("Hello");
1761
1762   Status s = stub_->Echo(&context, request, &response);
1763   EXPECT_FALSE(s.ok());
1764   EXPECT_EQ(s.error_code(), StatusCode::UNAUTHENTICATED);
1765 }
1766
1767 TEST_P(SecureEnd2endTest, SetPerCallCredentials) {
1768   MAYBE_SKIP_TEST;
1769   ResetStub();
1770   EchoRequest request;
1771   EchoResponse response;
1772   ClientContext context;
1773   std::shared_ptr<CallCredentials> creds =
1774       GoogleIAMCredentials("fake_token", "fake_selector");
1775   context.set_credentials(creds);
1776   request.set_message("Hello");
1777   request.mutable_param()->set_echo_metadata(true);
1778
1779   Status s = stub_->Echo(&context, request, &response);
1780   EXPECT_EQ(request.message(), response.message());
1781   EXPECT_TRUE(s.ok());
1782   EXPECT_TRUE(MetadataContains(context.GetServerTrailingMetadata(),
1783                                GRPC_IAM_AUTHORIZATION_TOKEN_METADATA_KEY,
1784                                "fake_token"));
1785   EXPECT_TRUE(MetadataContains(context.GetServerTrailingMetadata(),
1786                                GRPC_IAM_AUTHORITY_SELECTOR_METADATA_KEY,
1787                                "fake_selector"));
1788 }
1789
1790 TEST_P(SecureEnd2endTest, OverridePerCallCredentials) {
1791   MAYBE_SKIP_TEST;
1792   ResetStub();
1793   EchoRequest request;
1794   EchoResponse response;
1795   ClientContext context;
1796   std::shared_ptr<CallCredentials> creds1 =
1797       GoogleIAMCredentials("fake_token1", "fake_selector1");
1798   context.set_credentials(creds1);
1799   std::shared_ptr<CallCredentials> creds2 =
1800       GoogleIAMCredentials("fake_token2", "fake_selector2");
1801   context.set_credentials(creds2);
1802   request.set_message("Hello");
1803   request.mutable_param()->set_echo_metadata(true);
1804
1805   Status s = stub_->Echo(&context, request, &response);
1806   EXPECT_TRUE(MetadataContains(context.GetServerTrailingMetadata(),
1807                                GRPC_IAM_AUTHORIZATION_TOKEN_METADATA_KEY,
1808                                "fake_token2"));
1809   EXPECT_TRUE(MetadataContains(context.GetServerTrailingMetadata(),
1810                                GRPC_IAM_AUTHORITY_SELECTOR_METADATA_KEY,
1811                                "fake_selector2"));
1812   EXPECT_FALSE(MetadataContains(context.GetServerTrailingMetadata(),
1813                                 GRPC_IAM_AUTHORIZATION_TOKEN_METADATA_KEY,
1814                                 "fake_token1"));
1815   EXPECT_FALSE(MetadataContains(context.GetServerTrailingMetadata(),
1816                                 GRPC_IAM_AUTHORITY_SELECTOR_METADATA_KEY,
1817                                 "fake_selector1"));
1818   EXPECT_EQ(request.message(), response.message());
1819   EXPECT_TRUE(s.ok());
1820 }
1821
1822 TEST_P(SecureEnd2endTest, AuthMetadataPluginKeyFailure) {
1823   MAYBE_SKIP_TEST;
1824   ResetStub();
1825   EchoRequest request;
1826   EchoResponse response;
1827   ClientContext context;
1828   context.set_credentials(
1829       MetadataCredentialsFromPlugin(std::unique_ptr<MetadataCredentialsPlugin>(
1830           new TestMetadataCredentialsPlugin(
1831               TestMetadataCredentialsPlugin::kBadMetadataKey,
1832               "Does not matter, will fail the key is invalid.", false, true))));
1833   request.set_message("Hello");
1834
1835   Status s = stub_->Echo(&context, request, &response);
1836   EXPECT_FALSE(s.ok());
1837   EXPECT_EQ(s.error_code(), StatusCode::UNAVAILABLE);
1838 }
1839
1840 TEST_P(SecureEnd2endTest, AuthMetadataPluginValueFailure) {
1841   MAYBE_SKIP_TEST;
1842   ResetStub();
1843   EchoRequest request;
1844   EchoResponse response;
1845   ClientContext context;
1846   context.set_credentials(
1847       MetadataCredentialsFromPlugin(std::unique_ptr<MetadataCredentialsPlugin>(
1848           new TestMetadataCredentialsPlugin(
1849               TestMetadataCredentialsPlugin::kGoodMetadataKey,
1850               "With illegal \n value.", false, true))));
1851   request.set_message("Hello");
1852
1853   Status s = stub_->Echo(&context, request, &response);
1854   EXPECT_FALSE(s.ok());
1855   EXPECT_EQ(s.error_code(), StatusCode::UNAVAILABLE);
1856 }
1857
1858 TEST_P(SecureEnd2endTest, NonBlockingAuthMetadataPluginFailure) {
1859   MAYBE_SKIP_TEST;
1860   ResetStub();
1861   EchoRequest request;
1862   EchoResponse response;
1863   ClientContext context;
1864   context.set_credentials(
1865       MetadataCredentialsFromPlugin(std::unique_ptr<MetadataCredentialsPlugin>(
1866           new TestMetadataCredentialsPlugin(
1867               TestMetadataCredentialsPlugin::kGoodMetadataKey,
1868               "Does not matter, will fail anyway (see 3rd param)", false,
1869               false))));
1870   request.set_message("Hello");
1871
1872   Status s = stub_->Echo(&context, request, &response);
1873   EXPECT_FALSE(s.ok());
1874   EXPECT_EQ(s.error_code(), StatusCode::UNAVAILABLE);
1875   EXPECT_EQ(s.error_message(),
1876             grpc::string("Getting metadata from plugin failed with error: ") +
1877                 kTestCredsPluginErrorMsg);
1878 }
1879
1880 TEST_P(SecureEnd2endTest, NonBlockingAuthMetadataPluginAndProcessorSuccess) {
1881   MAYBE_SKIP_TEST;
1882   auto* processor = new TestAuthMetadataProcessor(false);
1883   StartServer(std::shared_ptr<AuthMetadataProcessor>(processor));
1884   ResetStub();
1885   EchoRequest request;
1886   EchoResponse response;
1887   ClientContext context;
1888   context.set_credentials(processor->GetCompatibleClientCreds());
1889   request.set_message("Hello");
1890   request.mutable_param()->set_echo_metadata(true);
1891   request.mutable_param()->set_expected_client_identity(
1892       TestAuthMetadataProcessor::kGoodGuy);
1893   request.mutable_param()->set_expected_transport_security_type(
1894       GetParam().credentials_type);
1895
1896   Status s = stub_->Echo(&context, request, &response);
1897   EXPECT_EQ(request.message(), response.message());
1898   EXPECT_TRUE(s.ok());
1899
1900   // Metadata should have been consumed by the processor.
1901   EXPECT_FALSE(MetadataContains(
1902       context.GetServerTrailingMetadata(), GRPC_AUTHORIZATION_METADATA_KEY,
1903       grpc::string("Bearer ") + TestAuthMetadataProcessor::kGoodGuy));
1904 }
1905
1906 TEST_P(SecureEnd2endTest, NonBlockingAuthMetadataPluginAndProcessorFailure) {
1907   MAYBE_SKIP_TEST;
1908   auto* processor = new TestAuthMetadataProcessor(false);
1909   StartServer(std::shared_ptr<AuthMetadataProcessor>(processor));
1910   ResetStub();
1911   EchoRequest request;
1912   EchoResponse response;
1913   ClientContext context;
1914   context.set_credentials(processor->GetIncompatibleClientCreds());
1915   request.set_message("Hello");
1916
1917   Status s = stub_->Echo(&context, request, &response);
1918   EXPECT_FALSE(s.ok());
1919   EXPECT_EQ(s.error_code(), StatusCode::UNAUTHENTICATED);
1920 }
1921
1922 TEST_P(SecureEnd2endTest, BlockingAuthMetadataPluginFailure) {
1923   MAYBE_SKIP_TEST;
1924   ResetStub();
1925   EchoRequest request;
1926   EchoResponse response;
1927   ClientContext context;
1928   context.set_credentials(
1929       MetadataCredentialsFromPlugin(std::unique_ptr<MetadataCredentialsPlugin>(
1930           new TestMetadataCredentialsPlugin(
1931               TestMetadataCredentialsPlugin::kGoodMetadataKey,
1932               "Does not matter, will fail anyway (see 3rd param)", true,
1933               false))));
1934   request.set_message("Hello");
1935
1936   Status s = stub_->Echo(&context, request, &response);
1937   EXPECT_FALSE(s.ok());
1938   EXPECT_EQ(s.error_code(), StatusCode::UNAVAILABLE);
1939   EXPECT_EQ(s.error_message(),
1940             grpc::string("Getting metadata from plugin failed with error: ") +
1941                 kTestCredsPluginErrorMsg);
1942 }
1943
1944 TEST_P(SecureEnd2endTest, CompositeCallCreds) {
1945   MAYBE_SKIP_TEST;
1946   ResetStub();
1947   EchoRequest request;
1948   EchoResponse response;
1949   ClientContext context;
1950   const char kMetadataKey1[] = "call-creds-key1";
1951   const char kMetadataKey2[] = "call-creds-key2";
1952   const char kMetadataVal1[] = "call-creds-val1";
1953   const char kMetadataVal2[] = "call-creds-val2";
1954
1955   context.set_credentials(CompositeCallCredentials(
1956       MetadataCredentialsFromPlugin(std::unique_ptr<MetadataCredentialsPlugin>(
1957           new TestMetadataCredentialsPlugin(kMetadataKey1, kMetadataVal1, true,
1958                                             true))),
1959       MetadataCredentialsFromPlugin(std::unique_ptr<MetadataCredentialsPlugin>(
1960           new TestMetadataCredentialsPlugin(kMetadataKey2, kMetadataVal2, true,
1961                                             true)))));
1962   request.set_message("Hello");
1963   request.mutable_param()->set_echo_metadata(true);
1964
1965   Status s = stub_->Echo(&context, request, &response);
1966   EXPECT_TRUE(s.ok());
1967   EXPECT_TRUE(MetadataContains(context.GetServerTrailingMetadata(),
1968                                kMetadataKey1, kMetadataVal1));
1969   EXPECT_TRUE(MetadataContains(context.GetServerTrailingMetadata(),
1970                                kMetadataKey2, kMetadataVal2));
1971 }
1972
1973 TEST_P(SecureEnd2endTest, ClientAuthContext) {
1974   MAYBE_SKIP_TEST;
1975   ResetStub();
1976   EchoRequest request;
1977   EchoResponse response;
1978   request.set_message("Hello");
1979   request.mutable_param()->set_check_auth_context(GetParam().credentials_type ==
1980                                                   kTlsCredentialsType);
1981   request.mutable_param()->set_expected_transport_security_type(
1982       GetParam().credentials_type);
1983   ClientContext context;
1984   Status s = stub_->Echo(&context, request, &response);
1985   EXPECT_EQ(response.message(), request.message());
1986   EXPECT_TRUE(s.ok());
1987
1988   std::shared_ptr<const AuthContext> auth_ctx = context.auth_context();
1989   std::vector<grpc::string_ref> tst =
1990       auth_ctx->FindPropertyValues("transport_security_type");
1991   ASSERT_EQ(1u, tst.size());
1992   EXPECT_EQ(GetParam().credentials_type, ToString(tst[0]));
1993   if (GetParam().credentials_type == kTlsCredentialsType) {
1994     EXPECT_EQ("x509_subject_alternative_name",
1995               auth_ctx->GetPeerIdentityPropertyName());
1996     EXPECT_EQ(4u, auth_ctx->GetPeerIdentity().size());
1997     EXPECT_EQ("*.test.google.fr", ToString(auth_ctx->GetPeerIdentity()[0]));
1998     EXPECT_EQ("waterzooi.test.google.be",
1999               ToString(auth_ctx->GetPeerIdentity()[1]));
2000     EXPECT_EQ("*.test.youtube.com", ToString(auth_ctx->GetPeerIdentity()[2]));
2001     EXPECT_EQ("192.168.1.3", ToString(auth_ctx->GetPeerIdentity()[3]));
2002   }
2003 }
2004
2005 class ResourceQuotaEnd2endTest : public End2endTest {
2006  public:
2007   ResourceQuotaEnd2endTest()
2008       : server_resource_quota_("server_resource_quota") {}
2009
2010   virtual void ConfigureServerBuilder(ServerBuilder* builder) override {
2011     builder->SetResourceQuota(server_resource_quota_);
2012   }
2013
2014  private:
2015   ResourceQuota server_resource_quota_;
2016 };
2017
2018 TEST_P(ResourceQuotaEnd2endTest, SimpleRequest) {
2019   MAYBE_SKIP_TEST;
2020   ResetStub();
2021
2022   EchoRequest request;
2023   EchoResponse response;
2024   request.set_message("Hello");
2025
2026   ClientContext context;
2027   Status s = stub_->Echo(&context, request, &response);
2028   EXPECT_EQ(response.message(), request.message());
2029   EXPECT_TRUE(s.ok());
2030 }
2031
2032 // TODO(vjpai): refactor arguments into a struct if it makes sense
2033 std::vector<TestScenario> CreateTestScenarios(bool use_proxy,
2034                                               bool test_insecure,
2035                                               bool test_secure,
2036                                               bool test_inproc,
2037                                               bool test_callback_server) {
2038   std::vector<TestScenario> scenarios;
2039   std::vector<grpc::string> credentials_types;
2040   if (test_secure) {
2041     credentials_types =
2042         GetCredentialsProvider()->GetSecureCredentialsTypeList();
2043   }
2044   auto insec_ok = [] {
2045     // Only allow insecure credentials type when it is registered with the
2046     // provider. User may create providers that do not have insecure.
2047     return GetCredentialsProvider()->GetChannelCredentials(
2048                kInsecureCredentialsType, nullptr) != nullptr;
2049   };
2050   if (test_insecure && insec_ok()) {
2051     credentials_types.push_back(kInsecureCredentialsType);
2052   }
2053
2054   // Test callback with inproc or if the event-engine allows it
2055   GPR_ASSERT(!credentials_types.empty());
2056   for (const auto& cred : credentials_types) {
2057     scenarios.emplace_back(false, false, false, cred, false);
2058     scenarios.emplace_back(true, false, false, cred, false);
2059     if (test_callback_server) {
2060       // Note that these scenarios will be dynamically disabled if the event
2061       // engine doesn't run in the background
2062       scenarios.emplace_back(false, false, false, cred, true);
2063       scenarios.emplace_back(true, false, false, cred, true);
2064     }
2065     if (use_proxy) {
2066       scenarios.emplace_back(false, true, false, cred, false);
2067       scenarios.emplace_back(true, true, false, cred, false);
2068     }
2069   }
2070   if (test_inproc && insec_ok()) {
2071     scenarios.emplace_back(false, false, true, kInsecureCredentialsType, false);
2072     scenarios.emplace_back(true, false, true, kInsecureCredentialsType, false);
2073     if (test_callback_server) {
2074       scenarios.emplace_back(false, false, true, kInsecureCredentialsType,
2075                              true);
2076       scenarios.emplace_back(true, false, true, kInsecureCredentialsType, true);
2077     }
2078   }
2079   return scenarios;
2080 }
2081
2082 INSTANTIATE_TEST_CASE_P(
2083     End2end, End2endTest,
2084     ::testing::ValuesIn(CreateTestScenarios(false, true, true, true, true)));
2085
2086 INSTANTIATE_TEST_CASE_P(
2087     End2endServerTryCancel, End2endServerTryCancelTest,
2088     ::testing::ValuesIn(CreateTestScenarios(false, true, true, true, true)));
2089
2090 INSTANTIATE_TEST_CASE_P(
2091     ProxyEnd2end, ProxyEnd2endTest,
2092     ::testing::ValuesIn(CreateTestScenarios(true, true, true, true, true)));
2093
2094 INSTANTIATE_TEST_CASE_P(
2095     SecureEnd2end, SecureEnd2endTest,
2096     ::testing::ValuesIn(CreateTestScenarios(false, false, true, false, true)));
2097
2098 INSTANTIATE_TEST_CASE_P(
2099     ResourceQuotaEnd2end, ResourceQuotaEnd2endTest,
2100     ::testing::ValuesIn(CreateTestScenarios(false, true, true, true, true)));
2101
2102 }  // namespace
2103 }  // namespace testing
2104 }  // namespace grpc
2105
2106 int main(int argc, char** argv) {
2107   gpr_setenv("GRPC_CLIENT_CHANNEL_BACKUP_POLL_INTERVAL_MS", "200");
2108   grpc::testing::TestEnvironment env(argc, argv);
2109   ::testing::InitGoogleTest(&argc, argv);
2110   return RUN_ALL_TESTS();
2111 }