# Copyright (c) Twisted Matrix Laboratories.
# See LICENSE for details.

"""
Tests for L{twisted.internet.iocpreactor}.
"""

import errno
from array import array
from struct import pack
from socket import AF_INET6, AF_INET, SOCK_STREAM, SOL_SOCKET, error, socket

from zope.interface.verify import verifyClass

from twisted.trial import unittest
from twisted.python.log import msg
from twisted.internet.interfaces import IPushProducer

# The IOCP reactor modules are only importable where IOCP support is built.
# When the import fails, the module-level ``skip`` attribute tells trial to
# skip every test in this file rather than erroring out at import time.
try:
    from twisted.internet.iocpreactor import iocpsupport as _iocp, tcp, udp
    from twisted.internet.iocpreactor.reactor import (
        IOCPReactor, EVENTS_PER_LOOP, KEY_NORMAL)
    from twisted.internet.iocpreactor.interfaces import IReadWriteHandle
    from twisted.internet.iocpreactor.const import SO_UPDATE_ACCEPT_CONTEXT
    from twisted.internet.iocpreactor.abstract import FileHandle
except ImportError:
    skip = 'This test only applies to IOCPReactor'

# Probe for IPv6 support by creating (and immediately closing) an AF_INET6
# socket.  ``ipv6Skip`` is None when IPv6 is usable, otherwise it holds the
# error text, which is later used as the skip reason for the IPv6 test.
try:
    socket(AF_INET6, SOCK_STREAM).close()
except error as e:
    ipv6Skip = str(e)
else:
    ipv6Skip = None

class SupportTests(unittest.TestCase):
    """
    Tests for L{twisted.internet.iocpreactor.iocpsupport}, low-level reactor
    implementation helpers.
    """
    def _acceptAddressTest(self, family, localhost):
        """
        Create a C{SOCK_STREAM} connection to localhost using a socket with an
        address family of C{family} and assert that the result of
        L{iocpsupport.get_accept_addrs} is consistent with the result of
        C{socket.getsockname} and C{socket.getpeername}.

        @param family: The address family (C{AF_INET} or C{AF_INET6}) used
            for the listening, client and accepted sockets.
        @param localhost: The loopback address literal, in the notation of
            C{family}, that the client connects to.
        """
        msg("family = %r" % (family,))
        port = socket(family, SOCK_STREAM)
        self.addCleanup(port.close)
        # Bind to an ephemeral port and start listening so that
        # getsockname() below yields a port the client can connect to.
        port.bind(('', 0))
        port.listen(1)
        client = socket(family, SOCK_STREAM)
        self.addCleanup(client.close)
        client.setblocking(False)
        try:
            client.connect((localhost, port.getsockname()[1]))
        except error as e:
            # A non-blocking connect may legitimately report that the
            # connection is still in progress; anything else is a failure.
            self.assertIn(e.args[0], (errno.EINPROGRESS, errno.EWOULDBLOCK))

        server = socket(family, SOCK_STREAM)
        self.addCleanup(server.close)
        # Scratch buffer handed to accept(); get_accept_addrs() later reads
        # the address information back out of it.
        buff = array('c', '\0' * 256)
        self.assertEqual(
            0, _iocp.accept(port.fileno(), server.fileno(), buff, None))
        server.setsockopt(
            SOL_SOCKET, SO_UPDATE_ACCEPT_CONTEXT, pack('P', server.fileno()))
        self.assertEqual(
            (family, client.getpeername()[:2], client.getsockname()[:2]),
            _iocp.get_accept_addrs(server.fileno(), buff))


    def test_ipv4AcceptAddress(self):
        """
        L{iocpsupport.get_accept_addrs} returns a three-tuple of address
        information about the socket associated with the file descriptor passed
        to it.  For a connection using IPv4:

          - the first element is C{AF_INET}
          - the second element is a two-tuple of a dotted decimal notation IPv4
            address and a port number giving the peer address of the connection
          - the third element is the same type giving the host address of the
            connection.
        """
        self._acceptAddressTest(AF_INET, '127.0.0.1')


    def test_ipv6AcceptAddress(self):
        """
        Like L{test_ipv4AcceptAddress}, but for IPv6 connections.  In this case:

          - the first element is C{AF_INET6}
          - the second element is a two-tuple of a hexadecimal IPv6 address
            literal and a port number giving the peer address of the connection
          - the third element is the same type giving the host address of the
            connection.
        """
        self._acceptAddressTest(AF_INET6, '::1')
    # Skip the IPv6 variant, with the recorded error text as the reason, when
    # the module-level probe found that IPv6 sockets cannot be created.
    if ipv6Skip is not None:
        test_ipv6AcceptAddress.skip = ipv6Skip



class IOCPReactorTestCase(unittest.TestCase):
    """
    Tests for L{IOCPReactor} and for the interfaces declared by the IOCP
    handle classes.
    """
    def test_noPendingTimerEvents(self):
        """
        Test reactor behavior (doIteration) when there are no pending time
        events.
        """
        ir = IOCPReactor()
        ir.wakeUp()
        self.assertFalse(ir.doIteration(None))


    def test_reactorInterfaces(self):
        """
        Verify that IOCP socket-representing classes implement IReadWriteHandle
        """
        self.assertTrue(verifyClass(IReadWriteHandle, tcp.Connection))
        self.assertTrue(verifyClass(IReadWriteHandle, udp.Port))


    def test_fileHandleInterfaces(self):
        """
        Verify that L{FileHandle} implements L{IPushProducer}.
        """
        self.assertTrue(verifyClass(IPushProducer, FileHandle))


    def test_maxEventsPerIteration(self):
        """
        Verify that we don't lose an event when more than EVENTS_PER_LOOP
        events occur in the same reactor iteration
        """
        class FakeFD:
            # Number of times the completion callback has been invoked.
            counter = 0
            def logPrefix(self):
                return 'FakeFD'
            def cb(self, rc, bytes, evt):
                self.counter += 1

        ir = IOCPReactor()
        fd = FakeFD()
        event = _iocp.Event(fd.cb, fd)
        # Queue one more event than a single iteration is expected to handle.
        for _ in range(EVENTS_PER_LOOP + 1):
            ir.port.postEvent(0, KEY_NORMAL, event)
        # The first iteration delivers exactly EVENTS_PER_LOOP callbacks...
        ir.doIteration(None)
        self.assertEqual(fd.counter, EVENTS_PER_LOOP)
        # ...and the remaining event is delivered by the next iteration.
        ir.doIteration(None)
        self.assertEqual(fd.counter, EVENTS_PER_LOOP + 1)