1 # Copyright (c) Twisted Matrix Laboratories.
2 # See LICENSE for details.
5 Tests for L{twisted.web.static}.
8 import os, re, StringIO
10 from zope.interface.verify import verifyObject
12 from twisted.internet import abstract, interfaces
13 from twisted.python.compat import set
14 from twisted.python.runtime import platform
15 from twisted.python.filepath import FilePath
16 from twisted.python import log
17 from twisted.trial.unittest import TestCase
18 from twisted.web import static, http, script, resource
19 from twisted.web.server import UnsupportedMethod
20 from twisted.web.test.test_web import DummyRequest
21 from twisted.web.test._util import _render
24 class StaticDataTests(TestCase):
28 def test_headRequest(self):
30 L{Data.render} returns an empty response body for a I{HEAD} request.
32 data = static.Data("foo", "bar")
33 request = DummyRequest([''])
34 request.method = 'HEAD'
35 d = _render(data, request)
36 def cbRendered(ignored):
37 self.assertEqual(''.join(request.written), "")
38 d.addCallback(cbRendered)
42 def test_invalidMethod(self):
44 L{Data.render} raises L{UnsupportedMethod} in response to a non-I{GET},
47 data = static.Data("foo", "bar")
48 request = DummyRequest([''])
49 request.method = 'POST'
50 self.assertRaises(UnsupportedMethod, data.render, request)
54 class StaticFileTests(TestCase):
56 Tests for the basic behavior of L{File}.
58 def _render(self, resource, request):
59 return _render(resource, request)
62 def test_invalidMethod(self):
64 L{File.render} raises L{UnsupportedMethod} in response to a non-I{GET},
67 request = DummyRequest([''])
68 request.method = 'POST'
69 path = FilePath(self.mktemp())
70 path.setContent("foo")
71 file = static.File(path.path)
72 self.assertRaises(UnsupportedMethod, file.render, request)
75 def test_notFound(self):
77 If a request is made which encounters a L{File} before a final segment
78 which does not correspond to any file in the path the L{File} was
79 created with, a not found response is sent.
81 base = FilePath(self.mktemp())
83 file = static.File(base.path)
85 request = DummyRequest(['foobar'])
86 child = resource.getChildForRequest(file, request)
88 d = self._render(child, request)
89 def cbRendered(ignored):
90 self.assertEqual(request.responseCode, 404)
91 d.addCallback(cbRendered)
95 def test_emptyChild(self):
97 The C{''} child of a L{File} which corresponds to a directory in the
98 filesystem is a L{DirectoryLister}.
100 base = FilePath(self.mktemp())
102 file = static.File(base.path)
104 request = DummyRequest([''])
105 child = resource.getChildForRequest(file, request)
106 self.assertIsInstance(child, static.DirectoryLister)
107 self.assertEqual(child.path, base.path)
110 def test_securityViolationNotFound(self):
112 If a request is made which encounters a L{File} before a final segment
113 which cannot be looked up in the filesystem due to security
114 considerations, a not found response is sent.
116 base = FilePath(self.mktemp())
118 file = static.File(base.path)
120 request = DummyRequest(['..'])
121 child = resource.getChildForRequest(file, request)
123 d = self._render(child, request)
124 def cbRendered(ignored):
125 self.assertEqual(request.responseCode, 404)
126 d.addCallback(cbRendered)
130 def test_forbiddenResource(self):
132 If the file in the filesystem which would satisfy a request cannot be
133 read, L{File.render} sets the HTTP response code to I{FORBIDDEN}.
135 base = FilePath(self.mktemp())
137 # Make sure we can delete the file later.
138 self.addCleanup(base.chmod, 0700)
140 # Get rid of our own read permission.
143 file = static.File(base.path)
144 request = DummyRequest([''])
145 d = self._render(file, request)
146 def cbRendered(ignored):
147 self.assertEqual(request.responseCode, 403)
148 d.addCallback(cbRendered)
150 if platform.isWindows():
151 test_forbiddenResource.skip = "Cannot remove read permission on Windows"
154 def test_indexNames(self):
156 If a request is made which encounters a L{File} before a final empty
157 segment, a file in the L{File} instance's C{indexNames} list which
158 exists in the path the L{File} was created with is served as the
159 response to the request.
161 base = FilePath(self.mktemp())
163 base.child("foo.bar").setContent("baz")
164 file = static.File(base.path)
165 file.indexNames = ['foo.bar']
167 request = DummyRequest([''])
168 child = resource.getChildForRequest(file, request)
170 d = self._render(child, request)
171 def cbRendered(ignored):
172 self.assertEqual(''.join(request.written), 'baz')
173 self.assertEqual(request.outgoingHeaders['content-length'], '3')
174 d.addCallback(cbRendered)
178 def test_staticFile(self):
180 If a request is made which encounters a L{File} before a final segment
181 which names a file in the path the L{File} was created with, that file
182 is served as the response to the request.
184 base = FilePath(self.mktemp())
186 base.child("foo.bar").setContent("baz")
187 file = static.File(base.path)
189 request = DummyRequest(['foo.bar'])
190 child = resource.getChildForRequest(file, request)
192 d = self._render(child, request)
193 def cbRendered(ignored):
194 self.assertEqual(''.join(request.written), 'baz')
195 self.assertEqual(request.outgoingHeaders['content-length'], '3')
196 d.addCallback(cbRendered)
200 def test_staticFileDeletedGetChild(self):
202 A L{static.File} created for a directory which does not exist should
203 return childNotFound from L{static.File.getChild}.
205 staticFile = static.File(self.mktemp())
206 request = DummyRequest(['foo.bar'])
207 child = staticFile.getChild("foo.bar", request)
208 self.assertEqual(child, staticFile.childNotFound)
211 def test_staticFileDeletedRender(self):
213 A L{static.File} created for a file which does not exist should render
214 its C{childNotFound} page.
216 staticFile = static.File(self.mktemp())
217 request = DummyRequest(['foo.bar'])
218 request2 = DummyRequest(['foo.bar'])
219 d = self._render(staticFile, request)
220 d2 = self._render(staticFile.childNotFound, request2)
221 def cbRendered2(ignored):
222 def cbRendered(ignored):
223 self.assertEqual(''.join(request.written),
224 ''.join(request2.written))
225 d.addCallback(cbRendered)
227 d2.addCallback(cbRendered2)
231 def test_headRequest(self):
233 L{static.File.render} returns an empty response body for I{HEAD}
236 path = FilePath(self.mktemp())
237 path.setContent("foo")
238 file = static.File(path.path)
239 request = DummyRequest([''])
240 request.method = 'HEAD'
241 d = _render(file, request)
242 def cbRendered(ignored):
243 self.assertEqual("".join(request.written), "")
244 d.addCallback(cbRendered)
248 def test_processors(self):
250 If a request is made which encounters a L{File} before a final segment
251 which names a file with an extension which is in the L{File}'s
252 C{processors} mapping, the processor associated with that extension is
253 used to serve the response to the request.
255 base = FilePath(self.mktemp())
257 base.child("foo.bar").setContent(
258 "from twisted.web.static import Data\n"
259 "resource = Data('dynamic world','text/plain')\n")
261 file = static.File(base.path)
262 file.processors = {'.bar': script.ResourceScript}
263 request = DummyRequest(["foo.bar"])
264 child = resource.getChildForRequest(file, request)
266 d = self._render(child, request)
267 def cbRendered(ignored):
268 self.assertEqual(''.join(request.written), 'dynamic world')
269 self.assertEqual(request.outgoingHeaders['content-length'], '13')
270 d.addCallback(cbRendered)
274 def test_ignoreExt(self):
276 The list of ignored extensions can be set by passing a value to
277 L{File.__init__} or by calling L{File.ignoreExt} later.
279 file = static.File(".")
280 self.assertEqual(file.ignoredExts, [])
281 file.ignoreExt(".foo")
282 file.ignoreExt(".bar")
283 self.assertEqual(file.ignoredExts, [".foo", ".bar"])
285 file = static.File(".", ignoredExts=(".bar", ".baz"))
286 self.assertEqual(file.ignoredExts, [".bar", ".baz"])
289 def test_ignoredExtensionsIgnored(self):
291 A request for the I{base} child of a L{File} succeeds with a resource
292 for the I{base<extension>} file in the path the L{File} was created
293 with if such a file exists and the L{File} has been configured to
294 ignore the I{<extension>} extension.
296 base = FilePath(self.mktemp())
298 base.child('foo.bar').setContent('baz')
299 base.child('foo.quux').setContent('foobar')
300 file = static.File(base.path, ignoredExts=(".bar",))
302 request = DummyRequest(["foo"])
303 child = resource.getChildForRequest(file, request)
305 d = self._render(child, request)
306 def cbRendered(ignored):
307 self.assertEqual(''.join(request.written), 'baz')
308 d.addCallback(cbRendered)
313 class StaticMakeProducerTests(TestCase):
315 Tests for L{File.makeProducer}.
319 def makeResourceWithContent(self, content, type=None, encoding=None):
321 Make a L{static.File} resource that has C{content} for its content.
323 @param content: The bytes to use as the contents of the resource.
324 @param type: Optional value for the content type of the resource.
326 fileName = self.mktemp()
327 fileObject = open(fileName, 'w')
328 fileObject.write(content)
330 resource = static.File(fileName)
331 resource.encoding = encoding
336 def contentHeaders(self, request):
338 Extract the content-* headers from the L{DummyRequest} C{request}.
340 This returns the subset of C{request.outgoingHeaders} of headers that
341 start with 'content-'.
344 for k, v in request.outgoingHeaders.iteritems():
345 if k.startswith('content-'):
346 contentHeaders[k] = v
347 return contentHeaders
350 def test_noRangeHeaderGivesNoRangeStaticProducer(self):
352 makeProducer when no Range header is set returns an instance of
353 NoRangeStaticProducer.
355 resource = self.makeResourceWithContent('')
356 request = DummyRequest([])
357 producer = resource.makeProducer(request, resource.openForReading())
358 self.assertIsInstance(producer, static.NoRangeStaticProducer)
361 def test_noRangeHeaderSets200OK(self):
363 makeProducer when no Range header is set sets the responseCode on the
366 resource = self.makeResourceWithContent('')
367 request = DummyRequest([])
368 resource.makeProducer(request, resource.openForReading())
369 self.assertEqual(http.OK, request.responseCode)
372 def test_noRangeHeaderSetsContentHeaders(self):
374 makeProducer when no Range header is set sets the Content-* headers
378 contentType = "text/plain"
379 contentEncoding = 'gzip'
380 resource = self.makeResourceWithContent(
381 'a'*length, type=contentType, encoding=contentEncoding)
382 request = DummyRequest([])
383 resource.makeProducer(request, resource.openForReading())
385 {'content-type': contentType, 'content-length': str(length),
386 'content-encoding': contentEncoding},
387 self.contentHeaders(request))
390 def test_singleRangeGivesSingleRangeStaticProducer(self):
392 makeProducer when the Range header requests a single byte range
393 returns an instance of SingleRangeStaticProducer.
395 request = DummyRequest([])
396 request.headers['range'] = 'bytes=1-3'
397 resource = self.makeResourceWithContent('abcdef')
398 producer = resource.makeProducer(request, resource.openForReading())
399 self.assertIsInstance(producer, static.SingleRangeStaticProducer)
402 def test_singleRangeSets206PartialContent(self):
404 makeProducer when the Range header requests a single, satisfiable byte
405 range sets the response code on the request to 'Partial Content'.
407 request = DummyRequest([])
408 request.headers['range'] = 'bytes=1-3'
409 resource = self.makeResourceWithContent('abcdef')
410 resource.makeProducer(request, resource.openForReading())
412 http.PARTIAL_CONTENT, request.responseCode)
415 def test_singleRangeSetsContentHeaders(self):
417 makeProducer when the Range header requests a single, satisfiable byte
418 range sets the Content-* headers appropriately.
420 request = DummyRequest([])
421 request.headers['range'] = 'bytes=1-3'
422 contentType = "text/plain"
423 contentEncoding = 'gzip'
424 resource = self.makeResourceWithContent('abcdef', type=contentType, encoding=contentEncoding)
425 resource.makeProducer(request, resource.openForReading())
427 {'content-type': contentType, 'content-encoding': contentEncoding,
428 'content-range': 'bytes 1-3/6', 'content-length': '3'},
429 self.contentHeaders(request))
432 def test_singleUnsatisfiableRangeReturnsSingleRangeStaticProducer(self):
434 makeProducer still returns an instance of L{SingleRangeStaticProducer}
435 when the Range header requests a single unsatisfiable byte range.
437 request = DummyRequest([])
438 request.headers['range'] = 'bytes=4-10'
439 resource = self.makeResourceWithContent('abc')
440 producer = resource.makeProducer(request, resource.openForReading())
441 self.assertIsInstance(producer, static.SingleRangeStaticProducer)
444 def test_singleUnsatisfiableRangeSets416ReqestedRangeNotSatisfiable(self):
446 makeProducer sets the response code of the request to of 'Requested
447 Range Not Satisfiable' when the Range header requests a single
448 unsatisfiable byte range.
450 request = DummyRequest([])
451 request.headers['range'] = 'bytes=4-10'
452 resource = self.makeResourceWithContent('abc')
453 resource.makeProducer(request, resource.openForReading())
455 http.REQUESTED_RANGE_NOT_SATISFIABLE, request.responseCode)
458 def test_singleUnsatisfiableRangeSetsContentHeaders(self):
460 makeProducer when the Range header requests a single, unsatisfiable
461 byte range sets the Content-* headers appropriately.
463 request = DummyRequest([])
464 request.headers['range'] = 'bytes=4-10'
465 contentType = "text/plain"
466 resource = self.makeResourceWithContent('abc', type=contentType)
467 resource.makeProducer(request, resource.openForReading())
469 {'content-type': 'text/plain', 'content-length': '0',
470 'content-range': 'bytes */3'},
471 self.contentHeaders(request))
474 def test_singlePartiallyOverlappingRangeSetsContentHeaders(self):
476 makeProducer when the Range header requests a single byte range that
477 partly overlaps the resource sets the Content-* headers appropriately.
479 request = DummyRequest([])
480 request.headers['range'] = 'bytes=2-10'
481 contentType = "text/plain"
482 resource = self.makeResourceWithContent('abc', type=contentType)
483 resource.makeProducer(request, resource.openForReading())
485 {'content-type': 'text/plain', 'content-length': '1',
486 'content-range': 'bytes 2-2/3'},
487 self.contentHeaders(request))
490 def test_multipleRangeGivesMultipleRangeStaticProducer(self):
492 makeProducer when the Range header requests a single byte range
493 returns an instance of MultipleRangeStaticProducer.
495 request = DummyRequest([])
496 request.headers['range'] = 'bytes=1-3,5-6'
497 resource = self.makeResourceWithContent('abcdef')
498 producer = resource.makeProducer(request, resource.openForReading())
499 self.assertIsInstance(producer, static.MultipleRangeStaticProducer)
502 def test_multipleRangeSets206PartialContent(self):
504 makeProducer when the Range header requests a multiple satisfiable
505 byte ranges sets the response code on the request to 'Partial
508 request = DummyRequest([])
509 request.headers['range'] = 'bytes=1-3,5-6'
510 resource = self.makeResourceWithContent('abcdef')
511 resource.makeProducer(request, resource.openForReading())
513 http.PARTIAL_CONTENT, request.responseCode)
516 def test_mutipleRangeSetsContentHeaders(self):
518 makeProducer when the Range header requests a single, satisfiable byte
519 range sets the Content-* headers appropriately.
521 request = DummyRequest([])
522 request.headers['range'] = 'bytes=1-3,5-6'
523 resource = self.makeResourceWithContent(
524 'abcdefghijkl', encoding='gzip')
525 producer = resource.makeProducer(request, resource.openForReading())
526 contentHeaders = self.contentHeaders(request)
527 # The only content-* headers set are content-type and content-length.
529 set(['content-length', 'content-type']),
530 set(contentHeaders.keys()))
531 # The content-length depends on the boundary used in the response.
533 for boundary, offset, size in producer.rangeInfo:
534 expectedLength += len(boundary)
535 self.assertEqual(expectedLength, contentHeaders['content-length'])
536 # Content-type should be set to a value indicating a multipart
537 # response and the boundary used to separate the parts.
538 self.assertIn('content-type', contentHeaders)
539 contentType = contentHeaders['content-type']
540 self.assertNotIdentical(
542 'multipart/byteranges; boundary="[^"]*"\Z', contentType))
543 # Content-encoding is not set in the response to a multiple range
544 # response, which is a bit wussy but works well enough with the way
545 # static.File does content-encodings...
546 self.assertNotIn('content-encoding', contentHeaders)
549 def test_multipleUnsatisfiableRangesReturnsMultipleRangeStaticProducer(self):
551 makeProducer still returns an instance of L{SingleRangeStaticProducer}
552 when the Range header requests multiple ranges, none of which are
555 request = DummyRequest([])
556 request.headers['range'] = 'bytes=10-12,15-20'
557 resource = self.makeResourceWithContent('abc')
558 producer = resource.makeProducer(request, resource.openForReading())
559 self.assertIsInstance(producer, static.MultipleRangeStaticProducer)
562 def test_multipleUnsatisfiableRangesSets416ReqestedRangeNotSatisfiable(self):
564 makeProducer sets the response code of the request to of 'Requested
565 Range Not Satisfiable' when the Range header requests multiple ranges,
566 none of which are satisfiable.
568 request = DummyRequest([])
569 request.headers['range'] = 'bytes=10-12,15-20'
570 resource = self.makeResourceWithContent('abc')
571 resource.makeProducer(request, resource.openForReading())
573 http.REQUESTED_RANGE_NOT_SATISFIABLE, request.responseCode)
576 def test_multipleUnsatisfiableRangeSetsContentHeaders(self):
578 makeProducer when the Range header requests multiple ranges, none of
579 which are satisfiable, sets the Content-* headers appropriately.
581 request = DummyRequest([])
582 request.headers['range'] = 'bytes=4-10'
583 contentType = "text/plain"
584 request.headers['range'] = 'bytes=10-12,15-20'
585 resource = self.makeResourceWithContent('abc', type=contentType)
586 resource.makeProducer(request, resource.openForReading())
588 {'content-length': '0', 'content-range': 'bytes */3'},
589 self.contentHeaders(request))
592 def test_oneSatisfiableRangeIsEnough(self):
594 makeProducer when the Range header requests multiple ranges, at least
595 one of which matches, sets the response code to 'Partial Content'.
597 request = DummyRequest([])
598 request.headers['range'] = 'bytes=1-3,100-200'
599 resource = self.makeResourceWithContent('abcdef')
600 resource.makeProducer(request, resource.openForReading())
602 http.PARTIAL_CONTENT, request.responseCode)
606 class StaticProducerTests(TestCase):
608 Tests for the abstract L{StaticProducer}.
611 def test_stopProducingClosesFile(self):
613 L{StaticProducer.stopProducing} closes the file object the producer is
616 fileObject = StringIO.StringIO()
617 producer = static.StaticProducer(None, fileObject)
618 producer.stopProducing()
619 self.assertTrue(fileObject.closed)
622 def test_stopProducingSetsRequestToNone(self):
624 L{StaticProducer.stopProducing} sets the request instance variable to
625 None, which indicates to subclasses' resumeProducing methods that no
626 more data should be produced.
628 fileObject = StringIO.StringIO()
629 producer = static.StaticProducer(DummyRequest([]), fileObject)
630 producer.stopProducing()
631 self.assertIdentical(None, producer.request)
635 class NoRangeStaticProducerTests(TestCase):
637 Tests for L{NoRangeStaticProducer}.
640 def test_implementsIPullProducer(self):
642 L{NoRangeStaticProducer} implements L{IPullProducer}.
645 interfaces.IPullProducer,
646 static.NoRangeStaticProducer(None, None))
649 def test_resumeProducingProducesContent(self):
651 L{NoRangeStaticProducer.resumeProducing} writes content from the
652 resource to the request.
654 request = DummyRequest([])
656 producer = static.NoRangeStaticProducer(
657 request, StringIO.StringIO(content))
658 # start calls registerProducer on the DummyRequest, which pulls all
659 # output from the producer and so we just need this one call.
661 self.assertEqual(content, ''.join(request.written))
664 def test_resumeProducingBuffersOutput(self):
666 L{NoRangeStaticProducer.start} writes at most
667 C{abstract.FileDescriptor.bufferSize} bytes of content from the
668 resource to the request at once.
670 request = DummyRequest([])
671 bufferSize = abstract.FileDescriptor.bufferSize
672 content = 'a' * (2*bufferSize + 1)
673 producer = static.NoRangeStaticProducer(
674 request, StringIO.StringIO(content))
675 # start calls registerProducer on the DummyRequest, which pulls all
676 # output from the producer and so we just need this one call.
679 content[0:bufferSize],
680 content[bufferSize:2*bufferSize],
681 content[2*bufferSize:]
683 self.assertEqual(expected, request.written)
686 def test_finishCalledWhenDone(self):
688 L{NoRangeStaticProducer.resumeProducing} calls finish() on the request
689 after it is done producing content.
691 request = DummyRequest([])
692 finishDeferred = request.notifyFinish()
694 finishDeferred.addCallback(callbackList.append)
695 producer = static.NoRangeStaticProducer(
696 request, StringIO.StringIO('abcdef'))
697 # start calls registerProducer on the DummyRequest, which pulls all
698 # output from the producer and so we just need this one call.
700 self.assertEqual([None], callbackList)
704 class SingleRangeStaticProducerTests(TestCase):
706 Tests for L{SingleRangeStaticProducer}.
709 def test_implementsIPullProducer(self):
711 L{SingleRangeStaticProducer} implements L{IPullProducer}.
714 interfaces.IPullProducer,
715 static.SingleRangeStaticProducer(None, None, None, None))
718 def test_resumeProducingProducesContent(self):
720 L{SingleRangeStaticProducer.resumeProducing} writes the given amount
721 of content, starting at the given offset, from the resource to the
724 request = DummyRequest([])
726 producer = static.SingleRangeStaticProducer(
727 request, StringIO.StringIO(content), 1, 3)
728 # DummyRequest.registerProducer pulls all output from the producer, so
729 # we just need to call start.
731 self.assertEqual(content[1:4], ''.join(request.written))
734 def test_resumeProducingBuffersOutput(self):
736 L{SingleRangeStaticProducer.start} writes at most
737 C{abstract.FileDescriptor.bufferSize} bytes of content from the
738 resource to the request at once.
740 request = DummyRequest([])
741 bufferSize = abstract.FileDescriptor.bufferSize
742 content = 'abc' * bufferSize
743 producer = static.SingleRangeStaticProducer(
744 request, StringIO.StringIO(content), 1, bufferSize+10)
745 # DummyRequest.registerProducer pulls all output from the producer, so
746 # we just need to call start.
749 content[1:bufferSize+1],
750 content[bufferSize+1:bufferSize+11],
752 self.assertEqual(expected, request.written)
755 def test_finishCalledWhenDone(self):
757 L{SingleRangeStaticProducer.resumeProducing} calls finish() on the
758 request after it is done producing content.
760 request = DummyRequest([])
761 finishDeferred = request.notifyFinish()
763 finishDeferred.addCallback(callbackList.append)
764 producer = static.SingleRangeStaticProducer(
765 request, StringIO.StringIO('abcdef'), 1, 1)
766 # start calls registerProducer on the DummyRequest, which pulls all
767 # output from the producer and so we just need this one call.
769 self.assertEqual([None], callbackList)
773 class MultipleRangeStaticProducerTests(TestCase):
775 Tests for L{MultipleRangeStaticProducer}.
778 def test_implementsIPullProducer(self):
780 L{MultipleRangeStaticProducer} implements L{IPullProducer}.
783 interfaces.IPullProducer,
784 static.MultipleRangeStaticProducer(None, None, None))
787 def test_resumeProducingProducesContent(self):
789 L{MultipleRangeStaticProducer.resumeProducing} writes the requested
790 chunks of content from the resource to the request, with the supplied
791 boundaries in between each chunk.
793 request = DummyRequest([])
795 producer = static.MultipleRangeStaticProducer(
796 request, StringIO.StringIO(content), [('1', 1, 3), ('2', 5, 1)])
797 # DummyRequest.registerProducer pulls all output from the producer, so
798 # we just need to call start.
800 self.assertEqual('1bcd2f', ''.join(request.written))
803 def test_resumeProducingBuffersOutput(self):
805 L{MultipleRangeStaticProducer.start} writes about
806 C{abstract.FileDescriptor.bufferSize} bytes of content from the
807 resource to the request at once.
809 To be specific about the 'about' above: it can write slightly more,
810 for example in the case where the first boundary plus the first chunk
811 is less than C{bufferSize} but first boundary plus the first chunk
812 plus the second boundary is more, but this is unimportant as in
813 practice the boundaries are fairly small. On the other side, it is
814 important for performance to bundle up several small chunks into one
815 call to request.write.
817 request = DummyRequest([])
818 content = '0123456789' * 2
819 producer = static.MultipleRangeStaticProducer(
820 request, StringIO.StringIO(content),
821 [('a', 0, 2), ('b', 5, 10), ('c', 0, 0)])
822 producer.bufferSize = 10
823 # DummyRequest.registerProducer pulls all output from the producer, so
824 # we just need to call start.
827 'a' + content[0:2] + 'b' + content[5:11],
828 content[11:15] + 'c',
830 self.assertEqual(expected, request.written)
833 def test_finishCalledWhenDone(self):
835 L{MultipleRangeStaticProducer.resumeProducing} calls finish() on the
836 request after it is done producing content.
838 request = DummyRequest([])
839 finishDeferred = request.notifyFinish()
841 finishDeferred.addCallback(callbackList.append)
842 producer = static.MultipleRangeStaticProducer(
843 request, StringIO.StringIO('abcdef'), [('', 1, 2)])
844 # start calls registerProducer on the DummyRequest, which pulls all
845 # output from the producer and so we just need this one call.
847 self.assertEqual([None], callbackList)
851 class RangeTests(TestCase):
853 Tests for I{Range-Header} support in L{twisted.web.static.File}.
856 @ivar file: Temporary (binary) file containing the content to be served.
858 @type resource: L{static.File}
859 @ivar resource: A leaf web resource using C{file} as content.
861 @type request: L{DummyRequest}
862 @ivar request: A fake request, requesting C{resource}.
864 @type catcher: L{list}
865 @ivar catcher: List which gathers all log information.
869 Create a temporary file with a fixed payload of 64 bytes. Create a
870 resource for that file and create a request which will be for that
871 resource. Each test can set a different range header to test different
872 aspects of the implementation.
874 path = FilePath(self.mktemp())
875 # This is just a jumble of random stuff. It's supposed to be a good
876 # set of data for this test, particularly in order to avoid
877 # accidentally seeing the right result by having a byte sequence
878 # repeated at different locations or by having byte values which are
879 # somehow correlated with their position in the string.
880 self.payload = ('\xf8u\xf3E\x8c7\xce\x00\x9e\xb6a0y0S\xf0\xef\xac\xb7'
881 '\xbe\xb5\x17M\x1e\x136k{\x1e\xbe\x0c\x07\x07\t\xd0'
882 '\xbckY\xf5I\x0b\xb8\x88oZ\x1d\x85b\x1a\xcdk\xf2\x1d'
883 '&\xfd%\xdd\x82q/A\x10Y\x8b')
884 path.setContent(self.payload)
885 self.file = path.open()
886 self.resource = static.File(self.file.name)
887 self.resource.isLeaf = 1
888 self.request = DummyRequest([''])
889 self.request.uri = self.file.name
891 log.addObserver(self.catcher.append)
896 Clean up the resource file and the log observer.
899 log.removeObserver(self.catcher.append)
902 def _assertLogged(self, expected):
904 Asserts that a given log message occurred with an expected message.
906 logItem = self.catcher.pop()
907 self.assertEqual(logItem["message"][0], expected)
909 self.catcher, [], "An additional log occured: %r" % (logItem,))
912 def test_invalidRanges(self):
914 L{File._parseRangeHeader} raises L{ValueError} when passed
915 syntactically invalid byte ranges.
917 f = self.resource._parseRangeHeader
920 self.assertRaises(ValueError, f, 'bytes')
922 # unknown isn't a valid Bytes-Unit
923 self.assertRaises(ValueError, f, 'unknown=1-2')
925 # there's no - in =stuff
926 self.assertRaises(ValueError, f, 'bytes=3')
928 # both start and end are empty
929 self.assertRaises(ValueError, f, 'bytes=-')
931 # start isn't an integer
932 self.assertRaises(ValueError, f, 'bytes=foo-')
934 # end isn't an integer
935 self.assertRaises(ValueError, f, 'bytes=-foo')
937 # end isn't equal to or greater than start
938 self.assertRaises(ValueError, f, 'bytes=5-4')
941 def test_rangeMissingStop(self):
943 A single bytes range without an explicit stop position is parsed into a
944 two-tuple giving the start position and C{None}.
947 self.resource._parseRangeHeader('bytes=0-'), [(0, None)])
950 def test_rangeMissingStart(self):
952 A single bytes range without an explicit start position is parsed into
953 a two-tuple of C{None} and the end position.
956 self.resource._parseRangeHeader('bytes=-3'), [(None, 3)])
959 def test_range(self):
961 A single bytes range with explicit start and stop positions is parsed
962 into a two-tuple of those positions.
965 self.resource._parseRangeHeader('bytes=2-5'), [(2, 5)])
968 def test_rangeWithSpace(self):
970 A single bytes range with whitespace in allowed places is parsed in
971 the same way as it would be without the whitespace.
974 self.resource._parseRangeHeader(' bytes=1-2 '), [(1, 2)])
976 self.resource._parseRangeHeader('bytes =1-2 '), [(1, 2)])
978 self.resource._parseRangeHeader('bytes= 1-2'), [(1, 2)])
980 self.resource._parseRangeHeader('bytes=1 -2'), [(1, 2)])
982 self.resource._parseRangeHeader('bytes=1- 2'), [(1, 2)])
984 self.resource._parseRangeHeader('bytes=1-2 '), [(1, 2)])
987 def test_nullRangeElements(self):
989 If there are multiple byte ranges but only one is non-null, the
990 non-null range is parsed and its start and stop returned.
993 self.resource._parseRangeHeader('bytes=1-2,\r\n, ,\t'), [(1, 2)])
996 def test_multipleRanges(self):
998 If multiple byte ranges are specified their starts and stops are
1002 self.resource._parseRangeHeader('bytes=1-2,3-4'),
1006 def test_bodyLength(self):
1008 A correct response to a range request is as long as the length of the
1011 self.request.headers['range'] = 'bytes=0-43'
1012 self.resource.render(self.request)
1013 self.assertEqual(len(''.join(self.request.written)), 44)
1016 def test_invalidRangeRequest(self):
1018 An incorrect range request (RFC 2616 defines a correct range request as
1019 a Bytes-Unit followed by a '=' character followed by a specific range.
1020 Only 'bytes' is defined) results in the range header value being logged
1021 and a normal 200 response being sent.
1023 self.request.headers['range'] = range = 'foobar=0-43'
1024 self.resource.render(self.request)
1025 expected = "Ignoring malformed Range header %r" % (range,)
1026 self._assertLogged(expected)
1027 self.assertEqual(''.join(self.request.written), self.payload)
1028 self.assertEqual(self.request.responseCode, http.OK)
1030 self.request.outgoingHeaders['content-length'],
1031 str(len(self.payload)))
1034 def parseMultipartBody(self, body, boundary):
1036 Parse C{body} as a multipart MIME response separated by C{boundary}.
1038 Note that this with fail the calling test on certain syntactic
1041 sep = "\r\n--" + boundary
1042 parts = ''.join(body).split(sep)
1043 self.assertEqual('', parts[0])
1044 self.assertEqual('--\r\n', parts[-1])
1046 for part in parts[1:-1]:
1047 before, header1, header2, blank, partBody = part.split('\r\n', 4)
1048 headers = header1 + '\n' + header2
1049 self.assertEqual('', before)
1050 self.assertEqual('', blank)
1051 partContentTypeValue = re.search(
1052 '^content-type: (.*)$', headers, re.I|re.M).group(1)
1053 start, end, size = re.search(
1054 '^content-range: bytes ([0-9]+)-([0-9]+)/([0-9]+)$',
1055 headers, re.I|re.M).groups()
1056 parsed_parts.append(
1057 {'contentType': partContentTypeValue,
1058 'contentRange': (start, end, size),
1063 def test_multipleRangeRequest(self):
1065 The response to a request for multipe bytes ranges is a MIME-ish
1068 startEnds = [(0, 2), (20, 30), (40, 50)]
1069 rangeHeaderValue = ','.join(["%s-%s"%(s,e) for (s, e) in startEnds])
1070 self.request.headers['range'] = 'bytes=' + rangeHeaderValue
1071 self.resource.render(self.request)
1072 self.assertEqual(self.request.responseCode, http.PARTIAL_CONTENT)
1073 boundary = re.match(
1074 '^multipart/byteranges; boundary="(.*)"$',
1075 self.request.outgoingHeaders['content-type']).group(1)
1076 parts = self.parseMultipartBody(''.join(self.request.written), boundary)
1077 self.assertEqual(len(startEnds), len(parts))
1078 for part, (s, e) in zip(parts, startEnds):
1079 self.assertEqual(self.resource.type, part['contentType'])
1080 start, end, size = part['contentRange']
1081 self.assertEqual(int(start), s)
1082 self.assertEqual(int(end), e)
1083 self.assertEqual(int(size), self.resource.getFileSize())
1084 self.assertEqual(self.payload[s:e+1], part['body'])
1087 def test_multipleRangeRequestWithRangeOverlappingEnd(self):
1089 The response to a request for multipe bytes ranges is a MIME-ish
1090 multipart response, even when one of the ranged falls off the end of
1093 startEnds = [(0, 2), (40, len(self.payload) + 10)]
1094 rangeHeaderValue = ','.join(["%s-%s"%(s,e) for (s, e) in startEnds])
1095 self.request.headers['range'] = 'bytes=' + rangeHeaderValue
1096 self.resource.render(self.request)
1097 self.assertEqual(self.request.responseCode, http.PARTIAL_CONTENT)
1098 boundary = re.match(
1099 '^multipart/byteranges; boundary="(.*)"$',
1100 self.request.outgoingHeaders['content-type']).group(1)
1101 parts = self.parseMultipartBody(''.join(self.request.written), boundary)
1102 self.assertEqual(len(startEnds), len(parts))
1103 for part, (s, e) in zip(parts, startEnds):
1104 self.assertEqual(self.resource.type, part['contentType'])
1105 start, end, size = part['contentRange']
1106 self.assertEqual(int(start), s)
1107 self.assertEqual(int(end), min(e, self.resource.getFileSize()-1))
1108 self.assertEqual(int(size), self.resource.getFileSize())
1109 self.assertEqual(self.payload[s:e+1], part['body'])
1112 def test_implicitEnd(self):
1114 If the end byte position is omitted, then it is treated as if the
1115 length of the resource was specified by the end byte position.
1117 self.request.headers['range'] = 'bytes=23-'
1118 self.resource.render(self.request)
1119 self.assertEqual(''.join(self.request.written), self.payload[23:])
1120 self.assertEqual(len(''.join(self.request.written)), 41)
1121 self.assertEqual(self.request.responseCode, http.PARTIAL_CONTENT)
1123 self.request.outgoingHeaders['content-range'], 'bytes 23-63/64')
1124 self.assertEqual(self.request.outgoingHeaders['content-length'], '41')
1127 def test_implicitStart(self):
1129 If the start byte position is omitted but the end byte position is
1130 supplied, then the range is treated as requesting the last -N bytes of
1131 the resource, where N is the end byte position.
1133 self.request.headers['range'] = 'bytes=-17'
1134 self.resource.render(self.request)
1135 self.assertEqual(''.join(self.request.written), self.payload[-17:])
1136 self.assertEqual(len(''.join(self.request.written)), 17)
1137 self.assertEqual(self.request.responseCode, http.PARTIAL_CONTENT)
1139 self.request.outgoingHeaders['content-range'], 'bytes 47-63/64')
1140 self.assertEqual(self.request.outgoingHeaders['content-length'], '17')
1143 def test_explicitRange(self):
1145 A correct response to a bytes range header request from A to B starts
1146 with the A'th byte and ends with (including) the B'th byte. The first
1147 byte of a page is numbered with 0.
1149 self.request.headers['range'] = 'bytes=3-43'
1150 self.resource.render(self.request)
1151 written = ''.join(self.request.written)
1152 self.assertEqual(written, self.payload[3:44])
1153 self.assertEqual(self.request.responseCode, http.PARTIAL_CONTENT)
1155 self.request.outgoingHeaders['content-range'], 'bytes 3-43/64')
1157 str(len(written)), self.request.outgoingHeaders['content-length'])
1160 def test_explicitRangeOverlappingEnd(self):
1162 A correct response to a bytes range header request from A to B when B
1163 is past the end of the resource starts with the A'th byte and ends
1164 with the last byte of the resource. The first byte of a page is
1167 self.request.headers['range'] = 'bytes=40-100'
1168 self.resource.render(self.request)
1169 written = ''.join(self.request.written)
1170 self.assertEqual(written, self.payload[40:])
1171 self.assertEqual(self.request.responseCode, http.PARTIAL_CONTENT)
1173 self.request.outgoingHeaders['content-range'], 'bytes 40-63/64')
1175 str(len(written)), self.request.outgoingHeaders['content-length'])
1178 def test_statusCodeRequestedRangeNotSatisfiable(self):
1180 If a range is syntactically invalid due to the start being greater than
1181 the end, the range header is ignored (the request is responded to as if
1182 it were not present).
1184 self.request.headers['range'] = 'bytes=20-13'
1185 self.resource.render(self.request)
1186 self.assertEqual(self.request.responseCode, http.OK)
1187 self.assertEqual(''.join(self.request.written), self.payload)
1189 self.request.outgoingHeaders['content-length'],
1190 str(len(self.payload)))
1193 def test_invalidStartBytePos(self):
1195 If a range is unsatisfiable due to the start not being less than the
1196 length of the resource, the response is 416 (Requested range not
1197 satisfiable) and no data is written to the response body (RFC 2616,
1200 self.request.headers['range'] = 'bytes=67-108'
1201 self.resource.render(self.request)
1203 self.request.responseCode, http.REQUESTED_RANGE_NOT_SATISFIABLE)
1204 self.assertEqual(''.join(self.request.written), '')
1205 self.assertEqual(self.request.outgoingHeaders['content-length'], '0')
1206 # Sections 10.4.17 and 14.16
1208 self.request.outgoingHeaders['content-range'],
1209 'bytes */%d' % (len(self.payload),))
1213 class DirectoryListerTest(TestCase):
1215 Tests for L{static.DirectoryLister}.
1217 def _request(self, uri):
1218 request = DummyRequest([''])
1223 def test_renderHeader(self):
1225 L{static.DirectoryLister} prints the request uri as header of the
1228 path = FilePath(self.mktemp())
1231 lister = static.DirectoryLister(path.path)
1232 data = lister.render(self._request('foo'))
1233 self.assertIn("<h1>Directory listing for foo</h1>", data)
1234 self.assertIn("<title>Directory listing for foo</title>", data)
1237 def test_renderUnquoteHeader(self):
1239 L{static.DirectoryLister} unquote the request uri before printing it.
1241 path = FilePath(self.mktemp())
1244 lister = static.DirectoryLister(path.path)
1245 data = lister.render(self._request('foo%20bar'))
1246 self.assertIn("<h1>Directory listing for foo bar</h1>", data)
1247 self.assertIn("<title>Directory listing for foo bar</title>", data)
1250 def test_escapeHeader(self):
1252 L{static.DirectoryLister} escape "&", "<" and ">" after unquoting the
1255 path = FilePath(self.mktemp())
1258 lister = static.DirectoryLister(path.path)
1259 data = lister.render(self._request('foo%26bar'))
1260 self.assertIn("<h1>Directory listing for foo&bar</h1>", data)
1261 self.assertIn("<title>Directory listing for foo&bar</title>", data)
1264 def test_renderFiles(self):
1266 L{static.DirectoryLister} is able to list all the files inside a
1269 path = FilePath(self.mktemp())
1271 path.child('file1').setContent("content1")
1272 path.child('file2').setContent("content2" * 1000)
1274 lister = static.DirectoryLister(path.path)
1275 data = lister.render(self._request('foo'))
1276 body = """<tr class="odd">
1277 <td><a href="file1">file1</a></td>
1279 <td>[text/html]</td>
1283 <td><a href="file2">file2</a></td>
1285 <td>[text/html]</td>
1288 self.assertIn(body, data)
1291 def test_renderDirectories(self):
1293 L{static.DirectoryLister} is able to list all the directories inside
1296 path = FilePath(self.mktemp())
1298 path.child('dir1').makedirs()
1299 path.child('dir2 & 3').makedirs()
1301 lister = static.DirectoryLister(path.path)
1302 data = lister.render(self._request('foo'))
1303 body = """<tr class="odd">
1304 <td><a href="dir1/">dir1/</a></td>
1306 <td>[Directory]</td>
1310 <td><a href="dir2%20%26%203/">dir2 & 3/</a></td>
1312 <td>[Directory]</td>
1315 self.assertIn(body, data)
1318 def test_renderFiltered(self):
1320 L{static.DirectoryLister} takes a optional C{dirs} argument that
1321 filter out the list of of directories and files printed.
1323 path = FilePath(self.mktemp())
1325 path.child('dir1').makedirs()
1326 path.child('dir2').makedirs()
1327 path.child('dir3').makedirs()
1328 lister = static.DirectoryLister(path.path, dirs=["dir1", "dir3"])
1329 data = lister.render(self._request('foo'))
1330 body = """<tr class="odd">
1331 <td><a href="dir1/">dir1/</a></td>
1333 <td>[Directory]</td>
1337 <td><a href="dir3/">dir3/</a></td>
1339 <td>[Directory]</td>
1342 self.assertIn(body, data)
1345 def test_oddAndEven(self):
1347 L{static.DirectoryLister} gives an alternate class for each odd and
1348 even rows in the table.
1350 lister = static.DirectoryLister(None)
1351 elements = [{"href": "", "text": "", "size": "", "type": "",
1352 "encoding": ""} for i in xrange(5)]
1353 content = lister._buildTableContent(elements)
1355 self.assertEqual(len(content), 5)
1356 self.assertTrue(content[0].startswith('<tr class="odd">'))
1357 self.assertTrue(content[1].startswith('<tr class="even">'))
1358 self.assertTrue(content[2].startswith('<tr class="odd">'))
1359 self.assertTrue(content[3].startswith('<tr class="even">'))
1360 self.assertTrue(content[4].startswith('<tr class="odd">'))
1363 def test_contentType(self):
1365 L{static.DirectoryLister} produces a MIME-type that indicates that it is
1366 HTML, and includes its charset (UTF-8).
1368 path = FilePath(self.mktemp())
1370 lister = static.DirectoryLister(path.path)
1371 req = self._request('')
1373 self.assertEqual(req.outgoingHeaders['content-type'],
1374 "text/html; charset=utf-8")
1377 def test_mimeTypeAndEncodings(self):
1379 L{static.DirectoryLister} is able to detect mimetype and encoding of
1382 path = FilePath(self.mktemp())
1384 path.child('file1.txt').setContent("file1")
1385 path.child('file2.py').setContent("python")
1386 path.child('file3.conf.gz').setContent("conf compressed")
1387 path.child('file4.diff.bz2').setContent("diff compressed")
1388 directory = os.listdir(path.path)
1392 ".txt": "text/plain",
1393 ".py": "text/python",
1394 ".conf": "text/configuration",
1395 ".diff": "text/diff"
1398 lister = static.DirectoryLister(path.path, contentTypes=contentTypes)
1399 dirs, files = lister._getFilesAndDirectories(directory)
1400 self.assertEqual(dirs, [])
1401 self.assertEqual(files, [
1403 'href': 'file1.txt',
1405 'text': 'file1.txt',
1406 'type': '[text/plain]'},
1411 'type': '[text/python]'},
1412 {'encoding': '[gzip]',
1413 'href': 'file3.conf.gz',
1415 'text': 'file3.conf.gz',
1416 'type': '[text/configuration]'},
1417 {'encoding': '[bzip2]',
1418 'href': 'file4.diff.bz2',
1420 'text': 'file4.diff.bz2',
1421 'type': '[text/diff]'}])
1424 def test_brokenSymlink(self):
1426 If on the file in the listing points to a broken symlink, it should not
1427 be returned by L{static.DirectoryLister._getFilesAndDirectories}.
1429 path = FilePath(self.mktemp())
1431 file1 = path.child('file1')
1432 file1.setContent("file1")
1433 file1.linkTo(path.child("file2"))
1436 lister = static.DirectoryLister(path.path)
1437 directory = os.listdir(path.path)
1439 dirs, files = lister._getFilesAndDirectories(directory)
1440 self.assertEqual(dirs, [])
1441 self.assertEqual(files, [])
1443 if getattr(os, "symlink", None) is None:
1444 test_brokenSymlink.skip = "No symlink support"
1447 def test_childrenNotFound(self):
1449 Any child resource of L{static.DirectoryLister} renders an HTTP
1450 I{NOT FOUND} response code.
1452 path = FilePath(self.mktemp())
1454 lister = static.DirectoryLister(path.path)
1455 request = self._request('')
1456 child = resource.getChildForRequest(lister, request)
1457 result = _render(child, request)
1458 def cbRendered(ignored):
1459 self.assertEqual(request.responseCode, http.NOT_FOUND)
1460 result.addCallback(cbRendered)
1464 def test_repr(self):
1466 L{static.DirectoryLister.__repr__} gives the path of the lister.
1468 path = FilePath(self.mktemp())
1469 lister = static.DirectoryLister(path.path)
1470 self.assertEqual(repr(lister),
1471 "<DirectoryLister of %r>" % (path.path,))
1472 self.assertEqual(str(lister),
1473 "<DirectoryLister of %r>" % (path.path,))
1475 def test_formatFileSize(self):
1477 L{static.formatFileSize} format an amount of bytes into a more readable
1480 self.assertEqual(static.formatFileSize(0), "0B")
1481 self.assertEqual(static.formatFileSize(123), "123B")
1482 self.assertEqual(static.formatFileSize(4567), "4K")
1483 self.assertEqual(static.formatFileSize(8900000), "8M")
1484 self.assertEqual(static.formatFileSize(1234000000), "1G")
1485 self.assertEqual(static.formatFileSize(1234567890000), "1149G")
1489 class TestFileTransferDeprecated(TestCase):
1491 L{static.FileTransfer} is deprecated.
1494 def test_deprecation(self):
1496 Instantiation of L{FileTransfer} produces a deprecation warning.
1498 static.FileTransfer(StringIO.StringIO(), 0, DummyRequest([]))
1499 warnings = self.flushWarnings([self.test_deprecation])
1500 self.assertEqual(len(warnings), 1)
1501 self.assertEqual(warnings[0]['category'], DeprecationWarning)
1503 warnings[0]['message'],
1504 'FileTransfer is deprecated since Twisted 9.0. '
1505 'Use a subclass of StaticProducer instead.')