Initial import to Tizen
[profile/ivi/python-twisted.git] / twisted / web / test / test_web.py
1 # Copyright (c) Twisted Matrix Laboratories.
2 # See LICENSE for details.
3
4 """
5 Tests for various parts of L{twisted.web}.
6 """
7
8 from cStringIO import StringIO
9
10 from zope.interface import implements
11 from zope.interface.verify import verifyObject
12
13 from twisted.trial import unittest
14 from twisted.internet import reactor
15 from twisted.internet.address import IPv4Address
16 from twisted.internet.defer import Deferred
17 from twisted.web import server, resource, util
18 from twisted.internet import defer, interfaces, task
19 from twisted.web import iweb, http, http_headers, error
20 from twisted.python import log
21
22
class DummyRequest:
    """
    A minimal stand-in for a real request object, for use in tests.

    @ivar _finishedDeferreds: Either C{None} or a C{list} of L{Deferred}s.
        They are fired with C{None} by C{finish} or with the failure given to
        C{processingFailed}, whichever happens first.

    @type headers: C{dict}
    @ivar headers: Maps each request header name to its value.

    @type outgoingHeaders: C{dict}
    @ivar outgoingHeaders: Maps each response header name to its value.

    @type responseCode: C{int}
    @ivar responseCode: The code most recently given to C{setResponseCode}.

    @type written: C{list} of C{str}
    @ivar written: Every chunk passed to C{write}, in the order received.
    """
    uri = 'http://dummy/'
    method = 'GET'
    client = None

    def __init__(self, postpath, session=None):
        """
        @param postpath: The list of path segments still to be traversed.
        @param session: The session C{getSession} should hand out.  When it
            is false, a fresh L{server.Session} is created instead.
        """
        self.sitepath = []
        self.written = []
        self.finished = 0
        self.postpath = postpath
        self.prepath = []
        self.session = None
        if not session:
            session = server.Session(0, self)
        self.protoSession = session
        self.args = {}
        self.outgoingHeaders = {}
        self.responseHeaders = http_headers.Headers()
        self.responseCode = None
        self.headers = {}
        self._finishedDeferreds = []


    def registerProducer(self, prod, s):
        """
        Drive C{prod} synchronously: keep calling its C{resumeProducing} until
        C{unregisterProducer} clears the C{go} flag.
        """
        self.go = 1
        while self.go:
            prod.resumeProducing()


    def unregisterProducer(self):
        """
        Clear the C{go} flag so the loop in C{registerProducer} stops.
        """
        self.go = 0


    def getHeader(self, name):
        """
        Look up a request header.

        @type name: C{str}
        @param name: The header to look up.  It is lower-cased before the
            lookup in C{self.headers}.

        @rtype: C{str} or L{NoneType}
        @return: The stored value, or C{None} when there is no such header.
        """
        lowered = name.lower()
        return self.headers.get(lowered, None)


    def setHeader(self, name, value):
        """
        Record a response header under its lower-cased name.

        TODO: make this assert on write() if the header is content-length
        """
        lowered = name.lower()
        self.outgoingHeaders[lowered] = value


    def getSession(self):
        """
        Return the session for this request, adopting C{protoSession} on the
        first call.  Asking for it after data has been written is an error.
        """
        if not self.session:
            assert not self.written, "Session cannot be requested after data has been written."
            self.session = self.protoSession
        return self.session


    def render(self, resource):
        """
        Render C{resource} as the response to this request.

        Only the most common resource behaviors are covered: a render method
        returning either a string or C{NOT_DONE_YET}.  Nothing here knows
        about request method semantics (eg HEAD) or about setting particular
        headers.  It is largely broken but good enough for some tests, and it
        should B{not} grow to mirror everything L{Request} does; rather
        L{DummyRequest} should be phased out in favor of L{Request} (or other
        real code factored differently).
        """
        result = resource.render(self)
        if result is not server.NOT_DONE_YET:
            self.write(result)
            self.finish()


    def write(self, data):
        """
        Remember C{data} as the next chunk of the response body.
        """
        self.written.append(data)


    def notifyFinish(self):
        """
        Return a new L{Deferred} that fires with C{None} once the request is
        finished.  This will probably only work if C{finish} has not been
        called yet.
        """
        observer = Deferred()
        self._finishedDeferreds.append(observer)
        return observer


    def finish(self):
        """
        Count the finish and call back any L{Deferred}s waiting for
        notification of it.
        """
        self.finished += 1
        # Detach the list before firing anything, so observers are notified
        # at most once.
        observers, self._finishedDeferreds = self._finishedDeferreds, None
        if observers is not None:
            for observer in observers:
                observer.callback(None)


    def processingFailed(self, reason):
        """
        Errback any L{Deferred}s waiting for finish notification with
        C{reason}.
        """
        observers, self._finishedDeferreds = self._finishedDeferreds, None
        if observers is not None:
            for observer in observers:
                observer.errback(reason)


    def addArg(self, name, value):
        """
        Store C{value} as the only value of the request argument C{name}.
        """
        self.args[name] = [value]


    def setResponseCode(self, code, message=None):
        """
        Record the HTTP status code and message, insisting that this happens
        before any data is written.
        """
        assert not self.written, "Response code cannot be set after data has been written: %s." % "@@@@".join(self.written)
        self.responseCode = code
        self.responseMessage = message


    def setLastModified(self, when):
        """
        Only check that nothing has been written yet; C{when} is not stored.
        """
        assert not self.written, "Last-Modified cannot be set after data has been written: %s." % "@@@@".join(self.written)


    def setETag(self, tag):
        """
        Only check that nothing has been written yet; C{tag} is not stored.
        """
        assert not self.written, "ETag cannot be set after data has been written: %s." % "@@@@".join(self.written)


    def getClientIP(self):
        """
        Return the host of C{self.client} when it is an L{IPv4Address},
        otherwise C{None}.
        """
        client = self.client
        if not isinstance(client, IPv4Address):
            return None
        return client.host
