1 # Copyright (c) Twisted Matrix Laboratories.
2 # See LICENSE for details.
"""
Test cases for Twisted.names' root resolver.
"""
8 from random import randrange
10 from zope.interface import implements
11 from zope.interface.verify import verifyClass
13 from twisted.python.log import msg
14 from twisted.trial import util
15 from twisted.trial.unittest import TestCase
16 from twisted.internet.defer import Deferred, succeed, gatherResults
17 from twisted.internet.task import Clock
18 from twisted.internet.address import IPv4Address
19 from twisted.internet.interfaces import IReactorUDP, IUDPTransport
20 from twisted.names.root import Resolver, lookupNameservers, lookupAddress
21 from twisted.names.root import extractAuthority, discoverAuthority, retry
22 from twisted.names.dns import IN, HS, A, NS, CNAME, OK, ENAME, Record_CNAME
23 from twisted.names.dns import Query, Message, RRHeader, Record_A, Record_NS
24 from twisted.names.error import DNSNameError, ResolverError
class MemoryDatagramTransport(object):
    """
    This L{IUDPTransport} implementation enforces the usual connection rules
    and captures sent traffic in a list for later inspection.

    @ivar _host: The host address to which this transport is bound.
    @ivar _protocol: The protocol connected to this transport.
    @ivar _sentPackets: A C{list} of two-tuples of the datagrams passed to
        C{write} and the addresses to which they are destined.

    @ivar _connectedTo: C{None} if this transport is unconnected, otherwise an
        address to which all traffic is supposedly sent.

    @ivar _maxPacketSize: An C{int} giving the maximum length of a datagram
        which will be successfully handled by C{write}.
    """
    implements(IUDPTransport)

    def __init__(self, host, protocol, maxPacketSize):
        """
        @param host: The address this transport pretends to be bound to.  It
            is unpacked as the positional arguments following C{'UDP'} when
            building the L{IPv4Address} returned by C{getHost}.
        @param protocol: The protocol connected to this transport.
        @param maxPacketSize: The largest datagram length C{write} accepts.
        """
        self._host = host
        self._protocol = protocol
        self._sentPackets = []
        self._connectedTo = None
        self._maxPacketSize = maxPacketSize


    def getHost(self):
        """
        Return the address which this transport is pretending to be bound
        to.
        """
        return IPv4Address('UDP', *self._host)


    def connect(self, host, port):
        """
        Connect this transport to the given address.

        @raise ValueError: If this transport is already connected.
        """
        if self._connectedTo is not None:
            raise ValueError("Already connected")
        self._connectedTo = (host, port)


    def write(self, datagram, addr=None):
        """
        Send the given datagram.

        The datagram is not sent anywhere; it is recorded, together with its
        destination, in C{_sentPackets}.

        @param datagram: The data to send.
        @param addr: The destination address.  If C{None}, the address given
            to C{connect} is used instead.

        @raise ValueError: If no destination address is available or if the
            datagram is longer than C{_maxPacketSize}.
        """
        # Fall back to the connected address only when the caller did not
        # supply an explicit destination.
        if addr is None:
            addr = self._connectedTo
        if addr is None:
            raise ValueError("Need an address")
        if len(datagram) > self._maxPacketSize:
            raise ValueError("Packet too big")
        self._sentPackets.append((datagram, addr))


    def stopListening(self):
        """
        Shut down this transport.

        @return: An already-fired L{Deferred}.
        """
        self._protocol.stopProtocol()
        return succeed(None)

