Imported Upstream version 1.34.0
[platform/upstream/grpc.git] / src / python / grpcio_tests / tests / fork / _fork_interop_test.py
1 # Copyright 2019 gRPC authors.
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 #     http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14 """Client-side fork interop tests as a unit test."""
15
16 import six
17 import subprocess
18 import sys
19 import tempfile
20 import threading
21 import unittest
22 from grpc._cython import cygrpc
23 from tests.fork import methods
24
25 # New instance of multiprocessing.Process using fork without exec can and will
26 # hang if the Python process has any other threads running. This includes the
27 # additional thread spawned by our _runner.py class. So in order to test our
28 # compatibility with multiprocessing, we first fork+exec a new process to ensure
29 # we don't have any conflicting background threads.
30 _CLIENT_FORK_SCRIPT_TEMPLATE = """if True:
31     import os
32     import sys
33     from grpc._cython import cygrpc
34     from tests.fork import methods
35
36     cygrpc._GRPC_ENABLE_FORK_SUPPORT = True
37     os.environ['GRPC_POLL_STRATEGY'] = 'epoll1'
38     methods.TestCase.%s.run_test({
39       'server_host': 'localhost',
40       'server_port': %d,
41       'use_tls': False
42     })
43 """
44 _SUBPROCESS_TIMEOUT_S = 30
45
46
47 @unittest.skipUnless(
48     sys.platform.startswith("linux"),
49     "not supported on windows, and fork+exec networking blocked on mac")
50 @unittest.skipUnless(six.PY2, "https://github.com/grpc/grpc/issues/18075")
51 class ForkInteropTest(unittest.TestCase):
52
53     def setUp(self):
54         start_server_script = """if True:
55             import sys
56             import time
57
58             import grpc
59             from src.proto.grpc.testing import test_pb2_grpc
60             from tests.interop import service as interop_service
61             from tests.unit import test_common
62
63             server = test_common.test_server()
64             test_pb2_grpc.add_TestServiceServicer_to_server(
65                 interop_service.TestService(), server)
66             port = server.add_insecure_port('[::]:0')
67             server.start()
68             print(port)
69             sys.stdout.flush()
70             while True:
71                 time.sleep(1)
72         """
73         streams = tuple(tempfile.TemporaryFile() for _ in range(2))
74         self._server_process = subprocess.Popen(
75             [sys.executable, '-c', start_server_script],
76             stdout=streams[0],
77             stderr=streams[1])
78         timer = threading.Timer(_SUBPROCESS_TIMEOUT_S,
79                                 self._server_process.kill)
80         try:
81             timer.start()
82             while True:
83                 streams[0].seek(0)
84                 s = streams[0].readline()
85                 if not s:
86                     continue
87                 else:
88                     self._port = int(s)
89                     break
90         except ValueError:
91             raise Exception('Failed to get port from server')
92         finally:
93             timer.cancel()
94
95     def testConnectivityWatch(self):
96         self._verifyTestCase(methods.TestCase.CONNECTIVITY_WATCH)
97
98     def testCloseChannelBeforeFork(self):
99         self._verifyTestCase(methods.TestCase.CLOSE_CHANNEL_BEFORE_FORK)
100
101     def testAsyncUnarySameChannel(self):
102         self._verifyTestCase(methods.TestCase.ASYNC_UNARY_SAME_CHANNEL)
103
104     def testAsyncUnaryNewChannel(self):
105         self._verifyTestCase(methods.TestCase.ASYNC_UNARY_NEW_CHANNEL)
106
107     def testBlockingUnarySameChannel(self):
108         self._verifyTestCase(methods.TestCase.BLOCKING_UNARY_SAME_CHANNEL)
109
110     def testBlockingUnaryNewChannel(self):
111         self._verifyTestCase(methods.TestCase.BLOCKING_UNARY_NEW_CHANNEL)
112
113     def testInProgressBidiContinueCall(self):
114         self._verifyTestCase(methods.TestCase.IN_PROGRESS_BIDI_CONTINUE_CALL)
115
116     def testInProgressBidiSameChannelAsyncCall(self):
117         self._verifyTestCase(
118             methods.TestCase.IN_PROGRESS_BIDI_SAME_CHANNEL_ASYNC_CALL)
119
120     def testInProgressBidiSameChannelBlockingCall(self):
121         self._verifyTestCase(
122             methods.TestCase.IN_PROGRESS_BIDI_SAME_CHANNEL_BLOCKING_CALL)
123
124     def testInProgressBidiNewChannelAsyncCall(self):
125         self._verifyTestCase(
126             methods.TestCase.IN_PROGRESS_BIDI_NEW_CHANNEL_ASYNC_CALL)
127
128     def testInProgressBidiNewChannelBlockingCall(self):
129         self._verifyTestCase(
130             methods.TestCase.IN_PROGRESS_BIDI_NEW_CHANNEL_BLOCKING_CALL)
131
132     def tearDown(self):
133         self._server_process.kill()
134
135     def _verifyTestCase(self, test_case):
136         script = _CLIENT_FORK_SCRIPT_TEMPLATE % (test_case.name, self._port)
137         streams = tuple(tempfile.TemporaryFile() for _ in range(2))
138         process = subprocess.Popen([sys.executable, '-c', script],
139                                    stdout=streams[0],
140                                    stderr=streams[1])
141         timer = threading.Timer(_SUBPROCESS_TIMEOUT_S, process.kill)
142         timer.start()
143         process.wait()
144         timer.cancel()
145         outputs = []
146         for stream in streams:
147             stream.seek(0)
148             outputs.append(stream.read())
149         self.assertEqual(
150             0, process.returncode,
151             'process failed with exit code %d (stdout: "%s", stderr: "%s")' %
152             (process.returncode, outputs[0], outputs[1]))
153
154
155 if __name__ == '__main__':
156     unittest.main(verbosity=2)