Imported Upstream version 12.1.0
[contrib/python-twisted.git] / twisted / test / generator_failure_tests.py
1 # Copyright (c) Twisted Matrix Laboratories.
2 # See LICENSE for details.
3
4 """
5 Python 2.5+ test cases for failures thrown into generators.
6 """
7
8 import sys
9 import traceback
10
11 from twisted.trial.unittest import TestCase
12
13 from twisted.python.failure import Failure
14 from twisted.internet import defer
15
16 # Re-implement getDivisionFailure here instead of using the one in
17 # test_failure.py in order to avoid creating a cyclic dependency.
18 def getDivisionFailure():
19     try:
20         1/0
21     except:
22         f = Failure()
23     return f
24
25
26
27 class TwoPointFiveFailureTests(TestCase):
28
29     def test_inlineCallbacksTracebacks(self):
30         """
31         inlineCallbacks that re-raise tracebacks into their deferred
32         should not lose their tracebacsk.
33         """
34         f = getDivisionFailure()
35         d = defer.Deferred()
36         try:
37             f.raiseException()
38         except:
39             d.errback()
40
41         failures = []
42         def collect_error(result):
43             failures.append(result)
44
45         def ic(d):
46             yield d
47         ic = defer.inlineCallbacks(ic)
48         ic(d).addErrback(collect_error)
49
50         newFailure, = failures
51         self.assertEqual(
52             traceback.extract_tb(newFailure.getTracebackObject())[-1][-1],
53             "1/0"
54         )
55
56
57     def _throwIntoGenerator(self, f, g):
58         try:
59             f.throwExceptionIntoGenerator(g)
60         except StopIteration:
61             pass
62         else:
63             self.fail("throwExceptionIntoGenerator should have raised "
64                       "StopIteration")
65
66     def test_throwExceptionIntoGenerator(self):
67         """
68         It should be possible to throw the exception that a Failure
69         represents into a generator.
70         """
71         stuff = []
72         def generator():
73             try:
74                 yield
75             except:
76                 stuff.append(sys.exc_info())
77             else:
78                 self.fail("Yield should have yielded exception.")
79         g = generator()
80         f = getDivisionFailure()
81         g.next()
82         self._throwIntoGenerator(f, g)
83
84         self.assertEqual(stuff[0][0], ZeroDivisionError)
85         self.assertTrue(isinstance(stuff[0][1], ZeroDivisionError))
86
87         self.assertEqual(traceback.extract_tb(stuff[0][2])[-1][-1], "1/0")
88
89
90     def test_findFailureInGenerator(self):
91         """
92         Within an exception handler, it should be possible to find the
93         original Failure that caused the current exception (if it was
94         caused by throwExceptionIntoGenerator).
95         """
96         f = getDivisionFailure()
97         f.cleanFailure()
98
99         foundFailures = []
100         def generator():
101             try:
102                 yield
103             except:
104                 foundFailures.append(Failure._findFailure())
105             else:
106                 self.fail("No exception sent to generator")
107
108         g = generator()
109         g.next()
110         self._throwIntoGenerator(f, g)
111
112         self.assertEqual(foundFailures, [f])
113
114
115     def test_failureConstructionFindsOriginalFailure(self):
116         """
117         When a Failure is constructed in the context of an exception
118         handler that is handling an exception raised by
119         throwExceptionIntoGenerator, the new Failure should be chained to that
120         original Failure.
121         """
122         f = getDivisionFailure()
123         f.cleanFailure()
124
125         newFailures = []
126
127         def generator():
128             try:
129                 yield
130             except:
131                 newFailures.append(Failure())
132             else:
133                 self.fail("No exception sent to generator")
134         g = generator()
135         g.next()
136         self._throwIntoGenerator(f, g)
137
138         self.assertEqual(len(newFailures), 1)
139         self.assertEqual(newFailures[0].getTraceback(), f.getTraceback())
140
141     def test_ambiguousFailureInGenerator(self):
142         """
143         When a generator reraises a different exception,
144         L{Failure._findFailure} inside the generator should find the reraised
145         exception rather than original one.
146         """
147         def generator():
148             try:
149                 try:
150                     yield
151                 except:
152                     [][1]
153             except:
154                 self.assertIsInstance(Failure().value, IndexError)
155         g = generator()
156         g.next()
157         f = getDivisionFailure()
158         self._throwIntoGenerator(f, g)
159
160     def test_ambiguousFailureFromGenerator(self):
161         """
162         When a generator reraises a different exception,
163         L{Failure._findFailure} above the generator should find the reraised
164         exception rather than original one.
165         """
166         def generator():
167             try:
168                 yield
169             except:
170                 [][1]
171         g = generator()
172         g.next()
173         f = getDivisionFailure()
174         try:
175             self._throwIntoGenerator(f, g)
176         except:
177             self.assertIsInstance(Failure().value, IndexError)