Imported Upstream version 1.21.0
[platform/upstream/grpc.git] / src / python / grpcio_tests / tests / health_check / _health_servicer_test.py
1 # Copyright 2016 gRPC authors.
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 #     http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14 """Tests of grpc_health.v1.health."""
15
16 import threading
17 import time
18 import unittest
19
20 import grpc
21 from grpc_health.v1 import health
22 from grpc_health.v1 import health_pb2
23 from grpc_health.v1 import health_pb2_grpc
24
25 from tests.unit import test_common
26 from tests.unit import thread_pool
27 from tests.unit.framework.common import test_constants
28
29 from six.moves import queue
30
# Service names used by the tests in this module.
# Registered by start_server() with status SERVING.
_SERVING_SERVICE = 'grpc.test.TestServiceServing'
# Registered by start_server() with status UNKNOWN.
_UNKNOWN_SERVICE = 'grpc.test.TestServiceUnknown'
# Registered by start_server() with status NOT_SERVING.
_NOT_SERVING_SERVICE = 'grpc.test.TestServiceNotServing'
# Not registered by start_server(); the watch tests set its status themselves.
_WATCH_SERVICE = 'grpc.test.WatchService'
35
36
def _consume_responses(response_iterator, response_queue):
    """Copy every item yielded by `response_iterator` into `response_queue`.

    Returns once the iterator is exhausted. Nothing is caught here, so any
    exception raised while iterating propagates to the caller (in this
    module, the thread that was started with this function as its target).
    """
    for message in response_iterator:
        response_queue.put(message)
