# Create the parser for the "copy" command
#
text = "write an image to a block device using bmap"
- parser_copy = subparsers.add_parser("copy", help = text)
- parser_copy.set_defaults(func = copy_command)
+ parser_copy = subparsers.add_parser("copy", help=text)
+ parser_copy.set_defaults(func=copy_command)
# The first positional argument - image file
text = "the image file to copy. Supported formats: uncompressed, " + \
", ".join(TransRead.SUPPORTED_COMPRESSION_TYPES)
- parser_copy.add_argument("image", help = text)
+ parser_copy.add_argument("image", help=text)
# The second positional argument - block device node
text = "the destination file or device node to copy the image to"
- parser_copy.add_argument("dest", help = text)
+ parser_copy.add_argument("dest", help=text)
# The --bmap option
text = "the block map file for the image"
- parser_copy.add_argument("--bmap", help = text)
+ parser_copy.add_argument("--bmap", help=text)
# The --nobmap option
text = "allow copying without a bmap file"
- parser_copy.add_argument("--nobmap", action="store_true", help = text)
+ parser_copy.add_argument("--nobmap", action="store_true", help=text)
# The --no-verify option
text = "do not verify the data checksum while writing"
- parser_copy.add_argument("--no-verify", action="store_true", help = text)
+ parser_copy.add_argument("--no-verify", action="store_true", help=text)
return parser.parse_args()
# Bmap file checksum appeard in format 1.3
self._verify_bmap_checksum()
- def __init__(self, image, dest, bmap = None, image_size = None,
- logger = None):
+ def __init__(self, image, dest, bmap=None, image_size=None, logger=None):
"""
The class constructor. The parameters are:
image - file-like object of the image which should be copied,
else:
# Do not rotate the wheel too fast
now = datetime.datetime.now()
- min_delta = datetime.timedelta(milliseconds = 250)
+ min_delta = datetime.timedelta(milliseconds=250)
if now - self._progress_time < min_delta:
return
self._progress_time = now
self._batch_queue.put(None)
- def copy(self, sync = True, verify = True):
+ def copy(self, sync=True, verify=True):
"""
Copy the image to the destination file using bmap. The 'sync' argument
defines whether the destination file has to be synchronized upon
raise Error("cannot set the max. I/O ratio back to '%s': %s" \
% (self._old_max_ratio_value, err))
- def copy(self, sync = True, verify = True):
+ def copy(self, sync=True, verify=True):
"""
The same as in the base class but tunes the block device for better
performance before starting writing. Additionally, it forces block
finally:
self._restore_bdev_settings()
- def __init__(self, image, dest, bmap = None, image_size = None,
- logger = None):
+ def __init__(self, image, dest, bmap=None, image_size=None, logger=None):
"""
The same as the constructor of the 'BmapCopy' base class, but adds
useful guard-checks specific to block devices.
return hash_obj.hexdigest()
- def generate(self, include_checksums = True):
+ def generate(self, include_checksums=True):
"""
Generate bmap for the image file. If 'include_checksums' is 'True',
also generate SHA1 checksums for block ranges.
self._f_image_needs_close = True
- def __init__(self, image, buf_size = DEFAULT_BUFFER_SIZE):
+ def __init__(self, image, buf_size=DEFAULT_BUFFER_SIZE):
"""
Initialize a class instance. The 'image' argument is full path to the
file to operate on, or a file object to operate on.
# A list of supported compression types
SUPPORTED_COMPRESSION_TYPES = ('bz2', 'gz', 'tar.gz', 'tgz', 'tar.bz2')
-def _fake_seek_forward(file_obj, cur_pos, offset, whence = os.SEEK_SET):
+def _fake_seek_forward(file_obj, cur_pos, offset, whence=os.SEEK_SET):
"""
This function implements the 'seek()' method for file object 'file_obj'.
Only seeking forward and is allowed, and 'whence' may be either
object and decompressing its contents on-the-fly.
"""
- def __init__(self, file_obj, decompress_func = None, chunk_size = None):
+ def __init__(self, file_obj, decompress_func=None, chunk_size=None):
"""
Class constructor. The 'file_ojb' argument is the compressed file-like
object to read from. The 'decompress_func()' function is a function to
self._buffer_pos = 0
self._eof = False
- def seek(self, offset, whence = os.SEEK_SET):
+ def seek(self, offset, whence=os.SEEK_SET):
"""The 'seek()' method, similar to the one file objects have."""
self._pos = _fake_seek_forward(self, self._pos, offset, whence)
or self.name.endswith('.tgz'):
import tarfile
- tar = tarfile.open(fileobj = self._file_obj, mode = 'r|*')
+ tar = tarfile.open(fileobj=self._file_obj, mode='r|*')
member = tar.next()
self._transfile_obj = tar.extractfile(member)
self.size = member.size
# Make sure the ssh client program is installed
try:
- subprocess.Popen("ssh", stderr = subprocess.PIPE,
- stdout = subprocess.PIPE).wait()
+ subprocess.Popen("ssh", stderr=subprocess.PIPE,
+ stdout=subprocess.PIPE).wait()
except OSError as err:
if err.errno == os.errno.ENOENT:
raise Error("\"sshpass\" program not found, but it is " \
# Make sure the sshpass program is installed
try:
- subprocess.Popen("sshpass", stderr = subprocess.PIPE,
- stdout = subprocess.PIPE).wait()
+ subprocess.Popen("sshpass", stderr=subprocess.PIPE,
+ stdout=subprocess.PIPE).wait()
except OSError as err:
if err.errno == os.errno.ENOENT:
raise Error("\"sshpass\" program not found, but it is " \
# host
command = "test -f " + path + " && test -r " + path
child_process = subprocess.Popen(popen_args + [command],
- stdout = subprocess.PIPE)
+ stdout=subprocess.PIPE)
child_process.wait()
if child_process.returncode != 0:
raise Error("\"%s\" on \"%s\" cannot be read: make sure it " \
# Read the entire file using 'cat'
self._child_process = subprocess.Popen(popen_args + ["cat " + path],
- stdout = subprocess.PIPE)
+ stdout=subprocess.PIPE)
# Now the contents of the file should be available from sub-processes
# stdout
raise Error("cannot open own temporary file '%s': %s" \
% (tmp_file_obj.name, err))
- def __init__(self, filepath, local = False):
+ def __init__(self, filepath, local=False):
"""
Class constructor. The 'filepath' argument is the full path to the file
to read transparently. If 'local' is True, then the file-like object is
if local:
self._create_local_copy()
- def read(self, size = -1):
+ def read(self, size=-1):
"""
Read the data from the file or URL and and uncompress it on-the-fly if
necessary.
if self._child_process:
self._child_process.wait()
- def seek(self, offset, whence = os.SEEK_SET):
+ def seek(self, offset, whence=os.SEEK_SET):
"""The 'seek()' method, similar to the one file objects have."""
if self._force_fake_seek or not hasattr(self._transfile_obj, "seek"):
from setuptools import setup, find_packages
setup(
- name = "bmap-tools",
- description = "Bmap tools",
- author = "Artem Bityutskiy",
- author_email = "artem.bityutskiy@linux.intel.com",
- version = "2.4",
- scripts = ['bmaptool'],
- packages = find_packages(exclude = [ "test*" ]),
+ name="bmap-tools",
+ description="Bmap tools",
+ author="Artem Bityutskiy",
+ author_email="artem.bityutskiy@linux.intel.com",
+ version="2.4",
+ scripts=['bmaptool'],
+ packages=find_packages(exclude=["test*"]),
license='GPLv2',
long_description="Tools to generate block map (AKA bmap) and copy " \
"images using bmap",
file_obj.flush()
-def generate_test_files(max_size = 4 * 1024 * 1024, directory = None,
- delete = True):
+def generate_test_files(max_size=4*1024*1024, directory=None, delete=True):
"""
This is a generator which yields files which other tests use as the input
for the testing. The generator tries to yield "interesting" files which
#
# A block-sized hole
- file_obj = tempfile.NamedTemporaryFile("wb+", prefix = "4Khole_",
- delete = delete, dir = directory,
- suffix = ".img")
+ file_obj = tempfile.NamedTemporaryFile("wb+", prefix="4Khole_",
+ delete=delete, dir=directory,
+ suffix=".img")
block_size = BmapHelpers.get_block_size(file_obj)
file_obj.truncate(block_size)
yield (file_obj, block_size, [], [(0, 0)])
file_obj.close()
# A block size + 1 byte hole
- file_obj = tempfile.NamedTemporaryFile("wb+", prefix = "4Khole_plus_1_",
- delete = delete, dir = directory,
- suffix = ".img")
+ file_obj = tempfile.NamedTemporaryFile("wb+", prefix="4Khole_plus_1_",
+ delete=delete, dir=directory,
+ suffix=".img")
file_obj.truncate(block_size + 1)
yield (file_obj, block_size + 1, [], [(0, 1)])
file_obj.close()
# A block size - 1 byte hole
- file_obj = tempfile.NamedTemporaryFile("wb+", prefix = "4Khole_minus_1_",
- delete = delete, dir = directory,
- suffix = ".img")
+ file_obj = tempfile.NamedTemporaryFile("wb+", prefix="4Khole_minus_1_",
+ delete=delete, dir=directory,
+ suffix=".img")
file_obj.truncate(block_size - 1)
yield (file_obj, block_size - 1, [], [(0, 0)])
file_obj.close()
# A 1-byte hole
- file_obj = tempfile.NamedTemporaryFile("wb+", prefix = "1byte_hole_",
- delete = delete, dir = directory,
- suffix = ".img")
+ file_obj = tempfile.NamedTemporaryFile("wb+", prefix="1byte_hole_",
+ delete=delete, dir=directory,
+ suffix=".img")
file_obj.truncate(1)
yield (file_obj, 1, [], [(0, 0)])
file_obj.close()
# And 10 holes of random size
for i in xrange(10):
size = random.randint(1, max_size)
- file_obj = tempfile.NamedTemporaryFile("wb+", suffix = ".img",
- delete = delete, dir = directory,
- prefix = "rand_hole_%d_" % i)
+ file_obj = tempfile.NamedTemporaryFile("wb+", suffix=".img",
+ delete=delete, dir=directory,
+ prefix="rand_hole_%d_"%i)
file_obj.truncate(size)
blocks_cnt = (size + block_size - 1) / block_size
yield (file_obj, size, [], [(0, blocks_cnt - 1)])
#
# The maximum size
- file_obj = tempfile.NamedTemporaryFile("wb+", prefix = "sparse_",
- delete = delete, dir = directory,
- suffix = ".img")
+ file_obj = tempfile.NamedTemporaryFile("wb+", prefix="sparse_",
+ delete=delete, dir=directory,
+ suffix=".img")
mapped, unmapped = _create_random_sparse_file(file_obj, max_size)
yield (file_obj, max_size, mapped, unmapped)
file_obj.close()
# The maximum size + 1 byte
- file_obj = tempfile.NamedTemporaryFile("wb+", prefix = "sparse_plus_1_",
- delete = delete, dir = directory,
- suffix = ".img")
+ file_obj = tempfile.NamedTemporaryFile("wb+", prefix="sparse_plus_1_",
+ delete=delete, dir=directory,
+ suffix=".img")
mapped, unmapped = _create_random_sparse_file(file_obj, max_size + 1)
yield (file_obj, max_size + 1, mapped, unmapped)
file_obj.close()
# The maximum size - 1 byte
- file_obj = tempfile.NamedTemporaryFile("wb+", prefix = "sparse_minus_1_",
- delete = delete, dir = directory,
- suffix = ".img")
+ file_obj = tempfile.NamedTemporaryFile("wb+", prefix="sparse_minus_1_",
+ delete=delete, dir=directory,
+ suffix=".img")
mapped, unmapped = _create_random_sparse_file(file_obj, max_size - 1)
yield (file_obj, max_size - 1, mapped, unmapped)
file_obj.close()
# And 10 files of random size
for i in xrange(10):
size = random.randint(1, max_size)
- file_obj = tempfile.NamedTemporaryFile("wb+", suffix = ".img",
- delete = delete, dir = directory,
- prefix = "sparse_%d_" % i)
+ file_obj = tempfile.NamedTemporaryFile("wb+", suffix=".img",
+ delete=delete, dir=directory,
+ prefix="sparse_%d_"%i)
mapped, unmapped = _create_random_sparse_file(file_obj, size)
yield (file_obj, size, mapped, unmapped)
file_obj.close()
#
# A block-sized file
- file_obj = tempfile.NamedTemporaryFile("wb+", prefix = "4Kmapped_",
- delete = delete, dir = directory,
- suffix = ".img")
+ file_obj = tempfile.NamedTemporaryFile("wb+", prefix="4Kmapped_",
+ delete=delete, dir=directory,
+ suffix=".img")
_create_random_file(file_obj, block_size)
yield (file_obj, block_size, [(0, 0)], [])
file_obj.close()
# A block size + 1 byte file
- file_obj = tempfile.NamedTemporaryFile("wb+", prefix = "4Kmapped_plus_1_",
- delete = delete, dir = directory,
- suffix = ".img")
+ file_obj = tempfile.NamedTemporaryFile("wb+", prefix="4Kmapped_plus_1_",
+ delete=delete, dir=directory,
+ suffix=".img")
_create_random_file(file_obj, block_size + 1)
yield (file_obj, block_size + 1, [(0, 1)], [])
file_obj.close()
# A block size - 1 byte file
- file_obj = tempfile.NamedTemporaryFile("wb+", prefix = "4Kmapped_minus_1_",
- delete = delete, dir = directory,
- suffix = ".img")
+ file_obj = tempfile.NamedTemporaryFile("wb+", prefix="4Kmapped_minus_1_",
+ delete=delete, dir=directory,
+ suffix=".img")
_create_random_file(file_obj, block_size - 1)
yield (file_obj, block_size - 1, [(0, 0)], [])
file_obj.close()
# A 1-byte file
- file_obj = tempfile.NamedTemporaryFile("wb+", prefix = "1byte_mapped_",
- delete = delete, dir = directory,
- suffix = ".img")
+ file_obj = tempfile.NamedTemporaryFile("wb+", prefix="1byte_mapped_",
+ delete=delete, dir=directory,
+ suffix=".img")
_create_random_file(file_obj, 1)
yield (file_obj, 1, [(0, 0)], [])
file_obj.close()
# And 10 mapped files of random size
for i in xrange(10):
size = random.randint(1, max_size)
- file_obj = tempfile.NamedTemporaryFile("wb+", suffix = ".img",
- delete = delete, dir = directory,
- prefix = "rand_mapped_%d_" % i)
+ file_obj = tempfile.NamedTemporaryFile("wb+", suffix=".img",
+ delete=delete, dir=directory,
+ prefix="rand_mapped_%d_" % i)
_create_random_file(file_obj, size)
blocks_cnt = (size + block_size - 1) / block_size
yield (file_obj, size, [(0, blocks_cnt - 1)], [])
raise Error("mismatch for hole %d-%d, it is %d-%d in file2" \
% (range1[0], range1[1], range2[0], range2[1]))
-def _generate_compressed_files(file_obj, delete = True):
+def _generate_compressed_files(file_obj, delete=True):
"""
This is a generator which yields compressed versions of a file represented
by a file object 'file_obj'.
directory = os.path.dirname(file_obj.name)
# Generate an uncompressed version of the file
- tmp_file_obj = tempfile.NamedTemporaryFile('wb+', prefix = prefix,
- delete = delete, dir = directory,
- suffix = '.uncompressed')
+ tmp_file_obj = tempfile.NamedTemporaryFile('wb+', prefix=prefix,
+ delete=delete, dir=directory,
+ suffix='.uncompressed')
file_obj.seek(0)
shutil.copyfileobj(file_obj, tmp_file_obj)
tmp_file_obj.flush()
tmp_file_obj.close()
# Generate a .bz2 version of the file
- tmp_file_obj = tempfile.NamedTemporaryFile('wb+', prefix = prefix,
- delete = delete, dir = directory,
- suffix = '.bz2')
+ tmp_file_obj = tempfile.NamedTemporaryFile('wb+', prefix=prefix,
+ delete=delete, dir=directory,
+ suffix='.bz2')
bz2_file_obj = bz2.BZ2File(tmp_file_obj.name, 'wb')
file_obj.seek(0)
shutil.copyfileobj(file_obj, bz2_file_obj)
tmp_file_obj.close()
# Generate a .gz version of the file
- tmp_file_obj = tempfile.NamedTemporaryFile('wb+', prefix = prefix,
- delete = delete, dir = directory,
- suffix = '.gz')
+ tmp_file_obj = tempfile.NamedTemporaryFile('wb+', prefix=prefix,
+ delete=delete, dir=directory,
+ suffix='.gz')
gzip_file_obj = gzip.GzipFile(tmp_file_obj.name, 'wb')
file_obj.seek(0)
shutil.copyfileobj(file_obj, gzip_file_obj)
tmp_file_obj.close()
# Generate a tar.gz version of the file
- tmp_file_obj = tempfile.NamedTemporaryFile('wb+', prefix = prefix,
- delete = delete, dir = directory,
- suffix = '.tar.gz')
+ tmp_file_obj = tempfile.NamedTemporaryFile('wb+', prefix=prefix,
+ delete=delete, dir=directory,
+ suffix='.tar.gz')
tgz_file_obj = tarfile.open(tmp_file_obj.name, "w:gz")
tgz_file_obj.add(file_obj.name)
tgz_file_obj.close()
tmp_file_obj.close()
# Generate a tar.bz2 version of the file
- tmp_file_obj = tempfile.NamedTemporaryFile('wb+', prefix = prefix,
- delete = delete, dir = directory,
- suffix = '.tar.bz2')
+ tmp_file_obj = tempfile.NamedTemporaryFile('wb+', prefix=prefix,
+ delete=delete, dir=directory,
+ suffix='.tar.bz2')
tbz2_file_obj = tarfile.open(tmp_file_obj.name, "w:bz2")
tbz2_file_obj.add(file_obj.name)
tbz2_file_obj.close()
if not hasattr(image, "read"):
f_image.close()
-def _do_test(f_image, image_size, delete = True):
+def _do_test(f_image, image_size, delete=True):
"""
A basic test for the bmap creation and copying functionality. It first
generates a bmap for file object 'f_image', and then copies the sparse file
directory = os.path.dirname(f_image.name)
# Create and open a temporary file for a copy of the copy
- f_copy = tempfile.NamedTemporaryFile("wb+", prefix = prefix,
- delete = delete, dir = directory,
- suffix = ".copy")
+ f_copy = tempfile.NamedTemporaryFile("wb+", prefix=prefix,
+ delete=delete, dir=directory,
+ suffix=".copy")
# Create and open 2 temporary files for the bmap
- f_bmap1 = tempfile.NamedTemporaryFile("w+", prefix = prefix,
- delete = delete, dir = directory,
- suffix = ".bmap1")
- f_bmap2 = tempfile.NamedTemporaryFile("w+", prefix = prefix,
- delete = delete, dir = directory,
- suffix = ".bmap2")
+ f_bmap1 = tempfile.NamedTemporaryFile("w+", prefix=prefix,
+ delete=delete, dir=directory,
+ suffix=".bmap1")
+ f_bmap2 = tempfile.NamedTemporaryFile("w+", prefix=prefix,
+ delete=delete, dir=directory,
+ suffix=".bmap2")
image_sha1 = _calculate_sha1(f_image)
# Pass 3: test compressed files copying with bmap
#
- for compressed in _generate_compressed_files(f_image, delete = delete):
+ for compressed in _generate_compressed_files(f_image, delete=delete):
_copy_image(compressed, f_copy, f_bmap1, image_sha1, image_size)
# Test without setting the size
# Pass 6: test compressed files copying without bmap
#
- for compressed in _generate_compressed_files(f_image, delete = delete):
+ for compressed in _generate_compressed_files(f_image, delete=delete):
_copy_image(compressed, f_copy, f_bmap1, image_sha1, image_size)
# Test without setting the size
# FIEMAP).
directory = '.'
- iterator = tests.helpers.generate_test_files(delete = delete,
- directory = directory)
+ iterator = tests.helpers.generate_test_files(delete=delete,
+ directory=directory)
for f_image, image_size, _, _ in iterator:
assert image_size == os.path.getsize(f_image.name)
- _do_test(f_image, image_size, delete = delete)
+ _do_test(f_image, image_size, delete=delete)
ranges_type, first_block, blocks_cnt,
check[0], check[1]))
-def _do_test(f_image, mapped, unmapped, buf_size = Fiemap.DEFAULT_BUFFER_SIZE):
+def _do_test(f_image, mapped, unmapped, buf_size=Fiemap.DEFAULT_BUFFER_SIZE):
"""
Verify that Fiemap reports the correct mapped and unmapped areas for the
'f_image' file object. The 'mapped' and 'unmapped' lists contain the