Upstream version 8.36.161.0
[platform/framework/web/crosswalk.git] / src / third_party / chromite / lib / cros_test_lib.py
1 #!/usr/bin/python
2 # Copyright (c) 2011 The Chromium OS Authors. All rights reserved.
3 # Use of this source code is governed by a BSD-style license that can be
4 # found in the LICENSE file.
5
6 """Cros unit test library, with utility functions."""
7
8 from __future__ import print_function
9 import collections
10 import cStringIO
11 import errno
12 import exceptions
13 import functools
14 import httplib
15 import json
16 import logging
17 import mox
18 import netrc
19 import os
20 import re
21 import signal
22 import socket
23 import stat
24 import sys
25 import unittest
26 import urllib
27
28 from chromite.buildbot import constants
29 from chromite.buildbot import repository
30 import cros_build_lib
31 import git
32 import gob_util
33 import osutils
34 import terminal
35 import timeout_util
36
37 if 'chromite' not in sys.modules:
38   # TODO(build): Finish test wrapper (http://crosbug.com/37517).
39   # Until then, we detect the chromite manipulation not yet having
40   # occurred, and inject it ourselves.
41   # We cannot just import chromite since this module is still accessed
42   # from non chromite.lib.cros_test_lib pathways (which will be resolved
43   # implicitly via 37517).
44   sys.path.insert(0, os.path.join(
45       os.path.dirname(os.path.abspath(__file__)), '../third_party'))
46
47 import mock
48
49
50 Directory = collections.namedtuple('Directory', ['name', 'contents'])
51
52
53 class GlobalTestConfig(object):
54   """Global configuration for tests."""
55
56   # By default, disable all network tests.
57   NETWORK_TESTS_DISABLED = True
58
59
60 def NetworkTest(reason='Skipping network test'):
61   """Decorator for unit tests. Skip the test if --network is not specified."""
62   def Decorator(test_item):
63     @functools.wraps(test_item)
64     def NetworkWrapper(*args, **kwargs):
65       if GlobalTestConfig.NETWORK_TESTS_DISABLED:
66         raise unittest.SkipTest(reason)
67       test_item(*args, **kwargs)
68
69     if not (isinstance(test_item, type) and issubclass(test_item, TestCase)):
70       return NetworkWrapper
71     return test_item
72
73   return Decorator
74
75
76 def _FlattenStructure(base_path, dir_struct):
77   """Converts a directory structure to a list of paths."""
78   flattened = []
79   for obj in dir_struct:
80     if isinstance(obj, Directory):
81       new_base = os.path.join(base_path, obj.name).rstrip(os.sep)
82       flattened.append(new_base + os.sep)
83       flattened.extend(_FlattenStructure(new_base, obj.contents))
84     else:
85       assert(isinstance(obj, basestring))
86       flattened.append(os.path.join(base_path, obj))
87   return flattened
88
89
90 def CreateOnDiskHierarchy(base_path, dir_struct):
91   """Creates on-disk representation of an in-memory directory structure.
92
93   Args:
94     base_path: The absolute root of the directory structure.
95     dir_struct: A recursively defined data structure that represents a
96       directory tree.  The basic form is a list.  Elements can be file names or
97       cros_test_lib.Directory objects.  The 'contents' attribute of Directory
98       types is a directory structure representing the contents of the directory.
99       Examples:
100         - ['file1', 'file2']
101         - ['file1', Directory('directory', ['deepfile1', 'deepfile2']), 'file2']
102   """
103   flattened = _FlattenStructure(base_path, dir_struct)
104   for f in flattened:
105     f = os.path.join(base_path, f)
106     if f.endswith(os.sep):
107       osutils.SafeMakedirs(f)
108     else:
109       osutils.Touch(f, makedirs=True)
110
111
112 def _VerifyDirectoryIterables(existing, expected):
113   """Compare two iterables representing contents of a directory.
114
115   Paths in |existing| and |expected| will be compared for exact match.
116
117   Args:
118     existing: An iterable containing paths that exist.
119     expected: An iterable of paths that are expected.
120
121   Raises:
122     AssertionError when there is any divergence between |existing| and
123     |expected|.
124   """
125   def FormatPaths(paths):
126     return '\n'.join(sorted(paths))
127
128   existing = set(existing)
129   expected = set(expected)
130
131   unexpected = existing - expected
132   if unexpected:
133     raise AssertionError('Found unexpected paths:\n%s'
134                          % FormatPaths(unexpected))
135   missing = expected - existing
136   if missing:
137     raise AssertionError('These files were expected but not found:\n%s'
138                          % FormatPaths(missing))
139
140
141 def VerifyOnDiskHierarchy(base_path, dir_struct):
142   """Verify that an on-disk directory tree exactly matches a given structure.
143
144   Args:
145     base_path: See CreateOnDiskHierarchy()
146     dir_struct: See CreateOnDiskHierarchy()
147
148   Raises:
149     AssertionError when there is any divergence between the on-disk
150     structure and the structure specified by 'dir_struct'.
151   """
152   expected = _FlattenStructure(base_path, dir_struct)
153   _VerifyDirectoryIterables(osutils.DirectoryIterator(base_path), expected)
154
155
156 def VerifyTarball(tarball, dir_struct):
157   """Compare the contents of a tarball against a directory structure.
158
159   Args:
160     tarball: Path to the tarball.
161     dir_struct: See CreateOnDiskHierarchy()
162
163   Raises:
164     AssertionError when there is any divergence between the tarball and the
165     structure specified by 'dir_struct'.
166   """
167   contents = cros_build_lib.RunCommand(
168       ['tar', '-tf', tarball], capture_output=True).output.splitlines()
169   normalized = set()
170   for p in contents:
171     norm = os.path.normpath(p)
172     if p.endswith('/'):
173       norm += '/'
174     if norm in normalized:
175       raise AssertionError('Duplicate entry %r found in %r!' % (norm, tarball))
176     normalized.add(norm)
177
178   expected = _FlattenStructure('', dir_struct)
179   _VerifyDirectoryIterables(normalized, expected)
180
181
182 class StackedSetup(type):
183   """Metaclass to simplify unit testing and make it more robust.
184
185   A metaclass alters the way that classes are initialized, enabling us to
186   modify the class dictionary prior to the class being created. We use this
187   feature here to modify the way that unit tests work a bit.
188
189   This class does three things:
190     1) When a test case is set up or torn down, we now run all setUp and
191        tearDown methods in the inheritance tree.
192     2) If a setUp or tearDown method fails, we still run tearDown methods
193        for any test classes that were partially or completely set up.
194     3) All test cases time out after TEST_CASE_TIMEOUT seconds.
195
196   To use this class, set the following in your class:
197     __metaclass__ = StackedSetup
198
199   Since cros_test_lib.TestCase uses this metaclass, all derivatives of TestCase
200   also inherit the above behavior (unless they override the __metaclass__
201   attribute manually.)
202   """
203
204   TEST_CASE_TIMEOUT = 10 * 60
205
206   def __new__(mcs, name, bases, scope):
207     """Generate the new class with pointers to original funcs & our helpers"""
208     if 'setUp' in scope:
209       scope['__raw_setUp__'] = scope.pop('setUp')
210     scope['setUp'] = mcs._stacked_setUp
211
212     if 'tearDown' in scope:
213       scope['__raw_tearDown__'] = scope.pop('tearDown')
214     scope['tearDown'] = mcs._stacked_tearDown
215
216     # Modify all test* methods to time out after TEST_CASE_TIMEOUT seconds.
217     timeout = scope.get('TEST_CASE_TIMEOUT', StackedSetup.TEST_CASE_TIMEOUT)
218     if timeout is not None:
219       for name, func in scope.iteritems():
220         if name.startswith('test') and hasattr(func, '__call__'):
221           wrapper = timeout_util.TimeoutDecorator(timeout)
222           scope[name] = wrapper(func)
223
224     return type.__new__(mcs, name, bases, scope)
225
226   @staticmethod
227   def _walk_mro_stacking(obj, attr, reverse=False):
228     """Walk the stacked classes (python method resolution order)"""
229     iterator = iter if reverse else reversed
230     methods = (getattr(x, attr, None) for x in iterator(obj.__class__.__mro__))
231     seen = set()
232     for x in filter(None, methods):
233       x = getattr(x, 'im_func', x)
234       if x not in seen:
235         seen.add(x)
236         yield x
237
238   @staticmethod
239   def _stacked_setUp(obj):
240     """Run all the setUp funcs; if any fail, run all the tearDown funcs"""
241     obj.__test_was_run__ = False
242     try:
243       for target in StackedSetup._walk_mro_stacking(obj, '__raw_setUp__'):
244         target(obj)
245     except:
246       # TestCase doesn't trigger tearDowns if setUp failed; thus
247       # manually force it ourselves to ensure cleanup occurs.
248       StackedSetup._stacked_tearDown(obj)
249       raise
250
251     # Now mark the object as fully setUp; this is done so that
252     # any last minute assertions in tearDown can know if they should
253     # run or not.
254     obj.__test_was_run__ = True
255
256   @staticmethod
257   def _stacked_tearDown(obj):
258     """Run all the tearDown funcs; if any fail, we move on to the next one"""
259     exc_info = None
260     for target in StackedSetup._walk_mro_stacking(obj, '__raw_tearDown__',
261                                                   True):
262       #pylint: disable=W0702
263       try:
264         target(obj)
265       except:
266         # Preserve the exception, throw it after running
267         # all tearDowns; we throw just the first also.  We suppress
268         # pylint's warning here since it can't understand that we're
269         # actually raising the exception, just in a nonstandard way.
270         if exc_info is None:
271           exc_info = sys.exc_info()
272
273     if exc_info:
274       # Chuck the saved exception, w/ the same TB from
275       # when it occurred.
276       raise exc_info[0], exc_info[1], exc_info[2]
277
278
279 class TruthTable(object):
280   """Class to represent a boolean truth table, useful in unit tests.
281
282   If you find yourself testing the behavior of some function that should
283   basically follow the behavior of a particular truth table, then this class
284   can allow you to fully test that function without being overly verbose
285   in the unit test code.
286
287   The following usage is supported on a constructed TruthTable:
288   1) Iterate over input lines of the truth table, expressed as tuples of
289   bools.
290   2) Access a particular input line by index, expressed as a tuple of bools.
291   3) Access the expected output for a set of inputs.
292
293   For example, say function "Foo" in module "mod" should consists of the
294   following code:
295
296   def Foo(A, B, C):
297     return A and B and not C
298
299   In the unittest for Foo, do this:
300
301   def testFoo(self):
302     truth_table = cros_test_lib.TruthTable(inputs=[(True, True, True)])
303     for inputs in truth_table:
304       a, b, c = inputs
305       result = mod.Foo(a, b, c)
306       self.assertEquals(result, truth_table.GetOutput(inputs))
307   """
308
309   class TruthTableInputIterator(object):
310     """Class to support iteration over inputs of a TruthTable."""
311     def __init__(self, truth_table):
312       self.truth_table = truth_table
313       self.next_line = 0
314
315     def __iter__(self):
316       return self
317
318     def __next__(self):
319       return self.next()
320
321     def next(self):
322       if self.next_line < self.truth_table.num_lines:
323         self.next_line += 1
324         return self.truth_table.GetInputs(self.next_line - 1)
325       else:
326         raise StopIteration()
327
328   def __init__(self, inputs, input_result=True):
329     """Construct a TruthTable from given inputs.
330
331     Args:
332       inputs: Iterable of input lines, each expressed as a tuple of bools.
333         Each tuple must have the same length.
334       input_result: The output intended for each specified input.  For
335         truth tables that mostly output True it is more concise to specify
336         the false inputs and then set input_result to False.
337     """
338     # At least one input required.
339     if not inputs:
340       raise ValueError('Inputs required to construct TruthTable.')
341
342     # Save each input tuple in a set.  Also confirm that the length
343     # of each input tuple is the same.
344     self.dimension = len(inputs[0])
345     self.num_lines = pow(2, self.dimension)
346     self.expected_inputs = set()
347     self.expected_inputs_result = input_result
348
349     for input_vals in inputs:
350       if len(input_vals) != self.dimension:
351         raise ValueError('All TruthTable inputs must have same dimension.')
352
353       self.expected_inputs.add(input_vals)
354
355     # Start generator index at 0.
356     self.next_line = 0
357
358   def __len__(self):
359     return self.num_lines
360
361   def __iter__(self):
362     return self.TruthTableInputIterator(self)
363
364   def GetInputs(self, inputs_index):
365     """Get the input line at the given input index.
366
367     Args:
368       inputs_index: Following must hold: 0 <= inputs_index < self.num_lines.
369
370     Returns:
371       Tuple of bools representing one line of inputs.
372     """
373     if inputs_index >= 0 and inputs_index < self.num_lines:
374       line_values = []
375
376       # Iterate through each column in truth table.  Any order will
377       # produce a valid truth table, but going backward through
378       # columns will produce the traditional truth table ordering.
379       # For 2-dimensional example: F,F then F,T then T,F then T,T.
380       for col in xrange(self.dimension - 1, -1, -1):
381         line_values.append(bool(inputs_index / pow(2, col) % 2))
382
383       return tuple(line_values)
384
385     raise ValueError('This truth table has no line at index %r.' % inputs_index)
386
387   def GetOutput(self, inputs):
388     """Get the boolean output for the given inputs.
389
390     Args:
391       inputs: Tuple of bools, length must be equal to self.dimension.
392
393     Returns:
394       bool value representing truth table output for given inputs.
395     """
396     if not isinstance(inputs, tuple):
397       raise TypeError('Truth table inputs must be specified as a tuple.')
398
399     if not len(inputs) == self.dimension:
400       raise ValueError('Truth table inputs must match table dimension.')
401
402     return self.expected_inputs_result == (inputs in self.expected_inputs)
403
404
405 class EasyAttr(dict):
406   """Convenient class for simulating objects with attributes in tests.
407
408   An EasyAttr object can be created with any attributes initialized very
409   easily.  Examples:
410
411   1) An object with .id=45 and .name="Joe":
412   testobj = EasyAttr(id=45, name="Joe")
413   2) An object with .title.text="Big" and .owner.text="Joe":
414   testobj = EasyAttr(title=EasyAttr(text="Big"), owner=EasyAttr(text="Joe"))
415   """
416
417   __slots__ = ()
418
419   def __getattr__(self, attr):
420     try:
421       return self[attr]
422     except KeyError:
423       raise AttributeError(attr)
424
425   def __delattr__(self, attr):
426     try:
427       self.pop(attr)
428     except KeyError:
429       raise AttributeError(attr)
430
431   def __setattr__(self, attr, value):
432     self[attr] = value
433
434   def __dir__(self):
435     return self.keys()
436
437
438 class LogFilter(logging.Filter):
439   """A simple log filter that intercepts log messages and stores them."""
440
441   def __init__(self):
442     logging.Filter.__init__(self)
443     self.messages = cStringIO.StringIO()
444
445   def filter(self, record):
446     self.messages.write(record.getMessage() + '\n')
447     # Return False to prevent the message from being displayed.
448     return False
449
450
451 class LoggingCapturer(object):
452   """Captures all messages emitted by the logging module."""
453
454   def __init__(self, logger_name=''):
455     self._log_filter = LogFilter()
456     self.logger_name = logger_name
457
458   def __enter__(self):
459     self.StartCapturing()
460     return self
461
462   def __exit__(self, exc_type, exc_val, exc_tb):
463     self.StopCapturing()
464
465   def StartCapturing(self):
466     """Begin capturing logging messages."""
467     logging.getLogger(self.logger_name).addFilter(self._log_filter)
468
469
470   def StopCapturing(self):
471     """Stop capturing logging messages."""
472     logging.getLogger(self.logger_name).removeFilter(self._log_filter)
473
474   @property
475   def messages(self):
476     return self._log_filter.messages.getvalue()
477
478   def LogsMatch(self, regex):
479     """Checks whether the logs match a given regex."""
480     match = re.search(regex, self.messages, re.MULTILINE)
481     return match is not None
482
483   def LogsContain(self, msg):
484     """Checks whether the logs contain a given string."""
485     return self.LogsMatch(re.escape(msg))
486
487
488 class OutputCapturer(object):
489   """Class with limited support for capturing test stdout/stderr output.
490
491   Class is designed as a 'ContextManager'.  Example usage in a test method
492   of an object of TestCase:
493
494   with self.OutputCapturer() as output:
495     # Capturing of stdout/stderr automatically starts now.
496     # Do stuff that sends output to stdout/stderr.
497     # Capturing automatically stops at end of 'with' block.
498
499   # stdout/stderr can be retrieved from the OutputCapturer object:
500   stdout = output.GetStdoutLines() # Or other access methods
501
502   # Some Assert methods are only valid if capturing was used in test.
503   self.AssertOutputContainsError() # Or other related methods
504   """
505
506   # These work with error output from operation module.
507   OPER_MSG_SPLIT_RE = re.compile(r'^\033\[1;.*?\033\[0m$|^[^\n]*$',
508                                  re.DOTALL | re.MULTILINE)
509   ERROR_MSG_RE = re.compile(r'^\033\[1;%dm(.+?)(?:\033\[0m)+$' %
510                             (30 + terminal.Color.RED,), re.DOTALL)
511   WARNING_MSG_RE = re.compile(r'^\033\[1;%dm(.+?)(?:\033\[0m)+$' %
512                               (30 + terminal.Color.YELLOW,), re.DOTALL)
513
514   __slots__ = ['_stderr', '_stderr_cap', '_stdout', '_stdout_cap']
515
516   def __init__(self):
517     self._stdout = mock.patch('sys.stdout', new_callable=cStringIO.StringIO)
518     self._stderr = mock.patch('sys.stderr', new_callable=cStringIO.StringIO)
519     self._stderr_cap = self._stdout_cap = None
520
521   def __enter__(self):
522     # This method is called with entering 'with' block.
523     self.StartCapturing()
524     return self
525
526   def __exit__(self, exc_type, exc_val, exc_tb):
527     # This method is called when exiting 'with' block.
528     self.StopCapturing()
529
530     if exc_type:
531       print('Exception during output capturing: %r' % (exc_val,))
532       stdout = self.GetStdout()
533       if stdout:
534         print('Captured stdout was:\n%s' % stdout)
535       else:
536         print('No captured stdout')
537       stderr = self.GetStderr()
538       if stderr:
539         print('Captured stderr was:\n%s' % stderr)
540       else:
541         print('No captured stderr')
542
543   def StartCapturing(self):
544     """Begin capturing stdout and stderr."""
545     self._stdout_cap = self._stdout.start()
546     self._stderr_cap = self._stderr.start()
547
548   def StopCapturing(self):
549     """Stop capturing stdout and stderr."""
550     self._stdout.stop()
551     self._stderr.stop()
552
553   def ClearCaptured(self):
554     """Clear any captured stdout/stderr content."""
555     self._stdout_cap = None
556     self._stderr_cap = None
557
558   def GetStdout(self):
559     """Return captured stdout so far."""
560     return self._stdout_cap.getvalue()
561
562   def GetStderr(self):
563     """Return captured stderr so far."""
564     return self._stderr_cap.getvalue()
565
566   def _GetOutputLines(self, output, include_empties):
567     """Split |output| into lines, optionally |include_empties|.
568
569     Return array of lines.
570     """
571
572     lines = self.OPER_MSG_SPLIT_RE.findall(output)
573     if not include_empties:
574       lines = [ln for ln in lines if ln]
575
576     return lines
577
578   def GetStdoutLines(self, include_empties=True):
579     """Return captured stdout so far as array of lines.
580
581     If |include_empties| is false filter out all empty lines.
582     """
583     return self._GetOutputLines(self.GetStdout(), include_empties)
584
585   def GetStderrLines(self, include_empties=True):
586     """Return captured stderr so far as array of lines.
587
588     If |include_empties| is false filter out all empty lines.
589     """
590     return self._GetOutputLines(self.GetStderr(), include_empties)
591
592
593 class TestCase(unittest.TestCase):
594   """Basic chromite test case.
595
596   Provides sane setUp/tearDown logic so that tearDown is correctly cleaned up.
597
598   Takes care of saving/restoring process-wide settings like the environment so
599   that sub-tests don't have to worry about gettings this right.
600
601   Also includes additional assert helpers beyond python stdlib.
602   """
603
604   __metaclass__ = StackedSetup
605
606   # List of vars chromite is globally sensitive to and that should
607   # be suppressed for tests.
608   ENVIRON_VARIABLE_SUPPRESSIONS = ('CROS_CACHEDIR',)
609
610   def __init__(self, *args, **kwargs):
611     unittest.TestCase.__init__(self, *args, **kwargs)
612     # This is set to keep pylint from complaining.
613     self.__test_was_run__ = False
614
615   def setUp(self):
616     self.__saved_env__ = os.environ.copy()
617     self.__saved_cwd__ = os.getcwd()
618     self.__saved_umask__ = os.umask(0o22)
619     for x in self.ENVIRON_VARIABLE_SUPPRESSIONS:
620       os.environ.pop(x, None)
621
622   def tearDown(self):
623     osutils.SetEnvironment(self.__saved_env__)
624     os.chdir(self.__saved_cwd__)
625     os.umask(self.__saved_umask__)
626
627   def assertRaises2(self, exception, functor, *args, **kwargs):
628     """Like assertRaises, just with checking of the excpetion.
629
630     args:
631       exception: The expected exception type to intecept.
632       functor: The function to invoke.
633       args: Positional args to pass to the function.
634       kwargs: Optional args to pass to the function.  Note we pull
635         exact_kls, msg, and check_attrs from these kwargs.
636       exact_kls: If given, the exception raise must be *exactly* that class
637         type; derivatives are a failure.
638       check_attrs: If given, a mapping of attribute -> value to assert on
639         the resultant exception.  Thus if you wanted to catch a ENOENT, you
640         would do:
641           assertRaises2(EnvironmentError, func, args,
642                         attrs={"errno":errno.ENOENT})
643       msg: The error message to be displayed if the exception isn't raised.
644         If not given, a suitable one is defaulted to.
645       returns: The exception object.
646     """
647     exact_kls = kwargs.pop("exact_kls", None)
648     check_attrs = kwargs.pop("check_attrs", {})
649     msg = kwargs.pop("msg", None)
650     if msg is None:
651       msg = ("%s(*%r, **%r) didn't throw an exception"
652              % (functor.__name__, args, kwargs))
653     try:
654       functor(*args, **kwargs)
655       raise AssertionError(msg)
656     except exception as e:
657       if exact_kls:
658         self.assertEqual(e.__class__, exception)
659       bad = []
660       for attr, required in check_attrs.iteritems():
661         self.assertTrue(hasattr(e, attr),
662                         msg="%s lacks attr %s" % (e, attr))
663         value = getattr(e, attr)
664         if value != required:
665           bad.append("%s attr is %s, needed to be %s"
666                      % (attr, value, required))
667       if bad:
668         raise AssertionError("\n".join(bad))
669       return e
670
671   def assertExists(self, path):
672     """Make sure |path| exists"""
673     if not os.path.exists(path):
674       msg = ['path is missing: %s' % path]
675       while path != '/':
676         path = os.path.dirname(path)
677         if not path:
678           # If we're given something like "foo", abort once we get to "".
679           break
680         result = os.path.exists(path)
681         msg.append('\tos.path.exists(%s): %s' % (path, result))
682         if result:
683           msg.append('\tcontents: %r' % os.listdir(path))
684           break
685       raise self.failureException('\n'.join(msg))
686
687   def assertNotExists(self, path):
688     """Make sure |path| does not exist"""
689     if os.path.exists(path):
690       raise self.failureException('path exists when it should not: %s' % path)
691
692
693 class LoggingTestCase(TestCase):
694   """Base class for logging capturer test cases."""
695
696   def AssertLogsMatch(self, log_capturer, regex, inverted=False):
697     """Verifies a regex matches the logs."""
698     assert_msg = '%r not found in %r' % (regex, log_capturer.messages)
699     assert_fn = self.assertTrue
700     if inverted:
701       assert_msg = '%r found in %r' % (regex, log_capturer.messages)
702       assert_fn = self.assertFalse
703
704     assert_fn(log_capturer.LogsMatch(regex), msg=assert_msg)
705
706   def AssertLogsContain(self, log_capturer, msg, inverted=False):
707     """Verifies a message is contained in the logs."""
708     return self.AssertLogsMatch(log_capturer, re.escape(msg), inverted=inverted)
709
710
711 class OutputTestCase(TestCase):
712   """Base class for cros unit tests with utility methods."""
713
714   def __init__(self, *args, **kwargs):
715     """Base class __init__ takes a second argument."""
716     TestCase.__init__(self, *args, **kwargs)
717     self._output_capturer = None
718
719   def OutputCapturer(self):
720     """Create and return OutputCapturer object."""
721     self._output_capturer = OutputCapturer()
722     return self._output_capturer
723
724   def _GetOutputCapt(self):
725     """Internal access to existing OutputCapturer.
726
727     Raises RuntimeError if output capturing was never on.
728     """
729     if self._output_capturer:
730       return self._output_capturer
731
732     raise RuntimeError('Output capturing was never turned on for this test.')
733
734   def _GenCheckMsgFunc(self, prefix_re, line_re):
735     """Return boolean func to check a line given |prefix_re| and |line_re|."""
736     def _method(line):
737       if prefix_re:
738         # Prefix regexp will strip off prefix (and suffix) from line.
739         match = prefix_re.search(line)
740
741         if match:
742           line = match.group(1)
743         else:
744           return False
745
746       return line_re.search(line) if line_re else True
747
748     if isinstance(prefix_re, str):
749       prefix_re = re.compile(prefix_re)
750     if isinstance(line_re, str):
751       line_re = re.compile(line_re)
752
753     # Provide a description of what this function looks for in a line.  Error
754     # messages can make use of this.
755     _method.description = None
756     if prefix_re and line_re:
757       _method.description = ('line matching prefix regexp %r then regexp %r' %
758                              (prefix_re.pattern, line_re.pattern))
759     elif prefix_re:
760       _method.description = 'line matching prefix regexp %r' % prefix_re.pattern
761     elif line_re:
762       _method.description = 'line matching regexp %r' % line_re.pattern
763     else:
764       raise RuntimeError('Nonsensical usage of _GenCheckMsgFunc: '
765                          'no prefix_re or line_re')
766
767     return _method
768
769   def _ContainsMsgLine(self, lines, msg_check_func):
770     return any(msg_check_func(ln) for ln in lines)
771
772   def _GenOutputDescription(self, check_stdout, check_stderr):
773     # Some extra logic to make an error message useful.
774     if check_stdout and check_stderr:
775       return 'stdout or stderr'
776     elif check_stdout:
777       return 'stdout'
778     elif check_stderr:
779       return 'stderr'
780
781   def _AssertOutputContainsMsg(self, check_msg_func, invert,
782                                check_stdout, check_stderr):
783     assert check_stdout or check_stderr
784
785     lines = []
786     if check_stdout:
787       lines.extend(self._GetOutputCapt().GetStdoutLines())
788     if check_stderr:
789       lines.extend(self._GetOutputCapt().GetStderrLines())
790
791     result = self._ContainsMsgLine(lines, check_msg_func)
792
793     # Some extra logic to make an error message useful.
794     output_desc = self._GenOutputDescription(check_stdout, check_stderr)
795
796     if invert:
797       msg = ('expected %s to not contain %s,\nbut found it in:\n%s' %
798              (output_desc, check_msg_func.description, lines))
799       self.assertFalse(result, msg=msg)
800     else:
801       msg = ('expected %s to contain %s,\nbut did not find it in:\n%s' %
802              (output_desc, check_msg_func.description, lines))
803       self.assertTrue(result, msg=msg)
804
805   def AssertOutputContainsError(self, regexp=None, invert=False,
806                                 check_stdout=True, check_stderr=False):
807     """Assert requested output contains at least one error line.
808
809     If |regexp| is non-null, then the error line must also match it.
810     If |invert| is true, then assert the line is NOT found.
811
812     Raises RuntimeError if output capturing was never one for this test.
813     """
814     check_msg_func = self._GenCheckMsgFunc(OutputCapturer.ERROR_MSG_RE, regexp)
815     return self._AssertOutputContainsMsg(check_msg_func, invert,
816                                          check_stdout, check_stderr)
817
818   def AssertOutputContainsWarning(self, regexp=None, invert=False,
819                                   check_stdout=True, check_stderr=False):
820     """Assert requested output contains at least one warning line.
821
822     If |regexp| is non-null, then the warning line must also match it.
823     If |invert| is true, then assert the line is NOT found.
824
825     Raises RuntimeError if output capturing was never one for this test.
826     """
827     check_msg_func = self._GenCheckMsgFunc(OutputCapturer.WARNING_MSG_RE,
828                                            regexp)
829     return self._AssertOutputContainsMsg(check_msg_func, invert,
830                                          check_stdout, check_stderr)
831
832   def AssertOutputContainsLine(self, regexp, invert=False,
833                                check_stdout=True, check_stderr=False):
834     """Assert requested output contains line matching |regexp|.
835
836     If |invert| is true, then assert the line is NOT found.
837
838     Raises RuntimeError if output capturing was never one for this test.
839     """
840     check_msg_func = self._GenCheckMsgFunc(None, regexp)
841     return self._AssertOutputContainsMsg(check_msg_func, invert,
842                                          check_stdout, check_stderr)
843
844   def _AssertOutputEndsInMsg(self, check_msg_func,
845                              check_stdout, check_stderr):
846     """Pass if requested output(s) ends(end) with an error message."""
847     assert check_stdout or check_stderr
848
849     lines = []
850     if check_stdout:
851       stdout_lines = self._GetOutputCapt().GetStdoutLines(include_empties=False)
852       if stdout_lines:
853         lines.append(stdout_lines[-1])
854     if check_stderr:
855       stderr_lines = self._GetOutputCapt().GetStderrLines(include_empties=False)
856       if stderr_lines:
857         lines.append(stderr_lines[-1])
858
859     result = self._ContainsMsgLine(lines, check_msg_func)
860
861     # Some extra logic to make an error message useful.
862     output_desc = self._GenOutputDescription(check_stdout, check_stderr)
863
864     msg = ('expected %s to end with %s,\nbut did not find it in:\n%s' %
865            (output_desc, check_msg_func.description, lines))
866     self.assertTrue(result, msg=msg)
867
868   def AssertOutputEndsInError(self, regexp=None,
869                               check_stdout=True, check_stderr=False):
870     """Assert requested output ends in error line.
871
872     If |regexp| is non-null, then the error line must also match it.
873
874     Raises RuntimeError if output capturing was never one for this test.
875     """
876     check_msg_func = self._GenCheckMsgFunc(OutputCapturer.ERROR_MSG_RE, regexp)
877     return self._AssertOutputEndsInMsg(check_msg_func,
878                                        check_stdout, check_stderr)
879
880   def AssertOutputEndsInWarning(self, regexp=None,
881                                 check_stdout=True, check_stderr=False):
882     """Assert requested output ends in warning line.
883
884     If |regexp| is non-null, then the warning line must also match it.
885
886     Raises RuntimeError if output capturing was never one for this test.
887     """
888     check_msg_func = self._GenCheckMsgFunc(OutputCapturer.WARNING_MSG_RE,
889                                            regexp)
890     return self._AssertOutputEndsInMsg(check_msg_func,
891                                        check_stdout, check_stderr)
892
893   def AssertOutputEndsInLine(self, regexp,
894                              check_stdout=True, check_stderr=False):
895     """Assert requested output ends in line matching |regexp|.
896
897     Raises RuntimeError if output capturing was never one for this test.
898     """
899     check_msg_func = self._GenCheckMsgFunc(None, regexp)
900     return self._AssertOutputEndsInMsg(check_msg_func,
901                                        check_stdout, check_stderr)
902
903   def FuncCatchSystemExit(self, func, *args, **kwargs):
904     """Run |func| with |args| and |kwargs| and catch exceptions.SystemExit.
905
906     Return tuple (return value or None, SystemExit number code or None).
907     """
908     try:
909       returnval = func(*args, **kwargs)
910
911       return returnval, None
912     except exceptions.SystemExit as ex:
913       exit_code = ex.args[0]
914       return None, exit_code
915
916   def AssertFuncSystemExitZero(self, func, *args, **kwargs):
917     """Run |func| with |args| and |kwargs| catching exceptions.SystemExit.
918
919     If the func does not raise a SystemExit with exit code 0 then assert.
920     """
921     exit_code = self.FuncCatchSystemExit(func, *args, **kwargs)[1]
922     self.assertFalse(exit_code is None,
923                       msg='Expected system exit code 0, but caught none')
924     self.assertTrue(exit_code == 0,
925                     msg='Expected system exit code 0, but caught %d' %
926                     exit_code)
927
928   def AssertFuncSystemExitNonZero(self, func, *args, **kwargs):
929     """Run |func| with |args| and |kwargs| catching exceptions.SystemExit.
930
931     If the func does not raise a non-zero SystemExit code then assert.
932     """
933     exit_code = self.FuncCatchSystemExit(func, *args, **kwargs)[1]
934     self.assertFalse(exit_code is None,
935                       msg='Expected non-zero system exit code, but caught none')
936     self.assertFalse(exit_code == 0,
937                      msg='Expected non-zero system exit code, but caught %d' %
938                      exit_code)
939
940   def AssertRaisesAndReturn(self, error, func, *args, **kwargs):
941     """Like assertRaises, but return exception raised."""
942     try:
943       func(*args, **kwargs)
944       self.assertTrue(False, msg='Expected %s but got none' % error)
945     except error as ex:
946       return ex
947
948
949 class TempDirTestCase(TestCase):
950   """Mixin used to give each test a tempdir that is cleansed upon finish"""
951
952   sudo_cleanup = False
953
954   def __init__(self, *args, **kwargs):
955     TestCase.__init__(self, *args, **kwargs)
956     self.tempdir = None
957     self._tempdir_obj = None
958
959   def setUp(self):
960     self._tempdir_obj = osutils.TempDir(prefix='chromite.test', set_global=True)
961     self.tempdir = self._tempdir_obj.tempdir
962
963   def tearDown(self):
964     if self._tempdir_obj is not None:
965       self._tempdir_obj.Cleanup()
966       self.tempdir = None
967       self._tempdir_obj = None
968
969
970 class GerritTestCase(TempDirTestCase):
971   """Test class for tests that interact with a gerrit server.
972
973   The class setup creates and launches a stand-alone gerrit instance running on
974   localhost, for test methods to interact with.  Class teardown stops and
975   deletes the gerrit instance.
976
977   Note that there is a single gerrit instance for ALL test methods in a
978   GerritTestCase sub-class.
979   """
980
981   TEST_USERNAME = 'test-username'
982   TEST_EMAIL = 'test-username@test.org'
983
984   # To help when debugging test code; setting this to 'False' (which happens if
985   # you provide the '-d' flag at the shell prompt) will leave the test gerrit
986   # instance running on localhost after the script exits.  It is the
987   # responsibility of the user to kill the gerrit process!
988   TEARDOWN = True
989
990   GerritInstance = collections.namedtuple('GerritInstance', [
991       'credential_file',
992       'gerrit_dir',
993       'gerrit_exe',
994       'gerrit_host',
995       'gerrit_pid',
996       'gerrit_url',
997       'git_dir',
998       'git_host',
999       'git_url',
1000       'http_port',
1001       'netrc_file',
1002       'ssh_ident',
1003       'ssh_port',
1004   ])
1005
1006   @classmethod
1007   def _create_gerrit_instance(cls, gerrit_dir):
1008     gerrit_init_script = os.path.join(
1009         osutils.FindDepotTools(), 'testing_support', 'gerrit-init.sh')
1010     http_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
1011     http_sock.bind(('', 0))
1012     http_port = str(http_sock.getsockname()[1])
1013     ssh_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
1014     ssh_sock.bind(('', 0))
1015     ssh_port = str(ssh_sock.getsockname()[1])
1016
1017     # NOTE: this is not completely safe.  These port numbers could be
1018     # re-assigned by the OS between the calls to socket.close() and gerrit
1019     # starting up.  The only safe way to do this would be to pass file
1020     # descriptors down to the gerrit process, which is not even remotely
1021     # supported.  Alas.
1022     http_sock.close()
1023     ssh_sock.close()
1024     cros_build_lib.RunCommand(
1025         ['bash', gerrit_init_script, '--http-port', http_port,
1026          '--ssh-port', ssh_port, gerrit_dir], quiet=True)
1027
1028     gerrit_exe = os.path.join(gerrit_dir, 'bin', 'gerrit.sh')
1029     cros_build_lib.RunCommand(['bash', '-x', gerrit_exe, 'start'], quiet=True)
1030     gerrit_pid = int(osutils.ReadFile(
1031         os.path.join(gerrit_dir, 'logs', 'gerrit.pid')).rstrip())
1032     return cls.GerritInstance(
1033         credential_file=os.path.join(gerrit_dir, 'tmp', '.git-credentials'),
1034         gerrit_dir=gerrit_dir,
1035         gerrit_exe=gerrit_exe,
1036         gerrit_host='localhost:%s' % http_port,
1037         gerrit_pid=gerrit_pid,
1038         gerrit_url='http://localhost:%s' % http_port,
1039         git_dir=os.path.join(gerrit_dir, 'git'),
1040         git_host='%s/git' % gerrit_dir,
1041         git_url='file://%s/git' % gerrit_dir,
1042         http_port=http_port,
1043         netrc_file=os.path.join(gerrit_dir, 'tmp', '.netrc'),
1044         ssh_ident=os.path.join(gerrit_dir, 'tmp', 'id_rsa'),
1045         ssh_port=ssh_port,)
1046
1047   @classmethod
1048   def setUpClass(cls):
1049     """Sets up the gerrit instances in a class-specific temp dir."""
1050     # Create gerrit instance.
1051     cls.gerritdir_obj = osutils.TempDir(set_global=False)
1052     gi = cls.gerrit_instance = cls._create_gerrit_instance(
1053         cls.gerritdir_obj.tempdir)
1054
1055     # Set netrc file for http authentication.
1056     cls.netrc_patcher = mock.patch('chromite.lib.gob_util.NETRC',
1057                                    netrc.netrc(gi.netrc_file))
1058     cls.netrc_patcher.start()
1059
1060     # gob_util only knows about https connections, and that's a good thing.
1061     # But for testing, it's much simpler to use http connections.
1062     cls.httplib_patcher = mock.patch(
1063         'httplib.HTTPSConnection', httplib.HTTPConnection)
1064     cls.httplib_patcher.start()
1065     cls.protocol_patcher = mock.patch(
1066         'chromite.lib.gob_util.GERRIT_PROTOCOL', 'http')
1067     cls.protocol_patcher.start()
1068
1069     # Some of the chromite code requires read access to refs/meta/config.
1070     # Typically, that access should require http authentication.  However,
1071     # because we use plain http to communicate with the test server, libcurl
1072     # (and by extension git commands that use it) will not add the
1073     # Authorization header to git transactions.  So, we just allow anonymous
1074     # read access to refs/meta/config for the test code.
1075     clone_path = os.path.join(gi.gerrit_dir, 'tmp', 'All-Projects')
1076     cls._CloneProject('All-Projects', clone_path)
1077     project_config = os.path.join(clone_path, 'project.config')
1078     cros_build_lib.RunCommand(
1079         ['git', 'config', '--file', project_config, '--add',
1080          'access.refs/meta/config.read', 'group Anonymous Users'], quiet=True)
1081     cros_build_lib.RunCommand(
1082         ['git', 'add', project_config], cwd=clone_path, quiet=True)
1083     cros_build_lib.RunCommand(
1084         ['git', 'commit', '-m', 'Anonyous read for refs/meta/config'],
1085         cwd=clone_path, quiet=True)
1086     cros_build_lib.RunCommand(
1087         ['git', 'push', 'origin', 'HEAD:refs/meta/config'], cwd=clone_path,
1088         quiet=True)
1089
1090     # Make all chromite code point to the test server.
1091     cls.constants_patcher = mock.patch.dict(constants.__dict__, {
1092         'EXTERNAL_GOB_HOST': gi.git_host,
1093         'EXTERNAL_GERRIT_HOST': gi.gerrit_host,
1094         'EXTERNAL_GOB_URL': gi.git_url,
1095         'EXTERNAL_GERRIT_URL': gi.gerrit_url,
1096         'INTERNAL_GOB_HOST': gi.git_host,
1097         'INTERNAL_GERRIT_HOST': gi.gerrit_host,
1098         'INTERNAL_GOB_URL': gi.git_url,
1099         'INTERNAL_GERRIT_URL': gi.gerrit_url,
1100         'MANIFEST_URL': '%s/%s' % (gi.git_url, constants.MANIFEST_PROJECT),
1101         'MANIFEST_INT_URL': '%s/%s' % (
1102             gi.git_url, constants.MANIFEST_INT_PROJECT),
1103         'GIT_REMOTES': {
1104             constants.EXTERNAL_REMOTE: gi.gerrit_url,
1105             constants.INTERNAL_REMOTE: gi.gerrit_url,
1106             constants.CHROMIUM_REMOTE: gi.gerrit_url,
1107             constants.CHROME_REMOTE: gi.gerrit_url,
1108         }
1109     })
1110     cls.constants_patcher.start()
1111
1112   @classmethod
1113   def createProject(cls, name, description='Test project', owners=None,
1114                     submit_type='CHERRY_PICK'):
1115     """Create a project on the test gerrit server."""
1116     if owners is None:
1117       owners = ['Administrators']
1118     body = {
1119         'description': description,
1120         'submit_type': submit_type,
1121         'owners': owners,
1122     }
1123     path = 'projects/%s' % urllib.quote(name, '')
1124     conn = gob_util.CreateHttpConn(
1125         cls.gerrit_instance.gerrit_host, path, reqtype='PUT', body=body)
1126     response = conn.getresponse()
1127     assert response.status == 201
1128     s = cStringIO.StringIO(response.read())
1129     assert s.readline().rstrip() == ")]}'"
1130     jmsg = json.load(s)
1131     assert jmsg['name'] == name
1132
1133   @classmethod
1134   def _CloneProject(cls, name, path):
1135     """Clone a project from the test gerrit server."""
1136     osutils.SafeMakedirs(os.path.dirname(path))
1137     url = 'http://%s/%s' % (cls.gerrit_instance.gerrit_host, name)
1138     cros_build_lib.RunCommand(['git', 'clone', url, path], quiet=True)
1139     # Install commit-msg hook.
1140     hook_path = os.path.join(path, '.git', 'hooks', 'commit-msg')
1141     cros_build_lib.RunCommand(
1142         ['curl', '-o', hook_path,
1143          'http://%s/tools/hooks/commit-msg' % cls.gerrit_instance.gerrit_host],
1144         quiet=True)
1145     os.chmod(hook_path, stat.S_IRWXU)
1146     # Set git identity to test account
1147     cros_build_lib.RunCommand(
1148         ['git', 'config', 'user.email', cls.TEST_EMAIL], cwd=path, quiet=True)
1149     # Configure non-interactive credentials for git operations.
1150     config_path = os.path.join(path, '.git', 'config')
1151     cros_build_lib.RunCommand(
1152         ['git', 'config', '--file', config_path, 'credential.helper',
1153          'store --file=%s' % cls.gerrit_instance.credential_file], quiet=True)
1154     return path
1155
1156   def cloneProject(self, name, path=None):
1157     """Clone a project from the test gerrit server."""
1158     if path is None:
1159       path = os.path.basename(name)
1160       if path.endswith('.git'):
1161         path = path[:-4]
1162     path = os.path.join(self.tempdir, path)
1163     return self._CloneProject(name, path)
1164
1165   @classmethod
1166   def _CreateCommit(cls, clone_path, fn=None, msg=None, text=None):
1167     """Create a commit in the given git checkout."""
1168     if not fn:
1169       fn = 'test-file.txt'
1170     if not msg:
1171       msg = 'Test Message'
1172     if not text:
1173       text = 'Another day, another dollar.'
1174     fpath = os.path.join(clone_path, fn)
1175     osutils.WriteFile(fpath, '%s\n' % text, mode='a')
1176     cros_build_lib.RunCommand(['git', 'add', fn], cwd=clone_path, quiet=True)
1177     cros_build_lib.RunCommand(
1178         ['git', 'commit', '-m', msg], cwd=clone_path, quiet=True)
1179     return cls._GetCommit(clone_path)
1180
1181   def createCommit(self, clone_path, fn=None, msg=None, text=None):
1182     """Create a commit in the given git checkout."""
1183     clone_path = os.path.join(self.tempdir, clone_path)
1184     return self._CreateCommit(clone_path, fn, msg, text)
1185
1186   @staticmethod
1187   def _GetCommit(clone_path, ref='HEAD'):
1188     log_proc = cros_build_lib.RunCommand(
1189         ['git', 'log', '-n', '1', ref], cwd=clone_path,
1190         print_cmd=False, capture_output=True)
1191     sha1 = None
1192     change_id = None
1193     for line in log_proc.output.splitlines():
1194       match = re.match(r'^commit ([0-9a-fA-F]{40})$', line)
1195       if match:
1196         sha1 = match.group(1)
1197         continue
1198       match = re.match('^\s+Change-Id:\s*(\S+)$', line)
1199       if match:
1200         change_id = match.group(1)
1201         continue
1202     return (sha1, change_id)
1203
1204   def getCommit(self, clone_path, ref='HEAD'):
1205     """Get the sha1 and change-id for the head commit in a git checkout."""
1206     clone_path = os.path.join(self.tempdir, clone_path)
1207     (sha1, change_id) = self._GetCommit(clone_path, ref)
1208     self.assertTrue(sha1)
1209     self.assertTrue(change_id)
1210     return (sha1, change_id)
1211
1212   @staticmethod
1213   def _UploadChange(clone_path, branch='master', remote='origin'):
1214     cros_build_lib.RunCommand(
1215         ['git', 'push', remote, 'HEAD:refs/for/%s' % branch], cwd=clone_path,
1216         quiet=True)
1217
1218   def uploadChange(self, clone_path, branch='master', remote='origin'):
1219     """Create a gerrit CL from the HEAD of a git checkout."""
1220     clone_path = os.path.join(self.tempdir, clone_path)
1221     self._UploadChange(clone_path, branch, remote)
1222
1223   @staticmethod
1224   def _PushBranch(clone_path, branch='master'):
1225     cros_build_lib.RunCommand(
1226         ['git', 'push', 'origin', 'HEAD:refs/heads/%s' % branch],
1227         cwd=clone_path, quiet=True)
1228
1229   def pushBranch(self, clone_path, branch='master'):
1230     """Push a branch directly to gerrit, bypassing code review."""
1231     clone_path = os.path.join(self.tempdir, clone_path)
1232     self._PushBranch(clone_path, branch)
1233
1234   @classmethod
1235   def createAccount(cls, name='Test User', email='test-user@test.org',
1236                     password=None, groups=None):
1237     """Create a new user account on gerrit."""
1238     username = urllib.quote(email.partition('@')[0])
1239     path = 'accounts/%s' % username
1240     body = {
1241         'name': name,
1242         'email': email,
1243     }
1244
1245     if password:
1246       body['http_password'] = password
1247     if groups:
1248       if isinstance(groups, basestring):
1249         groups = [groups]
1250       body['groups'] = groups
1251     conn = gob_util.CreateHttpConn(
1252         cls.gerrit_instance.gerrit_host, path, reqtype='PUT', body=body)
1253     response = conn.getresponse()
1254     assert response.status == 201
1255     s = cStringIO.StringIO(response.read())
1256     assert s.readline().rstrip() == ")]}'"
1257     jmsg = json.load(s)
1258     assert jmsg['email'] == email
1259
1260   @staticmethod
1261   def _stop_gerrit(gerrit_obj):
1262     """Stops the running gerrit instance and deletes it."""
1263     try:
1264       # This should terminate the gerrit process.
1265       cros_build_lib.RunCommand(['bash', gerrit_obj.gerrit_exe, 'stop'],
1266                                 quiet=True)
1267     finally:
1268       try:
1269         # cls.gerrit_pid should have already terminated.  If it did, then
1270         # os.waitpid will raise OSError.
1271         os.waitpid(gerrit_obj.gerrit_pid, os.WNOHANG)
1272       except OSError as e:
1273         if e.errno == errno.ECHILD:
1274           # If gerrit shut down cleanly, os.waitpid will land here.
1275           # pylint: disable=W0150
1276           return
1277
1278       # If we get here, the gerrit process is still alive.  Send the process
1279       # SIGKILL for good measure.
1280       try:
1281         os.kill(gerrit_obj.gerrit_pid, signal.SIGKILL)
1282       except OSError:
1283         if e.errno == errno.ESRCH:
1284           # os.kill raised an error because the process doesn't exist.  Maybe
1285           # gerrit shut down cleanly after all.
1286           # pylint: disable=W0150
1287           return
1288
1289       # Expose the fact that gerrit didn't shut down cleanly.
1290       cros_build_lib.Warning(
1291           'Test gerrit server (pid=%d) did not shut down cleanly.' % (
1292               gerrit_obj.gerrit_pid))
1293
1294   @classmethod
1295   def tearDownClass(cls):
1296     cls.httplib_patcher.stop()
1297     cls.protocol_patcher.stop()
1298     cls.constants_patcher.stop()
1299     if cls.TEARDOWN:
1300       cls._stop_gerrit(cls.gerrit_instance)
1301       cls.gerritdir_obj.Cleanup()
1302     else:
1303       # Prevent gerrit dir from getting cleaned up on interpreter exit.
1304       cls.gerritdir_obj.tempdir = None
1305
1306
1307 class GerritInternalTestCase(GerritTestCase):
1308   """Test class which runs separate internal and external gerrit instances."""
1309
1310   @classmethod
1311   def setUpClass(cls):
1312     GerritTestCase.setUpClass()
1313     cls.int_gerritdir_obj = osutils.TempDir(set_global=False)
1314     pgi = cls.gerrit_instance
1315     igi = cls.int_gerrit_instance = cls._create_gerrit_instance(
1316         cls.int_gerritdir_obj.tempdir)
1317     cls.int_constants_patcher = mock.patch.dict(constants.__dict__, {
1318         'INTERNAL_GOB_HOST': igi.git_host,
1319         'INTERNAL_GERRIT_HOST': igi.gerrit_host,
1320         'INTERNAL_GOB_URL': igi.git_url,
1321         'INTERNAL_GERRIT_URL': igi.gerrit_url,
1322         'MANIFEST_INT_URL': '%s/%s' % (
1323             igi.git_url, constants.MANIFEST_INT_PROJECT),
1324         'GIT_REMOTES': {
1325             constants.EXTERNAL_REMOTE: pgi.gerrit_url,
1326             constants.INTERNAL_REMOTE: igi.gerrit_url,
1327             constants.CHROMIUM_REMOTE: pgi.gerrit_url,
1328             constants.CHROME_REMOTE: igi.gerrit_url,
1329         }
1330     })
1331     cls.int_constants_patcher.start()
1332
1333   @classmethod
1334   def tearDownClass(cls):
1335     cls.int_constants_patcher.stop()
1336     if cls.TEARDOWN:
1337       cls._stop_gerrit(cls.int_gerrit_instance)
1338       cls.int_gerritdir_obj.Cleanup()
1339     else:
1340       # Prevent gerrit dir from getting cleaned up on interpreter exit.
1341       cls.int_gerritdir_obj.tempdir = None
1342     GerritTestCase.tearDownClass()
1343
1344
1345 class RepoTestCase(GerritTestCase):
1346   """Test class which runs in a repo checkout."""
1347
1348   MANIFEST_TEMPLATE = """<?xml version="1.0" encoding="UTF-8"?>
1349 <manifest>
1350   <remote name="%(EXTERNAL_REMOTE)s"
1351           fetch="%(PUBLIC_GOB_URL)s"
1352           review="%(PUBLIC_GERRIT_HOST)s" />
1353   <remote name="%(INTERNAL_REMOTE)s"
1354           fetch="%(INTERNAL_GOB_URL)s"
1355           review="%(INTERNAL_GERRIT_HOST)s" />
1356   <default revision="refs/heads/master" remote="%(EXTERNAL_REMOTE)s" sync-j="1" />
1357   <project remote="%(EXTERNAL_REMOTE)s" path="localpath/testproj1" name="remotepath/testproj1" />
1358   <project remote="%(EXTERNAL_REMOTE)s" path="localpath/testproj2" name="remotepath/testproj2" />
1359   <project remote="%(INTERNAL_REMOTE)s" path="localpath/testproj3" name="remotepath/testproj3" />
1360   <project remote="%(INTERNAL_REMOTE)s" path="localpath/testproj4" name="remotepath/testproj4" />
1361 </manifest>
1362 """
1363
1364   @classmethod
1365   def setUpClass(cls):
1366     GerritTestCase.setUpClass()
1367
1368     # Patch in repo url on the gerrit test server.
1369     external_repo_url = constants.REPO_URL
1370     cls.repo_patcher = mock.patch.dict(constants.__dict__, {
1371         'REPO_URL': '%s/%s' % (
1372             cls.gerrit_instance.git_url, constants.REPO_PROJECT),
1373     })
1374     cls.repo_patcher.start()
1375
1376     # Create local mirror of repo tool repository.
1377     mirror_path = '%s.git' % (
1378         os.path.join(cls.gerrit_instance.git_dir, constants.REPO_PROJECT))
1379     cros_build_lib.RunCommand(
1380       ['git', 'clone', '--mirror', external_repo_url, mirror_path],
1381       quiet=True)
1382
1383     # Check out the top-level repo script; it will be used for invocation.
1384     repo_clone_path = os.path.join(
1385         cls.gerrit_instance.gerrit_dir, 'tmp', 'repo')
1386     cros_build_lib.RunCommand(
1387         ['git', 'clone', '-n', constants.REPO_URL, repo_clone_path], quiet=True)
1388     cros_build_lib.RunCommand(
1389         ['git', 'checkout', 'origin/stable', 'repo'], cwd=repo_clone_path,
1390         quiet=True)
1391     osutils.RmDir(os.path.join(repo_clone_path, '.git'))
1392     cls.repo_exe = os.path.join(repo_clone_path, 'repo')
1393
1394     # Create manifest repository.
1395     clone_path = os.path.join(cls.gerrit_instance.gerrit_dir, 'tmp', 'manifest')
1396     osutils.SafeMakedirs(clone_path)
1397     cros_build_lib.RunCommand(['git', 'init'], cwd=clone_path, quiet=True)
1398     manifest_path = os.path.join(clone_path, 'default.xml')
1399     osutils.WriteFile(manifest_path, cls.MANIFEST_TEMPLATE % constants.__dict__)
1400     cros_build_lib.RunCommand(
1401         ['git', 'add', 'default.xml'], cwd=clone_path, quiet=True)
1402     cros_build_lib.RunCommand(
1403         ['git', 'commit', '-m', 'Test manifest.'], cwd=clone_path, quiet=True)
1404     cls.createProject(constants.MANIFEST_PROJECT)
1405     cros_build_lib.RunCommand(
1406         ['git', 'push', constants.MANIFEST_URL, 'HEAD:refs/heads/master'],
1407         quiet=True, cwd=clone_path)
1408     cls.createProject(constants.MANIFEST_INT_PROJECT)
1409     cros_build_lib.RunCommand(
1410         ['git', 'push', constants.MANIFEST_INT_URL, 'HEAD:refs/heads/master'],
1411          cwd=clone_path, quiet=True)
1412
1413     # Create project repositories.
1414     for i in xrange(1, 5):
1415       proj = 'testproj%d' % i
1416       cls.createProject('remotepath/%s' % proj)
1417       clone_path = os.path.join(cls.gerrit_instance.gerrit_dir, 'tmp', proj)
1418       cls._CloneProject('remotepath/%s' % proj, clone_path)
1419       cls._CreateCommit(clone_path)
1420       cls._PushBranch(clone_path, 'master')
1421
1422   @classmethod
1423   def runRepo(cls, *args, **kwargs):
1424     # Unfortunately, munging $HOME appears to be the only way to control the
1425     # netrc file used by repo.
1426     munged_home = os.path.join(cls.gerrit_instance.gerrit_dir, 'tmp')
1427     if 'env' not in kwargs:
1428       env = kwargs['env'] = os.environ.copy()
1429       env['HOME'] = munged_home
1430     else:
1431       env.setdefault('HOME', munged_home)
1432     args[0].insert(0, cls.repo_exe)
1433     kwargs.setdefault('quiet', True)
1434     return cros_build_lib.RunCommand(*args, **kwargs)
1435
1436   @classmethod
1437   def tearDownClass(cls):
1438     cls.repo_patcher.stop()
1439     GerritTestCase.tearDownClass()
1440
1441   def uploadChange(self, clone_path, branch='master', remote='origin'):
1442     review_host = cros_build_lib.RunCommand(
1443         ['git', 'config', 'remote.%s.review' % remote],
1444         print_cmd=False, cwd=clone_path, capture_output=True).output.strip()
1445     assert(review_host)
1446     projectname = cros_build_lib.RunCommand(
1447         ['git', 'config', 'remote.%s.projectname' % remote],
1448         print_cmd=False, cwd=clone_path, capture_output=True).output.strip()
1449     assert(projectname)
1450     GerritTestCase._UploadChange(
1451         clone_path, branch=branch, remote='%s://%s/%s' % (
1452             gob_util.GERRIT_PROTOCOL, review_host, projectname))
1453
1454   def setUp(self):
1455     cros_build_lib.RunCommand(
1456         [self.repo_exe, 'init', '-u', constants.MANIFEST_URL, '--repo-url',
1457          constants.REPO_URL, '--no-repo-verify'], cwd=self.tempdir, quiet=True)
1458     self.repo = repository.RepoRepository(constants.MANIFEST_URL, self.tempdir)
1459     self.repo.Sync()
1460     self.manifest = git.ManifestCheckout(self.tempdir)
1461     for i in xrange(1, 5):
1462       # Configure non-interactive credentials for git operations.
1463       proj = 'testproj%d' % i
1464       config_path = os.path.join(
1465           self.tempdir, 'localpath', proj, '.git', 'config')
1466       cros_build_lib.RunCommand(
1467           ['git', 'config', '--file', config_path, 'credential.helper',
1468            'store --file=%s' % self.gerrit_instance.credential_file],
1469           quiet=True)
1470       cros_build_lib.RunCommand(
1471           ['git', 'config', '--file', config_path, 'review.%s.upload' %
1472            self.gerrit_instance.gerrit_host, 'true'], quiet=True)
1473
1474     # Make all google storage URL's point to the local filesystem.
1475     self.gs_url_patcher = mock.patch(
1476         'lib.chromite.gs.BASE_GS_URL',
1477         'file://%s' % os.path.join(self.tempdir, '.gs'))
1478
1479
1480 class _RunCommandMock(mox.MockObject):
1481   """Custom mock class used to suppress arguments we don't care about"""
1482
1483   DEFAULT_IGNORED_ARGS = ('print_cmd',)
1484
1485   def __call__(self, *args, **kwargs):
1486     for arg in self.DEFAULT_IGNORED_ARGS:
1487       kwargs.setdefault(arg, mox.IgnoreArg())
1488     return mox.MockObject.__call__(self, *args, **kwargs)
1489
1490
1491 class _LessAnnoyingMox(mox.Mox):
1492   """Mox derivative that slips in our suppressions to mox.
1493
1494   This is used by default via MoxTestCase; namely, this suppresses
1495   certain arguments awareness that we don't care about via switching
1496   in (dependent on the namespace requested) overriding MockObject
1497   classing.
1498
1499   Via this, it makes maintenance much simpler- simplest example, if code
1500   doesn't explicitly assert that print_cmd must be true/false... then
1501   we don't care about what argument is set (it has no effect beyond output).
1502   Mox normally *would* care, making it a pita to maintain.  This selectively
1503   suppresses that awareness, making it maintainable.
1504   """
1505
1506   mock_classes = {}.fromkeys(
1507       ['chromite.lib.cros_build_lib.%s' % x
1508        for x in dir(cros_build_lib) if "RunCommand" in x],
1509        _RunCommandMock)
1510
1511   @staticmethod
1512   def _GetNamespace(obj):
1513     return '%s.%s' % (obj.__module__, obj.__name__)
1514
1515   def CreateMock(self, obj, attrs=None):
1516     if attrs is None:
1517       attrs = {}
1518     kls = self.mock_classes.get(
1519         self._GetNamespace(obj), mox.MockObject)
1520     # Copy attrs; I don't trust mox to not be stupid here.
1521     new_mock = kls(obj, attrs=attrs)
1522     self._mock_objects.append(new_mock)
1523     return new_mock
1524
1525
1526 class MoxTestCase(TestCase):
1527   """Mox based test case; compatible with StackedSetup
1528
1529   Note: mox is deprecated; please use MockTestCase instead.
1530   """
1531
1532   mox_suppress_verify_all = False
1533
1534   def setUp(self):
1535     self.mox = _LessAnnoyingMox()
1536     self.stubs = mox.stubout.StubOutForTesting()
1537
1538   def tearDown(self):
1539     try:
1540       if self.__test_was_run__ and not self.mox_suppress_verify_all:
1541         # This means the test code was actually ran.
1542         # force a verifyall
1543         self.mox.VerifyAll()
1544     finally:
1545       if hasattr(self, 'mox'):
1546         self.mox.UnsetStubs()
1547       if hasattr(self, 'stubs'):
1548         self.stubs.UnsetAll()
1549         self.stubs.SmartUnsetAll()
1550
1551
1552 class MoxTempDirTestCase(MoxTestCase, TempDirTestCase):
1553   """Convenience class mixing TempDir and Mox
1554
1555   Note: mox is deprecated; please use MockTempDirTestCase instead.
1556   """
1557
1558
1559 class MoxOutputTestCase(OutputTestCase, MoxTestCase):
1560   """Conevenience class mixing OutputTestCase and MoxTestCase
1561
1562   Note: mox is deprecated; please use MockOutputTestCase instead.
1563   """
1564
1565
1566 class MockTestCase(TestCase):
1567   """Python-mock based test case; compatible with StackedSetup"""
1568   def setUp(self):
1569     self._patchers = []
1570
1571   def tearDown(self):
1572     # We can't just run stopall() by itself, and need to stop our patchers
1573     # manually since stopall() doesn't handle repatching.
1574     cros_build_lib.SafeRun([p.stop for p in reversed(self._patchers)] +
1575                            [mock.patch.stopall])
1576
1577   def StartPatcher(self, patcher):
1578     """Call start() on the patcher, and stop() in tearDown."""
1579     m = patcher.start()
1580     self._patchers.append(patcher)
1581     return m
1582
1583   def PatchObject(self, *args, **kwargs):
1584     """Create and start a mock.patch.object().
1585
1586     stop() will be called automatically during tearDown.
1587     """
1588     return self.StartPatcher(mock.patch.object(*args, **kwargs))
1589
1590
1591 # MockTestCase must be before TempDirTestCase in this inheritance order,
1592 # because MockTestCase.StartPatcher() calls may be for PartialMocks, which
1593 # create their own temporary directory.  The teardown for those directories
1594 # occurs during MockTestCase.tearDown(), which needs to be run before
1595 # TempDirTestCase.tearDown().
1596 class MockTempDirTestCase(MockTestCase, TempDirTestCase):
1597   """Convenience class mixing TempDir and Mock."""
1598
1599
1600 class MockOutputTestCase(MockTestCase, OutputTestCase):
1601   """Convenience class mixing Output and Mock."""
1602
1603
1604 class MockLoggingTestCase(MockTestCase, LoggingTestCase):
1605   """Convenience class mixing Logging and Mock."""
1606
1607
1608 def FindTests(directory, module_namespace=''):
1609   """Find all *_unittest.py, and return their python namespaces.
1610
1611   Args:
1612     directory: The directory to scan for tests.
1613     module_namespace: What namespace to prefix all found tests with.
1614
1615   Returns:
1616     A list of python unittests in python namespace form.
1617   """
1618   results = cros_build_lib.RunCommand(
1619       ['find', '.', '-name', '*_unittest.py', '-printf', '%P\n'],
1620       cwd=directory, print_cmd=False, capture_output=True).output.splitlines()
1621   # Drop the trailing .py, inject in the name if one was given.
1622   if module_namespace:
1623     module_namespace += '.'
1624   return [module_namespace + x[:-3].replace('/', '.') for x in results]
1625
1626
1627 def main(**kwargs):
1628   """Helper wrapper around unittest.main.  Invoke this, not unittest.main.
1629
1630   Any passed in kwargs are passed directly down to unittest.main; via this, you
1631   can inject custom argv for example (to limit what tests run).
1632   """
1633   # Default to exit=True; this matches old behaviour, and allows unittest
1634   # to trigger sys.exit on its own.  Unfortunately, the exit keyword is only
1635   # available in 2.7- as such, handle it ourselves.
1636   allow_exit = kwargs.pop('exit', True)
1637   if '--network' in sys.argv:
1638     sys.argv.remove('--network')
1639     GlobalTestConfig.NETWORK_TESTS_DISABLED = False
1640   level = kwargs.pop('level', logging.CRITICAL)
1641   for flag in ('-d', '--debug'):
1642     if flag in sys.argv:
1643       sys.argv.remove(flag)
1644       level = logging.DEBUG
1645       GerritTestCase.TEARDOWN = False
1646   cros_build_lib.SetupBasicLogging(level)
1647   try:
1648     unittest.main(**kwargs)
1649     raise SystemExit(0)
1650   except SystemExit as e:
1651     if e.__class__ != SystemExit or allow_exit:
1652       raise
1653     # Redo the exit code ourselves- unittest throws True on occasion.
1654     # This is why the lack of typing for SystemExit code attribute makes life
1655     # suck, in parallel to unittest being special.
1656     # Finally, note that it's possible for code to be a string...
1657     if isinstance(e.code, (int, long)):
1658       # This is done since exit code may be something other than 1/0; if they
1659       # explicitly pass it, we'll honor it.
1660       return e.code
1661     return 1 if e.code else 0