/*
 *
 * Copyright 2016 gRPC authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */
#include <cstring>

#include "upb/upb.hpp"

#include <grpc/slice.h>
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
#include <grpcpp/impl/codegen/method_handler.h>

#include "src/cpp/server/health/default_health_check_service.h"
#include "src/proto/grpc/health/v1/health.upb.h"
#include "upb/upb.hpp"
// Maximum length, in bytes, of the service name accepted from a health-check
// request; DecodeRequest() compares the decoded name's size against it.
#define MAX_SERVICE_NAME_LENGTH 200
//
// DefaultHealthCheckService
//
40 DefaultHealthCheckService::DefaultHealthCheckService() {
41 services_map_[""].SetServingStatus(SERVING);
44 void DefaultHealthCheckService::SetServingStatus(
45 const std::string& service_name, bool serving) {
46 grpc_core::MutexLock lock(&mu_);
48 // Set to NOT_SERVING in case service_name is not in the map.
51 services_map_[service_name].SetServingStatus(serving ? SERVING : NOT_SERVING);
54 void DefaultHealthCheckService::SetServingStatus(bool serving) {
55 const ServingStatus status = serving ? SERVING : NOT_SERVING;
56 grpc_core::MutexLock lock(&mu_);
60 for (auto& p : services_map_) {
61 ServiceData& service_data = p.second;
62 service_data.SetServingStatus(status);
66 void DefaultHealthCheckService::Shutdown() {
67 grpc_core::MutexLock lock(&mu_);
72 for (auto& p : services_map_) {
73 ServiceData& service_data = p.second;
74 service_data.SetServingStatus(NOT_SERVING);
78 DefaultHealthCheckService::ServingStatus
79 DefaultHealthCheckService::GetServingStatus(
80 const std::string& service_name) const {
81 grpc_core::MutexLock lock(&mu_);
82 auto it = services_map_.find(service_name);
83 if (it == services_map_.end()) {
86 const ServiceData& service_data = it->second;
87 return service_data.GetServingStatus();
90 void DefaultHealthCheckService::RegisterCallHandler(
91 const std::string& service_name,
92 std::shared_ptr<HealthCheckServiceImpl::CallHandler> handler) {
93 grpc_core::MutexLock lock(&mu_);
94 ServiceData& service_data = services_map_[service_name];
95 service_data.AddCallHandler(handler /* copies ref */);
96 HealthCheckServiceImpl::CallHandler* h = handler.get();
97 h->SendHealth(std::move(handler), service_data.GetServingStatus());
100 void DefaultHealthCheckService::UnregisterCallHandler(
101 const std::string& service_name,
102 const std::shared_ptr<HealthCheckServiceImpl::CallHandler>& handler) {
103 grpc_core::MutexLock lock(&mu_);
104 auto it = services_map_.find(service_name);
105 if (it == services_map_.end()) return;
106 ServiceData& service_data = it->second;
107 service_data.RemoveCallHandler(handler);
108 if (service_data.Unused()) {
109 services_map_.erase(it);
113 DefaultHealthCheckService::HealthCheckServiceImpl*
114 DefaultHealthCheckService::GetHealthCheckService(
115 std::unique_ptr<ServerCompletionQueue> cq) {
116 GPR_ASSERT(impl_ == nullptr);
117 impl_.reset(new HealthCheckServiceImpl(this, std::move(cq)));
//
// DefaultHealthCheckService::ServiceData
//
125 void DefaultHealthCheckService::ServiceData::SetServingStatus(
126 ServingStatus status) {
128 for (auto& call_handler : call_handlers_) {
129 call_handler->SendHealth(call_handler /* copies ref */, status);
133 void DefaultHealthCheckService::ServiceData::AddCallHandler(
134 std::shared_ptr<HealthCheckServiceImpl::CallHandler> handler) {
135 call_handlers_.insert(std::move(handler));
138 void DefaultHealthCheckService::ServiceData::RemoveCallHandler(
139 const std::shared_ptr<HealthCheckServiceImpl::CallHandler>& handler) {
140 call_handlers_.erase(handler);
//
// DefaultHealthCheckService::HealthCheckServiceImpl
//
namespace {
// Fully-qualified names of the two RPCs registered by HealthCheckServiceImpl.
constexpr char kHealthCheckMethodName[] = "/grpc.health.v1.Health/Check";
constexpr char kHealthWatchMethodName[] = "/grpc.health.v1.Health/Watch";
}  // namespace
152 DefaultHealthCheckService::HealthCheckServiceImpl::HealthCheckServiceImpl(
153 DefaultHealthCheckService* database,
154 std::unique_ptr<ServerCompletionQueue> cq)
155 : database_(database), cq_(std::move(cq)) {
156 // Add Check() method.
157 AddMethod(new internal::RpcServiceMethod(
158 kHealthCheckMethodName, internal::RpcMethod::NORMAL_RPC, nullptr));
159 // Add Watch() method.
160 AddMethod(new internal::RpcServiceMethod(
161 kHealthWatchMethodName, internal::RpcMethod::SERVER_STREAMING, nullptr));
162 // Create serving thread.
163 thread_ = std::unique_ptr<::grpc_core::Thread>(
164 new ::grpc_core::Thread("grpc_health_check_service", Serve, this));
167 DefaultHealthCheckService::HealthCheckServiceImpl::~HealthCheckServiceImpl() {
168 // We will reach here after the server starts shutting down.
171 grpc_core::MutexLock lock(&cq_shutdown_mu_);
177 void DefaultHealthCheckService::HealthCheckServiceImpl::StartServingThread() {
178 // Request the calls we're interested in.
179 // We do this before starting the serving thread, so that we know it's
180 // done before server startup is complete.
181 CheckCallHandler::CreateAndStart(cq_.get(), database_, this);
182 WatchCallHandler::CreateAndStart(cq_.get(), database_, this);
183 // Start serving thread.
187 void DefaultHealthCheckService::HealthCheckServiceImpl::Serve(void* arg) {
188 HealthCheckServiceImpl* service = static_cast<HealthCheckServiceImpl*>(arg);
192 if (!service->cq_->Next(&tag, &ok)) {
193 // The completion queue is shutting down.
194 GPR_ASSERT(service->shutdown_);
197 auto* next_step = static_cast<CallableTag*>(tag);
202 bool DefaultHealthCheckService::HealthCheckServiceImpl::DecodeRequest(
203 const ByteBuffer& request, std::string* service_name) {
204 std::vector<Slice> slices;
205 if (!request.Dump(&slices).ok()) return false;
206 uint8_t* request_bytes = nullptr;
207 size_t request_size = 0;
208 if (slices.size() == 1) {
209 request_bytes = const_cast<uint8_t*>(slices[0].begin());
210 request_size = slices[0].size();
211 } else if (slices.size() > 1) {
212 request_bytes = static_cast<uint8_t*>(gpr_malloc(request.Length()));
213 uint8_t* copy_to = request_bytes;
214 for (size_t i = 0; i < slices.size(); i++) {
215 memcpy(copy_to, slices[i].begin(), slices[i].size());
216 copy_to += slices[i].size();
220 grpc_health_v1_HealthCheckRequest* request_struct =
221 grpc_health_v1_HealthCheckRequest_parse(
222 reinterpret_cast<char*>(request_bytes), request_size, arena.ptr());
223 if (slices.size() > 1) {
224 gpr_free(request_bytes);
226 if (request_struct == nullptr) {
229 upb_strview service =
230 grpc_health_v1_HealthCheckRequest_service(request_struct);
231 if (service.size > MAX_SERVICE_NAME_LENGTH) {
234 service_name->assign(service.data, service.size);
238 bool DefaultHealthCheckService::HealthCheckServiceImpl::EncodeResponse(
239 ServingStatus status, ByteBuffer* response) {
241 grpc_health_v1_HealthCheckResponse* response_struct =
242 grpc_health_v1_HealthCheckResponse_new(arena.ptr());
243 grpc_health_v1_HealthCheckResponse_set_status(
246 ? grpc_health_v1_HealthCheckResponse_SERVICE_UNKNOWN
247 : status == SERVING ? grpc_health_v1_HealthCheckResponse_SERVING
248 : grpc_health_v1_HealthCheckResponse_NOT_SERVING);
250 char* buf = grpc_health_v1_HealthCheckResponse_serialize(
251 response_struct, arena.ptr(), &buf_length);
252 if (buf == nullptr) {
255 grpc_slice response_slice = grpc_slice_from_copied_buffer(buf, buf_length);
256 Slice encoded_response(response_slice, Slice::STEAL_REF);
257 ByteBuffer response_buffer(&encoded_response, 1);
258 response->Swap(&response_buffer);
//
// DefaultHealthCheckService::HealthCheckServiceImpl::CheckCallHandler
//
266 void DefaultHealthCheckService::HealthCheckServiceImpl::CheckCallHandler::
267 CreateAndStart(ServerCompletionQueue* cq,
268 DefaultHealthCheckService* database,
269 HealthCheckServiceImpl* service) {
270 std::shared_ptr<CallHandler> self =
271 std::make_shared<CheckCallHandler>(cq, database, service);
272 CheckCallHandler* handler = static_cast<CheckCallHandler*>(self.get());
274 grpc_core::MutexLock lock(&service->cq_shutdown_mu_);
275 if (service->shutdown_) return;
276 // Request a Check() call.
278 CallableTag(std::bind(&CheckCallHandler::OnCallReceived, handler,
279 std::placeholders::_1, std::placeholders::_2),
281 service->RequestAsyncUnary(0, &handler->ctx_, &handler->request_,
282 &handler->writer_, cq, cq, &handler->next_);
286 DefaultHealthCheckService::HealthCheckServiceImpl::CheckCallHandler::
287 CheckCallHandler(ServerCompletionQueue* cq,
288 DefaultHealthCheckService* database,
289 HealthCheckServiceImpl* service)
290 : cq_(cq), database_(database), service_(service), writer_(&ctx_) {}
292 void DefaultHealthCheckService::HealthCheckServiceImpl::CheckCallHandler::
293 OnCallReceived(std::shared_ptr<CallHandler> self, bool ok) {
295 // The value of ok being false means that the server is shutting down.
298 // Spawn a new handler instance to serve the next new client. Every handler
299 // instance will deallocate itself when it's done.
300 CreateAndStart(cq_, database_, service_);
302 gpr_log(GPR_DEBUG, "[HCS %p] Health check started for handler %p", service_,
304 std::string service_name;
305 grpc::Status status = Status::OK;
307 if (!service_->DecodeRequest(request_, &service_name)) {
308 status = Status(StatusCode::INVALID_ARGUMENT, "could not parse request");
310 ServingStatus serving_status = database_->GetServingStatus(service_name);
311 if (serving_status == NOT_FOUND) {
312 status = Status(StatusCode::NOT_FOUND, "service name unknown");
313 } else if (!service_->EncodeResponse(serving_status, &response)) {
314 status = Status(StatusCode::INTERNAL, "could not encode response");
319 grpc_core::MutexLock lock(&service_->cq_shutdown_mu_);
320 if (!service_->shutdown_) {
322 CallableTag(std::bind(&CheckCallHandler::OnFinishDone, this,
323 std::placeholders::_1, std::placeholders::_2),
326 writer_.Finish(response, status, &next_);
328 writer_.FinishWithError(status, &next_);
334 void DefaultHealthCheckService::HealthCheckServiceImpl::CheckCallHandler::
335 OnFinishDone(std::shared_ptr<CallHandler> self, bool ok) {
337 gpr_log(GPR_DEBUG, "[HCS %p] Health check call finished for handler %p",
340 self.reset(); // To appease clang-tidy.
//
// DefaultHealthCheckService::HealthCheckServiceImpl::WatchCallHandler
//
347 void DefaultHealthCheckService::HealthCheckServiceImpl::WatchCallHandler::
348 CreateAndStart(ServerCompletionQueue* cq,
349 DefaultHealthCheckService* database,
350 HealthCheckServiceImpl* service) {
351 std::shared_ptr<CallHandler> self =
352 std::make_shared<WatchCallHandler>(cq, database, service);
353 WatchCallHandler* handler = static_cast<WatchCallHandler*>(self.get());
355 grpc_core::MutexLock lock(&service->cq_shutdown_mu_);
356 if (service->shutdown_) return;
357 // Request AsyncNotifyWhenDone().
358 handler->on_done_notified_ =
359 CallableTag(std::bind(&WatchCallHandler::OnDoneNotified, handler,
360 std::placeholders::_1, std::placeholders::_2),
361 self /* copies ref */);
362 handler->ctx_.AsyncNotifyWhenDone(&handler->on_done_notified_);
363 // Request a Watch() call.
365 CallableTag(std::bind(&WatchCallHandler::OnCallReceived, handler,
366 std::placeholders::_1, std::placeholders::_2),
368 service->RequestAsyncServerStreaming(1, &handler->ctx_, &handler->request_,
369 &handler->stream_, cq, cq,
374 DefaultHealthCheckService::HealthCheckServiceImpl::WatchCallHandler::
375 WatchCallHandler(ServerCompletionQueue* cq,
376 DefaultHealthCheckService* database,
377 HealthCheckServiceImpl* service)
378 : cq_(cq), database_(database), service_(service), stream_(&ctx_) {}
380 void DefaultHealthCheckService::HealthCheckServiceImpl::WatchCallHandler::
381 OnCallReceived(std::shared_ptr<CallHandler> self, bool ok) {
383 // Server shutting down.
385 // AsyncNotifyWhenDone() needs to be called before the call starts, but the
386 // tag will not pop out if the call never starts (
387 // https://github.com/grpc/grpc/issues/10136). So we need to manually
388 // release the ownership of the handler in this case.
389 GPR_ASSERT(on_done_notified_.ReleaseHandler() != nullptr);
392 // Spawn a new handler instance to serve the next new client. Every handler
393 // instance will deallocate itself when it's done.
394 CreateAndStart(cq_, database_, service_);
396 if (!service_->DecodeRequest(request_, &service_name_)) {
397 SendFinish(std::move(self),
398 Status(StatusCode::INVALID_ARGUMENT, "could not parse request"));
401 // Register the call for updates to the service.
403 "[HCS %p] Health watch started for service \"%s\" (handler: %p)",
404 service_, service_name_.c_str(), this);
405 database_->RegisterCallHandler(service_name_, std::move(self));
408 void DefaultHealthCheckService::HealthCheckServiceImpl::WatchCallHandler::
409 SendHealth(std::shared_ptr<CallHandler> self, ServingStatus status) {
410 grpc_core::MutexLock lock(&send_mu_);
411 // If there's already a send in flight, cache the new status, and
412 // we'll start a new send for it when the one in flight completes.
413 if (send_in_flight_) {
414 pending_status_ = status;
418 SendHealthLocked(std::move(self), status);
421 void DefaultHealthCheckService::HealthCheckServiceImpl::WatchCallHandler::
422 SendHealthLocked(std::shared_ptr<CallHandler> self, ServingStatus status) {
423 send_in_flight_ = true;
424 // Construct response.
426 bool success = service_->EncodeResponse(status, &response);
427 // Grab shutdown lock and send response.
428 grpc_core::MutexLock cq_lock(&service_->cq_shutdown_mu_);
429 if (service_->shutdown_) {
430 SendFinishLocked(std::move(self), Status::CANCELLED);
434 SendFinishLocked(std::move(self),
435 Status(StatusCode::INTERNAL, "could not encode response"));
438 next_ = CallableTag(std::bind(&WatchCallHandler::OnSendHealthDone, this,
439 std::placeholders::_1, std::placeholders::_2),
441 stream_.Write(response, &next_);
444 void DefaultHealthCheckService::HealthCheckServiceImpl::WatchCallHandler::
445 OnSendHealthDone(std::shared_ptr<CallHandler> self, bool ok) {
447 SendFinish(std::move(self), Status::CANCELLED);
450 grpc_core::MutexLock lock(&send_mu_);
451 send_in_flight_ = false;
452 // If we got a new status since we started the last send, start a
454 if (pending_status_ != NOT_FOUND) {
455 auto status = pending_status_;
456 pending_status_ = NOT_FOUND;
457 SendHealthLocked(std::move(self), status);
461 void DefaultHealthCheckService::HealthCheckServiceImpl::WatchCallHandler::
462 SendFinish(std::shared_ptr<CallHandler> self, const Status& status) {
463 if (finish_called_) return;
464 grpc_core::MutexLock cq_lock(&service_->cq_shutdown_mu_);
465 if (service_->shutdown_) return;
466 SendFinishLocked(std::move(self), status);
469 void DefaultHealthCheckService::HealthCheckServiceImpl::WatchCallHandler::
470 SendFinishLocked(std::shared_ptr<CallHandler> self, const Status& status) {
472 CallableTag(std::bind(&WatchCallHandler::OnFinishDone, this,
473 std::placeholders::_1, std::placeholders::_2),
475 stream_.Finish(status, &on_finish_done_);
476 finish_called_ = true;
479 void DefaultHealthCheckService::HealthCheckServiceImpl::WatchCallHandler::
480 OnFinishDone(std::shared_ptr<CallHandler> self, bool ok) {
483 "[HCS %p] Health watch call finished (service_name: \"%s\", "
485 service_, service_name_.c_str(), this);
487 self.reset(); // To appease clang-tidy.
490 // TODO(roth): This method currently assumes that there will be only one
491 // thread polling the cq and invoking the corresponding callbacks. If
492 // that changes, we will need to add synchronization here.
493 void DefaultHealthCheckService::HealthCheckServiceImpl::WatchCallHandler::
494 OnDoneNotified(std::shared_ptr<CallHandler> self, bool ok) {
497 "[HCS %p] Health watch call is notified done (handler: %p, "
498 "is_cancelled: %d).",
499 service_, this, static_cast<int>(ctx_.IsCancelled()));
500 database_->UnregisterCallHandler(service_name_, self);
501 SendFinish(std::move(self), Status::CANCELLED);