Initial import to Tizen
[profile/ivi/python-twisted.git] / twisted / conch / test / test_cftp.py
1 # -*- test-case-name: twisted.conch.test.test_cftp -*-
2 # Copyright (c) 2001-2009 Twisted Matrix Laboratories.
3 # See LICENSE file for details.
4
5 """
6 Tests for L{twisted.conch.scripts.cftp}.
7 """
8
9 import locale
10 import time, sys, os, operator, getpass, struct
11 from StringIO import StringIO
12
13 from twisted.conch.test.test_ssh import Crypto, pyasn1
14
15 _reason = None
16 if Crypto and pyasn1:
17     try:
18         from twisted.conch import unix
19         from twisted.conch.scripts import cftp
20         from twisted.conch.test.test_filetransfer import FileTransferForTestAvatar
21     except ImportError, e:
22         # Python 2.3 compatibility fix
23         sys.modules.pop("twisted.conch.unix", None)
24         unix = None
25         _reason = str(e)
26         del e
27 else:
28     unix = None
29
30
31 from twisted.python.fakepwd import UserDatabase
32 from twisted.trial.unittest import TestCase
33 from twisted.cred import portal
34 from twisted.internet import reactor, protocol, interfaces, defer, error
35 from twisted.internet.utils import getProcessOutputAndValue
36 from twisted.python import log
37 from twisted.conch import ls
38 from twisted.test.proto_helpers import StringTransport
39 from twisted.internet.task import Clock
40
41 from twisted.conch.test import test_ssh, test_conch
42 from twisted.conch.test.test_filetransfer import SFTPTestBase
43 from twisted.conch.test.test_filetransfer import FileTransferTestAvatar
44
45
46
47 class ListingTests(TestCase):
48     """
49     Tests for L{lsLine}, the function which generates an entry for a file or
50     directory in an SFTP I{ls} command's output.
51     """
52     if getattr(time, 'tzset', None) is None:
53         skip = "Cannot test timestamp formatting code without time.tzset"
54
55     def setUp(self):
56         """
57         Patch the L{ls} module's time function so the results of L{lsLine} are
58         deterministic.
59         """
60         self.now = 123456789
61         def fakeTime():
62             return self.now
63         self.patch(ls, 'time', fakeTime)
64
65         # Make sure that the timezone ends up the same after these tests as
66         # it was before.
67         if 'TZ' in os.environ:
68             self.addCleanup(operator.setitem, os.environ, 'TZ', os.environ['TZ'])
69             self.addCleanup(time.tzset)
70         else:
71             def cleanup():
72                 # os.environ.pop is broken!  Don't use it!  Ever!  Or die!
73                 try:
74                     del os.environ['TZ']
75                 except KeyError:
76                     pass
77                 time.tzset()
78             self.addCleanup(cleanup)
79
80
81     def _lsInTimezone(self, timezone, stat):
82         """
83         Call L{ls.lsLine} after setting the timezone to C{timezone} and return
84         the result.
85         """
86         # Set the timezone to a well-known value so the timestamps are
87         # predictable.
88         os.environ['TZ'] = timezone
89         time.tzset()
90         return ls.lsLine('foo', stat)
91
92
93     def test_oldFile(self):
94         """
95         A file with an mtime six months (approximately) or more in the past has
96         a listing including a low-resolution timestamp.
97         """
98         # Go with 7 months.  That's more than 6 months.
99         then = self.now - (60 * 60 * 24 * 31 * 7)
100         stat = os.stat_result((0, 0, 0, 0, 0, 0, 0, 0, then, 0))
101
102         self.assertEqual(
103             self._lsInTimezone('America/New_York', stat),
104             '!---------    0 0        0               0 Apr 26  1973 foo')
105         self.assertEqual(
106             self._lsInTimezone('Pacific/Auckland', stat),
107             '!---------    0 0        0               0 Apr 27  1973 foo')
108
109
110     def test_oldSingleDigitDayOfMonth(self):
111         """
112         A file with a high-resolution timestamp which falls on a day of the
113         month which can be represented by one decimal digit is formatted with
114         one padding 0 to preserve the columns which come after it.
115         """
116         # A point about 7 months in the past, tweaked to fall on the first of a
117         # month so we test the case we want to test.
118         then = self.now - (60 * 60 * 24 * 31 * 7) + (60 * 60 * 24 * 5)
119         stat = os.stat_result((0, 0, 0, 0, 0, 0, 0, 0, then, 0))
120
121         self.assertEqual(
122             self._lsInTimezone('America/New_York', stat),
123             '!---------    0 0        0               0 May 01  1973 foo')
124         self.assertEqual(
125             self._lsInTimezone('Pacific/Auckland', stat),
126             '!---------    0 0        0               0 May 02  1973 foo')
127
128
129     def test_newFile(self):
130         """
131         A file with an mtime fewer than six months (approximately) in the past
132         has a listing including a high-resolution timestamp excluding the year.
133         """
134         # A point about three months in the past.
135         then = self.now - (60 * 60 * 24 * 31 * 3)
136         stat = os.stat_result((0, 0, 0, 0, 0, 0, 0, 0, then, 0))
137
138         self.assertEqual(
139             self._lsInTimezone('America/New_York', stat),
140             '!---------    0 0        0               0 Aug 28 17:33 foo')
141         self.assertEqual(
142             self._lsInTimezone('Pacific/Auckland', stat),
143             '!---------    0 0        0               0 Aug 29 09:33 foo')
144
145
146     def test_localeIndependent(self):
147         """
148         The month name in the date is locale independent.
149         """
150         # A point about three months in the past.
151         then = self.now - (60 * 60 * 24 * 31 * 3)
152         stat = os.stat_result((0, 0, 0, 0, 0, 0, 0, 0, then, 0))
153
154         # Fake that we're in a language where August is not Aug (e.g.: Spanish)
155         currentLocale = locale.getlocale()
156         locale.setlocale(locale.LC_ALL, "es_AR.UTF8")
157         self.addCleanup(locale.setlocale, locale.LC_ALL, currentLocale)
158
159         self.assertEqual(
160             self._lsInTimezone('America/New_York', stat),
161             '!---------    0 0        0               0 Aug 28 17:33 foo')
162         self.assertEqual(
163             self._lsInTimezone('Pacific/Auckland', stat),
164             '!---------    0 0        0               0 Aug 29 09:33 foo')
165
166     # if alternate locale is not available, the previous test will be
167     # skipped, please install this locale for it to run
168     currentLocale = locale.getlocale()
169     try:
170         try:
171             locale.setlocale(locale.LC_ALL, "es_AR.UTF8")
172         except locale.Error:
173             test_localeIndependent.skip = "The es_AR.UTF8 locale is not installed."
174     finally:
175         locale.setlocale(locale.LC_ALL, currentLocale)
176
177
178     def test_newSingleDigitDayOfMonth(self):
179         """
180         A file with a high-resolution timestamp which falls on a day of the
181         month which can be represented by one decimal digit is formatted with
182         one padding 0 to preserve the columns which come after it.
183         """
184         # A point about three months in the past, tweaked to fall on the first
185         # of a month so we test the case we want to test.
186         then = self.now - (60 * 60 * 24 * 31 * 3) + (60 * 60 * 24 * 4)
187         stat = os.stat_result((0, 0, 0, 0, 0, 0, 0, 0, then, 0))
188
189         self.assertEqual(
190             self._lsInTimezone('America/New_York', stat),
191             '!---------    0 0        0               0 Sep 01 17:33 foo')
192         self.assertEqual(
193             self._lsInTimezone('Pacific/Auckland', stat),
194             '!---------    0 0        0               0 Sep 02 09:33 foo')
195
196
197
198 class StdioClientTests(TestCase):
199     """
200     Tests for L{cftp.StdioClient}.
201     """
202     def setUp(self):
203         """
204         Create a L{cftp.StdioClient} hooked up to dummy transport and a fake
205         user database.
206         """
207         class Connection:
208             pass
209
210         conn = Connection()
211         conn.transport = StringTransport()
212         conn.transport.localClosed = False
213
214         self.client = cftp.StdioClient(conn)
215         self.database = self.client._pwd = UserDatabase()
216
217         # Intentionally bypassing makeConnection - that triggers some code
218         # which uses features not provided by our dumb Connection fake.
219         self.client.transport = StringTransport()
220
221
222     def test_exec(self):
223         """
224         The I{exec} command runs its arguments locally in a child process
225         using the user's shell.
226         """
227         self.database.addUser(
228             getpass.getuser(), 'secret', os.getuid(), 1234, 'foo', 'bar',
229             sys.executable)
230
231         d = self.client._dispatchCommand("exec print 1 + 2")
232         d.addCallback(self.assertEqual, "3\n")
233         return d
234
235
236     def test_execWithoutShell(self):
237         """
238         If the local user has no shell, the I{exec} command runs its arguments
239         using I{/bin/sh}.
240         """
241         self.database.addUser(
242             getpass.getuser(), 'secret', os.getuid(), 1234, 'foo', 'bar', '')
243
244         d = self.client._dispatchCommand("exec echo hello")
245         d.addCallback(self.assertEqual, "hello\n")
246         return d
247
248
249     def test_bang(self):
250         """
251         The I{exec} command is run for lines which start with C{"!"}.
252         """
253         self.database.addUser(
254             getpass.getuser(), 'secret', os.getuid(), 1234, 'foo', 'bar',
255             '/bin/sh')
256
257         d = self.client._dispatchCommand("!echo hello")
258         d.addCallback(self.assertEqual, "hello\n")
259         return d
260
261
262     def setKnownConsoleSize(self, width, height):
263         """
264         For the duration of this test, patch C{cftp}'s C{fcntl} module to return
265         a fixed width and height.
266
267         @param width: the width in characters
268         @type width: C{int}
269         @param height: the height in characters
270         @type height: C{int}
271         """
272         import tty # local import to avoid win32 issues
273         class FakeFcntl(object):
274             def ioctl(self, fd, opt, mutate):
275                 if opt != tty.TIOCGWINSZ:
276                     self.fail("Only window-size queries supported.")
277                 return struct.pack("4H", height, width, 0, 0)
278         self.patch(cftp, "fcntl", FakeFcntl())
279
280
281     def test_progressReporting(self):
282         """
283         L{StdioClient._printProgressBar} prints a progress description,
284         including percent done, amount transferred, transfer rate, and time
285         remaining, all based the given start time, the given L{FileWrapper}'s
286         progress information and the reactor's current time.
287         """
288         # Use a short, known console width because this simple test doesn't need
289         # to test the console padding.
290         self.setKnownConsoleSize(10, 34)
291         clock = self.client.reactor = Clock()
292         wrapped = StringIO("x")
293         wrapped.name = "sample"
294         wrapper = cftp.FileWrapper(wrapped)
295         wrapper.size = 1024 * 10
296         startTime = clock.seconds()
297         clock.advance(2.0)
298         wrapper.total += 4096
299         self.client._printProgressBar(wrapper, startTime)
300         self.assertEqual(self.client.transport.value(),
301                           "\rsample 40% 4.0kB 2.0kBps 00:03 ")
302
303
304     def test_reportNoProgress(self):
305         """
306         L{StdioClient._printProgressBar} prints a progress description that
307         indicates 0 bytes transferred if no bytes have been transferred and no
308         time has passed.
309         """
310         self.setKnownConsoleSize(10, 34)
311         clock = self.client.reactor = Clock()
312         wrapped = StringIO("x")
313         wrapped.name = "sample"
314         wrapper = cftp.FileWrapper(wrapped)
315         startTime = clock.seconds()
316         self.client._printProgressBar(wrapper, startTime)
317         self.assertEqual(self.client.transport.value(),
318                           "\rsample  0% 0.0B 0.0Bps 00:00 ")
319
320
321
322 class FileTransferTestRealm:
323     def __init__(self, testDir):
324         self.testDir = testDir
325
326     def requestAvatar(self, avatarID, mind, *interfaces):
327         a = FileTransferTestAvatar(self.testDir)
328         return interfaces[0], a, lambda: None
329
330
331 class SFTPTestProcess(protocol.ProcessProtocol):
332     """
333     Protocol for testing cftp. Provides an interface between Python (where all
334     the tests are) and the cftp client process (which does the work that is
335     being tested).
336     """
337
338     def __init__(self, onOutReceived):
339         """
340         @param onOutReceived: A L{Deferred} to be fired as soon as data is
341         received from stdout.
342         """
343         self.clearBuffer()
344         self.onOutReceived = onOutReceived
345         self.onProcessEnd = None
346         self._expectingCommand = None
347         self._processEnded = False
348
349     def clearBuffer(self):
350         """
351         Clear any buffered data received from stdout. Should be private.
352         """
353         self.buffer = ''
354         self._linesReceived = []
355         self._lineBuffer = ''
356
357     def outReceived(self, data):
358         """
359         Called by Twisted when the cftp client prints data to stdout.
360         """
361         log.msg('got %s' % data)
362         lines = (self._lineBuffer + data).split('\n')
363         self._lineBuffer = lines.pop(-1)
364         self._linesReceived.extend(lines)
365         # XXX - not strictly correct.
366         # We really want onOutReceived to fire after the first 'cftp>' prompt
367         # has been received. (See use in TestOurServerCmdLineClient.setUp)
368         if self.onOutReceived is not None:
369             d, self.onOutReceived = self.onOutReceived, None
370             d.callback(data)
371         self.buffer += data
372         self._checkForCommand()
373
374     def _checkForCommand(self):
375         prompt = 'cftp> '
376         if self._expectingCommand and self._lineBuffer == prompt:
377             buf = '\n'.join(self._linesReceived)
378             if buf.startswith(prompt):
379                 buf = buf[len(prompt):]
380             self.clearBuffer()
381             d, self._expectingCommand = self._expectingCommand, None
382             d.callback(buf)
383
384     def errReceived(self, data):
385         """
386         Called by Twisted when the cftp client prints data to stderr.
387         """
388         log.msg('err: %s' % data)
389
390     def getBuffer(self):
391         """
392         Return the contents of the buffer of data received from stdout.
393         """
394         return self.buffer
395
396     def runCommand(self, command):
397         """
398         Issue the given command via the cftp client. Return a C{Deferred} that
399         fires when the server returns a result. Note that the C{Deferred} will
400         callback even if the server returns some kind of error.
401
402         @param command: A string containing an sftp command.
403
404         @return: A C{Deferred} that fires when the sftp server returns a
405         result. The payload is the server's response string.
406         """
407         self._expectingCommand = defer.Deferred()
408         self.clearBuffer()
409         self.transport.write(command + '\n')
410         return self._expectingCommand
411
412     def runScript(self, commands):
413         """
414         Run each command in sequence and return a Deferred that fires when all
415         commands are completed.
416
417         @param commands: A list of strings containing sftp commands.
418
419         @return: A C{Deferred} that fires when all commands are completed. The
420         payload is a list of response strings from the server, in the same
421         order as the commands.
422         """
423         sem = defer.DeferredSemaphore(1)
424         dl = [sem.run(self.runCommand, command) for command in commands]
425         return defer.gatherResults(dl)
426
427     def killProcess(self):
428         """
429         Kill the process if it is still running.
430
431         If the process is still running, sends a KILL signal to the transport
432         and returns a C{Deferred} which fires when L{processEnded} is called.
433
434         @return: a C{Deferred}.
435         """
436         if self._processEnded:
437             return defer.succeed(None)
438         self.onProcessEnd = defer.Deferred()
439         self.transport.signalProcess('KILL')
440         return self.onProcessEnd
441
442     def processEnded(self, reason):
443         """
444         Called by Twisted when the cftp client process ends.
445         """
446         self._processEnded = True
447         if self.onProcessEnd:
448             d, self.onProcessEnd = self.onProcessEnd, None
449             d.callback(None)
450
451
452 class CFTPClientTestBase(SFTPTestBase):
453     def setUp(self):
454         f = open('dsa_test.pub','w')
455         f.write(test_ssh.publicDSA_openssh)
456         f.close()
457         f = open('dsa_test','w')
458         f.write(test_ssh.privateDSA_openssh)
459         f.close()
460         os.chmod('dsa_test', 33152)
461         f = open('kh_test','w')
462         f.write('127.0.0.1 ' + test_ssh.publicRSA_openssh)
463         f.close()
464         return SFTPTestBase.setUp(self)
465
466     def startServer(self):
467         realm = FileTransferTestRealm(self.testDir)
468         p = portal.Portal(realm)
469         p.registerChecker(test_ssh.ConchTestPublicKeyChecker())
470         fac = test_ssh.ConchTestServerFactory()
471         fac.portal = p
472         self.server = reactor.listenTCP(0, fac, interface="127.0.0.1")
473
474     def stopServer(self):
475         if not hasattr(self.server.factory, 'proto'):
476             return self._cbStopServer(None)
477         self.server.factory.proto.expectedLoseConnection = 1
478         d = defer.maybeDeferred(
479             self.server.factory.proto.transport.loseConnection)
480         d.addCallback(self._cbStopServer)
481         return d
482
483     def _cbStopServer(self, ignored):
484         return defer.maybeDeferred(self.server.stopListening)
485
486     def tearDown(self):
487         for f in ['dsa_test.pub', 'dsa_test', 'kh_test']:
488             try:
489                 os.remove(f)
490             except:
491                 pass
492         return SFTPTestBase.tearDown(self)
493
494
495
496 class TestOurServerCmdLineClient(CFTPClientTestBase):
497
498     def setUp(self):
499         CFTPClientTestBase.setUp(self)
500
501         self.startServer()
502         cmds = ('-p %i -l testuser '
503                '--known-hosts kh_test '
504                '--user-authentications publickey '
505                '--host-key-algorithms ssh-rsa '
506                '-i dsa_test '
507                '-a '
508                '-v '
509                '127.0.0.1')
510         port = self.server.getHost().port
511         cmds = test_conch._makeArgs((cmds % port).split(), mod='cftp')
512         log.msg('running %s %s' % (sys.executable, cmds))
513         d = defer.Deferred()
514         self.processProtocol = SFTPTestProcess(d)
515         d.addCallback(lambda _: self.processProtocol.clearBuffer())
516         env = os.environ.copy()
517         env['PYTHONPATH'] = os.pathsep.join(sys.path)
518         reactor.spawnProcess(self.processProtocol, sys.executable, cmds,
519                              env=env)
520         return d
521
522     def tearDown(self):
523         d = self.stopServer()
524         d.addCallback(lambda _: self.processProtocol.killProcess())
525         return d
526
527     def _killProcess(self, ignored):
528         try:
529             self.processProtocol.transport.signalProcess('KILL')
530         except error.ProcessExitedAlready:
531             pass
532
533     def runCommand(self, command):
534         """
535         Run the given command with the cftp client. Return a C{Deferred} that
536         fires when the command is complete. Payload is the server's output for
537         that command.
538         """
539         return self.processProtocol.runCommand(command)
540
541     def runScript(self, *commands):
542         """
543         Run the given commands with the cftp client. Returns a C{Deferred}
544         that fires when the commands are all complete. The C{Deferred}'s
545         payload is a list of output for each command.
546         """
547         return self.processProtocol.runScript(commands)
548
549     def testCdPwd(self):
550         """
551         Test that 'pwd' reports the current remote directory, that 'lpwd'
552         reports the current local directory, and that changing to a
553         subdirectory then changing to its parent leaves you in the original
554         remote directory.
555         """
556         # XXX - not actually a unit test, see docstring.
557         homeDir = os.path.join(os.getcwd(), self.testDir)
558         d = self.runScript('pwd', 'lpwd', 'cd testDirectory', 'cd ..', 'pwd')
559         d.addCallback(lambda xs: xs[:3] + xs[4:])
560         d.addCallback(self.assertEqual,
561                       [homeDir, os.getcwd(), '', homeDir])
562         return d
563
564     def testChAttrs(self):
565         """
566         Check that 'ls -l' output includes the access permissions and that
567         this output changes appropriately with 'chmod'.
568         """
569         def _check(results):
570             self.flushLoggedErrors()
571             self.assertTrue(results[0].startswith('-rw-r--r--'))
572             self.assertEqual(results[1], '')
573             self.assertTrue(results[2].startswith('----------'), results[2])
574             self.assertEqual(results[3], '')
575
576         d = self.runScript('ls -l testfile1', 'chmod 0 testfile1',
577                            'ls -l testfile1', 'chmod 644 testfile1')
578         return d.addCallback(_check)
579         # XXX test chgrp/own
580
581
582     def testList(self):
583         """
584         Check 'ls' works as expected. Checks for wildcards, hidden files,
585         listing directories and listing empty directories.
586         """
587         def _check(results):
588             self.assertEqual(results[0], ['testDirectory', 'testRemoveFile',
589                                           'testRenameFile', 'testfile1'])
590             self.assertEqual(results[1], ['testDirectory', 'testRemoveFile',
591                                           'testRenameFile', 'testfile1'])
592             self.assertEqual(results[2], ['testRemoveFile', 'testRenameFile'])
593             self.assertEqual(results[3], ['.testHiddenFile', 'testRemoveFile',
594                                           'testRenameFile'])
595             self.assertEqual(results[4], [''])
596         d = self.runScript('ls', 'ls ../' + os.path.basename(self.testDir),
597                            'ls *File', 'ls -a *File', 'ls -l testDirectory')
598         d.addCallback(lambda xs: [x.split('\n') for x in xs])
599         return d.addCallback(_check)
600
601
602     def testHelp(self):
603         """
604         Check that running the '?' command returns help.
605         """
606         d = self.runCommand('?')
607         d.addCallback(self.assertEqual,
608                       cftp.StdioClient(None).cmd_HELP('').strip())
609         return d
610
611     def assertFilesEqual(self, name1, name2, msg=None):
612         """
613         Assert that the files at C{name1} and C{name2} contain exactly the
614         same data.
615         """
616         f1 = file(name1).read()
617         f2 = file(name2).read()
618         self.assertEqual(f1, f2, msg)
619
620
621     def testGet(self):
622         """
623         Test that 'get' saves the remote file to the correct local location,
624         that the output of 'get' is correct and that 'rm' actually removes
625         the file.
626         """
627         # XXX - not actually a unit test
628         expectedOutput = ("Transferred %s/%s/testfile1 to %s/test file2"
629                           % (os.getcwd(), self.testDir, self.testDir))
630         def _checkGet(result):
631             self.assertTrue(result.endswith(expectedOutput))
632             self.assertFilesEqual(self.testDir + '/testfile1',
633                                   self.testDir + '/test file2',
634                                   "get failed")
635             return self.runCommand('rm "test file2"')
636
637         d = self.runCommand('get testfile1 "%s/test file2"' % (self.testDir,))
638         d.addCallback(_checkGet)
639         d.addCallback(lambda _: self.failIf(
640             os.path.exists(self.testDir + '/test file2')))
641         return d
642
643
644     def testWildcardGet(self):
645         """
646         Test that 'get' works correctly when given wildcard parameters.
647         """
648         def _check(ignored):
649             self.assertFilesEqual(self.testDir + '/testRemoveFile',
650                                   'testRemoveFile',
651                                   'testRemoveFile get failed')
652             self.assertFilesEqual(self.testDir + '/testRenameFile',
653                                   'testRenameFile',
654                                   'testRenameFile get failed')
655
656         d = self.runCommand('get testR*')
657         return d.addCallback(_check)
658
659
660     def testPut(self):
661         """
662         Check that 'put' uploads files correctly and that they can be
663         successfully removed. Also check the output of the put command.
664         """
665         # XXX - not actually a unit test
666         expectedOutput = ('Transferred %s/testfile1 to %s/%s/test"file2'
667                           % (self.testDir, os.getcwd(), self.testDir))
668         def _checkPut(result):
669             self.assertFilesEqual(self.testDir + '/testfile1',
670                                   self.testDir + '/test"file2')
671             self.failUnless(result.endswith(expectedOutput))
672             return self.runCommand('rm "test\\"file2"')
673
674         d = self.runCommand('put %s/testfile1 "test\\"file2"'
675                             % (self.testDir,))
676         d.addCallback(_checkPut)
677         d.addCallback(lambda _: self.failIf(
678             os.path.exists(self.testDir + '/test"file2')))
679         return d
680
681
682     def test_putOverLongerFile(self):
683         """
684         Check that 'put' uploads files correctly when overwriting a longer
685         file.
686         """
687         # XXX - not actually a unit test
688         f = file(os.path.join(self.testDir, 'shorterFile'), 'w')
689         f.write("a")
690         f.close()
691         f = file(os.path.join(self.testDir, 'longerFile'), 'w')
692         f.write("bb")
693         f.close()
694         def _checkPut(result):
695             self.assertFilesEqual(self.testDir + '/shorterFile',
696                                   self.testDir + '/longerFile')
697
698         d = self.runCommand('put %s/shorterFile longerFile'
699                             % (self.testDir,))
700         d.addCallback(_checkPut)
701         return d
702
703
704     def test_putMultipleOverLongerFile(self):
705         """
706         Check that 'put' uploads files correctly when overwriting a longer
707         file and you use a wildcard to specify the files to upload.
708         """
709         # XXX - not actually a unit test
710         os.mkdir(os.path.join(self.testDir, 'dir'))
711         f = file(os.path.join(self.testDir, 'dir', 'file'), 'w')
712         f.write("a")
713         f.close()
714         f = file(os.path.join(self.testDir, 'file'), 'w')
715         f.write("bb")
716         f.close()
717         def _checkPut(result):
718             self.assertFilesEqual(self.testDir + '/dir/file',
719                                   self.testDir + '/file')
720
721         d = self.runCommand('put %s/dir/*'
722                             % (self.testDir,))
723         d.addCallback(_checkPut)
724         return d
725
726
727     def testWildcardPut(self):
728         """
729         What happens if you issue a 'put' command and include a wildcard (i.e.
730         '*') in parameter? Check that all files matching the wildcard are
731         uploaded to the correct directory.
732         """
733         def check(results):
734             self.assertEqual(results[0], '')
735             self.assertEqual(results[2], '')
736             self.assertFilesEqual(self.testDir + '/testRemoveFile',
737                                   self.testDir + '/../testRemoveFile',
738                                   'testRemoveFile get failed')
739             self.assertFilesEqual(self.testDir + '/testRenameFile',
740                                   self.testDir + '/../testRenameFile',
741                                   'testRenameFile get failed')
742
743         d = self.runScript('cd ..',
744                            'put %s/testR*' % (self.testDir,),
745                            'cd %s' % os.path.basename(self.testDir))
746         d.addCallback(check)
747         return d
748
749
750     def testLink(self):
751         """
752         Test that 'ln' creates a file which appears as a link in the output of
753         'ls'. Check that removing the new file succeeds without output.
754         """
755         def _check(results):
756             self.flushLoggedErrors()
757             self.assertEqual(results[0], '')
758             self.assertTrue(results[1].startswith('l'), 'link failed')
759             return self.runCommand('rm testLink')
760
761         d = self.runScript('ln testLink testfile1', 'ls -l testLink')
762         d.addCallback(_check)
763         d.addCallback(self.assertEqual, '')
764         return d
765
766
767     def testRemoteDirectory(self):
768         """
769         Test that we can create and remove directories with the cftp client.
770         """
771         def _check(results):
772             self.assertEqual(results[0], '')
773             self.assertTrue(results[1].startswith('d'))
774             return self.runCommand('rmdir testMakeDirectory')
775
776         d = self.runScript('mkdir testMakeDirectory',
777                            'ls -l testMakeDirector?')
778         d.addCallback(_check)
779         d.addCallback(self.assertEqual, '')
780         return d
781
782
783     def test_existingRemoteDirectory(self):
784         """
785         Test that a C{mkdir} on an existing directory fails with the
786         appropriate error, and doesn't log an useless error server side.
787         """
788         def _check(results):
789             self.assertEqual(results[0], '')
790             self.assertEqual(results[1],
791                               'remote error 11: mkdir failed')
792
793         d = self.runScript('mkdir testMakeDirectory',
794                            'mkdir testMakeDirectory')
795         d.addCallback(_check)
796         return d
797
798
799     def testLocalDirectory(self):
800         """
801         Test that we can create a directory locally and remove it with the
802         cftp client. This test works because the 'remote' server is running
803         out of a local directory.
804         """
805         d = self.runCommand('lmkdir %s/testLocalDirectory' % (self.testDir,))
806         d.addCallback(self.assertEqual, '')
807         d.addCallback(lambda _: self.runCommand('rmdir testLocalDirectory'))
808         d.addCallback(self.assertEqual, '')
809         return d
810
811
812     def testRename(self):
813         """
814         Test that we can rename a file.
815         """
816         def _check(results):
817             self.assertEqual(results[0], '')
818             self.assertEqual(results[1], 'testfile2')
819             return self.runCommand('rename testfile2 testfile1')
820
821         d = self.runScript('rename testfile1 testfile2', 'ls testfile?')
822         d.addCallback(_check)
823         d.addCallback(self.assertEqual, '')
824         return d
825
826
827
828 class TestOurServerBatchFile(CFTPClientTestBase):
829     def setUp(self):
830         CFTPClientTestBase.setUp(self)
831         self.startServer()
832
833     def tearDown(self):
834         CFTPClientTestBase.tearDown(self)
835         return self.stopServer()
836
837     def _getBatchOutput(self, f):
838         fn = self.mktemp()
839         open(fn, 'w').write(f)
840         port = self.server.getHost().port
841         cmds = ('-p %i -l testuser '
842                     '--known-hosts kh_test '
843                     '--user-authentications publickey '
844                     '--host-key-algorithms ssh-rsa '
845                     '-i dsa_test '
846                     '-a '
847                     '-v -b %s 127.0.0.1') % (port, fn)
848         cmds = test_conch._makeArgs(cmds.split(), mod='cftp')[1:]
849         log.msg('running %s %s' % (sys.executable, cmds))
850         env = os.environ.copy()
851         env['PYTHONPATH'] = os.pathsep.join(sys.path)
852
853         self.server.factory.expectedLoseConnection = 1
854
855         d = getProcessOutputAndValue(sys.executable, cmds, env=env)
856
857         def _cleanup(res):
858             os.remove(fn)
859             return res
860
861         d.addCallback(lambda res: res[0])
862         d.addBoth(_cleanup)
863
864         return d
865
866     def testBatchFile(self):
867         """Test whether batch file function of cftp ('cftp -b batchfile').
868         This works by treating the file as a list of commands to be run.
869         """
870         cmds = """pwd
871 ls
872 exit
873 """
874         def _cbCheckResult(res):
875             res = res.split('\n')
876             log.msg('RES %s' % str(res))
877             self.failUnless(res[1].find(self.testDir) != -1, repr(res))
878             self.assertEqual(res[3:-2], ['testDirectory', 'testRemoveFile',
879                                              'testRenameFile', 'testfile1'])
880
881         d = self._getBatchOutput(cmds)
882         d.addCallback(_cbCheckResult)
883         return d
884
885     def testError(self):
886         """Test that an error in the batch file stops running the batch.
887         """
888         cmds = """chown 0 missingFile
889 pwd
890 exit
891 """
892         def _cbCheckResult(res):
893             self.failIf(res.find(self.testDir) != -1)
894
895         d = self._getBatchOutput(cmds)
896         d.addCallback(_cbCheckResult)
897         return d
898
899     def testIgnoredError(self):
900         """Test that a minus sign '-' at the front of a line ignores
901         any errors.
902         """
903         cmds = """-chown 0 missingFile
904 pwd
905 exit
906 """
907         def _cbCheckResult(res):
908             self.failIf(res.find(self.testDir) == -1)
909
910         d = self._getBatchOutput(cmds)
911         d.addCallback(_cbCheckResult)
912         return d
913
914
915
916 class TestOurServerSftpClient(CFTPClientTestBase):
917     """
918     Test the sftp server against sftp command line client.
919     """
920
921     def setUp(self):
922         CFTPClientTestBase.setUp(self)
923         return self.startServer()
924
925
926     def tearDown(self):
927         return self.stopServer()
928
929
930     def test_extendedAttributes(self):
931         """
932         Test the return of extended attributes by the server: the sftp client
933         should ignore them, but still be able to parse the response correctly.
934
935         This test is mainly here to check that
936         L{filetransfer.FILEXFER_ATTR_EXTENDED} has the correct value.
937         """
938         fn = self.mktemp()
939         open(fn, 'w').write("ls .\nexit")
940         port = self.server.getHost().port
941
942         oldGetAttr = FileTransferForTestAvatar._getAttrs
943         def _getAttrs(self, s):
944             attrs = oldGetAttr(self, s)
945             attrs["ext_foo"] = "bar"
946             return attrs
947
948         self.patch(FileTransferForTestAvatar, "_getAttrs", _getAttrs)
949
950         self.server.factory.expectedLoseConnection = True
951         cmds = ('-o', 'IdentityFile=dsa_test',
952                 '-o', 'UserKnownHostsFile=kh_test',
953                 '-o', 'HostKeyAlgorithms=ssh-rsa',
954                 '-o', 'Port=%i' % (port,), '-b', fn, 'testuser@127.0.0.1')
955         d = getProcessOutputAndValue("sftp", cmds)
956         def check(result):
957             self.assertEqual(result[2], 0)
958             for i in ['testDirectory', 'testRemoveFile',
959                       'testRenameFile', 'testfile1']:
960                 self.assertIn(i, result[0])
961         return d.addCallback(check)
962
963
964
965 if unix is None or Crypto is None or pyasn1 is None or interfaces.IReactorProcess(reactor, None) is None:
966     if _reason is None:
967         _reason = "don't run w/o spawnProcess or PyCrypto or pyasn1"
968     TestOurServerCmdLineClient.skip = _reason
969     TestOurServerBatchFile.skip = _reason
970     TestOurServerSftpClient.skip = _reason
971     StdioClientTests.skip = _reason
972 else:
973     from twisted.python.procutils import which
974     if not which('sftp'):
975         TestOurServerSftpClient.skip = "no sftp command-line client available"