Started working on post-crawl callbacks and basic auth for crawl servers

Simon 2018-06-14 15:05:56 -04:00
parent 1bd58468eb
commit 83ca579ec7
13 changed files with 142 additions and 56 deletions

app.py (2 changed lines)
View File

@@ -202,7 +202,7 @@ def enqueue_bulk():
     if urls:
         urls = urls.split()
-        if 0 < len(urls) <= 10:
+        if 0 < len(urls) <= 1000000000000:
             for url in urls:
                 url = os.path.join(url, "")

View File

@@ -1,9 +0,0 @@
from database import Database

db = Database("db.sqlite3")

websites_to_delete = db.get_websites_smaller(10000000)
for website_id in [x[0] for x in websites_to_delete]:
    db.clear_website(website_id)
    db.delete_website(website_id)
    print("Deleted " + str(website_id))

crawl_server/callbacks.py (new file, 61 lines)
View File

@@ -0,0 +1,61 @@
from crawl_server.database import Task
from crawl_server.reddit_bot import RedditBot
import praw


class PostCrawlCallback:

    def __init__(self, task: Task):
        self.task = task

    def run(self):
        raise NotImplementedError


class PostCrawlCallbackFactory:

    @staticmethod
    def get_callback(task: Task):

        if task.callback_type == "reddit_post":
            return RedditPostCallback(task)

        elif task.callback_type == "reddit_comment":
            return RedditCommentCallback(task)

        elif task.callback_type == "discord":
            return DiscordCallback(task)


class RedditCallback(PostCrawlCallback):

    def __init__(self, task: Task):
        super().__init__(task)

        reddit = praw.Reddit('opendirectories-bot',
                             user_agent='github.com/simon987/od-database (by /u/Hexahedr_n)')
        self.reddit_bot = RedditBot("crawled.txt", reddit)

    def run(self):
        raise NotImplementedError


class RedditPostCallback(RedditCallback):

    def run(self):
        print("Reddit post callback for task " + str(self.task))
        pass


class RedditCommentCallback(RedditCallback):

    def run(self):
        print("Reddit comment callback for task " + str(self.task))
        pass


class DiscordCallback(PostCrawlCallback):

    def run(self):
        print("Discord callback for task " + str(self.task))
        pass
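
For reference, a minimal usage sketch of the factory above. This is hypothetical driver code, not part of the commit, and the Task constructor arguments shown are assumed for illustration; the real call site is the task_manager.py hunk further down.

# Hypothetical example; Task(...) arguments are assumed for illustration.
from crawl_server.database import Task
from crawl_server.callbacks import PostCrawlCallbackFactory

task = Task(website_id=1, url="http://example.com/",
            callback_type="discord", callback_args="{}")

callback = PostCrawlCallbackFactory.get_callback(task)
if callback:        # get_callback() returns None for unknown callback types
    callback.run()  # DiscordCallback currently just prints a message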

View File

@@ -41,9 +41,12 @@ class Task:
             "callback_args": json.dumps(self.callback_args)
         }

-    def __repr__(self):
+    def __str__(self):
         return json.dumps(self.to_json())

+    def __repr__(self):
+        return self.__str__()
+

 class TaskManagerDatabase:

View File

@@ -36,7 +36,7 @@ class HttpDirectory(RemoteDirectory):

     def __init__(self, url):
         super().__init__(url)
-        self.parser = etree.HTMLParser(collect_ids=False)
+        self.parser = etree.HTMLParser(collect_ids=False, encoding="utf-8")

     def list_dir(self, path) -> list:
         results = []
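
A quick check of the parser change above — a sketch assuming lxml is available; it only exercises the collect_ids and encoding keywords used in the new line and confirms links are still extracted.

from lxml import etree

# Same parser configuration as the new HttpDirectory.__init__ line above.
parser = etree.HTMLParser(collect_ids=False, encoding="utf-8")
page = b"<html><body><a href='file.bin'>file.bin</a></body></html>"
root = etree.fromstring(page, parser)
print([a.get("href") for a in root.findall(".//a")])  # ['file.bin']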

View File

@@ -1,19 +1,33 @@
-from flask import Flask, request, abort, Response, send_from_directory
+from flask import Flask, request, abort, Response
+from flask_httpauth import HTTPTokenAuth
 import json
-from crawl_server.task_manager import TaskManager, Task, TaskResult
+from crawl_server.task_manager import TaskManager, Task
 import os
+import config

 app = Flask(__name__)
+auth = HTTPTokenAuth(scheme="Token")

-tm = TaskManager("tm_db.sqlite3", 2)
+tokens = [config.CRAWL_SERVER_TOKEN]
+
+tm = TaskManager("tm_db.sqlite3", 8)
+
+
+@auth.verify_token
+def verify_token(token):
+    print(token)
+    if token in tokens:
+        return True
+

 @app.route("/task/")
+@auth.login_required
 def get_tasks():
     json_str = json.dumps([task.to_json() for task in tm.get_tasks()])
     return Response(json_str, mimetype="application/json")


 @app.route("/task/put", methods=["POST"])
+@auth.login_required
 def task_put():

     if request.json:

@@ -34,12 +48,14 @@ def task_put():

 @app.route("/task/completed", methods=["GET"])
+@auth.login_required
 def get_completed_tasks():

     json_str = json.dumps([result.to_json() for result in tm.get_non_indexed_results()])
     return json_str


 @app.route("/task/current", methods=["GET"])
+@auth.login_required
 def get_current_tasks():

     current_tasks = tm.get_current_tasks()

@@ -47,6 +63,7 @@ def get_current_tasks():

 @app.route("/file_list/<int:website_id>/")
+@auth.login_required
 def get_file_list(website_id):

     file_name = "./crawled/" + str(website_id) + ".json"

@@ -62,4 +79,4 @@ def get_file_list(website_id):

 if __name__ == "__main__":
-    app.run(port=5001)
+    app.run(port=5002)
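
Clients now have to send the same token; the CrawlServer class in task.py further down adds it to its default headers. A minimal request sketch, assuming a server on port 5002 and a placeholder token value:

import requests

CRAWL_SERVER = "http://localhost:5002"          # assumed server address
TOKEN = "<value of config.CRAWL_SERVER_TOKEN>"  # placeholder token

# Endpoints decorated with @auth.login_required answer 401 without a valid token.
r = requests.get(CRAWL_SERVER + "/task/",
                 headers={"Authorization": "Token " + TOKEN})
print(r.status_code, r.text)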

View File

@@ -4,11 +4,12 @@ from multiprocessing import Manager
 from apscheduler.schedulers.background import BackgroundScheduler
 from datetime import datetime
 from crawl_server.crawler import RemoteDirectoryCrawler
+from crawl_server.callbacks import PostCrawlCallbackFactory


 class TaskManager:

-    def __init__(self, db_path, max_processes=4):
+    def __init__(self, db_path, max_processes=2):
         self.db_path = db_path
         self.db = TaskManagerDatabase(db_path)
         self.pool = ProcessPoolExecutor(max_workers=max_processes)

@@ -53,7 +54,7 @@ class TaskManager:
         print("Starting task " + task.url)

-        crawler = RemoteDirectoryCrawler(task.url, 30)
+        crawler = RemoteDirectoryCrawler(task.url, 100)
         crawl_result = crawler.crawl_directory("./crawled/" + str(task.website_id) + ".json")

         result.file_count = crawl_result.file_count

@@ -62,6 +63,11 @@ class TaskManager:
         result.end_time = datetime.utcnow()
         print("End task " + task.url)

+        callback = PostCrawlCallbackFactory.get_callback(task)
+        if callback:
+            callback.run()
+            print("Executed callback")
+
         return result, db_path, current_tasks

     @staticmethod

View File

@@ -1,14 +1,16 @@
 import praw
-from reddit_bot import RedditBot
+from crawl_server.reddit_bot import RedditBot
+from search.search import ElasticSearchEngine
 from database import Database, Website
 import od_util
 import os
 import re

-pattern = re.compile("[\[\]\\\()]+")
+chars_to_remove_from_comment = re.compile("[\[\]\\\()]+")

 reddit = praw.Reddit('opendirectories-bot',
                      user_agent='github.com/simon987/od-database v1.0 (by /u/Hexahedr_n)')

 db = Database("db.sqlite3")
+search = ElasticSearchEngine("od-database")
 subreddit = reddit.subreddit("opendirectories")
 # subreddit = reddit.subreddit("test")
 bot = RedditBot("crawled.txt", reddit)

@@ -17,7 +19,7 @@ submissions = []

 def handle_exact_repost(website_id, reddit_obj):
-    stats = db.get_website_stats(website_id)
+    stats = search.get_stats(website_id)
     comment = bot.get_comment({"": stats}, website_id,
                               "I already scanned this website on " + website.last_modified + " UTC")
     print(comment)

@@ -48,7 +50,7 @@ def handle_subdir_repost(website_id, reddit_obj):

 for comment in subreddit.comments(limit=50):

     if not bot.has_crawled(comment):
-        text = pattern.sub(" ", comment.body).strip()
+        text = chars_to_remove_from_comment.sub(" ", comment.body).strip()
         if text.startswith("u/opendirectories-bot") or text.startswith("/u/opendirectories-bot"):
             lines = text.split()
             if len(lines) > 1:

View File

@@ -13,3 +13,4 @@ ftputil
 lxml
 elasticsearch
 python-dateutil
+flask_httpauth

View File

@@ -6,6 +6,14 @@ from concurrent.futures import ThreadPoolExecutor
 import requests
 import random

+terms = requests.get("https://svnweb.freebsd.org/csrg/share/dict/words?view=co&content-type=text/plain") \
+    .text.splitlines()
+exts = [
+    "zip", "exe", "mp3", "avi", "mp4", "rar", "7zip", "ogg", "m4a", "flac", "doc", "docx", "aac", "xls",
+    "cab", "txt", "c", "java", "class", "jar", "py", "cpp", "h", "png", "jpg", "jpeg", "ttf", "torrent",
+    "part", "blend", "3ds", "obj", "ico", "html", "css", "js", "ts", "ape", "asm", "nasm", "fasm", "o",
+    "so", "dll", "tar", "gz", "bin", "cad", "cmd", "bat", "sh", "md"
+]


 def dump_local_filesystem(root_dir: str):

@@ -29,6 +37,31 @@ def dump_local_filesystem(root_dir: str):
         f.writelines(json.dumps(doc) + "\n" for doc in docs)


+def random_path():
+    return "/".join(random.choices(terms, k=random.randint(1, 5)))
+
+
+def random_file_name():
+    return random.choice(["_", " ", "-", ".", "#", ""]).\
+        join(random.choices(terms, k=random.randint(1, 3))) + "." + random.choice(exts)
+
+
+def get_random_file():
+    doc = dict()
+    doc["name"] = random_file_name()
+    doc["path"] = random_path()
+    doc["mtime"] = random.randint(0, 10000000)
+    doc["size"] = random.randint(-1, 100000000000000)
+    return doc
+
+
+def dump_random_files(count=10):
+    with open("random_dump.json", "w") as f:
+        f.writelines(json.dumps(get_random_file()) + "\n" for _ in range(count))
+
+
 def index_file_list(path: str, website_id):

     es = ElasticSearchEngine("od-database")

@@ -43,14 +76,12 @@ def search(term=""):

 def random_searches(count=10000000, max_workers=1000):

-    terms = requests.get("https://svnweb.freebsd.org/csrg/share/dict/words?view=co&content-type=text/plain")\
-        .text.splitlines()
     pool = ThreadPoolExecutor(max_workers=max_workers)
     pool.map(search, random.choices(terms, k=count))


 # dump_local_filesystem("/mnt/")
-index_file_list("crawl_server/crawled/123.json", 10)
+# index_file_list("crawl_server/crawled/123.json", 10)
 # random_searches(100000)
+dump_random_files(20000 * 100000)
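
The new dump_random_files() call at the bottom of the hunk asks for 20000 * 100000 = 2,000,000,000 random documents, which only makes sense as a long-running index stress test. A much smaller run is enough to sanity-check the generator (hypothetical snippet, not part of the commit):

dump_random_files(1000)           # writes random_dump.json
with open("random_dump.json") as f:
    print(sum(1 for _ in f))      # expect 1000 JSON lines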

task.py (11 changed lines)
View File

@@ -4,14 +4,14 @@ from crawl_server.database import Task, TaskResult
 import requests
 from requests.exceptions import ConnectionError
 import json
-from reddit_bot import RedditBot
-import praw
+import config


 class CrawlServer:

     headers = {
-        "Content-Type": "application/json"
+        "Content-Type": "application/json",
+        "Authorization": "Token " + config.CRAWL_SERVER_TOKEN,
     }

     def __init__(self, url):

@@ -73,11 +73,6 @@ class CrawlServer:

 class TaskDispatcher:

     def __init__(self):
-        # TODO: remove reddit
-        reddit = praw.Reddit('opendirectories-bot',
-                             user_agent='github.com/simon987/od-database v1.0 (by /u/Hexahedr_n)')
-        self.reddit_bot = RedditBot("crawled.txt", reddit)
         scheduler = BackgroundScheduler()
         scheduler.add_job(self.check_completed_tasks, "interval", seconds=1)
         scheduler.start()

View File

@@ -1,21 +0,0 @@
from database import Database
import od_util
import datetime

db = Database("db.sqlite3")

websites_to_update = db.get_websites_older(datetime.timedelta(minutes=5))

if websites_to_update:
    for website_id in websites_to_update:
        website = db.get_website_by_id(website_id)

        # Ignore if website is down
        if od_util.is_od(website.url):
            # If website is still up, re-scan it
            print("Re-scanning " + str(website_id))
            db.clear_website(website_id)
            db.enqueue(website_id)
        else:
            print("Website is down")