#include "src/core/lib/iomgr/sockaddr.h"
#include "src/core/lib/iomgr/sockaddr_utils.h"
#include "src/core/lib/iomgr/timer.h"
-#include "src/core/lib/security/credentials/credentials.h"
-#include "src/core/lib/security/credentials/fake/fake_credentials.h"
#include "src/core/lib/slice/slice_internal.h"
#include "src/core/lib/slice/slice_string_helpers.h"
#include "src/core/lib/surface/call.h"
private:
class ResourceState : public InternallyRefCounted<ResourceState> {
public:
- ResourceState(const std::string& type_url, const std::string& name)
- : type_url_(type_url), name_(name) {
+ ResourceState(const std::string& type_url, const std::string& name,
+ bool sent_initial_request)
+ : type_url_(type_url),
+ name_(name),
+ sent_initial_request_(sent_initial_request) {
GRPC_CLOSURE_INIT(&timer_callback_, OnTimer, this,
grpc_schedule_on_exec_ctx);
}
}
void Start(RefCountedPtr<AdsCallState> ads_calld) {
- if (sent_) return;
- sent_ = true;
+ if (sent_initial_request_) return;
+ sent_initial_request_ = true;
ads_calld_ = std::move(ads_calld);
Ref(DEBUG_LOCATION, "timer").release();
timer_pending_ = true;
const std::string name_;
RefCountedPtr<AdsCallState> ads_calld_;
- bool sent_ = false;
+ bool sent_initial_request_;
bool timer_pending_ = false;
grpc_timer timer_;
grpc_closure timer_callback_;
struct ResourceTypeState {
~ResourceTypeState() { GRPC_ERROR_UNREF(error); }
- // Version, nonce, and error for this resource type.
- std::string version;
+ // Nonce and error for this resource type.
std::string nonce;
grpc_error* error = GRPC_ERROR_NONE;
void ScheduleNextReportLocked();
static void OnNextReportTimer(void* arg, grpc_error* error);
bool OnNextReportTimerLocked(grpc_error* error);
- void SendReportLocked();
+ bool SendReportLocked();
static void OnReportDone(void* arg, grpc_error* error);
bool OnReportDoneLocked(grpc_error* error);
// XdsClient::ChannelState
//
+namespace {
+
+grpc_channel* CreateXdsChannel(const XdsBootstrap::XdsServer& server) {
+ // Build channel args.
+ absl::InlinedVector<grpc_arg, 2> args_to_add = {
+ grpc_channel_arg_integer_create(
+ const_cast<char*>(GRPC_ARG_KEEPALIVE_TIME_MS),
+ 5 * 60 * GPR_MS_PER_SEC),
+ grpc_channel_arg_integer_create(
+ const_cast<char*>(GRPC_ARG_CHANNELZ_IS_INTERNAL_CHANNEL), 1),
+ };
+ grpc_channel_args* new_args = grpc_channel_args_copy_and_add(
+ g_channel_args, args_to_add.data(), args_to_add.size());
+ // Create channel creds.
+ RefCountedPtr<grpc_channel_credentials> channel_creds =
+ XdsChannelCredsRegistry::MakeChannelCreds(server.channel_creds_type,
+ server.channel_creds_config);
+ // Create channel.
+ grpc_channel* channel = grpc_secure_channel_create(
+ channel_creds.get(), server.server_uri.c_str(), new_args, nullptr);
+ grpc_channel_args_destroy(new_args);
+ return channel;
+}
+
+} // namespace
+
XdsClient::ChannelState::ChannelState(WeakRefCountedPtr<XdsClient> xds_client,
- grpc_channel* channel)
- : InternallyRefCounted<ChannelState>(&grpc_xds_client_trace),
+ const XdsBootstrap::XdsServer& server)
+ : InternallyRefCounted<ChannelState>(
+ GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace) ? "ChannelState"
+ : nullptr),
xds_client_(std::move(xds_client)),
- channel_(channel) {
+ server_(server) {
+ if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
+ gpr_log(GPR_INFO, "[xds_client %p] creating channel to %s",
+ xds_client_.get(), server.server_uri.c_str());
+ }
+ channel_ = CreateXdsChannel(server);
GPR_ASSERT(channel_ != nullptr);
StartConnectivityWatchLocked();
}
XdsClient::ChannelState::AdsCallState::AdsCallState(
RefCountedPtr<RetryableCall<AdsCallState>> parent)
- : InternallyRefCounted<AdsCallState>(&grpc_xds_client_trace),
+ : InternallyRefCounted<AdsCallState>(
+ GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace) ? "AdsCallState"
+ : nullptr),
parent_(std::move(parent)) {
// Init the ADS call. Note that the call will progress every time there's
// activity in xds_client()->interested_parties_, which is comprised of
GPR_ASSERT(xds_client() != nullptr);
// Create a call with the specified method name.
const auto& method =
- xds_client()->bootstrap_->server().ShouldUseV3()
+ chand()->server_.ShouldUseV3()
? GRPC_MDSTR_SLASH_ENVOY_DOT_SERVICE_DOT_DISCOVERY_DOT_V3_DOT_AGGREGATEDDISCOVERYSERVICE_SLASH_STREAMAGGREGATEDRESOURCES
: GRPC_MDSTR_SLASH_ENVOY_DOT_SERVICE_DOT_DISCOVERY_DOT_V2_DOT_AGGREGATEDDISCOVERYSERVICE_SLASH_STREAMAGGREGATEDRESOURCES;
call_ = grpc_channel_create_pollset_set_call(
std::set<absl::string_view> resource_names =
ResourceNamesForRequest(type_url);
request_payload_slice = xds_client()->api_.CreateAdsRequest(
- type_url, resource_names, state.version, state.nonce,
+ chand()->server_, type_url, resource_names,
+ xds_client()->resource_version_map_[type_url], state.nonce,
GRPC_ERROR_REF(state.error), !sent_initial_message_);
if (type_url != XdsApi::kLdsTypeUrl && type_url != XdsApi::kRdsTypeUrl &&
type_url != XdsApi::kCdsTypeUrl && type_url != XdsApi::kEdsTypeUrl) {
gpr_log(GPR_INFO,
"[xds_client %p] sending ADS request: type=%s version=%s nonce=%s "
"error=%s resources=%s",
- xds_client(), type_url.c_str(), state.version.c_str(),
+ xds_client(), type_url.c_str(),
+ xds_client()->resource_version_map_[type_url].c_str(),
state.nonce.c_str(), grpc_error_string(state.error),
absl::StrJoin(resource_names, " ").c_str());
}
const std::string& type_url, const std::string& name) {
auto& state = state_map_[type_url].subscribed_resources[name];
if (state == nullptr) {
- state = MakeOrphanable<ResourceState>(type_url, name);
+ state = MakeOrphanable<ResourceState>(
+ type_url, name, !xds_client()->resource_version_map_[type_url].empty());
SendMessageLocked(type_url);
}
}
} else if (result.type_url == XdsApi::kEdsTypeUrl) {
AcceptEdsUpdate(std::move(result.eds_update_map));
}
- state.version = std::move(result.version);
+ xds_client()->resource_version_map_[result.type_url] =
+ std::move(result.version);
// ACK the update.
SendMessageLocked(result.type_url);
// Start load reporting if needed.
GRPC_ERROR_UNREF(error);
return true;
}
- SendReportLocked();
- return false;
+ return SendReportLocked();
}
namespace {
} // namespace
-void XdsClient::ChannelState::LrsCallState::Reporter::SendReportLocked() {
+bool XdsClient::ChannelState::LrsCallState::Reporter::SendReportLocked() {
// Construct snapshot from all reported stats.
XdsApi::ClusterLoadReportMap snapshot =
xds_client()->BuildLoadReportSnapshotLocked(parent_->send_all_clusters_,
const bool old_val = last_report_counters_were_zero_;
last_report_counters_were_zero_ = LoadReportCountersAreZero(snapshot);
if (old_val && last_report_counters_were_zero_) {
+ if (xds_client()->load_report_map_.empty()) {
+ parent_->chand()->StopLrsCall();
+ return true;
+ }
ScheduleNextReportLocked();
- return;
+ return false;
}
// Create a request that contains the snapshot.
grpc_slice request_payload_slice =
xds_client(), this, call_error);
GPR_ASSERT(GRPC_CALL_OK == call_error);
}
+ return false;
}
void XdsClient::ChannelState::LrsCallState::Reporter::OnReportDone(
XdsClient::ChannelState::LrsCallState::LrsCallState(
RefCountedPtr<RetryableCall<LrsCallState>> parent)
- : InternallyRefCounted<LrsCallState>(&grpc_xds_client_trace),
+ : InternallyRefCounted<LrsCallState>(
+ GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace) ? "LrsCallState"
+ : nullptr),
parent_(std::move(parent)) {
// Init the LRS call. Note that the call will progress every time there's
// activity in xds_client()->interested_parties_, which is comprised of
// the polling entities from client_channel.
GPR_ASSERT(xds_client() != nullptr);
const auto& method =
- xds_client()->bootstrap_->server().ShouldUseV3()
+ chand()->server_.ShouldUseV3()
? GRPC_MDSTR_SLASH_ENVOY_DOT_SERVICE_DOT_LOAD_STATS_DOT_V3_DOT_LOADREPORTINGSERVICE_SLASH_STREAMLOADSTATS
: GRPC_MDSTR_SLASH_ENVOY_DOT_SERVICE_DOT_LOAD_STATS_DOT_V2_DOT_LOADREPORTINGSERVICE_SLASH_STREAMLOADSTATS;
call_ = grpc_channel_create_pollset_set_call(
GPR_ASSERT(call_ != nullptr);
// Init the request payload.
grpc_slice request_payload_slice =
- xds_client()->api_.CreateLrsInitialRequest();
+ xds_client()->api_.CreateLrsInitialRequest(chand()->server_);
send_message_payload_ =
grpc_raw_byte_buffer_create(&request_payload_slice, 1);
grpc_slice_unref_internal(request_payload_slice);
{15000, 0, INT_MAX});
}
-grpc_channel* CreateXdsChannel(const XdsBootstrap& bootstrap,
- grpc_error** error) {
- // Build channel args.
- absl::InlinedVector<grpc_arg, 2> args_to_add = {
- grpc_channel_arg_integer_create(
- const_cast<char*>(GRPC_ARG_KEEPALIVE_TIME_MS),
- 5 * 60 * GPR_MS_PER_SEC),
- grpc_channel_arg_integer_create(
- const_cast<char*>(GRPC_ARG_CHANNELZ_IS_INTERNAL_CHANNEL), 1),
- };
- grpc_channel_args* new_args = grpc_channel_args_copy_and_add(
- g_channel_args, args_to_add.data(), args_to_add.size());
- // Find credentials and create channel.
- RefCountedPtr<grpc_channel_credentials> creds;
- for (const auto& channel_creds : bootstrap.server().channel_creds) {
- if (channel_creds.type == "google_default") {
- creds.reset(grpc_google_default_credentials_create(nullptr));
- break;
- }
- if (channel_creds.type == "insecure") {
- grpc_channel* channel = grpc_insecure_channel_create(
- bootstrap.server().server_uri.c_str(), new_args, nullptr);
- grpc_channel_args_destroy(new_args);
- return channel;
- }
- if (channel_creds.type == "fake") {
- creds.reset(grpc_fake_transport_security_credentials_create());
- break;
- }
- }
- if (creds == nullptr) {
- *error = GRPC_ERROR_CREATE_FROM_STATIC_STRING(
- "no supported credential types found");
- return nullptr;
- }
- grpc_channel* channel = grpc_secure_channel_create(
- creds.get(), bootstrap.server().server_uri.c_str(), new_args, nullptr);
- grpc_channel_args_destroy(new_args);
- return channel;
-}
-
} // namespace
XdsClient::XdsClient(grpc_error** error)
- : DualRefCounted<XdsClient>(&grpc_xds_client_trace),
+ : DualRefCounted<XdsClient>(GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)
+ ? "XdsClient"
+ : nullptr),
request_timeout_(GetRequestTimeout()),
interested_parties_(grpc_pollset_set_create()),
bootstrap_(
XdsBootstrap::ReadFromFile(this, &grpc_xds_client_trace, error)),
- api_(this, &grpc_xds_client_trace, bootstrap_.get()) {
+ api_(this, &grpc_xds_client_trace,
+ bootstrap_ == nullptr ? nullptr : bootstrap_->node()) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
gpr_log(GPR_INFO, "[xds_client %p] creating xds client", this);
}
this, grpc_error_string(*error));
return;
}
- if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
- gpr_log(GPR_INFO, "[xds_client %p] creating channel to %s", this,
- bootstrap_->server().server_uri.c_str());
- }
- grpc_channel* channel = CreateXdsChannel(*bootstrap_, error);
- if (*error != GRPC_ERROR_NONE) {
- gpr_log(GPR_ERROR, "[xds_client %p] failed to create xds channel: %s", this,
- grpc_error_string(*error));
- return;
- }
// Create ChannelState object.
chand_ = MakeOrphanable<ChannelState>(
- WeakRef(DEBUG_LOCATION, "XdsClient+ChannelState"), channel);
+ WeakRef(DEBUG_LOCATION, "XdsClient+ChannelState"), bootstrap_->server());
}
XdsClient::~XdsClient() {
auto it = load_report_map_
.emplace(std::make_pair(std::move(key), LoadReportState()))
.first;
- auto cluster_drop_stats = MakeRefCounted<XdsClusterDropStats>(
- Ref(DEBUG_LOCATION, "DropStats"), lrs_server,
- it->first.first /*cluster_name*/, it->first.second /*eds_service_name*/);
- it->second.drop_stats.insert(cluster_drop_stats.get());
+ LoadReportState& load_report_state = it->second;
+ RefCountedPtr<XdsClusterDropStats> cluster_drop_stats;
+ if (load_report_state.drop_stats != nullptr) {
+ cluster_drop_stats = load_report_state.drop_stats->RefIfNonZero();
+ }
+ if (cluster_drop_stats == nullptr) {
+ if (load_report_state.drop_stats != nullptr) {
+ load_report_state.deleted_drop_stats +=
+ load_report_state.drop_stats->GetSnapshotAndReset();
+ }
+ cluster_drop_stats = MakeRefCounted<XdsClusterDropStats>(
+ Ref(DEBUG_LOCATION, "DropStats"), lrs_server,
+ it->first.first /*cluster_name*/,
+ it->first.second /*eds_service_name*/);
+ load_report_state.drop_stats = cluster_drop_stats.get();
+ }
chand_->MaybeStartLrsCall();
return cluster_drop_stats;
}
absl::string_view eds_service_name,
XdsClusterDropStats* cluster_drop_stats) {
MutexLock lock(&mu_);
- auto load_report_it = load_report_map_.find(
- std::make_pair(std::string(cluster_name), std::string(eds_service_name)));
- if (load_report_it == load_report_map_.end()) return;
- LoadReportState& load_report_state = load_report_it->second;
// TODO(roth): When we add support for direct federation, use the
// server name specified in lrs_server.
- auto it = load_report_state.drop_stats.find(cluster_drop_stats);
- if (it != load_report_state.drop_stats.end()) {
- // Record final drop stats in deleted_drop_stats, which will be
+ auto it = load_report_map_.find(
+ std::make_pair(std::string(cluster_name), std::string(eds_service_name)));
+ if (it == load_report_map_.end()) return;
+ LoadReportState& load_report_state = it->second;
+ if (load_report_state.drop_stats == cluster_drop_stats) {
+ // Record final snapshot in deleted_drop_stats, which will be
// added to the next load report.
- auto dropped_requests = cluster_drop_stats->GetSnapshotAndReset();
- load_report_state.deleted_drop_stats += dropped_requests;
- load_report_state.drop_stats.erase(it);
+ load_report_state.deleted_drop_stats +=
+ load_report_state.drop_stats->GetSnapshotAndReset();
+ load_report_state.drop_stats = nullptr;
}
}
auto it = load_report_map_
.emplace(std::make_pair(std::move(key), LoadReportState()))
.first;
- auto cluster_locality_stats = MakeRefCounted<XdsClusterLocalityStats>(
- Ref(DEBUG_LOCATION, "LocalityStats"), lrs_server,
- it->first.first /*cluster_name*/, it->first.second /*eds_service_name*/,
- locality);
- it->second.locality_stats[std::move(locality)].locality_stats.insert(
- cluster_locality_stats.get());
+ LoadReportState& load_report_state = it->second;
+ LoadReportState::LocalityState& locality_state =
+ load_report_state.locality_stats[locality];
+ RefCountedPtr<XdsClusterLocalityStats> cluster_locality_stats;
+ if (locality_state.locality_stats != nullptr) {
+ cluster_locality_stats = locality_state.locality_stats->RefIfNonZero();
+ }
+ if (cluster_locality_stats == nullptr) {
+ if (locality_state.locality_stats != nullptr) {
+ locality_state.deleted_locality_stats +=
+ locality_state.locality_stats->GetSnapshotAndReset();
+ }
+ cluster_locality_stats = MakeRefCounted<XdsClusterLocalityStats>(
+ Ref(DEBUG_LOCATION, "LocalityStats"), lrs_server,
+ it->first.first /*cluster_name*/, it->first.second /*eds_service_name*/,
+ std::move(locality));
+ locality_state.locality_stats = cluster_locality_stats.get();
+ }
chand_->MaybeStartLrsCall();
return cluster_locality_stats;
}
const RefCountedPtr<XdsLocalityName>& locality,
XdsClusterLocalityStats* cluster_locality_stats) {
MutexLock lock(&mu_);
- auto load_report_it = load_report_map_.find(
- std::make_pair(std::string(cluster_name), std::string(eds_service_name)));
- if (load_report_it == load_report_map_.end()) return;
- LoadReportState& load_report_state = load_report_it->second;
// TODO(roth): When we add support for direct federation, use the
// server name specified in lrs_server.
+ auto it = load_report_map_.find(
+ std::make_pair(std::string(cluster_name), std::string(eds_service_name)));
+ if (it == load_report_map_.end()) return;
+ LoadReportState& load_report_state = it->second;
auto locality_it = load_report_state.locality_stats.find(locality);
if (locality_it == load_report_state.locality_stats.end()) return;
- auto& locality_set = locality_it->second.locality_stats;
- auto it = locality_set.find(cluster_locality_stats);
- if (it != locality_set.end()) {
+ LoadReportState::LocalityState& locality_state = locality_it->second;
+ if (locality_state.locality_stats == cluster_locality_stats) {
// Record final snapshot in deleted_locality_stats, which will be
// added to the next load report.
- locality_it->second.deleted_locality_stats.emplace_back(
- cluster_locality_stats->GetSnapshotAndReset());
- locality_set.erase(it);
+ locality_state.deleted_locality_stats +=
+ locality_state.locality_stats->GetSnapshotAndReset();
+ locality_state.locality_stats = nullptr;
}
}
XdsApi::ClusterLoadReportMap XdsClient::BuildLoadReportSnapshotLocked(
bool send_all_clusters, const std::set<std::string>& clusters) {
+ if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
+ gpr_log(GPR_INFO, "[xds_client %p] start building load report", this);
+ }
XdsApi::ClusterLoadReportMap snapshot_map;
for (auto load_report_it = load_report_map_.begin();
load_report_it != load_report_map_.end();) {
XdsApi::ClusterLoadReport snapshot;
// Aggregate drop stats.
snapshot.dropped_requests = std::move(load_report.deleted_drop_stats);
- for (auto& drop_stats : load_report.drop_stats) {
- auto dropped_requests = drop_stats->GetSnapshotAndReset();
- snapshot.dropped_requests += dropped_requests;
+ if (load_report.drop_stats != nullptr) {
+ snapshot.dropped_requests +=
+ load_report.drop_stats->GetSnapshotAndReset();
+ if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
+ gpr_log(GPR_INFO,
+ "[xds_client %p] cluster=%s eds_service_name=%s drop_stats=%p",
+ this, cluster_key.first.c_str(), cluster_key.second.c_str(),
+ load_report.drop_stats);
+ }
}
// Aggregate locality stats.
for (auto it = load_report.locality_stats.begin();
auto& locality_state = it->second;
XdsClusterLocalityStats::Snapshot& locality_snapshot =
snapshot.locality_stats[locality_name];
- for (auto& locality_stats : locality_state.locality_stats) {
- locality_snapshot += locality_stats->GetSnapshotAndReset();
- }
- // Add final snapshots from recently deleted locality stats objects.
- for (auto& deleted_locality_stats :
- locality_state.deleted_locality_stats) {
- locality_snapshot += deleted_locality_stats;
+ locality_snapshot = std::move(locality_state.deleted_locality_stats);
+ if (locality_state.locality_stats != nullptr) {
+ locality_snapshot +=
+ locality_state.locality_stats->GetSnapshotAndReset();
+ if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
+ gpr_log(GPR_INFO,
+ "[xds_client %p] cluster=%s eds_service_name=%s "
+ "locality=%s locality_stats=%p",
+ this, cluster_key.first.c_str(), cluster_key.second.c_str(),
+ locality_name->AsHumanReadableString().c_str(),
+ locality_state.locality_stats);
+ }
}
- locality_state.deleted_locality_stats.clear();
// If the only thing left in this entry was final snapshots from
// deleted locality stats objects, remove the entry.
- if (locality_state.locality_stats.empty()) {
+ if (locality_state.locality_stats == nullptr) {
it = load_report.locality_stats.erase(it);
} else {
++it;
}
}
+ // Compute load report interval.
+ const grpc_millis now = ExecCtx::Get()->Now();
+ snapshot.load_report_interval = now - load_report.last_report_time;
+ load_report.last_report_time = now;
+ // Record snapshot.
if (record_stats) {
- // Compute load report interval.
- const grpc_millis now = ExecCtx::Get()->Now();
- snapshot.load_report_interval = now - load_report.last_report_time;
- load_report.last_report_time = now;
- // Record snapshot.
snapshot_map[cluster_key] = std::move(snapshot);
}
// If the only thing left in this entry was final snapshots from
// deleted stats objects, remove the entry.
- if (load_report.locality_stats.empty() && load_report.drop_stats.empty()) {
+ if (load_report.locality_stats.empty() &&
+ load_report.drop_stats == nullptr) {
load_report_it = load_report_map_.erase(load_report_it);
} else {
++load_report_it;