40
41
class BaseWatchTests(object):
    """Plain wrapper class around the shared Watch test case.

    NOTE(review): nesting the TestCase here presumably keeps test loaders
    from collecting the base case on its own (it defines no setUp) --
    confirm before relying on it.
    """

    class WatchTests(unittest.TestCase):
        """Watch tests shared by the concrete test cases in this module.

        Nothing here starts the server; subclasses call start_server() from
        their setUp(), and every test uses the attributes it creates.
        """

        def start_server(self, non_blocking=False, thread_pool=None):
            """Start a health server and connect a client stub to it.

            Args:
              non_blocking: passed to HealthServicer as
                experimental_non_blocking.
              thread_pool: passed to HealthServicer as
                experimental_thread_pool and also stored on the test case
                as self._thread_pool.
            """
            self._thread_pool = thread_pool
            self._servicer = health.HealthServicer(
                experimental_non_blocking=non_blocking,
                experimental_thread_pool=thread_pool)

            statuses = health_pb2.HealthCheckResponse
            initial_statuses = (
                ('', statuses.SERVING),
                (_SERVING_SERVICE, statuses.SERVING),
                (_UNKNOWN_SERVICE, statuses.UNKNOWN),
                (_NOT_SERVING_SERVICE, statuses.NOT_SERVING),
            )
            for service, status in initial_statuses:
                self._servicer.set(service, status)

            self._server = test_common.test_server()
            port = self._server.add_insecure_port('[::]:0')
            health_pb2_grpc.add_HealthServicer_to_server(
                self._servicer, self._server)
            self._server.start()

            address = 'localhost:%d' % port
            self._channel = grpc.insecure_channel(address)
            self._stub = health_pb2_grpc.HealthStub(self._channel)

        def tearDown(self):
            """Stop the server, then close the client channel."""
            self._server.stop(None)
            self._channel.close()

        def _open_watch(self, service):
            """Start a Watch call for `service` with a background consumer.

            Returns:
              A (call, consumer, responses) tuple: the object returned by
              the stub's Watch, the already-started thread running
              _consume_responses on it, and the queue that thread fills.
            """
            request = health_pb2.HealthCheckRequest(service=service)
            responses = queue.Queue()
            call = self._stub.Watch(request)
            consumer = threading.Thread(
                target=_consume_responses, args=(call, responses))
            consumer.start()
            return call, consumer, responses

        def _assert_next_status(self, responses, expected_status):
            """Assert that the next queued response has `expected_status`.

            Waits at most test_constants.SHORT_TIMEOUT for the response;
            queue.Empty propagates if none arrives in time.
            """
            response = responses.get(timeout=test_constants.SHORT_TIMEOUT)
            self.assertEqual(expected_status, response.status)

        def _close_watch(self, call, consumer, responses):
            """Cancel `call`, join `consumer`, and assert the queue is empty."""
            call.cancel()
            consumer.join()
            self.assertTrue(responses.empty())

        def test_watch_empty_service(self):
            """Watching '' yields SERVING; a supplied thread pool gets used."""
            call, consumer, responses = self._open_watch('')
            self._assert_next_status(responses,
                                     health_pb2.HealthCheckResponse.SERVING)
            self._close_watch(call, consumer, responses)

            pool = self._thread_pool
            if pool is not None:
                self.assertTrue(pool.was_used())

        def test_watch_new_service(self):
            """A watcher sees SERVICE_UNKNOWN, then each status that is set."""
            statuses = health_pb2.HealthCheckResponse
            call, consumer, responses = self._open_watch(_WATCH_SERVICE)
            self._assert_next_status(responses, statuses.SERVICE_UNKNOWN)

            for status in (statuses.SERVING, statuses.NOT_SERVING):
                self._servicer.set(_WATCH_SERVICE, status)
                self._assert_next_status(responses, status)

            self._close_watch(call, consumer, responses)

        def test_watch_service_isolation(self):
            """Setting another service's status sends the watcher nothing."""
            statuses = health_pb2.HealthCheckResponse
            call, consumer, responses = self._open_watch(_WATCH_SERVICE)
            self._assert_next_status(responses, statuses.SERVICE_UNKNOWN)

            self._servicer.set('some-other-service', statuses.SERVING)
            # The watcher's queue must stay empty for the whole timeout.
            with self.assertRaises(queue.Empty):
                responses.get(timeout=test_constants.SHORT_TIMEOUT)

            self._close_watch(call, consumer, responses)

        def test_two_watchers(self):
            """Two watchers of one service each receive every update."""
            statuses = health_pb2.HealthCheckResponse
            request = health_pb2.HealthCheckRequest(service=_WATCH_SERVICE)
            queues = [queue.Queue(), queue.Queue()]
            # Both calls are opened before either consumer thread starts.
            calls = [self._stub.Watch(request), self._stub.Watch(request)]
            consumers = [
                threading.Thread(
                    target=_consume_responses, args=(call, responses))
                for call, responses in zip(calls, queues)
            ]
            for consumer in consumers:
                consumer.start()

            replies = [
                responses.get(timeout=test_constants.SHORT_TIMEOUT)
                for responses in queues
            ]
            for reply in replies:
                self.assertEqual(statuses.SERVICE_UNKNOWN, reply.status)

            self._servicer.set(_WATCH_SERVICE, statuses.SERVING)
            replies = [
                responses.get(timeout=test_constants.SHORT_TIMEOUT)
                for responses in queues
            ]
            for reply in replies:
                self.assertEqual(statuses.SERVING, reply.status)

            for call in calls:
                call.cancel()
            for consumer in consumers:
                consumer.join()
            for responses in queues:
                self.assertTrue(responses.empty())

        @unittest.skip("https://github.com/grpc/grpc/issues/18127")
        def test_cancelled_watch_removed_from_watch_list(self):
            """After cancellation the servicer's callback set becomes empty.

            Skipped; the decorator's reason points at the tracking issue.
            """
            statuses = health_pb2.HealthCheckResponse
            call, consumer, responses = self._open_watch(_WATCH_SERVICE)
            self._assert_next_status(responses, statuses.SERVICE_UNKNOWN)

            call.cancel()
            self._servicer.set(_WATCH_SERVICE, statuses.SERVING)
            consumer.join()

            # Wait, if necessary, for serving thread to process client
            # cancellation: poll once a second until the callback set is
            # empty or TIME_ALLOWANCE has elapsed.
            deadline = time.time() + test_constants.TIME_ALLOWANCE
            while True:
                if time.time() >= deadline:
                    break
                if not self._servicer._send_response_callbacks[
                        _WATCH_SERVICE]:
                    break
                time.sleep(1)

            self.assertFalse(
                self._servicer._send_response_callbacks[_WATCH_SERVICE],
                'watch set should be empty')
            self.assertTrue(responses.empty())

        def test_graceful_shutdown(self):
            """enter_graceful_shutdown() moves the '' watcher to NOT_SERVING."""
            statuses = health_pb2.HealthCheckResponse
            call, consumer, responses = self._open_watch('')
            self._assert_next_status(responses, statuses.SERVING)

            self._servicer.enter_graceful_shutdown()
            self._assert_next_status(responses, statuses.NOT_SERVING)

            # This should be a no-op.
            self._servicer.set('', statuses.SERVING)

            self._close_watch(call, consumer, responses)
