1 # Copyright 2018 The gRPC Authors
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14 """Tests of grpc_channelz.v1.channelz."""
import unittest
from concurrent import futures

import grpc

from grpc_channelz.v1 import channelz
from grpc_channelz.v1 import channelz_pb2
from grpc_channelz.v1 import channelz_pb2_grpc

from tests.unit import test_common
from tests.unit.framework.common import test_constants
# Method paths that _GenericHandler dispatches on.
_SUCCESSFUL_UNARY_UNARY = '/test/SuccessfulUnaryUnary'
_FAILED_UNARY_UNARY = '/test/FailedUnaryUnary'
_SUCCESSFUL_STREAM_STREAM = '/test/SuccessfulStreamStream'

# Raw byte payloads exchanged by the test RPCs.
_REQUEST = b'\x00\x00\x00'
_RESPONSE = b'\x01\x01\x01'

# Channel/server option tuples; tuples so they can be combined with `+`.
_DISABLE_REUSE_PORT = (('grpc.so_reuseport', 0),)
_ENABLE_CHANNELZ = (('grpc.enable_channelz', 1),)
_DISABLE_CHANNELZ = (('grpc.enable_channelz', 0),)
def _successful_unary_unary(request, servicer_context):
    """Unary-unary behavior that always succeeds.

    Args:
      request: The request payload; ignored.
      servicer_context: The RPC context; left untouched, so no failure
        status is set by this behavior.

    Returns:
      The fixed `_RESPONSE` payload.
    """
    return _RESPONSE
def _failed_unary_unary(request, servicer_context):
    """Unary-unary behavior that deliberately fails the RPC.

    Marks the call as INTERNAL with a fixed detail string and produces no
    response value.

    Args:
      request: The request payload; ignored.
      servicer_context: The RPC context on which the status is recorded.
    """
    failure_code = grpc.StatusCode.INTERNAL
    servicer_context.set_code(failure_code)
    servicer_context.set_details("Channelz Test Intended Failure")
def _successful_stream_stream(request_iterator, servicer_context):
    """Stream-stream behavior that answers every request with `_RESPONSE`.

    Args:
      request_iterator: Iterator over incoming request payloads; the
        payloads themselves are ignored.
      servicer_context: The RPC context; unused.

    Yields:
      One `_RESPONSE` per request consumed, so the number of responses
      equals the number of requests.
    """
    for _ in request_iterator:
        yield _RESPONSE
class _GenericHandler(grpc.GenericRpcHandler):
    """Routes the three test method paths to their behaviors."""

    def service(self, handler_call_details):
        """Return the method handler for the called path, or None.

        Args:
          handler_call_details: Describes the incoming RPC; only its
            `method` attribute is consulted.

        Returns:
          A unary-unary or stream-stream method handler wrapping the
          matching test behavior, or None when the path is not one of the
          three known test methods.
        """
        method = handler_call_details.method
        if method == _SUCCESSFUL_UNARY_UNARY:
            return grpc.unary_unary_rpc_method_handler(_successful_unary_unary)
        if method == _FAILED_UNARY_UNARY:
            return grpc.unary_unary_rpc_method_handler(_failed_unary_unary)
        if method == _SUCCESSFUL_STREAM_STREAM:
            return grpc.stream_stream_rpc_method_handler(
                _successful_stream_stream)
        return None
class _ChannelServerPair(object):
    """A running test server together with a channel connected to it.

    Both ends are created with Channelz enabled so that their activity is
    visible to the Channelz service under test.

    Attributes are documented inline in `__init__`.
    """

    def __init__(self):
        """Start a server on an ephemeral port and open a channel to it."""
        # Server will enable channelz service
        # self.server: the started grpc server serving _GenericHandler.
        self.server = grpc.server(
            futures.ThreadPoolExecutor(max_workers=3),
            options=_DISABLE_REUSE_PORT + _ENABLE_CHANNELZ)
        # Port 0 lets the OS choose a free port; the chosen port is returned.
        port = self.server.add_insecure_port('[::]:0')
        self.server.add_generic_rpc_handlers((_GenericHandler(),))
        self.server.start()

        # Channel will enable channelz service...
        # self.channel: insecure channel pointed at the server above.
        self.channel = grpc.insecure_channel('localhost:%d' % port,
                                             options=_ENABLE_CHANNELZ)
def _generate_channel_server_pairs(n):
    """Create `n` independent channel/server pairs.

    Args:
      n: Number of pairs to create.

    Returns:
      A list of `n` freshly constructed `_ChannelServerPair` objects.
    """
    pairs = []
    for _ in range(n):
        pairs.append(_ChannelServerPair())
    return pairs
