1 # Copyright (c) Twisted Matrix Laboratories.
2 # See LICENSE for details.
5 Tests for L{twisted.python.zipstream}
11 from twisted.python.compat import set
12 from twisted.python import zipstream, filepath
13 from twisted.python.hashlib import md5
14 from twisted.trial import unittest, util
18 File entry classes should behave as file-like objects
def getFileEntry(self, contents):
    """
    Build a one-member zip archive and return a readable entry for it.

    The archive is written with C{self.compression}, which the concrete
    test case classes mixing this in are expected to provide.

    @param contents: the data to store in the archive member named
        C{'content'}.
    @return: the object returned by C{ChunkingZipFile.readfile} for that
        member.
    """
    filename = self.mktemp()
    archive = zipfile.ZipFile(filename, 'w', self.compression)
    try:
        archive.writestr('content', contents)
    finally:
        # The archive must be closed before it is reopened for reading;
        # zipfile only writes its essential records on close().
        archive.close()
    reader = zipstream.ChunkingZipFile(filename, 'r')
    return reader.readfile('content')
def test_isatty(self):
    """
    An entry read out of a zip archive is never a terminal, so its
    C{isatty()} reports C{False}.
    """
    entry = self.getFileEntry('')
    self.assertEqual(entry.isatty(), False)
def test_closed(self):
    """
    The C{closed} attribute should reflect whether C{close()} has been
    called: C{False} on a fresh entry, C{True} once it has been closed.
    """
    fileEntry = self.getFileEntry('')
    self.assertEqual(fileEntry.closed, False)
    # Without this call the second assertion has nothing to observe.
    fileEntry.close()
    self.assertEqual(fileEntry.closed, True)
def test_readline(self):
    """
    C{readline()} should mirror L{file.readline}: each call returns up to
    and including a single line delimiter, and an empty string once the
    data is exhausted.
    """
    entry = self.getFileEntry('hoho\nho')
    for expected in ('hoho\n', 'ho', ''):
        self.assertEqual(entry.readline(), expected)
# NOTE(review): the ``def`` line for this test is not visible in this
# excerpt; the name is inferred from the method being exercised -- confirm.
def test_next(self):
    """
    Zip file entries should implement the iterator protocol as files do:
    C{next()} returns one line per call and raises C{StopIteration} when
    no lines remain.
    """
    fileEntry = self.getFileEntry('ho\nhoho')
    self.assertEqual(fileEntry.next(), 'ho\n')
    self.assertEqual(fileEntry.next(), 'hoho')
    self.assertRaises(StopIteration, fileEntry.next)
def test_readlines(self):
    """
    C{readlines()} should hand back every line of the entry as a list,
    keeping the line delimiters.
    """
    entry = self.getFileEntry('ho\nho\nho')
    lines = entry.readlines()
    self.assertEqual(lines, ['ho\n', 'ho\n', 'ho'])
def test_iteration(self):
    """
    Both C{__iter__()} and C{xreadlines()} should return the entry itself
    rather than a new object.
    """
    entry = self.getFileEntry('')
    viaIter = iter(entry)
    self.assertIdentical(viaIter, entry)
    viaXreadlines = entry.xreadlines()
    self.assertIdentical(viaXreadlines, entry)
def test_readWhole(self):
    """
    C{.read()} with no argument should return the complete contents of
    the entry.
    """
    expected = "Hello, world!"
    data = self.getFileEntry(expected).read()
    self.assertEqual(data, expected)
def test_readPartial(self):
    """
    C{.read(num)} should read at most C{num} bytes from the file: a short
    read returns exactly that many bytes, and a read asking for more than
    remains returns only what is left.
    """
    contents = "0123456789"
    entry = self.getFileEntry(contents)
    # Four bytes first, matching the "0123" expectation below.
    one = entry.read(4)
    two = entry.read(200)
    self.assertEqual(one, "0123")
    self.assertEqual(two, "456789")
# NOTE(review): the ``def`` line, the payload and the two reads are not
# visible in this excerpt; they are reconstructed from the asserted
# offsets (2, then 6) -- confirm against the full file.
def test_tell(self):
    """
    C{.tell()} should return the number of bytes that have been read so
    far.
    """
    contents = "x" * 100
    entry = self.getFileEntry(contents)
    entry.read(2)
    self.assertEqual(entry.tell(), 2)
    entry.read(4)
    self.assertEqual(entry.tell(), 6)