190
191
class ResourceTestCase(unittest.TestCase):
    """
    Tests for L{resource.Resource}.
    """
    def testListEntities(self):
        """
        A newly created L{resource.Resource} lists no entities.
        """
        entities = resource.Resource().listEntities()
        self.assertEqual([], entities)
196
197
class SimpleResource(resource.Resource):
    """
    A resource with a fixed last-modified time and ETag, used to exercise
    conditional request handling.

    @ivar _contentType: Either C{None} or a C{str} to use as the value of the
        I{Content-Type} response header.  With C{None}, this resource sets no
        I{Content-Type} header.
    """
    def __init__(self, contentType=None):
        resource.Resource.__init__(self)
        self._contentType = contentType


    def render(self, request):
        """
        Set the optional I{Content-Type}, then report a last-modified time of
        C{10} and an ETag of C{'MatchingTag'} to the request.

        @return: C{''} if either validator call returned L{http.CACHED},
            otherwise C{"correct"}.
        """
        contentType = self._contentType
        if contentType is not None:
            request.responseHeaders.setRawHeaders(
                "content-type", [contentType])

        # Both validators are always consulted, last-modified first.
        validators = (request.setLastModified(10),
                      request.setETag('MatchingTag'))
        if http.CACHED in validators:
            return ''
        return "correct"
219
220
class DummyChannel:
    """
    A stand-in for an HTTP channel, pairing a fake transport with a site.

    @ivar transport: The fake transport for this channel; a L{TCP} instance
        unless a test assigns something else.
    """
    class TCP:
        """
        A fake TCP transport which records what is written to it.

        @ivar written: A C{StringIO} accumulating everything written.
        @ivar producers: A C{list} of C{(producer, streaming)} tuples, one per
            C{registerProducer} call.
        """
        port = 80
        disconnected = False

        def __init__(self):
            self.written = StringIO()
            self.producers = []

        def getPeer(self):
            """
            Return a fixed peer address.
            """
            return IPv4Address("TCP", '192.168.1.1', 12344)

        def write(self, bytes):
            """
            Append C{bytes}, which must be a C{str}, to C{written}.
            """
            assert isinstance(bytes, str)
            self.written.write(bytes)

        def writeSequence(self, iovec):
            """
            Write each element of C{iovec} in order.
            """
            # An explicit loop, not map(): map() used only for side effects
            # does nothing at all wherever it returns a lazy iterator.
            for data in iovec:
                self.write(data)

        def getHost(self):
            """
            Return a fixed host address using this transport's C{port}.
            """
            return IPv4Address("TCP", '10.0.0.1', self.port)

        def registerProducer(self, producer, streaming):
            """
            Record the registration without driving the producer.
            """
            self.producers.append((producer, streaming))

        def loseConnection(self):
            """
            Record that the connection was dropped.
            """
            self.disconnected = True


    class SSL(TCP):
        """
        The same fake transport, additionally declared to provide
        L{interfaces.ISSLTransport}.
        """
        implements(interfaces.ISSLTransport)

    # Class attribute: every channel which does not assign its own C{site}
    # shares this single L{server.Site} and its root resource.
    site = server.Site(resource.Resource())

    def __init__(self):
        self.transport = self.TCP()


    def requestDone(self, request):
        """
        Do nothing when a request completes.
        """
        pass
261
262
263
class SiteTest(unittest.TestCase):
    """
    Tests for L{server.Site}.
    """
    def test_simplestSite(self):
        """
        For a request for I{/}, L{Site.getResourceFor} returns the C{""}
        child of the root resource the site was constructed with.
        """
        root = SimpleResource()
        emptyChild = SimpleResource()
        root.putChild("", emptyChild)
        site = server.Site(root)
        found = site.getResourceFor(DummyRequest(['']))
        self.assertIdentical(found, emptyChild, "Got the wrong resource.")
