1 # -*- test-case-name: twisted.test.test_internet -*-
3 # Copyright (c) Twisted Matrix Laboratories.
4 # See LICENSE for details.
6 from __future__ import generators
9 Threaded select reactor
11 Maintainer: Bob Ippolito
14 The threadedselectreactor is a specialized reactor for integrating with
15 arbitrary foreign event loops, such as those you find in GUI toolkits.
17 There are three things you'll need to do to use this reactor.
19 Install the reactor at the beginning of your program, before importing
22 | from twisted.internet import _threadedselect
23 | _threadedselect.install()
25 Interleave this reactor with your foreign event loop, at some point after
26 your event loop is initialized::
28 | from twisted.internet import reactor
29 | reactor.interleave(foreignEventLoopWakerFunction)
30 | reactor.addSystemEventTrigger('after', 'shutdown', foreignEventLoopStop)
32 Instead of shutting down the foreign event loop directly, shut down the
35 | from twisted.internet import reactor
38 In order for Twisted to do its work in the main thread (the thread that
39 interleave is called from), a waker function is necessary. The waker function
40 will be called from a "background" thread with one argument: func.
41 The waker function's purpose is to call func() from the main thread.
42 Many GUI toolkits ship with appropriate waker functions.
43 Some examples of this are wxPython's wx.callAfter (may be wxCallAfter in
44 older versions of wxPython) or PyObjC's PyObjCTools.AppHelper.callAfter.
45 These would be used in place of "foreignEventLoopWakerFunction" in the above
48 The other integration point at which the foreign event loop and this reactor
49 must integrate is shutdown. In order to ensure clean shutdown of Twisted,
50 you must allow for Twisted to come to a complete stop before quitting the
51 application. Typically, you will do this by setting up an after shutdown
52 trigger to stop your foreign event loop, and call reactor.stop() where you
53 would normally have initiated the shutdown procedure for the foreign event
54 loop. Shutdown functions that could be used in place of
55 "foreignEventLoopStop" would be the ExitMainLoop method of the wxApp instance
56 with wxPython, or the PyObjCTools.AppHelper.stopEventLoop function.
59 from threading import Thread
60 from Queue import Queue, Empty
61 from time import sleep
64 from zope.interface import implements
66 from twisted.internet.interfaces import IReactorFDSet
67 from twisted.internet import error
68 from twisted.internet import posixbase
69 from twisted.internet.posixbase import _NO_FILENO, _NO_FILEDESC
70 from twisted.python import log, failure, threadable
71 from twisted.persisted import styles
72 from twisted.python.runtime import platformType
75 from errno import EINTR, EBADF
77 from twisted.internet.selectreactor import _select
79 def dictRemove(dct, value):
85 def raiseException(e):
88 class ThreadedSelectReactor(posixbase.PosixReactorBase):
89 """A threaded select() based reactor - runs on all POSIX platforms and on
92 implements(IReactorFDSet)
98 self.toThreadQueue = Queue()
99 self.toMainThread = Queue()
100 self.workerThread = None
101 self.mainWaker = None
102 posixbase.PosixReactorBase.__init__(self)
103 self.addSystemEventTrigger('after', 'shutdown', self._mainLoopShutdown)
106 # we want to wake up from any thread
# Delegate to PosixReactorBase.callLater with the same positional and keyword
# arguments, binding the result to `tple`.
# NOTE(review): the rest of this method is not visible in this extract (the
# embedded numbering jumps from 110 to 114) -- confirm what is done with
# `tple` against the full file.
109 def callLater(self, *args, **kw):
110 tple = posixbase.PosixReactorBase.callLater(self, *args, **kw)
# Put a (msg, args) tuple on self.toMainThread. Consumers of that queue
# (e.g. doThreadIteration) dispatch each item to the method named
# '_process_' + msg, so `msg` must match an existing _process_* method.
# NOTE(review): the body of the `mainWaker is not None` branch is missing
# from this extract -- confirm against the full file.
114 def _sendToMain(self, msg, *args):
115 #print >>sys.stderr, 'sendToMain', msg, args
116 self.toMainThread.put((msg, args))
117 if self.mainWaker is not None:
# Put a (fn, args) tuple on self.toThreadQueue. _workerInThread reads
# (fn, args) pairs from the same queue.
120 def _sendToThread(self, fn, *args):
121 #print >>sys.stderr, 'sendToThread', fn, args
122 self.toThreadQueue.put((fn, args))
# Re-check every registered selectable after a malformed descriptor was
# detected: log a notice, snapshot the keys of self.reads and self.writes,
# then probe each selectable individually with a zero-timeout select.select.
# The visible lines log "bad descriptor ..." and assign selDict[selectable] = 1.
# NOTE(review): the control-flow lines that decide which of those two actions
# runs for a given selectable (and anything between the snapshot and the loop)
# are missing from this extract -- confirm against the full file.
124 def _preenDescriptorsInThread(self):
125 log.msg("Malformed file descriptor found. Preening lists.")
126 readers = self.reads.keys()
127 writers = self.writes.keys()
130 for selDict, selList in ((self.reads, readers), (self.writes, writers)):
131 for selectable in selList:
133 select.select([selectable], [selectable], [selectable], 0)
135 log.msg("bad descriptor %s" % selectable)
137 selDict[selectable] = 1
# Thread target used by ensureWorkerThread. Takes (fn, args) pairs off
# self.toThreadQueue; on the failure path it captures the active exception
# as a failure.Failure and reports it to the main thread as a 'Failure'
# message via _sendToMain.
# NOTE(review): the loop, the call of fn, and the try/except lines are missing
# from this extract; per the surviving comment, one exception type is used as
# the signal for this thread to exit -- confirm which against the full file.
139 def _workerInThread(self):
142 fn, args = self.toThreadQueue.get()
143 #print >>sys.stderr, "worker got", fn, args
146 pass # exception indicates this thread should exit
148 f = failure.Failure()
149 self._sendToMain('Failure', f)
150 #print >>sys.stderr, "worker finished"
# Summary of the visible lines: call _select on the keys of the reader
# (and, presumably, writer) dicts; on ValueError, TypeError, or an
# errno of EBADF, rebuild the descriptor dicts via
# _preenDescriptorsInThread; finally send a 'Notify' message carrying the
# ready lists (r, w) to the main thread with _sendToMain.
# NOTE(review): many lines of this method (the local `reads`/`writes`
# bindings, the remaining _select arguments, the enclosing loop/try, and the
# bodies of several branches) are missing from this extract -- confirm the
# exact control flow against the full file.
152 def _doSelectInThread(self, timeout):
153 """Run one iteration of the I/O monitor loop.
155 This will run all selectables who had input or output readiness
162 r, w, ignored = _select(reads.keys(),
166 except ValueError, ve:
167 # Possibly a file descriptor has gone negative?
169 self._preenDescriptorsInThread()
170 except TypeError, te:
171 # Something *totally* invalid (object w/o fileno, non-integral
174 self._preenDescriptorsInThread()
175 except (select.error, IOError), se:
176 # select(2) encountered an error
177 if se.args[0] in (0, 2):
178 # windows does this if it got an empty list
179 if (not reads) and (not writes):
183 elif se.args[0] == EINTR:
185 elif se.args[0] == EBADF:
186 self._preenDescriptorsInThread()
188 # OK, I really don't know what's going on. Blow up.
190 self._sendToMain('Notify', r, w)
# Handler for the 'Notify' message (dispatched by name via '_process_' + msg).
# For each selectable in the ready-to-read list `r` and ready-to-write list
# `w`, invoke self._doReadOrWrite with "doRead" / "doWrite" through
# log.callWithLogger, after checking that the selectable is still present in
# the corresponding dict.
# NOTE(review): the bindings of the local names `reads` and `writes`, and the
# statement executed when a selectable is no longer in `dct`, are missing
# from this extract -- confirm against the full file.
192 def _process_Notify(self, r, w):
193 #print >>sys.stderr, "_process_Notify"
197 _drdw = self._doReadOrWrite
198 _logrun = log.callWithLogger
199 for selectables, method, dct in ((r, "doRead", reads), (w, "doWrite", writes)):
200 for selectable in selectables:
201 # if this was disconnected in another thread, kill it.
202 if selectable not in dct:
204 # This for pausing input when we're not ready for more.
205 _logrun(selectable, _drdw, selectable, method, dct)
206 #print >>sys.stderr, "done _process_Notify"
208 def _process_Failure(self, f):
# Alias: the callable that doThreadIteration and _interleave queue for the
# worker thread is _doSelectInThread.
211 _doIterationInThread = _doSelectInThread
# Start a new Thread running self._workerInThread when no worker thread has
# been created yet or the existing one is no longer alive; otherwise leave
# the current worker untouched.
# NOTE(review): Thread.isAlive() is the Python 2 spelling (is_alive() on
# Python 3); this file uses Python 2 syntax elsewhere.
213 def ensureWorkerThread(self):
214 if self.workerThread is None or not self.workerThread.isAlive():
215 self.workerThread = Thread(target=self._workerInThread)
216 self.workerThread.start()
# Run one iteration synchronously: queue self._doIterationInThread(timeout)
# for the worker thread, make sure a worker thread is running, block on
# self.toMainThread until a (msg, args) reply arrives, then dispatch it to
# the method named '_process_' + msg.
218 def doThreadIteration(self, timeout):
219 self._sendToThread(self._doIterationInThread, timeout)
220 self.ensureWorkerThread()
221 #print >>sys.stderr, 'getting...'
222 msg, args = self.toMainThread.get()
223 #print >>sys.stderr, 'got', msg, args
224 getattr(self, '_process_' + msg)(*args)
# doIteration is bound to the same function as doThreadIteration.
226 doIteration = doThreadIteration
# Non-blocking counterpart of doThreadIteration, used by interleave(): run
# due calls with runUntilCurrent, queue self._doIterationInThread for the
# worker thread with timeout `t`, and later fetch one (msg, args) reply from
# self.toMainThread without blocking and dispatch it to '_process_' + msg.
# `t` is `self.running and t2`, so it is falsy once the reactor has stopped.
# NOTE(review): the enclosing loop, the binding of `t2`, and the statement
# between "yielding" and "fetching" are missing from this extract; this is
# presumably a generator (interleave() binds its result as `loop`) --
# confirm against the full file.
228 def _interleave(self):
230 #print >>sys.stderr, "runUntilCurrent"
231 self.runUntilCurrent()
233 t = self.running and t2
234 self._sendToThread(self._doIterationInThread, t)
235 #print >>sys.stderr, "yielding"
237 #print >>sys.stderr, "fetching"
238 msg, args = self.toMainThread.get_nowait()
239 getattr(self, '_process_' + msg)(*args)
# Visible steps: call startRunning with the extra positional/keyword
# arguments, create `loop` from self._interleave(), define a local
# mainWaker closure over `waker` and `loop`, store it as self.mainWaker
# (which _sendToMain checks after queueing a message), and make sure the
# worker thread is running.
# NOTE(review): the docstring delimiters, the body of the mainWaker closure,
# and the statements around the final two visible lines are missing from
# this extract -- confirm against the full file.
241 def interleave(self, waker, *args, **kw):
243 interleave(waker) interleaves this reactor with the
244 current application by moving the blocking parts of
245 the reactor (select() in this case) to a separate
246 thread. This is typically useful for integration with
247 GUI applications which have their own event loop
250 See the module docstring for more information.
252 self.startRunning(*args, **kw)
253 loop = self._interleave()
254 def mainWaker(waker=waker, loop=loop):
255 #print >>sys.stderr, "mainWaker()"
257 self.mainWaker = mainWaker
259 self.ensureWorkerThread()
# Registered in the constructor code as an ('after', 'shutdown') system event
# trigger. Visible steps: clear self.mainWaker; if a worker thread exists,
# queue raiseException(SystemExit) for it, read pending items from
# self.toMainThread without blocking, join the thread and reset
# self.workerThread to None; then read leftover items from
# self.toThreadQueue, logging a message if a queued iteration is found.
# NOTE(review): the loops / try-except lines around the get_nowait() calls
# and the bodies of the final branches are missing from this extract --
# confirm against the full file.
261 def _mainLoopShutdown(self):
262 self.mainWaker = None
263 if self.workerThread is not None:
264 #print >>sys.stderr, 'getting...'
265 self._sendToThread(raiseException, SystemExit)
269 msg, args = self.toMainThread.get_nowait()
270 #print >>sys.stderr, "ignored:", (msg, args)
273 self.workerThread.join()
274 self.workerThread = None
277 fn, args = self.toThreadQueue.get_nowait()
278 if fn is self._doIterationInThread:
279 log.msg('Iteration is still in the thread queue!')
280 elif fn is raiseException and args[0] is SystemExit:
# Call the method named by `method` ("doRead" or "doWrite", as passed by
# _process_Notify) on `selectable`, keeping its result in `why`; on the
# error path `why` is replaced by the active exception value, and the
# selectable is handed to self._disconnectSelectable with a flag telling
# whether the operation was a read.
# The `dict` parameter shadows the builtin of the same name.
# NOTE(review): the try/except structure, the use of `handfn`, and the
# condition guarding the disconnect are missing from this extract --
# confirm against the full file.
287 def _doReadOrWrite(self, selectable, method, dict):
289 why = getattr(selectable, method)()
290 handfn = getattr(selectable, 'fileno', None)
296 why = sys.exc_info()[1]
299 self._disconnectSelectable(selectable, why, method == "doRead")
# The registration is not applied directly: self.reads.__setitem__(reader, 1)
# is queued for the worker thread via _sendToThread.
# NOTE(review): the end of the docstring and any statements after the queued
# call are missing from this extract -- confirm against the full file.
301 def addReader(self, reader):
302 """Add a FileDescriptor for notification of data available to read.
304 self._sendToThread(self.reads.__setitem__, reader, 1)
# The registration is not applied directly: self.writes.__setitem__(writer, 1)
# is queued for the worker thread via _sendToThread.
# NOTE(review): the end of the docstring and any statements after the queued
# call are missing from this extract -- confirm against the full file.
307 def addWriter(self, writer):
308 """Add a FileDescriptor for notification of data available to write.
310 self._sendToThread(self.writes.__setitem__, writer, 1)
# The removal is not applied directly: dictRemove(self.reads, reader) is
# queued for the worker thread via _sendToThread.
# NOTE(review): the end of the docstring and any statements after the queued
# call are missing from this extract -- confirm against the full file.
313 def removeReader(self, reader):
314 """Remove a Selectable for notification of data available to read.
316 self._sendToThread(dictRemove, self.reads, reader)
# The removal is not applied directly: dictRemove(self.writes, writer) is
# queued for the worker thread via _sendToThread.
# NOTE(review): the end of the docstring and any statements after the queued
# call are missing from this extract -- confirm against the full file.
318 def removeWriter(self, writer):
319 """Remove a Selectable for notification of data available to write.
321 self._sendToThread(dictRemove, self.writes, writer)
324 return self._removeAll(self.reads, self.writes)
# Return the keys of self.reads, i.e. the currently registered readers.
327 def getReaders(self):
328 return self.reads.keys()
# Return the keys of self.writes, i.e. the currently registered writers.
331 def getWriters(self):
332 return self.writes.keys()
337 Extend the base stop implementation to also wake up the select thread so
338 that C{runUntilCurrent} notices the reactor should stop.
340 posixbase.PosixReactorBase.stop(self)
# Visible steps: call startRunning with the installSignalHandlers flag, then
# call interleave() using the `put` method of a local `q` as the waker
# function; a StopIteration handler follows.
# NOTE(review): the creation of `q`, the loop that consumes it, and the
# try block that the StopIteration handler belongs to are missing from this
# extract -- confirm against the full file.
344 def run(self, installSignalHandlers=1):
345 self.startRunning(installSignalHandlers=installSignalHandlers)
350 self.interleave(q.put)
354 except StopIteration:
360 """Configure the twisted mainloop to be run using the select() reactor.
362 reactor = ThreadedSelectReactor()
363 from twisted.internet.main import installReactor
364 installReactor(reactor)
367 __all__ = ['install']