7344905373a192475931af961f430284e2b08c39
[platform/upstream/grpc.git] / test / cpp / interop / xds_interop_client.cc
1 /*
2  *
3  * Copyright 2020 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18
19 #include <grpcpp/grpcpp.h>
20 #include <grpcpp/server.h>
21 #include <grpcpp/server_builder.h>
22 #include <grpcpp/server_context.h>
23
24 #include <atomic>
25 #include <chrono>
26 #include <condition_variable>
27 #include <deque>
28 #include <map>
29 #include <mutex>
30 #include <set>
31 #include <sstream>
32 #include <string>
33 #include <thread>
34 #include <vector>
35
36 #include "absl/algorithm/container.h"
37 #include "absl/flags/flag.h"
38 #include "absl/strings/str_split.h"
39 #include "src/core/lib/channel/status_util.h"
40 #include "src/core/lib/gpr/env.h"
41 #include "src/proto/grpc/testing/empty.pb.h"
42 #include "src/proto/grpc/testing/messages.pb.h"
43 #include "src/proto/grpc/testing/test.grpc.pb.h"
44 #include "test/core/util/test_config.h"
45 #include "test/cpp/util/test_config.h"
46
47 ABSL_FLAG(bool, fail_on_failed_rpc, false,
48           "Fail client if any RPCs fail after first successful RPC.");
49 ABSL_FLAG(int32_t, num_channels, 1, "Number of channels.");
50 ABSL_FLAG(bool, print_response, false, "Write RPC response to stdout.");
51 ABSL_FLAG(int32_t, qps, 1, "Qps per channel.");
52 // TODO(Capstan): Consider using absl::Duration
53 ABSL_FLAG(int32_t, rpc_timeout_sec, 30, "Per RPC timeout seconds.");
54 ABSL_FLAG(std::string, server, "localhost:50051", "Address of server.");
55 ABSL_FLAG(int32_t, stats_port, 50052,
56           "Port to expose peer distribution stats service.");
57 ABSL_FLAG(std::string, rpc, "UnaryCall",
58           "a comma separated list of rpc methods.");
59 ABSL_FLAG(std::string, metadata, "", "metadata to send with the RPC.");
60 ABSL_FLAG(std::string, expect_status, "OK",
61           "RPC status for the test RPC to be considered successful");
62 ABSL_FLAG(
63     bool, secure_mode, false,
64     "If true, XdsCredentials are used, InsecureChannelCredentials otherwise");
65
66 using grpc::Channel;
67 using grpc::ClientAsyncResponseReader;
68 using grpc::ClientContext;
69 using grpc::CompletionQueue;
70 using grpc::Server;
71 using grpc::ServerBuilder;
72 using grpc::ServerContext;
73 using grpc::Status;
74 using grpc::testing::ClientConfigureRequest;
75 using grpc::testing::ClientConfigureRequest_RpcType_Name;
76 using grpc::testing::ClientConfigureResponse;
77 using grpc::testing::Empty;
78 using grpc::testing::LoadBalancerAccumulatedStatsRequest;
79 using grpc::testing::LoadBalancerAccumulatedStatsResponse;
80 using grpc::testing::LoadBalancerStatsRequest;
81 using grpc::testing::LoadBalancerStatsResponse;
82 using grpc::testing::LoadBalancerStatsService;
83 using grpc::testing::SimpleRequest;
84 using grpc::testing::SimpleResponse;
85 using grpc::testing::TestService;
86 using grpc::testing::XdsUpdateClientConfigureService;
87
88 class XdsStatsWatcher;
89
90 struct StatsWatchers {
91   // Unique ID for each outgoing RPC
92   int global_request_id = 0;
93   // Unique ID for each outgoing RPC by RPC method type
94   std::map<int, int> global_request_id_by_type;
95   // Stores a set of watchers that should be notified upon outgoing RPC
96   // completion
97   std::set<XdsStatsWatcher*> watchers;
98   // Global watcher for accumululated stats.
99   XdsStatsWatcher* global_watcher;
100   // Mutex for global_request_id and watchers
101   std::mutex mu;
102 };
103 // Whether at least one RPC has succeeded, indicating xDS resolution completed.
104 std::atomic<bool> one_rpc_succeeded(false);
105 // RPC configuration detailing how RPC should be sent.
106 struct RpcConfig {
107   ClientConfigureRequest::RpcType type;
108   std::vector<std::pair<std::string, std::string>> metadata;
109   int timeout_sec = 0;
110 };
111 struct RpcConfigurationsQueue {
112   // A queue of RPC configurations detailing how RPCs should be sent.
113   std::deque<std::vector<RpcConfig>> rpc_configs_queue;
114   // Mutex for rpc_configs_queue
115   std::mutex mu_rpc_configs_queue;
116 };
117 struct AsyncClientCall {
118   Empty empty_response;
119   SimpleResponse simple_response;
120   ClientContext context;
121   Status status;
122   int saved_request_id;
123   ClientConfigureRequest::RpcType rpc_type;
124   std::unique_ptr<ClientAsyncResponseReader<Empty>> empty_response_reader;
125   std::unique_ptr<ClientAsyncResponseReader<SimpleResponse>>
126       simple_response_reader;
127 };
128
129 /** Records the remote peer distribution for a given range of RPCs. */
130 class XdsStatsWatcher {
131  public:
132   XdsStatsWatcher(int start_id, int end_id)
133       : start_id_(start_id), end_id_(end_id), rpcs_needed_(end_id - start_id) {}
134
135   // Upon the completion of an RPC, we will look at the request_id, the
136   // rpc_type, and the peer the RPC was sent to in order to count
137   // this RPC into the right stats bin.
138   void RpcCompleted(AsyncClientCall* call, const std::string& peer) {
139     // We count RPCs for global watcher or if the request_id falls into the
140     // watcher's interested range of request ids.
141     if ((start_id_ == 0 && end_id_ == 0) ||
142         (start_id_ <= call->saved_request_id &&
143          call->saved_request_id < end_id_)) {
144       {
145         std::lock_guard<std::mutex> lock(m_);
146         if (peer.empty()) {
147           no_remote_peer_++;
148           ++no_remote_peer_by_type_[call->rpc_type];
149         } else {
150           // RPC is counted into both per-peer bin and per-method-per-peer bin.
151           rpcs_by_peer_[peer]++;
152           rpcs_by_type_[call->rpc_type][peer]++;
153         }
154         rpcs_needed_--;
155         // Report accumulated stats.
156         auto& stats_per_method = *accumulated_stats_.mutable_stats_per_method();
157         auto& method_stat =
158             stats_per_method[ClientConfigureRequest_RpcType_Name(
159                 call->rpc_type)];
160         auto& result = *method_stat.mutable_result();
161         grpc_status_code code =
162             static_cast<grpc_status_code>(call->status.error_code());
163         auto& num_rpcs = result[code];
164         ++num_rpcs;
165         auto rpcs_started = method_stat.rpcs_started();
166         method_stat.set_rpcs_started(++rpcs_started);
167       }
168       cv_.notify_one();
169     }
170   }
171
172   void WaitForRpcStatsResponse(LoadBalancerStatsResponse* response,
173                                int timeout_sec) {
174     std::unique_lock<std::mutex> lock(m_);
175     cv_.wait_for(lock, std::chrono::seconds(timeout_sec),
176                  [this] { return rpcs_needed_ == 0; });
177     response->mutable_rpcs_by_peer()->insert(rpcs_by_peer_.begin(),
178                                              rpcs_by_peer_.end());
179     auto& response_rpcs_by_method = *response->mutable_rpcs_by_method();
180     for (const auto& rpc_by_type : rpcs_by_type_) {
181       std::string method_name;
182       if (rpc_by_type.first == ClientConfigureRequest::EMPTY_CALL) {
183         method_name = "EmptyCall";
184       } else if (rpc_by_type.first == ClientConfigureRequest::UNARY_CALL) {
185         method_name = "UnaryCall";
186       } else {
187         GPR_ASSERT(0);
188       }
189       // TODO(@donnadionne): When the test runner changes to accept EMPTY_CALL
190       // and UNARY_CALL we will just use the name of the enum instead of the
191       // method_name variable.
192       auto& response_rpc_by_method = response_rpcs_by_method[method_name];
193       auto& response_rpcs_by_peer =
194           *response_rpc_by_method.mutable_rpcs_by_peer();
195       for (const auto& rpc_by_peer : rpc_by_type.second) {
196         auto& response_rpc_by_peer = response_rpcs_by_peer[rpc_by_peer.first];
197         response_rpc_by_peer = rpc_by_peer.second;
198       }
199     }
200     response->set_num_failures(no_remote_peer_ + rpcs_needed_);
201   }
202
203   void GetCurrentRpcStats(LoadBalancerAccumulatedStatsResponse* response,
204                           StatsWatchers* stats_watchers) {
205     std::unique_lock<std::mutex> lock(m_);
206     response->CopyFrom(accumulated_stats_);
207     // TODO(@donnadionne): delete deprecated stats below when the test is no
208     // longer using them.
209     auto& response_rpcs_started_by_method =
210         *response->mutable_num_rpcs_started_by_method();
211     auto& response_rpcs_succeeded_by_method =
212         *response->mutable_num_rpcs_succeeded_by_method();
213     auto& response_rpcs_failed_by_method =
214         *response->mutable_num_rpcs_failed_by_method();
215     for (const auto& rpc_by_type : rpcs_by_type_) {
216       auto total_succeeded = 0;
217       for (const auto& rpc_by_peer : rpc_by_type.second) {
218         total_succeeded += rpc_by_peer.second;
219       }
220       response_rpcs_succeeded_by_method[ClientConfigureRequest_RpcType_Name(
221           rpc_by_type.first)] = total_succeeded;
222       response_rpcs_started_by_method[ClientConfigureRequest_RpcType_Name(
223           rpc_by_type.first)] =
224           stats_watchers->global_request_id_by_type[rpc_by_type.first];
225       response_rpcs_failed_by_method[ClientConfigureRequest_RpcType_Name(
226           rpc_by_type.first)] = no_remote_peer_by_type_[rpc_by_type.first];
227     }
228   }
229
230  private:
231   int start_id_;
232   int end_id_;
233   int rpcs_needed_;
234   int no_remote_peer_ = 0;
235   std::map<int, int> no_remote_peer_by_type_;
236   // A map of stats keyed by peer name.
237   std::map<std::string, int> rpcs_by_peer_;
238   // A two-level map of stats keyed at top level by RPC method and second level
239   // by peer name.
240   std::map<int, std::map<std::string, int>> rpcs_by_type_;
241   // Storing accumulated stats in the response proto format.
242   LoadBalancerAccumulatedStatsResponse accumulated_stats_;
243   std::mutex m_;
244   std::condition_variable cv_;
245 };
246
247 class TestClient {
248  public:
249   TestClient(const std::shared_ptr<Channel>& channel,
250              StatsWatchers* stats_watchers)
251       : stub_(TestService::NewStub(channel)), stats_watchers_(stats_watchers) {}
252
253   void AsyncUnaryCall(const RpcConfig& config) {
254     SimpleResponse response;
255     int saved_request_id;
256     {
257       std::lock_guard<std::mutex> lock(stats_watchers_->mu);
258       saved_request_id = ++stats_watchers_->global_request_id;
259       ++stats_watchers_
260             ->global_request_id_by_type[ClientConfigureRequest::UNARY_CALL];
261     }
262     std::chrono::system_clock::time_point deadline =
263         std::chrono::system_clock::now() +
264         std::chrono::seconds(config.timeout_sec != 0
265                                  ? config.timeout_sec
266                                  : absl::GetFlag(FLAGS_rpc_timeout_sec));
267     AsyncClientCall* call = new AsyncClientCall;
268     for (const auto& data : config.metadata) {
269       call->context.AddMetadata(data.first, data.second);
270       // TODO(@donnadionne): move deadline to separate proto.
271       if (data.first == "rpc-behavior" && data.second == "keep-open") {
272         deadline =
273             std::chrono::system_clock::now() + std::chrono::seconds(INT_MAX);
274       }
275     }
276     call->context.set_deadline(deadline);
277     call->saved_request_id = saved_request_id;
278     call->rpc_type = ClientConfigureRequest::UNARY_CALL;
279     call->simple_response_reader = stub_->PrepareAsyncUnaryCall(
280         &call->context, SimpleRequest::default_instance(), &cq_);
281     call->simple_response_reader->StartCall();
282     call->simple_response_reader->Finish(&call->simple_response, &call->status,
283                                          call);
284   }
285
286   void AsyncEmptyCall(const RpcConfig& config) {
287     Empty response;
288     int saved_request_id;
289     {
290       std::lock_guard<std::mutex> lock(stats_watchers_->mu);
291       saved_request_id = ++stats_watchers_->global_request_id;
292       ++stats_watchers_
293             ->global_request_id_by_type[ClientConfigureRequest::EMPTY_CALL];
294     }
295     std::chrono::system_clock::time_point deadline =
296         std::chrono::system_clock::now() +
297         std::chrono::seconds(config.timeout_sec != 0
298                                  ? config.timeout_sec
299                                  : absl::GetFlag(FLAGS_rpc_timeout_sec));
300     AsyncClientCall* call = new AsyncClientCall;
301     for (const auto& data : config.metadata) {
302       call->context.AddMetadata(data.first, data.second);
303       // TODO(@donnadionne): move deadline to separate proto.
304       if (data.first == "rpc-behavior" && data.second == "keep-open") {
305         deadline =
306             std::chrono::system_clock::now() + std::chrono::seconds(INT_MAX);
307       }
308     }
309     call->context.set_deadline(deadline);
310     call->saved_request_id = saved_request_id;
311     call->rpc_type = ClientConfigureRequest::EMPTY_CALL;
312     call->empty_response_reader = stub_->PrepareAsyncEmptyCall(
313         &call->context, Empty::default_instance(), &cq_);
314     call->empty_response_reader->StartCall();
315     call->empty_response_reader->Finish(&call->empty_response, &call->status,
316                                         call);
317   }
318
319   void AsyncCompleteRpc() {
320     void* got_tag;
321     bool ok = false;
322     while (cq_.Next(&got_tag, &ok)) {
323       AsyncClientCall* call = static_cast<AsyncClientCall*>(got_tag);
324       GPR_ASSERT(ok);
325       {
326         std::lock_guard<std::mutex> lock(stats_watchers_->mu);
327         auto server_initial_metadata = call->context.GetServerInitialMetadata();
328         auto metadata_hostname =
329             call->context.GetServerInitialMetadata().find("hostname");
330         std::string hostname =
331             metadata_hostname != call->context.GetServerInitialMetadata().end()
332                 ? std::string(metadata_hostname->second.data(),
333                               metadata_hostname->second.length())
334                 : call->simple_response.hostname();
335         for (auto watcher : stats_watchers_->watchers) {
336           watcher->RpcCompleted(call, hostname);
337         }
338       }
339
340       if (!RpcStatusCheckSuccess(call)) {
341         if (absl::GetFlag(FLAGS_print_response) ||
342             absl::GetFlag(FLAGS_fail_on_failed_rpc)) {
343           std::cout << "RPC failed: " << call->status.error_code() << ": "
344                     << call->status.error_message() << std::endl;
345         }
346         if (absl::GetFlag(FLAGS_fail_on_failed_rpc) &&
347             one_rpc_succeeded.load()) {
348           abort();
349         }
350       } else {
351         if (absl::GetFlag(FLAGS_print_response)) {
352           auto metadata_hostname =
353               call->context.GetServerInitialMetadata().find("hostname");
354           std::string hostname =
355               metadata_hostname !=
356                       call->context.GetServerInitialMetadata().end()
357                   ? std::string(metadata_hostname->second.data(),
358                                 metadata_hostname->second.length())
359                   : call->simple_response.hostname();
360           std::cout << "Greeting: Hello world, this is " << hostname
361                     << ", from " << call->context.peer() << std::endl;
362         }
363         one_rpc_succeeded = true;
364       }
365
366       delete call;
367     }
368   }
369
370  private:
371   static bool RpcStatusCheckSuccess(AsyncClientCall* call) {
372     // Determine RPC success based on expected status.
373     grpc_status_code code;
374     GPR_ASSERT(grpc_status_code_from_string(
375         absl::GetFlag(FLAGS_expect_status).c_str(), &code));
376     return code == static_cast<grpc_status_code>(call->status.error_code());
377   }
378
379   std::unique_ptr<TestService::Stub> stub_;
380   StatsWatchers* stats_watchers_;
381   CompletionQueue cq_;
382 };
383
384 class LoadBalancerStatsServiceImpl : public LoadBalancerStatsService::Service {
385  public:
386   explicit LoadBalancerStatsServiceImpl(StatsWatchers* stats_watchers)
387       : stats_watchers_(stats_watchers) {}
388
389   Status GetClientStats(ServerContext* /*context*/,
390                         const LoadBalancerStatsRequest* request,
391                         LoadBalancerStatsResponse* response) override {
392     int start_id;
393     int end_id;
394     XdsStatsWatcher* watcher;
395     {
396       std::lock_guard<std::mutex> lock(stats_watchers_->mu);
397       start_id = stats_watchers_->global_request_id + 1;
398       end_id = start_id + request->num_rpcs();
399       watcher = new XdsStatsWatcher(start_id, end_id);
400       stats_watchers_->watchers.insert(watcher);
401     }
402     watcher->WaitForRpcStatsResponse(response, request->timeout_sec());
403     {
404       std::lock_guard<std::mutex> lock(stats_watchers_->mu);
405       stats_watchers_->watchers.erase(watcher);
406     }
407     delete watcher;
408     return Status::OK;
409   }
410
411   Status GetClientAccumulatedStats(
412       ServerContext* /*context*/,
413       const LoadBalancerAccumulatedStatsRequest* /*request*/,
414       LoadBalancerAccumulatedStatsResponse* response) override {
415     std::lock_guard<std::mutex> lock(stats_watchers_->mu);
416     stats_watchers_->global_watcher->GetCurrentRpcStats(response,
417                                                         stats_watchers_);
418     return Status::OK;
419   }
420
421  private:
422   StatsWatchers* stats_watchers_;
423 };
424
425 class XdsUpdateClientConfigureServiceImpl
426     : public XdsUpdateClientConfigureService::Service {
427  public:
428   explicit XdsUpdateClientConfigureServiceImpl(
429       RpcConfigurationsQueue* rpc_configs_queue)
430       : rpc_configs_queue_(rpc_configs_queue) {}
431
432   Status Configure(ServerContext* /*context*/,
433                    const ClientConfigureRequest* request,
434                    ClientConfigureResponse* /*response*/) override {
435     std::map<int, std::vector<std::pair<std::string, std::string>>>
436         metadata_map;
437     for (const auto& data : request->metadata()) {
438       metadata_map[data.type()].push_back({data.key(), data.value()});
439     }
440     std::vector<RpcConfig> configs;
441     for (const auto& rpc : request->types()) {
442       RpcConfig config;
443       config.timeout_sec = request->timeout_sec();
444       config.type = static_cast<ClientConfigureRequest::RpcType>(rpc);
445       auto metadata_iter = metadata_map.find(rpc);
446       if (metadata_iter != metadata_map.end()) {
447         config.metadata = metadata_iter->second;
448       }
449       configs.push_back(std::move(config));
450     }
451     {
452       std::lock_guard<std::mutex> lock(
453           rpc_configs_queue_->mu_rpc_configs_queue);
454       rpc_configs_queue_->rpc_configs_queue.emplace_back(std::move(configs));
455     }
456     return Status::OK;
457   }
458
459  private:
460   RpcConfigurationsQueue* rpc_configs_queue_;
461 };
462
463 void RunTestLoop(std::chrono::duration<double> duration_per_query,
464                  StatsWatchers* stats_watchers,
465                  RpcConfigurationsQueue* rpc_configs_queue) {
466   TestClient client(
467       grpc::CreateChannel(absl::GetFlag(FLAGS_server),
468                           absl::GetFlag(FLAGS_secure_mode)
469                               ? grpc::experimental::XdsCredentials(
470                                     grpc::InsecureChannelCredentials())
471                               : grpc::InsecureChannelCredentials()),
472       stats_watchers);
473   std::chrono::time_point<std::chrono::system_clock> start =
474       std::chrono::system_clock::now();
475   std::chrono::duration<double> elapsed;
476
477   std::thread thread = std::thread(&TestClient::AsyncCompleteRpc, &client);
478
479   std::vector<RpcConfig> configs;
480   while (true) {
481     {
482       std::lock_guard<std::mutex> lockk(
483           rpc_configs_queue->mu_rpc_configs_queue);
484       if (!rpc_configs_queue->rpc_configs_queue.empty()) {
485         configs = std::move(rpc_configs_queue->rpc_configs_queue.front());
486         rpc_configs_queue->rpc_configs_queue.pop_front();
487       }
488     }
489
490     elapsed = std::chrono::system_clock::now() - start;
491     if (elapsed > duration_per_query) {
492       start = std::chrono::system_clock::now();
493       for (const auto& config : configs) {
494         if (config.type == ClientConfigureRequest::EMPTY_CALL) {
495           client.AsyncEmptyCall(config);
496         } else if (config.type == ClientConfigureRequest::UNARY_CALL) {
497           client.AsyncUnaryCall(config);
498         } else {
499           GPR_ASSERT(0);
500         }
501       }
502     }
503   }
504   thread.join();
505 }
506
507 void RunServer(const int port, StatsWatchers* stats_watchers,
508                RpcConfigurationsQueue* rpc_configs_queue) {
509   GPR_ASSERT(port != 0);
510   std::ostringstream server_address;
511   server_address << "0.0.0.0:" << port;
512
513   LoadBalancerStatsServiceImpl stats_service(stats_watchers);
514   XdsUpdateClientConfigureServiceImpl client_config_service(rpc_configs_queue);
515
516   ServerBuilder builder;
517   builder.RegisterService(&stats_service);
518   builder.RegisterService(&client_config_service);
519   builder.AddListeningPort(server_address.str(),
520                            grpc::InsecureServerCredentials());
521   std::unique_ptr<Server> server(builder.BuildAndStart());
522   gpr_log(GPR_DEBUG, "Server listening on %s", server_address.str().c_str());
523
524   server->Wait();
525 }
526
527 void BuildRpcConfigsFromFlags(RpcConfigurationsQueue* rpc_configs_queue) {
528   // Store Metadata like
529   // "EmptyCall:key1:value1,UnaryCall:key1:value1,UnaryCall:key2:value2" into a
530   // map where the key is the RPC method and value is a vector of key:value
531   // pairs. {EmptyCall, [{key1,value1}],
532   //  UnaryCall, [{key1,value1}, {key2,value2}]}
533   std::vector<std::string> rpc_metadata =
534       absl::StrSplit(absl::GetFlag(FLAGS_metadata), ',', absl::SkipEmpty());
535   std::map<int, std::vector<std::pair<std::string, std::string>>> metadata_map;
536   for (auto& data : rpc_metadata) {
537     std::vector<std::string> metadata =
538         absl::StrSplit(data, ':', absl::SkipEmpty());
539     GPR_ASSERT(metadata.size() == 3);
540     if (metadata[0] == "EmptyCall") {
541       metadata_map[ClientConfigureRequest::EMPTY_CALL].push_back(
542           {metadata[1], metadata[2]});
543     } else if (metadata[0] == "UnaryCall") {
544       metadata_map[ClientConfigureRequest::UNARY_CALL].push_back(
545           {metadata[1], metadata[2]});
546     } else {
547       GPR_ASSERT(0);
548     }
549   }
550   std::vector<RpcConfig> configs;
551   std::vector<std::string> rpc_methods =
552       absl::StrSplit(absl::GetFlag(FLAGS_rpc), ',', absl::SkipEmpty());
553   for (const std::string& rpc_method : rpc_methods) {
554     RpcConfig config;
555     if (rpc_method == "EmptyCall") {
556       config.type = ClientConfigureRequest::EMPTY_CALL;
557     } else if (rpc_method == "UnaryCall") {
558       config.type = ClientConfigureRequest::UNARY_CALL;
559     } else {
560       GPR_ASSERT(0);
561     }
562     auto metadata_iter = metadata_map.find(config.type);
563     if (metadata_iter != metadata_map.end()) {
564       config.metadata = metadata_iter->second;
565     }
566     configs.push_back(std::move(config));
567   }
568   {
569     std::lock_guard<std::mutex> lock(rpc_configs_queue->mu_rpc_configs_queue);
570     rpc_configs_queue->rpc_configs_queue.emplace_back(std::move(configs));
571   }
572 }
573
574 int main(int argc, char** argv) {
575   grpc::testing::TestEnvironment env(argc, argv);
576   grpc::testing::InitTest(&argc, &argv, true);
577   // Validate the expect_status flag.
578   grpc_status_code code;
579   GPR_ASSERT(grpc_status_code_from_string(
580       absl::GetFlag(FLAGS_expect_status).c_str(), &code));
581   StatsWatchers stats_watchers;
582   RpcConfigurationsQueue rpc_config_queue;
583
584   {
585     std::lock_guard<std::mutex> lock(stats_watchers.mu);
586     stats_watchers.global_watcher = new XdsStatsWatcher(0, 0);
587     stats_watchers.watchers.insert(stats_watchers.global_watcher);
588   }
589
590   BuildRpcConfigsFromFlags(&rpc_config_queue);
591
592   std::chrono::duration<double> duration_per_query =
593       std::chrono::nanoseconds(std::chrono::seconds(1)) /
594       absl::GetFlag(FLAGS_qps);
595
596   std::vector<std::thread> test_threads;
597   test_threads.reserve(absl::GetFlag(FLAGS_num_channels));
598   for (int i = 0; i < absl::GetFlag(FLAGS_num_channels); i++) {
599     test_threads.emplace_back(std::thread(&RunTestLoop, duration_per_query,
600                                           &stats_watchers, &rpc_config_queue));
601   }
602
603   RunServer(absl::GetFlag(FLAGS_stats_port), &stats_watchers,
604             &rpc_config_queue);
605
606   for (auto it = test_threads.begin(); it != test_threads.end(); it++) {
607     it->join();
608   }
609
610   return 0;
611 }