diff --git a/storage/database.go b/storage/database.go index da92700..3951c52 100644 --- a/storage/database.go +++ b/storage/database.go @@ -7,6 +7,7 @@ import ( "github.com/simon987/task_tracker/config" "io/ioutil" "os" + "sync" ) type Database struct { @@ -14,12 +15,14 @@ type Database struct { saveTaskStmt *sql.Stmt workerCache map[int64]*Worker + assignMutex *sync.Mutex } func New() *Database { d := Database{} d.workerCache = make(map[int64]*Worker) + d.assignMutex = &sync.Mutex{} d.init() diff --git a/storage/task.go b/storage/task.go index 51f8474..393cd12 100644 --- a/storage/task.go +++ b/storage/task.go @@ -106,42 +106,46 @@ func (database *Database) GetTaskFromProject(worker *Worker, projectId int64) *T db := database.getDB() - row := db.QueryRow(` - UPDATE task - SET assignee=$1, assign_time=extract(epoch from now() at time zone 'utc') - WHERE id IN - ( - SELECT task.id - FROM task - INNER JOIN project on task.project = project.id - LEFT JOIN worker_verifies_task wvt on task.id = wvt.task AND wvt.worker=$1 - WHERE NOT project.paused AND assignee IS NULL AND project.id=$2 AND status=1 - AND (project.public OR ( - SELECT a.role_assign and not a.request FROM worker_access a WHERE a.worker=$1 AND a.project=$2 - )) - AND wvt.task IS NULL - ORDER BY task.priority DESC - LIMIT 1 - ) - RETURNING id`, worker.Id, projectId) - var id int64 + database.assignMutex.Lock() + row := db.QueryRow(` + UPDATE task + SET assignee=$1, assign_time=extract(epoch from now() at time zone 'utc') + WHERE task.id = ( + SELECT task.id + FROM task + INNER JOIN project on task.project = project.id AND project.id=$2 AND not paused + LEFT JOIN worker_verifies_task wvt on task.id = wvt.task AND wvt.worker=$1 + LEFT JOIN worker_access wa on project.id = wa.project AND wa.worker=$1 + WHERE + assignee IS NULL + AND status=1 + AND (project.public OR (wa.role_assign AND NOT request)) + AND wvt.task IS NULL + ORDER BY task.priority DESC + LIMIT 1 + ) + RETURNING task.id`, worker.Id, projectId) + + var id int64 err := row.Scan(&id) + if err != nil { - logrus.WithFields(logrus.Fields{ - "worker": worker, - }).Trace("No task available") + database.assignMutex.Unlock() return nil } row = db.QueryRow(` - SELECT task.id, task.priority, task.project, assignee, retries, max_retries, - status, recipe, max_assign_time, assign_time, verification_count, project.priority, project.name, - project.clone_url, project.git_repo, project.version, project.motd, project.public, COALESCE(project.chain,0), - project.assign_rate, project.submit_rate - FROM task - INNER JOIN project project ON task.project = project.id - WHERE task.id=$1`, id) + SELECT task.id, task.priority, task.project, assignee, retries, max_retries, + status, recipe, max_assign_time, assign_time, verification_count, project.priority, project.name, + project.clone_url, project.git_repo, project.version, project.motd, project.public, COALESCE(project.chain,0), + project.assign_rate, project.submit_rate + FROM task + INNER JOIN project project ON task.project = project.id + WHERE task.id=$1`, id) + + database.assignMutex.Unlock() + project := &Project{} task := &Task{} task.Project = project