mirror of
https://github.com/simon987/od-database.git
synced 2025-12-18 00:59:03 +00:00
docker-compose setup (wip)
This commit is contained in:
26
tasks.py
26
tasks.py
@@ -3,9 +3,11 @@ import logging
|
||||
import os
|
||||
import time
|
||||
from multiprocessing.pool import ThreadPool
|
||||
from tempfile import NamedTemporaryFile
|
||||
from threading import Thread
|
||||
from uuid import uuid4
|
||||
|
||||
import requests
|
||||
import urllib3
|
||||
|
||||
import config
|
||||
@@ -60,13 +62,13 @@ class IndexingTask:
|
||||
class TaskManager:
|
||||
|
||||
def __init__(self):
|
||||
self.search = ElasticSearchEngine("od-database")
|
||||
self.search = ElasticSearchEngine(config.ES_URL, config.ES_INDEX)
|
||||
self.db = database.Database(config.DB_CONN_STR)
|
||||
self.tracker = TaskTrackerApi(config.TT_API)
|
||||
|
||||
self.worker = Worker.from_file(self.tracker)
|
||||
if not self.worker:
|
||||
self.worker = self.tracker.make_worker("oddb_master")
|
||||
self.worker = self.tracker.make_worker("$oddb_master")
|
||||
self.worker.dump_to_file()
|
||||
self.worker.request_access(config.TT_CRAWL_PROJECT, False, True)
|
||||
self.worker.request_access(config.TT_INDEX_PROJECT, True, False)
|
||||
@@ -91,8 +93,9 @@ class TaskManager:
|
||||
try:
|
||||
recipe = task.json_recipe()
|
||||
logger.debug("Got indexing task: " + str(recipe))
|
||||
filename = os.path.join(config.WSB_PATH,
|
||||
format_file_name(recipe["website_id"], recipe["upload_token"]))
|
||||
|
||||
filename = download_file(config.WSB_API + "/slot?token=" + recipe["upload_token"])
|
||||
|
||||
self._complete_task(filename, Task(recipe["website_id"], recipe["url"]))
|
||||
except Exception as e:
|
||||
self.worker.release_task(task_id=task.id, result=1, verification=0)
|
||||
@@ -167,3 +170,18 @@ class TaskManager:
|
||||
|
||||
def format_file_name(website_id, token):
    """Build the NDJSON file name for a website's uploaded file list.

    :param website_id: Numeric website identifier, rendered with ``%d``.
    :param token: Upload token, rendered with ``%s``.
    :return: A name of the form ``<website_id>_<token>.NDJSON``.
    """
    template = "%d_%s.NDJSON"
    parts = (website_id, token)
    return template % parts
|
||||
|
||||
|
||||
def download_file(url):
    """Download *url* into a temporary file and return that file's path.

    The response body is streamed to disk in 4096-byte chunks. The temporary
    file is created with ``delete=False``, so it outlives this call and the
    caller is responsible for removing it once it has been consumed.

    :param url: URL to fetch with an HTTP GET request.
    :return: Path of the temporary file holding the response body.
    :raises ValueError: If the server answers with a status other than 200.
    """
    r = requests.get(url, stream=True)
    try:
        if r.status_code != 200:
            raise ValueError("HTTP error %d: %s" % (r.status_code, url))

        tmp = NamedTemporaryFile(delete=False)
        try:
            # The with-block closes the file handle on success and on error.
            with tmp:
                for chunk in r.iter_content(chunk_size=4096):
                    if chunk:
                        tmp.write(chunk)
        except BaseException:
            # Nobody will ever receive the path of a partially written file,
            # so remove it here rather than leaking it in the temp directory.
            os.remove(tmp.name)
            raise
    finally:
        # With stream=True the connection stays checked out until the
        # response is closed; release it on every exit path.
        r.close()

    return tmp.name
|
||||
|
||||
Reference in New Issue
Block a user