from subprocess import PIPE, STDOUT, call, check_output # pylint: disable=W0611
# Default maximum number of bytes read per recv_multi_impl() call when the
# caller does not pass an explicit maxsize.
MAX_SIZE = 16384
if subprocess.mswindows:
  import msvcrt  # pylint: disable=F0401
  from ctypes import wintypes

  def recv_multi_impl(conns, maxsize, timeout):
    """Reads from the first available pipe.

    It will immediately return on a closed connection, independent of timeout.

    Arguments:
    - maxsize: Maximum number of bytes to return. Defaults to MAX_SIZE.
    - timeout: If None, it is blocking. If 0 or above, will return None if no
      data is available within |timeout| seconds.

    Returns:
      tuple(int(index), str(data), bool(closed)).
    """
    assert conns
    assert timeout is None or isinstance(timeout, (int, float)), timeout
    maxsize = max(maxsize or MAX_SIZE, 1)

    # TODO(maruel): Use WaitForMultipleObjects(). Python creates anonymous
    # pipes for proc.stdout and proc.stderr but they are implemented as named
    # pipes on Windows. Since named pipes are not waitable objects, this will
    # require some re-architecture to store the relevant event handle and
    # OVERLAPPEDIO object in Popen or the file object.
    start = time.time()
    handles = [
        (i, msvcrt.get_osfhandle(c.fileno())) for i, c in enumerate(conns)
    ]
    while True:
      for index, handle in handles:
        try:
          avail = min(PeekNamedPipe(handle), maxsize)
          if avail:
            return index, ReadFile(handle, avail)[1], False
        except OSError:
          # The pipe closed.
          return index, None, True

      if timeout is not None and (time.time() - start) >= timeout:
        return None, None, False
      # Polling rocks.
      time.sleep(0.001)

else:
  import fcntl  # pylint: disable=F0401

  def recv_multi_impl(conns, maxsize, timeout):
    """Reads from the first available pipe.

    It will immediately return on a closed connection, independent of timeout.

    Arguments:
    - maxsize: Maximum number of bytes to return. Defaults to MAX_SIZE.
    - timeout: If None, it is blocking. If 0 or above, will return None if no
      data is available within |timeout| seconds.

    Returns:
      tuple(int(index), str(data), bool(closed)).
    """
    assert conns
    assert timeout is None or isinstance(timeout, (int, float)), timeout
    maxsize = max(maxsize or MAX_SIZE, 1)

    # select(timeout=0) will block, it has to be a value > 0.
    if timeout == 0:
      timeout = 0.001
    try:
      r, _, _ = select.select(conns, [], [], timeout)
    except select.error:
      r = None
    if not r:
      return None, None, False

    conn = r[0]
    # Temporarily make it non-blocking. Fetch the current descriptor flags
    # first so they can be restored afterwards; |flags| was previously read
    # without ever being assigned in this scope.
    # pylint: disable=E1101
    flags = fcntl.fcntl(conn, fcntl.F_GETFL)
    fcntl.fcntl(conn, fcntl.F_SETFL, flags | os.O_NONBLOCK)
    try:
      data = conn.read(maxsize)
      if not data:
        # On posix, this means the channel closed.
        return conns.index(conn), None, True
      return conns.index(conn), data, False
    finally:
      # Restore the original blocking mode unless the caller closed the pipe.
      if not conn.closed:
        fcntl.fcntl(conn, fcntl.F_SETFL, flags)
self.end = time.time()
return ret
def yield_any(self, maxsize=None, hard_timeout=None, soft_timeout=None):
  """Yields output until the process terminates or is killed by a timeout.

  Yielded values are (pipe_name, data) tuples.

  Arguments:
  - maxsize: See recv_any().
  - hard_timeout: If None, the process is never killed. If set, the process
    is killed after |hard_timeout| seconds.
  - soft_timeout: If None, the call is blocking. If set, yields None, None
    if no data is available within |soft_timeout| seconds. It resets itself
    after each yield.
  """
  if hard_timeout is not None:
    # hard_timeout=0 would kill the process before it even had a chance to
    # execute, so it is disallowed.
    assert isinstance(hard_timeout, (int, float)) and hard_timeout > 0., (
        hard_timeout)
  if soft_timeout is not None:
    # soft_timeout=0 effectively means the pipes are polled continuously.
    assert isinstance(soft_timeout, (int, float)) and soft_timeout >= 0, (
        soft_timeout)

  last_yield = time.time()
  while self.poll() is None:
    pipe_name, data = self.recv_any(
        maxsize=maxsize,
        timeout=self._calc_timeout(hard_timeout, soft_timeout, last_yield))
    # When a soft timeout is set, (None, None) is yielded so the caller can
    # observe the lack of progress.
    if data or soft_timeout is not None:
      yield pipe_name, data
      last_yield = time.time()

    if hard_timeout and self.duration() >= hard_timeout:
      break

  if self.poll() is None and hard_timeout:
    logging.debug('Kill %s %s', self.duration(), hard_timeout)
    self.kill()
  self.wait()

  # Drain whatever remains in the pipes now that the process exited.
  while True:
    pipe_name, data = self.recv_any(maxsize=maxsize)
    if not data:
      break
    yield pipe_name, data
