Imported Upstream version 12.1.0
[contrib/python-twisted.git] / twisted / internet / test / test_udp.py
1 # Copyright (c) Twisted Matrix Laboratories.
2 # See LICENSE for details.
3
4 """
5 Tests for implementations of L{IReactorUDP}.
6 """
7
# Python 2 idiom: any class defined in this module without explicit bases
# is created as a new-style class.
8 __metaclass__ = type
9
10 from socket import SOCK_DGRAM
11
12 from zope.interface import implements
13 from zope.interface.verify import verifyObject
14
15 from twisted.python import context
16 from twisted.python.log import ILogContext, err
17 from twisted.internet.test.reactormixins import ReactorBuilder
18 from twisted.internet.defer import Deferred, maybeDeferred
19 from twisted.internet.interfaces import ILoggingContext, IListeningPort
20 from twisted.internet.address import IPv4Address
21 from twisted.internet.protocol import DatagramProtocol
22
23 from twisted.internet.test.test_tcp import findFreePort
24 from twisted.internet.test.connectionmixins import LogObserverMixin
25
26
class UDPPortMixin(object):
    """
    Mixin supplying the UDP-specific helpers used by the datagram transport
    tests: how to open a port and which log messages to expect from it.
    """
    def getListeningPort(self, reactor, protocol):
        """
        Start C{protocol} listening for UDP datagrams on C{reactor}.

        @param reactor: The reactor whose C{listenUDP} method is called.
        @param protocol: The protocol handed to C{listenUDP}.

        @return: The result of C{reactor.listenUDP(0, protocol)}.
        """
        listeningPort = reactor.listenUDP(0, protocol)
        return listeningPort


    def getExpectedStartListeningLogMessage(self, port, protocol):
        """
        Build the message expected in the log when a UDP port starts
        listening.

        @param port: An object whose C{getHost().port} is the port number.
        @param protocol: The protocol description which begins the message.

        @return: The expected log message, as a C{str}.
        """
        portNumber = port.getHost().port
        template = "%s starting on %d"
        return template % (protocol, portNumber)


    def getExpectedConnectionLostLogMessage(self, port):
        """
        Build the message expected in the log when a UDP port is closed.

        @param port: An object whose C{getHost().port} is the port number.

        @return: The expected log message, as a C{str}.
        """
        portNumber = port.getHost().port
        return "(UDP Port %s Closed)" % (portNumber,)
47
48
49
class DatagramTransportTestsMixin(LogObserverMixin):
    """
    Mixin defining tests shared by every port/datagram based transport.

    The tests call C{self.observe}, C{self.buildReactor},
    C{self.runReactor}, C{self.getListeningPort},
    C{self.getExpectedStartListeningLogMessage} and
    C{self.getExpectedConnectionLostLogMessage}.
    """
    def test_startedListeningLogMessage(self):
        """
        Starting a port logs a message which includes the description of the
        protocol associated with it.
        """
        class SomeProtocol(DatagramProtocol):
            implements(ILoggingContext)

            def logPrefix(self):
                return "Crazy Protocol"

        events = self.observe()
        reactor = self.buildReactor()
        port = self.getListeningPort(reactor, SomeProtocol())
        expected = self.getExpectedStartListeningLogMessage(
            port, "Crazy Protocol")
        firstMessage = events[0]['message']
        self.assertEqual((expected,), firstMessage)


    def test_connectionLostLogMessage(self):
        """
        Losing a connection logs an informative message (see
        L{getExpectedConnectionLostLogMessage}): an address identifying the
        port together with the fact that it was closed.
        """
        events = self.observe()
        reactor = self.buildReactor()
        port = self.getListeningPort(reactor, DatagramProtocol())
        # Computed while the port is still open.
        expected = self.getExpectedConnectionLostLogMessage(port)

        def stopListeningThenReactor():
            # Drop everything logged so far so that index 0 refers to the
            # first message logged after C{stopListening} is called.
            del events[:]
            stopping = maybeDeferred(port.stopListening)
            stopping.addCallback(lambda ignored: reactor.stop())

        reactor.callWhenRunning(stopListeningThenReactor)
        self.runReactor(reactor)

        self.assertEqual((expected,), events[0]['message'])


    def test_stopProtocolScheduling(self):
        """
        L{DatagramProtocol.stopProtocol} is called asynchronously (ie, not
        re-entrantly) when C{stopListening} is used to stop the datagram
        transport.
        """
        class DisconnectingProtocol(DatagramProtocol):
            # Flags inspected by the assertions at the end of the test.
            started = stopped = False
            inStartProtocol = stoppedInStart = False

            def startProtocol(self):
                self.started = self.inStartProtocol = True
                # If stopProtocol ran re-entrantly from here it would see
                # inStartProtocol still set.
                self.transport.stopListening()
                self.inStartProtocol = False

            def stopProtocol(self):
                self.stoppedInStart = self.inStartProtocol
                self.stopped = True
                reactor.stop()

        reactor = self.buildReactor()
        protocol = DisconnectingProtocol()
        self.getListeningPort(reactor, protocol)
        self.runReactor(reactor)

        self.assertTrue(protocol.started)
        self.assertTrue(protocol.stopped)
        self.assertFalse(protocol.stoppedInStart)
