Resume state saving

This commit is contained in:
Richard Patel 2019-02-03 15:54:02 +01:00
parent 0b20823ae1
commit c6d7fad8e8
No known key found for this signature in database
GPG Key ID: C268B2BBDA2ABECB
5 changed files with 147 additions and 27 deletions

33
main.go
View File

@ -8,6 +8,7 @@ import (
"github.com/spf13/viper" "github.com/spf13/viper"
"github.com/terorie/od-database-crawler/fasturl" "github.com/terorie/od-database-crawler/fasturl"
"os" "os"
"os/signal"
"strings" "strings"
"sync/atomic" "sync/atomic"
"time" "time"
@ -76,20 +77,21 @@ func cmdBase(_ *cobra.Command, _ []string) {
onlineMode = true onlineMode = true
readConfig() readConfig()
// TODO Graceful shutdown appCtx, soft := context.WithCancel(context.Background())
appCtx := context.Background() forceCtx, hard := context.WithCancel(context.Background())
forceCtx := context.Background() go hardShutdown(forceCtx)
go listenCtrlC(soft, hard)
inRemotes := make(chan *OD) inRemotes := make(chan *OD)
go LoadResumeTasks(inRemotes) go LoadResumeTasks(inRemotes)
go Schedule(forceCtx, inRemotes) go Schedule(appCtx, inRemotes)
ticker := time.NewTicker(config.Recheck) ticker := time.NewTicker(config.Recheck)
defer ticker.Stop() defer ticker.Stop()
for { for {
select { select {
case <-appCtx.Done(): case <-appCtx.Done():
return goto shutdown
case <-ticker.C: case <-ticker.C:
t, err := FetchTask() t, err := FetchTask()
if err != nil { if err != nil {
@ -128,6 +130,9 @@ func cmdBase(_ *cobra.Command, _ []string) {
ScheduleTask(inRemotes, t, &baseUri) ScheduleTask(inRemotes, t, &baseUri)
} }
} }
shutdown:
globalWait.Wait()
} }
func cmdCrawler(_ *cobra.Command, args []string) error { func cmdCrawler(_ *cobra.Command, args []string) error {
@ -166,3 +171,21 @@ func cmdCrawler(_ *cobra.Command, args []string) error {
return nil return nil
} }
// listenCtrlC implements the two-stage shutdown on os.Interrupt (Ctrl-C).
//
// The first interrupt logs a notice and calls soft to request a graceful
// shutdown. A second interrupt logs a warning and calls hard to force the
// shutdown. The function blocks until both interrupts have been received,
// so it is meant to be run in its own goroutine.
func listenCtrlC(soft, hard context.CancelFunc) {
	// The signal package never blocks when delivering to the channel, so an
	// unbuffered channel would silently drop an interrupt that arrives while
	// this goroutine is busy (e.g. logging or inside soft()). A buffer of one
	// is enough to hold the pending notification.
	c := make(chan os.Signal, 1)
	signal.Notify(c, os.Interrupt)

	// First interrupt: graceful shutdown.
	<-c
	logrus.Info(">>> Shutting down crawler... <<<")
	soft()

	// Second interrupt: forced shutdown.
	<-c
	logrus.Warning(">>> Force shutdown! <<<")
	hard()
}
// hardShutdown blocks until the given context is done and then terminates
// the process immediately with exit status 1. Deferred functions in other
// goroutines are not run, because os.Exit ends the program right away.
func hardShutdown(c context.Context) {
	cancelled := c.Done()
	<-cancelled
	os.Exit(1)
}

View File

@ -3,7 +3,6 @@ package main
import ( import (
"github.com/terorie/od-database-crawler/ds/redblackhash" "github.com/terorie/od-database-crawler/ds/redblackhash"
"github.com/terorie/od-database-crawler/fasturl" "github.com/terorie/od-database-crawler/fasturl"
"sync"
"time" "time"
) )
@ -30,18 +29,19 @@ type Job struct {
} }
type OD struct { type OD struct {
Task Task Task Task
Result TaskResult Result TaskResult
Wait sync.WaitGroup InProgress int64
BaseUri fasturl.URL BaseUri fasturl.URL
WCtx WorkerContext WCtx WorkerContext
Scanned redblackhash.Tree Scanned redblackhash.Tree
} }
type PausedOD struct { type PausedOD struct {
Task *Task Task *Task
Result *TaskResult Result *TaskResult
BaseUri *fasturl.URL BaseUri *fasturl.URL
InProgress int64
} }
type File struct { type File struct {

View File

@ -8,6 +8,7 @@ import (
"os" "os"
"path/filepath" "path/filepath"
"strconv" "strconv"
"sync/atomic"
"time" "time"
) )
@ -71,6 +72,35 @@ func ResumeTasks() (tasks []*OD, err error) {
return tasks, nil return tasks, nil
} }
// SaveTask writes the pause state of od to the file
// queue/<WebsiteId>/PAUSED, creating or truncating it.
//
// The file holds two consecutive sections: a gob-encoded PausedOD (task,
// result, base URI and the current in-progress counter), followed by the
// output of od.Scanned.Marshal.
//
// It returns the first error encountered while opening, writing or closing
// the file.
func SaveTask(od *OD) (err error) {
	fPath := filepath.Join("queue",
		strconv.FormatUint(od.Task.WebsiteId, 10),
		"PAUSED")

	pausedF, err := os.OpenFile(fPath, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 0666)
	if err != nil {
		return err
	}
	// A failed Close on a written file can mean the data never reached disk,
	// so report it unless an earlier error is already being returned.
	defer func() {
		if closeErr := pausedF.Close(); closeErr != nil && err == nil {
			err = closeErr
		}
	}()

	// Create save state. The pointers reference the OD's own fields;
	// InProgress is read atomically because it is updated with atomic ops.
	paused := PausedOD{
		Task:       &od.Task,
		Result:     &od.Result,
		BaseUri:    &od.BaseUri,
		InProgress: atomic.LoadInt64(&od.InProgress),
	}

	// Write pause settings
	pauseEnc := gob.NewEncoder(pausedF)
	if err = pauseEnc.Encode(&paused); err != nil {
		return err
	}

	// Write pause scan state
	if err = od.Scanned.Marshal(pausedF); err != nil {
		return err
	}

	return nil
}
func resumeQueue(id uint64) (od *OD, err error) { func resumeQueue(id uint64) (od *OD, err error) {
logrus.WithField("id", id). logrus.WithField("id", id).
Info("Found unfinished") Info("Found unfinished")
@ -94,15 +124,16 @@ func resumeQueue(id uint64) (od *OD, err error) {
// Make the paused struct point to OD fields // Make the paused struct point to OD fields
// So gob loads values into the OD struct // So gob loads values into the OD struct
paused := PausedOD { paused := PausedOD {
Task: &od.Task, Task: &od.Task,
Result: &od.Result, Result: &od.Result,
BaseUri: &od.BaseUri, BaseUri: &od.BaseUri,
} }
// Read pause settings // Read pause settings
pauseDec := gob.NewDecoder(pausedF) pauseDec := gob.NewDecoder(pausedF)
err = pauseDec.Decode(&paused) err = pauseDec.Decode(&paused)
if err != nil { return nil, err } if err != nil { return nil, err }
atomic.StoreInt64(&od.InProgress, paused.InProgress)
// Read pause scan state // Read pause scan state
err = od.Scanned.Unmarshal(pausedF) err = od.Scanned.Unmarshal(pausedF)

View File

@ -50,9 +50,7 @@ func scheduleNewTask(c context.Context, remote *OD) bool {
if err != nil { panic(err) } if err != nil { panic(err) }
// Spawn workers // Spawn workers
for i := 0; i < config.Workers; i++ { remote.WCtx.SpawnWorkers(c, results, config.Workers)
go remote.WCtx.Worker(results)
}
// Enqueue initial job // Enqueue initial job
atomic.AddInt32(&numActiveTasks, 1) atomic.AddInt32(&numActiveTasks, 1)
@ -167,16 +165,33 @@ func (o *OD) handleCollect(results chan File, f *os.File, collectErrC chan error
defer close(results) defer close(results)
// Wait for all jobs on remote to finish // Wait for all jobs on remote to finish
o.Wait.Wait() for {
// Natural finish
if atomic.LoadInt64(&o.InProgress) == 0 {
o.onTaskFinished()
return
}
// Abort
if atomic.LoadInt32(&o.WCtx.aborted) != 0 {
// Wait for all workers to finish
o.WCtx.workers.Wait()
o.onTaskPaused()
return
}
time.Sleep(500 * time.Millisecond)
}
}
func (o *OD) onTaskFinished() {
defer atomic.AddInt32(&numActiveTasks, -1)
// Close queue // Close queue
if err := o.WCtx.Queue.Close(); err != nil { if err := o.WCtx.Queue.Close(); err != nil {
panic(err) panic(err)
} }
atomic.AddInt32(&numActiveTasks, -1)
// Log finish // Log finish
logrus.WithFields(logrus.Fields{ logrus.WithFields(logrus.Fields{
"id": o.Task.WebsiteId, "id": o.Task.WebsiteId,
"url": o.BaseUri.String(), "url": o.BaseUri.String(),
@ -199,6 +214,37 @@ func (o *OD) handleCollect(results chan File, f *os.File, collectErrC chan error
} }
} }
// onTaskPaused finalises a crawl that was interrupted before completion.
//
// It closes the job queue (panicking if that fails), stamps the result with
// the current time as its end time, and persists the crawl state through
// SaveTask. The outcome is logged either as "Crawler paused" or, when saving
// fails, as an error. numActiveTasks is decremented on every exit path.
func (o *OD) onTaskPaused() {
	defer atomic.AddInt32(&numActiveTasks, -1)

	// The queue must be closed before the state is written out.
	if closeErr := o.WCtx.Queue.Close(); closeErr != nil {
		panic(closeErr)
	}

	// Record the moment the crawl was paused as its (current) end time.
	o.Result.EndTimeUnix = time.Now().Unix()

	// Persist task metadata; on failure report it and stop.
	if saveErr := SaveTask(o); saveErr != nil {
		failure := logrus.Fields{
			"err": saveErr.Error(),
			"id":  o.Task.WebsiteId,
			"url": o.BaseUri.String(),
		}
		logrus.WithFields(failure).Error("Failed to save crawler state")
		return
	}

	// State saved successfully: log the pause.
	summary := logrus.Fields{
		"id":       o.Task.WebsiteId,
		"url":      o.BaseUri.String(),
		"duration": time.Since(o.Result.StartTime),
	}
	logrus.WithFields(summary).Info("Crawler paused")
}
func (t *Task) Collect(results chan File, f *os.File, errC chan<- error) { func (t *Task) Collect(results chan File, f *os.File, errC chan<- error) {
err := t.collect(results, f) err := t.collect(results, f)
if err != nil { if err != nil {

View File

@ -1,6 +1,7 @@
package main package main
import ( import (
"context"
"github.com/beeker1121/goque" "github.com/beeker1121/goque"
"github.com/sirupsen/logrus" "github.com/sirupsen/logrus"
"math" "math"
@ -18,10 +19,29 @@ type WorkerContext struct {
Queue *BufferedQueue Queue *BufferedQueue
lastRateLimit time.Time lastRateLimit time.Time
numRateLimits int numRateLimits int
workers sync.WaitGroup
aborted int32
} }
func (w *WorkerContext) Worker(results chan<- File) { func (w *WorkerContext) SpawnWorkers(c context.Context, results chan<- File, n int) {
w.workers.Add(n)
for i := 0; i < n; i++ {
go w.Worker(c, results)
}
}
func (w *WorkerContext) Worker(c context.Context, results chan<- File) {
defer w.workers.Done()
for { for {
select {
case <-c.Done():
// Not yet done
atomic.StoreInt32(&w.aborted, 1)
return
default:
}
job, err := w.Queue.Dequeue() job, err := w.Queue.Dequeue()
switch err { switch err {
case goque.ErrEmpty: case goque.ErrEmpty:
@ -156,7 +176,7 @@ func (w *WorkerContext) DoJob(job *Job, f *File) (newJobs []Job, err error) {
} }
func (w *WorkerContext) queueJob(job Job) { func (w *WorkerContext) queueJob(job Job) {
w.OD.Wait.Add(1) atomic.AddInt64(&w.OD.InProgress, 1)
if w.numRateLimits > 0 { if w.numRateLimits > 0 {
if time.Since(w.lastRateLimit) > 5 * time.Second { if time.Since(w.lastRateLimit) > 5 * time.Second {
@ -173,7 +193,7 @@ func (w *WorkerContext) queueJob(job Job) {
} }
func (w *WorkerContext) finishJob() { func (w *WorkerContext) finishJob() {
w.OD.Wait.Done() atomic.AddInt64(&w.OD.InProgress, -1)
} }
func isErrSilent(err error) bool { func isErrSilent(err error) bool {