1 # -*- test-case-name: twisted.internet.test.test_iocp -*-
2 # Copyright (c) Twisted Matrix Laboratories.
3 # See LICENSE for details.
6 Reactor that uses IO completion ports
9 import warnings, socket, sys
11 from zope.interface import implements
13 from twisted.internet import base, interfaces, main, error
14 from twisted.python import log, failure
15 from twisted.internet._dumbwin32proc import Process
16 from twisted.internet.win32eventreactor import _ThreadedWin32EventsMixin
18 from twisted.internet.iocpreactor import iocpsupport as _iocp
19 from twisted.internet.iocpreactor.const import WAIT_TIMEOUT
20 from twisted.internet.iocpreactor import tcp, udp
23 from twisted.protocols.tls import TLSMemoryBIOFactory
25 # Either pyOpenSSL isn't installed, or it is too old for this code to work.
26 # The reactor won't provide IReactorSSL.
27 TLSMemoryBIOFactory = None
30 "pyOpenSSL 0.10 or newer is required for SSL support in iocpreactor. "
31 "It is missing, so the reactor will not support SSL APIs.")
33 _extraInterfaces = (interfaces.IReactorSSL,)
35 from twisted.python.compat import set
# Upper bound on a single completion-port wait.  doIteration clamps
# int(1000 * timeout) against this value, so it is on a millisecond scale.
MAX_TIMEOUT = 2000  # 2 seconds, see doIteration for explanation

# Compared against the processed-event counter in doIteration.
EVENTS_PER_LOOP = 1000  # XXX: what's a good value here?

# keys to associate with normal and waker events
KEY_NORMAL, KEY_WAKEUP = range(2)
# Pre-built ConnectionFdescWentAway instances.  Their messages describe a
# handler that lacks a getFileHandle method and a file descriptor that has
# gone away, respectively.
_NO_GETHANDLE = error.ConnectionFdescWentAway(
    'Handler has no getFileHandle method')
