2 # Copyright (c) 2011 The Chromium OS Authors. All rights reserved.
3 # Use of this source code is governed by a BSD-style license that can be
4 # found in the LICENSE file.
6 """Cros unit test library, with utility functions."""
8 from __future__ import print_function
28 from chromite.buildbot import constants
29 from chromite.buildbot import repository
37 if 'chromite' not in sys.modules:
38 # TODO(build): Finish test wrapper (http://crosbug.com/37517).
39 # Until then, we detect the chromite manipulation not yet having
40 # occurred, and inject it ourselves.
41 # We cannot just import chromite since this module is still accessed
42 # from non chromite.lib.cros_test_lib pathways (which will be resolved
43 # implicitly via 37517).
44 sys.path.insert(0, os.path.join(
45 os.path.dirname(os.path.abspath(__file__)), '../third_party'))
50 Directory = collections.namedtuple('Directory', ['name', 'contents'])
class GlobalTestConfig(object):
  """Holder for settings shared by every test in the process."""

  # Network tests are switched off unless this flag is set to False.
  NETWORK_TESTS_DISABLED = True
def NetworkTest(reason='Skipping network test'):
  """Decorator for unit tests. Skip the test if --network is not specified.

  Args:
    reason: Message passed to unittest.SkipTest when the test is skipped.

  Returns:
    A decorator to apply to a test callable.
  """
  def Decorator(test_item):
    @functools.wraps(test_item)
    def NetworkWrapper(*args, **kwargs):
      # The flag is consulted at call time, so it may be changed after the
      # decorator has been applied.
      if GlobalTestConfig.NETWORK_TESTS_DISABLED:
        raise unittest.SkipTest(reason)
      # Hand back whatever the wrapped callable returns instead of dropping it.
      return test_item(*args, **kwargs)

    # NOTE(review): the statements following this condition were not visible
    # in the reviewed text; the returns below are reconstructed -- confirm.
    if not (isinstance(test_item, type) and issubclass(test_item, TestCase)):
      return NetworkWrapper
    return test_item

  return Decorator
76 def _FlattenStructure(base_path, dir_struct):
77 """Converts a directory structure to a list of paths."""
79 for obj in dir_struct:
80 if isinstance(obj, Directory):
81 new_base = os.path.join(base_path, obj.name).rstrip(os.sep)
82 flattened.append(new_base + os.sep)
83 flattened.extend(_FlattenStructure(new_base, obj.contents))
85 assert(isinstance(obj, basestring))
86 flattened.append(os.path.join(base_path, obj))
def CreateOnDiskHierarchy(base_path, dir_struct):
  """Creates on-disk representation of an in-memory directory structure.

  Args:
    base_path: The absolute root of the directory structure.
    dir_struct: A recursively defined data structure that represents a
      directory tree.  The basic form is a list.  Elements can be file names or
      cros_test_lib.Directory objects.  The 'contents' attribute of Directory
      types is a directory structure representing the contents of the directory.

  Example:
    - ['file1', Directory('directory', ['deepfile1', 'deepfile2']), 'file2']
  """
  # _FlattenStructure() already roots every entry at |base_path|.  Joining
  # |base_path| on again is a no-op for absolute roots but would duplicate the
  # prefix for a relative one, so the paths are used as returned.
  for path in _FlattenStructure(base_path, dir_struct):
    if path.endswith(os.sep):
      # Entries with a trailing separator denote directories.
      osutils.SafeMakedirs(path)
    else:
      osutils.Touch(path, makedirs=True)
