#include <grpc/support/string_util.h>
#include "src/core/ext/filters/client_channel/xds/xds_api.h"
+#include "src/core/lib/gprpp/inlined_vector.h"
#include "src/core/lib/iomgr/error.h"
#include "src/core/lib/iomgr/sockaddr_utils.h"
+#include "envoy/api/v2/cds.upb.h"
#include "envoy/api/v2/core/address.upb.h"
#include "envoy/api/v2/core/base.upb.h"
+#include "envoy/api/v2/core/config_source.upb.h"
#include "envoy/api/v2/core/health_check.upb.h"
#include "envoy/api/v2/discovery.upb.h"
#include "envoy/api/v2/eds.upb.h"
#include "google/protobuf/any.upb.h"
#include "google/protobuf/duration.upb.h"
#include "google/protobuf/struct.upb.h"
-#include "google/protobuf/timestamp.upb.h"
#include "google/protobuf/wrappers.upb.h"
+#include "google/rpc/status.upb.h"
#include "upb/upb.h"
namespace grpc_core {
-namespace {
-
-constexpr char kEdsTypeUrl[] =
- "type.googleapis.com/envoy.api.v2.ClusterLoadAssignment";
-
-} // namespace
-
bool XdsPriorityListUpdate::operator==(
const XdsPriorityListUpdate& other) const {
if (priorities_.size() != other.priorities_.size()) return false;
return false;
}
-bool XdsDropConfig::ShouldDrop(
- const grpc_core::UniquePtr<char>** category_name) const {
+bool XdsDropConfig::ShouldDrop(const std::string** category_name) const {
for (size_t i = 0; i < drop_category_list_.size(); ++i) {
const auto& drop_category = drop_category_list_[i];
// Generate a random number in [0, 1000000).
} // namespace
-grpc_slice XdsEdsRequestCreateAndEncode(const char* server_name,
- const XdsBootstrap::Node* node,
- const char* build_version) {
+grpc_slice XdsUnsupportedTypeNackRequestCreateAndEncode(
+ const std::string& type_url, const std::string& nonce, grpc_error* error) {
upb::Arena arena;
// Create a request.
envoy_api_v2_DiscoveryRequest* request =
envoy_api_v2_DiscoveryRequest_new(arena.ptr());
- envoy_api_v2_core_Node* node_msg =
- envoy_api_v2_DiscoveryRequest_mutable_node(request, arena.ptr());
- PopulateNode(arena.ptr(), node, build_version, node_msg);
- envoy_api_v2_DiscoveryRequest_add_resource_names(
- request, upb_strview_makez(server_name), arena.ptr());
+ // Set type_url.
+ envoy_api_v2_DiscoveryRequest_set_type_url(
+ request, upb_strview_makez(type_url.c_str()));
+ // Set nonce.
+ envoy_api_v2_DiscoveryRequest_set_response_nonce(
+ request, upb_strview_makez(nonce.c_str()));
+ // Set error_detail.
+ grpc_slice error_description_slice;
+ GPR_ASSERT(grpc_error_get_str(error, GRPC_ERROR_STR_DESCRIPTION,
+ &error_description_slice));
+ upb_strview error_description_strview =
+ upb_strview_make(reinterpret_cast<const char*>(
+ GPR_SLICE_START_PTR(error_description_slice)),
+ GPR_SLICE_LENGTH(error_description_slice));
+ google_rpc_Status* error_detail =
+ envoy_api_v2_DiscoveryRequest_mutable_error_detail(request, arena.ptr());
+ google_rpc_Status_set_message(error_detail, error_description_strview);
+ GRPC_ERROR_UNREF(error);
+ // Encode the request.
+ size_t output_length;
+ char* output = envoy_api_v2_DiscoveryRequest_serialize(request, arena.ptr(),
+ &output_length);
+ return grpc_slice_from_copied_buffer(output, output_length);
+}
+
+grpc_slice XdsCdsRequestCreateAndEncode(
+ const std::set<StringView>& cluster_names, const XdsBootstrap::Node* node,
+ const char* build_version, const std::string& version,
+ const std::string& nonce, grpc_error* error) {
+ upb::Arena arena;
+ // Create a request.
+ envoy_api_v2_DiscoveryRequest* request =
+ envoy_api_v2_DiscoveryRequest_new(arena.ptr());
+ // Set version_info.
+ if (!version.empty()) {
+ envoy_api_v2_DiscoveryRequest_set_version_info(
+ request, upb_strview_makez(version.c_str()));
+ }
+ // Populate node.
+ if (build_version != nullptr) {
+ envoy_api_v2_core_Node* node_msg =
+ envoy_api_v2_DiscoveryRequest_mutable_node(request, arena.ptr());
+ PopulateNode(arena.ptr(), node, build_version, node_msg);
+ }
+ // Add resource_names.
+ for (const auto& cluster_name : cluster_names) {
+ envoy_api_v2_DiscoveryRequest_add_resource_names(
+ request, upb_strview_make(cluster_name.data(), cluster_name.size()),
+ arena.ptr());
+ }
+ // Set type_url.
+ envoy_api_v2_DiscoveryRequest_set_type_url(request,
+ upb_strview_makez(kCdsTypeUrl));
+ // Set nonce.
+ if (!nonce.empty()) {
+ envoy_api_v2_DiscoveryRequest_set_response_nonce(
+ request, upb_strview_makez(nonce.c_str()));
+ }
+ // Set error_detail if it's a NACK.
+ if (error != GRPC_ERROR_NONE) {
+ grpc_slice error_description_slice;
+ GPR_ASSERT(grpc_error_get_str(error, GRPC_ERROR_STR_DESCRIPTION,
+ &error_description_slice));
+ upb_strview error_description_strview =
+ upb_strview_make(reinterpret_cast<const char*>(
+ GPR_SLICE_START_PTR(error_description_slice)),
+ GPR_SLICE_LENGTH(error_description_slice));
+ google_rpc_Status* error_detail =
+ envoy_api_v2_DiscoveryRequest_mutable_error_detail(request,
+ arena.ptr());
+ google_rpc_Status_set_message(error_detail, error_description_strview);
+ GRPC_ERROR_UNREF(error);
+ }
+ // Encode the request.
+ size_t output_length;
+ char* output = envoy_api_v2_DiscoveryRequest_serialize(request, arena.ptr(),
+ &output_length);
+ return grpc_slice_from_copied_buffer(output, output_length);
+}
+
+grpc_slice XdsEdsRequestCreateAndEncode(
+ const std::set<StringView>& eds_service_names,
+ const XdsBootstrap::Node* node, const char* build_version,
+ const std::string& version, const std::string& nonce, grpc_error* error) {
+ upb::Arena arena;
+ // Create a request.
+ envoy_api_v2_DiscoveryRequest* request =
+ envoy_api_v2_DiscoveryRequest_new(arena.ptr());
+ // Set version_info.
+ if (!version.empty()) {
+ envoy_api_v2_DiscoveryRequest_set_version_info(
+ request, upb_strview_makez(version.c_str()));
+ }
+ // Populate node.
+ if (build_version != nullptr) {
+ envoy_api_v2_core_Node* node_msg =
+ envoy_api_v2_DiscoveryRequest_mutable_node(request, arena.ptr());
+ PopulateNode(arena.ptr(), node, build_version, node_msg);
+ }
+ // Add resource_names.
+ for (const auto& eds_service_name : eds_service_names) {
+ envoy_api_v2_DiscoveryRequest_add_resource_names(
+ request,
+ upb_strview_make(eds_service_name.data(), eds_service_name.size()),
+ arena.ptr());
+ }
+ // Set type_url.
envoy_api_v2_DiscoveryRequest_set_type_url(request,
upb_strview_makez(kEdsTypeUrl));
+ // Set nonce.
+ if (!nonce.empty()) {
+ envoy_api_v2_DiscoveryRequest_set_response_nonce(
+ request, upb_strview_makez(nonce.c_str()));
+ }
+ // Set error_detail if it's a NACK.
+ if (error != GRPC_ERROR_NONE) {
+ grpc_slice error_description_slice;
+ GPR_ASSERT(grpc_error_get_str(error, GRPC_ERROR_STR_DESCRIPTION,
+ &error_description_slice));
+ upb_strview error_description_strview =
+ upb_strview_make(reinterpret_cast<const char*>(
+ GPR_SLICE_START_PTR(error_description_slice)),
+ GPR_SLICE_LENGTH(error_description_slice));
+ google_rpc_Status* error_detail =
+ envoy_api_v2_DiscoveryRequest_mutable_error_detail(request,
+ arena.ptr());
+ google_rpc_Status_set_message(error_detail, error_description_strview);
+ GRPC_ERROR_UNREF(error);
+ }
// Encode the request.
size_t output_length;
char* output = envoy_api_v2_DiscoveryRequest_serialize(request, arena.ptr(),
return grpc_slice_from_copied_buffer(output, output_length);
}
+grpc_error* CdsResponseParse(const envoy_api_v2_DiscoveryResponse* response,
+ CdsUpdateMap* cds_update_map, upb_arena* arena) {
+ // Get the resources from the response.
+ size_t size;
+ const google_protobuf_Any* const* resources =
+ envoy_api_v2_DiscoveryResponse_resources(response, &size);
+ if (size < 1) {
+ return GRPC_ERROR_CREATE_FROM_STATIC_STRING(
+ "CDS response contains 0 resource.");
+ }
+ // Parse all the resources in the CDS response.
+ for (size_t i = 0; i < size; ++i) {
+ CdsUpdate cds_update;
+ // Check the type_url of the resource.
+ const upb_strview type_url = google_protobuf_Any_type_url(resources[i]);
+ if (!upb_strview_eql(type_url, upb_strview_makez(kCdsTypeUrl))) {
+ return GRPC_ERROR_CREATE_FROM_STATIC_STRING("Resource is not CDS.");
+ }
+ // Decode the cluster.
+ const upb_strview encoded_cluster = google_protobuf_Any_value(resources[i]);
+ const envoy_api_v2_Cluster* cluster = envoy_api_v2_Cluster_parse(
+ encoded_cluster.data, encoded_cluster.size, arena);
+ if (cluster == nullptr) {
+ return GRPC_ERROR_CREATE_FROM_STATIC_STRING("Can't decode cluster.");
+ }
+ // Check the cluster_discovery_type.
+ if (!envoy_api_v2_Cluster_has_type(cluster)) {
+ return GRPC_ERROR_CREATE_FROM_STATIC_STRING("DiscoveryType not found.");
+ }
+ if (envoy_api_v2_Cluster_type(cluster) != envoy_api_v2_Cluster_EDS) {
+ return GRPC_ERROR_CREATE_FROM_STATIC_STRING("DiscoveryType is not EDS.");
+ }
+ // Check the EDS config source.
+ const envoy_api_v2_Cluster_EdsClusterConfig* eds_cluster_config =
+ envoy_api_v2_Cluster_eds_cluster_config(cluster);
+ const envoy_api_v2_core_ConfigSource* eds_config =
+ envoy_api_v2_Cluster_EdsClusterConfig_eds_config(eds_cluster_config);
+ if (!envoy_api_v2_core_ConfigSource_has_ads(eds_config)) {
+ return GRPC_ERROR_CREATE_FROM_STATIC_STRING("ConfigSource is not ADS.");
+ }
+ // Record EDS service_name (if any).
+ upb_strview service_name =
+ envoy_api_v2_Cluster_EdsClusterConfig_service_name(eds_cluster_config);
+ if (service_name.size != 0) {
+ cds_update.eds_service_name =
+ std::string(service_name.data, service_name.size);
+ }
+ // Check the LB policy.
+ if (envoy_api_v2_Cluster_lb_policy(cluster) !=
+ envoy_api_v2_Cluster_ROUND_ROBIN) {
+ return GRPC_ERROR_CREATE_FROM_STATIC_STRING(
+ "LB policy is not ROUND_ROBIN.");
+ }
+ // Record LRS server name (if any).
+ const envoy_api_v2_core_ConfigSource* lrs_server =
+ envoy_api_v2_Cluster_lrs_server(cluster);
+ if (lrs_server != nullptr) {
+ if (!envoy_api_v2_core_ConfigSource_has_self(lrs_server)) {
+ return GRPC_ERROR_CREATE_FROM_STATIC_STRING(
+ "ConfigSource is not self.");
+ }
+ cds_update.lrs_load_reporting_server_name.set("");
+ }
+ upb_strview cluster_name = envoy_api_v2_Cluster_name(cluster);
+ cds_update_map->emplace(std::string(cluster_name.data, cluster_name.size),
+ std::move(cds_update));
+ }
+ return GRPC_ERROR_NONE;
+}
+
namespace {
grpc_error* ServerAddressParseAndAppend(
return GRPC_ERROR_NONE;
}
-namespace {
-
-grpc_core::UniquePtr<char> StringCopy(const upb_strview& strview) {
- char* str = static_cast<char*>(gpr_malloc(strview.size + 1));
- memcpy(str, strview.data, strview.size);
- str[strview.size] = '\0';
- return grpc_core::UniquePtr<char>(str);
-}
-
-} // namespace
-
grpc_error* LocalityParse(
const envoy_api_v2_endpoint_LocalityLbEndpoints* locality_lb_endpoints,
XdsPriorityListUpdate::LocalityMap::Locality* output_locality) {
// Parse locality name.
const envoy_api_v2_core_Locality* locality =
envoy_api_v2_endpoint_LocalityLbEndpoints_locality(locality_lb_endpoints);
+ upb_strview region = envoy_api_v2_core_Locality_region(locality);
+ upb_strview zone = envoy_api_v2_core_Locality_region(locality);
+ upb_strview sub_zone = envoy_api_v2_core_Locality_sub_zone(locality);
output_locality->name = MakeRefCounted<XdsLocalityName>(
- StringCopy(envoy_api_v2_core_Locality_region(locality)),
- StringCopy(envoy_api_v2_core_Locality_zone(locality)),
- StringCopy(envoy_api_v2_core_Locality_sub_zone(locality)));
+ std::string(region.data, region.size), std::string(zone.data, zone.size),
+ std::string(sub_zone.data, sub_zone.size));
// Parse the addresses.
size_t size;
const envoy_api_v2_endpoint_LbEndpoint* const* lb_endpoints =
// Cap numerator to 1000000.
numerator = GPR_MIN(numerator, 1000000);
if (numerator == 1000000) *drop_all = true;
- drop_config->AddCategory(StringCopy(category), numerator);
+ drop_config->AddCategory(std::string(category.data, category.size),
+ numerator);
return GRPC_ERROR_NONE;
}
-} // namespace
-
-grpc_error* XdsEdsResponseDecodeAndParse(const grpc_slice& encoded_response,
- EdsUpdate* update) {
- upb::Arena arena;
- // Decode the response.
- const envoy_api_v2_DiscoveryResponse* response =
- envoy_api_v2_DiscoveryResponse_parse(
- reinterpret_cast<const char*>(GRPC_SLICE_START_PTR(encoded_response)),
- GRPC_SLICE_LENGTH(encoded_response), arena.ptr());
- // Parse the response.
- if (response == nullptr) {
- return GRPC_ERROR_CREATE_FROM_STATIC_STRING("No response found.");
- }
- // Check the type_url of the response.
- upb_strview type_url = envoy_api_v2_DiscoveryResponse_type_url(response);
- upb_strview expected_type_url = upb_strview_makez(kEdsTypeUrl);
- if (!upb_strview_eql(type_url, expected_type_url)) {
- return GRPC_ERROR_CREATE_FROM_STATIC_STRING("Resource is not EDS.");
- }
+grpc_error* EdsResponsedParse(
+ const envoy_api_v2_DiscoveryResponse* response,
+ const std::set<StringView>& expected_eds_service_names,
+ EdsUpdateMap* eds_update_map, upb_arena* arena) {
// Get the resources from the response.
size_t size;
const google_protobuf_Any* const* resources =
return GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"EDS response contains 0 resource.");
}
- // Check the type_url of the resource.
- type_url = google_protobuf_Any_type_url(resources[0]);
- if (!upb_strview_eql(type_url, expected_type_url)) {
- return GRPC_ERROR_CREATE_FROM_STATIC_STRING("Resource is not EDS.");
- }
- // Get the cluster_load_assignment.
- upb_strview encoded_cluster_load_assignment =
- google_protobuf_Any_value(resources[0]);
- envoy_api_v2_ClusterLoadAssignment* cluster_load_assignment =
- envoy_api_v2_ClusterLoadAssignment_parse(
- encoded_cluster_load_assignment.data,
- encoded_cluster_load_assignment.size, arena.ptr());
- // Get the endpoints.
- const envoy_api_v2_endpoint_LocalityLbEndpoints* const* endpoints =
- envoy_api_v2_ClusterLoadAssignment_endpoints(cluster_load_assignment,
- &size);
for (size_t i = 0; i < size; ++i) {
- XdsPriorityListUpdate::LocalityMap::Locality locality;
- grpc_error* error = LocalityParse(endpoints[i], &locality);
- if (error != GRPC_ERROR_NONE) return error;
- // Filter out locality with weight 0.
- if (locality.lb_weight == 0) continue;
- update->priority_list_update.Add(locality);
- }
- // Get the drop config.
- update->drop_config = MakeRefCounted<XdsDropConfig>();
- const envoy_api_v2_ClusterLoadAssignment_Policy* policy =
- envoy_api_v2_ClusterLoadAssignment_policy(cluster_load_assignment);
- if (policy != nullptr) {
- const envoy_api_v2_ClusterLoadAssignment_Policy_DropOverload* const*
- drop_overload =
- envoy_api_v2_ClusterLoadAssignment_Policy_drop_overloads(policy,
- &size);
- for (size_t i = 0; i < size; ++i) {
- grpc_error* error = DropParseAndAppend(
- drop_overload[i], update->drop_config.get(), &update->drop_all);
+ EdsUpdate eds_update;
+ // Check the type_url of the resource.
+ upb_strview type_url = google_protobuf_Any_type_url(resources[i]);
+ if (!upb_strview_eql(type_url, upb_strview_makez(kEdsTypeUrl))) {
+ return GRPC_ERROR_CREATE_FROM_STATIC_STRING("Resource is not EDS.");
+ }
+ // Get the cluster_load_assignment.
+ upb_strview encoded_cluster_load_assignment =
+ google_protobuf_Any_value(resources[i]);
+ envoy_api_v2_ClusterLoadAssignment* cluster_load_assignment =
+ envoy_api_v2_ClusterLoadAssignment_parse(
+ encoded_cluster_load_assignment.data,
+ encoded_cluster_load_assignment.size, arena);
+ if (cluster_load_assignment == nullptr) {
+ return GRPC_ERROR_CREATE_FROM_STATIC_STRING(
+ "Can't parse cluster_load_assignment.");
+ }
+ // Check the cluster name (which actually means eds_service_name). Ignore
+ // unexpected names.
+ upb_strview cluster_name = envoy_api_v2_ClusterLoadAssignment_cluster_name(
+ cluster_load_assignment);
+ StringView cluster_name_strview(cluster_name.data, cluster_name.size);
+ if (expected_eds_service_names.find(cluster_name_strview) ==
+ expected_eds_service_names.end()) {
+ continue;
+ }
+ // Get the endpoints.
+ size_t locality_size;
+ const envoy_api_v2_endpoint_LocalityLbEndpoints* const* endpoints =
+ envoy_api_v2_ClusterLoadAssignment_endpoints(cluster_load_assignment,
+ &locality_size);
+ for (size_t j = 0; j < locality_size; ++j) {
+ XdsPriorityListUpdate::LocalityMap::Locality locality;
+ grpc_error* error = LocalityParse(endpoints[j], &locality);
if (error != GRPC_ERROR_NONE) return error;
+ // Filter out locality with weight 0.
+ if (locality.lb_weight == 0) continue;
+ eds_update.priority_list_update.Add(locality);
+ }
+ // Get the drop config.
+ eds_update.drop_config = MakeRefCounted<XdsDropConfig>();
+ const envoy_api_v2_ClusterLoadAssignment_Policy* policy =
+ envoy_api_v2_ClusterLoadAssignment_policy(cluster_load_assignment);
+ if (policy != nullptr) {
+ size_t drop_size;
+ const envoy_api_v2_ClusterLoadAssignment_Policy_DropOverload* const*
+ drop_overload =
+ envoy_api_v2_ClusterLoadAssignment_Policy_drop_overloads(
+ policy, &drop_size);
+ for (size_t j = 0; j < drop_size; ++j) {
+ grpc_error* error =
+ DropParseAndAppend(drop_overload[j], eds_update.drop_config.get(),
+ &eds_update.drop_all);
+ if (error != GRPC_ERROR_NONE) return error;
+ }
+ }
+ // Validate the update content.
+ if (eds_update.priority_list_update.empty() && !eds_update.drop_all) {
+ return GRPC_ERROR_CREATE_FROM_STATIC_STRING(
+ "EDS response doesn't contain any valid "
+ "locality but doesn't require to drop all calls.");
}
+ eds_update_map->emplace(std::string(cluster_name.data, cluster_name.size),
+ std::move(eds_update));
}
return GRPC_ERROR_NONE;
}
+} // namespace
+
+grpc_error* XdsAdsResponseDecodeAndParse(
+ const grpc_slice& encoded_response,
+ const std::set<StringView>& expected_eds_service_names,
+ CdsUpdateMap* cds_update_map, EdsUpdateMap* eds_update_map,
+ std::string* version, std::string* nonce, std::string* type_url) {
+ upb::Arena arena;
+ // Decode the response.
+ const envoy_api_v2_DiscoveryResponse* response =
+ envoy_api_v2_DiscoveryResponse_parse(
+ reinterpret_cast<const char*>(GRPC_SLICE_START_PTR(encoded_response)),
+ GRPC_SLICE_LENGTH(encoded_response), arena.ptr());
+ // If decoding fails, output an empty type_url and return.
+ if (response == nullptr) {
+ *type_url = "";
+ return GRPC_ERROR_CREATE_FROM_STATIC_STRING(
+ "Can't decode the whole response.");
+ }
+ // Record the type_url, the version_info, and the nonce of the response.
+ upb_strview type_url_strview =
+ envoy_api_v2_DiscoveryResponse_type_url(response);
+ *type_url = std::string(type_url_strview.data, type_url_strview.size);
+ upb_strview version_info =
+ envoy_api_v2_DiscoveryResponse_version_info(response);
+ *version = std::string(version_info.data, version_info.size);
+ upb_strview nonce_strview = envoy_api_v2_DiscoveryResponse_nonce(response);
+ *nonce = std::string(nonce_strview.data, nonce_strview.size);
+ // Parse the response according to the resource type.
+ if (*type_url == kCdsTypeUrl) {
+ return CdsResponseParse(response, cds_update_map, arena.ptr());
+ } else if (*type_url == kEdsTypeUrl) {
+ return EdsResponsedParse(response, expected_eds_service_names,
+ eds_update_map, arena.ptr());
+ } else {
+ return GRPC_ERROR_CREATE_FROM_STATIC_STRING(
+ "Unsupported ADS resource type.");
+ }
+}
+
namespace {
grpc_slice LrsRequestEncode(
} // namespace
-grpc_slice XdsLrsRequestCreateAndEncode(const char* server_name,
+grpc_slice XdsLrsRequestCreateAndEncode(const std::string& server_name,
const XdsBootstrap::Node* node,
const char* build_version) {
upb::Arena arena;
request, arena.ptr());
// Set the cluster name.
envoy_api_v2_endpoint_ClusterStats_set_cluster_name(
- cluster_stats, upb_strview_makez(server_name));
+ cluster_stats, upb_strview_makez(server_name.c_str()));
return LrsRequestEncode(request, arena.ptr());
}
void LocalityStatsPopulate(
envoy_api_v2_endpoint_UpstreamLocalityStats* output,
- std::pair<const RefCountedPtr<XdsLocalityName>,
- XdsClientStats::LocalityStats::Snapshot>& input,
+ const std::pair<const RefCountedPtr<XdsLocalityName>,
+ XdsClientStats::LocalityStats::Snapshot>& input,
upb_arena* arena) {
// Set sub_zone.
envoy_api_v2_core_Locality* locality =
envoy_api_v2_endpoint_UpstreamLocalityStats_mutable_locality(output,
arena);
envoy_api_v2_core_Locality_set_sub_zone(
- locality, upb_strview_makez(input.first->sub_zone()));
+ locality, upb_strview_makez(input.first->sub_zone().c_str()));
// Set total counts.
- XdsClientStats::LocalityStats::Snapshot& snapshot = input.second;
+ const XdsClientStats::LocalityStats::Snapshot& snapshot = input.second;
envoy_api_v2_endpoint_UpstreamLocalityStats_set_total_successful_requests(
output, snapshot.total_successful_requests);
envoy_api_v2_endpoint_UpstreamLocalityStats_set_total_requests_in_progress(
output, snapshot.total_issued_requests);
// Add load metric stats.
for (auto& p : snapshot.load_metric_stats) {
- const char* metric_name = p.first.get();
+ const char* metric_name = p.first.c_str();
const XdsClientStats::LocalityStats::LoadMetric::Snapshot& metric_value =
p.second;
envoy_api_v2_endpoint_EndpointLoadMetricStats* load_metric =
} // namespace
-grpc_slice XdsLrsRequestCreateAndEncode(const char* server_name,
- XdsClientStats* client_stats) {
+grpc_slice XdsLrsRequestCreateAndEncode(
+ std::map<StringView, std::set<XdsClientStats*>> client_stats_map) {
upb::Arena arena;
- XdsClientStats::Snapshot snapshot = client_stats->GetSnapshotAndReset();
- // Prune unused locality stats.
- client_stats->PruneLocalityStats();
+ // Get the snapshots.
+ std::map<StringView, grpc_core::InlinedVector<XdsClientStats::Snapshot, 1>>
+ snapshot_map;
+ for (auto& p : client_stats_map) {
+ const StringView& cluster_name = p.first;
+ for (auto* client_stats : p.second) {
+ XdsClientStats::Snapshot snapshot = client_stats->GetSnapshotAndReset();
+ // Prune unused locality stats.
+ client_stats->PruneLocalityStats();
+ if (snapshot.IsAllZero()) continue;
+ snapshot_map[cluster_name].emplace_back(std::move(snapshot));
+ }
+ }
// When all the counts are zero, return empty slice.
- if (snapshot.IsAllZero()) return grpc_empty_slice();
+ if (snapshot_map.empty()) return grpc_empty_slice();
// Create a request.
envoy_service_load_stats_v2_LoadStatsRequest* request =
envoy_service_load_stats_v2_LoadStatsRequest_new(arena.ptr());
- // Add cluster stats. There is only one because we only use one server name in
- // one channel.
- envoy_api_v2_endpoint_ClusterStats* cluster_stats =
- envoy_service_load_stats_v2_LoadStatsRequest_add_cluster_stats(
- request, arena.ptr());
- // Set the cluster name.
- envoy_api_v2_endpoint_ClusterStats_set_cluster_name(
- cluster_stats, upb_strview_makez(server_name));
- // Add locality stats.
- for (auto& p : snapshot.upstream_locality_stats) {
- envoy_api_v2_endpoint_UpstreamLocalityStats* locality_stats =
- envoy_api_v2_endpoint_ClusterStats_add_upstream_locality_stats(
- cluster_stats, arena.ptr());
- LocalityStatsPopulate(locality_stats, p, arena.ptr());
- }
- // Add dropped requests.
- for (auto& p : snapshot.dropped_requests) {
- const char* category = p.first.get();
- const uint64_t count = p.second;
- envoy_api_v2_endpoint_ClusterStats_DroppedRequests* dropped_requests =
- envoy_api_v2_endpoint_ClusterStats_add_dropped_requests(cluster_stats,
- arena.ptr());
- envoy_api_v2_endpoint_ClusterStats_DroppedRequests_set_category(
- dropped_requests, upb_strview_makez(category));
- envoy_api_v2_endpoint_ClusterStats_DroppedRequests_set_dropped_count(
- dropped_requests, count);
- }
- // Set total dropped requests.
- envoy_api_v2_endpoint_ClusterStats_set_total_dropped_requests(
- cluster_stats, snapshot.total_dropped_requests);
- // Set real load report interval.
- gpr_timespec timespec =
- grpc_millis_to_timespec(snapshot.load_report_interval, GPR_TIMESPAN);
- google_protobuf_Duration* load_report_interval =
- envoy_api_v2_endpoint_ClusterStats_mutable_load_report_interval(
- cluster_stats, arena.ptr());
- google_protobuf_Duration_set_seconds(load_report_interval, timespec.tv_sec);
- google_protobuf_Duration_set_nanos(load_report_interval, timespec.tv_nsec);
+ for (auto& p : snapshot_map) {
+ const StringView& cluster_name = p.first;
+ const auto& snapshot_list = p.second;
+ for (size_t i = 0; i < snapshot_list.size(); ++i) {
+ const auto& snapshot = snapshot_list[i];
+ // Add cluster stats.
+ envoy_api_v2_endpoint_ClusterStats* cluster_stats =
+ envoy_service_load_stats_v2_LoadStatsRequest_add_cluster_stats(
+ request, arena.ptr());
+ // Set the cluster name.
+ envoy_api_v2_endpoint_ClusterStats_set_cluster_name(
+ cluster_stats,
+ upb_strview_make(cluster_name.data(), cluster_name.size()));
+ // Add locality stats.
+ for (auto& p : snapshot.upstream_locality_stats) {
+ envoy_api_v2_endpoint_UpstreamLocalityStats* locality_stats =
+ envoy_api_v2_endpoint_ClusterStats_add_upstream_locality_stats(
+ cluster_stats, arena.ptr());
+ LocalityStatsPopulate(locality_stats, p, arena.ptr());
+ }
+ // Add dropped requests.
+ for (auto& p : snapshot.dropped_requests) {
+ const char* category = p.first.c_str();
+ const uint64_t count = p.second;
+ envoy_api_v2_endpoint_ClusterStats_DroppedRequests* dropped_requests =
+ envoy_api_v2_endpoint_ClusterStats_add_dropped_requests(
+ cluster_stats, arena.ptr());
+ envoy_api_v2_endpoint_ClusterStats_DroppedRequests_set_category(
+ dropped_requests, upb_strview_makez(category));
+ envoy_api_v2_endpoint_ClusterStats_DroppedRequests_set_dropped_count(
+ dropped_requests, count);
+ }
+ // Set total dropped requests.
+ envoy_api_v2_endpoint_ClusterStats_set_total_dropped_requests(
+ cluster_stats, snapshot.total_dropped_requests);
+ // Set real load report interval.
+ gpr_timespec timespec =
+ grpc_millis_to_timespec(snapshot.load_report_interval, GPR_TIMESPAN);
+ google_protobuf_Duration* load_report_interval =
+ envoy_api_v2_endpoint_ClusterStats_mutable_load_report_interval(
+ cluster_stats, arena.ptr());
+ google_protobuf_Duration_set_seconds(load_report_interval,
+ timespec.tv_sec);
+ google_protobuf_Duration_set_nanos(load_report_interval,
+ timespec.tv_nsec);
+ }
+ }
return LrsRequestEncode(request, arena.ptr());
}
-grpc_error* XdsLrsResponseDecodeAndParse(
- const grpc_slice& encoded_response,
- grpc_core::UniquePtr<char>* cluster_name,
- grpc_millis* load_reporting_interval) {
+grpc_error* XdsLrsResponseDecodeAndParse(const grpc_slice& encoded_response,
+ std::set<std::string>* cluster_names,
+ grpc_millis* load_reporting_interval) {
upb::Arena arena;
// Decode the response.
const envoy_service_load_stats_v2_LoadStatsResponse* decoded_response =
GRPC_SLICE_LENGTH(encoded_response), arena.ptr());
// Parse the response.
if (decoded_response == nullptr) {
- return GRPC_ERROR_CREATE_FROM_STATIC_STRING("No response found.");
+ return GRPC_ERROR_CREATE_FROM_STATIC_STRING("Can't decode response.");
}
- // Check the cluster size in the response.
+ // Store the cluster names.
size_t size;
const upb_strview* clusters =
envoy_service_load_stats_v2_LoadStatsResponse_clusters(decoded_response,
&size);
- if (size != 1) {
- return GRPC_ERROR_CREATE_FROM_STATIC_STRING(
- "The number of clusters (server names) is not 1.");
+ for (size_t i = 0; i < size; ++i) {
+ cluster_names->emplace(clusters[i].data, clusters[i].size);
}
- // Get the cluster name for reporting loads.
- *cluster_name = StringCopy(clusters[0]);
// Get the load report interval.
const google_protobuf_Duration* load_reporting_interval_duration =
envoy_service_load_stats_v2_LoadStatsResponse_load_reporting_interval(