1 # -*- test-case-name: twisted.words.test.test_jabbersaslmechanisms -*-
3 # Copyright (c) Twisted Matrix Laboratories.
4 # See LICENSE for details.
7 Protocol agnostic implementations of SASL authentication mechanisms.
10 import binascii, random, time, os
12 from zope.interface import Interface, Attribute, implements
14 from twisted.python.hashlib import md5
class ISASLMechanism(Interface):
    """
    Contract shared by the client-side SASL mechanisms in this module.
    """

    name = Attribute("""Common name for the SASL Mechanism.""")

    def getInitialResponse():
        """
        Produce the response the client sends before it has received any
        server challenge, for mechanisms that define one.

        @return: initial client response string.
        """

    def getResponse(challenge):
        """
        Produce the client's answer to a challenge issued by the server.

        @param challenge: server challenge.
        @type challenge: C{str}.
        @return: client response.
        """
40 class Anonymous(object):
42 Implements the ANONYMOUS SASL authentication mechanism.
44 This mechanism is defined in RFC 2245.
46 implements(ISASLMechanism)
def getInitialResponse(self):
    """
    Get the initial client response for the ANONYMOUS mechanism.

    @return: C{None}, meaning that no initial response is sent.
    """
    # NOTE(review): the body of this method is not present in the
    # reviewed view of the file; "no initial response" is assumed here.
    # Confirm against the upstream implementation.
    return None
56 Implements the PLAIN SASL authentication mechanism.
58 The PLAIN SASL authentication mechanism is defined in RFC 2595.
60 implements(ISASLMechanism)
def __init__(self, authzid, authcid, password):
    """
    Store the three fields of the PLAIN message.

    Any argument that is false (for example C{None}) is stored as the
    empty string.

    @param authzid: first field of the message (the authorization
        identity in SASL terminology).
    @param authcid: second field of the message (the authentication
        identity in SASL terminology).
    @param password: third field of the message.
    """
    self.authzid, self.authcid, self.password = (
        authzid or '',
        authcid or '',
        password or '',
    )
def getInitialResponse(self):
    """
    Build the initial response of the PLAIN mechanism.

    The message consists of C{authzid}, C{authcid} and C{password}, each
    encoded as UTF-8 and separated by a single NUL byte.

    @return: the encoded message as a byte string.
    """
    # Join the encoded fields as bytes. Interpolating them into a text
    # template with %s yields the same result on Python 2 but embeds the
    # repr() of each bytes object on Python 3.
    fields = (self.authzid, self.authcid, self.password)
    return b"\x00".join([field.encode('utf-8') for field in fields])
77 class DigestMD5(object):
79 Implements the DIGEST-MD5 SASL authentication mechanism.
81 The DIGEST-MD5 SASL authentication mechanism is defined in RFC 2831.
83 implements(ISASLMechanism)
def __init__(self, serv_type, host, serv_name, username, password):
    """
    Store the credentials and precompute the digest URI.

    @param serv_type: first component of the digest URI.
    @param host: second component of the digest URI; also kept as
        C{defaultRealm}.
    @param serv_name: optional third component of the digest URI; left
        out of the URI when C{None}.
    @param username: stored as C{self.username}.
    @param password: stored as C{self.password}.
    """
    self.defaultRealm = host
    self.username, self.password = username, password

    # "<serv_type>/<host>", optionally followed by "/<serv_name>".
    uri = '%s/%s' % (serv_type, host)
    if serv_name is not None:
        uri = uri + '/%s' % serv_name
    self.digest_uri = uri
def getInitialResponse(self):
    """
    Get the initial client response for the DIGEST-MD5 mechanism.

    @return: C{None}, meaning that no initial response is sent.
    """
    # NOTE(review): the body of this method is not present in the
    # reviewed view of the file; "no initial response" is assumed here
    # (in RFC 2831 the exchange starts with the server's challenge).
    # Confirm against the upstream implementation.
    return None
def getResponse(self, challenge):
    """
    Get the response to a server challenge.

    @param challenge: server challenge.
    @type challenge: C{str}.
    @return: client response; the empty string when the challenge carries
        an C{rspauth} directive.
    @raise KeyError: if the challenge has no C{charset} or C{nonce}
        directive.
    """
    directives = self._parse(challenge)

    # Compat for implementations that do not send this along with
    # a successful authentication.
    if 'rspauth' in directives:
        # NOTE(review): the return statement of this branch is not present
        # in the reviewed view of the file; an empty response is assumed.
        return ''

    # Fall back to the realm chosen at construction time when the server
    # does not name one.
    realm = directives.get('realm', self.defaultRealm)

    return self._gen_response(directives['charset'],
                              realm,
                              directives['nonce'])
