2 # -*- coding: utf-8 -*-
4 # $Id: test_ftpd.py 976 2012-01-22 22:39:10Z g.rodola $
6 # ======================================================================
7 # Copyright (C) 2007-2012 Giampaolo Rodola' <g.rodola@gmail.com>
11 # Permission is hereby granted, free of charge, to any person
12 # obtaining a copy of this software and associated documentation
13 # files (the "Software"), to deal in the Software without
14 # restriction, including without limitation the rights to use,
15 # copy, modify, merge, publish, distribute, sublicense, and/or sell
16 # copies of the Software, and to permit persons to whom the
17 # Software is furnished to do so, subject to the following
20 # The above copyright notice and this permission notice shall be
21 # included in all copies or substantial portions of the Software.
23 # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
24 # EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
25 # OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
26 # NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
27 # HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
28 # WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
29 # FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
30 # OTHER DEALINGS IN THE SOFTWARE.
32 # ======================================================================
35 # This test suite has been run successfully on the following systems:
37 # -----------------------------------------------------------
38 # System | Python version
39 # -----------------------------------------------------------
40 # Linux Ubuntu 2.6.20-15 | 2.4, 2.5, 2.6, 2.7
41 # Linux Kubuntu 8.04 32 & 64 bits | 2.5.2
42 # Windows XP prof SP3 | 2.4, 2.5, 2.6.1
43 # Windows Vista Ultimate 64 bit | 2.6, 2.7
44 # Windows Server 2008 64bit | 2.5.1
45 # Windows Mobile 6.1 | PythonCE 2.5
46 # OS X 10.4.10 | 2.4, 2.5, 2.7
47 # FreeBSD 7.0 | 2.4, 2.5, 2.7
48 # -----------------------------------------------------------
68 import cStringIO as StringIO
80 from pyftpdlib import ftpserver
83 # Attempt to use IP rather than hostname (test suite will run a lot faster)
85 HOST = socket.gethostbyname('localhost')
91 TESTFN = 'tmp-pyftpdlib'
93 def try_address(host, port=0, family=socket.AF_INET):
94 """Try to bind a socket on the given host:port and return True
95 if that has been possible."""
97 sock = socket.socket(family)
98 sock.bind((host, port))
99 except (socket.error, socket.gaierror):
105 def support_hybrid_ipv6():
106 """Return True if it is possible to use hybrid IPv6/IPv4 sockets
109 # IPPROTO_IPV6 constant is broken, see: http://bugs.python.org/issue6926
110 IPPROTO_IPV6 = getattr(socket, "IPV6_V6ONLY", 41)
111 IPV6_V6ONLY = getattr(socket, "IPV6_V6ONLY", 26)
112 sock = socket.socket(socket.AF_INET6)
115 return not sock.getsockopt(IPPROTO_IPV6, IPV6_V6ONLY)
122 SUPPORTS_IPV4 = try_address('127.0.0.1')
123 SUPPORTS_IPV6 = socket.has_ipv6 and try_address('::1', family=socket.AF_INET6)
124 SUPPORTS_HYBRID_IPV6 = SUPPORTS_IPV6 and support_hybrid_ipv6()
125 SUPPORTS_SENDFILE = sendfile is not None
127 def safe_remove(*files):
128 "Convenience function for removing temporary test files"
133 if err.errno != errno.ENOENT:
137 "Convenience function for removing temporary test directories"
141 if err.errno != errno.ENOENT:
145 """Create a file and return its name."""
146 assert not os.path.isfile(name), name
154 """Convenience function for removing temporary files and
155 directories on interpreter exit.
156 Also closes all sockets/instances left behind in asyncore
159 for name in os.listdir('.'):
160 if name.startswith(tempfile.template):
161 if os.path.isdir(name):
165 map = asyncore.socket_map
166 for x in map.values():
168 sys.stderr.write("garbage: %s\n" % repr(x))
175 # commented out as per bug http://bugs.python.org/issue10354
176 #tempfile.template = 'tmp-pyftpdlib'
177 atexit.register(onexit)
179 # lower this threshold so that the scheduler internal queue
180 # gets re-heapified more often
181 ftpserver._scheduler.cancellations_threshold = 5
184 class FTPd(threading.Thread):
185 """A threaded FTP server used for running tests.
187 This is basically a modified version of the FTPServer class which
188 wraps the polling loop into a thread.
190 The instance returned can be used to start(), stop() and
191 eventually re-start() the server.
193 handler = ftpserver.FTPHandler
195 def __init__(self, host=HOST, port=0, verbose=False):
196 threading.Thread.__init__(self)
197 self.__serving = False
198 self.__stopped = False
199 self.__lock = threading.Lock()
200 self.__flag = threading.Event()
203 ftpserver.log = ftpserver.logline = lambda x: x
205 # this makes the threaded server raise an actual exception
206 # instead of just logging its traceback
209 ftpserver.logerror = logerror
210 authorizer = ftpserver.DummyAuthorizer()
211 authorizer.add_user(USER, PASSWD, HOME, perm='elradfmwM') # full perms
212 authorizer.add_anonymous(HOME)
213 self.handler.authorizer = authorizer
214 self.server = ftpserver.FTPServer((host, port), self.handler)
215 self.host, self.port = self.server.socket.getsockname()[:2]
218 status = [self.__class__.__module__ + "." + self.__class__.__name__]
220 status.append('active')
222 status.append('inactive')
223 status.append('%s:%s' % self.server.socket.getsockname()[:2])
224 return '<%s at %#x>' % (' '.join(status), id(self))
228 return self.__serving
230 def start(self, timeout=0.001, use_poll=False):
231 """Start serving until an explicit stop() request.
232 Polls for shutdown every 'timeout' seconds.
235 raise RuntimeError("Server already started")
237 # ensure the server can be started again
238 FTPd.__init__(self, self.server.socket.getsockname(), self.handler)
239 self.__timeout = timeout
240 self.__use_poll = use_poll
241 threading.Thread.start(self)
245 self.__serving = True
247 while self.__serving and asyncore.socket_map:
248 self.__lock.acquire()
249 self.server.serve_forever(timeout=self.__timeout, count=1,
250 use_poll=self.__use_poll)
251 self.__lock.release()
252 self.server.close_all()
255 """Stop serving (also disconnecting all currently connected
256 clients) by telling the serve_forever() loop to stop and
259 if not self.__serving:
260 raise RuntimeError("Server not started yet")
261 self.__serving = False
262 self.__stopped = True
266 class TestAbstractedFS(unittest.TestCase):
267 """Test for conversion utility methods of AbstractedFS class."""
274 def test_ftpnorm(self):
275 # Tests for ftpnorm method.
276 ae = self.assertEquals
277 fs = ftpserver.AbstractedFS('/', None)
280 ae(fs.ftpnorm(''), '/')
281 ae(fs.ftpnorm('/'), '/')
282 ae(fs.ftpnorm('.'), '/')
283 ae(fs.ftpnorm('..'), '/')
284 ae(fs.ftpnorm('a'), '/a')
285 ae(fs.ftpnorm('/a'), '/a')
286 ae(fs.ftpnorm('/a/'), '/a')
287 ae(fs.ftpnorm('a/..'), '/')
288 ae(fs.ftpnorm('a/b'), '/a/b')
289 ae(fs.ftpnorm('a/b/..'), '/a')
290 ae(fs.ftpnorm('a/b/../..'), '/')
292 ae(fs.ftpnorm(''), '/sub')
293 ae(fs.ftpnorm('/'), '/')
294 ae(fs.ftpnorm('.'), '/sub')
295 ae(fs.ftpnorm('..'), '/')
296 ae(fs.ftpnorm('a'), '/sub/a')
297 ae(fs.ftpnorm('a/'), '/sub/a')
298 ae(fs.ftpnorm('a/..'), '/sub')
299 ae(fs.ftpnorm('a/b'), '/sub/a/b')
300 ae(fs.ftpnorm('a/b/'), '/sub/a/b')
301 ae(fs.ftpnorm('a/b/..'), '/sub/a')
302 ae(fs.ftpnorm('a/b/../..'), '/sub')
303 ae(fs.ftpnorm('a/b/../../..'), '/')
304 ae(fs.ftpnorm('//'), '/') # UNC paths must be collapsed
306 def test_ftp2fs(self):
307 # Tests for ftp2fs method.
308 ae = self.assertEquals
309 fs = ftpserver.AbstractedFS('/', None)
310 join = lambda x, y: os.path.join(x, y.replace('/', os.sep))
315 ae(fs.ftp2fs(''), root)
316 ae(fs.ftp2fs('/'), root)
317 ae(fs.ftp2fs('.'), root)
318 ae(fs.ftp2fs('..'), root)
319 ae(fs.ftp2fs('a'), join(root, 'a'))
320 ae(fs.ftp2fs('/a'), join(root, 'a'))
321 ae(fs.ftp2fs('/a/'), join(root, 'a'))
322 ae(fs.ftp2fs('a/..'), root)
323 ae(fs.ftp2fs('a/b'), join(root, r'a/b'))
324 ae(fs.ftp2fs('/a/b'), join(root, r'a/b'))
325 ae(fs.ftp2fs('/a/b/..'), join(root, 'a'))
326 ae(fs.ftp2fs('/a/b/../..'), root)
328 ae(fs.ftp2fs(''), join(root, 'sub'))
329 ae(fs.ftp2fs('/'), root)
330 ae(fs.ftp2fs('.'), join(root, 'sub'))
331 ae(fs.ftp2fs('..'), root)
332 ae(fs.ftp2fs('a'), join(root, 'sub/a'))
333 ae(fs.ftp2fs('a/'), join(root, 'sub/a'))
334 ae(fs.ftp2fs('a/..'), join(root, 'sub'))
335 ae(fs.ftp2fs('a/b'), join(root, 'sub/a/b'))
336 ae(fs.ftp2fs('a/b/..'), join(root, 'sub/a'))
337 ae(fs.ftp2fs('a/b/../..'), join(root, 'sub'))
338 ae(fs.ftp2fs('a/b/../../..'), root)
339 ae(fs.ftp2fs('//a'), join(root, 'a')) # UNC paths must be collapsed
344 # on DOS-derived filesystems (e.g. Windows) this is the same
345 # as specifying the current drive directory (e.g. 'C:\\')
348 goforit('/home/user')
351 # os.sep == ':'? Don't know... let's try it anyway
354 def test_fs2ftp(self):
355 # Tests for fs2ftp method.
356 ae = self.assertEquals
357 fs = ftpserver.AbstractedFS('/', None)
358 join = lambda x, y: os.path.join(x, y.replace('/', os.sep))
362 ae(fs.fs2ftp(root), '/')
363 ae(fs.fs2ftp(join(root, '/')), '/')
364 ae(fs.fs2ftp(join(root, '.')), '/')
365 ae(fs.fs2ftp(join(root, '..')), '/') # can't escape from root
366 ae(fs.fs2ftp(join(root, 'a')), '/a')
367 ae(fs.fs2ftp(join(root, 'a/')), '/a')
368 ae(fs.fs2ftp(join(root, 'a/..')), '/')
369 ae(fs.fs2ftp(join(root, 'a/b')), '/a/b')
370 ae(fs.fs2ftp(join(root, 'a/b')), '/a/b')
371 ae(fs.fs2ftp(join(root, 'a/b/..')), '/a')
372 ae(fs.fs2ftp(join(root, '/a/b/../..')), '/')
374 ae(fs.fs2ftp(join(root, 'a/')), '/a')
379 # on DOS-derived filesystems (e.g. Windows) this is the same
380 # as specifying the current drive directory (e.g. 'C:\\')
383 ae(fs.fs2ftp('C:\\'), '/')
384 ae(fs.fs2ftp('D:\\'), '/')
385 ae(fs.fs2ftp('D:\\dir'), '/')
388 if os.path.realpath('/__home/user') != '/__home/user':
389 self.fail('Test skipped (symlinks not allowed).')
390 goforit('/__home/user')
391 fs._root = '/__home/user'
392 ae(fs.fs2ftp('/__home'), '/')
393 ae(fs.fs2ftp('/'), '/')
394 ae(fs.fs2ftp('/__home/userx'), '/')
396 # os.sep == ':'? Don't know... let's try it anyway
399 def test_validpath(self):
400 # Tests for validpath method.
401 fs = ftpserver.AbstractedFS('/', None)
403 self.assertTrue(fs.validpath(HOME))
404 self.assertTrue(fs.validpath(HOME + '/'))
405 self.assertFalse(fs.validpath(HOME + 'bar'))
407 if hasattr(os, 'symlink'):
409 def test_validpath_validlink(self):
410 # Test validpath by issuing a symlink pointing to a path
411 # inside the root directory.
412 fs = ftpserver.AbstractedFS('/', None)
414 TESTFN2 = TESTFN + '1'
417 os.symlink(TESTFN, TESTFN2)
418 self.assertTrue(fs.validpath(TESTFN))
420 safe_remove(TESTFN, TESTFN2)
422 def test_validpath_external_symlink(self):
423 # Test validpath by issuing a symlink pointing to a path
424 # outside the root directory.
425 fs = ftpserver.AbstractedFS('/', None)
427 # tempfile should create our file in /tmp directory
428 # which should be outside the user root. If it is
429 # not we just skip the test.
430 file = tempfile.NamedTemporaryFile()
432 if HOME == os.path.dirname(file.name):
434 os.symlink(file.name, TESTFN)
435 self.assertFalse(fs.validpath(TESTFN))
441 class TestDummyAuthorizer(unittest.TestCase):
442 """Tests for DummyAuthorizer class."""
444 # temporarily change warnings to exceptions for the purposes of testing
446 self.tempdir = tempfile.mkdtemp(dir=HOME)
447 self.subtempdir = tempfile.mkdtemp(dir=os.path.join(HOME, self.tempdir))
448 self.tempfile = touch(os.path.join(self.tempdir, TESTFN))
449 self.subtempfile = touch(os.path.join(self.subtempdir, TESTFN))
450 warnings.filterwarnings("error")
453 os.remove(self.tempfile)
454 os.remove(self.subtempfile)
455 os.rmdir(self.subtempdir)
456 os.rmdir(self.tempdir)
457 warnings.resetwarnings()
459 def assertRaisesWithMsg(self, excClass, msg, callableObj, *args, **kwargs):
461 callableObj(*args, **kwargs)
462 except excClass, why:
465 raise self.failureException("%s != %s" % (str(why), msg))
467 if hasattr(excClass,'__name__'): excName = excClass.__name__
468 else: excName = str(excClass)
469 raise self.failureException, "%s not raised" % excName
471 def test_common_methods(self):
472 auth = ftpserver.DummyAuthorizer()
474 auth.add_user(USER, PASSWD, HOME)
475 auth.add_anonymous(HOME)
477 self.assertTrue(auth.validate_authentication(USER, PASSWD))
478 self.assertFalse(auth.validate_authentication(USER, 'wrongpwd'))
480 auth.remove_user(USER)
481 auth.remove_user('anonymous')
482 # raise exc if user does not exists
483 self.assertRaises(KeyError, auth.remove_user, USER)
484 # raise exc if path does not exist
485 self.assertRaisesWithMsg(ValueError,
486 'no such directory: "%s"' % '?:\\',
487 auth.add_user, USER, PASSWD, '?:\\')
488 self.assertRaisesWithMsg(ValueError,
489 'no such directory: "%s"' % '?:\\',
490 auth.add_anonymous, '?:\\')
491 # raise exc if user already exists
492 auth.add_user(USER, PASSWD, HOME)
493 auth.add_anonymous(HOME)
494 self.assertRaisesWithMsg(ValueError,
495 'user "%s" already exists' % USER,
496 auth.add_user, USER, PASSWD, HOME)
497 self.assertRaisesWithMsg(ValueError,
498 'user "anonymous" already exists',
499 auth.add_anonymous, HOME)
500 auth.remove_user(USER)
501 auth.remove_user('anonymous')
502 # raise on wrong permission
503 self.assertRaisesWithMsg(ValueError,
504 'no such permission "?"',
505 auth.add_user, USER, PASSWD, HOME, perm='?')
506 self.assertRaisesWithMsg(ValueError,
507 'no such permission "?"',
508 auth.add_anonymous, HOME, perm='?')
509 # expect warning on write permissions assigned to anonymous user
511 self.assertRaisesWithMsg(RuntimeWarning,
512 "write permissions assigned to anonymous user.",
513 auth.add_anonymous, HOME, perm=x)
515 def test_override_perm_interface(self):
516 auth = ftpserver.DummyAuthorizer()
517 auth.add_user(USER, PASSWD, HOME, perm='elr')
518 # raise exc if user does not exists
519 self.assertRaises(KeyError, auth.override_perm, USER+'w', HOME, 'elr')
520 # raise exc if path does not exist or it's not a directory
521 self.assertRaisesWithMsg(ValueError,
522 'no such directory: "%s"' % '?:\\',
523 auth.override_perm, USER, '?:\\', 'elr')
524 self.assertRaisesWithMsg(ValueError,
525 'no such directory: "%s"' % self.tempfile,
526 auth.override_perm, USER, self.tempfile, 'elr')
527 # raise on wrong permission
528 self.assertRaisesWithMsg(ValueError,
529 'no such permission "?"', auth.override_perm,
530 USER, HOME, perm='?')
531 # expect warning on write permissions assigned to anonymous user
532 auth.add_anonymous(HOME)
534 self.assertRaisesWithMsg(RuntimeWarning,
535 "write permissions assigned to anonymous user.",
536 auth.override_perm, 'anonymous', HOME, p)
537 # raise on attempt to override home directory permissions
538 self.assertRaisesWithMsg(ValueError,
539 "can't override home directory permissions",
540 auth.override_perm, USER, HOME, perm='w')
541 # raise on attempt to override a path escaping home directory
542 if os.path.dirname(HOME) != HOME:
543 self.assertRaisesWithMsg(ValueError,
544 "path escapes user home directory",
545 auth.override_perm, USER,
546 os.path.dirname(HOME), perm='w')
547 # try to re-set an overridden permission
548 auth.override_perm(USER, self.tempdir, perm='w')
549 auth.override_perm(USER, self.tempdir, perm='wr')
551 def test_override_perm_recursive_paths(self):
552 auth = ftpserver.DummyAuthorizer()
553 auth.add_user(USER, PASSWD, HOME, perm='elr')
554 self.assertEqual(auth.has_perm(USER, 'w', self.tempdir), False)
555 auth.override_perm(USER, self.tempdir, perm='w', recursive=True)
556 self.assertEqual(auth.has_perm(USER, 'w', HOME), False)
557 self.assertEqual(auth.has_perm(USER, 'w', self.tempdir), True)
558 self.assertEqual(auth.has_perm(USER, 'w', self.tempfile), True)
559 self.assertEqual(auth.has_perm(USER, 'w', self.subtempdir), True)
560 self.assertEqual(auth.has_perm(USER, 'w', self.subtempfile), True)
562 self.assertEqual(auth.has_perm(USER, 'w', HOME + '@'), False)
563 self.assertEqual(auth.has_perm(USER, 'w', self.tempdir + '@'), False)
564 path = os.path.join(self.tempdir + '@', os.path.basename(self.tempfile))
565 self.assertEqual(auth.has_perm(USER, 'w', path), False)
566 # test case-sensitiveness
567 if (os.name in ('nt', 'ce')) or (sys.platform == 'cygwin'):
568 self.assertEqual(auth.has_perm(USER, 'w', self.tempdir.upper()), True)
570 def test_override_perm_not_recursive_paths(self):
571 auth = ftpserver.DummyAuthorizer()
572 auth.add_user(USER, PASSWD, HOME, perm='elr')
573 self.assertEqual(auth.has_perm(USER, 'w', self.tempdir), False)
574 auth.override_perm(USER, self.tempdir, perm='w')
575 self.assertEqual(auth.has_perm(USER, 'w', HOME), False)
576 self.assertEqual(auth.has_perm(USER, 'w', self.tempdir), True)
577 self.assertEqual(auth.has_perm(USER, 'w', self.tempfile), True)
578 self.assertEqual(auth.has_perm(USER, 'w', self.subtempdir), False)
579 self.assertEqual(auth.has_perm(USER, 'w', self.subtempfile), False)
581 self.assertEqual(auth.has_perm(USER, 'w', HOME + '@'), False)
582 self.assertEqual(auth.has_perm(USER, 'w', self.tempdir + '@'), False)
583 path = os.path.join(self.tempdir + '@', os.path.basename(self.tempfile))
584 self.assertEqual(auth.has_perm(USER, 'w', path), False)
585 # test case-sensitiveness
586 if (os.name in ('nt', 'ce')) or (sys.platform == 'cygwin'):
587 self.assertEqual(auth.has_perm(USER, 'w', self.tempdir.upper()), True)
590 class TestCallLater(unittest.TestCase):
591 """Tests for CallLater class."""
594 for task in ftpserver._scheduler._tasks:
595 if not task.cancelled:
597 del ftpserver._scheduler._tasks[:]
599 def scheduler(self, timeout=0.01, count=100):
600 while ftpserver._scheduler._tasks and count > 0:
601 ftpserver._scheduler()
605 def test_interface(self):
607 self.assertRaises(AssertionError, ftpserver.CallLater, -1, fun)
608 x = ftpserver.CallLater(3, fun)
609 self.assertRaises(AssertionError, x.delay, -1)
610 self.assertEqual(x.cancelled, False)
612 self.assertEqual(x.cancelled, True)
613 self.assertRaises(AssertionError, x.call)
614 self.assertRaises(AssertionError, x.reset)
615 self.assertRaises(AssertionError, x.delay, 2)
616 self.assertRaises(AssertionError, x.cancel)
618 def test_order(self):
620 fun = lambda x: l.append(x)
621 for x in [0.05, 0.04, 0.03, 0.02, 0.01]:
622 ftpserver.CallLater(x, fun, x)
624 self.assertEqual(l, [0.01, 0.02, 0.03, 0.04, 0.05])
626 def test_delay(self):
628 fun = lambda x: l.append(x)
629 ftpserver.CallLater(0.01, fun, 0.01).delay(0.07)
630 ftpserver.CallLater(0.02, fun, 0.02).delay(0.08)
631 ftpserver.CallLater(0.03, fun, 0.03)
632 ftpserver.CallLater(0.04, fun, 0.04)
633 ftpserver.CallLater(0.05, fun, 0.05)
634 ftpserver.CallLater(0.06, fun, 0.06).delay(0.001)
636 self.assertEqual(l, [0.06, 0.03, 0.04, 0.05, 0.01, 0.02])
638 # The test is reliable only on those systems where time.time()
639 # provides time with a better precision than 1 second.
640 if not str(time.time()).endswith('.0'):
641 def test_reset(self):
643 fun = lambda x: l.append(x)
644 ftpserver.CallLater(0.01, fun, 0.01)
645 ftpserver.CallLater(0.02, fun, 0.02)
646 ftpserver.CallLater(0.03, fun, 0.03)
647 x = ftpserver.CallLater(0.04, fun, 0.04)
648 ftpserver.CallLater(0.05, fun, 0.05)
652 self.assertEqual(l, [0.01, 0.02, 0.03, 0.05, 0.04])
654 def test_cancel(self):
656 fun = lambda x: l.append(x)
657 ftpserver.CallLater(0.01, fun, 0.01).cancel()
658 ftpserver.CallLater(0.02, fun, 0.02)
659 ftpserver.CallLater(0.03, fun, 0.03)
660 ftpserver.CallLater(0.04, fun, 0.04)
661 ftpserver.CallLater(0.05, fun, 0.05).cancel()
663 self.assertEqual(l, [0.02, 0.03, 0.04])
665 def test_errback(self):
667 ftpserver.CallLater(0.0, lambda: 1//0, _errback=lambda: l.append(True))
669 self.assertEqual(l, [True])
672 class TestCallEvery(unittest.TestCase):
673 """Tests for CallEvery class."""
676 for task in ftpserver._scheduler._tasks:
677 if not task.cancelled:
679 del ftpserver._scheduler._tasks[:]
681 def scheduler(self, timeout=0.0001):
683 ftpserver._scheduler()
686 def test_interface(self):
688 self.assertRaises(AssertionError, ftpserver.CallEvery, -1, fun)
689 x = ftpserver.CallEvery(3, fun)
690 self.assertRaises(AssertionError, x.delay, -1)
691 self.assertEqual(x.cancelled, False)
693 self.assertEqual(x.cancelled, True)
694 self.assertRaises(AssertionError, x.call)
695 self.assertRaises(AssertionError, x.reset)
696 self.assertRaises(AssertionError, x.delay, 2)
697 self.assertRaises(AssertionError, x.cancel)
699 def test_only_once(self):
700 # make sure that callback is called only once per-loop
702 fun = lambda: l1.append(None)
703 ftpserver.CallEvery(0, fun)
704 ftpserver._scheduler()
705 self.assertEqual(l1, [None])
707 def test_multi_0_timeout(self):
708 # make sure a 0 timeout callback is called as many times
709 # as the number of loops
711 fun = lambda: l.append(None)
712 ftpserver.CallEvery(0, fun)
714 self.assertEqual(len(l), 100)
716 # run it on systems where time.time() has a higher precision
717 if os.name == 'posix':
718 def test_low_and_high_timeouts(self):
719 # make sure a callback with a lower timeout is called more
720 # frequently than another with a greater timeout
722 fun = lambda: l1.append(None)
723 ftpserver.CallEvery(0.001, fun)
727 fun = lambda: l2.append(None)
728 ftpserver.CallEvery(0.01, fun)
731 self.assertTrue(len(l1) > len(l2))
733 def test_cancel(self):
734 # make sure a cancelled callback doesn't get called anymore
736 fun = lambda: l.append(None)
737 call = ftpserver.CallEvery(0.001, fun)
742 self.assertEqual(len_l, len(l))
744 def test_errback(self):
746 ftpserver.CallEvery(0.0, lambda: 1//0, _errback=lambda: l.append(True))
751 class TestFtpAuthentication(unittest.TestCase):
752 "test: USER, PASS, REIN."
754 client_class = ftplib.FTP
757 self.server = self.server_class()
758 self.server.handler._auth_failed_timeout = 0
760 self.client = self.client_class()
761 self.client.connect(self.server.host, self.server.port)
762 self.client.sock.settimeout(2)
763 self.file = open(TESTFN, 'w+b')
764 self.dummyfile = StringIO.StringIO()
767 self.server.handler._auth_failed_timeout = 5
770 if not self.file.closed:
772 if not self.dummyfile.closed:
773 self.dummyfile.close()
776 def test_auth_ok(self):
777 self.client.login(user=USER, passwd=PASSWD)
779 def test_anon_auth(self):
780 self.client.login(user='anonymous', passwd='anon@')
781 self.client.login(user='anonymous', passwd='')
782 self.assertRaises(ftplib.error_perm, self.client.login, 'AnoNymouS')
784 def test_auth_failed(self):
785 self.assertRaises(ftplib.error_perm, self.client.login, USER, 'wrong')
786 self.assertRaises(ftplib.error_perm, self.client.login, 'wrong', PASSWD)
787 self.assertRaises(ftplib.error_perm, self.client.login, 'wrong', 'wrong')
789 def test_wrong_cmds_order(self):
790 self.assertRaises(ftplib.error_perm, self.client.sendcmd, 'pass ' + PASSWD)
791 self.client.login(user=USER, passwd=PASSWD)
792 self.assertRaises(ftplib.error_perm, self.client.sendcmd, 'pass ' + PASSWD)
794 def test_max_auth(self):
795 self.assertRaises(ftplib.error_perm, self.client.login, USER, 'wrong')
796 self.assertRaises(ftplib.error_perm, self.client.login, USER, 'wrong')
797 self.assertRaises(ftplib.error_perm, self.client.login, USER, 'wrong')
798 # If authentication fails for 3 times ftpd disconnects the
799 # client. We can check if that happens by using self.client.sendcmd()
800 # on the 'dead' socket object. If socket object is really
801 # closed it should be raised a socket.error exception (Windows)
802 # or a EOFError exception (Linux).
803 self.assertRaises((socket.error, EOFError), self.client.sendcmd, '')
806 self.client.login(user=USER, passwd=PASSWD)
807 self.client.sendcmd('rein')
808 # user not authenticated, error response expected
809 self.assertRaises(ftplib.error_perm, self.client.sendcmd, 'pwd')
810 # by logging-in again we should be able to execute a
811 # file-system command
812 self.client.login(user=USER, passwd=PASSWD)
813 self.client.sendcmd('pwd')
815 def test_rein_during_transfer(self):
816 # Test REIN while already authenticated and a transfer is
818 self.client.login(user=USER, passwd=PASSWD)
819 data = 'abcde12345' * 1000000
820 self.file.write(data)
823 conn = self.client.transfercmd('retr ' + TESTFN)
827 chunk = conn.recv(8192)
830 bytes_recv += len(chunk)
831 self.dummyfile.write(chunk)
832 if bytes_recv > 65536 and not rein_sent:
834 # flush account, error response expected
835 self.client.sendcmd('rein')
836 self.assertRaises(ftplib.error_perm, self.client.dir)
838 # a 226 response is expected once tranfer finishes
839 self.assertEqual(self.client.voidresp()[:3], '226')
840 # account is still flushed, error response is still expected
841 self.assertRaises(ftplib.error_perm, self.client.sendcmd,
843 # by logging-in again we should be able to execute a
845 self.client.login(user=USER, passwd=PASSWD)
846 self.client.sendcmd('pwd')
847 self.dummyfile.seek(0)
848 self.assertEqual(hash(data), hash (self.dummyfile.read()))
851 # Test USER while already authenticated and no transfer
853 self.client.login(user=USER, passwd=PASSWD)
854 self.client.sendcmd('user ' + USER) # authentication flushed
855 self.assertRaises(ftplib.error_perm, self.client.sendcmd, 'pwd')
856 self.client.sendcmd('pass ' + PASSWD)
857 self.client.sendcmd('pwd')
859 def test_user_during_transfer(self):
860 # Test USER while already authenticated and a transfer is
862 self.client.login(user=USER, passwd=PASSWD)
863 data = 'abcde12345' * 1000000
864 self.file.write(data)
867 conn = self.client.transfercmd('retr ' + TESTFN)
871 chunk = conn.recv(8192)
874 bytes_recv += len(chunk)
875 self.dummyfile.write(chunk)
876 # stop transfer while it isn't finished yet
877 if bytes_recv > 65536 and not rein_sent:
879 # flush account, expect an error response
880 self.client.sendcmd('user ' + USER)
881 self.assertRaises(ftplib.error_perm, self.client.dir)
883 # a 226 response is expected once transfer finishes
884 self.assertEqual(self.client.voidresp()[:3], '226')
885 # account is still flushed, error response is still expected
886 self.assertRaises(ftplib.error_perm, self.client.sendcmd, 'pwd')
887 # by logging-in again we should be able to execute a
889 self.client.sendcmd('pass ' + PASSWD)
890 self.client.sendcmd('pwd')
891 self.dummyfile.seek(0)
892 self.assertEqual(hash(data), hash (self.dummyfile.read()))
895 class TestFtpDummyCmds(unittest.TestCase):
896 "test: TYPE, STRU, MODE, NOOP, SYST, ALLO, HELP, SITE HELP"
898 client_class = ftplib.FTP
901 self.server = self.server_class()
903 self.client = self.client_class()
904 self.client.connect(self.server.host, self.server.port)
905 self.client.sock.settimeout(2)
906 self.client.login(USER, PASSWD)
913 self.client.sendcmd('type a')
914 self.client.sendcmd('type i')
915 self.client.sendcmd('type l7')
916 self.client.sendcmd('type l8')
917 self.assertRaises(ftplib.error_perm, self.client.sendcmd, 'type ?!?')
920 self.client.sendcmd('stru f')
921 self.client.sendcmd('stru F')
922 self.assertRaises(ftplib.error_perm, self.client.sendcmd, 'stru p')
923 self.assertRaises(ftplib.error_perm, self.client.sendcmd, 'stru r')
924 self.assertRaises(ftplib.error_perm, self.client.sendcmd, 'stru ?!?')
927 self.client.sendcmd('mode s')
928 self.client.sendcmd('mode S')
929 self.assertRaises(ftplib.error_perm, self.client.sendcmd, 'mode b')
930 self.assertRaises(ftplib.error_perm, self.client.sendcmd, 'mode c')
931 self.assertRaises(ftplib.error_perm, self.client.sendcmd, 'mode ?!?')
934 self.client.sendcmd('noop')
937 self.client.sendcmd('syst')
940 self.client.sendcmd('allo x')
943 self.client.sendcmd('quit')
946 self.client.sendcmd('help')
947 cmd = random.choice(ftpserver.proto_cmds.keys())
948 self.client.sendcmd('help %s' % cmd)
949 self.assertRaises(ftplib.error_perm, self.client.sendcmd, 'help ?!?')
952 self.assertRaises(ftplib.error_perm, self.client.sendcmd, 'site')
953 self.assertRaises(ftplib.error_perm, self.client.sendcmd, 'site ?!?')
954 self.assertRaises(ftplib.error_perm, self.client.sendcmd, 'site foo bar')
955 self.assertRaises(ftplib.error_perm, self.client.sendcmd, 'sitefoo bar')
957 def test_site_help(self):
958 self.client.sendcmd('site help')
959 self.client.sendcmd('site help help')
960 self.assertRaises(ftplib.error_perm, self.client.sendcmd, 'site help ?!?')
963 # Test error conditions only; resumed data transfers are
965 self.client.sendcmd('type i')
966 self.assertRaises(ftplib.error_perm, self.client.sendcmd, 'rest')
967 self.assertRaises(ftplib.error_perm, self.client.sendcmd, 'rest str')
968 self.assertRaises(ftplib.error_perm, self.client.sendcmd, 'rest -1')
969 self.assertRaises(ftplib.error_perm, self.client.sendcmd, 'rest 10.1')
970 # REST is not supposed to be allowed in ASCII mode
971 self.client.sendcmd('type a')
972 self.assertRaises(ftplib.error_perm, self.client.sendcmd, 'rest 10')
974 def test_opts_feat(self):
975 self.assertRaises(ftplib.error_perm, self.client.sendcmd, 'opts mlst bad_fact')
976 self.assertRaises(ftplib.error_perm, self.client.sendcmd, 'opts mlst type ;')
977 self.assertRaises(ftplib.error_perm, self.client.sendcmd, 'opts not_mlst')
978 # utility function which used for extracting the MLST "facts"
979 # string from the FEAT response
981 resp = self.client.sendcmd('feat')
982 return re.search(r'^\s*MLST\s+(\S+)$', resp, re.MULTILINE).group(1)
983 # we rely on "type", "perm", "size", and "modify" facts which
984 # are those available on all platforms
985 self.assertTrue('type*;perm*;size*;modify*;' in mlst())
986 self.assertEqual(self.client.sendcmd('opts mlst type;'), '200 MLST OPTS type;')
987 self.assertEqual(self.client.sendcmd('opts mLSt TypE;'), '200 MLST OPTS type;')
988 self.assertTrue('type*;perm;size;modify;' in mlst())
990 self.assertEqual(self.client.sendcmd('opts mlst'), '200 MLST OPTS ')
991 self.assertTrue(not '*' in mlst())
993 self.assertEqual(self.client.sendcmd('opts mlst fish;cakes;'), '200 MLST OPTS ')
994 self.assertTrue(not '*' in mlst())
995 self.assertEqual(self.client.sendcmd('opts mlst fish;cakes;type;'),
996 '200 MLST OPTS type;')
997 self.assertTrue('type*;perm;size;modify;' in mlst())
1000 class TestFtpCmdsSemantic(unittest.TestCase):
1002 client_class = ftplib.FTP
1003 arg_cmds = ['allo','appe','dele','eprt','mdtm','mode','mkd','opts','port',
1004 'rest','retr','rmd','rnfr','rnto','site','size','stor','stru',
1005 'type','user','xmkd','xrmd','site chmod']
1008 self.server = self.server_class()
1010 self.client = self.client_class()
1011 self.client.connect(self.server.host, self.server.port)
1012 self.client.sock.settimeout(2)
1013 self.client.login(USER, PASSWD)
1019 def test_arg_cmds(self):
1020 # Test commands requiring an argument.
1021 expected = "501 Syntax error: command needs an argument."
1022 for cmd in self.arg_cmds:
1023 self.client.putcmd(cmd)
1024 resp = self.client.getmultiline()
1025 self.assertEqual(resp, expected)
1027 def test_no_arg_cmds(self):
1028 # Test commands accepting no arguments.
1029 expected = "501 Syntax error: command does not accept arguments."
1030 for cmd in ('abor','cdup','feat','noop','pasv','pwd','quit','rein',
1031 'syst','xcup','xpwd'):
1032 self.client.putcmd(cmd + ' arg')
1033 resp = self.client.getmultiline()
1034 self.assertEqual(resp, expected)
1036 def test_auth_cmds(self):
1037 # Test those commands requiring client to be authenticated.
1038 expected = "530 Log in with USER and PASS first."
1039 self.client.sendcmd('rein')
1040 for cmd in self.server.handler.proto_cmds:
1042 if cmd in ('feat','help','noop','user','pass','stat','syst','quit',
1043 'site', 'site help', 'pbsz', 'auth', 'prot', 'ccc'):
1045 if cmd in self.arg_cmds:
1047 self.client.putcmd(cmd)
1048 resp = self.client.getmultiline()
1049 self.assertEqual(resp, expected)
1051 def test_no_auth_cmds(self):
1052 # Test those commands that do not require client to be authenticated.
1053 self.client.sendcmd('rein')
1054 for cmd in ('feat','help','noop','stat','syst','site help'):
1055 self.client.sendcmd(cmd)
1056 # STAT provided with an argument is equal to LIST hence not allowed
1057 # if not authenticated
1058 self.assertRaises(ftplib.error_perm, self.client.sendcmd, 'stat /')
1059 self.client.sendcmd('quit')
1062 class TestFtpFsOperations(unittest.TestCase):
1063 "test: PWD, CWD, CDUP, SIZE, RNFR, RNTO, DELE, MKD, RMD, MDTM, STAT"
1065 client_class = ftplib.FTP
1068 self.server = self.server_class()
1070 self.client = self.client_class()
1071 self.client.connect(self.server.host, self.server.port)
1072 self.client.sock.settimeout(2)
1073 self.client.login(USER, PASSWD)
1074 self.tempfile = os.path.basename(touch(TESTFN))
1075 self.tempdir = os.path.basename(tempfile.mkdtemp(dir=HOME))
1080 safe_remove(self.tempfile)
1081 if os.path.exists(self.tempdir):
1082 shutil.rmtree(self.tempdir)
1085 self.client.cwd(self.tempdir)
1086 self.assertEqual(self.client.pwd(), '/' + self.tempdir)
1087 self.assertRaises(ftplib.error_perm, self.client.cwd, 'subtempdir')
1088 # cwd provided with no arguments is supposed to move us to the
1090 self.client.sendcmd('cwd')
1091 self.assertEqual(self.client.pwd(), '/')
1094 self.assertEqual(self.client.pwd(), '/')
1095 self.client.cwd(self.tempdir)
1096 self.assertEqual(self.client.pwd(), '/' + self.tempdir)
1098 def test_cdup(self):
1099 subfolder = os.path.basename(tempfile.mkdtemp(dir=self.tempdir))
1100 self.assertEqual(self.client.pwd(), '/')
1101 self.client.cwd(self.tempdir)
1102 self.assertEqual(self.client.pwd(), '/%s' % self.tempdir)
1103 self.client.cwd(subfolder)
1104 self.assertEqual(self.client.pwd(), '/%s/%s' % (self.tempdir, subfolder))
1105 self.client.sendcmd('cdup')
1106 self.assertEqual(self.client.pwd(), '/%s' % self.tempdir)
1107 self.client.sendcmd('cdup')
1108 self.assertEqual(self.client.pwd(), '/')
1110 # make sure we can't escape from root directory
1111 self.client.sendcmd('cdup')
1112 self.assertEqual(self.client.pwd(), '/')
1115 tempdir = os.path.basename(tempfile.mktemp(dir=HOME))
1116 dirname = self.client.mkd(tempdir)
1117 # the 257 response is supposed to include the absolute dirname
1118 self.assertEqual(dirname, '/' + tempdir)
1119 # make sure we can't create directories which already exist
1120 # (probably not really necessary);
1121 # let's use a try/except statement to avoid leaving behind
1122 # orphaned temporary directory in the event of a test failure.
1124 self.client.mkd(tempdir)
1125 except ftplib.error_perm:
1126 os.rmdir(tempdir) # ok
1128 self.fail('ftplib.error_perm not raised.')
1131 self.client.rmd(self.tempdir)
1132 self.assertRaises(ftplib.error_perm, self.client.rmd, self.tempfile)
1133 # make sure we can't remove the root directory
1134 self.assertRaises(ftplib.error_perm, self.client.rmd, '/')
1136 def test_dele(self):
1137 self.client.delete(self.tempfile)
1138 self.assertRaises(ftplib.error_perm, self.client.delete, self.tempdir)
1140 def test_rnfr_rnto(self):
1142 tempname = os.path.basename(tempfile.mktemp(dir=HOME))
1143 self.client.rename(self.tempfile, tempname)
1144 self.client.rename(tempname, self.tempfile)
1146 tempname = os.path.basename(tempfile.mktemp(dir=HOME))
1147 self.client.rename(self.tempdir, tempname)
1148 self.client.rename(tempname, self.tempdir)
1149 # rnfr/rnto over non-existing paths
1150 bogus = os.path.basename(tempfile.mktemp(dir=HOME))
1151 self.assertRaises(ftplib.error_perm, self.client.rename, bogus, '/x')
1152 self.assertRaises(ftplib.error_perm, self.client.rename, self.tempfile, '/')
1153 # rnto sent without first specifying the source
1154 self.assertRaises(ftplib.error_perm, self.client.sendcmd, 'rnto ' + self.tempfile)
1156 # make sure we can't rename root directory
1157 self.assertRaises(ftplib.error_perm, self.client.rename, '/', '/x')
1159 def test_mdtm(self):
1160 self.client.sendcmd('mdtm ' + self.tempfile)
1161 bogus = os.path.basename(tempfile.mktemp(dir=HOME))
1162 self.assertRaises(ftplib.error_perm, self.client.sendcmd, 'mdtm ' + bogus)
1163 # make sure we can't use mdtm against directories
1165 self.client.sendcmd('mdtm ' + self.tempdir)
1166 except ftplib.error_perm, err:
1167 self.assertTrue("not retrievable" in str(err))
1169 self.fail('Exception not raised')
1171 def test_unforeseen_mdtm_event(self):
1172 # Emulate a case where the file last modification time is prior
1173 # to year 1900. This most likely will never happen unless
1174 # someone specifically force the last modification time of a
1176 # To do so we temporarily override os.path.getmtime so that it
1177 # returns a negative value referring to a year prior to 1900.
1178 # It causes time.localtime/gmtime to raise a ValueError exception
1179 # which is supposed to be handled by server.
1180 _getmtime = ftpserver.AbstractedFS.getmtime
1182 ftpserver.AbstractedFS.getmtime = lambda x, y: -9000000000
1183 self.assertRaises(ftplib.error_perm, self.client.sendcmd,
1184 'mdtm ' + self.tempfile)
1185 # make sure client hasn't been disconnected
1186 self.client.sendcmd('noop')
1188 ftpserver.AbstractedFS.getmtime = _getmtime
1190 def test_size(self):
1191 self.client.sendcmd('type a')
1192 self.assertRaises(ftplib.error_perm, self.client.size, self.tempfile)
1193 self.client.sendcmd('type i')
1194 self.client.size(self.tempfile)
1195 # make sure we can't use size against directories
1197 self.client.sendcmd('size ' + self.tempdir)
1198 except ftplib.error_perm, err:
1199 self.assertTrue("not retrievable" in str(err))
1201 self.fail('Exception not raised')
1203 if not hasattr(os, 'chmod'):
1204 def test_site_chmod(self):
1205 self.assertRaises(ftplib.error_perm, self.client.sendcmd,
1206 'site chmod 777 ' + self.tempfile)
1208 def test_site_chmod(self):
1210 self.assertRaises(ftplib.error_perm,
1211 self.client.sendcmd, 'site chmod 777')
1213 self.assertRaises(ftplib.error_perm, self.client.sendcmd,
1214 'site chmod -177 ' + self.tempfile)
1215 self.assertRaises(ftplib.error_perm, self.client.sendcmd,
1216 'site chmod 778 ' + self.tempfile)
1217 self.assertRaises(ftplib.error_perm, self.client.sendcmd,
1218 'site chmod foo ' + self.tempfile)
1220 # on Windows it is possible to set read-only flag only
1222 self.client.sendcmd('site chmod 777 ' + self.tempfile)
1223 mode = oct(stat.S_IMODE(os.stat(self.tempfile).st_mode))
1224 self.assertEqual(mode, '0666')
1225 self.client.sendcmd('site chmod 444 ' + self.tempfile)
1226 mode = oct(stat.S_IMODE(os.stat(self.tempfile).st_mode))
1227 self.assertEqual(mode, '0444')
1228 self.client.sendcmd('site chmod 666 ' + self.tempfile)
1229 mode = oct(stat.S_IMODE(os.stat(self.tempfile).st_mode))
1230 self.assertEqual(mode, '0666')
1232 self.client.sendcmd('site chmod 777 ' + self.tempfile)
1233 mode = oct(stat.S_IMODE(os.stat(self.tempfile).st_mode))
1234 self.assertEqual(mode, '0777')
1235 self.client.sendcmd('site chmod 755 ' + self.tempfile)
1236 mode = oct(stat.S_IMODE(os.stat(self.tempfile).st_mode))
1237 self.assertEqual(mode, '0755')
1238 self.client.sendcmd('site chmod 555 ' + self.tempfile)
1239 mode = oct(stat.S_IMODE(os.stat(self.tempfile).st_mode))
1240 self.assertEqual(mode, '0555')
1243 class TestFtpStoreData(unittest.TestCase):
1244 """Test STOR, STOU, APPE, REST, TYPE."""
1246 client_class = ftplib.FTP
1249 self.server = self.server_class()
1251 self.client = self.client_class()
1252 self.client.connect(self.server.host, self.server.port)
1253 self.client.sock.settimeout(2)
1254 self.client.login(USER, PASSWD)
1255 self.dummy_recvfile = StringIO.StringIO()
1256 self.dummy_sendfile = StringIO.StringIO()
1261 self.dummy_recvfile.close()
1262 self.dummy_sendfile.close()
1265 def test_stor(self):
1267 data = 'abcde12345' * 100000
1268 self.dummy_sendfile.write(data)
1269 self.dummy_sendfile.seek(0)
1270 self.client.storbinary('stor ' + TESTFN, self.dummy_sendfile)
1271 self.client.retrbinary('retr ' + TESTFN, self.dummy_recvfile.write)
1272 self.dummy_recvfile.seek(0)
1273 self.assertEqual(hash(data), hash (self.dummy_recvfile.read()))
1275 # We do not use os.remove() because file could still be
1276 # locked by ftpd thread. If DELE through FTP fails try
1277 # os.remove() as last resort.
1278 if os.path.exists(TESTFN):
1280 self.client.delete(TESTFN)
1281 except (ftplib.Error, EOFError, socket.error):
1284 def test_stor_active(self):
1285 # Like test_stor but using PORT
1286 self.client.set_pasv(False)
1289 def test_stor_ascii(self):
1290 # Test STOR in ASCII mode
1292 def store(cmd, fp, blocksize=8192):
1293 # like storbinary() except it sends "type a" instead of
1294 # "type i" before starting the transfer
1295 self.client.voidcmd('type a')
1296 conn = self.client.transfercmd(cmd)
1298 buf = fp.read(blocksize)
1303 return self.client.voidresp()
1306 data = 'abcde12345\r\n' * 100000
1307 self.dummy_sendfile.write(data)
1308 self.dummy_sendfile.seek(0)
1309 store('stor ' + TESTFN, self.dummy_sendfile)
1310 self.client.retrbinary('retr ' + TESTFN, self.dummy_recvfile.write)
1311 expected = data.replace('\r\n', os.linesep)
1312 self.dummy_recvfile.seek(0)
1313 self.assertEqual(hash(expected), hash(self.dummy_recvfile.read()))
1315 # We do not use os.remove() because file could still be
1316 # locked by ftpd thread. If DELE through FTP fails try
1317 # os.remove() as last resort.
1318 if os.path.exists(TESTFN):
1320 self.client.delete(TESTFN)
1321 except (ftplib.Error, EOFError, socket.error):
1324 def test_stor_ascii_2(self):
1325 # Test that no extra extra carriage returns are added to the
1326 # file in ASCII mode in case CRLF gets truncated in two chunks
1329 def store(cmd, fp, blocksize=8192):
1330 # like storbinary() except it sends "type a" instead of
1331 # "type i" before starting the transfer
1332 self.client.voidcmd('type a')
1333 conn = self.client.transfercmd(cmd)
1335 buf = fp.read(blocksize)
1340 return self.client.voidresp()
1342 old_buffer = ftpserver.DTPHandler.ac_in_buffer_size
1344 # set a small buffer so that CRLF gets delivered in two
1345 # separate chunks: "CRLF", " f", "oo", " CR", "LF", " b", "ar"
1346 ftpserver.DTPHandler.ac_in_buffer_size = 2
1347 data = '\r\n foo \r\n bar'
1348 self.dummy_sendfile.write(data)
1349 self.dummy_sendfile.seek(0)
1350 store('stor ' + TESTFN, self.dummy_sendfile)
1352 expected = data.replace('\r\n', os.linesep)
1353 self.client.retrbinary('retr ' + TESTFN, self.dummy_recvfile.write)
1354 self.dummy_recvfile.seek(0)
1355 self.assertEqual(expected, self.dummy_recvfile.read())
1357 ftpserver.DTPHandler.ac_in_buffer_size = old_buffer
1358 # We do not use os.remove() because file could still be
1359 # locked by ftpd thread. If DELE through FTP fails try
1360 # os.remove() as last resort.
1361 if os.path.exists(TESTFN):
1363 self.client.delete(TESTFN)
1364 except (ftplib.Error, EOFError, socket.error):
1367 def test_stou(self):
1368 data = 'abcde12345' * 100000
1369 self.dummy_sendfile.write(data)
1370 self.dummy_sendfile.seek(0)
1372 self.client.voidcmd('TYPE I')
1373 # filename comes in as "1xx FILE: <filename>"
1374 filename = self.client.sendcmd('stou').split('FILE: ')[1]
1376 sock = self.client.makeport()
1377 conn, sockaddr = sock.accept()
1378 if hasattr(self.client_class, 'ssl_version'):
1379 conn = ssl.wrap_socket(conn)
1381 buf = self.dummy_sendfile.read(8192)
1387 # transfer finished, a 226 response is expected
1388 self.assertEqual('226', self.client.voidresp()[:3])
1389 self.client.retrbinary('retr ' + filename, self.dummy_recvfile.write)
1390 self.dummy_recvfile.seek(0)
1391 self.assertEqual(hash(data), hash (self.dummy_recvfile.read()))
1393 # We do not use os.remove() because file could still be
1394 # locked by ftpd thread. If DELE through FTP fails try
1395 # os.remove() as last resort.
1396 if os.path.exists(filename):
1398 self.client.delete(filename)
1399 except (ftplib.Error, EOFError, socket.error):
1400 safe_remove(filename)
1402 def test_stou_rest(self):
1403 # Watch for STOU preceded by REST, which makes no sense.
1404 self.client.sendcmd('type i')
1405 self.client.sendcmd('rest 10')
1406 self.assertRaises(ftplib.error_temp, self.client.sendcmd, 'stou')
1408 def test_stou_orphaned_file(self):
1409 # Check that no orphaned file gets left behind when STOU fails.
1410 # Even if STOU fails the file is first created and then erased.
1411 # Since we can't know the name of the file the best way that
1412 # we have to test this case is comparing the content of the
1413 # directory before and after STOU has been issued.
1414 # Assuming that TESTFN is supposed to be a "reserved" file
1415 # name we shouldn't get false positives.
1417 # login as a limited user to let STOU fail
1418 self.client.login('anonymous', '@nopasswd')
1419 before = os.listdir(HOME)
1420 self.assertRaises(ftplib.error_perm, self.client.sendcmd, 'stou ' + TESTFN)
1421 after = os.listdir(HOME)
1424 self.assert_(not file.startswith(TESTFN))
1426 def test_appe(self):
1428 data1 = 'abcde12345' * 100000
1429 self.dummy_sendfile.write(data1)
1430 self.dummy_sendfile.seek(0)
1431 self.client.storbinary('stor ' + TESTFN, self.dummy_sendfile)
1433 data2 = 'fghil67890' * 100000
1434 self.dummy_sendfile.write(data2)
1435 self.dummy_sendfile.seek(len(data1))
1436 self.client.storbinary('appe ' + TESTFN, self.dummy_sendfile)
1438 self.client.retrbinary("retr " + TESTFN, self.dummy_recvfile.write)
1439 self.dummy_recvfile.seek(0)
1440 self.assertEqual(hash(data1 + data2), hash (self.dummy_recvfile.read()))
1442 # We do not use os.remove() because file could still be
1443 # locked by ftpd thread. If DELE through FTP fails try
1444 # os.remove() as last resort.
1445 if os.path.exists(TESTFN):
1447 self.client.delete(TESTFN)
1448 except (ftplib.Error, EOFError, socket.error):
1451 def test_appe_rest(self):
1452 # Watch for APPE preceded by REST, which makes no sense.
1453 self.client.sendcmd('type i')
1454 self.client.sendcmd('rest 10')
1455 self.assertRaises(ftplib.error_temp, self.client.sendcmd, 'appe x')
1457 def test_rest_on_stor(self):
1458 # Test STOR preceded by REST.
1459 data = 'abcde12345' * 100000
1460 self.dummy_sendfile.write(data)
1461 self.dummy_sendfile.seek(0)
1463 self.client.voidcmd('TYPE I')
1464 conn = self.client.transfercmd('stor ' + TESTFN)
1467 chunk = self.dummy_sendfile.read(8192)
1469 bytes_sent += len(chunk)
1470 # stop transfer while it isn't finished yet
1471 if bytes_sent >= 524288 or not chunk:
1475 # transfer wasn't finished yet but server can't know this,
1476 # hence expect a 226 response
1477 self.assertEqual('226', self.client.voidresp()[:3])
1479 # resuming transfer by using a marker value greater than the
1480 # file size stored on the server should result in an error
1482 file_size = self.client.size(TESTFN)
1483 self.assertEqual(file_size, bytes_sent)
1484 self.client.sendcmd('rest %s' % ((file_size + 1)))
1485 self.assertRaises(ftplib.error_perm, self.client.sendcmd, 'stor ' + TESTFN)
1487 self.client.sendcmd('rest %s' % bytes_sent)
1488 self.client.storbinary('stor ' + TESTFN, self.dummy_sendfile)
1490 self.client.retrbinary('retr ' + TESTFN, self.dummy_recvfile.write)
1491 self.dummy_sendfile.seek(0)
1492 self.dummy_recvfile.seek(0)
1493 self.assertEqual(hash(self.dummy_sendfile.read()),
1494 hash(self.dummy_recvfile.read())
1496 self.client.delete(TESTFN)
1498 def test_failing_rest_on_stor(self):
1499 # Test REST -> STOR against a non existing file.
1500 if os.path.exists(TESTFN):
1501 self.client.delete(TESTFN)
1502 self.client.sendcmd('type i')
1503 self.client.sendcmd('rest 10')
1504 self.assertRaises(ftplib.error_perm, self.client.storbinary,
1505 'stor ' + TESTFN, lambda x: x)
1506 # if the first STOR failed because of REST, the REST marker
1507 # is supposed to be resetted to 0
1508 self.dummy_sendfile.write('x' * 4096)
1509 self.dummy_sendfile.seek(0)
1510 self.client.storbinary('stor ' + TESTFN, self.dummy_sendfile)
1512 def test_quit_during_transfer(self):
1513 # RFC-959 states that if QUIT is sent while a transfer is in
1514 # progress, the connection must remain open for result response
1515 # and the server will then close it.
1516 conn = self.client.transfercmd('stor ' + TESTFN)
1517 conn.sendall('abcde12345' * 50000)
1518 self.client.sendcmd('quit')
1519 conn.sendall('abcde12345' * 50000)
1521 # expect the response (transfer ok)
1522 self.assertEqual('226', self.client.voidresp()[:3])
1523 # Make sure client has been disconnected.
1524 # socket.error (Windows) or EOFError (Linux) exception is supposed
1525 # to be raised in such a case.
1526 self.assertRaises((socket.error, EOFError), self.client.sendcmd, 'noop')
1528 def test_stor_empty_file(self):
1529 self.client.storbinary('stor ' + TESTFN, self.dummy_sendfile)
1532 self.assertEqual(f.read(), "")
1536 if SUPPORTS_SENDFILE:
1537 class TestFtpStoreDataNoSendfile(TestFtpStoreData):
1538 """Test STOR, STOU, APPE, REST, TYPE not using sendfile()."""
1541 TestFtpStoreData.setUp(self)
1542 self.server.handler.use_sendfile = False
1545 TestFtpStoreData.tearDown(self)
1546 self.server.handler.use_sendfile = True
1549 class TestFtpRetrieveData(unittest.TestCase):
1550 "Test RETR, REST, TYPE"
1552 client_class = ftplib.FTP
1555 self.server = self.server_class()
1557 self.client = self.client_class()
1558 self.client.connect(self.server.host, self.server.port)
1559 self.client.sock.settimeout(2)
1560 self.client.login(USER, PASSWD)
1561 self.file = open(TESTFN, 'w+b')
1562 self.dummyfile = StringIO.StringIO()
1567 if not self.file.closed:
1569 if not self.dummyfile.closed:
1570 self.dummyfile.close()
1573 def test_retr(self):
1574 data = 'abcde12345' * 100000
1575 self.file.write(data)
1577 self.client.retrbinary("retr " + TESTFN, self.dummyfile.write)
1578 self.dummyfile.seek(0)
1579 self.assertEqual(hash(data), hash(self.dummyfile.read()))
1581 # attempt to retrieve a file which doesn't exist
1582 bogus = os.path.basename(tempfile.mktemp(dir=HOME))
1583 self.assertRaises(ftplib.error_perm, self.client.retrbinary,
1584 "retr " + bogus, lambda x: x)
1586 def test_retr_ascii(self):
1587 # Test RETR in ASCII mode.
1589 def retrieve(cmd, callback, blocksize=8192, rest=None):
1590 # like retrbinary but uses TYPE A instead
1591 self.client.voidcmd('type a')
1592 conn = self.client.transfercmd(cmd, rest)
1594 data = conn.recv(blocksize)
1599 return self.client.voidresp()
1601 data = ('abcde12345' + os.linesep) * 100000
1602 self.file.write(data)
1604 retrieve("retr " + TESTFN, self.dummyfile.write)
1605 expected = data.replace(os.linesep, '\r\n')
1606 self.dummyfile.seek(0)
1607 self.assertEqual(hash(expected), hash(self.dummyfile.read()))
1609 def test_restore_on_retr(self):
1610 data = 'abcde12345' * 1000000
1611 self.file.write(data)
1615 self.client.voidcmd('TYPE I')
1616 conn = self.client.transfercmd('retr ' + TESTFN)
1618 chunk = conn.recv(8192)
1621 self.dummyfile.write(chunk)
1622 received_bytes += len(chunk)
1623 if received_bytes >= len(data) // 2:
1627 # transfer wasn't finished yet so we expect a 426 response
1628 self.assertEqual(self.client.getline()[:3], "426")
1630 # resuming transfer by using a marker value greater than the
1631 # file size stored on the server should result in an error
1632 # on retr (RFC-1123)
1633 file_size = self.client.size(TESTFN)
1634 self.client.sendcmd('rest %s' % ((file_size + 1)))
1635 self.assertRaises(ftplib.error_perm, self.client.sendcmd, 'retr ' + TESTFN)
1638 self.client.sendcmd('rest %s' % received_bytes)
1639 self.client.retrbinary("retr " + TESTFN, self.dummyfile.write)
1640 self.dummyfile.seek(0)
1641 self.assertEqual(hash(data), hash (self.dummyfile.read()))
1643 def test_retr_empty_file(self):
1644 self.client.retrbinary("retr " + TESTFN, self.dummyfile.write)
1645 self.dummyfile.seek(0)
1646 self.assertEqual(self.dummyfile.read(), "")
1649 if SUPPORTS_SENDFILE:
1650 class TestFtpRetrieveDataNoSendfile(TestFtpRetrieveData):
1651 """Test RETR, REST, TYPE by not using sendfile()."""
1654 TestFtpRetrieveData.setUp(self)
1655 self.server.handler.use_sendfile = False
1658 TestFtpRetrieveData.tearDown(self)
1659 self.server.handler.use_sendfile = True
1662 class TestFtpListingCmds(unittest.TestCase):
1663 """Test LIST, NLST, argumented STAT."""
1665 client_class = ftplib.FTP
1668 self.server = self.server_class()
1670 self.client = self.client_class()
1671 self.client.connect(self.server.host, self.server.port)
1672 self.client.sock.settimeout(2)
1673 self.client.login(USER, PASSWD)
1681 def _test_listing_cmds(self, cmd):
1682 """Tests common to LIST NLST and MLSD commands."""
1683 # assume that no argument has the same meaning of "/"
1685 self.client.retrlines(cmd, l1.append)
1686 self.client.retrlines(cmd + ' /', l2.append)
1687 self.assertEqual(l1, l2)
1688 if cmd.lower() != 'mlsd':
1689 # if pathname is a file one line is expected
1691 self.client.retrlines('%s ' % cmd + TESTFN, x.append)
1692 self.assertEqual(len(x), 1)
1693 self.assertTrue(''.join(x).endswith(TESTFN))
1694 # non-existent path, 550 response is expected
1695 bogus = os.path.basename(tempfile.mktemp(dir=HOME))
1696 self.assertRaises(ftplib.error_perm, self.client.retrlines,
1697 '%s ' %cmd + bogus, lambda x: x)
1698 # for an empty directory we excpect that the data channel is
1699 # opened anyway and that no data is received
1701 tempdir = os.path.basename(tempfile.mkdtemp(dir=HOME))
1703 self.client.retrlines('%s %s' % (cmd, tempdir), x.append)
1704 self.assertEqual(x, [])
1711 def test_nlst(self):
1713 self._test_listing_cmds('nlst')
1715 def test_list(self):
1717 self._test_listing_cmds('list')
1718 # known incorrect pathname arguments (e.g. old clients) are
1719 # expected to be treated as if pathname would be == '/'
1720 l1 = l2 = l3 = l4 = l5 = []
1721 self.client.retrlines('list /', l1.append)
1722 self.client.retrlines('list -a', l2.append)
1723 self.client.retrlines('list -l', l3.append)
1724 self.client.retrlines('list -al', l4.append)
1725 self.client.retrlines('list -la', l5.append)
1726 tot = (l1, l2, l3, l4, l5)
1727 for x in range(len(tot) - 1):
1728 self.assertEqual(tot[x], tot[x+1])
1730 def test_mlst(self):
1731 # utility function for extracting the line of interest
1732 mlstline = lambda cmd: self.client.voidcmd(cmd).split('\n')[1]
1734 # the fact set must be preceded by a space
1735 self.assertTrue(mlstline('mlst').startswith(' '))
1736 # where TVFS is supported, a fully qualified pathname is expected
1737 self.assertTrue(mlstline('mlst ' + TESTFN).endswith('/' + TESTFN))
1738 self.assertTrue(mlstline('mlst').endswith('/'))
1739 # assume that no argument has the same meaning of "/"
1740 self.assertEqual(mlstline('mlst'), mlstline('mlst /'))
1742 bogus = os.path.basename(tempfile.mktemp(dir=HOME))
1743 self.assertRaises(ftplib.error_perm, self.client.sendcmd, 'mlst '+bogus)
1744 # test file/dir notations
1745 self.assertTrue('type=dir' in mlstline('mlst'))
1746 self.assertTrue('type=file' in mlstline('mlst ' + TESTFN))
1747 # let's add some tests for OPTS command
1748 self.client.sendcmd('opts mlst type;')
1749 self.assertEqual(mlstline('mlst'), ' type=dir; /')
1750 # where no facts are present, two leading spaces before the
1751 # pathname are required (RFC-3659)
1752 self.client.sendcmd('opts mlst')
1753 self.assertEqual(mlstline('mlst'), ' /')
1755 def test_mlsd(self):
1757 self._test_listing_cmds('mlsd')
1758 dir = os.path.basename(tempfile.mkdtemp(dir=HOME))
1761 self.client.retrlines('mlsd ' + TESTFN, lambda x: x)
1762 except ftplib.error_perm, resp:
1763 # if path is a file a 501 response code is expected
1764 self.assertEqual(str(resp)[0:3], "501")
1766 self.fail("Exception not raised")
1770 def test_mlsd_all_facts(self):
1771 feat = self.client.sendcmd('feat')
1773 facts = re.search(r'^\s*MLST\s+(\S+)$', feat, re.MULTILINE).group(1)
1774 facts = facts.replace("*;", ";")
1775 self.client.sendcmd('opts mlst ' + facts)
1776 resp = self.client.sendcmd('mlst')
1778 local = facts[:-1].split(";")
1779 returned = resp.split("\n")[1].strip()[:-3]
1780 returned = [x.split("=")[0] for x in returned.split(";")]
1781 self.assertEqual(sorted(local), sorted(returned))
1783 self.assertTrue("type" in resp)
1784 self.assertTrue("size" in resp)
1785 self.assertTrue("perm" in resp)
1786 self.assertTrue("modify" in resp)
1787 if os.name == 'posix':
1788 self.assertTrue("unique" in resp)
1789 self.assertTrue("unix.mode" in resp)
1790 self.assertTrue("unix.uid" in resp)
1791 self.assertTrue("unix.gid" in resp)
1792 elif os.name == 'nt':
1793 self.assertTrue("create" in resp)
1795 def test_stat(self):
1796 # Test STAT provided with argument which is equal to LIST
1797 self.client.sendcmd('stat /')
1798 self.client.sendcmd('stat ' + TESTFN)
1799 self.client.putcmd('stat *')
1800 resp = self.client.getmultiline()
1801 self.assertEqual(resp, '550 Globbing not supported.')
1802 bogus = os.path.basename(tempfile.mktemp(dir=HOME))
1803 self.assertRaises(ftplib.error_perm, self.client.sendcmd, 'stat ' + bogus)
1805 def test_unforeseen_time_event(self):
1806 # Emulate a case where the file last modification time is prior
1807 # to year 1900. This most likely will never happen unless
1808 # someone specifically force the last modification time of a
1810 # To do so we temporarily override os.path.getmtime so that it
1811 # returns a negative value referring to a year prior to 1900.
1812 # It causes time.localtime/gmtime to raise a ValueError exception
1813 # which is supposed to be handled by server.
1814 _getmtime = ftpserver.AbstractedFS.getmtime
1816 ftpserver.AbstractedFS.getmtime = lambda x, y: -9000000000
1817 self.client.sendcmd('stat /') # test AbstractedFS.format_list()
1818 self.client.sendcmd('mlst /') # test AbstractedFS.format_mlsx()
1819 # make sure client hasn't been disconnected
1820 self.client.sendcmd('noop')
1822 ftpserver.AbstractedFS.getmtime = _getmtime
1825 class TestFtpAbort(unittest.TestCase):
1828 client_class = ftplib.FTP
1831 self.server = self.server_class()
1833 self.client = self.client_class()
1834 self.client.connect(self.server.host, self.server.port)
1835 self.client.sock.settimeout(2)
1836 self.client.login(USER, PASSWD)
1842 def test_abor_no_data(self):
1843 # Case 1: ABOR while no data channel is opened: respond with 225.
1844 resp = self.client.sendcmd('ABOR')
1845 self.assertEqual('225 No transfer to abort.', resp)
1846 self.client.retrlines('list', [].append)
1848 def test_abor_pasv(self):
1849 # Case 2: user sends a PASV, a data-channel socket is listening
1850 # but not connected, and ABOR is sent: close listening data
1851 # socket, respond with 225.
1852 self.client.makepasv()
1853 respcode = self.client.sendcmd('ABOR')[:3]
1854 self.assertEqual('225', respcode)
1855 self.client.retrlines('list', [].append)
1857 def test_abor_port(self):
1858 # Case 3: data channel opened with PASV or PORT, but ABOR sent
1859 # before a data transfer has been started: close data channel,
1861 self.client.set_pasv(0)
1862 sock = self.client.makeport()
1863 respcode = self.client.sendcmd('ABOR')[:3]
1865 self.assertEqual('225', respcode)
1866 self.client.retrlines('list', [].append)
1868 def test_abor_during_transfer(self):
1869 # Case 4: ABOR while a data transfer on DTP channel is in
1870 # progress: close data channel, respond with 426, respond
1872 data = 'abcde12345' * 1000000
1873 f = open(TESTFN, 'w+b')
1877 self.client.voidcmd('TYPE I')
1878 conn = self.client.transfercmd('retr ' + TESTFN)
1880 while bytes_recv < 65536:
1881 chunk = conn.recv(8192)
1882 bytes_recv += len(chunk)
1884 # stop transfer while it isn't finished yet
1885 self.client.putcmd('ABOR')
1887 # transfer isn't finished yet so ftpd should respond with 426
1888 self.assertEqual(self.client.getline()[:3], "426")
1890 # transfer successfully aborted, so should now respond with a 226
1891 self.assertEqual('226', self.client.voidresp()[:3])
1893 # We do not use os.remove() because file could still be
1894 # locked by ftpd thread. If DELE through FTP fails try
1895 # os.remove() as last resort.
1897 self.client.delete(TESTFN)
1898 except (ftplib.Error, EOFError, socket.error):
1901 if hasattr(socket, 'MSG_OOB'):
1902 def test_oob_abor(self):
1903 # Send ABOR by following the RFC-959 directives of sending
1904 # Telnet IP/Synch sequence as OOB data.
1905 # On some systems like FreeBSD this happened to be a problem
1906 # due to a different SO_OOBINLINE behavior.
1907 # On some platforms (e.g. Python CE) the test may fail
1908 # although the MSG_OOB constant is defined.
1909 self.client.sock.sendall(chr(244), socket.MSG_OOB)
1910 self.client.sock.sendall(chr(255), socket.MSG_OOB)
1911 self.client.sock.sendall('abor\r\n')
1912 self.client.sock.settimeout(1)
1913 self.assertEqual(self.client.getresp()[:3], '225')
1916 class TestTimeouts(unittest.TestCase):
1917 """Test idle-timeout capabilities of control and data channels.
1918 Some tests may fail on slow machines.
1921 client_class = ftplib.FTP
1927 def _setUp(self, idle_timeout=300, data_timeout=300, pasv_timeout=30,
1929 self.server = self.server_class()
1930 self.server.handler.timeout = idle_timeout
1931 self.server.handler.dtp_handler.timeout = data_timeout
1932 self.server.handler.passive_dtp.timeout = pasv_timeout
1933 self.server.handler.active_dtp.timeout = port_timeout
1935 self.client = self.client_class()
1936 self.client.connect(self.server.host, self.server.port)
1937 self.client.sock.settimeout(2)
1938 self.client.login(USER, PASSWD)
1941 if self.client is not None and self.server is not None:
1943 self.server.handler.timeout = 300
1944 self.server.handler.dtp_handler.timeout = 300
1945 self.server.handler.passive_dtp.timeout = 30
1946 self.server.handler.active_dtp.timeout = 30
1949 def test_idle_timeout(self):
1950 # Test control channel timeout. The client which does not send
1951 # any command within the time specified in FTPHandler.timeout is
1952 # supposed to be kicked off.
1953 self._setUp(idle_timeout=0.1)
1954 # fail if no msg is received within 1 second
1955 self.client.sock.settimeout(1)
1956 data = self.client.sock.recv(1024)
1957 self.assertEqual(data, "421 Control connection timed out.\r\n")
1958 # ensure client has been kicked off
1959 self.assertRaises((socket.error, EOFError), self.client.sendcmd, 'noop')
1961 def test_data_timeout(self):
1962 # Test data channel timeout. The client which does not send
1963 # or receive any data within the time specified in
1964 # DTPHandler.timeout is supposed to be kicked off.
1965 self._setUp(data_timeout=0.1)
1966 addr = self.client.makepasv()
1969 # fail if no msg is received within 1 second
1970 self.client.sock.settimeout(1)
1971 data = self.client.sock.recv(1024)
1972 self.assertEqual(data, "421 Data connection timed out.\r\n")
1973 # ensure client has been kicked off
1974 self.assertRaises((socket.error, EOFError), self.client.sendcmd, 'noop')
1976 def test_data_timeout_not_reached(self):
1977 # Impose a timeout for the data channel, then keep sending data for a
1978 # time which is longer than that to make sure that the code checking
1979 # whether the transfer stalled for with no progress is executed.
1980 self._setUp(data_timeout=0.1)
1981 sock = self.client.transfercmd('stor ' + TESTFN)
1982 if hasattr(self.client_class, 'ssl_version'):
1983 sock = ssl.wrap_socket(sock)
1985 stop_at = time.time() + 0.2
1986 while time.time() < stop_at:
1987 sock.send('x' * 1024)
1989 self.client.voidresp()
1991 if os.path.exists(TESTFN):
1992 self.client.delete(TESTFN)
1994 def test_idle_data_timeout1(self):
1995 # Tests that the control connection timeout is suspended while
1996 # the data channel is opened
1997 self._setUp(idle_timeout=0.1, data_timeout=0.2)
1998 addr = self.client.makepasv()
2001 # fail if no msg is received within 1 second
2002 self.client.sock.settimeout(1)
2003 data = self.client.sock.recv(1024)
2004 self.assertEqual(data, "421 Data connection timed out.\r\n")
2005 # ensure client has been kicked off
2006 self.assertRaises((socket.error, EOFError), self.client.sendcmd, 'noop')
2008 def test_idle_data_timeout2(self):
2009 # Tests that the control connection timeout is restarted after
2010 # data channel has been closed
2011 self._setUp(idle_timeout=0.1, data_timeout=0.2)
2012 addr = self.client.makepasv()
2015 # close data channel
2016 self.client.sendcmd('abor')
2017 self.client.sock.settimeout(1)
2018 data = self.client.sock.recv(1024)
2019 self.assertEqual(data, "421 Control connection timed out.\r\n")
2020 # ensure client has been kicked off
2021 self.assertRaises((socket.error, EOFError), self.client.sendcmd, 'noop')
2023 def test_pasv_timeout(self):
2024 # Test pasv data channel timeout. The client which does not
2025 # connect to the listening data socket within the time specified
2026 # in PassiveDTP.timeout is supposed to receive a 421 response.
2027 self._setUp(pasv_timeout=0.1)
2028 self.client.makepasv()
2029 # fail if no msg is received within 1 second
2030 self.client.sock.settimeout(1)
2031 data = self.client.sock.recv(1024)
2032 self.assertEqual(data, "421 Passive data channel timed out.\r\n")
2033 # client is not expected to be kicked off
2034 self.client.sendcmd('noop')
2036 def test_disabled_idle_timeout(self):
2037 self._setUp(idle_timeout=0)
2038 self.client.sendcmd('noop')
2040 def test_disabled_data_timeout(self):
2041 self._setUp(data_timeout=0)
2042 addr = self.client.makepasv()
2047 def test_disabled_pasv_timeout(self):
2048 self._setUp(pasv_timeout=0)
2049 self.client.makepasv()
2050 # reset passive socket
2051 addr = self.client.makepasv()
2056 def test_disabled_port_timeout(self):
2057 self._setUp(port_timeout=0)
2058 s1 =self.client.makeport()
2059 s2 = self.client.makeport()
2064 class TestConfigurableOptions(unittest.TestCase):
2065 """Test those daemon options which are commonly modified by user."""
2067 client_class = ftplib.FTP
2071 self.server = self.server_class()
2073 self.client = self.client_class()
2074 self.client.connect(self.server.host, self.server.port)
2075 self.client.sock.settimeout(2)
2076 self.client.login(USER, PASSWD)
2080 # set back options to their original value
2081 self.server.server.max_cons = 0
2082 self.server.server.max_cons_per_ip = 0
2083 self.server.handler.banner = "pyftpdlib %s ready." % ftpserver.__ver__
2084 self.server.handler.max_login_attempts = 3
2085 self.server.handler._auth_failed_timeout = 5
2086 self.server.handler.masquerade_address = None
2087 self.server.handler.masquerade_address_map = {}
2088 self.server.handler.permit_privileged_ports = False
2089 self.server.handler.passive_ports = None
2090 self.server.handler.use_gmt_times = True
2091 self.server.handler.tcp_no_delay = hasattr(socket, 'TCP_NODELAY')
2094 def test_max_connections(self):
2095 # Test FTPServer.max_cons attribute
2096 self.server.server.max_cons = 3
2098 c1 = self.client_class()
2099 c2 = self.client_class()
2100 c3 = self.client_class()
2102 c1.connect(self.server.host, self.server.port)
2103 c2.connect(self.server.host, self.server.port)
2104 self.assertRaises(ftplib.error_temp, c3.connect, self.server.host,
2106 # with passive data channel established
2108 c1.login(USER, PASSWD)
2110 self.assertRaises(ftplib.error_temp, c2.connect, self.server.host,
2112 # with passive data socket waiting for connection
2113 c1.login(USER, PASSWD)
2115 self.assertRaises(ftplib.error_temp, c2.connect, self.server.host,
2117 # with active data channel established
2118 c1.login(USER, PASSWD)
2119 sock = c1.makeport()
2120 self.assertRaises(ftplib.error_temp, c2.connect, self.server.host,
2128 def test_max_connections_per_ip(self):
2129 # Test FTPServer.max_cons_per_ip attribute
2130 self.server.server.max_cons_per_ip = 3
2132 c1 = self.client_class()
2133 c2 = self.client_class()
2134 c3 = self.client_class()
2135 c4 = self.client_class()
2137 c1.connect(self.server.host, self.server.port)
2138 c2.connect(self.server.host, self.server.port)
2139 c3.connect(self.server.host, self.server.port)
2140 self.assertRaises(ftplib.error_temp, c4.connect, self.server.host,
2142 # Make sure client has been disconnected.
2143 # socket.error (Windows) or EOFError (Linux) exception is
2144 # supposed to be raised in such a case.
2145 self.assertRaises((socket.error, EOFError), c4.sendcmd, 'noop')
2152 def test_banner(self):
2153 # Test FTPHandler.banner attribute
2154 self.server.handler.banner = 'hello there'
2156 self.client = self.client_class()
2157 self.client.connect(self.server.host, self.server.port)
2158 self.client.sock.settimeout(2)
2159 self.assertEqual(self.client.getwelcome()[4:], 'hello there')
2161 def test_max_login_attempts(self):
2162 # Test FTPHandler.max_login_attempts attribute.
2163 self.server.handler.max_login_attempts = 1
2164 self.server.handler._auth_failed_timeout = 0
2165 self.assertRaises(ftplib.error_perm, self.client.login, 'wrong', 'wrong')
2166 # socket.error (Windows) or EOFError (Linux) exceptions are
2167 # supposed to be raised when attempting to send/recv some data
2168 # using a disconnected socket
2169 self.assertRaises((socket.error, EOFError), self.client.sendcmd, 'noop')
2171 def test_masquerade_address(self):
2172 # Test FTPHandler.masquerade_address attribute
2173 host, port = self.client.makepasv()
2174 self.assertEqual(host, self.server.host)
2175 self.server.handler.masquerade_address = "256.256.256.256"
2176 host, port = self.client.makepasv()
2177 self.assertEqual(host, "256.256.256.256")
2179 def test_masquerade_address_map(self):
2180 # Test FTPHandler.masquerade_address_map attribute
2181 host, port = self.client.makepasv()
2182 self.assertEqual(host, self.server.host)
2183 self.server.handler.masquerade_address_map = {self.server.host :
2185 host, port = self.client.makepasv()
2186 self.assertEqual(host, "128.128.128.128")
2188 def test_passive_ports(self):
2189 # Test FTPHandler.passive_ports attribute
2190 _range = range(40000, 60000, 200)
2191 self.server.handler.passive_ports = _range
2192 self.assert_(self.client.makepasv()[1] in _range)
2193 self.assert_(self.client.makepasv()[1] in _range)
2194 self.assert_(self.client.makepasv()[1] in _range)
2195 self.assert_(self.client.makepasv()[1] in _range)
2197 def test_passive_ports_busy(self):
2198 # If the ports in the configured range are busy it is expected
2199 # that a kernel-assigned port gets chosen
2202 port = s.getsockname()[1]
2203 self.server.handler.passive_ports = [port]
2204 resulting_port = self.client.makepasv()[1]
2205 self.assert_(port != resulting_port)
2207 def test_permit_privileged_ports(self):
2208 # Test FTPHandler.permit_privileged_ports_active attribute
2210 # try to bind a socket on a privileged port
2212 for port in reversed(range(1, 1024)):
2214 socket.getservbyport(port)
2215 except socket.error, err:
2216 # not registered port; go on
2218 sock = socket.socket(self.client.af, socket.SOCK_STREAM)
2219 sock.bind((HOST, port))
2221 except socket.error, err:
2222 if err.args[0] == errno.EACCES:
2223 # root privileges needed
2229 # registered port found; skip to the next one
2232 # no usable privileged port was found
2236 self.server.handler.permit_privileged_ports = False
2237 self.assertRaises(ftplib.error_perm, self.client.sendport, HOST,
2240 port = sock.getsockname()[1]
2241 self.server.handler.permit_privileged_ports = True
2244 self.client.sendport(HOST, port)
2247 if sock is not None:
2250 def test_use_gmt_times(self):
2252 self.server.handler.use_gmt_times = True
2253 gmt1 = self.client.sendcmd('mdtm ' + TESTFN)
2254 gmt2 = self.client.sendcmd('mlst ' + TESTFN)
2255 gmt3 = self.client.sendcmd('stat ' + TESTFN)
2258 self.server.handler.use_gmt_times = False
2261 self.client.connect(self.server.host, self.server.port)
2262 self.client.sock.settimeout(2)
2263 self.client.login(USER, PASSWD)
2265 loc1 = self.client.sendcmd('mdtm ' + TESTFN)
2266 loc2 = self.client.sendcmd('mlst ' + TESTFN)
2267 loc3 = self.client.sendcmd('stat ' + TESTFN)
2269 # if we're not in a GMT time zone times are supposed to be
2271 if time.timezone != 0:
2272 self.assertNotEqual(gmt1, loc1)
2273 self.assertNotEqual(gmt2, loc2)
2274 self.assertNotEqual(gmt3, loc3)
2275 # ...otherwise they should be the same
2277 self.assertEqual(gmt1, loc1)
2278 self.assertEqual(gmt2, loc2)
2279 self.assertEqual(gmt3, loc3)
2281 if hasattr(socket, 'TCP_NODELAY'):
2282 def test_tcp_no_delay(self):
2283 def get_handler_socket():
2284 # return the server's handler socket object
2285 for fd in asyncore.socket_map:
2286 instance = asyncore.socket_map[fd]
2287 if isinstance(instance, ftpserver.FTPHandler):
2289 return instance.socket
2291 s = get_handler_socket()
2292 self.assertTrue(s.getsockopt(socket.SOL_TCP, socket.TCP_NODELAY))
2294 self.server.handler.tcp_no_delay = False
2295 self.client.connect(self.server.host, self.server.port)
2296 self.client.sendcmd('noop')
2297 s = get_handler_socket()
2298 self.assertFalse(s.getsockopt(socket.SOL_TCP, socket.TCP_NODELAY))
2301 class TestCallbacks(unittest.TestCase):
2302 """Test FTPHandler class callback methods."""
2304 client_class = ftplib.FTP
2309 self._tearDown = True
2311 def _setUp(self, handler, login=True):
2312 FTPd.handler = handler
2313 self.server = self.server_class()
2315 self.client = self.client_class()
2316 self.client.connect(self.server.host, self.server.port)
2317 self.client.sock.settimeout(2)
2319 self.client.login(USER, PASSWD)
2320 self.file = open(TESTFN, 'w+b')
2321 self.dummyfile = StringIO.StringIO()
2322 self._tearDown = False
2325 if not self._tearDown:
2326 FTPd.handler = ftpserver.FTPHandler
2327 self._tearDown = True
2328 if self.client is not None:
2330 if self.server is not None:
2332 if not self.file.closed:
2334 if not self.dummyfile.closed:
2335 self.dummyfile.close()
2338 def test_on_file_sent(self):
2341 class TestHandler(ftpserver.FTPHandler):
2343 def on_file_sent(self, file):
2346 def on_file_received(self, file):
2349 def on_incomplete_file_sent(self, file):
2352 def on_incomplete_file_received(self, file):
2355 self._setUp(TestHandler)
2356 data = 'abcde12345' * 100000
2357 self.file.write(data)
2359 self.client.retrbinary("retr " + TESTFN, lambda x: x)
2360 # shut down the server to avoid race conditions
2362 self.assertEqual(_file, [os.path.abspath(TESTFN)])
2364 def test_on_file_received(self):
2367 class TestHandler(ftpserver.FTPHandler):
2369 def on_file_sent(self, file):
2372 def on_file_received(self, file):
2375 def on_incomplete_file_sent(self, file):
2378 def on_incomplete_file_received(self, file):
2381 self._setUp(TestHandler)
2382 data = 'abcde12345' * 100000
2383 self.dummyfile.write(data)
2384 self.dummyfile.seek(0)
2385 self.client.storbinary('stor ' + TESTFN, self.dummyfile)
2386 # shut down the server to avoid race conditions
2388 self.assertEqual(_file, [os.path.abspath(TESTFN)])
2390 def test_on_incomplete_file_sent(self):
2393 class TestHandler(ftpserver.FTPHandler):
2395 def on_file_sent(self, file):
2398 def on_file_received(self, file):
2401 def on_incomplete_file_sent(self, file):
2404 def on_incomplete_file_received(self, file):
2407 self._setUp(TestHandler)
2408 data = 'abcde12345' * 100000
2409 self.file.write(data)
2413 conn = self.client.transfercmd("retr " + TESTFN, None)
2415 chunk = conn.recv(8192)
2416 bytes_recv += len(chunk)
2417 if bytes_recv >= 524288 or not chunk:
2420 self.assertEqual(self.client.getline()[:3], "426")
2422 # shut down the server to avoid race conditions
2424 self.assertEqual(_file, [os.path.abspath(TESTFN)])
2426 def test_on_incomplete_file_received(self):
2429 class TestHandler(ftpserver.FTPHandler):
2431 def on_file_sent(self, file):
2434 def on_file_received(self, file):
2437 def on_incomplete_file_sent(self, file):
2440 def on_incomplete_file_received(self, file):
2443 self._setUp(TestHandler)
2444 data = 'abcde12345' * 100000
2445 self.dummyfile.write(data)
2446 self.dummyfile.seek(0)
2448 conn = self.client.transfercmd('stor ' + TESTFN)
2451 chunk = self.dummyfile.read(8192)
2453 bytes_sent += len(chunk)
2454 # stop transfer while it isn't finished yet
2455 if bytes_sent >= 524288 or not chunk:
2456 self.client.putcmd('abor')
2460 # shut down the server to avoid race conditions
2462 self.assertEqual(_file, [os.path.abspath(TESTFN)])
2464 def test_on_login(self):
2467 class TestHandler(ftpserver.FTPHandler):
2468 _auth_failed_timeout = 0
2470 def on_login(self, username):
2471 user.append(username)
2473 def on_login_failed(self, username, password):
2477 self._setUp(TestHandler)
2478 # shut down the server to avoid race conditions
2480 self.assertEqual(user, [USER])
2482 def test_on_login_failed(self):
2485 class TestHandler(ftpserver.FTPHandler):
2486 _auth_failed_timeout = 0
2488 def on_login(self, username):
2491 def on_login_failed(self, username, password):
2492 pair.append((username, password))
2494 self._setUp(TestHandler, login=False)
2495 self.assertRaises(ftplib.error_perm, self.client.login, 'foo', 'bar')
2496 # shut down the server to avoid race conditions
2498 self.assertEqual(pair, [('foo', 'bar')])
2500 def test_on_login_failed(self):
2503 class TestHandler(ftpserver.FTPHandler):
2504 _auth_failed_timeout = 0
2506 def on_login(self, username):
2509 def on_login_failed(self, username, password):
2510 pair.append((username, password))
2512 self._setUp(TestHandler, login=False)
2513 self.assertRaises(ftplib.error_perm, self.client.login, 'foo', 'bar')
2514 # shut down the server to avoid race conditions
2516 self.assertEqual(pair, [('foo', 'bar')])
2519 def test_on_logout_quit(self):
2522 class TestHandler(ftpserver.FTPHandler):
2524 def on_logout(self, username):
2525 user.append(username)
2527 self._setUp(TestHandler)
2529 # shut down the server to avoid race conditions
2531 self.assertEqual(user, [USER])
2533 def test_on_logout_rein(self):
2536 class TestHandler(ftpserver.FTPHandler):
2538 def on_logout(self, username):
2539 user.append(username)
2541 self._setUp(TestHandler)
2542 self.client.sendcmd('rein')
2543 # shut down the server to avoid race conditions
2545 self.assertEqual(user, [USER])
2547 def test_on_logout_user_issued_twice(self):
2550 class TestHandler(ftpserver.FTPHandler):
2552 def on_logout(self, username):
2553 users.append(username)
2555 self._setUp(TestHandler)
2556 # At this point user "user" is logged in. Re-login as anonymous,
2557 # then quit and expect queue == ["user", "anonymous"]
2558 self.client.login("anonymous")
2560 # shut down the server to avoid race conditions
2562 self.assertEqual(users, [USER, 'anonymous'])
2565 class _TestNetworkProtocols(unittest.TestCase):
2566 """Test PASV, EPSV, PORT and EPRT commands.
2568 Do not use this class directly, let TestIPv4Environment and
2569 TestIPv6Environment classes use it instead.
2572 client_class = ftplib.FTP
2576 self.server = self.server_class(self.HOST)
2578 self.client = self.client_class()
2579 self.client.connect(self.server.host, self.server.port)
2580 self.client.sock.settimeout(2)
2581 self.client.login(USER, PASSWD)
2582 if self.client.af == socket.AF_INET:
2584 self.other_proto = "2"
2587 self.other_proto = "1"
2593 def cmdresp(self, cmd):
2594 """Send a command and return response, also if the command failed."""
2596 return self.client.sendcmd(cmd)
2597 except ftplib.Error, err:
2600 def test_eprt(self):
2603 self.client.sendcmd('eprt |%s|%s|%s|' % (self.other_proto,
2604 self.server.host, self.server.port))
2605 except ftplib.error_perm, err:
2606 self.assertEqual(str(err)[0:3], "522")
2608 self.fail("Exception not raised")
2611 msg = "501 Invalid EPRT format."
2613 self.assertEqual(self.cmdresp('eprt ||||'), msg)
2615 self.assertEqual(self.cmdresp('eprt ||'), msg)
2617 self.assertEqual(self.cmdresp('eprt |%s|%s|65536|' % (self.proto,
2620 self.assertEqual(self.cmdresp('eprt |%s|%s|-1|' % (self.proto,
2623 self.assertEqual(self.cmdresp('eprt |%s|%s|222|' % (self.proto,
2624 self.HOST)), "501 Can't connect over a privileged port.")
2626 _cmd = 'eprt |3|%s|%s|' % (self.server.host, self.server.port)
2627 self.assertRaises(ftplib.error_perm, self.client.sendcmd, _cmd)
2630 if self.proto == '1':
2632 self.assertEqual(self.cmdresp('eprt |1|1.2.3.4.5|2048|'), msg)
2634 self.assertEqual(self.cmdresp('eprt |1|1.2.3.256|2048|'), msg)
2636 resp = self.cmdresp('eprt |2|1.2.3.256|2048|')
2637 self.assert_("Network protocol not supported" in resp)
2640 sock = socket.socket(self.client.af)
2641 sock.bind((self.client.sock.getsockname()[0], 0))
2644 ip, port = sock.getsockname()[:2]
2645 self.client.sendcmd('eprt |%s|%s|%s|' % (self.proto, ip, port))
2649 except socket.timeout:
2650 self.fail("Server didn't connect to passive socket")
2654 def test_epsv(self):
2657 self.client.sendcmd('epsv ' + self.other_proto)
2658 except ftplib.error_perm, err:
2659 self.assertEqual(str(err)[0:3], "522")
2661 self.fail("Exception not raised")
2664 self.assertRaises(ftplib.error_perm, self.client.sendcmd, 'epsv 3')
2667 for cmd in ('EPSV', 'EPSV ' + self.proto):
2668 host, port = ftplib.parse229(self.client.sendcmd(cmd),
2669 self.client.sock.getpeername())
2670 s = socket.socket(self.client.af, socket.SOCK_STREAM)
2673 s.connect((host, port))
2674 self.client.sendcmd('abor')
2678 def test_epsv_all(self):
2679 self.client.sendcmd('epsv all')
2680 self.assertRaises(ftplib.error_perm, self.client.sendcmd, 'pasv')
2681 self.assertRaises(ftplib.error_perm, self.client.sendport, self.HOST, 2000)
2682 self.assertRaises(ftplib.error_perm, self.client.sendcmd,
2683 'eprt |%s|%s|%s|' % (self.proto, self.HOST, 2000))
2686 class TestIPv4Environment(_TestNetworkProtocols):
2687 """Test PASV, EPSV, PORT and EPRT commands.
2689 Runs tests contained in _TestNetworkProtocols class by using IPv4
2690 plus some additional specific tests.
2693 client_class = ftplib.FTP
2696 def test_port_v4(self):
2698 sock = self.client.makeport()
2699 self.client.sendcmd('abor')
2701 # test bad arguments
2702 ae = self.assertEqual
2703 msg = "501 Invalid PORT format."
2704 ae(self.cmdresp('port 127,0,0,1,1.1'), msg) # sep != ','
2705 ae(self.cmdresp('port X,0,0,1,1,1'), msg) # value != int
2706 ae(self.cmdresp('port 127,0,0,1,1,1,1'), msg) # len(args) > 6
2707 ae(self.cmdresp('port 127,0,0,1'), msg) # len(args) < 6
2708 ae(self.cmdresp('port 256,0,0,1,1,1'), msg) # oct > 255
2709 ae(self.cmdresp('port 127,0,0,1,256,1'), msg) # port > 65535
2710 ae(self.cmdresp('port 127,0,0,1,-1,0'), msg) # port < 0
2711 msg = "501 Can't connect over a privileged port."
2712 ae(self.cmdresp('port %s,1,1' % self.HOST.replace('.',',')),msg) # port < 1024
2713 if "1.2.3.4" != self.HOST:
2714 msg = "501 Can't connect to a foreign address."
2715 ae(self.cmdresp('port 1,2,3,4,4,4'), msg)
2717 def test_eprt_v4(self):
2718 self.assertEqual(self.cmdresp('eprt |1|0.10.10.10|2222|'),
2719 "501 Can't connect to a foreign address.")
2721 def test_pasv_v4(self):
2722 host, port = ftplib.parse227(self.client.sendcmd('pasv'))
2723 s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
2726 s.connect((host, port))
2731 class TestIPv6Environment(_TestNetworkProtocols):
2732 """Test PASV, EPSV, PORT and EPRT commands.
2734 Runs tests contained in _TestNetworkProtocols class by using IPv6
2735 plus some additional specific tests.
2738 client_class = ftplib.FTP
2741 def test_port_v6(self):
2742 # PORT is not supposed to work
2743 self.assertRaises(ftplib.error_perm, self.client.sendport,
2744 self.server.host, self.server.port)
2746 def test_pasv_v6(self):
2747 # PASV is still supposed to work to support clients using
2748 # IPv4 connecting to a server supporting both IPv4 and IPv6
2749 self.client.makepasv()
2751 def test_eprt_v6(self):
2752 self.assertEqual(self.cmdresp('eprt |2|::foo|2222|'),
2753 "501 Can't connect to a foreign address.")
2756 class TestIPv6MixedEnvironment(unittest.TestCase):
2757 """By running the server by specifying "::" as IP address the
2758 server is supposed to listen on all interfaces, supporting both
2759 IPv4 and IPv6 by using a single socket.
2761 What we are going to do here is starting the server in this
2762 manner and try to connect by using an IPv4 client.
2765 client_class = ftplib.FTP
2769 self.server = self.server_class(self.HOST)
2774 if self.client is not None:
2778 def test_port_v4(self):
2780 self.client = self.client_class()
2781 self.client.connect('127.0.0.1', self.server.port)
2782 self.client.set_pasv(False)
2783 self.client.sock.settimeout(2)
2784 self.client.login(USER, PASSWD)
2785 self.client.retrlines('list', noop)
2787 def test_pasv_v4(self):
2789 self.client = self.client_class()
2790 self.client.connect('127.0.0.1', self.server.port)
2791 self.client.set_pasv(True)
2792 self.client.sock.settimeout(2)
2793 self.client.login(USER, PASSWD)
2794 self.client.retrlines('list', noop)
2795 # make sure pasv response doesn't return an IPv4-mapped address
2796 ip = self.client.makepasv()[0]
2797 self.assertFalse(ip.startswith("::ffff:"))
2800 class TestCornerCases(unittest.TestCase):
2801 """Tests for any kind of strange situation for the server to be in,
2802 mainly referring to bugs signaled on the bug tracker.
2805 client_class = ftplib.FTP
2808 self.server = self.server_class()
2810 self.client = self.client_class()
2811 self.client.connect(self.server.host, self.server.port)
2812 self.client.sock.settimeout(2)
2813 self.client.login(USER, PASSWD)
2817 if self.server.running:
2820 def test_port_race_condition(self):
2821 # Refers to bug #120, first sends PORT, then disconnects the
2822 # control channel before accept()ing the incoming data connection.
2823 # The original server behavior was to reply with "200 Active
2824 # data connection established" *after* the client had already
2825 # disconnected the control connection.
2826 sock = socket.socket(self.client.af)
2827 sock.bind((self.client.sock.getsockname()[0], 0))
2830 host, port = sock.getsockname()[:2]
2832 hbytes = host.split('.')
2833 pbytes = [repr(port // 256), repr(port % 256)]
2834 bytes = hbytes + pbytes
2835 cmd = 'PORT ' + ','.join(bytes)
2836 self.client.sock.sendall(cmd + '\r\n')
2841 def test_stou_max_tries(self):
2842 # Emulates case where the max number of tries to find out a
2843 # unique file name when processing STOU command gets hit.
2845 class TestFS(ftpserver.AbstractedFS):
2846 def mkstemp(self, *args, **kwargs):
2847 raise IOError(errno.EEXIST, "No usable temporary file name found")
2849 self.server.handler.abstracted_fs = TestFS
2852 self.client.connect(self.server.host, self.server.port)
2853 self.client.login(USER, PASSWD)
2854 self.assertRaises(ftplib.error_temp, self.client.sendcmd, 'stou')
2856 self.server.handler.abstracted_fs = ftpserver.AbstractedFS
2858 def test_quick_connect(self):
2859 # Clients that connected and disconnected quickly could cause
2860 # the server to crash, due to a failure to catch errors in the
2861 # initial part of the connection process.
2862 # Tracked in issues #91, #104 and #105.
2863 # See also https://bugs.launchpad.net/zodb/+bug/135108
2868 s.setsockopt(socket.SOL_SOCKET, socket.SO_LINGER,
2869 struct.pack('ii', 1, 0))
2872 except socket.error:
2876 for x in xrange(10):
2877 connect((self.server.host, self.server.port))
2878 for x in xrange(10):
2879 addr = self.client.makepasv()
2882 def test_error_on_callback(self):
2883 # test that the server do not crash in case an error occurs
2884 # while firing a scheduled function
2887 original_logerror = ftpserver.logerror
2888 ftpserver.logerror = lambda msg: flag.append(msg)
2889 server = ftpserver.FTPServer((HOST, 0), ftpserver.FTPHandler)
2891 len1 = len(asyncore.socket_map)
2892 ftpserver.CallLater(0, lambda: 1 // 0)
2893 server.serve_forever(timeout=0, count=1)
2894 len2 = len(asyncore.socket_map)
2895 self.assertEqual(len1, len2)
2896 self.assertTrue(flag)
2898 ftpserver.logerror = original_logerror
2901 def test_active_conn_error(self):
2902 # we open a socket() but avoid to invoke accept() to
2903 # reproduce this error condition:
2904 # http://code.google.com/p/pyftpdlib/source/detail?r=905
2905 sock = socket.socket()
2906 sock.bind((HOST, 0))
2907 port = sock.getsockname()[1]
2908 self.client.sock.settimeout(.1)
2910 resp = self.client.sendport(HOST, port)
2911 except ftplib.error_temp, err:
2912 self.assertEqual(str(err)[:3], '425')
2913 except socket.timeout:
2916 self.assertNotEqual(str(resp)[:3], '200')
2919 class TestUnicodePathNames(unittest.TestCase):
2920 """Test FTP commands and responses by using path names with non
2924 client_class = ftplib.FTP
2927 self.server = self.server_class()
2929 self.client = self.client_class()
2930 self.client.connect(self.server.host, self.server.port)
2931 self.client.sock.settimeout(2)
2932 self.client.login(USER, PASSWD)
2933 self.tempfile = os.path.basename(touch(TESTFN + '☃'))
2934 self.tempdir = TESTFN + '☺'
2935 os.mkdir(self.tempdir)
2940 safe_remove(self.tempfile)
2941 if os.path.exists(self.tempdir):
2942 shutil.rmtree(self.tempdir)
2947 resp = self.client.cwd(self.tempdir)
2948 self.assertTrue(self.tempdir in resp)
2951 os.rmdir(self.tempdir)
2952 dirname = self.client.mkd(self.tempdir)
2953 self.assertEqual(dirname, '/' + self.tempdir)
2954 self.assertTrue(os.path.isdir(self.tempdir))
2956 def test_rmdir(self):
2957 self.client.rmd(self.tempdir)
2959 def test_dele(self):
2960 self.client.delete(self.tempfile)
2961 self.assertFalse(os.path.exists(self.tempfile))
2963 def test_rnfr_rnto(self):
2964 tempname = TESTFN + '♥'
2967 self.client.rename(self.tempfile, tempname)
2968 self.assertTrue(os.path.isfile(tempname))
2969 self.client.rename(tempname, self.tempfile)
2971 self.client.rename(self.tempdir, tempname)
2972 self.assertTrue(os.path.isdir(tempname))
2973 self.client.rename(tempname, self.tempdir)
2975 safe_remove(tempname)
2976 safe_rmdir(tempname)
2978 def test_size(self):
2979 self.client.sendcmd('type i')
2980 self.client.sendcmd('size ' + self.tempfile)
2982 def test_mdtm(self):
2983 self.client.sendcmd('mdtm ' + self.tempfile)
2985 def test_stou(self):
2986 resp = self.client.sendcmd('stou ' + self.tempfile)
2988 self.assertTrue(self.tempfile in resp)
2991 os.remove(resp.rsplit(' ', 1)[1])
2993 if hasattr(os, 'chmod'):
2994 def test_site_chmod(self):
2995 self.client.sendcmd('site chmod 777 ' + self.tempfile)
2999 def _test_listing_cmds(self, cmd):
3001 touch(os.path.join(self.tempdir, self.tempfile))
3002 self.client.retrlines("%s %s" % (cmd, self.tempdir), ls.append)
3003 self.assertTrue(self.tempfile in ls[0])
3005 def test_list(self):
3006 self._test_listing_cmds('list')
3008 def test_nlst(self):
3009 self._test_listing_cmds('nlst')
3011 def test_mlsd(self):
3012 self._test_listing_cmds('mlsd')
3014 def test_mlst(self):
3015 # utility function for extracting the line of interest
3016 mlstline = lambda cmd: self.client.voidcmd(cmd).split('\n')[1]
3017 self.assertTrue('type=dir' in mlstline('mlst ' + self.tempdir))
3018 self.assertTrue('/' + self.tempdir in mlstline('mlst ' + self.tempdir))
3019 self.assertTrue('type=file' in mlstline('mlst ' + self.tempfile))
3020 self.assertTrue('/' + self.tempfile in mlstline('mlst ' + self.tempfile))
3024 def test_stor(self):
3025 data = 'abcde12345' * 500
3026 os.remove(self.tempfile)
3027 dummy = StringIO.StringIO()
3030 self.client.storbinary('stor ' + self.tempfile, dummy)
3031 dummy_recv = StringIO.StringIO()
3032 self.client.retrbinary('retr ' + self.tempfile, dummy_recv.write)
3034 self.assertEqual(dummy_recv.read(), data)
3036 def test_retr(self):
3037 data = 'abcd1234' * 500
3038 f = open(self.tempfile, 'wb')
3041 dummy = StringIO.StringIO()
3042 self.client.retrbinary('retr ' + self.tempfile, dummy.write)
3044 self.assertEqual(dummy.read(), data)
3047 def test_encode_decode(self):
3048 self.client.sendcmd('type i')
3049 self.client.sendcmd('size ' + self.tempfile.decode('utf8').encode('utf8'))
3052 class TestCommandLineParser(unittest.TestCase):
3053 """Test command line parser."""
3058 class DummyFTPServer(ftpserver.FTPServer):
3059 """An overridden version of FTPServer class which forces
3060 serve_forever() to return immediately.
3062 def serve_forever(self, *args, **kwargs):
3065 self.devnull = StringIO.StringIO()
3066 sys.argv = self.SYSARGV[:]
3067 sys.stderr = self.STDERR
3068 self.original_ftpserver_class = ftpserver.FTPServer
3069 ftpserver.FTPServer = DummyFTPServer
3072 self.devnull.close()
3073 sys.argv = self.SYSARGV[:]
3074 sys.stderr = self.STDERR
3075 ftpserver.FTPServer = self.original_ftpserver_class
3078 def test_a_option(self):
3079 sys.argv += ["-i", "localhost", "-p", "0"]
3081 sys.argv = self.SYSARGV[:]
3085 sys.stderr = self.devnull
3086 self.assertRaises(SystemExit, ftpserver.main)
3088 def test_p_option(self):
3089 sys.argv += ["-p", "0"]
3093 sys.argv = self.SYSARGV[:]
3095 sys.stderr = self.devnull
3096 self.assertRaises(SystemExit, ftpserver.main)
3099 sys.argv += ["-p foo"]
3100 self.assertRaises(SystemExit, ftpserver.main)
3102 def test_w_option(self):
3103 sys.argv += ["-w", "-p", "0"]
3104 warnings.filterwarnings("error")
3106 self.assertRaises(RuntimeWarning, ftpserver.main)
3108 warnings.resetwarnings()
3110 # unexpected argument
3111 sys.argv = self.SYSARGV[:]
3112 sys.argv += ["-w foo"]
3113 sys.stderr = self.devnull
3114 self.assertRaises(SystemExit, ftpserver.main)
3116 def test_d_option(self):
3117 sys.argv += ["-d", TESTFN, "-p", "0"]
3118 if not os.path.isdir(TESTFN):
3123 sys.argv = self.SYSARGV[:]
3125 sys.stderr = self.devnull
3126 self.assertRaises(SystemExit, ftpserver.main)
3129 sys.argv = self.SYSARGV[:]
3130 sys.argv += ["-d %s" % TESTFN]
3132 self.assertRaises(ValueError, ftpserver.main)
3134 def test_r_option(self):
3135 sys.argv += ["-r 60000-61000", "-p", "0"]
3139 sys.argv = self.SYSARGV[:]
3141 sys.stderr = self.devnull
3142 self.assertRaises(SystemExit, ftpserver.main)
3145 sys.argv = self.SYSARGV[:]
3146 sys.argv += ["-r yyy-zzz"]
3147 self.assertRaises(SystemExit, ftpserver.main)
3149 def test_v_option(self):
3151 self.assertRaises(SystemExit, ftpserver.main)
3153 # unexpected argument
3154 sys.argv = self.SYSARGV[:]
3155 sys.argv += ["-v foo"]
3156 sys.stderr = self.devnull
3157 self.assertRaises(SystemExit, ftpserver.main)
3160 def test_main(tests=None):
3161 test_suite = unittest.TestSuite()
3165 TestDummyAuthorizer,
3168 TestFtpAuthentication,
3170 TestFtpCmdsSemantic,
3171 TestFtpFsOperations,
3173 TestFtpRetrieveData,
3177 TestConfigurableOptions,
3180 TestUnicodePathNames,
3181 TestCommandLineParser,
3184 tests.append(TestIPv4Environment)
3186 tests.append(TestIPv6Environment)
3187 if SUPPORTS_HYBRID_IPV6:
3188 tests.append(TestIPv6MixedEnvironment)
3189 if SUPPORTS_SENDFILE:
3190 tests.append(TestFtpRetrieveDataNoSendfile)
3191 tests.append(TestFtpStoreDataNoSendfile)
3193 if os.name == 'posix':
3194 atexit.register(warnings.warn, "couldn't run sendfile() tests",
3198 test_suite.addTest(unittest.makeSuite(test))
3200 unittest.TextTestRunner(verbosity=2).run(test_suite)
3202 # in case of KeyboardInterrupt grant that the threaded FTP
3203 # server running in background gets stopped
3204 asyncore.socket_map.clear()
3208 if __name__ == '__main__':