1 # -*- test-case-name: twisted.internet.test.test_core -*-
2 # Copyright (c) Twisted Matrix Laboratories.
3 # See LICENSE for details.
6 A reactor for integrating with U{CFRunLoop<http://bit.ly/cfrunloop>}, the
7 CoreFoundation main loop used by MacOS X.
9 This is useful for integrating Twisted with U{PyObjC<http://pyobjc.sf.net/>}
20 from zope.interface import implements
22 from twisted.internet.interfaces import IReactorFDSet
23 from twisted.internet.posixbase import PosixReactorBase, _Waker
24 from twisted.internet.posixbase import _NO_FILEDESC
26 from twisted.python import log
28 from CoreFoundation import (
29 CFRunLoopAddSource, CFRunLoopRemoveSource, CFRunLoopGetMain, CFRunLoopRun,
30 CFRunLoopStop, CFRunLoopTimerCreate, CFRunLoopAddTimer,
31 CFRunLoopTimerInvalidate, kCFAllocatorDefault, kCFRunLoopCommonModes,
32 CFAbsoluteTimeGetCurrent)
34 from CFNetwork import (
35 CFSocketCreateWithNative, CFSocketSetSocketFlags, CFSocketEnableCallBacks,
36 CFSocketCreateRunLoopSource, CFSocketDisableCallBacks, CFSocketInvalidate,
37 kCFSocketWriteCallBack, kCFSocketReadCallBack, kCFSocketConnectCallBack,
38 kCFSocketAutomaticallyReenableReadCallBack,
39 kCFSocketAutomaticallyReenableWriteCallBack)
44 _preserveSOError = 1 << 6
47 class _WakerPlus(_Waker):
49 The normal Twisted waker will simply wake up the main loop, which causes an
50 iteration to run, which in turn causes L{PosixReactorBase.runUntilCurrent}
53 L{CFReactor} has a slightly different model of iteration, though: rather
54 than have each iteration process the thread queue, then timed calls, then
55 file descriptors, each callback is run as it is dispatched by the CFRunLoop
56 observer which triggered it.
58 So this waker needs to not only unblock the loop, but also make sure the
59 work gets done; so, it reschedules the invocation of C{runUntilCurrent} to
60 be immediate (0 seconds from now) even if there is no timed call work to
66 Wake up the loop and force C{runUntilCurrent} to run immediately in the
69 result = _Waker.doRead(self)
70 self.reactor._scheduleSimulate(True)
75 class CFReactor(PosixReactorBase):
77 The CoreFoundation reactor.
79 You probably want to use this via the L{install} API.
81 @ivar _fdmap: a dictionary, mapping an integer (a file descriptor) to a
84 - source: a C{CFRunLoopSource}; the source associated with this
86 - socket: a C{CFSocket} wrapping the file descriptor.
87 - descriptor: an L{IReadDescriptor} and/or L{IWriteDescriptor}
89 - read-write: a 2-C{list} of booleans: respectively, whether this
90 descriptor is currently registered for reading or registered for
93 @ivar _idmap: a dictionary, mapping the id() of an L{IReadDescriptor} or
94 L{IWriteDescriptor} to a C{fd} in L{_fdmap}. Implemented in this
95 manner so that we don't have to rely (even more) on the hashability of
96 L{IReadDescriptor} providers, and we know that they won't be collected
97 since these are kept in sync with C{_fdmap}. Necessary because the
98 .fileno() of a file descriptor may change at will, so we need to be
99 able to look up what its file descriptor I{used} to be, so that we can
100 look it up in C{_fdmap}
102 @ivar _cfrunloop: the L{CFRunLoop} pyobjc object wrapped by this reactor.
104 @ivar _inCFLoop: Is L{CFRunLoopRun} currently running?
106 @type _inCFLoop: C{bool}
108 @ivar _currentSimulator: if a CFTimer is currently scheduled with the CF
109 run loop to run Twisted callLater calls, this is a reference to it.
110 Otherwise, it is C{None}
113 implements(IReactorFDSet)
115 def __init__(self, runLoop=None, runner=None):
119 runner = CFRunLoopRun
120 self._runner = runner
123 runLoop = CFRunLoopGetMain()
124 self._cfrunloop = runLoop
125 PosixReactorBase.__init__(self)
128 def installWaker(self):
130 Override C{installWaker} in order to use L{_WakerPlus}; otherwise this
131 should be exactly the same as the parent implementation.
134 self.waker = _WakerPlus(self)
135 self._internalReaders.add(self.waker)
136 self.addReader(self.waker)
139 def _socketCallback(self, cfSocket, callbackType,
140 ignoredAddress, ignoredData, context):
142 The socket callback issued by CFRunLoop. This will issue C{doRead} or
143 C{doWrite} calls to the L{IReadDescriptor} and L{IWriteDescriptor}
144 registered with the file descriptor that we are being notified of.
146 @param cfSocket: The L{CFSocket} which has got some activity.
148 @param callbackType: The type of activity that we are being notified
149 of. Either L{kCFSocketReadCallBack} or L{kCFSocketWriteCallBack}.
151 @param ignoredAddress: Unused, because this is not used for either of
152 the callback types we register for.
154 @param ignoredData: Unused, because this is not used for either of the
155 callback types we register for.
157 @param context: The data associated with this callback by
158 L{CFSocketCreateWithNative} (in L{CFReactor._watchFD}). A 2-tuple
159 of C{(int, CFRunLoopSource)}.
161 (fd, smugglesrc) = context
162 if fd not in self._fdmap:
163 # Spurious notifications seem to be generated sometimes if you
164 # CFSocketDisableCallBacks in the middle of an event. I don't know
165 # about this FD, any more, so let's get rid of it.
166 CFRunLoopRemoveSource(
167 self._cfrunloop, smugglesrc, kCFRunLoopCommonModes
173 src, skt, readWriteDescriptor, rw = self._fdmap[fd]
175 if readWriteDescriptor.fileno() == -1:
178 isRead = callbackType == kCFSocketReadCallBack
179 # CFSocket seems to deliver duplicate read/write notifications
180 # sometimes, especially a duplicate writability notification
181 # when first registering the socket. This bears further
182 # investigation, since I may have been mis-interpreting the
183 # behavior I was seeing. (Running the full Twisted test suite,
184 # while thorough, is not always entirely clear.) Until this has
185 # been more thoroughly investigated , we consult our own
186 # reading/writing state flags to determine whether we should
187 # actually attempt a doRead/doWrite first. -glyph
190 why = log.callWithLogger(
191 readWriteDescriptor, readWriteDescriptor.doRead)
194 why = log.callWithLogger(
195 readWriteDescriptor, readWriteDescriptor.doWrite)
197 why = sys.exc_info()[1]
200 self._disconnectSelectable(readWriteDescriptor, why, isRead)
203 def _watchFD(self, fd, descr, flag):
205 Register a file descriptor with the L{CFRunLoop}, or modify its state
206 so that it's listening for both notifications (read and write) rather
207 than just one; used to implement C{addReader} and C{addWriter}.
209 @param fd: The file descriptor.
213 @param descr: the L{IReadDescriptor} or L{IWriteDescriptor}
215 @param flag: the flag to register for callbacks on, either
216 L{kCFSocketReadCallBack} or L{kCFSocketWriteCallBack}
219 raise RuntimeError("Invalid file descriptor.")
220 if fd in self._fdmap:
221 src, cfs, gotdescr, rw = self._fdmap[fd]
222 # do I need to verify that it's the same descr?
226 cfs = CFSocketCreateWithNative(
227 kCFAllocatorDefault, fd,
228 kCFSocketReadCallBack | kCFSocketWriteCallBack |
229 kCFSocketConnectCallBack,
230 self._socketCallback, ctx
232 CFSocketSetSocketFlags(
234 kCFSocketAutomaticallyReenableReadCallBack |
235 kCFSocketAutomaticallyReenableWriteCallBack |
237 # This extra flag is to ensure that CF doesn't (destructively,
238 # because destructively is the only way to do it) retrieve
239 # SO_ERROR and thereby break twisted.internet.tcp.BaseClient,
240 # which needs SO_ERROR to tell it whether or not it needs to
241 # call connect_ex a second time.
244 src = CFSocketCreateRunLoopSource(kCFAllocatorDefault, cfs, 0)
246 CFRunLoopAddSource(self._cfrunloop, src, kCFRunLoopCommonModes)
247 CFSocketDisableCallBacks(
249 kCFSocketReadCallBack | kCFSocketWriteCallBack |
250 kCFSocketConnectCallBack
253 self._idmap[id(descr)] = fd
254 self._fdmap[fd] = src, cfs, descr, rw
255 rw[self._flag2idx(flag)] = True
256 CFSocketEnableCallBacks(cfs, flag)
259 def _flag2idx(self, flag):
261 Convert a C{kCFSocket...} constant to an index into the read/write
262 state list (C{_READ} or C{_WRITE}) (the 4th element of the value of
265 @param flag: C{kCFSocketReadCallBack} or C{kCFSocketWriteCallBack}
267 @return: C{_READ} or C{_WRITE}
269 return {kCFSocketReadCallBack: _READ,
270 kCFSocketWriteCallBack: _WRITE}[flag]
273 def _unwatchFD(self, fd, descr, flag):
275 Unregister a file descriptor with the L{CFRunLoop}, or modify its state
276 so that it's listening for only one notification (read or write) as
277 opposed to both; used to implement C{removeReader} and C{removeWriter}.
279 @param fd: a file descriptor
283 @param descr: an L{IReadDescriptor} or L{IWriteDescriptor}
285 @param flag: L{kCFSocketWriteCallBack} L{kCFSocketReadCallBack}
287 if id(descr) not in self._idmap:
290 # need to deal with it in this case, I think.
291 realfd = self._idmap[id(descr)]
294 src, cfs, descr, rw = self._fdmap[realfd]
295 CFSocketDisableCallBacks(cfs, flag)
296 rw[self._flag2idx(flag)] = False
297 if not rw[_READ] and not rw[_WRITE]:
298 del self._idmap[id(descr)]
299 del self._fdmap[realfd]
300 CFRunLoopRemoveSource(self._cfrunloop, src, kCFRunLoopCommonModes)
301 CFSocketInvalidate(cfs)
304 def addReader(self, reader):
306 Implement L{IReactorFDSet.addReader}.
308 self._watchFD(reader.fileno(), reader, kCFSocketReadCallBack)
311 def addWriter(self, writer):
313 Implement L{IReactorFDSet.addWriter}.
315 self._watchFD(writer.fileno(), writer, kCFSocketWriteCallBack)
318 def removeReader(self, reader):
320 Implement L{IReactorFDSet.removeReader}.
322 self._unwatchFD(reader.fileno(), reader, kCFSocketReadCallBack)
325 def removeWriter(self, writer):
327 Implement L{IReactorFDSet.removeWriter}.
329 self._unwatchFD(writer.fileno(), writer, kCFSocketWriteCallBack)
334 Implement L{IReactorFDSet.removeAll}.
336 allDesc = set([descr for src, cfs, descr, rw in self._fdmap.values()])
337 allDesc -= set(self._internalReaders)
339 self.removeReader(desc)
340 self.removeWriter(desc)
344 def getReaders(self):
346 Implement L{IReactorFDSet.getReaders}.
348 return [descr for src, cfs, descr, rw in self._fdmap.values()
352 def getWriters(self):
354 Implement L{IReactorFDSet.getWriters}.
356 return [descr for src, cfs, descr, rw in self._fdmap.values()
360 def _moveCallLaterSooner(self, tple):
362 Override L{PosixReactorBase}'s implementation of L{IDelayedCall.reset}
363 so that it will immediately reschedule. Normally
364 C{_moveCallLaterSooner} depends on the fact that C{runUntilCurrent} is
365 always run before the mainloop goes back to sleep, so this forces it to
366 immediately recompute how long the loop needs to stay asleep.
368 result = PosixReactorBase._moveCallLaterSooner(self, tple)
369 self._scheduleSimulate()
377 Run the runner (L{CFRunLoopRun} or something that calls it), which runs
378 the run loop until C{crash()} is called.
380 self._inCFLoop = True
384 self._inCFLoop = False
387 _currentSimulator = None
389 def _scheduleSimulate(self, force=False):
391 Schedule a call to C{self.runUntilCurrent}. This will cancel the
392 currently scheduled call if it is already scheduled.
394 @param force: Even if there are no timed calls, make sure that
395 C{runUntilCurrent} runs immediately (in a 0-seconds-from-now
396 {CFRunLoopTimer}). This is necessary for calls which need to
397 trigger behavior of C{runUntilCurrent} other than running timed
398 calls, such as draining the thread call queue or calling C{crash()}
399 when the appropriate flags are set.
403 if self._currentSimulator is not None:
404 CFRunLoopTimerInvalidate(self._currentSimulator)
405 self._currentSimulator = None
406 timeout = self.timeout()
409 if timeout is not None:
410 fireDate = (CFAbsoluteTimeGetCurrent() + timeout)
411 def simulate(cftimer, extra):
412 self._currentSimulator = None
413 self.runUntilCurrent()
414 self._scheduleSimulate()
415 c = self._currentSimulator = CFRunLoopTimerCreate(
416 kCFAllocatorDefault, fireDate,
417 0, 0, 0, simulate, None
419 CFRunLoopAddTimer(self._cfrunloop, c, kCFRunLoopCommonModes)
422 def callLater(self, _seconds, _f, *args, **kw):
424 Implement L{IReactorTime.callLater}.
426 delayedCall = PosixReactorBase.callLater(
427 self, _seconds, _f, *args, **kw
429 self._scheduleSimulate()
435 Implement L{IReactorCore.stop}.
437 PosixReactorBase.stop(self)
438 self._scheduleSimulate(True)
443 Implement L{IReactorCore.crash}
445 wasStarted = self._started
446 PosixReactorBase.crash(self)
451 self.callLater(0, self._stopNow)
456 Immediately stop the CFRunLoop (which must be running!).
458 CFRunLoopStop(self._cfrunloop)
461 def iterate(self, delay=0):
463 Emulate the behavior of C{iterate()} for things that want to call it,
464 by letting the loop run for a little while and then scheduling a timed
467 self.callLater(delay, self._stopNow)
472 def install(runLoop=None, runner=None):
474 Configure the twisted mainloop to be run inside CFRunLoop.
476 @param runLoop: the run loop to use.
478 @param runner: the function to call in order to actually invoke the main
479 loop. This will default to L{CFRunLoopRun} if not specified. However,
480 this is not an appropriate choice for GUI applications, as you need to
481 run NSApplicationMain (or something like it). For example, to run the
482 Twisted mainloop in a PyObjC application, your C{main.py} should look
483 something like this::
485 from PyObjCTools import AppHelper
486 from twisted.internet.cfreactor import install
487 install(runner=AppHelper.runEventLoop)
488 # initialize your application
491 @return: The installed reactor.
496 reactor = CFReactor(runLoop=runLoop, runner=runner)
497 from twisted.internet.main import installReactor
498 installReactor(reactor)