Initial import to Tizen
[profile/ivi/python-twisted.git] / twisted / web / test / test_httpauth.py
1 # Copyright (c) Twisted Matrix Laboratories.
2 # See LICENSE for details.
3
4 """
5 Tests for L{twisted.web._auth}.
6 """
7
8
9 from zope.interface import implements
10 from zope.interface.verify import verifyObject
11
12 from twisted.trial import unittest
13
14 from twisted.python.failure import Failure
15 from twisted.internet.error import ConnectionDone
16 from twisted.internet.address import IPv4Address
17
18 from twisted.cred import error, portal
19 from twisted.cred.checkers import InMemoryUsernamePasswordDatabaseDontUse
20 from twisted.cred.checkers import ANONYMOUS, AllowAnonymousAccess
21 from twisted.cred.credentials import IUsernamePassword
22
23 from twisted.web.iweb import ICredentialFactory
24 from twisted.web.resource import IResource, Resource, getChildForRequest
25 from twisted.web._auth import basic, digest
26 from twisted.web._auth.wrapper import HTTPAuthSessionWrapper, UnauthorizedResource
27 from twisted.web._auth.basic import BasicCredentialFactory
28
29 from twisted.web.server import NOT_DONE_YET
30 from twisted.web.static import Data
31
32 from twisted.web.test.test_web import DummyRequest
33
34
def b64encode(s):
    """
    Encode C{s} as base64 and return the result on a single line.

    This uses L{base64.b64encode} rather than the C{'base64'} string codec:
    the codec produces MIME-style output with a newline inserted after every
    76 characters, so only the trailing newline could be stripped and long
    credentials would yield a value which is not usable in an
    I{Authorization} header.

    @param s: The byte string to encode.

    @return: The base64 encoding of C{s}, containing no whitespace.
    """
    # Imported locally so that this helper does not depend on a module-level
    # import being present.
    import base64
    return base64.b64encode(s)
37
38
class BasicAuthTestsMixin:
    """
    Mixin for L{TestCase} subclasses which supplies the tests for
    L{basic.BasicCredentialFactory}.  Since the mixin provides its own
    C{setUp}, list it ahead of L{TestCase} among the base classes.
    """
    def setUp(self):
        """
        Create the request, the sample credentials and the
        L{basic.BasicCredentialFactory} shared by the tests.
        """
        self.request = self.makeRequest()
        self.realm, self.username, self.password = 'foo', 'dreid', 'S3CuR1Ty'
        self.credentialFactory = basic.BasicCredentialFactory(self.realm)


    def makeRequest(self, method='GET', clientAddress=None):
        """
        Hook which subclasses override to build the request object handed to
        L{basic.BasicCredentialFactory.decode} together with a response
        value.  This default implementation always raises
        C{NotImplementedError}.
        """
        message = "%r did not implement makeRequest" % (self.__class__,)
        raise NotImplementedError(message)


    def test_interface(self):
        """
        A L{BasicCredentialFactory} instance provides L{ICredentialFactory}.
        """
        verified = verifyObject(ICredentialFactory, self.credentialFactory)
        self.assertTrue(verified)


    def test_usernamePassword(self):
        """
        Given a base64-encoded C{username:password} response,
        L{basic.BasicCredentialFactory.decode} returns an
        L{IUsernamePassword} provider which accepts the encoded password and
        rejects any other.
        """
        plaintext = '%s:%s' % (self.username, self.password)
        credentials = self.credentialFactory.decode(
            b64encode(plaintext), self.request)

        self.assertTrue(IUsernamePassword.providedBy(credentials))
        self.assertTrue(credentials.checkPassword(self.password))
        wrongPassword = self.password + 'wrong'
        self.assertFalse(credentials.checkPassword(wrongPassword))


    def test_incorrectPadding(self):
        """
        L{basic.BasicCredentialFactory.decode} still decodes a base64-encoded
        response from which the C{'='} padding has been removed.
        """
        padded = b64encode('%s:%s' % (self.username, self.password))
        unpadded = padded.strip('=')

        credentials = self.credentialFactory.decode(unpadded, self.request)
        self.assertTrue(verifyObject(IUsernamePassword, credentials))
        self.assertTrue(credentials.checkPassword(self.password))


    def test_invalidEncoding(self):
        """
        L{basic.BasicCredentialFactory.decode} raises L{LoginFailed} when the
        response it is given is not base64-encoded.
        """
        # A single byte can never be valid base64 text.
        notBase64 = 'x'
        request = self.makeRequest()
        self.assertRaises(
            error.LoginFailed,
            self.credentialFactory.decode, notBase64, request)


    def test_invalidCredentials(self):
        """
        L{basic.BasicCredentialFactory.decode} raises L{LoginFailed} when the
        response is properly base64-encoded but the encoded value contains no
        C{':'} separating a username from a password.
        """
        encoded = b64encode('123abc+/')
        request = self.makeRequest()
        self.assertRaises(
            error.LoginFailed,
            self.credentialFactory.decode, encoded, request)
