attempt at making it autonomous

This commit is contained in:
simon987 2019-02-28 20:54:47 -05:00
parent 3986a3109b
commit 3400ee44ea
6 changed files with 103 additions and 56 deletions

View File

@ -4,6 +4,6 @@ export DRONEROOT="task_tracker_drone"
screen -S tt_drone -X quit screen -S tt_drone -X quit
echo "starting drone" echo "starting drone"
screen -S tt_drone -d -m bash -c "cd ${DRONEROOT} && source env/bin/activate && python src/tt_drone/drone.py" screen -S tt_drone -d -m bash -c "cd ${DRONEROOT} && source env/bin/activate && python src/drone.py"
sleep 1 sleep 1
screen -list screen -list

View File

@ -1 +1,2 @@
requests requests
bs4

68
src/drone.py Normal file
View File

@ -0,0 +1,68 @@
import signal
import threading
import time
from tt_drone.api import (Worker, TaskTrackerApi)
from tt_drone.worker import (WorkerContext)
die = False
lock = threading.Lock()
current_tasks = set()
threads = list()
THREAD_COUNT = 10
def cleanup(signum: int, frame):
global die
global threads
die = True
print("Waiting for threads to die...")
for t in threads:
t.join()
print("Releasing uncompleted tasks...")
for k, v in current_tasks:
worker.release_task(v.id, 2, 0)
print("Goodbye")
signal.signal(signal.SIGINT, cleanup)
signal.signal(signal.SIGTERM, cleanup)
def drone(ctx: WorkerContext):
global die
while not die:
task = worker.fetch_task(1)
try:
if task is not None:
with lock:
current_tasks.add(task.id)
ctx.execute_task(task)
else:
time.sleep(10)
finally:
with lock:
try:
if task is not None:
current_tasks.remove(task.id)
except KeyError:
pass
api = TaskTrackerApi("https://tt.simon987.net/api")
worker = Worker.from_file(api)
if not worker:
worker = api.make_worker("drone")
worker.dump_to_file()
worker.request_access(1, True, False)
print("Starting %d working contexts" % (THREAD_COUNT,))
for i in range(THREAD_COUNT):
ctx = WorkerContext(worker, "%s_%d" % (worker.alias, i))
t = threading.Thread(target=drone, args=[ctx, ])
t.start()
threads.append(t)

View File

@ -1,12 +1,14 @@
import base64 import base64
import hashlib import hashlib
import hmac
import json import json
import os
import time
import requests import requests
import hmac
API_TIMEOUT = 5 API_TIMEOUT = 5
MAX_HTTP_RETRIES = 2 MAX_HTTP_RETRIES = 3
VERSION = 1.0 VERSION = 1.0
LOG_TRACE = 7 LOG_TRACE = 7
@ -69,8 +71,8 @@ class Worker:
self._secret_b64 = secret self._secret_b64 = secret
self._api: TaskTrackerApi = api self._api: TaskTrackerApi = api
def fetch_task(self, project_id=None): def fetch_task(self, project_id):
return self._api.fetch_task(self) return self._api.fetch_task(self, project_id)
def submit_task(self, project, recipe, priority=1, max_assign_time=3600, hash64=0, unique_str="", def submit_task(self, project, recipe, priority=1, max_assign_time=3600, hash64=0, unique_str="",
verification_count=1, max_retries=3): verification_count=1, max_retries=3):
@ -99,10 +101,12 @@ class Worker:
@staticmethod @staticmethod
def from_file(api): def from_file(api):
if os.path.exists("worker.json"):
with open("worker.json", "r") as f: with open("worker.json", "r") as f:
obj = json.load(f) obj = json.load(f)
return Worker(wid=obj["id"], alias=obj["alias"], return Worker(wid=obj["id"], alias=obj["alias"],
secret=obj["secret"], api=api) secret=obj["secret"], api=api)
return None
def format_headers(ua: str = None, wid: int = None, signature: str = None): def format_headers(ua: str = None, wid: int = None, signature: str = None):
@ -137,8 +141,9 @@ class TaskTrackerApi:
json_response["content"]["worker"]["secret"], self) json_response["content"]["worker"]["secret"], self)
return worker return worker
def fetch_task(self, worker: Worker) -> Task: def fetch_task(self, worker: Worker, project_id: int) -> Task:
response = self._http_get("/task/get", worker) response = self._http_get("/task/get/%d" % (project_id, ), worker)
if response: if response:
json_response = json.loads(response.text) json_response = json.loads(response.text)
if json_response["ok"]: if json_response["ok"]:
@ -201,9 +206,16 @@ class TaskTrackerApi:
try: try:
response = requests.get(self.url + endpoint, timeout=API_TIMEOUT, response = requests.get(self.url + endpoint, timeout=API_TIMEOUT,
headers=headers) headers=headers)
if response.status_code == 429:
delay = json.loads(response.text)["rate_limit_delay"] * 20
time.sleep(delay)
continue
return response return response
except: except Exception as e:
retries += 1 retries += 1
print("ERROR: %s" % (e, ))
pass pass
return None return None
@ -221,10 +233,16 @@ class TaskTrackerApi:
try: try:
response = requests.post(self.url + endpoint, timeout=API_TIMEOUT, response = requests.post(self.url + endpoint, timeout=API_TIMEOUT,
headers=headers, data=body_bytes) headers=headers, data=body_bytes)
if response.status_code == 429:
delay = json.loads(response.text)["rate_limit_delay"] * 20
time.sleep(delay)
continue
return response return response
except: except Exception as e:
print(str(type(e)) + str(e))
retries += 1 retries += 1
pass pass
return None return None

View File

@ -1,20 +0,0 @@
import signal
import time
die = False
def cleanup(signum, frame):
global die
die = True
signal.signal(signal.SIGINT, cleanup)
signal.signal(signal.SIGTERM, cleanup)
while True:
time.sleep(1)
print("tick")
if die:
break

View File

@ -1,13 +1,11 @@
import datetime import datetime
import json import json
import os import os
import random
import shutil import shutil
import subprocess import subprocess
import time
from subprocess import Popen from subprocess import Popen
from tt_drone.api import Project, Worker, TaskTrackerApi, Task from tt_drone.api import (Project, Worker, Task)
class WorkerContext: class WorkerContext:
@ -62,7 +60,6 @@ class WorkerContext:
json_result["result"], json_result["result"],
json_result["verification"] if "verification" in json_result else 0).text) json_result["verification"] if "verification" in json_result else 0).text)
except Exception as e: except Exception as e:
print(e)
return return
def _do_post_task_hooks(self, res): def _do_post_task_hooks(self, res):
@ -87,20 +84,3 @@ class WorkerContext:
task.get("max_retries")) task.get("max_retries"))
print("SUBMIT: %s <%d>" % (task, r.status_code)) print("SUBMIT: %s <%d>" % (task, r.status_code))
api = TaskTrackerApi("https://tt.simon987.net/api")
w1 = Worker.from_file(api)
ctx = WorkerContext(w1, "main")
while True:
try:
t = w1.fetch_task()
ctx.execute_task(t)
time.sleep(random.randint(5, 45))
except KeyboardInterrupt:
print("Cancel current task...")
try:
w1.release_task(t.id, 2, 0)
except NameError:
pass