Imported Upstream version 12.1.0
[contrib/python-twisted.git] / twisted / internet / test / test_threads.py
1 # Copyright (c) Twisted Matrix Laboratories.
2 # See LICENSE for details.
3
4 """
5 Tests for implementations of L{IReactorThreads}.
6 """
7
# Python 2 idiom: with a module-level __metaclass__ of ``type``, classes in
# this module that are declared without explicit bases are new-style classes.
8 __metaclass__ = type
9
10 from weakref import ref
11 import gc, threading
12
13 from twisted.python.threadable import isInIOThread
14 from twisted.internet.test.reactormixins import ReactorBuilder
15 from twisted.python.threadpool import ThreadPool
16
17
class ThreadTestsBuilder(ReactorBuilder):
    """
    Builder for defining tests relating to L{IReactorThreads}.

    Every test builds its own reactor with C{self.buildReactor()} and drives
    it with C{self.runReactor()}, so no reactor state is shared between
    tests.
    """
    def test_getThreadPool(self):
        """
        C{reactor.getThreadPool()} returns an instance of L{ThreadPool} which
        starts when C{reactor.run()} is called and stops before it returns.
        """
        state = []
        reactor = self.buildReactor()

        pool = reactor.getThreadPool()
        self.assertIsInstance(pool, ThreadPool)
        self.assertFalse(
            pool.started, "Pool should not start before reactor.run")

        def f():
            # Record the state for later assertions; it has to be sampled
            # while the reactor is still running.
            state.append(pool.started)
            state.append(pool.joined)
            reactor.stop()

        reactor.callWhenRunning(f)
        self.runReactor(reactor, 2)

        self.assertTrue(
            state[0], "Pool should start after reactor.run")
        self.assertFalse(
            state[1], "Pool should not be joined before reactor.stop")
        self.assertTrue(
            pool.joined,
            "Pool should be stopped after reactor.run returns")


    def test_suggestThreadPoolSize(self):
        """
        C{reactor.suggestThreadPoolSize()} sets the maximum size of the reactor
        threadpool.
        """
        reactor = self.buildReactor()
        reactor.suggestThreadPoolSize(17)
        pool = reactor.getThreadPool()
        self.assertEqual(pool.max, 17)


    def test_delayedCallFromThread(self):
        """
        A function scheduled with L{IReactorThreads.callFromThread} invoked
        from a delayed call is run immediately in the next reactor iteration.

        When invoked from the reactor thread, previous implementations of
        L{IReactorThreads.callFromThread} would skip the pipe/socket based wake
        up step, assuming the reactor would wake up on its own.  However, this
        resulted in the reactor not noticing an insert into the thread queue at
        the right time (in this case, after the thread queue has been processed
        for that reactor iteration).
        """
        reactor = self.buildReactor()

        def threadCall():
            reactor.stop()

        # Set up the use of callFromThread being tested.
        reactor.callLater(0, reactor.callFromThread, threadCall)

        before = reactor.seconds()
        self.runReactor(reactor, 60)
        after = reactor.seconds()

        # We specified a timeout of 60 seconds.  The timeout code in runReactor
        # probably won't actually work, though.  If the reactor comes out of
        # the event notification API just a little bit early, say after 59.9999
        # seconds instead of after 60 seconds, then the queued thread call will
        # get processed but the timeout delayed call runReactor sets up won't!
        # Then the reactor will stop and runReactor will return without the
        # timeout firing.  As it turns out, select() and poll() are quite
        # likely to return *slightly* earlier than we ask them to, so the
        # timeout will rarely happen, even if callFromThread is broken.  So,
        # instead we'll measure the elapsed time and make sure it's something
        # less than about half of the timeout we specified.  This is heuristic.
        # It assumes that select() won't ever return after 30 seconds when we
        # asked it to timeout after 60 seconds.  And of course like all
        # time-based tests, it's slightly non-deterministic.  If the OS doesn't
        # schedule this process for 30 seconds, then the test might fail even
        # if callFromThread is working.
        elapsed = after - before
        self.assertTrue(
            elapsed < 30,
            "callFromThread call was not run promptly; reactor ran for %r "
            "seconds" % (elapsed,))


    def test_callFromThread(self):
        """
        A function scheduled with L{IReactorThreads.callFromThread} invoked
        from another thread is run in the reactor thread.
        """
        reactor = self.buildReactor()
        result = []

        def threadCall():
            # Remember which thread actually ran this call.
            result.append(threading.currentThread())
            reactor.stop()

        # Hop into a pool thread first, and from there ask the reactor to run
        # threadCall via callFromThread.
        reactor.callLater(0, reactor.callInThread,
                          reactor.callFromThread, threadCall)
        self.runReactor(reactor, 5)

        # The reactor was run in this (the test's) thread, so that is the
        # thread threadCall must have been run in.
        self.assertEqual(result, [threading.currentThread()])


    def test_stopThreadPool(self):
        """
        When the reactor stops, L{ReactorBase._stopThreadPool} drops the
        reactor's direct reference to its internal threadpool and removes
        the associated startup and shutdown triggers.

        This is the case of the thread pool being created before the reactor
        is run.
        """
        reactor = self.buildReactor()
        # Only hold a weak reference, so that the test itself does not keep
        # the pool alive.
        threadpool = ref(reactor.getThreadPool())
        reactor.callWhenRunning(reactor.stop)
        self.runReactor(reactor)
        gc.collect()
        self.assertIdentical(threadpool(), None)


    def test_stopThreadPoolWhenStartedAfterReactorRan(self):
        """
        We must handle the case of shutting down the thread pool when it was
        started after the reactor was run in a special way.

        Some implementation background: The thread pool is started with
        callWhenRunning, which only returns a system trigger ID when it is
        invoked before the reactor is started.

        This is the case of the thread pool being created after the reactor
        is started.
        """
        reactor = self.buildReactor()
        threadPoolRefs = []
        def acquireThreadPool():
            # Only hold a weak reference, so that the test itself does not
            # keep the pool alive.
            threadPoolRefs.append(ref(reactor.getThreadPool()))
            reactor.stop()
        reactor.callWhenRunning(acquireThreadPool)
        self.runReactor(reactor)
        gc.collect()
        self.assertIdentical(threadPoolRefs[0](), None)


    def test_cleanUpThreadPoolEvenBeforeReactorIsRun(self):
        """
        When the reactor has its shutdown event fired before it is run, the
        thread pool is completely destroyed.

        For what it's worth, the reason we support this behavior at all is
        because Trial does this.

        This is the case of the thread pool being created without the reactor
        being started at all.
        """
        reactor = self.buildReactor()
        # Only hold a weak reference, so that the test itself does not keep
        # the pool alive.
        threadPoolRef = ref(reactor.getThreadPool())
        reactor.fireSystemEvent("shutdown")
        gc.collect()
        self.assertIdentical(threadPoolRef(), None)


    def test_isInIOThread(self):
        """
        The reactor registers itself as the I/O thread when it runs so that
        L{twisted.python.threadable.isInIOThread} returns C{True} if it is
        called in the thread the reactor is running in.
        """
        results = []
        reactor = self.buildReactor()
        def check():
            results.append(isInIOThread())
            reactor.stop()
        reactor.callWhenRunning(check)
        self.runReactor(reactor)
        self.assertEqual([True], results)


    def test_isNotInIOThread(self):
        """
        The reactor registers itself as the I/O thread when it runs so that
        L{twisted.python.threadable.isInIOThread} returns C{False} if it is
        called in a different thread than the reactor is running in.
        """
        results = []
        reactor = self.buildReactor()
        def check():
            results.append(isInIOThread())
            # check runs in a pool thread, so the stop request is handed back
            # to the reactor with callFromThread.
            reactor.callFromThread(reactor.stop)
        reactor.callInThread(check)
        self.runReactor(reactor)
        self.assertEqual([False], results)
213
214
# Add whatever ThreadTestsBuilder.makeTestCaseClasses() returns to this
# module's globals -- presumably a mapping of generated test case class names
# to classes, published here so the test runner can find them (TODO confirm
# against ReactorBuilder).
215 globals().update(ThreadTestsBuilder.makeTestCaseClasses())