odroid: remove CONFIG_DM_I2C_COMPAT config
[platform/kernel/u-boot.git] / tools / patman / cros_subprocess.py
1 # Copyright (c) 2012 The Chromium OS Authors.
2 # Use of this source code is governed by a BSD-style license that can be
3 # found in the LICENSE file.
4 #
5 # Copyright (c) 2003-2005 by Peter Astrand <astrand@lysator.liu.se>
6 # Licensed to PSF under a Contributor Agreement.
7 # See http://www.python.org/2.4/license for licensing details.
8
9 """Subprocress execution
10
11 This module holds a subclass of subprocess.Popen with our own required
12 features, mainly that we get access to the subprocess output while it
13 is running rather than just at the end. This makes it easiler to show
14 progress information and filter output in real time.
15 """
16
17 import errno
18 import os
19 import pty
20 import select
21 import subprocess
22 import sys
23 import unittest
24
25
26 # Import these here so the caller does not need to import subprocess also.
27 PIPE = subprocess.PIPE
28 STDOUT = subprocess.STDOUT
29 PIPE_PTY = -3     # Pipe output through a pty
30 stay_alive = True
31
32
33 class Popen(subprocess.Popen):
34     """Like subprocess.Popen with ptys and incremental output
35
36     This class deals with running a child process and filtering its output on
37     both stdout and stderr while it is running. We do this so we can monitor
38     progress, and possibly relay the output to the user if requested.
39
40     The class is similar to subprocess.Popen, the equivalent is something like:
41
42         Popen(args, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
43
44     But this class has many fewer features, and two enhancement:
45
46     1. Rather than getting the output data only at the end, this class sends it
47          to a provided operation as it arrives.
48     2. We use pseudo terminals so that the child will hopefully flush its output
49          to us as soon as it is produced, rather than waiting for the end of a
50          line.
51
52     Use CommunicateFilter() to handle output from the subprocess.
53
54     """
55
56     def __init__(self, args, stdin=None, stdout=PIPE_PTY, stderr=PIPE_PTY,
57                  shell=False, cwd=None, env=None, **kwargs):
58         """Cut-down constructor
59
60         Args:
61             args: Program and arguments for subprocess to execute.
62             stdin: See subprocess.Popen()
63             stdout: See subprocess.Popen(), except that we support the sentinel
64                     value of cros_subprocess.PIPE_PTY.
65             stderr: See subprocess.Popen(), except that we support the sentinel
66                     value of cros_subprocess.PIPE_PTY.
67             shell: See subprocess.Popen()
68             cwd: Working directory to change to for subprocess, or None if none.
69             env: Environment to use for this subprocess, or None to inherit parent.
70             kwargs: No other arguments are supported at the moment.    Passing other
71                     arguments will cause a ValueError to be raised.
72         """
73         stdout_pty = None
74         stderr_pty = None
75
76         if stdout == PIPE_PTY:
77             stdout_pty = pty.openpty()
78             stdout = os.fdopen(stdout_pty[1])
79         if stderr == PIPE_PTY:
80             stderr_pty = pty.openpty()
81             stderr = os.fdopen(stderr_pty[1])
82
83         super(Popen, self).__init__(args, stdin=stdin,
84                 stdout=stdout, stderr=stderr, shell=shell, cwd=cwd, env=env,
85                 **kwargs)
86
87         # If we're on a PTY, we passed the slave half of the PTY to the subprocess.
88         # We want to use the master half on our end from now on.    Setting this here
89         # does make some assumptions about the implementation of subprocess, but
90         # those assumptions are pretty minor.
91
92         # Note that if stderr is STDOUT, then self.stderr will be set to None by
93         # this constructor.
94         if stdout_pty is not None:
95             self.stdout = os.fdopen(stdout_pty[0])
96         if stderr_pty is not None:
97             self.stderr = os.fdopen(stderr_pty[0])
98
99         # Insist that unit tests exist for other arguments we don't support.
100         if kwargs:
101             raise ValueError("Unit tests do not test extra args - please add tests")
102
103     def CommunicateFilter(self, output):
104         """Interact with process: Read data from stdout and stderr.
105
106         This method runs until end-of-file is reached, then waits for the
107         subprocess to terminate.
108
109         The output function is sent all output from the subprocess and must be
110         defined like this:
111
112             def Output([self,] stream, data)
113             Args:
114                 stream: the stream the output was received on, which will be
115                         sys.stdout or sys.stderr.
116                 data: a string containing the data
117
118         Note: The data read is buffered in memory, so do not use this
119         method if the data size is large or unlimited.
120
121         Args:
122             output: Function to call with each fragment of output.
123
124         Returns:
125             A tuple (stdout, stderr, combined) which is the data received on
126             stdout, stderr and the combined data (interleaved stdout and stderr).
127
128             Note that the interleaved output will only be sensible if you have
129             set both stdout and stderr to PIPE or PIPE_PTY. Even then it depends on
130             the timing of the output in the subprocess. If a subprocess flips
131             between stdout and stderr quickly in succession, by the time we come to
132             read the output from each we may see several lines in each, and will read
133             all the stdout lines, then all the stderr lines. So the interleaving
134             may not be correct. In this case you might want to pass
135             stderr=cros_subprocess.STDOUT to the constructor.
136
137             This feature is still useful for subprocesses where stderr is
138             rarely used and indicates an error.
139
140             Note also that if you set stderr to STDOUT, then stderr will be empty
141             and the combined output will just be the same as stdout.
142         """
143
144         read_set = []
145         write_set = []
146         stdout = None # Return
147         stderr = None # Return
148
149         if self.stdin:
150             # Flush stdio buffer.    This might block, if the user has
151             # been writing to .stdin in an uncontrolled fashion.
152             self.stdin.flush()
153             if input:
154                 write_set.append(self.stdin)
155             else:
156                 self.stdin.close()
157         if self.stdout:
158             read_set.append(self.stdout)
159             stdout = []
160         if self.stderr and self.stderr != self.stdout:
161             read_set.append(self.stderr)
162             stderr = []
163         combined = []
164
165         input_offset = 0
166         while read_set or write_set:
167             try:
168                 rlist, wlist, _ = select.select(read_set, write_set, [], 0.2)
169             except select.error as e:
170                 if e.args[0] == errno.EINTR:
171                     continue
172                 raise
173
174             if not stay_alive:
175                     self.terminate()
176
177             if self.stdin in wlist:
178                 # When select has indicated that the file is writable,
179                 # we can write up to PIPE_BUF bytes without risk
180                 # blocking.    POSIX defines PIPE_BUF >= 512
181                 chunk = input[input_offset : input_offset + 512]
182                 bytes_written = os.write(self.stdin.fileno(), chunk)
183                 input_offset += bytes_written
184                 if input_offset >= len(input):
185                     self.stdin.close()
186                     write_set.remove(self.stdin)
187
188             if self.stdout in rlist:
189                 data = ""
190                 # We will get an error on read if the pty is closed
191                 try:
192                     data = os.read(self.stdout.fileno(), 1024)
193                     if isinstance(data, bytes):
194                         data = data.decode('utf-8')
195                 except OSError:
196                     pass
197                 if data == "":
198                     self.stdout.close()
199                     read_set.remove(self.stdout)
200                 else:
201                     stdout.append(data)
202                     combined.append(data)
203                     if output:
204                         output(sys.stdout, data)
205             if self.stderr in rlist:
206                 data = ""
207                 # We will get an error on read if the pty is closed
208                 try:
209                     data = os.read(self.stderr.fileno(), 1024)
210                     if isinstance(data, bytes):
211                         data = data.decode('utf-8')
212                 except OSError:
213                     pass
214                 if data == "":
215                     self.stderr.close()
216                     read_set.remove(self.stderr)
217                 else:
218                     stderr.append(data)
219                     combined.append(data)
220                     if output:
221                         output(sys.stderr, data)
222
223         # All data exchanged.    Translate lists into strings.
224         if stdout is not None:
225             stdout = ''.join(stdout)
226         else:
227             stdout = ''
228         if stderr is not None:
229             stderr = ''.join(stderr)
230         else:
231             stderr = ''
232         combined = ''.join(combined)
233
234         # Translate newlines, if requested.    We cannot let the file
235         # object do the translation: It is based on stdio, which is
236         # impossible to combine with select (unless forcing no
237         # buffering).
238         if self.universal_newlines and hasattr(file, 'newlines'):
239             if stdout:
240                 stdout = self._translate_newlines(stdout)
241             if stderr:
242                 stderr = self._translate_newlines(stderr)
243
244         self.wait()
245         return (stdout, stderr, combined)
246
247
248 # Just being a unittest.TestCase gives us 14 public methods.    Unless we
249 # disable this, we can only have 6 tests in a TestCase.    That's not enough.
250 #
251 # pylint: disable=R0904
252
253 class TestSubprocess(unittest.TestCase):
254     """Our simple unit test for this module"""
255
256     class MyOperation:
257         """Provides a operation that we can pass to Popen"""
258         def __init__(self, input_to_send=None):
259             """Constructor to set up the operation and possible input.
260
261             Args:
262                 input_to_send: a text string to send when we first get input. We will
263                     add \r\n to the string.
264             """
265             self.stdout_data = ''
266             self.stderr_data = ''
267             self.combined_data = ''
268             self.stdin_pipe = None
269             self._input_to_send = input_to_send
270             if input_to_send:
271                 pipe = os.pipe()
272                 self.stdin_read_pipe = pipe[0]
273                 self._stdin_write_pipe = os.fdopen(pipe[1], 'w')
274
275         def Output(self, stream, data):
276             """Output handler for Popen. Stores the data for later comparison"""
277             if stream == sys.stdout:
278                 self.stdout_data += data
279             if stream == sys.stderr:
280                 self.stderr_data += data
281             self.combined_data += data
282
283             # Output the input string if we have one.
284             if self._input_to_send:
285                 self._stdin_write_pipe.write(self._input_to_send + '\r\n')
286                 self._stdin_write_pipe.flush()
287
288     def _BasicCheck(self, plist, oper):
289         """Basic checks that the output looks sane."""
290         self.assertEqual(plist[0], oper.stdout_data)
291         self.assertEqual(plist[1], oper.stderr_data)
292         self.assertEqual(plist[2], oper.combined_data)
293
294         # The total length of stdout and stderr should equal the combined length
295         self.assertEqual(len(plist[0]) + len(plist[1]), len(plist[2]))
296
297     def test_simple(self):
298         """Simple redirection: Get process list"""
299         oper = TestSubprocess.MyOperation()
300         plist = Popen(['ps']).CommunicateFilter(oper.Output)
301         self._BasicCheck(plist, oper)
302
303     def test_stderr(self):
304         """Check stdout and stderr"""
305         oper = TestSubprocess.MyOperation()
306         cmd = 'echo fred >/dev/stderr && false || echo bad'
307         plist = Popen([cmd], shell=True).CommunicateFilter(oper.Output)
308         self._BasicCheck(plist, oper)
309         self.assertEqual(plist [0], 'bad\r\n')
310         self.assertEqual(plist [1], 'fred\r\n')
311
312     def test_shell(self):
313         """Check with and without shell works"""
314         oper = TestSubprocess.MyOperation()
315         cmd = 'echo test >/dev/stderr'
316         self.assertRaises(OSError, Popen, [cmd], shell=False)
317         plist = Popen([cmd], shell=True).CommunicateFilter(oper.Output)
318         self._BasicCheck(plist, oper)
319         self.assertEqual(len(plist [0]), 0)
320         self.assertEqual(plist [1], 'test\r\n')
321
322     def test_list_args(self):
323         """Check with and without shell works using list arguments"""
324         oper = TestSubprocess.MyOperation()
325         cmd = ['echo', 'test', '>/dev/stderr']
326         plist = Popen(cmd, shell=False).CommunicateFilter(oper.Output)
327         self._BasicCheck(plist, oper)
328         self.assertEqual(plist [0], ' '.join(cmd[1:]) + '\r\n')
329         self.assertEqual(len(plist [1]), 0)
330
331         oper = TestSubprocess.MyOperation()
332
333         # this should be interpreted as 'echo' with the other args dropped
334         cmd = ['echo', 'test', '>/dev/stderr']
335         plist = Popen(cmd, shell=True).CommunicateFilter(oper.Output)
336         self._BasicCheck(plist, oper)
337         self.assertEqual(plist [0], '\r\n')
338
339     def test_cwd(self):
340         """Check we can change directory"""
341         for shell in (False, True):
342             oper = TestSubprocess.MyOperation()
343             plist = Popen('pwd', shell=shell, cwd='/tmp').CommunicateFilter(oper.Output)
344             self._BasicCheck(plist, oper)
345             self.assertEqual(plist [0], '/tmp\r\n')
346
347     def test_env(self):
348         """Check we can change environment"""
349         for add in (False, True):
350             oper = TestSubprocess.MyOperation()
351             env = os.environ
352             if add:
353                 env ['FRED'] = 'fred'
354             cmd = 'echo $FRED'
355             plist = Popen(cmd, shell=True, env=env).CommunicateFilter(oper.Output)
356             self._BasicCheck(plist, oper)
357             self.assertEqual(plist [0], add and 'fred\r\n' or '\r\n')
358
359     def test_extra_args(self):
360         """Check we can't add extra arguments"""
361         self.assertRaises(ValueError, Popen, 'true', close_fds=False)
362
363     def test_basic_input(self):
364         """Check that incremental input works
365
366         We set up a subprocess which will prompt for name. When we see this prompt
367         we send the name as input to the process. It should then print the name
368         properly to stdout.
369         """
370         oper = TestSubprocess.MyOperation('Flash')
371         prompt = 'What is your name?: '
372         cmd = 'echo -n "%s"; read name; echo Hello $name' % prompt
373         plist = Popen([cmd], stdin=oper.stdin_read_pipe,
374                 shell=True).CommunicateFilter(oper.Output)
375         self._BasicCheck(plist, oper)
376         self.assertEqual(len(plist [1]), 0)
377         self.assertEqual(plist [0], prompt + 'Hello Flash\r\r\n')
378
379     def test_isatty(self):
380         """Check that ptys appear as terminals to the subprocess"""
381         oper = TestSubprocess.MyOperation()
382         cmd = ('if [ -t %d ]; then echo "terminal %d" >&%d; '
383                 'else echo "not %d" >&%d; fi;')
384         both_cmds = ''
385         for fd in (1, 2):
386             both_cmds += cmd % (fd, fd, fd, fd, fd)
387         plist = Popen(both_cmds, shell=True).CommunicateFilter(oper.Output)
388         self._BasicCheck(plist, oper)
389         self.assertEqual(plist [0], 'terminal 1\r\n')
390         self.assertEqual(plist [1], 'terminal 2\r\n')
391
392         # Now try with PIPE and make sure it is not a terminal
393         oper = TestSubprocess.MyOperation()
394         plist = Popen(both_cmds, stdout=subprocess.PIPE, stderr=subprocess.PIPE,
395                 shell=True).CommunicateFilter(oper.Output)
396         self._BasicCheck(plist, oper)
397         self.assertEqual(plist [0], 'not 1\n')
398         self.assertEqual(plist [1], 'not 2\n')
399
400 if __name__ == '__main__':
401     unittest.main()