Initial import to Tizen
[profile/ivi/python-twisted.git] / twisted / internet / _pollingfile.py
1 # -*- test-case-name: twisted.internet.test.test_pollingfile -*-
2 # Copyright (c) Twisted Matrix Laboratories.
3 # See LICENSE for details.
4
5 """
6 Implements a simple polling interface for file descriptors that don't work with
7 select() - this is pretty much only useful on Windows.
8 """
9
10 from zope.interface import implements
11
12 from twisted.internet.interfaces import IConsumer, IPushProducer
13
14
# Lower and upper bounds for the delay that _PollingTimer hands to
# reactor.callLater between two polling passes.
MIN_TIMEOUT = 1e-9
MAX_TIMEOUT = 0.1
17
18
19
class _PollableResource:
    """
    Base class for pollable objects; it only carries the C{active} flag and
    the two methods that flip it.
    """

    # Class-level default: a freshly created resource counts as active.
    active = True

    def deactivate(self):
        """
        Mark this resource as inactive.
        """
        self.active = False


    def activate(self):
        """
        Mark this resource as active.
        """
        self.active = True
29
30
31
class _PollingTimer:
    """
    Polls a list of resources from a reactor delayed call, shortening the
    delay between polls when work was found and lengthening it otherwise.

    Everything is private here because it is really an implementation detail.
    """

    def __init__(self, reactor):
        """
        @param reactor: object whose C{callLater(delay, callable)} is used to
            schedule polls; the object it returns must offer C{cancel()}.
        """
        self.reactor = reactor
        self._resources = []
        # Pending delayed call for the next poll, or None if none is pending.
        self._pollTimer = None
        self._currentTimeout = MAX_TIMEOUT
        self._paused = False

    def _addPollableResource(self, res):
        """
        Start tracking C{res} and re-evaluate whether polling should run.

        @param res: object with an C{active} attribute and a C{checkWork()}
            method.
        """
        self._resources.append(res)
        self._checkPollingState()

    def _checkPollingState(self):
        """
        Start polling if at least one tracked resource is active, otherwise
        stop it.
        """
        for resource in self._resources:
            if resource.active:
                self._startPolling()
                break
        else:
            self._stopPolling()

    def _startPolling(self):
        """
        Schedule a poll unless one is already pending.
        """
        if self._pollTimer is None:
            self._pollTimer = self._reschedule()

    def _stopPolling(self):
        """
        Cancel the pending poll, if any.
        """
        if self._pollTimer is not None:
            self._pollTimer.cancel()
            self._pollTimer = None

    def _pause(self):
        """
        Prevent further polls from being scheduled until L{_unpause}.
        """
        self._paused = True

    def _unpause(self):
        """
        Allow scheduling again and re-evaluate the polling state.
        """
        self._paused = False
        self._checkPollingState()

    def _reschedule(self):
        """
        Schedule L{_pollEvent} after the current timeout.

        @return: the object returned by C{reactor.callLater}, or C{None} when
            paused.
        """
        if not self._paused:
            return self.reactor.callLater(self._currentTimeout, self._pollEvent)

    def _pollEvent(self):
        """
        Run one polling pass over every active resource, adapt the timeout to
        the amount of work reported, and schedule the next pass if any polled
        resource is still active.
        """
        workUnits = 0.
        anyActive = False
        for resource in self._resources:
            if resource.active:
                workUnits += resource.checkWork()
                # Check AFTER work has been done
                if resource.active:
                    anyActive = True

        if workUnits:
            # Work was found: poll sooner, but never below MIN_TIMEOUT.
            newTimeout = self._currentTimeout / (workUnits + 1.)
            if newTimeout < MIN_TIMEOUT:
                newTimeout = MIN_TIMEOUT
        else:
            # Idle pass: back off, but never beyond MAX_TIMEOUT.
            newTimeout = self._currentTimeout * 2.
            if newTimeout > MAX_TIMEOUT:
                newTimeout = MAX_TIMEOUT
        self._currentTimeout = newTimeout
        if anyActive:
            self._pollTimer = self._reschedule()
        else:
            # The delayed call that invoked us has already fired.  Forget it,
            # otherwise _startPolling() would believe a poll is still pending
            # and _stopPolling() would try to cancel a call that already ran.
            self._pollTimer = None