277
278
279
class SessionTest(unittest.TestCase):
    """
    Tests for L{server.Session}.
    """
    def setUp(self):
        """
        Create a site with one active session using a deterministic, easily
        controlled clock.
        """
        self.clock = task.Clock()
        self.uid = 'unique'
        self.site = server.Site(resource.Resource())
        self.session = server.Session(self.site, self.uid, self.clock)
        self.site.sessions[self.uid] = self.session


    def test_defaultReactor(self):
        """
        If no reactor is passed to L{server.Session.__init__}, the global
        reactor is used.
        """
        session = server.Session(server.Site(resource.Resource()), '123')
        self.assertIdentical(session._reactor, reactor)


    def test_startCheckingExpiration(self):
        """
        L{server.Session.startCheckingExpiration} causes the session to expire
        after L{server.Session.sessionTimeout} seconds without activity.
        """
        self.session.startCheckingExpiration()

        # Advance to almost the timeout - nothing should happen.
        self.clock.advance(self.session.sessionTimeout - 1)
        self.assertIn(self.uid, self.site.sessions)

        # Advance to the timeout, the session should expire.
        self.clock.advance(1)
        self.assertNotIn(self.uid, self.site.sessions)

        # There should be no calls left over, either.
        self.assertFalse(self.clock.calls)


    def test_expire(self):
        """
        L{server.Session.expire} expires the session.
        """
        self.session.expire()
        # It should be gone from the session dictionary.
        self.assertNotIn(self.uid, self.site.sessions)
        # And there should be no pending delayed calls.
        self.assertFalse(self.clock.calls)


    def test_expireWhileChecking(self):
        """
        L{server.Session.expire} expires the session even if the timeout call
        isn't due yet.
        """
        self.session.startCheckingExpiration()
        self.test_expire()


    def test_notifyOnExpire(self):
        """
        A function registered with L{server.Session.notifyOnExpire} is called
        when the session expires.
        """
        # A one-element list lets the nested function record the call without
        # needing to rebind a name in the enclosing scope.
        callbackRan = [False]
        def expired():
            callbackRan[0] = True
        self.session.notifyOnExpire(expired)
        self.session.expire()
        self.assertTrue(callbackRan[0])


    def test_touch(self):
        """
        L{server.Session.touch} updates L{server.Session.lastModified} and
        delays session timeout.
        """
        # Make sure it works before startCheckingExpiration
        self.clock.advance(3)
        self.session.touch()
        self.assertEqual(self.session.lastModified, 3)

        # And after startCheckingExpiration
        self.session.startCheckingExpiration()
        self.clock.advance(self.session.sessionTimeout - 1)
        self.session.touch()
        self.clock.advance(self.session.sessionTimeout - 1)
        self.assertIn(self.uid, self.site.sessions)

        # It should have advanced it by just sessionTimeout, no more.
        self.clock.advance(1)
        self.assertNotIn(self.uid, self.site.sessions)


    def test_startCheckingExpirationParameterDeprecated(self):
        """
        L{server.Session.startCheckingExpiration} emits a deprecation warning
        if it is invoked with a parameter.
        """
        self.session.startCheckingExpiration(123)
        warnings = self.flushWarnings([
                self.test_startCheckingExpirationParameterDeprecated])
        self.assertEqual(len(warnings), 1)
        self.assertEqual(warnings[0]['category'], DeprecationWarning)
        self.assertEqual(
            warnings[0]['message'],
            "The lifetime parameter to startCheckingExpiration is deprecated "
            "since Twisted 9.0.  See Session.sessionTimeout instead.")


    def test_checkExpiredDeprecated(self):
        """
        L{server.Session.checkExpired} is deprecated.
        """
        self.session.checkExpired()
        warnings = self.flushWarnings([self.test_checkExpiredDeprecated])
        # Check the count before indexing, so a missing warning is reported
        # as an assertion failure rather than as an IndexError.
        self.assertEqual(len(warnings), 1)
        self.assertEqual(warnings[0]['category'], DeprecationWarning)
        self.assertEqual(
            warnings[0]['message'],
            "Session.checkExpired is deprecated since Twisted 9.0; sessions "
            "check themselves now, you don't need to.")
407
408
409 # Conditional requests:
410 # If-None-Match, If-Modified-Since
411
412 # make conditional request:
413 #   normal response if condition succeeds
414 #   if condition fails:
415 #      response code
416 #      no body
417
def httpBody(whole):
    """
    Return the body of the raw HTTP response C{whole}: everything after the
    first blank line.
    """
    parts = whole.split('\r\n\r\n', 1)
    return parts[1]
420
def httpHeader(whole, key):
    """
    Find the value of a header in a raw HTTP response.

    @param whole: The complete response: status line, headers, a blank line
        and then the body.
    @param key: The name of the header to find.  It is compared
        case-insensitively against each complete header name; a trailing
        colon is tolerated.

    @return: The value of the first header with that name, stripped of
        surrounding whitespace, or C{None} if there is no such header.
    """
    key = key.strip().rstrip(':').lower()
    headers = whole.split('\r\n\r\n', 1)[0]
    for header in headers.split('\r\n'):
        # Compare the whole field name rather than a prefix of the line, so
        # that "Content-Type" cannot match "Content-Type-Options".  Lines
        # without a colon (the status line) are never header fields.
        name, colon, value = header.partition(':')
        if colon and name.strip().lower() == key:
            return value.strip()
    return None
428
def httpCode(whole):
    """
    Return, as an C{int}, the status code from the first line of the raw
    HTTP response C{whole}.
    """
    statusLine = whole.split('\r\n', 1)[0]
    fields = statusLine.split()
    return int(fields[1])
