Initial import to Tizen
[profile/ivi/python-twisted.git] / twisted / trial / test / test_util.py
1 # Copyright (c) Twisted Matrix Laboratories.
2 # See LICENSE for details.
3 #
4
5 """
6 Tests for L{twisted.trial.util}
7 """
8
9 import os
10
11 from zope.interface import implements
12
13 from twisted.internet.interfaces import IProcessTransport
14 from twisted.internet import defer
15 from twisted.internet.base import DelayedCall
16
17 from twisted.trial.unittest import TestCase
18 from twisted.trial import util
19 from twisted.trial.util import DirtyReactorAggregateError, _Janitor
20 from twisted.trial.test import packages
21
22
23
class TestMktemp(TestCase):
    """
    Tests for the C{mktemp} method that trial test cases provide for
    obtaining temporary paths.
    """

    def test_name(self):
        """
        The directories above the temporary path (ignoring the innermost one)
        are named after the test module, the test class and the test method.
        """
        name = self.mktemp()
        # Drop the innermost directory; only the three outer ones are checked.
        dirs = os.path.dirname(name).split(os.sep)[:-1]
        self.assertEqual(
            dirs, ['twisted.trial.test.test_util', 'TestMktemp', 'test_name'])

    def test_unique(self):
        """
        Two consecutive calls to C{mktemp} return different paths.
        """
        name = self.mktemp()
        self.assertNotEqual(name, self.mktemp())

    def test_created(self):
        """
        The directory containing the temporary path exists, but the path
        itself has not been created.
        """
        name = self.mktemp()
        dirname = os.path.dirname(name)
        self.assertTrue(os.path.exists(dirname))
        self.assertFalse(os.path.exists(name))

    def test_location(self):
        """
        The temporary path lives underneath the current working directory.
        """
        path = os.path.abspath(self.mktemp())
        self.assertTrue(path.startswith(os.getcwd()))
44
45
class TestIntrospection(TestCase):
    """
    Tests for L{util.getPythonContainers}.
    """

    def test_containers(self):
        """
        The first containers reported for a test method are the class that
        defines it, followed by the module that holds that class.
        """
        # Absolute import: the implicit relative form (C{import suppression})
        # only works on Python 2 and may resolve to an unrelated top-level
        # module of the same name.
        from twisted.trial.test import suppression
        parents = util.getPythonContainers(
            suppression.TestSuppression2.testSuppressModule)
        expected = [suppression.TestSuppression2, suppression]
        # Only the leading entries are compared; zip() stops at the shorter
        # of the two lists.
        for a, b in zip(parents, expected):
            self.assertEqual(a, b)
54
55
class TestFindObject(packages.SysPathManglingTest):
    """
    Tests for L{twisted.trial.util.findObject}
    """

    def test_deprecation(self):
        """
        Calling L{findObject} results in a deprecation warning
        """
        util.findObject('')
        warningsShown = self.flushWarnings()
        self.assertEqual(len(warningsShown), 1)
        self.assertIdentical(warningsShown[0]['category'], DeprecationWarning)
        self.assertEqual(warningsShown[0]['message'],
                          "twisted.trial.util.findObject was deprecated "
                          "in Twisted 10.1.0: Please use "
                          "twisted.python.reflect.namedAny instead.")


    def test_importPackage(self):
        """
        Looking up a package by name yields C{(True, package)}.
        """
        package1 = util.findObject('package')
        import package as package2
        self.assertEqual(package1, (True, package2))

    def test_importModule(self):
        """
        Looking up a module by its dotted name yields C{(True, module)}.
        """
        test_sample2 = util.findObject('goodpackage.test_sample')
        from goodpackage import test_sample
        self.assertEqual((True, test_sample), test_sample2)

    def test_importError(self):
        """
        A L{ZeroDivisionError} raised while looking up
        C{package.test_bad_module} propagates to the caller (presumably it
        comes from that module's own top-level code).
        """
        self.assertRaises(ZeroDivisionError,
                          util.findObject, 'package.test_bad_module')

    def test_sophisticatedImportError(self):
        """
        An L{ImportError} raised while looking up C{package2.test_module}
        propagates to the caller instead of being reported as "not found".
        """
        self.assertRaises(ImportError,
                          util.findObject, 'package2.test_module')

    def test_importNonexistentPackage(self):
        """
        The first element of the result is C{False} for an unknown package.
        """
        self.assertEqual(util.findObject('doesntexist')[0], False)

    def test_findNonexistentModule(self):
        """
        The first element of the result is C{False} for an unknown module
        inside an existing package.
        """
        self.assertEqual(util.findObject('package.doesntexist')[0], False)

    def test_findNonexistentObject(self):
        """
        The first element of the result is C{False} for an unknown attribute
        of an existing module or of an existing class.
        """
        self.assertEqual(util.findObject(
            'goodpackage.test_sample.doesnt')[0], False)
        self.assertEqual(util.findObject(
            'goodpackage.test_sample.AlphabetTest.doesntexist')[0], False)

    def test_findObjectExist(self):
        """
        Looking up an existing class by its dotted name yields
        C{(True, class)}.
        """
        alpha1 = util.findObject('goodpackage.test_sample.AlphabetTest')
        from goodpackage import test_sample
        self.assertEqual(alpha1, (True, test_sample.AlphabetTest))
