From ffde1a9e5d98ada2961febb637184d87fc0147ea Mon Sep 17 00:00:00 2001
From: Richard Patel
Date: Thu, 15 Nov 2018 20:14:31 +0100
Subject: [PATCH] Timeout and results saving

---
 config.go    |  5 +++
 config.yml   |  2 ++
 crawl.go     | 10 ++----
 main.go      |  8 ++++-
 model.go     |  2 +-
 scheduler.go | 87 +++++++++++++++++++++++++++++++++++++---------------
 worker.go    | 10 +++---
 7 files changed, 86 insertions(+), 38 deletions(-)

diff --git a/config.go b/config.go
index dc0610e..566aeab 100644
--- a/config.go
+++ b/config.go
@@ -13,6 +13,7 @@ var config struct {
 	Token      string
 	Retries    int
 	Workers    int
+	Timeout    time.Duration
 	Tasks      int32
 	CrawlStats time.Duration
 	AllocStats time.Duration
@@ -25,6 +26,7 @@ const (
 	ConfTasks      = "crawl.tasks"
 	ConfRetries    = "crawl.retries"
 	ConfWorkers    = "crawl.connections"
+	ConfTimeout    = "crawl.timeout"
 	ConfCrawlStats = "output.crawl_stats"
 	ConfAllocStats = "output.resource_stats"
 	ConfVerbose    = "output.verbose"
@@ -34,6 +36,7 @@ func prepareConfig() {
 	viper.SetDefault(ConfRetries, 5)
 	viper.SetDefault(ConfWorkers, 2)
 	viper.SetDefault(ConfTasks, 3)
+	viper.SetDefault(ConfTimeout, 10 * time.Second)
 	viper.SetDefault(ConfCrawlStats, 3 * time.Second)
 	viper.SetDefault(ConfAllocStats, 0)
 	viper.SetDefault(ConfVerbose, false)
@@ -73,6 +76,8 @@ func readConfig() {
 		configOOB(ConfTasks, int(config.Tasks))
 	}
 
+	config.Timeout = viper.GetDuration(ConfTimeout)
+
 	config.CrawlStats = viper.GetDuration(ConfCrawlStats)
 	config.AllocStats = viper.GetDuration(ConfAllocStats)
 
diff --git a/config.yml b/config.yml
index dc0b6bd..e940981 100644
--- a/config.yml
+++ b/config.yml
@@ -24,3 +24,5 @@ crawl:
   # How often to retry getting data
   # from the site before giving up
   retries: 5
+  # Time before discarding a network request
+  timeout: 10s
diff --git a/crawl.go b/crawl.go
index 50d71df..6085bfa 100644
--- a/crawl.go
+++ b/crawl.go
@@ -3,7 +3,6 @@ package main
 import (
 	"bytes"
 	"fmt"
-	"github.com/sirupsen/logrus"
 	"github.com/terorie/oddb-go/ds/redblackhash"
 	"github.com/terorie/oddb-go/fasturl"
 	"github.com/valyala/fasthttp"
@@ -28,13 +27,10 @@ func GetDir(j *Job, f *File) (links []fasturl.URL, err error) {
 	res := fasthttp.AcquireResponse()
 	defer fasthttp.ReleaseResponse(res)
 
-	err = client.Do(req, res)
+	err = client.DoTimeout(req, res, config.Timeout)
 	fasthttp.ReleaseRequest(req)
 
-	if err != nil {
-		logrus.Error(err)
-		return
-	}
+	if err != nil { return }
 
 	err = checkStatusCode(res.StatusCode())
 	if err != nil { return }
@@ -129,7 +125,7 @@ func GetFile(u fasturl.URL, f *File) (err error) {
 	res.SkipBody = true
 	defer fasthttp.ReleaseResponse(res)
 
-	err = client.Do(req, res)
+	err = client.DoTimeout(req, res, config.Timeout)
 	fasthttp.ReleaseRequest(req)
 	if err != nil { return }
 
diff --git a/main.go b/main.go
index 2e15161..c596066 100644
--- a/main.go
+++ b/main.go
@@ -61,7 +61,13 @@ func cmdCrawler(clic *cli.Context) error {
 			u.Path += "/"
 		}
 		if err != nil { return err }
-		remotes[i] = &OD{ BaseUri: u }
+		remotes[i] = &OD {
+			Task: &Task{
+				WebsiteId: 0,
+				Url: u.String(),
+			},
+			BaseUri: u,
+		}
 	}
 
 	c := context.Background()
diff --git a/model.go b/model.go
index 448da20..1c6ab4b 100644
--- a/model.go
+++ b/model.go
@@ -16,9 +16,9 @@ type Job struct {
 }
 
 type OD struct {
+	Task    *Task
 	Wait    sync.WaitGroup
 	BaseUri fasturl.URL
-	Files   []File
 	WCtx    WorkerContext
 	Scanned redblackhash.Tree
 
diff --git a/scheduler.go b/scheduler.go
index eb0b526..84683c7 100644
--- a/scheduler.go
+++ b/scheduler.go
@@ -2,7 +2,11 @@ package main
 
 import (
 	"context"
+	"encoding/json"
+	"fmt"
 	"github.com/sirupsen/logrus"
+	"os"
+	"path"
 	"sync/atomic"
 )
 
@@ -12,37 +16,36 @@ var totalBuffered int64
 func Schedule(c context.Context, remotes <-chan *OD) {
 	go Stats(c)
 
-	for {
-		select {
-		case <-c.Done():
-			return
+	for remote := range remotes {
+		logrus.WithField("url", remote.BaseUri.String()).
+			Info("Starting crawler")
 
-		case remote := <-remotes:
-			logrus.WithField("url", remote.BaseUri.String()).
-				Info("Starting crawler")
+		// Collect results
+		results := make(chan File)
 
-			// Spawn workers
-			remote.WCtx.in, remote.WCtx.out = makeJobBuffer(c)
-			for i := 0; i < config.Workers; i++ {
-				go remote.WCtx.Worker()
-			}
-
-			// Enqueue initial job
-			atomic.AddInt32(&activeTasks, 1)
-			remote.WCtx.queueJob(Job{
-				OD: remote,
-				Uri: remote.BaseUri,
-				UriStr: remote.BaseUri.String(),
-				Fails: 0,
-			})
-
-			// Upload result when ready
-			go remote.Watch()
+		// Spawn workers
+		remote.WCtx.in, remote.WCtx.out = makeJobBuffer(c)
+		for i := 0; i < config.Workers; i++ {
+			go remote.WCtx.Worker(results)
 		}
+
+		// Enqueue initial job
+		atomic.AddInt32(&activeTasks, 1)
+		remote.WCtx.queueJob(Job{
+			OD: remote,
+			Uri: remote.BaseUri,
+			UriStr: remote.BaseUri.String(),
+			Fails: 0,
+		})
+
+		// Upload result when ready
+		go remote.Watch(results)
 	}
 }
 
-func (r *OD) Watch() {
+func (r *OD) Watch(results chan File) {
+	go r.Task.Collect(results)
+
 	// Wait for all jobs on remote to finish
 	r.Wait.Wait()
 	close(r.WCtx.in)
@@ -52,6 +55,40 @@
 		Info("Crawler finished")
 
 	globalWait.Done()
+
+	close(results)
+}
+
+func (t *Task) Collect(results chan File) {
+	err := t.collect(results)
+	if err != nil {
+		logrus.WithError(err).
+			Error("Failed saving crawl results")
+	}
+}
+
+func (t *Task) collect(results chan File) error {
+	err := os.MkdirAll("crawled", 0755)
+	if err != nil { return err }
+
+	f, err := os.OpenFile(
+		path.Join("crawled", fmt.Sprintf("%d.json", t.WebsiteId)),
+		os.O_CREATE | os.O_WRONLY | os.O_TRUNC,
+		0755,
+	)
+	if err != nil { return err }
+	defer f.Close()
+
+	for result := range results {
+		resJson, err := json.Marshal(result)
+		if err != nil { panic(err) }
+		_, err = f.Write(resJson)
+		if err != nil { return err }
+		_, err = f.Write([]byte{'\n'})
+		if err != nil { return err }
+	}
+
+	return nil
 }
 
 func makeJobBuffer(c context.Context) (chan<- Job, <-chan Job) {
diff --git a/worker.go b/worker.go
index cafc393..813f49b 100644
--- a/worker.go
+++ b/worker.go
@@ -19,13 +19,13 @@ type WorkerContext struct {
 	numRateLimits int
 }
 
-func (w WorkerContext) Worker() {
+func (w WorkerContext) Worker(results chan<- File) {
 	for job := range w.out {
-		w.step(job)
+		w.step(results, job)
 	}
 }
 
-func (w WorkerContext) step(job Job) {
+func (w WorkerContext) step(results chan<- File, job Job) {
 	defer w.finishJob(&job)
 
 	var f File
@@ -64,7 +64,9 @@
 		w.queueJob(job)
 	}
 
-	job.OD.Files = append(job.OD.Files, f)
+	if !f.IsDir {
+		results <- f
+	}
 }
 
 func DoJob(job *Job, f *File) (newJobs []Job, err error) {
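
Note on the timeout change: client.DoTimeout bounds the whole request/response
round trip and returns fasthttp.ErrTimeout once config.Timeout elapses, so a
dead host now fails the job through the normal retry path instead of hanging a
worker indefinitely. A minimal standalone sketch of that behavior; the URL and
the bare client here are placeholders, not part of this patch:

package main

import (
	"log"
	"time"

	"github.com/valyala/fasthttp"
)

func main() {
	client := &fasthttp.Client{}

	req := fasthttp.AcquireRequest()
	defer fasthttp.ReleaseRequest(req)
	req.SetRequestURI("https://example.com/some/dir/")

	res := fasthttp.AcquireResponse()
	defer fasthttp.ReleaseResponse(res)

	// Unlike client.Do, DoTimeout gives up after the given duration.
	err := client.DoTimeout(req, res, 10*time.Second)
	if err == fasthttp.ErrTimeout {
		log.Print("request timed out; the job counts as a retryable failure")
	} else if err != nil {
		log.Print("request failed: ", err)
	}
}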
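
Note on results saving: finished files now stream through the results channel
into crawled/<WebsiteId>.json, one JSON object per line (JSON Lines), so a
site's results no longer accumulate in the removed OD.Files slice. A sketch of
a line-by-line consumer under that assumption; the File fields below are
illustrative only, since the real struct lives in model.go and is not shown in
this patch:

package main

import (
	"bufio"
	"encoding/json"
	"fmt"
	"os"
)

// Illustrative subset of the crawler's File model (see model.go).
type File struct {
	Name string `json:"name"`
	Size int64  `json:"size"`
}

func main() {
	// main.go hardcodes WebsiteId to 0, hence 0.json.
	f, err := os.Open("crawled/0.json")
	if err != nil {
		panic(err)
	}
	defer f.Close()

	// One JSON document per line, exactly as collect() writes them.
	sc := bufio.NewScanner(f)
	for sc.Scan() {
		var file File
		if err := json.Unmarshal(sc.Bytes(), &file); err != nil {
			panic(err)
		}
		fmt.Println(file.Name, file.Size)
	}
	if err := sc.Err(); err != nil {
		panic(err)
	}
}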