/*
 *
 * Copyright 2016 gRPC authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */
19 #ifndef GRPC_TEST_CPP_END2END_TEST_SERVICE_IMPL_H
20 #define GRPC_TEST_CPP_END2END_TEST_SERVICE_IMPL_H
#include <condition_variable>
#include <map>
#include <memory>
#include <mutex>
#include <string>
#include <thread>

#include <gtest/gtest.h>

#include <grpc/grpc.h>
#include <grpc/support/log.h>
#include <grpcpp/alarm.h>
#include <grpcpp/security/credentials.h>
#include <grpcpp/server_context.h>

#include "src/proto/grpc/testing/echo.grpc.pb.h"
#include "test/cpp/util/string_ref_helper.h"
// Number of responses ResponseStream sends when the client does not supply
// kServerResponseStreamsToSend in its metadata.
const int kServerDefaultResponseStreamsToSend = 3;
// Client metadata key: how many responses ResponseStream should send.
const char* const kServerResponseStreamsToSend = "server_responses_to_send";
// Client metadata key: a ServerTryCancelRequestPhase value telling the server
// handler when (if at all) to cancel the RPC.
const char* const kServerTryCancelRequest = "server_try_cancel";
// Metadata key for client-side cancellation requests.
// NOTE(review): not read anywhere in this header; confirm usage in the tests.
const char* const kClientTryCancelRequest = "client_try_cancel";
// Trailing metadata key under which Echo returns the serialized debug info.
const char* const kDebugInfoTrailerKey = "debug-info-bin";
// Client metadata key: after how many reads BidiStream should write its last
// message (via WriteLast) and stop reading.
const char* const kServerFinishAfterNReads = "server_finish_after_n_reads";
// Client metadata key: non-zero makes ResponseStream send its final message
// with WriteLast instead of Write.
const char* const kServerUseCoalescingApi = "server_use_coalescing_api";
// Key/value pair that CheckClientInitialMetadata expects to find exactly once
// in the client's initial metadata.
const char* const kCheckClientInitialMetadataKey = "custom_client_metadata";
const char* const kCheckClientInitialMetadataVal = "Value for client metadata";
// Point in a handler's life at which the server should try to cancel the RPC.
// The value travels as an integer in client metadata under
// kServerTryCancelRequest. DO_NOT_CANCEL must stay the smallest value: the
// unary Echo handler cancels whenever the received value is greater than it.
enum ServerTryCancelRequestPhase {
  DO_NOT_CANCEL = 0,
  CANCEL_BEFORE_PROCESSING,
  CANCEL_DURING_PROCESSING,
  CANCEL_AFTER_PROCESSING
};
60 // When echo_deadline is requested, deadline seen in the ServerContext is set in
61 // the response in seconds.
62 void MaybeEchoDeadline(ServerContextBase* context, const EchoRequest* request,
63 EchoResponse* response);
65 void CheckServerAuthContext(const ServerContextBase* context,
66 const std::string& expected_transport_security_type,
67 const std::string& expected_client_identity);
69 // Returns the number of pairs in metadata that exactly match the given
70 // key-value pair. Returns -1 if the pair wasn't found.
71 int MetadataMatchCount(
72 const std::multimap<grpc::string_ref, grpc::string_ref>& metadata,
73 const std::string& key, const std::string& value);
75 int GetIntValueFromMetadataHelper(
77 const std::multimap<grpc::string_ref, grpc::string_ref>& metadata,
80 int GetIntValueFromMetadata(
82 const std::multimap<grpc::string_ref, grpc::string_ref>& metadata,
85 void ServerTryCancel(ServerContext* context);
86 } // namespace internal
// Two-step rendezvous between a test client and a server handler:
//   1. the server announces that the RPC has started and the client can wait
//      for that announcement;
//   2. the client tells the server to continue and the server can wait for
//      that go-ahead.
// Both flags are sticky: once set they are never reset, so a wait that begins
// after the matching signal returns immediately.
class TestServiceSignaller {
 public:
  // Blocks until SignalClientThatRpcStarted() has been called.
  void ClientWaitUntilRpcStarted() {
    std::unique_lock<std::mutex> lock(mu_);
    cv_rpc_started_.wait(lock, [this] { return rpc_started_; });
  }
  // Blocks until SignalServerToContinue() has been called.
  void ServerWaitToContinue() {
    std::unique_lock<std::mutex> lock(mu_);
    cv_server_continue_.wait(lock, [this] { return server_should_continue_; });
  }
  // Marks the RPC as started and wakes a waiting client.
  void SignalClientThatRpcStarted() {
    std::lock_guard<std::mutex> lock(mu_);
    // The flag must be set before notifying: the waiter's predicate reads it,
    // and without it ClientWaitUntilRpcStarted() would never return.
    rpc_started_ = true;
    cv_rpc_started_.notify_one();
  }
  // Allows the server to proceed and wakes a waiting server handler.
  void SignalServerToContinue() {
    std::lock_guard<std::mutex> lock(mu_);
    server_should_continue_ = true;
    cv_server_continue_.notify_one();
  }

 private:
  std::mutex mu_;
  std::condition_variable cv_rpc_started_;
  bool rpc_started_ /* GUARDED_BY(mu_) */ = false;
  std::condition_variable cv_server_continue_;
  bool server_should_continue_ /* GUARDED_BY(mu_) */ = false;
};
117 template <typename RpcService>
118 class TestMultipleServiceImpl : public RpcService {
120 TestMultipleServiceImpl() : signal_client_(false), host_() {}
121 explicit TestMultipleServiceImpl(const std::string& host)
122 : signal_client_(false), host_(new std::string(host)) {}
124 Status Echo(ServerContext* context, const EchoRequest* request,
125 EchoResponse* response) {
126 if (request->has_param() &&
127 request->param().server_notify_client_when_started()) {
128 signaller_.SignalClientThatRpcStarted();
129 signaller_.ServerWaitToContinue();
132 // A bit of sleep to make sure that short deadline tests fail
133 if (request->has_param() && request->param().server_sleep_us() > 0) {
135 gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC),
136 gpr_time_from_micros(request->param().server_sleep_us(),
140 if (request->has_param() && request->param().server_die()) {
141 gpr_log(GPR_ERROR, "The request should not reach application handler.");
144 if (request->has_param() && request->param().has_expected_error()) {
145 const auto& error = request->param().expected_error();
146 return Status(static_cast<StatusCode>(error.code()),
147 error.error_message(), error.binary_error_details());
149 int server_try_cancel = internal::GetIntValueFromMetadata(
150 kServerTryCancelRequest, context->client_metadata(), DO_NOT_CANCEL);
151 if (server_try_cancel > DO_NOT_CANCEL) {
152 // Since this is a unary RPC, by the time this server handler is called,
153 // the 'request' message is already read from the client. So the scenarios
154 // in server_try_cancel don't make much sense. Just cancel the RPC as long
155 // as server_try_cancel is not DO_NOT_CANCEL
156 internal::ServerTryCancel(context);
157 return Status::CANCELLED;
160 response->set_message(request->message());
161 internal::MaybeEchoDeadline(context, request, response);
163 response->mutable_param()->set_host(*host_);
165 if (request->has_param() && request->param().client_cancel_after_us()) {
167 std::unique_lock<std::mutex> lock(mu_);
168 signal_client_ = true;
169 ++rpcs_waiting_for_client_cancel_;
171 while (!context->IsCancelled()) {
172 gpr_sleep_until(gpr_time_add(
173 gpr_now(GPR_CLOCK_REALTIME),
174 gpr_time_from_micros(request->param().client_cancel_after_us(),
178 std::unique_lock<std::mutex> lock(mu_);
179 --rpcs_waiting_for_client_cancel_;
181 return Status::CANCELLED;
182 } else if (request->has_param() &&
183 request->param().server_cancel_after_us()) {
184 gpr_sleep_until(gpr_time_add(
185 gpr_now(GPR_CLOCK_REALTIME),
186 gpr_time_from_micros(request->param().server_cancel_after_us(),
188 return Status::CANCELLED;
189 } else if (!request->has_param() ||
190 !request->param().skip_cancelled_check()) {
191 EXPECT_FALSE(context->IsCancelled());
194 if (request->has_param() && request->param().echo_metadata_initially()) {
195 const std::multimap<grpc::string_ref, grpc::string_ref>& client_metadata =
196 context->client_metadata();
197 for (const auto& metadatum : client_metadata) {
198 context->AddInitialMetadata(ToString(metadatum.first),
199 ToString(metadatum.second));
203 if (request->has_param() && request->param().echo_metadata()) {
204 const std::multimap<grpc::string_ref, grpc::string_ref>& client_metadata =
205 context->client_metadata();
206 for (const auto& metadatum : client_metadata) {
207 context->AddTrailingMetadata(ToString(metadatum.first),
208 ToString(metadatum.second));
210 // Terminate rpc with error and debug info in trailer.
211 if (request->param().debug_info().stack_entries_size() ||
212 !request->param().debug_info().detail().empty()) {
213 std::string serialized_debug_info =
214 request->param().debug_info().SerializeAsString();
215 context->AddTrailingMetadata(kDebugInfoTrailerKey,
216 serialized_debug_info);
217 return Status::CANCELLED;
220 if (request->has_param() &&
221 (request->param().expected_client_identity().length() > 0 ||
222 request->param().check_auth_context())) {
223 internal::CheckServerAuthContext(
224 context, request->param().expected_transport_security_type(),
225 request->param().expected_client_identity());
227 if (request->has_param() &&
228 request->param().response_message_length() > 0) {
229 response->set_message(
230 std::string(request->param().response_message_length(), '\0'));
232 if (request->has_param() && request->param().echo_peer()) {
233 response->mutable_param()->set_peer(context->peer());
238 Status Echo1(ServerContext* context, const EchoRequest* request,
239 EchoResponse* response) {
240 return Echo(context, request, response);
243 Status Echo2(ServerContext* context, const EchoRequest* request,
244 EchoResponse* response) {
245 return Echo(context, request, response);
248 Status CheckClientInitialMetadata(ServerContext* context,
249 const SimpleRequest* /*request*/,
250 SimpleResponse* /*response*/) {
251 EXPECT_EQ(internal::MetadataMatchCount(context->client_metadata(),
252 kCheckClientInitialMetadataKey,
253 kCheckClientInitialMetadataVal),
256 context->client_metadata().count(kCheckClientInitialMetadataKey));
260 // Unimplemented is left unimplemented to test the returned error.
262 Status RequestStream(ServerContext* context,
263 ServerReader<EchoRequest>* reader,
264 EchoResponse* response) {
265 // If 'server_try_cancel' is set in the metadata, the RPC is cancelled by
266 // the server by calling ServerContext::TryCancel() depending on the value:
267 // CANCEL_BEFORE_PROCESSING: The RPC is cancelled before the server reads
268 // any message from the client
269 // CANCEL_DURING_PROCESSING: The RPC is cancelled while the server is
270 // reading messages from the client
271 // CANCEL_AFTER_PROCESSING: The RPC is cancelled after the server reads
272 // all the messages from the client
273 int server_try_cancel = internal::GetIntValueFromMetadata(
274 kServerTryCancelRequest, context->client_metadata(), DO_NOT_CANCEL);
277 response->set_message("");
279 if (server_try_cancel == CANCEL_BEFORE_PROCESSING) {
280 internal::ServerTryCancel(context);
281 return Status::CANCELLED;
284 std::thread* server_try_cancel_thd = nullptr;
285 if (server_try_cancel == CANCEL_DURING_PROCESSING) {
286 server_try_cancel_thd =
287 new std::thread([context] { internal::ServerTryCancel(context); });
290 int num_msgs_read = 0;
291 while (reader->Read(&request)) {
292 response->mutable_message()->append(request.message());
294 gpr_log(GPR_INFO, "Read: %d messages", num_msgs_read);
296 if (server_try_cancel_thd != nullptr) {
297 server_try_cancel_thd->join();
298 delete server_try_cancel_thd;
299 return Status::CANCELLED;
302 if (server_try_cancel == CANCEL_AFTER_PROCESSING) {
303 internal::ServerTryCancel(context);
304 return Status::CANCELLED;
310 // Return 'kNumResponseStreamMsgs' messages.
311 // TODO(yangg) make it generic by adding a parameter into EchoRequest
312 Status ResponseStream(ServerContext* context, const EchoRequest* request,
313 ServerWriter<EchoResponse>* writer) {
314 // If server_try_cancel is set in the metadata, the RPC is cancelled by the
315 // server by calling ServerContext::TryCancel() depending on the value:
316 // CANCEL_BEFORE_PROCESSING: The RPC is cancelled before the server writes
317 // any messages to the client
318 // CANCEL_DURING_PROCESSING: The RPC is cancelled while the server is
319 // writing messages to the client
320 // CANCEL_AFTER_PROCESSING: The RPC is cancelled after the server writes
321 // all the messages to the client
322 int server_try_cancel = internal::GetIntValueFromMetadata(
323 kServerTryCancelRequest, context->client_metadata(), DO_NOT_CANCEL);
325 int server_coalescing_api = internal::GetIntValueFromMetadata(
326 kServerUseCoalescingApi, context->client_metadata(), 0);
328 int server_responses_to_send = internal::GetIntValueFromMetadata(
329 kServerResponseStreamsToSend, context->client_metadata(),
330 kServerDefaultResponseStreamsToSend);
332 if (server_try_cancel == CANCEL_BEFORE_PROCESSING) {
333 internal::ServerTryCancel(context);
334 return Status::CANCELLED;
337 EchoResponse response;
338 std::thread* server_try_cancel_thd = nullptr;
339 if (server_try_cancel == CANCEL_DURING_PROCESSING) {
340 server_try_cancel_thd =
341 new std::thread([context] { internal::ServerTryCancel(context); });
344 for (int i = 0; i < server_responses_to_send; i++) {
345 response.set_message(request->message() + std::to_string(i));
346 if (i == server_responses_to_send - 1 && server_coalescing_api != 0) {
347 writer->WriteLast(response, WriteOptions());
349 writer->Write(response);
353 if (server_try_cancel_thd != nullptr) {
354 server_try_cancel_thd->join();
355 delete server_try_cancel_thd;
356 return Status::CANCELLED;
359 if (server_try_cancel == CANCEL_AFTER_PROCESSING) {
360 internal::ServerTryCancel(context);
361 return Status::CANCELLED;
367 Status BidiStream(ServerContext* context,
368 ServerReaderWriter<EchoResponse, EchoRequest>* stream) {
369 // If server_try_cancel is set in the metadata, the RPC is cancelled by the
370 // server by calling ServerContext::TryCancel() depending on the value:
371 // CANCEL_BEFORE_PROCESSING: The RPC is cancelled before the server reads/
372 // writes any messages from/to the client
373 // CANCEL_DURING_PROCESSING: The RPC is cancelled while the server is
374 // reading/writing messages from/to the client
375 // CANCEL_AFTER_PROCESSING: The RPC is cancelled after the server
376 // reads/writes all messages from/to the client
377 int server_try_cancel = internal::GetIntValueFromMetadata(
378 kServerTryCancelRequest, context->client_metadata(), DO_NOT_CANCEL);
381 EchoResponse response;
383 if (server_try_cancel == CANCEL_BEFORE_PROCESSING) {
384 internal::ServerTryCancel(context);
385 return Status::CANCELLED;
388 std::thread* server_try_cancel_thd = nullptr;
389 if (server_try_cancel == CANCEL_DURING_PROCESSING) {
390 server_try_cancel_thd =
391 new std::thread([context] { internal::ServerTryCancel(context); });
394 // kServerFinishAfterNReads suggests after how many reads, the server should
395 // write the last message and send status (coalesced using WriteLast)
396 int server_write_last = internal::GetIntValueFromMetadata(
397 kServerFinishAfterNReads, context->client_metadata(), 0);
400 while (stream->Read(&request)) {
402 gpr_log(GPR_INFO, "recv msg %s", request.message().c_str());
403 response.set_message(request.message());
404 if (read_counts == server_write_last) {
405 stream->WriteLast(response, WriteOptions());
408 stream->Write(response);
412 if (server_try_cancel_thd != nullptr) {
413 server_try_cancel_thd->join();
414 delete server_try_cancel_thd;
415 return Status::CANCELLED;
418 if (server_try_cancel == CANCEL_AFTER_PROCESSING) {
419 internal::ServerTryCancel(context);
420 return Status::CANCELLED;
426 // Unimplemented is left unimplemented to test the returned error.
427 bool signal_client() {
428 std::unique_lock<std::mutex> lock(mu_);
429 return signal_client_;
431 void ClientWaitUntilRpcStarted() { signaller_.ClientWaitUntilRpcStarted(); }
432 void SignalServerToContinue() { signaller_.SignalServerToContinue(); }
433 uint64_t RpcsWaitingForClientCancel() {
434 std::unique_lock<std::mutex> lock(mu_);
435 return rpcs_waiting_for_client_cancel_;
441 TestServiceSignaller signaller_;
442 std::unique_ptr<std::string> host_;
443 uint64_t rpcs_waiting_for_client_cancel_ = 0;
446 class CallbackTestServiceImpl
447 : public ::grpc::testing::EchoTestService::CallbackService {
449 CallbackTestServiceImpl() : signal_client_(false), host_() {}
450 explicit CallbackTestServiceImpl(const std::string& host)
451 : signal_client_(false), host_(new std::string(host)) {}
453 ServerUnaryReactor* Echo(CallbackServerContext* context,
454 const EchoRequest* request,
455 EchoResponse* response) override;
457 ServerUnaryReactor* CheckClientInitialMetadata(CallbackServerContext* context,
458 const SimpleRequest*,
459 SimpleResponse*) override;
461 ServerReadReactor<EchoRequest>* RequestStream(
462 CallbackServerContext* context, EchoResponse* response) override;
464 ServerWriteReactor<EchoResponse>* ResponseStream(
465 CallbackServerContext* context, const EchoRequest* request) override;
467 ServerBidiReactor<EchoRequest, EchoResponse>* BidiStream(
468 CallbackServerContext* context) override;
470 // Unimplemented is left unimplemented to test the returned error.
471 bool signal_client() {
472 std::unique_lock<std::mutex> lock(mu_);
473 return signal_client_;
475 void ClientWaitUntilRpcStarted() { signaller_.ClientWaitUntilRpcStarted(); }
476 void SignalServerToContinue() { signaller_.SignalServerToContinue(); }
481 TestServiceSignaller signaller_;
482 std::unique_ptr<std::string> host_;
485 using TestServiceImpl =
486 TestMultipleServiceImpl<::grpc::testing::EchoTestService::Service>;
488 } // namespace testing
491 #endif // GRPC_TEST_CPP_END2END_TEST_SERVICE_IMPL_H