def _parse(self, challenge):
    """
    Parses the server challenge.

    Splits the challenge into a dictionary of directives with values.
    Surrounding double quotes are removed from quoted values. The values
    of the C{qop} and C{cipher} directives are further split on commas
    into lists. Empty list elements (repeated or trailing commas) are
    skipped, and an empty challenge yields an empty dictionary.

    @param challenge: server challenge.
    @type challenge: C{str}.
    @return: challenge directives and their values.
    @rtype: C{dict} of C{str} to C{str}.
    @raise ValueError: if remaining text contains no C{=} or a quoted
        value is not terminated.
    """
    # Characters that may separate directives: whitespace and commas.
    separators = ' \t\n\r\x0b\x0c,'
    length = len(challenge)
    paramDict = {}
    cur = 0
    while True:
        # Skip whitespace and empty list elements before the next name, so
        # that "a=1,,b=2" and a trailing comma do not break parsing.
        while cur < length and challenge[cur] in separators:
            cur += 1
        if cur >= length:
            break

        # Parse a param. We can't just split on commas, because there can
        # be some commas inside (quoted) param values, e.g.:
        # qop="auth,auth-int"
        middle = challenge.index('=', cur)
        name = challenge[cur:middle]
        middle += 1
        # Slicing (rather than indexing) tolerates "=" as the last char.
        if challenge[middle:middle + 1] == '"':
            middle += 1
            closing = challenge.index('"', middle)
            value = challenge[middle:closing]
            end = challenge.find(',', closing)
        else:
            end = challenge.find(',', middle)
            if end == -1:
                value = challenge[middle:].rstrip()
            else:
                value = challenge[middle:end].rstrip()
        paramDict[name] = value

        if end == -1:
            # No further comma: this was the last directive.
            break
        cur = end + 1

    for param in ('qop', 'cipher'):
        if param in paramDict:
            paramDict[param] = paramDict[param].split(',')

    return paramDict
def _unparse(self, directives):
    """
    Create message string from directives.

    @param directives: dictionary of directives (names to their values).
        The values of the C{username}, C{realm}, C{cnonce}, C{nonce},
        C{digest-uri}, C{authzid} and C{cipher} directives are wrapped in
        double quotes, with any backslash or double quote inside them
        escaped by a backslash; all other values are emitted as-is.
    @type directives: C{dict} of C{str} to C{str}
    @return: message string: the C{name=value} pairs joined by commas, in
        the dictionary's iteration order.
    """
    quotedNames = ('username', 'realm', 'cnonce',
                   'nonce', 'digest-uri', 'authzid', 'cipher')

    directive_list = []
    # items() rather than iteritems(): available on Python 2 and 3 alike.
    for name, value in directives.items():
        if name in quotedNames:
            # Escape the two characters that would otherwise terminate or
            # corrupt the quoted string.
            text = ('%s' % (value,)).replace('\\', '\\\\').replace('"', '\\"')
            directive = '%s="%s"' % (name, text)
        else:
            directive = '%s=%s' % (name, value)

        directive_list.append(directive)

    return ','.join(directive_list)
def _gen_response(self, charset, realm, nonce):
    """
    Generate response-value.

    Creates a response to a challenge according to section 2.1.2.1 of
    RFC 2831 using the C{charset}, C{realm} and C{nonce} directives
    from the challenge.

    All inputs to the digest are converted to byte strings first, so text
    and already-encoded values can be mixed freely.

    @param charset: name of the encoding applied to text values before
        hashing.
    @param realm: realm to authenticate in.
    @param nonce: server nonce taken from the challenge.
    @return: the result of passing the response directives to
        C{self._unparse}.
    """
    def toBytes(value):
        # Byte strings pass through untouched; text is encoded with the
        # negotiated charset.
        if isinstance(value, bytes):
            return value
        return value.encode(charset)

    def H(s):
        return md5(s).digest()

    def HEX(n):
        return binascii.b2a_hex(n)

    def KD(k, s):
        return H(k + b':' + s)

    # TODO - add error checking
    username = toBytes(self.username)
    password = toBytes(self.password)

    nc = '%08x' % 1 # TODO: support subsequent auth.
    cnonce = self._gen_nonce()
    qop = 'auth'

    nonceBytes = toBytes(nonce)
    cnonceBytes = toBytes(cnonce)

    # TODO - add support for authzid
    a1 = b':'.join((H(b':'.join((username, toBytes(realm), password))),
                    nonceBytes,
                    cnonceBytes))
    a2 = b'AUTHENTICATE:' + toBytes(self.digest_uri)

    response = HEX(KD(HEX(H(a1)),
                      b':'.join((nonceBytes, toBytes(nc), cnonceBytes,
                                 toBytes(qop), HEX(H(a2))))))

    # NOTE(review): only the username, digest-uri and response entries are
    # present in the reviewed view of the file; the remaining entries
    # follow the digest-response fields of RFC 2831 section 2.1.2.
    # Confirm against the upstream implementation.
    directives = {'username': username,
                  'realm': realm,
                  'nonce': nonce,
                  'cnonce': cnonce,
                  'nc': nc,
                  'qop': qop,
                  'digest-uri': self.digest_uri,
                  'response': response,
                  'charset': charset}

    return self._unparse(directives)
def _gen_nonce(self):
    """
    Generate a fresh client nonce.

    @return: 32 lowercase hexadecimal characters (16 random bytes).
    @rtype: C{str}
    """
    # os.urandom draws from the operating system's randomness source; a
    # value derived from random.random(), the clock and the process id is
    # guessable and therefore unsuitable as a nonce.
    return ''.join(['%02x' % octet for octet in bytearray(os.urandom(16))])