1 # Copyright (c) Twisted Matrix Laboratories.
2 # See LICENSE for details.
5 Tests for various parts of L{twisted.web}.
8 from cStringIO import StringIO
10 from zope.interface import implements
11 from zope.interface.verify import verifyObject
13 from twisted.trial import unittest
14 from twisted.internet import reactor
15 from twisted.internet.address import IPv4Address
16 from twisted.internet.defer import Deferred
17 from twisted.web import server, resource, util
18 from twisted.internet import defer, interfaces, task
19 from twisted.web import iweb, http, http_headers, error
20 from twisted.python import log
25 Represents a dummy or fake request.
27 @ivar _finishedDeferreds: C{None} or a C{list} of L{Deferreds} which will
28 be called back with C{None} when C{finish} is called or which will be
29 errbacked if C{processingFailed} is called.
31 @type headers: C{dict}
32 @ivar headers: A mapping of header name to header value for all request
35 @type outgoingHeaders: C{dict}
36 @ivar outgoingHeaders: A mapping of header name to header value for all
39 @type responseCode: C{int}
40 @ivar responseCode: The response code which was passed to
43 @type written: C{list} of C{str}
44 @ivar written: The bytes which have been written to the request.
50 def registerProducer(self, prod,s):
53 prod.resumeProducing()
55 def unregisterProducer(self):
59 def __init__(self, postpath, session=None):
63 self.postpath = postpath
66 self.protoSession = session or server.Session(0, self)
68 self.outgoingHeaders = {}
69 self.responseHeaders = http_headers.Headers()
70 self.responseCode = None
72 self._finishedDeferreds = []
def getHeader(self, name):
    """
    Look up a single request header.

    @type name: C{str}
    @param name: The name of the header to fetch.  It is lower-cased before
        the lookup is performed.

    @rtype: C{str} or L{NoneType}
    @return: The value stored for that header, or C{None} if there is no
        entry for it.
    """
    key = name.lower()
    return self.headers.get(key)
def setHeader(self, name, value):
    """
    Record an outgoing header under the lower-cased form of C{name}.

    TODO: make this assert on write() if the header is content-length
    """
    normalized = name.lower()
    self.outgoingHeaders[normalized] = value
97 assert not self.written, "Session cannot be requested after data has been written."
98 self.session = self.protoSession
102 def render(self, resource):
104 Render the given resource as a response to this request.
106 This implementation only handles a few of the most common behaviors of
107 resources. It can handle a render method that returns a string or
108 C{NOT_DONE_YET}. It doesn't know anything about the semantics of
109 request methods (eg HEAD) nor how to set any particular headers.
110 Basically, it's largely broken, but sufficient for some tests at least.
111 It should B{not} be expanded to do all the same stuff L{Request} does.
112 Instead, L{DummyRequest} should be phased out and L{Request} (or some
113 other real code factored in a different way) used.
115 result = resource.render(self)
116 if result is server.NOT_DONE_YET:
def write(self, data):
    """
    Remember C{data} as the next chunk written to this request.
    """
    chunks = self.written
    chunks.append(data)
125 def notifyFinish(self):
127 Return a L{Deferred} which is called back with C{None} when the request
128 is finished. This will probably only work if you haven't called
131 finished = Deferred()
132 self._finishedDeferreds.append(finished)
Record that the request is finished and call back any L{Deferred}s
139 waiting for notification of this.
141 self.finished = self.finished + 1
142 if self._finishedDeferreds is not None:
143 observers = self._finishedDeferreds
144 self._finishedDeferreds = None
145 for obs in observers:
149 def processingFailed(self, reason):
Errback any L{Deferred}s waiting for finish notification.
153 if self._finishedDeferreds is not None:
154 observers = self._finishedDeferreds
155 self._finishedDeferreds = None
156 for obs in observers:
def addArg(self, name, value):
    """
    Store C{value}, wrapped in a one-element list, as the argument C{name},
    replacing whatever was stored under that name before.
    """
    values = [value]
    self.args[name] = values
def setResponseCode(self, code, message=None):
    """
    Remember the HTTP status code and message for the response.

    This asserts that nothing has been written yet; the assertion message
    shows the chunks which were already written.
    """
    assert not self.written, (
        "Response code cannot be set after data has been written: %s."
        % "@@@@".join(self.written))
    self.responseCode, self.responseMessage = code, message
def setLastModified(self, when):
    """
    Check that no data has been written yet; C{when} itself is not recorded.
    """
    assert not self.written, (
        "Last-Modified cannot be set after data has been written: %s."
        % "@@@@".join(self.written))
