import hashlib
import json
import mimetypes
import os
import subprocess
import warnings

import chardet
from PIL import Image
from fontTools.ttLib import TTFont, TTLibError

from common import tika

# Number of bytes read per iteration when hashing a file.
_READ_BLOCK_SIZE = 65536


def _hex_digest(path: str, hasher) -> str:
    """
    Feed a file to a hashlib object block by block

    :param path: path of the file to hash
    :param hasher: hashlib hash object (e.g. hashlib.md5())
    :return: upper-case hexadecimal digest of the file content
    """
    with open(path, "rb") as f:
        # Read in fixed-size blocks so large files are never fully loaded
        for block in iter(lambda: f.read(_READ_BLOCK_SIZE), b""):
            hasher.update(block)
    return hasher.hexdigest().upper()


class MimeGuesser:
    """Interface of the objects able to guess the mime type of a file"""

    def guess_mime(self, full_path):
        """
        Guess the mime type of a file

        :param full_path: path of the file
        :return: mime type, or None when it could not be determined
        """
        raise NotImplementedError()


class ContentMimeGuesser(MimeGuesser):
    """Guess the mime type from the content of the file using libmagic"""

    def __init__(self):
        # Imported here so that the module can be loaded without python-magic
        # as long as this guesser is not instantiated
        import magic

        self.libmagic = magic.Magic(mime=True)

    def guess_mime(self, full_path):
        """
        Guess the mime type of a file from its content

        :param full_path: path of the file
        :return: mime type, or None if the file does not exist
        """
        try:
            return self.libmagic.from_file(full_path)
        except FileNotFoundError:
            return None


class ExtensionMimeGuesser(MimeGuesser):
    """Guess the mime type from the extension of the file"""

    def guess_mime(self, full_path):
        """
        Guess the mime type of a file from its extension

        :param full_path: path of the file
        :return: mime type, or None if the extension is not known
        """
        return mimetypes.guess_type(full_path, strict=False)[0]


class FileParser:
    """Interface of the file parsers"""

    # Mime types handled by the parser
    mime_types = []
    # Flag read from outside this module; GenericFileParser sets it to True
    is_default = False

    def parse(self, full_path: str):
        """
        Parse a file

        :param full_path: path of the file to parse
        :return: information about the file
        """
        # NotImplemented is a comparison constant, not an exception: raising
        # it would fail with a TypeError instead
        raise NotImplementedError()


class FileCheckSumCalculator:
    """Interface of the checksum calculators"""

    def checksum(self, path: str) -> str:
        """
        Calculate the checksum of a file

        :param path: path of the file
        :return: checksum
        """
        raise NotImplementedError()


class Md5CheckSumCalculator(FileCheckSumCalculator):

    def __init__(self):
        # Key under which the checksum is stored by GenericFileParser
        self.name = "md5"

    def checksum(self, path: str) -> str:
        """
        Calculate the md5 checksum of a file

        :param path: path of the file
        :return: md5 checksum (upper-case hexadecimal)
        """
        return _hex_digest(path, hashlib.md5())


class Sha1CheckSumCalculator(FileCheckSumCalculator):

    def __init__(self):
        # Key under which the checksum is stored by GenericFileParser
        self.name = "sha1"

    def checksum(self, path: str) -> str:
        """
        Calculate the sha1 checksum of a file

        :param path: path of the file
        :return: sha1 checksum (upper-case hexadecimal)
        """
        return _hex_digest(path, hashlib.sha1())


class Sha256CheckSumCalculator(FileCheckSumCalculator):

    def __init__(self):
        # Key under which the checksum is stored by GenericFileParser
        self.name = "sha256"

    def checksum(self, path: str) -> str:
        """
        Calculate the sha256 checksum of a file

        :param path: path of the file
        :return: sha256 checksum (upper-case hexadecimal)
        """
        return _hex_digest(path, hashlib.sha256())


