Upstream version 10.39.225.0
[platform/framework/web/crosswalk.git] / src / tools / swarming_client / utils / subprocess42.py
index b0ee6e0..3641342 100644 (file)
@@ -20,6 +20,10 @@ import subprocess
 from subprocess import PIPE, STDOUT, call, check_output  # pylint: disable=W0611
 
 
+# Default maxsize argument.
+MAX_SIZE = 16384
+
+
 if subprocess.mswindows:
   import msvcrt  # pylint: disable=F0401
   from ctypes import wintypes
@@ -49,8 +53,20 @@ if subprocess.mswindows:
   def recv_multi_impl(conns, maxsize, timeout):
     """Reads from the first available pipe.
 
-    If timeout is None, it's blocking. If timeout is 0, it is not blocking.
+    It will immediately return on a closed connection, independent of timeout.
+
+    Arguments:
+    - maxsize: Maximum number of bytes to return. Defaults to MAX_SIZE.
+    - timeout: If None, it is blocking. If 0 or above, will return None if no
+          data is available within |timeout| seconds.
+
+    Returns:
+      tuple(int(index), str(data), bool(closed)).
     """
+    assert conns
+    assert timeout is None or isinstance(timeout, (int, float)), timeout
+    maxsize = max(maxsize or MAX_SIZE, 1)
+
     # TODO(maruel): Use WaitForMultipleObjects(). Python creates anonymous pipes
     # for proc.stdout and proc.stderr but they are implemented as named pipes on
     # Windows. Since named pipes are not waitable object, they can't be passed
@@ -59,27 +75,24 @@ if subprocess.mswindows:
     # object and remembering the pending ReadFile() calls. This will require
     # some re-architecture to store the relevant event handle and OVERLAPPEDIO
     # object in Popen or the file object.
-    maxsize = max(maxsize or 16384, 1)
-    if timeout:
-      start = time.time()
+    start = time.time()
     handles = [
       (i, msvcrt.get_osfhandle(c.fileno())) for i, c in enumerate(conns)
     ]
-    while handles:
+    while True:
       for index, handle in handles:
         try:
           avail = min(PeekNamedPipe(handle), maxsize)
           if avail:
-            return index, ReadFile(handle, avail)[1]
-          if (timeout and (time.time() - start) >= timeout) or timeout == 0:
-            return None, None
-          # Polling rocks.
-          time.sleep(0.001)
+            return index, ReadFile(handle, avail)[1], False
         except OSError:
-          handles.remove((index, handle))
-          break
-    # Nothing to wait for.
-    return None, None
+          # The pipe closed.
+          return index, None, True
+
+      if timeout is not None and (time.time() - start) >= timeout:
+        return None, None, False
+      # Polling rocks.
+      time.sleep(0.001)
 
 else:
   import fcntl  # pylint: disable=F0401
@@ -88,14 +101,29 @@ else:
   def recv_multi_impl(conns, maxsize, timeout):
     """Reads from the first available pipe.
 
-    If timeout is None, it's blocking. If timeout is 0, it is not blocking.
+    It will immediately return on a closed connection, independent of timeout.
+
+    Arguments:
+    - maxsize: Maximum number of bytes to return. Defaults to MAX_SIZE.
+    - timeout: If None, it is blocking. If 0 or above, will return None if no
+          data is available within |timeout| seconds.
+
+    Returns:
+      tuple(int(index), str(data), bool(closed)).
     """
+    assert conns
+    assert timeout is None or isinstance(timeout, (int, float)), timeout
+    maxsize = max(maxsize or MAX_SIZE, 1)
+
+    # select(timeout=0) will block, it has to be a value > 0.
+    if timeout == 0:
+      timeout = 0.001
     try:
       r, _, _ = select.select(conns, [], [], timeout)
     except select.error:
-      return None, None
+      r = None
     if not r:
-      return None, None
+      return None, None, False
 
     conn = r[0]
     # Temporarily make it non-blocking.