verifyClass(IUDPTransport, MemoryDatagramTransport)
class MemoryReactor(Clock):
    """
    An L{IReactorTime} and L{IReactorUDP} provider.

    Time is controlled deterministically via the base class, L{Clock}.  UDP is
    handled in-memory by connecting protocols to instances of
    L{MemoryDatagramTransport}.

    @ivar udpPorts: A C{dict} mapping port numbers to instances of
        L{MemoryDatagramTransport}.
    """
    implements(IReactorUDP)

    def __init__(self):
        Clock.__init__(self)
        self.udpPorts = {}


    def listenUDP(self, port, protocol, interface='', maxPacketSize=8192):
        """
        Pretend to bind a UDP port and connect the given protocol to it.

        @param port: The port number to bind.  C{0} means "pick any unused
            port"; one is then chosen at random.
        @param protocol: The protocol to connect to the new transport.
        @param interface: The interface to pretend to bind to.
        @param maxPacketSize: Passed on to L{MemoryDatagramTransport}.

        @raise ValueError: If an explicitly requested port is already in use.

        @return: The new L{MemoryDatagramTransport}.
        """
        if port == 0:
            # Keep drawing random port numbers until an unused one turns up.
            while True:
                port = randrange(1, 2 ** 16)
                if port not in self.udpPorts:
                    break
        if port in self.udpPorts:
            raise ValueError("Address in use")
        transport = MemoryDatagramTransport(
            (interface, port), protocol, maxPacketSize)
        self.udpPorts[port] = transport
        protocol.makeConnection(transport)
        return transport

