Logging and bugfix for http crawler

Simon 2018-06-25 14:36:16 -04:00
parent 5fd00f22af
commit d7ce1670a8
8 changed files with 67 additions and 42 deletions

View File

@@ -0,0 +1,10 @@
+import logging
+from logging import FileHandler
+
+logger = logging.getLogger("default")
+logger.setLevel(logging.DEBUG)
+formatter = logging.Formatter('%(asctime)s %(levelname)-5s %(message)s')
+file_handler = FileHandler("crawl_server.log")
+file_handler.setFormatter(formatter)
+logger.addHandler(file_handler)
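The rest of this commit consumes the logger through a plain package import. A minimal usage sketch, assuming the package is importable as crawl_server (the same import the files below use):

    from crawl_server import logger

    logger.debug("verbose diagnostic message")   # recorded, since the level is DEBUG
    logger.info("normal progress message")
    logger.error("something went wrong")         # all records end up in crawl_server.log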

View File

@@ -3,6 +3,7 @@ import ujson
 from urllib.parse import urlparse, urljoin
 from threading import Thread
 from queue import Queue, Empty
+from crawl_server import logger


 class TooManyConnectionsError(Exception):
@@ -86,6 +87,7 @@ class RemoteDirectoryCrawler:
         try:
             try:
                 directory = RemoteDirectoryFactory.get_directory(self.url)
+                logger.info("Crawling directory " + self.url + " with " + str(type(directory)))
                 path_id, root_listing = directory.list_dir(urlparse(self.url).path)
                 if root_listing:
                     self.crawled_paths.append(path_id)
@@ -115,7 +117,7 @@ class RemoteDirectoryCrawler:
         in_q.join()
         files_q.join()
-        print("Done")
+        logger.info("Crawling for " + self.url + " done, waiting for threads to terminate...")

         # Kill threads
         for _ in threads:
@@ -153,13 +155,11 @@ class RemoteDirectoryCrawler:
                             in_q.put(urljoin(f.path, f.name))
                         else:
                             files_q.put(f)
-                    import sys
-                    print("LISTED " + repr(path) + "dirs:" + str(in_q.qsize()))
+                    logger.debug("LISTED " + self.url + path)
                 else:
-                    pass
-                    # print("SKIPPED: " + path + ", dropped " + str(len(listing)))
+                    logger.debug("Dropped " + self.url + path + " (was empty or already crawled)")
             except TooManyConnectionsError:
-                print("Too many connections")
+                logger.debug("Too many connections, this thread will be killed and path resubmitted")
                 # Kill worker and resubmit listing task
                 directory.close()
                 in_q.put(path)
@@ -178,6 +178,7 @@ class RemoteDirectoryCrawler:
             try:
                 file = files_q.get(timeout=800)
             except Empty:
+                logger.error("File writer thread timed out")
                 break

             if file is None:
@@ -188,7 +189,7 @@ class RemoteDirectoryCrawler:
                 files_q.task_done()
                 files_written.append(counter)

-        print("File writer done")
+        logger.info("File writer thread done")

View File

@@ -1,3 +1,4 @@
+from crawl_server import logger
 import os
 import json
 import sqlite3
@@ -59,6 +60,7 @@ class TaskManagerDatabase:
         if not os.path.exists(db_path):
             self.init_database()
+            logger.info("Initialised database")

     def init_database(self):

View File

@@ -1,5 +1,5 @@
 #! /usr/bin/env python
+from crawl_server import logger
 from urllib.parse import urlparse
 import os
 import time
@@ -36,6 +36,7 @@ class FtpDirectory(RemoteDirectory):
         while failed_attempts < self.max_attempts:
             try:
                 self._connect()
+                logger.debug("New FTP connection @ " + self.base_url)
                 return True
             except ftputil.error.FTPError as e:
@@ -71,7 +72,7 @@ class FtpDirectory(RemoteDirectory):
                         ))
                 return path, results
             except ftputil.error.ParserError as e:
-                print("TODO: fix parsing error: " + e.strerror + " @ " + str(e.file_name))
+                logger.error("TODO: fix parsing error: " + e.strerror + " @ " + str(e.file_name))
                 break
             except ftputil.error.FTPError as e:
                 if e.errno in FtpDirectory.CANCEL_LISTING_CODE:
@@ -90,14 +91,15 @@ class FtpDirectory(RemoteDirectory):
             except Exception as e:
                 failed_attempts += 1
                 self.reconnect()
-                print(e)
+                logger.error("Exception while processing FTP listing for " + self.base_url + ": " + str(e))

         return path, []

     def reconnect(self):
         if self.ftp:
             self.ftp.close()
-            self.stop_when_connected()
+            success = self.stop_when_connected()
+            logger.debug("Reconnecting to FTP server " + self.base_url + (" (OK)" if success else " (ERR)"))

     def try_stat(self, path):
@@ -105,11 +107,12 @@ class FtpDirectory(RemoteDirectory):
             return self.ftp.stat(path)
         except ftputil.error.ParserError as e:
             # TODO: Try to parse it ourselves?
-            print("Could not parse " + path + " " + e.strerror)
+            logger.error("Exception while parsing FTP listing for " + self.base_url + path + " " + e.strerror)
             return None

     def close(self):
         if self.ftp:
             self.ftp.close()
             self.ftp = None
+            logger.debug("Closing FtpRemoteDirectory for " + self.base_url)

View File

@@ -1,3 +1,4 @@
+from crawl_server import logger
 from urllib.parse import unquote, urljoin
 import os
 from html.parser import HTMLParser
@@ -19,6 +20,9 @@ class Anchor:
         self.text = None
         self.href = None

+    def __str__(self):
+        return "<" + self.href + ", " + str(self.text).strip() + ">"
+

 class HTMLAnchorParser(HTMLParser):
@@ -46,7 +50,7 @@ class HTMLAnchorParser(HTMLParser):
             self.current_anchor = None

     def error(self, message):
-        pass
+        logger.debug("HTML Parser error: " + message)

     def feed(self, data):
         self.anchors.clear()
@@ -181,7 +185,6 @@ class HttpDirectory(RemoteDirectory):
                         # Unsupported encoding
                         yield chunk.decode("utf-8", errors="ignore")
                 r.close()
-                del r
                 break
             except RequestException:
                 self.session.close()
@@ -208,7 +211,7 @@
     @staticmethod
     def _should_ignore(base_url, link: Anchor):
-        if link.text in HttpDirectory.FILE_NAME_BLACKLIST or link.href in ("../", "./", "") \
+        if link.text in HttpDirectory.FILE_NAME_BLACKLIST or link.href in ("../", "./", "", "..", "../../") \
                 or link.href.endswith(HttpDirectory.BLACK_LIST):
             return True
@@ -217,7 +220,12 @@
         if not full_url.startswith(base_url):
             return True

+        # Ignore parameters in url
+        if "?" in link.href:
+            return True
+
     def close(self):
         self.session.close()
+        logger.debug("Closing HTTPRemoteDirectory for " + self.base_url)

View File

@@ -1,6 +1,7 @@
 from flask import Flask, request, abort, Response, send_file
 from flask_httpauth import HTTPTokenAuth
 import json
+from crawl_server import logger
 from crawl_server.task_manager import TaskManager, Task
 import os
 import config
@@ -35,11 +36,13 @@ def task_put():
             priority = request.json["priority"]
             callback_type = request.json["callback_type"]
             callback_args = request.json["callback_args"]
-        except KeyError:
+        except KeyError as e:
+            logger.error("Invalid task put request from " + request.remote_addr + " missing key: " + str(e))
             return abort(400)

         task = Task(website_id, url, priority, callback_type, callback_args)
         tm.put_task(task)
+        logger.info("Submitted new task to queue: " + str(task.to_json()))
         return '{"ok": "true"}'

     return abort(400)
@@ -49,6 +52,7 @@ def task_put():
 @auth.login_required
 def get_completed_tasks():
     json_str = json.dumps([result.to_json() for result in tm.get_non_indexed_results()])
+    logger.debug("Webserver has requested list of newly completed tasks from " + request.remote_addr)
     return Response(json_str, mimetype="application/json")
@@ -57,6 +61,7 @@ def get_completed_tasks():
 def get_current_tasks():
     current_tasks = tm.get_current_tasks()
+    logger.debug("Webserver has requested list of current tasks from " + request.remote_addr)
     return json.dumps([t.to_json() for t in current_tasks])
@@ -66,8 +71,10 @@ def get_file_list(website_id):
     file_name = "./crawled/" + str(website_id) + ".json"
     if os.path.exists(file_name):
+        logger.info("Webserver requested file list of website with id " + str(website_id))
         return send_file(file_name)
     else:
+        logger.error("Webserver requested file list of non-existent or empty website with id: " + str(website_id))
         return abort(404)
@@ -77,24 +84,19 @@ def free_file_list(website_id):
     file_name = "./crawled/" + str(website_id) + ".json"
     if os.path.exists(file_name):
         os.remove(file_name)
+        logger.debug("Webserver indicated that the files for the website with id " +
+                     str(website_id) + " are safe to delete")
         return '{"ok": "true"}'
     else:
         return abort(404)


-@app.route("/task/logs/")
-@auth.login_required
-def get_task_logs():
-    json_str = json.dumps([result.to_json() for result in tm.get_all_results()])
-    return Response(json_str, mimetype="application/json")
-
-
 @app.route("/task/pop_all")
 @auth.login_required
 def pop_queued_tasks():
     json_str = json.dumps([task.to_json() for task in tm.pop_tasks()])
+    logger.info("Webserver popped all queued tasks")
     return Response(json_str, mimetype="application/json")
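For context, the KeyError handler in task_put() above implies the five JSON keys a client must send. A hedged client-side sketch; the server URL, example values, and the Authorization header format are assumptions (HTTPTokenAuth's scheme is configurable), only the key names come from the handler:

    import json
    import requests

    payload = json.dumps({
        "website_id": 123,                 # illustrative values only
        "url": "http://example.com/pub/",
        "priority": 1,
        "callback_type": "",
        "callback_args": "{}",
    })
    r = requests.post("https://crawl-server.example/task/put",
                      headers={"Authorization": "Token <api-token>"},  # assumed header format
                      data=payload,
                      timeout=5)
    print(r.status_code)  # 200 with {"ok": "true"} on success, 400 when a key is missing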

View File

@@ -1,10 +1,10 @@
+from crawl_server import logger
 import config
 from crawl_server.database import TaskManagerDatabase, Task, TaskResult
 from multiprocessing import Manager, Pool
 from apscheduler.schedulers.background import BackgroundScheduler
 from datetime import datetime
 from crawl_server.crawler import RemoteDirectoryCrawler
-from crawl_server.callbacks import PostCrawlCallbackFactory


 class TaskManager:
@@ -41,7 +41,7 @@ class TaskManager:
         if len(self.current_tasks) <= self.max_processes:
             task = self.db.pop_task()
             if task:
-                print("pooled " + task.url)
+                logger.info("Submitted " + task.url + " to process pool")
                 self.current_tasks.append(task)

                 self.pool.apply_async(
@@ -58,7 +58,7 @@
         result.start_time = datetime.utcnow()
         result.website_id = task.website_id

-        print("Starting task " + task.url)
+        logger.info("Starting task " + task.url)

         crawler = RemoteDirectoryCrawler(task.url, config.CRAWL_SERVER_THREADS)
         crawl_result = crawler.crawl_directory("./crawled/" + str(task.website_id) + ".json")
@@ -68,18 +68,18 @@
         result.status_code = crawl_result.status_code
         result.end_time = datetime.utcnow()

-        print("End task " + task.url)
+        logger.info("End task " + task.url)

-        callback = PostCrawlCallbackFactory.get_callback(task)
-        if callback:
-            callback.run()
-            print("Executed callback")
+        # TODO: Figure out the callbacks
+        # callback = PostCrawlCallbackFactory.get_callback(task)
+        # if callback:
+        #     callback.run()

         return result, db_path, current_tasks

     @staticmethod
     def task_error(result):
-        print("TASK ERROR")
+        logger.error("Uncaught exception during a task")
         raise result

     @staticmethod
@@ -87,14 +87,12 @@
         task_result, db_path, current_tasks = result

-        print(task_result.status_code)
-        print(task_result.file_count)
-        print(task_result.start_time)
-        print(task_result.end_time)
+        logger.info("Task completed, logging result to database")
+        logger.info("Status code: " + str(task_result.status_code))
+        logger.info("File count: " + str(task_result.file_count))

         db = TaskManagerDatabase(db_path)
         db.log_result(task_result)
-        print("Logged result to DB")

         for i, task in enumerate(current_tasks):
             if task.website_id == task_result.website_id:
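task_complete and task_error are used as Pool callbacks by the apply_async call shown in the first hunk of this file. A minimal sketch of that wiring with simplified stand-in functions (the real run_task signature and arguments are not shown in this diff):

    # The task runs in a worker process; task_complete receives its return
    # value and task_error receives any uncaught exception.
    from multiprocessing import Pool

    def run_task(url):
        # stand-in for TaskManager.run_task(task, db_path, current_tasks)
        return "result for " + url

    def task_complete(result):
        print("completed:", result)

    def task_error(err):
        # the commit's task_error logs the error and then re-raises it
        print("uncaught exception during a task:", err)

    if __name__ == "__main__":
        with Pool(processes=2) as pool:
            pool.apply_async(run_task, args=("http://example.com/",),
                             callback=task_complete, error_callback=task_error)
            pool.close()
            pool.join()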

View File

@@ -32,7 +32,8 @@ class CrawlServer:
         print("Sending task to crawl server " + self.url)
         try:
             payload = json.dumps(task.to_json())
-            r = requests.post(self.url + "/task/put", headers=self._generate_headers(), data=payload, verify=False)
+            r = requests.post(self.url + "/task/put", headers=self._generate_headers(), data=payload, verify=False,
+                              timeout=5)
             print(r)  # TODO: If the task could not be added, fallback to another server
             return r.status_code == 200
         except ConnectionError:
@@ -41,7 +42,7 @@
     def pop_completed_tasks(self) -> list:
         try:
-            r = requests.get(self.url + "/task/completed", headers=self._generate_headers(), verify=False)
+            r = requests.get(self.url + "/task/completed", headers=self._generate_headers(), verify=False, timeout=5)
             if r.status_code != 200:
                 print("Problem while fetching completed tasks for '" + self.name + "': " + str(r.status_code))
                 print(r.text)
@@ -56,7 +57,7 @@
     def fetch_queued_tasks(self):
         try:
-            r = requests.get(self.url + "/task/", headers=self._generate_headers(), verify=False)
+            r = requests.get(self.url + "/task/", headers=self._generate_headers(), verify=False, timeout=5)
             if r.status_code != 200:
                 print("Problem while fetching queued tasks for '" + self.name + "' " + str(r.status_code))
@@ -73,7 +74,7 @@
     def fetch_current_tasks(self):
         try:
-            r = requests.get(self.url + "/task/current", headers=self._generate_headers(), verify=False)
+            r = requests.get(self.url + "/task/current", headers=self._generate_headers(), verify=False, timeout=5)
             if r.status_code != 200:
                 print("Problem while fetching current tasks for '" + self.name + "' " + str(r.status_code))