def setETag(self, tag):
    """
    Check that no data has been written yet; C{tag} itself is not recorded.
    """
    assert not self.written, (
        "ETag cannot be set after data has been written: %s."
        % "@@@@".join(self.written))
182 def getClientIP(self):
184 Return the IPv4 address of the client which made this request, if there
185 is one, otherwise C{None}.
187 if isinstance(self.client, IPv4Address):
188 return self.client.host
class ResourceTestCase(unittest.TestCase):
    """
    Tests for L{resource.Resource}.
    """
    def testListEntities(self):
        """
        A newly created L{resource.Resource} lists no entities.
        """
        root = resource.Resource()
        entities = root.listEntities()
        self.assertEqual([], entities)
198 class SimpleResource(resource.Resource):
200 @ivar _contentType: C{None} or a C{str} giving the value of the
201 I{Content-Type} header in the response this resource will render. If it
202 is C{None}, no I{Content-Type} header will be set in the response.
204 def __init__(self, contentType=None):
205 resource.Resource.__init__(self)
206 self._contentType = contentType
209 def render(self, request):
210 if self._contentType is not None:
211 request.responseHeaders.setRawHeaders(
212 "content-type", [self._contentType])
214 if http.CACHED in (request.setLastModified(10),
215 request.setETag('MatchingTag')):
227 self.written = StringIO()
231 return IPv4Address("TCP", '192.168.1.1', 12344)
def write(self, bytes):
    """
    Append C{bytes} to the C{written} buffer after asserting that it is a
    C{str}.
    """
    assert isinstance(bytes, str)
    sink = self.written
    sink.write(bytes)
def writeSequence(self, iovec):
    """
    Write every element of C{iovec}, in order, by passing each one to
    C{self.write}.

    @param iovec: An iterable of the chunks to write.
    """
    # An explicit loop rather than map(): map() was being used only for its
    # side effects, and on Python 3 it is lazy, so nothing would be written.
    for chunk in iovec:
        self.write(chunk)
241 return IPv4Address("TCP", '10.0.0.1', self.port)
def registerProducer(self, producer, streaming):
    """
    Record the C{(producer, streaming)} pair in C{self.producers}.
    """
    registration = (producer, streaming)
    self.producers.append(registration)
def loseConnection(self):
    """
    Flag this transport as disconnected by setting C{disconnected} to
    C{True}.
    """
    self.disconnected = True
251 implements(interfaces.ISSLTransport)
253 site = server.Site(resource.Resource())
256 self.transport = self.TCP()
259 def requestDone(self, request):
class SiteTest(unittest.TestCase):
    """
    Tests for L{server.Site}.
    """
    def test_simplestSite(self):
        """
        For a request whose postpath is C{['']}, L{Site.getResourceFor}
        returns the child registered under C{""} on the root resource the
        site was built with.
        """
        root = SimpleResource()
        emptyChild = SimpleResource()
        root.putChild("", emptyChild)
        site = server.Site(root)
        request = DummyRequest([''])
        found = site.getResourceFor(request)
        self.assertIdentical(found, emptyChild, "Got the wrong resource.")
280 class SessionTest(unittest.TestCase):
282 Tests for L{server.Session}.
286 Create a site with one active session using a deterministic, easily
289 self.clock = task.Clock()
291 self.site = server.Site(resource.Resource())
292 self.session = server.Session(self.site, self.uid, self.clock)
293 self.site.sessions[self.uid] = self.session
def test_defaultReactor(self):
    """
    If no reactor is passed to L{server.Session.__init__}, the session
    uses the global reactor.
    """
    site = server.Site(resource.Resource())
    session = server.Session(site, '123')
    self.assertIdentical(session._reactor, reactor)
def test_startCheckingExpiration(self):
    """
    Once L{server.Session.startCheckingExpiration} has been called, the
    session is removed from its site's sessions after
    L{server.Session.sessionTimeout} seconds have passed on the clock.
    """
    session, clock = self.session, self.clock
    session.startCheckingExpiration()

    # One second short of the timeout the session must still be registered.
    clock.advance(session.sessionTimeout - 1)
    self.assertIn(self.uid, self.site.sessions)

    # The last second reaches the timeout and the session disappears.
    clock.advance(1)
    self.assertNotIn(self.uid, self.site.sessions)

    # Nothing may remain scheduled on the clock afterwards.
    self.assertFalse(clock.calls)