verifyClass(IReactorUDP, MemoryReactor)
class RootResolverTests(TestCase):
    """
    Tests for L{twisted.names.root.Resolver}.
    """
    def _queryTest(self, filter):
        """
        Invoke L{Resolver._query} and verify that it sends the correct DNS
        query.  Deliver a canned response to the query and return whatever the
        L{Deferred} returned by L{Resolver._query} fires with.

        @param filter: The value to pass for the C{filter} parameter to
            L{Resolver._query}.

        @return: The value the L{Deferred} fired with.
        """
        reactor = MemoryReactor()
        resolver = Resolver([], reactor=reactor)
        d = resolver._query(
            Query('foo.example.com', A, IN), [('1.1.2.3', 1053)], (30,),
            filter)

        # A UDP port should have been started.
        portNumber, transport = reactor.udpPorts.popitem()

        # And a DNS packet sent.
        [(packet, address)] = transport._sentPackets

        # Named "message" rather than "msg" so the module-level log function
        # of that name is not shadowed.
        message = Message()
        message.fromStr(packet)

        # It should be a query with the parameters used above.
        self.assertEqual(message.queries, [Query('foo.example.com', A, IN)])
        self.assertEqual(message.answers, [])
        self.assertEqual(message.authority, [])
        self.assertEqual(message.additional, [])

        response = []
        d.addCallback(response.append)
        self.assertEqual(response, [])

        # Once a reply is received, the Deferred should fire.  Reuse the
        # parsed query as the reply: drop the question, flag it as an answer
        # and attach one A record.
        del message.queries[:]
        message.answer = 1
        message.answers.append(
            RRHeader('foo.example.com', payload=Record_A('5.8.13.21')))
        transport._protocol.datagramReceived(
            message.toStr(), ('1.1.2.3', 1053))
        return response[0]


    def test_filteredQuery(self):
        """
        L{Resolver._query} accepts a L{Query} instance and an address, issues
        the query, and returns a L{Deferred} which fires with the response to
        the query.  If a true value is passed for the C{filter} parameter, the
        result is a three-tuple of lists of records.
        """
        answer, authority, additional = self._queryTest(True)
        self.assertEqual(
            answer,
            [RRHeader('foo.example.com', payload=Record_A('5.8.13.21', ttl=0))])
        self.assertEqual(authority, [])
        self.assertEqual(additional, [])


    def test_unfilteredQuery(self):
        """
        Similar to L{test_filteredQuery}, but for the case where a false value
        is passed for the C{filter} parameter.  In this case, the result is a
        L{Message} instance.
        """
        message = self._queryTest(False)
        self.assertIsInstance(message, Message)
        self.assertEqual(message.queries, [])
        self.assertEqual(
            message.answers,
            [RRHeader('foo.example.com', payload=Record_A('5.8.13.21', ttl=0))])
        self.assertEqual(message.authority, [])
        self.assertEqual(message.additional, [])


    def _respond(self, answers=(), authority=(), additional=(), rCode=OK):
        """
        Create a L{Message} suitable for use as a response to a query.

        @param answers: A sequence of two-tuples giving data for the answers
            section of the message.  The first element of each tuple is a name
            for the L{RRHeader}.  The second element is the payload.
        @param authority: A sequence like C{answers}, but for the authority
            section of the response.
        @param additional: A sequence like C{answers}, but for the
            additional section of the response.
        @param rCode: The response code the message will be created with.

        @return: A new L{Message} initialized with the given values.
        """
        response = Message(rCode=rCode)
        for (section, data) in [(response.answers, answers),
                                (response.authority, authority),
                                (response.additional, additional)]:
            # A record may carry its own CLASS attribute (used to simulate
            # wrong-class responses); otherwise IN is assumed.
            section.extend([
                RRHeader(name, record.TYPE, getattr(record, 'CLASS', IN),
                         payload=record)
                for (name, record) in data])
        return response


    def _getResolver(self, serverResponses, maximumQueries=10):
        """
        Create and return a new L{root.Resolver} modified to resolve queries
        against the record data represented by C{serverResponses}.

        @param serverResponses: A mapping from dns server addresses to
            mappings.  The inner mappings are from query two-tuples (name,
            type) to dictionaries suitable for use as **arguments to
            L{_respond}.  See that method for details.
        @param maximumQueries: The query limit passed to the L{root.Resolver}
            initializer.

        @return: The patched L{root.Resolver}.
        """
        roots = ['1.1.2.3']
        resolver = Resolver(roots, maximumQueries)

        def query(query, serverAddresses, timeout, filter):
            msg("Query for QNAME %s at %r" % (query.name, serverAddresses))
            for addr in serverAddresses:
                try:
                    server = serverResponses[addr]
                except KeyError:
                    # No canned data for this server; try the next one.
                    continue
                records = server[str(query.name), query.type]
                return succeed(self._respond(**records))
        # Replace the network-facing query method with the canned one above.
        resolver._query = query
        return resolver


    def test_lookupAddress(self):
        """
        L{root.Resolver.lookupAddress} looks up the I{A} records for the
        specified hostname by first querying one of the root servers the
        resolver was created with and then following the authority delegations
        until a result is received.
        """
        servers = {
            ('1.1.2.3', 53): {
                ('foo.example.com', A): {
                    'authority': [('foo.example.com', Record_NS('ns1.example.com'))],
                    'additional': [('ns1.example.com', Record_A('34.55.89.144'))],
                    },
                },
            ('34.55.89.144', 53): {
                ('foo.example.com', A): {
                    'answers': [('foo.example.com', Record_A('10.0.0.1'))],
                    }
                },
            }
        resolver = self._getResolver(servers)
        d = resolver.lookupAddress('foo.example.com')
        # The result is (answers, authority, additional); take the first
        # answer.
        d.addCallback(lambda result: result[0][0].payload.dottedQuad())
        d.addCallback(self.assertEqual, '10.0.0.1')
        return d


    def test_lookupChecksClass(self):
        """
        If a response includes a record with a class different from the one
        in the query, it is ignored and lookup continues until a record with
        the right class is found.
        """
        badClass = Record_A('10.0.0.1')
        badClass.CLASS = HS
        servers = {
            ('1.1.2.3', 53): {
                ('foo.example.com', A): {
                    'answers': [('foo.example.com', badClass)],
                    'authority': [('foo.example.com', Record_NS('ns1.example.com'))],
                    'additional': [('ns1.example.com', Record_A('10.0.0.2'))],
                    },
                },
            ('10.0.0.2', 53): {
                ('foo.example.com', A): {
                    'answers': [('foo.example.com', Record_A('10.0.0.3'))],
                    },
                },
            }
        resolver = self._getResolver(servers)
        d = resolver.lookupAddress('foo.example.com')
        d.addCallback(lambda result: result[0][0].payload)
        d.addCallback(self.assertEqual, Record_A('10.0.0.3'))
        return d


    def test_missingGlue(self):
        """
        If an intermediate response includes no glue records for the
        authorities, separate queries are made to find those addresses.
        """
        servers = {
            ('1.1.2.3', 53): {
                ('foo.example.com', A): {
                    'authority': [('foo.example.com', Record_NS('ns1.example.org'))],
                    # Conspicuous lack of an additional section naming
                    # ns1.example.org
                    },
                ('ns1.example.org', A): {
                    'answers': [('ns1.example.org', Record_A('10.0.0.1'))],
                    },
                },
            ('10.0.0.1', 53): {
                ('foo.example.com', A): {
                    'answers': [('foo.example.com', Record_A('10.0.0.2'))],
                    },
                },
            }
        resolver = self._getResolver(servers)
        d = resolver.lookupAddress('foo.example.com')
        d.addCallback(lambda result: result[0][0].payload.dottedQuad())
        d.addCallback(self.assertEqual, '10.0.0.2')
        return d


    def test_missingName(self):
        """
        If a name is missing, L{Resolver.lookupAddress} returns a L{Deferred}
        which fails with L{DNSNameError}.
        """
        servers = {
            ('1.1.2.3', 53): {
                ('foo.example.com', A): {
                    'rCode': ENAME,
                    },
                },
            }
        resolver = self._getResolver(servers)
        d = resolver.lookupAddress('foo.example.com')
        return self.assertFailure(d, DNSNameError)


    def test_answerless(self):
        """
        If a query is responded to with no answers or nameserver records, the
        L{Deferred} returned by L{Resolver.lookupAddress} fires with
        L{ResolverError}.
        """
        servers = {
            ('1.1.2.3', 53): {
                ('example.com', A): {
                    },
                },
            }
        resolver = self._getResolver(servers)
        d = resolver.lookupAddress('example.com')
        return self.assertFailure(d, ResolverError)


    def test_delegationLookupError(self):
        """
        If there is an error resolving the nameserver in a delegation response,
        the L{Deferred} returned by L{Resolver.lookupAddress} fires with that
        error.
        """
        servers = {
            ('1.1.2.3', 53): {
                ('example.com', A): {
                    'authority': [('example.com', Record_NS('ns1.example.com'))],
                    },
                ('ns1.example.com', A): {
                    'rCode': ENAME,
                    },
                },
            }
        resolver = self._getResolver(servers)
        d = resolver.lookupAddress('example.com')
        return self.assertFailure(d, DNSNameError)


    def test_delegationLookupEmpty(self):
        """
        If there are no records in the response to a lookup of a delegation
        nameserver, the L{Deferred} returned by L{Resolver.lookupAddress} fires
        with L{ResolverError}.
        """
        servers = {
            ('1.1.2.3', 53): {
                ('example.com', A): {
                    'authority': [('example.com', Record_NS('ns1.example.com'))],
                    },
                ('ns1.example.com', A): {
                    },
                },
            }
        resolver = self._getResolver(servers)
        d = resolver.lookupAddress('example.com')
        return self.assertFailure(d, ResolverError)


    def test_lookupNameservers(self):
        """
        L{Resolver.lookupNameservers} is like L{Resolver.lookupAddress}, except
        it queries for I{NS} records instead of I{A} records.
        """
        servers = {
            ('1.1.2.3', 53): {
                ('example.com', A): {
                    'rCode': ENAME,
                    },
                ('example.com', NS): {
                    'answers': [('example.com', Record_NS('ns1.example.com'))],
                    },
                },
            }
        resolver = self._getResolver(servers)
        d = resolver.lookupNameservers('example.com')
        d.addCallback(lambda result: str(result[0][0].payload.name))
        d.addCallback(self.assertEqual, 'ns1.example.com')
        return d


    def test_returnCanonicalName(self):
        """
        If a I{CNAME} record is encountered as the answer to a query for
        another record type, that record is returned as the answer.
        """
        servers = {
            ('1.1.2.3', 53): {
                ('example.com', A): {
                    'answers': [('example.com', Record_CNAME('example.net')),
                                ('example.net', Record_A('10.0.0.7'))],
                    },
                },
            }
        resolver = self._getResolver(servers)
        d = resolver.lookupAddress('example.com')
        # Keep only the answers section of the three-tuple result.
        d.addCallback(lambda result: result[0])
        d.addCallback(
            self.assertEqual,
            [RRHeader('example.com', CNAME, payload=Record_CNAME('example.net')),
             RRHeader('example.net', A, payload=Record_A('10.0.0.7'))])
        return d


    def test_followCanonicalName(self):
        """
        If no record of the requested type is included in a response, but a
        I{CNAME} record for the query name is included, queries are made to
        resolve the value of the I{CNAME}.
        """
        servers = {
            ('1.1.2.3', 53): {
                ('example.com', A): {
                    'answers': [('example.com', Record_CNAME('example.net'))],
                    },
                ('example.net', A): {
                    'answers': [('example.net', Record_A('10.0.0.5'))],
                    },
                },
            }
        resolver = self._getResolver(servers)
        d = resolver.lookupAddress('example.com')
        d.addCallback(lambda result: result[0])
        d.addCallback(
            self.assertEqual,
            [RRHeader('example.com', CNAME, payload=Record_CNAME('example.net')),
             RRHeader('example.net', A, payload=Record_A('10.0.0.5'))])
        return d


    def test_detectCanonicalNameLoop(self):
        """
        If there is a cycle between I{CNAME} records in a response, this is
        detected and the L{Deferred} returned by the lookup method fails
        with L{ResolverError}.
        """
        servers = {
            ('1.1.2.3', 53): {
                ('example.com', A): {
                    'answers': [('example.com', Record_CNAME('example.net')),
                                ('example.net', Record_CNAME('example.com'))],
                    },
                },
            }
        resolver = self._getResolver(servers)
        d = resolver.lookupAddress('example.com')
        return self.assertFailure(d, ResolverError)


    def test_boundedQueries(self):
        """
        L{Resolver.lookupAddress} won't issue more queries following
        delegations than the limit passed to its initializer.
        """
        servers = {
            ('1.1.2.3', 53): {
                # First query - force it to start over with a name lookup of
                # ns1.example.com
                ('example.com', A): {
                    'authority': [('example.com', Record_NS('ns1.example.com'))],
                    },
                # Second query - let it resume the original lookup with the
                # address of the nameserver handling the delegation.
                ('ns1.example.com', A): {
                    'answers': [('ns1.example.com', Record_A('10.0.0.2'))],
                    },
                },
            ('10.0.0.2', 53): {
                # Third query - let it jump straight to asking the
                # delegation server by including its address here (different
                # case from the first query).
                ('example.com', A): {
                    'authority': [('example.com', Record_NS('ns2.example.com'))],
                    'additional': [('ns2.example.com', Record_A('10.0.0.3'))],
                    },
                },
            ('10.0.0.3', 53): {
                # Fourth query - give it the answer, we're done.
                ('example.com', A): {
                    'answers': [('example.com', Record_A('10.0.0.4'))],
                    },
                },
            }

        # Make two resolvers.  One which is allowed to make 3 queries
        # maximum, and so will fail, and one which may make 4, and so should
        # succeed.
        failer = self._getResolver(servers, 3)
        failD = self.assertFailure(
            failer.lookupAddress('example.com'), ResolverError)

        succeeder = self._getResolver(servers, 4)
        succeedD = succeeder.lookupAddress('example.com')
        succeedD.addCallback(lambda result: result[0][0].payload)
        succeedD.addCallback(self.assertEqual, Record_A('10.0.0.4'))

        return gatherResults([failD, succeedD])


    def test_discoveredAuthorityDeprecated(self):
        """
        Calling L{Resolver.discoveredAuthority} produces a deprecation warning.
        """
        resolver = Resolver([])
        d = resolver.discoveredAuthority('127.0.0.1', 'example.com', IN, A, (0,))

        warnings = self.flushWarnings([
                self.test_discoveredAuthorityDeprecated])
        self.assertEqual(warnings[0]['category'], DeprecationWarning)
        # NOTE(review): the expected text must match the emitted warning
        # exactly, including the spacing after "10.0." - confirm against
        # twisted.names.root.
        self.assertEqual(
            warnings[0]['message'],
            'twisted.names.root.Resolver.discoveredAuthority is deprecated since '
            'Twisted 10.0. Use twisted.names.client.Resolver directly, instead.')
        self.assertEqual(len(warnings), 1)

        # This will time out quickly, but we need to wait for it because there
        # are resources associated with it.
        d.addErrback(lambda ignored: None)
        return d