class DeflatedZipFileEntryTest(FileEntryMixin, unittest.TestCase):
    """
    DeflatedZipFileEntry should be file-like.

    Runs the L{FileEntryMixin} tests with C{compression} set to
    C{zipfile.ZIP_DEFLATED}.
    """
    compression = zipfile.ZIP_DEFLATED
class ZipFileEntryTest(FileEntryMixin, unittest.TestCase):
    """
    ZipFileEntry should be file-like.

    Runs the L{FileEntryMixin} tests with C{compression} set to
    C{zipfile.ZIP_STORED}.
    """
    compression = zipfile.ZIP_STORED
139 class ZipstreamTest(unittest.TestCase):
141 Tests for twisted.python.zipstream
# NOTE(review): the ``def`` line is not visible in this excerpt; ``setUp``
# is inferred because the tests read C{self.testdir} and C{self.unzipdir}
# without creating them -- confirm.
def setUp(self):
    """
    Create a scratch directory for any files the test creates, plus an
    C{unzipped} child directory for archives to be extracted into.
    """
    self.testdir = filepath.FilePath(self.mktemp())
    self.testdir.makedirs()
    self.unzipdir = self.testdir.child('unzipped')
    self.unzipdir.makedirs()
def makeZipFile(self, contents, directory=''):
    """
    Makes a zip file archive containing len(contents) files.

    Each member is named after its index in C{contents} (C{'0'}, C{'1'},
    ...), optionally placed beneath C{directory} inside the archive.

    @param contents: a list of strings, each string being the content of
        one file.
    @param directory: if non-empty, a path prefix inside the archive under
        which every member is stored.
    @return: the filesystem path of the archive that was written.
    """
    zpfilename = self.testdir.child('zipfile.zip').path
    zpfile = zipfile.ZipFile(zpfilename, 'w')
    try:
        for i, content in enumerate(contents):
            filename = str(i)
            if directory:
                filename = directory + "/" + filename
            zpfile.writestr(filename, content)
    finally:
        # Callers reopen the archive straight away, so it has to be
        # complete on disk before the path is handed back.
        zpfile.close()
    return zpfilename
def test_countEntries(self):
    """
    Make sure the deprecated L{countZipFileEntries} returns the correct
    number of entries for a zip file, and warns about its deprecation.
    """
    name = self.makeZipFile(["one", "two", "three", "four", "five"])
    # The deprecated call is wrapped in a callable so that it runs inside
    # assertWarns rather than before it.
    # NOTE(review): presumably the lambda also keeps the warning attributed
    # to this file, which the __file__ argument is matched against --
    # confirm against trial's assertWarns.
    result = self.assertWarns(
        DeprecationWarning,
        "countZipFileEntries is deprecated.",
        __file__,
        lambda: zipstream.countZipFileEntries(name))
    self.assertEqual(result, 5)
def test_invalidMode(self):
    """
    Calling C{.readfile()} on a ChunkingZipFile that was opened in
    write-mode is refused with a L{RuntimeError}.
    """
    writable = zipstream.ChunkingZipFile(self.mktemp(), "w")
    readfile = writable.readfile
    self.assertRaises(RuntimeError, readfile, "something")
def test_closedArchive(self):
    """
    A closed ChunkingZipFile should raise a L{RuntimeError} when
    .readfile() is invoked.
    """
    czf = zipstream.ChunkingZipFile(self.makeZipFile(["something"]), "r")
    # The archive has to actually be closed for the assertion to test the
    # behaviour named in the docstring.
    czf.close()
    self.assertRaises(RuntimeError, czf.readfile, "something")
def test_invalidHeader(self):
    """
    A zipfile entry with the wrong magic number should raise BadZipfile for
    readfile(), but that should not affect other files in the archive.
    """
    fn = self.makeZipFile(["test contents",
                           "more contents"])
    zf = zipfile.ZipFile(fn, "r")
    try:
        zeroOffset = zf.getinfo("0").header_offset
    finally:
        zf.close()
    # Zero out just the one header.  The handle is closed before the
    # archive is reopened so the overwrite is on disk by then.
    scribble = open(fn, "r+b")
    try:
        scribble.seek(zeroOffset, 0)
        scribble.write(chr(0) * 4)
    finally:
        scribble.close()
    czf = zipstream.ChunkingZipFile(fn)
    self.assertRaises(zipfile.BadZipfile, czf.readfile, "0")
    self.assertEqual(czf.readfile("1").read(), "more contents")
