1 # -*- test-case-name: twisted.test.test_newcred-*-
3 # Copyright (c) Twisted Matrix Laboratories.
4 # See LICENSE for details.
7 from zope.interface import implements, Interface
9 import hmac, time, random
10 from twisted.python.hashlib import md5
11 from twisted.python.randbytes import secureRandom
12 from twisted.cred._digest import calcResponse, calcHA1, calcHA2
13 from twisted.cred import error
15 class ICredentials(Interface):
19 Implementors _must_ specify which sub-interfaces of ICredentials
20 to which it conforms, using zope.interface.implements().
25 class IUsernameDigestHash(ICredentials):
27 This credential is used when a CredentialChecker has access to the hash
28 of the username:realm:password as in an Apache .htdigest file.
30 def checkHash(digestHash):
32 @param digestHash: The hashed username:realm:password to check against.
34 @return: C{True} if the credentials represented by this object match
35 the given hash, C{False} if they do not, or a L{Deferred} which
36 will be called back with one of these values.
41 class IUsernameHashedPassword(ICredentials):
43 I encapsulate a username and a hashed password.
45 This credential is used when a hashed password is received from the
46 party requesting authentication. CredentialCheckers which check this
47 kind of credential must store the passwords in plaintext (or as
48 password-equivalent hashes) form so that they can be hashed in a manner
49 appropriate for the particular credentials class.
51 @type username: C{str}
52 @ivar username: The username associated with these credentials.
55 def checkPassword(password):
57 Validate these credentials against the correct password.
59 @type password: C{str}
60 @param password: The correct, plaintext password against which to
63 @rtype: C{bool} or L{Deferred}
64 @return: C{True} if the credentials represented by this object match the
65 given password, C{False} if they do not, or a L{Deferred} which will
66 be called back with one of these values.
71 class IUsernamePassword(ICredentials):
73 I encapsulate a username and a plaintext password.
75 This encapsulates the case where the password received over the network
76 has been hashed with the identity function (That is, not at all). The
77 CredentialsChecker may store the password in whatever format it desires,
78 it need only transform the stored password in a similar way before
79 performing the comparison.
81 @type username: C{str}
82 @ivar username: The username associated with these credentials.
84 @type password: C{str}
85 @ivar password: The password associated with these credentials.
88 def checkPassword(password):
90 Validate these credentials against the correct password.
92 @type password: C{str}
93 @param password: The correct, plaintext password against which to
96 @rtype: C{bool} or L{Deferred}
97 @return: C{True} if the credentials represented by this object match the
98 given password, C{False} if they do not, or a L{Deferred} which will
99 be called back with one of these values.
104 class IAnonymous(ICredentials):
106 I am an explicitly anonymous request for access.
111 class DigestedCredentials(object):
113 Yet Another Simple HTTP Digest authentication scheme.
115 implements(IUsernameHashedPassword, IUsernameDigestHash)
117 def __init__(self, username, method, realm, fields):
118 self.username = username
124 def checkPassword(self, password):
126 Verify that the credentials represented by this object agree with the
127 given plaintext C{password} by hashing C{password} in the same way the
128 response hash represented by this object was generated and comparing
131 response = self.fields.get('response')
132 uri = self.fields.get('uri')
133 nonce = self.fields.get('nonce')
134 cnonce = self.fields.get('cnonce')
135 nc = self.fields.get('nc')
136 algo = self.fields.get('algorithm', 'md5').lower()
137 qop = self.fields.get('qop', 'auth')
139 expected = calcResponse(
140 calcHA1(algo, self.username, self.realm, password, nonce, cnonce),
141 calcHA2(algo, self.method, uri, qop, None),
142 algo, nonce, nc, cnonce, qop)
144 return expected == response
147 def checkHash(self, digestHash):
149 Verify that the credentials represented by this object agree with the
150 credentials represented by the I{H(A1)} given in C{digestHash}.
152 @param digestHash: A precomputed H(A1) value based on the username,
153 realm, and password associate with this credentials object.
155 response = self.fields.get('response')
156 uri = self.fields.get('uri')
157 nonce = self.fields.get('nonce')
158 cnonce = self.fields.get('cnonce')
159 nc = self.fields.get('nc')
160 algo = self.fields.get('algorithm', 'md5').lower()
161 qop = self.fields.get('qop', 'auth')
163 expected = calcResponse(
164 calcHA1(algo, None, None, None, nonce, cnonce, preHA1=digestHash),
165 calcHA2(algo, self.method, uri, qop, None),
166 algo, nonce, nc, cnonce, qop)
168 return expected == response
172 class DigestCredentialFactory(object):
174 Support for RFC2617 HTTP Digest Authentication
176 @cvar CHALLENGE_LIFETIME_SECS: The number of seconds for which an
177 opaque should be valid.
179 @type privateKey: C{str}
180 @ivar privateKey: A random string used for generating the secure opaque.
182 @type algorithm: C{str}
183 @param algorithm: Case insensitive string specifying the hash algorithm to
184 use. Must be either C{'md5'} or C{'sha'}. C{'md5-sess'} is B{not}
187 @type authenticationRealm: C{str}
188 @param authenticationRealm: case sensitive string that specifies the realm
189 portion of the challenge
192 CHALLENGE_LIFETIME_SECS = 15 * 60 # 15 minutes
196 def __init__(self, algorithm, authenticationRealm):
197 self.algorithm = algorithm
198 self.authenticationRealm = authenticationRealm
199 self.privateKey = secureRandom(12)
202 def getChallenge(self, address):
204 Generate the challenge for use in the WWW-Authenticate header.
206 @param address: The client address to which this challenge is being
209 @return: The C{dict} that can be used to generate a WWW-Authenticate
212 c = self._generateNonce()
213 o = self._generateOpaque(c, address)
218 'algorithm': self.algorithm,
219 'realm': self.authenticationRealm}
222 def _generateNonce(self):
224 Create a random value suitable for use as the nonce parameter of a
225 WWW-Authenticate challenge.
229 return secureRandom(12).encode('hex')
234 Parameterize the time based seed used in C{_generateOpaque}
235 so we can deterministically unittest it's behavior.
240 def _generateOpaque(self, nonce, clientip):
242 Generate an opaque to be returned to the client. This is a unique
243 string that can be returned to us and verified.
245 # Now, what we do is encode the nonce, client ip and a timestamp in the
246 # opaque value with a suitable digest.
247 now = str(int(self._getTime()))
250 key = "%s,%s,%s" % (nonce, clientip, now)
251 digest = md5(key + self.privateKey).hexdigest()
252 ekey = key.encode('base64')
253 return "%s-%s" % (digest, ekey.replace('\n', ''))
256 def _verifyOpaque(self, opaque, nonce, clientip):
258 Given the opaque and nonce from the request, as well as the client IP
259 that made the request, verify that the opaque was generated by us.
260 And that it's not too old.
262 @param opaque: The opaque value from the Digest response
263 @param nonce: The nonce value from the Digest response
264 @param clientip: The remote IP address of the client making the request
265 or C{None} if the request was submitted over a channel where this
268 @return: C{True} if the opaque was successfully verified.
270 @raise error.LoginFailed: if C{opaque} could not be parsed or
271 contained the wrong values.
273 # First split the digest from the key
274 opaqueParts = opaque.split('-')
275 if len(opaqueParts) != 2:
276 raise error.LoginFailed('Invalid response, invalid opaque value')
282 key = opaqueParts[1].decode('base64')
283 keyParts = key.split(',')
285 if len(keyParts) != 3:
286 raise error.LoginFailed('Invalid response, invalid opaque value')
288 if keyParts[0] != nonce:
289 raise error.LoginFailed(
290 'Invalid response, incompatible opaque/nonce values')
292 if keyParts[1] != clientip:
293 raise error.LoginFailed(
294 'Invalid response, incompatible opaque/client values')
297 when = int(keyParts[2])
299 raise error.LoginFailed(
300 'Invalid response, invalid opaque/time values')
302 if (int(self._getTime()) - when >
303 DigestCredentialFactory.CHALLENGE_LIFETIME_SECS):
305 raise error.LoginFailed(
306 'Invalid response, incompatible opaque/nonce too old')
309 digest = md5(key + self.privateKey).hexdigest()
310 if digest != opaqueParts[0]:
311 raise error.LoginFailed('Invalid response, invalid opaque value')
316 def decode(self, response, method, host):
318 Decode the given response and attempt to generate a
319 L{DigestedCredentials} from it.
321 @type response: C{str}
322 @param response: A string of comma seperated key=value pairs
325 @param method: The action requested to which this response is addressed
326 (GET, POST, INVITE, OPTIONS, etc).
329 @param host: The address the request was sent from.
331 @raise error.LoginFailed: If the response does not contain a username,
332 a nonce, an opaque, or if the opaque is invalid.
334 @return: L{DigestedCredentials}
337 if s[0] == s[-1] == '"':
340 response = ' '.join(response.splitlines())
341 parts = response.split(',')
345 for (k, v) in [p.split('=', 1) for p in parts]:
346 auth[k.strip()] = unq(v.strip())
348 username = auth.get('username')
350 raise error.LoginFailed('Invalid response, no username given.')
352 if 'opaque' not in auth:
353 raise error.LoginFailed('Invalid response, no opaque given.')
355 if 'nonce' not in auth:
356 raise error.LoginFailed('Invalid response, no nonce given.')
358 # Now verify the nonce/opaque values for this client
359 if self._verifyOpaque(auth.get('opaque'), auth.get('nonce'), host):
360 return DigestedCredentials(username,
362 self.authenticationRealm,
367 class CramMD5Credentials:
368 implements(IUsernameHashedPassword)
373 def __init__(self, host=None):
376 def getChallenge(self):
378 return self.challenge
379 # The data encoded in the first ready response contains an
380 # presumptively arbitrary string of random digits, a timestamp, and
381 # the fully-qualified primary host name of the server. The syntax of
382 # the unencoded form must correspond to that of an RFC 822 'msg-id'
383 # [RFC822] as described in [POP3].
385 r = random.randrange(0x7fffffff)
387 self.challenge = '<%d.%d@%s>' % (r, t, self.host)
388 return self.challenge
390 def setResponse(self, response):
391 self.username, self.response = response.split(None, 1)
393 def moreChallenges(self):
396 def checkPassword(self, password):
397 verify = hmac.HMAC(password, self.challenge).hexdigest()
398 return verify == self.response
401 class UsernameHashedPassword:
402 implements(IUsernameHashedPassword)
404 def __init__(self, username, hashed):
405 self.username = username
408 def checkPassword(self, password):
409 return self.hashed == password
412 class UsernamePassword:
413 implements(IUsernamePassword)
415 def __init__(self, username, password):
416 self.username = username
417 self.password = password
419 def checkPassword(self, password):
420 return self.password == password
424 implements(IAnonymous)
428 class ISSHPrivateKey(ICredentials):
430 L{ISSHPrivateKey} credentials encapsulate an SSH public key to be checked
431 against a user's private key.
433 @ivar username: The username associated with these credentials.
434 @type username: C{str}
436 @ivar algName: The algorithm name for the blob.
437 @type algName: C{str}
439 @ivar blob: The public key blob as sent by the client.
442 @ivar sigData: The data the signature was made from.
443 @type sigData: C{str}
445 @ivar signature: The signed data. This is checked to verify that the user
446 owns the private key.
447 @type signature: C{str} or C{NoneType}
453 implements(ISSHPrivateKey)
454 def __init__(self, username, algName, blob, sigData, signature):
455 self.username = username
456 self.algName = algName
458 self.sigData = sigData
459 self.signature = signature
462 class IPluggableAuthenticationModules(ICredentials):
463 """I encapsulate the authentication of a user via PAM (Pluggable
464 Authentication Modules. I use PyPAM (available from
465 http://www.tummy.com/Software/PyPam/index.html).
467 @ivar username: The username for the user being logged in.
469 @ivar pamConversion: A function that is called with a list of tuples
470 (message, messageType). See the PAM documentation
471 for the meaning of messageType. The function
472 returns a Deferred which will fire with a list
473 of (response, 0), one for each message. The 0 is
474 currently unused, but is required by the PAM library.
477 class PluggableAuthenticationModules:
478 implements(IPluggableAuthenticationModules)
480 def __init__(self, username, pamConversion):
481 self.username = username
482 self.pamConversion = pamConversion