bugfix + reset timed out tasks

This commit is contained in:
simon987 2019-03-01 19:48:48 -05:00
parent 67cdd1e89e
commit 258a3c56eb
9 changed files with 60 additions and 22 deletions

View File

@ -40,13 +40,19 @@ func Index(r *Request) {
func (api *WebAPI) setupMonitoring() { func (api *WebAPI) setupMonitoring() {
api.Cron = cron.New() api.Cron = cron.New()
schedule := cron.Every(config.Cfg.MonitoringInterval) monSchedule := cron.Every(config.Cfg.MonitoringInterval)
api.Cron.Schedule(schedule, cron.FuncJob(api.Database.MakeProjectSnapshots)) api.Cron.Schedule(monSchedule, cron.FuncJob(api.Database.MakeProjectSnapshots))
timeoutSchedule := cron.Every(config.Cfg.ResetTimedOutTasksInterval)
api.Cron.Schedule(timeoutSchedule, cron.FuncJob(api.Database.ResetTimedOutTasks))
api.Cron.Start() api.Cron.Start()
logrus.WithFields(logrus.Fields{ logrus.WithFields(logrus.Fields{
"every": config.Cfg.MonitoringInterval.String(), "every": config.Cfg.MonitoringInterval.String(),
}).Info("Started monitoring") }).Info("Started monitoring")
logrus.WithFields(logrus.Fields{
"every": config.Cfg.ResetTimedOutTasksInterval.String(),
}).Info("Started task cleanup cron")
} }
func New() *WebAPI { func New() *WebAPI {

View File

@ -230,13 +230,12 @@ func (api *WebAPI) ReleaseTask(r *Request) {
if !res { if !res {
response.Message = "Task was not marked as closed" response.Message = "Task was not marked as closed"
} else {
logrus.WithFields(logrus.Fields{
"releaseTaskRequest": req,
"taskUpdated": res,
}).Trace("Release task")
} }
logrus.WithFields(logrus.Fields{
"releaseTaskRequest": req,
"taskUpdated": res,
}).Trace("Release task")
r.OkJson(response) r.OkJson(response)
} }

View File

@ -23,3 +23,6 @@ session:
monitoring: monitoring:
snapshot_interval: "120s" snapshot_interval: "120s"
history_length: "400h" history_length: "400h"
maintenance:
reset_timed_out_tasks_interval: "5m"

View File

@ -7,17 +7,18 @@ import (
) )
var Cfg struct { var Cfg struct {
ServerAddr string ServerAddr string
DbConnStr string DbConnStr string
WebHookSecret []byte WebHookSecret []byte
WebHookHash string WebHookHash string
WebHookSigHeader string WebHookSigHeader string
LogLevel logrus.Level LogLevel logrus.Level
DbLogLevels []logrus.Level DbLogLevels []logrus.Level
SessionCookieName string SessionCookieName string
SessionCookieExpiration time.Duration SessionCookieExpiration time.Duration
MonitoringInterval time.Duration MonitoringInterval time.Duration
MonitoringHistory time.Duration ResetTimedOutTasksInterval time.Duration
MonitoringHistory time.Duration
} }
func SetupConfig() { func SetupConfig() {
@ -44,6 +45,8 @@ func SetupConfig() {
Cfg.SessionCookieExpiration, err = time.ParseDuration(viper.GetString("session.expiration")) Cfg.SessionCookieExpiration, err = time.ParseDuration(viper.GetString("session.expiration"))
Cfg.MonitoringInterval, err = time.ParseDuration(viper.GetString("monitoring.snapshot_interval")) Cfg.MonitoringInterval, err = time.ParseDuration(viper.GetString("monitoring.snapshot_interval"))
handleErr(err) handleErr(err)
Cfg.ResetTimedOutTasksInterval, err = time.ParseDuration(viper.GetString("maintenance.reset_timed_out_tasks_interval"))
handleErr(err)
Cfg.MonitoringHistory, err = time.ParseDuration(viper.GetString("monitoring.history_length")) Cfg.MonitoringHistory, err = time.ParseDuration(viper.GetString("monitoring.history_length"))
handleErr(err) handleErr(err)
} }

View File

@ -155,7 +155,8 @@ BEGIN
INSERT INTO worker_verifies_task (worker, verification_hash, task) INSERT INTO worker_verifies_task (worker, verification_hash, task)
SELECT wid, ver, task.id SELECT wid, ver, task.id
FROM task FROM task
WHERE assignee = wid; WHERE assignee = wid
AND task.id = tid;
DELETE DELETE
FROM task FROM task

View File

@ -1,5 +1,9 @@
package storage package storage
import (
"github.com/Sirupsen/logrus"
)
func (database *Database) ResetFailedTasks(pid int64) int64 { func (database *Database) ResetFailedTasks(pid int64) int64 {
db := database.getDB() db := database.getDB()
@ -11,3 +15,20 @@ func (database *Database) ResetFailedTasks(pid int64) int64 {
rowsAffected, _ := res.RowsAffected() rowsAffected, _ := res.RowsAffected()
return rowsAffected return rowsAffected
} }
func (database *Database) ResetTimedOutTasks() {
db := database.getDB()
res, err := db.Exec(`
UPDATE task SET assignee=NULL, assign_time=NULL
WHERE status=1 AND assignee IS NOT NULL
AND extract(epoch from now() at time zone 'utc') > (assign_time + max_assign_time);`)
handleErr(err)
rowsAffected, _ := res.RowsAffected()
logrus.WithFields(logrus.Fields{
"rowsAffected": rowsAffected,
}).Info("Reset timed out tasks")
}

View File

@ -113,7 +113,7 @@ func (database *Database) GetTaskFromProject(worker *Worker, projectId int64) *T
( (
SELECT task.id SELECT task.id
FROM task FROM task
INNER JOIN project project on task.project = project.id INNER JOIN project on task.project = project.id
LEFT JOIN worker_verifies_task wvt on task.id = wvt.task AND wvt.worker=$1 LEFT JOIN worker_verifies_task wvt on task.id = wvt.task AND wvt.worker=$1
WHERE NOT project.paused AND assignee IS NULL AND project.id=$2 AND status=1 WHERE NOT project.paused AND assignee IS NULL AND project.id=$2 AND status=1
AND (project.public OR ( AND (project.public OR (

View File

@ -19,3 +19,6 @@ session:
monitoring: monitoring:
snapshot_interval: "10h" snapshot_interval: "10h"
history_length: "10h" history_length: "10h"
maintenance:
reset_timed_out_tasks_interval: "5m"

View File

@ -155,7 +155,8 @@ BEGIN
INSERT INTO worker_verifies_task (worker, verification_hash, task) INSERT INTO worker_verifies_task (worker, verification_hash, task)
SELECT wid, ver, task.id SELECT wid, ver, task.id
FROM task FROM task
WHERE assignee = wid; WHERE assignee = wid
AND task.id = tid;
DELETE DELETE
FROM task FROM task
@ -176,3 +177,4 @@ BEGIN
RETURN res IS NOT NULL; RETURN res IS NOT NULL;
END; END;
$$ LANGUAGE 'plpgsql'; $$ LANGUAGE 'plpgsql';