Imported Upstream version 12.1.0
[contrib/python-twisted.git] / twisted / python / test / test_zipstream.py
1 # Copyright (c) Twisted Matrix Laboratories.
2 # See LICENSE for details.
3
4 """
5 Tests for L{twisted.python.zipstream}
6 """
7 import sys
8 import random
9 import zipfile
10
11 from twisted.python.compat import set
12 from twisted.python import zipstream, filepath
13 from twisted.python.hashlib import md5
14 from twisted.trial import unittest, util
15
16 class FileEntryMixin:
17     """
18     File entry classes should behave as file-like objects
19     """
20     def getFileEntry(self, contents):
21         """
22         Return an appropriate zip file entry
23         """
24         filename = self.mktemp()
25         z = zipfile.ZipFile(filename, 'w', self.compression)
26         z.writestr('content', contents)
27         z.close()
28         z = zipstream.ChunkingZipFile(filename, 'r')
29         return z.readfile('content')
30
31
32     def test_isatty(self):
33         """
34         zip files should not be ttys, so isatty() should be false
35         """
36         self.assertEqual(self.getFileEntry('').isatty(), False)
37
38
39     def test_closed(self):
40         """
41         The C{closed} attribute should reflect whether C{close()} has been
42         called.
43         """
44         fileEntry = self.getFileEntry('')
45         self.assertEqual(fileEntry.closed, False)
46         fileEntry.close()
47         self.assertEqual(fileEntry.closed, True)
48
49
50     def test_readline(self):
51         """
52         C{readline()} should mirror L{file.readline} and return up to a single
53         deliminter.
54         """
55         fileEntry = self.getFileEntry('hoho\nho')
56         self.assertEqual(fileEntry.readline(), 'hoho\n')
57         self.assertEqual(fileEntry.readline(), 'ho')
58         self.assertEqual(fileEntry.readline(), '')
59
60
61     def test_next(self):
62         """
63         Zip file entries should implement the iterator protocol as files do.
64         """
65         fileEntry = self.getFileEntry('ho\nhoho')
66         self.assertEqual(fileEntry.next(), 'ho\n')
67         self.assertEqual(fileEntry.next(), 'hoho')
68         self.assertRaises(StopIteration, fileEntry.next)
69
70
71     def test_readlines(self):
72         """
73         C{readlines()} should return a list of all the lines.
74         """
75         fileEntry = self.getFileEntry('ho\nho\nho')
76         self.assertEqual(fileEntry.readlines(), ['ho\n', 'ho\n', 'ho'])
77
78
79     def test_iteration(self):
80         """
81         C{__iter__()} and C{xreadlines()} should return C{self}.
82         """
83         fileEntry = self.getFileEntry('')
84         self.assertIdentical(iter(fileEntry), fileEntry)
85         self.assertIdentical(fileEntry.xreadlines(), fileEntry)
86
87
88     def test_readWhole(self):
89         """
90         C{.read()} should read the entire file.
91         """
92         contents = "Hello, world!"
93         entry = self.getFileEntry(contents)
94         self.assertEqual(entry.read(), contents)
95
96
97     def test_readPartial(self):
98         """
99         C{.read(num)} should read num bytes from the file.
100         """
101         contents = "0123456789"
102         entry = self.getFileEntry(contents)
103         one = entry.read(4)
104         two = entry.read(200)
105         self.assertEqual(one, "0123")
106         self.assertEqual(two, "456789")
107
108
109     def test_tell(self):
110         """
111         C{.tell()} should return the number of bytes that have been read so
112         far.
113         """
114         contents = "x" * 100
115         entry = self.getFileEntry(contents)
116         entry.read(2)
117         self.assertEqual(entry.tell(), 2)
118         entry.read(4)
119         self.assertEqual(entry.tell(), 6)
120
121
122
123 class DeflatedZipFileEntryTest(FileEntryMixin, unittest.TestCase):
124     """
125     DeflatedZipFileEntry should be file-like
126     """
127     compression = zipfile.ZIP_DEFLATED
128
129
130
131 class ZipFileEntryTest(FileEntryMixin, unittest.TestCase):
132    """
133    ZipFileEntry should be file-like
134    """
135    compression = zipfile.ZIP_STORED
136
137
138
139 class ZipstreamTest(unittest.TestCase):
140     """
141     Tests for twisted.python.zipstream
142     """
143     def setUp(self):
144         """
145         Creates junk data that can be compressed and a test directory for any
146         files that will be created
147         """
148         self.testdir = filepath.FilePath(self.mktemp())
149         self.testdir.makedirs()
150         self.unzipdir = self.testdir.child('unzipped')
151         self.unzipdir.makedirs()
152
153
154     def makeZipFile(self, contents, directory=''):
155         """
156         Makes a zip file archive containing len(contents) files.  Contents
157         should be a list of strings, each string being the content of one file.
158         """
159         zpfilename = self.testdir.child('zipfile.zip').path
160         zpfile = zipfile.ZipFile(zpfilename, 'w')
161         for i, content in enumerate(contents):
162             filename = str(i)
163             if directory:
164                 filename = directory + "/" + filename
165             zpfile.writestr(filename, content)
166         zpfile.close()
167         return zpfilename
168
169
170     def test_countEntries(self):
171         """
172         Make sure the deprecated L{countZipFileEntries} returns the correct
173         number of entries for a zip file.
174         """
175         name = self.makeZipFile(["one", "two", "three", "four", "five"])
176         result = self.assertWarns(DeprecationWarning,
177                                   "countZipFileEntries is deprecated.",
178                                   __file__, lambda :
179                                       zipstream.countZipFileEntries(name))
180         self.assertEqual(result, 5)
181
182
183     def test_invalidMode(self):
184         """
185         A ChunkingZipFile opened in write-mode should not allow .readfile(),
186         and raise a RuntimeError instead.
187         """
188         czf = zipstream.ChunkingZipFile(self.mktemp(), "w")
189         self.assertRaises(RuntimeError, czf.readfile, "something")
190
191
192     def test_closedArchive(self):
193         """
194         A closed ChunkingZipFile should raise a L{RuntimeError} when
195         .readfile() is invoked.
196         """
197         czf = zipstream.ChunkingZipFile(self.makeZipFile(["something"]), "r")
198         czf.close()
199         self.assertRaises(RuntimeError, czf.readfile, "something")
200
201
202     def test_invalidHeader(self):
203         """
204         A zipfile entry with the wrong magic number should raise BadZipfile for
205         readfile(), but that should not affect other files in the archive.
206         """
207         fn = self.makeZipFile(["test contents",
208                                "more contents"])
209         zf = zipfile.ZipFile(fn, "r")
210         zeroOffset = zf.getinfo("0").header_offset
211         zf.close()
212         # Zero out just the one header.
213         scribble = file(fn, "r+b")
214         scribble.seek(zeroOffset, 0)
215         scribble.write(chr(0) * 4)
216         scribble.close()
217         czf = zipstream.ChunkingZipFile(fn)
218         self.assertRaises(zipfile.BadZipfile, czf.readfile, "0")
219         self.assertEqual(czf.readfile("1").read(), "more contents")
220
221
222     def test_filenameMismatch(self):
223         """
224         A zipfile entry with a different filename than is found in the central
225         directory should raise BadZipfile.
226         """
227         fn = self.makeZipFile(["test contents",
228                                "more contents"])
229         zf = zipfile.ZipFile(fn, "r")
230         info = zf.getinfo("0")
231         info.filename = "not zero"
232         zf.close()
233         scribble = file(fn, "r+b")
234         scribble.seek(info.header_offset, 0)
235         scribble.write(info.FileHeader())
236         scribble.close()
237
238         czf = zipstream.ChunkingZipFile(fn)
239         self.assertRaises(zipfile.BadZipfile, czf.readfile, "0")
240         self.assertEqual(czf.readfile("1").read(), "more contents")
241
242
243     if sys.version_info < (2, 5):
244         # In python 2.4 and earlier, consistency between the directory and the
245         # file header are verified at archive-opening time.  In python 2.5
246         # (and, presumably, later) it is readzipfile's responsibility.
247         message = "Consistency-checking only necessary in 2.5."
248         test_invalidHeader.skip = message
249         test_filenameMismatch.skip = message
250
251
252
253     def test_unsupportedCompression(self):
254         """
255         A zipfile which describes an unsupported compression mechanism should
256         raise BadZipfile.
257         """
258         fn = self.mktemp()
259         zf = zipfile.ZipFile(fn, "w")
260         zi = zipfile.ZipInfo("0")
261         zf.writestr(zi, "some data")
262         # Mangle its compression type in the central directory; can't do this
263         # before the writestr call or zipfile will (correctly) tell us not to
264         # pass bad compression types :)
265         zi.compress_type = 1234
266         zf.close()
267
268         czf = zipstream.ChunkingZipFile(fn)
269         self.assertRaises(zipfile.BadZipfile, czf.readfile, "0")
270
271
272     def test_extraData(self):
273         """
274         readfile() should skip over 'extra' data present in the zip metadata.
275         """
276         fn = self.mktemp()
277         zf = zipfile.ZipFile(fn, 'w')
278         zi = zipfile.ZipInfo("0")
279         zi.extra = "hello, extra"
280         zf.writestr(zi, "the real data")
281         zf.close()
282         czf = zipstream.ChunkingZipFile(fn)
283         self.assertEqual(czf.readfile("0").read(), "the real data")
284
285
286     def test_unzipIter(self):
287         """
288         L{twisted.python.zipstream.unzipIter} should unzip a file for each
289         iteration and yield the number of files left to unzip after that
290         iteration
291         """
292         numfiles = 10
293         contents = ['This is test file %d!' % i for i in range(numfiles)]
294         zpfilename = self.makeZipFile(contents)
295         uziter = zipstream.unzipIter(zpfilename, self.unzipdir.path)
296         for i in range(numfiles):
297             self.assertEqual(len(list(self.unzipdir.children())), i)
298             self.assertEqual(uziter.next(), numfiles - i - 1)
299         self.assertEqual(len(list(self.unzipdir.children())), numfiles)
300
301         for child in self.unzipdir.children():
302             num = int(child.basename())
303             self.assertEqual(child.open().read(), contents[num])
304     test_unzipIter.suppress = [
305         util.suppress(message="zipstream.unzipIter is deprecated")]
306
307
308     def test_unzipIterDeprecated(self):
309         """
310         Use of C{twisted.python.zipstream.unzipIter} will emit a
311         deprecated warning.
312         """
313         zpfilename = self.makeZipFile('foo')
314
315         self.assertEqual(len(self.flushWarnings()), 0)
316
317         for f in zipstream.unzipIter(zpfilename, self.unzipdir.path):
318             pass
319
320         warnings = self.flushWarnings()
321         self.assertEqual(len(warnings), 1)
322         self.assertEqual(warnings[0]['category'], DeprecationWarning)
323         self.assertEqual(
324             warnings[0]['message'],
325             "zipstream.unzipIter is deprecated since Twisted 11.0.0 for "
326             "security reasons.  Use Python's zipfile instead.")
327
328
329     def test_unzipIterChunky(self):
330         """
331         L{twisted.python.zipstream.unzipIterChunky} returns an iterator which
332         must be exhausted to completely unzip the input archive.
333         """
334         numfiles = 10
335         contents = ['This is test file %d!' % i for i in range(numfiles)]
336         zpfilename = self.makeZipFile(contents)
337         list(zipstream.unzipIterChunky(zpfilename, self.unzipdir.path))
338         self.assertEqual(
339             set(self.unzipdir.listdir()),
340             set(map(str, range(numfiles))))
341
342         for child in self.unzipdir.children():
343             num = int(child.basename())
344             self.assertEqual(child.getContent(), contents[num])
345
346
347     def test_unzipIterChunkyDirectory(self):
348         """
349         The path to which a file is extracted by L{zipstream.unzipIterChunky}
350         is determined by joining the C{directory} argument to C{unzip} with the
351         path within the archive of the file being extracted.
352         """
353         numfiles = 10
354         contents = ['This is test file %d!' % i for i in range(numfiles)]
355         zpfilename = self.makeZipFile(contents, 'foo')
356         list(zipstream.unzipIterChunky(zpfilename, self.unzipdir.path))
357         self.assertEqual(
358             set(self.unzipdir.child('foo').listdir()),
359             set(map(str, range(numfiles))))
360
361         for child in self.unzipdir.child('foo').children():
362             num = int(child.basename())
363             self.assertEqual(child.getContent(), contents[num])
364
365
366     def test_unzip(self):
367         """
368         L{twisted.python.zipstream.unzip} should extract all files from a zip
369         archive
370         """
371         numfiles = 3
372         zpfilename = self.makeZipFile([str(i) for i in range(numfiles)])
373         zipstream.unzip(zpfilename, self.unzipdir.path)
374         self.assertEqual(
375             set(self.unzipdir.listdir()),
376             set(map(str, range(numfiles))))
377         for i in range(numfiles):
378             self.assertEqual(self.unzipdir.child(str(i)).getContent(), str(i))
379     test_unzip.suppress = [
380         util.suppress(message="zipstream.unzip is deprecated")]
381
382
383     def test_unzipDeprecated(self):
384         """
385         Use of C{twisted.python.zipstream.unzip} will emit a deprecated warning.
386         """
387         zpfilename = self.makeZipFile('foo')
388
389         self.assertEqual(len(self.flushWarnings()), 0)
390
391         zipstream.unzip(zpfilename, self.unzipdir.path)
392
393         warnings = self.flushWarnings()
394         self.assertEqual(len(warnings), 1)
395         self.assertEqual(warnings[0]['category'], DeprecationWarning)
396         self.assertEqual(
397             warnings[0]['message'],
398             "zipstream.unzip is deprecated since Twisted 11.0.0 for "
399             "security reasons.  Use Python's zipfile instead.")
400
401
402     def test_unzipDirectory(self):
403         """
404         The path to which a file is extracted by L{zipstream.unzip} is
405         determined by joining the C{directory} argument to C{unzip} with the
406         path within the archive of the file being extracted.
407         """
408         numfiles = 3
409         zpfilename = self.makeZipFile([str(i) for i in range(numfiles)], 'foo')
410         zipstream.unzip(zpfilename, self.unzipdir.path)
411         self.assertEqual(
412             set(self.unzipdir.child('foo').listdir()),
413             set(map(str, range(numfiles))))
414         for i in range(numfiles):
415             self.assertEqual(
416                 self.unzipdir.child('foo').child(str(i)).getContent(), str(i))
417     test_unzipDirectory.suppress = [
418         util.suppress(message="zipstream.unzip is deprecated")]
419
420
421     def test_overwrite(self):
422         """
423         L{twisted.python.zipstream.unzip} and
424         L{twisted.python.zipstream.unzipIter} shouldn't overwrite files unless
425         the 'overwrite' flag is passed
426         """
427         testfile = self.unzipdir.child('0')
428         zpfilename = self.makeZipFile(['OVERWRITTEN'])
429
430         testfile.setContent('NOT OVERWRITTEN')
431         zipstream.unzip(zpfilename, self.unzipdir.path)
432         self.assertEqual(testfile.open().read(), 'NOT OVERWRITTEN')
433         zipstream.unzip(zpfilename, self.unzipdir.path, overwrite=True)
434         self.assertEqual(testfile.open().read(), 'OVERWRITTEN')
435
436         testfile.setContent('NOT OVERWRITTEN')
437         uziter = zipstream.unzipIter(zpfilename, self.unzipdir.path)
438         uziter.next()
439         self.assertEqual(testfile.open().read(), 'NOT OVERWRITTEN')
440         uziter = zipstream.unzipIter(zpfilename, self.unzipdir.path,
441                                      overwrite=True)
442         uziter.next()
443         self.assertEqual(testfile.open().read(), 'OVERWRITTEN')
444     test_overwrite.suppress = [
445         util.suppress(message="zipstream.unzip is deprecated"),
446         util.suppress(message="zipstream.unzipIter is deprecated")]
447
448
449     # XXX these tests are kind of gross and old, but I think unzipIterChunky is
450     # kind of a gross function anyway.  We should really write an abstract
451     # copyTo/moveTo that operates on FilePath and make sure ZipPath can support
452     # it, then just deprecate / remove this stuff.
453     def _unzipIterChunkyTest(self, compression, chunksize, lower, upper):
454         """
455         unzipIterChunky should unzip the given number of bytes per iteration.
456         """
457         junk = ' '.join([str(random.random()) for n in xrange(1000)])
458         junkmd5 = md5(junk).hexdigest()
459
460         tempdir = filepath.FilePath(self.mktemp())
461         tempdir.makedirs()
462         zfpath = tempdir.child('bigfile.zip').path
463         self._makebigfile(zfpath, compression, junk)
464         uziter = zipstream.unzipIterChunky(zfpath, tempdir.path,
465                                            chunksize=chunksize)
466         r = uziter.next()
467         # test that the number of chunks is in the right ballpark;
468         # this could theoretically be any number but statistically it
469         # should always be in this range
470         approx = lower < r < upper
471         self.failUnless(approx)
472         for r in uziter:
473             pass
474         self.assertEqual(r, 0)
475         newmd5 = md5(
476             tempdir.child("zipstreamjunk").open().read()).hexdigest()
477         self.assertEqual(newmd5, junkmd5)
478
479     def test_unzipIterChunkyStored(self):
480         """
481         unzipIterChunky should unzip the given number of bytes per iteration on
482         a stored archive.
483         """
484         self._unzipIterChunkyTest(zipfile.ZIP_STORED, 500, 35, 45)
485
486
487     def test_chunkyDeflated(self):
488         """
489         unzipIterChunky should unzip the given number of bytes per iteration on
490         a deflated archive.
491         """
492         self._unzipIterChunkyTest(zipfile.ZIP_DEFLATED, 972, 23, 27)
493
494
495     def _makebigfile(self, filename, compression, junk):
496         """
497         Create a zip file with the given file name and compression scheme.
498         """
499         zf = zipfile.ZipFile(filename, 'w', compression)
500         for i in range(10):
501             fn = 'zipstream%d' % i
502             zf.writestr(fn, "")
503         zf.writestr('zipstreamjunk', junk)
504         zf.close()