109
110
111
class TestRunSequentially(TestCase):
    """
    Tests for L{util._runSequentially}, which runs a list of callables one
    after the other.

    The interesting cases are those where some of the callables return
    Deferreds.
    """

    def test_emptyList(self):
        """
        Running an empty list of callables gives a Deferred that fires with
        an empty list.
        """
        return util._runSequentially([]).addCallback(self.assertEqual, [])


    def test_singleSynchronousSuccess(self):
        """
        A callable that returns a plain value contributes that value to the
        results list, paired with the SUCCESS flag.
        """
        wanted = [(defer.SUCCESS, None)]
        return util._runSequentially([lambda: None]).addCallback(
            self.assertEqual, wanted)


    def test_singleSynchronousFailure(self):
        """
        A callable that raises contributes a Failure wrapping the exception
        to the results list, paired with the FAILURE flag.
        """
        def verify(outcomes):
            ((status, failure),) = outcomes
            failure.trap(self.failureException)
            self.assertEqual(failure.getErrorMessage(), 'foo')
            self.assertEqual(status, defer.FAILURE)

        running = util._runSequentially([lambda: self.fail('foo')])
        running.addCallback(verify)
        return running


    def test_singleAsynchronousSuccess(self):
        """
        A callable that returns an already-successful Deferred contributes
        that Deferred's result to the results list, paired with the SUCCESS
        flag.
        """
        wanted = [(defer.SUCCESS, None)]
        return util._runSequentially([lambda: defer.succeed(None)]).addCallback(
            self.assertEqual, wanted)


    def test_singleAsynchronousFailure(self):
        """
        A callable that returns a failed Deferred contributes the failure to
        the results list, paired with the FAILURE flag.
        """
        def verify(outcomes):
            ((status, failure),) = outcomes
            failure.trap(ValueError)
            self.assertEqual(failure.getErrorMessage(), 'foo')
            self.assertEqual(status, defer.FAILURE)

        running = util._runSequentially(
            [lambda: defer.fail(ValueError('foo'))])
        running.addCallback(verify)
        return running


    def test_callablesCalledInOrder(self):
        """
        The callables run in the order given, and the next one does not start
        until the Deferred returned by the previous one has fired.
        """
        calls = []
        pending = []

        def record(value):
            waiting = defer.Deferred()
            calls.append(value)
            pending.append(waiting)
            return waiting

        steps = [lambda: record('foo'), lambda: record('bar')]
        finished = util._runSequentially(steps)

        # Only the first callable may have run while its Deferred is still
        # unfired.
        self.assertEqual(calls, ['foo'])
        pending[-1].callback(None)
        self.assertEqual(calls, ['foo', 'bar'])

        # Fire the second Deferred as well so the returned Deferred can
        # complete.
        pending[-1].callback(None)
        return finished


    def test_continuesAfterError(self):
        """
        A callable that raises does not prevent the callables after it from
        being run.
        """
        def verify(outcomes):
            ((firstFlag, failure), (secondFlag, value)) = outcomes
            failure.trap(self.failureException)
            self.assertEqual(firstFlag, defer.FAILURE)
            self.assertEqual(failure.getErrorMessage(), 'foo')
            self.assertEqual(secondFlag, defer.SUCCESS)
            self.assertEqual(value, 'bar')

        steps = [lambda: self.fail('foo'), lambda: 'bar']
        running = util._runSequentially(steps)
        running.addCallback(verify)
        return running


    def test_stopOnFirstError(self):
        """
        With C{stopOnFirstError} set, nothing after the first callable that
        raises is run, so the results hold only that one failure.
        """
        def verify(outcomes):
            ((firstFlag, failure),) = outcomes
            failure.trap(self.failureException)
            self.assertEqual(firstFlag, defer.FAILURE)
            self.assertEqual(failure.getErrorMessage(), 'foo')

        steps = [lambda: self.fail('foo'), lambda: 'bar']
        running = util._runSequentially(steps, stopOnFirstError=True)
        running.addCallback(verify)
        return running


    def test_stripFlags(self):
        """
        With C{stripFlags} set, the SUCCESS / FAILURE flags are left out and
        the Deferred fires with a flat list holding just the results and
        failures.
        """
        def verify(outcomes):
            (failure, value) = outcomes
            failure.trap(self.failureException)
            self.assertEqual(failure.getErrorMessage(), 'foo')
            self.assertEqual(value, 'bar')

        steps = [lambda: self.fail('foo'), lambda: 'bar']
        running = util._runSequentially(steps, stripFlags=True)
        running.addCallback(verify)
        return running
    test_stripFlags.todo = "YAGNI"
