diff --git a/main.go b/main.go
index 88fe21c..229c9fb 100644
--- a/main.go
+++ b/main.go
@@ -3,6 +3,7 @@ package main
 import (
 	"context"
 	"github.com/sirupsen/logrus"
+	"github.com/spf13/viper"
 	"github.com/terorie/od-database-crawler/fasturl"
 	"github.com/urfave/cli"
 	"os"
@@ -11,6 +12,8 @@ import (
 	"time"
 )
 
+var configFile string
+
 var app = cli.App {
 	Name: "od-database-crawler",
 	Usage: "OD-Database Go crawler",
@@ -18,7 +21,7 @@ var app = cli.App {
 	BashComplete: cli.DefaultAppComplete,
 	Writer: os.Stdout,
 	Action: cmdBase,
-	Commands: []cli.Command{
+	Commands: []cli.Command {
 		{
 			Name: "crawl",
 			Usage: "Crawl a list of URLs",
@@ -26,6 +29,19 @@ var app = cli.App {
 			Action: cmdCrawler,
 		},
 	},
+	Flags: []cli.Flag {
+		cli.StringFlag {
+			Name: "config",
+			EnvVar: "CONFIG",
+			Destination: &configFile,
+		},
+	},
+	Before: func(i *cli.Context) error {
+		if configFile != "" {
+			viper.SetConfigFile(configFile)
+		}
+		return nil
+	},
 	After: func(i *cli.Context) error {
 		exitHooks.Execute()
 		return nil
@@ -39,10 +55,11 @@ func init() {
 }
 
 func main() {
-	err := os.MkdirAll("crawled", 0755)
-	if err != nil {
-		panic(err)
-	}
+	if err := os.MkdirAll("crawled", 0755);
+		err != nil { panic(err) }
+
+	if err := os.MkdirAll("queue", 0755);
+		err != nil { panic(err) }
 
 	readConfig()
 	app.Run(os.Args)
diff --git a/model.go b/model.go
index d0509b6..ff1f048 100644
--- a/model.go
+++ b/model.go
@@ -23,13 +23,36 @@ type TaskResult struct {
 }
 
 type Job struct {
-	OD *OD
 	Uri fasturl.URL
 	UriStr string
 	Fails int
 	LastError error
 }
 
+type JobGob struct {
+	Uri string
+	Fails int
+	LastError string
+}
+
+func (g *JobGob) ToGob(j *Job) {
+	g.Uri = j.UriStr
+	g.Fails = j.Fails
+	if j.LastError != nil {
+		g.LastError = j.LastError.Error()
+	}
+}
+
+func (g *JobGob) FromGob(j *Job) {
+	if err := j.Uri.Parse(g.Uri);
+		err != nil { panic(err) }
+	j.UriStr = g.Uri
+	j.Fails = g.Fails
+	if g.LastError != "" {
+		j.LastError = errorString(g.LastError)
+	}
+}
+
 type OD struct {
 	Task Task
 	Result TaskResult
@@ -57,3 +80,8 @@ func (o *OD) LoadOrStoreKey(k *redblackhash.Key) (exists bool) {
 	o.Scanned.Put(k)
 	return false
 }
+
+type errorString string
+func (e errorString) Error() string {
+	return string(e)
+}
diff --git a/scheduler.go b/scheduler.go
index 6f6ed92..50e14bb 100644
--- a/scheduler.go
+++ b/scheduler.go
@@ -4,6 +4,7 @@ import (
 	"context"
 	"encoding/json"
 	"fmt"
+	"github.com/beeker1121/goque"
 	"github.com/sirupsen/logrus"
 	"github.com/terorie/od-database-crawler/fasturl"
 	"os"
@@ -28,8 +29,23 @@ func Schedule(c context.Context, remotes <-chan *OD) {
 		// Collect results
 		results := make(chan File)
 
+		remote.WCtx.OD = remote
+
+		// Get queue path
+		queuePath := path.Join("queue", fmt.Sprintf("%d", remote.Task.WebsiteId))
+
+		// Delete existing queue
+		if err := os.RemoveAll(queuePath);
+			err != nil { panic(err) }
+
+		// Start new queue
+		var err error
+		remote.WCtx.Queue, err = goque.OpenQueue(queuePath)
+		if err != nil {
+			panic(err)
+		}
+
 		// Spawn workers
-		remote.WCtx.in, remote.WCtx.out = makeJobBuffer(c)
 		for i := 0; i < config.Workers; i++ {
 			go remote.WCtx.Worker(results)
 		}
@@ -37,7 +53,6 @@ func Schedule(c context.Context, remotes <-chan *OD) {
 		// Enqueue initial job
 		atomic.AddInt32(&numActiveTasks, 1)
 		remote.WCtx.queueJob(Job{
-			OD: remote,
 			Uri: remote.BaseUri,
 			UriStr: remote.BaseUri.String(),
 			Fails: 0,
@@ -148,7 +163,9 @@ func (o *OD) handleCollect(results chan File, f *os.File, collectErrC chan error
 	// Wait for all jobs on remote to finish
 	o.Wait.Wait()
-	close(o.WCtx.in)
+	if err := o.WCtx.Queue.Close(); err != nil {
+		panic(err)
+	}
 
 	atomic.AddInt32(&numActiveTasks, -1)
 
 	// Log finish
@@ -198,51 +215,3 @@ func (t *Task) collect(results chan File, f *os.File) error {
 
 	return nil
 }
-
-func makeJobBuffer(c context.Context) (chan<- Job, <-chan Job) {
-	in := make(chan Job)
-	out := make(chan Job)
-	go bufferJobs(c, in, out)
-	return in, out
-}
-
-func bufferJobs(c context.Context, in chan Job, out chan Job) {
-	defer close(out)
-	var inQueue []Job
-	outCh := func() chan Job {
-		if len(inQueue) == 0 {
-			return nil
-		}
-		return out
-	}
-	for len(inQueue) > 0 || in != nil {
-		if len(inQueue) == 0 {
-			select {
-			case v, ok := <-in:
-				if !ok {
-					in = nil
-				} else {
-					atomic.AddInt64(&totalBuffered, 1)
-					inQueue = append(inQueue, v)
-				}
-			case <-c.Done():
-				return
-			}
-		} else {
-			select {
-			case v, ok := <-in:
-				if !ok {
-					in = nil
-				} else {
-					atomic.AddInt64(&totalBuffered, 1)
-					inQueue = append(inQueue, v)
-				}
-			case outCh() <- inQueue[0]:
-				atomic.AddInt64(&totalBuffered, -1)
-				inQueue = inQueue[1:]
-			case <-c.Done():
-				return
-			}
-		}
-	}
-}
diff --git a/worker.go b/worker.go
index a5c3a64..837ffda 100644
--- a/worker.go
+++ b/worker.go
@@ -1,6 +1,7 @@
 package main
 
 import (
+	"github.com/beeker1121/goque"
 	"github.com/sirupsen/logrus"
 	"github.com/valyala/fasthttp"
 	"math"
@@ -14,24 +15,47 @@ import (
 var globalWait sync.WaitGroup
 
 type WorkerContext struct {
-	in chan<- Job
-	out <-chan Job
+	OD *OD
+	Queue *goque.Queue
 	lastRateLimit time.Time
 	numRateLimits int
 }
 
-func (w WorkerContext) Worker(results chan<- File) {
-	for job := range w.out {
+func (w *WorkerContext) Worker(results chan<- File) {
+	for {
+		item, err := w.Queue.Dequeue()
+		switch err {
+		case goque.ErrEmpty:
+			time.Sleep(500 * time.Millisecond)
+			continue
+
+		case goque.ErrDBClosed:
+			return
+
+		case nil:
+			break
+
+		default:
+			panic(err)
+		}
+
+		var gob JobGob
+		if err := item.ToObject(&gob); err != nil {
+			panic(err)
+		}
+
+		var job Job
+		gob.FromGob(&job)
 		w.step(results, job)
 	}
 }
 
-func (w WorkerContext) step(results chan<- File, job Job) {
+func (w *WorkerContext) step(results chan<- File, job Job) {
 	defer w.finishJob(&job)
 
 	var f File
 
-	newJobs, err := DoJob(&job, &f)
+	newJobs, err := w.DoJob(&job, &f)
 	atomic.AddUint64(&totalStarted, 1)
 	if err == ErrKnown {
 		return
@@ -75,7 +99,7 @@ func (w WorkerContext) step(results chan<- File, job Job) {
 	}
 }
 
-func DoJob(job *Job, f *File) (newJobs []Job, err error) {
+func (w *WorkerContext) DoJob(job *Job, f *File) (newJobs []Job, err error) {
 	if len(job.Uri.Path) == 0 { return }
 	if job.Uri.Path[len(job.Uri.Path)-1] == '/' {
 		// Load directory
@@ -93,7 +117,7 @@ func DoJob(job *Job, f *File) (newJobs []Job, err error) {
 		hash := f.HashDir(links)
 
 		// Skip symlinked dirs
-		if job.OD.LoadOrStoreKey(&hash) {
+		if w.OD.LoadOrStoreKey(&hash) {
 			return nil, ErrKnown
 		}
 
@@ -114,7 +138,6 @@ func DoJob(job *Job, f *File) (newJobs []Job, err error) {
 			lastLink = uriStr
 
 			newJobs = append(newJobs, Job{
-				OD: job.OD,
 				Uri: link,
 				UriStr: uriStr,
 				Fails: 0,
@@ -139,13 +162,13 @@ func DoJob(job *Job, f *File) (newJobs []Job, err error) {
 			}
 			return nil, err
 		}
-		atomic.AddUint64(&job.OD.Result.FileCount, 1)
+		atomic.AddUint64(&w.OD.Result.FileCount, 1)
 	}
 	return
 }
 
-func (w WorkerContext) queueJob(job Job) {
-	job.OD.Wait.Add(1)
+func (w *WorkerContext) queueJob(job Job) {
+	w.OD.Wait.Add(1)
 
 	if w.numRateLimits > 0 {
 		if time.Since(w.lastRateLimit) > 5 * time.Second {
@@ -156,11 +179,15 @@ func (w WorkerContext) queueJob(job Job) {
 		}
 	}
 
-	w.in <- job
+	var gob JobGob
+	gob.ToGob(&job)
+	if _, err := w.Queue.EnqueueObject(gob); err != nil {
+		panic(err)
+	}
 }
 
-func (w WorkerContext) finishJob(job *Job) {
-	job.OD.Wait.Done()
+func (w *WorkerContext) finishJob(job *Job) {
+	w.OD.Wait.Done()
 }
 
 func isErrSilent(err error) bool {
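
Note (not part of the patch): below is a minimal standalone sketch of how a job now round-trips through the on-disk queue, using only the goque calls the patch itself relies on (OpenQueue, EnqueueObject, Dequeue, ToObject, Close). The queue directory "queue/demo" and the example URL are made-up values for illustration.

package main

import (
	"fmt"

	"github.com/beeker1121/goque"
)

// Mirrors the JobGob type added in model.go: plain, gob-encodable fields only.
type JobGob struct {
	Uri       string
	Fails     int
	LastError string
}

func main() {
	// Open (or create) a persistent, disk-backed queue directory.
	q, err := goque.OpenQueue("queue/demo")
	if err != nil {
		panic(err)
	}
	defer q.Close()

	// Enqueue: the value is gob-encoded and written to disk,
	// analogous to queueJob after converting Job -> JobGob.
	if _, err := q.EnqueueObject(JobGob{Uri: "http://example.com/"}); err != nil {
		panic(err)
	}

	// Dequeue: Worker polls this and sleeps when it sees goque.ErrEmpty.
	item, err := q.Dequeue()
	if err != nil {
		panic(err)
	}

	// Decode back into a JobGob, as Worker does before calling FromGob.
	var out JobGob
	if err := item.ToObject(&out); err != nil {
		panic(err)
	}
	fmt.Println(out.Uri)
}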