task verification (not 100% tested)

This commit is contained in:
simon987 2019-02-03 20:04:35 -05:00
parent 9e09246a29
commit 22f4a6b358
6 changed files with 346 additions and 102 deletions

View File

@ -21,15 +21,18 @@ type CreateTaskRequest struct {
MaxAssignTime int64 `json:"max_assign_time"` MaxAssignTime int64 `json:"max_assign_time"`
Hash64 int64 `json:"hash_u64"` Hash64 int64 `json:"hash_u64"`
UniqueString string `json:"unique_string"` UniqueString string `json:"unique_string"`
VerificationCount int64 `json:"verification_count"`
} }
type ReleaseTaskRequest struct { type ReleaseTaskRequest struct {
TaskId int64 `json:"task_id"` TaskId int64 `json:"task_id"`
Success bool `json:"success"` Result storage.TaskResult `json:"result"`
Verification int64 `json:"verification"`
} }
type ReleaseTaskResponse struct { type ReleaseTaskResponse struct {
Ok bool `json:"ok"` Ok bool `json:"ok"`
Updated bool `json:"updated"`
Message string `json:"message,omitempty"` Message string `json:"message,omitempty"`
} }
@ -61,6 +64,7 @@ func (api *WebAPI) TaskCreate(r *Request) {
Priority: createReq.Priority, Priority: createReq.Priority,
AssignTime: 0, AssignTime: 0,
MaxAssignTime: createReq.MaxAssignTime, MaxAssignTime: createReq.MaxAssignTime,
VerificationCount: createReq.VerificationCount,
} }
if createReq.IsValid() && isTaskValid(task) { if createReq.IsValid() && isTaskValid(task) {
@ -229,10 +233,11 @@ func (api *WebAPI) TaskRelease(r *Request) {
Message: "Could not parse request", Message: "Could not parse request",
}, 400) }, 400)
} }
res := api.Database.ReleaseTask(req.TaskId, worker.Id, req.Success) res := api.Database.ReleaseTask(req.TaskId, worker.Id, req.Result, req.Verification)
response := ReleaseTaskResponse{ response := ReleaseTaskResponse{
Ok: res, Updated: res,
Ok: true,
} }
if !res { if !res {

View File

@ -1,5 +1,5 @@
DROP TABLE IF EXISTS worker_identity, worker, project, task, log_entry, DROP TABLE IF EXISTS worker_identity, worker, project, task, log_entry,
worker_has_access_to_project, manager, manager_has_role_on_project; worker_has_access_to_project, manager, manager_has_role_on_project, project_monitoring, worker_verifies_task;
DROP TYPE IF EXISTS status; DROP TYPE IF EXISTS status;
DROP TYPE IF EXISTS log_level; DROP TYPE IF EXISTS log_level;
@ -30,7 +30,8 @@ CREATE TABLE project
git_repo TEXT UNIQUE, git_repo TEXT UNIQUE,
version TEXT, version TEXT,
motd TEXT, motd TEXT,
public boolean public boolean,
closed_task_count INT DEFAULT 0
); );
CREATE TABLE worker_has_access_to_project CREATE TABLE worker_has_access_to_project
@ -48,6 +49,7 @@ CREATE TABLE task
assignee INTEGER REFERENCES worker (id), assignee INTEGER REFERENCES worker (id),
max_assign_time INTEGER DEFAULT 0, max_assign_time INTEGER DEFAULT 0,
assign_time INTEGER DEFAULT 0, assign_time INTEGER DEFAULT 0,
verification_count INTEGER DEFAULT 0,
priority SMALLINT DEFAULT 0, priority SMALLINT DEFAULT 0,
retries SMALLINT DEFAULT 0, retries SMALLINT DEFAULT 0,
max_retries SMALLINT, max_retries SMALLINT,
@ -55,6 +57,13 @@ CREATE TABLE task
recipe TEXT recipe TEXT
); );
CREATE TABLE worker_verifies_task
(
verification_hash BIGINT,
task BIGINT REFERENCES task (id) ON DELETE CASCADE,
worker INT REFERENCES worker (id)
);
CREATE TABLE log_entry CREATE TABLE log_entry
( (
level INTEGER, level INTEGER,
@ -77,3 +86,24 @@ CREATE TABLE manager_has_role_on_project
role SMALLINT, role SMALLINT,
project INTEGER REFERENCES project (id) project INTEGER REFERENCES project (id)
); );
CREATE TABLE project_monitoring
(
project INT REFERENCES project (id),
new_task_count INT,
failed_task_count INT,
closed_task_count INT
);
CREATE OR REPLACE FUNCTION on_task_delete_proc() RETURNS TRIGGER AS
$$
BEGIN
UPDATE project SET closed_task_count=closed_task_count + 1 WHERE id = OLD.project;
RETURN OLD;
END;
$$ LANGUAGE 'plpgsql';
CREATE TRIGGER on_task_delete
BEFORE DELETE
ON task
FOR EACH ROW
EXECUTE PROCEDURE on_task_delete_proc();

