Upstream version 10.39.225.0
[platform/framework/web/crosswalk.git] / src / tools / swarming_client / utils / subprocess42.py
1 # Copyright 2013 The Swarming Authors. All rights reserved.
2 # Use of this source code is governed under the Apache License, Version 2.0 that
3 # can be found in the LICENSE file.
4
5 """subprocess42 is the answer to life the universe and everything.
6
7 It has the particularity of having a Popen implementation that can yield output
8 as it is produced while implementing a timeout and not requiring the use of
9 worker threads.
10
11 TODO(maruel): Add VOID and TIMED_OUT support like subprocess2.
12 """
13
14 import logging
15 import os
16 import time
17
18 import subprocess
19
20 from subprocess import PIPE, STDOUT, call, check_output  # pylint: disable=W0611
21
22
23 # Default maxsize argument.
24 MAX_SIZE = 16384
25
26
27 if subprocess.mswindows:
28   import msvcrt  # pylint: disable=F0401
29   from ctypes import wintypes
30   from ctypes import windll
31
32   def ReadFile(handle, desired_bytes):
33     """Calls kernel32.ReadFile()."""
34     c_read = wintypes.DWORD()
35     buff = wintypes.create_string_buffer(desired_bytes+1)
36     windll.kernel32.ReadFile(
37         handle, buff, desired_bytes, wintypes.byref(c_read), None)
38     # NULL terminate it.
39     buff[c_read.value] = '\x00'
40     return wintypes.GetLastError(), buff.value
41
42   def PeekNamedPipe(handle):
43     """Calls kernel32.PeekNamedPipe(). Simplified version."""
44     c_avail = wintypes.DWORD()
45     c_message = wintypes.DWORD()
46     success = windll.kernel32.PeekNamedPipe(
47         handle, None, 0, None, wintypes.byref(c_avail),
48         wintypes.byref(c_message))
49     if not success:
50       raise OSError(wintypes.GetLastError())
51     return c_avail.value
52
53   def recv_multi_impl(conns, maxsize, timeout):
54     """Reads from the first available pipe.
55
56     It will immediately return on a closed connection, independent of timeout.
57
58     Arguments:
59     - maxsize: Maximum number of bytes to return. Defaults to MAX_SIZE.
60     - timeout: If None, it is blocking. If 0 or above, will return None if no
61           data is available within |timeout| seconds.
62
63     Returns:
64       tuple(int(index), str(data), bool(closed)).
65     """
66     assert conns
67     assert timeout is None or isinstance(timeout, (int, float)), timeout
68     maxsize = max(maxsize or MAX_SIZE, 1)
69
70     # TODO(maruel): Use WaitForMultipleObjects(). Python creates anonymous pipes
71     # for proc.stdout and proc.stderr but they are implemented as named pipes on
72     # Windows. Since named pipes are not waitable object, they can't be passed
73     # as-is to WFMO(). So this means N times CreateEvent(), N times ReadFile()
74     # and finally WFMO(). This requires caching the events handles in the Popen
75     # object and remembering the pending ReadFile() calls. This will require
76     # some re-architecture to store the relevant event handle and OVERLAPPEDIO
77     # object in Popen or the file object.
78     start = time.time()
79     handles = [
80       (i, msvcrt.get_osfhandle(c.fileno())) for i, c in enumerate(conns)
81     ]
82     while True:
83       for index, handle in handles:
84         try:
85           avail = min(PeekNamedPipe(handle), maxsize)
86           if avail:
87             return index, ReadFile(handle, avail)[1], False
88         except OSError:
89           # The pipe closed.
90           return index, None, True
91
92       if timeout is not None and (time.time() - start) >= timeout:
93         return None, None, False
94       # Polling rocks.
95       time.sleep(0.001)
96
97 else:
98   import fcntl  # pylint: disable=F0401
99   import select
100
101   def recv_multi_impl(conns, maxsize, timeout):
102     """Reads from the first available pipe.
103
104     It will immediately return on a closed connection, independent of timeout.
105
106     Arguments:
107     - maxsize: Maximum number of bytes to return. Defaults to MAX_SIZE.
108     - timeout: If None, it is blocking. If 0 or above, will return None if no
109           data is available within |timeout| seconds.
110
111     Returns:
112       tuple(int(index), str(data), bool(closed)).
113     """
114     assert conns
115     assert timeout is None or isinstance(timeout, (int, float)), timeout
116     maxsize = max(maxsize or MAX_SIZE, 1)
117
118     # select(timeout=0) will block, it has to be a value > 0.
119     if timeout == 0:
120       timeout = 0.001
121     try:
122       r, _, _ = select.select(conns, [], [], timeout)
123     except select.error:
124       r = None
125     if not r:
126       return None, None, False
127
128     conn = r[0]
129     # Temporarily make it non-blocking.
130     # TODO(maruel): This is not very ifficient when the caller is doing this in
131     # a loop. Add a mechanism to have the caller handle this.
132     flags = fcntl.fcntl(conn, fcntl.F_GETFL)
133     if not conn.closed:
134       # pylint: disable=E1101
135       fcntl.fcntl(conn, fcntl.F_SETFL, flags | os.O_NONBLOCK)
136     try:
137       data = conn.read(maxsize)
138       if not data:
139         # On posix, this means the channel closed.
140         return conns.index(conn), None, True
141       return conns.index(conn), data, False
142     finally:
143       if not conn.closed:
144         fcntl.fcntl(conn, fcntl.F_SETFL, flags)
145
146
147 class Popen(subprocess.Popen):
148   """Adds timeout support on stdout and stderr.
149
150   Inspired by
151   http://code.activestate.com/recipes/440554-module-to-allow-asynchronous-subprocess-use-on-win/
152   """
153   def __init__(self, *args, **kwargs):
154     self.start = time.time()
155     self.end = None
156     super(Popen, self).__init__(*args, **kwargs)
157
158   def duration(self):
159     """Duration of the child process.
160
161     It is greater or equal to the actual time the child process ran. It can be
162     significantly higher than the real value if neither .wait() nor .poll() was
163     used.
164     """
165     return (self.end or time.time()) - self.start
166
167   def wait(self):
168     ret = super(Popen, self).wait()
169     if not self.end:
170       # communicate() uses wait() internally.
171       self.end = time.time()
172     return ret
173
174   def poll(self):
175     ret = super(Popen, self).poll()
176     if ret is not None and not self.end:
177       self.end = time.time()
178     return ret
179
180   def yield_any(self, maxsize=None, hard_timeout=None, soft_timeout=None):
181     """Yields output until the process terminates or is killed by a timeout.
182
183     Yielded values are in the form (pipename, data).
184
185     Arguments:
186     - maxsize: See recv_any().
187     - hard_timeout: If None, the process is never killed. If set, the process is
188           killed after |hard_timeout| seconds.
189     - soft_timeout: If None, the call is blocking. If set, yields None, None
190           if no data is available within |soft_timeout| seconds. It resets
191           itself after each yield.
192     """
193     if hard_timeout is not None:
194       # hard_timeout=0 means the process is not even given a little chance to
195       # execute and will be immediately killed.
196       assert isinstance(hard_timeout, (int, float)) and hard_timeout > 0., (
197           hard_timeout)
198     if soft_timeout is not None:
199       # soft_timeout=0 effectively means that the pipe is continuously polled.
200       assert isinstance(soft_timeout, (int, float)) and soft_timeout >= 0, (
201           soft_timeout)
202
203     last_yield = time.time()
204     while self.poll() is None:
205       t, data = self.recv_any(
206           maxsize=maxsize,
207           timeout=self._calc_timeout(hard_timeout, soft_timeout, last_yield))
208       if data or soft_timeout is not None:
209         yield t, data
210         last_yield = time.time()
211
212       if hard_timeout and self.duration() >= hard_timeout:
213         break
214
215     if self.poll() is None and hard_timeout:
216       logging.debug('Kill %s %s', self.duration(), hard_timeout)
217       self.kill()
218     self.wait()
219
220     # Read all remaining output in the pipes.
221     while True:
222       t, data = self.recv_any(maxsize=maxsize)
223       if not data:
224         break
225       yield t, data
226
227   def _calc_timeout(self, hard_timeout, soft_timeout, last_yield):
228     """Returns the timeout to be used on the next recv_any() in yield_any().
229
230     It depends on both timeout. It returns None if no timeout is used. Otherwise
231     it returns a value >= 0.001. It's not 0 because it's effectively polling, on
232     linux it can peg a single core, so adding 1ms sleep does a tremendous
233     difference.
234     """
235     hard_remaining = (
236         max(hard_timeout - self.duration(), 0) if hard_timeout else None)
237     soft_remaining = (
238         max(soft_timeout - (time.time() - last_yield), 0)
239         if soft_timeout is not None else None)
240     if hard_remaining is None:
241       return soft_remaining
242     if soft_remaining is None:
243       return hard_remaining
244     return min(hard_remaining, soft_remaining)
245
246   def recv_any(self, maxsize=None, timeout=None):
247     """Reads from the first pipe available from stdout and stderr.
248
249     Arguments:
250     - maxsize: Maximum number of bytes to return. Defaults to MAX_SIZE.
251     - timeout: If None, it is blocking. If 0 or above, will return None if no
252           data is available within |timeout| seconds.
253
254     Returns:
255       tuple(int(index) or None, str(data)).
256     """
257     # recv_multi_impl will early exit on a closed connection. Loop accordingly
258     # to simplify call sites.
259     while True:
260       pipes = [
261         x for x in ((self.stderr, 'stderr'), (self.stdout, 'stdout')) if x[0]
262       ]
263       # If both stdout and stderr have the exact file handle, they are
264       # effectively the same pipe. Deduplicate it since otherwise it confuses
265       # recv_multi_impl().
266       if len(pipes) == 2 and self.stderr.fileno() == self.stdout.fileno():
267         pipes.pop(0)
268
269       if not pipes:
270         return None, None
271       start = time.time()
272       conns, names = zip(*pipes)
273       index, data, closed = recv_multi_impl(conns, maxsize, timeout)
274       if index is None:
275         return index, data
276       if closed:
277         self._close(names[index])
278         if not data:
279           # Loop again. The other pipe may still be open.
280           if timeout:
281             timeout -= (time.time() - start)
282           continue
283
284       if self.universal_newlines:
285         data = self._translate_newlines(data)
286       return names[index], data
287
288   def recv_out(self, maxsize=None, timeout=None):
289     """Reads from stdout synchronously with timeout."""
290     return self._recv('stdout', maxsize, timeout)
291
292   def recv_err(self, maxsize=None, timeout=None):
293     """Reads from stderr synchronously with timeout."""
294     return self._recv('stderr', maxsize, timeout)
295
296   def _close(self, which):
297     """Closes either stdout or stderr."""
298     getattr(self, which).close()
299     setattr(self, which, None)
300
301   def _recv(self, which, maxsize, timeout):
302     """Reads from one of stdout or stderr synchronously with timeout."""
303     conn = getattr(self, which)
304     if conn is None:
305       return None
306     _, data, closed = recv_multi_impl([conn], maxsize, timeout)
307     if closed:
308       self._close(which)
309     if self.universal_newlines and data:
310       data = self._translate_newlines(data)
311     return data
312
313
314 def call_with_timeout(cmd, timeout, **kwargs):
315   """Runs an executable with an optional timeout.
316
317   timeout 0 or None disables the timeout.
318   """
319   proc = Popen(
320       cmd,
321       stdin=subprocess.PIPE,
322       stdout=subprocess.PIPE,
323       **kwargs)
324   if timeout:
325     out = ''
326     err = ''
327     for t, data in proc.yield_any(hard_timeout=timeout):
328       if t == 'stdout':
329         out += data
330       else:
331         err += data
332   else:
333     # This code path is much faster.
334     out, err = proc.communicate()
335   return out, err, proc.returncode, proc.duration()