1 # Copyright 2019 gRPC authors.
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14 """Client-side fork interop tests as a unit test."""
22 from grpc._cython import cygrpc
23 from tests.fork import methods
25 # New instance of multiprocessing.Process using fork without exec can and will
26 # hang if the Python process has any other threads running. This includes the
27 # additional thread spawned by our _runner.py class. So in order to test our
28 # compatibility with multiprocessing, we first fork+exec a new process to ensure
29 # we don't have any conflicting background threads.
30 _CLIENT_FORK_SCRIPT_TEMPLATE = """if True:
33 from grpc._cython import cygrpc
34 from tests.fork import methods
36 cygrpc._GRPC_ENABLE_FORK_SUPPORT = True
37 os.environ['GRPC_POLL_STRATEGY'] = 'epoll1'
38 methods.TestCase.%s.run_test({
39 'server_host': 'localhost',
44 _SUBPROCESS_TIMEOUT_S = 30
48 sys.platform.startswith("linux"),
49 "not supported on windows, and fork+exec networking blocked on mac")
50 @unittest.skipUnless(six.PY2, "https://github.com/grpc/grpc/issues/18075")
51 class ForkInteropTest(unittest.TestCase):
54 start_server_script = """if True:
59 from src.proto.grpc.testing import test_pb2_grpc
60 from tests.interop import service as interop_service
61 from tests.unit import test_common
63 server = test_common.test_server()
64 test_pb2_grpc.add_TestServiceServicer_to_server(
65 interop_service.TestService(), server)
66 port = server.add_insecure_port('[::]:0')
73 streams = tuple(tempfile.TemporaryFile() for _ in range(2))
74 self._server_process = subprocess.Popen(
75 [sys.executable, '-c', start_server_script],
78 timer = threading.Timer(_SUBPROCESS_TIMEOUT_S,
79 self._server_process.kill)
84 s = streams[0].readline()
91 raise Exception('Failed to get port from server')
95 def testConnectivityWatch(self):
96 self._verifyTestCase(methods.TestCase.CONNECTIVITY_WATCH)
98 def testCloseChannelBeforeFork(self):
99 self._verifyTestCase(methods.TestCase.CLOSE_CHANNEL_BEFORE_FORK)
101 def testAsyncUnarySameChannel(self):
102 self._verifyTestCase(methods.TestCase.ASYNC_UNARY_SAME_CHANNEL)
104 def testAsyncUnaryNewChannel(self):
105 self._verifyTestCase(methods.TestCase.ASYNC_UNARY_NEW_CHANNEL)
107 def testBlockingUnarySameChannel(self):
108 self._verifyTestCase(methods.TestCase.BLOCKING_UNARY_SAME_CHANNEL)
110 def testBlockingUnaryNewChannel(self):
111 self._verifyTestCase(methods.TestCase.BLOCKING_UNARY_NEW_CHANNEL)
113 def testInProgressBidiContinueCall(self):
114 self._verifyTestCase(methods.TestCase.IN_PROGRESS_BIDI_CONTINUE_CALL)
116 def testInProgressBidiSameChannelAsyncCall(self):
117 self._verifyTestCase(
118 methods.TestCase.IN_PROGRESS_BIDI_SAME_CHANNEL_ASYNC_CALL)
120 def testInProgressBidiSameChannelBlockingCall(self):
121 self._verifyTestCase(
122 methods.TestCase.IN_PROGRESS_BIDI_SAME_CHANNEL_BLOCKING_CALL)
124 def testInProgressBidiNewChannelAsyncCall(self):
125 self._verifyTestCase(
126 methods.TestCase.IN_PROGRESS_BIDI_NEW_CHANNEL_ASYNC_CALL)
128 def testInProgressBidiNewChannelBlockingCall(self):
129 self._verifyTestCase(
130 methods.TestCase.IN_PROGRESS_BIDI_NEW_CHANNEL_BLOCKING_CALL)
133 self._server_process.kill()
135 def _verifyTestCase(self, test_case):
136 script = _CLIENT_FORK_SCRIPT_TEMPLATE % (test_case.name, self._port)
137 streams = tuple(tempfile.TemporaryFile() for _ in range(2))
138 process = subprocess.Popen([sys.executable, '-c', script],
141 timer = threading.Timer(_SUBPROCESS_TIMEOUT_S, process.kill)
146 for stream in streams:
148 outputs.append(stream.read())
150 0, process.returncode,
151 'process failed with exit code %d (stdout: "%s", stderr: "%s")' %
152 (process.returncode, outputs[0], outputs[1]))
155 if __name__ == '__main__':
156 unittest.main(verbosity=2)