1 # Copyright 2017 gRPC authors.
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14 """Test servers at the level of the Cython API."""
20 from grpc._cython import cygrpc
class Test(unittest.TestCase):
    """Cython-API level checks for a server that never hears from a client."""

    def test_lonely_server(self):
        """Start a server, request a call, and shut down with no client.

        The server registers two completion queues: one on which the
        requested call is reported and one on which the shutdown is
        reported. After shutdown is initiated, one event is polled from
        each queue and checked against the tag it was submitted with.
        """
        call_queue = cygrpc.CompletionQueue()
        shutdown_queue = cygrpc.CompletionQueue()

        server = cygrpc.Server(None, False)
        server.register_completion_queue(call_queue)
        server.register_completion_queue(shutdown_queue)
        server.add_http2_port(b'[::]:0')
        # cygrpc rejects request_call() and shutdown() on a server that has
        # not been started, so the server must be running before either.
        server.start()

        server_request_call_tag = 'server_request_call_tag'
        server.request_call(call_queue, call_queue, server_request_call_tag)

        server_shutdown_tag = 'server_shutdown_tag'
        server.shutdown(shutdown_queue, server_shutdown_tag)

        # Each poll() blocks until its queue delivers an event.
        server_request_call_event = call_queue.poll()
        server_shutdown_event = shutdown_queue.poll()

        self.assertEqual(server_request_call_tag,
                         server_request_call_event.tag)
        self.assertEqual(server_shutdown_tag, server_shutdown_event.tag)
# Run this module's tests with verbose output when executed as a script.
if __name__ == '__main__':
    unittest.main(verbosity=2)