View File

@ -72,7 +72,8 @@ func (database *Database) GetProject(id int64) *Project {
func getProject(id int64, db *sql.DB) *Project { func getProject(id int64, db *sql.DB) *Project {
row := db.QueryRow(`SELECT * FROM project WHERE id=$1`, id) row := db.QueryRow(`SELECT id, priority, name, clone_url, git_repo, version, motd, public
FROM project WHERE id=$1`, id)
project, err := scanProject(row) project, err := scanProject(row)
if err != nil { if err != nil {
@ -102,7 +103,9 @@ func scanProject(row *sql.Row) (*Project, error) {
func (database *Database) GetProjectWithRepoName(repoName string) *Project { func (database *Database) GetProjectWithRepoName(repoName string) *Project {
db := database.getDB() db := database.getDB()
row := db.QueryRow(`SELECT * FROM project WHERE LOWER(git_repo)=$1`, strings.ToLower(repoName)) row := db.QueryRow(`SELECT id, priority, name, clone_url, git_repo, version, motd, public
FROM project WHERE LOWER(git_repo)=$1`,
strings.ToLower(repoName))
project, err := scanProject(row) project, err := scanProject(row)
if err != nil { if err != nil {
@ -191,7 +194,8 @@ func (database Database) GetAllProjectsStats() *[]ProjectStats {
SUM(CASE WHEN status= 1 THEN 1 ELSE 0 END) newCount, SUM(CASE WHEN status= 1 THEN 1 ELSE 0 END) newCount,
SUM(CASE WHEN status=2 THEN 1 ELSE 0 END) failedCount, SUM(CASE WHEN status=2 THEN 1 ELSE 0 END) failedCount,
SUM(CASE WHEN status=3 THEN 1 ELSE 0 END) closedCount, SUM(CASE WHEN status=3 THEN 1 ELSE 0 END) closedCount,
p.* p.Id, p.priority, p.name, p.clone_url, p.git_repo, p.version, p.motd,
p.public
FROM task RIGHT JOIN project p on task.project = p.id FROM task RIGHT JOIN project p on task.project = p.id
GROUP BY p.id ORDER BY p.name`) GROUP BY p.id ORDER BY p.name`)
handleErr(err) handleErr(err)

View File

@ -17,6 +17,7 @@ type Task struct {
Recipe string `json:"recipe"` Recipe string `json:"recipe"`
MaxAssignTime int64 `json:"max_assign_time"` MaxAssignTime int64 `json:"max_assign_time"`
AssignTime int64 `json:"assign_time"` AssignTime int64 `json:"assign_time"`
VerificationCount int64 `json:"verification_count"`
} }
type TaskStatus int type TaskStatus int
@ -24,8 +25,14 @@ type TaskStatus int
const ( const (
NEW TaskStatus = 1 NEW TaskStatus = 1
FAILED TaskStatus = 2 FAILED TaskStatus = 2
CLOSED TaskStatus = 3 )
TIMEOUT TaskStatus = 4
type TaskResult int
const (
TR_OK TaskResult = 0
TR_FAIL TaskResult = 1
TR_SKIP TaskResult = 2
) )
func (database *Database) SaveTask(task *Task, project int64, hash64 int64) error { func (database *Database) SaveTask(task *Task, project int64, hash64 int64) error {
@ -34,9 +41,9 @@ func (database *Database) SaveTask(task *Task, project int64, hash64 int64) erro
//TODO: For some reason it refuses to insert the 64-bit value unless I do that... //TODO: For some reason it refuses to insert the 64-bit value unless I do that...
res, err := db.Exec(fmt.Sprintf(` res, err := db.Exec(fmt.Sprintf(`
INSERT INTO task (project, max_retries, recipe, priority, max_assign_time, hash64) INSERT INTO task (project, max_retries, recipe, priority, max_assign_time, hash64,verification_count)
VALUES ($1,$2,$3,$4,$5,NULLIF(%d, 0))`, hash64), VALUES ($1,$2,$3,$4,$5,NULLIF(%d, 0),$6)`, hash64),
project, task.MaxRetries, task.Recipe, task.Priority, task.MaxAssignTime) project, task.MaxRetries, task.Recipe, task.Priority, task.MaxAssignTime, task.VerificationCount)
if err != nil { if err != nil {
logrus.WithError(err).WithFields(logrus.Fields{ logrus.WithError(err).WithFields(logrus.Fields{
"task": task, "task": task,
@ -67,10 +74,12 @@ func (database *Database) GetTask(worker *Worker) *Task {
SELECT task.id SELECT task.id
FROM task FROM task
INNER JOIN project p on task.project = p.id INNER JOIN project p on task.project = p.id
LEFT JOIN worker_verifies_task wvt on task.id = wvt.task AND wvt.worker=$1
WHERE assignee IS NULL AND task.status=1 WHERE assignee IS NULL AND task.status=1
AND (p.public OR EXISTS ( AND (p.public OR EXISTS (
SELECT 1 FROM worker_has_access_to_project a WHERE a.worker=$1 AND a.project=p.id SELECT 1 FROM worker_has_access_to_project a WHERE a.worker=$1 AND a.project=p.id
)) ))
AND wvt.task IS NULL
ORDER BY p.priority DESC, task.priority DESC ORDER BY p.priority DESC, task.priority DESC
LIMIT 1 LIMIT 1
) )
@ -99,8 +108,9 @@ func getTaskById(id int64, db *sql.DB) *Task {
row := db.QueryRow(` row := db.QueryRow(`
SELECT task.id, task.priority, task.project, assignee, retries, max_retries, SELECT task.id, task.priority, task.project, assignee, retries, max_retries,
status, recipe, max_assign_time, assign_time, project.* FROM task status, recipe, max_assign_time, assign_time, verification_count, p.priority, p.name,
INNER JOIN project ON task.project = project.id p.clone_url, p.git_repo, p.version, p.motd, p.public FROM task
INNER JOIN project p ON task.project = p.id
WHERE task.id=$1`, id) WHERE task.id=$1`, id)
project := &Project{} project := &Project{}
task := &Task{} task := &Task{}
@ -108,7 +118,7 @@ func getTaskById(id int64, db *sql.DB) *Task {
err := row.Scan(&task.Id, &task.Priority, &project.Id, &task.Assignee, err := row.Scan(&task.Id, &task.Priority, &project.Id, &task.Assignee,
&task.Retries, &task.MaxRetries, &task.Status, &task.Recipe, &task.MaxAssignTime, &task.Retries, &task.MaxRetries, &task.Status, &task.Recipe, &task.MaxAssignTime,
&task.AssignTime, &project.Id, &project.Priority, &project.Name, &task.AssignTime, &task.VerificationCount, &project.Priority, &project.Name,
&project.CloneUrl, &project.GitRepo, &project.Version, &project.Motd, &project.Public) &project.CloneUrl, &project.GitRepo, &project.Version, &project.Motd, &project.Public)
handleErr(err) handleErr(err)
@ -120,27 +130,54 @@ func getTaskById(id int64, db *sql.DB) *Task {
return task return task
} }
func (database Database) ReleaseTask(id int64, workerId int64, success bool) bool { func (database Database) ReleaseTask(id int64, workerId int64, result TaskResult, verification int64) bool {
db := database.getDB() db := database.getDB()
var res sql.Result var rowsAffected int64
var err error if result == TR_OK {
if success { var pid int64
res, err = db.Exec(`UPDATE task SET (status, assignee) = (3, NULL)
WHERE id=$1 AND task.assignee=$2`, id, workerId) //If no verification is required
row := db.QueryRow(`DELETE FROM task WHERE id=$1 AND assignee=$2 AND verification_count < 2
RETURNING project`, id, workerId)
err := row.Scan(&pid)
if err == nil {
rowsAffected = 1
} else { } else {
res, err = db.Exec(`UPDATE task SET (status, assignee, retries) = //If verification is required
(CASE WHEN retries+1 >= max_retries THEN 2 ELSE 1 END, NULL, retries+1) _, err = db.Exec(`INSERT INTO worker_verifies_task (worker, verification_hash, task)
WHERE id=$1 AND assignee=$2`, id, workerId) SELECT $1,$2,task.id FROM task WHERE assignee=$1`, workerId, verification)
}
handleErr(err) handleErr(err)
rowsAffected, _ := res.RowsAffected() res, _ := db.Exec(`DELETE FROM task WHERE id=$1 AND assignee=$2 AND
(SELECT COUNT(*) as vcnt FROM worker_verifies_task wvt WHERE task=$1
GROUP BY wvt.verification_hash ORDER BY vcnt DESC LIMIT 1) >= task.verification_count`,
id, workerId)
rowsAffected, _ = res.RowsAffected()
_, _ = db.Exec(`UPDATE task SET assignee=NULL WHERE id=$1 AND assignee=$2`, id, workerId)
}
} else if result == TR_FAIL {
res, err := db.Exec(`UPDATE task SET (status, assignee, retries) =
(CASE WHEN retries+1 >= max_retries THEN 2 ELSE 1 END, NULL, retries+1)
WHERE id=$1 AND assignee=$2`, id, workerId)
handleErr(err)
rowsAffected, _ = res.RowsAffected()
} else if result == TR_SKIP {
res, err := db.Exec(`UPDATE task SET (status, assignee) = (1, NULL)
WHERE id=$1 AND assignee=$2`, id, workerId)
handleErr(err)
rowsAffected, _ = res.RowsAffected()
}
logrus.WithFields(logrus.Fields{ logrus.WithFields(logrus.Fields{
"rowsAffected": rowsAffected, "rowsAffected": rowsAffected,
}) "taskId": id,
"workerId": workerId,
"verification": verification,
}).Trace("Database.ReleaseTask")
return rowsAffected == 1 return rowsAffected == 1
} }
@ -151,16 +188,18 @@ func (database *Database) GetTaskFromProject(worker *Worker, projectId int64) *T
row := db.QueryRow(` row := db.QueryRow(`
UPDATE task UPDATE task
SET assignee=$1 SET assignee=$1, assign_time=extract(epoch from now() at time zone 'utc')
WHERE id IN WHERE id IN
( (
SELECT task.id SELECT task.id
FROM task FROM task
INNER JOIN project p on task.project = p.id INNER JOIN project p on task.project = p.id
LEFT JOIN worker_verifies_task wvt on task.id = wvt.task AND wvt.worker=$1
WHERE assignee IS NULL AND p.id=$2 AND status=1 WHERE assignee IS NULL AND p.id=$2 AND status=1
AND (p.public OR EXISTS ( AND (p.public OR EXISTS (
SELECT 1 FROM worker_has_access_to_project a WHERE a.worker=$1 AND a.project=$2 SELECT 1 FROM worker_has_access_to_project a WHERE a.worker=$1 AND a.project=$2
)) ))
AND wvt.task IS NULL
ORDER BY task.priority DESC ORDER BY task.priority DESC
LIMIT 1 LIMIT 1
) )

View File

@ -149,6 +149,7 @@ func TestCreateGetTask(t *testing.T) {
Recipe: "{\"url\":\"test\"}", Recipe: "{\"url\":\"test\"}",
MaxRetries: 3, MaxRetries: 3,
Priority: 9999, Priority: 9999,
VerificationCount: 12,
}, worker) }, worker)
taskResp := getTaskFromProject(resp.Id, worker) taskResp := getTaskFromProject(resp.Id, worker)
@ -156,6 +157,9 @@ func TestCreateGetTask(t *testing.T) {
if taskResp.Ok != true { if taskResp.Ok != true {
t.Error() t.Error()
} }
if taskResp.Task.VerificationCount != 12 {
t.Error()
}
if taskResp.Task.Priority != 9999 { if taskResp.Task.Priority != 9999 {
t.Error() t.Error()
} }
@ -399,7 +403,7 @@ func TestReleaseTaskSuccess(t *testing.T) {
releaseResp := releaseTask(api.ReleaseTaskRequest{ releaseResp := releaseTask(api.ReleaseTaskRequest{
TaskId: task.Id, TaskId: task.Id,
Success: true, Result: storage.TR_OK,
}, worker) }, worker)
if releaseResp.Ok != true { if releaseResp.Ok != true {
@ -503,6 +507,138 @@ func TestCreateStringCollision(t *testing.T) {
} }
} }
func TestCannotVerifySameTaskTwice(t *testing.T) {
pid := createProject(api.CreateProjectRequest{
Priority: 1,
GitRepo: "verifysametasktwice",
CloneUrl: "verifysametasktwice",
Motd: "verifysametasktwice",
Public: true,
Name: "verifysametasktwice",
Version: "verifysametasktwice",
}).Id
w := genWid()
createTask(api.CreateTaskRequest{
VerificationCount: 2,
Project: pid,
Recipe: "verifysametasktwice",
}, w)
task := getTaskFromProject(pid, w).Task
rlr := releaseTask(api.ReleaseTaskRequest{
Result: storage.TR_OK,
TaskId: task.Id,
Verification: 123,
}, w)
if rlr.Updated != false {
t.Error()
}
sameTask := getTaskFromProject(pid, w)
if sameTask.Ok != false {
t.Error()
}
}
func TestVerification2(t *testing.T) {
pid := createProject(api.CreateProjectRequest{
Priority: 1,
GitRepo: "verify2",
CloneUrl: "verify2",
Motd: "verify2",
Public: true,
Name: "verify2",
Version: "verify2",
}).Id
w := genWid()
w2 := genWid()
w3 := genWid()
createTask(api.CreateTaskRequest{
VerificationCount: 2,
Project: pid,
Recipe: "verify2",
}, w)
task := getTaskFromProject(pid, w).Task
rlr := releaseTask(api.ReleaseTaskRequest{
Result: storage.TR_OK,
TaskId: task.Id,
Verification: 123,
}, w)
if rlr.Updated != false {
t.Error()
}
task2 := getTaskFromProject(pid, w2).Task
rlr2 := releaseTask(api.ReleaseTaskRequest{
Result: storage.TR_OK,
Verification: 1,
TaskId: task2.Id,
}, w2)
if rlr2.Updated != false {
t.Error()
}
task3 := getTaskFromProject(pid, w3).Task
rlr3 := releaseTask(api.ReleaseTaskRequest{
Result: storage.TR_OK,
Verification: 123,
TaskId: task3.Id,
}, w3)
if rlr3.Updated != true {
t.Error()
}
}
func TestReleaseTaskFail(t *testing.T) {
pid := createProject(api.CreateProjectRequest{
Priority: 1,
GitRepo: "releasefail",
CloneUrl: "releasefail",
Motd: "releasefail",
Public: true,
Name: "releasefail",
Version: "releasefail",
}).Id
w := genWid()
createTask(api.CreateTaskRequest{
MaxRetries: 0,
Project: pid,
VerificationCount: 1,
Recipe: "releasefail",
}, w)
task := getTaskFromProject(pid, w).Task
resp := releaseTask(api.ReleaseTaskRequest{
Result: storage.TR_FAIL,
TaskId: task.Id,
Verification: 1,
}, w)
if resp.Updated != true {
t.Error()
}
if resp.Ok != true {
t.Error()
}
}
func createTask(request api.CreateTaskRequest, worker *storage.Worker) *api.CreateTaskResponse { func createTask(request api.CreateTaskRequest, worker *storage.Worker) *api.CreateTaskResponse {
r := Post("/task/create", request, worker) r := Post("/task/create", request, worker)

View File

@ -1,5 +1,5 @@
DROP TABLE IF EXISTS worker_identity, worker, project, task, log_entry, DROP TABLE IF EXISTS worker_identity, worker, project, task, log_entry,
worker_has_access_to_project, manager, manager_has_role_on_project; worker_has_access_to_project, manager, manager_has_role_on_project, project_monitoring, worker_verifies_task;
DROP TYPE IF EXISTS status; DROP TYPE IF EXISTS status;
DROP TYPE IF EXISTS log_level; DROP TYPE IF EXISTS log_level;
@ -30,7 +30,8 @@ CREATE TABLE project
git_repo TEXT UNIQUE, git_repo TEXT UNIQUE,
version TEXT, version TEXT,
motd TEXT, motd TEXT,
public boolean public boolean,
closed_task_count INT DEFAULT 0
); );
CREATE TABLE worker_has_access_to_project CREATE TABLE worker_has_access_to_project
@ -48,6 +49,7 @@ CREATE TABLE task
assignee INTEGER REFERENCES worker (id), assignee INTEGER REFERENCES worker (id),
max_assign_time INTEGER DEFAULT 0, max_assign_time INTEGER DEFAULT 0,
assign_time INTEGER DEFAULT 0, assign_time INTEGER DEFAULT 0,
verification_count INTEGER DEFAULT 0,
priority SMALLINT DEFAULT 0, priority SMALLINT DEFAULT 0,
retries SMALLINT DEFAULT 0, retries SMALLINT DEFAULT 0,
max_retries SMALLINT, max_retries SMALLINT,
@ -55,6 +57,13 @@ CREATE TABLE task
recipe TEXT recipe TEXT
); );
CREATE TABLE worker_verifies_task
(
verification_hash BIGINT,
task BIGINT REFERENCES task (id) ON DELETE CASCADE,
worker INT REFERENCES worker (id)
);
CREATE TABLE log_entry CREATE TABLE log_entry
( (
level INTEGER, level INTEGER,
@ -77,3 +86,24 @@ CREATE TABLE manager_has_role_on_project
role SMALLINT, role SMALLINT,
project INTEGER REFERENCES project (id) project INTEGER REFERENCES project (id)
); );
CREATE TABLE project_monitoring
(
project INT REFERENCES project (id),
new_task_count INT,
failed_task_count INT,
closed_task_count INT
);
CREATE OR REPLACE FUNCTION on_task_delete_proc() RETURNS TRIGGER AS
$$
BEGIN
UPDATE project SET closed_task_count=closed_task_count + 1 WHERE id = OLD.project;
RETURN OLD;
END;
$$ LANGUAGE 'plpgsql';
CREATE TRIGGER on_task_delete
BEFORE DELETE
ON task
FOR EACH ROW
EXECUTE PROCEDURE on_task_delete_proc();