1 # Copyright (c) Twisted Matrix Laboratories.
2 # See LICENSE for details.
"""
Tests for L{twisted.web._auth}.
"""
9 from zope.interface import implements
10 from zope.interface.verify import verifyObject
12 from twisted.trial import unittest
14 from twisted.python.failure import Failure
15 from twisted.internet.error import ConnectionDone
16 from twisted.internet.address import IPv4Address
18 from twisted.cred import error, portal
19 from twisted.cred.checkers import InMemoryUsernamePasswordDatabaseDontUse
20 from twisted.cred.checkers import ANONYMOUS, AllowAnonymousAccess
21 from twisted.cred.credentials import IUsernamePassword
23 from twisted.web.iweb import ICredentialFactory
24 from twisted.web.resource import IResource, Resource, getChildForRequest
25 from twisted.web._auth import basic, digest
26 from twisted.web._auth.wrapper import HTTPAuthSessionWrapper, UnauthorizedResource
27 from twisted.web._auth.basic import BasicCredentialFactory
29 from twisted.web.server import NOT_DONE_YET
30 from twisted.web.static import Data
32 from twisted.web.test.test_web import DummyRequest
def b64encode(s):
    """
    Return the base64 encoding of C{s} as a single line.

    The C{'base64'} string codec used previously wraps its output every 76
    characters, so any input longer than 57 bytes produced embedded newlines
    that C{strip()} did not remove; such a value is unusable in an
    I{Authorization} header.  L{base64.b64encode} never inserts line breaks.

    @param s: The bytes to encode.
    @return: The base64 text for C{s}, with no leading, trailing or embedded
        whitespace.
    """
    # Imported locally so this helper does not depend on the module's import
    # block.
    import base64
    return base64.b64encode(s)
class BasicAuthTestsMixin:
    """
    L{TestCase} mixin class which defines a number of tests for
    L{basic.BasicCredentialFactory}.  Because this mixin defines C{setUp}, it
    must be inherited before L{TestCase}.
    """
    def setUp(self):
        """
        Create the request, the credentials and the
        L{basic.BasicCredentialFactory} shared by the tests.
        """
        self.request = self.makeRequest()
        # NOTE(review): the line assigning self.realm was not visible in the
        # reviewed text; 'foo' is a reconstructed value -- confirm.
        self.realm = 'foo'
        self.username = 'dreid'
        self.password = 'S3CuR1Ty'
        self.credentialFactory = basic.BasicCredentialFactory(self.realm)


    def makeRequest(self, method='GET', clientAddress=None):
        """
        Create a request object to be passed to
        L{basic.BasicCredentialFactory.decode} along with a response value.
        Override this in a subclass.

        @param method: The HTTP method the request should report.
        @param clientAddress: The address the request should report as its
            client, or C{None}.
        @raise NotImplementedError: Always; subclasses must override.
        """
        # NOTE(review): the format argument was not visible in the reviewed
        # text; the class is the natural operand for "%r did not implement".
        raise NotImplementedError("%r did not implement makeRequest" % (
                self.__class__,))


    def test_interface(self):
        """
        L{BasicCredentialFactory} implements L{ICredentialFactory}.
        """
        self.assertTrue(
            verifyObject(ICredentialFactory, self.credentialFactory))


    def test_usernamePassword(self):
        """
        L{basic.BasicCredentialFactory.decode} turns a base64-encoded response
        into a L{UsernamePassword} object with a password which reflects the
        one which was encoded in the response.
        """
        response = b64encode('%s:%s' % (self.username, self.password))

        creds = self.credentialFactory.decode(response, self.request)
        self.assertTrue(IUsernamePassword.providedBy(creds))
        self.assertTrue(creds.checkPassword(self.password))
        self.assertFalse(creds.checkPassword(self.password + 'wrong'))


    def test_incorrectPadding(self):
        """
        L{basic.BasicCredentialFactory.decode} decodes a base64-encoded
        response with incorrect padding.
        """
        response = b64encode('%s:%s' % (self.username, self.password))
        # Only the trailing '=' padding is meant to be removed.
        response = response.rstrip('=')

        creds = self.credentialFactory.decode(response, self.request)
        self.assertTrue(verifyObject(IUsernamePassword, creds))
        self.assertTrue(creds.checkPassword(self.password))


    def test_invalidEncoding(self):
        """
        L{basic.BasicCredentialFactory.decode} raises L{LoginFailed} if passed
        a response which is not base64-encoded.
        """
        response = 'x' # one byte cannot be valid base64 text
        self.assertRaises(
            error.LoginFailed,
            self.credentialFactory.decode, response, self.makeRequest())


    def test_invalidCredentials(self):
        """
        L{basic.BasicCredentialFactory.decode} raises L{LoginFailed} when
        passed a response which is valid base64 but whose decoded text
        (C{'123abc+/'}) contains no colon.
        """
        response = b64encode('123abc+/')
        self.assertRaises(
            error.LoginFailed,
            self.credentialFactory.decode,
            response, self.makeRequest())
