Imported Upstream version 1.41.0
[platform/upstream/grpc.git] / tools / run_tests / xds_k8s_test_driver / framework / xds_url_map_test_resources.py
1 # Copyright 2021 The gRPC Authors
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 #     http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14 """A test framework built for urlMap related xDS test cases."""
15
16 import functools
17 import inspect
18 import time
19 from typing import Any, Iterable, List, Mapping, Tuple
20
21 from absl import flags
22 from absl import logging
23
24 from framework import xds_flags
25 from framework import xds_k8s_flags
26 import framework.helpers.rand
27 from framework.infrastructure import gcp
28 from framework.infrastructure import k8s
29 from framework.infrastructure import traffic_director
30 from framework.test_app import client_app
31 from framework.test_app import server_app
32
# Make the key flags of the shared xDS flag modules key flags of this module
# as well.
flags.adopt_module_key_flags(xds_flags)
flags.adopt_module_key_flags(xds_k8s_flags)

# How GcpResourceManager treats the GCP resources:
#   - 'create': setup() provisions the resources, cleanup() tears them down.
#   - 'keep':   setup() provisions the resources, cleanup() is skipped.
#   - 'reuse':  both setup() and cleanup() are skipped.
STRATEGY = flags.DEFINE_enum('strategy',
                             default='reuse',
                             enum_values=['create', 'keep', 'reuse'],
                             help='Strategy of GCP resources management')

# Type alias
# These are all `Any`; they only make the signatures below self-describing.
UrlMapType = Any
HostRule = Any
PathMatcher = Any

# Shorthand for the HTTP2 member of the backend service protocol enum.
_BackendHTTP2 = gcp.compute.ComputeV1.BackendServiceProtocol.HTTP2
# URL prefix of the Compute v1 API.
_COMPUTE_V1_URL_PREFIX = 'https://www.googleapis.com/compute/v1'
48
49
class _UrlMapChangeAggregator:
    """Collects the urlMap fragments of all test cases into a single urlMap.

    Each test case contributes exactly one host rule and one path matcher,
    which are appended to the aggregated map in the order they are applied.
    """

    def __init__(self, url_map_name: str):
        default_service = GcpResourceManager().default_backend_service()
        self._map = dict(
            name=url_map_name,
            defaultService=default_service,
            hostRules=[],
            pathMatchers=[],
        )

    def get_map(self) -> UrlMapType:
        """Returns the aggregated urlMap (the live object, not a copy)."""
        return self._map

    def apply_change(self, test_case: 'XdsUrlMapTestCase') -> None:
        """Lets the test case customize its default fragment, then stores it."""
        logging.info('Apply urlMap change for test case: %s.%s',
                     test_case.short_module_name, test_case.__name__)
        initial_rule, initial_matcher = self._get_test_case_url_map(test_case)
        # The test case returns the (possibly modified) pair to be recorded.
        self._set_test_case_url_map(
            *test_case.url_map_change(initial_rule, initial_matcher))

    def _get_test_case_url_map(
            self,
            test_case: 'XdsUrlMapTestCase') -> Tuple[HostRule, PathMatcher]:
        """Builds the default host rule and path matcher for a test case."""
        host_rule = dict(
            hosts=[test_case.hostname()],
            pathMatcher=test_case.path_matcher_name(),
        )
        path_matcher = dict(
            name=test_case.path_matcher_name(),
            defaultService=GcpResourceManager().default_backend_service(),
        )
        return host_rule, path_matcher

    def _set_test_case_url_map(self, host_rule: HostRule,
                               path_matcher: PathMatcher) -> None:
        """Appends one test case's host rule and path matcher to the map."""
        aggregated = self._map
        aggregated["hostRules"].append(host_rule)
        aggregated["pathMatchers"].append(path_matcher)
88
89
def _package_flags() -> Mapping[str, Any]:
    """Collects the current Abseil flag values into a dictionary.

    Abseil flag values can only be read after the Abseil app has been
    initialized. Reading them from a metaclass __new__ would happen while the
    modules are still being imported and would therefore fail. For that reason
    the metaclass hooks __call__ instead, which postpones the flag parsing
    until a class is about to be instantiated.

    Returns:
        A mapping from the lower-cased name of every FlagHolder found in
        xds_flags and xds_k8s_flags to its value, plus the 'strategy' flag
        defined in this module.
    """
    packaged = {
        member_name.lower(): holder.value
        for flag_module in (xds_flags, xds_k8s_flags)
        for member_name, holder in inspect.getmembers(flag_module)
        if isinstance(holder, flags.FlagHolder)
    }
    packaged['strategy'] = STRATEGY.value
    return packaged
