From c6d7fad8e80e17ee613e0bf41653d34f4d7d91a4 Mon Sep 17 00:00:00 2001 From: Richard Patel Date: Sun, 3 Feb 2019 15:54:02 +0100 Subject: [PATCH] Resume state saving --- main.go | 33 +++++++++++++++++++++++++----- model.go | 20 +++++++++--------- resume.go | 37 ++++++++++++++++++++++++++++++--- scheduler.go | 58 ++++++++++++++++++++++++++++++++++++++++++++++------ worker.go | 26 ++++++++++++++++++++--- 5 files changed, 147 insertions(+), 27 deletions(-) diff --git a/main.go b/main.go index bd70b55..1259408 100644 --- a/main.go +++ b/main.go @@ -8,6 +8,7 @@ import ( "github.com/spf13/viper" "github.com/terorie/od-database-crawler/fasturl" "os" + "os/signal" "strings" "sync/atomic" "time" @@ -76,20 +77,21 @@ func cmdBase(_ *cobra.Command, _ []string) { onlineMode = true readConfig() - // TODO Graceful shutdown - appCtx := context.Background() - forceCtx := context.Background() + appCtx, soft := context.WithCancel(context.Background()) + forceCtx, hard := context.WithCancel(context.Background()) + go hardShutdown(forceCtx) + go listenCtrlC(soft, hard) inRemotes := make(chan *OD) go LoadResumeTasks(inRemotes) - go Schedule(forceCtx, inRemotes) + go Schedule(appCtx, inRemotes) ticker := time.NewTicker(config.Recheck) defer ticker.Stop() for { select { case <-appCtx.Done(): - return + goto shutdown case <-ticker.C: t, err := FetchTask() if err != nil { @@ -128,6 +130,9 @@ func cmdBase(_ *cobra.Command, _ []string) { ScheduleTask(inRemotes, t, &baseUri) } } + + shutdown: + globalWait.Wait() } func cmdCrawler(_ *cobra.Command, args []string) error { @@ -166,3 +171,21 @@ func cmdCrawler(_ *cobra.Command, args []string) error { return nil } + +func listenCtrlC(soft, hard context.CancelFunc) { + c := make(chan os.Signal) + signal.Notify(c, os.Interrupt) + + <-c + logrus.Info(">>> Shutting down crawler... <<<") + soft() + + <-c + logrus.Warning(">>> Force shutdown! <<<") + hard() +} + +func hardShutdown(c context.Context) { + <-c.Done() + os.Exit(1) +} diff --git a/model.go b/model.go index 273357e..7a0a907 100644 --- a/model.go +++ b/model.go @@ -3,7 +3,6 @@ package main import ( "github.com/terorie/od-database-crawler/ds/redblackhash" "github.com/terorie/od-database-crawler/fasturl" - "sync" "time" ) @@ -30,18 +29,19 @@ type Job struct { } type OD struct { - Task Task - Result TaskResult - Wait sync.WaitGroup - BaseUri fasturl.URL - WCtx WorkerContext - Scanned redblackhash.Tree + Task Task + Result TaskResult + InProgress int64 + BaseUri fasturl.URL + WCtx WorkerContext + Scanned redblackhash.Tree } type PausedOD struct { - Task *Task - Result *TaskResult - BaseUri *fasturl.URL + Task *Task + Result *TaskResult + BaseUri *fasturl.URL + InProgress int64 } type File struct { diff --git a/resume.go b/resume.go index 604bfdc..62c085c 100644 --- a/resume.go +++ b/resume.go @@ -8,6 +8,7 @@ import ( "os" "path/filepath" "strconv" + "sync/atomic" "time" ) @@ -71,6 +72,35 @@ func ResumeTasks() (tasks []*OD, err error) { return tasks, nil } +func SaveTask(od *OD) (err error) { + fPath := filepath.Join("queue", + strconv.FormatUint(od.Task.WebsiteId, 10), + "PAUSED") + + pausedF, err := os.OpenFile(fPath, os.O_CREATE | os.O_WRONLY | os.O_TRUNC, 0666) + if err != nil { return err } + defer pausedF.Close() + + // Create save state + paused := PausedOD { + Task: &od.Task, + Result: &od.Result, + BaseUri: &od.BaseUri, + InProgress: atomic.LoadInt64(&od.InProgress), + } + + // Write pause settings + pauseEnc := gob.NewEncoder(pausedF) + err = pauseEnc.Encode(&paused) + if err != nil { return err } + + // Write pause scan state + err = od.Scanned.Marshal(pausedF) + if err != nil { return err } + + return nil +} + func resumeQueue(id uint64) (od *OD, err error) { logrus.WithField("id", id). Info("Found unfinished") @@ -94,15 +124,16 @@ func resumeQueue(id uint64) (od *OD, err error) { // Make the paused struct point to OD fields // So gob loads values into the OD struct paused := PausedOD { - Task: &od.Task, - Result: &od.Result, - BaseUri: &od.BaseUri, + Task: &od.Task, + Result: &od.Result, + BaseUri: &od.BaseUri, } // Read pause settings pauseDec := gob.NewDecoder(pausedF) err = pauseDec.Decode(&paused) if err != nil { return nil, err } + atomic.StoreInt64(&od.InProgress, paused.InProgress) // Read pause scan state err = od.Scanned.Unmarshal(pausedF) diff --git a/scheduler.go b/scheduler.go index 128c024..03e67c2 100644 --- a/scheduler.go +++ b/scheduler.go @@ -50,9 +50,7 @@ func scheduleNewTask(c context.Context, remote *OD) bool { if err != nil { panic(err) } // Spawn workers - for i := 0; i < config.Workers; i++ { - go remote.WCtx.Worker(results) - } + remote.WCtx.SpawnWorkers(c, results, config.Workers) // Enqueue initial job atomic.AddInt32(&numActiveTasks, 1) @@ -167,16 +165,33 @@ func (o *OD) handleCollect(results chan File, f *os.File, collectErrC chan error defer close(results) // Wait for all jobs on remote to finish - o.Wait.Wait() + for { + // Natural finish + if atomic.LoadInt64(&o.InProgress) == 0 { + o.onTaskFinished() + return + } + // Abort + if atomic.LoadInt32(&o.WCtx.aborted) != 0 { + // Wait for all workers to finish + o.WCtx.workers.Wait() + o.onTaskPaused() + return + } + + time.Sleep(500 * time.Millisecond) + } +} + +func (o *OD) onTaskFinished() { + defer atomic.AddInt32(&numActiveTasks, -1) // Close queue if err := o.WCtx.Queue.Close(); err != nil { panic(err) } - atomic.AddInt32(&numActiveTasks, -1) // Log finish - logrus.WithFields(logrus.Fields{ "id": o.Task.WebsiteId, "url": o.BaseUri.String(), @@ -199,6 +214,37 @@ func (o *OD) handleCollect(results chan File, f *os.File, collectErrC chan error } } +func (o *OD) onTaskPaused() { + defer atomic.AddInt32(&numActiveTasks, -1) + + // Close queue + if err := o.WCtx.Queue.Close(); err != nil { + panic(err) + } + + // Set current end time + o.Result.EndTimeUnix = time.Now().Unix() + + // Save task metadata + err := SaveTask(o) + if err != nil { + // Log finish + logrus.WithFields(logrus.Fields{ + "err": err.Error(), + "id": o.Task.WebsiteId, + "url": o.BaseUri.String(), + }).Error("Failed to save crawler state") + return + } + + // Log finish + logrus.WithFields(logrus.Fields{ + "id": o.Task.WebsiteId, + "url": o.BaseUri.String(), + "duration": time.Since(o.Result.StartTime), + }).Info("Crawler paused") +} + func (t *Task) Collect(results chan File, f *os.File, errC chan<- error) { err := t.collect(results, f) if err != nil { diff --git a/worker.go b/worker.go index 118e28f..5c07a15 100644 --- a/worker.go +++ b/worker.go @@ -1,6 +1,7 @@ package main import ( + "context" "github.com/beeker1121/goque" "github.com/sirupsen/logrus" "math" @@ -18,10 +19,29 @@ type WorkerContext struct { Queue *BufferedQueue lastRateLimit time.Time numRateLimits int + workers sync.WaitGroup + aborted int32 } -func (w *WorkerContext) Worker(results chan<- File) { +func (w *WorkerContext) SpawnWorkers(c context.Context, results chan<- File, n int) { + w.workers.Add(n) + for i := 0; i < n; i++ { + go w.Worker(c, results) + } +} + +func (w *WorkerContext) Worker(c context.Context, results chan<- File) { + defer w.workers.Done() + for { + select { + case <-c.Done(): + // Not yet done + atomic.StoreInt32(&w.aborted, 1) + return + default: + } + job, err := w.Queue.Dequeue() switch err { case goque.ErrEmpty: @@ -156,7 +176,7 @@ func (w *WorkerContext) DoJob(job *Job, f *File) (newJobs []Job, err error) { } func (w *WorkerContext) queueJob(job Job) { - w.OD.Wait.Add(1) + atomic.AddInt64(&w.OD.InProgress, 1) if w.numRateLimits > 0 { if time.Since(w.lastRateLimit) > 5 * time.Second { @@ -173,7 +193,7 @@ func (w *WorkerContext) queueJob(job Job) { } func (w *WorkerContext) finishJob() { - w.OD.Wait.Done() + atomic.AddInt64(&w.OD.InProgress, -1) } func isErrSilent(err error) bool {