# Copyright 2016 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Tests of grpc_health.v1.health (the Check and Watch RPCs of HealthServicer)."""
import threading
import time
import unittest

import grpc
from grpc_health.v1 import health
from grpc_health.v1 import health_pb2
from grpc_health.v1 import health_pb2_grpc
from six.moves import queue

from tests.unit import test_common
from tests.unit import thread_pool
from tests.unit.framework.common import test_constants
31 _SERVING_SERVICE = 'grpc.test.TestServiceServing'
32 _UNKNOWN_SERVICE = 'grpc.test.TestServiceUnknown'
33 _NOT_SERVING_SERVICE = 'grpc.test.TestServiceNotServing'
34 _WATCH_SERVICE = 'grpc.test.WatchService'
37 def _consume_responses(response_iterator, response_queue):
38 for response in response_iterator:
39 response_queue.put(response)
class BaseWatchTests(object):
    """Plain container for the shared Watch test case.

    NOTE(review): nesting the TestCase inside a non-TestCase class presumably
    keeps test loaders from collecting the base case on its own; only the
    concrete subclasses defined at module level are meant to run.
    """

    class WatchTests(unittest.TestCase):
        """Watch RPC tests shared by every HealthServicer configuration.

        Subclasses are expected to call start_server() from their setUp();
        tearDown() here releases what start_server() created.
        """

        def start_server(self, non_blocking=False, thread_pool=None):
            """Start a health server and connect a stub to it.

            Args:
              non_blocking: Forwarded to HealthServicer as
                ``experimental_non_blocking``.
              thread_pool: Forwarded to HealthServicer as
                ``experimental_thread_pool``; also kept on
                ``self._thread_pool`` so tests can check whether it was used.
            """
            self._thread_pool = thread_pool
            self._servicer = health.HealthServicer(
                experimental_non_blocking=non_blocking,
                experimental_thread_pool=thread_pool)
            self._servicer.set('', health_pb2.HealthCheckResponse.SERVING)
            self._servicer.set(_SERVING_SERVICE,
                               health_pb2.HealthCheckResponse.SERVING)
            self._servicer.set(_UNKNOWN_SERVICE,
                               health_pb2.HealthCheckResponse.UNKNOWN)
            self._servicer.set(_NOT_SERVING_SERVICE,
                               health_pb2.HealthCheckResponse.NOT_SERVING)
            self._server = test_common.test_server()
            port = self._server.add_insecure_port('[::]:0')
            health_pb2_grpc.add_HealthServicer_to_server(
                self._servicer, self._server)
            # The server must be running before any RPC is issued on the stub.
            self._server.start()

            self._channel = grpc.insecure_channel('localhost:%d' % port)
            self._stub = health_pb2_grpc.HealthStub(self._channel)

        def tearDown(self):
            """Stop the server immediately and release the client channel."""
            self._server.stop(None)
            self._channel.close()

        def test_watch_empty_service(self):
            """Watching '' yields the SERVING status set in start_server()."""
            request = health_pb2.HealthCheckRequest(service='')
            response_queue = queue.Queue()
            rendezvous = self._stub.Watch(request)
            thread = threading.Thread(
                target=_consume_responses, args=(rendezvous, response_queue))
            thread.start()

            response = response_queue.get(timeout=test_constants.SHORT_TIMEOUT)
            self.assertEqual(health_pb2.HealthCheckResponse.SERVING,
                             response.status)

            # Cancel the stream so the consumer thread terminates, then make
            # sure nothing beyond the initial status was delivered.
            rendezvous.cancel()
            thread.join()
            self.assertTrue(response_queue.empty())

            if self._thread_pool is not None:
                self.assertTrue(self._thread_pool.was_used())

        def test_watch_new_service(self):
            """A watcher of an unregistered service sees each later update."""
            request = health_pb2.HealthCheckRequest(service=_WATCH_SERVICE)
            response_queue = queue.Queue()
            rendezvous = self._stub.Watch(request)
            thread = threading.Thread(
                target=_consume_responses, args=(rendezvous, response_queue))
            thread.start()

            response = response_queue.get(timeout=test_constants.SHORT_TIMEOUT)
            self.assertEqual(health_pb2.HealthCheckResponse.SERVICE_UNKNOWN,
                             response.status)

            self._servicer.set(_WATCH_SERVICE,
                               health_pb2.HealthCheckResponse.SERVING)
            response = response_queue.get(timeout=test_constants.SHORT_TIMEOUT)
            self.assertEqual(health_pb2.HealthCheckResponse.SERVING,
                             response.status)

            self._servicer.set(_WATCH_SERVICE,
                               health_pb2.HealthCheckResponse.NOT_SERVING)
            response = response_queue.get(timeout=test_constants.SHORT_TIMEOUT)
            self.assertEqual(health_pb2.HealthCheckResponse.NOT_SERVING,
                             response.status)

            rendezvous.cancel()
            thread.join()
            self.assertTrue(response_queue.empty())

        def test_watch_service_isolation(self):
            """Updating another service must not notify this watcher."""
            request = health_pb2.HealthCheckRequest(service=_WATCH_SERVICE)
            response_queue = queue.Queue()
            rendezvous = self._stub.Watch(request)
            thread = threading.Thread(
                target=_consume_responses, args=(rendezvous, response_queue))
            thread.start()

            response = response_queue.get(timeout=test_constants.SHORT_TIMEOUT)
            self.assertEqual(health_pb2.HealthCheckResponse.SERVICE_UNKNOWN,
                             response.status)

            self._servicer.set('some-other-service',
                               health_pb2.HealthCheckResponse.SERVING)
            with self.assertRaises(queue.Empty):
                response_queue.get(timeout=test_constants.SHORT_TIMEOUT)

            rendezvous.cancel()
            thread.join()
            self.assertTrue(response_queue.empty())

        def test_two_watchers(self):
            """Two concurrent watchers of one service both get every update."""
            request = health_pb2.HealthCheckRequest(service=_WATCH_SERVICE)
            response_queue1 = queue.Queue()
            response_queue2 = queue.Queue()
            rendezvous1 = self._stub.Watch(request)
            rendezvous2 = self._stub.Watch(request)
            thread1 = threading.Thread(
                target=_consume_responses, args=(rendezvous1, response_queue1))
            thread2 = threading.Thread(
                target=_consume_responses, args=(rendezvous2, response_queue2))
            thread1.start()
            thread2.start()

            response1 = response_queue1.get(
                timeout=test_constants.SHORT_TIMEOUT)
            response2 = response_queue2.get(
                timeout=test_constants.SHORT_TIMEOUT)
            self.assertEqual(health_pb2.HealthCheckResponse.SERVICE_UNKNOWN,
                             response1.status)
            self.assertEqual(health_pb2.HealthCheckResponse.SERVICE_UNKNOWN,
                             response2.status)

            self._servicer.set(_WATCH_SERVICE,
                               health_pb2.HealthCheckResponse.SERVING)
            response1 = response_queue1.get(
                timeout=test_constants.SHORT_TIMEOUT)
            response2 = response_queue2.get(
                timeout=test_constants.SHORT_TIMEOUT)
            self.assertEqual(health_pb2.HealthCheckResponse.SERVING,
                             response1.status)
            self.assertEqual(health_pb2.HealthCheckResponse.SERVING,
                             response2.status)

            rendezvous1.cancel()
            rendezvous2.cancel()
            thread1.join()
            thread2.join()
            self.assertTrue(response_queue1.empty())
            self.assertTrue(response_queue2.empty())

        @unittest.skip("https://github.com/grpc/grpc/issues/18127")
        def test_cancelled_watch_removed_from_watch_list(self):
            """A cancelled watch is dropped from the servicer's callback set."""
            request = health_pb2.HealthCheckRequest(service=_WATCH_SERVICE)
            response_queue = queue.Queue()
            rendezvous = self._stub.Watch(request)
            thread = threading.Thread(
                target=_consume_responses, args=(rendezvous, response_queue))
            thread.start()

            response = response_queue.get(timeout=test_constants.SHORT_TIMEOUT)
            self.assertEqual(health_pb2.HealthCheckResponse.SERVICE_UNKNOWN,
                             response.status)

            # Cancel first, then publish an update: the update must not reach
            # the (already cancelled) watcher.
            rendezvous.cancel()
            self._servicer.set(_WATCH_SERVICE,
                               health_pb2.HealthCheckResponse.SERVING)
            thread.join()

            # Wait, if necessary, for serving thread to process client cancellation
            timeout = time.time() + test_constants.TIME_ALLOWANCE
            while (time.time() < timeout and
                   self._servicer._send_response_callbacks[_WATCH_SERVICE]):
                time.sleep(1)
            self.assertFalse(
                self._servicer._send_response_callbacks[_WATCH_SERVICE],
                'watch set should be empty')
            self.assertTrue(response_queue.empty())

        def test_graceful_shutdown(self):
            """After enter_graceful_shutdown(), watchers see NOT_SERVING only."""
            request = health_pb2.HealthCheckRequest(service='')
            response_queue = queue.Queue()
            rendezvous = self._stub.Watch(request)
            thread = threading.Thread(
                target=_consume_responses, args=(rendezvous, response_queue))
            thread.start()

            response = response_queue.get(timeout=test_constants.SHORT_TIMEOUT)
            self.assertEqual(health_pb2.HealthCheckResponse.SERVING,
                             response.status)

            self._servicer.enter_graceful_shutdown()
            response = response_queue.get(timeout=test_constants.SHORT_TIMEOUT)
            self.assertEqual(health_pb2.HealthCheckResponse.NOT_SERVING,
                             response.status)

            # This should be a no-op.
            self._servicer.set('', health_pb2.HealthCheckResponse.SERVING)

            rendezvous.cancel()
            thread.join()
            self.assertTrue(response_queue.empty())
