Imported Upstream version 1.33.1
[platform/upstream/grpc.git] / src / cpp / server / load_reporter / load_reporter.cc
1 /*
2  *
3  * Copyright 2018 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18
19 #include <grpc/impl/codegen/port_platform.h>
20
#include <inttypes.h>
#include <stdint.h>
#include <stdio.h>
#include <chrono>
#include <ctime>
#include <iterator>
26
27 #include "src/cpp/server/load_reporter/constants.h"
28 #include "src/cpp/server/load_reporter/get_cpu_stats.h"
29 #include "src/cpp/server/load_reporter/load_reporter.h"
30
31 #include "opencensus/stats/internal/set_aggregation_window.h"
32 #include "opencensus/tags/tag_key.h"
33
34 namespace grpc {
35 namespace load_reporter {
36
37 CpuStatsProvider::CpuStatsSample CpuStatsProviderDefaultImpl::GetCpuStats() {
38   return GetCpuStatsImpl();
39 }
40
41 CensusViewProvider::CensusViewProvider()
42     : tag_key_token_(::opencensus::tags::TagKey::Register(kTagKeyToken)),
43       tag_key_host_(::opencensus::tags::TagKey::Register(kTagKeyHost)),
44       tag_key_user_id_(::opencensus::tags::TagKey::Register(kTagKeyUserId)),
45       tag_key_status_(::opencensus::tags::TagKey::Register(kTagKeyStatus)),
46       tag_key_metric_name_(
47           ::opencensus::tags::TagKey::Register(kTagKeyMetricName)) {
48   // One view related to starting a call.
49   auto vd_start_count =
50       ::opencensus::stats::ViewDescriptor()
51           .set_name(kViewStartCount)
52           .set_measure(kMeasureStartCount)
53           .set_aggregation(::opencensus::stats::Aggregation::Sum())
54           .add_column(tag_key_token_)
55           .add_column(tag_key_host_)
56           .add_column(tag_key_user_id_)
57           .set_description(
58               "Delta count of calls started broken down by <token, host, "
59               "user_id>.");
60   ::opencensus::stats::SetAggregationWindow(
61       ::opencensus::stats::AggregationWindow::Delta(), &vd_start_count);
62   view_descriptor_map_.emplace(kViewStartCount, vd_start_count);
63   // Four views related to ending a call.
64   // If this view is set as Count of kMeasureEndBytesSent (in hope of saving one
65   // measure), it's infeasible to prepare fake data for testing. That's because
66   // the OpenCensus API to make up view data will add the input data as separate
67   // measurements instead of setting the data values directly.
68   auto vd_end_count =
69       ::opencensus::stats::ViewDescriptor()
70           .set_name(kViewEndCount)
71           .set_measure(kMeasureEndCount)
72           .set_aggregation(::opencensus::stats::Aggregation::Sum())
73           .add_column(tag_key_token_)
74           .add_column(tag_key_host_)
75           .add_column(tag_key_user_id_)
76           .add_column(tag_key_status_)
77           .set_description(
78               "Delta count of calls ended broken down by <token, host, "
79               "user_id, status>.");
80   ::opencensus::stats::SetAggregationWindow(
81       ::opencensus::stats::AggregationWindow::Delta(), &vd_end_count);
82   view_descriptor_map_.emplace(kViewEndCount, vd_end_count);
83   auto vd_end_bytes_sent =
84       ::opencensus::stats::ViewDescriptor()
85           .set_name(kViewEndBytesSent)
86           .set_measure(kMeasureEndBytesSent)
87           .set_aggregation(::opencensus::stats::Aggregation::Sum())
88           .add_column(tag_key_token_)
89           .add_column(tag_key_host_)
90           .add_column(tag_key_user_id_)
91           .add_column(tag_key_status_)
92           .set_description(
93               "Delta sum of bytes sent broken down by <token, host, user_id, "
94               "status>.");
95   ::opencensus::stats::SetAggregationWindow(
96       ::opencensus::stats::AggregationWindow::Delta(), &vd_end_bytes_sent);
97   view_descriptor_map_.emplace(kViewEndBytesSent, vd_end_bytes_sent);
98   auto vd_end_bytes_received =
99       ::opencensus::stats::ViewDescriptor()
100           .set_name(kViewEndBytesReceived)
101           .set_measure(kMeasureEndBytesReceived)
102           .set_aggregation(::opencensus::stats::Aggregation::Sum())
103           .add_column(tag_key_token_)
104           .add_column(tag_key_host_)
105           .add_column(tag_key_user_id_)
106           .add_column(tag_key_status_)
107           .set_description(
108               "Delta sum of bytes received broken down by <token, host, "
109               "user_id, status>.");
110   ::opencensus::stats::SetAggregationWindow(
111       ::opencensus::stats::AggregationWindow::Delta(), &vd_end_bytes_received);
112   view_descriptor_map_.emplace(kViewEndBytesReceived, vd_end_bytes_received);
113   auto vd_end_latency_ms =
114       ::opencensus::stats::ViewDescriptor()
115           .set_name(kViewEndLatencyMs)
116           .set_measure(kMeasureEndLatencyMs)
117           .set_aggregation(::opencensus::stats::Aggregation::Sum())
118           .add_column(tag_key_token_)
119           .add_column(tag_key_host_)
120           .add_column(tag_key_user_id_)
121           .add_column(tag_key_status_)
122           .set_description(
123               "Delta sum of latency in ms broken down by <token, host, "
124               "user_id, status>.");
125   ::opencensus::stats::SetAggregationWindow(
126       ::opencensus::stats::AggregationWindow::Delta(), &vd_end_latency_ms);
127   view_descriptor_map_.emplace(kViewEndLatencyMs, vd_end_latency_ms);
128   // Two views related to other call metrics.
129   auto vd_metric_call_count =
130       ::opencensus::stats::ViewDescriptor()
131           .set_name(kViewOtherCallMetricCount)
132           .set_measure(kMeasureOtherCallMetric)
133           .set_aggregation(::opencensus::stats::Aggregation::Count())
134           .add_column(tag_key_token_)
135           .add_column(tag_key_host_)
136           .add_column(tag_key_user_id_)
137           .add_column(tag_key_metric_name_)
138           .set_description(
139               "Delta count of calls broken down by <token, host, user_id, "
140               "metric_name>.");
141   ::opencensus::stats::SetAggregationWindow(
142       ::opencensus::stats::AggregationWindow::Delta(), &vd_metric_call_count);
143   view_descriptor_map_.emplace(kViewOtherCallMetricCount, vd_metric_call_count);
144   auto vd_metric_value =
145       ::opencensus::stats::ViewDescriptor()
146           .set_name(kViewOtherCallMetricValue)
147           .set_measure(kMeasureOtherCallMetric)
148           .set_aggregation(::opencensus::stats::Aggregation::Sum())
149           .add_column(tag_key_token_)
150           .add_column(tag_key_host_)
151           .add_column(tag_key_user_id_)
152           .add_column(tag_key_metric_name_)
153           .set_description(
154               "Delta sum of call metric value broken down "
155               "by <token, host, user_id, metric_name>.");
156   ::opencensus::stats::SetAggregationWindow(
157       ::opencensus::stats::AggregationWindow::Delta(), &vd_metric_value);
158   view_descriptor_map_.emplace(kViewOtherCallMetricValue, vd_metric_value);
159 }
160
161 double CensusViewProvider::GetRelatedViewDataRowDouble(
162     const ViewDataMap& view_data_map, const char* view_name,
163     size_t view_name_len, const std::vector<std::string>& tag_values) {
164   auto it_vd = view_data_map.find(std::string(view_name, view_name_len));
165   GPR_ASSERT(it_vd != view_data_map.end());
166   GPR_ASSERT(it_vd->second.type() ==
167              ::opencensus::stats::ViewData::Type::kDouble);
168   auto it_row = it_vd->second.double_data().find(tag_values);
169   GPR_ASSERT(it_row != it_vd->second.double_data().end());
170   return it_row->second;
171 }
172
173 uint64_t CensusViewProvider::GetRelatedViewDataRowInt(
174     const ViewDataMap& view_data_map, const char* view_name,
175     size_t view_name_len, const std::vector<std::string>& tag_values) {
176   auto it_vd = view_data_map.find(std::string(view_name, view_name_len));
177   GPR_ASSERT(it_vd != view_data_map.end());
178   GPR_ASSERT(it_vd->second.type() ==
179              ::opencensus::stats::ViewData::Type::kInt64);
180   auto it_row = it_vd->second.int_data().find(tag_values);
181   GPR_ASSERT(it_row != it_vd->second.int_data().end());
182   GPR_ASSERT(it_row->second >= 0);
183   return it_row->second;
184 }
185
186 CensusViewProviderDefaultImpl::CensusViewProviderDefaultImpl() {
187   for (const auto& p : view_descriptor_map()) {
188     const std::string& view_name = p.first;
189     const ::opencensus::stats::ViewDescriptor& vd = p.second;
190     // We need to use pair's piecewise ctor here, otherwise the deleted copy
191     // ctor of View will be called.
192     view_map_.emplace(std::piecewise_construct,
193                       std::forward_as_tuple(view_name),
194                       std::forward_as_tuple(vd));
195   }
196 }
197
198 CensusViewProvider::ViewDataMap CensusViewProviderDefaultImpl::FetchViewData() {
199   gpr_log(GPR_DEBUG, "[CVP %p] Starts fetching Census view data.", this);
200   ViewDataMap view_data_map;
201   for (auto& p : view_map_) {
202     const std::string& view_name = p.first;
203     ::opencensus::stats::View& view = p.second;
204     if (view.IsValid()) {
205       view_data_map.emplace(view_name, view.GetData());
206       gpr_log(GPR_DEBUG, "[CVP %p] Fetched view data (view: %s).", this,
207               view_name.c_str());
208     } else {
209       gpr_log(
210           GPR_DEBUG,
211           "[CVP %p] Can't fetch view data because view is invalid (view: %s).",
212           this, view_name.c_str());
213     }
214   }
215   return view_data_map;
216 }
217
218 std::string LoadReporter::GenerateLbId() {
219   while (true) {
220     if (next_lb_id_ > UINT32_MAX) {
221       gpr_log(GPR_ERROR, "[LR %p] The LB ID exceeds the max valid value!",
222               this);
223       return "";
224     }
225     int64_t lb_id = next_lb_id_++;
226     // Overflow should never happen.
227     GPR_ASSERT(lb_id >= 0);
228     // Convert to padded hex string for a 32-bit LB ID. E.g, "0000ca5b".
229     char buf[kLbIdLength + 1];
230     snprintf(buf, sizeof(buf), "%08lx", lb_id);
231     std::string lb_id_str(buf, kLbIdLength);
232     // The client may send requests with LB ID that has never been allocated
233     // by this load reporter. Those IDs are tracked and will be skipped when
234     // we generate a new ID.
235     if (!load_data_store_.IsTrackedUnknownBalancerId(lb_id_str)) {
236       return lb_id_str;
237     }
238   }
239 }
240
241 ::grpc::lb::v1::LoadBalancingFeedback
242 LoadReporter::GenerateLoadBalancingFeedback() {
243   grpc_core::ReleasableMutexLock lock(&feedback_mu_);
244   auto now = std::chrono::system_clock::now();
245   // Discard records outside the window until there is only one record
246   // outside the window, which is used as the base for difference.
247   while (feedback_records_.size() > 1 &&
248          !IsRecordInWindow(feedback_records_[1], now)) {
249     feedback_records_.pop_front();
250   }
251   if (feedback_records_.size() < 2) {
252     return ::grpc::lb::v1::LoadBalancingFeedback::default_instance();
253   }
254   // Find the longest range with valid ends.
255   auto oldest = feedback_records_.begin();
256   auto newest = feedback_records_.end() - 1;
257   while (std::distance(oldest, newest) > 0 &&
258          (newest->cpu_limit == 0 || oldest->cpu_limit == 0)) {
259     // A zero limit means that the system info reading was failed, so these
260     // records can't be used to calculate CPU utilization.
261     if (newest->cpu_limit == 0) --newest;
262     if (oldest->cpu_limit == 0) ++oldest;
263   }
264   if (std::distance(oldest, newest) < 1 ||
265       oldest->end_time == newest->end_time ||
266       newest->cpu_limit == oldest->cpu_limit) {
267     return ::grpc::lb::v1::LoadBalancingFeedback::default_instance();
268   }
269   uint64_t rpcs = 0;
270   uint64_t errors = 0;
271   for (auto p = newest; p != oldest; --p) {
272     // Because these two numbers are counters, the oldest record shouldn't be
273     // included.
274     rpcs += p->rpcs;
275     errors += p->errors;
276   }
277   double cpu_usage = newest->cpu_usage - oldest->cpu_usage;
278   double cpu_limit = newest->cpu_limit - oldest->cpu_limit;
279   std::chrono::duration<double> duration_seconds =
280       newest->end_time - oldest->end_time;
281   lock.Unlock();
282   ::grpc::lb::v1::LoadBalancingFeedback feedback;
283   feedback.set_server_utilization(static_cast<float>(cpu_usage / cpu_limit));
284   feedback.set_calls_per_second(
285       static_cast<float>(rpcs / duration_seconds.count()));
286   feedback.set_errors_per_second(
287       static_cast<float>(errors / duration_seconds.count()));
288   return feedback;
289 }
290
291 ::google::protobuf::RepeatedPtrField<::grpc::lb::v1::Load>
292 LoadReporter::GenerateLoads(const std::string& hostname,
293                             const std::string& lb_id) {
294   grpc_core::MutexLock lock(&store_mu_);
295   auto assigned_stores = load_data_store_.GetAssignedStores(hostname, lb_id);
296   GPR_ASSERT(assigned_stores != nullptr);
297   GPR_ASSERT(!assigned_stores->empty());
298   ::google::protobuf::RepeatedPtrField<::grpc::lb::v1::Load> loads;
299   for (PerBalancerStore* per_balancer_store : *assigned_stores) {
300     GPR_ASSERT(!per_balancer_store->IsSuspended());
301     if (!per_balancer_store->load_record_map().empty()) {
302       for (const auto& p : per_balancer_store->load_record_map()) {
303         const auto& key = p.first;
304         const auto& value = p.second;
305         auto load = loads.Add();
306         load->set_load_balance_tag(key.lb_tag());
307         load->set_user_id(key.user_id());
308         load->set_client_ip_address(key.GetClientIpBytes());
309         load->set_num_calls_started(static_cast<int64_t>(value.start_count()));
310         load->set_num_calls_finished_without_error(
311             static_cast<int64_t>(value.ok_count()));
312         load->set_num_calls_finished_with_error(
313             static_cast<int64_t>(value.error_count()));
314         load->set_total_bytes_sent(static_cast<int64_t>(value.bytes_sent()));
315         load->set_total_bytes_received(
316             static_cast<int64_t>(value.bytes_recv()));
317         load->mutable_total_latency()->set_seconds(
318             static_cast<int64_t>(value.latency_ms() / 1000));
319         load->mutable_total_latency()->set_nanos(
320             (static_cast<int32_t>(value.latency_ms()) % 1000) * 1000000);
321         for (const auto& p : value.call_metrics()) {
322           const std::string& metric_name = p.first;
323           const CallMetricValue& metric_value = p.second;
324           auto call_metric_data = load->add_metric_data();
325           call_metric_data->set_metric_name(metric_name);
326           call_metric_data->set_num_calls_finished_with_metric(
327               metric_value.num_calls());
328           call_metric_data->set_total_metric_value(
329               metric_value.total_metric_value());
330         }
331         if (per_balancer_store->lb_id() != lb_id) {
332           // This per-balancer store is an orphan assigned to this receiving
333           // balancer.
334           AttachOrphanLoadId(load, *per_balancer_store);
335         }
336       }
337       per_balancer_store->ClearLoadRecordMap();
338     }
339     if (per_balancer_store->IsNumCallsInProgressChangedSinceLastReport()) {
340       auto load = loads.Add();
341       load->set_num_calls_in_progress(
342           per_balancer_store->GetNumCallsInProgressForReport());
343       if (per_balancer_store->lb_id() != lb_id) {
344         // This per-balancer store is an orphan assigned to this receiving
345         // balancer.
346         AttachOrphanLoadId(load, *per_balancer_store);
347       }
348     }
349   }
350   return loads;
351 }
352
353 void LoadReporter::AttachOrphanLoadId(
354     ::grpc::lb::v1::Load* load, const PerBalancerStore& per_balancer_store) {
355   if (per_balancer_store.lb_id() == kInvalidLbId) {
356     load->set_load_key_unknown(true);
357   } else {
358     // We shouldn't set load_key_unknown to any value in this case because
359     // load_key_unknown and orphaned_load_identifier are under an oneof struct.
360     load->mutable_orphaned_load_identifier()->set_load_key(
361         per_balancer_store.load_key());
362     load->mutable_orphaned_load_identifier()->set_load_balancer_id(
363         per_balancer_store.lb_id());
364   }
365 }
366
367 void LoadReporter::AppendNewFeedbackRecord(uint64_t rpcs, uint64_t errors) {
368   CpuStatsProvider::CpuStatsSample cpu_stats;
369   if (cpu_stats_provider_ != nullptr) {
370     cpu_stats = cpu_stats_provider_->GetCpuStats();
371   } else {
372     // This will make the load balancing feedback generation a no-op.
373     cpu_stats = {0, 0};
374   }
375   grpc_core::MutexLock lock(&feedback_mu_);
376   feedback_records_.emplace_back(std::chrono::system_clock::now(), rpcs, errors,
377                                  cpu_stats.first, cpu_stats.second);
378 }
379
380 void LoadReporter::ReportStreamCreated(const std::string& hostname,
381                                        const std::string& lb_id,
382                                        const std::string& load_key) {
383   grpc_core::MutexLock lock(&store_mu_);
384   load_data_store_.ReportStreamCreated(hostname, lb_id, load_key);
385   gpr_log(GPR_INFO,
386           "[LR %p] Report stream created (host: %s, LB ID: %s, load key: %s).",
387           this, hostname.c_str(), lb_id.c_str(), load_key.c_str());
388 }
389
390 void LoadReporter::ReportStreamClosed(const std::string& hostname,
391                                       const std::string& lb_id) {
392   grpc_core::MutexLock lock(&store_mu_);
393   load_data_store_.ReportStreamClosed(hostname, lb_id);
394   gpr_log(GPR_INFO, "[LR %p] Report stream closed (host: %s, LB ID: %s).", this,
395           hostname.c_str(), lb_id.c_str());
396 }
397
398 void LoadReporter::ProcessViewDataCallStart(
399     const CensusViewProvider::ViewDataMap& view_data_map) {
400   auto it = view_data_map.find(kViewStartCount);
401   if (it != view_data_map.end()) {
402     for (const auto& p : it->second.int_data()) {
403       const std::vector<std::string>& tag_values = p.first;
404       const uint64_t start_count = static_cast<uint64_t>(p.second);
405       const std::string& client_ip_and_token = tag_values[0];
406       const std::string& host = tag_values[1];
407       const std::string& user_id = tag_values[2];
408       LoadRecordKey key(client_ip_and_token, user_id);
409       LoadRecordValue value = LoadRecordValue(start_count);
410       {
411         grpc_core::MutexLock lock(&store_mu_);
412         load_data_store_.MergeRow(host, key, value);
413       }
414     }
415   }
416 }
417
418 void LoadReporter::ProcessViewDataCallEnd(
419     const CensusViewProvider::ViewDataMap& view_data_map) {
420   uint64_t total_end_count = 0;
421   uint64_t total_error_count = 0;
422   auto it = view_data_map.find(kViewEndCount);
423   if (it != view_data_map.end()) {
424     for (const auto& p : it->second.int_data()) {
425       const std::vector<std::string>& tag_values = p.first;
426       const uint64_t end_count = static_cast<uint64_t>(p.second);
427       const std::string& client_ip_and_token = tag_values[0];
428       const std::string& host = tag_values[1];
429       const std::string& user_id = tag_values[2];
430       const std::string& status = tag_values[3];
431       // This is due to a bug reported internally of Java server load reporting
432       // implementation.
433       // TODO(juanlishen): Check whether this situation happens in OSS C++.
434       if (client_ip_and_token.size() == 0) {
435         gpr_log(GPR_DEBUG,
436                 "Skipping processing Opencensus record with empty "
437                 "client_ip_and_token tag.");
438         continue;
439       }
440       LoadRecordKey key(client_ip_and_token, user_id);
441       const uint64_t bytes_sent = CensusViewProvider::GetRelatedViewDataRowInt(
442           view_data_map, kViewEndBytesSent, sizeof(kViewEndBytesSent) - 1,
443           tag_values);
444       const uint64_t bytes_received =
445           CensusViewProvider::GetRelatedViewDataRowInt(
446               view_data_map, kViewEndBytesReceived,
447               sizeof(kViewEndBytesReceived) - 1, tag_values);
448       const uint64_t latency_ms = CensusViewProvider::GetRelatedViewDataRowInt(
449           view_data_map, kViewEndLatencyMs, sizeof(kViewEndLatencyMs) - 1,
450           tag_values);
451       uint64_t ok_count = 0;
452       uint64_t error_count = 0;
453       total_end_count += end_count;
454       if (std::strcmp(status.c_str(), kCallStatusOk) == 0) {
455         ok_count = end_count;
456       } else {
457         error_count = end_count;
458         total_error_count += end_count;
459       }
460       LoadRecordValue value = LoadRecordValue(
461           0, ok_count, error_count, bytes_sent, bytes_received, latency_ms);
462       {
463         grpc_core::MutexLock lock(&store_mu_);
464         load_data_store_.MergeRow(host, key, value);
465       }
466     }
467   }
468   AppendNewFeedbackRecord(total_end_count, total_error_count);
469 }
470
471 void LoadReporter::ProcessViewDataOtherCallMetrics(
472     const CensusViewProvider::ViewDataMap& view_data_map) {
473   auto it = view_data_map.find(kViewOtherCallMetricCount);
474   if (it != view_data_map.end()) {
475     for (const auto& p : it->second.int_data()) {
476       const std::vector<std::string>& tag_values = p.first;
477       const int64_t num_calls = p.second;
478       const std::string& client_ip_and_token = tag_values[0];
479       const std::string& host = tag_values[1];
480       const std::string& user_id = tag_values[2];
481       const std::string& metric_name = tag_values[3];
482       LoadRecordKey key(client_ip_and_token, user_id);
483       const double total_metric_value =
484           CensusViewProvider::GetRelatedViewDataRowDouble(
485               view_data_map, kViewOtherCallMetricValue,
486               sizeof(kViewOtherCallMetricValue) - 1, tag_values);
487       LoadRecordValue value = LoadRecordValue(
488           metric_name, static_cast<uint64_t>(num_calls), total_metric_value);
489       {
490         grpc_core::MutexLock lock(&store_mu_);
491         load_data_store_.MergeRow(host, key, value);
492       }
493     }
494   }
495 }
496
497 void LoadReporter::FetchAndSample() {
498   gpr_log(GPR_DEBUG,
499           "[LR %p] Starts fetching Census view data and sampling LB feedback "
500           "record.",
501           this);
502   CensusViewProvider::ViewDataMap view_data_map =
503       census_view_provider_->FetchViewData();
504   ProcessViewDataCallStart(view_data_map);
505   ProcessViewDataCallEnd(view_data_map);
506   ProcessViewDataOtherCallMetrics(view_data_map);
507 }
508
509 }  // namespace load_reporter
510 }  // namespace grpc