diff --git a/app.py b/app.py
index 7a8bb53..9fe8d3f 100644
--- a/app.py
+++ b/app.py
@@ -202,7 +202,7 @@ def enqueue_bulk():
 
     if urls:
         urls = urls.split()
-        if 0 < len(urls) <= 10:
+        if 0 < len(urls) <= 1000000000000:
 
             for url in urls:
                 url = os.path.join(url, "")
diff --git a/clean_invalid_websites.py b/clean_invalid_websites.py
deleted file mode 100644
index 620459b..0000000
--- a/clean_invalid_websites.py
+++ /dev/null
@@ -1,9 +0,0 @@
-from database import Database
-
-
-db = Database("db.sqlite3")
-websites_to_delete = db.get_websites_smaller(10000000)
-for website_id in [x[0] for x in websites_to_delete]:
-    db.clear_website(website_id)
-    db.delete_website(website_id)
-    print("Deleted " + str(website_id))
\ No newline at end of file
diff --git a/crawl_server/callbacks.py b/crawl_server/callbacks.py
new file mode 100644
index 0000000..ae0faee
--- /dev/null
+++ b/crawl_server/callbacks.py
@@ -0,0 +1,61 @@
+from crawl_server.database import Task
+from crawl_server.reddit_bot import RedditBot
+import praw
+
+
+class PostCrawlCallback:
+
+    def __init__(self, task: Task):
+        self.task = task
+
+    def run(self):
+        raise NotImplementedError
+
+
+class PostCrawlCallbackFactory:
+
+    @staticmethod
+    def get_callback(task: Task):
+
+        if task.callback_type == "reddit_post":
+            return RedditPostCallback(task)
+
+        elif task.callback_type == "reddit_comment":
+            return RedditCommentCallback(task)
+
+        elif task.callback_type == "discord":
+            return DiscordCallback(task)
+
+
+class RedditCallback(PostCrawlCallback):
+
+    def __init__(self, task: Task):
+        super().__init__(task)
+
+        reddit = praw.Reddit('opendirectories-bot',
+                             user_agent='github.com/simon987/od-database (by /u/Hexahedr_n)')
+        self.reddit_bot = RedditBot("crawled.txt", reddit)
+
+    def run(self):
+        raise NotImplementedError
+
+
+class RedditPostCallback(RedditCallback):
+
+    def run(self):
+        print("Reddit post callback for task " + str(self.task))
+        pass
+
+
+class RedditCommentCallback(RedditCallback):
+
+    def run(self):
+        print("Reddit comment callback for task " + str(self.task))
+        pass
+
+
+class DiscordCallback(PostCrawlCallback):
+
+    def run(self):
+        print("Discord callback for task " + str(self.task))
+        pass
diff --git a/crawl_server/database.py b/crawl_server/database.py
index c0acc25..f5c7f70 100644
--- a/crawl_server/database.py
+++ b/crawl_server/database.py
@@ -41,9 +41,12 @@ class Task:
             "callback_args": json.dumps(self.callback_args)
         }
 
-    def __repr__(self):
+    def __str__(self):
         return json.dumps(self.to_json())
 
+    def __repr__(self):
+        return self.__str__()
+
 
 class TaskManagerDatabase:
 
diff --git a/reddit_bot.py b/crawl_server/reddit_bot.py
similarity index 100%
rename from reddit_bot.py
rename to crawl_server/reddit_bot.py
diff --git a/crawl_server/remote_http.py b/crawl_server/remote_http.py
index 9f00ddf..b9affc5 100644
--- a/crawl_server/remote_http.py
+++ b/crawl_server/remote_http.py
@@ -36,7 +36,7 @@ class HttpDirectory(RemoteDirectory):
 
     def __init__(self, url):
         super().__init__(url)
-        self.parser = etree.HTMLParser(collect_ids=False)
+        self.parser = etree.HTMLParser(collect_ids=False, encoding="utf-8")
 
     def list_dir(self, path) -> list:
         results = []
diff --git a/crawl_server/server.py b/crawl_server/server.py
index 0cfa15d..99d4527 100644
--- a/crawl_server/server.py
+++ b/crawl_server/server.py
@@ -1,19 +1,33 @@
-from flask import Flask, request, abort, Response, send_from_directory
+from flask import Flask, request, abort, Response
+from flask_httpauth import HTTPTokenAuth
 import json
-from crawl_server.task_manager import TaskManager, Task, TaskResult
+from crawl_server.task_manager import TaskManager, Task
 import os
+import config
 
 app = Flask(__name__)
+auth = HTTPTokenAuth(scheme="Token")
 
-tm = TaskManager("tm_db.sqlite3", 2)
+tokens = [config.CRAWL_SERVER_TOKEN]
+
+tm = TaskManager("tm_db.sqlite3", 8)
+
+
+@auth.verify_token
+def verify_token(token):
+    print(token)
+    if token in tokens:
+        return True
 
 
 @app.route("/task/")
+@auth.login_required
 def get_tasks():
     json_str = json.dumps([task.to_json() for task in tm.get_tasks()])
     return Response(json_str, mimetype="application/json")
 
 
 @app.route("/task/put", methods=["POST"])
+@auth.login_required
 def task_put():
     if request.json:
@@ -34,12 +48,14 @@
 
 
 @app.route("/task/completed", methods=["GET"])
+@auth.login_required
 def get_completed_tasks():
     json_str = json.dumps([result.to_json() for result in tm.get_non_indexed_results()])
     return json_str
 
 
 @app.route("/task/current", methods=["GET"])
+@auth.login_required
 def get_current_tasks():
 
     current_tasks = tm.get_current_tasks()
@@ -47,6 +63,7 @@
 
 
 @app.route("/file_list/<int:website_id>/")
+@auth.login_required
 def get_file_list(website_id):
 
     file_name = "./crawled/" + str(website_id) + ".json"
@@ -62,4 +79,4 @@
 
 
 if __name__ == "__main__":
-    app.run(port=5001)
+    app.run(port=5002)
diff --git a/crawl_server/task_manager.py b/crawl_server/task_manager.py
index 7641cb9..b754b09 100644
--- a/crawl_server/task_manager.py
+++ b/crawl_server/task_manager.py
@@ -4,11 +4,12 @@ from multiprocessing import Manager
 from apscheduler.schedulers.background import BackgroundScheduler
 from datetime import datetime
 from crawl_server.crawler import RemoteDirectoryCrawler
+from crawl_server.callbacks import PostCrawlCallbackFactory
 
 
 class TaskManager:
 
-    def __init__(self, db_path, max_processes=4):
+    def __init__(self, db_path, max_processes=2):
         self.db_path = db_path
         self.db = TaskManagerDatabase(db_path)
         self.pool = ProcessPoolExecutor(max_workers=max_processes)
@@ -53,7 +54,7 @@ class TaskManager:
 
         print("Starting task " + task.url)
 
-        crawler = RemoteDirectoryCrawler(task.url, 30)
+        crawler = RemoteDirectoryCrawler(task.url, 100)
         crawl_result = crawler.crawl_directory("./crawled/" + str(task.website_id) + ".json")
 
         result.file_count = crawl_result.file_count
@@ -62,6 +63,11 @@ class TaskManager:
         result.end_time = datetime.utcnow()
         print("End task " + task.url)
 
+        callback = PostCrawlCallbackFactory.get_callback(task)
+        if callback:
+            callback.run()
+            print("Executed callback")
+
         return result, db_path, current_tasks
 
     @staticmethod
diff --git a/queue_reddit_links.py b/queue_reddit_links.py
index 5005d6b..c5cdb95 100644
--- a/queue_reddit_links.py
+++ b/queue_reddit_links.py
@@ -1,14 +1,16 @@
 import praw
-from reddit_bot import RedditBot
+from crawl_server.reddit_bot import RedditBot
+from search.search import ElasticSearchEngine
 from database import Database, Website
 import od_util
 import os
 import re
 
-pattern = re.compile("[\[\]\\\()]+")
+chars_to_remove_from_comment = re.compile("[\[\]\\\()]+")
 reddit = praw.Reddit('opendirectories-bot',
                      user_agent='github.com/simon987/od-database v1.0 (by /u/Hexahedr_n)')
 db = Database("db.sqlite3")
+search = ElasticSearchEngine("od-database")
 subreddit = reddit.subreddit("opendirectories")
 # subreddit = reddit.subreddit("test")
 bot = RedditBot("crawled.txt", reddit)
@@ -17,7 +19,7 @@ submissions = []
 
 
 def handle_exact_repost(website_id, reddit_obj):
-    stats = db.get_website_stats(website_id)
+    stats = search.get_stats(website_id)
     comment = bot.get_comment({"": stats}, website_id,
                               "I already scanned this website on " + website.last_modified + " UTC")
     print(comment)
@@ -48,7 +50,7 @@ def handle_subdir_repost(website_id, reddit_obj):
 for comment in subreddit.comments(limit=50):
 
     if not bot.has_crawled(comment):
-        text = pattern.sub(" ", comment.body).strip()
+        text = chars_to_remove_from_comment.sub(" ", comment.body).strip()
         if text.startswith("u/opendirectories-bot") or text.startswith("/u/opendirectories-bot"):
             lines = text.split()
             if len(lines) > 1:
diff --git a/requirements.txt b/requirements.txt
index e9ae70f..d69162a 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -12,4 +12,5 @@ bcrypt
 ftputil
 lxml
 elasticsearch
-python-dateutil
\ No newline at end of file
+python-dateutil
+flask_httpauth
\ No newline at end of file
diff --git a/stress_test.py b/stress_test.py
index f4cf58f..1d23edc 100644
--- a/stress_test.py
+++ b/stress_test.py
@@ -6,6 +6,14 @@ from concurrent.futures import ThreadPoolExecutor
 import requests
 import random
 
+terms = requests.get("https://svnweb.freebsd.org/csrg/share/dict/words?view=co&content-type=text/plain") \
+    .text.splitlines()
+exts = [
+    "zip", "exe", "mp3", "avi", "mp4", "rar", "7zip", "ogg", "m4a", "flac", "doc", "docx", "aac", "xls",
+    "cab", "txt", "c", "java", "class", "jar", "py", "cpp", "h", "png", "jpg", "jpeg", "ttf", "torrent",
+    "part", "blend", "3ds", "obj", "ico", "html", "css", "js", "ts", "ape", "asm", "nasm", "fasm", "o",
+    "so", "dll", "tar", "gz", "bin", "cad", "cmd", "bat", "sh", "md"
+]
 
 def dump_local_filesystem(root_dir: str):
 
@@ -29,6 +37,31 @@ def dump_local_filesystem(root_dir: str):
         f.writelines(json.dumps(doc) + "\n" for doc in docs)
 
 
+def random_path():
+    return "/".join(random.choices(terms, k=random.randint(1, 5)))
+
+
+def random_file_name():
+    return random.choice(["_", " ", "-", ".", "#", ""]).\
+        join(random.choices(terms, k=random.randint(1, 3))) + "." + random.choice(exts)
+
+
+def get_random_file():
+
+    doc = dict()
+    doc["name"] = random_file_name()
+    doc["path"] = random_path()
+    doc["mtime"] = random.randint(0, 10000000)
+    doc["size"] = random.randint(-1, 100000000000000)
+
+    return doc
+
+
+def dump_random_files(count=10):
+    with open("random_dump.json", "w") as f:
+        f.writelines(json.dumps(get_random_file()) + "\n" for _ in range(count))
+
+
 def index_file_list(path: str, website_id):
 
     es = ElasticSearchEngine("od-database")
@@ -43,14 +76,12 @@ def search(term=""):
 
 
 def random_searches(count=10000000, max_workers=1000):
 
-    terms = requests.get("https://svnweb.freebsd.org/csrg/share/dict/words?view=co&content-type=text/plain")\
-        .text.splitlines()
-
     pool = ThreadPoolExecutor(max_workers=max_workers)
     pool.map(search, random.choices(terms, k=count))
 
 
 # dump_local_filesystem("/mnt/")
-index_file_list("crawl_server/crawled/123.json", 10)
+# index_file_list("crawl_server/crawled/123.json", 10)
 # random_searches(100000)
+dump_random_files(20000 * 100000)
diff --git a/task.py b/task.py
index abfd2c5..aa06b67 100644
--- a/task.py
+++ b/task.py
@@ -4,14 +4,14 @@ from crawl_server.database import Task, TaskResult
 import requests
 from requests.exceptions import ConnectionError
 import json
-from reddit_bot import RedditBot
-import praw
+import config
 
 
 class CrawlServer:
 
     headers = {
-        "Content-Type": "application/json"
+        "Content-Type": "application/json",
+        "Authorization": "Token " + config.CRAWL_SERVER_TOKEN,
     }
 
     def __init__(self, url):
@@ -73,11 +73,6 @@ class CrawlServer:
 class TaskDispatcher:
 
     def __init__(self):
-        # TODO: remove reddit
-        reddit = praw.Reddit('opendirectories-bot',
-                             user_agent='github.com/simon987/od-database v1.0 (by /u/Hexahedr_n)')
-        self.reddit_bot = RedditBot("crawled.txt", reddit)
-
         scheduler = BackgroundScheduler()
         scheduler.add_job(self.check_completed_tasks, "interval", seconds=1)
         scheduler.start()
diff --git a/update_website.py b/update_website.py
deleted file mode 100644
index af48b36..0000000
--- a/update_website.py
+++ /dev/null
@@ -1,21 +0,0 @@
-from database import Database
-import od_util
-import datetime
-
-
-db = Database("db.sqlite3")
-
-websites_to_update = db.get_websites_older(datetime.timedelta(minutes=5))
-
-if websites_to_update:
-    for website_id in websites_to_update:
-        website = db.get_website_by_id(website_id)
-
-        # Ignore if website is down
-        if od_util.is_od(website.url):
-            # If website is still up, re-scan it
-            print("Re-scanning " + str(website_id))
-            db.clear_website(website_id)
-            db.enqueue(website_id)
-        else:
-            print("Website is down")
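
Note: after this change both crawl_server/server.py and task.py import config and read config.CRAWL_SERVER_TOKEN, but the diff does not add that setting itself; it is assumed to already exist in config.py (e.g. CRAWL_SERVER_TOKEN = "some-shared-secret"). Below is a minimal sketch of a client request against one of the newly protected endpoints, under that assumption; the localhost URL is illustrative only (the port matches the app.run(port=5002) change above).

import requests

import config  # assumed to define CRAWL_SERVER_TOKEN, as the changed modules expect

# flask_httpauth's HTTPTokenAuth(scheme="Token") on the crawl server expects an
# "Authorization: Token <value>" header; this is the same header that CrawlServer
# in task.py now sends with every request.
response = requests.get("http://localhost:5002/task/",
                        headers={"Authorization": "Token " + config.CRAWL_SERVER_TOKEN})
print(response.status_code)  # 200 with a valid token, 401 Unauthorized otherwise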