diff --git a/crawler/http.py b/crawler/http.py
index e01fbc3..5728a1f 100644
--- a/crawler/http.py
+++ b/crawler/http.py
@@ -76,7 +76,7 @@ class HttpDirectory(RemoteDirectory):
 
         results = []
 
-        if len(urls_to_request) > 3:
+        if len(urls_to_request) > 4:
             # Many urls, use multi-threaded solution
             pool = ThreadPool(processes=10)
             files = pool.starmap(HttpDirectory._request_file, zip(repeat(self), urls_to_request))
diff --git a/requirements.txt b/requirements.txt
index a3f3124..e9ae70f 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -11,4 +11,5 @@ apscheduler
 bcrypt
 ftputil
 lxml
-elasticsearch
\ No newline at end of file
+elasticsearch
+python-dateutil
\ No newline at end of file
diff --git a/search/search.py b/search/search.py
index 5e3490f..76386b8 100644
--- a/search/search.py
+++ b/search/search.py
@@ -1,4 +1,5 @@
 import elasticsearch
+from elasticsearch.exceptions import TransportError
 
 
 class IndexingError(Exception):
@@ -13,7 +14,10 @@ class SearchEngine:
     def import_json(self, in_file: str, website_id: int):
         raise NotImplementedError
 
-    def search(self, query) -> list:
+    def search(self, query) -> {}:
+        raise NotImplementedError
+
+    def scroll(self, scroll_id) -> {}:
         raise NotImplementedError
 
     def reset(self):
@@ -40,26 +44,7 @@ class ElasticSearchEngine(SearchEngine):
         self.es.indices.create(index=self.index_name)
         self.es.indices.close(index=self.index_name)
 
-        # Paths
-        self.es.indices.put_settings(body=
-                                     {"analysis": {
-                                         "tokenizer": {
-                                             "path_tokenizer": {
-                                                 "type": "path_hierarchy"
-                                             }
-                                         }
-                                     }}, index=self.index_name)
-
-        self.es.indices.put_settings(body=
-                                     {"analysis": {
-                                         "analyzer": {
-                                             "path_analyser": {
-                                                 "tokenizer": "path_tokenizer", "filter": ["lowercase"]
-                                             }
-                                         }
-                                     }}, index=self.index_name)
-
-        # File names
+        # File names and paths
         self.es.indices.put_settings(body=
                                      {"analysis": {
                                          "tokenizer": {
@@ -79,7 +64,7 @@ class ElasticSearchEngine(SearchEngine):
 
         # Mappings
         self.es.indices.put_mapping(body={"properties": {
-            "path": {"type": "text", "analyzer": "path_analyser"},
+            "path": {"analyzer": "my_nGram", "type": "text"},
             "name": {"analyzer": "my_nGram", "type": "text"},
             "mtime": {"type": "date", "format": "epoch_millis"},
             "size": {"type": "long"},
@@ -131,5 +116,39 @@ class ElasticSearchEngine(SearchEngine):
             result += action_string + doc[:-1] + website_id_string
         return result
 
-    def search(self, query):
-        pass
+    def search(self, query) -> {}:
+
+        filters = []
+
+        page = self.es.search(body={
+            "query": {
+                "bool": {
+                    "must": {
+                        "multi_match": {
+                            "query": query,
+                            "fields": ["name", "path"],
+                            "operator": "and"
+                        }
+                    },
+                    "filter": filters
+                }
+            },
+            "sort": [
+                "_score"
+            ],
+            "highlight": {
+                "fields": {
+                    "name": {"pre_tags": [""], "post_tags": [""]},
+                }
+            },
+            "size": 40}, index=self.index_name, scroll="8m")
+
+        # todo get scroll time from config
+        # todo get size from config
+        return page
+
+    def scroll(self, scroll_id) -> {}:
+        try:
+            return self.es.scroll(scroll_id=scroll_id, scroll="3m")  # todo get scroll time from config
+        except TransportError:
+            return None
diff --git a/test/test_search.py b/test/test_search.py
index be6b592..14c8cbb 100644
--- a/test/test_search.py
+++ b/test/test_search.py
@@ -10,17 +10,18 @@ class SearchTest(TestCase):
     def setUp(self):
         self.search = ElasticSearchEngine("od-database-test")
         self.search.reset()
-        time.sleep(1)
+        time.sleep(0.5)
 
     def test_ping(self):
 
         self.assertTrue(self.search.ping(), "Search engine not running")
 
-    def test_import_json(self):
+    def test_import_and_search(self):
         files = [
-            {"name": "a", "size": 1000000000000000000, "path": "c/d", "mtime": 1528765672},
-            {"name": "b", "size": 123, "path": "", "mtime": None},
-            {"name": "c", "size": -1, "path": "c", "mtime": 12345}
+            {"name": "PaNopTicon", "size": 1000000000000000000, "path": "c/d", "mtime": 1528765672},
+            {"name": "BLAckwAter.Park", "size": 123, "path": "", "mtime": None},
+            {"name": "10'000 days", "size": -1, "path": "c", "mtime": 12345},
+            {"name": "Dead Racer", "size": 1000, "path": "Speed Machine [FLAC]", "mtime": 12345}
         ]
 
         with open("tmp.json", "w") as f:
@@ -28,11 +29,59 @@ class SearchTest(TestCase):
                 f.write(json.dumps(file) + "\n")
 
         self.search.import_json("tmp.json", 123)
-        time.sleep(3)
-        self.assertEqual(3, self.search.es.count(self.search.index_name, "file")["count"])
+        time.sleep(2)
+        self.assertEqual(4, self.search.es.count(self.search.index_name, "file")["count"])
+
+        # Search for 'pan' in PaNopTicon and expect 1 result, a scroll id, and an highlight
+        page = self.search.search("pan")
+        self.assertIsNotNone(page["_scroll_id"])
+        self.assertEqual(1, page["hits"]["total"])
+        self.assertIsNotNone(page["hits"]["hits"][0]["highlight"]["name"])
+
+        # Search for 'park' and expect BLAckwAter.Park
+        page = self.search.search("park")
+        self.assertEqual(1, page["hits"]["total"])
+
+        # Search for fla and expect Dead Racer
+        page = self.search.search("fla")
+        self.assertEqual(1, page["hits"]["total"])
+
+        # Search for 10'000 and expect 10'000 days
+        page = self.search.search("10'000")
+        self.assertEqual(1, page["hits"]["total"])
 
         os.remove("tmp.json")
 
+    def test_scroll(self):
+        files = [
+            {"name": "PaNopTicon", "size": 1000000000000000000, "path": "c/d", "mtime": 1528765672},
+            {"name": "BLAckwAter.Park", "size": 123, "path": "", "mtime": None},
+            {"name": "10'000 days", "size": -1, "path": "c", "mtime": 12345},
+            {"name": "Dead Racer", "size": 1000, "path": "Speed Machine [FLAC]", "mtime": 12345}
+        ]
+        with open("tmp.json", "w") as f:
+            for file in files:
+                f.write(json.dumps(file) + "\n")
+        self.search.import_json("tmp.json", 123)
+        time.sleep(2)
+
+        page = self.search.search("")
+        scroll_id = page["_scroll_id"]
+
+        # next page
+        next_page = self.search.scroll(scroll_id)
+        next_scroll_id = next_page["_scroll_id"]
+        self.assertIsNotNone(next_scroll_id)
+
+        # again
+        next_page2 = self.search.scroll(next_scroll_id)
+        self.assertIsNotNone(next_page2["_scroll_id"])
+
+    def test_invalid_scroll(self):
+
+        invalid_scroll = "blahblah"
+
+        self.assertIsNone(self.search.scroll(invalid_scroll))
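
Usage note (not part of the patch): the new search() returns the raw Elasticsearch response dict for the first page and opens a scroll context, while scroll() fetches subsequent pages and returns None when the scroll id is invalid or expired. A minimal sketch of how a caller might page through results, assuming the search.search import path and an "od-database" index name (both illustrative):

    from search.search import ElasticSearchEngine

    search = ElasticSearchEngine("od-database")  # index name is an assumption

    page = search.search("flac")  # first page (up to 40 hits) plus a scroll id
    while page is not None and page["hits"]["hits"]:
        for hit in page["hits"]["hits"]:
            print(hit["_source"]["name"])
        # scroll() returns None on TransportError (invalid or expired scroll id)
        page = search.scroll(page["_scroll_id"])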