"""
This module allows opening and reading local and remote files, decompressing
them on-the-fly if needed. Remote files are read using urllib2 (except for
-"ssh://" URLs, which are handled differently). Supported compression types are:
-'bz2', 'gz', 'xz', 'tar.gz', 'tgz', 'tar.bz2', 'tar.xz'.
+"ssh://" URLs, which are handled differently). Supported file extentions are:
+'bz2', 'gz', 'xz', 'lzo' and a "tar" version of them: 'tar.bz2', 'tbz2', 'tbz',
+'tb2', 'tar.gz', 'tgz', 'tar.xz', 'txz', 'tar.lzo', 'tzo'. This module uses
+the following system programs for decompressing: pbzip2, bzip2, gzip, pigz, xz,
+lzop, and tar.
"""
import os
import errno
import urlparse
import logging
+import threading
import subprocess
-import BmapHelpers
+from bmaptools import BmapHelpers
# Disable the following pylint errors and recommendations:
# * Instance of X has no member Y (E1101), because it produces
# * Too many instance attributes (R0902)
# * Too many branches (R0912)
# * Too many local variables (R0914)
+# * Too many statements (R0915)
# pylint: disable=E1101
# pylint: disable=R0902
# pylint: disable=R0912
# pylint: disable=R0914
+# pylint: disable=R0915
# A list of supported compression types
-SUPPORTED_COMPRESSION_TYPES = ('bz2', 'gz', 'xz', 'tar.gz', 'tgz', 'tar.bz2',
- 'tar.xz')
+SUPPORTED_COMPRESSION_TYPES = ('bz2', 'gz', 'xz', 'lzo', 'tar.gz', 'tar.bz2',
+ 'tar.xz', 'tar.lzo')
def _fake_seek_forward(file_obj, cur_pos, offset, whence=os.SEEK_SET):
"""
"""
pass
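The body of '_fake_seek_forward()' is elided in this excerpt. Judging from how
'seek()' uses it further down, it emulates a forward seek on non-seekable streams
by reading and discarding data; the stand-alone sketch below illustrates that
idea and is an assumption, not the actual implementation:

    import os

    def fake_seek_forward_sketch(file_obj, cur_pos, offset, whence=os.SEEK_SET):
        """Sketch: emulate a forward-only seek by reading and discarding data."""
        if whence == os.SEEK_CUR:
            offset += cur_pos
        to_skip = offset - cur_pos
        if to_skip < 0:
            raise IOError("cannot seek backward on a non-seekable stream")

        while to_skip > 0:
            buf = file_obj.read(min(1024 * 1024, to_skip))
            if not buf:
                break  # EOF before reaching the requested offset
            to_skip -= len(buf)

        return offset - to_skip  # the new file position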
-class _CompressedFile(object):
- """
- This class implements transparent reading from a compressed file-like
- object and decompressing its contents on-the-fly.
- """
-
- def __init__(self, file_obj, decompress_func=None, chunk_size=None):
- """
- Class constructor. The 'file_ojb' argument is the compressed file-like
- object to read from. The 'decompress_func()' function is a function to
- use for decompression.
-
- The 'chunk_size' parameter may be used to limit the amount of data read
- from the input file at a time and it is assumed to be used with
- compressed files. This parameter has a big effect on the memory
- consumption in case the input file is a compressed stream of all
- zeroes. If we read a big chunk of such a compressed stream and
- decompress it, the length of the decompressed buffer may be huge. For
- example, when 'chunk_size' is 128KiB, the output buffer for a 4GiB .gz
- file filled with all zeroes is about 31MiB. Bzip2 is more dangerous -
- when 'chunk_size' is only 1KiB, the output buffer for a 4GiB .bz2 file
- filled with all zeroes is about 424MiB and when 'chunk_size' is 128
- bytes it is about 77MiB.
- """
-
- self._file_obj = file_obj
- self._decompress_func = decompress_func
- if chunk_size:
- self._chunk_size = chunk_size
- else:
- self._chunk_size = 128 * 1024
- self._pos = 0
- self._buffer = ''
- self._buffer_pos = 0
- self._eof = False
-
- def seek(self, offset, whence=os.SEEK_SET):
- """The 'seek()' method, similar to the one file objects have."""
- self._pos = _fake_seek_forward(self, self._pos, offset, whence)
-
- def tell(self):
- """The 'tell()' method, similar to the one file objects have."""
- return self._pos
-
- def _read_from_buffer(self, length):
- """Read from the internal buffer."""
- buffer_len = len(self._buffer)
- if buffer_len - self._buffer_pos > length:
- data = self._buffer[self._buffer_pos:self._buffer_pos + length]
- self._buffer_pos += length
- else:
- data = self._buffer[self._buffer_pos:]
- self._buffer = ''
- self._buffer_pos = 0
-
- return data
-
- def read(self, size):
- """
- Read the compressed file, uncompress the data on-the-fly, and return
- 'size' bytes of the uncompressed data.
- """
-
- assert self._pos >= 0
- assert self._buffer_pos >= 0
- assert self._buffer_pos <= len(self._buffer)
-
- if self._eof:
- return ''
-
- # Fetch the data from the buffers first
- data = self._read_from_buffer(size)
- size -= len(data)
-
- # If the buffers did not contain all the requested data, read them,
- # decompress, and buffer.
-
- while size > 0:
- buf = self._file_obj.read(self._chunk_size)
- if not buf:
- self._eof = True
- break
-
- buf = self._decompress_func(buf)
- if not buf:
- continue
-
- assert len(self._buffer) == 0
- assert self._buffer_pos == 0
-
- # The decompressor may return more data than we requested. Save the
- # extra data in an internal buffer.
- if len(buf) >= size:
- self._buffer = buf
- data += self._read_from_buffer(size)
- else:
- data += buf
-
- size -= len(buf)
-
- self._pos += len(data)
-
- return data
-
- def close(self):
- """Close the '_CompressedFile' file-like object."""
- pass
-
def _decode_sshpass_exit_code(code):
"""
A helper function which converts "sshpass" command-line tool's exit code
self.bz2file_found = False
# Whether the file is behind an URL
self.is_url = False
-
- # Wait for this child process in the destructor
- self._child_process = None
-
+ # List of child processes we forked
+ self._child_processes = []
+ # The reader thread
+ self._rthread = None
+ # This variable becomes 'True' when the instance of this class is not
+ # usable any longer.
+ self._done = False
# There may be a chain of open files, and we save the intermediate file
# objects in the 'self._f_objs' list. The final file object is stored
# in the last element of the list.
#
- # For example, when the path is an URL to a tar.xz file, the chain of
+    # For example, when the path is a URL to a bz2 file, the chain of
# opened files will be:
# o self._f_objs[0] is the urllib2 file-like object
- # o self._f_objs[1] is the lzma file-like object
- # o self._f_objs[2] is the tarfile file-like object
- # o self._f_objs[3] is the tarfilemember file-like object
+ # o self._f_objs[1] is the stdout of the 'bzip2' process
self._f_objs = []
self._force_fake_seek = False
def __del__(self):
"""The class destructor which closes opened files."""
- for _file_obj in self._f_objs:
- if _file_obj:
- _file_obj.close()
+ self._done = True
+
+ for child in self._child_processes:
+ child.kill()
+
+ if self._rthread:
+ self._rthread.join()
+
+ for file_obj in self._f_objs:
+ file_obj.close()
- def _open_tarfile(self):
+ def _read_thread(self, f_from, f_to):
"""
- This is a helper function for '_open_compressed_file' which is called
- when the file is a tar archive.
+ This function is used when reading compressed files. It runs in a
+    separate thread, reads data from the 'f_from' file-like object, and
+ writes them to the 'f_to' file-like object. 'F_from' may be a urllib2
+ object, while 'f_to' is usually stdin of the decompressor process.
"""
- import tarfile
+ chunk_size = 1024*1024
+ while not self._done:
+ buf = f_from.read(chunk_size)
+ if not buf:
+ break
- f_obj = tarfile.open(fileobj=self._f_objs[-1], mode='r|*')
- self._f_objs.append(f_obj)
+ f_to.write(buf)
- member = self._f_objs[-1].next()
- self.size = member.size
- f_obj = self._f_objs[-1].extractfile(member)
- self._f_objs.append(f_obj)
+    # This will make sure the decompressor process gets EOF and exits, as
+    # well as unblocks processes waiting on the decompressor's stdin.
+ f_to.close()
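The pattern implemented above (feed the decompressor's stdin from a helper
thread, read the decompressed stream from its stdout) can be reproduced
stand-alone; the sketch below uses 'gzip -d -c' and a local file ('data.gz' is a
made-up name) purely for illustration:

    import threading
    import subprocess

    def pump(src, dst):
        """Copy 'src' to 'dst' in chunks, then close 'dst' to deliver EOF."""
        while True:
            buf = src.read(1024 * 1024)
            if not buf:
                break
            dst.write(buf)
        dst.close()

    src = open("data.gz", "rb")  # could just as well be a urllib2 file object
    proc = subprocess.Popen("gzip -d -c", shell=True,
                            stdin=subprocess.PIPE, stdout=subprocess.PIPE)
    thread = threading.Thread(target=pump, args=(src, proc.stdin))
    thread.start()

    data = proc.stdout.read()  # the decompressed contents
    thread.join()
    proc.wait()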
def _open_compressed_file(self):
"""
compressed.
"""
- try:
- if self.name.endswith('.gz') or self.name.endswith('.gzip') \
- or self.name.endswith('.tgz'):
- import zlib
-
- self.compression_type = 'gzip'
- decompressor = zlib.decompressobj(16 + zlib.MAX_WBITS)
- f_obj = _CompressedFile(self._f_objs[-1],
- decompressor.decompress)
- self._f_objs.append(f_obj)
-
- if self.name.endswith('.tar.gz') or self.name.endswith('.tgz'):
- self._open_tarfile()
- elif self.name.endswith('.bz2'):
- import bz2
-
- # Let's try to use the bz2file module, which is a backport from
- # python 3.3 available in PyPI. It supports multiple streams
- # (pbzip2) and handles handles out-of-memory issues nicely.
- try:
- import bz2file
-
- self.bz2file_found = True
- f_obj = bz2file.BZ2File(self._f_objs[-1], 'r')
- except ImportError:
- import bz2
-
- f_obj = _CompressedFile(self._f_objs[-1],
- bz2.BZ2Decompressor().decompress, 128)
-
- self.compression_type = 'bzip2'
- self._f_objs.append(f_obj)
-
- if self.name.endswith('.tar.bz2'):
- self._open_tarfile()
- elif self.name.endswith('.xz'):
- try:
- import lzma
- except ImportError:
- try:
- from backports import lzma # pylint: disable=F0401
- except ImportError:
- raise Error("cannot import the \"lzma\" python module, "
- "it's required for decompressing .xz files")
-
- self.compression_type = 'xz'
- f_obj = _CompressedFile(self._f_objs[-1],
- lzma.LZMADecompressor().decompress, 128)
- self._f_objs.append(f_obj)
-
- if self.name.endswith('.tar.xz'):
- self._open_tarfile()
+ def is_gzip(name):
+ """Returns 'True' if file 'name' is compressed with 'gzip'."""
+ if name.endswith('.gzip') or \
+ (name.endswith('.gz') and not name.endswith('.tar.gz')):
+ return True
+ return False
+
+ def is_bzip2(name):
+ """Returns 'True' if file 'name' is compressed with 'bzip2'."""
+ if name.endswith('.bz2') and not name.endswith('.tar.bz2'):
+ return True
+ return False
+
+ def is_xz(name):
+ """Returns 'True' if file 'name' is compressed with 'xz'."""
+ if name.endswith('.xz') and not name.endswith('.tar.xz'):
+ return True
+ return False
+
+ def is_lzop(name):
+ """Returns 'True' if file 'name' is compressed with 'lzop'."""
+ if name.endswith('.lzo') and not name.endswith('.tar.lzo'):
+ return True
+ return False
+
+ def is_tar_gz(name):
+ """
+ Returns 'True' if file 'name' is a tar archive compressed with
+ 'gzip'.
+ """
+
+ if name.endswith('.tar.gz') or name.endswith('.tgz'):
+ return True
+ return False
+
+ def is_tar_bz2(name):
+ """
+ Returns 'True' if file 'name' is a tar archive compressed with
+ 'bzip2'.
+ """
+
+ if name.endswith('.tar.bz2') or name.endswith('.tbz') or \
+ name.endswith('.tbz2') or name.endswith('.tb2'):
+ return True
+ return False
+
+ def is_tar_xz(name):
+ """
+ Returns 'True' if file 'name' is a tar archive compressed with 'xz'.
+ """
+
+ if name.endswith('.tar.xz') or name.endswith('.txz'):
+ return True
+ return False
+
+ def is_tar_lzo(name):
+ """
+ Returns 'True' if file 'name' is a tar archive compressed with
+ 'lzop'.
+ """
+
+ if name.endswith('.tar.lzo') or name.endswith('.tzo'):
+ return True
+ return False
+
+ archiver = None
+ if is_tar_gz(self.name) or is_gzip(self.name):
+ self.compression_type = 'gzip'
+ if BmapHelpers.program_is_available("pigz"):
+ decompressor = "pigz"
else:
- if not self.is_url:
- self.size = os.fstat(self._f_objs[-1].fileno()).st_size
- except IOError as err:
- raise Error("cannot open file '%s': %s" % (self.name, err))
+ decompressor = "gzip"
+
+ if is_gzip(self.name):
+ args = "-d -c"
+ else:
+ archiver = "tar"
+ args = "-x -z -O"
+ elif is_tar_bz2(self.name) or is_bzip2(self.name):
+ self.compression_type = 'bzip2'
+ if BmapHelpers.program_is_available("pbzip2"):
+ decompressor = "pbzip2"
+ else:
+ decompressor = "bzip2"
+
+ if is_bzip2(self.name):
+ args = "-d -c"
+ else:
+ archiver = "tar"
+ args = "-x -j -O"
+ elif is_tar_xz(self.name) or is_xz(self.name):
+ self.compression_type = 'xz'
+ decompressor = "xz"
+ if is_xz(self.name):
+ args = "-d -c"
+ else:
+ archiver = "tar"
+ args = "-x -J -O"
+ elif is_tar_lzo(self.name) or is_lzop(self.name):
+ self.compression_type = 'lzo'
+ decompressor = "lzop"
+ if is_lzop(self.name):
+ args = "-d -c"
+ else:
+ archiver = "tar"
+ args = "-x --lzo -O"
+ else:
+ if not self.is_url:
+ self.size = os.fstat(self._f_objs[-1].fileno()).st_size
+ return
+
+    # Make sure the decompressor and archiver programs are available
+ if not BmapHelpers.program_is_available(decompressor):
+        raise Error("the \"%s\" program is not available but it is "
+                    "required for decompressing \"%s\""
+                    % (decompressor, self.name))
+ if archiver and not BmapHelpers.program_is_available(archiver):
+        raise Error("the \"%s\" program is not available but it is "
+                    "required for reading \"%s\"" % (archiver, self.name))
+
+ # Start the decompressor process. We'll send the data to its stdin and
+ # read the decompressed data from its stdout.
+ if archiver:
+ args = archiver + " " + args
+ else:
+ args = decompressor + " " + args
+ child_process = subprocess.Popen(args, shell=True,
+ bufsize=1024*1024,
+ stdin=subprocess.PIPE,
+ stdout=subprocess.PIPE,
+ stderr=subprocess.PIPE)
+
+ args = (self._f_objs[-1], child_process.stdin, )
+ self._rthread = threading.Thread(target=self._read_thread, args=args)
+ self._rthread.start()
+
+ self._force_fake_seek = True
+ self._f_objs.append(child_process.stdout)
+ self._child_processes.append(child_process)
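'BmapHelpers.program_is_available()' is relied upon above, but its implementation
is not part of this patch; one plausible way such a helper could work (an
assumption, not the real BmapHelpers code) is to scan the directories listed in
PATH:

    import os

    def program_is_available_sketch(name):
        """Sketch: return True if an executable called 'name' is found in PATH."""
        for directory in os.environ.get("PATH", "").split(os.pathsep):
            path = os.path.join(directory, name)
            if os.path.isfile(path) and os.access(path, os.X_OK):
                return True
        return False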
def _open_url_ssh(self, parsed_url):
"""
"permissions" % (path, hostname))
# Read the entire file using 'cat'
- self._child_process = subprocess.Popen(popen_args + ["cat " + path],
- stdout=subprocess.PIPE)
+ child_process = subprocess.Popen(popen_args + ["cat " + path],
+ stdout=subprocess.PIPE)
# Now the contents of the file should be available from the sub-process's
# stdout
- self._f_objs.append(self._child_process.stdout)
+ self._f_objs.append(child_process.stdout)
+ self._child_processes.append(child_process)
self.is_url = True
self._force_fake_seek = True
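The 'popen_args' prefix is assembled earlier in '_open_url_ssh()' and is not
shown in this excerpt; presumably it is the ssh (possibly sshpass-wrapped)
command line, so the overall effect is running 'cat' on the remote host and
streaming the file back over ssh. A stand-alone sketch with made-up host and
path values:

    import subprocess

    # Hypothetical host and path, used only to illustrate the idea.
    child = subprocess.Popen(["ssh", "user@example.com", "cat /path/to/image"],
                             stdout=subprocess.PIPE)
    while True:
        buf = child.stdout.read(1024 * 1024)
        if not buf:
            break
        # ... the remote file contents arrive here, chunk by chunk ...
    child.wait()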
self.is_url = True
self._f_objs.append(f_obj)
- def _get_pbzip2_error_string(self):
- """
- This is a helper function which returns a string which describes the
- problem of decompressing multi-stream bzip2 archives.
- """
-
- res = "file \"%res\" is a multi-stream bz2 archive " % self.name
- res += "(pbzip2) and it is not supported by python 2.x\n"
- res += "Please, install the \"bz2file\" python library from PyPI to "
- res += "support multi-stream archives. Here is an example of how this "
- res += "could be done in Fedora:\n"
- res += "$ yum install python-pip\n"
- res += "$ pip install bz2file"
-
- return res
-
def read(self, size=-1):
"""
Read the data from the file or URL and uncompress it on-the-fly if
if size < 0:
size = 0xFFFFFFFFFFFFFFFF
-
- try:
- buf = self._f_objs[-1].read(size)
- except EOFError:
- if self.compression_type == 'bzip2' and not self.bz2file_found:
- # The file is probably compressed with 'pbzip2'
- raise Error(self._get_pbzip2_error_string())
- else:
- raise
-
+ buf = self._f_objs[-1].read(size)
self._pos += len(buf)
+
return buf
def seek(self, offset, whence=os.SEEK_SET):
"""The 'seek()' method, similar to the one file objects have."""
- try:
- if self._force_fake_seek or not hasattr(self._f_objs[-1], "seek"):
- self._pos = _fake_seek_forward(self._f_objs[-1], self._pos,
- offset, whence)
- else:
- self._f_objs[-1].seek(offset, whence)
- except EOFError:
- if self.compression_type == 'bzip2' and not self.bz2file_found:
- # The file is probably compressed with 'pbzip2'
- raise Error(self._get_pbzip2_error_string())
- else:
- raise
+ if self._force_fake_seek or not hasattr(self._f_objs[-1], "seek"):
+ self._pos = _fake_seek_forward(self._f_objs[-1], self._pos,
+ offset, whence)
+ else:
+ self._f_objs[-1].seek(offset, whence)
def tell(self):
"""The 'tell()' method, similar to the one file objects have."""