From 4dbe2aef2bbc4738cd67bf21ed8f1c567f30858b Mon Sep 17 00:00:00 2001 From: Richard Patel Date: Tue, 20 Nov 2018 03:42:32 +0100 Subject: [PATCH] Add job buffer size parameter --- config.go | 5 +++++ config.yml | 11 ++++++++++- queue.go | 19 ++++++++++++++----- 3 files changed, 29 insertions(+), 6 deletions(-) diff --git a/config.go b/config.go index 2616672..fea0f1c 100644 --- a/config.go +++ b/config.go @@ -25,6 +25,7 @@ var config struct { AllocStats time.Duration Verbose bool PrintHTTP bool + JobBufferSize int } const ( @@ -39,6 +40,7 @@ const ( ConfUserAgent = "crawl.user-agent" ConfDialTimeout = "crawl.dial_timeout" ConfTimeout = "crawl.timeout" + ConfJobBufferSize = "crawl.job_buffer" ConfCrawlStats = "output.crawl_stats" ConfAllocStats = "output.resource_stats" ConfVerbose = "output.verbose" @@ -53,6 +55,7 @@ func prepareConfig() { viper.SetDefault(ConfUserAgent, "") viper.SetDefault(ConfDialTimeout, 10 * time.Second) viper.SetDefault(ConfTimeout, 30 * time.Second) + viper.SetDefault(ConfJobBufferSize, 5000) viper.SetDefault(ConfCrawlStats, 3 * time.Second) viper.SetDefault(ConfAllocStats, 0) viper.SetDefault(ConfVerbose, false) @@ -112,6 +115,8 @@ func readConfig() { setTimeout(viper.GetDuration(ConfTimeout)) + config.JobBufferSize = viper.GetInt(ConfJobBufferSize) + config.CrawlStats = viper.GetDuration(ConfCrawlStats) config.AllocStats = viper.GetDuration(ConfAllocStats) diff --git a/config.yml b/config.yml index 10f7b0b..13e7ae3 100644 --- a/config.yml +++ b/config.yml @@ -46,7 +46,7 @@ crawl: # Please be careful with this setting! # The crawler fires fast and more than # ten connections can overwhelm a server. - connections: 10 + connections: 4 # How often to retry getting data # from the site before giving up @@ -65,4 +65,13 @@ crawl: # Job buffer size (per task) # Higher values cause less disk writes # but require more memory. + # + # The job queue contains all URLs + # that should be crawled next. + # As it grows very large over time, + # it's kept mainly on disk. + # This sets how many jobs are kept + # in memory. + # A negative value will cause all jobs + # to be stored in memory. (Don't do this) job_buffer: 5000 diff --git a/queue.go b/queue.go index 66f57e1..3ee67c1 100644 --- a/queue.go +++ b/queue.go @@ -7,10 +7,6 @@ import ( "sync/atomic" ) -const ( - threshold = 5000 -) - type BufferedQueue struct { dataDir string q *goque.Queue @@ -20,6 +16,9 @@ type BufferedQueue struct { func OpenQueue(dataDir string) (bq *BufferedQueue, err error) { bq = new(BufferedQueue) + if config.JobBufferSize < 0 { + return + } bq.dataDir = dataDir bq.q, err = goque.OpenQueue(dataDir) if err != nil { return nil, err } @@ -44,6 +43,11 @@ func (q *BufferedQueue) Dequeue() (job Job, err error) { return job, nil } + if config.JobBufferSize < 0 { + err = goque.ErrEmpty + return + } + var item *goque.Item item, err = q.q.Dequeue() if err != nil { return } @@ -62,7 +66,8 @@ func (q *BufferedQueue) directEnqueue(job *Job) bool { q.m.Lock() defer q.m.Unlock() - if len(q.buf) < threshold { + bs := config.JobBufferSize + if len(q.buf) < bs || bs < 0 { q.buf = append(q.buf, *job) return true } else { @@ -85,6 +90,10 @@ func (q *BufferedQueue) directDequeue(job *Job) bool { // Always returns nil (But implements io.Closer) func (q *BufferedQueue) Close() error { + if config.JobBufferSize < 0 { + return nil + } + // Close ignoring errors q.q.Close()