_NO_FILEDESC = error.ConnectionFdescWentAway('Filedescriptor went away')
# Reactor class combining base._SignalReactorMixin, base.ReactorBase and
# _ThreadedWin32EventsMixin; it declares the TCP, UDP, multicast and process
# reactor interfaces.
# NOTE(review): the embedded listing numbers jump from 53 to 59, so the end of
# the implements(...) call and the ``def`` line of the method that owns lines
# 59-60 (presumably __init__) are not visible in this extract -- confirm
# against the original file before editing.
50 class IOCPReactor(base._SignalReactorMixin, base.ReactorBase,
51 _ThreadedWin32EventsMixin):
52 implements(interfaces.IReactorTCP, interfaces.IReactorUDP,
53 interfaces.IReactorMulticast, interfaces.IReactorProcess,
# Initialise the base reactor, then create the completion port that the other
# methods reach through self.port.
59 base.ReactorBase.__init__(self)
60 self.port = _iocp.CompletionPort()
def addActiveHandle(self, handle):
    """
    Record C{handle} in this reactor's collection of active handles.

    @param handle: the object to add to C{self.handles}.
    """
    self.handles.add(handle)
def removeActiveHandle(self, handle):
    """
    Drop C{handle} from this reactor's collection of active handles.

    C{discard} is used rather than C{remove}, so with a set-like
    C{self.handles} a handle that is not present is silently ignored.

    @param handle: the object to discard from C{self.handles}.
    """
    self.handles.discard(handle)
# Polls self.port for completion events and hands each normal event to
# _callEventCallback through log.callWithLogger.
# NOTE(review): the embedded listing numbers have gaps throughout this method
# (e.g. 95->101, 102->104, 104->106, 108->110, 111->113), so the docstring
# quotes, the branch that precedes the timeout clamp, the loop header and the
# statements guarded by the two ``if`` tests below are not visible in this
# extract.  Confirm against the original file before changing any logic.
72 def doIteration(self, timeout):
74 Poll the IO completion port for new events.
76 # This function sits and waits for an IO completion event.
78 # There are two requirements: process IO events as soon as they arrive
79 # and process ctrl-break from the user in a reasonable amount of time.
81 # There are three kinds of waiting.
82 # 1) GetQueuedCompletionStatus (self.port.getEvent) to wait for IO
84 # 2) Msg* family of wait functions that can stop waiting when
85 # ctrl-break is detected (then, I think, Python converts it into a
87 # 3) *Ex family of wait functions that put the thread into an
88 # "alertable" wait state which is supposedly triggered by IO completion
90 # 2) and 3) can be combined. Trouble is, my IO completion is not
91 # causing 3) to trigger, possibly because I do not use an IO completion
92 # callback. Windows is weird.
93 # There are two ways to handle this. I could use MsgWaitForSingleObject
94 # here and GetQueuedCompletionStatus in a thread. Or I could poll with
95 # a reasonable interval. Guess what! Threads are hard.
# Clamp the wait to MAX_TIMEOUT; the 1000* factor scales the caller's timeout
# (presumably seconds -- confirm) to the scale MAX_TIMEOUT is expressed in.
101 timeout = min(MAX_TIMEOUT, int(1000*timeout))
102 rc, bytes, key, evt = self.port.getEvent(timeout)
104 if rc == WAIT_TIMEOUT:
# Events posted with KEY_WAKEUP carry no callback; anything else is expected
# to be KEY_NORMAL and is dispatched with the event owner as logging context.
106 if key != KEY_WAKEUP:
107 assert key == KEY_NORMAL
108 log.callWithLogger(evt.owner, self._callEventCallback,
110 processed_events += 1
111 if processed_events >= EVENTS_PER_LOOP:
# Follow-up polls use a zero timeout.
113 rc, bytes, key, evt = self.port.getEvent(0)
# Runs evt.callback(rc, bytes, evt) and, when a failure reason has been
# recorded in ``why``, reports it via owner.loseConnection wrapped in a
# failure.Failure.
# NOTE(review): the embedded listing numbers skip 117-119, 122-126, 128 and
# 130-131, so the assignment of ``owner``, the try/except structure and the
# conditions that set ``why`` are not visible in this extract -- confirm
# against the original file.
116 def _callEventCallback(self, rc, bytes, evt):
120 evt.callback(rc, bytes, evt)
# Look the accessor up with a None default so that owners without a
# getFileHandle attribute do not raise AttributeError here.
121 handfn = getattr(owner, 'getFileHandle', None)
127 return # ignore handles that were closed
# Capture the exception instance currently being handled.
129 why = sys.exc_info()[1]
132 owner.loseConnection(failure.Failure(why))
# NOTE(review): the embedded listing numbers jump from 135 to 140, so the
# postEvent call below may belong to a different method whose ``def`` line is
# missing from this extract; do not assume it is installWaker's body.
135 def installWaker(self):
# Posts an event tagged KEY_WAKEUP (with no event object) to the completion
# port; doIteration skips callback dispatch for that key.
140 self.port.postEvent(0, KEY_WAKEUP, None)
def registerHandle(self, handle):
    """
    Add C{handle} to this reactor's completion port, tagged with
    C{KEY_NORMAL}.

    @param handle: the handle passed through to C{self.port.addHandle}.
    """
    self.port.addHandle(handle, KEY_NORMAL)
# Creates a socket for the given address family and socket type and registers
# its file descriptor with the completion port via registerHandle.
# NOTE(review): the embedded listing numbers skip 150 after this block, so a
# trailing statement (presumably returning the socket) is not visible in this
# extract -- confirm against the original file.
147 def createSocket(self, af, stype):
148 skt = socket.socket(af, stype)
149 self.registerHandle(skt.fileno())
# Builds a tcp.Port from the given port, factory, backlog and interface, bound
# to this reactor.
# NOTE(review): the embedded listing numbers skip 154, 156 and 158-161, so the
# docstring quotes and whatever follows the Port construction (starting the
# port, the return value) are not visible in this extract -- confirm.
153 def listenTCP(self, port, factory, backlog=50, interface=''):
155 @see: twisted.internet.interfaces.IReactorTCP.listenTCP
157 p = tcp.Port(port, factory, backlog, interface, self)
# Builds a tcp.Connector from the given host, port, factory, timeout and
# bindAddress, bound to this reactor.
# NOTE(review): the embedded listing numbers skip 163, 165 and 167-170, so the
# docstring quotes and whatever follows the Connector construction are not
# visible in this extract -- confirm against the original file.
162 def connectTCP(self, host, port, factory, timeout=30, bindAddress=None):
164 @see: twisted.internet.interfaces.IReactorTCP.connectTCP
166 c = tcp.Connector(host, port, factory, timeout, bindAddress, self)
# Two alternative pairs of listenSSL/connectSSL definitions, selected by
# whether TLSMemoryBIOFactory was importable.  The first pair wraps the
# caller's factory in TLSMemoryBIOFactory and delegates to listenTCP /
# connectTCP; the second pair raises NotImplementedError.
# NOTE(review): the embedded listing numbers have gaps (e.g. 178->184,
# 191->193, 201->205, 213->217), so the ``else:`` line, the docstring quotes
# and the tails of several calls are not visible in this extract -- confirm
# against the original file before editing.
171 if TLSMemoryBIOFactory is not None:
# listenSSL passes False as TLSMemoryBIOFactory's second argument.
172 def listenSSL(self, port, factory, contextFactory, backlog=50, interface=''):
174 @see: twisted.internet.interfaces.IReactorSSL.listenSSL
176 port = self.listenTCP(
178 TLSMemoryBIOFactory(contextFactory, False, factory),
# connectSSL passes True as TLSMemoryBIOFactory's second argument and returns
# whatever connectTCP returns.
184 def connectSSL(self, host, port, factory, contextFactory, timeout=30, bindAddress=None):
186 @see: twisted.internet.interfaces.IReactorSSL.connectSSL
188 return self.connectTCP(
190 TLSMemoryBIOFactory(contextFactory, True, factory),
191 timeout, bindAddress)
# Fallback definitions with the same signatures as the pair above.
193 def listenSSL(self, port, factory, contextFactory, backlog=50, interface=''):
195 Non-implementation of L{IReactorSSL.listenSSL}. Some dependency
196 is not satisfied. This implementation always raises
197 L{NotImplementedError}.
199 raise NotImplementedError(
200 "pyOpenSSL 0.10 or newer is required for SSL support in "
201 "iocpreactor. It is missing, so the reactor does not support "
205 def connectSSL(self, host, port, factory, contextFactory, timeout=30, bindAddress=None):
207 Non-implementation of L{IReactorSSL.connectSSL}. Some dependency
208 is not satisfied. This implementation always raises
209 L{NotImplementedError}.
211 raise NotImplementedError(
212 "pyOpenSSL 0.10 or newer is required for SSL support in "
213 "iocpreactor. It is missing, so the reactor does not support "
# Builds a udp.Port from the given port, protocol, interface and
# maxPacketSize, bound to this reactor.
# NOTE(review): the embedded listing numbers skip 218, 220, 222 and 224-227,
# so the docstring quotes and whatever follows the Port construction are not
# visible in this extract -- confirm against the original file.
217 def listenUDP(self, port, protocol, interface='', maxPacketSize=8192):
219 Connects a given L{DatagramProtocol} to the given numeric UDP port.
221 @returns: object conforming to L{IListeningPort}.
223 p = udp.Port(port, protocol, interface, maxPacketSize, self)
# Builds a udp.MulticastPort from the given port, protocol, interface and
# maxPacketSize, bound to this reactor.
# NOTE(review): the embedded listing numbers skip 230, 232-234, 236 and
# 238-242, so part of the docstring, the remaining MulticastPort arguments and
# whatever follows the construction are not visible in this extract --
# confirm against the original file.
228 def listenMulticast(self, port, protocol, interface='', maxPacketSize=8192,
229 listenMultiple=False):
231 Connects a given DatagramProtocol to the given numeric UDP port.
235 @returns: object conforming to IListeningPort.
237 p = udp.MulticastPort(port, protocol, interface, maxPacketSize, self,
# Validates the arguments through self._checkProcessArgs and returns a Process
# built from the protocol, executable, checked args/env and path.  The visible
# raise statements reject UID, GID, PTY and custom child-FD settings with
# ValueError.
# NOTE(review): the embedded listing numbers skip 245-248, 250, 252, 255 and
# 257, so the conditions guarding the first three raises and the head of the
# childFDs raise are not visible in this extract -- confirm.
# NOTE(review): ``env={}`` is a mutable default argument; in the visible code
# it is only passed to self._checkProcessArgs.
243 def spawnProcess(self, processProtocol, executable, args=(), env={},
244 path=None, uid=None, gid=None, usePTY=0, childFDs=None):
249 raise ValueError("Setting UID is unsupported on this platform.")
251 raise ValueError("Setting GID is unsupported on this platform.")
253 raise ValueError("PTYs are unsupported on this platform.")
254 if childFDs is not None:
256 "Custom child file descriptor mappings are unsupported on "
258 args, env = self._checkProcessArgs(args, env)
259 return Process(self, processProtocol, executable, args, env, path)
263 res = list(self.handles)
271 main.installReactor(r)
# Names exported by ``from <this module> import *``.
__all__ = ['IOCPReactor', 'install']