Imported Upstream version 1.27.0
[platform/upstream/grpc.git] / src / python / grpcio_tests / tests / unit / beta / _beta_features_test.py
1 # Copyright 2015 gRPC authors.
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 #     http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14 """Tests Face interface compliance of the gRPC Python Beta API."""
15
16 import threading
17 import unittest
18
19 from grpc.beta import implementations
20 from grpc.beta import interfaces
21 from grpc.framework.common import cardinality
22 from grpc.framework.interfaces.face import utilities
23 from tests.unit import resources
24 from tests.unit.beta import test_utilities
25 from tests.unit.framework.common import test_constants
26
27 _SERVER_HOST_OVERRIDE = 'foo.test.google.fr'
28
29 _PER_RPC_CREDENTIALS_METADATA_KEY = b'my-call-credentials-metadata-key'
30 _PER_RPC_CREDENTIALS_METADATA_VALUE = b'my-call-credentials-metadata-value'
31
32 _GROUP = 'group'
33 _UNARY_UNARY = 'unary-unary'
34 _UNARY_STREAM = 'unary-stream'
35 _STREAM_UNARY = 'stream-unary'
36 _STREAM_STREAM = 'stream-stream'
37
38 _REQUEST = b'abc'
39 _RESPONSE = b'123'
40
41
42 class _Servicer(object):
43
44     def __init__(self):
45         self._condition = threading.Condition()
46         self._peer = None
47         self._serviced = False
48
49     def unary_unary(self, request, context):
50         with self._condition:
51             self._request = request
52             self._peer = context.protocol_context().peer()
53             self._invocation_metadata = context.invocation_metadata()
54             context.protocol_context().disable_next_response_compression()
55             self._serviced = True
56             self._condition.notify_all()
57             return _RESPONSE
58
59     def unary_stream(self, request, context):
60         with self._condition:
61             self._request = request
62             self._peer = context.protocol_context().peer()
63             self._invocation_metadata = context.invocation_metadata()
64             context.protocol_context().disable_next_response_compression()
65             self._serviced = True
66             self._condition.notify_all()
67             return
68             yield  # pylint: disable=unreachable
69
70     def stream_unary(self, request_iterator, context):
71         for request in request_iterator:
72             self._request = request
73         with self._condition:
74             self._peer = context.protocol_context().peer()
75             self._invocation_metadata = context.invocation_metadata()
76             context.protocol_context().disable_next_response_compression()
77             self._serviced = True
78             self._condition.notify_all()
79             return _RESPONSE
80
81     def stream_stream(self, request_iterator, context):
82         for request in request_iterator:
83             with self._condition:
84                 self._peer = context.protocol_context().peer()
85                 context.protocol_context().disable_next_response_compression()
86                 yield _RESPONSE
87         with self._condition:
88             self._invocation_metadata = context.invocation_metadata()
89             self._serviced = True
90             self._condition.notify_all()
91
92     def peer(self):
93         with self._condition:
94             return self._peer
95
96     def block_until_serviced(self):
97         with self._condition:
98             while not self._serviced:
99                 self._condition.wait()
100
101
102 class _BlockingIterator(object):
103
104     def __init__(self, upstream):
105         self._condition = threading.Condition()
106         self._upstream = upstream
107         self._allowed = []
108
109     def __iter__(self):
110         return self
111
112     def __next__(self):
113         return self.next()
114
115     def next(self):
116         with self._condition:
117             while True:
118                 if self._allowed is None:
119                     raise StopIteration()
120                 elif self._allowed:
121                     return self._allowed.pop(0)
122                 else:
123                     self._condition.wait()
124
125     def allow(self):
126         with self._condition:
127             try:
128                 self._allowed.append(next(self._upstream))
129             except StopIteration:
130                 self._allowed = None
131             self._condition.notify_all()
132
133
134 def _metadata_plugin(context, callback):
135     callback([
136         (_PER_RPC_CREDENTIALS_METADATA_KEY, _PER_RPC_CREDENTIALS_METADATA_VALUE)
137     ], None)
138
139
140 class BetaFeaturesTest(unittest.TestCase):
141
142     def setUp(self):
143         self._servicer = _Servicer()
144         method_implementations = {
145             (_GROUP, _UNARY_UNARY):
146                 utilities.unary_unary_inline(self._servicer.unary_unary),
147             (_GROUP, _UNARY_STREAM):
148                 utilities.unary_stream_inline(self._servicer.unary_stream),
149             (_GROUP, _STREAM_UNARY):
150                 utilities.stream_unary_inline(self._servicer.stream_unary),
151             (_GROUP, _STREAM_STREAM):
152                 utilities.stream_stream_inline(self._servicer.stream_stream),
153         }
154
155         cardinalities = {
156             _UNARY_UNARY: cardinality.Cardinality.UNARY_UNARY,
157             _UNARY_STREAM: cardinality.Cardinality.UNARY_STREAM,
158             _STREAM_UNARY: cardinality.Cardinality.STREAM_UNARY,
159             _STREAM_STREAM: cardinality.Cardinality.STREAM_STREAM,
160         }
161
162         server_options = implementations.server_options(
163             thread_pool_size=test_constants.POOL_SIZE)
164         self._server = implementations.server(method_implementations,
165                                               options=server_options)
166         server_credentials = implementations.ssl_server_credentials([
167             (
168                 resources.private_key(),
169                 resources.certificate_chain(),
170             ),
171         ])
172         port = self._server.add_secure_port('[::]:0', server_credentials)
173         self._server.start()
174         self._channel_credentials = implementations.ssl_channel_credentials(
175             resources.test_root_certificates())
176         self._call_credentials = implementations.metadata_call_credentials(
177             _metadata_plugin)
178         channel = test_utilities.not_really_secure_channel(
179             'localhost', port, self._channel_credentials, _SERVER_HOST_OVERRIDE)
180         stub_options = implementations.stub_options(
181             thread_pool_size=test_constants.POOL_SIZE)
182         self._dynamic_stub = implementations.dynamic_stub(channel,
183                                                           _GROUP,
184                                                           cardinalities,
185                                                           options=stub_options)
186
187     def tearDown(self):
188         self._dynamic_stub = None
189         self._server.stop(test_constants.SHORT_TIMEOUT).wait()
190
191     def test_unary_unary(self):
192         call_options = interfaces.grpc_call_options(
193             disable_compression=True, credentials=self._call_credentials)
194         response = getattr(self._dynamic_stub,
195                            _UNARY_UNARY)(_REQUEST,
196                                          test_constants.LONG_TIMEOUT,
197                                          protocol_options=call_options)
198         self.assertEqual(_RESPONSE, response)
199         self.assertIsNotNone(self._servicer.peer())
200         invocation_metadata = [
201             (metadatum.key, metadatum.value)
202             for metadatum in self._servicer._invocation_metadata
203         ]
204         self.assertIn((_PER_RPC_CREDENTIALS_METADATA_KEY,
205                        _PER_RPC_CREDENTIALS_METADATA_VALUE),
206                       invocation_metadata)
207
208     def test_unary_stream(self):
209         call_options = interfaces.grpc_call_options(
210             disable_compression=True, credentials=self._call_credentials)
211         response_iterator = getattr(self._dynamic_stub, _UNARY_STREAM)(
212             _REQUEST,
213             test_constants.LONG_TIMEOUT,
214             protocol_options=call_options)
215         self._servicer.block_until_serviced()
216         self.assertIsNotNone(self._servicer.peer())
217         invocation_metadata = [
218             (metadatum.key, metadatum.value)
219             for metadatum in self._servicer._invocation_metadata
220         ]
221         self.assertIn((_PER_RPC_CREDENTIALS_METADATA_KEY,
222                        _PER_RPC_CREDENTIALS_METADATA_VALUE),
223                       invocation_metadata)
224
225     def test_stream_unary(self):
226         call_options = interfaces.grpc_call_options(
227             credentials=self._call_credentials)
228         request_iterator = _BlockingIterator(iter((_REQUEST,)))
229         response_future = getattr(self._dynamic_stub, _STREAM_UNARY).future(
230             request_iterator,
231             test_constants.LONG_TIMEOUT,
232             protocol_options=call_options)
233         response_future.protocol_context().disable_next_request_compression()
234         request_iterator.allow()
235         response_future.protocol_context().disable_next_request_compression()
236         request_iterator.allow()
237         self._servicer.block_until_serviced()
238         self.assertIsNotNone(self._servicer.peer())
239         self.assertEqual(_RESPONSE, response_future.result())
240         invocation_metadata = [
241             (metadatum.key, metadatum.value)
242             for metadatum in self._servicer._invocation_metadata
243         ]
244         self.assertIn((_PER_RPC_CREDENTIALS_METADATA_KEY,
245                        _PER_RPC_CREDENTIALS_METADATA_VALUE),
246                       invocation_metadata)
247
248     def test_stream_stream(self):
249         call_options = interfaces.grpc_call_options(
250             credentials=self._call_credentials)
251         request_iterator = _BlockingIterator(iter((_REQUEST,)))
252         response_iterator = getattr(self._dynamic_stub, _STREAM_STREAM)(
253             request_iterator,
254             test_constants.SHORT_TIMEOUT,
255             protocol_options=call_options)
256         response_iterator.protocol_context().disable_next_request_compression()
257         request_iterator.allow()
258         response = next(response_iterator)
259         response_iterator.protocol_context().disable_next_request_compression()
260         request_iterator.allow()
261         self._servicer.block_until_serviced()
262         self.assertIsNotNone(self._servicer.peer())
263         self.assertEqual(_RESPONSE, response)
264         invocation_metadata = [
265             (metadatum.key, metadatum.value)
266             for metadatum in self._servicer._invocation_metadata
267         ]
268         self.assertIn((_PER_RPC_CREDENTIALS_METADATA_KEY,
269                        _PER_RPC_CREDENTIALS_METADATA_VALUE),
270                       invocation_metadata)
271
272
273 class ContextManagementAndLifecycleTest(unittest.TestCase):
274
275     def setUp(self):
276         self._servicer = _Servicer()
277         self._method_implementations = {
278             (_GROUP, _UNARY_UNARY):
279                 utilities.unary_unary_inline(self._servicer.unary_unary),
280             (_GROUP, _UNARY_STREAM):
281                 utilities.unary_stream_inline(self._servicer.unary_stream),
282             (_GROUP, _STREAM_UNARY):
283                 utilities.stream_unary_inline(self._servicer.stream_unary),
284             (_GROUP, _STREAM_STREAM):
285                 utilities.stream_stream_inline(self._servicer.stream_stream),
286         }
287
288         self._cardinalities = {
289             _UNARY_UNARY: cardinality.Cardinality.UNARY_UNARY,
290             _UNARY_STREAM: cardinality.Cardinality.UNARY_STREAM,
291             _STREAM_UNARY: cardinality.Cardinality.STREAM_UNARY,
292             _STREAM_STREAM: cardinality.Cardinality.STREAM_STREAM,
293         }
294
295         self._server_options = implementations.server_options(
296             thread_pool_size=test_constants.POOL_SIZE)
297         self._server_credentials = implementations.ssl_server_credentials([
298             (
299                 resources.private_key(),
300                 resources.certificate_chain(),
301             ),
302         ])
303         self._channel_credentials = implementations.ssl_channel_credentials(
304             resources.test_root_certificates())
305         self._stub_options = implementations.stub_options(
306             thread_pool_size=test_constants.POOL_SIZE)
307
308     def test_stub_context(self):
309         server = implementations.server(self._method_implementations,
310                                         options=self._server_options)
311         port = server.add_secure_port('[::]:0', self._server_credentials)
312         server.start()
313
314         channel = test_utilities.not_really_secure_channel(
315             'localhost', port, self._channel_credentials, _SERVER_HOST_OVERRIDE)
316         dynamic_stub = implementations.dynamic_stub(channel,
317                                                     _GROUP,
318                                                     self._cardinalities,
319                                                     options=self._stub_options)
320         for _ in range(100):
321             with dynamic_stub:
322                 pass
323         for _ in range(10):
324             with dynamic_stub:
325                 call_options = interfaces.grpc_call_options(
326                     disable_compression=True)
327                 response = getattr(dynamic_stub,
328                                    _UNARY_UNARY)(_REQUEST,
329                                                  test_constants.LONG_TIMEOUT,
330                                                  protocol_options=call_options)
331                 self.assertEqual(_RESPONSE, response)
332                 self.assertIsNotNone(self._servicer.peer())
333
334         server.stop(test_constants.SHORT_TIMEOUT).wait()
335
336     def test_server_lifecycle(self):
337         for _ in range(100):
338             server = implementations.server(self._method_implementations,
339                                             options=self._server_options)
340             port = server.add_secure_port('[::]:0', self._server_credentials)
341             server.start()
342             server.stop(test_constants.SHORT_TIMEOUT).wait()
343         for _ in range(100):
344             server = implementations.server(self._method_implementations,
345                                             options=self._server_options)
346             server.add_secure_port('[::]:0', self._server_credentials)
347             server.add_insecure_port('[::]:0')
348             with server:
349                 server.stop(test_constants.SHORT_TIMEOUT)
350             server.stop(test_constants.SHORT_TIMEOUT)
351
352
353 if __name__ == '__main__':
354     unittest.main(verbosity=2)