/*
 *
 * Copyright 2016 gRPC authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */

#include <cstring>  // memcpy, memset
#include <memory>
#include <mutex>

#include <grpc/slice.h>
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
#include <grpcpp/impl/codegen/method_handler_impl.h>

#include "pb_decode.h"
#include "pb_encode.h"
#include "src/core/ext/filters/client_channel/health/health.pb.h"
#include "src/cpp/server/health/default_health_check_service.h"

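// Usage sketch (not part of this file): a minimal example of how an
// application enables and drives this default health check service. The
// service name "my.package.MyService" and the port are illustrative only.
//
//   #include <grpcpp/grpcpp.h>
//   #include <grpcpp/health_check_service_interface.h>
//
//   grpc::EnableDefaultHealthCheckService(true);
//   grpc::ServerBuilder builder;
//   builder.AddListeningPort("0.0.0.0:50051",
//                            grpc::InsecureServerCredentials());
//   std::unique_ptr<grpc::Server> server = builder.BuildAndStart();
//   grpc::HealthCheckServiceInterface* health =
//       server->GetHealthCheckService();
//   health->SetServingStatus("my.package.MyService", /*serving=*/true);
//
// The rest of this file implements that default service: Check() answers a
// single query, while Watch() streams status updates until the call is
// cancelled or the server shuts down.
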
namespace grpc {

//
// DefaultHealthCheckService
//

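// The empty service name ("") refers to the overall health of the server,
// per the gRPC health checking protocol, so the server as a whole starts out
// SERVING.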
DefaultHealthCheckService::DefaultHealthCheckService() {
  services_map_[""].SetServingStatus(SERVING);
}

void DefaultHealthCheckService::SetServingStatus(
    const grpc::string& service_name, bool serving) {
  grpc_core::MutexLock lock(&mu_);
  if (shutdown_) {
    // Once shutdown has started, force NOT_SERVING regardless of the
    // requested status.  This also covers the case where service_name is
    // not yet in the map and would otherwise be inserted with the
    // requested status.
    serving = false;
  }
  services_map_[service_name].SetServingStatus(serving ? SERVING : NOT_SERVING);
}

void DefaultHealthCheckService::SetServingStatus(bool serving) {
  const ServingStatus status = serving ? SERVING : NOT_SERVING;
  grpc_core::MutexLock lock(&mu_);
  if (shutdown_) {
    return;
  }
  for (auto& p : services_map_) {
    ServiceData& service_data = p.second;
    service_data.SetServingStatus(status);
  }
}

void DefaultHealthCheckService::Shutdown() {
  grpc_core::MutexLock lock(&mu_);
  if (shutdown_) {
    return;
  }
  shutdown_ = true;
  for (auto& p : services_map_) {
    ServiceData& service_data = p.second;
    service_data.SetServingStatus(NOT_SERVING);
  }
}

DefaultHealthCheckService::ServingStatus
DefaultHealthCheckService::GetServingStatus(
    const grpc::string& service_name) const {
  grpc_core::MutexLock lock(&mu_);
  auto it = services_map_.find(service_name);
  if (it == services_map_.end()) {
    return NOT_FOUND;
  }
  const ServiceData& service_data = it->second;
  return service_data.GetServingStatus();
}

void DefaultHealthCheckService::RegisterCallHandler(
    const grpc::string& service_name,
    std::shared_ptr<HealthCheckServiceImpl::CallHandler> handler) {
  grpc_core::MutexLock lock(&mu_);
  ServiceData& service_data = services_map_[service_name];
  service_data.AddCallHandler(handler /* copies ref */);
  HealthCheckServiceImpl::CallHandler* h = handler.get();
  h->SendHealth(std::move(handler), service_data.GetServingStatus());
}

void DefaultHealthCheckService::UnregisterCallHandler(
    const grpc::string& service_name,
    const std::shared_ptr<HealthCheckServiceImpl::CallHandler>& handler) {
  grpc_core::MutexLock lock(&mu_);
  auto it = services_map_.find(service_name);
  if (it == services_map_.end()) return;
  ServiceData& service_data = it->second;
  service_data.RemoveCallHandler(handler);
  if (service_data.Unused()) {
    services_map_.erase(it);
  }
}

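// Called by the server (at most once, as enforced by the assert below) to
// instantiate the async service implementation on its own completion queue.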
DefaultHealthCheckService::HealthCheckServiceImpl*
DefaultHealthCheckService::GetHealthCheckService(
    std::unique_ptr<ServerCompletionQueue> cq) {
  GPR_ASSERT(impl_ == nullptr);
  impl_.reset(new HealthCheckServiceImpl(this, std::move(cq)));
  return impl_.get();
}

//
// DefaultHealthCheckService::ServiceData
//

void DefaultHealthCheckService::ServiceData::SetServingStatus(
    ServingStatus status) {
  status_ = status;
  for (auto& call_handler : call_handlers_) {
    call_handler->SendHealth(call_handler /* copies ref */, status);
  }
}

void DefaultHealthCheckService::ServiceData::AddCallHandler(
    std::shared_ptr<HealthCheckServiceImpl::CallHandler> handler) {
  call_handlers_.insert(std::move(handler));
}

void DefaultHealthCheckService::ServiceData::RemoveCallHandler(
    const std::shared_ptr<HealthCheckServiceImpl::CallHandler>& handler) {
  call_handlers_.erase(handler);
}

//
// DefaultHealthCheckService::HealthCheckServiceImpl
//

namespace {
const char kHealthCheckMethodName[] = "/grpc.health.v1.Health/Check";
const char kHealthWatchMethodName[] = "/grpc.health.v1.Health/Watch";
}  // namespace

DefaultHealthCheckService::HealthCheckServiceImpl::HealthCheckServiceImpl(
    DefaultHealthCheckService* database,
    std::unique_ptr<ServerCompletionQueue> cq)
    : database_(database), cq_(std::move(cq)) {
  // Add Check() method.
  AddMethod(new internal::RpcServiceMethod(
      kHealthCheckMethodName, internal::RpcMethod::NORMAL_RPC, nullptr));
  // Add Watch() method.
  AddMethod(new internal::RpcServiceMethod(
      kHealthWatchMethodName, internal::RpcMethod::SERVER_STREAMING, nullptr));
  // Create serving thread.
  thread_ = std::unique_ptr<::grpc_core::Thread>(
      new ::grpc_core::Thread("grpc_health_check_service", Serve, this));
}

DefaultHealthCheckService::HealthCheckServiceImpl::~HealthCheckServiceImpl() {
  // We will reach here after the server starts shutting down.
  shutdown_ = true;
  {
    grpc_core::MutexLock lock(&cq_shutdown_mu_);
    cq_->Shutdown();
  }
  thread_->Join();
}

void DefaultHealthCheckService::HealthCheckServiceImpl::StartServingThread() {
  // Request the calls we're interested in.
  // We do this before starting the serving thread, so that we know it's
  // done before server startup is complete.
  CheckCallHandler::CreateAndStart(cq_.get(), database_, this);
  WatchCallHandler::CreateAndStart(cq_.get(), database_, this);
  // Start serving thread.
  thread_->Start();
}

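// Completion-queue polling loop.  Every tag enqueued by this service is a
// CallableTag, which bundles a callback with a ref to the handler that owns
// it, so Run() both dispatches the event and keeps the handler alive for the
// duration of the callback.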
void DefaultHealthCheckService::HealthCheckServiceImpl::Serve(void* arg) {
  HealthCheckServiceImpl* service = static_cast<HealthCheckServiceImpl*>(arg);
  void* tag;
  bool ok;
  while (true) {
    if (!service->cq_->Next(&tag, &ok)) {
      // The completion queue is shutting down.
      GPR_ASSERT(service->shutdown_);
      break;
    }
    auto* next_step = static_cast<CallableTag*>(tag);
    next_step->Run(ok);
  }
}

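// Decodes a grpc.health.v1.HealthCheckRequest from the request ByteBuffer
// using nanopb.  The buffer is dumped into slices; a single slice is decoded
// in place, while multiple slices are first flattened into one contiguous
// heap buffer.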
bool DefaultHealthCheckService::HealthCheckServiceImpl::DecodeRequest(
    const ByteBuffer& request, grpc::string* service_name) {
  std::vector<Slice> slices;
  if (!request.Dump(&slices).ok()) return false;
  uint8_t* request_bytes = nullptr;
  size_t request_size = 0;
  grpc_health_v1_HealthCheckRequest request_struct;
  request_struct.has_service = false;
  if (slices.size() == 1) {
    request_bytes = const_cast<uint8_t*>(slices[0].begin());
    request_size = slices[0].size();
  } else if (slices.size() > 1) {
    // Flatten the multi-slice buffer into a single contiguous copy.
    request_size = request.Length();
    request_bytes = static_cast<uint8_t*>(gpr_malloc(request_size));
    uint8_t* copy_to = request_bytes;
    for (size_t i = 0; i < slices.size(); i++) {
      memcpy(copy_to, slices[i].begin(), slices[i].size());
      copy_to += slices[i].size();
    }
  }
  pb_istream_t istream = pb_istream_from_buffer(request_bytes, request_size);
  bool decode_status = pb_decode(
      &istream, grpc_health_v1_HealthCheckRequest_fields, &request_struct);
  if (slices.size() > 1) {
    gpr_free(request_bytes);
  }
  if (!decode_status) return false;
  *service_name = request_struct.has_service ? request_struct.service : "";
  return true;
}

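// Encodes a HealthCheckResponse with nanopb in two passes: the first
// pb_encode() into a zero-initialized (sizing) stream computes the serialized
// length, then the message is encoded again into a slice of exactly that size.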
bool DefaultHealthCheckService::HealthCheckServiceImpl::EncodeResponse(
    ServingStatus status, ByteBuffer* response) {
  grpc_health_v1_HealthCheckResponse response_struct;
  response_struct.has_status = true;
  response_struct.status =
      status == NOT_FOUND
          ? grpc_health_v1_HealthCheckResponse_ServingStatus_SERVICE_UNKNOWN
          : status == SERVING
                ? grpc_health_v1_HealthCheckResponse_ServingStatus_SERVING
                : grpc_health_v1_HealthCheckResponse_ServingStatus_NOT_SERVING;
  pb_ostream_t ostream;
  memset(&ostream, 0, sizeof(ostream));
  pb_encode(&ostream, grpc_health_v1_HealthCheckResponse_fields,
            &response_struct);
  grpc_slice response_slice = grpc_slice_malloc(ostream.bytes_written);
  ostream = pb_ostream_from_buffer(GRPC_SLICE_START_PTR(response_slice),
                                   GRPC_SLICE_LENGTH(response_slice));
  bool encode_status = pb_encode(
      &ostream, grpc_health_v1_HealthCheckResponse_fields, &response_struct);
  if (!encode_status) return false;
  Slice encoded_response(response_slice, Slice::STEAL_REF);
  ByteBuffer response_buffer(&encoded_response, 1);
  response->Swap(&response_buffer);
  return true;
}

//
// DefaultHealthCheckService::HealthCheckServiceImpl::CheckCallHandler
//

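// Each handler owns itself: the shared_ptr created here is moved into the
// CallableTag passed to the completion queue, and each subsequent callback
// re-arms the next tag with that same ref.  Once no tag holds the ref any
// longer, the handler is destroyed.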
void DefaultHealthCheckService::HealthCheckServiceImpl::CheckCallHandler::
    CreateAndStart(ServerCompletionQueue* cq,
                   DefaultHealthCheckService* database,
                   HealthCheckServiceImpl* service) {
  std::shared_ptr<CallHandler> self =
      std::make_shared<CheckCallHandler>(cq, database, service);
  CheckCallHandler* handler = static_cast<CheckCallHandler*>(self.get());
  {
    grpc_core::MutexLock lock(&service->cq_shutdown_mu_);
    if (service->shutdown_) return;
    // Request a Check() call.
    handler->next_ =
        CallableTag(std::bind(&CheckCallHandler::OnCallReceived, handler,
                              std::placeholders::_1, std::placeholders::_2),
                    std::move(self));
    service->RequestAsyncUnary(0, &handler->ctx_, &handler->request_,
                               &handler->writer_, cq, cq, &handler->next_);
  }
}

DefaultHealthCheckService::HealthCheckServiceImpl::CheckCallHandler::
    CheckCallHandler(ServerCompletionQueue* cq,
                     DefaultHealthCheckService* database,
                     HealthCheckServiceImpl* service)
    : cq_(cq), database_(database), service_(service), writer_(&ctx_) {}

void DefaultHealthCheckService::HealthCheckServiceImpl::CheckCallHandler::
    OnCallReceived(std::shared_ptr<CallHandler> self, bool ok) {
  if (!ok) {
    // The value of ok being false means that the server is shutting down.
    return;
  }
  // Spawn a new handler instance to serve the next new client. Every handler
  // instance will deallocate itself when it's done.
  CreateAndStart(cq_, database_, service_);
  // Process request.
  gpr_log(GPR_DEBUG, "[HCS %p] Health check started for handler %p", service_,
          this);
  grpc::string service_name;
  grpc::Status status = Status::OK;
  ByteBuffer response;
  if (!service_->DecodeRequest(request_, &service_name)) {
    status = Status(StatusCode::INVALID_ARGUMENT, "could not parse request");
  } else {
    ServingStatus serving_status = database_->GetServingStatus(service_name);
    if (serving_status == NOT_FOUND) {
      status = Status(StatusCode::NOT_FOUND, "service name unknown");
    } else if (!service_->EncodeResponse(serving_status, &response)) {
      status = Status(StatusCode::INTERNAL, "could not encode response");
    }
  }
  // Send response.
  {
    grpc_core::MutexLock lock(&service_->cq_shutdown_mu_);
    if (!service_->shutdown_) {
      next_ =
          CallableTag(std::bind(&CheckCallHandler::OnFinishDone, this,
                                std::placeholders::_1, std::placeholders::_2),
                      std::move(self));
      if (status.ok()) {
        writer_.Finish(response, status, &next_);
      } else {
        writer_.FinishWithError(status, &next_);
      }
    }
  }
}

void DefaultHealthCheckService::HealthCheckServiceImpl::CheckCallHandler::
    OnFinishDone(std::shared_ptr<CallHandler> self, bool ok) {
  if (ok) {
    gpr_log(GPR_DEBUG, "[HCS %p] Health check call finished for handler %p",
            service_, this);
  }
  self.reset();  // To appease clang-tidy.
}

//
// DefaultHealthCheckService::HealthCheckServiceImpl::WatchCallHandler
//

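// Two tags are requested up front, each holding its own ref to the handler:
// one for AsyncNotifyWhenDone() (client cancellation / call completion) and
// one for the incoming Watch() call itself.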
void DefaultHealthCheckService::HealthCheckServiceImpl::WatchCallHandler::
    CreateAndStart(ServerCompletionQueue* cq,
                   DefaultHealthCheckService* database,
                   HealthCheckServiceImpl* service) {
  std::shared_ptr<CallHandler> self =
      std::make_shared<WatchCallHandler>(cq, database, service);
  WatchCallHandler* handler = static_cast<WatchCallHandler*>(self.get());
  {
    grpc_core::MutexLock lock(&service->cq_shutdown_mu_);
    if (service->shutdown_) return;
    // Request AsyncNotifyWhenDone().
    handler->on_done_notified_ =
        CallableTag(std::bind(&WatchCallHandler::OnDoneNotified, handler,
                              std::placeholders::_1, std::placeholders::_2),
                    self /* copies ref */);
    handler->ctx_.AsyncNotifyWhenDone(&handler->on_done_notified_);
    // Request a Watch() call.
    handler->next_ =
        CallableTag(std::bind(&WatchCallHandler::OnCallReceived, handler,
                              std::placeholders::_1, std::placeholders::_2),
                    std::move(self));
    service->RequestAsyncServerStreaming(1, &handler->ctx_, &handler->request_,
                                         &handler->stream_, cq, cq,
                                         &handler->next_);
  }
}

DefaultHealthCheckService::HealthCheckServiceImpl::WatchCallHandler::
    WatchCallHandler(ServerCompletionQueue* cq,
                     DefaultHealthCheckService* database,
                     HealthCheckServiceImpl* service)
    : cq_(cq), database_(database), service_(service), stream_(&ctx_) {}

void DefaultHealthCheckService::HealthCheckServiceImpl::WatchCallHandler::
    OnCallReceived(std::shared_ptr<CallHandler> self, bool ok) {
  if (!ok) {
    // Server shutting down.
    //
    // AsyncNotifyWhenDone() needs to be called before the call starts, but the
    // tag will not pop out if the call never starts (
    // https://github.com/grpc/grpc/issues/10136). So we need to manually
    // release the ownership of the handler in this case.
    GPR_ASSERT(on_done_notified_.ReleaseHandler() != nullptr);
    return;
  }
  // Spawn a new handler instance to serve the next new client. Every handler
  // instance will deallocate itself when it's done.
  CreateAndStart(cq_, database_, service_);
  // Parse request.
  if (!service_->DecodeRequest(request_, &service_name_)) {
    SendFinish(std::move(self),
               Status(StatusCode::INVALID_ARGUMENT, "could not parse request"));
    return;
  }
  // Register the call for updates to the service.
  gpr_log(GPR_DEBUG,
          "[HCS %p] Health watch started for service \"%s\" (handler: %p)",
          service_, service_name_.c_str(), this);
  database_->RegisterCallHandler(service_name_, std::move(self));
}

void DefaultHealthCheckService::HealthCheckServiceImpl::WatchCallHandler::
    SendHealth(std::shared_ptr<CallHandler> self, ServingStatus status) {
  grpc_core::MutexLock lock(&send_mu_);
  // If there's already a send in flight, cache the new status, and
  // we'll start a new send for it when the one in flight completes.
  if (send_in_flight_) {
    pending_status_ = status;
    return;
  }
  // Start a send.
  SendHealthLocked(std::move(self), status);
}

void DefaultHealthCheckService::HealthCheckServiceImpl::WatchCallHandler::
    SendHealthLocked(std::shared_ptr<CallHandler> self, ServingStatus status) {
  send_in_flight_ = true;
  // Construct response.
  ByteBuffer response;
  bool success = service_->EncodeResponse(status, &response);
  // Grab shutdown lock and send response.
  grpc_core::MutexLock cq_lock(&service_->cq_shutdown_mu_);
  if (service_->shutdown_) {
    SendFinishLocked(std::move(self), Status::CANCELLED);
    return;
  }
  if (!success) {
    SendFinishLocked(std::move(self),
                     Status(StatusCode::INTERNAL, "could not encode response"));
    return;
  }
  next_ = CallableTag(std::bind(&WatchCallHandler::OnSendHealthDone, this,
                                std::placeholders::_1, std::placeholders::_2),
                      std::move(self));
  stream_.Write(response, &next_);
}

void DefaultHealthCheckService::HealthCheckServiceImpl::WatchCallHandler::
    OnSendHealthDone(std::shared_ptr<CallHandler> self, bool ok) {
  if (!ok) {
    SendFinish(std::move(self), Status::CANCELLED);
    return;
  }
  grpc_core::MutexLock lock(&send_mu_);
  send_in_flight_ = false;
  // If we got a new status since we started the last send, start a
  // new send for it.
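  // (NOT_FOUND doubles as the "no pending update" sentinel here; updates
  // delivered via SetServingStatus() are always SERVING or NOT_SERVING.)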
  if (pending_status_ != NOT_FOUND) {
    auto status = pending_status_;
    pending_status_ = NOT_FOUND;
    SendHealthLocked(std::move(self), status);
  }
}

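// Finishes the stream at most once: finish_called_ guards against a second
// Finish() when, e.g., a failed write and the done notification both try to
// terminate the call.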
void DefaultHealthCheckService::HealthCheckServiceImpl::WatchCallHandler::
    SendFinish(std::shared_ptr<CallHandler> self, const Status& status) {
  if (finish_called_) return;
  grpc_core::MutexLock cq_lock(&service_->cq_shutdown_mu_);
  if (service_->shutdown_) return;
  SendFinishLocked(std::move(self), status);
}

void DefaultHealthCheckService::HealthCheckServiceImpl::WatchCallHandler::
    SendFinishLocked(std::shared_ptr<CallHandler> self, const Status& status) {
  on_finish_done_ =
      CallableTag(std::bind(&WatchCallHandler::OnFinishDone, this,
                            std::placeholders::_1, std::placeholders::_2),
                  std::move(self));
  stream_.Finish(status, &on_finish_done_);
  finish_called_ = true;
}

void DefaultHealthCheckService::HealthCheckServiceImpl::WatchCallHandler::
    OnFinishDone(std::shared_ptr<CallHandler> self, bool ok) {
  if (ok) {
    gpr_log(GPR_DEBUG,
            "[HCS %p] Health watch call finished (service_name: \"%s\", "
            "handler: %p).",
            service_, service_name_.c_str(), this);
  }
  self.reset();  // To appease clang-tidy.
}

// TODO(roth): This method currently assumes that there will be only one
// thread polling the cq and invoking the corresponding callbacks.  If
// that changes, we will need to add synchronization here.
void DefaultHealthCheckService::HealthCheckServiceImpl::WatchCallHandler::
    OnDoneNotified(std::shared_ptr<CallHandler> self, bool ok) {
  GPR_ASSERT(ok);
  gpr_log(GPR_DEBUG,
          "[HCS %p] Health watch call is notified done (handler: %p, "
          "is_cancelled: %d).",
          service_, this, static_cast<int>(ctx_.IsCancelled()));
  database_->UnregisterCallHandler(service_name_, self);
  SendFinish(std::move(self), Status::CANCELLED);
}

}  // namespace grpc