Imported Upstream version 12.1.0
[contrib/python-twisted.git] / twisted / test / test_ident.py
1 # Copyright (c) Twisted Matrix Laboratories.
2 # See LICENSE for details.
3
4
5 """
6 Test cases for twisted.protocols.ident module.
7 """
8
9 import struct
10
11 from twisted.protocols import ident
12 from twisted.python import failure
13 from twisted.internet import error
14 from twisted.internet import defer
15
16 from twisted.trial import unittest
17 from twisted.test.proto_helpers import StringTransport
18
19
20
class ClassParserTestCase(unittest.TestCase):
    """
    Tests for how L{ident.IdentClient} interprets the lines it receives.
    """

    def setUp(self):
        """
        Create an ident client used in tests.
        """
        self.client = ident.IdentClient()


    def _pendingQuery(self, *ports):
        """
        Register an outstanding query with the client under test.

        @param ports: the two port numbers identifying the query; they are
            stored, in the order given, right after the Deferred in the
            tuple appended to C{self.client.queries}.
        @return: the L{defer.Deferred} associated with the queued query.
        """
        pending = defer.Deferred()
        self.client.queries.append((pending,) + ports)
        return pending


    def test_indentError(self):
        """
        An 'UNKNOWN-ERROR' response fails the query with
        L{ident.IdentError}.
        """
        pending = self._pendingQuery(123, 456)
        self.client.lineReceived('123, 456 : ERROR : UNKNOWN-ERROR')
        return self.assertFailure(pending, ident.IdentError)


    def test_noUSerError(self):
        """
        A 'NO-USER' response fails the query with L{ident.NoUser}.
        """
        pending = self._pendingQuery(234, 456)
        self.client.lineReceived('234, 456 : ERROR : NO-USER')
        return self.assertFailure(pending, ident.NoUser)


    def test_invalidPortError(self):
        """
        An 'INVALID-PORT' response fails the query with
        L{ident.InvalidPort}.
        """
        pending = self._pendingQuery(345, 567)
        # Note the doubled space before ERROR in the response line.
        self.client.lineReceived('345, 567 :  ERROR : INVALID-PORT')
        return self.assertFailure(pending, ident.InvalidPort)


    def test_hiddenUserError(self):
        """
        A 'HIDDEN-USER' response fails the query with L{ident.HiddenUser}.
        """
        pending = self._pendingQuery(567, 789)
        self.client.lineReceived('567, 789 : ERROR : HIDDEN-USER')
        return self.assertFailure(pending, ident.HiddenUser)


    def test_lostConnection(self):
        """
        When the connection is lost while a query is still outstanding,
        that query fails with L{ident.IdentError}.
        """
        pending = self._pendingQuery(765, 432)
        reason = failure.Failure(error.ConnectionLost())
        self.client.connectionLost(reason)
        return self.assertFailure(pending, ident.IdentError)
82
83
84
class TestIdentServer(ident.IdentServer):
    """
    An ident server stub whose lookups always return a canned answer.

    Tests assign C{resultValue} on the instance before feeding it a query.
    """

    def lookup(self, serverAddress, clientAddress):
        """
        Ignore both addresses and return the C{resultValue} attribute.
        """
        cannedAnswer = self.resultValue
        return cannedAnswer
88
89
class TestErrorIdentServer(ident.IdentServer):
    """
    An ident server stub whose lookups always raise.

    Tests assign C{exceptionType} on the instance to choose what is raised.
    """

    def lookup(self, serverAddress, clientAddress):
        """
        Ignore both addresses and raise a new, argument-less instance of
        the C{exceptionType} attribute.
        """
        toRaise = self.exceptionType()
        raise toRaise
93
94
class NewException(RuntimeError):
    """
    A L{RuntimeError} subclass defined only by this test module, used as
    an exception type that is none of the ident-specific errors.
    """