def test_expire(self):
    """
    Calling L{server.Session.expire} directly expires the session.
    """
    self.session.expire()
    registered = self.site.sessions
    # The uid is no longer a key of the site's session dictionary...
    self.assertNotIn(self.uid, registered)
    # ...and the clock has no delayed calls pending.
    self.assertFalse(self.clock.calls)
335 def test_expireWhileChecking(self):
337 L{server.Session.expire} expires the session even if the timeout call
340 self.session.startCheckingExpiration()
344 def test_notifyOnExpire(self):
346 A function registered with L{server.Session.notifyOnExpire} is called
347 when the session expires.
349 callbackRan = [False]
351 callbackRan[0] = True
352 self.session.notifyOnExpire(expired)
353 self.session.expire()
354 self.assertTrue(callbackRan[0])
357 def test_touch(self):
359 L{server.Session.touch} updates L{server.Session.lastModified} and
360 delays session timeout.
362 # Make sure it works before startCheckingExpiration
363 self.clock.advance(3)
365 self.assertEqual(self.session.lastModified, 3)
367 # And after startCheckingExpiration
368 self.session.startCheckingExpiration()
369 self.clock.advance(self.session.sessionTimeout - 1)
371 self.clock.advance(self.session.sessionTimeout - 1)
372 self.assertIn(self.uid, self.site.sessions)
374 # It should have advanced it by just sessionTimeout, no more.
375 self.clock.advance(1)
376 self.assertNotIn(self.uid, self.site.sessions)
379 def test_startCheckingExpirationParameterDeprecated(self):
381 L{server.Session.startCheckingExpiration} emits a deprecation warning
382 if it is invoked with a parameter.
384 self.session.startCheckingExpiration(123)
385 warnings = self.flushWarnings([
386 self.test_startCheckingExpirationParameterDeprecated])
387 self.assertEqual(len(warnings), 1)
388 self.assertEqual(warnings[0]['category'], DeprecationWarning)
390 warnings[0]['message'],
391 "The lifetime parameter to startCheckingExpiration is deprecated "
392 "since Twisted 9.0. See Session.sessionTimeout instead.")
395 def test_checkExpiredDeprecated(self):
397 L{server.Session.checkExpired} is deprecated.
399 self.session.checkExpired()
400 warnings = self.flushWarnings([self.test_checkExpiredDeprecated])
401 self.assertEqual(warnings[0]['category'], DeprecationWarning)
403 warnings[0]['message'],
404 "Session.checkExpired is deprecated since Twisted 9.0; sessions "
405 "check themselves now, you don't need to.")
406 self.assertEqual(len(warnings), 1)
409 # Conditional requests:
410 # If-None-Match, If-Modified-Since
412 # make conditional request:
413 # normal response if condition succeeds
414 # if condition fails:
419 return whole.split('\r\n\r\n', 1)[1]
421 def httpHeader(whole, key):
423 headers = whole.split('\r\n\r\n', 1)[0]
424 for header in headers.split('\r\n'):
425 if header.lower().startswith(key):
426 return header.split(':', 1)[1].strip()
430 l1 = whole.split('\r\n', 1)[0]
431 return int(l1.split()[1])
433 class ConditionalTest(unittest.TestCase):
435 web.server's handling of conditional requests for cache validation.
438 self.resrc = SimpleResource()
439 self.resrc.putChild('', self.resrc)
440 self.resrc.putChild('with-content-type', SimpleResource('image/jpeg'))
441 self.site = server.Site(self.resrc)
442 self.site.logFile = log.logfile
444 # HELLLLLLLLLLP! This harness is Very Ugly.
445 self.channel = self.site.buildProtocol(None)
446 self.transport = http.StringTransport()
447 self.transport.close = lambda *a, **kw: None
448 self.transport.disconnecting = lambda *a, **kw: 0
449 self.transport.getPeer = lambda *a, **kw: "peer"
450 self.transport.getHost = lambda *a, **kw: "host"
451 self.channel.makeConnection(self.transport)
455 self.channel.connectionLost(None)
def _modifiedTest(self, modifiedSince=None, etag=None):
    """
    Given the value C{modifiedSince} for the I{If-Modified-Since} header or
    the value C{etag} for the I{If-None-Match} header, verify that a
    response with a 200 code, a default Content-Type, and the resource as
    the body is returned.

    @param modifiedSince: The I{If-Modified-Since} value to send.  When it
        is not C{None} it is used and C{etag} is ignored.
    @param etag: The I{If-None-Match} value to send when C{modifiedSince}
        is C{None}.
    """
    if modifiedSince is not None:
        validator = "If-Modified-Since: " + modifiedSince
    else:
        # The conditional header for entity tags is If-None-Match.  This
        # used to send "If-Not-Match", which is not an HTTP header, so the
        # ETag comparison was never actually exercised.
        validator = "If-None-Match: " + etag
    for line in ["GET / HTTP/1.1", validator, ""]:
        self.channel.lineReceived(line)
    result = self.transport.getvalue()
    self.assertEqual(httpCode(result), http.OK)
    self.assertEqual(httpBody(result), "correct")
    self.assertEqual(httpHeader(result, "Content-Type"), "text/html")