@@ -106,8 +134,11 @@ else:
       # pylint: disable=E1101
       fcntl.fcntl(conn, fcntl.F_SETFL, flags | os.O_NONBLOCK)
     try:
-      data = conn.read(max(maxsize or 16384, 1))
-      return conns.index(conn), data
+      data = conn.read(maxsize)
+      if not data:
+        # On posix, this means the channel closed.
+        return conns.index(conn), None, True
+      return conns.index(conn), data, False
     finally:
       if not conn.closed:
         fcntl.fcntl(conn, fcntl.F_SETFL, flags)
@@ -146,83 +177,136 @@ class Popen(subprocess.Popen):
       self.end = time.time()
     return ret
 
-  def yield_any(self, timeout=None):
+  def yield_any(self, maxsize=None, hard_timeout=None, soft_timeout=None):
     """Yields output until the process terminates or is killed by a timeout.
 
     Yielded values are in the form (pipename, data).
 
-    If timeout is None, it is blocking. If timeout is 0, it doesn't block. This
-    is not generally useful to use timeout=0.
+    Arguments:
+    - maxsize: See recv_any().
+    - hard_timeout: If None, the process is never killed. If set, the process is
+          killed after |hard_timeout| seconds.
+    - soft_timeout: If None, the call is blocking. If set, yields None, None
+          if no data is available within |soft_timeout| seconds. It resets
+          itself after each yield.
     """
-    remaining = 0
+    if hard_timeout is not None:
+      # hard_timeout=0 means the process is not even given a little chance to
+      # execute and will be immediately killed.
+      assert isinstance(hard_timeout, (int, float)) and hard_timeout > 0., (
+          hard_timeout)
+    if soft_timeout is not None:
+      # soft_timeout=0 effectively means that the pipe is continuously polled.
+      assert isinstance(soft_timeout, (int, float)) and soft_timeout >= 0, (
+          soft_timeout)
+
+    last_yield = time.time()
     while self.poll() is None:
-      if timeout:
-        # While these float() calls seem redundant, they are to force
-        # ResetableTimeout to "render" itself into a float. At each call, the
-        # resulting value could be different, depending if a .reset() call
-        # occurred.
-        remaining = max(float(timeout) - self.duration(), 0.001)
-      else:
-        remaining = timeout
-      t, data = self.recv_any(timeout=remaining)
-      if data or timeout == 0:
-        yield (t, data)
-      if timeout and self.duration() >= float(timeout):
+      t, data = self.recv_any(
+          maxsize=maxsize,
+          timeout=self._calc_timeout(hard_timeout, soft_timeout, last_yield))
+      if data or soft_timeout is not None:
+        yield t, data
+        last_yield = time.time()
+
+      if hard_timeout and self.duration() >= hard_timeout:
         break
-    if self.poll() is None and timeout and self.duration() >= float(timeout):
-      logging.debug('Kill %s %s', self.duration(), float(timeout))
+
+    if self.poll() is None and hard_timeout:
+      logging.debug('Kill %s %s', self.duration(), hard_timeout)
       self.kill()
     self.wait()
+
     # Read all remaining output in the pipes.
     while True:
-      t, data = self.recv_any()
+      t, data = self.recv_any(maxsize=maxsize)
       if not data:
         break
-      yield (t, data)
+      yield t, data
+
+  def _calc_timeout(self, hard_timeout, soft_timeout, last_yield):
+    """Returns the timeout to be used on the next recv_any() in yield_any().
+
+    It depends on both timeout. It returns None if no timeout is used. Otherwise
+    it returns a value >= 0.001. It's not 0 because it's effectively polling, on
+    linux it can peg a single core, so adding 1ms sleep does a tremendous
+    difference.
+    """
+    hard_remaining = (
+        max(hard_timeout - self.duration(), 0) if hard_timeout else None)
+    soft_remaining = (
+        max(soft_timeout - (time.time() - last_yield), 0)
+        if soft_timeout is not None else None)
+    if hard_remaining is None:
+      return soft_remaining
+    if soft_remaining is None:
+      return hard_remaining
+    return min(hard_remaining, soft_remaining)
 
   def recv_any(self, maxsize=None, timeout=None):
