Initial import to Tizen
[profile/ivi/python-twisted.git] / twisted / test / test_socks.py
1 # Copyright (c) Twisted Matrix Laboratories.
2 # See LICENSE for details.
3
4 """
5 Tests for L{twisted.protocols.socks}, an implementation of the SOCKSv4 and
6 SOCKSv4a protocols.
7 """
8
9 import struct, socket
10
11 from twisted.trial import unittest
12 from twisted.test import proto_helpers
13 from twisted.internet import defer, address, reactor
14 from twisted.internet.error import DNSLookupError
15 from twisted.protocols import socks
16
17
class StringTCPTransport(proto_helpers.StringTransport):
    """
    A L{proto_helpers.StringTransport} variant used by the SOCKS tests.

    Instead of acting on C{loseConnection}, it merely records that the call
    happened so that tests can assert on it afterwards.

    @ivar peer: The address returned by L{getPeer}; C{None} until a test (or
        the driver) assigns one.
    @ivar stringTCPTransport_closing: C{True} once L{loseConnection} has been
        called, C{False} before that.
    """
    peer = None
    stringTCPTransport_closing = False

    def getHost(self):
        """
        Return a fixed, fake local address (TCP, 2.3.4.5, port 42).
        """
        return address.IPv4Address('TCP', '2.3.4.5', 42)

    def getPeer(self):
        """
        Return whatever has been stored in the C{peer} attribute.
        """
        return self.peer

    def loseConnection(self):
        """
        Record the request to close by setting the closing flag.
        """
        self.stringTCPTransport_closing = True
30
31
32
class FakeResolverReactor:
    """
    Minimal reactor stand-in whose C{resolve} method answers from a fixed
    table, giving the tests deterministic name resolution.
    """
    def __init__(self, names):
        """
        @type names: C{dict} containing C{str} keys and C{str} values.
        @param names: Mapping from hostname to IP address, where each address
            is a dotted-quad string.
        """
        self.names = names


    def resolve(self, hostname):
        """
        Look C{hostname} up in the C{names} mapping.

        @param hostname: The name to resolve.
        @return: A L{defer.succeed} result carrying the mapped address when
            the name is known, otherwise a L{defer.fail} result wrapping a
            L{DNSLookupError}.
        """
        try:
            resolved = self.names[hostname]
        except KeyError:
            lookupError = DNSLookupError(
                "FakeResolverReactor couldn't find " + hostname)
            return defer.fail(lookupError)
        return defer.succeed(resolved)
55
56
57
class SOCKSv4Driver(socks.SOCKSv4):
    """
    A L{socks.SOCKSv4} subclass whose C{connectClass} and C{listenClass}
    hooks are replaced with in-memory fakes, so no real sockets are used.

    @ivar driver_outgoing: The protocol most recently created by
        L{connectClass}, or C{None} if it has not been called.
    @ivar driver_listen: The factory most recently created by
        L{listenClass}, or C{None} if it has not been called.
    """
    # Most recent outgoing protocol built by connectClass.
    driver_outgoing = None

    # Most recent factory built by listenClass.
    driver_listen = None

    def connectClass(self, host, port, klass, *args):
        """
        Pretend to connect: build C{klass(*args)}, hook it up to a
        L{StringTCPTransport} whose peer is C{(host, port)}, and signal that
        the connection was made.

        @return: An already-fired Deferred carrying the new protocol.
        """
        outgoing = klass(*args)
        outgoing.transport = StringTCPTransport()
        peerAddress = address.IPv4Address('TCP', host, port)
        outgoing.transport.peer = peerAddress
        outgoing.connectionMade()
        self.driver_outgoing = outgoing
        return defer.succeed(outgoing)

    def listenClass(self, port, klass, *args):
        """
        Pretend to listen: build the factory C{klass(*args)} and remember it.

        @return: An already-fired Deferred carrying a C{(host, port)} tuple.
            The host is always C{'6.7.8.9'}; a requested port of C{0} is
            reported as C{1234}, any other port is reported unchanged.
        """
        self.driver_listen = klass(*args)
        reportedPort = 1234 if port == 0 else port
        return defer.succeed(('6.7.8.9', reportedPort))
