Multithreading

This commit is contained in:
simon987 2020-06-14 13:26:24 -04:00
parent 38e81a3dc6
commit a3418af7c1
6 changed files with 117 additions and 78 deletions

View File

@ -1,9 +1,5 @@
#!/usr/bin/env bash #!/usr/bin/env bash
tmux new -d -s mgCover bash -c "./task_runner.sh task_get_cover.py" tmux new -d -s mgCover bash -c "./task_runner.sh task_get_cover.py --count 100 --threads 2"
tmux new -d -s mgLastfm bash -c "./task_runner.sh task_get_lastfm.py --count 500 --threads 5"
tmux new -d -s mgLastfm bash -c "./task_runner.sh task_get_lastfm" tmux new -d -s mgSpotify bash -c "./task_runner.sh task_get_spotify.py --count 500 --threads 10"
tmux new -d -s mgLastfm2 bash -c "./task_runner.sh task_get_lastfm"
tmux new -d -s mgSpotify bash -c "./task_runner.sh task_get_spotify.py"
tmux new -d -s mgSpotify2 bash -c "./task_runner.sh task_get_spotify.py"

View File

@ -1,3 +1,5 @@
requests requests
psycopg2 psycopg2
spotipy spotipy
Pillow
git+https://github.com/simon987/hexlib.git

View File

@ -1,4 +1,5 @@
from io import BytesIO from io import BytesIO
from multiprocessing.pool import ThreadPool
import psycopg2 import psycopg2
import requests import requests
@ -72,14 +73,19 @@ def download(mbid):
return None return None
def work(mbid):
    """Handle a single cover task for one MBID.

    Calls ``download(mbid)`` and forwards whatever it returns (which may be
    ``None``) to ``save`` unchanged, then prints the MBID as a progress
    marker. Exceptions are not caught here and propagate to the caller.
    """
    save(mbid, download(mbid))
    print(mbid)
if __name__ == "__main__": if __name__ == "__main__":
import argparse import argparse
parser = argparse.ArgumentParser() parser = argparse.ArgumentParser()
parser.add_argument("--count", type=int, default=1) parser.add_argument("--count", type=int, default=1)
parser.add_argument("--threads", type=int, default=3)
args = parser.parse_args() args = parser.parse_args()
for mbid in get_mbids(args.count): pool = ThreadPool(processes=args.threads)
tn = download(mbid) pool.map(func=work, iterable=get_mbids(args.count))
save(mbid, tn)
print(mbid)

View File