def test_modified(self):
    """
    A request carrying an I{If-Modified-Since} timestamp that is earlier
    than the resource's last modification gets a 200 response whose body
    is the resource.
    """
    earlier = http.datetimeToString(1)
    self._modifiedTest(modifiedSince=earlier)
def test_unmodified(self):
    """
    A request carrying an I{If-Modified-Since} timestamp that is later than
    the resource's last modification gets a 304 response with an empty body
    and, since the application did not set one, no Content-Type header.
    """
    requestLines = [
        "GET / HTTP/1.1",
        "If-Modified-Since: " + http.datetimeToString(100),
        "",
    ]
    for requestLine in requestLines:
        self.channel.lineReceived(requestLine)
    response = self.transport.getvalue()
    self.assertEqual(httpCode(response), http.NOT_MODIFIED)
    self.assertEqual(httpBody(response), "")
    # RFC 2616, section 10.3.5 says a 304 SHOULD NOT carry entity-headers,
    # so Content-Type is absent unless the application sets it explicitly.
    self.assertEqual(httpHeader(response, "Content-Type"), None)
def test_invalidTimestamp(self):
    """
    An I{If-Modified-Since} value that cannot be parsed at all is treated
    as if the header were absent: the normal 200 response with the resource
    as its body is returned.
    """
    unparseable = "like, maybe a week ago, I guess?"
    self._modifiedTest(modifiedSince=unparseable)
def test_invalidTimestampYear(self):
    """
    An I{If-Modified-Since} value whose year field is not an integer is
    treated as if the header were absent: the normal 200 response with the
    resource as its body is returned.
    """
    badYear = "Thu, 01 Jan blah 00:00:10 GMT"
    self._modifiedTest(modifiedSince=badYear)
def test_invalidTimestampTooLongAgo(self):
    """
    An I{If-Modified-Since} value whose year precedes the epoch is treated
    as if the header were absent: the normal 200 response with the resource
    as its body is returned.
    """
    beforeEpoch = "Thu, 01 Jan 1899 00:00:10 GMT"
    self._modifiedTest(modifiedSince=beforeEpoch)
def test_invalidTimestampMonth(self):
    """
    An I{If-Modified-Since} value whose month field is not a recognized
    month abbreviation is treated as if the header were absent: the normal
    200 response with the resource as its body is returned.
    """
    badMonth = "Thu, 01 Blah 1970 00:00:10 GMT"
    self._modifiedTest(modifiedSince=badMonth)
def test_etagMatchedNot(self):
    """
    A conditional request whose ETag differs from the current ETag of the
    requested resource gets the normal 200 response with the resource as
    its body.
    """
    otherTag = "unmatchedTag"
    self._modifiedTest(etag=otherTag)
def test_etagMatched(self):
    """
    A request whose I{If-None-Match} ETag equals the current ETag of the
    requested resource gets a 304 response with an empty body.
    """
    requestLines = ["GET / HTTP/1.1", "If-None-Match: MatchingTag", ""]
    for requestLine in requestLines:
        self.channel.lineReceived(requestLine)
    response = self.transport.getvalue()
    self.assertEqual(httpHeader(response, "ETag"), "MatchingTag")
    self.assertEqual(httpCode(response), http.NOT_MODIFIED)
    self.assertEqual(httpBody(response), "")
def test_unmodifiedWithContentType(self):
    """
    Like L{test_etagMatched}, but for a resource which explicitly sets a
    I{Content-Type}: the 304 response then carries that header.

    RFC 2616, section 10.3.5 says this I{Content-Type} header SHOULD NOT be
    present; it appears only because the application set it explicitly.
    """
    requestLines = [
        "GET /with-content-type HTTP/1.1",
        "If-None-Match: MatchingTag",
        "",
    ]
    for requestLine in requestLines:
        self.channel.lineReceived(requestLine)
    response = self.transport.getvalue()
    self.assertEqual(httpCode(response), http.NOT_MODIFIED)
    self.assertEqual(httpBody(response), "")
    self.assertEqual(httpHeader(response, "Content-Type"), "image/jpeg")
