mirror of
https://github.com/simon987/music-graph-ui.git
synced 2025-04-10 14:06:41 +00:00
121 lines
3.1 KiB
Python
Executable File
121 lines
3.1 KiB
Python
Executable File
#!/usr/bin/env python
|
|
|
|
import json
|
|
import os
|
|
import random
|
|
import sqlite3
|
|
import sys
|
|
import traceback
|
|
from io import BytesIO
|
|
from pathlib import Path
|
|
|
|
import PIL
|
|
import requests
|
|
from PIL import Image
|
|
|
|
PIL.Image.MAX_IMAGE_PIXELS = 933120000
|
|
current_mbid = ""
|
|
|
|
|
|
def should_download(image: dict):
|
|
return image["front"] is True
|
|
|
|
|
|
def thumb(cover_blob):
|
|
with Image.open(BytesIO(cover_blob)) as image:
|
|
|
|
# https://stackoverflow.com/questions/43978819
|
|
if image.mode == "I;16":
|
|
image.mode = "I"
|
|
image.point(lambda i: i * (1. / 256)).convert('L')
|
|
|
|
image.thumbnail((256, 256), Image.BICUBIC)
|
|
canvas = Image.new("RGB", image.size, 0x000000)
|
|
|
|
if image.mode in ('RGBA', 'LA') or (image.mode == 'P' and 'transparency' in image.info):
|
|
try:
|
|
canvas.paste(image, mask=image.split()[-1])
|
|
except ValueError:
|
|
canvas.paste(image)
|
|
else:
|
|
canvas.paste(image)
|
|
|
|
blob = BytesIO()
|
|
canvas.save(blob, "JPEG", quality=85, optimize=True)
|
|
canvas.close()
|
|
|
|
return blob.getvalue()
|
|
|
|
|
|
def download(mbid):
|
|
global current_mbid
|
|
current_mbid = mbid
|
|
r = requests.get("https://archive.org/metadata/mbid-" + mbid)
|
|
meta = r.json()
|
|
|
|
if "files" not in meta or "workable_servers" not in meta or not meta["workable_servers"]:
|
|
return
|
|
|
|
directory = "https://" + random.choice(meta["workable_servers"]) + meta["dir"]
|
|
index = directory + "/index.json"
|
|
|
|
r = requests.get(index)
|
|
if r.status_code == 404:
|
|
mb_meta = meta
|
|
urls = [
|
|
directory + "/" + f["name"]
|
|
for f in meta["files"] if "thumb" not in f["name"] and
|
|
not f["name"].endswith((".xml", ".txt", ".json", ".torrent"))
|
|
]
|
|
else:
|
|
mb_meta = r.json()
|
|
|
|
urls = [
|
|
directory + "/mbid-" + mbid + "-" + image["image"][image["image"].rfind("/") + 1:]
|
|
for image in mb_meta["images"] if should_download(image)
|
|
]
|
|
|
|
if not urls:
|
|
return
|
|
|
|
cover = requests.get(urls[0]).content
|
|
|
|
if cover:
|
|
dbfile = "/mnt/Data7/caa_p2.db"
|
|
if not os.path.exists(dbfile):
|
|
with sqlite3.connect(dbfile, timeout=30000) as conn:
|
|
c = conn.cursor()
|
|
c.execute(
|
|
"CREATE TABLE covers(id TEXT, cover BLOB, tn BLOB, meta TEXT, ts TEXT default CURRENT_TIMESTAMP)")
|
|
|
|
try:
|
|
tn = thumb(cover)
|
|
except:
|
|
tn = None
|
|
|
|
with sqlite3.connect(dbfile, timeout=30000) as conn:
|
|
c = conn.cursor()
|
|
c.execute("INSERT INTO covers (id, cover, tn, meta) VALUES (?,?,?,?)",
|
|
(mbid, cover, tn, json.dumps(mb_meta),))
|
|
|
|
|
|
try:
|
|
task_str = sys.argv[1]
|
|
task = json.loads(task_str)
|
|
mbids = json.loads(task["recipe"])
|
|
for mbid in mbids:
|
|
download(mbid)
|
|
|
|
except Exception as e:
|
|
print(json.dumps({
|
|
"result": 1,
|
|
"logs": [
|
|
{"message": str(e) + "$$" + current_mbid + "$$" + traceback.format_exc(), "level": 3}
|
|
]
|
|
}))
|
|
quit(2)
|
|
|
|
print(json.dumps({
|
|
"result": 0,
|
|
}))
|