diff --git a/app.py b/app.py
index 3c423ad..6b92256 100644
--- a/app.py
+++ b/app.py
@@ -1,16 +1,13 @@
from flask import Flask, render_template, redirect, request, flash, abort, Response, send_from_directory, session
import os
-import json
import time
import ssl
from database import Database, Website, InvalidQueryException
from flask_recaptcha import ReCaptcha
import od_util
-import sqlite3
import config
from flask_caching import Cache
-from task import TaskManager
-
+from task import TaskDispatcher, Task
app = Flask(__name__)
recaptcha = ReCaptcha(app=app,
@@ -23,7 +20,7 @@ app.jinja_env.globals.update(truncate_path=od_util.truncate_path)
app.jinja_env.globals.update(get_color=od_util.get_color)
app.jinja_env.globals.update(get_mime=od_util.get_mime)
-tm = TaskManager()
+taskDispatcher = TaskDispatcher()
@app.template_filter("datetime_format")
@@ -68,8 +65,9 @@ def website_json_chart(website_id):
website = db.get_website_by_id(website_id)
+ print("FIXME: website_json_chart")
if website:
- stats = Response(json.dumps(db.get_website_stats(website_id)), mimetype="application/json")
+ stats = {}
return stats
else:
abort(404)
@@ -81,7 +79,9 @@ def website_links(website_id):
website = db.get_website_by_id(website_id)
if website:
- return Response("\n".join(db.get_website_links(website_id)), mimetype="text/plain")
+ print("FIXME: website_links")
+ links = []
+ return Response("\n".join(links), mimetype="text/plain")
else:
abort(404)
@@ -107,7 +107,9 @@ def search():
if q:
try:
- hits = db.search(q, per_page, page, sort_order)
+ # hits = sea.search(q, per_page, page, sort_order)
+ print("FIXME: Search")
+ hits = []
except InvalidQueryException as e:
flash("Invalid query: " + str(e), "warning")
return redirect("/search")
@@ -127,21 +129,16 @@ def contribute():
@app.route("/")
def home():
- if tm.busy.value == 1:
- current_website = tm.current_website.url
- else:
- current_website = None
-
- try:
- stats = db.get_stats()
- except sqlite3.OperationalError:
- stats = None
+ # TODO get stats
+ stats = {}
+ current_website = "TODO"
return render_template("home.html", stats=stats, current_website=current_website)
@app.route("/submit")
def submit():
- return render_template("submit.html", queue=db.queue(), recaptcha=recaptcha)
+ queued_websites = taskDispatcher.get_queued_tasks()
+ return render_template("submit.html", queue=queued_websites, recaptcha=recaptcha)
def try_enqueue(url):
@@ -172,7 +169,9 @@ def try_enqueue(url):
"this is an error, please contact me.", "danger"
web_id = db.insert_website(Website(url, str(request.remote_addr), str(request.user_agent)))
- db.enqueue(web_id)
+
+ task = Task(web_id, url, priority=1)
+ taskDispatcher.dispatch_task(task)
return "The website has been added to the queue", "success"
@@ -219,7 +218,6 @@ def enqueue_bulk():
return redirect("/submit")
-
@app.route("/admin")
def admin_login_form():
if "username" in session:
diff --git a/crawler/crawler.py b/crawl_server/crawler.py
similarity index 97%
rename from crawler/crawler.py
rename to crawl_server/crawler.py
index 6087443..5f4cda0 100644
--- a/crawler/crawler.py
+++ b/crawl_server/crawler.py
@@ -47,8 +47,8 @@ class RemoteDirectory:
class RemoteDirectoryFactory:
- from crawler.ftp import FtpDirectory
- from crawler.http import HttpDirectory
+ from crawl_server.remote_ftp import FtpDirectory
+ from crawl_server.remote_http import HttpDirectory
DIR_ENGINES = (FtpDirectory, HttpDirectory)
@staticmethod
diff --git a/crawl_server/database.py b/crawl_server/database.py
index e2b0704..1fb84ef 100644
--- a/crawl_server/database.py
+++ b/crawl_server/database.py
@@ -5,12 +5,21 @@ import sqlite3
class TaskResult:
- def __init__(self):
- self.status_code: str = None
- self.file_count = 0
- self.start_time = None
- self.end_time = None
- self.website_id = None
+ def __init__(self, status_code=None, file_count=0, start_time=0, end_time=0, website_id=0):
+ self.status_code = status_code
+ self.file_count = file_count
+ self.start_time = start_time
+ self.end_time = end_time
+ self.website_id = website_id
+
+ def to_json(self):
+ return {
+ "status_code": self.status_code,
+ "file_count": self.file_count,
+ "start_time": self.start_time,
+ "end_time": self.end_time,
+ "website_id": self.website_id
+ }
class Task:
@@ -24,13 +33,16 @@ class Task:
self.callback_args = json.loads(callback_args) if callback_args else {}
def to_json(self):
- return ({
+ return {
"website_id": self.website_id,
"url": self.url,
"priority": self.priority,
"callback_type": self.callback_type,
"callback_args": json.dumps(self.callback_args)
- })
+ }
+
+ def __repr__(self):
+ return json.dumps(self.to_json())
class TaskManagerDatabase:
@@ -96,3 +108,17 @@ class TaskManagerDatabase:
"VALUES (?,?,?,?,?)", (result.website_id, result.status_code, result.file_count,
result.start_time, result.end_time))
conn.commit()
+
+ def get_non_indexed_results(self):
+ """Get a list of new TaskResults since the last call of this method"""
+
+ with sqlite3.connect(self.db_path) as conn:
+ cursor = conn.cursor()
+
+ cursor.execute("SELECT status_code, file_count, start_time, end_time, website_id"
+ " FROM TaskResult WHERE indexed_time != NULL")
+ db_result = cursor.fetchall()
+
+ cursor.execute("UPDATE TaskResult SET indexed_time = CURRENT_TIMESTAMP")
+
+ return [TaskResult(r[0], r[1], r[2], r[3], r[4]) for r in db_result]
diff --git a/crawler/ftp.py b/crawl_server/remote_ftp.py
similarity index 96%
rename from crawler/ftp.py
rename to crawl_server/remote_ftp.py
index 660f39f..ac01d09 100644
--- a/crawler/ftp.py
+++ b/crawl_server/remote_ftp.py
@@ -8,7 +8,7 @@ import ftputil.error
from ftputil.session import session_factory
import random
import timeout_decorator
-from crawler.crawler import RemoteDirectory, File, TooManyConnectionsError
+from crawl_server.crawler import RemoteDirectory, File, TooManyConnectionsError
class FtpDirectory(RemoteDirectory):
diff --git a/crawler/http.py b/crawl_server/remote_http.py
similarity index 98%
rename from crawler/http.py
rename to crawl_server/remote_http.py
index 5728a1f..9f00ddf 100644
--- a/crawler/http.py
+++ b/crawl_server/remote_http.py
@@ -3,7 +3,7 @@ from urllib.parse import urljoin, unquote
import os
from lxml import etree
from itertools import repeat
-from crawler.crawler import RemoteDirectory, File
+from crawl_server.crawler import RemoteDirectory, File
import requests
from requests.exceptions import RequestException
from multiprocessing.pool import ThreadPool
diff --git a/crawl_server/server.py b/crawl_server/server.py
index cb1006d..ac16364 100644
--- a/crawl_server/server.py
+++ b/crawl_server/server.py
@@ -1,16 +1,11 @@
from flask import Flask, request, abort, Response
import json
-from crawl_server.task_manager import TaskManager, Task
+from crawl_server.task_manager import TaskManager, Task, TaskResult
app = Flask(__name__)
tm = TaskManager("tm_db.sqlite3")
-@app.route("/")
-def hello():
- return "Hello World!"
-
-
@app.route("/task/")
def get_tasks():
json_str = json.dumps([task.to_json() for task in tm.get_tasks()])
@@ -37,5 +32,18 @@ def task_put():
return abort(400)
+@app.route("/task/completed", methods=["GET"])
+def get_completed_tasks():
+ json_str = json.dumps([result.to_json() for result in tm.get_non_indexed_results()])
+ return json_str
+
+
+@app.route("/task/current", methods=["GET"])
+def get_current_tasks():
+
+ current_tasks = tm.get_current_tasks()
+ return current_tasks
+
+
if __name__ == "__main__":
- app.run()
+ app.run(port=5001)
diff --git a/crawl_server/task_manager.py b/crawl_server/task_manager.py
index cfeebda..4957893 100644
--- a/crawl_server/task_manager.py
+++ b/crawl_server/task_manager.py
@@ -2,7 +2,7 @@ from crawl_server.database import TaskManagerDatabase, Task, TaskResult
from multiprocessing import Pool
from apscheduler.schedulers.background import BackgroundScheduler
from datetime import datetime
-from crawler.crawler import RemoteDirectoryCrawler
+from crawl_server.crawler import RemoteDirectoryCrawler
class TaskManager:
@@ -12,8 +12,10 @@ class TaskManager:
self.db = TaskManagerDatabase(db_path)
self.pool = Pool(processes=max_processes)
+ self.current_tasks = []
+
scheduler = BackgroundScheduler()
- scheduler.add_job(self.execute_queued_task, "interval", seconds=1)
+ scheduler.add_job(self.execute_queued_task, "interval", seconds=5)
scheduler.start()
def put_task(self, task: Task):
@@ -22,11 +24,21 @@ class TaskManager:
def get_tasks(self):
return self.db.get_tasks()
+ def get_current_tasks(self):
+ return self.current_tasks
+
+ def get_non_indexed_results(self):
+ return self.db.get_non_indexed_results()
+
def execute_queued_task(self):
task = self.db.pop_task()
if task:
+
+ self.current_tasks.append(task)
+
print("pooled " + task.url)
+
self.pool.apply_async(
TaskManager.run_task,
args=(task, self.db_path),
@@ -68,8 +80,9 @@ class TaskManager:
@staticmethod
def task_error(err):
- print("ERROR")
+ print("FIXME: Task failed (This should not happen)")
print(err)
+ raise err
diff --git a/crawler/__init__.py b/crawler/__init__.py
deleted file mode 100644
index e69de29..0000000
diff --git a/database.py b/database.py
index 11dd4b5..b991fda 100644
--- a/database.py
+++ b/database.py
@@ -20,16 +20,6 @@ class Website:
self.id = website_id
-class File:
-
- def __init__(self, website_id: int, path: str, mime: str, name: str, size: int):
- self.mime = mime
- self.size = size
- self.name = name
- self.path = path
- self.website_id = website_id
-
-
class ApiToken:
def __init__(self, token, description):
@@ -39,13 +29,6 @@ class ApiToken:
class Database:
- SORT_ORDERS = {
- "score": "ORDER BY rank",
- "size_asc": "ORDER BY size ASC",
- "size_dsc": "ORDER BY size DESC",
- "none": ""
- }
-
def __init__(self, db_path):
self.db_path = db_path
@@ -75,60 +58,6 @@ class Database:
return website_id
- def insert_files(self, files: list):
-
- with sqlite3.connect(self.db_path) as conn:
- cursor = conn.cursor()
-
- # Insert Paths first
- website_paths = dict()
- for file in files:
- if file.path not in website_paths:
- cursor.execute("INSERT INTO WebsitePath (website_id, path) VALUES (?,?)",
- (file.website_id, file.path))
- cursor.execute("SELECT LAST_INSERT_ROWID()")
- website_paths[file.path] = cursor.fetchone()[0]
-
- # Then FileTypes
- mimetypes = dict()
- cursor.execute("SELECT * FROM FileType")
- db_mimetypes = cursor.fetchall()
- for db_mimetype in db_mimetypes:
- mimetypes[db_mimetype[1]] = db_mimetype[0]
- for file in files:
- if file.mime not in mimetypes:
- cursor.execute("INSERT INTO FileType (mime) VALUES (?)", (file.mime, ))
- cursor.execute("SELECT LAST_INSERT_ROWID()")
- mimetypes[file.mime] = cursor.fetchone()[0]
-
- conn.commit()
- # Then insert files
- cursor.executemany("INSERT INTO File (path_id, name, size, mime_id) VALUES (?,?,?,?)",
- [(website_paths[x.path], x.name, x.size, mimetypes[x.mime]) for x in files])
-
- # Update date
- if len(files) > 0:
- cursor.execute("UPDATE Website SET last_modified=CURRENT_TIMESTAMP WHERE id = ?",
- (files[0].website_id, ))
-
- conn.commit()
-
- def import_json(self, json_file, website: Website):
-
- if not self.get_website_by_url(website.url):
- website_id = self.insert_website(website)
- else:
- website_id = website.id
-
- with open(json_file, "r") as f:
- try:
- self.insert_files([File(website_id, x["path"], os.path.splitext(x["name"])[1].lower(), x["name"], x["size"])
- for x in json.load(f)])
- except Exception as e:
- print(e)
- print("Couldn't read json file!")
- pass
-
def get_website_by_url(self, url):
with sqlite3.connect(self.db_path) as conn:
@@ -158,152 +87,6 @@ class Database:
else:
return None
- def enqueue(self, website_id, reddit_post_id=None, reddit_comment_id=None, priority=1):
-
- with sqlite3.connect(self.db_path) as conn:
- cursor = conn.cursor()
- if reddit_post_id:
- cursor.execute("INSERT OR IGNORE INTO Queue (website_id, reddit_post_id, priority) VALUES (?,?,?)",
- (website_id, reddit_post_id, priority))
- else:
- cursor.execute("INSERT OR IGNORE INTO Queue (website_id, reddit_comment_id, priority) VALUES (?,?,?)",
- (website_id, reddit_comment_id, priority))
- conn.commit()
-
- def dequeue(self):
-
- with sqlite3.connect(self.db_path) as conn:
- cursor = conn.cursor()
-
- cursor.execute("SELECT website_id, reddit_post_id, reddit_comment_id"
- " FROM Queue ORDER BY priority DESC, Queue.id ASC LIMIT 1")
- website = cursor.fetchone()
-
- if website:
- cursor.execute("DELETE FROM Queue WHERE website_id=?", (website[0],))
- return website[0], website[1], website[2]
- else:
- return None
-
- def queue(self):
-
- with sqlite3.connect(self.db_path) as conn:
- cursor = conn.cursor()
-
- cursor.execute("SELECT url, logged_ip, logged_useragent, last_modified "
- "FROM Queue INNER JOIN Website ON website_id=Website.id "
- "ORDER BY Queue.priority DESC, Queue.id ASC")
-
- return [Website(x[0], x[1], x[2], x[3]) for x in cursor.fetchall()]
-
- def get_stats(self):
-
- stats = {}
- with sqlite3.connect(self.db_path) as conn:
-
- cursor = conn.cursor()
-
- cursor.execute("SELECT COUNT(*), SUM(size) FROM File")
- db_files = cursor.fetchone()
-
- stats["file_count"] = db_files[0]
- stats["file_size"] = db_files[1]
-
- cursor.execute("SELECT COUNT(DISTINCT website_id), COUNT(id) FROM WebsitePath")
- db_websites = cursor.fetchone()
- stats["website_count"] = db_websites[0]
- stats["website_paths"] = db_websites[1]
-
- return stats
-
- def search(self, q, limit: int = 50, offset: int = 0, sort_order="score"):
-
- with sqlite3.connect(self.db_path) as conn:
- cursor = conn.cursor()
-
- try:
- order_by = Database.SORT_ORDERS.get(sort_order, "")
- cursor.execute("SELECT size, Website.url, WebsitePath.path, File.name, Website.id FROM File_index "
- "INNER JOIN File ON File.id = File_index.rowid "
- "INNER JOIN WebsitePath ON File.path_id = WebsitePath.id "
- "INNER JOIN Website ON website_id = Website.id "
- "WHERE File_index MATCH ? " +
- order_by + " LIMIT ? OFFSET ?",
- (q, limit, offset * limit))
- except sqlite3.OperationalError as e:
- raise InvalidQueryException(str(e))
-
- return cursor.fetchall()
-
- def get_website_stats(self, website_id):
-
- with sqlite3.connect(self.db_path) as conn:
- cursor = conn.cursor()
-
- cursor.execute("SELECT SUM(File.size), COUNT(*) FROM File "
- "INNER JOIN WebsitePath Path on File.path_id = Path.id "
- "WHERE Path.website_id = ?", (website_id, ))
- file_sum, file_count = cursor.fetchone()
-
- cursor.execute("SELECT SUM(File.size) as total_size, COUNT(File.id), FileType.mime FROM File "
- "INNER JOIN FileType ON FileType.id = File.mime_id "
- "INNER JOIN WebsitePath Path on File.path_id = Path.id "
- "WHERE Path.website_id = ? "
- "GROUP BY FileType.id ORDER BY total_size DESC", (website_id, ))
- db_mime_stats = cursor.fetchall()
-
- cursor.execute("SELECT Website.url, Website.last_modified FROM Website WHERE id = ?", (website_id, ))
- website_url, website_date = cursor.fetchone()
-
- return {
- "total_size": file_sum if file_sum else 0,
- "total_count": file_count if file_count else 0,
- "base_url": website_url,
- "report_time": website_date,
- "mime_stats": db_mime_stats
- }
-
- def get_subdir_stats(self, website_id: int, path: str):
- """Get stats of a sub directory. path must not start with / and must end with /"""
- with sqlite3.connect(self.db_path) as conn:
- cursor = conn.cursor()
-
- cursor.execute("SELECT SUM(File.size), COUNT(*) FROM File "
- "INNER JOIN WebsitePath Path on File.path_id = Path.id "
- "WHERE Path.website_id = ? AND Path.path LIKE ?", (website_id, path + "%"))
- file_sum, file_count = cursor.fetchone()
-
- cursor.execute("SELECT SUM(File.size) as total_size, COUNT(File.id), FileType.mime FROM File "
- "INNER JOIN FileType ON FileType.id = File.mime_id "
- "INNER JOIN WebsitePath Path on File.path_id = Path.id "
- "WHERE Path.website_id = ? AND Path.path LIKE ? "
- "GROUP BY FileType.id ORDER BY total_size DESC", (website_id, path + "%"))
- db_mime_stats = cursor.fetchall()
-
- cursor.execute("SELECT Website.url, Website.last_modified FROM Website WHERE id = ?", (website_id, ))
- website_url, website_date = cursor.fetchone()
-
- return {
- "total_size": file_sum if file_sum else 0,
- "total_count": file_count if file_count else 0,
- "base_url": website_url,
- "report_time": website_date,
- "mime_stats": db_mime_stats
- }
-
- def get_website_links(self, website_id):
- """Get all download links of a website"""
- with sqlite3.connect(self.db_path) as conn:
- cursor = conn.cursor()
-
- website = self.get_website_by_id(website_id)
-
- cursor.execute("SELECT File.name, WebsitePath.path FROM File "
- "INNER JOIN WebsitePath on File.path_id = WebsitePath.id "
- "WHERE WebsitePath.website_id = ?", (website.id, ))
-
- return [website.url + x[1] + ("/" if len(x[1]) > 0 else "") + x[0] for x in cursor.fetchall()]
-
def get_websites(self, per_page, page: int):
"""Get all websites"""
with sqlite3.connect(self.db_path) as conn:
@@ -325,29 +108,13 @@ class Database:
def website_has_been_scanned(self, url):
"""Check if a website has at least 1 file"""
-
- with sqlite3.connect(self.db_path) as conn:
- cursor = conn.cursor()
-
- website_id = self.website_exists(url)
-
- if website_id:
- cursor.execute("SELECT COUNT(Path.id) FROM Website "
- "INNER JOIN WebsitePath Path on Website.id = Path.website_id "
- "WHERE Website.id = ?", (website_id, ))
- return cursor.fetchone()[0] > 0
- return None
+ # TODO: Check with SearchEngine
+ print("FIXME: website_has_been_scanned")
def clear_website(self, website_id):
"""Remove all files from a website and update its last_updated date"""
- with sqlite3.connect(self.db_path) as conn:
- cursor = conn.cursor()
-
- cursor.execute("DELETE FROM File WHERE File.path_id IN (SELECT WebsitePath.id "
- "FROM WebsitePath WHERE WebsitePath.website_id=?)", (website_id, ))
- cursor.execute("DELETE FROM WebsitePath WHERE website_id=?", (website_id, ))
- cursor.execute("UPDATE Website SET last_modified=CURRENT_TIMESTAMP WHERE id=?", (website_id, ))
- conn.commit()
+ # TODO: Check with SearchEngine
+ print("FIXME: clear_website")
def get_websites_older(self, delta: datetime.timedelta):
"""Get websites last updated before a given date"""
@@ -358,17 +125,6 @@ class Database:
cursor.execute("SELECT Website.id FROM Website WHERE last_modified < ?", (date, ))
return [x[0] for x in cursor.fetchall()]
- def get_websites_smaller(self, size: int):
- """Get the websites with total size smaller than specified"""
-
- with sqlite3.connect(self.db_path) as conn:
- cursor = conn.cursor()
- cursor.execute("SELECT Website.id FROM Website "
- "INNER JOIN WebsitePath Path on Website.id = Path.website_id "
- "INNER JOIN File F on Path.id = F.path_id "
- "GROUP BY Website.id HAVING SUM(F.size) < ?", (size, ))
- return cursor.fetchall()
-
def delete_website(self, website_id):
with sqlite3.connect(self.db_path) as conn:
diff --git a/debug_put.py b/debug_put.py
index 9b5cffa..55a61b9 100644
--- a/debug_put.py
+++ b/debug_put.py
@@ -4,12 +4,12 @@ import json
payload = json.dumps({
"website_id": 123,
- "url": "http://124.158.108.137/ebooks/",
+ "url": "https://frenchy.ga/",
"priority": 2,
"callback_type": "",
"callback_args": "{}"
})
-r = requests.post("http://localhost:5000/task/put",
+r = requests.post("http://localhost:5001/task/put",
headers={"Content-Type": "application/json"},
data=payload)
diff --git a/init_script.sql b/init_script.sql
index a318de7..b52e948 100644
--- a/init_script.sql
+++ b/init_script.sql
@@ -9,39 +9,6 @@ CREATE TABLE Website (
last_modified INTEGER DEFAULT CURRENT_TIMESTAMP
);
-CREATE TABLE WebsitePath (
-
- id INTEGER PRIMARY KEY NOT NULL,
- website_id INTEGER,
- path TEXT,
-
- FOREIGN KEY (website_id) REFERENCES Website(id)
-);
-
-CREATE TABLE FileType (
- id INTEGER PRIMARY KEY NOT NULL,
- mime TEXT
-);
-
-CREATE TABLE File (
- id INTEGER PRIMARY KEY NOT NULL,
- path_id INTEGER,
- mime_id INTEGER,
- name TEXT,
- size INTEGER,
-
- FOREIGN KEY (path_id) REFERENCES WebsitePath(id),
- FOREIGN KEY (mime_id) REFERENCES FileType(id)
-);
-
-CREATE TABLE Queue (
- id INTEGER PRIMARY KEY NOT NULL,
- website_id INTEGER UNIQUE,
- reddit_post_id TEXT,
- reddit_comment_id TEXT,
- priority INTEGER
-);
-
CREATE TABLE Admin (
username TEXT PRIMARY KEY NOT NULL,
password TEXT
@@ -51,24 +18,3 @@ CREATE TABLE ApiToken (
token TEXT PRIMARY KEY NOT NULL,
description TEXT
);
-
--- Full Text Index
-
-CREATE VIRTUAL TABLE File_index USING fts5 (
- name,
- path,
- tokenize=porter
-);
-
-CREATE TRIGGER after_File_index_insert AFTER INSERT ON File BEGIN
-
- INSERT INTO File_index (rowid, name, path)
- SELECT File.id, File.name, WebsitePath.path
- FROM File
- INNER JOIN WebsitePath on File.path_id = WebsitePath.id
- WHERE File.id = new.id;
-END;
-
-CREATE TRIGGER after_File_index_delete AFTER DELETE ON File BEGIN
- DELETE FROM File_index WHERE rowid = old.id;
-END;
\ No newline at end of file
diff --git a/task.py b/task.py
index 8031d14..46a517c 100644
--- a/task.py
+++ b/task.py
@@ -1,81 +1,91 @@
from apscheduler.schedulers.background import BackgroundScheduler
-import os
-from database import Website
-from multiprocessing import Value, Process
-from database import Database
+from crawl_server.database import Task, TaskResult
+import requests
+import json
from reddit_bot import RedditBot
import praw
-class TaskManager:
+class CrawlServer:
+
+ headers = {
+ "Content-Type": "application/json"
+ }
+
+ def __init__(self, url):
+ self.url = url
+
+ def queue_task(self, task: Task) -> bool:
+
+ print("Sending task to crawl server " + self.url)
+ payload = json.dumps(task.to_json())
+ r = requests.post(self.url + "/task/put", headers=CrawlServer.headers, data=payload)
+ print(r)
+ return r.status_code == 200
+
+ def get_completed_tasks(self) -> list:
+
+ r = requests.get(self.url + "/task/completed")
+ return []
+
+ def get_queued_tasks(self) -> list:
+
+ r = requests.get(self.url + "/task/")
+ return [
+ Task(t["website_id"], t["url"], t["priority"], t["callback_type"], t["callback_args"])
+ for t in json.loads(r.text)
+ ]
+
+ def get_current_tasks(self):
+
+ r = requests.get(self.url + "/task/current")
+ return [
+ Task(t["website_id"], t["url"], t["priority"], t["callback_type"], t["callback_args"])
+ for t in json.loads(r.text)
+ ]
+
+
+class TaskDispatcher:
def __init__(self):
- self.busy = Value("i", 0)
- self.current_website = None
- self.current_task = None
-
reddit = praw.Reddit('opendirectories-bot',
user_agent='github.com/simon987/od-database v1.0 (by /u/Hexahedr_n)')
self.reddit_bot = RedditBot("crawled.txt", reddit)
- self.db = Database("db.sqlite3")
scheduler = BackgroundScheduler()
- scheduler.add_job(self.check_new_task, "interval", seconds=1)
+ scheduler.add_job(self.check_completed_tasks, "interval", seconds=1)
scheduler.start()
- def check_new_task(self):
- if self.current_task is None:
- task = self.db.dequeue()
+ # TODO load from config
+ self.crawl_servers = [
+ CrawlServer("http://localhost:5001"),
+ ]
- if task:
- website_id, post_id, comment_id = task
- website = self.db.get_website_by_id(website_id)
- self.current_task = Process(target=self.execute_task,
- args=(website, self.busy, post_id, comment_id))
- self.current_website = website
- self.current_task.start()
+ def check_completed_tasks(self):
+ return self._get_available_crawl_server().get_completed_tasks()
- elif self.busy.value == 0:
- self.current_task.terminate()
- self.current_task = None
- self.current_website = None
+ def dispatch_task(self, task: Task):
+ self._get_available_crawl_server().queue_task(task)
- def execute_task(self, website: Website, busy: Value, post_id: str, comment_id: str):
- busy.value = 1
- if os.path.exists("data.json"):
- os.remove("data.json")
- print("Started crawling task")
- process = CrawlerProcess(get_project_settings())
- process.crawl("od_links", base_url=website.url)
- process.start()
- print("Done crawling")
+ def _get_available_crawl_server(self) -> CrawlServer:
+ # TODO: Load balancing & health check for crawl servers
+ return self.crawl_servers[0]
- self.db.import_json("data.json", website)
- os.remove("data.json")
- print("Imported in SQLite3")
+ def get_queued_tasks(self) -> list:
- # TODO: Extract 'callbacks' for posts and comments in a function
- if post_id:
- # Reply to post
- stats = self.db.get_website_stats(website.id)
- comment = self.reddit_bot.get_comment({"": stats}, website.id)
- print(comment)
- if "total_size" in stats and stats["total_size"] > 10000000:
- post = self.reddit_bot.reddit.submission(post_id)
- self.reddit_bot.reply(post, comment)
- pass
- else:
- self.reddit_bot.log_crawl(post_id)
+ queued_tasks = []
- elif comment_id:
- # Reply to comment
- stats = self.db.get_website_stats(website.id)
- comment = self.reddit_bot.get_comment({"There you go!": stats}, website.id)
- print(comment)
- reddit_comment = self.reddit_bot.reddit.comment(comment_id)
- self.reddit_bot.reply(reddit_comment, comment)
+ for server in self.crawl_servers:
+ queued_tasks.extend(server.get_queued_tasks())
- busy.value = 0
- print("Done crawling task")
+ return queued_tasks
+
+ def get_current_tasks(self) -> list:
+
+ current_tasks = []
+ for server in self.crawl_servers:
+ current_tasks.extend(server.get_current_tasks())
+
+ return current_tasks
diff --git a/task_db_init.sql b/task_db_init.sql
index 809694f..61a27b4 100644
--- a/task_db_init.sql
+++ b/task_db_init.sql
@@ -14,5 +14,6 @@ CREATE TABLE TaskResult (
status_code TEXT,
file_count INT,
start_time INT,
- end_time INT
+ end_time INT,
+ indexed_time INT DEFAULT NULL
);
\ No newline at end of file
diff --git a/templates/submit.html b/templates/submit.html
index 61c2c85..a9d8dd8 100644
--- a/templates/submit.html
+++ b/templates/submit.html
@@ -71,15 +71,17 @@
Url
- Date added
+ Priority
+ Task type