return output_name
+def _do_zstd(input_name, compression=True):
+ """ Compress/decompress the file with 'zstd' utility.
+
+ @input_name: the file name to compress/decompress
+ @compress: True for compressing, False for decompressing
+ @retval: the path of the compressed/decompressed file
+ """
+ if which("zstd") is None:
+ msger.error("zstd is not in mic-bootstrap, please add zstd to mic-bootstrap")
+
+ compressor = "zstd"
+
+ if compression:
+ cmdln = [compressor,"--rm", "-f", input_name]
+ else:
+ cmdln = [compressor, "-d", "-f", input_name]
+
+ _call_external(cmdln)
+
+ if compression:
+ output_name = input_name + ".zst"
+ else:
+ output_name = os.path.splitext(input_name)[0]
+
+ return output_name
+
_COMPRESS_SUFFIXES = {
".lzo" : [".lzo"],
".gz" : [".gz"],
return os.path.exists(archive_name)
+
_ARCHIVE_SUFFIXES = {
"zip" : [".zip"],
"tar" : [".tar"],
"lzotar": [".tzo", ".tar.lzo"],
"gztar" : [".tgz", ".taz", ".tar.gz"],
"bztar" : [".tbz", ".tbz2", ".tar.bz", ".tar.bz2"],
+ "zsttar": [".tar.zst"],
}
_ARCHIVE_FORMATS = {
"lzotar": ( _make_tarball, {"compressor" : _do_lzop} ),
"gztar" : ( _make_tarball, {"compressor" : _do_gzip} ),
"bztar" : ( _make_tarball, {"compressor" : _do_bzip2} ),
+ "zsttar" : ( _make_tarball, {"compressor" : _do_zstd} ),
}
def get_archive_formats():
if self.pack_to:
(tar, ext) = os.path.splitext(self.pack_to)
- if ext in (".gz", ".bz2", ".lzo", ".bz") and tar.endswith(".tar"):
+ if ext in (".gz", ".bz2", ".lzo", ".bz", ".zst") and tar.endswith(".tar"):
ext = ".tar" + ext
if ext not in get_archive_suffixes():
self.pack_to += ".tar"
archive_formats = archive.get_archive_formats()
archive_formats.sort()
self.assertEqual(archive_formats,
- ["bztar", "gztar", "lzotar", "tar", 'zip'])
+ ["bztar", "gztar", "lzotar", "tar", 'zip', 'zsttar'])
def test_get_archive_suffixes(self):
"""Test get archive suffixes"""
archive_suffixes.sort()
self.assertEqual(archive_suffixes,
- ['.tar', '.tar.bz', '.tar.bz2', '.tar.gz', '.tar.lzo',
+ ['.tar', '.tar.bz', '.tar.bz2', '.tar.gz', '.tar.lzo','.tar.zst',
'.taz', '.tbz', '.tbz2', '.tgz', '.tzo', '.zip'])
def test_make_archive_negtive_archive_name_is_required(self):
self.assertTrue(os.path.exists(os.path.join(out_dir, item)))
shutil.rmtree(out_dir)
+ def test_make_archive_tar_zst_with_different_name(self):
+ """ Test make_archive format: tar.zst
+ it packs the source with another name """
+ for item in self.files + self.dirs:
+ out_file = 'df.tar.zst'
+ self.assertTrue(archive.make_archive(out_file, item))
+ self.assertTrue(os.path.exists(out_file))
+ os.remove(out_file)
+
+ def test_make_archive_tar_zst(self):
+ """ Test make_archive format: tar.zst"""
+ for item in self.files + self.dirs:
+ out_file = '%s.tar.zst' % item
+ self.assertTrue(archive.make_archive(out_file, item))
+ self.assertTrue(os.path.exists(out_file))
+ os.remove(out_file)
+
if __name__ == "__main__":
unittest.main()