254
255
256
class DirtyReactorAggregateErrorTest(TestCase):
    """
    Tests for the string form of L{DirtyReactorAggregateError}.
    """

    def test_formatDelayedCall(self):
        """
        An error holding only delayed calls lists each of them under a
        C{DelayedCalls} heading.
        """
        expected = """\
Reactor was unclean.
DelayedCalls: (set twisted.internet.base.DelayedCall.debug = True to debug)
Foo
bar"""
        error = DirtyReactorAggregateError(["Foo", "bar"])
        self.assertEqual(str(error), expected)


    def test_formatSelectables(self):
        """
        An error holding only selectables lists each of them under a
        C{Selectables} heading.
        """
        expected = """\
Reactor was unclean.
Selectables:
selectable 1
selectable 2"""
        error = DirtyReactorAggregateError([], ["selectable 1", "selectable 2"])
        self.assertEqual(str(error), expected)


    def test_formatDelayedCallsAndSelectables(self):
        """
        An error holding both kinds lists the delayed calls first and the
        selectables after them.
        """
        expected = """\
Reactor was unclean.
DelayedCalls: (set twisted.internet.base.DelayedCall.debug = True to debug)
bleck
Boozo
Selectables:
Sel1
Sel2"""
        error = DirtyReactorAggregateError(["bleck", "Boozo"],
                                           ["Sel1", "Sel2"])
        self.assertEqual(str(error), expected)
303
304
305
class StubReactor(object):
    """
    A minimal reactor stand-in offering just the methods that L{_Janitor}
    needs, and recording how they were used.

    @ivar iterations: The C{timeout} argument of every L{iterate} call, in
        call order.
    @ivar removeAllCalled: How many times L{removeAll} has been called.
    @ivar selectables: What L{removeAll} returns.
    @ivar delayedCalls: What L{getDelayedCalls} returns.
    """

    def __init__(self, delayedCalls, selectables=None):
        """
        @param delayedCalls: See L{StubReactor.delayedCalls}.
        @param selectables: See L{StubReactor.selectables}.  Any false value
            is replaced by a new empty list.
        """
        self.iterations = []
        self.removeAllCalled = 0
        self.delayedCalls = delayedCalls
        self.selectables = selectables or []


    def iterate(self, timeout=None):
        """
        Record C{timeout} at the end of C{self.iterations}.
        """
        self.iterations.append(timeout)


    def getDelayedCalls(self):
        """
        Hand back C{self.delayedCalls} unchanged.
        """
        return self.delayedCalls


    def removeAll(self):
        """
        Count this call in C{self.removeAllCalled}, then hand back
        C{self.selectables}.
        """
        self.removeAllCalled = self.removeAllCalled + 1
        return self.selectables
350
351
352
class StubErrorReporter(object):
    """
    A stand-in for the C{addError} part of
    L{twisted.trial.itrial.IReporter} that simply remembers every call.

    @ivar errors: The recorded L{addError} calls, each stored as a
        C{(test, error)} two-tuple, in call order.
    """

    def __init__(self):
        self.errors = []


    def addError(self, test, error):
        """
        Remember C{test} and C{error} as a pair in C{self.errors}.
        """
        entry = (test, error)
        self.errors.append(entry)
