Barebones crawl_server microservice

This commit is contained in:
Simon
2018-06-11 19:00:43 -04:00
parent 8421cc0885
commit d849227798
14 changed files with 264 additions and 220 deletions

0
crawl_server/__init__.py Normal file
View File

73
crawl_server/database.py Normal file
View File

@@ -0,0 +1,73 @@
import json
import os
import sqlite3
from typing import Optional
class Task:
    """A queued crawl task.

    ``callback_args`` is accepted as a JSON-encoded string (the form in
    which it is stored in the queue database) and exposed as a parsed
    dict on the instance.
    """

    def __init__(self, url: str, priority: int = 1,
                 callback_type: Optional[str] = None,
                 callback_args: Optional[str] = None):
        self.url = url
        self.priority = priority
        self.callback_type = callback_type
        # None / empty string normalises to an empty dict.
        self.callback_args = json.loads(callback_args) if callback_args else {}

    def to_json(self):
        """Return a JSON-serialisable dict for this task.

        ``callback_args`` is re-encoded to a JSON string so the value
        round-trips through the database and the HTTP API unchanged.
        """
        return {
            "url": self.url,
            "priority": self.priority,
            "callback_type": self.callback_type,
            "callback_args": json.dumps(self.callback_args)
        }
class TaskManagerDatabase:
    """SQLite-backed persistent priority queue of crawl tasks."""

    def __init__(self, db_path):
        self.db_path = db_path

        # First run: create the schema.
        # NOTE(review): the init script is opened relative to the current
        # working directory, not to this module — confirm the service is
        # always launched from the package root, or resolve the path via
        # os.path.dirname(__file__).
        if not os.path.exists(db_path):
            self.init_database()

    def init_database(self):
        """Create the queue schema from the bundled SQL script."""
        with open("task_db_init.sql", "r") as f:
            init_script = f.read()

        with sqlite3.connect(self.db_path) as conn:
            conn.executescript(init_script)
            conn.commit()

    def pop_task(self):
        """Remove and return the next task, or None if the queue is empty.

        Ordering is highest priority first, then FIFO (ascending id)
        within a priority level.
        """
        with sqlite3.connect(self.db_path) as conn:
            cursor = conn.cursor()
            cursor.execute("SELECT id, url, priority, callback_type, callback_args"
                           " FROM Queue ORDER BY priority DESC, Queue.id ASC LIMIT 1")
            task = cursor.fetchone()

            if task:
                cursor.execute("DELETE FROM Queue WHERE id=?", (task[0],))
                conn.commit()
                return Task(task[1], task[2], task[3], task[4])
            else:
                return None

    def put_task(self, task: "Task"):
        """Append a task to the queue; callback_args is stored JSON-encoded."""
        with sqlite3.connect(self.db_path) as conn:
            cursor = conn.cursor()
            cursor.execute("INSERT INTO Queue (url, priority, callback_type, callback_args) VALUES (?,?,?,?)",
                           (task.url, task.priority, task.callback_type, json.dumps(task.callback_args)))
            conn.commit()

    def get_tasks(self):
        """Return every queued task without removing any of them."""
        with sqlite3.connect(self.db_path) as conn:
            cursor = conn.cursor()
            # Name the columns explicitly (matching pop_task) instead of
            # relying on SELECT * tracking the schema's column order.
            cursor.execute("SELECT id, url, priority, callback_type, callback_args FROM Queue")
            tasks = cursor.fetchall()
            return [Task(t[1], t[2], t[3], t[4]) for t in tasks]

40
crawl_server/server.py Normal file
View File

@@ -0,0 +1,40 @@
from flask import Flask, request, abort, Response
import json
from crawl_server.task_manager import TaskManager, Task
# Flask application; the route handlers below attach to it.
app = Flask(__name__)
# Single task manager shared by all request handlers, persisted in
# ./tm_db.sqlite3. NOTE(review): created at import time — constructing
# TaskManager starts its process pool and background scheduler (see
# task_manager.py); confirm that side effect on import is intended.
tm = TaskManager("tm_db.sqlite3")
@app.route("/")
def hello():
    """Trivial liveness endpoint for the crawl server."""
    greeting = "Hello World!"
    return greeting
