import contextlib
import functools
import logging
-from typing import Optional
+from typing import Optional, List
# Workaround: `grpc` must be imported before `google.protobuf.json_format`,
# to prevent "Segmentation fault". Ref https://github.com/grpc/grpc/issues/24897
def networksecurity(self, version):
api_name = 'networksecurity'
if version == 'v1alpha1':
- return self._build_from_discovery_v2(api_name,
- version,
- api_key=self.private_api_key)
+ return self._build_from_discovery_v2(
+ api_name,
+ version,
+ api_key=self.private_api_key,
+ visibility_labels=['NETWORKSECURITY_ALPHA'])
raise NotImplementedError(f'Network Security {version} not supported')
def networkservices(self, version):
api_name = 'networkservices'
if version == 'v1alpha1':
- return self._build_from_discovery_v2(api_name,
- version,
- api_key=self.private_api_key)
+ return self._build_from_discovery_v2(
+ api_name,
+ version,
+ api_key=self.private_api_key,
+ visibility_labels=['NETWORKSERVICES_ALPHA'])
raise NotImplementedError(f'Network Services {version} not supported')
self._exit_stack.enter_context(api)
return api
- def _build_from_discovery_v2(self, api_name, version, *, api_key=None):
- key_arg = f'&key={api_key}' if api_key else ''
+ def _build_from_discovery_v2(self,
+ api_name,
+ version,
+ *,
+ api_key: Optional[str] = None,
+ visibility_labels: Optional[List] = None):
+ params = {}
+ if api_key:
+ params['key'] = api_key
+ if visibility_labels:
+ # Dash-separated list of labels.
+ params['labels'] = '_'.join(visibility_labels)
+
+ params_str = ''
+ if params:
+ params_str = '&' + ('&'.join(f'{k}={v}' for k, v in params.items()))
+
api = discovery.build(
api_name,
version,
cache_discovery=False,
- discoveryServiceUrl=f'{self.v2_discovery_uri}{key_arg}')
+ discoveryServiceUrl=f'{self.v2_discovery_uri}{params_str}')
self._exit_stack.enter_context(api)
return api