432
class ConditionalTest(unittest.TestCase):
    """
    web.server's handling of conditional requests for cache validation.
    """
    def setUp(self):
        """
        Build a site serving a L{SimpleResource} at I{/} and another with an
        explicit content type at I{/with-content-type}, then connect a
        channel for that site to a string transport.
        """
        self.resrc = SimpleResource()
        self.resrc.putChild('', self.resrc)
        self.resrc.putChild('with-content-type', SimpleResource('image/jpeg'))
        self.site = server.Site(self.resrc)
        self.site.logFile = log.logfile

        # HELLLLLLLLLLP!  This harness is Very Ugly.
        self.channel = self.site.buildProtocol(None)
        self.transport = http.StringTransport()
        self.transport.close = lambda *a, **kw: None
        self.transport.disconnecting = lambda *a, **kw: 0
        self.transport.getPeer = lambda *a, **kw: "peer"
        self.transport.getHost = lambda *a, **kw: "host"
        self.channel.makeConnection(self.transport)


    def tearDown(self):
        """
        Tell the channel its connection is gone.
        """
        self.channel.connectionLost(None)


    def _modifiedTest(self, modifiedSince=None, etag=None):
        """
        Given the value C{modifiedSince} for the I{If-Modified-Since} header or
        the value C{etag} for the I{If-None-Match} header, verify that a
        response with a 200 code, a default Content-Type, and the resource as
        the body is returned.
        """
        if modifiedSince is not None:
            validator = "If-Modified-Since: " + modifiedSince
        else:
            # The header must be If-None-Match; "If-Not-Match" is not an HTTP
            # header, so sending it would never exercise ETag comparison.
            validator = "If-None-Match: " + etag
        for line in ["GET / HTTP/1.1", validator, ""]:
            self.channel.lineReceived(line)
        result = self.transport.getvalue()
        self.assertEqual(httpCode(result), http.OK)
        self.assertEqual(httpBody(result), "correct")
        self.assertEqual(httpHeader(result, "Content-Type"), "text/html")


    def test_modified(self):
        """
        If a request is made with an I{If-Modified-Since} header value with
        a timestamp indicating a time before the last modification of the
        requested resource, a 200 response is returned along with a response
        body containing the resource.
        """
        self._modifiedTest(modifiedSince=http.datetimeToString(1))


    def test_unmodified(self):
        """
        If a request is made with an I{If-Modified-Since} header value with a
        timestamp indicating a time after the last modification of the request
        resource, a 304 response is returned along with an empty response body
        and no Content-Type header if the application does not set one.
        """
        for line in ["GET / HTTP/1.1",
                     "If-Modified-Since: " + http.datetimeToString(100), ""]:
            self.channel.lineReceived(line)
        result = self.transport.getvalue()
        self.assertEqual(httpCode(result), http.NOT_MODIFIED)
        self.assertEqual(httpBody(result), "")
        # Since there SHOULD NOT (RFC 2616, section 10.3.5) be any
        # entity-headers, the Content-Type is not set if the application does
        # not explicitly set it.
        self.assertEqual(httpHeader(result, "Content-Type"), None)


    def test_invalidTimestamp(self):
        """
        If a request is made with an I{If-Modified-Since} header value which
        cannot be parsed, the header is treated as not having been present
        and a normal 200 response is returned with a response body
        containing the resource.
        """
        self._modifiedTest(modifiedSince="like, maybe a week ago, I guess?")


    def test_invalidTimestampYear(self):
        """
        If a request is made with an I{If-Modified-Since} header value which
        contains a string in the year position which is not an integer, the
        header is treated as not having been present and a normal 200
        response is returned with a response body containing the resource.
        """
        self._modifiedTest(modifiedSince="Thu, 01 Jan blah 00:00:10 GMT")


    def test_invalidTimestampTooLongAgo(self):
        """
        If a request is made with an I{If-Modified-Since} header value which
        contains a year before the epoch, the header is treated as not
        having been present and a normal 200 response is returned with a
        response body containing the resource.
        """
        self._modifiedTest(modifiedSince="Thu, 01 Jan 1899 00:00:10 GMT")


    def test_invalidTimestampMonth(self):
        """
        If a request is made with an I{If-Modified-Since} header value which
        contains a string in the month position which is not a recognized
        month abbreviation, the header is treated as not having been present
        and a normal 200 response is returned with a response body
        containing the resource.
        """
        self._modifiedTest(modifiedSince="Thu, 01 Blah 1970 00:00:10 GMT")


    def test_etagMatchedNot(self):
        """
        If a request is made with an I{If-None-Match} ETag which does not match
        the current ETag of the requested resource, the header is treated as not
        having been present and a normal 200 response is returned with a
        response body containing the resource.
        """
        self._modifiedTest(etag="unmatchedTag")


    def test_etagMatched(self):
        """
        If a request is made with an I{If-None-Match} ETag which does match the
        current ETag of the requested resource, a 304 response is returned along
        with an empty response body.
        """
        for line in ["GET / HTTP/1.1", "If-None-Match: MatchingTag", ""]:
            self.channel.lineReceived(line)
        result = self.transport.getvalue()
        self.assertEqual(httpHeader(result, "ETag"), "MatchingTag")
        self.assertEqual(httpCode(result), http.NOT_MODIFIED)
        self.assertEqual(httpBody(result), "")


    def test_unmodifiedWithContentType(self):
        """
        Similar to L{test_etagMatched}, but the response should include a
        I{Content-Type} header if the application explicitly sets one.

        This I{Content-Type} header SHOULD NOT be present according to RFC 2616,
        section 10.3.5.  It will only be present if the application explicitly
        sets it.
        """
        for line in ["GET /with-content-type HTTP/1.1",
                     "If-None-Match: MatchingTag", ""]:
            self.channel.lineReceived(line)
        result = self.transport.getvalue()
        self.assertEqual(httpCode(result), http.NOT_MODIFIED)
        self.assertEqual(httpBody(result), "")
        self.assertEqual(httpHeader(result, "Content-Type"), "image/jpeg")