119
120
class RequestMixin:
    """
    Mixin providing a C{makeRequest} implementation backed by
    L{DummyRequest}.
    """
    def makeRequest(self, method='GET', clientAddress=None):
        """
        Build a L{DummyRequest} for the path C{'/'} with the given method and
        client address (change me to create a L{twisted.web.http.Request}
        instead).

        @param method: The value to assign to the request's C{method}.
        @param clientAddress: The value to assign to the request's C{client}.

        @return: The newly created L{DummyRequest}.
        """
        dummy = DummyRequest('/')
        dummy.method, dummy.client = method, clientAddress
        return dummy
131
132
133
class BasicAuthTestCase(RequestMixin, BasicAuthTestsMixin, unittest.TestCase):
    """
    Run the basic authentication tests defined by L{BasicAuthTestsMixin}
    against the requests created by L{RequestMixin}.
    """
138
139
140
class DigestAuthTestCase(RequestMixin, unittest.TestCase):
    """
    Digest authentication tests, run against the requests created by
    L{RequestMixin}.
    """

    def setUp(self):
        """
        Build the L{digest.DigestCredentialFactory} and the request which the
        tests operate on.
        """
        self.realm, self.algorithm = "test realm", "md5"
        self.credentialFactory = digest.DigestCredentialFactory(
            self.algorithm, self.realm)
        self.request = self.makeRequest()


    def _assertChallengeFields(self, challenge):
        """
        Assert that C{challenge} carries the expected C{'qop'}, C{'realm'}
        and C{'algorithm'} values and has C{'nonce'} and C{'opaque'} keys.
        """
        self.assertEqual(challenge['qop'], 'auth')
        self.assertEqual(challenge['realm'], 'test realm')
        self.assertEqual(challenge['algorithm'], 'md5')
        for key in ('nonce', 'opaque'):
            self.assertIn(key, challenge)


    def test_decode(self):
        """
        L{digest.DigestCredentialFactory.decode} passes the response, the
        HTTP method of the request and the host of the request's client to
        the C{decode} method of the wrapped
        L{twisted.cred.digest.DigestCredentialFactory}.
        """
        expectedHost = '169.254.0.1'
        expectedMethod = 'GET'
        expectedResponse = object()
        calls = []

        def fakeDecode(_response, _method, _host):
            self.assertEqual(expectedResponse, _response)
            self.assertEqual(expectedMethod, _method)
            self.assertEqual(expectedHost, _host)
            calls.append(True)

        self.patch(self.credentialFactory.digest, 'decode', fakeDecode)
        clientAddress = IPv4Address('TCP', expectedHost, 81)
        request = self.makeRequest(expectedMethod, clientAddress)
        self.credentialFactory.decode(expectedResponse, request)
        # The stand-in must actually have been invoked.
        self.assertTrue(calls)


    def test_interface(self):
        """
        A L{DigestCredentialFactory} instance provides L{ICredentialFactory}.
        """
        verified = verifyObject(ICredentialFactory, self.credentialFactory)
        self.assertTrue(verified)


    def test_getChallenge(self):
        """
        L{DigestCredentialFactory.getChallenge} issues a challenge with
        C{'qop'}, C{'realm'}, C{'algorithm'}, C{'nonce'} and C{'opaque'}
        keys.  The C{'realm'} and C{'algorithm'} values are the ones given to
        the factory's initializer, and no value contains a newline.
        """
        challenge = self.credentialFactory.getChallenge(self.request)
        self._assertChallengeFields(challenge)
        for value in challenge.values():
            self.assertNotIn('\n', value)


    def test_getChallengeWithoutClientIP(self):
        """
        L{DigestCredentialFactory.getChallenge} is able to issue a challenge
        for a request which was created with C{None} as its client address.
        """
        addresslessRequest = self.makeRequest('GET', None)
        challenge = self.credentialFactory.getChallenge(addresslessRequest)
        self._assertChallengeFields(challenge)
