diff --git a/search/search.py b/search/search.py index 6d1da8e..e487cd7 100644 --- a/search/search.py +++ b/search/search.py @@ -1,3 +1,5 @@ +import itertools + import elasticsearch import time from elasticsearch import helpers @@ -110,27 +112,46 @@ class ElasticSearchEngine(SearchEngine): def ping(self): return self.es.ping() + @staticmethod + def batch(iterable, n=1): + l = len(iterable) + for ndx in range(0, l, n): + yield iterable[ndx:min(ndx + n, l)] + def delete_docs(self, website_id): - while True: - try: - logger.debug("Deleting docs of " + str(website_id)) - self.es.delete_by_query(body={ - "query": { - "constant_score": { - "filter": { - "term": {"website_id": website_id} - } - } - } - }, index=self.index_name, request_timeout=200) - break - except elasticsearch.exceptions.ConflictError: - logger.warning("Error: multiple delete tasks at the same time, retrying in 20s") - time.sleep(20) - except Exception: - logger.warning("Timeout during delete! Retrying in 20s") - time.sleep(20) + try: + logger.debug("Deleting docs of " + str(website_id)) + + to_delete = helpers.scan(query={ + "query": { + "term": {"website_id": website_id} + } + }, scroll="1m", client=self.es, index=self.index_name, request_timeout=60) + + buf = [] + counter = 0 + for doc in to_delete: + buf.append(doc) + counter += 1 + + if counter >= 400: + self._delete(buf) + buf.clear() + counter = 0 + if counter > 0: + self._delete(buf) + + except Exception as e: + logger.error(str(e)) + + def _delete(self, docs): + bulk_string = self.create_bulk_delete_string(docs) + result = self.es.bulk(body=bulk_string, index=self.index_name, doc_type="file", request_timeout=30) + + if result["errors"]: + logger.error("Error in ES bulk delete: \n" + result["errors"]) + raise IndexingError def import_json(self, in_lines, website_id: int): @@ -173,6 +194,11 @@ class ElasticSearchEngine(SearchEngine): action_string = '{"index":{}}\n' return "\n".join("".join([action_string, ujson.dumps(doc)]) for doc in docs) + @staticmethod + def create_bulk_delete_string(docs: list): + + return "\n".join("".join(["{\"delete\":{\"_id\":\"", doc["_id"], "\"}}"]) for doc in docs) + def search(self, query, page, per_page, sort_order, extensions, size_min, size_max, match_all, fields, date_min, date_max) -> {}: