3 * Copyright 2020 gRPC authors.
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
21 #include <condition_variable>
30 #include "absl/strings/str_split.h"
32 #include <gflags/gflags.h>
33 #include <grpcpp/grpcpp.h>
34 #include <grpcpp/server.h>
35 #include <grpcpp/server_builder.h>
36 #include <grpcpp/server_context.h>
38 #include "src/core/lib/gpr/env.h"
39 #include "src/proto/grpc/testing/empty.pb.h"
40 #include "src/proto/grpc/testing/messages.pb.h"
41 #include "src/proto/grpc/testing/test.grpc.pb.h"
42 #include "test/core/util/test_config.h"
43 #include "test/cpp/util/test_config.h"
45 DEFINE_bool(fail_on_failed_rpc, false,
46 "Fail client if any RPCs fail after first successful RPC.");
47 DEFINE_int32(num_channels, 1, "Number of channels.");
48 DEFINE_bool(print_response, false, "Write RPC response to stdout.");
49 DEFINE_int32(qps, 1, "Qps per channel.");
50 DEFINE_int32(rpc_timeout_sec, 30, "Per RPC timeout seconds.");
51 DEFINE_string(server, "localhost:50051", "Address of server.");
52 DEFINE_int32(stats_port, 50052,
53 "Port to expose peer distribution stats service.");
54 DEFINE_string(rpc, "UnaryCall", "a comma separated list of rpc methods.");
55 DEFINE_string(metadata, "", "metadata to send with the RPC.");
58 using grpc::ClientAsyncResponseReader;
59 using grpc::ClientContext;
60 using grpc::CompletionQueue;
62 using grpc::ServerBuilder;
63 using grpc::ServerContext;
64 using grpc::ServerCredentials;
65 using grpc::ServerReader;
66 using grpc::ServerReaderWriter;
67 using grpc::ServerWriter;
69 using grpc::testing::Empty;
70 using grpc::testing::LoadBalancerStatsRequest;
71 using grpc::testing::LoadBalancerStatsResponse;
72 using grpc::testing::LoadBalancerStatsService;
73 using grpc::testing::SimpleRequest;
74 using grpc::testing::SimpleResponse;
75 using grpc::testing::TestService;
class XdsStatsWatcher;

