1 # Copyright (c) Twisted Matrix Laboratories.
2 # See LICENSE for details.
5 Tests for L{twisted.python.threadpool}
8 import pickle, time, weakref, gc, threading
10 from twisted.trial import unittest
11 from twisted.python import threadpool, threadable, failure, context
12 from twisted.internet import reactor
13 from twisted.internet.defer import Deferred
16 # See the end of this module for the remainder of the imports.
19 class Synchronization(object):
22 def __init__(self, N, waiting):
24 self.waiting = waiting
25 self.lock = threading.Lock()
29 # This is the testy part: this is supposed to be invoked
30 # serially from multiple threads. If that is actually the
31 # case, we will never fail to acquire this lock. If it is
32 # *not* the case, we might get here while someone else is
34 if self.lock.acquire(False):
35 if not len(self.runs) % 5:
36 time.sleep(0.0002) # Constant selected based on
37 # empirical data to maximize the
38 # chance of a quick failure if this
44 # This is just the only way I can think of to wake up the test
45 # method. It doesn't actually have anything to do with the
48 self.runs.append(None)
49 if len(self.runs) == self.N:
50 self.waiting.release()
53 synchronized = ["run"]
54 threadable.synchronize(Synchronization)
58 class ThreadPoolTestCase(unittest.TestCase):
62 def _waitForLock(self, lock):
63 for i in xrange(1000000):
64 if lock.acquire(False):
68 self.fail("A long time passed without succeeding")
71 def test_attributes(self):
73 L{ThreadPool.min} and L{ThreadPool.max} are set to the values passed to
74 L{ThreadPool.__init__}.
76 pool = threadpool.ThreadPool(12, 22)
77 self.assertEqual(pool.min, 12)
78 self.assertEqual(pool.max, 22)
83 L{ThreadPool.start} creates the minimum number of threads specified.
85 pool = threadpool.ThreadPool(0, 5)
87 self.addCleanup(pool.stop)
88 self.assertEqual(len(pool.threads), 0)
90 pool = threadpool.ThreadPool(3, 10)
91 self.assertEqual(len(pool.threads), 0)
93 self.addCleanup(pool.stop)
94 self.assertEqual(len(pool.threads), 3)
97 def test_threadCreationArguments(self):
99 Test that creating threads in the threadpool with application-level
100 objects as arguments doesn't results in those objects never being
101 freed, with the thread maintaining a reference to them as long as it
104 tp = threadpool.ThreadPool(0, 1)
106 self.addCleanup(tp.stop)
108 # Sanity check - no threads should have been started yet.
109 self.assertEqual(tp.threads, [])
111 # Here's our function
114 # weakref needs an object subclass
117 # And here's the unique object
120 workerRef = weakref.ref(worker)
121 uniqueRef = weakref.ref(unique)
124 tp.callInThread(worker, unique)
126 # Add an event to wait completion
127 event = threading.Event()
128 tp.callInThread(event.set)
129 event.wait(self.getTimeout())
134 self.assertEqual(uniqueRef(), None)
135 self.assertEqual(workerRef(), None)
138 def test_threadCreationArgumentsCallInThreadWithCallback(self):
140 As C{test_threadCreationArguments} above, but for
141 callInThreadWithCallback.
144 tp = threadpool.ThreadPool(0, 1)
146 self.addCleanup(tp.stop)
148 # Sanity check - no threads should have been started yet.
149 self.assertEqual(tp.threads, [])
151 # this holds references obtained in onResult
152 refdict = {} # name -> ref value
154 onResultWait = threading.Event()
155 onResultDone = threading.Event()
160 def onResult(success, result):
161 onResultWait.wait(self.getTimeout())
162 refdict['workerRef'] = workerRef()
163 refdict['uniqueRef'] = uniqueRef()
165 resultRef.append(weakref.ref(result))
167 # Here's our function
168 def worker(arg, test):
171 # weakref needs an object subclass
175 # And here's the unique object
178 onResultRef = weakref.ref(onResult)
179 workerRef = weakref.ref(worker)
180 uniqueRef = weakref.ref(unique)
183 tp.callInThreadWithCallback(onResult, worker, unique, test=unique)
189 # let onResult collect the refs
192 onResultDone.wait(self.getTimeout())
194 self.assertEqual(uniqueRef(), None)
195 self.assertEqual(workerRef(), None)
197 # XXX There's a race right here - has onResult in the worker thread
198 # returned and the locals in _worker holding it and the result been
203 self.assertEqual(onResultRef(), None)
204 self.assertEqual(resultRef[0](), None)
207 def test_persistence(self):
209 Threadpools can be pickled and unpickled, which should preserve the
210 number of threads and other parameters.
212 pool = threadpool.ThreadPool(7, 20)
214 self.assertEqual(pool.min, 7)
215 self.assertEqual(pool.max, 20)
217 # check that unpickled threadpool has same number of threads
218 copy = pickle.loads(pickle.dumps(pool))
220 self.assertEqual(copy.min, 7)
221 self.assertEqual(copy.max, 20)
224 def _threadpoolTest(self, method):
226 Test synchronization of calls made with C{method}, which should be
227 one of the mechanisms of the threadpool to execute work in threads.
229 # This is a schizophrenic test: it seems to be trying to test
230 # both the callInThread()/dispatch() behavior of the ThreadPool as well
231 # as the serialization behavior of threadable.synchronize(). It
232 # would probably make more sense as two much simpler tests.
235 tp = threadpool.ThreadPool()
237 self.addCleanup(tp.stop)
239 waiting = threading.Lock()
241 actor = Synchronization(N, waiting)
246 self._waitForLock(waiting)
248 self.failIf(actor.failures, "run() re-entered %d times" %
252 def test_callInThread(self):
254 Call C{_threadpoolTest} with C{callInThread}.
256 return self._threadpoolTest(
257 lambda tp, actor: tp.callInThread(actor.run))
260 def test_callInThreadException(self):
262 L{ThreadPool.callInThread} logs exceptions raised by the callable it
265 class NewError(Exception):
271 tp = threadpool.ThreadPool(0, 1)
272 tp.callInThread(raiseError)
276 errors = self.flushLoggedErrors(NewError)
277 self.assertEqual(len(errors), 1)
280 def test_callInThreadWithCallback(self):
282 L{ThreadPool.callInThreadWithCallback} calls C{onResult} with a
283 two-tuple of C{(True, result)} where C{result} is the value returned
284 by the callable supplied.
286 waiter = threading.Lock()
291 def onResult(success, result):
293 results.append(success)
294 results.append(result)
296 tp = threadpool.ThreadPool(0, 1)
297 tp.callInThreadWithCallback(onResult, lambda : "test")
301 self._waitForLock(waiter)
305 self.assertTrue(results[0])
306 self.assertEqual(results[1], "test")
309 def test_callInThreadWithCallbackExceptionInCallback(self):
311 L{ThreadPool.callInThreadWithCallback} calls C{onResult} with a
312 two-tuple of C{(False, failure)} where C{failure} represents the
313 exception raised by the callable supplied.
315 class NewError(Exception):
321 waiter = threading.Lock()
326 def onResult(success, result):
328 results.append(success)
329 results.append(result)
331 tp = threadpool.ThreadPool(0, 1)
332 tp.callInThreadWithCallback(onResult, raiseError)
336 self._waitForLock(waiter)
340 self.assertFalse(results[0])
341 self.assertTrue(isinstance(results[1], failure.Failure))
342 self.assertTrue(issubclass(results[1].type, NewError))
345 def test_callInThreadWithCallbackExceptionInOnResult(self):
347 L{ThreadPool.callInThreadWithCallback} logs the exception raised by
350 class NewError(Exception):
353 waiter = threading.Lock()
358 def onResult(success, result):
359 results.append(success)
360 results.append(result)
363 tp = threadpool.ThreadPool(0, 1)
364 tp.callInThreadWithCallback(onResult, lambda : None)
365 tp.callInThread(waiter.release)
369 self._waitForLock(waiter)
373 errors = self.flushLoggedErrors(NewError)
374 self.assertEqual(len(errors), 1)
376 self.assertTrue(results[0])
377 self.assertEqual(results[1], None)
380 def test_callbackThread(self):
382 L{ThreadPool.callInThreadWithCallback} calls the function it is
383 given and the C{onResult} callback in the same thread.
389 event = threading.Event()
391 def onResult(success, result):
392 threadIds.append(thread.get_ident())
396 threadIds.append(thread.get_ident())
398 tp = threadpool.ThreadPool(0, 1)
399 tp.callInThreadWithCallback(onResult, func)
401 self.addCleanup(tp.stop)
403 event.wait(self.getTimeout())
404 self.assertEqual(len(threadIds), 2)
405 self.assertEqual(threadIds[0], threadIds[1])
408 def test_callbackContext(self):
410 The context L{ThreadPool.callInThreadWithCallback} is invoked in is
411 shared by the context the callable and C{onResult} callback are
414 myctx = context.theContextTracker.currentContext().contexts[-1]
415 myctx['testing'] = 'this must be present'
419 event = threading.Event()
421 def onResult(success, result):
422 ctx = context.theContextTracker.currentContext().contexts[-1]
427 ctx = context.theContextTracker.currentContext().contexts[-1]
430 tp = threadpool.ThreadPool(0, 1)
431 tp.callInThreadWithCallback(onResult, func)
433 self.addCleanup(tp.stop)
435 event.wait(self.getTimeout())
437 self.assertEqual(len(contexts), 2)
438 self.assertEqual(myctx, contexts[0])
439 self.assertEqual(myctx, contexts[1])
442 def test_existingWork(self):
444 Work added to the threadpool before its start should be executed once
445 the threadpool is started: this is ensured by trying to release a lock
448 waiter = threading.Lock()
451 tp = threadpool.ThreadPool(0, 1)
452 tp.callInThread(waiter.release) # before start()
456 self._waitForLock(waiter)
462 class RaceConditionTestCase(unittest.TestCase):
464 self.event = threading.Event()
465 self.threadpool = threadpool.ThreadPool(0, 10)
466 self.threadpool.start()
471 self.threadpool.stop()
475 def test_synchronization(self):
477 Test a race condition: ensure that actions run in the pool synchronize
478 with actions run in the main thread.
480 timeout = self.getTimeout()
481 self.threadpool.callInThread(self.event.set)
482 self.event.wait(timeout)
485 self.threadpool.callInThread(self.event.wait)
486 self.threadpool.callInThread(self.event.set)
487 self.event.wait(timeout)
488 if not self.event.isSet():
490 self.fail("Actions not synchronized")
493 def test_singleThread(self):
495 The submission of a new job to a thread pool in response to the
496 C{onResult} callback does not cause a new thread to be added to the
499 This requires that the thread which calls C{onResult} to have first
500 marked itself as available so that when the new job is queued, that
501 thread may be considered to run it. This is desirable so that when
502 only N jobs are ever being executed in the thread pool at once only
503 N threads will ever be created.
505 # Ensure no threads running
506 self.assertEqual(self.threadpool.workers, 0)
508 loopDeferred = Deferred()
510 def onResult(success, counter):
511 reactor.callFromThread(submit, counter)
515 self.threadpool.callInThreadWithCallback(
516 onResult, lambda: counter - 1)
518 loopDeferred.callback(None)
521 # Ensure there is only one thread running.
522 self.assertEqual(self.threadpool.workers, 1)
524 loopDeferred.addCallback(cbLoop)