Imported Upstream version 1.34.0
[platform/upstream/grpc.git] / test / cpp / end2end / test_service_impl.h
1 /*
2  *
3  * Copyright 2016 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18
19 #ifndef GRPC_TEST_CPP_END2END_TEST_SERVICE_IMPL_H
20 #define GRPC_TEST_CPP_END2END_TEST_SERVICE_IMPL_H
21
22 #include <condition_variable>
23 #include <memory>
24 #include <mutex>
25
26 #include <grpc/grpc.h>
27 #include <grpc/support/log.h>
28 #include <grpcpp/alarm.h>
29 #include <grpcpp/security/credentials.h>
30 #include <grpcpp/server_context.h>
31 #include <gtest/gtest.h>
32
33 #include <string>
34 #include <thread>
35
36 #include "src/proto/grpc/testing/echo.grpc.pb.h"
37 #include "test/cpp/util/string_ref_helper.h"
38
39 namespace grpc {
40 namespace testing {
41
42 const int kServerDefaultResponseStreamsToSend = 3;
43 const char* const kServerResponseStreamsToSend = "server_responses_to_send";
44 const char* const kServerTryCancelRequest = "server_try_cancel";
45 const char* const kDebugInfoTrailerKey = "debug-info-bin";
46 const char* const kServerFinishAfterNReads = "server_finish_after_n_reads";
47 const char* const kServerUseCoalescingApi = "server_use_coalescing_api";
48 const char* const kCheckClientInitialMetadataKey = "custom_client_metadata";
49 const char* const kCheckClientInitialMetadataVal = "Value for client metadata";
50
51 typedef enum {
52   DO_NOT_CANCEL = 0,
53   CANCEL_BEFORE_PROCESSING,
54   CANCEL_DURING_PROCESSING,
55   CANCEL_AFTER_PROCESSING
56 } ServerTryCancelRequestPhase;
57
58 namespace internal {
59 // When echo_deadline is requested, deadline seen in the ServerContext is set in
60 // the response in seconds.
61 void MaybeEchoDeadline(experimental::ServerContextBase* context,
62                        const EchoRequest* request, EchoResponse* response);
63
64 void CheckServerAuthContext(const experimental::ServerContextBase* context,
65                             const std::string& expected_transport_security_type,
66                             const std::string& expected_client_identity);
67
68 // Returns the number of pairs in metadata that exactly match the given
69 // key-value pair. Returns -1 if the pair wasn't found.
70 int MetadataMatchCount(
71     const std::multimap<grpc::string_ref, grpc::string_ref>& metadata,
72     const std::string& key, const std::string& value);
73
74 int GetIntValueFromMetadataHelper(
75     const char* key,
76     const std::multimap<grpc::string_ref, grpc::string_ref>& metadata,
77     int default_value);
78
79 int GetIntValueFromMetadata(
80     const char* key,
81     const std::multimap<grpc::string_ref, grpc::string_ref>& metadata,
82     int default_value);
83
84 void ServerTryCancel(ServerContext* context);
85 }  // namespace internal
86
87 class TestServiceSignaller {
88  public:
89   void ClientWaitUntilRpcStarted() {
90     std::unique_lock<std::mutex> lock(mu_);
91     cv_rpc_started_.wait(lock, [this] { return rpc_started_; });
92   }
93   void ServerWaitToContinue() {
94     std::unique_lock<std::mutex> lock(mu_);
95     cv_server_continue_.wait(lock, [this] { return server_should_continue_; });
96   }
97   void SignalClientThatRpcStarted() {
98     std::unique_lock<std::mutex> lock(mu_);
99     rpc_started_ = true;
100     cv_rpc_started_.notify_one();
101   }
102   void SignalServerToContinue() {
103     std::unique_lock<std::mutex> lock(mu_);
104     server_should_continue_ = true;
105     cv_server_continue_.notify_one();
106   }
107
108  private:
109   std::mutex mu_;
110   std::condition_variable cv_rpc_started_;
111   bool rpc_started_ /* GUARDED_BY(mu_) */ = false;
112   std::condition_variable cv_server_continue_;
113   bool server_should_continue_ /* GUARDED_BY(mu_) */ = false;
114 };
115
116 template <typename RpcService>
117 class TestMultipleServiceImpl : public RpcService {
118  public:
119   TestMultipleServiceImpl() : signal_client_(false), host_() {}
120   explicit TestMultipleServiceImpl(const std::string& host)
121       : signal_client_(false), host_(new std::string(host)) {}
122
123   Status Echo(ServerContext* context, const EchoRequest* request,
124               EchoResponse* response) {
125     if (request->has_param() &&
126         request->param().server_notify_client_when_started()) {
127       signaller_.SignalClientThatRpcStarted();
128       signaller_.ServerWaitToContinue();
129     }
130
131     // A bit of sleep to make sure that short deadline tests fail
132     if (request->has_param() && request->param().server_sleep_us() > 0) {
133       gpr_sleep_until(
134           gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC),
135                        gpr_time_from_micros(request->param().server_sleep_us(),
136                                             GPR_TIMESPAN)));
137     }
138
139     if (request->has_param() && request->param().server_die()) {
140       gpr_log(GPR_ERROR, "The request should not reach application handler.");
141       GPR_ASSERT(0);
142     }
143     if (request->has_param() && request->param().has_expected_error()) {
144       const auto& error = request->param().expected_error();
145       return Status(static_cast<StatusCode>(error.code()),
146                     error.error_message(), error.binary_error_details());
147     }
148     int server_try_cancel = internal::GetIntValueFromMetadata(
149         kServerTryCancelRequest, context->client_metadata(), DO_NOT_CANCEL);
150     if (server_try_cancel > DO_NOT_CANCEL) {
151       // Since this is a unary RPC, by the time this server handler is called,
152       // the 'request' message is already read from the client. So the scenarios
153       // in server_try_cancel don't make much sense. Just cancel the RPC as long
154       // as server_try_cancel is not DO_NOT_CANCEL
155       internal::ServerTryCancel(context);
156       return Status::CANCELLED;
157     }
158
159     response->set_message(request->message());
160     internal::MaybeEchoDeadline(context, request, response);
161     if (host_) {
162       response->mutable_param()->set_host(*host_);
163     }
164     if (request->has_param() && request->param().client_cancel_after_us()) {
165       {
166         std::unique_lock<std::mutex> lock(mu_);
167         signal_client_ = true;
168         ++rpcs_waiting_for_client_cancel_;
169       }
170       while (!context->IsCancelled()) {
171         gpr_sleep_until(gpr_time_add(
172             gpr_now(GPR_CLOCK_REALTIME),
173             gpr_time_from_micros(request->param().client_cancel_after_us(),
174                                  GPR_TIMESPAN)));
175       }
176       {
177         std::unique_lock<std::mutex> lock(mu_);
178         --rpcs_waiting_for_client_cancel_;
179       }
180       return Status::CANCELLED;
181     } else if (request->has_param() &&
182                request->param().server_cancel_after_us()) {
183       gpr_sleep_until(gpr_time_add(
184           gpr_now(GPR_CLOCK_REALTIME),
185           gpr_time_from_micros(request->param().server_cancel_after_us(),
186                                GPR_TIMESPAN)));
187       return Status::CANCELLED;
188     } else if (!request->has_param() ||
189                !request->param().skip_cancelled_check()) {
190       EXPECT_FALSE(context->IsCancelled());
191     }
192
193     if (request->has_param() && request->param().echo_metadata_initially()) {
194       const std::multimap<grpc::string_ref, grpc::string_ref>& client_metadata =
195           context->client_metadata();
196       for (const auto& metadatum : client_metadata) {
197         context->AddInitialMetadata(ToString(metadatum.first),
198                                     ToString(metadatum.second));
199       }
200     }
201
202     if (request->has_param() && request->param().echo_metadata()) {
203       const std::multimap<grpc::string_ref, grpc::string_ref>& client_metadata =
204           context->client_metadata();
205       for (const auto& metadatum : client_metadata) {
206         context->AddTrailingMetadata(ToString(metadatum.first),
207                                      ToString(metadatum.second));
208       }
209       // Terminate rpc with error and debug info in trailer.
210       if (request->param().debug_info().stack_entries_size() ||
211           !request->param().debug_info().detail().empty()) {
212         std::string serialized_debug_info =
213             request->param().debug_info().SerializeAsString();
214         context->AddTrailingMetadata(kDebugInfoTrailerKey,
215                                      serialized_debug_info);
216         return Status::CANCELLED;
217       }
218     }
219     if (request->has_param() &&
220         (request->param().expected_client_identity().length() > 0 ||
221          request->param().check_auth_context())) {
222       internal::CheckServerAuthContext(
223           context, request->param().expected_transport_security_type(),
224           request->param().expected_client_identity());
225     }
226     if (request->has_param() &&
227         request->param().response_message_length() > 0) {
228       response->set_message(
229           std::string(request->param().response_message_length(), '\0'));
230     }
231     if (request->has_param() && request->param().echo_peer()) {
232       response->mutable_param()->set_peer(context->peer());
233     }
234     return Status::OK;
235   }
236
237   Status Echo1(ServerContext* context, const EchoRequest* request,
238                EchoResponse* response) {
239     return Echo(context, request, response);
240   }
241
242   Status Echo2(ServerContext* context, const EchoRequest* request,
243                EchoResponse* response) {
244     return Echo(context, request, response);
245   }
246
247   Status CheckClientInitialMetadata(ServerContext* context,
248                                     const SimpleRequest* /*request*/,
249                                     SimpleResponse* /*response*/) {
250     EXPECT_EQ(internal::MetadataMatchCount(context->client_metadata(),
251                                            kCheckClientInitialMetadataKey,
252                                            kCheckClientInitialMetadataVal),
253               1);
254     EXPECT_EQ(1u,
255               context->client_metadata().count(kCheckClientInitialMetadataKey));
256     return Status::OK;
257   }
258
259   // Unimplemented is left unimplemented to test the returned error.
260
261   Status RequestStream(ServerContext* context,
262                        ServerReader<EchoRequest>* reader,
263                        EchoResponse* response) {
264     // If 'server_try_cancel' is set in the metadata, the RPC is cancelled by
265     // the server by calling ServerContext::TryCancel() depending on the value:
266     //   CANCEL_BEFORE_PROCESSING: The RPC is cancelled before the server reads
267     //   any message from the client
268     //   CANCEL_DURING_PROCESSING: The RPC is cancelled while the server is
269     //   reading messages from the client
270     //   CANCEL_AFTER_PROCESSING: The RPC is cancelled after the server reads
271     //   all the messages from the client
272     int server_try_cancel = internal::GetIntValueFromMetadata(
273         kServerTryCancelRequest, context->client_metadata(), DO_NOT_CANCEL);
274
275     EchoRequest request;
276     response->set_message("");
277
278     if (server_try_cancel == CANCEL_BEFORE_PROCESSING) {
279       internal::ServerTryCancel(context);
280       return Status::CANCELLED;
281     }
282
283     std::thread* server_try_cancel_thd = nullptr;
284     if (server_try_cancel == CANCEL_DURING_PROCESSING) {
285       server_try_cancel_thd =
286           new std::thread([context] { internal::ServerTryCancel(context); });
287     }
288
289     int num_msgs_read = 0;
290     while (reader->Read(&request)) {
291       response->mutable_message()->append(request.message());
292     }
293     gpr_log(GPR_INFO, "Read: %d messages", num_msgs_read);
294
295     if (server_try_cancel_thd != nullptr) {
296       server_try_cancel_thd->join();
297       delete server_try_cancel_thd;
298       return Status::CANCELLED;
299     }
300
301     if (server_try_cancel == CANCEL_AFTER_PROCESSING) {
302       internal::ServerTryCancel(context);
303       return Status::CANCELLED;
304     }
305
306     return Status::OK;
307   }
308
309   // Return 'kNumResponseStreamMsgs' messages.
310   // TODO(yangg) make it generic by adding a parameter into EchoRequest
311   Status ResponseStream(ServerContext* context, const EchoRequest* request,
312                         ServerWriter<EchoResponse>* writer) {
313     // If server_try_cancel is set in the metadata, the RPC is cancelled by the
314     // server by calling ServerContext::TryCancel() depending on the value:
315     //   CANCEL_BEFORE_PROCESSING: The RPC is cancelled before the server writes
316     //   any messages to the client
317     //   CANCEL_DURING_PROCESSING: The RPC is cancelled while the server is
318     //   writing messages to the client
319     //   CANCEL_AFTER_PROCESSING: The RPC is cancelled after the server writes
320     //   all the messages to the client
321     int server_try_cancel = internal::GetIntValueFromMetadata(
322         kServerTryCancelRequest, context->client_metadata(), DO_NOT_CANCEL);
323
324     int server_coalescing_api = internal::GetIntValueFromMetadata(
325         kServerUseCoalescingApi, context->client_metadata(), 0);
326
327     int server_responses_to_send = internal::GetIntValueFromMetadata(
328         kServerResponseStreamsToSend, context->client_metadata(),
329         kServerDefaultResponseStreamsToSend);
330
331     if (server_try_cancel == CANCEL_BEFORE_PROCESSING) {
332       internal::ServerTryCancel(context);
333       return Status::CANCELLED;
334     }
335
336     EchoResponse response;
337     std::thread* server_try_cancel_thd = nullptr;
338     if (server_try_cancel == CANCEL_DURING_PROCESSING) {
339       server_try_cancel_thd =
340           new std::thread([context] { internal::ServerTryCancel(context); });
341     }
342
343     for (int i = 0; i < server_responses_to_send; i++) {
344       response.set_message(request->message() + std::to_string(i));
345       if (i == server_responses_to_send - 1 && server_coalescing_api != 0) {
346         writer->WriteLast(response, WriteOptions());
347       } else {
348         writer->Write(response);
349       }
350     }
351
352     if (server_try_cancel_thd != nullptr) {
353       server_try_cancel_thd->join();
354       delete server_try_cancel_thd;
355       return Status::CANCELLED;
356     }
357
358     if (server_try_cancel == CANCEL_AFTER_PROCESSING) {
359       internal::ServerTryCancel(context);
360       return Status::CANCELLED;
361     }
362
363     return Status::OK;
364   }
365
366   Status BidiStream(ServerContext* context,
367                     ServerReaderWriter<EchoResponse, EchoRequest>* stream) {
368     // If server_try_cancel is set in the metadata, the RPC is cancelled by the
369     // server by calling ServerContext::TryCancel() depending on the value:
370     //   CANCEL_BEFORE_PROCESSING: The RPC is cancelled before the server reads/
371     //   writes any messages from/to the client
372     //   CANCEL_DURING_PROCESSING: The RPC is cancelled while the server is
373     //   reading/writing messages from/to the client
374     //   CANCEL_AFTER_PROCESSING: The RPC is cancelled after the server
375     //   reads/writes all messages from/to the client
376     int server_try_cancel = internal::GetIntValueFromMetadata(
377         kServerTryCancelRequest, context->client_metadata(), DO_NOT_CANCEL);
378
379     EchoRequest request;
380     EchoResponse response;
381
382     if (server_try_cancel == CANCEL_BEFORE_PROCESSING) {
383       internal::ServerTryCancel(context);
384       return Status::CANCELLED;
385     }
386
387     std::thread* server_try_cancel_thd = nullptr;
388     if (server_try_cancel == CANCEL_DURING_PROCESSING) {
389       server_try_cancel_thd =
390           new std::thread([context] { internal::ServerTryCancel(context); });
391     }
392
393     // kServerFinishAfterNReads suggests after how many reads, the server should
394     // write the last message and send status (coalesced using WriteLast)
395     int server_write_last = internal::GetIntValueFromMetadata(
396         kServerFinishAfterNReads, context->client_metadata(), 0);
397
398     int read_counts = 0;
399     while (stream->Read(&request)) {
400       read_counts++;
401       gpr_log(GPR_INFO, "recv msg %s", request.message().c_str());
402       response.set_message(request.message());
403       if (read_counts == server_write_last) {
404         stream->WriteLast(response, WriteOptions());
405       } else {
406         stream->Write(response);
407       }
408     }
409
410     if (server_try_cancel_thd != nullptr) {
411       server_try_cancel_thd->join();
412       delete server_try_cancel_thd;
413       return Status::CANCELLED;
414     }
415
416     if (server_try_cancel == CANCEL_AFTER_PROCESSING) {
417       internal::ServerTryCancel(context);
418       return Status::CANCELLED;
419     }
420
421     return Status::OK;
422   }
423
424   // Unimplemented is left unimplemented to test the returned error.
425   bool signal_client() {
426     std::unique_lock<std::mutex> lock(mu_);
427     return signal_client_;
428   }
429   void ClientWaitUntilRpcStarted() { signaller_.ClientWaitUntilRpcStarted(); }
430   void SignalServerToContinue() { signaller_.SignalServerToContinue(); }
431   uint64_t RpcsWaitingForClientCancel() {
432     std::unique_lock<std::mutex> lock(mu_);
433     return rpcs_waiting_for_client_cancel_;
434   }
435
436  private:
437   bool signal_client_;
438   std::mutex mu_;
439   TestServiceSignaller signaller_;
440   std::unique_ptr<std::string> host_;
441   uint64_t rpcs_waiting_for_client_cancel_ = 0;
442 };
443
444 class CallbackTestServiceImpl
445     : public ::grpc::testing::EchoTestService::ExperimentalCallbackService {
446  public:
447   CallbackTestServiceImpl() : signal_client_(false), host_() {}
448   explicit CallbackTestServiceImpl(const std::string& host)
449       : signal_client_(false), host_(new std::string(host)) {}
450
451   experimental::ServerUnaryReactor* Echo(
452       experimental::CallbackServerContext* context, const EchoRequest* request,
453       EchoResponse* response) override;
454
455   experimental::ServerUnaryReactor* CheckClientInitialMetadata(
456       experimental::CallbackServerContext* context, const SimpleRequest*,
457       SimpleResponse*) override;
458
459   experimental::ServerReadReactor<EchoRequest>* RequestStream(
460       experimental::CallbackServerContext* context,
461       EchoResponse* response) override;
462
463   experimental::ServerWriteReactor<EchoResponse>* ResponseStream(
464       experimental::CallbackServerContext* context,
465       const EchoRequest* request) override;
466
467   experimental::ServerBidiReactor<EchoRequest, EchoResponse>* BidiStream(
468       experimental::CallbackServerContext* context) override;
469
470   // Unimplemented is left unimplemented to test the returned error.
471   bool signal_client() {
472     std::unique_lock<std::mutex> lock(mu_);
473     return signal_client_;
474   }
475   void ClientWaitUntilRpcStarted() { signaller_.ClientWaitUntilRpcStarted(); }
476   void SignalServerToContinue() { signaller_.SignalServerToContinue(); }
477
478  private:
479   bool signal_client_;
480   std::mutex mu_;
481   TestServiceSignaller signaller_;
482   std::unique_ptr<std::string> host_;
483 };
484
485 using TestServiceImpl =
486     TestMultipleServiceImpl<::grpc::testing::EchoTestService::Service>;
487
488 }  // namespace testing
489 }  // namespace grpc
490
491 #endif  // GRPC_TEST_CPP_END2END_TEST_SERVICE_IMPL_H