def _close_channel_server_pairs(pairs):
    """Shut down both ends of every pair.

    Args:
      pairs: Iterable of `_ChannelServerPair` objects; may be empty.
    """
    for pair in pairs:
        # A grace of None stops the server without a grace period.
        pair.server.stop(None)
        # Close the client side as well so the channel is not leaked.
        pair.channel.close()
class ChannelzServicerTest(unittest.TestCase):
    """Tests the Channelz servicer against live channels and servers.

    Each test creates its own Channelz-enabled channel/server pairs in
    `self._pairs`, drives RPCs through them, and then inspects the recorded
    statistics through `self._channelz_stub`. The stub talks to a separate
    server that hosts the Channelz service but has Channelz disabled for
    itself, as does the channel used to reach it.
    """

    # ------------------------------------------------------------------
    # Traffic helpers
    # ------------------------------------------------------------------

    def _send_successful_unary_unary(self, idx):
        """Issue one unary RPC on pair `idx` and assert it finishes OK."""
        multi_callable = self._pairs[idx].channel.unary_unary(
            _SUCCESSFUL_UNARY_UNARY)
        _, call = multi_callable.with_call(_REQUEST)
        self.assertEqual(call.code(), grpc.StatusCode.OK)

    def _send_failed_unary_unary(self, idx):
        """Issue one unary RPC on pair `idx` and assert it raises RpcError."""
        multi_callable = self._pairs[idx].channel.unary_unary(
            _FAILED_UNARY_UNARY)
        # The test fails if the call unexpectedly succeeds.
        with self.assertRaises(grpc.RpcError):
            multi_callable.with_call(_REQUEST)

    def _send_successful_stream_stream(self, idx):
        """Stream STREAM_LENGTH requests on pair `idx`; expect as many replies."""
        multi_callable = self._pairs[idx].channel.stream_stream(
            _SUCCESSFUL_STREAM_STREAM)
        response_iterator = multi_callable(
            iter([_REQUEST] * test_constants.STREAM_LENGTH))
        # Draining the iterator also drives the RPC to completion.
        response_count = sum(1 for _ in response_iterator)
        self.assertEqual(response_count, test_constants.STREAM_LENGTH)

    def _send_mixed_traffic(self, k_success, k_failed):
        """Send the traffic pattern shared by the multi-channel tests.

        Pair 0 gets `k_success` successful calls, pair 1 gets `k_failed`
        failed calls, and pair 2 gets both. Requires at least three pairs
        in `self._pairs`; any further pairs stay idle.
        """
        for _ in range(k_success):
            self._send_successful_unary_unary(0)
            self._send_successful_unary_unary(2)
        for _ in range(k_failed):
            self._send_failed_unary_unary(1)
            self._send_failed_unary_unary(2)

    # ------------------------------------------------------------------
    # Channelz query helpers
    # ------------------------------------------------------------------

    def _get_channel_id(self, idx):
        """Return the Channelz id of the `idx`-th top channel.

        Channel ids may not be consecutive, so the id is looked up by
        position in the GetTopChannels listing instead of being computed.
        """
        resp = self._channelz_stub.GetTopChannels(
            channelz_pb2.GetTopChannelsRequest(start_channel_id=0))
        self.assertGreater(len(resp.channel), idx)
        return resp.channel[idx].ref.channel_id

    def _get_channel(self, idx):
        """Return the GetChannel result message for the `idx`-th top channel."""
        resp = self._channelz_stub.GetChannel(
            channelz_pb2.GetChannelRequest(
                channel_id=self._get_channel_id(idx)))
        return resp.channel

    def _get_first_subchannel(self, channel):
        """Assert `channel` has a subchannel and return the first one's info."""
        self.assertGreater(len(channel.subchannel_ref), 0)
        resp = self._channelz_stub.GetSubchannel(
            channelz_pb2.GetSubchannelRequest(
                subchannel_id=channel.subchannel_ref[0].subchannel_id))
        return resp.subchannel

    def _get_only_socket(self, subchannel):
        """Assert `subchannel` has exactly one socket and return its info."""
        self.assertEqual(len(subchannel.socket_ref), 1)
        resp = self._channelz_stub.GetSocket(
            channelz_pb2.GetSocketRequest(
                socket_id=subchannel.socket_ref[0].socket_id))
        return resp.socket

    def _assert_call_counts(self, data, started, succeeded, failed):
        """Assert the three call counters carried by a Channelz data message."""
        self.assertEqual(data.calls_started, started)
        self.assertEqual(data.calls_succeeded, succeeded)
        self.assertEqual(data.calls_failed, failed)

    def _assert_not_found(self, rpc, request):
        """Assert that `rpc(request)` terminates with NOT_FOUND.

        Only grpc.RpcError is accepted; any other exception propagates as
        a test error instead of being mistaken for the expected failure.
        """
        with self.assertRaises(grpc.RpcError) as exception_context:
            rpc(request)
        self.assertEqual(grpc.StatusCode.NOT_FOUND,
                         exception_context.exception.code())

    # ------------------------------------------------------------------
    # Fixture
    # ------------------------------------------------------------------

    def setUp(self):
        """Start the Channelz-hosting server and connect the query stub."""
        # Lets tearDown run unchanged for tests that create no pairs.
        self._pairs = []
        # This server is for Channelz info fetching only.
        # It should not enable Channelz for itself.
        self._server = grpc.server(
            futures.ThreadPoolExecutor(max_workers=3),
            options=_DISABLE_REUSE_PORT + _DISABLE_CHANNELZ)
        port = self._server.add_insecure_port('[::]:0')
        channelz.add_channelz_servicer(self._server)
        self._server.start()

        # This channel is used to fetch Channelz info only.
        # Channelz should not be enabled on it.
        self._channel = grpc.insecure_channel('localhost:%d' % port,
                                              options=_DISABLE_CHANNELZ)
        self._channelz_stub = channelz_pb2_grpc.ChannelzStub(self._channel)

    def tearDown(self):
        """Stop the Channelz-hosting server, its channel, and all pairs."""
        self._server.stop(None)
        self._channel.close()
        _close_channel_server_pairs(self._pairs)

    # ------------------------------------------------------------------
    # Channel tests
    # ------------------------------------------------------------------

    def test_get_top_channels_basic(self):
        """One pair yields exactly one top channel and a complete listing."""
        self._pairs = _generate_channel_server_pairs(1)
        resp = self._channelz_stub.GetTopChannels(
            channelz_pb2.GetTopChannelsRequest(start_channel_id=0))
        self.assertEqual(len(resp.channel), 1)
        self.assertTrue(resp.end)

    def test_get_top_channels_high_start_id(self):
        """A start id beyond every channel yields an empty, complete listing."""
        self._pairs = _generate_channel_server_pairs(1)
        resp = self._channelz_stub.GetTopChannels(
            channelz_pb2.GetTopChannelsRequest(start_channel_id=10000))
        self.assertEqual(len(resp.channel), 0)
        self.assertTrue(resp.end)

    def test_successful_request(self):
        """A single successful call is counted as started and succeeded."""
        self._pairs = _generate_channel_server_pairs(1)
        self._send_successful_unary_unary(0)
        self._assert_call_counts(self._get_channel(0).data,
                                 started=1,
                                 succeeded=1,
                                 failed=0)

    def test_failed_request(self):
        """A single failed call is counted as started and failed."""
        self._pairs = _generate_channel_server_pairs(1)
        self._send_failed_unary_unary(0)
        self._assert_call_counts(self._get_channel(0).data,
                                 started=1,
                                 succeeded=0,
                                 failed=1)

    def test_many_requests(self):
        """Counters accumulate over many calls on one channel."""
        self._pairs = _generate_channel_server_pairs(1)
        # Arbitrary, distinct counts so the two counters cannot be confused.
        k_success = 7
        k_failed = 9
        for _ in range(k_success):
            self._send_successful_unary_unary(0)
        for _ in range(k_failed):
            self._send_failed_unary_unary(0)
        self._assert_call_counts(self._get_channel(0).data,
                                 started=k_success + k_failed,
                                 succeeded=k_success,
                                 failed=k_failed)

    def test_many_channel(self):
        """Every created channel shows up in the top-channel listing."""
        k_channels = 4
        self._pairs = _generate_channel_server_pairs(k_channels)
        resp = self._channelz_stub.GetTopChannels(
            channelz_pb2.GetTopChannelsRequest(start_channel_id=0))
        self.assertEqual(len(resp.channel), k_channels)

    def test_many_requests_many_channel(self):
        """Counters are tracked independently per channel."""
        # Four channels: indices 0-2 carry traffic, index 3 stays idle.
        k_channels = 4
        self._pairs = _generate_channel_server_pairs(k_channels)
        k_success = 11
        k_failed = 13
        self._send_mixed_traffic(k_success, k_failed)

        # The first channel saw only successes
        self._assert_call_counts(self._get_channel(0).data,
                                 started=k_success,
                                 succeeded=k_success,
                                 failed=0)

        # The second channel saw only failures
        self._assert_call_counts(self._get_channel(1).data,
                                 started=k_failed,
                                 succeeded=0,
                                 failed=k_failed)

        # The third channel saw both successes and failures
        self._assert_call_counts(self._get_channel(2).data,
                                 started=k_success + k_failed,
                                 succeeded=k_success,
                                 failed=k_failed)

        # The fourth channel saw nothing
        self._assert_call_counts(self._get_channel(3).data,
                                 started=0,
                                 succeeded=0,
                                 failed=0)

    def test_many_subchannels(self):
        """Each active channel's first subchannel mirrors its call counters."""
        k_channels = 4
        self._pairs = _generate_channel_server_pairs(k_channels)
        k_success = 17
        k_failed = 19
        self._send_mixed_traffic(k_success, k_failed)

        gtc_resp = self._channelz_stub.GetTopChannels(
            channelz_pb2.GetTopChannelsRequest(start_channel_id=0))
        self.assertEqual(len(gtc_resp.channel), k_channels)
        for channel in gtc_resp.channel:
            # If no call performed in the channel, there shouldn't be any
            # subchannel
            if channel.data.calls_started == 0:
                self.assertEqual(len(channel.subchannel_ref), 0)
                continue

            # Otherwise, the subchannel should exist
            subchannel = self._get_first_subchannel(channel)
            self._assert_call_counts(subchannel.data,
                                     started=channel.data.calls_started,
                                     succeeded=channel.data.calls_succeeded,
                                     failed=channel.data.calls_failed)

    def test_many_subchannels_and_sockets(self):
        """Socket stream/message counters agree with subchannel counters."""
        k_channels = 4
        self._pairs = _generate_channel_server_pairs(k_channels)
        k_success = 3
        k_failed = 5
        self._send_mixed_traffic(k_success, k_failed)

        gtc_resp = self._channelz_stub.GetTopChannels(
            channelz_pb2.GetTopChannelsRequest(start_channel_id=0))
        self.assertEqual(len(gtc_resp.channel), k_channels)
        for channel in gtc_resp.channel:
            # If no call performed in the channel, there shouldn't be any
            # subchannel
            if channel.data.calls_started == 0:
                self.assertEqual(len(channel.subchannel_ref), 0)
                continue

            # Otherwise, the subchannel should exist
            subchannel = self._get_first_subchannel(channel)
            socket = self._get_only_socket(subchannel)

            self.assertEqual(subchannel.data.calls_started,
                             socket.data.streams_started)
            self.assertEqual(subchannel.data.calls_started,
                             socket.data.streams_succeeded)
            # Calls started == messages sent, only valid for unary calls
            self.assertEqual(subchannel.data.calls_started,
                             socket.data.messages_sent)
            # Only receive responses when the RPC was successful
            self.assertEqual(subchannel.data.calls_succeeded,
                             socket.data.messages_received)

    def test_streaming_rpc(self):
        """One streaming call counts once as a call, per message on the socket."""
        self._pairs = _generate_channel_server_pairs(1)
        # In C++, the argument for _send_successful_stream_stream is message
        # length. Here the argument is still channel idx, to be consistent
        # with the other two.
        self._send_successful_stream_stream(0)

        channel = self._get_channel(0)
        self._assert_call_counts(channel.data,
                                 started=1,
                                 succeeded=1,
                                 failed=0)

        subchannel = self._get_first_subchannel(channel)
        self._assert_call_counts(subchannel.data,
                                 started=1,
                                 succeeded=1,
                                 failed=0)

        socket = self._get_only_socket(subchannel)
        self.assertEqual(socket.data.streams_started, 1)
        self.assertEqual(socket.data.streams_succeeded, 1)
        self.assertEqual(socket.data.streams_failed, 0)
        self.assertEqual(socket.data.messages_sent,
                         test_constants.STREAM_LENGTH)
        self.assertEqual(socket.data.messages_received,
                         test_constants.STREAM_LENGTH)

    # ------------------------------------------------------------------
    # Server tests
    # ------------------------------------------------------------------

    def test_server_basic(self):
        """One pair yields exactly one Channelz-visible server."""
        self._pairs = _generate_channel_server_pairs(1)
        resp = self._channelz_stub.GetServers(
            channelz_pb2.GetServersRequest(start_server_id=0))
        self.assertEqual(len(resp.server), 1)

    def test_get_one_server(self):
        """GetServer returns the server whose id GetServers listed."""
        self._pairs = _generate_channel_server_pairs(1)
        gss_resp = self._channelz_stub.GetServers(
            channelz_pb2.GetServersRequest(start_server_id=0))
        self.assertEqual(len(gss_resp.server), 1)
        listed_id = gss_resp.server[0].ref.server_id
        gs_resp = self._channelz_stub.GetServer(
            channelz_pb2.GetServerRequest(server_id=listed_id))
        self.assertEqual(listed_id, gs_resp.server.ref.server_id)

    def test_server_call(self):
        """The server's call counters reflect the calls it handled."""
        self._pairs = _generate_channel_server_pairs(1)
        k_success = 23
        k_failed = 29
        for _ in range(k_success):
            self._send_successful_unary_unary(0)
        for _ in range(k_failed):
            self._send_failed_unary_unary(0)

        resp = self._channelz_stub.GetServers(
            channelz_pb2.GetServersRequest(start_server_id=0))
        self.assertEqual(len(resp.server), 1)
        self._assert_call_counts(resp.server[0].data,
                                 started=k_success + k_failed,
                                 succeeded=k_success,
                                 failed=k_failed)

    def test_server_sockets(self):
        """GetServerSockets succeeds for a server that has handled calls."""
        self._pairs = _generate_channel_server_pairs(1)
        self._send_successful_unary_unary(0)
        self._send_failed_unary_unary(0)

        gs_resp = self._channelz_stub.GetServers(
            channelz_pb2.GetServersRequest(start_server_id=0))
        self.assertEqual(len(gs_resp.server), 1)
        self._assert_call_counts(gs_resp.server[0].data,
                                 started=2,
                                 succeeded=1,
                                 failed=1)

        # If the RPC call failed, it will raise a grpc.RpcError
        # So, if there is no exception raised, considered pass
        self._channelz_stub.GetServerSockets(
            channelz_pb2.GetServerSocketsRequest(
                server_id=gs_resp.server[0].ref.server_id, start_socket_id=0))

    def test_server_listen_sockets(self):
        """The server exposes one listen socket that GetSocket can fetch."""
        self._pairs = _generate_channel_server_pairs(1)

        gss_resp = self._channelz_stub.GetServers(
            channelz_pb2.GetServersRequest(start_server_id=0))
        self.assertEqual(len(gss_resp.server), 1)
        self.assertEqual(len(gss_resp.server[0].listen_socket), 1)

        # If the RPC call failed, it will raise a grpc.RpcError
        # So, if there is no exception raised, considered pass
        self._channelz_stub.GetSocket(
            channelz_pb2.GetSocketRequest(
                socket_id=gss_resp.server[0].listen_socket[0].socket_id))

    # ------------------------------------------------------------------
    # Invalid-id tests
    # ------------------------------------------------------------------

    def test_invalid_query_get_server(self):
        """GetServer with an unknown id reports NOT_FOUND."""
        self._assert_not_found(self._channelz_stub.GetServer,
                               channelz_pb2.GetServerRequest(server_id=10000))

    def test_invalid_query_get_channel(self):
        """GetChannel with an unknown id reports NOT_FOUND."""
        self._assert_not_found(
            self._channelz_stub.GetChannel,
            channelz_pb2.GetChannelRequest(channel_id=10000))

    def test_invalid_query_get_subchannel(self):
        """GetSubchannel with an unknown id reports NOT_FOUND."""
        self._assert_not_found(
            self._channelz_stub.GetSubchannel,
            channelz_pb2.GetSubchannelRequest(subchannel_id=10000))

    def test_invalid_query_get_socket(self):
        """GetSocket with an unknown id reports NOT_FOUND."""
        self._assert_not_found(self._channelz_stub.GetSocket,
                               channelz_pb2.GetSocketRequest(socket_id=10000))

    def test_invalid_query_get_server_sockets(self):
        """GetServerSockets with an unknown server id reports NOT_FOUND."""
        self._assert_not_found(
            self._channelz_stub.GetServerSockets,
            channelz_pb2.GetServerSocketsRequest(
                server_id=10000,
                start_socket_id=0,
            ))
# Run the test cases when executed as a script; verbosity=2 is passed
# through to the unittest runner.
if __name__ == '__main__':
    unittest.main(verbosity=2)