Imported Upstream version 1.27.0
[platform/upstream/grpc.git] / src / python / grpcio_tests / tests / unit / _channel_ready_future_test.py
1 # Copyright 2015 gRPC authors.
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 #     http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14 """Tests of grpc.channel_ready_future."""
15
16 import threading
17 import unittest
18 import logging
19
20 import grpc
21 from tests.unit.framework.common import test_constants
22 from tests.unit import thread_pool
23
24
25 class _Callback(object):
26
27     def __init__(self):
28         self._condition = threading.Condition()
29         self._value = None
30
31     def accept_value(self, value):
32         with self._condition:
33             self._value = value
34             self._condition.notify_all()
35
36     def block_until_called(self):
37         with self._condition:
38             while self._value is None:
39                 self._condition.wait()
40             return self._value
41
42
43 class ChannelReadyFutureTest(unittest.TestCase):
44
45     def test_lonely_channel_connectivity(self):
46         channel = grpc.insecure_channel('localhost:12345')
47         callback = _Callback()
48
49         ready_future = grpc.channel_ready_future(channel)
50         ready_future.add_done_callback(callback.accept_value)
51         with self.assertRaises(grpc.FutureTimeoutError):
52             ready_future.result(timeout=test_constants.SHORT_TIMEOUT)
53         self.assertFalse(ready_future.cancelled())
54         self.assertFalse(ready_future.done())
55         self.assertTrue(ready_future.running())
56         ready_future.cancel()
57         value_passed_to_callback = callback.block_until_called()
58         self.assertIs(ready_future, value_passed_to_callback)
59         self.assertTrue(ready_future.cancelled())
60         self.assertTrue(ready_future.done())
61         self.assertFalse(ready_future.running())
62
63         channel.close()
64
65     def test_immediately_connectable_channel_connectivity(self):
66         recording_thread_pool = thread_pool.RecordingThreadPool(
67             max_workers=None)
68         server = grpc.server(recording_thread_pool,
69                              options=(('grpc.so_reuseport', 0),))
70         port = server.add_insecure_port('[::]:0')
71         server.start()
72         channel = grpc.insecure_channel('localhost:{}'.format(port))
73         callback = _Callback()
74
75         ready_future = grpc.channel_ready_future(channel)
76         ready_future.add_done_callback(callback.accept_value)
77         self.assertIsNone(
78             ready_future.result(timeout=test_constants.LONG_TIMEOUT))
79         value_passed_to_callback = callback.block_until_called()
80         self.assertIs(ready_future, value_passed_to_callback)
81         self.assertFalse(ready_future.cancelled())
82         self.assertTrue(ready_future.done())
83         self.assertFalse(ready_future.running())
84         # Cancellation after maturity has no effect.
85         ready_future.cancel()
86         self.assertFalse(ready_future.cancelled())
87         self.assertTrue(ready_future.done())
88         self.assertFalse(ready_future.running())
89         self.assertFalse(recording_thread_pool.was_used())
90
91         channel.close()
92         server.stop(None)
93
94
95 if __name__ == '__main__':
96     logging.basicConfig()
97     unittest.main(verbosity=2)