3 * Copyright 2020 gRPC authors.
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
#include <grpcpp/grpcpp.h>
#include <grpcpp/server.h>
#include <grpcpp/server_builder.h>
#include <grpcpp/server_context.h>

#include <atomic>
#include <chrono>
#include <climits>
#include <condition_variable>
#include <cstdlib>
#include <deque>
#include <iostream>
#include <map>
#include <memory>
#include <mutex>
#include <set>
#include <sstream>
#include <string>
#include <thread>
#include <utility>
#include <vector>

#include "absl/algorithm/container.h"
#include "absl/flags/flag.h"
#include "absl/strings/str_split.h"
#include "src/core/lib/channel/status_util.h"
#include "src/core/lib/gpr/env.h"
#include "src/proto/grpc/testing/empty.pb.h"
#include "src/proto/grpc/testing/messages.pb.h"
#include "src/proto/grpc/testing/test.grpc.pb.h"
#include "test/core/util/test_config.h"
#include "test/cpp/util/test_config.h"
47 ABSL_FLAG(bool, fail_on_failed_rpc, false,
48 "Fail client if any RPCs fail after first successful RPC.");
49 ABSL_FLAG(int32_t, num_channels, 1, "Number of channels.");
50 ABSL_FLAG(bool, print_response, false, "Write RPC response to stdout.");
51 ABSL_FLAG(int32_t, qps, 1, "Qps per channel.");
52 // TODO(Capstan): Consider using absl::Duration
53 ABSL_FLAG(int32_t, rpc_timeout_sec, 30, "Per RPC timeout seconds.");
54 ABSL_FLAG(std::string, server, "localhost:50051", "Address of server.");
55 ABSL_FLAG(int32_t, stats_port, 50052,
56 "Port to expose peer distribution stats service.");
57 ABSL_FLAG(std::string, rpc, "UnaryCall",
58 "a comma separated list of rpc methods.");
59 ABSL_FLAG(std::string, metadata, "", "metadata to send with the RPC.");
60 ABSL_FLAG(std::string, expect_status, "OK",
61 "RPC status for the test RPC to be considered successful");
63 bool, secure_mode, false,
64 "If true, XdsCredentials are used, InsecureChannelCredentials otherwise");
67 using grpc::ClientAsyncResponseReader;
68 using grpc::ClientContext;
69 using grpc::CompletionQueue;
71 using grpc::ServerBuilder;
72 using grpc::ServerContext;
74 using grpc::testing::ClientConfigureRequest;
75 using grpc::testing::ClientConfigureRequest_RpcType_Name;
76 using grpc::testing::ClientConfigureResponse;
77 using grpc::testing::Empty;
78 using grpc::testing::LoadBalancerAccumulatedStatsRequest;
79 using grpc::testing::LoadBalancerAccumulatedStatsResponse;
80 using grpc::testing::LoadBalancerStatsRequest;
81 using grpc::testing::LoadBalancerStatsResponse;
82 using grpc::testing::LoadBalancerStatsService;
83 using grpc::testing::SimpleRequest;
84 using grpc::testing::SimpleResponse;
85 using grpc::testing::TestService;
86 using grpc::testing::XdsUpdateClientConfigureService;
class XdsStatsWatcher;

