cbc3ed498269d471edacf569db4bc56be785eb30
[profile/ivi/python.git] / Lib / test / test_zlib.py
1 import unittest
2 from test import test_support
3 import binascii
4 import random
5 from test.test_support import precisionbigmemtest, _1G
6
7 zlib = test_support.import_module('zlib')
8
9
10 class ChecksumTestCase(unittest.TestCase):
11     # checksum test cases
12     def test_crc32start(self):
13         self.assertEqual(zlib.crc32(""), zlib.crc32("", 0))
14         self.assertTrue(zlib.crc32("abc", 0xffffffff))
15
16     def test_crc32empty(self):
17         self.assertEqual(zlib.crc32("", 0), 0)
18         self.assertEqual(zlib.crc32("", 1), 1)
19         self.assertEqual(zlib.crc32("", 432), 432)
20
21     def test_adler32start(self):
22         self.assertEqual(zlib.adler32(""), zlib.adler32("", 1))
23         self.assertTrue(zlib.adler32("abc", 0xffffffff))
24
25     def test_adler32empty(self):
26         self.assertEqual(zlib.adler32("", 0), 0)
27         self.assertEqual(zlib.adler32("", 1), 1)
28         self.assertEqual(zlib.adler32("", 432), 432)
29
30     def assertEqual32(self, seen, expected):
31         # 32-bit values masked -- checksums on 32- vs 64- bit machines
32         # This is important if bit 31 (0x08000000L) is set.
33         self.assertEqual(seen & 0x0FFFFFFFFL, expected & 0x0FFFFFFFFL)
34
35     def test_penguins(self):
36         self.assertEqual32(zlib.crc32("penguin", 0), 0x0e5c1a120L)
37         self.assertEqual32(zlib.crc32("penguin", 1), 0x43b6aa94)
38         self.assertEqual32(zlib.adler32("penguin", 0), 0x0bcf02f6)
39         self.assertEqual32(zlib.adler32("penguin", 1), 0x0bd602f7)
40
41         self.assertEqual(zlib.crc32("penguin"), zlib.crc32("penguin", 0))
42         self.assertEqual(zlib.adler32("penguin"),zlib.adler32("penguin",1))
43
44     def test_abcdefghijklmnop(self):
45         """test issue1202 compliance: signed crc32, adler32 in 2.x"""
46         foo = 'abcdefghijklmnop'
47         # explicitly test signed behavior
48         self.assertEqual(zlib.crc32(foo), -1808088941)
49         self.assertEqual(zlib.crc32('spam'), 1138425661)
50         self.assertEqual(zlib.adler32(foo+foo), -721416943)
51         self.assertEqual(zlib.adler32('spam'), 72286642)
52
53     def test_same_as_binascii_crc32(self):
54         foo = 'abcdefghijklmnop'
55         self.assertEqual(binascii.crc32(foo), zlib.crc32(foo))
56         self.assertEqual(binascii.crc32('spam'), zlib.crc32('spam'))
57
58     def test_negative_crc_iv_input(self):
59         # The range of valid input values for the crc state should be
60         # -2**31 through 2**32-1 to allow inputs artifically constrained
61         # to a signed 32-bit integer.
62         self.assertEqual(zlib.crc32('ham', -1), zlib.crc32('ham', 0xffffffffL))
63         self.assertEqual(zlib.crc32('spam', -3141593),
64                          zlib.crc32('spam',  0xffd01027L))
65         self.assertEqual(zlib.crc32('spam', -(2**31)),
66                          zlib.crc32('spam',  (2**31)))
67
68
69 class ExceptionTestCase(unittest.TestCase):
70     # make sure we generate some expected errors
71     def test_badlevel(self):
72         # specifying compression level out of range causes an error
73         # (but -1 is Z_DEFAULT_COMPRESSION and apparently the zlib
74         # accepts 0 too)
75         self.assertRaises(zlib.error, zlib.compress, 'ERROR', 10)
76
77     def test_badcompressobj(self):
78         # verify failure on building compress object with bad params
79         self.assertRaises(ValueError, zlib.compressobj, 1, zlib.DEFLATED, 0)
80         # specifying total bits too large causes an error
81         self.assertRaises(ValueError,
82                 zlib.compressobj, 1, zlib.DEFLATED, zlib.MAX_WBITS + 1)
83
84     def test_baddecompressobj(self):
85         # verify failure on building decompress object with bad params
86         self.assertRaises(ValueError, zlib.decompressobj, -1)
87
88     def test_decompressobj_badflush(self):
89         # verify failure on calling decompressobj.flush with bad params
90         self.assertRaises(ValueError, zlib.decompressobj().flush, 0)
91         self.assertRaises(ValueError, zlib.decompressobj().flush, -1)
92
93
94 class BaseCompressTestCase(object):
95     def check_big_compress_buffer(self, size, compress_func):
96         _1M = 1024 * 1024
97         fmt = "%%0%dx" % (2 * _1M)
98         # Generate 10MB worth of random, and expand it by repeating it.
99         # The assumption is that zlib's memory is not big enough to exploit
100         # such spread out redundancy.
101         data = ''.join([binascii.a2b_hex(fmt % random.getrandbits(8 * _1M))
102                         for i in range(10)])
103         data = data * (size // len(data) + 1)
104         try:
105             compress_func(data)
106         finally:
107             # Release memory
108             data = None
109
110     def check_big_decompress_buffer(self, size, decompress_func):
111         data = 'x' * size
112         try:
113             compressed = zlib.compress(data, 1)
114         finally:
115             # Release memory
116             data = None
117         data = decompress_func(compressed)
118         # Sanity check
119         try:
120             self.assertEqual(len(data), size)
121             self.assertEqual(len(data.strip('x')), 0)
122         finally:
123             data = None
124
125
126 class CompressTestCase(BaseCompressTestCase, unittest.TestCase):
127     # Test compression in one go (whole message compression)
128     def test_speech(self):
129         x = zlib.compress(HAMLET_SCENE)
130         self.assertEqual(zlib.decompress(x), HAMLET_SCENE)
131
132     def test_speech128(self):
133         # compress more data
134         data = HAMLET_SCENE * 128
135         x = zlib.compress(data)
136         self.assertEqual(zlib.decompress(x), data)
137
138     def test_incomplete_stream(self):
139         # An useful error message is given
140         x = zlib.compress(HAMLET_SCENE)
141         self.assertRaisesRegexp(zlib.error,
142             "Error -5 while decompressing data: incomplete or truncated stream",
143             zlib.decompress, x[:-1])
144
145     # Memory use of the following functions takes into account overallocation
146
147     @precisionbigmemtest(size=_1G + 1024 * 1024, memuse=3)
148     def test_big_compress_buffer(self, size):
149         compress = lambda s: zlib.compress(s, 1)
150         self.check_big_compress_buffer(size, compress)
151
152     @precisionbigmemtest(size=_1G + 1024 * 1024, memuse=2)
153     def test_big_decompress_buffer(self, size):
154         self.check_big_decompress_buffer(size, zlib.decompress)
155
156
157 class CompressObjectTestCase(BaseCompressTestCase, unittest.TestCase):
158     # Test compression object
159     def test_pair(self):
160         # straightforward compress/decompress objects
161         data = HAMLET_SCENE * 128
162         co = zlib.compressobj()
163         x1 = co.compress(data)
164         x2 = co.flush()
165         self.assertRaises(zlib.error, co.flush) # second flush should not work
166         dco = zlib.decompressobj()
167         y1 = dco.decompress(x1 + x2)
168         y2 = dco.flush()
169         self.assertEqual(data, y1 + y2)
170
171     def test_compressoptions(self):
172         # specify lots of options to compressobj()
173         level = 2
174         method = zlib.DEFLATED
175         wbits = -12
176         memlevel = 9
177         strategy = zlib.Z_FILTERED
178         co = zlib.compressobj(level, method, wbits, memlevel, strategy)
179         x1 = co.compress(HAMLET_SCENE)
180         x2 = co.flush()
181         dco = zlib.decompressobj(wbits)
182         y1 = dco.decompress(x1 + x2)
183         y2 = dco.flush()
184         self.assertEqual(HAMLET_SCENE, y1 + y2)
185
186     def test_compressincremental(self):
187         # compress object in steps, decompress object as one-shot
188         data = HAMLET_SCENE * 128
189         co = zlib.compressobj()
190         bufs = []
191         for i in range(0, len(data), 256):
192             bufs.append(co.compress(data[i:i+256]))
193         bufs.append(co.flush())
194         combuf = ''.join(bufs)
195
196         dco = zlib.decompressobj()
197         y1 = dco.decompress(''.join(bufs))
198         y2 = dco.flush()
199         self.assertEqual(data, y1 + y2)
200
201     def test_decompinc(self, flush=False, source=None, cx=256, dcx=64):
202         # compress object in steps, decompress object in steps
203         source = source or HAMLET_SCENE
204         data = source * 128
205         co = zlib.compressobj()
206         bufs = []
207         for i in range(0, len(data), cx):
208             bufs.append(co.compress(data[i:i+cx]))
209         bufs.append(co.flush())
210         combuf = ''.join(bufs)
211
212         self.assertEqual(data, zlib.decompress(combuf))
213
214         dco = zlib.decompressobj()
215         bufs = []
216         for i in range(0, len(combuf), dcx):
217             bufs.append(dco.decompress(combuf[i:i+dcx]))
218             self.assertEqual('', dco.unconsumed_tail, ########
219                              "(A) uct should be '': not %d long" %
220                                        len(dco.unconsumed_tail))
221         if flush:
222             bufs.append(dco.flush())
223         else:
224             while True:
225                 chunk = dco.decompress('')
226                 if chunk:
227                     bufs.append(chunk)
228                 else:
229                     break
230         self.assertEqual('', dco.unconsumed_tail, ########
231                          "(B) uct should be '': not %d long" %
232                                        len(dco.unconsumed_tail))
233         self.assertEqual(data, ''.join(bufs))
234         # Failure means: "decompressobj with init options failed"
235
236     def test_decompincflush(self):
237         self.test_decompinc(flush=True)
238
239     def test_decompimax(self, source=None, cx=256, dcx=64):
240         # compress in steps, decompress in length-restricted steps
241         source = source or HAMLET_SCENE
242         # Check a decompression object with max_length specified
243         data = source * 128
244         co = zlib.compressobj()
245         bufs = []
246         for i in range(0, len(data), cx):
247             bufs.append(co.compress(data[i:i+cx]))
248         bufs.append(co.flush())
249         combuf = ''.join(bufs)
250         self.assertEqual(data, zlib.decompress(combuf),
251                          'compressed data failure')
252
253         dco = zlib.decompressobj()
254         bufs = []
255         cb = combuf
256         while cb:
257             #max_length = 1 + len(cb)//10
258             chunk = dco.decompress(cb, dcx)
259             self.assertFalse(len(chunk) > dcx,
260                     'chunk too big (%d>%d)' % (len(chunk), dcx))
261             bufs.append(chunk)
262             cb = dco.unconsumed_tail
263         bufs.append(dco.flush())
264         self.assertEqual(data, ''.join(bufs), 'Wrong data retrieved')
265
266     def test_decompressmaxlen(self, flush=False):
267         # Check a decompression object with max_length specified
268         data = HAMLET_SCENE * 128
269         co = zlib.compressobj()
270         bufs = []
271         for i in range(0, len(data), 256):
272             bufs.append(co.compress(data[i:i+256]))
273         bufs.append(co.flush())
274         combuf = ''.join(bufs)
275         self.assertEqual(data, zlib.decompress(combuf),
276                          'compressed data failure')
277
278         dco = zlib.decompressobj()
279         bufs = []
280         cb = combuf
281         while cb:
282             max_length = 1 + len(cb)//10
283             chunk = dco.decompress(cb, max_length)
284             self.assertFalse(len(chunk) > max_length,
285                         'chunk too big (%d>%d)' % (len(chunk),max_length))
286             bufs.append(chunk)
287             cb = dco.unconsumed_tail
288         if flush:
289             bufs.append(dco.flush())
290         else:
291             while chunk:
292                 chunk = dco.decompress('', max_length)
293                 self.assertFalse(len(chunk) > max_length,
294                             'chunk too big (%d>%d)' % (len(chunk),max_length))
295                 bufs.append(chunk)
296         self.assertEqual(data, ''.join(bufs), 'Wrong data retrieved')
297
298     def test_decompressmaxlenflush(self):
299         self.test_decompressmaxlen(flush=True)
300
301     def test_maxlenmisc(self):
302         # Misc tests of max_length
303         dco = zlib.decompressobj()
304         self.assertRaises(ValueError, dco.decompress, "", -1)
305         self.assertEqual('', dco.unconsumed_tail)
306
307     def test_flushes(self):
308         # Test flush() with the various options, using all the
309         # different levels in order to provide more variations.
310         sync_opt = ['Z_NO_FLUSH', 'Z_SYNC_FLUSH', 'Z_FULL_FLUSH']
311         sync_opt = [getattr(zlib, opt) for opt in sync_opt
312                     if hasattr(zlib, opt)]
313         data = HAMLET_SCENE * 8
314
315         for sync in sync_opt:
316             for level in range(10):
317                 obj = zlib.compressobj( level )
318                 a = obj.compress( data[:3000] )
319                 b = obj.flush( sync )
320                 c = obj.compress( data[3000:] )
321                 d = obj.flush()
322                 self.assertEqual(zlib.decompress(''.join([a,b,c,d])),
323                                  data, ("Decompress failed: flush "
324                                         "mode=%i, level=%i") % (sync, level))
325                 del obj
326
327     def test_odd_flush(self):
328         # Test for odd flushing bugs noted in 2.0, and hopefully fixed in 2.1
329         import random
330
331         if hasattr(zlib, 'Z_SYNC_FLUSH'):
332             # Testing on 17K of "random" data
333
334             # Create compressor and decompressor objects
335             co = zlib.compressobj(zlib.Z_BEST_COMPRESSION)
336             dco = zlib.decompressobj()
337
338             # Try 17K of data
339             # generate random data stream
340             try:
341                 # In 2.3 and later, WichmannHill is the RNG of the bug report
342                 gen = random.WichmannHill()
343             except AttributeError:
344                 try:
345                     # 2.2 called it Random
346                     gen = random.Random()
347                 except AttributeError:
348                     # others might simply have a single RNG
349                     gen = random
350             gen.seed(1)
351             data = genblock(1, 17 * 1024, generator=gen)
352
353             # compress, sync-flush, and decompress
354             first = co.compress(data)
355             second = co.flush(zlib.Z_SYNC_FLUSH)
356             expanded = dco.decompress(first + second)
357
358             # if decompressed data is different from the input data, choke.
359             self.assertEqual(expanded, data, "17K random source doesn't match")
360
361     def test_empty_flush(self):
362         # Test that calling .flush() on unused objects works.
363         # (Bug #1083110 -- calling .flush() on decompress objects
364         # caused a core dump.)
365
366         co = zlib.compressobj(zlib.Z_BEST_COMPRESSION)
367         self.assertTrue(co.flush())  # Returns a zlib header
368         dco = zlib.decompressobj()
369         self.assertEqual(dco.flush(), "") # Returns nothing
370
371     def test_decompress_incomplete_stream(self):
372         # This is 'foo', deflated
373         x = 'x\x9cK\xcb\xcf\x07\x00\x02\x82\x01E'
374         # For the record
375         self.assertEqual(zlib.decompress(x), 'foo')
376         self.assertRaises(zlib.error, zlib.decompress, x[:-5])
377         # Omitting the stream end works with decompressor objects
378         # (see issue #8672).
379         dco = zlib.decompressobj()
380         y = dco.decompress(x[:-5])
381         y += dco.flush()
382         self.assertEqual(y, 'foo')
383
384     if hasattr(zlib.compressobj(), "copy"):
385         def test_compresscopy(self):
386             # Test copying a compression object
387             data0 = HAMLET_SCENE
388             data1 = HAMLET_SCENE.swapcase()
389             c0 = zlib.compressobj(zlib.Z_BEST_COMPRESSION)
390             bufs0 = []
391             bufs0.append(c0.compress(data0))
392
393             c1 = c0.copy()
394             bufs1 = bufs0[:]
395
396             bufs0.append(c0.compress(data0))
397             bufs0.append(c0.flush())
398             s0 = ''.join(bufs0)
399
400             bufs1.append(c1.compress(data1))
401             bufs1.append(c1.flush())
402             s1 = ''.join(bufs1)
403
404             self.assertEqual(zlib.decompress(s0),data0+data0)
405             self.assertEqual(zlib.decompress(s1),data0+data1)
406
407         def test_badcompresscopy(self):
408             # Test copying a compression object in an inconsistent state
409             c = zlib.compressobj()
410             c.compress(HAMLET_SCENE)
411             c.flush()
412             self.assertRaises(ValueError, c.copy)
413
414     if hasattr(zlib.decompressobj(), "copy"):
415         def test_decompresscopy(self):
416             # Test copying a decompression object
417             data = HAMLET_SCENE
418             comp = zlib.compress(data)
419
420             d0 = zlib.decompressobj()
421             bufs0 = []
422             bufs0.append(d0.decompress(comp[:32]))
423
424             d1 = d0.copy()
425             bufs1 = bufs0[:]
426
427             bufs0.append(d0.decompress(comp[32:]))
428             s0 = ''.join(bufs0)
429
430             bufs1.append(d1.decompress(comp[32:]))
431             s1 = ''.join(bufs1)
432
433             self.assertEqual(s0,s1)
434             self.assertEqual(s0,data)
435
436         def test_baddecompresscopy(self):
437             # Test copying a compression object in an inconsistent state
438             data = zlib.compress(HAMLET_SCENE)
439             d = zlib.decompressobj()
440             d.decompress(data)
441             d.flush()
442             self.assertRaises(ValueError, d.copy)
443
444     # Memory use of the following functions takes into account overallocation
445
446     @precisionbigmemtest(size=_1G + 1024 * 1024, memuse=3)
447     def test_big_compress_buffer(self, size):
448         c = zlib.compressobj(1)
449         compress = lambda s: c.compress(s) + c.flush()
450         self.check_big_compress_buffer(size, compress)
451
452     @precisionbigmemtest(size=_1G + 1024 * 1024, memuse=2)
453     def test_big_decompress_buffer(self, size):
454         d = zlib.decompressobj()
455         decompress = lambda s: d.decompress(s) + d.flush()
456         self.check_big_decompress_buffer(size, decompress)
457
458
459 def genblock(seed, length, step=1024, generator=random):
460     """length-byte stream of random data from a seed (in step-byte blocks)."""
461     if seed is not None:
462         generator.seed(seed)
463     randint = generator.randint
464     if length < step or step < 2:
465         step = length
466     blocks = []
467     for i in range(0, length, step):
468         blocks.append(''.join([chr(randint(0,255))
469                                for x in range(step)]))
470     return ''.join(blocks)[:length]
471
472
473
474 def choose_lines(source, number, seed=None, generator=random):
475     """Return a list of number lines randomly chosen from the source"""
476     if seed is not None:
477         generator.seed(seed)
478     sources = source.split('\n')
479     return [generator.choice(sources) for n in range(number)]
480
481
482
483 HAMLET_SCENE = """
484 LAERTES
485
486        O, fear me not.
487        I stay too long: but here my father comes.
488
489        Enter POLONIUS
490
491        A double blessing is a double grace,
492        Occasion smiles upon a second leave.
493
494 LORD POLONIUS
495
496        Yet here, Laertes! aboard, aboard, for shame!
497        The wind sits in the shoulder of your sail,
498        And you are stay'd for. There; my blessing with thee!
499        And these few precepts in thy memory
500        See thou character. Give thy thoughts no tongue,
501        Nor any unproportioned thought his act.
502        Be thou familiar, but by no means vulgar.
503        Those friends thou hast, and their adoption tried,
504        Grapple them to thy soul with hoops of steel;
505        But do not dull thy palm with entertainment
506        Of each new-hatch'd, unfledged comrade. Beware
507        Of entrance to a quarrel, but being in,
508        Bear't that the opposed may beware of thee.
509        Give every man thy ear, but few thy voice;
510        Take each man's censure, but reserve thy judgment.
511        Costly thy habit as thy purse can buy,
512        But not express'd in fancy; rich, not gaudy;
513        For the apparel oft proclaims the man,
514        And they in France of the best rank and station
515        Are of a most select and generous chief in that.
516        Neither a borrower nor a lender be;
517        For loan oft loses both itself and friend,
518        And borrowing dulls the edge of husbandry.
519        This above all: to thine ownself be true,
520        And it must follow, as the night the day,
521        Thou canst not then be false to any man.
522        Farewell: my blessing season this in thee!
523
524 LAERTES
525
526        Most humbly do I take my leave, my lord.
527
528 LORD POLONIUS
529
530        The time invites you; go; your servants tend.
531
532 LAERTES
533
534        Farewell, Ophelia; and remember well
535        What I have said to you.
536
537 OPHELIA
538
539        'Tis in my memory lock'd,
540        And you yourself shall keep the key of it.
541
542 LAERTES
543
544        Farewell.
545 """
546
547
548 def test_main():
549     test_support.run_unittest(
550         ChecksumTestCase,
551         ExceptionTestCase,
552         CompressTestCase,
553         CompressObjectTestCase
554     )
555
556 if __name__ == "__main__":
557     test_main()