Initial import to Tizen
[profile/ivi/python-twisted.git] / twisted / trial / test / test_reporter.py
1 # Copyright (c) Twisted Matrix Laboratories.
2 # See LICENSE for details.
3 #
4 # Maintainer: Jonathan Lange
5
6 """
7 Tests for L{twisted.trial.reporter}.
8 """
9
10
11 import errno, sys, os, re, StringIO
12 from inspect import getmro
13
14 from twisted.internet.utils import suppressWarnings
15 from twisted.python import log
16 from twisted.python.failure import Failure
17 from twisted.trial import itrial, unittest, runner, reporter, util
18 from twisted.trial.reporter import UncleanWarningsReporterWrapper
19 from twisted.trial.test import erroneous
20 from twisted.trial.unittest import makeTodo, SkipTest, Todo
21 from twisted.trial.test import sample
22
23
24 class BrokenStream(object):
25     """
26     Stream-ish object that raises a signal interrupt error. We use this to make
27     sure that Trial still manages to write what it needs to write.
28     """
29     written = False
30     flushed = False
31
32     def __init__(self, fObj):
33         self.fObj = fObj
34
35     def write(self, s):
36         if self.written:
37             return self.fObj.write(s)
38         self.written = True
39         raise IOError(errno.EINTR, "Interrupted write")
40
41     def flush(self):
42         if self.flushed:
43             return self.fObj.flush()
44         self.flushed = True
45         raise IOError(errno.EINTR, "Interrupted flush")
46
47
48 class StringTest(unittest.TestCase):
49     def stringComparison(self, expect, output):
50         output = filter(None, output)
51         self.failUnless(len(expect) <= len(output),
52                         "Must have more observed than expected"
53                         "lines %d < %d" % (len(output), len(expect)))
54         REGEX_PATTERN_TYPE = type(re.compile(''))
55         for line_number, (exp, out) in enumerate(zip(expect, output)):
56             if exp is None:
57                 continue
58             elif isinstance(exp, str):
59                 self.assertSubstring(exp, out, "Line %d: %r not in %r"
60                                      % (line_number, exp, out))
61             elif isinstance(exp, REGEX_PATTERN_TYPE):
62                 self.failUnless(exp.match(out),
63                                 "Line %d: %r did not match string %r"
64                                 % (line_number, exp.pattern, out))
65             else:
66                 raise TypeError("don't know what to do with object %r"
67                                 % (exp,))
68
69
70 class TestTestResult(unittest.TestCase):
71     def setUp(self):
72         self.result = reporter.TestResult()
73
74     def test_pyunitAddError(self):
75         # pyunit passes an exc_info tuple directly to addError
76         try:
77             raise RuntimeError('foo')
78         except RuntimeError, excValue:
79             self.result.addError(self, sys.exc_info())
80         failure = self.result.errors[0][1]
81         self.assertEqual(excValue, failure.value)
82         self.assertEqual(RuntimeError, failure.type)
83
84     def test_pyunitAddFailure(self):
85         # pyunit passes an exc_info tuple directly to addFailure
86         try:
87             raise self.failureException('foo')
88         except self.failureException, excValue:
89             self.result.addFailure(self, sys.exc_info())
90         failure = self.result.failures[0][1]
91         self.assertEqual(excValue, failure.value)
92         self.assertEqual(self.failureException, failure.type)
93
94
95 class TestReporterRealtime(TestTestResult):
96     def setUp(self):
97         output = StringIO.StringIO()
98         self.result = reporter.Reporter(output, realtime=True)
99
100
101 class TestErrorReporting(StringTest):
102     doubleSeparator = re.compile(r'^=+$')
103
104     def setUp(self):
105         self.loader = runner.TestLoader()
106         self.output = StringIO.StringIO()
107         self.result = reporter.Reporter(self.output)
108
109     def getOutput(self, suite):
110         result = self.getResult(suite)
111         result.done()
112         return self.output.getvalue()
113
114     def getResult(self, suite):
115         suite.run(self.result)
116         return self.result
117
118     def test_formatErroredMethod(self):
119         """
120         A test method which runs and has an error recorded against it is
121         reported in the output stream with the I{ERROR} tag along with a summary
122         of what error was reported and the ID of the test.
123         """
124         suite = self.loader.loadClass(erroneous.TestFailureInSetUp)
125         output = self.getOutput(suite).splitlines()
126         match = [
127             self.doubleSeparator,
128             '[ERROR]',
129             'Traceback (most recent call last):',
130             re.compile(r'^\s+File .*erroneous\.py., line \d+, in setUp$'),
131             re.compile(r'^\s+raise FoolishError, '
132                        r'.I am a broken setUp method.$'),
133             ('twisted.trial.test.erroneous.FoolishError: '
134              'I am a broken setUp method'),
135             'twisted.trial.test.erroneous.TestFailureInSetUp.test_noop']
136         self.stringComparison(match, output)
137
138
139     def test_formatFailedMethod(self):
140         """
141         A test method which runs and has a failure recorded against it is
142         reported in the output stream with the I{FAIL} tag along with a summary
143         of what failure was reported and the ID of the test.
144         """
145         suite = self.loader.loadMethod(erroneous.TestRegularFail.test_fail)
146         output = self.getOutput(suite).splitlines()
147         match = [
148             self.doubleSeparator,
149             '[FAIL]',
150             'Traceback (most recent call last):',
151             re.compile(r'^\s+File .*erroneous\.py., line \d+, in test_fail$'),
152             re.compile(r'^\s+self\.fail\("I fail"\)$'),
153             'twisted.trial.unittest.FailTest: I fail',
154             'twisted.trial.test.erroneous.TestRegularFail.test_fail',
155             ]
156         self.stringComparison(match, output)
157
158
159     def test_doctestError(self):
160         """
161         A problem encountered while running a doctest is reported in the output
162         stream with a I{FAIL} or I{ERROR} tag along with a summary of what
163         problem was encountered and the ID of the test.
164         """
165         from twisted.trial.test import erroneous
166         suite = unittest.decorate(
167             self.loader.loadDoctests(erroneous), itrial.ITestCase)
168         output = self.getOutput(suite)
169         path = 'twisted.trial.test.erroneous.unexpectedException'
170         for substring in ['1/0', 'ZeroDivisionError',
171                           'Exception raised:', path]:
172             self.assertSubstring(substring, output)
173         self.failUnless(re.search('Fail(ed|ure in) example:', output),
174                         "Couldn't match 'Failure in example: ' "
175                         "or 'Failed example: '")
176         expect = [self.doubleSeparator,
177                   re.compile(r'\[(ERROR|FAIL)\]')]
178         self.stringComparison(expect, output.splitlines())
179
180
181     def test_hiddenException(self):
182         """
183         Check that errors in C{DelayedCall}s get reported, even if the
184         test already has a failure.
185
186         Only really necessary for testing the deprecated style of tests that
187         use iterate() directly. See
188         L{erroneous.DelayedCall.testHiddenException} for more details.
189         """
190         test = erroneous.DelayedCall('testHiddenException')
191         output = self.getOutput(test).splitlines()
192         match = [
193             self.doubleSeparator,
194             '[FAIL]',
195             'Traceback (most recent call last):',
196             re.compile(r'^\s+File .*erroneous\.py., line \d+, in '
197                        'testHiddenException$'),
198             re.compile(r'^\s+self\.fail\("Deliberate failure to mask the '
199                        'hidden exception"\)$'),
200             'twisted.trial.unittest.FailTest: '
201             'Deliberate failure to mask the hidden exception',
202             'twisted.trial.test.erroneous.DelayedCall.testHiddenException',
203             self.doubleSeparator,
204             '[ERROR]',
205             'Traceback (most recent call last):',
206             re.compile(r'^\s+File .* in runUntilCurrent'),
207             re.compile(r'^\s+.*'),
208             re.compile('^\s+File .*erroneous\.py", line \d+, in go'),
209             re.compile('^\s+raise RuntimeError\(self.hiddenExceptionMsg\)'),
210             'exceptions.RuntimeError: something blew up',
211             'twisted.trial.test.erroneous.DelayedCall.testHiddenException',
212             ]
213         self.stringComparison(match, output)
214
215
216
217 class TestUncleanWarningWrapperErrorReporting(TestErrorReporting):
218     """
219     Tests that the L{UncleanWarningsReporterWrapper} can sufficiently proxy
220     IReporter failure and error reporting methods to a L{reporter.Reporter}.
221     """
222     def setUp(self):
223         self.loader = runner.TestLoader()
224         self.output = StringIO.StringIO()
225         self.result = UncleanWarningsReporterWrapper(
226             reporter.Reporter(self.output))
227
228
229
230 class TracebackHandling(unittest.TestCase):
231     def getErrorFrames(self, test):
232         stream = StringIO.StringIO()
233         result = reporter.Reporter(stream)
234         test.run(result)
235         bads = result.failures + result.errors
236         assert len(bads) == 1
237         assert bads[0][0] == test
238         return result._trimFrames(bads[0][1].frames)
239
240     def checkFrames(self, observedFrames, expectedFrames):
241         for observed, expected in zip(observedFrames, expectedFrames):
242             self.assertEqual(observed[0], expected[0])
243             observedSegs = os.path.splitext(observed[1])[0].split(os.sep)
244             expectedSegs = expected[1].split('/')
245             self.assertEqual(observedSegs[-len(expectedSegs):],
246                              expectedSegs)
247         self.assertEqual(len(observedFrames), len(expectedFrames))
248
249     def test_basic(self):
250         test = erroneous.TestRegularFail('test_fail')
251         frames = self.getErrorFrames(test)
252         self.checkFrames(frames,
253                          [('test_fail', 'twisted/trial/test/erroneous')])
254
255     def test_subroutine(self):
256         test = erroneous.TestRegularFail('test_subfail')
257         frames = self.getErrorFrames(test)
258         self.checkFrames(frames,
259                          [('test_subfail', 'twisted/trial/test/erroneous'),
260                           ('subroutine', 'twisted/trial/test/erroneous')])
261
262     def test_deferred(self):
263         test = erroneous.TestFailureInDeferredChain('test_fail')
264         frames = self.getErrorFrames(test)
265         self.checkFrames(frames,
266                          [('_later', 'twisted/trial/test/erroneous')])
267
268     def test_noFrames(self):
269         result = reporter.Reporter(None)
270         self.assertEqual([], result._trimFrames([]))
271
272     def test_oneFrame(self):
273         result = reporter.Reporter(None)
274         self.assertEqual(['fake frame'], result._trimFrames(['fake frame']))
275
276
277 class FormatFailures(StringTest):
278     def setUp(self):
279         try:
280             raise RuntimeError('foo')
281         except RuntimeError:
282             self.f = Failure()
283         self.f.frames = [
284             ['foo', 'foo/bar.py', 5, [('x', 5)], [('y', 'orange')]],
285             ['qux', 'foo/bar.py', 10, [('a', 'two')], [('b', 'MCMXCIX')]]
286             ]
287         self.stream = StringIO.StringIO()
288         self.result = reporter.Reporter(self.stream)
289
290     def test_formatDefault(self):
291         tb = self.result._formatFailureTraceback(self.f)
292         self.stringComparison([
293             'Traceback (most recent call last):',
294             '  File "foo/bar.py", line 5, in foo',
295             re.compile(r'^\s*$'),
296             '  File "foo/bar.py", line 10, in qux',
297             re.compile(r'^\s*$'),
298             'RuntimeError: foo'], tb.splitlines())
299
300     def test_formatString(self):
301         tb = '''
302   File "twisted/trial/unittest.py", line 256, in failUnlessSubstring
303     return self.failUnlessIn(substring, astring, msg)
304 exceptions.TypeError: iterable argument required
305
306 '''
307         expected = '''
308   File "twisted/trial/unittest.py", line 256, in failUnlessSubstring
309     return self.failUnlessIn(substring, astring, msg)
310 exceptions.TypeError: iterable argument required
311 '''
312         formatted = self.result._formatFailureTraceback(tb)
313         self.assertEqual(expected, formatted)
314
315     def test_mutation(self):
316         frames = self.f.frames[:]
317         # The call shouldn't mutate the frames.
318         self.result._formatFailureTraceback(self.f)
319         self.assertEqual(self.f.frames, frames)
320
321
322 class PyunitTestNames(unittest.TestCase):
323     def setUp(self):
324         self.stream = StringIO.StringIO()
325         self.test = sample.PyunitTest('test_foo')
326
327     def test_verboseReporter(self):
328         result = reporter.VerboseTextReporter(self.stream)
329         result.startTest(self.test)
330         output = self.stream.getvalue()
331         self.assertEqual(
332             output, 'twisted.trial.test.sample.PyunitTest.test_foo ... ')
333
334     def test_treeReporter(self):
335         result = reporter.TreeReporter(self.stream)
336         result.startTest(self.test)
337         output = self.stream.getvalue()
338         output = output.splitlines()[-1].strip()
339         self.assertEqual(output, result.getDescription(self.test) + ' ...')
340
341     def test_getDescription(self):
342         result = reporter.TreeReporter(self.stream)
343         output = result.getDescription(self.test)
344         self.assertEqual(output, 'test_foo')
345
346
347     def test_minimalReporter(self):
348         """
349         The summary of L{reporter.MinimalReporter} is a simple list of numbers,
350         indicating how many tests ran, how many failed etc.
351
352         The numbers represents:
353          * the run time of the tests
354          * the number of tests run, printed 2 times for legacy reasons
355          * the number of errors
356          * the number of failures
357          * the number of skips
358         """
359         result = reporter.MinimalReporter(self.stream)
360         self.test.run(result)
361         result._printSummary()
362         output = self.stream.getvalue().strip().split(' ')
363         self.assertEqual(output[1:], ['1', '1', '0', '0', '0'])
364
365
366     def test_minimalReporterTime(self):
367         """
368         L{reporter.MinimalReporter} reports the time to run the tests as first
369         data in its output.
370         """
371         times = [1.0, 1.2, 1.5, 1.9]
372         result = reporter.MinimalReporter(self.stream)
373         result._getTime = lambda: times.pop(0)
374         self.test.run(result)
375         result._printSummary()
376         output = self.stream.getvalue().strip().split(' ')
377         timer = output[0]
378         self.assertEqual(timer, "0.7")
379
380
381     def test_emptyMinimalReporter(self):
382         """
383         The summary of L{reporter.MinimalReporter} is a list of zeroes when no
384         test is actually run.
385         """
386         result = reporter.MinimalReporter(self.stream)
387         result._printSummary()
388         output = self.stream.getvalue().strip().split(' ')
389         self.assertEqual(output, ['0', '0', '0', '0', '0', '0'])
390
391
392
393 class TestDirtyReactor(unittest.TestCase):
394     """
395     The trial script has an option to treat L{DirtyReactorAggregateError}s as
396     warnings, as a migration tool for test authors. It causes a wrapper to be
397     placed around reporters that replaces L{DirtyReactorAggregatErrors} with
398     warnings.
399     """
400
401     def setUp(self):
402         self.dirtyError = Failure(
403             util.DirtyReactorAggregateError(['foo'], ['bar']))
404         self.output = StringIO.StringIO()
405         self.test = TestDirtyReactor('test_errorByDefault')
406
407
408     def test_errorByDefault(self):
409         """
410         L{DirtyReactorAggregateError}s are reported as errors with the default
411         Reporter.
412         """
413         result = reporter.Reporter(stream=self.output)
414         result.addError(self.test, self.dirtyError)
415         self.assertEqual(len(result.errors), 1)
416         self.assertEqual(result.errors[0][1], self.dirtyError)
417
418
419     def test_warningsEnabled(self):
420         """
421         L{DirtyReactorAggregateError}s are reported as warnings when using
422         the L{UncleanWarningsReporterWrapper}.
423         """
424         result = UncleanWarningsReporterWrapper(
425             reporter.Reporter(stream=self.output))
426         self.assertWarns(UserWarning, self.dirtyError.getErrorMessage(),
427                          reporter.__file__,
428                          result.addError, self.test, self.dirtyError)
429
430
431     def test_warningsMaskErrors(self):
432         """
433         L{DirtyReactorAggregateError}s are I{not} reported as errors if the
434         L{UncleanWarningsReporterWrapper} is used.
435         """
436         result = UncleanWarningsReporterWrapper(
437             reporter.Reporter(stream=self.output))
438         self.assertWarns(UserWarning, self.dirtyError.getErrorMessage(),
439                          reporter.__file__,
440                          result.addError, self.test, self.dirtyError)
441         self.assertEqual(result._originalReporter.errors, [])
442
443
444     def test_dealsWithThreeTuples(self):
445         """
446         Some annoying stuff can pass three-tuples to addError instead of
447         Failures (like PyUnit). The wrapper, of course, handles this case,
448         since it is a part of L{twisted.trial.itrial.IReporter}! But it does
449         not convert L{DirtyReactorAggregateError} to warnings in this case,
450         because nobody should be passing those in the form of three-tuples.
451         """
452         result = UncleanWarningsReporterWrapper(
453             reporter.Reporter(stream=self.output))
454         result.addError(self.test,
455                         (self.dirtyError.type, self.dirtyError.value, None))
456         self.assertEqual(len(result._originalReporter.errors), 1)
457         self.assertEqual(result._originalReporter.errors[0][1].type,
458                           self.dirtyError.type)
459         self.assertEqual(result._originalReporter.errors[0][1].value,
460                           self.dirtyError.value)
461
462
463
464 class TrialTestNames(unittest.TestCase):
465
466     def setUp(self):
467         self.stream = StringIO.StringIO()
468         self.test = sample.FooTest('test_foo')
469
470     def test_verboseReporter(self):
471         result = reporter.VerboseTextReporter(self.stream)
472         result.startTest(self.test)
473         output = self.stream.getvalue()
474         self.assertEqual(output, self.test.id() + ' ... ')
475
476     def test_treeReporter(self):
477         result = reporter.TreeReporter(self.stream)
478         result.startTest(self.test)
479         output = self.stream.getvalue()
480         output = output.splitlines()[-1].strip()
481         self.assertEqual(output, result.getDescription(self.test) + ' ...')
482
483     def test_treeReporterWithDocstrings(self):
484         """A docstring"""
485         result = reporter.TreeReporter(self.stream)
486         self.assertEqual(result.getDescription(self),
487                          'test_treeReporterWithDocstrings')
488
489     def test_getDescription(self):
490         result = reporter.TreeReporter(self.stream)
491         output = result.getDescription(self.test)
492         self.assertEqual(output, "test_foo")
493
494
495 class TestSkip(unittest.TestCase):
496     """
497     Tests for L{reporter.Reporter}'s handling of skips.
498     """
499     def setUp(self):
500         self.stream = StringIO.StringIO()
501         self.result = reporter.Reporter(self.stream)
502         self.test = sample.FooTest('test_foo')
503
504     def _getSkips(self, result):
505         """
506         Get the number of skips that happened to a reporter.
507         """
508         return len(result.skips)
509
510     def test_accumulation(self):
511         self.result.addSkip(self.test, 'some reason')
512         self.assertEqual(self._getSkips(self.result), 1)
513
514     def test_success(self):
515         self.result.addSkip(self.test, 'some reason')
516         self.assertEqual(True, self.result.wasSuccessful())
517
518
519     def test_summary(self):
520         """
521         The summary of a successful run with skips indicates that the test
522         suite passed and includes the number of skips.
523         """
524         self.result.addSkip(self.test, 'some reason')
525         self.result.done()
526         output = self.stream.getvalue().splitlines()[-1]
527         prefix = 'PASSED '
528         self.failUnless(output.startswith(prefix))
529         self.assertEqual(output[len(prefix):].strip(), '(skips=1)')
530
531
532     def test_basicErrors(self):
533         """
534         The output at the end of a test run with skips includes the reasons
535         for skipping those tests.
536         """
537         self.result.addSkip(self.test, 'some reason')
538         self.result.done()
539         output = self.stream.getvalue().splitlines()[3]
540         self.assertEqual(output.strip(), 'some reason')
541
542
543     def test_booleanSkip(self):
544         """
545         Tests can be skipped without specifying a reason by setting the 'skip'
546         attribute to True. When this happens, the test output includes 'True'
547         as the reason.
548         """
549         self.result.addSkip(self.test, True)
550         self.result.done()
551         output = self.stream.getvalue().splitlines()[3]
552         self.assertEqual(output, 'True')
553
554
555     def test_exceptionSkip(self):
556         """
557         Skips can be raised as errors. When this happens, the error is
558         included in the summary at the end of the test suite.
559         """
560         try:
561             1/0
562         except Exception, e:
563             error = e
564         self.result.addSkip(self.test, error)
565         self.result.done()
566         output = '\n'.join(self.stream.getvalue().splitlines()[3:5]).strip()
567         self.assertEqual(output, str(e))
568
569
570 class UncleanWarningSkipTest(TestSkip):
571     """
572     Tests for skips on a L{reporter.Reporter} wrapped by an
573     L{UncleanWarningsReporterWrapper}.
574     """
575     def setUp(self):
576         TestSkip.setUp(self)
577         self.result = UncleanWarningsReporterWrapper(self.result)
578
579     def _getSkips(self, result):
580         """
581         Get the number of skips that happened to a reporter inside of an
582         unclean warnings reporter wrapper.
583         """
584         return len(result._originalReporter.skips)
585
586
587
588 class TodoTest(unittest.TestCase):
589     """
590     Tests for L{reporter.Reporter}'s handling of todos.
591     """
592
593     def setUp(self):
594         self.stream = StringIO.StringIO()
595         self.result = reporter.Reporter(self.stream)
596         self.test = sample.FooTest('test_foo')
597
598
599     def _getTodos(self, result):
600         """
601         Get the number of todos that happened to a reporter.
602         """
603         return len(result.expectedFailures)
604
605
606     def _getUnexpectedSuccesses(self, result):
607         """
608         Get the number of unexpected successes that happened to a reporter.
609         """
610         return len(result.unexpectedSuccesses)
611
612
613     def test_accumulation(self):
614         """
615         L{reporter.Reporter} accumulates the expected failures that it
616         is notified of.
617         """
618         self.result.addExpectedFailure(self.test, Failure(Exception()),
619                                        makeTodo('todo!'))
620         self.assertEqual(self._getTodos(self.result), 1)
621
622
623     def test_success(self):
624         """
625         A test run is still successful even if there are expected failures.
626         """
627         self.result.addExpectedFailure(self.test, Failure(Exception()),
628                                        makeTodo('todo!'))
629         self.assertEqual(True, self.result.wasSuccessful())
630
631
632     def test_unexpectedSuccess(self):
633         """
634         A test which is marked as todo but succeeds will have an unexpected
635         success reported to its result. A test run is still successful even
636         when this happens.
637         """
638         self.result.addUnexpectedSuccess(self.test, makeTodo("Heya!"))
639         self.assertEqual(True, self.result.wasSuccessful())
640         self.assertEqual(self._getUnexpectedSuccesses(self.result), 1)
641
642
643     def test_summary(self):
644         """
645         The reporter's C{printSummary} method should print the number of
646         expected failures that occured.
647         """
648         self.result.addExpectedFailure(self.test, Failure(Exception()),
649                                        makeTodo('some reason'))
650         self.result.done()
651         output = self.stream.getvalue().splitlines()[-1]
652         prefix = 'PASSED '
653         self.failUnless(output.startswith(prefix))
654         self.assertEqual(output[len(prefix):].strip(),
655                          '(expectedFailures=1)')
656
657
658     def test_basicErrors(self):
659         """
660         The reporter's L{printErrors} method should include the value of the
661         Todo.
662         """
663         self.result.addExpectedFailure(self.test, Failure(Exception()),
664                                        makeTodo('some reason'))
665         self.result.done()
666         output = self.stream.getvalue().splitlines()[3].strip()
667         self.assertEqual(output, "Reason: 'some reason'")
668
669
670     def test_booleanTodo(self):
671         """
672         Booleans CAN'T be used as the value of a todo. Maybe this sucks. This
673         is a test for current behavior, not a requirement.
674         """
675         self.result.addExpectedFailure(self.test, Failure(Exception()),
676                                        makeTodo(True))
677         self.assertRaises(Exception, self.result.done)
678
679
680     def test_exceptionTodo(self):
681         """
682         The exception for expected failures should be shown in the
683         C{printErrors} output.
684         """
685         try:
686             1/0
687         except Exception, e:
688             error = e
689         self.result.addExpectedFailure(self.test, Failure(error),
690                                        makeTodo("todo!"))
691         self.result.done()
692         output = '\n'.join(self.stream.getvalue().splitlines()[3:]).strip()
693         self.assertTrue(str(e) in output)
694
695
696
697 class UncleanWarningTodoTest(TodoTest):
698     """
699     Tests for L{UncleanWarningsReporterWrapper}'s handling of todos.
700     """
701
702     def setUp(self):
703         TodoTest.setUp(self)
704         self.result = UncleanWarningsReporterWrapper(self.result)
705
706
707     def _getTodos(self, result):
708         """
709         Get the number of todos that happened to a reporter inside of an
710         unclean warnings reporter wrapper.
711         """
712         return len(result._originalReporter.expectedFailures)
713
714
715     def _getUnexpectedSuccesses(self, result):
716         """
717         Get the number of unexpected successes that happened to a reporter
718         inside of an unclean warnings reporter wrapper.
719         """
720         return len(result._originalReporter.unexpectedSuccesses)
721
722
723
724 class MockColorizer:
725     """
726     Used by TestTreeReporter to make sure that output is colored correctly.
727     """
728
729     def __init__(self, stream):
730         self.log = []
731
732
733     def write(self, text, color):
734         self.log.append((color, text))
735
736
737
738 class TestTreeReporter(unittest.TestCase):
739     def setUp(self):
740         self.test = sample.FooTest('test_foo')
741         self.stream = StringIO.StringIO()
742         self.result = reporter.TreeReporter(self.stream)
743         self.result._colorizer = MockColorizer(self.stream)
744         self.log = self.result._colorizer.log
745
746     def makeError(self):
747         try:
748             1/0
749         except ZeroDivisionError:
750             f = Failure()
751         return f
752
753     def test_cleanupError(self):
754         """
755         Run cleanupErrors and check that the output is correct, and colored
756         correctly.
757         """
758         f = self.makeError()
759         self.result.cleanupErrors(f)
760         color, text = self.log[0]
761         self.assertEqual(color.strip(), self.result.ERROR)
762         self.assertEqual(text.strip(), 'cleanup errors')
763         color, text = self.log[1]
764         self.assertEqual(color.strip(), self.result.ERROR)
765         self.assertEqual(text.strip(), '[ERROR]')
766     test_cleanupError = suppressWarnings(
767         test_cleanupError,
768         util.suppress(category=reporter.BrokenTestCaseWarning),
769         util.suppress(category=DeprecationWarning))
770
771
772     def test_upDownError(self):
773         """
774         Run upDownError and check that the output is correct and colored
775         correctly.
776         """
777         self.result.upDownError("method", None, None, False)
778         color, text = self.log[0]
779         self.assertEqual(color.strip(), self.result.ERROR)
780         self.assertEqual(text.strip(), 'method')
781     test_upDownError = suppressWarnings(
782         test_upDownError,
783         util.suppress(category=DeprecationWarning,
784                       message="upDownError is deprecated in Twisted 8.0."))
785
786
787     def test_summaryColoredSuccess(self):
788         """
789         The summary in case of success should have a good count of successes
790         and be colored properly.
791         """
792         self.result.addSuccess(self.test)
793         self.result.done()
794         self.assertEqual(self.log[1], (self.result.SUCCESS, 'PASSED'))
795         self.assertEqual(
796             self.stream.getvalue().splitlines()[-1].strip(), "(successes=1)")
797
798
799     def test_summaryColoredFailure(self):
800         """
801         The summary in case of failure should have a good count of errors
802         and be colored properly.
803         """
804         try:
805             raise RuntimeError('foo')
806         except RuntimeError:
807             self.result.addError(self, sys.exc_info())
808         self.result.done()
809         self.assertEqual(self.log[1], (self.result.FAILURE, 'FAILED'))
810         self.assertEqual(
811             self.stream.getvalue().splitlines()[-1].strip(), "(errors=1)")
812
813
814     def test_getPrelude(self):
815         """
816         The tree needs to get the segments of the test ID that correspond
817         to the module and class that it belongs to.
818         """
819         self.assertEqual(
820             ['foo.bar', 'baz'],
821             self.result._getPreludeSegments('foo.bar.baz.qux'))
822         self.assertEqual(
823             ['foo', 'bar'],
824             self.result._getPreludeSegments('foo.bar.baz'))
825         self.assertEqual(
826             ['foo'],
827             self.result._getPreludeSegments('foo.bar'))
828         self.assertEqual([], self.result._getPreludeSegments('foo'))
829
830
831     def test_groupResults(self):
832         """
833         If two different tests have the same error, L{Reporter._groupResults}
834         includes them together in one of the tuples in the list it returns.
835         """
836         try:
837             raise RuntimeError('foo')
838         except RuntimeError:
839             self.result.addError(self, sys.exc_info())
840             self.result.addError(self.test, sys.exc_info())
841         try:
842             raise RuntimeError('bar')
843         except RuntimeError:
844             extra = sample.FooTest('test_bar')
845             self.result.addError(extra, sys.exc_info())
846         self.result.done()
847         grouped = self.result._groupResults(
848             self.result.errors, self.result._formatFailureTraceback)
849         self.assertEqual(grouped[0][1], [self, self.test])
850         self.assertEqual(grouped[1][1], [extra])
851
852
853     def test_printResults(self):
854         """
855         L{Reporter._printResults} uses the results list and formatter callable
856         passed to it to produce groups of results to write to its output stream.
857         """
858         def formatter(n):
859             return str(n) + '\n'
860         first = sample.FooTest('test_foo')
861         second = sample.FooTest('test_bar')
862         third = sample.PyunitTest('test_foo')
863         self.result._printResults(
864             'FOO', [(first, 1), (second, 1), (third, 2)], formatter)
865         self.assertEqual(
866             self.stream.getvalue(),
867             "%(double separator)s\n"
868             "FOO\n"
869             "1\n"
870             "\n"
871             "%(first)s\n"
872             "%(second)s\n"
873             "%(double separator)s\n"
874             "FOO\n"
875             "2\n"
876             "\n"
877             "%(third)s\n" % {
878                 'double separator': self.result._doubleSeparator,
879                 'first': first.id(),
880                 'second': second.id(),
881                 'third': third.id(),
882                 })
883
884
885
886 class TestReporterInterface(unittest.TestCase):
887     """
888     Tests for the bare interface of a trial reporter.
889
890     Subclass this test case and provide a different 'resultFactory' to test
891     that a particular reporter implementation will work with the rest of
892     Trial.
893
894     @cvar resultFactory: A callable that returns a reporter to be tested. The
895         callable must take the same parameters as L{reporter.Reporter}.
896     """
897
898     resultFactory = reporter.Reporter
899
900     def setUp(self):
901         self.test = sample.FooTest('test_foo')
902         self.stream = StringIO.StringIO()
903         self.publisher = log.LogPublisher()
904         self.result = self.resultFactory(self.stream, publisher=self.publisher)
905
906
907     def test_shouldStopInitiallyFalse(self):
908         """
909         shouldStop is False to begin with.
910         """
911         self.assertEqual(False, self.result.shouldStop)
912
913
914     def test_shouldStopTrueAfterStop(self):
915         """
916         shouldStop becomes True soon as someone calls stop().
917         """
918         self.result.stop()
919         self.assertEqual(True, self.result.shouldStop)
920
921
922     def test_wasSuccessfulInitiallyTrue(self):
923         """
924         wasSuccessful() is True when there have been no results reported.
925         """
926         self.assertEqual(True, self.result.wasSuccessful())
927
928
929     def test_wasSuccessfulTrueAfterSuccesses(self):
930         """
931         wasSuccessful() is True when there have been only successes, False
932         otherwise.
933         """
934         self.result.addSuccess(self.test)
935         self.assertEqual(True, self.result.wasSuccessful())
936
937
938     def test_wasSuccessfulFalseAfterErrors(self):
939         """
940         wasSuccessful() becomes False after errors have been reported.
941         """
942         try:
943             1 / 0
944         except ZeroDivisionError:
945             self.result.addError(self.test, sys.exc_info())
946         self.assertEqual(False, self.result.wasSuccessful())
947
948
949     def test_wasSuccessfulFalseAfterFailures(self):
950         """
951         wasSuccessful() becomes False after failures have been reported.
952         """
953         try:
954             self.fail("foo")
955         except self.failureException:
956             self.result.addFailure(self.test, sys.exc_info())
957         self.assertEqual(False, self.result.wasSuccessful())
958
959
960
961 class TestReporter(TestReporterInterface):
962     """
963     Tests for the base L{reporter.Reporter} class.
964     """
965
966     def setUp(self):
967         TestReporterInterface.setUp(self)
968         self._timer = 0
969         self.result._getTime = self._getTime
970
971
972     def _getTime(self):
973         self._timer += 1
974         return self._timer
975
976
977     def test_startStop(self):
978         self.result.startTest(self.test)
979         self.result.stopTest(self.test)
980         self.assertTrue(self.result._lastTime > 0)
981         self.assertEqual(self.result.testsRun, 1)
982         self.assertEqual(self.result.wasSuccessful(), True)
983
984
985     def test_brokenStream(self):
986         """
987         Test that the reporter safely writes to its stream.
988         """
989         result = self.resultFactory(stream=BrokenStream(self.stream))
990         result._writeln("Hello")
991         self.assertEqual(self.stream.getvalue(), 'Hello\n')
992         self.stream.truncate(0)
993         result._writeln("Hello %s!", 'World')
994         self.assertEqual(self.stream.getvalue(), 'Hello World!\n')
995
996
997     def test_printErrorsDeprecated(self):
998         """
999         L{IReporter.printErrors} was deprecated in Twisted 8.0.
1000         """
1001         def f():
1002             self.result.printErrors()
1003         self.assertWarns(
1004             DeprecationWarning, "printErrors is deprecated in Twisted 8.0.",
1005             __file__, f)
1006
1007
1008     def test_printSummaryDeprecated(self):
1009         """
1010         L{IReporter.printSummary} was deprecated in Twisted 8.0.
1011         """
1012         def f():
1013             self.result.printSummary()
1014         self.assertWarns(
1015             DeprecationWarning, "printSummary is deprecated in Twisted 8.0.",
1016             __file__, f)
1017
1018
1019     def test_writeDeprecated(self):
1020         """
1021         L{IReporter.write} was deprecated in Twisted 8.0.
1022         """
1023         def f():
1024             self.result.write("")
1025         self.assertWarns(
1026             DeprecationWarning, "write is deprecated in Twisted 8.0.",
1027             __file__, f)
1028
1029
1030     def test_writelnDeprecated(self):
1031         """
1032         L{IReporter.writeln} was deprecated in Twisted 8.0.
1033         """
1034         def f():
1035             self.result.writeln("")
1036         self.assertWarns(
1037             DeprecationWarning, "writeln is deprecated in Twisted 8.0.",
1038             __file__, f)
1039
1040
1041     def test_separatorDeprecated(self):
1042         """
1043         L{IReporter.separator} was deprecated in Twisted 8.0.
1044         """
1045         def f():
1046             return self.result.separator
1047         self.assertWarns(
1048             DeprecationWarning, "separator is deprecated in Twisted 8.0.",
1049             __file__, f)
1050
1051
1052     def test_streamDeprecated(self):
1053         """
1054         L{IReporter.stream} was deprecated in Twisted 8.0.
1055         """
1056         def f():
1057             return self.result.stream
1058         self.assertWarns(
1059             DeprecationWarning, "stream is deprecated in Twisted 8.0.",
1060             __file__, f)
1061
1062
1063     def test_upDownErrorDeprecated(self):
1064         """
1065         L{IReporter.upDownError} was deprecated in Twisted 8.0.
1066         """
1067         def f():
1068             self.result.upDownError(None, None, None, None)
1069         self.assertWarns(
1070             DeprecationWarning, "upDownError is deprecated in Twisted 8.0.",
1071             __file__, f)
1072
1073
1074     def test_warning(self):
1075         """
1076         L{reporter.Reporter} observes warnings emitted by the Twisted log
1077         system and writes them to its output stream.
1078         """
1079         message = RuntimeWarning("some warning text")
1080         category = 'exceptions.RuntimeWarning'
1081         filename = "path/to/some/file.py"
1082         lineno = 71
1083         self.publisher.msg(
1084             warning=message, category=category,
1085             filename=filename, lineno=lineno)
1086         self.assertEqual(
1087             self.stream.getvalue(),
1088             "%s:%d: %s: %s\n" % (
1089                 filename, lineno, category.split('.')[-1], message))
1090
1091
1092     def test_duplicateWarningSuppressed(self):
1093         """
1094         A warning emitted twice within a single test is only written to the
1095         stream once.
1096         """
1097         # Emit the warning and assert that it shows up
1098         self.test_warning()
1099         # Emit the warning again and assert that the stream still only has one
1100         # warning on it.
1101         self.test_warning()
1102
1103
1104     def test_warningEmittedForNewTest(self):
1105         """
1106         A warning emitted again after a new test has started is written to the
1107         stream again.
1108         """
1109         test = self.__class__('test_warningEmittedForNewTest')
1110         self.result.startTest(test)
1111
1112         # Clear whatever startTest wrote to the stream
1113         self.stream.seek(0)
1114         self.stream.truncate()
1115
1116         # Emit a warning (and incidentally, assert that it was emitted)
1117         self.test_warning()
1118
1119         # Clean up from the first warning to simplify the rest of the
1120         # assertions.
1121         self.stream.seek(0)
1122         self.stream.truncate()
1123
1124         # Stop the first test and start another one (it just happens to be the
1125         # same one, but that doesn't matter)
1126         self.result.stopTest(test)
1127         self.result.startTest(test)
1128
1129         # Clean up the stopTest/startTest output
1130         self.stream.seek(0)
1131         self.stream.truncate()
1132
1133         # Emit the warning again and make sure it shows up
1134         self.test_warning()
1135
1136
1137     def test_stopObserving(self):
1138         """
1139         L{reporter.Reporter} stops observing log events when its C{done} method
1140         is called.
1141         """
1142         self.result.done()
1143         self.stream.seek(0)
1144         self.stream.truncate()
1145         self.publisher.msg(
1146             warning=RuntimeWarning("some message"),
1147             category='exceptions.RuntimeWarning',
1148             filename="file/name.py", lineno=17)
1149         self.assertEqual(self.stream.getvalue(), "")
1150
1151
1152
1153 class TestSafeStream(unittest.TestCase):
1154     def test_safe(self):
1155         """
1156         Test that L{reporter.SafeStream} successfully write to its original
1157         stream even if an interrupt happens during the write.
1158         """
1159         stream = StringIO.StringIO()
1160         broken = BrokenStream(stream)
1161         safe = reporter.SafeStream(broken)
1162         safe.write("Hello")
1163         self.assertEqual(stream.getvalue(), "Hello")
1164
1165
1166
1167 class TestSubunitReporter(TestReporterInterface):
1168     """
1169     Tests for the subunit reporter.
1170
1171     This just tests that the subunit reporter implements the basic interface.
1172     """
1173
1174     resultFactory = reporter.SubunitReporter
1175
1176
1177     def setUp(self):
1178         if reporter.TestProtocolClient is None:
1179             raise SkipTest(
1180                 "Subunit not installed, cannot test SubunitReporter")
1181         TestReporterInterface.setUp(self)
1182
1183
1184     def assertForwardsToSubunit(self, methodName, *args, **kwargs):
1185         """
1186         Assert that 'methodName' on L{SubunitReporter} forwards to the
1187         equivalent method on subunit.
1188
1189         Checks that the return value from subunit is returned from the
1190         L{SubunitReporter} and that the reporter writes the same data to its
1191         stream as subunit does to its own.
1192
1193         Assumes that the method on subunit has the same name as the method on
1194         L{SubunitReporter}.
1195         """
1196         stream = StringIO.StringIO()
1197         subunitClient = reporter.TestProtocolClient(stream)
1198         subunitReturn = getattr(subunitClient, methodName)(*args, **kwargs)
1199         subunitOutput = stream.getvalue()
1200         reporterReturn = getattr(self.result, methodName)(*args, **kwargs)
1201         self.assertEqual(subunitReturn, reporterReturn)
1202         self.assertEqual(subunitOutput, self.stream.getvalue())
1203
1204
1205     def removeMethod(self, klass, methodName):
1206         """
1207         Remove 'methodName' from 'klass'.
1208
1209         If 'klass' does not have a method named 'methodName', then
1210         'removeMethod' succeeds silently.
1211
1212         If 'klass' does have a method named 'methodName', then it is removed
1213         using delattr. Also, methods of the same name are removed from all
1214         base classes of 'klass', thus removing the method entirely.
1215
1216         @param klass: The class to remove the method from.
1217         @param methodName: The name of the method to remove.
1218         """
1219         method = getattr(klass, methodName, None)
1220         if method is None:
1221             return
1222         for base in getmro(klass):
1223             try:
1224                 delattr(base, methodName)
1225             except (AttributeError, TypeError):
1226                 break
1227             else:
1228                 self.addCleanup(setattr, base, methodName, method)
1229
1230
1231     def test_subunitWithoutAddExpectedFailureInstalled(self):
1232         """
1233         Some versions of subunit don't have "addExpectedFailure". For these
1234         versions, we report expected failures as successes.
1235         """
1236         self.removeMethod(reporter.TestProtocolClient, 'addExpectedFailure')
1237         try:
1238             1 / 0
1239         except ZeroDivisionError:
1240             self.result.addExpectedFailure(self.test, sys.exc_info(), "todo")
1241         expectedFailureOutput = self.stream.getvalue()
1242         self.stream.truncate(0)
1243         self.result.addSuccess(self.test)
1244         successOutput = self.stream.getvalue()
1245         self.assertEqual(successOutput, expectedFailureOutput)
1246
1247
1248     def test_subunitWithoutAddSkipInstalled(self):
1249         """
1250         Some versions of subunit don't have "addSkip". For these versions, we
1251         report skips as successes.
1252         """
1253         self.removeMethod(reporter.TestProtocolClient, 'addSkip')
1254         self.result.addSkip(self.test, "reason")
1255         skipOutput = self.stream.getvalue()
1256         self.stream.truncate(0)
1257         self.result.addSuccess(self.test)
1258         successOutput = self.stream.getvalue()
1259         self.assertEqual(successOutput, skipOutput)
1260
1261
1262     def test_addExpectedFailurePassedThrough(self):
1263         """
1264         Some versions of subunit have "addExpectedFailure". For these
1265         versions, when we call 'addExpectedFailure' on the test result, we
1266         pass the error and test through to the subunit client.
1267         """
1268         addExpectedFailureCalls = []
1269         def addExpectedFailure(test, error):
1270             addExpectedFailureCalls.append((test, error))
1271
1272         # Provide our own addExpectedFailure, whether or not the locally
1273         # installed subunit has addExpectedFailure.
1274         self.result._subunit.addExpectedFailure = addExpectedFailure
1275         try:
1276             1 / 0
1277         except ZeroDivisionError:
1278             exc_info = sys.exc_info()
1279             self.result.addExpectedFailure(self.test, exc_info, 'todo')
1280         self.assertEqual(addExpectedFailureCalls, [(self.test, exc_info)])
1281
1282
1283     def test_addSkipSendsSubunitAddSkip(self):
1284         """
1285         Some versions of subunit have "addSkip". For these versions, when we
1286         call 'addSkip' on the test result, we pass the test and reason through
1287         to the subunit client.
1288         """
1289         addSkipCalls = []
1290         def addSkip(test, reason):
1291             addSkipCalls.append((test, reason))
1292
1293         # Provide our own addSkip, whether or not the locally-installed
1294         # subunit has addSkip.
1295         self.result._subunit.addSkip = addSkip
1296         self.result.addSkip(self.test, 'reason')
1297         self.assertEqual(addSkipCalls, [(self.test, 'reason')])
1298
1299
1300     def test_doneDoesNothing(self):
1301         """
1302         The subunit reporter doesn't need to print out a summary -- the stream
1303         of results is everything. Thus, done() does nothing.
1304         """
1305         self.result.done()
1306         self.assertEqual('', self.stream.getvalue())
1307
1308
1309     def test_startTestSendsSubunitStartTest(self):
1310         """
1311         SubunitReporter.startTest() sends the subunit 'startTest' message.
1312         """
1313         self.assertForwardsToSubunit('startTest', self.test)
1314
1315
1316     def test_stopTestSendsSubunitStopTest(self):
1317         """
1318         SubunitReporter.stopTest() sends the subunit 'stopTest' message.
1319         """
1320         self.assertForwardsToSubunit('stopTest', self.test)
1321
1322
1323     def test_addSuccessSendsSubunitAddSuccess(self):
1324         """
1325         SubunitReporter.addSuccess() sends the subunit 'addSuccess' message.
1326         """
1327         self.assertForwardsToSubunit('addSuccess', self.test)
1328
1329
1330     def test_addErrorSendsSubunitAddError(self):
1331         """
1332         SubunitReporter.addError() sends the subunit 'addError' message.
1333         """
1334         try:
1335             1 / 0
1336         except ZeroDivisionError:
1337             error = sys.exc_info()
1338         self.assertForwardsToSubunit('addError', self.test, error)
1339
1340
1341     def test_addFailureSendsSubunitAddFailure(self):
1342         """
1343         SubunitReporter.addFailure() sends the subunit 'addFailure' message.
1344         """
1345         try:
1346             self.fail('hello')
1347         except self.failureException:
1348             failure = sys.exc_info()
1349         self.assertForwardsToSubunit('addFailure', self.test, failure)
1350
1351
1352     def test_addUnexpectedSuccessSendsSubunitAddSuccess(self):
1353         """
1354         SubunitReporter.addFailure() sends the subunit 'addSuccess' message,
1355         since subunit doesn't model unexpected success.
1356         """
1357         stream = StringIO.StringIO()
1358         subunitClient = reporter.TestProtocolClient(stream)
1359         subunitClient.addSuccess(self.test)
1360         subunitOutput = stream.getvalue()
1361         self.result.addUnexpectedSuccess(self.test, 'todo')
1362         self.assertEqual(subunitOutput, self.stream.getvalue())
1363
1364
1365     def test_loadTimeErrors(self):
1366         """
1367         Load-time errors are reported like normal errors.
1368         """
1369         test = runner.TestLoader().loadByName('doesntexist')
1370         test.run(self.result)
1371         output = self.stream.getvalue()
1372         # Just check that 'doesntexist' is in the output, rather than
1373         # assembling the expected stack trace.
1374         self.assertIn('doesntexist', output)
1375
1376
1377
1378 class TestSubunitReporterNotInstalled(unittest.TestCase):
1379     """
1380     Test behaviour when the subunit reporter is not installed.
1381     """
1382
1383     def test_subunitNotInstalled(self):
1384         """
1385         If subunit is not installed, TestProtocolClient will be None, and
1386         SubunitReporter will raise an error when you try to construct it.
1387         """
1388         stream = StringIO.StringIO()
1389         self.patch(reporter, 'TestProtocolClient', None)
1390         e = self.assertRaises(Exception, reporter.SubunitReporter, stream)
1391         self.assertEqual("Subunit not available", str(e))
1392
1393
1394
1395 class TestTimingReporter(TestReporter):
1396     resultFactory = reporter.TimingTextReporter
1397
1398
1399
1400 class LoggingReporter(reporter.Reporter):
1401     """
1402     Simple reporter that stores the last test that was passed to it.
1403     """
1404
1405     def __init__(self, *args, **kwargs):
1406         reporter.Reporter.__init__(self, *args, **kwargs)
1407         self.test = None
1408
1409     def addError(self, test, error):
1410         self.test = test
1411
1412     def addExpectedFailure(self, test, failure, todo):
1413         self.test = test
1414
1415     def addFailure(self, test, failure):
1416         self.test = test
1417
1418     def addSkip(self, test, skip):
1419         self.test = test
1420
1421     def addUnexpectedSuccess(self, test, todo):
1422         self.test = test
1423
1424     def startTest(self, test):
1425         self.test = test
1426
1427     def stopTest(self, test):
1428         self.test = test
1429
1430
1431
1432 class TestAdaptedReporter(unittest.TestCase):
1433     """
1434     L{reporter._AdaptedReporter} is a reporter wrapper that wraps all of the
1435     tests it receives before passing them on to the original reporter.
1436     """
1437
1438     def setUp(self):
1439         self.wrappedResult = self.getWrappedResult()
1440
1441
1442     def _testAdapter(self, test):
1443         return test.id()
1444
1445
1446     def assertWrapped(self, wrappedResult, test):
1447         self.assertEqual(wrappedResult._originalReporter.test, self._testAdapter(test))
1448
1449
1450     def getFailure(self, exceptionInstance):
1451         """
1452         Return a L{Failure} from raising the given exception.
1453
1454         @param exceptionInstance: The exception to raise.
1455         @return: L{Failure}
1456         """
1457         try:
1458             raise exceptionInstance
1459         except:
1460             return Failure()
1461
1462
1463     def getWrappedResult(self):
1464         result = LoggingReporter()
1465         return reporter._AdaptedReporter(result, self._testAdapter)
1466
1467
1468     def test_addError(self):
1469         """
1470         C{addError} wraps its test with the provided adapter.
1471         """
1472         self.wrappedResult.addError(self, self.getFailure(RuntimeError()))
1473         self.assertWrapped(self.wrappedResult, self)
1474
1475
1476     def test_addFailure(self):
1477         """
1478         C{addFailure} wraps its test with the provided adapter.
1479         """
1480         self.wrappedResult.addFailure(self, self.getFailure(AssertionError()))
1481         self.assertWrapped(self.wrappedResult, self)
1482
1483
1484     def test_addSkip(self):
1485         """
1486         C{addSkip} wraps its test with the provided adapter.
1487         """
1488         self.wrappedResult.addSkip(self, self.getFailure(SkipTest('no reason')))
1489         self.assertWrapped(self.wrappedResult, self)
1490
1491
1492     def test_startTest(self):
1493         """
1494         C{startTest} wraps its test with the provided adapter.
1495         """
1496         self.wrappedResult.startTest(self)
1497         self.assertWrapped(self.wrappedResult, self)
1498
1499
1500     def test_stopTest(self):
1501         """
1502         C{stopTest} wraps its test with the provided adapter.
1503         """
1504         self.wrappedResult.stopTest(self)
1505         self.assertWrapped(self.wrappedResult, self)
1506
1507
1508     def test_addExpectedFailure(self):
1509         """
1510         C{addExpectedFailure} wraps its test with the provided adapter.
1511         """
1512         self.wrappedResult.addExpectedFailure(
1513             self, self.getFailure(RuntimeError()), Todo("no reason"))
1514         self.assertWrapped(self.wrappedResult, self)
1515
1516
1517     def test_addUnexpectedSuccess(self):
1518         """
1519         C{addUnexpectedSuccess} wraps its test with the provided adapter.
1520         """
1521         self.wrappedResult.addUnexpectedSuccess(self, Todo("no reason"))
1522         self.assertWrapped(self.wrappedResult, self)
1523
1524
1525
1526 class FakeStream(object):
1527     """
1528     A fake stream which C{isatty} method returns some predictable.
1529
1530     @ivar tty: returned value of C{isatty}.
1531     @type tty: C{bool}
1532     """
1533
1534     def __init__(self, tty=True):
1535         self.tty = tty
1536
1537
1538     def isatty(self):
1539         return self.tty
1540
1541
1542
1543 class AnsiColorizerTests(unittest.TestCase):
1544     """
1545     Tests for L{reporter._AnsiColorizer}.
1546     """
1547
1548     def setUp(self):
1549         self.savedModules = sys.modules.copy()
1550
1551
1552     def tearDown(self):
1553         sys.modules.clear()
1554         sys.modules.update(self.savedModules)
1555
1556
1557     def test_supportedStdOutTTY(self):
1558         """
1559         L{reporter._AnsiColorizer.supported} returns C{False} if the given
1560         stream is not a TTY.
1561         """
1562         self.assertFalse(reporter._AnsiColorizer.supported(FakeStream(False)))
1563
1564
1565     def test_supportedNoCurses(self):
1566         """
1567         L{reporter._AnsiColorizer.supported} returns C{False} if the curses
1568         module can't be imported.
1569         """
1570         sys.modules['curses'] = None
1571         self.assertFalse(reporter._AnsiColorizer.supported(FakeStream()))
1572
1573
1574     def test_supportedSetupTerm(self):
1575         """
1576         L{reporter._AnsiColorizer.supported} returns C{True} if
1577         C{curses.tigetnum} returns more than 2 supported colors. It only tries
1578         to call C{curses.setupterm} if C{curses.tigetnum} previously failed
1579         with a C{curses.error}.
1580         """
1581         class fakecurses(object):
1582             error = RuntimeError
1583             setUp = 0
1584
1585             def setupterm(self):
1586                 self.setUp += 1
1587
1588             def tigetnum(self, value):
1589                 if self.setUp:
1590                     return 3
1591                 else:
1592                     raise self.error()
1593
1594         sys.modules['curses'] = fakecurses()
1595         self.assertTrue(reporter._AnsiColorizer.supported(FakeStream()))
1596         self.assertTrue(reporter._AnsiColorizer.supported(FakeStream()))
1597
1598         self.assertEqual(sys.modules['curses'].setUp, 1)
1599
1600
1601     def test_supportedTigetNumWrongError(self):
1602         """
1603         L{reporter._AnsiColorizer.supported} returns C{False} and doesn't try
1604         to call C{curses.setupterm} if C{curses.tigetnum} returns something
1605         different than C{curses.error}.
1606         """
1607         class fakecurses(object):
1608             error = RuntimeError
1609
1610             def tigetnum(self, value):
1611                 raise ValueError()
1612
1613         sys.modules['curses'] = fakecurses()
1614         self.assertFalse(reporter._AnsiColorizer.supported(FakeStream()))
1615
1616
1617     def test_supportedTigetNumNotEnoughColor(self):
1618         """
1619         L{reporter._AnsiColorizer.supported} returns C{False} if
1620         C{curses.tigetnum} returns less than 2 supported colors.
1621         """
1622         class fakecurses(object):
1623             error = RuntimeError
1624
1625             def tigetnum(self, value):
1626                 return 1
1627
1628         sys.modules['curses'] = fakecurses()
1629         self.assertFalse(reporter._AnsiColorizer.supported(FakeStream()))
1630
1631
1632     def test_supportedTigetNumErrors(self):
1633         """
1634         L{reporter._AnsiColorizer.supported} returns C{False} if
1635         C{curses.tigetnum} raises an error, and calls C{curses.setupterm} once.
1636         """
1637         class fakecurses(object):
1638             error = RuntimeError
1639             setUp = 0
1640
1641             def setupterm(self):
1642                 self.setUp += 1
1643
1644             def tigetnum(self, value):
1645                 raise self.error()
1646
1647         sys.modules['curses'] = fakecurses()
1648         self.assertFalse(reporter._AnsiColorizer.supported(FakeStream()))
1649         self.assertEqual(sys.modules['curses'].setUp, 1)