diff --git a/crawl_server/__init__.py b/crawl_server/__init__.py
index e69de29..2623852 100644
--- a/crawl_server/__init__.py
+++ b/crawl_server/__init__.py
@@ -0,0 +1,10 @@
+import logging
+from logging import FileHandler
+
+logger = logging.getLogger("default")
+logger.setLevel(logging.DEBUG)
+
+formatter = logging.Formatter('%(asctime)s %(levelname)-5s %(message)s')
+file_handler = FileHandler("crawl_server.log")
+file_handler.setFormatter(formatter)
+logger.addHandler(file_handler)
diff --git a/crawl_server/crawler.py b/crawl_server/crawler.py
index 8ac4616..79edd2c 100644
--- a/crawl_server/crawler.py
+++ b/crawl_server/crawler.py
@@ -3,6 +3,7 @@ import ujson
 from urllib.parse import urlparse, urljoin
 from threading import Thread
 from queue import Queue, Empty
+from crawl_server import logger
 
 
 class TooManyConnectionsError(Exception):
@@ -86,6 +87,7 @@ class RemoteDirectoryCrawler:
         try:
             try:
                 directory = RemoteDirectoryFactory.get_directory(self.url)
+                logger.info("Crawling directory " + self.url + " with " + str(type(directory)))
                 path_id, root_listing = directory.list_dir(urlparse(self.url).path)
                 if root_listing:
                     self.crawled_paths.append(path_id)
@@ -115,7 +117,7 @@ class RemoteDirectoryCrawler:
 
         in_q.join()
         files_q.join()
-        print("Done")
+        logger.info("Crawling for " + self.url + " done, waiting for threads to terminate...")
 
         # Kill threads
         for _ in threads:
@@ -153,13 +155,11 @@ class RemoteDirectoryCrawler:
                             in_q.put(urljoin(f.path, f.name))
                         else:
                             files_q.put(f)
-                    import sys
-                    print("LISTED " + repr(path) + "dirs:" + str(in_q.qsize()))
+                    logger.debug("LISTED " + self.url + path)
                 else:
-                    pass
-                    # print("SKIPPED: " + path + ", dropped " + str(len(listing)))
+                    logger.debug("Dropped " + self.url + path + " (was empty or already crawled)")
             except TooManyConnectionsError:
-                print("Too many connections")
+                logger.debug("Too many connections, this thread will be killed and path resubmitted")
                 # Kill worker and resubmit listing task
                 directory.close()
                 in_q.put(path)
@@ -178,6 +178,7 @@ class RemoteDirectoryCrawler:
             try:
                 file = files_q.get(timeout=800)
             except Empty:
+                logger.error("File writer thread timed out")
                 break
 
             if file is None:
@@ -188,7 +189,7 @@ class RemoteDirectoryCrawler:
                 files_q.task_done()
 
         files_written.append(counter)
-        print("File writer done")
+        logger.info("File writer thread done")
 
 
 
diff --git a/crawl_server/database.py b/crawl_server/database.py
index 1e47b84..1493efc 100644
--- a/crawl_server/database.py
+++ b/crawl_server/database.py
@@ -1,3 +1,4 @@
+from crawl_server import logger
 import os
 import json
 import sqlite3
@@ -59,6 +60,7 @@ class TaskManagerDatabase:
 
         if not os.path.exists(db_path):
             self.init_database()
+            logger.info("Initialised database")
 
     def init_database(self):
 
diff --git a/crawl_server/remote_ftp.py b/crawl_server/remote_ftp.py
index 06e8eb4..6259b83 100644
--- a/crawl_server/remote_ftp.py
+++ b/crawl_server/remote_ftp.py
@@ -1,5 +1,5 @@
 #! /usr/bin/env python
-
+from crawl_server import logger
 from urllib.parse import urlparse
 import os
 import time
@@ -36,6 +36,7 @@ class FtpDirectory(RemoteDirectory):
         while failed_attempts < self.max_attempts:
             try:
                 self._connect()
+                logger.debug("New FTP connection @ " + self.base_url)
                 return True
 
             except ftputil.error.FTPError as e:
@@ -71,7 +72,7 @@ class FtpDirectory(RemoteDirectory):
                         ))
                 return path, results
             except ftputil.error.ParserError as e:
-                print("TODO: fix parsing error: " + e.strerror + " @ " + str(e.file_name))
+                logger.error("TODO: fix parsing error: " + e.strerror + " @ " + str(e.file_name))
                 break
             except ftputil.error.FTPError as e:
                 if e.errno in FtpDirectory.CANCEL_LISTING_CODE:
@@ -90,14 +91,15 @@ class FtpDirectory(RemoteDirectory):
             except Exception as e:
                 failed_attempts += 1
                 self.reconnect()
-                print(e)
+                logger.error("Exception while processing FTP listing for " + self.base_url + ": " + str(e))
 
         return path, []
 
     def reconnect(self):
         if self.ftp:
             self.ftp.close()
