RefCountedPtr<ResolvingLoadBalancingPolicy> parent)
: parent_(std::move(parent)) {}
- ~ResolverResultHandler() {
+ ~ResolverResultHandler() override {
if (GRPC_TRACE_FLAG_ENABLED(*(parent_->tracer_))) {
gpr_log(GPR_INFO, "resolving_lb=%p: resolver shutdown complete",
parent_.get());
: parent_(std::move(parent)) {}
RefCountedPtr<SubchannelInterface> CreateSubchannel(
- const grpc_channel_args& args) override {
+ ServerAddress address, const grpc_channel_args& args) override {
if (parent_->resolver_ == nullptr) return nullptr; // Shutting down.
- return parent_->channel_control_helper()->CreateSubchannel(args);
+ return parent_->channel_control_helper()->CreateSubchannel(
+ std::move(address), args);
}
- void UpdateState(grpc_connectivity_state state,
+ void UpdateState(grpc_connectivity_state state, const absl::Status& status,
std::unique_ptr<SubchannelPicker> picker) override {
if (parent_->resolver_ == nullptr) return; // Shutting down.
- parent_->channel_control_helper()->UpdateState(state, std::move(picker));
+ parent_->channel_control_helper()->UpdateState(state, status,
+ std::move(picker));
}
void RequestReresolution() override {
ResolvingLoadBalancingPolicy::ResolvingLoadBalancingPolicy(
Args args, TraceFlag* tracer, grpc_core::UniquePtr<char> target_uri,
- ProcessResolverResultCallback process_resolver_result,
- void* process_resolver_result_user_data)
+ ChannelConfigHelper* helper)
: LoadBalancingPolicy(std::move(args)),
tracer_(tracer),
target_uri_(std::move(target_uri)),
- process_resolver_result_(process_resolver_result),
- process_resolver_result_user_data_(process_resolver_result_user_data) {
- GPR_ASSERT(process_resolver_result != nullptr);
+ helper_(helper) {
+ GPR_ASSERT(helper_ != nullptr);
resolver_ = ResolverRegistry::CreateResolver(
target_uri_.get(), args.args, interested_parties(), work_serializer(),
absl::make_unique<ResolverResultHandler>(Ref()));
if (GRPC_TRACE_FLAG_ENABLED(*tracer_)) {
gpr_log(GPR_INFO, "resolving_lb=%p: starting name resolution", this);
}
- channel_control_helper()->UpdateState(GRPC_CHANNEL_CONNECTING,
+ channel_control_helper()->UpdateState(GRPC_CHANNEL_CONNECTING, absl::Status(),
absl::make_unique<QueuePicker>(Ref()));
resolver_->StartLocked();
}
void ResolvingLoadBalancingPolicy::ShutdownLocked() {
if (resolver_ != nullptr) {
+ if (GRPC_TRACE_FLAG_ENABLED(*tracer_)) {
+ gpr_log(GPR_INFO, "resolving_lb=%p: shutting down resolver=%p", this,
+ resolver_.get());
+ }
resolver_.reset();
if (lb_policy_ != nullptr) {
if (GRPC_TRACE_FLAG_ENABLED(*tracer_)) {
if (lb_policy_ == nullptr) {
grpc_error* state_error = GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
"Resolver transient failure", &error, 1);
+ helper_->ResolverTransientFailure(GRPC_ERROR_REF(state_error));
channel_control_helper()->UpdateState(
- GRPC_CHANNEL_TRANSIENT_FAILURE,
+ GRPC_CHANNEL_TRANSIENT_FAILURE, grpc_error_to_absl_status(state_error),
absl::make_unique<TransientFailurePicker>(state_error));
}
GRPC_ERROR_UNREF(error);
UpdateArgs update_args;
update_args.addresses = std::move(result.addresses);
update_args.config = std::move(lb_policy_config);
- // TODO(roth): Once channel args is converted to C++, use std::move() here.
- update_args.args = result.args;
- result.args = nullptr;
+ // Remove the config selector from channel args so that we're not holding
+ // unnecessary refs that cause it to be destroyed somewhere other than in the
+ // WorkSerializer.
+ const char* arg_name = GRPC_ARG_CONFIG_SELECTOR;
+ update_args.args =
+ grpc_channel_args_copy_and_remove(result.args, &arg_name, 1);
// Create policy if needed.
if (lb_policy_ == nullptr) {
lb_policy_ = CreateLbPolicyLocked(*update_args.args);
//
// We track a list of strings to eventually be concatenated and traced.
TraceStringVector trace_strings;
- const bool resolution_contains_addresses = result.addresses.size() > 0;
- // Process the resolver result.
- RefCountedPtr<LoadBalancingPolicy::Config> lb_policy_config;
- bool service_config_changed = false;
- std::string service_config_error_string;
- if (process_resolver_result_ != nullptr) {
- grpc_error* service_config_error = GRPC_ERROR_NONE;
- bool no_valid_service_config = false;
- service_config_changed = process_resolver_result_(
- process_resolver_result_user_data_, result, &lb_policy_config,
- &service_config_error, &no_valid_service_config);
- if (service_config_error != GRPC_ERROR_NONE) {
- service_config_error_string = grpc_error_string(service_config_error);
- if (no_valid_service_config) {
- // We received an invalid service config and we don't have a
- // fallback service config.
- OnResolverError(service_config_error);
- } else {
- GRPC_ERROR_UNREF(service_config_error);
- }
- }
+ MaybeAddTraceMessagesForAddressChangesLocked(!result.addresses.empty(),
+ &trace_strings);
+ // The result of grpc_error_string() is owned by the error itself.
+ // We're storing that string in trace_strings, so we need to make sure
+ // that the error lives until we're done with the string.
+ grpc_error* service_config_error =
+ GRPC_ERROR_REF(result.service_config_error);
+ if (service_config_error != GRPC_ERROR_NONE) {
+ trace_strings.push_back(grpc_error_string(service_config_error));
+ }
+ // Choose the service config.
+ ChannelConfigHelper::ChooseServiceConfigResult service_config_result;
+ if (helper_ != nullptr) {
+ service_config_result = helper_->ChooseServiceConfig(result);
} else {
- lb_policy_config = child_lb_config_;
+ service_config_result.lb_policy_config = child_lb_config_;
}
- if (lb_policy_config != nullptr) {
+ if (service_config_result.no_valid_service_config) {
+ // We received an invalid service config and we don't have a
+ // previous service config to fall back to.
+ OnResolverError(GRPC_ERROR_REF(service_config_error));
+ trace_strings.push_back("no valid service config");
+ } else {
// Create or update LB policy, as needed.
- CreateOrUpdateLbPolicyLocked(std::move(lb_policy_config),
- std::move(result));
+ CreateOrUpdateLbPolicyLocked(
+ std::move(service_config_result.lb_policy_config), std::move(result));
+ if (service_config_result.service_config_changed) {
+ // Tell channel to start using new service config for calls.
+ // This needs to happen after the LB policy has been updated, since
+ // the ConfigSelector may need the LB policy to know about new
+ // destinations before it can send RPCs to those destinations.
+ if (helper_ != nullptr) helper_->StartUsingServiceConfigForCalls();
+ // TODO(ncteisen): might be worth somehow including a snippet of the
+ // config in the trace, at the risk of bloating the trace logs.
+ trace_strings.push_back("Service config changed");
+ }
}
// Add channel trace event.
- if (service_config_changed) {
- // TODO(ncteisen): might be worth somehow including a snippet of the
- // config in the trace, at the risk of bloating the trace logs.
- trace_strings.push_back("Service config changed");
- }
- if (!service_config_error_string.empty()) {
- trace_strings.push_back(service_config_error_string.c_str());
- }
- MaybeAddTraceMessagesForAddressChangesLocked(resolution_contains_addresses,
- &trace_strings);
ConcatenateAndAddChannelTraceLocked(trace_strings);
+ GRPC_ERROR_UNREF(service_config_error);
}
} // namespace grpc_core