Mirror of https://github.com/simon987/Simple-Incremental-Search-Tool.git
Synced 2025-04-04 07:52:58 +00:00
521 lines · 18 KiB · Python
import hashlib
|
|
import os
|
|
import mimetypes
|
|
import subprocess
|
|
import json
|
|
import chardet
|
|
import warnings
|
|
import docx2txt
|
|
import xlrd
|
|
from pdfminer.pdfparser import PDFParser, PDFSyntaxError
|
|
from pdfminer.pdfdocument import PDFDocument
|
|
from pdfminer.pdfpage import PDFPage
|
|
from pdfminer.pdfinterp import PDFResourceManager, PDFPageInterpreter
|
|
from pdfminer.layout import LAParams, LTTextBox, LTTextLine
|
|
from pdfminer.converter import PDFPageAggregator
|
|
import html2text
|
|
from ebooklib import epub
|
|
import ebooklib
|
|
from PIL import Image
|
|
from fontTools.ttLib import TTFont, TTLibError
|
|
import six
|
|
from six.moves import xrange
|
|
|
|
|
|
class MimeGuesser:
    """Abstract interface for determining the MIME type of a file."""

    def guess_mime(self, full_path):
        """Return the MIME type of the file at *full_path*, or None if unknown."""
        raise NotImplementedError()
|
|
|
|
|
|
class ContentMimeGuesser(MimeGuesser):
    """MIME guesser that inspects the file's bytes using libmagic."""

    def __init__(self):
        # Imported lazily so the module loads even without python-magic installed.
        import magic
        self.libmagic = magic.Magic(mime=True)

    def guess_mime(self, full_path):
        """Return the MIME type detected from file content, or None if the
        file disappeared before it could be read."""
        try:
            return self.libmagic.from_file(full_path)
        except FileNotFoundError:
            return None
|
|
|
|
|
|
class ExtensionMimeGuesser(MimeGuesser):
    """MIME guesser that relies solely on the file name extension."""

    def guess_mime(self, full_path):
        """Return the MIME type implied by the extension, or None if unknown."""
        guessed_type, _encoding = mimetypes.guess_type(full_path, strict=False)
        return guessed_type
|
|
|
|
|
|
class FileParser:
    """Base class for all file parsers.

    Subclasses declare the MIME types they handle via *mime_types* and may
    mark themselves as the fallback parser with *is_default*.
    """

    # MIME types this parser accepts; empty for the default parser.
    mime_types = []
    # Whether this parser is used when no MIME-specific parser matches.
    is_default = False

    def parse(self, full_path: str):
        """Parse the file at *full_path* and return a dict of metadata.

        Must be overridden by subclasses.
        """
        # Bug fix: `raise NotImplemented` raised a TypeError (NotImplemented
        # is a constant, not an exception class) — raise the proper exception.
        raise NotImplementedError()
|
|
|
|
|
|
class FileCheckSumCalculator:
    """Abstract interface for computing file checksums."""

    def checksum(self, path: str) -> str:
        """
        Calculate the checksum of a file

        :param path: path of the file
        :return: checksum
        """
        raise NotImplementedError()
|
|
|
|
|
|
class Md5CheckSumCalculator(FileCheckSumCalculator):
    """Computes MD5 checksums of files."""

    def __init__(self):
        # Name under which the checksum is stored in the parsed file info.
        self.name = "md5"

    def checksum(self, path: str) -> str:
        """
        Calculate the md5 checksum of a file

        :param path: path of the file
        :return: md5 checksum
        """
        digest = hashlib.md5()

        # Read in 64 KiB chunks so large files don't load into memory at once.
        with open(path, "rb") as file:
            chunk = file.read(65536)
            while chunk:
                digest.update(chunk)
                chunk = file.read(65536)

        return digest.hexdigest().upper()
|
|
|
|
|
|
class Sha1CheckSumCalculator(FileCheckSumCalculator):
    """Computes SHA-1 checksums of files."""

    def __init__(self):
        # Name under which the checksum is stored in the parsed file info.
        self.name = "sha1"

    def checksum(self, path: str) -> str:
        """
        Calculate the sha1 checksum of a file

        :param path: path of the file
        :return: sha1 checksum
        """
        chunk_size = 65536  # stream large files instead of reading them whole
        digest = hashlib.sha1()

        with open(path, "rb") as file:
            for chunk in iter(lambda: file.read(chunk_size), b""):
                digest.update(chunk)

        return digest.hexdigest().upper()
