Upstream version 10.39.225.0
[platform/framework/web/crosswalk.git] / src / tools / swarming_client / tests / subprocess42_test.py
index 91e92e2..799139c 100755 (executable)
@@ -3,6 +3,7 @@
 # Use of this source code is governed under the Apache License, Version 2.0 that
 # can be found in the LICENSE file.
 
+import itertools
 import logging
 import os
 import sys
@@ -16,6 +17,13 @@ from utils import subprocess42
 OUTPUT = os.path.join(ROOT_DIR, 'tests', 'subprocess42', 'output.py')
 
 
+# Disable pre-set unbuffered output to not interfere with the testing being done
+# here. Otherwise everything would test with unbuffered; which is fine but
+# that's not what we specifically want to test here.
+ENV = os.environ.copy()
+ENV.pop('PYTHONUNBUFFERED', None)
+
+
 def to_native_eol(string):
   if string is None:
     return string
@@ -24,6 +32,48 @@ def to_native_eol(string):
   return string
 
 
+def get_output_sleep_proc(flush, unbuffered, sleep_duration):
+  """Returns process with universal_newlines=True that prints to stdout before
+  after a sleep.
+
+  It also optionally sys.stdout.flush() before the sleep and optionally enable
+  unbuffered output in python.
+  """
+  command = [
+    'import sys,time',
+    'print(\'A\')',
+  ]
+  if flush:
+    # Sadly, this doesn't work otherwise in some combination.
+    command.append('sys.stdout.flush()')
+  command.extend((
+    'time.sleep(%s)' % sleep_duration,
+    'print(\'B\')',
+  ))
+  cmd = [sys.executable, '-c', ';'.join(command)]
+  if unbuffered:
+    cmd.append('-u')
+  return subprocess42.Popen(
+      cmd, env=ENV, stdout=subprocess42.PIPE, universal_newlines=True)
+
+
+def get_output_sleep_proc_err(sleep_duration):
+  """Returns process with universal_newlines=True that prints to stderr before
+  and after a sleep.
+  """
+  command = [
+    'import sys,time',
+    'sys.stderr.write(\'A\\n\')',
+  ]
+  command.extend((
+    'time.sleep(%s)' % sleep_duration,
+    'sys.stderr.write(\'B\\n\')',
+  ))
+  cmd = [sys.executable, '-c', ';'.join(command)]
+  return subprocess42.Popen(
+      cmd, env=ENV, stderr=subprocess42.PIPE, universal_newlines=True)
+
+
 class Subprocess42Test(unittest.TestCase):
   def test_call_with_timeout(self):
     timedout = 1 if sys.platform == 'win32' else -9
@@ -71,6 +121,7 @@ class Subprocess42Test(unittest.TestCase):
     for i, (data, expected) in enumerate(test_data):
       stdout, stderr, code, duration = subprocess42.call_with_timeout(
           [sys.executable, OUTPUT] + data[0],
+          env=ENV,
           stderr=data[1],
           timeout=data[2])
       self.assertTrue(duration > 0.0001, (data, duration))
@@ -84,6 +135,7 @@ class Subprocess42Test(unittest.TestCase):
       # Try again with universal_newlines=True.
       stdout, stderr, code, duration = subprocess42.call_with_timeout(
           [sys.executable, OUTPUT] + data[0],
+          env=ENV,
           stderr=data[1],
           timeout=data[2],
           universal_newlines=True)
@@ -93,6 +145,7 @@ class Subprocess42Test(unittest.TestCase):
           (i,) + expected)
 
   def test_recv_any(self):
+    # Test all pipe direction and output scenarios.
     combinations = [
       {
         'cmd': ['out_print', 'err_print'],
@@ -158,59 +211,104 @@ class Subprocess42Test(unittest.TestCase):
         'expected': {'stdout': 'printingprinting'},
       },
     ]
-    for i, data in enumerate(combinations):
-      cmd = [sys.executable, OUTPUT] + data['cmd']
+    for i, testcase in enumerate(combinations):
+      cmd = [sys.executable, OUTPUT] + testcase['cmd']
       p = subprocess42.Popen(
-          cmd, stdout=data['stdout'], stderr=data['stderr'])
+          cmd, env=ENV, stdout=testcase['stdout'], stderr=testcase['stderr'])
       actual = {}
       while p.poll() is None:
-        pipe, d = p.recv_any()
-        if pipe is not None:
+        pipe, data = p.recv_any()
+        if data:
           actual.setdefault(pipe, '')
