initial commit

This commit is contained in:
simon987 2019-02-18 22:00:02 -05:00
commit 8c5f2e0f77
6 changed files with 354 additions and 0 deletions

4
.gitignore vendored Normal file
View File

@ -0,0 +1,4 @@
.idea/
work/
worker.json

6
.idea/vcs.xml generated Normal file
View File

@ -0,0 +1,6 @@
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="VcsDirectoryMappings">
<mapping directory="$PROJECT_DIR$" vcs="Git" />
</component>
</project>

Binary file not shown.

230
api.py Normal file
View File

@ -0,0 +1,230 @@
import base64
import hashlib
import json
import requests
import hmac
API_TIMEOUT = 5
MAX_HTTP_RETRIES = 2
VERSION = 1.0
LOG_TRACE = 7
LOG_DEBUG = 6
LOG_INFO = 5
LOG_WARN = 4
LOG_ERROR = 3
LOG_PANIC = 2
LOG_FATAL = 1
class Project:
def __init__(self, json_obj):
self.id: int = json_obj["id"]
self.priority: int = json_obj["priority"]
self.name: str = json_obj["name"]
self.clone_url: str = json_obj["clone_url"]
self.git_repo: str = json_obj["git_repo"]
self.version: str = json_obj["version"]
self.motd: str = json_obj["motd"]
self.public: bool = json_obj["public"]
self.secret: str
def toJSON(self):
return json.dumps({
"id": self.id, "priority": self.priority, "name": self.name,
"clone_url": self.clone_url, "git_repo": self.git_repo,
"version": self.version, "motd": self.motd, "public": self.public,
})
class Task:
def __init__(self, json_obj):
self.id: int = json_obj["id"]
self.priority: int = json_obj["priority"]
self.project: Project = Project(json_obj["project"])
self.retries: int = json_obj["retries"]
self.max_retries: int = json_obj["max_retries"]
self.status: int = json_obj["status"]
self.recipe: str = json_obj["recipe"]
self.max_assign_time: int = json_obj["max_assign_time"]
self.assign_time: int = json_obj["assign_time"]
self.verification_count: int = json_obj["verification_count"]
def toJSON(self):
return json.dumps({
"id": self.id, "priority": self.priority,
"project": self.project.toJSON(), "retries": self.retries,
"max_retries": self.max_retries, "status": self.status,
"recipe": self.recipe, "max_assign_time": self.max_assign_time,
"verification_count": self.verification_count,
})
class Worker:
def __init__(self, wid=None, alias=None, secret=None, api=None):
self.id: int = wid
self.alias: str = alias
self.secret: bytes = base64.b64decode(secret)
self._secret_b64 = secret
self._api: TaskTrackerApi = api
def fetch_task(self, project_id=None):
return self._api.fetch_task(self)
def submit_task(self, project, recipe, priority=1, max_assign_time=3600, hash64=0, unique_str="",
verification_count=1, max_retries=3):
return self._api.submit_task(self, project, recipe, priority, max_assign_time, hash64, unique_str,
verification_count, max_retries)
def release_task(self, task_id: int, result: int, verification):
return self._api.release_task(self, task_id, result, verification)
def log(self, level: int, message: str, timestamp: int, scope: str):
return self._api.log(self, level, message, timestamp, scope)
def request_access(self, project: int, assign=True, submit=True):
return self._api.request_access(self, project, assign, submit)
def get_secret(self, project: int):
return self._api.get_secret(self, project)
def dump_to_file(self):
with open("worker.json", "w") as out:
json.dump({
"id": self.id,
"alias": self.alias,
"secret": self._secret_b64
}, out)
@staticmethod
def from_file(api):
with open("worker.json", "r") as f:
obj = json.load(f)
return Worker(wid=obj["id"], alias=obj["alias"],
secret=obj["secret"], api=api)
def format_headers(ua: str = None, wid: int = None, signature: str = None):
headers = dict()
if ua is None:
headers["User-Agent"] = "tt_py_client" + str(VERSION)
else:
headers["User-Agent"] = ua
headers["X-Worker-Id"] = str(wid)
headers["X-Signature"] = str(signature)
return headers
class TaskTrackerApi:
def __init__(self, url: str):
self.url = url
def make_worker(self, alias) -> Worker:
response = self._http_post("/worker/create", body={"alias": alias})
if response:
json_response = json.loads(response.text)
print(response.text)
if response.status_code != 200:
raise Exception(json_response["message"])
worker = Worker(json_response["content"]["worker"]["id"], json_response["content"]["worker"]["alias"],
json_response["content"]["worker"]["secret"], self)
return worker
def fetch_task(self, worker: Worker) -> Task:
response = self._http_get("/task/get", worker)
if response:
json_response = json.loads(response.text)
if json_response["ok"]:
return Task(json_response["content"]["task"])
return None
def submit_task(self, worker: Worker, project, recipe, priority, max_assign_time, hash64, unique_str,
verification_count, max_retries):
return self._http_post("/task/submit", {
"project": project,
"recipe": recipe,
"priority": priority,
"max_assign_time": max_assign_time,
"hash_u64": hash64,
"unique_str": unique_str,
"verification_count": verification_count,
"max_retries": max_retries,
}, worker)
def log(self, worker: Worker, level: int, message: str, timestamp: int, scope: str):
if level == LOG_TRACE:
return self._http_post("/log/trace", {"level": level, "message": message, "timestamp": timestamp, "scope": scope}, worker)
if level == LOG_INFO:
return self._http_post("/log/info", {"level": level, "message": message, "timestamp": timestamp, "scope": scope}, worker)
if level == LOG_WARN:
return self._http_post("/log/warn", {"level": level, "message": message, "timestamp": timestamp, "scope": scope}, worker)
if level == LOG_ERROR:
return self._http_post("/log/error", {"level": level, "message": message, "timestamp": timestamp, "scope": scope}, worker)
print("Invalid log level")
def release_task(self, worker: Worker, task_id: int, result: int, verification: int):
return self._http_post("/task/release", {
"task_id": task_id,
"result": result,
"verification": verification
}, worker)
def request_access(self, worker: Worker, project: int, assign:bool, submit:bool):
return self._http_post("/project/request_access", {
"project": project,
"assign": assign,
"submit": submit,
}, worker)
def get_secret(self, worker: Worker, project: int):
r = self._http_get("/project/secret/" + str(project), worker)
if r.status_code == 200:
return json.loads(r.text)["content"]["secret"]
def _http_get(self, endpoint: str, worker: Worker = None):
if worker is not None:
signature = hmac.new(key=worker.secret, msg=endpoint.encode("utf8"), digestmod=hashlib.sha256).hexdigest()
headers = format_headers(signature=signature, wid=worker.id)
else:
headers = format_headers()
retries = 0
while retries < MAX_HTTP_RETRIES:
try:
response = requests.get(self.url + endpoint, timeout=API_TIMEOUT,
headers=headers)
return response
except:
retries += 1
pass
return None
def _http_post(self, endpoint: str, body, worker: Worker = None):
body_bytes = json.dumps(body).encode("utf8")
if worker is not None:
signature = hmac.new(key=worker.secret, msg=body_bytes, digestmod=hashlib.sha256).hexdigest()
headers = format_headers(signature=signature, wid=worker.id)
else:
headers = format_headers()
retries = 0
while retries < MAX_HTTP_RETRIES:
try:
response = requests.post(self.url + endpoint, timeout=API_TIMEOUT,
headers=headers, data=body_bytes)
return response
except:
retries += 1
pass
return None