def test_filenameMismatch(self):
    """
    A zipfile entry with a different filename than is found in the central
    directory should raise BadZipfile, without affecting the other files
    in the archive.
    """
    fn = self.makeZipFile(["test contents",
                           "more contents"])
    zf = zipfile.ZipFile(fn, "r")
    try:
        info = zf.getinfo("0")
        # Only the in-memory record is renamed; the central directory on
        # disk keeps the name "0".
        info.filename = "not zero"
        offset = info.header_offset
        header = info.FileHeader()
    finally:
        zf.close()
    # Overwrite the member's local header with one carrying the new name,
    # and close the handle so the change is on disk before reopening.
    scribble = open(fn, "r+b")
    try:
        scribble.seek(offset, 0)
        scribble.write(header)
    finally:
        scribble.close()

    czf = zipstream.ChunkingZipFile(fn)
    self.assertRaises(zipfile.BadZipfile, czf.readfile, "0")
    self.assertEqual(czf.readfile("1").read(), "more contents")
# Mark the two header-consistency tests as skipped on old interpreters.
if sys.version_info < (2, 5):
    # In Python 2.4 and earlier, consistency between the central directory
    # and the file header is verified at archive-opening time.  In Python
    # 2.5 (and, presumably, later) it is readfile's responsibility, which
    # is what the two tests below exercise.
    message = "Consistency-checking only necessary in 2.5."
    test_invalidHeader.skip = message
    test_filenameMismatch.skip = message
def test_unsupportedCompression(self):
    """
    A zipfile which describes an unsupported compression mechanism should
    raise BadZipfile when the affected member is read.
    """
    fn = self.mktemp()
    zf = zipfile.ZipFile(fn, "w")
    try:
        zi = zipfile.ZipInfo("0")
        zf.writestr(zi, "some data")
        # Mangle its compression type in the central directory; can't do
        # this before the writestr call or zipfile will (correctly) tell
        # us not to pass bad compression types :)
        zi.compress_type = 1234
    finally:
        # The central directory is written on close, after the mangling.
        zf.close()
    czf = zipstream.ChunkingZipFile(fn)
    self.assertRaises(zipfile.BadZipfile, czf.readfile, "0")
def test_extraData(self):
    """
    readfile() should skip over 'extra' data present in the zip metadata
    and return only the member's real contents.
    """
    fn = self.mktemp()
    zf = zipfile.ZipFile(fn, 'w')
    try:
        zi = zipfile.ZipInfo("0")
        zi.extra = "hello, extra"
        zf.writestr(zi, "the real data")
    finally:
        # Finish the archive before it is reopened for reading.
        zf.close()
    czf = zipstream.ChunkingZipFile(fn)
    self.assertEqual(czf.readfile("0").read(), "the real data")
def test_unzipIter(self):
    """
    L{twisted.python.zipstream.unzipIter} should unzip a file for each
    iteration and yield the number of files left to unzip after that
    iteration.
    """
    # NOTE(review): the file count is not visible in this excerpt; any
    # small positive number exercises the same logic -- confirm.
    numfiles = 10
    contents = ['This is test file %d!' % i for i in range(numfiles)]
    zpfilename = self.makeZipFile(contents)
    uziter = zipstream.unzipIter(zpfilename, self.unzipdir.path)
    for i in range(numfiles):
        # Exactly i files exist before the i-th step ...
        self.assertEqual(len(list(self.unzipdir.children())), i)
        # ... and each step reports how many are still to come.
        self.assertEqual(uziter.next(), numfiles - i - 1)
    self.assertEqual(len(list(self.unzipdir.children())), numfiles)

    for child in self.unzipdir.children():
        num = int(child.basename())
        # getContent() closes the file it reads, unlike open().read().
        self.assertEqual(child.getContent(), contents[num])
test_unzipIter.suppress = [
    util.suppress(message="zipstream.unzipIter is deprecated")]
def test_unzipIterDeprecated(self):
    """
    Use of C{twisted.python.zipstream.unzipIter} will emit a
    deprecation warning.
    """
    zpfilename = self.makeZipFile('foo')

    # Nothing should have warned before unzipIter is used.
    self.assertEqual(len(self.flushWarnings()), 0)

    # Exhaust the iterator; only the warning it causes is of interest.
    for f in zipstream.unzipIter(zpfilename, self.unzipdir.path):
        pass

    warnings = self.flushWarnings()
    self.assertEqual(len(warnings), 1)
    self.assertEqual(warnings[0]['category'], DeprecationWarning)
    # NOTE(review): this text must match what zipstream emits exactly;
    # confirm the spacing against the implementation.
    self.assertEqual(
        warnings[0]['message'],
        "zipstream.unzipIter is deprecated since Twisted 11.0.0 for "
        "security reasons. Use Python's zipfile instead.")