-          actual[pipe] += d
+          actual[pipe] += data
+
+      # The process exited, read any remaining data in the pipes.
       while True:
-        pipe, d = p.recv_any()
+        pipe, data = p.recv_any()
         if pipe is None:
           break
         actual.setdefault(pipe, '')
-        actual[pipe] += d
+        actual[pipe] += data
       self.assertEqual(
-          data['expected'], actual, (i, data['cmd'], data['expected'], actual))
+          testcase['expected'],
+          actual,
+          (i, testcase['cmd'], testcase['expected'], actual))
       self.assertEqual((None, None), p.recv_any())
       self.assertEqual(0, p.returncode)
 
-  @staticmethod
-  def _get_output_sleep_proc(flush, env, duration):
-    command = [
-      'import sys,time',
-      'print(\'A\')',
-    ]
-    if flush:
-      # Sadly, this doesn't work otherwise in some combination.
-      command.append('sys.stdout.flush()')
-    command.extend((
-      'time.sleep(%s)' % duration,
-      'print(\'B\')',
-    ))
-    return subprocess42.Popen(
-        [
-          sys.executable,
-          '-c',
-          ';'.join(command),
-        ],
-        stdout=subprocess42.PIPE,
-        universal_newlines=True,
-        env=env)
-
-  def test_yield_any_None(self):
+  def test_recv_any_different_buffering(self):
+    # Specifically test all buffering scenarios.
+    for flush, unbuffered in itertools.product([True, False], [True, False]):
+      actual = ''
+      proc = get_output_sleep_proc(flush, unbuffered, 0.5)
+      while True:
+        p, data = proc.recv_any()
+        if not p:
+          break
+        self.assertEqual('stdout', p)
+        self.assertTrue(data, (p, data))
+        actual += data
+
+      self.assertEqual('A\nB\n', actual)
+      # Contrary to yield_any() or recv_any(0), wait() needs to be used here.
+      proc.wait()
+      self.assertEqual(0, proc.returncode)
+
+  def test_recv_any_timeout_0(self):
+    # rec_any() is expected to timeout and return None with no data pending at
+    # least once, due to the sleep of 'duration' and the use of timeout=0.
+    for flush, unbuffered in itertools.product([True, False], [True, False]):
+      for duration in (0.05, 0.1, 0.5, 2):
+        try:
+          actual = ''
+          proc = get_output_sleep_proc(flush, unbuffered, duration)
+          got_none = False
+          while True:
+            p, data = proc.recv_any(timeout=0)
+            if not p:
+              if proc.poll() is None:
+                got_none = True
+                continue
+              break
+            self.assertEqual('stdout', p)
+            self.assertTrue(data, (p, data))
+            actual += data
+
+          self.assertEqual('A\nB\n', actual)
+          self.assertEqual(0, proc.returncode)
+          self.assertEqual(True, got_none)
+          break
+        except AssertionError:
+          if duration != 2:
+            print('Sleeping rocks. Trying slower.')
+            continue
+          raise
+
+  def _test_recv_common(self, proc, is_err):
+    actual = ''
+    while True:
+      if is_err:
+        data = proc.recv_err()
+      else:
+        data = proc.recv_out()
+      if not data:
+        break
+      self.assertTrue(data)
+      actual += data
+
+    self.assertEqual('A\nB\n', actual)
+    proc.wait()
+    self.assertEqual(0, proc.returncode)
+
+  def test_yield_any_no_timeout(self):
     for duration in (0.05, 0.1, 0.5, 2):
       try:
-        proc = self._get_output_sleep_proc(True, {}, duration)
+        proc = get_output_sleep_proc(True, True, duration)
         expected = [
           'A\n',
           'B\n',
         ]
-        for p, data in proc.yield_any(timeout=None):
+        for p, data in proc.yield_any():
           self.assertEqual('stdout', p)
           self.assertEqual(expected.pop(0), data)
         self.assertEqual(0, proc.returncode)
@@ -218,20 +316,41 @@ class Subprocess42Test(unittest.TestCase):
         break
       except AssertionError:
         if duration != 2:
-          print('Sleeping rocks. trying more slowly.')
+          print('Sleeping rocks. Trying slower.')
           continue
         raise
 