587
588
589
590
591 from twisted.web import google
class GoogleTestCase(unittest.TestCase):
    """
    Tests for the deprecated L{twisted.web.google} module.
    """
    def testCheckGoogle(self):
        """
        L{google.checkGoogle} finds the Twisted site.  This test is never
        run; see the C{skip} attribute set just below.
        """
        d = google.checkGoogle('site:www.twistedmatrix.com twisted')
        d.addCallback(self.assertEqual, 'http://twistedmatrix.com/')
        return d
    # Skip through trial's C{skip} attribute instead of raising SkipTest as
    # the first statement, which left the rest of the body unreachable.
    testCheckGoogle.skip = "no violation of google ToS"


    def test_deprecated(self):
        """
        Google module is deprecated since Twisted 11.1.0
        """
        # The import itself is what is expected to emit the warning.
        from twisted.web import google
        warnings = self.flushWarnings(offendingFunctions=[self.test_deprecated])
        self.assertEqual(len(warnings), 1)
        self.assertEqual(warnings[0]['category'], DeprecationWarning)
608
609
610
class RequestTests(unittest.TestCase):
    """
    Tests for the HTTP request class, L{server.Request}.
    """

    def test_interface(self):
        """
        L{server.Request} instances provide L{iweb.IRequest}.
        """
        self.assertTrue(
            verifyObject(iweb.IRequest, server.Request(DummyChannel(), True)))


    def testChildLink(self):
        """
        C{childLink('baz')} is C{'bar/baz'} for a request for I{/foo/bar} and
        C{'baz'} for a request for I{/foo/bar/}.
        """
        request = server.Request(DummyChannel(), 1)
        request.gotLength(0)
        request.requestReceived('GET', '/foo/bar', 'HTTP/1.0')
        self.assertEqual(request.childLink('baz'), 'bar/baz')
        request = server.Request(DummyChannel(), 1)
        request.gotLength(0)
        request.requestReceived('GET', '/foo/bar/', 'HTTP/1.0')
        self.assertEqual(request.childLink('baz'), 'baz')

    def testPrePathURLSimple(self):
        """
        With the host set to example.com on port 80, C{prePathURL} for
        I{/foo/bar} has no explicit port.
        """
        request = server.Request(DummyChannel(), 1)
        request.gotLength(0)
        request.requestReceived('GET', '/foo/bar', 'HTTP/1.0')
        request.setHost('example.com', 80)
        self.assertEqual(request.prePathURL(), 'http://example.com/foo/bar')

    def testPrePathURLNonDefault(self):
        """
        With the transport and host both on port 81, C{prePathURL} includes
        C{:81}.
        """
        d = DummyChannel()
        d.transport.port = 81
        request = server.Request(d, 1)
        request.setHost('example.com', 81)
        request.gotLength(0)
        request.requestReceived('GET', '/foo/bar', 'HTTP/1.0')
        self.assertEqual(request.prePathURL(), 'http://example.com:81/foo/bar')

    def testPrePathURLSSLPort(self):
        """
        Port 443 on the plain (non-SSL) transport still yields an C{http} URL
        with an explicit C{:443}.
        """
        d = DummyChannel()
        d.transport.port = 443
        request = server.Request(d, 1)
        request.setHost('example.com', 443)
        request.gotLength(0)
        request.requestReceived('GET', '/foo/bar', 'HTTP/1.0')
        self.assertEqual(request.prePathURL(), 'http://example.com:443/foo/bar')

    def testPrePathURLSSLPortAndSSL(self):
        """
        Port 443 on an SSL transport yields an C{https} URL with no explicit
        port.
        """
        d = DummyChannel()
        d.transport = DummyChannel.SSL()
        d.transport.port = 443
        request = server.Request(d, 1)
        request.setHost('example.com', 443)
        request.gotLength(0)
        request.requestReceived('GET', '/foo/bar', 'HTTP/1.0')
        self.assertEqual(request.prePathURL(), 'https://example.com/foo/bar')

    def testPrePathURLHTTPPortAndSSL(self):
        """
        Port 80 on an SSL transport yields an C{https} URL with an explicit
        C{:80}.
        """
        d = DummyChannel()
        d.transport = DummyChannel.SSL()
        d.transport.port = 80
        request = server.Request(d, 1)
        request.setHost('example.com', 80)
        request.gotLength(0)
        request.requestReceived('GET', '/foo/bar', 'HTTP/1.0')
        self.assertEqual(request.prePathURL(), 'https://example.com:80/foo/bar')

    def testPrePathURLSSLNonDefault(self):
        """
        Port 81 on an SSL transport yields an C{https} URL with an explicit
        C{:81}.
        """
        d = DummyChannel()
        d.transport = DummyChannel.SSL()
        d.transport.port = 81
        request = server.Request(d, 1)
        request.setHost('example.com', 81)
        request.gotLength(0)
        request.requestReceived('GET', '/foo/bar', 'HTTP/1.0')
        self.assertEqual(request.prePathURL(), 'https://example.com:81/foo/bar')

    def testPrePathURLSetSSLHost(self):
        """
        Passing C{1} as the third argument to C{setHost} yields an C{https}
        URL even though the transport is the plain one.
        """
        d = DummyChannel()
        d.transport.port = 81
        request = server.Request(d, 1)
        # NOTE(review): the third argument is presumably setHost's ssl flag;
        # confirm against the setHost signature.
        request.setHost('foo.com', 81, 1)
        request.gotLength(0)
        request.requestReceived('GET', '/foo/bar', 'HTTP/1.0')
        self.assertEqual(request.prePathURL(), 'https://foo.com:81/foo/bar')


    def test_prePathURLQuoting(self):
        """
        L{Request.prePathURL} quotes special characters in the URL segments to
        preserve the original meaning.
        """
        d = DummyChannel()
        request = server.Request(d, 1)
        request.setHost('example.com', 80)
        request.gotLength(0)
        request.requestReceived('GET', '/foo%2Fbar', 'HTTP/1.0')
        self.assertEqual(request.prePathURL(), 'http://example.com/foo%2Fbar')
