1 # Copyright 2016 gRPC authors.
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14 """Entry point for running stress tests."""
17 from concurrent import futures
21 from six.moves import queue
22 from src.proto.grpc.testing import metrics_pb2_grpc
23 from src.proto.grpc.testing import test_pb2_grpc
25 from tests.interop import methods
26 from tests.interop import resources
27 from tests.qps import histogram
28 from tests.stress import metrics_server
29 from tests.stress import test_runner
33 parser = argparse.ArgumentParser(
34 description='gRPC Python stress test client')
37 help='comma separated list of hostname:port to run servers on',
38 default='localhost:8080',
42 help='comma separated list of testcase:weighting of tests to run',
43 default='large_unary:100',
46 '--test_duration_secs',
47 help='number of seconds to run the stress test',
51 '--num_channels_per_server',
52 help='number of channels per server',
56 '--num_stubs_per_channel',
57 help='number of stubs to create per channel',
62 help='the port to listen for metrics requests on',
67 help='Whether to use our fake CA. Requires --use_tls=true',
71 '--use_tls', help='Whether to use TLS', default=False, type=bool)
73 '--server_host_override',
74 help='the server host to which to claim to connect',
76 return parser.parse_args()
79 def _test_case_from_arg(test_case_arg):
80 for test_case in methods.TestCase:
81 if test_case_arg == test_case.value:
84 raise ValueError('No test case {}!'.format(test_case_arg))
87 def _parse_weighted_test_cases(test_case_args):
88 weighted_test_cases = {}
89 for test_case_arg in test_case_args.split(','):
90 name, weight = test_case_arg.split(':', 1)
91 test_case = _test_case_from_arg(name)
92 weighted_test_cases[test_case] = int(weight)
93 return weighted_test_cases
96 def _get_channel(target, args):
99 root_certificates = resources.test_root_certificates()
101 root_certificates = None # will load default roots.
102 channel_credentials = grpc.ssl_channel_credentials(
103 root_certificates=root_certificates)
105 'grpc.ssl_target_name_override',
106 args.server_host_override,
108 channel = grpc.secure_channel(
109 target, channel_credentials, options=options)
111 channel = grpc.insecure_channel(target)
113 # waits for the channel to be ready before we start sending messages
114 grpc.channel_ready_future(channel).result()
119 test_cases = _parse_weighted_test_cases(args.test_cases)
120 test_server_targets = args.server_addresses.split(',')
121 # Propagate any client exceptions with a queue
122 exception_queue = queue.Queue()
123 stop_event = threading.Event()
124 hist = histogram.Histogram(1, 1)
127 server = grpc.server(futures.ThreadPoolExecutor(max_workers=25))
128 metrics_pb2_grpc.add_MetricsServiceServicer_to_server(
129 metrics_server.MetricsServer(hist), server)
130 server.add_insecure_port('[::]:{}'.format(args.metrics_port))
133 for test_server_target in test_server_targets:
134 for _ in range(args.num_channels_per_server):
135 channel = _get_channel(test_server_target, args)
136 for _ in range(args.num_stubs_per_channel):
137 stub = test_pb2_grpc.TestServiceStub(channel)
138 runner = test_runner.TestRunner(stub, test_cases, hist,
139 exception_queue, stop_event)
140 runners.append(runner)
142 for runner in runners:
145 timeout_secs = args.test_duration_secs
148 raise exception_queue.get(block=True, timeout=timeout_secs)
150 # No exceptions thrown, success
154 for runner in runners:
160 if __name__ == '__main__':