1 # Copyright (c) Twisted Matrix Laboratories.
2 # See LICENSE for details.
5 Tests for L{twisted.protocol.socks}, an implementation of the SOCKSv4 and
11 from twisted.trial import unittest
12 from twisted.test import proto_helpers
13 from twisted.internet import defer, address, reactor
14 from twisted.internet.error import DNSLookupError
15 from twisted.protocols import socks
18 class StringTCPTransport(proto_helpers.StringTransport):
19 stringTCPTransport_closing = False
26 return address.IPv4Address('TCP', '2.3.4.5', 42)
28 def loseConnection(self):
29 self.stringTCPTransport_closing = True
33 class FakeResolverReactor:
35 Bare-bones reactor with deterministic behavior for the resolve method.
37 def __init__(self, names):
39 @type names: C{dict} containing C{str} keys and C{str} values.
40 @param names: A hostname to IP address mapping. The IP addresses are
41 stringified dotted quads.
46 def resolve(self, hostname):
48 Resolve a hostname by looking it up in the C{names} dictionary.
51 return defer.succeed(self.names[hostname])
54 DNSLookupError("FakeResolverReactor couldn't find " + hostname))
58 class SOCKSv4Driver(socks.SOCKSv4):
59 # last SOCKSv4Outgoing instantiated
60 driver_outgoing = None
62 # last SOCKSv4IncomingFactory instantiated
65 def connectClass(self, host, port, klass, *args):
68 proto.transport = StringTCPTransport()
69 proto.transport.peer = address.IPv4Address('TCP', host, port)
70 proto.connectionMade()
71 self.driver_outgoing = proto
72 return defer.succeed(proto)
74 def listenClass(self, port, klass, *args):
76 factory = klass(*args)
77 self.driver_listen = factory
80 return defer.succeed(('6.7.8.9', port))
84 class Connect(unittest.TestCase):
86 Tests for SOCKS and SOCKSv4a connect requests using the L{SOCKSv4} protocol.
89 self.sock = SOCKSv4Driver()
90 self.sock.transport = StringTCPTransport()
91 self.sock.connectionMade()
92 self.sock.reactor = FakeResolverReactor({"localhost":"127.0.0.1"})
96 outgoing = self.sock.driver_outgoing
97 if outgoing is not None:
98 self.assert_(outgoing.transport.stringTCPTransport_closing,
99 "Outgoing SOCKS connections need to be closed.")
102 def test_simple(self):
103 self.sock.dataReceived(
104 struct.pack('!BBH', 4, 1, 34)
105 + socket.inet_aton('1.2.3.4')
108 sent = self.sock.transport.value()
109 self.sock.transport.clear()
110 self.assertEqual(sent,
111 struct.pack('!BBH', 0, 90, 34)
112 + socket.inet_aton('1.2.3.4'))
113 self.assert_(not self.sock.transport.stringTCPTransport_closing)
114 self.assert_(self.sock.driver_outgoing is not None)
116 # pass some data through
117 self.sock.dataReceived('hello, world')
118 self.assertEqual(self.sock.driver_outgoing.transport.value(),
121 # the other way around
122 self.sock.driver_outgoing.dataReceived('hi there')
123 self.assertEqual(self.sock.transport.value(), 'hi there')
125 self.sock.connectionLost('fake reason')
128 def test_socks4aSuccessfulResolution(self):
130 If the destination IP address has zeros for the first three octets and
131 non-zero for the fourth octet, the client is attempting a v4a
132 connection. A hostname is specified after the user ID string and the
133 server connects to the address that hostname resolves to.
135 @see: U{http://en.wikipedia.org/wiki/SOCKS#SOCKS_4a_protocol}
137 # send the domain name "localhost" to be resolved
139 struct.pack('!BBH', 4, 1, 34)
140 + socket.inet_aton('0.0.0.1')
144 # Deliver the bytes one by one to exercise the protocol's buffering
145 # logic. FakeResolverReactor's resolve method is invoked to "resolve"
147 for byte in clientRequest:
148 self.sock.dataReceived(byte)
150 sent = self.sock.transport.value()
151 self.sock.transport.clear()
153 # Verify that the server responded with the address which will be
157 struct.pack('!BBH', 0, 90, 34) + socket.inet_aton('127.0.0.1'))
158 self.assertFalse(self.sock.transport.stringTCPTransport_closing)
159 self.assertNotIdentical(self.sock.driver_outgoing, None)
161 # Pass some data through and verify it is forwarded to the outgoing
163 self.sock.dataReceived('hello, world')
165 self.sock.driver_outgoing.transport.value(), 'hello, world')
167 # Deliver some data from the output connection and verify it is
168 # passed along to the incoming side.
169 self.sock.driver_outgoing.dataReceived('hi there')
170 self.assertEqual(self.sock.transport.value(), 'hi there')
172 self.sock.connectionLost('fake reason')
175 def test_socks4aFailedResolution(self):
177 Failed hostname resolution on a SOCKSv4a packet results in a 91 error
178 response and the connection getting closed.
180 # send the domain name "failinghost" to be resolved
182 struct.pack('!BBH', 4, 1, 34)
183 + socket.inet_aton('0.0.0.1')
187 # Deliver the bytes one by one to exercise the protocol's buffering
188 # logic. FakeResolverReactor's resolve method is invoked to "resolve"
190 for byte in clientRequest:
191 self.sock.dataReceived(byte)
193 # Verify that the server responds with a 91 error.
194 sent = self.sock.transport.value()
197 struct.pack('!BBH', 0, 91, 0) + socket.inet_aton('0.0.0.0'))
199 # A failed resolution causes the transport to drop the connection.
200 self.assertTrue(self.sock.transport.stringTCPTransport_closing)
201 self.assertIdentical(self.sock.driver_outgoing, None)
204 def test_accessDenied(self):
205 self.sock.authorize = lambda code, server, port, user: 0
206 self.sock.dataReceived(
207 struct.pack('!BBH', 4, 1, 4242)
208 + socket.inet_aton('10.2.3.4')
211 self.assertEqual(self.sock.transport.value(),
212 struct.pack('!BBH', 0, 91, 0)
213 + socket.inet_aton('0.0.0.0'))
214 self.assert_(self.sock.transport.stringTCPTransport_closing)
215 self.assertIdentical(self.sock.driver_outgoing, None)
218 def test_eofRemote(self):
219 self.sock.dataReceived(
220 struct.pack('!BBH', 4, 1, 34)
221 + socket.inet_aton('1.2.3.4')
224 sent = self.sock.transport.value()
225 self.sock.transport.clear()
227 # pass some data through
228 self.sock.dataReceived('hello, world')
229 self.assertEqual(self.sock.driver_outgoing.transport.value(),
232 # now close it from the server side
233 self.sock.driver_outgoing.transport.loseConnection()
234 self.sock.driver_outgoing.connectionLost('fake reason')
237 def test_eofLocal(self):
238 self.sock.dataReceived(
239 struct.pack('!BBH', 4, 1, 34)
240 + socket.inet_aton('1.2.3.4')
243 sent = self.sock.transport.value()
244 self.sock.transport.clear()
246 # pass some data through
247 self.sock.dataReceived('hello, world')
248 self.assertEqual(self.sock.driver_outgoing.transport.value(),
251 # now close it from the client side
252 self.sock.connectionLost('fake reason')
256 class Bind(unittest.TestCase):
258 Tests for SOCKS and SOCKSv4a bind requests using the L{SOCKSv4} protocol.
261 self.sock = SOCKSv4Driver()
262 self.sock.transport = StringTCPTransport()
263 self.sock.connectionMade()
264 self.sock.reactor = FakeResolverReactor({"localhost":"127.0.0.1"})
266 ## def tearDown(self):
267 ## # TODO ensure the listen port is closed
268 ## listen = self.sock.driver_listen
269 ## if listen is not None:
270 ## self.assert_(incoming.transport.stringTCPTransport_closing,
271 ## "Incoming SOCKS connections need to be closed.")
273 def test_simple(self):
274 self.sock.dataReceived(
275 struct.pack('!BBH', 4, 2, 34)
276 + socket.inet_aton('1.2.3.4')
279 sent = self.sock.transport.value()
280 self.sock.transport.clear()
281 self.assertEqual(sent,
282 struct.pack('!BBH', 0, 90, 1234)
283 + socket.inet_aton('6.7.8.9'))
284 self.assert_(not self.sock.transport.stringTCPTransport_closing)
285 self.assert_(self.sock.driver_listen is not None)
288 incoming = self.sock.driver_listen.buildProtocol(('1.2.3.4', 5345))
289 self.assertNotIdentical(incoming, None)
290 incoming.transport = StringTCPTransport()
291 incoming.connectionMade()
293 # now we should have the second reply packet
294 sent = self.sock.transport.value()
295 self.sock.transport.clear()
296 self.assertEqual(sent,
297 struct.pack('!BBH', 0, 90, 0)
298 + socket.inet_aton('0.0.0.0'))
299 self.assert_(not self.sock.transport.stringTCPTransport_closing)
301 # pass some data through
302 self.sock.dataReceived('hello, world')
303 self.assertEqual(incoming.transport.value(),
306 # the other way around
307 incoming.dataReceived('hi there')
308 self.assertEqual(self.sock.transport.value(), 'hi there')
310 self.sock.connectionLost('fake reason')
313 def test_socks4a(self):
315 If the destination IP address has zeros for the first three octets and
316 non-zero for the fourth octet, the client is attempting a v4a
317 connection. A hostname is specified after the user ID string and the
318 server connects to the address that hostname resolves to.
320 @see: U{http://en.wikipedia.org/wiki/SOCKS#SOCKS_4a_protocol}
322 # send the domain name "localhost" to be resolved
324 struct.pack('!BBH', 4, 2, 34)
325 + socket.inet_aton('0.0.0.1')
329 # Deliver the bytes one by one to exercise the protocol's buffering
330 # logic. FakeResolverReactor's resolve method is invoked to "resolve"
332 for byte in clientRequest:
333 self.sock.dataReceived(byte)
335 sent = self.sock.transport.value()
336 self.sock.transport.clear()
338 # Verify that the server responded with the address which will be
342 struct.pack('!BBH', 0, 90, 1234) + socket.inet_aton('6.7.8.9'))
343 self.assertFalse(self.sock.transport.stringTCPTransport_closing)
344 self.assertNotIdentical(self.sock.driver_listen, None)
347 incoming = self.sock.driver_listen.buildProtocol(('127.0.0.1', 5345))
348 self.assertNotIdentical(incoming, None)
349 incoming.transport = StringTCPTransport()
350 incoming.connectionMade()
352 # now we should have the second reply packet
353 sent = self.sock.transport.value()
354 self.sock.transport.clear()
355 self.assertEqual(sent,
356 struct.pack('!BBH', 0, 90, 0)
357 + socket.inet_aton('0.0.0.0'))
358 self.assertNotIdentical(
359 self.sock.transport.stringTCPTransport_closing, None)
361 # Deliver some data from the output connection and verify it is
362 # passed along to the incoming side.
363 self.sock.dataReceived('hi there')
364 self.assertEqual(incoming.transport.value(), 'hi there')
366 # the other way around
367 incoming.dataReceived('hi there')
368 self.assertEqual(self.sock.transport.value(), 'hi there')
370 self.sock.connectionLost('fake reason')
373 def test_socks4aFailedResolution(self):
375 Failed hostname resolution on a SOCKSv4a packet results in a 91 error
376 response and the connection getting closed.
378 # send the domain name "failinghost" to be resolved
380 struct.pack('!BBH', 4, 2, 34)
381 + socket.inet_aton('0.0.0.1')
385 # Deliver the bytes one by one to exercise the protocol's buffering
386 # logic. FakeResolverReactor's resolve method is invoked to "resolve"
388 for byte in clientRequest:
389 self.sock.dataReceived(byte)
391 # Verify that the server responds with a 91 error.
392 sent = self.sock.transport.value()
395 struct.pack('!BBH', 0, 91, 0) + socket.inet_aton('0.0.0.0'))
397 # A failed resolution causes the transport to drop the connection.
398 self.assertTrue(self.sock.transport.stringTCPTransport_closing)
399 self.assertIdentical(self.sock.driver_outgoing, None)
402 def test_accessDenied(self):
403 self.sock.authorize = lambda code, server, port, user: 0
404 self.sock.dataReceived(
405 struct.pack('!BBH', 4, 2, 4242)
406 + socket.inet_aton('10.2.3.4')
409 self.assertEqual(self.sock.transport.value(),
410 struct.pack('!BBH', 0, 91, 0)
411 + socket.inet_aton('0.0.0.0'))
412 self.assert_(self.sock.transport.stringTCPTransport_closing)
413 self.assertIdentical(self.sock.driver_listen, None)
415 def test_eofRemote(self):
416 self.sock.dataReceived(
417 struct.pack('!BBH', 4, 2, 34)
418 + socket.inet_aton('1.2.3.4')
421 sent = self.sock.transport.value()
422 self.sock.transport.clear()
425 incoming = self.sock.driver_listen.buildProtocol(('1.2.3.4', 5345))
426 self.assertNotIdentical(incoming, None)
427 incoming.transport = StringTCPTransport()
428 incoming.connectionMade()
430 # now we should have the second reply packet
431 sent = self.sock.transport.value()
432 self.sock.transport.clear()
433 self.assertEqual(sent,
434 struct.pack('!BBH', 0, 90, 0)
435 + socket.inet_aton('0.0.0.0'))
436 self.assert_(not self.sock.transport.stringTCPTransport_closing)
438 # pass some data through
439 self.sock.dataReceived('hello, world')
440 self.assertEqual(incoming.transport.value(),
443 # now close it from the server side
444 incoming.transport.loseConnection()
445 incoming.connectionLost('fake reason')
447 def test_eofLocal(self):
448 self.sock.dataReceived(
449 struct.pack('!BBH', 4, 2, 34)
450 + socket.inet_aton('1.2.3.4')
453 sent = self.sock.transport.value()
454 self.sock.transport.clear()
457 incoming = self.sock.driver_listen.buildProtocol(('1.2.3.4', 5345))
458 self.assertNotIdentical(incoming, None)
459 incoming.transport = StringTCPTransport()
460 incoming.connectionMade()
462 # now we should have the second reply packet
463 sent = self.sock.transport.value()
464 self.sock.transport.clear()
465 self.assertEqual(sent,
466 struct.pack('!BBH', 0, 90, 0)
467 + socket.inet_aton('0.0.0.0'))
468 self.assert_(not self.sock.transport.stringTCPTransport_closing)
470 # pass some data through
471 self.sock.dataReceived('hello, world')
472 self.assertEqual(incoming.transport.value(),
475 # now close it from the client side
476 self.sock.connectionLost('fake reason')
478 def test_badSource(self):
479 self.sock.dataReceived(
480 struct.pack('!BBH', 4, 2, 34)
481 + socket.inet_aton('1.2.3.4')
484 sent = self.sock.transport.value()
485 self.sock.transport.clear()
487 # connect from WRONG address
488 incoming = self.sock.driver_listen.buildProtocol(('1.6.6.6', 666))
489 self.assertIdentical(incoming, None)
491 # Now we should have the second reply packet and it should
492 # be a failure. The connection should be closing.
493 sent = self.sock.transport.value()
494 self.sock.transport.clear()
495 self.assertEqual(sent,
496 struct.pack('!BBH', 0, 91, 0)
497 + socket.inet_aton('0.0.0.0'))
498 self.assert_(self.sock.transport.stringTCPTransport_closing)