Merge tag 'v2022.01-rc3' into next
[platform/kernel/u-boot.git] / test / py / u_boot_utils.py
1 # SPDX-License-Identifier: GPL-2.0
2 # Copyright (c) 2016, NVIDIA CORPORATION. All rights reserved.
3
4 """
5 Utility code shared across multiple tests.
6 """
7
8 import hashlib
9 import inspect
10 import os
11 import os.path
12 import pathlib
13 import signal
14 import sys
15 import time
16 import re
17 import pytest
18
19 def md5sum_data(data):
20     """Calculate the MD5 hash of some data.
21
22     Args:
23         data: The data to hash.
24
25     Returns:
26         The hash of the data, as a binary string.
27     """
28
29     h = hashlib.md5()
30     h.update(data)
31     return h.digest()
32
33 def md5sum_file(fn, max_length=None):
34     """Calculate the MD5 hash of the contents of a file.
35
36     Args:
37         fn: The filename of the file to hash.
38         max_length: The number of bytes to hash. If the file has more
39             bytes than this, they will be ignored. If None or omitted, the
40             entire file will be hashed.
41
42     Returns:
43         The hash of the file content, as a binary string.
44     """
45
46     with open(fn, 'rb') as fh:
47         if max_length:
48             params = [max_length]
49         else:
50             params = []
51         data = fh.read(*params)
52     return md5sum_data(data)
53
54 class PersistentRandomFile:
55     """Generate and store information about a persistent file containing
56     random data."""
57
58     def __init__(self, u_boot_console, fn, size):
59         """Create or process the persistent file.
60
61         If the file does not exist, it is generated.
62
63         If the file does exist, its content is hashed for later comparison.
64
65         These files are always located in the "persistent data directory" of
66         the current test run.
67
68         Args:
69             u_boot_console: A console connection to U-Boot.
70             fn: The filename (without path) to create.
71             size: The desired size of the file in bytes.
72
73         Returns:
74             Nothing.
75         """
76
77         self.fn = fn
78
79         self.abs_fn = u_boot_console.config.persistent_data_dir + '/' + fn
80
81         if os.path.exists(self.abs_fn):
82             u_boot_console.log.action('Persistent data file ' + self.abs_fn +
83                 ' already exists')
84             self.content_hash = md5sum_file(self.abs_fn)
85         else:
86             u_boot_console.log.action('Generating ' + self.abs_fn +
87                 ' (random, persistent, %d bytes)' % size)
88             data = os.urandom(size)
89             with open(self.abs_fn, 'wb') as fh:
90                 fh.write(data)
91             self.content_hash = md5sum_data(data)
92
93 def attempt_to_open_file(fn):
94     """Attempt to open a file, without throwing exceptions.
95
96     Any errors (exceptions) that occur during the attempt to open the file
97     are ignored. This is useful in order to test whether a file (in
98     particular, a device node) exists and can be successfully opened, in order
99     to poll for e.g. USB enumeration completion.
100
101     Args:
102         fn: The filename to attempt to open.
103
104     Returns:
105         An open file handle to the file, or None if the file could not be
106             opened.
107     """
108
109     try:
110         return open(fn, 'rb')
111     except:
112         return None
113
114 def wait_until_open_succeeds(fn):
115     """Poll until a file can be opened, or a timeout occurs.
116
117     Continually attempt to open a file, and return when this succeeds, or
118     raise an exception after a timeout.
119
120     Args:
121         fn: The filename to attempt to open.
122
123     Returns:
124         An open file handle to the file.
125     """
126
127     for i in range(100):
128         fh = attempt_to_open_file(fn)
129         if fh:
130             return fh
131         time.sleep(0.1)
132     raise Exception('File could not be opened')
133
134 def wait_until_file_open_fails(fn, ignore_errors):
135     """Poll until a file cannot be opened, or a timeout occurs.
136
137     Continually attempt to open a file, and return when this fails, or
138     raise an exception after a timeout.
139
140     Args:
141         fn: The filename to attempt to open.
142         ignore_errors: Indicate whether to ignore timeout errors. If True, the
143             function will simply return if a timeout occurs, otherwise an
144             exception will be raised.
145
146     Returns:
147         Nothing.
148     """
149
150     for _ in range(100):
151         fh = attempt_to_open_file(fn)
152         if not fh:
153             return
154         fh.close()
155         time.sleep(0.1)
156     if ignore_errors:
157         return
158     raise Exception('File can still be opened')
159
160 def run_and_log(u_boot_console, cmd, ignore_errors=False, stdin=None):
161     """Run a command and log its output.
162
163     Args:
164         u_boot_console: A console connection to U-Boot.
165         cmd: The command to run, as an array of argv[], or a string.
166             If a string, note that it is split up so that quoted spaces
167             will not be preserved. E.g. "fred and" becomes ['"fred', 'and"']
168         ignore_errors: Indicate whether to ignore errors. If True, the function
169             will simply return if the command cannot be executed or exits with
170             an error code, otherwise an exception will be raised if such
171             problems occur.
172         stdin: Input string to pass to the command as stdin (or None)
173
174     Returns:
175         The output as a string.
176     """
177     if isinstance(cmd, str):
178         cmd = cmd.split()
179     runner = u_boot_console.log.get_runner(cmd[0], sys.stdout)
180     output = runner.run(cmd, ignore_errors=ignore_errors, stdin=stdin)
181     runner.close()
182     return output
183
184 def run_and_log_expect_exception(u_boot_console, cmd, retcode, msg):
185     """Run a command that is expected to fail.
186
187     This runs a command and checks that it fails with the expected return code
188     and exception method. If not, an exception is raised.
189
190     Args:
191         u_boot_console: A console connection to U-Boot.
192         cmd: The command to run, as an array of argv[].
193         retcode: Expected non-zero return code from the command.
194         msg: String that should be contained within the command's output.
195     """
196     try:
197         runner = u_boot_console.log.get_runner(cmd[0], sys.stdout)
198         runner.run(cmd)
199     except Exception:
200         assert retcode == runner.exit_status
201         assert msg in runner.output
202     else:
203         raise Exception("Expected an exception with retcode %d message '%s',"
204                         "but it was not raised" % (retcode, msg))
205     finally:
206         runner.close()
207
208 ram_base = None
209 def find_ram_base(u_boot_console):
210     """Find the running U-Boot's RAM location.
211
212     Probe the running U-Boot to determine the address of the first bank
213     of RAM. This is useful for tests that test reading/writing RAM, or
214     load/save files that aren't associated with some standard address
215     typically represented in an environment variable such as
216     ${kernel_addr_r}. The value is cached so that it only needs to be
217     actively read once.
218
219     Args:
220         u_boot_console: A console connection to U-Boot.
221
222     Returns:
223         The address of U-Boot's first RAM bank, as an integer.
224     """
225
226     global ram_base
227     if u_boot_console.config.buildconfig.get('config_cmd_bdi', 'n') != 'y':
228         pytest.skip('bdinfo command not supported')
229     if ram_base == -1:
230         pytest.skip('Previously failed to find RAM bank start')
231     if ram_base is not None:
232         return ram_base
233
234     with u_boot_console.log.section('find_ram_base'):
235         response = u_boot_console.run_command('bdinfo')
236         for l in response.split('\n'):
237             if '-> start' in l or 'memstart    =' in l:
238                 ram_base = int(l.split('=')[1].strip(), 16)
239                 break
240         if ram_base is None:
241             ram_base = -1
242             raise Exception('Failed to find RAM bank start in `bdinfo`')
243
244     # We don't want ram_base to be zero as some functions test if the given
245     # address is NULL (0). Besides, on some RISC-V targets the low memory
246     # is protected that prevents S-mode U-Boot from access.
247     # Let's add 2MiB then (size of an ARM LPAE/v8 section).
248
249     ram_base += 1024 * 1024 * 2
250
251     return ram_base
252
253 class PersistentFileHelperCtxMgr(object):
254     """A context manager for Python's "with" statement, which ensures that any
255     generated file is deleted (and hence regenerated) if its mtime is older
256     than the mtime of the Python module which generated it, and gets an mtime
257     newer than the mtime of the Python module which generated after it is
258     generated. Objects of this type should be created by factory function
259     persistent_file_helper rather than directly."""
260
261     def __init__(self, log, filename):
262         """Initialize a new object.
263
264         Args:
265             log: The Logfile object to log to.
266             filename: The filename of the generated file.
267
268         Returns:
269             Nothing.
270         """
271
272         self.log = log
273         self.filename = filename
274
275     def __enter__(self):
276         frame = inspect.stack()[1]
277         module = inspect.getmodule(frame[0])
278         self.module_filename = module.__file__
279         self.module_timestamp = os.path.getmtime(self.module_filename)
280
281         if os.path.exists(self.filename):
282             filename_timestamp = os.path.getmtime(self.filename)
283             if filename_timestamp < self.module_timestamp:
284                 self.log.action('Removing stale generated file ' +
285                     self.filename)
286                 pathlib.Path(self.filename).unlink()
287
288     def __exit__(self, extype, value, traceback):
289         if extype:
290             try:
291                 pathlib.Path(self.filename).unlink()
292             except Exception:
293                 pass
294             return
295         logged = False
296         for _ in range(20):
297             filename_timestamp = os.path.getmtime(self.filename)
298             if filename_timestamp > self.module_timestamp:
299                 break
300             if not logged:
301                 self.log.action(
302                     'Waiting for generated file timestamp to increase')
303                 logged = True
304             os.utime(self.filename)
305             time.sleep(0.1)
306
307 def persistent_file_helper(u_boot_log, filename):
308     """Manage the timestamps and regeneration of a persistent generated
309     file. This function creates a context manager for Python's "with"
310     statement
311
312     Usage:
313         with persistent_file_helper(u_boot_console.log, filename):
314             code to generate the file, if it's missing.
315
316     Args:
317         u_boot_log: u_boot_console.log.
318         filename: The filename of the generated file.
319
320     Returns:
321         A context manager object.
322     """
323
324     return PersistentFileHelperCtxMgr(u_boot_log, filename)
325
326 def crc32(u_boot_console, address, count):
327     """Helper function used to compute the CRC32 value of a section of RAM.
328
329     Args:
330         u_boot_console: A U-Boot console connection.
331         address: Address where data starts.
332         count: Amount of data to use for calculation.
333
334     Returns:
335         CRC32 value
336     """
337
338     bcfg = u_boot_console.config.buildconfig
339     has_cmd_crc32 = bcfg.get('config_cmd_crc32', 'n') == 'y'
340     assert has_cmd_crc32, 'Cannot compute crc32 without CONFIG_CMD_CRC32.'
341     output = u_boot_console.run_command('crc32 %08x %x' % (address, count))
342
343     m = re.search('==> ([0-9a-fA-F]{8})$', output)
344     assert m, 'CRC32 operation failed.'
345
346     return m.group(1)
347
348 def waitpid(pid, timeout=60, kill=False):
349     """Wait a process to terminate by its PID
350
351     This is an alternative to a os.waitpid(pid, 0) call that works on
352     processes that aren't children of the python process.
353
354     Args:
355         pid: PID of a running process.
356         timeout: Time in seconds to wait.
357         kill: Whether to forcibly kill the process after timeout.
358
359     Returns:
360         True, if the process ended on its own.
361         False, if the process was killed by this function.
362
363     Raises:
364         TimeoutError, if the process is still running after timeout.
365     """
366     try:
367         for _ in range(timeout):
368             os.kill(pid, 0)
369             time.sleep(1)
370
371         if kill:
372             os.kill(pid, signal.SIGKILL)
373             return False
374
375     except ProcessLookupError:
376         return True
377
378     raise TimeoutError(
379         "Process with PID {} did not terminate after {} seconds."
380         .format(pid, timeout)
381     )