class RequestMixin:
    """
    Mixin supplying the C{makeRequest} factory used by the authentication
    test cases in this module.
    """
    def makeRequest(self, method='GET', clientAddress=None):
        """
        Create a L{DummyRequest} (change me to create a
        L{twisted.web.http.Request} instead).

        @param method: The value to store as the request's C{method}.
        @param clientAddress: The value to store as the request's C{client},
            or C{None}.
        @return: The configured L{DummyRequest}.
        """
        request = DummyRequest('/')
        request.method = method
        request.client = clientAddress
        return request
class BasicAuthTestCase(RequestMixin, BasicAuthTestsMixin, unittest.TestCase):
    """
    Basic authentication tests which run the L{BasicAuthTestsMixin} tests
    against the requests created by L{RequestMixin.makeRequest} (currently
    L{DummyRequest} instances, not L{twisted.web.http.Request}).
    """
class DigestAuthTestCase(RequestMixin, unittest.TestCase):
    """
    Digest authentication tests which use the requests created by
    L{RequestMixin.makeRequest} (currently L{DummyRequest} instances, not
    L{twisted.web.http.Request}).
    """

    def setUp(self):
        """
        Create a DigestCredentialFactory for testing
        """
        self.realm = "test realm"
        self.algorithm = "md5"
        self.credentialFactory = digest.DigestCredentialFactory(
            self.algorithm, self.realm)
        self.request = self.makeRequest()


    def test_decode(self):
        """
        L{digest.DigestCredentialFactory.decode} calls the C{decode} method on
        L{twisted.cred.digest.DigestCredentialFactory} with the HTTP method and
        host of the request.
        """
        # NOTE(review): the four assignments below were not visible in the
        # reviewed text.  The names are required by the visible code; the
        # concrete values are reconstructed -- confirm.
        host = '169.254.0.1'
        method = 'GET'
        done = [False]
        response = object()
        def check(_response, _method, _host):
            self.assertEqual(response, _response)
            self.assertEqual(method, _method)
            self.assertEqual(host, _host)
            # Record that the patched decode was really invoked.
            done[0] = True

        self.patch(self.credentialFactory.digest, 'decode', check)
        req = self.makeRequest(method, IPv4Address('TCP', host, 81))
        self.credentialFactory.decode(response, req)
        self.assertTrue(done[0])


    def test_interface(self):
        """
        L{DigestCredentialFactory} implements L{ICredentialFactory}.
        """
        self.assertTrue(
            verifyObject(ICredentialFactory, self.credentialFactory))


    def test_getChallenge(self):
        """
        The challenge issued by L{DigestCredentialFactory.getChallenge} must
        include C{'qop'}, C{'realm'}, C{'algorithm'}, C{'nonce'}, and
        C{'opaque'} keys.  The values for the C{'realm'} and C{'algorithm'}
        keys must match the values supplied to the factory's initializer.
        None of the values may have newlines in them.
        """
        challenge = self.credentialFactory.getChallenge(self.request)
        self.assertEqual(challenge['qop'], 'auth')
        self.assertEqual(challenge['realm'], 'test realm')
        self.assertEqual(challenge['algorithm'], 'md5')
        self.assertIn('nonce', challenge)
        self.assertIn('opaque', challenge)
        for v in challenge.values():
            self.assertNotIn('\n', v)


    def test_getChallengeWithoutClientIP(self):
        """
        L{DigestCredentialFactory.getChallenge} can issue a challenge even if
        the L{Request} it is passed returns C{None} from C{getClientIP}.
        """
        request = self.makeRequest('GET', None)
        challenge = self.credentialFactory.getChallenge(request)
        self.assertEqual(challenge['qop'], 'auth')
        self.assertEqual(challenge['realm'], 'test realm')
        self.assertEqual(challenge['algorithm'], 'md5')
        self.assertIn('nonce', challenge)
        self.assertIn('opaque', challenge)
