Imported Upstream version 12.1.0
[contrib/python-twisted.git] / twisted / test / test_threadable.py
1 # Copyright (c) Twisted Matrix Laboratories.
2 # See LICENSE for details.
3
4 import sys, pickle
5
6 try:
7     import threading
8 except ImportError:
9     threading = None
10
11 from twisted.trial import unittest
12 from twisted.python import threadable
13 from twisted.internet import defer, reactor
14
15 class TestObject:
16     synchronized = ['aMethod']
17
18     x = -1
19     y = 1
20
21     def aMethod(self):
22         for i in xrange(10):
23             self.x, self.y = self.y, self.x
24             self.z = self.x + self.y
25             assert self.z == 0, "z == %d, not 0 as expected" % (self.z,)
26
27 threadable.synchronize(TestObject)
28
29 class SynchronizationTestCase(unittest.TestCase):
30     def setUp(self):
31         """
32         Reduce the CPython check interval so that thread switches happen much
33         more often, hopefully exercising more possible race conditions.  Also,
34         delay actual test startup until the reactor has been started.
35         """
36         if hasattr(sys, 'getcheckinterval'):
37             self.addCleanup(sys.setcheckinterval, sys.getcheckinterval())
38             sys.setcheckinterval(7)
39         # XXX This is a trial hack.  We need to make sure the reactor
40         # actually *starts* for isInIOThread() to have a meaningful result.
41         # Returning a Deferred here should force that to happen, if it has
42         # not happened already.  In the future, this should not be
43         # necessary.
44         d = defer.Deferred()
45         reactor.callLater(0, d.callback, None)
46         return d
47
48
49     def testIsInIOThread(self):
50         foreignResult = []
51         t = threading.Thread(target=lambda: foreignResult.append(threadable.isInIOThread()))
52         t.start()
53         t.join()
54         self.failIf(foreignResult[0], "Non-IO thread reported as IO thread")
55         self.failUnless(threadable.isInIOThread(), "IO thread reported as not IO thread")
56
57
58     def testThreadedSynchronization(self):
59         o = TestObject()
60
61         errors = []
62
63         def callMethodLots():
64             try:
65                 for i in xrange(1000):
66                     o.aMethod()
67             except AssertionError, e:
68                 errors.append(str(e))
69
70         threads = []
71         for x in range(5):
72             t = threading.Thread(target=callMethodLots)
73             threads.append(t)
74             t.start()
75
76         for t in threads:
77             t.join()
78
79         if errors:
80             raise unittest.FailTest(errors)
81
82     def testUnthreadedSynchronization(self):
83         o = TestObject()
84         for i in xrange(1000):
85             o.aMethod()
86
87 class SerializationTestCase(unittest.TestCase):
88     def testPickling(self):
89         lock = threadable.XLock()
90         lockType = type(lock)
91         lockPickle = pickle.dumps(lock)
92         newLock = pickle.loads(lockPickle)
93         self.failUnless(isinstance(newLock, lockType))
94
95     def testUnpickling(self):
96         lockPickle = 'ctwisted.python.threadable\nunpickle_lock\np0\n(tp1\nRp2\n.'
97         lock = pickle.loads(lockPickle)
98         newPickle = pickle.dumps(lock, 2)
99         newLock = pickle.loads(newPickle)
100
101 if threading is None:
102     SynchronizationTestCase.testThreadedSynchronization.skip = "Platform lacks thread support"
103     SerializationTestCase.testPickling.skip = "Platform lacks thread support"