Imported Upstream version 1.41.0
[platform/upstream/grpc.git] / test / cpp / server / load_reporter / load_data_store_test.cc
1 /*
2  *
3  * Copyright 2018 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18
19 #include <grpc/impl/codegen/port_platform.h>
20
21 #include "src/cpp/server/load_reporter/load_data_store.h"
22
23 #include <set>
24 #include <vector>
25
26 #include <gtest/gtest.h>
27
28 #include <grpc/grpc.h>
29
30 #include "test/core/util/port.h"
31 #include "test/core/util/test_config.h"
32
33 namespace grpc {
34 namespace testing {
35 namespace {
36
37 using ::grpc::load_reporter::CallMetricValue;
38 using ::grpc::load_reporter::kInvalidLbId;
39 using ::grpc::load_reporter::LoadDataStore;
40 using ::grpc::load_reporter::LoadRecordKey;
41 using ::grpc::load_reporter::LoadRecordValue;
42 using ::grpc::load_reporter::PerBalancerStore;
43
44 class LoadDataStoreTest : public ::testing::Test {
45  public:
46   LoadDataStoreTest()
47       : kKey1(kLbId1, kLbTag1, kUser1, kClientIp1),
48         kKey2(kLbId2, kLbTag2, kUser2, kClientIp2) {}
49
50   // Check whether per_balancer_stores contains a store which was originally
51   // created for <hostname, lb_id, and load_key>.
52   bool PerBalancerStoresContains(
53       const LoadDataStore& load_data_store,
54       const std::set<PerBalancerStore*>* per_balancer_stores,
55       const std::string& hostname, const std::string& lb_id,
56       const std::string& load_key) {
57     auto original_per_balancer_store =
58         load_data_store.FindPerBalancerStore(hostname, lb_id);
59     EXPECT_NE(original_per_balancer_store, nullptr);
60     EXPECT_EQ(original_per_balancer_store->lb_id(), lb_id);
61     EXPECT_EQ(original_per_balancer_store->load_key(), load_key);
62     for (auto per_balancer_store : *per_balancer_stores) {
63       if (per_balancer_store == original_per_balancer_store) {
64         return true;
65       }
66     }
67     return false;
68   }
69
70   std::string FormatLbId(size_t index) {
71     return "kLbId" + std::to_string(index);
72   }
73
74   const std::string kHostname1 = "kHostname1";
75   const std::string kHostname2 = "kHostname2";
76   const std::string kLbId1 = "kLbId1";
77   const std::string kLbId2 = "kLbId2";
78   const std::string kLbId3 = "kLbId3";
79   const std::string kLbId4 = "kLbId4";
80   const std::string kLoadKey1 = "kLoadKey1";
81   const std::string kLoadKey2 = "kLoadKey2";
82   const std::string kLbTag1 = "kLbTag1";
83   const std::string kLbTag2 = "kLbTag2";
84   const std::string kUser1 = "kUser1";
85   const std::string kUser2 = "kUser2";
86   const std::string kClientIp1 = "00";
87   const std::string kClientIp2 = "02";
88   const std::string kMetric1 = "kMetric1";
89   const std::string kMetric2 = "kMetric2";
90   const LoadRecordKey kKey1;
91   const LoadRecordKey kKey2;
92 };
93
94 using PerBalancerStoreTest = LoadDataStoreTest;
95
96 TEST_F(LoadDataStoreTest, AssignToSelf) {
97   LoadDataStore load_data_store;
98   load_data_store.ReportStreamCreated(kHostname1, kLbId1, kLoadKey1);
99   auto assigned_stores = load_data_store.GetAssignedStores(kHostname1, kLbId1);
100   EXPECT_TRUE(PerBalancerStoresContains(load_data_store, assigned_stores,
101                                         kHostname1, kLbId1, kLoadKey1));
102 }
103
104 TEST_F(LoadDataStoreTest, ReassignOrphanStores) {
105   LoadDataStore load_data_store;
106   load_data_store.ReportStreamCreated(kHostname1, kLbId1, kLoadKey1);
107   load_data_store.ReportStreamCreated(kHostname1, kLbId2, kLoadKey1);
108   load_data_store.ReportStreamCreated(kHostname1, kLbId3, kLoadKey2);
109   load_data_store.ReportStreamCreated(kHostname2, kLbId4, kLoadKey1);
110   // 1. Close the second stream.
111   load_data_store.ReportStreamClosed(kHostname1, kLbId2);
112   auto assigned_to_lb_id_1 =
113       load_data_store.GetAssignedStores(kHostname1, kLbId1);
114   // The orphaned store is re-assigned to kLbId1 with the same load key.
115   EXPECT_TRUE(PerBalancerStoresContains(load_data_store, assigned_to_lb_id_1,
116                                         kHostname1, kLbId1, kLoadKey1));
117   EXPECT_TRUE(PerBalancerStoresContains(load_data_store, assigned_to_lb_id_1,
118                                         kHostname1, kLbId2, kLoadKey1));
119   // 2. Close the first stream.
120   load_data_store.ReportStreamClosed(kHostname1, kLbId1);
121   auto assigned_to_lb_id_3 =
122       load_data_store.GetAssignedStores(kHostname1, kLbId3);
123   // The orphaned stores are re-assigned to kLbId3 with the same host,
124   // because there isn't any LB with the same load key.
125   EXPECT_TRUE(PerBalancerStoresContains(load_data_store, assigned_to_lb_id_3,
126                                         kHostname1, kLbId1, kLoadKey1));
127   EXPECT_TRUE(PerBalancerStoresContains(load_data_store, assigned_to_lb_id_3,
128                                         kHostname1, kLbId2, kLoadKey1));
129   EXPECT_TRUE(PerBalancerStoresContains(load_data_store, assigned_to_lb_id_3,
130                                         kHostname1, kLbId3, kLoadKey2));
131   // 3. Close the third stream.
132   load_data_store.ReportStreamClosed(kHostname1, kLbId3);
133   auto assigned_to_lb_id_4 =
134       load_data_store.GetAssignedStores(kHostname2, kLbId4);
135   // There is no active LB for the first host now. kLbId4 is active but
136   // it's for the second host, so it wll NOT adopt the orphaned stores.
137   EXPECT_FALSE(PerBalancerStoresContains(load_data_store, assigned_to_lb_id_4,
138                                          kHostname1, kLbId1, kLoadKey1));
139   EXPECT_FALSE(PerBalancerStoresContains(load_data_store, assigned_to_lb_id_4,
140                                          kHostname1, kLbId2, kLoadKey1));
141   EXPECT_FALSE(PerBalancerStoresContains(load_data_store, assigned_to_lb_id_4,
142                                          kHostname1, kLbId3, kLoadKey2));
143   EXPECT_TRUE(PerBalancerStoresContains(load_data_store, assigned_to_lb_id_4,
144                                         kHostname2, kLbId4, kLoadKey1));
145 }
146
147 TEST_F(LoadDataStoreTest, OrphanAssignmentIsSticky) {
148   LoadDataStore load_data_store;
149   std::set<std::string> active_lb_ids;
150   size_t num_lb_ids = 1000;
151   for (size_t i = 0; i < num_lb_ids; ++i) {
152     load_data_store.ReportStreamCreated(kHostname1, FormatLbId(i), kLoadKey1);
153     active_lb_ids.insert(FormatLbId(i));
154   }
155   std::string orphaned_lb_id = FormatLbId(std::rand() % num_lb_ids);
156   load_data_store.ReportStreamClosed(kHostname1, orphaned_lb_id);
157   active_lb_ids.erase(orphaned_lb_id);
158   // Find which LB is assigned the orphaned store.
159   std::string assigned_lb_id = "";
160   for (const auto& lb_id : active_lb_ids) {
161     if (PerBalancerStoresContains(
162             load_data_store,
163             load_data_store.GetAssignedStores(kHostname1, lb_id), kHostname1,
164             orphaned_lb_id, kLoadKey1)) {
165       assigned_lb_id = lb_id;
166       break;
167     }
168   }
169   EXPECT_STRNE(assigned_lb_id.c_str(), "");
170   // Close 10 more stream, skipping the assigned_lb_id. The assignment of
171   // orphaned_lb_id shouldn't change.
172   for (size_t _ = 0; _ < 10; ++_) {
173     std::string lb_id_to_close = "";
174     for (const auto& lb_id : active_lb_ids) {
175       if (lb_id != assigned_lb_id) {
176         lb_id_to_close = lb_id;
177         break;
178       }
179     }
180     EXPECT_STRNE(lb_id_to_close.c_str(), "");
181     load_data_store.ReportStreamClosed(kHostname1, lb_id_to_close);
182     active_lb_ids.erase(lb_id_to_close);
183     EXPECT_TRUE(PerBalancerStoresContains(
184         load_data_store,
185         load_data_store.GetAssignedStores(kHostname1, assigned_lb_id),
186         kHostname1, orphaned_lb_id, kLoadKey1));
187   }
188   // Close the assigned_lb_id, orphaned_lb_id will be re-assigned again.
189   load_data_store.ReportStreamClosed(kHostname1, assigned_lb_id);
190   active_lb_ids.erase(assigned_lb_id);
191   size_t orphaned_lb_id_occurences = 0;
192   for (const auto& lb_id : active_lb_ids) {
193     if (PerBalancerStoresContains(
194             load_data_store,
195             load_data_store.GetAssignedStores(kHostname1, lb_id), kHostname1,
196             orphaned_lb_id, kLoadKey1)) {
197       orphaned_lb_id_occurences++;
198     }
199   }
200   EXPECT_EQ(orphaned_lb_id_occurences, 1U);
201 }
202
203 TEST_F(LoadDataStoreTest, HostTemporarilyLoseAllStreams) {
204   LoadDataStore load_data_store;
205   load_data_store.ReportStreamCreated(kHostname1, kLbId1, kLoadKey1);
206   load_data_store.ReportStreamCreated(kHostname2, kLbId2, kLoadKey1);
207   auto store_lb_id_1 = load_data_store.FindPerBalancerStore(kHostname1, kLbId1);
208   auto store_invalid_lb_id_1 =
209       load_data_store.FindPerBalancerStore(kHostname1, kInvalidLbId);
210   EXPECT_FALSE(store_lb_id_1->IsSuspended());
211   EXPECT_FALSE(store_invalid_lb_id_1->IsSuspended());
212   // Disconnect all the streams of the first host.
213   load_data_store.ReportStreamClosed(kHostname1, kLbId1);
214   // All the streams of that host are suspended.
215   EXPECT_TRUE(store_lb_id_1->IsSuspended());
216   EXPECT_TRUE(store_invalid_lb_id_1->IsSuspended());
217   // Detailed load data won't be kept when the PerBalancerStore is suspended.
218   store_lb_id_1->MergeRow(kKey1, LoadRecordValue());
219   store_invalid_lb_id_1->MergeRow(kKey1, LoadRecordValue());
220   EXPECT_EQ(store_lb_id_1->load_record_map().size(), 0U);
221   EXPECT_EQ(store_invalid_lb_id_1->load_record_map().size(), 0U);
222   // The stores for different hosts won't mix, even if the load key is the same.
223   auto assigned_to_lb_id_2 =
224       load_data_store.GetAssignedStores(kHostname2, kLbId2);
225   EXPECT_EQ(assigned_to_lb_id_2->size(), 2U);
226   EXPECT_TRUE(PerBalancerStoresContains(load_data_store, assigned_to_lb_id_2,
227                                         kHostname2, kLbId2, kLoadKey1));
228   EXPECT_TRUE(PerBalancerStoresContains(load_data_store, assigned_to_lb_id_2,
229                                         kHostname2, kInvalidLbId, ""));
230   // A new stream is created for the first host.
231   load_data_store.ReportStreamCreated(kHostname1, kLbId3, kLoadKey2);
232   // The stores for the first host are resumed.
233   EXPECT_FALSE(store_lb_id_1->IsSuspended());
234   EXPECT_FALSE(store_invalid_lb_id_1->IsSuspended());
235   store_lb_id_1->MergeRow(kKey1, LoadRecordValue());
236   store_invalid_lb_id_1->MergeRow(kKey1, LoadRecordValue());
237   EXPECT_EQ(store_lb_id_1->load_record_map().size(), 1U);
238   EXPECT_EQ(store_invalid_lb_id_1->load_record_map().size(), 1U);
239   // The resumed stores are assigned to the new LB.
240   auto assigned_to_lb_id_3 =
241       load_data_store.GetAssignedStores(kHostname1, kLbId3);
242   EXPECT_EQ(assigned_to_lb_id_3->size(), 3U);
243   EXPECT_TRUE(PerBalancerStoresContains(load_data_store, assigned_to_lb_id_3,
244                                         kHostname1, kLbId1, kLoadKey1));
245   EXPECT_TRUE(PerBalancerStoresContains(load_data_store, assigned_to_lb_id_3,
246                                         kHostname1, kInvalidLbId, ""));
247   EXPECT_TRUE(PerBalancerStoresContains(load_data_store, assigned_to_lb_id_3,
248                                         kHostname1, kLbId3, kLoadKey2));
249 }
250
251 TEST_F(LoadDataStoreTest, OneStorePerLbId) {
252   LoadDataStore load_data_store;
253   EXPECT_EQ(load_data_store.FindPerBalancerStore(kHostname1, kLbId1), nullptr);
254   EXPECT_EQ(load_data_store.FindPerBalancerStore(kHostname1, kInvalidLbId),
255             nullptr);
256   EXPECT_EQ(load_data_store.FindPerBalancerStore(kHostname2, kLbId2), nullptr);
257   EXPECT_EQ(load_data_store.FindPerBalancerStore(kHostname2, kLbId3), nullptr);
258   // Create The first stream.
259   load_data_store.ReportStreamCreated(kHostname1, kLbId1, kLoadKey1);
260   auto store_lb_id_1 = load_data_store.FindPerBalancerStore(kHostname1, kLbId1);
261   auto store_invalid_lb_id_1 =
262       load_data_store.FindPerBalancerStore(kHostname1, kInvalidLbId);
263   // Two stores will be created: one is for the stream; the other one is for
264   // kInvalidLbId.
265   EXPECT_NE(store_lb_id_1, nullptr);
266   EXPECT_NE(store_invalid_lb_id_1, nullptr);
267   EXPECT_NE(store_lb_id_1, store_invalid_lb_id_1);
268   EXPECT_EQ(load_data_store.FindPerBalancerStore(kHostname2, kLbId2), nullptr);
269   EXPECT_EQ(load_data_store.FindPerBalancerStore(kHostname2, kLbId3), nullptr);
270   // Create the second stream.
271   load_data_store.ReportStreamCreated(kHostname2, kLbId3, kLoadKey1);
272   auto store_lb_id_3 = load_data_store.FindPerBalancerStore(kHostname2, kLbId3);
273   auto store_invalid_lb_id_2 =
274       load_data_store.FindPerBalancerStore(kHostname2, kInvalidLbId);
275   EXPECT_NE(store_lb_id_3, nullptr);
276   EXPECT_NE(store_invalid_lb_id_2, nullptr);
277   EXPECT_NE(store_lb_id_3, store_invalid_lb_id_2);
278   // The PerBalancerStores created for different hosts are independent.
279   EXPECT_NE(store_lb_id_3, store_invalid_lb_id_1);
280   EXPECT_NE(store_invalid_lb_id_2, store_invalid_lb_id_1);
281   EXPECT_EQ(load_data_store.FindPerBalancerStore(kHostname2, kLbId2), nullptr);
282 }
283
284 TEST_F(LoadDataStoreTest, ExactlyOnceAssignment) {
285   LoadDataStore load_data_store;
286   size_t num_create = 100;
287   size_t num_close = 50;
288   for (size_t i = 0; i < num_create; ++i) {
289     load_data_store.ReportStreamCreated(kHostname1, FormatLbId(i), kLoadKey1);
290   }
291   for (size_t i = 0; i < num_close; ++i) {
292     load_data_store.ReportStreamClosed(kHostname1, FormatLbId(i));
293   }
294   std::set<std::string> reported_lb_ids;
295   for (size_t i = num_close; i < num_create; ++i) {
296     for (auto assigned_store :
297          *load_data_store.GetAssignedStores(kHostname1, FormatLbId(i))) {
298       EXPECT_TRUE(reported_lb_ids.insert(assigned_store->lb_id()).second);
299     }
300   }
301   // Add one for kInvalidLbId.
302   EXPECT_EQ(reported_lb_ids.size(), (num_create + 1));
303   EXPECT_NE(reported_lb_ids.find(kInvalidLbId), reported_lb_ids.end());
304 }
305
306 TEST_F(LoadDataStoreTest, UnknownBalancerIdTracking) {
307   LoadDataStore load_data_store;
308   load_data_store.ReportStreamCreated(kHostname1, kLbId1, kLoadKey1);
309   // Merge data for a known LB ID.
310   LoadRecordValue v1(192);
311   load_data_store.MergeRow(kHostname1, kKey1, v1);
312   // Merge data for unknown LB ID.
313   LoadRecordValue v2(23);
314   EXPECT_FALSE(load_data_store.IsTrackedUnknownBalancerId(kLbId2));
315   load_data_store.MergeRow(
316       kHostname1, LoadRecordKey(kLbId2, kLbTag1, kUser1, kClientIp1), v2);
317   EXPECT_TRUE(load_data_store.IsTrackedUnknownBalancerId(kLbId2));
318   LoadRecordValue v3(952);
319   load_data_store.MergeRow(
320       kHostname2, LoadRecordKey(kLbId3, kLbTag1, kUser1, kClientIp1), v3);
321   EXPECT_TRUE(load_data_store.IsTrackedUnknownBalancerId(kLbId3));
322   // The data kept for a known LB ID is correct.
323   auto store_lb_id_1 = load_data_store.FindPerBalancerStore(kHostname1, kLbId1);
324   EXPECT_EQ(store_lb_id_1->load_record_map().size(), 1U);
325   EXPECT_EQ(store_lb_id_1->load_record_map().find(kKey1)->second.start_count(),
326             v1.start_count());
327   EXPECT_EQ(store_lb_id_1->GetNumCallsInProgressForReport(), v1.start_count());
328   // No PerBalancerStore created for Unknown LB ID.
329   EXPECT_EQ(load_data_store.FindPerBalancerStore(kHostname1, kLbId2), nullptr);
330   EXPECT_EQ(load_data_store.FindPerBalancerStore(kHostname2, kLbId3), nullptr);
331   // End all the started RPCs for kLbId1.
332   LoadRecordValue v4(0, v1.start_count());
333   load_data_store.MergeRow(kHostname1, kKey1, v4);
334   EXPECT_EQ(store_lb_id_1->load_record_map().size(), 1U);
335   EXPECT_EQ(store_lb_id_1->load_record_map().find(kKey1)->second.start_count(),
336             v1.start_count());
337   EXPECT_EQ(store_lb_id_1->load_record_map().find(kKey1)->second.ok_count(),
338             v4.ok_count());
339   EXPECT_EQ(store_lb_id_1->GetNumCallsInProgressForReport(), 0U);
340   EXPECT_FALSE(load_data_store.IsTrackedUnknownBalancerId(kLbId1));
341   // End all the started RPCs for kLbId2.
342   LoadRecordValue v5(0, v2.start_count());
343   load_data_store.MergeRow(
344       kHostname1, LoadRecordKey(kLbId2, kLbTag1, kUser1, kClientIp1), v5);
345   EXPECT_FALSE(load_data_store.IsTrackedUnknownBalancerId(kLbId2));
346   // End some of the started RPCs for kLbId3.
347   LoadRecordValue v6(0, v3.start_count() / 2);
348   load_data_store.MergeRow(
349       kHostname2, LoadRecordKey(kLbId3, kLbTag1, kUser1, kClientIp1), v6);
350   EXPECT_TRUE(load_data_store.IsTrackedUnknownBalancerId(kLbId3));
351 }
352
353 TEST_F(PerBalancerStoreTest, Suspend) {
354   PerBalancerStore per_balancer_store(kLbId1, kLoadKey1);
355   EXPECT_FALSE(per_balancer_store.IsSuspended());
356   // Suspend the store.
357   per_balancer_store.Suspend();
358   EXPECT_TRUE(per_balancer_store.IsSuspended());
359   EXPECT_EQ(0U, per_balancer_store.load_record_map().size());
360   // Data merged when the store is suspended won't be kept.
361   LoadRecordValue v1(139, 19);
362   per_balancer_store.MergeRow(kKey1, v1);
363   EXPECT_EQ(0U, per_balancer_store.load_record_map().size());
364   // Resume the store.
365   per_balancer_store.Resume();
366   EXPECT_FALSE(per_balancer_store.IsSuspended());
367   EXPECT_EQ(0U, per_balancer_store.load_record_map().size());
368   // Data merged after the store is resumed will be kept.
369   LoadRecordValue v2(23, 0, 51);
370   per_balancer_store.MergeRow(kKey1, v2);
371   EXPECT_EQ(1U, per_balancer_store.load_record_map().size());
372   // Suspend the store.
373   per_balancer_store.Suspend();
374   EXPECT_TRUE(per_balancer_store.IsSuspended());
375   EXPECT_EQ(0U, per_balancer_store.load_record_map().size());
376   // Data merged when the store is suspended won't be kept.
377   LoadRecordValue v3(62, 11);
378   per_balancer_store.MergeRow(kKey1, v3);
379   EXPECT_EQ(0U, per_balancer_store.load_record_map().size());
380   // Resume the store.
381   per_balancer_store.Resume();
382   EXPECT_FALSE(per_balancer_store.IsSuspended());
383   EXPECT_EQ(0U, per_balancer_store.load_record_map().size());
384   // Data merged after the store is resumed will be kept.
385   LoadRecordValue v4(225, 98);
386   per_balancer_store.MergeRow(kKey1, v4);
387   EXPECT_EQ(1U, per_balancer_store.load_record_map().size());
388   // In-progress count is always kept.
389   EXPECT_EQ(per_balancer_store.GetNumCallsInProgressForReport(),
390             v1.start_count() - v1.ok_count() + v2.start_count() -
391                 v2.error_count() + v3.start_count() - v3.ok_count() +
392                 v4.start_count() - v4.ok_count());
393 }
394
395 TEST_F(PerBalancerStoreTest, DataAggregation) {
396   PerBalancerStore per_balancer_store(kLbId1, kLoadKey1);
397   // Construct some Values.
398   LoadRecordValue v1(992, 34, 13, 234, 164, 173467);
399   v1.InsertCallMetric(kMetric1, CallMetricValue(3, 2773.2));
400   LoadRecordValue v2(4842, 213, 9, 393, 974, 1345);
401   v2.InsertCallMetric(kMetric1, CallMetricValue(7, 25.234));
402   v2.InsertCallMetric(kMetric2, CallMetricValue(2, 387.08));
403   // v3 doesn't change the number of in-progress RPCs.
404   LoadRecordValue v3(293, 55, 293 - 55, 28764, 5284, 5772);
405   v3.InsertCallMetric(kMetric1, CallMetricValue(61, 3465.0));
406   v3.InsertCallMetric(kMetric2, CallMetricValue(13, 672.0));
407   // The initial state of the store.
408   uint64_t num_calls_in_progress = 0;
409   EXPECT_FALSE(per_balancer_store.IsNumCallsInProgressChangedSinceLastReport());
410   EXPECT_EQ(per_balancer_store.GetNumCallsInProgressForReport(),
411             num_calls_in_progress);
412   // Merge v1 and get report of the number of in-progress calls.
413   per_balancer_store.MergeRow(kKey1, v1);
414   EXPECT_TRUE(per_balancer_store.IsNumCallsInProgressChangedSinceLastReport());
415   EXPECT_EQ(per_balancer_store.GetNumCallsInProgressForReport(),
416             num_calls_in_progress +=
417             (v1.start_count() - v1.ok_count() - v1.error_count()));
418   EXPECT_FALSE(per_balancer_store.IsNumCallsInProgressChangedSinceLastReport());
419   // Merge v2 and get report of the number of in-progress calls.
420   per_balancer_store.MergeRow(kKey2, v2);
421   EXPECT_TRUE(per_balancer_store.IsNumCallsInProgressChangedSinceLastReport());
422   EXPECT_EQ(per_balancer_store.GetNumCallsInProgressForReport(),
423             num_calls_in_progress +=
424             (v2.start_count() - v2.ok_count() - v2.error_count()));
425   EXPECT_FALSE(per_balancer_store.IsNumCallsInProgressChangedSinceLastReport());
426   // Merge v3 and get report of the number of in-progress calls.
427   per_balancer_store.MergeRow(kKey1, v3);
428   EXPECT_FALSE(per_balancer_store.IsNumCallsInProgressChangedSinceLastReport());
429   EXPECT_EQ(per_balancer_store.GetNumCallsInProgressForReport(),
430             num_calls_in_progress);
431   // LoadRecordValue for kKey1 is aggregated correctly.
432   LoadRecordValue value_for_key1 =
433       per_balancer_store.load_record_map().find(kKey1)->second;
434   EXPECT_EQ(value_for_key1.start_count(), v1.start_count() + v3.start_count());
435   EXPECT_EQ(value_for_key1.ok_count(), v1.ok_count() + v3.ok_count());
436   EXPECT_EQ(value_for_key1.error_count(), v1.error_count() + v3.error_count());
437   EXPECT_EQ(value_for_key1.bytes_sent(), v1.bytes_sent() + v3.bytes_sent());
438   EXPECT_EQ(value_for_key1.bytes_recv(), v1.bytes_recv() + v3.bytes_recv());
439   EXPECT_EQ(value_for_key1.latency_ms(), v1.latency_ms() + v3.latency_ms());
440   EXPECT_EQ(value_for_key1.call_metrics().size(), 2U);
441   EXPECT_EQ(value_for_key1.call_metrics().find(kMetric1)->second.num_calls(),
442             v1.call_metrics().find(kMetric1)->second.num_calls() +
443                 v3.call_metrics().find(kMetric1)->second.num_calls());
444   EXPECT_EQ(
445       value_for_key1.call_metrics().find(kMetric1)->second.total_metric_value(),
446       v1.call_metrics().find(kMetric1)->second.total_metric_value() +
447           v3.call_metrics().find(kMetric1)->second.total_metric_value());
448   EXPECT_EQ(value_for_key1.call_metrics().find(kMetric2)->second.num_calls(),
449             v3.call_metrics().find(kMetric2)->second.num_calls());
450   EXPECT_EQ(
451       value_for_key1.call_metrics().find(kMetric2)->second.total_metric_value(),
452       v3.call_metrics().find(kMetric2)->second.total_metric_value());
453   // LoadRecordValue for kKey2 is aggregated (trivially) correctly.
454   LoadRecordValue value_for_key2 =
455       per_balancer_store.load_record_map().find(kKey2)->second;
456   EXPECT_EQ(value_for_key2.start_count(), v2.start_count());
457   EXPECT_EQ(value_for_key2.ok_count(), v2.ok_count());
458   EXPECT_EQ(value_for_key2.error_count(), v2.error_count());
459   EXPECT_EQ(value_for_key2.bytes_sent(), v2.bytes_sent());
460   EXPECT_EQ(value_for_key2.bytes_recv(), v2.bytes_recv());
461   EXPECT_EQ(value_for_key2.latency_ms(), v2.latency_ms());
462   EXPECT_EQ(value_for_key2.call_metrics().size(), 2U);
463   EXPECT_EQ(value_for_key2.call_metrics().find(kMetric1)->second.num_calls(),
464             v2.call_metrics().find(kMetric1)->second.num_calls());
465   EXPECT_EQ(
466       value_for_key2.call_metrics().find(kMetric1)->second.total_metric_value(),
467       v2.call_metrics().find(kMetric1)->second.total_metric_value());
468   EXPECT_EQ(value_for_key2.call_metrics().find(kMetric2)->second.num_calls(),
469             v2.call_metrics().find(kMetric2)->second.num_calls());
470   EXPECT_EQ(
471       value_for_key2.call_metrics().find(kMetric2)->second.total_metric_value(),
472       v2.call_metrics().find(kMetric2)->second.total_metric_value());
473 }
474
475 }  // namespace
476 }  // namespace testing
477 }  // namespace grpc
478
479 int main(int argc, char** argv) {
480   grpc::testing::TestEnvironment env(argc, argv);
481   ::testing::InitGoogleTest(&argc, argv);
482   return RUN_ALL_TESTS();
483 }