2 from test import test_support
5 from test.test_support import precisionbigmemtest, _1G
7 zlib = test_support.import_module('zlib')
10 class ChecksumTestCase(unittest.TestCase):
12 def test_crc32start(self):
13 self.assertEqual(zlib.crc32(""), zlib.crc32("", 0))
14 self.assertTrue(zlib.crc32("abc", 0xffffffff))
16 def test_crc32empty(self):
17 self.assertEqual(zlib.crc32("", 0), 0)
18 self.assertEqual(zlib.crc32("", 1), 1)
19 self.assertEqual(zlib.crc32("", 432), 432)
21 def test_adler32start(self):
22 self.assertEqual(zlib.adler32(""), zlib.adler32("", 1))
23 self.assertTrue(zlib.adler32("abc", 0xffffffff))
25 def test_adler32empty(self):
26 self.assertEqual(zlib.adler32("", 0), 0)
27 self.assertEqual(zlib.adler32("", 1), 1)
28 self.assertEqual(zlib.adler32("", 432), 432)
30 def assertEqual32(self, seen, expected):
31 # 32-bit values masked -- checksums on 32- vs 64- bit machines
32 # This is important if bit 31 (0x08000000L) is set.
33 self.assertEqual(seen & 0x0FFFFFFFFL, expected & 0x0FFFFFFFFL)
35 def test_penguins(self):
36 self.assertEqual32(zlib.crc32("penguin", 0), 0x0e5c1a120L)
37 self.assertEqual32(zlib.crc32("penguin", 1), 0x43b6aa94)
38 self.assertEqual32(zlib.adler32("penguin", 0), 0x0bcf02f6)
39 self.assertEqual32(zlib.adler32("penguin", 1), 0x0bd602f7)
41 self.assertEqual(zlib.crc32("penguin"), zlib.crc32("penguin", 0))
42 self.assertEqual(zlib.adler32("penguin"),zlib.adler32("penguin",1))
44 def test_abcdefghijklmnop(self):
45 """test issue1202 compliance: signed crc32, adler32 in 2.x"""
46 foo = 'abcdefghijklmnop'
47 # explicitly test signed behavior
48 self.assertEqual(zlib.crc32(foo), -1808088941)
49 self.assertEqual(zlib.crc32('spam'), 1138425661)
50 self.assertEqual(zlib.adler32(foo+foo), -721416943)
51 self.assertEqual(zlib.adler32('spam'), 72286642)
53 def test_same_as_binascii_crc32(self):
54 foo = 'abcdefghijklmnop'
55 self.assertEqual(binascii.crc32(foo), zlib.crc32(foo))
56 self.assertEqual(binascii.crc32('spam'), zlib.crc32('spam'))
58 def test_negative_crc_iv_input(self):
59 # The range of valid input values for the crc state should be
60 # -2**31 through 2**32-1 to allow inputs artifically constrained
61 # to a signed 32-bit integer.
62 self.assertEqual(zlib.crc32('ham', -1), zlib.crc32('ham', 0xffffffffL))
63 self.assertEqual(zlib.crc32('spam', -3141593),
64 zlib.crc32('spam', 0xffd01027L))
65 self.assertEqual(zlib.crc32('spam', -(2**31)),
66 zlib.crc32('spam', (2**31)))
69 class ExceptionTestCase(unittest.TestCase):
70 # make sure we generate some expected errors
71 def test_badlevel(self):
72 # specifying compression level out of range causes an error
73 # (but -1 is Z_DEFAULT_COMPRESSION and apparently the zlib
75 self.assertRaises(zlib.error, zlib.compress, 'ERROR', 10)
77 def test_badcompressobj(self):
78 # verify failure on building compress object with bad params
79 self.assertRaises(ValueError, zlib.compressobj, 1, zlib.DEFLATED, 0)
80 # specifying total bits too large causes an error
81 self.assertRaises(ValueError,
82 zlib.compressobj, 1, zlib.DEFLATED, zlib.MAX_WBITS + 1)
84 def test_baddecompressobj(self):
85 # verify failure on building decompress object with bad params
86 self.assertRaises(ValueError, zlib.decompressobj, -1)
88 def test_decompressobj_badflush(self):
89 # verify failure on calling decompressobj.flush with bad params
90 self.assertRaises(ValueError, zlib.decompressobj().flush, 0)
91 self.assertRaises(ValueError, zlib.decompressobj().flush, -1)
class BaseCompressTestCase(object):
    # Shared helpers for the big-buffer (bigmem) tests in the subclasses.
    # NOTE(review): this chunk appears to have lines elided (e.g. the
    # definition of _1M, the end of the join below, and the round-trip
    # assertions); the code is kept byte-for-byte as seen.

    def check_big_compress_buffer(self, size, compress_func):
        # Build `size` bytes of hard-to-compress data by expanding a random
        # hex block, then (presumably) round-trip it through compress_func
        # -- TODO confirm against the unelided original.
        fmt = "%%0%dx" % (2 * _1M)
        # Generate 10MB worth of random, and expand it by repeating it.
        # The assumption is that zlib's memory is not big enough to exploit
        # such spread out redundancy.
        data = ''.join([binascii.a2b_hex(fmt % random.getrandbits(8 * _1M))
        data = data * (size // len(data) + 1)

    def check_big_decompress_buffer(self, size, decompress_func):
        # NOTE(review): the construction of `data` before this compress call
        # appears elided from this chunk.
        compressed = zlib.compress(data, 1)
        data = decompress_func(compressed)
        # The decompressed result must have the expected length and consist
        # entirely of 'x' bytes.
        self.assertEqual(len(data), size)
        self.assertEqual(len(data.strip('x')), 0)
class CompressTestCase(BaseCompressTestCase, unittest.TestCase):
    """One-shot (whole message) compression tests."""

    def test_speech(self):
        # A single compress/decompress round trip must be lossless.
        compressed = zlib.compress(HAMLET_SCENE)
        self.assertEqual(zlib.decompress(compressed), HAMLET_SCENE)

    def test_speech128(self):
        # Same round trip on a much larger (128x) input.
        original = HAMLET_SCENE * 128
        compressed = zlib.compress(original)
        self.assertEqual(zlib.decompress(compressed), original)

    def test_incomplete_stream(self):
        # Truncating the stream must produce a useful error message.
        compressed = zlib.compress(HAMLET_SCENE)
        self.assertRaisesRegexp(
            zlib.error,
            "Error -5 while decompressing data: incomplete or truncated stream",
            zlib.decompress, compressed[:-1])

    # Memory use of the following functions takes into account overallocation

    @precisionbigmemtest(size=_1G + 1024 * 1024, memuse=3)
    def test_big_compress_buffer(self, size):
        # Level 1 keeps this >1GB round trip reasonably fast.
        self.check_big_compress_buffer(size, lambda s: zlib.compress(s, 1))

    @precisionbigmemtest(size=_1G + 1024 * 1024, memuse=2)
    def test_big_decompress_buffer(self, size):
        self.check_big_decompress_buffer(size, zlib.decompress)
class CompressObjectTestCase(BaseCompressTestCase, unittest.TestCase):
    # Test compression object
    # NOTE(review): the `def test_compressobj(self):` header and several
    # body lines (e.g. the flush() calls that would define x2 and y2)
    # appear elided from this chunk; the code is kept byte-for-byte as seen.

    # straightforward compress/decompress objects
    data = HAMLET_SCENE * 128
    co = zlib.compressobj()
    x1 = co.compress(data)
    self.assertRaises(zlib.error, co.flush) # second flush should not work
    dco = zlib.decompressobj()
    y1 = dco.decompress(x1 + x2)
    # Reassembled pieces must equal the original input.
    self.assertEqual(data, y1 + y2)
    def test_compressoptions(self):
        # specify lots of options to compressobj()
        # NOTE(review): the assignments for level/wbits/memlevel and the
        # flush() lines that would define x2/y2 appear elided from this
        # chunk; code kept byte-for-byte as seen.
        method = zlib.DEFLATED
        strategy = zlib.Z_FILTERED
        co = zlib.compressobj(level, method, wbits, memlevel, strategy)
        x1 = co.compress(HAMLET_SCENE)
        # Decompress with the same (raw/negative) window size.
        dco = zlib.decompressobj(wbits)
        y1 = dco.decompress(x1 + x2)
        self.assertEqual(HAMLET_SCENE, y1 + y2)
    def test_compressincremental(self):
        # compress object in steps, decompress object as one-shot
        # NOTE(review): the `bufs = []` initialiser and the final
        # `y2 = dco.flush()` appear elided from this chunk.
        data = HAMLET_SCENE * 128
        co = zlib.compressobj()
        # Feed the compressor in 256-byte slices.
        for i in range(0, len(data), 256):
            bufs.append(co.compress(data[i:i+256]))
        bufs.append(co.flush())
        combuf = ''.join(bufs)
        dco = zlib.decompressobj()
        y1 = dco.decompress(''.join(bufs))
        self.assertEqual(data, y1 + y2)
    def test_decompinc(self, flush=False, source=None, cx=256, dcx=64):
        # compress object in steps, decompress object in steps
        # NOTE(review): several lines appear elided from this chunk
        # (e.g. a `data = source * 128`-style assignment, two `bufs = []`
        # initialisers, and the flush/decompress branch between the
        # unconsumed_tail checks); code kept byte-for-byte as seen.
        source = source or HAMLET_SCENE
        co = zlib.compressobj()
        # Compress in cx-byte slices.
        for i in range(0, len(data), cx):
            bufs.append(co.compress(data[i:i+cx]))
        bufs.append(co.flush())
        combuf = ''.join(bufs)
        # Sanity check: one-shot decompression of the combined stream.
        self.assertEqual(data, zlib.decompress(combuf))
        dco = zlib.decompressobj()
        # Decompress in dcx-byte slices; with no max_length the tail
        # must stay empty after every step.
        for i in range(0, len(combuf), dcx):
            bufs.append(dco.decompress(combuf[i:i+dcx]))
            self.assertEqual('', dco.unconsumed_tail, ########
                             "(A) uct should be '': not %d long" %
                             len(dco.unconsumed_tail))
        bufs.append(dco.flush())
        chunk = dco.decompress('')
        self.assertEqual('', dco.unconsumed_tail, ########
                         "(B) uct should be '': not %d long" %
                         len(dco.unconsumed_tail))
        self.assertEqual(data, ''.join(bufs))
        # Failure means: "decompressobj with init options failed"
236 def test_decompincflush(self):
237 self.test_decompinc(flush=True)
    def test_decompimax(self, source=None, cx=256, dcx=64):
        # compress in steps, decompress in length-restricted steps
        # NOTE(review): several lines appear elided from this chunk
        # (a `data = source * 128`-style assignment, `bufs = []`
        # initialisers, the `cb = combuf` / `while cb:` loop header and
        # a `bufs.append(chunk)`); code kept byte-for-byte as seen.
        source = source or HAMLET_SCENE
        # Check a decompression object with max_length specified
        co = zlib.compressobj()
        for i in range(0, len(data), cx):
            bufs.append(co.compress(data[i:i+cx]))
        bufs.append(co.flush())
        combuf = ''.join(bufs)
        self.assertEqual(data, zlib.decompress(combuf),
                         'compressed data failure')
        dco = zlib.decompressobj()
        #max_length = 1 + len(cb)//10
        # Each step may return at most dcx bytes.
        chunk = dco.decompress(cb, dcx)
        self.assertFalse(len(chunk) > dcx,
                         'chunk too big (%d>%d)' % (len(chunk), dcx))
        # Whatever was not consumed is carried into the next step.
        cb = dco.unconsumed_tail
        bufs.append(dco.flush())
        self.assertEqual(data, ''.join(bufs), 'Wrong data retrieved')
    def test_decompressmaxlen(self, flush=False):
        # Check a decompression object with max_length specified
        # NOTE(review): several lines appear elided from this chunk
        # (`bufs = []` initialisers, the `cb = combuf` / `while cb:` loop
        # header, a `bufs.append(chunk)` and the flush/no-flush branch);
        # code kept byte-for-byte as seen.
        data = HAMLET_SCENE * 128
        co = zlib.compressobj()
        for i in range(0, len(data), 256):
            bufs.append(co.compress(data[i:i+256]))
        bufs.append(co.flush())
        combuf = ''.join(bufs)
        self.assertEqual(data, zlib.decompress(combuf),
                         'compressed data failure')
        dco = zlib.decompressobj()
        # Cap each step at roughly a tenth of the remaining input.
        max_length = 1 + len(cb)//10
        chunk = dco.decompress(cb, max_length)
        self.assertFalse(len(chunk) > max_length,
                         'chunk too big (%d>%d)' % (len(chunk),max_length))
        cb = dco.unconsumed_tail
        bufs.append(dco.flush())
        # Draining with empty input must also honour max_length.
        chunk = dco.decompress('', max_length)
        self.assertFalse(len(chunk) > max_length,
                         'chunk too big (%d>%d)' % (len(chunk),max_length))
        self.assertEqual(data, ''.join(bufs), 'Wrong data retrieved')
298 def test_decompressmaxlenflush(self):
299 self.test_decompressmaxlen(flush=True)
301 def test_maxlenmisc(self):
302 # Misc tests of max_length
303 dco = zlib.decompressobj()
304 self.assertRaises(ValueError, dco.decompress, "", -1)
305 self.assertEqual('', dco.unconsumed_tail)
    def test_flushes(self):
        # Test flush() with the various options, using all the
        # different levels in order to provide more variations.
        # NOTE(review): the line defining `d` (presumably a final
        # `d = obj.flush()`) appears elided from this chunk, yet `d`
        # is used in the join below; code kept byte-for-byte as seen.
        sync_opt = ['Z_NO_FLUSH', 'Z_SYNC_FLUSH', 'Z_FULL_FLUSH']
        # Keep only the flush modes this zlib build actually provides.
        sync_opt = [getattr(zlib, opt) for opt in sync_opt
                    if hasattr(zlib, opt)]
        data = HAMLET_SCENE * 8
        for sync in sync_opt:
            for level in range(10):
                obj = zlib.compressobj( level )
                a = obj.compress( data[:3000] )
                b = obj.flush( sync )
                c = obj.compress( data[3000:] )
                # Concatenating all pieces must decompress to the input
                # regardless of the flush mode used mid-stream.
                self.assertEqual(zlib.decompress(''.join([a,b,c,d])),
                                 data, ("Decompress failed: flush "
                                        "mode=%i, level=%i") % (sync, level))
    def test_odd_flush(self):
        # Test for odd flushing bugs noted in 2.0, and hopefully fixed in 2.1
        # NOTE(review): several lines appear elided from this chunk
        # (an `import random` line, the `try:` headers matching the two
        # `except AttributeError:` clauses, and the fallback `gen = random`
        # assignment); code kept byte-for-byte as seen.
        if hasattr(zlib, 'Z_SYNC_FLUSH'):
            # Testing on 17K of "random" data
            # Create compressor and decompressor objects
            co = zlib.compressobj(zlib.Z_BEST_COMPRESSION)
            dco = zlib.decompressobj()
            # generate random data stream
            # In 2.3 and later, WichmannHill is the RNG of the bug report
            gen = random.WichmannHill()
            except AttributeError:
                # 2.2 called it Random
                gen = random.Random()
            except AttributeError:
                # others might simply have a single RNG
            data = genblock(1, 17 * 1024, generator=gen)
            # compress, sync-flush, and decompress
            first = co.compress(data)
            second = co.flush(zlib.Z_SYNC_FLUSH)
            expanded = dco.decompress(first + second)
            # if decompressed data is different from the input data, choke.
            self.assertEqual(expanded, data, "17K random source doesn't match")
361 def test_empty_flush(self):
362 # Test that calling .flush() on unused objects works.
363 # (Bug #1083110 -- calling .flush() on decompress objects
364 # caused a core dump.)
366 co = zlib.compressobj(zlib.Z_BEST_COMPRESSION)
367 self.assertTrue(co.flush()) # Returns a zlib header
368 dco = zlib.decompressobj()
369 self.assertEqual(dco.flush(), "") # Returns nothing
    def test_decompress_incomplete_stream(self):
        # This is 'foo', deflated
        x = 'x\x9cK\xcb\xcf\x07\x00\x02\x82\x01E'
        # NOTE(review): a line between the decompress call and the final
        # assert (presumably `y += dco.flush()`) appears elided from this
        # chunk; code kept byte-for-byte as seen.
        # One-shot decompression works on the full stream ...
        self.assertEqual(zlib.decompress(x), 'foo')
        # ... but fails when the stream end is cut off.
        self.assertRaises(zlib.error, zlib.decompress, x[:-5])
        # Omitting the stream end works with decompressor objects
        dco = zlib.decompressobj()
        y = dco.decompress(x[:-5])
        self.assertEqual(y, 'foo')
    # Only defined when this zlib build exposes compressobj().copy().
    if hasattr(zlib.compressobj(), "copy"):
        def test_compresscopy(self):
            # Test copying a compression object
            # NOTE(review): several lines appear elided from this chunk
            # (the data0 assignment, bufs0/bufs1 initialisers, the
            # `c1 = c0.copy()` call and the joins producing s0/s1);
            # code kept byte-for-byte as seen.
            data1 = HAMLET_SCENE.swapcase()
            c0 = zlib.compressobj(zlib.Z_BEST_COMPRESSION)
            bufs0.append(c0.compress(data0))
            bufs0.append(c0.compress(data0))
            bufs0.append(c0.flush())
            bufs1.append(c1.compress(data1))
            bufs1.append(c1.flush())
            # The original and the copy must produce valid, independent
            # streams that decompress to their respective inputs.
            self.assertEqual(zlib.decompress(s0),data0+data0)
            self.assertEqual(zlib.decompress(s1),data0+data1)

        def test_badcompresscopy(self):
            # Test copying a compression object in an inconsistent state
            # NOTE(review): the call that puts the object into the
            # inconsistent state (presumably `c.flush()`) appears elided
            # from this chunk.
            c = zlib.compressobj()
            c.compress(HAMLET_SCENE)
            self.assertRaises(ValueError, c.copy)
    # Only defined when this zlib build exposes decompressobj().copy().
    if hasattr(zlib.decompressobj(), "copy"):
        def test_decompresscopy(self):
            # Test copying a decompression object
            # NOTE(review): several lines appear elided from this chunk
            # (the data assignment, bufs0/bufs1 initialisers, the
            # `d1 = d0.copy()` call and the joins producing s0/s1);
            # code kept byte-for-byte as seen.
            comp = zlib.compress(data)
            d0 = zlib.decompressobj()
            bufs0.append(d0.decompress(comp[:32]))
            bufs0.append(d0.decompress(comp[32:]))
            bufs1.append(d1.decompress(comp[32:]))
            # Original and copy must yield identical output, equal to the
            # original data.
            self.assertEqual(s0,s1)
            self.assertEqual(s0,data)

        def test_baddecompresscopy(self):
            # Test copying a compression object in an inconsistent state
            # NOTE(review): the calls creating the inconsistent state
            # (presumably d.decompress(data) and d.flush()) appear elided
            # from this chunk.
            data = zlib.compress(HAMLET_SCENE)
            d = zlib.decompressobj()
            self.assertRaises(ValueError, d.copy)
444 # Memory use of the following functions takes into account overallocation
446 @precisionbigmemtest(size=_1G + 1024 * 1024, memuse=3)
447 def test_big_compress_buffer(self, size):
448 c = zlib.compressobj(1)
449 compress = lambda s: c.compress(s) + c.flush()
450 self.check_big_compress_buffer(size, compress)
452 @precisionbigmemtest(size=_1G + 1024 * 1024, memuse=2)
453 def test_big_decompress_buffer(self, size):
454 d = zlib.decompressobj()
455 decompress = lambda s: d.decompress(s) + d.flush()
456 self.check_big_decompress_buffer(size, decompress)
def genblock(seed, length, step=1024, generator=random):
    """length-byte stream of random data from a seed (in step-byte blocks)."""
    # NOTE(review): lines appear elided from this chunk (the seed handling,
    # the body of the `if` clamping `step`, and a `blocks = []`
    # initialiser); code kept byte-for-byte as seen.
    randint = generator.randint
    if length < step or step < 2:
    # Emit the stream in step-byte blocks of random byte values.
    for i in range(0, length, step):
        blocks.append(''.join([chr(randint(0,255))
                               for x in range(step)]))
    # Trim any overshoot from the final block.
    return ''.join(blocks)[:length]
def choose_lines(source, number, seed=None, generator=random):
    """Return a list of number lines randomly chosen from the source"""
    # NOTE(review): the seed-handling lines (presumably seeding `generator`
    # when `seed is not None`) appear elided from this chunk.
    sources = source.split('\n')
    # Sample with replacement: the same line may be chosen more than once.
    return [generator.choice(sources) for n in range(number)]
487 I stay too long: but here my father comes.
491 A double blessing is a double grace,
492 Occasion smiles upon a second leave.
496 Yet here, Laertes! aboard, aboard, for shame!
497 The wind sits in the shoulder of your sail,
498 And you are stay'd for. There; my blessing with thee!
499 And these few precepts in thy memory
500 See thou character. Give thy thoughts no tongue,
501 Nor any unproportioned thought his act.
502 Be thou familiar, but by no means vulgar.
503 Those friends thou hast, and their adoption tried,
504 Grapple them to thy soul with hoops of steel;
505 But do not dull thy palm with entertainment
506 Of each new-hatch'd, unfledged comrade. Beware
507 Of entrance to a quarrel, but being in,
508 Bear't that the opposed may beware of thee.
509 Give every man thy ear, but few thy voice;
510 Take each man's censure, but reserve thy judgment.
511 Costly thy habit as thy purse can buy,
512 But not express'd in fancy; rich, not gaudy;
513 For the apparel oft proclaims the man,
514 And they in France of the best rank and station
515 Are of a most select and generous chief in that.
516 Neither a borrower nor a lender be;
517 For loan oft loses both itself and friend,
518 And borrowing dulls the edge of husbandry.
519 This above all: to thine ownself be true,
520 And it must follow, as the night the day,
521 Thou canst not then be false to any man.
522 Farewell: my blessing season this in thee!
526 Most humbly do I take my leave, my lord.
530 The time invites you; go; your servants tend.
534 Farewell, Ophelia; and remember well
535 What I have said to you.
539 'Tis in my memory lock'd,
540 And you yourself shall keep the key of it.
549 test_support.run_unittest(
553 CompressObjectTestCase
556 if __name__ == "__main__":