1 # Copyright 2021 The gRPC Authors
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
16 from typing import Tuple
19 from absl import flags
20 from absl.testing import absltest
23 from framework import xds_k8s_flags
24 from framework import xds_url_map_testcase
25 from framework.test_app import client_app
28 HostRule = xds_url_map_testcase.HostRule
29 PathMatcher = xds_url_map_testcase.PathMatcher
30 GcpResourceManager = xds_url_map_testcase.GcpResourceManager
31 DumpedXdsConfig = xds_url_map_testcase.DumpedXdsConfig
32 RpcTypeUnaryCall = xds_url_map_testcase.RpcTypeUnaryCall
33 RpcTypeEmptyCall = xds_url_map_testcase.RpcTypeEmptyCall
34 ExpectedResult = xds_url_map_testcase.ExpectedResult
35 XdsTestClient = client_app.XdsTestClient
36 XdsUrlMapTestCase = xds_url_map_testcase.XdsUrlMapTestCase
38 logger = logging.getLogger(__name__)
39 flags.adopt_module_key_flags(xds_url_map_testcase)
41 # The first batch of RPCs don't count towards the result of test case. They are
42 # meant to prove the communication between driver and client is fine.
44 _LENGTH_OF_RPC_SENDING_SEC = 10
45 _ERROR_TOLERANCE = 0.1
48 class _BaseXdsTimeOutTestCase(XdsUrlMapTestCase):
53 path_matcher: PathMatcher) -> Tuple[HostRule, PathMatcher]:
54 path_matcher['routeRules'] = [{
57 'fullPathMatch': '/grpc.testing.TestService/UnaryCall'
59 'service': GcpResourceManager().default_backend_service(),
61 'maxStreamDuration': {
66 return host_rule, path_matcher
68 def xds_config_validate(self, xds_config: DumpedXdsConfig):
69 self.assertNumEndpoints(xds_config, 1)
71 xds_config.rds['virtualHosts'][0]['routes'][0]['route']
72 ['maxStreamDuration']['maxStreamDuration'], '3s')
74 xds_config.rds['virtualHosts'][0]['routes'][0]['route']
75 ['maxStreamDuration']['grpcTimeoutHeaderMax'], '3s')
77 def rpc_distribution_validate(self, unused_test_client):
78 raise NotImplementedError()
81 class TestTimeoutInRouteRule(_BaseXdsTimeOutTestCase):
84 def supported_servers() -> Tuple[str]:
85 # TODO(lidiz) either add support for rpc-behavior to other languages, or we
86 # should always use Java server as backend.
89 def rpc_distribution_validate(self, test_client: XdsTestClient):
90 rpc_distribution = self.configure_and_send(
92 rpc_types=[RpcTypeUnaryCall, RpcTypeEmptyCall],
93 # UnaryCall and EmptyCall both sleep-4.
94 # UnaryCall timeouts, EmptyCall succeeds.
96 (RpcTypeUnaryCall, 'rpc-behavior', 'sleep-4'),
97 (RpcTypeEmptyCall, 'rpc-behavior', 'sleep-4'),
100 self.assertRpcStatusCode(
103 ExpectedResult(rpc_type=RpcTypeUnaryCall,
104 status_code=grpc.StatusCode.DEADLINE_EXCEEDED),
105 ExpectedResult(rpc_type=RpcTypeEmptyCall,
106 status_code=grpc.StatusCode.OK),
108 length=_LENGTH_OF_RPC_SENDING_SEC,
109 tolerance=_ERROR_TOLERANCE)
112 class TestTimeoutInApplication(_BaseXdsTimeOutTestCase):
115 def supported_servers() -> Tuple[str]:
118 def rpc_distribution_validate(self, test_client: XdsTestClient):
119 rpc_distribution = self.configure_and_send(
121 rpc_types=[RpcTypeUnaryCall],
122 # UnaryCall only with sleep-2; timeout=1s; calls timeout.
123 metadata=((RpcTypeUnaryCall, 'rpc-behavior', 'sleep-2'),),
126 self.assertRpcStatusCode(
128 expected=(ExpectedResult(
129 rpc_type=RpcTypeUnaryCall,
130 status_code=grpc.StatusCode.DEADLINE_EXCEEDED),),
131 length=_LENGTH_OF_RPC_SENDING_SEC,
132 tolerance=_ERROR_TOLERANCE)
135 class TestTimeoutNotExceeded(_BaseXdsTimeOutTestCase):
137 def rpc_distribution_validate(self, test_client: XdsTestClient):
138 rpc_distribution = self.configure_and_send(
140 # UnaryCall only with no sleep; calls succeed.
141 rpc_types=[RpcTypeUnaryCall],
143 self.assertRpcStatusCode(test_client,
144 expected=(ExpectedResult(
145 rpc_type=RpcTypeUnaryCall,
146 status_code=grpc.StatusCode.OK),),
147 length=_LENGTH_OF_RPC_SENDING_SEC,
148 tolerance=_ERROR_TOLERANCE)
151 def load_tests(loader: absltest.TestLoader, unused_tests, unused_pattern):
152 suite = unittest.TestSuite()
154 TestTimeoutInRouteRule, TestTimeoutInApplication, TestTimeoutNotExceeded
156 for test_class in test_cases:
157 tests = loader.loadTestsFromTestCase(test_class)
158 suite.addTests(tests)
162 if __name__ == '__main__':