1 # Copyright 2020 The gRPC authors.
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14 """Functions that obviate explicit stubs and explicit channels."""
21 from typing import (Any, AnyStr, Callable, Dict, Iterator, Optional, Sequence,
22 Tuple, TypeVar, Union)
25 from grpc.experimental import experimental_api
# Generic placeholders for the request and response message types of an RPC.
RequestType = TypeVar('RequestType')
ResponseType = TypeVar('ResponseType')

# Channel arguments expressed as (key, value) pairs.
OptionsType = Sequence[Tuple[str, str]]
# The tuple that identifies one cached channel: target, channel options,
# channel credentials and compression setting (the same four values
# ChannelCache.get_channel assembles into its lookup key).
CacheKey = Tuple[str, OptionsType, Optional[grpc.ChannelCredentials],
                 Optional[grpc.Compression]]
_LOGGER = logging.getLogger(__name__)

# How long a channel may sit in the cache before it becomes eligible for
# eviction. The environment variable, when present, is parsed as a (possibly
# fractional) number of seconds; otherwise the default is ten minutes.
_EVICTION_PERIOD_KEY = "GRPC_PYTHON_MANAGED_CHANNEL_EVICTION_SECONDS"
if _EVICTION_PERIOD_KEY in os.environ:
    _EVICTION_PERIOD = datetime.timedelta(
        seconds=float(os.environ[_EVICTION_PERIOD_KEY]))
    _LOGGER.debug("Setting managed channel eviction period to %s",
                  _EVICTION_PERIOD)
else:
    _EVICTION_PERIOD = datetime.timedelta(minutes=10)

# Upper bound on the number of channels kept in the cache. The environment
# variable, when present, is parsed as an integer; otherwise the default is
# 2**8 (256).
_MAXIMUM_CHANNELS_KEY = "GRPC_PYTHON_MANAGED_CHANNEL_MAXIMUM"
if _MAXIMUM_CHANNELS_KEY in os.environ:
    _MAXIMUM_CHANNELS = int(os.environ[_MAXIMUM_CHANNELS_KEY])
    _LOGGER.debug("Setting maximum managed channels to %d", _MAXIMUM_CHANNELS)
else:
    _MAXIMUM_CHANNELS = 2**8
def _create_channel(target: str, options: Sequence[Tuple[str, str]],
                    channel_credentials: Optional[grpc.ChannelCredentials],
                    compression: Optional[grpc.Compression]) -> grpc.Channel:
    """Creates a new insecure or secure channel to `target`.

    Args:
      target: The server address.
      options: Key-value pairs forwarded to the channel constructor.
      channel_credentials: The credentials selecting the channel kind. When
        their wrapped `_credentials` object is the
        `grpc.experimental._insecure_channel_credentials` sentinel an insecure
        channel is created; any other credentials produce a secure channel.
      compression: Compression setting forwarded to the channel constructor.

    Returns:
      The newly created channel.

    Raises:
      NotImplementedError: If `channel_credentials` is None.
    """
    # TODO(rbellevi): Revisit the default value for this.
    if channel_credentials is None:
        raise NotImplementedError(
            "channel_credentials must be supplied explicitly.")
    if channel_credentials._credentials is grpc.experimental._insecure_channel_credentials:
        # Lazy %-style arguments: the message is only rendered when debug
        # logging is actually enabled.
        _LOGGER.debug(
            "Creating insecure channel with options '%s' and compression '%s'",
            options, compression)
        return grpc.insecure_channel(target,
                                     options=options,
                                     compression=compression)
    _LOGGER.debug(
        "Creating secure channel with credentials '%s', "
        "options '%s' and compression '%s'", channel_credentials, options,
        compression)
    return grpc.secure_channel(target,
                               credentials=channel_credentials,
                               options=options,
                               compression=compression)