217
218
219
class UnauthorizedResourceTests(unittest.TestCase):
    """
    Tests for L{UnauthorizedResource}.
    """
    def test_getChildWithDefault(self):
        """
        Whatever child name is requested, an L{UnauthorizedResource} returns
        itself.
        """
        resource = UnauthorizedResource([])
        for name in ("foo", "bar"):
            self.assertIdentical(
                resource.getChildWithDefault(name, None), resource)


    def _unauthorizedRenderTest(self, request):
        """
        Render an L{UnauthorizedResource} with C{request}, then check that
        the response code is I{Unauthorized} and that the response has a
        I{WWW-Authenticate} header carrying a challenge.
        """
        factories = [BasicCredentialFactory('example.com')]
        request.render(UnauthorizedResource(factories))
        self.assertEqual(request.responseCode, 401)
        challenges = request.responseHeaders.getRawHeaders('www-authenticate')
        self.assertEqual(challenges, ['basic realm="example.com"'])


    def test_render(self):
        """
        Rendering an L{UnauthorizedResource} produces a 401 response code, a
        I{WWW-Authenticate} header and a short unauthorized message as the
        response body.
        """
        request = DummyRequest([''])
        self._unauthorizedRenderTest(request)
        body = ''.join(request.written)
        self.assertEqual('Unauthorized', body)


    def test_renderHEAD(self):
        """
        An L{UnauthorizedResource} handles a I{HEAD} request the way it
        handles a I{GET} request, except that it writes no response body.
        """
        request = DummyRequest([''])
        request.method = 'HEAD'
        self._unauthorizedRenderTest(request)
        body = ''.join(request.written)
        self.assertEqual('', body)


    def test_renderQuotesRealm(self):
        """
        Quotes and backslashes in the realm are escaped in the
        I{WWW-Authenticate} header which is set on the response when an
        L{UnauthorizedResource} is rendered.
        """
        factories = [BasicCredentialFactory('example\\"foo')]
        request = DummyRequest([''])
        request.render(UnauthorizedResource(factories))
        challenges = request.responseHeaders.getRawHeaders('www-authenticate')
        self.assertEqual(challenges, ['basic realm="example\\\\\\"foo"'])