class StubDNSDatagramProtocol:
    """
    A do-nothing stand-in for L{DNSDatagramProtocol} which can be used to avoid
    network traffic in tests where that kind of thing doesn't matter.
    """
    def query(self, *a, **kw):
        """
        Accept and ignore any arguments.

        @return: A new L{Deferred} which this stub never fires.
        """
        return Deferred()
# Suppression for the deprecation warning about twisted.names.root.retry,
# attached to tests which trigger it only indirectly.
# NOTE(review): the message must match the emitted warning text, including
# the spacing after "10.0." - confirm against twisted.names.root.
_retrySuppression = util.suppress(
    category=DeprecationWarning,
    message=(
        'twisted.names.root.retry is deprecated since Twisted 10.0. Use a '
        'Resolver object for retry logic.'))
class DiscoveryToolsTests(TestCase):
    """
    Tests for the free functions in L{twisted.names.root} which help out with
    authority discovery.  Since these are mostly deprecated, these are mostly
    deprecation tests.
    """
    # NOTE(review): every expected message below must match the emitted
    # warning text exactly, including the spacing after "10.0." - confirm
    # against twisted.names.root.

    def _assertDeprecated(self, testMethod, message):
        """
        Flush the warnings attributed to C{testMethod} and assert that there
        is exactly one, that it is a L{DeprecationWarning}, and that it
        carries the given message.

        @param testMethod: The test method passed to C{flushWarnings} as the
            offending function.
        @param message: The exact expected warning text.
        """
        warnings = self.flushWarnings([testMethod])
        # Check the count first so that a missing warning fails with an
        # assertion rather than an IndexError.
        self.assertEqual(len(warnings), 1)
        self.assertEqual(warnings[0]['category'], DeprecationWarning)
        self.assertEqual(warnings[0]['message'], message)


    def test_lookupNameserversDeprecated(self):
        """
        Calling L{root.lookupNameservers} produces a deprecation warning.
        """
        # Don't care about the return value, since it will never have a result,
        # since StubDNSDatagramProtocol doesn't actually work.
        lookupNameservers('example.com', '127.0.0.1', StubDNSDatagramProtocol())

        self._assertDeprecated(
            self.test_lookupNameserversDeprecated,
            'twisted.names.root.lookupNameservers is deprecated since Twisted '
            '10.0. Use twisted.names.root.Resolver.lookupNameservers '
            'instead.')
    test_lookupNameserversDeprecated.suppress = [_retrySuppression]


    def test_lookupAddressDeprecated(self):
        """
        Calling L{root.lookupAddress} produces a deprecation warning.
        """
        # Don't care about the return value, since it will never have a result,
        # since StubDNSDatagramProtocol doesn't actually work.
        lookupAddress('example.com', '127.0.0.1', StubDNSDatagramProtocol())

        self._assertDeprecated(
            self.test_lookupAddressDeprecated,
            'twisted.names.root.lookupAddress is deprecated since Twisted '
            '10.0. Use twisted.names.root.Resolver.lookupAddress '
            'instead.')
    test_lookupAddressDeprecated.suppress = [_retrySuppression]


    def test_extractAuthorityDeprecated(self):
        """
        Calling L{root.extractAuthority} produces a deprecation warning.
        """
        extractAuthority(Message(), {})

        self._assertDeprecated(
            self.test_extractAuthorityDeprecated,
            'twisted.names.root.extractAuthority is deprecated since Twisted '
            '10.0. Please inspect the Message object directly.')


    def test_discoverAuthorityDeprecated(self):
        """
        Calling L{root.discoverAuthority} produces a deprecation warning.
        """
        discoverAuthority(
            'example.com', ['10.0.0.1'], p=StubDNSDatagramProtocol())

        self._assertDeprecated(
            self.test_discoverAuthorityDeprecated,
            'twisted.names.root.discoverAuthority is deprecated since Twisted '
            '10.0. Use twisted.names.root.Resolver.lookupNameservers '
            'instead.')

    # discoverAuthority is implemented in terms of deprecated functions,
    # too.  Ignore those.
    test_discoverAuthorityDeprecated.suppress = [
        util.suppress(
            category=DeprecationWarning,
            message=(
                'twisted.names.root.lookupNameservers is deprecated since '
                'Twisted 10.0. Use '
                'twisted.names.root.Resolver.lookupNameservers instead.')),
        _retrySuppression]


    def test_retryDeprecated(self):
        """
        Calling L{root.retry} produces a deprecation warning.
        """
        retry([0], StubDNSDatagramProtocol())

        self._assertDeprecated(
            self.test_retryDeprecated,
            'twisted.names.root.retry is deprecated since Twisted '
            '10.0. Use a Resolver object for retry logic.')