591 from twisted.web import google
592 class GoogleTestCase(unittest.TestCase):
593 def testCheckGoogle(self):
594 raise unittest.SkipTest("no violation of google ToS")
595 d = google.checkGoogle('site:www.twistedmatrix.com twisted')
596 d.addCallback(self.assertEqual, 'http://twistedmatrix.com/')
def test_deprecated(self):
    """
    Importing the google module emits exactly one L{DeprecationWarning}
    (the module is deprecated since Twisted 11.1.0).
    """
    # The imported name is unused; the import statement is kept in exactly
    # this form because it is the only thing here that can emit the warning.
    from twisted.web import google
    emitted = self.flushWarnings(offendingFunctions=[self.test_deprecated])
    self.assertEqual(len(emitted), 1)
    firstWarning = emitted[0]
    self.assertEqual(firstWarning['category'], DeprecationWarning)
611 class RequestTests(unittest.TestCase):
613 Tests for the HTTP request class, L{server.Request}.
616 def test_interface(self):
618 L{server.Request} instances provide L{iweb.IRequest}.
621 verifyObject(iweb.IRequest, server.Request(DummyChannel(), True)))
624 def testChildLink(self):
625 request = server.Request(DummyChannel(), 1)
627 request.requestReceived('GET', '/foo/bar', 'HTTP/1.0')
628 self.assertEqual(request.childLink('baz'), 'bar/baz')
629 request = server.Request(DummyChannel(), 1)
631 request.requestReceived('GET', '/foo/bar/', 'HTTP/1.0')
632 self.assertEqual(request.childLink('baz'), 'baz')
634 def testPrePathURLSimple(self):
635 request = server.Request(DummyChannel(), 1)
637 request.requestReceived('GET', '/foo/bar', 'HTTP/1.0')
638 request.setHost('example.com', 80)
639 self.assertEqual(request.prePathURL(), 'http://example.com/foo/bar')
641 def testPrePathURLNonDefault(self):
643 d.transport.port = 81
644 request = server.Request(d, 1)
645 request.setHost('example.com', 81)
647 request.requestReceived('GET', '/foo/bar', 'HTTP/1.0')
648 self.assertEqual(request.prePathURL(), 'http://example.com:81/foo/bar')
650 def testPrePathURLSSLPort(self):
652 d.transport.port = 443
653 request = server.Request(d, 1)
654 request.setHost('example.com', 443)
656 request.requestReceived('GET', '/foo/bar', 'HTTP/1.0')
657 self.assertEqual(request.prePathURL(), 'http://example.com:443/foo/bar')
659 def testPrePathURLSSLPortAndSSL(self):
661 d.transport = DummyChannel.SSL()
662 d.transport.port = 443
663 request = server.Request(d, 1)
664 request.setHost('example.com', 443)
666 request.requestReceived('GET', '/foo/bar', 'HTTP/1.0')
667 self.assertEqual(request.prePathURL(), 'https://example.com/foo/bar')
669 def testPrePathURLHTTPPortAndSSL(self):
671 d.transport = DummyChannel.SSL()
672 d.transport.port = 80
673 request = server.Request(d, 1)
674 request.setHost('example.com', 80)
676 request.requestReceived('GET', '/foo/bar', 'HTTP/1.0')
677 self.assertEqual(request.prePathURL(), 'https://example.com:80/foo/bar')
679 def testPrePathURLSSLNonDefault(self):
681 d.transport = DummyChannel.SSL()
682 d.transport.port = 81
683 request = server.Request(d, 1)
684 request.setHost('example.com', 81)
686 request.requestReceived('GET', '/foo/bar', 'HTTP/1.0')
687 self.assertEqual(request.prePathURL(), 'https://example.com:81/foo/bar')
689 def testPrePathURLSetSSLHost(self):
691 d.transport.port = 81
692 request = server.Request(d, 1)
693 request.setHost('foo.com', 81, 1)
695 request.requestReceived('GET', '/foo/bar', 'HTTP/1.0')
696 self.assertEqual(request.prePathURL(), 'https://foo.com:81/foo/bar')
699 def test_prePathURLQuoting(self):
701 L{Request.prePathURL} quotes special characters in the URL segments to
702 preserve the original meaning.
705 request = server.Request(d, 1)
706 request.setHost('example.com', 80)
708 request.requestReceived('GET', '/foo%2Fbar', 'HTTP/1.0')
709 self.assertEqual(request.prePathURL(), 'http://example.com/foo%2Fbar')
713 class RootResource(resource.Resource):
715 def getChildWithDefault(self, name, request):
716 request.rememberRootURL()
717 return resource.Resource.getChildWithDefault(self, name, request)
718 def render(self, request):
721 class RememberURLTest(unittest.TestCase):
722 def createServer(self, r):
723 chan = DummyChannel()
724 chan.site = server.Site(r)
727 def testSimple(self):
728 r = resource.Resource()
731 r.putChild('foo', rr)
733 rr.putChild('bar', resource.Resource())
734 chan = self.createServer(r)
735 for url in ['/foo/', '/foo/bar', '/foo/bar/baz', '/foo/bar/']:
736 request = server.Request(chan, 1)
737 request.setHost('example.com', 81)
739 request.requestReceived('GET', url, 'HTTP/1.0')
740 self.assertEqual(request.getRootURL(), "http://example.com/foo")
745 rr.putChild('bar', resource.Resource())
746 chan = self.createServer(rr)
747 for url in ['/', '/bar', '/bar/baz', '/bar/']:
748 request = server.Request(chan, 1)
749 request.setHost('example.com', 81)
751 request.requestReceived('GET', url, 'HTTP/1.0')
752 self.assertEqual(request.getRootURL(), "http://example.com/")
755 class NewRenderResource(resource.Resource):
756 def render_GET(self, request):
759 def render_HEH(self, request):
class HeadlessResource(object):
    """
    A resource whose C{render} accepts GET only; in particular it does not
    accept HEAD.

    @ivar request: The request most recently passed to C{render}.
    """
    implements(resource.IResource)

    allowedMethods = ["GET"]

    def render(self, request):
        """
        Write some data to C{request} and leave it open for future writes.

        @raise error.UnsupportedMethod: If the request method is not one of
            C{allowedMethods}.
        @return: L{server.NOT_DONE_YET}
        """
        # The request is remembered before the method check, so it is kept
        # even when the method turns out to be unsupported.
        self.request = request
        supported = self.allowedMethods
        if request.method not in supported:
            raise error.UnsupportedMethod(supported)
        request.write("some data")
        return server.NOT_DONE_YET