|
|
|
|
|
|
class Sha256CheckSumCalculator(FileCheckSumCalculator):
    """Computes SHA-256 checksums of files."""

    def __init__(self):
        # Name under which the checksum is stored in the parsed file info.
        self.name = "sha256"

    def checksum(self, path: str) -> str:
        """
        Calculate the sha256 checksum of a file

        :param path: path of the file
        :return: sha256 checksum
        """
        digest = hashlib.sha256()

        # Stream the file in 64 KiB blocks to keep memory usage flat.
        with open(path, "rb") as file:
            chunk = file.read(65536)
            while chunk:
                digest.update(chunk)
                chunk = file.read(65536)

        return digest.hexdigest().upper()
|
|
|
|
|
|
class GenericFileParser(FileParser):
    """Fallback parser: collects filesystem metadata and checksums only."""

    mime_types = []
    is_default = True

    def __init__(self, checksum_calculators: list, root_dir: str):
        # Calculators applied to every parsed file (e.g. md5, sha1).
        self.checksum_calculators = checksum_calculators
        self.root_dir = root_dir
        self.root_dir_len = len(root_dir) + 1

    def parse(self, full_path: str) -> dict:
        """
        Parse a generic file

        :param full_path: path of the file to parse
        :return: dict information about the file
        """
        stat_result = os.stat(full_path)
        directory, file_name = os.path.split(full_path)
        base_name, extension = os.path.splitext(file_name)

        info = {
            "size": stat_result.st_size,
            "path": os.path.relpath(directory, self.root_dir),
            "name": base_name,
            "extension": extension[1:],  # drop the leading dot
            "mtime": stat_result.st_mtime,
        }

        # TODO: calculate all checksums at once
        for calculator in self.checksum_calculators:
            info[calculator.name] = calculator.checksum(full_path)

        return info
|
|
|
|
|
|
class MediaFileParser(GenericFileParser):
    """Parser for audio/video files; extracts format metadata via ffprobe."""

    is_default = False
    relevant_properties = ["bit_rate", "nb_streams", "duration", "format_name", "format_long_name"]

    def __init__(self, checksum_calculators: list, root_dir):
        super().__init__(checksum_calculators, root_dir)

        # Bug fixes in this list: "auido/L24" was a typo for "audio/L24", and a
        # missing comma between "audio/vorbis" and "audio/x-realaudio" silently
        # concatenated them into one bogus entry, so neither type ever matched.
        self.mime_types = [
            "video/3gpp", "video/mp4", "video/mpeg", "video/ogg", "video/quicktime",
            "video/webm", "video/x-flv", "video/x-mng", "video/x-ms-asf",
            "video/x-ms-wmv", "video/x-msvideo", "audio/basic", "audio/L24",
            "audio/mid", "audio/mpeg", "audio/mp4", "audio/x-aiff",
            "audio/ogg", "audio/vorbis", "audio/x-realaudio", "audio/x-wav",
            "audio/flac", "audio/x-monkeys-audio", "audio/wav", "audio/wave",
            "audio/x-wav", "audio/x-ms-wma", "audio/x-flac",
        ]

    def parse(self, full_path: str):
        """
        Parse a media file: generic metadata plus duration, long format name
        and common tags (genre/title/album/album_artist) read from ffprobe.

        :param full_path: path of the file to parse
        :return: dict information about the file
        """
        info = super().parse(full_path)

        # shell=False with an argument list: full_path is never shell-interpreted.
        p = subprocess.Popen(["ffprobe", "-v", "quiet", "-print_format", "json=c=1", "-show_format", full_path],
                             stdout=subprocess.PIPE)
        out, err = p.communicate()

        try:
            metadata = json.loads(out.decode("utf-8"))

            fmt = metadata.get("format", {})

            if "duration" in fmt:
                info["duration"] = float(fmt["duration"])

            if "format_long_name" in fmt:
                info["format_long_name"] = fmt["format_long_name"]

            # Copy the well-known tags verbatim when present.
            tags = fmt.get("tags", {})
            for tag in ("genre", "title", "album", "album_artist"):
                if tag in tags:
                    info[tag] = tags[tag]

        except json.decoder.JSONDecodeError:
            # ffprobe produced no/garbled output (e.g. not a media file).
            print("json decode error:" + full_path)

        return info