230
231
class HealthServicerTest(BaseWatchTests.WatchTests):
    """Shared Watch tests plus Check tests, with non_blocking=True."""

    def setUp(self):
        """Start the server with non_blocking=True and a recording pool."""
        recording_pool = thread_pool.RecordingThreadPool(max_workers=None)
        self._thread_pool = recording_pool
        super(HealthServicerTest, self).start_server(
            thread_pool=recording_pool, non_blocking=True)

    def _check_service(self, **request_fields):
        """Issue a unary Check built from `request_fields`; return the reply."""
        request = health_pb2.HealthCheckRequest(**request_fields)
        return self._stub.Check(request)

    def test_check_empty_service(self):
        """A request with no service set is answered with SERVING."""
        reply = self._check_service()
        self.assertEqual(health_pb2.HealthCheckResponse.SERVING, reply.status)

    def test_check_serving_service(self):
        """_SERVING_SERVICE is reported as SERVING."""
        reply = self._check_service(service=_SERVING_SERVICE)
        self.assertEqual(health_pb2.HealthCheckResponse.SERVING, reply.status)

    def test_check_unknown_service(self):
        """_UNKNOWN_SERVICE is reported as UNKNOWN."""
        reply = self._check_service(service=_UNKNOWN_SERVICE)
        self.assertEqual(health_pb2.HealthCheckResponse.UNKNOWN, reply.status)

    def test_check_not_serving_service(self):
        """_NOT_SERVING_SERVICE is reported as NOT_SERVING."""
        reply = self._check_service(service=_NOT_SERVING_SERVICE)
        self.assertEqual(health_pb2.HealthCheckResponse.NOT_SERVING,
                         reply.status)

    def test_check_not_found_service(self):
        """Checking a name that was never set fails with NOT_FOUND."""
        with self.assertRaises(grpc.RpcError) as raised:
            self._check_service(service='not-found')

        self.assertEqual(grpc.StatusCode.NOT_FOUND, raised.exception.code())

    def test_health_service_name(self):
        """health.SERVICE_NAME is the fully-qualified Health service name."""
        expected_name = 'grpc.health.v1.Health'
        self.assertEqual(health.SERVICE_NAME, expected_name)
269
270
class HealthServicerBackwardsCompatibleWatchTest(BaseWatchTests.WatchTests):
    """Shared Watch tests with non_blocking=False and no thread pool."""

    def setUp(self):
        """Start the server with non_blocking=False and thread_pool=None."""
        base = super(HealthServicerBackwardsCompatibleWatchTest, self)
        base.start_server(thread_pool=None, non_blocking=False)
276
277
# When executed as a script, run this module's tests with verbose output.
if __name__ == '__main__':
    unittest.main(verbosity=2)