Imported Upstream version 1.33.1
[platform/upstream/grpc.git] / src / core / ext / xds / xds_client.cc
index 86405de..6d44864 100644 (file)
@@ -143,11 +143,8 @@ class XdsClient::ChannelState::AdsCallState
  private:
   class ResourceState : public InternallyRefCounted<ResourceState> {
    public:
-    ResourceState(const std::string& type_url, const std::string& name,
-                  bool sent_initial_request)
-        : type_url_(type_url),
-          name_(name),
-          sent_initial_request_(sent_initial_request) {
+    ResourceState(const std::string& type_url, const std::string& name)
+        : type_url_(type_url), name_(name) {
       GRPC_CLOSURE_INIT(&timer_callback_, OnTimer, this,
                         grpc_schedule_on_exec_ctx);
     }
@@ -158,8 +155,8 @@ class XdsClient::ChannelState::AdsCallState
     }
 
     void Start(RefCountedPtr<AdsCallState> ads_calld) {
-      if (sent_initial_request_) return;
-      sent_initial_request_ = true;
+      if (sent_) return;
+      sent_ = true;
       ads_calld_ = std::move(ads_calld);
       Ref(DEBUG_LOCATION, "timer").release();
       timer_pending_ = true;
@@ -232,7 +229,7 @@ class XdsClient::ChannelState::AdsCallState
     const std::string name_;
 
     RefCountedPtr<AdsCallState> ads_calld_;
-    bool sent_initial_request_;
+    bool sent_ = false;
     bool timer_pending_ = false;
     grpc_timer timer_;
     grpc_closure timer_callback_;
@@ -241,7 +238,8 @@ class XdsClient::ChannelState::AdsCallState
   struct ResourceTypeState {
     ~ResourceTypeState() { GRPC_ERROR_UNREF(error); }
 
-    // Nonce and error for this resource type.
+    // Version, nonce, and error for this resource type.
+    std::string version;
     std::string nonce;
     grpc_error* error = GRPC_ERROR_NONE;
 
@@ -338,7 +336,7 @@ class XdsClient::ChannelState::LrsCallState
     void ScheduleNextReportLocked();
     static void OnNextReportTimer(void* arg, grpc_error* error);
     bool OnNextReportTimerLocked(grpc_error* error);
-    bool SendReportLocked();
+    void SendReportLocked();
     static void OnReportDone(void* arg, grpc_error* error);
     bool OnReportDoneLocked(grpc_error* error);
 
@@ -435,9 +433,7 @@ class XdsClient::ChannelState::StateWatcher
 
 XdsClient::ChannelState::ChannelState(WeakRefCountedPtr<XdsClient> xds_client,
                                       grpc_channel* channel)
-    : InternallyRefCounted<ChannelState>(
-          GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace) ? "ChannelState"
-                                                         : nullptr),
+    : InternallyRefCounted<ChannelState>(&grpc_xds_client_trace),
       xds_client_(std::move(xds_client)),
       channel_(channel) {
   GPR_ASSERT(channel_ != nullptr);
@@ -638,9 +634,7 @@ void XdsClient::ChannelState::RetryableCall<T>::OnRetryTimerLocked(
 
 XdsClient::ChannelState::AdsCallState::AdsCallState(
     RefCountedPtr<RetryableCall<AdsCallState>> parent)
-    : InternallyRefCounted<AdsCallState>(
-          GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace) ? "AdsCallState"
-                                                         : nullptr),
+    : InternallyRefCounted<AdsCallState>(&grpc_xds_client_trace),
       parent_(std::move(parent)) {
   // Init the ADS call. Note that the call will progress every time there's
   // activity in xds_client()->interested_parties_, which is comprised of
@@ -769,8 +763,8 @@ void XdsClient::ChannelState::AdsCallState::SendMessageLocked(
   std::set<absl::string_view> resource_names =
       ResourceNamesForRequest(type_url);
   request_payload_slice = xds_client()->api_.CreateAdsRequest(
-      type_url, resource_names, xds_client()->resource_version_map_[type_url],
-      state.nonce, GRPC_ERROR_REF(state.error), !sent_initial_message_);
+      type_url, resource_names, state.version, state.nonce,
+      GRPC_ERROR_REF(state.error), !sent_initial_message_);
   if (type_url != XdsApi::kLdsTypeUrl && type_url != XdsApi::kRdsTypeUrl &&
       type_url != XdsApi::kCdsTypeUrl && type_url != XdsApi::kEdsTypeUrl) {
     state_map_.erase(type_url);
@@ -780,8 +774,7 @@ void XdsClient::ChannelState::AdsCallState::SendMessageLocked(
     gpr_log(GPR_INFO,
             "[xds_client %p] sending ADS request: type=%s version=%s nonce=%s "
             "error=%s resources=%s",
-            xds_client(), type_url.c_str(),
-            xds_client()->resource_version_map_[type_url].c_str(),
+            xds_client(), type_url.c_str(), state.version.c_str(),
             state.nonce.c_str(), grpc_error_string(state.error),
             absl::StrJoin(resource_names, " ").c_str());
   }
@@ -813,8 +806,7 @@ void XdsClient::ChannelState::AdsCallState::Subscribe(
     const std::string& type_url, const std::string& name) {
   auto& state = state_map_[type_url].subscribed_resources[name];
   if (state == nullptr) {
-    state = MakeOrphanable<ResourceState>(
-        type_url, name, !xds_client()->resource_version_map_[type_url].empty());
+    state = MakeOrphanable<ResourceState>(type_url, name);
     SendMessageLocked(type_url);
   }
 }
@@ -1178,8 +1170,7 @@ bool XdsClient::ChannelState::AdsCallState::OnResponseReceivedLocked() {
       } else if (result.type_url == XdsApi::kEdsTypeUrl) {
         AcceptEdsUpdate(std::move(result.eds_update_map));
       }
-      xds_client()->resource_version_map_[result.type_url] =
-          std::move(result.version);
+      state.version = std::move(result.version);
       // ACK the update.
       SendMessageLocked(result.type_url);
       // Start load reporting if needed.
@@ -1296,7 +1287,8 @@ bool XdsClient::ChannelState::LrsCallState::Reporter::OnNextReportTimerLocked(
     GRPC_ERROR_UNREF(error);
     return true;
   }
-  return SendReportLocked();
+  SendReportLocked();
+  return false;
 }
 
 namespace {
@@ -1315,7 +1307,7 @@ bool LoadReportCountersAreZero(const XdsApi::ClusterLoadReportMap& snapshot) {
 
 }  // namespace
 
-bool XdsClient::ChannelState::LrsCallState::Reporter::SendReportLocked() {
+void XdsClient::ChannelState::LrsCallState::Reporter::SendReportLocked() {
   // Construct snapshot from all reported stats.
   XdsApi::ClusterLoadReportMap snapshot =
       xds_client()->BuildLoadReportSnapshotLocked(parent_->send_all_clusters_,
@@ -1325,12 +1317,8 @@ bool XdsClient::ChannelState::LrsCallState::Reporter::SendReportLocked() {
   const bool old_val = last_report_counters_were_zero_;
   last_report_counters_were_zero_ = LoadReportCountersAreZero(snapshot);
   if (old_val && last_report_counters_were_zero_) {
-    if (xds_client()->load_report_map_.empty()) {
-      parent_->chand()->StopLrsCall();
-      return true;
-    }
     ScheduleNextReportLocked();
-    return false;
+    return;
   }
   // Create a request that contains the snapshot.
   grpc_slice request_payload_slice =
@@ -1351,7 +1339,6 @@ bool XdsClient::ChannelState::LrsCallState::Reporter::SendReportLocked() {
             xds_client(), this, call_error);
     GPR_ASSERT(GRPC_CALL_OK == call_error);
   }
-  return false;
 }
 
 void XdsClient::ChannelState::LrsCallState::Reporter::OnReportDone(
@@ -1394,9 +1381,7 @@ bool XdsClient::ChannelState::LrsCallState::Reporter::OnReportDoneLocked(
 
 XdsClient::ChannelState::LrsCallState::LrsCallState(
     RefCountedPtr<RetryableCall<LrsCallState>> parent)
-    : InternallyRefCounted<LrsCallState>(
-          GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace) ? "LrsCallState"
-                                                         : nullptr),
+    : InternallyRefCounted<LrsCallState>(&grpc_xds_client_trace),
       parent_(std::move(parent)) {
   // Init the LRS call. Note that the call will progress every time there's
   // activity in xds_client()->interested_parties_, which is comprised of
@@ -1748,9 +1733,7 @@ grpc_channel* CreateXdsChannel(const XdsBootstrap& bootstrap,
 }  // namespace
 
 XdsClient::XdsClient(grpc_error** error)
-    : DualRefCounted<XdsClient>(GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)
-                                    ? "XdsClient"
-                                    : nullptr),
+    : DualRefCounted<XdsClient>(&grpc_xds_client_trace),
       request_timeout_(GetRequestTimeout()),
       interested_parties_(grpc_pollset_set_create()),
       bootstrap_(
@@ -1999,22 +1982,10 @@ RefCountedPtr<XdsClusterDropStats> XdsClient::AddClusterDropStats(
   auto it = load_report_map_
                 .emplace(std::make_pair(std::move(key), LoadReportState()))
                 .first;
-  LoadReportState& load_report_state = it->second;
-  RefCountedPtr<XdsClusterDropStats> cluster_drop_stats;
-  if (load_report_state.drop_stats != nullptr) {
-    cluster_drop_stats = load_report_state.drop_stats->RefIfNonZero();
-  }
-  if (cluster_drop_stats == nullptr) {
-    if (load_report_state.drop_stats != nullptr) {
-      load_report_state.deleted_drop_stats +=
-          load_report_state.drop_stats->GetSnapshotAndReset();
-    }
-    cluster_drop_stats = MakeRefCounted<XdsClusterDropStats>(
-        Ref(DEBUG_LOCATION, "DropStats"), lrs_server,
-        it->first.first /*cluster_name*/,
-        it->first.second /*eds_service_name*/);
-    load_report_state.drop_stats = cluster_drop_stats.get();
-  }
+  auto cluster_drop_stats = MakeRefCounted<XdsClusterDropStats>(
+      Ref(DEBUG_LOCATION, "DropStats"), lrs_server,
+      it->first.first /*cluster_name*/, it->first.second /*eds_service_name*/);
+  it->second.drop_stats.insert(cluster_drop_stats.get());
   chand_->MaybeStartLrsCall();
   return cluster_drop_stats;
 }
@@ -2024,18 +1995,19 @@ void XdsClient::RemoveClusterDropStats(
     absl::string_view eds_service_name,
     XdsClusterDropStats* cluster_drop_stats) {
   MutexLock lock(&mu_);
+  auto load_report_it = load_report_map_.find(
+      std::make_pair(std::string(cluster_name), std::string(eds_service_name)));
+  if (load_report_it == load_report_map_.end()) return;
+  LoadReportState& load_report_state = load_report_it->second;
   // TODO(roth): When we add support for direct federation, use the
   // server name specified in lrs_server.
-  auto it = load_report_map_.find(
-      std::make_pair(std::string(cluster_name), std::string(eds_service_name)));
-  if (it == load_report_map_.end()) return;
-  LoadReportState& load_report_state = it->second;
-  if (load_report_state.drop_stats == cluster_drop_stats) {
-    // Record final snapshot in deleted_drop_stats, which will be
+  auto it = load_report_state.drop_stats.find(cluster_drop_stats);
+  if (it != load_report_state.drop_stats.end()) {
+    // Record final drop stats in deleted_drop_stats, which will be
     // added to the next load report.
-    load_report_state.deleted_drop_stats +=
-        load_report_state.drop_stats->GetSnapshotAndReset();
-    load_report_state.drop_stats = nullptr;
+    auto dropped_requests = cluster_drop_stats->GetSnapshotAndReset();
+    load_report_state.deleted_drop_stats += dropped_requests;
+    load_report_state.drop_stats.erase(it);
   }
 }
 
@@ -2054,24 +2026,12 @@ RefCountedPtr<XdsClusterLocalityStats> XdsClient::AddClusterLocalityStats(
   auto it = load_report_map_
                 .emplace(std::make_pair(std::move(key), LoadReportState()))
                 .first;
-  LoadReportState& load_report_state = it->second;
-  LoadReportState::LocalityState& locality_state =
-      load_report_state.locality_stats[locality];
-  RefCountedPtr<XdsClusterLocalityStats> cluster_locality_stats;
-  if (locality_state.locality_stats != nullptr) {
-    cluster_locality_stats = locality_state.locality_stats->RefIfNonZero();
-  }
-  if (cluster_locality_stats == nullptr) {
-    if (locality_state.locality_stats != nullptr) {
-      locality_state.deleted_locality_stats +=
-          locality_state.locality_stats->GetSnapshotAndReset();
-    }
-    cluster_locality_stats = MakeRefCounted<XdsClusterLocalityStats>(
-        Ref(DEBUG_LOCATION, "LocalityStats"), lrs_server,
-        it->first.first /*cluster_name*/, it->first.second /*eds_service_name*/,
-        std::move(locality));
-    locality_state.locality_stats = cluster_locality_stats.get();
-  }
+  auto cluster_locality_stats = MakeRefCounted<XdsClusterLocalityStats>(
+      Ref(DEBUG_LOCATION, "LocalityStats"), lrs_server,
+      it->first.first /*cluster_name*/, it->first.second /*eds_service_name*/,
+      locality);
+  it->second.locality_stats[std::move(locality)].locality_stats.insert(
+      cluster_locality_stats.get());
   chand_->MaybeStartLrsCall();
   return cluster_locality_stats;
 }
@@ -2082,21 +2042,22 @@ void XdsClient::RemoveClusterLocalityStats(
     const RefCountedPtr<XdsLocalityName>& locality,
     XdsClusterLocalityStats* cluster_locality_stats) {
   MutexLock lock(&mu_);
+  auto load_report_it = load_report_map_.find(
+      std::make_pair(std::string(cluster_name), std::string(eds_service_name)));
+  if (load_report_it == load_report_map_.end()) return;
+  LoadReportState& load_report_state = load_report_it->second;
   // TODO(roth): When we add support for direct federation, use the
   // server name specified in lrs_server.
-  auto it = load_report_map_.find(
-      std::make_pair(std::string(cluster_name), std::string(eds_service_name)));
-  if (it == load_report_map_.end()) return;
-  LoadReportState& load_report_state = it->second;
   auto locality_it = load_report_state.locality_stats.find(locality);
   if (locality_it == load_report_state.locality_stats.end()) return;
-  LoadReportState::LocalityState& locality_state = locality_it->second;
-  if (locality_state.locality_stats == cluster_locality_stats) {
+  auto& locality_set = locality_it->second.locality_stats;
+  auto it = locality_set.find(cluster_locality_stats);
+  if (it != locality_set.end()) {
     // Record final snapshot in deleted_locality_stats, which will be
     // added to the next load report.
-    locality_state.deleted_locality_stats +=
-        locality_state.locality_stats->GetSnapshotAndReset();
-    locality_state.locality_stats = nullptr;
+    locality_it->second.deleted_locality_stats.emplace_back(
+        cluster_locality_stats->GetSnapshotAndReset());
+    locality_set.erase(it);
   }
 }
 
@@ -2137,9 +2098,6 @@ void XdsClient::NotifyOnErrorLocked(grpc_error* error) {
 
 XdsApi::ClusterLoadReportMap XdsClient::BuildLoadReportSnapshotLocked(
     bool send_all_clusters, const std::set<std::string>& clusters) {
-  if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
-    gpr_log(GPR_INFO, "[xds_client %p] start building load report", this);
-  }
   XdsApi::ClusterLoadReportMap snapshot_map;
   for (auto load_report_it = load_report_map_.begin();
        load_report_it != load_report_map_.end();) {
@@ -2158,15 +2116,9 @@ XdsApi::ClusterLoadReportMap XdsClient::BuildLoadReportSnapshotLocked(
     XdsApi::ClusterLoadReport snapshot;
     // Aggregate drop stats.
     snapshot.dropped_requests = std::move(load_report.deleted_drop_stats);
-    if (load_report.drop_stats != nullptr) {
-      snapshot.dropped_requests +=
-          load_report.drop_stats->GetSnapshotAndReset();
-      if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
-        gpr_log(GPR_INFO,
-                "[xds_client %p] cluster=%s eds_service_name=%s drop_stats=%p",
-                this, cluster_key.first.c_str(), cluster_key.second.c_str(),
-                load_report.drop_stats);
-      }
+    for (auto& drop_stats : load_report.drop_stats) {
+      auto dropped_requests = drop_stats->GetSnapshotAndReset();
+      snapshot.dropped_requests += dropped_requests;
     }
     // Aggregate locality stats.
     for (auto it = load_report.locality_stats.begin();
@@ -2175,39 +2127,34 @@ XdsApi::ClusterLoadReportMap XdsClient::BuildLoadReportSnapshotLocked(
       auto& locality_state = it->second;
       XdsClusterLocalityStats::Snapshot& locality_snapshot =
           snapshot.locality_stats[locality_name];
-      locality_snapshot = std::move(locality_state.deleted_locality_stats);
-      if (locality_state.locality_stats != nullptr) {
-        locality_snapshot +=
-            locality_state.locality_stats->GetSnapshotAndReset();
-        if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
-          gpr_log(GPR_INFO,
-                  "[xds_client %p] cluster=%s eds_service_name=%s "
-                  "locality=%s locality_stats=%p",
-                  this, cluster_key.first.c_str(), cluster_key.second.c_str(),
-                  locality_name->AsHumanReadableString().c_str(),
-                  locality_state.locality_stats);
-        }
+      for (auto& locality_stats : locality_state.locality_stats) {
+        locality_snapshot += locality_stats->GetSnapshotAndReset();
+      }
+      // Add final snapshots from recently deleted locality stats objects.
+      for (auto& deleted_locality_stats :
+           locality_state.deleted_locality_stats) {
+        locality_snapshot += deleted_locality_stats;
       }
+      locality_state.deleted_locality_stats.clear();
       // If the only thing left in this entry was final snapshots from
       // deleted locality stats objects, remove the entry.
-      if (locality_state.locality_stats == nullptr) {
+      if (locality_state.locality_stats.empty()) {
         it = load_report.locality_stats.erase(it);
       } else {
         ++it;
       }
     }
-    // Compute load report interval.
-    const grpc_millis now = ExecCtx::Get()->Now();
-    snapshot.load_report_interval = now - load_report.last_report_time;
-    load_report.last_report_time = now;
-    // Record snapshot.
     if (record_stats) {
+      // Compute load report interval.
+      const grpc_millis now = ExecCtx::Get()->Now();
+      snapshot.load_report_interval = now - load_report.last_report_time;
+      load_report.last_report_time = now;
+      // Record snapshot.
       snapshot_map[cluster_key] = std::move(snapshot);
     }
     // If the only thing left in this entry was final snapshots from
     // deleted stats objects, remove the entry.
-    if (load_report.locality_stats.empty() &&
-        load_report.drop_stats == nullptr) {
+    if (load_report.locality_stats.empty() && load_report.drop_stats.empty()) {
       load_report_it = load_report_map_.erase(load_report_it);
     } else {
       ++load_report_it;