710
711
712
class RootResource(resource.Resource):
    """
    A resource which asks each request passing through it to remember the
    root URL.
    """
    isLeaf = 0

    def getChildWithDefault(self, name, request):
        """
        Call C{request.rememberRootURL()}, then defer to the normal child
        lookup of L{resource.Resource}.
        """
        request.rememberRootURL()
        child = resource.Resource.getChildWithDefault(self, name, request)
        return child

    def render(self, request):
        """
        Render an empty body.
        """
        return ''
720
class RememberURLTest(unittest.TestCase):
    """
    Tests for the root URL a request reports after passing through a
    L{RootResource}.
    """
    def createServer(self, r):
        """
        Return a L{DummyChannel} whose site serves C{r} as its root.
        """
        channel = DummyChannel()
        channel.site = server.Site(r)
        return channel

    def _assertRootURL(self, chan, urls, expected):
        """
        Issue a GET for each of C{urls} on C{chan}, with the host set to
        example.com port 81, and check each request's C{getRootURL()} result
        against C{expected}.
        """
        for url in urls:
            request = server.Request(chan, 1)
            request.setHost('example.com', 81)
            request.gotLength(0)
            request.requestReceived('GET', url, 'HTTP/1.0')
            self.assertEqual(request.getRootURL(), expected)

    def testSimple(self):
        """
        With the L{RootResource} mounted at I{/foo}, the root URL is
        C{"http://example.com/foo"} for every URL beneath it.
        """
        top = resource.Resource()
        top.isLeaf = 0
        remembering = RootResource()
        top.putChild('foo', remembering)
        remembering.putChild('', remembering)
        remembering.putChild('bar', resource.Resource())
        self._assertRootURL(
            self.createServer(top),
            ['/foo/', '/foo/bar', '/foo/bar/baz', '/foo/bar/'],
            "http://example.com/foo")

    def testRoot(self):
        """
        With the L{RootResource} as the site root, the root URL is
        C{"http://example.com/"} for every URL.
        """
        remembering = RootResource()
        remembering.putChild('', remembering)
        remembering.putChild('bar', resource.Resource())
        self._assertRootURL(
            self.createServer(remembering),
            ['/', '/bar', '/bar/baz', '/bar/'],
            "http://example.com/")
753
754
class NewRenderResource(resource.Resource):
    """
    A resource with one renderer for GET and one for the made-up method HEH.
    """
    def render_GET(self, request):
        """
        Return the GET body.
        """
        return "hi hi"

    def render_HEH(self, request):
        """
        Return the HEH body.
        """
        return "ho ho"
761
762
763
class HeadlessResource(object):
    """
    A resource which supports GET but does not itself support HEAD.
    """
    implements(resource.IResource)

    allowedMethods = ["GET"]

    def render(self, request):
        """
        Remember the request, refuse any method outside C{allowedMethods},
        and otherwise write some data while leaving the request open for
        later writes.
        """
        self.request = request
        allowed = self.allowedMethods
        if request.method not in allowed:
            raise error.UnsupportedMethod(allowed)
        request.write("some data")
        return server.NOT_DONE_YET
781
782
783
784
class NewRenderTestCase(unittest.TestCase):
    """
    Tests for L{server.Request.render}.
    """
    def _getReq(self, resource=None):
        """
        Build a request on a stub channel, with C{resource} installed at
        /newrender.  A L{NewRenderResource} is created when none is given.
        """
        channel = DummyChannel()
        if resource is None:
            resource = NewRenderResource()
        channel.site.resource.putChild('newrender', resource)
        channel.transport.port = 81
        req = server.Request(channel, 1)
        req.setHost('example.com', 81)
        req.gotLength(0)
        return req

    def testGoodMethods(self):
        """
        Methods which have a renderer get that renderer's body as the last
        line of the response.
        """
        for method, lastLine in [('GET', 'hi hi'), ('HEH', 'ho ho')]:
            req = self._getReq()
            req.requestReceived(method, '/newrender', 'HTTP/1.0')
            written = req.transport.getvalue()
            self.assertEqual(written.splitlines()[-1], lastLine)

    def testBadMethods(self):
        """
        Methods without a renderer get a 501 response code.
        """
        for method in ['CONNECT', 'hlalauguG']:
            req = self._getReq()
            req.requestReceived(method, '/newrender', 'HTTP/1.0')
            self.assertEqual(req.code, 501)

    def testImplicitHead(self):
        """
        A HEAD request against a resource with a GET renderer succeeds with
        200 and the GET body is absent from the response.
        """
        req = self._getReq()
        req.requestReceived('HEAD', '/newrender', 'HTTP/1.0')
        self.assertEqual(req.code, 200)
        written = req.transport.getvalue()
        self.assertEqual(-1, written.find('hi hi'))


    def test_unsupportedHead(self):
        """
        A HEAD request against a resource which only claims support for GET
        gets a response without a body.
        """
        req = self._getReq(HeadlessResource())
        req.requestReceived("HEAD", "/newrender", "HTTP/1.0")
        # Unpacking into exactly two names also checks that the response
        # contains a single blank-line separator.
        _headers, body = req.transport.getvalue().split('\r\n\r\n')
        self.assertEqual(req.code, 200)
        self.assertEqual(body, '')
