Initial import to Tizen
[profile/ivi/python-twisted.git] / twisted / test / test_threadpool.py
1 # Copyright (c) Twisted Matrix Laboratories.
2 # See LICENSE for details.
3
4 """
5 Tests for L{twisted.python.threadpool}
6 """
7
8 import pickle, time, weakref, gc, threading
9
10 from twisted.trial import unittest
11 from twisted.python import threadpool, threadable, failure, context
12 from twisted.internet import reactor
13 from twisted.internet.defer import Deferred
14
15 #
16 # See the end of this module for the remainder of the imports.
17 #
18
class Synchronization(object):
    """
    Helper whose C{run} method notices when it is entered while another
    caller is still inside it.

    @cvar failures: the default failure count; each time C{run} finds
        C{lock} already held the count on the instance is incremented.
    @cvar synchronized: the names of the methods which are expected to be
        invoked serially (only C{"run"}).

    @ivar N: the number of completed C{run} calls after which C{waiting} is
        released.
    @ivar waiting: a lock, released once C{run} has completed C{N} times.
    @ivar lock: a lock used both to detect concurrent entry into C{run} and
        to guard C{runs}.
    @ivar runs: a list which gains one entry per completed C{run} call.
    """
    failures = 0

    def __init__(self, N, waiting):
        self.N = N
        self.waiting = waiting
        self.lock = threading.Lock()
        self.runs = []

    def run(self):
        """
        Record one run, counting a failure if another caller is found to be
        holding C{lock}, and release C{waiting} after the C{N}th run.
        """
        # The actual check: calls are meant to arrive one at a time, even
        # when they come from several threads.  If they do, a non-blocking
        # acquire always succeeds; if they do not, it may find the lock held
        # by somebody else.
        uncontended = self.lock.acquire(False)
        if not uncontended:
            self.failures += 1
        else:
            if len(self.runs) % 5 == 0:
                # Empirically chosen delay which makes a quick failure most
                # likely when serialization is broken.
                time.sleep(0.0002)
            self.lock.release()

        # The remainder is bookkeeping unrelated to the check itself: it
        # exists only so that the test method can be woken up once every
        # run has happened.
        self.lock.acquire()
        self.runs.append(None)
        finished = len(self.runs) == self.N
        if finished:
            self.waiting.release()
        self.lock.release()

    synchronized = ["run"]