-    """Reads from stderr and if empty, from stdout.
+    """Reads from the first pipe available from stdout and stderr.
 
-    If timeout is None, it is blocking. If timeout is 0, it doesn't block.
+    Arguments:
+    - maxsize: Maximum number of bytes to return. Defaults to MAX_SIZE.
+    - timeout: If None, it is blocking. If 0 or above, will return None if no
+          data is available within |timeout| seconds.
+
+    Returns:
+      tuple(int(index) or None, str(data)).
     """
-    pipes = [
-      x for x in ((self.stderr, 'stderr'), (self.stdout, 'stdout')) if x[0]
-    ]
-    if len(pipes) == 2 and self.stderr.fileno() == self.stdout.fileno():
-      pipes.pop(0)
-    if not pipes:
-      return None, None
-    conns, names = zip(*pipes)
-    index, data = recv_multi_impl(conns, maxsize, timeout)
-    if index is None:
-      return index, data
-    if not data:
-      self._close(names[index])
-      return None, None
-    if self.universal_newlines:
-      data = self._translate_newlines(data)
-    return names[index], data
+    # recv_multi_impl will early exit on a closed connection. Loop accordingly
+    # to simplify call sites.
+    while True:
+      pipes = [
+        x for x in ((self.stderr, 'stderr'), (self.stdout, 'stdout')) if x[0]
+      ]
+      # If both stdout and stderr have the exact file handle, they are
+      # effectively the same pipe. Deduplicate it since otherwise it confuses
+      # recv_multi_impl().
+      if len(pipes) == 2 and self.stderr.fileno() == self.stdout.fileno():
+        pipes.pop(0)
+
+      if not pipes:
+        return None, None
+      start = time.time()
+      conns, names = zip(*pipes)
+      index, data, closed = recv_multi_impl(conns, maxsize, timeout)
+      if index is None:
+        return index, data
+      if closed:
+        self._close(names[index])
+        if not data:
+          # Loop again. The other pipe may still be open.
+          if timeout:
+            timeout -= (time.time() - start)
+          continue
+
+      if self.universal_newlines:
+        data = self._translate_newlines(data)
+      return names[index], data
 
   def recv_out(self, maxsize=None, timeout=None):
-    """Reads from stdout asynchronously."""
+    """Reads from stdout synchronously with timeout."""
     return self._recv('stdout', maxsize, timeout)
 
   def recv_err(self, maxsize=None, timeout=None):
-    """Reads from stderr asynchronously."""
+    """Reads from stderr synchronously with timeout."""
     return self._recv('stderr', maxsize, timeout)
 
   def _close(self, which):
+    """Closes either stdout or stderr."""
     getattr(self, which).close()
     setattr(self, which, None)
 
   def _recv(self, which, maxsize, timeout):
+    """Reads from one of stdout or stderr synchronously with timeout."""
     conn = getattr(self, which)
     if conn is None:
       return None
-    data = recv_multi_impl([conn], maxsize, timeout)
-    if not data:
-      return self._close(which)
-    if self.universal_newlines:
+    _, data, closed = recv_multi_impl([conn], maxsize, timeout)
+    if closed:
+      self._close(which)
+    if self.universal_newlines and data:
       data = self._translate_newlines(data)
     return data
 
@@ -240,7 +324,7 @@ def call_with_timeout(cmd, timeout, **kwargs):
   if timeout:
     out = ''
     err = ''
-    for t, data in proc.yield_any(timeout):
+    for t, data in proc.yield_any(hard_timeout=timeout):
       if t == 'stdout':
         out += data
       else: