# Copyright (c) Twisted Matrix Laboratories.
# See LICENSE for details.

"""
Tests for implementations of L{IReactorUDP}.
"""
10 from socket import SOCK_DGRAM
12 from zope.interface import implements
13 from zope.interface.verify import verifyObject
15 from twisted.python import context
16 from twisted.python.log import ILogContext, err
17 from twisted.internet.test.reactormixins import ReactorBuilder
18 from twisted.internet.defer import Deferred, maybeDeferred
19 from twisted.internet.interfaces import ILoggingContext, IListeningPort
20 from twisted.internet.address import IPv4Address
21 from twisted.internet.protocol import DatagramProtocol
23 from twisted.internet.test.test_tcp import findFreePort
24 from twisted.internet.test.connectionmixins import LogObserverMixin
class UDPPortMixin(object):
    """
    Mixin supplying the UDP-specific pieces needed by datagram transport
    tests: how to open a listening port on a reactor and which log messages
    that port is expected to produce.
    """
    def getListeningPort(self, reactor, protocol):
        """
        Get a UDP port from a reactor.

        @param reactor: The reactor whose C{listenUDP} method is called.
        @param protocol: The datagram protocol to attach to the new port.

        @return: The result of C{reactor.listenUDP(0, protocol)}.
        """
        return reactor.listenUDP(0, protocol)


    def getExpectedStartListeningLogMessage(self, port, protocol):
        """
        Get the message expected to be logged when a UDP port starts listening.

        @param port: The listening port; its C{getHost().port} supplies the
            port number in the message.
        @param protocol: The protocol attached to C{port}; it is formatted
            with C{%s} into the message.

        @return: A C{str} of the form C{"<protocol> starting on <number>"}.
        """
        return "%s starting on %d" % (protocol, port.getHost().port)


    def getExpectedConnectionLostLogMessage(self, port):
        """
        Get the expected connection lost message for a UDP port.

        @param port: The listening port; its C{getHost().port} supplies the
            port number in the message.

        @return: A C{str} of the form C{"(UDP Port <number> Closed)"}.
        """
        return "(UDP Port %s Closed)" % (port.getHost().port,)
class DatagramTransportTestsMixin(LogObserverMixin):
    """
    Mixin defining tests which apply to any port/datagram based transport.

    The class it is mixed into must also provide C{buildReactor},
    C{runReactor}, C{getListeningPort},
    C{getExpectedStartListeningLogMessage} and
    C{getExpectedConnectionLostLogMessage}, all of which are called below.
    """
    def test_startedListeningLogMessage(self):
        """
        When a port starts, a message including a description of the associated
        protocol is logged.
        """
        loggedMessages = self.observe()
        reactor = self.buildReactor()

        class SomeProtocol(DatagramProtocol):
            implements(ILoggingContext)

            def logPrefix(self):
                return "Crazy Protocol"

        protocol = SomeProtocol()
        p = self.getListeningPort(reactor, protocol)
        expectedMessage = self.getExpectedStartListeningLogMessage(
            p, protocol)
        self.assertEqual((expectedMessage,), loggedMessages[0]['message'])


    def test_connectionLostLogMessage(self):
        """
        When a connection is lost, an informative message should be logged (see
        L{getExpectedConnectionLostLogMessage}): an address identifying the port
        and the fact that it was closed.
        """
        loggedMessages = self.observe()
        reactor = self.buildReactor()
        p = self.getListeningPort(reactor, DatagramProtocol())
        expectedMessage = self.getExpectedConnectionLostLogMessage(p)

        def stopReactor(ignored):
            reactor.stop()

        def doStopListening():
            # Discard whatever was logged before this point (such as the
            # message logged when the port started) so that index 0 below
            # refers to the message produced by stopping the port.
            del loggedMessages[:]
            # stopListening may or may not return a Deferred; maybeDeferred
            # handles both so the reactor is stopped only once it is done.
            maybeDeferred(p.stopListening).addCallback(stopReactor)

        reactor.callWhenRunning(doStopListening)
        self.runReactor(reactor)

        self.assertEqual((expectedMessage,), loggedMessages[0]['message'])


    def test_stopProtocolScheduling(self):
        """
        L{DatagramProtocol.stopProtocol} is called asynchronously (ie, not
        re-entrantly) when C{stopListening} is used to stop the datagram
        transport.
        """
        class DisconnectingProtocol(DatagramProtocol):

            started = False
            stopped = False
            inStartProtocol = False
            stoppedInStart = False

            def startProtocol(self):
                self.started = True
                # Bracket the stopListening call with a flag so stopProtocol
                # can tell whether it was invoked re-entrantly from here.
                self.inStartProtocol = True
                self.transport.stopListening()
                self.inStartProtocol = False

            def stopProtocol(self):
                self.stopped = True
                # Record whether startProtocol was still on the stack.
                self.stoppedInStart = self.inStartProtocol
                reactor.stop()

        reactor = self.buildReactor()
        protocol = DisconnectingProtocol()
        self.getListeningPort(reactor, protocol)
        self.runReactor(reactor)

        self.assertTrue(protocol.started)
        self.assertTrue(protocol.stopped)
        self.assertFalse(protocol.stoppedInStart)
