od-database/database.py
2020-01-25 12:54:26 -05:00

331 lines
11 KiB
Python

import os
import time
import uuid
from urllib.parse import urlparse, urljoin
import bcrypt
import psycopg2
class BlacklistedWebsite:
def __init__(self, blacklist_id, url):
self.id = blacklist_id
self.netloc = url
class Website:
def __init__(self, url, logged_ip, logged_useragent, last_modified=None, website_id=None):
self.url = url
self.logged_ip = logged_ip
self.logged_useragent = logged_useragent
self.last_modified = last_modified
self.id = website_id
class ApiClient:
def __init__(self, token, name):
self.token = token
self.name = name
class Database:
def __init__(self, db_conn_str):
self.db_conn_str = db_conn_str
self.website_cache = dict()
self.website_cache_time = 0
with psycopg2.connect(self.db_conn_str) as conn:
cursor = conn.cursor()
cursor.execute("SELECT EXISTS (SELECT 1 FROM pg_tables "
"WHERE tablename = 'searchlogentry')")
if not cursor.fetchone()[0]:
self.init_database()
def init_database(self):
print("Initializing database")
with open("init_script.sql", "r") as f:
init_script = f.read()
with psycopg2.connect(self.db_conn_str) as conn:
cur = conn.cursor()
cur.execute(init_script)
def update_website_date_if_exists(self, website_id):
with psycopg2.connect(self.db_conn_str) as conn:
cursor = conn.cursor()
cursor.execute("UPDATE Website SET last_modified=CURRENT_TIMESTAMP WHERE id=%s", (website_id,))
conn.commit()
def insert_website(self, website: Website):
with psycopg2.connect(self.db_conn_str) as conn:
cursor = conn.cursor()
cursor.execute("INSERT INTO Website (url, logged_ip, logged_useragent) VALUES (%s,%s,%s) RETURNING id",
(website.url, str(website.logged_ip), str(website.logged_useragent)))
website_id = cursor.fetchone()[0]
conn.commit()
return website_id
def get_website_by_url(self, url):
with psycopg2.connect(self.db_conn_str) as conn:
cursor = conn.cursor()
cursor.execute("SELECT id, url, logged_ip, logged_useragent, last_modified FROM Website WHERE url=%s",
(url,))
db_web = cursor.fetchone()
if db_web:
website = Website(db_web[1], db_web[2], db_web[3], db_web[4], str(db_web[0]))
return website
else:
return None
def get_website_by_id(self, website_id):
with psycopg2.connect(self.db_conn_str) as conn:
cursor = conn.cursor()
cursor.execute("SELECT * FROM Website WHERE id=%s", (website_id,))
db_web = cursor.fetchone()
if db_web:
website = Website(db_web[1], db_web[2], db_web[3], str(db_web[4]))
website.id = db_web[0]
return website
else:
return None
def get_websites(self, per_page, page: int, url):
"""Get all websites"""
with psycopg2.connect(self.db_conn_str) as conn:
cursor = conn.cursor()
cursor.execute("SELECT Website.id, Website.url, Website.last_modified FROM Website "
"WHERE Website.url LIKE %s "
"ORDER BY last_modified DESC LIMIT %s OFFSET %s", (url + "%", per_page, page * per_page))
return cursor.fetchall()
def get_random_website_id(self):
with psycopg2.connect(self.db_conn_str) as conn:
cursor = conn.cursor()
cursor.execute("SELECT id FROM Website ORDER BY random() LIMIT 1")
row = cursor.fetchone()
if row:
return row[0]
return None
def website_exists(self, url):
"""Check if an url or the parent directory of an url already exists"""
with psycopg2.connect(self.db_conn_str) as conn:
cursor = conn.cursor()
cursor.execute("SELECT id FROM Website WHERE url = substr(%s, 0, length(url) + 1)", (url,))
website_id = cursor.fetchone()
return website_id[0] if website_id else None
def delete_website(self, website_id):
with psycopg2.connect(self.db_conn_str) as conn:
cursor = conn.cursor()
cursor.execute("DELETE FROM Website WHERE id=%s", (website_id,))
conn.commit()
def check_login(self, username, password) -> bool:
with psycopg2.connect(self.db_conn_str) as conn:
cursor = conn.cursor()
cursor.execute("SELECT password FROM Admin WHERE username=%s", (username,))
db_user = cursor.fetchone()
if db_user:
return bcrypt.checkpw(password.encode(), db_user[0].tobytes())
return False
def get_user_role(self, username: str):
with psycopg2.connect(self.db_conn_str) as conn:
cursor = conn.cursor()
cursor.execute("SELECT role FROM Admin WHERE username=%s", (username,))
db_user = cursor.fetchone()
if db_user:
return db_user[0]
return False
def generate_login(self, username, password) -> None:
with psycopg2.connect(self.db_conn_str) as conn:
cursor = conn.cursor()
hashed_pw = bcrypt.hashpw(password.encode(), bcrypt.gensalt(12))
cursor.execute("INSERT INTO Admin (username, password, role) VALUES (%s,%s, 'admin')",
(username, hashed_pw))
conn.commit()
def check_api_token(self, token) -> str:
with psycopg2.connect(self.db_conn_str) as conn:
cursor = conn.cursor()
cursor.execute("SELECT name FROM ApiClient WHERE token=%s", (token,))
result = cursor.fetchone()
return result[0] if result else None
def generate_api_token(self, name: str) -> str:
with psycopg2.connect(self.db_conn_str) as conn:
cursor = conn.cursor()
token = str(uuid.uuid4())
cursor.execute("INSERT INTO ApiClient (token, name) VALUES (%s, %s)", (token, name))
conn.commit()
return token
def get_tokens(self) -> list:
with psycopg2.connect(self.db_conn_str) as conn:
cursor = conn.cursor()
cursor.execute("SELECT token, name FROM ApiClient")
return [ApiClient(x[0], x[1]) for x in cursor.fetchall()]
def delete_token(self, token: str) -> None:
with psycopg2.connect(self.db_conn_str) as conn:
cursor = conn.cursor()
cursor.execute("DELETE FROM ApiClient WHERE token=%s", (token,))
conn.commit()
def get_all_websites(self) -> dict:
if self.website_cache_time + 120 < time.time():
with psycopg2.connect(self.db_conn_str) as conn:
cursor = conn.cursor()
cursor.execute("SELECT id, url FROM Website")
result = dict()
for db_website in cursor.fetchall():
result[db_website[0]] = db_website[1]
self.website_cache = result
self.website_cache_time = time.time()
return self.website_cache
def join_website_on_search_result(self, page: dict) -> dict:
websites = self.get_all_websites()
for hit in page["hits"]["hits"]:
if hit["_source"]["website_id"] in websites:
hit["_source"]["website_url"] = urljoin(websites[hit["_source"]["website_id"]], "/")
else:
hit["_source"]["website_url"] = "[DELETED]"
return page
def join_website_url(self, docs):
websites = self.get_all_websites()
for doc in docs:
if doc["_source"]["website_id"] in websites:
doc["_source"]["website_url"] = urljoin(websites[doc["_source"]["website_id"]], "/")
else:
doc["_source"]["website_url"] = "[DELETED]"
yield doc
def join_website_on_stats(self, stats):
websites = self.get_all_websites()
for website in stats["website_scatter"]:
website[0] = websites.get(website[0], "[DELETED]")
def add_blacklist_website(self, url):
with psycopg2.connect(self.db_conn_str) as conn:
cursor = conn.cursor()
parsed_url = urlparse(url)
url = parsed_url.scheme + "://" + parsed_url.netloc
cursor.execute("INSERT INTO BlacklistedWebsite (url) VALUES (%s)", (url,))
conn.commit()
def remove_blacklist_website(self, blacklist_id):
with psycopg2.connect(self.db_conn_str) as conn:
cursor = conn.cursor()
cursor.execute("DELETE FROM BlacklistedWebsite WHERE id=%s", (blacklist_id,))
conn.commit()
def is_blacklisted(self, url):
with psycopg2.connect(self.db_conn_str) as conn:
cursor = conn.cursor()
parsed_url = urlparse(url)
url = parsed_url.scheme + "://" + parsed_url.netloc
print(url)
cursor.execute("SELECT id FROM BlacklistedWebsite WHERE url LIKE %s LIMIT 1", (url,))
return cursor.fetchone() is not None
def get_blacklist(self):
with psycopg2.connect(self.db_conn_str) as conn:
cursor = conn.cursor()
cursor.execute("SELECT * FROM BlacklistedWebsite")
return [BlacklistedWebsite(r[0], r[1]) for r in cursor.fetchall()]
def log_search(self, remote_addr, forwarded_for, q, exts, page, blocked, results, took):
with psycopg2.connect(self.db_conn_str) as conn:
cursor = conn.cursor()
cursor.execute(
"INSERT INTO SearchLogEntry "
"(remote_addr, forwarded_for, query, extensions, page, blocked, results, took) "
"VALUES (%s,%s,%s,%s,%s,%s,%s,%s)",
(remote_addr, forwarded_for, q, ",".join(exts), page, blocked, results, took))
conn.commit()
def get_oldest_updated_websites(self, size: int, prefix: str):
with psycopg2.connect(self.db_conn_str) as conn:
cursor = conn.cursor()
cursor.execute("SELECT id, url, last_modified FROM website "
"WHERE url LIKE %s "
"ORDER BY last_modified ASC LIMIT %s",
(prefix + "%", size, ))
return [Website(url=r[1],
website_id=r[0],
last_modified=r[2],
logged_ip=None,
logged_useragent=None
)
for r in cursor.fetchall()]