Initial import to Tizen
[profile/ivi/python-twisted.git] / twisted / trial / test / test_warning.py
1 # Copyright (c) Twisted Matrix Laboratories.
2 # See LICENSE for details.
3
4 """
5 Tests for Trial's interaction with the Python warning system.
6 """
7
8 import sys, warnings
9 from StringIO import StringIO
10
11 from twisted.python.filepath import FilePath
12 from twisted.trial.unittest import (TestCase, _collectWarnings,
13                                     _setWarningRegistryToNone)
14 from twisted.trial.reporter import TestResult
15
16 class Mask(object):
17     """
18     Hide a L{TestCase} definition from trial's automatic discovery mechanism.
19     """
20     class MockTests(TestCase):
21         """
22         A test case which is used by L{FlushWarningsTests} to verify behavior
23         which cannot be verified by code inside a single test method.
24         """
25         message = "some warning text"
26         category = UserWarning
27
28         def test_unflushed(self):
29             """
30             Generate a warning and don't flush it.
31             """
32             warnings.warn(self.message, self.category)
33
34
35         def test_flushed(self):
36             """
37             Generate a warning and flush it.
38             """
39             warnings.warn(self.message, self.category)
40             self.assertEqual(len(self.flushWarnings()), 1)
41
42
43
44 class FlushWarningsTests(TestCase):
45     """
46     Tests for L{TestCase.flushWarnings}, an API for examining the warnings
47     emitted so far in a test.
48     """
49
50     def assertDictSubset(self, set, subset):
51         """
52         Assert that all the keys present in C{subset} are also present in
53         C{set} and that the corresponding values are equal.
54         """
55         for k, v in subset.iteritems():
56             self.assertEqual(set[k], v)
57
58
59     def assertDictSubsets(self, sets, subsets):
60         """
61         For each pair of corresponding elements in C{sets} and C{subsets},
62         assert that the element from C{subsets} is a subset of the element from
63         C{sets}.
64         """
65         self.assertEqual(len(sets), len(subsets))
66         for a, b in zip(sets, subsets):
67             self.assertDictSubset(a, b)
68
69
70     def test_none(self):
71         """
72         If no warnings are emitted by a test, L{TestCase.flushWarnings} returns
73         an empty list.
74         """
75         self.assertEqual(self.flushWarnings(), [])
76
77
78     def test_several(self):
79         """
80         If several warnings are emitted by a test, L{TestCase.flushWarnings}
81         returns a list containing all of them.
82         """
83         firstMessage = "first warning message"
84         firstCategory = UserWarning
85         warnings.warn(message=firstMessage, category=firstCategory)
86
87         secondMessage = "second warning message"
88         secondCategory = RuntimeWarning
89         warnings.warn(message=secondMessage, category=secondCategory)
90
91         self.assertDictSubsets(
92             self.flushWarnings(),
93             [{'category': firstCategory, 'message': firstMessage},
94              {'category': secondCategory, 'message': secondMessage}])
95
96
97     def test_repeated(self):
98         """
99         The same warning triggered twice from the same place is included twice
100         in the list returned by L{TestCase.flushWarnings}.
101         """
102         message = "the message"
103         category = RuntimeWarning
104         for i in range(2):
105             warnings.warn(message=message, category=category)
106
107         self.assertDictSubsets(
108             self.flushWarnings(),
109             [{'category': category, 'message': message}] * 2)
110
111
112     def test_cleared(self):
113         """
114         After a particular warning event has been returned by
115         L{TestCase.flushWarnings}, it is not returned by subsequent calls.
116         """
117         message = "the message"
118         category = RuntimeWarning
119         warnings.warn(message=message, category=category)
120         self.assertDictSubsets(
121             self.flushWarnings(),
122             [{'category': category, 'message': message}])
123         self.assertEqual(self.flushWarnings(), [])
124
125
126     def test_unflushed(self):
127         """
128         Any warnings emitted by a test which are not flushed are emitted to the
129         Python warning system.
130         """
131         result = TestResult()
132         case = Mask.MockTests('test_unflushed')
133         case.run(result)
134         warningsShown = self.flushWarnings([Mask.MockTests.test_unflushed])
135         self.assertEqual(warningsShown[0]['message'], 'some warning text')
136         self.assertIdentical(warningsShown[0]['category'], UserWarning)
137
138         where = case.test_unflushed.im_func.func_code
139         filename = where.co_filename
140         # If someone edits MockTests.test_unflushed, the value added to
141         # firstlineno might need to change.
142         lineno = where.co_firstlineno + 4
143
144         self.assertEqual(warningsShown[0]['filename'], filename)
145         self.assertEqual(warningsShown[0]['lineno'], lineno)
146
147         self.assertEqual(len(warningsShown), 1)
148
149
150     def test_flushed(self):
151         """
152         Any warnings emitted by a test which are flushed are not emitted to the
153         Python warning system.
154         """
155         result = TestResult()
156         case = Mask.MockTests('test_flushed')
157         output = StringIO()
158         monkey = self.patch(sys, 'stdout', output)
159         case.run(result)
160         monkey.restore()
161         self.assertEqual(output.getvalue(), "")
162
163
164     def test_warningsConfiguredAsErrors(self):
165         """
166         If a warnings filter has been installed which turns warnings into
167         exceptions, tests have an error added to the reporter for them for each
168         unflushed warning.
169         """
170         class CustomWarning(Warning):
171             pass
172
173         result = TestResult()
174         case = Mask.MockTests('test_unflushed')
175         case.category = CustomWarning
176
177         originalWarnings = warnings.filters[:]
178         try:
179             warnings.simplefilter('error')
180             case.run(result)
181             self.assertEqual(len(result.errors), 1)
182             self.assertIdentical(result.errors[0][0], case)
183             result.errors[0][1].trap(CustomWarning)
184         finally:
185             warnings.filters[:] = originalWarnings
186
187
188     def test_flushedWarningsConfiguredAsErrors(self):
189         """
190         If a warnings filter has been installed which turns warnings into
191         exceptions, tests which emit those warnings but flush them do not have
192         an error added to the reporter.
193         """
194         class CustomWarning(Warning):
195             pass
196
197         result = TestResult()
198         case = Mask.MockTests('test_flushed')
199         case.category = CustomWarning
200
201         originalWarnings = warnings.filters[:]
202         try:
203             warnings.simplefilter('error')
204             case.run(result)
205             self.assertEqual(result.errors, [])
206         finally:
207             warnings.filters[:] = originalWarnings
208
209
210     def test_multipleFlushes(self):
211         """
212         Any warnings emitted after a call to L{TestCase.flushWarnings} can be
213         flushed by another call to L{TestCase.flushWarnings}.
214         """
215         warnings.warn("first message")
216         self.assertEqual(len(self.flushWarnings()), 1)
217         warnings.warn("second message")
218         self.assertEqual(len(self.flushWarnings()), 1)
219
220
221     def test_filterOnOffendingFunction(self):
222         """
223         The list returned by L{TestCase.flushWarnings} includes only those
224         warnings which refer to the source of the function passed as the value
225         for C{offendingFunction}, if a value is passed for that parameter.
226         """
227         firstMessage = "first warning text"
228         firstCategory = UserWarning
229         def one():
230             warnings.warn(firstMessage, firstCategory, stacklevel=1)
231
232         secondMessage = "some text"
233         secondCategory = RuntimeWarning
234         def two():
235             warnings.warn(secondMessage, secondCategory, stacklevel=1)
236
237         one()
238         two()
239
240         self.assertDictSubsets(
241             self.flushWarnings(offendingFunctions=[one]),
242             [{'category': firstCategory, 'message': firstMessage}])
243         self.assertDictSubsets(
244             self.flushWarnings(offendingFunctions=[two]),
245             [{'category': secondCategory, 'message': secondMessage}])
246
247
248     def test_functionBoundaries(self):
249         """
250         Verify that warnings emitted at the very edges of a function are still
251         determined to be emitted from that function.
252         """
253         def warner():
254             warnings.warn("first line warning")
255             warnings.warn("internal line warning")
256             warnings.warn("last line warning")
257
258         warner()
259         self.assertEqual(
260             len(self.flushWarnings(offendingFunctions=[warner])), 3)
261
262
263     def test_invalidFilter(self):
264         """
265         If an object which is neither a function nor a method is included in
266         the C{offendingFunctions} list, L{TestCase.flushWarnings} raises
267         L{ValueError}.  Such a call flushes no warnings.
268         """
269         warnings.warn("oh no")
270         self.assertRaises(ValueError, self.flushWarnings, [None])
271         self.assertEqual(len(self.flushWarnings()), 1)
272
273
274     def test_missingSource(self):
275         """
276         Warnings emitted by a function the source code of which is not
277         available can still be flushed.
278         """
279         package = FilePath(self.mktemp()).child('twisted_private_helper')
280         package.makedirs()
281         package.child('__init__.py').setContent('')
282         package.child('missingsourcefile.py').setContent('''
283 import warnings
284 def foo():
285     warnings.warn("oh no")
286 ''')
287         sys.path.insert(0, package.parent().path)
288         self.addCleanup(sys.path.remove, package.parent().path)
289         from twisted_private_helper import missingsourcefile
290         self.addCleanup(sys.modules.pop, 'twisted_private_helper')
291         self.addCleanup(sys.modules.pop, missingsourcefile.__name__)
292         package.child('missingsourcefile.py').remove()
293
294         missingsourcefile.foo()
295         self.assertEqual(len(self.flushWarnings([missingsourcefile.foo])), 1)
296
297
298     def test_renamedSource(self):
299         """
300         Warnings emitted by a function defined in a file which has been renamed
301         since it was initially compiled can still be flushed.
302
303         This is testing the code which specifically supports working around the
304         unfortunate behavior of CPython to write a .py source file name into
305         the .pyc files it generates and then trust that it is correct in
306         various places.  If source files are renamed, .pyc files may not be
307         regenerated, but they will contain incorrect filenames.
308         """
309         package = FilePath(self.mktemp()).child('twisted_private_helper')
310         package.makedirs()
311         package.child('__init__.py').setContent('')
312         package.child('module.py').setContent('''
313 import warnings
314 def foo():
315     warnings.warn("oh no")
316 ''')
317         sys.path.insert(0, package.parent().path)
318         self.addCleanup(sys.path.remove, package.parent().path)
319
320         # Import it to cause pycs to be generated
321         from twisted_private_helper import module
322
323         # Clean up the state resulting from that import; we're not going to use
324         # this module, so it should go away.
325         del sys.modules['twisted_private_helper']
326         del sys.modules[module.__name__]
327
328         # Rename the source directory
329         package.moveTo(package.sibling('twisted_renamed_helper'))
330
331         # Import the newly renamed version
332         from twisted_renamed_helper import module
333         self.addCleanup(sys.modules.pop, 'twisted_renamed_helper')
334         self.addCleanup(sys.modules.pop, module.__name__)
335
336         # Generate the warning
337         module.foo()
338
339         # Flush it
340         self.assertEqual(len(self.flushWarnings([module.foo])), 1)
341
342
343
344 class FakeWarning(Warning):
345     pass
346
347
348
349 class CollectWarningsTests(TestCase):
350     """
351     Tests for L{_collectWarnings}.
352     """
353     def test_callsObserver(self):
354         """
355         L{_collectWarnings} calls the observer with each emitted warning.
356         """
357         firstMessage = "dummy calls observer warning"
358         secondMessage = firstMessage[::-1]
359         events = []
360         def f():
361             events.append('call')
362             warnings.warn(firstMessage)
363             warnings.warn(secondMessage)
364             events.append('returning')
365
366         _collectWarnings(events.append, f)
367
368         self.assertEqual(events[0], 'call')
369         self.assertEqual(events[1].message, firstMessage)
370         self.assertEqual(events[2].message, secondMessage)
371         self.assertEqual(events[3], 'returning')
372         self.assertEqual(len(events), 4)
373
374
375     def test_suppresses(self):
376         """
377         Any warnings emitted by a call to a function passed to
378         L{_collectWarnings} are not actually emitted to the warning system.
379         """
380         output = StringIO()
381         self.patch(sys, 'stdout', output)
382         _collectWarnings(lambda x: None, warnings.warn, "text")
383         self.assertEqual(output.getvalue(), "")
384
385
386     def test_callsFunction(self):
387         """
388         L{_collectWarnings} returns the result of calling the callable passed to
389         it with the parameters given.
390         """
391         arguments = []
392         value = object()
393
394         def f(*args, **kwargs):
395             arguments.append((args, kwargs))
396             return value
397
398         result = _collectWarnings(lambda x: None, f, 1, 'a', b=2, c='d')
399         self.assertEqual(arguments, [((1, 'a'), {'b': 2, 'c': 'd'})])
400         self.assertIdentical(result, value)
401
402
403     def test_duplicateWarningCollected(self):
404         """
405         Subsequent emissions of a warning from a particular source site can be
406         collected by L{_collectWarnings}.  In particular, the per-module
407         emitted-warning cache should be bypassed (I{__warningregistry__}).
408         """
409         # Make sure the worst case is tested: if __warningregistry__ isn't in a
410         # module's globals, then the warning system will add it and start using
411         # it to avoid emitting duplicate warnings.  Delete __warningregistry__
412         # to ensure that even modules which are first imported as a test is
413         # running still interact properly with the warning system.
414         global __warningregistry__
415         del __warningregistry__
416
417         def f():
418             warnings.warn("foo")
419         warnings.simplefilter('default')
420         f()
421         events = []
422         _collectWarnings(events.append, f)
423         self.assertEqual(len(events), 1)
424         self.assertEqual(events[0].message, "foo")
425         self.assertEqual(len(self.flushWarnings()), 1)
426
427
428     def test_immutableObject(self):
429         """
430         L{_collectWarnings}'s behavior is not altered by the presence of an
431         object which cannot have attributes set on it as a value in
432         C{sys.modules}.
433         """
434         key = object()
435         sys.modules[key] = key
436         self.addCleanup(sys.modules.pop, key)
437         self.test_duplicateWarningCollected()
438
439
440     def test_setWarningRegistryChangeWhileIterating(self):
441         """
442         If the dictionary passed to L{_setWarningRegistryToNone} changes size
443         partway through the process, C{_setWarningRegistryToNone} continues to
444         set C{__warningregistry__} to C{None} on the rest of the values anyway.
445
446
447         This might be caused by C{sys.modules} containing something that's not
448         really a module and imports things on setattr.  py.test does this, as
449         does L{twisted.python.deprecate.deprecatedModuleAttribute}.
450         """
451         d = {}
452
453         class A(object):
454             def __init__(self, key):
455                 self.__dict__['_key'] = key
456
457             def __setattr__(self, value, item):
458                 d[self._key] = None
459
460         key1 = object()
461         key2 = object()
462         d[key1] = A(key2)
463
464         key3 = object()
465         key4 = object()
466         d[key3] = A(key4)
467
468         _setWarningRegistryToNone(d)
469
470         # If both key2 and key4 were added, then both A instanced were
471         # processed.
472         self.assertEqual(set([key1, key2, key3, key4]), set(d.keys()))