Imported Upstream version 1.36.0
[platform/upstream/grpc.git] / test / cpp / qps / driver.cc
1 /*
2  *
3  * Copyright 2015 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18
19 #include <cinttypes>
20 #include <deque>
21 #include <list>
22 #include <thread>
23 #include <unordered_map>
24 #include <vector>
25
26 #include <grpc/support/alloc.h>
27 #include <grpc/support/log.h>
28 #include <grpc/support/string_util.h>
29 #include <grpcpp/channel.h>
30 #include <grpcpp/client_context.h>
31 #include <grpcpp/create_channel.h>
32
33 #include "src/core/lib/gpr/env.h"
34 #include "src/core/lib/gprpp/host_port.h"
35 #include "src/core/lib/profiling/timers.h"
36 #include "src/proto/grpc/testing/worker_service.grpc.pb.h"
37 #include "test/core/util/port.h"
38 #include "test/core/util/test_config.h"
39 #include "test/cpp/qps/client.h"
40 #include "test/cpp/qps/driver.h"
41 #include "test/cpp/qps/histogram.h"
42 #include "test/cpp/qps/qps_worker.h"
43 #include "test/cpp/qps/stats.h"
44 #include "test/cpp/util/test_credentials_provider.h"
45
46 using std::deque;
47 using std::list;
48 using std::unique_ptr;
49 using std::vector;
50
51 namespace grpc {
52 namespace testing {
53 static std::string get_host(const std::string& worker) {
54   absl::string_view host;
55   absl::string_view port;
56   grpc_core::SplitHostPort(worker.c_str(), &host, &port);
57   return std::string(host.data(), host.size());
58 }
59
60 static deque<string> get_workers(const string& env_name) {
61   deque<string> out;
62   char* env = gpr_getenv(env_name.c_str());
63   if (!env) {
64     env = gpr_strdup("");
65   }
66   char* p = env;
67   if (strlen(env) != 0) {
68     for (;;) {
69       char* comma = strchr(p, ',');
70       if (comma) {
71         out.emplace_back(p, comma);
72         p = comma + 1;
73       } else {
74         out.emplace_back(p);
75         break;
76       }
77     }
78   }
79   if (out.empty()) {
80     gpr_log(GPR_ERROR,
81             "Environment variable \"%s\" does not contain a list of QPS "
82             "workers to use. Set it to a comma-separated list of "
83             "hostname:port pairs, starting with hosts that should act as "
84             "servers. E.g. export "
85             "%s=\"serverhost1:1234,clienthost1:1234,clienthost2:1234\"",
86             env_name.c_str(), env_name.c_str());
87   }
88   gpr_free(env);
89   return out;
90 }
91
92 std::string GetCredType(
93     const std::string& worker_addr,
94     const std::map<std::string, std::string>& per_worker_credential_types,
95     const std::string& credential_type) {
96   auto it = per_worker_credential_types.find(worker_addr);
97   if (it != per_worker_credential_types.end()) {
98     return it->second;
99   }
100   return credential_type;
101 }
102
103 // helpers for postprocess_scenario_result
104 static double WallTime(const ClientStats& s) { return s.time_elapsed(); }
105 static double SystemTime(const ClientStats& s) { return s.time_system(); }
106 static double UserTime(const ClientStats& s) { return s.time_user(); }
107 static double CliPollCount(const ClientStats& s) { return s.cq_poll_count(); }
108 static double SvrPollCount(const ServerStats& s) { return s.cq_poll_count(); }
109 static double ServerWallTime(const ServerStats& s) { return s.time_elapsed(); }
110 static double ServerSystemTime(const ServerStats& s) { return s.time_system(); }
111 static double ServerUserTime(const ServerStats& s) { return s.time_user(); }
112 static double ServerTotalCpuTime(const ServerStats& s) {
113   return s.total_cpu_time();
114 }
115 static double ServerIdleCpuTime(const ServerStats& s) {
116   return s.idle_cpu_time();
117 }
118 static int Cores(int n) { return n; }
119
120 static bool IsSuccess(const Status& s) {
121   if (s.ok()) return true;
122   // Since we shutdown servers and clients at the same time, they both can
123   // observe cancellation.  Thus, we consider CANCELLED as good status.
124   if (static_cast<StatusCode>(s.error_code()) == StatusCode::CANCELLED) {
125     return true;
126   }
127   // Since we shutdown servers and clients at the same time, server can close
128   // the socket before the client attempts to do that, and vice versa.  Thus
129   // receiving a "Socket closed" error is fine.
130   if (s.error_message() == "Socket closed") return true;
131   return false;
132 }
133
134 // Postprocess ScenarioResult and populate result summary.
135 static void postprocess_scenario_result(ScenarioResult* result) {
136   // Get latencies from ScenarioResult latencies histogram and populate to
137   // result summary.
138   Histogram histogram;
139   histogram.MergeProto(result->latencies());
140   result->mutable_summary()->set_latency_50(histogram.Percentile(50));
141   result->mutable_summary()->set_latency_90(histogram.Percentile(90));
142   result->mutable_summary()->set_latency_95(histogram.Percentile(95));
143   result->mutable_summary()->set_latency_99(histogram.Percentile(99));
144   result->mutable_summary()->set_latency_999(histogram.Percentile(99.9));
145
146   // Calculate qps and cpu load for each client and then aggregate results for
147   // all clients
148   double qps = 0;
149   double client_system_cpu_load = 0, client_user_cpu_load = 0;
150   for (int i = 0; i < result->client_stats_size(); i++) {
151     auto client_stat = result->client_stats(i);
152     qps += client_stat.latencies().count() / client_stat.time_elapsed();
153     client_system_cpu_load +=
154         client_stat.time_system() / client_stat.time_elapsed();
155     client_user_cpu_load +=
156         client_stat.time_user() / client_stat.time_elapsed();
157   }
158   // Calculate cpu load for each server and then aggregate results for all
159   // servers
160   double server_system_cpu_load = 0, server_user_cpu_load = 0;
161   for (int i = 0; i < result->server_stats_size(); i++) {
162     auto server_stat = result->server_stats(i);
163     server_system_cpu_load +=
164         server_stat.time_system() / server_stat.time_elapsed();
165     server_user_cpu_load +=
166         server_stat.time_user() / server_stat.time_elapsed();
167   }
168   result->mutable_summary()->set_qps(qps);
169   // Populate the percentage of cpu load to result summary.
170   result->mutable_summary()->set_server_system_time(100 *
171                                                     server_system_cpu_load);
172   result->mutable_summary()->set_server_user_time(100 * server_user_cpu_load);
173   result->mutable_summary()->set_client_system_time(100 *
174                                                     client_system_cpu_load);
175   result->mutable_summary()->set_client_user_time(100 * client_user_cpu_load);
176
177   // For Non-linux platform, get_cpu_usage() is not implemented. Thus,
178   // ServerTotalCpuTime and ServerIdleCpuTime are both 0.
179   if (average(result->server_stats(), ServerTotalCpuTime) == 0) {
180     result->mutable_summary()->set_server_cpu_usage(0);
181   } else {
182     auto server_cpu_usage =
183         100 - 100 * average(result->server_stats(), ServerIdleCpuTime) /
184                   average(result->server_stats(), ServerTotalCpuTime);
185     result->mutable_summary()->set_server_cpu_usage(server_cpu_usage);
186   }
187
188   // Calculate and populate successful request per second and failed requests
189   // per seconds to result summary.
190   auto time_estimate = average(result->client_stats(), WallTime);
191   if (result->request_results_size() > 0) {
192     int64_t successes = 0;
193     int64_t failures = 0;
194     for (int i = 0; i < result->request_results_size(); i++) {
195       const RequestResultCount& rrc = result->request_results(i);
196       if (rrc.status_code() == 0) {
197         successes += rrc.count();
198       } else {
199         failures += rrc.count();
200       }
201     }
202     result->mutable_summary()->set_successful_requests_per_second(
203         successes / time_estimate);
204     result->mutable_summary()->set_failed_requests_per_second(failures /
205                                                               time_estimate);
206   }
207
208   // Fill in data for other metrics required in result summary
209   auto qps_per_server_core = qps / sum(result->server_cores(), Cores);
210   result->mutable_summary()->set_qps_per_server_core(qps_per_server_core);
211   result->mutable_summary()->set_client_polls_per_request(
212       sum(result->client_stats(), CliPollCount) / histogram.Count());
213   result->mutable_summary()->set_server_polls_per_request(
214       sum(result->server_stats(), SvrPollCount) / histogram.Count());
215
216   auto server_queries_per_cpu_sec =
217       histogram.Count() / (sum(result->server_stats(), ServerSystemTime) +
218                            sum(result->server_stats(), ServerUserTime));
219   auto client_queries_per_cpu_sec =
220       histogram.Count() / (sum(result->client_stats(), SystemTime) +
221                            sum(result->client_stats(), UserTime));
222
223   result->mutable_summary()->set_server_queries_per_cpu_sec(
224       server_queries_per_cpu_sec);
225   result->mutable_summary()->set_client_queries_per_cpu_sec(
226       client_queries_per_cpu_sec);
227 }
228
229 struct ClientData {
230   unique_ptr<WorkerService::Stub> stub;
231   unique_ptr<ClientReaderWriter<ClientArgs, ClientStatus>> stream;
232 };
233
234 struct ServerData {
235   unique_ptr<WorkerService::Stub> stub;
236   unique_ptr<ClientReaderWriter<ServerArgs, ServerStatus>> stream;
237 };
238
239 static void FinishClients(const std::vector<ClientData>& clients,
240                           const ClientArgs& client_mark) {
241   gpr_log(GPR_INFO, "Finishing clients");
242   for (size_t i = 0, i_end = clients.size(); i < i_end; i++) {
243     auto client = &clients[i];
244     if (!client->stream->Write(client_mark)) {
245       gpr_log(GPR_ERROR, "Couldn't write mark to client %zu", i);
246       GPR_ASSERT(false);
247     }
248     if (!client->stream->WritesDone()) {
249       gpr_log(GPR_ERROR, "Failed WritesDone for client %zu", i);
250       GPR_ASSERT(false);
251     }
252   }
253 }
254
255 static void ReceiveFinalStatusFromClients(
256     const std::vector<ClientData>& clients, Histogram& merged_latencies,
257     std::unordered_map<int, int64_t>& merged_statuses, ScenarioResult& result) {
258   gpr_log(GPR_INFO, "Receiving final status from clients");
259   ClientStatus client_status;
260   for (size_t i = 0, i_end = clients.size(); i < i_end; i++) {
261     auto client = &clients[i];
262     // Read the client final status
263     if (client->stream->Read(&client_status)) {
264       gpr_log(GPR_INFO, "Received final status from client %zu", i);
265       const auto& stats = client_status.stats();
266       merged_latencies.MergeProto(stats.latencies());
267       for (int i = 0; i < stats.request_results_size(); i++) {
268         merged_statuses[stats.request_results(i).status_code()] +=
269             stats.request_results(i).count();
270       }
271       result.add_client_stats()->CopyFrom(stats);
272       // That final status should be the last message on the client stream
273       GPR_ASSERT(!client->stream->Read(&client_status));
274     } else {
275       gpr_log(GPR_ERROR, "Couldn't get final status from client %zu", i);
276       GPR_ASSERT(false);
277     }
278   }
279 }
280
281 static void ShutdownClients(const std::vector<ClientData>& clients,
282                             ScenarioResult& result) {
283   gpr_log(GPR_INFO, "Shutdown clients");
284   for (size_t i = 0, i_end = clients.size(); i < i_end; i++) {
285     auto client = &clients[i];
286     Status s = client->stream->Finish();
287     // Since we shutdown servers and clients at the same time, clients can
288     // observe cancellation.  Thus, we consider both OK and CANCELLED as good
289     // status.
290     const bool success = IsSuccess(s);
291     result.add_client_success(success);
292     if (!success) {
293       gpr_log(GPR_ERROR, "Client %zu had an error %s", i,
294               s.error_message().c_str());
295       GPR_ASSERT(false);
296     }
297   }
298 }
299
300 static void FinishServers(const std::vector<ServerData>& servers,
301                           const ServerArgs& server_mark) {
302   gpr_log(GPR_INFO, "Finishing servers");
303   for (size_t i = 0, i_end = servers.size(); i < i_end; i++) {
304     auto server = &servers[i];
305     if (!server->stream->Write(server_mark)) {
306       gpr_log(GPR_ERROR, "Couldn't write mark to server %zu", i);
307       GPR_ASSERT(false);
308     }
309     if (!server->stream->WritesDone()) {
310       gpr_log(GPR_ERROR, "Failed WritesDone for server %zu", i);
311       GPR_ASSERT(false);
312     }
313   }
314 }
315
316 static void ReceiveFinalStatusFromServer(const std::vector<ServerData>& servers,
317                                          ScenarioResult& result) {
318   gpr_log(GPR_INFO, "Receiving final status from servers");
319   ServerStatus server_status;
320   for (size_t i = 0, i_end = servers.size(); i < i_end; i++) {
321     auto server = &servers[i];
322     // Read the server final status
323     if (server->stream->Read(&server_status)) {
324       gpr_log(GPR_INFO, "Received final status from server %zu", i);
325       result.add_server_stats()->CopyFrom(server_status.stats());
326       result.add_server_cores(server_status.cores());
327       // That final status should be the last message on the server stream
328       GPR_ASSERT(!server->stream->Read(&server_status));
329     } else {
330       gpr_log(GPR_ERROR, "Couldn't get final status from server %zu", i);
331       GPR_ASSERT(false);
332     }
333   }
334 }
335
336 static void ShutdownServers(const std::vector<ServerData>& servers,
337                             ScenarioResult& result) {
338   gpr_log(GPR_INFO, "Shutdown servers");
339   for (size_t i = 0, i_end = servers.size(); i < i_end; i++) {
340     auto server = &servers[i];
341     Status s = server->stream->Finish();
342     // Since we shutdown servers and clients at the same time, servers can
343     // observe cancellation.  Thus, we consider both OK and CANCELLED as good
344     // status.
345     const bool success = IsSuccess(s);
346     result.add_server_success(success);
347     if (!success) {
348       gpr_log(GPR_ERROR, "Server %zu had an error %s", i,
349               s.error_message().c_str());
350       GPR_ASSERT(false);
351     }
352   }
353 }
354
355 std::vector<grpc::testing::Server*>* g_inproc_servers = nullptr;
356
357 std::unique_ptr<ScenarioResult> RunScenario(
358     const ClientConfig& initial_client_config, size_t num_clients,
359     const ServerConfig& initial_server_config, size_t num_servers,
360     int warmup_seconds, int benchmark_seconds, int spawn_local_worker_count,
361     const std::string& qps_server_target_override,
362     const std::string& credential_type,
363     const std::map<std::string, std::string>& per_worker_credential_types,
364     bool run_inproc, int32_t median_latency_collection_interval_millis) {
365   if (run_inproc) {
366     g_inproc_servers = new std::vector<grpc::testing::Server*>;
367   }
368   // Log everything from the driver
369   gpr_set_log_verbosity(GPR_LOG_SEVERITY_DEBUG);
370
371   // ClientContext allocations (all are destroyed at scope exit)
372   list<ClientContext> contexts;
373   auto alloc_context = [](list<ClientContext>* contexts) {
374     contexts->emplace_back();
375     auto context = &contexts->back();
376     context->set_wait_for_ready(true);
377     return context;
378   };
379
380   // To be added to the result, containing the final configuration used for
381   // client and config (including host, etc.)
382   ClientConfig result_client_config;
383
384   // Get client, server lists; ignore if inproc test
385   auto workers = (!run_inproc) ? get_workers("QPS_WORKERS") : deque<string>();
386   ClientConfig client_config = initial_client_config;
387
388   // Spawn some local workers if desired
389   vector<unique_ptr<QpsWorker>> local_workers;
390   for (int i = 0; i < abs(spawn_local_worker_count); i++) {
391     // act as if we're a new test -- gets a good rng seed
392     static bool called_init = false;
393     if (!called_init) {
394       char args_buf[100];
395       strcpy(args_buf, "some-benchmark");
396       char* args[] = {args_buf};
397       grpc_test_init(1, args);
398       called_init = true;
399     }
400
401     char addr[256];
402     // we use port # of -1 to indicate inproc
403     int driver_port = (!run_inproc) ? grpc_pick_unused_port_or_die() : -1;
404     local_workers.emplace_back(new QpsWorker(driver_port, 0, credential_type));
405     sprintf(addr, "localhost:%d", driver_port);
406     if (spawn_local_worker_count < 0) {
407       workers.push_front(addr);
408     } else {
409       workers.push_back(addr);
410     }
411   }
412   GPR_ASSERT(!workers.empty());
413
414   // if num_clients is set to <=0, do dynamic sizing: all workers
415   // except for servers are clients
416   if (num_clients <= 0) {
417     num_clients = workers.size() - num_servers;
418   }
419
420   // TODO(ctiller): support running multiple configurations, and binpack
421   // client/server pairs
422   // to available workers
423   GPR_ASSERT(workers.size() >= num_clients + num_servers);
424
425   // Trim to just what we need
426   workers.resize(num_clients + num_servers);
427
428   // Start servers
429   std::vector<ServerData> servers(num_servers);
430   std::unordered_map<string, std::deque<int>> hosts_cores;
431   ChannelArguments channel_args;
432
433   for (size_t i = 0; i < num_servers; i++) {
434     gpr_log(GPR_INFO, "Starting server on %s (worker #%" PRIuPTR ")",
435             workers[i].c_str(), i);
436     if (!run_inproc) {
437       servers[i].stub = WorkerService::NewStub(grpc::CreateTestChannel(
438           workers[i],
439           GetCredType(workers[i], per_worker_credential_types, credential_type),
440           nullptr /* call creds */, {} /* interceptor creators */));
441     } else {
442       servers[i].stub = WorkerService::NewStub(
443           local_workers[i]->InProcessChannel(channel_args));
444     }
445
446     const ServerConfig& server_config = initial_server_config;
447     if (server_config.core_limit() != 0) {
448       gpr_log(GPR_ERROR,
449               "server config core limit is set but ignored by driver");
450       GPR_ASSERT(false);
451     }
452
453     ServerArgs args;
454     *args.mutable_setup() = server_config;
455     servers[i].stream = servers[i].stub->RunServer(alloc_context(&contexts));
456     if (!servers[i].stream->Write(args)) {
457       gpr_log(GPR_ERROR, "Could not write args to server %zu", i);
458       GPR_ASSERT(false);
459     }
460     ServerStatus init_status;
461     if (!servers[i].stream->Read(&init_status)) {
462       gpr_log(GPR_ERROR, "Server %zu did not yield initial status", i);
463       GPR_ASSERT(false);
464     }
465     if (run_inproc) {
466       std::string cli_target(INPROC_NAME_PREFIX);
467       cli_target += std::to_string(i);
468       client_config.add_server_targets(cli_target);
469     } else {
470       std::string host = get_host(workers[i]);
471       std::string cli_target =
472           grpc_core::JoinHostPort(host.c_str(), init_status.port());
473       client_config.add_server_targets(cli_target.c_str());
474     }
475   }
476   if (qps_server_target_override.length() > 0) {
477     // overriding the qps server target only makes since if there is <= 1
478     // servers
479     GPR_ASSERT(num_servers <= 1);
480     client_config.add_server_targets(qps_server_target_override);
481   }
482   client_config.set_median_latency_collection_interval_millis(
483       median_latency_collection_interval_millis);
484
485   // Targets are all set by now
486   result_client_config = client_config;
487   // Start clients
488   std::vector<ClientData> clients(num_clients);
489   size_t channels_allocated = 0;
490   for (size_t i = 0; i < num_clients; i++) {
491     const auto& worker = workers[i + num_servers];
492     gpr_log(GPR_INFO, "Starting client on %s (worker #%" PRIuPTR ")",
493             worker.c_str(), i + num_servers);
494     if (!run_inproc) {
495       clients[i].stub = WorkerService::NewStub(grpc::CreateTestChannel(
496           worker,
497           GetCredType(worker, per_worker_credential_types, credential_type),
498           nullptr /* call creds */, {} /* interceptor creators */));
499     } else {
500       clients[i].stub = WorkerService::NewStub(
501           local_workers[i + num_servers]->InProcessChannel(channel_args));
502     }
503     ClientConfig per_client_config = client_config;
504
505     if (initial_client_config.core_limit() != 0) {
506       gpr_log(GPR_ERROR, "client config core limit set but ignored");
507       GPR_ASSERT(false);
508     }
509
510     // Reduce channel count so that total channels specified is held regardless
511     // of the number of clients available
512     size_t num_channels =
513         (client_config.client_channels() - channels_allocated) /
514         (num_clients - i);
515     channels_allocated += num_channels;
516     gpr_log(GPR_DEBUG, "Client %" PRIdPTR " gets %" PRIdPTR " channels", i,
517             num_channels);
518     per_client_config.set_client_channels(num_channels);
519
520     ClientArgs args;
521     *args.mutable_setup() = per_client_config;
522     clients[i].stream = clients[i].stub->RunClient(alloc_context(&contexts));
523     if (!clients[i].stream->Write(args)) {
524       gpr_log(GPR_ERROR, "Could not write args to client %zu", i);
525       GPR_ASSERT(false);
526     }
527   }
528
529   for (size_t i = 0; i < num_clients; i++) {
530     ClientStatus init_status;
531     if (!clients[i].stream->Read(&init_status)) {
532       gpr_log(GPR_ERROR, "Client %zu did not yield initial status", i);
533       GPR_ASSERT(false);
534     }
535   }
536
537   // Send an initial mark: clients can use this to know that everything is ready
538   // to start
539   gpr_log(GPR_INFO, "Initiating");
540   ServerArgs server_mark;
541   server_mark.mutable_mark()->set_reset(true);
542   ClientArgs client_mark;
543   client_mark.mutable_mark()->set_reset(true);
544   ServerStatus server_status;
545   ClientStatus client_status;
546   for (size_t i = 0; i < num_clients; i++) {
547     auto client = &clients[i];
548     if (!client->stream->Write(client_mark)) {
549       gpr_log(GPR_ERROR, "Couldn't write mark to client %zu", i);
550       GPR_ASSERT(false);
551     }
552   }
553   for (size_t i = 0; i < num_clients; i++) {
554     auto client = &clients[i];
555     if (!client->stream->Read(&client_status)) {
556       gpr_log(GPR_ERROR, "Couldn't get status from client %zu", i);
557       GPR_ASSERT(false);
558     }
559   }
560
561   // Let everything warmup
562   gpr_log(GPR_INFO, "Warming up");
563   gpr_timespec start = gpr_now(GPR_CLOCK_REALTIME);
564   gpr_sleep_until(
565       gpr_time_add(start, gpr_time_from_seconds(warmup_seconds, GPR_TIMESPAN)));
566
567   // Start a run
568   gpr_log(GPR_INFO, "Starting");
569   for (size_t i = 0; i < num_servers; i++) {
570     auto server = &servers[i];
571     if (!server->stream->Write(server_mark)) {
572       gpr_log(GPR_ERROR, "Couldn't write mark to server %zu", i);
573       GPR_ASSERT(false);
574     }
575   }
576   for (size_t i = 0; i < num_clients; i++) {
577     auto client = &clients[i];
578     if (!client->stream->Write(client_mark)) {
579       gpr_log(GPR_ERROR, "Couldn't write mark to client %zu", i);
580       GPR_ASSERT(false);
581     }
582   }
583   for (size_t i = 0; i < num_servers; i++) {
584     auto server = &servers[i];
585     if (!server->stream->Read(&server_status)) {
586       gpr_log(GPR_ERROR, "Couldn't get status from server %zu", i);
587       GPR_ASSERT(false);
588     }
589   }
590   for (size_t i = 0; i < num_clients; i++) {
591     auto client = &clients[i];
592     if (!client->stream->Read(&client_status)) {
593       gpr_log(GPR_ERROR, "Couldn't get status from client %zu", i);
594       GPR_ASSERT(false);
595     }
596   }
597
598   // Wait some time
599   gpr_log(GPR_INFO, "Running");
600   // Use gpr_sleep_until rather than this_thread::sleep_until to support
601   // compilers that don't work with this_thread
602   gpr_sleep_until(gpr_time_add(
603       start,
604       gpr_time_from_seconds(warmup_seconds + benchmark_seconds, GPR_TIMESPAN)));
605
606   gpr_timer_set_enabled(0);
607
608   // Finish a run
609   std::unique_ptr<ScenarioResult> result(new ScenarioResult);
610   Histogram merged_latencies;
611   std::unordered_map<int, int64_t> merged_statuses;
612
613   // For the case where clients lead the test such as UNARY and
614   // STREAMING_FROM_CLIENT, clients need to finish completely while a server
615   // is running to prevent the clients from being stuck while waiting for
616   // the result.
617   bool client_finish_first =
618       (client_config.rpc_type() != STREAMING_FROM_SERVER);
619
620   FinishClients(clients, client_mark);
621
622   if (!client_finish_first) {
623     FinishServers(servers, server_mark);
624   }
625
626   ReceiveFinalStatusFromClients(clients, merged_latencies, merged_statuses,
627                                 *result);
628   ShutdownClients(clients, *result);
629
630   if (client_finish_first) {
631     FinishServers(servers, server_mark);
632   }
633
634   ReceiveFinalStatusFromServer(servers, *result);
635   ShutdownServers(servers, *result);
636
637   if (g_inproc_servers != nullptr) {
638     delete g_inproc_servers;
639   }
640
641   merged_latencies.FillProto(result->mutable_latencies());
642   for (std::unordered_map<int, int64_t>::iterator it = merged_statuses.begin();
643        it != merged_statuses.end(); ++it) {
644     RequestResultCount* rrc = result->add_request_results();
645     rrc->set_status_code(it->first);
646     rrc->set_count(it->second);
647   }
648   postprocess_scenario_result(result.get());
649   return result;
650 }
651
652 bool RunQuit(
653     const std::string& credential_type,
654     const std::map<std::string, std::string>& per_worker_credential_types) {
655   // Get client, server lists
656   bool result = true;
657   auto workers = get_workers("QPS_WORKERS");
658   if (workers.empty()) {
659     return false;
660   }
661
662   for (size_t i = 0; i < workers.size(); i++) {
663     auto stub = WorkerService::NewStub(grpc::CreateTestChannel(
664         workers[i],
665         GetCredType(workers[i], per_worker_credential_types, credential_type),
666         nullptr /* call creds */, {} /* interceptor creators */));
667     Void dummy;
668     grpc::ClientContext ctx;
669     ctx.set_wait_for_ready(true);
670     Status s = stub->QuitWorker(&ctx, dummy, &dummy);
671     if (!s.ok()) {
672       gpr_log(GPR_ERROR, "Worker %zu could not be properly quit because %s", i,
673               s.error_message().c_str());
674       result = false;
675     }
676   }
677   return result;
678 }
679
680 }  // namespace testing
681 }  // namespace grpc