class ChannelCache:
    """Process-wide cache of channels keyed by their configuration.

    A single instance is created lazily by `get()`. Creating it starts a
    daemon thread that evicts entries once their eviction time has passed or
    when the cache holds more than `_MAXIMUM_CHANNELS` entries. All access to
    the mapping is guarded by the class-level lock.
    """

    # NOTE(rbellevi): Untyped due to reference cycle.
    _singleton = None
    _lock: threading.RLock = threading.RLock()
    # Shares `_lock`; used to wake the eviction thread when the cache changes.
    _condition: threading.Condition = threading.Condition(lock=_lock)
    # Set by the eviction thread once it has entered its loop.
    _eviction_ready: threading.Event = threading.Event()

    # Insertion-ordered; get_channel re-inserts a key on every use, so the
    # first entry is always the least recently used one.
    _mapping: Dict[CacheKey, Tuple[grpc.Channel, datetime.datetime]]
    _eviction_thread: threading.Thread

    def __init__(self):
        """Creates an empty cache and starts the daemon eviction thread."""
        self._mapping = collections.OrderedDict()
        self._eviction_thread = threading.Thread(
            target=ChannelCache._perform_evictions, daemon=True)
        self._eviction_thread.start()

    @staticmethod
    def get():
        """Returns the singleton cache, creating it on first use.

        Blocks until the eviction thread has signalled that it is running.
        """
        with ChannelCache._lock:
            if ChannelCache._singleton is None:
                ChannelCache._singleton = ChannelCache()
        # Wait outside the lock: the eviction thread must acquire the lock
        # itself before it can set the event.
        ChannelCache._eviction_ready.wait()
        return ChannelCache._singleton

    def _evict_locked(self, key: CacheKey):
        """Removes `key` from the cache and closes its channel.

        The caller must hold `_lock`.

        Raises:
          KeyError: If `key` is not in the cache.
        """
        channel, _ = self._mapping.pop(key)
        _LOGGER.debug("Evicting channel %s with configuration %s.", channel,
                      key)
        channel.close()
        del channel

    @staticmethod
    def _perform_evictions():
        """Eviction loop run forever by the background daemon thread."""
        while True:
            with ChannelCache._lock:
                ChannelCache._eviction_ready.set()
                mapping = ChannelCache._singleton._mapping
                if not mapping:
                    # Nothing to evict; sleep until get_channel notifies us.
                    ChannelCache._condition.wait()
                elif len(mapping) > _MAXIMUM_CHANNELS:
                    # Over capacity: drop the least recently used entry.
                    key = next(iter(mapping.keys()))
                    ChannelCache._singleton._evict_locked(key)
                    # And immediately reevaluate.
                else:
                    key, (_, eviction_time) = next(iter(mapping.items()))
                    now = datetime.datetime.now()
                    if eviction_time <= now:
                        ChannelCache._singleton._evict_locked(key)
                    else:
                        time_to_eviction = (eviction_time - now).total_seconds()
                        # NOTE: We aim to *eventually* coalesce to a state in
                        # which no overdue channels are in the cache and the
                        # length of the cache is no longer than
                        # _MAXIMUM_CHANNELS. We tolerate momentary states in
                        # which these two criteria are not met.
                        ChannelCache._condition.wait(timeout=time_to_eviction)

    def get_channel(self, target: str, options: Sequence[Tuple[str, str]],
                    channel_credentials: Optional[grpc.ChannelCredentials],
                    compression: Optional[grpc.Compression]) -> grpc.Channel:
        """Returns the cached channel for this configuration, creating it if absent.

        Every call pushes the entry's eviction time to now plus
        `_EVICTION_PERIOD` and makes it the most recently used entry.

        Args:
          target: The server address.
          options: Key-value pairs used to configure the channel. Lists are
            accepted; they are converted to tuples so the cache key is
            hashable.
          channel_credentials: Credentials for the channel.
          compression: Compression setting for the channel.

        Returns:
          A channel matching the requested configuration.

        Raises:
          NotImplementedError: If a channel must be created and
            `channel_credentials` is None.
        """
        # Dictionary keys must be hashable, and lists are not: freeze the
        # options so callers may pass either a list or a tuple of pairs.
        frozen_options = tuple(tuple(option) for option in options)
        key = (target, frozen_options, channel_credentials, compression)
        with self._lock:
            channel_data = self._mapping.get(key, None)
            if channel_data is not None:
                channel = channel_data[0]
                # Pop and re-insert so the entry moves to the end of the
                # ordered mapping with a fresh eviction time.
                self._mapping.pop(key)
                self._mapping[key] = (channel, datetime.datetime.now() +
                                      _EVICTION_PERIOD)
                return channel
            channel = _create_channel(target, frozen_options,
                                      channel_credentials, compression)
            self._mapping[key] = (channel,
                                  datetime.datetime.now() + _EVICTION_PERIOD)
            # Wake the eviction thread when the cache stops being empty or
            # reaches its capacity.
            if len(self._mapping) == 1 or len(
                    self._mapping) >= _MAXIMUM_CHANNELS:
                self._condition.notify()
            return channel

    def _test_only_channel_count(self) -> int:
        """Returns the number of channels currently in the cache."""
        with self._lock:
            return len(self._mapping)