785 class NewRenderTestCase(unittest.TestCase):
787 Tests for L{server.Request.render}.
789 def _getReq(self, resource=None):
791 Create a request object with a stub channel and install the
792 passed resource at /newrender. If no resource is passed,
797 resource = NewRenderResource()
798 d.site.resource.putChild('newrender', resource)
799 d.transport.port = 81
800 request = server.Request(d, 1)
801 request.setHost('example.com', 81)
805 def testGoodMethods(self):
807 req.requestReceived('GET', '/newrender', 'HTTP/1.0')
808 self.assertEqual(req.transport.getvalue().splitlines()[-1], 'hi hi')
811 req.requestReceived('HEH', '/newrender', 'HTTP/1.0')
812 self.assertEqual(req.transport.getvalue().splitlines()[-1], 'ho ho')
814 def testBadMethods(self):
816 req.requestReceived('CONNECT', '/newrender', 'HTTP/1.0')
817 self.assertEqual(req.code, 501)
820 req.requestReceived('hlalauguG', '/newrender', 'HTTP/1.0')
821 self.assertEqual(req.code, 501)
823 def testImplicitHead(self):
825 req.requestReceived('HEAD', '/newrender', 'HTTP/1.0')
826 self.assertEqual(req.code, 200)
827 self.assertEqual(-1, req.transport.getvalue().find('hi hi'))
def test_unsupportedHead(self):
    """
    A HEAD request for a resource that only claims support for GET is
    answered with a 200 response that has no body.
    """
    headless = HeadlessResource()
    req = self._getReq(headless)
    req.requestReceived("HEAD", "/newrender", "HTTP/1.0")
    response = req.transport.getvalue()
    # Unpacking into exactly two names also requires the header/body
    # separator to occur exactly once in the response.
    headers, body = response.split('\r\n\r\n')
    self.assertEqual(req.code, 200)
    self.assertEqual(body, '')