|
|
|
|
|
|
class PictureFileParser(GenericFileParser):
    """Parser for image files; extracts mode, format and dimensions via PIL."""

    is_default = False

    def __init__(self, checksum_calculators: list, root_dir):
        super().__init__(checksum_calculators, root_dir)

        self.mime_types = [
            "image/bmp", "image/cgm", "image/cis-cod", "image/g3fax", "image/gif",
            "image/ief", "image/jpeg", "image/ktx", "image/pipeg", "image/pjpeg",
            "image/png", "image/prs.btif", "image/svg+xml", "image/tiff",
            "image/vnd.adobe.photoshop", "image/vnd.dece.graphic", "image/vnd.djvu",
            "image/vnd.dvb.subtitle", "image/vnd.dwg", "image/vnd.dxf",
            "image/vnd.fastbidsheet", "image/vnd.fpx", "image/vnd.fst",
            "image/vnd.fujixerox.edmics-mmr", "image/vnd.fujixerox.edmics-rlc",
            "image/vnd.ms-modi", "image/vnd.net-fpx", "image/vnd.wap.wbmp",
            "image/vnd.xiff", "image/webp", "image/x-citrix-jpeg", "image/x-citrix-png",
            "image/x-cmu-raster", "image/x-cmx", "image/x-icon",
            "image/x-pcx", "image/x-pict", "image/x-png", "image/x-portable-bitmap",
            "image/x-portable-graymap", "image/x-portable-pixmap",
            "image/x-rgb", "image/x-xbitmap", "image/x-xpixmap", "image/x-xwindowdump"
        ]

    def parse(self, full_path: str):
        """
        Parse an image file; adds mode, format name and pixel dimensions
        when the image is readable.

        :param full_path: path of the file to parse
        :return: dict information about the file
        """
        info = super().parse(full_path)

        try:
            with open(full_path, "rb") as image_file, Image.open(image_file) as image:
                info["mode"] = image.mode
                info["format_name"] = image.format
                info["width"] = image.width
                info["height"] = image.height
        except (OSError, ValueError):
            # Unreadable or corrupt image: keep just the generic metadata.
            pass

        return info
|
|
|
|
|
|
class TextFileParser(GenericFileParser):
    """Parser for plain-text-like files; detects encoding and extracts content."""

    is_default = False

    def __init__(self, checksum_calculators: list, content_length: int, root_dir):
        super().__init__(checksum_calculators, root_dir)
        # Maximum number of bytes read / characters stored in info["content"];
        # 0 disables content extraction entirely.
        self.content_length = content_length

        self.mime_types = [
            "text/asp", "text/css", "text/ecmascript", "text/html", "text/javascript",
            "text/mcf", "text/pascal", "text/plain", "text/richtext", "text/scriplet",
            "text/sgml", "text/tab-separated-values", "text/uri-list", "text/vnd.abc",
            "text/vnd.fmi.flexstor", "text/vnd.rn-realtext", "text/vnd.wap.wml",
            "text/vnd.wap.wmlscript", "text/webviewhtml", "text/x-asm", "text/x-audiosoft-intra",
            "text/x-c", "text/x-component", "text/x-fortran", "text/x-h", "text/x-java-source",
            "text/x-la-asf", "text/x-m", "text/x-pascal", "text/x-script",
            "text/x-script.csh", "text/x-script.elisp", "text/x-script.guile",
            "text/x-script.ksh", "text/x-script.lisp", "text/x-script.perl",
            "text/x-script.perl-module", "text/x-script.phyton", "text/x-script.rexx",
            "text/x-script.scheme", "text/x-script.sh", "text/x-script.tcl",
            "text/x-script.tcsh", "text/x-script.zsh", "text/x-server-parsed-html",
            "text/x-setext", "text/x-sgml", "text/x-speech", "text/x-uil",
            "text/x-uuencode", "text/x-vcalendar", "text/xml", "text/x-csrc", "text/csv",
            "text/x-c++src", "text/x-chdr", "text/markdown", "text/x-sh", "text/x-java",
            "text/x-python", "text/x-c++hdr", "text/x-tex", "text/x-diff", "text/x-haskell",
            "text/x-perl", "text/x-dsrc", "text/scriptlet", "text/x-scala", "text/calendar",
            "text/x-bibtex", "text/x-tcl", "text/x-c++", "text/x-shellscript", "text/x-msdos-batch",
            "text/x-makefile", "text/rtf", "text/x-objective-c", "text/troff", "text/x-m4",
            "text/x-lisp", "text/x-php", "text/x-gawk", "text/x-awk", "text/x-ruby", "text/x-po",
            "text/x-makefile", "application/javascript", "application/rtf", "application/json",
        ]

    def parse(self, full_path: str):
        """
        Parse a text file: generic metadata plus detected encoding and the
        first content_length characters of decoded text.

        :param full_path: path of the file to parse
        :return: dict information about the file
        """
        info = super().parse(full_path)

        if self.content_length > 0:
            with open(full_path, "rb") as text_file:
                raw_content = text_file.read(self.content_length)

            # Bug fix: chardet.detect() was called twice on the same bytes and
            # the first result thrown away; detection is expensive, run it once.
            encoding = chardet.detect(raw_content)["encoding"]

            if encoding is not None:
                info["encoding"] = encoding
                try:
                    info["content"] = raw_content.decode(encoding, "ignore")
                except Exception:
                    # chardet can report encodings the codecs module rejects.
                    print("Unknown encoding: " + encoding)

        return info