-        self.stop_when_connected()
+        success = self.stop_when_connected()
+        logger.debug("Reconnecting to FTP server " + self.base_url + (" (OK)" if success else " (ERR)"))
 
     def try_stat(self, path):
 
@@ -105,11 +107,12 @@ class FtpDirectory(RemoteDirectory):
             return self.ftp.stat(path)
         except ftputil.error.ParserError as e:
             # TODO: Try to parse it ourselves?
-            print("Could not parse " + path + " " + e.strerror)
+            logger.error("Exception while parsing FTP listing for " + self.base_url + path + " " + e.strerror)
             return None
 
     def close(self):
         if self.ftp:
             self.ftp.close()
             self.ftp = None
+            logger.debug("Closing FtpRemoteDirectory for " + self.base_url)
 
diff --git a/crawl_server/remote_http.py b/crawl_server/remote_http.py
index e386842..0f5c5e2 100644
--- a/crawl_server/remote_http.py
+++ b/crawl_server/remote_http.py
@@ -1,3 +1,4 @@
+from crawl_server import logger
 from urllib.parse import unquote, urljoin
 import os
 from html.parser import HTMLParser
@@ -19,6 +20,9 @@ class Anchor:
         self.text = None
         self.href = None
 
+    def __str__(self):
+        return "<" + self.href + ", " + str(self.text).strip() + ">"
+
 
 class HTMLAnchorParser(HTMLParser):
 
@@ -46,7 +50,7 @@ class HTMLAnchorParser(HTMLParser):
             self.current_anchor = None
 
     def error(self, message):
-        pass
+        logger.debug("HTML Parser error: " + message)
 
     def feed(self, data):
         self.anchors.clear()
@@ -181,7 +185,6 @@ class HttpDirectory(RemoteDirectory):
                         # Unsupported encoding
                         yield chunk.decode("utf-8", errors="ignore")
                 r.close()
-                del r
                 break
             except RequestException:
                 self.session.close()
@@ -208,7 +211,7 @@ class HttpDirectory(RemoteDirectory):
 
     @staticmethod
     def _should_ignore(base_url, link: Anchor):
-        if link.text in HttpDirectory.FILE_NAME_BLACKLIST or link.href in ("../", "./", "") \
+        if link.text in HttpDirectory.FILE_NAME_BLACKLIST or link.href in ("../", "./", "", "..", "../../") \
                 or link.href.endswith(HttpDirectory.BLACK_LIST):
             return True
 
@@ -217,7 +220,12 @@ class HttpDirectory(RemoteDirectory):
         if not full_url.startswith(base_url):
             return True
 
+        # Ignore parameters in url
+        if "?" in link.href:
+            return True
+
     def close(self):
         self.session.close()
+        logger.debug("Closing HTTPRemoteDirectory for " + self.base_url)
 
 
diff --git a/crawl_server/server.py b/crawl_server/server.py
index 970e0c7..f5d4d61 100644
--- a/crawl_server/server.py
+++ b/crawl_server/server.py
@@ -1,6 +1,7 @@
 from flask import Flask, request, abort, Response, send_file
 from flask_httpauth import HTTPTokenAuth
 import json
+from crawl_server import logger
 from crawl_server.task_manager import TaskManager, Task
 import os
 import config
@@ -35,11 +36,13 @@ def task_put():
             priority = request.json["priority"]
             callback_type = request.json["callback_type"]
             callback_args = request.json["callback_args"]
-        except KeyError:
+        except KeyError as e:
+            logger.error("Invalid task put request from " + request.remote_addr + ", missing key: " + str(e))
             return abort(400)
 
         task = Task(website_id, url, priority, callback_type, callback_args)
         tm.put_task(task)
+        logger.info("Submitted new task to queue: " + str(task.to_json()))
         return '{"ok": "true"}'
 
     return abort(400)
@@ -49,6 +52,7 @@ def task_put():
 @auth.login_required
 def get_completed_tasks():
     json_str = json.dumps([result.to_json() for result in tm.get_non_indexed_results()])
+    logger.debug("Webserver has requested list of newly completed tasks from " + request.remote_addr)
     return Response(json_str, mimetype="application/json")
 
 
@@ -57,6 +61,7 @@ def get_completed_tasks():
 def get_current_tasks():
 
     current_tasks = tm.get_current_tasks()
+    logger.debug("Webserver has requested list of current tasks from " + request.remote_addr)
     return json.dumps([t.to_json() for t in current_tasks])
 
 
@@ -66,8 +71,10 @@ def get_current_tasks():
 def get_file_list(website_id):
     file_name = "./crawled/" + str(website_id) + ".json"
     if os.path.exists(file_name):
+        logger.info("Webserver requested file list of website with id: " + str(website_id))
         return send_file(file_name)
     else:
+        logger.error("Webserver requested file list of non-existent or empty website with id: " + str(website_id))
         return abort(404)
 
 
@@ -77,24 +84,19 @@ def get_file_list(website_id):
     file_name = "./crawled/" + str(website_id) + ".json"
     if os.path.exists(file_name):
         os.remove(file_name)
