Replace delete_by_query with bulk delete

This commit is contained in:
Simon 2018-11-18 11:07:18 -05:00
parent 876a511b54
commit 2f6ae3cb35

View File

@ -1,3 +1,5 @@
import itertools
import elasticsearch import elasticsearch
import time import time
from elasticsearch import helpers from elasticsearch import helpers
@ -110,27 +112,46 @@ class ElasticSearchEngine(SearchEngine):
def ping(self): def ping(self):
return self.es.ping() return self.es.ping()
@staticmethod
def batch(iterable, n=1):
l = len(iterable)
for ndx in range(0, l, n):
yield iterable[ndx:min(ndx + n, l)]
def delete_docs(self, website_id): def delete_docs(self, website_id):
while True: try:
try: logger.debug("Deleting docs of " + str(website_id))
logger.debug("Deleting docs of " + str(website_id))
self.es.delete_by_query(body={ to_delete = helpers.scan(query={
"query": { "query": {
"constant_score": { "term": {"website_id": website_id}
"filter": { }
"term": {"website_id": website_id} }, scroll="1m", client=self.es, index=self.index_name, request_timeout=60)
}
} buf = []
} counter = 0
}, index=self.index_name, request_timeout=200) for doc in to_delete:
break buf.append(doc)
except elasticsearch.exceptions.ConflictError: counter += 1
logger.warning("Error: multiple delete tasks at the same time, retrying in 20s")
time.sleep(20) if counter >= 400:
except Exception: self._delete(buf)
logger.warning("Timeout during delete! Retrying in 20s") buf.clear()
time.sleep(20) counter = 0
if counter > 0:
self._delete(buf)
except Exception as e:
logger.error(str(e))
def _delete(self, docs):
bulk_string = self.create_bulk_delete_string(docs)
result = self.es.bulk(body=bulk_string, index=self.index_name, doc_type="file", request_timeout=30)
if result["errors"]:
logger.error("Error in ES bulk delete: \n" + result["errors"])
raise IndexingError
def import_json(self, in_lines, website_id: int): def import_json(self, in_lines, website_id: int):
@ -173,6 +194,11 @@ class ElasticSearchEngine(SearchEngine):
action_string = '{"index":{}}\n' action_string = '{"index":{}}\n'
return "\n".join("".join([action_string, ujson.dumps(doc)]) for doc in docs) return "\n".join("".join([action_string, ujson.dumps(doc)]) for doc in docs)
@staticmethod
def create_bulk_delete_string(docs: list):
return "\n".join("".join(["{\"delete\":{\"_id\":\"", doc["_id"], "\"}}"]) for doc in docs)
def search(self, query, page, per_page, sort_order, extensions, size_min, size_max, match_all, fields, date_min, def search(self, query, page, per_page, sort_order, extensions, size_min, size_max, match_all, fields, date_min,
date_max) -> {}: date_max) -> {}: