Imported Upstream version 1.27.0
[platform/upstream/grpc.git] / src / python / grpcio_tests / tests / channelz / _channelz_servicer_test.py
1 # Copyright 2018 The gRPC Authors
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 #     http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14 """Tests of grpc_channelz.v1.channelz."""
15
16 import unittest
17
18 from concurrent import futures
19
20 import grpc
21
22 from grpc_channelz.v1 import channelz
23 from grpc_channelz.v1 import channelz_pb2
24 from grpc_channelz.v1 import channelz_pb2_grpc
25
26 from tests.unit import test_common
27 from tests.unit.framework.common import test_constants
28
29 _SUCCESSFUL_UNARY_UNARY = '/test/SuccessfulUnaryUnary'
30 _FAILED_UNARY_UNARY = '/test/FailedUnaryUnary'
31 _SUCCESSFUL_STREAM_STREAM = '/test/SuccessfulStreamStream'
32
33 _REQUEST = b'\x00\x00\x00'
34 _RESPONSE = b'\x01\x01\x01'
35
36 _DISABLE_REUSE_PORT = (('grpc.so_reuseport', 0),)
37 _ENABLE_CHANNELZ = (('grpc.enable_channelz', 1),)
38 _DISABLE_CHANNELZ = (('grpc.enable_channelz', 0),)
39
40
def _successful_unary_unary(request, servicer_context):
    """Unary-unary behavior that always succeeds.

    Both arguments are ignored; the fixed _RESPONSE payload is returned.
    """
    reply = _RESPONSE
    return reply
43
44
def _failed_unary_unary(request, servicer_context):
    """Unary-unary behavior that deliberately fails the RPC.

    The request is ignored. The call is marked INTERNAL with a fixed detail
    string, and nothing is returned explicitly.
    """
    failure_code = grpc.StatusCode.INTERNAL
    servicer_context.set_code(failure_code)
    servicer_context.set_details("Channelz Test Intended Failure")
48
49
def _successful_stream_stream(request_iterator, servicer_context):
    """Stream-stream behavior yielding one _RESPONSE per incoming request.

    The content of each request and the servicer context are ignored.
    """
    for _unused_request in request_iterator:
        yield _RESPONSE
53
54
class _GenericHandler(grpc.GenericRpcHandler):
    """Routes the three test method paths to their behaviors."""

    def service(self, handler_call_details):
        """Return the RPC method handler for the called method, if any.

        Args:
          handler_call_details: call details whose ``method`` attribute is
            compared against the test method paths.

        Returns:
          A unary-unary or stream-stream method handler for a known path,
          or None when the path is not one of the test methods.
        """
        method = handler_call_details.method
        if method == _SUCCESSFUL_UNARY_UNARY:
            return grpc.unary_unary_rpc_method_handler(_successful_unary_unary)
        if method == _FAILED_UNARY_UNARY:
            return grpc.unary_unary_rpc_method_handler(_failed_unary_unary)
        if method == _SUCCESSFUL_STREAM_STREAM:
            return grpc.stream_stream_rpc_method_handler(
                _successful_stream_stream)
        return None
67
68
class _ChannelServerPair(object):
    """A started test server plus a channel connected to it.

    Both ends are created with the Channelz option enabled.
    """

    def __init__(self):
        # Server side: Channelz enabled, SO_REUSEPORT disabled.
        server_options = _DISABLE_REUSE_PORT + _ENABLE_CHANNELZ
        executor = futures.ThreadPoolExecutor(max_workers=3)
        self.server = grpc.server(executor, options=server_options)
        # Port 0 asks for any free port; the bound port is returned.
        bound_port = self.server.add_insecure_port('[::]:0')
        self.server.add_generic_rpc_handlers((_GenericHandler(),))
        self.server.start()

        # Client side: Channelz enabled as well.
        target = 'localhost:%d' % bound_port
        self.channel = grpc.insecure_channel(target, _ENABLE_CHANNELZ)
