#include <grpc/support/port_platform.h>
+#include "src/core/ext/filters/client_channel/health/health_check_client.h"
+
#include <stdint.h>
#include <stdio.h>
-#include "src/core/ext/filters/client_channel/health/health_check_client.h"
+#include "upb/upb.hpp"
#include "src/core/lib/debug/trace.h"
#include "src/core/lib/gprpp/sync.h"
//
HealthCheckClient::HealthCheckClient(
- const char* service_name,
+ std::string service_name,
RefCountedPtr<ConnectedSubchannel> connected_subchannel,
grpc_pollset_set* interested_parties,
RefCountedPtr<channelz::SubchannelNode> channelz_node,
RefCountedPtr<ConnectivityStateWatcherInterface> watcher)
- : InternallyRefCounted<HealthCheckClient>(&grpc_health_check_client_trace),
- service_name_(service_name),
+ : InternallyRefCounted<HealthCheckClient>(
+ GRPC_TRACE_FLAG_ENABLED(grpc_health_check_client_trace)
+ ? "HealthCheckClient"
+ : nullptr),
+ service_name_(std::move(service_name)),
connected_subchannel_(std::move(connected_subchannel)),
interested_parties_(interested_parties),
channelz_node_(std::move(channelz_node)),
gpr_log(GPR_INFO, "HealthCheckClient %p: setting state=%s reason=%s", this,
ConnectivityStateName(state), reason);
}
- if (watcher_ != nullptr) watcher_->Notify(state);
+ if (watcher_ != nullptr) {
+ watcher_->Notify(state,
+ state == GRPC_CHANNEL_TRANSIENT_FAILURE
+ ? absl::Status(absl::StatusCode::kUnavailable, reason)
+ : absl::Status());
+ }
}
void HealthCheckClient::Orphan() {
call_state_->StartCall();
}
-void HealthCheckClient::StartRetryTimer() {
- MutexLock lock(&mu_);
+void HealthCheckClient::StartRetryTimerLocked() {
SetHealthStatusLocked(GRPC_CHANNEL_TRANSIENT_FAILURE,
"health check call failed; will retry after backoff");
grpc_millis next_try = retry_backoff_.NextAttemptTime();
grpc_timer_init(&retry_timer_, next_try, &retry_timer_callback_);
}
-void HealthCheckClient::OnRetryTimer(void* arg, grpc_error* error) {
+void HealthCheckClient::OnRetryTimer(void* arg, grpc_error_handle error) {
HealthCheckClient* self = static_cast<HealthCheckClient*>(arg);
{
MutexLock lock(&self->mu_);
namespace {
-void EncodeRequest(const char* service_name,
+void EncodeRequest(const std::string& service_name,
ManualConstructor<SliceBufferByteStream>* send_message) {
upb::Arena arena;
grpc_health_v1_HealthCheckRequest* request_struct =
grpc_health_v1_HealthCheckRequest_new(arena.ptr());
grpc_health_v1_HealthCheckRequest_set_service(
- request_struct, upb_strview_makez(service_name));
+ request_struct,
+ upb_strview_make(service_name.data(), service_name.size()));
size_t buf_length;
char* buf = grpc_health_v1_HealthCheckRequest_serialize(
request_struct, arena.ptr(), &buf_length);
// Returns true if healthy.
// If there was an error parsing the response, sets *error and returns false.
-bool DecodeResponse(grpc_slice_buffer* slice_buffer, grpc_error** error) {
+bool DecodeResponse(grpc_slice_buffer* slice_buffer, grpc_error_handle* error) {
// If message is empty, assume unhealthy.
if (slice_buffer->length == 0) {
*error =
return false;
}
// Concatenate the slices to form a single string.
- UniquePtr<uint8_t> recv_message_deleter;
+ std::unique_ptr<uint8_t> recv_message_deleter;
uint8_t* recv_message;
if (slice_buffer->count == 1) {
recv_message = GRPC_SLICE_START_PTR(slice_buffer->slices[0]);
: health_check_client_(std::move(health_check_client)),
pollent_(grpc_polling_entity_create_from_pollset_set(interested_parties)),
arena_(Arena::Create(health_check_client_->connected_subchannel_
- ->GetInitialCallSizeEstimate(0))),
+ ->GetInitialCallSizeEstimate())),
payload_(context_) {}
HealthCheckClient::CallState::~CallState() {
  // Unset the call combiner cancellation closure.  This has the
  // effect of scheduling the previously set cancellation closure, if
  // any, so that it can release any internal references it may be
  // holding to the call stack.
  // (The old ExecCtx::Get()->Flush() call was removed along with the
  // comment justifying it; filters no longer rely on the dtor flushing.)
  call_combiner_.SetNotifyOnCancel(nullptr);
  // The per-call arena is owned by this CallState and must be destroyed
  // explicitly (Arena is not RAII-managed here).
  arena_->Destroy();
}
arena_,
context_,
&call_combiner_,
- 0, // parent_data_size
};
- grpc_error* error = GRPC_ERROR_NONE;
+ grpc_error_handle error = GRPC_ERROR_NONE;
call_ = SubchannelCall::Create(std::move(args), &error).release();
// Register after-destruction callback.
GRPC_CLOSURE_INIT(&after_call_stack_destruction_, AfterCallStackDestruction,
gpr_log(GPR_ERROR,
"HealthCheckClient %p CallState %p: error creating health "
"checking call on subchannel (%s); will retry",
- health_check_client_.get(), this, grpc_error_string(error));
+ health_check_client_.get(), this,
+ grpc_error_std_string(error).c_str());
GRPC_ERROR_UNREF(error);
- // Schedule instead of running directly, since we must not be
- // holding health_check_client_->mu_ when CallEnded() is called.
- call_->Ref(DEBUG_LOCATION, "call_end_closure").release();
- GRPC_CLOSURE_SCHED(
- GRPC_CLOSURE_INIT(&batch_.handler_private.closure, CallEndedRetry, this,
- grpc_schedule_on_exec_ctx),
- GRPC_ERROR_NONE);
+ CallEndedLocked(/*retry=*/true);
return;
}
// Initialize payload and batch.
batch_.recv_initial_metadata = true;
// Add recv_message op.
payload_.recv_message.recv_message = &recv_message_;
+ payload_.recv_message.call_failed_before_recv_message = nullptr;
// recv_message callback takes ref, handled manually.
call_->Ref(DEBUG_LOCATION, "recv_message_ready").release();
payload_.recv_message.recv_message_ready = GRPC_CLOSURE_INIT(
StartBatch(&recv_trailing_metadata_batch_);
}
-void HealthCheckClient::CallState::StartBatchInCallCombiner(void* arg,
- grpc_error* error) {
+void HealthCheckClient::CallState::StartBatchInCallCombiner(
+ void* arg, grpc_error_handle /*error*/) {
grpc_transport_stream_op_batch* batch =
static_cast<grpc_transport_stream_op_batch*>(arg);
SubchannelCall* call =
}
void HealthCheckClient::CallState::AfterCallStackDestruction(
- void* arg, grpc_error* error) {
+ void* arg, grpc_error_handle /*error*/) {
HealthCheckClient::CallState* self =
static_cast<HealthCheckClient::CallState*>(arg);
- Delete(self);
+ delete self;
}
-void HealthCheckClient::CallState::OnCancelComplete(void* arg,
- grpc_error* error) {
+void HealthCheckClient::CallState::OnCancelComplete(
+ void* arg, grpc_error_handle /*error*/) {
HealthCheckClient::CallState* self =
static_cast<HealthCheckClient::CallState*>(arg);
GRPC_CALL_COMBINER_STOP(&self->call_combiner_, "health_cancel");
self->call_->Unref(DEBUG_LOCATION, "cancel");
}
-void HealthCheckClient::CallState::StartCancel(void* arg, grpc_error* error) {
+void HealthCheckClient::CallState::StartCancel(void* arg,
+ grpc_error_handle /*error*/) {
HealthCheckClient::CallState* self =
static_cast<HealthCheckClient::CallState*>(arg);
auto* batch = grpc_make_transport_stream_op(
void HealthCheckClient::CallState::Cancel() {
bool expected = false;
- if (cancelled_.CompareExchangeStrong(&expected, true, MemoryOrder::ACQ_REL,
- MemoryOrder::ACQUIRE)) {
+ if (cancelled_.compare_exchange_strong(expected, true,
+ std::memory_order_acq_rel,
+ std::memory_order_acquire)) {
call_->Ref(DEBUG_LOCATION, "cancel").release();
GRPC_CALL_COMBINER_START(
&call_combiner_,
}
}
-void HealthCheckClient::CallState::OnComplete(void* arg, grpc_error* error) {
+void HealthCheckClient::CallState::OnComplete(void* arg,
+ grpc_error_handle /*error*/) {
HealthCheckClient::CallState* self =
static_cast<HealthCheckClient::CallState*>(arg);
GRPC_CALL_COMBINER_STOP(&self->call_combiner_, "on_complete");
self->call_->Unref(DEBUG_LOCATION, "on_complete");
}
-void HealthCheckClient::CallState::RecvInitialMetadataReady(void* arg,
- grpc_error* error) {
+void HealthCheckClient::CallState::RecvInitialMetadataReady(
+ void* arg, grpc_error_handle /*error*/) {
HealthCheckClient::CallState* self =
static_cast<HealthCheckClient::CallState*>(arg);
GRPC_CALL_COMBINER_STOP(&self->call_combiner_, "recv_initial_metadata_ready");
self->call_->Unref(DEBUG_LOCATION, "recv_initial_metadata_ready");
}
-void HealthCheckClient::CallState::DoneReadingRecvMessage(grpc_error* error) {
+void HealthCheckClient::CallState::DoneReadingRecvMessage(
+ grpc_error_handle error) {
recv_message_.reset();
if (error != GRPC_ERROR_NONE) {
GRPC_ERROR_UNREF(error);
const bool healthy = DecodeResponse(&recv_message_buffer_, &error);
const grpc_connectivity_state state =
healthy ? GRPC_CHANNEL_READY : GRPC_CHANNEL_TRANSIENT_FAILURE;
- const char* reason = error == GRPC_ERROR_NONE && !healthy
- ? "backend unhealthy"
- : grpc_error_string(error);
- health_check_client_->SetHealthStatus(state, reason);
- seen_response_.Store(true, MemoryOrder::RELEASE);
+ health_check_client_->SetHealthStatus(
+ state, error == GRPC_ERROR_NONE && !healthy
+ ? "backend unhealthy"
+ : grpc_error_std_string(error).c_str());
+ seen_response_.store(true, std::memory_order_release);
grpc_slice_buffer_destroy_internal(&recv_message_buffer_);
// Start another recv_message batch.
// This re-uses the ref we're holding.
// callbacks from the original batch have completed yet.
recv_message_batch_.payload = &payload_;
payload_.recv_message.recv_message = &recv_message_;
+ payload_.recv_message.call_failed_before_recv_message = nullptr;
payload_.recv_message.recv_message_ready = GRPC_CLOSURE_INIT(
&recv_message_ready_, RecvMessageReady, this, grpc_schedule_on_exec_ctx);
recv_message_batch_.recv_message = true;
StartBatch(&recv_message_batch_);
}
-grpc_error* HealthCheckClient::CallState::PullSliceFromRecvMessage() {
+grpc_error_handle HealthCheckClient::CallState::PullSliceFromRecvMessage() {
grpc_slice slice;
- grpc_error* error = recv_message_->Pull(&slice);
+ grpc_error_handle error = recv_message_->Pull(&slice);
if (error == GRPC_ERROR_NONE) {
grpc_slice_buffer_add(&recv_message_buffer_, slice);
}
void HealthCheckClient::CallState::ContinueReadingRecvMessage() {
while (recv_message_->Next(SIZE_MAX, &recv_message_ready_)) {
- grpc_error* error = PullSliceFromRecvMessage();
+ grpc_error_handle error = PullSliceFromRecvMessage();
if (error != GRPC_ERROR_NONE) {
DoneReadingRecvMessage(error);
return;
}
void HealthCheckClient::CallState::OnByteStreamNext(void* arg,
- grpc_error* error) {
+ grpc_error_handle error) {
HealthCheckClient::CallState* self =
static_cast<HealthCheckClient::CallState*>(arg);
if (error != GRPC_ERROR_NONE) {
}
}
-void HealthCheckClient::CallState::RecvMessageReady(void* arg,
- grpc_error* error) {
+void HealthCheckClient::CallState::RecvMessageReady(
+ void* arg, grpc_error_handle /*error*/) {
HealthCheckClient::CallState* self =
static_cast<HealthCheckClient::CallState*>(arg);
GRPC_CALL_COMBINER_STOP(&self->call_combiner_, "recv_message_ready");
}
void HealthCheckClient::CallState::RecvTrailingMetadataReady(
- void* arg, grpc_error* error) {
+ void* arg, grpc_error_handle error) {
HealthCheckClient::CallState* self =
static_cast<HealthCheckClient::CallState*>(arg);
GRPC_CALL_COMBINER_STOP(&self->call_combiner_,
kErrorMessage);
retry = false;
}
- self->CallEnded(retry);
-}
-
-void HealthCheckClient::CallState::CallEndedRetry(void* arg,
- grpc_error* error) {
- HealthCheckClient::CallState* self =
- static_cast<HealthCheckClient::CallState*>(arg);
- self->CallEnded(true /* retry */);
- self->call_->Unref(DEBUG_LOCATION, "call_end_closure");
+ MutexLock lock(&self->health_check_client_->mu_);
+ self->CallEndedLocked(retry);
}
-void HealthCheckClient::CallState::CallEnded(bool retry) {
+void HealthCheckClient::CallState::CallEndedLocked(bool retry) {
// If this CallState is still in use, this call ended because of a failure,
// so we need to stop using it and optionally create a new one.
// Otherwise, we have deliberately ended this call, and no further action
health_check_client_->call_state_.reset();
if (retry) {
GPR_ASSERT(!health_check_client_->shutting_down_);
- if (seen_response_.Load(MemoryOrder::ACQUIRE)) {
+ if (seen_response_.load(std::memory_order_acquire)) {
// If the call fails after we've gotten a successful response, reset
// the backoff and restart the call immediately.
health_check_client_->retry_backoff_.Reset();
- health_check_client_->StartCall();
+ health_check_client_->StartCallLocked();
} else {
// If the call failed without receiving any messages, retry later.
- health_check_client_->StartRetryTimer();
+ health_check_client_->StartRetryTimerLocked();
}
}
}