Indexing after crawling is a bit more efficient

Simon 2018-06-14 16:41:43 -04:00
parent 83ca579ec7
commit dffd032659
5 changed files with 21 additions and 30 deletions
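The gist of the diff below: the crawl server no longer reads each result file into memory and returns it as one big string; it serves the file with Flask's send_file, the dispatcher streams it with requests (stream=True plus iter_lines), and the indexer decodes one JSON document per line with ujson instead of json. A minimal sketch of that flow, assuming a /file_list/<website_id>/ route and placeholder names for anything not shown in the diff:

# Crawl server side: stream the crawl result straight from disk.
from flask import Flask, send_file, abort
import os

app = Flask(__name__)

@app.route("/file_list/<int:website_id>/")      # route pattern assumed for illustration
def get_file_list(website_id):
    file_name = "./crawled/" + str(website_id) + ".json"
    if os.path.exists(file_name):
        return send_file(file_name)             # no read() into memory, no string round-trip
    return abort(404)

# Dispatcher side: iterate the response line by line and parse lazily.
import requests
import ujson

def fetch_website_files(base_url, website_id, headers=None):
    r = requests.get(base_url + "/file_list/" + str(website_id) + "/",
                     stream=True, headers=headers)
    for line in r.iter_lines(chunk_size=1024 * 256):
        if line:                                # iter_lines can yield empty keep-alive lines
            yield ujson.loads(line)             # ujson swapped in for json, one document per line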

View File

@@ -1,4 +1,4 @@
-from flask import Flask, request, abort, Response
+from flask import Flask, request, abort, Response, send_file
 from flask_httpauth import HTTPTokenAuth
 import json
 from crawl_server.task_manager import TaskManager, Task
@@ -14,7 +14,6 @@ tm = TaskManager("tm_db.sqlite3", 8)
 @auth.verify_token
 def verify_token(token):
-    print(token)
     if token in tokens:
         return True
@@ -68,15 +67,10 @@ def get_file_list(website_id):
     file_name = "./crawled/" + str(website_id) + ".json"
     if os.path.exists(file_name):
-        with open(file_name, "r") as f:
-            file_list = f.read()
-        os.remove(file_name)
-        return file_list
+        return send_file(file_name)
     else:
         return abort(404)

 if __name__ == "__main__":
-    app.run(port=5002)
+    app.run(port=5001)
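The token check in the verify_token hunk above is also why every dispatcher request in task.py (further down) now passes headers=CrawlServer.headers. A rough sketch of that handshake, where the scheme name, token value and route are placeholder assumptions (the diff does not show how CrawlServer.headers is built):

from flask import Flask
from flask_httpauth import HTTPTokenAuth

app = Flask(__name__)
auth = HTTPTokenAuth(scheme="Token")   # scheme assumed; not shown in this diff
tokens = {"secret-crawl-token"}        # placeholder token store

@auth.verify_token
def verify_token(token):
    # Same check as in the hunk above, minus the debug print this commit removes
    return token in tokens

@app.route("/task/completed")
@auth.login_required
def get_completed_tasks():
    return "[]"

# Dispatcher side: the matching header must accompany every request, e.g.
# requests.get("http://localhost:5001/task/completed",
#              headers={"Authorization": "Token secret-crawl-token"})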

View File

@@ -13,4 +13,4 @@ ftputil
 lxml
 elasticsearch
 python-dateutil
-flask_httpauth
+flask_httpauth

View File

@@ -1,7 +1,7 @@
 import elasticsearch
 from elasticsearch import helpers
 import os
-import json
+import ujson

 class IndexingError(Exception):
@@ -90,17 +90,14 @@ class ElasticSearchEngine(SearchEngine):
     def ping(self):
         return self.es.ping()

-    def import_json(self, in_str: str, website_id: int):
+    def import_json(self, in_lines, website_id: int):
-        if not in_str:
-            return
-        import_every = 5000
+        import_every = 25000
         docs = []
-        for line in in_str.splitlines():
-            doc = json.loads(line)
+        for line in in_lines:
+            doc = ujson.loads(line)
             name, ext = os.path.splitext(doc["name"])
             doc["ext"] = ext[1:].lower() if ext and len(ext) > 1 else ""
             doc["name"] = name
@@ -125,7 +122,7 @@ class ElasticSearchEngine(SearchEngine):
     def create_bulk_index_string(docs: list):
         action_string = '{"index":{}}\n'
-        return "\n".join("".join([action_string, json.dumps(doc)]) for doc in docs)
+        return "\n".join("".join([action_string, ujson.dumps(doc)]) for doc in docs)

     def search(self, query, page, per_page, sort_order) -> {}:
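For context, the string built by create_bulk_index_string follows the Elasticsearch bulk API convention of one action line per document followed by that document's JSON source; only the serializer changes in this hunk (ujson for json). A small sketch of the output, where the sample documents and the commented es.bulk call are illustrative assumptions:

import ujson

def create_bulk_index_string(docs: list):
    # Each document's JSON is preceded by an '{"index":{}}' action line.
    action_string = '{"index":{}}\n'
    return "\n".join("".join([action_string, ujson.dumps(doc)]) for doc in docs)

docs = [{"name": "a", "ext": "txt"}, {"name": "b", "ext": "png"}]
print(create_bulk_index_string(docs))
# {"index":{}}
# {"name":"a","ext":"txt"}
# {"index":{}}
# {"name":"b","ext":"png"}

# Presumably handed to the bulk endpoint (bulk bodies need a trailing newline), e.g.:
# es = elasticsearch.Elasticsearch()
# es.bulk(body=create_bulk_index_string(docs) + "\n", index="od-database")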

View File

@@ -51,8 +51,8 @@ def get_random_file():
     doc = dict()
     doc["name"] = random_file_name()
     doc["path"] = random_path()
-    doc["mtime"] = random.randint(0, 10000000)
-    doc["size"] = random.randint(-1, 100000000000000)
+    doc["mtime"] = random.randint(0, 1000000000000)
+    doc["size"] = random.randint(-1, 1000000000)

     return doc
@@ -80,8 +80,7 @@ def random_searches(count=10000000, max_workers=1000):
         pool.map(search, random.choices(terms, k=count))

 # dump_local_filesystem("/mnt/")
 # index_file_list("crawl_server/crawled/123.json", 10)
 # index_file_list("random_dump.json", 1000)
 # random_searches(100000)
-dump_random_files(20000 * 100000)
+# dump_random_files(20000 * 100000)

task.py
View File

@@ -31,7 +31,7 @@ class CrawlServer:
     def fetch_completed_tasks(self) -> list:
         try:
-            r = requests.get(self.url + "/task/completed")
+            r = requests.get(self.url + "/task/completed", headers=CrawlServer.headers)
             return [
                 TaskResult(r["status_code"], r["file_count"], r["start_time"], r["end_time"], r["website_id"])
                 for r in json.loads(r.text)]
@@ -42,7 +42,7 @@ class CrawlServer:
     def fetch_queued_tasks(self) -> list:
         try:
-            r = requests.get(self.url + "/task/")
+            r = requests.get(self.url + "/task/", headers=CrawlServer.headers)
             return [
                 Task(t["website_id"], t["url"], t["priority"], t["callback_type"], t["callback_args"])
                 for t in json.loads(r.text)
@@ -53,7 +53,7 @@ class CrawlServer:
     def fetch_current_tasks(self):
         try:
-            r = requests.get(self.url + "/task/current")
+            r = requests.get(self.url + "/task/current", headers=CrawlServer.headers)
             return [
                 Task(t["website_id"], t["url"], t["priority"], t["callback_type"], t["callback_args"])
                 for t in json.loads(r.text)
@@ -64,8 +64,9 @@ class CrawlServer:
     def fetch_website_files(self, website_id) -> str:
         try:
-            r = requests.get(self.url + "/file_list/" + str(website_id) + "/")
-            return r.text if r.status_code == 200 else ""
+            r = requests.get(self.url + "/file_list/" + str(website_id) + "/", stream=True, headers=CrawlServer.headers)
+            for line in r.iter_lines(chunk_size=1024 * 256):
+                yield line
         except ConnectionError:
             return ""
@@ -74,7 +75,7 @@ class TaskDispatcher:
     def __init__(self):
         scheduler = BackgroundScheduler()
-        scheduler.add_job(self.check_completed_tasks, "interval", seconds=1)
+        scheduler.add_job(self.check_completed_tasks, "interval", seconds=10)
         scheduler.start()

         self.search = ElasticSearchEngine("od-database")