+        logger.debug("Webserver indicated that the files for the website with id " +
+                     str(website_id) + " are safe to delete")
        return '{"ok": "true"}'
     else:
         return abort(404)
 
 
-@app.route("/task/logs/")
-@auth.login_required
-def get_task_logs():
-
-    json_str = json.dumps([result.to_json() for result in tm.get_all_results()])
-    return Response(json_str, mimetype="application/json")
-
-
 @app.route("/task/pop_all")
 @auth.login_required
 def pop_queued_tasks():
 
     json_str = json.dumps([task.to_json() for task in tm.pop_tasks()])
+    logger.info("Webserver popped all queued tasks")
     return Response(json_str, mimetype="application/json")
 
 
diff --git a/crawl_server/task_manager.py b/crawl_server/task_manager.py
index 585aa8a..47a6e11 100644
--- a/crawl_server/task_manager.py
+++ b/crawl_server/task_manager.py
@@ -1,10 +1,10 @@
+from crawl_server import logger
 import config
 from crawl_server.database import TaskManagerDatabase, Task, TaskResult
 from multiprocessing import Manager, Pool
 from apscheduler.schedulers.background import BackgroundScheduler
 from datetime import datetime
 from crawl_server.crawler import RemoteDirectoryCrawler
-from crawl_server.callbacks import PostCrawlCallbackFactory
 
 
 class TaskManager:
@@ -41,7 +41,7 @@ class TaskManager:
         if len(self.current_tasks) <= self.max_processes:
             task = self.db.pop_task()
             if task:
-                print("pooled " + task.url)
task.url + " to process pool") self.current_tasks.append(task) self.pool.apply_async( @@ -58,7 +58,7 @@ class TaskManager: result.start_time = datetime.utcnow() result.website_id = task.website_id - print("Starting task " + task.url) + logger.info("Starting task " + task.url) crawler = RemoteDirectoryCrawler(task.url, config.CRAWL_SERVER_THREADS) crawl_result = crawler.crawl_directory("./crawled/" + str(task.website_id) + ".json") @@ -68,18 +68,18 @@ class TaskManager: result.status_code = crawl_result.status_code result.end_time = datetime.utcnow() - print("End task " + task.url) + logger.info("End task " + task.url) - callback = PostCrawlCallbackFactory.get_callback(task) - if callback: - callback.run() - print("Executed callback") + # TODO: Figure out the callbacks + # callback = PostCrawlCallbackFactory.get_callback(task) + # if callback: + # callback.run() return result, db_path, current_tasks @staticmethod def task_error(result): - print("TASK ERROR") + logger.error("Uncaught exception during a task: ") raise result @staticmethod @@ -87,14 +87,12 @@ class TaskManager: task_result, db_path, current_tasks = result - print(task_result.status_code) - print(task_result.file_count) - print(task_result.start_time) - print(task_result.end_time) + logger.info("Task completed, logger result to database") + logger.info("Status code: " + task_result.status_code) + logger.info("File count: " + str(task_result.file_count)) db = TaskManagerDatabase(db_path) db.log_result(task_result) - print("Logged result to DB") for i, task in enumerate(current_tasks): if task.website_id == task_result.website_id: diff --git a/task.py b/task.py index ed7c2ca..bfea176 100644 --- a/task.py +++ b/task.py @@ -32,7 +32,8 @@ class CrawlServer: print("Sending task to crawl server " + self.url) try: payload = json.dumps(task.to_json()) - r = requests.post(self.url + "/task/put", headers=self._generate_headers(), data=payload, verify=False) + r = requests.post(self.url + "/task/put", headers=self._generate_headers(), data=payload, verify=False, + timeout=5) print(r) # TODO: If the task could not be added, fallback to another server return r.status_code == 200 except ConnectionError: @@ -41,7 +42,7 @@ class CrawlServer: def pop_completed_tasks(self) -> list: try: - r = requests.get(self.url + "/task/completed", headers=self._generate_headers(), verify=False) + r = requests.get(self.url + "/task/completed", headers=self._generate_headers(), verify=False, timeout=5) if r.status_code != 200: print("Problem while fetching completed tasks for '" + self.name + "': " + str(r.status_code)) print(r.text) @@ -56,7 +57,7 @@ class CrawlServer: def fetch_queued_tasks(self): try: - r = requests.get(self.url + "/task/", headers=self._generate_headers(), verify=False) + r = requests.get(self.url + "/task/", headers=self._generate_headers(), verify=False, timeout=5) if r.status_code != 200: print("Problem while fetching queued tasks for '" + self.name + "' " + str(r.status_code)) @@ -73,7 +74,7 @@ class CrawlServer: def fetch_current_tasks(self): try: - r = requests.get(self.url + "/task/current", headers=self._generate_headers(), verify=False) + r = requests.get(self.url + "/task/current", headers=self._generate_headers(), verify=False, timeout=5) if r.status_code != 200: print("Problem while fetching current tasks for '" + self.name + "' " + str(r.status_code))