@experimental_api
def unary_unary(
        request: RequestType,
        target: str,
        method: str,
        request_serializer: Optional[Callable[[Any], bytes]] = None,
        response_deserializer: Optional[Callable[[bytes], Any]] = None,
        options: Sequence[Tuple[AnyStr, AnyStr]] = (),
        channel_credentials: Optional[grpc.ChannelCredentials] = None,
        call_credentials: Optional[grpc.CallCredentials] = None,
        compression: Optional[grpc.Compression] = None,
        wait_for_ready: Optional[bool] = None,
        timeout: Optional[float] = None,
        metadata: Optional[Sequence[Tuple[str, Union[str, bytes]]]] = None
) -> ResponseType:
    """Invokes a unary-unary RPC without an explicitly specified channel.

    THIS IS AN EXPERIMENTAL API.

    This is backed by a per-process cache of channels. Channels are evicted
    from the cache after a fixed period by a background thread. Channels will
    also be evicted if more than a configured maximum accumulate.

    The default eviction period is 10 minutes. One may set the environment
    variable "GRPC_PYTHON_MANAGED_CHANNEL_EVICTION_SECONDS" to configure this.

    The default maximum number of channels is 256. One may set the
    environment variable "GRPC_PYTHON_MANAGED_CHANNEL_MAXIMUM" to configure
    this.

    Args:
      request: The request value for the RPC.
      target: The server address.
      method: The name of the RPC method.
      request_serializer: Optional behaviour for serializing the request
        message. Request goes unserialized in case None is passed.
      response_deserializer: Optional behaviour for deserializing the response
        message. Response goes undeserialized in case None is passed.
      options: An optional list of key-value pairs (channel args in gRPC Core
        runtime) to configure the channel.
      channel_credentials: A credential applied to the whole channel, e.g. the
        return value of grpc.ssl_channel_credentials() or
        grpc.insecure_channel_credentials().
      call_credentials: A call credential applied to each call individually,
        e.g. the output of grpc.metadata_call_credentials() or
        grpc.access_token_call_credentials().
      compression: An optional value indicating the compression method to be
        used over the lifetime of the channel, e.g. grpc.Compression.Gzip.
      wait_for_ready: An optional flag indicating whether the RPC should fail
        immediately if the connection is not ready at the time the RPC is
        invoked, or if it should wait until the connection to the server
        becomes ready. When using this option, the user will likely also want
        to set a timeout. Defaults to False.
      timeout: An optional duration of time in seconds to allow for the RPC,
        after which an exception will be raised.
      metadata: Optional metadata to send to the server.

    Returns:
      The response to the RPC.
    """
    channel = ChannelCache.get().get_channel(target, options,
                                             channel_credentials, compression)
    multicallable = channel.unary_unary(method, request_serializer,
                                        response_deserializer)
    return multicallable(request,
                         metadata=metadata,
                         wait_for_ready=wait_for_ready,
                         credentials=call_credentials,
                         timeout=timeout)
