diff --git a/app.py b/app.py
index 8dfb9b7..4605610 100644
--- a/app.py
+++ b/app.py
@@ -26,7 +26,7 @@ searchEngine = ElasticSearchEngine("od-database")
 
 
 @app.template_filter("datetime_format")
-def datetime_format(value, format='%Y-%m-%d %H:%M UTC'):
+def datetime_format(value, format='%Y-%m-%d'):
     return time.strftime(format, time.gmtime(value))
 
 
@@ -107,18 +107,16 @@ def search():
     per_page = int(per_page) if per_page.isdigit() else "50"
     per_page = per_page if per_page in config.RESULTS_PER_PAGE else 50
 
-    if q:
+    if len(q) >= 3:
         try:
-            # hits = sea.search(q, per_page, page, sort_order)
-            hits = searchEngine.search(q, page, per_page)
+            hits = searchEngine.search(q, page, per_page, sort_order)
+            hits = db.join_search_result(hits)
         except InvalidQueryException as e:
             flash("Invalid query: " + str(e), "warning")
             return redirect("/search")
     else:
         hits = None
 
-    print(hits)
-
     return render_template("search.html",
                            results=hits, q=q, p=page, sort_order=sort_order,
                            per_page=per_page, results_set=config.RESULTS_PER_PAGE)
diff --git a/crawl_server/task_manager.py b/crawl_server/task_manager.py
index 32c0711..5c853e1 100644
--- a/crawl_server/task_manager.py
+++ b/crawl_server/task_manager.py
@@ -1,5 +1,5 @@
 from crawl_server.database import TaskManagerDatabase, Task, TaskResult
-from multiprocessing import Pool
+from concurrent.futures import ProcessPoolExecutor
 from apscheduler.schedulers.background import BackgroundScheduler
 from datetime import datetime
 from crawl_server.crawler import RemoteDirectoryCrawler
@@ -10,7 +10,7 @@ class TaskManager:
     def __init__(self, db_path, max_processes=8):
         self.db_path = db_path
         self.db = TaskManagerDatabase(db_path)
-        self.pool = Pool(processes=max_processes)
+        self.pool = ProcessPoolExecutor(max_workers=max_processes)
         self.current_tasks = []
 
@@ -39,12 +39,10 @@ class TaskManager:
 
         print("pooled " + task.url)
 
-        self.pool.apply_async(
+        self.pool.submit(
             TaskManager.run_task,
-            args=(task, self.db_path),
-            callback=TaskManager.task_complete,
-            error_callback=TaskManager.task_error
-        )
+            task, self.db_path
+        ).add_done_callback(TaskManager.task_complete)
 
     @staticmethod
     def run_task(task, db_path):
@@ -63,19 +61,20 @@ class TaskManager:
         result.end_time = datetime.utcnow()
         print("End task " + task.url)
 
-        return dict(result=result, db_path=db_path)
+        return result, db_path
 
     @staticmethod
-    def task_complete(kwargs):
-        result = kwargs["result"]
-        db_path = kwargs["db_path"]
-        print(result.status_code)
-        print(result.file_count)
-        print(result.start_time)
-        print(result.end_time)
+    def task_complete(result):
+
+        task_result, db_path = result.result()
+
+        print(task_result.status_code)
+        print(task_result.file_count)
+        print(task_result.start_time)
+        print(task_result.end_time)
 
         db = TaskManagerDatabase(db_path)
-        db.log_result(result)
+        db.log_result(task_result)
         print("Logged result to DB")
 
     @staticmethod
diff --git a/database.py b/database.py
index b991fda..fa8b2e6 100644
--- a/database.py
+++ b/database.py
@@ -191,6 +191,31 @@ class Database:
             cursor.execute("DELETE FROM ApiToken WHERE token=?", (token, ))
             conn.commit()
 
+    def _get_websites(self) -> dict:
+
+        # todo: mem cache that
+        with sqlite3.connect(self.db_path) as conn:
+
+            cursor = conn.cursor()
+
+            cursor.execute("SELECT id, url FROM Website")
+
+            result = {}
+
+            for db_website in cursor.fetchall():
+                result[db_website[0]] = db_website[1]
+            return result
+
+    def join_search_result(self, page: dict) -> dict:
+
+        websites = self._get_websites()
+
+        for hit in page["hits"]["hits"]:
+            hit["_source"]["website_url"] = websites[hit["_source"]["website_id"]]
+
+        return page
+
+
diff --git a/search/search.py b/search/search.py
index f00314a..e74693e 100644
--- a/search/search.py
+++ b/search/search.py
@@ -1,4 +1,6 @@
 import elasticsearch
+import os
+import json
 from elasticsearch.exceptions import TransportError
 
 
@@ -14,10 +16,7 @@
     def import_json(self, in_str: str, website_id: int):
         raise NotImplementedError
 
-    def search(self, query) -> {}:
-        raise NotImplementedError
-
-    def scroll(self, scroll_id) -> {}:
+    def search(self, query, page, per_page, sort_order) -> {}:
         raise NotImplementedError
 
     def reset(self):
