commit 8c5f2e0f7788f57eef766d7edb2e1adf8a9fa0c8 Author: simon987 Date: Mon Feb 18 22:00:02 2019 -0500 initial commit diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..5f3ae1a --- /dev/null +++ b/.gitignore @@ -0,0 +1,4 @@ +.idea/ +work/ +worker.json + diff --git a/.idea/vcs.xml b/.idea/vcs.xml new file mode 100644 index 0000000..94a25f7 --- /dev/null +++ b/.idea/vcs.xml @@ -0,0 +1,6 @@ + + + + + + \ No newline at end of file diff --git a/__pycache__/api.cpython-37.pyc b/__pycache__/api.cpython-37.pyc new file mode 100644 index 0000000..2340eae Binary files /dev/null and b/__pycache__/api.cpython-37.pyc differ diff --git a/api.py b/api.py new file mode 100644 index 0000000..a36545a --- /dev/null +++ b/api.py @@ -0,0 +1,230 @@ +import base64 +import hashlib +import json + +import requests +import hmac + +API_TIMEOUT = 5 +MAX_HTTP_RETRIES = 2 +VERSION = 1.0 + +LOG_TRACE = 7 +LOG_DEBUG = 6 +LOG_INFO = 5 +LOG_WARN = 4 +LOG_ERROR = 3 +LOG_PANIC = 2 +LOG_FATAL = 1 + + +class Project: + def __init__(self, json_obj): + self.id: int = json_obj["id"] + self.priority: int = json_obj["priority"] + self.name: str = json_obj["name"] + self.clone_url: str = json_obj["clone_url"] + self.git_repo: str = json_obj["git_repo"] + self.version: str = json_obj["version"] + self.motd: str = json_obj["motd"] + self.public: bool = json_obj["public"] + self.secret: str + + def toJSON(self): + return json.dumps({ + "id": self.id, "priority": self.priority, "name": self.name, + "clone_url": self.clone_url, "git_repo": self.git_repo, + "version": self.version, "motd": self.motd, "public": self.public, + }) + + +class Task: + def __init__(self, json_obj): + self.id: int = json_obj["id"] + self.priority: int = json_obj["priority"] + self.project: Project = Project(json_obj["project"]) + self.retries: int = json_obj["retries"] + self.max_retries: int = json_obj["max_retries"] + self.status: int = json_obj["status"] + self.recipe: str = json_obj["recipe"] + self.max_assign_time: int = json_obj["max_assign_time"] + self.assign_time: int = json_obj["assign_time"] + self.verification_count: int = json_obj["verification_count"] + + def toJSON(self): + return json.dumps({ + "id": self.id, "priority": self.priority, + "project": self.project.toJSON(), "retries": self.retries, + "max_retries": self.max_retries, "status": self.status, + "recipe": self.recipe, "max_assign_time": self.max_assign_time, + "verification_count": self.verification_count, + }) + + +class Worker: + def __init__(self, wid=None, alias=None, secret=None, api=None): + self.id: int = wid + self.alias: str = alias + self.secret: bytes = base64.b64decode(secret) + self._secret_b64 = secret + self._api: TaskTrackerApi = api + + def fetch_task(self, project_id=None): + return self._api.fetch_task(self) + + def submit_task(self, project, recipe, priority=1, max_assign_time=3600, hash64=0, unique_str="", + verification_count=1, max_retries=3): + return self._api.submit_task(self, project, recipe, priority, max_assign_time, hash64, unique_str, + verification_count, max_retries) + + def release_task(self, task_id: int, result: int, verification): + return self._api.release_task(self, task_id, result, verification) + + def log(self, level: int, message: str, timestamp: int, scope: str): + return self._api.log(self, level, message, timestamp, scope) + + def request_access(self, project: int, assign=True, submit=True): + return self._api.request_access(self, project, assign, submit) + + def get_secret(self, project: int): + return self._api.get_secret(self, project) + + def dump_to_file(self): + with open("worker.json", "w") as out: + json.dump({ + "id": self.id, + "alias": self.alias, + "secret": self._secret_b64 + }, out) + + @staticmethod + def from_file(api): + with open("worker.json", "r") as f: + obj = json.load(f) + return Worker(wid=obj["id"], alias=obj["alias"], + secret=obj["secret"], api=api) + + +def format_headers(ua: str = None, wid: int = None, signature: str = None): + headers = dict() + + if ua is None: + headers["User-Agent"] = "tt_py_client" + str(VERSION) + else: + headers["User-Agent"] = ua + + headers["X-Worker-Id"] = str(wid) + headers["X-Signature"] = str(signature) + + return headers + + +class TaskTrackerApi: + def __init__(self, url: str): + self.url = url + + def make_worker(self, alias) -> Worker: + + response = self._http_post("/worker/create", body={"alias": alias}) + if response: + json_response = json.loads(response.text) + print(response.text) + + if response.status_code != 200: + raise Exception(json_response["message"]) + + worker = Worker(json_response["content"]["worker"]["id"], json_response["content"]["worker"]["alias"], + json_response["content"]["worker"]["secret"], self) + return worker + + def fetch_task(self, worker: Worker) -> Task: + response = self._http_get("/task/get", worker) + if response: + json_response = json.loads(response.text) + if json_response["ok"]: + return Task(json_response["content"]["task"]) + return None + + def submit_task(self, worker: Worker, project, recipe, priority, max_assign_time, hash64, unique_str, + verification_count, max_retries): + + return self._http_post("/task/submit", { + "project": project, + "recipe": recipe, + "priority": priority, + "max_assign_time": max_assign_time, + "hash_u64": hash64, + "unique_str": unique_str, + "verification_count": verification_count, + "max_retries": max_retries, + }, worker) + + def log(self, worker: Worker, level: int, message: str, timestamp: int, scope: str): + if level == LOG_TRACE: + return self._http_post("/log/trace", {"level": level, "message": message, "timestamp": timestamp, "scope": scope}, worker) + if level == LOG_INFO: + return self._http_post("/log/info", {"level": level, "message": message, "timestamp": timestamp, "scope": scope}, worker) + if level == LOG_WARN: + return self._http_post("/log/warn", {"level": level, "message": message, "timestamp": timestamp, "scope": scope}, worker) + if level == LOG_ERROR: + return self._http_post("/log/error", {"level": level, "message": message, "timestamp": timestamp, "scope": scope}, worker) + + print("Invalid log level") + + def release_task(self, worker: Worker, task_id: int, result: int, verification: int): + return self._http_post("/task/release", { + "task_id": task_id, + "result": result, + "verification": verification + }, worker) + + def request_access(self, worker: Worker, project: int, assign:bool, submit:bool): + return self._http_post("/project/request_access", { + "project": project, + "assign": assign, + "submit": submit, + }, worker) + + def get_secret(self, worker: Worker, project: int): + r = self._http_get("/project/secret/" + str(project), worker) + if r.status_code == 200: + return json.loads(r.text)["content"]["secret"] + + def _http_get(self, endpoint: str, worker: Worker = None): + if worker is not None: + signature = hmac.new(key=worker.secret, msg=endpoint.encode("utf8"), digestmod=hashlib.sha256).hexdigest() + headers = format_headers(signature=signature, wid=worker.id) + else: + headers = format_headers() + retries = 0 + while retries < MAX_HTTP_RETRIES: + try: + response = requests.get(self.url + endpoint, timeout=API_TIMEOUT, + headers=headers) + return response + except: + retries += 1 + pass + return None + + def _http_post(self, endpoint: str, body, worker: Worker = None): + + body_bytes = json.dumps(body).encode("utf8") + + if worker is not None: + signature = hmac.new(key=worker.secret, msg=body_bytes, digestmod=hashlib.sha256).hexdigest() + headers = format_headers(signature=signature, wid=worker.id) + else: + headers = format_headers() + retries = 0 + while retries < MAX_HTTP_RETRIES: + try: + response = requests.post(self.url + endpoint, timeout=API_TIMEOUT, + headers=headers, data=body_bytes) + return response + except: + retries += 1 + pass + return None + + diff --git a/run.py b/run.py new file mode 100644 index 0000000..b2af8db --- /dev/null +++ b/run.py @@ -0,0 +1,2 @@ +#!/bin/env python + diff --git a/worker.py b/worker.py new file mode 100644 index 0000000..2c3330b --- /dev/null +++ b/worker.py @@ -0,0 +1,112 @@ +import datetime +import json +import os +import shutil +import subprocess +from subprocess import Popen + +from api import Project, Worker, TaskTrackerApi, Task + + +class WorkerContext: + + def _format_project_path(self, project: Project): + return "work/%s/%d_%s" % (self._ctx_name, project.id, project.version,) + + def __init__(self, worker: Worker, ctx_name): + self._worker = worker + self._projects = dict() + self._ctx_name = ctx_name + + def _deploy_project(self, project: Project): + + project.secret = self._worker.get_secret(project.id) + + print("Deploying project " + project.name) + path = self._format_project_path(project) + if os.path.exists(path): + shutil.rmtree(path) + + os.makedirs(path, exist_ok=True) + proc = Popen(args=["git", "clone", project.clone_url, path]) + proc.wait() + + if project.version: + proc = Popen(args=["git", "checkout", project.version], cwd=os.path.abspath(path)) + proc.wait() + + if os.path.exists(os.path.join(path, "setup")): + proc = Popen(args=["./setup", ], cwd=os.path.abspath(path)) + proc.wait() + + self._projects[project.id] = project + + def _get_project_path(self, project: Project): + if project.id not in self._projects or self._projects[project.id].version != project.version: + self._deploy_project(project) + return self._format_project_path(project) + + def execute_task(self, task: Task): + path = self._get_project_path(task.project) + + if os.path.exists(os.path.join(path, "run")): + proc = Popen(args=["./run", task.toJSON(), self._projects[task.project.id].secret], + stdout=subprocess.PIPE, cwd=os.path.abspath(path)) + result = proc.communicate()[0].decode("utf8") + try: + json_result = json.loads(result) + self._do_post_task_hooks(json_result) + print(self._worker.release_task(task.id, + json_result["result"], + json_result["verification"] if "verification" in json_result else 0).text) + except Exception as e: + print(e) + return + + def _do_post_task_hooks(self, res): + + if "logs" in res: + for log in res["logs"]: + r = self._worker.log(log["level"] if "level" in log else 7, + log["message"], + log.get("timestamp", int(datetime.datetime.utcnow().timestamp())), + log.get("scope", "tt_py_client")) + print("LOG: %s <%d>" % (log, r.status_code)) + + if "tasks" in res: + for task in res["tasks"]: + r = self._worker.submit_task(task["project"], + task["recipe"], + task.get("priority"), + task.get("max_assign_time"), + task.get("hash64"), + task.get("unique_str"), + task.get("verification_count"), + task.get("max_retries")) + print("SUBMIT: %s <%d>" % (task, r.status_code)) + + +api = TaskTrackerApi("http://localhost:42901") +# w = api.make_worker("python_tt") +# w.dump_to_file() + +w1 = Worker.from_file(api) + +# print(w1.request_access(1, True, True).text) + + +# def submit(i): +# w1.submit_task(project=1, recipe=json.dumps({ +# "tid": str(i), +# }), hash64=i) + + +# pool = multiprocessing.Pool(processes=100) +# pool.map(submit, range(0, 500000)) +# pool.join() +# print(t.toJSON()) + +t = w1.fetch_task() + +ctx = WorkerContext(w1, "main") +ctx.execute_task(t)