96
97
98 # If we ever (let's hope not) need the above functionality on UNIX, this could
99 # be factored into a different module.
100
101 import win32pipe
102 import win32file
103 import win32api
104 import pywintypes
105
class _PollableReadPipe(_PollableResource):
    """
    Pollable resource that drains a pipe handle on every poll and passes the
    data it read to a callback.
    """

    implements(IPushProducer)

    def __init__(self, pipe, receivedCallback, lostCallback):
        """
        @param pipe: handle given to C{PeekNamedPipe}, C{ReadFile} and
            C{CloseHandle}.
        @param receivedCallback: called with the data gathered during a poll.
        @param lostCallback: called without arguments from L{cleanup}.
        """
        self.lostCallback = lostCallback
        self.receivedCallback = receivedCallback
        self.pipe = pipe

    def checkWork(self):
        """
        Read everything currently available on the pipe and deliver it.

        @return: the length of the data delivered during this poll.
        """
        chunks = []
        pipeLost = False

        while True:
            try:
                _peeked, available, _status = win32pipe.PeekNamedPipe(
                    self.pipe, 1)
                if not available:
                    # Nothing left to read for now.
                    break
                _hr, chunk = win32file.ReadFile(self.pipe, available, None)
            except win32api.error:
                # Peeking or reading failed: treat the pipe as gone.
                pipeLost = True
                break
            chunks.append(chunk)

        received = ''.join(chunks)
        if received:
            self.receivedCallback(received)
        if pipeLost:
            self.cleanup()
        return len(received)

    def cleanup(self):
        """
        Stop being polled and report the loss through C{lostCallback}.
        """
        self.deactivate()
        self.lostCallback()

    def close(self):
        """
        Close the pipe handle, ignoring a failure to do so.
        """
        try:
            win32api.CloseHandle(self.pipe)
        except pywintypes.error:
            # You can't close std handles...?
            pass

    def stopProducing(self):
        """
        Stop producing by closing the pipe handle.
        """
        self.close()

    def pauseProducing(self):
        """
        Pause producing by marking this resource inactive.
        """
        self.deactivate()

    def resumeProducing(self):
        """
        Resume producing by marking this resource active again.
        """
        self.activate()
