Initial import to Tizen
[profile/ivi/python-twisted.git] / twisted / internet / cfreactor.py
1 # -*- test-case-name: twisted.internet.test.test_core -*-
2 # Copyright (c) Twisted Matrix Laboratories.
3 # See LICENSE for details.
4
5 """
6 A reactor for integrating with U{CFRunLoop<http://bit.ly/cfrunloop>}, the
7 CoreFoundation main loop used by MacOS X.
8
9 This is useful for integrating Twisted with U{PyObjC<http://pyobjc.sf.net/>}
10 applications.
11 """
12
13 __all__ = [
14     'install',
15     'CFReactor'
16 ]
17
18 import sys
19
20 from zope.interface import implements
21
22 from twisted.internet.interfaces import IReactorFDSet
23 from twisted.internet.posixbase import PosixReactorBase, _Waker
24 from twisted.internet.posixbase import _NO_FILEDESC
25
26 from twisted.python import log
27
28 from CoreFoundation import (
29     CFRunLoopAddSource, CFRunLoopRemoveSource, CFRunLoopGetMain, CFRunLoopRun,
30     CFRunLoopStop, CFRunLoopTimerCreate, CFRunLoopAddTimer,
31     CFRunLoopTimerInvalidate, kCFAllocatorDefault, kCFRunLoopCommonModes,
32     CFAbsoluteTimeGetCurrent)
33
34 from CFNetwork import (
35     CFSocketCreateWithNative, CFSocketSetSocketFlags, CFSocketEnableCallBacks,
36     CFSocketCreateRunLoopSource, CFSocketDisableCallBacks, CFSocketInvalidate,
37     kCFSocketWriteCallBack, kCFSocketReadCallBack, kCFSocketConnectCallBack,
38     kCFSocketAutomaticallyReenableReadCallBack,
39     kCFSocketAutomaticallyReenableWriteCallBack)
40
41
42 _READ = 0
43 _WRITE = 1
44 _preserveSOError = 1 << 6
45
46
47 class _WakerPlus(_Waker):
48     """
49     The normal Twisted waker will simply wake up the main loop, which causes an
50     iteration to run, which in turn causes L{PosixReactorBase.runUntilCurrent}
51     to get invoked.
52
53     L{CFReactor} has a slightly different model of iteration, though: rather
54     than have each iteration process the thread queue, then timed calls, then
55     file descriptors, each callback is run as it is dispatched by the CFRunLoop
56     observer which triggered it.
57
58     So this waker needs to not only unblock the loop, but also make sure the
59     work gets done; so, it reschedules the invocation of C{runUntilCurrent} to
60     be immediate (0 seconds from now) even if there is no timed call work to
61     do.
62     """
63
64     def doRead(self):
65         """
66         Wake up the loop and force C{runUntilCurrent} to run immediately in the
67         next timed iteration.
68         """
69         result = _Waker.doRead(self)
70         self.reactor._scheduleSimulate(True)
71         return result
72
73
74
75 class CFReactor(PosixReactorBase):
76     """
77     The CoreFoundation reactor.
78
79     You probably want to use this via the L{install} API.
80
81     @ivar _fdmap: a dictionary, mapping an integer (a file descriptor) to a
82         4-tuple of:
83
84             - source: a C{CFRunLoopSource}; the source associated with this
85               socket.
86             - socket: a C{CFSocket} wrapping the file descriptor.
87             - descriptor: an L{IReadDescriptor} and/or L{IWriteDescriptor}
88               provider.
89             - read-write: a 2-C{list} of booleans: respectively, whether this
90               descriptor is currently registered for reading or registered for
91               writing.
92
93     @ivar _idmap: a dictionary, mapping the id() of an L{IReadDescriptor} or
94         L{IWriteDescriptor} to a C{fd} in L{_fdmap}.  Implemented in this
95         manner so that we don't have to rely (even more) on the hashability of
96         L{IReadDescriptor} providers, and we know that they won't be collected
97         since these are kept in sync with C{_fdmap}.  Necessary because the
98         .fileno() of a file descriptor may change at will, so we need to be
99         able to look up what its file descriptor I{used} to be, so that we can
100         look it up in C{_fdmap}
101
102     @ivar _cfrunloop: the L{CFRunLoop} pyobjc object wrapped by this reactor.
103
104     @ivar _inCFLoop: Is L{CFRunLoopRun} currently running?
105
106     @type _inCFLoop: C{bool}
107
108     @ivar _currentSimulator: if a CFTimer is currently scheduled with the CF
109         run loop to run Twisted callLater calls, this is a reference to it.
110         Otherwise, it is C{None}
111     """
112
113     implements(IReactorFDSet)
114
115     def __init__(self, runLoop=None, runner=None):
116         self._fdmap = {}
117         self._idmap = {}
118         if runner is None:
119             runner = CFRunLoopRun
120         self._runner = runner
121
122         if runLoop is None:
123             runLoop = CFRunLoopGetMain()
124         self._cfrunloop = runLoop
125         PosixReactorBase.__init__(self)
126
127
128     def installWaker(self):
129         """
130         Override C{installWaker} in order to use L{_WakerPlus}; otherwise this
131         should be exactly the same as the parent implementation.
132         """
133         if not self.waker:
134             self.waker = _WakerPlus(self)
135             self._internalReaders.add(self.waker)
136             self.addReader(self.waker)
137
138
139     def _socketCallback(self, cfSocket, callbackType,
140                         ignoredAddress, ignoredData, context):
141         """
142         The socket callback issued by CFRunLoop.  This will issue C{doRead} or
143         C{doWrite} calls to the L{IReadDescriptor} and L{IWriteDescriptor}
144         registered with the file descriptor that we are being notified of.
145
146         @param cfSocket: The L{CFSocket} which has got some activity.
147
148         @param callbackType: The type of activity that we are being notified
149             of.  Either L{kCFSocketReadCallBack} or L{kCFSocketWriteCallBack}.
150
151         @param ignoredAddress: Unused, because this is not used for either of
152             the callback types we register for.
153
154         @param ignoredData: Unused, because this is not used for either of the
155             callback types we register for.
156
157         @param context: The data associated with this callback by
158             L{CFSocketCreateWithNative} (in L{CFReactor._watchFD}).  A 2-tuple
159             of C{(int, CFRunLoopSource)}.
160         """
161         (fd, smugglesrc) = context
162         if fd not in self._fdmap:
163             # Spurious notifications seem to be generated sometimes if you
164             # CFSocketDisableCallBacks in the middle of an event.  I don't know
165             # about this FD, any more, so let's get rid of it.
166             CFRunLoopRemoveSource(
167                 self._cfrunloop, smugglesrc, kCFRunLoopCommonModes
168             )
169             return
170
171         why = None
172         isRead = False
173         src, skt, readWriteDescriptor, rw = self._fdmap[fd]
174         try:
175             if readWriteDescriptor.fileno() == -1:
176                 why = _NO_FILEDESC
177             else:
178                 isRead = callbackType == kCFSocketReadCallBack
179                 # CFSocket seems to deliver duplicate read/write notifications
180                 # sometimes, especially a duplicate writability notification
181                 # when first registering the socket.  This bears further
182                 # investigation, since I may have been mis-interpreting the
183                 # behavior I was seeing. (Running the full Twisted test suite,
184                 # while thorough, is not always entirely clear.) Until this has
185                 # been more thoroughly investigated , we consult our own
186                 # reading/writing state flags to determine whether we should
187                 # actually attempt a doRead/doWrite first.  -glyph
188                 if isRead:
189                     if rw[_READ]:
190                         why = log.callWithLogger(
191                             readWriteDescriptor, readWriteDescriptor.doRead)
192                 else:
193                     if rw[_WRITE]:
194                         why = log.callWithLogger(
195                             readWriteDescriptor, readWriteDescriptor.doWrite)
196         except:
197             why = sys.exc_info()[1]
198             log.err()
199         if why:
200             self._disconnectSelectable(readWriteDescriptor, why, isRead)
201
202
203     def _watchFD(self, fd, descr, flag):
204         """
205         Register a file descriptor with the L{CFRunLoop}, or modify its state
206         so that it's listening for both notifications (read and write) rather
207         than just one; used to implement C{addReader} and C{addWriter}.
208
209         @param fd: The file descriptor.
210
211         @type fd: C{int}
212
213         @param descr: the L{IReadDescriptor} or L{IWriteDescriptor}
214
215         @param flag: the flag to register for callbacks on, either
216             L{kCFSocketReadCallBack} or L{kCFSocketWriteCallBack}
217         """
218         if fd == -1:
219             raise RuntimeError("Invalid file descriptor.")
220         if fd in self._fdmap:
221             src, cfs, gotdescr, rw = self._fdmap[fd]
222             # do I need to verify that it's the same descr?
223         else:
224             ctx = []
225             ctx.append(fd)
226             cfs = CFSocketCreateWithNative(
227                 kCFAllocatorDefault, fd,
228                 kCFSocketReadCallBack | kCFSocketWriteCallBack |
229                 kCFSocketConnectCallBack,
230                 self._socketCallback, ctx
231             )
232             CFSocketSetSocketFlags(
233                 cfs,
234                 kCFSocketAutomaticallyReenableReadCallBack |
235                 kCFSocketAutomaticallyReenableWriteCallBack |
236
237                 # This extra flag is to ensure that CF doesn't (destructively,
238                 # because destructively is the only way to do it) retrieve
239                 # SO_ERROR and thereby break twisted.internet.tcp.BaseClient,
240                 # which needs SO_ERROR to tell it whether or not it needs to
241                 # call connect_ex a second time.
242                 _preserveSOError
243             )
244             src = CFSocketCreateRunLoopSource(kCFAllocatorDefault, cfs, 0)
245             ctx.append(src)
246             CFRunLoopAddSource(self._cfrunloop, src, kCFRunLoopCommonModes)
247             CFSocketDisableCallBacks(
248                 cfs,
249                 kCFSocketReadCallBack | kCFSocketWriteCallBack |
250                 kCFSocketConnectCallBack
251             )
252             rw = [False, False]
253             self._idmap[id(descr)] = fd
254             self._fdmap[fd] = src, cfs, descr, rw
255         rw[self._flag2idx(flag)] = True
256         CFSocketEnableCallBacks(cfs, flag)
257
258
259     def _flag2idx(self, flag):
260         """
261         Convert a C{kCFSocket...} constant to an index into the read/write
262         state list (C{_READ} or C{_WRITE}) (the 4th element of the value of
263         C{self._fdmap}).
264
265         @param flag: C{kCFSocketReadCallBack} or C{kCFSocketWriteCallBack}
266
267         @return: C{_READ} or C{_WRITE}
268         """
269         return {kCFSocketReadCallBack: _READ,
270                 kCFSocketWriteCallBack: _WRITE}[flag]
271
272
273     def _unwatchFD(self, fd, descr, flag):
274         """
275         Unregister a file descriptor with the L{CFRunLoop}, or modify its state
276         so that it's listening for only one notification (read or write) as
277         opposed to both; used to implement C{removeReader} and C{removeWriter}.
278
279         @param fd: a file descriptor
280
281         @type fd: C{int}
282
283         @param descr: an L{IReadDescriptor} or L{IWriteDescriptor}
284
285         @param flag: L{kCFSocketWriteCallBack} L{kCFSocketReadCallBack}
286         """
287         if id(descr) not in self._idmap:
288             return
289         if fd == -1:
290             # need to deal with it in this case, I think.
291             realfd = self._idmap[id(descr)]
292         else:
293             realfd = fd
294         src, cfs, descr, rw = self._fdmap[realfd]
295         CFSocketDisableCallBacks(cfs, flag)
296         rw[self._flag2idx(flag)] = False
297         if not rw[_READ] and not rw[_WRITE]:
298             del self._idmap[id(descr)]
299             del self._fdmap[realfd]
300             CFRunLoopRemoveSource(self._cfrunloop, src, kCFRunLoopCommonModes)
301             CFSocketInvalidate(cfs)
302
303
304     def addReader(self, reader):
305         """
306         Implement L{IReactorFDSet.addReader}.
307         """
308         self._watchFD(reader.fileno(), reader, kCFSocketReadCallBack)
309
310
311     def addWriter(self, writer):
312         """
313         Implement L{IReactorFDSet.addWriter}.
314         """
315         self._watchFD(writer.fileno(), writer, kCFSocketWriteCallBack)
316
317
318     def removeReader(self, reader):
319         """
320         Implement L{IReactorFDSet.removeReader}.
321         """
322         self._unwatchFD(reader.fileno(), reader, kCFSocketReadCallBack)
323
324
325     def removeWriter(self, writer):
326         """
327         Implement L{IReactorFDSet.removeWriter}.
328         """
329         self._unwatchFD(writer.fileno(), writer, kCFSocketWriteCallBack)
330
331
332     def removeAll(self):
333         """
334         Implement L{IReactorFDSet.removeAll}.
335         """
336         allDesc = set([descr for src, cfs, descr, rw in self._fdmap.values()])
337         allDesc -= set(self._internalReaders)
338         for desc in allDesc:
339             self.removeReader(desc)
340             self.removeWriter(desc)
341         return list(allDesc)
342
343
344     def getReaders(self):
345         """
346         Implement L{IReactorFDSet.getReaders}.
347         """
348         return [descr for src, cfs, descr, rw in self._fdmap.values()
349                 if rw[_READ]]
350
351
352     def getWriters(self):
353         """
354         Implement L{IReactorFDSet.getWriters}.
355         """
356         return [descr for src, cfs, descr, rw in self._fdmap.values()
357                 if rw[_WRITE]]
358
359
360     def _moveCallLaterSooner(self, tple):
361         """
362         Override L{PosixReactorBase}'s implementation of L{IDelayedCall.reset}
363         so that it will immediately reschedule.  Normally
364         C{_moveCallLaterSooner} depends on the fact that C{runUntilCurrent} is
365         always run before the mainloop goes back to sleep, so this forces it to
366         immediately recompute how long the loop needs to stay asleep.
367         """
368         result = PosixReactorBase._moveCallLaterSooner(self, tple)
369         self._scheduleSimulate()
370         return result
371
372
373     _inCFLoop = False
374
375     def mainLoop(self):
376         """
377         Run the runner (L{CFRunLoopRun} or something that calls it), which runs
378         the run loop until C{crash()} is called.
379         """
380         self._inCFLoop = True
381         try:
382             self._runner()
383         finally:
384             self._inCFLoop = False
385
386
387     _currentSimulator = None
388
389     def _scheduleSimulate(self, force=False):
390         """
391         Schedule a call to C{self.runUntilCurrent}.  This will cancel the
392         currently scheduled call if it is already scheduled.
393
394         @param force: Even if there are no timed calls, make sure that
395             C{runUntilCurrent} runs immediately (in a 0-seconds-from-now
396             {CFRunLoopTimer}).  This is necessary for calls which need to
397             trigger behavior of C{runUntilCurrent} other than running timed
398             calls, such as draining the thread call queue or calling C{crash()}
399             when the appropriate flags are set.
400
401         @type force: C{bool}
402         """
403         if self._currentSimulator is not None:
404             CFRunLoopTimerInvalidate(self._currentSimulator)
405             self._currentSimulator = None
406         timeout = self.timeout()
407         if force:
408             timeout = 0.0
409         if timeout is not None:
410             fireDate = (CFAbsoluteTimeGetCurrent() + timeout)
411             def simulate(cftimer, extra):
412                 self._currentSimulator = None
413                 self.runUntilCurrent()
414                 self._scheduleSimulate()
415             c = self._currentSimulator = CFRunLoopTimerCreate(
416                 kCFAllocatorDefault, fireDate,
417                 0, 0, 0, simulate, None
418             )
419             CFRunLoopAddTimer(self._cfrunloop, c, kCFRunLoopCommonModes)
420
421
422     def callLater(self, _seconds, _f, *args, **kw):
423         """
424         Implement L{IReactorTime.callLater}.
425         """
426         delayedCall = PosixReactorBase.callLater(
427             self, _seconds, _f, *args, **kw
428         )
429         self._scheduleSimulate()
430         return delayedCall
431
432
433     def stop(self):
434         """
435         Implement L{IReactorCore.stop}.
436         """
437         PosixReactorBase.stop(self)
438         self._scheduleSimulate(True)
439
440
441     def crash(self):
442         """
443         Implement L{IReactorCore.crash}
444         """
445         wasStarted = self._started
446         PosixReactorBase.crash(self)
447         if self._inCFLoop:
448             self._stopNow()
449         else:
450             if wasStarted:
451                 self.callLater(0, self._stopNow)
452
453
454     def _stopNow(self):
455         """
456         Immediately stop the CFRunLoop (which must be running!).
457         """
458         CFRunLoopStop(self._cfrunloop)
459
460
461     def iterate(self, delay=0):
462         """
463         Emulate the behavior of C{iterate()} for things that want to call it,
464         by letting the loop run for a little while and then scheduling a timed
465         call to exit it.
466         """
467         self.callLater(delay, self._stopNow)
468         self.mainLoop()
469
470
471
472 def install(runLoop=None, runner=None):
473     """
474     Configure the twisted mainloop to be run inside CFRunLoop.
475
476     @param runLoop: the run loop to use.
477
478     @param runner: the function to call in order to actually invoke the main
479         loop.  This will default to L{CFRunLoopRun} if not specified.  However,
480         this is not an appropriate choice for GUI applications, as you need to
481         run NSApplicationMain (or something like it).  For example, to run the
482         Twisted mainloop in a PyObjC application, your C{main.py} should look
483         something like this::
484
485             from PyObjCTools import AppHelper
486             from twisted.internet.cfreactor import install
487             install(runner=AppHelper.runEventLoop)
488             # initialize your application
489             reactor.run()
490
491     @return: The installed reactor.
492
493     @rtype: L{CFReactor}
494     """
495
496     reactor = CFReactor(runLoop=runLoop, runner=runner)
497     from twisted.internet.main import installReactor
498     installReactor(reactor)
499     return reactor
500
501