# Copyright 2015 Gentoo Foundation
# Distributed under the terms of the GNU General Public License v2

import errno
import re
import sys

if sys.hexversion >= 0x3000000:
    # Python 3 has no ``basestring``; alias it so the isinstance() check
    # in compression_probe() works on both major versions.
    basestring = str

from portage import _encodings, _unicode_encode
from portage.exception import FileNotFound, PermissionDenied

# Per-format command table, keyed by the same identifiers that
# compression_probe() returns. Each entry holds "compress" and
# "decompress" command strings and the "package" providing the tool;
# bzip2 additionally has a "decompress_alt" fallback. The ${...}
# placeholders are presumably expanded by the caller -- confirm there.
_compressors = {
    "bzip2": {
        "compress": "${PORTAGE_BZIP2_COMMAND} ${BINPKG_COMPRESS_FLAGS}",
        "decompress": "${PORTAGE_BUNZIP2_COMMAND}",
        "decompress_alt": "${PORTAGE_BZIP2_COMMAND} -d",
        "package": "app-arch/bzip2",
    },
    "gzip": {
        "compress": "gzip ${BINPKG_COMPRESS_FLAGS}",
        "decompress": "gzip -d",
        "package": "app-arch/gzip",
    },
    "lz4": {
        "compress": "lz4 ${BINPKG_COMPRESS_FLAGS}",
        "decompress": "lz4 -d",
        "package": "app-arch/lz4",
    },
    "lzip": {
        "compress": "lzip ${BINPKG_COMPRESS_FLAGS}",
        "decompress": "lzip -d",
        "package": "app-arch/lzip",
    },
    "lzop": {
        "compress": "lzop ${BINPKG_COMPRESS_FLAGS}",
        "decompress": "lzop -d",
        "package": "app-arch/lzop",
    },
    "xz": {
        "compress": "xz ${BINPKG_COMPRESS_FLAGS}",
        "decompress": "xz -d",
        "package": "app-arch/xz-utils",
    },
    "zstd": {
        "compress": "zstd ${BINPKG_COMPRESS_FLAGS}",
        "decompress": "zstd -d --long=31",
        "package": "app-arch/zstd",
    },
}

# Leading magic bytes for each supported format. Every alternative is a
# *named* group whose name is the identifier reported to callers:
# _compression_probe_file() returns the name of whichever group matched,
# so the names must stay in sync with the keys of _compressors.
_compression_re = re.compile(b'^(' +
    b'(?P<bzip2>\x42\x5a\x68\x39)|' +
    b'(?P<gzip>\x1f\x8b)|' +
    b'(?P<lz4>(?:\x04\x22\x4d\x18|\x02\x21\x4c\x18))|' +
    b'(?P<lzip>LZIP)|' +
    b'(?P<lzop>\x89LZO\x00\x0d\x0a\x1a\x0a)|' +
    b'(?P<xz>\xfd\x37\x7a\x58\x5a\x00)|' +
    b'(?P<zstd>([\x22-\x28]\xb5\x2f\xfd)))')

# Length in bytes of the longest magic sequence above (the lzop one);
# this is how much of the file _compression_probe_file() reads.
_max_compression_re_len = 9


def compression_probe(f):
    """
    Identify the compression type of a file. Returns one of the
    following identifier strings:

        bzip2
        gzip
        lz4
        lzip
        lzop
        xz
        zstd

    When a path is given, the file is opened in binary mode and closed
    again before returning. A file-like object supplied by the caller
    is read from but never closed here.

    @param f: a file path, or file-like object
    @type f: str or file
    @return: a string identifying the compression type, or None if the
        compression type is unrecognized
    @rtype str or None
    @raise PermissionDenied: if opening the path fails with an errno
        equal to PermissionDenied.errno
    @raise FileNotFound: if opening the path fails with ENOENT or ESTALE
    """
    open_file = isinstance(f, basestring)
    if open_file:
        try:
            f = open(
                _unicode_encode(f, encoding=_encodings['fs'], errors='strict'),
                mode='rb')
        except IOError as e:
            # open() failed, so ``f`` is still the original path here.
            if e.errno == PermissionDenied.errno:
                raise PermissionDenied(f)
            elif e.errno in (errno.ENOENT, errno.ESTALE):
                raise FileNotFound(f)
            else:
                raise

    try:
        return _compression_probe_file(f)
    finally:
        # Only close what this function opened itself.
        if open_file:
            f.close()


def _compression_probe_file(f):
    """
    Match the first bytes of an open binary file against the known
    magic numbers.

    @param f: file-like object whose read() returns bytes; up to
        _max_compression_re_len bytes are consumed from it
    @return: the name of the matching group in _compression_re, or None
        if no magic number matches
    @rtype str or None
    """
    m = _compression_re.match(f.read(_max_compression_re_len))
    if m is not None:
        # The alternatives are mutually exclusive, so at most one named
        # group carries a value.
        for k, v in m.groupdict().items():
            if v is not None:
                return k

    return None