+
+ def _calc_timeout(self, hard_timeout, soft_timeout, last_yield):
+ """Returns the timeout to be used on the next recv_any() in yield_any().
+
+ It depends on both timeout. It returns None if no timeout is used. Otherwise
+ it returns a value >= 0.001. It's not 0 because it's effectively polling, on
+ linux it can peg a single core, so adding 1ms sleep does a tremendous
+ difference.
+ """
+ hard_remaining = (
+ max(hard_timeout - self.duration(), 0) if hard_timeout else None)
+ soft_remaining = (
+ max(soft_timeout - (time.time() - last_yield), 0)
+ if soft_timeout is not None else None)
+ if hard_remaining is None:
+ return soft_remaining
+ if soft_remaining is None:
+ return hard_remaining
+ return min(hard_remaining, soft_remaining)
def recv_any(self, maxsize=None, timeout=None):
  """Reads from the first pipe available from stdout and stderr.

  Arguments:
  - maxsize: Maximum number of bytes to return. Defaults to MAX_SIZE.
  - timeout: If None, it is blocking. If 0 or above, will return None if no
    data is available within |timeout| seconds.

  Returns:
    tuple(str(pipe name) or None, str(data) or None).
  """
  # recv_multi_impl will early exit on a closed connection. Loop accordingly
  # to simplify call sites.
  while True:
    pipes = [
      x for x in ((self.stderr, 'stderr'), (self.stdout, 'stdout')) if x[0]
    ]
    # If both stdout and stderr have the exact same file handle, they are
    # effectively the same pipe. Deduplicate it since otherwise it confuses
    # recv_multi_impl().
    if len(pipes) == 2 and self.stderr.fileno() == self.stdout.fileno():
      pipes.pop(0)

    if not pipes:
      return None, None
    start = time.time()
    conns, names = zip(*pipes)
    index, data, closed = recv_multi_impl(conns, maxsize, timeout)
    if index is None:
      return index, data
    if closed:
      self._close(names[index])
      if not data:
        # Loop again. The other pipe may still be open.
        if timeout:
          # Deduct the time already spent waiting, clamped at 0 so a
          # negative value is never passed down: select.select() raises on
          # negative timeouts.
          timeout = max(0, timeout - (time.time() - start))
        continue

    if self.universal_newlines:
      data = self._translate_newlines(data)
    return names[index], data
def recv_out(self, maxsize=None, timeout=None):
  """Synchronously reads from stdout, honoring |maxsize| and |timeout|."""
  return self._recv('stdout', maxsize, timeout)
def recv_err(self, maxsize=None, timeout=None):
  """Synchronously reads from stderr, honoring |maxsize| and |timeout|."""
  return self._recv('stderr', maxsize, timeout)
def _close(self, which):
+ """Closes either stdout or stderr."""
getattr(self, which).close()
setattr(self, which, None)
def _recv(self, which, maxsize, timeout):
  """Reads from a single pipe ('stdout' or 'stderr') with a timeout.

  Returns None when the pipe is already gone or nothing was read; closes
  and forgets the pipe when recv_multi_impl() reports it closed.
  """
  conn = getattr(self, which)
  if conn is None:
    return None
  _, data, closed = recv_multi_impl([conn], maxsize, timeout)
  if closed:
    self._close(which)
  if data and self.universal_newlines:
    data = self._translate_newlines(data)
  return data
if timeout:
out = ''
err = ''
- for t, data in proc.yield_any(timeout):
+ for t, data in proc.yield_any(hard_timeout=timeout):
if t == 'stdout':
out += data
else: