Imported Upstream version 1.27.0
[platform/upstream/grpc.git] / src / python / grpcio_tests / tests / unit / _server_shutdown_test.py
1 # Copyright 2018 gRPC authors.
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 #     http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14 """Tests clean shutdown of server on various interpreter exit conditions.
15
16 The tests in this module spawn a subprocess for each test case, the
17 test is considered successful if it doesn't hang/timeout.
18 """
19
20 import atexit
21 import os
22 import subprocess
23 import sys
24 import threading
25 import unittest
26 import logging
27
28 from tests.unit import _server_shutdown_scenarios
29
30 SCENARIO_FILE = os.path.abspath(
31     os.path.join(os.path.dirname(os.path.realpath(__file__)),
32                  '_server_shutdown_scenarios.py'))
33 INTERPRETER = sys.executable
34 BASE_COMMAND = [INTERPRETER, SCENARIO_FILE]
35
36 processes = []
37 process_lock = threading.Lock()
38
39
40 # Make sure we attempt to clean up any
41 # processes we may have left running
42 def cleanup_processes():
43     with process_lock:
44         for process in processes:
45             try:
46                 process.kill()
47             except Exception:  # pylint: disable=broad-except
48                 pass
49
50
51 atexit.register(cleanup_processes)
52
53
54 def wait(process):
55     with process_lock:
56         processes.append(process)
57     process.wait()
58
59
60 class ServerShutdown(unittest.TestCase):
61
62     # Currently we shut down a server (if possible) after the Python server
63     # instance is garbage collected. This behavior may change in the future.
64     def test_deallocated_server_stops(self):
65         process = subprocess.Popen(
66             BASE_COMMAND + [_server_shutdown_scenarios.SERVER_DEALLOCATED],
67             stdout=sys.stdout,
68             stderr=sys.stderr)
69         wait(process)
70
71     def test_server_exception_exits(self):
72         process = subprocess.Popen(
73             BASE_COMMAND + [_server_shutdown_scenarios.SERVER_RAISES_EXCEPTION],
74             stdout=sys.stdout,
75             stderr=sys.stderr)
76         wait(process)
77
78     @unittest.skipIf(os.name == 'nt', 'fork not supported on windows')
79     def test_server_fork_can_exit(self):
80         process = subprocess.Popen(
81             BASE_COMMAND + [_server_shutdown_scenarios.SERVER_FORK_CAN_EXIT],
82             stdout=sys.stdout,
83             stderr=sys.stderr)
84         wait(process)
85
86
87 if __name__ == '__main__':
88     logging.basicConfig()
89     unittest.main(verbosity=2)