diff --git a/captcha.py b/captcha.py
index 2352b38..9b3620e 100644
--- a/captcha.py
+++ b/captcha.py
@@ -37,7 +37,8 @@ def verify():
         )

     if "cap" in session:
-        expected = oddb.redis.get(session["cap"]).decode("utf8")
+        expected = oddb.redis.get(session["cap"])
+        expected = expected.decode("utf8") if expected is not None else ""
        oddb.redis.delete(session["cap"])

        if expected == attempt:
diff --git a/common.py b/common.py
index fe68ab1..089f8d7 100644
--- a/common.py
+++ b/common.py
@@ -5,6 +5,7 @@ from logging import FileHandler, StreamHandler
 import redis as r
 from flask import session, abort

+import config
 from database import Database
 from search.search import ElasticSearchEngine
 from tasks import TaskManager
@@ -19,13 +20,15 @@ logger.setLevel(logging.DEBUG)
 formatter = logging.Formatter('%(asctime)s %(levelname)-5s %(message)s')
 file_handler = FileHandler("oddb.log")
 file_handler.setFormatter(formatter)
+for h in logger.handlers:
+    logger.removeHandler(h)
 logger.addHandler(file_handler)
 logger.addHandler(StreamHandler(sys.stdout))

 taskManager = TaskManager()
 searchEngine = ElasticSearchEngine("od-database")
 searchEngine.start_stats_scheduler()
-db = Database("db.sqlite3")
+db = Database(config.DB_CONN_STR)

 redis = r.Redis()

diff --git a/database.py b/database.py
index d8ba1a7..bdaf0dc 100644
--- a/database.py
+++ b/database.py
@@ -1,9 +1,9 @@
-import os
-import sqlite3
+import time
 import uuid
 from urllib.parse import urlparse

 import bcrypt
+import psycopg2


 class BlacklistedWebsite:
@@ -31,35 +31,42 @@ class ApiClient:

 class Database:

-    def __init__(self, db_path):
+    def __init__(self, db_conn_str):
+        self.db_conn_str = db_conn_str
+        self.website_cache = dict()
+        self.website_cache_time = 0

-        self.db_path = db_path
+        with psycopg2.connect(self.db_conn_str) as conn:
+            cursor = conn.cursor()
+            cursor.execute("SELECT EXISTS (SELECT 1 FROM pg_tables "
+                           "WHERE tablename = 'searchlogentry')")

-        if not os.path.exists(db_path):
-            self.init_database()
+            if not cursor.fetchone()[0]:
+                self.init_database()

     def init_database(self):

+        print("Initializing database")
+
         with open("init_script.sql", "r") as f:
             init_script = f.read()

-        with sqlite3.connect(self.db_path) as conn:
-            conn.executescript(init_script)
-            conn.commit()
+        with psycopg2.connect(self.db_conn_str) as conn:
+            cur = conn.cursor()
+            cur.execute(init_script)

     def update_website_date_if_exists(self, website_id):

-        with sqlite3.connect(self.db_path) as conn:
-
+        with psycopg2.connect(self.db_conn_str) as conn:
             cursor = conn.cursor()
-            cursor.execute("UPDATE Website SET last_modified=CURRENT_TIMESTAMP WHERE id=?", (website_id, ))
+            cursor.execute("UPDATE Website SET last_modified=CURRENT_TIMESTAMP WHERE id=%s", (website_id,))
             conn.commit()

     def insert_website(self, website: Website):

-        with sqlite3.connect(self.db_path) as conn:
+        with psycopg2.connect(self.db_conn_str) as conn:
             cursor = conn.cursor()
-            cursor.execute("INSERT INTO Website (url, logged_ip, logged_useragent) VALUES (?,?,?)",
+            cursor.execute("INSERT INTO Website (url, logged_ip, logged_useragent) VALUES (%s,%s,%s)",
                            (website.url, str(website.logged_ip), str(website.logged_useragent)))
             cursor.execute("SELECT LAST_INSERT_ROWID()")
@@ -70,28 +77,28 @@ class Database:

     def get_website_by_url(self, url):

-        with sqlite3.connect(self.db_path) as conn:
+        with psycopg2.connect(self.db_conn_str) as conn:

             cursor = conn.cursor()
-            cursor.execute("SELECT id, url, logged_ip, logged_useragent, last_modified FROM Website WHERE url=?",
-                           (url, ))
+            cursor.execute("SELECT id, url, logged_ip, logged_useragent, last_modified FROM Website WHERE url=%s",
+                           (url,))
             db_web = cursor.fetchone()

         if db_web:
-            website = Website(db_web[1], db_web[2], db_web[3], db_web[4], db_web[0])
+            website = Website(db_web[1], db_web[2], db_web[3], int(db_web[4].timestamp()), db_web[0])
             return website
         else:
             return None

     def get_website_by_id(self, website_id):

-        with sqlite3.connect(self.db_path) as conn:
+        with psycopg2.connect(self.db_conn_str) as conn:

             cursor = conn.cursor()
-            cursor.execute("SELECT * FROM Website WHERE id=?", (website_id, ))
+            cursor.execute("SELECT * FROM Website WHERE id=%s", (website_id,))
             db_web = cursor.fetchone()

             if db_web:
-                website = Website(db_web[1], db_web[2], db_web[3], db_web[4])
+                website = Website(db_web[1], db_web[2], db_web[3], int(db_web[4].timestamp()))
                 website.id = db_web[0]
                 return website
             else:
@@ -99,57 +106,58 @@ class Database:

     def get_websites(self, per_page, page: int, url):
         """Get all websites"""
-        with sqlite3.connect(self.db_path) as conn:
+        with psycopg2.connect(self.db_conn_str) as conn:

             cursor = conn.cursor()

             cursor.execute("SELECT Website.id, Website.url, Website.last_modified FROM Website "
-                           "WHERE Website.url LIKE ?"
-                           "ORDER BY last_modified DESC LIMIT ? OFFSET ?", (url + "%", per_page, page * per_page))
+                           "WHERE Website.url LIKE %s "
+                           "ORDER BY last_modified DESC LIMIT %s OFFSET %s", (url + "%", per_page, page * per_page))

             return cursor.fetchall()

     def get_random_website_id(self):

-        with sqlite3.connect(self.db_path) as conn:
+        with psycopg2.connect(self.db_conn_str) as conn:

             cursor = conn.cursor()
-            cursor.execute("SELECT id FROM Website WHERE id >= (abs(random()) % (SELECT max(id) FROM Website)) LIMIT 1;")
+            cursor.execute(
+                "SELECT id FROM Website WHERE id >= (abs(random()) % (SELECT max(id) FROM Website)) LIMIT 1;")
             return cursor.fetchone()[0]

     def website_exists(self, url):
         """Check if an url or the parent directory of an url already exists"""
-        with sqlite3.connect(self.db_path) as conn:
+        with psycopg2.connect(self.db_conn_str) as conn:

             cursor = conn.cursor()
-            cursor.execute("SELECT id FROM Website WHERE url = substr(?, 0, length(url) + 1)", (url, ))
+            cursor.execute("SELECT id FROM Website WHERE url = substr(%s, 0, length(url) + 1)", (url,))
             website_id = cursor.fetchone()
             return website_id[0] if website_id else None

     def delete_website(self, website_id):

-        with sqlite3.connect(self.db_path) as conn:
+        with psycopg2.connect(self.db_conn_str) as conn:

             cursor = conn.cursor()
-            cursor.execute("DELETE FROM Website WHERE id=?", (website_id, ))
+            cursor.execute("DELETE FROM Website WHERE id=%s", (website_id,))
             conn.commit()

     def check_login(self, username, password) -> bool:

-        with sqlite3.connect(self.db_path) as conn:
+        with psycopg2.connect(self.db_conn_str) as conn:

             cursor = conn.cursor()
-            cursor.execute("SELECT password FROM Admin WHERE username=?", (username, ))
+            cursor.execute("SELECT password FROM Admin WHERE username=%s", (username,))
             db_user = cursor.fetchone()

             if db_user:
-                return bcrypt.checkpw(password.encode(), db_user[0])
+                return bcrypt.checkpw(password.encode(), db_user[0].tobytes())
             return False

     def get_user_role(self, username: str):

-        with sqlite3.connect(self.db_path) as conn:
+        with psycopg2.connect(self.db_conn_str) as conn:

             cursor = conn.cursor()
-            cursor.execute("SELECT role FROM Admin WHERE username=?", (username, ))
+            cursor.execute("SELECT role FROM Admin WHERE username=%s", (username,))
             db_user = cursor.fetchone()

@@ -159,37 +167,38 @@ class Database:

     def generate_login(self, username, password) -> None:

-        with sqlite3.connect(self.db_path) as conn:
+        with psycopg2.connect(self.db_conn_str) as conn:

             cursor = conn.cursor()

             hashed_pw = bcrypt.hashpw(password.encode(), bcrypt.gensalt(12))
-            cursor.execute("INSERT INTO Admin (username, password, role) VALUES (?,?, 'admin')", (username, hashed_pw))
+            cursor.execute("INSERT INTO Admin (username, password, role) VALUES (%s,%s, 'admin')",
+                           (username, hashed_pw))
             conn.commit()

     def check_api_token(self, token) -> str:

-        with sqlite3.connect(self.db_path) as conn:
+        with psycopg2.connect(self.db_conn_str) as conn:

             cursor = conn.cursor()
-            cursor.execute("SELECT name FROM ApiClient WHERE token=?", (token, ))
+            cursor.execute("SELECT name FROM ApiClient WHERE token=%s", (token,))
             result = cursor.fetchone()
             return result[0] if result else None

     def generate_api_token(self, name: str) -> str:

-        with sqlite3.connect(self.db_path) as conn:
+        with psycopg2.connect(self.db_conn_str) as conn:

             cursor = conn.cursor()

             token = str(uuid.uuid4())
-            cursor.execute("INSERT INTO ApiClient (token, name) VALUES (?, ?)", (token, name))
+            cursor.execute("INSERT INTO ApiClient (token, name) VALUES (%s, %s)", (token, name))
             conn.commit()

             return token

     def get_tokens(self) -> list:

-        with sqlite3.connect(self.db_path) as conn:
+        with psycopg2.connect(self.db_conn_str) as conn:

             cursor = conn.cursor()

             cursor.execute("SELECT token, name FROM ApiClient")
@@ -198,26 +207,28 @@ class Database:

     def delete_token(self, token: str) -> None:

-        with sqlite3.connect(self.db_path) as conn:
+        with psycopg2.connect(self.db_conn_str) as conn:

             cursor = conn.cursor()
-            cursor.execute("DELETE FROM ApiClient WHERE token=?", (token, ))
+            cursor.execute("DELETE FROM ApiClient WHERE token=%s", (token,))
             conn.commit()

     def get_all_websites(self) -> dict:

+        if self.website_cache_time + 120 < time.time():
+            with psycopg2.connect(self.db_conn_str) as conn:
+                cursor = conn.cursor()

-        # todo: mem cache that
-        with sqlite3.connect(self.db_path) as conn:
+                cursor.execute("SELECT id, url FROM Website")

-            cursor = conn.cursor()
+                result = dict()

-            cursor.execute("SELECT id, url FROM Website")
+                for db_website in cursor.fetchall():
+                    result[db_website[0]] = db_website[1]

-            result = {}
+                self.website_cache = result
+                self.website_cache_time = time.time()

-            for db_website in cursor.fetchall():
-                result[db_website[0]] = db_website[1]

-            return result
+        return self.website_cache

     def join_website_on_search_result(self, page: dict) -> dict:

@@ -248,39 +259,39 @@ class Database:
             websites = self.get_all_websites()

             for website in stats["website_scatter"]:
-                website[0] = websites.get(website[0], "[DELETED]")
+                website[0] = websites.get(website[0], "[DELETED]")

     def add_blacklist_website(self, url):

-        with sqlite3.connect(self.db_path) as conn:
+        with psycopg2.connect(self.db_conn_str) as conn:

             cursor = conn.cursor()

             parsed_url = urlparse(url)
             url = parsed_url.scheme + "://" + parsed_url.netloc
-            cursor.execute("INSERT INTO BlacklistedWebsite (url) VALUES (?)", (url, ))
+            cursor.execute("INSERT INTO BlacklistedWebsite (url) VALUES (%s)", (url,))
             conn.commit()

     def remove_blacklist_website(self, blacklist_id):

-        with sqlite3.connect(self.db_path) as conn:
+        with psycopg2.connect(self.db_conn_str) as conn:

             cursor = conn.cursor()
-            cursor.execute("DELETE FROM BlacklistedWebsite WHERE id=?", (blacklist_id, ))
+            cursor.execute("DELETE FROM BlacklistedWebsite WHERE id=%s", (blacklist_id,))
             conn.commit()

     def is_blacklisted(self, url):

-        with sqlite3.connect(self.db_path) as conn:
+        with psycopg2.connect(self.db_conn_str) as conn:

             cursor = conn.cursor()

             parsed_url = urlparse(url)
             url = parsed_url.scheme + "://" + parsed_url.netloc
             print(url)
-            cursor.execute("SELECT id FROM BlacklistedWebsite WHERE url LIKE ? LIMIT 1", (url, ))
+            cursor.execute("SELECT id FROM BlacklistedWebsite WHERE url LIKE %s LIMIT 1", (url,))

             return cursor.fetchone() is not None

     def get_blacklist(self):

-        with sqlite3.connect(self.db_path) as conn:
+        with psycopg2.connect(self.db_conn_str) as conn:

             cursor = conn.cursor()

             cursor.execute("SELECT * FROM BlacklistedWebsite")
@@ -288,10 +299,30 @@ class Database:

     def log_search(self, remote_addr, forwarded_for, q, exts, page, blocked, results, took):

-        with sqlite3.connect(self.db_path) as conn:
+        with psycopg2.connect(self.db_conn_str) as conn:

             cursor = conn.cursor()

-            cursor.execute("INSERT INTO SearchLogEntry (remote_addr, forwarded_for, query, extensions, page, blocked, results, took) "
-                           "VALUES (?,?,?,?,?,?,?,?)", (remote_addr, forwarded_for, q, ",".join(exts), page, blocked, results, took))
+            cursor.execute(
+                "INSERT INTO SearchLogEntry "
+                "(remote_addr, forwarded_for, query, extensions, page, blocked, results, took) "
+                "VALUES (%s,%s,%s,%s,%s,%s,%s,%s)",
+                (remote_addr, forwarded_for, q, ",".join(exts), page, blocked, results, took))

             conn.commit()
+
+    def get_oldest_updated_websites(self, size: int):
+
+        with psycopg2.connect(self.db_conn_str) as conn:
+            cursor = conn.cursor()
+
+            cursor.execute("SELECT id, url, last_modified FROM website "
+                           "ORDER BY last_modified ASC LIMIT %s",
+                           (size,))
+            return [Website(url=r[1],
+                            website_id=r[0],
+                            last_modified=r[2],
+                            logged_ip=None,
+                            logged_useragent=None
+                            )
+                    for r in cursor.fetchall()]
+
diff --git a/export.py b/export.py
index efea079..64c917b 100644
--- a/export.py
+++ b/export.py
@@ -1,6 +1,7 @@
 import csv
 import os

+import config
 from database import Database
 from search.search import ElasticSearchEngine

@@ -9,7 +10,7 @@ def export(outfile="out.csv"):
     print("Export started, connecting to databases...")

     es = ElasticSearchEngine("od-database")
-    db = Database("db.sqlite3")
+    db = Database(config.DB_CONN_STR)

     docs = es.stream_all_docs()
     docs_with_website = db.join_website_on_scan(docs)
diff --git a/init_script.sql b/init_script.sql
index c5de4a9..dc171b9 100644
--- a/init_script.sql
+++ b/init_script.sql
@@ -1,22 +1,22 @@
-PRAGMA journal_mode=WAL;
+DROP TABLE IF EXISTS Website, Admin, BlacklistedWebsite, ApiClient, SearchLogEntry;

 CREATE TABLE Website (
-  id INTEGER PRIMARY KEY NOT NULL,
+  id SERIAL PRIMARY KEY NOT NULL,
   url TEXT,
   logged_ip TEXT,
   logged_useragent TEXT,
-  last_modified INTEGER DEFAULT CURRENT_TIMESTAMP
+  last_modified TIMESTAMP DEFAULT CURRENT_TIMESTAMP
 );

 CREATE TABLE Admin (
   username TEXT PRIMARY KEY NOT NULL,
-  password TEXT,
+  password BYTEA,
   role TEXT
 );

 CREATE TABLE BlacklistedWebsite (
-  id INTEGER PRIMARY KEY NOT NULL,
+  id SERIAL PRIMARY KEY NOT NULL,
   url TEXT
 );

@@ -25,16 +25,15 @@ CREATE TABLE ApiClient (
   token TEXT NOT NULL
 );

-
 CREATE TABLE SearchLogEntry (
-  id INTEGER PRIMARY KEY,
+  id SERIAL PRIMARY KEY,
   search_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
   remote_addr TEXT,
   forwarded_for TEXT,
   query TEXT,
   extensions TEXT,
   page INT,
-  blocked INT DEFAULT 0,
+  blocked BOOLEAN DEFAULT FALSE,
   results INT DEFAULT 0,
   took INT DEFAULT 0
 );
diff --git a/requirements.txt b/requirements.txt
index 24699b2..c4aa199 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -8,19 +8,17 @@ praw
 humanfriendly
 apscheduler
 bcrypt
-ftputil
 elasticsearch
 python-dateutil
 flask_httpauth
 ujson
 urllib3
 pyOpenSSL
-pybloom-live
-pycurl
 lxml
 pillow
 Wand
 numpy
 matplotlib
 uwsgi
-redis
\ No newline at end of file
+redis
+psycopg2-binary
\ No newline at end of file
diff --git a/tasks.py b/tasks.py
index d6846cc..846bed0 100644
--- a/tasks.py
+++ b/tasks.py
@@ -2,6 +2,7 @@ import json
 import logging
 import os
 import time
+from multiprocessing.pool import ThreadPool
 from threading import Thread
 from uuid import uuid4

@@ -9,6 +10,7 @@ import urllib3

 import config
 import database
+from database import Website
 from search.search import ElasticSearchEngine
 from task_tracker_drone.src.tt_drone.api import TaskTrackerApi, Worker
 from ws_bucket_client.api import WsBucketApi
@@ -59,7 +61,7 @@ class TaskManager:

     def __init__(self):
         self.search = ElasticSearchEngine("od-database")
-        self.db = database.Database("db.sqlite3")
+        self.db = database.Database(config.DB_CONN_STR)

         self.tracker = TaskTrackerApi(config.TT_API)
         self.worker = Worker.from_file(self.tracker)
@@ -71,25 +73,33 @@ class TaskManager:

         self.bucket = WsBucketApi(config.WSB_API, config.WSB_SECRET)

-        self._indexer_thread = Thread(target=self._do_indexing)
-        self._indexer_thread.start()
+        self._indexer_threads = list()
+        logger.info("Starting %s indexer threads " % (config.INDEXER_THREADS, ))
+        for _ in range(config.INDEXER_THREADS):
+            t = Thread(target=self._do_indexing)
+            self._indexer_threads.append(t)
+            t.start()
+
+        self._recrawl_thread = Thread(target=self._do_recrawl)
+        self._recrawl_thread.start()

     def _do_indexing(self):

         while True:

-            logger.debug("Fetching indexing task...")
-            task = self.tracker.fetch_task(worker=self.worker, project_id=config.TT_INDEX_PROJECT)
+            task = self.worker.fetch_task(project_id=config.TT_INDEX_PROJECT)

             if task:
                 try:
                     recipe = task.json_recipe()
                     logger.debug("Got indexing task: " + str(recipe))
-                    filename = os.path.join(config.WSB_PATH, format_file_name(recipe["website_id"], recipe["upload_token"]))
+                    filename = os.path.join(config.WSB_PATH,
+                                            format_file_name(recipe["website_id"], recipe["upload_token"]))
+                    self._complete_task(filename, Task(recipe["website_id"], recipe["url"]))
                 except Exception as e:
-                    print(e)
+                    self.worker.release_task(task_id=task.id, result=1, verification=0)
                 finally:
                     try:
-                        self._complete_task(filename, Task(recipe["website_id"], recipe["url"]))
+                        self.worker.release_task(task_id=task.id, result=0, verification=0)
                     except:
                         pass
             else:
@@ -108,29 +118,34 @@ class TaskManager:
                     line = f.readline()

         self.search.import_json(iter_lines(), task.website_id)
+        os.remove(file_list)

         self.db.update_website_date_if_exists(task.website_id)

-    def fetch_indexing_task(self):
+    def _do_recrawl(self):
+        while True:
+            time.sleep(60 * 30)
+            logger.debug("Creating re-crawl tasks")
+            self._generate_crawling_tasks()

-        task = self.tracker.fetch_task(worker=self.worker, project_id=config.TT_INDEX_PROJECT)
-        print(task)
+    def _generate_crawling_tasks(self):
+
+        # TODO: Insert more in-depth re-crawl logic here
+        websites_to_crawl = self.db.get_oldest_updated_websites(10000)
+
+        def recrawl(website: Website):
+            crawl_task = Task(website.id, website.url,
+                              priority=(int((time.time() - website.last_modified.timestamp()) / 3600))
+                              )
+            self.queue_task(crawl_task)
+
+        pool = ThreadPool(processes=10)
+        pool.map(func=recrawl, iterable=websites_to_crawl)

     def queue_task(self, task: Task):

-        max_assign_time = 24 * 7 * 3600
         upload_token = uuid4().__str__()

-        bucket_response = self.bucket.allocate(upload_token.__str__(),
-                                               21474837499,  # 20Gib
-                                               format_file_name(task.website_id, upload_token),
-                                               to_dispose_date=int(time.time() + max_assign_time),
-                                               upload_hook="")
-        if not bucket_response:
-            return
-
-        print("Allocated upload bucket: %d, t=%s, r=%s" % (task.website_id, upload_token, bucket_response.text))
-
         task.upload_token = upload_token

         tracker_response = self.worker.submit_task(config.TT_CRAWL_PROJECT,
                                                    recipe=task.__str__(),
@@ -140,9 +155,18 @@ class TaskManager:
                                                    verification_count=1,
                                                    max_retries=3
                                                    )
-        print("Queued task and made it available to crawlers: t=%s, r=%s" % (task, tracker_response.text))
+        print(tracker_response.text)
+        logging.info("Queued task and made it available to crawlers: t=%s, r=%s" % (task, tracker_response.text))
+        if not tracker_response.json()["ok"]:
+            return
+
+        max_assign_time = 24 * 7 * 3600  # 7 days; kept here so to_dispose_date below stays defined
+        bucket_response = self.bucket.allocate(upload_token.__str__(),
+                                               21474837499,  # 20Gib
+                                               format_file_name(task.website_id, upload_token),
+                                               to_dispose_date=int(time.time() + max_assign_time),
+                                               upload_hook="")
+        logging.info("Allocated upload bucket: %d, t=%s, r=%s" % (task.website_id, upload_token, bucket_response.text))


 def format_file_name(website_id, token):
-    return "%d_%s.NDJSON" % (website_id, token, )
-
+    return "%d_%s.NDJSON" % (website_id, token,)
diff --git a/templates/dashboard.html b/templates/dashboard.html
index 1d1c2ff..85fd607 100644
--- a/templates/dashboard.html
+++ b/templates/dashboard.html
@@ -74,12 +74,6 @@
-
-
-            Misc actions
-
-            Delete websites with no associated files that are
-            not queued
             Logout
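
Note: common.py, export.py, and tasks.py now read config.DB_CONN_STR and config.INDEXER_THREADS, but config.py itself is not part of this diff. A minimal sketch of the assumed additions (the names are taken from the diff; the values are illustrative placeholders, not taken from the repository):

    # config.py -- assumed additions, not part of this changeset
    # libpq-style connection string handed to psycopg2.connect() by database.Database
    DB_CONN_STR = "dbname=od_database user=od_database password=changeme host=127.0.0.1"
    # number of _do_indexing worker threads started by TaskManager.__init__()
    INDEXER_THREADS = 4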