1 # Copyright (c) Twisted Matrix Laboratories.
2 # See LICENSE for details.
5 Abstract file handle class
8 from twisted.internet import main, error, interfaces
9 from twisted.internet.abstract import _ConsumerMixin, _LogOwner
10 from twisted.python import failure
12 from zope.interface import implements
15 from twisted.internet.iocpreactor.const import ERROR_HANDLE_EOF
16 from twisted.internet.iocpreactor.const import ERROR_IO_PENDING
17 from twisted.internet.iocpreactor import iocpsupport as _iocp
21 class FileHandle(_ConsumerMixin, _LogOwner):
23 File handle that can read and write asynchronously
25 implements(interfaces.IPushProducer, interfaces.IConsumer,
26 interfaces.ITransport, interfaces.IHalfCloseableDescriptor)
31 dynamicReadBuffers = True # set this to false if subclass doesn't do iovecs
33 _readSize = 0 # how much data we have in the read buffer
35 _readScheduledInOS = False
38 def startReading(self):
39 self.reactor.addActiveHandle(self)
40 if not self._readScheduled and not self.reading:
42 self._readScheduled = self.reactor.callLater(0,
46 def stopReading(self):
47 if self._readScheduled:
48 self._readScheduled.cancel()
49 self._readScheduled = None
53 def _resumeReading(self):
54 self._readScheduled = None
55 if self._dispatchData() and not self._readScheduledInOS:
59 def _dispatchData(self):
61 Dispatch previously read data. Return True if self.reading and we don't
64 if not self._readSize:
67 full_buffers = size // self.readBufferSize
68 while self._readNextBuffer < full_buffers:
69 self.dataReceived(self._readBuffers[self._readNextBuffer])
70 self._readNextBuffer += 1
73 remainder = size % self.readBufferSize
75 self.dataReceived(buffer(self._readBuffers[full_buffers],
77 if self.dynamicReadBuffers:
78 total_buffer_size = self.readBufferSize * len(self._readBuffers)
79 # we have one buffer too many
80 if size < total_buffer_size - self.readBufferSize:
81 del self._readBuffers[-1]
82 # we filled all buffers, so allocate one more
83 elif (size == total_buffer_size and
84 len(self._readBuffers) < self.maxReadBuffers):
85 self._readBuffers.append(_iocp.AllocateReadBuffer(
87 self._readNextBuffer = 0
92 def _cbRead(self, rc, bytes, evt):
93 self._readScheduledInOS = False
94 if self._handleRead(rc, bytes, evt):
98 def _handleRead(self, rc, bytes, evt):
100 Returns False if we should stop reading for now
102 if self.disconnected:
104 # graceful disconnection
105 if (not (rc or bytes)) or rc in (errno.WSAEDISCON, ERROR_HANDLE_EOF):
106 self.reactor.removeActiveHandle(self)
107 self.readConnectionLost(failure.Failure(main.CONNECTION_DONE))
109 # XXX: not handling WSAEWOULDBLOCK
110 # ("too many outstanding overlapped I/O requests")
112 self.connectionLost(failure.Failure(
113 error.ConnectionLost("read error -- %s (%s)" %
114 (errno.errorcode.get(rc, 'unknown'), rc))))
117 assert self._readSize == 0
118 assert self._readNextBuffer == 0
119 self._readSize = bytes
120 return self._dispatchData()
124 evt = _iocp.Event(self._cbRead, self)
126 evt.buff = buff = self._readBuffers
127 rc, bytes = self.readFromHandle(buff, evt)
129 if not rc or rc == ERROR_IO_PENDING:
130 self._readScheduledInOS = True
132 self._handleRead(rc, bytes, evt)
135 def readFromHandle(self, bufflist, evt):
136 raise NotImplementedError() # TODO: this should default to ReadFile
139 def dataReceived(self, data):
140 raise NotImplementedError
143 def readConnectionLost(self, reason):
144 self.connectionLost(reason)
151 _writeScheduled = None
152 _writeDisconnecting = False
153 _writeDisconnected = False
154 writeBufferSize = 2**2**2**2
157 def loseWriteConnection(self):
158 self._writeDisconnecting = True
162 def _closeWriteConnection(self):
163 # override in subclasses
167 def writeConnectionLost(self, reason):
168 # in current code should never be called
169 self.connectionLost(reason)
172 def startWriting(self):
173 self.reactor.addActiveHandle(self)
175 if not self._writeScheduled:
176 self._writeScheduled = self.reactor.callLater(0,
180 def stopWriting(self):
181 if self._writeScheduled:
182 self._writeScheduled.cancel()
183 self._writeScheduled = None
187 def _resumeWriting(self):
188 self._writeScheduled = None
192 def _cbWrite(self, rc, bytes, evt):
193 if self._handleWrite(rc, bytes, evt):
197 def _handleWrite(self, rc, bytes, evt):
199 Returns false if we should stop writing for now
201 if self.disconnected or self._writeDisconnected:
203 # XXX: not handling WSAEWOULDBLOCK
204 # ("too many outstanding overlapped I/O requests")
206 self.connectionLost(failure.Failure(
207 error.ConnectionLost("write error -- %s (%s)" %
208 (errno.errorcode.get(rc, 'unknown'), rc))))
212 # If there is nothing left to send,
213 if self.offset == len(self.dataBuffer) and not self._tempDataLen:
218 # If I've got a producer who is supposed to supply me with data
219 if self.producer is not None and ((not self.streamingProducer)
220 or self.producerPaused):
221 # tell them to supply some more.
222 self.producerPaused = True
223 self.producer.resumeProducing()
224 elif self.disconnecting:
225 # But if I was previously asked to let the connection die,
227 self.connectionLost(failure.Failure(main.CONNECTION_DONE))
228 elif self._writeDisconnecting:
229 # I was previously asked to to half-close the connection.
230 self._writeDisconnected = True
231 self._closeWriteConnection()
238 if len(self.dataBuffer) - self.offset < self.SEND_LIMIT:
239 # If there is currently less than SEND_LIMIT bytes left to send
240 # in the string, extend it with the array data.
241 self.dataBuffer = (buffer(self.dataBuffer, self.offset) +
242 "".join(self._tempDataBuffer))
244 self._tempDataBuffer = []
245 self._tempDataLen = 0
247 evt = _iocp.Event(self._cbWrite, self)
249 # Send as much data as you can.
251 evt.buff = buff = buffer(self.dataBuffer, self.offset)
253 evt.buff = buff = self.dataBuffer
254 rc, bytes = self.writeToHandle(buff, evt)
255 if rc and rc != ERROR_IO_PENDING:
256 self._handleWrite(rc, bytes, evt)
259 def writeToHandle(self, buff, evt):
260 raise NotImplementedError() # TODO: this should default to WriteFile
263 def write(self, data):
264 """Reliably write some data.
266 The data is buffered until his file descriptor is ready for writing.
268 if isinstance(data, unicode): # no, really, I mean it
269 raise TypeError("Data must not be unicode")
270 if not self.connected or self._writeDisconnected:
273 self._tempDataBuffer.append(data)
274 self._tempDataLen += len(data)
275 if self.producer is not None and self.streamingProducer:
276 if (len(self.dataBuffer) + self._tempDataLen
277 > self.writeBufferSize):
278 self.producerPaused = True
279 self.producer.pauseProducing()
283 def writeSequence(self, iovec):
285 if isinstance(i, unicode): # no, really, I mean it
286 raise TypeError("Data must not be unicode")
287 if not self.connected or not iovec or self._writeDisconnected:
289 self._tempDataBuffer.extend(iovec)
291 self._tempDataLen += len(i)
292 if self.producer is not None and self.streamingProducer:
293 if len(self.dataBuffer) + self._tempDataLen > self.writeBufferSize:
294 self.producerPaused = True
295 self.producer.pauseProducing()
302 disconnecting = False
303 logstr = "Uninitialized"
305 SEND_LIMIT = 128*1024
308 def __init__(self, reactor = None):
310 from twisted.internet import reactor
311 self.reactor = reactor
312 self._tempDataBuffer = [] # will be added to dataBuffer in doWrite
313 self._tempDataLen = 0
314 self._readBuffers = [_iocp.AllocateReadBuffer(self.readBufferSize)]
317 def connectionLost(self, reason):
319 The connection was lost.
321 This is called when the connection on a selectable object has been
322 lost. It will be called whether the connection was closed explicitly,
323 an exception occurred in an event handler, or the other end of the
324 connection closed it first.
326 Clean up state here, but make sure to call back up to FileDescriptor.
329 self.disconnected = True
330 self.connected = False
331 if self.producer is not None:
332 self.producer.stopProducing()
336 self.reactor.removeActiveHandle(self)
339 def getFileHandle(self):
343 def loseConnection(self, _connDone=failure.Failure(main.CONNECTION_DONE)):
345 Close the connection at the next available opportunity.
347 Call this to cause this FileDescriptor to lose its connection. It will
348 first write any data that it has buffered.
350 If there is data buffered yet to be written, this method will cause the
351 transport to lose its connection as soon as it's done flushing its
352 write buffer. If you have a producer registered, the connection won't
353 be closed until the producer is finished. Therefore, make sure you
354 unregister your producer when it's finished, or the connection will
358 if self.connected and not self.disconnecting:
359 if self._writeDisconnected:
360 # doWrite won't trigger the connection close anymore
363 self.connectionLost(_connDone)
367 self.disconnecting = 1
370 # Producer/consumer implementation
372 def stopConsuming(self):
376 This is called when a producer has lost its connection, to tell the
377 consumer to go lose its connection (and break potential circular
380 self.unregisterProducer()
381 self.loseConnection()
384 # producer interface implementation
386 def resumeProducing(self):
387 assert self.connected and not self.disconnecting
391 def pauseProducing(self):
395 def stopProducing(self):
396 self.loseConnection()
399 __all__ = ['FileHandle']