3 * Copyright 2018 gRPC authors.
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
19 #include <grpc/impl/codegen/port_platform.h>
21 #include "src/cpp/server/load_reporter/load_data_store.h"
26 #include <gtest/gtest.h>
28 #include <grpc/grpc.h>
30 #include "test/core/util/port.h"
31 #include "test/core/util/test_config.h"
37 using ::grpc::load_reporter::CallMetricValue;
38 using ::grpc::load_reporter::kInvalidLbId;
39 using ::grpc::load_reporter::LoadDataStore;
40 using ::grpc::load_reporter::LoadRecordKey;
41 using ::grpc::load_reporter::LoadRecordValue;
42 using ::grpc::load_reporter::PerBalancerStore;
// Test fixture shared by the tests in this file. It provides string constants
// (hostnames, LB IDs, load keys, LB tags, users, client IPs, metric names),
// two LoadRecordKeys built from those constants, and small helpers.
44 class LoadDataStoreTest : public ::testing::Test {
47 : kKey1(kLbId1, kLbTag1, kUser1, kClientIp1),
48 kKey2(kLbId2, kLbTag2, kUser2, kClientIp2) {}
50 // Check whether per_balancer_stores contains a store which was originally
51 // created for <hostname, lb_id, and load_key>.
// The original store is looked up via FindPerBalancerStore(hostname, lb_id);
// its lb_id() and load_key() are checked against the arguments, and the set
// is then searched for that same store pointer.
52 bool PerBalancerStoresContains(
53 const LoadDataStore& load_data_store,
54 const std::set<PerBalancerStore*>* per_balancer_stores,
55 const std::string& hostname, const std::string& lb_id,
56 const std::string& load_key) {
57 auto original_per_balancer_store =
58 load_data_store.FindPerBalancerStore(hostname, lb_id);
59 EXPECT_NE(original_per_balancer_store, nullptr);
60 EXPECT_EQ(original_per_balancer_store->lb_id(), lb_id);
61 EXPECT_EQ(original_per_balancer_store->load_key(), load_key);
62 for (auto per_balancer_store : *per_balancer_stores) {
63 if (per_balancer_store == original_per_balancer_store) {
// Builds an LB ID string of the form "kLbId<index>".
70 std::string FormatLbId(size_t index) {
71 return "kLbId" + std::to_string(index);
// Constants used as test inputs.
74 const std::string kHostname1 = "kHostname1";
75 const std::string kHostname2 = "kHostname2";
76 const std::string kLbId1 = "kLbId1";
77 const std::string kLbId2 = "kLbId2";
78 const std::string kLbId3 = "kLbId3";
79 const std::string kLbId4 = "kLbId4";
80 const std::string kLoadKey1 = "kLoadKey1";
81 const std::string kLoadKey2 = "kLoadKey2";
82 const std::string kLbTag1 = "kLbTag1";
83 const std::string kLbTag2 = "kLbTag2";
84 const std::string kUser1 = "kUser1";
85 const std::string kUser2 = "kUser2";
86 const std::string kClientIp1 = "00";
87 const std::string kClientIp2 = "02";
88 const std::string kMetric1 = "kMetric1";
89 const std::string kMetric2 = "kMetric2";
// Keys initialized in the constructor from the (LbId, LbTag, User, ClientIp)
// constants with the matching numeric suffix.
90 const LoadRecordKey kKey1;
91 const LoadRecordKey kKey2;
// The PerBalancerStore tests reuse the LoadDataStoreTest fixture (and its
// constants) under their own test suite name.
94 using PerBalancerStoreTest = LoadDataStoreTest;
// After a stream is reported as created for <kHostname1, kLbId1, kLoadKey1>,
// the stores assigned to that same hostname and LB ID include the store
// created for it.
96 TEST_F(LoadDataStoreTest, AssignToSelf) {
97 LoadDataStore load_data_store;
98 load_data_store.ReportStreamCreated(kHostname1, kLbId1, kLoadKey1);
99 auto assigned_stores = load_data_store.GetAssignedStores(kHostname1, kLbId1);
100 EXPECT_TRUE(PerBalancerStoresContains(load_data_store, assigned_stores,
101 kHostname1, kLbId1, kLoadKey1));
// Creates three streams on kHostname1 and one on kHostname2, then closes the
// kHostname1 streams one at a time, checking after each close which LB the
// orphaned stores are assigned to.
104 TEST_F(LoadDataStoreTest, ReassignOrphanStores) {
105 LoadDataStore load_data_store;
106 load_data_store.ReportStreamCreated(kHostname1, kLbId1, kLoadKey1);
107 load_data_store.ReportStreamCreated(kHostname1, kLbId2, kLoadKey1);
108 load_data_store.ReportStreamCreated(kHostname1, kLbId3, kLoadKey2);
109 load_data_store.ReportStreamCreated(kHostname2, kLbId4, kLoadKey1);
110 // 1. Close the second stream.
111 load_data_store.ReportStreamClosed(kHostname1, kLbId2);
112 auto assigned_to_lb_id_1 =
113 load_data_store.GetAssignedStores(kHostname1, kLbId1);
114 // The orphaned store is re-assigned to kLbId1 with the same load key.
115 EXPECT_TRUE(PerBalancerStoresContains(load_data_store, assigned_to_lb_id_1,
116 kHostname1, kLbId1, kLoadKey1));
117 EXPECT_TRUE(PerBalancerStoresContains(load_data_store, assigned_to_lb_id_1,
118 kHostname1, kLbId2, kLoadKey1));
119 // 2. Close the first stream.
120 load_data_store.ReportStreamClosed(kHostname1, kLbId1);
121 auto assigned_to_lb_id_3 =
122 load_data_store.GetAssignedStores(kHostname1, kLbId3);
123 // The orphaned stores are re-assigned to kLbId3 with the same host,
124 // because there isn't any LB with the same load key.
125 EXPECT_TRUE(PerBalancerStoresContains(load_data_store, assigned_to_lb_id_3,
126 kHostname1, kLbId1, kLoadKey1));
127 EXPECT_TRUE(PerBalancerStoresContains(load_data_store, assigned_to_lb_id_3,
128 kHostname1, kLbId2, kLoadKey1));
129 EXPECT_TRUE(PerBalancerStoresContains(load_data_store, assigned_to_lb_id_3,
130 kHostname1, kLbId3, kLoadKey2));
131 // 3. Close the third stream.
132 load_data_store.ReportStreamClosed(kHostname1, kLbId3);
133 auto assigned_to_lb_id_4 =
134 load_data_store.GetAssignedStores(kHostname2, kLbId4);
135 // There is no active LB for the first host now. kLbId4 is active but
136 // it's for the second host, so it will NOT adopt the orphaned stores.
137 EXPECT_FALSE(PerBalancerStoresContains(load_data_store, assigned_to_lb_id_4,
138 kHostname1, kLbId1, kLoadKey1));
139 EXPECT_FALSE(PerBalancerStoresContains(load_data_store, assigned_to_lb_id_4,
140 kHostname1, kLbId2, kLoadKey1));
141 EXPECT_FALSE(PerBalancerStoresContains(load_data_store, assigned_to_lb_id_4,
142 kHostname1, kLbId3, kLoadKey2));
143 EXPECT_TRUE(PerBalancerStoresContains(load_data_store, assigned_to_lb_id_4,
144 kHostname2, kLbId4, kLoadKey1));
// Creates 1000 streams on one host and closes one picked with std::rand().
// Checks that the store orphaned by that close stays with the same assigned
// LB while 10 other streams are closed, and that it is found under exactly
// one active LB after the assigned LB is closed as well.
147 TEST_F(LoadDataStoreTest, OrphanAssignmentIsSticky) {
148 LoadDataStore load_data_store;
149 std::set<std::string> active_lb_ids;
150 size_t num_lb_ids = 1000;
151 for (size_t i = 0; i < num_lb_ids; ++i) {
152 load_data_store.ReportStreamCreated(kHostname1, FormatLbId(i), kLoadKey1);
153 active_lb_ids.insert(FormatLbId(i));
// Pick one LB ID and close its stream, orphaning its store.
155 std::string orphaned_lb_id = FormatLbId(std::rand() % num_lb_ids);
156 load_data_store.ReportStreamClosed(kHostname1, orphaned_lb_id);
157 active_lb_ids.erase(orphaned_lb_id);
158 // Find which LB is assigned the orphaned store.
159 std::string assigned_lb_id = "";
160 for (const auto& lb_id : active_lb_ids) {
161 if (PerBalancerStoresContains(
163 load_data_store.GetAssignedStores(kHostname1, lb_id), kHostname1,
164 orphaned_lb_id, kLoadKey1)) {
165 assigned_lb_id = lb_id;
169 EXPECT_STRNE(assigned_lb_id.c_str(), "");
170 // Close 10 more streams, skipping the assigned_lb_id. The assignment of
171 // orphaned_lb_id shouldn't change.
172 for (size_t _ = 0; _ < 10; ++_) {
173 std::string lb_id_to_close = "";
174 for (const auto& lb_id : active_lb_ids) {
175 if (lb_id != assigned_lb_id) {
176 lb_id_to_close = lb_id;
180 EXPECT_STRNE(lb_id_to_close.c_str(), "");
181 load_data_store.ReportStreamClosed(kHostname1, lb_id_to_close);
182 active_lb_ids.erase(lb_id_to_close);
183 EXPECT_TRUE(PerBalancerStoresContains(
185 load_data_store.GetAssignedStores(kHostname1, assigned_lb_id),
186 kHostname1, orphaned_lb_id, kLoadKey1));
188 // Close the assigned_lb_id, orphaned_lb_id will be re-assigned again.
189 load_data_store.ReportStreamClosed(kHostname1, assigned_lb_id);
190 active_lb_ids.erase(assigned_lb_id);
// Count how many of the remaining active LBs hold the orphaned store.
191 size_t orphaned_lb_id_occurences = 0;
192 for (const auto& lb_id : active_lb_ids) {
193 if (PerBalancerStoresContains(
195 load_data_store.GetAssignedStores(kHostname1, lb_id), kHostname1,
196 orphaned_lb_id, kLoadKey1)) {
197 orphaned_lb_id_occurences++;
200 EXPECT_EQ(orphaned_lb_id_occurences, 1U);
// Checks that the stores of a host are suspended when its only stream is
// closed, that rows merged while suspended are not recorded in
// load_record_map(), and that the stores are resumed and assigned to the new
// LB once a stream is created for that host again.
203 TEST_F(LoadDataStoreTest, HostTemporarilyLoseAllStreams) {
204 LoadDataStore load_data_store;
205 load_data_store.ReportStreamCreated(kHostname1, kLbId1, kLoadKey1);
206 load_data_store.ReportStreamCreated(kHostname2, kLbId2, kLoadKey1);
207 auto store_lb_id_1 = load_data_store.FindPerBalancerStore(kHostname1, kLbId1);
208 auto store_invalid_lb_id_1 =
209 load_data_store.FindPerBalancerStore(kHostname1, kInvalidLbId);
210 EXPECT_FALSE(store_lb_id_1->IsSuspended());
211 EXPECT_FALSE(store_invalid_lb_id_1->IsSuspended());
212 // Disconnect all the streams of the first host.
213 load_data_store.ReportStreamClosed(kHostname1, kLbId1);
214 // All the streams of that host are suspended.
215 EXPECT_TRUE(store_lb_id_1->IsSuspended());
216 EXPECT_TRUE(store_invalid_lb_id_1->IsSuspended());
217 // Detailed load data won't be kept when the PerBalancerStore is suspended.
218 store_lb_id_1->MergeRow(kKey1, LoadRecordValue());
219 store_invalid_lb_id_1->MergeRow(kKey1, LoadRecordValue());
220 EXPECT_EQ(store_lb_id_1->load_record_map().size(), 0U);
221 EXPECT_EQ(store_invalid_lb_id_1->load_record_map().size(), 0U);
222 // The stores for different hosts won't mix, even if the load key is the same.
223 auto assigned_to_lb_id_2 =
224 load_data_store.GetAssignedStores(kHostname2, kLbId2);
225 EXPECT_EQ(assigned_to_lb_id_2->size(), 2U);
226 EXPECT_TRUE(PerBalancerStoresContains(load_data_store, assigned_to_lb_id_2,
227 kHostname2, kLbId2, kLoadKey1));
228 EXPECT_TRUE(PerBalancerStoresContains(load_data_store, assigned_to_lb_id_2,
229 kHostname2, kInvalidLbId, ""));
230 // A new stream is created for the first host.
231 load_data_store.ReportStreamCreated(kHostname1, kLbId3, kLoadKey2);
232 // The stores for the first host are resumed.
233 EXPECT_FALSE(store_lb_id_1->IsSuspended());
234 EXPECT_FALSE(store_invalid_lb_id_1->IsSuspended());
// Rows merged after the resume are recorded again.
235 store_lb_id_1->MergeRow(kKey1, LoadRecordValue());
236 store_invalid_lb_id_1->MergeRow(kKey1, LoadRecordValue());
237 EXPECT_EQ(store_lb_id_1->load_record_map().size(), 1U);
238 EXPECT_EQ(store_invalid_lb_id_1->load_record_map().size(), 1U);
239 // The resumed stores are assigned to the new LB.
240 auto assigned_to_lb_id_3 =
241 load_data_store.GetAssignedStores(kHostname1, kLbId3);
242 EXPECT_EQ(assigned_to_lb_id_3->size(), 3U);
243 EXPECT_TRUE(PerBalancerStoresContains(load_data_store, assigned_to_lb_id_3,
244 kHostname1, kLbId1, kLoadKey1));
245 EXPECT_TRUE(PerBalancerStoresContains(load_data_store, assigned_to_lb_id_3,
246 kHostname1, kInvalidLbId, ""));
247 EXPECT_TRUE(PerBalancerStoresContains(load_data_store, assigned_to_lb_id_3,
248 kHostname1, kLbId3, kLoadKey2));
// Checks that FindPerBalancerStore returns nullptr before any stream is
// reported, and that each ReportStreamCreated yields a store for the given LB
// ID plus a distinct store for kInvalidLbId on the same host.
251 TEST_F(LoadDataStoreTest, OneStorePerLbId) {
252 LoadDataStore load_data_store;
// Nothing has been reported yet, so no store can be found.
253 EXPECT_EQ(load_data_store.FindPerBalancerStore(kHostname1, kLbId1), nullptr);
254 EXPECT_EQ(load_data_store.FindPerBalancerStore(kHostname1, kInvalidLbId),
256 EXPECT_EQ(load_data_store.FindPerBalancerStore(kHostname2, kLbId2), nullptr);
257 EXPECT_EQ(load_data_store.FindPerBalancerStore(kHostname2, kLbId3), nullptr);
258 // Create the first stream.
259 load_data_store.ReportStreamCreated(kHostname1, kLbId1, kLoadKey1);
260 auto store_lb_id_1 = load_data_store.FindPerBalancerStore(kHostname1, kLbId1);
261 auto store_invalid_lb_id_1 =
262 load_data_store.FindPerBalancerStore(kHostname1, kInvalidLbId);
263 // Two stores will be created: one is for the stream; the other one is for
// kInvalidLbId on the same host.
265 EXPECT_NE(store_lb_id_1, nullptr);
266 EXPECT_NE(store_invalid_lb_id_1, nullptr);
267 EXPECT_NE(store_lb_id_1, store_invalid_lb_id_1);
268 EXPECT_EQ(load_data_store.FindPerBalancerStore(kHostname2, kLbId2), nullptr);
269 EXPECT_EQ(load_data_store.FindPerBalancerStore(kHostname2, kLbId3), nullptr);
270 // Create the second stream.
271 load_data_store.ReportStreamCreated(kHostname2, kLbId3, kLoadKey1);
272 auto store_lb_id_3 = load_data_store.FindPerBalancerStore(kHostname2, kLbId3);
273 auto store_invalid_lb_id_2 =
274 load_data_store.FindPerBalancerStore(kHostname2, kInvalidLbId);
275 EXPECT_NE(store_lb_id_3, nullptr);
276 EXPECT_NE(store_invalid_lb_id_2, nullptr);
277 EXPECT_NE(store_lb_id_3, store_invalid_lb_id_2);
278 // The PerBalancerStores created for different hosts are independent.
279 EXPECT_NE(store_lb_id_3, store_invalid_lb_id_1);
280 EXPECT_NE(store_invalid_lb_id_2, store_invalid_lb_id_1);
281 EXPECT_EQ(load_data_store.FindPerBalancerStore(kHostname2, kLbId2), nullptr);
// Creates 100 streams on one host, closes the first 50, and checks that
// across the stores assigned to the remaining LBs every lb_id appears exactly
// once and that all created LB IDs plus kInvalidLbId are covered.
284 TEST_F(LoadDataStoreTest, ExactlyOnceAssignment) {
285 LoadDataStore load_data_store;
286 size_t num_create = 100;
287 size_t num_close = 50;
288 for (size_t i = 0; i < num_create; ++i) {
289 load_data_store.ReportStreamCreated(kHostname1, FormatLbId(i), kLoadKey1);
291 for (size_t i = 0; i < num_close; ++i) {
292 load_data_store.ReportStreamClosed(kHostname1, FormatLbId(i));
// Collect the lb_id of every store assigned to a still-open LB; a failed
// set insertion means the same store was assigned more than once.
294 std::set<std::string> reported_lb_ids;
295 for (size_t i = num_close; i < num_create; ++i) {
296 for (auto assigned_store :
297 *load_data_store.GetAssignedStores(kHostname1, FormatLbId(i))) {
298 EXPECT_TRUE(reported_lb_ids.insert(assigned_store->lb_id()).second);
301 // Add one for kInvalidLbId.
302 EXPECT_EQ(reported_lb_ids.size(), (num_create + 1));
303 EXPECT_NE(reported_lb_ids.find(kInvalidLbId), reported_lb_ids.end());
// Checks that merging rows for LB IDs that have no reported stream marks
// those IDs as tracked unknown balancer IDs without creating a
// PerBalancerStore for them, and that an ID stops being tracked once all of
// its started calls are reported as ended (kLbId2) but stays tracked when
// only some of them are (kLbId3).
306 TEST_F(LoadDataStoreTest, UnknownBalancerIdTracking) {
307 LoadDataStore load_data_store;
308 load_data_store.ReportStreamCreated(kHostname1, kLbId1, kLoadKey1);
309 // Merge data for a known LB ID.
310 LoadRecordValue v1(192);
311 load_data_store.MergeRow(kHostname1, kKey1, v1);
312 // Merge data for unknown LB ID.
313 LoadRecordValue v2(23);
314 EXPECT_FALSE(load_data_store.IsTrackedUnknownBalancerId(kLbId2));
315 load_data_store.MergeRow(
316 kHostname1, LoadRecordKey(kLbId2, kLbTag1, kUser1, kClientIp1), v2);
317 EXPECT_TRUE(load_data_store.IsTrackedUnknownBalancerId(kLbId2));
318 LoadRecordValue v3(952);
319 load_data_store.MergeRow(
320 kHostname2, LoadRecordKey(kLbId3, kLbTag1, kUser1, kClientIp1), v3);
321 EXPECT_TRUE(load_data_store.IsTrackedUnknownBalancerId(kLbId3));
322 // The data kept for a known LB ID is correct.
323 auto store_lb_id_1 = load_data_store.FindPerBalancerStore(kHostname1, kLbId1);
324 EXPECT_EQ(store_lb_id_1->load_record_map().size(), 1U);
325 EXPECT_EQ(store_lb_id_1->load_record_map().find(kKey1)->second.start_count(),
327 EXPECT_EQ(store_lb_id_1->GetNumCallsInProgressForReport(), v1.start_count());
328 // No PerBalancerStore created for Unknown LB ID.
329 EXPECT_EQ(load_data_store.FindPerBalancerStore(kHostname1, kLbId2), nullptr);
330 EXPECT_EQ(load_data_store.FindPerBalancerStore(kHostname2, kLbId3), nullptr);
331 // End all the started RPCs for kLbId1.
332 LoadRecordValue v4(0, v1.start_count());
333 load_data_store.MergeRow(kHostname1, kKey1, v4);
334 EXPECT_EQ(store_lb_id_1->load_record_map().size(), 1U);
335 EXPECT_EQ(store_lb_id_1->load_record_map().find(kKey1)->second.start_count(),
337 EXPECT_EQ(store_lb_id_1->load_record_map().find(kKey1)->second.ok_count(),
339 EXPECT_EQ(store_lb_id_1->GetNumCallsInProgressForReport(), 0U);
340 EXPECT_FALSE(load_data_store.IsTrackedUnknownBalancerId(kLbId1));
341 // End all the started RPCs for kLbId2.
342 LoadRecordValue v5(0, v2.start_count());
343 load_data_store.MergeRow(
344 kHostname1, LoadRecordKey(kLbId2, kLbTag1, kUser1, kClientIp1), v5);
345 EXPECT_FALSE(load_data_store.IsTrackedUnknownBalancerId(kLbId2));
346 // End some of the started RPCs for kLbId3.
347 LoadRecordValue v6(0, v3.start_count() / 2);
348 load_data_store.MergeRow(
349 kHostname2, LoadRecordKey(kLbId3, kLbTag1, kUser1, kClientIp1), v6);
350 EXPECT_TRUE(load_data_store.IsTrackedUnknownBalancerId(kLbId3));
// Checks that a PerBalancerStore's load_record_map() is empty after
// Suspend(), stays empty for rows merged while suspended, records rows merged
// after Resume(), and that the reported in-progress call count accounts for
// every merged row, including those merged while suspended.
353 TEST_F(PerBalancerStoreTest, Suspend) {
354 PerBalancerStore per_balancer_store(kLbId1, kLoadKey1);
355 EXPECT_FALSE(per_balancer_store.IsSuspended());
356 // Suspend the store.
357 per_balancer_store.Suspend();
358 EXPECT_TRUE(per_balancer_store.IsSuspended());
359 EXPECT_EQ(0U, per_balancer_store.load_record_map().size());
360 // Data merged when the store is suspended won't be kept.
361 LoadRecordValue v1(139, 19);
362 per_balancer_store.MergeRow(kKey1, v1);
363 EXPECT_EQ(0U, per_balancer_store.load_record_map().size());
// Resume the store.
365 per_balancer_store.Resume();
366 EXPECT_FALSE(per_balancer_store.IsSuspended());
367 EXPECT_EQ(0U, per_balancer_store.load_record_map().size());
368 // Data merged after the store is resumed will be kept.
369 LoadRecordValue v2(23, 0, 51);
370 per_balancer_store.MergeRow(kKey1, v2);
371 EXPECT_EQ(1U, per_balancer_store.load_record_map().size());
372 // Suspend the store.
373 per_balancer_store.Suspend();
374 EXPECT_TRUE(per_balancer_store.IsSuspended());
375 EXPECT_EQ(0U, per_balancer_store.load_record_map().size());
376 // Data merged when the store is suspended won't be kept.
377 LoadRecordValue v3(62, 11);
378 per_balancer_store.MergeRow(kKey1, v3);
379 EXPECT_EQ(0U, per_balancer_store.load_record_map().size());
// Resume the store.
381 per_balancer_store.Resume();
382 EXPECT_FALSE(per_balancer_store.IsSuspended());
383 EXPECT_EQ(0U, per_balancer_store.load_record_map().size());
384 // Data merged after the store is resumed will be kept.
385 LoadRecordValue v4(225, 98);
386 per_balancer_store.MergeRow(kKey1, v4);
387 EXPECT_EQ(1U, per_balancer_store.load_record_map().size());
388 // In-progress count is always kept.
389 EXPECT_EQ(per_balancer_store.GetNumCallsInProgressForReport(),
390 v1.start_count() - v1.ok_count() + v2.start_count() -
391 v2.error_count() + v3.start_count() - v3.ok_count() +
392 v4.start_count() - v4.ok_count());
// Merges three LoadRecordValues under two keys (v1 and v3 under kKey1, v2
// under kKey2). Checks the reported in-progress call count and its "changed
// since last report" flag after each merge, then checks the per-key
// aggregated counters and call metrics.
395 TEST_F(PerBalancerStoreTest, DataAggregation) {
396 PerBalancerStore per_balancer_store(kLbId1, kLoadKey1);
397 // Construct some Values.
398 LoadRecordValue v1(992, 34, 13, 234, 164, 173467);
399 v1.InsertCallMetric(kMetric1, CallMetricValue(3, 2773.2));
400 LoadRecordValue v2(4842, 213, 9, 393, 974, 1345);
401 v2.InsertCallMetric(kMetric1, CallMetricValue(7, 25.234));
402 v2.InsertCallMetric(kMetric2, CallMetricValue(2, 387.08));
403 // v3 doesn't change the number of in-progress RPCs.
404 LoadRecordValue v3(293, 55, 293 - 55, 28764, 5284, 5772);
405 v3.InsertCallMetric(kMetric1, CallMetricValue(61, 3465.0));
406 v3.InsertCallMetric(kMetric2, CallMetricValue(13, 672.0));
407 // The initial state of the store.
408 uint64_t num_calls_in_progress = 0;
409 EXPECT_FALSE(per_balancer_store.IsNumCallsInProgressChangedSinceLastReport());
410 EXPECT_EQ(per_balancer_store.GetNumCallsInProgressForReport(),
411 num_calls_in_progress);
412 // Merge v1 and get report of the number of in-progress calls.
413 per_balancer_store.MergeRow(kKey1, v1);
414 EXPECT_TRUE(per_balancer_store.IsNumCallsInProgressChangedSinceLastReport());
415 EXPECT_EQ(per_balancer_store.GetNumCallsInProgressForReport(),
416 num_calls_in_progress +=
417 (v1.start_count() - v1.ok_count() - v1.error_count()));
418 EXPECT_FALSE(per_balancer_store.IsNumCallsInProgressChangedSinceLastReport());
419 // Merge v2 and get report of the number of in-progress calls.
420 per_balancer_store.MergeRow(kKey2, v2);
421 EXPECT_TRUE(per_balancer_store.IsNumCallsInProgressChangedSinceLastReport());
422 EXPECT_EQ(per_balancer_store.GetNumCallsInProgressForReport(),
423 num_calls_in_progress +=
424 (v2.start_count() - v2.ok_count() - v2.error_count()));
425 EXPECT_FALSE(per_balancer_store.IsNumCallsInProgressChangedSinceLastReport());
426 // Merge v3 and get report of the number of in-progress calls.
427 per_balancer_store.MergeRow(kKey1, v3);
428 EXPECT_FALSE(per_balancer_store.IsNumCallsInProgressChangedSinceLastReport());
429 EXPECT_EQ(per_balancer_store.GetNumCallsInProgressForReport(),
430 num_calls_in_progress);
431 // LoadRecordValue for kKey1 is aggregated correctly.
432 LoadRecordValue value_for_key1 =
433 per_balancer_store.load_record_map().find(kKey1)->second;
434 EXPECT_EQ(value_for_key1.start_count(), v1.start_count() + v3.start_count());
435 EXPECT_EQ(value_for_key1.ok_count(), v1.ok_count() + v3.ok_count());
436 EXPECT_EQ(value_for_key1.error_count(), v1.error_count() + v3.error_count());
437 EXPECT_EQ(value_for_key1.bytes_sent(), v1.bytes_sent() + v3.bytes_sent());
438 EXPECT_EQ(value_for_key1.bytes_recv(), v1.bytes_recv() + v3.bytes_recv());
439 EXPECT_EQ(value_for_key1.latency_ms(), v1.latency_ms() + v3.latency_ms());
440 EXPECT_EQ(value_for_key1.call_metrics().size(), 2U);
441 EXPECT_EQ(value_for_key1.call_metrics().find(kMetric1)->second.num_calls(),
442 v1.call_metrics().find(kMetric1)->second.num_calls() +
443 v3.call_metrics().find(kMetric1)->second.num_calls());
445 value_for_key1.call_metrics().find(kMetric1)->second.total_metric_value(),
446 v1.call_metrics().find(kMetric1)->second.total_metric_value() +
447 v3.call_metrics().find(kMetric1)->second.total_metric_value());
448 EXPECT_EQ(value_for_key1.call_metrics().find(kMetric2)->second.num_calls(),
449 v3.call_metrics().find(kMetric2)->second.num_calls());
451 value_for_key1.call_metrics().find(kMetric2)->second.total_metric_value(),
452 v3.call_metrics().find(kMetric2)->second.total_metric_value());
453 // LoadRecordValue for kKey2 is aggregated (trivially) correctly.
454 LoadRecordValue value_for_key2 =
455 per_balancer_store.load_record_map().find(kKey2)->second;
456 EXPECT_EQ(value_for_key2.start_count(), v2.start_count());
457 EXPECT_EQ(value_for_key2.ok_count(), v2.ok_count());
458 EXPECT_EQ(value_for_key2.error_count(), v2.error_count());
459 EXPECT_EQ(value_for_key2.bytes_sent(), v2.bytes_sent());
460 EXPECT_EQ(value_for_key2.bytes_recv(), v2.bytes_recv());
461 EXPECT_EQ(value_for_key2.latency_ms(), v2.latency_ms());
462 EXPECT_EQ(value_for_key2.call_metrics().size(), 2U);
463 EXPECT_EQ(value_for_key2.call_metrics().find(kMetric1)->second.num_calls(),
464 v2.call_metrics().find(kMetric1)->second.num_calls());
466 value_for_key2.call_metrics().find(kMetric1)->second.total_metric_value(),
467 v2.call_metrics().find(kMetric1)->second.total_metric_value());
468 EXPECT_EQ(value_for_key2.call_metrics().find(kMetric2)->second.num_calls(),
469 v2.call_metrics().find(kMetric2)->second.num_calls());
471 value_for_key2.call_metrics().find(kMetric2)->second.total_metric_value(),
472 v2.call_metrics().find(kMetric2)->second.total_metric_value());
476 } // namespace testing
// Test entry point: constructs the gRPC test environment, initializes
// GoogleTest with the command-line arguments, and returns the result of
// running all registered tests.
479 int main(int argc, char** argv) {
480 grpc::testing::TestEnvironment env(argc, argv);
481 ::testing::InitGoogleTest(&argc, argv);
482 return RUN_ALL_TESTS();