Initial import to Tizen
[profile/ivi/python-twisted.git] / twisted / web / test / test_static.py
1 # Copyright (c) Twisted Matrix Laboratories.
2 # See LICENSE for details.
3
4 """
5 Tests for L{twisted.web.static}.
6 """
7
8 import os, re, StringIO
9
10 from zope.interface.verify import verifyObject
11
12 from twisted.internet import abstract, interfaces
13 from twisted.python.compat import set
14 from twisted.python.runtime import platform
15 from twisted.python.filepath import FilePath
16 from twisted.python import log
17 from twisted.trial.unittest import TestCase
18 from twisted.web import static, http, script, resource
19 from twisted.web.server import UnsupportedMethod
20 from twisted.web.test.test_web import DummyRequest
21 from twisted.web.test._util import _render
22
23
24 class StaticDataTests(TestCase):
25     """
26     Tests for L{Data}.
27     """
28     def test_headRequest(self):
29         """
30         L{Data.render} returns an empty response body for a I{HEAD} request.
31         """
32         data = static.Data("foo", "bar")
33         request = DummyRequest([''])
34         request.method = 'HEAD'
35         d = _render(data, request)
36         def cbRendered(ignored):
37             self.assertEqual(''.join(request.written), "")
38         d.addCallback(cbRendered)
39         return d
40
41
42     def test_invalidMethod(self):
43         """
44         L{Data.render} raises L{UnsupportedMethod} in response to a non-I{GET},
45         non-I{HEAD} request.
46         """
47         data = static.Data("foo", "bar")
48         request = DummyRequest([''])
49         request.method = 'POST'
50         self.assertRaises(UnsupportedMethod, data.render, request)
51
52
53
54 class StaticFileTests(TestCase):
55     """
56     Tests for the basic behavior of L{File}.
57     """
58     def _render(self, resource, request):
59         return _render(resource, request)
60
61
62     def test_invalidMethod(self):
63         """
64         L{File.render} raises L{UnsupportedMethod} in response to a non-I{GET},
65         non-I{HEAD} request.
66         """
67         request = DummyRequest([''])
68         request.method = 'POST'
69         path = FilePath(self.mktemp())
70         path.setContent("foo")
71         file = static.File(path.path)
72         self.assertRaises(UnsupportedMethod, file.render, request)
73
74
75     def test_notFound(self):
76         """
77         If a request is made which encounters a L{File} before a final segment
78         which does not correspond to any file in the path the L{File} was
79         created with, a not found response is sent.
80         """
81         base = FilePath(self.mktemp())
82         base.makedirs()
83         file = static.File(base.path)
84
85         request = DummyRequest(['foobar'])
86         child = resource.getChildForRequest(file, request)
87
88         d = self._render(child, request)
89         def cbRendered(ignored):
90             self.assertEqual(request.responseCode, 404)
91         d.addCallback(cbRendered)
92         return d
93
94
95     def test_emptyChild(self):
96         """
97         The C{''} child of a L{File} which corresponds to a directory in the
98         filesystem is a L{DirectoryLister}.
99         """
100         base = FilePath(self.mktemp())
101         base.makedirs()
102         file = static.File(base.path)
103
104         request = DummyRequest([''])
105         child = resource.getChildForRequest(file, request)
106         self.assertIsInstance(child, static.DirectoryLister)
107         self.assertEqual(child.path, base.path)
108
109
110     def test_securityViolationNotFound(self):
111         """
112         If a request is made which encounters a L{File} before a final segment
113         which cannot be looked up in the filesystem due to security
114         considerations, a not found response is sent.
115         """
116         base = FilePath(self.mktemp())
117         base.makedirs()
118         file = static.File(base.path)
119
120         request = DummyRequest(['..'])
121         child = resource.getChildForRequest(file, request)
122
123         d = self._render(child, request)
124         def cbRendered(ignored):
125             self.assertEqual(request.responseCode, 404)
126         d.addCallback(cbRendered)
127         return d
128
129
130     def test_forbiddenResource(self):
131         """
132         If the file in the filesystem which would satisfy a request cannot be
133         read, L{File.render} sets the HTTP response code to I{FORBIDDEN}.
134         """
135         base = FilePath(self.mktemp())
136         base.setContent('')
137         # Make sure we can delete the file later.
138         self.addCleanup(base.chmod, 0700)
139
140         # Get rid of our own read permission.
141         base.chmod(0)
142
143         file = static.File(base.path)
144         request = DummyRequest([''])
145         d = self._render(file, request)
146         def cbRendered(ignored):
147             self.assertEqual(request.responseCode, 403)
148         d.addCallback(cbRendered)
149         return d
150     if platform.isWindows():
151         test_forbiddenResource.skip = "Cannot remove read permission on Windows"
152
153
154     def test_indexNames(self):
155         """
156         If a request is made which encounters a L{File} before a final empty
157         segment, a file in the L{File} instance's C{indexNames} list which
158         exists in the path the L{File} was created with is served as the
159         response to the request.
160         """
161         base = FilePath(self.mktemp())
162         base.makedirs()
163         base.child("foo.bar").setContent("baz")
164         file = static.File(base.path)
165         file.indexNames = ['foo.bar']
166
167         request = DummyRequest([''])
168         child = resource.getChildForRequest(file, request)
169
170         d = self._render(child, request)
171         def cbRendered(ignored):
172             self.assertEqual(''.join(request.written), 'baz')
173             self.assertEqual(request.outgoingHeaders['content-length'], '3')
174         d.addCallback(cbRendered)
175         return d
176
177
178     def test_staticFile(self):
179         """
180         If a request is made which encounters a L{File} before a final segment
181         which names a file in the path the L{File} was created with, that file
182         is served as the response to the request.
183         """
184         base = FilePath(self.mktemp())
185         base.makedirs()
186         base.child("foo.bar").setContent("baz")
187         file = static.File(base.path)
188
189         request = DummyRequest(['foo.bar'])
190         child = resource.getChildForRequest(file, request)
191
192         d = self._render(child, request)
193         def cbRendered(ignored):
194             self.assertEqual(''.join(request.written), 'baz')
195             self.assertEqual(request.outgoingHeaders['content-length'], '3')
196         d.addCallback(cbRendered)
197         return d
198
199
200     def test_staticFileDeletedGetChild(self):
201         """
202         A L{static.File} created for a directory which does not exist should
203         return childNotFound from L{static.File.getChild}.
204         """
205         staticFile = static.File(self.mktemp())
206         request = DummyRequest(['foo.bar'])
207         child = staticFile.getChild("foo.bar", request)
208         self.assertEqual(child, staticFile.childNotFound)
209
210
211     def test_staticFileDeletedRender(self):
212         """
213         A L{static.File} created for a file which does not exist should render
214         its C{childNotFound} page.
215         """
216         staticFile = static.File(self.mktemp())
217         request = DummyRequest(['foo.bar'])
218         request2 = DummyRequest(['foo.bar'])
219         d = self._render(staticFile, request)
220         d2 = self._render(staticFile.childNotFound, request2)
221         def cbRendered2(ignored):
222             def cbRendered(ignored):
223                 self.assertEqual(''.join(request.written),
224                                   ''.join(request2.written))
225             d.addCallback(cbRendered)
226             return d
227         d2.addCallback(cbRendered2)
228         return d2
229
230
231     def test_headRequest(self):
232         """
233         L{static.File.render} returns an empty response body for I{HEAD}
234         requests.
235         """
236         path = FilePath(self.mktemp())
237         path.setContent("foo")
238         file = static.File(path.path)
239         request = DummyRequest([''])
240         request.method = 'HEAD'
241         d = _render(file, request)
242         def cbRendered(ignored):
243             self.assertEqual("".join(request.written), "")
244         d.addCallback(cbRendered)
245         return d
246
247
248     def test_processors(self):
249         """
250         If a request is made which encounters a L{File} before a final segment
251         which names a file with an extension which is in the L{File}'s
252         C{processors} mapping, the processor associated with that extension is
253         used to serve the response to the request.
254         """
255         base = FilePath(self.mktemp())
256         base.makedirs()
257         base.child("foo.bar").setContent(
258             "from twisted.web.static import Data\n"
259             "resource = Data('dynamic world','text/plain')\n")
260
261         file = static.File(base.path)
262         file.processors = {'.bar': script.ResourceScript}
263         request = DummyRequest(["foo.bar"])
264         child = resource.getChildForRequest(file, request)
265
266         d = self._render(child, request)
267         def cbRendered(ignored):
268             self.assertEqual(''.join(request.written), 'dynamic world')
269             self.assertEqual(request.outgoingHeaders['content-length'], '13')
270         d.addCallback(cbRendered)
271         return d
272
273
274     def test_ignoreExt(self):
275         """
276         The list of ignored extensions can be set by passing a value to
277         L{File.__init__} or by calling L{File.ignoreExt} later.
278         """
279         file = static.File(".")
280         self.assertEqual(file.ignoredExts, [])
281         file.ignoreExt(".foo")
282         file.ignoreExt(".bar")
283         self.assertEqual(file.ignoredExts, [".foo", ".bar"])
284
285         file = static.File(".", ignoredExts=(".bar", ".baz"))
286         self.assertEqual(file.ignoredExts, [".bar", ".baz"])
287
288
289     def test_ignoredExtensionsIgnored(self):
290         """
291         A request for the I{base} child of a L{File} succeeds with a resource
292         for the I{base<extension>} file in the path the L{File} was created
293         with if such a file exists and the L{File} has been configured to
294         ignore the I{<extension>} extension.
295         """
296         base = FilePath(self.mktemp())
297         base.makedirs()
298         base.child('foo.bar').setContent('baz')
299         base.child('foo.quux').setContent('foobar')
300         file = static.File(base.path, ignoredExts=(".bar",))
301
302         request = DummyRequest(["foo"])
303         child = resource.getChildForRequest(file, request)
304
305         d = self._render(child, request)
306         def cbRendered(ignored):
307             self.assertEqual(''.join(request.written), 'baz')
308         d.addCallback(cbRendered)
309         return d
310
311
312
313 class StaticMakeProducerTests(TestCase):
314     """
315     Tests for L{File.makeProducer}.
316     """
317
318
319     def makeResourceWithContent(self, content, type=None, encoding=None):
320         """
321         Make a L{static.File} resource that has C{content} for its content.
322
323         @param content: The bytes to use as the contents of the resource.
324         @param type: Optional value for the content type of the resource.
325         """
326         fileName = self.mktemp()
327         fileObject = open(fileName, 'w')
328         fileObject.write(content)
329         fileObject.close()
330         resource = static.File(fileName)
331         resource.encoding = encoding
332         resource.type = type
333         return resource
334
335
336     def contentHeaders(self, request):
337         """
338         Extract the content-* headers from the L{DummyRequest} C{request}.
339
340         This returns the subset of C{request.outgoingHeaders} of headers that
341         start with 'content-'.
342         """
343         contentHeaders = {}
344         for k, v in request.outgoingHeaders.iteritems():
345             if k.startswith('content-'):
346                 contentHeaders[k] = v
347         return contentHeaders
348
349
350     def test_noRangeHeaderGivesNoRangeStaticProducer(self):
351         """
352         makeProducer when no Range header is set returns an instance of
353         NoRangeStaticProducer.
354         """
355         resource = self.makeResourceWithContent('')
356         request = DummyRequest([])
357         producer = resource.makeProducer(request, resource.openForReading())
358         self.assertIsInstance(producer, static.NoRangeStaticProducer)
359
360
361     def test_noRangeHeaderSets200OK(self):
362         """
363         makeProducer when no Range header is set sets the responseCode on the
364         request to 'OK'.
365         """
366         resource = self.makeResourceWithContent('')
367         request = DummyRequest([])
368         resource.makeProducer(request, resource.openForReading())
369         self.assertEqual(http.OK, request.responseCode)
370
371
372     def test_noRangeHeaderSetsContentHeaders(self):
373         """
374         makeProducer when no Range header is set sets the Content-* headers
375         for the response.
376         """
377         length = 123
378         contentType = "text/plain"
379         contentEncoding = 'gzip'
380         resource = self.makeResourceWithContent(
381             'a'*length, type=contentType, encoding=contentEncoding)
382         request = DummyRequest([])
383         resource.makeProducer(request, resource.openForReading())
384         self.assertEqual(
385             {'content-type': contentType, 'content-length': str(length),
386              'content-encoding': contentEncoding},
387             self.contentHeaders(request))
388
389
390     def test_singleRangeGivesSingleRangeStaticProducer(self):
391         """
392         makeProducer when the Range header requests a single byte range
393         returns an instance of SingleRangeStaticProducer.
394         """
395         request = DummyRequest([])
396         request.headers['range'] = 'bytes=1-3'
397         resource = self.makeResourceWithContent('abcdef')
398         producer = resource.makeProducer(request, resource.openForReading())
399         self.assertIsInstance(producer, static.SingleRangeStaticProducer)
400
401
402     def test_singleRangeSets206PartialContent(self):
403         """
404         makeProducer when the Range header requests a single, satisfiable byte
405         range sets the response code on the request to 'Partial Content'.
406         """
407         request = DummyRequest([])
408         request.headers['range'] = 'bytes=1-3'
409         resource = self.makeResourceWithContent('abcdef')
410         resource.makeProducer(request, resource.openForReading())
411         self.assertEqual(
412             http.PARTIAL_CONTENT, request.responseCode)
413
414
415     def test_singleRangeSetsContentHeaders(self):
416         """
417         makeProducer when the Range header requests a single, satisfiable byte
418         range sets the Content-* headers appropriately.
419         """
420         request = DummyRequest([])
421         request.headers['range'] = 'bytes=1-3'
422         contentType = "text/plain"
423         contentEncoding = 'gzip'
424         resource = self.makeResourceWithContent('abcdef', type=contentType, encoding=contentEncoding)
425         resource.makeProducer(request, resource.openForReading())
426         self.assertEqual(
427             {'content-type': contentType, 'content-encoding': contentEncoding,
428              'content-range': 'bytes 1-3/6', 'content-length': '3'},
429             self.contentHeaders(request))
430
431
432     def test_singleUnsatisfiableRangeReturnsSingleRangeStaticProducer(self):
433         """
434         makeProducer still returns an instance of L{SingleRangeStaticProducer}
435         when the Range header requests a single unsatisfiable byte range.
436         """
437         request = DummyRequest([])
438         request.headers['range'] = 'bytes=4-10'
439         resource = self.makeResourceWithContent('abc')
440         producer = resource.makeProducer(request, resource.openForReading())
441         self.assertIsInstance(producer, static.SingleRangeStaticProducer)
442
443
444     def test_singleUnsatisfiableRangeSets416ReqestedRangeNotSatisfiable(self):
445         """
446         makeProducer sets the response code of the request to of 'Requested
447         Range Not Satisfiable' when the Range header requests a single
448         unsatisfiable byte range.
449         """
450         request = DummyRequest([])
451         request.headers['range'] = 'bytes=4-10'
452         resource = self.makeResourceWithContent('abc')
453         resource.makeProducer(request, resource.openForReading())
454         self.assertEqual(
455             http.REQUESTED_RANGE_NOT_SATISFIABLE, request.responseCode)
456
457
458     def test_singleUnsatisfiableRangeSetsContentHeaders(self):
459         """
460         makeProducer when the Range header requests a single, unsatisfiable
461         byte range sets the Content-* headers appropriately.
462         """
463         request = DummyRequest([])
464         request.headers['range'] = 'bytes=4-10'
465         contentType = "text/plain"
466         resource = self.makeResourceWithContent('abc', type=contentType)
467         resource.makeProducer(request, resource.openForReading())
468         self.assertEqual(
469             {'content-type': 'text/plain', 'content-length': '0',
470              'content-range': 'bytes */3'},
471             self.contentHeaders(request))
472
473
474     def test_singlePartiallyOverlappingRangeSetsContentHeaders(self):
475         """
476         makeProducer when the Range header requests a single byte range that
477         partly overlaps the resource sets the Content-* headers appropriately.
478         """
479         request = DummyRequest([])
480         request.headers['range'] = 'bytes=2-10'
481         contentType = "text/plain"
482         resource = self.makeResourceWithContent('abc', type=contentType)
483         resource.makeProducer(request, resource.openForReading())
484         self.assertEqual(
485             {'content-type': 'text/plain', 'content-length': '1',
486              'content-range': 'bytes 2-2/3'},
487             self.contentHeaders(request))
488
489
490     def test_multipleRangeGivesMultipleRangeStaticProducer(self):
491         """
492         makeProducer when the Range header requests a single byte range
493         returns an instance of MultipleRangeStaticProducer.
494         """
495         request = DummyRequest([])
496         request.headers['range'] = 'bytes=1-3,5-6'
497         resource = self.makeResourceWithContent('abcdef')
498         producer = resource.makeProducer(request, resource.openForReading())
499         self.assertIsInstance(producer, static.MultipleRangeStaticProducer)
500
501
502     def test_multipleRangeSets206PartialContent(self):
503         """
504         makeProducer when the Range header requests a multiple satisfiable
505         byte ranges sets the response code on the request to 'Partial
506         Content'.
507         """
508         request = DummyRequest([])
509         request.headers['range'] = 'bytes=1-3,5-6'
510         resource = self.makeResourceWithContent('abcdef')
511         resource.makeProducer(request, resource.openForReading())
512         self.assertEqual(
513             http.PARTIAL_CONTENT, request.responseCode)
514
515
516     def test_mutipleRangeSetsContentHeaders(self):
517         """
518         makeProducer when the Range header requests a single, satisfiable byte
519         range sets the Content-* headers appropriately.
520         """
521         request = DummyRequest([])
522         request.headers['range'] = 'bytes=1-3,5-6'
523         resource = self.makeResourceWithContent(
524             'abcdefghijkl', encoding='gzip')
525         producer = resource.makeProducer(request, resource.openForReading())
526         contentHeaders = self.contentHeaders(request)
527         # The only content-* headers set are content-type and content-length.
528         self.assertEqual(
529             set(['content-length', 'content-type']),
530             set(contentHeaders.keys()))
531         # The content-length depends on the boundary used in the response.
532         expectedLength = 5
533         for boundary, offset, size in producer.rangeInfo:
534             expectedLength += len(boundary)
535         self.assertEqual(expectedLength, contentHeaders['content-length'])
536         # Content-type should be set to a value indicating a multipart
537         # response and the boundary used to separate the parts.
538         self.assertIn('content-type', contentHeaders)
539         contentType = contentHeaders['content-type']
540         self.assertNotIdentical(
541             None, re.match(
542                 'multipart/byteranges; boundary="[^"]*"\Z', contentType))
543         # Content-encoding is not set in the response to a multiple range
544         # response, which is a bit wussy but works well enough with the way
545         # static.File does content-encodings...
546         self.assertNotIn('content-encoding', contentHeaders)
547
548
549     def test_multipleUnsatisfiableRangesReturnsMultipleRangeStaticProducer(self):
550         """
551         makeProducer still returns an instance of L{SingleRangeStaticProducer}
552         when the Range header requests multiple ranges, none of which are
553         satisfiable.
554         """
555         request = DummyRequest([])
556         request.headers['range'] = 'bytes=10-12,15-20'
557         resource = self.makeResourceWithContent('abc')
558         producer = resource.makeProducer(request, resource.openForReading())
559         self.assertIsInstance(producer, static.MultipleRangeStaticProducer)
560
561
562     def test_multipleUnsatisfiableRangesSets416ReqestedRangeNotSatisfiable(self):
563         """
564         makeProducer sets the response code of the request to of 'Requested
565         Range Not Satisfiable' when the Range header requests multiple ranges,
566         none of which are satisfiable.
567         """
568         request = DummyRequest([])
569         request.headers['range'] = 'bytes=10-12,15-20'
570         resource = self.makeResourceWithContent('abc')
571         resource.makeProducer(request, resource.openForReading())
572         self.assertEqual(
573             http.REQUESTED_RANGE_NOT_SATISFIABLE, request.responseCode)
574
575
576     def test_multipleUnsatisfiableRangeSetsContentHeaders(self):
577         """
578         makeProducer when the Range header requests multiple ranges, none of
579         which are satisfiable, sets the Content-* headers appropriately.
580         """
581         request = DummyRequest([])
582         request.headers['range'] = 'bytes=4-10'
583         contentType = "text/plain"
584         request.headers['range'] = 'bytes=10-12,15-20'
585         resource = self.makeResourceWithContent('abc', type=contentType)
586         resource.makeProducer(request, resource.openForReading())
587         self.assertEqual(
588             {'content-length': '0', 'content-range': 'bytes */3'},
589             self.contentHeaders(request))
590
591
592     def test_oneSatisfiableRangeIsEnough(self):
593         """
594         makeProducer when the Range header requests multiple ranges, at least
595         one of which matches, sets the response code to 'Partial Content'.
596         """
597         request = DummyRequest([])
598         request.headers['range'] = 'bytes=1-3,100-200'
599         resource = self.makeResourceWithContent('abcdef')
600         resource.makeProducer(request, resource.openForReading())
601         self.assertEqual(
602             http.PARTIAL_CONTENT, request.responseCode)
603
604
605
606 class StaticProducerTests(TestCase):
607     """
608     Tests for the abstract L{StaticProducer}.
609     """
610
611     def test_stopProducingClosesFile(self):
612         """
613         L{StaticProducer.stopProducing} closes the file object the producer is
614         producing data from.
615         """
616         fileObject = StringIO.StringIO()
617         producer = static.StaticProducer(None, fileObject)
618         producer.stopProducing()
619         self.assertTrue(fileObject.closed)
620
621
622     def test_stopProducingSetsRequestToNone(self):
623         """
624         L{StaticProducer.stopProducing} sets the request instance variable to
625         None, which indicates to subclasses' resumeProducing methods that no
626         more data should be produced.
627         """
628         fileObject = StringIO.StringIO()
629         producer = static.StaticProducer(DummyRequest([]), fileObject)
630         producer.stopProducing()
631         self.assertIdentical(None, producer.request)
632
633
634
635 class NoRangeStaticProducerTests(TestCase):
636     """
637     Tests for L{NoRangeStaticProducer}.
638     """
639
640     def test_implementsIPullProducer(self):
641         """
642         L{NoRangeStaticProducer} implements L{IPullProducer}.
643         """
644         verifyObject(
645             interfaces.IPullProducer,
646             static.NoRangeStaticProducer(None, None))
647
648
649     def test_resumeProducingProducesContent(self):
650         """
651         L{NoRangeStaticProducer.resumeProducing} writes content from the
652         resource to the request.
653         """
654         request = DummyRequest([])
655         content = 'abcdef'
656         producer = static.NoRangeStaticProducer(
657             request, StringIO.StringIO(content))
658         # start calls registerProducer on the DummyRequest, which pulls all
659         # output from the producer and so we just need this one call.
660         producer.start()
661         self.assertEqual(content, ''.join(request.written))
662
663
664     def test_resumeProducingBuffersOutput(self):
665         """
666         L{NoRangeStaticProducer.start} writes at most
667         C{abstract.FileDescriptor.bufferSize} bytes of content from the
668         resource to the request at once.
669         """
670         request = DummyRequest([])
671         bufferSize = abstract.FileDescriptor.bufferSize
672         content = 'a' * (2*bufferSize + 1)
673         producer = static.NoRangeStaticProducer(
674             request, StringIO.StringIO(content))
675         # start calls registerProducer on the DummyRequest, which pulls all
676         # output from the producer and so we just need this one call.
677         producer.start()
678         expected = [
679             content[0:bufferSize],
680             content[bufferSize:2*bufferSize],
681             content[2*bufferSize:]
682             ]
683         self.assertEqual(expected, request.written)
684
685
686     def test_finishCalledWhenDone(self):
687         """
688         L{NoRangeStaticProducer.resumeProducing} calls finish() on the request
689         after it is done producing content.
690         """
691         request = DummyRequest([])
692         finishDeferred = request.notifyFinish()
693         callbackList = []
694         finishDeferred.addCallback(callbackList.append)
695         producer = static.NoRangeStaticProducer(
696             request, StringIO.StringIO('abcdef'))
697         # start calls registerProducer on the DummyRequest, which pulls all
698         # output from the producer and so we just need this one call.
699         producer.start()
700         self.assertEqual([None], callbackList)
701
702
703
704 class SingleRangeStaticProducerTests(TestCase):
705     """
706     Tests for L{SingleRangeStaticProducer}.
707     """
708
709     def test_implementsIPullProducer(self):
710         """
711         L{SingleRangeStaticProducer} implements L{IPullProducer}.
712         """
713         verifyObject(
714             interfaces.IPullProducer,
715             static.SingleRangeStaticProducer(None, None, None, None))
716
717
718     def test_resumeProducingProducesContent(self):
719         """
720         L{SingleRangeStaticProducer.resumeProducing} writes the given amount
721         of content, starting at the given offset, from the resource to the
722         request.
723         """
724         request = DummyRequest([])
725         content = 'abcdef'
726         producer = static.SingleRangeStaticProducer(
727             request, StringIO.StringIO(content), 1, 3)
728         # DummyRequest.registerProducer pulls all output from the producer, so
729         # we just need to call start.
730         producer.start()
731         self.assertEqual(content[1:4], ''.join(request.written))
732
733
734     def test_resumeProducingBuffersOutput(self):
735         """
736         L{SingleRangeStaticProducer.start} writes at most
737         C{abstract.FileDescriptor.bufferSize} bytes of content from the
738         resource to the request at once.
739         """
740         request = DummyRequest([])
741         bufferSize = abstract.FileDescriptor.bufferSize
742         content = 'abc' * bufferSize
743         producer = static.SingleRangeStaticProducer(
744             request, StringIO.StringIO(content), 1, bufferSize+10)
745         # DummyRequest.registerProducer pulls all output from the producer, so
746         # we just need to call start.
747         producer.start()
748         expected = [
749             content[1:bufferSize+1],
750             content[bufferSize+1:bufferSize+11],
751             ]
752         self.assertEqual(expected, request.written)
753
754
755     def test_finishCalledWhenDone(self):
756         """
757         L{SingleRangeStaticProducer.resumeProducing} calls finish() on the
758         request after it is done producing content.
759         """
760         request = DummyRequest([])
761         finishDeferred = request.notifyFinish()
762         callbackList = []
763         finishDeferred.addCallback(callbackList.append)
764         producer = static.SingleRangeStaticProducer(
765             request, StringIO.StringIO('abcdef'), 1, 1)
766         # start calls registerProducer on the DummyRequest, which pulls all
767         # output from the producer and so we just need this one call.
768         producer.start()
769         self.assertEqual([None], callbackList)
770
771
772
773 class MultipleRangeStaticProducerTests(TestCase):
774     """
775     Tests for L{MultipleRangeStaticProducer}.
776     """
777
778     def test_implementsIPullProducer(self):
779         """
780         L{MultipleRangeStaticProducer} implements L{IPullProducer}.
781         """
782         verifyObject(
783             interfaces.IPullProducer,
784             static.MultipleRangeStaticProducer(None, None, None))
785
786
787     def test_resumeProducingProducesContent(self):
788         """
789         L{MultipleRangeStaticProducer.resumeProducing} writes the requested
790         chunks of content from the resource to the request, with the supplied
791         boundaries in between each chunk.
792         """
793         request = DummyRequest([])
794         content = 'abcdef'
795         producer = static.MultipleRangeStaticProducer(
796             request, StringIO.StringIO(content), [('1', 1, 3), ('2', 5, 1)])
797         # DummyRequest.registerProducer pulls all output from the producer, so
798         # we just need to call start.
799         producer.start()
800         self.assertEqual('1bcd2f', ''.join(request.written))
801
802
803     def test_resumeProducingBuffersOutput(self):
804         """
805         L{MultipleRangeStaticProducer.start} writes about
806         C{abstract.FileDescriptor.bufferSize} bytes of content from the
807         resource to the request at once.
808
809         To be specific about the 'about' above: it can write slightly more,
810         for example in the case where the first boundary plus the first chunk
811         is less than C{bufferSize} but first boundary plus the first chunk
812         plus the second boundary is more, but this is unimportant as in
813         practice the boundaries are fairly small.  On the other side, it is
814         important for performance to bundle up several small chunks into one
815         call to request.write.
816         """
817         request = DummyRequest([])
818         content = '0123456789' * 2
819         producer = static.MultipleRangeStaticProducer(
820             request, StringIO.StringIO(content),
821             [('a', 0, 2), ('b', 5, 10), ('c', 0, 0)])
822         producer.bufferSize = 10
823         # DummyRequest.registerProducer pulls all output from the producer, so
824         # we just need to call start.
825         producer.start()
826         expected = [
827             'a' + content[0:2] + 'b' + content[5:11],
828             content[11:15] + 'c',
829             ]
830         self.assertEqual(expected, request.written)
831
832
833     def test_finishCalledWhenDone(self):
834         """
835         L{MultipleRangeStaticProducer.resumeProducing} calls finish() on the
836         request after it is done producing content.
837         """
838         request = DummyRequest([])
839         finishDeferred = request.notifyFinish()
840         callbackList = []
841         finishDeferred.addCallback(callbackList.append)
842         producer = static.MultipleRangeStaticProducer(
843             request, StringIO.StringIO('abcdef'), [('', 1, 2)])
844         # start calls registerProducer on the DummyRequest, which pulls all
845         # output from the producer and so we just need this one call.
846         producer.start()
847         self.assertEqual([None], callbackList)
848
849
850
851 class RangeTests(TestCase):
852     """
853     Tests for I{Range-Header} support in L{twisted.web.static.File}.
854
855     @type file: L{file}
856     @ivar file: Temporary (binary) file containing the content to be served.
857
858     @type resource: L{static.File}
859     @ivar resource: A leaf web resource using C{file} as content.
860
861     @type request: L{DummyRequest}
862     @ivar request: A fake request, requesting C{resource}.
863
864     @type catcher: L{list}
865     @ivar catcher: List which gathers all log information.
866     """
867     def setUp(self):
868         """
869         Create a temporary file with a fixed payload of 64 bytes.  Create a
870         resource for that file and create a request which will be for that
871         resource.  Each test can set a different range header to test different
872         aspects of the implementation.
873         """
874         path = FilePath(self.mktemp())
875         # This is just a jumble of random stuff.  It's supposed to be a good
876         # set of data for this test, particularly in order to avoid
877         # accidentally seeing the right result by having a byte sequence
878         # repeated at different locations or by having byte values which are
879         # somehow correlated with their position in the string.
880         self.payload = ('\xf8u\xf3E\x8c7\xce\x00\x9e\xb6a0y0S\xf0\xef\xac\xb7'
881                         '\xbe\xb5\x17M\x1e\x136k{\x1e\xbe\x0c\x07\x07\t\xd0'
882                         '\xbckY\xf5I\x0b\xb8\x88oZ\x1d\x85b\x1a\xcdk\xf2\x1d'
883                         '&\xfd%\xdd\x82q/A\x10Y\x8b')
884         path.setContent(self.payload)
885         self.file = path.open()
886         self.resource = static.File(self.file.name)
887         self.resource.isLeaf = 1
888         self.request = DummyRequest([''])
889         self.request.uri = self.file.name
890         self.catcher = []
891         log.addObserver(self.catcher.append)
892
893
894     def tearDown(self):
895         """
896         Clean up the resource file and the log observer.
897         """
898         self.file.close()
899         log.removeObserver(self.catcher.append)
900
901
902     def _assertLogged(self, expected):
903         """
904         Asserts that a given log message occurred with an expected message.
905         """
906         logItem = self.catcher.pop()
907         self.assertEqual(logItem["message"][0], expected)
908         self.assertEqual(
909             self.catcher, [], "An additional log occured: %r" % (logItem,))
910
911
912     def test_invalidRanges(self):
913         """
914         L{File._parseRangeHeader} raises L{ValueError} when passed
915         syntactically invalid byte ranges.
916         """
917         f = self.resource._parseRangeHeader
918
919         # there's no =
920         self.assertRaises(ValueError, f, 'bytes')
921
922         # unknown isn't a valid Bytes-Unit
923         self.assertRaises(ValueError, f, 'unknown=1-2')
924
925         # there's no - in =stuff
926         self.assertRaises(ValueError, f, 'bytes=3')
927
928         # both start and end are empty
929         self.assertRaises(ValueError, f, 'bytes=-')
930
931         # start isn't an integer
932         self.assertRaises(ValueError, f, 'bytes=foo-')
933
934         # end isn't an integer
935         self.assertRaises(ValueError, f, 'bytes=-foo')
936
937         # end isn't equal to or greater than start
938         self.assertRaises(ValueError, f, 'bytes=5-4')
939
940
941     def test_rangeMissingStop(self):
942         """
943         A single bytes range without an explicit stop position is parsed into a
944         two-tuple giving the start position and C{None}.
945         """
946         self.assertEqual(
947             self.resource._parseRangeHeader('bytes=0-'), [(0, None)])
948
949
950     def test_rangeMissingStart(self):
951         """
952         A single bytes range without an explicit start position is parsed into
953         a two-tuple of C{None} and the end position.
954         """
955         self.assertEqual(
956             self.resource._parseRangeHeader('bytes=-3'), [(None, 3)])
957
958
959     def test_range(self):
960         """
961         A single bytes range with explicit start and stop positions is parsed
962         into a two-tuple of those positions.
963         """
964         self.assertEqual(
965             self.resource._parseRangeHeader('bytes=2-5'), [(2, 5)])
966
967
968     def test_rangeWithSpace(self):
969         """
970         A single bytes range with whitespace in allowed places is parsed in
971         the same way as it would be without the whitespace.
972         """
973         self.assertEqual(
974             self.resource._parseRangeHeader(' bytes=1-2 '), [(1, 2)])
975         self.assertEqual(
976             self.resource._parseRangeHeader('bytes =1-2 '), [(1, 2)])
977         self.assertEqual(
978             self.resource._parseRangeHeader('bytes= 1-2'), [(1, 2)])
979         self.assertEqual(
980             self.resource._parseRangeHeader('bytes=1 -2'), [(1, 2)])
981         self.assertEqual(
982             self.resource._parseRangeHeader('bytes=1- 2'), [(1, 2)])
983         self.assertEqual(
984             self.resource._parseRangeHeader('bytes=1-2 '), [(1, 2)])
985
986
987     def test_nullRangeElements(self):
988         """
989         If there are multiple byte ranges but only one is non-null, the
990         non-null range is parsed and its start and stop returned.
991         """
992         self.assertEqual(
993             self.resource._parseRangeHeader('bytes=1-2,\r\n, ,\t'), [(1, 2)])
994
995
996     def test_multipleRanges(self):
997         """
998         If multiple byte ranges are specified their starts and stops are
999         returned.
1000         """
1001         self.assertEqual(
1002             self.resource._parseRangeHeader('bytes=1-2,3-4'),
1003             [(1, 2), (3, 4)])
1004
1005
1006     def test_bodyLength(self):
1007         """
1008         A correct response to a range request is as long as the length of the
1009         requested range.
1010         """
1011         self.request.headers['range'] = 'bytes=0-43'
1012         self.resource.render(self.request)
1013         self.assertEqual(len(''.join(self.request.written)), 44)
1014
1015
1016     def test_invalidRangeRequest(self):
1017         """
1018         An incorrect range request (RFC 2616 defines a correct range request as
1019         a Bytes-Unit followed by a '=' character followed by a specific range.
1020         Only 'bytes' is defined) results in the range header value being logged
1021         and a normal 200 response being sent.
1022         """
1023         self.request.headers['range'] = range = 'foobar=0-43'
1024         self.resource.render(self.request)
1025         expected = "Ignoring malformed Range header %r" % (range,)
1026         self._assertLogged(expected)
1027         self.assertEqual(''.join(self.request.written), self.payload)
1028         self.assertEqual(self.request.responseCode, http.OK)
1029         self.assertEqual(
1030             self.request.outgoingHeaders['content-length'],
1031             str(len(self.payload)))
1032
1033
1034     def parseMultipartBody(self, body, boundary):
1035         """
1036         Parse C{body} as a multipart MIME response separated by C{boundary}.
1037
1038         Note that this with fail the calling test on certain syntactic
1039         problems.
1040         """
1041         sep = "\r\n--" + boundary
1042         parts = ''.join(body).split(sep)
1043         self.assertEqual('', parts[0])
1044         self.assertEqual('--\r\n', parts[-1])
1045         parsed_parts = []
1046         for part in parts[1:-1]:
1047             before, header1, header2, blank, partBody = part.split('\r\n', 4)
1048             headers = header1 + '\n' + header2
1049             self.assertEqual('', before)
1050             self.assertEqual('', blank)
1051             partContentTypeValue = re.search(
1052                 '^content-type: (.*)$', headers, re.I|re.M).group(1)
1053             start, end, size = re.search(
1054                 '^content-range: bytes ([0-9]+)-([0-9]+)/([0-9]+)$',
1055                 headers, re.I|re.M).groups()
1056             parsed_parts.append(
1057                 {'contentType': partContentTypeValue,
1058                  'contentRange': (start, end, size),
1059                  'body': partBody})
1060         return parsed_parts
1061
1062
1063     def test_multipleRangeRequest(self):
1064         """
1065         The response to a request for multipe bytes ranges is a MIME-ish
1066         multipart response.
1067         """
1068         startEnds = [(0, 2), (20, 30), (40, 50)]
1069         rangeHeaderValue = ','.join(["%s-%s"%(s,e) for (s, e) in startEnds])
1070         self.request.headers['range'] = 'bytes=' + rangeHeaderValue
1071         self.resource.render(self.request)
1072         self.assertEqual(self.request.responseCode, http.PARTIAL_CONTENT)
1073         boundary = re.match(
1074             '^multipart/byteranges; boundary="(.*)"$',
1075             self.request.outgoingHeaders['content-type']).group(1)
1076         parts = self.parseMultipartBody(''.join(self.request.written), boundary)
1077         self.assertEqual(len(startEnds), len(parts))
1078         for part, (s, e) in zip(parts, startEnds):
1079             self.assertEqual(self.resource.type, part['contentType'])
1080             start, end, size = part['contentRange']
1081             self.assertEqual(int(start), s)
1082             self.assertEqual(int(end), e)
1083             self.assertEqual(int(size), self.resource.getFileSize())
1084             self.assertEqual(self.payload[s:e+1], part['body'])
1085
1086
1087     def test_multipleRangeRequestWithRangeOverlappingEnd(self):
1088         """
1089         The response to a request for multipe bytes ranges is a MIME-ish
1090         multipart response, even when one of the ranged falls off the end of
1091         the resource.
1092         """
1093         startEnds = [(0, 2), (40, len(self.payload) + 10)]
1094         rangeHeaderValue = ','.join(["%s-%s"%(s,e) for (s, e) in startEnds])
1095         self.request.headers['range'] = 'bytes=' + rangeHeaderValue
1096         self.resource.render(self.request)
1097         self.assertEqual(self.request.responseCode, http.PARTIAL_CONTENT)
1098         boundary = re.match(
1099             '^multipart/byteranges; boundary="(.*)"$',
1100             self.request.outgoingHeaders['content-type']).group(1)
1101         parts = self.parseMultipartBody(''.join(self.request.written), boundary)
1102         self.assertEqual(len(startEnds), len(parts))
1103         for part, (s, e) in zip(parts, startEnds):
1104             self.assertEqual(self.resource.type, part['contentType'])
1105             start, end, size = part['contentRange']
1106             self.assertEqual(int(start), s)
1107             self.assertEqual(int(end), min(e, self.resource.getFileSize()-1))
1108             self.assertEqual(int(size), self.resource.getFileSize())
1109             self.assertEqual(self.payload[s:e+1], part['body'])
1110
1111
1112     def test_implicitEnd(self):
1113         """
1114         If the end byte position is omitted, then it is treated as if the
1115         length of the resource was specified by the end byte position.
1116         """
1117         self.request.headers['range'] = 'bytes=23-'
1118         self.resource.render(self.request)
1119         self.assertEqual(''.join(self.request.written), self.payload[23:])
1120         self.assertEqual(len(''.join(self.request.written)), 41)
1121         self.assertEqual(self.request.responseCode, http.PARTIAL_CONTENT)
1122         self.assertEqual(
1123             self.request.outgoingHeaders['content-range'], 'bytes 23-63/64')
1124         self.assertEqual(self.request.outgoingHeaders['content-length'], '41')
1125
1126
1127     def test_implicitStart(self):
1128         """
1129         If the start byte position is omitted but the end byte position is
1130         supplied, then the range is treated as requesting the last -N bytes of
1131         the resource, where N is the end byte position.
1132         """
1133         self.request.headers['range'] = 'bytes=-17'
1134         self.resource.render(self.request)
1135         self.assertEqual(''.join(self.request.written), self.payload[-17:])
1136         self.assertEqual(len(''.join(self.request.written)), 17)
1137         self.assertEqual(self.request.responseCode, http.PARTIAL_CONTENT)
1138         self.assertEqual(
1139             self.request.outgoingHeaders['content-range'], 'bytes 47-63/64')
1140         self.assertEqual(self.request.outgoingHeaders['content-length'], '17')
1141
1142
1143     def test_explicitRange(self):
1144         """
1145         A correct response to a bytes range header request from A to B starts
1146         with the A'th byte and ends with (including) the B'th byte. The first
1147         byte of a page is numbered with 0.
1148         """
1149         self.request.headers['range'] = 'bytes=3-43'
1150         self.resource.render(self.request)
1151         written = ''.join(self.request.written)
1152         self.assertEqual(written, self.payload[3:44])
1153         self.assertEqual(self.request.responseCode, http.PARTIAL_CONTENT)
1154         self.assertEqual(
1155             self.request.outgoingHeaders['content-range'], 'bytes 3-43/64')
1156         self.assertEqual(
1157             str(len(written)), self.request.outgoingHeaders['content-length'])
1158
1159
1160     def test_explicitRangeOverlappingEnd(self):
1161         """
1162         A correct response to a bytes range header request from A to B when B
1163         is past the end of the resource starts with the A'th byte and ends
1164         with the last byte of the resource. The first byte of a page is
1165         numbered with 0.
1166         """
1167         self.request.headers['range'] = 'bytes=40-100'
1168         self.resource.render(self.request)
1169         written = ''.join(self.request.written)
1170         self.assertEqual(written, self.payload[40:])
1171         self.assertEqual(self.request.responseCode, http.PARTIAL_CONTENT)
1172         self.assertEqual(
1173             self.request.outgoingHeaders['content-range'], 'bytes 40-63/64')
1174         self.assertEqual(
1175             str(len(written)), self.request.outgoingHeaders['content-length'])
1176
1177
1178     def test_statusCodeRequestedRangeNotSatisfiable(self):
1179         """
1180         If a range is syntactically invalid due to the start being greater than
1181         the end, the range header is ignored (the request is responded to as if
1182         it were not present).
1183         """
1184         self.request.headers['range'] = 'bytes=20-13'
1185         self.resource.render(self.request)
1186         self.assertEqual(self.request.responseCode, http.OK)
1187         self.assertEqual(''.join(self.request.written), self.payload)
1188         self.assertEqual(
1189             self.request.outgoingHeaders['content-length'],
1190             str(len(self.payload)))
1191
1192
1193     def test_invalidStartBytePos(self):
1194         """
1195         If a range is unsatisfiable due to the start not being less than the
1196         length of the resource, the response is 416 (Requested range not
1197         satisfiable) and no data is written to the response body (RFC 2616,
1198         section 14.35.1).
1199         """
1200         self.request.headers['range'] = 'bytes=67-108'
1201         self.resource.render(self.request)
1202         self.assertEqual(
1203             self.request.responseCode, http.REQUESTED_RANGE_NOT_SATISFIABLE)
1204         self.assertEqual(''.join(self.request.written), '')
1205         self.assertEqual(self.request.outgoingHeaders['content-length'], '0')
1206         # Sections 10.4.17 and 14.16
1207         self.assertEqual(
1208             self.request.outgoingHeaders['content-range'],
1209             'bytes */%d' % (len(self.payload),))
1210
1211
1212
1213 class DirectoryListerTest(TestCase):
1214     """
1215     Tests for L{static.DirectoryLister}.
1216     """
1217     def _request(self, uri):
1218         request = DummyRequest([''])
1219         request.uri = uri
1220         return request
1221
1222
1223     def test_renderHeader(self):
1224         """
1225         L{static.DirectoryLister} prints the request uri as header of the
1226         rendered content.
1227         """
1228         path = FilePath(self.mktemp())
1229         path.makedirs()
1230
1231         lister = static.DirectoryLister(path.path)
1232         data = lister.render(self._request('foo'))
1233         self.assertIn("<h1>Directory listing for foo</h1>", data)
1234         self.assertIn("<title>Directory listing for foo</title>", data)
1235
1236
1237     def test_renderUnquoteHeader(self):
1238         """
1239         L{static.DirectoryLister} unquote the request uri before printing it.
1240         """
1241         path = FilePath(self.mktemp())
1242         path.makedirs()
1243
1244         lister = static.DirectoryLister(path.path)
1245         data = lister.render(self._request('foo%20bar'))
1246         self.assertIn("<h1>Directory listing for foo bar</h1>", data)
1247         self.assertIn("<title>Directory listing for foo bar</title>", data)
1248
1249
1250     def test_escapeHeader(self):
1251         """
1252         L{static.DirectoryLister} escape "&", "<" and ">" after unquoting the
1253         request uri.
1254         """
1255         path = FilePath(self.mktemp())
1256         path.makedirs()
1257
1258         lister = static.DirectoryLister(path.path)
1259         data = lister.render(self._request('foo%26bar'))
1260         self.assertIn("<h1>Directory listing for foo&amp;bar</h1>", data)
1261         self.assertIn("<title>Directory listing for foo&amp;bar</title>", data)
1262
1263
1264     def test_renderFiles(self):
1265         """
1266         L{static.DirectoryLister} is able to list all the files inside a
1267         directory.
1268         """
1269         path = FilePath(self.mktemp())
1270         path.makedirs()
1271         path.child('file1').setContent("content1")
1272         path.child('file2').setContent("content2" * 1000)
1273
1274         lister = static.DirectoryLister(path.path)
1275         data = lister.render(self._request('foo'))
1276         body = """<tr class="odd">
1277     <td><a href="file1">file1</a></td>
1278     <td>8B</td>
1279     <td>[text/html]</td>
1280     <td></td>
1281 </tr>
1282 <tr class="even">
1283     <td><a href="file2">file2</a></td>
1284     <td>7K</td>
1285     <td>[text/html]</td>
1286     <td></td>
1287 </tr>"""
1288         self.assertIn(body, data)
1289
1290
1291     def test_renderDirectories(self):
1292         """
1293         L{static.DirectoryLister} is able to list all the directories inside
1294         a directory.
1295         """
1296         path = FilePath(self.mktemp())
1297         path.makedirs()
1298         path.child('dir1').makedirs()
1299         path.child('dir2 & 3').makedirs()
1300
1301         lister = static.DirectoryLister(path.path)
1302         data = lister.render(self._request('foo'))
1303         body = """<tr class="odd">
1304     <td><a href="dir1/">dir1/</a></td>
1305     <td></td>
1306     <td>[Directory]</td>
1307     <td></td>
1308 </tr>
1309 <tr class="even">
1310     <td><a href="dir2%20%26%203/">dir2 &amp; 3/</a></td>
1311     <td></td>
1312     <td>[Directory]</td>
1313     <td></td>
1314 </tr>"""
1315         self.assertIn(body, data)
1316
1317
1318     def test_renderFiltered(self):
1319         """
1320         L{static.DirectoryLister} takes a optional C{dirs} argument that
1321         filter out the list of of directories and files printed.
1322         """
1323         path = FilePath(self.mktemp())
1324         path.makedirs()
1325         path.child('dir1').makedirs()
1326         path.child('dir2').makedirs()
1327         path.child('dir3').makedirs()
1328         lister = static.DirectoryLister(path.path, dirs=["dir1", "dir3"])
1329         data = lister.render(self._request('foo'))
1330         body = """<tr class="odd">
1331     <td><a href="dir1/">dir1/</a></td>
1332     <td></td>
1333     <td>[Directory]</td>
1334     <td></td>
1335 </tr>
1336 <tr class="even">
1337     <td><a href="dir3/">dir3/</a></td>
1338     <td></td>
1339     <td>[Directory]</td>
1340     <td></td>
1341 </tr>"""
1342         self.assertIn(body, data)
1343
1344
1345     def test_oddAndEven(self):
1346         """
1347         L{static.DirectoryLister} gives an alternate class for each odd and
1348         even rows in the table.
1349         """
1350         lister = static.DirectoryLister(None)
1351         elements = [{"href": "", "text": "", "size": "", "type": "",
1352                      "encoding": ""}  for i in xrange(5)]
1353         content = lister._buildTableContent(elements)
1354
1355         self.assertEqual(len(content), 5)
1356         self.assertTrue(content[0].startswith('<tr class="odd">'))
1357         self.assertTrue(content[1].startswith('<tr class="even">'))
1358         self.assertTrue(content[2].startswith('<tr class="odd">'))
1359         self.assertTrue(content[3].startswith('<tr class="even">'))
1360         self.assertTrue(content[4].startswith('<tr class="odd">'))
1361
1362
1363     def test_contentType(self):
1364         """
1365         L{static.DirectoryLister} produces a MIME-type that indicates that it is
1366         HTML, and includes its charset (UTF-8).
1367         """
1368         path = FilePath(self.mktemp())
1369         path.makedirs()
1370         lister = static.DirectoryLister(path.path)
1371         req = self._request('')
1372         lister.render(req)
1373         self.assertEqual(req.outgoingHeaders['content-type'],
1374                           "text/html; charset=utf-8")
1375
1376
1377     def test_mimeTypeAndEncodings(self):
1378         """
1379         L{static.DirectoryLister} is able to detect mimetype and encoding of
1380         listed files.
1381         """
1382         path = FilePath(self.mktemp())
1383         path.makedirs()
1384         path.child('file1.txt').setContent("file1")
1385         path.child('file2.py').setContent("python")
1386         path.child('file3.conf.gz').setContent("conf compressed")
1387         path.child('file4.diff.bz2').setContent("diff compressed")
1388         directory = os.listdir(path.path)
1389         directory.sort()
1390
1391         contentTypes = {
1392             ".txt": "text/plain",
1393             ".py": "text/python",
1394             ".conf": "text/configuration",
1395             ".diff": "text/diff"
1396         }
1397
1398         lister = static.DirectoryLister(path.path, contentTypes=contentTypes)
1399         dirs, files = lister._getFilesAndDirectories(directory)
1400         self.assertEqual(dirs, [])
1401         self.assertEqual(files, [
1402             {'encoding': '',
1403              'href': 'file1.txt',
1404              'size': '5B',
1405              'text': 'file1.txt',
1406              'type': '[text/plain]'},
1407             {'encoding': '',
1408              'href': 'file2.py',
1409              'size': '6B',
1410              'text': 'file2.py',
1411              'type': '[text/python]'},
1412             {'encoding': '[gzip]',
1413              'href': 'file3.conf.gz',
1414              'size': '15B',
1415              'text': 'file3.conf.gz',
1416              'type': '[text/configuration]'},
1417             {'encoding': '[bzip2]',
1418              'href': 'file4.diff.bz2',
1419              'size': '15B',
1420              'text': 'file4.diff.bz2',
1421              'type': '[text/diff]'}])
1422
1423
1424     def test_brokenSymlink(self):
1425         """
1426         If on the file in the listing points to a broken symlink, it should not
1427         be returned by L{static.DirectoryLister._getFilesAndDirectories}.
1428         """
1429         path = FilePath(self.mktemp())
1430         path.makedirs()
1431         file1 = path.child('file1')
1432         file1.setContent("file1")
1433         file1.linkTo(path.child("file2"))
1434         file1.remove()
1435
1436         lister = static.DirectoryLister(path.path)
1437         directory = os.listdir(path.path)
1438         directory.sort()
1439         dirs, files = lister._getFilesAndDirectories(directory)
1440         self.assertEqual(dirs, [])
1441         self.assertEqual(files, [])
1442
1443     if getattr(os, "symlink", None) is None:
1444         test_brokenSymlink.skip = "No symlink support"
1445
1446
1447     def test_childrenNotFound(self):
1448         """
1449         Any child resource of L{static.DirectoryLister} renders an HTTP
1450         I{NOT FOUND} response code.
1451         """
1452         path = FilePath(self.mktemp())
1453         path.makedirs()
1454         lister = static.DirectoryLister(path.path)
1455         request = self._request('')
1456         child = resource.getChildForRequest(lister, request)
1457         result = _render(child, request)
1458         def cbRendered(ignored):
1459             self.assertEqual(request.responseCode, http.NOT_FOUND)
1460         result.addCallback(cbRendered)
1461         return result
1462
1463
1464     def test_repr(self):
1465         """
1466         L{static.DirectoryLister.__repr__} gives the path of the lister.
1467         """
1468         path = FilePath(self.mktemp())
1469         lister = static.DirectoryLister(path.path)
1470         self.assertEqual(repr(lister),
1471                           "<DirectoryLister of %r>" % (path.path,))
1472         self.assertEqual(str(lister),
1473                           "<DirectoryLister of %r>" % (path.path,))
1474
1475     def test_formatFileSize(self):
1476         """
1477         L{static.formatFileSize} format an amount of bytes into a more readable
1478         format.
1479         """
1480         self.assertEqual(static.formatFileSize(0), "0B")
1481         self.assertEqual(static.formatFileSize(123), "123B")
1482         self.assertEqual(static.formatFileSize(4567), "4K")
1483         self.assertEqual(static.formatFileSize(8900000), "8M")
1484         self.assertEqual(static.formatFileSize(1234000000), "1G")
1485         self.assertEqual(static.formatFileSize(1234567890000), "1149G")
1486
1487
1488
1489 class TestFileTransferDeprecated(TestCase):
1490     """
1491     L{static.FileTransfer} is deprecated.
1492     """
1493
1494     def test_deprecation(self):
1495         """
1496         Instantiation of L{FileTransfer} produces a deprecation warning.
1497         """
1498         static.FileTransfer(StringIO.StringIO(), 0, DummyRequest([]))
1499         warnings = self.flushWarnings([self.test_deprecation])
1500         self.assertEqual(len(warnings), 1)
1501         self.assertEqual(warnings[0]['category'], DeprecationWarning)
1502         self.assertEqual(
1503             warnings[0]['message'],
1504             'FileTransfer is deprecated since Twisted 9.0. '
1505             'Use a subclass of StaticProducer instead.')