1 # Copyright 2013 The Swarming Authors. All rights reserved.
2 # Use of this source code is governed under the Apache License, Version 2.0 that
3 # can be found in the LICENSE file.
5 """subprocess42 is the answer to life the universe and everything.
7 It has the particularity of having a Popen implementation that can yield output
8 as it is produced while implementing a timeout and not requiring the use of
11 TODO(maruel): Add VOID and TIMED_OUT support like subprocess2.
20 from subprocess import PIPE, STDOUT, call, check_output # pylint: disable=W0611
23 # Default maxsize argument.
27 if subprocess.mswindows:
28 import msvcrt # pylint: disable=F0401
29 from ctypes import wintypes
30 from ctypes import windll
32 def ReadFile(handle, desired_bytes):
33 """Calls kernel32.ReadFile()."""
34 c_read = wintypes.DWORD()
35 buff = wintypes.create_string_buffer(desired_bytes+1)
36 windll.kernel32.ReadFile(
37 handle, buff, desired_bytes, wintypes.byref(c_read), None)
39 buff[c_read.value] = '\x00'
40 return wintypes.GetLastError(), buff.value
42 def PeekNamedPipe(handle):
43 """Calls kernel32.PeekNamedPipe(). Simplified version."""
44 c_avail = wintypes.DWORD()
45 c_message = wintypes.DWORD()
46 success = windll.kernel32.PeekNamedPipe(
47 handle, None, 0, None, wintypes.byref(c_avail),
48 wintypes.byref(c_message))
50 raise OSError(wintypes.GetLastError())
53 def recv_multi_impl(conns, maxsize, timeout):
54 """Reads from the first available pipe.
56 It will immediately return on a closed connection, independent of timeout.
59 - maxsize: Maximum number of bytes to return. Defaults to MAX_SIZE.
60 - timeout: If None, it is blocking. If 0 or above, will return None if no
61 data is available within |timeout| seconds.
64 tuple(int(index), str(data), bool(closed)).
67 assert timeout is None or isinstance(timeout, (int, float)), timeout
68 maxsize = max(maxsize or MAX_SIZE, 1)
70 # TODO(maruel): Use WaitForMultipleObjects(). Python creates anonymous pipes
71 # for proc.stdout and proc.stderr but they are implemented as named pipes on
72 # Windows. Since named pipes are not waitable objects, they can't be passed
73 # as-is to WFMO(). So this means N times CreateEvent(), N times ReadFile()
74 # and finally WFMO(). This requires caching the events handles in the Popen
75 # object and remembering the pending ReadFile() calls. This will require
76 # some re-architecture to store the relevant event handle and OVERLAPPEDIO
77 # object in Popen or the file object.
80 (i, msvcrt.get_osfhandle(c.fileno())) for i, c in enumerate(conns)
83 for index, handle in handles:
85 avail = min(PeekNamedPipe(handle), maxsize)
87 return index, ReadFile(handle, avail)[1], False
90 return index, None, True
92 if timeout is not None and (time.time() - start) >= timeout:
93 return None, None, False
98 import fcntl # pylint: disable=F0401
101 def recv_multi_impl(conns, maxsize, timeout):
102 """Reads from the first available pipe.
104 It will immediately return on a closed connection, independent of timeout.
107 - maxsize: Maximum number of bytes to return. Defaults to MAX_SIZE.
108 - timeout: If None, it is blocking. If 0 or above, will return None if no
109 data is available within |timeout| seconds.
112 tuple(int(index), str(data), bool(closed)).
115 assert timeout is None or isinstance(timeout, (int, float)), timeout
116 maxsize = max(maxsize or MAX_SIZE, 1)
118 # select(timeout=0) will block, it has to be a value > 0.
122 r, _, _ = select.select(conns, [], [], timeout)
126 return None, None, False
129 # Temporarily make it non-blocking.
130 # TODO(maruel): This is not very efficient when the caller is doing this in
131 # a loop. Add a mechanism to have the caller handle this.
132 flags = fcntl.fcntl(conn, fcntl.F_GETFL)
134 # pylint: disable=E1101
135 fcntl.fcntl(conn, fcntl.F_SETFL, flags | os.O_NONBLOCK)
137 data = conn.read(maxsize)
139 # On posix, this means the channel closed.
140 return conns.index(conn), None, True
141 return conns.index(conn), data, False
144 fcntl.fcntl(conn, fcntl.F_SETFL, flags)
147 class Popen(subprocess.Popen):
148 """Adds timeout support on stdout and stderr.
151 http://code.activestate.com/recipes/440554-module-to-allow-asynchronous-subprocess-use-on-win/
153 def __init__(self, *args, **kwargs):
154 self.start = time.time()
156 super(Popen, self).__init__(*args, **kwargs)
159 """Duration of the child process.
161 It is greater than or equal to the actual time the child process ran. It can be
162 significantly higher than the real value if neither .wait() nor .poll() was
165 return (self.end or time.time()) - self.start
168 ret = super(Popen, self).wait()
170 # communicate() uses wait() internally.
171 self.end = time.time()
175 ret = super(Popen, self).poll()
176 if ret is not None and not self.end:
177 self.end = time.time()
180 def yield_any(self, maxsize=None, hard_timeout=None, soft_timeout=None):
181 """Yields output until the process terminates or is killed by a timeout.
183 Yielded values are in the form (pipename, data).
186 - maxsize: See recv_any().
187 - hard_timeout: If None, the process is never killed. If set, the process is
188 killed after |hard_timeout| seconds.
189 - soft_timeout: If None, the call is blocking. If set, yields None, None
190 if no data is available within |soft_timeout| seconds. It resets
191 itself after each yield.
193 if hard_timeout is not None:
194 # hard_timeout=0 means the process is not even given a little chance to
195 # execute and will be immediately killed.
196 assert isinstance(hard_timeout, (int, float)) and hard_timeout > 0., (
198 if soft_timeout is not None:
199 # soft_timeout=0 effectively means that the pipe is continuously polled.
200 assert isinstance(soft_timeout, (int, float)) and soft_timeout >= 0, (
203 last_yield = time.time()
204 while self.poll() is None:
205 t, data = self.recv_any(
207 timeout=self._calc_timeout(hard_timeout, soft_timeout, last_yield))
208 if data or soft_timeout is not None:
210 last_yield = time.time()
212 if hard_timeout and self.duration() >= hard_timeout:
215 if self.poll() is None and hard_timeout:
216 logging.debug('Kill %s %s', self.duration(), hard_timeout)
220 # Read all remaining output in the pipes.
222 t, data = self.recv_any(maxsize=maxsize)
227 def _calc_timeout(self, hard_timeout, soft_timeout, last_yield):
228 """Returns the timeout to be used on the next recv_any() in yield_any().
230 It depends on both timeouts. It returns None if no timeout is used. Otherwise
231 it returns a value >= 0.001. It's not 0 because it's effectively polling, on
232 linux it can peg a single core, so adding 1ms sleep does a tremendous
236 max(hard_timeout - self.duration(), 0) if hard_timeout else None)
238 max(soft_timeout - (time.time() - last_yield), 0)
239 if soft_timeout is not None else None)
240 if hard_remaining is None:
241 return soft_remaining
242 if soft_remaining is None:
243 return hard_remaining
244 return min(hard_remaining, soft_remaining)
246 def recv_any(self, maxsize=None, timeout=None):
247 """Reads from the first pipe available from stdout and stderr.
250 - maxsize: Maximum number of bytes to return. Defaults to MAX_SIZE.
251 - timeout: If None, it is blocking. If 0 or above, will return None if no
252 data is available within |timeout| seconds.
255 tuple(str(pipename) or None, str(data)).
257 # recv_multi_impl will early exit on a closed connection. Loop accordingly
258 # to simplify call sites.
261 x for x in ((self.stderr, 'stderr'), (self.stdout, 'stdout')) if x[0]
263 # If both stdout and stderr have the exact file handle, they are
264 # effectively the same pipe. Deduplicate it since otherwise it confuses
266 if len(pipes) == 2 and self.stderr.fileno() == self.stdout.fileno():
272 conns, names = zip(*pipes)
273 index, data, closed = recv_multi_impl(conns, maxsize, timeout)
277 self._close(names[index])
279 # Loop again. The other pipe may still be open.
281 timeout -= (time.time() - start)
284 if self.universal_newlines:
285 data = self._translate_newlines(data)
286 return names[index], data
def recv_out(self, maxsize=None, timeout=None):
    """Reads from the stdout pipe synchronously, honoring |timeout|.

    Thin wrapper: |maxsize| and |timeout| are forwarded unchanged to _recv()
    with the 'stdout' pipe selected, and its result is returned as-is.
    """
    pipe_name = 'stdout'
    return self._recv(pipe_name, maxsize, timeout)
def recv_err(self, maxsize=None, timeout=None):
    """Reads from the stderr pipe synchronously, honoring |timeout|.

    Thin wrapper: |maxsize| and |timeout| are forwarded unchanged to _recv()
    with the 'stderr' pipe selected, and its result is returned as-is.
    """
    pipe_name = 'stderr'
    return self._recv(pipe_name, maxsize, timeout)
296 def _close(self, which):
297 """Closes either stdout or stderr."""
298 getattr(self, which).close()
299 setattr(self, which, None)
301 def _recv(self, which, maxsize, timeout):
302 """Reads from one of stdout or stderr synchronously with timeout."""
303 conn = getattr(self, which)
306 _, data, closed = recv_multi_impl([conn], maxsize, timeout)
309 if self.universal_newlines and data:
310 data = self._translate_newlines(data)
314 def call_with_timeout(cmd, timeout, **kwargs):
315 """Runs an executable with an optional timeout.
317 timeout 0 or None disables the timeout.
321 stdin=subprocess.PIPE,
322 stdout=subprocess.PIPE,
327 for t, data in proc.yield_any(hard_timeout=timeout):
333 # This code path is much faster.
334 out, err = proc.communicate()
335 return out, err, proc.returncode, proc.duration()