2 # Copyright (c) 2012 The Chromium OS Authors. All rights reserved.
3 # Use of this source code is governed by a BSD-style license that can be
4 # found in the LICENSE file.
6 """Unittests for the osutils.py module (imagine that!)."""
8 from __future__ import print_function
12 sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.dirname(
13 os.path.abspath(__file__)))))
15 from chromite.lib import cros_build_lib
16 from chromite.lib import cros_build_lib_unittest
17 from chromite.lib import cros_test_lib
18 from chromite.lib import osutils
19 from chromite.lib import partial_mock
21 # TODO(build): Finish test wrapper (http://crosbug.com/37517).
22 # Until then, this has to be after the chromite imports.
25 class TestOsutils(cros_test_lib.TempDirTestCase):
26 """General unittests for the osutils module."""
def testReadWriteFile(self):
  """Verify we can write data to a file, and then read it back.

  The test expects WriteFile to return None and ReadFile to hand back
  exactly the string that was written.
  """
  filename = os.path.join(self.tempdir, 'foo')
  data = 'alsdkfjasldkfjaskdlfjasdf'
  # assertIsNone checks identity with None rather than relying on `==`.
  self.assertIsNone(osutils.WriteFile(filename, data))
  self.assertEqual(osutils.ReadFile(filename), data)
35 def testSafeUnlink(self):
36 """Test unlinking files work (existing or not)."""
37 def f(dirname, sudo=False):
38 dirname = os.path.join(self.tempdir, dirname)
39 path = os.path.join(dirname, 'foon')
41 open(path, 'w').close()
42 self.assertTrue(os.path.exists(path))
44 cros_build_lib.SudoRunCommand(
45 ['chown', 'root:root', '-R', '--', dirname], print_cmd=False)
46 self.assertRaises(EnvironmentError, os.unlink, path)
47 self.assertTrue(osutils.SafeUnlink(path, sudo=sudo))
48 self.assertFalse(os.path.exists(path))
49 self.assertFalse(osutils.SafeUnlink(path))
50 self.assertFalse(os.path.exists(path))
def testSafeMakedirs(self):
  """Test creating directory trees work (existing or not)."""
  nested = os.path.join(self.tempdir, 'a', 'b', 'c', 'd', 'e')
  # The first call is expected to report a truthy result, the second call on
  # the now existing tree a falsy one; the tree must exist after either call.
  for check_result in (self.assertTrue, self.assertFalse):
    check_result(osutils.SafeMakedirs(nested))
    self.assertTrue(os.path.exists(nested))
def testSafeMakedirs_error(self):
  """Check error paths."""
  # Both a deep path under a non-existent root dir and the empty string are
  # expected to make SafeMakedirs raise OSError.
  for bad_path in ('/foo/bar/cow/moo/wee', ''):
    with self.assertRaises(OSError):
      osutils.SafeMakedirs(bad_path)
def testSafeMakedirsSudo(self):
  """Test creating directory trees work as root (existing or not).

  The tree is created through sudo, so it is also removed through sudo.
  The removal lives in a `finally` clause so that a failing assertion does
  not leave root-owned directories behind inside the temp dir.
  """
  path = os.path.join(self.tempdir, 'a', 'b', 'c', 'd', 'e')
  try:
    self.assertTrue(osutils.SafeMakedirs(path, sudo=True))
    self.assertTrue(os.path.exists(path))
    self.assertFalse(osutils.SafeMakedirs(path, sudo=True))
    self.assertTrue(os.path.exists(path))
    # The leaf directory must be owned by root (uid 0).
    self.assertEqual(os.stat(path).st_uid, 0)
  finally:
    # Have to manually clean up as a non-root `rm -rf` will fail.
    cros_build_lib.SudoRunCommand(['rm', '-rf', self.tempdir],
                                  print_cmd=False)
80 """Test that removing dirs work."""
81 path = os.path.join(self.tempdir, 'a', 'b', 'c', 'd', 'e')
83 self.assertRaises(EnvironmentError, osutils.RmDir, path)
84 osutils.SafeMakedirs(path)
86 osutils.RmDir(path, ignore_missing=True)
87 self.assertRaises(EnvironmentError, osutils.RmDir, path)
89 osutils.SafeMakedirs(path)
91 self.assertFalse(os.path.exists(path))
def testRmDirSudo(self):
  """Test that removing dirs via sudo works."""
  top = os.path.join(self.tempdir, 'a')
  leaf = os.path.join(top, 'b', 'c', 'd', 'e')
  created = osutils.SafeMakedirs(leaf, sudo=True)
  self.assertTrue(created)

  # Removing the sudo-created leaf without sudo is expected to raise OSError.
  with self.assertRaises(OSError):
    osutils.RmDir(leaf)

  osutils.RmDir(top, sudo=True)

  # Removing the same tree a second time via sudo is expected to raise
  # RunCommandError.
  with self.assertRaises(cros_build_lib.RunCommandError):
    osutils.RmDir(top, sudo=True)