841
842
843
class GettableResource(resource.Resource):
    """
    A resource used by AllowedMethodsTest to stand for one allowed method.
    """
    def render_GET(self):
        """
        Do nothing; only the presence of this method matters.
        """
        return None

    def render_fred_render_ethel(self):
        """
        Do nothing.  The odd name exists to test the culling done by
        C{twisted.web.resource._computeAllowedMethods}.
        """
        return None
857
858
859
class AllowedMethodsTest(unittest.TestCase):
    """
    Tests for C{twisted.web.resource._computeAllowedMethods}, the default
    used when a resource subclass does not supply its own.
    """


    def _getReq(self):
        """
        Build a L{server.Request} on a L{DummyChannel} whose site root has a
        L{GettableResource} child named C{'gettableresource'}, for use by
        the C{_computeAllowedMethods} tests.
        """
        channel = DummyChannel()
        channel.site.resource.putChild(
            'gettableresource', GettableResource())
        channel.transport.port = 81
        request = server.Request(channel, 1)
        request.setHost('example.com', 81)
        request.gotLength(0)
        return request


    def test_computeAllowedMethods(self):
        """
        C{_computeAllowedMethods} searches a L{GettableResource} for
        attributes named C{render_<method>} (C{render_GET}, for example)
        and returns the method names.  C{'HEAD'} is always included, coming
        from the L{resource.Resource} superclass.
        """
        expected = set(['GET', 'HEAD', 'fred_render_ethel'])
        computed = resource._computeAllowedMethods(GettableResource())
        self.assertEqual(set(computed), expected)


    def test_notAllowed(self):
        """
        When an unsupported method is requested, the response has the HTTP
        405 'Method Not Allowed' status and its C{Allow} header lists the
        allowed methods, as determined by the default
        L{_computeAllowedMethods}.
        """
        request = self._getReq()
        request.requestReceived('POST', '/gettableresource', 'HTTP/1.0')
        self.assertEqual(request.code, 405)
        allowHeader = request.responseHeaders.getRawHeaders('allow')[0]
        self.assertEqual(
            set(allowHeader.split(", ")),
            set(['GET', 'HEAD', 'fred_render_ethel']))


    def test_notAllowedQuoting(self):
        """
        The HTML message generated for an unsupported-method response
        includes the request URI in quoted form, since that value comes
        from a browser and shouldn't necessarily be trusted.
        """
        request = self._getReq()
        request.requestReceived(
            'POST', '/gettableresource?value=<script>bad', 'HTTP/1.0')
        self.assertEqual(request.code, 405)
        page = request.transport.getvalue()
        self.assertNotIn("<script>bad", page)
        self.assertIn('&lt;script&gt;bad', page)


    def test_notImplementedQuoting(self):
        """
        The HTML message generated for a not-implemented-method response
        includes the requested method in quoted form, since that value
        comes from a browser and shouldn't necessarily be trusted.
        """
        request = self._getReq()
        request.requestReceived(
            '<style>bad', '/gettableresource', 'HTTP/1.0')
        self.assertEqual(request.code, 501)
        page = request.transport.getvalue()
        self.assertNotIn("<style>bad", page)
        self.assertIn('&lt;style&gt;bad', page)
940
941
942
class SDResource(resource.Resource):
    """
    A resource which answers every child lookup by delegating to a
    L{util.DeferredResource} wrapping an already-fired L{Deferred}.

    @ivar default: The object each such L{Deferred} is fired with.
    """
    def __init__(self, default):
        """
        @param default: The result used for the already-fired L{Deferred}
            created on each child lookup.
        """
        # Initialize the base class first so that whatever attributes it
        # sets up exist on this instance as well.
        resource.Resource.__init__(self)
        self.default = default


    def getChildWithDefault(self, name, request):
        """
        Wrap C{self.default} in a fired L{Deferred}, hand it to a new
        L{util.DeferredResource} and return the result of that object's
        C{getChildWithDefault} for the same C{name} and C{request}.
        """
        d = defer.succeed(self.default)
        # Named so as not to hide the imported C{resource} module.
        deferredResource = util.DeferredResource(d)
        return deferredResource.getChildWithDefault(name, request)
952
953
954
class DeferredResourceTests(unittest.TestCase):
    """
    Tests for L{DeferredResource}.
    """

    def testDeferredResource(self):
        """
        Calling L{resource.getChildForRequest} on an L{SDResource} consumes
        only the first segment of the request's C{postpath}.
        """
        leaf = resource.Resource()
        leaf.isLeaf = 1
        root = SDResource(leaf)
        request = DummyRequest(['foo', 'bar', 'baz'])
        resource.getChildForRequest(root, request)
        self.assertEqual(request.postpath, ['bar', 'baz'])


    def test_render(self):
        """
        L{DeferredResource} uses the request object's C{render} method to
        render the resource which is the result of the L{Deferred} being
        handled.
        """
        request = DummyRequest([])
        # Record every resource the request is asked to render.
        renderedResources = []
        request.render = renderedResources.append

        wrapped = resource.Resource()
        fired = defer.succeed(wrapped)
        util.DeferredResource(fired).render(request)
        self.assertEqual(renderedResources, [wrapped])