def test_unzipIterChunky(self):
    """
    L{twisted.python.zipstream.unzipIterChunky} returns an iterator which
    must be exhausted to completely unzip the input archive.
    """
    # NOTE(review): the file count is not visible in this excerpt; any
    # small positive number exercises the same logic -- confirm.
    numfiles = 10
    contents = ['This is test file %d!' % i for i in range(numfiles)]
    zpfilename = self.makeZipFile(contents)
    # list() drives the iterator to completion.
    list(zipstream.unzipIterChunky(zpfilename, self.unzipdir.path))
    self.assertEqual(
        set(self.unzipdir.listdir()),
        set(map(str, range(numfiles))))

    for child in self.unzipdir.children():
        num = int(child.basename())
        self.assertEqual(child.getContent(), contents[num])
def test_unzipIterChunkyDirectory(self):
    """
    The path to which a file is extracted by L{zipstream.unzipIterChunky}
    is determined by joining the C{directory} argument to C{unzip} with the
    path within the archive of the file being extracted.
    """
    # NOTE(review): the file count is not visible in this excerpt; any
    # small positive number exercises the same logic -- confirm.
    numfiles = 10
    contents = ['This is test file %d!' % i for i in range(numfiles)]
    zpfilename = self.makeZipFile(contents, 'foo')
    # list() drives the iterator to completion.
    list(zipstream.unzipIterChunky(zpfilename, self.unzipdir.path))
    self.assertEqual(
        set(self.unzipdir.child('foo').listdir()),
        set(map(str, range(numfiles))))

    for child in self.unzipdir.child('foo').children():
        num = int(child.basename())
        self.assertEqual(child.getContent(), contents[num])
def test_unzip(self):
    """
    L{twisted.python.zipstream.unzip} should extract all files from a zip
    archive.
    """
    # NOTE(review): the file count is not visible in this excerpt; any
    # small positive number exercises the same logic -- confirm.
    numfiles = 10
    zpfilename = self.makeZipFile([str(i) for i in range(numfiles)])
    zipstream.unzip(zpfilename, self.unzipdir.path)
    self.assertEqual(
        set(self.unzipdir.listdir()),
        set(map(str, range(numfiles))))
    for i in range(numfiles):
        # Each member's content is its own index, as written above.
        self.assertEqual(self.unzipdir.child(str(i)).getContent(), str(i))
test_unzip.suppress = [
    util.suppress(message="zipstream.unzip is deprecated")]
def test_unzipDeprecated(self):
    """
    Use of C{twisted.python.zipstream.unzip} will emit a deprecated warning.
    """
    zpfilename = self.makeZipFile('foo')

    # Nothing should have warned before unzip is used.
    self.assertEqual(len(self.flushWarnings()), 0)

    zipstream.unzip(zpfilename, self.unzipdir.path)

    warnings = self.flushWarnings()
    self.assertEqual(len(warnings), 1)
    self.assertEqual(warnings[0]['category'], DeprecationWarning)
    # NOTE(review): this text must match what zipstream emits exactly;
    # confirm the spacing against the implementation.
    self.assertEqual(
        warnings[0]['message'],
        "zipstream.unzip is deprecated since Twisted 11.0.0 for "
        "security reasons. Use Python's zipfile instead.")
def test_unzipDirectory(self):
    """
    The path to which a file is extracted by L{zipstream.unzip} is
    determined by joining the C{directory} argument to C{unzip} with the
    path within the archive of the file being extracted.
    """
    # NOTE(review): the file count is not visible in this excerpt; any
    # small positive number exercises the same logic -- confirm.
    numfiles = 10
    zpfilename = self.makeZipFile([str(i) for i in range(numfiles)], 'foo')
    zipstream.unzip(zpfilename, self.unzipdir.path)
    self.assertEqual(
        set(self.unzipdir.child('foo').listdir()),
        set(map(str, range(numfiles))))
    for i in range(numfiles):
        self.assertEqual(
            self.unzipdir.child('foo').child(str(i)).getContent(), str(i))
test_unzipDirectory.suppress = [
    util.suppress(message="zipstream.unzip is deprecated")]
