Initial import to Tizen
[profile/ivi/python-twisted.git] / twisted / internet / iocpreactor / abstract.py
1 # Copyright (c) Twisted Matrix Laboratories.
2 # See LICENSE for details.
3
4 """
5 Abstract file handle class
6 """
7
8 from twisted.internet import main, error, interfaces
9 from twisted.internet.abstract import _ConsumerMixin, _LogOwner
10 from twisted.python import failure
11
12 from zope.interface import implements
13 import errno
14
15 from twisted.internet.iocpreactor.const import ERROR_HANDLE_EOF
16 from twisted.internet.iocpreactor.const import ERROR_IO_PENDING
17 from twisted.internet.iocpreactor import iocpsupport as _iocp
18
19
20
class FileHandle(_ConsumerMixin, _LogOwner):
    """
    File handle that can read and write asynchronously.

    Actual I/O is delegated to the subclass hooks L{readFromHandle} and
    L{writeToHandle}.  Each hook receives an C{_iocp.Event} constructed with
    L{_cbRead} / L{_cbWrite} as its callback.  Subclasses must also implement
    L{dataReceived}.

    NOTE(review): C{producer}, C{streamingProducer} and C{producerPaused} are
    used below but not defined in this class; presumably they are provided by
    C{_ConsumerMixin} -- confirm.
    """
    implements(interfaces.IPushProducer, interfaces.IConsumer,
               interfaces.ITransport, interfaces.IHalfCloseableDescriptor)
    # read stuff
    maxReadBuffers = 16 # upper bound on len(self._readBuffers)
    readBufferSize = 4096 # size in bytes of each individual read buffer
    reading = False # True while the owner wants data delivered
    dynamicReadBuffers = True # set this to false if subclass doesn't do iovecs
    _readNextBuffer = 0 # index of the next full buffer to dispatch
    _readSize = 0 # how much data we have in the read buffer
    _readScheduled = None # pending callLater for _resumeReading, if any
    _readScheduledInOS = False # True while a read request is outstanding


    def startReading(self):
        """
        Register this handle as active with the reactor and, unless a resume
        is already scheduled or we are already reading, set C{reading} and
        schedule L{_resumeReading} through C{reactor.callLater(0, ...)}.
        """
        self.reactor.addActiveHandle(self)
        if not self._readScheduled and not self.reading:
            self.reading = True
            self._readScheduled = self.reactor.callLater(0,
                                                         self._resumeReading)


    def stopReading(self):
        """
        Cancel any scheduled L{_resumeReading} call and clear C{reading}.
        """
        if self._readScheduled:
            self._readScheduled.cancel()
            self._readScheduled = None
        self.reading = False


    def _resumeReading(self):
        """
        Scheduled by L{startReading}: flush any data left over from a
        previous read and, if we still want to read and no read request is
        outstanding, issue a new one.
        """
        self._readScheduled = None
        if self._dispatchData() and not self._readScheduledInOS:
            self.doRead()


    def _dispatchData(self):
        """
        Dispatch previously read data. Return True if self.reading and we don't
        have any more data

        Full buffers are handed to L{dataReceived} one at a time.  If
        C{reading} becomes false in between, dispatching stops and returns
        False, keeping C{_readSize} and C{_readNextBuffer} so that a later
        call continues where this one stopped.
        """
        if not self._readSize:
            return self.reading
        size = self._readSize
        full_buffers = size // self.readBufferSize
        while self._readNextBuffer < full_buffers:
            self.dataReceived(self._readBuffers[self._readNextBuffer])
            self._readNextBuffer += 1
            if not self.reading:
                return False
        # the last, partially filled buffer (if any) is delivered as a slice
        remainder = size % self.readBufferSize
        if remainder:
            self.dataReceived(buffer(self._readBuffers[full_buffers],
                                     0, remainder))
        if self.dynamicReadBuffers:
            total_buffer_size = self.readBufferSize * len(self._readBuffers)
            # we have one buffer too many
            if size < total_buffer_size - self.readBufferSize:
                del self._readBuffers[-1]
            # we filled all buffers, so allocate one more
            elif (size == total_buffer_size and
                  len(self._readBuffers) < self.maxReadBuffers):
                self._readBuffers.append(_iocp.AllocateReadBuffer(
                                            self.readBufferSize))
        # everything was dispatched; reset the bookkeeping for the next read
        self._readNextBuffer = 0
        self._readSize = 0
        return self.reading


    def _cbRead(self, rc, bytes, evt):
        """
        Callback attached to the read event created in L{doRead}.  Clears the
        outstanding-read flag, processes the result and issues another read
        if L{_handleRead} says reading should continue.
        """
        self._readScheduledInOS = False
        if self._handleRead(rc, bytes, evt):
            self.doRead()


    def _handleRead(self, rc, bytes, evt):
        """
        Returns False if we should stop reading for now

        @param rc: result code of the read; a false value means no error.
        @param bytes: number of bytes read (stored into C{_readSize}).
        @param evt: the event associated with the read (unused here).
        """
        if self.disconnected:
            return False
        # graceful disconnection
        if (not (rc or bytes)) or rc in (errno.WSAEDISCON, ERROR_HANDLE_EOF):
            self.reactor.removeActiveHandle(self)
            self.readConnectionLost(failure.Failure(main.CONNECTION_DONE))
            return False
        # XXX: not handling WSAEWOULDBLOCK
        # ("too many outstanding overlapped I/O requests")
        elif rc:
            self.connectionLost(failure.Failure(
                                error.ConnectionLost("read error -- %s (%s)" %
                                    (errno.errorcode.get(rc, 'unknown'), rc))))
            return False
        else:
            # a new read must only complete once the previous data has been
            # fully dispatched
            assert self._readSize == 0
            assert self._readNextBuffer == 0
            self._readSize = bytes
            return self._dispatchData()


    def doRead(self):
        """
        Issue a read into C{_readBuffers} through L{readFromHandle}.

        A false C{rc} or C{ERROR_IO_PENDING} marks the read as outstanding;
        any other C{rc} is passed to L{_handleRead} right away.
        """
        evt = _iocp.Event(self._cbRead, self)

        # the buffers are also stored on the event object
        evt.buff = buff = self._readBuffers
        rc, bytes = self.readFromHandle(buff, evt)

        if not rc or rc == ERROR_IO_PENDING:
            self._readScheduledInOS = True
        else:
            self._handleRead(rc, bytes, evt)


    def readFromHandle(self, bufflist, evt):
        """
        Subclass hook: start a read into C{bufflist} associated with C{evt}
        and return a C{(rc, bytes)} tuple.
        """
        raise NotImplementedError() # TODO: this should default to ReadFile


    def dataReceived(self, data):
        """
        Subclass hook: called by L{_dispatchData} with each chunk of data
        that was read.
        """
        raise NotImplementedError


    def readConnectionLost(self, reason):
        """
        Called by L{_handleRead} on graceful disconnection; by default the
        whole connection is considered lost.
        """
        self.connectionLost(reason)


    # write stuff
    dataBuffer = '' # data currently being written
    offset = 0 # how many bytes of dataBuffer have already been written
    writing = False
    _writeScheduled = None # pending callLater for _resumeWriting, if any
    _writeDisconnecting = False # half-close requested, waiting for flush
    _writeDisconnected = False # write side has been half-closed
    # threshold above which a streaming producer is paused;
    # ** is right-associative, so this evaluates to 2 ** 16 == 65536
    writeBufferSize = 2**2**2**2


    def loseWriteConnection(self):
        """
        Request a half-close of the write side; L{_handleWrite} performs it
        once there is nothing left to send.
        """
        self._writeDisconnecting = True
        self.startWriting()


    def _closeWriteConnection(self):
        """
        Perform the actual half-close of the write side.  Does nothing here.
        """
        # override in subclasses
        pass


    def writeConnectionLost(self, reason):
        """
        Treat the loss of the write side as loss of the whole connection.
        """
        # in current code should never be called
        self.connectionLost(reason)


    def startWriting(self):
        """
        Register this handle as active with the reactor, set C{writing} and
        schedule L{_resumeWriting} through C{reactor.callLater(0, ...)}
        unless such a call is already pending.
        """
        self.reactor.addActiveHandle(self)
        self.writing = True
        if not self._writeScheduled:
            self._writeScheduled = self.reactor.callLater(0,
                                                          self._resumeWriting)


    def stopWriting(self):
        """
        Cancel any scheduled L{_resumeWriting} call and clear C{writing}.
        """
        if self._writeScheduled:
            self._writeScheduled.cancel()
            self._writeScheduled = None
        self.writing = False


    def _resumeWriting(self):
        """
        Scheduled by L{startWriting}: clear the pending call and write.
        """
        self._writeScheduled = None
        self.doWrite()


    def _cbWrite(self, rc, bytes, evt):
        """
        Callback attached to the write event created in L{doWrite}.  Issues
        another write if L{_handleWrite} says more data is left to send.
        """
        if self._handleWrite(rc, bytes, evt):
            self.doWrite()


    def _handleWrite(self, rc, bytes, evt):
        """
        Returns false if we should stop writing for now

        @param rc: result code of the write; a false value means no error.
        @param bytes: number of bytes written (added to C{offset}).
        @param evt: the event associated with the write (unused here).
        """
        if self.disconnected or self._writeDisconnected:
            return False
        # XXX: not handling WSAEWOULDBLOCK
        # ("too many outstanding overlapped I/O requests")
        if rc:
            self.connectionLost(failure.Failure(
                                error.ConnectionLost("write error -- %s (%s)" %
                                    (errno.errorcode.get(rc, 'unknown'), rc))))
            return False
        else:
            self.offset += bytes
            # If there is nothing left to send,
            if self.offset == len(self.dataBuffer) and not self._tempDataLen:
                self.dataBuffer = ""
                self.offset = 0
                # stop writing
                self.stopWriting()
                # If I've got a producer who is supposed to supply me with data
                if self.producer is not None and ((not self.streamingProducer)
                                                  or self.producerPaused):
                    # tell them to supply some more.
                    # NOTE(review): the flag is set to True although the
                    # producer is being resumed -- confirm this is intended.
                    self.producerPaused = True
                    self.producer.resumeProducing()
                elif self.disconnecting:
                    # But if I was previously asked to let the connection die,
                    # do so.
                    self.connectionLost(failure.Failure(main.CONNECTION_DONE))
                elif self._writeDisconnecting:
                    # I was previously asked to half-close the connection.
                    self._writeDisconnected = True
                    self._closeWriteConnection()
                return False
            else:
                return True


    def doWrite(self):
        """
        Issue a write of the unsent part of C{dataBuffer} through
        L{writeToHandle}, first folding in the data queued by L{write} /
        L{writeSequence} when less than C{SEND_LIMIT} bytes remain.

        A true C{rc} other than C{ERROR_IO_PENDING} is passed to
        L{_handleWrite} right away.
        """
        if len(self.dataBuffer) - self.offset < self.SEND_LIMIT:
            # If there is currently less than SEND_LIMIT bytes left to send
            # in the string, extend it with the array data.
            self.dataBuffer = (buffer(self.dataBuffer, self.offset) +
                               "".join(self._tempDataBuffer))
            self.offset = 0
            self._tempDataBuffer = []
            self._tempDataLen = 0

        evt = _iocp.Event(self._cbWrite, self)

        # Send as much data as you can.
        if self.offset:
            evt.buff = buff = buffer(self.dataBuffer, self.offset)
        else:
            evt.buff = buff = self.dataBuffer
        rc, bytes = self.writeToHandle(buff, evt)
        if rc and rc != ERROR_IO_PENDING:
            self._handleWrite(rc, bytes, evt)


    def writeToHandle(self, buff, evt):
        """
        Subclass hook: start a write of C{buff} associated with C{evt} and
        return a C{(rc, bytes)} tuple.
        """
        raise NotImplementedError() # TODO: this should default to WriteFile


    def write(self, data):
        """Reliably write some data.

        The data is buffered until this file descriptor is ready for writing.

        Nothing happens if we are not connected or the write side has been
        half-closed.  A registered streaming producer is paused once the
        buffered amount exceeds C{writeBufferSize}.

        @raise TypeError: if C{data} is a C{unicode} object.
        """
        if isinstance(data, unicode): # no, really, I mean it
            raise TypeError("Data must not be unicode")
        if not self.connected or self._writeDisconnected:
            return
        if data:
            self._tempDataBuffer.append(data)
            self._tempDataLen += len(data)
            if self.producer is not None and self.streamingProducer:
                if (len(self.dataBuffer) + self._tempDataLen
                    > self.writeBufferSize):
                    self.producerPaused = True
                    self.producer.pauseProducing()
            self.startWriting()


    def writeSequence(self, iovec):
        """
        Reliably write a sequence of strings; see L{write}.

        All items are checked before anything is buffered, so a C{unicode}
        item anywhere in C{iovec} means nothing is written.

        @raise TypeError: if any item of C{iovec} is a C{unicode} object.
        """
        for i in iovec:
            if isinstance(i, unicode): # no, really, I mean it
                raise TypeError("Data must not be unicode")
        if not self.connected or not iovec or self._writeDisconnected:
            return
        self._tempDataBuffer.extend(iovec)
        for i in iovec:
            self._tempDataLen += len(i)
        if self.producer is not None and self.streamingProducer:
            if len(self.dataBuffer) + self._tempDataLen > self.writeBufferSize:
                self.producerPaused = True
                self.producer.pauseProducing()
        self.startWriting()


    # general stuff
    connected = False
    disconnected = False
    disconnecting = False
    logstr = "Uninitialized"

    # doWrite only merges queued data into dataBuffer while fewer than this
    # many bytes of dataBuffer remain unsent
    SEND_LIMIT = 128*1024


    def __init__(self, reactor = None):
        """
        @param reactor: the reactor to use; when false, the global
            C{twisted.internet.reactor} is imported and used instead.
        """
        if not reactor:
            from twisted.internet import reactor
        self.reactor = reactor
        self._tempDataBuffer = [] # will be added to dataBuffer in doWrite
        self._tempDataLen = 0
        self._readBuffers = [_iocp.AllocateReadBuffer(self.readBufferSize)]


    def connectionLost(self, reason):
        """
        The connection was lost.

        This is called when the connection on a selectable object has been
        lost.  It will be called whether the connection was closed explicitly,
        an exception occurred in an event handler, or the other end of the
        connection closed it first.

        Clean up state here, but make sure to call back up to this
        implementation.
        """

        self.disconnected = True
        self.connected = False
        if self.producer is not None:
            self.producer.stopProducing()
            self.producer = None
        self.stopReading()
        self.stopWriting()
        self.reactor.removeActiveHandle(self)


    def getFileHandle(self):
        """
        Return the underlying handle; this base implementation returns -1.
        """
        return -1


    def loseConnection(self, _connDone=failure.Failure(main.CONNECTION_DONE)):
        """
        Close the connection at the next available opportunity.

        Call this to cause this FileDescriptor to lose its connection.  It will
        first write any data that it has buffered.

        If there is data buffered yet to be written, this method will cause the
        transport to lose its connection as soon as it's done flushing its
        write buffer.  If you have a producer registered, the connection won't
        be closed until the producer is finished. Therefore, make sure you
        unregister your producer when it's finished, or the connection will
        never close.

        @param _connDone: the failure passed to L{connectionLost} when the
            write side is already closed.  The default instance is created
            once, when the method is defined, and shared between calls.
        """

        if self.connected and not self.disconnecting:
            if self._writeDisconnected:
                # doWrite won't trigger the connection close anymore
                self.stopReading()
                self.stopWriting()
                self.connectionLost(_connDone)
            else:
                # keep writing; _handleWrite closes the connection once the
                # buffer has been flushed and sees self.disconnecting
                self.stopReading()
                self.startWriting()
                self.disconnecting = True


    # Producer/consumer implementation

    def stopConsuming(self):
        """
        Stop consuming data.

        This is called when a producer has lost its connection, to tell the
        consumer to go lose its connection (and break potential circular
        references).
        """
        self.unregisterProducer()
        self.loseConnection()


    # producer interface implementation

    def resumeProducing(self):
        """
        Start reading again.  Must only be called while connected and not in
        the process of disconnecting.
        """
        assert self.connected and not self.disconnecting
        self.startReading()


    def pauseProducing(self):
        """
        Stop reading until L{resumeProducing} is called.
        """
        self.stopReading()


    def stopProducing(self):
        """
        Stop producing for good by closing the connection.
        """
        self.loseConnection()
398
# Names exported by ``from ... import *``: only the FileHandle class.
__all__ = ['FileHandle']
400