Imported Upstream version 1.35.0
[platform/upstream/grpc.git] / test / cpp / interop / xds_interop_client.cc
1 /*
2  *
3  * Copyright 2020 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18
19 #include <grpcpp/grpcpp.h>
20 #include <grpcpp/server.h>
21 #include <grpcpp/server_builder.h>
22 #include <grpcpp/server_context.h>
23
24 #include <atomic>
25 #include <chrono>
26 #include <condition_variable>
27 #include <deque>
28 #include <map>
29 #include <mutex>
30 #include <set>
31 #include <sstream>
32 #include <string>
33 #include <thread>
34 #include <vector>
35
36 #include "absl/algorithm/container.h"
37 #include "absl/flags/flag.h"
38 #include "absl/strings/str_split.h"
39 #include "src/core/lib/channel/status_util.h"
40 #include "src/core/lib/gpr/env.h"
41 #include "src/proto/grpc/testing/empty.pb.h"
42 #include "src/proto/grpc/testing/messages.pb.h"
43 #include "src/proto/grpc/testing/test.grpc.pb.h"
44 #include "test/core/util/test_config.h"
45 #include "test/cpp/util/test_config.h"
46
47 ABSL_FLAG(bool, fail_on_failed_rpc, false,
48           "Fail client if any RPCs fail after first successful RPC.");
49 ABSL_FLAG(int32_t, num_channels, 1, "Number of channels.");
50 ABSL_FLAG(bool, print_response, false, "Write RPC response to stdout.");
51 ABSL_FLAG(int32_t, qps, 1, "Qps per channel.");
52 // TODO(Capstan): Consider using absl::Duration
53 ABSL_FLAG(int32_t, rpc_timeout_sec, 30, "Per RPC timeout seconds.");
54 ABSL_FLAG(std::string, server, "localhost:50051", "Address of server.");
55 ABSL_FLAG(int32_t, stats_port, 50052,
56           "Port to expose peer distribution stats service.");
57 ABSL_FLAG(std::string, rpc, "UnaryCall",
58           "a comma separated list of rpc methods.");
59 ABSL_FLAG(std::string, metadata, "", "metadata to send with the RPC.");
60 ABSL_FLAG(std::string, expect_status, "OK",
61           "RPC status for the test RPC to be considered successful");
62
63 using grpc::Channel;
64 using grpc::ClientAsyncResponseReader;
65 using grpc::ClientContext;
66 using grpc::CompletionQueue;
67 using grpc::Server;
68 using grpc::ServerBuilder;
69 using grpc::ServerContext;
70 using grpc::Status;
71 using grpc::testing::ClientConfigureRequest;
72 using grpc::testing::ClientConfigureRequest_RpcType_Name;
73 using grpc::testing::ClientConfigureResponse;
74 using grpc::testing::Empty;
75 using grpc::testing::LoadBalancerAccumulatedStatsRequest;
76 using grpc::testing::LoadBalancerAccumulatedStatsResponse;
77 using grpc::testing::LoadBalancerStatsRequest;
78 using grpc::testing::LoadBalancerStatsResponse;
79 using grpc::testing::LoadBalancerStatsService;
80 using grpc::testing::SimpleRequest;
81 using grpc::testing::SimpleResponse;
82 using grpc::testing::TestService;
83 using grpc::testing::XdsUpdateClientConfigureService;
84
85 class XdsStatsWatcher;
86
87 struct StatsWatchers {
88   // Unique ID for each outgoing RPC
89   int global_request_id = 0;
90   // Unique ID for each outgoing RPC by RPC method type
91   std::map<int, int> global_request_id_by_type;
92   // Stores a set of watchers that should be notified upon outgoing RPC
93   // completion
94   std::set<XdsStatsWatcher*> watchers;
95   // Global watcher for accumululated stats.
96   XdsStatsWatcher* global_watcher;
97   // Mutex for global_request_id and watchers
98   std::mutex mu;
99 };
100 // Whether at least one RPC has succeeded, indicating xDS resolution completed.
101 std::atomic<bool> one_rpc_succeeded(false);
102 // RPC configuration detailing how RPC should be sent.
103 struct RpcConfig {
104   ClientConfigureRequest::RpcType type;
105   std::vector<std::pair<std::string, std::string>> metadata;
106 };
107 struct RpcConfigurationsQueue {
108   // A queue of RPC configurations detailing how RPCs should be sent.
109   std::deque<std::vector<RpcConfig>> rpc_configs_queue;
110   // Mutex for rpc_configs_queue
111   std::mutex mu_rpc_configs_queue;
112 };
113
114 /** Records the remote peer distribution for a given range of RPCs. */
115 class XdsStatsWatcher {
116  public:
117   XdsStatsWatcher(int start_id, int end_id)
118       : start_id_(start_id), end_id_(end_id), rpcs_needed_(end_id - start_id) {}
119
120   // Upon the completion of an RPC, we will look at the request_id, the
121   // rpc_type, and the peer the RPC was sent to in order to count
122   // this RPC into the right stats bin.
123   void RpcCompleted(int request_id,
124                     const ClientConfigureRequest::RpcType rpc_type,
125                     const std::string& peer) {
126     // We count RPCs for global watcher or if the request_id falls into the
127     // watcher's interested range of request ids.
128     if ((start_id_ == 0 && end_id_ == 0) ||
129         (start_id_ <= request_id && request_id < end_id_)) {
130       {
131         std::lock_guard<std::mutex> lock(m_);
132         if (peer.empty()) {
133           no_remote_peer_++;
134           ++no_remote_peer_by_type_[rpc_type];
135         } else {
136           // RPC is counted into both per-peer bin and per-method-per-peer bin.
137           rpcs_by_peer_[peer]++;
138           rpcs_by_type_[rpc_type][peer]++;
139         }
140         rpcs_needed_--;
141       }
142       cv_.notify_one();
143     }
144   }
145
146   void WaitForRpcStatsResponse(LoadBalancerStatsResponse* response,
147                                int timeout_sec) {
148     {
149       std::unique_lock<std::mutex> lock(m_);
150       cv_.wait_for(lock, std::chrono::seconds(timeout_sec),
151                    [this] { return rpcs_needed_ == 0; });
152       response->mutable_rpcs_by_peer()->insert(rpcs_by_peer_.begin(),
153                                                rpcs_by_peer_.end());
154       auto& response_rpcs_by_method = *response->mutable_rpcs_by_method();
155       for (const auto& rpc_by_type : rpcs_by_type_) {
156         std::string method_name;
157         if (rpc_by_type.first == ClientConfigureRequest::EMPTY_CALL) {
158           method_name = "EmptyCall";
159         } else if (rpc_by_type.first == ClientConfigureRequest::UNARY_CALL) {
160           method_name = "UnaryCall";
161         } else {
162           GPR_ASSERT(0);
163         }
164         // TODO(@donnadionne): When the test runner changes to accept EMPTY_CALL
165         // and UNARY_CALL we will just use the name of the enum instead of the
166         // method_name variable.
167         auto& response_rpc_by_method = response_rpcs_by_method[method_name];
168         auto& response_rpcs_by_peer =
169             *response_rpc_by_method.mutable_rpcs_by_peer();
170         for (const auto& rpc_by_peer : rpc_by_type.second) {
171           auto& response_rpc_by_peer = response_rpcs_by_peer[rpc_by_peer.first];
172           response_rpc_by_peer = rpc_by_peer.second;
173         }
174       }
175       response->set_num_failures(no_remote_peer_ + rpcs_needed_);
176     }
177   }
178
179   void GetCurrentRpcStats(LoadBalancerAccumulatedStatsResponse* response,
180                           StatsWatchers* stats_watchers) {
181     std::unique_lock<std::mutex> lock(m_);
182     auto& response_rpcs_started_by_method =
183         *response->mutable_num_rpcs_started_by_method();
184     auto& response_rpcs_succeeded_by_method =
185         *response->mutable_num_rpcs_succeeded_by_method();
186     auto& response_rpcs_failed_by_method =
187         *response->mutable_num_rpcs_failed_by_method();
188     for (const auto& rpc_by_type : rpcs_by_type_) {
189       auto total_succeeded = 0;
190       for (const auto& rpc_by_peer : rpc_by_type.second) {
191         total_succeeded += rpc_by_peer.second;
192       }
193       response_rpcs_succeeded_by_method[ClientConfigureRequest_RpcType_Name(
194           rpc_by_type.first)] = total_succeeded;
195       response_rpcs_started_by_method[ClientConfigureRequest_RpcType_Name(
196           rpc_by_type.first)] =
197           stats_watchers->global_request_id_by_type[rpc_by_type.first];
198       response_rpcs_failed_by_method[ClientConfigureRequest_RpcType_Name(
199           rpc_by_type.first)] = no_remote_peer_by_type_[rpc_by_type.first];
200     }
201   }
202
203  private:
204   int start_id_;
205   int end_id_;
206   int rpcs_needed_;
207   int no_remote_peer_ = 0;
208   std::map<int, int> no_remote_peer_by_type_;
209   // A map of stats keyed by peer name.
210   std::map<std::string, int> rpcs_by_peer_;
211   // A two-level map of stats keyed at top level by RPC method and second level
212   // by peer name.
213   std::map<int, std::map<std::string, int>> rpcs_by_type_;
214   std::mutex m_;
215   std::condition_variable cv_;
216 };
217
218 class TestClient {
219  public:
220   TestClient(const std::shared_ptr<Channel>& channel,
221              StatsWatchers* stats_watchers)
222       : stub_(TestService::NewStub(channel)), stats_watchers_(stats_watchers) {}
223
224   void AsyncUnaryCall(
225       std::vector<std::pair<std::string, std::string>> metadata) {
226     SimpleResponse response;
227     int saved_request_id;
228     {
229       std::lock_guard<std::mutex> lock(stats_watchers_->mu);
230       saved_request_id = ++stats_watchers_->global_request_id;
231       ++stats_watchers_
232             ->global_request_id_by_type[ClientConfigureRequest::UNARY_CALL];
233     }
234     std::chrono::system_clock::time_point deadline =
235         std::chrono::system_clock::now() +
236         std::chrono::seconds(absl::GetFlag(FLAGS_rpc_timeout_sec));
237     AsyncClientCall* call = new AsyncClientCall;
238     for (const auto& data : metadata) {
239       call->context.AddMetadata(data.first, data.second);
240       // TODO(@donnadionne): move deadline to separate proto.
241       if (data.first == "rpc-behavior" && data.second == "keep-open") {
242         deadline =
243             std::chrono::system_clock::now() + std::chrono::seconds(INT_MAX);
244       }
245     }
246     call->context.set_deadline(deadline);
247     call->saved_request_id = saved_request_id;
248     call->rpc_type = ClientConfigureRequest::UNARY_CALL;
249     call->simple_response_reader = stub_->PrepareAsyncUnaryCall(
250         &call->context, SimpleRequest::default_instance(), &cq_);
251     call->simple_response_reader->StartCall();
252     call->simple_response_reader->Finish(&call->simple_response, &call->status,
253                                          call);
254   }
255
256   void AsyncEmptyCall(
257       std::vector<std::pair<std::string, std::string>> metadata) {
258     Empty response;
259     int saved_request_id;
260     {
261       std::lock_guard<std::mutex> lock(stats_watchers_->mu);
262       saved_request_id = ++stats_watchers_->global_request_id;
263       ++stats_watchers_
264             ->global_request_id_by_type[ClientConfigureRequest::EMPTY_CALL];
265     }
266     std::chrono::system_clock::time_point deadline =
267         std::chrono::system_clock::now() +
268         std::chrono::seconds(absl::GetFlag(FLAGS_rpc_timeout_sec));
269     AsyncClientCall* call = new AsyncClientCall;
270     for (const auto& data : metadata) {
271       call->context.AddMetadata(data.first, data.second);
272       // TODO(@donnadionne): move deadline to separate proto.
273       if (data.first == "rpc-behavior" && data.second == "keep-open") {
274         deadline =
275             std::chrono::system_clock::now() + std::chrono::seconds(INT_MAX);
276       }
277     }
278     call->context.set_deadline(deadline);
279     call->saved_request_id = saved_request_id;
280     call->rpc_type = ClientConfigureRequest::EMPTY_CALL;
281     call->empty_response_reader = stub_->PrepareAsyncEmptyCall(
282         &call->context, Empty::default_instance(), &cq_);
283     call->empty_response_reader->StartCall();
284     call->empty_response_reader->Finish(&call->empty_response, &call->status,
285                                         call);
286   }
287
288   void AsyncCompleteRpc() {
289     void* got_tag;
290     bool ok = false;
291     while (cq_.Next(&got_tag, &ok)) {
292       AsyncClientCall* call = static_cast<AsyncClientCall*>(got_tag);
293       GPR_ASSERT(ok);
294       {
295         std::lock_guard<std::mutex> lock(stats_watchers_->mu);
296         auto server_initial_metadata = call->context.GetServerInitialMetadata();
297         auto metadata_hostname =
298             call->context.GetServerInitialMetadata().find("hostname");
299         std::string hostname =
300             metadata_hostname != call->context.GetServerInitialMetadata().end()
301                 ? std::string(metadata_hostname->second.data(),
302                               metadata_hostname->second.length())
303                 : call->simple_response.hostname();
304         for (auto watcher : stats_watchers_->watchers) {
305           watcher->RpcCompleted(call->saved_request_id, call->rpc_type,
306                                 hostname);
307         }
308       }
309
310       if (!RpcStatusCheckSuccess(call)) {
311         if (absl::GetFlag(FLAGS_print_response) ||
312             absl::GetFlag(FLAGS_fail_on_failed_rpc)) {
313           std::cout << "RPC failed: " << call->status.error_code() << ": "
314                     << call->status.error_message() << std::endl;
315         }
316         if (absl::GetFlag(FLAGS_fail_on_failed_rpc) &&
317             one_rpc_succeeded.load()) {
318           abort();
319         }
320       } else {
321         if (absl::GetFlag(FLAGS_print_response)) {
322           auto metadata_hostname =
323               call->context.GetServerInitialMetadata().find("hostname");
324           std::string hostname =
325               metadata_hostname !=
326                       call->context.GetServerInitialMetadata().end()
327                   ? std::string(metadata_hostname->second.data(),
328                                 metadata_hostname->second.length())
329                   : call->simple_response.hostname();
330           std::cout << "Greeting: Hello world, this is " << hostname
331                     << ", from " << call->context.peer() << std::endl;
332         }
333         one_rpc_succeeded = true;
334       }
335
336       delete call;
337     }
338   }
339
340  private:
341   struct AsyncClientCall {
342     Empty empty_response;
343     SimpleResponse simple_response;
344     ClientContext context;
345     Status status;
346     int saved_request_id;
347     ClientConfigureRequest::RpcType rpc_type;
348     std::unique_ptr<ClientAsyncResponseReader<Empty>> empty_response_reader;
349     std::unique_ptr<ClientAsyncResponseReader<SimpleResponse>>
350         simple_response_reader;
351   };
352   static bool RpcStatusCheckSuccess(AsyncClientCall* call) {
353     // Determine RPC success based on expected status.
354     grpc_status_code code;
355     GPR_ASSERT(grpc_status_code_from_string(
356         absl::GetFlag(FLAGS_expect_status).c_str(), &code));
357     return code == static_cast<grpc_status_code>(call->status.error_code());
358   }
359
360   std::unique_ptr<TestService::Stub> stub_;
361   StatsWatchers* stats_watchers_;
362   CompletionQueue cq_;
363 };
364
365 class LoadBalancerStatsServiceImpl : public LoadBalancerStatsService::Service {
366  public:
367   explicit LoadBalancerStatsServiceImpl(StatsWatchers* stats_watchers)
368       : stats_watchers_(stats_watchers) {}
369
370   Status GetClientStats(ServerContext* context,
371                         const LoadBalancerStatsRequest* request,
372                         LoadBalancerStatsResponse* response) override {
373     int start_id;
374     int end_id;
375     XdsStatsWatcher* watcher;
376     {
377       std::lock_guard<std::mutex> lock(stats_watchers_->mu);
378       start_id = stats_watchers_->global_request_id + 1;
379       end_id = start_id + request->num_rpcs();
380       watcher = new XdsStatsWatcher(start_id, end_id);
381       stats_watchers_->watchers.insert(watcher);
382     }
383     watcher->WaitForRpcStatsResponse(response, request->timeout_sec());
384     {
385       std::lock_guard<std::mutex> lock(stats_watchers_->mu);
386       stats_watchers_->watchers.erase(watcher);
387     }
388     delete watcher;
389     return Status::OK;
390   }
391
392   Status GetClientAccumulatedStats(
393       ServerContext* context,
394       const LoadBalancerAccumulatedStatsRequest* request,
395       LoadBalancerAccumulatedStatsResponse* response) override {
396     std::lock_guard<std::mutex> lock(stats_watchers_->mu);
397     stats_watchers_->global_watcher->GetCurrentRpcStats(response,
398                                                         stats_watchers_);
399     return Status::OK;
400   }
401
402  private:
403   StatsWatchers* stats_watchers_;
404 };
405
406 class XdsUpdateClientConfigureServiceImpl
407     : public XdsUpdateClientConfigureService::Service {
408  public:
409   explicit XdsUpdateClientConfigureServiceImpl(
410       RpcConfigurationsQueue* rpc_configs_queue)
411       : rpc_configs_queue_(rpc_configs_queue) {}
412
413   Status Configure(ServerContext* context,
414                    const ClientConfigureRequest* request,
415                    ClientConfigureResponse* response) override {
416     std::map<int, std::vector<std::pair<std::string, std::string>>>
417         metadata_map;
418     for (const auto& data : request->metadata()) {
419       metadata_map[data.type()].push_back({data.key(), data.value()});
420     }
421     std::vector<RpcConfig> configs;
422     for (const auto& rpc : request->types()) {
423       RpcConfig config;
424       config.type = static_cast<ClientConfigureRequest::RpcType>(rpc);
425       auto metadata_iter = metadata_map.find(rpc);
426       if (metadata_iter != metadata_map.end()) {
427         config.metadata = metadata_iter->second;
428       }
429       configs.push_back(std::move(config));
430     }
431     {
432       std::lock_guard<std::mutex> lock(
433           rpc_configs_queue_->mu_rpc_configs_queue);
434       rpc_configs_queue_->rpc_configs_queue.emplace_back(std::move(configs));
435     }
436     return Status::OK;
437   }
438
439  private:
440   RpcConfigurationsQueue* rpc_configs_queue_;
441 };
442
443 void RunTestLoop(std::chrono::duration<double> duration_per_query,
444                  StatsWatchers* stats_watchers,
445                  RpcConfigurationsQueue* rpc_configs_queue) {
446   TestClient client(grpc::CreateChannel(absl::GetFlag(FLAGS_server),
447                                         grpc::InsecureChannelCredentials()),
448                     stats_watchers);
449   std::chrono::time_point<std::chrono::system_clock> start =
450       std::chrono::system_clock::now();
451   std::chrono::duration<double> elapsed;
452
453   std::thread thread = std::thread(&TestClient::AsyncCompleteRpc, &client);
454
455   std::vector<RpcConfig> configs;
456   while (true) {
457     {
458       std::lock_guard<std::mutex> lockk(
459           rpc_configs_queue->mu_rpc_configs_queue);
460       if (!rpc_configs_queue->rpc_configs_queue.empty()) {
461         configs = std::move(rpc_configs_queue->rpc_configs_queue.front());
462         rpc_configs_queue->rpc_configs_queue.pop_front();
463       }
464     }
465
466     elapsed = std::chrono::system_clock::now() - start;
467     if (elapsed > duration_per_query) {
468       start = std::chrono::system_clock::now();
469       for (const auto& config : configs) {
470         if (config.type == ClientConfigureRequest::EMPTY_CALL) {
471           client.AsyncEmptyCall(config.metadata);
472         } else if (config.type == ClientConfigureRequest::UNARY_CALL) {
473           client.AsyncUnaryCall(config.metadata);
474         } else {
475           GPR_ASSERT(0);
476         }
477       }
478     }
479   }
480   thread.join();
481 }
482
483 void RunServer(const int port, StatsWatchers* stats_watchers,
484                RpcConfigurationsQueue* rpc_configs_queue) {
485   GPR_ASSERT(port != 0);
486   std::ostringstream server_address;
487   server_address << "0.0.0.0:" << port;
488
489   LoadBalancerStatsServiceImpl stats_service(stats_watchers);
490   XdsUpdateClientConfigureServiceImpl client_config_service(rpc_configs_queue);
491
492   ServerBuilder builder;
493   builder.RegisterService(&stats_service);
494   builder.RegisterService(&client_config_service);
495   builder.AddListeningPort(server_address.str(),
496                            grpc::InsecureServerCredentials());
497   std::unique_ptr<Server> server(builder.BuildAndStart());
498   gpr_log(GPR_DEBUG, "Server listening on %s", server_address.str().c_str());
499
500   server->Wait();
501 }
502
503 void BuildRpcConfigsFromFlags(RpcConfigurationsQueue* rpc_configs_queue) {
504   // Store Metadata like
505   // "EmptyCall:key1:value1,UnaryCall:key1:value1,UnaryCall:key2:value2" into a
506   // map where the key is the RPC method and value is a vector of key:value
507   // pairs. {EmptyCall, [{key1,value1}],
508   //  UnaryCall, [{key1,value1}, {key2,value2}]}
509   std::vector<std::string> rpc_metadata =
510       absl::StrSplit(absl::GetFlag(FLAGS_metadata), ',', absl::SkipEmpty());
511   std::map<int, std::vector<std::pair<std::string, std::string>>> metadata_map;
512   for (auto& data : rpc_metadata) {
513     std::vector<std::string> metadata =
514         absl::StrSplit(data, ':', absl::SkipEmpty());
515     GPR_ASSERT(metadata.size() == 3);
516     if (metadata[0] == "EmptyCall") {
517       metadata_map[ClientConfigureRequest::EMPTY_CALL].push_back(
518           {metadata[1], metadata[2]});
519     } else if (metadata[0] == "UnaryCall") {
520       metadata_map[ClientConfigureRequest::UNARY_CALL].push_back(
521           {metadata[1], metadata[2]});
522     } else {
523       GPR_ASSERT(0);
524     }
525   }
526   std::vector<RpcConfig> configs;
527   std::vector<std::string> rpc_methods =
528       absl::StrSplit(absl::GetFlag(FLAGS_rpc), ',', absl::SkipEmpty());
529   for (const std::string& rpc_method : rpc_methods) {
530     RpcConfig config;
531     if (rpc_method == "EmptyCall") {
532       config.type = ClientConfigureRequest::EMPTY_CALL;
533     } else if (rpc_method == "UnaryCall") {
534       config.type = ClientConfigureRequest::UNARY_CALL;
535     } else {
536       GPR_ASSERT(0);
537     }
538     auto metadata_iter = metadata_map.find(config.type);
539     if (metadata_iter != metadata_map.end()) {
540       config.metadata = metadata_iter->second;
541     }
542     configs.push_back(std::move(config));
543   }
544   {
545     std::lock_guard<std::mutex> lock(rpc_configs_queue->mu_rpc_configs_queue);
546     rpc_configs_queue->rpc_configs_queue.emplace_back(std::move(configs));
547   }
548 }
549
550 int main(int argc, char** argv) {
551   grpc::testing::TestEnvironment env(argc, argv);
552   grpc::testing::InitTest(&argc, &argv, true);
553   // Validate the expect_status flag.
554   grpc_status_code code;
555   GPR_ASSERT(grpc_status_code_from_string(
556       absl::GetFlag(FLAGS_expect_status).c_str(), &code));
557   StatsWatchers stats_watchers;
558   RpcConfigurationsQueue rpc_config_queue;
559
560   {
561     std::lock_guard<std::mutex> lock(stats_watchers.mu);
562     stats_watchers.global_watcher = new XdsStatsWatcher(0, 0);
563     stats_watchers.watchers.insert(stats_watchers.global_watcher);
564   }
565
566   BuildRpcConfigsFromFlags(&rpc_config_queue);
567
568   std::chrono::duration<double> duration_per_query =
569       std::chrono::nanoseconds(std::chrono::seconds(1)) /
570       absl::GetFlag(FLAGS_qps);
571
572   std::vector<std::thread> test_threads;
573   test_threads.reserve(absl::GetFlag(FLAGS_num_channels));
574   for (int i = 0; i < absl::GetFlag(FLAGS_num_channels); i++) {
575     test_threads.emplace_back(std::thread(&RunTestLoop, duration_per_query,
576                                           &stats_watchers, &rpc_config_queue));
577   }
578
579   RunServer(absl::GetFlag(FLAGS_stats_port), &stats_watchers,
580             &rpc_config_queue);
581
582   for (auto it = test_threads.begin(); it != test_threads.end(); it++) {
583     it->join();
584   }
585
586   return 0;
587 }