Imported Upstream version 1.27.0
[platform/upstream/grpc.git] / src / core / ext / filters / client_channel / xds / xds_api.cc
index 12b61a2..43e9e69 100644 (file)
 #include <grpc/support/string_util.h>
 
 #include "src/core/ext/filters/client_channel/xds/xds_api.h"
+#include "src/core/lib/gprpp/inlined_vector.h"
 #include "src/core/lib/iomgr/error.h"
 #include "src/core/lib/iomgr/sockaddr_utils.h"
 
+#include "envoy/api/v2/cds.upb.h"
 #include "envoy/api/v2/core/address.upb.h"
 #include "envoy/api/v2/core/base.upb.h"
+#include "envoy/api/v2/core/config_source.upb.h"
 #include "envoy/api/v2/core/health_check.upb.h"
 #include "envoy/api/v2/discovery.upb.h"
 #include "envoy/api/v2/eds.upb.h"
 #include "google/protobuf/any.upb.h"
 #include "google/protobuf/duration.upb.h"
 #include "google/protobuf/struct.upb.h"
-#include "google/protobuf/timestamp.upb.h"
 #include "google/protobuf/wrappers.upb.h"
+#include "google/rpc/status.upb.h"
 #include "upb/upb.h"
 
 namespace grpc_core {
 
-namespace {
-
-constexpr char kEdsTypeUrl[] =
-    "type.googleapis.com/envoy.api.v2.ClusterLoadAssignment";
-
-}  // namespace
-
 bool XdsPriorityListUpdate::operator==(
     const XdsPriorityListUpdate& other) const {
   if (priorities_.size() != other.priorities_.size()) return false;
@@ -88,8 +84,7 @@ bool XdsPriorityListUpdate::Contains(
   return false;
 }
 
-bool XdsDropConfig::ShouldDrop(
-    const grpc_core::UniquePtr<char>** category_name) const {
+bool XdsDropConfig::ShouldDrop(const std::string** category_name) const {
   for (size_t i = 0; i < drop_category_list_.size(); ++i) {
     const auto& drop_category = drop_category_list_[i];
     // Generate a random number in [0, 1000000).
@@ -199,20 +194,141 @@ void PopulateNode(upb_arena* arena, const XdsBootstrap::Node* node,
 
 }  // namespace
 
-grpc_slice XdsEdsRequestCreateAndEncode(const char* server_name,
-                                        const XdsBootstrap::Node* node,
-                                        const char* build_version) {
+grpc_slice XdsUnsupportedTypeNackRequestCreateAndEncode(
+    const std::string& type_url, const std::string& nonce, grpc_error* error) {
   upb::Arena arena;
   // Create a request.
   envoy_api_v2_DiscoveryRequest* request =
       envoy_api_v2_DiscoveryRequest_new(arena.ptr());
-  envoy_api_v2_core_Node* node_msg =
-      envoy_api_v2_DiscoveryRequest_mutable_node(request, arena.ptr());
-  PopulateNode(arena.ptr(), node, build_version, node_msg);
-  envoy_api_v2_DiscoveryRequest_add_resource_names(
-      request, upb_strview_makez(server_name), arena.ptr());
+  // Set type_url.
+  envoy_api_v2_DiscoveryRequest_set_type_url(
+      request, upb_strview_makez(type_url.c_str()));
+  // Set nonce.
+  envoy_api_v2_DiscoveryRequest_set_response_nonce(
+      request, upb_strview_makez(nonce.c_str()));
+  // Set error_detail.
+  grpc_slice error_description_slice;
+  GPR_ASSERT(grpc_error_get_str(error, GRPC_ERROR_STR_DESCRIPTION,
+                                &error_description_slice));
+  upb_strview error_description_strview =
+      upb_strview_make(reinterpret_cast<const char*>(
+                           GPR_SLICE_START_PTR(error_description_slice)),
+                       GPR_SLICE_LENGTH(error_description_slice));
+  google_rpc_Status* error_detail =
+      envoy_api_v2_DiscoveryRequest_mutable_error_detail(request, arena.ptr());
+  google_rpc_Status_set_message(error_detail, error_description_strview);
+  GRPC_ERROR_UNREF(error);
+  // Encode the request.
+  size_t output_length;
+  char* output = envoy_api_v2_DiscoveryRequest_serialize(request, arena.ptr(),
+                                                         &output_length);
+  return grpc_slice_from_copied_buffer(output, output_length);
+}
+
+grpc_slice XdsCdsRequestCreateAndEncode(
+    const std::set<StringView>& cluster_names, const XdsBootstrap::Node* node,
+    const char* build_version, const std::string& version,
+    const std::string& nonce, grpc_error* error) {
+  upb::Arena arena;
+  // Create a request.
+  envoy_api_v2_DiscoveryRequest* request =
+      envoy_api_v2_DiscoveryRequest_new(arena.ptr());
+  // Set version_info.
+  if (!version.empty()) {
+    envoy_api_v2_DiscoveryRequest_set_version_info(
+        request, upb_strview_makez(version.c_str()));
+  }
+  // Populate node.
+  if (build_version != nullptr) {
+    envoy_api_v2_core_Node* node_msg =
+        envoy_api_v2_DiscoveryRequest_mutable_node(request, arena.ptr());
+    PopulateNode(arena.ptr(), node, build_version, node_msg);
+  }
+  // Add resource_names.
+  for (const auto& cluster_name : cluster_names) {
+    envoy_api_v2_DiscoveryRequest_add_resource_names(
+        request, upb_strview_make(cluster_name.data(), cluster_name.size()),
+        arena.ptr());
+  }
+  // Set type_url.
+  envoy_api_v2_DiscoveryRequest_set_type_url(request,
+                                             upb_strview_makez(kCdsTypeUrl));
+  // Set nonce.
+  if (!nonce.empty()) {
+    envoy_api_v2_DiscoveryRequest_set_response_nonce(
+        request, upb_strview_makez(nonce.c_str()));
+  }
+  // Set error_detail if it's a NACK.
+  if (error != GRPC_ERROR_NONE) {
+    grpc_slice error_description_slice;
+    GPR_ASSERT(grpc_error_get_str(error, GRPC_ERROR_STR_DESCRIPTION,
+                                  &error_description_slice));
+    upb_strview error_description_strview =
+        upb_strview_make(reinterpret_cast<const char*>(
+                             GPR_SLICE_START_PTR(error_description_slice)),
+                         GPR_SLICE_LENGTH(error_description_slice));
+    google_rpc_Status* error_detail =
+        envoy_api_v2_DiscoveryRequest_mutable_error_detail(request,
+                                                           arena.ptr());
+    google_rpc_Status_set_message(error_detail, error_description_strview);
+    GRPC_ERROR_UNREF(error);
+  }
+  // Encode the request.
+  size_t output_length;
+  char* output = envoy_api_v2_DiscoveryRequest_serialize(request, arena.ptr(),
+                                                         &output_length);
+  return grpc_slice_from_copied_buffer(output, output_length);
+}
+
+grpc_slice XdsEdsRequestCreateAndEncode(
+    const std::set<StringView>& eds_service_names,
+    const XdsBootstrap::Node* node, const char* build_version,
+    const std::string& version, const std::string& nonce, grpc_error* error) {
+  upb::Arena arena;
+  // Create a request.
+  envoy_api_v2_DiscoveryRequest* request =
+      envoy_api_v2_DiscoveryRequest_new(arena.ptr());
+  // Set version_info.
+  if (!version.empty()) {
+    envoy_api_v2_DiscoveryRequest_set_version_info(
+        request, upb_strview_makez(version.c_str()));
+  }
+  // Populate node.
+  if (build_version != nullptr) {
+    envoy_api_v2_core_Node* node_msg =
+        envoy_api_v2_DiscoveryRequest_mutable_node(request, arena.ptr());
+    PopulateNode(arena.ptr(), node, build_version, node_msg);
+  }
+  // Add resource_names.
+  for (const auto& eds_service_name : eds_service_names) {
+    envoy_api_v2_DiscoveryRequest_add_resource_names(
+        request,
+        upb_strview_make(eds_service_name.data(), eds_service_name.size()),
+        arena.ptr());
+  }
+  // Set type_url.
   envoy_api_v2_DiscoveryRequest_set_type_url(request,
                                              upb_strview_makez(kEdsTypeUrl));
+  // Set nonce.
+  if (!nonce.empty()) {
+    envoy_api_v2_DiscoveryRequest_set_response_nonce(
+        request, upb_strview_makez(nonce.c_str()));
+  }
+  // Set error_detail if it's a NACK.
+  if (error != GRPC_ERROR_NONE) {
+    grpc_slice error_description_slice;
+    GPR_ASSERT(grpc_error_get_str(error, GRPC_ERROR_STR_DESCRIPTION,
+                                  &error_description_slice));
+    upb_strview error_description_strview =
+        upb_strview_make(reinterpret_cast<const char*>(
+                             GPR_SLICE_START_PTR(error_description_slice)),
+                         GPR_SLICE_LENGTH(error_description_slice));
+    google_rpc_Status* error_detail =
+        envoy_api_v2_DiscoveryRequest_mutable_error_detail(request,
+                                                           arena.ptr());
+    google_rpc_Status_set_message(error_detail, error_description_strview);
+    GRPC_ERROR_UNREF(error);
+  }
   // Encode the request.
   size_t output_length;
   char* output = envoy_api_v2_DiscoveryRequest_serialize(request, arena.ptr(),
@@ -220,6 +336,76 @@ grpc_slice XdsEdsRequestCreateAndEncode(const char* server_name,
   return grpc_slice_from_copied_buffer(output, output_length);
 }
 
+grpc_error* CdsResponseParse(const envoy_api_v2_DiscoveryResponse* response,
+                             CdsUpdateMap* cds_update_map, upb_arena* arena) {
+  // Get the resources from the response.
+  size_t size;
+  const google_protobuf_Any* const* resources =
+      envoy_api_v2_DiscoveryResponse_resources(response, &size);
+  if (size < 1) {
+    return GRPC_ERROR_CREATE_FROM_STATIC_STRING(
+        "CDS response contains 0 resource.");
+  }
+  // Parse all the resources in the CDS response.
+  for (size_t i = 0; i < size; ++i) {
+    CdsUpdate cds_update;
+    // Check the type_url of the resource.
+    const upb_strview type_url = google_protobuf_Any_type_url(resources[i]);
+    if (!upb_strview_eql(type_url, upb_strview_makez(kCdsTypeUrl))) {
+      return GRPC_ERROR_CREATE_FROM_STATIC_STRING("Resource is not CDS.");
+    }
+    // Decode the cluster.
+    const upb_strview encoded_cluster = google_protobuf_Any_value(resources[i]);
+    const envoy_api_v2_Cluster* cluster = envoy_api_v2_Cluster_parse(
+        encoded_cluster.data, encoded_cluster.size, arena);
+    if (cluster == nullptr) {
+      return GRPC_ERROR_CREATE_FROM_STATIC_STRING("Can't decode cluster.");
+    }
+    // Check the cluster_discovery_type.
+    if (!envoy_api_v2_Cluster_has_type(cluster)) {
+      return GRPC_ERROR_CREATE_FROM_STATIC_STRING("DiscoveryType not found.");
+    }
+    if (envoy_api_v2_Cluster_type(cluster) != envoy_api_v2_Cluster_EDS) {
+      return GRPC_ERROR_CREATE_FROM_STATIC_STRING("DiscoveryType is not EDS.");
+    }
+    // Check the EDS config source.
+    const envoy_api_v2_Cluster_EdsClusterConfig* eds_cluster_config =
+        envoy_api_v2_Cluster_eds_cluster_config(cluster);
+    const envoy_api_v2_core_ConfigSource* eds_config =
+        envoy_api_v2_Cluster_EdsClusterConfig_eds_config(eds_cluster_config);
+    if (!envoy_api_v2_core_ConfigSource_has_ads(eds_config)) {
+      return GRPC_ERROR_CREATE_FROM_STATIC_STRING("ConfigSource is not ADS.");
+    }
+    // Record EDS service_name (if any).
+    upb_strview service_name =
+        envoy_api_v2_Cluster_EdsClusterConfig_service_name(eds_cluster_config);
+    if (service_name.size != 0) {
+      cds_update.eds_service_name =
+          std::string(service_name.data, service_name.size);
+    }
+    // Check the LB policy.
+    if (envoy_api_v2_Cluster_lb_policy(cluster) !=
+        envoy_api_v2_Cluster_ROUND_ROBIN) {
+      return GRPC_ERROR_CREATE_FROM_STATIC_STRING(
+          "LB policy is not ROUND_ROBIN.");
+    }
+    // Record LRS server name (if any).
+    const envoy_api_v2_core_ConfigSource* lrs_server =
+        envoy_api_v2_Cluster_lrs_server(cluster);
+    if (lrs_server != nullptr) {
+      if (!envoy_api_v2_core_ConfigSource_has_self(lrs_server)) {
+        return GRPC_ERROR_CREATE_FROM_STATIC_STRING(
+            "ConfigSource is not self.");
+      }
+      cds_update.lrs_load_reporting_server_name.set("");
+    }
+    upb_strview cluster_name = envoy_api_v2_Cluster_name(cluster);
+    cds_update_map->emplace(std::string(cluster_name.data, cluster_name.size),
+                            std::move(cds_update));
+  }
+  return GRPC_ERROR_NONE;
+}
+
 namespace {
 
 grpc_error* ServerAddressParseAndAppend(
@@ -257,17 +443,6 @@ grpc_error* ServerAddressParseAndAppend(
   return GRPC_ERROR_NONE;
 }
 
-namespace {
-
-grpc_core::UniquePtr<char> StringCopy(const upb_strview& strview) {
-  char* str = static_cast<char*>(gpr_malloc(strview.size + 1));
-  memcpy(str, strview.data, strview.size);
-  str[strview.size] = '\0';
-  return grpc_core::UniquePtr<char>(str);
-}
-
-}  // namespace
-
 grpc_error* LocalityParse(
     const envoy_api_v2_endpoint_LocalityLbEndpoints* locality_lb_endpoints,
     XdsPriorityListUpdate::LocalityMap::Locality* output_locality) {
@@ -284,10 +459,12 @@ grpc_error* LocalityParse(
   // Parse locality name.
   const envoy_api_v2_core_Locality* locality =
       envoy_api_v2_endpoint_LocalityLbEndpoints_locality(locality_lb_endpoints);
+  upb_strview region = envoy_api_v2_core_Locality_region(locality);
+  upb_strview zone = envoy_api_v2_core_Locality_region(locality);
+  upb_strview sub_zone = envoy_api_v2_core_Locality_sub_zone(locality);
   output_locality->name = MakeRefCounted<XdsLocalityName>(
-      StringCopy(envoy_api_v2_core_Locality_region(locality)),
-      StringCopy(envoy_api_v2_core_Locality_zone(locality)),
-      StringCopy(envoy_api_v2_core_Locality_sub_zone(locality)));
+      std::string(region.data, region.size), std::string(zone.data, zone.size),
+      std::string(sub_zone.data, sub_zone.size));
   // Parse the addresses.
   size_t size;
   const envoy_api_v2_endpoint_LbEndpoint* const* lb_endpoints =
@@ -338,30 +515,15 @@ grpc_error* DropParseAndAppend(
   // Cap numerator to 1000000.
   numerator = GPR_MIN(numerator, 1000000);
   if (numerator == 1000000) *drop_all = true;
-  drop_config->AddCategory(StringCopy(category), numerator);
+  drop_config->AddCategory(std::string(category.data, category.size),
+                           numerator);
   return GRPC_ERROR_NONE;
 }
 
-}  // namespace
-
-grpc_error* XdsEdsResponseDecodeAndParse(const grpc_slice& encoded_response,
-                                         EdsUpdate* update) {
-  upb::Arena arena;
-  // Decode the response.
-  const envoy_api_v2_DiscoveryResponse* response =
-      envoy_api_v2_DiscoveryResponse_parse(
-          reinterpret_cast<const char*>(GRPC_SLICE_START_PTR(encoded_response)),
-          GRPC_SLICE_LENGTH(encoded_response), arena.ptr());
-  // Parse the response.
-  if (response == nullptr) {
-    return GRPC_ERROR_CREATE_FROM_STATIC_STRING("No response found.");
-  }
-  // Check the type_url of the response.
-  upb_strview type_url = envoy_api_v2_DiscoveryResponse_type_url(response);
-  upb_strview expected_type_url = upb_strview_makez(kEdsTypeUrl);
-  if (!upb_strview_eql(type_url, expected_type_url)) {
-    return GRPC_ERROR_CREATE_FROM_STATIC_STRING("Resource is not EDS.");
-  }
+grpc_error* EdsResponsedParse(
+    const envoy_api_v2_DiscoveryResponse* response,
+    const std::set<StringView>& expected_eds_service_names,
+    EdsUpdateMap* eds_update_map, upb_arena* arena) {
   // Get the resources from the response.
   size_t size;
   const google_protobuf_Any* const* resources =
@@ -370,48 +532,115 @@ grpc_error* XdsEdsResponseDecodeAndParse(const grpc_slice& encoded_response,
     return GRPC_ERROR_CREATE_FROM_STATIC_STRING(
         "EDS response contains 0 resource.");
   }
-  // Check the type_url of the resource.
-  type_url = google_protobuf_Any_type_url(resources[0]);
-  if (!upb_strview_eql(type_url, expected_type_url)) {
-    return GRPC_ERROR_CREATE_FROM_STATIC_STRING("Resource is not EDS.");
-  }
-  // Get the cluster_load_assignment.
-  upb_strview encoded_cluster_load_assignment =
-      google_protobuf_Any_value(resources[0]);
-  envoy_api_v2_ClusterLoadAssignment* cluster_load_assignment =
-      envoy_api_v2_ClusterLoadAssignment_parse(
-          encoded_cluster_load_assignment.data,
-          encoded_cluster_load_assignment.size, arena.ptr());
-  // Get the endpoints.
-  const envoy_api_v2_endpoint_LocalityLbEndpoints* const* endpoints =
-      envoy_api_v2_ClusterLoadAssignment_endpoints(cluster_load_assignment,
-                                                   &size);
   for (size_t i = 0; i < size; ++i) {
-    XdsPriorityListUpdate::LocalityMap::Locality locality;
-    grpc_error* error = LocalityParse(endpoints[i], &locality);
-    if (error != GRPC_ERROR_NONE) return error;
-    // Filter out locality with weight 0.
-    if (locality.lb_weight == 0) continue;
-    update->priority_list_update.Add(locality);
-  }
-  // Get the drop config.
-  update->drop_config = MakeRefCounted<XdsDropConfig>();
-  const envoy_api_v2_ClusterLoadAssignment_Policy* policy =
-      envoy_api_v2_ClusterLoadAssignment_policy(cluster_load_assignment);
-  if (policy != nullptr) {
-    const envoy_api_v2_ClusterLoadAssignment_Policy_DropOverload* const*
-        drop_overload =
-            envoy_api_v2_ClusterLoadAssignment_Policy_drop_overloads(policy,
-                                                                     &size);
-    for (size_t i = 0; i < size; ++i) {
-      grpc_error* error = DropParseAndAppend(
-          drop_overload[i], update->drop_config.get(), &update->drop_all);
+    EdsUpdate eds_update;
+    // Check the type_url of the resource.
+    upb_strview type_url = google_protobuf_Any_type_url(resources[i]);
+    if (!upb_strview_eql(type_url, upb_strview_makez(kEdsTypeUrl))) {
+      return GRPC_ERROR_CREATE_FROM_STATIC_STRING("Resource is not EDS.");
+    }
+    // Get the cluster_load_assignment.
+    upb_strview encoded_cluster_load_assignment =
+        google_protobuf_Any_value(resources[i]);
+    envoy_api_v2_ClusterLoadAssignment* cluster_load_assignment =
+        envoy_api_v2_ClusterLoadAssignment_parse(
+            encoded_cluster_load_assignment.data,
+            encoded_cluster_load_assignment.size, arena);
+    if (cluster_load_assignment == nullptr) {
+      return GRPC_ERROR_CREATE_FROM_STATIC_STRING(
+          "Can't parse cluster_load_assignment.");
+    }
+    // Check the cluster name (which actually means eds_service_name). Ignore
+    // unexpected names.
+    upb_strview cluster_name = envoy_api_v2_ClusterLoadAssignment_cluster_name(
+        cluster_load_assignment);
+    StringView cluster_name_strview(cluster_name.data, cluster_name.size);
+    if (expected_eds_service_names.find(cluster_name_strview) ==
+        expected_eds_service_names.end()) {
+      continue;
+    }
+    // Get the endpoints.
+    size_t locality_size;
+    const envoy_api_v2_endpoint_LocalityLbEndpoints* const* endpoints =
+        envoy_api_v2_ClusterLoadAssignment_endpoints(cluster_load_assignment,
+                                                     &locality_size);
+    for (size_t j = 0; j < locality_size; ++j) {
+      XdsPriorityListUpdate::LocalityMap::Locality locality;
+      grpc_error* error = LocalityParse(endpoints[j], &locality);
       if (error != GRPC_ERROR_NONE) return error;
+      // Filter out locality with weight 0.
+      if (locality.lb_weight == 0) continue;
+      eds_update.priority_list_update.Add(locality);
+    }
+    // Get the drop config.
+    eds_update.drop_config = MakeRefCounted<XdsDropConfig>();
+    const envoy_api_v2_ClusterLoadAssignment_Policy* policy =
+        envoy_api_v2_ClusterLoadAssignment_policy(cluster_load_assignment);
+    if (policy != nullptr) {
+      size_t drop_size;
+      const envoy_api_v2_ClusterLoadAssignment_Policy_DropOverload* const*
+          drop_overload =
+              envoy_api_v2_ClusterLoadAssignment_Policy_drop_overloads(
+                  policy, &drop_size);
+      for (size_t j = 0; j < drop_size; ++j) {
+        grpc_error* error =
+            DropParseAndAppend(drop_overload[j], eds_update.drop_config.get(),
+                               &eds_update.drop_all);
+        if (error != GRPC_ERROR_NONE) return error;
+      }
+    }
+    // Validate the update content.
+    if (eds_update.priority_list_update.empty() && !eds_update.drop_all) {
+      return GRPC_ERROR_CREATE_FROM_STATIC_STRING(
+          "EDS response doesn't contain any valid "
+          "locality but doesn't require to drop all calls.");
     }
+    eds_update_map->emplace(std::string(cluster_name.data, cluster_name.size),
+                            std::move(eds_update));
   }
   return GRPC_ERROR_NONE;
 }
 
+}  // namespace
+
+grpc_error* XdsAdsResponseDecodeAndParse(
+    const grpc_slice& encoded_response,
+    const std::set<StringView>& expected_eds_service_names,
+    CdsUpdateMap* cds_update_map, EdsUpdateMap* eds_update_map,
+    std::string* version, std::string* nonce, std::string* type_url) {
+  upb::Arena arena;
+  // Decode the response.
+  const envoy_api_v2_DiscoveryResponse* response =
+      envoy_api_v2_DiscoveryResponse_parse(
+          reinterpret_cast<const char*>(GRPC_SLICE_START_PTR(encoded_response)),
+          GRPC_SLICE_LENGTH(encoded_response), arena.ptr());
+  // If decoding fails, output an empty type_url and return.
+  if (response == nullptr) {
+    *type_url = "";
+    return GRPC_ERROR_CREATE_FROM_STATIC_STRING(
+        "Can't decode the whole response.");
+  }
+  // Record the type_url, the version_info, and the nonce of the response.
+  upb_strview type_url_strview =
+      envoy_api_v2_DiscoveryResponse_type_url(response);
+  *type_url = std::string(type_url_strview.data, type_url_strview.size);
+  upb_strview version_info =
+      envoy_api_v2_DiscoveryResponse_version_info(response);
+  *version = std::string(version_info.data, version_info.size);
+  upb_strview nonce_strview = envoy_api_v2_DiscoveryResponse_nonce(response);
+  *nonce = std::string(nonce_strview.data, nonce_strview.size);
+  // Parse the response according to the resource type.
+  if (*type_url == kCdsTypeUrl) {
+    return CdsResponseParse(response, cds_update_map, arena.ptr());
+  } else if (*type_url == kEdsTypeUrl) {
+    return EdsResponsedParse(response, expected_eds_service_names,
+                             eds_update_map, arena.ptr());
+  } else {
+    return GRPC_ERROR_CREATE_FROM_STATIC_STRING(
+        "Unsupported ADS resource type.");
+  }
+}
+
 namespace {
 
 grpc_slice LrsRequestEncode(
@@ -425,7 +654,7 @@ grpc_slice LrsRequestEncode(
 
 }  // namespace
 
-grpc_slice XdsLrsRequestCreateAndEncode(const char* server_name,
+grpc_slice XdsLrsRequestCreateAndEncode(const std::string& server_name,
                                         const XdsBootstrap::Node* node,
                                         const char* build_version) {
   upb::Arena arena;
@@ -444,7 +673,7 @@ grpc_slice XdsLrsRequestCreateAndEncode(const char* server_name,
           request, arena.ptr());
   // Set the cluster name.
   envoy_api_v2_endpoint_ClusterStats_set_cluster_name(
-      cluster_stats, upb_strview_makez(server_name));
+      cluster_stats, upb_strview_makez(server_name.c_str()));
   return LrsRequestEncode(request, arena.ptr());
 }
 
@@ -452,17 +681,17 @@ namespace {
 
 void LocalityStatsPopulate(
     envoy_api_v2_endpoint_UpstreamLocalityStats* output,
-    std::pair<const RefCountedPtr<XdsLocalityName>,
-              XdsClientStats::LocalityStats::Snapshot>& input,
+    const std::pair<const RefCountedPtr<XdsLocalityName>,
+                    XdsClientStats::LocalityStats::Snapshot>& input,
     upb_arena* arena) {
   // Set sub_zone.
   envoy_api_v2_core_Locality* locality =
       envoy_api_v2_endpoint_UpstreamLocalityStats_mutable_locality(output,
                                                                    arena);
   envoy_api_v2_core_Locality_set_sub_zone(
-      locality, upb_strview_makez(input.first->sub_zone()));
+      locality, upb_strview_makez(input.first->sub_zone().c_str()));
   // Set total counts.
-  XdsClientStats::LocalityStats::Snapshot& snapshot = input.second;
+  const XdsClientStats::LocalityStats::Snapshot& snapshot = input.second;
   envoy_api_v2_endpoint_UpstreamLocalityStats_set_total_successful_requests(
       output, snapshot.total_successful_requests);
   envoy_api_v2_endpoint_UpstreamLocalityStats_set_total_requests_in_progress(
@@ -473,7 +702,7 @@ void LocalityStatsPopulate(
       output, snapshot.total_issued_requests);
   // Add load metric stats.
   for (auto& p : snapshot.load_metric_stats) {
-    const char* metric_name = p.first.get();
+    const char* metric_name = p.first.c_str();
     const XdsClientStats::LocalityStats::LoadMetric::Snapshot& metric_value =
         p.second;
     envoy_api_v2_endpoint_EndpointLoadMetricStats* load_metric =
@@ -490,62 +719,80 @@ void LocalityStatsPopulate(
 
 }  // namespace
 
-grpc_slice XdsLrsRequestCreateAndEncode(const char* server_name,
-                                        XdsClientStats* client_stats) {
+grpc_slice XdsLrsRequestCreateAndEncode(
+    std::map<StringView, std::set<XdsClientStats*>> client_stats_map) {
   upb::Arena arena;
-  XdsClientStats::Snapshot snapshot = client_stats->GetSnapshotAndReset();
-  // Prune unused locality stats.
-  client_stats->PruneLocalityStats();
+  // Get the snapshots.
+  std::map<StringView, grpc_core::InlinedVector<XdsClientStats::Snapshot, 1>>
+      snapshot_map;
+  for (auto& p : client_stats_map) {
+    const StringView& cluster_name = p.first;
+    for (auto* client_stats : p.second) {
+      XdsClientStats::Snapshot snapshot = client_stats->GetSnapshotAndReset();
+      // Prune unused locality stats.
+      client_stats->PruneLocalityStats();
+      if (snapshot.IsAllZero()) continue;
+      snapshot_map[cluster_name].emplace_back(std::move(snapshot));
+    }
+  }
   // When all the counts are zero, return empty slice.
-  if (snapshot.IsAllZero()) return grpc_empty_slice();
+  if (snapshot_map.empty()) return grpc_empty_slice();
   // Create a request.
   envoy_service_load_stats_v2_LoadStatsRequest* request =
       envoy_service_load_stats_v2_LoadStatsRequest_new(arena.ptr());
-  // Add cluster stats. There is only one because we only use one server name in
-  // one channel.
-  envoy_api_v2_endpoint_ClusterStats* cluster_stats =
-      envoy_service_load_stats_v2_LoadStatsRequest_add_cluster_stats(
-          request, arena.ptr());
-  // Set the cluster name.
-  envoy_api_v2_endpoint_ClusterStats_set_cluster_name(
-      cluster_stats, upb_strview_makez(server_name));
-  // Add locality stats.
-  for (auto& p : snapshot.upstream_locality_stats) {
-    envoy_api_v2_endpoint_UpstreamLocalityStats* locality_stats =
-        envoy_api_v2_endpoint_ClusterStats_add_upstream_locality_stats(
-            cluster_stats, arena.ptr());
-    LocalityStatsPopulate(locality_stats, p, arena.ptr());
-  }
-  // Add dropped requests.
-  for (auto& p : snapshot.dropped_requests) {
-    const char* category = p.first.get();
-    const uint64_t count = p.second;
-    envoy_api_v2_endpoint_ClusterStats_DroppedRequests* dropped_requests =
-        envoy_api_v2_endpoint_ClusterStats_add_dropped_requests(cluster_stats,
-                                                                arena.ptr());
-    envoy_api_v2_endpoint_ClusterStats_DroppedRequests_set_category(
-        dropped_requests, upb_strview_makez(category));
-    envoy_api_v2_endpoint_ClusterStats_DroppedRequests_set_dropped_count(
-        dropped_requests, count);
-  }
-  // Set total dropped requests.
-  envoy_api_v2_endpoint_ClusterStats_set_total_dropped_requests(
-      cluster_stats, snapshot.total_dropped_requests);
-  // Set real load report interval.
-  gpr_timespec timespec =
-      grpc_millis_to_timespec(snapshot.load_report_interval, GPR_TIMESPAN);
-  google_protobuf_Duration* load_report_interval =
-      envoy_api_v2_endpoint_ClusterStats_mutable_load_report_interval(
-          cluster_stats, arena.ptr());
-  google_protobuf_Duration_set_seconds(load_report_interval, timespec.tv_sec);
-  google_protobuf_Duration_set_nanos(load_report_interval, timespec.tv_nsec);
+  for (auto& p : snapshot_map) {
+    const StringView& cluster_name = p.first;
+    const auto& snapshot_list = p.second;
+    for (size_t i = 0; i < snapshot_list.size(); ++i) {
+      const auto& snapshot = snapshot_list[i];
+      // Add cluster stats.
+      envoy_api_v2_endpoint_ClusterStats* cluster_stats =
+          envoy_service_load_stats_v2_LoadStatsRequest_add_cluster_stats(
+              request, arena.ptr());
+      // Set the cluster name.
+      envoy_api_v2_endpoint_ClusterStats_set_cluster_name(
+          cluster_stats,
+          upb_strview_make(cluster_name.data(), cluster_name.size()));
+      // Add locality stats.
+      for (auto& p : snapshot.upstream_locality_stats) {
+        envoy_api_v2_endpoint_UpstreamLocalityStats* locality_stats =
+            envoy_api_v2_endpoint_ClusterStats_add_upstream_locality_stats(
+                cluster_stats, arena.ptr());
+        LocalityStatsPopulate(locality_stats, p, arena.ptr());
+      }
+      // Add dropped requests.
+      for (auto& p : snapshot.dropped_requests) {
+        const char* category = p.first.c_str();
+        const uint64_t count = p.second;
+        envoy_api_v2_endpoint_ClusterStats_DroppedRequests* dropped_requests =
+            envoy_api_v2_endpoint_ClusterStats_add_dropped_requests(
+                cluster_stats, arena.ptr());
+        envoy_api_v2_endpoint_ClusterStats_DroppedRequests_set_category(
+            dropped_requests, upb_strview_makez(category));
+        envoy_api_v2_endpoint_ClusterStats_DroppedRequests_set_dropped_count(
+            dropped_requests, count);
+      }
+      // Set total dropped requests.
+      envoy_api_v2_endpoint_ClusterStats_set_total_dropped_requests(
+          cluster_stats, snapshot.total_dropped_requests);
+      // Set real load report interval.
+      gpr_timespec timespec =
+          grpc_millis_to_timespec(snapshot.load_report_interval, GPR_TIMESPAN);
+      google_protobuf_Duration* load_report_interval =
+          envoy_api_v2_endpoint_ClusterStats_mutable_load_report_interval(
+              cluster_stats, arena.ptr());
+      google_protobuf_Duration_set_seconds(load_report_interval,
+                                           timespec.tv_sec);
+      google_protobuf_Duration_set_nanos(load_report_interval,
+                                         timespec.tv_nsec);
+    }
+  }
   return LrsRequestEncode(request, arena.ptr());
 }
 
-grpc_error* XdsLrsResponseDecodeAndParse(
-    const grpc_slice& encoded_response,
-    grpc_core::UniquePtr<char>* cluster_name,
-    grpc_millis* load_reporting_interval) {
+grpc_error* XdsLrsResponseDecodeAndParse(const grpc_slice& encoded_response,
+                                         std::set<std::string>* cluster_names,
+                                         grpc_millis* load_reporting_interval) {
   upb::Arena arena;
   // Decode the response.
   const envoy_service_load_stats_v2_LoadStatsResponse* decoded_response =
@@ -554,19 +801,16 @@ grpc_error* XdsLrsResponseDecodeAndParse(
           GRPC_SLICE_LENGTH(encoded_response), arena.ptr());
   // Parse the response.
   if (decoded_response == nullptr) {
-    return GRPC_ERROR_CREATE_FROM_STATIC_STRING("No response found.");
+    return GRPC_ERROR_CREATE_FROM_STATIC_STRING("Can't decode response.");
   }
-  // Check the cluster size in the response.
+  // Store the cluster names.
   size_t size;
   const upb_strview* clusters =
       envoy_service_load_stats_v2_LoadStatsResponse_clusters(decoded_response,
                                                              &size);
-  if (size != 1) {
-    return GRPC_ERROR_CREATE_FROM_STATIC_STRING(
-        "The number of clusters (server names) is not 1.");
+  for (size_t i = 0; i < size; ++i) {
+    cluster_names->emplace(clusters[i].data, clusters[i].size);
   }
-  // Get the cluster name for reporting loads.
-  *cluster_name = StringCopy(clusters[0]);
   // Get the load report interval.
   const google_protobuf_Duration* load_reporting_interval_duration =
       envoy_service_load_stats_v2_LoadStatsResponse_load_reporting_interval(