diff --git a/.gitignore b/.gitignore
index bd07f71..aa66ea9 100644
--- a/.gitignore
+++ b/.gitignore
@@ -8,4 +8,6 @@ config.py
 db.sqlite3
 oddb.log
 praw.ini
-tmp/
\ No newline at end of file
+env/
+worker.json
+search_blacklist.txt
diff --git a/.gitmodules b/.gitmodules
index 6be6199..5451fff 100644
--- a/.gitmodules
+++ b/.gitmodules
@@ -1,3 +1,9 @@
 [submodule "fold_to_ascii"]
 	path = fold_to_ascii
 	url = https://github.com/spanishdict/fold_to_ascii
+[submodule "task_tracker_drone"]
+	path = task_tracker_drone
+	url = https://github.com/simon987/task_tracker_drone
+[submodule "ws_bucket_client"]
+	path = ws_bucket_client
+	url = https://github.com/simon987/ws_bucket_client
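Note: the two new submodules pull in the client libraries for task_tracker and ws_bucket that `tasks.py` now imports, so a fresh checkout needs `git submodule update --init --recursive` before those imports resolve (the Jenkins build script added below runs a similar update step, though without `--init`).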
diff --git a/README.md b/README.md
index 882eb80..4f3dc46 100644
--- a/README.md
+++ b/README.md
@@ -36,9 +36,6 @@ RESULTS_PER_PAGE = (25, 50, 100, 250, 500, 1000)
 SUBMIT_FTP = False
 # Allow http(s) websites in /submit
 SUBMIT_HTTP = True
-
-SERVER_URL = "http://localhost/api"
-API_TOKEN = "5817926d-f2f9-4422-a411-a98f1bfe4b6c"
 ```
 
 ## Running the crawl server
@@ -54,7 +51,7 @@ python3 app.py
 ## Running the web server with Nginx (production)
 * Install dependencies:
 ```bash
-sudo apt install build-essential python-dev
+sudo apt install build-essential python-dev redis-server
 sudo pip install uwsgi
 ```
 * Adjust the path in `od-database.ini`
diff --git a/__init__.py b/__init__.py
index 25a3e0b..139597f 100644
--- a/__init__.py
+++ b/__init__.py
@@ -1,5 +1,2 @@
-import logging
-from logging import FileHandler, StreamHandler
-import sys
 
 
diff --git a/api.py b/api.py
index 6e0f18f..9d10c84 100644
--- a/api.py
+++ b/api.py
@@ -1,87 +1,15 @@
 import json
-import os
-from threading import Lock
 from uuid import uuid4
 
-from flask import request, abort, Response, send_file, session
+from flask import request, abort, send_file, session
 
-import common as oddb
 import captcha
-from callbacks import PostCrawlCallbackFactory
-from database import Task, Website
+import common as oddb
+from database import Website
 from search.search import InvalidQueryException
-from tasks import TaskResult
-
-uploadLock = Lock()
 
 
 def setup_api(app):
-    @app.route("/api/task/complete", methods=["POST"])
-    def api_complete_task():
-        # TODO: task_tracker
-        token = request.form.get("token")
-        name = oddb.db.check_api_token(token)
-
-        if name:
-            tr = json.loads(request.form.get("result"))
-            oddb.logger.debug("Task result: " + str(tr))
-            task_result = TaskResult(tr["status_code"], tr["file_count"], tr["start_time"], tr["end_time"],
-                                     tr["website_id"])
-
-            oddb.logger.info("Task for " + str(task_result.website_id) + " completed by " + name)
-            task = oddb.db.complete_task(task_result.website_id, name)
-
-            if task:
-
-                filename = "./tmp/" + str(task_result.website_id) + ".json"
-                if not os.path.exists(filename):
-                    filename = None
-                oddb.taskManager.complete_task(filename, task, task_result, name)
-
-                if filename and os.path.exists(filename):
-                    os.remove(filename)
-
-                # Handle task callback
-                callback = PostCrawlCallbackFactory.get_callback(task)
-                if callback:
-                    callback.run(task_result, oddb.search)
-
-                return "Successfully logged task result and indexed files"
-
-            else:
-                oddb.logger.error("ERROR: " + name + " indicated that task for " + str(task_result.website_id) +
-                                  " was completed but there is no such task in the database.")
-                return "No such task"
-        return abort(403)
-
-    @app.route("/api/task/upload", methods=["POST"])
-    def api_upload():
-        token = request.form.get("token")
-        name = oddb.db.check_api_token(token)
-
-        if name:
-            website_id = request.form.get("website_id")
-            oddb.logger.debug("Result part upload for '" + str(website_id) + "' by " + name)
-
-            if "file_list" in request.files:
-                file = request.files['file_list']
-
-                filename = "./tmp/" + str(website_id) + ".json"
-
-                # Read the file into memory cuz if the request fails
-                # no file is corrupted.
-                buf = file.stream.read()
-
-                # Write to file (create if not exists) when
-                # everything read successfully.
-                with uploadLock:
-                    with open(filename, "a+b") as f:
-                        f.write(buf)
-
-                oddb.logger.debug("Written chunk to file")
-            return "ok"
-        else:
-            return abort(403)
 
     @app.route("/api/website/by_url", methods=["GET"])
     def api_website_by_url():
@@ -126,52 +54,6 @@ def setup_api(app):
         else:
             return abort(403)
 
-    @app.route("/api/task/force_enqueue", methods=["POST"])
-    def api_task_enqueue():
-        try:
-            token = request.json["token"]
-        except KeyError:
-            return abort(400)
-
-        name = oddb.db.check_api_token(token)
-
-        if name:
-
-            task = Task(
-                request.json["website_id"],
-                request.json["url"],
-                request.json["priority"],
-                request.json["callback_type"],
-                json.dumps(request.json["callback_args"])
-            )
-
-            oddb.logger.info("API force enqueue by " + name + "\n(" + str(task.to_json()) + ")")
-
-            oddb.taskManager.queue_task(task)
-            return ""
-        else:
-            return abort(403)
-
-    @app.route("/api/task/try_enqueue", methods=["POST"])
-    def api_task_try_enqueue():
-        token = request.form.get("token")
-        name = oddb.db.check_api_token(token)
-
-        if name:
-
-            url = request.form.get("url")
-            # TODO: task_tracker
-            message, result = oddb.try_enqueue(url)
-
-            oddb.logger.info("API try enqueue '" + url + "' by " + name + " (" + message + ")")
-
-            return json.dumps({
-                "message": message,
-                "result": result
-            })
-        else:
-            return abort(403)
-
     @app.route("/api/website/random")
     def api_random_website():
         token = request.json["token"]
@@ -215,9 +97,10 @@ def setup_api(app):
     @app.route("/cap", methods=["GET"])
     def cap():
         word = captcha.make_captcha()
-        cap_id = uuid4()
+        cap_id = str(uuid4())
         session["cap"] = cap_id
-        oddb.sessionStore[cap_id] = word
+
+        oddb.redis.set(cap_id, word)
 
         return send_file(captcha.get_path(word), cache_timeout=0)
diff --git a/app.py b/app.py
index 7614e45..1e3112f 100644
--- a/app.py
+++ b/app.py
@@ -2,8 +2,8 @@ from flask import Flask
 
 import api
 import config
-import views
 import template_filters
+import views
 
 app = Flask(__name__)
 app.secret_key = config.FLASK_SECRET
diff --git a/callbacks.py b/callbacks.py
deleted file mode 100644
index 647c963..0000000
--- a/callbacks.py
+++ /dev/null
@@ -1,73 +0,0 @@
-from tasks import Task, TaskResult
-from reddit_bot import RedditBot
-import praw
-from search.search import SearchEngine
-import json
-
-
-class PostCrawlCallback:
-
-    def __init__(self, task: Task):
-        self.task = task
-
-        if self.task.callback_args:
-            self.task.callback_args = json.loads(self.task.callback_args)
-
-    def run(self, task_result: TaskResult, search: SearchEngine):
-        raise NotImplementedError
-
-
-class PostCrawlCallbackFactory:
-
-    @staticmethod
-    def get_callback(task: Task):
-
-        if task.callback_type == "reddit_post":
-            return RedditPostCallback(task)
-
-        elif task.callback_type == "reddit_comment":
-            return RedditCommentCallback(task)
-
-        elif task.callback_type == "discord":
-            return DiscordCallback(task)
-
-
-class RedditCallback(PostCrawlCallback):
-
-    def __init__(self, task: Task):
-        super().__init__(task)
-
-        reddit = praw.Reddit('opendirectories-bot',
-                             user_agent='github.com/simon987/od-database (by /u/Hexahedr_n)')
-        self.reddit_bot = RedditBot("crawled.txt", reddit)
-
-    def run(self, task_result: TaskResult, search: SearchEngine):
-        raise NotImplementedError
-
-
-class RedditPostCallback(RedditCallback):
-
-    def run(self, task_result: TaskResult, search: SearchEngine):
-        print("Reddit post callback for task " + str(self.task))
-
-
-class RedditCommentCallback(RedditCallback):
-
-    def run(self, task_result: TaskResult, search: SearchEngine):
-
-        comment_id = self.task.callback_args["comment_id"]
-        print("Editing comment comment " + comment_id)
-
-        search.refresh()  # Make sure the newly indexed documents are available before commenting
-        stats = search.get_stats(self.task.website_id)
-        message = self.reddit_bot.get_comment(stats, self.task.website_id,
-                                              message="There you go! This website was crawled in `" +
-                                                      str(int(task_result.end_time - task_result.start_time)) + "s`")
-        print(message)
-        self.reddit_bot.edit(self.reddit_bot.reddit.comment(comment_id), message)
-
-
-class DiscordCallback(PostCrawlCallback):
-
-    def run(self, task_result: TaskResult, search: SearchEngine):
-        print("Discord callback for task " + str(self.task))
diff --git a/captcha.py b/captcha.py
index 42fee89..2352b38 100644
--- a/captcha.py
+++ b/captcha.py
@@ -4,8 +4,8 @@ import string
 from PIL import Image, ImageDraw, ImageFont
 from flask import request, session
 
-import config
 import common as oddb
+import config
 
 
 def get_code():
@@ -36,9 +36,14 @@ def verify():
         request.args.get("cap") if "cap" in request.args else ""
     )
 
-    if "cap" in session and session["cap"] in oddb.sessionStore and oddb.sessionStore[session["cap"]] == attempt:
-        session["cap_remaining"] = config.CAPTCHA_EVERY
-        return True
+    if "cap" in session:
+        expected = oddb.redis.get(session["cap"])
+        oddb.redis.delete(session["cap"])
+
+        if expected is not None and expected.decode("utf8") == attempt:
+            session["cap_remaining"] = config.CAPTCHA_EVERY
+            return True
+
     return False
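The captcha answer now lives in Redis instead of the in-process `sessionStore` dict, so it survives uwsgi worker restarts and is shared across the four worker processes; `verify()` deletes the key on first use, making each challenge single-attempt. A minimal sketch of the round-trip, assuming a local Redis on the default port — the `ex=` expiry is an added assumption, not something the diff sets:

```python
import redis
from uuid import uuid4

r = redis.Redis()  # same default connection common.py creates

# /cap: store the expected answer under a random id kept in the Flask session
cap_id = str(uuid4())
r.set(cap_id, "xkcd42", ex=600)  # hypothetical TTL so abandoned captchas expire

# verify(): fetch-and-delete, then compare
expected = r.get(cap_id)  # bytes, or None if expired/absent
r.delete(cap_id)
print(expected is not None and expected.decode("utf8") == "xkcd42")  # True
```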
diff --git a/common.py b/common.py
index baa8282..fe68ab1 100644
--- a/common.py
+++ b/common.py
@@ -1,12 +1,13 @@
+import logging
+import sys
 from logging import FileHandler, StreamHandler
-import sys
 
+import redis as r
+from flask import session, abort
 
 from database import Database
 from search.search import ElasticSearchEngine
 from tasks import TaskManager
-import logging
-from flask import session, abort
 
 # Disable flask logging
 flaskLogger = logging.getLogger('werkzeug')
@@ -26,8 +27,7 @@ searchEngine = ElasticSearchEngine("od-database")
 searchEngine.start_stats_scheduler()
 db = Database("db.sqlite3")
 
-# temporary hotfix...
-sessionStore = dict()
+redis = r.Redis()
 
 
 def require_role(role: str):
diff --git a/database.py b/database.py
index 711fc7a..d8ba1a7 100644
--- a/database.py
+++ b/database.py
@@ -1,4 +1,3 @@
-import json
 import os
 import sqlite3
 import uuid
@@ -30,32 +29,6 @@ class ApiClient:
         self.name = name
 
 
-class Task:
-
-    def __init__(self, website_id: int, url: str, priority: int = 1,
-                 callback_type: str = None, callback_args: str = None):
-        self.website_id = website_id
-        self.url = url
-        self.priority = priority
-        self.callback_type = callback_type
-        self.callback_args = json.loads(callback_args) if callback_args else {}
-
-    def to_json(self):
-        return {
-            "website_id": self.website_id,
-            "url": self.url,
-            "priority": self.priority,
-            "callback_type": self.callback_type,
-            "callback_args": json.dumps(self.callback_args)
-        }
-
-    def __str__(self):
-        return json.dumps(self.to_json())
-
-    def __repr__(self):
-        return self.__str__()
-
-
 class Database:
 
     def __init__(self, db_path):
@@ -152,19 +125,6 @@ class Database:
         website_id = cursor.fetchone()
         return website_id[0] if website_id else None
 
-    def make_task_for_oldest(self, assigned_crawler):
-
-        # TODO: task_tracker
-        with sqlite3.connect(self.db_path) as conn:
-            cursor = conn.cursor()
-            cursor.execute("INSERT INTO QUEUE (website_id, url, assigned_crawler) SELECT Website.id, Website.url, ? FROM Website WHERE Website.id not in (SELECT website_id FROM Queue) "
-                           "ORDER BY last_modified ASC LIMIT 1", (assigned_crawler, ))
-
-            cursor.execute("SELECT id, website_id, url, priority, callback_type, callback_args FROM Queue "
-                           "WHERE id=LAST_INSERT_ROWID()")
-            task = cursor.fetchone()
-            return Task(task[1], task[2], task[3], task[4], task[5])
-
     def delete_website(self, website_id):
 
         with sqlite3.connect(self.db_path) as conn:
diff --git a/export.py b/export.py
index a2c74fb..efea079 100644
--- a/export.py
+++ b/export.py
@@ -1,8 +1,9 @@
-from search.search import ElasticSearchEngine
-from database import Database
 import csv
 import os
 
+from database import Database
+from search.search import ElasticSearchEngine
+
 
 def export(outfile="out.csv"):
diff --git a/fold_to_ascii b/fold_to_ascii
index 2c82209..d134a0f 160000
--- a/fold_to_ascii
+++ b/fold_to_ascii
@@ -1 +1 @@
-Subproject commit 2c8220921dac132e20af98f12ba724c834dcbebf
+Subproject commit d134a0f61ec177b3f2419cd1956603b247ba35b9
diff --git a/jenkins/Jenkinsfile b/jenkins/Jenkinsfile
new file mode 100644
index 0000000..db44afe
--- /dev/null
+++ b/jenkins/Jenkinsfile
@@ -0,0 +1,24 @@
+def remote = [:]
+remote.name = 'remote'
+remote.host = env.DEPLOY_HOST
+remote.user = env.DEPLOY_USER
+remote.identityFile = '/var/lib/jenkins/.ssh/id_rsa'
+remote.knownHosts = '/var/lib/jenkins/.ssh/known_hosts'
+
+pipeline {
+    agent any
+    stages {
+        stage('Build') {
+            steps {
+                sh './jenkins/build.sh'
+                stash includes: 'env/', name: 'env'
+            }
+        }
+        stage('Deploy') {
+            steps {
+                sshPut remote: remote, from: '.', into: 'oddb'
+                sshCommand remote: remote, command: 'chmod +x oddb/deploy.sh && ./oddb/deploy.sh'
+            }
+        }
+    }
+}
\ No newline at end of file
diff --git a/jenkins/build.sh b/jenkins/build.sh
new file mode 100755
index 0000000..2bfa054
--- /dev/null
+++ b/jenkins/build.sh
@@ -0,0 +1,9 @@
+#!/bin/bash
+
+virtualenv env --download --clear -p python3.7
+source env/bin/activate
+python --version
+
+pip install -r requirements.txt
+git submodule update --remote --recursive
+deactivate
diff --git a/jenkins/deploy.sh b/jenkins/deploy.sh
new file mode 100755
index 0000000..14e8954
--- /dev/null
+++ b/jenkins/deploy.sh
@@ -0,0 +1,9 @@
+#!/bin/bash
+
+export ODDBROOT="od-database"
+
+screen -S oddb -X quit
+echo "starting oddb_web"
+screen -S oddb -d -m bash -c "cd ${ODDBROOT} && uwsgi od-database.ini"
+sleep 1
+screen -list
\ No newline at end of file
diff --git a/od-database.ini b/od-database.ini
index 282d72e..5dd1813 100644
--- a/od-database.ini
+++ b/od-database.ini
@@ -1,8 +1,10 @@
 [uwsgi]
 socket = 127.0.0.1:3031
-chdir = /home/simon/Dropbox/data/CS/python/od-database/
 wsgi-file = uwsgi.py
 processes = 4
 threads = 4
 stats = 127.0.0.1:9191
-callable=app
\ No newline at end of file
+callable=app
+virtualenv=./env
+
+disable-logging=True
\ No newline at end of file
diff --git a/od_util.py b/od_util.py
index 3407fc9..3f950dd 100644
--- a/od_util.py
+++ b/od_util.py
@@ -1,10 +1,11 @@
-import requests
-from urllib.parse import urljoin, urlparse
-from bs4 import BeautifulSoup
 import os
-import validators
 import re
 from ftplib import FTP
+from urllib.parse import urljoin, urlparse
+
+import requests
+import validators
+from bs4 import BeautifulSoup
 
 # TODO: find a better way to do this
 try:
diff --git a/reddit_bot.py b/reddit_bot.py
index 66f03f6..6878585 100644
--- a/reddit_bot.py
+++ b/reddit_bot.py
@@ -1,7 +1,8 @@
 import os
 import time
-import praw
+
 import humanfriendly
+import praw
 
 
 class RedditBot:
diff --git a/requirements.txt b/requirements.txt
index 0b80818..24699b2 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -21,4 +21,6 @@ lxml
 pillow
 Wand
 numpy
-matplotlib
\ No newline at end of file
+matplotlib
+uwsgi
+redis
\ No newline at end of file
diff --git a/restore.py b/restore.py
index e7b50fd..eb3ba6b 100644
--- a/restore.py
+++ b/restore.py
@@ -1,6 +1,7 @@
-from search.search import ElasticSearchEngine
 import ujson
 
+from search.search import ElasticSearchEngine
+
 es = ElasticSearchEngine("od-database")
 
 es.reset()
diff --git a/search/__init__.py b/search/__init__.py
index abb4d86..fad27f7 100644
--- a/search/__init__.py
+++ b/search/__init__.py
@@ -1,7 +1,5 @@
 import logging
-from logging import FileHandler, StreamHandler
-
-import sys
+from logging import FileHandler
 
 logger = logging.getLogger("default")
 logger.setLevel(logging.DEBUG)
diff --git a/search/search.py b/search/search.py
index 768b993..40747f1 100644
--- a/search/search.py
+++ b/search/search.py
@@ -1,11 +1,10 @@
-import itertools
+import os
+import time
+import ujson
 import elasticsearch
-import time
-from elasticsearch import helpers
-import os
-import ujson
 from apscheduler.schedulers.background import BackgroundScheduler
+from elasticsearch import helpers
 
 from search import logger
 from search.filter import SearchFilter
@@ -318,8 +317,12 @@ class ElasticSearchEngine(SearchEngine):
                 "includes": ["path", "name", "ext"]
             },
             "query": {
-                "match_all": {}
-            }
+                "constant_score": {
+                    "filter": {
+                        "term": {"website_id": website_id}
+                    }
+                }
+            },
         },
             index=self.index_name, request_timeout=20, routing=website_id)
 
         for hit in hits:
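Replacing `match_all` with a `constant_score`/`term` query scopes the document walk to a single website without paying for relevance scoring, and the `routing=website_id` hint restricts the request to one shard. A rough standalone equivalent, assuming a local Elasticsearch node and that the surrounding call is the scroll helper the module imports (`elasticsearch.helpers`):

```python
from elasticsearch import Elasticsearch, helpers

es = Elasticsearch()  # assumes a local node; the engine wires up its own client

website_id = 42
hits = helpers.scan(es,
                    query={
                        "_source": {"includes": ["path", "name", "ext"]},
                        "query": {
                            "constant_score": {  # filter context: no scoring work
                                "filter": {"term": {"website_id": website_id}}
                            }
                        },
                    },
                    index="od-database",
                    routing=str(website_id))  # only the shard holding this site is queried

for hit in hits:
    print(hit["_source"])
```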
diff --git a/task_tracker_drone b/task_tracker_drone
new file mode 160000
index 0000000..ef07c05
--- /dev/null
+++ b/task_tracker_drone
@@ -0,0 +1 @@
+Subproject commit ef07c059ad30def78547562919422925e7686ca6
diff --git a/tasks.py b/tasks.py
index a15ac28..d6846cc 100644
--- a/tasks.py
+++ b/tasks.py
@@ -1,12 +1,18 @@
+import json
 import logging
 import os
+import time
+from threading import Thread
+from uuid import uuid4
 
-from apscheduler.schedulers.background import BackgroundScheduler
-from search.search import ElasticSearchEngine
-import json
-import database
 import urllib3
 
+import config
+import database
+from search.search import ElasticSearchEngine
+from task_tracker_drone.src.tt_drone.api import TaskTrackerApi, Worker
+from ws_bucket_client.api import WsBucketApi
+
 urllib3.disable_warnings()
 logger = logging.getLogger("default")
@@ -15,20 +21,22 @@ logger = logging.getLogger("default")
 class Task:
 
     def __init__(self, website_id: int, url: str, priority: int = 1,
-                 callback_type: str = None, callback_args: str = None):
+                 callback_type: str = None, callback_args: str = None,
+                 upload_token: str = None):
         self.website_id = website_id
         self.url = url
         self.priority = priority
         self.callback_type = callback_type
         self.callback_args = json.loads(callback_args) if callback_args else {}
+        self.upload_token = upload_token
 
     def to_json(self):
         return {
             "website_id": self.website_id,
             "url": self.url,
-            "priority": self.priority,
             "callback_type": self.callback_type,
-            "callback_args": json.dumps(self.callback_args)
+            "callback_args": json.dumps(self.callback_args),
+            "upload_token": self.upload_token
         }
 
     def __str__(self):
@@ -38,25 +46,13 @@ class Task:
         return self.__str__()
 
 
-class TaskResult:
+class IndexingTask:
 
-    def __init__(self, status_code=None, file_count=0, start_time=0,
-                 end_time=0, website_id=0, server_name=""):
-        self.status_code = status_code
-        self.file_count = file_count
-        self.start_time = start_time
-        self.end_time = end_time
+    def __init__(self, website_id: int, file_path: str, callback_type: str, callback_args):
         self.website_id = website_id
-        self.server_name = server_name
-
-    def to_json(self):
-        return {
-            "status_code": self.status_code,
-            "file_count": self.file_count,
-            "start_time": self.start_time,
-            "end_time": self.end_time,
-            "website_id": self.website_id
-        }
+        self.file_path = file_path
+        self.callback_type = callback_type
+        self.callback_args = callback_args
 
 
 class TaskManager:
@@ -64,14 +60,44 @@ class TaskManager:
     def __init__(self):
         self.search = ElasticSearchEngine("od-database")
         self.db = database.Database("db.sqlite3")
+        self.tracker = TaskTrackerApi(config.TT_API)
 
-    def complete_task(self, file_list, task, task_result, crawler_name):
+        self.worker = Worker.from_file(self.tracker)
+        if not self.worker:
+            self.worker = self.tracker.make_worker("oddb_master")
+            self.worker.dump_to_file()
+        self.worker.request_access(config.TT_CRAWL_PROJECT, False, True)
+        self.worker.request_access(config.TT_INDEX_PROJECT, True, False)
 
-        self.search.delete_docs(task_result.website_id)
+        self.bucket = WsBucketApi(config.WSB_API, config.WSB_SECRET)
+
+        self._indexer_thread = Thread(target=self._do_indexing)
+        self._indexer_thread.start()
+
+    def _do_indexing(self):
+
+        while True:
+            logger.debug("Fetching indexing task...")
+            task = self.tracker.fetch_task(worker=self.worker, project_id=config.TT_INDEX_PROJECT)
+
+            if task:
+                try:
+                    recipe = task.json_recipe()
+                    logger.debug("Got indexing task: " + str(recipe))
+                    filename = os.path.join(config.WSB_PATH,
+                                            format_file_name(recipe["website_id"], recipe["upload_token"]))
+                    self._complete_task(filename, Task(recipe["website_id"], recipe["url"]))
+                except Exception as e:
+                    logger.error("Indexing task failed: " + str(e))
+            else:
+                time.sleep(5)
+
+    def _complete_task(self, file_list, task):
+
+        self.search.delete_docs(task.website_id)
 
         if file_list:
 
             def iter_lines():
-
                 with open(file_list, "r") as f:
                     line = f.readline()
                     while line:
@@ -82,11 +108,38 @@ class TaskManager:
         self.db.update_website_date_if_exists(task.website_id)
 
-        task_result.server_id = crawler_name
+    def fetch_indexing_task(self):
 
-        self.db.log_result(task_result)
+        task = self.tracker.fetch_task(worker=self.worker, project_id=config.TT_INDEX_PROJECT)
+        print(task)
 
     def queue_task(self, task: Task):
-        self.db.put_task(task)
-        print("Queued task and made it available to crawlers: " + str(task.website_id))
+
+        max_assign_time = 24 * 7 * 3600
+        upload_token = str(uuid4())
+
+        bucket_response = self.bucket.allocate(upload_token,
+                                               21474836480,  # 20 GiB
+                                               format_file_name(task.website_id, upload_token),
+                                               to_dispose_date=int(time.time() + max_assign_time),
+                                               upload_hook="")
+        if not bucket_response:
+            return
+
+        print("Allocated upload bucket: %d, t=%s, r=%s" % (task.website_id, upload_token, bucket_response.text))
+
+        task.upload_token = upload_token
+        tracker_response = self.worker.submit_task(config.TT_CRAWL_PROJECT,
+                                                   recipe=str(task),
+                                                   priority=task.priority,
+                                                   max_assign_time=max_assign_time,
+                                                   hash64=task.website_id,
+                                                   verification_count=1,
+                                                   max_retries=3
+                                                   )
+        print("Queued task and made it available to crawlers: t=%s, r=%s" % (task, tracker_response.text))
+
+
+def format_file_name(website_id, token):
+    return "%d_%s.NDJSON" % (website_id, token)
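Taken together, `TaskManager` replaces the old SQLite queue with two external services: `queue_task` first reserves an upload slot in ws_bucket, then publishes the crawl recipe to task_tracker, and the `_do_indexing` thread later pulls finished uploads back in. A condensed sketch of that round-trip using only the calls shown above — worker persistence, access requests, and error handling are omitted, and the `config` values live in the untracked `config.py`:

```python
import json
import time
from uuid import uuid4

import config
from task_tracker_drone.src.tt_drone.api import TaskTrackerApi, Worker
from ws_bucket_client.api import WsBucketApi

tracker = TaskTrackerApi(config.TT_API)
worker = Worker.from_file(tracker) or tracker.make_worker("oddb_master")
bucket = WsBucketApi(config.WSB_API, config.WSB_SECRET)

website_id, url = 42, "http://example.com/files/"  # hypothetical website
token = str(uuid4())

# 1. Reserve the slot the crawler will stream its NDJSON file list into
bucket.allocate(token, 21474836480, "%d_%s.NDJSON" % (website_id, token),
                to_dispose_date=int(time.time() + 24 * 7 * 3600), upload_hook="")

# 2. Publish the recipe; a crawler drone claims it from task_tracker
recipe = json.dumps({"website_id": website_id, "url": url, "upload_token": token})
worker.submit_task(config.TT_CRAWL_PROJECT, recipe=recipe, priority=1,
                   max_assign_time=24 * 7 * 3600, hash64=website_id,
                   verification_count=1, max_retries=3)

# 3. The indexing side later fetches the matching task and reads the upload
task = tracker.fetch_task(worker=worker, project_id=config.TT_INDEX_PROJECT)
```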
diff --git a/template_filters.py b/template_filters.py
index 788e95d..20df35d 100644
--- a/template_filters.py
+++ b/template_filters.py
@@ -1,5 +1,6 @@
 import datetime
 import time
+
 import od_util
 
 
diff --git a/tmp/README.md b/tmp/README.md
deleted file mode 100644
index a9a812e..0000000
--- a/tmp/README.md
+++ /dev/null
@@ -1 +0,0 @@
-Files currently being indexing goes here
\ No newline at end of file
diff --git a/uwsgi.py b/uwsgi.py
index 433d3be..86a1590 100644
--- a/uwsgi.py
+++ b/uwsgi.py
@@ -1,11 +1,4 @@
 from app import app
-import config
-import ssl
 
 if __name__ == '__main__':
-    if not config.USE_SSL:
-        context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
-        context.load_cert_chain('certificates/cert.pem', 'certificates/privkey.pem')
-        app.run("0.0.0.0", port=12345, ssl_context=context, threaded=True)
-    else:
-        app.run("0.0.0.0", port=12345, threaded=True)
+    app.run("0.0.0.0", port=12345)
diff --git a/views.py b/views.py
index 02e003e..bffc112 100644
--- a/views.py
+++ b/views.py
@@ -6,12 +6,12 @@ from urllib.parse import urlparse
 from flask import render_template, redirect, request, flash, abort, Response, session
 from flask_caching import Cache
 
+import captcha
 import config
 import od_util
 from common import db, taskManager, searchEngine, logger, require_role
 from database import Task, Website
 from search.search import InvalidQueryException
-import captcha
 
 
 def setup_views(app):
diff --git a/ws_bucket_client b/ws_bucket_client
new file mode 160000
index 0000000..2cd3d21
--- /dev/null
+++ b/ws_bucket_client
@@ -0,0 +1 @@
+Subproject commit 2cd3d217a7bc57cc7a11566d8ed60f48c9ea9e9e
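The changes above reference several new settings from `config.py`, which is gitignored and therefore absent from the diff: `TT_API`, `TT_CRAWL_PROJECT`, `TT_INDEX_PROJECT`, `WSB_API`, `WSB_SECRET`, and `WSB_PATH`. A hypothetical minimal example of the new entries — every value below is a placeholder, not the project's real configuration:

```python
# config.py — hypothetical example covering only the names this diff uses
FLASK_SECRET = "change-me"
CAPTCHA_EVERY = 10

# task_tracker instance and project ids (served by the task_tracker_drone API)
TT_API = "https://tt.example.com/api"
TT_CRAWL_PROJECT = 1
TT_INDEX_PROJECT = 2

# ws_bucket instance that receives crawler uploads
WSB_API = "https://wsb.example.com"
WSB_SECRET = "change-me-too"
WSB_PATH = "/mnt/data/wsb"  # local path where uploaded NDJSON files land
```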