Initial import to Tizen
[profile/ivi/python-twisted.git] / twisted / internet / epollreactor.py
1 # Copyright (c) Twisted Matrix Laboratories.
2 # See LICENSE for details.
3
4 """
5 An epoll() based implementation of the twisted main loop.
6
7 To install the event loop (and you should do this before any connections,
8 listeners or connectors are added)::
9
10     from twisted.internet import epollreactor
11     epollreactor.install()
12 """
13
14 import errno
15
16 from zope.interface import implements
17
18 from twisted.internet.interfaces import IReactorFDSet
19
20 from twisted.python import log
21 from twisted.internet import posixbase
22
23 try:
24     # In Python 2.6+, select.epoll provides epoll functionality. Try to import
25     # it, and fall back to Twisted's own epoll wrapper if it isn't available
26     # for any reason.
27     from select import epoll
28 except ImportError:
29     from twisted.python import _epoll
30 else:
31     del epoll
32     import select as _epoll
33
34
35
36 class _ContinuousPolling(posixbase._PollLikeMixin,
37                          posixbase._DisconnectSelectableMixin):
38     """
39     Schedule reads and writes based on the passage of time, rather than
40     notification.
41
42     This is useful for supporting polling filesystem files, which C{epoll(7)}
43     does not support.
44
45     The implementation uses L{posixbase._PollLikeMixin}, which is a bit hacky,
46     but re-implementing and testing the relevant code yet again is
47     unappealing.
48
49     @ivar _reactor: The L{EPollReactor} that is using this instance.
50
51     @ivar _loop: A C{LoopingCall} that drives the polling, or C{None}.
52
53     @ivar _readers: A C{set} of C{FileDescriptor} objects that should be read
54         from.
55
56     @ivar _writers: A C{set} of C{FileDescriptor} objects that should be
57         written to.
58     """
59     implements(IReactorFDSet)
60
61     # Attributes for _PollLikeMixin
62     _POLL_DISCONNECTED = 1
63     _POLL_IN = 2
64     _POLL_OUT = 4
65
66
67     def __init__(self, reactor):
68         self._reactor = reactor
69         self._loop = None
70         self._readers = set()
71         self._writers = set()
72         self.isReading = self._readers.__contains__
73         self.isWriting = self._writers.__contains__
74
75
76     def _checkLoop(self):
77         """
78         Start or stop a C{LoopingCall} based on whether there are readers and
79         writers.
80         """
81         if self._readers or self._writers:
82             if self._loop is None:
83                 from twisted.internet.task import LoopingCall, _EPSILON
84                 self._loop = LoopingCall(self.iterate)
85                 self._loop.clock = self._reactor
86                 # LoopingCall seems unhappy with timeout of 0, so use very
87                 # small number:
88                 self._loop.start(_EPSILON, now=False)
89         elif self._loop:
90             self._loop.stop()
91             self._loop = None
92
93
94     def iterate(self):
95         """
96         Call C{doRead} and C{doWrite} on all readers and writers respectively.
97         """
98         for reader in list(self._readers):
99             self._doReadOrWrite(reader, reader, self._POLL_IN)
100         for reader in list(self._writers):
101             self._doReadOrWrite(reader, reader, self._POLL_OUT)
102
103
104     def addReader(self, reader):
105         """
106         Add a C{FileDescriptor} for notification of data available to read.
107         """
108         self._readers.add(reader)
109         self._checkLoop()
110
111
112     def addWriter(self, writer):
113         """
114         Add a C{FileDescriptor} for notification of data available to write.
115         """
116         self._writers.add(writer)
117         self._checkLoop()
118
119
120     def removeReader(self, reader):
121         """
122         Remove a C{FileDescriptor} from notification of data available to read.
123         """
124         try:
125             self._readers.remove(reader)
126         except KeyError:
127             return
128         self._checkLoop()
129
130
131     def removeWriter(self, writer):
132         """
133         Remove a C{FileDescriptor} from notification of data available to write.
134         """
135         try:
136             self._writers.remove(writer)
137         except KeyError:
138             return
139         self._checkLoop()
140
141
142     def removeAll(self):
143         """
144         Remove all readers and writers.
145         """
146         result = list(self._readers | self._writers)
147         # Don't reset to new value, since self.isWriting and .isReading refer
148         # to the existing instance:
149         self._readers.clear()
150         self._writers.clear()
151         return result
152
153
154     def getReaders(self):
155         """
156         Return a list of the readers.
157         """
158         return list(self._readers)
159
160
161     def getWriters(self):
162         """
163         Return a list of the writers.
164         """
165         return list(self._writers)
166
167
168
169 class EPollReactor(posixbase.PosixReactorBase, posixbase._PollLikeMixin):
170     """
171     A reactor that uses epoll(7).
172
173     @ivar _poller: A C{epoll} which will be used to check for I/O
174         readiness.
175
176     @ivar _selectables: A dictionary mapping integer file descriptors to
177         instances of C{FileDescriptor} which have been registered with the
178         reactor.  All C{FileDescriptors} which are currently receiving read or
179         write readiness notifications will be present as values in this
180         dictionary.
181
182     @ivar _reads: A dictionary mapping integer file descriptors to arbitrary
183         values (this is essentially a set).  Keys in this dictionary will be
184         registered with C{_poller} for read readiness notifications which will
185         be dispatched to the corresponding C{FileDescriptor} instances in
186         C{_selectables}.
187
188     @ivar _writes: A dictionary mapping integer file descriptors to arbitrary
189         values (this is essentially a set).  Keys in this dictionary will be
190         registered with C{_poller} for write readiness notifications which will
191         be dispatched to the corresponding C{FileDescriptor} instances in
192         C{_selectables}.
193
194     @ivar _continuousPolling: A L{_ContinuousPolling} instance, used to handle
195         file descriptors (e.g. filesytem files) that are not supported by
196         C{epoll(7)}.
197     """
198     implements(IReactorFDSet)
199
200     # Attributes for _PollLikeMixin
201     _POLL_DISCONNECTED = (_epoll.EPOLLHUP | _epoll.EPOLLERR)
202     _POLL_IN = _epoll.EPOLLIN
203     _POLL_OUT = _epoll.EPOLLOUT
204
205     def __init__(self):
206         """
207         Initialize epoll object, file descriptor tracking dictionaries, and the
208         base class.
209         """
210         # Create the poller we're going to use.  The 1024 here is just a hint to
211         # the kernel, it is not a hard maximum.  After Linux 2.6.8, the size
212         # argument is completely ignored.
213         self._poller = _epoll.epoll(1024)
214         self._reads = {}
215         self._writes = {}
216         self._selectables = {}
217         self._continuousPolling = _ContinuousPolling(self)
218         posixbase.PosixReactorBase.__init__(self)
219
220
221     def _add(self, xer, primary, other, selectables, event, antievent):
222         """
223         Private method for adding a descriptor from the event loop.
224
225         It takes care of adding it if  new or modifying it if already added
226         for another state (read -> read/write for example).
227         """
228         fd = xer.fileno()
229         if fd not in primary:
230             flags = event
231             # epoll_ctl can raise all kinds of IOErrors, and every one
232             # indicates a bug either in the reactor or application-code.
233             # Let them all through so someone sees a traceback and fixes
234             # something.  We'll do the same thing for every other call to
235             # this method in this file.
236             if fd in other:
237                 flags |= antievent
238                 self._poller.modify(fd, flags)
239             else:
240                 self._poller.register(fd, flags)
241
242             # Update our own tracking state *only* after the epoll call has
243             # succeeded.  Otherwise we may get out of sync.
244             primary[fd] = 1
245             selectables[fd] = xer
246
247
248     def addReader(self, reader):
249         """
250         Add a FileDescriptor for notification of data available to read.
251         """
252         try:
253             self._add(reader, self._reads, self._writes, self._selectables,
254                       _epoll.EPOLLIN, _epoll.EPOLLOUT)
255         except IOError, e:
256             if e.errno == errno.EPERM:
257                 # epoll(7) doesn't support certain file descriptors,
258                 # e.g. filesystem files, so for those we just poll
259                 # continuously:
260                 self._continuousPolling.addReader(reader)
261             else:
262                 raise
263
264
265     def addWriter(self, writer):
266         """
267         Add a FileDescriptor for notification of data available to write.
268         """
269         try:
270             self._add(writer, self._writes, self._reads, self._selectables,
271                       _epoll.EPOLLOUT, _epoll.EPOLLIN)
272         except IOError, e:
273             if e.errno == errno.EPERM:
274                 # epoll(7) doesn't support certain file descriptors,
275                 # e.g. filesystem files, so for those we just poll
276                 # continuously:
277                 self._continuousPolling.addWriter(writer)
278             else:
279                 raise
280
281
282     def _remove(self, xer, primary, other, selectables, event, antievent):
283         """
284         Private method for removing a descriptor from the event loop.
285
286         It does the inverse job of _add, and also add a check in case of the fd
287         has gone away.
288         """
289         fd = xer.fileno()
290         if fd == -1:
291             for fd, fdes in selectables.items():
292                 if xer is fdes:
293                     break
294             else:
295                 return
296         if fd in primary:
297             if fd in other:
298                 flags = antievent
299                 # See comment above modify call in _add.
300                 self._poller.modify(fd, flags)
301             else:
302                 del selectables[fd]
303                 # See comment above _control call in _add.
304                 self._poller.unregister(fd)
305             del primary[fd]
306
307
308     def removeReader(self, reader):
309         """
310         Remove a Selectable for notification of data available to read.
311         """
312         if self._continuousPolling.isReading(reader):
313             self._continuousPolling.removeReader(reader)
314             return
315         self._remove(reader, self._reads, self._writes, self._selectables,
316                      _epoll.EPOLLIN, _epoll.EPOLLOUT)
317
318
319     def removeWriter(self, writer):
320         """
321         Remove a Selectable for notification of data available to write.
322         """
323         if self._continuousPolling.isWriting(writer):
324             self._continuousPolling.removeWriter(writer)
325             return
326         self._remove(writer, self._writes, self._reads, self._selectables,
327                      _epoll.EPOLLOUT, _epoll.EPOLLIN)
328
329
330     def removeAll(self):
331         """
332         Remove all selectables, and return a list of them.
333         """
334         return (self._removeAll(
335                 [self._selectables[fd] for fd in self._reads],
336                 [self._selectables[fd] for fd in self._writes]) +
337                 self._continuousPolling.removeAll())
338
339
340     def getReaders(self):
341         return ([self._selectables[fd] for fd in self._reads] +
342                 self._continuousPolling.getReaders())
343
344
345     def getWriters(self):
346         return ([self._selectables[fd] for fd in self._writes] +
347                 self._continuousPolling.getWriters())
348
349
350     def doPoll(self, timeout):
351         """
352         Poll the poller for new events.
353         """
354         if timeout is None:
355             timeout = -1  # Wait indefinitely.
356
357         try:
358             # Limit the number of events to the number of io objects we're
359             # currently tracking (because that's maybe a good heuristic) and
360             # the amount of time we block to the value specified by our
361             # caller.
362             l = self._poller.poll(timeout, len(self._selectables))
363         except IOError, err:
364             if err.errno == errno.EINTR:
365                 return
366             # See epoll_wait(2) for documentation on the other conditions
367             # under which this can fail.  They can only be due to a serious
368             # programming error on our part, so let's just announce them
369             # loudly.
370             raise
371
372         _drdw = self._doReadOrWrite
373         for fd, event in l:
374             try:
375                 selectable = self._selectables[fd]
376             except KeyError:
377                 pass
378             else:
379                 log.callWithLogger(selectable, _drdw, selectable, fd, event)
380
381     doIteration = doPoll
382
383
384 def install():
385     """
386     Install the epoll() reactor.
387     """
388     p = EPollReactor()
389     from twisted.internet.main import installReactor
390     installReactor(p)
391
392
393 __all__ = ["EPollReactor", "install"]
394