1 # Copyright (c) Twisted Matrix Laboratories.
2 # See LICENSE for details.
6 Tests for L{twisted.trial.util}
11 from zope.interface import implements
13 from twisted.internet.interfaces import IProcessTransport
14 from twisted.internet import defer
15 from twisted.internet.base import DelayedCall
17 from twisted.trial.unittest import TestCase
18 from twisted.trial import util
19 from twisted.trial.util import DirtyReactorAggregateError, _Janitor
20 from twisted.trial.test import packages
24 class TestMktemp(TestCase):
27 dirs = os.path.dirname(name).split(os.sep)[:-1]
29 dirs, ['twisted.trial.test.test_util', 'TestMktemp', 'test_name'])
31 def test_unique(self):
33 self.failIfEqual(name, self.mktemp())
35 def test_created(self):
37 dirname = os.path.dirname(name)
38 self.failUnless(os.path.exists(dirname))
39 self.failIf(os.path.exists(name))
41 def test_location(self):
42 path = os.path.abspath(self.mktemp())
43 self.failUnless(path.startswith(os.getcwd()))
46 class TestIntrospection(TestCase):
47 def test_containers(self):
49 parents = util.getPythonContainers(
50 suppression.TestSuppression2.testSuppressModule)
51 expected = [suppression.TestSuppression2, suppression]
52 for a, b in zip(parents, expected):
53 self.assertEqual(a, b)
56 class TestFindObject(packages.SysPathManglingTest):
58 Tests for L{twisted.trial.util.findObject}
61 def test_deprecation(self):
63 Calling L{findObject} results in a deprecation warning
66 warningsShown = self.flushWarnings()
67 self.assertEqual(len(warningsShown), 1)
68 self.assertIdentical(warningsShown[0]['category'], DeprecationWarning)
69 self.assertEqual(warningsShown[0]['message'],
70 "twisted.trial.util.findObject was deprecated "
71 "in Twisted 10.1.0: Please use "
72 "twisted.python.reflect.namedAny instead.")
75 def test_importPackage(self):
76 package1 = util.findObject('package')
77 import package as package2
78 self.assertEqual(package1, (True, package2))
80 def test_importModule(self):
81 test_sample2 = util.findObject('goodpackage.test_sample')
82 from goodpackage import test_sample
83 self.assertEqual((True, test_sample), test_sample2)
85 def test_importError(self):
86 self.failUnlessRaises(ZeroDivisionError,
87 util.findObject, 'package.test_bad_module')
89 def test_sophisticatedImportError(self):
90 self.failUnlessRaises(ImportError,
91 util.findObject, 'package2.test_module')
93 def test_importNonexistentPackage(self):
94 self.assertEqual(util.findObject('doesntexist')[0], False)
96 def test_findNonexistentModule(self):
97 self.assertEqual(util.findObject('package.doesntexist')[0], False)
99 def test_findNonexistentObject(self):
100 self.assertEqual(util.findObject(
101 'goodpackage.test_sample.doesnt')[0], False)
102 self.assertEqual(util.findObject(
103 'goodpackage.test_sample.AlphabetTest.doesntexist')[0], False)
105 def test_findObjectExist(self):
106 alpha1 = util.findObject('goodpackage.test_sample.AlphabetTest')
107 from goodpackage import test_sample
108 self.assertEqual(alpha1, (True, test_sample.AlphabetTest))
112 class TestRunSequentially(TestCase):
114 Sometimes it is useful to be able to run an arbitrary list of callables,
117 When some of those callables can return Deferreds, things become complex.
120 def test_emptyList(self):
122 When asked to run an empty list of callables, runSequentially returns a
123 successful Deferred that fires an empty list.
125 d = util._runSequentially([])
126 d.addCallback(self.assertEqual, [])
130 def test_singleSynchronousSuccess(self):
132 When given a callable that succeeds without returning a Deferred,
133 include the return value in the results list, tagged with a SUCCESS
136 d = util._runSequentially([lambda: None])
137 d.addCallback(self.assertEqual, [(defer.SUCCESS, None)])
141 def test_singleSynchronousFailure(self):
143 When given a callable that raises an exception, include a Failure for
144 that exception in the results list, tagged with a FAILURE flag.
146 d = util._runSequentially([lambda: self.fail('foo')])
148 [(flag, fail)] = results
149 fail.trap(self.failureException)
150 self.assertEqual(fail.getErrorMessage(), 'foo')
151 self.assertEqual(flag, defer.FAILURE)
152 return d.addCallback(check)
155 def test_singleAsynchronousSuccess(self):
157 When given a callable that returns a successful Deferred, include the
158 result of the Deferred in the results list, tagged with a SUCCESS flag.
160 d = util._runSequentially([lambda: defer.succeed(None)])
161 d.addCallback(self.assertEqual, [(defer.SUCCESS, None)])
165 def test_singleAsynchronousFailure(self):
167 When given a callable that returns a failing Deferred, include the
168 failure the results list, tagged with a FAILURE flag.
170 d = util._runSequentially([lambda: defer.fail(ValueError('foo'))])
172 [(flag, fail)] = results
173 fail.trap(ValueError)
174 self.assertEqual(fail.getErrorMessage(), 'foo')
175 self.assertEqual(flag, defer.FAILURE)
176 return d.addCallback(check)
179 def test_callablesCalledInOrder(self):
181 Check that the callables are called in the given order, one after the
193 d = util._runSequentially([lambda: append('foo'),
194 lambda: append('bar')])
196 # runSequentially should wait until the Deferred has fired before
197 # running the second callable.
198 self.assertEqual(log, ['foo'])
199 deferreds[-1].callback(None)
200 self.assertEqual(log, ['foo', 'bar'])
202 # Because returning created Deferreds makes jml happy.
203 deferreds[-1].callback(None)
207 def test_continuesAfterError(self):
209 If one of the callables raises an error, then runSequentially continues
210 to run the remaining callables.
212 d = util._runSequentially([lambda: self.fail('foo'), lambda: 'bar'])
214 [(flag1, fail), (flag2, result)] = results
215 fail.trap(self.failureException)
216 self.assertEqual(flag1, defer.FAILURE)
217 self.assertEqual(fail.getErrorMessage(), 'foo')
218 self.assertEqual(flag2, defer.SUCCESS)
219 self.assertEqual(result, 'bar')
220 return d.addCallback(check)
223 def test_stopOnFirstError(self):
225 If the C{stopOnFirstError} option is passed to C{runSequentially}, then
226 no further callables are called after the first exception is raised.
228 d = util._runSequentially([lambda: self.fail('foo'), lambda: 'bar'],
229 stopOnFirstError=True)
231 [(flag1, fail)] = results
232 fail.trap(self.failureException)
233 self.assertEqual(flag1, defer.FAILURE)
234 self.assertEqual(fail.getErrorMessage(), 'foo')
235 return d.addCallback(check)
238 def test_stripFlags(self):
240 If the C{stripFlags} option is passed to C{runSequentially} then the
241 SUCCESS / FAILURE flags are stripped from the output. Instead, the
242 Deferred fires a flat list of results containing only the results and
245 d = util._runSequentially([lambda: self.fail('foo'), lambda: 'bar'],
248 [fail, result] = results
249 fail.trap(self.failureException)
250 self.assertEqual(fail.getErrorMessage(), 'foo')
251 self.assertEqual(result, 'bar')
252 return d.addCallback(check)
253 test_stripFlags.todo = "YAGNI"
257 class DirtyReactorAggregateErrorTest(TestCase):
259 Tests for the L{DirtyReactorAggregateError}.
262 def test_formatDelayedCall(self):
264 Delayed calls are formatted nicely.
266 error = DirtyReactorAggregateError(["Foo", "bar"])
267 self.assertEqual(str(error),
270 DelayedCalls: (set twisted.internet.base.DelayedCall.debug = True to debug)
275 def test_formatSelectables(self):
277 Selectables are formatted nicely.
279 error = DirtyReactorAggregateError([], ["selectable 1", "selectable 2"])
280 self.assertEqual(str(error),
288 def test_formatDelayedCallsAndSelectables(self):
290 Both delayed calls and selectables can appear in the same error.
292 error = DirtyReactorAggregateError(["bleck", "Boozo"],
294 self.assertEqual(str(error),
297 DelayedCalls: (set twisted.internet.base.DelayedCall.debug = True to debug)
306 class StubReactor(object):
308 A reactor stub which contains enough functionality to be used with the
311 @ivar iterations: A list of the arguments passed to L{iterate}.
312 @ivar removeAllCalled: Number of times that L{removeAll} was called.
313 @ivar selectables: The value that will be returned from L{removeAll}.
314 @ivar delayedCalls: The value to return from L{getDelayedCalls}.
317 def __init__(self, delayedCalls, selectables=None):
319 @param delayedCalls: See L{StubReactor.delayedCalls}.
320 @param selectables: See L{StubReactor.selectables}.
322 self.delayedCalls = delayedCalls
324 self.removeAllCalled = 0
327 self.selectables = selectables
330 def iterate(self, timeout=None):
332 Increment C{self.iterations}.
334 self.iterations.append(timeout)
337 def getDelayedCalls(self):
339 Return C{self.delayedCalls}.
341 return self.delayedCalls
346 Increment C{self.removeAllCalled} and return C{self.selectables}.
348 self.removeAllCalled += 1
349 return self.selectables
353 class StubErrorReporter(object):
355 A subset of L{twisted.trial.itrial.IReporter} which records L{addError}
358 @ivar errors: List of two-tuples of (test, error) which were passed to
366 def addError(self, test, error):
368 Record parameters in C{self.errors}.
370 self.errors.append((test, error))
374 class JanitorTests(TestCase):
376 Tests for L{_Janitor}!
379 def test_cleanPendingSpinsReactor(self):
381 During pending-call cleanup, the reactor will be spun twice with an
382 instant timeout. This is not a requirement, it is only a test for
383 current behavior. Hopefully Trial will eventually not do this kind of
386 reactor = StubReactor([])
387 jan = _Janitor(None, None, reactor=reactor)
389 self.assertEqual(reactor.iterations, [0, 0])
392 def test_cleanPendingCancelsCalls(self):
394 During pending-call cleanup, the janitor cancels pending timed calls.
399 delayedCall = DelayedCall(300, func, (), {},
400 cancelled.append, lambda x: None)
401 reactor = StubReactor([delayedCall])
402 jan = _Janitor(None, None, reactor=reactor)
404 self.assertEqual(cancelled, [delayedCall])
407 def test_cleanPendingReturnsDelayedCallStrings(self):
409 The Janitor produces string representations of delayed calls from the
410 delayed call cleanup method. It gets the string representations
411 *before* cancelling the calls; this is important because cancelling the
412 call removes critical debugging information from the string
415 delayedCall = DelayedCall(300, lambda: None, (), {},
416 lambda x: None, lambda x: None,
418 delayedCallString = str(delayedCall)
419 reactor = StubReactor([delayedCall])
420 jan = _Janitor(None, None, reactor=reactor)
421 strings = jan._cleanPending()
422 self.assertEqual(strings, [delayedCallString])
425 def test_cleanReactorRemovesSelectables(self):
427 The Janitor will remove selectables during reactor cleanup.
429 reactor = StubReactor([])
430 jan = _Janitor(None, None, reactor=reactor)
432 self.assertEqual(reactor.removeAllCalled, 1)
435 def test_cleanReactorKillsProcesses(self):
437 The Janitor will kill processes during reactor cleanup.
439 class StubProcessTransport(object):
441 A stub L{IProcessTransport} provider which records signals.
442 @ivar signals: The signals passed to L{signalProcess}.
444 implements(IProcessTransport)
449 def signalProcess(self, signal):
451 Append C{signal} to C{self.signals}.
453 self.signals.append(signal)
455 pt = StubProcessTransport()
456 reactor = StubReactor([], [pt])
457 jan = _Janitor(None, None, reactor=reactor)
459 self.assertEqual(pt.signals, ["KILL"])
462 def test_cleanReactorReturnsSelectableStrings(self):
464 The Janitor returns string representations of the selectables that it
465 cleaned up from the reactor cleanup method.
467 class Selectable(object):
469 A stub Selectable which only has an interesting string
473 return "(SELECTABLE!)"
475 reactor = StubReactor([], [Selectable()])
476 jan = _Janitor(None, None, reactor=reactor)
477 self.assertEqual(jan._cleanReactor(), ["(SELECTABLE!)"])
480 def test_postCaseCleanupNoErrors(self):
482 The post-case cleanup method will return True and not call C{addError}
483 on the result if there are no pending calls.
485 reactor = StubReactor([])
487 reporter = StubErrorReporter()
488 jan = _Janitor(test, reporter, reactor=reactor)
489 self.assertTrue(jan.postCaseCleanup())
490 self.assertEqual(reporter.errors, [])
493 def test_postCaseCleanupWithErrors(self):
495 The post-case cleanup method will return False and call C{addError} on
496 the result with a L{DirtyReactorAggregateError} Failure if there are
499 delayedCall = DelayedCall(300, lambda: None, (), {},
500 lambda x: None, lambda x: None,
502 delayedCallString = str(delayedCall)
503 reactor = StubReactor([delayedCall], [])
505 reporter = StubErrorReporter()
506 jan = _Janitor(test, reporter, reactor=reactor)
507 self.assertFalse(jan.postCaseCleanup())
508 self.assertEqual(len(reporter.errors), 1)
509 self.assertEqual(reporter.errors[0][1].value.delayedCalls,
513 def test_postClassCleanupNoErrors(self):
515 The post-class cleanup method will not call C{addError} on the result
516 if there are no pending calls or selectables.
518 reactor = StubReactor([])
520 reporter = StubErrorReporter()
521 jan = _Janitor(test, reporter, reactor=reactor)
522 jan.postClassCleanup()
523 self.assertEqual(reporter.errors, [])
526 def test_postClassCleanupWithPendingCallErrors(self):
528 The post-class cleanup method call C{addError} on the result with a
529 L{DirtyReactorAggregateError} Failure if there are pending calls.
531 delayedCall = DelayedCall(300, lambda: None, (), {},
532 lambda x: None, lambda x: None,
534 delayedCallString = str(delayedCall)
535 reactor = StubReactor([delayedCall], [])
537 reporter = StubErrorReporter()
538 jan = _Janitor(test, reporter, reactor=reactor)
539 jan.postClassCleanup()
540 self.assertEqual(len(reporter.errors), 1)
541 self.assertEqual(reporter.errors[0][1].value.delayedCalls,
545 def test_postClassCleanupWithSelectableErrors(self):
547 The post-class cleanup method call C{addError} on the result with a
548 L{DirtyReactorAggregateError} Failure if there are selectables.
550 selectable = "SELECTABLE HERE"
551 reactor = StubReactor([], [selectable])
553 reporter = StubErrorReporter()
554 jan = _Janitor(test, reporter, reactor=reactor)
555 jan.postClassCleanup()
556 self.assertEqual(len(reporter.errors), 1)
557 self.assertEqual(reporter.errors[0][1].value.selectables,