1 # Copyright 2020 gRPC authors.
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
18 from typing import Optional, List
20 # Workaround: `grpc` must be imported before `google.protobuf.json_format`,
21 # to prevent "Segmentation fault". Ref https://github.com/grpc/grpc/issues/24897
22 # TODO(sergiitk): Remove after #24897 is solved
23 import grpc # noqa pylint: disable=unused-import
24 from absl import flags
25 from google.cloud import secretmanager_v1
26 from google.longrunning import operations_pb2
27 from google.protobuf import json_format
28 from google.rpc import code_pb2
29 from googleapiclient import discovery
30 import googleapiclient.errors
34 logger = logging.getLogger(__name__)
35 PRIVATE_API_KEY_SECRET_NAME = flags.DEFINE_string(
36 "private_api_key_secret_name",
38 help="Load Private API access key from the latest version of the secret "
39 "with the given name, in the format projects/*/secrets/*")
40 V1_DISCOVERY_URI = flags.DEFINE_string("v1_discovery_uri",
41 default=discovery.V1_DISCOVERY_URI,
42 help="Override v1 Discovery URI")
43 V2_DISCOVERY_URI = flags.DEFINE_string("v2_discovery_uri",
44 default=discovery.V2_DISCOVERY_URI,
45 help="Override v2 Discovery URI")
46 COMPUTE_V1_DISCOVERY_FILE = flags.DEFINE_string(
47 "compute_v1_discovery_file",
49 help="Load compute v1 from discovery file")
52 Operation = operations_pb2.Operation
59 v1_discovery_uri=None,
60 v2_discovery_uri=None,
61 compute_v1_discovery_file=None,
62 private_api_key_secret_name=None):
63 self.v1_discovery_uri = v1_discovery_uri or V1_DISCOVERY_URI.value
64 self.v2_discovery_uri = v2_discovery_uri or V2_DISCOVERY_URI.value
65 self.compute_v1_discovery_file = (compute_v1_discovery_file or
66 COMPUTE_V1_DISCOVERY_FILE.value)
67 self.private_api_key_secret_name = (private_api_key_secret_name or
68 PRIVATE_API_KEY_SECRET_NAME.value)
69 # TODO(sergiitk): add options to pass google Credentials
70 self._exit_stack = contextlib.ExitStack()
73 self._exit_stack.close()
76 @functools.lru_cache(None)
77 def private_api_key(self):
81 Return API key credential that identifies a GCP project allow-listed for
82 accessing private API discovery documents.
83 https://pantheon.corp.google.com/apis/credentials
85 This method lazy-loads the content of the key from the Secret Manager.
86 https://pantheon.corp.google.com/security/secret-manager
88 if not self.private_api_key_secret_name:
89 raise ValueError('private_api_key_secret_name must be set to '
90 'access private_api_key.')
92 secrets_api = self.secrets('v1')
93 version_resource_path = secrets_api.secret_version_path(
94 **secrets_api.parse_secret_path(self.private_api_key_secret_name),
95 secret_version='latest')
96 secret: secretmanager_v1.AccessSecretVersionResponse
97 secret = secrets_api.access_secret_version(name=version_resource_path)
98 return secret.payload.data.decode()
100 @functools.lru_cache(None)
101 def compute(self, version):
104 if self.compute_v1_discovery_file:
105 return self._build_from_file(self.compute_v1_discovery_file)
107 return self._build_from_discovery_v1(api_name, version)
109 raise NotImplementedError(f'Compute {version} not supported')
111 @functools.lru_cache(None)
112 def networksecurity(self, version):
113 api_name = 'networksecurity'
114 if version == 'v1alpha1':
115 return self._build_from_discovery_v2(
118 api_key=self.private_api_key,
119 visibility_labels=['NETWORKSECURITY_ALPHA'])
121 raise NotImplementedError(f'Network Security {version} not supported')
123 @functools.lru_cache(None)
124 def networkservices(self, version):
125 api_name = 'networkservices'
126 if version == 'v1alpha1':
127 return self._build_from_discovery_v2(
130 api_key=self.private_api_key,
131 visibility_labels=['NETWORKSERVICES_ALPHA'])
133 raise NotImplementedError(f'Network Services {version} not supported')
135 @functools.lru_cache(None)
136 def secrets(self, version):
138 return secretmanager_v1.SecretManagerServiceClient()
140 raise NotImplementedError(f'Secrets Manager {version} not supported')
142 def _build_from_discovery_v1(self, api_name, version):
143 api = discovery.build(api_name,
145 cache_discovery=False,
146 discoveryServiceUrl=self.v1_discovery_uri)
147 self._exit_stack.enter_context(api)
150 def _build_from_discovery_v2(self,
154 api_key: Optional[str] = None,
155 visibility_labels: Optional[List] = None):
158 params['key'] = api_key
159 if visibility_labels:
160 # Dash-separated list of labels.
161 params['labels'] = '_'.join(visibility_labels)
165 params_str = '&' + ('&'.join(f'{k}={v}' for k, v in params.items()))
167 api = discovery.build(
170 cache_discovery=False,
171 discoveryServiceUrl=f'{self.v2_discovery_uri}{params_str}')
172 self._exit_stack.enter_context(api)
175 def _build_from_file(self, discovery_file):
176 with open(discovery_file, 'r') as f:
177 api = discovery.build_from_document(f.read())
178 self._exit_stack.enter_context(api)
182 class Error(Exception):
183 """Base error class for GCP API errors"""
186 class OperationError(Error):
188 Operation was not successful.
190 Assuming Operation based on Google API Style Guide:
191 https://cloud.google.com/apis/design/design_patterns#long_running_operations
192 https://github.com/googleapis/googleapis/blob/master/google/longrunning/operations.proto
195 def __init__(self, api_name, operation_response, message=None):
196 self.api_name = api_name
197 operation = json_format.ParseDict(operation_response, Operation())
198 self.name = operation.name or 'unknown'
199 self.error = operation.error
200 self.code_name = code_pb2.Code.Name(operation.error.code)
202 message = (f'{api_name} operation "{self.name}" failed. Error '
203 f'code: {self.error.code} ({self.code_name}), '
204 f'message: {self.error.message}')
205 self.message = message
206 super().__init__(message)
209 class GcpProjectApiResource:
210 # TODO(sergiitk): move someplace better
211 _WAIT_FOR_OPERATION_SEC = 60 * 5
215 def __init__(self, api: discovery.Resource, project: str):
216 self.api: discovery.Resource = api
217 self.project: str = project
220 def wait_for_operation(operation_request,
222 timeout_sec=_WAIT_FOR_OPERATION_SEC,
223 wait_sec=_WAIT_FIXED_SEC):
224 retryer = tenacity.Retrying(
225 retry=(tenacity.retry_if_not_result(test_success_fn) |
226 tenacity.retry_if_exception_type()),
227 wait=tenacity.wait_fixed(wait_sec),
228 stop=tenacity.stop_after_delay(timeout_sec),
229 after=tenacity.after_log(logger, logging.DEBUG),
231 return retryer(operation_request.execute)
234 def _resource_pretty_format(body: dict) -> str:
235 """Return a string with pretty-printed resource body."""
236 return yaml.dump(body, explicit_start=True, explicit_end=True)
239 class GcpStandardCloudApiResource(GcpProjectApiResource, metaclass=abc.ABCMeta):
240 GLOBAL_LOCATION = 'global'
242 def parent(self, location: Optional[str] = GLOBAL_LOCATION):
244 location = self.GLOBAL_LOCATION
245 return f'projects/{self.project}/locations/{location}'
247 def resource_full_name(self, name, collection_name):
248 return f'{self.parent()}/{collection_name}/{name}'
250 def _create_resource(self, collection: discovery.Resource, body: dict,
252 logger.info("Creating %s resource:\n%s", self.api_name,
253 self._resource_pretty_format(body))
254 create_req = collection.create(parent=self.parent(),
257 self._execute(create_req)
261 def api_name(self) -> str:
262 raise NotImplementedError
266 def api_version(self) -> str:
267 raise NotImplementedError
269 def _get_resource(self, collection: discovery.Resource, full_name):
270 resource = collection.get(name=full_name).execute()
271 logger.info('Loaded %s:\n%s', full_name,
272 self._resource_pretty_format(resource))
275 def _delete_resource(self, collection: discovery.Resource,
276 full_name: str) -> bool:
277 logger.debug("Deleting %s", full_name)
279 self._execute(collection.delete(name=full_name))
281 except googleapiclient.errors.HttpError as error:
282 if error.resp and error.resp.status == 404:
283 logger.info('%s not deleted since it does not exist', full_name)
285 logger.warning('Failed to delete %s, %r', full_name, error)
290 timeout_sec=GcpProjectApiResource._WAIT_FOR_OPERATION_SEC):
291 operation = request.execute(num_retries=self._GCP_API_RETRIES)
292 self._wait(operation, timeout_sec)
296 timeout_sec=GcpProjectApiResource._WAIT_FOR_OPERATION_SEC):
297 op_name = operation['name']
298 logger.debug('Waiting for %s operation, timeout %s sec: %s',
299 self.api_name, timeout_sec, op_name)
301 op_request = self.api.projects().locations().operations().get(
303 operation = self.wait_for_operation(
304 operation_request=op_request,
305 test_success_fn=lambda result: result['done'],
306 timeout_sec=timeout_sec)
308 logger.debug('Completed operation: %s', operation)
309 if 'error' in operation:
310 raise OperationError(self.api_name, operation)