83
84
def _generate_channel_server_pairs(n):
    """Create and return a list of ``n`` fresh _ChannelServerPair objects."""
    pairs = []
    for _ in range(n):
        pairs.append(_ChannelServerPair())
    return pairs
87
88
89 def _close_channel_server_pairs(pairs):
90     for pair in pairs:
91         pair.server.stop(None)
92         pair.channel.close()
93
94
class ChannelzServicerTest(unittest.TestCase):
    """Tests of the Channelz servicer against live channel/server pairs.

    Each test creates one or more _ChannelServerPair objects (Channelz
    enabled), drives traffic through them, and then inspects the recorded
    statistics through a separate Channelz-disabled server and channel
    created in setUp.
    """

    # An id well above anything the tests allocate; used both as a "past the
    # end" start id and as a non-existent entity id.
    _LARGE_UNASSIGNED_ID = 10000

    # ---- traffic helpers -------------------------------------------------

    def _send_successful_unary_unary(self, idx):
        """Send one successful unary RPC on pair ``idx`` and assert it was OK."""
        multi_callable = self._pairs[idx].channel.unary_unary(
            _SUCCESSFUL_UNARY_UNARY)
        _, call = multi_callable.with_call(_REQUEST)
        self.assertEqual(call.code(), grpc.StatusCode.OK)

    def _send_failed_unary_unary(self, idx):
        """Send one failing unary RPC on pair ``idx``; it must raise RpcError."""
        multi_callable = self._pairs[idx].channel.unary_unary(
            _FAILED_UNARY_UNARY)
        with self.assertRaises(grpc.RpcError):
            multi_callable.with_call(_REQUEST)

    def _send_successful_stream_stream(self, idx):
        """Stream STREAM_LENGTH requests on pair ``idx`` and count responses."""
        multi_callable = self._pairs[idx].channel.stream_stream(
            _SUCCESSFUL_STREAM_STREAM)
        response_iterator = multi_callable(
            iter([_REQUEST] * test_constants.STREAM_LENGTH))
        response_count = sum(1 for _ in response_iterator)
        self.assertEqual(response_count, test_constants.STREAM_LENGTH)

    def _send_mixed_traffic(self, k_success, k_failed):
        """Drive the four-channel traffic pattern shared by several tests.

        Pair 0 sees only successes, pair 1 only failures, pair 2 both, and
        pair 3 is never used.
        """
        for _ in range(k_success):
            self._send_successful_unary_unary(0)
            self._send_successful_unary_unary(2)
        for _ in range(k_failed):
            self._send_failed_unary_unary(1)
            self._send_failed_unary_unary(2)

    # ---- Channelz query helpers ------------------------------------------

    def _get_top_channels(self):
        """Return the GetTopChannels response starting from channel id 0."""
        return self._channelz_stub.GetTopChannels(
            channelz_pb2.GetTopChannelsRequest(start_channel_id=0))

    def _get_servers(self):
        """Return the GetServers response starting from server id 0."""
        return self._channelz_stub.GetServers(
            channelz_pb2.GetServersRequest(start_server_id=0))

    def _get_channel_id(self, idx):
        """Return the id of the ``idx``-th top channel.

        Channel ids may not be consecutive, so the id is looked up by its
        position in the GetTopChannels listing rather than computed.
        """
        resp = self._get_top_channels()
        self.assertGreater(len(resp.channel), idx)
        return resp.channel[idx].ref.channel_id

    def _get_channel(self, idx):
        """Return the GetChannel response for the ``idx``-th top channel."""
        return self._channelz_stub.GetChannel(
            channelz_pb2.GetChannelRequest(
                channel_id=self._get_channel_id(idx)))

    def _get_first_subchannel(self, channel):
        """Return the GetSubchannel response for ``channel``'s first subchannel."""
        return self._channelz_stub.GetSubchannel(
            channelz_pb2.GetSubchannelRequest(
                subchannel_id=channel.subchannel_ref[0].subchannel_id))

    def _get_first_socket(self, subchannel):
        """Return the GetSocket response for ``subchannel``'s first socket."""
        return self._channelz_stub.GetSocket(
            channelz_pb2.GetSocketRequest(
                socket_id=subchannel.socket_ref[0].socket_id))

    # ---- assertion helpers -----------------------------------------------

    def _assert_call_counts(self, data, started, succeeded, failed):
        """Assert the three call counters carried by ``data``."""
        self.assertEqual(data.calls_started, started)
        self.assertEqual(data.calls_succeeded, succeeded)
        self.assertEqual(data.calls_failed, failed)

    def _assert_not_found(self, rpc, request):
        """Assert that ``rpc(request)`` fails with status NOT_FOUND.

        Only grpc.RpcError is accepted, so unrelated exceptions (including
        KeyboardInterrupt) propagate instead of being string-matched.
        """
        with self.assertRaises(grpc.RpcError) as exception_context:
            rpc(request)
        self.assertEqual(grpc.StatusCode.NOT_FOUND,
                         exception_context.exception.code())

    # ---- fixtures --------------------------------------------------------

    def setUp(self):
        self._pairs = []
        # This server is for Channelz info fetching only.
        # It should not itself enable Channelz.
        self._server = grpc.server(futures.ThreadPoolExecutor(max_workers=3),
                                   options=_DISABLE_REUSE_PORT +
                                   _DISABLE_CHANNELZ)
        port = self._server.add_insecure_port('[::]:0')
        channelz.add_channelz_servicer(self._server)
        self._server.start()

        # This channel is used to fetch Channelz info only.
        # Channelz should not be enabled on it.
        self._channel = grpc.insecure_channel('localhost:%d' % port,
                                              _DISABLE_CHANNELZ)
        self._channelz_stub = channelz_pb2_grpc.ChannelzStub(self._channel)

    def tearDown(self):
        self._server.stop(None)
        self._channel.close()
        _close_channel_server_pairs(self._pairs)

    # ---- channel tests ---------------------------------------------------

    def test_get_top_channels_basic(self):
        self._pairs = _generate_channel_server_pairs(1)
        resp = self._get_top_channels()
        self.assertEqual(len(resp.channel), 1)
        self.assertTrue(resp.end)

    def test_get_top_channels_high_start_id(self):
        self._pairs = _generate_channel_server_pairs(1)
        resp = self._channelz_stub.GetTopChannels(
            channelz_pb2.GetTopChannelsRequest(
                start_channel_id=self._LARGE_UNASSIGNED_ID))
        self.assertEqual(len(resp.channel), 0)
        self.assertTrue(resp.end)

    def test_successful_request(self):
        self._pairs = _generate_channel_server_pairs(1)
        self._send_successful_unary_unary(0)
        resp = self._get_channel(0)
        self._assert_call_counts(resp.channel.data, 1, 1, 0)

    def test_failed_request(self):
        self._pairs = _generate_channel_server_pairs(1)
        self._send_failed_unary_unary(0)
        resp = self._get_channel(0)
        self._assert_call_counts(resp.channel.data, 1, 0, 1)

    def test_many_requests(self):
        self._pairs = _generate_channel_server_pairs(1)
        k_success = 7
        k_failed = 9
        for _ in range(k_success):
            self._send_successful_unary_unary(0)
        for _ in range(k_failed):
            self._send_failed_unary_unary(0)
        resp = self._get_channel(0)
        self._assert_call_counts(resp.channel.data, k_success + k_failed,
                                 k_success, k_failed)

    def test_many_channel(self):
        k_channels = 4
        self._pairs = _generate_channel_server_pairs(k_channels)
        resp = self._get_top_channels()
        self.assertEqual(len(resp.channel), k_channels)

    def test_many_requests_many_channel(self):
        k_channels = 4
        self._pairs = _generate_channel_server_pairs(k_channels)
        k_success = 11
        k_failed = 13
        self._send_mixed_traffic(k_success, k_failed)

        # The first channel saw only successes
        resp = self._get_channel(0)
        self._assert_call_counts(resp.channel.data, k_success, k_success, 0)

        # The second channel saw only failures
        resp = self._get_channel(1)
        self._assert_call_counts(resp.channel.data, k_failed, 0, k_failed)

        # The third channel saw both successes and failures
        resp = self._get_channel(2)
        self._assert_call_counts(resp.channel.data, k_success + k_failed,
                                 k_success, k_failed)

        # The fourth channel saw nothing
        resp = self._get_channel(3)
        self._assert_call_counts(resp.channel.data, 0, 0, 0)

    def test_many_subchannels(self):
        k_channels = 4
        self._pairs = _generate_channel_server_pairs(k_channels)
        k_success = 17
        k_failed = 19
        self._send_mixed_traffic(k_success, k_failed)

        gtc_resp = self._get_top_channels()
        self.assertEqual(len(gtc_resp.channel), k_channels)
        for channel in gtc_resp.channel:
            # If no call performed in the channel, there shouldn't be any
            # subchannel
            if channel.data.calls_started == 0:
                self.assertEqual(len(channel.subchannel_ref), 0)
                continue

            # Otherwise, the subchannel should exist and mirror the
            # channel's counters
            self.assertGreater(len(channel.subchannel_ref), 0)
            gsc_resp = self._get_first_subchannel(channel)
            self._assert_call_counts(gsc_resp.subchannel.data,
                                     channel.data.calls_started,
                                     channel.data.calls_succeeded,
                                     channel.data.calls_failed)

    # ---- server tests ----------------------------------------------------

    def test_server_basic(self):
        self._pairs = _generate_channel_server_pairs(1)
        resp = self._get_servers()
        self.assertEqual(len(resp.server), 1)

    def test_get_one_server(self):
        self._pairs = _generate_channel_server_pairs(1)
        gss_resp = self._get_servers()
        self.assertEqual(len(gss_resp.server), 1)
        gs_resp = self._channelz_stub.GetServer(
            channelz_pb2.GetServerRequest(
                server_id=gss_resp.server[0].ref.server_id))
        self.assertEqual(gss_resp.server[0].ref.server_id,
                         gs_resp.server.ref.server_id)

    def test_server_call(self):
        self._pairs = _generate_channel_server_pairs(1)
        k_success = 23
        k_failed = 29
        for _ in range(k_success):
            self._send_successful_unary_unary(0)
        for _ in range(k_failed):
            self._send_failed_unary_unary(0)

        resp = self._get_servers()
        self.assertEqual(len(resp.server), 1)
        self._assert_call_counts(resp.server[0].data, k_success + k_failed,
                                 k_success, k_failed)

    # ---- socket tests ----------------------------------------------------

    def test_many_subchannels_and_sockets(self):
        k_channels = 4
        self._pairs = _generate_channel_server_pairs(k_channels)
        k_success = 3
        k_failed = 5
        self._send_mixed_traffic(k_success, k_failed)

        gtc_resp = self._get_top_channels()
        self.assertEqual(len(gtc_resp.channel), k_channels)
        for channel in gtc_resp.channel:
            # If no call performed in the channel, there shouldn't be any
            # subchannel
            if channel.data.calls_started == 0:
                self.assertEqual(len(channel.subchannel_ref), 0)
                continue

            # Otherwise, the subchannel should exist
            self.assertGreater(len(channel.subchannel_ref), 0)
            gsc_resp = self._get_first_subchannel(channel)
            self.assertEqual(len(gsc_resp.subchannel.socket_ref), 1)

            gs_resp = self._get_first_socket(gsc_resp.subchannel)
            subchannel_data = gsc_resp.subchannel.data
            socket_data = gs_resp.socket.data
            self.assertEqual(subchannel_data.calls_started,
                             socket_data.streams_started)
            self.assertEqual(subchannel_data.calls_started,
                             socket_data.streams_succeeded)
            # Calls started == messages sent, only valid for unary calls
            self.assertEqual(subchannel_data.calls_started,
                             socket_data.messages_sent)
            # Only receive responses when the RPC was successful
            self.assertEqual(subchannel_data.calls_succeeded,
                             socket_data.messages_received)

    def test_streaming_rpc(self):
        self._pairs = _generate_channel_server_pairs(1)
        # In C++, the argument for _send_successful_stream_stream is message
        # length. Here the argument is still channel idx, to be consistent
        # with the other two.
        self._send_successful_stream_stream(0)

        gc_resp = self._get_channel(0)
        self._assert_call_counts(gc_resp.channel.data, 1, 1, 0)
        # Subchannel exists
        self.assertGreater(len(gc_resp.channel.subchannel_ref), 0)

        gsc_resp = self._get_first_subchannel(gc_resp.channel)
        self._assert_call_counts(gsc_resp.subchannel.data, 1, 1, 0)
        # Socket exists
        self.assertEqual(len(gsc_resp.subchannel.socket_ref), 1)

        gs_resp = self._get_first_socket(gsc_resp.subchannel)
        socket_data = gs_resp.socket.data
        self.assertEqual(socket_data.streams_started, 1)
        self.assertEqual(socket_data.streams_succeeded, 1)
        self.assertEqual(socket_data.streams_failed, 0)
        self.assertEqual(socket_data.messages_sent,
                         test_constants.STREAM_LENGTH)
        self.assertEqual(socket_data.messages_received,
                         test_constants.STREAM_LENGTH)

    def test_server_sockets(self):
        self._pairs = _generate_channel_server_pairs(1)
        self._send_successful_unary_unary(0)
        self._send_failed_unary_unary(0)

        gs_resp = self._get_servers()
        self.assertEqual(len(gs_resp.server), 1)
        self._assert_call_counts(gs_resp.server[0].data, 2, 1, 1)

        # If the RPC call failed, it will raise a grpc.RpcError.
        # So, if there is no exception raised, considered pass.
        self._channelz_stub.GetServerSockets(
            channelz_pb2.GetServerSocketsRequest(
                server_id=gs_resp.server[0].ref.server_id, start_socket_id=0))

    def test_server_listen_sockets(self):
        self._pairs = _generate_channel_server_pairs(1)

        gss_resp = self._get_servers()
        self.assertEqual(len(gss_resp.server), 1)
        self.assertEqual(len(gss_resp.server[0].listen_socket), 1)

        # If the RPC call failed, it will raise a grpc.RpcError.
        # So, if there is no exception raised, considered pass.
        self._channelz_stub.GetSocket(
            channelz_pb2.GetSocketRequest(
                socket_id=gss_resp.server[0].listen_socket[0].socket_id))

    # ---- invalid-query tests ---------------------------------------------

    def test_invalid_query_get_server(self):
        self._assert_not_found(
            self._channelz_stub.GetServer,
            channelz_pb2.GetServerRequest(
                server_id=self._LARGE_UNASSIGNED_ID))

    def test_invalid_query_get_channel(self):
        self._assert_not_found(
            self._channelz_stub.GetChannel,
            channelz_pb2.GetChannelRequest(
                channel_id=self._LARGE_UNASSIGNED_ID))

    def test_invalid_query_get_subchannel(self):
        self._assert_not_found(
            self._channelz_stub.GetSubchannel,
            channelz_pb2.GetSubchannelRequest(
                subchannel_id=self._LARGE_UNASSIGNED_ID))

    def test_invalid_query_get_socket(self):
        self._assert_not_found(
            self._channelz_stub.GetSocket,
            channelz_pb2.GetSocketRequest(
                socket_id=self._LARGE_UNASSIGNED_ID))

    def test_invalid_query_get_server_sockets(self):
        self._assert_not_found(
            self._channelz_stub.GetServerSockets,
            channelz_pb2.GetServerSocketsRequest(
                server_id=self._LARGE_UNASSIGNED_ID,
                start_socket_id=0,
            ))
466
467
# Script entry point: run this module's tests with verbose output.
if __name__ == '__main__':
    unittest.main(
        verbosity=2,
    )