103 def testTouchFile(self):
104 """Test that we can touch files."""
105 path = os.path.join(self.tempdir, 'touchit')
106 self.assertFalse(os.path.exists(path))
108 self.assertTrue(os.path.exists(path))
109 self.assertEqual(os.path.getsize(path), 0)
def testTouchFileSubDir(self):
  """Test that we can touch files in non-existent subdirs."""
  parent = os.path.join(self.tempdir, 'a', 'b', 'c')
  target = os.path.join(parent, 'touchit')
  # The containing directory must not exist before the call.
  self.assertFalse(os.path.exists(parent))

  osutils.Touch(target, makedirs=True)

  # The file now exists and is empty.
  self.assertTrue(os.path.exists(target))
  self.assertEqual(os.path.getsize(target), 0)
120 class TempDirTests(cros_test_lib.TestCase):
121 """Unittests of osutils.TempDir.
123 Unlike other test classes in this file, TempDirTestCase isn't used as a base
124 class, because that is the functionality under test.
126 PREFIX = 'chromite.test.osutils.TempDirTests'
128 class HelperException(Exception):
129 """Exception for tests to raise to test exception handling."""
131 class HelperExceptionInner(Exception):
132 """Exception for tests to raise to test exception handling."""
134 def testBasicSuccessEmpty(self):
135 """Test we create and cleanup an empty tempdir."""
136 with osutils.TempDir(prefix=self.PREFIX) as td:
138 # Show the temp directory exists and is empty.
139 self.assertTrue(os.path.isdir(tempdir))
140 self.assertEquals(os.listdir(tempdir), [])
142 # Show the temp directory no longer exists.
143 self.assertNotExists(tempdir)
145 def testBasicSuccessNotEmpty(self):
146 """Test we cleanup tempdir with stuff in it."""
147 with osutils.TempDir(prefix=self.PREFIX) as td:
149 # Show the temp directory exists and is empty.
150 self.assertTrue(os.path.isdir(tempdir))
151 self.assertEquals(os.listdir(tempdir), [])
153 # Create an empty file.
154 osutils.Touch(os.path.join(tempdir, 'foo.txt'))
156 # Create nested sub directories.
157 subdir = os.path.join(tempdir, 'foo', 'bar', 'taco')
159 osutils.Touch(os.path.join(subdir, 'sauce.txt'))
161 # Show the temp directory no longer exists.
162 self.assertNotExists(tempdir)
164 def testErrorCleanup(self):
165 """Test we cleanup, even if an exception is raised."""
167 with osutils.TempDir(prefix=self.PREFIX) as td:
169 raise TempDirTests.HelperException()
170 except TempDirTests.HelperException:
173 # Show the temp directory no longer exists.
174 self.assertNotExists(tempdir)
176 def testCleanupExceptionContextException(self):
177 """Test an exception during cleanup if the context DID raise."""
179 tempdir_obj = osutils.TempDir(prefix=self.PREFIX)
181 with mock.patch.object(osutils, '_TempDirTearDown',
182 side_effect=TempDirTests.HelperException):
184 with tempdir_obj as td:
186 raise TempDirTests.HelperExceptionInner()
187 except TempDirTests.HelperExceptionInner:
190 # Show that the exception exited the context.
191 self.assertTrue(was_raised)
193 # Verify the tempdir object no longer contains a reference to the tempdir.
194 self.assertIsNone(tempdir_obj.tempdir)
196 # Cleanup the dir leaked by our mock exception.
199 def testCleanupExceptionNoContextException(self):
200 """Test an exception during cleanup if the context did NOT raise."""
202 tempdir_obj = osutils.TempDir(prefix=self.PREFIX)
204 with mock.patch.object(osutils, '_TempDirTearDown',
205 side_effect=TempDirTests.HelperException):
207 with tempdir_obj as td:
209 except TempDirTests.HelperException:
212 # Show that the exception exited the context.
213 self.assertTrue(was_raised)
215 # Verify the tempdir object no longer contains a reference to the tempdir.
216 self.assertIsNone(tempdir_obj.tempdir)
218 # Cleanup the dir leaked by our mock exception.
222 class MountTests(cros_test_lib.TestCase):
223 """Unittests for osutils mounting and umounting helpers."""
225 def testMountTmpfsDir(self):
226 """Verify mounting a tmpfs works"""
228 with osutils.TempDir(prefix='chromite.test.osutils') as tempdir:
229 st_before = os.stat(tempdir)
231 # Mount the dir and verify it worked.
232 osutils.MountTmpfsDir(tempdir)
233 st_after = os.stat(tempdir)
234 self.assertNotEqual(st_before.st_dev, st_after.st_dev)
236 # Unmount the dir and verify it worked.
237 osutils.UmountDir(tempdir)
240 # Finally make sure it's cleaned up.
241 self.assertFalse(os.path.exists(tempdir))
244 cros_build_lib.SudoRunCommand(['umount', '-lf', tempdir],
248 class IteratePathParentsTest(cros_test_lib.TestCase):
249 """Test parent directory iteration functionality."""
def _RunForPath(self, path, expected):
  """Compare the basenames yielded by IteratePathParents against |expected|.

  Args:
    path: Path handed to osutils.IteratePathParents.
    expected: List of basenames to compare against the reversed iteration
      result, or None to only run the iteration without comparing.
  """
  result_components = [
      os.path.basename(p) for p in osutils.IteratePathParents(path)]

  # The expectation is written in the opposite order of the iteration.
  result_components.reverse()
  if expected is not None:
    self.assertEqual(expected, result_components)
261 """Run the test vectors."""
264 '/path/to/nowhere': ['', 'path', 'to', 'nowhere'],
265 '/path/./to': ['', 'path', 'to'],
266 '//path/to': ['', 'path', 'to'],
270 for p, e in vectors.iteritems():
271 self._RunForPath(p, e)
274 class FindInPathParentsTest(cros_test_lib.TempDirTestCase):
275 """Test FindInPathParents functionality."""
277 D = cros_test_lib.Directory
288 START_PATH = os.path.join('a', 'b', 'c')
291 cros_test_lib.CreateOnDiskHierarchy(self.tempdir, self.DIR_STRUCT)
294 """Target is found."""
295 found = osutils.FindInPathParents(
296 '.repo', os.path.join(self.tempdir, self.START_PATH))
297 self.assertEquals(found, os.path.join(self.tempdir, 'a', '.repo'))
def testNotFound(self):
  """Target is not found."""
  start = os.path.join(self.tempdir, self.START_PATH)
  found = osutils.FindInPathParents('does.not/exist', start)
  # A missing target is reported as None.
  self.assertIsNone(found)
306 class SourceEnvironmentTest(cros_test_lib.TempDirTestCase):
307 """Test osutils' environment variable related methods."""
310 'ENV1': 'monkeys like bananas',
316 'ENV2': 'bananas are yellow',
321 declare -x ENV1="monkeys like bananas"
322 declare -x ENV2="bananas are yellow"
323 declare -x ENV3="merci"
324 declare -x ENV4="de rien"
326 declare -x ENVA=('a b c' 'd' 'e 1234 %')
330 self.env_file = os.path.join(self.tempdir, 'environment')
331 osutils.WriteFile(self.env_file, self.ENV)
def testWhiteList(self):
  """Check the result for a whitelist of names against ENV_WHITELIST."""
  env_dict = osutils.SourceEnvironment(
      self.env_file, ('ENV1', 'ENV3', 'ENV5', 'ENV6'))
  self.assertEqual(env_dict, self.ENV_WHITELIST)
