1ec6db6cad83319c90acdc0dc670d84c51ece99b
[platform/framework/web/crosswalk.git] / src / third_party / pyftpdlib / src / test / test_ftpd.py
1 #!/usr/bin/env python
2 # -*- coding: utf-8 -*-
3
4 # $Id: test_ftpd.py 976 2012-01-22 22:39:10Z g.rodola $
5
6 #  ======================================================================
7 #  Copyright (C) 2007-2012 Giampaolo Rodola' <g.rodola@gmail.com>
8 #
9 #                         All Rights Reserved
10 #
11 # Permission is hereby granted, free of charge, to any person
12 # obtaining a copy of this software and associated documentation
13 # files (the "Software"), to deal in the Software without
14 # restriction, including without limitation the rights to use,
15 # copy, modify, merge, publish, distribute, sublicense, and/or sell
16 # copies of the Software, and to permit persons to whom the
17 # Software is furnished to do so, subject to the following
18 # conditions:
19 #
20 # The above copyright notice and this permission notice shall be
21 # included in all copies or substantial portions of the Software.
22 #
23 # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
24 # EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
25 # OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
26 # NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
27 # HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
28 # WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
29 # FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
30 # OTHER DEALINGS IN THE SOFTWARE.
31 #
32 #  ======================================================================
33
34
35 # This test suite has been run successfully on the following systems:
36 #
37 # -----------------------------------------------------------
38 #  System                          | Python version
39 # -----------------------------------------------------------
40 #  Linux Ubuntu 2.6.20-15          | 2.4, 2.5, 2.6, 2.7
41 #  Linux Kubuntu 8.04 32 & 64 bits | 2.5.2
42 #  Windows XP prof SP3             | 2.4, 2.5, 2.6.1
43 #  Windows Vista Ultimate 64 bit   | 2.6, 2.7
44 #  Windows Server 2008 64bit       | 2.5.1
45 #  Windows Mobile 6.1              | PythonCE 2.5
46 #  OS X 10.4.10                    | 2.4, 2.5, 2.7
47 #  FreeBSD 7.0                     | 2.4, 2.5, 2.7
48 # -----------------------------------------------------------
49
50
51 import threading
52 import unittest
53 import socket
54 import os
55 import shutil
56 import time
57 import re
58 import tempfile
59 import ftplib
60 import random
61 import warnings
62 import sys
63 import errno
64 import asyncore
65 import atexit
66 import stat
67 try:
68     import cStringIO as StringIO
69 except ImportError:
70     import StringIO
71 try:
72     import ssl
73 except ImportError:
74     ssl = None
75 try:
76     import sendfile
77 except ImportError:
78     sendfile = None
79
80 from pyftpdlib import ftpserver
81
82
83 # Attempt to use IP rather than hostname (test suite will run a lot faster)
84 try:
85     HOST = socket.gethostbyname('localhost')
86 except socket.error:
87     HOST = 'localhost'
88 USER = 'user'
89 PASSWD = '12345'
90 HOME = os.getcwd()
91 TESTFN = 'tmp-pyftpdlib'
92
93 def try_address(host, port=0, family=socket.AF_INET):
94     """Try to bind a socket on the given host:port and return True
95     if that has been possible."""
96     try:
97         sock = socket.socket(family)
98         sock.bind((host, port))
99     except (socket.error, socket.gaierror):
100         return False
101     else:
102         sock.close()
103         return True
104
105 def support_hybrid_ipv6():
106     """Return True if it is possible to use hybrid IPv6/IPv4 sockets
107     on this platform.
108     """
109     # IPPROTO_IPV6 constant is broken, see: http://bugs.python.org/issue6926
110     IPPROTO_IPV6 = getattr(socket, "IPV6_V6ONLY", 41)
111     IPV6_V6ONLY = getattr(socket, "IPV6_V6ONLY", 26)
112     sock = socket.socket(socket.AF_INET6)
113     try:
114         try:
115             return not sock.getsockopt(IPPROTO_IPV6, IPV6_V6ONLY)
116         except socket.error:
117             return False
118     finally:
119         sock.close()
120
121
122 SUPPORTS_IPV4 = try_address('127.0.0.1')
123 SUPPORTS_IPV6 = socket.has_ipv6 and try_address('::1', family=socket.AF_INET6)
124 SUPPORTS_HYBRID_IPV6 = SUPPORTS_IPV6 and support_hybrid_ipv6()
125 SUPPORTS_SENDFILE = sendfile is not None
126
127 def safe_remove(*files):
128     "Convenience function for removing temporary test files"
129     for file in files:
130         try:
131             os.remove(file)
132         except OSError, err:
133             if err.errno != errno.ENOENT:
134                 raise
135
136 def safe_rmdir(dir):
137     "Convenience function for removing temporary test directories"
138     try:
139         os.rmdir(dir)
140     except OSError, err:
141         if err.errno != errno.ENOENT:
142             raise
143
144 def touch(name):
145     """Create a file and return its name."""
146     assert not os.path.isfile(name), name
147     f = open(name, 'w')
148     try:
149         return f.name
150     finally:
151         f.close()
152
153 def onexit():
154     """Convenience function for removing temporary files and
155     directories on interpreter exit.
156     Also closes all sockets/instances left behind in asyncore
157     socket map (if any).
158     """
159     for name in os.listdir('.'):
160         if name.startswith(tempfile.template):
161             if os.path.isdir(name):
162                 shutil.rmtree(name)
163             else:
164                 os.remove(name)
165     map = asyncore.socket_map
166     for x in map.values():
167         try:
168             sys.stderr.write("garbage: %s\n" % repr(x))
169             x.close()
170         except:
171             pass
172     map.clear()
173
174
175 # commented out as per bug http://bugs.python.org/issue10354
176 #tempfile.template = 'tmp-pyftpdlib'
177 atexit.register(onexit)
178
179 # lower this threshold so that the scheduler internal queue
180 # gets re-heapified more often
181 ftpserver._scheduler.cancellations_threshold = 5
182
183
184 class FTPd(threading.Thread):
185     """A threaded FTP server used for running tests.
186
187     This is basically a modified version of the FTPServer class which
188     wraps the polling loop into a thread.
189
190     The instance returned can be used to start(), stop() and
191     eventually re-start() the server.
192     """
193     handler = ftpserver.FTPHandler
194
195     def __init__(self, host=HOST, port=0, verbose=False):
196         threading.Thread.__init__(self)
197         self.__serving = False
198         self.__stopped = False
199         self.__lock = threading.Lock()
200         self.__flag = threading.Event()
201
202         if not verbose:
203             ftpserver.log = ftpserver.logline = lambda x: x
204
205         # this makes the threaded server raise an actual exception
206         # instead of just logging its traceback
207         def logerror(msg):
208             raise
209         ftpserver.logerror = logerror
210         authorizer = ftpserver.DummyAuthorizer()
211         authorizer.add_user(USER, PASSWD, HOME, perm='elradfmwM')  # full perms
212         authorizer.add_anonymous(HOME)
213         self.handler.authorizer = authorizer
214         self.server = ftpserver.FTPServer((host, port), self.handler)
215         self.host, self.port = self.server.socket.getsockname()[:2]
216
217     def __repr__(self):
218         status = [self.__class__.__module__ + "." + self.__class__.__name__]
219         if self.__serving:
220             status.append('active')
221         else:
222             status.append('inactive')
223         status.append('%s:%s' % self.server.socket.getsockname()[:2])
224         return '<%s at %#x>' % (' '.join(status), id(self))
225
226     @property
227     def running(self):
228         return self.__serving
229
230     def start(self, timeout=0.001, use_poll=False):
231         """Start serving until an explicit stop() request.
232         Polls for shutdown every 'timeout' seconds.
233         """
234         if self.__serving:
235             raise RuntimeError("Server already started")
236         if self.__stopped:
237             # ensure the server can be started again
238             FTPd.__init__(self, self.server.socket.getsockname(), self.handler)
239         self.__timeout = timeout
240         self.__use_poll = use_poll
241         threading.Thread.start(self)
242         self.__flag.wait()
243
244     def run(self):
245         self.__serving = True
246         self.__flag.set()
247         while self.__serving and asyncore.socket_map:
248             self.__lock.acquire()
249             self.server.serve_forever(timeout=self.__timeout, count=1,
250                                       use_poll=self.__use_poll)
251             self.__lock.release()
252         self.server.close_all()
253
254     def stop(self):
255         """Stop serving (also disconnecting all currently connected
256         clients) by telling the serve_forever() loop to stop and
257         waits until it does.
258         """
259         if not self.__serving:
260             raise RuntimeError("Server not started yet")
261         self.__serving = False
262         self.__stopped = True
263         self.join()
264
265
266 class TestAbstractedFS(unittest.TestCase):
267     """Test for conversion utility methods of AbstractedFS class."""
268
269     def setUp(self):
270         safe_remove(TESTFN)
271
272     tearDown = setUp
273
274     def test_ftpnorm(self):
275         # Tests for ftpnorm method.
276         ae = self.assertEquals
277         fs = ftpserver.AbstractedFS('/', None)
278
279         fs._cwd = '/'
280         ae(fs.ftpnorm(''), '/')
281         ae(fs.ftpnorm('/'), '/')
282         ae(fs.ftpnorm('.'), '/')
283         ae(fs.ftpnorm('..'), '/')
284         ae(fs.ftpnorm('a'), '/a')
285         ae(fs.ftpnorm('/a'), '/a')
286         ae(fs.ftpnorm('/a/'), '/a')
287         ae(fs.ftpnorm('a/..'), '/')
288         ae(fs.ftpnorm('a/b'), '/a/b')
289         ae(fs.ftpnorm('a/b/..'), '/a')
290         ae(fs.ftpnorm('a/b/../..'), '/')
291         fs._cwd = '/sub'
292         ae(fs.ftpnorm(''), '/sub')
293         ae(fs.ftpnorm('/'), '/')
294         ae(fs.ftpnorm('.'), '/sub')
295         ae(fs.ftpnorm('..'), '/')
296         ae(fs.ftpnorm('a'), '/sub/a')
297         ae(fs.ftpnorm('a/'), '/sub/a')
298         ae(fs.ftpnorm('a/..'), '/sub')
299         ae(fs.ftpnorm('a/b'), '/sub/a/b')
300         ae(fs.ftpnorm('a/b/'), '/sub/a/b')
301         ae(fs.ftpnorm('a/b/..'), '/sub/a')
302         ae(fs.ftpnorm('a/b/../..'), '/sub')
303         ae(fs.ftpnorm('a/b/../../..'), '/')
304         ae(fs.ftpnorm('//'), '/')  # UNC paths must be collapsed
305
306     def test_ftp2fs(self):
307         # Tests for ftp2fs method.
308         ae = self.assertEquals
309         fs = ftpserver.AbstractedFS('/', None)
310         join = lambda x, y: os.path.join(x, y.replace('/', os.sep))
311
312         def goforit(root):
313             fs._root = root
314             fs._cwd = '/'
315             ae(fs.ftp2fs(''), root)
316             ae(fs.ftp2fs('/'), root)
317             ae(fs.ftp2fs('.'), root)
318             ae(fs.ftp2fs('..'), root)
319             ae(fs.ftp2fs('a'), join(root, 'a'))
320             ae(fs.ftp2fs('/a'), join(root, 'a'))
321             ae(fs.ftp2fs('/a/'), join(root, 'a'))
322             ae(fs.ftp2fs('a/..'), root)
323             ae(fs.ftp2fs('a/b'), join(root, r'a/b'))
324             ae(fs.ftp2fs('/a/b'), join(root, r'a/b'))
325             ae(fs.ftp2fs('/a/b/..'), join(root, 'a'))
326             ae(fs.ftp2fs('/a/b/../..'), root)
327             fs._cwd = '/sub'
328             ae(fs.ftp2fs(''), join(root, 'sub'))
329             ae(fs.ftp2fs('/'), root)
330             ae(fs.ftp2fs('.'), join(root, 'sub'))
331             ae(fs.ftp2fs('..'), root)
332             ae(fs.ftp2fs('a'), join(root, 'sub/a'))
333             ae(fs.ftp2fs('a/'), join(root, 'sub/a'))
334             ae(fs.ftp2fs('a/..'), join(root, 'sub'))
335             ae(fs.ftp2fs('a/b'), join(root, 'sub/a/b'))
336             ae(fs.ftp2fs('a/b/..'), join(root, 'sub/a'))
337             ae(fs.ftp2fs('a/b/../..'), join(root, 'sub'))
338             ae(fs.ftp2fs('a/b/../../..'), root)
339             ae(fs.ftp2fs('//a'), join(root, 'a'))  # UNC paths must be collapsed
340
341         if os.sep == '\\':
342             goforit(r'C:\dir')
343             goforit('C:\\')
344             # on DOS-derived filesystems (e.g. Windows) this is the same
345             # as specifying the current drive directory (e.g. 'C:\\')
346             goforit('\\')
347         elif os.sep == '/':
348             goforit('/home/user')
349             goforit('/')
350         else:
351             # os.sep == ':'? Don't know... let's try it anyway
352             goforit(os.getcwd())
353
354     def test_fs2ftp(self):
355         # Tests for fs2ftp method.
356         ae = self.assertEquals
357         fs = ftpserver.AbstractedFS('/', None)
358         join = lambda x, y: os.path.join(x, y.replace('/', os.sep))
359
360         def goforit(root):
361             fs._root = root
362             ae(fs.fs2ftp(root), '/')
363             ae(fs.fs2ftp(join(root, '/')), '/')
364             ae(fs.fs2ftp(join(root, '.')), '/')
365             ae(fs.fs2ftp(join(root, '..')), '/')  # can't escape from root
366             ae(fs.fs2ftp(join(root, 'a')), '/a')
367             ae(fs.fs2ftp(join(root, 'a/')), '/a')
368             ae(fs.fs2ftp(join(root, 'a/..')), '/')
369             ae(fs.fs2ftp(join(root, 'a/b')), '/a/b')
370             ae(fs.fs2ftp(join(root, 'a/b')), '/a/b')
371             ae(fs.fs2ftp(join(root, 'a/b/..')), '/a')
372             ae(fs.fs2ftp(join(root, '/a/b/../..')), '/')
373             fs._cwd = '/sub'
374             ae(fs.fs2ftp(join(root, 'a/')), '/a')
375
376         if os.sep == '\\':
377             goforit(r'C:\dir')
378             goforit('C:\\')
379             # on DOS-derived filesystems (e.g. Windows) this is the same
380             # as specifying the current drive directory (e.g. 'C:\\')
381             goforit('\\')
382             fs._root = r'C:\dir'
383             ae(fs.fs2ftp('C:\\'), '/')
384             ae(fs.fs2ftp('D:\\'), '/')
385             ae(fs.fs2ftp('D:\\dir'), '/')
386         elif os.sep == '/':
387             goforit('/')
388             if os.path.realpath('/__home/user') != '/__home/user':
389                 self.fail('Test skipped (symlinks not allowed).')
390             goforit('/__home/user')
391             fs._root = '/__home/user'
392             ae(fs.fs2ftp('/__home'), '/')
393             ae(fs.fs2ftp('/'), '/')
394             ae(fs.fs2ftp('/__home/userx'), '/')
395         else:
396             # os.sep == ':'? Don't know... let's try it anyway
397             goforit(os.getcwd())
398
399     def test_validpath(self):
400         # Tests for validpath method.
401         fs = ftpserver.AbstractedFS('/', None)
402         fs._root = HOME
403         self.assertTrue(fs.validpath(HOME))
404         self.assertTrue(fs.validpath(HOME + '/'))
405         self.assertFalse(fs.validpath(HOME + 'bar'))
406
407     if hasattr(os, 'symlink'):
408
409         def test_validpath_validlink(self):
410             # Test validpath by issuing a symlink pointing to a path
411             # inside the root directory.
412             fs = ftpserver.AbstractedFS('/', None)
413             fs._root = HOME
414             TESTFN2 = TESTFN + '1'
415             try:
416                 touch(TESTFN)
417                 os.symlink(TESTFN, TESTFN2)
418                 self.assertTrue(fs.validpath(TESTFN))
419             finally:
420                 safe_remove(TESTFN, TESTFN2)
421
422         def test_validpath_external_symlink(self):
423             # Test validpath by issuing a symlink pointing to a path
424             # outside the root directory.
425             fs = ftpserver.AbstractedFS('/', None)
426             fs._root = HOME
427             # tempfile should create our file in /tmp directory
428             # which should be outside the user root.  If it is
429             # not we just skip the test.
430             file = tempfile.NamedTemporaryFile()
431             try:
432                 if HOME == os.path.dirname(file.name):
433                     return
434                 os.symlink(file.name, TESTFN)
435                 self.assertFalse(fs.validpath(TESTFN))
436             finally:
437                 safe_remove(TESTFN)
438                 file.close()
439
440
441 class TestDummyAuthorizer(unittest.TestCase):
442     """Tests for DummyAuthorizer class."""
443
444     # temporarily change warnings to exceptions for the purposes of testing
445     def setUp(self):
446         self.tempdir = tempfile.mkdtemp(dir=HOME)
447         self.subtempdir = tempfile.mkdtemp(dir=os.path.join(HOME, self.tempdir))
448         self.tempfile = touch(os.path.join(self.tempdir, TESTFN))
449         self.subtempfile = touch(os.path.join(self.subtempdir, TESTFN))
450         warnings.filterwarnings("error")
451
452     def tearDown(self):
453         os.remove(self.tempfile)
454         os.remove(self.subtempfile)
455         os.rmdir(self.subtempdir)
456         os.rmdir(self.tempdir)
457         warnings.resetwarnings()
458
459     def assertRaisesWithMsg(self, excClass, msg, callableObj, *args, **kwargs):
460         try:
461             callableObj(*args, **kwargs)
462         except excClass, why:
463             if str(why) == msg:
464                 return
465             raise self.failureException("%s != %s" % (str(why), msg))
466         else:
467             if hasattr(excClass,'__name__'): excName = excClass.__name__
468             else: excName = str(excClass)
469             raise self.failureException, "%s not raised" % excName
470
471     def test_common_methods(self):
472         auth = ftpserver.DummyAuthorizer()
473         # create user
474         auth.add_user(USER, PASSWD, HOME)
475         auth.add_anonymous(HOME)
476         # check credentials
477         self.assertTrue(auth.validate_authentication(USER, PASSWD))
478         self.assertFalse(auth.validate_authentication(USER, 'wrongpwd'))
479         # remove them
480         auth.remove_user(USER)
481         auth.remove_user('anonymous')
482         # raise exc if user does not exists
483         self.assertRaises(KeyError, auth.remove_user, USER)
484         # raise exc if path does not exist
485         self.assertRaisesWithMsg(ValueError,
486                                 'no such directory: "%s"' % '?:\\',
487                                  auth.add_user, USER, PASSWD, '?:\\')
488         self.assertRaisesWithMsg(ValueError,
489                                 'no such directory: "%s"' % '?:\\',
490                                  auth.add_anonymous, '?:\\')
491         # raise exc if user already exists
492         auth.add_user(USER, PASSWD, HOME)
493         auth.add_anonymous(HOME)
494         self.assertRaisesWithMsg(ValueError,
495                                 'user "%s" already exists' % USER,
496                                  auth.add_user, USER, PASSWD, HOME)
497         self.assertRaisesWithMsg(ValueError,
498                                 'user "anonymous" already exists',
499                                  auth.add_anonymous, HOME)
500         auth.remove_user(USER)
501         auth.remove_user('anonymous')
502         # raise on wrong permission
503         self.assertRaisesWithMsg(ValueError,
504                                  'no such permission "?"',
505                                  auth.add_user, USER, PASSWD, HOME, perm='?')
506         self.assertRaisesWithMsg(ValueError,
507                                  'no such permission "?"',
508                                  auth.add_anonymous, HOME, perm='?')
509         # expect warning on write permissions assigned to anonymous user
510         for x in "adfmw":
511             self.assertRaisesWithMsg(RuntimeWarning,
512                                 "write permissions assigned to anonymous user.",
513                                 auth.add_anonymous, HOME, perm=x)
514
515     def test_override_perm_interface(self):
516         auth = ftpserver.DummyAuthorizer()
517         auth.add_user(USER, PASSWD, HOME, perm='elr')
518         # raise exc if user does not exists
519         self.assertRaises(KeyError, auth.override_perm, USER+'w', HOME, 'elr')
520         # raise exc if path does not exist or it's not a directory
521         self.assertRaisesWithMsg(ValueError,
522                                 'no such directory: "%s"' % '?:\\',
523                                 auth.override_perm, USER, '?:\\', 'elr')
524         self.assertRaisesWithMsg(ValueError,
525                                 'no such directory: "%s"' % self.tempfile,
526                                 auth.override_perm, USER, self.tempfile, 'elr')
527         # raise on wrong permission
528         self.assertRaisesWithMsg(ValueError,
529                                  'no such permission "?"', auth.override_perm,
530                                  USER, HOME, perm='?')
531         # expect warning on write permissions assigned to anonymous user
532         auth.add_anonymous(HOME)
533         for p in "adfmw":
534             self.assertRaisesWithMsg(RuntimeWarning,
535                                 "write permissions assigned to anonymous user.",
536                                 auth.override_perm, 'anonymous', HOME, p)
537         # raise on attempt to override home directory permissions
538         self.assertRaisesWithMsg(ValueError,
539                                  "can't override home directory permissions",
540                                  auth.override_perm, USER, HOME, perm='w')
541         # raise on attempt to override a path escaping home directory
542         if os.path.dirname(HOME) != HOME:
543             self.assertRaisesWithMsg(ValueError,
544                                      "path escapes user home directory",
545                                      auth.override_perm, USER,
546                                      os.path.dirname(HOME), perm='w')
547         # try to re-set an overridden permission
548         auth.override_perm(USER, self.tempdir, perm='w')
549         auth.override_perm(USER, self.tempdir, perm='wr')
550
551     def test_override_perm_recursive_paths(self):
552         auth = ftpserver.DummyAuthorizer()
553         auth.add_user(USER, PASSWD, HOME, perm='elr')
554         self.assertEqual(auth.has_perm(USER, 'w', self.tempdir), False)
555         auth.override_perm(USER, self.tempdir, perm='w', recursive=True)
556         self.assertEqual(auth.has_perm(USER, 'w', HOME), False)
557         self.assertEqual(auth.has_perm(USER, 'w', self.tempdir), True)
558         self.assertEqual(auth.has_perm(USER, 'w', self.tempfile), True)
559         self.assertEqual(auth.has_perm(USER, 'w', self.subtempdir), True)
560         self.assertEqual(auth.has_perm(USER, 'w', self.subtempfile), True)
561
562         self.assertEqual(auth.has_perm(USER, 'w', HOME + '@'), False)
563         self.assertEqual(auth.has_perm(USER, 'w', self.tempdir + '@'), False)
564         path = os.path.join(self.tempdir + '@', os.path.basename(self.tempfile))
565         self.assertEqual(auth.has_perm(USER, 'w', path), False)
566         # test case-sensitiveness
567         if (os.name in ('nt', 'ce')) or (sys.platform == 'cygwin'):
568             self.assertEqual(auth.has_perm(USER, 'w', self.tempdir.upper()), True)
569
570     def test_override_perm_not_recursive_paths(self):
571         auth = ftpserver.DummyAuthorizer()
572         auth.add_user(USER, PASSWD, HOME, perm='elr')
573         self.assertEqual(auth.has_perm(USER, 'w', self.tempdir), False)
574         auth.override_perm(USER, self.tempdir, perm='w')
575         self.assertEqual(auth.has_perm(USER, 'w', HOME), False)
576         self.assertEqual(auth.has_perm(USER, 'w', self.tempdir), True)
577         self.assertEqual(auth.has_perm(USER, 'w', self.tempfile), True)
578         self.assertEqual(auth.has_perm(USER, 'w', self.subtempdir), False)
579         self.assertEqual(auth.has_perm(USER, 'w', self.subtempfile), False)
580
581         self.assertEqual(auth.has_perm(USER, 'w', HOME + '@'), False)
582         self.assertEqual(auth.has_perm(USER, 'w', self.tempdir + '@'), False)
583         path = os.path.join(self.tempdir + '@', os.path.basename(self.tempfile))
584         self.assertEqual(auth.has_perm(USER, 'w', path), False)
585         # test case-sensitiveness
586         if (os.name in ('nt', 'ce')) or (sys.platform == 'cygwin'):
587             self.assertEqual(auth.has_perm(USER, 'w', self.tempdir.upper()), True)
588
589
590 class TestCallLater(unittest.TestCase):
591     """Tests for CallLater class."""
592
593     def setUp(self):
594         for task in ftpserver._scheduler._tasks:
595             if not task.cancelled:
596                 task.cancel()
597         del ftpserver._scheduler._tasks[:]
598
599     def scheduler(self, timeout=0.01, count=100):
600         while ftpserver._scheduler._tasks and count > 0:
601             ftpserver._scheduler()
602             count -= 1
603             time.sleep(timeout)
604
605     def test_interface(self):
606         fun = lambda: 0
607         self.assertRaises(AssertionError, ftpserver.CallLater, -1, fun)
608         x = ftpserver.CallLater(3, fun)
609         self.assertRaises(AssertionError, x.delay, -1)
610         self.assertEqual(x.cancelled, False)
611         x.cancel()
612         self.assertEqual(x.cancelled, True)
613         self.assertRaises(AssertionError, x.call)
614         self.assertRaises(AssertionError, x.reset)
615         self.assertRaises(AssertionError, x.delay, 2)
616         self.assertRaises(AssertionError, x.cancel)
617
618     def test_order(self):
619         l = []
620         fun = lambda x: l.append(x)
621         for x in [0.05, 0.04, 0.03, 0.02, 0.01]:
622             ftpserver.CallLater(x, fun, x)
623         self.scheduler()
624         self.assertEqual(l, [0.01, 0.02, 0.03, 0.04, 0.05])
625
626     def test_delay(self):
627         l = []
628         fun = lambda x: l.append(x)
629         ftpserver.CallLater(0.01, fun, 0.01).delay(0.07)
630         ftpserver.CallLater(0.02, fun, 0.02).delay(0.08)
631         ftpserver.CallLater(0.03, fun, 0.03)
632         ftpserver.CallLater(0.04, fun, 0.04)
633         ftpserver.CallLater(0.05, fun, 0.05)
634         ftpserver.CallLater(0.06, fun, 0.06).delay(0.001)
635         self.scheduler()
636         self.assertEqual(l, [0.06, 0.03, 0.04, 0.05, 0.01, 0.02])
637
638     # The test is reliable only on those systems where time.time()
639     # provides time with a better precision than 1 second.
640     if not str(time.time()).endswith('.0'):
641         def test_reset(self):
642             l = []
643             fun = lambda x: l.append(x)
644             ftpserver.CallLater(0.01, fun, 0.01)
645             ftpserver.CallLater(0.02, fun, 0.02)
646             ftpserver.CallLater(0.03, fun, 0.03)
647             x = ftpserver.CallLater(0.04, fun, 0.04)
648             ftpserver.CallLater(0.05, fun, 0.05)
649             time.sleep(0.1)
650             x.reset()
651             self.scheduler()
652             self.assertEqual(l, [0.01, 0.02, 0.03, 0.05, 0.04])
653
654     def test_cancel(self):
655         l = []
656         fun = lambda x: l.append(x)
657         ftpserver.CallLater(0.01, fun, 0.01).cancel()
658         ftpserver.CallLater(0.02, fun, 0.02)
659         ftpserver.CallLater(0.03, fun, 0.03)
660         ftpserver.CallLater(0.04, fun, 0.04)
661         ftpserver.CallLater(0.05, fun, 0.05).cancel()
662         self.scheduler()
663         self.assertEqual(l, [0.02, 0.03, 0.04])
664
665     def test_errback(self):
666         l = []
667         ftpserver.CallLater(0.0, lambda: 1//0, _errback=lambda: l.append(True))
668         self.scheduler()
669         self.assertEqual(l, [True])
670
671
672 class TestCallEvery(unittest.TestCase):
673     """Tests for CallEvery class."""
674
675     def setUp(self):
676         for task in ftpserver._scheduler._tasks:
677             if not task.cancelled:
678                 task.cancel()
679         del ftpserver._scheduler._tasks[:]
680
681     def scheduler(self, timeout=0.0001):
682         for x in range(100):
683             ftpserver._scheduler()
684             time.sleep(timeout)
685
686     def test_interface(self):
687         fun = lambda: 0
688         self.assertRaises(AssertionError, ftpserver.CallEvery, -1, fun)
689         x = ftpserver.CallEvery(3, fun)
690         self.assertRaises(AssertionError, x.delay, -1)
691         self.assertEqual(x.cancelled, False)
692         x.cancel()
693         self.assertEqual(x.cancelled, True)
694         self.assertRaises(AssertionError, x.call)
695         self.assertRaises(AssertionError, x.reset)
696         self.assertRaises(AssertionError, x.delay, 2)
697         self.assertRaises(AssertionError, x.cancel)
698
699     def test_only_once(self):
700         # make sure that callback is called only once per-loop
701         l1 = []
702         fun = lambda: l1.append(None)
703         ftpserver.CallEvery(0, fun)
704         ftpserver._scheduler()
705         self.assertEqual(l1, [None])
706
707     def test_multi_0_timeout(self):
708         # make sure a 0 timeout callback is called as many times
709         # as the number of loops
710         l = []
711         fun = lambda: l.append(None)
712         ftpserver.CallEvery(0, fun)
713         self.scheduler()
714         self.assertEqual(len(l), 100)
715
716     # run it on systems where time.time() has a higher precision
717     if os.name == 'posix':
718         def test_low_and_high_timeouts(self):
719             # make sure a callback with a lower timeout is called more
720             # frequently than another with a greater timeout
721             l1 = []
722             fun = lambda: l1.append(None)
723             ftpserver.CallEvery(0.001, fun)
724             self.scheduler()
725
726             l2 = []
727             fun = lambda: l2.append(None)
728             ftpserver.CallEvery(0.01, fun)
729             self.scheduler()
730
731             self.assertTrue(len(l1) > len(l2))
732
733     def test_cancel(self):
734         # make sure a cancelled callback doesn't get called anymore
735         l = []
736         fun = lambda: l.append(None)
737         call = ftpserver.CallEvery(0.001, fun)
738         self.scheduler()
739         len_l = len(l)
740         call.cancel()
741         self.scheduler()
742         self.assertEqual(len_l, len(l))
743
744     def test_errback(self):
745         l = []
746         ftpserver.CallEvery(0.0, lambda: 1//0, _errback=lambda: l.append(True))
747         self.scheduler()
748         self.assertTrue(l)
749
750
751 class TestFtpAuthentication(unittest.TestCase):
752     "test: USER, PASS, REIN."
753     server_class = FTPd
754     client_class = ftplib.FTP
755
756     def setUp(self):
757         self.server = self.server_class()
758         self.server.handler._auth_failed_timeout = 0
759         self.server.start()
760         self.client = self.client_class()
761         self.client.connect(self.server.host, self.server.port)
762         self.client.sock.settimeout(2)
763         self.file = open(TESTFN, 'w+b')
764         self.dummyfile = StringIO.StringIO()
765
766     def tearDown(self):
767         self.server.handler._auth_failed_timeout = 5
768         self.client.close()
769         self.server.stop()
770         if not self.file.closed:
771             self.file.close()
772         if not self.dummyfile.closed:
773             self.dummyfile.close()
774         os.remove(TESTFN)
775
776     def test_auth_ok(self):
777         self.client.login(user=USER, passwd=PASSWD)
778
779     def test_anon_auth(self):
780         self.client.login(user='anonymous', passwd='anon@')
781         self.client.login(user='anonymous', passwd='')
782         self.assertRaises(ftplib.error_perm, self.client.login, 'AnoNymouS')
783
784     def test_auth_failed(self):
785         self.assertRaises(ftplib.error_perm, self.client.login, USER, 'wrong')
786         self.assertRaises(ftplib.error_perm, self.client.login, 'wrong', PASSWD)
787         self.assertRaises(ftplib.error_perm, self.client.login, 'wrong', 'wrong')
788
789     def test_wrong_cmds_order(self):
790         self.assertRaises(ftplib.error_perm, self.client.sendcmd, 'pass ' + PASSWD)
791         self.client.login(user=USER, passwd=PASSWD)
792         self.assertRaises(ftplib.error_perm, self.client.sendcmd, 'pass ' + PASSWD)
793
794     def test_max_auth(self):
795         self.assertRaises(ftplib.error_perm, self.client.login, USER, 'wrong')
796         self.assertRaises(ftplib.error_perm, self.client.login, USER, 'wrong')
797         self.assertRaises(ftplib.error_perm, self.client.login, USER, 'wrong')
798         # If authentication fails for 3 times ftpd disconnects the
799         # client.  We can check if that happens by using self.client.sendcmd()
800         # on the 'dead' socket object.  If socket object is really
801         # closed it should be raised a socket.error exception (Windows)
802         # or a EOFError exception (Linux).
803         self.assertRaises((socket.error, EOFError), self.client.sendcmd, '')
804
805     def test_rein(self):
806         self.client.login(user=USER, passwd=PASSWD)
807         self.client.sendcmd('rein')
808         # user not authenticated, error response expected
809         self.assertRaises(ftplib.error_perm, self.client.sendcmd, 'pwd')
810         # by logging-in again we should be able to execute a
811         # file-system command
812         self.client.login(user=USER, passwd=PASSWD)
813         self.client.sendcmd('pwd')
814
815     def test_rein_during_transfer(self):
816         # Test REIN while already authenticated and a transfer is
817         # in progress.
818         self.client.login(user=USER, passwd=PASSWD)
819         data = 'abcde12345' * 1000000
820         self.file.write(data)
821         self.file.close()
822
823         conn = self.client.transfercmd('retr ' + TESTFN)
824         rein_sent = False
825         bytes_recv = 0
826         while 1:
827             chunk = conn.recv(8192)
828             if not chunk:
829                 break
830             bytes_recv += len(chunk)
831             self.dummyfile.write(chunk)
832             if bytes_recv > 65536 and not rein_sent:
833                 rein_sent = True
834                 # flush account, error response expected
835                 self.client.sendcmd('rein')
836                 self.assertRaises(ftplib.error_perm, self.client.dir)
837
838         # a 226 response is expected once tranfer finishes
839         self.assertEqual(self.client.voidresp()[:3], '226')
840         # account is still flushed, error response is still expected
841         self.assertRaises(ftplib.error_perm, self.client.sendcmd,
842                           'size ' + TESTFN)
843         # by logging-in again we should be able to execute a
844         # filesystem command
845         self.client.login(user=USER, passwd=PASSWD)
846         self.client.sendcmd('pwd')
847         self.dummyfile.seek(0)
848         self.assertEqual(hash(data), hash (self.dummyfile.read()))
849
850     def test_user(self):
851         # Test USER while already authenticated and no transfer
852         # is in progress.
853         self.client.login(user=USER, passwd=PASSWD)
854         self.client.sendcmd('user ' + USER)  # authentication flushed
855         self.assertRaises(ftplib.error_perm, self.client.sendcmd, 'pwd')
856         self.client.sendcmd('pass ' + PASSWD)
857         self.client.sendcmd('pwd')
858
859     def test_user_during_transfer(self):
860         # Test USER while already authenticated and a transfer is
861         # in progress.
862         self.client.login(user=USER, passwd=PASSWD)
863         data = 'abcde12345' * 1000000
864         self.file.write(data)
865         self.file.close()
866
867         conn = self.client.transfercmd('retr ' + TESTFN)
868         rein_sent = 0
869         bytes_recv = 0
870         while 1:
871             chunk = conn.recv(8192)
872             if not chunk:
873                 break
874             bytes_recv += len(chunk)
875             self.dummyfile.write(chunk)
876             # stop transfer while it isn't finished yet
877             if bytes_recv > 65536 and not rein_sent:
878                 rein_sent = True
879                 # flush account, expect an error response
880                 self.client.sendcmd('user ' + USER)
881                 self.assertRaises(ftplib.error_perm, self.client.dir)
882
883         # a 226 response is expected once transfer finishes
884         self.assertEqual(self.client.voidresp()[:3], '226')
885         # account is still flushed, error response is still expected
886         self.assertRaises(ftplib.error_perm, self.client.sendcmd, 'pwd')
887         # by logging-in again we should be able to execute a
888         # filesystem command
889         self.client.sendcmd('pass ' + PASSWD)
890         self.client.sendcmd('pwd')
891         self.dummyfile.seek(0)
892         self.assertEqual(hash(data), hash (self.dummyfile.read()))
893
894
895 class TestFtpDummyCmds(unittest.TestCase):
896     "test: TYPE, STRU, MODE, NOOP, SYST, ALLO, HELP, SITE HELP"
897     server_class = FTPd
898     client_class = ftplib.FTP
899
900     def setUp(self):
901         self.server = self.server_class()
902         self.server.start()
903         self.client = self.client_class()
904         self.client.connect(self.server.host, self.server.port)
905         self.client.sock.settimeout(2)
906         self.client.login(USER, PASSWD)
907
908     def tearDown(self):
909         self.client.close()
910         self.server.stop()
911
912     def test_type(self):
913         self.client.sendcmd('type a')
914         self.client.sendcmd('type i')
915         self.client.sendcmd('type l7')
916         self.client.sendcmd('type l8')
917         self.assertRaises(ftplib.error_perm, self.client.sendcmd, 'type ?!?')
918
919     def test_stru(self):
920         self.client.sendcmd('stru f')
921         self.client.sendcmd('stru F')
922         self.assertRaises(ftplib.error_perm, self.client.sendcmd, 'stru p')
923         self.assertRaises(ftplib.error_perm, self.client.sendcmd, 'stru r')
924         self.assertRaises(ftplib.error_perm, self.client.sendcmd, 'stru ?!?')
925
926     def test_mode(self):
927         self.client.sendcmd('mode s')
928         self.client.sendcmd('mode S')
929         self.assertRaises(ftplib.error_perm, self.client.sendcmd, 'mode b')
930         self.assertRaises(ftplib.error_perm, self.client.sendcmd, 'mode c')
931         self.assertRaises(ftplib.error_perm, self.client.sendcmd, 'mode ?!?')
932
933     def test_noop(self):
934         self.client.sendcmd('noop')
935
936     def test_syst(self):
937         self.client.sendcmd('syst')
938
939     def test_allo(self):
940         self.client.sendcmd('allo x')
941
942     def test_quit(self):
943         self.client.sendcmd('quit')
944
945     def test_help(self):
946         self.client.sendcmd('help')
947         cmd = random.choice(ftpserver.proto_cmds.keys())
948         self.client.sendcmd('help %s' % cmd)
949         self.assertRaises(ftplib.error_perm, self.client.sendcmd, 'help ?!?')
950
951     def test_site(self):
952         self.assertRaises(ftplib.error_perm, self.client.sendcmd, 'site')
953         self.assertRaises(ftplib.error_perm, self.client.sendcmd, 'site ?!?')
954         self.assertRaises(ftplib.error_perm, self.client.sendcmd, 'site foo bar')
955         self.assertRaises(ftplib.error_perm, self.client.sendcmd, 'sitefoo bar')
956
957     def test_site_help(self):
958         self.client.sendcmd('site help')
959         self.client.sendcmd('site help help')
960         self.assertRaises(ftplib.error_perm, self.client.sendcmd, 'site help ?!?')
961
962     def test_rest(self):
963         # Test error conditions only; resumed data transfers are
964         # tested later.
965         self.client.sendcmd('type i')
966         self.assertRaises(ftplib.error_perm, self.client.sendcmd, 'rest')
967         self.assertRaises(ftplib.error_perm, self.client.sendcmd, 'rest str')
968         self.assertRaises(ftplib.error_perm, self.client.sendcmd, 'rest -1')
969         self.assertRaises(ftplib.error_perm, self.client.sendcmd, 'rest 10.1')
970         # REST is not supposed to be allowed in ASCII mode
971         self.client.sendcmd('type a')
972         self.assertRaises(ftplib.error_perm, self.client.sendcmd, 'rest 10')
973
974     def test_opts_feat(self):
975         self.assertRaises(ftplib.error_perm, self.client.sendcmd, 'opts mlst bad_fact')
976         self.assertRaises(ftplib.error_perm, self.client.sendcmd, 'opts mlst type ;')
977         self.assertRaises(ftplib.error_perm, self.client.sendcmd, 'opts not_mlst')
978         # utility function which used for extracting the MLST "facts"
979         # string from the FEAT response
980         def mlst():
981             resp = self.client.sendcmd('feat')
982             return re.search(r'^\s*MLST\s+(\S+)$', resp, re.MULTILINE).group(1)
983         # we rely on "type", "perm", "size", and "modify" facts which
984         # are those available on all platforms
985         self.assertTrue('type*;perm*;size*;modify*;' in mlst())
986         self.assertEqual(self.client.sendcmd('opts mlst type;'), '200 MLST OPTS type;')
987         self.assertEqual(self.client.sendcmd('opts mLSt TypE;'), '200 MLST OPTS type;')
988         self.assertTrue('type*;perm;size;modify;' in mlst())
989
990         self.assertEqual(self.client.sendcmd('opts mlst'), '200 MLST OPTS ')
991         self.assertTrue(not '*' in mlst())
992
993         self.assertEqual(self.client.sendcmd('opts mlst fish;cakes;'), '200 MLST OPTS ')
994         self.assertTrue(not '*' in mlst())
995         self.assertEqual(self.client.sendcmd('opts mlst fish;cakes;type;'),
996                          '200 MLST OPTS type;')
997         self.assertTrue('type*;perm;size;modify;' in mlst())
998
999
1000 class TestFtpCmdsSemantic(unittest.TestCase):
1001     server_class = FTPd
1002     client_class = ftplib.FTP
1003     arg_cmds = ['allo','appe','dele','eprt','mdtm','mode','mkd','opts','port',
1004                 'rest','retr','rmd','rnfr','rnto','site','size','stor','stru',
1005                 'type','user','xmkd','xrmd','site chmod']
1006
1007     def setUp(self):
1008         self.server = self.server_class()
1009         self.server.start()
1010         self.client = self.client_class()
1011         self.client.connect(self.server.host, self.server.port)
1012         self.client.sock.settimeout(2)
1013         self.client.login(USER, PASSWD)
1014
1015     def tearDown(self):
1016         self.client.close()
1017         self.server.stop()
1018
1019     def test_arg_cmds(self):
1020         # Test commands requiring an argument.
1021         expected = "501 Syntax error: command needs an argument."
1022         for cmd in self.arg_cmds:
1023             self.client.putcmd(cmd)
1024             resp = self.client.getmultiline()
1025             self.assertEqual(resp, expected)
1026
1027     def test_no_arg_cmds(self):
1028         # Test commands accepting no arguments.
1029         expected = "501 Syntax error: command does not accept arguments."
1030         for cmd in ('abor','cdup','feat','noop','pasv','pwd','quit','rein',
1031                     'syst','xcup','xpwd'):
1032             self.client.putcmd(cmd + ' arg')
1033             resp = self.client.getmultiline()
1034             self.assertEqual(resp, expected)
1035
1036     def test_auth_cmds(self):
1037         # Test those commands requiring client to be authenticated.
1038         expected = "530 Log in with USER and PASS first."
1039         self.client.sendcmd('rein')
1040         for cmd in self.server.handler.proto_cmds:
1041             cmd = cmd.lower()
1042             if cmd in ('feat','help','noop','user','pass','stat','syst','quit',
1043                        'site', 'site help', 'pbsz', 'auth', 'prot', 'ccc'):
1044                 continue
1045             if cmd in self.arg_cmds:
1046                 cmd = cmd + ' arg'
1047             self.client.putcmd(cmd)
1048             resp = self.client.getmultiline()
1049             self.assertEqual(resp, expected)
1050
1051     def test_no_auth_cmds(self):
1052         # Test those commands that do not require client to be authenticated.
1053         self.client.sendcmd('rein')
1054         for cmd in ('feat','help','noop','stat','syst','site help'):
1055             self.client.sendcmd(cmd)
1056         # STAT provided with an argument is equal to LIST hence not allowed
1057         # if not authenticated
1058         self.assertRaises(ftplib.error_perm, self.client.sendcmd, 'stat /')
1059         self.client.sendcmd('quit')
1060
1061
1062 class TestFtpFsOperations(unittest.TestCase):
1063     "test: PWD, CWD, CDUP, SIZE, RNFR, RNTO, DELE, MKD, RMD, MDTM, STAT"
1064     server_class = FTPd
1065     client_class = ftplib.FTP
1066
1067     def setUp(self):
1068         self.server = self.server_class()
1069         self.server.start()
1070         self.client = self.client_class()
1071         self.client.connect(self.server.host, self.server.port)
1072         self.client.sock.settimeout(2)
1073         self.client.login(USER, PASSWD)
1074         self.tempfile = os.path.basename(touch(TESTFN))
1075         self.tempdir = os.path.basename(tempfile.mkdtemp(dir=HOME))
1076
1077     def tearDown(self):
1078         self.client.close()
1079         self.server.stop()
1080         safe_remove(self.tempfile)
1081         if os.path.exists(self.tempdir):
1082             shutil.rmtree(self.tempdir)
1083
1084     def test_cwd(self):
1085         self.client.cwd(self.tempdir)
1086         self.assertEqual(self.client.pwd(), '/' + self.tempdir)
1087         self.assertRaises(ftplib.error_perm, self.client.cwd, 'subtempdir')
1088         # cwd provided with no arguments is supposed to move us to the
1089         # root directory
1090         self.client.sendcmd('cwd')
1091         self.assertEqual(self.client.pwd(), '/')
1092
1093     def test_pwd(self):
1094         self.assertEqual(self.client.pwd(), '/')
1095         self.client.cwd(self.tempdir)
1096         self.assertEqual(self.client.pwd(), '/' + self.tempdir)
1097
1098     def test_cdup(self):
1099         subfolder = os.path.basename(tempfile.mkdtemp(dir=self.tempdir))
1100         self.assertEqual(self.client.pwd(), '/')
1101         self.client.cwd(self.tempdir)
1102         self.assertEqual(self.client.pwd(), '/%s' % self.tempdir)
1103         self.client.cwd(subfolder)
1104         self.assertEqual(self.client.pwd(), '/%s/%s' % (self.tempdir, subfolder))
1105         self.client.sendcmd('cdup')
1106         self.assertEqual(self.client.pwd(), '/%s' % self.tempdir)
1107         self.client.sendcmd('cdup')
1108         self.assertEqual(self.client.pwd(), '/')
1109
1110         # make sure we can't escape from root directory
1111         self.client.sendcmd('cdup')
1112         self.assertEqual(self.client.pwd(), '/')
1113
1114     def test_mkd(self):
1115         tempdir = os.path.basename(tempfile.mktemp(dir=HOME))
1116         dirname = self.client.mkd(tempdir)
1117         # the 257 response is supposed to include the absolute dirname
1118         self.assertEqual(dirname, '/' + tempdir)
1119         # make sure we can't create directories which already exist
1120         # (probably not really necessary);
1121         # let's use a try/except statement to avoid leaving behind
1122         # orphaned temporary directory in the event of a test failure.
1123         try:
1124             self.client.mkd(tempdir)
1125         except ftplib.error_perm:
1126             os.rmdir(tempdir)  # ok
1127         else:
1128             self.fail('ftplib.error_perm not raised.')
1129
1130     def test_rmd(self):
1131         self.client.rmd(self.tempdir)
1132         self.assertRaises(ftplib.error_perm, self.client.rmd, self.tempfile)
1133         # make sure we can't remove the root directory
1134         self.assertRaises(ftplib.error_perm, self.client.rmd, '/')
1135
1136     def test_dele(self):
1137         self.client.delete(self.tempfile)
1138         self.assertRaises(ftplib.error_perm, self.client.delete, self.tempdir)
1139
1140     def test_rnfr_rnto(self):
1141         # rename file
1142         tempname = os.path.basename(tempfile.mktemp(dir=HOME))
1143         self.client.rename(self.tempfile, tempname)
1144         self.client.rename(tempname, self.tempfile)
1145         # rename dir
1146         tempname = os.path.basename(tempfile.mktemp(dir=HOME))
1147         self.client.rename(self.tempdir, tempname)
1148         self.client.rename(tempname, self.tempdir)
1149         # rnfr/rnto over non-existing paths
1150         bogus = os.path.basename(tempfile.mktemp(dir=HOME))
1151         self.assertRaises(ftplib.error_perm, self.client.rename, bogus, '/x')
1152         self.assertRaises(ftplib.error_perm, self.client.rename, self.tempfile, '/')
1153         # rnto sent without first specifying the source
1154         self.assertRaises(ftplib.error_perm, self.client.sendcmd, 'rnto ' + self.tempfile)
1155
1156         # make sure we can't rename root directory
1157         self.assertRaises(ftplib.error_perm, self.client.rename, '/', '/x')
1158
1159     def test_mdtm(self):
1160         self.client.sendcmd('mdtm ' + self.tempfile)
1161         bogus = os.path.basename(tempfile.mktemp(dir=HOME))
1162         self.assertRaises(ftplib.error_perm, self.client.sendcmd, 'mdtm ' + bogus)
1163         # make sure we can't use mdtm against directories
1164         try:
1165             self.client.sendcmd('mdtm ' + self.tempdir)
1166         except ftplib.error_perm, err:
1167             self.assertTrue("not retrievable" in str(err))
1168         else:
1169             self.fail('Exception not raised')
1170
1171     def test_unforeseen_mdtm_event(self):
1172         # Emulate a case where the file last modification time is prior
1173         # to year 1900.  This most likely will never happen unless
1174         # someone specifically force the last modification time of a
1175         # file in some way.
1176         # To do so we temporarily override os.path.getmtime so that it
1177         # returns a negative value referring to a year prior to 1900.
1178         # It causes time.localtime/gmtime to raise a ValueError exception
1179         # which is supposed to be handled by server.
1180         _getmtime = ftpserver.AbstractedFS.getmtime
1181         try:
1182             ftpserver.AbstractedFS.getmtime = lambda x, y: -9000000000
1183             self.assertRaises(ftplib.error_perm, self.client.sendcmd,
1184                               'mdtm ' + self.tempfile)
1185             # make sure client hasn't been disconnected
1186             self.client.sendcmd('noop')
1187         finally:
1188             ftpserver.AbstractedFS.getmtime = _getmtime
1189
1190     def test_size(self):
1191         self.client.sendcmd('type a')
1192         self.assertRaises(ftplib.error_perm, self.client.size, self.tempfile)
1193         self.client.sendcmd('type i')
1194         self.client.size(self.tempfile)
1195         # make sure we can't use size against directories
1196         try:
1197             self.client.sendcmd('size ' + self.tempdir)
1198         except ftplib.error_perm, err:
1199             self.assertTrue("not retrievable" in str(err))
1200         else:
1201             self.fail('Exception not raised')
1202
1203     if not hasattr(os, 'chmod'):
1204         def test_site_chmod(self):
1205             self.assertRaises(ftplib.error_perm, self.client.sendcmd,
1206                               'site chmod 777 ' + self.tempfile)
1207     else:
1208         def test_site_chmod(self):
1209             # not enough args
1210             self.assertRaises(ftplib.error_perm,
1211                               self.client.sendcmd, 'site chmod 777')
1212             # bad args
1213             self.assertRaises(ftplib.error_perm, self.client.sendcmd,
1214                               'site chmod -177 ' + self.tempfile)
1215             self.assertRaises(ftplib.error_perm, self.client.sendcmd,
1216                               'site chmod 778 ' + self.tempfile)
1217             self.assertRaises(ftplib.error_perm, self.client.sendcmd,
1218                               'site chmod foo ' + self.tempfile)
1219
1220             # on Windows it is possible to set read-only flag only
1221             if os.name == 'nt':
1222                 self.client.sendcmd('site chmod 777 ' + self.tempfile)
1223                 mode = oct(stat.S_IMODE(os.stat(self.tempfile).st_mode))
1224                 self.assertEqual(mode, '0666')
1225                 self.client.sendcmd('site chmod 444 ' + self.tempfile)
1226                 mode = oct(stat.S_IMODE(os.stat(self.tempfile).st_mode))
1227                 self.assertEqual(mode, '0444')
1228                 self.client.sendcmd('site chmod 666 ' + self.tempfile)
1229                 mode = oct(stat.S_IMODE(os.stat(self.tempfile).st_mode))
1230                 self.assertEqual(mode, '0666')
1231             else:
1232                 self.client.sendcmd('site chmod 777 ' + self.tempfile)
1233                 mode = oct(stat.S_IMODE(os.stat(self.tempfile).st_mode))
1234                 self.assertEqual(mode, '0777')
1235                 self.client.sendcmd('site chmod 755 ' + self.tempfile)
1236                 mode = oct(stat.S_IMODE(os.stat(self.tempfile).st_mode))
1237                 self.assertEqual(mode, '0755')
1238                 self.client.sendcmd('site chmod 555 ' + self.tempfile)
1239                 mode = oct(stat.S_IMODE(os.stat(self.tempfile).st_mode))
1240                 self.assertEqual(mode, '0555')
1241
1242
1243 class TestFtpStoreData(unittest.TestCase):
1244     """Test STOR, STOU, APPE, REST, TYPE."""
1245     server_class = FTPd
1246     client_class = ftplib.FTP
1247
1248     def setUp(self):
1249         self.server = self.server_class()
1250         self.server.start()
1251         self.client = self.client_class()
1252         self.client.connect(self.server.host, self.server.port)
1253         self.client.sock.settimeout(2)
1254         self.client.login(USER, PASSWD)
1255         self.dummy_recvfile = StringIO.StringIO()
1256         self.dummy_sendfile = StringIO.StringIO()
1257
1258     def tearDown(self):
1259         self.client.close()
1260         self.server.stop()
1261         self.dummy_recvfile.close()
1262         self.dummy_sendfile.close()
1263         safe_remove(TESTFN)
1264
1265     def test_stor(self):
1266         try:
1267             data = 'abcde12345' * 100000
1268             self.dummy_sendfile.write(data)
1269             self.dummy_sendfile.seek(0)
1270             self.client.storbinary('stor ' + TESTFN, self.dummy_sendfile)
1271             self.client.retrbinary('retr ' + TESTFN, self.dummy_recvfile.write)
1272             self.dummy_recvfile.seek(0)
1273             self.assertEqual(hash(data), hash (self.dummy_recvfile.read()))
1274         finally:
1275             # We do not use os.remove() because file could still be
1276             # locked by ftpd thread.  If DELE through FTP fails try
1277             # os.remove() as last resort.
1278             if os.path.exists(TESTFN):
1279                 try:
1280                     self.client.delete(TESTFN)
1281                 except (ftplib.Error, EOFError, socket.error):
1282                     safe_remove(TESTFN)
1283
1284     def test_stor_active(self):
1285         # Like test_stor but using PORT
1286         self.client.set_pasv(False)
1287         self.test_stor()
1288
1289     def test_stor_ascii(self):
1290         # Test STOR in ASCII mode
1291
1292         def store(cmd, fp, blocksize=8192):
1293             # like storbinary() except it sends "type a" instead of
1294             # "type i" before starting the transfer
1295             self.client.voidcmd('type a')
1296             conn = self.client.transfercmd(cmd)
1297             while 1:
1298                 buf = fp.read(blocksize)
1299                 if not buf:
1300                     break
1301                 conn.sendall(buf)
1302             conn.close()
1303             return self.client.voidresp()
1304
1305         try:
1306             data = 'abcde12345\r\n' * 100000
1307             self.dummy_sendfile.write(data)
1308             self.dummy_sendfile.seek(0)
1309             store('stor ' + TESTFN, self.dummy_sendfile)
1310             self.client.retrbinary('retr ' + TESTFN, self.dummy_recvfile.write)
1311             expected = data.replace('\r\n', os.linesep)
1312             self.dummy_recvfile.seek(0)
1313             self.assertEqual(hash(expected), hash(self.dummy_recvfile.read()))
1314         finally:
1315             # We do not use os.remove() because file could still be
1316             # locked by ftpd thread.  If DELE through FTP fails try
1317             # os.remove() as last resort.
1318             if os.path.exists(TESTFN):
1319                 try:
1320                     self.client.delete(TESTFN)
1321                 except (ftplib.Error, EOFError, socket.error):
1322                     safe_remove(TESTFN)
1323
1324     def test_stor_ascii_2(self):
1325         # Test that no extra extra carriage returns are added to the
1326         # file in ASCII mode in case CRLF gets truncated in two chunks
1327         # (issue 116)
1328
1329         def store(cmd, fp, blocksize=8192):
1330             # like storbinary() except it sends "type a" instead of
1331             # "type i" before starting the transfer
1332             self.client.voidcmd('type a')
1333             conn = self.client.transfercmd(cmd)
1334             while 1:
1335                 buf = fp.read(blocksize)
1336                 if not buf:
1337                     break
1338                 conn.sendall(buf)
1339             conn.close()
1340             return self.client.voidresp()
1341
1342         old_buffer = ftpserver.DTPHandler.ac_in_buffer_size
1343         try:
1344             # set a small buffer so that CRLF gets delivered in two
1345             # separate chunks: "CRLF", " f", "oo", " CR", "LF", " b", "ar"
1346             ftpserver.DTPHandler.ac_in_buffer_size = 2
1347             data = '\r\n foo \r\n bar'
1348             self.dummy_sendfile.write(data)
1349             self.dummy_sendfile.seek(0)
1350             store('stor ' + TESTFN, self.dummy_sendfile)
1351
1352             expected = data.replace('\r\n', os.linesep)
1353             self.client.retrbinary('retr ' + TESTFN, self.dummy_recvfile.write)
1354             self.dummy_recvfile.seek(0)
1355             self.assertEqual(expected, self.dummy_recvfile.read())
1356         finally:
1357             ftpserver.DTPHandler.ac_in_buffer_size = old_buffer
1358             # We do not use os.remove() because file could still be
1359             # locked by ftpd thread.  If DELE through FTP fails try
1360             # os.remove() as last resort.
1361             if os.path.exists(TESTFN):
1362                 try:
1363                     self.client.delete(TESTFN)
1364                 except (ftplib.Error, EOFError, socket.error):
1365                     safe_remove(TESTFN)
1366
1367     def test_stou(self):
1368         data = 'abcde12345' * 100000
1369         self.dummy_sendfile.write(data)
1370         self.dummy_sendfile.seek(0)
1371
1372         self.client.voidcmd('TYPE I')
1373         # filename comes in as "1xx FILE: <filename>"
1374         filename = self.client.sendcmd('stou').split('FILE: ')[1]
1375         try:
1376             sock = self.client.makeport()
1377             conn, sockaddr = sock.accept()
1378             if hasattr(self.client_class, 'ssl_version'):
1379                 conn = ssl.wrap_socket(conn)
1380             while 1:
1381                 buf = self.dummy_sendfile.read(8192)
1382                 if not buf:
1383                     break
1384                 conn.sendall(buf)
1385             sock.close()
1386             conn.close()
1387             # transfer finished, a 226 response is expected
1388             self.assertEqual('226', self.client.voidresp()[:3])
1389             self.client.retrbinary('retr ' + filename, self.dummy_recvfile.write)
1390             self.dummy_recvfile.seek(0)
1391             self.assertEqual(hash(data), hash (self.dummy_recvfile.read()))
1392         finally:
1393             # We do not use os.remove() because file could still be
1394             # locked by ftpd thread.  If DELE through FTP fails try
1395             # os.remove() as last resort.
1396             if os.path.exists(filename):
1397                 try:
1398                     self.client.delete(filename)
1399                 except (ftplib.Error, EOFError, socket.error):
1400                     safe_remove(filename)
1401
1402     def test_stou_rest(self):
1403         # Watch for STOU preceded by REST, which makes no sense.
1404         self.client.sendcmd('type i')
1405         self.client.sendcmd('rest 10')
1406         self.assertRaises(ftplib.error_temp, self.client.sendcmd, 'stou')
1407
1408     def test_stou_orphaned_file(self):
1409         # Check that no orphaned file gets left behind when STOU fails.
1410         # Even if STOU fails the file is first created and then erased.
1411         # Since we can't know the name of the file the best way that
1412         # we have to test this case is comparing the content of the
1413         # directory before and after STOU has been issued.
1414         # Assuming that TESTFN is supposed to be a "reserved" file
1415         # name we shouldn't get false positives.
1416         safe_remove(TESTFN)
1417         # login as a limited user to let STOU fail
1418         self.client.login('anonymous', '@nopasswd')
1419         before = os.listdir(HOME)
1420         self.assertRaises(ftplib.error_perm, self.client.sendcmd, 'stou ' + TESTFN)
1421         after = os.listdir(HOME)
1422         if before != after:
1423             for file in after:
1424                 self.assert_(not file.startswith(TESTFN))
1425
1426     def test_appe(self):
1427         try:
1428             data1 = 'abcde12345' * 100000
1429             self.dummy_sendfile.write(data1)
1430             self.dummy_sendfile.seek(0)
1431             self.client.storbinary('stor ' + TESTFN, self.dummy_sendfile)
1432
1433             data2 = 'fghil67890' * 100000
1434             self.dummy_sendfile.write(data2)
1435             self.dummy_sendfile.seek(len(data1))
1436             self.client.storbinary('appe ' + TESTFN, self.dummy_sendfile)
1437
1438             self.client.retrbinary("retr " + TESTFN, self.dummy_recvfile.write)
1439             self.dummy_recvfile.seek(0)
1440             self.assertEqual(hash(data1 + data2), hash (self.dummy_recvfile.read()))
1441         finally:
1442             # We do not use os.remove() because file could still be
1443             # locked by ftpd thread.  If DELE through FTP fails try
1444             # os.remove() as last resort.
1445             if os.path.exists(TESTFN):
1446                 try:
1447                     self.client.delete(TESTFN)
1448                 except (ftplib.Error, EOFError, socket.error):
1449                     safe_remove(TESTFN)
1450
1451     def test_appe_rest(self):
1452         # Watch for APPE preceded by REST, which makes no sense.
1453         self.client.sendcmd('type i')
1454         self.client.sendcmd('rest 10')
1455         self.assertRaises(ftplib.error_temp, self.client.sendcmd, 'appe x')
1456
1457     def test_rest_on_stor(self):
1458         # Test STOR preceded by REST.
1459         data = 'abcde12345' * 100000
1460         self.dummy_sendfile.write(data)
1461         self.dummy_sendfile.seek(0)
1462
1463         self.client.voidcmd('TYPE I')
1464         conn = self.client.transfercmd('stor ' + TESTFN)
1465         bytes_sent = 0
1466         while 1:
1467             chunk = self.dummy_sendfile.read(8192)
1468             conn.sendall(chunk)
1469             bytes_sent += len(chunk)
1470             # stop transfer while it isn't finished yet
1471             if bytes_sent >= 524288 or not chunk:
1472                 break
1473
1474         conn.close()
1475         # transfer wasn't finished yet but server can't know this,
1476         # hence expect a 226 response
1477         self.assertEqual('226', self.client.voidresp()[:3])
1478
1479         # resuming transfer by using a marker value greater than the
1480         # file size stored on the server should result in an error
1481         # on stor
1482         file_size = self.client.size(TESTFN)
1483         self.assertEqual(file_size, bytes_sent)
1484         self.client.sendcmd('rest %s' % ((file_size + 1)))
1485         self.assertRaises(ftplib.error_perm, self.client.sendcmd, 'stor ' + TESTFN)
1486
1487         self.client.sendcmd('rest %s' % bytes_sent)
1488         self.client.storbinary('stor ' + TESTFN, self.dummy_sendfile)
1489
1490         self.client.retrbinary('retr ' + TESTFN, self.dummy_recvfile.write)
1491         self.dummy_sendfile.seek(0)
1492         self.dummy_recvfile.seek(0)
1493         self.assertEqual(hash(self.dummy_sendfile.read()),
1494                          hash(self.dummy_recvfile.read())
1495                          )
1496         self.client.delete(TESTFN)
1497
1498     def test_failing_rest_on_stor(self):
1499         # Test REST -> STOR against a non existing file.
1500         if os.path.exists(TESTFN):
1501             self.client.delete(TESTFN)
1502         self.client.sendcmd('type i')
1503         self.client.sendcmd('rest 10')
1504         self.assertRaises(ftplib.error_perm, self.client.storbinary,
1505                           'stor ' + TESTFN, lambda x: x)
1506         # if the first STOR failed because of REST, the REST marker
1507         # is supposed to be resetted to 0
1508         self.dummy_sendfile.write('x' * 4096)
1509         self.dummy_sendfile.seek(0)
1510         self.client.storbinary('stor ' + TESTFN, self.dummy_sendfile)
1511
1512     def test_quit_during_transfer(self):
1513         # RFC-959 states that if QUIT is sent while a transfer is in
1514         # progress, the connection must remain open for result response
1515         # and the server will then close it.
1516         conn = self.client.transfercmd('stor ' + TESTFN)
1517         conn.sendall('abcde12345' * 50000)
1518         self.client.sendcmd('quit')
1519         conn.sendall('abcde12345' * 50000)
1520         conn.close()
1521         # expect the response (transfer ok)
1522         self.assertEqual('226', self.client.voidresp()[:3])
1523         # Make sure client has been disconnected.
1524         # socket.error (Windows) or EOFError (Linux) exception is supposed
1525         # to be raised in such a case.
1526         self.assertRaises((socket.error, EOFError), self.client.sendcmd, 'noop')
1527
1528     def test_stor_empty_file(self):
1529         self.client.storbinary('stor ' + TESTFN, self.dummy_sendfile)
1530         self.client.quit()
1531         f = open(TESTFN)
1532         self.assertEqual(f.read(), "")
1533         f.close()
1534
1535
1536 if SUPPORTS_SENDFILE:
1537     class TestFtpStoreDataNoSendfile(TestFtpStoreData):
1538         """Test STOR, STOU, APPE, REST, TYPE not using sendfile()."""
1539
1540         def setUp(self):
1541             TestFtpStoreData.setUp(self)
1542             self.server.handler.use_sendfile = False
1543
1544         def tearDown(self):
1545             TestFtpStoreData.tearDown(self)
1546             self.server.handler.use_sendfile = True
1547
1548
1549 class TestFtpRetrieveData(unittest.TestCase):
1550     "Test RETR, REST, TYPE"
1551     server_class = FTPd
1552     client_class = ftplib.FTP
1553
1554     def setUp(self):
1555         self.server = self.server_class()
1556         self.server.start()
1557         self.client = self.client_class()
1558         self.client.connect(self.server.host, self.server.port)
1559         self.client.sock.settimeout(2)
1560         self.client.login(USER, PASSWD)
1561         self.file = open(TESTFN, 'w+b')
1562         self.dummyfile = StringIO.StringIO()
1563
1564     def tearDown(self):
1565         self.client.close()
1566         self.server.stop()
1567         if not self.file.closed:
1568             self.file.close()
1569         if not self.dummyfile.closed:
1570             self.dummyfile.close()
1571         safe_remove(TESTFN)
1572
1573     def test_retr(self):
1574         data = 'abcde12345' * 100000
1575         self.file.write(data)
1576         self.file.close()
1577         self.client.retrbinary("retr " + TESTFN, self.dummyfile.write)
1578         self.dummyfile.seek(0)
1579         self.assertEqual(hash(data), hash(self.dummyfile.read()))
1580
1581         # attempt to retrieve a file which doesn't exist
1582         bogus = os.path.basename(tempfile.mktemp(dir=HOME))
1583         self.assertRaises(ftplib.error_perm, self.client.retrbinary,
1584                                              "retr " + bogus, lambda x: x)
1585
1586     def test_retr_ascii(self):
1587         # Test RETR in ASCII mode.
1588
1589         def retrieve(cmd, callback, blocksize=8192, rest=None):
1590             # like retrbinary but uses TYPE A instead
1591             self.client.voidcmd('type a')
1592             conn = self.client.transfercmd(cmd, rest)
1593             while 1:
1594                 data = conn.recv(blocksize)
1595                 if not data:
1596                     break
1597                 callback(data)
1598             conn.close()
1599             return self.client.voidresp()
1600
1601         data = ('abcde12345' + os.linesep) * 100000
1602         self.file.write(data)
1603         self.file.close()
1604         retrieve("retr " + TESTFN, self.dummyfile.write)
1605         expected = data.replace(os.linesep, '\r\n')
1606         self.dummyfile.seek(0)
1607         self.assertEqual(hash(expected), hash(self.dummyfile.read()))
1608
1609     def test_restore_on_retr(self):
1610         data = 'abcde12345' * 1000000
1611         self.file.write(data)
1612         self.file.close()
1613
1614         received_bytes = 0
1615         self.client.voidcmd('TYPE I')
1616         conn = self.client.transfercmd('retr ' + TESTFN)
1617         while 1:
1618             chunk = conn.recv(8192)
1619             if not chunk:
1620                 break
1621             self.dummyfile.write(chunk)
1622             received_bytes += len(chunk)
1623             if received_bytes >= len(data) // 2:
1624                 break
1625         conn.close()
1626
1627         # transfer wasn't finished yet so we expect a 426 response
1628         self.assertEqual(self.client.getline()[:3], "426")
1629
1630         # resuming transfer by using a marker value greater than the
1631         # file size stored on the server should result in an error
1632         # on retr (RFC-1123)
1633         file_size = self.client.size(TESTFN)
1634         self.client.sendcmd('rest %s' % ((file_size + 1)))
1635         self.assertRaises(ftplib.error_perm, self.client.sendcmd, 'retr ' + TESTFN)
1636
1637         # test resume
1638         self.client.sendcmd('rest %s' % received_bytes)
1639         self.client.retrbinary("retr " + TESTFN, self.dummyfile.write)
1640         self.dummyfile.seek(0)
1641         self.assertEqual(hash(data), hash (self.dummyfile.read()))
1642
1643     def test_retr_empty_file(self):
1644         self.client.retrbinary("retr " + TESTFN, self.dummyfile.write)
1645         self.dummyfile.seek(0)
1646         self.assertEqual(self.dummyfile.read(), "")
1647
1648
1649 if SUPPORTS_SENDFILE:
1650     class TestFtpRetrieveDataNoSendfile(TestFtpRetrieveData):
1651         """Test RETR, REST, TYPE by not using sendfile()."""
1652
1653         def setUp(self):
1654             TestFtpRetrieveData.setUp(self)
1655             self.server.handler.use_sendfile = False
1656
1657         def tearDown(self):
1658             TestFtpRetrieveData.tearDown(self)
1659             self.server.handler.use_sendfile = True
1660
1661
1662 class TestFtpListingCmds(unittest.TestCase):
1663     """Test LIST, NLST, argumented STAT."""
1664     server_class = FTPd
1665     client_class = ftplib.FTP
1666
1667     def setUp(self):
1668         self.server = self.server_class()
1669         self.server.start()
1670         self.client = self.client_class()
1671         self.client.connect(self.server.host, self.server.port)
1672         self.client.sock.settimeout(2)
1673         self.client.login(USER, PASSWD)
1674         touch(TESTFN)
1675
1676     def tearDown(self):
1677         self.client.close()
1678         self.server.stop()
1679         os.remove(TESTFN)
1680
1681     def _test_listing_cmds(self, cmd):
1682         """Tests common to LIST NLST and MLSD commands."""
1683         # assume that no argument has the same meaning of "/"
1684         l1 = l2 = []
1685         self.client.retrlines(cmd, l1.append)
1686         self.client.retrlines(cmd + ' /', l2.append)
1687         self.assertEqual(l1, l2)
1688         if cmd.lower() != 'mlsd':
1689             # if pathname is a file one line is expected
1690             x = []
1691             self.client.retrlines('%s ' % cmd + TESTFN, x.append)
1692             self.assertEqual(len(x), 1)
1693             self.assertTrue(''.join(x).endswith(TESTFN))
1694         # non-existent path, 550 response is expected
1695         bogus = os.path.basename(tempfile.mktemp(dir=HOME))
1696         self.assertRaises(ftplib.error_perm, self.client.retrlines,
1697                           '%s ' %cmd + bogus, lambda x: x)
1698         # for an empty directory we excpect that the data channel is
1699         # opened anyway and that no data is received
1700         x = []
1701         tempdir = os.path.basename(tempfile.mkdtemp(dir=HOME))
1702         try:
1703             self.client.retrlines('%s %s' % (cmd, tempdir), x.append)
1704             self.assertEqual(x, [])
1705         finally:
1706             try:
1707                 os.rmdir(tempdir)
1708             except OSError:
1709                 pass
1710
1711     def test_nlst(self):
1712         # common tests
1713         self._test_listing_cmds('nlst')
1714
1715     def test_list(self):
1716         # common tests
1717         self._test_listing_cmds('list')
1718         # known incorrect pathname arguments (e.g. old clients) are
1719         # expected to be treated as if pathname would be == '/'
1720         l1 = l2 = l3 = l4 = l5 = []
1721         self.client.retrlines('list /', l1.append)
1722         self.client.retrlines('list -a', l2.append)
1723         self.client.retrlines('list -l', l3.append)
1724         self.client.retrlines('list -al', l4.append)
1725         self.client.retrlines('list -la', l5.append)
1726         tot = (l1, l2, l3, l4, l5)
1727         for x in range(len(tot) - 1):
1728             self.assertEqual(tot[x], tot[x+1])
1729
1730     def test_mlst(self):
1731         # utility function for extracting the line of interest
1732         mlstline = lambda cmd: self.client.voidcmd(cmd).split('\n')[1]
1733
1734         # the fact set must be preceded by a space
1735         self.assertTrue(mlstline('mlst').startswith(' '))
1736         # where TVFS is supported, a fully qualified pathname is expected
1737         self.assertTrue(mlstline('mlst ' + TESTFN).endswith('/' + TESTFN))
1738         self.assertTrue(mlstline('mlst').endswith('/'))
1739         # assume that no argument has the same meaning of "/"
1740         self.assertEqual(mlstline('mlst'), mlstline('mlst /'))
1741         # non-existent path
1742         bogus = os.path.basename(tempfile.mktemp(dir=HOME))
1743         self.assertRaises(ftplib.error_perm, self.client.sendcmd, 'mlst '+bogus)
1744         # test file/dir notations
1745         self.assertTrue('type=dir' in mlstline('mlst'))
1746         self.assertTrue('type=file' in mlstline('mlst ' + TESTFN))
1747         # let's add some tests for OPTS command
1748         self.client.sendcmd('opts mlst type;')
1749         self.assertEqual(mlstline('mlst'), ' type=dir; /')
1750         # where no facts are present, two leading spaces before the
1751         # pathname are required (RFC-3659)
1752         self.client.sendcmd('opts mlst')
1753         self.assertEqual(mlstline('mlst'), '  /')
1754
1755     def test_mlsd(self):
1756         # common tests
1757         self._test_listing_cmds('mlsd')
1758         dir = os.path.basename(tempfile.mkdtemp(dir=HOME))
1759         try:
1760             try:
1761                 self.client.retrlines('mlsd ' + TESTFN, lambda x: x)
1762             except ftplib.error_perm, resp:
1763                 # if path is a file a 501 response code is expected
1764                 self.assertEqual(str(resp)[0:3], "501")
1765             else:
1766                 self.fail("Exception not raised")
1767         finally:
1768             safe_rmdir(dir)
1769
1770     def test_mlsd_all_facts(self):
1771         feat = self.client.sendcmd('feat')
1772         # all the facts
1773         facts = re.search(r'^\s*MLST\s+(\S+)$', feat, re.MULTILINE).group(1)
1774         facts = facts.replace("*;", ";")
1775         self.client.sendcmd('opts mlst ' + facts)
1776         resp = self.client.sendcmd('mlst')
1777
1778         local = facts[:-1].split(";")
1779         returned = resp.split("\n")[1].strip()[:-3]
1780         returned = [x.split("=")[0] for x in returned.split(";")]
1781         self.assertEqual(sorted(local), sorted(returned))
1782
1783         self.assertTrue("type" in resp)
1784         self.assertTrue("size" in resp)
1785         self.assertTrue("perm" in resp)
1786         self.assertTrue("modify" in resp)
1787         if os.name == 'posix':
1788             self.assertTrue("unique" in resp)
1789             self.assertTrue("unix.mode" in resp)
1790             self.assertTrue("unix.uid" in resp)
1791             self.assertTrue("unix.gid" in resp)
1792         elif os.name == 'nt':
1793             self.assertTrue("create" in resp)
1794
1795     def test_stat(self):
1796         # Test STAT provided with argument which is equal to LIST
1797         self.client.sendcmd('stat /')
1798         self.client.sendcmd('stat ' + TESTFN)
1799         self.client.putcmd('stat *')
1800         resp = self.client.getmultiline()
1801         self.assertEqual(resp, '550 Globbing not supported.')
1802         bogus = os.path.basename(tempfile.mktemp(dir=HOME))
1803         self.assertRaises(ftplib.error_perm, self.client.sendcmd, 'stat ' + bogus)
1804
1805     def test_unforeseen_time_event(self):
1806         # Emulate a case where the file last modification time is prior
1807         # to year 1900.  This most likely will never happen unless
1808         # someone specifically force the last modification time of a
1809         # file in some way.
1810         # To do so we temporarily override os.path.getmtime so that it
1811         # returns a negative value referring to a year prior to 1900.
1812         # It causes time.localtime/gmtime to raise a ValueError exception
1813         # which is supposed to be handled by server.
1814         _getmtime = ftpserver.AbstractedFS.getmtime
1815         try:
1816             ftpserver.AbstractedFS.getmtime = lambda x, y: -9000000000
1817             self.client.sendcmd('stat /')  # test AbstractedFS.format_list()
1818             self.client.sendcmd('mlst /')  # test AbstractedFS.format_mlsx()
1819             # make sure client hasn't been disconnected
1820             self.client.sendcmd('noop')
1821         finally:
1822             ftpserver.AbstractedFS.getmtime = _getmtime
1823
1824
1825 class TestFtpAbort(unittest.TestCase):
1826     "test: ABOR"
1827     server_class = FTPd
1828     client_class = ftplib.FTP
1829
1830     def setUp(self):
1831         self.server = self.server_class()
1832         self.server.start()
1833         self.client = self.client_class()
1834         self.client.connect(self.server.host, self.server.port)
1835         self.client.sock.settimeout(2)
1836         self.client.login(USER, PASSWD)
1837
1838     def tearDown(self):
1839         self.client.close()
1840         self.server.stop()
1841
1842     def test_abor_no_data(self):
1843         # Case 1: ABOR while no data channel is opened: respond with 225.
1844         resp = self.client.sendcmd('ABOR')
1845         self.assertEqual('225 No transfer to abort.', resp)
1846         self.client.retrlines('list', [].append)
1847
1848     def test_abor_pasv(self):
1849         # Case 2: user sends a PASV, a data-channel socket is listening
1850         # but not connected, and ABOR is sent: close listening data
1851         # socket, respond with 225.
1852         self.client.makepasv()
1853         respcode = self.client.sendcmd('ABOR')[:3]
1854         self.assertEqual('225', respcode)
1855         self.client.retrlines('list', [].append)
1856
1857     def test_abor_port(self):
1858         # Case 3: data channel opened with PASV or PORT, but ABOR sent
1859         # before a data transfer has been started: close data channel,
1860         # respond with 225
1861         self.client.set_pasv(0)
1862         sock = self.client.makeport()
1863         respcode = self.client.sendcmd('ABOR')[:3]
1864         sock.close()
1865         self.assertEqual('225', respcode)
1866         self.client.retrlines('list', [].append)
1867
1868     def test_abor_during_transfer(self):
1869         # Case 4: ABOR while a data transfer on DTP channel is in
1870         # progress: close data channel, respond with 426, respond
1871         # with 226.
1872         data = 'abcde12345' * 1000000
1873         f = open(TESTFN, 'w+b')
1874         f.write(data)
1875         f.close()
1876         try:
1877             self.client.voidcmd('TYPE I')
1878             conn = self.client.transfercmd('retr ' + TESTFN)
1879             bytes_recv = 0
1880             while bytes_recv < 65536:
1881                 chunk = conn.recv(8192)
1882                 bytes_recv += len(chunk)
1883
1884             # stop transfer while it isn't finished yet
1885             self.client.putcmd('ABOR')
1886
1887             # transfer isn't finished yet so ftpd should respond with 426
1888             self.assertEqual(self.client.getline()[:3], "426")
1889
1890             # transfer successfully aborted, so should now respond with a 226
1891             self.assertEqual('226', self.client.voidresp()[:3])
1892         finally:
1893             # We do not use os.remove() because file could still be
1894             # locked by ftpd thread.  If DELE through FTP fails try
1895             # os.remove() as last resort.
1896             try:
1897                 self.client.delete(TESTFN)
1898             except (ftplib.Error, EOFError, socket.error):
1899                 safe_remove(TESTFN)
1900
1901     if hasattr(socket, 'MSG_OOB'):
1902         def test_oob_abor(self):
1903             # Send ABOR by following the RFC-959 directives of sending
1904             # Telnet IP/Synch sequence as OOB data.
1905             # On some systems like FreeBSD this happened to be a problem
1906             # due to a different SO_OOBINLINE behavior.
1907             # On some platforms (e.g. Python CE) the test may fail
1908             # although the MSG_OOB constant is defined.
1909             self.client.sock.sendall(chr(244), socket.MSG_OOB)
1910             self.client.sock.sendall(chr(255), socket.MSG_OOB)
1911             self.client.sock.sendall('abor\r\n')
1912             self.client.sock.settimeout(1)
1913             self.assertEqual(self.client.getresp()[:3], '225')
1914
1915
1916 class TestTimeouts(unittest.TestCase):
1917     """Test idle-timeout capabilities of control and data channels.
1918     Some tests may fail on slow machines.
1919     """
1920     server_class = FTPd
1921     client_class = ftplib.FTP
1922
1923     def setUp(self):
1924         self.server = None
1925         self.client = None
1926
1927     def _setUp(self, idle_timeout=300, data_timeout=300, pasv_timeout=30,
1928                port_timeout=30):
1929         self.server = self.server_class()
1930         self.server.handler.timeout = idle_timeout
1931         self.server.handler.dtp_handler.timeout = data_timeout
1932         self.server.handler.passive_dtp.timeout = pasv_timeout
1933         self.server.handler.active_dtp.timeout = port_timeout
1934         self.server.start()
1935         self.client = self.client_class()
1936         self.client.connect(self.server.host, self.server.port)
1937         self.client.sock.settimeout(2)
1938         self.client.login(USER, PASSWD)
1939
1940     def tearDown(self):
1941         if self.client is not None and self.server is not None:
1942             self.client.close()
1943             self.server.handler.timeout = 300
1944             self.server.handler.dtp_handler.timeout = 300
1945             self.server.handler.passive_dtp.timeout = 30
1946             self.server.handler.active_dtp.timeout = 30
1947             self.server.stop()
1948
1949     def test_idle_timeout(self):
1950         # Test control channel timeout.  The client which does not send
1951         # any command within the time specified in FTPHandler.timeout is
1952         # supposed to be kicked off.
1953         self._setUp(idle_timeout=0.1)
1954         # fail if no msg is received within 1 second
1955         self.client.sock.settimeout(1)
1956         data = self.client.sock.recv(1024)
1957         self.assertEqual(data, "421 Control connection timed out.\r\n")
1958         # ensure client has been kicked off
1959         self.assertRaises((socket.error, EOFError), self.client.sendcmd, 'noop')
1960
1961     def test_data_timeout(self):
1962         # Test data channel timeout.  The client which does not send
1963         # or receive any data within the time specified in
1964         # DTPHandler.timeout is supposed to be kicked off.
1965         self._setUp(data_timeout=0.1)
1966         addr = self.client.makepasv()
1967         s = socket.socket()
1968         s.connect(addr)
1969         # fail if no msg is received within 1 second
1970         self.client.sock.settimeout(1)
1971         data = self.client.sock.recv(1024)
1972         self.assertEqual(data, "421 Data connection timed out.\r\n")
1973         # ensure client has been kicked off
1974         self.assertRaises((socket.error, EOFError), self.client.sendcmd, 'noop')
1975
1976     def test_data_timeout_not_reached(self):
1977         # Impose a timeout for the data channel, then keep sending data for a
1978         # time which is longer than that to make sure that the code checking
1979         # whether the transfer stalled for with no progress is executed.
1980         self._setUp(data_timeout=0.1)
1981         sock = self.client.transfercmd('stor ' + TESTFN)
1982         if hasattr(self.client_class, 'ssl_version'):
1983             sock = ssl.wrap_socket(sock)
1984         try:
1985             stop_at = time.time() + 0.2
1986             while time.time() < stop_at:
1987                 sock.send('x' * 1024)
1988             sock.close()
1989             self.client.voidresp()
1990         finally:
1991             if os.path.exists(TESTFN):
1992                 self.client.delete(TESTFN)
1993
1994     def test_idle_data_timeout1(self):
1995         # Tests that the control connection timeout is suspended while
1996         # the data channel is opened
1997         self._setUp(idle_timeout=0.1, data_timeout=0.2)
1998         addr = self.client.makepasv()
1999         s = socket.socket()
2000         s.connect(addr)
2001         # fail if no msg is received within 1 second
2002         self.client.sock.settimeout(1)
2003         data = self.client.sock.recv(1024)
2004         self.assertEqual(data, "421 Data connection timed out.\r\n")
2005         # ensure client has been kicked off
2006         self.assertRaises((socket.error, EOFError), self.client.sendcmd, 'noop')
2007
2008     def test_idle_data_timeout2(self):
2009         # Tests that the control connection timeout is restarted after
2010         # data channel has been closed
2011         self._setUp(idle_timeout=0.1, data_timeout=0.2)
2012         addr = self.client.makepasv()
2013         s = socket.socket()
2014         s.connect(addr)
2015         # close data channel
2016         self.client.sendcmd('abor')
2017         self.client.sock.settimeout(1)
2018         data = self.client.sock.recv(1024)
2019         self.assertEqual(data, "421 Control connection timed out.\r\n")
2020         # ensure client has been kicked off
2021         self.assertRaises((socket.error, EOFError), self.client.sendcmd, 'noop')
2022
2023     def test_pasv_timeout(self):
2024         # Test pasv data channel timeout.  The client which does not
2025         # connect to the listening data socket within the time specified
2026         # in PassiveDTP.timeout is supposed to receive a 421 response.
2027         self._setUp(pasv_timeout=0.1)
2028         self.client.makepasv()
2029         # fail if no msg is received within 1 second
2030         self.client.sock.settimeout(1)
2031         data = self.client.sock.recv(1024)
2032         self.assertEqual(data, "421 Passive data channel timed out.\r\n")
2033         # client is not expected to be kicked off
2034         self.client.sendcmd('noop')
2035
2036     def test_disabled_idle_timeout(self):
2037         self._setUp(idle_timeout=0)
2038         self.client.sendcmd('noop')
2039
2040     def test_disabled_data_timeout(self):
2041         self._setUp(data_timeout=0)
2042         addr = self.client.makepasv()
2043         s = socket.socket()
2044         s.connect(addr)
2045         s.close()
2046
2047     def test_disabled_pasv_timeout(self):
2048         self._setUp(pasv_timeout=0)
2049         self.client.makepasv()
2050         # reset passive socket
2051         addr = self.client.makepasv()
2052         s = socket.socket()
2053         s.connect(addr)
2054         s.close()
2055
2056     def test_disabled_port_timeout(self):
2057         self._setUp(port_timeout=0)
2058         s1 =self.client.makeport()
2059         s2 = self.client.makeport()
2060         s1.close()
2061         s2.close()
2062
2063
2064 class TestConfigurableOptions(unittest.TestCase):
2065     """Test those daemon options which are commonly modified by user."""
2066     server_class = FTPd
2067     client_class = ftplib.FTP
2068
2069     def setUp(self):
2070         touch(TESTFN)
2071         self.server = self.server_class()
2072         self.server.start()
2073         self.client = self.client_class()
2074         self.client.connect(self.server.host, self.server.port)
2075         self.client.sock.settimeout(2)
2076         self.client.login(USER, PASSWD)
2077
2078     def tearDown(self):
2079         os.remove(TESTFN)
2080         # set back options to their original value
2081         self.server.server.max_cons = 0
2082         self.server.server.max_cons_per_ip = 0
2083         self.server.handler.banner = "pyftpdlib %s ready." % ftpserver.__ver__
2084         self.server.handler.max_login_attempts = 3
2085         self.server.handler._auth_failed_timeout = 5
2086         self.server.handler.masquerade_address = None
2087         self.server.handler.masquerade_address_map = {}
2088         self.server.handler.permit_privileged_ports = False
2089         self.server.handler.passive_ports = None
2090         self.server.handler.use_gmt_times = True
2091         self.server.handler.tcp_no_delay = hasattr(socket, 'TCP_NODELAY')
2092         self.server.stop()
2093
2094     def test_max_connections(self):
2095         # Test FTPServer.max_cons attribute
2096         self.server.server.max_cons = 3
2097         self.client.quit()
2098         c1 = self.client_class()
2099         c2 = self.client_class()
2100         c3 = self.client_class()
2101         try:
2102             c1.connect(self.server.host, self.server.port)
2103             c2.connect(self.server.host, self.server.port)
2104             self.assertRaises(ftplib.error_temp, c3.connect, self.server.host,
2105                               self.server.port)
2106             # with passive data channel established
2107             c2.quit()
2108             c1.login(USER, PASSWD)
2109             c1.makepasv()
2110             self.assertRaises(ftplib.error_temp, c2.connect, self.server.host,
2111                               self.server.port)
2112             # with passive data socket waiting for connection
2113             c1.login(USER, PASSWD)
2114             c1.sendcmd('pasv')
2115             self.assertRaises(ftplib.error_temp, c2.connect, self.server.host,
2116                               self.server.port)
2117             # with active data channel established
2118             c1.login(USER, PASSWD)
2119             sock = c1.makeport()
2120             self.assertRaises(ftplib.error_temp, c2.connect, self.server.host,
2121                               self.server.port)
2122             sock.close()
2123         finally:
2124             c1.close()
2125             c2.close()
2126             c3.close()
2127
2128     def test_max_connections_per_ip(self):
2129         # Test FTPServer.max_cons_per_ip attribute
2130         self.server.server.max_cons_per_ip = 3
2131         self.client.quit()
2132         c1 = self.client_class()
2133         c2 = self.client_class()
2134         c3 = self.client_class()
2135         c4 = self.client_class()
2136         try:
2137             c1.connect(self.server.host, self.server.port)
2138             c2.connect(self.server.host, self.server.port)
2139             c3.connect(self.server.host, self.server.port)
2140             self.assertRaises(ftplib.error_temp, c4.connect, self.server.host,
2141                               self.server.port)
2142             # Make sure client has been disconnected.
2143             # socket.error (Windows) or EOFError (Linux) exception is
2144             # supposed to be raised in such a case.
2145             self.assertRaises((socket.error, EOFError), c4.sendcmd, 'noop')
2146         finally:
2147             c1.close()
2148             c2.close()
2149             c3.close()
2150             c4.close()
2151
2152     def test_banner(self):
2153         # Test FTPHandler.banner attribute
2154         self.server.handler.banner = 'hello there'
2155         self.client.close()
2156         self.client = self.client_class()
2157         self.client.connect(self.server.host, self.server.port)
2158         self.client.sock.settimeout(2)
2159         self.assertEqual(self.client.getwelcome()[4:], 'hello there')
2160
2161     def test_max_login_attempts(self):
2162         # Test FTPHandler.max_login_attempts attribute.
2163         self.server.handler.max_login_attempts = 1
2164         self.server.handler._auth_failed_timeout = 0
2165         self.assertRaises(ftplib.error_perm, self.client.login, 'wrong', 'wrong')
2166         # socket.error (Windows) or EOFError (Linux) exceptions are
2167         # supposed to be raised when attempting to send/recv some data
2168         # using a disconnected socket
2169         self.assertRaises((socket.error, EOFError), self.client.sendcmd, 'noop')
2170
2171     def test_masquerade_address(self):
2172         # Test FTPHandler.masquerade_address attribute
2173         host, port = self.client.makepasv()
2174         self.assertEqual(host, self.server.host)
2175         self.server.handler.masquerade_address = "256.256.256.256"
2176         host, port = self.client.makepasv()
2177         self.assertEqual(host, "256.256.256.256")
2178
2179     def test_masquerade_address_map(self):
2180         # Test FTPHandler.masquerade_address_map attribute
2181         host, port = self.client.makepasv()
2182         self.assertEqual(host, self.server.host)
2183         self.server.handler.masquerade_address_map = {self.server.host :
2184                                                       "128.128.128.128"}
2185         host, port = self.client.makepasv()
2186         self.assertEqual(host, "128.128.128.128")
2187
2188     def test_passive_ports(self):
2189         # Test FTPHandler.passive_ports attribute
2190         _range = range(40000, 60000, 200)
2191         self.server.handler.passive_ports = _range
2192         self.assert_(self.client.makepasv()[1] in _range)
2193         self.assert_(self.client.makepasv()[1] in _range)
2194         self.assert_(self.client.makepasv()[1] in _range)
2195         self.assert_(self.client.makepasv()[1] in _range)
2196
2197     def test_passive_ports_busy(self):
2198         # If the ports in the configured range are busy it is expected
2199         # that a kernel-assigned port gets chosen
2200         s = socket.socket()
2201         s.bind((HOST, 0))
2202         port = s.getsockname()[1]
2203         self.server.handler.passive_ports = [port]
2204         resulting_port = self.client.makepasv()[1]
2205         self.assert_(port != resulting_port)
2206
2207     def test_permit_privileged_ports(self):
2208         # Test FTPHandler.permit_privileged_ports_active attribute
2209
2210         # try to bind a socket on a privileged port
2211         sock = None
2212         for port in reversed(range(1, 1024)):
2213             try:
2214                 socket.getservbyport(port)
2215             except socket.error, err:
2216                 # not registered port; go on
2217                 try:
2218                     sock = socket.socket(self.client.af, socket.SOCK_STREAM)
2219                     sock.bind((HOST, port))
2220                     break
2221                 except socket.error, err:
2222                     if err.args[0] == errno.EACCES:
2223                         # root privileges needed
2224                         sock = None
2225                         break
2226                     sock.close()
2227                     continue
2228             else:
2229                 # registered port found; skip to the next one
2230                 continue
2231         else:
2232             # no usable privileged port was found
2233             sock = None
2234
2235         try:
2236             self.server.handler.permit_privileged_ports = False
2237             self.assertRaises(ftplib.error_perm, self.client.sendport, HOST,
2238                               port)
2239             if sock:
2240                 port = sock.getsockname()[1]
2241                 self.server.handler.permit_privileged_ports = True
2242                 sock.listen(5)
2243                 sock.settimeout(2)
2244                 self.client.sendport(HOST, port)
2245                 sock.accept()
2246         finally:
2247             if sock is not None:
2248                 sock.close()
2249
2250     def test_use_gmt_times(self):
2251         # use GMT time
2252         self.server.handler.use_gmt_times = True
2253         gmt1 = self.client.sendcmd('mdtm ' + TESTFN)
2254         gmt2 = self.client.sendcmd('mlst ' + TESTFN)
2255         gmt3 = self.client.sendcmd('stat ' + TESTFN)
2256
2257         # use local time
2258         self.server.handler.use_gmt_times = False
2259
2260         self.client.quit()
2261         self.client.connect(self.server.host, self.server.port)
2262         self.client.sock.settimeout(2)
2263         self.client.login(USER, PASSWD)
2264
2265         loc1 = self.client.sendcmd('mdtm ' + TESTFN)
2266         loc2 = self.client.sendcmd('mlst ' + TESTFN)
2267         loc3 = self.client.sendcmd('stat ' + TESTFN)
2268
2269         # if we're not in a GMT time zone times are supposed to be
2270         # different
2271         if time.timezone != 0:
2272             self.assertNotEqual(gmt1, loc1)
2273             self.assertNotEqual(gmt2, loc2)
2274             self.assertNotEqual(gmt3, loc3)
2275         # ...otherwise they should be the same
2276         else:
2277             self.assertEqual(gmt1, loc1)
2278             self.assertEqual(gmt2, loc2)
2279             self.assertEqual(gmt3, loc3)
2280
2281     if hasattr(socket, 'TCP_NODELAY'):
2282         def test_tcp_no_delay(self):
2283             def get_handler_socket():
2284                 # return the server's handler socket object
2285                 for fd in asyncore.socket_map:
2286                     instance = asyncore.socket_map[fd]
2287                     if isinstance(instance, ftpserver.FTPHandler):
2288                         break
2289                 return instance.socket
2290
2291             s = get_handler_socket()
2292             self.assertTrue(s.getsockopt(socket.SOL_TCP, socket.TCP_NODELAY))
2293             self.client.quit()
2294             self.server.handler.tcp_no_delay = False
2295             self.client.connect(self.server.host, self.server.port)
2296             self.client.sendcmd('noop')
2297             s = get_handler_socket()
2298             self.assertFalse(s.getsockopt(socket.SOL_TCP, socket.TCP_NODELAY))
2299
2300
2301 class TestCallbacks(unittest.TestCase):
2302     """Test FTPHandler class callback methods."""
2303     server_class = FTPd
2304     client_class = ftplib.FTP
2305
2306     def setUp(self):
2307         self.client = None
2308         self.server = None
2309         self._tearDown = True
2310
2311     def _setUp(self, handler, login=True):
2312         FTPd.handler = handler
2313         self.server = self.server_class()
2314         self.server.start()
2315         self.client = self.client_class()
2316         self.client.connect(self.server.host, self.server.port)
2317         self.client.sock.settimeout(2)
2318         if login:
2319             self.client.login(USER, PASSWD)
2320         self.file = open(TESTFN, 'w+b')
2321         self.dummyfile = StringIO.StringIO()
2322         self._tearDown = False
2323
2324     def tearDown(self):
2325         if not self._tearDown:
2326             FTPd.handler = ftpserver.FTPHandler
2327             self._tearDown = True
2328             if self.client is not None:
2329                 self.client.close()
2330             if self.server is not None:
2331                 self.server.stop()
2332             if not self.file.closed:
2333                 self.file.close()
2334             if not self.dummyfile.closed:
2335                 self.dummyfile.close()
2336             os.remove(TESTFN)
2337
2338     def test_on_file_sent(self):
2339         _file = []
2340
2341         class TestHandler(ftpserver.FTPHandler):
2342
2343             def on_file_sent(self, file):
2344                 _file.append(file)
2345
2346             def on_file_received(self, file):
2347                 raise Exception
2348
2349             def on_incomplete_file_sent(self, file):
2350                 raise Exception
2351
2352             def on_incomplete_file_received(self, file):
2353                 raise Exception
2354
2355         self._setUp(TestHandler)
2356         data = 'abcde12345' * 100000
2357         self.file.write(data)
2358         self.file.close()
2359         self.client.retrbinary("retr " + TESTFN, lambda x: x)
2360         # shut down the server to avoid race conditions
2361         self.tearDown()
2362         self.assertEqual(_file, [os.path.abspath(TESTFN)])
2363
2364     def test_on_file_received(self):
2365         _file = []
2366
2367         class TestHandler(ftpserver.FTPHandler):
2368
2369             def on_file_sent(self, file):
2370                 raise Exception
2371
2372             def on_file_received(self, file):
2373                 _file.append(file)
2374
2375             def on_incomplete_file_sent(self, file):
2376                 raise Exception
2377
2378             def on_incomplete_file_received(self, file):
2379                 raise Exception
2380
2381         self._setUp(TestHandler)
2382         data = 'abcde12345' * 100000
2383         self.dummyfile.write(data)
2384         self.dummyfile.seek(0)
2385         self.client.storbinary('stor ' + TESTFN, self.dummyfile)
2386         # shut down the server to avoid race conditions
2387         self.tearDown()
2388         self.assertEqual(_file, [os.path.abspath(TESTFN)])
2389
2390     def test_on_incomplete_file_sent(self):
2391         _file = []
2392
2393         class TestHandler(ftpserver.FTPHandler):
2394
2395             def on_file_sent(self, file):
2396                 raise Exception
2397
2398             def on_file_received(self, file):
2399                 raise Exception
2400
2401             def on_incomplete_file_sent(self, file):
2402                 _file.append(file)
2403
2404             def on_incomplete_file_received(self, file):
2405                 raise Exception
2406
2407         self._setUp(TestHandler)
2408         data = 'abcde12345' * 100000
2409         self.file.write(data)
2410         self.file.close()
2411
2412         bytes_recv = 0
2413         conn = self.client.transfercmd("retr " + TESTFN, None)
2414         while 1:
2415             chunk = conn.recv(8192)
2416             bytes_recv += len(chunk)
2417             if bytes_recv >= 524288 or not chunk:
2418                 break
2419         conn.close()
2420         self.assertEqual(self.client.getline()[:3], "426")
2421
2422         # shut down the server to avoid race conditions
2423         self.tearDown()
2424         self.assertEqual(_file, [os.path.abspath(TESTFN)])
2425
2426     def test_on_incomplete_file_received(self):
2427         _file = []
2428
2429         class TestHandler(ftpserver.FTPHandler):
2430
2431             def on_file_sent(self, file):
2432                 raise Exception
2433
2434             def on_file_received(self, file):
2435                 raise Exception
2436
2437             def on_incomplete_file_sent(self, file):
2438                 raise Exception
2439
2440             def on_incomplete_file_received(self, file):
2441                 _file.append(file)
2442
2443         self._setUp(TestHandler)
2444         data = 'abcde12345' * 100000
2445         self.dummyfile.write(data)
2446         self.dummyfile.seek(0)
2447
2448         conn = self.client.transfercmd('stor ' + TESTFN)
2449         bytes_sent = 0
2450         while 1:
2451             chunk = self.dummyfile.read(8192)
2452             conn.sendall(chunk)
2453             bytes_sent += len(chunk)
2454             # stop transfer while it isn't finished yet
2455             if bytes_sent >= 524288 or not chunk:
2456                 self.client.putcmd('abor')
2457                 break
2458         conn.close()
2459
2460         # shut down the server to avoid race conditions
2461         self.tearDown()
2462         self.assertEqual(_file, [os.path.abspath(TESTFN)])
2463
2464     def test_on_login(self):
2465         user = []
2466
2467         class TestHandler(ftpserver.FTPHandler):
2468             _auth_failed_timeout = 0
2469
2470             def on_login(self, username):
2471                 user.append(username)
2472
2473             def on_login_failed(self, username, password):
2474                 raise Exception
2475
2476
2477         self._setUp(TestHandler)
2478         # shut down the server to avoid race conditions
2479         self.tearDown()
2480         self.assertEqual(user, [USER])
2481
2482     def test_on_login_failed(self):
2483         pair = []
2484
2485         class TestHandler(ftpserver.FTPHandler):
2486             _auth_failed_timeout = 0
2487
2488             def on_login(self, username):
2489                 raise Exception
2490
2491             def on_login_failed(self, username, password):
2492                 pair.append((username, password))
2493
2494         self._setUp(TestHandler, login=False)
2495         self.assertRaises(ftplib.error_perm, self.client.login, 'foo', 'bar')
2496         # shut down the server to avoid race conditions
2497         self.tearDown()
2498         self.assertEqual(pair, [('foo', 'bar')])
2499
2500     def test_on_login_failed(self):
2501         pair = []
2502
2503         class TestHandler(ftpserver.FTPHandler):
2504             _auth_failed_timeout = 0
2505
2506             def on_login(self, username):
2507                 raise Exception
2508
2509             def on_login_failed(self, username, password):
2510                 pair.append((username, password))
2511
2512         self._setUp(TestHandler, login=False)
2513         self.assertRaises(ftplib.error_perm, self.client.login, 'foo', 'bar')
2514         # shut down the server to avoid race conditions
2515         self.tearDown()
2516         self.assertEqual(pair, [('foo', 'bar')])
2517
2518
2519     def test_on_logout_quit(self):
2520         user = []
2521
2522         class TestHandler(ftpserver.FTPHandler):
2523
2524             def on_logout(self, username):
2525                 user.append(username)
2526
2527         self._setUp(TestHandler)
2528         self.client.quit()
2529         # shut down the server to avoid race conditions
2530         self.tearDown()
2531         self.assertEqual(user, [USER])
2532
2533     def test_on_logout_rein(self):
2534         user = []
2535
2536         class TestHandler(ftpserver.FTPHandler):
2537
2538             def on_logout(self, username):
2539                 user.append(username)
2540
2541         self._setUp(TestHandler)
2542         self.client.sendcmd('rein')
2543         # shut down the server to avoid race conditions
2544         self.tearDown()
2545         self.assertEqual(user, [USER])
2546
2547     def test_on_logout_user_issued_twice(self):
2548         users = []
2549
2550         class TestHandler(ftpserver.FTPHandler):
2551
2552             def on_logout(self, username):
2553                 users.append(username)
2554
2555         self._setUp(TestHandler)
2556         # At this point user "user" is logged in. Re-login as anonymous,
2557         # then quit and expect queue == ["user", "anonymous"]
2558         self.client.login("anonymous")
2559         self.client.quit()
2560         # shut down the server to avoid race conditions
2561         self.tearDown()
2562         self.assertEqual(users, [USER, 'anonymous'])
2563
2564
2565 class _TestNetworkProtocols(unittest.TestCase):
2566     """Test PASV, EPSV, PORT and EPRT commands.
2567
2568     Do not use this class directly, let TestIPv4Environment and
2569     TestIPv6Environment classes use it instead.
2570     """
2571     server_class = FTPd
2572     client_class = ftplib.FTP
2573     HOST = HOST
2574
2575     def setUp(self):
2576         self.server = self.server_class(self.HOST)
2577         self.server.start()
2578         self.client = self.client_class()
2579         self.client.connect(self.server.host, self.server.port)
2580         self.client.sock.settimeout(2)
2581         self.client.login(USER, PASSWD)
2582         if self.client.af == socket.AF_INET:
2583             self.proto = "1"
2584             self.other_proto = "2"
2585         else:
2586             self.proto = "2"
2587             self.other_proto = "1"
2588
2589     def tearDown(self):
2590         self.client.close()
2591         self.server.stop()
2592
2593     def cmdresp(self, cmd):
2594         """Send a command and return response, also if the command failed."""
2595         try:
2596             return self.client.sendcmd(cmd)
2597         except ftplib.Error, err:
2598             return str(err)
2599
2600     def test_eprt(self):
2601         # test wrong proto
2602         try:
2603             self.client.sendcmd('eprt |%s|%s|%s|' % (self.other_proto,
2604                                 self.server.host, self.server.port))
2605         except ftplib.error_perm, err:
2606             self.assertEqual(str(err)[0:3], "522")
2607         else:
2608             self.fail("Exception not raised")
2609
2610         # test bad args
2611         msg = "501 Invalid EPRT format."
2612         # len('|') > 3
2613         self.assertEqual(self.cmdresp('eprt ||||'), msg)
2614         # len('|') < 3
2615         self.assertEqual(self.cmdresp('eprt ||'), msg)
2616         # port > 65535
2617         self.assertEqual(self.cmdresp('eprt |%s|%s|65536|' % (self.proto,
2618                                                              self.HOST)), msg)
2619         # port < 0
2620         self.assertEqual(self.cmdresp('eprt |%s|%s|-1|' % (self.proto,
2621                                                           self.HOST)), msg)
2622         # port < 1024
2623         self.assertEqual(self.cmdresp('eprt |%s|%s|222|' % (self.proto,
2624                        self.HOST)), "501 Can't connect over a privileged port.")
2625         # proto > 2
2626         _cmd = 'eprt |3|%s|%s|' % (self.server.host, self.server.port)
2627         self.assertRaises(ftplib.error_perm,  self.client.sendcmd, _cmd)
2628
2629
2630         if self.proto == '1':
2631             # len(ip.octs) > 4
2632             self.assertEqual(self.cmdresp('eprt |1|1.2.3.4.5|2048|'), msg)
2633             # ip.oct > 255
2634             self.assertEqual(self.cmdresp('eprt |1|1.2.3.256|2048|'), msg)
2635             # bad proto
2636             resp = self.cmdresp('eprt |2|1.2.3.256|2048|')
2637             self.assert_("Network protocol not supported" in resp)
2638
2639         # test connection
2640         sock = socket.socket(self.client.af)
2641         sock.bind((self.client.sock.getsockname()[0], 0))
2642         sock.listen(5)
2643         sock.settimeout(2)
2644         ip, port =  sock.getsockname()[:2]
2645         self.client.sendcmd('eprt |%s|%s|%s|' % (self.proto, ip, port))
2646         try:
2647             try:
2648                 sock.accept()
2649             except socket.timeout:
2650                 self.fail("Server didn't connect to passive socket")
2651         finally:
2652             sock.close()
2653
2654     def test_epsv(self):
2655         # test wrong proto
2656         try:
2657             self.client.sendcmd('epsv ' + self.other_proto)
2658         except ftplib.error_perm, err:
2659             self.assertEqual(str(err)[0:3], "522")
2660         else:
2661             self.fail("Exception not raised")
2662
2663         # proto > 2
2664         self.assertRaises(ftplib.error_perm, self.client.sendcmd, 'epsv 3')
2665
2666         # test connection
2667         for cmd in ('EPSV', 'EPSV ' + self.proto):
2668             host, port = ftplib.parse229(self.client.sendcmd(cmd),
2669                          self.client.sock.getpeername())
2670             s = socket.socket(self.client.af, socket.SOCK_STREAM)
2671             s.settimeout(2)
2672             try:
2673                 s.connect((host, port))
2674                 self.client.sendcmd('abor')
2675             finally:
2676                 s.close()
2677
2678     def test_epsv_all(self):
2679         self.client.sendcmd('epsv all')
2680         self.assertRaises(ftplib.error_perm, self.client.sendcmd, 'pasv')
2681         self.assertRaises(ftplib.error_perm, self.client.sendport, self.HOST, 2000)
2682         self.assertRaises(ftplib.error_perm, self.client.sendcmd,
2683                           'eprt |%s|%s|%s|' % (self.proto, self.HOST, 2000))
2684
2685
2686 class TestIPv4Environment(_TestNetworkProtocols):
2687     """Test PASV, EPSV, PORT and EPRT commands.
2688
2689     Runs tests contained in _TestNetworkProtocols class by using IPv4
2690     plus some additional specific tests.
2691     """
2692     server_class = FTPd
2693     client_class = ftplib.FTP
2694     HOST = '127.0.0.1'
2695
2696     def test_port_v4(self):
2697         # test connection
2698         sock = self.client.makeport()
2699         self.client.sendcmd('abor')
2700         sock.close()
2701         # test bad arguments
2702         ae = self.assertEqual
2703         msg = "501 Invalid PORT format."
2704         ae(self.cmdresp('port 127,0,0,1,1.1'), msg)    # sep != ','
2705         ae(self.cmdresp('port X,0,0,1,1,1'), msg)      # value != int
2706         ae(self.cmdresp('port 127,0,0,1,1,1,1'), msg)  # len(args) > 6
2707         ae(self.cmdresp('port 127,0,0,1'), msg)        # len(args) < 6
2708         ae(self.cmdresp('port 256,0,0,1,1,1'), msg)    # oct > 255
2709         ae(self.cmdresp('port 127,0,0,1,256,1'), msg)  # port > 65535
2710         ae(self.cmdresp('port 127,0,0,1,-1,0'), msg)   # port < 0
2711         msg = "501 Can't connect over a privileged port."
2712         ae(self.cmdresp('port %s,1,1' % self.HOST.replace('.',',')),msg) # port < 1024
2713         if "1.2.3.4" != self.HOST:
2714             msg = "501 Can't connect to a foreign address."
2715             ae(self.cmdresp('port 1,2,3,4,4,4'), msg)
2716
2717     def test_eprt_v4(self):
2718         self.assertEqual(self.cmdresp('eprt |1|0.10.10.10|2222|'),
2719                          "501 Can't connect to a foreign address.")
2720
2721     def test_pasv_v4(self):
2722         host, port = ftplib.parse227(self.client.sendcmd('pasv'))
2723         s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
2724         s.settimeout(2)
2725         try:
2726             s.connect((host, port))
2727         finally:
2728             s.close()
2729
2730
2731 class TestIPv6Environment(_TestNetworkProtocols):
2732     """Test PASV, EPSV, PORT and EPRT commands.
2733
2734     Runs tests contained in _TestNetworkProtocols class by using IPv6
2735     plus some additional specific tests.
2736     """
2737     server_class = FTPd
2738     client_class = ftplib.FTP
2739     HOST = '::1'
2740
2741     def test_port_v6(self):
2742         # PORT is not supposed to work
2743         self.assertRaises(ftplib.error_perm, self.client.sendport,
2744                           self.server.host, self.server.port)
2745
2746     def test_pasv_v6(self):
2747         # PASV is still supposed to work to support clients using
2748         # IPv4 connecting to a server supporting both IPv4 and IPv6
2749         self.client.makepasv()
2750
2751     def test_eprt_v6(self):
2752         self.assertEqual(self.cmdresp('eprt |2|::foo|2222|'),
2753                          "501 Can't connect to a foreign address.")
2754
2755
2756 class TestIPv6MixedEnvironment(unittest.TestCase):
2757     """By running the server by specifying "::" as IP address the
2758     server is supposed to listen on all interfaces, supporting both
2759     IPv4 and IPv6 by using a single socket.
2760
2761     What we are going to do here is starting the server in this
2762     manner and try to connect by using an IPv4 client.
2763     """
2764     server_class = FTPd
2765     client_class = ftplib.FTP
2766     HOST = "::"
2767
2768     def setUp(self):
2769         self.server = self.server_class(self.HOST)
2770         self.server.start()
2771         self.client = None
2772
2773     def tearDown(self):
2774         if self.client is not None:
2775             self.client.close()
2776         self.server.stop()
2777
2778     def test_port_v4(self):
2779         noop = lambda x: x
2780         self.client = self.client_class()
2781         self.client.connect('127.0.0.1', self.server.port)
2782         self.client.set_pasv(False)
2783         self.client.sock.settimeout(2)
2784         self.client.login(USER, PASSWD)
2785         self.client.retrlines('list', noop)
2786
2787     def test_pasv_v4(self):
2788         noop = lambda x: x
2789         self.client = self.client_class()
2790         self.client.connect('127.0.0.1', self.server.port)
2791         self.client.set_pasv(True)
2792         self.client.sock.settimeout(2)
2793         self.client.login(USER, PASSWD)
2794         self.client.retrlines('list', noop)
2795         # make sure pasv response doesn't return an IPv4-mapped address
2796         ip = self.client.makepasv()[0]
2797         self.assertFalse(ip.startswith("::ffff:"))
2798
2799
2800 class TestCornerCases(unittest.TestCase):
2801     """Tests for any kind of strange situation for the server to be in,
2802     mainly referring to bugs signaled on the bug tracker.
2803     """
2804     server_class = FTPd
2805     client_class = ftplib.FTP
2806
2807     def setUp(self):
2808         self.server = self.server_class()
2809         self.server.start()
2810         self.client = self.client_class()
2811         self.client.connect(self.server.host, self.server.port)
2812         self.client.sock.settimeout(2)
2813         self.client.login(USER, PASSWD)
2814
2815     def tearDown(self):
2816         self.client.close()
2817         if self.server.running:
2818             self.server.stop()
2819
2820     def test_port_race_condition(self):
2821         # Refers to bug #120, first sends PORT, then disconnects the
2822         # control channel before accept()ing the incoming data connection.
2823         # The original server behavior was to reply with "200 Active
2824         # data connection established" *after* the client had already
2825         # disconnected the control connection.
2826         sock = socket.socket(self.client.af)
2827         sock.bind((self.client.sock.getsockname()[0], 0))
2828         sock.listen(5)
2829         sock.settimeout(2)
2830         host, port =  sock.getsockname()[:2]
2831
2832         hbytes = host.split('.')
2833         pbytes = [repr(port // 256), repr(port % 256)]
2834         bytes = hbytes + pbytes
2835         cmd = 'PORT ' + ','.join(bytes)
2836         self.client.sock.sendall(cmd + '\r\n')
2837         self.client.quit()
2838         sock.accept()
2839         sock.close()
2840
2841     def test_stou_max_tries(self):
2842         # Emulates case where the max number of tries to find out a
2843         # unique file name when processing STOU command gets hit.
2844
2845         class TestFS(ftpserver.AbstractedFS):
2846             def mkstemp(self, *args, **kwargs):
2847                 raise IOError(errno.EEXIST, "No usable temporary file name found")
2848
2849         self.server.handler.abstracted_fs = TestFS
2850         try:
2851             self.client.quit()
2852             self.client.connect(self.server.host, self.server.port)
2853             self.client.login(USER, PASSWD)
2854             self.assertRaises(ftplib.error_temp, self.client.sendcmd, 'stou')
2855         finally:
2856             self.server.handler.abstracted_fs = ftpserver.AbstractedFS
2857
2858     def test_quick_connect(self):
2859         # Clients that connected and disconnected quickly could cause
2860         # the server to crash, due to a failure to catch errors in the
2861         # initial part of the connection process.
2862         # Tracked in issues #91, #104 and #105.
2863         # See also https://bugs.launchpad.net/zodb/+bug/135108
2864         import struct
2865
2866         def connect(addr):
2867             s = socket.socket()
2868             s.setsockopt(socket.SOL_SOCKET, socket.SO_LINGER,
2869                          struct.pack('ii', 1, 0))
2870             try:
2871                 s.connect(addr)
2872             except socket.error:
2873                 pass
2874             s.close()
2875
2876         for x in xrange(10):
2877             connect((self.server.host, self.server.port))
2878         for x in xrange(10):
2879             addr = self.client.makepasv()
2880             connect(addr)
2881
2882     def test_error_on_callback(self):
2883         # test that the server do not crash in case an error occurs
2884         # while firing a scheduled function
2885         self.tearDown()
2886         flag = []
2887         original_logerror = ftpserver.logerror
2888         ftpserver.logerror = lambda msg: flag.append(msg)
2889         server = ftpserver.FTPServer((HOST, 0), ftpserver.FTPHandler)
2890         try:
2891             len1 = len(asyncore.socket_map)
2892             ftpserver.CallLater(0, lambda: 1 // 0)
2893             server.serve_forever(timeout=0, count=1)
2894             len2 = len(asyncore.socket_map)
2895             self.assertEqual(len1, len2)
2896             self.assertTrue(flag)
2897         finally:
2898             ftpserver.logerror = original_logerror
2899             server.close()
2900
2901     def test_active_conn_error(self):
2902         # we open a socket() but avoid to invoke accept() to
2903         # reproduce this error condition:
2904         # http://code.google.com/p/pyftpdlib/source/detail?r=905
2905         sock = socket.socket()
2906         sock.bind((HOST, 0))
2907         port = sock.getsockname()[1]
2908         self.client.sock.settimeout(.1)
2909         try:
2910             resp = self.client.sendport(HOST, port)
2911         except ftplib.error_temp, err:
2912             self.assertEqual(str(err)[:3], '425')
2913         except socket.timeout:
2914             pass
2915         else:
2916             self.assertNotEqual(str(resp)[:3], '200')
2917
2918
2919 class TestUnicodePathNames(unittest.TestCase):
2920     """Test FTP commands and responses by using path names with non
2921     ASCII characters.
2922     """
2923     server_class = FTPd
2924     client_class = ftplib.FTP
2925
2926     def setUp(self):
2927         self.server = self.server_class()
2928         self.server.start()
2929         self.client = self.client_class()
2930         self.client.connect(self.server.host, self.server.port)
2931         self.client.sock.settimeout(2)
2932         self.client.login(USER, PASSWD)
2933         self.tempfile = os.path.basename(touch(TESTFN + '☃'))
2934         self.tempdir = TESTFN + '☺'
2935         os.mkdir(self.tempdir)
2936
2937     def tearDown(self):
2938         self.client.close()
2939         self.server.stop()
2940         safe_remove(self.tempfile)
2941         if os.path.exists(self.tempdir):
2942             shutil.rmtree(self.tempdir)
2943
2944     # --- fs operations
2945
2946     def test_cwd(self):
2947         resp = self.client.cwd(self.tempdir)
2948         self.assertTrue(self.tempdir in resp)
2949
2950     def test_mkd(self):
2951         os.rmdir(self.tempdir)
2952         dirname = self.client.mkd(self.tempdir)
2953         self.assertEqual(dirname, '/' + self.tempdir)
2954         self.assertTrue(os.path.isdir(self.tempdir))
2955
2956     def test_rmdir(self):
2957         self.client.rmd(self.tempdir)
2958
2959     def test_dele(self):
2960         self.client.delete(self.tempfile)
2961         self.assertFalse(os.path.exists(self.tempfile))
2962
2963     def test_rnfr_rnto(self):
2964         tempname = TESTFN + '♥'
2965         try:
2966             # rename file
2967             self.client.rename(self.tempfile, tempname)
2968             self.assertTrue(os.path.isfile(tempname))
2969             self.client.rename(tempname, self.tempfile)
2970             # rename dir
2971             self.client.rename(self.tempdir, tempname)
2972             self.assertTrue(os.path.isdir(tempname))
2973             self.client.rename(tempname, self.tempdir)
2974         finally:
2975             safe_remove(tempname)
2976             safe_rmdir(tempname)
2977
2978     def test_size(self):
2979         self.client.sendcmd('type i')
2980         self.client.sendcmd('size ' + self.tempfile)
2981
2982     def test_mdtm(self):
2983         self.client.sendcmd('mdtm ' + self.tempfile)
2984
2985     def test_stou(self):
2986         resp = self.client.sendcmd('stou ' + self.tempfile)
2987         try:
2988             self.assertTrue(self.tempfile in resp)
2989             self.client.quit()
2990         finally:
2991             os.remove(resp.rsplit(' ', 1)[1])
2992
2993     if hasattr(os, 'chmod'):
2994         def test_site_chmod(self):
2995             self.client.sendcmd('site chmod 777 ' + self.tempfile)
2996
2997     # --- listing cmds
2998
2999     def _test_listing_cmds(self, cmd):
3000         ls = []
3001         touch(os.path.join(self.tempdir, self.tempfile))
3002         self.client.retrlines("%s %s" % (cmd, self.tempdir), ls.append)
3003         self.assertTrue(self.tempfile in ls[0])
3004
3005     def test_list(self):
3006         self._test_listing_cmds('list')
3007
3008     def test_nlst(self):
3009         self._test_listing_cmds('nlst')
3010
3011     def test_mlsd(self):
3012         self._test_listing_cmds('mlsd')
3013
3014     def test_mlst(self):
3015         # utility function for extracting the line of interest
3016         mlstline = lambda cmd: self.client.voidcmd(cmd).split('\n')[1]
3017         self.assertTrue('type=dir' in mlstline('mlst ' + self.tempdir))
3018         self.assertTrue('/' + self.tempdir in mlstline('mlst ' + self.tempdir))
3019         self.assertTrue('type=file' in mlstline('mlst ' + self.tempfile))
3020         self.assertTrue('/' + self.tempfile in mlstline('mlst ' + self.tempfile))
3021
3022     # --- file transfer
3023
3024     def test_stor(self):
3025         data = 'abcde12345' * 500
3026         os.remove(self.tempfile)
3027         dummy = StringIO.StringIO()
3028         dummy.write(data)
3029         dummy.seek(0)
3030         self.client.storbinary('stor ' + self.tempfile, dummy)
3031         dummy_recv = StringIO.StringIO()
3032         self.client.retrbinary('retr ' + self.tempfile, dummy_recv.write)
3033         dummy_recv.seek(0)
3034         self.assertEqual(dummy_recv.read(), data)
3035
3036     def test_retr(self):
3037         data = 'abcd1234' * 500
3038         f = open(self.tempfile, 'wb')
3039         f.write(data)
3040         f.close()
3041         dummy = StringIO.StringIO()
3042         self.client.retrbinary('retr ' + self.tempfile, dummy.write)
3043         dummy.seek(0)
3044         self.assertEqual(dummy.read(), data)
3045
3046     # XXX - provisional
3047     def test_encode_decode(self):
3048         self.client.sendcmd('type i')
3049         self.client.sendcmd('size ' + self.tempfile.decode('utf8').encode('utf8'))
3050
3051
3052 class TestCommandLineParser(unittest.TestCase):
3053     """Test command line parser."""
3054     SYSARGV = sys.argv
3055     STDERR = sys.stderr
3056
3057     def setUp(self):
3058         class DummyFTPServer(ftpserver.FTPServer):
3059             """An overridden version of FTPServer class which forces
3060             serve_forever() to return immediately.
3061             """
3062             def serve_forever(self, *args, **kwargs):
3063                 return
3064
3065         self.devnull = StringIO.StringIO()
3066         sys.argv = self.SYSARGV[:]
3067         sys.stderr = self.STDERR
3068         self.original_ftpserver_class = ftpserver.FTPServer
3069         ftpserver.FTPServer = DummyFTPServer
3070
3071     def tearDown(self):
3072         self.devnull.close()
3073         sys.argv = self.SYSARGV[:]
3074         sys.stderr = self.STDERR
3075         ftpserver.FTPServer = self.original_ftpserver_class
3076         safe_rmdir(TESTFN)
3077
3078     def test_a_option(self):
3079         sys.argv += ["-i", "localhost", "-p", "0"]
3080         ftpserver.main()
3081         sys.argv = self.SYSARGV[:]
3082
3083         # no argument
3084         sys.argv += ["-a"]
3085         sys.stderr = self.devnull
3086         self.assertRaises(SystemExit, ftpserver.main)
3087
3088     def test_p_option(self):
3089         sys.argv += ["-p", "0"]
3090         ftpserver.main()
3091
3092         # no argument
3093         sys.argv = self.SYSARGV[:]
3094         sys.argv += ["-p"]
3095         sys.stderr = self.devnull
3096         self.assertRaises(SystemExit, ftpserver.main)
3097
3098         # invalid argument
3099         sys.argv += ["-p foo"]
3100         self.assertRaises(SystemExit, ftpserver.main)
3101
3102     def test_w_option(self):
3103         sys.argv += ["-w", "-p", "0"]
3104         warnings.filterwarnings("error")
3105         try:
3106             self.assertRaises(RuntimeWarning, ftpserver.main)
3107         finally:
3108             warnings.resetwarnings()
3109
3110         # unexpected argument
3111         sys.argv = self.SYSARGV[:]
3112         sys.argv += ["-w foo"]
3113         sys.stderr = self.devnull
3114         self.assertRaises(SystemExit, ftpserver.main)
3115
3116     def test_d_option(self):
3117         sys.argv += ["-d", TESTFN, "-p", "0"]
3118         if not os.path.isdir(TESTFN):
3119             os.mkdir(TESTFN)
3120         ftpserver.main()
3121
3122         # without argument
3123         sys.argv = self.SYSARGV[:]
3124         sys.argv += ["-d"]
3125         sys.stderr = self.devnull
3126         self.assertRaises(SystemExit, ftpserver.main)
3127
3128         # no such directory
3129         sys.argv = self.SYSARGV[:]
3130         sys.argv += ["-d %s" % TESTFN]
3131         safe_rmdir(TESTFN)
3132         self.assertRaises(ValueError, ftpserver.main)
3133
3134     def test_r_option(self):
3135         sys.argv += ["-r 60000-61000", "-p", "0"]
3136         ftpserver.main()
3137
3138         # without arg
3139         sys.argv = self.SYSARGV[:]
3140         sys.argv += ["-r"]
3141         sys.stderr = self.devnull
3142         self.assertRaises(SystemExit, ftpserver.main)
3143
3144         # wrong arg
3145         sys.argv = self.SYSARGV[:]
3146         sys.argv += ["-r yyy-zzz"]
3147         self.assertRaises(SystemExit, ftpserver.main)
3148
3149     def test_v_option(self):
3150         sys.argv += ["-v"]
3151         self.assertRaises(SystemExit, ftpserver.main)
3152
3153         # unexpected argument
3154         sys.argv = self.SYSARGV[:]
3155         sys.argv += ["-v foo"]
3156         sys.stderr = self.devnull
3157         self.assertRaises(SystemExit, ftpserver.main)
3158
3159
3160 def test_main(tests=None):
3161     test_suite = unittest.TestSuite()
3162     if tests is None:
3163         tests = [
3164                  TestAbstractedFS,
3165                  TestDummyAuthorizer,
3166                  TestCallLater,
3167                  TestCallEvery,
3168                  TestFtpAuthentication,
3169                  TestFtpDummyCmds,
3170                  TestFtpCmdsSemantic,
3171                  TestFtpFsOperations,
3172                  TestFtpStoreData,
3173                  TestFtpRetrieveData,
3174                  TestFtpListingCmds,
3175                  TestFtpAbort,
3176                  TestTimeouts,
3177                  TestConfigurableOptions,
3178                  TestCallbacks,
3179                  TestCornerCases,
3180                  TestUnicodePathNames,
3181                  TestCommandLineParser,
3182                  ]
3183         if SUPPORTS_IPV4:
3184             tests.append(TestIPv4Environment)
3185         if SUPPORTS_IPV6:
3186             tests.append(TestIPv6Environment)
3187         if SUPPORTS_HYBRID_IPV6:
3188             tests.append(TestIPv6MixedEnvironment)
3189         if SUPPORTS_SENDFILE:
3190             tests.append(TestFtpRetrieveDataNoSendfile)
3191             tests.append(TestFtpStoreDataNoSendfile)
3192         else:
3193             if os.name == 'posix':
3194                 atexit.register(warnings.warn, "couldn't run sendfile() tests",
3195                                 RuntimeWarning)
3196
3197     for test in tests:
3198         test_suite.addTest(unittest.makeSuite(test))
3199     try:
3200         unittest.TextTestRunner(verbosity=2).run(test_suite)
3201     except:
3202         # in case of KeyboardInterrupt grant that the threaded FTP
3203         # server running in background gets stopped
3204         asyncore.socket_map.clear()
3205         raise
3206
3207
3208 if __name__ == '__main__':
3209     test_main()