class UnauthorizedResourceTests(unittest.TestCase):
    """
    Tests for L{UnauthorizedResource}.
    """
    def test_getChildWithDefault(self):
        """
        An L{UnauthorizedResource} is every child of itself.
        """
        resource = UnauthorizedResource([])
        self.assertIdentical(
            resource.getChildWithDefault("foo", None), resource)
        self.assertIdentical(
            resource.getChildWithDefault("bar", None), resource)


    def _unauthorizedRenderTest(self, request):
        """
        Render L{UnauthorizedResource} for the given request object and verify
        that the response code is I{Unauthorized} and that a I{WWW-Authenticate}
        header is set in the response containing a challenge.

        @param request: The request object to render the resource with.
        """
        resource = UnauthorizedResource([
                BasicCredentialFactory('example.com')])
        request.render(resource)
        self.assertEqual(request.responseCode, 401)
        self.assertEqual(
            request.responseHeaders.getRawHeaders('www-authenticate'),
            ['basic realm="example.com"'])


    def test_render(self):
        """
        L{UnauthorizedResource} renders with a 401 response code and a
        I{WWW-Authenticate} header and puts a simple unauthorized message
        into the response body.
        """
        request = DummyRequest([''])
        self._unauthorizedRenderTest(request)
        self.assertEqual('Unauthorized', ''.join(request.written))


    def test_renderHEAD(self):
        """
        The rendering behavior of L{UnauthorizedResource} for a I{HEAD} request
        is like its handling of a I{GET} request, but no response body is
        written.
        """
        request = DummyRequest([''])
        request.method = 'HEAD'
        self._unauthorizedRenderTest(request)
        self.assertEqual('', ''.join(request.written))


    def test_renderQuotesRealm(self):
        """
        The realm value included in the I{WWW-Authenticate} header set in
        the response when L{UnauthorizedResource} is rendered has quotes
        and backslashes escaped.
        """
        resource = UnauthorizedResource([
                BasicCredentialFactory('example\\"foo')])
        request = DummyRequest([''])
        request.render(resource)
        self.assertEqual(
            request.responseHeaders.getRawHeaders('www-authenticate'),
            ['basic realm="example\\\\\\"foo"'])
class Realm(object):
    """
    A simple L{IRealm} implementation which gives out, for any avatar id,
    whatever its C{avatarFactory} returns for that id.

    @type loggedIn: C{int}
    @ivar loggedIn: The number of times C{requestAvatar} has been invoked for
        L{IResource}.

    @type loggedOut: C{int}
    @ivar loggedOut: The number of times the logout callback has been invoked.

    @ivar avatarFactory: A one-argument callable which is passed the avatar
        id and returns the avatar to hand out.
    """
    implements(portal.IRealm)

    def __init__(self, avatarFactory):
        # NOTE(review): the counter initialisation, the loggedIn increment
        # and the logout method were not visible in the reviewed text; they
        # are reconstructed from the documented attributes and from the
        # self.logout reference in requestAvatar -- confirm.
        self.loggedOut = 0
        self.loggedIn = 0
        self.avatarFactory = avatarFactory


    def requestAvatar(self, avatarId, mind, *interfaces):
        """
        Hand out an L{IResource} avatar for C{avatarId}.

        @return: A three-tuple of L{IResource}, the avatar produced by
            C{avatarFactory}, and the logout callable.
        @raise NotImplementedError: If L{IResource} is not among
            C{interfaces}.
        """
        if IResource in interfaces:
            self.loggedIn += 1
            return IResource, self.avatarFactory(avatarId), self.logout
        raise NotImplementedError()


    def logout(self):
        """
        Record one invocation of the logout callback.
        """
        self.loggedOut += 1