def testArrays(self):
  """Check how the array variable ENVA is flattened into a string."""
  # Without an explicit ifs the elements come back separated by commas.
  env_dict = osutils.SourceEnvironment(self.env_file, ('ENVA',))
  self.assertEqual(env_dict, {'ENVA': 'a b c,d,e 1234 %'})

  # With ifs=' ' the elements come back separated by single spaces.
  env_dict = osutils.SourceEnvironment(self.env_file, ('ENVA',), ifs=' ')
  self.assertEqual(env_dict, {'ENVA': 'a b c d e 1234 %'})
346 class DeviceInfoTests(cros_build_lib_unittest.RunCommandTestCase):
347 """Tests methods retrieving information about devices."""
350 NAME="sda" RM="0" TYPE="disk" SIZE="128G"
351 NAME="sda1" RM="1" TYPE="part" SIZE="100G"
352 NAME="sda2" RM="1" TYPE="part" SIZE="28G"
353 NAME="sdc" RM="1" TYPE="disk" SIZE="7.4G"
354 NAME="sdc1" RM="1" TYPE="part" SIZE="1G"
355 NAME="sdc2" RM="1" TYPE="part" SIZE="6.4G"
359 NAME="sdc" RM="1" TYPE="disk" SIZE="7.4G"
360 NAME="sdc1" RM="1" TYPE="part" SIZE="1G"
361 NAME="sdc2" RM="1" TYPE="part" SIZE="6.4G"
def testListBlockDevices(self):
  """Tests that we can list all block devices correctly."""
  self.rc.AddCmdResult(partial_mock.Ignore(), output=self.FULL_OUTPUT)
  devices = osutils.ListBlockDevices()

  # Spot check the entries at index 0 and 3, field by field.
  fields = ('NAME', 'RM', 'TYPE', 'SIZE')
  expectations = (
      (0, ('sda', '0', 'disk', '128G')),
      (3, ('sdc', '1', 'disk', '7.4G')),
  )
  for index, values in expectations:
    device = devices[index]
    for field, value in zip(fields, values):
      self.assertEqual(getattr(device, field), value)
