import os
import time
from urllib.parse import urljoin

import elasticsearch
import ujson
from apscheduler.schedulers.background import BackgroundScheduler
from elasticsearch import helpers

from search import logger
from search.filter import SearchFilter


class InvalidQueryException(Exception):
    pass


class IndexingError(Exception):
    pass


class ElasticSearchEngine:
    SORT_ORDERS = {
        "score": ["_score"],
        "size_asc": [{"size": {"order": "asc"}}],
        "size_dsc": [{"size": {"order": "desc"}}],
        "date_asc": [{"mtime": {"order": "asc"}}],
        "date_desc": [{"mtime": {"order": "desc"}}],
        "none": []
    }

    def __init__(self, url, index_name):
        super().__init__()
        self.index_name = index_name
        logger.info("Connecting to ES @ %s" % url)
        self.es = elasticsearch.Elasticsearch(hosts=[url])
        self.filter = SearchFilter()

        if not self.es.indices.exists(self.index_name):
            self.init()

    def start_stats_scheduler(self):
        # Recompute the cached global statistics every 2 hours
        scheduler = BackgroundScheduler()
        scheduler.add_job(self._generate_global_stats, "interval", seconds=60 * 120)
        scheduler.start()

    def init(self):
        logger.info("Elasticsearch first time setup")
        if self.es.indices.exists(self.index_name):
            self.es.indices.delete(index=self.index_name)

        self.es.indices.create(index=self.index_name, body={
            "settings": {
                "index": {
                    "number_of_shards": 50,
                    "number_of_replicas": 0,
                    "refresh_interval": "30s",
                    "codec": "best_compression"
                },
                "analysis": {
                    "analyzer": {
                        "my_nGram": {
                            "tokenizer": "my_nGram_tokenizer",
                            "filter": ["lowercase", "asciifolding"]
                        }
                    },
                    "tokenizer": {
                        "my_nGram_tokenizer": {
                            "type": "nGram",
                            "min_gram": 3,
                            "max_gram": 3
                        }
                    }
                }
            }
        })

        # Index mappings
        self.es.indices.put_mapping(body={
            "properties": {
                "path": {"analyzer": "standard", "type": "text"},
                "name": {"analyzer": "standard", "type": "text",
                         "fields": {"nGram": {"type": "text", "analyzer": "my_nGram"}}},
                "mtime": {"type": "date", "format": "epoch_second"},
                "size": {"type": "long"},
                "website_id": {"type": "integer"},
                "ext": {"type": "keyword"},
            },
            "_routing": {"required": True}
        }, doc_type="file", index=self.index_name, include_type_name=True)

        self.es.indices.open(index=self.index_name)

    def delete_docs(self, website_id):
        """Delete every indexed document belonging to a website, retrying until it succeeds."""
        while True:
            try:
                logger.debug("Deleting docs of " + str(website_id))

                to_delete = helpers.scan(query={
                    "query": {
                        "term": {
                            "website_id": website_id
                        }
                    }
                }, scroll="1m", client=self.es, index=self.index_name, request_timeout=120, routing=website_id)

                buf = []
                counter = 0
                for doc in to_delete:
                    buf.append(doc)
                    counter += 1

                    if counter >= 10000:
                        self._delete(buf, website_id)
                        buf.clear()
                        counter = 0

                if counter > 0:
                    self._delete(buf, website_id)
                break

            except Exception as e:
                logger.error("During delete: " + str(e))
                time.sleep(10)

        logger.debug("Done deleting for " + str(website_id))

    def _delete(self, docs, website_id):
        bulk_string = self.create_bulk_delete_string(docs)
        result = self.es.bulk(body=bulk_string, index=self.index_name, doc_type="file", request_timeout=30,
                              routing=website_id)

        if result["errors"]:
            # result["errors"] is a boolean flag, so convert it before concatenating
            logger.error("Error in ES bulk delete: \n" + str(result["errors"]))
            raise IndexingError

    def import_json(self, in_lines, website_id: int):
        """Index JSON lines in bulk, splitting each document's name into name and extension."""
        import_every = 10000
        cooldown_time = 0

        docs = []

        for line in in_lines:
            try:
                doc = ujson.loads(line)
                name, ext = os.path.splitext(doc["name"])
                doc["ext"] = ext[1:].lower() if ext and len(ext) > 1 else ""
                doc["name"] = name
                doc["website_id"] = website_id
                docs.append(doc)
            except Exception as e:
                logger.error("Error in import_json: " + str(e) + " for line: \n" + line)

            if len(docs) >= import_every:
                self._index(docs)
                docs.clear()
                time.sleep(cooldown_time)

        if docs:
            self._index(docs)

    def _index(self, docs):
        while True:
            try:
                logger.debug("Indexing " + str(len(docs)) + " docs")
                bulk_string = ElasticSearchEngine.create_bulk_index_string(docs)
                self.es.bulk(body=bulk_string, index=self.index_name, doc_type="file", request_timeout=30,
                             routing=docs[0]["website_id"])
                break
            except Exception as e:
                logger.error("Error in _index: " + str(e) + ", retrying")
                time.sleep(10)

    @staticmethod
    def create_bulk_index_string(docs: list):
        action_string = '{"index":{}}\n'
        return "\n".join("".join([action_string, ujson.dumps(doc)]) for doc in docs)

    @staticmethod
    def create_bulk_delete_string(docs: list):
        return "\n".join("".join(["{\"delete\":{\"_id\":\"", doc["_id"], "\"}}"]) for doc in docs)

    def search(self, query, page, per_page, sort_order, extensions, size_min, size_max, match_all, fields,
               date_min, date_max) -> {}:

        if self.filter.should_block(query):
            logger.info("Search was blocked")
            raise InvalidQueryException("One or more terms in your query is blocked by the search filter. "
                                        "This incident has been reported.")

        filters = []

        if extensions:
            filters.append({"terms": {"ext": extensions}})

        if size_min > 0 or size_max:
            size_filter = dict()
            new_filter = {"range": {"size": size_filter}}
            if size_min > 0:
                size_filter["gte"] = size_min
            if size_max:
                size_filter["lte"] = size_max
            filters.append(new_filter)

        if date_min > 0 or date_max:
            date_filter = dict()
            new_filter = {"range": {"mtime": date_filter}}
            if date_min > 0:
                date_filter["gte"] = date_min
            if date_max:
                date_filter["lte"] = date_max
            filters.append(new_filter)

        sort_by = ElasticSearchEngine.SORT_ORDERS.get(sort_order, [])

        result = self.es.search(body={
            "query": {
                "bool": {
                    "must": {
                        "multi_match": {
                            "query": query,
                            "fields": fields,
                            # match_all requires every term to match
                            "operator": "and" if match_all else "or"
                        }
                    },
                    "filter": filters
                }
            },
            "sort": sort_by,
            "highlight": {
                # Wrap matched terms in <mark> tags
                "fields": {
                    "name": {"pre_tags": ["<mark>"], "post_tags": ["</mark>"]},
                    "name.nGram": {"pre_tags": ["<mark>"], "post_tags": ["</mark>"]},
                    "path": {"pre_tags": ["<mark>"], "post_tags": ["</mark>"]}
                }
            },
            "size": per_page,
            "from": min(page * per_page, 10000 - per_page)},
            index=self.index_name, request_timeout=20)

        return result

    def get_stats(self, website_id: int, subdir: str = None):
        result = self.es.search(body={
            "query": {
                "constant_score": {
                    "filter": {
                        "term": {"website_id": website_id}
                    }
                }
            },
            "aggs": {
                "ext_group": {
                    "terms": {
                        "field": "ext",
                        "size": 12
                    },
                    "aggs": {
                        "size": {
                            "sum": {
                                "field": "size"
                            }
                        }
                    }
                },
                "total_size": {
                    "sum_bucket": {
                        "buckets_path": "ext_group>size"
                    }
                }
            },
            "size": 0
        }, index=self.index_name, request_timeout=30, routing=website_id)

        stats = dict()
        stats["total_size"] = result["aggregations"]["total_size"]["value"]
        stats["total_count"] = result["hits"]["total"]
        stats["ext_stats"] = [(b["size"]["value"], b["doc_count"], b["key"])
                              for b in result["aggregations"]["ext_group"]["buckets"]]

        return stats

    def get_link_list(self, website_id, base_url):
        hits = helpers.scan(client=self.es,
                            query={
                                "_source": {
                                    "includes": ["path", "name", "ext"]
                                },
                                "query": {
                                    "constant_score": {
                                        "filter": {
                                            "term": {"website_id": website_id}
                                        }
                                    }
                                },
                            },
                            index=self.index_name, request_timeout=20, routing=website_id)

        for hit in hits:
            src = hit["_source"]
            yield urljoin(base_url, "/") + src["path"] + ("/" if src["path"] != "" else "") + src["name"] + \
                  ("." if src["ext"] != "" else "") + src["ext"]
