sync_es: move io to separate threads, config json

throughput is definitely massively improved, testing locally.
hopefully it'll be enough.

config moved a separate file by ops request. lazy lazy
This commit is contained in:
queue
2017-05-21 00:55:19 -06:00
parent 6a4ad827c1
commit ea2160a49d
2 changed files with 201 additions and 100 deletions

11
config_es_sync.json Normal file
View File

@@ -0,0 +1,11 @@
{
"save_loc": "/tmp/pos.json",
"mysql_host": "127.0.0.1",
"mysql_port": 13306,
"mysql_user": "root",
"mysql_password": "dunnolol",
"database": "nyaav2",
"internal_queue_depth": 10000,
"es_chunk_size": 10000,
"flush_interval": 5
}

View File

@@ -21,9 +21,12 @@ when import_to_es and sync_es will be lost. Instead, you can run SHOW MASTER
STATUS _before_ you run import_to_es. That way you'll definitely pick up any
changes that happen while the import_to_es script is dumping stuff from the
database into es, at the expense of redoing a (small) amount of indexing.
This uses multithreading so we don't have to block on socket io (both binlog
reading and es POSTing). asyncio soon™
"""
from elasticsearch import Elasticsearch
from elasticsearch.helpers import bulk
from elasticsearch.helpers import bulk, BulkIndexError
from pymysqlreplication import BinLogStreamReader
from pymysqlreplication.row_event import UpdateRowsEvent, DeleteRowsEvent, WriteRowsEvent
from datetime import datetime
@@ -33,54 +36,38 @@ import json
import time
import logging
from statsd import StatsClient
from threading import Thread
from queue import Queue, Empty
logging.basicConfig(format='%(asctime)s %(levelname)s %(name)s - %(message)s')
log = logging.getLogger('sync_es')
log.setLevel(logging.INFO)
# config in json, 2lazy to argparse
if len(sys.argv) != 2:
print("need config.json location", file=sys.stderr)
sys.exit(-1)
with open(sys.argv[1]) as f:
config = json.load(f)
# goes to netdata or other statsd listener
stats = StatsClient('localhost', 8125, prefix="sync_es")
#logging.getLogger('elasticsearch').setLevel(logging.DEBUG)
# in prod want in /var/lib somewhere probably
SAVE_LOC = "/var/lib/sync_es_position.json"
MYSQL_HOST = '127.0.0.1'
MYSQL_PORT = 3306
MYSQL_USER = 'test'
MYSQL_PW = 'test123'
NT_DB = 'nyaav2'
with open(SAVE_LOC) as f:
pos = json.load(f)
es = Elasticsearch(timeout=30)
stream = BinLogStreamReader(
# TODO parse out from config.py or something
connection_settings = {
'host': MYSQL_HOST,
'port': MYSQL_PORT,
'user': MYSQL_USER,
'passwd': MYSQL_PW
},
server_id=10, # arbitrary
# only care about this database currently
only_schemas=[NT_DB],
# these tables in the database
only_tables=["nyaa_torrents", "nyaa_statistics", "sukebei_torrents", "sukebei_statistics"],
# from our save file
resume_stream=True,
log_file=pos['log_file'],
log_pos=pos['log_pos'],
# skip the other stuff like table mapping
only_events=[UpdateRowsEvent, DeleteRowsEvent, WriteRowsEvent],
# if we're at the head of the log, block until something happens
# note it'd be nice to block async-style instead, but the mainline
# binlogreader is synchronous. there is an (unmaintained?) fork
# using aiomysql if anybody wants to revive that.
blocking=True)
SAVE_LOC = config.get('save_loc', "/tmp/pos.json")
MYSQL_HOST = config.get('mysql_host', '127.0.0.1')
MYSQL_PORT = config.get('mysql_port', 3306)
MYSQL_USER = config.get('mysql_user', 'root')
MYSQL_PW = config.get('mysql_password', 'dunnolol')
NT_DB = config.get('database', 'nyaav2')
INTERNAL_QUEUE_DEPTH = config.get('internal_queue_depth', 10000)
ES_CHUNK_SIZE = config.get('es_chunk_size', 10000)
# seconds since no events happening to flush to es. remember this also
# interacts with es' refresh_interval setting.
FLUSH_INTERVAL = config.get('flush_interval', 5)
def reindex_torrent(t, index_name):
# XXX annoyingly different from import_to_es, and
@@ -145,73 +132,176 @@ def delet_this(row, index_name):
'_type': 'torrent',
'_id': str(row['values']['id'])}
last_save = time.time()
since_last = 0
# get POST latency
@stats.timer('post_bulk')
def timed_bulk(events, **kwargs):
bulk(es, events, **kwargs)
class BinlogReader(Thread):
# write_buf is the Queue we communicate with
def __init__(self, write_buf):
Thread.__init__(self)
self.write_buf = write_buf
log.info(f"reading binlog from {stream.log_file}/{stream.log_pos}")
def run(self):
with open(SAVE_LOC) as f:
pos = json.load(f)
for event in stream:
with stats.pipeline() as s:
s.incr('total_events')
s.incr(f"event.{event.table}.{type(event).__name__}")
s.incr('total_rows', len(event.rows))
s.incr(f"rows.{event.table}.{type(event).__name__}", len(event.rows))
# XXX not a "timer", but we get a histogram out of it
s.timing(f"rows_per_event.{event.table}.{type(event).__name__}", len(event.rows))
if event.table == "nyaa_torrents" or event.table == "sukebei_torrents":
if event.table == "nyaa_torrents":
index_name = "nyaa"
else:
index_name = "sukebei"
if type(event) is WriteRowsEvent:
timed_bulk(reindex_torrent(row['values'], index_name) for row in event.rows)
elif type(event) is UpdateRowsEvent:
# UpdateRowsEvent includes the old values too, but we don't care
timed_bulk(reindex_torrent(row['after_values'], index_name) for row in event.rows)
elif type(event) is DeleteRowsEvent:
# ok, bye
timed_bulk(delet_this(row, index_name) for row in event.rows)
else:
raise Exception(f"unknown event {type(event)}")
elif event.table == "nyaa_statistics" or event.table == "sukebei_statistics":
if event.table == "nyaa_statistics":
index_name = "nyaa"
else:
index_name = "sukebei"
if type(event) is WriteRowsEvent:
timed_bulk(
(reindex_stats(row['values'], index_name) for row in event.rows),
# statistics updates are pretty low priority, plus in certain
# cases where we're really out of sync, the torrent could be missing,
# causing a "document missing" error from es, with no way to suppress
# that server-side. Thus ignore all errors, oh well.
raise_on_error=False, stats_only=True)
elif type(event) is UpdateRowsEvent:
timed_bulk(
(reindex_stats(row['after_values'], index_name) for row in event.rows),
raise_on_error=False, stats_only=True)
elif type(event) is DeleteRowsEvent:
# uh ok. Assume that the torrent row will get deleted later,
# which will clean up the entire es "torrent" document
pass
else:
raise Exception(f"unknown event {type(event)}")
else:
raise Exception(f"unknown table {s.table}")
stream = BinLogStreamReader(
# TODO parse out from config.py or something
connection_settings = {
'host': MYSQL_HOST,
'port': MYSQL_PORT,
'user': MYSQL_USER,
'passwd': MYSQL_PW
},
server_id=10, # arbitrary
# only care about this database currently
only_schemas=[NT_DB],
# these tables in the database
only_tables=["nyaa_torrents", "nyaa_statistics", "sukebei_torrents", "sukebei_statistics"],
# from our save file
resume_stream=True,
log_file=pos['log_file'],
log_pos=pos['log_pos'],
# skip the other stuff like table mapping
only_events=[UpdateRowsEvent, DeleteRowsEvent, WriteRowsEvent],
# if we're at the head of the log, block until something happens
# note it'd be nice to block async-style instead, but the mainline
# binlogreader is synchronous. there is an (unmaintained?) fork
# using aiomysql if anybody wants to revive that.
blocking=True)
# how far we're behind, wall clock
stats.gauge('process_latency', int((time.time() - event.timestamp) * 1000))
log.info(f"reading binlog from {stream.log_file}/{stream.log_pos}")
for event in stream:
# save the pos of the stream and timestamp with each message, so we
# can commit in the other thread. and keep track of process latency
pos = (stream.log_file, stream.log_pos, event.timestamp)
with stats.pipeline() as s:
s.incr('total_events')
s.incr(f"event.{event.table}.{type(event).__name__}")
s.incr('total_rows', len(event.rows))
s.incr(f"rows.{event.table}.{type(event).__name__}", len(event.rows))
# XXX not a "timer", but we get a histogram out of it
s.timing(f"rows_per_event.{event.table}.{type(event).__name__}", len(event.rows))
if event.table == "nyaa_torrents" or event.table == "sukebei_torrents":
if event.table == "nyaa_torrents":
index_name = "nyaa"
else:
index_name = "sukebei"
if type(event) is WriteRowsEvent:
for row in event.rows:
self.write_buf.put(
(pos, reindex_torrent(row['values'], index_name)),
block=True)
elif type(event) is UpdateRowsEvent:
# UpdateRowsEvent includes the old values too, but we don't care
for row in event.rows:
self.write_buf.put(
(pos, reindex_torrent(row['after_values'], index_name)),
block=True)
elif type(event) is DeleteRowsEvent:
# ok, bye
for row in event.rows:
self.write_buf.put((pos, delet_this(row, index_name)), block=True)
else:
raise Exception(f"unknown event {type(event)}")
elif event.table == "nyaa_statistics" or event.table == "sukebei_statistics":
if event.table == "nyaa_statistics":
index_name = "nyaa"
else:
index_name = "sukebei"
if type(event) is WriteRowsEvent:
for row in event.rows:
self.write_buf.put(
(pos, reindex_stats(row['values'], index_name)),
block=True)
elif type(event) is UpdateRowsEvent:
for row in event.rows:
self.write_buf.put(
(pos, reindex_stats(row['after_values'], index_name)),
block=True)
elif type(event) is DeleteRowsEvent:
# uh ok. Assume that the torrent row will get deleted later,
# which will clean up the entire es "torrent" document
pass
else:
raise Exception(f"unknown event {type(event)}")
else:
raise Exception(f"unknown table {s.table}")
class EsPoster(Thread):
# read_buf is the queue of stuff to bulk post
def __init__(self, read_buf, chunk_size=1000, flush_interval=5):
Thread.__init__(self)
self.read_buf = read_buf
self.chunk_size = chunk_size
self.flush_interval = flush_interval
def run(self):
es = Elasticsearch(timeout=30)
since_last += 1
if since_last >= 10000 or (time.time() - last_save) > 10:
log.info(f"saving position {stream.log_file}/{stream.log_pos}, {time.time() - event.timestamp:,.3} seconds behind")
with stats.timer('save_pos'):
with open(SAVE_LOC, 'w') as f:
json.dump({"log_file": stream.log_file, "log_pos": stream.log_pos}, f)
last_save = time.time()
since_last = 0
while True:
actions = []
while len(actions) < self.chunk_size:
try:
# grab next event from queue with metadata that creepily
# updates, surviving outside the scope of the loop
((log_file, log_pos, timestamp), action) = \
self.read_buf.get(block=True, timeout=self.flush_interval)
actions.append(action)
except Empty:
# nothing new for the whole interval
break
if not actions:
# nothing to post
log.debug("no changes...")
continue
# XXX "time" to get histogram of no events per bulk
stats.timing('actions_per_bulk', len(actions))
try:
with stats.timer('post_bulk'):
bulk(es, actions, chunk_size=self.chunk_size)
except BulkIndexError as bie:
# in certain cases where we're really out of sync, we update a
# stat when the torrent doc is, causing a "document missing"
# error from es, with no way to suppress that server-side.
# Thus ignore that type of error if it's the only problem
for e in bie.errors:
try:
if e['update']['error']['type'] != 'document_missing_exception':
raise bie
except KeyError:
raise bie
# how far we're behind, wall clock
stats.gauge('process_latency', int((time.time() - timestamp) * 1000))
since_last += len(actions)
if since_last >= 10000 or (time.time() - last_save) > 10:
log.info(f"saving position {log_file}/{log_pos}, {time.time() - timestamp:,.3f} seconds behind")
with stats.timer('save_pos'):
with open(SAVE_LOC, 'w') as f:
json.dump({"log_file": log_file, "log_pos": log_pos}, f)
last_save = time.time()
since_last = 0
# in-memory queue between binlog and es. The bigger it is, the more events we
# can parse in memory while waiting for es to catch up, at the expense of heap.
buf = Queue(maxsize=INTERNAL_QUEUE_DEPTH)
reader = BinlogReader(buf)
reader.daemon = True
writer = EsPoster(buf, chunk_size=ES_CHUNK_SIZE, flush_interval=FLUSH_INTERVAL)
writer.daemon = True
reader.start()
writer.start()
# on the main thread, poll the queue size for monitoring
while True:
stats.gauge('queue_depth', buf.qsize())
time.sleep(1)