class HTTPAuthHeaderTests(unittest.TestCase):
    """
    Tests for L{HTTPAuthSessionWrapper}.
    """
    makeRequest = DummyRequest

    def setUp(self):
        """
        Create a realm, portal, and L{HTTPAuthSessionWrapper} to use in the tests.
        """
        self.username = 'foo bar'
        self.password = 'bar baz'
        self.avatarContent = "contents of the avatar resource itself"
        self.childName = "foo-child"
        self.childContent = "contents of the foo child of the avatar"
        self.checker = InMemoryUsernamePasswordDatabaseDontUse()
        self.checker.addUser(self.username, self.password)
        self.avatar = Data(self.avatarContent, 'text/plain')
        self.avatar.putChild(
            self.childName, Data(self.childContent, 'text/plain'))
        self.avatars = {self.username: self.avatar}
        self.realm = Realm(self.avatars.get)
        self.portal = portal.Portal(self.realm, [self.checker])
        # The wrapper keeps a reference to this list, so tests register
        # credential factories by appending to it after construction.
        self.credentialFactories = []
        self.wrapper = HTTPAuthSessionWrapper(
            self.portal, self.credentialFactories)


    def _authorizedBasicLogin(self, request):
        """
        Add an I{basic authorization} header to the given request and then
        dispatch it, starting from C{self.wrapper} and returning the resulting
        L{IResource}.

        @param request: The request to add the header to and dispatch.
        """
        authorization = b64encode(self.username + ':' + self.password)
        request.headers['authorization'] = 'Basic ' + authorization
        return getChildForRequest(self.wrapper, request)


    def test_getChildWithDefault(self):
        """
        Resource traversal which encounters an L{HTTPAuthSessionWrapper}
        results in an L{UnauthorizedResource} instance when the request does
        not have the required I{Authorization} headers.
        """
        request = self.makeRequest([self.childName])
        child = getChildForRequest(self.wrapper, request)
        d = request.notifyFinish()
        def cbFinished(result):
            self.assertEqual(request.responseCode, 401)
        d.addCallback(cbFinished)
        request.render(child)
        # Returned so the test waits for the assertion in the callback.
        return d


    def _invalidAuthorizationTest(self, response):
        """
        Create a request with the given value as the value of an
        I{Authorization} header and perform resource traversal with it,
        starting at C{self.wrapper}.  Assert that the result is a 401 response
        code.  Return a L{Deferred} which fires when this is all done.

        @param response: The complete I{Authorization} header value to send.
        """
        self.credentialFactories.append(BasicCredentialFactory('example.com'))
        request = self.makeRequest([self.childName])
        request.headers['authorization'] = response
        child = getChildForRequest(self.wrapper, request)
        d = request.notifyFinish()
        def cbFinished(result):
            self.assertEqual(request.responseCode, 401)
        d.addCallback(cbFinished)
        request.render(child)
        return d


    def test_getChildWithDefaultUnauthorizedUser(self):
        """
        Resource traversal which encounters an L{HTTPAuthSessionWrapper}
        results in an L{UnauthorizedResource} when the request has an
        I{Authorization} header with a user which does not exist.
        """
        return self._invalidAuthorizationTest('Basic ' + b64encode('foo:bar'))


    def test_getChildWithDefaultUnauthorizedPassword(self):
        """
        Resource traversal which encounters an L{HTTPAuthSessionWrapper}
        results in an L{UnauthorizedResource} when the request has an
        I{Authorization} header with a user which exists and the wrong
        password.
        """
        return self._invalidAuthorizationTest(
            'Basic ' + b64encode(self.username + ':bar'))


    def test_getChildWithDefaultUnrecognizedScheme(self):
        """
        Resource traversal which encounters an L{HTTPAuthSessionWrapper}
        results in an L{UnauthorizedResource} when the request has an
        I{Authorization} header with an unrecognized scheme.
        """
        return self._invalidAuthorizationTest('Quux foo bar baz')


    def test_getChildWithDefaultAuthorized(self):
        """
        Resource traversal which encounters an L{HTTPAuthSessionWrapper}
        results in an L{IResource} which renders the L{IResource} avatar
        retrieved from the portal when the request has a valid I{Authorization}
        header.
        """
        self.credentialFactories.append(BasicCredentialFactory('example.com'))
        request = self.makeRequest([self.childName])
        child = self._authorizedBasicLogin(request)
        d = request.notifyFinish()
        def cbFinished(ignored):
            self.assertEqual(request.written, [self.childContent])
        d.addCallback(cbFinished)
        request.render(child)
        return d


    def test_renderAuthorized(self):
        """
        Resource traversal which terminates at an L{HTTPAuthSessionWrapper}
        and includes correct authentication headers results in the
        L{IResource} avatar (not one of its children) retrieved from the
        portal being rendered.
        """
        self.credentialFactories.append(BasicCredentialFactory('example.com'))
        # Request it exactly, not any of its children.
        request = self.makeRequest([])
        child = self._authorizedBasicLogin(request)
        d = request.notifyFinish()
        def cbFinished(ignored):
            self.assertEqual(request.written, [self.avatarContent])
        d.addCallback(cbFinished)
        request.render(child)
        return d


    def test_getChallengeCalledWithRequest(self):
        """
        When L{HTTPAuthSessionWrapper} finds an L{ICredentialFactory} to issue
        a challenge, it calls the C{getChallenge} method with the request as an
        argument.
        """
        class DumbCredentialFactory(object):
            implements(ICredentialFactory)
            # NOTE(review): the scheme value, the initializer and the return
            # value of getChallenge were not visible in the reviewed text;
            # reconstructed from the use of factory.requests -- confirm.
            scheme = 'dumb'

            def __init__(self):
                self.requests = []

            def getChallenge(self, request):
                self.requests.append(request)
                return {}

        factory = DumbCredentialFactory()
        self.credentialFactories.append(factory)
        request = self.makeRequest([self.childName])
        child = getChildForRequest(self.wrapper, request)
        d = request.notifyFinish()
        def cbFinished(ignored):
            self.assertEqual(factory.requests, [request])
        d.addCallback(cbFinished)
        request.render(child)
        return d


    def _logoutTest(self):
        """
        Issue a request for an authentication-protected resource using valid
        credentials and then return the C{DummyRequest} instance which was
        used.

        This is a helper for tests about the behavior of the logout
        callback.
        """
        self.credentialFactories.append(BasicCredentialFactory('example.com'))

        class SlowerResource(Resource):
            def render(self, request):
                # NOTE(review): return value not visible in the reviewed
                # text; NOT_DONE_YET keeps the request unfinished so the
                # caller decides when it ends -- confirm.
                return NOT_DONE_YET

        self.avatar.putChild(self.childName, SlowerResource())
        request = self.makeRequest([self.childName])
        child = self._authorizedBasicLogin(request)
        request.render(child)
        self.assertEqual(self.realm.loggedOut, 0)
        return request


    def test_logout(self):
        """
        The realm's logout callback is invoked after the resource is rendered.
        """
        request = self._logoutTest()
        # NOTE(review): the statement finishing the request was not visible
        # in the reviewed text; reconstructed -- confirm.
        request.finish()
        self.assertEqual(self.realm.loggedOut, 1)


    def test_logoutOnError(self):
        """
        The realm's logout callback is also invoked if there is an error
        generating the response (for example, if the client disconnects
        early).
        """
        request = self._logoutTest()
        request.processingFailed(
            Failure(ConnectionDone("Simulated disconnect")))
        self.assertEqual(self.realm.loggedOut, 1)


    def test_decodeRaises(self):
        """
        Resource traversal which encounters an L{HTTPAuthSessionWrapper}
        results in an L{UnauthorizedResource} when the request has a I{Basic
        Authorization} header which cannot be decoded using base64.
        """
        self.credentialFactories.append(BasicCredentialFactory('example.com'))
        request = self.makeRequest([self.childName])
        request.headers['authorization'] = 'Basic decode should fail'
        child = getChildForRequest(self.wrapper, request)
        self.assertIsInstance(child, UnauthorizedResource)


    def test_selectParseResponse(self):
        """
        L{HTTPAuthSessionWrapper._selectParseHeader} returns a two-tuple giving
        the L{ICredentialFactory} to use to parse the header and a string
        containing the portion of the header which remains to be parsed.
        """
        basicAuthorization = 'Basic abcdef123456'
        # NOTE(review): the expected value for the no-factory case was not
        # visible in the reviewed text; (None, None) is reconstructed --
        # confirm.
        self.assertEqual(
            self.wrapper._selectParseHeader(basicAuthorization),
            (None, None))
        factory = BasicCredentialFactory('example.com')
        self.credentialFactories.append(factory)
        self.assertEqual(
            self.wrapper._selectParseHeader(basicAuthorization),
            (factory, 'abcdef123456'))


    def test_unexpectedDecodeError(self):
        """
        Any unexpected exception raised by the credential factory's C{decode}
        method results in a 500 response code and causes the exception to be
        logged.
        """
        class UnexpectedException(Exception):
            pass

        class BadFactory(object):
            # NOTE(review): scheme and the getChallenge return value were not
            # visible in the reviewed text; 'bad' matches the 'Bad abc'
            # header sent below -- confirm.
            scheme = 'bad'

            def getChallenge(self, client):
                return {}

            def decode(self, response, request):
                raise UnexpectedException()

        self.credentialFactories.append(BadFactory())
        request = self.makeRequest([self.childName])
        request.headers['authorization'] = 'Bad abc'
        child = getChildForRequest(self.wrapper, request)
        request.render(child)
        self.assertEqual(request.responseCode, 500)
        self.assertEqual(len(self.flushLoggedErrors(UnexpectedException)), 1)


    def test_unexpectedLoginError(self):
        """
        Any unexpected failure from L{Portal.login} results in a 500 response
        code and causes the failure to be logged.
        """
        class UnexpectedException(Exception):
            pass

        class BrokenChecker(object):
            credentialInterfaces = (IUsernamePassword,)

            def requestAvatarId(self, credentials):
                raise UnexpectedException()

        self.portal.registerChecker(BrokenChecker())
        self.credentialFactories.append(BasicCredentialFactory('example.com'))
        request = self.makeRequest([self.childName])
        child = self._authorizedBasicLogin(request)
        request.render(child)
        self.assertEqual(request.responseCode, 500)
        self.assertEqual(len(self.flushLoggedErrors(UnexpectedException)), 1)


    def test_anonymousAccess(self):
        """
        Anonymous requests are allowed if a L{Portal} has an anonymous checker
        registered.
        """
        unprotectedContents = "contents of the unprotected child resource"

        self.avatars[ANONYMOUS] = Resource()
        self.avatars[ANONYMOUS].putChild(
            self.childName, Data(unprotectedContents, 'text/plain'))
        self.portal.registerChecker(AllowAnonymousAccess())

        self.credentialFactories.append(BasicCredentialFactory('example.com'))
        request = self.makeRequest([self.childName])
        child = getChildForRequest(self.wrapper, request)
        d = request.notifyFinish()
        def cbFinished(ignored):
            self.assertEqual(request.written, [unprotectedContents])
        d.addCallback(cbFinished)
        request.render(child)
        return d