106
107
class _MetaSingletonAndAbslFlags(type):
    """Ensures singleton and injects flag values.

    Calling a class that uses this metaclass returns the one instance of that
    class, creating it on first use. On creation, the packaged Abseil flags
    are passed to the class' __init__ as the first positional argument,
    followed by the caller's own arguments.
    """

    # Allow different subclasses to create different singletons.
    _instances = {}
    # But we only parse Abseil flags once.
    _flags = None

    def __call__(cls, *args, **kwargs):
        # Both caches live on the metaclass itself. Assigning through `cls`
        # would create a per-class attribute that shadows the shared one, so
        # every class would parse the flags again.
        meta = _MetaSingletonAndAbslFlags
        if cls in meta._instances:
            return meta._instances[cls]
        if meta._flags is None:
            # Deferred until first instantiation: flag values are not
            # available while modules are still being imported.
            meta._flags = _package_flags()
        obj = super().__call__(meta._flags, *args, **kwargs)
        meta._instances[cls] = obj
        return obj
124
125
126 class GcpResourceManager(metaclass=_MetaSingletonAndAbslFlags):
127     """Manages the lifecycle of GCP resources.
128
129     The GCP resources including:
        - 4 K8s deployments (client, default backends, alternative backends,
          affinity backends)
131         - Full set of the Traffic Director stuff
132         - Merged gigantic urlMap from all imported test cases
133
134     All resources are intended to be used across test cases and multiple runs
135     (except the client K8s deployment).
136     """
137
    # Instance attributes are set dynamically from the flags, so disable the
    # "no-member" check.