371
372
373
class JanitorTests(TestCase):
    """
    Tests for L{_Janitor}, driven through L{StubReactor} and
    L{StubErrorReporter}.
    """

    def test_cleanPendingSpinsReactor(self):
        """
        Cleaning up pending calls iterates the reactor twice, each time with
        a timeout of zero.  This documents current behaviour rather than a
        requirement.
        """
        stub = StubReactor([])
        janitor = _Janitor(None, None, reactor=stub)
        janitor._cleanPending()
        self.assertEqual(stub.iterations, [0, 0])


    def test_cleanPendingCancelsCalls(self):
        """
        Cleaning up pending calls cancels every delayed call that the reactor
        reports.
        """
        cancelled = []

        def func():
            return "Lulz"

        # The fifth argument is the cancellation hook; it records the call
        # that was cancelled.
        pendingCall = DelayedCall(300, func, (), {},
                                  cancelled.append, lambda x: None)
        janitor = _Janitor(None, None, reactor=StubReactor([pendingCall]))
        janitor._cleanPending()
        self.assertEqual(cancelled, [pendingCall])


    def test_cleanPendingReturnsDelayedCallStrings(self):
        """
        Cleaning up pending calls returns the string form of each delayed
        call.  The expected string is captured here before the cleanup runs,
        because cancelling a call removes debugging details from its string
        form.
        """
        pendingCall = DelayedCall(300, lambda: None, (), {},
                                  lambda x: None, lambda x: None,
                                  seconds=lambda: 0)
        expected = [str(pendingCall)]
        janitor = _Janitor(None, None, reactor=StubReactor([pendingCall]))
        self.assertEqual(janitor._cleanPending(), expected)


    def test_cleanReactorRemovesSelectables(self):
        """
        Cleaning up the reactor calls its C{removeAll} method exactly once.
        """
        stub = StubReactor([])
        janitor = _Janitor(None, None, reactor=stub)
        janitor._cleanReactor()
        self.assertEqual(stub.removeAllCalled, 1)


    def test_cleanReactorKillsProcesses(self):
        """
        Cleaning up the reactor sends C{"KILL"} to any process transport
        found among the removed selectables.
        """
        class StubProcessTransport(object):
            """
            An L{IProcessTransport} stand-in that remembers the signals it is
            sent.

            @ivar signals: Every value passed to L{signalProcess}, in order.
            """
            implements(IProcessTransport)

            def __init__(self):
                self.signals = []

            def signalProcess(self, signal):
                """
                Remember C{signal} at the end of C{self.signals}.
                """
                self.signals.append(signal)

        transport = StubProcessTransport()
        janitor = _Janitor(None, None, reactor=StubReactor([], [transport]))
        janitor._cleanReactor()
        self.assertEqual(transport.signals, ["KILL"])


    def test_cleanReactorReturnsSelectableStrings(self):
        """
        Cleaning up the reactor returns the string form of each selectable it
        removed.
        """
        class Selectable(object):
            """
            A selectable stand-in whose only feature is a recognisable
            C{repr}.
            """
            def __repr__(self):
                return "(SELECTABLE!)"

        janitor = _Janitor(None, None,
                           reactor=StubReactor([], [Selectable()]))
        self.assertEqual(janitor._cleanReactor(), ["(SELECTABLE!)"])


    def test_postCaseCleanupNoErrors(self):
        """
        With nothing pending, the post-case cleanup returns a true value and
        reports no error.
        """
        reporter = StubErrorReporter()
        janitor = _Janitor(object(), reporter, reactor=StubReactor([]))
        self.assertTrue(janitor.postCaseCleanup())
        self.assertEqual(reporter.errors, [])


    def test_postCaseCleanupWithErrors(self):
        """
        With a delayed call pending, the post-case cleanup returns a false
        value and reports one error whose C{value.delayedCalls} lists the
        string form of that call.
        """
        pendingCall = DelayedCall(300, lambda: None, (), {},
                                  lambda x: None, lambda x: None,
                                  seconds=lambda: 0)
        expected = [str(pendingCall)]
        reporter = StubErrorReporter()
        janitor = _Janitor(object(), reporter,
                           reactor=StubReactor([pendingCall], []))
        self.assertFalse(janitor.postCaseCleanup())
        self.assertEqual(len(reporter.errors), 1)
        reported = reporter.errors[0][1]
        self.assertEqual(reported.value.delayedCalls, expected)


    def test_postClassCleanupNoErrors(self):
        """
        With no pending calls and no selectables, the post-class cleanup
        reports no error.
        """
        reporter = StubErrorReporter()
        janitor = _Janitor(object(), reporter, reactor=StubReactor([]))
        janitor.postClassCleanup()
        self.assertEqual(reporter.errors, [])


    def test_postClassCleanupWithPendingCallErrors(self):
        """
        With a delayed call pending, the post-class cleanup reports one error
        whose C{value.delayedCalls} lists the string form of that call.
        """
        pendingCall = DelayedCall(300, lambda: None, (), {},
                                  lambda x: None, lambda x: None,
                                  seconds=lambda: 0)
        expected = [str(pendingCall)]
        reporter = StubErrorReporter()
        janitor = _Janitor(object(), reporter,
                           reactor=StubReactor([pendingCall], []))
        janitor.postClassCleanup()
        self.assertEqual(len(reporter.errors), 1)
        reported = reporter.errors[0][1]
        self.assertEqual(reported.value.delayedCalls, expected)


    def test_postClassCleanupWithSelectableErrors(self):
        """
        With a selectable left in the reactor, the post-class cleanup reports
        one error whose C{value.selectables} lists the C{repr} of that
        selectable.
        """
        leftover = "SELECTABLE HERE"
        reporter = StubErrorReporter()
        janitor = _Janitor(object(), reporter,
                           reactor=StubReactor([], [leftover]))
        janitor.postClassCleanup()
        self.assertEqual(len(reporter.errors), 1)
        reported = reporter.errors[0][1]
        self.assertEqual(reported.value.selectables, [repr(leftover)])
559