1 # Copyright (c) Twisted Matrix Laboratories.
2 # See LICENSE for details.
5 Tests for L{twisted.conch.client.default}.
8 import Crypto.Cipher.DES3
11 skip = "PyCrypto and PyASN1 required for twisted.conch.client.default."
13 from twisted.conch.client.agent import SSHAgentClient
14 from twisted.conch.client.default import SSHUserAuthClient
15 from twisted.conch.client.options import ConchOptions
16 from twisted.conch.ssh.keys import Key
19 from twisted.trial.unittest import TestCase
20 from twisted.python.filepath import FilePath
21 from twisted.conch.test import keydata
22 from twisted.test.proto_helpers import StringTransport
class SSHUserAuthClientTest(TestCase):
    """
    Tests for L{SSHUserAuthClient}.

    @type rsaPublic: L{Key}
    @ivar rsaPublic: A public RSA key.

    @type tmpdir: L{FilePath}
    @ivar tmpdir: A temporary directory, created for each test, which holds
        the key files the tests read.

    @type rsaFile: L{FilePath}
    @ivar rsaFile: The C{id_rsa} file inside C{tmpdir}, initially containing
        the private RSA key.
    """

    # NOTE(review): the header of this fixture method was not present in the
    # text under review; C{setUp} is assumed because every test below reads
    # the attributes assigned here -- confirm against the upstream file.
    def setUp(self):
        """
        Load the public RSA key and write an RSA key pair (C{id_rsa} and
        C{id_rsa.pub}) into a newly created temporary directory.
        """
        self.rsaPublic = Key.fromString(keydata.publicRSA_openssh)
        self.tmpdir = FilePath(self.mktemp())
        self.tmpdir.makedirs()
        self.rsaFile = self.tmpdir.child('id_rsa')
        self.rsaFile.setContent(keydata.privateRSA_openssh)
        self.tmpdir.child('id_rsa.pub').setContent(keydata.publicRSA_openssh)


    def test_signDataWithAgent(self):
        """
        When connected to an agent, L{SSHUserAuthClient} can use it to
        request signatures of particular data with a particular L{Key}.
        """
        client = SSHUserAuthClient("user", ConchOptions(), None)
        agent = SSHAgentClient()
        transport = StringTransport()
        agent.makeConnection(transport)
        client.keyAgent = agent
        cleartext = "Sign here"
        client.signData(self.rsaPublic, cleartext)
        # The leading four bytes announce 0x8b (139) bytes.  One byte ("\r"),
        # the length-prefixed key blob (4 + 0x75) and the length-prefixed
        # cleartext (4 + 9) account for 135 of them, so four more bytes must
        # follow the cleartext.
        # NOTE(review): the opening of this assertion and its final operand
        # were missing from the text under review; comparing against
        # transport.value() and using four zero bytes as the tail are
        # reconstructions -- confirm against the upstream file.
        self.assertEqual(
            transport.value(),
            "\x00\x00\x00\x8b\r\x00\x00\x00u" + self.rsaPublic.blob() +
            "\x00\x00\x00\t" + cleartext +
            "\x00\x00\x00\x00")


    def test_agentGetPublicKey(self):
        """
        L{SSHUserAuthClient} looks up public keys from the agent using the
        L{SSHAgentClient} class.  L{SSHAgentClient.getPublicKey} returns a
        L{Key} object with one of the public keys in the agent.  If no more
        keys are present, it returns C{None}.
        """
        agent = SSHAgentClient()
        agent.blobs = [self.rsaPublic.blob()]
        key = agent.getPublicKey()
        self.assertEqual(key.isPublic(), True)
        self.assertEqual(key, self.rsaPublic)
        # The single blob has been consumed, so nothing is left to return.
        self.assertEqual(agent.getPublicKey(), None)


    def test_getPublicKeyFromFile(self):
        """
        L{SSHUserAuthClient.getPublicKey()} is able to get a public key from
        the first file described by its options' C{identitys} list, and return
        the corresponding public L{Key} object.
        """
        options = ConchOptions()
        options.identitys = [self.rsaFile.path]
        client = SSHUserAuthClient("user", options, None)
        key = client.getPublicKey()
        self.assertEqual(key.isPublic(), True)
        self.assertEqual(key, self.rsaPublic)


    def test_getPublicKeyAgentFallback(self):
        """
        If an agent is present, but doesn't return a key,
        L{SSHUserAuthClient.getPublicKey} continues with the normal key
        lookup.
        """
        options = ConchOptions()
        options.identitys = [self.rsaFile.path]
        # This agent is never given any key blobs.
        agent = SSHAgentClient()
        client = SSHUserAuthClient("user", options, None)
        client.keyAgent = agent
        key = client.getPublicKey()
        self.assertEqual(key.isPublic(), True)
        self.assertEqual(key, self.rsaPublic)


    def test_getPublicKeyBadKeyError(self):
        """
        If L{keys.Key.fromFile} raises a L{keys.BadKeyError}, the
        L{SSHUserAuthClient.getPublicKey} tries again to get a public key by
        calling itself recursively.
        """
        options = ConchOptions()
        self.tmpdir.child('id_dsa.pub').setContent(keydata.publicDSA_openssh)
        dsaFile = self.tmpdir.child('id_dsa')
        dsaFile.setContent(keydata.privateDSA_openssh)
        options.identitys = [self.rsaFile.path, dsaFile.path]
        # Replace the RSA public key written by setUp with unparseable
        # content so that the first identity cannot be loaded.
        self.tmpdir.child('id_rsa.pub').setContent('not a key!')
        client = SSHUserAuthClient("user", options, None)
        key = client.getPublicKey()
        self.assertEqual(key.isPublic(), True)
        self.assertEqual(key, Key.fromString(keydata.publicDSA_openssh))
        # Both identities were tried, in order.
        self.assertEqual(client.usedFiles, [self.rsaFile.path, dsaFile.path])


    def test_getPrivateKey(self):
        """
        L{SSHUserAuthClient.getPrivateKey} will load a private key from the
        last used file populated by L{SSHUserAuthClient.getPublicKey}, and
        return a L{Deferred} which fires with the corresponding private L{Key}.
        """
        rsaPrivate = Key.fromString(keydata.privateRSA_openssh)
        options = ConchOptions()
        options.identitys = [self.rsaFile.path]
        client = SSHUserAuthClient("user", options, None)
        # Populate the list of used files
        client.getPublicKey()

        def _cbGetPrivateKey(key):
            self.assertEqual(key.isPublic(), False)
            self.assertEqual(key, rsaPrivate)

        return client.getPrivateKey().addCallback(_cbGetPrivateKey)


    def test_getPrivateKeyPassphrase(self):
        """
        L{SSHUserAuthClient} can get a private key from a file, and return a
        Deferred called back with a private L{Key} object, even if the key is
        encrypted.
        """
        rsaPrivate = Key.fromString(keydata.privateRSA_openssh)
        passphrase = 'this is the passphrase'
        # Overwrite the key file written by setUp with a passphrase-protected
        # serialization of the same key.
        self.rsaFile.setContent(rsaPrivate.toString('openssh', passphrase))
        options = ConchOptions()
        options.identitys = [self.rsaFile.path]
        client = SSHUserAuthClient("user", options, None)
        # Populate the list of used files
        client.getPublicKey()

        # NOTE(review): the format argument and the return statement of this
        # helper were missing from the text under review; the key file path
        # and the passphrase defined above are the reconstructed values --
        # confirm against the upstream file.
        def _getPassword(prompt):
            self.assertEqual(
                prompt,
                "Enter passphrase for key '%s': " % (self.rsaFile.path,))
            return passphrase

        def _cbGetPrivateKey(key):
            self.assertEqual(key.isPublic(), False)
            self.assertEqual(key, rsaPrivate)

        self.patch(client, '_getPassword', _getPassword)
        return client.getPrivateKey().addCallback(_cbGetPrivateKey)