983
984
985
class DummyRequestForLogTest(DummyRequest):
    """
    A L{DummyRequest} with the additional class attributes set up for the
    log tests in L{TestLogEscaping}.
    """
    # The parent class uri has "http://", which doesn't really happen.
    uri = '/dummy'
    clientproto = 'HTTP/1.0'
    client = IPv4Address('TCP', '1.2.3.4', 12345)

    code = 123
    sentLength = None
993
994
995
class TestLogEscaping(unittest.TestCase):
    """
    Tests that L{http.HTTPFactory.log} backslash-escapes double quotes
    found in the request fields it writes to the log file.
    """

    def setUp(self):
        """
        Create a factory which logs to an in-memory file, and a request to
        be logged.
        """
        self.site = http.HTTPFactory()
        self.site.logFile = StringIO()
        self.request = DummyRequestForLogTest(self.site, False)


    def _assertLogged(self, expectedLine):
        """
        Pin the factory's log timestamp, log C{self.request} and check that
        the log file then contains exactly C{expectedLine}.
        """
        self.site._logDateTime = "[%02d/%3s/%4d:%02d:%02d:%02d +0000]" % (
            25, 'Oct', 2004, 12, 31, 59)
        self.site.log(self.request)
        logFile = self.site.logFile
        logFile.seek(0)
        self.assertEqual(logFile.read(), expectedLine)


    def testSimple(self):
        """
        A request containing no quotes is logged unchanged.
        """
        self._assertLogged(
            '1.2.3.4 - - [25/Oct/2004:12:31:59 +0000] '
            '"GET /dummy HTTP/1.0" 123 - "-" "-"\n')


    def testMethodQuote(self):
        """
        A double quote in the request method is escaped.
        """
        self.request.method = 'G"T'
        self._assertLogged(
            '1.2.3.4 - - [25/Oct/2004:12:31:59 +0000] '
            '"G\\"T /dummy HTTP/1.0" 123 - "-" "-"\n')


    def testRequestQuote(self):
        """
        A double quote in the request URI is escaped.
        """
        self.request.uri = '/dummy"withquote'
        self._assertLogged(
            '1.2.3.4 - - [25/Oct/2004:12:31:59 +0000] '
            '"GET /dummy\\"withquote HTTP/1.0" 123 - "-" "-"\n')


    def testProtoQuote(self):
        """
        A double quote in the client protocol is escaped.
        """
        self.request.clientproto = 'HT"P/1.0'
        self._assertLogged(
            '1.2.3.4 - - [25/Oct/2004:12:31:59 +0000] '
            '"GET /dummy HT\\"P/1.0" 123 - "-" "-"\n')


    def testRefererQuote(self):
        """
        Double quotes in the C{referer} header are escaped.
        """
        self.request.headers['referer'] = (
            'http://malicious" ".website.invalid')
        self._assertLogged(
            '1.2.3.4 - - [25/Oct/2004:12:31:59 +0000] '
            '"GET /dummy HTTP/1.0" 123 - '
            '"http://malicious\\" \\".website.invalid" "-"\n')


    def testUserAgentQuote(self):
        """
        A double quote in the C{user-agent} header is escaped.
        """
        self.request.headers['user-agent'] = 'Malicious Web" Evil'
        self._assertLogged(
            '1.2.3.4 - - [25/Oct/2004:12:31:59 +0000] '
            '"GET /dummy HTTP/1.0" 123 - "-" "Malicious Web\\" Evil"\n')
1060
1061
1062
class ServerAttributesTestCase(unittest.TestCase):
    """
    Tests that using the deprecated attributes of C{twisted.web.server}
    raises the appropriate deprecation warnings.
    """

    def test_deprecatedAttributeDateTimeString(self):
        """
        C{twisted.web.server.date_time_string} should not be used; accessing
        it emits a single L{DeprecationWarning} which recommends
        C{twisted.web.http.datetimeToString} instead.
        """
        expectedMessage = (
            "twisted.web.server.date_time_string was deprecated in Twisted "
            "12.1.0: Please use twisted.web.http.datetimeToString instead")

        # The attribute access itself is what triggers the warning; the
        # value is not needed.
        deprecatedAttribute = server.date_time_string
        emitted = self.flushWarnings(
            offendingFunctions=[self.test_deprecatedAttributeDateTimeString])

        self.assertEqual(len(emitted), 1)
        warning = emitted[0]
        self.assertEqual(warning['category'], DeprecationWarning)
        self.assertEqual(warning['message'], expectedMessage)


    def test_deprecatedAttributeStringDateTime(self):
        """
        C{twisted.web.server.string_date_time} should not be used; accessing
        it emits a single L{DeprecationWarning} which recommends
        C{twisted.web.http.stringToDatetime} instead.
        """
        expectedMessage = (
            "twisted.web.server.string_date_time was deprecated in Twisted "
            "12.1.0: Please use twisted.web.http.stringToDatetime instead")

        # The attribute access itself is what triggers the warning; the
        # value is not needed.
        deprecatedAttribute = server.string_date_time
        emitted = self.flushWarnings(
            offendingFunctions=[self.test_deprecatedAttributeStringDateTime])

        self.assertEqual(len(emitted), 1)
        warning = emitted[0]
        self.assertEqual(warning['category'], DeprecationWarning)
        self.assertEqual(warning['message'], expectedMessage)