Imported Upstream version 1.36.0
[platform/upstream/grpc.git] / src / python / grpcio_tests / tests / unit / _cython / _server_test.py
1 # Copyright 2017 gRPC authors.
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 #     http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14 """Test servers at the level of the Cython API."""
15
16 import threading
17 import time
18 import unittest
19
20 from grpc._cython import cygrpc
21
22
23 class Test(unittest.TestCase):
24
25     def test_lonely_server(self):
26         server_call_completion_queue = cygrpc.CompletionQueue()
27         server_shutdown_completion_queue = cygrpc.CompletionQueue()
28         server = cygrpc.Server(None, False)
29         server.register_completion_queue(server_call_completion_queue)
30         server.register_completion_queue(server_shutdown_completion_queue)
31         port = server.add_http2_port(b'[::]:0')
32         server.start()
33
34         server_request_call_tag = 'server_request_call_tag'
35         server_request_call_start_batch_result = server.request_call(
36             server_call_completion_queue, server_call_completion_queue,
37             server_request_call_tag)
38
39         time.sleep(4)
40
41         server_shutdown_tag = 'server_shutdown_tag'
42         server_shutdown_result = server.shutdown(
43             server_shutdown_completion_queue, server_shutdown_tag)
44         server_request_call_event = server_call_completion_queue.poll()
45         server_shutdown_event = server_shutdown_completion_queue.poll()
46
47
48 if __name__ == '__main__':
49     unittest.main(verbosity=2)