class UDPServerTestsBuilder(ReactorBuilder, UDPPortMixin,
                            DatagramTransportTestsMixin):
    """
    Builder defining tests relating to L{IReactorUDP.listenUDP}.
    """
    def test_interface(self):
        """
        L{IReactorUDP.listenUDP} returns an object providing L{IListeningPort}.
        """
        reactor = self.buildReactor()
        port = reactor.listenUDP(0, DatagramProtocol())
        self.assertTrue(verifyObject(IListeningPort, port))


    def test_getHost(self):
        """
        L{IListeningPort.getHost} returns an L{IPv4Address} giving a
        dotted-quad of the IPv4 address the port is listening on as well as
        the port number.
        """
        host, portNumber = findFreePort(type=SOCK_DGRAM)
        reactor = self.buildReactor()
        port = reactor.listenUDP(
            portNumber, DatagramProtocol(), interface=host)
        self.assertEqual(
            port.getHost(), IPv4Address('UDP', host, portNumber))


    def test_logPrefix(self):
        """
        Datagram transports implement L{ILoggingContext.logPrefix} to return a
        message reflecting the protocol they are running.
        """
        class CustomLogPrefixDatagramProtocol(DatagramProtocol):
            def __init__(self, prefix):
                self._prefix = prefix
                # Fired once, with the logging "system" that is in effect
                # when the first datagram is delivered.
                self.system = Deferred()

            def logPrefix(self):
                return self._prefix

            def datagramReceived(self, bytes, addr):
                if self.system is not None:
                    # Clear the attribute before firing so that any further
                    # datagram cannot fire the Deferred a second time.
                    system = self.system
                    self.system = None
                    system.callback(context.get(ILogContext)["system"])

        reactor = self.buildReactor()
        protocol = CustomLogPrefixDatagramProtocol("Custom Datagrams")
        d = protocol.system
        port = reactor.listenUDP(0, protocol)
        address = port.getHost()

        def gotSystem(system):
            self.assertEqual("Custom Datagrams (UDP)", system)
        d.addCallback(gotSystem)
        # Log a failed assertion instead of dropping it, and let the chain
        # continue so the reactor is stopped either way.
        d.addErrback(err)
        d.addCallback(lambda ignored: reactor.stop())

        # Send a datagram to the port's own address to trigger
        # datagramReceived on the protocol above.
        port.write("some bytes", ('127.0.0.1', address.port))
        self.runReactor(reactor)
# Publish whatever makeTestCaseClasses() returns into the module namespace;
# presumably a mapping of generated per-reactor test case names to classes, so
# that the test runner can discover them -- confirm against ReactorBuilder.
globals().update(UDPServerTestsBuilder.makeTestCaseClasses())