class GenericFileParser(FileParser):
    """Parser extracting the information available for any kind of file"""

    mime_types = []
    is_default = True

    def __init__(self, checksum_calculators: list, root_dir: str):
        """
        :param checksum_calculators: calculators to run on every parsed file;
            each one exposes a name attribute and a checksum(path) method
        :param root_dir: directory the stored paths are made relative to
        """
        self.checksum_calculators = checksum_calculators
        self.root_dir = root_dir
        self.root_dir_len = len(root_dir) + 1

    def parse(self, full_path: str) -> dict:
        """
        Parse a generic file

        :param full_path: path of the file to parse
        :return: dict information about the file
        """
        file_stat = os.stat(full_path)
        path, name = os.path.split(full_path)
        name, extension = os.path.splitext(name)

        info = {
            "size": file_stat.st_size,
            "path": os.path.relpath(path, self.root_dir),
            "name": name,
            # Drop the leading dot of the extension
            "extension": extension[1:],
            "mtime": file_stat.st_mtime,
        }

        # TODO: calculate all checksums at once
        for calculator in self.checksum_calculators:
            info[calculator.name] = calculator.checksum(full_path)

        return info


class MediaFileParser(GenericFileParser):
    """Parser for audio and video files, relies on the ffprobe executable"""

    is_default = False
    relevant_properties = ["bit_rate", "nb_streams", "duration", "format_name", "format_long_name"]

    # Tags copied as-is from the ffprobe output to the file information
    _copied_tags = ("genre", "title", "album", "album_artist")

    def __init__(self, checksum_calculators: list, root_dir):
        super().__init__(checksum_calculators, root_dir)

        self.mime_types = [
            "video/3gpp", "video/mp4", "video/mpeg", "video/ogg", "video/quicktime",
            "video/webm", "video/x-flv", "video/x-mng", "video/x-ms-asf",
            "video/x-ms-wmv", "video/x-msvideo", "audio/basic", "audio/L24",
            "audio/mid", "audio/mpeg", "audio/mp4", "audio/x-aiff", "audio/ogg",
            "audio/vorbis", "audio/x-realaudio", "audio/x-wav", "audio/flac",
            "audio/x-monkeys-audio", "audio/wav", "audio/wave",
            "audio/x-ms-wma", "audio/x-flac",
        ]

    def parse(self, full_path: str):
        """
        Parse a media file

        :param full_path: path of the file to parse
        :return: dict information about the file, completed with the duration,
            the format name and the tags reported by ffprobe when available
        """
        info = super().parse(full_path)

        command = ["ffprobe", "-v", "quiet", "-print_format", "json=c=1", "-show_format", full_path]
        # The context manager closes the pipe and reaps the child process
        with subprocess.Popen(command, stdout=subprocess.PIPE) as process:
            out, _ = process.communicate()

        try:
            metadata = json.loads(out.decode("utf-8"))
        except (UnicodeDecodeError, json.JSONDecodeError):
            # Best effort: keep the generic information when the output of
            # ffprobe cannot be read
            print("json decode error:" + full_path)
            return info

        if "format" not in metadata:
            return info
        media_format = metadata["format"]

        if "duration" in media_format:
            info["duration"] = float(media_format["duration"])

        if "format_long_name" in media_format:
            info["format_long_name"] = media_format["format_long_name"]

        if "tags" in media_format:
            tags = media_format["tags"]
            for tag in self._copied_tags:
                if tag in tags:
                    info[tag] = tags[tag]

        return info


