return holes
-def generate_test_files(max_size = 4 * 1024 * 1024, directory = None):
+def generate_test_files(max_size = 4 * 1024 * 1024, directory = None,
+ delete = True):
""" This is an iterator which generates files which other tests use as the
input for the testing. The iterator tries to generate "interesting" files
which cover various corner-cases. For example, a large hole file, a file
with no holes, files of unaligned length, etc.
The 'directory' argument specifies the directory path where the generated
- test files should be created.
+ test files should be created. The 'delete' argument specifies whether the
+ generated test files have to be automatically deleted.
Returns a tuple consisting of the open file object and a list of unmapped
block ranges (holes) in the file. """
# A block-sized hole
file_obj = tempfile.NamedTemporaryFile("wb+", prefix = "4Khole_",
- dir = directory, suffix = ".img")
+ delete = delete, dir = directory,
+ suffix = ".img")
block_size = BmapHelpers.get_block_size(file_obj)
file_obj.truncate(block_size)
yield (file_obj, [(0, 0)])
# A block size + 1 byte hole
file_obj = tempfile.NamedTemporaryFile("wb+", prefix = "4Khole_plus_1_",
- dir = directory, suffix = ".img")
+ delete = delete, dir = directory,
+ suffix = ".img")
file_obj.truncate(block_size + 1)
yield (file_obj, [(0, 0)])
file_obj.close()
# A block size - 1 byte hole
file_obj = tempfile.NamedTemporaryFile("wb+", prefix = "4Khole_minus_1_",
- dir = directory, suffix = ".img")
+ delete = delete, dir = directory,
+ suffix = ".img")
file_obj.truncate(block_size - 1)
yield (file_obj, [(0, 0)])
file_obj.close()
# A 1-byte hole
file_obj = tempfile.NamedTemporaryFile("wb+", prefix = "1byte_hole_",
- dir = directory, suffix = ".img")
+ delete = delete, dir = directory,
+ suffix = ".img")
file_obj.truncate(1)
yield (file_obj, [(0, 0)])
file_obj.close()
for i in xrange(10):
size = random.randint(1, max_size)
file_obj = tempfile.NamedTemporaryFile("wb+", suffix = ".img",
- dir = directory, prefix = "rand_hole_%d_" % i)
+ delete = delete, dir = directory,
+ prefix = "rand_hole_%d_" % i)
file_obj.truncate(size)
blocks_cnt = (size + block_size - 1) / block_size
yield (file_obj, [(0, blocks_cnt - 1)])
# The maximum size
file_obj = tempfile.NamedTemporaryFile("wb+", prefix = "sparse_",
- dir = directory, suffix = ".img")
+ delete = delete, dir = directory,
+ suffix = ".img")
holes = create_random_sparse_file(file_obj, max_size)
yield (file_obj, holes)
file_obj.close()
# The maximum size + 1 byte
file_obj = tempfile.NamedTemporaryFile("wb+", prefix = "sparse_plus_1_",
- dir = directory, suffix = ".img")
+ delete = delete, dir = directory,
+ suffix = ".img")
holes = create_random_sparse_file(file_obj, max_size + 1)
yield (file_obj, holes)
file_obj.close()
# The maximum size - 1 byte
file_obj = tempfile.NamedTemporaryFile("wb+", prefix = "sparse_minus_1_",
- dir = directory, suffix = ".img")
+ delete = delete, dir = directory,
+ suffix = ".img")
holes = create_random_sparse_file(file_obj, max_size - 1)
yield (file_obj, holes)
file_obj.close()
for i in xrange(10):
size = random.randint(1, max_size)
file_obj = tempfile.NamedTemporaryFile("wb+", suffix = ".img",
- dir = directory, prefix = "sparse_%d_" % i)
+ delete = delete, dir = directory,
+ prefix = "sparse_%d_" % i)
holes = create_random_sparse_file(file_obj, size)
yield (file_obj, holes)
file_obj.close()
raise Error("mismatch for hole %d-%d, it is %d-%d in file2" \
% (range1[0], range1[1], range2[0], range2[1]))
-def _generate_compressed_files(file_obj):
+def _generate_compressed_files(file_obj, delete = True):
""" This is an iterator which generates compressed versions of a file
- represented by a file object 'file_obj'. """
+ represented by a file object 'file_obj'.
+
+ The 'delete' argument specifies whether the compressed files that this
+ iterator generates have to be automatically deleted. """
import bz2
import gzip
# Generate a .bz2 version of the file
tmp_file_obj = tempfile.NamedTemporaryFile('wb+', prefix = prefix,
- dir = directory, suffix = '.bz2')
+ delete = delete, dir = directory,
+ suffix = '.bz2')
bz2_file_obj = bz2.BZ2File(tmp_file_obj.name, 'wb')
file_obj.seek(0)
shutil.copyfileobj(file_obj, bz2_file_obj)
# Generate a .gz version of the file
tmp_file_obj = tempfile.NamedTemporaryFile('wb+', prefix = prefix,
- dir = directory, suffix = '.gz')
+ delete = delete, dir = directory,
+ suffix = '.gz')
gzip_file_obj = gzip.GzipFile(tmp_file_obj.name, 'wb')
file_obj.seek(0)
shutil.copyfileobj(file_obj, gzip_file_obj)
return hash_obj.hexdigest()
-def _do_test(f_image):
+def _do_test(f_image, delete = True):
"""" A basic test for the bmap creation and copying functionality. It first
generates a bmap for file object 'f_image', and then copies the sparse file
to a different file, and then checks that the original file and the copy
- are identical. """
+ are identical.
+
+ The 'delete' argument specifies whether the temporary files that this
+ function creates have to be automatically deleted. """
# Make sure the temporary files start with the same name as 'f_image' in
# order to simplify debugging.
# Create and open a temporary file for a copy of the copy
f_copy = tempfile.NamedTemporaryFile("wb+", prefix = prefix,
- dir = directory, suffix = ".copy")
+ delete = delete, dir = directory,
+ suffix = ".copy")
# Create and open 2 temporary files for the bmap
f_bmap1 = tempfile.NamedTemporaryFile("w+", prefix = prefix,
- dir = directory, suffix = ".bmap1")
+ delete = delete, dir = directory,
+ suffix = ".bmap1")
f_bmap2 = tempfile.NamedTemporaryFile("w+", prefix = prefix,
- dir = directory, suffix = ".bmap2")
+ delete = delete, dir = directory,
+ suffix = ".bmap2")
image_sha1 = _calculate_sha1(f_image)
# Pass 4: test compressed files copying with bmap
#
- for compressed in _generate_compressed_files(f_image):
+ for compressed in _generate_compressed_files(f_image, delete = delete):
writer = BmapCopy.BmapCopy(compressed, f_copy, f_bmap1)
writer.copy()
# Pass 6: test compressed files copying without bmap
#
- for compressed in _generate_compressed_files(f_image):
+ for compressed in _generate_compressed_files(f_image, delete = delete):
writer = BmapCopy.BmapCopy(compressed, f_copy)
writer.copy()
""" The test entry point. Executes the '_do_test()' function for files
of different sizes, holes distribution and format. """
- for f_image, _ in tests.helpers.generate_test_files():
- _do_test(f_image)
+ # Delete all the test-related temporary files automatically
+ delete = True
+ # Create all the test-related temporary files in the default directory
+ # (usually /tmp).
+ directory = None
+
+ iterator = tests.helpers.generate_test_files(delete = delete,
+ directory = directory)
+ for f_image, _ in iterator:
+ _do_test(f_image, delete = delete)