From 72495275b0b97c3f2c5722a4ecad6d9f3f207122 Mon Sep 17 00:00:00 2001
From: Simon
Date: Mon, 11 Jun 2018 22:35:49 -0400
Subject: [PATCH] Elasticsearch search engine (import from json)

---
 crawl_server/task_manager.py |   3 +-
 crawler/crawler.py           |   5 +-
 crawler/ftp.py               |   2 +-
 crawler/http.py              |   8 ++-
 debug_put.py                 |   2 +-
 requirements.txt             |   3 +-
 search/search.py             | 135 +++++++++++++++++++++++++++++++++++
 test/test_crawl_server.py    |  17 ++---
 test/test_search.py          |  38 ++++++++++
 9 files changed, 190 insertions(+), 23 deletions(-)
 create mode 100644 search/search.py
 create mode 100644 test/test_search.py

diff --git a/crawl_server/task_manager.py b/crawl_server/task_manager.py
index a21abc4..6daaefd 100644
--- a/crawl_server/task_manager.py
+++ b/crawl_server/task_manager.py
@@ -49,7 +49,7 @@ class TaskManager:
 
         print("Starting task " + task.url)
 
-        crawler = RemoteDirectoryCrawler(task.url, 10)
+        crawler = RemoteDirectoryCrawler(task.url, 100)
         crawl_result = crawler.crawl_directory("12345.json")
 
         result.file_count = crawl_result.file_count
@@ -63,7 +63,6 @@ class TaskManager:
 
     @staticmethod
     def task_complete(result: TaskResult):
-        print("Task done " + str(result))
         print(result.status_code)
         print(result.file_count)
         print(result.start_time)
diff --git a/crawler/crawler.py b/crawler/crawler.py
index cd35bf9..6087443 100644
--- a/crawler/crawler.py
+++ b/crawler/crawler.py
@@ -12,7 +12,7 @@ class TooManyConnectionsError(Exception):
 
 class File:
 
-    def __init__(self, name: str, size: int, mtime: str, path: str, is_dir: bool):
+    def __init__(self, name: str, size: int, mtime: int, path: str, is_dir: bool):
         self.name = name
         self.size = size
         self.mtime = mtime
@@ -69,8 +69,7 @@ def export_to_json(q: Queue, out_file: str) -> int:
         while True:
             try:
                 next_file: File = q.get_nowait()
-                f.write(next_file.to_json())
-                f.write("\n")
+                f.write(next_file.to_json() + "\n")
                 counter += 1
             except Empty:
                 break
diff --git a/crawler/ftp.py b/crawler/ftp.py
index 4eb3436..660f39f 100644
--- a/crawler/ftp.py
+++ b/crawler/ftp.py
@@ -61,7 +61,7 @@ class FtpDirectory(RemoteDirectory):
 
                     results.append(File(
                         name=file_name,
-                        mtime=stat.st_mtime,
+                        mtime=stat.st_mtime,  # TODO: check
                         size=-1 if is_dir else stat.st_size,
                         is_dir=is_dir,
                         path=path
diff --git a/crawler/http.py b/crawler/http.py
index dfa3dc6..e01fbc3 100644
--- a/crawler/http.py
+++ b/crawler/http.py
@@ -8,6 +8,7 @@ import requests
 from requests.exceptions import RequestException
 from multiprocessing.pool import ThreadPool
 import config
+from dateutil.parser import parse as parse_date
 
 
 class Link:
@@ -59,7 +60,7 @@ class HttpDirectory(RemoteDirectory):
                 if self._isdir(link):
                     results.append(File(
                         name=file_name,
-                        mtime="",
+                        mtime=0,
                         size=-1,
                         is_dir=True,
                         path=path
@@ -79,6 +80,7 @@ class HttpDirectory(RemoteDirectory):
             # Many urls, use multi-threaded solution
             pool = ThreadPool(processes=10)
             files = pool.starmap(HttpDirectory._request_file, zip(repeat(self), urls_to_request))
+            pool.close()
             for file in files:
                 if file:
                     results.append(file)
@@ -132,12 +134,12 @@ class HttpDirectory(RemoteDirectory):
 
             stripped_url = url[len(self.base_url) - 1:]
             path, name = os.path.split(stripped_url)
-
+            date = r.headers["Date"] if "Date" in r.headers else "1970-01-01"
             return File(
                 path=unquote(path).strip("/"),
                 name=unquote(name),
                 size=int(r.headers["Content-Length"]) if "Content-Length" in r.headers else -1,
-                mtime=r.headers["Date"] if "Date" in r.headers else "?",
+                mtime=int(parse_date(date).timestamp()),
                 is_dir=False
             )
         except RequestException:
diff --git a/debug_put.py b/debug_put.py
index 416a368..3274d78 100644
--- a/debug_put.py
+++ b/debug_put.py
@@ -3,7 +3,7 @@ import json
 
 
 payload = json.dumps({
-    "url": "http://124.158.108.137/ebooks/",
+    "url": "http://138.197.215.189/",
     "priority": 2,
     "callback_type": "",
     "callback_args": "{}"
diff --git a/requirements.txt b/requirements.txt
index 5758590..a3f3124 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -10,4 +10,5 @@ humanfriendly
 apscheduler
 bcrypt
 ftputil
-lxml
\ No newline at end of file
+lxml
+elasticsearch
\ No newline at end of file
diff --git a/search/search.py b/search/search.py
new file mode 100644
index 0000000..5e3490f
--- /dev/null
+++ b/search/search.py
@@ -0,0 +1,135 @@
+import elasticsearch
+
+
+class IndexingError(Exception):
+    pass
+
+
+class SearchEngine:
+
+    def __init__(self):
+        pass
+
+    def import_json(self, in_file: str, website_id: int):
+        raise NotImplementedError
+
+    def search(self, query) -> list:
+        raise NotImplementedError
+
+    def reset(self):
+        raise NotImplementedError
+
+    def ping(self):
+        raise NotImplementedError
+
+
+class ElasticSearchEngine(SearchEngine):
+
+    def __init__(self, index_name):
+        super().__init__()
+        self.index_name = index_name
+        self.es = elasticsearch.Elasticsearch()
+
+        if not self.es.indices.exists(self.index_name):
+            self.init()
+
+    def init(self):
+        print("Elasticsearch first time setup")
+        if self.es.indices.exists(self.index_name):
+            self.es.indices.delete(index=self.index_name)
+        self.es.indices.create(index=self.index_name)
+        self.es.indices.close(index=self.index_name)
+
+        # Paths
+        self.es.indices.put_settings(body=
+                                     {"analysis": {
+                                         "tokenizer": {
+                                             "path_tokenizer": {
+                                                 "type": "path_hierarchy"
+                                             }
+                                         }
+                                     }}, index=self.index_name)
+
+        self.es.indices.put_settings(body=
+                                     {"analysis": {
+                                         "analyzer": {
+                                             "path_analyser": {
+                                                 "tokenizer": "path_tokenizer", "filter": ["lowercase"]
+                                             }
+                                         }
+                                     }}, index=self.index_name)
+
+        # File names
+        self.es.indices.put_settings(body=
+                                     {"analysis": {
+                                         "tokenizer": {
+                                             "my_nGram_tokenizer": {
+                                                 "type": "nGram", "min_gram": 3, "max_gram": 3}
+                                         }
+                                     }}, index=self.index_name)
+        self.es.indices.put_settings(body=
+                                     {"analysis": {
+                                         "analyzer": {
+                                             "my_nGram": {
+                                                 "tokenizer": "my_nGram_tokenizer",
+                                                 "filter": ["lowercase", "asciifolding"]
+                                             }
+                                         }
+                                     }}, index=self.index_name)
+
+        # Mappings
+        self.es.indices.put_mapping(body={"properties": {
+            "path": {"type": "text", "analyzer": "path_analyser"},
+            "name": {"analyzer": "my_nGram", "type": "text"},
+            "mtime": {"type": "date", "format": "epoch_millis"},
+            "size": {"type": "long"},
+            "website_id": {"type": "integer"}
+        }}, doc_type="file", index=self.index_name)
+
+        self.es.indices.open(index=self.index_name)
+
+    def reset(self):
+        self.init()
+
+    def ping(self):
+        return self.es.ping()
+
+    def import_json(self, in_file: str, website_id: int):
+        import_every = 1000
+
+        with open(in_file, "r") as f:
+            docs = []
+
+            line = f.readline()
+            while line:
+                docs.append(line[:-1])  # Remove trailing new line
+
+                if len(docs) >= import_every:
+                    self._index(docs, website_id)
+                    docs.clear()
+                line = f.readline()
+            self._index(docs, website_id)
+
+    def _index(self, docs, website_id):
+        print("Indexing " + str(len(docs)) + " docs")
+        bulk_string = ElasticSearchEngine.create_bulk_index_string(docs, website_id)
+        result = self.es.bulk(body=bulk_string, index=self.index_name, doc_type="file")
+
+        if result["errors"]:
+            print(result)
+            raise IndexingError
+
+    @staticmethod
+    def create_bulk_index_string(docs: list, website_id: int):
+
+        result = ""
+
+        action_string = '{"index":{}}\n'
+        website_id_string = ',"website_id":' + str(website_id) + '}\n'  # Add website_id param to each doc
+
+        for doc in docs:
+            result += action_string + doc[:-1] + website_id_string
+        return result
+
+    def search(self, query):
+        pass
diff --git a/test/test_crawl_server.py b/test/test_crawl_server.py
index 57cc23d..bba031b 100644
--- a/test/test_crawl_server.py
+++ b/test/test_crawl_server.py
@@ -1,9 +1,7 @@
 from flask_testing import LiveServerTestCase
-import os
 import json
 import requests
 from crawl_server.server import app
-from crawl_server.task_manager import TaskManager
 
 
 class CrawlServerTest(LiveServerTestCase):
@@ -20,15 +18,6 @@ class CrawlServerTest(LiveServerTestCase):
         app.config['LIVESERVER_PORT'] = 9999
         return app
 
-    def test_put_only_accepts_json(self):
-
-        payload = json.dumps({"url": "", "priority": 1, "callback_type": "", "callback_args": "{}"})
-        r = requests.post(self.HOST + "/task/put", data=payload)
-        self.assertEqual(400, r.status_code)
-
-        r2 = requests.post(self.HOST + "/task/put", headers=self.headers, data=payload)
-        self.assertEqual(200, r2.status_code)
-
     def test_put_task(self):
 
         payload = json.dumps({
@@ -43,11 +32,15 @@ class CrawlServerTest(LiveServerTestCase):
 
         r = requests.get(self.HOST + "/task")
         self.assertEqual(200, r.status_code)
-        print(r.text)
 
         result = json.loads(r.text)[0]
         self.assertEqual(result["url"], "a")
         self.assertEqual(result["priority"], 2)
         self.assertEqual(result["callback_type"], "c")
         self.assertEqual(result["callback_args"], '{"d": 4}')
 
+        payload = json.dumps({"url": "", "priority": 1, "callback_type": "", "callback_args": "{}"})
+        r = requests.post(self.HOST + "/task/put", data=payload)
+        self.assertEqual(400, r.status_code)
+        r2 = requests.post(self.HOST + "/task/put", headers=self.headers, data=payload)
+        self.assertEqual(200, r2.status_code)
diff --git a/test/test_search.py b/test/test_search.py
new file mode 100644
index 0000000..be6b592
--- /dev/null
+++ b/test/test_search.py
@@ -0,0 +1,38 @@
+from unittest import TestCase
+import time
+import json
+import os
+from search.search import ElasticSearchEngine
+
+
+class SearchTest(TestCase):
+
+    def setUp(self):
+        self.search = ElasticSearchEngine("od-database-test")
+        self.search.reset()
+        time.sleep(1)
+
+    def test_ping(self):
+        self.assertTrue(self.search.ping(), "Search engine not running")
+
+    def test_import_json(self):
+
+        files = [
+            {"name": "a", "size": 1000000000000000000, "path": "c/d", "mtime": 1528765672},
+            {"name": "b", "size": 123, "path": "", "mtime": None},
+            {"name": "c", "size": -1, "path": "c", "mtime": 12345}
+        ]
+
+        with open("tmp.json", "w") as f:
+            for file in files:
+                f.write(json.dumps(file) + "\n")
+
+        self.search.import_json("tmp.json", 123)
+        time.sleep(3)
+        self.assertEqual(3, self.search.es.count(self.search.index_name, "file")["count"])
+
+        os.remove("tmp.json")
+
+
+
+
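
A note on the analysis settings configured in ElasticSearchEngine.init(): file paths are indexed with a path_hierarchy tokenizer ("path_analyser") and file names with a lowercase 3-gram analyzer ("my_nGram"). The sketch below is an illustration only, in plain Python with no Elasticsearch required, of the tokens those analyzers are expected to emit; the sample path and file name are made up for the example.

# Illustration only: approximates the output of the "path_analyser"
# (path_hierarchy tokenizer + lowercase) and "my_nGram" (3-gram tokenizer +
# lowercase/asciifolding) analyzers set up in ElasticSearchEngine.init().

def path_hierarchy(path: str) -> list:
    # "films/Comedy" -> ["films", "films/comedy"], like path_tokenizer + lowercase
    parts = path.lower().split("/")
    return ["/".join(parts[:i + 1]) for i in range(len(parts))]


def trigrams(name: str) -> list:
    # "Readme" -> ["rea", "ead", "adm", "dme"], like my_nGram_tokenizer + lowercase
    text = name.lower()
    return [text[i:i + 3] for i in range(len(text) - 2)]


print(path_hierarchy("films/Comedy"))  # ['films', 'films/comedy']
print(trigrams("Readme"))              # ['rea', 'ead', 'adm', 'dme']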
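
The JSON import itself relies on the Elasticsearch bulk format: create_bulk_index_string() turns each line of the crawl output into an action/document pair and tags every document with the website_id by stripping the closing brace and appending the extra field. A small standalone sketch of that transformation, using a made-up document and website_id:

# Mirrors create_bulk_index_string() from search/search.py; values are examples.
import json

doc = json.dumps({"name": "a", "size": 123, "path": "c/d", "mtime": 1528765672})
website_id = 27  # hypothetical id

bulk_string = '{"index":{}}\n' + doc[:-1] + ',"website_id":' + str(website_id) + '}\n'
print(bulk_string)
# {"index":{}}
# {"name": "a", "size": 123, "path": "c/d", "mtime": 1528765672,"website_id":27}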
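
Finally, a minimal usage sketch of the new engine, assuming the repository layout above and an Elasticsearch instance reachable on localhost:9200 (the default for elasticsearch.Elasticsearch()). The index name, file name and website_id below are placeholders, not values from this patch.

import json
from search.search import ElasticSearchEngine

engine = ElasticSearchEngine("od-database")  # placeholder index name; created on first use
assert engine.ping(), "Elasticsearch is not reachable"

# One JSON document per line, the same format export_to_json() writes
with open("crawl_output.json", "w") as f:  # placeholder file name
    f.write(json.dumps({"name": "file.txt", "size": 1, "path": "a/b", "mtime": 1528765672}) + "\n")

engine.import_json("crawl_output.json", 1)  # 1 = placeholder website_id
# Note: search() is still a stub in this commit and returns None.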