|
|
|
|
|
|
class FontParser(GenericFileParser):
    """Parser for font files; extracts the font's full name via fontTools."""

    is_default = False

    def __init__(self, checksum_calculators: list, root_dir):
        super().__init__(checksum_calculators, root_dir)

        self.mime_types = [
            "application/font-sfnt", "application/font-woff", "application/vdn.ms-fontobject",
            "application/x-font-ttf"
        ]

    def parse(self, full_path: str):
        """
        Parse a font file; stores the font's full name (name record 4)
        under info["content"] when it can be read.

        :param full_path: path of the file to parse
        :return: dict information about the file
        """
        info = super().parse(full_path)

        with open(full_path, "rb") as font_file:
            # fontTools emits warnings for slightly malformed fonts; mute them.
            with warnings.catch_warnings():
                warnings.simplefilter("ignore")

                try:
                    font = TTFont(font_file)

                    if "name" in font:
                        try:
                            # nameID 4 is the full human-readable font name.
                            for record in font["name"].names:
                                if record.nameID == 4:
                                    info["content"] = record.toUnicode("replace")
                                    break
                        except AssertionError:
                            print("Could not read font name for " + full_path)
                except TTLibError:
                    print("Could not read font for " + full_path)

        return info
|
|
|
|
|
|
class PdfFileParser(GenericFileParser):
    """Parser for PDF documents; extracts the title and page text via pdfminer."""

    is_default = False

    def __init__(self, checksum_calculators: list, content_length: int, root_dir):
        super().__init__(checksum_calculators, root_dir)

        # Maximum number of characters stored in info["content"];
        # 0 disables content extraction entirely.
        self.content_length = content_length

        self.mime_types = [
            "application/pdf", "application/x-pdf"
        ]

    def parse(self, full_path: str):
        """
        Parse a PDF file: generic metadata, then (when extraction is enabled)
        the document title followed by page text, truncated once
        content_length characters have been collected.

        :param full_path: path of the file to parse
        :return: dict information about the file
        """
        info = super().parse(full_path)

        if self.content_length > 0:
            with open(full_path, "rb") as f:

                try:
                    parser = PDFParser(f)
                    document = PDFDocument(parser)
                except PDFSyntaxError:
                    # Not a well-formed PDF: return generic metadata only
                    # (note: info["content"] is not set in this case).
                    print("couldn't parse PDF " + full_path)
                    return info

                info["content"] = ""

                # Use the document title (when present and non-empty) as the
                # first line of the extracted content.
                if len(document.info) > 0 and "Title" in document.info[0] and document.info[0]["Title"] != b"":
                    if isinstance(document.info[0]["Title"], bytes):
                        info["content"] += document.info[0]["Title"].decode("utf-8", "replace") + "\n"
                    else:
                        # Title stored as an indirect PDF object: resolve it
                        # to its byte value before decoding.
                        info["content"] += document.info[0]["Title"].resolve().decode("utf-8", "replace") + "\n"

                try:
                    if document.is_extractable:
                        resource_manager = PDFResourceManager()
                        la_params = LAParams()

                        device = PDFPageAggregator(resource_manager, laparams=la_params)
                        interpreter = PDFPageInterpreter(resource_manager, device)

                        for page in PDFPage.create_pages(document):

                            interpreter.process_page(page)
                            layout = device.get_result()

                            for lt_obj in layout:
                                if isinstance(lt_obj, LTTextBox) or isinstance(lt_obj, LTTextLine):

                                    text = lt_obj.get_text()

                                    if len(info["content"]) + len(text) <= self.content_length:
                                        info["content"] += text
                                    else:
                                        # Character budget reached: keep only
                                        # what fits, then break out.
                                        info["content"] += text[0:self.content_length - len(info["content"])]
                                        break
                            else:
                                # Inner loop finished without break: budget not
                                # yet reached, move to the next page.
                                continue
                            # Inner loop broke (budget reached): stop paging.
                            break
                    else:
                        # PDF forbids text extraction (permissions flag).
                        print("PDF is not extractable: " + full_path)
                except ValueError:
                    print("Couldn't parse page for " + full_path)

        return info
