# Use of this source code is governed under the Apache License, Version 2.0 that
# can be found in the LICENSE file.
+import itertools
import logging
import os
import sys
OUTPUT = os.path.join(ROOT_DIR, 'tests', 'subprocess42', 'output.py')
+# Disable pre-set unbuffered output to not interfere with the testing being done
+# here. Otherwise everything would test with unbuffered; which is fine but
+# that's not what we specifically want to test here.
+ENV = os.environ.copy()
+ENV.pop('PYTHONUNBUFFERED', None)
+
+
def to_native_eol(string):
if string is None:
return string
return string
+def get_output_sleep_proc(flush, unbuffered, sleep_duration):
+ """Returns process with universal_newlines=True that prints to stdout before
+ after a sleep.
+
+ It also optionally sys.stdout.flush() before the sleep and optionally enable
+ unbuffered output in python.
+ """
+ command = [
+ 'import sys,time',
+ 'print(\'A\')',
+ ]
+ if flush:
+ # Sadly, this doesn't work otherwise in some combination.
+ command.append('sys.stdout.flush()')
+ command.extend((
+ 'time.sleep(%s)' % sleep_duration,
+ 'print(\'B\')',
+ ))
+ cmd = [sys.executable, '-c', ';'.join(command)]
+ if unbuffered:
+ cmd.append('-u')
+ return subprocess42.Popen(
+ cmd, env=ENV, stdout=subprocess42.PIPE, universal_newlines=True)
+
+
+def get_output_sleep_proc_err(sleep_duration):
+ """Returns process with universal_newlines=True that prints to stderr before
+ and after a sleep.
+ """
+ command = [
+ 'import sys,time',
+ 'sys.stderr.write(\'A\\n\')',
+ ]
+ command.extend((
+ 'time.sleep(%s)' % sleep_duration,
+ 'sys.stderr.write(\'B\\n\')',
+ ))
+ cmd = [sys.executable, '-c', ';'.join(command)]
+ return subprocess42.Popen(
+ cmd, env=ENV, stderr=subprocess42.PIPE, universal_newlines=True)
+
+
class Subprocess42Test(unittest.TestCase):
def test_call_with_timeout(self):
timedout = 1 if sys.platform == 'win32' else -9
for i, (data, expected) in enumerate(test_data):
stdout, stderr, code, duration = subprocess42.call_with_timeout(
[sys.executable, OUTPUT] + data[0],
+ env=ENV,
stderr=data[1],
timeout=data[2])
self.assertTrue(duration > 0.0001, (data, duration))
# Try again with universal_newlines=True.
stdout, stderr, code, duration = subprocess42.call_with_timeout(
[sys.executable, OUTPUT] + data[0],
+ env=ENV,
stderr=data[1],
timeout=data[2],
universal_newlines=True)
(i,) + expected)
def test_recv_any(self):
+ # Test all pipe direction and output scenarios.
combinations = [
{
'cmd': ['out_print', 'err_print'],
'expected': {'stdout': 'printingprinting'},
},
]
- for i, data in enumerate(combinations):
- cmd = [sys.executable, OUTPUT] + data['cmd']
+ for i, testcase in enumerate(combinations):
+ cmd = [sys.executable, OUTPUT] + testcase['cmd']
p = subprocess42.Popen(
- cmd, stdout=data['stdout'], stderr=data['stderr'])
+ cmd, env=ENV, stdout=testcase['stdout'], stderr=testcase['stderr'])
actual = {}
while p.poll() is None:
- pipe, d = p.recv_any()
- if pipe is not None:
+ pipe, data = p.recv_any()
+ if data:
actual.setdefault(pipe, '')
- actual[pipe] += d
+ actual[pipe] += data
+
+ # The process exited, read any remaining data in the pipes.
while True:
- pipe, d = p.recv_any()
+ pipe, data = p.recv_any()
if pipe is None:
break
actual.setdefault(pipe, '')
- actual[pipe] += d
+ actual[pipe] += data
self.assertEqual(
- data['expected'], actual, (i, data['cmd'], data['expected'], actual))
+ testcase['expected'],
+ actual,
+ (i, testcase['cmd'], testcase['expected'], actual))
self.assertEqual((None, None), p.recv_any())
self.assertEqual(0, p.returncode)
- @staticmethod
- def _get_output_sleep_proc(flush, env, duration):
- command = [
- 'import sys,time',
- 'print(\'A\')',
- ]
- if flush:
- # Sadly, this doesn't work otherwise in some combination.
- command.append('sys.stdout.flush()')
- command.extend((
- 'time.sleep(%s)' % duration,
- 'print(\'B\')',
- ))
- return subprocess42.Popen(
- [
- sys.executable,
- '-c',
- ';'.join(command),
- ],
- stdout=subprocess42.PIPE,
- universal_newlines=True,
- env=env)
-
- def test_yield_any_None(self):
+ def test_recv_any_different_buffering(self):
+ # Specifically test all buffering scenarios.
+ for flush, unbuffered in itertools.product([True, False], [True, False]):
+ actual = ''
+ proc = get_output_sleep_proc(flush, unbuffered, 0.5)
+ while True:
+ p, data = proc.recv_any()
+ if not p:
+ break
+ self.assertEqual('stdout', p)
+ self.assertTrue(data, (p, data))
+ actual += data
+
+ self.assertEqual('A\nB\n', actual)
+ # Contrary to yield_any() or recv_any(0), wait() needs to be used here.
+ proc.wait()
+ self.assertEqual(0, proc.returncode)
+
+ def test_recv_any_timeout_0(self):
+ # rec_any() is expected to timeout and return None with no data pending at
+ # least once, due to the sleep of 'duration' and the use of timeout=0.
+ for flush, unbuffered in itertools.product([True, False], [True, False]):
+ for duration in (0.05, 0.1, 0.5, 2):
+ try:
+ actual = ''
+ proc = get_output_sleep_proc(flush, unbuffered, duration)
+ got_none = False
+ while True:
+ p, data = proc.recv_any(timeout=0)
+ if not p:
+ if proc.poll() is None:
+ got_none = True
+ continue
+ break
+ self.assertEqual('stdout', p)
+ self.assertTrue(data, (p, data))
+ actual += data
+
+ self.assertEqual('A\nB\n', actual)
+ self.assertEqual(0, proc.returncode)
+ self.assertEqual(True, got_none)
+ break
+ except AssertionError:
+ if duration != 2:
+ print('Sleeping rocks. Trying slower.')
+ continue
+ raise
+
+ def _test_recv_common(self, proc, is_err):
+ actual = ''
+ while True:
+ if is_err:
+ data = proc.recv_err()
+ else:
+ data = proc.recv_out()
+ if not data:
+ break
+ self.assertTrue(data)
+ actual += data
+
+ self.assertEqual('A\nB\n', actual)
+ proc.wait()
+ self.assertEqual(0, proc.returncode)
+
+ def test_yield_any_no_timeout(self):
for duration in (0.05, 0.1, 0.5, 2):
try:
- proc = self._get_output_sleep_proc(True, {}, duration)
+ proc = get_output_sleep_proc(True, True, duration)
expected = [
'A\n',
'B\n',
]
- for p, data in proc.yield_any(timeout=None):
+ for p, data in proc.yield_any():
self.assertEqual('stdout', p)
self.assertEqual(expected.pop(0), data)
self.assertEqual(0, proc.returncode)
break
except AssertionError:
if duration != 2:
- print('Sleeping rocks. trying more slowly.')
+ print('Sleeping rocks. Trying slower.')
continue
raise
- def test_yield_any_0(self):
+ def test_yield_any_hard_timeout(self):
+ # Kill the process due to hard_timeout.
+ proc = get_output_sleep_proc(True, True, 10)
+ got_none = False
+ actual = ''
+ for p, data in proc.yield_any(hard_timeout=1):
+ if not data:
+ got_none = True
+ continue
+ self.assertEqual('stdout', p)
+ actual += data
+ if sys.platform == 'win32':
+ self.assertEqual(1, proc.returncode)
+ else:
+ self.assertEqual(-9, proc.returncode)
+ self.assertEqual('A\n', actual)
+ # No None is returned, since it's not using soft_timeout.
+ self.assertEqual(False, got_none)
+
+ def test_yield_any_soft_timeout_0(self):
+ # rec_any() is expected to timeout and return None with no data pending at
+ # least once, due to the sleep of 'duration' and the use of timeout=0.
for duration in (0.05, 0.1, 0.5, 2):
try:
- proc = self._get_output_sleep_proc(True, {}, duration)
+ proc = get_output_sleep_proc(True, True, duration)
expected = [
'A\n',
'B\n',
]
got_none = False
- for p, data in proc.yield_any(timeout=0):
+ for p, data in proc.yield_any(soft_timeout=0):
if not p:
got_none = True
continue
break
except AssertionError:
if duration != 2:
- print('Sleeping rocks. trying more slowly.')
+ print('Sleeping rocks. Trying slower.')
continue
raise
- def test_recv_any_None(self):
- values = (
- (True, ['A\n', 'B\n'], {}),
- (False, ['A\nB\n'], {}),
- (False, ['A\n', 'B\n'], {'PYTHONUNBUFFERED': 'x'}),
- )
- for flush, exp, env in values:
- for duration in (0.05, 0.1, 0.5, 2):
- expected = exp[:]
- try:
- proc = self._get_output_sleep_proc(flush, env, duration)
- while True:
- p, data = proc.recv_any(timeout=None)
- if not p:
- break
- self.assertEqual('stdout', p)
- if not expected:
- self.fail(data)
- e = expected.pop(0)
- if env:
- # Buffering is truly a character-level and could get items
- # individually. This is usually seen only during high load, try
- # compiling at the same time to reproduce it.
- if len(data) < len(e):
- expected.insert(0, e[len(data):])
- e = e[:len(data)]
- self.assertEqual(e, data)
- # Contrary to yield_any() or recv_any(0), wait() needs to be used
- # here.
- proc.wait()
- self.assertEqual([], expected)
- self.assertEqual(0, proc.returncode)
- except AssertionError:
- if duration != 2:
- print('Sleeping rocks. trying more slowly.')
- continue
- raise
-
- def test_recv_any_0(self):
- values = (
- (True, ['A\n', 'B\n'], {}),
- (False, ['A\nB\n'], {}),
- (False, ['A\n', 'B\n'], {'PYTHONUNBUFFERED': 'x'}),
- )
- for i, (flush, exp, env) in enumerate(values):
- for duration in (0.1, 0.5, 2):
- expected = exp[:]
- try:
- proc = self._get_output_sleep_proc(flush, env, duration)
- got_none = False
- while True:
- p, data = proc.recv_any(timeout=0)
- if not p:
- if proc.poll() is None:
- got_none = True
- continue
- break
- self.assertEqual('stdout', p)
- if not expected:
- self.fail(data)
- e = expected.pop(0)
- if sys.platform == 'win32':
- # Buffering is truly a character-level on Windows and could get
- # items individually.
- if len(data) < len(e):
- expected.insert(0, e[len(data):])
- e = e[:len(data)]
- self.assertEqual(e, data)
-
- self.assertEqual(0, proc.returncode)
- self.assertEqual([], expected)
- self.assertEqual(True, got_none)
- except Exception as e:
- if duration != 2:
- print('Sleeping rocks. trying more slowly.')
- continue
- print >> sys.stderr, 'Failure at index %d' % i
- raise
-
if __name__ == '__main__':
+ if '-v' in sys.argv:
+ unittest.TestCase.maxDiff = None
logging.basicConfig(
level=logging.DEBUG if '-v' in sys.argv else logging.ERROR)
unittest.main()