Initial import to Tizen
[profile/ivi/python-twisted.git] / twisted / internet / test / test_iocp.py
1 # Copyright (c) Twisted Matrix Laboratories.
2 # See LICENSE for details.
3
4 """
5 Tests for L{twisted.internet.iocpreactor}.
6 """
7
8 import errno
9 from array import array
10 from struct import pack
11 from socket import AF_INET6, AF_INET, SOCK_STREAM, SOL_SOCKET, error, socket
12
13 from zope.interface.verify import verifyClass
14
15 from twisted.trial import unittest
16 from twisted.python.log import msg
17 from twisted.internet.interfaces import IPushProducer
18
19 try:
20     from twisted.internet.iocpreactor import iocpsupport as _iocp, tcp, udp
21     from twisted.internet.iocpreactor.reactor import IOCPReactor, EVENTS_PER_LOOP, KEY_NORMAL
22     from twisted.internet.iocpreactor.interfaces import IReadWriteHandle
23     from twisted.internet.iocpreactor.const import SO_UPDATE_ACCEPT_CONTEXT
24     from twisted.internet.iocpreactor.abstract import FileHandle
25 except ImportError:
26     skip = 'This test only applies to IOCPReactor'
27
# Probe once, at import time, whether this host can create IPv6 stream
# sockets.  ipv6Skip is None when it can; otherwise it holds the text of the
# socket error, which is used below as the skip reason for IPv6-only tests.
try:
    socket(AF_INET6, SOCK_STREAM).close()
except error as e:
    # "except X as e" is valid on Python 2.6+ and Python 3; the older
    # "except X, e" spelling is a syntax error on Python 3.
    ipv6Skip = str(e)
else:
    ipv6Skip = None
34
class SupportTests(unittest.TestCase):
    """
    Tests for L{twisted.internet.iocpreactor.iocpsupport}, low-level reactor
    implementation helpers.
    """
    def _acceptAddressTest(self, family, localhost):
        """
        Create a C{SOCK_STREAM} connection to localhost using a socket with an
        address family of C{family} and assert that the result of
        L{iocpsupport.get_accept_addrs} is consistent with the result of
        C{socket.getsockname} and C{socket.getpeername}.

        @param family: The address family constant (for example C{AF_INET} or
            C{AF_INET6}) used for the listening, connecting and accepting
            sockets.

        @param localhost: The loopback address literal, in the notation of
            C{family}, which the client socket connects to.
        """
        msg("family = %r" % (family,))
        port = socket(family, SOCK_STREAM)
        self.addCleanup(port.close)
        # Port 0 lets the platform pick a free port; it is read back below
        # with getsockname().
        port.bind(('', 0))
        port.listen(1)
        client = socket(family, SOCK_STREAM)
        self.addCleanup(client.close)
        client.setblocking(False)
        try:
            client.connect((localhost, port.getsockname()[1]))
        except error as e:
            # A non-blocking connect may raise instead of completing; the
            # only acceptable error codes are the "still in progress" ones.
            # Reading args[0] avoids assuming the exception carries exactly
            # two arguments.
            self.assertIn(e.args[0], (errno.EINPROGRESS, errno.EWOULDBLOCK))

        server = socket(family, SOCK_STREAM)
        self.addCleanup(server.close)
        # A writable 256 byte scratch buffer handed to accept() and then to
        # get_accept_addrs().  The 'B' typecode and a bytes initializer are
        # used because the 'c' typecode does not exist on Python 3.
        buff = array('B', b'\0' * 256)
        self.assertEqual(
            0, _iocp.accept(port.fileno(), server.fileno(), buff, None))
        server.setsockopt(
            SOL_SOCKET, SO_UPDATE_ACCEPT_CONTEXT, pack('P', server.fileno()))
        # Slicing to [:2] keeps just (host, port); IPv6 socket names carry
        # two extra elements which are not part of the comparison.
        self.assertEqual(
            (family, client.getpeername()[:2], client.getsockname()[:2]),
            _iocp.get_accept_addrs(server.fileno(), buff))


    def test_ipv4AcceptAddress(self):
        """
        L{iocpsupport.get_accept_addrs} returns a three-tuple of address
        information about the socket associated with the file descriptor passed
        to it.  For a connection using IPv4:

          - the first element is C{AF_INET}
          - the second element is a two-tuple of a dotted decimal notation IPv4
            address and a port number giving the peer address of the connection
          - the third element is the same type giving the host address of the
            connection
        """
        self._acceptAddressTest(AF_INET, '127.0.0.1')


    def test_ipv6AcceptAddress(self):
        """
        Like L{test_ipv4AcceptAddress}, but for IPv6 connections.  In this case:

          - the first element is C{AF_INET6}
          - the second element is a two-tuple of a hexadecimal IPv6 address
            literal and a port number giving the peer address of the connection
          - the third element is the same type giving the host address of the
            connection
        """
        self._acceptAddressTest(AF_INET6, '::1')
    # Skip the IPv6 variant, with the socket error text as the reason, when
    # the import-time probe could not create an IPv6 socket.
    if ipv6Skip is not None:
        test_ipv6AcceptAddress.skip = ipv6Skip
100
101
102
class IOCPReactorTestCase(unittest.TestCase):
    """
    Tests for L{IOCPReactor} and for the interfaces provided by the IOCP
    socket and file handle classes.
    """
    def test_noPendingTimerEvents(self):
        """
        Test reactor behavior (doIteration) when there are no pending time
        events.
        """
        ir = IOCPReactor()
        ir.wakeUp()
        # assertFalse is the supported spelling of the legacy failIf alias.
        self.assertFalse(ir.doIteration(None))


    def test_reactorInterfaces(self):
        """
        Verify that IOCP socket-representing classes implement IReadWriteHandle
        """
        self.assertTrue(verifyClass(IReadWriteHandle, tcp.Connection))
        self.assertTrue(verifyClass(IReadWriteHandle, udp.Port))


    def test_fileHandleInterfaces(self):
        """
        Verify that L{FileHandle} implements L{IPushProducer}.
        """
        self.assertTrue(verifyClass(IPushProducer, FileHandle))


    def test_maxEventsPerIteration(self):
        """
        Verify that we don't lose an event when more than EVENTS_PER_LOOP
        events occur in the same reactor iteration
        """
        class FakeFD(object):
            # Number of times cb has been invoked on this object.
            counter = 0
            def logPrefix(self):
                return 'FakeFD'
            def cb(self, rc, bytes, evt):
                self.counter += 1

        ir = IOCPReactor()
        fd = FakeFD()
        event = _iocp.Event(fd.cb, fd)
        # Queue one more event than a single iteration is expected to
        # dispatch.
        for _ in range(EVENTS_PER_LOOP + 1):
            ir.port.postEvent(0, KEY_NORMAL, event)
        # The first iteration should dispatch exactly EVENTS_PER_LOOP events;
        # the one left over must still be delivered by the next iteration.
        ir.doIteration(None)
        self.assertEqual(fd.counter, EVENTS_PER_LOOP)
        ir.doIteration(0)
        self.assertEqual(fd.counter, EVENTS_PER_LOOP + 1)
150