112 def _VerifyDirectoryIterables(existing, expected):
113 """Compare two iterables representing contents of a directory.
115 Paths in |existing| and |expected| will be compared for exact match.
118 existing: An iterable containing paths that exist.
119 expected: An iterable of paths that are expected.
122 AssertionError when there is any divergence between |existing| and
125 def FormatPaths(paths):
126 return '\n'.join(sorted(paths))
128 existing = set(existing)
129 expected = set(expected)
131 unexpected = existing - expected
133 raise AssertionError('Found unexpected paths:\n%s'
134 % FormatPaths(unexpected))
135 missing = expected - existing
137 raise AssertionError('These files were expected but not found:\n%s'
138 % FormatPaths(missing))
def VerifyOnDiskHierarchy(base_path, dir_struct):
  """Check that the tree under |base_path| is exactly |dir_struct|.

  Args:
    base_path: See CreateOnDiskHierarchy()
    dir_struct: See CreateOnDiskHierarchy()

  Raises:
    AssertionError when there is any divergence between the on-disk
    structure and the structure specified by 'dir_struct'.
  """
  wanted = _FlattenStructure(base_path, dir_struct)
  on_disk = osutils.DirectoryIterator(base_path)
  _VerifyDirectoryIterables(on_disk, wanted)
def VerifyTarball(tarball, dir_struct):
  """Check that the entries listed by a tarball are exactly |dir_struct|.

  Args:
    tarball: Path to the tarball.
    dir_struct: See CreateOnDiskHierarchy()

  Raises:
    AssertionError when there is any divergence between the tarball and the
    structure specified by 'dir_struct', or when the tarball lists the same
    normalized path twice.
  """
  listing = cros_build_lib.RunCommand(
      ['tar', '-tf', tarball], capture_output=True).output
  normalized = set()
  for entry in listing.splitlines():
    norm = os.path.normpath(entry)
    # NOTE(review): this step was not visible in the reviewed text.  normpath()
    # strips the trailing slash while _FlattenStructure() marks directories
    # with one, so it is presumably restored here -- confirm.
    if entry.endswith('/'):
      norm += '/'
    if norm in normalized:
      raise AssertionError('Duplicate entry %r found in %r!' % (norm, tarball))
    normalized.add(norm)

  wanted = _FlattenStructure('', dir_struct)
  _VerifyDirectoryIterables(normalized, wanted)
182 class StackedSetup(type):
183 """Metaclass to simplify unit testing and make it more robust.
185 A metaclass alters the way that classes are initialized, enabling us to
186 modify the class dictionary prior to the class being created. We use this
187 feature here to modify the way that unit tests work a bit.
189 This class does three things:
190 1) When a test case is set up or torn down, we now run all setUp and
191 tearDown methods in the inheritance tree.
192 2) If a setUp or tearDown method fails, we still run tearDown methods
193 for any test classes that were partially or completely set up.
194 3) All test cases time out after TEST_CASE_TIMEOUT seconds.
196 To use this class, set the following in your class:
197 __metaclass__ = StackedSetup
199 Since cros_test_lib.TestCase uses this metaclass, all derivatives of TestCase
200 also inherit the above behavior (unless they override the __metaclass__
204 TEST_CASE_TIMEOUT = 10 * 60
206 def __new__(mcs, name, bases, scope):
207 """Generate the new class with pointers to original funcs & our helpers"""
209 scope['__raw_setUp__'] = scope.pop('setUp')
210 scope['setUp'] = mcs._stacked_setUp
212 if 'tearDown' in scope:
213 scope['__raw_tearDown__'] = scope.pop('tearDown')
214 scope['tearDown'] = mcs._stacked_tearDown
216 # Modify all test* methods to time out after TEST_CASE_TIMEOUT seconds.
217 timeout = scope.get('TEST_CASE_TIMEOUT', StackedSetup.TEST_CASE_TIMEOUT)
218 if timeout is not None:
219 for name, func in scope.iteritems():
220 if name.startswith('test') and hasattr(func, '__call__'):
221 wrapper = timeout_util.TimeoutDecorator(timeout)
222 scope[name] = wrapper(func)
224 return type.__new__(mcs, name, bases, scope)
227 def _walk_mro_stacking(obj, attr, reverse=False):
228 """Walk the stacked classes (python method resolution order)"""
229 iterator = iter if reverse else reversed
230 methods = (getattr(x, attr, None) for x in iterator(obj.__class__.__mro__))
232 for x in filter(None, methods):
233 x = getattr(x, 'im_func', x)
239 def _stacked_setUp(obj):
240 """Run all the setUp funcs; if any fail, run all the tearDown funcs"""
241 obj.__test_was_run__ = False
243 for target in StackedSetup._walk_mro_stacking(obj, '__raw_setUp__'):
246 # TestCase doesn't trigger tearDowns if setUp failed; thus
247 # manually force it ourselves to ensure cleanup occurs.
248 StackedSetup._stacked_tearDown(obj)
251 # Now mark the object as fully setUp; this is done so that
252 # any last minute assertions in tearDown can know if they should
254 obj.__test_was_run__ = True
257 def _stacked_tearDown(obj):
258 """Run all the tearDown funcs; if any fail, we move on to the next one"""
260 for target in StackedSetup._walk_mro_stacking(obj, '__raw_tearDown__',
262 #pylint: disable=W0702
266 # Preserve the exception, throw it after running
267 # all tearDowns; we throw just the first also. We suppress
268 # pylint's warning here since it can't understand that we're
269 # actually raising the exception, just in a nonstandard way.
271 exc_info = sys.exc_info()
274 # Chuck the saved exception, w/ the same TB from
276 raise exc_info[0], exc_info[1], exc_info[2]
class TruthTable(object):
  """Class to represent a boolean truth table, useful in unit tests.

  If you find yourself testing the behavior of some function that should
  basically follow the behavior of a particular truth table, then this class
  can allow you to fully test that function without being overly verbose
  in the unit test code.

  The following usage is supported on a constructed TruthTable:
  1) Iterate over input lines of the truth table, expressed as tuples of
  bools.
  2) Access a particular input line by index, expressed as a tuple of bools.
  3) Access the expected output for a set of inputs.

  For example, say function "Foo" in module "mod" should consist of the
  following code:

  def Foo(A, B, C):
    return A and B and not C

  In the unittest for Foo, do this:

  def testFoo(self):
    truth_table = cros_test_lib.TruthTable(inputs=[(True, True, False)])
    for inputs in truth_table:
      a, b, c = inputs
      result = mod.Foo(a, b, c)
      self.assertEquals(result, truth_table.GetOutput(inputs))
  """

  class TruthTableInputIterator(object):
    """Class to support iteration over inputs of a TruthTable."""
    def __init__(self, truth_table):
      self.truth_table = truth_table
      self.next_line = 0

    def __iter__(self):
      return self

    def __next__(self):
      return self.next()

    def next(self):
      """Return the next input line, or raise StopIteration at the end."""
      if self.next_line < self.truth_table.num_lines:
        self.next_line += 1
        return self.truth_table.GetInputs(self.next_line - 1)
      else:
        raise StopIteration()

  def __init__(self, inputs, input_result=True):
    """Construct a TruthTable from given inputs.

    Args:
      inputs: Iterable of input lines, each expressed as a tuple of bools.
        Each tuple must have the same length.
      input_result: The output intended for each specified input.  For
        truth tables that mostly output True it is more concise to specify
        the false inputs and then set input_result to False.

    Raises:
      ValueError: if |inputs| is empty or its tuples differ in length.
    """
    # Materialize once so any iterable (not only sequences) can be checked for
    # emptiness and indexed.
    inputs = list(inputs)

    # At least one input required.
    if not inputs:
      raise ValueError('Inputs required to construct TruthTable.')

    # Save each input tuple in a set.  Also confirm that the length
    # of each input tuple is the same.
    self.dimension = len(inputs[0])
    self.num_lines = pow(2, self.dimension)
    self.expected_inputs = set()
    self.expected_inputs_result = input_result

    for input_vals in inputs:
      if len(input_vals) != self.dimension:
        raise ValueError('All TruthTable inputs must have same dimension.')

      self.expected_inputs.add(input_vals)

    # Start generator index at 0.
    self.next_line = 0

  def __len__(self):
    return self.num_lines

  def __iter__(self):
    return self.TruthTableInputIterator(self)

  def GetInputs(self, inputs_index):
    """Get the input line at the given input index.

    Args:
      inputs_index: Following must hold: 0 <= inputs_index < self.num_lines.

    Returns:
      Tuple of bools representing one line of inputs.

    Raises:
      ValueError: if |inputs_index| is out of range.
    """
    if inputs_index >= 0 and inputs_index < self.num_lines:
      line_values = []

      # Iterate through each column in truth table.  Any order will
      # produce a valid truth table, but going backward through
      # columns will produce the traditional truth table ordering.
      # For 2-dimensional example: F,F then F,T then T,F then T,T.
      for col in range(self.dimension - 1, -1, -1):
        # Floor division keeps this an integer bit test even when true
        # division is in effect.
        line_values.append(bool(inputs_index // pow(2, col) % 2))

      return tuple(line_values)

    raise ValueError('This truth table has no line at index %r.' % inputs_index)

  def GetOutput(self, inputs):
    """Get the boolean output for the given inputs.

    Args:
      inputs: Tuple of bools, length must be equal to self.dimension.

    Returns:
      bool value representing truth table output for given inputs.

    Raises:
      TypeError: if |inputs| is not a tuple.
      ValueError: if |inputs| has the wrong length.
    """
    if not isinstance(inputs, tuple):
      raise TypeError('Truth table inputs must be specified as a tuple.')

    if not len(inputs) == self.dimension:
      raise ValueError('Truth table inputs must match table dimension.')

    return self.expected_inputs_result == (inputs in self.expected_inputs)
class EasyAttr(dict):
  """Dict whose keys double as attributes, for faking objects in tests.

  An EasyAttr object can be created with any attributes initialized very
  easily.  Examples:

  1) An object with .id=45 and .name="Joe":
  testobj = EasyAttr(id=45, name="Joe")
  2) An object with .title.text="Big" and .owner.text="Joe":
  testobj = EasyAttr(title=EasyAttr(text="Big"), owner=EasyAttr(text="Joe"))
  """

  def __getattr__(self, attr):
    # Only reached when normal attribute lookup fails; fall back to the keys.
    if attr not in self:
      raise AttributeError(attr)
    return self[attr]

  def __delattr__(self, attr):
    if attr not in self:
      raise AttributeError(attr)
    del self[attr]

  def __setattr__(self, attr, value):
    # Every attribute assignment is stored as a dict item.
    self[attr] = value
class LogFilter(logging.Filter):
  """Logging filter that records every message and suppresses its output."""

  def __init__(self):
    logging.Filter.__init__(self)
    # Accumulates the formatted messages, one per line.
    self.messages = cStringIO.StringIO()

  def filter(self, record):
    text = record.getMessage()
    self.messages.write(text + '\n')
    # Return False to prevent the message from being displayed.
    return False
class LoggingCapturer(object):
  """Context manager that collects messages sent to a logger.

  While capturing, a LogFilter is attached to the logger named |logger_name|
  (the root logger by default).
  """

  def __init__(self, logger_name=''):
    self._log_filter = LogFilter()
    self.logger_name = logger_name

  def __enter__(self):
    self.StartCapturing()
    return self

  def __exit__(self, exc_type, exc_val, exc_tb):
    self.StopCapturing()

  def _Logger(self):
    """Return the logger this capturer is bound to."""
    return logging.getLogger(self.logger_name)

  def StartCapturing(self):
    """Begin capturing logging messages."""
    self._Logger().addFilter(self._log_filter)

  def StopCapturing(self):
    """Stop capturing logging messages."""
    self._Logger().removeFilter(self._log_filter)

  @property
  def messages(self):
    """Everything captured so far, as one newline-separated string."""
    return self._log_filter.messages.getvalue()

  def LogsMatch(self, regex):
    """Checks whether the logs match a given regex."""
    return re.search(regex, self.messages, re.MULTILINE) is not None

  def LogsContain(self, msg):
    """Checks whether the logs contain a given string."""
    return self.LogsMatch(re.escape(msg))
class OutputCapturer(object):
  """Class with limited support for capturing test stdout/stderr output.

  Class is designed as a 'ContextManager'.  Example usage in a test method
  of an object of TestCase:

  with self.OutputCapturer() as output:
    # Capturing of stdout/stderr automatically starts now.
    # Do stuff that sends output to stdout/stderr.
    # Capturing automatically stops at end of 'with' block.

  # stdout/stderr can be retrieved from the OutputCapturer object:
  stdout = output.GetStdoutLines() # Or other access methods

  # Some Assert methods are only valid if capturing was used in test.
  self.AssertOutputContainsError() # Or other related methods
  """

  # These work with error output from operation module.
  OPER_MSG_SPLIT_RE = re.compile(r'^\033\[1;.*?\033\[0m$|^[^\n]*$',
                                 re.DOTALL | re.MULTILINE)
  ERROR_MSG_RE = re.compile(r'^\033\[1;%dm(.+?)(?:\033\[0m)+$' %
                            (30 + terminal.Color.RED,), re.DOTALL)
  WARNING_MSG_RE = re.compile(r'^\033\[1;%dm(.+?)(?:\033\[0m)+$' %
                              (30 + terminal.Color.YELLOW,), re.DOTALL)

  __slots__ = ['_stderr', '_stderr_cap', '_stdout', '_stdout_cap']

  def __init__(self):
    # Patchers that swap sys.stdout/sys.stderr for fresh StringIO objects.
    self._stdout = mock.patch('sys.stdout', new_callable=cStringIO.StringIO)
    self._stderr = mock.patch('sys.stderr', new_callable=cStringIO.StringIO)
    # The StringIO objects handed back by the patchers once started.
    self._stderr_cap = self._stdout_cap = None

  def __enter__(self):
    # This method is called with entering 'with' block.
    self.StartCapturing()
    return self

  def __exit__(self, exc_type, exc_val, exc_tb):
    # This method is called when exiting 'with' block.
    self.StopCapturing()

    if exc_type:
      # Capturing has stopped, so these go to the real stdout and show what
      # was swallowed before the exception.
      print('Exception during output capturing: %r' % (exc_val,))
      stdout = self.GetStdout()
      if stdout:
        print('Captured stdout was:\n%s' % stdout)
      else:
        print('No captured stdout')
      stderr = self.GetStderr()
      if stderr:
        print('Captured stderr was:\n%s' % stderr)
      else:
        print('No captured stderr')

  def StartCapturing(self):
    """Begin capturing stdout and stderr."""
    self._stdout_cap = self._stdout.start()
    self._stderr_cap = self._stderr.start()

  def StopCapturing(self):
    """Stop capturing stdout and stderr."""
    self._stdout.stop()
    self._stderr.stop()

  def ClearCaptured(self):
    """Clear any captured stdout/stderr content."""
    self._stdout_cap = None
    self._stderr_cap = None

  def GetStdout(self):
    """Return captured stdout so far.

    Returns '' when nothing is held, i.e. before StartCapturing() or after
    ClearCaptured().
    """
    if self._stdout_cap is None:
      return ''
    return self._stdout_cap.getvalue()

  def GetStderr(self):
    """Return captured stderr so far.

    Returns '' when nothing is held, i.e. before StartCapturing() or after
    ClearCaptured().
    """
    if self._stderr_cap is None:
      return ''
    return self._stderr_cap.getvalue()

  def _GetOutputLines(self, output, include_empties):
    """Split |output| into lines, optionally |include_empties|.

    Return array of lines.
    """
    lines = self.OPER_MSG_SPLIT_RE.findall(output)
    if not include_empties:
      lines = [ln for ln in lines if ln]

    return lines

  def GetStdoutLines(self, include_empties=True):
    """Return captured stdout so far as array of lines.

    If |include_empties| is false filter out all empty lines.
    """
    return self._GetOutputLines(self.GetStdout(), include_empties)

  def GetStderrLines(self, include_empties=True):
    """Return captured stderr so far as array of lines.

    If |include_empties| is false filter out all empty lines.
    """
    return self._GetOutputLines(self.GetStderr(), include_empties)
class TestCase(unittest.TestCase):
  """Basic chromite test case.

  Provides sane setUp/tearDown logic so that tearDown is correctly cleaned up.

  Takes care of saving/restoring process-wide settings like the environment so
  that sub-tests don't have to worry about getting this right.

  Also includes additional assert helpers beyond python stdlib.
  """

  __metaclass__ = StackedSetup

  # List of vars chromite is globally sensitive to and that should
  # be suppressed for tests.
  ENVIRON_VARIABLE_SUPPRESSIONS = ('CROS_CACHEDIR',)

  def __init__(self, *args, **kwargs):
    unittest.TestCase.__init__(self, *args, **kwargs)
    # This is set to keep pylint from complaining.
    self.__test_was_run__ = False

  def setUp(self):
    """Snapshot environment, cwd and umask, then drop suppressed env vars."""
    self.__saved_env__ = os.environ.copy()
    self.__saved_cwd__ = os.getcwd()
    # os.umask() returns the previous mask, which is what gets restored later.
    self.__saved_umask__ = os.umask(0o22)
    for x in self.ENVIRON_VARIABLE_SUPPRESSIONS:
      os.environ.pop(x, None)

  def tearDown(self):
    """Restore the environment, cwd and umask saved by setUp."""
    osutils.SetEnvironment(self.__saved_env__)
    os.chdir(self.__saved_cwd__)
    os.umask(self.__saved_umask__)

  def assertRaises2(self, exception, functor, *args, **kwargs):
    """Like assertRaises, just with checking of the exception.

    Args:
      exception: The expected exception type to intercept.
      functor: The function to invoke.
      args: Positional args to pass to the function.
      kwargs: Optional args to pass to the function.  Note we pull
        exact_kls, msg, and check_attrs from these kwargs.
      exact_kls: If given, the exception raised must be *exactly* the class
        |exception|; derivatives are a failure.
      check_attrs: If given, a mapping of attribute -> value to assert on
        the resultant exception.  Thus if you wanted to catch a ENOENT, you
        would do:
          assertRaises2(EnvironmentError, func, args,
                        check_attrs={"errno":errno.ENOENT})
      msg: The error message to be displayed if the exception isn't raised.
        If not given, a suitable one is defaulted to.

    Returns:
      The exception object.

    Raises:
      AssertionError: if no exception is raised, or if an attribute listed in
        check_attrs has a different value on the exception.
    """
    exact_kls = kwargs.pop("exact_kls", None)
    check_attrs = kwargs.pop("check_attrs", {})
    msg = kwargs.pop("msg", None)
    if msg is None:
      msg = ("%s(*%r, **%r) didn't throw an exception"
             % (functor.__name__, args, kwargs))
    try:
      functor(*args, **kwargs)
    except exception as e:
      caught = e
    else:
      # Raised outside the try body so that it cannot be mistaken for the
      # expected exception when |exception| is AssertionError or a base of it.
      raise AssertionError(msg)

    if exact_kls:
      self.assertEqual(caught.__class__, exception)
    bad = []
    for attr, required in check_attrs.items():
      self.assertTrue(hasattr(caught, attr),
                      msg="%s lacks attr %s" % (caught, attr))
      value = getattr(caught, attr)
      if value != required:
        bad.append("%s attr is %s, needed to be %s"
                   % (attr, value, required))
    if bad:
      raise AssertionError("\n".join(bad))
    return caught

  def assertExists(self, path):
    """Make sure |path| exists"""
    if not os.path.exists(path):
      msg = ['path is missing: %s' % path]
      # Walk up towards the root to report the deepest ancestor that does
      # exist, together with its contents.
      while path != '/':
        path = os.path.dirname(path)
        if not path:
          # If we're given something like "foo", abort once we get to "".
          break
        result = os.path.exists(path)
        msg.append('\tos.path.exists(%s): %s' % (path, result))
        if result:
          msg.append('\tcontents: %r' % os.listdir(path))
          break
      raise self.failureException('\n'.join(msg))

  def assertNotExists(self, path):
    """Make sure |path| does not exist"""
    if os.path.exists(path):
      raise self.failureException('path exists when it should not: %s' % path)
class LoggingTestCase(TestCase):
  """Test case base offering assertions over a logging capturer."""

  def AssertLogsMatch(self, log_capturer, regex, inverted=False):
    """Asserts that |regex| matches the captured logs (or not, if inverted)."""
    if inverted:
      template, check = '%r found in %r', self.assertFalse
    else:
      template, check = '%r not found in %r', self.assertTrue

    failure_text = template % (regex, log_capturer.messages)
    check(log_capturer.LogsMatch(regex), msg=failure_text)

  def AssertLogsContain(self, log_capturer, msg, inverted=False):
    """Asserts that the literal |msg| appears in the logs (or not)."""
    pattern = re.escape(msg)
    return self.AssertLogsMatch(log_capturer, pattern, inverted=inverted)
class OutputTestCase(TestCase):
  """Base class for cros unit tests with utility methods."""

  def __init__(self, *args, **kwargs):
    """Base class __init__ takes a second argument."""
    TestCase.__init__(self, *args, **kwargs)
    self._output_capturer = None

  def OutputCapturer(self):
    """Create and return OutputCapturer object."""
    self._output_capturer = OutputCapturer()
    return self._output_capturer

  def _GetOutputCapt(self):
    """Internal access to existing OutputCapturer.

    Raises RuntimeError if output capturing was never on.
    """
    if self._output_capturer:
      return self._output_capturer

    raise RuntimeError('Output capturing was never turned on for this test.')

  def _GenCheckMsgFunc(self, prefix_re, line_re):
    """Return boolean func to check a line given |prefix_re| and |line_re|.

    Args:
      prefix_re: Regexp (string or compiled) whose group 1 is the line text
        once prefix/suffix are stripped, or None.
      line_re: Regexp (string or compiled) the remaining text must match, or
        None.

    Returns:
      A one-argument function with a |description| attribute for use in
      error messages.

    Raises:
      RuntimeError: if neither regexp is given.
    """
    def _method(line):
      if prefix_re:
        # Prefix regexp will strip off prefix (and suffix) from line.
        match = prefix_re.search(line)

        if match:
          line = match.group(1)
        else:
          return False

      return line_re.search(line) if line_re else True

    # _method reads these names when it is called, so it sees the compiled
    # versions assigned here.
    if isinstance(prefix_re, str):
      prefix_re = re.compile(prefix_re)
    if isinstance(line_re, str):
      line_re = re.compile(line_re)

    # Provide a description of what this function looks for in a line.  Error
    # messages can make use of this.
    _method.description = None
    if prefix_re and line_re:
      _method.description = ('line matching prefix regexp %r then regexp %r' %
                             (prefix_re.pattern, line_re.pattern))
    elif prefix_re:
      _method.description = 'line matching prefix regexp %r' % prefix_re.pattern
    elif line_re:
      _method.description = 'line matching regexp %r' % line_re.pattern
    else:
      raise RuntimeError('Nonsensical usage of _GenCheckMsgFunc: '
                         'no prefix_re or line_re')

    return _method

  def _ContainsMsgLine(self, lines, msg_check_func):
    """Return True if |msg_check_func| accepts any of |lines|."""
    return any(msg_check_func(ln) for ln in lines)

  def _GenOutputDescription(self, check_stdout, check_stderr):
    """Name the stream(s) being checked, for error messages."""
    # Some extra logic to make an error message useful.
    if check_stdout and check_stderr:
      return 'stdout or stderr'
    elif check_stdout:
      return 'stdout'
    else:
      return 'stderr'

  def _AssertOutputContainsMsg(self, check_msg_func, invert,
                               check_stdout, check_stderr):
    """Assert some captured line satisfies |check_msg_func| (or none does)."""
    assert check_stdout or check_stderr

    lines = []
    if check_stdout:
      lines.extend(self._GetOutputCapt().GetStdoutLines())
    if check_stderr:
      lines.extend(self._GetOutputCapt().GetStderrLines())

    result = self._ContainsMsgLine(lines, check_msg_func)

    # Some extra logic to make an error message useful.
    output_desc = self._GenOutputDescription(check_stdout, check_stderr)

    if invert:
      msg = ('expected %s to not contain %s,\nbut found it in:\n%s' %
             (output_desc, check_msg_func.description, lines))
      self.assertFalse(result, msg=msg)
    else:
      msg = ('expected %s to contain %s,\nbut did not find it in:\n%s' %
             (output_desc, check_msg_func.description, lines))
      self.assertTrue(result, msg=msg)

  def AssertOutputContainsError(self, regexp=None, invert=False,
                                check_stdout=True, check_stderr=False):
    """Assert requested output contains at least one error line.

    If |regexp| is non-null, then the error line must also match it.
    If |invert| is true, then assert the line is NOT found.

    Raises RuntimeError if output capturing was never on for this test.
    """
    check_msg_func = self._GenCheckMsgFunc(OutputCapturer.ERROR_MSG_RE, regexp)
    return self._AssertOutputContainsMsg(check_msg_func, invert,
                                         check_stdout, check_stderr)

  def AssertOutputContainsWarning(self, regexp=None, invert=False,
                                  check_stdout=True, check_stderr=False):
    """Assert requested output contains at least one warning line.

    If |regexp| is non-null, then the warning line must also match it.
    If |invert| is true, then assert the line is NOT found.

    Raises RuntimeError if output capturing was never on for this test.
    """
    check_msg_func = self._GenCheckMsgFunc(OutputCapturer.WARNING_MSG_RE,
                                           regexp)
    return self._AssertOutputContainsMsg(check_msg_func, invert,
                                         check_stdout, check_stderr)

  def AssertOutputContainsLine(self, regexp, invert=False,
                               check_stdout=True, check_stderr=False):
    """Assert requested output contains line matching |regexp|.

    If |invert| is true, then assert the line is NOT found.

    Raises RuntimeError if output capturing was never on for this test.
    """
    check_msg_func = self._GenCheckMsgFunc(None, regexp)
    return self._AssertOutputContainsMsg(check_msg_func, invert,
                                         check_stdout, check_stderr)

  def _AssertOutputEndsInMsg(self, check_msg_func,
                             check_stdout, check_stderr):
    """Pass if requested output(s) ends(end) with an error message."""
    assert check_stdout or check_stderr

    # Only the last non-empty line of each requested stream is considered.
    lines = []
    if check_stdout:
      stdout_lines = self._GetOutputCapt().GetStdoutLines(include_empties=False)
      if stdout_lines:
        lines.append(stdout_lines[-1])
    if check_stderr:
      stderr_lines = self._GetOutputCapt().GetStderrLines(include_empties=False)
      if stderr_lines:
        lines.append(stderr_lines[-1])

    result = self._ContainsMsgLine(lines, check_msg_func)

    # Some extra logic to make an error message useful.
    output_desc = self._GenOutputDescription(check_stdout, check_stderr)

    msg = ('expected %s to end with %s,\nbut did not find it in:\n%s' %
           (output_desc, check_msg_func.description, lines))
    self.assertTrue(result, msg=msg)

  def AssertOutputEndsInError(self, regexp=None,
                              check_stdout=True, check_stderr=False):
    """Assert requested output ends in error line.

    If |regexp| is non-null, then the error line must also match it.

    Raises RuntimeError if output capturing was never on for this test.
    """
    check_msg_func = self._GenCheckMsgFunc(OutputCapturer.ERROR_MSG_RE, regexp)
    return self._AssertOutputEndsInMsg(check_msg_func,
                                       check_stdout, check_stderr)

  def AssertOutputEndsInWarning(self, regexp=None,
                                check_stdout=True, check_stderr=False):
    """Assert requested output ends in warning line.

    If |regexp| is non-null, then the warning line must also match it.

    Raises RuntimeError if output capturing was never on for this test.
    """
    check_msg_func = self._GenCheckMsgFunc(OutputCapturer.WARNING_MSG_RE,
                                           regexp)
    return self._AssertOutputEndsInMsg(check_msg_func,
                                       check_stdout, check_stderr)

  def AssertOutputEndsInLine(self, regexp,
                             check_stdout=True, check_stderr=False):
    """Assert requested output ends in line matching |regexp|.

    Raises RuntimeError if output capturing was never on for this test.
    """
    check_msg_func = self._GenCheckMsgFunc(None, regexp)
    return self._AssertOutputEndsInMsg(check_msg_func,
                                       check_stdout, check_stderr)

  def FuncCatchSystemExit(self, func, *args, **kwargs):
    """Run |func| with |args| and |kwargs| and catch SystemExit.

    Return tuple (return value or None, SystemExit number code or None).
    """
    try:
      returnval = func(*args, **kwargs)

      return returnval, None
    except SystemExit as ex:
      # A bare sys.exit() carries no args at all; report that as None rather
      # than failing with IndexError.
      exit_code = ex.args[0] if ex.args else None
      return None, exit_code

  def AssertFuncSystemExitZero(self, func, *args, **kwargs):
    """Run |func| with |args| and |kwargs| catching SystemExit.

    If the func does not raise a SystemExit with exit code 0 then assert.
    """
    exit_code = self.FuncCatchSystemExit(func, *args, **kwargs)[1]
    self.assertFalse(exit_code is None,
                     msg='Expected system exit code 0, but caught none')
    # %r rather than %d: the exit code may be a string (sys.exit('message')),
    # and the message is built whether or not the assertion fails.
    self.assertTrue(exit_code == 0,
                    msg='Expected system exit code 0, but caught %r' %
                    (exit_code,))

  def AssertFuncSystemExitNonZero(self, func, *args, **kwargs):
    """Run |func| with |args| and |kwargs| catching SystemExit.

    If the func does not raise a non-zero SystemExit code then assert.
    """
    exit_code = self.FuncCatchSystemExit(func, *args, **kwargs)[1]
    self.assertFalse(exit_code is None,
                     msg='Expected non-zero system exit code, but caught none')
    # %r rather than %d so string exit codes do not break message formatting.
    self.assertFalse(exit_code == 0,
                     msg='Expected non-zero system exit code, but caught %r' %
                     (exit_code,))

  def AssertRaisesAndReturn(self, error, func, *args, **kwargs):
    """Like assertRaises, but return exception raised."""
    try:
      func(*args, **kwargs)
    except error as ex:
      return ex
    else:
      # Kept out of the try body so the failure is not swallowed when |error|
      # is AssertionError or one of its bases.
      self.assertTrue(False, msg='Expected %s but got none' % error)
class TempDirTestCase(TestCase):
  """Mixin that hands each test a scratch directory, removed afterwards."""

  def __init__(self, *args, **kwargs):
    TestCase.__init__(self, *args, **kwargs)
    # NOTE(review): the |tempdir| resets here and in tearDown were not visible
    # in the reviewed text; reconstructed -- confirm.
    self.tempdir = None
    self._tempdir_obj = None

  def setUp(self):
    """Create the per-test temp dir and publish its path as self.tempdir."""
    holder = osutils.TempDir(prefix='chromite.test', set_global=True)
    self._tempdir_obj = holder
    self.tempdir = holder.tempdir

  def tearDown(self):
    """Remove the temp dir if setUp managed to create one."""
    if self._tempdir_obj is None:
      return
    self._tempdir_obj.Cleanup()
    self.tempdir = None
    self._tempdir_obj = None
970 class GerritTestCase(TempDirTestCase):
971 """Test class for tests that interact with a gerrit server.
973 The class setup creates and launches a stand-alone gerrit instance running on
974 localhost, for test methods to interact with. Class teardown stops and
975 deletes the gerrit instance.
977 Note that there is a single gerrit instance for ALL test methods in a
978 GerritTestCase sub-class.
981 TEST_USERNAME = 'test-username'
982 TEST_EMAIL = 'test-username@test.org'
984 # To help when debugging test code; setting this to 'False' (which happens if
985 # you provide the '-d' flag at the shell prompt) will leave the test gerrit
986 # instance running on localhost after the script exits. It is the
987 # responsibility of the user to kill the gerrit process!
990 GerritInstance = collections.namedtuple('GerritInstance', [
1007 def _create_gerrit_instance(cls, gerrit_dir):
1008 gerrit_init_script = os.path.join(
1009 osutils.FindDepotTools(), 'testing_support', 'gerrit-init.sh')
1010 http_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
1011 http_sock.bind(('', 0))
1012 http_port = str(http_sock.getsockname()[1])
1013 ssh_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
1014 ssh_sock.bind(('', 0))
1015 ssh_port = str(ssh_sock.getsockname()[1])
1017 # NOTE: this is not completely safe. These port numbers could be
1018 # re-assigned by the OS between the calls to socket.close() and gerrit
1019 # starting up. The only safe way to do this would be to pass file
1020 # descriptors down to the gerrit process, which is not even remotely
1024 cros_build_lib.RunCommand(
1025 ['bash', gerrit_init_script, '--http-port', http_port,
1026 '--ssh-port', ssh_port, gerrit_dir], quiet=True)
1028 gerrit_exe = os.path.join(gerrit_dir, 'bin', 'gerrit.sh')
1029 cros_build_lib.RunCommand(['bash', '-x', gerrit_exe, 'start'], quiet=True)
1030 gerrit_pid = int(osutils.ReadFile(
1031 os.path.join(gerrit_dir, 'logs', 'gerrit.pid')).rstrip())
1032 return cls.GerritInstance(
1033 credential_file=os.path.join(gerrit_dir, 'tmp', '.git-credentials'),
1034 gerrit_dir=gerrit_dir,
1035 gerrit_exe=gerrit_exe,
1036 gerrit_host='localhost:%s' % http_port,
1037 gerrit_pid=gerrit_pid,
1038 gerrit_url='http://localhost:%s' % http_port,
1039 git_dir=os.path.join(gerrit_dir, 'git'),
1040 git_host='%s/git' % gerrit_dir,
1041 git_url='file://%s/git' % gerrit_dir,
1042 http_port=http_port,
1043 netrc_file=os.path.join(gerrit_dir, 'tmp', '.netrc'),
1044 ssh_ident=os.path.join(gerrit_dir, 'tmp', 'id_rsa'),
@classmethod
def setUpClass(cls):
  """Start a gerrit instance in a class-specific temp dir and patch it in.

  Besides launching the server, this installs class-level patches for the
  gob_util netrc, for httplib (plain http in place of https), for the gerrit
  protocol, and for the host/url entries in chromite's constants module.
  """
  # Bring up the gerrit instance.
  cls.gerritdir_obj = osutils.TempDir(set_global=False)
  cls.gerrit_instance = cls._create_gerrit_instance(cls.gerritdir_obj.tempdir)
  gi = cls.gerrit_instance

  # Http authentication uses the netrc file of the instance.
  cls.netrc_patcher = mock.patch(
      'chromite.lib.gob_util.NETRC', netrc.netrc(gi.netrc_file))
  cls.netrc_patcher.start()

  # gob_util only knows about https connections, and that's a good thing.
  # For testing it is much simpler to talk plain http, so swap both the
  # connection class and the protocol string.
  cls.httplib_patcher = mock.patch(
      'httplib.HTTPSConnection', httplib.HTTPConnection)
  cls.httplib_patcher.start()
  cls.protocol_patcher = mock.patch(
      'chromite.lib.gob_util.GERRIT_PROTOCOL', 'http')
  cls.protocol_patcher.start()

  # Some of the chromite code requires read access to refs/meta/config.
  # Typically that access should require http authentication.  However,
  # because we use plain http to communicate with the test server, libcurl
  # (and by extension git commands that use it) will not add the
  # Authorization header to git transactions.  So, we just allow anonymous
  # read access to refs/meta/config for the test code.
  clone_path = os.path.join(gi.gerrit_dir, 'tmp', 'All-Projects')
  cls._CloneProject('All-Projects', clone_path)
  project_config = os.path.join(clone_path, 'project.config')
  cros_build_lib.RunCommand(
      ['git', 'config', '--file', project_config, '--add',
       'access.refs/meta/config.read', 'group Anonymous Users'], quiet=True)
  # The remaining steps all run inside the clone: stage, commit, push.
  for git_cmd in (
      ['git', 'add', project_config],
      ['git', 'commit', '-m', 'Anonyous read for refs/meta/config'],
      ['git', 'push', 'origin', 'HEAD:refs/meta/config'],
  ):
    cros_build_lib.RunCommand(git_cmd, cwd=clone_path, quiet=True)

  # Make all chromite code point to the test server.  Both the external and
  # the internal names are aimed at this single instance.
  manifest_url = '%s/%s' % (gi.git_url, constants.MANIFEST_PROJECT)
  manifest_int_url = '%s/%s' % (gi.git_url, constants.MANIFEST_INT_PROJECT)
  overrides = {
      'EXTERNAL_GOB_HOST': gi.git_host,
      'EXTERNAL_GERRIT_HOST': gi.gerrit_host,
      'EXTERNAL_GOB_URL': gi.git_url,
      'EXTERNAL_GERRIT_URL': gi.gerrit_url,
      'INTERNAL_GOB_HOST': gi.git_host,
      'INTERNAL_GERRIT_HOST': gi.gerrit_host,
      'INTERNAL_GOB_URL': gi.git_url,
      'INTERNAL_GERRIT_URL': gi.gerrit_url,
      'MANIFEST_URL': manifest_url,
      'MANIFEST_INT_URL': manifest_int_url,
      'GIT_REMOTES': {
          constants.EXTERNAL_REMOTE: gi.gerrit_url,
          constants.INTERNAL_REMOTE: gi.gerrit_url,
          constants.CHROMIUM_REMOTE: gi.gerrit_url,
          constants.CHROME_REMOTE: gi.gerrit_url,
      },
  }
  cls.constants_patcher = mock.patch.dict(constants.__dict__, overrides)
  cls.constants_patcher.start()
@classmethod
def createProject(cls, name, description='Test project', owners=None,
                  submit_type='CHERRY_PICK'):
  """Create a project on the test gerrit server.

  Args:
    name: Name of the project to create; it is url-quoted into the path.
    description: Description sent in the request body.
    owners: List of owner groups; defaults to ['Administrators'].
    submit_type: Submit type sent in the request body.

  Raises:
    AssertionError: if the reply is not a 201 whose JSON echoes |name|.
  """
  if owners is None:
    owners = ['Administrators']
  request_body = {
      'description': description,
      'submit_type': submit_type,
      'owners': owners,
  }
  project_path = 'projects/%s' % urllib.quote(name, '')
  conn = gob_util.CreateHttpConn(
      cls.gerrit_instance.gerrit_host, project_path, reqtype='PUT',
      body=request_body)
  response = conn.getresponse()
  assert response.status == 201
  payload = cStringIO.StringIO(response.read())
  # The first line of the reply must be the ")]}'" marker; JSON follows it.
  first_line = payload.readline().rstrip()
  assert first_line == ")]}'"
  jmsg = json.load(payload)
  assert jmsg['name'] == name
@classmethod
def _CloneProject(cls, name, path):
  """Clone a project from the test gerrit server into |path|.

  The clone also gets the server's commit-msg hook, the test account as git
  identity, and a credential helper backed by the instance's credential file.

  Args:
    name: Project name on the test server.
    path: Destination directory of the clone.

  Returns:
    |path|.
  """
  host = cls.gerrit_instance.gerrit_host
  osutils.SafeMakedirs(os.path.dirname(path))
  cros_build_lib.RunCommand(
      ['git', 'clone', 'http://%s/%s' % (host, name), path], quiet=True)

  # Install commit-msg hook and make it executable for the owner.
  hook_path = os.path.join(path, '.git', 'hooks', 'commit-msg')
  hook_url = 'http://%s/tools/hooks/commit-msg' % host
  cros_build_lib.RunCommand(['curl', '-o', hook_path, hook_url], quiet=True)
  os.chmod(hook_path, stat.S_IRWXU)

  # Set git identity to test account.
  cros_build_lib.RunCommand(
      ['git', 'config', 'user.email', cls.TEST_EMAIL], cwd=path, quiet=True)

  # Configure non-interactive credentials for git operations.
  config_path = os.path.join(path, '.git', 'config')
  helper = 'store --file=%s' % cls.gerrit_instance.credential_file
  cros_build_lib.RunCommand(
      ['git', 'config', '--file', config_path, 'credential.helper', helper],
      quiet=True)
  return path
def cloneProject(self, name, path=None):
  """Clone a project from the test gerrit server into the test's tempdir.

  Args:
    name: Project name on the test server.
    path: Destination relative to self.tempdir.  When omitted, the basename
      of |name| is used, minus a trailing '.git'.

  Returns:
    Whatever _CloneProject returns for the resulting path.
  """
  if path is None:
    base = os.path.basename(name)
    path = base[:-4] if base.endswith('.git') else base
  return self._CloneProject(name, os.path.join(self.tempdir, path))
@classmethod
def _CreateCommit(cls, clone_path, fn=None, msg=None, text=None):
  """Create a commit in the given git checkout.

  Args:
    clone_path: Directory of the git checkout.
    fn: File (relative to the checkout) to append to; has a default.
    msg: Commit message; has a default.
    text: Line of text appended to the file; has a default.

  Returns:
    The result of _GetCommit for the checkout after committing.
  """
  fn = fn or 'test-file.txt'
  msg = msg or 'Test Message'
  text = text or 'Another day, another dollar.'
  # Append (mode 'a') so repeated calls keep producing a change to commit.
  osutils.WriteFile(os.path.join(clone_path, fn), '%s\n' % text, mode='a')
  for git_cmd in (['git', 'add', fn], ['git', 'commit', '-m', msg]):
    cros_build_lib.RunCommand(git_cmd, cwd=clone_path, quiet=True)
  return cls._GetCommit(clone_path)
def createCommit(self, clone_path, fn=None, msg=None, text=None):
  """Create a commit in a git checkout located under the test's tempdir.

  |clone_path| is joined onto self.tempdir; the remaining arguments are
  handed to _CreateCommit unchanged, and its result is returned.
  """
  checkout = os.path.join(self.tempdir, clone_path)
  return self._CreateCommit(checkout, fn, msg, text)
@staticmethod
def _GetCommit(clone_path, ref='HEAD'):
  """Return the sha1 and change-id of a commit in a git checkout.

  Args:
    clone_path: Directory of the git checkout to inspect.
    ref: Commit-ish handed to `git log -n 1`; defaults to HEAD.

  Returns:
    A (sha1, change_id) tuple parsed from the log output.  An element is None
    when no matching line was found.
  """
  log_proc = cros_build_lib.RunCommand(
      ['git', 'log', '-n', '1', ref], cwd=clone_path,
      print_cmd=False, capture_output=True)
  sha1 = None
  change_id = None
  for line in log_proc.output.splitlines():
    match = re.match(r'^commit ([0-9a-fA-F]{40})$', line)
    if match:
      sha1 = match.group(1)
      continue
    # Raw string: '\s' and '\S' are regex escapes, not string escapes.
    match = re.match(r'^\s+Change-Id:\s*(\S+)$', line)
    if match:
      change_id = match.group(1)
      # Nothing further is read from the log once the Change-Id is known.
      break
  return (sha1, change_id)
def getCommit(self, clone_path, ref='HEAD'):
  """Get the sha1 and change-id for a commit in a checkout under tempdir.

  Both values are asserted to be truthy before being returned as a
  (sha1, change_id) tuple.
  """
  checkout = os.path.join(self.tempdir, clone_path)
  commit = self._GetCommit(checkout, ref)
  sha1, change_id = commit
  for value in (sha1, change_id):
    self.assertTrue(value)
  return (sha1, change_id)
@staticmethod
def _UploadChange(clone_path, branch='master', remote='origin'):
  """Push HEAD of the checkout to refs/for/<branch> on |remote|."""
  refspec = 'HEAD:refs/for/%s' % branch
  cros_build_lib.RunCommand(
      ['git', 'push', remote, refspec], cwd=clone_path, quiet=True)
def uploadChange(self, clone_path, branch='master', remote='origin'):
  """Create a gerrit CL from the HEAD of a git checkout under tempdir."""
  checkout = os.path.join(self.tempdir, clone_path)
  self._UploadChange(checkout, branch, remote)
@staticmethod
def _PushBranch(clone_path, branch='master'):
  """Push HEAD of the checkout straight to refs/heads/<branch> on origin."""
  refspec = 'HEAD:refs/heads/%s' % branch
  cros_build_lib.RunCommand(
      ['git', 'push', 'origin', refspec], cwd=clone_path, quiet=True)
def pushBranch(self, clone_path, branch='master'):
  """Push a branch directly to gerrit, bypassing code review.

  |clone_path| is interpreted relative to self.tempdir.
  """
  checkout = os.path.join(self.tempdir, clone_path)
  self._PushBranch(checkout, branch)
@classmethod
def createAccount(cls, name='Test User', email='test-user@test.org',
                  password=None, groups=None):
  """Create a new user account on gerrit.

  Args:
    name: Full name for the account.
    email: Email address; the part before '@' becomes the username.
    password: Optional http password to set.
    groups: Optional group name, or list of group names, to join.

  Raises:
    AssertionError: if the reply is not a 201 whose JSON echoes |email|.
  """
  local_part = email.partition('@')[0]
  account_path = 'accounts/%s' % urllib.quote(local_part)
  request_body = {
      'name': name,
      'email': email,
  }
  if password:
    request_body['http_password'] = password
  if groups:
    # A lone group name is wrapped so the body always carries a list.
    if isinstance(groups, basestring):
      groups = [groups]
    request_body['groups'] = groups
  conn = gob_util.CreateHttpConn(
      cls.gerrit_instance.gerrit_host, account_path, reqtype='PUT',
      body=request_body)
  response = conn.getresponse()
  assert response.status == 201
  payload = cStringIO.StringIO(response.read())
  # The first line of the reply must be the ")]}'" marker; JSON follows it.
  first_line = payload.readline().rstrip()
  assert first_line == ")]}'"
  jmsg = json.load(payload)
  assert jmsg['email'] == email
@staticmethod
def _stop_gerrit(gerrit_obj):
  """Stop a running gerrit instance, escalating to SIGKILL if needed.

  Args:
    gerrit_obj: Instance record providing gerrit_exe and gerrit_pid.

  The `return` statements inside the `finally` clause are deliberate (hence
  the pylint W0150 suppressions): once the process is known to be gone, any
  error raised by the stop command itself is discarded.
  """
  try:
    # This should terminate the gerrit process.
    cros_build_lib.RunCommand(['bash', gerrit_obj.gerrit_exe, 'stop'],
                              quiet=True)
  finally:
    try:
      # cls.gerrit_pid should have already terminated.  If it did, then
      # os.waitpid will raise OSError.
      os.waitpid(gerrit_obj.gerrit_pid, os.WNOHANG)
    except OSError as e:
      if e.errno == errno.ECHILD:
        # If gerrit shut down cleanly, os.waitpid will land here.
        # pylint: disable=W0150
        return

    # If we get here, the gerrit process is still alive.  Send the process
    # SIGKILL for good measure.
    try:
      os.kill(gerrit_obj.gerrit_pid, signal.SIGKILL)
    except OSError as e:
      # Bind the exception raised by os.kill itself; the errno test below
      # must not look at whatever the waitpid handler above left behind.
      if e.errno == errno.ESRCH:
        # os.kill raised an error because the process doesn't exist.  Maybe
        # gerrit shut down cleanly after all.
        # pylint: disable=W0150
        return

    # Expose the fact that gerrit didn't shut down cleanly.
    cros_build_lib.Warning(
        'Test gerrit server (pid=%d) did not shut down cleanly.' % (
            gerrit_obj.gerrit_pid))
@classmethod
def tearDownClass(cls):
  """Undo setUpClass: remove the class-level patches and stop gerrit.

  When cls.TEARDOWN is false the server is left running and its directory is
  kept on disk.
  """
  cls.httplib_patcher.stop()
  cls.protocol_patcher.stop()
  cls.constants_patcher.stop()
  # setUpClass also starts the netrc patch; stop it here so the patched
  # gob_util.NETRC does not outlive this test class.
  cls.netrc_patcher.stop()
  if cls.TEARDOWN:
    cls._stop_gerrit(cls.gerrit_instance)
    cls.gerritdir_obj.Cleanup()
  else:
    # Prevent gerrit dir from getting cleaned up on interpreter exit.
    cls.gerritdir_obj.tempdir = None
class GerritInternalTestCase(GerritTestCase):
  """Test class which runs separate internal and external gerrit instances.

  The instance started by GerritTestCase serves the external names; a second
  instance is started here and patched in for the internal ones.
  """

  @classmethod
  def setUpClass(cls):
    """Start the second (internal) gerrit instance and patch constants."""
    GerritTestCase.setUpClass()
    cls.int_gerritdir_obj = osutils.TempDir(set_global=False)
    cls.int_gerrit_instance = cls._create_gerrit_instance(
        cls.int_gerritdir_obj.tempdir)
    public = cls.gerrit_instance
    internal = cls.int_gerrit_instance
    manifest_int_url = '%s/%s' % (
        internal.git_url, constants.MANIFEST_INT_PROJECT)
    overrides = {
        'INTERNAL_GOB_HOST': internal.git_host,
        'INTERNAL_GERRIT_HOST': internal.gerrit_host,
        'INTERNAL_GOB_URL': internal.git_url,
        'INTERNAL_GERRIT_URL': internal.gerrit_url,
        'MANIFEST_INT_URL': manifest_int_url,
        'GIT_REMOTES': {
            constants.EXTERNAL_REMOTE: public.gerrit_url,
            constants.INTERNAL_REMOTE: internal.gerrit_url,
            constants.CHROMIUM_REMOTE: public.gerrit_url,
            constants.CHROME_REMOTE: internal.gerrit_url,
        },
    }
    cls.int_constants_patcher = mock.patch.dict(constants.__dict__, overrides)
    cls.int_constants_patcher.start()

  @classmethod
  def tearDownClass(cls):
    """Remove the internal patches, stop the internal instance, then chain."""
    cls.int_constants_patcher.stop()
    if cls.TEARDOWN:
      cls._stop_gerrit(cls.int_gerrit_instance)
      cls.int_gerritdir_obj.Cleanup()
    else:
      # Prevent gerrit dir from getting cleaned up on interpreter exit.
      cls.int_gerritdir_obj.tempdir = None
    GerritTestCase.tearDownClass()
class RepoTestCase(GerritTestCase):
  """Test class which runs in a repo checkout."""

  # Rendered with constants.__dict__ in setUpClass.
  # NOTE(review): the first remote reads the PUBLIC_GOB_URL and
  # PUBLIC_GERRIT_HOST keys, whereas GerritTestCase.setUpClass patches the
  # EXTERNAL_* names -- confirm these keys exist and point at the test server.
  MANIFEST_TEMPLATE = """<?xml version="1.0" encoding="UTF-8"?>
<manifest>
  <remote name="%(EXTERNAL_REMOTE)s"
          fetch="%(PUBLIC_GOB_URL)s"
          review="%(PUBLIC_GERRIT_HOST)s" />
  <remote name="%(INTERNAL_REMOTE)s"
          fetch="%(INTERNAL_GOB_URL)s"
          review="%(INTERNAL_GERRIT_HOST)s" />
  <default revision="refs/heads/master" remote="%(EXTERNAL_REMOTE)s" sync-j="1" />
  <project remote="%(EXTERNAL_REMOTE)s" path="localpath/testproj1" name="remotepath/testproj1" />
  <project remote="%(EXTERNAL_REMOTE)s" path="localpath/testproj2" name="remotepath/testproj2" />
  <project remote="%(INTERNAL_REMOTE)s" path="localpath/testproj3" name="remotepath/testproj3" />
  <project remote="%(INTERNAL_REMOTE)s" path="localpath/testproj4" name="remotepath/testproj4" />
</manifest>
"""

  @classmethod
  def setUpClass(cls):
    """Populate the test gerrit server with repo, manifests and projects."""
    GerritTestCase.setUpClass()

    # Patch in repo url on the gerrit test server.  Remember the real url
    # first: it is the source of the mirror created below.
    external_repo_url = constants.REPO_URL
    cls.repo_patcher = mock.patch.dict(constants.__dict__, {
        'REPO_URL': '%s/%s' % (
            cls.gerrit_instance.git_url, constants.REPO_PROJECT),
    })
    cls.repo_patcher.start()

    # Create local mirror of repo tool repository.
    mirror_path = '%s.git' % (
        os.path.join(cls.gerrit_instance.git_dir, constants.REPO_PROJECT))
    cros_build_lib.RunCommand(
        ['git', 'clone', '--mirror', external_repo_url, mirror_path],
        quiet=True)

    # Check out the top-level repo script; it will be used for invocation.
    repo_clone_path = os.path.join(
        cls.gerrit_instance.gerrit_dir, 'tmp', 'repo')
    cros_build_lib.RunCommand(
        ['git', 'clone', '-n', constants.REPO_URL, repo_clone_path], quiet=True)
    cros_build_lib.RunCommand(
        ['git', 'checkout', 'origin/stable', 'repo'], cwd=repo_clone_path,
        quiet=True)
    osutils.RmDir(os.path.join(repo_clone_path, '.git'))
    cls.repo_exe = os.path.join(repo_clone_path, 'repo')

    # Create manifest repository.
    clone_path = os.path.join(cls.gerrit_instance.gerrit_dir, 'tmp', 'manifest')
    osutils.SafeMakedirs(clone_path)
    cros_build_lib.RunCommand(['git', 'init'], cwd=clone_path, quiet=True)
    manifest_path = os.path.join(clone_path, 'default.xml')
    osutils.WriteFile(manifest_path, cls.MANIFEST_TEMPLATE % constants.__dict__)
    cros_build_lib.RunCommand(
        ['git', 'add', 'default.xml'], cwd=clone_path, quiet=True)
    cros_build_lib.RunCommand(
        ['git', 'commit', '-m', 'Test manifest.'], cwd=clone_path, quiet=True)
    # The same commit is pushed to both the external and internal manifests.
    cls.createProject(constants.MANIFEST_PROJECT)
    cros_build_lib.RunCommand(
        ['git', 'push', constants.MANIFEST_URL, 'HEAD:refs/heads/master'],
        quiet=True, cwd=clone_path)
    cls.createProject(constants.MANIFEST_INT_PROJECT)
    cros_build_lib.RunCommand(
        ['git', 'push', constants.MANIFEST_INT_URL, 'HEAD:refs/heads/master'],
        cwd=clone_path, quiet=True)

    # Create project repositories.
    for i in xrange(1, 5):
      proj = 'testproj%d' % i
      cls.createProject('remotepath/%s' % proj)
      clone_path = os.path.join(cls.gerrit_instance.gerrit_dir, 'tmp', proj)
      cls._CloneProject('remotepath/%s' % proj, clone_path)
      cls._CreateCommit(clone_path)
      cls._PushBranch(clone_path, 'master')

  @classmethod
  def runRepo(cls, *args, **kwargs):
    """Run the repo script via cros_build_lib.RunCommand.

    Args:
      *args: Positional RunCommand arguments.  args[0] is the list of repo
        arguments; the repo executable is prepended to a copy of it.
      **kwargs: RunCommand keyword arguments.  'quiet' defaults to True and
        'env' gets a HOME entry unless the caller already supplied one.

    Returns:
      Whatever cros_build_lib.RunCommand returns.
    """
    # Unfortunately, munging $HOME appears to be the only way to control the
    # netrc file used by repo.
    munged_home = os.path.join(cls.gerrit_instance.gerrit_dir, 'tmp')
    env = kwargs.get('env')
    if env is None:
      env = os.environ.copy()
      env['HOME'] = munged_home
    else:
      # Work on a copy so the caller's mapping is left untouched; an explicit
      # HOME supplied by the caller wins.
      env = dict(env)
      env.setdefault('HOME', munged_home)
    kwargs['env'] = env
    # Build a new command list rather than inserting into the caller's.
    args = ([cls.repo_exe] + list(args[0]),) + args[1:]
    kwargs.setdefault('quiet', True)
    return cros_build_lib.RunCommand(*args, **kwargs)

  @classmethod
  def tearDownClass(cls):
    """Remove the REPO_URL patch, then run the GerritTestCase teardown."""
    cls.repo_patcher.stop()
    GerritTestCase.tearDownClass()

  def uploadChange(self, clone_path, branch='master', remote='origin'):
    """Upload HEAD of |clone_path| for review on the remote's review host.

    The push url is built from the remote.<remote>.review and
    remote.<remote>.projectname values in the checkout's git config.
    """
    review_host = cros_build_lib.RunCommand(
        ['git', 'config', 'remote.%s.review' % remote],
        print_cmd=False, cwd=clone_path, capture_output=True).output.strip()

    projectname = cros_build_lib.RunCommand(
        ['git', 'config', 'remote.%s.projectname' % remote],
        print_cmd=False, cwd=clone_path, capture_output=True).output.strip()

    GerritTestCase._UploadChange(
        clone_path, branch=branch, remote='%s://%s/%s' % (
            gob_util.GERRIT_PROTOCOL, review_host, projectname))

  def setUp(self):
    """Initialize a repo checkout in self.tempdir and configure its projects."""
    cros_build_lib.RunCommand(
        [self.repo_exe, 'init', '-u', constants.MANIFEST_URL, '--repo-url',
         constants.REPO_URL, '--no-repo-verify'], cwd=self.tempdir, quiet=True)
    self.repo = repository.RepoRepository(constants.MANIFEST_URL, self.tempdir)
    self.repo.Sync()
    self.manifest = git.ManifestCheckout(self.tempdir)
    for i in xrange(1, 5):
      # Configure non-interactive credentials for git operations.
      proj = 'testproj%d' % i
      config_path = os.path.join(
          self.tempdir, 'localpath', proj, '.git', 'config')
      cros_build_lib.RunCommand(
          ['git', 'config', '--file', config_path, 'credential.helper',
           'store --file=%s' % self.gerrit_instance.credential_file],
          quiet=True)
      cros_build_lib.RunCommand(
          ['git', 'config', '--file', config_path, 'review.%s.upload' %
           self.gerrit_instance.gerrit_host, 'true'], quiet=True)

    # Make all google storage URL's point to the local filesystem.
    # NOTE(review): this patcher is only constructed here; no start() call is
    # visible in this class, so the patch is presumably not in effect.  The
    # target path also reads 'lib.chromite', not 'chromite.lib' -- confirm.
    self.gs_url_patcher = mock.patch(
        'lib.chromite.gs.BASE_GS_URL',
        'file://%s' % os.path.join(self.tempdir, '.gs'))
class _RunCommandMock(mox.MockObject):
  """Mock class that wildcards keyword arguments tests rarely care about."""

  # Keyword arguments replaced by mox.IgnoreArg() when a call omits them.
  DEFAULT_IGNORED_ARGS = ('print_cmd',)

  def __call__(self, *args, **kwargs):
    """Fill in ignore-markers for the suppressed keywords, then delegate."""
    for name in self.DEFAULT_IGNORED_ARGS:
      if name not in kwargs:
        kwargs[name] = mox.IgnoreArg()
    return mox.MockObject.__call__(self, *args, **kwargs)
class _LessAnnoyingMox(mox.Mox):
  """Mox derivative that slips in our suppressions to mox.

  This is used by default via MoxTestCase.  Depending on the namespace of the
  object being mocked, a different MockObject subclass is swapped in, so that
  arguments we do not care about are not checked.

  Simplest example: if code doesn't explicitly assert that print_cmd must be
  true/false, then we don't care what it is set to (it has no effect beyond
  output).  Mox normally *would* care, making the tests a pain to maintain.
  """

  # Namespace -> mock class.  Every cros_build_lib attribute whose name
  # contains "RunCommand" is mocked with _RunCommandMock.
  mock_classes = dict.fromkeys(
      ['chromite.lib.cros_build_lib.%s' % x
       for x in dir(cros_build_lib) if "RunCommand" in x],
      _RunCommandMock)

  @staticmethod
  def _GetNamespace(obj):
    """Return the dotted '<module>.<name>' string of |obj|."""
    return '%s.%s' % (obj.__module__, obj.__name__)

  def CreateMock(self, obj, attrs=None):
    """Create, register and return a mock of |obj|, using our suppressions."""
    if attrs is None:
      attrs = {}
    namespace = self._GetNamespace(obj)
    mock_cls = self.mock_classes.get(namespace, mox.MockObject)
    created = mock_cls(obj, attrs=attrs)
    self._mock_objects.append(created)
    return created
class MoxTestCase(TestCase):
  """Mox based test case; compatible with StackedSetup

  Note: mox is deprecated; please use MockTestCase instead.
  """

  # Set to True in a subclass to skip the VerifyAll() call in tearDown.
  mox_suppress_verify_all = False

  def setUp(self):
    self.mox = _LessAnnoyingMox()
    self.stubs = mox.stubout.StubOutForTesting()

  def tearDown(self):
    verify = self.__test_was_run__ and not self.mox_suppress_verify_all
    try:
      if verify:
        # The test body actually ran, so its expectations can be checked.
        self.mox.VerifyAll()
    finally:
      # Stubs are removed even when verification fails.  The hasattr checks
      # cover a setUp that never got to create the attributes.
      if hasattr(self, 'mox'):
        self.mox.UnsetStubs()
      if hasattr(self, 'stubs'):
        self.stubs.UnsetAll()
        self.stubs.SmartUnsetAll()
class MoxTempDirTestCase(MoxTestCase, TempDirTestCase):
  """Convenience class combining MoxTestCase with TempDirTestCase.

  Note: mox is deprecated; please use MockTempDirTestCase instead.
  """
class MoxOutputTestCase(OutputTestCase, MoxTestCase):
  """Convenience class combining OutputTestCase with MoxTestCase.

  Note: mox is deprecated; please use MockOutputTestCase instead.
  """
class MockTestCase(TestCase):
  """Python-mock based test case; compatible with StackedSetup"""

  def setUp(self):
    # Patchers started through StartPatcher, in start order.
    self._patchers = []

  def tearDown(self):
    # We can't just run stopall() by itself, and need to stop our patchers
    # manually since stopall() doesn't handle repatching.  Our own patchers
    # are stopped newest-first, then stopall() runs last.
    stoppers = [p.stop for p in reversed(self._patchers)]
    stoppers.append(mock.patch.stopall)
    cros_build_lib.SafeRun(stoppers)

  def StartPatcher(self, patcher):
    """Call start() on the patcher, and stop() in tearDown.

    Returns:
      The value returned by patcher.start().
    """
    started = patcher.start()
    self._patchers.append(patcher)
    return started

  def PatchObject(self, *args, **kwargs):
    """Create and start a mock.patch.object().

    stop() will be called automatically during tearDown.
    """
    patcher = mock.patch.object(*args, **kwargs)
    return self.StartPatcher(patcher)
# Inheritance order matters: MockTestCase has to come before TempDirTestCase.
# Patchers started via MockTestCase.StartPatcher() may be PartialMocks that
# create their own temporary directory.  Those directories are torn down in
# MockTestCase.tearDown(), which therefore needs to run before
# TempDirTestCase.tearDown().
class MockTempDirTestCase(MockTestCase, TempDirTestCase):
  """Convenience class combining MockTestCase with TempDirTestCase."""
class MockOutputTestCase(MockTestCase, OutputTestCase):
  """Convenience class combining MockTestCase with OutputTestCase."""
class MockLoggingTestCase(MockTestCase, LoggingTestCase):
  """Convenience class combining MockTestCase with LoggingTestCase."""
def FindTests(directory, module_namespace=''):
  """Find all *_unittest.py, and return their python namespaces.

  Args:
    directory: The directory to scan for tests.
    module_namespace: What namespace to prefix all found tests with.

  Returns:
    A list of python unittests in python namespace form.
  """
  find_cmd = ['find', '.', '-name', '*_unittest.py', '-printf', '%P\n']
  found = cros_build_lib.RunCommand(
      find_cmd, cwd=directory, print_cmd=False, capture_output=True)
  # A namespace, when given, is joined to each module with a dot.
  prefix = module_namespace + '.' if module_namespace else ''
  tests = []
  for rel_path in found.output.splitlines():
    # Drop the trailing .py and turn the path into a dotted module name.
    tests.append(prefix + rel_path[:-3].replace('/', '.'))
  return tests
def main(**kwargs):
  """Helper wrapper around unittest.main.  Invoke this, not unittest.main.

  Any passed in kwargs are passed directly down to unittest.main; via this, you
  can inject custom argv for example (to limit what tests run).

  Two keywords are consumed here instead: 'exit' (default True) and 'level'
  (logging level, default logging.CRITICAL).  The --network, -d and --debug
  flags are stripped from sys.argv before unittest sees them.

  Returns:
    An exit code, when exit=False was requested; otherwise SystemExit
    propagates to the caller.
  """
  # Default to exit=True; this matches old behaviour, and allows unittest
  # to trigger sys.exit on its own.  Unfortunately, the exit keyword is only
  # available in 2.7- as such, handle it ourselves.
  allow_exit = kwargs.pop('exit', True)
  level = kwargs.pop('level', logging.CRITICAL)

  if '--network' in sys.argv:
    sys.argv.remove('--network')
    GlobalTestConfig.NETWORK_TESTS_DISABLED = False

  for flag in ('-d', '--debug'):
    if flag in sys.argv:
      sys.argv.remove(flag)
      level = logging.DEBUG
      GerritTestCase.TEARDOWN = False
  cros_build_lib.SetupBasicLogging(level)

  try:
    unittest.main(**kwargs)
    raise SystemExit(0)
  except SystemExit as e:
    # Subclasses of SystemExit, and callers who want the exit, get the
    # exception untouched.
    if e.__class__ != SystemExit or allow_exit:
      raise
    # Redo the exit code ourselves- unittest throws True on occasion.
    # Note that the code attribute is untyped; it may even be a string.
    exit_code = e.code
    if not isinstance(exit_code, (int, long)):
      exit_code = 1 if exit_code else 0
    # An integer code may be something other than 1/0; if it was passed
    # explicitly, honor it as is.
    return exit_code