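"""Master-side task management for od-database.

Submits crawl tasks to a task_tracker project, allocates ws_bucket upload
slots for crawler result files, and runs indexer threads that download
completed result files and import them into Elasticsearch.
"""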

import json
import logging
import os
import time
import traceback
from multiprocessing.pool import ThreadPool
from tempfile import NamedTemporaryFile
from threading import Thread
from uuid import uuid4

import requests
import urllib3

import config
import database
from database import Website
from search.search import ElasticSearchEngine
from task_tracker_drone.src.tt_drone.api import TaskTrackerApi, Worker
from ws_bucket_client.api import WsBucketApi

urllib3.disable_warnings()

logger = logging.getLogger("default")


class Task:
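    """A single crawl task for one website, serializable as a task_tracker
    recipe. callback_args is stored as a dict and serialized back to JSON
    in to_json()."""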

    def __init__(self, website_id: int, url: str, priority: int = 1,
                 callback_type: str = None, callback_args: str = None,
                 upload_token: str = None):
        self.website_id = website_id
        self.url = url
        self.priority = priority
        self.callback_type = callback_type
        self.callback_args = json.loads(callback_args) if callback_args else {}
        self.upload_token = upload_token

    def to_json(self):
        return {
            "website_id": self.website_id,
            "url": self.url,
            "callback_type": self.callback_type,
            "callback_args": json.dumps(self.callback_args),
            "upload_token": self.upload_token
        }

    def __str__(self):
        return json.dumps(self.to_json())

    def __repr__(self):
        return self.__str__()


class IndexingTask:
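    """Describes a crawl result file waiting to be indexed."""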

    def __init__(self, website_id: int, file_path: str, callback_type: str, callback_args):
        self.website_id = website_id
        self.file_path = file_path
        self.callback_type = callback_type
        self.callback_args = callback_args


class TaskManager:
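    """Coordinates the crawl/index pipeline: queues crawl tasks, allocates
    upload buckets, and indexes completed result files into Elasticsearch."""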

    def __init__(self):
        self.search = ElasticSearchEngine(config.ES_URL, config.ES_INDEX)
        self.db = database.Database(config.DB_CONN_STR)
        self.tracker = TaskTrackerApi(config.TT_API)

        self.bucket = WsBucketApi(config.WSB_API, config.WSB_SECRET)
        self._indexer_threads = list()

        # Reuse a persisted tracker worker if one exists; otherwise register
        # a new one and request access to the crawl and index projects.
        self.worker = Worker.from_file(self.tracker)
        if not self.worker:
            self.worker = self.tracker.make_worker("$oddb_master")
            if not self.worker:
                logger.error("Could not create worker: %s" % traceback.format_exc())
                return
            self.worker.dump_to_file()
            self.worker.request_access(config.TT_CRAWL_PROJECT, False, True)
            self.worker.request_access(config.TT_INDEX_PROJECT, True, False)

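    # Indexer threads: each one polls the index project for completed crawl
    # results and imports them (see _do_indexing below).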
    def start_indexer_threads(self):
        logger.info("Starting %d indexer threads" % (config.INDEXER_THREADS,))
        for _ in range(config.INDEXER_THREADS):
            t = Thread(target=self._do_indexing)
            t.daemon = True
            self._indexer_threads.append(t)
            t.start()

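    # Worker loop: poll the index project for a task, download the crawler's
    # result file from its ws_bucket slot, import it, then release the task.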
    def _do_indexing(self):

        while True:
            task = self.worker.fetch_task(project_id=config.TT_INDEX_PROJECT)

            if task:
                try:
                    recipe = task.json_recipe()
                    logger.debug("Got indexing task: " + str(recipe))

                    filename = download_file(config.WSB_API + "/slot?token=" + recipe["upload_token"])

                    self._complete_task(filename, Task(recipe["website_id"], recipe["url"]))
                except Exception:
                    logger.error("Indexing task failed: %s" % traceback.format_exc())
                    # Report failure to the tracker.
                    self.worker.release_task(task_id=task.id, result=1, verification=0)
                else:
                    # Report success to the tracker.
                    self.worker.release_task(task_id=task.id, result=0, verification=0)
            else:
                time.sleep(5)

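    # Replace the website's documents in the index with the freshly crawled
    # file list, then delete the temporary file.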
    def _complete_task(self, file_list, task):

        self.search.delete_docs(task.website_id)

        if file_list:
            def iter_lines():
                with open(file_list, "r") as f:
                    for line in f:
                        yield line

            self.search.import_json(iter_lines(), task.website_id)
            os.remove(file_list)

        self.db.update_website_date_if_exists(task.website_id)

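    # Re-crawl scheduling: pick the least recently updated websites and queue
    # a crawl task for each, prioritized by staleness.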
    def do_recrawl(self):
        logger.debug("Creating re-crawl tasks")
        self._generate_crawling_tasks()

    def _generate_crawling_tasks(self):

        # TODO: Insert more in-depth re-crawl logic here
        websites_to_crawl = self.db.get_oldest_updated_websites(config.RECRAWL_POOL_SIZE, prefix="http")

        def recrawl(website: Website):
            crawl_task = Task(website.id, website.url,
                              priority=int((time.time() - website.last_modified.timestamp()) / 3600))
            self.queue_task(crawl_task)

        pool = ThreadPool(processes=30)
        pool.map(func=recrawl, iterable=websites_to_crawl)
        pool.close()

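    # Queueing a crawl is two steps: submit the recipe to the tracker, then
    # allocate a ws_bucket upload slot the crawler will write its results to.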
    def queue_task(self, task: Task):
        max_assign_time = 24 * 4 * 3600  # 4 days, in seconds
        upload_token = str(uuid4())

        task.upload_token = upload_token
        tracker_response = self.worker.submit_task(config.TT_CRAWL_PROJECT,
                                                   recipe=str(task),
                                                   priority=task.priority,
                                                   max_assign_time=max_assign_time,
                                                   hash64=task.website_id,
                                                   verification_count=1,
                                                   max_retries=3)
        logger.info("Queued task and made it available to crawlers: t=%s, r=%s" % (task, tracker_response.text))
        if not tracker_response.json()["ok"]:
            return

        bucket_response = self.bucket.allocate(upload_token,
                                               21474837499,  # ~20 GiB
                                               format_file_name(task.website_id, upload_token),
                                               to_dispose_date=int(time.time() + max_assign_time),
                                               upload_hook="")
        logger.info("Allocated upload bucket: %d, t=%s, r=%s" % (task.website_id, upload_token, bucket_response.text))


def format_file_name(website_id, token):
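    """Name the result file after the website id and its upload token."""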
    return "%d_%s.NDJSON" % (website_id, token)


def download_file(url):
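    """Stream url into a temporary file and return its path; the caller is
    responsible for deleting the file."""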
    r = requests.get(url, stream=True)

    if r.status_code != 200:
        raise ValueError("HTTP error %d: %s" % (r.status_code, url))

    tmp = NamedTemporaryFile(delete=False)
    for chunk in r.iter_content(chunk_size=4096):
        if chunk:  # skip keep-alive chunks
            tmp.write(chunk)
    tmp.close()

    return tmp.name
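
# Minimal usage sketch (hypothetical entry point; the real one lives elsewhere
# in od-database, and config.RECRAWL_INTERVAL below is an assumed setting):
#
#   task_manager = TaskManager()
#   task_manager.start_indexer_threads()
#   while True:
#       task_manager.do_recrawl()
#       time.sleep(config.RECRAWL_INTERVAL)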