From a3418af7c1f986112798f05d049779e2235a5bf6 Mon Sep 17 00:00:00 2001 From: simon987 Date: Sun, 14 Jun 2020 13:26:24 -0400 Subject: [PATCH] Multithreading --- bootstrap.sh | 10 ++---- requirements.txt | 2 ++ task_get_cover.py | 14 +++++--- task_get_lastfm.py | 86 +++++++++++++++++++++++++++------------------ task_get_spotify.py | 81 +++++++++++++++++++++++++----------------- task_runner.sh | 2 +- 6 files changed, 117 insertions(+), 78 deletions(-) diff --git a/bootstrap.sh b/bootstrap.sh index 0cde934..f969d91 100755 --- a/bootstrap.sh +++ b/bootstrap.sh @@ -1,9 +1,5 @@ #!/usr/bin/env bash -tmux new -d -s mgCover bash -c "./task_runner.sh task_get_cover.py" - -tmux new -d -s mgLastfm bash -c "./task_runner.sh task_get_lastfm" -tmux new -d -s mgLastfm2 bash -c "./task_runner.sh task_get_lastfm" - -tmux new -d -s mgSpotify bash -c "./task_runner.sh task_get_spotify.py" -tmux new -d -s mgSpotify2 bash -c "./task_runner.sh task_get_spotify.py" +tmux new -d -s mgCover bash -c "./task_runner.sh task_get_cover.py --count 100 --threads 2" +tmux new -d -s mgLastfm bash -c "./task_runner.sh task_get_lastfm.py --count 500 --threads 5" +tmux new -d -s mgSpotify bash -c "./task_runner.sh task_get_spotify.py --count 500 --threads 10" diff --git a/requirements.txt b/requirements.txt index 311de66..bd6621a 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,3 +1,5 @@ requests psycopg2 spotipy +Pillow +git+git://github.com/simon987/hexlib.git diff --git a/task_get_cover.py b/task_get_cover.py index 5211d6b..757f31f 100644 --- a/task_get_cover.py +++ b/task_get_cover.py @@ -1,4 +1,5 @@ from io import BytesIO +from multiprocessing.pool import ThreadPool import psycopg2 import requests @@ -72,14 +73,19 @@ def download(mbid): return None +def work(mbid): + tn = download(mbid) + save(mbid, tn) + print(mbid) + + if __name__ == "__main__": import argparse parser = argparse.ArgumentParser() parser.add_argument("--count", type=int, default=1) + parser.add_argument("--threads", type=int, default=3) args = parser.parse_args() - for mbid in get_mbids(args.count): - tn = download(mbid) - save(mbid, tn) - print(mbid) + pool = ThreadPool(processes=args.threads) + pool.map(func=work, iterable=get_mbids(args.count)) diff --git a/task_get_lastfm.py b/task_get_lastfm.py index c756a51..f5dffc7 100755 --- a/task_get_lastfm.py +++ b/task_get_lastfm.py @@ -1,15 +1,18 @@ #!/usr/bin/env python3 import json +import traceback from itertools import repeat - -import config +from queue import Queue import psycopg2 import requests +from hexlib.concurrency import queue_iter, queue_thread_exec + +import config -def get_mbid(lfm_name): +def get_mbid(conn, lfm_name): cur = conn.cursor() cur.execute("SELECT mbid " "FROM mg.lastfm_artist WHERE name=%s", (lfm_name,)) @@ -17,13 +20,13 @@ def get_mbid(lfm_name): return row[0] if row else None -def set_mbid(lfm_name, mbid): +def set_mbid(conn, lfm_name, mbid): cur = conn.cursor() cur.execute("INSERT INTO mg.lastfm_artist VALUES (%s,%s) ON CONFLICT (name) " "DO UPDATE SET mbid=excluded.mbid", (lfm_name, mbid)) -def save_tags(lfm_name, tags): +def save_tags(conn, lfm_name, tags): if not tags: return cur = conn.cursor() @@ -31,23 +34,23 @@ def save_tags(lfm_name, tags): cur.execute("DELETE FROM mg.lastfm_artist_tag WHERE name=%s", (lfm_name,)) cur.execute( "INSERT INTO mg.lastfm_artist_tag VALUES %s" % - ",".join("('%s', '%s')" % (n, t.strip()) for (n, t) in zip(repeat(lfm_name), tags)) + ",".join("('%s', '%s')" % (n, t.strip().replace("'", "''")) for (n, t) in zip(repeat(lfm_name), tags)) ) -def save_data(data): +def save_data(conn, data): if data: - disambiguate(data["name"], mbid=data["artist"]) + disambiguate(conn, data["name"], mbid=data["artist"]) for similar in [s for s in data["similar"] if s["mbid"] is not None]: - disambiguate(similar["name"], similar["mbid"]) - save_similar(data["name"], similar["name"], similar["match"]) + disambiguate(conn, similar["name"], similar["mbid"]) + save_similar(conn, data["name"], similar["name"], similar["match"]) - save_tags(data["name"], data["tags"]) - save_meta(data["name"], data["listeners"], data["playcount"]) + save_tags(conn, data["name"], data["tags"]) + save_meta(conn, data["name"], data["listeners"], data["playcount"]) -def save_similar(lfm_name, similar, weight): +def save_similar(conn, lfm_name, similar, weight): cur = conn.cursor() cur.execute( @@ -57,21 +60,21 @@ def save_similar(lfm_name, similar, weight): ) -def save_meta(lfm_name, listeners, playcount): +def save_meta(conn, lfm_name, listeners, playcount): cur = conn.cursor() cur.execute("INSERT INTO mg.lastfm_artist_meta VALUES (%s,%s,%s) ON CONFLICT (name) " "DO UPDATE SET listeners=excluded.listeners, playcount=excluded.playcount", (lfm_name, listeners, playcount)) -def save_raw_data(name, mbid, data): +def save_raw_data(conn, name, mbid, data): cur = conn.cursor() cur.execute("INSERT INTO mg.lastfm_raw_data (name, mbid, data) VALUES (%s,%s,%s) " "ON CONFLICT (name, mbid) DO UPDATE SET ts=CURRENT_TIMESTAMP, data=excluded.data", (name, mbid, json.dumps(data))) -def get_release_count(mbid): +def get_release_count(conn, mbid): cur = conn.cursor() cur.execute('SELECT COUNT(*) ' 'FROM l_artist_release ' @@ -81,22 +84,22 @@ def get_release_count(mbid): return row[0] if row else 0 -def disambiguate(name, mbid): +def disambiguate(conn, name, mbid): """ A lastfm artist name can refer to multiple MBIDs For RELATED_TO purposes, we assume that the MBID referring to the artist with the most official releases is the one """ - existing_mbid = get_mbid(name) + existing_mbid = get_mbid(conn, name) if existing_mbid and mbid != existing_mbid: - if get_release_count(existing_mbid) < get_release_count(mbid): - set_mbid(name, mbid) + if get_release_count(conn, existing_mbid) < get_release_count(conn, mbid): + set_mbid(conn, name, mbid) else: - set_mbid(name, mbid) + set_mbid(conn, name, mbid) -def get_cached_artist_data(name, mbid, max_age_days): +def get_cached_artist_data(conn, name, mbid, max_age_days): cur = conn.cursor() cur.execute("SELECT data FROM mg.lastfm_raw_data WHERE name=%s AND mbid=%s " "AND date_part('day', CURRENT_TIMESTAMP - ts) <= %s ", @@ -106,8 +109,8 @@ def get_cached_artist_data(name, mbid, max_age_days): return row[0] if row else 0 -def get_artist_data(name: str, mbid: str): - cached_data = get_cached_artist_data(name, mbid, max_age_days=30) +def get_artist_data(conn, name: str, mbid: str): + cached_data = get_cached_artist_data(conn, name, mbid, max_age_days=30) if cached_data: return cached_data @@ -132,7 +135,7 @@ def get_artist_data(name: str, mbid: str): data = { "_raw": raw } - save_raw_data(name, mbid, data) + save_raw_data(conn, name, mbid, data) return by_name = True @@ -164,12 +167,12 @@ def get_artist_data(name: str, mbid: str): "_raw": raw } - save_raw_data(name, mbid, data) + save_raw_data(conn, name, mbid, data) return data -def get_task(count=1): +def get_task(conn, count=1): cur = conn.cursor() cur.execute( "SELECT artist.name, artist.gid FROM artist " @@ -177,7 +180,21 @@ def get_task(count=1): "ORDER BY lfm.ts NULLS FIRST LIMIT %s", (count,) ) - return cur.fetchone() + return cur.fetchall() + + +def worker(q): + conn = psycopg2.connect(config.connstr()) + + for task in queue_iter(q, block=False): + try: + save_data(conn, get_artist_data(conn, *task)) + conn.commit() + print(task[0]) + except Exception as e: + print("Error %s : %s" % (e, traceback.format_exc())) + + conn.close() if __name__ == "__main__": @@ -185,13 +202,14 @@ if __name__ == "__main__": parser = argparse.ArgumentParser() parser.add_argument("--count", type=int, default=1) + parser.add_argument("--threads", type=int, default=4) args = parser.parse_args() + queue = Queue() + conn = psycopg2.connect(config.connstr()) - - for task in get_task(args.count): - save_data(get_artist_data(*task)) - conn.commit() - print(task[0]) - + for task in get_task(conn, args.count): + queue.put(task) conn.close() + + queue_thread_exec(queue, func=worker, thread_count=args.threads) diff --git a/task_get_spotify.py b/task_get_spotify.py index 95253ff..b343542 100755 --- a/task_get_spotify.py +++ b/task_get_spotify.py @@ -1,16 +1,19 @@ #!/usr/bin/env python import json +import traceback from itertools import repeat +from queue import Queue import psycopg2 import spotipy +from hexlib.concurrency import queue_thread_exec, queue_iter from hexlib.misc import silent_stdout from spotipy.oauth2 import SpotifyClientCredentials import config -def save_raw(query, endpoint, data): +def save_raw(conn, query, endpoint, data): cur = conn.cursor() cur.execute( "INSERT INTO mg.spotify_raw_data (query, endpoint, data) VALUES (%s,%s,%s) " @@ -20,7 +23,7 @@ def save_raw(query, endpoint, data): ) -def save_artist(data, max_age_days=30): +def save_artist(conn, data, max_age_days=30): """Returns True if artist is new (and therefore, its albums, tracks etc. should be fetched)""" cur = conn.cursor() @@ -47,9 +50,9 @@ def save_artist(data, max_age_days=30): return True -def get_albums(spotid): +def get_albums(conn, spotify, spotid): data = silent_stdout(spotify.artist_albums, spotid, album_type="album,single,compilation") - save_raw(spotid, "artist_albums", data) + save_raw(conn, spotid, "artist_albums", data) cur = conn.cursor() cur.execute("DELETE FROM mg.spotify_artist_album WHERE spotid=%s", (spotid,)) @@ -62,9 +65,9 @@ def get_albums(spotid): return list() -def get_tracks(spotid): +def get_tracks(conn, spotify, spotid): data = silent_stdout(spotify.artist_top_tracks, spotid) - save_raw(spotid, "artist_top_tracks", data) + save_raw(conn, spotid, "artist_top_tracks", data) cur = conn.cursor() cur.execute("DELETE FROM mg.spotify_artist_track WHERE spotid=%s", (spotid,)) @@ -85,13 +88,13 @@ def get_tracks(spotid): ) -def related(spotid): +def related(conn, spotify, spotid): data = silent_stdout(spotify.artist_related_artists, spotid) - save_raw(spotid, "artist_related_artists", data) + save_raw(conn, spotid, "artist_related_artists", data) return data["artists"] -def save_artist_artist(id0, relations): +def save_artist_artist(conn, id0, relations): if relations: cur = conn.cursor() cur.execute( @@ -103,7 +106,7 @@ def save_artist_artist(id0, relations): ) -def get_mbids_with_matching_name(name): +def get_mbids_with_matching_name(conn, name): cur = conn.cursor() cur.execute( "SELECT gid FROM artist " @@ -115,7 +118,7 @@ def get_mbids_with_matching_name(name): return [r[0] for r in rows] -def resolve_spotify_conflict(mbid, existing_spotid, new_spotid): +def resolve_spotify_conflict(conn, mbid, existing_spotid, new_spotid): cur = conn.cursor() cur.execute( "SELECT asciifold_lower(album) FROM mg.spotify_artist_album WHERE spotid=%s", @@ -144,7 +147,7 @@ def resolve_spotify_conflict(mbid, existing_spotid, new_spotid): cur.execute("UPDATE mg.spotify_artist SET spotid = %s WHERE mbid=%s", (new_spotid, mbid)) -def resolve_mb_conflict(spotid, mbids): +def resolve_mb_conflict(conn, spotid, mbids): cur = conn.cursor() cur.execute( @@ -185,10 +188,10 @@ def resolve_mb_conflict(spotid, mbids): best_match_count = match_count best_match = mbid - save_spotid_to_mbid(spotid, best_match) + save_spotid_to_mbid(conn, spotid, best_match) -def save_spotid_to_mbid(spotid, mbid): +def save_spotid_to_mbid(conn, spotid, mbid): cur = conn.cursor() cur.execute( "SELECT spotid FROM mg.spotify_artist WHERE mbid=%s", @@ -196,7 +199,7 @@ def save_spotid_to_mbid(spotid, mbid): ) row = cur.fetchone() if row: - resolve_spotify_conflict(mbid, row[0], spotid) + resolve_spotify_conflict(conn, mbid, row[0], spotid) else: cur.execute( "INSERT INTO mg.spotify_artist (spotid, mbid) VALUES (%s,%s)", @@ -204,28 +207,28 @@ def save_spotid_to_mbid(spotid, mbid): ) -def search_artist(name): +def search_artist(conn, spotify, name): quoted_name = "\"%s\"" % name data = silent_stdout(spotify.search, quoted_name, type="artist", limit=20) - save_raw(name, "search", data) + save_raw(conn, name, "search", data) for result in data["artists"]["items"]: - if save_artist(result): - mbids = get_mbids_with_matching_name(result["name"]) + if save_artist(conn, result): + mbids = get_mbids_with_matching_name(conn, result["name"]) - get_albums(result["id"]) - get_tracks(result["id"]) + get_albums(conn, spotify, result["id"]) + get_tracks(conn, spotify, result["id"]) if len(mbids) > 1: - resolve_mb_conflict(result["id"], mbids) + resolve_mb_conflict(conn, result["id"], mbids) elif len(mbids) == 1: - save_spotid_to_mbid(result["id"], mbids[0]) + save_spotid_to_mbid(conn, result["id"], mbids[0]) - save_artist_artist(result["id"], related(result["id"])) + save_artist_artist(conn, result["id"], related(conn, spotify, result["id"])) -def get_tasks(count=1): +def get_tasks(conn, count=1): cur = conn.cursor() cur.execute( "SELECT artist.name FROM artist " @@ -239,23 +242,37 @@ def get_tasks(count=1): yield row[0] +def worker(q): + conn = psycopg2.connect(config.connstr()) + spotify = spotipy.Spotify(client_credentials_manager=client_credentials_manager) + + for task in queue_iter(q, block=False): + try: + search_artist(conn, spotify, task) + conn.commit() + # print(task) + except Exception as e: + print("Error %s : %s" % (e, traceback.format_exc())) + conn.close() + + if __name__ == "__main__": import argparse parser = argparse.ArgumentParser() parser.add_argument("--count", type=int, default=1) + parser.add_argument("--threads", type=int, default=10) args = parser.parse_args() - conn = psycopg2.connect(config.connstr()) client_credentials_manager = SpotifyClientCredentials( client_id=config.config["SPOTIFY_CLIENTID"], client_secret=config.config["SPOTIFY_SECRET"] ) - spotify = spotipy.Spotify(client_credentials_manager=client_credentials_manager) - - for name in get_tasks(args.count): - search_artist(name) - conn.commit() - print(name) + queue = Queue() + conn = psycopg2.connect(config.connstr()) + for task in get_tasks(conn, args.count): + queue.put(task) conn.close() + + queue_thread_exec(queue, func=worker, thread_count=args.threads) diff --git a/task_runner.sh b/task_runner.sh index 30c5833..edcf10b 100755 --- a/task_runner.sh +++ b/task_runner.sh @@ -16,5 +16,5 @@ if [[ -z "${SPOTIFY_SECRET}" ]]; then fi while true; do - /usr/bin/time python3 "$1" --count 50 &>> "$1".log + /usr/bin/time python3 "$@" &>> "$1".log done