Imported Upstream version 1.41.0
[platform/upstream/grpc.git] / src / core / ext / filters / client_channel / health / health_check_client.cc
index 6091184..d8f4455 100644 (file)
 
 #include <grpc/support/port_platform.h>
 
+#include "src/core/ext/filters/client_channel/health/health_check_client.h"
+
 #include <stdint.h>
 #include <stdio.h>
 
-#include "src/core/ext/filters/client_channel/health/health_check_client.h"
+#include "upb/upb.hpp"
 
 #include "src/core/lib/debug/trace.h"
 #include "src/core/lib/gprpp/sync.h"
@@ -44,13 +46,16 @@ TraceFlag grpc_health_check_client_trace(false, "health_check_client");
 //
 
 HealthCheckClient::HealthCheckClient(
-    const char* service_name,
+    std::string service_name,
     RefCountedPtr<ConnectedSubchannel> connected_subchannel,
     grpc_pollset_set* interested_parties,
     RefCountedPtr<channelz::SubchannelNode> channelz_node,
     RefCountedPtr<ConnectivityStateWatcherInterface> watcher)
-    : InternallyRefCounted<HealthCheckClient>(&grpc_health_check_client_trace),
-      service_name_(service_name),
+    : InternallyRefCounted<HealthCheckClient>(
+          GRPC_TRACE_FLAG_ENABLED(grpc_health_check_client_trace)
+              ? "HealthCheckClient"
+              : nullptr),
+      service_name_(std::move(service_name)),
       connected_subchannel_(std::move(connected_subchannel)),
       interested_parties_(interested_parties),
       channelz_node_(std::move(channelz_node)),
@@ -89,7 +94,12 @@ void HealthCheckClient::SetHealthStatusLocked(grpc_connectivity_state state,
     gpr_log(GPR_INFO, "HealthCheckClient %p: setting state=%s reason=%s", this,
             ConnectivityStateName(state), reason);
   }
