Initial import to Tizen
[profile/ivi/python-twisted.git] / twisted / words / protocols / jabber / sasl_mechanisms.py
1 # -*- test-case-name: twisted.words.test.test_jabbersaslmechanisms -*-
2 #
3 # Copyright (c) Twisted Matrix Laboratories.
4 # See LICENSE for details.
5
6 """
7 Protocol agnostic implementations of SASL authentication mechanisms.
8 """
9
10 import binascii, random, time, os
11
12 from zope.interface import Interface, Attribute, implements
13
14 from twisted.python.hashlib import md5
15
16 class ISASLMechanism(Interface):
17     name = Attribute("""Common name for the SASL Mechanism.""")
18
19     def getInitialResponse():
20         """
21         Get the initial client response, if defined for this mechanism.
22
23         @return: initial client response string.
24         @rtype: C{str}.
25         """
26
27
28     def getResponse(challenge):
29         """
30         Get the response to a server challenge.
31
32         @param challenge: server challenge.
33         @type challenge: C{str}.
34         @return: client response.
35         @rtype: C{str}.
36         """
37
38
39
40 class Anonymous(object):
41     """
42     Implements the ANONYMOUS SASL authentication mechanism.
43
44     This mechanism is defined in RFC 2245.
45     """
46     implements(ISASLMechanism)
47     name = 'ANONYMOUS'
48
49     def getInitialResponse(self):
50         return None
51
52
53
54 class Plain(object):
55     """
56     Implements the PLAIN SASL authentication mechanism.
57
58     The PLAIN SASL authentication mechanism is defined in RFC 2595.
59     """
60     implements(ISASLMechanism)
61
62     name = 'PLAIN'
63
64     def __init__(self, authzid, authcid, password):
65         self.authzid = authzid or ''
66         self.authcid = authcid or ''
67         self.password = password or ''
68
69
70     def getInitialResponse(self):
71         return "%s\x00%s\x00%s" % (self.authzid.encode('utf-8'),
72                                    self.authcid.encode('utf-8'),
73                                    self.password.encode('utf-8'))
74
75
76
77 class DigestMD5(object):
78     """
79     Implements the DIGEST-MD5 SASL authentication mechanism.
80
81     The DIGEST-MD5 SASL authentication mechanism is defined in RFC 2831.
82     """
83     implements(ISASLMechanism)
84
85     name = 'DIGEST-MD5'
86
87     def __init__(self, serv_type, host, serv_name, username, password):
88         self.username = username
89         self.password = password
90         self.defaultRealm = host
91
92         self.digest_uri = '%s/%s' % (serv_type, host)
93         if serv_name is not None:
94             self.digest_uri += '/%s' % serv_name
95
96
97     def getInitialResponse(self):
98         return None
99
100
101     def getResponse(self, challenge):
102         directives = self._parse(challenge)
103
104         # Compat for implementations that do not send this along with
105         # a succesful authentication.
106         if 'rspauth' in directives:
107             return ''
108
109         try:
110             realm = directives['realm']
111         except KeyError:
112             realm = self.defaultRealm
113
114         return self._gen_response(directives['charset'],
115                                   realm,
116                                   directives['nonce'])
117
118     def _parse(self, challenge):
119         """
120         Parses the server challenge.
121
122         Splits the challenge into a dictionary of directives with values.
123
124         @return: challenge directives and their values.
125         @rtype: C{dict} of C{str} to C{str}.
126         """
127         s = challenge
128         paramDict = {}
129         cur = 0
130         remainingParams = True
131         while remainingParams:
132             # Parse a param. We can't just split on commas, because there can
133             # be some commas inside (quoted) param values, e.g.:
134             # qop="auth,auth-int"
135
136             middle = s.index("=", cur)
137             name = s[cur:middle].lstrip()
138             middle += 1
139             if s[middle] == '"':
140                 middle += 1
141                 end = s.index('"', middle)
142                 value = s[middle:end]
143                 cur = s.find(',', end) + 1
144                 if cur == 0:
145                     remainingParams = False
146             else:
147                 end = s.find(',', middle)
148                 if end == -1:
149                     value = s[middle:].rstrip()
150                     remainingParams = False
151                 else:
152                     value = s[middle:end].rstrip()
153                 cur = end + 1
154             paramDict[name] = value
155
156         for param in ('qop', 'cipher'):
157             if param in paramDict:
158                 paramDict[param] = paramDict[param].split(',')
159
160         return paramDict
161
162     def _unparse(self, directives):
163         """
164         Create message string from directives.
165
166         @param directives: dictionary of directives (names to their values).
167                            For certain directives, extra quotes are added, as
168                            needed.
169         @type directives: C{dict} of C{str} to C{str}
170         @return: message string.
171         @rtype: C{str}.
172         """
173
174         directive_list = []
175         for name, value in directives.iteritems():
176             if name in ('username', 'realm', 'cnonce',
177                         'nonce', 'digest-uri', 'authzid', 'cipher'):
178                 directive = '%s="%s"' % (name, value)
179             else:
180                 directive = '%s=%s' % (name, value)
181
182             directive_list.append(directive)
183
184         return ','.join(directive_list)
185
186
187     def _gen_response(self, charset, realm, nonce):
188         """
189         Generate response-value.
190
191         Creates a response to a challenge according to section 2.1.2.1 of
192         RFC 2831 using the C{charset}, C{realm} and C{nonce} directives
193         from the challenge.
194         """
195
196         def H(s):
197             return md5(s).digest()
198
199         def HEX(n):
200             return binascii.b2a_hex(n)
201
202         def KD(k, s):
203             return H('%s:%s' % (k, s))
204
205         try:
206             username = self.username.encode(charset)
207             password = self.password.encode(charset)
208         except UnicodeError:
209             # TODO - add error checking
210             raise
211
212         nc = '%08x' % 1 # TODO: support subsequent auth.
213         cnonce = self._gen_nonce()
214         qop = 'auth'
215
216         # TODO - add support for authzid
217         a1 = "%s:%s:%s" % (H("%s:%s:%s" % (username, realm, password)),
218                            nonce,
219                            cnonce)
220         a2 = "AUTHENTICATE:%s" % self.digest_uri
221
222         response = HEX( KD ( HEX(H(a1)),
223                              "%s:%s:%s:%s:%s" % (nonce, nc,
224                                                  cnonce, "auth", HEX(H(a2)))))
225
226         directives = {'username': username,
227                       'realm' : realm,
228                       'nonce' : nonce,
229                       'cnonce' : cnonce,
230                       'nc' : nc,
231                       'qop' : qop,
232                       'digest-uri': self.digest_uri,
233                       'response': response,
234                       'charset': charset}
235
236         return self._unparse(directives)
237
238
239     def _gen_nonce(self):
240         return md5("%s:%s:%s" % (str(random.random()) , str(time.gmtime()),str(os.getpid()))).hexdigest()