From 22f4a6b358075425f0f86e402c9d09edb1ead6b2 Mon Sep 17 00:00:00 2001 From: simon987 Date: Sun, 3 Feb 2019 20:04:35 -0500 Subject: [PATCH] task verification (not 100% tested) --- api/task.go | 37 ++++++----- schema.sql | 72 ++++++++++++++------ storage/project.go | 10 ++- storage/task.go | 109 +++++++++++++++++++++---------- test/api_task_test.go | 148 ++++++++++++++++++++++++++++++++++++++++-- test/schema.sql | 72 ++++++++++++++------ 6 files changed, 346 insertions(+), 102 deletions(-) diff --git a/api/task.go b/api/task.go index 910e8e5..7588a7b 100644 --- a/api/task.go +++ b/api/task.go @@ -14,22 +14,25 @@ import ( ) type CreateTaskRequest struct { - Project int64 `json:"project"` - MaxRetries int64 `json:"max_retries"` - Recipe string `json:"recipe"` - Priority int64 `json:"priority"` - MaxAssignTime int64 `json:"max_assign_time"` - Hash64 int64 `json:"hash_u64"` - UniqueString string `json:"unique_string"` + Project int64 `json:"project"` + MaxRetries int64 `json:"max_retries"` + Recipe string `json:"recipe"` + Priority int64 `json:"priority"` + MaxAssignTime int64 `json:"max_assign_time"` + Hash64 int64 `json:"hash_u64"` + UniqueString string `json:"unique_string"` + VerificationCount int64 `json:"verification_count"` } type ReleaseTaskRequest struct { - TaskId int64 `json:"task_id"` - Success bool `json:"success"` + TaskId int64 `json:"task_id"` + Result storage.TaskResult `json:"result"` + Verification int64 `json:"verification"` } type ReleaseTaskResponse struct { Ok bool `json:"ok"` + Updated bool `json:"updated"` Message string `json:"message,omitempty"` } @@ -56,11 +59,12 @@ func (api *WebAPI) TaskCreate(r *Request) { return } task := &storage.Task{ - MaxRetries: createReq.MaxRetries, - Recipe: createReq.Recipe, - Priority: createReq.Priority, - AssignTime: 0, - MaxAssignTime: createReq.MaxAssignTime, + MaxRetries: createReq.MaxRetries, + Recipe: createReq.Recipe, + Priority: createReq.Priority, + AssignTime: 0, + MaxAssignTime: createReq.MaxAssignTime, + VerificationCount: createReq.VerificationCount, } if createReq.IsValid() && isTaskValid(task) { @@ -229,10 +233,11 @@ func (api *WebAPI) TaskRelease(r *Request) { Message: "Could not parse request", }, 400) } - res := api.Database.ReleaseTask(req.TaskId, worker.Id, req.Success) + res := api.Database.ReleaseTask(req.TaskId, worker.Id, req.Result, req.Verification) response := ReleaseTaskResponse{ - Ok: res, + Updated: res, + Ok: true, } if !res { diff --git a/schema.sql b/schema.sql index a239aa5..4098d24 100755 --- a/schema.sql +++ b/schema.sql @@ -1,5 +1,5 @@ DROP TABLE IF EXISTS worker_identity, worker, project, task, log_entry, - worker_has_access_to_project, manager, manager_has_role_on_project; + worker_has_access_to_project, manager, manager_has_role_on_project, project_monitoring, worker_verifies_task; DROP TYPE IF EXISTS status; DROP TYPE IF EXISTS log_level; @@ -23,14 +23,15 @@ CREATE TABLE worker CREATE TABLE project ( - id SERIAL PRIMARY KEY, - priority INTEGER DEFAULT 0, - name TEXT UNIQUE, - clone_url TEXT, - git_repo TEXT UNIQUE, - version TEXT, - motd TEXT, - public boolean + id SERIAL PRIMARY KEY, + priority INTEGER DEFAULT 0, + name TEXT UNIQUE, + clone_url TEXT, + git_repo TEXT UNIQUE, + version TEXT, + motd TEXT, + public boolean, + closed_task_count INT DEFAULT 0 ); CREATE TABLE worker_has_access_to_project @@ -42,17 +43,25 @@ CREATE TABLE worker_has_access_to_project CREATE TABLE task ( - hash64 BIGINT DEFAULT NULL UNIQUE, - id SERIAL PRIMARY KEY, - project INTEGER REFERENCES project (id), - assignee INTEGER REFERENCES worker (id), - max_assign_time INTEGER DEFAULT 0, - assign_time INTEGER DEFAULT 0, - priority SMALLINT DEFAULT 0, - retries SMALLINT DEFAULT 0, - max_retries SMALLINT, - status SMALLINT DEFAULT 1, - recipe TEXT + hash64 BIGINT DEFAULT NULL UNIQUE, + id SERIAL PRIMARY KEY, + project INTEGER REFERENCES project (id), + assignee INTEGER REFERENCES worker (id), + max_assign_time INTEGER DEFAULT 0, + assign_time INTEGER DEFAULT 0, + verification_count INTEGER DEFAULT 0, + priority SMALLINT DEFAULT 0, + retries SMALLINT DEFAULT 0, + max_retries SMALLINT, + status SMALLINT DEFAULT 1, + recipe TEXT +); + +CREATE TABLE worker_verifies_task +( + verification_hash BIGINT, + task BIGINT REFERENCES task (id) ON DELETE CASCADE, + worker INT REFERENCES worker (id) ); CREATE TABLE log_entry @@ -76,4 +85,25 @@ CREATE TABLE manager_has_role_on_project manager INTEGER REFERENCES manager (id), role SMALLINT, project INTEGER REFERENCES project (id) -); \ No newline at end of file +); + +CREATE TABLE project_monitoring +( + project INT REFERENCES project (id), + new_task_count INT, + failed_task_count INT, + closed_task_count INT +); + +CREATE OR REPLACE FUNCTION on_task_delete_proc() RETURNS TRIGGER AS +$$ +BEGIN + UPDATE project SET closed_task_count=closed_task_count + 1 WHERE id = OLD.project; + RETURN OLD; +END; +$$ LANGUAGE 'plpgsql'; +CREATE TRIGGER on_task_delete + BEFORE DELETE + ON task + FOR EACH ROW +EXECUTE PROCEDURE on_task_delete_proc(); diff --git a/storage/project.go b/storage/project.go index 72d9532..a7016f4 100644 --- a/storage/project.go +++ b/storage/project.go @@ -72,7 +72,8 @@ func (database *Database) GetProject(id int64) *Project { func getProject(id int64, db *sql.DB) *Project { - row := db.QueryRow(`SELECT * FROM project WHERE id=$1`, id) + row := db.QueryRow(`SELECT id, priority, name, clone_url, git_repo, version, motd, public + FROM project WHERE id=$1`, id) project, err := scanProject(row) if err != nil { @@ -102,7 +103,9 @@ func scanProject(row *sql.Row) (*Project, error) { func (database *Database) GetProjectWithRepoName(repoName string) *Project { db := database.getDB() - row := db.QueryRow(`SELECT * FROM project WHERE LOWER(git_repo)=$1`, strings.ToLower(repoName)) + row := db.QueryRow(`SELECT id, priority, name, clone_url, git_repo, version, motd, public + FROM project WHERE LOWER(git_repo)=$1`, + strings.ToLower(repoName)) project, err := scanProject(row) if err != nil { @@ -191,7 +194,8 @@ func (database Database) GetAllProjectsStats() *[]ProjectStats { SUM(CASE WHEN status= 1 THEN 1 ELSE 0 END) newCount, SUM(CASE WHEN status=2 THEN 1 ELSE 0 END) failedCount, SUM(CASE WHEN status=3 THEN 1 ELSE 0 END) closedCount, - p.* + p.Id, p.priority, p.name, p.clone_url, p.git_repo, p.version, p.motd, + p.public FROM task RIGHT JOIN project p on task.project = p.id GROUP BY p.id ORDER BY p.name`) handleErr(err) diff --git a/storage/task.go b/storage/task.go index 6d84962..9ed090b 100644 --- a/storage/task.go +++ b/storage/task.go @@ -7,25 +7,32 @@ import ( ) type Task struct { - Id int64 `json:"id"` - Priority int64 `json:"priority"` - Project *Project `json:"project"` - Assignee int64 `json:"assignee"` - Retries int64 `json:"retries"` - MaxRetries int64 `json:"max_retries"` - Status TaskStatus `json:"status"` - Recipe string `json:"recipe"` - MaxAssignTime int64 `json:"max_assign_time"` - AssignTime int64 `json:"assign_time"` + Id int64 `json:"id"` + Priority int64 `json:"priority"` + Project *Project `json:"project"` + Assignee int64 `json:"assignee"` + Retries int64 `json:"retries"` + MaxRetries int64 `json:"max_retries"` + Status TaskStatus `json:"status"` + Recipe string `json:"recipe"` + MaxAssignTime int64 `json:"max_assign_time"` + AssignTime int64 `json:"assign_time"` + VerificationCount int64 `json:"verification_count"` } type TaskStatus int const ( - NEW TaskStatus = 1 - FAILED TaskStatus = 2 - CLOSED TaskStatus = 3 - TIMEOUT TaskStatus = 4 + NEW TaskStatus = 1 + FAILED TaskStatus = 2 +) + +type TaskResult int + +const ( + TR_OK TaskResult = 0 + TR_FAIL TaskResult = 1 + TR_SKIP TaskResult = 2 ) func (database *Database) SaveTask(task *Task, project int64, hash64 int64) error { @@ -34,9 +41,9 @@ func (database *Database) SaveTask(task *Task, project int64, hash64 int64) erro //TODO: For some reason it refuses to insert the 64-bit value unless I do that... res, err := db.Exec(fmt.Sprintf(` - INSERT INTO task (project, max_retries, recipe, priority, max_assign_time, hash64) - VALUES ($1,$2,$3,$4,$5,NULLIF(%d, 0))`, hash64), - project, task.MaxRetries, task.Recipe, task.Priority, task.MaxAssignTime) + INSERT INTO task (project, max_retries, recipe, priority, max_assign_time, hash64,verification_count) + VALUES ($1,$2,$3,$4,$5,NULLIF(%d, 0),$6)`, hash64), + project, task.MaxRetries, task.Recipe, task.Priority, task.MaxAssignTime, task.VerificationCount) if err != nil { logrus.WithError(err).WithFields(logrus.Fields{ "task": task, @@ -67,10 +74,12 @@ func (database *Database) GetTask(worker *Worker) *Task { SELECT task.id FROM task INNER JOIN project p on task.project = p.id + LEFT JOIN worker_verifies_task wvt on task.id = wvt.task AND wvt.worker=$1 WHERE assignee IS NULL AND task.status=1 AND (p.public OR EXISTS ( SELECT 1 FROM worker_has_access_to_project a WHERE a.worker=$1 AND a.project=p.id )) + AND wvt.task IS NULL ORDER BY p.priority DESC, task.priority DESC LIMIT 1 ) @@ -99,8 +108,9 @@ func getTaskById(id int64, db *sql.DB) *Task { row := db.QueryRow(` SELECT task.id, task.priority, task.project, assignee, retries, max_retries, - status, recipe, max_assign_time, assign_time, project.* FROM task - INNER JOIN project ON task.project = project.id + status, recipe, max_assign_time, assign_time, verification_count, p.priority, p.name, + p.clone_url, p.git_repo, p.version, p.motd, p.public FROM task + INNER JOIN project p ON task.project = p.id WHERE task.id=$1`, id) project := &Project{} task := &Task{} @@ -108,7 +118,7 @@ func getTaskById(id int64, db *sql.DB) *Task { err := row.Scan(&task.Id, &task.Priority, &project.Id, &task.Assignee, &task.Retries, &task.MaxRetries, &task.Status, &task.Recipe, &task.MaxAssignTime, - &task.AssignTime, &project.Id, &project.Priority, &project.Name, + &task.AssignTime, &task.VerificationCount, &project.Priority, &project.Name, &project.CloneUrl, &project.GitRepo, &project.Version, &project.Motd, &project.Public) handleErr(err) @@ -120,27 +130,54 @@ func getTaskById(id int64, db *sql.DB) *Task { return task } -func (database Database) ReleaseTask(id int64, workerId int64, success bool) bool { +func (database Database) ReleaseTask(id int64, workerId int64, result TaskResult, verification int64) bool { db := database.getDB() - var res sql.Result - var err error - if success { - res, err = db.Exec(`UPDATE task SET (status, assignee) = (3, NULL) - WHERE id=$1 AND task.assignee=$2`, id, workerId) - } else { - res, err = db.Exec(`UPDATE task SET (status, assignee, retries) = - (CASE WHEN retries+1 >= max_retries THEN 2 ELSE 1 END, NULL, retries+1) - WHERE id=$1 AND assignee=$2`, id, workerId) - } - handleErr(err) + var rowsAffected int64 + if result == TR_OK { + var pid int64 - rowsAffected, _ := res.RowsAffected() + //If no verification is required + row := db.QueryRow(`DELETE FROM task WHERE id=$1 AND assignee=$2 AND verification_count < 2 + RETURNING project`, id, workerId) + err := row.Scan(&pid) + if err == nil { + rowsAffected = 1 + } else { + //If verification is required + _, err = db.Exec(`INSERT INTO worker_verifies_task (worker, verification_hash, task) + SELECT $1,$2,task.id FROM task WHERE assignee=$1`, workerId, verification) + handleErr(err) + + res, _ := db.Exec(`DELETE FROM task WHERE id=$1 AND assignee=$2 AND + (SELECT COUNT(*) as vcnt FROM worker_verifies_task wvt WHERE task=$1 + GROUP BY wvt.verification_hash ORDER BY vcnt DESC LIMIT 1) >= task.verification_count`, + id, workerId) + rowsAffected, _ = res.RowsAffected() + + _, _ = db.Exec(`UPDATE task SET assignee=NULL WHERE id=$1 AND assignee=$2`, id, workerId) + } + + } else if result == TR_FAIL { + res, err := db.Exec(`UPDATE task SET (status, assignee, retries) = + (CASE WHEN retries+1 >= max_retries THEN 2 ELSE 1 END, NULL, retries+1) + WHERE id=$1 AND assignee=$2`, id, workerId) + handleErr(err) + rowsAffected, _ = res.RowsAffected() + } else if result == TR_SKIP { + res, err := db.Exec(`UPDATE task SET (status, assignee) = (1, NULL) + WHERE id=$1 AND assignee=$2`, id, workerId) + handleErr(err) + rowsAffected, _ = res.RowsAffected() + } logrus.WithFields(logrus.Fields{ "rowsAffected": rowsAffected, - }) + "taskId": id, + "workerId": workerId, + "verification": verification, + }).Trace("Database.ReleaseTask") return rowsAffected == 1 } @@ -151,16 +188,18 @@ func (database *Database) GetTaskFromProject(worker *Worker, projectId int64) *T row := db.QueryRow(` UPDATE task - SET assignee=$1 + SET assignee=$1, assign_time=extract(epoch from now() at time zone 'utc') WHERE id IN ( SELECT task.id FROM task INNER JOIN project p on task.project = p.id + LEFT JOIN worker_verifies_task wvt on task.id = wvt.task AND wvt.worker=$1 WHERE assignee IS NULL AND p.id=$2 AND status=1 AND (p.public OR EXISTS ( SELECT 1 FROM worker_has_access_to_project a WHERE a.worker=$1 AND a.project=$2 )) + AND wvt.task IS NULL ORDER BY task.priority DESC LIMIT 1 ) diff --git a/test/api_task_test.go b/test/api_task_test.go index 125b982..2a4c4a7 100644 --- a/test/api_task_test.go +++ b/test/api_task_test.go @@ -145,10 +145,11 @@ func TestCreateGetTask(t *testing.T) { worker := genWid() createTask(api.CreateTaskRequest{ - Project: resp.Id, - Recipe: "{\"url\":\"test\"}", - MaxRetries: 3, - Priority: 9999, + Project: resp.Id, + Recipe: "{\"url\":\"test\"}", + MaxRetries: 3, + Priority: 9999, + VerificationCount: 12, }, worker) taskResp := getTaskFromProject(resp.Id, worker) @@ -156,6 +157,9 @@ func TestCreateGetTask(t *testing.T) { if taskResp.Ok != true { t.Error() } + if taskResp.Task.VerificationCount != 12 { + t.Error() + } if taskResp.Task.Priority != 9999 { t.Error() } @@ -398,8 +402,8 @@ func TestReleaseTaskSuccess(t *testing.T) { task := getTaskFromProject(pid, worker).Task releaseResp := releaseTask(api.ReleaseTaskRequest{ - TaskId: task.Id, - Success: true, + TaskId: task.Id, + Result: storage.TR_OK, }, worker) if releaseResp.Ok != true { @@ -503,6 +507,138 @@ func TestCreateStringCollision(t *testing.T) { } } +func TestCannotVerifySameTaskTwice(t *testing.T) { + + pid := createProject(api.CreateProjectRequest{ + Priority: 1, + GitRepo: "verifysametasktwice", + CloneUrl: "verifysametasktwice", + Motd: "verifysametasktwice", + Public: true, + Name: "verifysametasktwice", + Version: "verifysametasktwice", + }).Id + + w := genWid() + + createTask(api.CreateTaskRequest{ + VerificationCount: 2, + Project: pid, + Recipe: "verifysametasktwice", + }, w) + + task := getTaskFromProject(pid, w).Task + rlr := releaseTask(api.ReleaseTaskRequest{ + Result: storage.TR_OK, + TaskId: task.Id, + Verification: 123, + }, w) + + if rlr.Updated != false { + t.Error() + } + + sameTask := getTaskFromProject(pid, w) + + if sameTask.Ok != false { + t.Error() + } +} + +func TestVerification2(t *testing.T) { + + pid := createProject(api.CreateProjectRequest{ + Priority: 1, + GitRepo: "verify2", + CloneUrl: "verify2", + Motd: "verify2", + Public: true, + Name: "verify2", + Version: "verify2", + }).Id + + w := genWid() + w2 := genWid() + w3 := genWid() + + createTask(api.CreateTaskRequest{ + VerificationCount: 2, + Project: pid, + Recipe: "verify2", + }, w) + + task := getTaskFromProject(pid, w).Task + rlr := releaseTask(api.ReleaseTaskRequest{ + Result: storage.TR_OK, + TaskId: task.Id, + Verification: 123, + }, w) + + if rlr.Updated != false { + t.Error() + } + + task2 := getTaskFromProject(pid, w2).Task + rlr2 := releaseTask(api.ReleaseTaskRequest{ + Result: storage.TR_OK, + Verification: 1, + TaskId: task2.Id, + }, w2) + + if rlr2.Updated != false { + t.Error() + } + + task3 := getTaskFromProject(pid, w3).Task + rlr3 := releaseTask(api.ReleaseTaskRequest{ + Result: storage.TR_OK, + Verification: 123, + TaskId: task3.Id, + }, w3) + + if rlr3.Updated != true { + t.Error() + } +} + +func TestReleaseTaskFail(t *testing.T) { + + pid := createProject(api.CreateProjectRequest{ + Priority: 1, + GitRepo: "releasefail", + CloneUrl: "releasefail", + Motd: "releasefail", + Public: true, + Name: "releasefail", + Version: "releasefail", + }).Id + + w := genWid() + + createTask(api.CreateTaskRequest{ + MaxRetries: 0, + Project: pid, + VerificationCount: 1, + Recipe: "releasefail", + }, w) + + task := getTaskFromProject(pid, w).Task + + resp := releaseTask(api.ReleaseTaskRequest{ + Result: storage.TR_FAIL, + TaskId: task.Id, + Verification: 1, + }, w) + + if resp.Updated != true { + t.Error() + } + if resp.Ok != true { + t.Error() + } + +} + func createTask(request api.CreateTaskRequest, worker *storage.Worker) *api.CreateTaskResponse { r := Post("/task/create", request, worker) diff --git a/test/schema.sql b/test/schema.sql index a239aa5..4098d24 100755 --- a/test/schema.sql +++ b/test/schema.sql @@ -1,5 +1,5 @@ DROP TABLE IF EXISTS worker_identity, worker, project, task, log_entry, - worker_has_access_to_project, manager, manager_has_role_on_project; + worker_has_access_to_project, manager, manager_has_role_on_project, project_monitoring, worker_verifies_task; DROP TYPE IF EXISTS status; DROP TYPE IF EXISTS log_level; @@ -23,14 +23,15 @@ CREATE TABLE worker CREATE TABLE project ( - id SERIAL PRIMARY KEY, - priority INTEGER DEFAULT 0, - name TEXT UNIQUE, - clone_url TEXT, - git_repo TEXT UNIQUE, - version TEXT, - motd TEXT, - public boolean + id SERIAL PRIMARY KEY, + priority INTEGER DEFAULT 0, + name TEXT UNIQUE, + clone_url TEXT, + git_repo TEXT UNIQUE, + version TEXT, + motd TEXT, + public boolean, + closed_task_count INT DEFAULT 0 ); CREATE TABLE worker_has_access_to_project @@ -42,17 +43,25 @@ CREATE TABLE worker_has_access_to_project CREATE TABLE task ( - hash64 BIGINT DEFAULT NULL UNIQUE, - id SERIAL PRIMARY KEY, - project INTEGER REFERENCES project (id), - assignee INTEGER REFERENCES worker (id), - max_assign_time INTEGER DEFAULT 0, - assign_time INTEGER DEFAULT 0, - priority SMALLINT DEFAULT 0, - retries SMALLINT DEFAULT 0, - max_retries SMALLINT, - status SMALLINT DEFAULT 1, - recipe TEXT + hash64 BIGINT DEFAULT NULL UNIQUE, + id SERIAL PRIMARY KEY, + project INTEGER REFERENCES project (id), + assignee INTEGER REFERENCES worker (id), + max_assign_time INTEGER DEFAULT 0, + assign_time INTEGER DEFAULT 0, + verification_count INTEGER DEFAULT 0, + priority SMALLINT DEFAULT 0, + retries SMALLINT DEFAULT 0, + max_retries SMALLINT, + status SMALLINT DEFAULT 1, + recipe TEXT +); + +CREATE TABLE worker_verifies_task +( + verification_hash BIGINT, + task BIGINT REFERENCES task (id) ON DELETE CASCADE, + worker INT REFERENCES worker (id) ); CREATE TABLE log_entry @@ -76,4 +85,25 @@ CREATE TABLE manager_has_role_on_project manager INTEGER REFERENCES manager (id), role SMALLINT, project INTEGER REFERENCES project (id) -); \ No newline at end of file +); + +CREATE TABLE project_monitoring +( + project INT REFERENCES project (id), + new_task_count INT, + failed_task_count INT, + closed_task_count INT +); + +CREATE OR REPLACE FUNCTION on_task_delete_proc() RETURNS TRIGGER AS +$$ +BEGIN + UPDATE project SET closed_task_count=closed_task_count + 1 WHERE id = OLD.project; + RETURN OLD; +END; +$$ LANGUAGE 'plpgsql'; +CREATE TRIGGER on_task_delete + BEFORE DELETE + ON task + FOR EACH ROW +EXECUTE PROCEDURE on_task_delete_proc();