1 # Copyright (c) Twisted Matrix Laboratories.
2 # See LICENSE for details.
"""
Tests for L{twisted.internet.epollreactor}.
"""
from twisted.trial.unittest import TestCase

try:
    from twisted.internet.epollreactor import _ContinuousPolling
except ImportError:
    # The epoll reactor cannot be imported in this environment.  Leave a
    # None marker behind so the tests can detect that and skip themselves
    # instead of failing at import time.
    _ContinuousPolling = None
from twisted.internet.task import Clock
from twisted.internet.error import ConnectionDone
class Descriptor(object):
    """
    Records reads and writes, as if it were a C{FileDescriptor}.

    @ivar events: The names of the notifications received so far
        (C{"read"}, C{"write"} or C{"lost"}), in the order they arrived.
    """
    # NOTE(review): only the methods exercised by the visible tests are
    # defined here; confirm no other C{FileDescriptor} methods are needed.

    def __init__(self):
        # Every instance needs its own list; the tests compare it to [].
        self.events = []


    def doRead(self):
        """
        Record that a read was requested.
        """
        self.events.append("read")


    def doWrite(self):
        """
        Record that a write was requested.
        """
        self.events.append("write")


    def connectionLost(self, reason):
        """
        Record that the connection was lost.

        @param reason: The reason for the disconnection.  C{reason.trap} is
            called with L{ConnectionDone} before the event is recorded.
        """
        reason.trap(ConnectionDone)
        self.events.append("lost")