@app.route("/task/")
def get_tasks():
    """Return every queued task as a JSON array."""
    serialized = [queued.to_json() for queued in tm.get_tasks()]
    return Response(json.dumps(serialized), mimetype="application/json")
@app.route("/task/put", methods=["POST"])
def task_put():
    """Enqueue a new crawl task from a JSON request body.

    Responds 400 when the body is missing/empty or lacks any of the
    required fields.
    """
    body = request.json
    if not body:
        return abort(400)

    try:
        url = body["url"]
        priority = body["priority"]
        cb_type = body["callback_type"]
        cb_args = body["callback_args"]
    except KeyError:
        return abort(400)

    tm.put_task(Task(url, priority, cb_type, cb_args))
    return '{"ok": "true"}'
if __name__ == "__main__":
    # Development entry point: Flask's built-in server with defaults.
    app.run()

View File

@@ -0,0 +1,73 @@
from crawl_server.database import TaskManagerDatabase, Task
from multiprocessing import Pool
from apscheduler.schedulers.background import BackgroundScheduler
from enum import Enum
from datetime import datetime
from crawler.crawler import RemoteDirectoryCrawler
class TaskResultStatus(Enum):
    """Final outcome of a crawl task (stored in TaskResult.status_code)."""
    SUCCESS = 0
    FAILURE = 1
class TaskResult:
    """Outcome record for a single crawl task.

    Instances start empty and are filled in by TaskManager.run_task.
    """

    def __init__(self):
        # Timestamps bracketing the crawl (set via datetime.utcnow()).
        self.start_time = None
        self.end_time = None
        # SUCCESS/FAILURE enum value; None until the task has finished.
        self.status_code: TaskResultStatus = None
        # Crawl statistics and website linkage, defaulted until filled in
        # by a caller (nothing in this file populates them yet).
        self.file_count = 0
        self.website_id = None
class TaskManager:
    """Runs queued crawl tasks on a process pool.

    Tasks are persisted in a TaskManagerDatabase; a background scheduler
    polls the queue every second and dispatches work to the pool.
    """

    def __init__(self, db_path, max_processes=8):
        self.db = TaskManagerDatabase(db_path)
        self.pool = Pool(processes=max_processes)

        # Keep a reference so the scheduler can later be inspected or
        # shut down (the original local variable was dropped on return).
        self.scheduler = BackgroundScheduler()
        self.scheduler.add_job(self.execute_queued_task, "interval", seconds=1)
        self.scheduler.start()

    def put_task(self, task: Task):
        """Persist a task at the back of the queue."""
        self.db.put_task(task)

    def get_tasks(self):
        """Return all currently queued tasks."""
        return self.db.get_tasks()

    def execute_queued_task(self):
        """Pop the highest-priority task, if any, and run it asynchronously."""
        task = self.db.pop_task()
        if task:
            print("pooled " + task.url)
            self.pool.apply_async(
                TaskManager.run_task,
                args=(task, ),
                callback=TaskManager.task_complete
            )

    @staticmethod
    def run_task(task):
        """Worker-process entry point: crawl the task's URL.

        Exceptions must not escape: apply_async would swallow them and
        task_complete would never fire, so failures are captured in the
        result's status_code instead.
        """
        result = TaskResult()
        result.start_time = datetime.utcnow()
        print("Starting task " + task.url)

        try:
            crawler = RemoteDirectoryCrawler(task.url, 10)
            crawler.crawl_directory()
            # Record the outcome; status_code was previously never set.
            result.status_code = TaskResultStatus.SUCCESS
        except Exception as e:
            result.status_code = TaskResultStatus.FAILURE
            print("Task failed " + task.url + ": " + str(e))

        print("End task " + task.url)
        result.end_time = datetime.utcnow()
        return result

    @staticmethod
    def task_complete(result: TaskResult):
        """Pool callback, invoked in the parent process with the TaskResult."""
        print("Task done " + str(result))
        # todo save in db