@ -1,15 +1,18 @@
#!/usr/bin/env python3 #!/usr/bin/env python3
import json import json
import traceback
from itertools import repeat from itertools import repeat
from queue import Queue
import config
import psycopg2 import psycopg2
import requests import requests
from hexlib.concurrency import queue_iter, queue_thread_exec
import config
def get_mbid(lfm_name): def get_mbid(conn, lfm_name):
cur = conn.cursor() cur = conn.cursor()
cur.execute("SELECT mbid " cur.execute("SELECT mbid "
"FROM mg.lastfm_artist WHERE name=%s", (lfm_name,)) "FROM mg.lastfm_artist WHERE name=%s", (lfm_name,))
@ -17,13 +20,13 @@ def get_mbid(lfm_name):
return row[0] if row else None return row[0] if row else None
def set_mbid(lfm_name, mbid): def set_mbid(conn, lfm_name, mbid):
cur = conn.cursor() cur = conn.cursor()
cur.execute("INSERT INTO mg.lastfm_artist VALUES (%s,%s) ON CONFLICT (name) " cur.execute("INSERT INTO mg.lastfm_artist VALUES (%s,%s) ON CONFLICT (name) "
"DO UPDATE SET mbid=excluded.mbid", (lfm_name, mbid)) "DO UPDATE SET mbid=excluded.mbid", (lfm_name, mbid))
def save_tags(lfm_name, tags): def save_tags(conn, lfm_name, tags):
if not tags: if not tags:
return return
cur = conn.cursor() cur = conn.cursor()
@ -31,23 +34,23 @@ def save_tags(lfm_name, tags):
cur.execute("DELETE FROM mg.lastfm_artist_tag WHERE name=%s", (lfm_name,)) cur.execute("DELETE FROM mg.lastfm_artist_tag WHERE name=%s", (lfm_name,))
cur.execute( cur.execute(
"INSERT INTO mg.lastfm_artist_tag VALUES %s" % "INSERT INTO mg.lastfm_artist_tag VALUES %s" %
",".join("('%s', '%s')" % (n, t.strip()) for (n, t) in zip(repeat(lfm_name), tags)) ",".join("('%s', '%s')" % (n, t.strip().replace("'", "''")) for (n, t) in zip(repeat(lfm_name), tags))
) )
def save_data(data): def save_data(conn, data):
if data: if data:
disambiguate(data["name"], mbid=data["artist"]) disambiguate(conn, data["name"], mbid=data["artist"])
for similar in [s for s in data["similar"] if s["mbid"] is not None]: for similar in [s for s in data["similar"] if s["mbid"] is not None]:
disambiguate(similar["name"], similar["mbid"]) disambiguate(conn, similar["name"], similar["mbid"])
save_similar(data["name"], similar["name"], similar["match"]) save_similar(conn, data["name"], similar["name"], similar["match"])
save_tags(data["name"], data["tags"]) save_tags(conn, data["name"], data["tags"])
save_meta(data["name"], data["listeners"], data["playcount"]) save_meta(conn, data["name"], data["listeners"], data["playcount"])
def save_similar(lfm_name, similar, weight): def save_similar(conn, lfm_name, similar, weight):
cur = conn.cursor() cur = conn.cursor()
cur.execute( cur.execute(
@ -57,21 +60,21 @@ def save_similar(lfm_name, similar, weight):
) )
def save_meta(lfm_name, listeners, playcount): def save_meta(conn, lfm_name, listeners, playcount):
cur = conn.cursor() cur = conn.cursor()
cur.execute("INSERT INTO mg.lastfm_artist_meta VALUES (%s,%s,%s) ON CONFLICT (name) " cur.execute("INSERT INTO mg.lastfm_artist_meta VALUES (%s,%s,%s) ON CONFLICT (name) "
"DO UPDATE SET listeners=excluded.listeners, playcount=excluded.playcount", "DO UPDATE SET listeners=excluded.listeners, playcount=excluded.playcount",
(lfm_name, listeners, playcount)) (lfm_name, listeners, playcount))
def save_raw_data(name, mbid, data): def save_raw_data(conn, name, mbid, data):
cur = conn.cursor() cur = conn.cursor()
cur.execute("INSERT INTO mg.lastfm_raw_data (name, mbid, data) VALUES (%s,%s,%s) " cur.execute("INSERT INTO mg.lastfm_raw_data (name, mbid, data) VALUES (%s,%s,%s) "
"ON CONFLICT (name, mbid) DO UPDATE SET ts=CURRENT_TIMESTAMP, data=excluded.data", "ON CONFLICT (name, mbid) DO UPDATE SET ts=CURRENT_TIMESTAMP, data=excluded.data",
(name, mbid, json.dumps(data))) (name, mbid, json.dumps(data)))
def get_release_count(mbid): def get_release_count(conn, mbid):
cur = conn.cursor() cur = conn.cursor()
cur.execute('SELECT COUNT(*) ' cur.execute('SELECT COUNT(*) '
'FROM l_artist_release ' 'FROM l_artist_release '
@ -81,22 +84,22 @@ def get_release_count(mbid):
return row[0] if row else 0 return row[0] if row else 0
def disambiguate(name, mbid): def disambiguate(conn, name, mbid):
""" """
A lastfm artist name can refer to multiple MBIDs A lastfm artist name can refer to multiple MBIDs
For RELATED_TO purposes, we assume that the MBID referring For RELATED_TO purposes, we assume that the MBID referring
to the artist with the most official releases is the one to the artist with the most official releases is the one
""" """
existing_mbid = get_mbid(name) existing_mbid = get_mbid(conn, name)
if existing_mbid and mbid != existing_mbid: if existing_mbid and mbid != existing_mbid:
if get_release_count(existing_mbid) < get_release_count(mbid): if get_release_count(conn, existing_mbid) < get_release_count(conn, mbid):
set_mbid(name, mbid) set_mbid(conn, name, mbid)
else: else:
set_mbid(name, mbid) set_mbid(conn, name, mbid)
def get_cached_artist_data(name, mbid, max_age_days): def get_cached_artist_data(conn, name, mbid, max_age_days):
cur = conn.cursor() cur = conn.cursor()
cur.execute("SELECT data FROM mg.lastfm_raw_data WHERE name=%s AND mbid=%s " cur.execute("SELECT data FROM mg.lastfm_raw_data WHERE name=%s AND mbid=%s "
"AND date_part('day', CURRENT_TIMESTAMP - ts) <= %s ", "AND date_part('day', CURRENT_TIMESTAMP - ts) <= %s ",
@ -106,8 +109,8 @@ def get_cached_artist_data(name, mbid, max_age_days):
return row[0] if row else 0 return row[0] if row else 0
def get_artist_data(name: str, mbid: str): def get_artist_data(conn, name: str, mbid: str):
cached_data = get_cached_artist_data(name, mbid, max_age_days=30) cached_data = get_cached_artist_data(conn, name, mbid, max_age_days=30)
if cached_data: if cached_data:
return cached_data return cached_data
@ -132,7 +135,7 @@ def get_artist_data(name: str, mbid: str):
data = { data = {
"_raw": raw "_raw": raw
} }
save_raw_data(name, mbid, data) save_raw_data(conn, name, mbid, data)
return return
by_name = True by_name = True
@ -164,12 +167,12 @@ def get_artist_data(name: str, mbid: str):
"_raw": raw "_raw": raw
} }
save_raw_data(name, mbid, data) save_raw_data(conn, name, mbid, data)
return data return data
def get_task(count=1): def get_task(conn, count=1):
cur = conn.cursor() cur = conn.cursor()
cur.execute( cur.execute(
"SELECT artist.name, artist.gid FROM artist " "SELECT artist.name, artist.gid FROM artist "
@ -177,7 +180,21 @@ def get_task(count=1):
"ORDER BY lfm.ts NULLS FIRST LIMIT %s", "ORDER BY lfm.ts NULLS FIRST LIMIT %s",
(count,) (count,)
) )
return cur.fetchone() return cur.fetchall()
def worker(q):
    """Process Last.fm tasks taken from ``q`` on a dedicated DB connection.

    Each task is unpacked into the ``name, mbid`` arguments of
    ``get_artist_data``; its result is passed to ``save_data`` and the
    work is committed per task. A failing task is logged and rolled back,
    and the loop moves on to the next one.

    :param q: queue of tasks, consumed through ``queue_iter(q, block=False)``
    """
    conn = psycopg2.connect(config.connstr())
    try:
        for task in queue_iter(q, block=False):
            try:
                save_data(conn, get_artist_data(conn, *task))
                conn.commit()
                print(task[0])
            except Exception as e:
                print("Error %s : %s" % (e, traceback.format_exc()))
                # After a failed statement psycopg2 rejects every further
                # command until the transaction is rolled back. Rolling back
                # also keeps this task's partial writes from being committed
                # together with the next task.
                try:
                    conn.rollback()
                except psycopg2.Error as rollback_error:
                    # Connection is likely unusable; keep the original
                    # best-effort behaviour and let later tasks report it.
                    print("Rollback failed: %s" % rollback_error)
    finally:
        # Release the connection even if iteration itself raises.
        conn.close()
if __name__ == "__main__": if __name__ == "__main__":
@ -185,13 +202,14 @@ if __name__ == "__main__":
parser = argparse.ArgumentParser() parser = argparse.ArgumentParser()
parser.add_argument("--count", type=int, default=1) parser.add_argument("--count", type=int, default=1)
parser.add_argument("--threads", type=int, default=4)
args = parser.parse_args() args = parser.parse_args()
queue = Queue()
conn = psycopg2.connect(config.connstr()) conn = psycopg2.connect(config.connstr())
for task in get_task(conn, args.count):
for task in get_task(args.count): queue.put(task)
save_data(get_artist_data(*task))
conn.commit()
print(task[0])
conn.close() conn.close()
queue_thread_exec(queue, func=worker, thread_count=args.threads)

View File

@ -1,16 +1,19 @@
#!/usr/bin/env python #!/usr/bin/env python
import json import json
import traceback
from itertools import repeat from itertools import repeat
from queue import Queue
import psycopg2 import psycopg2
import spotipy import spotipy
from hexlib.concurrency import queue_thread_exec, queue_iter
from hexlib.misc import silent_stdout from hexlib.misc import silent_stdout
from spotipy.oauth2 import SpotifyClientCredentials from spotipy.oauth2 import SpotifyClientCredentials
import config import config
def save_raw(query, endpoint, data): def save_raw(conn, query, endpoint, data):
cur = conn.cursor() cur = conn.cursor()
cur.execute( cur.execute(
"INSERT INTO mg.spotify_raw_data (query, endpoint, data) VALUES (%s,%s,%s) " "INSERT INTO mg.spotify_raw_data (query, endpoint, data) VALUES (%s,%s,%s) "
@ -20,7 +23,7 @@ def save_raw(query, endpoint, data):
) )
def save_artist(data, max_age_days=30): def save_artist(conn, data, max_age_days=30):
"""Returns True if artist is new (and therefore, its albums, tracks etc. should be fetched)""" """Returns True if artist is new (and therefore, its albums, tracks etc. should be fetched)"""
cur = conn.cursor() cur = conn.cursor()
@ -47,9 +50,9 @@ def save_artist(data, max_age_days=30):
return True return True
def get_albums(spotid): def get_albums(conn, spotify, spotid):
data = silent_stdout(spotify.artist_albums, spotid, album_type="album,single,compilation") data = silent_stdout(spotify.artist_albums, spotid, album_type="album,single,compilation")
save_raw(spotid, "artist_albums", data) save_raw(conn, spotid, "artist_albums", data)
cur = conn.cursor() cur = conn.cursor()
cur.execute("DELETE FROM mg.spotify_artist_album WHERE spotid=%s", (spotid,)) cur.execute("DELETE FROM mg.spotify_artist_album WHERE spotid=%s", (spotid,))
@ -62,9 +65,9 @@ def get_albums(spotid):
return list() return list()
def get_tracks(spotid): def get_tracks(conn, spotify, spotid):
data = silent_stdout(spotify.artist_top_tracks, spotid) data = silent_stdout(spotify.artist_top_tracks, spotid)
save_raw(spotid, "artist_top_tracks", data) save_raw(conn, spotid, "artist_top_tracks", data)
cur = conn.cursor() cur = conn.cursor()
cur.execute("DELETE FROM mg.spotify_artist_track WHERE spotid=%s", (spotid,)) cur.execute("DELETE FROM mg.spotify_artist_track WHERE spotid=%s", (spotid,))
@ -85,13 +88,13 @@ def get_tracks(spotid):
) )
def related(spotid): def related(conn, spotify, spotid):
data = silent_stdout(spotify.artist_related_artists, spotid) data = silent_stdout(spotify.artist_related_artists, spotid)
save_raw(spotid, "artist_related_artists", data) save_raw(conn, spotid, "artist_related_artists", data)
return data["artists"] return data["artists"]
def save_artist_artist(id0, relations): def save_artist_artist(conn, id0, relations):
if relations: if relations:
cur = conn.cursor() cur = conn.cursor()
cur.execute( cur.execute(
@ -103,7 +106,7 @@ def save_artist_artist(id0, relations):
) )
def get_mbids_with_matching_name(name): def get_mbids_with_matching_name(conn, name):
cur = conn.cursor() cur = conn.cursor()
cur.execute( cur.execute(
"SELECT gid FROM artist " "SELECT gid FROM artist "
@ -115,7 +118,7 @@ def get_mbids_with_matching_name(name):
return [r[0] for r in rows] return [r[0] for r in rows]
def resolve_spotify_conflict(mbid, existing_spotid, new_spotid): def resolve_spotify_conflict(conn, mbid, existing_spotid, new_spotid):
cur = conn.cursor() cur = conn.cursor()
cur.execute( cur.execute(
"SELECT asciifold_lower(album) FROM mg.spotify_artist_album WHERE spotid=%s", "SELECT asciifold_lower(album) FROM mg.spotify_artist_album WHERE spotid=%s",
@ -144,7 +147,7 @@ def resolve_spotify_conflict(mbid, existing_spotid, new_spotid):
cur.execute("UPDATE mg.spotify_artist SET spotid = %s WHERE mbid=%s", (new_spotid, mbid)) cur.execute("UPDATE mg.spotify_artist SET spotid = %s WHERE mbid=%s", (new_spotid, mbid))
def resolve_mb_conflict(spotid, mbids): def resolve_mb_conflict(conn, spotid, mbids):
cur = conn.cursor() cur = conn.cursor()
cur.execute( cur.execute(
@ -185,10 +188,10 @@ def resolve_mb_conflict(spotid, mbids):
best_match_count = match_count best_match_count = match_count
best_match = mbid best_match = mbid
save_spotid_to_mbid(spotid, best_match) save_spotid_to_mbid(conn, spotid, best_match)
def save_spotid_to_mbid(spotid, mbid): def save_spotid_to_mbid(conn, spotid, mbid):
cur = conn.cursor() cur = conn.cursor()
cur.execute( cur.execute(
"SELECT spotid FROM mg.spotify_artist WHERE mbid=%s", "SELECT spotid FROM mg.spotify_artist WHERE mbid=%s",
@ -196,7 +199,7 @@ def save_spotid_to_mbid(spotid, mbid):
) )
row = cur.fetchone() row = cur.fetchone()
if row: if row:
resolve_spotify_conflict(mbid, row[0], spotid) resolve_spotify_conflict(conn, mbid, row[0], spotid)
else: else:
cur.execute( cur.execute(
"INSERT INTO mg.spotify_artist (spotid, mbid) VALUES (%s,%s)", "INSERT INTO mg.spotify_artist (spotid, mbid) VALUES (%s,%s)",
@ -204,28 +207,28 @@ def save_spotid_to_mbid(spotid, mbid):
) )
def search_artist(name): def search_artist(conn, spotify, name):
quoted_name = "\"%s\"" % name quoted_name = "\"%s\"" % name
data = silent_stdout(spotify.search, quoted_name, type="artist", limit=20) data = silent_stdout(spotify.search, quoted_name, type="artist", limit=20)
save_raw(name, "search", data) save_raw(conn, name, "search", data)
for result in data["artists"]["items"]: for result in data["artists"]["items"]:
if save_artist(result): if save_artist(conn, result):
mbids = get_mbids_with_matching_name(result["name"]) mbids = get_mbids_with_matching_name(conn, result["name"])
get_albums(result["id"]) get_albums(conn, spotify, result["id"])
get_tracks(result["id"]) get_tracks(conn, spotify, result["id"])
if len(mbids) > 1: if len(mbids) > 1:
resolve_mb_conflict(result["id"], mbids) resolve_mb_conflict(conn, result["id"], mbids)
elif len(mbids) == 1: elif len(mbids) == 1:
save_spotid_to_mbid(result["id"], mbids[0]) save_spotid_to_mbid(conn, result["id"], mbids[0])
save_artist_artist(result["id"], related(result["id"])) save_artist_artist(conn, result["id"], related(conn, spotify, result["id"]))
def get_tasks(count=1): def get_tasks(conn, count=1):
cur = conn.cursor() cur = conn.cursor()
cur.execute( cur.execute(
"SELECT artist.name FROM artist " "SELECT artist.name FROM artist "
@ -239,23 +242,37 @@ def get_tasks(count=1):
yield row[0] yield row[0]
def worker(q):
    """Process Spotify search tasks taken from ``q``.

    Opens its own DB connection and its own ``spotipy.Spotify`` client
    (built from the module-level ``client_credentials_manager``), then runs
    ``search_artist`` for every task and commits after each one. A failing
    task is logged and rolled back, and the loop moves on to the next one.

    :param q: queue of tasks, consumed through ``queue_iter(q, block=False)``
    """
    conn = psycopg2.connect(config.connstr())
    try:
        spotify = spotipy.Spotify(client_credentials_manager=client_credentials_manager)
        for task in queue_iter(q, block=False):
            try:
                search_artist(conn, spotify, task)
                conn.commit()
                # print(task)
            except Exception as e:
                print("Error %s : %s" % (e, traceback.format_exc()))
                # After a failed statement psycopg2 rejects every further
                # command until the transaction is rolled back. Rolling back
                # also keeps this task's partial writes from being committed
                # together with the next task.
                try:
                    conn.rollback()
                except psycopg2.Error as rollback_error:
                    # Connection is likely unusable; keep the original
                    # best-effort behaviour and let later tasks report it.
                    print("Rollback failed: %s" % rollback_error)
    finally:
        # Release the connection even if client creation or iteration raises.
        conn.close()
if __name__ == "__main__": if __name__ == "__main__":
import argparse import argparse
parser = argparse.ArgumentParser() parser = argparse.ArgumentParser()
parser.add_argument("--count", type=int, default=1) parser.add_argument("--count", type=int, default=1)
parser.add_argument("--threads", type=int, default=10)
args = parser.parse_args() args = parser.parse_args()
conn = psycopg2.connect(config.connstr())
client_credentials_manager = SpotifyClientCredentials( client_credentials_manager = SpotifyClientCredentials(
client_id=config.config["SPOTIFY_CLIENTID"], client_id=config.config["SPOTIFY_CLIENTID"],
client_secret=config.config["SPOTIFY_SECRET"] client_secret=config.config["SPOTIFY_SECRET"]
) )
spotify = spotipy.Spotify(client_credentials_manager=client_credentials_manager) queue = Queue()
for name in get_tasks(args.count):
search_artist(name)
conn.commit()
print(name)
conn = psycopg2.connect(config.connstr())
for task in get_tasks(conn, args.count):
queue.put(task)
conn.close() conn.close()
queue_thread_exec(queue, func=worker, thread_count=args.threads)

View File

@ -16,5 +16,5 @@ if [[ -z "${SPOTIFY_SECRET}" ]]; then
fi fi
while true; do while true; do
/usr/bin/time python3 "$1" --count 50 &>> "$1".log /usr/bin/time python3 "$@" &>> "$1".log
done done