@@ -29,6 +28,15 @@ class SearchEngine:
 
 class ElasticSearchEngine(SearchEngine):
 
+    SORT_ORDERS = {
+        "score": ["_score"],
+        "size_asc": [{"size": {"order": "asc"}}],
+        "size_dsc": [{"size": {"order": "desc"}}],
+        "date_asc": [{"mtime": {"order": "asc"}}],
+        "date_desc": [{"mtime": {"order": "desc"}}],
+        "none": []
+    }
+
     def __init__(self, index_name):
         super().__init__()
         self.index_name = index_name
@@ -68,7 +76,8 @@ class ElasticSearchEngine(SearchEngine):
             "name": {"analyzer": "my_nGram", "type": "text"},
             "mtime": {"type": "date", "format": "epoch_millis"},
             "size": {"type": "long"},
-            "website_id": {"type": "integer"}
+            "website_id": {"type": "integer"},
+            "ext": {"type": "keyword"}
         }}, doc_type="file", index=self.index_name)
         self.es.indices.open(index=self.index_name)
 
@@ -85,16 +94,21 @@ class ElasticSearchEngine(SearchEngine):
         docs = []
 
         for line in in_str.splitlines():
-            docs.append(line)
+            doc = json.loads(line)
+            name, ext = os.path.splitext(doc["name"])
+            doc["ext"] = ext if ext else ""
+            doc["name"] = name
+            doc["website_id"] = website_id
+            docs.append(doc)
 
             if len(docs) >= import_every:
-                self._index(docs, website_id)
+                self._index(docs)
                 docs.clear()
 
-        self._index(docs, website_id)
+        self._index(docs)
 
-    def _index(self, docs, website_id):
+    def _index(self, docs):
         print("Indexing " + str(len(docs)) + " docs")
-        bulk_string = ElasticSearchEngine.create_bulk_index_string(docs, website_id)
+        bulk_string = ElasticSearchEngine.create_bulk_index_string(docs)
         result = self.es.bulk(body=bulk_string, index=self.index_name, doc_type="file")
         if result["errors"]:
@@ -102,17 +116,15 @@ class ElasticSearchEngine(SearchEngine):
             raise IndexingError
 
     @staticmethod
-    def create_bulk_index_string(docs: list, website_id: int):
+    def create_bulk_index_string(docs: list):
         action_string = '{"index":{}}\n'
-        website_id_string = ',"website_id":' + str(website_id) + '}\n'  # Add website_id param to each doc
+        return "\n".join("".join([action_string, json.dumps(doc)]) for doc in docs)
 
-        return "\n".join("".join([action_string, doc[:-1], website_id_string]) for doc in docs)
-
-    def search(self, query, page, per_page) -> {}:
+    def search(self, query, page, per_page, sort_order) -> {}:
 
         filters = []
-        sort_by = ["_score"]
+        sort_by = ElasticSearchEngine.SORT_ORDERS.get(sort_order, [])
 
         page = self.es.search(body={
             "query": {
diff --git a/static/css/main.css b/static/css/main.css
index d268c9d..66d3863 100644
--- a/static/css/main.css
+++ b/static/css/main.css
@@ -11,7 +11,7 @@
     padding-bottom: 0.3rem;
 }
 
 .table td {
-    padding: .40rem .75rem;
+    padding: 2px 0;
 }
 
 .bg-application {
@@ -75,4 +75,4 @@
 
 .hl {
     background: #fff217;
-}
\ No newline at end of file
+}
diff --git a/templates/search.html b/templates/search.html
index 43b5db7..8bc7485 100644
--- a/templates/search.html
+++ b/templates/search.html
@@ -18,17 +18,12 @@
@@ -47,7 +42,7 @@
-            {% if results["hits"]["total"] > 0 %}
+            {% if results and results["hits"]["total"] > 0 %}
@@ -60,13 +55,11 @@
                 {% set hl_name = hit["highlight"]["name"][0] if "name" in hit["highlight"] else src["name"] %}
                 {% set hl_path = hit["highlight"]["path"][0] if "path" in hit["highlight"] else src["path"] %}
 
-                {# TODO: website url + path #}
-                {% set path = src["path"] %}
 
                 {# File name & link #}
-
-                    {{ hl_name |safe }}
+
+                    {{ hl_name |safe }}{{ src["ext"] }}
 
                 {# File type badge #}
                 {% set mime = get_mime(src["path"]) %}
@@ -78,13 +71,13 @@
                 {# File path #}
                     {{ hl_path | safe }}{{ truncate_path(src["path"], 60) }}
+                       href="/website/{{ src["website_id"] }}">{{ src["website_url"] }}{{ hl_path|safe }}
 
-                {# File size #}
+                {# File size & date #}
-                    {{ src["size"] | filesizeformat if src["size"] >= 0 else "?" }}
+
                     {{ src["size"] | filesizeformat if src["size"] >= 0 else "?" }}
+                    {{ src["mtime"] | datetime_format }}
 
             {% endfor %}