1 # -*- test-case-name: twisted.internet.test.test_pollingfile -*-
2 # Copyright (c) Twisted Matrix Laboratories.
3 # See LICENSE for details.
6 Implements a simple polling interface for file descriptors that don't work with
7 select() - this is pretty much only useful on Windows.
10 from zope.interface import implements
12 from twisted.internet.interfaces import IConsumer, IPushProducer
# Smallest delay the polling timer may be rescheduled with; the adaptive
# timeout is clamped so that it never drops below this value.
MIN_TIMEOUT = 1e-9
20 class _PollableResource:
33 # Everything is private here because it is really an implementation detail.
35 def __init__(self, reactor):
36 self.reactor = reactor
38 self._pollTimer = None
39 self._currentTimeout = MAX_TIMEOUT
def _addPollableResource(self, res):
    """
    Add C{res} to the list of resources handled by this timer, then call
    C{_checkPollingState} so the polling state is re-examined.

    @param res: the resource to start tracking.
    """
    tracked = self._resources
    tracked.append(res)
    self._checkPollingState()
46 def _checkPollingState(self):
47 for resource in self._resources:
def _startPolling(self):
    """
    Schedule the poll timer unless one is already pending.

    The value returned by C{_reschedule} is stored as C{_pollTimer}.
    """
    if self._pollTimer is not None:
        # A timer is already outstanding; do not schedule a second one.
        return
    self._pollTimer = self._reschedule()
def _stopPolling(self):
    """
    Cancel the pending poll timer, if there is one, and forget it.
    """
    timer = self._pollTimer
    if timer is None:
        # Nothing scheduled, so there is nothing to cancel.
        return
    timer.cancel()
    self._pollTimer = None
