diff --git a/jenkins/deploy.sh b/jenkins/deploy.sh index 49f4427..b7957e7 100755 --- a/jenkins/deploy.sh +++ b/jenkins/deploy.sh @@ -4,6 +4,6 @@ export DRONEROOT="task_tracker_drone" screen -S tt_drone -X quit echo "starting drone" -screen -S tt_drone -d -m bash -c "cd ${DRONEROOT} && source env/bin/activate && python src/tt_drone/drone.py" +screen -S tt_drone -d -m bash -c "cd ${DRONEROOT} && source env/bin/activate && python src/drone.py" sleep 1 screen -list diff --git a/requirements.txt b/requirements.txt index 663bd1f..1f311f5 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1 +1,2 @@ -requests \ No newline at end of file +requests +bs4 \ No newline at end of file diff --git a/src/drone.py b/src/drone.py new file mode 100644 index 0000000..2878fc8 --- /dev/null +++ b/src/drone.py @@ -0,0 +1,68 @@ +import signal +import threading +import time + +from tt_drone.api import (Worker, TaskTrackerApi) +from tt_drone.worker import (WorkerContext) + +die = False +lock = threading.Lock() +current_tasks = set() +threads = list() + +THREAD_COUNT = 10 + + +def cleanup(signum: int, frame): + global die + global threads + die = True + + print("Waiting for threads to die...") + for t in threads: + t.join() + print("Releasing uncompleted tasks...") + for k, v in current_tasks: + worker.release_task(v.id, 2, 0) + print("Goodbye") + + +signal.signal(signal.SIGINT, cleanup) +signal.signal(signal.SIGTERM, cleanup) + + +def drone(ctx: WorkerContext): + global die + while not die: + task = worker.fetch_task(1) + try: + if task is not None: + with lock: + current_tasks.add(task.id) + ctx.execute_task(task) + else: + time.sleep(10) + finally: + with lock: + try: + if task is not None: + current_tasks.remove(task.id) + except KeyError: + pass + + +api = TaskTrackerApi("https://tt.simon987.net/api") +worker = Worker.from_file(api) +if not worker: + worker = api.make_worker("drone") + worker.dump_to_file() + worker.request_access(1, True, False) + + +print("Starting %d working contexts" % (THREAD_COUNT,)) +for i in range(THREAD_COUNT): + ctx = WorkerContext(worker, "%s_%d" % (worker.alias, i)) + + t = threading.Thread(target=drone, args=[ctx, ]) + t.start() + threads.append(t) diff --git a/src/tt_drone/api.py b/src/tt_drone/api.py index a36545a..f09f11b 100644 --- a/src/tt_drone/api.py +++ b/src/tt_drone/api.py @@ -1,12 +1,14 @@ import base64 import hashlib +import hmac import json +import os +import time import requests -import hmac API_TIMEOUT = 5 -MAX_HTTP_RETRIES = 2 +MAX_HTTP_RETRIES = 3 VERSION = 1.0 LOG_TRACE = 7 @@ -69,8 +71,8 @@ class Worker: self._secret_b64 = secret self._api: TaskTrackerApi = api - def fetch_task(self, project_id=None): - return self._api.fetch_task(self) + def fetch_task(self, project_id): + return self._api.fetch_task(self, project_id) def submit_task(self, project, recipe, priority=1, max_assign_time=3600, hash64=0, unique_str="", verification_count=1, max_retries=3): @@ -99,10 +101,12 @@ class Worker: @staticmethod def from_file(api): - with open("worker.json", "r") as f: - obj = json.load(f) - return Worker(wid=obj["id"], alias=obj["alias"], - secret=obj["secret"], api=api) + if os.path.exists("worker.json"): + with open("worker.json", "r") as f: + obj = json.load(f) + return Worker(wid=obj["id"], alias=obj["alias"], + secret=obj["secret"], api=api) + return None def format_headers(ua: str = None, wid: int = None, signature: str = None): @@ -137,8 +141,9 @@ class TaskTrackerApi: json_response["content"]["worker"]["secret"], self) return worker - def fetch_task(self, worker: Worker) -> Task: - response = self._http_get("/task/get", worker) + def fetch_task(self, worker: Worker, project_id: int) -> Task: + response = self._http_get("/task/get/%d" % (project_id, ), worker) + if response: json_response = json.loads(response.text) if json_response["ok"]: @@ -201,9 +206,16 @@ class TaskTrackerApi: try: response = requests.get(self.url + endpoint, timeout=API_TIMEOUT, headers=headers) + + if response.status_code == 429: + delay = json.loads(response.text)["rate_limit_delay"] * 20 + time.sleep(delay) + continue + return response - except: + except Exception as e: retries += 1 + print("ERROR: %s" % (e, )) pass return None @@ -221,10 +233,16 @@ class TaskTrackerApi: try: response = requests.post(self.url + endpoint, timeout=API_TIMEOUT, headers=headers, data=body_bytes) + + if response.status_code == 429: + delay = json.loads(response.text)["rate_limit_delay"] * 20 + time.sleep(delay) + continue + return response - except: + except Exception as e: + print(str(type(e)) + str(e)) retries += 1 pass return None - diff --git a/src/tt_drone/drone.py b/src/tt_drone/drone.py deleted file mode 100644 index 6eba0ed..0000000 --- a/src/tt_drone/drone.py +++ /dev/null @@ -1,20 +0,0 @@ -import signal -import time - -die = False - - -def cleanup(signum, frame): - global die - die = True - - -signal.signal(signal.SIGINT, cleanup) -signal.signal(signal.SIGTERM, cleanup) - -while True: - time.sleep(1) - print("tick") - if die: - break - diff --git a/src/tt_drone/worker.py b/src/tt_drone/worker.py index 7b31067..022618f 100644 --- a/src/tt_drone/worker.py +++ b/src/tt_drone/worker.py @@ -1,13 +1,11 @@ import datetime import json import os -import random import shutil import subprocess -import time from subprocess import Popen -from tt_drone.api import Project, Worker, TaskTrackerApi, Task +from tt_drone.api import (Project, Worker, Task) class WorkerContext: @@ -62,7 +60,6 @@ class WorkerContext: json_result["result"], json_result["verification"] if "verification" in json_result else 0).text) except Exception as e: - print(e) return def _do_post_task_hooks(self, res): @@ -87,20 +84,3 @@ class WorkerContext: task.get("max_retries")) print("SUBMIT: %s <%d>" % (task, r.status_code)) - -api = TaskTrackerApi("https://tt.simon987.net/api") -w1 = Worker.from_file(api) -ctx = WorkerContext(w1, "main") - -while True: - try: - t = w1.fetch_task() - ctx.execute_task(t) - - time.sleep(random.randint(5, 45)) - except KeyboardInterrupt: - print("Cancel current task...") - try: - w1.release_task(t.id, 2, 0) - except NameError: - pass