|
|
|
|
|
|
class EbookParser(GenericFileParser):
    """Parser for epub ebooks; extracts document text as plain text."""

    is_default = False

    def __init__(self, checksum_calculators: list, content_length: int, root_dir):
        super().__init__(checksum_calculators, root_dir)

        # Maximum number of characters stored in info["content"].
        self.content_length = content_length

        self.mime_types = [
            "application/epub+zip"
        ]

        # One shared HTML->text converter, configured once.
        self.html2text = html2text.HTML2Text()
        self.html2text.ignore_images = True
        self.html2text.ignore_emphasis = True

    def parse(self, full_path: str):
        """
        Parse an epub ebook; concatenates the text of its document items
        into info["content"], truncated at content_length characters.

        :param full_path: path of the file to parse
        :return: dict information about the file
        """
        info = super().parse(full_path)

        book = epub.read_epub(full_path)

        info["content"] = ""

        for item in book.get_items_of_type(ebooklib.ITEM_DOCUMENT):

            text = self.html2text.handle(item.content.decode("utf-8"))

            remaining = self.content_length - len(info["content"])
            if len(text) <= remaining:
                info["content"] += text
            else:
                info["content"] += text[:remaining]
                break

        return info
|
|
|
|
|
|
class DocxParser(GenericFileParser):
    """Parser for .docx documents; extracts text with docx2txt."""

    is_default = False

    def __init__(self, checksum_calculators: list, content_length: int, root_dir):
        super().__init__(checksum_calculators, root_dir)

        # Maximum number of characters stored in info["content"];
        # 0 disables content extraction entirely.
        self.content_length = content_length

        self.mime_types = [
            "application/vnd.openxmlformats-officedocument.wordprocessingml.document"
        ]

    def parse(self, full_path: str):
        """
        Parse a docx document; stores up to content_length characters of its
        text under info["content"].

        :param full_path: path of the file to parse
        :return: dict information about the file
        """
        info = super().parse(full_path)

        if self.content_length > 0:
            try:
                text = docx2txt.process(full_path)

                if len(text) < self.content_length:
                    info["content"] = text
                else:
                    info["content"] = text[0:self.content_length]
            # Bug fix: was a bare `except:`, which also swallows
            # KeyboardInterrupt/SystemExit; the message also wrongly said
            # "Ebook" for a docx file.
            except Exception:
                print("Couldn't parse document: " + full_path)

        return info
|
|
|
|
|
|
class SpreadSheetParser(GenericFileParser):
    """Parser for Excel spreadsheets; extracts cell text with xlrd."""

    is_default = False

    def __init__(self, checksum_calculators: list, content_length: int, root_dir):
        super().__init__(checksum_calculators, root_dir)

        # Maximum number of characters stored in info["content"].
        self.content_length = content_length

        self.mime_types = [
            "application/vnd.ms-excel", "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet"
        ]

    def parse(self, full_path: str):
        """
        Parse a spreadsheet; concatenates non-empty cells row by row into
        info["content"], truncated at content_length characters.

        Cell-walking logic adapted from textract (MIT License,
        Copyright (c) 2014 Dean Malmgren):
        https://github.com/deanmalmgren/textract/blob/master/textract/parsers/xlsx_parser.py

        :param full_path: path of the file to parse
        :return: dict information about the file
        """
        info = super().parse(full_path)

        try:
            workbook = xlrd.open_workbook(full_path)

            info["content"] = ""

            for sheet_name in workbook.sheet_names():
                worksheet = workbook.sheet_by_name(sheet_name)

                for row_index in range(worksheet.nrows):
                    cells = []
                    # Modernization: replaced six.moves.xrange and
                    # six.text_type with the Python 3 builtins range/str
                    # (identical behavior on Python 3).
                    for col_index in range(worksheet.ncols):
                        value = worksheet.cell_value(row_index, col_index)
                        if value:
                            if isinstance(value, (int, float)):
                                value = str(value)
                            cells.append(value)

                    if cells:
                        text = " ".join(cells) + "\n"
                        if len(info["content"]) + len(text) <= self.content_length:
                            info["content"] += text
                        else:
                            info["content"] += text[0:self.content_length - len(info["content"])]
                            break

        except xlrd.biffh.XLRDError:
            # Bug fix: the error path previously fell off the end and returned
            # None; every other parser returns the info dict, so do the same.
            print("Couldn't parse spreadsheet: " + full_path)

        return info
|
|
|