158
159
# Queued byte count above which _PollableWritePipe.write() calls bufferFull().
FULL_BUFFER_SIZE = 1024 * 64
161
class _PollableWritePipe(_PollableResource):
    """
    Pollable resource that queues outgoing data and writes it to a pipe
    handle each time it is polled.
    """

    implements(IConsumer)

    def __init__(self, writePipe, lostCallback):
        """
        @param writePipe: handle written to with C{WriteFile}.
        @param lostCallback: called without arguments at the end of
            L{writeConnectionLost}.
        """
        self.disconnecting = False
        self.producer = None
        self.producerPaused = 0
        self.streamingProducer = 0
        self.outQueue = []
        self.writePipe = writePipe
        self.lostCallback = lostCallback
        try:
            # Request PIPE_NOWAIT mode on the handle.
            win32pipe.SetNamedPipeHandleState(writePipe,
                                              win32pipe.PIPE_NOWAIT,
                                              None,
                                              None)
        except pywintypes.error:
            # Maybe it's an invalid handle.  Who knows.
            pass

    def close(self):
        """
        Request disconnection; L{checkWork} performs it once the queue has
        been written out.
        """
        self.disconnecting = True

    def bufferFull(self):
        """
        Pause the registered producer, if any, and remember that we did so.
        """
        if self.producer is not None:
            self.producerPaused = 1
            self.producer.pauseProducing()

    def bufferEmpty(self):
        """
        Ask the registered producer for more data when it is a non-streaming
        producer or a streaming producer that we paused.

        @return: C{True} if the producer was asked to resume, else C{False}.
        """
        if self.producer is not None and ((not self.streamingProducer) or
                                          self.producerPaused):
            # Clear our own flag (the one bufferFull() sets); it must not be
            # written onto the producer object.
            self.producerPaused = 0
            self.producer.resumeProducing()
            return True
        return False

    # almost-but-not-quite-exact copy-paste from abstract.FileDescriptor... ugh

    def registerProducer(self, producer, streaming):
        """Register to receive data from a producer.

        This sets this selectable to be a consumer for a producer.  When this
        selectable runs out of data on a write() call, it will ask the producer
        to resumeProducing(). A producer should implement the IProducer
        interface.

        FileDescriptor provides some infrastructure for producer methods.

        @raise RuntimeError: If a producer is already registered.
        """
        if self.producer is not None:
            raise RuntimeError(
                "Cannot register producer %s, because producer %s was never "
                "unregistered." % (producer, self.producer))
        if not self.active:
            # Already deactivated: tell the producer to stop right away.
            producer.stopProducing()
        else:
            self.producer = producer
            self.streamingProducer = streaming
            if not streaming:
                producer.resumeProducing()

    def unregisterProducer(self):
        """Stop consuming data from a producer, without disconnecting.
        """
        self.producer = None

    def writeConnectionLost(self):
        """
        Deactivate this resource, close the pipe handle (ignoring a failure
        to do so) and call C{lostCallback}.
        """
        self.deactivate()
        try:
            win32api.CloseHandle(self.writePipe)
        except pywintypes.error:
            # OMG what
            pass
        self.lostCallback()


    def writeSequence(self, seq):
        """
        Append a C{list} or C{tuple} of bytes to the output buffer.

        @param seq: C{list} or C{tuple} of C{str} instances to be appended to
            the output buffer.

        @raise TypeError: If C{seq} contains C{unicode}.
        """
        # NOTE(review): unlike write(), this neither checks
        # C{self.disconnecting} nor calls bufferFull().
        if unicode in map(type, seq):
            raise TypeError("Unicode not allowed in output buffer.")
        self.outQueue.extend(seq)


    def write(self, data):
        """
        Append some bytes to the output buffer.

        @param data: C{str} to be appended to the output buffer.
        @type data: C{str}.

        @raise TypeError: If C{data} is C{unicode} instead of C{str}.
        """
        if isinstance(data, unicode):
            raise TypeError("Unicode not allowed in output buffer.")
        if self.disconnecting:
            # Data written after close() is dropped.
            return
        self.outQueue.append(data)
        if sum(map(len, self.outQueue)) > FULL_BUFFER_SIZE:
            self.bufferFull()


    def checkWork(self):
        """
        Write as much queued data as the pipe accepts.

        With an empty queue this either completes a pending disconnect or
        issues a zero-length write, treating a failure of that write as a
        lost connection.

        @return: the number of bytes written during this poll.
        """
        numBytesWritten = 0
        if not self.outQueue:
            if self.disconnecting:
                self.writeConnectionLost()
                return 0
            try:
                win32file.WriteFile(self.writePipe, '', None)
            except pywintypes.error:
                self.writeConnectionLost()
                return numBytesWritten
        while self.outQueue:
            data = self.outQueue.pop(0)
            try:
                # The returned error code is deliberately ignored.
                _errCode, nBytesWritten = win32file.WriteFile(self.writePipe,
                                                              data, None)
            except win32api.error:
                self.writeConnectionLost()
                break
            else:
                numBytesWritten += nBytesWritten
                if len(data) > nBytesWritten:
                    # Partial write: requeue the remainder and retry on the
                    # next poll.
                    self.outQueue.insert(0, data[nBytesWritten:])
                    break
        else:
            # Queue fully drained (or was already empty).
            resumed = self.bufferEmpty()
            if not resumed and self.disconnecting:
                self.writeConnectionLost()
        return numBytesWritten
299                 self.writeConnectionLost()
300         return numBytesWritten