81
82
83
class Connect(unittest.TestCase):
    """
    Tests for SOCKS and SOCKSv4a connect requests using the L{SOCKSv4} protocol.
    """
    def setUp(self):
        """
        Create a L{SOCKSv4Driver} connected to a L{StringTCPTransport} and
        give it a L{FakeResolverReactor} that only knows C{"localhost"}.
        """
        self.sock = SOCKSv4Driver()
        self.sock.transport = StringTCPTransport()
        self.sock.connectionMade()
        self.sock.reactor = FakeResolverReactor({"localhost": "127.0.0.1"})


    def tearDown(self):
        """
        If the test created an outgoing connection, verify that it has been
        asked to close by the time the test finishes.
        """
        outgoing = self.sock.driver_outgoing
        if outgoing is not None:
            self.assertTrue(outgoing.transport.stringTCPTransport_closing,
                            "Outgoing SOCKS connections need to be closed.")


    def test_simple(self):
        """
        A connect request for an IP address is answered with a 90 reply
        echoing the requested port and address, after which data is relayed
        in both directions between the client and the outgoing connection.
        """
        self.sock.dataReceived(
            struct.pack('!BBH', 4, 1, 34)
            + socket.inet_aton('1.2.3.4')
            + 'fooBAR'
            + '\0')
        sent = self.sock.transport.value()
        self.sock.transport.clear()
        self.assertEqual(sent,
                         struct.pack('!BBH', 0, 90, 34)
                         + socket.inet_aton('1.2.3.4'))
        self.assertFalse(self.sock.transport.stringTCPTransport_closing)
        self.assertNotIdentical(self.sock.driver_outgoing, None)

        # pass some data through
        self.sock.dataReceived('hello, world')
        self.assertEqual(self.sock.driver_outgoing.transport.value(),
                         'hello, world')

        # the other way around
        self.sock.driver_outgoing.dataReceived('hi there')
        self.assertEqual(self.sock.transport.value(), 'hi there')

        self.sock.connectionLost('fake reason')


    def test_socks4aSuccessfulResolution(self):
        """
        If the destination IP address has zeros for the first three octets and
        non-zero for the fourth octet, the client is attempting a v4a
        connection.  A hostname is specified after the user ID string and the
        server connects to the address that hostname resolves to.

        @see: U{http://en.wikipedia.org/wiki/SOCKS#SOCKS_4a_protocol}
        """
        # send the domain name "localhost" to be resolved
        clientRequest = (
            struct.pack('!BBH', 4, 1, 34)
            + socket.inet_aton('0.0.0.1')
            + 'fooBAZ\0'
            + 'localhost\0')

        # Deliver the bytes one by one to exercise the protocol's buffering
        # logic. FakeResolverReactor's resolve method is invoked to "resolve"
        # the hostname.
        for byte in clientRequest:
            self.sock.dataReceived(byte)

        sent = self.sock.transport.value()
        self.sock.transport.clear()

        # Verify that the server responded with the address which will be
        # connected to.
        self.assertEqual(
            sent,
            struct.pack('!BBH', 0, 90, 34) + socket.inet_aton('127.0.0.1'))
        self.assertFalse(self.sock.transport.stringTCPTransport_closing)
        self.assertNotIdentical(self.sock.driver_outgoing, None)

        # Pass some data through and verify it is forwarded to the outgoing
        # connection.
        self.sock.dataReceived('hello, world')
        self.assertEqual(
            self.sock.driver_outgoing.transport.value(), 'hello, world')

        # Deliver some data from the output connection and verify it is
        # passed along to the incoming side.
        self.sock.driver_outgoing.dataReceived('hi there')
        self.assertEqual(self.sock.transport.value(), 'hi there')

        self.sock.connectionLost('fake reason')


    def test_socks4aFailedResolution(self):
        """
        Failed hostname resolution on a SOCKSv4a packet results in a 91 error
        response and the connection getting closed.
        """
        # send the domain name "failinghost" to be resolved
        clientRequest = (
            struct.pack('!BBH', 4, 1, 34)
            + socket.inet_aton('0.0.0.1')
            + 'fooBAZ\0'
            + 'failinghost\0')

        # Deliver the bytes one by one to exercise the protocol's buffering
        # logic. FakeResolverReactor's resolve method is invoked to "resolve"
        # the hostname.
        for byte in clientRequest:
            self.sock.dataReceived(byte)

        # Verify that the server responds with a 91 error.
        sent = self.sock.transport.value()
        self.assertEqual(
            sent,
            struct.pack('!BBH', 0, 91, 0) + socket.inet_aton('0.0.0.0'))

        # A failed resolution causes the transport to drop the connection.
        self.assertTrue(self.sock.transport.stringTCPTransport_closing)
        self.assertIdentical(self.sock.driver_outgoing, None)


    def test_accessDenied(self):
        """
        When C{authorize} returns a false value the request is answered with
        a 91 reply, the client transport is closed and no outgoing connection
        is created.
        """
        self.sock.authorize = lambda code, server, port, user: 0
        self.sock.dataReceived(
            struct.pack('!BBH', 4, 1, 4242)
            + socket.inet_aton('10.2.3.4')
            + 'fooBAR'
            + '\0')
        self.assertEqual(self.sock.transport.value(),
                         struct.pack('!BBH', 0, 91, 0)
                         + socket.inet_aton('0.0.0.0'))
        self.assertTrue(self.sock.transport.stringTCPTransport_closing)
        self.assertIdentical(self.sock.driver_outgoing, None)


    def test_eofRemote(self):
        """
        After data has been relayed, the outgoing side can close and lose its
        connection without raising.
        """
        self.sock.dataReceived(
            struct.pack('!BBH', 4, 1, 34)
            + socket.inet_aton('1.2.3.4')
            + 'fooBAR'
            + '\0')
        # Discard the reply; its content is covered by test_simple.
        self.sock.transport.clear()

        # pass some data through
        self.sock.dataReceived('hello, world')
        self.assertEqual(self.sock.driver_outgoing.transport.value(),
                         'hello, world')

        # now close it from the server side
        self.sock.driver_outgoing.transport.loseConnection()
        self.sock.driver_outgoing.connectionLost('fake reason')


    def test_eofLocal(self):
        """
        After data has been relayed, the client side can lose its connection;
        C{tearDown} then checks that the outgoing connection was closed.
        """
        self.sock.dataReceived(
            struct.pack('!BBH', 4, 1, 34)
            + socket.inet_aton('1.2.3.4')
            + 'fooBAR'
            + '\0')
        # Discard the reply; its content is covered by test_simple.
        self.sock.transport.clear()

        # pass some data through
        self.sock.dataReceived('hello, world')
        self.assertEqual(self.sock.driver_outgoing.transport.value(),
                         'hello, world')

        # now close it from the client side
        self.sock.connectionLost('fake reason')
