# od-database/search/search.py
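"""Elasticsearch-backed search for od-database.

Defines the abstract SearchEngine interface and its ElasticSearchEngine
implementation: index creation and mapping, bulk import and deletion of file
documents, filtered full-text search with highlighting, and periodic
generation of database-wide statistics cached in _stats.json.
"""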

import os
import time
import ujson
import elasticsearch
from apscheduler.schedulers.background import BackgroundScheduler
from elasticsearch import helpers
from search import logger
from search.filter import SearchFilter
class InvalidQueryException(Exception):
pass
class IndexingError(Exception):
pass
class SearchEngine:
def __init__(self):
pass
def import_json(self, in_str: str, website_id: int):
raise NotImplementedError
    def search(self, query, page, per_page, sort_order, extensions, size_min, size_max, match_all, fields, date_min,
               date_max) -> dict:
raise NotImplementedError
def reset(self):
raise NotImplementedError
def ping(self):
raise NotImplementedError
def get_stats(self, website_id: int, subdir: str = None):
raise NotImplementedError
def refresh(self):
raise NotImplementedError
class ElasticSearchEngine(SearchEngine):
SORT_ORDERS = {
"score": ["_score"],
"size_asc": [{"size": {"order": "asc"}}],
"size_dsc": [{"size": {"order": "desc"}}],
"date_asc": [{"mtime": {"order": "asc"}}],
"date_desc": [{"mtime": {"order": "desc"}}],
"none": []
}
def __init__(self, index_name):
super().__init__()
self.index_name = index_name
self.es = elasticsearch.Elasticsearch()
self.filter = SearchFilter()
if not self.es.indices.exists(self.index_name):
self.init()
def start_stats_scheduler(self):
scheduler = BackgroundScheduler()
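        # Regenerate the cached global stats every two hours (60 * 120 seconds)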
scheduler.add_job(self._generate_global_stats, "interval", seconds=60 * 120)
scheduler.start()
def init(self):
logger.info("Elasticsearch first time setup")
if self.es.indices.exists(self.index_name):
self.es.indices.delete(index=self.index_name)
self.es.indices.create(index=self.index_name)
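        # Analysis settings (tokenizers/analyzers) can only be updated while the index is closed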
self.es.indices.close(index=self.index_name)
# Index settings
self.es.indices.put_settings(body={
"analysis": {
"tokenizer": {
"my_nGram_tokenizer": {
"type": "nGram", "min_gram": 3, "max_gram": 3
}
}
}}, index=self.index_name, request_timeout=60)
self.es.indices.put_settings(body={
"analysis": {
"analyzer": {
"my_nGram": {
"tokenizer": "my_nGram_tokenizer",
"filter": ["lowercase", "asciifolding"]
}
}
}}, index=self.index_name)
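        # File mapping: "name" gets a trigram sub-field for partial matching, and routing is
        # required so that all documents of a website land on the same shard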
self.es.indices.put_mapping(body={
"properties": {
"path": {"analyzer": "standard", "type": "text"},
"name": {"analyzer": "standard", "type": "text",
"fields": {"nGram": {"type": "text", "analyzer": "my_nGram"}}},
"mtime": {"type": "date", "format": "epoch_second"},
"size": {"type": "long"},
"website_id": {"type": "integer"},
"ext": {"type": "keyword"},
},
"_routing": {"required": True}
}, doc_type="file", index=self.index_name, request_timeout=60)
self.es.indices.open(index=self.index_name)
def reset(self):
self.init()
def ping(self):
return self.es.ping()
def delete_docs(self, website_id):
while True:
try:
logger.debug("Deleting docs of " + str(website_id))
to_delete = helpers.scan(query={
"query": {
"term": {
"website_id": website_id
}
}
}, scroll="1m", client=self.es, index=self.index_name, request_timeout=120, routing=website_id)
buf = []
counter = 0
for doc in to_delete:
buf.append(doc)
counter += 1
if counter >= 10000:
self._delete(buf, website_id)
buf.clear()
counter = 0
if counter > 0:
self._delete(buf, website_id)
break
except Exception as e:
logger.error("During delete: " + str(e))
time.sleep(10)
logger.debug("Done deleting for " + str(website_id))
def _delete(self, docs, website_id):
bulk_string = self.create_bulk_delete_string(docs)
result = self.es.bulk(body=bulk_string, index=self.index_name, doc_type="file", request_timeout=30,
routing=website_id)
if result["errors"]:
logger.error("Error in ES bulk delete: \n" + result["errors"])
raise IndexingError
def import_json(self, in_lines, website_id: int):
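        """Index newline-delimited JSON file documents for a website, in batches of 10 000 lines."""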
import_every = 10000
cooldown_time = 0
docs = []
for line in in_lines:
try:
doc = ujson.loads(line)
name, ext = os.path.splitext(doc["name"])
doc["ext"] = ext[1:].lower() if ext and len(ext) > 1 else ""
doc["name"] = name
doc["website_id"] = website_id
docs.append(doc)
except Exception as e:
logger.error("Error in import_json: " + str(e) + " for line : + \n" + line)
if len(docs) >= import_every:
self._index(docs)
docs.clear()
time.sleep(cooldown_time)
if docs:
self._index(docs)
def _index(self, docs):
while True:
try:
logger.debug("Indexing " + str(len(docs)) + " docs")
bulk_string = ElasticSearchEngine.create_bulk_index_string(docs)
self.es.bulk(body=bulk_string, index=self.index_name, doc_type="file", request_timeout=30,
routing=docs[0]["website_id"])
break
except Exception as e:
logger.error("Error in _index: " + str(e) + ", retrying")
time.sleep(10)
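    # The Elasticsearch bulk API expects newline-delimited JSON: an action line
    # followed, for index operations, by the document source on the next line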
@staticmethod
def create_bulk_index_string(docs: list):
action_string = '{"index":{}}\n'
return "\n".join("".join([action_string, ujson.dumps(doc)]) for doc in docs)
@staticmethod
def create_bulk_delete_string(docs: list):
return "\n".join("".join(["{\"delete\":{\"_id\":\"", doc["_id"], "\"}}"]) for doc in docs)
    def search(self, query, page, per_page, sort_order, extensions, size_min, size_max, match_all, fields, date_min,
               date_max) -> dict:
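        """Run a filtered multi_match query against the index and return the raw
        Elasticsearch response, including highlights for name, name.nGram and path."""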
if self.filter.should_block(query):
logger.info("Search was blocked")
raise InvalidQueryException("One or more terms in your query is blocked by the search filter. "
"This incident has been reported.")
filters = []
if extensions:
filters.append({"terms": {"ext": extensions}})
        if size_min > 0 or size_max:
            size_filter = dict()
            new_filter = {"range": {"size": size_filter}}
            if size_min > 0:
                size_filter["gte"] = size_min
            if size_max:
                size_filter["lte"] = size_max
            filters.append(new_filter)
        if date_min > 0 or date_max:
            date_filter = dict()
            new_filter = {"range": {"mtime": date_filter}}
            if date_min > 0:
                date_filter["gte"] = date_min
            if date_max:
                date_filter["lte"] = date_max
            filters.append(new_filter)
sort_by = ElasticSearchEngine.SORT_ORDERS.get(sort_order, [])
        results = self.es.search(body={
"query": {
"bool": {
"must": {
"multi_match": {
"query": query,
"fields": fields,
"operator": "or" if match_all else "and"
}
},
"filter": filters
}
},
"sort": sort_by,
"highlight": {
"fields": {
"name": {"pre_tags": ["<mark>"], "post_tags": ["</mark>"]},
"name.nGram": {"pre_tags": ["<mark>"], "post_tags": ["</mark>"]},
"path": {"pre_tags": ["<mark>"], "post_tags": ["</mark>"]}
}
},
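            # from + size may not exceed index.max_result_window (10 000 by default), hence the clamp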
"size": per_page, "from": min(page * per_page, 10000 - per_page)},
index=self.index_name, request_timeout=20)
        return results
def get_stats(self, website_id: int, subdir: str = None):
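        """Return per-extension document counts and sizes, plus totals, for one website (subdir is currently unused)."""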
result = self.es.search(body={
"query": {
"constant_score": {
"filter": {
"term": {"website_id": website_id}
}
}
},
"aggs": {
"ext_group": {
"terms": {
"field": "ext",
"size": 12
},
"aggs": {
"size": {
"sum": {
"field": "size"
}
}
}
},
"total_size": {
"sum_bucket": {
"buckets_path": "ext_group>size"
}
}
},
"size": 0
}, index=self.index_name, request_timeout=30, routing=website_id)
stats = dict()
stats["total_size"] = result["aggregations"]["total_size"]["value"]
stats["total_count"] = result["hits"]["total"]
stats["ext_stats"] = [(b["size"]["value"], b["doc_count"], b["key"])
for b in result["aggregations"]["ext_group"]["buckets"]]
return stats
def get_link_list(self, website_id, base_url):
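        """Yield the full URL of every indexed file of a website, rebuilt from base_url, path, name and ext."""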
hits = helpers.scan(client=self.es,
query={
"_source": {
"includes": ["path", "name", "ext"]
},
"query": {
"constant_score": {
"filter": {
"term": {"website_id": website_id}
}
}
},
},
index=self.index_name, request_timeout=20, routing=website_id)
for hit in hits:
src = hit["_source"]
yield base_url + src["path"] + ("/" if src["path"] != "" else "") + src["name"] + \
("." if src["ext"] != "" else "") + src["ext"]
def get_global_stats(self):
if os.path.exists("_stats.json"):
with open("_stats.json", "r") as f:
return ujson.load(f)
else:
return None
def _generate_global_stats(self):
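        """Aggregate database-wide statistics and cache them in _stats.json.

        Collects per-extension sizes, extended size statistics, size and date
        histograms, per-website totals and index-level metrics. Documents outside
        the 0 to ~1 TB size range are excluded from the aggregations.
        """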
size_per_ext = self.es.search(body={
"query": {
"bool": {
"filter": [
{"range": {
"size": {"gte": 0, "lte": (1000000000000 - 1)} # 0-1TB
}}
]
}
},
"aggs": {
"ext_group": {
"terms": {
"field": "ext",
"size": 40
},
"aggs": {
"size": {
"sum": {
"field": "size"
}
}
}
}
},
"size": 0
}, index=self.index_name, request_timeout=240)
total_stats = self.es.search(body={
"query": {
"bool": {
"filter": [
{"range": {
"size": {"gte": 0, "lte": (1000000000000 - 1)} # 0-1TB
}}
]
}
},
"aggs": {
"file_stats": {
"extended_stats": {
"field": "size",
"sigma": 1
}
}
},
"size": 0
}, index=self.index_name, request_timeout=241)
size_and_date_histogram = self.es.search(body={
"query": {
"bool": {
"filter": [
{"range": {
"size": {"gte": 0, "lte": (1000000000000 - 1)} # 0-1TB
}},
{"range": {
"mtime": {
"gt": 0 # 1970-01-01
}
}}
]
}
},
"aggs": {
"sizes": {
"histogram": {
"field": "size",
"interval": 100000000, # 100Mb
"min_doc_count": 500
}
},
"dates": {
"date_histogram": {
"field": "mtime",
"interval": "1y",
"min_doc_count": 500,
"format": "yyyy"
}
}
},
"size": 0
}, index=self.index_name, request_timeout=242)
website_scatter = self.es.search(body={
"query": {
"bool": {
"filter": [
{"range": {
"size": {"gte": 0, "lte": (1000000000000 - 1)} # 0-1TB
}}
]
}
},
"aggs": {
"websites": {
"terms": {
"field": "website_id",
"size": 600 # TODO: Figure out what size is appropriate
},
"aggs": {
"size": {
"sum": {
"field": "size"
}
}
}
}
},
"size": 0
}, index=self.index_name, request_timeout=243)
es_stats = self.es.indices.stats(self.index_name, request_timeout=244)
stats = dict()
stats["es_index_size"] = es_stats["indices"][self.index_name]["total"]["store"]["size_in_bytes"]
stats["es_search_count"] = es_stats["indices"][self.index_name]["total"]["search"]["query_total"]
stats["es_search_time"] = es_stats["indices"][self.index_name]["total"]["search"]["query_time_in_millis"]
stats["es_search_time_avg"] = stats["es_search_time"] / (
stats["es_search_count"] if stats["es_search_count"] != 0 else 1)
stats["total_count"] = total_stats["hits"]["total"]
stats["total_size"] = total_stats["aggregations"]["file_stats"]["sum"]
stats["size_avg"] = total_stats["aggregations"]["file_stats"]["avg"]
stats["size_std_deviation"] = total_stats["aggregations"]["file_stats"]["std_deviation"]
stats["size_std_deviation_bounds"] = total_stats["aggregations"]["file_stats"]["std_deviation_bounds"]
stats["size_variance"] = total_stats["aggregations"]["file_stats"]["variance"]
stats["ext_stats"] = [(b["size"]["value"], b["doc_count"], b["key"])
for b in size_per_ext["aggregations"]["ext_group"]["buckets"]]
stats["sizes_histogram"] = [(b["key"], b["doc_count"])
for b in size_and_date_histogram["aggregations"]["sizes"]["buckets"]]
stats["dates_histogram"] = [(b["key_as_string"], b["doc_count"])
for b in size_and_date_histogram["aggregations"]["dates"]["buckets"]]
stats["website_scatter"] = [[b["key"], b["doc_count"], b["size"]["value"]]
for b in website_scatter["aggregations"]["websites"]["buckets"]]
stats["base_url"] = "entire database"
with open("_stats.json", "w") as f:
ujson.dump(stats, f)
def stream_all_docs(self):
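        """Scroll through every document in the index (match_all) and yield the raw hits."""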
return helpers.scan(query={
"query": {
"match_all": {}
}
}, scroll="1m", client=self.es, index=self.index_name, request_timeout=60)
def refresh(self):
self.es.indices.refresh(self.index_name)
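
# Minimal usage sketch (illustrative only): it assumes a local Elasticsearch node on
# localhost:9200 and made-up document values; the real index name and search fields
# come from the application's configuration.
#
#   engine = ElasticSearchEngine("od-database")
#   docs = [{"name": "ubuntu-18.04.iso", "path": "isos", "size": 1073741824, "mtime": 1543212345}]
#   engine.import_json((ujson.dumps(doc) for doc in docs), website_id=1)
#   engine.refresh()
#   results = engine.search("ubuntu iso", page=0, per_page=50, sort_order="score",
#                           extensions=[], size_min=0, size_max=None, match_all=True,
#                           fields=["name", "name.nGram", "path"], date_min=0, date_max=None)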