1 # Copyright 2021 The gRPC Authors
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14 """A test framework built for urlMap related xDS test cases."""
19 from typing import Any, Iterable, List, Mapping, Tuple
21 from absl import flags
22 from absl import logging
24 from framework import xds_flags
25 from framework import xds_k8s_flags
26 import framework.helpers.rand
27 from framework.infrastructure import gcp
28 from framework.infrastructure import k8s
29 from framework.infrastructure import traffic_director
30 from framework.test_app import client_app
31 from framework.test_app import server_app
33 flags.adopt_module_key_flags(xds_flags)
34 flags.adopt_module_key_flags(xds_k8s_flags)
36 STRATEGY = flags.DEFINE_enum('strategy',
38 enum_values=['create', 'keep', 'reuse'],
39 help='Strategy of GCP resources management')
46 _BackendHTTP2 = gcp.compute.ComputeV1.BackendServiceProtocol.HTTP2
47 _COMPUTE_V1_URL_PREFIX = 'https://www.googleapis.com/compute/v1'
50 class _UrlMapChangeAggregator:
51 """Where all the urlMap change happens."""
53 def __init__(self, url_map_name: str):
56 "defaultService": GcpResourceManager().default_backend_service(),
61 def get_map(self) -> UrlMapType:
64 def apply_change(self, test_case: 'XdsUrlMapTestCase') -> None:
65 logging.info('Apply urlMap change for test case: %s.%s',
66 test_case.short_module_name, test_case.__name__)
67 url_map_parts = test_case.url_map_change(
68 *self._get_test_case_url_map(test_case))
69 self._set_test_case_url_map(*url_map_parts)
71 def _get_test_case_url_map(
73 test_case: 'XdsUrlMapTestCase') -> Tuple[HostRule, PathMatcher]:
75 "hosts": [test_case.hostname()],
76 "pathMatcher": test_case.path_matcher_name(),
79 "name": test_case.path_matcher_name(),
80 "defaultService": GcpResourceManager().default_backend_service(),
82 return host_rule, path_matcher
84 def _set_test_case_url_map(self, host_rule: HostRule,
85 path_matcher: PathMatcher) -> None:
86 self._map["hostRules"].append(host_rule)
87 self._map["pathMatchers"].append(path_matcher)
90 def _package_flags() -> Mapping[str, Any]:
91 """Automatically parse Abseil flags into a dictionary.
93 Abseil flag is only available after the Abseil app initialization. If we use
94 __new__ in our metaclass, the flag value parse will happen during the
95 initialization of modules, hence will fail. That's why we are using __call__
96 to inject metaclass magics, and the flag parsing will be delayed until the
97 class is about to be instantiated.
100 for flag_module in [xds_flags, xds_k8s_flags]:
101 for key, value in inspect.getmembers(flag_module):
102 if isinstance(value, flags.FlagHolder):
103 res[key.lower()] = value.value
104 res['strategy'] = STRATEGY.value
108 class _MetaSingletonAndAbslFlags(type):
109 """Ensures singleton and injects flag values."""
111 # Allow different subclasses to create different singletons.
113 # But we only parse Abseil flags once.
116 def __call__(cls, *args, **kwargs):
117 if cls not in cls._instances:
118 if cls._flags is None:
119 cls._flags = _package_flags()
120 obj = super().__call__(cls._flags, *args, **kwargs)
121 cls._instances[cls] = obj
123 return cls._instances[cls]
126 class GcpResourceManager(metaclass=_MetaSingletonAndAbslFlags):
127 """Manages the lifecycle of GCP resources.
129 The GCP resources including:
130 - 3 K8s deployment (client, default backends, alternative backends)
131 - Full set of the Traffic Director stuff
132 - Merged gigantic urlMap from all imported test cases
134 All resources are intended to be used across test cases and multiple runs
135 (except the client K8s deployment).
138 # This class dynamically set, so disable "no-member" check.
139 # pylint: disable=no-member
141 def __init__(self, absl_flags: Mapping[str, Any] = None):
142 if absl_flags is not None:
143 for key in absl_flags:
144 setattr(self, key, absl_flags[key])
145 # Pick a client_namespace_suffix if not set
146 if self.resource_suffix is None:
147 self.resource_suffix = ""
149 raise NotImplementedError(
150 'Predefined resource_suffix is not supported for UrlMap tests')
151 logging.info('GcpResourceManager: resource prefix=%s, suffix=%s',
152 self.resource_prefix, self.resource_suffix)
154 self.k8s_api_manager = k8s.KubernetesApiManager(self.kube_context)
155 self.gcp_api_manager = gcp.api.GcpApiManager()
156 self.td = traffic_director.TrafficDirectorManager(
157 self.gcp_api_manager,
159 resource_prefix=self.resource_prefix,
160 resource_suffix=(self.resource_suffix or ""),
161 network=self.network,
163 # Kubernetes namespace
164 self.k8s_namespace = k8s.KubernetesNamespace(self.k8s_api_manager,
165 self.resource_prefix)
166 # Kubernetes Test Servers
167 self.test_server_runner = server_app.KubernetesServerRunner(
169 deployment_name=self.server_name,
170 image_name=self.server_image,
171 gcp_project=self.project,
172 gcp_api_manager=self.gcp_api_manager,
173 gcp_service_account=self.gcp_service_account,
174 td_bootstrap_image=self.td_bootstrap_image,
175 xds_server_uri=self.xds_server_uri,
176 network=self.network,
177 enable_workload_identity=self.enable_workload_identity)
178 self.test_server_alternative_runner = server_app.KubernetesServerRunner(
180 deployment_name=self.server_name + '-alternative',
181 image_name=self.server_image,
182 gcp_project=self.project,
183 gcp_api_manager=self.gcp_api_manager,
184 gcp_service_account=self.gcp_service_account,
185 td_bootstrap_image=self.td_bootstrap_image,
186 xds_server_uri=self.xds_server_uri,
187 network=self.network,
188 enable_workload_identity=self.enable_workload_identity,
189 reuse_namespace=True)
190 self.test_server_affinity_runner = server_app.KubernetesServerRunner(
192 deployment_name=self.server_name + '-affinity',
193 image_name=self.server_image,
194 gcp_project=self.project,
195 gcp_api_manager=self.gcp_api_manager,
196 gcp_service_account=self.gcp_service_account,
197 td_bootstrap_image=self.td_bootstrap_image,
198 xds_server_uri=self.xds_server_uri,
199 network=self.network,
200 enable_workload_identity=self.enable_workload_identity,
201 reuse_namespace=True)
202 logging.info('Strategy of GCP resources management: %s', self.strategy)
204 def create_test_client_runner(self):
205 if self.resource_suffix:
206 client_namespace_suffix = self.resource_suffix
208 client_namespace_suffix = framework.helpers.rand.random_resource_suffix(
210 logging.info('GcpResourceManager: client_namespace_suffix=%s',
211 client_namespace_suffix)
212 # Kubernetes Test Client
213 return client_app.KubernetesClientRunner(
214 k8s.KubernetesNamespace(
215 self.k8s_api_manager,
216 client_app.KubernetesClientRunner.make_namespace_name(
217 self.resource_prefix, client_namespace_suffix)),
218 deployment_name=self.client_name,
219 image_name=self.client_image,
220 gcp_project=self.project,
221 gcp_api_manager=self.gcp_api_manager,
222 gcp_service_account=self.gcp_service_account,
223 td_bootstrap_image=self.td_bootstrap_image,
224 xds_server_uri=self.xds_server_uri,
225 network=self.network,
226 debug_use_port_forwarding=self.debug_use_port_forwarding,
227 enable_workload_identity=self.enable_workload_identity,
228 stats_port=self.client_port)
230 def _pre_cleanup(self):
231 # Cleanup existing debris
232 logging.info('GcpResourceManager: pre clean-up')
233 self.td.cleanup(force=True)
234 self.test_server_runner.delete_namespace()
236 def setup(self, test_case_classes: Iterable['XdsUrlMapTestCase']) -> None:
237 if self.strategy not in ['create', 'keep']:
238 logging.info('GcpResourceManager: skipping setup for strategy [%s]',
241 # Clean up debris from previous runs
243 # Start creating GCP resources
244 logging.info('GcpResourceManager: start setup')
246 if self.ensure_firewall:
247 self.td.create_firewall_rule(
248 allowed_ports=self.firewall_allowed_ports)
250 self.td.create_health_check()
252 self.td.create_backend_service()
253 self.td.create_alternative_backend_service()
254 self.td.create_affinity_backend_service()
255 # Construct UrlMap from test classes
256 aggregator = _UrlMapChangeAggregator(
257 url_map_name=self.td.make_resource_name(self.td.URL_MAP_NAME))
258 for test_case_class in test_case_classes:
259 aggregator.apply_change(test_case_class)
260 final_url_map = aggregator.get_map()
262 self.td.create_url_map_with_content(final_url_map)
264 self.td.create_target_proxy()
266 self.td.create_forwarding_rule(self.server_xds_port)
267 # Kubernetes Test Server
268 self.test_server_runner.run(
269 test_port=self.server_port,
270 maintenance_port=self.server_maintenance_port)
271 # Kubernetes Test Server Alternative
272 self.test_server_alternative_runner.run(
273 test_port=self.server_port,
274 maintenance_port=self.server_maintenance_port)
275 # Kubernetes Test Server Affinity. 3 endpoints to test that only the
276 # picked sub-channel is connected.
277 self.test_server_affinity_runner.run(
278 test_port=self.server_port,
279 maintenance_port=self.server_maintenance_port,
281 # Add backend to default backend service
282 neg_name, neg_zones = self.k8s_namespace.get_service_neg(
283 self.test_server_runner.service_name, self.server_port)
284 self.td.backend_service_add_neg_backends(neg_name, neg_zones)
285 # Add backend to alternative backend service
286 neg_name_alt, neg_zones_alt = self.k8s_namespace.get_service_neg(
287 self.test_server_alternative_runner.service_name, self.server_port)
288 self.td.alternative_backend_service_add_neg_backends(
289 neg_name_alt, neg_zones_alt)
290 # Add backend to affinity backend service
291 neg_name_affinity, neg_zones_affinity = self.k8s_namespace.get_service_neg(
292 self.test_server_affinity_runner.service_name, self.server_port)
293 self.td.affinity_backend_service_add_neg_backends(
294 neg_name_affinity, neg_zones_affinity)
295 # Wait for healthy backends
296 self.td.wait_for_backends_healthy_status()
297 self.td.wait_for_alternative_backends_healthy_status()
298 self.td.wait_for_affinity_backends_healthy_status()
300 def cleanup(self) -> None:
301 if self.strategy not in ['create']:
303 'GcpResourceManager: skipping tear down for strategy [%s]',
306 logging.info('GcpResourceManager: start tear down')
307 if hasattr(self, 'td'):
308 self.td.cleanup(force=True)
309 if hasattr(self, 'test_server_runner'):
310 self.test_server_runner.cleanup(force=True)
311 if hasattr(self, 'test_server_alternative_runner'):
312 self.test_server_alternative_runner.cleanup(force=True,
313 force_namespace=True)
314 if hasattr(self, 'test_server_affinity_runner'):
315 self.test_server_affinity_runner.cleanup(force=True,
316 force_namespace=True)
318 @functools.lru_cache(None)
319 def default_backend_service(self) -> str:
320 """Returns default backend service URL."""
321 self.td.load_backend_service()
322 return self.td.backend_service.url
324 @functools.lru_cache(None)
325 def alternative_backend_service(self) -> str:
326 """Returns alternative backend service URL."""
327 self.td.load_alternative_backend_service()
328 return self.td.alternative_backend_service.url
330 @functools.lru_cache(None)
331 def affinity_backend_service(self) -> str:
332 """Returns affinity backend service URL."""
333 self.td.load_affinity_backend_service()
334 return self.td.affinity_backend_service.url