3 * Copyright 2018 gRPC authors.
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
19 #ifndef GRPC_SRC_CPP_SERVER_LOAD_REPORTER_LOAD_REPORTER_H
20 #define GRPC_SRC_CPP_SERVER_LOAD_REPORTER_LOAD_REPORTER_H
22 #include <grpc/support/port_platform.h>
29 #include "opencensus/stats/stats.h"
30 #include "opencensus/tags/tag_key.h"
32 #include <grpc/support/log.h>
33 #include <grpcpp/impl/codegen/config.h>
35 #include "src/core/lib/gprpp/sync.h"
36 #include "src/cpp/server/load_reporter/load_data_store.h"
37 #include "src/proto/grpc/lb/v1/load_reporter.grpc.pb.h"
40 namespace load_reporter {
42 // The interface to get the Census stats. Abstracted for mocking.
// Declares one pure-virtual fetch method, two static row-lookup helpers, and
// a read-only accessor for the stored view descriptors.
43 class CensusViewProvider {
45 // Maps from the view name to the view data.
47 std::unordered_map<std::string, ::opencensus::stats::ViewData>;
48 // Maps from the view name to the view descriptor.
49 using ViewDescriptorMap =
50 std::unordered_map<std::string, ::opencensus::stats::ViewDescriptor>;
// Virtual so that an implementation can be destroyed through a
// CensusViewProvider pointer.
53 virtual ~CensusViewProvider() = default;
55 // Fetches the view data accumulated since last fetching, and returns it as a
56 // map from the view name to the view data.
57 virtual ViewDataMap FetchViewData() = 0;
59 // A helper function that gets a row with the input tag values from the view
60 // data. Only used when we know that row must exist because we have seen a row
61 // with the same tag values in a related view data. Several ViewData's are
62 // considered related if their views are based on the measures that are always
63 // recorded at the same time.
// Both helpers take the view name as a pointer plus an explicit length
// (view_name, view_name_len) and the tag values that identify the row.
// GetRelatedViewDataRowDouble returns the row as a double;
// GetRelatedViewDataRowInt returns it as a uint64_t.
// NOTE(review): what happens when the row is absent is not visible in this
// header -- confirm against the definitions.
64 static double GetRelatedViewDataRowDouble(
65 const ViewDataMap& view_data_map, const char* view_name,
66 size_t view_name_len, const std::vector<std::string>& tag_values);
67 static uint64_t GetRelatedViewDataRowInt(
68 const ViewDataMap& view_data_map, const char* view_name,
69 size_t view_name_len, const std::vector<std::string>& tag_values);
// Read-only access to the stored view descriptors.
72 const ViewDescriptorMap& view_descriptor_map() const {
73 return view_descriptor_map_;
// Backing storage for view_descriptor_map().
77 ViewDescriptorMap view_descriptor_map_;
// Census tag keys; judging by the member names, one each for the token, host,
// user ID, status and metric name tags (where they are initialized is not
// visible here).
79 ::opencensus::tags::TagKey tag_key_token_;
80 ::opencensus::tags::TagKey tag_key_host_;
81 ::opencensus::tags::TagKey tag_key_user_id_;
82 ::opencensus::tags::TagKey tag_key_status_;
83 ::opencensus::tags::TagKey tag_key_metric_name_;
86 // The default implementation fetches the real stats from Census.
// Derives from CensusViewProvider and overrides FetchViewData().
87 class CensusViewProviderDefaultImpl : public CensusViewProvider {
89 CensusViewProviderDefaultImpl();
91 ViewDataMap FetchViewData() override;
// The Census views owned by this provider, keyed by a string
// (presumably the view name, matching ViewDataMap -- confirm).
94 std::unordered_map<std::string, ::opencensus::stats::View> view_map_;
97 // The interface to get the CPU stats. Abstracted for mocking.
98 class CpuStatsProvider {
100 // The used and total amounts of CPU usage.
// NOTE(review): presumably `first` is the used amount and `second` the total,
// following the order of the sentence above -- confirm against the
// implementation.
101 using CpuStatsSample = std::pair<uint64_t, uint64_t>;
// Virtual so that an implementation can be destroyed through a
// CpuStatsProvider pointer.
103 virtual ~CpuStatsProvider() = default;
105 // Gets the cumulative used CPU and total CPU resource.
// Returns both values together as one CpuStatsSample.
106 virtual CpuStatsSample GetCpuStats() = 0;
109 // The default implementation reads CPU jiffies from the system to calculate CPU
// utilization.
// Derives from CpuStatsProvider and overrides GetCpuStats().
111 class CpuStatsProviderDefaultImpl : public CpuStatsProvider {
113 CpuStatsSample GetCpuStats() override;
116 // Maintains all the load data and load reporting streams.
119 // TODO(juanlishen): Allow config for providers from users.
// Constructs the reporter.
//   feedback_sample_window_seconds: length of the feedback sliding window, in
//     seconds; stored in feedback_sample_window_seconds_.
//   census_view_provider, cpu_stats_provider: ownership of both providers is
//     moved into the corresponding unique_ptr members.
120 LoadReporter(uint32_t feedback_sample_window_seconds,
121 std::unique_ptr<CensusViewProvider> census_view_provider,
122 std::unique_ptr<CpuStatsProvider> cpu_stats_provider)
123 : feedback_sample_window_seconds_(feedback_sample_window_seconds),
124 census_view_provider_(std::move(census_view_provider)),
125 cpu_stats_provider_(std::move(cpu_stats_provider)) {
126 // Append the initial record so that the next real record can have a base.
127 AppendNewFeedbackRecord(0, 0);
130 // Fetches the latest data from Census and merges it into the data store.
131 // Also adds a new sample to the LB feedback sliding window.
132 // Thread-unsafe. (1). The access to the load data store and feedback records
133 // has locking. (2). The access to the Census view provider and CPU stats
134 // provider lacks locking, but we only access these two members in this method
135 // (in testing, we also access them when setting up expectation). So the
136 // invocations of this method must be serialized.
// In short: callers must never run two invocations of this method
// concurrently.
137 void FetchAndSample();
139 // Generates a report for that host and balancer. The report contains
140 // all the stats data accumulated between the last report (i.e., the last
141 // consumption) and the last fetch from Census (i.e., the last production).
// hostname and lb_id select the host and the balancer the report is for.
// The loads are returned by value as a repeated field of
// ::grpc::lb::v1::Load messages.
143 ::google::protobuf::RepeatedPtrField<::grpc::lb::v1::Load> GenerateLoads(
144 const std::string& hostname, const std::string& lb_id);
146 // The feedback is calculated from the stats data recorded in the sliding
147 // window. Outdated records are discarded.
// Takes no arguments; the feedback is returned by value as a
// ::grpc::lb::v1::LoadBalancingFeedback message.
149 ::grpc::lb::v1::LoadBalancingFeedback GenerateLoadBalancingFeedback();
151 // Wrapper around LoadDataStore::ReportStreamCreated.
// NOTE(review): hostname, lb_id and load_key are presumably forwarded to the
// store unchanged; the definition is not visible in this header -- confirm.
153 void ReportStreamCreated(const std::string& hostname,
154 const std::string& lb_id,
155 const std::string& load_key);
157 // Wrapper around LoadDataStore::ReportStreamClosed.
// NOTE(review): hostname and lb_id are presumably forwarded to the store
// unchanged; the definition is not visible in this header -- confirm.
159 void ReportStreamClosed(const std::string& hostname,
160 const std::string& lb_id);
162 // Generates a unique LB ID of length kLbIdLength. Returns an empty string
163 // upon failure. Thread-safe.
// Callers should check the result for emptiness before using it as an ID.
// kLbIdLength is declared outside this header's visible text.
164 std::string GenerateLbId();
166 // Accessors only for testing.
// Both return non-owning raw pointers obtained from the unique_ptr members;
// the LoadReporter keeps ownership of the providers.
167 CensusViewProvider* census_view_provider() {
168 return census_view_provider_.get();
170 CpuStatsProvider* cpu_stats_provider() { return cpu_stats_provider_.get(); }
// One entry of the feedback sliding window; instances are kept in the
// feedback_records_ deque.
173 struct LoadBalancingFeedbackRecord {
// The time point IsRecordInWindow() compares against the window start.
174 std::chrono::system_clock::time_point end_time;
// Builds a record from its end time plus the RPC, error, CPU usage and CPU
// limit values passed in.
180 LoadBalancingFeedbackRecord(
181 const std::chrono::system_clock::time_point& end_time, uint64_t rpcs,
182 uint64_t errors, uint64_t cpu_usage, uint64_t cpu_limit)
183 : end_time(end_time),
186 cpu_usage(cpu_usage),
187 cpu_limit(cpu_limit) {}
// The three ProcessViewData* methods below each take, by const reference, a
// CensusViewProvider::ViewDataMap (the type FetchViewData() returns) and
// return nothing.
190 // Finds the view data about starting call from the view_data_map and merges
191 // the data to the load data store.
192 void ProcessViewDataCallStart(
193 const CensusViewProvider::ViewDataMap& view_data_map);
194 // Finds the view data about ending call from the view_data_map and merges the
195 // data to the load data store.
196 void ProcessViewDataCallEnd(
197 const CensusViewProvider::ViewDataMap& view_data_map);
198 // Finds the view data about the customized call metrics from the
199 // view_data_map and merges the data to the load data store.
200 void ProcessViewDataOtherCallMetrics(
201 const CensusViewProvider::ViewDataMap& view_data_map);
// Returns true if `record` ended strictly after the start of the sliding
// window, i.e. after `now` minus feedback_sample_window_seconds_. A record
// whose end_time falls exactly on the window start is treated as outside the
// window.
203 bool IsRecordInWindow(const LoadBalancingFeedbackRecord& record,
204 std::chrono::system_clock::time_point now) {
205 return record.end_time > now - feedback_sample_window_seconds_;
// Appends a new feedback record for the given RPC and error counts. The
// constructor calls this with (0, 0) to seed a base record.
// NOTE(review): presumably also samples the CPU stats and pushes onto
// feedback_records_; the definition is not visible here -- confirm.
208 void AppendNewFeedbackRecord(uint64_t rpcs, uint64_t errors);
210 // Extracts an OrphanedLoadIdentifier from the per-balancer store and attaches
// it to the load.
// `load` is the message that is modified; `per_balancer_store` is only read
// (it is taken by const reference).
212 void AttachOrphanLoadId(::grpc::lb::v1::Load* load,
213 const PerBalancerStore& per_balancer_store);
// Atomic counter starting at 0.
// NOTE(review): presumably the source of IDs for GenerateLbId() -- confirm.
215 std::atomic<int64_t> next_lb_id_{0};
// Length of the feedback sliding window; fixed at construction.
216 const std::chrono::seconds feedback_sample_window_seconds_;
// NOTE(review): presumably guards feedback_records_ -- confirm in the .cc.
217 grpc_core::Mutex feedback_mu_;
// The feedback sliding window. The constructor seeds it with one base record
// via AppendNewFeedbackRecord(0, 0).
218 std::deque<LoadBalancingFeedbackRecord> feedback_records_;
219 // TODO(juanlishen): Lock in finer grain. Locking the whole store may be
// too expensive.
// NOTE(review): presumably guards load_data_store_ -- confirm in the .cc.
221 grpc_core::Mutex store_mu_;
222 LoadDataStore load_data_store_;
// Owned providers, moved in by the constructor. Per the FetchAndSample()
// comment they are accessed without locking.
223 std::unique_ptr<CensusViewProvider> census_view_provider_;
224 std::unique_ptr<CpuStatsProvider> cpu_stats_provider_;
227 } // namespace load_reporter
230 #endif // GRPC_SRC_CPP_SERVER_LOAD_REPORTER_LOAD_REPORTER_H