1 # Copyright 2018 gRPC authors.
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14 """Implementations of fork support test methods."""
19 import multiprocessing
26 from six.moves import queue
28 from src.proto.grpc.testing import empty_pb2
29 from src.proto.grpc.testing import messages_pb2
30 from src.proto.grpc.testing import test_pb2_grpc
32 _LOGGER = logging.getLogger(__name__)
34 _CHILD_FINISH_TIMEOUT_S = 60
38 target = '{}:{}'.format(args['server_host'], args['server_port'])
40 channel_credentials = grpc.ssl_channel_credentials()
41 channel = grpc.secure_channel(target, channel_credentials)
43 channel = grpc.insecure_channel(target)
47 def _validate_payload_type_and_length(response, expected_type, expected_length):
48 if response.payload.type is not expected_type:
49 raise ValueError('expected payload type %s, got %s' %
50 (expected_type, type(response.payload.type)))
51 elif len(response.payload.body) != expected_length:
52 raise ValueError('expected payload body size %d, got %d' %
53 (expected_length, len(response.payload.body)))
56 def _async_unary(stub):
58 request = messages_pb2.SimpleRequest(
59 response_type=messages_pb2.COMPRESSABLE,
61 payload=messages_pb2.Payload(body=b'\x00' * 271828))
62 response_future = stub.UnaryCall.future(request, timeout=_RPC_TIMEOUT_S)
63 response = response_future.result()
64 _validate_payload_type_and_length(response, messages_pb2.COMPRESSABLE, size)
67 def _blocking_unary(stub):
69 request = messages_pb2.SimpleRequest(
70 response_type=messages_pb2.COMPRESSABLE,
72 payload=messages_pb2.Payload(body=b'\x00' * 271828))
73 response = stub.UnaryCall(request, timeout=_RPC_TIMEOUT_S)
74 _validate_payload_type_and_length(response, messages_pb2.COMPRESSABLE, size)
80 self._condition = threading.Condition()
92 while not self._values and self._open:
93 self._condition.wait()
95 return self._values.pop(0)
100 with self._condition:
101 self._values.append(value)
102 self._condition.notify()
105 with self._condition:
107 self._condition.notify()
112 def __exit__(self, type, value, traceback):
116 class _ChildProcess(object):
118 def __init__(self, task, args=None):
121 self._exceptions = multiprocessing.Queue()
123 def record_exceptions():
126 except grpc.RpcError as rpc_error:
127 self._exceptions.put('RpcError: %s' % rpc_error)
128 except Exception as e: # pylint: disable=broad-except
129 self._exceptions.put(e)
131 self._process = multiprocessing.Process(target=record_exceptions)
134 self._process.start()
137 self._process.join(timeout=_CHILD_FINISH_TIMEOUT_S)
138 if self._process.is_alive():
139 raise RuntimeError('Child process did not terminate')
140 if self._process.exitcode != 0:
141 raise ValueError('Child process failed with exitcode %d' %
142 self._process.exitcode)
144 exception = self._exceptions.get(block=False)
145 raise ValueError('Child process failed: %s' % exception)
150 def _async_unary_same_channel(channel):
156 'Child should not be able to re-use channel after fork')
157 except ValueError as expected_value_error:
160 stub = test_pb2_grpc.TestServiceStub(channel)
162 child_process = _ChildProcess(child_target)
163 child_process.start()
165 child_process.finish()
168 def _async_unary_new_channel(channel, args):
171 with _channel(args) as child_channel:
172 child_stub = test_pb2_grpc.TestServiceStub(child_channel)
173 _async_unary(child_stub)
174 child_channel.close()
176 stub = test_pb2_grpc.TestServiceStub(channel)
178 child_process = _ChildProcess(child_target)
179 child_process.start()
181 child_process.finish()
184 def _blocking_unary_same_channel(channel):
188 _blocking_unary(stub)
190 'Child should not be able to re-use channel after fork')
191 except ValueError as expected_value_error:
194 stub = test_pb2_grpc.TestServiceStub(channel)
195 _blocking_unary(stub)
196 child_process = _ChildProcess(child_target)
197 child_process.start()
198 child_process.finish()
201 def _blocking_unary_new_channel(channel, args):
204 with _channel(args) as child_channel:
205 child_stub = test_pb2_grpc.TestServiceStub(child_channel)
206 _blocking_unary(child_stub)
208 stub = test_pb2_grpc.TestServiceStub(channel)
209 _blocking_unary(stub)
210 child_process = _ChildProcess(child_target)
211 child_process.start()
212 _blocking_unary(stub)
213 child_process.finish()
216 # Verify that the fork channel registry can handle already closed channels
217 def _close_channel_before_fork(channel, args):
221 with _channel(args) as child_channel:
222 child_stub = test_pb2_grpc.TestServiceStub(child_channel)
223 _blocking_unary(child_stub)
225 stub = test_pb2_grpc.TestServiceStub(channel)
226 _blocking_unary(stub)
229 with _channel(args) as new_channel:
230 new_stub = test_pb2_grpc.TestServiceStub(new_channel)
231 child_process = _ChildProcess(child_target)
232 child_process.start()
233 _blocking_unary(new_stub)
234 child_process.finish()
237 def _connectivity_watch(channel, args):
240 parent_channel_ready_event = threading.Event()
244 child_channel_ready_event = threading.Event()
246 def child_connectivity_callback(state):
247 if state is grpc.ChannelConnectivity.READY:
248 child_channel_ready_event.set()
250 with _channel(args) as child_channel:
251 child_stub = test_pb2_grpc.TestServiceStub(child_channel)
252 child_channel.subscribe(child_connectivity_callback)
253 _async_unary(child_stub)
254 if not child_channel_ready_event.wait(timeout=_RPC_TIMEOUT_S):
255 raise ValueError('Channel did not move to READY')
256 if len(parent_states) > 1:
258 'Received connectivity updates on parent callback',
260 child_channel.unsubscribe(child_connectivity_callback)
262 def parent_connectivity_callback(state):
263 parent_states.append(state)
264 if state is grpc.ChannelConnectivity.READY:
265 parent_channel_ready_event.set()
267 channel.subscribe(parent_connectivity_callback)
268 stub = test_pb2_grpc.TestServiceStub(channel)
269 child_process = _ChildProcess(child_target)
270 child_process.start()
272 if not parent_channel_ready_event.wait(timeout=_RPC_TIMEOUT_S):
273 raise ValueError('Channel did not move to READY')
274 channel.unsubscribe(parent_connectivity_callback)
275 child_process.finish()
278 def _ping_pong_with_child_processes_after_first_response(
279 channel, args, child_target, run_after_close=True):
280 request_response_sizes = (
286 request_payload_sizes = (
292 stub = test_pb2_grpc.TestServiceStub(channel)
294 parent_bidi_call = stub.FullDuplexCall(pipe)
296 first_message_received = False
297 for response_size, payload_size in zip(request_response_sizes,
298 request_payload_sizes):
299 request = messages_pb2.StreamingOutputCallRequest(
300 response_type=messages_pb2.COMPRESSABLE,
301 response_parameters=(messages_pb2.ResponseParameters(
302 size=response_size),),
303 payload=messages_pb2.Payload(body=b'\x00' * payload_size))
305 if first_message_received:
306 child_process = _ChildProcess(child_target,
307 (parent_bidi_call, channel, args))
308 child_process.start()
309 child_processes.append(child_process)
310 response = next(parent_bidi_call)
311 first_message_received = True
312 child_process = _ChildProcess(child_target,
313 (parent_bidi_call, channel, args))
314 child_process.start()
315 child_processes.append(child_process)
316 _validate_payload_type_and_length(response, messages_pb2.COMPRESSABLE,
320 child_process = _ChildProcess(child_target,
321 (parent_bidi_call, channel, args))
322 child_process.start()
323 child_processes.append(child_process)
324 for child_process in child_processes:
325 child_process.finish()
328 def _in_progress_bidi_continue_call(channel):
330 def child_target(parent_bidi_call, parent_channel, args):
331 stub = test_pb2_grpc.TestServiceStub(parent_channel)
335 'Child should not be able to re-use channel after fork')
336 except ValueError as expected_value_error:
338 inherited_code = parent_bidi_call.code()
339 inherited_details = parent_bidi_call.details()
340 if inherited_code != grpc.StatusCode.CANCELLED:
341 raise ValueError('Expected inherited code CANCELLED, got %s' %
343 if inherited_details != 'Channel closed due to fork':
345 'Expected inherited details Channel closed due to fork, got %s'
348 # Don't run child_target after closing the parent call, as the call may have
349 # received a status from the server before fork occurs.
350 _ping_pong_with_child_processes_after_first_response(channel,
353 run_after_close=False)
356 def _in_progress_bidi_same_channel_async_call(channel):
358 def child_target(parent_bidi_call, parent_channel, args):
359 stub = test_pb2_grpc.TestServiceStub(parent_channel)
363 'Child should not be able to re-use channel after fork')
364 except ValueError as expected_value_error:
367 _ping_pong_with_child_processes_after_first_response(
368 channel, None, child_target)
371 def _in_progress_bidi_same_channel_blocking_call(channel):
373 def child_target(parent_bidi_call, parent_channel, args):
374 stub = test_pb2_grpc.TestServiceStub(parent_channel)
376 _blocking_unary(stub)
378 'Child should not be able to re-use channel after fork')
379 except ValueError as expected_value_error:
382 _ping_pong_with_child_processes_after_first_response(
383 channel, None, child_target)
386 def _in_progress_bidi_new_channel_async_call(channel, args):
388 def child_target(parent_bidi_call, parent_channel, args):
389 with _channel(args) as channel:
390 stub = test_pb2_grpc.TestServiceStub(channel)
393 _ping_pong_with_child_processes_after_first_response(
394 channel, args, child_target)
397 def _in_progress_bidi_new_channel_blocking_call(channel, args):
399 def child_target(parent_bidi_call, parent_channel, args):
400 with _channel(args) as channel:
401 stub = test_pb2_grpc.TestServiceStub(channel)
402 _blocking_unary(stub)
404 _ping_pong_with_child_processes_after_first_response(
405 channel, args, child_target)
409 class TestCase(enum.Enum):
411 CONNECTIVITY_WATCH = 'connectivity_watch'
412 CLOSE_CHANNEL_BEFORE_FORK = 'close_channel_before_fork'
413 ASYNC_UNARY_SAME_CHANNEL = 'async_unary_same_channel'
414 ASYNC_UNARY_NEW_CHANNEL = 'async_unary_new_channel'
415 BLOCKING_UNARY_SAME_CHANNEL = 'blocking_unary_same_channel'
416 BLOCKING_UNARY_NEW_CHANNEL = 'blocking_unary_new_channel'
417 IN_PROGRESS_BIDI_CONTINUE_CALL = 'in_progress_bidi_continue_call'
418 IN_PROGRESS_BIDI_SAME_CHANNEL_ASYNC_CALL = 'in_progress_bidi_same_channel_async_call'
419 IN_PROGRESS_BIDI_SAME_CHANNEL_BLOCKING_CALL = 'in_progress_bidi_same_channel_blocking_call'
420 IN_PROGRESS_BIDI_NEW_CHANNEL_ASYNC_CALL = 'in_progress_bidi_new_channel_async_call'
421 IN_PROGRESS_BIDI_NEW_CHANNEL_BLOCKING_CALL = 'in_progress_bidi_new_channel_blocking_call'
423 def run_test(self, args):
424 _LOGGER.info("Running %s", self)
425 channel = _channel(args)
426 if self is TestCase.ASYNC_UNARY_SAME_CHANNEL:
427 _async_unary_same_channel(channel)
428 elif self is TestCase.ASYNC_UNARY_NEW_CHANNEL:
429 _async_unary_new_channel(channel, args)
430 elif self is TestCase.BLOCKING_UNARY_SAME_CHANNEL:
431 _blocking_unary_same_channel(channel)
432 elif self is TestCase.BLOCKING_UNARY_NEW_CHANNEL:
433 _blocking_unary_new_channel(channel, args)
434 elif self is TestCase.CLOSE_CHANNEL_BEFORE_FORK:
435 _close_channel_before_fork(channel, args)
436 elif self is TestCase.CONNECTIVITY_WATCH:
437 _connectivity_watch(channel, args)
438 elif self is TestCase.IN_PROGRESS_BIDI_CONTINUE_CALL:
439 _in_progress_bidi_continue_call(channel)
440 elif self is TestCase.IN_PROGRESS_BIDI_SAME_CHANNEL_ASYNC_CALL:
441 _in_progress_bidi_same_channel_async_call(channel)
442 elif self is TestCase.IN_PROGRESS_BIDI_SAME_CHANNEL_BLOCKING_CALL:
443 _in_progress_bidi_same_channel_blocking_call(channel)
444 elif self is TestCase.IN_PROGRESS_BIDI_NEW_CHANNEL_ASYNC_CALL:
445 _in_progress_bidi_new_channel_async_call(channel, args)
446 elif self is TestCase.IN_PROGRESS_BIDI_NEW_CHANNEL_BLOCKING_CALL:
447 _in_progress_bidi_new_channel_blocking_call(channel, args)
449 raise NotImplementedError('Test case "%s" not implemented!' %