Imported Upstream version 1.27.0
[platform/upstream/grpc.git] / src / python / grpcio_tests / tests / fork / methods.py
1 # Copyright 2018 gRPC authors.
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 #     http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14 """Implementations of fork support test methods."""
15
16 import enum
17 import json
18 import logging
19 import multiprocessing
20 import os
21 import threading
22 import time
23
24 import grpc
25
26 from six.moves import queue
27
28 from src.proto.grpc.testing import empty_pb2
29 from src.proto.grpc.testing import messages_pb2
30 from src.proto.grpc.testing import test_pb2_grpc
31
32 _LOGGER = logging.getLogger(__name__)
33 _RPC_TIMEOUT_S = 10
34 _CHILD_FINISH_TIMEOUT_S = 60
35
36
37 def _channel(args):
38     target = '{}:{}'.format(args['server_host'], args['server_port'])
39     if args['use_tls']:
40         channel_credentials = grpc.ssl_channel_credentials()
41         channel = grpc.secure_channel(target, channel_credentials)
42     else:
43         channel = grpc.insecure_channel(target)
44     return channel
45
46
47 def _validate_payload_type_and_length(response, expected_type, expected_length):
48     if response.payload.type is not expected_type:
49         raise ValueError('expected payload type %s, got %s' %
50                          (expected_type, type(response.payload.type)))
51     elif len(response.payload.body) != expected_length:
52         raise ValueError('expected payload body size %d, got %d' %
53                          (expected_length, len(response.payload.body)))
54
55
56 def _async_unary(stub):
57     size = 314159
58     request = messages_pb2.SimpleRequest(
59         response_type=messages_pb2.COMPRESSABLE,
60         response_size=size,
61         payload=messages_pb2.Payload(body=b'\x00' * 271828))
62     response_future = stub.UnaryCall.future(request, timeout=_RPC_TIMEOUT_S)
63     response = response_future.result()
64     _validate_payload_type_and_length(response, messages_pb2.COMPRESSABLE, size)
65
66
67 def _blocking_unary(stub):
68     size = 314159
69     request = messages_pb2.SimpleRequest(
70         response_type=messages_pb2.COMPRESSABLE,
71         response_size=size,
72         payload=messages_pb2.Payload(body=b'\x00' * 271828))
73     response = stub.UnaryCall(request, timeout=_RPC_TIMEOUT_S)
74     _validate_payload_type_and_length(response, messages_pb2.COMPRESSABLE, size)
75
76
77 class _Pipe(object):
78
79     def __init__(self):
80         self._condition = threading.Condition()
81         self._values = []
82         self._open = True
83
84     def __iter__(self):
85         return self
86
87     def __next__(self):
88         return self.next()
89
90     def next(self):
91         with self._condition:
92             while not self._values and self._open:
93                 self._condition.wait()
94             if self._values:
95                 return self._values.pop(0)
96             else:
97                 raise StopIteration()
98
99     def add(self, value):
100         with self._condition:
101             self._values.append(value)
102             self._condition.notify()
103
104     def close(self):
105         with self._condition:
106             self._open = False
107             self._condition.notify()
108
109     def __enter__(self):
110         return self
111
112     def __exit__(self, type, value, traceback):
113         self.close()
114
115
116 class _ChildProcess(object):
117
118     def __init__(self, task, args=None):
119         if args is None:
120             args = ()
121         self._exceptions = multiprocessing.Queue()
122
123         def record_exceptions():
124             try:
125                 task(*args)
126             except grpc.RpcError as rpc_error:
127                 self._exceptions.put('RpcError: %s' % rpc_error)
128             except Exception as e:  # pylint: disable=broad-except
129                 self._exceptions.put(e)
130
131         self._process = multiprocessing.Process(target=record_exceptions)
132
133     def start(self):
134         self._process.start()
135
136     def finish(self):
137         self._process.join(timeout=_CHILD_FINISH_TIMEOUT_S)
138         if self._process.is_alive():
139             raise RuntimeError('Child process did not terminate')
140         if self._process.exitcode != 0:
141             raise ValueError('Child process failed with exitcode %d' %
142                              self._process.exitcode)
143         try:
144             exception = self._exceptions.get(block=False)
145             raise ValueError('Child process failed: %s' % exception)
146         except queue.Empty:
147             pass
148
149
150 def _async_unary_same_channel(channel):
151
152     def child_target():
153         try:
154             _async_unary(stub)
155             raise Exception(
156                 'Child should not be able to re-use channel after fork')
157         except ValueError as expected_value_error:
158             pass
159
160     stub = test_pb2_grpc.TestServiceStub(channel)
161     _async_unary(stub)
162     child_process = _ChildProcess(child_target)
163     child_process.start()
164     _async_unary(stub)
165     child_process.finish()
166
167
168 def _async_unary_new_channel(channel, args):
169
170     def child_target():
171         with _channel(args) as child_channel:
172             child_stub = test_pb2_grpc.TestServiceStub(child_channel)
173             _async_unary(child_stub)
174             child_channel.close()
175
176     stub = test_pb2_grpc.TestServiceStub(channel)
177     _async_unary(stub)
178     child_process = _ChildProcess(child_target)
179     child_process.start()
180     _async_unary(stub)
181     child_process.finish()
182
183
184 def _blocking_unary_same_channel(channel):
185
186     def child_target():
187         try:
188             _blocking_unary(stub)
189             raise Exception(
190                 'Child should not be able to re-use channel after fork')
191         except ValueError as expected_value_error:
192             pass
193
194     stub = test_pb2_grpc.TestServiceStub(channel)
195     _blocking_unary(stub)
196     child_process = _ChildProcess(child_target)
197     child_process.start()
198     child_process.finish()
199
200
201 def _blocking_unary_new_channel(channel, args):
202
203     def child_target():
204         with _channel(args) as child_channel:
205             child_stub = test_pb2_grpc.TestServiceStub(child_channel)
206             _blocking_unary(child_stub)
207
208     stub = test_pb2_grpc.TestServiceStub(channel)
209     _blocking_unary(stub)
210     child_process = _ChildProcess(child_target)
211     child_process.start()
212     _blocking_unary(stub)
213     child_process.finish()
214
215
216 # Verify that the fork channel registry can handle already closed channels
217 def _close_channel_before_fork(channel, args):
218
219     def child_target():
220         new_channel.close()
221         with _channel(args) as child_channel:
222             child_stub = test_pb2_grpc.TestServiceStub(child_channel)
223             _blocking_unary(child_stub)
224
225     stub = test_pb2_grpc.TestServiceStub(channel)
226     _blocking_unary(stub)
227     channel.close()
228
229     with _channel(args) as new_channel:
230         new_stub = test_pb2_grpc.TestServiceStub(new_channel)
231         child_process = _ChildProcess(child_target)
232         child_process.start()
233         _blocking_unary(new_stub)
234         child_process.finish()
235
236
237 def _connectivity_watch(channel, args):
238
239     parent_states = []
240     parent_channel_ready_event = threading.Event()
241
242     def child_target():
243
244         child_channel_ready_event = threading.Event()
245
246         def child_connectivity_callback(state):
247             if state is grpc.ChannelConnectivity.READY:
248                 child_channel_ready_event.set()
249
250         with _channel(args) as child_channel:
251             child_stub = test_pb2_grpc.TestServiceStub(child_channel)
252             child_channel.subscribe(child_connectivity_callback)
253             _async_unary(child_stub)
254             if not child_channel_ready_event.wait(timeout=_RPC_TIMEOUT_S):
255                 raise ValueError('Channel did not move to READY')
256             if len(parent_states) > 1:
257                 raise ValueError(
258                     'Received connectivity updates on parent callback',
259                     parent_states)
260             child_channel.unsubscribe(child_connectivity_callback)
261
262     def parent_connectivity_callback(state):
263         parent_states.append(state)
264         if state is grpc.ChannelConnectivity.READY:
265             parent_channel_ready_event.set()
266
267     channel.subscribe(parent_connectivity_callback)
268     stub = test_pb2_grpc.TestServiceStub(channel)
269     child_process = _ChildProcess(child_target)
270     child_process.start()
271     _async_unary(stub)
272     if not parent_channel_ready_event.wait(timeout=_RPC_TIMEOUT_S):
273         raise ValueError('Channel did not move to READY')
274     channel.unsubscribe(parent_connectivity_callback)
275     child_process.finish()
276
277
278 def _ping_pong_with_child_processes_after_first_response(
279         channel, args, child_target, run_after_close=True):
280     request_response_sizes = (
281         31415,
282         9,
283         2653,
284         58979,
285     )
286     request_payload_sizes = (
287         27182,
288         8,
289         1828,
290         45904,
291     )
292     stub = test_pb2_grpc.TestServiceStub(channel)
293     pipe = _Pipe()
294     parent_bidi_call = stub.FullDuplexCall(pipe)
295     child_processes = []
296     first_message_received = False
297     for response_size, payload_size in zip(request_response_sizes,
298                                            request_payload_sizes):
299         request = messages_pb2.StreamingOutputCallRequest(
300             response_type=messages_pb2.COMPRESSABLE,
301             response_parameters=(messages_pb2.ResponseParameters(
302                 size=response_size),),
303             payload=messages_pb2.Payload(body=b'\x00' * payload_size))
304         pipe.add(request)
305         if first_message_received:
306             child_process = _ChildProcess(child_target,
307                                           (parent_bidi_call, channel, args))
308             child_process.start()
309             child_processes.append(child_process)
310         response = next(parent_bidi_call)
311         first_message_received = True
312         child_process = _ChildProcess(child_target,
313                                       (parent_bidi_call, channel, args))
314         child_process.start()
315         child_processes.append(child_process)
316         _validate_payload_type_and_length(response, messages_pb2.COMPRESSABLE,
317                                           response_size)
318     pipe.close()
319     if run_after_close:
320         child_process = _ChildProcess(child_target,
321                                       (parent_bidi_call, channel, args))
322         child_process.start()
323         child_processes.append(child_process)
324     for child_process in child_processes:
325         child_process.finish()
326
327
328 def _in_progress_bidi_continue_call(channel):
329
330     def child_target(parent_bidi_call, parent_channel, args):
331         stub = test_pb2_grpc.TestServiceStub(parent_channel)
332         try:
333             _async_unary(stub)
334             raise Exception(
335                 'Child should not be able to re-use channel after fork')
336         except ValueError as expected_value_error:
337             pass
338         inherited_code = parent_bidi_call.code()
339         inherited_details = parent_bidi_call.details()
340         if inherited_code != grpc.StatusCode.CANCELLED:
341             raise ValueError('Expected inherited code CANCELLED, got %s' %
342                              inherited_code)
343         if inherited_details != 'Channel closed due to fork':
344             raise ValueError(
345                 'Expected inherited details Channel closed due to fork, got %s'
346                 % inherited_details)
347
348     # Don't run child_target after closing the parent call, as the call may have
349     # received a status from the  server before fork occurs.
350     _ping_pong_with_child_processes_after_first_response(channel,
351                                                          None,
352                                                          child_target,
353                                                          run_after_close=False)
354
355
356 def _in_progress_bidi_same_channel_async_call(channel):
357
358     def child_target(parent_bidi_call, parent_channel, args):
359         stub = test_pb2_grpc.TestServiceStub(parent_channel)
360         try:
361             _async_unary(stub)
362             raise Exception(
363                 'Child should not be able to re-use channel after fork')
364         except ValueError as expected_value_error:
365             pass
366
367     _ping_pong_with_child_processes_after_first_response(
368         channel, None, child_target)
369
370
371 def _in_progress_bidi_same_channel_blocking_call(channel):
372
373     def child_target(parent_bidi_call, parent_channel, args):
374         stub = test_pb2_grpc.TestServiceStub(parent_channel)
375         try:
376             _blocking_unary(stub)
377             raise Exception(
378                 'Child should not be able to re-use channel after fork')
379         except ValueError as expected_value_error:
380             pass
381
382     _ping_pong_with_child_processes_after_first_response(
383         channel, None, child_target)
384
385
386 def _in_progress_bidi_new_channel_async_call(channel, args):
387
388     def child_target(parent_bidi_call, parent_channel, args):
389         with _channel(args) as channel:
390             stub = test_pb2_grpc.TestServiceStub(channel)
391             _async_unary(stub)
392
393     _ping_pong_with_child_processes_after_first_response(
394         channel, args, child_target)
395
396
397 def _in_progress_bidi_new_channel_blocking_call(channel, args):
398
399     def child_target(parent_bidi_call, parent_channel, args):
400         with _channel(args) as channel:
401             stub = test_pb2_grpc.TestServiceStub(channel)
402             _blocking_unary(stub)
403
404     _ping_pong_with_child_processes_after_first_response(
405         channel, args, child_target)
406
407
408 @enum.unique
409 class TestCase(enum.Enum):
410
411     CONNECTIVITY_WATCH = 'connectivity_watch'
412     CLOSE_CHANNEL_BEFORE_FORK = 'close_channel_before_fork'
413     ASYNC_UNARY_SAME_CHANNEL = 'async_unary_same_channel'
414     ASYNC_UNARY_NEW_CHANNEL = 'async_unary_new_channel'
415     BLOCKING_UNARY_SAME_CHANNEL = 'blocking_unary_same_channel'
416     BLOCKING_UNARY_NEW_CHANNEL = 'blocking_unary_new_channel'
417     IN_PROGRESS_BIDI_CONTINUE_CALL = 'in_progress_bidi_continue_call'
418     IN_PROGRESS_BIDI_SAME_CHANNEL_ASYNC_CALL = 'in_progress_bidi_same_channel_async_call'
419     IN_PROGRESS_BIDI_SAME_CHANNEL_BLOCKING_CALL = 'in_progress_bidi_same_channel_blocking_call'
420     IN_PROGRESS_BIDI_NEW_CHANNEL_ASYNC_CALL = 'in_progress_bidi_new_channel_async_call'
421     IN_PROGRESS_BIDI_NEW_CHANNEL_BLOCKING_CALL = 'in_progress_bidi_new_channel_blocking_call'
422
423     def run_test(self, args):
424         _LOGGER.info("Running %s", self)
425         channel = _channel(args)
426         if self is TestCase.ASYNC_UNARY_SAME_CHANNEL:
427             _async_unary_same_channel(channel)
428         elif self is TestCase.ASYNC_UNARY_NEW_CHANNEL:
429             _async_unary_new_channel(channel, args)
430         elif self is TestCase.BLOCKING_UNARY_SAME_CHANNEL:
431             _blocking_unary_same_channel(channel)
432         elif self is TestCase.BLOCKING_UNARY_NEW_CHANNEL:
433             _blocking_unary_new_channel(channel, args)
434         elif self is TestCase.CLOSE_CHANNEL_BEFORE_FORK:
435             _close_channel_before_fork(channel, args)
436         elif self is TestCase.CONNECTIVITY_WATCH:
437             _connectivity_watch(channel, args)
438         elif self is TestCase.IN_PROGRESS_BIDI_CONTINUE_CALL:
439             _in_progress_bidi_continue_call(channel)
440         elif self is TestCase.IN_PROGRESS_BIDI_SAME_CHANNEL_ASYNC_CALL:
441             _in_progress_bidi_same_channel_async_call(channel)
442         elif self is TestCase.IN_PROGRESS_BIDI_SAME_CHANNEL_BLOCKING_CALL:
443             _in_progress_bidi_same_channel_blocking_call(channel)
444         elif self is TestCase.IN_PROGRESS_BIDI_NEW_CHANNEL_ASYNC_CALL:
445             _in_progress_bidi_new_channel_async_call(channel, args)
446         elif self is TestCase.IN_PROGRESS_BIDI_NEW_CHANNEL_BLOCKING_CALL:
447             _in_progress_bidi_new_channel_blocking_call(channel, args)
448         else:
449             raise NotImplementedError('Test case "%s" not implemented!' %
450                                       self.name)
451         channel.close()