1 # Copyright 2015 gRPC authors.
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14 """Tests Face interface compliance of the gRPC Python Beta API."""
19 from grpc.beta import implementations
20 from grpc.beta import interfaces
21 from grpc.framework.common import cardinality
22 from grpc.framework.interfaces.face import utilities
23 from tests.unit import resources
24 from tests.unit.beta import test_utilities
25 from tests.unit.framework.common import test_constants
27 _SERVER_HOST_OVERRIDE = 'foo.test.google.fr'
29 _PER_RPC_CREDENTIALS_METADATA_KEY = b'my-call-credentials-metadata-key'
30 _PER_RPC_CREDENTIALS_METADATA_VALUE = b'my-call-credentials-metadata-value'
33 _UNARY_UNARY = 'unary-unary'
34 _UNARY_STREAM = 'unary-stream'
35 _STREAM_UNARY = 'stream-unary'
36 _STREAM_STREAM = 'stream-stream'
42 class _Servicer(object):
45 self._condition = threading.Condition()
47 self._serviced = False
49 def unary_unary(self, request, context):
51 self._request = request
52 self._peer = context.protocol_context().peer()
53 self._invocation_metadata = context.invocation_metadata()
54 context.protocol_context().disable_next_response_compression()
56 self._condition.notify_all()
59 def unary_stream(self, request, context):
61 self._request = request
62 self._peer = context.protocol_context().peer()
63 self._invocation_metadata = context.invocation_metadata()
64 context.protocol_context().disable_next_response_compression()
66 self._condition.notify_all()
68 yield # pylint: disable=unreachable
70 def stream_unary(self, request_iterator, context):
71 for request in request_iterator:
72 self._request = request
74 self._peer = context.protocol_context().peer()
75 self._invocation_metadata = context.invocation_metadata()
76 context.protocol_context().disable_next_response_compression()
78 self._condition.notify_all()
81 def stream_stream(self, request_iterator, context):
82 for request in request_iterator:
84 self._peer = context.protocol_context().peer()
85 context.protocol_context().disable_next_response_compression()
88 self._invocation_metadata = context.invocation_metadata()
90 self._condition.notify_all()
96 def block_until_serviced(self):
98 while not self._serviced:
99 self._condition.wait()
102 class _BlockingIterator(object):
104 def __init__(self, upstream):
105 self._condition = threading.Condition()
106 self._upstream = upstream
116 with self._condition:
118 if self._allowed is None:
119 raise StopIteration()
121 return self._allowed.pop(0)
123 self._condition.wait()
126 with self._condition:
128 self._allowed.append(next(self._upstream))
129 except StopIteration:
131 self._condition.notify_all()
134 def _metadata_plugin(context, callback):
136 (_PER_RPC_CREDENTIALS_METADATA_KEY, _PER_RPC_CREDENTIALS_METADATA_VALUE)
140 class BetaFeaturesTest(unittest.TestCase):
143 self._servicer = _Servicer()
144 method_implementations = {
145 (_GROUP, _UNARY_UNARY):
146 utilities.unary_unary_inline(self._servicer.unary_unary),
147 (_GROUP, _UNARY_STREAM):
148 utilities.unary_stream_inline(self._servicer.unary_stream),
149 (_GROUP, _STREAM_UNARY):
150 utilities.stream_unary_inline(self._servicer.stream_unary),
151 (_GROUP, _STREAM_STREAM):
152 utilities.stream_stream_inline(self._servicer.stream_stream),
156 _UNARY_UNARY: cardinality.Cardinality.UNARY_UNARY,
157 _UNARY_STREAM: cardinality.Cardinality.UNARY_STREAM,
158 _STREAM_UNARY: cardinality.Cardinality.STREAM_UNARY,
159 _STREAM_STREAM: cardinality.Cardinality.STREAM_STREAM,
162 server_options = implementations.server_options(
163 thread_pool_size=test_constants.POOL_SIZE)
164 self._server = implementations.server(method_implementations,
165 options=server_options)
166 server_credentials = implementations.ssl_server_credentials([
168 resources.private_key(),
169 resources.certificate_chain(),
172 port = self._server.add_secure_port('[::]:0', server_credentials)
174 self._channel_credentials = implementations.ssl_channel_credentials(
175 resources.test_root_certificates())
176 self._call_credentials = implementations.metadata_call_credentials(
178 channel = test_utilities.not_really_secure_channel(
179 'localhost', port, self._channel_credentials, _SERVER_HOST_OVERRIDE)
180 stub_options = implementations.stub_options(
181 thread_pool_size=test_constants.POOL_SIZE)
182 self._dynamic_stub = implementations.dynamic_stub(channel,
185 options=stub_options)
188 self._dynamic_stub = None
189 self._server.stop(test_constants.SHORT_TIMEOUT).wait()
191 def test_unary_unary(self):
192 call_options = interfaces.grpc_call_options(
193 disable_compression=True, credentials=self._call_credentials)
194 response = getattr(self._dynamic_stub,
195 _UNARY_UNARY)(_REQUEST,
196 test_constants.LONG_TIMEOUT,
197 protocol_options=call_options)
198 self.assertEqual(_RESPONSE, response)
199 self.assertIsNotNone(self._servicer.peer())
200 invocation_metadata = [
201 (metadatum.key, metadatum.value)
202 for metadatum in self._servicer._invocation_metadata
204 self.assertIn((_PER_RPC_CREDENTIALS_METADATA_KEY,
205 _PER_RPC_CREDENTIALS_METADATA_VALUE),
208 def test_unary_stream(self):
209 call_options = interfaces.grpc_call_options(
210 disable_compression=True, credentials=self._call_credentials)
211 response_iterator = getattr(self._dynamic_stub, _UNARY_STREAM)(
213 test_constants.LONG_TIMEOUT,
214 protocol_options=call_options)
215 self._servicer.block_until_serviced()
216 self.assertIsNotNone(self._servicer.peer())
217 invocation_metadata = [
218 (metadatum.key, metadatum.value)
219 for metadatum in self._servicer._invocation_metadata
221 self.assertIn((_PER_RPC_CREDENTIALS_METADATA_KEY,
222 _PER_RPC_CREDENTIALS_METADATA_VALUE),
225 def test_stream_unary(self):
226 call_options = interfaces.grpc_call_options(
227 credentials=self._call_credentials)
228 request_iterator = _BlockingIterator(iter((_REQUEST,)))
229 response_future = getattr(self._dynamic_stub, _STREAM_UNARY).future(
231 test_constants.LONG_TIMEOUT,
232 protocol_options=call_options)
233 response_future.protocol_context().disable_next_request_compression()
234 request_iterator.allow()
235 response_future.protocol_context().disable_next_request_compression()
236 request_iterator.allow()
237 self._servicer.block_until_serviced()
238 self.assertIsNotNone(self._servicer.peer())
239 self.assertEqual(_RESPONSE, response_future.result())
240 invocation_metadata = [
241 (metadatum.key, metadatum.value)
242 for metadatum in self._servicer._invocation_metadata
244 self.assertIn((_PER_RPC_CREDENTIALS_METADATA_KEY,
245 _PER_RPC_CREDENTIALS_METADATA_VALUE),
248 def test_stream_stream(self):
249 call_options = interfaces.grpc_call_options(
250 credentials=self._call_credentials)
251 request_iterator = _BlockingIterator(iter((_REQUEST,)))
252 response_iterator = getattr(self._dynamic_stub, _STREAM_STREAM)(
254 test_constants.SHORT_TIMEOUT,
255 protocol_options=call_options)
256 response_iterator.protocol_context().disable_next_request_compression()
257 request_iterator.allow()
258 response = next(response_iterator)
259 response_iterator.protocol_context().disable_next_request_compression()
260 request_iterator.allow()
261 self._servicer.block_until_serviced()
262 self.assertIsNotNone(self._servicer.peer())
263 self.assertEqual(_RESPONSE, response)
264 invocation_metadata = [
265 (metadatum.key, metadatum.value)
266 for metadatum in self._servicer._invocation_metadata
268 self.assertIn((_PER_RPC_CREDENTIALS_METADATA_KEY,
269 _PER_RPC_CREDENTIALS_METADATA_VALUE),
273 class ContextManagementAndLifecycleTest(unittest.TestCase):
276 self._servicer = _Servicer()
277 self._method_implementations = {
278 (_GROUP, _UNARY_UNARY):
279 utilities.unary_unary_inline(self._servicer.unary_unary),
280 (_GROUP, _UNARY_STREAM):
281 utilities.unary_stream_inline(self._servicer.unary_stream),
282 (_GROUP, _STREAM_UNARY):
283 utilities.stream_unary_inline(self._servicer.stream_unary),
284 (_GROUP, _STREAM_STREAM):
285 utilities.stream_stream_inline(self._servicer.stream_stream),
288 self._cardinalities = {
289 _UNARY_UNARY: cardinality.Cardinality.UNARY_UNARY,
290 _UNARY_STREAM: cardinality.Cardinality.UNARY_STREAM,
291 _STREAM_UNARY: cardinality.Cardinality.STREAM_UNARY,
292 _STREAM_STREAM: cardinality.Cardinality.STREAM_STREAM,
295 self._server_options = implementations.server_options(
296 thread_pool_size=test_constants.POOL_SIZE)
297 self._server_credentials = implementations.ssl_server_credentials([
299 resources.private_key(),
300 resources.certificate_chain(),
303 self._channel_credentials = implementations.ssl_channel_credentials(
304 resources.test_root_certificates())
305 self._stub_options = implementations.stub_options(
306 thread_pool_size=test_constants.POOL_SIZE)
308 def test_stub_context(self):
309 server = implementations.server(self._method_implementations,
310 options=self._server_options)
311 port = server.add_secure_port('[::]:0', self._server_credentials)
314 channel = test_utilities.not_really_secure_channel(
315 'localhost', port, self._channel_credentials, _SERVER_HOST_OVERRIDE)
316 dynamic_stub = implementations.dynamic_stub(channel,
319 options=self._stub_options)
325 call_options = interfaces.grpc_call_options(
326 disable_compression=True)
327 response = getattr(dynamic_stub,
328 _UNARY_UNARY)(_REQUEST,
329 test_constants.LONG_TIMEOUT,
330 protocol_options=call_options)
331 self.assertEqual(_RESPONSE, response)
332 self.assertIsNotNone(self._servicer.peer())
334 server.stop(test_constants.SHORT_TIMEOUT).wait()
336 def test_server_lifecycle(self):
338 server = implementations.server(self._method_implementations,
339 options=self._server_options)
340 port = server.add_secure_port('[::]:0', self._server_credentials)
342 server.stop(test_constants.SHORT_TIMEOUT).wait()
344 server = implementations.server(self._method_implementations,
345 options=self._server_options)
346 server.add_secure_port('[::]:0', self._server_credentials)
347 server.add_insecure_port('[::]:0')
349 server.stop(test_constants.SHORT_TIMEOUT)
350 server.stop(test_constants.SHORT_TIMEOUT)
353 if __name__ == '__main__':
354 unittest.main(verbosity=2)