68 self._checkPollingState()
70 def _reschedule(self):
72 return self.reactor.callLater(self._currentTimeout, self._pollEvent)
77 for resource in self._resources:
79 workUnits += resource.checkWork()
80 # Check AFTER work has been done
82 anyActive.append(resource)
84 newTimeout = self._currentTimeout
86 newTimeout = self._currentTimeout / (workUnits + 1.)
87 if newTimeout < MIN_TIMEOUT:
88 newTimeout = MIN_TIMEOUT
90 newTimeout = self._currentTimeout * 2.
91 if newTimeout > MAX_TIMEOUT:
92 newTimeout = MAX_TIMEOUT
93 self._currentTimeout = newTimeout
95 self._pollTimer = self._reschedule()
98 # If we ever (let's hope not) need the above functionality on UNIX, this could
99 # be factored into a different module.
106 class _PollableReadPipe(_PollableResource):
108 implements(IPushProducer)
110 def __init__(self, pipe, receivedCallback, lostCallback):
111 # security attributes for pipes
113 self.receivedCallback = receivedCallback
114 self.lostCallback = lostCallback
122 buffer, bytesToRead, result = win32pipe.PeekNamedPipe(self.pipe, 1)
123 # finished = (result == -1)
126 hr, data = win32file.ReadFile(self.pipe, bytesToRead, None)
127 fullDataRead.append(data)
128 except win32api.error:
132 dataBuf = ''.join(fullDataRead)
134 self.receivedCallback(dataBuf)
145 win32api.CloseHandle(self.pipe)
146 except pywintypes.error:
147 # You can't close std handles...?
150 def stopProducing(self):
153 def pauseProducing(self):
156 def resumeProducing(self):
# Threshold, in bytes (64 KiB), that write() compares the total length of the
# queued output against.
FULL_BUFFER_SIZE = 1024 * 64
162 class _PollableWritePipe(_PollableResource):
164 implements(IConsumer)
166 def __init__(self, writePipe, lostCallback):
167 self.disconnecting = False
169 self.producerPaused = 0
170 self.streamingProducer = 0
172 self.writePipe = writePipe
173 self.lostCallback = lostCallback
175 win32pipe.SetNamedPipeHandleState(writePipe,
176 win32pipe.PIPE_NOWAIT,
179 except pywintypes.error:
180 # Maybe it's an invalid handle. Who knows.
184 self.disconnecting = True
def bufferFull(self):
    """
    Ask the registered producer, if any, to pause, and record that it has
    been paused.
    """
    producer = self.producer
    if producer is None:
        # No producer registered: nobody to throttle.
        return
    self.producerPaused = 1
    producer.pauseProducing()
def bufferEmpty(self):
    """
    Ask the registered producer, if any, to resume now that the output
    buffer has drained.

    A non-streaming producer is always asked to resume. A streaming
    producer is only asked to resume if it is currently marked as paused
    (see L{bufferFull}).

    @return: C{True} if the producer was asked to resume producing,
        C{False} otherwise.
    """
    if self.producer is None:
        return False
    if self.streamingProducer and not self.producerPaused:
        # A streaming producer that was never paused needs no nudge.
        return False
    # The pause flag lives on this consumer (bufferFull sets it on self),
    # so it must be cleared here, not on the producer object.
    self.producerPaused = 0
    self.producer.resumeProducing()
    return True
199 # almost-but-not-quite-exact copy-paste from abstract.FileDescriptor... ugh
201 def registerProducer(self, producer, streaming):
202 """Register to receive data from a producer.
204 This sets this selectable to be a consumer for a producer. When this
205 selectable runs out of data on a write() call, it will ask the producer
206 to resumeProducing(). A producer should implement the IProducer
209 FileDescriptor provides some infrastructure for producer methods.
211 if self.producer is not None:
213 "Cannot register producer %s, because producer %s was never "
214 "unregistered." % (producer, self.producer))
216 producer.stopProducing()
218 self.producer = producer
219 self.streamingProducer = streaming
221 producer.resumeProducing()
223 def unregisterProducer(self):
224 """Stop consuming data from a producer, without disconnecting.
228 def writeConnectionLost(self):
231 win32api.CloseHandle(self.writePipe)
232 except pywintypes.error:
def writeSequence(self, seq):
    """
    Append a C{list} or C{tuple} of bytes to the output buffer.

    @param seq: C{list} or C{tuple} of C{str} instances to be appended to
        the output buffer.

    @raise TypeError: If C{seq} contains C{unicode}.
    """
    # Use isinstance(), as write() does, so that subclasses of unicode are
    # rejected too instead of slipping past an exact type comparison.
    if any(isinstance(chunk, unicode) for chunk in seq):
        raise TypeError("Unicode not allowed in output buffer.")
    self.outQueue.extend(seq)
252 def write(self, data):
254 Append some bytes to the output buffer.
256 @param data: C{str} to be appended to the output buffer.
259 @raise TypeError: If C{data} is C{unicode} instead of C{str}.
261 if isinstance(data, unicode):
262 raise TypeError("Unicode not allowed in output buffer.")
263 if self.disconnecting:
265 self.outQueue.append(data)
266 if sum(map(len, self.outQueue)) > FULL_BUFFER_SIZE:
272 if not self.outQueue:
273 if self.disconnecting:
274 self.writeConnectionLost()
277 win32file.WriteFile(self.writePipe, '', None)
278 except pywintypes.error:
279 self.writeConnectionLost()
280 return numBytesWritten
282 data = self.outQueue.pop(0)
285 errCode, nBytesWritten = win32file.WriteFile(self.writePipe,
287 except win32api.error:
288 self.writeConnectionLost()
291 # assert not errCode, "wtf an error code???"
292 numBytesWritten += nBytesWritten
293 if len(data) > nBytesWritten:
294 self.outQueue.insert(0, data[nBytesWritten:])
297 resumed = self.bufferEmpty()
298 if not resumed and self.disconnecting:
299 self.writeConnectionLost()
300 return numBytesWritten