class PictureFileParser(GenericFileParser):
    """Parser for image files, relies on Pillow"""

    is_default = False

    def __init__(self, checksum_calculators: list, root_dir):
        super().__init__(checksum_calculators, root_dir)

        self.mime_types = [
            "image/bmp", "image/cgm", "image/cis-cod", "image/g3fax", "image/gif",
            "image/ief", "image/jpeg", "image/ktx", "image/pipeg", "image/pjpeg",
            "image/png", "image/prs.btif", "image/svg+xml", "image/tiff",
            "image/vnd.adobe.photoshop", "image/vnd.dece.graphic", "image/vnd.djvu",
            "image/vnd.dvb.subtitle", "image/vnd.dwg", "image/vnd.dxf",
            "image/vnd.fastbidsheet", "image/vnd.fpx", "image/vnd.fst",
            "image/vnd.fujixerox.edmics-mmr", "image/vnd.fujixerox.edmics-rlc",
            "image/vnd.ms-modi", "image/vnd.net-fpx", "image/vnd.wap.wbmp",
            "image/vnd.xiff", "image/webp", "image/x-citrix-jpeg", "image/x-citrix-png",
            "image/x-cmu-raster", "image/x-cmx", "image/x-icon", "image/x-pcx",
            "image/x-pict", "image/x-png", "image/x-portable-bitmap",
            "image/x-portable-graymap", "image/x-portable-pixmap",
            "image/x-rgb", "image/x-xbitmap", "image/x-xpixmap", "image/x-xwindowdump"
        ]

    def parse(self, full_path: str):
        """
        Parse an image file

        :param full_path: path of the file to parse
        :return: dict information about the file, completed with the mode, the
            format and the dimensions of the image when it can be opened
        """
        info = super().parse(full_path)

        try:
            with open(full_path, "rb") as image_file:
                with Image.open(image_file) as image:
                    info["mode"] = image.mode
                    info["format_name"] = image.format
                    info["width"] = image.width
                    info["height"] = image.height
        except (OSError, ValueError):
            # Best effort: files that cannot be opened as an image only get
            # the generic information
            pass

        return info


class TextFileParser(GenericFileParser):
    """Parser for text files, stores the beginning of their content"""

    is_default = False

    def __init__(self, checksum_calculators: list, content_length: int, root_dir):
        """
        :param checksum_calculators: calculators to run on every parsed file
        :param content_length: maximum number of bytes read from the file,
            the content is not read at all when it is not positive
        :param root_dir: directory the stored paths are made relative to
        """
        super().__init__(checksum_calculators, root_dir)
        self.content_length = content_length

        self.mime_types = [
            "text/asp", "text/css", "text/ecmascript", "text/html", "text/javascript",
            "text/mcf", "text/pascal", "text/plain", "text/richtext", "text/scriplet",
            "text/sgml", "text/tab-separated-values", "text/uri-list", "text/vnd.abc",
            "text/vnd.fmi.flexstor", "text/vnd.rn-realtext", "text/vnd.wap.wml",
            "text/vnd.wap.wmlscript", "text/webviewhtml", "text/x-asm", "text/x-audiosoft-intra",
            "text/x-c", "text/x-component", "text/x-fortran", "text/x-h", "text/x-java-source",
            "text/x-la-asf", "text/x-m", "text/x-pascal", "text/x-script",
            "text/x-script.csh", "text/x-script.elisp", "text/x-script.guile",
            "text/x-script.ksh", "text/x-script.lisp", "text/x-script.perl",
            "text/x-script.perl-module", "text/x-script.phyton", "text/x-script.rexx",
            "text/x-script.scheme", "text/x-script.sh", "text/x-script.tcl",
            "text/x-script.tcsh", "text/x-script.zsh", "text/x-server-parsed-html",
            "text/x-setext", "text/x-sgml", "text/x-speech", "text/x-uil",
            "text/x-uuencode", "text/x-vcalendar", "text/xml", "text/x-csrc", "text/csv",
            "text/x-c++src", "text/x-chdr", "text/markdown", "text/x-sh", "text/x-java",
            "text/x-python", "text/x-c++hdr", "text/x-tex", "text/x-diff", "text/x-haskell",
            "text/x-perl", "text/x-dsrc", "text/scriptlet", "text/x-scala", "text/calendar",
            "text/x-bibtex", "text/x-tcl", "text/x-c++", "text/x-shellscript",
            "text/x-msdos-batch", "text/x-makefile", "text/rtf", "text/x-objective-c",
            "text/troff", "text/x-m4", "text/x-lisp", "text/x-php", "text/x-gawk",
            "text/x-awk", "text/x-ruby", "text/x-po",
            "application/javascript", "application/rtf", "application/json",
        ]

    def parse(self, full_path: str):
        """
        Parse a text file

        :param full_path: path of the file to parse
        :return: dict information about the file, completed with the detected
            encoding and the decoded beginning of the content when available
        """
        info = super().parse(full_path)

        if self.content_length <= 0:
            return info

        with open(full_path, "rb") as text_file:
            raw_content = text_file.read(self.content_length)

        encoding = chardet.detect(raw_content)["encoding"]
        if encoding is None:
            return info

        info["encoding"] = encoding
        try:
            # Undecodable bytes are dropped, the read may have stopped in the
            # middle of a multi-byte character
            info["content"] = raw_content.decode(encoding, "ignore")
        except (LookupError, UnicodeError):
            # Encoding reported by chardet but not supported by Python
            print("Unknown encoding: " + encoding)

        return info


