2 # Copyright 2013 The Swarming Authors. All rights reserved.
3 # Use of this source code is governed under the Apache License, Version 2.0 that
4 # can be found in the LICENSE file.
12 ROOT_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
13 sys.path.insert(0, ROOT_DIR)
15 from utils import subprocess42
17 OUTPUT = os.path.join(ROOT_DIR, 'tests', 'subprocess42', 'output.py')
20 # Disable pre-set unbuffered output to not interfere with the testing being done
21 # here. Otherwise everything would test with unbuffered; which is fine but
22 # that's not what we specifically want to test here.
23 ENV = os.environ.copy()
24 ENV.pop('PYTHONUNBUFFERED', None)
27 def to_native_eol(string):
30 if sys.platform == 'win32':
31 return string.replace('\n', '\r\n')
35 def get_output_sleep_proc(flush, unbuffered, sleep_duration):
36 """Returns process with universal_newlines=True that prints to stdout before
39 It also optionally sys.stdout.flush() before the sleep and optionally enable
40 unbuffered output in python.
47 # Sadly, this doesn't work otherwise in some combination.
48 command.append('sys.stdout.flush()')
50 'time.sleep(%s)' % sleep_duration,
53 cmd = [sys.executable, '-c', ';'.join(command)]
56 return subprocess42.Popen(
57 cmd, env=ENV, stdout=subprocess42.PIPE, universal_newlines=True)
60 def get_output_sleep_proc_err(sleep_duration):
61 """Returns process with universal_newlines=True that prints to stderr before
66 'sys.stderr.write(\'A\\n\')',
69 'time.sleep(%s)' % sleep_duration,
70 'sys.stderr.write(\'B\\n\')',
72 cmd = [sys.executable, '-c', ';'.join(command)]
73 return subprocess42.Popen(
74 cmd, env=ENV, stderr=subprocess42.PIPE, universal_newlines=True)
77 class Subprocess42Test(unittest.TestCase):
78 def test_call_with_timeout(self):
79 timedout = 1 if sys.platform == 'win32' else -9
81 # ( (cmd, stderr_pipe, timeout), (stdout, stderr, returncode) ), ...
83 # 0 means no timeout, like None.
85 (['out_sleeping', '0.001', 'out_slept', 'err_print'], None, 0),
86 ('Sleeping.\nSlept.\n', None, 0),
89 (['err_print'], subprocess42.STDOUT, 0),
90 ('printing', None, 0),
93 (['err_print'], subprocess42.PIPE, 0),
97 # On a loaded system, this can be tight.
99 (['out_sleeping', 'out_flush', '100', 'out_slept'], None, 0.5),
100 ('Sleeping.\n', '', timedout),
104 # Note that err_flush is necessary on Windows but not on the other
105 # OSes. This means the likelihood of missing stderr output from a
106 # killed child process on Windows is much higher than on other OSes.
108 'out_sleeping', 'out_flush', 'err_print', 'err_flush', '100',
113 ('Sleeping.\n', 'printing', timedout),
117 (['out_sleeping', '0.001', 'out_slept'], None, 100),
118 ('Sleeping.\nSlept.\n', '', 0),
121 for i, (data, expected) in enumerate(test_data):
122 stdout, stderr, code, duration = subprocess42.call_with_timeout(
123 [sys.executable, OUTPUT] + data[0],
127 self.assertTrue(duration > 0.0001, (data, duration))
129 (i, stdout, stderr, code),
131 to_native_eol(expected[0]),
132 to_native_eol(expected[1]),
135 # Try again with universal_newlines=True.
136 stdout, stderr, code, duration = subprocess42.call_with_timeout(
137 [sys.executable, OUTPUT] + data[0],
141 universal_newlines=True)
142 self.assertTrue(duration > 0.0001, (data, duration))
144 (i, stdout, stderr, code),
147 def test_recv_any(self):
148 # Test all pipe direction and output scenarios.
151 'cmd': ['out_print', 'err_print'],
157 'cmd': ['out_print', 'err_print'],
159 'stderr': subprocess42.STDOUT,
164 'cmd': ['out_print'],
165 'stdout': subprocess42.PIPE,
166 'stderr': subprocess42.PIPE,
167 'expected': {'stdout': 'printing'},
170 'cmd': ['out_print'],
171 'stdout': subprocess42.PIPE,
173 'expected': {'stdout': 'printing'},
176 'cmd': ['out_print'],
177 'stdout': subprocess42.PIPE,
178 'stderr': subprocess42.STDOUT,
179 'expected': {'stdout': 'printing'},
183 'cmd': ['err_print'],
184 'stdout': subprocess42.PIPE,
185 'stderr': subprocess42.PIPE,
186 'expected': {'stderr': 'printing'},
189 'cmd': ['err_print'],
191 'stderr': subprocess42.PIPE,
192 'expected': {'stderr': 'printing'},
195 'cmd': ['err_print'],
196 'stdout': subprocess42.PIPE,
197 'stderr': subprocess42.STDOUT,
198 'expected': {'stdout': 'printing'},
202 'cmd': ['out_print', 'err_print'],
203 'stdout': subprocess42.PIPE,
204 'stderr': subprocess42.PIPE,
205 'expected': {'stderr': 'printing', 'stdout': 'printing'},
208 'cmd': ['out_print', 'err_print'],
209 'stdout': subprocess42.PIPE,
210 'stderr': subprocess42.STDOUT,
211 'expected': {'stdout': 'printingprinting'},
214 for i, testcase in enumerate(combinations):
215 cmd = [sys.executable, OUTPUT] + testcase['cmd']
216 p = subprocess42.Popen(
217 cmd, env=ENV, stdout=testcase['stdout'], stderr=testcase['stderr'])
219 while p.poll() is None:
220 pipe, data = p.recv_any()
222 actual.setdefault(pipe, '')
225 # The process exited, read any remaining data in the pipes.
227 pipe, data = p.recv_any()
230 actual.setdefault(pipe, '')
233 testcase['expected'],
235 (i, testcase['cmd'], testcase['expected'], actual))
236 self.assertEqual((None, None), p.recv_any())
237 self.assertEqual(0, p.returncode)
239 def test_recv_any_different_buffering(self):
240 # Specifically test all buffering scenarios.
241 for flush, unbuffered in itertools.product([True, False], [True, False]):
243 proc = get_output_sleep_proc(flush, unbuffered, 0.5)
245 p, data = proc.recv_any()
248 self.assertEqual('stdout', p)
249 self.assertTrue(data, (p, data))
252 self.assertEqual('A\nB\n', actual)
253 # Contrary to yield_any() or recv_any(0), wait() needs to be used here.
255 self.assertEqual(0, proc.returncode)
257 def test_recv_any_timeout_0(self):
258 # rec_any() is expected to timeout and return None with no data pending at
259 # least once, due to the sleep of 'duration' and the use of timeout=0.
260 for flush, unbuffered in itertools.product([True, False], [True, False]):
261 for duration in (0.05, 0.1, 0.5, 2):
264 proc = get_output_sleep_proc(flush, unbuffered, duration)
267 p, data = proc.recv_any(timeout=0)
269 if proc.poll() is None:
273 self.assertEqual('stdout', p)
274 self.assertTrue(data, (p, data))
277 self.assertEqual('A\nB\n', actual)
278 self.assertEqual(0, proc.returncode)
279 self.assertEqual(True, got_none)
281 except AssertionError:
283 print('Sleeping rocks. Trying slower.')
287 def _test_recv_common(self, proc, is_err):
291 data = proc.recv_err()
293 data = proc.recv_out()
296 self.assertTrue(data)
299 self.assertEqual('A\nB\n', actual)
301 self.assertEqual(0, proc.returncode)
303 def test_yield_any_no_timeout(self):
304 for duration in (0.05, 0.1, 0.5, 2):
306 proc = get_output_sleep_proc(True, True, duration)
311 for p, data in proc.yield_any():
312 self.assertEqual('stdout', p)
313 self.assertEqual(expected.pop(0), data)
314 self.assertEqual(0, proc.returncode)
315 self.assertEqual([], expected)
317 except AssertionError:
319 print('Sleeping rocks. Trying slower.')
323 def test_yield_any_hard_timeout(self):
324 # Kill the process due to hard_timeout.
325 proc = get_output_sleep_proc(True, True, 10)
328 for p, data in proc.yield_any(hard_timeout=1):
332 self.assertEqual('stdout', p)
334 if sys.platform == 'win32':
335 self.assertEqual(1, proc.returncode)
337 self.assertEqual(-9, proc.returncode)
338 self.assertEqual('A\n', actual)
339 # No None is returned, since it's not using soft_timeout.
340 self.assertEqual(False, got_none)
342 def test_yield_any_soft_timeout_0(self):
343 # rec_any() is expected to timeout and return None with no data pending at
344 # least once, due to the sleep of 'duration' and the use of timeout=0.
345 for duration in (0.05, 0.1, 0.5, 2):
347 proc = get_output_sleep_proc(True, True, duration)
353 for p, data in proc.yield_any(soft_timeout=0):
357 self.assertEqual('stdout', p)
358 self.assertEqual(expected.pop(0), data)
359 self.assertEqual(0, proc.returncode)
360 self.assertEqual([], expected)
361 self.assertEqual(True, got_none)
363 except AssertionError:
365 print('Sleeping rocks. Trying slower.')
370 if __name__ == '__main__':
372 unittest.TestCase.maxDiff = None
374 level=logging.DEBUG if '-v' in sys.argv else logging.ERROR)