3 * Copyright 2016 gRPC authors.
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
22 #include <grpc/slice.h>
23 #include <grpc/support/alloc.h>
24 #include <grpc/support/log.h>
25 #include <grpcpp/impl/codegen/method_handler_impl.h>
27 #include "pb_decode.h"
28 #include "pb_encode.h"
29 #include "src/core/ext/filters/client_channel/health/health.pb.h"
30 #include "src/cpp/server/health/default_health_check_service.h"
35 // DefaultHealthCheckService
38 DefaultHealthCheckService::DefaultHealthCheckService() {
39 services_map_[""].SetServingStatus(SERVING);
// Sets the status of one named service.
// Takes mu_ through the MutexLock guard, then stores SERVING or NOT_SERVING
// (picked from `serving`) on the services_map_ entry obtained with
// operator[] for `service_name`.
// NOTE(review): the condition that the in-body comment below belongs to is
// not visible in this view -- confirm against the full file.
42 void DefaultHealthCheckService::SetServingStatus(
43 const grpc::string& service_name, bool serving) {
44 grpc_core::MutexLock lock(&mu_);
46 // Set to NOT_SERVING in case service_name is not in the map.
49 services_map_[service_name].SetServingStatus(serving ? SERVING : NOT_SERVING);
// Applies one status to every known service.
// `serving` is mapped to SERVING / NOT_SERVING before mu_ is taken; the loop
// then calls SetServingStatus() on each ServiceData held in services_map_.
52 void DefaultHealthCheckService::SetServingStatus(bool serving) {
53 const ServingStatus status = serving ? SERVING : NOT_SERVING;
54 grpc_core::MutexLock lock(&mu_);
58 for (auto& p : services_map_) {
59 ServiceData& service_data = p.second;
60 service_data.SetServingStatus(status);
// Shuts the health database down: with mu_ held, every ServiceData in
// services_map_ is switched to NOT_SERVING.
64 void DefaultHealthCheckService::Shutdown() {
65 grpc_core::MutexLock lock(&mu_);
70 for (auto& p : services_map_) {
71 ServiceData& service_data = p.second;
72 service_data.SetServingStatus(NOT_SERVING);
// Looks up the status recorded for `service_name` while holding mu_.
// When the name is present in services_map_, returns what the matching
// ServiceData reports from GetServingStatus().
// NOTE(review): the body of the "not in the map" branch is not visible here;
// presumably it returns NOT_FOUND, which the Check handler compares against
// -- confirm.
76 DefaultHealthCheckService::ServingStatus
77 DefaultHealthCheckService::GetServingStatus(
78 const grpc::string& service_name) const {
79 grpc_core::MutexLock lock(&mu_);
80 auto it = services_map_.find(service_name);
81 if (it == services_map_.end()) {
84 const ServiceData& service_data = it->second;
85 return service_data.GetServingStatus();
88 void DefaultHealthCheckService::RegisterCallHandler(
89 const grpc::string& service_name,
90 std::shared_ptr<HealthCheckServiceImpl::CallHandler> handler) {
91 grpc_core::MutexLock lock(&mu_);
92 ServiceData& service_data = services_map_[service_name];
93 service_data.AddCallHandler(handler /* copies ref */);
94 HealthCheckServiceImpl::CallHandler* h = handler.get();
95 h->SendHealth(std::move(handler), service_data.GetServingStatus());
98 void DefaultHealthCheckService::UnregisterCallHandler(
99 const grpc::string& service_name,
100 const std::shared_ptr<HealthCheckServiceImpl::CallHandler>& handler) {
101 grpc_core::MutexLock lock(&mu_);
102 auto it = services_map_.find(service_name);
103 if (it == services_map_.end()) return;
104 ServiceData& service_data = it->second;
105 service_data.RemoveCallHandler(handler);
106 if (service_data.Unused()) {
107 services_map_.erase(it);
// Creates the HealthCheckServiceImpl that serves this database and hands it
// ownership of `cq`.
// Asserts that impl_ has not been set before, so a second call aborts.
// NOTE(review): the return statement is not visible here; presumably the
// pointer now held by impl_ is returned -- confirm.
111 DefaultHealthCheckService::HealthCheckServiceImpl*
112 DefaultHealthCheckService::GetHealthCheckService(
113 std::unique_ptr<ServerCompletionQueue> cq) {
114 GPR_ASSERT(impl_ == nullptr);
115 impl_.reset(new HealthCheckServiceImpl(this, std::move(cq)));
120 // DefaultHealthCheckService::ServiceData
// Pushes `status` to every call handler registered for this service by
// calling SendHealth() on each one; each call receives its own copy of the
// handler's shared_ptr.
// NOTE(review): presumably `status` is also stored for later
// GetServingStatus() calls; that store is not visible here -- confirm.
123 void DefaultHealthCheckService::ServiceData::SetServingStatus(
124 ServingStatus status) {
126 for (auto& call_handler : call_handlers_) {
127 call_handler->SendHealth(call_handler /* copies ref */, status);
// Adds `handler` to the call handlers tracked for this service. The argument
// is taken by value and moved into call_handlers_.
131 void DefaultHealthCheckService::ServiceData::AddCallHandler(
132 std::shared_ptr<HealthCheckServiceImpl::CallHandler> handler) {
133 call_handlers_.insert(std::move(handler));
// Removes `handler` from the call handlers tracked for this service.
136 void DefaultHealthCheckService::ServiceData::RemoveCallHandler(
137 const std::shared_ptr<HealthCheckServiceImpl::CallHandler>& handler) {
138 call_handlers_.erase(handler);
142 // DefaultHealthCheckService::HealthCheckServiceImpl
// Fully qualified names of the two health RPCs. The HealthCheckServiceImpl
// constructor passes them to AddMethod(): Check as NORMAL_RPC, Watch as
// SERVER_STREAMING.
146 const char kHealthCheckMethodName[] = "/grpc.health.v1.Health/Check";
147 const char kHealthWatchMethodName[] = "/grpc.health.v1.Health/Watch";
150 DefaultHealthCheckService::HealthCheckServiceImpl::HealthCheckServiceImpl(
151 DefaultHealthCheckService* database,
152 std::unique_ptr<ServerCompletionQueue> cq)
153 : database_(database), cq_(std::move(cq)) {
154 // Add Check() method.
155 AddMethod(new internal::RpcServiceMethod(
156 kHealthCheckMethodName, internal::RpcMethod::NORMAL_RPC, nullptr));
157 // Add Watch() method.
158 AddMethod(new internal::RpcServiceMethod(
159 kHealthWatchMethodName, internal::RpcMethod::SERVER_STREAMING, nullptr));
160 // Create serving thread.
161 thread_ = std::unique_ptr<::grpc_core::Thread>(
162 new ::grpc_core::Thread("grpc_health_check_service", Serve, this));
// Tears the service implementation down.
// NOTE(review): only the acquisition of cq_shutdown_mu_ is visible in this
// view; the statements it guards (presumably shutting down cq_ and stopping
// the serving thread) are not shown -- confirm against the full file.
165 DefaultHealthCheckService::HealthCheckServiceImpl::~HealthCheckServiceImpl() {
166 // We will reach here after the server starts shutting down.
169 grpc_core::MutexLock lock(&cq_shutdown_mu_);
// Arms the first Check() and Watch() handlers on cq_ and then, per the
// trailing comment, starts the serving thread.
// NOTE(review): the statement that actually starts the thread is not visible
// in this view -- confirm.
175 void DefaultHealthCheckService::HealthCheckServiceImpl::StartServingThread() {
176 // Request the calls we're interested in.
177 // We do this before starting the serving thread, so that we know it's
178 // done before server startup is complete.
179 CheckCallHandler::CreateAndStart(cq_.get(), database_, this);
180 WatchCallHandler::CreateAndStart(cq_.get(), database_, this);
181 // Start serving thread.
// Body of the serving thread. `arg` is the HealthCheckServiceImpl that was
// passed when the thread object was created in the constructor.
// Pulls events from the service's completion queue with Next(); a false
// return from Next() is expected only once shutdown_ is set (asserted).
// Every tag received is treated as a CallableTag.
// NOTE(review): the loop scaffolding and the invocation of the tag are not
// visible in this view -- confirm.
185 void DefaultHealthCheckService::HealthCheckServiceImpl::Serve(void* arg) {
186 HealthCheckServiceImpl* service =
187 reinterpret_cast<HealthCheckServiceImpl*>(arg);
191 if (!service->cq_->Next(&tag, &ok)) {
192 // The completion queue is shutting down.
193 GPR_ASSERT(service->shutdown_);
196 auto* next_step = static_cast<CallableTag*>(tag);
201 bool DefaultHealthCheckService::HealthCheckServiceImpl::DecodeRequest(
202 const ByteBuffer& request, grpc::string* service_name) {
203 std::vector<Slice> slices;
204 if (!request.Dump(&slices).ok()) return false;
205 uint8_t* request_bytes = nullptr;
206 size_t request_size = 0;
207 grpc_health_v1_HealthCheckRequest request_struct;
208 request_struct.has_service = false;
209 if (slices.size() == 1) {
210 request_bytes = const_cast<uint8_t*>(slices[0].begin());
211 request_size = slices[0].size();
212 } else if (slices.size() > 1) {
213 request_bytes = static_cast<uint8_t*>(gpr_malloc(request.Length()));
214 uint8_t* copy_to = request_bytes;
215 for (size_t i = 0; i < slices.size(); i++) {
216 memcpy(copy_to, slices[i].begin(), slices[i].size());
217 copy_to += slices[i].size();
220 pb_istream_t istream = pb_istream_from_buffer(request_bytes, request_size);
221 bool decode_status = pb_decode(
222 &istream, grpc_health_v1_HealthCheckRequest_fields, &request_struct);
223 if (slices.size() > 1) {
224 gpr_free(request_bytes);
226 if (!decode_status) return false;
227 *service_name = request_struct.has_service ? request_struct.service : "";
// Serializes `status` as a HealthCheckResponse and places the bytes in
// `response`.
//
// The message is encoded twice with pb_encode(): first into a zeroed
// pb_ostream_t, whose bytes_written then gives the size of the grpc_slice to
// allocate, and a second time into that slice. The slice is wrapped in a
// Slice with STEAL_REF and swapped into `response` through a temporary
// ByteBuffer.
// Returns false when the second pb_encode() call fails.
// NOTE(review): the conditions of the status-mapping ternary and the final
// return are not visible in this view; the mapping presumably covers
// NOT_FOUND -> SERVICE_UNKNOWN and SERVING -> SERVING -- confirm.
231 bool DefaultHealthCheckService::HealthCheckServiceImpl::EncodeResponse(
232 ServingStatus status, ByteBuffer* response) {
233 grpc_health_v1_HealthCheckResponse response_struct;
234 response_struct.has_status = true;
235 response_struct.status =
237 ? grpc_health_v1_HealthCheckResponse_ServingStatus_SERVICE_UNKNOWN
239 ? grpc_health_v1_HealthCheckResponse_ServingStatus_SERVING
240 : grpc_health_v1_HealthCheckResponse_ServingStatus_NOT_SERVING;
241 pb_ostream_t ostream;
242 memset(&ostream, 0, sizeof(ostream));
// First pass: no output buffer, only bytes_written is of interest.
243 pb_encode(&ostream, grpc_health_v1_HealthCheckResponse_fields,
245 grpc_slice response_slice = grpc_slice_malloc(ostream.bytes_written);
// Second pass: write into the slice sized by the first pass.
246 ostream = pb_ostream_from_buffer(GRPC_SLICE_START_PTR(response_slice),
247 GRPC_SLICE_LENGTH(response_slice));
248 bool encode_status = pb_encode(
249 &ostream, grpc_health_v1_HealthCheckResponse_fields, &response_struct);
250 if (!encode_status) return false;
251 Slice encoded_response(response_slice, Slice::STEAL_REF);
252 ByteBuffer response_buffer(&encoded_response, 1);
253 response->Swap(&response_buffer);
258 // DefaultHealthCheckService::HealthCheckServiceImpl::CheckCallHandler
// Allocates a new CheckCallHandler and asks `service` for the next Check()
// call on `cq`.
// The shutdown flag is checked under cq_shutdown_mu_; when the service is
// already shutting down nothing is requested and the handler is dropped.
// The tag built here is bound to OnCallReceived.
// NOTE(review): the method index 0 passed to RequestAsyncUnary presumably
// matches Check() being the first method added in the service constructor
// -- confirm.
261 void DefaultHealthCheckService::HealthCheckServiceImpl::CheckCallHandler::
262 CreateAndStart(ServerCompletionQueue* cq,
263 DefaultHealthCheckService* database,
264 HealthCheckServiceImpl* service) {
265 std::shared_ptr<CallHandler> self =
266 std::make_shared<CheckCallHandler>(cq, database, service);
267 CheckCallHandler* handler = static_cast<CheckCallHandler*>(self.get());
269 grpc_core::MutexLock lock(&service->cq_shutdown_mu_);
270 if (service->shutdown_) return;
271 // Request a Check() call.
273 CallableTag(std::bind(&CheckCallHandler::OnCallReceived, handler,
274 std::placeholders::_1, std::placeholders::_2),
276 service->RequestAsyncUnary(0, &handler->ctx_, &handler->request_,
277 &handler->writer_, cq, cq, &handler->next_);
281 DefaultHealthCheckService::HealthCheckServiceImpl::CheckCallHandler::
282 CheckCallHandler(ServerCompletionQueue* cq,
283 DefaultHealthCheckService* database,
284 HealthCheckServiceImpl* service)
285 : cq_(cq), database_(database), service_(service), writer_(&ctx_) {}
// Tag callback for the Check() request started in CreateAndStart.
// Per the comment below, `ok` is false when the server is shutting down.
//
// Visible flow: arm a fresh handler for the next client, decode the request,
// look the service up in the database and finish the call with
//   - INVALID_ARGUMENT when the request cannot be parsed,
//   - NOT_FOUND when the database does not know the service,
//   - INTERNAL when the response cannot be encoded,
//   - the encoded response otherwise.
// The finish is only issued while the service is not shutting down (checked
// under cq_shutdown_mu_) and its tag is bound to OnFinishDone.
// NOTE(review): several lines of this body, including the branch that picks
// Finish() versus FinishWithError(), are not visible in this view -- confirm.
287 void DefaultHealthCheckService::HealthCheckServiceImpl::CheckCallHandler::
288 OnCallReceived(std::shared_ptr<CallHandler> self, bool ok) {
290 // The value of ok being false means that the server is shutting down.
293 // Spawn a new handler instance to serve the next new client. Every handler
294 // instance will deallocate itself when it's done.
295 CreateAndStart(cq_, database_, service_);
297 gpr_log(GPR_DEBUG, "[HCS %p] Health check started for handler %p", service_,
299 grpc::string service_name;
300 grpc::Status status = Status::OK;
302 if (!service_->DecodeRequest(request_, &service_name)) {
303 status = Status(StatusCode::INVALID_ARGUMENT, "could not parse request");
305 ServingStatus serving_status = database_->GetServingStatus(service_name);
306 if (serving_status == NOT_FOUND) {
307 status = Status(StatusCode::NOT_FOUND, "service name unknown");
308 } else if (!service_->EncodeResponse(serving_status, &response)) {
309 status = Status(StatusCode::INTERNAL, "could not encode response");
314 grpc_core::MutexLock lock(&service_->cq_shutdown_mu_);
315 if (!service_->shutdown_) {
317 CallableTag(std::bind(&CheckCallHandler::OnFinishDone, this,
318 std::placeholders::_1, std::placeholders::_2),
321 writer_.Finish(response, status, &next_);
323 writer_.FinishWithError(status, &next_);
// Tag callback for the finish started in OnCallReceived. Logs at debug level
// and drops the reference carried in `self`.
// NOTE(review): how `ok` is used is not visible in this view -- confirm.
329 void DefaultHealthCheckService::HealthCheckServiceImpl::CheckCallHandler::
330 OnFinishDone(std::shared_ptr<CallHandler> self, bool ok) {
332 gpr_log(GPR_DEBUG, "[HCS %p] Health check call finished for handler %p",
335 self.reset(); // To appease clang-tidy.
339 // DefaultHealthCheckService::HealthCheckServiceImpl::WatchCallHandler
// Allocates a new WatchCallHandler and asks `service` for the next Watch()
// call on `cq`.
// Under cq_shutdown_mu_, and only while the service is not shutting down:
//   1. registers on_done_notified_ (bound to OnDoneNotified, holding a copy
//      of the handler reference) through AsyncNotifyWhenDone();
//   2. requests the streaming call with a tag bound to OnCallReceived.
// NOTE(review): the method index 1 passed to RequestAsyncServerStreaming
// presumably matches Watch() being the second method added in the service
// constructor -- confirm.
342 void DefaultHealthCheckService::HealthCheckServiceImpl::WatchCallHandler::
343 CreateAndStart(ServerCompletionQueue* cq,
344 DefaultHealthCheckService* database,
345 HealthCheckServiceImpl* service) {
346 std::shared_ptr<CallHandler> self =
347 std::make_shared<WatchCallHandler>(cq, database, service);
348 WatchCallHandler* handler = static_cast<WatchCallHandler*>(self.get());
350 grpc_core::MutexLock lock(&service->cq_shutdown_mu_);
351 if (service->shutdown_) return;
352 // Request AsyncNotifyWhenDone().
353 handler->on_done_notified_ =
354 CallableTag(std::bind(&WatchCallHandler::OnDoneNotified, handler,
355 std::placeholders::_1, std::placeholders::_2),
356 self /* copies ref */);
357 handler->ctx_.AsyncNotifyWhenDone(&handler->on_done_notified_);
358 // Request a Watch() call.
360 CallableTag(std::bind(&WatchCallHandler::OnCallReceived, handler,
361 std::placeholders::_1, std::placeholders::_2),
363 service->RequestAsyncServerStreaming(1, &handler->ctx_, &handler->request_,
364 &handler->stream_, cq, cq,
369 DefaultHealthCheckService::HealthCheckServiceImpl::WatchCallHandler::
370 WatchCallHandler(ServerCompletionQueue* cq,
371 DefaultHealthCheckService* database,
372 HealthCheckServiceImpl* service)
373 : cq_(cq), database_(database), service_(service), stream_(&ctx_) {}
// Tag callback for the Watch() request started in CreateAndStart.
//
// Visible flow: in the shutdown case the handler reference held by
// on_done_notified_ is released by hand (see the comment below for why);
// otherwise a fresh handler is armed for the next client, the request is
// decoded into service_name_, and the handler registers itself with the
// database for updates to that service. A request that cannot be parsed is
// finished with INVALID_ARGUMENT.
// NOTE(review): the condition guarding the shutdown branch and the early
// returns are not visible in this view -- confirm.
375 void DefaultHealthCheckService::HealthCheckServiceImpl::WatchCallHandler::
376 OnCallReceived(std::shared_ptr<CallHandler> self, bool ok) {
378 // Server shutting down.
380 // AsyncNotifyWhenDone() needs to be called before the call starts, but the
381 // tag will not pop out if the call never starts (
382 // https://github.com/grpc/grpc/issues/10136). So we need to manually
383 // release the ownership of the handler in this case.
384 GPR_ASSERT(on_done_notified_.ReleaseHandler() != nullptr);
387 // Spawn a new handler instance to serve the next new client. Every handler
388 // instance will deallocate itself when it's done.
389 CreateAndStart(cq_, database_, service_);
391 if (!service_->DecodeRequest(request_, &service_name_)) {
392 SendFinish(std::move(self),
393 Status(StatusCode::INVALID_ARGUMENT, "could not parse request"));
396 // Register the call for updates to the service.
398 "[HCS %p] Health watch started for service \"%s\" (handler: %p)",
399 service_, service_name_.c_str(), this);
400 database_->RegisterCallHandler(service_name_, std::move(self));
// Delivers `status` to the watching client. Sends are serialized by
// send_mu_: while a write is in flight the new status is only cached in
// pending_status_; otherwise a write is started through SendHealthLocked().
// NOTE(review): the early exit after caching the status is not visible in
// this view -- confirm.
403 void DefaultHealthCheckService::HealthCheckServiceImpl::WatchCallHandler::
404 SendHealth(std::shared_ptr<CallHandler> self, ServingStatus status) {
405 grpc_core::MutexLock lock(&send_mu_);
406 // If there's already a send in flight, cache the new status, and
407 // we'll start a new send for it when the one in flight completes.
408 if (send_in_flight_) {
409 pending_status_ = status;
413 SendHealthLocked(std::move(self), status);
// Starts one write of `status` on the response stream. Both callers in this
// file (SendHealth and OnSendHealthDone) take send_mu_ before calling.
//
// Marks a send as in flight, encodes the response, then takes
// cq_shutdown_mu_ and either
//   - finishes the call with CANCELLED when the service is shutting down,
//   - finishes the call with INTERNAL ("could not encode response"), or
//   - writes the response with a tag bound to OnSendHealthDone.
// NOTE(review): the condition on `success` and the early returns between
// these branches are not visible in this view -- confirm.
416 void DefaultHealthCheckService::HealthCheckServiceImpl::WatchCallHandler::
417 SendHealthLocked(std::shared_ptr<CallHandler> self, ServingStatus status) {
418 send_in_flight_ = true;
419 // Construct response.
421 bool success = service_->EncodeResponse(status, &response);
422 // Grab shutdown lock and send response.
423 grpc_core::MutexLock cq_lock(&service_->cq_shutdown_mu_);
424 if (service_->shutdown_) {
425 SendFinishLocked(std::move(self), Status::CANCELLED);
429 SendFinishLocked(std::move(self),
430 Status(StatusCode::INTERNAL, "could not encode response"));
433 next_ = CallableTag(std::bind(&WatchCallHandler::OnSendHealthDone, this,
434 std::placeholders::_1, std::placeholders::_2),
436 stream_.Write(response, &next_);
// Tag callback for the write started in SendHealthLocked.
// Visible flow: one path finishes the call with CANCELLED; otherwise, under
// send_mu_, the in-flight flag is cleared and, when a status was cached in
// pending_status_ in the meantime (any value other than NOT_FOUND), the cache
// is reset to NOT_FOUND and that status is sent next.
// NOTE(review): the condition guarding the CANCELLED path (presumably !ok)
// is not visible in this view -- confirm.
439 void DefaultHealthCheckService::HealthCheckServiceImpl::WatchCallHandler::
440 OnSendHealthDone(std::shared_ptr<CallHandler> self, bool ok) {
442 SendFinish(std::move(self), Status::CANCELLED);
445 grpc_core::MutexLock lock(&send_mu_);
446 send_in_flight_ = false;
447 // If we got a new status since we started the last send, start a
449 if (pending_status_ != NOT_FOUND) {
450 auto status = pending_status_;
451 pending_status_ = NOT_FOUND;
452 SendHealthLocked(std::move(self), status);
456 void DefaultHealthCheckService::HealthCheckServiceImpl::WatchCallHandler::
457 SendFinish(std::shared_ptr<CallHandler> self, const Status& status) {
458 if (finish_called_) return;
459 grpc_core::MutexLock cq_lock(&service_->cq_shutdown_mu_);
460 if (service_->shutdown_) return;
461 SendFinishLocked(std::move(self), status);
// Issues stream_.Finish() with `status` and records that a finish has been
// requested. Both callers in this file (SendFinish and SendHealthLocked)
// hold the service's cq_shutdown_mu_ when calling.
// The completion tag on_finish_done_ is bound to OnFinishDone.
// NOTE(review): the assignment target of the CallableTag and its handler
// argument are not visible in this view -- confirm.
464 void DefaultHealthCheckService::HealthCheckServiceImpl::WatchCallHandler::
465 SendFinishLocked(std::shared_ptr<CallHandler> self, const Status& status) {
467 CallableTag(std::bind(&WatchCallHandler::OnFinishDone, this,
468 std::placeholders::_1, std::placeholders::_2),
470 stream_.Finish(status, &on_finish_done_);
471 finish_called_ = true;
// Tag callback for the finish started in SendFinishLocked. Logs the service
// name and handler, then drops the reference carried in `self`.
// NOTE(review): how `ok` is used is not visible in this view -- confirm.
474 void DefaultHealthCheckService::HealthCheckServiceImpl::WatchCallHandler::
475 OnFinishDone(std::shared_ptr<CallHandler> self, bool ok) {
478 "[HCS %p] Health watch call finished (service_name: \"%s\", "
480 service_, service_name_.c_str(), this);
482 self.reset(); // To appease clang-tidy.
485 // TODO(roth): This method currently assumes that there will be only one
486 // thread polling the cq and invoking the corresponding callbacks. If
487 // that changes, we will need to add synchronization here.
// Tag callback registered through AsyncNotifyWhenDone() in CreateAndStart.
// Logs whether the call was cancelled, unregisters this handler from the
// database for service_name_, and finishes the call with CANCELLED via
// SendFinish().
// NOTE(review): the statements before the log message (including any use of
// `ok`) are not visible in this view -- confirm.
488 void DefaultHealthCheckService::HealthCheckServiceImpl::WatchCallHandler::
489 OnDoneNotified(std::shared_ptr<CallHandler> self, bool ok) {
492 "[HCS %p] Health watch call is notified done (handler: %p, "
493 "is_cancelled: %d).",
494 service_, this, static_cast<int>(ctx_.IsCancelled()));
495 database_->UnregisterCallHandler(service_name_, self);
496 SendFinish(std::move(self), Status::CANCELLED);