hooked up ES... 90% done, need to figure out how to generate magnet URIs

This commit is contained in:
aldacron
2017-05-15 23:51:58 -07:00
parent c2c547e786
commit 899aa01473
11 changed files with 585 additions and 226 deletions

View File

@@ -40,7 +40,12 @@ log.setLevel(logging.INFO)
#logging.getLogger('elasticsearch').setLevel(logging.DEBUG)
# in prod want in /var/lib somewhere probably
SAVE_LOC = "/tmp/sync_es_position.json"
SAVE_LOC = "/var/lib/sync_es_position.json"
MYSQL_HOST = '127.0.0.1'
MYSQL_PORT = 3306
MYSQL_USER = 'test'
MYSQL_PW = 'test123'
NT_DB = 'nyaav2'
with open(SAVE_LOC) as f:
pos = json.load(f)
@@ -50,16 +55,16 @@ es = Elasticsearch()
stream = BinLogStreamReader(
# TODO parse out from config.py or something
connection_settings = {
'host': '127.0.0.1',
'port': 13306,
'user': 'root',
'passwd': 'dunnolol'
'host': MYSQL_HOST,
'port': MYSQL_PORT,
'user': MYSQL_USER,
'passwd': MYSQL_PW
},
server_id=10, # arbitrary
# only care about this table currently
only_schemas=["nyaav2"],
# TODO sukebei
only_tables=["nyaa_torrents", "nyaa_statistics"],
# only care about this database currently
only_schemas=[NT_DB],
# these tables in the database
only_tables=["nyaa_torrents", "nyaa_statistics", "sukebei_torrents", "sukebei_statistics"],
# from our save file
resume_stream=True,
log_file=pos['log_file'],
@@ -72,7 +77,7 @@ stream = BinLogStreamReader(
# using aiomysql if anybody wants to revive that.
blocking=True)
def reindex_torrent(t):
def reindex_torrent(t, index_name):
# XXX annoyingly different from import_to_es, and
# you need to keep them in sync manually.
f = t['flags']
@@ -103,14 +108,14 @@ def reindex_torrent(t):
}
# update, so we don't delete the stats if present
es.update(
index='nyaav2',
index=index_name,
doc_type='torrent',
id=t['id'],
body={"doc": doc, "doc_as_upsert": True})
def reindex_stats(s):
def reindex_stats(s, index_name):
es.update(
index='nyaav2',
index=index_name,
doc_type='torrent',
id=s['torrent_id'],
body={
@@ -126,21 +131,29 @@ last_save = time.time()
for event in stream:
for row in event.rows:
if event.table == "nyaa_torrents":
if event.table == "nyaa_torrents" or event.table == "sukebei_torrents":
if event.table == "nyaa_torrents":
index_name = "nyaa"
else:
index_name = "sukebei"
if type(event) is WriteRowsEvent:
reindex_torrent(row['values'])
reindex_torrent(row['values'], index_name)
elif type(event) is UpdateRowsEvent:
reindex_torrent(row['after_values'])
reindex_torrent(row['after_values'], index_name)
elif type(event) is DeleteRowsEvent:
# just delete it
es.delete(index='nyaav2', doc_type='torrent', id=row['values']['id'])
es.delete(index=index_name, doc_type='torrent', id=row['values']['id'])
else:
raise Exception(f"unknown event {type(event)}")
elif event.table == "nyaa_statistics":
elif event.table == "nyaa_statistics" or event.table == "sukebei_statistics":
if event.table == "nyaa_torrents":
index_name = "nyaa"
else:
index_name = "sukebei"
if type(event) is WriteRowsEvent:
reindex_stats(row['values'])
reindex_stats(row['values'], index_name)
elif type(event) is UpdateRowsEvent:
reindex_stats(row['after_values'])
reindex_stats(row['after_values'], index_name)
elif type(event) is DeleteRowsEvent:
# uh ok. assume that the torrent row will get deleted later.
pass