Upstream version 10.39.225.0
[platform/framework/web/crosswalk.git] / src / third_party / chromite / lib / osutils_unittest.py
1 #!/usr/bin/python
2 # Copyright (c) 2012 The Chromium OS Authors. All rights reserved.
3 # Use of this source code is governed by a BSD-style license that can be
4 # found in the LICENSE file.
5
6 """Unittests for the osutils.py module (imagine that!)."""
7
8 from __future__ import print_function
9
10 import os
11 import sys
12 sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.dirname(
13     os.path.abspath(__file__)))))
14
15 from chromite.lib import cros_build_lib
16 from chromite.lib import cros_build_lib_unittest
17 from chromite.lib import cros_test_lib
18 from chromite.lib import osutils
19 from chromite.lib import partial_mock
20
21 # TODO(build): Finish test wrapper (http://crosbug.com/37517).
22 # Until then, this has to be after the chromite imports.
23 import mock
24
25 class TestOsutils(cros_test_lib.TempDirTestCase):
26   """General unittests for the osutils module."""
27
28   def testReadWriteFile(self):
29     """Verify we can write data to a file, and then read it back."""
30     filename = os.path.join(self.tempdir, 'foo')
31     data = 'alsdkfjasldkfjaskdlfjasdf'
32     self.assertEqual(osutils.WriteFile(filename, data), None)
33     self.assertEqual(osutils.ReadFile(filename), data)
34
35   def testSafeUnlink(self):
36     """Test unlinking files work (existing or not)."""
37     def f(dirname, sudo=False):
38       dirname = os.path.join(self.tempdir, dirname)
39       path = os.path.join(dirname, 'foon')
40       os.makedirs(dirname)
41       open(path, 'w').close()
42       self.assertTrue(os.path.exists(path))
43       if sudo:
44         cros_build_lib.SudoRunCommand(
45             ['chown', 'root:root', '-R', '--', dirname], print_cmd=False)
46         self.assertRaises(EnvironmentError, os.unlink, path)
47       self.assertTrue(osutils.SafeUnlink(path, sudo=sudo))
48       self.assertFalse(os.path.exists(path))
49       self.assertFalse(osutils.SafeUnlink(path))
50       self.assertFalse(os.path.exists(path))
51
52     f("nonsudo", False)
53     f("sudo", True)
54
55   def testSafeMakedirs(self):
56     """Test creating directory trees work (existing or not)."""
57     path = os.path.join(self.tempdir, 'a', 'b', 'c', 'd', 'e')
58     self.assertTrue(osutils.SafeMakedirs(path))
59     self.assertTrue(os.path.exists(path))
60     self.assertFalse(osutils.SafeMakedirs(path))
61     self.assertTrue(os.path.exists(path))
62
63   def testSafeMakedirs_error(self):
64     """Check error paths."""
65     self.assertRaises(OSError, osutils.SafeMakedirs, '/foo/bar/cow/moo/wee')
66     self.assertRaises(OSError, osutils.SafeMakedirs, '')
67
68   def testSafeMakedirsSudo(self):
69     """Test creating directory trees work as root (existing or not)."""
70     path = os.path.join(self.tempdir, 'a', 'b', 'c', 'd', 'e')
71     self.assertTrue(osutils.SafeMakedirs(path, sudo=True))
72     self.assertTrue(os.path.exists(path))
73     self.assertFalse(osutils.SafeMakedirs(path, sudo=True))
74     self.assertTrue(os.path.exists(path))
75     self.assertEqual(os.stat(path).st_uid, 0)
76     # Have to manually clean up as a non-root `rm -rf` will fail.
77     cros_build_lib.SudoRunCommand(['rm', '-rf', self.tempdir], print_cmd=False)
78
79   def testRmDir(self):
80     """Test that removing dirs work."""
81     path = os.path.join(self.tempdir, 'a', 'b', 'c', 'd', 'e')
82
83     self.assertRaises(EnvironmentError, osutils.RmDir, path)
84     osutils.SafeMakedirs(path)
85     osutils.RmDir(path)
86     osutils.RmDir(path, ignore_missing=True)
87     self.assertRaises(EnvironmentError, osutils.RmDir, path)
88
89     osutils.SafeMakedirs(path)
90     osutils.RmDir(path)
91     self.assertFalse(os.path.exists(path))
92
93   def testRmDirSudo(self):
94     """Test that removing dirs via sudo works."""
95     subpath = os.path.join(self.tempdir, 'a')
96     path = os.path.join(subpath, 'b', 'c', 'd', 'e')
97     self.assertTrue(osutils.SafeMakedirs(path, sudo=True))
98     self.assertRaises(OSError, osutils.RmDir, path)
99     osutils.RmDir(subpath, sudo=True)
100     self.assertRaises(cros_build_lib.RunCommandError,
101                       osutils.RmDir, subpath, sudo=True)
102
103   def testTouchFile(self):
104     """Test that we can touch files."""
105     path = os.path.join(self.tempdir, 'touchit')
106     self.assertFalse(os.path.exists(path))
107     osutils.Touch(path)
108     self.assertTrue(os.path.exists(path))
109     self.assertEqual(os.path.getsize(path), 0)
110
111   def testTouchFileSubDir(self):
112     """Test that we can touch files in non-existent subdirs."""
113     path = os.path.join(self.tempdir, 'a', 'b', 'c', 'touchit')
114     self.assertFalse(os.path.exists(os.path.dirname(path)))
115     osutils.Touch(path, makedirs=True)
116     self.assertTrue(os.path.exists(path))
117     self.assertEqual(os.path.getsize(path), 0)
118
119
120 class TempDirTests(cros_test_lib.TestCase):
121   """Unittests of osutils.TempDir.
122
123   Unlike other test classes in this file, TempDirTestCase isn't used as a base
124   class, because that is the functionality under test.
125   """
126   PREFIX = 'chromite.test.osutils.TempDirTests'
127
128   class HelperException(Exception):
129     """Exception for tests to raise to test exception handling."""
130
131   class HelperExceptionInner(Exception):
132     """Exception for tests to raise to test exception handling."""
133
134   def testBasicSuccessEmpty(self):
135     """Test we create and cleanup an empty tempdir."""
136     with osutils.TempDir(prefix=self.PREFIX) as td:
137       tempdir = td
138       # Show the temp directory exists and is empty.
139       self.assertTrue(os.path.isdir(tempdir))
140       self.assertEquals(os.listdir(tempdir), [])
141
142     # Show the temp directory no longer exists.
143     self.assertNotExists(tempdir)
144
145   def testBasicSuccessNotEmpty(self):
146     """Test we cleanup tempdir with stuff in it."""
147     with osutils.TempDir(prefix=self.PREFIX) as td:
148       tempdir = td
149       # Show the temp directory exists and is empty.
150       self.assertTrue(os.path.isdir(tempdir))
151       self.assertEquals(os.listdir(tempdir), [])
152
153       # Create an empty file.
154       osutils.Touch(os.path.join(tempdir, 'foo.txt'))
155
156       # Create nested sub directories.
157       subdir = os.path.join(tempdir, 'foo', 'bar', 'taco')
158       os.makedirs(subdir)
159       osutils.Touch(os.path.join(subdir, 'sauce.txt'))
160
161     # Show the temp directory no longer exists.
162     self.assertNotExists(tempdir)
163
164   def testErrorCleanup(self):
165     """Test we cleanup, even if an exception is raised."""
166     try:
167       with osutils.TempDir(prefix=self.PREFIX) as td:
168         tempdir = td
169         raise TempDirTests.HelperException()
170     except TempDirTests.HelperException:
171       pass
172
173     # Show the temp directory no longer exists.
174     self.assertNotExists(tempdir)
175
176   def testCleanupExceptionContextException(self):
177     """Test an exception during cleanup if the context DID raise."""
178     was_raised = False
179     tempdir_obj = osutils.TempDir(prefix=self.PREFIX)
180
181     with mock.patch.object(osutils, '_TempDirTearDown',
182                            side_effect=TempDirTests.HelperException):
183       try:
184         with tempdir_obj as td:
185           tempdir = td
186           raise TempDirTests.HelperExceptionInner()
187       except TempDirTests.HelperExceptionInner:
188         was_raised = True
189
190     # Show that the exception exited the context.
191     self.assertTrue(was_raised)
192
193     # Verify the tempdir object no longer contains a reference to the tempdir.
194     self.assertIsNone(tempdir_obj.tempdir)
195
196     # Cleanup the dir leaked by our mock exception.
197     os.rmdir(tempdir)
198
199   def testCleanupExceptionNoContextException(self):
200     """Test an exception during cleanup if the context did NOT raise."""
201     was_raised = False
202     tempdir_obj = osutils.TempDir(prefix=self.PREFIX)
203
204     with mock.patch.object(osutils, '_TempDirTearDown',
205                            side_effect=TempDirTests.HelperException):
206       try:
207         with tempdir_obj as td:
208           tempdir = td
209       except TempDirTests.HelperException:
210         was_raised = True
211
212     # Show that the exception exited the context.
213     self.assertTrue(was_raised)
214
215     # Verify the tempdir object no longer contains a reference to the tempdir.
216     self.assertIsNone(tempdir_obj.tempdir)
217
218     # Cleanup the dir leaked by our mock exception.
219     os.rmdir(tempdir)
220
221
222 class MountTests(cros_test_lib.TestCase):
223   """Unittests for osutils mounting and umounting helpers."""
224
225   def testMountTmpfsDir(self):
226     """Verify mounting a tmpfs works"""
227     cleaned = False
228     with osutils.TempDir(prefix='chromite.test.osutils') as tempdir:
229       st_before = os.stat(tempdir)
230       try:
231         # Mount the dir and verify it worked.
232         osutils.MountTmpfsDir(tempdir)
233         st_after = os.stat(tempdir)
234         self.assertNotEqual(st_before.st_dev, st_after.st_dev)
235
236         # Unmount the dir and verify it worked.
237         osutils.UmountDir(tempdir)
238         cleaned = True
239
240         # Finally make sure it's cleaned up.
241         self.assertFalse(os.path.exists(tempdir))
242       finally:
243         if not cleaned:
244           cros_build_lib.SudoRunCommand(['umount', '-lf', tempdir],
245                                         error_code_ok=True)
246
247
248 class IteratePathParentsTest(cros_test_lib.TestCase):
249   """Test parent directory iteration functionality."""
250
251   def _RunForPath(self, path, expected):
252     result_components = []
253     for p in osutils.IteratePathParents(path):
254       result_components.append(os.path.basename(p))
255
256     result_components.reverse()
257     if expected is not None:
258       self.assertEquals(expected, result_components)
259
260   def testIt(self):
261     """Run the test vectors."""
262     vectors = {
263         '/': [''],
264         '/path/to/nowhere': ['', 'path', 'to', 'nowhere'],
265         '/path/./to': ['', 'path', 'to'],
266         '//path/to': ['', 'path', 'to'],
267         'path/to': None,
268         '': None,
269     }
270     for p, e in vectors.iteritems():
271       self._RunForPath(p, e)
272
273
274 class FindInPathParentsTest(cros_test_lib.TempDirTestCase):
275   """Test FindInPathParents functionality."""
276
277   D = cros_test_lib.Directory
278
279   DIR_STRUCT = [
280     D('a', [
281       D('.repo', []),
282       D('b', [
283         D('c', [])
284       ])
285     ])
286   ]
287
288   START_PATH = os.path.join('a', 'b', 'c')
289
290   def setUp(self):
291     cros_test_lib.CreateOnDiskHierarchy(self.tempdir, self.DIR_STRUCT)
292
293   def testFound(self):
294     """Target is found."""
295     found = osutils.FindInPathParents(
296       '.repo', os.path.join(self.tempdir, self.START_PATH))
297     self.assertEquals(found, os.path.join(self.tempdir, 'a', '.repo'))
298
299   def testNotFound(self):
300     """Target is not found."""
301     found = osutils.FindInPathParents(
302       'does.not/exist', os.path.join(self.tempdir, self.START_PATH))
303     self.assertEquals(found, None)
304
305
306 class SourceEnvironmentTest(cros_test_lib.TempDirTestCase):
307   """Test ostuil's environmental variable related methods."""
308
309   ENV_WHITELIST = {
310       'ENV1': 'monkeys like bananas',
311       'ENV3': 'merci',
312       'ENV6': '',
313   }
314
315   ENV_OTHER = {
316       'ENV2': 'bananas are yellow',
317       'ENV4': 'de rien',
318   }
319
320   ENV = """
321 declare -x ENV1="monkeys like bananas"
322 declare -x ENV2="bananas are yellow"
323 declare -x ENV3="merci"
324 declare -x ENV4="de rien"
325 declare -x ENV6=''
326 declare -x ENVA=('a b c' 'd' 'e 1234 %')
327 """
328
329   def setUp(self):
330     self.env_file = os.path.join(self.tempdir, 'environment')
331     osutils.WriteFile(self.env_file, self.ENV)
332
333   def testWhiteList(self):
334     env_dict = osutils.SourceEnvironment(
335         self.env_file, ('ENV1', 'ENV3', 'ENV5', 'ENV6'))
336     self.assertEquals(env_dict, self.ENV_WHITELIST)
337
338   def testArrays(self):
339     env_dict = osutils.SourceEnvironment(self.env_file, ('ENVA',))
340     self.assertEquals(env_dict, {'ENVA': 'a b c,d,e 1234 %'})
341
342     env_dict = osutils.SourceEnvironment(self.env_file, ('ENVA',), ifs=' ')
343     self.assertEquals(env_dict, {'ENVA': 'a b c d e 1234 %'})
344
345
346 class DeviceInfoTests(cros_build_lib_unittest.RunCommandTestCase):
347   """Tests methods retrieving information about devices."""
348
349   FULL_OUTPUT = """
350 NAME="sda" RM="0" TYPE="disk" SIZE="128G"
351 NAME="sda1" RM="1" TYPE="part" SIZE="100G"
352 NAME="sda2" RM="1" TYPE="part" SIZE="28G"
353 NAME="sdc" RM="1" TYPE="disk" SIZE="7.4G"
354 NAME="sdc1" RM="1" TYPE="part" SIZE="1G"
355 NAME="sdc2" RM="1" TYPE="part" SIZE="6.4G"
356 """
357
358   PARTIAL_OUTPUT = """
359 NAME="sdc" RM="1" TYPE="disk" SIZE="7.4G"
360 NAME="sdc1" RM="1" TYPE="part" SIZE="1G"
361 NAME="sdc2" RM="1" TYPE="part" SIZE="6.4G"
362 """
363
364   def testListBlockDevices(self):
365     """Tests that we can list all block devices correctly."""
366     self.rc.AddCmdResult(partial_mock.Ignore(), output=self.FULL_OUTPUT)
367     devices = osutils.ListBlockDevices()
368     self.assertEqual(devices[0].NAME, 'sda')
369     self.assertEqual(devices[0].RM, '0')
370     self.assertEqual(devices[0].TYPE, 'disk')
371     self.assertEqual(devices[0].SIZE, '128G')
372     self.assertEqual(devices[3].NAME, 'sdc')
373     self.assertEqual(devices[3].RM, '1')
374     self.assertEqual(devices[3].TYPE, 'disk')
375     self.assertEqual(devices[3].SIZE, '7.4G')
376
377   def testGetDeviceSize(self):
378     """Tests that we can get the size of a device."""
379     self.rc.AddCmdResult(partial_mock.Ignore(), output=self.PARTIAL_OUTPUT)
380     self.assertEqual(osutils.GetDeviceSize('/dev/sdc'), '7.4G')
381
382
383 class MountImagePartitionTests(cros_test_lib.MockTestCase):
384   """Tests for MountImagePartition."""
385
386   def setUp(self):
387     self._gpt_table = {
388         3: cros_build_lib.PartitionInfo(3, 1, 3, 2, 'fs', 'Label', 'flag')
389     }
390
391   def testWithCacheOkay(self):
392     mount_dir = self.PatchObject(osutils, 'MountDir')
393     osutils.MountImagePartition('image_file', 3, 'destination',
394                                 self._gpt_table)
395     opts = ['loop', 'offset=1', 'sizelimit=2', 'ro']
396     mount_dir.assert_called_with('image_file', 'destination', makedirs=True,
397                                  skip_mtab=False, sudo=True, mount_opts=opts)
398
399   def testWithCacheFail(self):
400     self.assertRaises(ValueError, osutils.MountImagePartition,
401                       'image_file', 404, 'destination', self._gpt_table)
402
403   def testWithoutCache(self):
404     self.PatchObject(cros_build_lib, 'GetImageDiskPartitionInfo',
405                      return_value=self._gpt_table)
406     mount_dir = self.PatchObject(osutils, 'MountDir')
407     osutils.MountImagePartition('image_file', 3, 'destination')
408     opts = ['loop', 'offset=1', 'sizelimit=2', 'ro']
409     mount_dir.assert_called_with(
410         'image_file', 'destination', makedirs=True, skip_mtab=False,
411         sudo=True, mount_opts=opts
412     )
413
414
415 class ChdirTests(cros_test_lib.MockTempDirTestCase):
416   """Tests for ChdirContext."""
417
418   def testChdir(self):
419     current_dir = os.getcwd()
420     self.assertNotEqual(self.tempdir, os.getcwd())
421     with osutils.ChdirContext(self.tempdir):
422       self.assertEqual(self.tempdir, os.getcwd())
423     self.assertEqual(current_dir, os.getcwd())
424
425
426 class MountImageTests(cros_test_lib.MockTempDirTestCase):
427   """Tests for MountImageContext."""
428
429   def _testWithParts(self, parts, selectors, check_links=True):
430     self.PatchObject(cros_build_lib, 'GetImageDiskPartitionInfo',
431                      return_value=parts)
432     mount_dir = self.PatchObject(osutils, 'MountDir')
433     unmount_dir = self.PatchObject(osutils, 'UmountDir')
434     rmdir = self.PatchObject(osutils, 'RmDir')
435     with osutils.MountImageContext('_ignored', self.tempdir, selectors):
436       for _, part in parts.items():
437         mount_point = os.path.join(self.tempdir, 'dir-%d' % part.number)
438         mount_dir.assert_any_call(
439             '_ignored', mount_point, makedirs=True, skip_mtab=False,
440             sudo=True,
441             mount_opts=['loop', 'offset=0', 'sizelimit=0', 'ro']
442         )
443         if check_links:
444           link = os.path.join(self.tempdir, 'dir-%s' % part.name)
445           self.assertTrue(os.path.islink(link))
446           self.assertEqual(os.path.basename(mount_point),
447                            os.readlink(link))
448     for _, part in parts.items():
449       mount_point = os.path.join(self.tempdir, 'dir-%d' % part.number)
450       unmount_dir.assert_any_call(mount_point, cleanup=False)
451       rmdir.assert_any_call(mount_point, sudo=True)
452       if check_links:
453         link = os.path.join(self.tempdir, 'dir-%s' % part.name)
454         self.assertFalse(os.path.lexists(link))
455
456   def testWithPartitionNumber(self):
457     parts = {
458         1: cros_build_lib.PartitionInfo(1, 0, 0, 0, '', 'my-stateful', ''),
459         3: cros_build_lib.PartitionInfo(3, 0, 0, 0, '', 'my-root-a', ''),
460     }
461     self._testWithParts(parts, [1, 3])
462
463   def testWithPartitionLabel(self):
464     parts = {
465         42: cros_build_lib.PartitionInfo(42, 0, 0, 0, '', 'label', ''),
466     }
467     self._testWithParts(parts, ['label'])
468
469   def testInvalidPartSelector(self):
470     parts = {
471         42: cros_build_lib.PartitionInfo(42, 0, 0, 0, '', 'label', ''),
472     }
473     self.assertRaises(ValueError, self._testWithParts, parts, ['label404'])
474     self.assertRaises(ValueError, self._testWithParts, parts, [404])
475
476   def testFailOnExistingMount(self):
477     parts = {
478         42: cros_build_lib.PartitionInfo(42, 0, 0, 0, '', 'label', ''),
479     }
480     os.makedirs(os.path.join(self.tempdir, 'dir-42'))
481     self.assertRaises(ValueError, self._testWithParts, parts, [42])
482
483   def testExistingLinkNotCleanedUp(self):
484     parts = {
485         42: cros_build_lib.PartitionInfo(42, 0, 0, 0, '', 'label', ''),
486     }
487     symlink = os.path.join(self.tempdir, 'dir-label')
488     os.symlink('/tmp', symlink)
489     self.assertEqual('/tmp', os.readlink(symlink))
490     self._testWithParts(parts, [42], check_links=False)
491     self.assertEqual('/tmp', os.readlink(symlink))
492
493
494 class IterateMountPointsTests(cros_test_lib.TempDirTestCase):
495   """Test for IterateMountPoints function."""
496
497   def setUp(self):
498     self.proc_mount = os.path.join(self.tempdir, 'mounts')
499     osutils.WriteFile(
500         self.proc_mount,
501         r'''/dev/loop0 /mnt/dir_8 ext4 rw,relatime,data=ordered 0 0
502 /dev/loop2 /mnt/dir_1 ext4 rw,relatime,data=ordered 0 0
503 /dev/loop1 /mnt/dir_12 vfat rw 0 0
504 /dev/loop4 /mnt/dir_3 ext4 ro,relatime 0 0
505 weird\040system /mnt/weirdo unknown ro 0 0
506 tmpfs /mnt/spaced\040dir tmpfs ro 0 0
507 tmpfs /mnt/\134 tmpfs ro 0 0
508 '''
509     )
510
511   def testOkay(self):
512     r = list(osutils.IterateMountPoints(self.proc_mount))
513     self.assertEqual(len(r), 7)
514     self.assertEqual(r[0].source, '/dev/loop0')
515     self.assertEqual(r[1].destination, '/mnt/dir_1')
516     self.assertEqual(r[2].filesystem, 'vfat')
517     self.assertEqual(r[3].options, 'ro,relatime')
518
519   def testEscape(self):
520     r = list(osutils.IterateMountPoints(self.proc_mount))
521     self.assertEqual(r[4].source, 'weird system')
522     self.assertEqual(r[5].destination, '/mnt/spaced dir')
523     self.assertEqual(r[6].destination, '/mnt/\\')
524
525
526 class ResolveSymlinkTest(cros_test_lib.TestCase):
527   """Tests for ResolveSymlink."""
528
529   def testRelativeLink(self):
530     os.symlink('target', 'link')
531     self.assertEqual(osutils.ResolveSymlink('link'), 'target')
532     os.unlink('link')
533
534   def testAbsoluteLink(self):
535     os.symlink('/target', 'link')
536     self.assertEqual(osutils.ResolveSymlink('link'), '/target')
537     self.assertEqual(osutils.ResolveSymlink('link', '/root'), '/root/target')
538     os.unlink('link')
539
540   def testRecursion(self):
541     os.symlink('target', 'link1')
542     os.symlink('link1', 'link2')
543     self.assertEqual(osutils.ResolveSymlink('link2'), 'target')
544     os.unlink('link2')
545     os.unlink('link1')
546
547   def testRecursionWithAbsoluteLink(self):
548     os.symlink('target', 'link1')
549     os.symlink('/link1', 'link2')
550     self.assertEqual(osutils.ResolveSymlink('link2', '.'), './target')
551     os.unlink('link2')
552     os.unlink('link1')
553
554
555 if __name__ == '__main__':
556   cros_test_lib.main()