// Unique ID for each outgoing RPC. Guarded by `mu`.
int global_request_id = 0;
// Stores a set of watchers that should be notified upon outgoing RPC
// completion. The set does not own the watchers. Guarded by `mu`.
std::set<XdsStatsWatcher*> watchers;
// Mutex for global_request_id and watchers.
std::mutex mu;
// Whether at least one RPC has succeeded, indicating xDS resolution completed.
std::atomic<bool> one_rpc_succeeded(false);
88 /** Records the remote peer distribution for a given range of RPCs. */
89 class XdsStatsWatcher {
91 XdsStatsWatcher(int start_id, int end_id)
92 : start_id_(start_id), end_id_(end_id), rpcs_needed_(end_id - start_id) {}
94 void RpcCompleted(int request_id, const std::string& rpc_method,
95 const std::string& peer) {
96 if (start_id_ <= request_id && request_id < end_id_) {
98 std::lock_guard<std::mutex> lk(m_);
102 rpcs_by_peer_[peer]++;
103 rpcs_by_method_[rpc_method][peer]++;
111 void WaitForRpcStatsResponse(LoadBalancerStatsResponse* response,
114 std::unique_lock<std::mutex> lk(m_);
115 cv_.wait_for(lk, std::chrono::seconds(timeout_sec),
116 [this] { return rpcs_needed_ == 0; });
117 response->mutable_rpcs_by_peer()->insert(rpcs_by_peer_.begin(),
118 rpcs_by_peer_.end());
119 auto& response_rpcs_by_method = *response->mutable_rpcs_by_method();
120 for (const auto& rpc_by_method : rpcs_by_method_) {
121 auto& response_rpc_by_method =
122 response_rpcs_by_method[rpc_by_method.first];
123 auto& response_rpcs_by_peer =
124 *response_rpc_by_method.mutable_rpcs_by_peer();
125 for (const auto& rpc_by_peer : rpc_by_method.second) {
126 auto& response_rpc_by_peer = response_rpcs_by_peer[rpc_by_peer.first];
127 response_rpc_by_peer = rpc_by_peer.second;
130 response->set_num_failures(no_remote_peer_ + rpcs_needed_);
138 int no_remote_peer_ = 0;
139 // A map of stats keyed by peer name.
140 std::map<std::string, int> rpcs_by_peer_;
141 // A two-level map of stats keyed at top level by RPC method and second level
143 std::map<std::string, std::map<std::string, int>> rpcs_by_method_;
145 std::condition_variable cv_;
150 TestClient(const std::shared_ptr<Channel>& channel)
151 : stub_(TestService::NewStub(channel)) {}
154 std::vector<std::pair<std::string, std::string>> metadata) {
155 SimpleResponse response;
156 int saved_request_id;
158 std::lock_guard<std::mutex> lk(mu);
159 saved_request_id = ++global_request_id;
161 std::chrono::system_clock::time_point deadline =
162 std::chrono::system_clock::now() +
163 std::chrono::seconds(FLAGS_rpc_timeout_sec);
164 AsyncClientCall* call = new AsyncClientCall;
165 call->context.set_deadline(deadline);
166 for (const auto& data : metadata) {
167 call->context.AddMetadata(data.first, data.second);
169 call->saved_request_id = saved_request_id;
170 call->rpc_method = "UnaryCall";
171 call->simple_response_reader = stub_->PrepareAsyncUnaryCall(
172 &call->context, SimpleRequest::default_instance(), &cq_);
173 call->simple_response_reader->StartCall();
174 call->simple_response_reader->Finish(&call->simple_response, &call->status,
179 std::vector<std::pair<std::string, std::string>> metadata) {
181 int saved_request_id;
183 std::lock_guard<std::mutex> lk(mu);
184 saved_request_id = ++global_request_id;
186 std::chrono::system_clock::time_point deadline =
187 std::chrono::system_clock::now() +
188 std::chrono::seconds(FLAGS_rpc_timeout_sec);
189 AsyncClientCall* call = new AsyncClientCall;
190 call->context.set_deadline(deadline);
191 for (const auto& data : metadata) {
192 call->context.AddMetadata(data.first, data.second);
194 call->saved_request_id = saved_request_id;
195 call->rpc_method = "EmptyCall";
196 call->empty_response_reader = stub_->PrepareAsyncEmptyCall(
197 &call->context, Empty::default_instance(), &cq_);
198 call->empty_response_reader->StartCall();
199 call->empty_response_reader->Finish(&call->empty_response, &call->status,
203 void AsyncCompleteRpc() {
206 while (cq_.Next(&got_tag, &ok)) {
207 AsyncClientCall* call = static_cast<AsyncClientCall*>(got_tag);
210 std::lock_guard<std::mutex> lk(mu);
211 auto server_initial_metadata = call->context.GetServerInitialMetadata();
212 auto metadata_hostname =
213 call->context.GetServerInitialMetadata().find("hostname");
214 std::string hostname =
215 metadata_hostname != call->context.GetServerInitialMetadata().end()
216 ? std::string(metadata_hostname->second.data(),
217 metadata_hostname->second.length())
218 : call->simple_response.hostname();
219 for (auto watcher : watchers) {
220 watcher->RpcCompleted(call->saved_request_id, call->rpc_method,
225 if (!call->status.ok()) {
226 if (FLAGS_print_response || FLAGS_fail_on_failed_rpc) {
227 std::cout << "RPC failed: " << call->status.error_code() << ": "
228 << call->status.error_message() << std::endl;
230 if (FLAGS_fail_on_failed_rpc && one_rpc_succeeded.load()) {
234 if (FLAGS_print_response) {
235 auto metadata_hostname =
236 call->context.GetServerInitialMetadata().find("hostname");
237 std::string hostname =
239 call->context.GetServerInitialMetadata().end()
240 ? std::string(metadata_hostname->second.data(),
241 metadata_hostname->second.length())
242 : call->simple_response.hostname();
243 std::cout << "Greeting: Hello world, this is " << hostname
244 << ", from " << call->context.peer() << std::endl;
246 one_rpc_succeeded = true;
254 struct AsyncClientCall {
255 Empty empty_response;
256 SimpleResponse simple_response;
257 ClientContext context;
259 int saved_request_id;
260 std::string rpc_method;
261 std::unique_ptr<ClientAsyncResponseReader<Empty>> empty_response_reader;
262 std::unique_ptr<ClientAsyncResponseReader<SimpleResponse>>
263 simple_response_reader;
266 std::unique_ptr<TestService::Stub> stub_;
270 class LoadBalancerStatsServiceImpl : public LoadBalancerStatsService::Service {
272 Status GetClientStats(ServerContext* context,
273 const LoadBalancerStatsRequest* request,
274 LoadBalancerStatsResponse* response) {
277 XdsStatsWatcher* watcher;
279 std::lock_guard<std::mutex> lk(mu);
280 start_id = global_request_id + 1;
281 end_id = start_id + request->num_rpcs();
282 watcher = new XdsStatsWatcher(start_id, end_id);
283 watchers.insert(watcher);
285 watcher->WaitForRpcStatsResponse(response, request->timeout_sec());
287 std::lock_guard<std::mutex> lk(mu);
288 watchers.erase(watcher);
295 void RunTestLoop(std::chrono::duration<double> duration_per_query) {
296 std::vector<absl::string_view> rpc_methods =
297 absl::StrSplit(FLAGS_rpc, ',', absl::SkipEmpty());
298 // Store Metadata like
299 // "EmptyCall:key1:value1,UnaryCall:key1:value1,UnaryCall:key2:value2" into a
300 // map where the key is the RPC method and value is a vector of key:value
301 // pairs. {EmptyCall, [{key1,value1}],
302 // UnaryCall, [{key1,value1}, {key2,value2}]}
303 std::vector<absl::string_view> rpc_metadata =
304 absl::StrSplit(FLAGS_metadata, ',', absl::SkipEmpty());
305 std::map<std::string, std::vector<std::pair<std::string, std::string>>>
307 for (auto& data : rpc_metadata) {
308 std::vector<absl::string_view> metadata =
309 absl::StrSplit(data, ':', absl::SkipEmpty());
310 GPR_ASSERT(metadata.size() == 3);
311 metadata_map[std::string(metadata[0])].push_back(
312 {std::string(metadata[1]), std::string(metadata[2])});
315 grpc::CreateChannel(FLAGS_server, grpc::InsecureChannelCredentials()));
316 std::chrono::time_point<std::chrono::system_clock> start =
317 std::chrono::system_clock::now();
318 std::chrono::duration<double> elapsed;
320 std::thread thread = std::thread(&TestClient::AsyncCompleteRpc, &client);
323 for (const absl::string_view& rpc_method : rpc_methods) {
324 elapsed = std::chrono::system_clock::now() - start;
325 if (elapsed > duration_per_query) {
326 start = std::chrono::system_clock::now();
327 auto metadata_iter = metadata_map.find(std::string(rpc_method));
328 if (rpc_method == "EmptyCall") {
329 client.AsyncEmptyCall(
330 metadata_iter != metadata_map.end()
331 ? metadata_iter->second
332 : std::vector<std::pair<std::string, std::string>>());
334 client.AsyncUnaryCall(
335 metadata_iter != metadata_map.end()
336 ? metadata_iter->second
337 : std::vector<std::pair<std::string, std::string>>());
345 void RunServer(const int port) {
346 GPR_ASSERT(port != 0);
347 std::ostringstream server_address;
348 server_address << "0.0.0.0:" << port;
350 LoadBalancerStatsServiceImpl service;
352 ServerBuilder builder;
353 builder.RegisterService(&service);
354 builder.AddListeningPort(server_address.str(),
355 grpc::InsecureServerCredentials());
356 std::unique_ptr<Server> server(builder.BuildAndStart());
357 gpr_log(GPR_INFO, "Stats server listening on %s",
358 server_address.str().c_str());
363 int main(int argc, char** argv) {
364 grpc::testing::TestEnvironment env(argc, argv);
365 grpc::testing::InitTest(&argc, &argv, true);
367 std::chrono::duration<double> duration_per_query =
368 std::chrono::nanoseconds(std::chrono::seconds(1)) / FLAGS_qps;
370 std::vector<std::thread> test_threads;
372 test_threads.reserve(FLAGS_num_channels);
373 for (int i = 0; i < FLAGS_num_channels; i++) {
374 test_threads.emplace_back(std::thread(&RunTestLoop, duration_per_query));
377 RunServer(FLAGS_stats_port);
379 for (auto it = test_threads.begin(); it != test_threads.end(); it++) {