From 3123abceb6b0543b5dc395f27a14a0bab682ae7f Mon Sep 17 00:00:00 2001 From: simon Date: Sat, 21 Sep 2019 14:32:18 -0400 Subject: [PATCH] Performance patch, version bump --- api/log.go | 17 -- api/main.go | 94 +++++----- api/models.go | 8 +- api/project.go | 6 + api/task.go | 25 +-- client/models.go | 14 +- config.yml | 0 jenkins/Jenkinsfile | 1 + jenkins/deploy.sh | 0 main/main.go | 9 - schema.sql | 24 ++- storage/database.go | 19 +- storage/project.go | 12 +- storage/task.go | 172 +++++++++++------- storage/worker.go | 28 +-- test/api_project_test.go | 5 +- test/api_task_bench_test.go | 41 +++++ test/api_task_test.go | 52 +++++- test/schema.sql | 25 ++- web/angular/e2e/protractor.conf.js | 0 web/angular/e2e/src/app.e2e-spec.ts | 0 web/angular/e2e/src/app.po.ts | 0 web/angular/e2e/tsconfig.e2e.json | 0 web/angular/package-lock.json | 63 ++----- web/angular/package.json | 2 +- web/angular/src/app/api.service.ts | 0 web/angular/src/app/app-routing.module.ts | 0 web/angular/src/app/app.component.css | 0 web/angular/src/app/app.component.html | 0 web/angular/src/app/app.module.ts | 0 .../project-list/project-list.component.css | 0 .../project-list/project-list.component.html | 0 .../project-list/project-list.component.ts | 0 web/angular/src/assets/i18n/en.json | 2 +- 34 files changed, 362 insertions(+), 257 deletions(-) mode change 100755 => 100644 config.yml mode change 100755 => 100644 jenkins/deploy.sh mode change 100755 => 100644 schema.sql mode change 100755 => 100644 test/schema.sql mode change 100755 => 100644 web/angular/e2e/protractor.conf.js mode change 100755 => 100644 web/angular/e2e/src/app.e2e-spec.ts mode change 100755 => 100644 web/angular/e2e/src/app.po.ts mode change 100755 => 100644 web/angular/e2e/tsconfig.e2e.json mode change 100755 => 100644 web/angular/src/app/api.service.ts mode change 100755 => 100644 web/angular/src/app/app-routing.module.ts mode change 100755 => 100644 web/angular/src/app/app.component.css mode change 100755 => 100644 web/angular/src/app/app.component.html mode change 100755 => 100644 web/angular/src/app/app.module.ts mode change 100755 => 100644 web/angular/src/app/project-list/project-list.component.css mode change 100755 => 100644 web/angular/src/app/project-list/project-list.component.html mode change 100755 => 100644 web/angular/src/app/project-list/project-list.component.ts diff --git a/api/log.go b/api/log.go index 9be3802..7da2677 100644 --- a/api/log.go +++ b/api/log.go @@ -5,7 +5,6 @@ import ( "errors" "github.com/simon987/task_tracker/config" "github.com/sirupsen/logrus" - "github.com/valyala/fasthttp" "os" "time" ) @@ -16,22 +15,6 @@ func (e *LogRequest) Time() time.Time { return t } -func LogRequestMiddleware(h RequestHandler) fasthttp.RequestHandler { - return fasthttp.RequestHandler(func(ctx *fasthttp.RequestCtx) { - - ctx.Response.Header.Add("Access-Control-Allow-Headers", "Content-Type") - ctx.Response.Header.Add("Access-Control-Allow-Methods", "GET, POST, OPTION") - ctx.Response.Header.Add("Access-Control-Allow-Origin", "*") - - logrus.WithFields(logrus.Fields{ - "path": string(ctx.Path()), - "header": ctx.Request.Header.String(), - }).Trace(string(ctx.Method())) - - h(&Request{Ctx: ctx}) - }) -} - func (api *WebAPI) SetupLogger() { writer, err := os.OpenFile("log.txt", os.O_APPEND|os.O_WRONLY|os.O_CREATE, 0600) if err != nil { diff --git a/api/main.go b/api/main.go index 8a232eb..edd973d 100644 --- a/api/main.go +++ b/api/main.go @@ -27,7 +27,13 @@ type RequestHandler func(*Request) var info = Info{ Name: "task_tracker", - Version: "1.0", + Version: "1.1", +} + +func Middleware(h RequestHandler) fasthttp.RequestHandler { + return fasthttp.RequestHandler(func(ctx *fasthttp.RequestCtx) { + h(&Request{Ctx: ctx}) + }) } func Index(r *Request) { @@ -77,56 +83,56 @@ func New() *WebAPI { Name: info.Name, } - api.router.GET("/", LogRequestMiddleware(Index)) + api.router.GET("/", Middleware(Index)) - api.router.POST("/log/trace", LogRequestMiddleware(api.LogTrace)) - api.router.POST("/log/info", LogRequestMiddleware(api.LogInfo)) - api.router.POST("/log/warn", LogRequestMiddleware(api.LogWarn)) - api.router.POST("/log/error", LogRequestMiddleware(api.LogError)) + api.router.POST("/log/trace", Middleware(api.LogTrace)) + api.router.POST("/log/info", Middleware(api.LogInfo)) + api.router.POST("/log/warn", Middleware(api.LogWarn)) + api.router.POST("/log/error", Middleware(api.LogError)) - api.router.POST("/worker/create", LogRequestMiddleware(api.CreateWorker)) - api.router.POST("/worker/update", LogRequestMiddleware(api.UpdateWorker)) - api.router.POST("/worker/set_paused", LogRequestMiddleware(api.WorkerSetPaused)) - api.router.GET("/worker/get/:id", LogRequestMiddleware(api.GetWorker)) - api.router.GET("/worker/stats", LogRequestMiddleware(api.GetAllWorkerStats)) + api.router.POST("/worker/create", Middleware(api.CreateWorker)) + api.router.POST("/worker/update", Middleware(api.UpdateWorker)) + api.router.POST("/worker/set_paused", Middleware(api.WorkerSetPaused)) + api.router.GET("/worker/get/:id", Middleware(api.GetWorker)) + api.router.GET("/worker/stats", Middleware(api.GetAllWorkerStats)) - api.router.POST("/project/create", LogRequestMiddleware(api.CreateProject)) - api.router.GET("/project/get/:id", LogRequestMiddleware(api.GetProject)) - api.router.POST("/project/update/:id", LogRequestMiddleware(api.UpdateProject)) - api.router.GET("/project/list", LogRequestMiddleware(api.GetProjectList)) - api.router.GET("/project/monitoring-between/:id", LogRequestMiddleware(api.GetSnapshotsWithinRange)) - api.router.GET("/project/monitoring/:id", LogRequestMiddleware(api.GetNSnapshots)) - api.router.GET("/project/assignees/:id", LogRequestMiddleware(api.GetAssigneeStatsForProject)) - api.router.GET("/project/access_list/:id", LogRequestMiddleware(api.GetWorkerAccessListForProject)) - api.router.POST("/project/request_access", LogRequestMiddleware(api.CreateWorkerAccess)) - api.router.POST("/project/accept_request/:id/:wid", LogRequestMiddleware(api.AcceptAccessRequest)) - api.router.POST("/project/reject_request/:id/:wid", LogRequestMiddleware(api.RejectAccessRequest)) - api.router.GET("/project/secret/:id", LogRequestMiddleware(api.GetSecret)) - api.router.POST("/project/secret/:id", LogRequestMiddleware(api.SetSecret)) - api.router.GET("/project/webhook_secret/:id", LogRequestMiddleware(api.GetWebhookSecret)) - api.router.POST("/project/webhook_secret/:id", LogRequestMiddleware(api.SetWebhookSecret)) - api.router.POST("/project/reset_failed_tasks/:id", LogRequestMiddleware(api.ResetFailedTasks)) - api.router.POST("/project/hard_reset/:id", LogRequestMiddleware(api.HardReset)) - api.router.POST("/project/reclaim_assigned_tasks/:id", LogRequestMiddleware(api.ReclaimAssignedTasks)) + api.router.POST("/project/create", Middleware(api.CreateProject)) + api.router.GET("/project/get/:id", Middleware(api.GetProject)) + api.router.POST("/project/update/:id", Middleware(api.UpdateProject)) + api.router.GET("/project/list", Middleware(api.GetProjectList)) + api.router.GET("/project/monitoring-between/:id", Middleware(api.GetSnapshotsWithinRange)) + api.router.GET("/project/monitoring/:id", Middleware(api.GetNSnapshots)) + api.router.GET("/project/assignees/:id", Middleware(api.GetAssigneeStatsForProject)) + api.router.GET("/project/access_list/:id", Middleware(api.GetWorkerAccessListForProject)) + api.router.POST("/project/request_access", Middleware(api.CreateWorkerAccess)) + api.router.POST("/project/accept_request/:id/:wid", Middleware(api.AcceptAccessRequest)) + api.router.POST("/project/reject_request/:id/:wid", Middleware(api.RejectAccessRequest)) + api.router.GET("/project/secret/:id", Middleware(api.GetSecret)) + api.router.POST("/project/secret/:id", Middleware(api.SetSecret)) + api.router.GET("/project/webhook_secret/:id", Middleware(api.GetWebhookSecret)) + api.router.POST("/project/webhook_secret/:id", Middleware(api.SetWebhookSecret)) + api.router.POST("/project/reset_failed_tasks/:id", Middleware(api.ResetFailedTasks)) + api.router.POST("/project/hard_reset/:id", Middleware(api.HardReset)) + api.router.POST("/project/reclaim_assigned_tasks/:id", Middleware(api.ReclaimAssignedTasks)) - api.router.POST("/task/submit", LogRequestMiddleware(api.SubmitTask)) - api.router.POST("/task/bulk_submit", LogRequestMiddleware(api.BulkSubmitTask)) - api.router.GET("/task/get/:project", LogRequestMiddleware(api.GetTaskFromProject)) - api.router.POST("/task/release", LogRequestMiddleware(api.ReleaseTask)) + api.router.POST("/task/submit", Middleware(api.SubmitTask)) + api.router.POST("/task/bulk_submit", Middleware(api.BulkSubmitTask)) + api.router.GET("/task/get/:project", Middleware(api.GetTaskFromProject)) + api.router.POST("/task/release", Middleware(api.ReleaseTask)) - api.router.POST("/git/receivehook", LogRequestMiddleware(api.ReceiveGitWebHook)) + api.router.POST("/git/receivehook", Middleware(api.ReceiveGitWebHook)) - api.router.POST("/logs", LogRequestMiddleware(api.GetLog)) + api.router.POST("/logs", Middleware(api.GetLog)) - api.router.POST("/register", LogRequestMiddleware(api.Register)) - api.router.POST("/login", LogRequestMiddleware(api.Login)) - api.router.GET("/logout", LogRequestMiddleware(api.Logout)) - api.router.GET("/account", LogRequestMiddleware(api.GetAccountDetails)) - api.router.GET("/manager/list", LogRequestMiddleware(api.GetManagerList)) - api.router.GET("/manager/list_for_project/:id", LogRequestMiddleware(api.GetManagerListWithRoleOn)) - api.router.GET("/manager/promote/:id", LogRequestMiddleware(api.PromoteManager)) - api.router.GET("/manager/demote/:id", LogRequestMiddleware(api.DemoteManager)) - api.router.POST("/manager/set_role_for_project/:id", LogRequestMiddleware(api.SetManagerRoleOnProject)) + api.router.POST("/register", Middleware(api.Register)) + api.router.POST("/login", Middleware(api.Login)) + api.router.GET("/logout", Middleware(api.Logout)) + api.router.GET("/account", Middleware(api.GetAccountDetails)) + api.router.GET("/manager/list", Middleware(api.GetManagerList)) + api.router.GET("/manager/list_for_project/:id", Middleware(api.GetManagerListWithRoleOn)) + api.router.GET("/manager/promote/:id", Middleware(api.PromoteManager)) + api.router.GET("/manager/demote/:id", Middleware(api.DemoteManager)) + api.router.POST("/manager/set_role_for_project/:id", Middleware(api.SetManagerRoleOnProject)) api.router.NotFound = func(ctx *fasthttp.RequestCtx) { diff --git a/api/models.go b/api/models.go index efc253f..aaa6934 100644 --- a/api/models.go +++ b/api/models.go @@ -196,13 +196,13 @@ type GetWorkerAccessListForProjectResponse struct { type SubmitTaskRequest struct { Project int64 `json:"project"` - MaxRetries int64 `json:"max_retries"` + MaxRetries int16 `json:"max_retries"` Recipe string `json:"recipe"` - Priority int64 `json:"priority"` + Priority int16 `json:"priority"` MaxAssignTime int64 `json:"max_assign_time"` Hash64 int64 `json:"hash_u64"` UniqueString string `json:"unique_string"` - VerificationCount int64 `json:"verification_count"` + VerificationCount int16 `json:"verification_count"` } func (req *SubmitTaskRequest) IsValid() bool { @@ -212,7 +212,7 @@ func (req *SubmitTaskRequest) IsValid() bool { if len(req.Recipe) <= 0 { return false } - if req.Hash64 != 0 && req.UniqueString != "" { + if req.Hash64 != 0 && len(req.UniqueString) != 0 { return false } if req.Project == 0 { diff --git a/api/project.go b/api/project.go index b4f44d9..fcfc574 100644 --- a/api/project.go +++ b/api/project.go @@ -165,6 +165,12 @@ func (api *WebAPI) UpdateProject(r *Request) { return } + if updateReq.AssignRate == 0 { + updateReq.AssignRate = rate.Inf + } + if updateReq.SubmitRate == 0 { + updateReq.SubmitRate = rate.Inf + } project := &storage.Project{ Id: id, Name: updateReq.Name, diff --git a/api/task.go b/api/task.go index 623386f..b64a5d6 100644 --- a/api/task.go +++ b/api/task.go @@ -33,6 +33,10 @@ func (api *WebAPI) SubmitTask(r *Request) { return } + if createReq.VerificationCount == 0 { + createReq.VerificationCount = 1 + } + if !createReq.IsValid() { logrus.WithFields(logrus.Fields{ "req": createReq, @@ -46,7 +50,7 @@ func (api *WebAPI) SubmitTask(r *Request) { task := &storage.Task{ MaxRetries: createReq.MaxRetries, - Recipe: createReq.Recipe, + Recipe: string(createReq.Recipe), Priority: createReq.Priority, AssignTime: 0, MaxAssignTime: createReq.MaxAssignTime, @@ -72,7 +76,7 @@ func (api *WebAPI) SubmitTask(r *Request) { return } - if createReq.UniqueString != "" { + if len(createReq.UniqueString) != 0 { createReq.Hash64 = int64(siphash.Hash(1, 2, []byte(createReq.UniqueString))) } @@ -135,14 +139,17 @@ func (api *WebAPI) BulkSubmitTask(r *Request) { return } - if req.UniqueString != "" { + if len(req.UniqueString) != 0 { req.Hash64 = int64(siphash.Hash(1, 2, []byte(req.UniqueString))) } + if req.VerificationCount == 0 { + req.VerificationCount = 1 + } saveRequests[i] = storage.SaveTaskRequest{ Task: &storage.Task{ MaxRetries: req.MaxRetries, - Recipe: req.Recipe, + Recipe: string(req.Recipe), Priority: req.Priority, AssignTime: 0, MaxAssignTime: req.MaxAssignTime, @@ -184,6 +191,8 @@ func (api *WebAPI) BulkSubmitTask(r *Request) { return } + logrus.Info(saveErrors) + r.OkJson(JsonResponse{ Ok: true, }) @@ -263,7 +272,7 @@ func (api *WebAPI) validateSecret(r *Request) (*storage.Worker, error) { if widStr == "" { return nil, errors.New("worker id not specified") } - if bytes.Equal(secretHeader, []byte("")) { + if len(secretHeader) == 0 { return nil, errors.New("secret is not specified") } @@ -290,12 +299,6 @@ func (api *WebAPI) validateSecret(r *Request) (*storage.Worker, error) { secretLen, _ := base64.StdEncoding.Decode(secret, secretHeader) matches := bytes.Equal(worker.Secret, secret[:secretLen]) - logrus.WithFields(logrus.Fields{ - "expected": string(worker.Secret), - "header": string(secretHeader), - "matches": matches, - }).Trace("Validating Worker secret") - if !matches { return nil, errors.New("invalid secret") } diff --git a/client/models.go b/client/models.go index 3bf1c0e..e2e66d3 100644 --- a/client/models.go +++ b/client/models.go @@ -21,7 +21,19 @@ type AssignTaskResponse struct { Message string `json:"message"` RateLimitDelay float64 `json:"rate_limit_delay,omitempty"` Content struct { - Task *storage.Task `json:"task"` + Task *struct { + Id int64 `json:"id"` + Priority int16 `json:"priority"` + Project *storage.Project `json:"project"` + Assignee int64 `json:"assignee"` + Retries int16 `json:"retries"` + MaxRetries int64 `json:"max_retries"` + Status storage.TaskStatus `json:"status"` + Recipe string `json:"recipe"` + MaxAssignTime int64 `json:"max_assign_time"` + AssignTime int64 `json:"assign_time"` + VerificationCount int16 `json:"verification_count"` + } `json:"task"` } `json:"content"` } diff --git a/config.yml b/config.yml old mode 100755 new mode 100644 diff --git a/jenkins/Jenkinsfile b/jenkins/Jenkinsfile index 1c855ab..44934b4 100644 --- a/jenkins/Jenkinsfile +++ b/jenkins/Jenkinsfile @@ -7,6 +7,7 @@ remote.knownHosts = '/var/lib/jenkins/.ssh/known_hosts' remote.allowAnyHosts = true remote.retryCount = 3 remote.retryWaitSec = 3 +remote.port = 2299 logLevel = 'FINER' pipeline { diff --git a/jenkins/deploy.sh b/jenkins/deploy.sh old mode 100755 new mode 100644 diff --git a/main/main.go b/main/main.go index 2be616d..755ed5d 100644 --- a/main/main.go +++ b/main/main.go @@ -3,18 +3,10 @@ package main import ( "github.com/simon987/task_tracker/api" "github.com/simon987/task_tracker/config" - //"github.com/simon987/task_tracker/storage" "math/rand" "time" ) -func tmpDebugSetup() { - - //db := storage.Database{} - //db.Reset() - -} - func main() { rand.Seed(time.Now().UTC().UnixNano()) @@ -22,6 +14,5 @@ func main() { webApi := api.New() webApi.SetupLogger() - tmpDebugSetup() webApi.Run() } diff --git a/schema.sql b/schema.sql old mode 100755 new mode 100644 index a4115e7..4af5afa --- a/schema.sql +++ b/schema.sql @@ -41,6 +41,8 @@ CREATE TABLE worker_access request boolean, primary key (worker, project) ); +CREATE INDEX worker_index ON worker_access (worker); +CREATE INDEX project_index ON worker_access (project); CREATE TABLE task ( @@ -50,22 +52,28 @@ CREATE TABLE task assignee INTEGER REFERENCES worker (id), max_assign_time INTEGER DEFAULT 0, assign_time INTEGER DEFAULT NULL, - verification_count INTEGER DEFAULT 0, + verification_count SMALLINT DEFAULT 0, priority SMALLINT DEFAULT 0, retries SMALLINT DEFAULT 0, max_retries SMALLINT, status SMALLINT DEFAULT 1, - recipe TEXT, - UNIQUE (project, hash64) + recipe TEXT ); +CREATE INDEX priority_desc_index ON task (priority DESC); +CREATE INDEX assignee_index ON task (assignee); +CREATE INDEX verifcnt_index ON task (verification_count); +CREATE UNIQUE INDEX project_hash_unique ON task (project, hash64); + CREATE TABLE worker_verifies_task ( - verification_hash BIGINT NOT NULL, - task BIGINT REFERENCES task (id) ON DELETE CASCADE NOT NULL, - worker INT REFERENCES worker (id) NOT NULL + verification_hash BIGINT NOT NULL, + task INT REFERENCES task (id) ON DELETE CASCADE NOT NULL, + worker INT REFERENCES worker (id) NOT NULL ); +CREATE INDEX task_index ON worker_verifies_task (task); + CREATE TABLE log_entry ( level INTEGER NOT NULL, @@ -150,7 +158,7 @@ $$ DECLARE res INT = NULL; BEGIN - DELETE FROM task WHERE id = tid AND assignee = wid AND verification_count < 2 RETURNING project INTO res; + DELETE FROM task WHERE id = tid AND assignee = wid AND verification_count = 1 RETURNING project INTO res; IF res IS NULL THEN INSERT INTO worker_verifies_task (worker, verification_hash, task) @@ -171,7 +179,7 @@ BEGIN LIMIT 1) >= task.verification_count RETURNING task.id INTO res; IF res IS NULL THEN - UPDATE task SET assignee= NULL WHERE id = tid AND assignee = wid; + UPDATE task SET assignee=NULL WHERE id = tid AND assignee = wid; end if; end if; diff --git a/storage/database.go b/storage/database.go index ca4899c..3e49d81 100644 --- a/storage/database.go +++ b/storage/database.go @@ -14,7 +14,13 @@ type Database struct { db *sql.DB saveTaskStmt *sql.Stmt - workerCache map[int64]*Worker + workerCache map[int64]*Worker + projectCache map[int64]*Project + + // [worker][project] + submitAccessCache map[int64]map[int64]bool + assignAccessCache map[int64]map[int64]bool + assignMutex *sync.Mutex } @@ -22,6 +28,9 @@ func New() *Database { d := Database{} d.workerCache = make(map[int64]*Worker) + d.projectCache = make(map[int64]*Project) + d.assignAccessCache = make(map[int64]map[int64]bool) + d.submitAccessCache = make(map[int64]map[int64]bool) d.assignMutex = &sync.Mutex{} d.init() @@ -68,10 +77,12 @@ func (database *Database) getDB() *sql.DB { db.SetMaxOpenConns(50) database.db = db - } else { - err := database.db.Ping() - handleErr(err) } return database.db } + +func (database *Database) invalidateAccessCache(worker int64) { + delete(database.submitAccessCache, worker) + delete(database.assignAccessCache, worker) +} diff --git a/storage/project.go b/storage/project.go index 57fb7bd..97e14d5 100644 --- a/storage/project.go +++ b/storage/project.go @@ -55,11 +55,17 @@ func (database *Database) SaveProject(project *Project, webhookSecret string) (i "project": project, }).Trace("Database.saveProject INSERT project") + database.projectCache[id] = project + return id, nil } func (database *Database) GetProject(id int64) *Project { + if database.projectCache[id] != nil { + return database.projectCache[id] + } + db := database.getDB() row := db.QueryRow(`SELECT id, priority, name, clone_url, git_repo, version, motd, public, hidden, COALESCE(chain, 0), paused, assign_rate, submit_rate @@ -76,7 +82,9 @@ func (database *Database) GetProject(id int64) *Project { logrus.WithFields(logrus.Fields{ "id": id, "project": project, - }).Trace("Database.saveProject SELECT project") + }).Trace("Database.getProject SELECT project") + + database.projectCache[id] = project return project } @@ -132,6 +140,8 @@ func (database *Database) UpdateProject(project *Project) error { "rowsAffected": rowsAffected, }).Trace("Database.updateProject UPDATE project") + database.projectCache[project.Id] = project + return nil } diff --git a/storage/task.go b/storage/task.go index 7c3fe83..7ec83ea 100644 --- a/storage/task.go +++ b/storage/task.go @@ -1,23 +1,24 @@ package storage import ( + "database/sql" "errors" - "fmt" + "github.com/lib/pq" "github.com/sirupsen/logrus" ) type Task struct { Id int64 `json:"id"` - Priority int64 `json:"priority"` + Priority int16 `json:"priority"` Project *Project `json:"project"` Assignee int64 `json:"assignee"` - Retries int64 `json:"retries"` - MaxRetries int64 `json:"max_retries"` + Retries int16 `json:"retries"` + MaxRetries int16 `json:"max_retries"` Status TaskStatus `json:"status"` Recipe string `json:"recipe"` MaxAssignTime int64 `json:"max_assign_time"` AssignTime int64 `json:"assign_time"` - VerificationCount int64 `json:"verification_count"` + VerificationCount int16 `json:"verification_count"` } type TaskStatus int @@ -42,16 +43,62 @@ type SaveTaskRequest struct { WorkerId int64 } -func (database *Database) SaveTask(task *Task, project int64, hash64 int64, wid int64) error { +func (database *Database) checkAccess(workerId, projectId int64, assign, submit bool) bool { + + if database.submitAccessCache[workerId] == nil { + database.submitAccessCache[workerId] = make(map[int64]bool) + database.assignAccessCache[workerId] = make(map[int64]bool) + } else { + _, ok := database.submitAccessCache[workerId][projectId] + if ok { + if assign && !database.assignAccessCache[workerId][projectId] { + return false + } + if submit && !database.submitAccessCache[workerId][projectId] { + return false + } + return true + } + } db := database.getDB() - res, err := db.Exec(fmt.Sprintf(` - INSERT INTO task (project, max_retries, recipe, priority, max_assign_time, hash64,verification_count) - SELECT $1,$2,$3,$4,$5,NULLIF(%d, 0),$6 FROM worker_access - WHERE role_submit AND NOT request AND worker=$7 AND project=$1`, hash64), - project, task.MaxRetries, task.Recipe, task.Priority, task.MaxAssignTime, task.VerificationCount, - wid) + row := db.QueryRow(`SELECT role_assign, role_submit FROM worker_access + WHERE worker=$1 and project=$2 AND NOT request`, + workerId, projectId) + + var hasAssign, hasSubmit bool + err := row.Scan(&hasAssign, &hasSubmit) + + database.submitAccessCache[workerId][projectId] = hasSubmit + database.assignAccessCache[workerId][projectId] = hasAssign + + if err != nil { + return false + } + if !hasAssign && assign { + return false + } + if !hasSubmit && submit { + return false + } + + return true +} + +func (database *Database) SaveTask(task *Task, project int64, hash64 int64, wid int64) error { + + if !database.checkAccess(wid, project, false, true) { + return errors.New("unauthorized task submit") + } + + db := database.getDB() + + _, err := db.Exec(`INSERT INTO task + (project, max_retries, recipe, priority, max_assign_time, hash64, verification_count) + VALUES ($1,$2,$3,$4,$5,$6,$7)`, + project, task.MaxRetries, task.Recipe, task.Priority, task.MaxAssignTime, + makeNullableInt(hash64), task.VerificationCount) if err != nil { logrus.WithError(err).WithFields(logrus.Fields{ "task": task, @@ -59,45 +106,55 @@ func (database *Database) SaveTask(task *Task, project int64, hash64 int64, wid return err } - rowsAffected, err := res.RowsAffected() - handleErr(err) - - logrus.WithFields(logrus.Fields{ - "rowsAffected": rowsAffected, - "task": task, - }).Trace("Database.saveTask INSERT task") - - if rowsAffected == 0 { - return errors.New("unauthorized task submit") - } - return nil } +func makeNullableInt(i int64) sql.NullInt64 { + if i == 0 { + return sql.NullInt64{Valid: false} + } else { + return sql.NullInt64{ + Valid: true, + Int64: i, + } + } +} + func (database Database) BulkSaveTask(bulkSaveTaskReqs []SaveTaskRequest) []error { + if !database.checkAccess(bulkSaveTaskReqs[0].WorkerId, bulkSaveTaskReqs[0].Project, + false, true) { + return []error{errors.New("unauthorized task submit")} + } + db := database.getDB() + txn, err := db.Begin() + handleErr(err) + errs := make([]error, len(bulkSaveTaskReqs)) - for i, req := range bulkSaveTaskReqs { - res, err := db.Exec(fmt.Sprintf(` - INSERT INTO task (project, max_retries, recipe, priority, max_assign_time, hash64,verification_count) - SELECT $1,$2,$3,$4,$5,NULLIF(%d, 0),$6 FROM worker_access - WHERE role_submit AND NOT request AND worker=$7 AND project=$1`, req.Hash64), - req.Project, req.Task.MaxRetries, req.Task.Recipe, req.Task.Priority, - req.Task.MaxAssignTime, req.Task.VerificationCount, - req.WorkerId) - errs[i] = err + stmt, _ := txn.Prepare(pq.CopyIn( + "task", + "project", "max_retries", "recipe", "priority", + "max_assign_time", "hash64", "verification_count", + )) - if res != nil { - rowsAffected, _ := res.RowsAffected() - if rowsAffected == 0 { - errs[i] = errors.New("unauthorized task submit") - } + for i, req := range bulkSaveTaskReqs { + _, err = stmt.Exec(req.Project, req.Task.MaxRetries, req.Task.Recipe, + req.Task.Priority, req.Task.MaxAssignTime, makeNullableInt(req.Hash64), + req.Task.VerificationCount) + if err != nil { + errs[i] = err } } + _, err = stmt.Exec() + err = stmt.Close() + handleErr(err) + err = txn.Commit() + handleErr(err) + return errs } @@ -107,7 +164,7 @@ func (database Database) ReleaseTask(id int64, workerId int64, result TaskResult var taskUpdated bool if result == TR_OK { - row := db.QueryRow(fmt.Sprintf(`SELECT release_task_ok(%d,%d,%d)`, workerId, id, verification)) + row := db.QueryRow(`SELECT release_task_ok($1,$2,$3)`, workerId, id, verification) err := row.Scan(&taskUpdated) if err != nil { @@ -128,13 +185,6 @@ func (database Database) ReleaseTask(id int64, workerId int64, result TaskResult taskUpdated = rowsAffected == 1 } - logrus.WithFields(logrus.Fields{ - "taskUpdated": taskUpdated, - "taskId": id, - "workerId": workerId, - "verification": verification, - }).Trace("Database.ReleaseTask") - return taskUpdated } @@ -161,35 +211,21 @@ func (database *Database) GetTaskFromProject(worker *Worker, projectId int64) *T ORDER BY task.priority DESC LIMIT 1 ) - RETURNING task.id`, worker.Id, projectId) + RETURNING task.id, task.priority, assignee, retries, max_retries, + status, recipe, max_assign_time, assign_time, verification_count`, worker.Id, projectId) - var id int64 - err := row.Scan(&id) database.assignMutex.Unlock() + task := &Task{} + err := row.Scan(&task.Id, &task.Priority, &task.Assignee, + &task.Retries, &task.MaxRetries, &task.Status, &task.Recipe, &task.MaxAssignTime, + &task.AssignTime, &task.VerificationCount) + if err != nil { return nil } - row = db.QueryRow(` - SELECT task.id, task.priority, task.project, assignee, retries, max_retries, - status, recipe, max_assign_time, assign_time, verification_count, project.priority, project.name, - project.clone_url, project.git_repo, project.version, project.motd, project.public, COALESCE(project.chain,0), - project.assign_rate, project.submit_rate - FROM task - INNER JOIN project project ON task.project = project.id - WHERE task.id=$1`, id) - - project := &Project{} - task := &Task{} - task.Project = project - - err = row.Scan(&task.Id, &task.Priority, &project.Id, &task.Assignee, - &task.Retries, &task.MaxRetries, &task.Status, &task.Recipe, &task.MaxAssignTime, - &task.AssignTime, &task.VerificationCount, &project.Priority, &project.Name, - &project.CloneUrl, &project.GitRepo, &project.Version, &project.Motd, &project.Public, - &project.Chain, &project.AssignRate, &project.SubmitRate) - handleErr(err) + task.Project = database.GetProject(projectId) return task } diff --git a/storage/worker.go b/storage/worker.go index c4feffe..b0c97e9 100644 --- a/storage/worker.go +++ b/storage/worker.go @@ -71,31 +71,6 @@ func (database *Database) GetWorker(id int64) *Worker { return worker } -func (database *Database) GrantAccess(workerId int64, projectId int64) bool { - - db := database.getDB() - res, err := db.Exec(`UPDATE worker_access SET - request=FALSE WHERE worker=$1 AND project=$2`, - workerId, projectId) - if err != nil { - logrus.WithFields(logrus.Fields{ - "workerId": workerId, - "projectId": projectId, - }).WithError(err).Warn("Database.GrantAccess INSERT") - return false - } - - rowsAffected, _ := res.RowsAffected() - - logrus.WithFields(logrus.Fields{ - "rowsAffected": rowsAffected, - "workerId": workerId, - "projectId": projectId, - }).Trace("Database.GrantAccess INSERT") - - return rowsAffected == 1 -} - func (database *Database) UpdateWorker(worker *Worker) bool { db := database.getDB() @@ -139,6 +114,7 @@ func (database *Database) SaveAccessRequest(wa *WorkerAccess) bool { func (database *Database) AcceptAccessRequest(worker int64, projectId int64) bool { db := database.getDB() + database.invalidateAccessCache(worker) res, err := db.Exec(`UPDATE worker_access SET request=FALSE WHERE worker=$1 AND project=$2`, worker, projectId) @@ -156,6 +132,8 @@ func (database *Database) AcceptAccessRequest(worker int64, projectId int64) boo func (database *Database) RejectAccessRequest(workerId int64, projectId int64) bool { db := database.getDB() + database.invalidateAccessCache(workerId) + res, err := db.Exec(`DELETE FROM worker_access WHERE worker=$1 AND project=$2`, workerId, projectId) handleErr(err) diff --git a/test/api_project_test.go b/test/api_project_test.go index 8737d49..9cc2cdd 100644 --- a/test/api_project_test.go +++ b/test/api_project_test.go @@ -5,6 +5,7 @@ import ( "fmt" "github.com/simon987/task_tracker/api" "github.com/simon987/task_tracker/storage" + "golang.org/x/time/rate" "io/ioutil" "net/http" "testing" @@ -136,7 +137,7 @@ func TestUpdateProjectValid(t *testing.T) { Hidden: true, Paused: true, AssignRate: 1, - SubmitRate: 2, + SubmitRate: 0, Version: "VersionB", }, pid, testAdminCtx) @@ -173,7 +174,7 @@ func TestUpdateProjectValid(t *testing.T) { if proj.Project.AssignRate != 1 { t.Error() } - if proj.Project.SubmitRate != 2 { + if proj.Project.SubmitRate != rate.Inf { t.Error() } } diff --git a/test/api_task_bench_test.go b/test/api_task_bench_test.go index 0dde35b..967525b 100644 --- a/test/api_task_bench_test.go +++ b/test/api_task_bench_test.go @@ -2,6 +2,7 @@ package test import ( "github.com/simon987/task_tracker/api" + "github.com/simon987/task_tracker/storage" "strconv" "testing" ) @@ -17,6 +18,13 @@ func BenchmarkCreateTaskRemote(b *testing.B) { worker := genWid() + requestAccess(api.CreateWorkerAccessRequest{ + Submit: true, + Assign: false, + Project: resp.Content.Id, + }, worker) + acceptAccessRequest(resp.Content.Id, worker.Id, testAdminCtx) + b.ResetTimer() for i := 0; i < b.N; i++ { createTask(api.SubmitTaskRequest{ @@ -27,3 +35,36 @@ func BenchmarkCreateTaskRemote(b *testing.B) { }, worker) } } + +func BenchmarkCreateTask(b *testing.B) { + + resp := createProjectAsAdmin(api.CreateProjectRequest{ + Name: "BenchmarkCreateTask" + strconv.Itoa(b.N), + GitRepo: "benchmark_test" + strconv.Itoa(b.N), + Version: "f09e8c9r0w839x0c43", + CloneUrl: "http://localhost", + }) + + worker := genWid() + + requestAccess(api.CreateWorkerAccessRequest{ + Submit: true, + Assign: false, + Project: resp.Content.Id, + }, worker) + acceptAccessRequest(resp.Content.Id, worker.Id, testAdminCtx) + + db := storage.New() + + b.ResetTimer() + + p := db.GetProject(resp.Content.Id) + for i := 0; i < b.N; i++ { + db.SaveTask(&storage.Task{ + Project: p, + Priority: 0, + Recipe: "{}", + MaxRetries: 1, + }, resp.Content.Id, 0, worker.Id) + } +} diff --git a/test/api_task_test.go b/test/api_task_test.go index 9287cb6..c9d182b 100644 --- a/test/api_task_test.go +++ b/test/api_task_test.go @@ -405,6 +405,7 @@ func TestReleaseTaskSuccess(t *testing.T) { Project: pid, Recipe: "{}", MaxRetries: 3, + Hash64: math.MaxInt64, }, worker) task := getTaskFromProject(pid, worker).Content.Task @@ -923,25 +924,20 @@ func TestTaskSubmitInvalidDoesntGiveRateLimit(t *testing.T) { func TestBulkTaskSubmitValid(t *testing.T) { - proj := createProjectAsAdmin(api.CreateProjectRequest{ - Name: "testbulksubmit", - CloneUrl: "testbulkprojectsubmit", - GitRepo: "testbulkprojectsubmit", - }).Content.Id - r := bulkSubmitTask(api.BulkSubmitTaskRequest{ Requests: []api.SubmitTaskRequest{ { Recipe: "1234", - Project: proj, + Project: testProject, }, { Recipe: "1234", - Project: proj, + Project: testProject, }, { Recipe: "1234", - Project: proj, + Project: testProject, + Hash64: 8565956259293726066, }, }, }, testWorker) @@ -1015,6 +1011,44 @@ func TestBulkTaskSubmitInvalid2(t *testing.T) { } } +func TestTaskGetUnauthorizedWithCache(t *testing.T) { + + pid := createProjectAsAdmin(api.CreateProjectRequest{ + Name: "testtaskgetunauthorizedcache", + GitRepo: "testtaskgetunauthorizedcache", + CloneUrl: "testtaskgettunauthorizedcache", + }).Content.Id + + w := genWid() + + requestAccess(api.CreateWorkerAccessRequest{ + Project: pid, + Submit: true, + Assign: true, + }, w) + acceptAccessRequest(pid, w.Id, testAdminCtx) + + r1 := createTask(api.SubmitTaskRequest{ + Project: pid, + Recipe: "ssss", + }, w) + + // removed access, cache should be invalidated + rejectAccessRequest(pid, w.Id, testAdminCtx) + + r2 := createTask(api.SubmitTaskRequest{ + Project: pid, + Recipe: "ssss", + }, w) + + if r1.Ok != true { + t.Error() + } + if r2.Ok != false { + t.Error() + } +} + func bulkSubmitTask(request api.BulkSubmitTaskRequest, worker *storage.Worker) (ar api.JsonResponse) { r := Post("/task/bulk_submit", request, worker, nil) UnmarshalResponse(r, &ar) diff --git a/test/schema.sql b/test/schema.sql old mode 100755 new mode 100644 index 57b14fd..4af5afa --- a/test/schema.sql +++ b/test/schema.sql @@ -41,6 +41,8 @@ CREATE TABLE worker_access request boolean, primary key (worker, project) ); +CREATE INDEX worker_index ON worker_access (worker); +CREATE INDEX project_index ON worker_access (project); CREATE TABLE task ( @@ -50,22 +52,28 @@ CREATE TABLE task assignee INTEGER REFERENCES worker (id), max_assign_time INTEGER DEFAULT 0, assign_time INTEGER DEFAULT NULL, - verification_count INTEGER DEFAULT 0, + verification_count SMALLINT DEFAULT 0, priority SMALLINT DEFAULT 0, retries SMALLINT DEFAULT 0, max_retries SMALLINT, status SMALLINT DEFAULT 1, - recipe TEXT, - UNIQUE (project, hash64) + recipe TEXT ); +CREATE INDEX priority_desc_index ON task (priority DESC); +CREATE INDEX assignee_index ON task (assignee); +CREATE INDEX verifcnt_index ON task (verification_count); +CREATE UNIQUE INDEX project_hash_unique ON task (project, hash64); + CREATE TABLE worker_verifies_task ( - verification_hash BIGINT NOT NULL, - task BIGINT REFERENCES task (id) ON DELETE CASCADE NOT NULL, - worker INT REFERENCES worker (id) NOT NULL + verification_hash BIGINT NOT NULL, + task INT REFERENCES task (id) ON DELETE CASCADE NOT NULL, + worker INT REFERENCES worker (id) NOT NULL ); +CREATE INDEX task_index ON worker_verifies_task (task); + CREATE TABLE log_entry ( level INTEGER NOT NULL, @@ -150,7 +158,7 @@ $$ DECLARE res INT = NULL; BEGIN - DELETE FROM task WHERE id = tid AND assignee = wid AND verification_count < 2 RETURNING project INTO res; + DELETE FROM task WHERE id = tid AND assignee = wid AND verification_count = 1 RETURNING project INTO res; IF res IS NULL THEN INSERT INTO worker_verifies_task (worker, verification_hash, task) @@ -171,11 +179,10 @@ BEGIN LIMIT 1) >= task.verification_count RETURNING task.id INTO res; IF res IS NULL THEN - UPDATE task SET assignee= NULL WHERE id = tid AND assignee = wid; + UPDATE task SET assignee=NULL WHERE id = tid AND assignee = wid; end if; end if; RETURN res IS NOT NULL; END; $$ LANGUAGE 'plpgsql'; - diff --git a/web/angular/e2e/protractor.conf.js b/web/angular/e2e/protractor.conf.js old mode 100755 new mode 100644 diff --git a/web/angular/e2e/src/app.e2e-spec.ts b/web/angular/e2e/src/app.e2e-spec.ts old mode 100755 new mode 100644 diff --git a/web/angular/e2e/src/app.po.ts b/web/angular/e2e/src/app.po.ts old mode 100755 new mode 100644 diff --git a/web/angular/e2e/tsconfig.e2e.json b/web/angular/e2e/tsconfig.e2e.json old mode 100755 new mode 100644 diff --git a/web/angular/package-lock.json b/web/angular/package-lock.json index c890de8..7a7bf84 100644 --- a/web/angular/package-lock.json +++ b/web/angular/package-lock.json @@ -4323,9 +4323,9 @@ } }, "fstream": { - "version": "1.0.11", - "resolved": "https://registry.npmjs.org/fstream/-/fstream-1.0.11.tgz", - "integrity": "sha1-XB+x8RdHcRTwYyoOtLcbPLD9MXE=", + "version": "1.0.12", + "resolved": "https://registry.npmjs.org/fstream/-/fstream-1.0.12.tgz", + "integrity": "sha512-WvJ193OHa0GHPEL+AycEJgxvBEwyfRkN1vhjca23OaPVMCaLCXTd5qAu82AjTcgP1UJmytkOKb63Ypde7raDIg==", "dev": true, "optional": true, "requires": { @@ -6050,9 +6050,9 @@ } }, "lodash": { - "version": "4.17.11", - "resolved": "https://registry.npmjs.org/lodash/-/lodash-4.17.11.tgz", - "integrity": "sha512-cQKh8igo5QUhZ7lg38DYWAxMvjSAKG0A8wGSVimP07SIUEK2UO+arSRKbRZWtelMtN5V0Hkwh5ryOto/SshYIg==" + "version": "4.17.15", + "resolved": "https://registry.npmjs.org/lodash/-/lodash-4.17.15.tgz", + "integrity": "sha512-8xOcRHvCjnocdS5cpwXQXVzmmh5e5+saE2QGoeQmbKmRS6J3VQppPOIt0MnmE+4xlZoumy0GPG0D0MVIQbNA1A==" }, "lodash.clonedeep": { "version": "4.5.0", @@ -6519,9 +6519,9 @@ } }, "mixin-deep": { - "version": "1.3.1", - "resolved": "https://registry.npmjs.org/mixin-deep/-/mixin-deep-1.3.1.tgz", - "integrity": "sha512-8ZItLHeEgaqEvd5lYBXfm4EZSFCX29Jb9K+lAHhDKzReKBQKj3R+7NOF6tjqYi9t4oI8VUfaWITJQm86wnXGNQ==", + "version": "1.3.2", + "resolved": "https://registry.npmjs.org/mixin-deep/-/mixin-deep-1.3.2.tgz", + "integrity": "sha512-WRoDn//mXBiJ1H40rqa3vH0toePwSsGb45iInWlTySa+Uu4k3tYUSxa2v1KqAiLtvlrSzaExqS1gtk96A9zvEA==", "dev": true, "requires": { "for-in": "^1.0.2", @@ -8514,9 +8514,9 @@ "dev": true }, "set-value": { - "version": "2.0.0", - "resolved": "https://registry.npmjs.org/set-value/-/set-value-2.0.0.tgz", - "integrity": "sha512-hw0yxk9GT/Hr5yJEYnHNKYXkIA8mVJgd9ditYZCe16ZczcaELYYcfvaXesNACk2O8O0nTiPQcQhGUQj8JLzeeg==", + "version": "2.0.1", + "resolved": "https://registry.npmjs.org/set-value/-/set-value-2.0.1.tgz", + "integrity": "sha512-JxHc1weCN68wRY0fhCoXpyK55m/XPHafOmK4UWD7m2CI14GMcFypt4w/0+NV5f/ZMby2F6S2wwA7fgynh9gWSw==", "dev": true, "requires": { "extend-shallow": "^2.0.1", @@ -9366,14 +9366,14 @@ "dev": true }, "tar": { - "version": "2.2.1", - "resolved": "https://registry.npmjs.org/tar/-/tar-2.2.1.tgz", - "integrity": "sha1-jk0qJWwOIYXGsYrWlK7JaLg8sdE=", + "version": "2.2.2", + "resolved": "https://registry.npmjs.org/tar/-/tar-2.2.2.tgz", + "integrity": "sha512-FCEhQ/4rE1zYv9rYXJw/msRqsnmlje5jHP6huWeBZ704jUTy02c5AZyWujpMR1ax6mVw9NyJMfuK2CMDWVIfgA==", "dev": true, "optional": true, "requires": { "block-stream": "*", - "fstream": "^1.0.2", + "fstream": "^1.0.12", "inherits": "2" } }, @@ -9881,38 +9881,15 @@ "dev": true }, "union-value": { - "version": "1.0.0", - "resolved": "https://registry.npmjs.org/union-value/-/union-value-1.0.0.tgz", - "integrity": "sha1-XHHDTLW61dzr4+oM0IIHulqhrqQ=", + "version": "1.0.1", + "resolved": "https://registry.npmjs.org/union-value/-/union-value-1.0.1.tgz", + "integrity": "sha512-tJfXmxMeWYnczCVs7XAEvIV7ieppALdyepWMkHkwciRpZraG/xwT+s2JN8+pr1+8jCRf80FFzvr+MpQeeoF4Xg==", "dev": true, "requires": { "arr-union": "^3.1.0", "get-value": "^2.0.6", "is-extendable": "^0.1.1", - "set-value": "^0.4.3" - }, - "dependencies": { - "extend-shallow": { - "version": "2.0.1", - "resolved": "https://registry.npmjs.org/extend-shallow/-/extend-shallow-2.0.1.tgz", - "integrity": "sha1-Ua99YUrZqfYQ6huvu5idaxxWiQ8=", - "dev": true, - "requires": { - "is-extendable": "^0.1.0" - } - }, - "set-value": { - "version": "0.4.3", - "resolved": "https://registry.npmjs.org/set-value/-/set-value-0.4.3.tgz", - "integrity": "sha1-fbCPnT0i3H945Trzw79GZuzfzPE=", - "dev": true, - "requires": { - "extend-shallow": "^2.0.1", - "is-extendable": "^0.1.1", - "is-plain-object": "^2.0.1", - "to-object-path": "^0.3.0" - } - } + "set-value": "^2.0.1" } }, "unique-filename": { diff --git a/web/angular/package.json b/web/angular/package.json index 7ef3c8f..17bbfa8 100644 --- a/web/angular/package.json +++ b/web/angular/package.json @@ -25,7 +25,7 @@ "@ngx-translate/http-loader": "^4.0.0", "chart.js": "^2.7.3", "core-js": "^2.5.4", - "lodash": "^4.17.11", + "lodash": "^4.17.15", "moment": "^2.23.0", "rxjs": "~6.3.3", "tslib": "^1.9.0", diff --git a/web/angular/src/app/api.service.ts b/web/angular/src/app/api.service.ts old mode 100755 new mode 100644 diff --git a/web/angular/src/app/app-routing.module.ts b/web/angular/src/app/app-routing.module.ts old mode 100755 new mode 100644 diff --git a/web/angular/src/app/app.component.css b/web/angular/src/app/app.component.css old mode 100755 new mode 100644 diff --git a/web/angular/src/app/app.component.html b/web/angular/src/app/app.component.html old mode 100755 new mode 100644 diff --git a/web/angular/src/app/app.module.ts b/web/angular/src/app/app.module.ts old mode 100755 new mode 100644 diff --git a/web/angular/src/app/project-list/project-list.component.css b/web/angular/src/app/project-list/project-list.component.css old mode 100755 new mode 100644 diff --git a/web/angular/src/app/project-list/project-list.component.html b/web/angular/src/app/project-list/project-list.component.html old mode 100755 new mode 100644 diff --git a/web/angular/src/app/project-list/project-list.component.ts b/web/angular/src/app/project-list/project-list.component.ts old mode 100755 new mode 100644 diff --git a/web/angular/src/assets/i18n/en.json b/web/angular/src/assets/i18n/en.json index 8cc5e4d..39e2e9e 100644 --- a/web/angular/src/assets/i18n/en.json +++ b/web/angular/src/assets/i18n/en.json @@ -115,7 +115,7 @@ "subtitle": "Real-time data for all projects", "paused": "(paused)", "pause": "Pause", - "manage": "Manager workers" + "manage": "Manage workers" }, "perms": { "title": "Project permissions",