// Bookkeeping shared between the RPC-sending threads and the stats service:
// request-id counters plus the watchers to notify when an RPC completes.
struct StatsWatchers {
  // Unique ID for each outgoing RPC
  int global_request_id = 0;
  // Unique ID for each outgoing RPC by RPC method type
  std::map<int, int> global_request_id_by_type;
  // Stores a set of watchers that should be notified upon outgoing RPC
  // completion.
  std::set<XdsStatsWatcher*> watchers;
  // Global watcher for accumulated stats. Not owned; null until one is
  // installed.
  XdsStatsWatcher* global_watcher = nullptr;
  // Mutex for global_request_id and watchers
  std::mutex mu;
};
// Set once any RPC has succeeded, which indicates that xDS resolution has
// completed.
std::atomic<bool> one_rpc_succeeded{false};
105 // RPC configuration detailing how RPC should be sent.
107 ClientConfigureRequest::RpcType type;
108 std::vector<std::pair<std::string, std::string>> metadata;
111 struct RpcConfigurationsQueue {
112 // A queue of RPC configurations detailing how RPCs should be sent.
113 std::deque<std::vector<RpcConfig>> rpc_configs_queue;
114 // Mutex for rpc_configs_queue
115 std::mutex mu_rpc_configs_queue;
117 struct AsyncClientCall {
118 Empty empty_response;
119 SimpleResponse simple_response;
120 ClientContext context;
122 int saved_request_id;
123 ClientConfigureRequest::RpcType rpc_type;
124 std::unique_ptr<ClientAsyncResponseReader<Empty>> empty_response_reader;
125 std::unique_ptr<ClientAsyncResponseReader<SimpleResponse>>
126 simple_response_reader;
129 /** Records the remote peer distribution for a given range of RPCs. */
130 class XdsStatsWatcher {
132 XdsStatsWatcher(int start_id, int end_id)
133 : start_id_(start_id), end_id_(end_id), rpcs_needed_(end_id - start_id) {}
135 // Upon the completion of an RPC, we will look at the request_id, the
136 // rpc_type, and the peer the RPC was sent to in order to count
137 // this RPC into the right stats bin.
138 void RpcCompleted(AsyncClientCall* call, const std::string& peer) {
139 // We count RPCs for global watcher or if the request_id falls into the
140 // watcher's interested range of request ids.
141 if ((start_id_ == 0 && end_id_ == 0) ||
142 (start_id_ <= call->saved_request_id &&
143 call->saved_request_id < end_id_)) {
145 std::lock_guard<std::mutex> lock(m_);
148 ++no_remote_peer_by_type_[call->rpc_type];
150 // RPC is counted into both per-peer bin and per-method-per-peer bin.
151 rpcs_by_peer_[peer]++;
152 rpcs_by_type_[call->rpc_type][peer]++;
155 // Report accumulated stats.
156 auto& stats_per_method = *accumulated_stats_.mutable_stats_per_method();
158 stats_per_method[ClientConfigureRequest_RpcType_Name(
160 auto& result = *method_stat.mutable_result();
161 grpc_status_code code =
162 static_cast<grpc_status_code>(call->status.error_code());
163 auto& num_rpcs = result[code];
165 auto rpcs_started = method_stat.rpcs_started();
166 method_stat.set_rpcs_started(++rpcs_started);
172 void WaitForRpcStatsResponse(LoadBalancerStatsResponse* response,
174 std::unique_lock<std::mutex> lock(m_);
175 cv_.wait_for(lock, std::chrono::seconds(timeout_sec),
176 [this] { return rpcs_needed_ == 0; });
177 response->mutable_rpcs_by_peer()->insert(rpcs_by_peer_.begin(),
178 rpcs_by_peer_.end());
179 auto& response_rpcs_by_method = *response->mutable_rpcs_by_method();
180 for (const auto& rpc_by_type : rpcs_by_type_) {
181 std::string method_name;
182 if (rpc_by_type.first == ClientConfigureRequest::EMPTY_CALL) {
183 method_name = "EmptyCall";
184 } else if (rpc_by_type.first == ClientConfigureRequest::UNARY_CALL) {
185 method_name = "UnaryCall";
189 // TODO(@donnadionne): When the test runner changes to accept EMPTY_CALL
190 // and UNARY_CALL we will just use the name of the enum instead of the
191 // method_name variable.
192 auto& response_rpc_by_method = response_rpcs_by_method[method_name];
193 auto& response_rpcs_by_peer =
194 *response_rpc_by_method.mutable_rpcs_by_peer();
195 for (const auto& rpc_by_peer : rpc_by_type.second) {
196 auto& response_rpc_by_peer = response_rpcs_by_peer[rpc_by_peer.first];
197 response_rpc_by_peer = rpc_by_peer.second;
200 response->set_num_failures(no_remote_peer_ + rpcs_needed_);
203 void GetCurrentRpcStats(LoadBalancerAccumulatedStatsResponse* response,
204 StatsWatchers* stats_watchers) {
205 std::unique_lock<std::mutex> lock(m_);
206 response->CopyFrom(accumulated_stats_);
207 // TODO(@donnadionne): delete deprecated stats below when the test is no
208 // longer using them.
209 auto& response_rpcs_started_by_method =
210 *response->mutable_num_rpcs_started_by_method();
211 auto& response_rpcs_succeeded_by_method =
212 *response->mutable_num_rpcs_succeeded_by_method();
213 auto& response_rpcs_failed_by_method =
214 *response->mutable_num_rpcs_failed_by_method();
215 for (const auto& rpc_by_type : rpcs_by_type_) {
216 auto total_succeeded = 0;
217 for (const auto& rpc_by_peer : rpc_by_type.second) {
218 total_succeeded += rpc_by_peer.second;
220 response_rpcs_succeeded_by_method[ClientConfigureRequest_RpcType_Name(
221 rpc_by_type.first)] = total_succeeded;
222 response_rpcs_started_by_method[ClientConfigureRequest_RpcType_Name(
223 rpc_by_type.first)] =
224 stats_watchers->global_request_id_by_type[rpc_by_type.first];
225 response_rpcs_failed_by_method[ClientConfigureRequest_RpcType_Name(
226 rpc_by_type.first)] = no_remote_peer_by_type_[rpc_by_type.first];
234 int no_remote_peer_ = 0;
235 std::map<int, int> no_remote_peer_by_type_;
236 // A map of stats keyed by peer name.
237 std::map<std::string, int> rpcs_by_peer_;
238 // A two-level map of stats keyed at top level by RPC method and second level
240 std::map<int, std::map<std::string, int>> rpcs_by_type_;
241 // Storing accumulated stats in the response proto format.
242 LoadBalancerAccumulatedStatsResponse accumulated_stats_;
244 std::condition_variable cv_;
249 TestClient(const std::shared_ptr<Channel>& channel,
250 StatsWatchers* stats_watchers)
251 : stub_(TestService::NewStub(channel)), stats_watchers_(stats_watchers) {}
253 void AsyncUnaryCall(const RpcConfig& config) {
254 SimpleResponse response;
255 int saved_request_id;
257 std::lock_guard<std::mutex> lock(stats_watchers_->mu);
258 saved_request_id = ++stats_watchers_->global_request_id;
260 ->global_request_id_by_type[ClientConfigureRequest::UNARY_CALL];
262 std::chrono::system_clock::time_point deadline =
263 std::chrono::system_clock::now() +
264 std::chrono::seconds(config.timeout_sec != 0
266 : absl::GetFlag(FLAGS_rpc_timeout_sec));
267 AsyncClientCall* call = new AsyncClientCall;
268 for (const auto& data : config.metadata) {
269 call->context.AddMetadata(data.first, data.second);
270 // TODO(@donnadionne): move deadline to separate proto.
271 if (data.first == "rpc-behavior" && data.second == "keep-open") {
273 std::chrono::system_clock::now() + std::chrono::seconds(INT_MAX);
276 call->context.set_deadline(deadline);
277 call->saved_request_id = saved_request_id;
278 call->rpc_type = ClientConfigureRequest::UNARY_CALL;
279 call->simple_response_reader = stub_->PrepareAsyncUnaryCall(
280 &call->context, SimpleRequest::default_instance(), &cq_);
281 call->simple_response_reader->StartCall();
282 call->simple_response_reader->Finish(&call->simple_response, &call->status,
286 void AsyncEmptyCall(const RpcConfig& config) {
288 int saved_request_id;
290 std::lock_guard<std::mutex> lock(stats_watchers_->mu);
291 saved_request_id = ++stats_watchers_->global_request_id;
293 ->global_request_id_by_type[ClientConfigureRequest::EMPTY_CALL];
295 std::chrono::system_clock::time_point deadline =
296 std::chrono::system_clock::now() +
297 std::chrono::seconds(config.timeout_sec != 0
299 : absl::GetFlag(FLAGS_rpc_timeout_sec));
300 AsyncClientCall* call = new AsyncClientCall;
301 for (const auto& data : config.metadata) {
302 call->context.AddMetadata(data.first, data.second);
303 // TODO(@donnadionne): move deadline to separate proto.
304 if (data.first == "rpc-behavior" && data.second == "keep-open") {
306 std::chrono::system_clock::now() + std::chrono::seconds(INT_MAX);
309 call->context.set_deadline(deadline);
310 call->saved_request_id = saved_request_id;
311 call->rpc_type = ClientConfigureRequest::EMPTY_CALL;
312 call->empty_response_reader = stub_->PrepareAsyncEmptyCall(
313 &call->context, Empty::default_instance(), &cq_);
314 call->empty_response_reader->StartCall();
315 call->empty_response_reader->Finish(&call->empty_response, &call->status,
319 void AsyncCompleteRpc() {
322 while (cq_.Next(&got_tag, &ok)) {
323 AsyncClientCall* call = static_cast<AsyncClientCall*>(got_tag);
326 std::lock_guard<std::mutex> lock(stats_watchers_->mu);
327 auto server_initial_metadata = call->context.GetServerInitialMetadata();
328 auto metadata_hostname =
329 call->context.GetServerInitialMetadata().find("hostname");
330 std::string hostname =
331 metadata_hostname != call->context.GetServerInitialMetadata().end()
332 ? std::string(metadata_hostname->second.data(),
333 metadata_hostname->second.length())
334 : call->simple_response.hostname();
335 for (auto watcher : stats_watchers_->watchers) {
336 watcher->RpcCompleted(call, hostname);
340 if (!RpcStatusCheckSuccess(call)) {
341 if (absl::GetFlag(FLAGS_print_response) ||
342 absl::GetFlag(FLAGS_fail_on_failed_rpc)) {
343 std::cout << "RPC failed: " << call->status.error_code() << ": "
344 << call->status.error_message() << std::endl;
346 if (absl::GetFlag(FLAGS_fail_on_failed_rpc) &&
347 one_rpc_succeeded.load()) {
351 if (absl::GetFlag(FLAGS_print_response)) {
352 auto metadata_hostname =
353 call->context.GetServerInitialMetadata().find("hostname");
354 std::string hostname =
356 call->context.GetServerInitialMetadata().end()
357 ? std::string(metadata_hostname->second.data(),
358 metadata_hostname->second.length())
359 : call->simple_response.hostname();
360 std::cout << "Greeting: Hello world, this is " << hostname
361 << ", from " << call->context.peer() << std::endl;
363 one_rpc_succeeded = true;
371 static bool RpcStatusCheckSuccess(AsyncClientCall* call) {
372 // Determine RPC success based on expected status.
373 grpc_status_code code;
374 GPR_ASSERT(grpc_status_code_from_string(
375 absl::GetFlag(FLAGS_expect_status).c_str(), &code));
376 return code == static_cast<grpc_status_code>(call->status.error_code());
379 std::unique_ptr<TestService::Stub> stub_;
380 StatsWatchers* stats_watchers_;
384 class LoadBalancerStatsServiceImpl : public LoadBalancerStatsService::Service {
386 explicit LoadBalancerStatsServiceImpl(StatsWatchers* stats_watchers)
387 : stats_watchers_(stats_watchers) {}
389 Status GetClientStats(ServerContext* /*context*/,
390 const LoadBalancerStatsRequest* request,
391 LoadBalancerStatsResponse* response) override {
394 XdsStatsWatcher* watcher;
396 std::lock_guard<std::mutex> lock(stats_watchers_->mu);
397 start_id = stats_watchers_->global_request_id + 1;
398 end_id = start_id + request->num_rpcs();
399 watcher = new XdsStatsWatcher(start_id, end_id);
400 stats_watchers_->watchers.insert(watcher);
402 watcher->WaitForRpcStatsResponse(response, request->timeout_sec());
404 std::lock_guard<std::mutex> lock(stats_watchers_->mu);
405 stats_watchers_->watchers.erase(watcher);
411 Status GetClientAccumulatedStats(
412 ServerContext* /*context*/,
413 const LoadBalancerAccumulatedStatsRequest* /*request*/,
414 LoadBalancerAccumulatedStatsResponse* response) override {
415 std::lock_guard<std::mutex> lock(stats_watchers_->mu);
416 stats_watchers_->global_watcher->GetCurrentRpcStats(response,
422 StatsWatchers* stats_watchers_;
425 class XdsUpdateClientConfigureServiceImpl
426 : public XdsUpdateClientConfigureService::Service {
428 explicit XdsUpdateClientConfigureServiceImpl(
429 RpcConfigurationsQueue* rpc_configs_queue)
430 : rpc_configs_queue_(rpc_configs_queue) {}
432 Status Configure(ServerContext* /*context*/,
433 const ClientConfigureRequest* request,
434 ClientConfigureResponse* /*response*/) override {
435 std::map<int, std::vector<std::pair<std::string, std::string>>>
437 for (const auto& data : request->metadata()) {
438 metadata_map[data.type()].push_back({data.key(), data.value()});
440 std::vector<RpcConfig> configs;
441 for (const auto& rpc : request->types()) {
443 config.timeout_sec = request->timeout_sec();
444 config.type = static_cast<ClientConfigureRequest::RpcType>(rpc);
445 auto metadata_iter = metadata_map.find(rpc);
446 if (metadata_iter != metadata_map.end()) {
447 config.metadata = metadata_iter->second;
449 configs.push_back(std::move(config));
452 std::lock_guard<std::mutex> lock(
453 rpc_configs_queue_->mu_rpc_configs_queue);
454 rpc_configs_queue_->rpc_configs_queue.emplace_back(std::move(configs));
460 RpcConfigurationsQueue* rpc_configs_queue_;
463 void RunTestLoop(std::chrono::duration<double> duration_per_query,
464 StatsWatchers* stats_watchers,
465 RpcConfigurationsQueue* rpc_configs_queue) {
467 grpc::CreateChannel(absl::GetFlag(FLAGS_server),
468 absl::GetFlag(FLAGS_secure_mode)
469 ? grpc::experimental::XdsCredentials(
470 grpc::InsecureChannelCredentials())
471 : grpc::InsecureChannelCredentials()),
473 std::chrono::time_point<std::chrono::system_clock> start =
474 std::chrono::system_clock::now();
475 std::chrono::duration<double> elapsed;
477 std::thread thread = std::thread(&TestClient::AsyncCompleteRpc, &client);
479 std::vector<RpcConfig> configs;
482 std::lock_guard<std::mutex> lockk(
483 rpc_configs_queue->mu_rpc_configs_queue);
484 if (!rpc_configs_queue->rpc_configs_queue.empty()) {
485 configs = std::move(rpc_configs_queue->rpc_configs_queue.front());
486 rpc_configs_queue->rpc_configs_queue.pop_front();
490 elapsed = std::chrono::system_clock::now() - start;
491 if (elapsed > duration_per_query) {
492 start = std::chrono::system_clock::now();
493 for (const auto& config : configs) {
494 if (config.type == ClientConfigureRequest::EMPTY_CALL) {
495 client.AsyncEmptyCall(config);
496 } else if (config.type == ClientConfigureRequest::UNARY_CALL) {
497 client.AsyncUnaryCall(config);
507 void RunServer(const int port, StatsWatchers* stats_watchers,
508 RpcConfigurationsQueue* rpc_configs_queue) {
509 GPR_ASSERT(port != 0);
510 std::ostringstream server_address;
511 server_address << "0.0.0.0:" << port;
513 LoadBalancerStatsServiceImpl stats_service(stats_watchers);
514 XdsUpdateClientConfigureServiceImpl client_config_service(rpc_configs_queue);
516 ServerBuilder builder;
517 builder.RegisterService(&stats_service);
518 builder.RegisterService(&client_config_service);
519 builder.AddListeningPort(server_address.str(),
520 grpc::InsecureServerCredentials());
521 std::unique_ptr<Server> server(builder.BuildAndStart());
522 gpr_log(GPR_DEBUG, "Server listening on %s", server_address.str().c_str());
527 void BuildRpcConfigsFromFlags(RpcConfigurationsQueue* rpc_configs_queue) {
528 // Store Metadata like
529 // "EmptyCall:key1:value1,UnaryCall:key1:value1,UnaryCall:key2:value2" into a
530 // map where the key is the RPC method and value is a vector of key:value
531 // pairs. {EmptyCall, [{key1,value1}],
532 // UnaryCall, [{key1,value1}, {key2,value2}]}
533 std::vector<std::string> rpc_metadata =
534 absl::StrSplit(absl::GetFlag(FLAGS_metadata), ',', absl::SkipEmpty());
535 std::map<int, std::vector<std::pair<std::string, std::string>>> metadata_map;
536 for (auto& data : rpc_metadata) {
537 std::vector<std::string> metadata =
538 absl::StrSplit(data, ':', absl::SkipEmpty());
539 GPR_ASSERT(metadata.size() == 3);
540 if (metadata[0] == "EmptyCall") {
541 metadata_map[ClientConfigureRequest::EMPTY_CALL].push_back(
542 {metadata[1], metadata[2]});
543 } else if (metadata[0] == "UnaryCall") {
544 metadata_map[ClientConfigureRequest::UNARY_CALL].push_back(
545 {metadata[1], metadata[2]});
550 std::vector<RpcConfig> configs;
551 std::vector<std::string> rpc_methods =
552 absl::StrSplit(absl::GetFlag(FLAGS_rpc), ',', absl::SkipEmpty());
553 for (const std::string& rpc_method : rpc_methods) {
555 if (rpc_method == "EmptyCall") {
556 config.type = ClientConfigureRequest::EMPTY_CALL;
557 } else if (rpc_method == "UnaryCall") {
558 config.type = ClientConfigureRequest::UNARY_CALL;
562 auto metadata_iter = metadata_map.find(config.type);
563 if (metadata_iter != metadata_map.end()) {
564 config.metadata = metadata_iter->second;
566 configs.push_back(std::move(config));
569 std::lock_guard<std::mutex> lock(rpc_configs_queue->mu_rpc_configs_queue);
570 rpc_configs_queue->rpc_configs_queue.emplace_back(std::move(configs));
574 int main(int argc, char** argv) {
575 grpc::testing::TestEnvironment env(argc, argv);
576 grpc::testing::InitTest(&argc, &argv, true);
577 // Validate the expect_status flag.
578 grpc_status_code code;
579 GPR_ASSERT(grpc_status_code_from_string(
580 absl::GetFlag(FLAGS_expect_status).c_str(), &code));
581 StatsWatchers stats_watchers;
582 RpcConfigurationsQueue rpc_config_queue;
585 std::lock_guard<std::mutex> lock(stats_watchers.mu);
586 stats_watchers.global_watcher = new XdsStatsWatcher(0, 0);
587 stats_watchers.watchers.insert(stats_watchers.global_watcher);
590 BuildRpcConfigsFromFlags(&rpc_config_queue);
592 std::chrono::duration<double> duration_per_query =
593 std::chrono::nanoseconds(std::chrono::seconds(1)) /
594 absl::GetFlag(FLAGS_qps);
596 std::vector<std::thread> test_threads;
597 test_threads.reserve(absl::GetFlag(FLAGS_num_channels));
598 for (int i = 0; i < absl::GetFlag(FLAGS_num_channels); i++) {
599 test_threads.emplace_back(std::thread(&RunTestLoop, duration_per_query,
600 &stats_watchers, &rpc_config_queue));
603 RunServer(absl::GetFlag(FLAGS_stats_port), &stats_watchers,
606 for (auto it = test_threads.begin(); it != test_threads.end(); it++) {