253
254
255
class Bind(unittest.TestCase):
    """
    Tests for SOCKS and SOCKSv4a bind requests using the L{SOCKSv4} protocol.
    """
    def setUp(self):
        """
        Create a L{SOCKSv4Driver} connected to a L{StringTCPTransport} and
        give it a L{FakeResolverReactor} that only knows C{"localhost"}.
        """
        self.sock = SOCKSv4Driver()
        self.sock.transport = StringTCPTransport()
        self.sock.connectionMade()
        self.sock.reactor = FakeResolverReactor({"localhost": "127.0.0.1"})


    # TODO: add a tearDown which ensures the listen port is closed.


    def test_simple(self):
        """
        A bind request is answered with a 90 reply carrying the listening
        port and address.  Once a connection arrives from the expected
        address a second 90 reply is sent, and data is then relayed in both
        directions.
        """
        self.sock.dataReceived(
            struct.pack('!BBH', 4, 2, 34)
            + socket.inet_aton('1.2.3.4')
            + 'fooBAR'
            + '\0')
        sent = self.sock.transport.value()
        self.sock.transport.clear()
        self.assertEqual(sent,
                         struct.pack('!BBH', 0, 90, 1234)
                         + socket.inet_aton('6.7.8.9'))
        self.assertFalse(self.sock.transport.stringTCPTransport_closing)
        self.assertNotIdentical(self.sock.driver_listen, None)

        # connect
        incoming = self.sock.driver_listen.buildProtocol(('1.2.3.4', 5345))
        self.assertNotIdentical(incoming, None)
        incoming.transport = StringTCPTransport()
        incoming.connectionMade()

        # now we should have the second reply packet
        sent = self.sock.transport.value()
        self.sock.transport.clear()
        self.assertEqual(sent,
                         struct.pack('!BBH', 0, 90, 0)
                         + socket.inet_aton('0.0.0.0'))
        self.assertFalse(self.sock.transport.stringTCPTransport_closing)

        # pass some data through
        self.sock.dataReceived('hello, world')
        self.assertEqual(incoming.transport.value(),
                         'hello, world')

        # the other way around
        incoming.dataReceived('hi there')
        self.assertEqual(self.sock.transport.value(), 'hi there')

        self.sock.connectionLost('fake reason')


    def test_socks4a(self):
        """
        If the destination IP address has zeros for the first three octets and
        non-zero for the fourth octet, the client is attempting a v4a
        connection.  A hostname is specified after the user ID string and the
        server accepts an incoming connection from the address that hostname
        resolves to.

        @see: U{http://en.wikipedia.org/wiki/SOCKS#SOCKS_4a_protocol}
        """
        # send the domain name "localhost" to be resolved
        clientRequest = (
            struct.pack('!BBH', 4, 2, 34)
            + socket.inet_aton('0.0.0.1')
            + 'fooBAZ\0'
            + 'localhost\0')

        # Deliver the bytes one by one to exercise the protocol's buffering
        # logic. FakeResolverReactor's resolve method is invoked to "resolve"
        # the hostname.
        for byte in clientRequest:
            self.sock.dataReceived(byte)

        sent = self.sock.transport.value()
        self.sock.transport.clear()

        # Verify that the server responded with the address it is listening
        # on.
        self.assertEqual(
            sent,
            struct.pack('!BBH', 0, 90, 1234) + socket.inet_aton('6.7.8.9'))
        self.assertFalse(self.sock.transport.stringTCPTransport_closing)
        self.assertNotIdentical(self.sock.driver_listen, None)

        # connect
        incoming = self.sock.driver_listen.buildProtocol(('127.0.0.1', 5345))
        self.assertNotIdentical(incoming, None)
        incoming.transport = StringTCPTransport()
        incoming.connectionMade()

        # now we should have the second reply packet
        sent = self.sock.transport.value()
        self.sock.transport.clear()
        self.assertEqual(sent,
                         struct.pack('!BBH', 0, 90, 0)
                         + socket.inet_aton('0.0.0.0'))
        # The closing flag is always a bool, so comparing it against None
        # could never fail; assert that the transport is still open instead.
        self.assertFalse(self.sock.transport.stringTCPTransport_closing)

        # Deliver some data from the client and verify it is passed along to
        # the incoming connection.
        self.sock.dataReceived('hi there')
        self.assertEqual(incoming.transport.value(), 'hi there')

        # the other way around
        incoming.dataReceived('hi there')
        self.assertEqual(self.sock.transport.value(), 'hi there')

        self.sock.connectionLost('fake reason')


    def test_socks4aFailedResolution(self):
        """
        Failed hostname resolution on a SOCKSv4a packet results in a 91 error
        response and the connection getting closed.
        """
        # send the domain name "failinghost" to be resolved
        clientRequest = (
            struct.pack('!BBH', 4, 2, 34)
            + socket.inet_aton('0.0.0.1')
            + 'fooBAZ\0'
            + 'failinghost\0')

        # Deliver the bytes one by one to exercise the protocol's buffering
        # logic. FakeResolverReactor's resolve method is invoked to "resolve"
        # the hostname.
        for byte in clientRequest:
            self.sock.dataReceived(byte)

        # Verify that the server responds with a 91 error.
        sent = self.sock.transport.value()
        self.assertEqual(
            sent,
            struct.pack('!BBH', 0, 91, 0) + socket.inet_aton('0.0.0.0'))

        # A failed resolution causes the transport to drop the connection.
        self.assertTrue(self.sock.transport.stringTCPTransport_closing)
        # Neither a listener (the bind side) nor an outgoing connection may
        # have been created.
        self.assertIdentical(self.sock.driver_listen, None)
        self.assertIdentical(self.sock.driver_outgoing, None)


    def test_accessDenied(self):
        """
        When C{authorize} returns a false value the request is answered with
        a 91 reply, the client transport is closed and no listener is
        created.
        """
        self.sock.authorize = lambda code, server, port, user: 0
        self.sock.dataReceived(
            struct.pack('!BBH', 4, 2, 4242)
            + socket.inet_aton('10.2.3.4')
            + 'fooBAR'
            + '\0')
        self.assertEqual(self.sock.transport.value(),
                         struct.pack('!BBH', 0, 91, 0)
                         + socket.inet_aton('0.0.0.0'))
        self.assertTrue(self.sock.transport.stringTCPTransport_closing)
        self.assertIdentical(self.sock.driver_listen, None)


    def test_eofRemote(self):
        """
        After data has been relayed, the incoming (remote) connection can
        close and lose its connection without raising.
        """
        self.sock.dataReceived(
            struct.pack('!BBH', 4, 2, 34)
            + socket.inet_aton('1.2.3.4')
            + 'fooBAR'
            + '\0')
        # Discard the first reply; its content is covered by test_simple.
        self.sock.transport.clear()

        # connect
        incoming = self.sock.driver_listen.buildProtocol(('1.2.3.4', 5345))
        self.assertNotIdentical(incoming, None)
        incoming.transport = StringTCPTransport()
        incoming.connectionMade()

        # now we should have the second reply packet
        sent = self.sock.transport.value()
        self.sock.transport.clear()
        self.assertEqual(sent,
                         struct.pack('!BBH', 0, 90, 0)
                         + socket.inet_aton('0.0.0.0'))
        self.assertFalse(self.sock.transport.stringTCPTransport_closing)

        # pass some data through
        self.sock.dataReceived('hello, world')
        self.assertEqual(incoming.transport.value(),
                         'hello, world')

        # now close it from the server side
        incoming.transport.loseConnection()
        incoming.connectionLost('fake reason')


    def test_eofLocal(self):
        """
        After data has been relayed, the client side can lose its connection
        without raising.
        """
        self.sock.dataReceived(
            struct.pack('!BBH', 4, 2, 34)
            + socket.inet_aton('1.2.3.4')
            + 'fooBAR'
            + '\0')
        # Discard the first reply; its content is covered by test_simple.
        self.sock.transport.clear()

        # connect
        incoming = self.sock.driver_listen.buildProtocol(('1.2.3.4', 5345))
        self.assertNotIdentical(incoming, None)
        incoming.transport = StringTCPTransport()
        incoming.connectionMade()

        # now we should have the second reply packet
        sent = self.sock.transport.value()
        self.sock.transport.clear()
        self.assertEqual(sent,
                         struct.pack('!BBH', 0, 90, 0)
                         + socket.inet_aton('0.0.0.0'))
        self.assertFalse(self.sock.transport.stringTCPTransport_closing)

        # pass some data through
        self.sock.dataReceived('hello, world')
        self.assertEqual(incoming.transport.value(),
                         'hello, world')

        # now close it from the client side
        self.sock.connectionLost('fake reason')


    def test_badSource(self):
        """
        A connection arriving from an address other than the one named in the
        bind request is refused: no protocol is built, a 91 reply is sent and
        the client transport is closed.
        """
        self.sock.dataReceived(
            struct.pack('!BBH', 4, 2, 34)
            + socket.inet_aton('1.2.3.4')
            + 'fooBAR'
            + '\0')
        # Discard the first reply; its content is covered by test_simple.
        self.sock.transport.clear()

        # connect from WRONG address
        incoming = self.sock.driver_listen.buildProtocol(('1.6.6.6', 666))
        self.assertIdentical(incoming, None)

        # Now we should have the second reply packet and it should
        # be a failure. The connection should be closing.
        sent = self.sock.transport.value()
        self.sock.transport.clear()
        self.assertEqual(sent,
                         struct.pack('!BBH', 0, 91, 0)
                         + socket.inet_aton('0.0.0.0'))
        self.assertTrue(self.sock.transport.stringTCPTransport_closing)