From 8c5f2e0f7788f57eef766d7edb2e1adf8a9fa0c8 Mon Sep 17 00:00:00 2001 From: simon987 Date: Mon, 18 Feb 2019 22:00:02 -0500 Subject: [PATCH] initial commit --- .gitignore | 4 + .idea/vcs.xml | 6 + __pycache__/api.cpython-37.pyc | Bin 0 -> 7733 bytes api.py | 230 +++++++++++++++++++++++++++++++++ run.py | 2 + worker.py | 112 ++++++++++++++++ 6 files changed, 354 insertions(+) create mode 100644 .gitignore create mode 100644 .idea/vcs.xml create mode 100644 __pycache__/api.cpython-37.pyc create mode 100644 api.py create mode 100644 run.py create mode 100644 worker.py diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..5f3ae1a --- /dev/null +++ b/.gitignore @@ -0,0 +1,4 @@ +.idea/ +work/ +worker.json + diff --git a/.idea/vcs.xml b/.idea/vcs.xml new file mode 100644 index 0000000..94a25f7 --- /dev/null +++ b/.idea/vcs.xml @@ -0,0 +1,6 @@ + + + + + + \ No newline at end of file diff --git a/__pycache__/api.cpython-37.pyc b/__pycache__/api.cpython-37.pyc new file mode 100644 index 0000000000000000000000000000000000000000..2340eae4473d8ab8843a6adf631d54001f23a24c GIT binary patch literal 7733 zcmbVRJ9r#d6`uFbKD1iP@-vB}BqU@Z_Cj!+04A}VD2ao!HnJQ$OJK6vJCb&_+F9Qj z+0ruN%NMzzAka{{km%CT@qH9Dbkvl$rE*721OGX9-xdKf(miuuGjq@5p7Wo(S7&B& z2Cjd#Hb41t!7%<#jlt!jas?$m01<{T+eS^_&6>$=t7h@tuGxHdYL2jKuCQyKaB3Oh z)_mdBvgpf+Of4_`S^;A-B3moIXNa80KQu&s->S``RS+|1&7f66t0-pCnnmlFI1$c; z$HO^MdSr`ZV(y_^o5zgfVjeT*G2?_VZyDv2kFiRl`j}Sz*aBInwqg9^t;ZSM9(zO_ zA~q2dq=7QL=#eWZ@zWroK^%fRQ&7^x9Z&ElGgn$0x}HHT|9YA)9t)I6@asAah3q2_ZflNu&OkB?`o%4!PbcV#n@ z&16?Oo%&X&vW<4s34?9fR{nZ331rxfROWst<7U)R&Q_EN<#o4L+s#JVm!xxfPpr?ymBKpxMFlf|GblJO^T!xv9TisW06NgUJn2 z7F~3wWwV%HK_xK`%+zREC^m|N;-Yvc5LwGd$qMt4e@GT$RsRpxbIKv6l`FQly7B)k zF;fy#EF?`%psBnh`ry`$>T{^aK8RsDX2JB$UU9&xo=`2jBqGvi5U5-bY(-+bP4z+$ z+}p0V^@t?t$umSq8)JmNCbJ+^iOJf1)6a9y*EV)%r5RqTZGE8JM zj8%TCz5~zOaBMq@~NOj*riK$;3e{o_1^kzuj zN83K_Ru=q4RHkkJz&(V+nY4Yhb0~T2f$U%pWXHJrYKEAXf;@$f$52?pV;qU@Ch06;(-Wz7MoM8q6;OJ! znwOF3QbeSS?n9nPwkXq5=p-HaS4%4QshvEBqY!v|40|zhtY)0Yp^Vc+=6(W)J?rKllq5QM32!j z^YQlT7J?YBGpn~c?}z8{4joosKuviY^>_pF&24v@_qM|zP9(br?L4Nq`7WOLF$&wR zVa=PpxjsMRU$az(z6xoOTZ^~bNrRj`U34ewX2YQ%G|A1OGr>bahV2mMp_yB1*(b(I@8qHZ{0^Q0MG zI>}{n>8W!_uxq3MyB)3f?Nz>k>GJzTen8}hM5aW08GW}<2J6lB*NeBTludT(UsH-j z8A7pf64VPMLK(b%F+423xU#)+^?8fFm@R2zxt(3%iV zZaqx;yL%9a*f^(zas}`B7-b-o!R{G7Jw#o!zi6-Ux5-8lQnYHtuTR>QsCA2kh)FFt9Qolx-bQhTaCq+JRkkIN~UWkk{A@ikf|n zlpSp+Qxa7Lnja)lu-0scpI~x7x)cy!=nv6lV5#FXhiuw-WXLAj%#h8O`)rzr=AM<< zDS`zsiN&bK(uhWQUtoicv74D@SB|k#CRQBz3T`7`E6?lRtJF&|S2^vdF62wpZS$xy z*TDK}Ynt`!nvAxXZ-L1gVp;{jilW&o^$lw{rR-F3(RT9KLCOk!kQ}7^4iR#i>?q6D z%1U|m$Pi4vSCY$8cC(X6@(9W%|3-3?WZUuzktGmSARyLkY=uY*g#3W|N5;rCMn=Tv z6%q4z4A6&EbIf}RF-53V7``4f5u7$>xW0kLa=bC9Kn zu5i$Dg@=|Gx?x6SfN*?~Mcc<{4x@yBfNe0E3vDVuL2e%f<+(Q(=EhJ^f%|5Fa*hKT z&5ILwx=4NEB;KvD(5!eC?{dXi^bi(uYb9W(LC-Vd9D0t4Gm!ansz_LHMb;YtNQ*!y zrm~UWPPjyJO{{8yA{r?Tu#hRqy$)O;)u-gkVDcB}AGp9%liwTS7nj z8Pt&zQ8vVR(5OY{2cs5eM*Gg5lfvULx4CMsSkb<<0(e(XLiQ?Hx3e0F-N%2$0y##i zOe5+fFk0p5>Dni;JMxilZX`)J=tkJc?0;PHKe2L>a3@hNP8{%HUT0OvkN_vMOFNCQ zOBs{in;Jv%$CVF!(v3QCsAl-|V9=C5rPYia0Gm1zx6LrnYx*U|_0fPe2e%D3X60b# z=S*Otvr{S>dkmW9H{ZeILp_l@pu&10sE`^5&=UnH*3iXq4ME0`w7dvS!3{tQ$XfSt zaKJKltG#T6uvY~UQd!Zatl@Q#4S`ixWwq=urqiXLVNm{@2&EtcV89DprC!>wQ$mpB zRDX>!7Psh-pk(&WOtC$JxWF&;(gs6uROntnVTKn_=`%cmXH%TWJwlRwNWRUk_6nLY z4ODPO!g%3%JUjxkHJ`w2uzl&G{2mRwNMr#73!qqA&S?nA93Jt!e3c%Ud~}+BiYLEA z(W$ZRnbyP~GK^{Uk3(ZutN#c**dT{U7u_kkv2aMzsg)R;mRx|4aEdVi{SXdLDo8jO zw=jN%#}W5+QZO-YWBgMd$NoNI+`;%~JnlX@p1=WyBV#rm1Nh=9lUI>#pf)&^x z&1iPkqJH&3U3U7EDt>L+b`lV z&O7Ams2!aaoWuykNSl|%3?&fG>|5}Rw%~pS0<<*w%7Kyg{nHU2o#uuR%^y&PIWE~E zEbyr*E+%sx%?_LYn|N5h1@d*+IaTSWy_Km1!`6Fq`0}F|oBaGVV}#BAcm!k5Ji*v_ z9x`yQ^zI3)H4K6zFs&bro`A5($>O-8WeM6uuO{{Z9Ba!xCXu&>l9&H`~ZylP6gHUgMVIO#hK%o@kG3qlw za5`R+0XB7nv*+$yPTeHCX~=Z?E)c?lP2-Qzk{( zG2{MnQ8^o1^@g%L;RAgFH+Ku|FuU%5vjc`Z5o*^hh303-V@ow<` z%F4}Pd1+<&+R`n;0c=UJvMD+e5L5ZZo7aMsYacG%xV?fyEA}&OS_AJ)UMDY!&lWje zPHB>i;V&rsV~qhvbIvST9?(J#+q8(bgUyN@!t9-%vg=`-ZV)b@i*ml9GJk^xAFNu3 zNNh=kz*#mc_ex^p2L>2$>Lgw(!)o=RRwxb!MRqI4YW1MjEYyl0!1gjbZ>E_f54HYk z_vxh2nt@u2tkzi^ERkZJJwNq_ip{WMGf?bxDE9hLv2qKmf>Z0@CLUctf zC-Gv2rczLiuo%NE&Rr#)x9N=a5^4mNqxM+x${vRnIp#;$4hgc#+aNuZe!5z;i^Y;( zES@aPXXY~{ckUR^0K7u3eNw(mkFcQGagk898c3X4(92ZY0I6}berM_G?e}DpI$K0G zha)!^tJmJvlipoiS-h@)n_%8vqM6J&^Tw*Ag&xI$@-OS3m)<0I5YwahvB>nH2H)Z% T6HTE6{ Worker: + + response = self._http_post("/worker/create", body={"alias": alias}) + if response: + json_response = json.loads(response.text) + print(response.text) + + if response.status_code != 200: + raise Exception(json_response["message"]) + + worker = Worker(json_response["content"]["worker"]["id"], json_response["content"]["worker"]["alias"], + json_response["content"]["worker"]["secret"], self) + return worker + + def fetch_task(self, worker: Worker) -> Task: + response = self._http_get("/task/get", worker) + if response: + json_response = json.loads(response.text) + if json_response["ok"]: + return Task(json_response["content"]["task"]) + return None + + def submit_task(self, worker: Worker, project, recipe, priority, max_assign_time, hash64, unique_str, + verification_count, max_retries): + + return self._http_post("/task/submit", { + "project": project, + "recipe": recipe, + "priority": priority, + "max_assign_time": max_assign_time, + "hash_u64": hash64, + "unique_str": unique_str, + "verification_count": verification_count, + "max_retries": max_retries, + }, worker) + + def log(self, worker: Worker, level: int, message: str, timestamp: int, scope: str): + if level == LOG_TRACE: + return self._http_post("/log/trace", {"level": level, "message": message, "timestamp": timestamp, "scope": scope}, worker) + if level == LOG_INFO: + return self._http_post("/log/info", {"level": level, "message": message, "timestamp": timestamp, "scope": scope}, worker) + if level == LOG_WARN: + return self._http_post("/log/warn", {"level": level, "message": message, "timestamp": timestamp, "scope": scope}, worker) + if level == LOG_ERROR: + return self._http_post("/log/error", {"level": level, "message": message, "timestamp": timestamp, "scope": scope}, worker) + + print("Invalid log level") + + def release_task(self, worker: Worker, task_id: int, result: int, verification: int): + return self._http_post("/task/release", { + "task_id": task_id, + "result": result, + "verification": verification + }, worker) + + def request_access(self, worker: Worker, project: int, assign:bool, submit:bool): + return self._http_post("/project/request_access", { + "project": project, + "assign": assign, + "submit": submit, + }, worker) + + def get_secret(self, worker: Worker, project: int): + r = self._http_get("/project/secret/" + str(project), worker) + if r.status_code == 200: + return json.loads(r.text)["content"]["secret"] + + def _http_get(self, endpoint: str, worker: Worker = None): + if worker is not None: + signature = hmac.new(key=worker.secret, msg=endpoint.encode("utf8"), digestmod=hashlib.sha256).hexdigest() + headers = format_headers(signature=signature, wid=worker.id) + else: + headers = format_headers() + retries = 0 + while retries < MAX_HTTP_RETRIES: + try: + response = requests.get(self.url + endpoint, timeout=API_TIMEOUT, + headers=headers) + return response + except: + retries += 1 + pass + return None + + def _http_post(self, endpoint: str, body, worker: Worker = None): + + body_bytes = json.dumps(body).encode("utf8") + + if worker is not None: + signature = hmac.new(key=worker.secret, msg=body_bytes, digestmod=hashlib.sha256).hexdigest() + headers = format_headers(signature=signature, wid=worker.id) + else: + headers = format_headers() + retries = 0 + while retries < MAX_HTTP_RETRIES: + try: + response = requests.post(self.url + endpoint, timeout=API_TIMEOUT, + headers=headers, data=body_bytes) + return response + except: + retries += 1 + pass + return None + + diff --git a/run.py b/run.py new file mode 100644 index 0000000..b2af8db --- /dev/null +++ b/run.py @@ -0,0 +1,2 @@ +#!/bin/env python + diff --git a/worker.py b/worker.py new file mode 100644 index 0000000..2c3330b --- /dev/null +++ b/worker.py @@ -0,0 +1,112 @@ +import datetime +import json +import os +import shutil +import subprocess +from subprocess import Popen + +from api import Project, Worker, TaskTrackerApi, Task + + +class WorkerContext: + + def _format_project_path(self, project: Project): + return "work/%s/%d_%s" % (self._ctx_name, project.id, project.version,) + + def __init__(self, worker: Worker, ctx_name): + self._worker = worker + self._projects = dict() + self._ctx_name = ctx_name + + def _deploy_project(self, project: Project): + + project.secret = self._worker.get_secret(project.id) + + print("Deploying project " + project.name) + path = self._format_project_path(project) + if os.path.exists(path): + shutil.rmtree(path) + + os.makedirs(path, exist_ok=True) + proc = Popen(args=["git", "clone", project.clone_url, path]) + proc.wait() + + if project.version: + proc = Popen(args=["git", "checkout", project.version], cwd=os.path.abspath(path)) + proc.wait() + + if os.path.exists(os.path.join(path, "setup")): + proc = Popen(args=["./setup", ], cwd=os.path.abspath(path)) + proc.wait() + + self._projects[project.id] = project + + def _get_project_path(self, project: Project): + if project.id not in self._projects or self._projects[project.id].version != project.version: + self._deploy_project(project) + return self._format_project_path(project) + + def execute_task(self, task: Task): + path = self._get_project_path(task.project) + + if os.path.exists(os.path.join(path, "run")): + proc = Popen(args=["./run", task.toJSON(), self._projects[task.project.id].secret], + stdout=subprocess.PIPE, cwd=os.path.abspath(path)) + result = proc.communicate()[0].decode("utf8") + try: + json_result = json.loads(result) + self._do_post_task_hooks(json_result) + print(self._worker.release_task(task.id, + json_result["result"], + json_result["verification"] if "verification" in json_result else 0).text) + except Exception as e: + print(e) + return + + def _do_post_task_hooks(self, res): + + if "logs" in res: + for log in res["logs"]: + r = self._worker.log(log["level"] if "level" in log else 7, + log["message"], + log.get("timestamp", int(datetime.datetime.utcnow().timestamp())), + log.get("scope", "tt_py_client")) + print("LOG: %s <%d>" % (log, r.status_code)) + + if "tasks" in res: + for task in res["tasks"]: + r = self._worker.submit_task(task["project"], + task["recipe"], + task.get("priority"), + task.get("max_assign_time"), + task.get("hash64"), + task.get("unique_str"), + task.get("verification_count"), + task.get("max_retries")) + print("SUBMIT: %s <%d>" % (task, r.status_code)) + + +api = TaskTrackerApi("http://localhost:42901") +# w = api.make_worker("python_tt") +# w.dump_to_file() + +w1 = Worker.from_file(api) + +# print(w1.request_access(1, True, True).text) + + +# def submit(i): +# w1.submit_task(project=1, recipe=json.dumps({ +# "tid": str(i), +# }), hash64=i) + + +# pool = multiprocessing.Pool(processes=100) +# pool.map(submit, range(0, 500000)) +# pool.join() +# print(t.toJSON()) + +t = w1.fetch_task() + +ctx = WorkerContext(w1, "main") +ctx.execute_task(t)