2
run.py Normal file
View File

@ -0,0 +1,2 @@
#!/bin/env python

112
worker.py Normal file
View File

@ -0,0 +1,112 @@
import datetime
import json
import os
import shutil
import subprocess
from subprocess import Popen
from api import Project, Worker, TaskTrackerApi, Task
class WorkerContext:
def _format_project_path(self, project: Project):
return "work/%s/%d_%s" % (self._ctx_name, project.id, project.version,)
def __init__(self, worker: Worker, ctx_name):
self._worker = worker
self._projects = dict()
self._ctx_name = ctx_name
def _deploy_project(self, project: Project):
project.secret = self._worker.get_secret(project.id)
print("Deploying project " + project.name)
path = self._format_project_path(project)
if os.path.exists(path):
shutil.rmtree(path)
os.makedirs(path, exist_ok=True)
proc = Popen(args=["git", "clone", project.clone_url, path])
proc.wait()
if project.version:
proc = Popen(args=["git", "checkout", project.version], cwd=os.path.abspath(path))
proc.wait()
if os.path.exists(os.path.join(path, "setup")):
proc = Popen(args=["./setup", ], cwd=os.path.abspath(path))
proc.wait()
self._projects[project.id] = project
def _get_project_path(self, project: Project):
if project.id not in self._projects or self._projects[project.id].version != project.version:
self._deploy_project(project)
return self._format_project_path(project)
def execute_task(self, task: Task):
path = self._get_project_path(task.project)
if os.path.exists(os.path.join(path, "run")):
proc = Popen(args=["./run", task.toJSON(), self._projects[task.project.id].secret],
stdout=subprocess.PIPE, cwd=os.path.abspath(path))
result = proc.communicate()[0].decode("utf8")
try:
json_result = json.loads(result)
self._do_post_task_hooks(json_result)
print(self._worker.release_task(task.id,
json_result["result"],
json_result["verification"] if "verification" in json_result else 0).text)
except Exception as e:
print(e)
return
def _do_post_task_hooks(self, res):
if "logs" in res:
for log in res["logs"]:
r = self._worker.log(log["level"] if "level" in log else 7,
log["message"],
log.get("timestamp", int(datetime.datetime.utcnow().timestamp())),
log.get("scope", "tt_py_client"))
print("LOG: %s <%d>" % (log, r.status_code))
if "tasks" in res:
for task in res["tasks"]:
r = self._worker.submit_task(task["project"],
task["recipe"],
task.get("priority"),
task.get("max_assign_time"),
task.get("hash64"),
task.get("unique_str"),
task.get("verification_count"),
task.get("max_retries"))
print("SUBMIT: %s <%d>" % (task, r.status_code))
api = TaskTrackerApi("http://localhost:42901")
# w = api.make_worker("python_tt")
# w.dump_to_file()
w1 = Worker.from_file(api)
# print(w1.request_access(1, True, True).text)
# def submit(i):
# w1.submit_task(project=1, recipe=json.dumps({
# "tid": str(i),
# }), hash64=i)
# pool = multiprocessing.Pool(processes=100)
# pool.map(submit, range(0, 500000))
# pool.join()
# print(t.toJSON())
t = w1.fetch_task()
ctx = WorkerContext(w1, "main")
ctx.execute_task(t)