139     # pylint: disable=no-member
140
141     def __init__(self, absl_flags: Mapping[str, Any] = None):
142         if absl_flags is not None:
143             for key in absl_flags:
144                 setattr(self, key, absl_flags[key])
145         # Pick a client_namespace_suffix if not set
146         if self.resource_suffix is None:
147             self.resource_suffix = ""
148         else:
149             raise NotImplementedError(
150                 'Predefined resource_suffix is not supported for UrlMap tests')
151         logging.info('GcpResourceManager: resource prefix=%s, suffix=%s',
152                      self.resource_prefix, self.resource_suffix)
153         # API managers
154         self.k8s_api_manager = k8s.KubernetesApiManager(self.kube_context)
155         self.gcp_api_manager = gcp.api.GcpApiManager()
156         self.td = traffic_director.TrafficDirectorManager(
157             self.gcp_api_manager,
158             self.project,
159             resource_prefix=self.resource_prefix,
160             resource_suffix=(self.resource_suffix or ""),
161             network=self.network,
162         )
163         # Kubernetes namespace
164         self.k8s_namespace = k8s.KubernetesNamespace(self.k8s_api_manager,
165                                                      self.resource_prefix)
166         # Kubernetes Test Servers
167         self.test_server_runner = server_app.KubernetesServerRunner(
168             self.k8s_namespace,
169             deployment_name=self.server_name,
170             image_name=self.server_image,
171             gcp_project=self.project,
172             gcp_api_manager=self.gcp_api_manager,
173             gcp_service_account=self.gcp_service_account,
174             td_bootstrap_image=self.td_bootstrap_image,
175             xds_server_uri=self.xds_server_uri,
176             network=self.network,
177             enable_workload_identity=self.enable_workload_identity)
178         self.test_server_alternative_runner = server_app.KubernetesServerRunner(
179             self.k8s_namespace,
180             deployment_name=self.server_name + '-alternative',
181             image_name=self.server_image,
182             gcp_project=self.project,
183             gcp_api_manager=self.gcp_api_manager,
184             gcp_service_account=self.gcp_service_account,
185             td_bootstrap_image=self.td_bootstrap_image,
186             xds_server_uri=self.xds_server_uri,
187             network=self.network,
188             enable_workload_identity=self.enable_workload_identity,
189             reuse_namespace=True)
190         self.test_server_affinity_runner = server_app.KubernetesServerRunner(
191             self.k8s_namespace,
192             deployment_name=self.server_name + '-affinity',
193             image_name=self.server_image,
194             gcp_project=self.project,
195             gcp_api_manager=self.gcp_api_manager,
196             gcp_service_account=self.gcp_service_account,
197             td_bootstrap_image=self.td_bootstrap_image,
198             xds_server_uri=self.xds_server_uri,
199             network=self.network,
200             enable_workload_identity=self.enable_workload_identity,
201             reuse_namespace=True)
202         logging.info('Strategy of GCP resources management: %s', self.strategy)
203
204     def create_test_client_runner(self):
205         if self.resource_suffix:
206             client_namespace_suffix = self.resource_suffix
207         else:
208             client_namespace_suffix = framework.helpers.rand.random_resource_suffix(
209             )
210         logging.info('GcpResourceManager: client_namespace_suffix=%s',
211                      client_namespace_suffix)
212         # Kubernetes Test Client
213         return client_app.KubernetesClientRunner(
214             k8s.KubernetesNamespace(
215                 self.k8s_api_manager,
216                 client_app.KubernetesClientRunner.make_namespace_name(
217                     self.resource_prefix, client_namespace_suffix)),
218             deployment_name=self.client_name,
219             image_name=self.client_image,
220             gcp_project=self.project,
221             gcp_api_manager=self.gcp_api_manager,
222             gcp_service_account=self.gcp_service_account,
223             td_bootstrap_image=self.td_bootstrap_image,
224             xds_server_uri=self.xds_server_uri,
225             network=self.network,
226             debug_use_port_forwarding=self.debug_use_port_forwarding,
227             enable_workload_identity=self.enable_workload_identity,
228             stats_port=self.client_port)
229
230     def _pre_cleanup(self):
231         # Cleanup existing debris
232         logging.info('GcpResourceManager: pre clean-up')
233         self.td.cleanup(force=True)
234         self.test_server_runner.delete_namespace()
235
236     def setup(self, test_case_classes: Iterable['XdsUrlMapTestCase']) -> None:
237         if self.strategy not in ['create', 'keep']:
238             logging.info('GcpResourceManager: skipping setup for strategy [%s]',
239                          self.strategy)
240             return
241         # Clean up debris from previous runs
242         self._pre_cleanup()
243         # Start creating GCP resources
244         logging.info('GcpResourceManager: start setup')
245         # Firewall
246         if self.ensure_firewall:
247             self.td.create_firewall_rule(
248                 allowed_ports=self.firewall_allowed_ports)
249         # Health Checks
250         self.td.create_health_check()
251         # Backend Services
252         self.td.create_backend_service()
253         self.td.create_alternative_backend_service()
254         self.td.create_affinity_backend_service()
255         # Construct UrlMap from test classes
256         aggregator = _UrlMapChangeAggregator(
257             url_map_name=self.td.make_resource_name(self.td.URL_MAP_NAME))
258         for test_case_class in test_case_classes:
259             aggregator.apply_change(test_case_class)
260         final_url_map = aggregator.get_map()
261         # UrlMap
262         self.td.create_url_map_with_content(final_url_map)
263         # Target Proxy
264         self.td.create_target_proxy()
265         # Forwarding Rule
266         self.td.create_forwarding_rule(self.server_xds_port)
267         # Kubernetes Test Server
268         self.test_server_runner.run(
269             test_port=self.server_port,
270             maintenance_port=self.server_maintenance_port)
271         # Kubernetes Test Server Alternative
272         self.test_server_alternative_runner.run(
273             test_port=self.server_port,
274             maintenance_port=self.server_maintenance_port)
275         # Kubernetes Test Server Affinity. 3 endpoints to test that only the
276         # picked sub-channel is connected.
277         self.test_server_affinity_runner.run(
278             test_port=self.server_port,
279             maintenance_port=self.server_maintenance_port,
280             replica_count=3)
281         # Add backend to default backend service
282         neg_name, neg_zones = self.k8s_namespace.get_service_neg(
283             self.test_server_runner.service_name, self.server_port)
284         self.td.backend_service_add_neg_backends(neg_name, neg_zones)
285         # Add backend to alternative backend service
286         neg_name_alt, neg_zones_alt = self.k8s_namespace.get_service_neg(
287             self.test_server_alternative_runner.service_name, self.server_port)
288         self.td.alternative_backend_service_add_neg_backends(
289             neg_name_alt, neg_zones_alt)
290         # Add backend to affinity backend service
291         neg_name_affinity, neg_zones_affinity = self.k8s_namespace.get_service_neg(
292             self.test_server_affinity_runner.service_name, self.server_port)
293         self.td.affinity_backend_service_add_neg_backends(
294             neg_name_affinity, neg_zones_affinity)
295         # Wait for healthy backends
296         self.td.wait_for_backends_healthy_status()
297         self.td.wait_for_alternative_backends_healthy_status()
298         self.td.wait_for_affinity_backends_healthy_status()
299
300     def cleanup(self) -> None:
301         if self.strategy not in ['create']:
302             logging.info(
303                 'GcpResourceManager: skipping tear down for strategy [%s]',
304                 self.strategy)
305             return
306         logging.info('GcpResourceManager: start tear down')
307         if hasattr(self, 'td'):
308             self.td.cleanup(force=True)
309         if hasattr(self, 'test_server_runner'):
310             self.test_server_runner.cleanup(force=True)
311         if hasattr(self, 'test_server_alternative_runner'):
312             self.test_server_alternative_runner.cleanup(force=True,
313                                                         force_namespace=True)
314         if hasattr(self, 'test_server_affinity_runner'):
315             self.test_server_affinity_runner.cleanup(force=True,
316                                                      force_namespace=True)
317
318     @functools.lru_cache(None)
319     def default_backend_service(self) -> str:
320         """Returns default backend service URL."""
321         self.td.load_backend_service()
322         return self.td.backend_service.url
323
324     @functools.lru_cache(None)
325     def alternative_backend_service(self) -> str:
326         """Returns alternative backend service URL."""
327         self.td.load_alternative_backend_service()
328         return self.td.alternative_backend_service.url
329
330     @functools.lru_cache(None)
331     def affinity_backend_service(self) -> str:
332         """Returns affinity backend service URL."""
333         self.td.load_affinity_backend_service()
334         return self.td.affinity_backend_service.url