Initial import to Tizen
[profile/ivi/python-twisted.git] / twisted / internet / iocpreactor / reactor.py
1 # -*- test-case-name: twisted.internet.test.test_iocp -*-
2 # Copyright (c) Twisted Matrix Laboratories.
3 # See LICENSE for details.
4
5 """
6 Reactor that uses IO completion ports
7 """
8
9 import warnings, socket, sys
10
11 from zope.interface import implements
12
13 from twisted.internet import base, interfaces, main, error
14 from twisted.python import log, failure
15 from twisted.internet._dumbwin32proc import Process
16 from twisted.internet.win32eventreactor import _ThreadedWin32EventsMixin
17
18 from twisted.internet.iocpreactor import iocpsupport as _iocp
19 from twisted.internet.iocpreactor.const import WAIT_TIMEOUT
20 from twisted.internet.iocpreactor import tcp, udp
21
22 try:
23     from twisted.protocols.tls import TLSMemoryBIOFactory
24 except ImportError:
25     # Either pyOpenSSL isn't installed, or it is too old for this code to work.
26     # The reactor won't provide IReactorSSL.
27     TLSMemoryBIOFactory = None
28     _extraInterfaces = ()
29     warnings.warn(
30         "pyOpenSSL 0.10 or newer is required for SSL support in iocpreactor. "
31         "It is missing, so the reactor will not support SSL APIs.")
32 else:
33     _extraInterfaces = (interfaces.IReactorSSL,)
34
35 from twisted.python.compat import set
36
37 MAX_TIMEOUT = 2000 # 2 seconds, see doIteration for explanation
38
39 EVENTS_PER_LOOP = 1000 # XXX: what's a good value here?
40
41 # keys to associate with normal and waker events
42 KEY_NORMAL, KEY_WAKEUP = range(2)
43
44 _NO_GETHANDLE = error.ConnectionFdescWentAway(
45                     'Handler has no getFileHandle method')
46 _NO_FILEDESC = error.ConnectionFdescWentAway('Filedescriptor went away')
47
48
49
50 class IOCPReactor(base._SignalReactorMixin, base.ReactorBase,
51                   _ThreadedWin32EventsMixin):
52     implements(interfaces.IReactorTCP, interfaces.IReactorUDP,
53                interfaces.IReactorMulticast, interfaces.IReactorProcess,
54                *_extraInterfaces)
55
56     port = None
57
58     def __init__(self):
59         base.ReactorBase.__init__(self)
60         self.port = _iocp.CompletionPort()
61         self.handles = set()
62
63
64     def addActiveHandle(self, handle):
65         self.handles.add(handle)
66
67
68     def removeActiveHandle(self, handle):
69         self.handles.discard(handle)
70
71
72     def doIteration(self, timeout):
73         """
74         Poll the IO completion port for new events.
75         """
76         # This function sits and waits for an IO completion event.
77         #
78         # There are two requirements: process IO events as soon as they arrive
79         # and process ctrl-break from the user in a reasonable amount of time.
80         #
81         # There are three kinds of waiting.
82         # 1) GetQueuedCompletionStatus (self.port.getEvent) to wait for IO
83         # events only.
84         # 2) Msg* family of wait functions that can stop waiting when
85         # ctrl-break is detected (then, I think, Python converts it into a
86         # KeyboardInterrupt)
87         # 3) *Ex family of wait functions that put the thread into an
88         # "alertable" wait state which is supposedly triggered by IO completion
89         #
90         # 2) and 3) can be combined. Trouble is, my IO completion is not
91         # causing 3) to trigger, possibly because I do not use an IO completion
92         # callback. Windows is weird.
93         # There are two ways to handle this. I could use MsgWaitForSingleObject
94         # here and GetQueuedCompletionStatus in a thread. Or I could poll with
95         # a reasonable interval. Guess what! Threads are hard.
96
97         processed_events = 0
98         if timeout is None:
99             timeout = MAX_TIMEOUT
100         else:
101             timeout = min(MAX_TIMEOUT, int(1000*timeout))
102         rc, bytes, key, evt = self.port.getEvent(timeout)
103         while 1:
104             if rc == WAIT_TIMEOUT:
105                 break
106             if key != KEY_WAKEUP:
107                 assert key == KEY_NORMAL
108                 log.callWithLogger(evt.owner, self._callEventCallback,
109                                    rc, bytes, evt)
110                 processed_events += 1
111             if processed_events >= EVENTS_PER_LOOP:
112                 break
113             rc, bytes, key, evt = self.port.getEvent(0)
114
115
116     def _callEventCallback(self, rc, bytes, evt):
117         owner = evt.owner
118         why = None
119         try:
120             evt.callback(rc, bytes, evt)
121             handfn = getattr(owner, 'getFileHandle', None)
122             if not handfn:
123                 why = _NO_GETHANDLE
124             elif handfn() == -1:
125                 why = _NO_FILEDESC
126             if why:
127                 return # ignore handles that were closed
128         except:
129             why = sys.exc_info()[1]
130             log.err()
131         if why:
132             owner.loseConnection(failure.Failure(why))
133
134
135     def installWaker(self):
136         pass
137
138
139     def wakeUp(self):
140         self.port.postEvent(0, KEY_WAKEUP, None)
141
142
143     def registerHandle(self, handle):
144         self.port.addHandle(handle, KEY_NORMAL)
145
146
147     def createSocket(self, af, stype):
148         skt = socket.socket(af, stype)
149         self.registerHandle(skt.fileno())
150         return skt
151
152
153     def listenTCP(self, port, factory, backlog=50, interface=''):
154         """
155         @see: twisted.internet.interfaces.IReactorTCP.listenTCP
156         """
157         p = tcp.Port(port, factory, backlog, interface, self)
158         p.startListening()
159         return p
160
161
162     def connectTCP(self, host, port, factory, timeout=30, bindAddress=None):
163         """
164         @see: twisted.internet.interfaces.IReactorTCP.connectTCP
165         """
166         c = tcp.Connector(host, port, factory, timeout, bindAddress, self)
167         c.connect()
168         return c
169
170
171     if TLSMemoryBIOFactory is not None:
172         def listenSSL(self, port, factory, contextFactory, backlog=50, interface=''):
173             """
174             @see: twisted.internet.interfaces.IReactorSSL.listenSSL
175             """
176             port = self.listenTCP(
177                 port,
178                 TLSMemoryBIOFactory(contextFactory, False, factory),
179                 backlog, interface)
180             port._type = 'TLS'
181             return port
182
183
184         def connectSSL(self, host, port, factory, contextFactory, timeout=30, bindAddress=None):
185             """
186             @see: twisted.internet.interfaces.IReactorSSL.connectSSL
187             """
188             return self.connectTCP(
189                 host, port,
190                 TLSMemoryBIOFactory(contextFactory, True, factory),
191                 timeout, bindAddress)
192     else:
193         def listenSSL(self, port, factory, contextFactory, backlog=50, interface=''):
194             """
195             Non-implementation of L{IReactorSSL.listenSSL}.  Some dependency
196             is not satisfied.  This implementation always raises
197             L{NotImplementedError}.
198             """
199             raise NotImplementedError(
200                 "pyOpenSSL 0.10 or newer is required for SSL support in "
201                 "iocpreactor. It is missing, so the reactor does not support "
202                 "SSL APIs.")
203
204
205         def connectSSL(self, host, port, factory, contextFactory, timeout=30, bindAddress=None):
206             """
207             Non-implementation of L{IReactorSSL.connectSSL}.  Some dependency
208             is not satisfied.  This implementation always raises
209             L{NotImplementedError}.
210             """
211             raise NotImplementedError(
212                 "pyOpenSSL 0.10 or newer is required for SSL support in "
213                 "iocpreactor. It is missing, so the reactor does not support "
214                 "SSL APIs.")
215
216
217     def listenUDP(self, port, protocol, interface='', maxPacketSize=8192):
218         """
219         Connects a given L{DatagramProtocol} to the given numeric UDP port.
220
221         @returns: object conforming to L{IListeningPort}.
222         """
223         p = udp.Port(port, protocol, interface, maxPacketSize, self)
224         p.startListening()
225         return p
226
227
228     def listenMulticast(self, port, protocol, interface='', maxPacketSize=8192,
229                         listenMultiple=False):
230         """
231         Connects a given DatagramProtocol to the given numeric UDP port.
232
233         EXPERIMENTAL.
234
235         @returns: object conforming to IListeningPort.
236         """
237         p = udp.MulticastPort(port, protocol, interface, maxPacketSize, self,
238                               listenMultiple)
239         p.startListening()
240         return p
241
242
243     def spawnProcess(self, processProtocol, executable, args=(), env={},
244                      path=None, uid=None, gid=None, usePTY=0, childFDs=None):
245         """
246         Spawn a process.
247         """
248         if uid is not None:
249             raise ValueError("Setting UID is unsupported on this platform.")
250         if gid is not None:
251             raise ValueError("Setting GID is unsupported on this platform.")
252         if usePTY:
253             raise ValueError("PTYs are unsupported on this platform.")
254         if childFDs is not None:
255             raise ValueError(
256                 "Custom child file descriptor mappings are unsupported on "
257                 "this platform.")
258         args, env = self._checkProcessArgs(args, env)
259         return Process(self, processProtocol, executable, args, env, path)
260
261
262     def removeAll(self):
263         res = list(self.handles)
264         self.handles.clear()
265         return res
266
267
268
269 def install():
270     r = IOCPReactor()
271     main.installReactor(r)
272
273
274 __all__ = ['IOCPReactor', 'install']
275