286
287
288
class Realm(object):
    """
    A minimal L{IRealm} implementation which, for any avatarId, hands out
    whatever its avatar factory returns for that avatarId.

    @type loggedIn: C{int}
    @ivar loggedIn: How many times C{requestAvatar} has been called with
        L{IResource} among the requested interfaces.

    @type loggedOut: C{int}
    @ivar loggedOut: How many times the logout callback has been called.

    @ivar avatarFactory: A one-argument callable which is passed the avatarId
        and returns the avatar.
    """
    implements(portal.IRealm)

    def __init__(self, avatarFactory):
        self.loggedIn = self.loggedOut = 0
        self.avatarFactory = avatarFactory


    def requestAvatar(self, avatarId, mind, *interfaces):
        """
        Return an C{(IResource, avatar, logout)} tuple for C{avatarId} and
        count the login.

        @raise NotImplementedError: If L{IResource} is not one of the
            requested interfaces.
        """
        if IResource not in interfaces:
            raise NotImplementedError()
        self.loggedIn += 1
        avatar = self.avatarFactory(avatarId)
        return IResource, avatar, self.logout


    def logout(self):
        """
        Record one more invocation of the logout callback.
        """
        self.loggedOut += 1
318
319
320
class HTTPAuthHeaderTests(unittest.TestCase):
    """
    Tests for L{HTTPAuthSessionWrapper}.
    """
    makeRequest = DummyRequest

    def setUp(self):
        """
        Build the realm, the portal and the L{HTTPAuthSessionWrapper} which
        the tests exercise.
        """
        self.username = 'foo bar'
        self.password = 'bar baz'
        self.avatarContent = "contents of the avatar resource itself"
        self.childName = "foo-child"
        self.childContent = "contents of the foo child of the avatar"

        checker = InMemoryUsernamePasswordDatabaseDontUse()
        checker.addUser(self.username, self.password)
        self.checker = checker

        avatar = Data(self.avatarContent, 'text/plain')
        avatar.putChild(
            self.childName, Data(self.childContent, 'text/plain'))
        self.avatar = avatar

        self.avatars = {self.username: avatar}
        self.realm = Realm(self.avatars.get)
        self.portal = portal.Portal(self.realm, [checker])
        self.credentialFactories = []
        self.wrapper = HTTPAuthSessionWrapper(
            self.portal, self.credentialFactories)


    def _renderAndWatch(self, request, resource, callback):
        """
        Register C{callback} to run when C{request} finishes, then render
        C{resource} with C{request}.

        @return: The L{Deferred} obtained from C{request.notifyFinish()},
            with C{callback} added to it.
        """
        finished = request.notifyFinish()
        finished.addCallback(callback)
        request.render(resource)
        return finished


    def _authorizedBasicLogin(self, request):
        """
        Set a I{Basic} I{Authorization} header built from C{self.username}
        and C{self.password} on C{request}, dispatch the request starting
        from C{self.wrapper} and return the L{IResource} which results.
        """
        credentials = ':'.join((self.username, self.password))
        request.headers['authorization'] = 'Basic ' + b64encode(credentials)
        return getChildForRequest(self.wrapper, request)


    def test_getChildWithDefault(self):
        """
        When resource traversal reaches an L{HTTPAuthSessionWrapper} and the
        request lacks the required I{Authorization} headers, the result is
        an L{UnauthorizedResource} instance.
        """
        request = self.makeRequest([self.childName])
        child = getChildForRequest(self.wrapper, request)

        def checkUnauthorized(result):
            self.assertEqual(request.responseCode, 401)

        return self._renderAndWatch(request, child, checkUnauthorized)


    def _invalidAuthorizationTest(self, response):
        """
        Make a request whose I{Authorization} header has the given value and
        traverse resources with it, beginning at C{self.wrapper}.  Assert
        that a 401 response code results.  Return a L{Deferred} which fires
        once all of that has happened.
        """
        self.credentialFactories.append(BasicCredentialFactory('example.com'))
        request = self.makeRequest([self.childName])
        request.headers['authorization'] = response
        child = getChildForRequest(self.wrapper, request)

        def checkUnauthorized(result):
            self.assertEqual(request.responseCode, 401)

        return self._renderAndWatch(request, child, checkUnauthorized)


    def test_getChildWithDefaultUnauthorizedUser(self):
        """
        When resource traversal encounters an L{HTTPAuthSessionWrapper} and
        the request's I{Authorization} header names a user which does not
        exist, the result is an L{UnauthorizedResource}.
        """
        header = 'Basic ' + b64encode('foo:bar')
        return self._invalidAuthorizationTest(header)


    def test_getChildWithDefaultUnauthorizedPassword(self):
        """
        When resource traversal encounters an L{HTTPAuthSessionWrapper} and
        the request's I{Authorization} header names an existing user but
        gives the wrong password, the result is an L{UnauthorizedResource}.
        """
        header = 'Basic ' + b64encode(self.username + ':bar')
        return self._invalidAuthorizationTest(header)


    def test_getChildWithDefaultUnrecognizedScheme(self):
        """
        When resource traversal encounters an L{HTTPAuthSessionWrapper} and
        the request's I{Authorization} header uses an unrecognized scheme,
        the result is an L{UnauthorizedResource}.
        """
        return self._invalidAuthorizationTest('Quux foo bar baz')


    def test_getChildWithDefaultAuthorized(self):
        """
        When resource traversal encounters an L{HTTPAuthSessionWrapper} and
        the request has a valid I{Authorization} header, the result is an
        L{IResource} which renders the L{IResource} avatar retrieved from
        the portal.
        """
        self.credentialFactories.append(BasicCredentialFactory('example.com'))
        request = self.makeRequest([self.childName])
        child = self._authorizedBasicLogin(request)

        def checkChildRendered(ignored):
            self.assertEqual(request.written, [self.childContent])

        return self._renderAndWatch(request, child, checkChildRendered)


    def test_renderAuthorized(self):
        """
        When resource traversal ends at an L{HTTPAuthSessionWrapper} and the
        request carries correct authentication headers, the L{IResource}
        avatar retrieved from the portal (rather than one of its children)
        is what gets rendered.
        """
        self.credentialFactories.append(BasicCredentialFactory('example.com'))
        # An empty postpath requests the avatar itself, not a child of it.
        request = self.makeRequest([])
        child = self._authorizedBasicLogin(request)

        def checkAvatarRendered(ignored):
            self.assertEqual(request.written, [self.avatarContent])

        return self._renderAndWatch(request, child, checkAvatarRendered)


    def test_getChallengeCalledWithRequest(self):
        """
        L{HTTPAuthSessionWrapper} passes the request to the C{getChallenge}
        method of the L{ICredentialFactory} it picks to issue a challenge.
        """
        class DumbCredentialFactory(object):
            implements(ICredentialFactory)
            scheme = 'dumb'

            def __init__(self):
                self.requests = []

            def getChallenge(self, request):
                # Remember every request a challenge was asked for.
                self.requests.append(request)
                return {}

        factory = DumbCredentialFactory()
        self.credentialFactories.append(factory)
        request = self.makeRequest([self.childName])
        child = getChildForRequest(self.wrapper, request)

        def checkRequestsSeen(ignored):
            self.assertEqual(factory.requests, [request])

        return self._renderAndWatch(request, child, checkRequestsSeen)


    def _logoutTest(self):
        """
        Request an authentication-protected resource with valid credentials
        and return the C{DummyRequest} instance which was used to do so.

        Tests concerning the behavior of the logout callback use this as a
        helper.
        """
        self.credentialFactories.append(BasicCredentialFactory('example.com'))

        class SlowerResource(Resource):
            def render(self, request):
                # Leave the request unfinished so the test decides how and
                # when it ends.
                return NOT_DONE_YET

        self.avatar.putChild(self.childName, SlowerResource())
        request = self.makeRequest([self.childName])
        request.render(self._authorizedBasicLogin(request))
        self.assertEqual(self.realm.loggedOut, 0)
        return request


    def test_logout(self):
        """
        Once the resource has been rendered, the realm's logout callback is
        invoked.
        """
        request = self._logoutTest()
        request.finish()
        self.assertEqual(self.realm.loggedOut, 1)


    def test_logoutOnError(self):
        """
        The realm's logout callback is invoked as well when generating the
        response fails (for example, because the client disconnects early).
        """
        request = self._logoutTest()
        disconnect = Failure(ConnectionDone("Simulated disconnect"))
        request.processingFailed(disconnect)
        self.assertEqual(self.realm.loggedOut, 1)


    def test_decodeRaises(self):
        """
        When resource traversal encounters an L{HTTPAuthSessionWrapper} and
        the request has a I{Basic Authorization} header which cannot be
        decoded using base64, the result is an L{UnauthorizedResource}.
        """
        self.credentialFactories.append(BasicCredentialFactory('example.com'))
        request = self.makeRequest([self.childName])
        request.headers['authorization'] = 'Basic decode should fail'
        resource = getChildForRequest(self.wrapper, request)
        self.assertIsInstance(resource, UnauthorizedResource)


    def test_selectParseResponse(self):
        """
        L{HTTPAuthSessionWrapper._selectParseHeader} returns a two-tuple of
        the L{ICredentialFactory} which should parse the header and a string
        holding the part of the header still to be parsed.  Both elements
        are C{None} while no factory for the header's scheme is registered.
        """
        header = 'Basic abcdef123456'
        selected = self.wrapper._selectParseHeader(header)
        self.assertEqual(selected, (None, None))

        basicFactory = BasicCredentialFactory('example.com')
        self.credentialFactories.append(basicFactory)
        selected = self.wrapper._selectParseHeader(header)
        self.assertEqual(selected, (basicFactory, 'abcdef123456'))


    def test_unexpectedDecodeError(self):
        """
        If the credential factory's C{decode} method raises an unexpected
        exception, the response code is 500 and the exception is logged.
        """
        class UnexpectedException(Exception):
            pass

        class BadFactory(object):
            scheme = 'bad'

            def getChallenge(self, client):
                return {}

            def decode(self, response, request):
                raise UnexpectedException()

        self.credentialFactories.append(BadFactory())
        request = self.makeRequest([self.childName])
        request.headers['authorization'] = 'Bad abc'
        request.render(getChildForRequest(self.wrapper, request))
        self.assertEqual(request.responseCode, 500)
        logged = self.flushLoggedErrors(UnexpectedException)
        self.assertEqual(len(logged), 1)


    def test_unexpectedLoginError(self):
        """
        If L{Portal.login} fails in an unexpected way, the response code is
        500 and the failure is logged.
        """
        class UnexpectedException(Exception):
            pass

        class BrokenChecker(object):
            credentialInterfaces = (IUsernamePassword,)

            def requestAvatarId(self, credentials):
                raise UnexpectedException()

        self.portal.registerChecker(BrokenChecker())
        self.credentialFactories.append(BasicCredentialFactory('example.com'))
        request = self.makeRequest([self.childName])
        request.render(self._authorizedBasicLogin(request))
        self.assertEqual(request.responseCode, 500)
        logged = self.flushLoggedErrors(UnexpectedException)
        self.assertEqual(len(logged), 1)


    def test_anonymousAccess(self):
        """
        If an anonymous checker is registered with the L{Portal}, anonymous
        requests are allowed.
        """
        unprotectedContents = "contents of the unprotected child resource"

        anonymousAvatar = Resource()
        self.avatars[ANONYMOUS] = anonymousAvatar
        anonymousAvatar.putChild(
            self.childName, Data(unprotectedContents, 'text/plain'))
        self.portal.registerChecker(AllowAnonymousAccess())

        self.credentialFactories.append(BasicCredentialFactory('example.com'))
        request = self.makeRequest([self.childName])
        child = getChildForRequest(self.wrapper, request)

        def checkUnprotectedRendered(ignored):
            self.assertEqual(request.written, [unprotectedContents])

        return self._renderAndWatch(request, child, checkUnprotectedRendered)