3 * Copyright 2016 gRPC authors.
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
19 #include "test/cpp/end2end/test_service_impl.h"
24 #include <grpc/support/log.h>
25 #include <grpcpp/security/credentials.h>
26 #include <grpcpp/server_context.h>
28 #include "src/proto/grpc/testing/echo.grpc.pb.h"
29 #include "test/cpp/util/string_ref_helper.h"
31 #include <gtest/gtest.h>
33 using std::chrono::system_clock;
39 // When echo_deadline is requested, deadline seen in the ServerContext is set in
40 // the response in seconds.
41 void MaybeEchoDeadline(ServerContext* context, const EchoRequest* request,
42 EchoResponse* response) {
43 if (request->has_param() && request->param().echo_deadline()) {
44 gpr_timespec deadline = gpr_inf_future(GPR_CLOCK_REALTIME);
45 if (context->deadline() != system_clock::time_point::max()) {
46 Timepoint2Timespec(context->deadline(), &deadline);
48 response->mutable_param()->set_request_deadline(deadline.tv_sec);
52 void CheckServerAuthContext(
53 const ServerContext* context,
54 const grpc::string& expected_transport_security_type,
55 const grpc::string& expected_client_identity) {
56 std::shared_ptr<const AuthContext> auth_ctx = context->auth_context();
57 std::vector<grpc::string_ref> tst =
58 auth_ctx->FindPropertyValues("transport_security_type");
59 EXPECT_EQ(1u, tst.size());
60 EXPECT_EQ(expected_transport_security_type, ToString(tst[0]));
61 if (expected_client_identity.empty()) {
62 EXPECT_TRUE(auth_ctx->GetPeerIdentityPropertyName().empty());
63 EXPECT_TRUE(auth_ctx->GetPeerIdentity().empty());
64 EXPECT_FALSE(auth_ctx->IsPeerAuthenticated());
66 auto identity = auth_ctx->GetPeerIdentity();
67 EXPECT_TRUE(auth_ctx->IsPeerAuthenticated());
68 EXPECT_EQ(1u, identity.size());
69 EXPECT_EQ(expected_client_identity, identity[0]);
73 // Returns the number of pairs in metadata that exactly match the given
74 // key-value pair. Returns -1 if the pair wasn't found.
75 int MetadataMatchCount(
76 const std::multimap<grpc::string_ref, grpc::string_ref>& metadata,
77 const grpc::string& key, const grpc::string& value) {
79 for (std::multimap<grpc::string_ref, grpc::string_ref>::const_iterator iter =
81 iter != metadata.end(); ++iter) {
82 if (ToString(iter->first) == key && ToString(iter->second) == value) {
91 int GetIntValueFromMetadataHelper(
93 const std::multimap<grpc::string_ref, grpc::string_ref>& metadata,
95 if (metadata.find(key) != metadata.end()) {
96 std::istringstream iss(ToString(metadata.find(key)->second));
98 gpr_log(GPR_INFO, "%s : %d", key, default_value);
101 return default_value;
104 int GetIntValueFromMetadata(
106 const std::multimap<grpc::string_ref, grpc::string_ref>& metadata,
108 return GetIntValueFromMetadataHelper(key, metadata, default_value);
111 void ServerTryCancel(ServerContext* context) {
112 EXPECT_FALSE(context->IsCancelled());
113 context->TryCancel();
114 gpr_log(GPR_INFO, "Server called TryCancel() to cancel the request");
115 // Now wait until it's really canceled
116 while (!context->IsCancelled()) {
117 gpr_sleep_until(gpr_time_add(gpr_now(GPR_CLOCK_REALTIME),
118 gpr_time_from_micros(1000, GPR_TIMESPAN)));
122 void ServerTryCancelNonblocking(ServerContext* context) {
123 EXPECT_FALSE(context->IsCancelled());
124 context->TryCancel();
125 gpr_log(GPR_INFO, "Server called TryCancel() to cancel the request");
128 void LoopUntilCancelled(Alarm* alarm, ServerContext* context,
129 experimental::ServerCallbackRpcController* controller,
131 if (!context->IsCancelled()) {
132 alarm->experimental().Set(
133 gpr_time_add(gpr_now(GPR_CLOCK_REALTIME),
134 gpr_time_from_micros(loop_delay_us, GPR_TIMESPAN)),
135 [alarm, context, controller, loop_delay_us](bool) {
136 LoopUntilCancelled(alarm, context, controller, loop_delay_us);
139 controller->Finish(Status::CANCELLED);
144 Status TestServiceImpl::Echo(ServerContext* context, const EchoRequest* request,
145 EchoResponse* response) {
146 gpr_log(GPR_DEBUG, "Request message was %s", request->message().c_str());
147 // A bit of sleep to make sure that short deadline tests fail
148 if (request->has_param() && request->param().server_sleep_us() > 0) {
150 gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC),
151 gpr_time_from_micros(request->param().server_sleep_us(),
155 if (request->has_param() && request->param().server_die()) {
156 gpr_log(GPR_ERROR, "The request should not reach application handler.");
159 if (request->has_param() && request->param().has_expected_error()) {
160 const auto& error = request->param().expected_error();
161 return Status(static_cast<StatusCode>(error.code()), error.error_message(),
162 error.binary_error_details());
164 int server_try_cancel = GetIntValueFromMetadata(
165 kServerTryCancelRequest, context->client_metadata(), DO_NOT_CANCEL);
166 if (server_try_cancel > DO_NOT_CANCEL) {
167 // Since this is a unary RPC, by the time this server handler is called,
168 // the 'request' message is already read from the client. So the scenarios
169 // in server_try_cancel don't make much sense. Just cancel the RPC as long
170 // as server_try_cancel is not DO_NOT_CANCEL
171 ServerTryCancel(context);
172 return Status::CANCELLED;
175 response->set_message(request->message());
176 MaybeEchoDeadline(context, request, response);
178 response->mutable_param()->set_host(*host_);
180 if (request->has_param() && request->param().client_cancel_after_us()) {
182 std::unique_lock<std::mutex> lock(mu_);
183 signal_client_ = true;
185 while (!context->IsCancelled()) {
186 gpr_sleep_until(gpr_time_add(
187 gpr_now(GPR_CLOCK_REALTIME),
188 gpr_time_from_micros(request->param().client_cancel_after_us(),
191 return Status::CANCELLED;
192 } else if (request->has_param() &&
193 request->param().server_cancel_after_us()) {
194 gpr_sleep_until(gpr_time_add(
195 gpr_now(GPR_CLOCK_REALTIME),
196 gpr_time_from_micros(request->param().server_cancel_after_us(),
198 return Status::CANCELLED;
199 } else if (!request->has_param() ||
200 !request->param().skip_cancelled_check()) {
201 EXPECT_FALSE(context->IsCancelled());
204 if (request->has_param() && request->param().echo_metadata()) {
205 const std::multimap<grpc::string_ref, grpc::string_ref>& client_metadata =
206 context->client_metadata();
207 for (std::multimap<grpc::string_ref, grpc::string_ref>::const_iterator
208 iter = client_metadata.begin();
209 iter != client_metadata.end(); ++iter) {
210 context->AddTrailingMetadata(ToString(iter->first),
211 ToString(iter->second));
213 // Terminate rpc with error and debug info in trailer.
214 if (request->param().debug_info().stack_entries_size() ||
215 !request->param().debug_info().detail().empty()) {
216 grpc::string serialized_debug_info =
217 request->param().debug_info().SerializeAsString();
218 context->AddTrailingMetadata(kDebugInfoTrailerKey, serialized_debug_info);
219 return Status::CANCELLED;
222 if (request->has_param() &&
223 (request->param().expected_client_identity().length() > 0 ||
224 request->param().check_auth_context())) {
225 CheckServerAuthContext(context,
226 request->param().expected_transport_security_type(),
227 request->param().expected_client_identity());
229 if (request->has_param() && request->param().response_message_length() > 0) {
230 response->set_message(
231 grpc::string(request->param().response_message_length(), '\0'));
233 if (request->has_param() && request->param().echo_peer()) {
234 response->mutable_param()->set_peer(context->peer());
239 Status TestServiceImpl::CheckClientInitialMetadata(ServerContext* context,
240 const SimpleRequest* request,
241 SimpleResponse* response) {
242 EXPECT_EQ(MetadataMatchCount(context->client_metadata(),
243 kCheckClientInitialMetadataKey,
244 kCheckClientInitialMetadataVal),
247 context->client_metadata().count(kCheckClientInitialMetadataKey));
251 void CallbackTestServiceImpl::Echo(
252 ServerContext* context, const EchoRequest* request, EchoResponse* response,
253 experimental::ServerCallbackRpcController* controller) {
254 CancelState* cancel_state = new CancelState;
255 int server_use_cancel_callback =
256 GetIntValueFromMetadata(kServerUseCancelCallback,
257 context->client_metadata(), DO_NOT_USE_CALLBACK);
258 if (server_use_cancel_callback != DO_NOT_USE_CALLBACK) {
259 controller->SetCancelCallback([cancel_state] {
260 EXPECT_FALSE(cancel_state->callback_invoked.exchange(
261 true, std::memory_order_relaxed));
263 if (server_use_cancel_callback == MAYBE_USE_CALLBACK_EARLY_CANCEL) {
264 EXPECT_TRUE(context->IsCancelled());
266 cancel_state->callback_invoked.load(std::memory_order_relaxed));
268 EXPECT_FALSE(context->IsCancelled());
270 cancel_state->callback_invoked.load(std::memory_order_relaxed));
273 // A bit of sleep to make sure that short deadline tests fail
274 if (request->has_param() && request->param().server_sleep_us() > 0) {
275 // Set an alarm for that much time
276 alarm_.experimental().Set(
277 gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC),
278 gpr_time_from_micros(request->param().server_sleep_us(),
280 [this, context, request, response, controller, cancel_state](bool) {
281 EchoNonDelayed(context, request, response, controller, cancel_state);
284 EchoNonDelayed(context, request, response, controller, cancel_state);
288 void CallbackTestServiceImpl::CheckClientInitialMetadata(
289 ServerContext* context, const SimpleRequest* request,
290 SimpleResponse* response,
291 experimental::ServerCallbackRpcController* controller) {
292 EXPECT_EQ(MetadataMatchCount(context->client_metadata(),
293 kCheckClientInitialMetadataKey,
294 kCheckClientInitialMetadataVal),
297 context->client_metadata().count(kCheckClientInitialMetadataKey));
298 controller->Finish(Status::OK);
301 void CallbackTestServiceImpl::EchoNonDelayed(
302 ServerContext* context, const EchoRequest* request, EchoResponse* response,
303 experimental::ServerCallbackRpcController* controller,
304 CancelState* cancel_state) {
305 int server_use_cancel_callback =
306 GetIntValueFromMetadata(kServerUseCancelCallback,
307 context->client_metadata(), DO_NOT_USE_CALLBACK);
309 // Safe to clear cancel callback even if it wasn't set
310 controller->ClearCancelCallback();
311 if (server_use_cancel_callback == MAYBE_USE_CALLBACK_EARLY_CANCEL ||
312 server_use_cancel_callback == MAYBE_USE_CALLBACK_LATE_CANCEL) {
313 EXPECT_TRUE(context->IsCancelled());
314 EXPECT_TRUE(cancel_state->callback_invoked.load(std::memory_order_relaxed));
316 controller->Finish(Status::CANCELLED);
320 EXPECT_FALSE(cancel_state->callback_invoked.load(std::memory_order_relaxed));
323 if (request->has_param() && request->param().server_die()) {
324 gpr_log(GPR_ERROR, "The request should not reach application handler.");
327 if (request->has_param() && request->param().has_expected_error()) {
328 const auto& error = request->param().expected_error();
329 controller->Finish(Status(static_cast<StatusCode>(error.code()),
330 error.error_message(),
331 error.binary_error_details()));
334 int server_try_cancel = GetIntValueFromMetadata(
335 kServerTryCancelRequest, context->client_metadata(), DO_NOT_CANCEL);
336 if (server_try_cancel > DO_NOT_CANCEL) {
337 // Since this is a unary RPC, by the time this server handler is called,
338 // the 'request' message is already read from the client. So the scenarios
339 // in server_try_cancel don't make much sense. Just cancel the RPC as long
340 // as server_try_cancel is not DO_NOT_CANCEL
341 EXPECT_FALSE(context->IsCancelled());
342 context->TryCancel();
343 gpr_log(GPR_INFO, "Server called TryCancel() to cancel the request");
345 if (server_use_cancel_callback == DO_NOT_USE_CALLBACK) {
346 // Now wait until it's really canceled
347 LoopUntilCancelled(&alarm_, context, controller, 1000);
352 gpr_log(GPR_DEBUG, "Request message was %s", request->message().c_str());
353 response->set_message(request->message());
354 MaybeEchoDeadline(context, request, response);
356 response->mutable_param()->set_host(*host_);
358 if (request->has_param() && request->param().client_cancel_after_us()) {
360 std::unique_lock<std::mutex> lock(mu_);
361 signal_client_ = true;
363 if (server_use_cancel_callback == DO_NOT_USE_CALLBACK) {
364 // Now wait until it's really canceled
365 LoopUntilCancelled(&alarm_, context, controller,
366 request->param().client_cancel_after_us());
369 } else if (request->has_param() &&
370 request->param().server_cancel_after_us()) {
371 alarm_.experimental().Set(
373 gpr_now(GPR_CLOCK_REALTIME),
374 gpr_time_from_micros(request->param().server_cancel_after_us(),
376 [controller](bool) { controller->Finish(Status::CANCELLED); });
378 } else if (!request->has_param() ||
379 !request->param().skip_cancelled_check()) {
380 EXPECT_FALSE(context->IsCancelled());
383 if (request->has_param() && request->param().echo_metadata()) {
384 const std::multimap<grpc::string_ref, grpc::string_ref>& client_metadata =
385 context->client_metadata();
386 for (std::multimap<grpc::string_ref, grpc::string_ref>::const_iterator
387 iter = client_metadata.begin();
388 iter != client_metadata.end(); ++iter) {
389 context->AddTrailingMetadata(ToString(iter->first),
390 ToString(iter->second));
392 // Terminate rpc with error and debug info in trailer.
393 if (request->param().debug_info().stack_entries_size() ||
394 !request->param().debug_info().detail().empty()) {
395 grpc::string serialized_debug_info =
396 request->param().debug_info().SerializeAsString();
397 context->AddTrailingMetadata(kDebugInfoTrailerKey, serialized_debug_info);
398 controller->Finish(Status::CANCELLED);
402 if (request->has_param() &&
403 (request->param().expected_client_identity().length() > 0 ||
404 request->param().check_auth_context())) {
405 CheckServerAuthContext(context,
406 request->param().expected_transport_security_type(),
407 request->param().expected_client_identity());
409 if (request->has_param() && request->param().response_message_length() > 0) {
410 response->set_message(
411 grpc::string(request->param().response_message_length(), '\0'));
413 if (request->has_param() && request->param().echo_peer()) {
414 response->mutable_param()->set_peer(context->peer());
416 controller->Finish(Status::OK);
419 // Unimplemented is left unimplemented to test the returned error.
421 Status TestServiceImpl::RequestStream(ServerContext* context,
422 ServerReader<EchoRequest>* reader,
423 EchoResponse* response) {
424 // If 'server_try_cancel' is set in the metadata, the RPC is cancelled by
425 // the server by calling ServerContext::TryCancel() depending on the value:
426 // CANCEL_BEFORE_PROCESSING: The RPC is cancelled before the server reads
427 // any message from the client
428 // CANCEL_DURING_PROCESSING: The RPC is cancelled while the server is
429 // reading messages from the client
430 // CANCEL_AFTER_PROCESSING: The RPC is cancelled after the server reads
431 // all the messages from the client
432 int server_try_cancel = GetIntValueFromMetadata(
433 kServerTryCancelRequest, context->client_metadata(), DO_NOT_CANCEL);
436 response->set_message("");
438 if (server_try_cancel == CANCEL_BEFORE_PROCESSING) {
439 ServerTryCancel(context);
440 return Status::CANCELLED;
443 std::thread* server_try_cancel_thd = nullptr;
444 if (server_try_cancel == CANCEL_DURING_PROCESSING) {
445 server_try_cancel_thd =
446 new std::thread([context] { ServerTryCancel(context); });
449 int num_msgs_read = 0;
450 while (reader->Read(&request)) {
451 response->mutable_message()->append(request.message());
453 gpr_log(GPR_INFO, "Read: %d messages", num_msgs_read);
455 if (server_try_cancel_thd != nullptr) {
456 server_try_cancel_thd->join();
457 delete server_try_cancel_thd;
458 return Status::CANCELLED;
461 if (server_try_cancel == CANCEL_AFTER_PROCESSING) {
462 ServerTryCancel(context);
463 return Status::CANCELLED;
469 // Return 'kNumResponseStreamMsgs' messages.
470 // TODO(yangg) make it generic by adding a parameter into EchoRequest
471 Status TestServiceImpl::ResponseStream(ServerContext* context,
472 const EchoRequest* request,
473 ServerWriter<EchoResponse>* writer) {
474 // If server_try_cancel is set in the metadata, the RPC is cancelled by the
475 // server by calling ServerContext::TryCancel() depending on the value:
476 // CANCEL_BEFORE_PROCESSING: The RPC is cancelled before the server writes
477 // any messages to the client
478 // CANCEL_DURING_PROCESSING: The RPC is cancelled while the server is
479 // writing messages to the client
480 // CANCEL_AFTER_PROCESSING: The RPC is cancelled after the server writes
481 // all the messages to the client
482 int server_try_cancel = GetIntValueFromMetadata(
483 kServerTryCancelRequest, context->client_metadata(), DO_NOT_CANCEL);
485 int server_coalescing_api = GetIntValueFromMetadata(
486 kServerUseCoalescingApi, context->client_metadata(), 0);
488 int server_responses_to_send = GetIntValueFromMetadata(
489 kServerResponseStreamsToSend, context->client_metadata(),
490 kServerDefaultResponseStreamsToSend);
492 if (server_try_cancel == CANCEL_BEFORE_PROCESSING) {
493 ServerTryCancel(context);
494 return Status::CANCELLED;
497 EchoResponse response;
498 std::thread* server_try_cancel_thd = nullptr;
499 if (server_try_cancel == CANCEL_DURING_PROCESSING) {
500 server_try_cancel_thd =
501 new std::thread([context] { ServerTryCancel(context); });
504 for (int i = 0; i < server_responses_to_send; i++) {
505 response.set_message(request->message() + grpc::to_string(i));
506 if (i == server_responses_to_send - 1 && server_coalescing_api != 0) {
507 writer->WriteLast(response, WriteOptions());
509 writer->Write(response);
513 if (server_try_cancel_thd != nullptr) {
514 server_try_cancel_thd->join();
515 delete server_try_cancel_thd;
516 return Status::CANCELLED;
519 if (server_try_cancel == CANCEL_AFTER_PROCESSING) {
520 ServerTryCancel(context);
521 return Status::CANCELLED;
527 Status TestServiceImpl::BidiStream(
528 ServerContext* context,
529 ServerReaderWriter<EchoResponse, EchoRequest>* stream) {
530 // If server_try_cancel is set in the metadata, the RPC is cancelled by the
531 // server by calling ServerContext::TryCancel() depending on the value:
532 // CANCEL_BEFORE_PROCESSING: The RPC is cancelled before the server reads/
533 // writes any messages from/to the client
534 // CANCEL_DURING_PROCESSING: The RPC is cancelled while the server is
535 // reading/writing messages from/to the client
536 // CANCEL_AFTER_PROCESSING: The RPC is cancelled after the server
537 // reads/writes all messages from/to the client
538 int server_try_cancel = GetIntValueFromMetadata(
539 kServerTryCancelRequest, context->client_metadata(), DO_NOT_CANCEL);
542 EchoResponse response;
544 if (server_try_cancel == CANCEL_BEFORE_PROCESSING) {
545 ServerTryCancel(context);
546 return Status::CANCELLED;
549 std::thread* server_try_cancel_thd = nullptr;
550 if (server_try_cancel == CANCEL_DURING_PROCESSING) {
551 server_try_cancel_thd =
552 new std::thread([context] { ServerTryCancel(context); });
555 // kServerFinishAfterNReads suggests after how many reads, the server should
556 // write the last message and send status (coalesced using WriteLast)
557 int server_write_last = GetIntValueFromMetadata(
558 kServerFinishAfterNReads, context->client_metadata(), 0);
561 while (stream->Read(&request)) {
563 gpr_log(GPR_INFO, "recv msg %s", request.message().c_str());
564 response.set_message(request.message());
565 if (read_counts == server_write_last) {
566 stream->WriteLast(response, WriteOptions());
568 stream->Write(response);
572 if (server_try_cancel_thd != nullptr) {
573 server_try_cancel_thd->join();
574 delete server_try_cancel_thd;
575 return Status::CANCELLED;
578 if (server_try_cancel == CANCEL_AFTER_PROCESSING) {
579 ServerTryCancel(context);
580 return Status::CANCELLED;
586 experimental::ServerReadReactor<EchoRequest, EchoResponse>*
587 CallbackTestServiceImpl::RequestStream() {
588 class Reactor : public ::grpc::experimental::ServerReadReactor<EchoRequest,
592 void OnStarted(ServerContext* context, EchoResponse* response) override {
594 response_ = response;
595 // If 'server_try_cancel' is set in the metadata, the RPC is cancelled by
596 // the server by calling ServerContext::TryCancel() depending on the
598 // CANCEL_BEFORE_PROCESSING: The RPC is cancelled before the server
599 // reads any message from the client CANCEL_DURING_PROCESSING: The RPC
600 // is cancelled while the server is reading messages from the client
601 // CANCEL_AFTER_PROCESSING: The RPC is cancelled after the server reads
602 // all the messages from the client
603 server_try_cancel_ = GetIntValueFromMetadata(
604 kServerTryCancelRequest, context->client_metadata(), DO_NOT_CANCEL);
606 response_->set_message("");
608 if (server_try_cancel_ == CANCEL_BEFORE_PROCESSING) {
609 ServerTryCancelNonblocking(ctx_);
613 if (server_try_cancel_ == CANCEL_DURING_PROCESSING) {
615 // Don't wait for it here
618 StartRead(&request_);
620 void OnDone() override { delete this; }
621 void OnCancel() override {
622 EXPECT_TRUE(ctx_->IsCancelled());
623 FinishOnce(Status::CANCELLED);
625 void OnReadDone(bool ok) override {
627 response_->mutable_message()->append(request_.message());
629 StartRead(&request_);
631 gpr_log(GPR_INFO, "Read: %d messages", num_msgs_read_);
633 if (server_try_cancel_ == CANCEL_DURING_PROCESSING) {
634 // Let OnCancel recover this
637 if (server_try_cancel_ == CANCEL_AFTER_PROCESSING) {
638 ServerTryCancelNonblocking(ctx_);
641 FinishOnce(Status::OK);
646 void FinishOnce(const Status& s) {
647 std::lock_guard<std::mutex> l(finish_mu_);
655 EchoResponse* response_;
656 EchoRequest request_;
657 int num_msgs_read_{0};
658 int server_try_cancel_;
659 std::mutex finish_mu_;
660 bool finished_{false};
666 // Return 'kNumResponseStreamMsgs' messages.
667 // TODO(yangg) make it generic by adding a parameter into EchoRequest
668 experimental::ServerWriteReactor<EchoRequest, EchoResponse>*
669 CallbackTestServiceImpl::ResponseStream() {
671 : public ::grpc::experimental::ServerWriteReactor<EchoRequest,
675 void OnStarted(ServerContext* context,
676 const EchoRequest* request) override {
679 // If 'server_try_cancel' is set in the metadata, the RPC is cancelled by
680 // the server by calling ServerContext::TryCancel() depending on the
682 // CANCEL_BEFORE_PROCESSING: The RPC is cancelled before the server
683 // reads any message from the client CANCEL_DURING_PROCESSING: The RPC
684 // is cancelled while the server is reading messages from the client
685 // CANCEL_AFTER_PROCESSING: The RPC is cancelled after the server reads
686 // all the messages from the client
687 server_try_cancel_ = GetIntValueFromMetadata(
688 kServerTryCancelRequest, context->client_metadata(), DO_NOT_CANCEL);
689 server_coalescing_api_ = GetIntValueFromMetadata(
690 kServerUseCoalescingApi, context->client_metadata(), 0);
691 server_responses_to_send_ = GetIntValueFromMetadata(
692 kServerResponseStreamsToSend, context->client_metadata(),
693 kServerDefaultResponseStreamsToSend);
694 if (server_try_cancel_ == CANCEL_BEFORE_PROCESSING) {
695 ServerTryCancelNonblocking(ctx_);
699 if (server_try_cancel_ == CANCEL_DURING_PROCESSING) {
702 if (num_msgs_sent_ < server_responses_to_send_) {
706 void OnDone() override { delete this; }
707 void OnCancel() override {
708 EXPECT_TRUE(ctx_->IsCancelled());
709 FinishOnce(Status::CANCELLED);
711 void OnWriteDone(bool ok) override {
712 if (num_msgs_sent_ < server_responses_to_send_) {
714 } else if (server_coalescing_api_ != 0) {
715 // We would have already done Finish just after the WriteLast
716 } else if (server_try_cancel_ == CANCEL_DURING_PROCESSING) {
717 // Let OnCancel recover this
718 } else if (server_try_cancel_ == CANCEL_AFTER_PROCESSING) {
719 ServerTryCancelNonblocking(ctx_);
721 FinishOnce(Status::OK);
726 void FinishOnce(const Status& s) {
727 std::lock_guard<std::mutex> l(finish_mu_);
735 response_.set_message(request_->message() +
736 grpc::to_string(num_msgs_sent_));
737 if (num_msgs_sent_ == server_responses_to_send_ - 1 &&
738 server_coalescing_api_ != 0) {
740 StartWriteLast(&response_, WriteOptions());
741 // If we use WriteLast, we shouldn't wait before attempting Finish
742 FinishOnce(Status::OK);
745 StartWrite(&response_);
749 const EchoRequest* request_;
750 EchoResponse response_;
751 int num_msgs_sent_{0};
752 int server_try_cancel_;
753 int server_coalescing_api_;
754 int server_responses_to_send_;
755 std::mutex finish_mu_;
756 bool finished_{false};
761 experimental::ServerBidiReactor<EchoRequest, EchoResponse>*
762 CallbackTestServiceImpl::BidiStream() {
763 class Reactor : public ::grpc::experimental::ServerBidiReactor<EchoRequest,
767 void OnStarted(ServerContext* context) override {
769 // If 'server_try_cancel' is set in the metadata, the RPC is cancelled by
770 // the server by calling ServerContext::TryCancel() depending on the
772 // CANCEL_BEFORE_PROCESSING: The RPC is cancelled before the server
773 // reads any message from the client CANCEL_DURING_PROCESSING: The RPC
774 // is cancelled while the server is reading messages from the client
775 // CANCEL_AFTER_PROCESSING: The RPC is cancelled after the server reads
776 // all the messages from the client
777 server_try_cancel_ = GetIntValueFromMetadata(
778 kServerTryCancelRequest, context->client_metadata(), DO_NOT_CANCEL);
779 server_write_last_ = GetIntValueFromMetadata(
780 kServerFinishAfterNReads, context->client_metadata(), 0);
781 if (server_try_cancel_ == CANCEL_BEFORE_PROCESSING) {
782 ServerTryCancelNonblocking(ctx_);
786 if (server_try_cancel_ == CANCEL_DURING_PROCESSING) {
790 StartRead(&request_);
792 void OnDone() override { delete this; }
793 void OnCancel() override {
794 EXPECT_TRUE(ctx_->IsCancelled());
795 FinishOnce(Status::CANCELLED);
797 void OnReadDone(bool ok) override {
800 gpr_log(GPR_INFO, "recv msg %s", request_.message().c_str());
801 response_.set_message(request_.message());
802 if (num_msgs_read_ == server_write_last_) {
803 StartWriteLast(&response_, WriteOptions());
804 // If we use WriteLast, we shouldn't wait before attempting Finish
806 StartWrite(&response_);
811 if (server_try_cancel_ == CANCEL_DURING_PROCESSING) {
812 // Let OnCancel handle this
813 } else if (server_try_cancel_ == CANCEL_AFTER_PROCESSING) {
814 ServerTryCancelNonblocking(ctx_);
816 FinishOnce(Status::OK);
819 void OnWriteDone(bool ok) override {
820 std::lock_guard<std::mutex> l(finish_mu_);
822 StartRead(&request_);
827 void FinishOnce(const Status& s) {
828 std::lock_guard<std::mutex> l(finish_mu_);
836 EchoRequest request_;
837 EchoResponse response_;
838 int num_msgs_read_{0};
839 int server_try_cancel_;
840 int server_write_last_;
841 std::mutex finish_mu_;
842 bool finished_{false};
848 } // namespace testing