Add job buffer size parameter

This commit is contained in:
Richard Patel 2018-11-20 03:42:32 +01:00
parent 86ec78cae1
commit 4dbe2aef2b
No known key found for this signature in database
GPG Key ID: C268B2BBDA2ABECB
3 changed files with 29 additions and 6 deletions

View File

@ -25,6 +25,7 @@ var config struct {
AllocStats time.Duration
Verbose bool
PrintHTTP bool
JobBufferSize int
}
const (
@ -39,6 +40,7 @@ const (
ConfUserAgent = "crawl.user-agent"
ConfDialTimeout = "crawl.dial_timeout"
ConfTimeout = "crawl.timeout"
ConfJobBufferSize = "crawl.job_buffer"
ConfCrawlStats = "output.crawl_stats"
ConfAllocStats = "output.resource_stats"
ConfVerbose = "output.verbose"
@ -53,6 +55,7 @@ func prepareConfig() {
viper.SetDefault(ConfUserAgent, "")
viper.SetDefault(ConfDialTimeout, 10 * time.Second)
viper.SetDefault(ConfTimeout, 30 * time.Second)
viper.SetDefault(ConfJobBufferSize, 5000)
viper.SetDefault(ConfCrawlStats, 3 * time.Second)
viper.SetDefault(ConfAllocStats, 0)
viper.SetDefault(ConfVerbose, false)
@ -112,6 +115,8 @@ func readConfig() {
setTimeout(viper.GetDuration(ConfTimeout))
config.JobBufferSize = viper.GetInt(ConfJobBufferSize)
config.CrawlStats = viper.GetDuration(ConfCrawlStats)
config.AllocStats = viper.GetDuration(ConfAllocStats)

View File

@ -46,7 +46,7 @@ crawl:
# Please be careful with this setting!
# The crawler fires fast and more than
# ten connections can overwhelm a server.
connections: 10
connections: 4
# How often to retry getting data
# from the site before giving up
@ -65,4 +65,13 @@ crawl:
# Job buffer size (per task)
# Higher values cause less disk writes
# but require more memory.
#
# The job queue contains all URLs
# that should be crawled next.
# As it grows very large over time,
# it's kept mainly on disk.
# This sets how many jobs are kept
# in memory.
# A negative value will cause all jobs
# to be stored in memory. (Don't do this)
job_buffer: 5000

View File

@ -7,10 +7,6 @@ import (
"sync/atomic"
)
const (
threshold = 5000
)
type BufferedQueue struct {
dataDir string
q *goque.Queue
@ -20,6 +16,9 @@ type BufferedQueue struct {
func OpenQueue(dataDir string) (bq *BufferedQueue, err error) {
bq = new(BufferedQueue)
if config.JobBufferSize < 0 {
return
}
bq.dataDir = dataDir
bq.q, err = goque.OpenQueue(dataDir)
if err != nil { return nil, err }
@ -44,6 +43,11 @@ func (q *BufferedQueue) Dequeue() (job Job, err error) {
return job, nil
}
if config.JobBufferSize < 0 {
err = goque.ErrEmpty
return
}
var item *goque.Item
item, err = q.q.Dequeue()
if err != nil { return }
@ -62,7 +66,8 @@ func (q *BufferedQueue) directEnqueue(job *Job) bool {
q.m.Lock()
defer q.m.Unlock()
if len(q.buf) < threshold {
bs := config.JobBufferSize
if len(q.buf) < bs || bs < 0 {
q.buf = append(q.buf, *job)
return true
} else {
@ -85,6 +90,10 @@ func (q *BufferedQueue) directDequeue(job *Job) bool {
// Always returns nil (But implements io.Closer)
func (q *BufferedQueue) Close() error {
if config.JobBufferSize < 0 {
return nil
}
// Close ignoring errors
q.q.Close()