Initial import to Tizen
[profile/ivi/python-twisted.git] / twisted / conch / checkers.py
1 # -*- test-case-name: twisted.conch.test.test_checkers -*-
2 # Copyright (c) Twisted Matrix Laboratories.
3 # See LICENSE for details.
4
5 """
6 Provide L{ICredentialsChecker} implementations to be used in Conch protocols.
7 """
8
9 import os, base64, binascii, errno
10 try:
11     import pwd
12 except ImportError:
13     pwd = None
14 else:
15     import crypt
16
17 try:
18     # Python 2.5 got spwd to interface with shadow passwords
19     import spwd
20 except ImportError:
21     spwd = None
22     try:
23         import shadow
24     except ImportError:
25         shadow = None
26 else:
27     shadow = None
28
29 try:
30     from twisted.cred import pamauth
31 except ImportError:
32     pamauth = None
33
34 from zope.interface import implements, providedBy
35
36 from twisted.conch import error
37 from twisted.conch.ssh import keys
38 from twisted.cred.checkers import ICredentialsChecker
39 from twisted.cred.credentials import IUsernamePassword, ISSHPrivateKey
40 from twisted.cred.error import UnauthorizedLogin, UnhandledCredentials
41 from twisted.internet import defer
42 from twisted.python import failure, reflect, log
43 from twisted.python.util import runAsEffectiveUser
44 from twisted.python.filepath import FilePath
45
46
47
48 def verifyCryptedPassword(crypted, pw):
49     return crypt.crypt(pw, crypted) == crypted
50
51
52
53 def _pwdGetByName(username):
54     """
55     Look up a user in the /etc/passwd database using the pwd module.  If the
56     pwd module is not available, return None.
57
58     @param username: the username of the user to return the passwd database
59         information for.
60     """
61     if pwd is None:
62         return None
63     return pwd.getpwnam(username)
64
65
66
67 def _shadowGetByName(username):
68     """
69     Look up a user in the /etc/shadow database using the spwd or shadow
70     modules.  If neither module is available, return None.
71
72     @param username: the username of the user to return the shadow database
73         information for.
74     """
75     if spwd is not None:
76         f = spwd.getspnam
77     elif shadow is not None:
78         f = shadow.getspnam
79     else:
80         return None
81     return runAsEffectiveUser(0, 0, f, username)
82
83
84
85 class UNIXPasswordDatabase:
86     """
87     A checker which validates users out of the UNIX password databases, or
88     databases of a compatible format.
89
90     @ivar _getByNameFunctions: a C{list} of functions which are called in order
91         to valid a user.  The default value is such that the /etc/passwd
92         database will be tried first, followed by the /etc/shadow database.
93     """
94     credentialInterfaces = IUsernamePassword,
95     implements(ICredentialsChecker)
96
97
98     def __init__(self, getByNameFunctions=None):
99         if getByNameFunctions is None:
100             getByNameFunctions = [_pwdGetByName, _shadowGetByName]
101         self._getByNameFunctions = getByNameFunctions
102
103
104     def requestAvatarId(self, credentials):
105         for func in self._getByNameFunctions:
106             try:
107                 pwnam = func(credentials.username)
108             except KeyError:
109                 return defer.fail(UnauthorizedLogin("invalid username"))
110             else:
111                 if pwnam is not None:
112                     crypted = pwnam[1]
113                     if crypted == '':
114                         continue
115                     if verifyCryptedPassword(crypted, credentials.password):
116                         return defer.succeed(credentials.username)
117         # fallback
118         return defer.fail(UnauthorizedLogin("unable to verify password"))
119
120
121
122 class SSHPublicKeyDatabase:
123     """
124     Checker that authenticates SSH public keys, based on public keys listed in
125     authorized_keys and authorized_keys2 files in user .ssh/ directories.
126     """
127     implements(ICredentialsChecker)
128
129     credentialInterfaces = (ISSHPrivateKey,)
130
131     _userdb = pwd
132
133     def requestAvatarId(self, credentials):
134         d = defer.maybeDeferred(self.checkKey, credentials)
135         d.addCallback(self._cbRequestAvatarId, credentials)
136         d.addErrback(self._ebRequestAvatarId)
137         return d
138
139     def _cbRequestAvatarId(self, validKey, credentials):
140         """
141         Check whether the credentials themselves are valid, now that we know
142         if the key matches the user.
143
144         @param validKey: A boolean indicating whether or not the public key
145             matches a key in the user's authorized_keys file.
146
147         @param credentials: The credentials offered by the user.
148         @type credentials: L{ISSHPrivateKey} provider
149
150         @raise UnauthorizedLogin: (as a failure) if the key does not match the
151             user in C{credentials}. Also raised if the user provides an invalid
152             signature.
153
154         @raise ValidPublicKey: (as a failure) if the key matches the user but
155             the credentials do not include a signature. See
156             L{error.ValidPublicKey} for more information.
157
158         @return: The user's username, if authentication was successful.
159         """
160         if not validKey:
161             return failure.Failure(UnauthorizedLogin("invalid key"))
162         if not credentials.signature:
163             return failure.Failure(error.ValidPublicKey())
164         else:
165             try:
166                 pubKey = keys.Key.fromString(credentials.blob)
167                 if pubKey.verify(credentials.signature, credentials.sigData):
168                     return credentials.username
169             except: # any error should be treated as a failed login
170                 log.err()
171                 return failure.Failure(UnauthorizedLogin('error while verifying key'))
172         return failure.Failure(UnauthorizedLogin("unable to verify key"))
173
174
175     def getAuthorizedKeysFiles(self, credentials):
176         """
177         Return a list of L{FilePath} instances for I{authorized_keys} files
178         which might contain information about authorized keys for the given
179         credentials.
180
181         On OpenSSH servers, the default location of the file containing the
182         list of authorized public keys is
183         U{$HOME/.ssh/authorized_keys<http://www.openbsd.org/cgi-bin/man.cgi?query=sshd_config>}.
184
185         I{$HOME/.ssh/authorized_keys2} is also returned, though it has been
186         U{deprecated by OpenSSH since
187         2001<http://marc.info/?m=100508718416162>}.
188
189         @return: A list of L{FilePath} instances to files with the authorized keys.
190         """
191         pwent = self._userdb.getpwnam(credentials.username)
192         root = FilePath(pwent.pw_dir).child('.ssh')
193         files = ['authorized_keys', 'authorized_keys2']
194         return [root.child(f) for f in files]
195
196
197     def checkKey(self, credentials):
198         """
199         Retrieve files containing authorized keys and check against user
200         credentials.
201         """
202         uid, gid = os.geteuid(), os.getegid()
203         ouid, ogid = self._userdb.getpwnam(credentials.username)[2:4]
204         for filepath in self.getAuthorizedKeysFiles(credentials):
205             if not filepath.exists():
206                 continue
207             try:
208                 lines = filepath.open()
209             except IOError, e:
210                 if e.errno == errno.EACCES:
211                     lines = runAsEffectiveUser(ouid, ogid, filepath.open)
212                 else:
213                     raise
214             for l in lines:
215                 l2 = l.split()
216                 if len(l2) < 2:
217                     continue
218                 try:
219                     if base64.decodestring(l2[1]) == credentials.blob:
220                         return True
221                 except binascii.Error:
222                     continue
223         return False
224
225     def _ebRequestAvatarId(self, f):
226         if not f.check(UnauthorizedLogin):
227             log.msg(f)
228             return failure.Failure(UnauthorizedLogin("unable to get avatar id"))
229         return f
230
231
232 class SSHProtocolChecker:
233     """
234     SSHProtocolChecker is a checker that requires multiple authentications
235     to succeed.  To add a checker, call my registerChecker method with
236     the checker and the interface.
237
238     After each successful authenticate, I call my areDone method with the
239     avatar id.  To get a list of the successful credentials for an avatar id,
240     use C{SSHProcotolChecker.successfulCredentials[avatarId]}.  If L{areDone}
241     returns True, the authentication has succeeded.
242     """
243
244     implements(ICredentialsChecker)
245
246     def __init__(self):
247         self.checkers = {}
248         self.successfulCredentials = {}
249
250     def get_credentialInterfaces(self):
251         return self.checkers.keys()
252
253     credentialInterfaces = property(get_credentialInterfaces)
254
255     def registerChecker(self, checker, *credentialInterfaces):
256         if not credentialInterfaces:
257             credentialInterfaces = checker.credentialInterfaces
258         for credentialInterface in credentialInterfaces:
259             self.checkers[credentialInterface] = checker
260
261     def requestAvatarId(self, credentials):
262         """
263         Part of the L{ICredentialsChecker} interface.  Called by a portal with
264         some credentials to check if they'll authenticate a user.  We check the
265         interfaces that the credentials provide against our list of acceptable
266         checkers.  If one of them matches, we ask that checker to verify the
267         credentials.  If they're valid, we call our L{_cbGoodAuthentication}
268         method to continue.
269
270         @param credentials: the credentials the L{Portal} wants us to verify
271         """
272         ifac = providedBy(credentials)
273         for i in ifac:
274             c = self.checkers.get(i)
275             if c is not None:
276                 d = defer.maybeDeferred(c.requestAvatarId, credentials)
277                 return d.addCallback(self._cbGoodAuthentication,
278                         credentials)
279         return defer.fail(UnhandledCredentials("No checker for %s" % \
280             ', '.join(map(reflect.qual, ifac))))
281
282     def _cbGoodAuthentication(self, avatarId, credentials):
283         """
284         Called if a checker has verified the credentials.  We call our
285         L{areDone} method to see if the whole of the successful authentications
286         are enough.  If they are, we return the avatar ID returned by the first
287         checker.
288         """
289         if avatarId not in self.successfulCredentials:
290             self.successfulCredentials[avatarId] = []
291         self.successfulCredentials[avatarId].append(credentials)
292         if self.areDone(avatarId):
293             del self.successfulCredentials[avatarId]
294             return avatarId
295         else:
296             raise error.NotEnoughAuthentication()
297
298     def areDone(self, avatarId):
299         """
300         Override to determine if the authentication is finished for a given
301         avatarId.
302
303         @param avatarId: the avatar returned by the first checker.  For
304             this checker to function correctly, all the checkers must
305             return the same avatar ID.
306         """
307         return True
308