1 # -*- test-case-name: twisted.conch.test.test_cftp -*-
2 # Copyright (c) 2001-2009 Twisted Matrix Laboratories.
3 # See LICENSE file for details.
6 Tests for L{twisted.conch.scripts.cftp}.
10 import time, sys, os, operator, getpass, struct
11 from StringIO import StringIO
13 from twisted.conch.test.test_ssh import Crypto, pyasn1
18 from twisted.conch import unix
19 from twisted.conch.scripts import cftp
20 from twisted.conch.test.test_filetransfer import FileTransferForTestAvatar
21 except ImportError, e:
22 # Python 2.3 compatibility fix
23 sys.modules.pop("twisted.conch.unix", None)
31 from twisted.python.fakepwd import UserDatabase
32 from twisted.trial.unittest import TestCase
33 from twisted.cred import portal
34 from twisted.internet import reactor, protocol, interfaces, defer, error
35 from twisted.internet.utils import getProcessOutputAndValue
36 from twisted.python import log
37 from twisted.conch import ls
38 from twisted.test.proto_helpers import StringTransport
39 from twisted.internet.task import Clock
41 from twisted.conch.test import test_ssh, test_conch
42 from twisted.conch.test.test_filetransfer import SFTPTestBase
43 from twisted.conch.test.test_filetransfer import FileTransferTestAvatar
47 class ListingTests(TestCase):
49 Tests for L{lsLine}, the function which generates an entry for a file or
50 directory in an SFTP I{ls} command's output.
52 if getattr(time, 'tzset', None) is None:
53 skip = "Cannot test timestamp formatting code without time.tzset"
57 Patch the L{ls} module's time function so the results of L{lsLine} are
63 self.patch(ls, 'time', fakeTime)
65 # Make sure that the timezone ends up the same after these tests as
67 if 'TZ' in os.environ:
68 self.addCleanup(operator.setitem, os.environ, 'TZ', os.environ['TZ'])
69 self.addCleanup(time.tzset)
72 # os.environ.pop is broken! Don't use it! Ever! Or die!
78 self.addCleanup(cleanup)
81 def _lsInTimezone(self, timezone, stat):
83 Call L{ls.lsLine} after setting the timezone to C{timezone} and return
86 # Set the timezone to a well-known value so the timestamps are
88 os.environ['TZ'] = timezone
90 return ls.lsLine('foo', stat)
93 def test_oldFile(self):
95 A file with an mtime six months (approximately) or more in the past has
96 a listing including a low-resolution timestamp.
98 # Go with 7 months. That's more than 6 months.
99 then = self.now - (60 * 60 * 24 * 31 * 7)
100 stat = os.stat_result((0, 0, 0, 0, 0, 0, 0, 0, then, 0))
103 self._lsInTimezone('America/New_York', stat),
104 '!--------- 0 0 0 0 Apr 26 1973 foo')
106 self._lsInTimezone('Pacific/Auckland', stat),
107 '!--------- 0 0 0 0 Apr 27 1973 foo')
110 def test_oldSingleDigitDayOfMonth(self):
112 A file with a high-resolution timestamp which falls on a day of the
113 month which can be represented by one decimal digit is formatted with
114 one padding 0 to preserve the columns which come after it.
116 # A point about 7 months in the past, tweaked to fall on the first of a
117 # month so we test the case we want to test.
118 then = self.now - (60 * 60 * 24 * 31 * 7) + (60 * 60 * 24 * 5)
119 stat = os.stat_result((0, 0, 0, 0, 0, 0, 0, 0, then, 0))
122 self._lsInTimezone('America/New_York', stat),
123 '!--------- 0 0 0 0 May 01 1973 foo')
125 self._lsInTimezone('Pacific/Auckland', stat),
126 '!--------- 0 0 0 0 May 02 1973 foo')
129 def test_newFile(self):
131 A file with an mtime fewer than six months (approximately) in the past
132 has a listing including a high-resolution timestamp excluding the year.
134 # A point about three months in the past.
135 then = self.now - (60 * 60 * 24 * 31 * 3)
136 stat = os.stat_result((0, 0, 0, 0, 0, 0, 0, 0, then, 0))
139 self._lsInTimezone('America/New_York', stat),
140 '!--------- 0 0 0 0 Aug 28 17:33 foo')
142 self._lsInTimezone('Pacific/Auckland', stat),
143 '!--------- 0 0 0 0 Aug 29 09:33 foo')
146 def test_localeIndependent(self):
148 The month name in the date is locale independent.
150 # A point about three months in the past.
151 then = self.now - (60 * 60 * 24 * 31 * 3)
152 stat = os.stat_result((0, 0, 0, 0, 0, 0, 0, 0, then, 0))
154 # Fake that we're in a language where August is not Aug (e.g.: Spanish)
155 currentLocale = locale.getlocale()
156 locale.setlocale(locale.LC_ALL, "es_AR.UTF8")
157 self.addCleanup(locale.setlocale, locale.LC_ALL, currentLocale)
160 self._lsInTimezone('America/New_York', stat),
161 '!--------- 0 0 0 0 Aug 28 17:33 foo')
163 self._lsInTimezone('Pacific/Auckland', stat),
164 '!--------- 0 0 0 0 Aug 29 09:33 foo')
166 # if alternate locale is not available, the previous test will be
167 # skipped, please install this locale for it to run
168 currentLocale = locale.getlocale()
171 locale.setlocale(locale.LC_ALL, "es_AR.UTF8")
173 test_localeIndependent.skip = "The es_AR.UTF8 locale is not installed."
175 locale.setlocale(locale.LC_ALL, currentLocale)
178 def test_newSingleDigitDayOfMonth(self):
180 A file with a high-resolution timestamp which falls on a day of the
181 month which can be represented by one decimal digit is formatted with
182 one padding 0 to preserve the columns which come after it.
184 # A point about three months in the past, tweaked to fall on the first
185 # of a month so we test the case we want to test.
186 then = self.now - (60 * 60 * 24 * 31 * 3) + (60 * 60 * 24 * 4)
187 stat = os.stat_result((0, 0, 0, 0, 0, 0, 0, 0, then, 0))
190 self._lsInTimezone('America/New_York', stat),
191 '!--------- 0 0 0 0 Sep 01 17:33 foo')
193 self._lsInTimezone('Pacific/Auckland', stat),
194 '!--------- 0 0 0 0 Sep 02 09:33 foo')
198 class StdioClientTests(TestCase):
200 Tests for L{cftp.StdioClient}.
204 Create a L{cftp.StdioClient} hooked up to dummy transport and a fake
211 conn.transport = StringTransport()
212 conn.transport.localClosed = False
214 self.client = cftp.StdioClient(conn)
215 self.database = self.client._pwd = UserDatabase()
217 # Intentionally bypassing makeConnection - that triggers some code
218 # which uses features not provided by our dumb Connection fake.
219 self.client.transport = StringTransport()
224 The I{exec} command runs its arguments locally in a child process
225 using the user's shell.
227 self.database.addUser(
228 getpass.getuser(), 'secret', os.getuid(), 1234, 'foo', 'bar',
231 d = self.client._dispatchCommand("exec print 1 + 2")
232 d.addCallback(self.assertEqual, "3\n")
236 def test_execWithoutShell(self):
238 If the local user has no shell, the I{exec} command runs its arguments
241 self.database.addUser(
242 getpass.getuser(), 'secret', os.getuid(), 1234, 'foo', 'bar', '')
244 d = self.client._dispatchCommand("exec echo hello")
245 d.addCallback(self.assertEqual, "hello\n")
251 The I{exec} command is run for lines which start with C{"!"}.
253 self.database.addUser(
254 getpass.getuser(), 'secret', os.getuid(), 1234, 'foo', 'bar',
257 d = self.client._dispatchCommand("!echo hello")
258 d.addCallback(self.assertEqual, "hello\n")
def setKnownConsoleSize(self, width, height):
    """
    For the duration of this test, patch C{cftp}'s C{fcntl} module to return
    a fixed width and height.

    @param width: the width in characters

    @param height: the height in characters
    """
    import tty  # local import to avoid win32 issues

    # Inside FakeFcntl.ioctl the name C{self} is the fake object, which has
    # no C{fail} method.  Bind the test case to its own name so that an
    # unexpected ioctl request is reported as a test failure rather than as
    # an AttributeError.
    testCase = self

    class FakeFcntl(object):
        def ioctl(self, fd, opt, mutate):
            if opt != tty.TIOCGWINSZ:
                testCase.fail("Only window-size queries supported.")
            # The packed structure carries the height first, then the width.
            return struct.pack("4H", height, width, 0, 0)

    self.patch(cftp, "fcntl", FakeFcntl())
281 def test_progressReporting(self):
283 L{StdioClient._printProgressBar} prints a progress description,
284 including percent done, amount transferred, transfer rate, and time
285 remaining, all based on the given start time, the given L{FileWrapper}'s
286 progress information and the reactor's current time.
288 # Use a short, known console width because this simple test doesn't need
289 # to test the console padding.
290 self.setKnownConsoleSize(10, 34)
291 clock = self.client.reactor = Clock()
292 wrapped = StringIO("x")
293 wrapped.name = "sample"
294 wrapper = cftp.FileWrapper(wrapped)
295 wrapper.size = 1024 * 10
296 startTime = clock.seconds()
298 wrapper.total += 4096
299 self.client._printProgressBar(wrapper, startTime)
300 self.assertEqual(self.client.transport.value(),
301 "\rsample 40% 4.0kB 2.0kBps 00:03 ")
def test_reportNoProgress(self):
    """
    L{StdioClient._printProgressBar} prints a progress description that
    indicates 0 bytes transferred when the wrapper has recorded no
    transferred bytes and the clock has not advanced since the start time.
    """
    self.setKnownConsoleSize(10, 34)
    fakeReactor = self.client.reactor = Clock()
    source = StringIO("x")
    source.name = "sample"
    fileWrapper = cftp.FileWrapper(source)
    # The start time is read from the same clock the client uses, and the
    # clock is never advanced afterwards.
    began = fakeReactor.seconds()
    self.client._printProgressBar(fileWrapper, began)
    written = self.client.transport.value()
    self.assertEqual(written, "\rsample 0% 0.0B 0.0Bps 00:00 ")
class FileTransferTestRealm:
    """
    A realm which hands out a new L{FileTransferTestAvatar}, constructed with
    a fixed directory, for every avatar request.

    @ivar testDir: the value passed to each L{FileTransferTestAvatar} that
        is created.
    """

    def __init__(self, testDir):
        """
        @param testDir: the value to pass to every avatar this realm creates.
        """
        self.testDir = testDir


    def requestAvatar(self, avatarID, mind, *interfaces):
        """
        Create an avatar; C{avatarID} and C{mind} are not consulted.

        @return: a three-tuple of the first requested interface, the new
            avatar, and a logout callable which does nothing.
        """
        def logout():
            return None

        avatar = FileTransferTestAvatar(self.testDir)
        return interfaces[0], avatar, logout
331 class SFTPTestProcess(protocol.ProcessProtocol):
333 Protocol for testing cftp. Provides an interface between Python (where all
334 the tests are) and the cftp client process (which does the work that is
338 def __init__(self, onOutReceived):
340 @param onOutReceived: A L{Deferred} to be fired as soon as data is
341 received from stdout.
344 self.onOutReceived = onOutReceived
345 self.onProcessEnd = None
346 self._expectingCommand = None
347 self._processEnded = False
349 def clearBuffer(self):
351 Clear any buffered data received from stdout. Should be private.
354 self._linesReceived = []
355 self._lineBuffer = ''
357 def outReceived(self, data):
359 Called by Twisted when the cftp client prints data to stdout.
361 log.msg('got %s' % data)
362 lines = (self._lineBuffer + data).split('\n')
363 self._lineBuffer = lines.pop(-1)
364 self._linesReceived.extend(lines)
365 # XXX - not strictly correct.
366 # We really want onOutReceived to fire after the first 'cftp>' prompt
367 # has been received. (See use in TestOurServerCmdLineClient.setUp)
368 if self.onOutReceived is not None:
369 d, self.onOutReceived = self.onOutReceived, None
372 self._checkForCommand()
374 def _checkForCommand(self):
376 if self._expectingCommand and self._lineBuffer == prompt:
377 buf = '\n'.join(self._linesReceived)
378 if buf.startswith(prompt):
379 buf = buf[len(prompt):]
381 d, self._expectingCommand = self._expectingCommand, None
def errReceived(self, data):
    """
    Called by Twisted when the cftp client prints data to stderr.

    The data is only written to the log, prefixed with C{'err: '}.
    """
    message = 'err: %s' % data
    log.msg(message)
392 Return the contents of the buffer of data received from stdout.
396 def runCommand(self, command):
398 Issue the given command via the cftp client. Return a C{Deferred} that
399 fires when the server returns a result. Note that the C{Deferred} will
400 callback even if the server returns some kind of error.
402 @param command: A string containing an sftp command.
404 @return: A C{Deferred} that fires when the sftp server returns a
405 result. The payload is the server's response string.
407 self._expectingCommand = defer.Deferred()
409 self.transport.write(command + '\n')
410 return self._expectingCommand
def runScript(self, commands):
    """
    Run each command in sequence and return a Deferred that fires when all
    commands are completed.

    @param commands: A list of strings containing sftp commands.

    @return: A C{Deferred} that fires when all commands are completed. The
        payload is a list of response strings from the server, in the same
        order as the commands.
    """
    # A semaphore with a single token lets only one command run at a time.
    oneAtATime = defer.DeferredSemaphore(1)
    pending = []
    for line in commands:
        pending.append(oneAtATime.run(self.runCommand, line))
    return defer.gatherResults(pending)
def killProcess(self):
    """
    Kill the process if it is still running.

    If the process has not ended yet, a KILL signal is sent through the
    transport and the returned C{Deferred} is the one stored in
    C{onProcessEnd}.  If it has already ended, an already-fired C{Deferred}
    is returned instead.

    @return: a C{Deferred}.
    """
    if not self._processEnded:
        self.onProcessEnd = defer.Deferred()
        self.transport.signalProcess('KILL')
        return self.onProcessEnd
    return defer.succeed(None)
442 def processEnded(self, reason):
444 Called by Twisted when the cftp client process ends.
446 self._processEnded = True
447 if self.onProcessEnd:
448 d, self.onProcessEnd = self.onProcessEnd, None
452 class CFTPClientTestBase(SFTPTestBase):
454 f = open('dsa_test.pub','w')
455 f.write(test_ssh.publicDSA_openssh)
457 f = open('dsa_test','w')
458 f.write(test_ssh.privateDSA_openssh)
460 os.chmod('dsa_test', 33152)
461 f = open('kh_test','w')
462 f.write('127.0.0.1 ' + test_ssh.publicRSA_openssh)
464 return SFTPTestBase.setUp(self)
466 def startServer(self):
467 realm = FileTransferTestRealm(self.testDir)
468 p = portal.Portal(realm)
469 p.registerChecker(test_ssh.ConchTestPublicKeyChecker())
470 fac = test_ssh.ConchTestServerFactory()
472 self.server = reactor.listenTCP(0, fac, interface="127.0.0.1")
def stopServer(self):
    """
    Shut down the test server.

    If the server's factory has a C{proto} attribute, that protocol is
    flagged as expecting to lose its connection and its transport is
    disconnected before the listening port is stopped by L{_cbStopServer}.
    Otherwise L{_cbStopServer} is called straight away.

    @return: the C{Deferred} produced by the shutdown, so that callers can
        chain further callbacks onto it on either path.
    """
    factory = self.server.factory
    if not hasattr(factory, 'proto'):
        return self._cbStopServer(None)
    factory.proto.expectedLoseConnection = 1
    d = defer.maybeDeferred(factory.proto.transport.loseConnection)
    d.addCallback(self._cbStopServer)
    # Return the Deferred on this path too; callers call addCallback on the
    # result and would otherwise receive None.
    return d
def _cbStopServer(self, ignored):
    """
    Stop the server's listening port.

    @param ignored: unused; present so this can be used as a callback.

    @return: the result of passing the port's C{stopListening} method to
        C{defer.maybeDeferred}.
    """
    listeningPort = self.server
    return defer.maybeDeferred(listeningPort.stopListening)
487 for f in ['dsa_test.pub', 'dsa_test', 'kh_test']:
492 return SFTPTestBase.tearDown(self)
496 class TestOurServerCmdLineClient(CFTPClientTestBase):
499 CFTPClientTestBase.setUp(self)
502 cmds = ('-p %i -l testuser '
503 '--known-hosts kh_test '
504 '--user-authentications publickey '
505 '--host-key-algorithms ssh-rsa '
510 port = self.server.getHost().port
511 cmds = test_conch._makeArgs((cmds % port).split(), mod='cftp')
512 log.msg('running %s %s' % (sys.executable, cmds))
514 self.processProtocol = SFTPTestProcess(d)
515 d.addCallback(lambda _: self.processProtocol.clearBuffer())
516 env = os.environ.copy()
517 env['PYTHONPATH'] = os.pathsep.join(sys.path)
518 reactor.spawnProcess(self.processProtocol, sys.executable, cmds,
523 d = self.stopServer()
524 d.addCallback(lambda _: self.processProtocol.killProcess())
527 def _killProcess(self, ignored):
529 self.processProtocol.transport.signalProcess('KILL')
530 except error.ProcessExitedAlready:
def runCommand(self, command):
    """
    Run the given command with the cftp client by delegating to the process
    protocol's C{runCommand}.

    @param command: the command to hand to the process protocol.

    @return: whatever the process protocol's C{runCommand} returns: a
        C{Deferred} that fires when the command is complete.
    """
    client = self.processProtocol
    return client.runCommand(command)
def runScript(self, *commands):
    """
    Run the given commands with the cftp client by delegating to the process
    protocol's C{runScript}.

    @param commands: the commands, given as separate positional arguments;
        they are passed on together as one tuple.

    @return: whatever the process protocol's C{runScript} returns: a
        C{Deferred} that fires when the commands are all complete, with a
        list of output for each command as its payload.
    """
    client = self.processProtocol
    return client.runScript(commands)
551 Test that 'pwd' reports the current remote directory, that 'lpwd'
552 reports the current local directory, and that changing to a
553 subdirectory then changing to its parent leaves you in the original
556 # XXX - not actually a unit test, see docstring.
557 homeDir = os.path.join(os.getcwd(), self.testDir)
558 d = self.runScript('pwd', 'lpwd', 'cd testDirectory', 'cd ..', 'pwd')
559 d.addCallback(lambda xs: xs[:3] + xs[4:])
560 d.addCallback(self.assertEqual,
561 [homeDir, os.getcwd(), '', homeDir])
564 def testChAttrs(self):
566 Check that 'ls -l' output includes the access permissions and that
567 this output changes appropriately with 'chmod'.
570 self.flushLoggedErrors()
571 self.assertTrue(results[0].startswith('-rw-r--r--'))
572 self.assertEqual(results[1], '')
573 self.assertTrue(results[2].startswith('----------'), results[2])
574 self.assertEqual(results[3], '')
576 d = self.runScript('ls -l testfile1', 'chmod 0 testfile1',
577 'ls -l testfile1', 'chmod 644 testfile1')
578 return d.addCallback(_check)
584 Check 'ls' works as expected. Checks for wildcards, hidden files,
585 listing directories and listing empty directories.
588 self.assertEqual(results[0], ['testDirectory', 'testRemoveFile',
589 'testRenameFile', 'testfile1'])
590 self.assertEqual(results[1], ['testDirectory', 'testRemoveFile',
591 'testRenameFile', 'testfile1'])
592 self.assertEqual(results[2], ['testRemoveFile', 'testRenameFile'])
593 self.assertEqual(results[3], ['.testHiddenFile', 'testRemoveFile',
595 self.assertEqual(results[4], [''])
596 d = self.runScript('ls', 'ls ../' + os.path.basename(self.testDir),
597 'ls *File', 'ls -a *File', 'ls -l testDirectory')
598 d.addCallback(lambda xs: [x.split('\n') for x in xs])
599 return d.addCallback(_check)
604 Check that running the '?' command returns help.
606 d = self.runCommand('?')
607 d.addCallback(self.assertEqual,
608 cftp.StdioClient(None).cmd_HELP('').strip())
def assertFilesEqual(self, name1, name2, msg=None):
    """
    Assert that the files at C{name1} and C{name2} contain exactly the
    same data.

    @param name1: path of the first file to read.

    @param name2: path of the second file to read.

    @param msg: optional failure message, passed through to
        C{assertEqual}.
    """
    contents = []
    for name in (name1, name2):
        # Use open() and close each file explicitly so the handles are
        # released even if reading fails, instead of leaving them open.
        f = open(name)
        try:
            contents.append(f.read())
        finally:
            f.close()
    self.assertEqual(contents[0], contents[1], msg)
623 Test that 'get' saves the remote file to the correct local location,
624 that the output of 'get' is correct and that 'rm' actually removes
627 # XXX - not actually a unit test
628 expectedOutput = ("Transferred %s/%s/testfile1 to %s/test file2"
629 % (os.getcwd(), self.testDir, self.testDir))
630 def _checkGet(result):
631 self.assertTrue(result.endswith(expectedOutput))
632 self.assertFilesEqual(self.testDir + '/testfile1',
633 self.testDir + '/test file2',
635 return self.runCommand('rm "test file2"')
637 d = self.runCommand('get testfile1 "%s/test file2"' % (self.testDir,))
638 d.addCallback(_checkGet)
639 d.addCallback(lambda _: self.failIf(
640 os.path.exists(self.testDir + '/test file2')))
644 def testWildcardGet(self):
646 Test that 'get' works correctly when given wildcard parameters.
649 self.assertFilesEqual(self.testDir + '/testRemoveFile',
651 'testRemoveFile get failed')
652 self.assertFilesEqual(self.testDir + '/testRenameFile',
654 'testRenameFile get failed')
656 d = self.runCommand('get testR*')
657 return d.addCallback(_check)
662 Check that 'put' uploads files correctly and that they can be
663 successfully removed. Also check the output of the put command.
665 # XXX - not actually a unit test
666 expectedOutput = ('Transferred %s/testfile1 to %s/%s/test"file2'
667 % (self.testDir, os.getcwd(), self.testDir))
668 def _checkPut(result):
669 self.assertFilesEqual(self.testDir + '/testfile1',
670 self.testDir + '/test"file2')
671 self.failUnless(result.endswith(expectedOutput))
672 return self.runCommand('rm "test\\"file2"')
674 d = self.runCommand('put %s/testfile1 "test\\"file2"'
676 d.addCallback(_checkPut)
677 d.addCallback(lambda _: self.failIf(
678 os.path.exists(self.testDir + '/test"file2')))
682 def test_putOverLongerFile(self):
684 Check that 'put' uploads files correctly when overwriting a longer
687 # XXX - not actually a unit test
688 f = file(os.path.join(self.testDir, 'shorterFile'), 'w')
691 f = file(os.path.join(self.testDir, 'longerFile'), 'w')
694 def _checkPut(result):
695 self.assertFilesEqual(self.testDir + '/shorterFile',
696 self.testDir + '/longerFile')
698 d = self.runCommand('put %s/shorterFile longerFile'
700 d.addCallback(_checkPut)
704 def test_putMultipleOverLongerFile(self):
706 Check that 'put' uploads files correctly when overwriting a longer
707 file and you use a wildcard to specify the files to upload.
709 # XXX - not actually a unit test
710 os.mkdir(os.path.join(self.testDir, 'dir'))
711 f = file(os.path.join(self.testDir, 'dir', 'file'), 'w')
714 f = file(os.path.join(self.testDir, 'file'), 'w')
717 def _checkPut(result):
718 self.assertFilesEqual(self.testDir + '/dir/file',
719 self.testDir + '/file')
721 d = self.runCommand('put %s/dir/*'
723 d.addCallback(_checkPut)
727 def testWildcardPut(self):
729 What happens if you issue a 'put' command and include a wildcard (i.e.
730 '*') in parameter? Check that all files matching the wildcard are
731 uploaded to the correct directory.
734 self.assertEqual(results[0], '')
735 self.assertEqual(results[2], '')
736 self.assertFilesEqual(self.testDir + '/testRemoveFile',
737 self.testDir + '/../testRemoveFile',
738 'testRemoveFile get failed')
739 self.assertFilesEqual(self.testDir + '/testRenameFile',
740 self.testDir + '/../testRenameFile',
741 'testRenameFile get failed')
743 d = self.runScript('cd ..',
744 'put %s/testR*' % (self.testDir,),
745 'cd %s' % os.path.basename(self.testDir))
752 Test that 'ln' creates a file which appears as a link in the output of
753 'ls'. Check that removing the new file succeeds without output.
756 self.flushLoggedErrors()
757 self.assertEqual(results[0], '')
758 self.assertTrue(results[1].startswith('l'), 'link failed')
759 return self.runCommand('rm testLink')
761 d = self.runScript('ln testLink testfile1', 'ls -l testLink')
762 d.addCallback(_check)
763 d.addCallback(self.assertEqual, '')
767 def testRemoteDirectory(self):
769 Test that we can create and remove directories with the cftp client.
772 self.assertEqual(results[0], '')
773 self.assertTrue(results[1].startswith('d'))
774 return self.runCommand('rmdir testMakeDirectory')
776 d = self.runScript('mkdir testMakeDirectory',
777 'ls -l testMakeDirector?')
778 d.addCallback(_check)
779 d.addCallback(self.assertEqual, '')
783 def test_existingRemoteDirectory(self):
785 Test that a C{mkdir} on an existing directory fails with the
786 appropriate error, and doesn't log a useless error server side.
789 self.assertEqual(results[0], '')
790 self.assertEqual(results[1],
791 'remote error 11: mkdir failed')
793 d = self.runScript('mkdir testMakeDirectory',
794 'mkdir testMakeDirectory')
795 d.addCallback(_check)
799 def testLocalDirectory(self):
801 Test that we can create a directory locally and remove it with the
802 cftp client. This test works because the 'remote' server is running
803 out of a local directory.
805 d = self.runCommand('lmkdir %s/testLocalDirectory' % (self.testDir,))
806 d.addCallback(self.assertEqual, '')
807 d.addCallback(lambda _: self.runCommand('rmdir testLocalDirectory'))
808 d.addCallback(self.assertEqual, '')
812 def testRename(self):
814 Test that we can rename a file.
817 self.assertEqual(results[0], '')
818 self.assertEqual(results[1], 'testfile2')
819 return self.runCommand('rename testfile2 testfile1')
821 d = self.runScript('rename testfile1 testfile2', 'ls testfile?')
822 d.addCallback(_check)
823 d.addCallback(self.assertEqual, '')
828 class TestOurServerBatchFile(CFTPClientTestBase):
830 CFTPClientTestBase.setUp(self)
834 CFTPClientTestBase.tearDown(self)
835 return self.stopServer()
837 def _getBatchOutput(self, f):
839 open(fn, 'w').write(f)
840 port = self.server.getHost().port
841 cmds = ('-p %i -l testuser '
842 '--known-hosts kh_test '
843 '--user-authentications publickey '
844 '--host-key-algorithms ssh-rsa '
847 '-v -b %s 127.0.0.1') % (port, fn)
848 cmds = test_conch._makeArgs(cmds.split(), mod='cftp')[1:]
849 log.msg('running %s %s' % (sys.executable, cmds))
850 env = os.environ.copy()
851 env['PYTHONPATH'] = os.pathsep.join(sys.path)
853 self.server.factory.expectedLoseConnection = 1
855 d = getProcessOutputAndValue(sys.executable, cmds, env=env)
861 d.addCallback(lambda res: res[0])
866 def testBatchFile(self):
867 """Test whether batch file function of cftp ('cftp -b batchfile').
868 This works by treating the file as a list of commands to be run.
874 def _cbCheckResult(res):
875 res = res.split('\n')
876 log.msg('RES %s' % str(res))
877 self.failUnless(res[1].find(self.testDir) != -1, repr(res))
878 self.assertEqual(res[3:-2], ['testDirectory', 'testRemoveFile',
879 'testRenameFile', 'testfile1'])
881 d = self._getBatchOutput(cmds)
882 d.addCallback(_cbCheckResult)
886 """Test that an error in the batch file stops running the batch.
888 cmds = """chown 0 missingFile
892 def _cbCheckResult(res):
893 self.failIf(res.find(self.testDir) != -1)
895 d = self._getBatchOutput(cmds)
896 d.addCallback(_cbCheckResult)
899 def testIgnoredError(self):
900 """Test that a minus sign '-' at the front of a line ignores
903 cmds = """-chown 0 missingFile
907 def _cbCheckResult(res):
908 self.failIf(res.find(self.testDir) == -1)
910 d = self._getBatchOutput(cmds)
911 d.addCallback(_cbCheckResult)
916 class TestOurServerSftpClient(CFTPClientTestBase):
918 Test the sftp server against sftp command line client.
922 CFTPClientTestBase.setUp(self)
923 return self.startServer()
927 return self.stopServer()
930 def test_extendedAttributes(self):
932 Test the return of extended attributes by the server: the sftp client
933 should ignore them, but still be able to parse the response correctly.
935 This test is mainly here to check that
936 L{filetransfer.FILEXFER_ATTR_EXTENDED} has the correct value.
939 open(fn, 'w').write("ls .\nexit")
940 port = self.server.getHost().port
942 oldGetAttr = FileTransferForTestAvatar._getAttrs
943 def _getAttrs(self, s):
944 attrs = oldGetAttr(self, s)
945 attrs["ext_foo"] = "bar"
948 self.patch(FileTransferForTestAvatar, "_getAttrs", _getAttrs)
950 self.server.factory.expectedLoseConnection = True
951 cmds = ('-o', 'IdentityFile=dsa_test',
952 '-o', 'UserKnownHostsFile=kh_test',
953 '-o', 'HostKeyAlgorithms=ssh-rsa',
954 '-o', 'Port=%i' % (port,), '-b', fn, 'testuser@127.0.0.1')
955 d = getProcessOutputAndValue("sftp", cmds)
957 self.assertEqual(result[2], 0)
958 for i in ['testDirectory', 'testRemoveFile',
959 'testRenameFile', 'testfile1']:
960 self.assertIn(i, result[0])
961 return d.addCallback(check)
965 if unix is None or Crypto is None or pyasn1 is None or interfaces.IReactorProcess(reactor, None) is None:
967 _reason = "don't run w/o spawnProcess or PyCrypto or pyasn1"
968 TestOurServerCmdLineClient.skip = _reason
969 TestOurServerBatchFile.skip = _reason
970 TestOurServerSftpClient.skip = _reason
971 StdioClientTests.skip = _reason
973 from twisted.python.procutils import which
974 if not which('sftp'):
975 TestOurServerSftpClient.skip = "no sftp command-line client available"