# Hand the class to threadable.synchronize; presumably this wraps the methods
# named in Synchronization.synchronized (just "run") so that calls to them
# are serialized -- confirm against twisted.python.threadable.
threadable.synchronize(Synchronization)
55
56
57
58 class ThreadPoolTestCase(unittest.TestCase):
59     """
60     Test threadpools.
61     """
62     def _waitForLock(self, lock):
63         for i in xrange(1000000):
64             if lock.acquire(False):
65                 break
66             time.sleep(1e-5)
67         else:
68             self.fail("A long time passed without succeeding")
69
70
71     def test_attributes(self):
72         """
73         L{ThreadPool.min} and L{ThreadPool.max} are set to the values passed to
74         L{ThreadPool.__init__}.
75         """
76         pool = threadpool.ThreadPool(12, 22)
77         self.assertEqual(pool.min, 12)
78         self.assertEqual(pool.max, 22)
79
80
81     def test_start(self):
82         """
83         L{ThreadPool.start} creates the minimum number of threads specified.
84         """
85         pool = threadpool.ThreadPool(0, 5)
86         pool.start()
87         self.addCleanup(pool.stop)
88         self.assertEqual(len(pool.threads), 0)
89
90         pool = threadpool.ThreadPool(3, 10)
91         self.assertEqual(len(pool.threads), 0)
92         pool.start()
93         self.addCleanup(pool.stop)
94         self.assertEqual(len(pool.threads), 3)
95
96
97     def test_threadCreationArguments(self):
98         """
99         Test that creating threads in the threadpool with application-level
100         objects as arguments doesn't results in those objects never being
101         freed, with the thread maintaining a reference to them as long as it
102         exists.
103         """
104         tp = threadpool.ThreadPool(0, 1)
105         tp.start()
106         self.addCleanup(tp.stop)
107
108         # Sanity check - no threads should have been started yet.
109         self.assertEqual(tp.threads, [])
110
111         # Here's our function
112         def worker(arg):
113             pass
114         # weakref needs an object subclass
115         class Dumb(object):
116             pass
117         # And here's the unique object
118         unique = Dumb()
119
120         workerRef = weakref.ref(worker)
121         uniqueRef = weakref.ref(unique)
122
123         # Put some work in
124         tp.callInThread(worker, unique)
125
126         # Add an event to wait completion
127         event = threading.Event()
128         tp.callInThread(event.set)
129         event.wait(self.getTimeout())
130
131         del worker
132         del unique
133         gc.collect()
134         self.assertEqual(uniqueRef(), None)
135         self.assertEqual(workerRef(), None)
136
137
138     def test_threadCreationArgumentsCallInThreadWithCallback(self):
139         """
140         As C{test_threadCreationArguments} above, but for
141         callInThreadWithCallback.
142         """
143
144         tp = threadpool.ThreadPool(0, 1)
145         tp.start()
146         self.addCleanup(tp.stop)
147
148         # Sanity check - no threads should have been started yet.
149         self.assertEqual(tp.threads, [])
150
151         # this holds references obtained in onResult
152         refdict = {} # name -> ref value
153
154         onResultWait = threading.Event()
155         onResultDone = threading.Event()
156
157         resultRef = []
158
159         # result callback
160         def onResult(success, result):
161             onResultWait.wait(self.getTimeout())
162             refdict['workerRef'] = workerRef()
163             refdict['uniqueRef'] = uniqueRef()
164             onResultDone.set()
165             resultRef.append(weakref.ref(result))
166
167         # Here's our function
168         def worker(arg, test):
169             return Dumb()
170
171         # weakref needs an object subclass
172         class Dumb(object):
173             pass
174
175         # And here's the unique object
176         unique = Dumb()
177
178         onResultRef = weakref.ref(onResult)
179         workerRef = weakref.ref(worker)
180         uniqueRef = weakref.ref(unique)
181
182         # Put some work in
183         tp.callInThreadWithCallback(onResult, worker, unique, test=unique)
184
185         del worker
186         del unique
187         gc.collect()
188
189         # let onResult collect the refs
190         onResultWait.set()
191         # wait for onResult
192         onResultDone.wait(self.getTimeout())
193
194         self.assertEqual(uniqueRef(), None)
195         self.assertEqual(workerRef(), None)
196
197         # XXX There's a race right here - has onResult in the worker thread
198         # returned and the locals in _worker holding it and the result been
199         # deleted yet?
200
201         del onResult
202         gc.collect()
203         self.assertEqual(onResultRef(), None)
204         self.assertEqual(resultRef[0](), None)
205
206
207     def test_persistence(self):
208         """
209         Threadpools can be pickled and unpickled, which should preserve the
210         number of threads and other parameters.
211         """
212         pool = threadpool.ThreadPool(7, 20)
213
214         self.assertEqual(pool.min, 7)
215         self.assertEqual(pool.max, 20)
216
217         # check that unpickled threadpool has same number of threads
218         copy = pickle.loads(pickle.dumps(pool))
219
220         self.assertEqual(copy.min, 7)
221         self.assertEqual(copy.max, 20)
222
223
224     def _threadpoolTest(self, method):
225         """
226         Test synchronization of calls made with C{method}, which should be
227         one of the mechanisms of the threadpool to execute work in threads.
228         """
229         # This is a schizophrenic test: it seems to be trying to test
230         # both the callInThread()/dispatch() behavior of the ThreadPool as well
231         # as the serialization behavior of threadable.synchronize().  It
232         # would probably make more sense as two much simpler tests.
233         N = 10
234
235         tp = threadpool.ThreadPool()
236         tp.start()
237         self.addCleanup(tp.stop)
238
239         waiting = threading.Lock()
240         waiting.acquire()
241         actor = Synchronization(N, waiting)
242
243         for i in xrange(N):
244             method(tp, actor)
245
246         self._waitForLock(waiting)
247
248         self.failIf(actor.failures, "run() re-entered %d times" %
249                                     (actor.failures,))
250
251
252     def test_callInThread(self):
253         """
254         Call C{_threadpoolTest} with C{callInThread}.
255         """
256         return self._threadpoolTest(
257             lambda tp, actor: tp.callInThread(actor.run))
258
259
260     def test_callInThreadException(self):
261         """
262         L{ThreadPool.callInThread} logs exceptions raised by the callable it
263         is passed.
264         """
265         class NewError(Exception):
266             pass
267
268         def raiseError():
269             raise NewError()
270
271         tp = threadpool.ThreadPool(0, 1)
272         tp.callInThread(raiseError)
273         tp.start()
274         tp.stop()
275
276         errors = self.flushLoggedErrors(NewError)
277         self.assertEqual(len(errors), 1)
278
279
280     def test_callInThreadWithCallback(self):
281         """
282         L{ThreadPool.callInThreadWithCallback} calls C{onResult} with a
283         two-tuple of C{(True, result)} where C{result} is the value returned
284         by the callable supplied.
285         """
286         waiter = threading.Lock()
287         waiter.acquire()
288
289         results = []
290
291         def onResult(success, result):
292             waiter.release()
293             results.append(success)
294             results.append(result)
295
296         tp = threadpool.ThreadPool(0, 1)
297         tp.callInThreadWithCallback(onResult, lambda : "test")
298         tp.start()
299
300         try:
301             self._waitForLock(waiter)
302         finally:
303             tp.stop()
304
305         self.assertTrue(results[0])
306         self.assertEqual(results[1], "test")
307
308
309     def test_callInThreadWithCallbackExceptionInCallback(self):
310         """
311         L{ThreadPool.callInThreadWithCallback} calls C{onResult} with a
312         two-tuple of C{(False, failure)} where C{failure} represents the
313         exception raised by the callable supplied.
314         """
315         class NewError(Exception):
316             pass
317
318         def raiseError():
319             raise NewError()
320
321         waiter = threading.Lock()
322         waiter.acquire()
323
324         results = []
325
326         def onResult(success, result):
327             waiter.release()
328             results.append(success)
329             results.append(result)
330
331         tp = threadpool.ThreadPool(0, 1)
332         tp.callInThreadWithCallback(onResult, raiseError)
333         tp.start()
334
335         try:
336             self._waitForLock(waiter)
337         finally:
338             tp.stop()
339
340         self.assertFalse(results[0])
341         self.assertTrue(isinstance(results[1], failure.Failure))
342         self.assertTrue(issubclass(results[1].type, NewError))
343
344
345     def test_callInThreadWithCallbackExceptionInOnResult(self):
346         """
347         L{ThreadPool.callInThreadWithCallback} logs the exception raised by
348         C{onResult}.
349         """
350         class NewError(Exception):
351             pass
352
353         waiter = threading.Lock()
354         waiter.acquire()
355
356         results = []
357
358         def onResult(success, result):
359             results.append(success)
360             results.append(result)
361             raise NewError()
362
363         tp = threadpool.ThreadPool(0, 1)
364         tp.callInThreadWithCallback(onResult, lambda : None)
365         tp.callInThread(waiter.release)
366         tp.start()
367
368         try:
369             self._waitForLock(waiter)
370         finally:
371             tp.stop()
372
373         errors = self.flushLoggedErrors(NewError)
374         self.assertEqual(len(errors), 1)
375
376         self.assertTrue(results[0])
377         self.assertEqual(results[1], None)
378
379
380     def test_callbackThread(self):
381         """
382         L{ThreadPool.callInThreadWithCallback} calls the function it is
383         given and the C{onResult} callback in the same thread.
384         """
385         threadIds = []
386
387         import thread
388
389         event = threading.Event()
390
391         def onResult(success, result):
392             threadIds.append(thread.get_ident())
393             event.set()
394
395         def func():
396             threadIds.append(thread.get_ident())
397
398         tp = threadpool.ThreadPool(0, 1)
399         tp.callInThreadWithCallback(onResult, func)
400         tp.start()
401         self.addCleanup(tp.stop)
402
403         event.wait(self.getTimeout())
404         self.assertEqual(len(threadIds), 2)
405         self.assertEqual(threadIds[0], threadIds[1])
406
407
408     def test_callbackContext(self):
409         """
410         The context L{ThreadPool.callInThreadWithCallback} is invoked in is
411         shared by the context the callable and C{onResult} callback are
412         invoked in.
413         """
414         myctx = context.theContextTracker.currentContext().contexts[-1]
415         myctx['testing'] = 'this must be present'
416
417         contexts = []
418
419         event = threading.Event()
420
421         def onResult(success, result):
422             ctx = context.theContextTracker.currentContext().contexts[-1]
423             contexts.append(ctx)
424             event.set()
425
426         def func():
427             ctx = context.theContextTracker.currentContext().contexts[-1]
428             contexts.append(ctx)
429
430         tp = threadpool.ThreadPool(0, 1)
431         tp.callInThreadWithCallback(onResult, func)
432         tp.start()
433         self.addCleanup(tp.stop)
434
435         event.wait(self.getTimeout())
436
437         self.assertEqual(len(contexts), 2)
438         self.assertEqual(myctx, contexts[0])
439         self.assertEqual(myctx, contexts[1])
440
441
442     def test_existingWork(self):
443         """
444         Work added to the threadpool before its start should be executed once
445         the threadpool is started: this is ensured by trying to release a lock
446         previously acquired.
447         """
448         waiter = threading.Lock()
449         waiter.acquire()
450
451         tp = threadpool.ThreadPool(0, 1)
452         tp.callInThread(waiter.release) # before start()
453         tp.start()
454
455         try:
456             self._waitForLock(waiter)
457         finally:
458             tp.stop()
459
460
461
462 class RaceConditionTestCase(unittest.TestCase):
463     def setUp(self):
464         self.event = threading.Event()
465         self.threadpool = threadpool.ThreadPool(0, 10)
466         self.threadpool.start()
467
468
469     def tearDown(self):
470         del self.event
471         self.threadpool.stop()
472         del self.threadpool
473
474
475     def test_synchronization(self):
476         """
477         Test a race condition: ensure that actions run in the pool synchronize
478         with actions run in the main thread.
479         """
480         timeout = self.getTimeout()
481         self.threadpool.callInThread(self.event.set)
482         self.event.wait(timeout)
483         self.event.clear()
484         for i in range(3):
485             self.threadpool.callInThread(self.event.wait)
486         self.threadpool.callInThread(self.event.set)
487         self.event.wait(timeout)
488         if not self.event.isSet():
489             self.event.set()
490             self.fail("Actions not synchronized")
491
492
493     def test_singleThread(self):
494         """
495         The submission of a new job to a thread pool in response to the
496         C{onResult} callback does not cause a new thread to be added to the
497         thread pool.
498
499         This requires that the thread which calls C{onResult} to have first
500         marked itself as available so that when the new job is queued, that
501         thread may be considered to run it.  This is desirable so that when
502         only N jobs are ever being executed in the thread pool at once only
503         N threads will ever be created.
504         """
505         # Ensure no threads running
506         self.assertEqual(self.threadpool.workers, 0)
507
508         loopDeferred = Deferred()
509
510         def onResult(success, counter):
511             reactor.callFromThread(submit, counter)
512
513         def submit(counter):
514             if counter:
515                 self.threadpool.callInThreadWithCallback(
516                     onResult, lambda: counter - 1)
517             else:
518                 loopDeferred.callback(None)
519
520         def cbLoop(ignored):
521             # Ensure there is only one thread running.
522             self.assertEqual(self.threadpool.workers, 1)
523
524         loopDeferred.addCallback(cbLoop)
525         submit(10)
526         return loopDeferred