private:
class ResourceState : public InternallyRefCounted<ResourceState> {
public:
- ResourceState(const std::string& type_url, const std::string& name,
- bool sent_initial_request)
- : type_url_(type_url),
- name_(name),
- sent_initial_request_(sent_initial_request) {
+ ResourceState(const std::string& type_url, const std::string& name)
+ : type_url_(type_url), name_(name) {
GRPC_CLOSURE_INIT(&timer_callback_, OnTimer, this,
grpc_schedule_on_exec_ctx);
}
}
void Start(RefCountedPtr<AdsCallState> ads_calld) {
- if (sent_initial_request_) return;
- sent_initial_request_ = true;
+ if (sent_) return;
+ sent_ = true;
ads_calld_ = std::move(ads_calld);
Ref(DEBUG_LOCATION, "timer").release();
timer_pending_ = true;
const std::string name_;
RefCountedPtr<AdsCallState> ads_calld_;
- bool sent_initial_request_;
+ bool sent_ = false;
bool timer_pending_ = false;
grpc_timer timer_;
grpc_closure timer_callback_;
struct ResourceTypeState {
~ResourceTypeState() { GRPC_ERROR_UNREF(error); }
- // Nonce and error for this resource type.
+ // Version, nonce, and error for this resource type.
+ std::string version;
std::string nonce;
grpc_error* error = GRPC_ERROR_NONE;
void ScheduleNextReportLocked();
static void OnNextReportTimer(void* arg, grpc_error* error);
bool OnNextReportTimerLocked(grpc_error* error);
- bool SendReportLocked();
+ void SendReportLocked();
static void OnReportDone(void* arg, grpc_error* error);
bool OnReportDoneLocked(grpc_error* error);
XdsClient::ChannelState::ChannelState(WeakRefCountedPtr<XdsClient> xds_client,
grpc_channel* channel)
- : InternallyRefCounted<ChannelState>(
- GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace) ? "ChannelState"
- : nullptr),
+ : InternallyRefCounted<ChannelState>(&grpc_xds_client_trace),
xds_client_(std::move(xds_client)),
channel_(channel) {
GPR_ASSERT(channel_ != nullptr);
XdsClient::ChannelState::AdsCallState::AdsCallState(
RefCountedPtr<RetryableCall<AdsCallState>> parent)
- : InternallyRefCounted<AdsCallState>(
- GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace) ? "AdsCallState"
- : nullptr),
+ : InternallyRefCounted<AdsCallState>(&grpc_xds_client_trace),
parent_(std::move(parent)) {
// Init the ADS call. Note that the call will progress every time there's
// activity in xds_client()->interested_parties_, which is comprised of
std::set<absl::string_view> resource_names =
ResourceNamesForRequest(type_url);
request_payload_slice = xds_client()->api_.CreateAdsRequest(
- type_url, resource_names, xds_client()->resource_version_map_[type_url],
- state.nonce, GRPC_ERROR_REF(state.error), !sent_initial_message_);
+ type_url, resource_names, state.version, state.nonce,
+ GRPC_ERROR_REF(state.error), !sent_initial_message_);
if (type_url != XdsApi::kLdsTypeUrl && type_url != XdsApi::kRdsTypeUrl &&
type_url != XdsApi::kCdsTypeUrl && type_url != XdsApi::kEdsTypeUrl) {
state_map_.erase(type_url);
gpr_log(GPR_INFO,
"[xds_client %p] sending ADS request: type=%s version=%s nonce=%s "
"error=%s resources=%s",
- xds_client(), type_url.c_str(),
- xds_client()->resource_version_map_[type_url].c_str(),
+ xds_client(), type_url.c_str(), state.version.c_str(),
state.nonce.c_str(), grpc_error_string(state.error),
absl::StrJoin(resource_names, " ").c_str());
}
const std::string& type_url, const std::string& name) {
auto& state = state_map_[type_url].subscribed_resources[name];
if (state == nullptr) {
- state = MakeOrphanable<ResourceState>(
- type_url, name, !xds_client()->resource_version_map_[type_url].empty());
+ state = MakeOrphanable<ResourceState>(type_url, name);
SendMessageLocked(type_url);
}
}
} else if (result.type_url == XdsApi::kEdsTypeUrl) {
AcceptEdsUpdate(std::move(result.eds_update_map));
}
- xds_client()->resource_version_map_[result.type_url] =
- std::move(result.version);
+ state.version = std::move(result.version);
// ACK the update.
SendMessageLocked(result.type_url);
// Start load reporting if needed.
GRPC_ERROR_UNREF(error);
return true;
}
- return SendReportLocked();
+ SendReportLocked();
+ return false;
}
namespace {
} // namespace
-bool XdsClient::ChannelState::LrsCallState::Reporter::SendReportLocked() {
+void XdsClient::ChannelState::LrsCallState::Reporter::SendReportLocked() {
// Construct snapshot from all reported stats.
XdsApi::ClusterLoadReportMap snapshot =
xds_client()->BuildLoadReportSnapshotLocked(parent_->send_all_clusters_,
const bool old_val = last_report_counters_were_zero_;
last_report_counters_were_zero_ = LoadReportCountersAreZero(snapshot);
if (old_val && last_report_counters_were_zero_) {
- if (xds_client()->load_report_map_.empty()) {
- parent_->chand()->StopLrsCall();
- return true;
- }
ScheduleNextReportLocked();
- return false;
+ return;
}
// Create a request that contains the snapshot.
grpc_slice request_payload_slice =
xds_client(), this, call_error);
GPR_ASSERT(GRPC_CALL_OK == call_error);
}
- return false;
}
void XdsClient::ChannelState::LrsCallState::Reporter::OnReportDone(
XdsClient::ChannelState::LrsCallState::LrsCallState(
RefCountedPtr<RetryableCall<LrsCallState>> parent)
- : InternallyRefCounted<LrsCallState>(
- GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace) ? "LrsCallState"
- : nullptr),
+ : InternallyRefCounted<LrsCallState>(&grpc_xds_client_trace),
parent_(std::move(parent)) {
// Init the LRS call. Note that the call will progress every time there's
// activity in xds_client()->interested_parties_, which is comprised of
} // namespace
XdsClient::XdsClient(grpc_error** error)
- : DualRefCounted<XdsClient>(GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)
- ? "XdsClient"
- : nullptr),
+ : DualRefCounted<XdsClient>(&grpc_xds_client_trace),
request_timeout_(GetRequestTimeout()),
interested_parties_(grpc_pollset_set_create()),
bootstrap_(
auto it = load_report_map_
.emplace(std::make_pair(std::move(key), LoadReportState()))
.first;
- LoadReportState& load_report_state = it->second;
- RefCountedPtr<XdsClusterDropStats> cluster_drop_stats;
- if (load_report_state.drop_stats != nullptr) {
- cluster_drop_stats = load_report_state.drop_stats->RefIfNonZero();
- }
- if (cluster_drop_stats == nullptr) {
- if (load_report_state.drop_stats != nullptr) {
- load_report_state.deleted_drop_stats +=
- load_report_state.drop_stats->GetSnapshotAndReset();
- }
- cluster_drop_stats = MakeRefCounted<XdsClusterDropStats>(
- Ref(DEBUG_LOCATION, "DropStats"), lrs_server,
- it->first.first /*cluster_name*/,
- it->first.second /*eds_service_name*/);
- load_report_state.drop_stats = cluster_drop_stats.get();
- }
+ auto cluster_drop_stats = MakeRefCounted<XdsClusterDropStats>(
+ Ref(DEBUG_LOCATION, "DropStats"), lrs_server,
+ it->first.first /*cluster_name*/, it->first.second /*eds_service_name*/);
+ it->second.drop_stats.insert(cluster_drop_stats.get());
chand_->MaybeStartLrsCall();
return cluster_drop_stats;
}
absl::string_view eds_service_name,
XdsClusterDropStats* cluster_drop_stats) {
MutexLock lock(&mu_);
+ auto load_report_it = load_report_map_.find(
+ std::make_pair(std::string(cluster_name), std::string(eds_service_name)));
+ if (load_report_it == load_report_map_.end()) return;
+ LoadReportState& load_report_state = load_report_it->second;
// TODO(roth): When we add support for direct federation, use the
// server name specified in lrs_server.
- auto it = load_report_map_.find(
- std::make_pair(std::string(cluster_name), std::string(eds_service_name)));
- if (it == load_report_map_.end()) return;
- LoadReportState& load_report_state = it->second;
- if (load_report_state.drop_stats == cluster_drop_stats) {
- // Record final snapshot in deleted_drop_stats, which will be
+ auto it = load_report_state.drop_stats.find(cluster_drop_stats);
+ if (it != load_report_state.drop_stats.end()) {
+ // Record final drop stats in deleted_drop_stats, which will be
// added to the next load report.
- load_report_state.deleted_drop_stats +=
- load_report_state.drop_stats->GetSnapshotAndReset();
- load_report_state.drop_stats = nullptr;
+ auto dropped_requests = cluster_drop_stats->GetSnapshotAndReset();
+ load_report_state.deleted_drop_stats += dropped_requests;
+ load_report_state.drop_stats.erase(it);
}
}
auto it = load_report_map_
.emplace(std::make_pair(std::move(key), LoadReportState()))
.first;
- LoadReportState& load_report_state = it->second;
- LoadReportState::LocalityState& locality_state =
- load_report_state.locality_stats[locality];
- RefCountedPtr<XdsClusterLocalityStats> cluster_locality_stats;
- if (locality_state.locality_stats != nullptr) {
- cluster_locality_stats = locality_state.locality_stats->RefIfNonZero();
- }
- if (cluster_locality_stats == nullptr) {
- if (locality_state.locality_stats != nullptr) {
- locality_state.deleted_locality_stats +=
- locality_state.locality_stats->GetSnapshotAndReset();
- }
- cluster_locality_stats = MakeRefCounted<XdsClusterLocalityStats>(
- Ref(DEBUG_LOCATION, "LocalityStats"), lrs_server,
- it->first.first /*cluster_name*/, it->first.second /*eds_service_name*/,
- std::move(locality));
- locality_state.locality_stats = cluster_locality_stats.get();
- }
+ auto cluster_locality_stats = MakeRefCounted<XdsClusterLocalityStats>(
+ Ref(DEBUG_LOCATION, "LocalityStats"), lrs_server,
+ it->first.first /*cluster_name*/, it->first.second /*eds_service_name*/,
+ locality);
+ it->second.locality_stats[std::move(locality)].locality_stats.insert(
+ cluster_locality_stats.get());
chand_->MaybeStartLrsCall();
return cluster_locality_stats;
}
const RefCountedPtr<XdsLocalityName>& locality,
XdsClusterLocalityStats* cluster_locality_stats) {
MutexLock lock(&mu_);
+ auto load_report_it = load_report_map_.find(
+ std::make_pair(std::string(cluster_name), std::string(eds_service_name)));
+ if (load_report_it == load_report_map_.end()) return;
+ LoadReportState& load_report_state = load_report_it->second;
// TODO(roth): When we add support for direct federation, use the
// server name specified in lrs_server.
- auto it = load_report_map_.find(
- std::make_pair(std::string(cluster_name), std::string(eds_service_name)));
- if (it == load_report_map_.end()) return;
- LoadReportState& load_report_state = it->second;
auto locality_it = load_report_state.locality_stats.find(locality);
if (locality_it == load_report_state.locality_stats.end()) return;
- LoadReportState::LocalityState& locality_state = locality_it->second;
- if (locality_state.locality_stats == cluster_locality_stats) {
+ auto& locality_set = locality_it->second.locality_stats;
+ auto it = locality_set.find(cluster_locality_stats);
+ if (it != locality_set.end()) {
// Record final snapshot in deleted_locality_stats, which will be
// added to the next load report.
- locality_state.deleted_locality_stats +=
- locality_state.locality_stats->GetSnapshotAndReset();
- locality_state.locality_stats = nullptr;
+ locality_it->second.deleted_locality_stats.emplace_back(
+ cluster_locality_stats->GetSnapshotAndReset());
+ locality_set.erase(it);
}
}
XdsApi::ClusterLoadReportMap XdsClient::BuildLoadReportSnapshotLocked(
bool send_all_clusters, const std::set<std::string>& clusters) {
- if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
- gpr_log(GPR_INFO, "[xds_client %p] start building load report", this);
- }
XdsApi::ClusterLoadReportMap snapshot_map;
for (auto load_report_it = load_report_map_.begin();
load_report_it != load_report_map_.end();) {
XdsApi::ClusterLoadReport snapshot;
// Aggregate drop stats.
snapshot.dropped_requests = std::move(load_report.deleted_drop_stats);
- if (load_report.drop_stats != nullptr) {
- snapshot.dropped_requests +=
- load_report.drop_stats->GetSnapshotAndReset();
- if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
- gpr_log(GPR_INFO,
- "[xds_client %p] cluster=%s eds_service_name=%s drop_stats=%p",
- this, cluster_key.first.c_str(), cluster_key.second.c_str(),
- load_report.drop_stats);
- }
+ for (auto& drop_stats : load_report.drop_stats) {
+ auto dropped_requests = drop_stats->GetSnapshotAndReset();
+ snapshot.dropped_requests += dropped_requests;
}
// Aggregate locality stats.
for (auto it = load_report.locality_stats.begin();
auto& locality_state = it->second;
XdsClusterLocalityStats::Snapshot& locality_snapshot =
snapshot.locality_stats[locality_name];
- locality_snapshot = std::move(locality_state.deleted_locality_stats);
- if (locality_state.locality_stats != nullptr) {
- locality_snapshot +=
- locality_state.locality_stats->GetSnapshotAndReset();
- if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
- gpr_log(GPR_INFO,
- "[xds_client %p] cluster=%s eds_service_name=%s "
- "locality=%s locality_stats=%p",
- this, cluster_key.first.c_str(), cluster_key.second.c_str(),
- locality_name->AsHumanReadableString().c_str(),
- locality_state.locality_stats);
- }
+ for (auto& locality_stats : locality_state.locality_stats) {
+ locality_snapshot += locality_stats->GetSnapshotAndReset();
+ }
+ // Add final snapshots from recently deleted locality stats objects.
+ for (auto& deleted_locality_stats :
+ locality_state.deleted_locality_stats) {
+ locality_snapshot += deleted_locality_stats;
}
+ locality_state.deleted_locality_stats.clear();
// If the only thing left in this entry was final snapshots from
// deleted locality stats objects, remove the entry.
- if (locality_state.locality_stats == nullptr) {
+ if (locality_state.locality_stats.empty()) {
it = load_report.locality_stats.erase(it);
} else {
++it;
}
}
- // Compute load report interval.
- const grpc_millis now = ExecCtx::Get()->Now();
- snapshot.load_report_interval = now - load_report.last_report_time;
- load_report.last_report_time = now;
- // Record snapshot.
if (record_stats) {
+ // Compute load report interval.
+ const grpc_millis now = ExecCtx::Get()->Now();
+ snapshot.load_report_interval = now - load_report.last_report_time;
+ load_report.last_report_time = now;
+ // Record snapshot.
snapshot_map[cluster_key] = std::move(snapshot);
}
// If the only thing left in this entry was final snapshots from
// deleted stats objects, remove the entry.
- if (load_report.locality_stats.empty() &&
- load_report.drop_stats == nullptr) {
+ if (load_report.locality_stats.empty() && load_report.drop_stats.empty()) {
load_report_it = load_report_map_.erase(load_report_it);
} else {
++load_report_it;