-  if (watcher_ != nullptr) watcher_->Notify(state);
+  if (watcher_ != nullptr) {
+    watcher_->Notify(state,
+                     state == GRPC_CHANNEL_TRANSIENT_FAILURE
+                         ? absl::Status(absl::StatusCode::kUnavailable, reason)
+                         : absl::Status());
+  }
 }
 
 void HealthCheckClient::Orphan() {
@@ -125,8 +135,7 @@ void HealthCheckClient::StartCallLocked() {
   call_state_->StartCall();
 }
 
-void HealthCheckClient::StartRetryTimer() {
-  MutexLock lock(&mu_);
+void HealthCheckClient::StartRetryTimerLocked() {
   SetHealthStatusLocked(GRPC_CHANNEL_TRANSIENT_FAILURE,
                         "health check call failed; will retry after backoff");
   grpc_millis next_try = retry_backoff_.NextAttemptTime();
@@ -148,7 +157,7 @@ void HealthCheckClient::StartRetryTimer() {
   grpc_timer_init(&retry_timer_, next_try, &retry_timer_callback_);
 }
 
-void HealthCheckClient::OnRetryTimer(void* arg, grpc_error* error) {
+void HealthCheckClient::OnRetryTimer(void* arg, grpc_error_handle error) {
   HealthCheckClient* self = static_cast<HealthCheckClient*>(arg);
   {
     MutexLock lock(&self->mu_);
@@ -171,13 +180,14 @@ void HealthCheckClient::OnRetryTimer(void* arg, grpc_error* error) {
 
 namespace {
 
-void EncodeRequest(const char* service_name,
+void EncodeRequest(const std::string& service_name,
                    ManualConstructor<SliceBufferByteStream>* send_message) {
   upb::Arena arena;
   grpc_health_v1_HealthCheckRequest* request_struct =
       grpc_health_v1_HealthCheckRequest_new(arena.ptr());
   grpc_health_v1_HealthCheckRequest_set_service(
-      request_struct, upb_strview_makez(service_name));
+      request_struct,
+      upb_strview_make(service_name.data(), service_name.size()));
   size_t buf_length;
   char* buf = grpc_health_v1_HealthCheckRequest_serialize(
       request_struct, arena.ptr(), &buf_length);
@@ -192,7 +202,7 @@ void EncodeRequest(const char* service_name,
 
 // Returns true if healthy.
 // If there was an error parsing the response, sets *error and returns false.
-bool DecodeResponse(grpc_slice_buffer* slice_buffer, grpc_error** error) {
+bool DecodeResponse(grpc_slice_buffer* slice_buffer, grpc_error_handle* error) {
   // If message is empty, assume unhealthy.
   if (slice_buffer->length == 0) {
     *error =
@@ -200,7 +210,7 @@ bool DecodeResponse(grpc_slice_buffer* slice_buffer, grpc_error** error) {
     return false;
   }
   // Concatenate the slices to form a single string.
-  UniquePtr<uint8_t> recv_message_deleter;
+  std::unique_ptr<uint8_t> recv_message_deleter;
   uint8_t* recv_message;
   if (slice_buffer->count == 1) {
     recv_message = GRPC_SLICE_START_PTR(slice_buffer->slices[0]);
@@ -243,7 +253,7 @@ HealthCheckClient::CallState::CallState(
     : health_check_client_(std::move(health_check_client)),
       pollent_(grpc_polling_entity_create_from_pollset_set(interested_parties)),
       arena_(Arena::Create(health_check_client_->connected_subchannel_
-                               ->GetInitialCallSizeEstimate(0))),
+                               ->GetInitialCallSizeEstimate())),
       payload_(context_) {}
 
 HealthCheckClient::CallState::~CallState() {
@@ -259,11 +269,8 @@ HealthCheckClient::CallState::~CallState() {
   // Unset the call combiner cancellation closure.  This has the
   // effect of scheduling the previously set cancellation closure, if
   // any, so that it can release any internal references it may be
-  // holding to the call stack. Also flush the closures on exec_ctx so that
-  // filters that schedule cancel notification closures on exec_ctx do not
-  // need to take a ref of the call stack to guarantee closure liveness.
+  // holding to the call stack.
   call_combiner_.SetNotifyOnCancel(nullptr);
-  ExecCtx::Get()->Flush();
   arena_->Destroy();
 }
 
@@ -282,9 +289,8 @@ void HealthCheckClient::CallState::StartCall() {
       arena_,
       context_,
       &call_combiner_,
-      0,  // parent_data_size
   };
-  grpc_error* error = GRPC_ERROR_NONE;
+  grpc_error_handle error = GRPC_ERROR_NONE;
   call_ = SubchannelCall::Create(std::move(args), &error).release();
   // Register after-destruction callback.
   GRPC_CLOSURE_INIT(&after_call_stack_destruction_, AfterCallStackDestruction,
@@ -295,15 +301,10 @@ void HealthCheckClient::CallState::StartCall() {
     gpr_log(GPR_ERROR,
             "HealthCheckClient %p CallState %p: error creating health "
             "checking call on subchannel (%s); will retry",
-            health_check_client_.get(), this, grpc_error_string(error));
+            health_check_client_.get(), this,
+            grpc_error_std_string(error).c_str());
     GRPC_ERROR_UNREF(error);
-    // Schedule instead of running directly, since we must not be
-    // holding health_check_client_->mu_ when CallEnded() is called.
-    call_->Ref(DEBUG_LOCATION, "call_end_closure").release();
-    GRPC_CLOSURE_SCHED(
-        GRPC_CLOSURE_INIT(&batch_.handler_private.closure, CallEndedRetry, this,
-                          grpc_schedule_on_exec_ctx),
-        GRPC_ERROR_NONE);
+    CallEndedLocked(/*retry=*/true);
     return;
   }
   // Initialize payload and batch.
@@ -351,6 +352,7 @@ void HealthCheckClient::CallState::StartCall() {
   batch_.recv_initial_metadata = true;
   // Add recv_message op.
   payload_.recv_message.recv_message = &recv_message_;
+  payload_.recv_message.call_failed_before_recv_message = nullptr;
   // recv_message callback takes ref, handled manually.
   call_->Ref(DEBUG_LOCATION, "recv_message_ready").release();
   payload_.recv_message.recv_message_ready = GRPC_CLOSURE_INIT(
@@ -377,8 +379,8 @@ void HealthCheckClient::CallState::StartCall() {
   StartBatch(&recv_trailing_metadata_batch_);
 }
 
-void HealthCheckClient::CallState::StartBatchInCallCombiner(void* arg,
-                                                            grpc_error* error) {
+void HealthCheckClient::CallState::StartBatchInCallCombiner(
+    void* arg, grpc_error_handle /*error*/) {
   grpc_transport_stream_op_batch* batch =
       static_cast<grpc_transport_stream_op_batch*>(arg);
   SubchannelCall* call =
@@ -396,21 +398,22 @@ void HealthCheckClient::CallState::StartBatch(
 }
 
 void HealthCheckClient::CallState::AfterCallStackDestruction(
-    void* arg, grpc_error* error) {
+    void* arg, grpc_error_handle /*error*/) {
   HealthCheckClient::CallState* self =
       static_cast<HealthCheckClient::CallState*>(arg);
-  Delete(self);
+  delete self;
 }
 
-void HealthCheckClient::CallState::OnCancelComplete(void* arg,
-                                                    grpc_error* error) {
+void HealthCheckClient::CallState::OnCancelComplete(
+    void* arg, grpc_error_handle /*error*/) {
   HealthCheckClient::CallState* self =
       static_cast<HealthCheckClient::CallState*>(arg);
   GRPC_CALL_COMBINER_STOP(&self->call_combiner_, "health_cancel");
   self->call_->Unref(DEBUG_LOCATION, "cancel");
 }
 
-void HealthCheckClient::CallState::StartCancel(void* arg, grpc_error* error) {
+void HealthCheckClient::CallState::StartCancel(void* arg,
+                                               grpc_error_handle /*error*/) {
   HealthCheckClient::CallState* self =
       static_cast<HealthCheckClient::CallState*>(arg);
   auto* batch = grpc_make_transport_stream_op(
@@ -422,8 +425,9 @@ void HealthCheckClient::CallState::StartCancel(void* arg, grpc_error* error) {
 
 void HealthCheckClient::CallState::Cancel() {
   bool expected = false;
-  if (cancelled_.CompareExchangeStrong(&expected, true, MemoryOrder::ACQ_REL,
-                                       MemoryOrder::ACQUIRE)) {
+  if (cancelled_.compare_exchange_strong(expected, true,
+                                         std::memory_order_acq_rel,
+                                         std::memory_order_acquire)) {
     call_->Ref(DEBUG_LOCATION, "cancel").release();
     GRPC_CALL_COMBINER_START(
         &call_combiner_,
@@ -432,7 +436,8 @@ void HealthCheckClient::CallState::Cancel() {
   }
 }
 
-void HealthCheckClient::CallState::OnComplete(void* arg, grpc_error* error) {
+void HealthCheckClient::CallState::OnComplete(void* arg,
+                                              grpc_error_handle /*error*/) {
   HealthCheckClient::CallState* self =
       static_cast<HealthCheckClient::CallState*>(arg);
   GRPC_CALL_COMBINER_STOP(&self->call_combiner_, "on_complete");
@@ -441,8 +446,8 @@ void HealthCheckClient::CallState::OnComplete(void* arg, grpc_error* error) {
   self->call_->Unref(DEBUG_LOCATION, "on_complete");
 }
 
-void HealthCheckClient::CallState::RecvInitialMetadataReady(void* arg,
-                                                            grpc_error* error) {
+void HealthCheckClient::CallState::RecvInitialMetadataReady(
+    void* arg, grpc_error_handle /*error*/) {
   HealthCheckClient::CallState* self =
       static_cast<HealthCheckClient::CallState*>(arg);
   GRPC_CALL_COMBINER_STOP(&self->call_combiner_, "recv_initial_metadata_ready");
@@ -450,7 +455,8 @@ void HealthCheckClient::CallState::RecvInitialMetadataReady(void* arg,
   self->call_->Unref(DEBUG_LOCATION, "recv_initial_metadata_ready");
 }
 
-void HealthCheckClient::CallState::DoneReadingRecvMessage(grpc_error* error) {
+void HealthCheckClient::CallState::DoneReadingRecvMessage(
+    grpc_error_handle error) {
   recv_message_.reset();
   if (error != GRPC_ERROR_NONE) {
     GRPC_ERROR_UNREF(error);
@@ -462,11 +468,11 @@ void HealthCheckClient::CallState::DoneReadingRecvMessage(grpc_error* error) {
   const bool healthy = DecodeResponse(&recv_message_buffer_, &error);
   const grpc_connectivity_state state =
       healthy ? GRPC_CHANNEL_READY : GRPC_CHANNEL_TRANSIENT_FAILURE;
-  const char* reason = error == GRPC_ERROR_NONE && !healthy
-                           ? "backend unhealthy"
-                           : grpc_error_string(error);
-  health_check_client_->SetHealthStatus(state, reason);
-  seen_response_.Store(true, MemoryOrder::RELEASE);
+  health_check_client_->SetHealthStatus(
+      state, error == GRPC_ERROR_NONE && !healthy
+                 ? "backend unhealthy"
+                 : grpc_error_std_string(error).c_str());
+  seen_response_.store(true, std::memory_order_release);
   grpc_slice_buffer_destroy_internal(&recv_message_buffer_);
   // Start another recv_message batch.
   // This re-uses the ref we're holding.
@@ -474,15 +480,16 @@ void HealthCheckClient::CallState::DoneReadingRecvMessage(grpc_error* error) {
   // callbacks from the original batch have completed yet.
   recv_message_batch_.payload = &payload_;
   payload_.recv_message.recv_message = &recv_message_;
+  payload_.recv_message.call_failed_before_recv_message = nullptr;
   payload_.recv_message.recv_message_ready = GRPC_CLOSURE_INIT(
       &recv_message_ready_, RecvMessageReady, this, grpc_schedule_on_exec_ctx);
   recv_message_batch_.recv_message = true;
   StartBatch(&recv_message_batch_);
 }
 
-grpc_error* HealthCheckClient::CallState::PullSliceFromRecvMessage() {
+grpc_error_handle HealthCheckClient::CallState::PullSliceFromRecvMessage() {
   grpc_slice slice;
-  grpc_error* error = recv_message_->Pull(&slice);
+  grpc_error_handle error = recv_message_->Pull(&slice);
   if (error == GRPC_ERROR_NONE) {
     grpc_slice_buffer_add(&recv_message_buffer_, slice);
   }
@@ -491,7 +498,7 @@ grpc_error* HealthCheckClient::CallState::PullSliceFromRecvMessage() {
 
 void HealthCheckClient::CallState::ContinueReadingRecvMessage() {
   while (recv_message_->Next(SIZE_MAX, &recv_message_ready_)) {
-    grpc_error* error = PullSliceFromRecvMessage();
+    grpc_error_handle error = PullSliceFromRecvMessage();
     if (error != GRPC_ERROR_NONE) {
       DoneReadingRecvMessage(error);
       return;
@@ -504,7 +511,7 @@ void HealthCheckClient::CallState::ContinueReadingRecvMessage() {
 }
 
 void HealthCheckClient::CallState::OnByteStreamNext(void* arg,
-                                                    grpc_error* error) {
+                                                    grpc_error_handle error) {
   HealthCheckClient::CallState* self =
       static_cast<HealthCheckClient::CallState*>(arg);
   if (error != GRPC_ERROR_NONE) {
@@ -523,8 +530,8 @@ void HealthCheckClient::CallState::OnByteStreamNext(void* arg,
   }
 }
 
-void HealthCheckClient::CallState::RecvMessageReady(void* arg,
-                                                    grpc_error* error) {
+void HealthCheckClient::CallState::RecvMessageReady(
+    void* arg, grpc_error_handle /*error*/) {
   HealthCheckClient::CallState* self =
       static_cast<HealthCheckClient::CallState*>(arg);
   GRPC_CALL_COMBINER_STOP(&self->call_combiner_, "recv_message_ready");
@@ -540,7 +547,7 @@ void HealthCheckClient::CallState::RecvMessageReady(void* arg,
 }
 
 void HealthCheckClient::CallState::RecvTrailingMetadataReady(
-    void* arg, grpc_error* error) {
+    void* arg, grpc_error_handle error) {
   HealthCheckClient::CallState* self =
       static_cast<HealthCheckClient::CallState*>(arg);
   GRPC_CALL_COMBINER_STOP(&self->call_combiner_,
@@ -579,18 +586,11 @@ void HealthCheckClient::CallState::RecvTrailingMetadataReady(
                                                 kErrorMessage);
     retry = false;
   }
-  self->CallEnded(retry);
-}
-
-void HealthCheckClient::CallState::CallEndedRetry(void* arg,
-                                                  grpc_error* error) {
-  HealthCheckClient::CallState* self =
-      static_cast<HealthCheckClient::CallState*>(arg);
-  self->CallEnded(true /* retry */);
-  self->call_->Unref(DEBUG_LOCATION, "call_end_closure");
+  MutexLock lock(&self->health_check_client_->mu_);
+  self->CallEndedLocked(retry);
 }
 
-void HealthCheckClient::CallState::CallEnded(bool retry) {
+void HealthCheckClient::CallState::CallEndedLocked(bool retry) {
   // If this CallState is still in use, this call ended because of a failure,
   // so we need to stop using it and optionally create a new one.
   // Otherwise, we have deliberately ended this call, and no further action
@@ -599,14 +599,14 @@ void HealthCheckClient::CallState::CallEnded(bool retry) {
     health_check_client_->call_state_.reset();
     if (retry) {
       GPR_ASSERT(!health_check_client_->shutting_down_);
-      if (seen_response_.Load(MemoryOrder::ACQUIRE)) {
+      if (seen_response_.load(std::memory_order_acquire)) {
         // If the call fails after we've gotten a successful response, reset
         // the backoff and restart the call immediately.
         health_check_client_->retry_backoff_.Reset();
-        health_check_client_->StartCall();
+        health_check_client_->StartCallLocked();
       } else {
         // If the call failed without receiving any messages, retry later.
-        health_check_client_->StartRetryTimer();
+        health_check_client_->StartRetryTimerLocked();
       }
     }
   }