Imported Upstream version 1.36.0
[platform/upstream/grpc.git] / tools / run_tests / xds_k8s_test_driver / framework / infrastructure / gcp / api.py
1 # Copyright 2020 gRPC authors.
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 #     http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14 import abc
15 import contextlib
16 import functools
17 import logging
18 from typing import Optional, List
19
20 # Workaround: `grpc` must be imported before `google.protobuf.json_format`,
21 # to prevent "Segmentation fault". Ref https://github.com/grpc/grpc/issues/24897
22 # TODO(sergiitk): Remove after #24897 is solved
23 import grpc  # noqa pylint: disable=unused-import
24 from absl import flags
25 from google.cloud import secretmanager_v1
26 from google.longrunning import operations_pb2
27 from google.protobuf import json_format
28 from google.rpc import code_pb2
29 from googleapiclient import discovery
30 import googleapiclient.errors
31 import tenacity
32 import yaml
33
34 logger = logging.getLogger(__name__)
35 PRIVATE_API_KEY_SECRET_NAME = flags.DEFINE_string(
36     "private_api_key_secret_name",
37     default=None,
38     help="Load Private API access key from the latest version of the secret "
39     "with the given name, in the format projects/*/secrets/*")
40 V1_DISCOVERY_URI = flags.DEFINE_string("v1_discovery_uri",
41                                        default=discovery.V1_DISCOVERY_URI,
42                                        help="Override v1 Discovery URI")
43 V2_DISCOVERY_URI = flags.DEFINE_string("v2_discovery_uri",
44                                        default=discovery.V2_DISCOVERY_URI,
45                                        help="Override v2 Discovery URI")
46 COMPUTE_V1_DISCOVERY_FILE = flags.DEFINE_string(
47     "compute_v1_discovery_file",
48     default=None,
49     help="Load compute v1 from discovery file")
50
51 # Type aliases
52 Operation = operations_pb2.Operation
53
54
55 class GcpApiManager:
56
57     def __init__(self,
58                  *,
59                  v1_discovery_uri=None,
60                  v2_discovery_uri=None,
61                  compute_v1_discovery_file=None,
62                  private_api_key_secret_name=None):
63         self.v1_discovery_uri = v1_discovery_uri or V1_DISCOVERY_URI.value
64         self.v2_discovery_uri = v2_discovery_uri or V2_DISCOVERY_URI.value
65         self.compute_v1_discovery_file = (compute_v1_discovery_file or
66                                           COMPUTE_V1_DISCOVERY_FILE.value)
67         self.private_api_key_secret_name = (private_api_key_secret_name or
68                                             PRIVATE_API_KEY_SECRET_NAME.value)
69         # TODO(sergiitk): add options to pass google Credentials
70         self._exit_stack = contextlib.ExitStack()
71
72     def close(self):
73         self._exit_stack.close()
74
75     @property
76     @functools.lru_cache(None)
77     def private_api_key(self):
78         """
79         Private API key.
80
81         Return API key credential that identifies a GCP project allow-listed for
82         accessing private API discovery documents.
83         https://pantheon.corp.google.com/apis/credentials
84
85         This method lazy-loads the content of the key from the Secret Manager.
86         https://pantheon.corp.google.com/security/secret-manager
87         """
88         if not self.private_api_key_secret_name:
89             raise ValueError('private_api_key_secret_name must be set to '
90                              'access private_api_key.')
91
92         secrets_api = self.secrets('v1')
93         version_resource_path = secrets_api.secret_version_path(
94             **secrets_api.parse_secret_path(self.private_api_key_secret_name),
95             secret_version='latest')
96         secret: secretmanager_v1.AccessSecretVersionResponse
97         secret = secrets_api.access_secret_version(name=version_resource_path)
98         return secret.payload.data.decode()
99
100     @functools.lru_cache(None)
101     def compute(self, version):
102         api_name = 'compute'
103         if version == 'v1':
104             if self.compute_v1_discovery_file:
105                 return self._build_from_file(self.compute_v1_discovery_file)
106             else:
107                 return self._build_from_discovery_v1(api_name, version)
108
109         raise NotImplementedError(f'Compute {version} not supported')
110
111     @functools.lru_cache(None)
112     def networksecurity(self, version):
113         api_name = 'networksecurity'
114         if version == 'v1alpha1':
115             return self._build_from_discovery_v2(
116                 api_name,
117                 version,
118                 api_key=self.private_api_key,
119                 visibility_labels=['NETWORKSECURITY_ALPHA'])
120
121         raise NotImplementedError(f'Network Security {version} not supported')
122
123     @functools.lru_cache(None)
124     def networkservices(self, version):
125         api_name = 'networkservices'
126         if version == 'v1alpha1':
127             return self._build_from_discovery_v2(
128                 api_name,
129                 version,
130                 api_key=self.private_api_key,
131                 visibility_labels=['NETWORKSERVICES_ALPHA'])
132
133         raise NotImplementedError(f'Network Services {version} not supported')
134
135     @functools.lru_cache(None)
136     def secrets(self, version):
137         if version == 'v1':
138             return secretmanager_v1.SecretManagerServiceClient()
139
140         raise NotImplementedError(f'Secrets Manager {version} not supported')
141
142     def _build_from_discovery_v1(self, api_name, version):
143         api = discovery.build(api_name,
144                               version,
145                               cache_discovery=False,
146                               discoveryServiceUrl=self.v1_discovery_uri)
147         self._exit_stack.enter_context(api)
148         return api
149
150     def _build_from_discovery_v2(self,
151                                  api_name,
152                                  version,
153                                  *,
154                                  api_key: Optional[str] = None,
155                                  visibility_labels: Optional[List] = None):
156         params = {}
157         if api_key:
158             params['key'] = api_key
159         if visibility_labels:
160             # Dash-separated list of labels.
161             params['labels'] = '_'.join(visibility_labels)
162
163         params_str = ''
164         if params:
165             params_str = '&' + ('&'.join(f'{k}={v}' for k, v in params.items()))
166
167         api = discovery.build(
168             api_name,
169             version,
170             cache_discovery=False,
171             discoveryServiceUrl=f'{self.v2_discovery_uri}{params_str}')
172         self._exit_stack.enter_context(api)
173         return api
174
175     def _build_from_file(self, discovery_file):
176         with open(discovery_file, 'r') as f:
177             api = discovery.build_from_document(f.read())
178         self._exit_stack.enter_context(api)
179         return api
180
181
182 class Error(Exception):
183     """Base error class for GCP API errors"""
184
185
186 class OperationError(Error):
187     """
188     Operation was not successful.
189
190     Assuming Operation based on Google API Style Guide:
191     https://cloud.google.com/apis/design/design_patterns#long_running_operations
192     https://github.com/googleapis/googleapis/blob/master/google/longrunning/operations.proto
193     """
194
195     def __init__(self, api_name, operation_response, message=None):
196         self.api_name = api_name
197         operation = json_format.ParseDict(operation_response, Operation())
198         self.name = operation.name or 'unknown'
199         self.error = operation.error
200         self.code_name = code_pb2.Code.Name(operation.error.code)
201         if message is None:
202             message = (f'{api_name} operation "{self.name}" failed. Error '
203                        f'code: {self.error.code} ({self.code_name}), '
204                        f'message: {self.error.message}')
205         self.message = message
206         super().__init__(message)
207
208
209 class GcpProjectApiResource:
210     # TODO(sergiitk): move someplace better
211     _WAIT_FOR_OPERATION_SEC = 60 * 5
212     _WAIT_FIXED_SEC = 2
213     _GCP_API_RETRIES = 5
214
215     def __init__(self, api: discovery.Resource, project: str):
216         self.api: discovery.Resource = api
217         self.project: str = project
218
219     @staticmethod
220     def wait_for_operation(operation_request,
221                            test_success_fn,
222                            timeout_sec=_WAIT_FOR_OPERATION_SEC,
223                            wait_sec=_WAIT_FIXED_SEC):
224         retryer = tenacity.Retrying(
225             retry=(tenacity.retry_if_not_result(test_success_fn) |
226                    tenacity.retry_if_exception_type()),
227             wait=tenacity.wait_fixed(wait_sec),
228             stop=tenacity.stop_after_delay(timeout_sec),
229             after=tenacity.after_log(logger, logging.DEBUG),
230             reraise=True)
231         return retryer(operation_request.execute)
232
233     @staticmethod
234     def _resource_pretty_format(body: dict) -> str:
235         """Return a string with pretty-printed resource body."""
236         return yaml.dump(body, explicit_start=True, explicit_end=True)
237
238
239 class GcpStandardCloudApiResource(GcpProjectApiResource, metaclass=abc.ABCMeta):
240     GLOBAL_LOCATION = 'global'
241
242     def parent(self, location: Optional[str] = GLOBAL_LOCATION):
243         if location is None:
244             location = self.GLOBAL_LOCATION
245         return f'projects/{self.project}/locations/{location}'
246
247     def resource_full_name(self, name, collection_name):
248         return f'{self.parent()}/{collection_name}/{name}'
249
250     def _create_resource(self, collection: discovery.Resource, body: dict,
251                          **kwargs):
252         logger.info("Creating %s resource:\n%s", self.api_name,
253                     self._resource_pretty_format(body))
254         create_req = collection.create(parent=self.parent(),
255                                        body=body,
256                                        **kwargs)
257         self._execute(create_req)
258
259     @property
260     @abc.abstractmethod
261     def api_name(self) -> str:
262         raise NotImplementedError
263
264     @property
265     @abc.abstractmethod
266     def api_version(self) -> str:
267         raise NotImplementedError
268
269     def _get_resource(self, collection: discovery.Resource, full_name):
270         resource = collection.get(name=full_name).execute()
271         logger.info('Loaded %s:\n%s', full_name,
272                     self._resource_pretty_format(resource))
273         return resource
274
275     def _delete_resource(self, collection: discovery.Resource,
276                          full_name: str) -> bool:
277         logger.debug("Deleting %s", full_name)
278         try:
279             self._execute(collection.delete(name=full_name))
280             return True
281         except googleapiclient.errors.HttpError as error:
282             if error.resp and error.resp.status == 404:
283                 logger.info('%s not deleted since it does not exist', full_name)
284             else:
285                 logger.warning('Failed to delete %s, %r', full_name, error)
286         return False
287
288     def _execute(self,
289                  request,
290                  timeout_sec=GcpProjectApiResource._WAIT_FOR_OPERATION_SEC):
291         operation = request.execute(num_retries=self._GCP_API_RETRIES)
292         self._wait(operation, timeout_sec)
293
294     def _wait(self,
295               operation,
296               timeout_sec=GcpProjectApiResource._WAIT_FOR_OPERATION_SEC):
297         op_name = operation['name']
298         logger.debug('Waiting for %s operation, timeout %s sec: %s',
299                      self.api_name, timeout_sec, op_name)
300
301         op_request = self.api.projects().locations().operations().get(
302             name=op_name)
303         operation = self.wait_for_operation(
304             operation_request=op_request,
305             test_success_fn=lambda result: result['done'],
306             timeout_sec=timeout_sec)
307
308         logger.debug('Completed operation: %s', operation)
309         if 'error' in operation:
310             raise OperationError(self.api_name, operation)