844 class GettableResource(resource.Resource):
846 Used by AllowedMethodsTest to simulate an allowed method.
848 def render_GET(self):
851 def render_fred_render_ethel(self):
853 The unusual method name is designed to test the culling method
854 in C{twisted.web.resource._computeAllowedMethods}.
860 class AllowedMethodsTest(unittest.TestCase):
C{twisted.web.resource._computeAllowedMethods} is provided by a
863 default should the subclass not provide the method.
869 Generate a dummy request for use by C{_computeAllowedMethod} tests.
872 d.site.resource.putChild('gettableresource', GettableResource())
873 d.transport.port = 81
874 request = server.Request(d, 1)
875 request.setHost('example.com', 81)
def test_computeAllowedMethods(self):
    """
    C{_computeAllowedMethods} reports one method name for each
    'render_{method}' attribute of the resource ('render_GET' yields 'GET',
    for example).  'HEAD' is expected as well, although
    L{GettableResource} does not define it itself.
    """
    gettable = GettableResource()
    expected = set(['GET', 'HEAD', 'fred_render_ethel'])
    computed = resource._computeAllowedMethods(gettable)
    self.assertEqual(set(computed), expected)
894 def test_notAllowed(self):
896 When an unsupported method is requested, the default
897 L{_computeAllowedMethods} method will be called to determine the
898 allowed methods, and the HTTP 405 'Method Not Allowed' status will
be returned, with the allowed methods listed in the
903 req.requestReceived('POST', '/gettableresource', 'HTTP/1.0')
904 self.assertEqual(req.code, 405)
906 set(req.responseHeaders.getRawHeaders('allow')[0].split(", ")),
907 set(['GET', 'HEAD','fred_render_ethel'])
911 def test_notAllowedQuoting(self):
913 When an unsupported method response is generated, an HTML message will
914 be displayed. That message should include a quoted form of the URI and,
915 since that value come from a browser and shouldn't necessarily be
919 req.requestReceived('POST', '/gettableresource?'
920 'value=<script>bad', 'HTTP/1.0')
921 self.assertEqual(req.code, 405)
922 renderedPage = req.transport.getvalue()
923 self.assertNotIn("<script>bad", renderedPage)
924 self.assertIn('<script>bad', renderedPage)
927 def test_notImplementedQuoting(self):
When a not-implemented method response is generated, an HTML message
930 will be displayed. That message should include a quoted form of the
931 requested method, since that value come from a browser and shouldn't
932 necessarily be trusted.
935 req.requestReceived('<style>bad', '/gettableresource', 'HTTP/1.0')
936 self.assertEqual(req.code, 501)
937 renderedPage = req.transport.getvalue()
938 self.assertNotIn("<style>bad", renderedPage)
939 self.assertIn('<style>bad', renderedPage)
class SDResource(resource.Resource):
    """
    A resource which answers every child lookup through a
    L{util.DeferredResource} wrapping an already-fired L{Deferred}.

    @ivar default: The object the L{Deferred} is fired with.
    """
    def __init__(self, default):
        # Note: resource.Resource.__init__ is not called here.
        self.default = default


    def getChildWithDefault(self, name, request):
        """
        Wrap C{self.default} in a L{util.DeferredResource} and hand the
        child lookup over to it.
        """
        fired = defer.succeed(self.default)
        deferredResource = util.DeferredResource(fired)
        return deferredResource.getChildWithDefault(name, request)