class FontParser(GenericFileParser):
    """Parser for font files, relies on fontTools"""

    is_default = False

    def __init__(self, checksum_calculators: list, root_dir):
        super().__init__(checksum_calculators, root_dir)

        self.mime_types = [
            "application/font-sfnt", "application/font-woff",
            "application/vnd.ms-fontobject", "application/x-font-ttf"
        ]

    def parse(self, full_path: str):
        """
        Parse a font file

        :param full_path: path of the file to parse
        :return: dict information about the file, the content being set to the
            first name record with nameID 4 when it can be read
        """
        info = super().parse(full_path)

        with open(full_path, "rb") as f:
            with warnings.catch_warnings():
                # Warnings raised while reading the font are ignored
                warnings.simplefilter("ignore")

                try:
                    font = TTFont(f)

                    if "name" in font:
                        try:
                            for name in font["name"].names:
                                if name.nameID == 4:
                                    info["content"] = name.toUnicode("replace")
                                    break
                        except AssertionError:
                            print("Could not read font name for " + full_path)
                except TTLibError:
                    print("Could not read font for " + full_path)

        return info


class TikaFileParser(GenericFileParser):
    """Parser for documents, relies on the project tika helper"""

    mime_types = [
        "application/vnd.ms-excel",
        "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",
        "application/vnd.openxmlformats-officedocument.wordprocessingml.document",
        "application/epub+zip",
        "application/pdf", "application/x-pdf",
    ]
    is_default = False

    def __init__(self, checksum_calculators: list, root_dir: str, content_len=4096):
        """
        :param checksum_calculators: calculators to run on every parsed file
        :param root_dir: directory the stored paths are made relative to
        :param content_len: maximum number of characters of content stored
        """
        super().__init__(checksum_calculators, root_dir)
        self.content_len = content_len

    def parse(self, full_path: str) -> dict:
        """
        Parse a document with tika

        :param full_path: path of the file to parse
        :return: dict information about the file, completed with the mime
            type, the beginning of the content and the encoding when tika
            reports them
        """
        info = super().parse(full_path)

        # Empty files are not submitted to tika
        if info["size"] == 0:
            return info

        tika_res = tika.from_file(full_path)
        if "metadata" not in tika_res:
            return info

        tika_meta = tika_res["metadata"]

        if "Content-Type" in tika_meta:
            content_type = tika_meta["Content-Type"]
            # Only the first value is kept when several are reported
            if isinstance(content_type, list):
                info["mime"] = content_type[0]
            else:
                info["mime"] = content_type

        tika_content = tika_res["content"] if "content" in tika_res else None
        if tika_content:
            info["content"] = tika_content.lstrip()[:self.content_len]

        if "Content-Encoding" in tika_meta:
            info["encoding"] = tika_meta["Content-Encoding"]

        return info