1 # -*- test-case-name: twisted.conch.test.test_knownhosts -*-
2 # Copyright (c) Twisted Matrix Laboratories.
3 # See LICENSE for details.
6 An implementation of the OpenSSH known_hosts database.
11 from binascii import Error as DecodeError, b2a_base64
15 from zope.interface import implements
17 from twisted.python.randbytes import secureRandom
18 if sys.version_info >= (2, 5):
19 from twisted.python.hashlib import sha1
21 # We need to have an object with a method named 'new'.
24 from twisted.internet import defer
26 from twisted.python import log
27 from twisted.conch.interfaces import IKnownHostEntry
28 from twisted.conch.error import HostKeyChanged, UserRejectedKey, InvalidEntry
29 from twisted.conch.ssh.keys import Key, BadKeyError
34 Encode a binary string as base64 with no trailing newline.
36 return b2a_base64(s).strip()
40 def _extractCommon(string):
42 Extract common elements of base64 keys from an entry in a hosts file.
44 @return: a 4-tuple of hostname data (L{str}), ssh key type (L{str}), key
45 (L{Key}), and comment (L{str} or L{None}). The hostname data is simply the
46 beginning of the line up to the first occurrence of whitespace.
48 elements = string.split(None, 2)
49 if len(elements) != 3:
51 hostnames, keyType, keyAndComment = elements
52 splitkey = keyAndComment.split(None, 1)
53 if len(splitkey) == 2:
54 keyString, comment = splitkey
55 comment = comment.rstrip("\n")
57 keyString = splitkey[0]
59 key = Key.fromString(keyString.decode('base64'))
60 return hostnames, keyType, key, comment
64 class _BaseEntry(object):
66 Abstract base of both hashed and non-hashed entry objects, since they
67 represent keys and key types the same way.
69 @ivar keyType: The type of the key; either ssh-dss or ssh-rsa.
72 @ivar publicKey: The server public key indicated by this line.
73 @type publicKey: L{twisted.conch.ssh.keys.Key}
75 @ivar comment: Trailing garbage after the key line.
79 def __init__(self, keyType, publicKey, comment):
80 self.keyType = keyType
81 self.publicKey = publicKey
82 self.comment = comment
85 def matchesKey(self, keyObject):
87 Check to see if this entry matches a given key object.
89 @type keyObject: L{Key}
93 return self.publicKey == keyObject
97 class PlainEntry(_BaseEntry):
99 A L{PlainEntry} is a representation of a plain-text entry in a known_hosts
102 @ivar _hostnames: the list of all host-names associated with this entry.
103 @type _hostnames: L{list} of L{str}
106 implements(IKnownHostEntry)
108 def __init__(self, hostnames, keyType, publicKey, comment):
109 self._hostnames = hostnames
110 super(PlainEntry, self).__init__(keyType, publicKey, comment)
113 def fromString(cls, string):
115 Parse a plain-text entry in a known_hosts file, and return a
116 corresponding L{PlainEntry}.
118 @param string: a space-separated string formatted like "hostname
119 key-type base64-key-data comment".
123 @raise DecodeError: if the key is not valid encoded as valid base64.
125 @raise InvalidEntry: if the entry does not have the right number of
126 elements and is therefore invalid.
128 @raise BadKeyError: if the key, once decoded from base64, is not
131 @return: an IKnownHostEntry representing the hostname and key in the
134 @rtype: L{PlainEntry}
136 hostnames, keyType, key, comment = _extractCommon(string)
137 self = cls(hostnames.split(","), keyType, key, comment)
140 fromString = classmethod(fromString)
143 def matchesHost(self, hostname):
145 Check to see if this entry matches a given hostname.
147 @type hostname: L{str}
151 return hostname in self._hostnames
156 Implement L{IKnownHostEntry.toString} by recording the comma-separated
157 hostnames, key type, and base-64 encoded key.
159 fields = [','.join(self._hostnames),
161 _b64encode(self.publicKey.blob())]
162 if self.comment is not None:
163 fields.append(self.comment)
164 return ' '.join(fields)
167 class UnparsedEntry(object):
169 L{UnparsedEntry} is an entry in a L{KnownHostsFile} which can't actually be
170 parsed; therefore it matches no keys and no hosts.
173 implements(IKnownHostEntry)
175 def __init__(self, string):
177 Create an unparsed entry from a line in a known_hosts file which cannot
180 self._string = string
183 def matchesHost(self, hostname):
185 Always returns False.
190 def matchesKey(self, key):
192 Always returns False.
199 Returns the input line, without its newline if one was given.
201 return self._string.rstrip("\n")
205 def _hmacedString(key, string):
207 Return the SHA-1 HMAC hash of the given key and string.
209 hash = hmac.HMAC(key, digestmod=sha1)
215 class HashedEntry(_BaseEntry):
217 A L{HashedEntry} is a representation of an entry in a known_hosts file
218 where the hostname has been hashed and salted.
220 @ivar _hostSalt: the salt to combine with a hostname for hashing.
222 @ivar _hostHash: the hashed representation of the hostname.
224 @cvar MAGIC: the 'hash magic' string used to identify a hashed line in a
225 known_hosts file as opposed to a plaintext one.
228 implements(IKnownHostEntry)
232 def __init__(self, hostSalt, hostHash, keyType, publicKey, comment):
233 self._hostSalt = hostSalt
234 self._hostHash = hostHash
235 super(HashedEntry, self).__init__(keyType, publicKey, comment)
238 def fromString(cls, string):
240 Load a hashed entry from a string representing a line in a known_hosts
243 @raise DecodeError: if the key, the hostname, or the is not valid
244 encoded as valid base64
246 @raise InvalidEntry: if the entry does not have the right number of
247 elements and is therefore invalid, or the host/hash portion contains
248 more items than just the host and hash.
250 @raise BadKeyError: if the key, once decoded from base64, is not
253 stuff, keyType, key, comment = _extractCommon(string)
254 saltAndHash = stuff[len(cls.MAGIC):].split("|")
255 if len(saltAndHash) != 2:
257 hostSalt, hostHash = saltAndHash
258 self = cls(hostSalt.decode("base64"), hostHash.decode("base64"),
259 keyType, key, comment)
262 fromString = classmethod(fromString)
265 def matchesHost(self, hostname):
267 Implement L{IKnownHostEntry.matchesHost} to compare the hash of the
268 input to the stored hash.
270 return (_hmacedString(self._hostSalt, hostname) == self._hostHash)
275 Implement L{IKnownHostEntry.toString} by base64-encoding the salt, host
278 fields = [self.MAGIC + '|'.join([_b64encode(self._hostSalt),
279 _b64encode(self._hostHash)]),
281 _b64encode(self.publicKey.blob())]
282 if self.comment is not None:
283 fields.append(self.comment)
284 return ' '.join(fields)
288 class KnownHostsFile(object):
290 A structured representation of an OpenSSH-format ~/.ssh/known_hosts file.
292 @ivar _entries: a list of L{IKnownHostEntry} providers.
294 @ivar _savePath: the L{FilePath} to save new entries to.
297 def __init__(self, savePath):
299 Create a new, empty KnownHostsFile.
301 You want to use L{KnownHostsFile.fromPath} to parse one of these.
304 self._savePath = savePath
307 def hasHostKey(self, hostname, key):
309 @return: True if the given hostname and key are present in this file,
310 False if they are not.
314 @raise HostKeyChanged: if the host key found for the given hostname
315 does not match the given key.
317 for lineidx, entry in enumerate(self._entries):
318 if entry.matchesHost(hostname):
319 if entry.matchesKey(key):
322 raise HostKeyChanged(entry, self._savePath, lineidx + 1)
326 def verifyHostKey(self, ui, hostname, ip, key):
328 Verify the given host key for the given IP and host, asking for
329 confirmation from, and notifying, the given UI about changes to this
332 @param ui: The user interface to request an IP address from.
334 @param hostname: The hostname that the user requested to connect to.
336 @param ip: The string representation of the IP address that is actually
339 @param key: The public key of the server.
341 @return: a L{Deferred} that fires with True when the key has been
342 verified, or fires with an errback when the key either cannot be
343 verified or has changed.
347 hhk = defer.maybeDeferred(self.hasHostKey, hostname, key)
348 def gotHasKey(result):
350 if not self.hasHostKey(ip, key):
351 ui.warn("Warning: Permanently added the %s host key for "
352 "IP address '%s' to the list of known hosts." %
354 self.addHostKey(ip, key)
358 def promptResponse(response):
360 self.addHostKey(hostname, key)
361 self.addHostKey(ip, key)
365 raise UserRejectedKey()
367 "The authenticity of host '%s (%s)' "
368 "can't be established.\n"
369 "RSA key fingerprint is %s.\n"
370 "Are you sure you want to continue connecting (yes/no)? " %
371 (hostname, ip, key.fingerprint())).addCallback(promptResponse)
372 return hhk.addCallback(gotHasKey)
375 def addHostKey(self, hostname, key):
377 Add a new L{HashedEntry} to the key database.
379 Note that you still need to call L{KnownHostsFile.save} if you wish
380 these changes to be persisted.
382 @return: the L{HashedEntry} that was added.
384 salt = secureRandom(20)
385 keyType = "ssh-" + key.type().lower()
386 entry = HashedEntry(salt, _hmacedString(salt, hostname),
388 self._entries.append(entry)
394 Save this L{KnownHostsFile} to the path it was loaded from.
396 p = self._savePath.parent()
399 self._savePath.setContent('\n'.join(
400 [entry.toString() for entry in self._entries]) + "\n")
403 def fromPath(cls, path):
405 @param path: A path object to use for both reading contents from and
408 @type path: L{FilePath}
417 if line.startswith(HashedEntry.MAGIC):
418 entry = HashedEntry.fromString(line)
420 entry = PlainEntry.fromString(line)
421 except (DecodeError, InvalidEntry, BadKeyError):
422 entry = UnparsedEntry(line)
423 self._entries.append(entry)
426 fromPath = classmethod(fromPath)
429 class ConsoleUI(object):
431 A UI object that can ask true/false questions and post notifications on the
432 console, to be used during key verification.
434 @ivar opener: a no-argument callable which should open a console file-like
435 object to be used for reading and writing.
438 def __init__(self, opener):
442 def prompt(self, text):
444 Write the given text as a prompt to the console output, then read a
445 result from the console input.
447 @return: a L{Deferred} which fires with L{True} when the user answers
448 'yes' and L{False} when the user answers 'no'. It may errback if there
451 d = defer.succeed(None)
456 answer = f.readline().strip().lower()
464 f.write("Please type 'yes' or 'no': ")
465 return d.addCallback(body)
468 def warn(self, text):
470 Notify the user (non-interactively) of the provided text, by writing it