Imported Upstream version 1.28.1
[platform/upstream/grpc.git] / src / python / grpcio / grpc / _simple_stubs.py
1 # Copyright 2020 The gRPC authors.
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 #     http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14 """Functions that obviate explicit stubs and explicit channels."""
15
16 import collections
17 import datetime
18 import os
19 import logging
20 import threading
21 from typing import (Any, AnyStr, Callable, Dict, Iterator, Optional, Sequence,
22                     Tuple, TypeVar, Union)
23
24 import grpc
25 from grpc.experimental import experimental_api
26
27 RequestType = TypeVar('RequestType')
28 ResponseType = TypeVar('ResponseType')
29
30 OptionsType = Sequence[Tuple[str, str]]
31 CacheKey = Tuple[str, OptionsType, Optional[grpc.ChannelCredentials], Optional[
32     grpc.Compression]]
33
34 _LOGGER = logging.getLogger(__name__)
35
36 _EVICTION_PERIOD_KEY = "GRPC_PYTHON_MANAGED_CHANNEL_EVICTION_SECONDS"
37 if _EVICTION_PERIOD_KEY in os.environ:
38     _EVICTION_PERIOD = datetime.timedelta(
39         seconds=float(os.environ[_EVICTION_PERIOD_KEY]))
40     _LOGGER.debug("Setting managed channel eviction period to %s",
41                   _EVICTION_PERIOD)
42 else:
43     _EVICTION_PERIOD = datetime.timedelta(minutes=10)
44
45 _MAXIMUM_CHANNELS_KEY = "GRPC_PYTHON_MANAGED_CHANNEL_MAXIMUM"
46 if _MAXIMUM_CHANNELS_KEY in os.environ:
47     _MAXIMUM_CHANNELS = int(os.environ[_MAXIMUM_CHANNELS_KEY])
48     _LOGGER.debug("Setting maximum managed channels to %d", _MAXIMUM_CHANNELS)
49 else:
50     _MAXIMUM_CHANNELS = 2**8
51
52
53 def _create_channel(target: str, options: Sequence[Tuple[str, str]],
54                     channel_credentials: Optional[grpc.ChannelCredentials],
55                     compression: Optional[grpc.Compression]) -> grpc.Channel:
56     # TODO(rbellevi): Revisit the default value for this.
57     if channel_credentials is None:
58         raise NotImplementedError(
59             "channel_credentials must be supplied explicitly.")
60     if channel_credentials._credentials is grpc.experimental._insecure_channel_credentials:
61         _LOGGER.debug(f"Creating insecure channel with options '{options}' " +
62                       f"and compression '{compression}'")
63         return grpc.insecure_channel(target,
64                                      options=options,
65                                      compression=compression)
66     else:
67         _LOGGER.debug(
68             f"Creating secure channel with credentials '{channel_credentials}', "
69             + f"options '{options}' and compression '{compression}'")
70         return grpc.secure_channel(target,
71                                    credentials=channel_credentials,
72                                    options=options,
73                                    compression=compression)
74
75
76 class ChannelCache:
77     # NOTE(rbellevi): Untyped due to reference cycle.
78     _singleton = None
79     _lock: threading.RLock = threading.RLock()
80     _condition: threading.Condition = threading.Condition(lock=_lock)
81     _eviction_ready: threading.Event = threading.Event()
82
83     _mapping: Dict[CacheKey, Tuple[grpc.Channel, datetime.datetime]]
84     _eviction_thread: threading.Thread
85
86     def __init__(self):
87         self._mapping = collections.OrderedDict()
88         self._eviction_thread = threading.Thread(
89             target=ChannelCache._perform_evictions, daemon=True)
90         self._eviction_thread.start()
91
92     @staticmethod
93     def get():
94         with ChannelCache._lock:
95             if ChannelCache._singleton is None:
96                 ChannelCache._singleton = ChannelCache()
97         ChannelCache._eviction_ready.wait()
98         return ChannelCache._singleton
99
100     def _evict_locked(self, key: CacheKey):
101         channel, _ = self._mapping.pop(key)
102         _LOGGER.debug("Evicting channel %s with configuration %s.", channel,
103                       key)
104         channel.close()
105         del channel
106
107     @staticmethod
108     def _perform_evictions():
109         while True:
110             with ChannelCache._lock:
111                 ChannelCache._eviction_ready.set()
112                 if not ChannelCache._singleton._mapping:
113                     ChannelCache._condition.wait()
114                 elif len(ChannelCache._singleton._mapping) > _MAXIMUM_CHANNELS:
115                     key = next(iter(ChannelCache._singleton._mapping.keys()))
116                     ChannelCache._singleton._evict_locked(key)
117                     # And immediately reevaluate.
118                 else:
119                     key, (_, eviction_time) = next(
120                         iter(ChannelCache._singleton._mapping.items()))
121                     now = datetime.datetime.now()
122                     if eviction_time <= now:
123                         ChannelCache._singleton._evict_locked(key)
124                         continue
125                     else:
126                         time_to_eviction = (eviction_time - now).total_seconds()
127                         # NOTE: We aim to *eventually* coalesce to a state in
128                         # which no overdue channels are in the cache and the
129                         # length of the cache is longer than _MAXIMUM_CHANNELS.
130                         # We tolerate momentary states in which these two
131                         # criteria are not met.
132                         ChannelCache._condition.wait(timeout=time_to_eviction)
133
134     def get_channel(self, target: str, options: Sequence[Tuple[str, str]],
135                     channel_credentials: Optional[grpc.ChannelCredentials],
136                     compression: Optional[grpc.Compression]) -> grpc.Channel:
137         key = (target, options, channel_credentials, compression)
138         with self._lock:
139             channel_data = self._mapping.get(key, None)
140             if channel_data is not None:
141                 channel = channel_data[0]
142                 self._mapping.pop(key)
143                 self._mapping[key] = (channel, datetime.datetime.now() +
144                                       _EVICTION_PERIOD)
145                 return channel
146             else:
147                 channel = _create_channel(target, options, channel_credentials,
148                                           compression)
149                 self._mapping[key] = (channel, datetime.datetime.now() +
150                                       _EVICTION_PERIOD)
151                 if len(self._mapping) == 1 or len(
152                         self._mapping) >= _MAXIMUM_CHANNELS:
153                     self._condition.notify()
154                 return channel
155
156     def _test_only_channel_count(self) -> int:
157         with self._lock:
158             return len(self._mapping)
159
160
161 @experimental_api
162 def unary_unary(
163         request: RequestType,
164         target: str,
165         method: str,
166         request_serializer: Optional[Callable[[Any], bytes]] = None,
167         response_deserializer: Optional[Callable[[bytes], Any]] = None,
168         options: Sequence[Tuple[AnyStr, AnyStr]] = (),
169         channel_credentials: Optional[grpc.ChannelCredentials] = None,
170         call_credentials: Optional[grpc.CallCredentials] = None,
171         compression: Optional[grpc.Compression] = None,
172         wait_for_ready: Optional[bool] = None,
173         timeout: Optional[float] = None,
174         metadata: Optional[Sequence[Tuple[str, Union[str, bytes]]]] = None
175 ) -> ResponseType:
176     """Invokes a unary-unary RPC without an explicitly specified channel.
177
178     THIS IS AN EXPERIMENTAL API.
179
180     This is backed by a per-process cache of channels. Channels are evicted
181     from the cache after a fixed period by a background. Channels will also be
182     evicted if more than a configured maximum accumulate.
183
184     The default eviction period is 10 minutes. One may set the environment
185     variable "GRPC_PYTHON_MANAGED_CHANNEL_EVICTION_SECONDS" to configure this.
186
187     The default maximum number of channels is 256. One may set the
188     environment variable "GRPC_PYTHON_MANAGED_CHANNEL_MAXIMUM" to configure
189     this.
190
191     Args:
192       request: An iterator that yields request values for the RPC.
193       target: The server address.
194       method: The name of the RPC method.
195       request_serializer: Optional behaviour for serializing the request
196         message. Request goes unserialized in case None is passed.
197       response_deserializer: Optional behaviour for deserializing the response
198         message. Response goes undeserialized in case None is passed.
199       options: An optional list of key-value pairs (channel args in gRPC Core
200         runtime) to configure the channel.
201       channel_credentials: A credential applied to the whole channel, e.g. the
202         return value of grpc.ssl_channel_credentials() or
203         grpc.insecure_channel_credentials().
204       call_credentials: A call credential applied to each call individually,
205         e.g. the output of grpc.metadata_call_credentials() or
206         grpc.access_token_call_credentials().
207       compression: An optional value indicating the compression method to be
208         used over the lifetime of the channel, e.g. grpc.Compression.Gzip.
209       wait_for_ready: An optional flag indicating whether the RPC should fail
210         immediately if the connection is not ready at the time the RPC is
211         invoked, or if it should wait until the connection to the server
212         becomes ready. When using this option, the user will likely also want
213         to set a timeout. Defaults to False.
214       timeout: An optional duration of time in seconds to allow for the RPC,
215         after which an exception will be raised.
216       metadata: Optional metadata to send to the server.
217
218     Returns:
219       The response to the RPC.
220     """
221     channel = ChannelCache.get().get_channel(target, options,
222                                              channel_credentials, compression)
223     multicallable = channel.unary_unary(method, request_serializer,
224                                         response_deserializer)
225     return multicallable(request,
226                          metadata=metadata,
227                          wait_for_ready=wait_for_ready,
228                          credentials=call_credentials,
229                          timeout=timeout)
230
231
232 @experimental_api
233 def unary_stream(
234         request: RequestType,
235         target: str,
236         method: str,
237         request_serializer: Optional[Callable[[Any], bytes]] = None,
238         response_deserializer: Optional[Callable[[bytes], Any]] = None,
239         options: Sequence[Tuple[AnyStr, AnyStr]] = (),
240         channel_credentials: Optional[grpc.ChannelCredentials] = None,
241         call_credentials: Optional[grpc.CallCredentials] = None,
242         compression: Optional[grpc.Compression] = None,
243         wait_for_ready: Optional[bool] = None,
244         timeout: Optional[float] = None,
245         metadata: Optional[Sequence[Tuple[str, Union[str, bytes]]]] = None
246 ) -> Iterator[ResponseType]:
247     """Invokes a unary-stream RPC without an explicitly specified channel.
248
249     THIS IS AN EXPERIMENTAL API.
250
251     This is backed by a per-process cache of channels. Channels are evicted
252     from the cache after a fixed period by a background. Channels will also be
253     evicted if more than a configured maximum accumulate.
254
255     The default eviction period is 10 minutes. One may set the environment
256     variable "GRPC_PYTHON_MANAGED_CHANNEL_EVICTION_SECONDS" to configure this.
257
258     The default maximum number of channels is 256. One may set the
259     environment variable "GRPC_PYTHON_MANAGED_CHANNEL_MAXIMUM" to configure
260     this.
261
262     Args:
263       request: An iterator that yields request values for the RPC.
264       target: The server address.
265       method: The name of the RPC method.
266       request_serializer: Optional behaviour for serializing the request
267         message. Request goes unserialized in case None is passed.
268       response_deserializer: Optional behaviour for deserializing the response
269         message. Response goes undeserialized in case None is passed.
270       options: An optional list of key-value pairs (channel args in gRPC Core
271         runtime) to configure the channel.
272       channel_credentials: A credential applied to the whole channel, e.g. the
273         return value of grpc.ssl_channel_credentials().
274       call_credentials: A call credential applied to each call individually,
275         e.g. the output of grpc.metadata_call_credentials() or
276         grpc.access_token_call_credentials().
277       compression: An optional value indicating the compression method to be
278         used over the lifetime of the channel, e.g. grpc.Compression.Gzip.
279       wait_for_ready: An optional flag indicating whether the RPC should fail
280         immediately if the connection is not ready at the time the RPC is
281         invoked, or if it should wait until the connection to the server
282         becomes ready. When using this option, the user will likely also want
283         to set a timeout. Defaults to False.
284       timeout: An optional duration of time in seconds to allow for the RPC,
285         after which an exception will be raised.
286       metadata: Optional metadata to send to the server.
287
288     Returns:
289       An iterator of responses.
290     """
291     channel = ChannelCache.get().get_channel(target, options,
292                                              channel_credentials, compression)
293     multicallable = channel.unary_stream(method, request_serializer,
294                                          response_deserializer)
295     return multicallable(request,
296                          metadata=metadata,
297                          wait_for_ready=wait_for_ready,
298                          credentials=call_credentials,
299                          timeout=timeout)
300
301
302 @experimental_api
303 def stream_unary(
304         request_iterator: Iterator[RequestType],
305         target: str,
306         method: str,
307         request_serializer: Optional[Callable[[Any], bytes]] = None,
308         response_deserializer: Optional[Callable[[bytes], Any]] = None,
309         options: Sequence[Tuple[AnyStr, AnyStr]] = (),
310         channel_credentials: Optional[grpc.ChannelCredentials] = None,
311         call_credentials: Optional[grpc.CallCredentials] = None,
312         compression: Optional[grpc.Compression] = None,
313         wait_for_ready: Optional[bool] = None,
314         timeout: Optional[float] = None,
315         metadata: Optional[Sequence[Tuple[str, Union[str, bytes]]]] = None
316 ) -> ResponseType:
317     """Invokes a stream-unary RPC without an explicitly specified channel.
318
319     THIS IS AN EXPERIMENTAL API.
320
321     This is backed by a per-process cache of channels. Channels are evicted
322     from the cache after a fixed period by a background. Channels will also be
323     evicted if more than a configured maximum accumulate.
324
325     The default eviction period is 10 minutes. One may set the environment
326     variable "GRPC_PYTHON_MANAGED_CHANNEL_EVICTION_SECONDS" to configure this.
327
328     The default maximum number of channels is 256. One may set the
329     environment variable "GRPC_PYTHON_MANAGED_CHANNEL_MAXIMUM" to configure
330     this.
331
332     Args:
333       request_iterator: An iterator that yields request values for the RPC.
334       target: The server address.
335       method: The name of the RPC method.
336       request_serializer: Optional behaviour for serializing the request
337         message. Request goes unserialized in case None is passed.
338       response_deserializer: Optional behaviour for deserializing the response
339         message. Response goes undeserialized in case None is passed.
340       options: An optional list of key-value pairs (channel args in gRPC Core
341         runtime) to configure the channel.
342       channel_credentials: A credential applied to the whole channel, e.g. the
343         return value of grpc.ssl_channel_credentials().
344       call_credentials: A call credential applied to each call individually,
345         e.g. the output of grpc.metadata_call_credentials() or
346         grpc.access_token_call_credentials().
347       compression: An optional value indicating the compression method to be
348         used over the lifetime of the channel, e.g. grpc.Compression.Gzip.
349       wait_for_ready: An optional flag indicating whether the RPC should fail
350         immediately if the connection is not ready at the time the RPC is
351         invoked, or if it should wait until the connection to the server
352         becomes ready. When using this option, the user will likely also want
353         to set a timeout. Defaults to False.
354       timeout: An optional duration of time in seconds to allow for the RPC,
355         after which an exception will be raised.
356       metadata: Optional metadata to send to the server.
357
358     Returns:
359       The response to the RPC.
360     """
361     channel = ChannelCache.get().get_channel(target, options,
362                                              channel_credentials, compression)
363     multicallable = channel.stream_unary(method, request_serializer,
364                                          response_deserializer)
365     return multicallable(request_iterator,
366                          metadata=metadata,
367                          wait_for_ready=wait_for_ready,
368                          credentials=call_credentials,
369                          timeout=timeout)
370
371
372 @experimental_api
373 def stream_stream(
374         request_iterator: Iterator[RequestType],
375         target: str,
376         method: str,
377         request_serializer: Optional[Callable[[Any], bytes]] = None,
378         response_deserializer: Optional[Callable[[bytes], Any]] = None,
379         options: Sequence[Tuple[AnyStr, AnyStr]] = (),
380         channel_credentials: Optional[grpc.ChannelCredentials] = None,
381         call_credentials: Optional[grpc.CallCredentials] = None,
382         compression: Optional[grpc.Compression] = None,
383         wait_for_ready: Optional[bool] = None,
384         timeout: Optional[float] = None,
385         metadata: Optional[Sequence[Tuple[str, Union[str, bytes]]]] = None
386 ) -> Iterator[ResponseType]:
387     """Invokes a stream-stream RPC without an explicitly specified channel.
388
389     THIS IS AN EXPERIMENTAL API.
390
391     This is backed by a per-process cache of channels. Channels are evicted
392     from the cache after a fixed period by a background. Channels will also be
393     evicted if more than a configured maximum accumulate.
394
395     The default eviction period is 10 minutes. One may set the environment
396     variable "GRPC_PYTHON_MANAGED_CHANNEL_EVICTION_SECONDS" to configure this.
397
398     The default maximum number of channels is 256. One may set the
399     environment variable "GRPC_PYTHON_MANAGED_CHANNEL_MAXIMUM" to configure
400     this.
401
402     Args:
403       request_iterator: An iterator that yields request values for the RPC.
404       target: The server address.
405       method: The name of the RPC method.
406       request_serializer: Optional behaviour for serializing the request
407         message. Request goes unserialized in case None is passed.
408       response_deserializer: Optional behaviour for deserializing the response
409         message. Response goes undeserialized in case None is passed.
410       options: An optional list of key-value pairs (channel args in gRPC Core
411         runtime) to configure the channel.
412       channel_credentials: A credential applied to the whole channel, e.g. the
413         return value of grpc.ssl_channel_credentials().
414       call_credentials: A call credential applied to each call individually,
415         e.g. the output of grpc.metadata_call_credentials() or
416         grpc.access_token_call_credentials().
417       compression: An optional value indicating the compression method to be
418         used over the lifetime of the channel, e.g. grpc.Compression.Gzip.
419       wait_for_ready: An optional flag indicating whether the RPC should fail
420         immediately if the connection is not ready at the time the RPC is
421         invoked, or if it should wait until the connection to the server
422         becomes ready. When using this option, the user will likely also want
423         to set a timeout. Defaults to False.
424       timeout: An optional duration of time in seconds to allow for the RPC,
425         after which an exception will be raised.
426       metadata: Optional metadata to send to the server.
427
428     Returns:
429       An iterator of responses.
430     """
431     channel = ChannelCache.get().get_channel(target, options,
432                                              channel_credentials, compression)
433     multicallable = channel.stream_stream(method, request_serializer,
434                                           response_deserializer)
435     return multicallable(request_iterator,
436                          metadata=metadata,
437                          wait_for_ready=wait_for_ready,
438                          credentials=call_credentials,
439                          timeout=timeout)