def test_overwrite(self):
    """
    L{twisted.python.zipstream.unzip} and
    L{twisted.python.zipstream.unzipIter} shouldn't overwrite files unless
    the 'overwrite' flag is passed
    """
    testfile = self.unzipdir.child('0')
    zpfilename = self.makeZipFile(['OVERWRITTEN'])

    testfile.setContent('NOT OVERWRITTEN')
    zipstream.unzip(zpfilename, self.unzipdir.path)
    self.assertEqual(testfile.getContent(), 'NOT OVERWRITTEN')
    zipstream.unzip(zpfilename, self.unzipdir.path, overwrite=True)
    self.assertEqual(testfile.getContent(), 'OVERWRITTEN')

    testfile.setContent('NOT OVERWRITTEN')
    uziter = zipstream.unzipIter(zpfilename, self.unzipdir.path)
    # unzipIter extracts one file per step, so it must be advanced once
    # before the result can be checked.
    uziter.next()
    self.assertEqual(testfile.getContent(), 'NOT OVERWRITTEN')
    uziter = zipstream.unzipIter(zpfilename, self.unzipdir.path,
                                 overwrite=True)
    uziter.next()
    self.assertEqual(testfile.getContent(), 'OVERWRITTEN')
test_overwrite.suppress = [
    util.suppress(message="zipstream.unzip is deprecated"),
    util.suppress(message="zipstream.unzipIter is deprecated")]
# XXX these tests are kind of gross and old, but I think unzipIterChunky is
# kind of a gross function anyway.  We should really write an abstract
# copyTo/moveTo that operates on FilePath and make sure ZipPath can support
# it, then just deprecate / remove this stuff.
def _unzipIterChunkyTest(self, compression, chunksize, lower, upper):
    """
    unzipIterChunky should unzip the given number of bytes per iteration.

    @param compression: the C{zipfile} compression constant used to write
        the archive.
    @param chunksize: the chunk size handed to C{unzipIterChunky}.
    @param lower: exclusive lower bound on the value yielded by the first
        iteration.
    @param upper: exclusive upper bound on that same value.
    """
    junk = ' '.join([str(random.random()) for n in xrange(1000)])
    junkmd5 = md5(junk).hexdigest()

    tempdir = filepath.FilePath(self.mktemp())
    # The directory must exist before the archive is written into it.
    tempdir.makedirs()
    zfpath = tempdir.child('bigfile.zip').path
    self._makebigfile(zfpath, compression, junk)
    uziter = zipstream.unzipIterChunky(zfpath, tempdir.path,
                                       chunksize=chunksize)
    r = uziter.next()
    # test that the number of chunks is in the right ballpark;
    # this could theoretically be any number but statistically it
    # should always be in this range
    approx = lower < r < upper
    self.failUnless(approx)
    # Drain the iterator; the last value yielded is checked below.
    for r in uziter:
        pass
    self.assertEqual(r, 0)
    # The extracted payload must be byte-identical to what was archived.
    newmd5 = md5(
        tempdir.child("zipstreamjunk").getContent()).hexdigest()
    self.assertEqual(newmd5, junkmd5)
def test_unzipIterChunkyStored(self):
    """
    Run the chunked-extraction check against an archive written with
    C{zipfile.ZIP_STORED}, using a chunk size of 500 and chunk-count
    bounds of 35 and 45.
    """
    self._unzipIterChunkyTest(
        compression=zipfile.ZIP_STORED,
        chunksize=500,
        lower=35,
        upper=45)
def test_chunkyDeflated(self):
    """
    Run the chunked-extraction check against an archive written with
    C{zipfile.ZIP_DEFLATED}, using a chunk size of 972 and chunk-count
    bounds of 23 and 27.
    """
    self._unzipIterChunkyTest(
        compression=zipfile.ZIP_DEFLATED,
        chunksize=972,
        lower=23,
        upper=27)
def _makebigfile(self, filename, compression, junk):
    """
    Create a zip file with the given file name and compression scheme.

    The archive holds a run of empty filler members followed by one
    member, C{'zipstreamjunk'}, containing C{junk}.

    @param filename: the path at which to write the archive.
    @param compression: the C{zipfile} compression constant to use.
    @param junk: the data stored in the C{'zipstreamjunk'} member.
    """
    zf = zipfile.ZipFile(filename, 'w', compression)
    try:
        # NOTE(review): the number of filler members is not visible in
        # this excerpt; the chunk-count bounds used by the callers
        # presumably depend on it -- confirm.
        for i in range(10):
            fn = 'zipstream%d' % i
            zf.writestr(fn, "")
        zf.writestr('zipstreamjunk', junk)
    finally:
        # Finish the archive so callers can read it back immediately.
        zf.close()