1 # -*- test-case-name: twisted.conch.test.test_knownhosts,twisted.conch.test.test_default -*-
2 # Copyright (c) Twisted Matrix Laboratories.
3 # See LICENSE for details.
6 Various classes and functions for implementing user-interaction in the
7 command-line conch client.
9 You probably shouldn't use anything in this module directly, since it assumes
10 you are sitting at an interactive terminal. For example, to programmatically
11 interact with a known_hosts database, use L{twisted.conch.client.knownhosts}.
14 from twisted.python import log
15 from twisted.python.filepath import FilePath
17 from twisted.conch.error import ConchError
18 from twisted.conch.ssh import common, keys, userauth
19 from twisted.internet import defer, protocol, reactor
21 from twisted.conch.client.knownhosts import KnownHostsFile, ConsoleUI
23 from twisted.conch.client import agent
25 import os, sys, base64, getpass
27 # This name is bound so that the unit tests can use 'patch' to override it.
30 def verifyHostKey(transport, host, pubKey, fingerprint):
34 This function is a gross vestige of some bad factoring in the client
35 internals. The actual implementation, and a better signature of this logic
36 is in L{KnownHostsFile.verifyHostKey}. This function is not deprecated yet
37 because the callers have not yet been rehabilitated, but they should
38 eventually be changed to call that method instead.
40 However, this function does perform two functions not implemented by
41 L{KnownHostsFile.verifyHostKey}. It determines the path to the user's
42 known_hosts file based on the options (which should really be the options
43 object's job), and it provides an opener to L{ConsoleUI} which opens
44 '/dev/tty' so that the user will be prompted on the tty of the process even
45 if the input and output of the process has been redirected. This latter
46 part is, somewhat obviously, not portable, but I don't know of a portable
47 equivalent that could be used.
49 @param host: Due to a bug in L{SSHClientTransport.verifyHostKey}, this is
50 always the dotted-quad IP address of the host being connected to.
53 @param transport: the client transport which is attempting to connect to
55 @type transport: L{SSHClientTransport}
57 @param fingerprint: the fingerprint of the given public key, in
58 xx:xx:xx:... format. This is ignored in favor of getting the fingerprint
60 @type fingerprint: L{str}
62 @param pubKey: The public key of the server being connected to.
65 @return: a L{Deferred} which fires with C{1} if the key was successfully
66 verified, or fails if the key could not be successfully verified. Failure
67 types may include L{HostKeyChanged}, L{UserRejectedKey}, L{IOError} or
70 actualHost = transport.factory.options['host']
71 actualKey = keys.Key.fromString(pubKey)
72 kh = KnownHostsFile.fromPath(FilePath(
73 transport.factory.options['known-hosts']
74 or os.path.expanduser("~/.ssh/known_hosts")
76 ui = ConsoleUI(lambda : _open("/dev/tty", "r+b"))
77 return kh.verifyHostKey(ui, actualHost, host, actualKey)
81 def isInKnownHosts(host, pubKey, options):
82 """checks to see if host is in the known_hosts file for the user.
83 returns 0 if it isn't, 1 if it is and is the same, 2 if it's changed.
85 keyType = common.getNS(pubKey)[0]
88 if not options['known-hosts'] and not os.path.exists(os.path.expanduser('~/.ssh/')):
89 print 'Creating ~/.ssh directory...'
90 os.mkdir(os.path.expanduser('~/.ssh'))
91 kh_file = options['known-hosts'] or '~/.ssh/known_hosts'
93 known_hosts = open(os.path.expanduser(kh_file))
96 for line in known_hosts.xreadlines():
100 hosts, hostKeyType, encodedKey = split[:3]
101 if host not in hosts.split(','): # incorrect host
103 if hostKeyType != keyType: # incorrect type of key
106 decodedKey = base64.decodestring(encodedKey)
109 if decodedKey == pubKey:
117 class SSHUserAuthClient(userauth.SSHUserAuthClient):
119 def __init__(self, user, options, *args):
120 userauth.SSHUserAuthClient.__init__(self, user, *args)
122 self.options = options
124 if not options.identitys:
125 options.identitys = ['~/.ssh/id_rsa', '~/.ssh/id_dsa']
127 def serviceStarted(self):
128 if 'SSH_AUTH_SOCK' in os.environ and not self.options['noagent']:
129 log.msg('using agent')
130 cc = protocol.ClientCreator(reactor, agent.SSHAgentClient)
131 d = cc.connectUNIX(os.environ['SSH_AUTH_SOCK'])
132 d.addCallback(self._setAgent)
133 d.addErrback(self._ebSetAgent)
135 userauth.SSHUserAuthClient.serviceStarted(self)
137 def serviceStopped(self):
139 self.keyAgent.transport.loseConnection()
142 def _setAgent(self, a):
144 d = self.keyAgent.getPublicKeys()
145 d.addBoth(self._ebSetAgent)
148 def _ebSetAgent(self, f):
149 userauth.SSHUserAuthClient.serviceStarted(self)
151 def _getPassword(self, prompt):
153 oldout, oldin = sys.stdout, sys.stdin
154 sys.stdin = sys.stdout = open('/dev/tty','r+')
155 p=getpass.getpass(prompt)
156 sys.stdout,sys.stdin=oldout,oldin
158 except (KeyboardInterrupt, IOError):
160 raise ConchError('PEBKAC')
162 def getPassword(self, prompt = None):
164 prompt = "%s@%s's password: " % (self.user, self.transport.transport.getPeer().host)
166 p = self._getPassword(prompt)
167 return defer.succeed(p)
172 def getPublicKey(self):
174 Get a public key from the key agent if possible, otherwise look in
175 the next configured identity file for one.
178 key = self.keyAgent.getPublicKey()
181 files = [x for x in self.options.identitys if x not in self.usedFiles]
182 log.msg(str(self.options.identitys))
188 self.usedFiles.append(file)
189 file = os.path.expanduser(file)
191 if not os.path.exists(file):
192 return self.getPublicKey() # try again
194 return keys.Key.fromFile(file)
195 except keys.BadKeyError:
196 return self.getPublicKey() # try again
199 def signData(self, publicKey, signData):
201 Extend the base signing behavior by using an SSH agent to sign the
202 data, if one is available.
204 @type publicKey: L{Key}
205 @type signData: C{str}
207 if not self.usedFiles: # agent key
208 return self.keyAgent.signData(publicKey.blob(), signData)
210 return userauth.SSHUserAuthClient.signData(self, publicKey, signData)
213 def getPrivateKey(self):
215 Try to load the private key from the last used file identified by
216 C{getPublicKey}, potentially asking for the passphrase if the key is
219 file = os.path.expanduser(self.usedFiles[-1])
220 if not os.path.exists(file):
223 return defer.succeed(keys.Key.fromFile(file))
224 except keys.EncryptedKeyError:
226 prompt = "Enter passphrase for key '%s': " % \
229 p = self._getPassword(prompt)
230 return defer.succeed(keys.Key.fromFile(file, passphrase=p))
231 except (keys.BadKeyError, ConchError):
233 return defer.fail(ConchError('bad password'))
235 except KeyboardInterrupt:
240 def getGenericAnswers(self, name, instruction, prompts):
243 oldout, oldin = sys.stdout, sys.stdin
244 sys.stdin = sys.stdout = open('/dev/tty','r+')
249 for prompt, echo in prompts:
251 responses.append(raw_input(prompt))
253 responses.append(getpass.getpass(prompt))
255 sys.stdout,sys.stdin=oldout,oldin
256 return defer.succeed(responses)