128
129
130
class UDPServerTestsBuilder(ReactorBuilder, UDPPortMixin,
                            DatagramTransportTestsMixin):
    """
    Builder defining the tests which relate to L{IReactorUDP.listenUDP}.
    """
    def test_interface(self):
        """
        The object returned by L{IReactorUDP.listenUDP} provides
        L{IListeningPort}.
        """
        reactor = self.buildReactor()
        listeningPort = reactor.listenUDP(0, DatagramProtocol())
        provides = verifyObject(IListeningPort, listeningPort)
        self.assertTrue(provides)


    def test_getHost(self):
        """
        L{IListeningPort.getHost} returns an L{IPv4Address} holding the
        dotted-quad IPv4 address the port is listening on together with the
        port number.
        """
        host, portNumber = findFreePort(type=SOCK_DGRAM)
        reactor = self.buildReactor()
        protocol = DatagramProtocol()
        port = reactor.listenUDP(portNumber, protocol, interface=host)
        expectedAddress = IPv4Address('UDP', host, portNumber)
        self.assertEqual(port.getHost(), expectedAddress)


    def test_logPrefix(self):
        """
        Datagram transports implement L{ILoggingContext.logPrefix} so that it
        returns a message reflecting the protocol they are running.
        """
        class CustomLogPrefixDatagramProtocol(DatagramProtocol):
            def __init__(self, prefix):
                self.system = Deferred()
                self._prefix = prefix

            def logPrefix(self):
                return self._prefix

            def datagramReceived(self, datagram, addr):
                # Only the first datagram fires the Deferred; afterwards
                # C{self.system} is C{None} and later datagrams are ignored.
                if self.system is None:
                    return
                pending, self.system = self.system, None
                pending.callback(context.get(ILogContext)["system"])

        reactor = self.buildReactor()
        protocol = CustomLogPrefixDatagramProtocol("Custom Datagrams")
        systemDeferred = protocol.system
        port = reactor.listenUDP(0, protocol)
        portNumber = port.getHost().port

        def checkSystem(system):
            self.assertEqual("Custom Datagrams (UDP)", system)

        def stopReactor(ignored):
            reactor.stop()

        systemDeferred.addCallback(checkSystem)
        # Log any failure from the check, then stop the reactor either way.
        systemDeferred.addErrback(err)
        systemDeferred.addCallback(stopReactor)

        # Send a datagram to the port's own number on the loopback address.
        port.write("some bytes", ('127.0.0.1', portNumber))
        self.runReactor(reactor)
192
193
# Add whatever makeTestCaseClasses returns to this module's namespace.
# NOTE(review): presumably a mapping of generated test case names to classes
# (one per supported reactor), published here so the test runner can discover
# them -- confirm against ReactorBuilder.makeTestCaseClasses.
194 globals().update(UDPServerTestsBuilder.makeTestCaseClasses())