Imported Upstream version 1.28.1
[platform/upstream/grpc.git] / src / python / grpcio / grpc / _utilities.py
1 # Copyright 2015 gRPC authors.
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 #     http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14 """Internal utilities for gRPC Python."""
15
16 import collections
17 import threading
18 import time
19 import logging
20
21 import six
22
23 import grpc
24 from grpc import _common
25
26 _LOGGER = logging.getLogger(__name__)
27
28 _DONE_CALLBACK_EXCEPTION_LOG_MESSAGE = (
29     'Exception calling connectivity future "done" callback!')
30
31
32 class RpcMethodHandler(
33         collections.namedtuple('_RpcMethodHandler', (
34             'request_streaming',
35             'response_streaming',
36             'request_deserializer',
37             'response_serializer',
38             'unary_unary',
39             'unary_stream',
40             'stream_unary',
41             'stream_stream',
42         )), grpc.RpcMethodHandler):
43     pass
44
45
46 class DictionaryGenericHandler(grpc.ServiceRpcHandler):
47
48     def __init__(self, service, method_handlers):
49         self._name = service
50         self._method_handlers = {
51             _common.fully_qualified_method(service, method): method_handler
52             for method, method_handler in six.iteritems(method_handlers)
53         }
54
55     def service_name(self):
56         return self._name
57
58     def service(self, handler_call_details):
59         return self._method_handlers.get(handler_call_details.method)
60
61
62 class _ChannelReadyFuture(grpc.Future):
63
64     def __init__(self, channel):
65         self._condition = threading.Condition()
66         self._channel = channel
67
68         self._matured = False
69         self._cancelled = False
70         self._done_callbacks = []
71
72     def _block(self, timeout):
73         until = None if timeout is None else time.time() + timeout
74         with self._condition:
75             while True:
76                 if self._cancelled:
77                     raise grpc.FutureCancelledError()
78                 elif self._matured:
79                     return
80                 else:
81                     if until is None:
82                         self._condition.wait()
83                     else:
84                         remaining = until - time.time()
85                         if remaining < 0:
86                             raise grpc.FutureTimeoutError()
87                         else:
88                             self._condition.wait(timeout=remaining)
89
90     def _update(self, connectivity):
91         with self._condition:
92             if (not self._cancelled and
93                     connectivity is grpc.ChannelConnectivity.READY):
94                 self._matured = True
95                 self._channel.unsubscribe(self._update)
96                 self._condition.notify_all()
97                 done_callbacks = tuple(self._done_callbacks)
98                 self._done_callbacks = None
99             else:
100                 return
101
102         for done_callback in done_callbacks:
103             try:
104                 done_callback(self)
105             except Exception:  # pylint: disable=broad-except
106                 _LOGGER.exception(_DONE_CALLBACK_EXCEPTION_LOG_MESSAGE)
107
108     def cancel(self):
109         with self._condition:
110             if not self._matured:
111                 self._cancelled = True
112                 self._channel.unsubscribe(self._update)
113                 self._condition.notify_all()
114                 done_callbacks = tuple(self._done_callbacks)
115                 self._done_callbacks = None
116             else:
117                 return False
118
119         for done_callback in done_callbacks:
120             try:
121                 done_callback(self)
122             except Exception:  # pylint: disable=broad-except
123                 _LOGGER.exception(_DONE_CALLBACK_EXCEPTION_LOG_MESSAGE)
124
125         return True
126
127     def cancelled(self):
128         with self._condition:
129             return self._cancelled
130
131     def running(self):
132         with self._condition:
133             return not self._cancelled and not self._matured
134
135     def done(self):
136         with self._condition:
137             return self._cancelled or self._matured
138
139     def result(self, timeout=None):
140         self._block(timeout)
141
142     def exception(self, timeout=None):
143         self._block(timeout)
144
145     def traceback(self, timeout=None):
146         self._block(timeout)
147
148     def add_done_callback(self, fn):
149         with self._condition:
150             if not self._cancelled and not self._matured:
151                 self._done_callbacks.append(fn)
152                 return
153
154         fn(self)
155
156     def start(self):
157         with self._condition:
158             self._channel.subscribe(self._update, try_to_connect=True)
159
160     def __del__(self):
161         with self._condition:
162             if not self._cancelled and not self._matured:
163                 self._channel.unsubscribe(self._update)
164
165
166 def channel_ready_future(channel):
167     ready_future = _ChannelReadyFuture(channel)
168     ready_future.start()
169     return ready_future