Imported Upstream version 1.41.0
[platform/upstream/grpc.git] / tools / run_tests / xds_k8s_test_driver / tests / url_map / timeout_test.py
1 # Copyright 2021 The gRPC Authors
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 #     http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14 import logging
15 import time
16 from typing import Tuple
17 import unittest
18
19 from absl import flags
20 from absl.testing import absltest
21 import grpc
22
23 from framework import xds_k8s_flags
24 from framework import xds_url_map_testcase
25 from framework.test_app import client_app
26
27 # Type aliases
28 HostRule = xds_url_map_testcase.HostRule
29 PathMatcher = xds_url_map_testcase.PathMatcher
30 GcpResourceManager = xds_url_map_testcase.GcpResourceManager
31 DumpedXdsConfig = xds_url_map_testcase.DumpedXdsConfig
32 RpcTypeUnaryCall = xds_url_map_testcase.RpcTypeUnaryCall
33 RpcTypeEmptyCall = xds_url_map_testcase.RpcTypeEmptyCall
34 ExpectedResult = xds_url_map_testcase.ExpectedResult
35 XdsTestClient = client_app.XdsTestClient
36 XdsUrlMapTestCase = xds_url_map_testcase.XdsUrlMapTestCase
37
38 logger = logging.getLogger(__name__)
39 flags.adopt_module_key_flags(xds_url_map_testcase)
40
41 # The first batch of RPCs don't count towards the result of test case. They are
42 # meant to prove the communication between driver and client is fine.
43 _NUM_RPCS = 25
44 _LENGTH_OF_RPC_SENDING_SEC = 10
45 _ERROR_TOLERANCE = 0.1
46
47
48 class _BaseXdsTimeOutTestCase(XdsUrlMapTestCase):
49
50     @staticmethod
51     def url_map_change(
52             host_rule: HostRule,
53             path_matcher: PathMatcher) -> Tuple[HostRule, PathMatcher]:
54         path_matcher['routeRules'] = [{
55             'priority': 0,
56             'matchRules': [{
57                 'fullPathMatch': '/grpc.testing.TestService/UnaryCall'
58             }],
59             'service': GcpResourceManager().default_backend_service(),
60             'routeAction': {
61                 'maxStreamDuration': {
62                     'seconds': 3,
63                 },
64             },
65         }]
66         return host_rule, path_matcher
67
68     def xds_config_validate(self, xds_config: DumpedXdsConfig):
69         self.assertNumEndpoints(xds_config, 1)
70         self.assertEqual(
71             xds_config.rds['virtualHosts'][0]['routes'][0]['route']
72             ['maxStreamDuration']['maxStreamDuration'], '3s')
73         self.assertEqual(
74             xds_config.rds['virtualHosts'][0]['routes'][0]['route']
75             ['maxStreamDuration']['grpcTimeoutHeaderMax'], '3s')
76
77     def rpc_distribution_validate(self, unused_test_client):
78         raise NotImplementedError()
79
80
81 class TestTimeoutInRouteRule(_BaseXdsTimeOutTestCase):
82
83     @staticmethod
84     def supported_servers() -> Tuple[str]:
85         # TODO(lidiz) either add support for rpc-behavior to other languages, or we
86         # should always use Java server as backend.
87         return 'java',
88
89     def rpc_distribution_validate(self, test_client: XdsTestClient):
90         rpc_distribution = self.configure_and_send(
91             test_client,
92             rpc_types=[RpcTypeUnaryCall, RpcTypeEmptyCall],
93             # UnaryCall and EmptyCall both sleep-4.
94             # UnaryCall timeouts, EmptyCall succeeds.
95             metadata=(
96                 (RpcTypeUnaryCall, 'rpc-behavior', 'sleep-4'),
97                 (RpcTypeEmptyCall, 'rpc-behavior', 'sleep-4'),
98             ),
99             num_rpcs=_NUM_RPCS)
100         self.assertRpcStatusCode(
101             test_client,
102             expected=(
103                 ExpectedResult(rpc_type=RpcTypeUnaryCall,
104                                status_code=grpc.StatusCode.DEADLINE_EXCEEDED),
105                 ExpectedResult(rpc_type=RpcTypeEmptyCall,
106                                status_code=grpc.StatusCode.OK),
107             ),
108             length=_LENGTH_OF_RPC_SENDING_SEC,
109             tolerance=_ERROR_TOLERANCE)
110
111
112 class TestTimeoutInApplication(_BaseXdsTimeOutTestCase):
113
114     @staticmethod
115     def supported_servers() -> Tuple[str]:
116         return 'java',
117
118     def rpc_distribution_validate(self, test_client: XdsTestClient):
119         rpc_distribution = self.configure_and_send(
120             test_client,
121             rpc_types=[RpcTypeUnaryCall],
122             # UnaryCall only with sleep-2; timeout=1s; calls timeout.
123             metadata=((RpcTypeUnaryCall, 'rpc-behavior', 'sleep-2'),),
124             app_timeout=1,
125             num_rpcs=_NUM_RPCS)
126         self.assertRpcStatusCode(
127             test_client,
128             expected=(ExpectedResult(
129                 rpc_type=RpcTypeUnaryCall,
130                 status_code=grpc.StatusCode.DEADLINE_EXCEEDED),),
131             length=_LENGTH_OF_RPC_SENDING_SEC,
132             tolerance=_ERROR_TOLERANCE)
133
134
135 class TestTimeoutNotExceeded(_BaseXdsTimeOutTestCase):
136
137     def rpc_distribution_validate(self, test_client: XdsTestClient):
138         rpc_distribution = self.configure_and_send(
139             test_client,
140             # UnaryCall only with no sleep; calls succeed.
141             rpc_types=[RpcTypeUnaryCall],
142             num_rpcs=_NUM_RPCS)
143         self.assertRpcStatusCode(test_client,
144                                  expected=(ExpectedResult(
145                                      rpc_type=RpcTypeUnaryCall,
146                                      status_code=grpc.StatusCode.OK),),
147                                  length=_LENGTH_OF_RPC_SENDING_SEC,
148                                  tolerance=_ERROR_TOLERANCE)
149
150
151 def load_tests(loader: absltest.TestLoader, unused_tests, unused_pattern):
152     suite = unittest.TestSuite()
153     test_cases = [
154         TestTimeoutInRouteRule, TestTimeoutInApplication, TestTimeoutNotExceeded
155     ]
156     for test_class in test_cases:
157         tests = loader.loadTestsFromTestCase(test_class)
158         suite.addTests(tests)
159     return suite
160
161
162 if __name__ == '__main__':
163     absltest.main()