97
98
class ServerParserTestCase(unittest.TestCase):
    """
    Tests for the reply lines an ident server sends in answer to queries.
    """

    def _connect(self, server):
        """
        Attach C{server} to a L{StringTransport} and capture its output.

        @param server: the ident server protocol instance under test.
        @return: the list to which every line handed to C{server.sendLine}
            is appended.
        """
        server.makeConnection(StringTransport())
        sent = []
        server.sendLine = sent.append
        return sent


    def testErrors(self):
        """
        Exceptions raised by C{lookup}, and out-of-range port numbers, are
        each answered with the matching ERROR reply line.
        """
        server = TestErrorIdentServer()
        sent = self._connect(server)

        # (exception raised by lookup, query line, expected reply line)
        knownErrors = [
            (ident.IdentError, '123, 345',
             '123, 345 : ERROR : UNKNOWN-ERROR'),
            (ident.NoUser, '432, 210',
             '432, 210 : ERROR : NO-USER'),
            (ident.InvalidPort, '987, 654',
             '987, 654 : ERROR : INVALID-PORT'),
            (ident.HiddenUser, '756, 827',
             '756, 827 : ERROR : HIDDEN-USER'),
        ]
        for index, (excType, query, reply) in enumerate(knownErrors):
            server.exceptionType = excType
            server.lineReceived(query)
            self.assertEqual(sent[index], reply)

        # An exception that is not an ident error is reported to the peer
        # as UNKNOWN-ERROR and is logged exactly once.
        server.exceptionType = NewException
        server.lineReceived('987, 789')
        self.assertEqual(sent[4], '987, 789 : ERROR : UNKNOWN-ERROR')
        logged = self.flushLoggedErrors(NewException)
        self.assertEqual(len(logged), 1)

        # Port numbers outside 1..65535 are rejected in either position.
        for port in (-1, 0, 65536, 65537):
            del sent[:]
            server.lineReceived('%d, 5' % (port,))
            server.lineReceived('5, %d' % (port,))
            expected = ['%d, 5 : ERROR : INVALID-PORT' % (port,),
                        '5, %d : ERROR : INVALID-PORT' % (port,)]
            self.assertEqual(sent, expected)

    def testSuccess(self):
        """
        A successful C{lookup} result is rendered as a USERID reply line.
        """
        server = TestIdentServer()
        sent = self._connect(server)

        server.resultValue = ('SYS', 'USER')
        server.lineReceived('123, 456')
        self.assertEqual(sent[0], '123, 456 : USERID : SYS : USER')
145
146
# Pick hex-encoded address fixtures matching the host's native byte order:
# _addr1 encodes 127.0.0.1 and _addr2 encodes 1.2.3.4.
#
# The first packed byte is taken with a slice and compared against a bytes
# literal: on Python 3, indexing bytes yields an int, so the previous
# ``[0] == '\x01'`` comparison was always False and selected the big-endian
# fixtures even on little-endian hosts.  Slicing behaves identically on
# Python 2, where ``b'\x01'`` is an ordinary str.
if struct.pack('=L', 1)[:1] == b'\x01':
    # Little-endian host: least significant byte comes first.
    _addr1 = '0100007F'
    _addr2 = '04030201'
else:
    # Big-endian host: most significant byte comes first.
    _addr1 = '7F000001'
    _addr2 = '01020304'
153
154
class ProcMixinTestCase(unittest.TestCase):
    """
    Tests for the address parsing and lookup helpers of
    L{ident.ProcServerMixin}.
    """

    # Sample connection-table entry fed to the parser; presumably in the
    # format of a /proc/net/tcp line -- confirm against ProcServerMixin.
    # The tests below expect it to parse as 127.0.0.1:25 -> 1.2.3.4:762
    # owned by uid 0.
    line = ('4: %s:0019 %s:02FA 0A 00000000:00000000 '
            '00:00000000 00000000     0        0 10927 1 f72a5b80 '
            '3000 0 0 2 -1') % (_addr1, _addr2)

    def testDottedQuadFromHexString(self):
        """
        A hex-encoded address is converted to dotted-quad notation.
        """
        mixin = ident.ProcServerMixin()
        dotted = mixin.dottedQuadFromHexString(_addr1)
        self.assertEqual(dotted, '127.0.0.1')

    def testUnpackAddress(self):
        """
        A C{'<hex address>:<hex port>'} string is split into a dotted-quad
        host and an integer port.
        """
        mixin = ident.ProcServerMixin()
        packed = _addr1 + ':0277'
        self.assertEqual(mixin.unpackAddress(packed), ('127.0.0.1', 631))

    def testLineParser(self):
        """
        Parsing the sample line yields both addresses and the uid.
        """
        mixin = ident.ProcServerMixin()
        expected = (('127.0.0.1', 25), ('1.2.3.4', 762), 0)
        self.assertEqual(mixin.parseLine(self.line), expected)

    def testExistingAddress(self):
        """
        Looking up the address pair present in the sample line returns the
        system name together with the username resolved for uid 0.
        """
        seenUids = []

        def fakeGetUsername(uid):
            # Record which uid was asked for, then answer with a fixed name.
            seenUids.append(uid)
            return 'root'

        mixin = ident.ProcServerMixin()
        mixin.entries = lambda: iter([self.line])
        mixin.getUsername = fakeGetUsername
        answer = mixin.lookup(('127.0.0.1', 25), ('1.2.3.4', 762))
        self.assertEqual(answer, (mixin.SYSTEM_NAME, 'root'))
        self.assertEqual(seenUids, [0])

    def testNonExistingAddress(self):
        """
        Looking up an address pair that differs from the sample line in
        any one component raises L{ident.NoUser}.
        """
        mixin = ident.ProcServerMixin()
        mixin.entries = lambda: iter([self.line])
        mismatches = [
            (('127.0.0.1', 26), ('1.2.3.4', 762)),
            (('127.0.0.1', 25), ('1.2.3.5', 762)),
            (('127.0.0.1', 25), ('1.2.3.4', 763)),
        ]
        for serverAddress, clientAddress in mismatches:
            self.assertRaises(ident.NoUser, mixin.lookup,
                              serverAddress, clientAddress)
194