1 # -*- test-case-name: twisted.conch.test.test_checkers -*-
2 # Copyright (c) Twisted Matrix Laboratories.
3 # See LICENSE for details.
6 Provide L{ICredentialsChecker} implementations to be used in Conch protocols.
9 import os, base64, binascii, errno
18 # Python 2.5 got spwd to interface with shadow passwords
30 from twisted.cred import pamauth
34 from zope.interface import implements, providedBy
36 from twisted.conch import error
37 from twisted.conch.ssh import keys
38 from twisted.cred.checkers import ICredentialsChecker
39 from twisted.cred.credentials import IUsernamePassword, ISSHPrivateKey
40 from twisted.cred.error import UnauthorizedLogin, UnhandledCredentials
41 from twisted.internet import defer
42 from twisted.python import failure, reflect, log
43 from twisted.python.util import runAsEffectiveUser
44 from twisted.python.filepath import FilePath
48 def verifyCryptedPassword(crypted, pw):
49 return crypt.crypt(pw, crypted) == crypted
53 def _pwdGetByName(username):
55 Look up a user in the /etc/passwd database using the pwd module. If the
56 pwd module is not available, return None.
58 @param username: the username of the user to return the passwd database
63 return pwd.getpwnam(username)
67 def _shadowGetByName(username):
69 Look up a user in the /etc/shadow database using the spwd or shadow
70 modules. If neither module is available, return None.
72 @param username: the username of the user to return the shadow database
77 elif shadow is not None:
81 return runAsEffectiveUser(0, 0, f, username)
85 class UNIXPasswordDatabase:
87 A checker which validates users out of the UNIX password databases, or
88 databases of a compatible format.
90 @ivar _getByNameFunctions: a C{list} of functions which are called in order
91 to valid a user. The default value is such that the /etc/passwd
92 database will be tried first, followed by the /etc/shadow database.
94 credentialInterfaces = IUsernamePassword,
95 implements(ICredentialsChecker)
98 def __init__(self, getByNameFunctions=None):
99 if getByNameFunctions is None:
100 getByNameFunctions = [_pwdGetByName, _shadowGetByName]
101 self._getByNameFunctions = getByNameFunctions
104 def requestAvatarId(self, credentials):
105 for func in self._getByNameFunctions:
107 pwnam = func(credentials.username)
109 return defer.fail(UnauthorizedLogin("invalid username"))
111 if pwnam is not None:
115 if verifyCryptedPassword(crypted, credentials.password):
116 return defer.succeed(credentials.username)
118 return defer.fail(UnauthorizedLogin("unable to verify password"))
122 class SSHPublicKeyDatabase:
124 Checker that authenticates SSH public keys, based on public keys listed in
125 authorized_keys and authorized_keys2 files in user .ssh/ directories.
127 implements(ICredentialsChecker)
129 credentialInterfaces = (ISSHPrivateKey,)
133 def requestAvatarId(self, credentials):
134 d = defer.maybeDeferred(self.checkKey, credentials)
135 d.addCallback(self._cbRequestAvatarId, credentials)
136 d.addErrback(self._ebRequestAvatarId)
139 def _cbRequestAvatarId(self, validKey, credentials):
141 Check whether the credentials themselves are valid, now that we know
142 if the key matches the user.
144 @param validKey: A boolean indicating whether or not the public key
145 matches a key in the user's authorized_keys file.
147 @param credentials: The credentials offered by the user.
148 @type credentials: L{ISSHPrivateKey} provider
150 @raise UnauthorizedLogin: (as a failure) if the key does not match the
151 user in C{credentials}. Also raised if the user provides an invalid
154 @raise ValidPublicKey: (as a failure) if the key matches the user but
155 the credentials do not include a signature. See
156 L{error.ValidPublicKey} for more information.
158 @return: The user's username, if authentication was successful.
161 return failure.Failure(UnauthorizedLogin("invalid key"))
162 if not credentials.signature:
163 return failure.Failure(error.ValidPublicKey())
166 pubKey = keys.Key.fromString(credentials.blob)
167 if pubKey.verify(credentials.signature, credentials.sigData):
168 return credentials.username
169 except: # any error should be treated as a failed login
171 return failure.Failure(UnauthorizedLogin('error while verifying key'))
172 return failure.Failure(UnauthorizedLogin("unable to verify key"))
175 def getAuthorizedKeysFiles(self, credentials):
177 Return a list of L{FilePath} instances for I{authorized_keys} files
178 which might contain information about authorized keys for the given
181 On OpenSSH servers, the default location of the file containing the
182 list of authorized public keys is
183 U{$HOME/.ssh/authorized_keys<http://www.openbsd.org/cgi-bin/man.cgi?query=sshd_config>}.
185 I{$HOME/.ssh/authorized_keys2} is also returned, though it has been
186 U{deprecated by OpenSSH since
187 2001<http://marc.info/?m=100508718416162>}.
189 @return: A list of L{FilePath} instances to files with the authorized keys.
191 pwent = self._userdb.getpwnam(credentials.username)
192 root = FilePath(pwent.pw_dir).child('.ssh')
193 files = ['authorized_keys', 'authorized_keys2']
194 return [root.child(f) for f in files]
197 def checkKey(self, credentials):
199 Retrieve files containing authorized keys and check against user
202 uid, gid = os.geteuid(), os.getegid()
203 ouid, ogid = self._userdb.getpwnam(credentials.username)[2:4]
204 for filepath in self.getAuthorizedKeysFiles(credentials):
205 if not filepath.exists():
208 lines = filepath.open()
210 if e.errno == errno.EACCES:
211 lines = runAsEffectiveUser(ouid, ogid, filepath.open)
219 if base64.decodestring(l2[1]) == credentials.blob:
221 except binascii.Error:
225 def _ebRequestAvatarId(self, f):
226 if not f.check(UnauthorizedLogin):
228 return failure.Failure(UnauthorizedLogin("unable to get avatar id"))
232 class SSHProtocolChecker:
234 SSHProtocolChecker is a checker that requires multiple authentications
235 to succeed. To add a checker, call my registerChecker method with
236 the checker and the interface.
238 After each successful authenticate, I call my areDone method with the
239 avatar id. To get a list of the successful credentials for an avatar id,
240 use C{SSHProcotolChecker.successfulCredentials[avatarId]}. If L{areDone}
241 returns True, the authentication has succeeded.
244 implements(ICredentialsChecker)
248 self.successfulCredentials = {}
250 def get_credentialInterfaces(self):
251 return self.checkers.keys()
253 credentialInterfaces = property(get_credentialInterfaces)
255 def registerChecker(self, checker, *credentialInterfaces):
256 if not credentialInterfaces:
257 credentialInterfaces = checker.credentialInterfaces
258 for credentialInterface in credentialInterfaces:
259 self.checkers[credentialInterface] = checker
261 def requestAvatarId(self, credentials):
263 Part of the L{ICredentialsChecker} interface. Called by a portal with
264 some credentials to check if they'll authenticate a user. We check the
265 interfaces that the credentials provide against our list of acceptable
266 checkers. If one of them matches, we ask that checker to verify the
267 credentials. If they're valid, we call our L{_cbGoodAuthentication}
270 @param credentials: the credentials the L{Portal} wants us to verify
272 ifac = providedBy(credentials)
274 c = self.checkers.get(i)
276 d = defer.maybeDeferred(c.requestAvatarId, credentials)
277 return d.addCallback(self._cbGoodAuthentication,
279 return defer.fail(UnhandledCredentials("No checker for %s" % \
280 ', '.join(map(reflect.qual, ifac))))
282 def _cbGoodAuthentication(self, avatarId, credentials):
284 Called if a checker has verified the credentials. We call our
285 L{areDone} method to see if the whole of the successful authentications
286 are enough. If they are, we return the avatar ID returned by the first
289 if avatarId not in self.successfulCredentials:
290 self.successfulCredentials[avatarId] = []
291 self.successfulCredentials[avatarId].append(credentials)
292 if self.areDone(avatarId):
293 del self.successfulCredentials[avatarId]
296 raise error.NotEnoughAuthentication()
298 def areDone(self, avatarId):
300 Override to determine if the authentication is finished for a given
303 @param avatarId: the avatar returned by the first checker. For
304 this checker to function correctly, all the checkers must
305 return the same avatar ID.