From 258a3c56eba641d9204d2372d64849618b41b109 Mon Sep 17 00:00:00 2001 From: simon987 Date: Fri, 1 Mar 2019 19:48:48 -0500 Subject: [PATCH] bugfix + reset timed out tasks --- api/main.go | 10 ++++++++-- api/task.go | 11 +++++------ config.yml | 3 +++ config/config.go | 25 ++++++++++++++----------- schema.sql | 3 ++- storage/maintenance.go | 21 +++++++++++++++++++++ storage/task.go | 2 +- test/config.yml | 3 +++ test/schema.sql | 4 +++- 9 files changed, 60 insertions(+), 22 deletions(-) diff --git a/api/main.go b/api/main.go index 54e3c54..c27af66 100644 --- a/api/main.go +++ b/api/main.go @@ -40,13 +40,19 @@ func Index(r *Request) { func (api *WebAPI) setupMonitoring() { api.Cron = cron.New() - schedule := cron.Every(config.Cfg.MonitoringInterval) - api.Cron.Schedule(schedule, cron.FuncJob(api.Database.MakeProjectSnapshots)) + monSchedule := cron.Every(config.Cfg.MonitoringInterval) + api.Cron.Schedule(monSchedule, cron.FuncJob(api.Database.MakeProjectSnapshots)) + + timeoutSchedule := cron.Every(config.Cfg.ResetTimedOutTasksInterval) + api.Cron.Schedule(timeoutSchedule, cron.FuncJob(api.Database.ResetTimedOutTasks)) api.Cron.Start() logrus.WithFields(logrus.Fields{ "every": config.Cfg.MonitoringInterval.String(), }).Info("Started monitoring") + logrus.WithFields(logrus.Fields{ + "every": config.Cfg.ResetTimedOutTasksInterval.String(), + }).Info("Started task cleanup cron") } func New() *WebAPI { diff --git a/api/task.go b/api/task.go index fa77fe4..75f0fe9 100644 --- a/api/task.go +++ b/api/task.go @@ -230,13 +230,12 @@ func (api *WebAPI) ReleaseTask(r *Request) { if !res { response.Message = "Task was not marked as closed" - } else { - - logrus.WithFields(logrus.Fields{ - "releaseTaskRequest": req, - "taskUpdated": res, - }).Trace("Release task") } + logrus.WithFields(logrus.Fields{ + "releaseTaskRequest": req, + "taskUpdated": res, + }).Trace("Release task") + r.OkJson(response) } diff --git a/config.yml b/config.yml index 30f6512..4bb06de 100755 --- a/config.yml +++ b/config.yml @@ -23,3 +23,6 @@ session: monitoring: snapshot_interval: "120s" history_length: "400h" + +maintenance: + reset_timed_out_tasks_interval: "5m" diff --git a/config/config.go b/config/config.go index e072d64..a21fae8 100644 --- a/config/config.go +++ b/config/config.go @@ -7,17 +7,18 @@ import ( ) var Cfg struct { - ServerAddr string - DbConnStr string - WebHookSecret []byte - WebHookHash string - WebHookSigHeader string - LogLevel logrus.Level - DbLogLevels []logrus.Level - SessionCookieName string - SessionCookieExpiration time.Duration - MonitoringInterval time.Duration - MonitoringHistory time.Duration + ServerAddr string + DbConnStr string + WebHookSecret []byte + WebHookHash string + WebHookSigHeader string + LogLevel logrus.Level + DbLogLevels []logrus.Level + SessionCookieName string + SessionCookieExpiration time.Duration + MonitoringInterval time.Duration + ResetTimedOutTasksInterval time.Duration + MonitoringHistory time.Duration } func SetupConfig() { @@ -44,6 +45,8 @@ func SetupConfig() { Cfg.SessionCookieExpiration, err = time.ParseDuration(viper.GetString("session.expiration")) Cfg.MonitoringInterval, err = time.ParseDuration(viper.GetString("monitoring.snapshot_interval")) handleErr(err) + Cfg.ResetTimedOutTasksInterval, err = time.ParseDuration(viper.GetString("maintenance.reset_timed_out_tasks_interval")) + handleErr(err) Cfg.MonitoringHistory, err = time.ParseDuration(viper.GetString("monitoring.history_length")) handleErr(err) } diff --git a/schema.sql b/schema.sql index 0089cbb..ebfb14c 100755 --- a/schema.sql +++ b/schema.sql @@ -155,7 +155,8 @@ BEGIN INSERT INTO worker_verifies_task (worker, verification_hash, task) SELECT wid, ver, task.id FROM task - WHERE assignee = wid; + WHERE assignee = wid + AND task.id = tid; DELETE FROM task diff --git a/storage/maintenance.go b/storage/maintenance.go index 0aa6255..fdc0c6c 100644 --- a/storage/maintenance.go +++ b/storage/maintenance.go @@ -1,5 +1,9 @@ package storage +import ( + "github.com/Sirupsen/logrus" +) + func (database *Database) ResetFailedTasks(pid int64) int64 { db := database.getDB() @@ -11,3 +15,20 @@ func (database *Database) ResetFailedTasks(pid int64) int64 { rowsAffected, _ := res.RowsAffected() return rowsAffected } + +func (database *Database) ResetTimedOutTasks() { + + db := database.getDB() + + res, err := db.Exec(` + UPDATE task SET assignee=NULL, assign_time=NULL + WHERE status=1 AND assignee IS NOT NULL + AND extract(epoch from now() at time zone 'utc') > (assign_time + max_assign_time);`) + handleErr(err) + + rowsAffected, _ := res.RowsAffected() + + logrus.WithFields(logrus.Fields{ + "rowsAffected": rowsAffected, + }).Info("Reset timed out tasks") +} diff --git a/storage/task.go b/storage/task.go index b895558..51f8474 100644 --- a/storage/task.go +++ b/storage/task.go @@ -113,7 +113,7 @@ func (database *Database) GetTaskFromProject(worker *Worker, projectId int64) *T ( SELECT task.id FROM task - INNER JOIN project project on task.project = project.id + INNER JOIN project on task.project = project.id LEFT JOIN worker_verifies_task wvt on task.id = wvt.task AND wvt.worker=$1 WHERE NOT project.paused AND assignee IS NULL AND project.id=$2 AND status=1 AND (project.public OR ( diff --git a/test/config.yml b/test/config.yml index 4fbfaff..de56131 100644 --- a/test/config.yml +++ b/test/config.yml @@ -19,3 +19,6 @@ session: monitoring: snapshot_interval: "10h" history_length: "10h" + +maintenance: + reset_timed_out_tasks_interval: "5m" diff --git a/test/schema.sql b/test/schema.sql index 83c4f08..5385159 100755 --- a/test/schema.sql +++ b/test/schema.sql @@ -155,7 +155,8 @@ BEGIN INSERT INTO worker_verifies_task (worker, verification_hash, task) SELECT wid, ver, task.id FROM task - WHERE assignee = wid; + WHERE assignee = wid + AND task.id = tid; DELETE FROM task @@ -176,3 +177,4 @@ BEGIN RETURN res IS NOT NULL; END; $$ LANGUAGE 'plpgsql'; +