-  def test_yield_any_0(self):
+  def test_yield_any_hard_timeout(self):
+    # Kill the process due to hard_timeout.
+    proc = get_output_sleep_proc(True, True, 10)
+    got_none = False
+    actual = ''
+    for p, data in proc.yield_any(hard_timeout=1):
+      if not data:
+        got_none = True
+        continue
+      self.assertEqual('stdout', p)
+      actual += data
+    if sys.platform == 'win32':
+      self.assertEqual(1, proc.returncode)
+    else:
+      self.assertEqual(-9, proc.returncode)
+    self.assertEqual('A\n', actual)
+    # No None is returned, since it's not using soft_timeout.
+    self.assertEqual(False, got_none)
+
+  def test_yield_any_soft_timeout_0(self):
+    # rec_any() is expected to timeout and return None with no data pending at
+    # least once, due to the sleep of 'duration' and the use of timeout=0.
     for duration in (0.05, 0.1, 0.5, 2):
       try:
-        proc = self._get_output_sleep_proc(True, {}, duration)
+        proc = get_output_sleep_proc(True, True, duration)
         expected = [
           'A\n',
           'B\n',
         ]
         got_none = False
-        for p, data in proc.yield_any(timeout=0):
+        for p, data in proc.yield_any(soft_timeout=0):
           if not p:
             got_none = True
             continue
@@ -243,91 +362,14 @@ class Subprocess42Test(unittest.TestCase):
         break
       except AssertionError:
         if duration != 2:
-          print('Sleeping rocks. trying more slowly.')
+          print('Sleeping rocks. Trying slower.')
           continue
         raise
 
-  def test_recv_any_None(self):
-    values = (
-        (True, ['A\n', 'B\n'], {}),
-        (False, ['A\nB\n'], {}),
-        (False, ['A\n', 'B\n'], {'PYTHONUNBUFFERED': 'x'}),
-    )
-    for flush, exp, env in values:
-      for duration in (0.05, 0.1, 0.5, 2):
-        expected = exp[:]
-        try:
-          proc = self._get_output_sleep_proc(flush, env, duration)
-          while True:
-            p, data = proc.recv_any(timeout=None)
-            if not p:
-              break
-            self.assertEqual('stdout', p)
-            if not expected:
-              self.fail(data)
-            e = expected.pop(0)
-            if env:
-              # Buffering is truly a character-level and could get items
-              # individually. This is usually seen only during high load, try
-              # compiling at the same time to reproduce it.
-              if len(data) < len(e):
-                expected.insert(0, e[len(data):])
-                e = e[:len(data)]
-            self.assertEqual(e, data)
-          # Contrary to yield_any() or recv_any(0), wait() needs to be used
-          # here.
-          proc.wait()
-          self.assertEqual([], expected)
-          self.assertEqual(0, proc.returncode)
-        except AssertionError:
-          if duration != 2:
-            print('Sleeping rocks. trying more slowly.')
-            continue
-          raise
-
-  def test_recv_any_0(self):
-    values = (
-        (True, ['A\n', 'B\n'], {}),
-        (False, ['A\nB\n'], {}),
-        (False, ['A\n', 'B\n'], {'PYTHONUNBUFFERED': 'x'}),
-    )
-    for i, (flush, exp, env) in enumerate(values):
-      for duration in (0.1, 0.5, 2):
-        expected = exp[:]
-        try:
-          proc = self._get_output_sleep_proc(flush, env, duration)
-          got_none = False
-          while True:
-            p, data = proc.recv_any(timeout=0)
-            if not p:
-              if proc.poll() is None:
-                got_none = True
-                continue
-              break
-            self.assertEqual('stdout', p)
-            if not expected:
-              self.fail(data)
-            e = expected.pop(0)
-            if sys.platform == 'win32':
-              # Buffering is truly a character-level on Windows and could get
-              # items individually.
-              if len(data) < len(e):
-                expected.insert(0, e[len(data):])
-                e = e[:len(data)]
-            self.assertEqual(e, data)
-
-          self.assertEqual(0, proc.returncode)
-          self.assertEqual([], expected)
-          self.assertEqual(True, got_none)
-        except Exception as e:
-          if duration != 2:
-            print('Sleeping rocks. trying more slowly.')
-            continue
-          print >> sys.stderr, 'Failure at index %d' % i
-          raise
-
 
 if __name__ == '__main__':
+  if '-v' in sys.argv:
+    unittest.TestCase.maxDiff = None
   logging.basicConfig(
       level=logging.DEBUG if '-v' in sys.argv else logging.ERROR)
   unittest.main()