1 # Copyright (c) Twisted Matrix Laboratories.
2 # See LICENSE for details.
"""
Tests for implementations of L{IReactorThreads}.
"""
import gc
import threading
from weakref import ref

from twisted.internet.test.reactormixins import ReactorBuilder
from twisted.python.threadable import isInIOThread
from twisted.python.threadpool import ThreadPool
class ThreadTestsBuilder(ReactorBuilder):
    """
    Builder for defining tests relating to L{IReactorThreads}.
    """

    def test_getThreadPool(self):
        """
        C{reactor.getThreadPool()} returns an instance of L{ThreadPool} which
        starts when C{reactor.run()} is called and stops before it returns.
        """
        state = []
        reactor = self.buildReactor()

        pool = reactor.getThreadPool()
        self.assertIsInstance(pool, ThreadPool)
        self.assertFalse(
            pool.started, "Pool should not start before reactor.run")

        def f():
            # Record the state for later assertions
            state.append(pool.started)
            state.append(pool.joined)
            reactor.stop()

        reactor.callWhenRunning(f)
        self.runReactor(reactor, 2)

        self.assertTrue(
            state[0], "Pool should start after reactor.run")
        self.assertFalse(
            state[1], "Pool should not be joined before reactor.stop")
        self.assertTrue(
            pool.joined,
            "Pool should be stopped after reactor.run returns")

    def test_suggestThreadPoolSize(self):
        """
        C{reactor.suggestThreadPoolSize()} sets the maximum size of the reactor
        thread pool.
        """
        reactor = self.buildReactor()
        reactor.suggestThreadPoolSize(17)
        pool = reactor.getThreadPool()
        self.assertEqual(pool.max, 17)

    def test_delayedCallFromThread(self):
        """
        A function scheduled with L{IReactorThreads.callFromThread} invoked
        from a delayed call is run immediately in the next reactor iteration.

        When invoked from the reactor thread, previous implementations of
        L{IReactorThreads.callFromThread} would skip the pipe/socket based wake
        up step, assuming the reactor would wake up on its own.  However, this
        resulted in the reactor not noticing an insert into the thread queue
        at the right time (in this case, after the thread queue has been
        processed for that reactor iteration).
        """
        reactor = self.buildReactor()

        def threadCall():
            reactor.stop()

        # Set up the use of callFromThread being tested.
        reactor.callLater(0, reactor.callFromThread, threadCall)

        before = reactor.seconds()
        self.runReactor(reactor, 60)
        after = reactor.seconds()

        # We specified a timeout of 60 seconds.  The timeout code in runReactor
        # probably won't actually work, though.  If the reactor comes out of
        # the event notification API just a little bit early, say after 59.9999
        # seconds instead of after 60 seconds, then the queued thread call will
        # get processed but the timeout delayed call runReactor sets up won't!
        # Then the reactor will stop and runReactor will return without the
        # timeout firing.  As it turns out, select() and poll() are quite
        # likely to return *slightly* earlier than we ask them to, so the
        # timeout will rarely happen, even if callFromThread is broken.  So,
        # instead we'll measure the elapsed time and make sure it's something
        # less than about half of the timeout we specified.  This is heuristic.
        # It assumes that select() won't ever return after 30 seconds when we
        # asked it to timeout after 60 seconds.  And of course like all
        # time-based tests, it's slightly non-deterministic.  If the OS doesn't
        # schedule this process for 30 seconds, then the test might fail even
        # if callFromThread is working.
        self.assertLess(after - before, 30)

    def test_callFromThread(self):
        """
        A function scheduled with L{IReactorThreads.callFromThread} invoked
        from another thread is run in the reactor thread.
        """
        reactor = self.buildReactor()
        result = []

        def threadCall():
            # Remember which thread actually executed the call.
            result.append(threading.current_thread())
            reactor.stop()

        # Hop into a pool thread first, and call callFromThread from there.
        reactor.callLater(0, reactor.callInThread,
                          reactor.callFromThread, threadCall)
        self.runReactor(reactor, 5)

        self.assertEqual(result, [threading.current_thread()])

    def test_stopThreadPool(self):
        """
        When the reactor stops, L{ReactorBase._stopThreadPool} drops the
        reactor's direct reference to its internal threadpool and removes
        the associated startup and shutdown triggers.

        This is the case of the thread pool being created before the reactor
        is run.
        """
        reactor = self.buildReactor()
        threadpool = ref(reactor.getThreadPool())
        reactor.callWhenRunning(reactor.stop)
        self.runReactor(reactor)
        # Collect explicitly so a pool kept alive only by reference cycles
        # is reclaimed before the weak reference is checked.
        gc.collect()
        self.assertIsNone(threadpool())

    def test_stopThreadPoolWhenStartedAfterReactorRan(self):
        """
        We must handle the case of shutting down the thread pool when it was
        started after the reactor was run in a special way.

        Some implementation background: The thread pool is started with
        callWhenRunning, which only returns a system trigger ID when it is
        invoked before the reactor is started.

        This is the case of the thread pool being created after the reactor
        is run.
        """
        reactor = self.buildReactor()
        threadPoolRefs = []

        def acquireThreadPool():
            threadPoolRefs.append(ref(reactor.getThreadPool()))
            reactor.stop()

        reactor.callWhenRunning(acquireThreadPool)
        self.runReactor(reactor)
        # Collect explicitly so a pool kept alive only by reference cycles
        # is reclaimed before the weak reference is checked.
        gc.collect()
        self.assertIsNone(threadPoolRefs[0]())

    def test_cleanUpThreadPoolEvenBeforeReactorIsRun(self):
        """
        When the reactor has its shutdown event fired before it is run, the
        thread pool is completely destroyed.

        For what it's worth, the reason we support this behavior at all is
        because Trial does this.

        This is the case of the thread pool being created without the reactor
        ever being run.
        """
        reactor = self.buildReactor()
        threadPoolRef = ref(reactor.getThreadPool())
        reactor.fireSystemEvent("shutdown")
        # Collect explicitly so a pool kept alive only by reference cycles
        # is reclaimed before the weak reference is checked.
        gc.collect()
        self.assertIsNone(threadPoolRef())

    def test_isInIOThread(self):
        """
        The reactor registers itself as the I/O thread when it runs so that
        L{twisted.python.threadable.isInIOThread} returns C{True} if it is
        called in the thread the reactor is running in.
        """
        results = []
        reactor = self.buildReactor()

        def check():
            results.append(isInIOThread())
            reactor.stop()

        reactor.callWhenRunning(check)
        self.runReactor(reactor)
        self.assertEqual([True], results)

    def test_isNotInIOThread(self):
        """
        The reactor registers itself as the I/O thread when it runs so that
        L{twisted.python.threadable.isInIOThread} returns C{False} if it is
        called in a different thread than the reactor is running in.
        """
        results = []
        reactor = self.buildReactor()

        def check():
            results.append(isInIOThread())
            # check runs in a pool thread, so the stop request has to be
            # handed back to the reactor thread.
            reactor.callFromThread(reactor.stop)

        reactor.callInThread(check)
        self.runReactor(reactor)
        self.assertEqual([False], results)
# Publish the test case classes generated from the builder as module-level
# names so that the test runner can discover them.
globals().update(ThreadTestsBuilder.makeTestCaseClasses())