3 * Copyright 2018 gRPC authors.
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
19 #include <grpc/impl/codegen/port_platform.h>
27 #include "src/cpp/server/load_reporter/constants.h"
28 #include "src/cpp/server/load_reporter/get_cpu_stats.h"
29 #include "src/cpp/server/load_reporter/load_reporter.h"
31 #include "opencensus/stats/internal/set_aggregation_window.h"
32 #include "opencensus/tags/tag_key.h"
35 namespace load_reporter {
37 CpuStatsProvider::CpuStatsSample CpuStatsProviderDefaultImpl::GetCpuStats() {
38 return GetCpuStatsImpl();
// Registers the OpenCensus tag keys used by the load reporter (token, host,
// user_id, status, metric_name) and builds seven view descriptors: one for
// call starts, four for call ends and two for other call metrics. Every view
// uses a Delta aggregation window and is stored in view_descriptor_map_ under
// its view name.
// NOTE(review): this copy is missing original lines. The embedded numbering
// skips 46, 49, 57, 59, 68, 77, 79, 92, 94, 107, 122, 138, 140, 153 and
// 159-160. The skipped lines presumably hold the tag_key_metric_name_(
// initializer head, the `auto vd_... =` declarations, the
// `.set_description(` calls with the tails of their strings, and the closing
// brace. Restore them from upstream before building.
41 CensusViewProvider::CensusViewProvider()
42 : tag_key_token_(::opencensus::tags::TagKey::Register(kTagKeyToken)),
43 tag_key_host_(::opencensus::tags::TagKey::Register(kTagKeyHost)),
44 tag_key_user_id_(::opencensus::tags::TagKey::Register(kTagKeyUserId)),
45 tag_key_status_(::opencensus::tags::TagKey::Register(kTagKeyStatus)),
47 ::opencensus::tags::TagKey::Register(kTagKeyMetricName)) {
48 // One view related to starting a call.
50 ::opencensus::stats::ViewDescriptor()
51 .set_name(kViewStartCount)
52 .set_measure(kMeasureStartCount)
53 .set_aggregation(::opencensus::stats::Aggregation::Sum())
54 .add_column(tag_key_token_)
55 .add_column(tag_key_host_)
56 .add_column(tag_key_user_id_)
58 "Delta count of calls started broken down by <token, host, "
// Switch the view to a Delta aggregation window, matching its "Delta ..."
// description. The same pattern is repeated for each view below.
60 ::opencensus::stats::SetAggregationWindow(
61 ::opencensus::stats::AggregationWindow::Delta(), &vd_start_count);
62 view_descriptor_map_.emplace(kViewStartCount, vd_start_count);
63 // Four views related to ending a call.
64 // If this view is set as Count of kMeasureEndBytesSent (in hope of saving one
65 // measure), it's infeasible to prepare fake data for testing. That's because
66 // the OpenCensus API to make up view data will add the input data as separate
67 // measurements instead of setting the data values directly.
69 ::opencensus::stats::ViewDescriptor()
70 .set_name(kViewEndCount)
71 .set_measure(kMeasureEndCount)
72 .set_aggregation(::opencensus::stats::Aggregation::Sum())
73 .add_column(tag_key_token_)
74 .add_column(tag_key_host_)
75 .add_column(tag_key_user_id_)
76 .add_column(tag_key_status_)
78 "Delta count of calls ended broken down by <token, host, "
80 ::opencensus::stats::SetAggregationWindow(
81 ::opencensus::stats::AggregationWindow::Delta(), &vd_end_count);
82 view_descriptor_map_.emplace(kViewEndCount, vd_end_count);
83 auto vd_end_bytes_sent =
84 ::opencensus::stats::ViewDescriptor()
85 .set_name(kViewEndBytesSent)
86 .set_measure(kMeasureEndBytesSent)
87 .set_aggregation(::opencensus::stats::Aggregation::Sum())
88 .add_column(tag_key_token_)
89 .add_column(tag_key_host_)
90 .add_column(tag_key_user_id_)
91 .add_column(tag_key_status_)
93 "Delta sum of bytes sent broken down by <token, host, user_id, "
95 ::opencensus::stats::SetAggregationWindow(
96 ::opencensus::stats::AggregationWindow::Delta(), &vd_end_bytes_sent);
97 view_descriptor_map_.emplace(kViewEndBytesSent, vd_end_bytes_sent);
98 auto vd_end_bytes_received =
99 ::opencensus::stats::ViewDescriptor()
100 .set_name(kViewEndBytesReceived)
101 .set_measure(kMeasureEndBytesReceived)
102 .set_aggregation(::opencensus::stats::Aggregation::Sum())
103 .add_column(tag_key_token_)
104 .add_column(tag_key_host_)
105 .add_column(tag_key_user_id_)
106 .add_column(tag_key_status_)
108 "Delta sum of bytes received broken down by <token, host, "
109 "user_id, status>.");
110 ::opencensus::stats::SetAggregationWindow(
111 ::opencensus::stats::AggregationWindow::Delta(), &vd_end_bytes_received);
112 view_descriptor_map_.emplace(kViewEndBytesReceived, vd_end_bytes_received);
113 auto vd_end_latency_ms =
114 ::opencensus::stats::ViewDescriptor()
115 .set_name(kViewEndLatencyMs)
116 .set_measure(kMeasureEndLatencyMs)
117 .set_aggregation(::opencensus::stats::Aggregation::Sum())
118 .add_column(tag_key_token_)
119 .add_column(tag_key_host_)
120 .add_column(tag_key_user_id_)
121 .add_column(tag_key_status_)
123 "Delta sum of latency in ms broken down by <token, host, "
124 "user_id, status>.");
125 ::opencensus::stats::SetAggregationWindow(
126 ::opencensus::stats::AggregationWindow::Delta(), &vd_end_latency_ms);
127 view_descriptor_map_.emplace(kViewEndLatencyMs, vd_end_latency_ms);
128 // Two views related to other call metrics.
// Both views below read kMeasureOtherCallMetric with identical columns. One
// uses Count (number of calls), the other Sum (total metric value).
129 auto vd_metric_call_count =
130 ::opencensus::stats::ViewDescriptor()
131 .set_name(kViewOtherCallMetricCount)
132 .set_measure(kMeasureOtherCallMetric)
133 .set_aggregation(::opencensus::stats::Aggregation::Count())
134 .add_column(tag_key_token_)
135 .add_column(tag_key_host_)
136 .add_column(tag_key_user_id_)
137 .add_column(tag_key_metric_name_)
139 "Delta count of calls broken down by <token, host, user_id, "
141 ::opencensus::stats::SetAggregationWindow(
142 ::opencensus::stats::AggregationWindow::Delta(), &vd_metric_call_count);
143 view_descriptor_map_.emplace(kViewOtherCallMetricCount, vd_metric_call_count);
144 auto vd_metric_value =
145 ::opencensus::stats::ViewDescriptor()
146 .set_name(kViewOtherCallMetricValue)
147 .set_measure(kMeasureOtherCallMetric)
148 .set_aggregation(::opencensus::stats::Aggregation::Sum())
149 .add_column(tag_key_token_)
150 .add_column(tag_key_host_)
151 .add_column(tag_key_user_id_)
152 .add_column(tag_key_metric_name_)
154 "Delta sum of call metric value broken down "
155 "by <token, host, user_id, metric_name>.");
156 ::opencensus::stats::SetAggregationWindow(
157 ::opencensus::stats::AggregationWindow::Delta(), &vd_metric_value);
158 view_descriptor_map_.emplace(kViewOtherCallMetricValue, vd_metric_value);
161 double CensusViewProvider::GetRelatedViewDataRowDouble(
162 const ViewDataMap& view_data_map, const char* view_name,
163 size_t view_name_len, const std::vector<std::string>& tag_values) {
164 auto it_vd = view_data_map.find(std::string(view_name, view_name_len));
165 GPR_ASSERT(it_vd != view_data_map.end());
166 GPR_ASSERT(it_vd->second.type() ==
167 ::opencensus::stats::ViewData::Type::kDouble);
168 auto it_row = it_vd->second.double_data().find(tag_values);
169 GPR_ASSERT(it_row != it_vd->second.double_data().end());
170 return it_row->second;
173 uint64_t CensusViewProvider::GetRelatedViewDataRowInt(
174 const ViewDataMap& view_data_map, const char* view_name,
175 size_t view_name_len, const std::vector<std::string>& tag_values) {
176 auto it_vd = view_data_map.find(std::string(view_name, view_name_len));
177 GPR_ASSERT(it_vd != view_data_map.end());
178 GPR_ASSERT(it_vd->second.type() ==
179 ::opencensus::stats::ViewData::Type::kInt64);
180 auto it_row = it_vd->second.int_data().find(tag_values);
181 GPR_ASSERT(it_row != it_vd->second.int_data().end());
182 GPR_ASSERT(it_row->second >= 0);
183 return it_row->second;
186 CensusViewProviderDefaultImpl::CensusViewProviderDefaultImpl() {
187 for (const auto& p : view_descriptor_map()) {
188 const std::string& view_name = p.first;
189 const ::opencensus::stats::ViewDescriptor& vd = p.second;
190 // We need to use pair's piecewise ctor here, otherwise the deleted copy
191 // ctor of View will be called.
192 view_map_.emplace(std::piecewise_construct,
193 std::forward_as_tuple(view_name),
194 std::forward_as_tuple(vd));
// Snapshots the data of every valid view in view_map_ into a new ViewDataMap
// keyed by view name, and returns it. Views for which IsValid() is false are
// not fetched; a "view is invalid" message is logged for them instead.
// NOTE(review): original lines 207-210, 213-214 and 216 are missing from this
// copy. They hold the tail of the first gpr_log call, the `} else {` head with
// the second gpr_log's opening and level, and the closing braces.
198 CensusViewProvider::ViewDataMap CensusViewProviderDefaultImpl::FetchViewData() {
199 gpr_log(GPR_DEBUG, "[CVP %p] Starts fetching Census view data.", this);
200 ViewDataMap view_data_map;
201 for (auto& p : view_map_) {
202 const std::string& view_name = p.first;
203 ::opencensus::stats::View& view = p.second;
204 if (view.IsValid()) {
205 view_data_map.emplace(view_name, view.GetData());
206 gpr_log(GPR_DEBUG, "[CVP %p] Fetched view data (view: %s).", this,
211 "[CVP %p] Can't fetch view data because view is invalid (view: %s).",
212 this, view_name.c_str());
215 return view_data_map;
// Generates a new LB ID string.
// - Takes next_lb_id_ (post-incremented) and formats it as zero-padded
//   lowercase hex via "%08lx".
// - Keeps the first kLbIdLength characters; presumably kLbIdLength == 8,
//   matching the "0000ca5b" example below.
// - Skips candidates that load_data_store_ tracks as unknown balancer IDs.
// - Logs an error once next_lb_id_ exceeds UINT32_MAX.
// NOTE(review): original lines 219, 222-224 and 236-240 are missing from this
// copy. They presumably hold the retry-loop head, the end of the error
// gpr_log and its early return, and the final return plus closing braces.
218 std::string LoadReporter::GenerateLbId() {
220 if (next_lb_id_ > UINT32_MAX) {
221 gpr_log(GPR_ERROR, "[LR %p] The LB ID exceeds the max valid value!",
225 int64_t lb_id = next_lb_id_++;
226 // Overflow should never happen.
227 GPR_ASSERT(lb_id >= 0);
228 // Convert to padded hex string for a 32-bit LB ID. E.g, "0000ca5b".
229 char buf[kLbIdLength + 1];
// NOTE(review): "%08lx" expects an unsigned long, but lb_id is int64_t. On
// LLP64 targets (e.g. 64-bit Windows) long is 32 bits, so this call is
// undefined behavior. Prefer
// snprintf(buf, sizeof(buf), "%08" PRIx64, static_cast<uint64_t>(lb_id))
// with <cinttypes>.
230 snprintf(buf, sizeof(buf), "%08lx", lb_id);
231 std::string lb_id_str(buf, kLbIdLength);
232 // The client may send requests with LB ID that has never been allocated
233 // by this load reporter. Those IDs are tracked and will be skipped when
234 // we generate a new ID.
235 if (!load_data_store_.IsTrackedUnknownBalancerId(lb_id_str)) {
// Computes LoadBalancingFeedback from the buffered feedback_records_, while
// holding feedback_mu_. The feedback contains:
// - server_utilization: CPU usage delta / CPU limit delta;
// - calls_per_second and errors_per_second over the chosen record range.
// Steps:
// 1. Discard records outside the window, keeping exactly one out-of-window
//    record as the baseline for differences.
// 2. Shrink the range from both ends past records whose cpu_limit is 0
//    (failed system-info reads).
// Returns the default instance when:
// - fewer than two records remain after step 1;
// - the range collapses to one record;
// - both ends share the same end_time (zero duration);
// - both ends share the same cpu_limit (zero divisor).
// NOTE(review): original lines 250, 253, 263, 268-270, 273-276, 281 and
// 288-290 are missing from this copy. They presumably hold closing braces,
// the rpcs/errors accumulators and the tail (return of `feedback`).
241 ::grpc::lb::v1::LoadBalancingFeedback
242 LoadReporter::GenerateLoadBalancingFeedback() {
243 grpc_core::ReleasableMutexLock lock(&feedback_mu_);
244 auto now = std::chrono::system_clock::now();
245 // Discard records outside the window until there is only one record
246 // outside the window, which is used as the base for difference.
247 while (feedback_records_.size() > 1 &&
248 !IsRecordInWindow(feedback_records_[1], now)) {
249 feedback_records_.pop_front();
251 if (feedback_records_.size() < 2) {
252 return ::grpc::lb::v1::LoadBalancingFeedback::default_instance();
254 // Find the longest range with valid ends.
255 auto oldest = feedback_records_.begin();
256 auto newest = feedback_records_.end() - 1;
257 while (std::distance(oldest, newest) > 0 &&
258 (newest->cpu_limit == 0 || oldest->cpu_limit == 0)) {
259 // A zero limit means that the system info reading was failed, so these
260 // records can't be used to calculate CPU utilization.
261 if (newest->cpu_limit == 0) --newest;
262 if (oldest->cpu_limit == 0) ++oldest;
// Bail out on ranges that would divide by zero (same end_time or same
// cpu_limit) or that no longer span two distinct records.
264 if (std::distance(oldest, newest) < 1 ||
265 oldest->end_time == newest->end_time ||
266 newest->cpu_limit == oldest->cpu_limit) {
267 return ::grpc::lb::v1::LoadBalancingFeedback::default_instance();
271 for (auto p = newest; p != oldest; --p) {
272 // Because these two numbers are counters, the oldest record shouldn't be
// CPU fields are cumulative counters, so utilization is the ratio of their
// deltas across the range.
277 double cpu_usage = newest->cpu_usage - oldest->cpu_usage;
278 double cpu_limit = newest->cpu_limit - oldest->cpu_limit;
279 std::chrono::duration<double> duration_seconds =
280 newest->end_time - oldest->end_time;
282 ::grpc::lb::v1::LoadBalancingFeedback feedback;
283 feedback.set_server_utilization(static_cast<float>(cpu_usage / cpu_limit));
284 feedback.set_calls_per_second(
285 static_cast<float>(rpcs / duration_seconds.count()));
286 feedback.set_errors_per_second(
287 static_cast<float>(errors / duration_seconds.count()));
// Builds the Load messages to report for (hostname, lb_id), while holding
// store_mu_. For each PerBalancerStore assigned to that pair:
// - emits one Load per load record (call counts, bytes, latency and call
//   metric data), then clears the store's record map;
// - emits an extra Load carrying num_calls_in_progress when that count
//   changed since the last report.
// Loads from stores whose lb_id differs from `lb_id` are tagged via
// AttachOrphanLoadId().
// Asserts that at least one store is assigned and that none is suspended.
// NOTE(review): original lines 330, 333, 335-336, 338, 345 and 347-352 are
// missing from this copy (closing braces and the rest of the "orphan"
// comments).
291 ::google::protobuf::RepeatedPtrField<::grpc::lb::v1::Load>
292 LoadReporter::GenerateLoads(const std::string& hostname,
293 const std::string& lb_id) {
294 grpc_core::MutexLock lock(&store_mu_);
295 auto assigned_stores = load_data_store_.GetAssignedStores(hostname, lb_id);
296 GPR_ASSERT(assigned_stores != nullptr);
297 GPR_ASSERT(!assigned_stores->empty());
298 ::google::protobuf::RepeatedPtrField<::grpc::lb::v1::Load> loads;
299 for (PerBalancerStore* per_balancer_store : *assigned_stores) {
300 GPR_ASSERT(!per_balancer_store->IsSuspended());
301 if (!per_balancer_store->load_record_map().empty()) {
302 for (const auto& p : per_balancer_store->load_record_map()) {
303 const auto& key = p.first;
304 const auto& value = p.second;
305 auto load = loads.Add();
306 load->set_load_balance_tag(key.lb_tag());
307 load->set_user_id(key.user_id());
308 load->set_client_ip_address(key.GetClientIpBytes());
309 load->set_num_calls_started(static_cast<int64_t>(value.start_count()));
310 load->set_num_calls_finished_without_error(
311 static_cast<int64_t>(value.ok_count()));
312 load->set_num_calls_finished_with_error(
313 static_cast<int64_t>(value.error_count()));
314 load->set_total_bytes_sent(static_cast<int64_t>(value.bytes_sent()));
315 load->set_total_bytes_received(
316 static_cast<int64_t>(value.bytes_recv()));
// Split the accumulated latency in ms into Duration seconds + nanos.
317 load->mutable_total_latency()->set_seconds(
318 static_cast<int64_t>(value.latency_ms() / 1000));
// NOTE(review): latency_ms() is cast to int32_t *before* `% 1000`. The
// latency is accumulated from a uint64_t in ProcessViewDataCallEnd, so
// values above INT32_MAX ms are truncated here and can produce wrong or
// negative nanos. Use
// static_cast<int32_t>(value.latency_ms() % 1000) * 1000000 instead.
319 load->mutable_total_latency()->set_nanos(
320 (static_cast<int32_t>(value.latency_ms()) % 1000) * 1000000);
// NOTE(review): this `p` shadows the enclosing loop's `p`; consider
// renaming for clarity (-Wshadow).
321 for (const auto& p : value.call_metrics()) {
322 const std::string& metric_name = p.first;
323 const CallMetricValue& metric_value = p.second;
324 auto call_metric_data = load->add_metric_data();
325 call_metric_data->set_metric_name(metric_name);
326 call_metric_data->set_num_calls_finished_with_metric(
327 metric_value.num_calls());
328 call_metric_data->set_total_metric_value(
329 metric_value.total_metric_value());
331 if (per_balancer_store->lb_id() != lb_id) {
332 // This per-balancer store is an orphan assigned to this receiving
334 AttachOrphanLoadId(load, *per_balancer_store);
337 per_balancer_store->ClearLoadRecordMap();
339 if (per_balancer_store->IsNumCallsInProgressChangedSinceLastReport()) {
340 auto load = loads.Add();
341 load->set_num_calls_in_progress(
342 per_balancer_store->GetNumCallsInProgressForReport());
343 if (per_balancer_store->lb_id() != lb_id) {
344 // This per-balancer store is an orphan assigned to this receiving
346 AttachOrphanLoadId(load, *per_balancer_store);
// Tags `load` as coming from an orphaned per-balancer store.
// - If the store's lb_id() is kInvalidLbId, only load_key_unknown is set.
// - Otherwise orphaned_load_identifier is filled with the store's load key
//   and LB ID.
// The two fields belong to a oneof, so only one may be set.
// NOTE(review): the `} else {` line (original 357) and the closing braces
// (364-365) are missing from this copy. Without the else, setting the
// identifier would overwrite load_key_unknown.
353 void LoadReporter::AttachOrphanLoadId(
354 ::grpc::lb::v1::Load* load, const PerBalancerStore& per_balancer_store) {
355 if (per_balancer_store.lb_id() == kInvalidLbId) {
356 load->set_load_key_unknown(true);
358 // We shouldn't set load_key_unknown to any value in this case because
359 // load_key_unknown and orphaned_load_identifier are under an oneof struct.
360 load->mutable_orphaned_load_identifier()->set_load_key(
361 per_balancer_store.load_key());
362 load->mutable_orphaned_load_identifier()->set_load_balancer_id(
363 per_balancer_store.lb_id());
// Appends one feedback record to feedback_records_. The record holds:
// - the current system_clock time;
// - the given rpc and error counts;
// - a CPU stats sample (first, second).
// The CPU sample comes from cpu_stats_provider_ when one is set and is taken
// before feedback_mu_ is acquired.
// NOTE(review): the else branch (original lines 371, 373-374) and the closing
// brace (378) are missing from this copy. The surviving comment says that
// branch makes feedback generation a no-op. GenerateLoadBalancingFeedback
// ignores records with cpu_limit == 0, so the sample is presumably
// zero-filled; confirm upstream.
367 void LoadReporter::AppendNewFeedbackRecord(uint64_t rpcs, uint64_t errors) {
368 CpuStatsProvider::CpuStatsSample cpu_stats;
369 if (cpu_stats_provider_ != nullptr) {
370 cpu_stats = cpu_stats_provider_->GetCpuStats();
372 // This will make the load balancing feedback generation a no-op.
375 grpc_core::MutexLock lock(&feedback_mu_);
376 feedback_records_.emplace_back(std::chrono::system_clock::now(), rpcs, errors,
377 cpu_stats.first, cpu_stats.second);
// Registers a newly created report stream (hostname, lb_id, load_key) with
// load_data_store_ while holding store_mu_, then logs the event.
// NOTE(review): original line 385, which opens the gpr_log call and sets its
// severity, and the closing brace (388) are missing from this copy.
380 void LoadReporter::ReportStreamCreated(const std::string& hostname,
381 const std::string& lb_id,
382 const std::string& load_key) {
383 grpc_core::MutexLock lock(&store_mu_);
384 load_data_store_.ReportStreamCreated(hostname, lb_id, load_key);
386 "[LR %p] Report stream created (host: %s, LB ID: %s, load key: %s).",
387 this, hostname.c_str(), lb_id.c_str(), load_key.c_str());
390 void LoadReporter::ReportStreamClosed(const std::string& hostname,
391 const std::string& lb_id) {
392 grpc_core::MutexLock lock(&store_mu_);
393 load_data_store_.ReportStreamClosed(hostname, lb_id);
394 gpr_log(GPR_INFO, "[LR %p] Report stream closed (host: %s, LB ID: %s).", this,
395 hostname.c_str(), lb_id.c_str());
398 void LoadReporter::ProcessViewDataCallStart(
399 const CensusViewProvider::ViewDataMap& view_data_map) {
400 auto it = view_data_map.find(kViewStartCount);
401 if (it != view_data_map.end()) {
402 for (const auto& p : it->second.int_data()) {
403 const std::vector<std::string>& tag_values = p.first;
404 const uint64_t start_count = static_cast<uint64_t>(p.second);
405 const std::string& client_ip_and_token = tag_values[0];
406 const std::string& host = tag_values[1];
407 const std::string& user_id = tag_values[2];
408 LoadRecordKey key(client_ip_and_token, user_id);
409 LoadRecordValue value = LoadRecordValue(start_count);
411 grpc_core::MutexLock lock(&store_mu_);
412 load_data_store_.MergeRow(host, key, value);
// Merges call-end data into load_data_store_. For each row of kViewEndCount,
// keyed by <token, host, user_id, status>:
// - looks up the rows with the same tags in the bytes-sent, bytes-received
//   and latency views;
// - counts the row as OK when status equals kCallStatusOk, otherwise as
//   errors;
// - merges the resulting LoadRecordValue under store_mu_.
// Rows with an empty client_ip_and_token tag are skipped. Afterwards,
// AppendNewFeedbackRecord() is called with the summed end and error counts.
// NOTE(review): original lines 432, 435, 438-439, 443, 450, 456, 459, 462,
// 465-467 and 469-470 are missing from this copy. They hold the rest of the
// bug comment, the gpr_log head, `continue`, argument lines, `} else {` and
// the braces.
418 void LoadReporter::ProcessViewDataCallEnd(
419 const CensusViewProvider::ViewDataMap& view_data_map) {
420 uint64_t total_end_count = 0;
421 uint64_t total_error_count = 0;
422 auto it = view_data_map.find(kViewEndCount);
423 if (it != view_data_map.end()) {
424 for (const auto& p : it->second.int_data()) {
425 const std::vector<std::string>& tag_values = p.first;
426 const uint64_t end_count = static_cast<uint64_t>(p.second);
427 const std::string& client_ip_and_token = tag_values[0];
428 const std::string& host = tag_values[1];
429 const std::string& user_id = tag_values[2];
430 const std::string& status = tag_values[3];
431 // This is due to a bug reported internally of Java server load reporting
433 // TODO(juanlishen): Check whether this situation happens in OSS C++.
434 if (client_ip_and_token.size() == 0) {
436 "Skipping processing Opencensus record with empty "
437 "client_ip_and_token tag.");
440 LoadRecordKey key(client_ip_and_token, user_id);
// The related end-of-call views share this view's columns, so the same
// tag_values key their matching rows. Missing rows trip GPR_ASSERT.
441 const uint64_t bytes_sent = CensusViewProvider::GetRelatedViewDataRowInt(
442 view_data_map, kViewEndBytesSent, sizeof(kViewEndBytesSent) - 1,
444 const uint64_t bytes_received =
445 CensusViewProvider::GetRelatedViewDataRowInt(
446 view_data_map, kViewEndBytesReceived,
447 sizeof(kViewEndBytesReceived) - 1, tag_values);
448 const uint64_t latency_ms = CensusViewProvider::GetRelatedViewDataRowInt(
449 view_data_map, kViewEndLatencyMs, sizeof(kViewEndLatencyMs) - 1,
451 uint64_t ok_count = 0;
452 uint64_t error_count = 0;
453 total_end_count += end_count;
454 if (std::strcmp(status.c_str(), kCallStatusOk) == 0) {
455 ok_count = end_count;
457 error_count = end_count;
458 total_error_count += end_count;
460 LoadRecordValue value = LoadRecordValue(
461 0, ok_count, error_count, bytes_sent, bytes_received, latency_ms);
463 grpc_core::MutexLock lock(&store_mu_);
464 load_data_store_.MergeRow(host, key, value);
468 AppendNewFeedbackRecord(total_end_count, total_error_count);
471 void LoadReporter::ProcessViewDataOtherCallMetrics(
472 const CensusViewProvider::ViewDataMap& view_data_map) {
473 auto it = view_data_map.find(kViewOtherCallMetricCount);
474 if (it != view_data_map.end()) {
475 for (const auto& p : it->second.int_data()) {
476 const std::vector<std::string>& tag_values = p.first;
477 const int64_t num_calls = p.second;
478 const std::string& client_ip_and_token = tag_values[0];
479 const std::string& host = tag_values[1];
480 const std::string& user_id = tag_values[2];
481 const std::string& metric_name = tag_values[3];
482 LoadRecordKey key(client_ip_and_token, user_id);
483 const double total_metric_value =
484 CensusViewProvider::GetRelatedViewDataRowDouble(
485 view_data_map, kViewOtherCallMetricValue,
486 sizeof(kViewOtherCallMetricValue) - 1, tag_values);
487 LoadRecordValue value = LoadRecordValue(
488 metric_name, static_cast<uint64_t>(num_calls), total_metric_value);
490 grpc_core::MutexLock lock(&store_mu_);
491 load_data_store_.MergeRow(host, key, value);
// Fetches the current Census view data once from census_view_provider_.
// The same snapshot is then passed, in order, to the call-start, call-end and
// other-call-metric processors.
// NOTE(review): original lines 498 and 500-501 are missing from this copy.
// They hold the gpr_log opening with its severity and the rest of the
// message/arguments. The closing brace (507) is also missing.
497 void LoadReporter::FetchAndSample() {
499 "[LR %p] Starts fetching Census view data and sampling LB feedback "
502 CensusViewProvider::ViewDataMap view_data_map =
503 census_view_provider_->FetchViewData();
504 ProcessViewDataCallStart(view_data_map);
505 ProcessViewDataCallEnd(view_data_map);
506 ProcessViewDataOtherCallMetrics(view_data_map);
509 } // namespace load_reporter