added optional task unique field

This commit is contained in:
simon987
2019-01-29 18:16:40 -05:00
parent f250a2180c
commit 64152bfc08
35 changed files with 877 additions and 156 deletions

View File

@@ -3,7 +3,6 @@ package storage
import (
"database/sql"
"github.com/Sirupsen/logrus"
"github.com/google/uuid"
"strings"
)
@@ -19,8 +18,8 @@ type Project struct {
}
type AssignedTasks struct {
Assignee uuid.UUID `json:"assignee"`
TaskCount int64 `json:"task_count"`
Assignee string `json:"assignee"`
TaskCount int64 `json:"task_count"`
}
type ProjectStats struct {
@@ -116,14 +115,16 @@ func (database *Database) GetProjectWithRepoName(repoName string) *Project {
return project
}
func (database *Database) UpdateProject(project *Project) {
func (database *Database) UpdateProject(project *Project) error {
db := database.getDB()
res, err := db.Exec(`UPDATE project
SET (priority, name, clone_url, git_repo, version, motd, public) = ($1,$2,$3,$4,$5,$6,$7) WHERE id=$8`,
project.Priority, project.Name, project.CloneUrl, project.GitRepo, project.Version, project.Motd, project.Public, project.Id)
handleErr(err)
if err != nil {
return err
}
rowsAffected, _ := res.RowsAffected()
@@ -132,7 +133,7 @@ func (database *Database) UpdateProject(project *Project) {
"rowsAffected": rowsAffected,
}).Trace("Database.updateProject UPDATE project")
return
return nil
}
func (database *Database) GetProjectStats(id int64) *ProjectStats {
@@ -154,18 +155,27 @@ func (database *Database) GetProjectStats(id int64) *ProjectStats {
logrus.WithError(err).WithFields(logrus.Fields{
"id": id,
}).Trace("Get project stats: No task for this project")
return nil
}
//todo: only expose worker alias
rows, err := db.Query(`SELECT assignee, COUNT(*) FROM TASK
LEFT JOIN worker ON TASK.assignee = worker.id WHERE project=$1 GROUP BY assignee`, id)
rows, err := db.Query(`SELECT worker.alias, COUNT(*) as wc FROM TASK
LEFT JOIN worker ON TASK.assignee = worker.id WHERE project=$1
GROUP BY worker.id ORDER BY wc LIMIT 10`, id)
stats.Assignees = []*AssignedTasks{}
for rows.Next() {
assignee := AssignedTasks{}
err = rows.Scan(&assignee.Assignee, &assignee.TaskCount)
var assigneeAlias sql.NullString
err = rows.Scan(&assigneeAlias, &assignee.TaskCount)
handleErr(err)
if assigneeAlias.Valid {
assignee.Assignee = assigneeAlias.String
} else {
assignee.Assignee = "unassigned"
}
stats.Assignees = append(stats.Assignees, &assignee)
}
}
@@ -182,8 +192,8 @@ func (database Database) GetAllProjectsStats() *[]ProjectStats {
SUM(CASE WHEN status='failed' THEN 1 ELSE 0 END) failedCount,
SUM(CASE WHEN status='closed' THEN 1 ELSE 0 END) closedCount,
p.*
FROM task INNER JOIN project p on task.project = p.id
GROUP BY p.id`)
FROM task RIGHT JOIN project p on task.project = p.id
GROUP BY p.id ORDER BY p.name`)
handleErr(err)
for rows.Next() {
@@ -191,7 +201,7 @@ func (database Database) GetAllProjectsStats() *[]ProjectStats {
stats := ProjectStats{}
p := &Project{}
err := rows.Scan(&stats.NewTaskCount, &stats.FailedTaskCount, &stats.ClosedTaskCount,
&p.Id, &p.Priority, &p.Motd, &p.Name, &p.CloneUrl, &p.GitRepo, &p.Version, &p.Public)
&p.Id, &p.Priority, &p.Name, &p.CloneUrl, &p.GitRepo, &p.Version, &p.Motd, &p.Public)
handleErr(err)
stats.Project = p

View File

@@ -2,6 +2,7 @@ package storage
import (
"database/sql"
"fmt"
"github.com/Sirupsen/logrus"
"github.com/google/uuid"
)
@@ -19,13 +20,14 @@ type Task struct {
AssignTime int64 `json:"assign_time"`
}
func (database *Database) SaveTask(task *Task, project int64) error {
func (database *Database) SaveTask(task *Task, project int64, hash64 int64) error {
db := database.getDB()
res, err := db.Exec(`
INSERT INTO task (project, max_retries, recipe, priority, max_assign_time)
VALUES ($1,$2,$3,$4,$5)`,
//TODO: For some reason it refuses to insert the 64-bit value unless I do that...
res, err := db.Exec(fmt.Sprintf(`
INSERT INTO task (project, max_retries, recipe, priority, max_assign_time, hash64)
VALUES ($1,$2,$3,$4,$5,NULLIF(%d, 0))`, hash64),
project, task.MaxRetries, task.Recipe, task.Priority, task.MaxAssignTime)
if err != nil {
logrus.WithError(err).WithFields(logrus.Fields{
@@ -57,7 +59,7 @@ func (database *Database) GetTask(worker *Worker) *Task {
SELECT task.id
FROM task
INNER JOIN project p on task.project = p.id
WHERE assignee IS NULL
WHERE assignee IS NULL AND task.status='new'
AND (p.public OR EXISTS (
SELECT 1 FROM worker_has_access_to_project a WHERE a.worker=$1 AND a.project=p.id
))
@@ -88,7 +90,8 @@ func (database *Database) GetTask(worker *Worker) *Task {
func getTaskById(id int64, db *sql.DB) *Task {
row := db.QueryRow(`
SELECT * FROM task
SELECT task.id, task.priority, task.project, assignee, retries, max_retries,
status, recipe, max_assign_time, assign_time, project.* FROM task
INNER JOIN project ON task.project = project.id
WHERE task.id=$1`, id)
task := scanTask(row)
@@ -109,11 +112,11 @@ func (database Database) ReleaseTask(id int64, workerId *uuid.UUID, success bool
var err error
if success {
res, err = db.Exec(`UPDATE task SET (status, assignee) = ('closed', NULL)
WHERE id=$2 AND task.assignee=$2`, id, workerId)
WHERE id=$1 AND task.assignee=$2`, id, workerId)
} else {
res, err = db.Exec(`UPDATE task SET (status, assignee, retries) =
(CASE WHEN retries+1 >= max_retries THEN 'failed' ELSE 'new' END, NULL, retries+1)
WHERE id=$2 AND assignee=$2`, id, workerId)
WHERE id=$1 AND assignee=$2`, id, workerId)
}
handleErr(err)
@@ -138,7 +141,7 @@ func (database *Database) GetTaskFromProject(worker *Worker, projectId int64) *T
SELECT task.id
FROM task
INNER JOIN project p on task.project = p.id
WHERE assignee IS NULL AND p.id=$2
WHERE assignee IS NULL AND p.id=$2 AND status='new'
AND (p.public OR EXISTS (
SELECT 1 FROM worker_has_access_to_project a WHERE a.worker=$1 AND a.project=$2
))

View File

@@ -16,6 +16,8 @@ type Worker struct {
Id uuid.UUID `json:"id"`
Created int64 `json:"created"`
Identity *Identity `json:"identity"`
Alias string `json:"alias,omitempty"`
Secret []byte `json:"secret"`
}
func (database *Database) SaveWorker(worker *Worker) {
@@ -24,8 +26,8 @@ func (database *Database) SaveWorker(worker *Worker) {
identityId := getOrCreateIdentity(worker.Identity, db)
res, err := db.Exec("INSERT INTO worker (id, created, identity) VALUES ($1,$2,$3)",
worker.Id, worker.Created, identityId)
res, err := db.Exec("INSERT INTO worker (id, created, identity, secret, alias) VALUES ($1,$2,$3,$4,$5)",
worker.Id, worker.Created, identityId, worker.Secret, worker.Alias)
handleErr(err)
var rowsAffected, _ = res.RowsAffected()
@@ -41,8 +43,8 @@ func (database *Database) GetWorker(id uuid.UUID) *Worker {
worker := &Worker{}
var identityId int64
row := db.QueryRow("SELECT id, created, identity FROM worker WHERE id=$1", id)
err := row.Scan(&worker.Id, &worker.Created, &identityId)
row := db.QueryRow("SELECT id, created, identity, secret, alias FROM worker WHERE id=$1", id)
err := row.Scan(&worker.Id, &worker.Created, &identityId, &worker.Secret, &worker.Alias)
if err != nil {
logrus.WithFields(logrus.Fields{
"id": id,
@@ -64,7 +66,7 @@ func getIdentity(id int64, db *sql.DB) (*Identity, error) {
identity := &Identity{}
row := db.QueryRow("SELECT remote_addr, user_agent FROM workeridentity WHERE id=$1", id)
row := db.QueryRow("SELECT remote_addr, user_agent FROM worker_identity WHERE id=$1", id)
err := row.Scan(&identity.RemoteAddr, &identity.UserAgent)
if err != nil {
@@ -80,7 +82,7 @@ func getIdentity(id int64, db *sql.DB) (*Identity, error) {
func getOrCreateIdentity(identity *Identity, db *sql.DB) int64 {
res, err := db.Exec("INSERT INTO workeridentity (remote_addr, user_agent) VALUES ($1,$2) ON CONFLICT DO NOTHING",
res, err := db.Exec("INSERT INTO worker_identity (remote_addr, user_agent) VALUES ($1,$2) ON CONFLICT DO NOTHING",
identity.RemoteAddr, identity.UserAgent)
handleErr(err)
@@ -89,7 +91,7 @@ func getOrCreateIdentity(identity *Identity, db *sql.DB) int64 {
"rowsAffected": rowsAffected,
}).Trace("Database.saveWorker INSERT workerIdentity")
row := db.QueryRow("SELECT (id) FROM workeridentity WHERE remote_addr=$1", identity.RemoteAddr)
row := db.QueryRow("SELECT (id) FROM worker_identity WHERE remote_addr=$1", identity.RemoteAddr)
var rowId int64
err = row.Scan(&rowId)
@@ -127,7 +129,7 @@ func (database *Database) GrantAccess(workerId *uuid.UUID, projectId int64) bool
return rowsAffected == 1
}
func (database Database) RemoveAccess(workerId *uuid.UUID, projectId int64) bool {
func (database *Database) RemoveAccess(workerId *uuid.UUID, projectId int64) bool {
db := database.getDB()
res, err := db.Exec(`DELETE FROM worker_has_access_to_project WHERE worker=$1 AND project=$2`,
@@ -144,3 +146,20 @@ func (database Database) RemoveAccess(workerId *uuid.UUID, projectId int64) bool
return rowsAffected == 1
}
func (database *Database) UpdateWorker(worker *Worker) bool {
db := database.getDB()
res, err := db.Exec(`UPDATE worker SET alias=$1 WHERE id=$2`,
worker.Alias, worker.Id)
handleErr(err)
rowsAffected, _ := res.RowsAffected()
logrus.WithFields(logrus.Fields{
"rowsAffected": rowsAffected,
"worker": worker,
}).Trace("Database.UpdateWorker UPDATE worker")
return rowsAffected == 1
}