Performance patch, version bump

This commit is contained in:
2019-09-21 14:32:18 -04:00
parent 77b4da0653
commit 3123abceb6
34 changed files with 362 additions and 257 deletions

View File

@@ -1,23 +1,24 @@
package storage
import (
"database/sql"
"errors"
"fmt"
"github.com/lib/pq"
"github.com/sirupsen/logrus"
)
type Task struct {
Id int64 `json:"id"`
Priority int64 `json:"priority"`
Priority int16 `json:"priority"`
Project *Project `json:"project"`
Assignee int64 `json:"assignee"`
Retries int64 `json:"retries"`
MaxRetries int64 `json:"max_retries"`
Retries int16 `json:"retries"`
MaxRetries int16 `json:"max_retries"`
Status TaskStatus `json:"status"`
Recipe string `json:"recipe"`
MaxAssignTime int64 `json:"max_assign_time"`
AssignTime int64 `json:"assign_time"`
VerificationCount int64 `json:"verification_count"`
VerificationCount int16 `json:"verification_count"`
}
type TaskStatus int
@@ -42,16 +43,62 @@ type SaveTaskRequest struct {
WorkerId int64
}
func (database *Database) SaveTask(task *Task, project int64, hash64 int64, wid int64) error {
func (database *Database) checkAccess(workerId, projectId int64, assign, submit bool) bool {
if database.submitAccessCache[workerId] == nil {
database.submitAccessCache[workerId] = make(map[int64]bool)
database.assignAccessCache[workerId] = make(map[int64]bool)
} else {
_, ok := database.submitAccessCache[workerId][projectId]
if ok {
if assign && !database.assignAccessCache[workerId][projectId] {
return false
}
if submit && !database.submitAccessCache[workerId][projectId] {
return false
}
return true
}
}
db := database.getDB()
res, err := db.Exec(fmt.Sprintf(`
INSERT INTO task (project, max_retries, recipe, priority, max_assign_time, hash64,verification_count)
SELECT $1,$2,$3,$4,$5,NULLIF(%d, 0),$6 FROM worker_access
WHERE role_submit AND NOT request AND worker=$7 AND project=$1`, hash64),
project, task.MaxRetries, task.Recipe, task.Priority, task.MaxAssignTime, task.VerificationCount,
wid)
row := db.QueryRow(`SELECT role_assign, role_submit FROM worker_access
WHERE worker=$1 and project=$2 AND NOT request`,
workerId, projectId)
var hasAssign, hasSubmit bool
err := row.Scan(&hasAssign, &hasSubmit)
database.submitAccessCache[workerId][projectId] = hasSubmit
database.assignAccessCache[workerId][projectId] = hasAssign
if err != nil {
return false
}
if !hasAssign && assign {
return false
}
if !hasSubmit && submit {
return false
}
return true
}
func (database *Database) SaveTask(task *Task, project int64, hash64 int64, wid int64) error {
if !database.checkAccess(wid, project, false, true) {
return errors.New("unauthorized task submit")
}
db := database.getDB()
_, err := db.Exec(`INSERT INTO task
(project, max_retries, recipe, priority, max_assign_time, hash64, verification_count)
VALUES ($1,$2,$3,$4,$5,$6,$7)`,
project, task.MaxRetries, task.Recipe, task.Priority, task.MaxAssignTime,
makeNullableInt(hash64), task.VerificationCount)
if err != nil {
logrus.WithError(err).WithFields(logrus.Fields{
"task": task,
@@ -59,45 +106,55 @@ func (database *Database) SaveTask(task *Task, project int64, hash64 int64, wid
return err
}
rowsAffected, err := res.RowsAffected()
handleErr(err)
logrus.WithFields(logrus.Fields{
"rowsAffected": rowsAffected,
"task": task,
}).Trace("Database.saveTask INSERT task")
if rowsAffected == 0 {
return errors.New("unauthorized task submit")
}
return nil
}
func makeNullableInt(i int64) sql.NullInt64 {
if i == 0 {
return sql.NullInt64{Valid: false}
} else {
return sql.NullInt64{
Valid: true,
Int64: i,
}
}
}
func (database Database) BulkSaveTask(bulkSaveTaskReqs []SaveTaskRequest) []error {
if !database.checkAccess(bulkSaveTaskReqs[0].WorkerId, bulkSaveTaskReqs[0].Project,
false, true) {
return []error{errors.New("unauthorized task submit")}
}
db := database.getDB()
txn, err := db.Begin()
handleErr(err)
errs := make([]error, len(bulkSaveTaskReqs))
for i, req := range bulkSaveTaskReqs {
res, err := db.Exec(fmt.Sprintf(`
INSERT INTO task (project, max_retries, recipe, priority, max_assign_time, hash64,verification_count)
SELECT $1,$2,$3,$4,$5,NULLIF(%d, 0),$6 FROM worker_access
WHERE role_submit AND NOT request AND worker=$7 AND project=$1`, req.Hash64),
req.Project, req.Task.MaxRetries, req.Task.Recipe, req.Task.Priority,
req.Task.MaxAssignTime, req.Task.VerificationCount,
req.WorkerId)
errs[i] = err
stmt, _ := txn.Prepare(pq.CopyIn(
"task",
"project", "max_retries", "recipe", "priority",
"max_assign_time", "hash64", "verification_count",
))
if res != nil {
rowsAffected, _ := res.RowsAffected()
if rowsAffected == 0 {
errs[i] = errors.New("unauthorized task submit")
}
for i, req := range bulkSaveTaskReqs {
_, err = stmt.Exec(req.Project, req.Task.MaxRetries, req.Task.Recipe,
req.Task.Priority, req.Task.MaxAssignTime, makeNullableInt(req.Hash64),
req.Task.VerificationCount)
if err != nil {
errs[i] = err
}
}
_, err = stmt.Exec()
err = stmt.Close()
handleErr(err)
err = txn.Commit()
handleErr(err)
return errs
}
@@ -107,7 +164,7 @@ func (database Database) ReleaseTask(id int64, workerId int64, result TaskResult
var taskUpdated bool
if result == TR_OK {
row := db.QueryRow(fmt.Sprintf(`SELECT release_task_ok(%d,%d,%d)`, workerId, id, verification))
row := db.QueryRow(`SELECT release_task_ok($1,$2,$3)`, workerId, id, verification)
err := row.Scan(&taskUpdated)
if err != nil {
@@ -128,13 +185,6 @@ func (database Database) ReleaseTask(id int64, workerId int64, result TaskResult
taskUpdated = rowsAffected == 1
}
logrus.WithFields(logrus.Fields{
"taskUpdated": taskUpdated,
"taskId": id,
"workerId": workerId,
"verification": verification,
}).Trace("Database.ReleaseTask")
return taskUpdated
}
@@ -161,35 +211,21 @@ func (database *Database) GetTaskFromProject(worker *Worker, projectId int64) *T
ORDER BY task.priority DESC
LIMIT 1
)
RETURNING task.id`, worker.Id, projectId)
RETURNING task.id, task.priority, assignee, retries, max_retries,
status, recipe, max_assign_time, assign_time, verification_count`, worker.Id, projectId)
var id int64
err := row.Scan(&id)
database.assignMutex.Unlock()
task := &Task{}
err := row.Scan(&task.Id, &task.Priority, &task.Assignee,
&task.Retries, &task.MaxRetries, &task.Status, &task.Recipe, &task.MaxAssignTime,
&task.AssignTime, &task.VerificationCount)
if err != nil {
return nil
}
row = db.QueryRow(`
SELECT task.id, task.priority, task.project, assignee, retries, max_retries,
status, recipe, max_assign_time, assign_time, verification_count, project.priority, project.name,
project.clone_url, project.git_repo, project.version, project.motd, project.public, COALESCE(project.chain,0),
project.assign_rate, project.submit_rate
FROM task
INNER JOIN project project ON task.project = project.id
WHERE task.id=$1`, id)
project := &Project{}
task := &Task{}
task.Project = project
err = row.Scan(&task.Id, &task.Priority, &project.Id, &task.Assignee,
&task.Retries, &task.MaxRetries, &task.Status, &task.Recipe, &task.MaxAssignTime,
&task.AssignTime, &task.VerificationCount, &project.Priority, &project.Name,
&project.CloneUrl, &project.GitRepo, &project.Version, &project.Motd, &project.Public,
&project.Chain, &project.AssignRate, &project.SubmitRate)
handleErr(err)
task.Project = database.GetProject(projectId)
return task
}