if src["ext"] != "" else "") + src["ext"] @staticmethod def get_global_stats(): if os.path.exists("_stats.json"): with open("_stats.json", "r") as f: return ujson.load(f) else: return None def _generate_global_stats(self): size_per_ext = self.es.search(body={ "query": { "bool": { "filter": [ {"range": { "size": {"gte": 0, "lte": (1000000000000 - 1)} # 0-1TB }} ] } }, "aggs": { "ext_group": { "terms": { "field": "ext", "size": 40 }, "aggs": { "size": { "sum": { "field": "size" } } } } }, "size": 0 }, index=self.index_name, request_timeout=240) total_stats = self.es.search(body={ "query": { "bool": { "filter": [ {"range": { "size": {"gte": 0, "lte": (1000000000000 - 1)} # 0-1TB }} ] } }, "aggs": { "file_stats": { "extended_stats": { "field": "size", "sigma": 1 } } }, "size": 0 }, index=self.index_name, request_timeout=241) size_and_date_histogram = self.es.search(body={ "query": { "bool": { "filter": [ {"range": { "size": {"gte": 0, "lte": (1000000000000 - 1)} # 0-1TB }}, {"range": { "mtime": { "gt": 0 # 1970-01-01 } }} ] } }, "aggs": { "sizes": { "histogram": { "field": "size", "interval": 100000000, # 100Mb "min_doc_count": 500 } }, "dates": { "date_histogram": { "field": "mtime", "interval": "1y", "min_doc_count": 500, "format": "yyyy" } } }, "size": 0 }, index=self.index_name, request_timeout=242) website_scatter = self.es.search(body={ "query": { "bool": { "filter": [ {"range": { "size": {"gte": 0, "lte": (1000000000000 - 1)} # 0-1TB }} ] } }, "aggs": { "websites": { "terms": { "field": "website_id", "size": 600 # TODO: Figure out what size is appropriate }, "aggs": { "size": { "sum": { "field": "size" } } } } }, "size": 0 }, index=self.index_name, request_timeout=243) es_stats = self.es.indices.stats(self.index_name, request_timeout=244) stats = dict() stats["es_index_size"] = es_stats["indices"][self.index_name]["total"]["store"]["size_in_bytes"] stats["es_search_count"] = es_stats["indices"][self.index_name]["total"]["search"]["query_total"] stats["es_search_time"] = es_stats["indices"][self.index_name]["total"]["search"]["query_time_in_millis"] stats["es_search_time_avg"] = stats["es_search_time"] / ( stats["es_search_count"] if stats["es_search_count"] != 0 else 1) stats["total_count"] = total_stats["aggregations"]["file_stats"]["count"] stats["total_size"] = total_stats["aggregations"]["file_stats"]["sum"] stats["size_avg"] = total_stats["aggregations"]["file_stats"]["avg"] stats["size_std_deviation"] = total_stats["aggregations"]["file_stats"]["std_deviation"] stats["size_std_deviation_bounds"] = total_stats["aggregations"]["file_stats"]["std_deviation_bounds"] stats["size_variance"] = total_stats["aggregations"]["file_stats"]["variance"] stats["ext_stats"] = [(b["size"]["value"], b["doc_count"], b["key"]) for b in size_per_ext["aggregations"]["ext_group"]["buckets"]] stats["sizes_histogram"] = [(b["key"], b["doc_count"]) for b in size_and_date_histogram["aggregations"]["sizes"]["buckets"]] stats["dates_histogram"] = [(b["key_as_string"], b["doc_count"]) for b in size_and_date_histogram["aggregations"]["dates"]["buckets"]] stats["website_scatter"] = [[b["key"], b["doc_count"], b["size"]["value"]] for b in website_scatter["aggregations"]["websites"]["buckets"]] stats["base_url"] = "entire database" with open("_stats.json", "w") as f: ujson.dump(stats, f) def stream_all_docs(self): return helpers.scan(query={ "query": { "match_all": {} } }, scroll="30s", client=self.es, index=self.index_name, request_timeout=30) def refresh(self): self.es.indices.refresh(self.index_name)