sync_es: die when killed

queue committed 2017-05-28 19:46:38 -06:00
parent 847b7d4b5e
commit 33852a55bf


@@ -24,6 +24,10 @@ database into es, at the expense of redoing a (small) amount of indexing.
 This uses multithreading so we don't have to block on socket io (both binlog
 reading and es POSTing). asyncio soon™
+
+This script will exit on any sort of exception, so you'll want to use your
+supervisor's restart functionality, e.g. Restart=on-failure in systemd, or
+the poor man's `while true; do sync_es.py; sleep 1; done` in tmux.
 """
 from elasticsearch import Elasticsearch
 from elasticsearch.helpers import bulk, BulkIndexError
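Since the docstring points at systemd, here is a minimal unit sketch of that setup; the unit name and the paths in ExecStart are hypothetical, not part of this repo:

```ini
# hypothetical /etc/systemd/system/sync_es.service
[Unit]
Description=sync mysql binlog into elasticsearch
After=network.target mysql.service

[Service]
# path is an assumption; point ExecStart at wherever sync_es.py lives
ExecStart=/usr/bin/python3 /opt/nyaa/sync_es.py
# restart on any non-zero exit, which is what os._exit(1) below produces
Restart=on-failure
RestartSec=1

[Install]
WantedBy=multi-user.target
```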
@@ -133,14 +137,31 @@ def delet_this(row, index_name):
             '_type': 'torrent',
             '_id': str(row['values']['id'])}
 
+# we could try to make this script robust to errors from es or mysql, but since
+# the only thing we can do is "clear state and retry", it's easier to leave
+# this to the supervisor. If we were carrying around heavier state in-process,
+# it'd be more worth it to handle errors ourselves.
+#
+# Apparently there's no setDefaultUncaughtExceptionHandler in threading, and
+# sys.excepthook is also broken, so this gives us the same
+# exit-if-anything-happens semantics.
+class ExitingThread(Thread):
+    def run(self):
+        try:
+            self.run_happy()
+        except:
+            log.exception("something happened")
+            # sys.exit only exits the thread, lame
+            import os
+            os._exit(1)
+
-class BinlogReader(Thread):
+class BinlogReader(ExitingThread):
     # write_buf is the Queue we communicate with
     def __init__(self, write_buf):
         Thread.__init__(self)
         self.write_buf = write_buf
 
-    def run(self):
+    def run_happy(self):
         with open(SAVE_LOC) as f:
             pos = json.load(f)
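To see why the wrapper reaches for os._exit rather than sys.exit, here is a standalone sketch (toy names, not code from this commit): an uncaught exception in a Thread kills only that thread, so without the wrapper the process would keep running with its reader or poster silently dead.

```python
import os
import threading
import time

class ExitingThread(threading.Thread):
    # same pattern as the commit: turn "a worker thread died" into
    # "the process died", so the supervisor notices and restarts us
    def run(self):
        try:
            self.run_happy()
        except Exception:
            # sys.exit() raises SystemExit and only ends this thread;
            # os._exit() terminates the whole process immediately
            os._exit(1)

class Worker(ExitingThread):
    def run_happy(self):
        raise RuntimeError("boom")

Worker().start()
time.sleep(1)
# never reached: os._exit(1) in the worker kills the process first
print("still alive")
```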
@@ -229,7 +250,7 @@ class BinlogReader(Thread):
             else:
                 raise Exception(f"unknown table {s.table}")
 
-class EsPoster(Thread):
+class EsPoster(ExitingThread):
     # read_buf is the queue of stuff to bulk post
     def __init__(self, read_buf, chunk_size=1000, flush_interval=5):
         Thread.__init__(self)
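The two workers meet through a single Queue, with BinlogReader producing and EsPoster consuming. The main-block wiring isn't shown in this diff, but it presumably looks something like this sketch (the unbounded queue and the final join are assumptions):

```python
from queue import Queue

buf = Queue()  # BinlogReader's write_buf and EsPoster's read_buf
reader = BinlogReader(buf)
poster = EsPoster(buf, chunk_size=1000, flush_interval=5)
reader.start()
poster.start()
# block forever; if either thread hits an exception, ExitingThread
# calls os._exit(1) and the supervisor restarts the whole script
reader.join()
```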
@@ -237,7 +258,7 @@ class EsPoster(Thread):
         self.chunk_size = chunk_size
         self.flush_interval = flush_interval
 
-    def run(self):
+    def run_happy(self):
         es = Elasticsearch(timeout=30)
         last_save = time.time()
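The constructor arguments suggest size-or-time batching: flush once chunk_size actions pile up, or after flush_interval seconds, whichever comes first. The loop body isn't shown in this diff; a sketch of that shape, under those assumptions:

```python
import time
from queue import Empty, Queue

from elasticsearch import Elasticsearch
from elasticsearch.helpers import bulk

def post_batches(read_buf: Queue, chunk_size=1000, flush_interval=5):
    # drain read_buf into batches; bulk-post when chunk_size actions have
    # accumulated or flush_interval seconds have passed, whichever is first
    es = Elasticsearch(timeout=30)
    actions = []
    deadline = time.time() + flush_interval
    while True:
        try:
            actions.append(read_buf.get(timeout=max(0.0, deadline - time.time())))
        except Empty:
            pass  # nothing arrived before the deadline; flush what we have
        if len(actions) >= chunk_size or time.time() >= deadline:
            if actions:
                # a failed document raises BulkIndexError, which the
                # ExitingThread wrapper turns into a process exit
                bulk(es, actions)
                actions = []
            deadline = time.time() + flush_interval
```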