Initial import to Tizen
[profile/ivi/python-twisted.git] / twisted / names / test / test_names.py
1 # -*- test-case-name: twisted.names.test.test_names -*-
2 # Copyright (c) Twisted Matrix Laboratories.
3 # See LICENSE for details.
4
5 """
6 Test cases for twisted.names.
7 """
8
9 import socket, operator, copy
10 from StringIO import StringIO
11
12 from twisted.trial import unittest
13
14 from twisted.internet import reactor, defer, error
15 from twisted.internet.task import Clock
16 from twisted.internet.defer import succeed
17 from twisted.names import client, server, common, authority, dns
18 from twisted.python import failure
19 from twisted.names.error import DNSFormatError, DNSServerError, DNSNameError
20 from twisted.names.error import DNSNotImplementedError, DNSQueryRefusedError
21 from twisted.names.error import DNSUnknownError
22 from twisted.names.dns import EFORMAT, ESERVER, ENAME, ENOTIMP, EREFUSED
23 from twisted.names.dns import Message
24 from twisted.names.client import Resolver
25 from twisted.names.secondary import (
26     SecondaryAuthorityService, SecondaryAuthority)
27 from twisted.names.test.test_client import StubPort
28
29 from twisted.python.compat import reduce
30 from twisted.test.proto_helpers import StringTransport, MemoryReactor
31
32 def justPayload(results):
33     return [r.payload for r in results[0]]
34
35 class NoFileAuthority(authority.FileAuthority):
36     def __init__(self, soa, records):
37         # Yes, skip FileAuthority
38         common.ResolverBase.__init__(self)
39         self.soa, self.records = soa, records
40
41
42 soa_record = dns.Record_SOA(
43                     mname = 'test-domain.com',
44                     rname = 'root.test-domain.com',
45                     serial = 100,
46                     refresh = 1234,
47                     minimum = 7654,
48                     expire = 19283784,
49                     retry = 15,
50                     ttl=1
51                 )
52
53 reverse_soa = dns.Record_SOA(
54                      mname = '93.84.28.in-addr.arpa',
55                      rname = '93.84.28.in-addr.arpa',
56                      serial = 120,
57                      refresh = 54321,
58                      minimum = 382,
59                      expire = 11193983,
60                      retry = 30,
61                      ttl=3
62                 )
63
64 my_soa = dns.Record_SOA(
65     mname = 'my-domain.com',
66     rname = 'postmaster.test-domain.com',
67     serial = 130,
68     refresh = 12345,
69     minimum = 1,
70     expire = 999999,
71     retry = 100,
72     )
73
74 test_domain_com = NoFileAuthority(
75     soa = ('test-domain.com', soa_record),
76     records = {
77         'test-domain.com': [
78             soa_record,
79             dns.Record_A('127.0.0.1'),
80             dns.Record_NS('39.28.189.39'),
81             dns.Record_SPF('v=spf1 mx/30 mx:example.org/30 -all'),
82             dns.Record_SPF('v=spf1 +mx a:\0colo', '.example.com/28 -all not valid'),
83             dns.Record_MX(10, 'host.test-domain.com'),
84             dns.Record_HINFO(os='Linux', cpu='A Fast One, Dontcha know'),
85             dns.Record_CNAME('canonical.name.com'),
86             dns.Record_MB('mailbox.test-domain.com'),
87             dns.Record_MG('mail.group.someplace'),
88             dns.Record_TXT('A First piece of Text', 'a SecoNd piece'),
89             dns.Record_A6(0, 'ABCD::4321', ''),
90             dns.Record_A6(12, '0:0069::0', 'some.network.tld'),
91             dns.Record_A6(8, '0:5634:1294:AFCB:56AC:48EF:34C3:01FF', 'tra.la.la.net'),
92             dns.Record_TXT('Some more text, haha!  Yes.  \0  Still here?'),
93             dns.Record_MR('mail.redirect.or.whatever'),
94             dns.Record_MINFO(rmailbx='r mail box', emailbx='e mail box'),
95             dns.Record_AFSDB(subtype=1, hostname='afsdb.test-domain.com'),
96             dns.Record_RP(mbox='whatever.i.dunno', txt='some.more.text'),
97             dns.Record_WKS('12.54.78.12', socket.IPPROTO_TCP,
98                            '\x12\x01\x16\xfe\xc1\x00\x01'),
99             dns.Record_NAPTR(100, 10, "u", "sip+E2U",
100                              "!^.*$!sip:information@domain.tld!"),
101             dns.Record_AAAA('AF43:5634:1294:AFCB:56AC:48EF:34C3:01FF')],
102         'http.tcp.test-domain.com': [
103             dns.Record_SRV(257, 16383, 43690, 'some.other.place.fool')
104         ],
105         'host.test-domain.com': [
106             dns.Record_A('123.242.1.5'),
107             dns.Record_A('0.255.0.255'),
108         ],
109         'host-two.test-domain.com': [
110 #
111 #  Python bug
112 #           dns.Record_A('255.255.255.255'),
113 #
114             dns.Record_A('255.255.255.254'),
115             dns.Record_A('0.0.0.0')
116         ],
117         'cname.test-domain.com': [
118             dns.Record_CNAME('test-domain.com')
119         ],
120         'anothertest-domain.com': [
121             dns.Record_A('1.2.3.4')],
122     }
123 )
124
125 reverse_domain = NoFileAuthority(
126     soa = ('93.84.28.in-addr.arpa', reverse_soa),
127     records = {
128         '123.93.84.28.in-addr.arpa': [
129              dns.Record_PTR('test.host-reverse.lookup.com'),
130              reverse_soa
131         ]
132     }
133 )
134
135
136 my_domain_com = NoFileAuthority(
137     soa = ('my-domain.com', my_soa),
138     records = {
139         'my-domain.com': [
140             my_soa,
141             dns.Record_A('1.2.3.4', ttl='1S'),
142             dns.Record_NS('ns1.domain', ttl='2M'),
143             dns.Record_NS('ns2.domain', ttl='3H'),
144             dns.Record_SRV(257, 16383, 43690, 'some.other.place.fool', ttl='4D')
145             ]
146         }
147     )
148
149
150 class ServerDNSTestCase(unittest.TestCase):
151     """
152     Test cases for DNS server and client.
153     """
154
155     def setUp(self):
156         self.factory = server.DNSServerFactory([
157             test_domain_com, reverse_domain, my_domain_com
158         ], verbose=2)
159
160         p = dns.DNSDatagramProtocol(self.factory)
161
162         while 1:
163             listenerTCP = reactor.listenTCP(0, self.factory, interface="127.0.0.1")
164             # It's simpler to do the stop listening with addCleanup,
165             # even though we might not end up using this TCP port in
166             # the test (if the listenUDP below fails).  Cleaning up
167             # this TCP port sooner than "cleanup time" would mean
168             # adding more code to keep track of the Deferred returned
169             # by stopListening.
170             self.addCleanup(listenerTCP.stopListening)
171             port = listenerTCP.getHost().port
172
173             try:
174                 listenerUDP = reactor.listenUDP(port, p, interface="127.0.0.1")
175             except error.CannotListenError:
176                 pass
177             else:
178                 self.addCleanup(listenerUDP.stopListening)
179                 break
180
181         self.listenerTCP = listenerTCP
182         self.listenerUDP = listenerUDP
183         self.resolver = client.Resolver(servers=[('127.0.0.1', port)])
184
185
186     def tearDown(self):
187         """
188         Clean up any server connections associated with the
189         L{DNSServerFactory} created in L{setUp}
190         """
191         # It'd be great if DNSServerFactory had a method that
192         # encapsulated this task.  At least the necessary data is
193         # available, though.
194         for conn in self.factory.connections[:]:
195             conn.transport.loseConnection()
196
197
198     def namesTest(self, d, r):
199         self.response = None
200         def setDone(response):
201             self.response = response
202
203         def checkResults(ignored):
204             if isinstance(self.response, failure.Failure):
205                 raise self.response
206             results = justPayload(self.response)
207             assert len(results) == len(r), "%s != %s" % (map(str, results), map(str, r))
208             for rec in results:
209                 assert rec in r, "%s not in %s" % (rec, map(str, r))
210
211         d.addBoth(setDone)
212         d.addCallback(checkResults)
213         return d
214
215     def testAddressRecord1(self):
216         """Test simple DNS 'A' record queries"""
217         return self.namesTest(
218             self.resolver.lookupAddress('test-domain.com'),
219             [dns.Record_A('127.0.0.1', ttl=19283784)]
220         )
221
222
223     def testAddressRecord2(self):
224         """Test DNS 'A' record queries with multiple answers"""
225         return self.namesTest(
226             self.resolver.lookupAddress('host.test-domain.com'),
227             [dns.Record_A('123.242.1.5', ttl=19283784), dns.Record_A('0.255.0.255', ttl=19283784)]
228         )
229
230
231     def testAddressRecord3(self):
232         """Test DNS 'A' record queries with edge cases"""
233         return self.namesTest(
234             self.resolver.lookupAddress('host-two.test-domain.com'),
235             [dns.Record_A('255.255.255.254', ttl=19283784), dns.Record_A('0.0.0.0', ttl=19283784)]
236         )
237
238
239     def testAuthority(self):
240         """Test DNS 'SOA' record queries"""
241         return self.namesTest(
242             self.resolver.lookupAuthority('test-domain.com'),
243             [soa_record]
244         )
245
246
247     def testMailExchangeRecord(self):
248         """Test DNS 'MX' record queries"""
249         return self.namesTest(
250             self.resolver.lookupMailExchange('test-domain.com'),
251             [dns.Record_MX(10, 'host.test-domain.com', ttl=19283784)]
252         )
253
254
255     def testNameserver(self):
256         """Test DNS 'NS' record queries"""
257         return self.namesTest(
258             self.resolver.lookupNameservers('test-domain.com'),
259             [dns.Record_NS('39.28.189.39', ttl=19283784)]
260         )
261
262
263     def testHINFO(self):
264         """Test DNS 'HINFO' record queries"""
265         return self.namesTest(
266             self.resolver.lookupHostInfo('test-domain.com'),
267             [dns.Record_HINFO(os='Linux', cpu='A Fast One, Dontcha know', ttl=19283784)]
268         )
269
270     def testPTR(self):
271         """Test DNS 'PTR' record queries"""
272         return self.namesTest(
273             self.resolver.lookupPointer('123.93.84.28.in-addr.arpa'),
274             [dns.Record_PTR('test.host-reverse.lookup.com', ttl=11193983)]
275         )
276
277
278     def testCNAME(self):
279         """Test DNS 'CNAME' record queries"""
280         return self.namesTest(
281             self.resolver.lookupCanonicalName('test-domain.com'),
282             [dns.Record_CNAME('canonical.name.com', ttl=19283784)]
283         )
284
285     def testCNAMEAdditional(self):
286         """Test additional processing for CNAME records"""
287         return self.namesTest(
288         self.resolver.lookupAddress('cname.test-domain.com'),
289         [dns.Record_CNAME('test-domain.com', ttl=19283784), dns.Record_A('127.0.0.1', ttl=19283784)]
290     )
291
292     def testMB(self):
293         """Test DNS 'MB' record queries"""
294         return self.namesTest(
295             self.resolver.lookupMailBox('test-domain.com'),
296             [dns.Record_MB('mailbox.test-domain.com', ttl=19283784)]
297         )
298
299
300     def testMG(self):
301         """Test DNS 'MG' record queries"""
302         return self.namesTest(
303             self.resolver.lookupMailGroup('test-domain.com'),
304             [dns.Record_MG('mail.group.someplace', ttl=19283784)]
305         )
306
307
308     def testMR(self):
309         """Test DNS 'MR' record queries"""
310         return self.namesTest(
311             self.resolver.lookupMailRename('test-domain.com'),
312             [dns.Record_MR('mail.redirect.or.whatever', ttl=19283784)]
313         )
314
315
316     def testMINFO(self):
317         """Test DNS 'MINFO' record queries"""
318         return self.namesTest(
319             self.resolver.lookupMailboxInfo('test-domain.com'),
320             [dns.Record_MINFO(rmailbx='r mail box', emailbx='e mail box', ttl=19283784)]
321         )
322
323
324     def testSRV(self):
325         """Test DNS 'SRV' record queries"""
326         return self.namesTest(
327             self.resolver.lookupService('http.tcp.test-domain.com'),
328             [dns.Record_SRV(257, 16383, 43690, 'some.other.place.fool', ttl=19283784)]
329         )
330
331     def testAFSDB(self):
332         """Test DNS 'AFSDB' record queries"""
333         return self.namesTest(
334             self.resolver.lookupAFSDatabase('test-domain.com'),
335             [dns.Record_AFSDB(subtype=1, hostname='afsdb.test-domain.com', ttl=19283784)]
336         )
337
338
339     def testRP(self):
340         """Test DNS 'RP' record queries"""
341         return self.namesTest(
342             self.resolver.lookupResponsibility('test-domain.com'),
343             [dns.Record_RP(mbox='whatever.i.dunno', txt='some.more.text', ttl=19283784)]
344         )
345
346
347     def testTXT(self):
348         """Test DNS 'TXT' record queries"""
349         return self.namesTest(
350             self.resolver.lookupText('test-domain.com'),
351             [dns.Record_TXT('A First piece of Text', 'a SecoNd piece', ttl=19283784),
352              dns.Record_TXT('Some more text, haha!  Yes.  \0  Still here?', ttl=19283784)]
353         )
354
355
356     def test_spf(self):
357         """
358         L{DNSServerFactory} can serve I{SPF} resource records.
359         """
360         return self.namesTest(
361             self.resolver.lookupSenderPolicy('test-domain.com'),
362             [dns.Record_SPF('v=spf1 mx/30 mx:example.org/30 -all', ttl=19283784),
363             dns.Record_SPF('v=spf1 +mx a:\0colo', '.example.com/28 -all not valid', ttl=19283784)]
364         )
365
366
367     def testWKS(self):
368         """Test DNS 'WKS' record queries"""
369         return self.namesTest(
370             self.resolver.lookupWellKnownServices('test-domain.com'),
371             [dns.Record_WKS('12.54.78.12', socket.IPPROTO_TCP, '\x12\x01\x16\xfe\xc1\x00\x01', ttl=19283784)]
372         )
373
374
375     def testSomeRecordsWithTTLs(self):
376         result_soa = copy.copy(my_soa)
377         result_soa.ttl = my_soa.expire
378         return self.namesTest(
379             self.resolver.lookupAllRecords('my-domain.com'),
380             [result_soa,
381              dns.Record_A('1.2.3.4', ttl='1S'),
382              dns.Record_NS('ns1.domain', ttl='2M'),
383              dns.Record_NS('ns2.domain', ttl='3H'),
384              dns.Record_SRV(257, 16383, 43690, 'some.other.place.fool', ttl='4D')]
385             )
386
387
388     def testAAAA(self):
389         """Test DNS 'AAAA' record queries (IPv6)"""
390         return self.namesTest(
391             self.resolver.lookupIPV6Address('test-domain.com'),
392             [dns.Record_AAAA('AF43:5634:1294:AFCB:56AC:48EF:34C3:01FF', ttl=19283784)]
393         )
394
395     def testA6(self):
396         """Test DNS 'A6' record queries (IPv6)"""
397         return self.namesTest(
398             self.resolver.lookupAddress6('test-domain.com'),
399             [dns.Record_A6(0, 'ABCD::4321', '', ttl=19283784),
400              dns.Record_A6(12, '0:0069::0', 'some.network.tld', ttl=19283784),
401              dns.Record_A6(8, '0:5634:1294:AFCB:56AC:48EF:34C3:01FF', 'tra.la.la.net', ttl=19283784)]
402          )
403
404
405     def test_zoneTransfer(self):
406         """
407         Test DNS 'AXFR' queries (Zone transfer)
408         """
409         default_ttl = soa_record.expire
410         results = [copy.copy(r) for r in reduce(operator.add, test_domain_com.records.values())]
411         for r in results:
412             if r.ttl is None:
413                 r.ttl = default_ttl
414         return self.namesTest(
415             self.resolver.lookupZone('test-domain.com').addCallback(lambda r: (r[0][:-1],)),
416             results
417         )
418
419
420     def testSimilarZonesDontInterfere(self):
421         """Tests that unrelated zones don't mess with each other."""
422         return self.namesTest(
423             self.resolver.lookupAddress("anothertest-domain.com"),
424             [dns.Record_A('1.2.3.4', ttl=19283784)]
425         )
426
427
428     def test_NAPTR(self):
429         """
430         Test DNS 'NAPTR' record queries.
431         """
432         return self.namesTest(
433             self.resolver.lookupNamingAuthorityPointer('test-domain.com'),
434             [dns.Record_NAPTR(100, 10, "u", "sip+E2U",
435                               "!^.*$!sip:information@domain.tld!",
436                               ttl=19283784)])
437
438
439
440 class DNSServerFactoryTests(unittest.TestCase):
441     """
442     Tests for L{server.DNSServerFactory}.
443     """
444     def _messageReceivedTest(self, methodName, message):
445         """
446         Assert that the named method is called with the given message when
447         it is passed to L{DNSServerFactory.messageReceived}.
448         """
449         # Make it appear to have some queries so that
450         # DNSServerFactory.allowQuery allows it.
451         message.queries = [None]
452
453         receivedMessages = []
454         def fakeHandler(message, protocol, address):
455             receivedMessages.append((message, protocol, address))
456
457         class FakeProtocol(object):
458             def writeMessage(self, message):
459                 pass
460
461         protocol = FakeProtocol()
462         factory = server.DNSServerFactory(None)
463         setattr(factory, methodName, fakeHandler)
464         factory.messageReceived(message, protocol)
465         self.assertEqual(receivedMessages, [(message, protocol, None)])
466
467
468     def test_notifyMessageReceived(self):
469         """
470         L{DNSServerFactory.messageReceived} passes messages with an opcode
471         of C{OP_NOTIFY} on to L{DNSServerFactory.handleNotify}.
472         """
473         # RFC 1996, section 4.5
474         opCode = 4
475         self._messageReceivedTest('handleNotify', Message(opCode=opCode))
476
477
478     def test_updateMessageReceived(self):
479         """
480         L{DNSServerFactory.messageReceived} passes messages with an opcode
481         of C{OP_UPDATE} on to L{DNSServerFactory.handleOther}.
482
483         This may change if the implementation ever covers update messages.
484         """
485         # RFC 2136, section 1.3
486         opCode = 5
487         self._messageReceivedTest('handleOther', Message(opCode=opCode))
488
489
490     def test_connectionTracking(self):
491         """
492         The C{connectionMade} and C{connectionLost} methods of
493         L{DNSServerFactory} cooperate to keep track of all
494         L{DNSProtocol} objects created by a factory which are
495         connected.
496         """
497         protoA, protoB = object(), object()
498         factory = server.DNSServerFactory()
499         factory.connectionMade(protoA)
500         self.assertEqual(factory.connections, [protoA])
501         factory.connectionMade(protoB)
502         self.assertEqual(factory.connections, [protoA, protoB])
503         factory.connectionLost(protoA)
504         self.assertEqual(factory.connections, [protoB])
505         factory.connectionLost(protoB)
506         self.assertEqual(factory.connections, [])
507
508
509 class HelperTestCase(unittest.TestCase):
510     def testSerialGenerator(self):
511         f = self.mktemp()
512         a = authority.getSerial(f)
513         for i in range(20):
514             b = authority.getSerial(f)
515             self.failUnless(a < b)
516             a = b
517
518
519 class AXFRTest(unittest.TestCase):
520     def setUp(self):
521         self.results = None
522         self.d = defer.Deferred()
523         self.d.addCallback(self._gotResults)
524         self.controller = client.AXFRController('fooby.com', self.d)
525
526         self.soa = dns.RRHeader(name='fooby.com', type=dns.SOA, cls=dns.IN, ttl=86400, auth=False,
527                                 payload=dns.Record_SOA(mname='fooby.com',
528                                                        rname='hooj.fooby.com',
529                                                        serial=100,
530                                                        refresh=200,
531                                                        retry=300,
532                                                        expire=400,
533                                                        minimum=500,
534                                                        ttl=600))
535
536         self.records = [
537             self.soa,
538             dns.RRHeader(name='fooby.com', type=dns.NS, cls=dns.IN, ttl=700, auth=False,
539                          payload=dns.Record_NS(name='ns.twistedmatrix.com', ttl=700)),
540
541             dns.RRHeader(name='fooby.com', type=dns.MX, cls=dns.IN, ttl=700, auth=False,
542                          payload=dns.Record_MX(preference=10, exchange='mail.mv3d.com', ttl=700)),
543
544             dns.RRHeader(name='fooby.com', type=dns.A, cls=dns.IN, ttl=700, auth=False,
545                          payload=dns.Record_A(address='64.123.27.105', ttl=700)),
546             self.soa
547             ]
548
549     def _makeMessage(self):
550         # hooray they all have the same message format
551         return dns.Message(id=999, answer=1, opCode=0, recDes=0, recAv=1, auth=1, rCode=0, trunc=0, maxSize=0)
552
553     def testBindAndTNamesStyle(self):
554         # Bind style = One big single message
555         m = self._makeMessage()
556         m.queries = [dns.Query('fooby.com', dns.AXFR, dns.IN)]
557         m.answers = self.records
558         self.controller.messageReceived(m, None)
559         self.assertEqual(self.results, self.records)
560
561     def _gotResults(self, result):
562         self.results = result
563
564     def testDJBStyle(self):
565         # DJB style = message per record
566         records = self.records[:]
567         while records:
568             m = self._makeMessage()
569             m.queries = [] # DJB *doesn't* specify any queries.. hmm..
570             m.answers = [records.pop(0)]
571             self.controller.messageReceived(m, None)
572         self.assertEqual(self.results, self.records)
573
574 class FakeDNSDatagramProtocol(object):
575     def __init__(self):
576         self.queries = []
577         self.transport = StubPort()
578
579     def query(self, address, queries, timeout=10, id=None):
580         self.queries.append((address, queries, timeout, id))
581         return defer.fail(dns.DNSQueryTimeoutError(queries))
582
583     def removeResend(self, id):
584         # Ignore this for the time being.
585         pass
586
587 class RetryLogic(unittest.TestCase):
588     testServers = [
589         '1.2.3.4',
590         '4.3.2.1',
591         'a.b.c.d',
592         'z.y.x.w']
593
594     def testRoundRobinBackoff(self):
595         addrs = [(x, 53) for x in self.testServers]
596         r = client.Resolver(resolv=None, servers=addrs)
597         r.protocol = proto = FakeDNSDatagramProtocol()
598         return r.lookupAddress("foo.example.com"
599             ).addCallback(self._cbRoundRobinBackoff
600             ).addErrback(self._ebRoundRobinBackoff, proto
601             )
602
603     def _cbRoundRobinBackoff(self, result):
604         raise unittest.FailTest("Lookup address succeeded, should have timed out")
605
606     def _ebRoundRobinBackoff(self, failure, fakeProto):
607         failure.trap(defer.TimeoutError)
608
609         # Assert that each server is tried with a particular timeout
610         # before the timeout is increased and the attempts are repeated.
611
612         for t in (1, 3, 11, 45):
613             tries = fakeProto.queries[:len(self.testServers)]
614             del fakeProto.queries[:len(self.testServers)]
615
616             tries.sort()
617             expected = list(self.testServers)
618             expected.sort()
619
620             for ((addr, query, timeout, id), expectedAddr) in zip(tries, expected):
621                 self.assertEqual(addr, (expectedAddr, 53))
622                 self.assertEqual(timeout, t)
623
624         self.failIf(fakeProto.queries)
625
626 class ResolvConfHandling(unittest.TestCase):
627     def testMissing(self):
628         resolvConf = self.mktemp()
629         r = client.Resolver(resolv=resolvConf)
630         self.assertEqual(r.dynServers, [('127.0.0.1', 53)])
631         r._parseCall.cancel()
632
633     def testEmpty(self):
634         resolvConf = self.mktemp()
635         fObj = file(resolvConf, 'w')
636         fObj.close()
637         r = client.Resolver(resolv=resolvConf)
638         self.assertEqual(r.dynServers, [('127.0.0.1', 53)])
639         r._parseCall.cancel()
640
641
642
643 class FilterAnswersTests(unittest.TestCase):
644     """
645     Test L{twisted.names.client.Resolver.filterAnswers}'s handling of various
646     error conditions it might encounter.
647     """
648     def setUp(self):
649         # Create a resolver pointed at an invalid server - we won't be hitting
650         # the network in any of these tests.
651         self.resolver = Resolver(servers=[('0.0.0.0', 0)])
652
653
654     def test_truncatedMessage(self):
655         """
656         Test that a truncated message results in an equivalent request made via
657         TCP.
658         """
659         m = Message(trunc=True)
660         m.addQuery('example.com')
661
662         def queryTCP(queries):
663             self.assertEqual(queries, m.queries)
664             response = Message()
665             response.answers = ['answer']
666             response.authority = ['authority']
667             response.additional = ['additional']
668             return succeed(response)
669         self.resolver.queryTCP = queryTCP
670         d = self.resolver.filterAnswers(m)
671         d.addCallback(
672             self.assertEqual, (['answer'], ['authority'], ['additional']))
673         return d
674
675
676     def _rcodeTest(self, rcode, exc):
677         m = Message(rCode=rcode)
678         err = self.resolver.filterAnswers(m)
679         err.trap(exc)
680
681
682     def test_formatError(self):
683         """
684         Test that a message with a result code of C{EFORMAT} results in a
685         failure wrapped around L{DNSFormatError}.
686         """
687         return self._rcodeTest(EFORMAT, DNSFormatError)
688
689
690     def test_serverError(self):
691         """
692         Like L{test_formatError} but for C{ESERVER}/L{DNSServerError}.
693         """
694         return self._rcodeTest(ESERVER, DNSServerError)
695
696
697     def test_nameError(self):
698         """
699         Like L{test_formatError} but for C{ENAME}/L{DNSNameError}.
700         """
701         return self._rcodeTest(ENAME, DNSNameError)
702
703
704     def test_notImplementedError(self):
705         """
706         Like L{test_formatError} but for C{ENOTIMP}/L{DNSNotImplementedError}.
707         """
708         return self._rcodeTest(ENOTIMP, DNSNotImplementedError)
709
710
711     def test_refusedError(self):
712         """
713         Like L{test_formatError} but for C{EREFUSED}/L{DNSQueryRefusedError}.
714         """
715         return self._rcodeTest(EREFUSED, DNSQueryRefusedError)
716
717
718     def test_refusedErrorUnknown(self):
719         """
720         Like L{test_formatError} but for an unrecognized error code and
721         L{DNSUnknownError}.
722         """
723         return self._rcodeTest(EREFUSED + 1, DNSUnknownError)
724
725
726
727 class AuthorityTests(unittest.TestCase):
728     """
729     Tests for the basic response record selection code in L{FileAuthority}
730     (independent of its fileness).
731     """
732     def test_recordMissing(self):
733         """
734         If a L{FileAuthority} has a zone which includes an I{NS} record for a
735         particular name and that authority is asked for another record for the
736         same name which does not exist, the I{NS} record is not included in the
737         authority section of the response.
738         """
739         authority = NoFileAuthority(
740             soa=(str(soa_record.mname), soa_record),
741             records={
742                 str(soa_record.mname): [
743                     soa_record,
744                     dns.Record_NS('1.2.3.4'),
745                     ]})
746         d = authority.lookupAddress(str(soa_record.mname))
747         result = []
748         d.addCallback(result.append)
749         answer, authority, additional = result[0]
750         self.assertEqual(answer, [])
751         self.assertEqual(
752             authority, [
753                 dns.RRHeader(
754                     str(soa_record.mname), soa_record.TYPE,
755                     ttl=soa_record.expire, payload=soa_record,
756                     auth=True)])
757         self.assertEqual(additional, [])
758
759
760     def _referralTest(self, method):
761         """
762         Create an authority and make a request against it.  Then verify that the
763         result is a referral, including no records in the answers or additional
764         sections, but with an I{NS} record in the authority section.
765         """
766         subdomain = 'example.' + str(soa_record.mname)
767         nameserver = dns.Record_NS('1.2.3.4')
768         authority = NoFileAuthority(
769             soa=(str(soa_record.mname), soa_record),
770             records={
771                 subdomain: [
772                     nameserver,
773                     ]})
774         d = getattr(authority, method)(subdomain)
775         result = []
776         d.addCallback(result.append)
777         answer, authority, additional = result[0]
778         self.assertEqual(answer, [])
779         self.assertEqual(
780             authority, [dns.RRHeader(
781                     subdomain, dns.NS, ttl=soa_record.expire,
782                     payload=nameserver, auth=False)])
783         self.assertEqual(additional, [])
784
785
786     def test_referral(self):
787         """
788         When an I{NS} record is found for a child zone, it is included in the
789         authority section of the response.  It is marked as non-authoritative if
790         the authority is not also authoritative for the child zone (RFC 2181,
791         section 6.1).
792         """
793         self._referralTest('lookupAddress')
794
795
796     def test_allRecordsReferral(self):
797         """
798         A referral is also generated for a request of type C{ALL_RECORDS}.
799         """
800         self._referralTest('lookupAllRecords')
801
802
803
804 class NoInitialResponseTestCase(unittest.TestCase):
805
806     def test_no_answer(self):
807         """
808         If a request returns a L{dns.NS} response, but we can't connect to the
809         given server, the request fails with the error returned at connection.
810         """
811
812         def query(self, *args):
813             # Pop from the message list, so that it blows up if more queries
814             # are run than expected.
815             return succeed(messages.pop(0))
816
817         def queryProtocol(self, *args, **kwargs):
818             return defer.fail(socket.gaierror("Couldn't connect"))
819
820         resolver = Resolver(servers=[('0.0.0.0', 0)])
821         resolver._query = query
822         messages = []
823         # Let's patch dns.DNSDatagramProtocol.query, as there is no easy way to
824         # customize it.
825         self.patch(dns.DNSDatagramProtocol, "query", queryProtocol)
826
827         records = [
828             dns.RRHeader(name='fooba.com', type=dns.NS, cls=dns.IN, ttl=700,
829                          auth=False,
830                          payload=dns.Record_NS(name='ns.twistedmatrix.com',
831                          ttl=700))]
832         m = dns.Message(id=999, answer=1, opCode=0, recDes=0, recAv=1, auth=1,
833                         rCode=0, trunc=0, maxSize=0)
834         m.answers = records
835         messages.append(m)
836         return self.assertFailure(
837             resolver.getHostByName("fooby.com"), socket.gaierror)
838
839
840
841 class SecondaryAuthorityServiceTests(unittest.TestCase):
842     """
843     Tests for L{SecondaryAuthorityService}, a service which keeps one or more
844     authorities up to date by doing zone transfers from a master.
845     """
846
847     def test_constructAuthorityFromHost(self):
848         """
849         L{SecondaryAuthorityService} can be constructed with a C{str} giving a
850         master server address and several domains, causing the creation of a
851         secondary authority for each domain and that master server address and
852         the default DNS port.
853         """
854         primary = '192.168.1.2'
855         service = SecondaryAuthorityService(
856             primary, ['example.com', 'example.org'])
857         self.assertEqual(service.primary, primary)
858         self.assertEqual(service._port, 53)
859
860         self.assertEqual(service.domains[0].primary, primary)
861         self.assertEqual(service.domains[0]._port, 53)
862         self.assertEqual(service.domains[0].domain, 'example.com')
863
864         self.assertEqual(service.domains[1].primary, primary)
865         self.assertEqual(service.domains[1]._port, 53)
866         self.assertEqual(service.domains[1].domain, 'example.org')
867
868
869     def test_constructAuthorityFromHostAndPort(self):
870         """
871         L{SecondaryAuthorityService.fromServerAddressAndDomains} constructs a
872         new L{SecondaryAuthorityService} from a C{str} giving a master server
873         address and DNS port and several domains, causing the creation of a secondary
874         authority for each domain and that master server address and the given
875         DNS port.
876         """
877         primary = '192.168.1.3'
878         port = 5335
879         service = SecondaryAuthorityService.fromServerAddressAndDomains(
880             (primary, port), ['example.net', 'example.edu'])
881         self.assertEqual(service.primary, primary)
882         self.assertEqual(service._port, 5335)
883
884         self.assertEqual(service.domains[0].primary, primary)
885         self.assertEqual(service.domains[0]._port, port)
886         self.assertEqual(service.domains[0].domain, 'example.net')
887
888         self.assertEqual(service.domains[1].primary, primary)
889         self.assertEqual(service.domains[1]._port, port)
890         self.assertEqual(service.domains[1].domain, 'example.edu')
891
892
893
894 class SecondaryAuthorityTests(unittest.TestCase):
895     """
896     L{twisted.names.secondary.SecondaryAuthority} correctly constructs objects
897     with a specified IP address and optionally specified DNS port.
898     """
899
900     def test_defaultPort(self):
901         """
902         When constructed using L{SecondaryAuthority.__init__}, the default port
903         of 53 is used.
904         """
905         secondary = SecondaryAuthority('192.168.1.1', 'inside.com')
906         self.assertEqual(secondary.primary, '192.168.1.1')
907         self.assertEqual(secondary._port, 53)
908         self.assertEqual(secondary.domain, 'inside.com')
909
910
911     def test_explicitPort(self):
912         """
913         When constructed using L{SecondaryAuthority.fromServerAddressAndDomain},
914         the specified port is used.
915         """
916         secondary = SecondaryAuthority.fromServerAddressAndDomain(
917             ('192.168.1.1', 5353), 'inside.com')
918         self.assertEqual(secondary.primary, '192.168.1.1')
919         self.assertEqual(secondary._port, 5353)
920         self.assertEqual(secondary.domain, 'inside.com')
921
922
923     def test_transfer(self):
924         """
925         An attempt is made to transfer the zone for the domain the
926         L{SecondaryAuthority} was constructed with from the server address it
927         was constructed with when L{SecondaryAuthority.transfer} is called.
928         """
929         class ClockMemoryReactor(Clock, MemoryReactor):
930             def __init__(self):
931                 Clock.__init__(self)
932                 MemoryReactor.__init__(self)
933
934         secondary = SecondaryAuthority.fromServerAddressAndDomain(
935             ('192.168.1.2', 1234), 'example.com')
936         secondary._reactor = reactor = ClockMemoryReactor()
937
938         secondary.transfer()
939
940         # Verify a connection attempt to the server address above
941         host, port, factory, timeout, bindAddress = reactor.tcpClients.pop(0)
942         self.assertEqual(host, '192.168.1.2')
943         self.assertEqual(port, 1234)
944
945         # See if a zone transfer query is issued.
946         proto = factory.buildProtocol((host, port))
947         transport = StringTransport()
948         proto.makeConnection(transport)
949
950         msg = Message()
951         # DNSProtocol.writeMessage length encodes the message by prepending a
952         # 2 byte message length to the buffered value.
953         msg.decode(StringIO(transport.value()[2:]))
954
955         self.assertEqual(
956             [dns.Query('example.com', dns.AXFR, dns.IN)], msg.queries)