Upstream version 9.38.198.0
[platform/framework/web/crosswalk.git] / src / third_party / chromite / lib / cros_build_lib_unittest.py
1 #!/usr/bin/python
2 # Copyright (c) 2012 The Chromium OS Authors. All rights reserved.
3 # Use of this source code is governed by a BSD-style license that can be
4 # found in the LICENSE file.
5
6 """Test the cros_build_lib module."""
7
8 import os
9 import sys
10 sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.dirname(
11     os.path.abspath(__file__)))))
12
13 import contextlib
14 import datetime
15 import difflib
16 import errno
17 import functools
18 import itertools
19 import logging
20 import mox
21 import signal
22 import StringIO
23 import time
24 import __builtin__
25
26 from chromite.cbuildbot import constants
27 from chromite.lib import cros_build_lib
28 from chromite.lib import git
29 from chromite.lib import cros_test_lib
30 from chromite.lib import osutils
31 from chromite.lib import partial_mock
32 from chromite.lib import retry_util
33 from chromite.lib import signals as cros_signals
34
35 # TODO(build): Finish test wrapper (http://crosbug.com/37517).
36 # Until then, this has to be after the chromite imports.
37 import mock
38
39 # pylint: disable=W0212,R0904
40
41
42 class CmdToStrTest(cros_test_lib.TestCase):
43   """Test the CmdToStr function."""
44
45   def setUp(self):
46     self.differ = difflib.Differ()
47
48   def _assertEqual(self, func, test_input, test_output, result):
49     """Like assertEqual but with built in diff support."""
50     diff = '\n'.join(list(self.differ.compare([test_output], [result])))
51     msg = ('Expected %s to translate %r to %r, but got %r\n%s' %
52            (func, test_input, test_output, result, diff))
53     self.assertEqual(test_output, result, msg)
54
55   def _testData(self, functor, tests, check_type=True):
56     """Process a dict of test data."""
57     for test_output, test_input in tests.iteritems():
58       result = functor(test_input)
59       self._assertEqual(functor.__name__, test_input, test_output, result)
60
61       if check_type:
62         # Also make sure the result is a string, otherwise the %r output will
63         # include a "u" prefix and that is not good for logging.
64         self.assertEqual(type(test_output), str)
65
66   def testShellQuote(self):
67     """Basic ShellQuote tests."""
68     # Dict of expected output strings to input lists.
69     tests_quote = {
70         "''": '',
71         'a': unicode('a'),
72         "'a b c'": unicode('a b c'),
73         "'a\tb'": 'a\tb',
74         "'/a$file'": '/a$file',
75         "'/a#file'": '/a#file',
76         """'b"c'""": 'b"c',
77         "'a@()b'": 'a@()b',
78         'j%k': 'j%k',
79         r'''"s'a\$va\\rs"''': r"s'a$va\rs",
80         r'''"\\'\\\""''': r'''\'\"''',
81         r'''"'\\\$"''': r"""'\$""",
82     }
83
84     # Expected input output specific to ShellUnquote. This string cannot be
85     # produced by ShellQuote but is still a valid bash escaped string.
86     tests_unquote = {
87         r'''\$''': r'''"\\$"''',
88     }
89
90     def aux(s):
91       return cros_build_lib.ShellUnquote(cros_build_lib.ShellQuote(s))
92
93     self._testData(cros_build_lib.ShellQuote, tests_quote)
94     self._testData(cros_build_lib.ShellUnquote, tests_unquote)
95
96     # Test that the operations are reversible.
97     self._testData(aux, {k: k for k in tests_quote.values()}, False)
98     self._testData(aux, {k: k for k in tests_quote.keys()}, False)
99
100   def testCmdToStr(self):
101     # Dict of expected output strings to input lists.
102     tests = {
103         r"a b": ['a', 'b'],
104         r"'a b' c": ['a b', 'c'],
105         r'''a "b'c"''': ['a', "b'c"],
106         r'''a "/'\$b" 'a b c' "xy'z"''':
107             [unicode('a'), "/'$b", 'a b c', "xy'z"],
108         '': [],
109     }
110     self._testData(cros_build_lib.CmdToStr, tests)
111
112
113 class RunCommandMock(partial_mock.PartialCmdMock):
114   """Provides a context where all RunCommand invocations low-level mocked."""
115
116   TARGET = 'chromite.lib.cros_build_lib'
117   ATTRS = ('RunCommand',)
118   DEFAULT_ATTR = 'RunCommand'
119
120   def RunCommand(self, cmd, *args, **kwargs):
121     result = self._results['RunCommand'].LookupResult(
122         (cmd,), hook_args=(cmd,) + args, hook_kwargs=kwargs)
123
124     popen_mock = PopenMock()
125     popen_mock.AddCmdResult(partial_mock.Ignore(), result.returncode,
126                             result.output, result.error)
127     with popen_mock:
128       return self.backup['RunCommand'](cmd, *args, **kwargs)
129
130
131 class RunCommandTestCase(cros_test_lib.MockTestCase):
132   """MockTestCase that mocks out RunCommand by default."""
133
134   def setUp(self):
135     self.rc = self.StartPatcher(RunCommandMock())
136     self.rc.SetDefaultCmdResult()
137     self.assertCommandCalled = self.rc.assertCommandCalled
138     self.assertCommandContains = self.rc.assertCommandContains
139
140
141 class RunCommandTempDirTestCase(RunCommandTestCase,
142                                 cros_test_lib.TempDirTestCase):
143   """Convenience class mixing TempDirTestCase and RunCommandTestCase"""
144
145
146 class PopenMock(partial_mock.PartialCmdMock):
147   """Provides a context where all _Popen instances are low-level mocked."""
148
149   TARGET = 'chromite.lib.cros_build_lib._Popen'
150   ATTRS = ('__init__',)
151   DEFAULT_ATTR = '__init__'
152
153   def __init__(self):
154     partial_mock.PartialCmdMock.__init__(self, create_tempdir=True)
155
156   def _target__init__(self, inst, cmd, *args, **kwargs):
157     result = self._results['__init__'].LookupResult(
158         (cmd,), hook_args=(inst, cmd,) + args, hook_kwargs=kwargs)
159
160     script = os.path.join(self.tempdir, 'mock_cmd.sh')
161     stdout = os.path.join(self.tempdir, 'output')
162     stderr = os.path.join(self.tempdir, 'error')
163     osutils.WriteFile(stdout, result.output)
164     osutils.WriteFile(stderr, result.error)
165     osutils.WriteFile(
166         script,
167         ['#!/bin/bash\n', 'cat %s\n' % stdout, 'cat %s >&2\n' % stderr,
168          'exit %s' % result.returncode])
169     os.chmod(script, 0o700)
170     kwargs['cwd'] = self.tempdir
171     self.backup['__init__'](inst, [script, '--'] + cmd, *args, **kwargs)
172
173
174 class TestRunCommandNoMock(cros_test_lib.TestCase):
175   """Class that tests RunCommand by not mocking subprocess.Popen"""
176
177   def testErrorCodeNotRaisesError(self):
178     """Don't raise exception when command returns non-zero exit code."""
179     result = cros_build_lib.RunCommand(['ls', '/does/not/exist'],
180                                         error_code_ok=True)
181     self.assertTrue(result.returncode != 0)
182
183   def testReturnCodeNotZeroErrorOkNotRaisesError(self):
184     """Raise error when proc.communicate() returns non-zero."""
185     self.assertRaises(cros_build_lib.RunCommandError, cros_build_lib.RunCommand,
186                       ['/does/not/exist'])
187
188
189 def _ForceLoggingLevel(functor):
190   def inner(*args, **kwargs):
191     current = cros_build_lib.logger.getEffectiveLevel()
192     try:
193       cros_build_lib.logger.setLevel(logging.INFO)
194       return functor(*args, **kwargs)
195     finally:
196       cros_build_lib.logger.setLevel(current)
197   return inner
198
199
200 class TestRunCommand(cros_test_lib.MoxTestCase):
201   """Tests of RunCommand functionality."""
202
203   def setUp(self):
204     # Get the original value for SIGINT so our signal() mock can return the
205     # correct thing.
206     self._old_sigint = signal.getsignal(signal.SIGINT)
207
208     self.mox.StubOutWithMock(cros_build_lib, '_Popen', use_mock_anything=True)
209     self.mox.StubOutWithMock(signal, 'signal')
210     self.mox.StubOutWithMock(signal, 'getsignal')
211     self.mox.StubOutWithMock(cros_signals, 'SignalModuleUsable')
212     self.proc_mock = self.mox.CreateMockAnything()
213     self.error = 'test error'
214     self.output = 'test output'
215
216   @contextlib.contextmanager
217   def _SetupPopen(self, cmd, **kwargs):
218     cros_signals.SignalModuleUsable().AndReturn(True)
219     ignore_sigint = kwargs.pop('ignore_sigint', False)
220
221     for val in ('cwd', 'stdin', 'stdout', 'stderr'):
222       kwargs.setdefault(val, None)
223     kwargs.setdefault('shell', False)
224     kwargs.setdefault('env', mox.IgnoreArg())
225     kwargs['close_fds'] = True
226
227     # Make some arbitrary functors we can pretend are signal handlers.
228     # Note that these are intentionally defined on the fly via lambda-
229     # this is to ensure that they're unique to each run.
230     sigint_suppress = lambda signum, frame:None
231     sigint_suppress.__name__ = 'sig_ign_sigint'
232     normal_sigint = lambda signum, frame:None
233     normal_sigint.__name__ = 'sigint'
234     normal_sigterm = lambda signum, frame:None
235     normal_sigterm.__name__ = 'sigterm'
236
237     # If requested, RunCommand will ignore sigints; record that.
238     if ignore_sigint:
239       signal.signal(signal.SIGINT, signal.SIG_IGN).AndReturn(sigint_suppress)
240     else:
241       signal.getsignal(signal.SIGINT).AndReturn(normal_sigint)
242       signal.signal(signal.SIGINT, mox.IgnoreArg()).AndReturn(normal_sigint)
243     signal.getsignal(signal.SIGTERM).AndReturn(normal_sigterm)
244     signal.signal(signal.SIGTERM, mox.IgnoreArg()).AndReturn(normal_sigterm)
245
246     cros_build_lib._Popen(cmd, **kwargs).AndReturn(self.proc_mock)
247     yield self.proc_mock
248
249     # If it ignored them, RunCommand will restore sigints; record that.
250     if ignore_sigint:
251       signal.signal(signal.SIGINT, sigint_suppress).AndReturn(signal.SIG_IGN)
252     else:
253       signal.signal(signal.SIGINT, normal_sigint).AndReturn(None)
254     signal.signal(signal.SIGTERM, normal_sigterm).AndReturn(None)
255
256   def _AssertCrEqual(self, expected, actual):
257     """Helper method to compare two CommandResult objects.
258
259     This is needed since assertEqual does not know how to compare two
260     CommandResult objects.
261
262     Args:
263       expected: a CommandResult object, expected result.
264       actual: a CommandResult object, actual result.
265     """
266     self.assertEqual(expected.cmd, actual.cmd)
267     self.assertEqual(expected.error, actual.error)
268     self.assertEqual(expected.output, actual.output)
269     self.assertEqual(expected.returncode, actual.returncode)
270
271   @_ForceLoggingLevel
272   def _TestCmd(self, cmd, real_cmd, sp_kv=dict(), rc_kv=dict(), sudo=False):
273     """Factor out common setup logic for testing RunCommand().
274
275     Args:
276       cmd: a string or an array of strings that will be passed to RunCommand.
277       real_cmd: the real command we expect RunCommand to call (might be
278           modified to have enter_chroot).
279       sp_kv: key-value pairs passed to subprocess.Popen().
280       rc_kv: key-value pairs passed to RunCommand().
281       sudo: use SudoRunCommand() rather than RunCommand().
282     """
283     expected_result = cros_build_lib.CommandResult()
284     expected_result.cmd = real_cmd
285     expected_result.error = self.error
286     expected_result.output = self.output
287     expected_result.returncode = self.proc_mock.returncode
288
289     arg_dict = dict()
290     for attr in 'close_fds cwd env stdin stdout stderr shell'.split():
291       if attr in sp_kv:
292         arg_dict[attr] = sp_kv[attr]
293       else:
294         if attr == 'close_fds':
295           arg_dict[attr] = True
296         elif attr == 'shell':
297           arg_dict[attr] = False
298         else:
299           arg_dict[attr] = None
300
301     with self._SetupPopen(real_cmd,
302                           ignore_sigint=rc_kv.get('ignore_sigint'),
303                           **sp_kv) as proc:
304       proc.communicate(None).AndReturn((self.output, self.error))
305
306     self.mox.ReplayAll()
307     if sudo:
308       actual_result = cros_build_lib.SudoRunCommand(cmd, **rc_kv)
309     else:
310       actual_result = cros_build_lib.RunCommand(cmd, **rc_kv)
311     self.mox.VerifyAll()
312
313     self._AssertCrEqual(expected_result, actual_result)
314
315   def testReturnCodeZeroWithArrayCmd(self, ignore_sigint=False):
316     """--enter_chroot=False and --cmd is an array of strings.
317
318     Parameterized so this can also be used by some other tests w/ alternate
319     params to RunCommand().
320
321     Args:
322       ignore_sigint: If True, we'll tell RunCommand to ignore sigint.
323     """
324     self.proc_mock.returncode = 0
325     cmd_list = ['foo', 'bar', 'roger']
326     self._TestCmd(cmd_list, cmd_list,
327                   rc_kv=dict(ignore_sigint=ignore_sigint))
328
329   def testSignalRestoreNormalCase(self):
330     """Test RunCommand() properly sets/restores sigint.  Normal case."""
331     self.testReturnCodeZeroWithArrayCmd(ignore_sigint=True)
332
333   def testReturnCodeZeroWithArrayCmdEnterChroot(self):
334     """--enter_chroot=True and --cmd is an array of strings."""
335     self.proc_mock.returncode = 0
336     cmd_list = ['foo', 'bar', 'roger']
337     real_cmd = cmd_list
338     if not cros_build_lib.IsInsideChroot():
339       real_cmd = ['cros_sdk', '--'] + cmd_list
340     self._TestCmd(cmd_list, real_cmd, rc_kv=dict(enter_chroot=True))
341
342   @_ForceLoggingLevel
343   def testCommandFailureRaisesError(self, ignore_sigint=False):
344     """Verify error raised by communicate() is caught.
345
346     Parameterized so this can also be used by some other tests w/ alternate
347     params to RunCommand().
348
349     Args:
350       ignore_sigint: If True, we'll tell RunCommand to ignore sigint.
351     """
352     cmd = 'test cmd'
353
354
355     with self._SetupPopen(['/bin/bash', '-c', cmd],
356                           ignore_sigint=ignore_sigint) as proc:
357       proc.communicate(None).AndReturn((self.output, self.error))
358
359     self.mox.ReplayAll()
360     self.assertRaises(cros_build_lib.RunCommandError,
361                       cros_build_lib.RunCommand, cmd, shell=True,
362                       ignore_sigint=ignore_sigint, error_code_ok=False)
363     self.mox.VerifyAll()
364
365   @_ForceLoggingLevel
366   def testSubprocessCommunicateExceptionRaisesError(self, ignore_sigint=False):
367     """Verify error raised by communicate() is caught.
368
369     Parameterized so this can also be used by some other tests w/ alternate
370     params to RunCommand().
371
372     Args:
373       ignore_sigint: If True, we'll tell RunCommand to ignore sigint.
374     """
375     cmd = ['test', 'cmd']
376
377     with self._SetupPopen(cmd, ignore_sigint=ignore_sigint) as proc:
378       proc.communicate(None).AndRaise(ValueError)
379
380     self.mox.ReplayAll()
381     self.assertRaises(ValueError, cros_build_lib.RunCommand, cmd,
382                       ignore_sigint=ignore_sigint)
383     self.mox.VerifyAll()
384
385   def testSignalRestoreExceptionCase(self):
386     """Test RunCommand() properly sets/restores sigint.  Exception case."""
387     self.testSubprocessCommunicateExceptionRaisesError(ignore_sigint=True)
388
389   def testEnvWorks(self):
390     """Test RunCommand(..., env=xyz) works."""
391     # We'll put this bogus environment together, just to make sure
392     # subprocess.Popen gets passed it.
393     env = {'Tom': 'Jerry', 'Itchy': 'Scratchy'}
394
395     # This is a simple case, copied from testReturnCodeZeroWithArrayCmd()
396     self.proc_mock.returncode = 0
397     cmd_list = ['foo', 'bar', 'roger']
398
399     # Run.  We expect the env= to be passed through from sp (subprocess.Popen)
400     # to rc (RunCommand).
401     self._TestCmd(cmd_list, cmd_list,
402                   sp_kv=dict(env=env),
403                   rc_kv=dict(env=env))
404
405   def testExtraEnvOnlyWorks(self):
406     """Test RunCommand(..., extra_env=xyz) works."""
407     # We'll put this bogus environment together, just to make sure
408     # subprocess.Popen gets passed it.
409     extra_env = {'Pinky' : 'Brain'}
410     ## This is a little bit circular, since the same logic is used to compute
411     ## the value inside, but at least it checks that this happens.
412     total_env = os.environ.copy()
413     total_env.update(extra_env)
414
415     # This is a simple case, copied from testReturnCodeZeroWithArrayCmd()
416     self.proc_mock.returncode = 0
417     cmd_list = ['foo', 'bar', 'roger']
418
419     # Run.  We expect the env= to be passed through from sp (subprocess.Popen)
420     # to rc (RunCommand).
421     self._TestCmd(cmd_list, cmd_list,
422                   sp_kv=dict(env=total_env),
423                   rc_kv=dict(extra_env=extra_env))
424
425   def testExtraEnvTooWorks(self):
426     """Test RunCommand(..., env=xy, extra_env=z) works."""
427     # We'll put this bogus environment together, just to make sure
428     # subprocess.Popen gets passed it.
429     env = {'Tom': 'Jerry', 'Itchy': 'Scratchy'}
430     extra_env = {'Pinky': 'Brain'}
431     total_env = {'Tom': 'Jerry', 'Itchy': 'Scratchy', 'Pinky': 'Brain'}
432
433     # This is a simple case, copied from testReturnCodeZeroWithArrayCmd()
434     self.proc_mock.returncode = 0
435     cmd_list = ['foo', 'bar', 'roger']
436
437     # Run.  We expect the env= to be passed through from sp (subprocess.Popen)
438     # to rc (RunCommand).
439     self._TestCmd(cmd_list, cmd_list,
440                   sp_kv=dict(env=total_env),
441                   rc_kv=dict(env=env, extra_env=extra_env))
442
443   def testExceptionEquality(self):
444     """Verify equality methods for RunCommandError"""
445
446     c1 = cros_build_lib.CommandResult(cmd=['ls', 'arg'], returncode=1)
447     c2 = cros_build_lib.CommandResult(cmd=['ls', 'arg1'], returncode=1)
448     c3 = cros_build_lib.CommandResult(cmd=['ls', 'arg'], returncode=2)
449     e1 = cros_build_lib.RunCommandError('Message 1', c1)
450     e2 = cros_build_lib.RunCommandError('Message 1', c1)
451     e_diff_msg = cros_build_lib.RunCommandError('Message 2', c1)
452     e_diff_cmd = cros_build_lib.RunCommandError('Message 1', c2)
453     e_diff_code = cros_build_lib.RunCommandError('Message 1', c3)
454
455     self.assertEqual(e1, e2)
456     self.assertNotEqual(e1, e_diff_msg)
457     self.assertNotEqual(e1, e_diff_cmd)
458     self.assertNotEqual(e1, e_diff_code)
459
460   def testSudoRunCommand(self):
461     """Test SudoRunCommand(...) works."""
462     cmd_list = ['foo', 'bar', 'roger']
463     sudo_list = ['sudo', '--'] + cmd_list
464     self.proc_mock.returncode = 0
465     self._TestCmd(cmd_list, sudo_list, sudo=True)
466
467   def testSudoRunCommandShell(self):
468     """Test SudoRunCommand(..., shell=True) works."""
469     cmd = 'foo bar roger'
470     sudo_list = ['sudo', '--', '/bin/bash', '-c', cmd]
471     self.proc_mock.returncode = 0
472     self._TestCmd(cmd, sudo_list, sudo=True,
473                   rc_kv=dict(shell=True))
474
475   def testSudoRunCommandEnv(self):
476     """Test SudoRunCommand(..., extra_env=z) works."""
477     cmd_list = ['foo', 'bar', 'roger']
478     sudo_list = ['sudo', 'shucky=ducky', '--'] + cmd_list
479     extra_env = {'shucky' : 'ducky'}
480     self.proc_mock.returncode = 0
481     self._TestCmd(cmd_list, sudo_list, sudo=True,
482                   rc_kv=dict(extra_env=extra_env))
483
484   def testSudoRunCommandUser(self):
485     """Test SudoRunCommand(..., user='...') works."""
486     cmd_list = ['foo', 'bar', 'roger']
487     sudo_list = ['sudo', '-u', 'MMMMMonster', '--'] + cmd_list
488     self.proc_mock.returncode = 0
489     self._TestCmd(cmd_list, sudo_list, sudo=True,
490                   rc_kv=dict(user='MMMMMonster'))
491
492   def testSudoRunCommandUserShell(self):
493     """Test SudoRunCommand(..., user='...', shell=True) works."""
494     cmd = 'foo bar roger'
495     sudo_list = ['sudo', '-u', 'MMMMMonster', '--', '/bin/bash', '-c', cmd]
496     self.proc_mock.returncode = 0
497     self._TestCmd(cmd, sudo_list, sudo=True,
498                   rc_kv=dict(user='MMMMMonster', shell=True))
499
500
501 class TestRunCommandLogging(cros_test_lib.TempDirTestCase):
502   """Tests of RunCommand logging."""
503
504   @_ForceLoggingLevel
505   def testLogStdoutToFile(self):
506     # Make mox happy.
507     log = os.path.join(self.tempdir, 'output')
508     ret = cros_build_lib.RunCommand(
509         ['python', '-c', 'print "monkeys"'], log_stdout_to_file=log)
510     self.assertEqual(osutils.ReadFile(log), 'monkeys\n')
511     self.assertTrue(ret.output is None)
512     self.assertTrue(ret.error is None)
513
514     # Validate dumb api usage.
515     ret = cros_build_lib.RunCommand(
516         ['python', '-c', 'import sys;print "monkeys2"'],
517         log_stdout_to_file=log, redirect_stdout=True)
518     self.assertTrue(ret.output is None)
519     self.assertEqual(osutils.ReadFile(log), 'monkeys2\n')
520
521     os.unlink(log)
522     ret = cros_build_lib.RunCommand(
523         ['python', '-c', 'import sys;print >> sys.stderr, "monkeys3"'],
524         log_stdout_to_file=log, redirect_stderr=True)
525     self.assertEqual(ret.error, 'monkeys3\n')
526     self.assertTrue(os.path.exists(log))
527     self.assertEqual(os.stat(log).st_size, 0)
528
529     os.unlink(log)
530     ret = cros_build_lib.RunCommand(
531         ['python', '-u', '-c',
532          'import sys;print "monkeys4"\nprint >> sys.stderr, "monkeys5"\n'],
533         log_stdout_to_file=log, combine_stdout_stderr=True)
534     self.assertTrue(ret.output is None)
535     self.assertTrue(ret.error is None)
536
537     self.assertEqual(osutils.ReadFile(log), 'monkeys4\nmonkeys5\n')
538
539
540 class TestRetries(cros_test_lib.MoxTestCase):
541   """Tests of GenericRetry and relatives."""
542
543   def testGenericRetry(self):
544     source, source2 = iter(xrange(5)).next, iter(xrange(5)).next
545     def f():
546       val = source2()
547       self.assertEqual(val, source())
548       if val < 4:
549         raise ValueError()
550       return val
551     handler = lambda ex: isinstance(ex, ValueError)
552     self.assertRaises(ValueError, retry_util.GenericRetry, handler, 3, f)
553     self.assertEqual(4, retry_util.GenericRetry(handler, 1, f))
554     self.assertRaises(StopIteration, retry_util.GenericRetry, handler, 3, f)
555
556   def testRetryExceptionBadArgs(self):
557     """Verify we reject non-classes or tuples of classes"""
558     self.assertRaises(TypeError, retry_util.RetryException, '', 3, map)
559     self.assertRaises(TypeError, retry_util.RetryException, 123, 3, map)
560     self.assertRaises(TypeError, retry_util.RetryException, None, 3, map)
561     self.assertRaises(TypeError, retry_util.RetryException, [None], 3, map)
562
563   def testRetryException(self):
564     """Verify we retry only when certain exceptions get thrown"""
565     source, source2 = iter(xrange(6)).next, iter(xrange(6)).next
566     def f():
567       val = source2()
568       self.assertEqual(val, source())
569       if val < 2:
570         raise OSError()
571       if val < 5:
572         raise ValueError()
573       return val
574     self.assertRaises(OSError, retry_util.RetryException,
575                       (OSError, ValueError), 2, f)
576     self.assertRaises(ValueError, retry_util.RetryException,
577                       (OSError, ValueError), 1, f)
578     self.assertEqual(5, retry_util.RetryException(ValueError, 1, f))
579     self.assertRaises(StopIteration, retry_util.RetryException,
580                       ValueError, 3, f)
581
582   @osutils.TempDirDecorator
583   def testBasicRetry(self):
584     # pylint: disable=E1101
585     path = os.path.join(self.tempdir, 'script')
586     paths = {
587       'stop': os.path.join(self.tempdir, 'stop'),
588       'store': os.path.join(self.tempdir, 'store')
589     }
590     osutils.WriteFile(path,
591         "import sys\n"
592         "val = int(open(%(store)r).read())\n"
593         "stop_val = int(open(%(stop)r).read())\n"
594         "open(%(store)r, 'w').write(str(val + 1))\n"
595         "print val\n"
596         "sys.exit(0 if val == stop_val else 1)\n" % paths)
597
598     os.chmod(path, 0o755)
599
600     def _setup_counters(start, stop, sleep, sleep_cnt):
601       self.mox.ResetAll()
602       for i in xrange(sleep_cnt):
603         time.sleep(sleep * (i + 1))
604       self.mox.ReplayAll()
605
606       osutils.WriteFile(paths['store'], str(start))
607       osutils.WriteFile(paths['stop'], str(stop))
608
609     self.mox.StubOutWithMock(time, 'sleep')
610     self.mox.ReplayAll()
611
612     _setup_counters(0, 0, 0, 0)
613     command = ['python', path]
614     kwargs = {'redirect_stdout': True, 'print_cmd': False}
615
616     self.assertEqual(cros_build_lib.RunCommand(command, **kwargs).output, '0\n')
617
618     func = retry_util.RunCommandWithRetries
619
620     _setup_counters(2, 2, 0, 0)
621     self.assertEqual(func(0, command, sleep=0, **kwargs).output, '2\n')
622     self.mox.VerifyAll()
623
624     _setup_counters(0, 2, 1, 2)
625     self.assertEqual(func(2, command, sleep=1, **kwargs).output, '2\n')
626     self.mox.VerifyAll()
627
628     _setup_counters(0, 1, 2, 1)
629     self.assertEqual(func(1, command, sleep=2, **kwargs).output, '1\n')
630     self.mox.VerifyAll()
631
632     _setup_counters(0, 3, 3, 2)
633     self.assertRaises(cros_build_lib.RunCommandError,
634                       func, 2, command, sleep=3, **kwargs)
635     self.mox.VerifyAll()
636
637
638 class TestTimedCommand(cros_test_lib.MoxTestCase):
639   """Tests for TimedCommand()"""
640
641   def setUp(self):
642     self.mox.StubOutWithMock(cros_build_lib, 'RunCommand')
643
644   def testBasic(self):
645     """Make sure simple stuff works."""
646     cros_build_lib.RunCommand(['true', 'foo'])
647     self.mox.ReplayAll()
648
649     cros_build_lib.TimedCommand(cros_build_lib.RunCommand, ['true', 'foo'])
650
651   def testArgs(self):
652     """Verify passing of optional args to the destination function."""
653     cros_build_lib.RunCommand(':', shell=True, print_cmd=False,
654                               error_code_ok=False)
655     self.mox.ReplayAll()
656
657     cros_build_lib.TimedCommand(cros_build_lib.RunCommand, ':', shell=True,
658                                 print_cmd=False, error_code_ok=False)
659
660   def testLog(self):
661     """Verify logging does the right thing."""
662     self.mox.StubOutWithMock(cros_build_lib.logger, 'log')
663
664     cros_build_lib.RunCommand('fun', shell=True)
665     cros_build_lib.logger.log(mox.IgnoreArg(), 'msg! %s', mox.IgnoreArg())
666     self.mox.ReplayAll()
667
668     cros_build_lib.TimedCommand(cros_build_lib.RunCommand, 'fun',
669                                 timed_log_msg='msg! %s', shell=True)
670
671
672 class TestListFiles(cros_test_lib.TempDirTestCase):
673   """Tests of ListFiles funciton."""
674
675   def _CreateNestedDir(self, dir_structure):
676     for entry in dir_structure:
677       full_path = os.path.join(os.path.join(self.tempdir, entry))
678       # ensure dirs are created
679       try:
680         os.makedirs(os.path.dirname(full_path))
681         if full_path.endswith('/'):
682           # we only want to create directories
683           return
684       except OSError as err:
685         if err.errno == errno.EEXIST:
686           # we don't care if the dir already exists
687           pass
688         else:
689           raise
690       # create dummy files
691       tmp = open(full_path, 'w')
692       tmp.close()
693
694   def testTraverse(self):
695     """Test that we are traversing the directory properly."""
696     dir_structure = ['one/two/test.txt', 'one/blah.py',
697                      'three/extra.conf']
698     self._CreateNestedDir(dir_structure)
699
700     files = cros_build_lib.ListFiles(self.tempdir)
701     for f in files:
702       f = f.replace(self.tempdir, '').lstrip('/')
703       if f not in dir_structure:
704         self.fail('%s was not found in %s' % (f, dir_structure))
705
706   def testEmptyFilePath(self):
707     """Test that we return nothing when directories are empty."""
708     dir_structure = ['one/', 'two/', 'one/a/']
709     self._CreateNestedDir(dir_structure)
710     files = cros_build_lib.ListFiles(self.tempdir)
711     self.assertEqual(files, [])
712
713   def testNoSuchDir(self):
714     try:
715       cros_build_lib.ListFiles(os.path.join(self.tempdir, 'missing'))
716     except OSError as err:
717       self.assertEqual(err.errno, errno.ENOENT)
718
719
720 @contextlib.contextmanager
721 def SetTimeZone(tz):
722   """Temporarily set the timezone to the specified value.
723
724   This is needed because cros_test_lib.TestCase doesn't call time.tzset()
725   after resetting the environment.
726   """
727   old_environ = os.environ.copy()
728   try:
729     os.environ['TZ'] = tz
730     time.tzset()
731     yield
732   finally:
733     osutils.SetEnvironment(old_environ)
734     time.tzset()
735
736
737 class HelperMethodSimpleTests(cros_test_lib.TestCase):
738   """Tests for various helper methods without using mox."""
739
740   def _TestChromeosVersion(self, test_str, expected=None):
741     actual = cros_build_lib.GetChromeosVersion(test_str)
742     self.assertEqual(expected, actual)
743
744   def testGetChromeosVersionWithValidVersionReturnsValue(self):
745     expected = '0.8.71.2010_09_10_1530'
746     test_str = ' CHROMEOS_VERSION_STRING=0.8.71.2010_09_10_1530 '
747     self._TestChromeosVersion(test_str, expected)
748
749   def testGetChromeosVersionWithMultipleVersionReturnsFirstMatch(self):
750     expected = '0.8.71.2010_09_10_1530'
751     test_str = (' CHROMEOS_VERSION_STRING=0.8.71.2010_09_10_1530 '
752                 ' CHROMEOS_VERSION_STRING=10_1530 ')
753     self._TestChromeosVersion(test_str, expected)
754
755   def testGetChromeosVersionWithInvalidVersionReturnsDefault(self):
756     test_str = ' CHROMEOS_VERSION_STRING=invalid_version_string '
757     self._TestChromeosVersion(test_str)
758
759   def testGetChromeosVersionWithEmptyInputReturnsDefault(self):
760     self._TestChromeosVersion('')
761
762   def testGetChromeosVersionWithNoneInputReturnsDefault(self):
763     self._TestChromeosVersion(None)
764
765   def testUserDateTime(self):
766     """Test with a raw time value."""
767     expected = 'Mon, 16 Jun 1980 05:03:20 -0700 (PDT)'
768     with SetTimeZone('US/Pacific'):
769       timeval = 330005000
770       self.assertEqual(cros_build_lib.UserDateTimeFormat(timeval=timeval),
771                        expected)
772
773   def testUserDateTimeDateTime(self):
774     """Test with a datetime object."""
775     expected = 'Mon, 16 Jun 1980 00:00:00 -0700 (PDT)'
776     with SetTimeZone('US/Pacific'):
777       timeval = datetime.datetime(1980, 6, 16)
778       self.assertEqual(cros_build_lib.UserDateTimeFormat(timeval=timeval),
779                        expected)
780
781   def testUserDateTimeDateTimeInWinter(self):
782     """Test that we correctly switch from PDT to PST."""
783     expected = 'Wed, 16 Jan 1980 00:00:00 -0800 (PST)'
784     with SetTimeZone('US/Pacific'):
785       timeval = datetime.datetime(1980, 1, 16)
786       self.assertEqual(cros_build_lib.UserDateTimeFormat(timeval=timeval),
787                        expected)
788
789   def testUserDateTimeDateTimeInEST(self):
790     """Test that we correctly switch from PDT to EST."""
791     expected = 'Wed, 16 Jan 1980 00:00:00 -0500 (EST)'
792     with SetTimeZone('US/Eastern'):
793       timeval = datetime.datetime(1980, 1, 16)
794       self.assertEqual(cros_build_lib.UserDateTimeFormat(timeval=timeval),
795                        expected)
796
797   def testUserDateTimeCurrentTime(self):
798     """Test that we can get the current time."""
799     cros_build_lib.UserDateTimeFormat()
800
801   def testParseUserDateTimeFormat(self):
802     stringtime = cros_build_lib.UserDateTimeFormat(100000.0)
803     self.assertEqual(cros_build_lib.ParseUserDateTimeFormat(stringtime),
804                      100000.0)
805
806   def testParseDurationToSeconds(self):
807     self.assertEqual(cros_build_lib.ParseDurationToSeconds('1:01:01'),
808                      3600 + 60 + 1)
809
810 class YNInteraction():
811   """Class to hold a list of responses and expected reault of YN prompt."""
812   def __init__(self, responses, expected_result):
813     self.responses = responses
814     self.expected_result = expected_result
815
816
817 class TestInput(cros_test_lib.MoxOutputTestCase):
818   """Tests of input gathering functionality."""
819
820   def testGetInput(self):
821     self.mox.StubOutWithMock(__builtin__, 'raw_input')
822
823     prompt = 'Some prompt'
824     response = 'Some response'
825     __builtin__.raw_input(prompt).AndReturn(response)
826     self.mox.ReplayAll()
827
828     self.assertEquals(response, cros_build_lib.GetInput(prompt))
829     self.mox.VerifyAll()
830
831   def testBooleanPrompt(self):
832     self.mox.StubOutWithMock(cros_build_lib, 'GetInput')
833
834     for value in ('', '', 'yes', 'ye', 'y', 'no', 'n'):
835       cros_build_lib.GetInput(mox.IgnoreArg()).AndReturn(value)
836
837     self.mox.ReplayAll()
838     self.assertTrue(cros_build_lib.BooleanPrompt())
839     self.assertFalse(cros_build_lib.BooleanPrompt(default=False))
840     self.assertTrue(cros_build_lib.BooleanPrompt())
841     self.assertTrue(cros_build_lib.BooleanPrompt())
842     self.assertTrue(cros_build_lib.BooleanPrompt())
843     self.assertFalse(cros_build_lib.BooleanPrompt())
844     self.assertFalse(cros_build_lib.BooleanPrompt())
845     self.mox.VerifyAll()
846
847   def testBooleanShellValue(self):
848     """Verify BooleanShellValue() inputs work as expected"""
849     for v in (None,):
850       self.assertTrue(cros_build_lib.BooleanShellValue(v, True))
851       self.assertFalse(cros_build_lib.BooleanShellValue(v, False))
852
853     for v in (1234, '', 'akldjsf', '"'):
854       self.assertRaises(ValueError, cros_build_lib.BooleanShellValue, v, True)
855       self.assertTrue(cros_build_lib.BooleanShellValue(v, True, msg=''))
856       self.assertFalse(cros_build_lib.BooleanShellValue(v, False, msg=''))
857
858     for v in ('yes', 'YES', 'YeS', 'y', 'Y', '1', 'true', 'True', 'TRUE',):
859       self.assertTrue(cros_build_lib.BooleanShellValue(v, True))
860       self.assertTrue(cros_build_lib.BooleanShellValue(v, False))
861
862     for v in ('no', 'NO', 'nO', 'n', 'N', '0', 'false', 'False', 'FALSE',):
863       self.assertFalse(cros_build_lib.BooleanShellValue(v, True))
864       self.assertFalse(cros_build_lib.BooleanShellValue(v, False))
865
866
867 class TestContextManagerStack(cros_test_lib.TestCase):
868   """Test the ContextManagerStack class."""
869
870   def test(self):
871     invoked = []
872     counter = iter(itertools.count()).next
873     def _mk_kls(has_exception=None, exception_kls=None, suppress=False):
874       class foon(object):
875         """Simple context manager which runs checks on __exit__."""
876         marker = counter()
877         def __enter__(self):
878           return self
879
880         # pylint: disable=E0213
881         def __exit__(obj_self, exc_type, exc, traceback):
882           invoked.append(obj_self.marker)
883           if has_exception is not None:
884             self.assertTrue(all(x is not None
885                               for x in (exc_type, exc, traceback)))
886             self.assertTrue(exc_type == has_exception)
887           if exception_kls:
888             raise exception_kls()
889           if suppress:
890             return True
891       return foon
892
893     with cros_build_lib.ContextManagerStack() as stack:
894       # Note... these tests are in reverse, since the exception
895       # winds its way up the stack.
896       stack.Add(_mk_kls())
897       stack.Add(_mk_kls(ValueError, suppress=True))
898       stack.Add(_mk_kls(IndexError, exception_kls=ValueError))
899       stack.Add(_mk_kls(IndexError))
900       stack.Add(_mk_kls(exception_kls=IndexError))
901       stack.Add(_mk_kls())
902     self.assertEqual(invoked, list(reversed(range(6))))
903
904
905 class TestManifestCheckout(cros_test_lib.TempDirTestCase):
906   """Tests for ManifestCheckout functionality."""
907
908   def setUp(self):
909     self.manifest_dir = os.path.join(self.tempdir, '.repo', 'manifests')
910     # Initialize a repo intance here.
911     # TODO(vapier, ferringb):  mangle this so it inits from a local
912     # checkout if one is available, same for the git-repo fetch.
913     cmd = ['repo', 'init', '-u', constants.MANIFEST_URL]
914     cros_build_lib.RunCommand(cmd, cwd=self.tempdir, input='',
915                               capture_output=True)
916     self.active_manifest = os.path.realpath(
917         os.path.join(self.tempdir, '.repo', 'manifest.xml'))
918
919   def testManifestInheritance(self):
920     osutils.WriteFile(self.active_manifest, """
921         <manifest>
922           <include name="include-target.xml" />
923           <include name="empty.xml" />
924           <project name="monkeys" path="baz" remote="foon" revision="master" />
925         </manifest>""")
926     # First, verify it properly explodes if the include can't be found.
927     self.assertRaises(EnvironmentError,
928                       git.ManifestCheckout, self.tempdir)
929
930     # Next, verify it can read an empty manifest; this is to ensure
931     # that we can point Manifest at the empty manifest without exploding,
932     # same for ManifestCheckout; this sort of thing is primarily useful
933     # to ensure no step of an include assumes everything is yet assembled.
934     empty_path = os.path.join(self.manifest_dir, 'empty.xml')
935     osutils.WriteFile(empty_path, '<manifest/>')
936     git.Manifest(empty_path)
937     git.ManifestCheckout(self.tempdir, manifest_path=empty_path)
938
939     # Next, verify include works.
940     osutils.WriteFile(os.path.join(self.manifest_dir, 'include-target.xml'),
941         """
942         <manifest>
943           <remote name="foon" fetch="http://localhost" />
944         </manifest>""")
945     manifest = git.ManifestCheckout(self.tempdir)
946     self.assertEqual(list(manifest.checkouts_by_name), ['monkeys'])
947     self.assertEqual(list(manifest.remotes), ['foon'])
948
949   # pylint: disable=E1101
950   def testGetManifestsBranch(self):
951     func = git.ManifestCheckout._GetManifestsBranch
952     manifest = self.manifest_dir
953     repo_root = self.tempdir
954
955     # pylint: disable=W0613
956     def reconfig(merge='master', origin='origin'):
957       if merge is not None:
958         merge = 'refs/heads/%s' % merge
959       for key in ('merge', 'origin'):
960         val = locals()[key]
961         key = 'branch.default.%s' % key
962         if val is None:
963           git.RunGit(manifest, ['config', '--unset', key], error_code_ok=True)
964         else:
965           git.RunGit(manifest, ['config', key, val])
966
967     # First, verify our assumptions about a fresh repo init are correct.
968     self.assertEqual('default', git.GetCurrentBranch(manifest))
969     self.assertEqual('master', func(repo_root))
970
971     # Ensure we can handle a missing origin; this can occur jumping between
972     # branches, and can be worked around.
973     reconfig(origin=None)
974     self.assertEqual('default', git.GetCurrentBranch(manifest))
975     self.assertEqual('master', func(repo_root))
976
977     # TODO(ferringb): convert this over to assertRaises2
978     def assertExcept(message, **kwargs):
979       reconfig(**kwargs)
980       try:
981         func(repo_root)
982         assert "Testing for %s, an exception wasn't thrown." % (message,)
983       except OSError as e:
984         self.assertEqual(e.errno, errno.ENOENT)
985         self.assertTrue(message in str(e),
986                         msg="Couldn't find string %r in error message %r"
987                         % (message, str(e)))
988
989     # No merge target means the configuration isn't usable, period.
990     assertExcept("git tracking configuration for that branch is broken",
991                  merge=None)
992
993     # Ensure we detect if we're on the wrong branch, even if it has
994     # tracking setup.
995     git.RunGit(manifest, ['checkout', '-t', 'origin/master', '-b', 'test'])
996     assertExcept("It should be checked out to 'default'")
997
998     # Ensure we handle detached HEAD w/ an appropriate exception.
999     git.RunGit(manifest, ['checkout', '--detach', 'test'])
1000     assertExcept("It should be checked out to 'default'")
1001
1002     # Finally, ensure that if the default branch is non-existant, we still throw
1003     # a usable exception.
1004     git.RunGit(manifest, ['branch', '-d', 'default'])
1005     assertExcept("It should be checked out to 'default'")
1006
1007   def testGitMatchBranchName(self):
1008     git_repo = os.path.join(self.tempdir, '.repo', 'manifests')
1009
1010     branches = git.MatchBranchName(git_repo, 'default', namespace='')
1011     self.assertEqual(branches, ['refs/heads/default'])
1012
1013     branches = git.MatchBranchName(git_repo, 'default', namespace='refs/heads/')
1014     self.assertEqual(branches, ['default'])
1015
1016     branches = git.MatchBranchName(git_repo, 'origin/f.*link',
1017                                    namespace='refs/remotes/')
1018     self.assertTrue('firmware-link-' in branches[0])
1019
1020     branches = git.MatchBranchName(git_repo, 'r23')
1021     self.assertEqual(branches, ['refs/remotes/origin/release-R23-2913.B'])
1022
1023
1024 class Test_iflatten_instance(cros_test_lib.TestCase):
1025   """Test iflatten_instance function."""
1026
1027   def test_it(self):
1028     f = lambda *a: list(cros_build_lib.iflatten_instance(*a))
1029     self.assertEqual([1, 2], f([1, 2]))
1030     self.assertEqual([1, '2a'], f([1, '2a']))
1031     self.assertEqual([1, 2, 'b'], f([1, [2, 'b']]))
1032     self.assertEqual([1, 2, 'f', 'd', 'a', 's'], f([1, 2, ('fdas',)], int))
1033     self.assertEqual([''], f(''))
1034
1035
1036 class TestKeyValueFiles(cros_test_lib.TempDirTestCase):
1037   """Tests handling of key/value files."""
1038
1039   def setUp(self):
1040     self.contents = """# A comment !@
1041 A = 1
1042 AA= 2
1043 AAA =3
1044 AAAA\t=\t4
1045 AAAAA\t   \t=\t   5
1046 AAAAAA = 6     \t\t# Another comment
1047 \t
1048 \t# Aerith lives!
1049 C = 'D'
1050 CC= 'D'
1051 CCC ='D'
1052 \x20
1053  \t# monsters go boom #
1054 E \t= "Fxxxxx" # Blargl
1055 EE= "Faaa\taaaa"\x20
1056 EEE ="Fk  \t  kkkk"\t
1057 Q = "'q"
1058 \tQQ ="q'"\x20
1059  QQQ='"q"'\t
1060 R = "r
1061 "
1062 RR = "rr
1063 rrr"
1064 RRR = 'rrr
1065  RRRR
1066  rrr
1067 '
1068 SSS=" ss
1069 'ssss'
1070 ss"
1071 T="
1072 ttt"
1073 """
1074     self.expected = {
1075         'A': '1',
1076         'AA': '2',
1077         'AAA': '3',
1078         'AAAA': '4',
1079         'AAAAA': '5',
1080         'AAAAAA': '6',
1081         'C': 'D',
1082         'CC': 'D',
1083         'CCC': 'D',
1084         'E': 'Fxxxxx',
1085         'EE': 'Faaa\taaaa',
1086         'EEE': 'Fk  \t  kkkk',
1087         'Q': "'q",
1088         'QQ': "q'",
1089         'QQQ': '"q"',
1090         'R': 'r\n',
1091         'RR': 'rr\nrrr',
1092         'RRR': 'rrr\n RRRR\n rrr\n',
1093         'SSS': ' ss\n\'ssss\'\nss',
1094         'T': '\nttt'
1095     }
1096
1097     self.conf_file = os.path.join(self.tempdir, 'file.conf')
1098     osutils.WriteFile(self.conf_file, self.contents)
1099
1100   def _RunAndCompare(self, test_input, multiline):
1101     result = cros_build_lib.LoadKeyValueFile(test_input, multiline=multiline)
1102     self.assertEqual(self.expected, result)
1103
1104   def testLoadFilePath(self):
1105     """Verify reading a simple file works"""
1106     self._RunAndCompare(self.conf_file, True)
1107
1108   def testLoadStringIO(self):
1109     """Verify passing in StringIO object works."""
1110     self._RunAndCompare(StringIO.StringIO(self.contents), True)
1111
1112   def testLoadFileObject(self):
1113     """Verify passing in open file object works."""
1114     with open(self.conf_file) as f:
1115       self._RunAndCompare(f, True)
1116
1117   def testNoMultlineValues(self):
1118     """Verify exception is thrown when multiline is disabled."""
1119     self.assertRaises(ValueError, self._RunAndCompare, self.conf_file, False)
1120
1121
1122 class SafeRunTest(cros_test_lib.TestCase):
1123   """Tests SafeRunTest functionality."""
1124
1125   def _raise_exception(self, e):
1126     raise e
1127
1128   def testRunsSafely(self):
1129     """Verify that we are robust to exceptions."""
1130     def append_val(value):
1131       call_list.append(value)
1132
1133     call_list = []
1134     f_list = [functools.partial(append_val, 1),
1135               functools.partial(self._raise_exception,
1136                                 Exception('testRunsSafely exception.')),
1137               functools.partial(append_val, 2)]
1138     self.assertRaises(Exception, cros_build_lib.SafeRun, f_list)
1139     self.assertEquals(call_list, [1, 2])
1140
1141   def testRaisesFirstException(self):
1142     """Verify we raise the first exception when multiple are encountered."""
1143     class E1(Exception):
1144       """Simple exception class."""
1145       pass
1146
1147     class E2(Exception):
1148       """Simple exception class."""
1149       pass
1150
1151     f_list = [functools.partial(self._raise_exception, e) for e in [E1, E2]]
1152     self.assertRaises(E1, cros_build_lib.SafeRun, f_list)
1153
1154   def testCombinedRaise(self):
1155     """Raises a RuntimeError with exceptions combined."""
1156     f_list = [functools.partial(self._raise_exception, Exception())] * 3
1157     self.assertRaises(RuntimeError, cros_build_lib.SafeRun, f_list,
1158                       combine_exceptions=True)
1159
1160
1161 class FrozenAttributesTest(cros_test_lib.TestCase):
1162   """Tests FrozenAttributesMixin functionality."""
1163
1164   class DummyClass(object):
1165     """Any class that does not override __setattr__."""
1166
1167   class SetattrClass(object):
1168     """Class that does override __setattr__."""
1169     SETATTR_OFFSET = 10
1170     def __setattr__(self, attr, value):
1171       """Adjust value here to later confirm that this code ran."""
1172       object.__setattr__(self, attr, self.SETATTR_OFFSET + value)
1173
1174   def _TestBasics(self, cls):
1175     # pylint: disable=W0201
1176     def _Expected(val):
1177       return getattr(cls, 'SETATTR_OFFSET', 0) + val
1178
1179     obj = cls()
1180     obj.a = 1
1181     obj.b = 2
1182     self.assertEquals(_Expected(1), obj.a)
1183     self.assertEquals(_Expected(2), obj.b)
1184
1185     obj.Freeze()
1186     self.assertRaises(cros_build_lib.AttributeFrozenError, setattr, obj, 'a', 3)
1187     self.assertEquals(_Expected(1), obj.a)
1188
1189     self.assertRaises(cros_build_lib.AttributeFrozenError, setattr, obj, 'c', 3)
1190     self.assertFalse(hasattr(obj, 'c'))
1191
1192   def testFrozenByMetaclass(self):
1193     """Test attribute freezing with FrozenAttributesClass."""
1194     class DummyByMeta(self.DummyClass):
1195       """Class that freezes DummyClass using metaclass construct."""
1196       __metaclass__ = cros_build_lib.FrozenAttributesClass
1197
1198     self._TestBasics(DummyByMeta)
1199
1200     class SetattrByMeta(self.SetattrClass):
1201       """Class that freezes SetattrClass using metaclass construct."""
1202       __metaclass__ = cros_build_lib.FrozenAttributesClass
1203
1204     self._TestBasics(SetattrByMeta)
1205
1206   def testFrozenByMixinFirst(self):
1207     """Test attribute freezing with FrozenAttributesMixin first in hierarchy."""
1208     class Dummy(cros_build_lib.FrozenAttributesMixin, self.DummyClass):
1209       """Class that freezes DummyClass using mixin construct."""
1210
1211     self._TestBasics(Dummy)
1212
1213     class Setattr(cros_build_lib.FrozenAttributesMixin, self.SetattrClass):
1214       """Class that freezes SetattrClass using mixin construct."""
1215
1216     self._TestBasics(Setattr)
1217
1218   def testFrozenByMixinLast(self):
1219     """Test attribute freezing with FrozenAttributesMixin last in hierarchy."""
1220     class Dummy(self.DummyClass, cros_build_lib.FrozenAttributesMixin):
1221       """Class that freezes DummyClass using mixin construct."""
1222
1223     self._TestBasics(Dummy)
1224
1225     class Setattr(self.SetattrClass, cros_build_lib.FrozenAttributesMixin):
1226       """Class that freezes SetattrClass using mixin construct."""
1227
1228     self._TestBasics(Setattr)
1229
1230
1231 class TestGetIPv4Address(RunCommandTestCase):
1232   """Tests the GetIPv4Address function."""
1233
1234   IP_GLOBAL_OUTPUT = """
1235 1: lo: <LOOPBACK,UP,LOWER_UP> mtu 16436 qdisc noqueue state UNKNOWN
1236     link/loopback 00:00:00:00:00:00 brd 00:00:00:00:00:00
1237 2: eth0: <NO-CARRIER,BROADCAST,MULTICAST,UP> mtu 1500 qdisc pfifo_fast state \
1238 DOWN qlen 1000
1239     link/ether cc:cc:cc:cc:cc:cc brd ff:ff:ff:ff:ff:ff
1240 3: eth1: <BROADCAST,MULTICAST,UP,LOWER_UP> mtu 1500 qdisc pfifo_fast state UP \
1241 qlen 1000
1242     link/ether dd:dd:dd:dd:dd:dd brd ff:ff:ff:ff:ff:ff
1243     inet 111.11.11.111/22 brd 111.11.11.255 scope global eth1
1244     inet6 cdef:0:cdef:cdef:cdef:cdef:cdef:cdef/64 scope global dynamic
1245        valid_lft 2592000sec preferred_lft 604800sec
1246 """
1247
1248   def testGetIPv4AddressParseResult(self):
1249     """Verifies we can parse the output and get correct IP address."""
1250     self.rc.AddCmdResult(partial_mock.In('ip'), output=self.IP_GLOBAL_OUTPUT)
1251     self.assertEqual(cros_build_lib.GetIPv4Address(), '111.11.11.111')
1252
1253   def testGetIPv4Address(self):
1254     """Tests that correct shell commmand is called."""
1255     cros_build_lib.GetIPv4Address(global_ip=False, dev='eth0')
1256     self.rc.assertCommandContains(
1257         ['ip', 'addr', 'show', 'scope', 'host', 'dev', 'eth0'])
1258
1259     cros_build_lib.GetIPv4Address(global_ip=True)
1260     self.rc.assertCommandContains(['ip', 'addr', 'show', 'scope', 'global'])
1261
1262
1263 class TestGetChrootVersion(cros_test_lib.MockTestCase):
1264   """Tests GetChrootVersion functionality."""
1265
1266   def testSimpleBuildroot(self):
1267     """Verify buildroot arg works"""
1268     read_mock = self.PatchObject(osutils, 'ReadFile', return_value='12\n')
1269     ret = cros_build_lib.GetChrootVersion(buildroot='/build/root')
1270     self.assertEqual(ret, '12')
1271     read_mock.assert_called_with('/build/root/chroot/etc/cros_chroot_version')
1272
1273   def testSimpleChroot(self):
1274     """Verify chroot arg works"""
1275     read_mock = self.PatchObject(osutils, 'ReadFile', return_value='70')
1276     ret = cros_build_lib.GetChrootVersion(chroot='/ch/root')
1277     self.assertEqual(ret, '70')
1278     read_mock.assert_called_with('/ch/root/etc/cros_chroot_version')
1279
1280   def testNoChroot(self):
1281     """Verify we don't blow up when there is no chroot yet"""
1282     ret = cros_build_lib.GetChrootVersion(chroot='/.$om3/place/nowhere')
1283     self.assertEqual(ret, None)
1284
1285
1286 class TestChrootPathHelpers(cros_test_lib.TestCase):
1287   """Verify we correctly reinterpret paths to be used inside/outside chroot."""
1288
1289   @mock.patch('chromite.lib.cros_build_lib.IsInsideChroot', return_value=True)
1290   def testToChrootPathInChroot(self, _inchroot_mock):
1291     """Test we return the original path to be used in chroot while in chroot."""
1292     path = '/foo/woo/bar'
1293     self.assertEqual(cros_build_lib.ToChrootPath(path), path)
1294
1295   @mock.patch('chromite.lib.cros_build_lib.IsInsideChroot', return_value=False)
1296   def testToChrootPathOutChroot(self, _inchroot_mock):
1297     """Test we convert the path to be used in chroot while outside chroot."""
1298     subpath = 'bar/haa/ooo'
1299     path = os.path.join(constants.SOURCE_ROOT, subpath)
1300     chroot_path = git.ReinterpretPathForChroot(path)
1301     self.assertEqual(cros_build_lib.ToChrootPath(path), chroot_path)
1302
1303   @mock.patch('chromite.lib.cros_build_lib.IsInsideChroot', return_value=True)
1304   def testFromChrootInChroot(self, _inchroot_mock):
1305     """Test we return the original chroot path while in chroot."""
1306     path = '/foo/woo/bar'
1307     self.assertEqual(cros_build_lib.FromChrootPath(path), path)
1308
1309   @mock.patch('chromite.lib.cros_build_lib.IsInsideChroot', return_value=False)
1310   def testFromChrootOutChroot(self, _inchroot_mock):
1311     """Test we convert the chroot path to be used outside chroot."""
1312     # Test that chroot source root has been replaced in the path.
1313     subpath = 'foo/woo/bar'
1314     chroot_path = os.path.join(constants.CHROOT_SOURCE_ROOT, subpath)
1315     path = os.path.join(constants.SOURCE_ROOT, subpath)
1316     self.assertEqual(cros_build_lib.FromChrootPath(chroot_path), path)
1317
1318     # Test that a chroot path has been converted.
1319     chroot_path = '/foo/woo/bar'
1320     path = os.path.join(constants.SOURCE_ROOT,
1321                         constants.DEFAULT_CHROOT_DIR,
1322                         chroot_path.strip(os.path.sep))
1323     self.assertEqual(cros_build_lib.FromChrootPath(chroot_path), path)
1324
1325
1326 class CollectionTest(cros_test_lib.TestCase):
1327   """Tests for Collection helper."""
1328
1329   def testDefaults(self):
1330     """Verify default values kick in."""
1331     O = cros_build_lib.Collection('O', a=0, b='string', c={})
1332     o = O()
1333     self.assertEqual(o.a, 0)
1334     self.assertEqual(o.b, 'string')
1335     self.assertEqual(o.c, {})
1336
1337   def testOverrideDefaults(self):
1338     """Verify we can set custom values at instantiation time."""
1339     O = cros_build_lib.Collection('O', a=0, b='string', c={})
1340     o = O(a=1000)
1341     self.assertEqual(o.a, 1000)
1342     self.assertEqual(o.b, 'string')
1343     self.assertEqual(o.c, {})
1344
1345   def testSetNoNewMembers(self):
1346     """Verify we cannot add new members after the fact."""
1347     O = cros_build_lib.Collection('O', a=0, b='string', c={})
1348     o = O()
1349
1350     # Need the func since self.assertRaises evaluates the args in this scope.
1351     def _setit(collection):
1352       collection.does_not_exit = 10
1353     self.assertRaises(AttributeError, _setit, o)
1354     self.assertRaises(AttributeError, setattr, o, 'new_guy', 10)
1355
1356   def testGetNoNewMembers(self):
1357     """Verify we cannot get new members after the fact."""
1358     O = cros_build_lib.Collection('O', a=0, b='string', c={})
1359     o = O()
1360
1361     # Need the func since self.assertRaises evaluates the args in this scope.
1362     def _getit(collection):
1363       return collection.does_not_exit
1364     self.assertRaises(AttributeError, _getit, o)
1365     self.assertRaises(AttributeError, getattr, o, 'foooo')
1366
1367   def testNewValue(self):
1368     """Verify we change members correctly."""
1369     O = cros_build_lib.Collection('O', a=0, b='string', c={})
1370     o = O()
1371     o.a = 'a string'
1372     o.c = 123
1373     self.assertEqual(o.a, 'a string')
1374     self.assertEqual(o.b, 'string')
1375     self.assertEqual(o.c, 123)
1376
1377   def testString(self):
1378     """Make sure the string representation is readable by da hue mans."""
1379     O = cros_build_lib.Collection('O', a=0, b='string', c={})
1380     o = O()
1381     self.assertEqual("Collection_O(a=0, b='string', c={})", str(o))
1382
1383
1384 class GetImageDiskPartitionInfoTests(RunCommandTestCase):
1385   """Tests the GetImageDiskPartitionInfo function."""
1386
1387   SAMPLE_OUTPUT = """/foo/chromiumos_qemu_image.bin:3360MB:file:512:512:gpt:;
1388 11:0.03MB:8.42MB:8.39MB::RWFW:;
1389 6:8.42MB:8.42MB:0.00MB::KERN-C:;
1390 7:8.42MB:8.42MB:0.00MB::ROOT-C:;
1391 9:8.42MB:8.42MB:0.00MB::reserved:;
1392 10:8.42MB:8.42MB:0.00MB::reserved:;
1393 2:10.5MB:27.3MB:16.8MB::KERN-A:;
1394 4:27.3MB:44.0MB:16.8MB::KERN-B:;
1395 8:44.0MB:60.8MB:16.8MB:ext4:OEM:;
1396 12:128MB:145MB:16.8MB:fat16:EFI-SYSTEM:boot;
1397 5:145MB:2292MB:2147MB::ROOT-B:;
1398 3:2292MB:4440MB:2147MB:ext2:ROOT-A:;
1399 1:4440MB:7661MB:3221MB:ext4:STATE:;
1400 """
1401
1402   def setUp(self):
1403     self.rc.AddCmdResult(partial_mock.Ignore(), output=self.SAMPLE_OUTPUT)
1404
1405   def testNormalPath(self):
1406     partitions = cros_build_lib.GetImageDiskPartitionInfo('_ignored')
1407     # Because "reserved" is duplicated, we only have 11 key-value pairs.
1408     self.assertEqual(11, len(partitions))
1409     self.assertEqual(1, partitions['STATE'].number)
1410     self.assertEqual(2147, partitions['ROOT-A'].size)
1411
1412   def testKeyedByNumber(self):
1413     partitions = cros_build_lib.GetImageDiskPartitionInfo(
1414         '_ignored', key_selector='number'
1415     )
1416     self.assertEqual(12, len(partitions))
1417     self.assertEqual('STATE', partitions[1].name)
1418     self.assertEqual(2147, partitions[3].size)
1419     self.assertEqual('reserved', partitions[9].name)
1420     self.assertEqual('reserved', partitions[10].name)
1421
1422   def testChangeUnit(self):
1423
1424     def changeUnit(unit):
1425       cros_build_lib.GetImageDiskPartitionInfo('_ignored', unit)
1426       self.assertCommandContains(
1427           ['-m', '_ignored', 'unit', unit, 'print'],
1428       )
1429
1430     # We must use 2-char units here because the mocked output is in 'MB'.
1431     changeUnit('MB')
1432     changeUnit('KB')
1433
1434
1435 if __name__ == '__main__':
1436   cros_test_lib.main()