1 # Copyright 2016 gRPC authors.
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14 """The Python client used to test negative http2 conditions."""
20 from src.proto.grpc.testing import test_pb2_grpc
21 from src.proto.grpc.testing import messages_pb2
24 def _validate_payload_type_and_length(response, expected_type, expected_length):
25 if response.payload.type is not expected_type:
26 raise ValueError('expected payload type %s, got %s' %
27 (expected_type, type(response.payload.type)))
28 elif len(response.payload.body) != expected_length:
29 raise ValueError('expected payload body size %d, got %d' %
30 (expected_length, len(response.payload.body)))
33 def _expect_status_code(call, expected_code):
34 if call.code() != expected_code:
35 raise ValueError('expected code %s, got %s' %
36 (expected_code, call.code()))
39 def _expect_status_details(call, expected_details):
40 if call.details() != expected_details:
41 raise ValueError('expected message %s, got %s' %
42 (expected_details, call.details()))
45 def _validate_status_code_and_details(call, expected_code, expected_details):
46 _expect_status_code(call, expected_code)
47 _expect_status_details(call, expected_details)
51 _REQUEST_SIZE = 314159
52 _RESPONSE_SIZE = 271828
54 _SIMPLE_REQUEST = messages_pb2.SimpleRequest(
55 response_type=messages_pb2.COMPRESSABLE,
56 response_size=_RESPONSE_SIZE,
57 payload=messages_pb2.Payload(body=b'\x00' * _REQUEST_SIZE))
61 first_response = stub.UnaryCall(_SIMPLE_REQUEST)
62 _validate_payload_type_and_length(first_response, messages_pb2.COMPRESSABLE,
65 second_response = stub.UnaryCall(_SIMPLE_REQUEST)
66 _validate_payload_type_and_length(second_response,
67 messages_pb2.COMPRESSABLE, _RESPONSE_SIZE)
70 def _rst_after_header(stub):
71 resp_future = stub.UnaryCall.future(_SIMPLE_REQUEST)
72 _validate_status_code_and_details(resp_future, grpc.StatusCode.INTERNAL,
73 "Received RST_STREAM with error code 0")
76 def _rst_during_data(stub):
77 resp_future = stub.UnaryCall.future(_SIMPLE_REQUEST)
78 _validate_status_code_and_details(resp_future, grpc.StatusCode.INTERNAL,
79 "Received RST_STREAM with error code 0")
82 def _rst_after_data(stub):
83 resp_future = stub.UnaryCall.future(_SIMPLE_REQUEST)
84 _validate_status_code_and_details(resp_future, grpc.StatusCode.INTERNAL,
85 "Received RST_STREAM with error code 0")
89 response = stub.UnaryCall(_SIMPLE_REQUEST)
90 _validate_payload_type_and_length(response, messages_pb2.COMPRESSABLE,
94 def _max_streams(stub):
95 # send one req to ensure server sets MAX_STREAMS
96 response = stub.UnaryCall(_SIMPLE_REQUEST)
97 _validate_payload_type_and_length(response, messages_pb2.COMPRESSABLE,
100 # give the streams a workout
103 futures.append(stub.UnaryCall.future(_SIMPLE_REQUEST))
104 for future in futures:
105 _validate_payload_type_and_length(future.result(),
106 messages_pb2.COMPRESSABLE,
110 def _run_test_case(test_case, stub):
111 if test_case == 'goaway':
113 elif test_case == 'rst_after_header':
114 _rst_after_header(stub)
115 elif test_case == 'rst_during_data':
116 _rst_during_data(stub)
117 elif test_case == 'rst_after_data':
118 _rst_after_data(stub)
119 elif test_case == 'ping':
121 elif test_case == 'max_streams':
124 raise ValueError("Invalid test case: %s" % test_case)
128 parser = argparse.ArgumentParser()
129 parser.add_argument('--server_host',
130 help='the host to which to connect',
133 parser.add_argument('--server_port',
134 help='the port to which to connect',
137 parser.add_argument('--test_case',
138 help='the test case to execute',
141 return parser.parse_args()
144 def _stub(server_host, server_port):
145 target = '{}:{}'.format(server_host, server_port)
146 channel = grpc.insecure_channel(target)
147 grpc.channel_ready_future(channel).result()
148 return test_pb2_grpc.TestServiceStub(channel)
153 stub = _stub(args.server_host, args.server_port)
154 _run_test_case(args.test_case, stub)
157 if __name__ == '__main__':