src/cpp/server/load_reporter/load_reporter.cc
/*
 *
 * Copyright 2018 gRPC authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */

#include <grpc/impl/codegen/port_platform.h>

#include <inttypes.h>
#include <stdint.h>
#include <stdio.h>
#include <chrono>
#include <cstring>  // for std::strcmp
#include <ctime>
#include <iterator>

#include "src/cpp/server/load_reporter/constants.h"
#include "src/cpp/server/load_reporter/get_cpu_stats.h"
#include "src/cpp/server/load_reporter/load_reporter.h"

#include "opencensus/stats/internal/set_aggregation_window.h"
#include "opencensus/tags/tag_key.h"

namespace grpc {
namespace load_reporter {

CpuStatsProvider::CpuStatsSample CpuStatsProviderDefaultImpl::GetCpuStats() {
  return GetCpuStatsImpl();
}

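// Registers the tag keys and builds the OpenCensus view descriptors (all with
// delta aggregation windows) that back the start-call, end-call, and
// other-call-metric measures.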
CensusViewProvider::CensusViewProvider()
    : tag_key_token_(::opencensus::tags::TagKey::Register(kTagKeyToken)),
      tag_key_host_(::opencensus::tags::TagKey::Register(kTagKeyHost)),
      tag_key_user_id_(::opencensus::tags::TagKey::Register(kTagKeyUserId)),
      tag_key_status_(::opencensus::tags::TagKey::Register(kTagKeyStatus)),
      tag_key_metric_name_(
          ::opencensus::tags::TagKey::Register(kTagKeyMetricName)) {
  // One view related to starting a call.
  auto vd_start_count =
      ::opencensus::stats::ViewDescriptor()
          .set_name(kViewStartCount)
          .set_measure(kMeasureStartCount)
          .set_aggregation(::opencensus::stats::Aggregation::Sum())
          .add_column(tag_key_token_)
          .add_column(tag_key_host_)
          .add_column(tag_key_user_id_)
          .set_description(
              "Delta count of calls started broken down by <token, host, "
              "user_id>.");
  ::opencensus::stats::SetAggregationWindow(
      ::opencensus::stats::AggregationWindow::Delta(), &vd_start_count);
  view_descriptor_map_.emplace(kViewStartCount, vd_start_count);
  // Four views related to ending a call.
  // If this view were set up as a Count of kMeasureEndBytesSent (in the hope
  // of saving one measure), it would be infeasible to prepare fake data for
  // testing, because the OpenCensus API used to make up view data adds the
  // input as separate measurements instead of setting the data values
  // directly.
  auto vd_end_count =
      ::opencensus::stats::ViewDescriptor()
          .set_name(kViewEndCount)
          .set_measure(kMeasureEndCount)
          .set_aggregation(::opencensus::stats::Aggregation::Sum())
          .add_column(tag_key_token_)
          .add_column(tag_key_host_)
          .add_column(tag_key_user_id_)
          .add_column(tag_key_status_)
          .set_description(
              "Delta count of calls ended broken down by <token, host, "
              "user_id, status>.");
  ::opencensus::stats::SetAggregationWindow(
      ::opencensus::stats::AggregationWindow::Delta(), &vd_end_count);
  view_descriptor_map_.emplace(kViewEndCount, vd_end_count);
  auto vd_end_bytes_sent =
      ::opencensus::stats::ViewDescriptor()
          .set_name(kViewEndBytesSent)
          .set_measure(kMeasureEndBytesSent)
          .set_aggregation(::opencensus::stats::Aggregation::Sum())
          .add_column(tag_key_token_)
          .add_column(tag_key_host_)
          .add_column(tag_key_user_id_)
          .add_column(tag_key_status_)
          .set_description(
              "Delta sum of bytes sent broken down by <token, host, user_id, "
              "status>.");
  ::opencensus::stats::SetAggregationWindow(
      ::opencensus::stats::AggregationWindow::Delta(), &vd_end_bytes_sent);
  view_descriptor_map_.emplace(kViewEndBytesSent, vd_end_bytes_sent);
  auto vd_end_bytes_received =
      ::opencensus::stats::ViewDescriptor()
          .set_name(kViewEndBytesReceived)
          .set_measure(kMeasureEndBytesReceived)
          .set_aggregation(::opencensus::stats::Aggregation::Sum())
          .add_column(tag_key_token_)
          .add_column(tag_key_host_)
          .add_column(tag_key_user_id_)
          .add_column(tag_key_status_)
          .set_description(
              "Delta sum of bytes received broken down by <token, host, "
              "user_id, status>.");
  ::opencensus::stats::SetAggregationWindow(
      ::opencensus::stats::AggregationWindow::Delta(), &vd_end_bytes_received);
  view_descriptor_map_.emplace(kViewEndBytesReceived, vd_end_bytes_received);
  auto vd_end_latency_ms =
      ::opencensus::stats::ViewDescriptor()
          .set_name(kViewEndLatencyMs)
          .set_measure(kMeasureEndLatencyMs)
          .set_aggregation(::opencensus::stats::Aggregation::Sum())
          .add_column(tag_key_token_)
          .add_column(tag_key_host_)
          .add_column(tag_key_user_id_)
          .add_column(tag_key_status_)
          .set_description(
              "Delta sum of latency in ms broken down by <token, host, "
              "user_id, status>.");
  ::opencensus::stats::SetAggregationWindow(
      ::opencensus::stats::AggregationWindow::Delta(), &vd_end_latency_ms);
  view_descriptor_map_.emplace(kViewEndLatencyMs, vd_end_latency_ms);
  // Two views related to other call metrics.
  auto vd_metric_call_count =
      ::opencensus::stats::ViewDescriptor()
          .set_name(kViewOtherCallMetricCount)
          .set_measure(kMeasureOtherCallMetric)
          .set_aggregation(::opencensus::stats::Aggregation::Count())
          .add_column(tag_key_token_)
          .add_column(tag_key_host_)
          .add_column(tag_key_user_id_)
          .add_column(tag_key_metric_name_)
          .set_description(
              "Delta count of calls broken down by <token, host, user_id, "
              "metric_name>.");
  ::opencensus::stats::SetAggregationWindow(
      ::opencensus::stats::AggregationWindow::Delta(), &vd_metric_call_count);
  view_descriptor_map_.emplace(kViewOtherCallMetricCount, vd_metric_call_count);
  auto vd_metric_value =
      ::opencensus::stats::ViewDescriptor()
          .set_name(kViewOtherCallMetricValue)
          .set_measure(kMeasureOtherCallMetric)
          .set_aggregation(::opencensus::stats::Aggregation::Sum())
          .add_column(tag_key_token_)
          .add_column(tag_key_host_)
          .add_column(tag_key_user_id_)
          .add_column(tag_key_metric_name_)
          .set_description(
              "Delta sum of call metric value broken down "
              "by <token, host, user_id, metric_name>.");
  ::opencensus::stats::SetAggregationWindow(
      ::opencensus::stats::AggregationWindow::Delta(), &vd_metric_value);
  view_descriptor_map_.emplace(kViewOtherCallMetricValue, vd_metric_value);
}

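// Shared helpers for pulling a single row out of fetched view data. The
// requested view and row are expected to exist; a missing entry indicates a
// programming error and trips a GPR_ASSERT.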
double CensusViewProvider::GetRelatedViewDataRowDouble(
    const ViewDataMap& view_data_map, const char* view_name,
    size_t view_name_len, const std::vector<std::string>& tag_values) {
  auto it_vd = view_data_map.find(std::string(view_name, view_name_len));
  GPR_ASSERT(it_vd != view_data_map.end());
  GPR_ASSERT(it_vd->second.type() ==
             ::opencensus::stats::ViewData::Type::kDouble);
  auto it_row = it_vd->second.double_data().find(tag_values);
  GPR_ASSERT(it_row != it_vd->second.double_data().end());
  return it_row->second;
}

uint64_t CensusViewProvider::GetRelatedViewDataRowInt(
    const ViewDataMap& view_data_map, const char* view_name,
    size_t view_name_len, const std::vector<std::string>& tag_values) {
  auto it_vd = view_data_map.find(std::string(view_name, view_name_len));
  GPR_ASSERT(it_vd != view_data_map.end());
  GPR_ASSERT(it_vd->second.type() ==
             ::opencensus::stats::ViewData::Type::kInt64);
  auto it_row = it_vd->second.int_data().find(tag_values);
  GPR_ASSERT(it_row != it_vd->second.int_data().end());
  GPR_ASSERT(it_row->second >= 0);
  return it_row->second;
}

CensusViewProviderDefaultImpl::CensusViewProviderDefaultImpl() {
  for (const auto& p : view_descriptor_map()) {
    const std::string& view_name = p.first;
    const ::opencensus::stats::ViewDescriptor& vd = p.second;
    // We need to use pair's piecewise ctor here; otherwise the deleted copy
    // ctor of View would be called.
    view_map_.emplace(std::piecewise_construct,
                      std::forward_as_tuple(view_name),
                      std::forward_as_tuple(vd));
  }
}

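// Takes a snapshot of every tracked view. Views that are no longer valid are
// logged and skipped instead of being reported.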
CensusViewProvider::ViewDataMap CensusViewProviderDefaultImpl::FetchViewData() {
  gpr_log(GPR_DEBUG, "[CVP %p] Starts fetching Census view data.", this);
  ViewDataMap view_data_map;
  for (auto& p : view_map_) {
    const std::string& view_name = p.first;
    ::opencensus::stats::View& view = p.second;
    if (view.IsValid()) {
      view_data_map.emplace(view_name, view.GetData());
      gpr_log(GPR_DEBUG, "[CVP %p] Fetched view data (view: %s).", this,
              view_name.c_str());
    } else {
      gpr_log(
          GPR_DEBUG,
          "[CVP %p] Can't fetch view data because view is invalid (view: %s).",
          this, view_name.c_str());
    }
  }
  return view_data_map;
}

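// Produces the next LB ID as a fixed-width, zero-padded hex string, skipping
// any value already tracked as an "unknown" balancer ID reported by a client.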
std::string LoadReporter::GenerateLbId() {
  while (true) {
    if (next_lb_id_ > UINT32_MAX) {
      gpr_log(GPR_ERROR, "[LR %p] The LB ID exceeds the max valid value!",
              this);
      return "";
    }
    int64_t lb_id = next_lb_id_++;
    // Overflow should never happen.
    GPR_ASSERT(lb_id >= 0);
    // Convert to a padded hex string for a 32-bit LB ID, e.g., "0000ca5b".
    char buf[kLbIdLength + 1];
    snprintf(buf, sizeof(buf), "%08" PRIx64, lb_id);
    std::string lb_id_str(buf, kLbIdLength);
    // The client may send requests with an LB ID that has never been allocated
    // by this load reporter. Those IDs are tracked and will be skipped when
    // we generate a new ID.
    if (!load_data_store_.IsTrackedUnknownBalancerId(lb_id_str)) {
      return lb_id_str;
    }
  }
}

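// Builds the LoadBalancingFeedback proto from the window of feedback records:
// server utilization is the CPU-usage delta divided by the CPU-limit delta
// between the newest and oldest usable records, and call/error rates are the
// summed per-record deltas divided by the elapsed time between those records.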
::grpc::lb::v1::LoadBalancingFeedback
LoadReporter::GenerateLoadBalancingFeedback() {
  grpc_core::ReleasableMutexLock lock(&feedback_mu_);
  auto now = std::chrono::system_clock::now();
  // Discard records outside the window until there is only one record
  // outside the window, which is used as the base for difference.
  while (feedback_records_.size() > 1 &&
         !IsRecordInWindow(feedback_records_[1], now)) {
    feedback_records_.pop_front();
  }
  if (feedback_records_.size() < 2) {
    return ::grpc::lb::v1::LoadBalancingFeedback::default_instance();
  }
  // Find the longest range with valid ends.
  auto oldest = feedback_records_.begin();
  auto newest = feedback_records_.end() - 1;
  while (std::distance(oldest, newest) > 0 &&
         (newest->cpu_limit == 0 || oldest->cpu_limit == 0)) {
    // A zero limit means that reading the system info failed, so these
    // records can't be used to calculate CPU utilization.
    if (newest->cpu_limit == 0) --newest;
    if (oldest->cpu_limit == 0) ++oldest;
  }
  if (std::distance(oldest, newest) < 1 ||
      oldest->end_time == newest->end_time ||
      newest->cpu_limit == oldest->cpu_limit) {
    return ::grpc::lb::v1::LoadBalancingFeedback::default_instance();
  }
  uint64_t rpcs = 0;
  uint64_t errors = 0;
  for (auto p = newest; p != oldest; --p) {
    // Because these two numbers are counters, the oldest record shouldn't be
    // included.
    rpcs += p->rpcs;
    errors += p->errors;
  }
  double cpu_usage = newest->cpu_usage - oldest->cpu_usage;
  double cpu_limit = newest->cpu_limit - oldest->cpu_limit;
  std::chrono::duration<double> duration_seconds =
      newest->end_time - oldest->end_time;
  lock.Release();
  ::grpc::lb::v1::LoadBalancingFeedback feedback;
  feedback.set_server_utilization(static_cast<float>(cpu_usage / cpu_limit));
  feedback.set_calls_per_second(
      static_cast<float>(rpcs / duration_seconds.count()));
  feedback.set_errors_per_second(
      static_cast<float>(errors / duration_seconds.count()));
  return feedback;
}

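// Drains every per-balancer store assigned to the requesting <hostname, lb_id>
// stream into Load protos: one entry per load record, plus an entry carrying
// the in-progress call count when it has changed since the last report. Loads
// from stores owned by a different (or unknown) balancer are tagged with their
// orphaned load identifier.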
::google::protobuf::RepeatedPtrField<::grpc::lb::v1::Load>
LoadReporter::GenerateLoads(const std::string& hostname,
                            const std::string& lb_id) {
  grpc_core::MutexLock lock(&store_mu_);
  auto assigned_stores = load_data_store_.GetAssignedStores(hostname, lb_id);
  GPR_ASSERT(assigned_stores != nullptr);
  GPR_ASSERT(!assigned_stores->empty());
  ::google::protobuf::RepeatedPtrField<::grpc::lb::v1::Load> loads;
  for (PerBalancerStore* per_balancer_store : *assigned_stores) {
    GPR_ASSERT(!per_balancer_store->IsSuspended());
    if (!per_balancer_store->load_record_map().empty()) {
      for (const auto& p : per_balancer_store->load_record_map()) {
        const auto& key = p.first;
        const auto& value = p.second;
        auto load = loads.Add();
        load->set_load_balance_tag(key.lb_tag());
        load->set_user_id(key.user_id());
        load->set_client_ip_address(key.GetClientIpBytes());
        load->set_num_calls_started(static_cast<int64_t>(value.start_count()));
        load->set_num_calls_finished_without_error(
            static_cast<int64_t>(value.ok_count()));
        load->set_num_calls_finished_with_error(
            static_cast<int64_t>(value.error_count()));
        load->set_total_bytes_sent(static_cast<int64_t>(value.bytes_sent()));
        load->set_total_bytes_received(
            static_cast<int64_t>(value.bytes_recv()));
        load->mutable_total_latency()->set_seconds(
            static_cast<int64_t>(value.latency_ms() / 1000));
        load->mutable_total_latency()->set_nanos(
            (static_cast<int32_t>(value.latency_ms()) % 1000) * 1000000);
        for (const auto& p : value.call_metrics()) {
          const std::string& metric_name = p.first;
          const CallMetricValue& metric_value = p.second;
          auto call_metric_data = load->add_metric_data();
          call_metric_data->set_metric_name(metric_name);
          call_metric_data->set_num_calls_finished_with_metric(
              metric_value.num_calls());
          call_metric_data->set_total_metric_value(
              metric_value.total_metric_value());
        }
        if (per_balancer_store->lb_id() != lb_id) {
          // This per-balancer store is an orphan assigned to this receiving
          // balancer.
          AttachOrphanLoadId(load, *per_balancer_store);
        }
      }
      per_balancer_store->ClearLoadRecordMap();
    }
    if (per_balancer_store->IsNumCallsInProgressChangedSinceLastReport()) {
      auto load = loads.Add();
      load->set_num_calls_in_progress(
          per_balancer_store->GetNumCallsInProgressForReport());
      if (per_balancer_store->lb_id() != lb_id) {
        // This per-balancer store is an orphan assigned to this receiving
        // balancer.
        AttachOrphanLoadId(load, *per_balancer_store);
      }
    }
  }
  return loads;
}

void LoadReporter::AttachOrphanLoadId(
    ::grpc::lb::v1::Load* load, const PerBalancerStore& per_balancer_store) {
  if (per_balancer_store.lb_id() == kInvalidLbId) {
    load->set_load_key_unknown(true);
  } else {
    // We shouldn't set load_key_unknown to any value in this case because
    // load_key_unknown and orphaned_load_identifier belong to the same oneof.
    load->mutable_orphaned_load_identifier()->set_load_key(
        per_balancer_store.load_key());
    load->mutable_orphaned_load_identifier()->set_load_balancer_id(
        per_balancer_store.lb_id());
  }
}

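// Appends one feedback record stamped with the current time and CPU stats. If
// no CPU stats provider is available, zeroed stats are recorded, which makes
// feedback generation a no-op for that window.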
void LoadReporter::AppendNewFeedbackRecord(uint64_t rpcs, uint64_t errors) {
  CpuStatsProvider::CpuStatsSample cpu_stats;
  if (cpu_stats_provider_ != nullptr) {
    cpu_stats = cpu_stats_provider_->GetCpuStats();
  } else {
    // This will make the load balancing feedback generation a no-op.
    cpu_stats = {0, 0};
  }
  grpc_core::MutexLock lock(&feedback_mu_);
  feedback_records_.emplace_back(std::chrono::system_clock::now(), rpcs, errors,
                                 cpu_stats.first, cpu_stats.second);
}

void LoadReporter::ReportStreamCreated(const std::string& hostname,
                                       const std::string& lb_id,
                                       const std::string& load_key) {
  grpc_core::MutexLock lock(&store_mu_);
  load_data_store_.ReportStreamCreated(hostname, lb_id, load_key);
  gpr_log(GPR_INFO,
          "[LR %p] Report stream created (host: %s, LB ID: %s, load key: %s).",
          this, hostname.c_str(), lb_id.c_str(), load_key.c_str());
}

void LoadReporter::ReportStreamClosed(const std::string& hostname,
                                      const std::string& lb_id) {
  grpc_core::MutexLock lock(&store_mu_);
  load_data_store_.ReportStreamClosed(hostname, lb_id);
  gpr_log(GPR_INFO, "[LR %p] Report stream closed (host: %s, LB ID: %s).", this,
          hostname.c_str(), lb_id.c_str());
}

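// The ProcessViewData* methods below translate the rows of a fetched
// ViewDataMap into LoadRecordKey/LoadRecordValue pairs and merge them into the
// load data store, keyed by host.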
void LoadReporter::ProcessViewDataCallStart(
    const CensusViewProvider::ViewDataMap& view_data_map) {
  auto it = view_data_map.find(kViewStartCount);
  if (it != view_data_map.end()) {
    for (const auto& p : it->second.int_data()) {
      const std::vector<std::string>& tag_values = p.first;
      const uint64_t start_count = static_cast<uint64_t>(p.second);
      const std::string& client_ip_and_token = tag_values[0];
      const std::string& host = tag_values[1];
      const std::string& user_id = tag_values[2];
      LoadRecordKey key(client_ip_and_token, user_id);
      LoadRecordValue value = LoadRecordValue(start_count);
      {
        grpc_core::MutexLock lock(&store_mu_);
        load_data_store_.MergeRow(host, key, value);
      }
    }
  }
}

void LoadReporter::ProcessViewDataCallEnd(
    const CensusViewProvider::ViewDataMap& view_data_map) {
  uint64_t total_end_count = 0;
  uint64_t total_error_count = 0;
  auto it = view_data_map.find(kViewEndCount);
  if (it != view_data_map.end()) {
    for (const auto& p : it->second.int_data()) {
      const std::vector<std::string>& tag_values = p.first;
      const uint64_t end_count = static_cast<uint64_t>(p.second);
      const std::string& client_ip_and_token = tag_values[0];
      const std::string& host = tag_values[1];
      const std::string& user_id = tag_values[2];
      const std::string& status = tag_values[3];
      // This is due to a bug reported internally against the Java server load
      // reporting implementation.
      // TODO(juanlishen): Check whether this situation happens in OSS C++.
      if (client_ip_and_token.empty()) {
        gpr_log(GPR_DEBUG,
                "Skipping processing Opencensus record with empty "
                "client_ip_and_token tag.");
        continue;
      }
      LoadRecordKey key(client_ip_and_token, user_id);
      const uint64_t bytes_sent = CensusViewProvider::GetRelatedViewDataRowInt(
          view_data_map, kViewEndBytesSent, sizeof(kViewEndBytesSent) - 1,
          tag_values);
      const uint64_t bytes_received =
          CensusViewProvider::GetRelatedViewDataRowInt(
              view_data_map, kViewEndBytesReceived,
              sizeof(kViewEndBytesReceived) - 1, tag_values);
      const uint64_t latency_ms = CensusViewProvider::GetRelatedViewDataRowInt(
          view_data_map, kViewEndLatencyMs, sizeof(kViewEndLatencyMs) - 1,
          tag_values);
      uint64_t ok_count = 0;
      uint64_t error_count = 0;
      total_end_count += end_count;
      if (std::strcmp(status.c_str(), kCallStatusOk) == 0) {
        ok_count = end_count;
      } else {
        error_count = end_count;
        total_error_count += end_count;
      }
      LoadRecordValue value = LoadRecordValue(
          0, ok_count, error_count, bytes_sent, bytes_received, latency_ms);
      {
        grpc_core::MutexLock lock(&store_mu_);
        load_data_store_.MergeRow(host, key, value);
      }
    }
  }
  AppendNewFeedbackRecord(total_end_count, total_error_count);
}

void LoadReporter::ProcessViewDataOtherCallMetrics(
    const CensusViewProvider::ViewDataMap& view_data_map) {
  auto it = view_data_map.find(kViewOtherCallMetricCount);
  if (it != view_data_map.end()) {
    for (const auto& p : it->second.int_data()) {
      const std::vector<std::string>& tag_values = p.first;
      const int64_t num_calls = p.second;
      const std::string& client_ip_and_token = tag_values[0];
      const std::string& host = tag_values[1];
      const std::string& user_id = tag_values[2];
      const std::string& metric_name = tag_values[3];
      LoadRecordKey key(client_ip_and_token, user_id);
      const double total_metric_value =
          CensusViewProvider::GetRelatedViewDataRowDouble(
              view_data_map, kViewOtherCallMetricValue,
              sizeof(kViewOtherCallMetricValue) - 1, tag_values);
      LoadRecordValue value = LoadRecordValue(
          metric_name, static_cast<uint64_t>(num_calls), total_metric_value);
      {
        grpc_core::MutexLock lock(&store_mu_);
        load_data_store_.MergeRow(host, key, value);
      }
    }
  }
}

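// Pulls a fresh snapshot of Census view data and runs it through the three
// processors above, which both updates the load data store and appends a new
// LB feedback record.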
void LoadReporter::FetchAndSample() {
  gpr_log(GPR_DEBUG,
          "[LR %p] Starts fetching Census view data and sampling LB feedback "
          "record.",
          this);
  CensusViewProvider::ViewDataMap view_data_map =
      census_view_provider_->FetchViewData();
  ProcessViewDataCallStart(view_data_map);
  ProcessViewDataCallEnd(view_data_map);
  ProcessViewDataOtherCallMetrics(view_data_map);
}

}  // namespace load_reporter
}  // namespace grpc