@experimental_api
def unary_stream(
        request: RequestType,
        target: str,
        method: str,
        request_serializer: Optional[Callable[[Any], bytes]] = None,
        response_deserializer: Optional[Callable[[bytes], Any]] = None,
        options: Sequence[Tuple[AnyStr, AnyStr]] = (),
        channel_credentials: Optional[grpc.ChannelCredentials] = None,
        call_credentials: Optional[grpc.CallCredentials] = None,
        compression: Optional[grpc.Compression] = None,
        wait_for_ready: Optional[bool] = None,
        timeout: Optional[float] = None,
        metadata: Optional[Sequence[Tuple[str, Union[str, bytes]]]] = None
) -> Iterator[ResponseType]:
    """Invokes a unary-stream RPC without an explicitly specified channel.

    THIS IS AN EXPERIMENTAL API.

    This is backed by a per-process cache of channels. Channels are evicted
    from the cache after a fixed period by a background thread. Channels will
    also be evicted if more than a configured maximum accumulate.

    The default eviction period is 10 minutes. One may set the environment
    variable "GRPC_PYTHON_MANAGED_CHANNEL_EVICTION_SECONDS" to configure this.

    The default maximum number of channels is 256. One may set the
    environment variable "GRPC_PYTHON_MANAGED_CHANNEL_MAXIMUM" to configure
    this.

    Args:
      request: The request value for the RPC.
      target: The server address.
      method: The name of the RPC method.
      request_serializer: Optional behaviour for serializing the request
        message. Request goes unserialized in case None is passed.
      response_deserializer: Optional behaviour for deserializing the response
        message. Response goes undeserialized in case None is passed.
      options: An optional list of key-value pairs (channel args in gRPC Core
        runtime) to configure the channel.
      channel_credentials: A credential applied to the whole channel, e.g. the
        return value of grpc.ssl_channel_credentials().
      call_credentials: A call credential applied to each call individually,
        e.g. the output of grpc.metadata_call_credentials() or
        grpc.access_token_call_credentials().
      compression: An optional value indicating the compression method to be
        used over the lifetime of the channel, e.g. grpc.Compression.Gzip.
      wait_for_ready: An optional flag indicating whether the RPC should fail
        immediately if the connection is not ready at the time the RPC is
        invoked, or if it should wait until the connection to the server
        becomes ready. When using this option, the user will likely also want
        to set a timeout. Defaults to False.
      timeout: An optional duration of time in seconds to allow for the RPC,
        after which an exception will be raised.
      metadata: Optional metadata to send to the server.

    Returns:
      An iterator of responses.
    """
    channel = ChannelCache.get().get_channel(target, options,
                                             channel_credentials, compression)
    multicallable = channel.unary_stream(method, request_serializer,
                                         response_deserializer)
    return multicallable(request,
                         metadata=metadata,
                         wait_for_ready=wait_for_ready,
                         credentials=call_credentials,
                         timeout=timeout)
@experimental_api
def stream_unary(
        request_iterator: Iterator[RequestType],
        target: str,
        method: str,
        request_serializer: Optional[Callable[[Any], bytes]] = None,
        response_deserializer: Optional[Callable[[bytes], Any]] = None,
        options: Sequence[Tuple[AnyStr, AnyStr]] = (),
        channel_credentials: Optional[grpc.ChannelCredentials] = None,
        call_credentials: Optional[grpc.CallCredentials] = None,
        compression: Optional[grpc.Compression] = None,
        wait_for_ready: Optional[bool] = None,
        timeout: Optional[float] = None,
        metadata: Optional[Sequence[Tuple[str, Union[str, bytes]]]] = None
) -> ResponseType:
    """Invokes a stream-unary RPC without an explicitly specified channel.

    THIS IS AN EXPERIMENTAL API.

    This is backed by a per-process cache of channels. Channels are evicted
    from the cache after a fixed period by a background thread. Channels will
    also be evicted if more than a configured maximum accumulate.

    The default eviction period is 10 minutes. One may set the environment
    variable "GRPC_PYTHON_MANAGED_CHANNEL_EVICTION_SECONDS" to configure this.

    The default maximum number of channels is 256. One may set the
    environment variable "GRPC_PYTHON_MANAGED_CHANNEL_MAXIMUM" to configure
    this.

    Args:
      request_iterator: An iterator that yields request values for the RPC.
      target: The server address.
      method: The name of the RPC method.
      request_serializer: Optional behaviour for serializing the request
        message. Request goes unserialized in case None is passed.
      response_deserializer: Optional behaviour for deserializing the response
        message. Response goes undeserialized in case None is passed.
      options: An optional list of key-value pairs (channel args in gRPC Core
        runtime) to configure the channel.
      channel_credentials: A credential applied to the whole channel, e.g. the
        return value of grpc.ssl_channel_credentials().
      call_credentials: A call credential applied to each call individually,
        e.g. the output of grpc.metadata_call_credentials() or
        grpc.access_token_call_credentials().
      compression: An optional value indicating the compression method to be
        used over the lifetime of the channel, e.g. grpc.Compression.Gzip.
      wait_for_ready: An optional flag indicating whether the RPC should fail
        immediately if the connection is not ready at the time the RPC is
        invoked, or if it should wait until the connection to the server
        becomes ready. When using this option, the user will likely also want
        to set a timeout. Defaults to False.
      timeout: An optional duration of time in seconds to allow for the RPC,
        after which an exception will be raised.
      metadata: Optional metadata to send to the server.

    Returns:
      The response to the RPC.
    """
    channel = ChannelCache.get().get_channel(target, options,
                                             channel_credentials, compression)
    multicallable = channel.stream_unary(method, request_serializer,
                                         response_deserializer)
    return multicallable(request_iterator,
                         metadata=metadata,
                         wait_for_ready=wait_for_ready,
                         credentials=call_credentials,
                         timeout=timeout)