class HealthServicerTest(BaseWatchTests.WatchTests):
    """Runs the shared Watch tests in non-blocking mode, plus Check tests.

    The servicer is given a RecordingThreadPool so the inherited Watch tests
    can verify that the supplied pool was actually used.
    """

    def setUp(self):
        """Start the server with a non-blocking servicer and a recording pool."""
        self._thread_pool = thread_pool.RecordingThreadPool(max_workers=None)
        super(HealthServicerTest, self).start_server(
            non_blocking=True, thread_pool=self._thread_pool)

    def test_check_empty_service(self):
        """Check with a default request reports the '' service as SERVING."""
        request = health_pb2.HealthCheckRequest()
        resp = self._stub.Check(request)
        self.assertEqual(health_pb2.HealthCheckResponse.SERVING, resp.status)

    def test_check_serving_service(self):
        """Check reports SERVING for a service registered as SERVING."""
        request = health_pb2.HealthCheckRequest(service=_SERVING_SERVICE)
        resp = self._stub.Check(request)
        self.assertEqual(health_pb2.HealthCheckResponse.SERVING, resp.status)

    def test_check_unknown_service(self):
        """Check reports UNKNOWN for a service registered as UNKNOWN."""
        request = health_pb2.HealthCheckRequest(service=_UNKNOWN_SERVICE)
        resp = self._stub.Check(request)
        self.assertEqual(health_pb2.HealthCheckResponse.UNKNOWN, resp.status)

    def test_check_not_serving_service(self):
        """Check reports NOT_SERVING for a service registered as such."""
        request = health_pb2.HealthCheckRequest(service=_NOT_SERVING_SERVICE)
        resp = self._stub.Check(request)
        self.assertEqual(health_pb2.HealthCheckResponse.NOT_SERVING,
                         resp.status)

    def test_check_not_found_service(self):
        """Check on an unregistered service fails with NOT_FOUND."""
        request = health_pb2.HealthCheckRequest(service='not-found')
        with self.assertRaises(grpc.RpcError) as context:
            self._stub.Check(request)

        self.assertEqual(grpc.StatusCode.NOT_FOUND, context.exception.code())

    def test_health_service_name(self):
        """The exported SERVICE_NAME constant matches the proto service name."""
        self.assertEqual(health.SERVICE_NAME, 'grpc.health.v1.Health')
class HealthServicerBackwardsCompatibleWatchTest(BaseWatchTests.WatchTests):
    """Runs the shared Watch tests with the servicer's default configuration.

    The servicer is created with ``non_blocking=False`` and no thread pool,
    i.e. the same values start_server() uses by default.
    """

    def setUp(self):
        """Start the server with a blocking servicer and no thread pool."""
        super(HealthServicerBackwardsCompatibleWatchTest, self).start_server(
            non_blocking=False, thread_pool=None)
# Allow running this test module directly; verbosity=2 prints one line per test.
if __name__ == '__main__':
    unittest.main(verbosity=2)