955 class DeferredResourceTests(unittest.TestCase):
957 Tests for L{DeferredResource}.
960 def testDeferredResource(self):
961 r = resource.Resource()
964 d = DummyRequest(['foo', 'bar', 'baz'])
965 resource.getChildForRequest(s, d)
966 self.assertEqual(d.postpath, ['bar', 'baz'])
969 def test_render(self):
971 L{DeferredResource} uses the request object's C{render} method to
972 render the resource which is the result of the L{Deferred} being
976 request = DummyRequest([])
977 request.render = rendered.append
979 result = resource.Resource()
980 deferredResource = util.DeferredResource(defer.succeed(result))
981 deferredResource.render(request)
982 self.assertEqual(rendered, [result])
986 class DummyRequestForLogTest(DummyRequest):
987 uri = '/dummy' # parent class uri has "http://", which doesn't really happen
990 clientproto = 'HTTP/1.0'
992 client = IPv4Address('TCP', '1.2.3.4', 12345)
996 class TestLogEscaping(unittest.TestCase):
998 self.site = http.HTTPFactory()
999 self.site.logFile = StringIO()
1000 self.request = DummyRequestForLogTest(self.site, False)
1002 def testSimple(self):
1003 self.site._logDateTime = "[%02d/%3s/%4d:%02d:%02d:%02d +0000]" % (
1004 25, 'Oct', 2004, 12, 31, 59)
1005 self.site.log(self.request)
1006 self.site.logFile.seek(0)
1008 self.site.logFile.read(),
1009 '1.2.3.4 - - [25/Oct/2004:12:31:59 +0000] "GET /dummy HTTP/1.0" 123 - "-" "-"\n')
1011 def testMethodQuote(self):
1012 self.site._logDateTime = "[%02d/%3s/%4d:%02d:%02d:%02d +0000]" % (
1013 25, 'Oct', 2004, 12, 31, 59)
1014 self.request.method = 'G"T'
1015 self.site.log(self.request)
1016 self.site.logFile.seek(0)
1018 self.site.logFile.read(),
1019 '1.2.3.4 - - [25/Oct/2004:12:31:59 +0000] "G\\"T /dummy HTTP/1.0" 123 - "-" "-"\n')
1021 def testRequestQuote(self):
1022 self.site._logDateTime = "[%02d/%3s/%4d:%02d:%02d:%02d +0000]" % (
1023 25, 'Oct', 2004, 12, 31, 59)
1024 self.request.uri='/dummy"withquote'
1025 self.site.log(self.request)
1026 self.site.logFile.seek(0)
1028 self.site.logFile.read(),
1029 '1.2.3.4 - - [25/Oct/2004:12:31:59 +0000] "GET /dummy\\"withquote HTTP/1.0" 123 - "-" "-"\n')
1031 def testProtoQuote(self):
1032 self.site._logDateTime = "[%02d/%3s/%4d:%02d:%02d:%02d +0000]" % (
1033 25, 'Oct', 2004, 12, 31, 59)
1034 self.request.clientproto='HT"P/1.0'
1035 self.site.log(self.request)
1036 self.site.logFile.seek(0)
1038 self.site.logFile.read(),
1039 '1.2.3.4 - - [25/Oct/2004:12:31:59 +0000] "GET /dummy HT\\"P/1.0" 123 - "-" "-"\n')
1041 def testRefererQuote(self):
1042 self.site._logDateTime = "[%02d/%3s/%4d:%02d:%02d:%02d +0000]" % (
1043 25, 'Oct', 2004, 12, 31, 59)
1044 self.request.headers['referer'] = 'http://malicious" ".website.invalid'
1045 self.site.log(self.request)
1046 self.site.logFile.seek(0)
1048 self.site.logFile.read(),
1049 '1.2.3.4 - - [25/Oct/2004:12:31:59 +0000] "GET /dummy HTTP/1.0" 123 - "http://malicious\\" \\".website.invalid" "-"\n')
1051 def testUserAgentQuote(self):
1052 self.site._logDateTime = "[%02d/%3s/%4d:%02d:%02d:%02d +0000]" % (
1053 25, 'Oct', 2004, 12, 31, 59)
1054 self.request.headers['user-agent'] = 'Malicious Web" Evil'
1055 self.site.log(self.request)
1056 self.site.logFile.seek(0)
1058 self.site.logFile.read(),
1059 '1.2.3.4 - - [25/Oct/2004:12:31:59 +0000] "GET /dummy HTTP/1.0" 123 - "-" "Malicious Web\\" Evil"\n')
1063 class ServerAttributesTestCase(unittest.TestCase):
1065 Tests that deprecated twisted.web.server attributes raise the appropriate
1066 deprecation warnings when used.
1069 def test_deprecatedAttributeDateTimeString(self):
1071 twisted.web.server.date_time_string should not be used; instead use
1072 twisted.web.http.datetimeToString directly
1074 deprecated_func = server.date_time_string
1075 warnings = self.flushWarnings(
1076 offendingFunctions=[self.test_deprecatedAttributeDateTimeString])
1078 self.assertEqual(len(warnings), 1)
1079 self.assertEqual(warnings[0]['category'], DeprecationWarning)
1081 warnings[0]['message'],
1082 ("twisted.web.server.date_time_string was deprecated in Twisted "
1083 "12.1.0: Please use twisted.web.http.datetimeToString instead"))
1086 def test_deprecatedAttributeStringDateTime(self):
1088 twisted.web.server.string_date_time should not be used; instead use
1089 twisted.web.http.stringToDatetime directly
1091 deprecated_func = server.string_date_time
1092 warnings = self.flushWarnings(
1093 offendingFunctions=[self.test_deprecatedAttributeStringDateTime])
1095 self.assertEqual(len(warnings), 1)
1096 self.assertEqual(warnings[0]['category'], DeprecationWarning)
1098 warnings[0]['message'],
1099 ("twisted.web.server.string_date_time was deprecated in Twisted "
1100 "12.1.0: Please use twisted.web.http.stringToDatetime instead"))