374 request_iterator: Iterator[RequestType],
377 request_serializer: Optional[Callable[[Any], bytes]] = None,
378 response_deserializer: Optional[Callable[[bytes], Any]] = None,
379 options: Sequence[Tuple[AnyStr, AnyStr]] = (),
380 channel_credentials: Optional[grpc.ChannelCredentials] = None,
381 call_credentials: Optional[grpc.CallCredentials] = None,
382 compression: Optional[grpc.Compression] = None,
383 wait_for_ready: Optional[bool] = None,
384 timeout: Optional[float] = None,
385 metadata: Optional[Sequence[Tuple[str, Union[str, bytes]]]] = None
386 ) -> Iterator[ResponseType]:
387 """Invokes a stream-stream RPC without an explicitly specified channel.
389 THIS IS AN EXPERIMENTAL API.
391 This is backed by a per-process cache of channels. Channels are evicted
392 from the cache after a fixed period by a background thread. Channels will also be
393 evicted if more than a configured maximum accumulate.
395 The default eviction period is 10 minutes. One may set the environment
396 variable "GRPC_PYTHON_MANAGED_CHANNEL_EVICTION_SECONDS" to configure this.
398 The default maximum number of channels is 256. One may set the
399 environment variable "GRPC_PYTHON_MANAGED_CHANNEL_MAXIMUM" to configure
403 request_iterator: An iterator that yields request values for the RPC.
404 target: The server address.
405 method: The name of the RPC method.
406 request_serializer: Optional behaviour for serializing the request
407 message. Request goes unserialized in case None is passed.
408 response_deserializer: Optional behaviour for deserializing the response
409 message. Response goes undeserialized in case None is passed.
410 options: An optional list of key-value pairs (channel args in gRPC Core
411 runtime) to configure the channel.
412 channel_credentials: A credential applied to the whole channel, e.g. the
413 return value of grpc.ssl_channel_credentials().
414 call_credentials: A call credential applied to each call individually,
415 e.g. the output of grpc.metadata_call_credentials() or
416 grpc.access_token_call_credentials().
417 compression: An optional value indicating the compression method to be
418 used over the lifetime of the channel, e.g. grpc.Compression.Gzip.
419 wait_for_ready: An optional flag indicating whether the RPC should fail
420 immediately if the connection is not ready at the time the RPC is
421 invoked, or if it should wait until the connection to the server
422 becomes ready. When using this option, the user will likely also want
423 to set a timeout. Defaults to False.
424 timeout: An optional duration of time in seconds to allow for the RPC,
425 after which an exception will be raised.
426 metadata: Optional metadata to send to the server.
429 An iterator of responses.
431 channel = ChannelCache.get().get_channel(target, options,
432 channel_credentials, compression)
433 multicallable = channel.stream_stream(method, request_serializer,
434 response_deserializer)
435 return multicallable(request_iterator,
437 wait_for_ready=wait_for_ready,
438 credentials=call_credentials,