class ContinuousPollingTests(TestCase):
    """
    L{_ContinuousPolling} can be used to read and write from C{FileDescriptor}
    objects.
    """

    def test_addReader(self):
        """
        Adding a reader when there was previously no reader starts up a
        C{LoopingCall}.
        """
        poller = _ContinuousPolling(Clock())
        self.assertEqual(poller._loop, None)
        reader = object()
        self.assertFalse(poller.isReading(reader))
        poller.addReader(reader)
        self.assertNotEqual(poller._loop, None)
        self.assertTrue(poller._loop.running)
        self.assertIdentical(poller._loop.clock, poller._reactor)
        self.assertTrue(poller.isReading(reader))


    def test_addWriter(self):
        """
        Adding a writer when there was previously no writer starts up a
        C{LoopingCall}.
        """
        poller = _ContinuousPolling(Clock())
        self.assertEqual(poller._loop, None)
        writer = object()
        self.assertFalse(poller.isWriting(writer))
        poller.addWriter(writer)
        self.assertNotEqual(poller._loop, None)
        self.assertTrue(poller._loop.running)
        self.assertIdentical(poller._loop.clock, poller._reactor)
        self.assertTrue(poller.isWriting(writer))


    def test_removeReader(self):
        """
        Removing a reader stops the C{LoopingCall}.
        """
        poller = _ContinuousPolling(Clock())
        reader = object()
        poller.addReader(reader)
        poller.removeReader(reader)
        self.assertEqual(poller._loop, None)
        self.assertEqual(poller._reactor.getDelayedCalls(), [])
        self.assertFalse(poller.isReading(reader))


    def test_removeWriter(self):
        """
        Removing a writer stops the C{LoopingCall}.
        """
        poller = _ContinuousPolling(Clock())
        writer = object()
        poller.addWriter(writer)
        poller.removeWriter(writer)
        self.assertEqual(poller._loop, None)
        self.assertEqual(poller._reactor.getDelayedCalls(), [])
        self.assertFalse(poller.isWriting(writer))


    def test_removeUnknown(self):
        """
        Removing unknown readers and writers silently does nothing.
        """
        poller = _ContinuousPolling(Clock())
        poller.removeWriter(object())
        poller.removeReader(object())


    def test_multipleReadersAndWriters(self):
        """
        Adding multiple readers and writers results in a single
        C{LoopingCall}.
        """
        poller = _ContinuousPolling(Clock())
        writer = object()
        poller.addWriter(writer)
        self.assertNotEqual(poller._loop, None)
        poller.addWriter(object())
        self.assertNotEqual(poller._loop, None)
        poller.addReader(object())
        self.assertNotEqual(poller._loop, None)
        poller.addReader(object())
        # Dropping one of several descriptors must leave the loop running.
        poller.removeWriter(writer)
        self.assertNotEqual(poller._loop, None)
        self.assertTrue(poller._loop.running)
        self.assertEqual(len(poller._reactor.getDelayedCalls()), 1)


    def test_readerPolling(self):
        """
        Adding a reader causes its C{doRead} to be called once each time the
        clock is advanced by the step used below.
        """
        reactor = Clock()
        poller = _ContinuousPolling(reactor)
        desc = Descriptor()
        poller.addReader(desc)
        self.assertEqual(desc.events, [])
        reactor.advance(0.00001)
        self.assertEqual(desc.events, ["read"])
        reactor.advance(0.00001)
        self.assertEqual(desc.events, ["read", "read"])
        reactor.advance(0.00001)
        self.assertEqual(desc.events, ["read", "read", "read"])


    def test_writerPolling(self):
        """
        Adding a writer causes its C{doWrite} to be called once each time the
        clock is advanced by the step used below.
        """
        reactor = Clock()
        poller = _ContinuousPolling(reactor)
        desc = Descriptor()
        poller.addWriter(desc)
        self.assertEqual(desc.events, [])
        reactor.advance(0.001)
        self.assertEqual(desc.events, ["write"])
        reactor.advance(0.001)
        self.assertEqual(desc.events, ["write", "write"])
        reactor.advance(0.001)
        self.assertEqual(desc.events, ["write", "write", "write"])


    def test_connectionLostOnRead(self):
        """
        If a C{doRead} returns a value indicating disconnection,
        C{connectionLost} is called on it.
        """
        reactor = Clock()
        poller = _ContinuousPolling(reactor)
        desc = Descriptor()
        # Replace the recording doRead with one that reports disconnection.
        desc.doRead = lambda: ConnectionDone()
        poller.addReader(desc)
        self.assertEqual(desc.events, [])
        reactor.advance(0.001)
        self.assertEqual(desc.events, ["lost"])


    def test_connectionLostOnWrite(self):
        """
        If a C{doWrite} returns a value indicating disconnection,
        C{connectionLost} is called on it.
        """
        reactor = Clock()
        poller = _ContinuousPolling(reactor)
        desc = Descriptor()
        # Replace the recording doWrite with one that reports disconnection.
        desc.doWrite = lambda: ConnectionDone()
        poller.addWriter(desc)
        self.assertEqual(desc.events, [])
        reactor.advance(0.001)
        self.assertEqual(desc.events, ["lost"])


    def test_removeAll(self):
        """
        L{_ContinuousPolling.removeAll} removes all descriptors and returns
        the readers and writers.
        """
        poller = _ContinuousPolling(Clock())
        reader = object()
        writer = object()
        both = object()
        poller.addReader(reader)
        poller.addReader(both)
        poller.addWriter(writer)
        poller.addWriter(both)
        removed = poller.removeAll()
        self.assertEqual(poller.getReaders(), [])
        self.assertEqual(poller.getWriters(), [])
        # "both" was registered twice but must only be reported once.
        self.assertEqual(len(removed), 3)
        self.assertEqual(set(removed), set([reader, writer, both]))


    def test_getReaders(self):
        """
        L{_ContinuousPolling.getReaders} returns a list of the read
        descriptors.
        """
        poller = _ContinuousPolling(Clock())
        reader = object()
        poller.addReader(reader)
        self.assertIn(reader, poller.getReaders())


    def test_getWriters(self):
        """
        L{_ContinuousPolling.getWriters} returns a list of the write
        descriptors.
        """
        poller = _ContinuousPolling(Clock())
        writer = object()
        poller.addWriter(writer)
        self.assertIn(writer, poller.getWriters())

    # Without _ContinuousPolling none of the tests above can run, so mark
    # the whole case as skipped.
    if _ContinuousPolling is None:
        skip = "epoll not supported in this environment."