def testGetDeviceSize(self):
  """Tests that we can get the size of a device."""
  # Every mocked command returns the canned PARTIAL_OUTPUT listing.
  self.rc.AddCmdResult(partial_mock.Ignore(), output=self.PARTIAL_OUTPUT)
  size = osutils.GetDeviceSize('/dev/sdc')
  self.assertEqual(size, '7.4G')
383 class MountImagePartitionTests(cros_test_lib.MockTestCase):
384 """Tests for MountImagePartition."""
388 3: cros_build_lib.PartitionInfo(3, 1, 3, 2, 'fs', 'Label', 'flag')
391 def testWithCacheOkay(self):
392 mount_dir = self.PatchObject(osutils, 'MountDir')
393 osutils.MountImagePartition('image_file', 3, 'destination',
395 opts = ['loop', 'offset=1', 'sizelimit=2', 'ro']
396 mount_dir.assert_called_with('image_file', 'destination', makedirs=True,
397 skip_mtab=False, sudo=True, mount_opts=opts)
def testWithCacheFail(self):
  """Requesting partition 404 with the supplied table raises ValueError."""
  with self.assertRaises(ValueError):
    osutils.MountImagePartition('image_file', 404, 'destination',
                                self._gpt_table)
403 def testWithoutCache(self):
404 self.PatchObject(cros_build_lib, 'GetImageDiskPartitionInfo',
405 return_value=self._gpt_table)
406 mount_dir = self.PatchObject(osutils, 'MountDir')
407 osutils.MountImagePartition('image_file', 3, 'destination')
408 opts = ['loop', 'offset=1', 'sizelimit=2', 'ro']
409 mount_dir.assert_called_with(
410 'image_file', 'destination', makedirs=True, skip_mtab=False,
411 sudo=True, mount_opts=opts
415 class ChdirTests(cros_test_lib.MockTempDirTestCase):
416 """Tests for ChdirContext."""
419 current_dir = os.getcwd()
420 self.assertNotEqual(self.tempdir, os.getcwd())
421 with osutils.ChdirContext(self.tempdir):
422 self.assertEqual(self.tempdir, os.getcwd())
423 self.assertEqual(current_dir, os.getcwd())
426 class MountImageTests(cros_test_lib.MockTempDirTestCase):
427 """Tests for MountImageContext."""
429 def _testWithParts(self, parts, selectors, check_links=True):
430 self.PatchObject(cros_build_lib, 'GetImageDiskPartitionInfo',
432 mount_dir = self.PatchObject(osutils, 'MountDir')
433 unmount_dir = self.PatchObject(osutils, 'UmountDir')
434 rmdir = self.PatchObject(osutils, 'RmDir')
435 with osutils.MountImageContext('_ignored', self.tempdir, selectors):
436 for _, part in parts.items():
437 mount_point = os.path.join(self.tempdir, 'dir-%d' % part.number)
438 mount_dir.assert_any_call(
439 '_ignored', mount_point, makedirs=True, skip_mtab=False,
441 mount_opts=['loop', 'offset=0', 'sizelimit=0', 'ro']
444 link = os.path.join(self.tempdir, 'dir-%s' % part.name)
445 self.assertTrue(os.path.islink(link))
446 self.assertEqual(os.path.basename(mount_point),
448 for _, part in parts.items():
449 mount_point = os.path.join(self.tempdir, 'dir-%d' % part.number)
450 unmount_dir.assert_any_call(mount_point, cleanup=False)
451 rmdir.assert_any_call(mount_point, sudo=True)
453 link = os.path.join(self.tempdir, 'dir-%s' % part.name)
454 self.assertFalse(os.path.lexists(link))
456 def testWithPartitionNumber(self):
458 1: cros_build_lib.PartitionInfo(1, 0, 0, 0, '', 'my-stateful', ''),
459 3: cros_build_lib.PartitionInfo(3, 0, 0, 0, '', 'my-root-a', ''),
461 self._testWithParts(parts, [1, 3])
463 def testWithPartitionLabel(self):
465 42: cros_build_lib.PartitionInfo(42, 0, 0, 0, '', 'label', ''),
467 self._testWithParts(parts, ['label'])
469 def testInvalidPartSelector(self):
471 42: cros_build_lib.PartitionInfo(42, 0, 0, 0, '', 'label', ''),
473 self.assertRaises(ValueError, self._testWithParts, parts, ['label404'])
474 self.assertRaises(ValueError, self._testWithParts, parts, [404])
476 def testFailOnExistingMount(self):
478 42: cros_build_lib.PartitionInfo(42, 0, 0, 0, '', 'label', ''),
480 os.makedirs(os.path.join(self.tempdir, 'dir-42'))
481 self.assertRaises(ValueError, self._testWithParts, parts, [42])
483 def testExistingLinkNotCleanedUp(self):
485 42: cros_build_lib.PartitionInfo(42, 0, 0, 0, '', 'label', ''),
487 symlink = os.path.join(self.tempdir, 'dir-label')
488 os.symlink('/tmp', symlink)
489 self.assertEqual('/tmp', os.readlink(symlink))
490 self._testWithParts(parts, [42], check_links=False)
491 self.assertEqual('/tmp', os.readlink(symlink))
494 class IterateMountPointsTests(cros_test_lib.TempDirTestCase):
495 """Test for IterateMountPoints function."""
498 self.proc_mount = os.path.join(self.tempdir, 'mounts')
501 r'''/dev/loop0 /mnt/dir_8 ext4 rw,relatime,data=ordered 0 0
502 /dev/loop2 /mnt/dir_1 ext4 rw,relatime,data=ordered 0 0
503 /dev/loop1 /mnt/dir_12 vfat rw 0 0
504 /dev/loop4 /mnt/dir_3 ext4 ro,relatime 0 0
505 weird\040system /mnt/weirdo unknown ro 0 0
506 tmpfs /mnt/spaced\040dir tmpfs ro 0 0
507 tmpfs /mnt/\134 tmpfs ro 0 0
512 r = list(osutils.IterateMountPoints(self.proc_mount))
513 self.assertEqual(len(r), 7)
514 self.assertEqual(r[0].source, '/dev/loop0')
515 self.assertEqual(r[1].destination, '/mnt/dir_1')
516 self.assertEqual(r[2].filesystem, 'vfat')
517 self.assertEqual(r[3].options, 'ro,relatime')
def testEscape(self):
  """Check that escaped characters in the mounts file are decoded."""
  mounts = list(osutils.IterateMountPoints(self.proc_mount))
  # Entries 4-6 of the fixture use octal escapes for a space and a backslash.
  expectations = (
      (4, 'source', 'weird system'),
      (5, 'destination', '/mnt/spaced dir'),
      (6, 'destination', '/mnt/\\'),
  )
  for index, attr, value in expectations:
    self.assertEqual(getattr(mounts[index], attr), value)
526 class ResolveSymlinkTest(cros_test_lib.TestCase):
527 """Tests for ResolveSymlink."""
529 def testRelativeLink(self):
530 os.symlink('target', 'link')
531 self.assertEqual(osutils.ResolveSymlink('link'), 'target')
534 def testAbsoluteLink(self):
535 os.symlink('/target', 'link')
536 self.assertEqual(osutils.ResolveSymlink('link'), '/target')
537 self.assertEqual(osutils.ResolveSymlink('link', '/root'), '/root/target')
540 def testRecursion(self):
541 os.symlink('target', 'link1')
542 os.symlink('link1', 'link2')
543 self.assertEqual(osutils.ResolveSymlink('link2'), 'target')
547 def testRecursionWithAbsoluteLink(self):
548 os.symlink('target', 'link1')
549 os.symlink('/link1', 'link2')
550 self.assertEqual(osutils.ResolveSymlink('link2', '.'), './target')
555 if __name__ == '__main__':