Imported Upstream version 1.33.1
[platform/upstream/grpc.git] / src / cpp / server / health / default_health_check_service.cc
1 /*
2  *
3  * Copyright 2016 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18
19 #include <memory>
20
21 #include "upb/upb.hpp"
22
23 #include <grpc/slice.h>
24 #include <grpc/support/alloc.h>
25 #include <grpc/support/log.h>
26 #include <grpcpp/impl/codegen/method_handler.h>
27
28 #include "src/cpp/server/health/default_health_check_service.h"
29 #include "src/proto/grpc/health/v1/health.upb.h"
30 #include "upb/upb.hpp"
31
32 #define MAX_SERVICE_NAME_LENGTH 200
33
34 namespace grpc {
35
36 //
37 // DefaultHealthCheckService
38 //
39
40 DefaultHealthCheckService::DefaultHealthCheckService() {
41   services_map_[""].SetServingStatus(SERVING);
42 }
43
44 void DefaultHealthCheckService::SetServingStatus(
45     const std::string& service_name, bool serving) {
46   grpc_core::MutexLock lock(&mu_);
47   if (shutdown_) {
48     // Set to NOT_SERVING in case service_name is not in the map.
49     serving = false;
50   }
51   services_map_[service_name].SetServingStatus(serving ? SERVING : NOT_SERVING);
52 }
53
54 void DefaultHealthCheckService::SetServingStatus(bool serving) {
55   const ServingStatus status = serving ? SERVING : NOT_SERVING;
56   grpc_core::MutexLock lock(&mu_);
57   if (shutdown_) {
58     return;
59   }
60   for (auto& p : services_map_) {
61     ServiceData& service_data = p.second;
62     service_data.SetServingStatus(status);
63   }
64 }
65
66 void DefaultHealthCheckService::Shutdown() {
67   grpc_core::MutexLock lock(&mu_);
68   if (shutdown_) {
69     return;
70   }
71   shutdown_ = true;
72   for (auto& p : services_map_) {
73     ServiceData& service_data = p.second;
74     service_data.SetServingStatus(NOT_SERVING);
75   }
76 }
77
78 DefaultHealthCheckService::ServingStatus
79 DefaultHealthCheckService::GetServingStatus(
80     const std::string& service_name) const {
81   grpc_core::MutexLock lock(&mu_);
82   auto it = services_map_.find(service_name);
83   if (it == services_map_.end()) {
84     return NOT_FOUND;
85   }
86   const ServiceData& service_data = it->second;
87   return service_data.GetServingStatus();
88 }
89
90 void DefaultHealthCheckService::RegisterCallHandler(
91     const std::string& service_name,
92     std::shared_ptr<HealthCheckServiceImpl::CallHandler> handler) {
93   grpc_core::MutexLock lock(&mu_);
94   ServiceData& service_data = services_map_[service_name];
95   service_data.AddCallHandler(handler /* copies ref */);
96   HealthCheckServiceImpl::CallHandler* h = handler.get();
97   h->SendHealth(std::move(handler), service_data.GetServingStatus());
98 }
99
100 void DefaultHealthCheckService::UnregisterCallHandler(
101     const std::string& service_name,
102     const std::shared_ptr<HealthCheckServiceImpl::CallHandler>& handler) {
103   grpc_core::MutexLock lock(&mu_);
104   auto it = services_map_.find(service_name);
105   if (it == services_map_.end()) return;
106   ServiceData& service_data = it->second;
107   service_data.RemoveCallHandler(handler);
108   if (service_data.Unused()) {
109     services_map_.erase(it);
110   }
111 }
112
113 DefaultHealthCheckService::HealthCheckServiceImpl*
114 DefaultHealthCheckService::GetHealthCheckService(
115     std::unique_ptr<ServerCompletionQueue> cq) {
116   GPR_ASSERT(impl_ == nullptr);
117   impl_.reset(new HealthCheckServiceImpl(this, std::move(cq)));
118   return impl_.get();
119 }
120
121 //
122 // DefaultHealthCheckService::ServiceData
123 //
124
125 void DefaultHealthCheckService::ServiceData::SetServingStatus(
126     ServingStatus status) {
127   status_ = status;
128   for (auto& call_handler : call_handlers_) {
129     call_handler->SendHealth(call_handler /* copies ref */, status);
130   }
131 }
132
133 void DefaultHealthCheckService::ServiceData::AddCallHandler(
134     std::shared_ptr<HealthCheckServiceImpl::CallHandler> handler) {
135   call_handlers_.insert(std::move(handler));
136 }
137
138 void DefaultHealthCheckService::ServiceData::RemoveCallHandler(
139     const std::shared_ptr<HealthCheckServiceImpl::CallHandler>& handler) {
140   call_handlers_.erase(handler);
141 }
142
143 //
144 // DefaultHealthCheckService::HealthCheckServiceImpl
145 //
146
namespace {
// Fully-qualified method paths of the grpc.health.v1.Health service that
// HealthCheckServiceImpl registers.
constexpr char kHealthCheckMethodName[] = "/grpc.health.v1.Health/Check";
constexpr char kHealthWatchMethodName[] = "/grpc.health.v1.Health/Watch";
}  // namespace
151
152 DefaultHealthCheckService::HealthCheckServiceImpl::HealthCheckServiceImpl(
153     DefaultHealthCheckService* database,
154     std::unique_ptr<ServerCompletionQueue> cq)
155     : database_(database), cq_(std::move(cq)) {
156   // Add Check() method.
157   AddMethod(new internal::RpcServiceMethod(
158       kHealthCheckMethodName, internal::RpcMethod::NORMAL_RPC, nullptr));
159   // Add Watch() method.
160   AddMethod(new internal::RpcServiceMethod(
161       kHealthWatchMethodName, internal::RpcMethod::SERVER_STREAMING, nullptr));
162   // Create serving thread.
163   thread_ = std::unique_ptr<::grpc_core::Thread>(
164       new ::grpc_core::Thread("grpc_health_check_service", Serve, this));
165 }
166
167 DefaultHealthCheckService::HealthCheckServiceImpl::~HealthCheckServiceImpl() {
168   // We will reach here after the server starts shutting down.
169   shutdown_ = true;
170   {
171     grpc_core::MutexLock lock(&cq_shutdown_mu_);
172     cq_->Shutdown();
173   }
174   thread_->Join();
175 }
176
177 void DefaultHealthCheckService::HealthCheckServiceImpl::StartServingThread() {
178   // Request the calls we're interested in.
179   // We do this before starting the serving thread, so that we know it's
180   // done before server startup is complete.
181   CheckCallHandler::CreateAndStart(cq_.get(), database_, this);
182   WatchCallHandler::CreateAndStart(cq_.get(), database_, this);
183   // Start serving thread.
184   thread_->Start();
185 }
186
187 void DefaultHealthCheckService::HealthCheckServiceImpl::Serve(void* arg) {
188   HealthCheckServiceImpl* service = static_cast<HealthCheckServiceImpl*>(arg);
189   void* tag;
190   bool ok;
191   while (true) {
192     if (!service->cq_->Next(&tag, &ok)) {
193       // The completion queue is shutting down.
194       GPR_ASSERT(service->shutdown_);
195       break;
196     }
197     auto* next_step = static_cast<CallableTag*>(tag);
198     next_step->Run(ok);
199   }
200 }
201
202 bool DefaultHealthCheckService::HealthCheckServiceImpl::DecodeRequest(
203     const ByteBuffer& request, std::string* service_name) {
204   std::vector<Slice> slices;
205   if (!request.Dump(&slices).ok()) return false;
206   uint8_t* request_bytes = nullptr;
207   size_t request_size = 0;
208   if (slices.size() == 1) {
209     request_bytes = const_cast<uint8_t*>(slices[0].begin());
210     request_size = slices[0].size();
211   } else if (slices.size() > 1) {
212     request_bytes = static_cast<uint8_t*>(gpr_malloc(request.Length()));
213     uint8_t* copy_to = request_bytes;
214     for (size_t i = 0; i < slices.size(); i++) {
215       memcpy(copy_to, slices[i].begin(), slices[i].size());
216       copy_to += slices[i].size();
217     }
218   }
219   upb::Arena arena;
220   grpc_health_v1_HealthCheckRequest* request_struct =
221       grpc_health_v1_HealthCheckRequest_parse(
222           reinterpret_cast<char*>(request_bytes), request_size, arena.ptr());
223   if (slices.size() > 1) {
224     gpr_free(request_bytes);
225   }
226   if (request_struct == nullptr) {
227     return false;
228   }
229   upb_strview service =
230       grpc_health_v1_HealthCheckRequest_service(request_struct);
231   if (service.size > MAX_SERVICE_NAME_LENGTH) {
232     return false;
233   }
234   service_name->assign(service.data, service.size);
235   return true;
236 }
237
238 bool DefaultHealthCheckService::HealthCheckServiceImpl::EncodeResponse(
239     ServingStatus status, ByteBuffer* response) {
240   upb::Arena arena;
241   grpc_health_v1_HealthCheckResponse* response_struct =
242       grpc_health_v1_HealthCheckResponse_new(arena.ptr());
243   grpc_health_v1_HealthCheckResponse_set_status(
244       response_struct,
245       status == NOT_FOUND
246           ? grpc_health_v1_HealthCheckResponse_SERVICE_UNKNOWN
247           : status == SERVING ? grpc_health_v1_HealthCheckResponse_SERVING
248                               : grpc_health_v1_HealthCheckResponse_NOT_SERVING);
249   size_t buf_length;
250   char* buf = grpc_health_v1_HealthCheckResponse_serialize(
251       response_struct, arena.ptr(), &buf_length);
252   if (buf == nullptr) {
253     return false;
254   }
255   grpc_slice response_slice = grpc_slice_from_copied_buffer(buf, buf_length);
256   Slice encoded_response(response_slice, Slice::STEAL_REF);
257   ByteBuffer response_buffer(&encoded_response, 1);
258   response->Swap(&response_buffer);
259   return true;
260 }
261
262 //
263 // DefaultHealthCheckService::HealthCheckServiceImpl::CheckCallHandler
264 //
265
266 void DefaultHealthCheckService::HealthCheckServiceImpl::CheckCallHandler::
267     CreateAndStart(ServerCompletionQueue* cq,
268                    DefaultHealthCheckService* database,
269                    HealthCheckServiceImpl* service) {
270   std::shared_ptr<CallHandler> self =
271       std::make_shared<CheckCallHandler>(cq, database, service);
272   CheckCallHandler* handler = static_cast<CheckCallHandler*>(self.get());
273   {
274     grpc_core::MutexLock lock(&service->cq_shutdown_mu_);
275     if (service->shutdown_) return;
276     // Request a Check() call.
277     handler->next_ =
278         CallableTag(std::bind(&CheckCallHandler::OnCallReceived, handler,
279                               std::placeholders::_1, std::placeholders::_2),
280                     std::move(self));
281     service->RequestAsyncUnary(0, &handler->ctx_, &handler->request_,
282                                &handler->writer_, cq, cq, &handler->next_);
283   }
284 }
285
286 DefaultHealthCheckService::HealthCheckServiceImpl::CheckCallHandler::
287     CheckCallHandler(ServerCompletionQueue* cq,
288                      DefaultHealthCheckService* database,
289                      HealthCheckServiceImpl* service)
290     : cq_(cq), database_(database), service_(service), writer_(&ctx_) {}
291
292 void DefaultHealthCheckService::HealthCheckServiceImpl::CheckCallHandler::
293     OnCallReceived(std::shared_ptr<CallHandler> self, bool ok) {
294   if (!ok) {
295     // The value of ok being false means that the server is shutting down.
296     return;
297   }
298   // Spawn a new handler instance to serve the next new client. Every handler
299   // instance will deallocate itself when it's done.
300   CreateAndStart(cq_, database_, service_);
301   // Process request.
302   gpr_log(GPR_DEBUG, "[HCS %p] Health check started for handler %p", service_,
303           this);
304   std::string service_name;
305   grpc::Status status = Status::OK;
306   ByteBuffer response;
307   if (!service_->DecodeRequest(request_, &service_name)) {
308     status = Status(StatusCode::INVALID_ARGUMENT, "could not parse request");
309   } else {
310     ServingStatus serving_status = database_->GetServingStatus(service_name);
311     if (serving_status == NOT_FOUND) {
312       status = Status(StatusCode::NOT_FOUND, "service name unknown");
313     } else if (!service_->EncodeResponse(serving_status, &response)) {
314       status = Status(StatusCode::INTERNAL, "could not encode response");
315     }
316   }
317   // Send response.
318   {
319     grpc_core::MutexLock lock(&service_->cq_shutdown_mu_);
320     if (!service_->shutdown_) {
321       next_ =
322           CallableTag(std::bind(&CheckCallHandler::OnFinishDone, this,
323                                 std::placeholders::_1, std::placeholders::_2),
324                       std::move(self));
325       if (status.ok()) {
326         writer_.Finish(response, status, &next_);
327       } else {
328         writer_.FinishWithError(status, &next_);
329       }
330     }
331   }
332 }
333
334 void DefaultHealthCheckService::HealthCheckServiceImpl::CheckCallHandler::
335     OnFinishDone(std::shared_ptr<CallHandler> self, bool ok) {
336   if (ok) {
337     gpr_log(GPR_DEBUG, "[HCS %p] Health check call finished for handler %p",
338             service_, this);
339   }
340   self.reset();  // To appease clang-tidy.
341 }
342
343 //
344 // DefaultHealthCheckService::HealthCheckServiceImpl::WatchCallHandler
345 //
346
347 void DefaultHealthCheckService::HealthCheckServiceImpl::WatchCallHandler::
348     CreateAndStart(ServerCompletionQueue* cq,
349                    DefaultHealthCheckService* database,
350                    HealthCheckServiceImpl* service) {
351   std::shared_ptr<CallHandler> self =
352       std::make_shared<WatchCallHandler>(cq, database, service);
353   WatchCallHandler* handler = static_cast<WatchCallHandler*>(self.get());
354   {
355     grpc_core::MutexLock lock(&service->cq_shutdown_mu_);
356     if (service->shutdown_) return;
357     // Request AsyncNotifyWhenDone().
358     handler->on_done_notified_ =
359         CallableTag(std::bind(&WatchCallHandler::OnDoneNotified, handler,
360                               std::placeholders::_1, std::placeholders::_2),
361                     self /* copies ref */);
362     handler->ctx_.AsyncNotifyWhenDone(&handler->on_done_notified_);
363     // Request a Watch() call.
364     handler->next_ =
365         CallableTag(std::bind(&WatchCallHandler::OnCallReceived, handler,
366                               std::placeholders::_1, std::placeholders::_2),
367                     std::move(self));
368     service->RequestAsyncServerStreaming(1, &handler->ctx_, &handler->request_,
369                                          &handler->stream_, cq, cq,
370                                          &handler->next_);
371   }
372 }
373
374 DefaultHealthCheckService::HealthCheckServiceImpl::WatchCallHandler::
375     WatchCallHandler(ServerCompletionQueue* cq,
376                      DefaultHealthCheckService* database,
377                      HealthCheckServiceImpl* service)
378     : cq_(cq), database_(database), service_(service), stream_(&ctx_) {}
379
380 void DefaultHealthCheckService::HealthCheckServiceImpl::WatchCallHandler::
381     OnCallReceived(std::shared_ptr<CallHandler> self, bool ok) {
382   if (!ok) {
383     // Server shutting down.
384     //
385     // AsyncNotifyWhenDone() needs to be called before the call starts, but the
386     // tag will not pop out if the call never starts (
387     // https://github.com/grpc/grpc/issues/10136). So we need to manually
388     // release the ownership of the handler in this case.
389     GPR_ASSERT(on_done_notified_.ReleaseHandler() != nullptr);
390     return;
391   }
392   // Spawn a new handler instance to serve the next new client. Every handler
393   // instance will deallocate itself when it's done.
394   CreateAndStart(cq_, database_, service_);
395   // Parse request.
396   if (!service_->DecodeRequest(request_, &service_name_)) {
397     SendFinish(std::move(self),
398                Status(StatusCode::INVALID_ARGUMENT, "could not parse request"));
399     return;
400   }
401   // Register the call for updates to the service.
402   gpr_log(GPR_DEBUG,
403           "[HCS %p] Health watch started for service \"%s\" (handler: %p)",
404           service_, service_name_.c_str(), this);
405   database_->RegisterCallHandler(service_name_, std::move(self));
406 }
407
408 void DefaultHealthCheckService::HealthCheckServiceImpl::WatchCallHandler::
409     SendHealth(std::shared_ptr<CallHandler> self, ServingStatus status) {
410   grpc_core::MutexLock lock(&send_mu_);
411   // If there's already a send in flight, cache the new status, and
412   // we'll start a new send for it when the one in flight completes.
413   if (send_in_flight_) {
414     pending_status_ = status;
415     return;
416   }
417   // Start a send.
418   SendHealthLocked(std::move(self), status);
419 }
420
421 void DefaultHealthCheckService::HealthCheckServiceImpl::WatchCallHandler::
422     SendHealthLocked(std::shared_ptr<CallHandler> self, ServingStatus status) {
423   send_in_flight_ = true;
424   // Construct response.
425   ByteBuffer response;
426   bool success = service_->EncodeResponse(status, &response);
427   // Grab shutdown lock and send response.
428   grpc_core::MutexLock cq_lock(&service_->cq_shutdown_mu_);
429   if (service_->shutdown_) {
430     SendFinishLocked(std::move(self), Status::CANCELLED);
431     return;
432   }
433   if (!success) {
434     SendFinishLocked(std::move(self),
435                      Status(StatusCode::INTERNAL, "could not encode response"));
436     return;
437   }
438   next_ = CallableTag(std::bind(&WatchCallHandler::OnSendHealthDone, this,
439                                 std::placeholders::_1, std::placeholders::_2),
440                       std::move(self));
441   stream_.Write(response, &next_);
442 }
443
444 void DefaultHealthCheckService::HealthCheckServiceImpl::WatchCallHandler::
445     OnSendHealthDone(std::shared_ptr<CallHandler> self, bool ok) {
446   if (!ok) {
447     SendFinish(std::move(self), Status::CANCELLED);
448     return;
449   }
450   grpc_core::MutexLock lock(&send_mu_);
451   send_in_flight_ = false;
452   // If we got a new status since we started the last send, start a
453   // new send for it.
454   if (pending_status_ != NOT_FOUND) {
455     auto status = pending_status_;
456     pending_status_ = NOT_FOUND;
457     SendHealthLocked(std::move(self), status);
458   }
459 }
460
461 void DefaultHealthCheckService::HealthCheckServiceImpl::WatchCallHandler::
462     SendFinish(std::shared_ptr<CallHandler> self, const Status& status) {
463   if (finish_called_) return;
464   grpc_core::MutexLock cq_lock(&service_->cq_shutdown_mu_);
465   if (service_->shutdown_) return;
466   SendFinishLocked(std::move(self), status);
467 }
468
469 void DefaultHealthCheckService::HealthCheckServiceImpl::WatchCallHandler::
470     SendFinishLocked(std::shared_ptr<CallHandler> self, const Status& status) {
471   on_finish_done_ =
472       CallableTag(std::bind(&WatchCallHandler::OnFinishDone, this,
473                             std::placeholders::_1, std::placeholders::_2),
474                   std::move(self));
475   stream_.Finish(status, &on_finish_done_);
476   finish_called_ = true;
477 }
478
479 void DefaultHealthCheckService::HealthCheckServiceImpl::WatchCallHandler::
480     OnFinishDone(std::shared_ptr<CallHandler> self, bool ok) {
481   if (ok) {
482     gpr_log(GPR_DEBUG,
483             "[HCS %p] Health watch call finished (service_name: \"%s\", "
484             "handler: %p).",
485             service_, service_name_.c_str(), this);
486   }
487   self.reset();  // To appease clang-tidy.
488 }
489
490 // TODO(roth): This method currently assumes that there will be only one
491 // thread polling the cq and invoking the corresponding callbacks.  If
492 // that changes, we will need to add synchronization here.
493 void DefaultHealthCheckService::HealthCheckServiceImpl::WatchCallHandler::
494     OnDoneNotified(std::shared_ptr<CallHandler> self, bool ok) {
495   GPR_ASSERT(ok);
496   gpr_log(GPR_DEBUG,
497           "[HCS %p] Health watch call is notified done (handler: %p, "
498           "is_cancelled: %d).",
499           service_, this, static_cast<int>(ctx_.IsCancelled()));
500   database_->UnregisterCallHandler(service_name_, self);
501   SendFinish(std::move(self), Status::CANCELLED);
502 }
503
504 }  // namespace grpc