16 Commits
v1.0 ... v1.1.0

Author         SHA1        Message                            Date
Richard Patel  f9a0d6bffe  Bump to v1.1.0                     2018-11-20 03:46:36 +01:00
Richard Patel  4dbe2aef2b  Add job buffer size parameter      2018-11-20 03:42:32 +01:00
Richard Patel  86ec78cae1  Add TCP timeout option             2018-11-20 03:29:10 +01:00
Richard Patel  b846498030  Delete URL queues after crawling   2018-11-20 03:05:43 +01:00
Richard Patel  4f3140a39f  Fix queue_count in log             2018-11-20 02:49:03 +01:00
Richard Patel  85d2aac9d4  Performance patch                  2018-11-20 02:33:50 +01:00
Richard Patel  b6c0a45900  Job queue disk offloading          2018-11-20 02:03:10 +01:00
Richard Patel  d332f06659  Limit retries to 10                2018-11-18 21:05:26 +01:00
Richard Patel  1625d6c888  Bump to v1.0.2                     2018-11-18 18:53:57 +01:00
Richard Patel  03a487f393  Fix crawl loop                     2018-11-18 18:45:06 +01:00
Richard Patel  ac8221b109  Retry /task/upload                 2018-11-18 18:33:26 +01:00
Richard Patel  8ed2cf3b93  Bump to v1.0.1                     2018-11-18 14:49:07 +01:00
Richard Patel  f3620262fc  Add log file support               2018-11-18 14:46:52 +01:00
Richard Patel  dc4e4212a0  Add freebsd to release.sh          2018-11-18 14:38:18 +01:00
Richard Patel  6e6a4edd27  Ignore all HTTP errors             2018-11-18 14:25:06 +01:00
Richard Patel  a71157b4d8  Add User-Agent parameter           2018-11-18 14:24:04 +01:00
12 changed files with 357 additions and 119 deletions

config.go

@@ -1,9 +1,11 @@
 package main

 import (
+	"bufio"
 	"fmt"
 	"github.com/sirupsen/logrus"
 	"github.com/spf13/viper"
+	"io"
 	"os"
 	"strings"
 	"time"
@@ -17,12 +19,13 @@ var config struct {
 	ChunkSize  int64
 	Retries    int
 	Workers    int
-	Timeout    time.Duration
+	UserAgent  string
 	Tasks      int32
 	CrawlStats time.Duration
 	AllocStats time.Duration
 	Verbose    bool
 	PrintHTTP  bool
+	JobBufferSize int
 }

 const (
@@ -34,22 +37,30 @@ const (
 	ConfTasks       = "crawl.tasks"
 	ConfRetries     = "crawl.retries"
 	ConfWorkers     = "crawl.connections"
+	ConfUserAgent   = "crawl.user-agent"
+	ConfDialTimeout = "crawl.dial_timeout"
 	ConfTimeout     = "crawl.timeout"
+	ConfJobBufferSize = "crawl.job_buffer"
 	ConfCrawlStats  = "output.crawl_stats"
 	ConfAllocStats  = "output.resource_stats"
 	ConfVerbose     = "output.verbose"
 	ConfPrintHTTP   = "output.http"
+	ConfLogFile     = "output.log"
 )

 func prepareConfig() {
 	viper.SetDefault(ConfRetries, 5)
 	viper.SetDefault(ConfWorkers, 2)
 	viper.SetDefault(ConfTasks, 3)
-	viper.SetDefault(ConfTimeout, 10 * time.Second)
+	viper.SetDefault(ConfUserAgent, "")
+	viper.SetDefault(ConfDialTimeout, 10 * time.Second)
+	viper.SetDefault(ConfTimeout, 30 * time.Second)
+	viper.SetDefault(ConfJobBufferSize, 5000)
 	viper.SetDefault(ConfCrawlStats, 3 * time.Second)
 	viper.SetDefault(ConfAllocStats, 0)
 	viper.SetDefault(ConfVerbose, false)
 	viper.SetDefault(ConfPrintHTTP, false)
+	viper.SetDefault(ConfLogFile, "")
 	viper.SetDefault(ConfRecheck, 3 * time.Second)
 	viper.SetDefault(ConfChunkSize, "1 MB")
 }
@@ -98,7 +109,13 @@ func readConfig() {
 		configOOB(ConfTasks, int(config.Tasks))
 	}

-	config.Timeout = viper.GetDuration(ConfTimeout)
+	config.UserAgent = viper.GetString(ConfUserAgent)
+
+	setDialTimeout(viper.GetDuration(ConfDialTimeout))
+
+	setTimeout(viper.GetDuration(ConfTimeout))
+
+	config.JobBufferSize = viper.GetInt(ConfJobBufferSize)

 	config.CrawlStats = viper.GetDuration(ConfCrawlStats)
@@ -109,6 +126,17 @@ func readConfig() {
 		logrus.SetLevel(logrus.DebugLevel)
 	}

+	if filePath := viper.GetString(ConfLogFile); filePath != "" {
+		f, err := os.OpenFile(filePath, os.O_CREATE | os.O_WRONLY | os.O_APPEND, 0644)
+		bufWriter := bufio.NewWriter(f)
+		if err != nil { panic(err) }
+		exitHooks.Add(func() {
+			bufWriter.Flush()
+			f.Close()
+		})
+		logrus.SetOutput(io.MultiWriter(os.Stdout, bufWriter))
+	}
+
 	config.PrintHTTP = viper.GetBool(ConfPrintHTTP)
 }
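The log-file hunk above tees logrus output to stdout and a buffered file, flushing through an exit hook. Below is a minimal standalone sketch of the same pattern (the file name and package wrapper are placeholders, not part of the diff); note it checks the os.OpenFile error before wrapping the file, whereas the committed code constructs the bufio.Writer first.

package main

import (
	"bufio"
	"io"
	"os"

	"github.com/sirupsen/logrus"
)

func main() {
	// Open the log file and check the error before using the handle.
	f, err := os.OpenFile("crawler.log", os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0644)
	if err != nil {
		panic(err)
	}
	w := bufio.NewWriter(f)
	defer func() {
		w.Flush() // flush buffered log lines before exit
		f.Close()
	}()

	// Tee log output to both stdout and the buffered file.
	logrus.SetOutput(io.MultiWriter(os.Stdout, w))
	logrus.Info("log file support, as in commit f3620262fc")
}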

config.yml

@@ -23,13 +23,20 @@ server:
 output:
   # Crawl statistics
   crawl_stats: 1s
+
   # CPU/RAM/Job queue stats
   resource_stats: 10s
+
   # More output? (Every listed dir)
   verbose: false
+
   # Print HTTP errors (Super spammy)
   http: false
+
+  # Log file
+  # If empty, no log file is created.
+  log: crawler.log

 # Crawler settings
 crawl:
   # Number of sites that can be processed at once
@@ -39,11 +46,32 @@ crawl:
   # Please be careful with this setting!
   # The crawler fires fast and more than
   # ten connections can overwhelm a server.
-  connections: 10
+  connections: 4

   # How often to retry getting data
   # from the site before giving up
   retries: 5

+  # Time before discarding a failed connection attempt
+  dial_timeout: 10s
+
   # Time before discarding a network request
-  timeout: 10s
+  timeout: 30s
+
+  # Crawler User-Agent
+  # If empty, no User-Agent header is sent.
+  user-agent: "Mozilla/5.0 (X11; od-database-crawler) Gecko/20100101 Firefox/52.0"
+
+  # Job buffer size (per task)
+  # Higher values cause less disk writes
+  # but require more memory.
+  #
+  # The job queue contains all URLs
+  # that should be crawled next.
+  # As it grows very large over time,
+  # it's kept mainly on disk.
+  # This sets how many jobs are kept
+  # in memory.
+  # A negative value will cause all jobs
+  # to be stored in memory. (Don't do this)
+  job_buffer: 5000
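The job_buffer comment above describes a spill-to-disk queue: a bounded in-memory buffer backed by an on-disk store. A sketch of that decision logic (hypothetical helper, mirroring directEnqueue in queue.go further down; Job is the type from model.go):

// enqueue keeps at most limit jobs in memory and spills the rest to disk.
// A negative limit disables spilling, which the comment above warns against.
func enqueue(buf *[]Job, limit int, j Job, spill func(Job) error) error {
	if len(*buf) < limit || limit < 0 {
		*buf = append(*buf, j) // fits into the in-memory buffer
		return nil
	}
	return spill(j) // buffer full: persist to the on-disk queue
}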

crawl.go

@@ -8,6 +8,7 @@ import (
 	"github.com/valyala/fasthttp"
 	"golang.org/x/crypto/blake2b"
 	"golang.org/x/net/html"
+	"net"
 	"path"
 	"strconv"
 	"strings"
@@ -20,17 +21,31 @@ var client = fasthttp.Client {
 	},
 }

+func setDialTimeout(d time.Duration) {
+	client.Dial = func(addr string) (net.Conn, error) {
+		return fasthttp.DialTimeout(addr, d)
+	}
+}
+
+func setTimeout(d time.Duration) {
+	client.ReadTimeout = d
+	client.WriteTimeout = d / 2
+}
+
 func GetDir(j *Job, f *File) (links []fasturl.URL, err error) {
 	f.IsDir = true
 	f.Name = path.Base(j.Uri.Path)

 	req := fasthttp.AcquireRequest()
+	if config.UserAgent != "" {
+		req.Header.SetUserAgent(config.UserAgent)
+	}
 	req.SetRequestURI(j.UriStr)

 	res := fasthttp.AcquireResponse()
 	defer fasthttp.ReleaseResponse(res)

-	err = client.DoTimeout(req, res, config.Timeout)
+	err = client.Do(req, res)
 	fasthttp.ReleaseRequest(req)

 	if err != nil {
@@ -47,6 +62,8 @@ func GetDir(j *Job, f *File) (links []fasturl.URL, err error) {
 	var linkHref string

 	for {
+		err = nil
+
 		tokenType := doc.Next()
 		if tokenType == html.ErrorToken {
 			break
@@ -77,16 +94,16 @@ func GetDir(j *Job, f *File) (links []fasturl.URL, err error) {
 			linkHref = ""

 			if strings.LastIndexByte(href, '?') != -1 {
-				goto nextToken
+				continue
 			}

 			switch href {
 			case "", " ", ".", "..", "/":
-				goto nextToken
+				continue
 			}

 			if strings.Contains(href, "../") {
-				goto nextToken
+				continue
 			}

 			var link fasturl.URL
@@ -105,8 +122,6 @@ func GetDir(j *Job, f *File) (links []fasturl.URL, err error) {
 			links = append(links, link)
 		}
 	}
-
-	nextToken:
 	}

 	return
@@ -120,13 +135,16 @@ func GetFile(u fasturl.URL, f *File) (err error) {
 	req := fasthttp.AcquireRequest()
 	req.Header.SetMethod("HEAD")
+	if config.UserAgent != "" {
+		req.Header.SetUserAgent(config.UserAgent)
+	}
 	req.SetRequestURI(u.String())

 	res := fasthttp.AcquireResponse()
 	res.SkipBody = true
 	defer fasthttp.ReleaseResponse(res)

-	err = client.DoTimeout(req, res, config.Timeout)
+	err = client.Do(req, res)
 	fasthttp.ReleaseRequest(req)

 	if err != nil {
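Commit 86ec78cae1 replaces the per-request client.DoTimeout calls with client-level deadlines: a dial timeout plus read/write timeouts, the write deadline being half the read deadline. A self-contained sketch of the same fasthttp configuration (the URL and durations are placeholder values):

package main

import (
	"fmt"
	"net"
	"time"

	"github.com/valyala/fasthttp"
)

func main() {
	// Client-level timeouts, mirroring setDialTimeout/setTimeout above.
	client := &fasthttp.Client{
		ReadTimeout:  30 * time.Second,
		WriteTimeout: 15 * time.Second, // the diff uses d / 2
		Dial: func(addr string) (net.Conn, error) {
			return fasthttp.DialTimeout(addr, 10*time.Second)
		},
	}

	req := fasthttp.AcquireRequest()
	defer fasthttp.ReleaseRequest(req)
	req.SetRequestURI("https://example.com/")

	res := fasthttp.AcquireResponse()
	defer fasthttp.ReleaseResponse(res)

	// Do relies on the client-level deadlines instead of a per-call timeout.
	if err := client.Do(req, res); err != nil {
		fmt.Println("request failed:", err)
		return
	}
	fmt.Println("status:", res.StatusCode())
}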

main.go

@@ -3,25 +3,25 @@ package main
 import (
 	"context"
 	"github.com/sirupsen/logrus"
+	"github.com/spf13/viper"
 	"github.com/terorie/od-database-crawler/fasturl"
 	"github.com/urfave/cli"
-	"log"
-	"net/http"
-	_ "net/http/pprof"
 	"os"
 	"strings"
 	"sync/atomic"
 	"time"
 )

+var configFile string
+
 var app = cli.App {
 	Name: "od-database-crawler",
 	Usage: "OD-Database Go crawler",
-	Version: "1.0",
+	Version: "1.1.0",
 	BashComplete: cli.DefaultAppComplete,
 	Writer: os.Stdout,
 	Action: cmdBase,
-	Commands: []cli.Command{
+	Commands: []cli.Command {
 		{
 			Name: "crawl",
 			Usage: "Crawl a list of URLs",
@@ -29,28 +29,43 @@ var app = cli.App {
 			Action: cmdCrawler,
 		},
 	},
+	Flags: []cli.Flag {
+		cli.StringFlag {
+			Name: "config",
+			EnvVar: "CONFIG",
+			Destination: &configFile,
+		},
+	},
+	Before: func(i *cli.Context) error {
+		if configFile != "" {
+			viper.SetConfigFile(configFile)
+		}
+		return nil
+	},
+	After: func(i *cli.Context) error {
+		exitHooks.Execute()
+		return nil
+	},
 }

+var exitHooks Hooks
+
 func init() {
 	prepareConfig()
 }

 func main() {
-	go func() {
-		log.Println(http.ListenAndServe("localhost:42069", nil))
-	}()
-
-	err := os.MkdirAll("crawled", 0755)
-	if err != nil {
-		panic(err)
-	}
-
-	readConfig()
+	if err := os.MkdirAll("crawled", 0755);
+		err != nil { panic(err) }
+
+	if err := os.MkdirAll("queue", 0755);
+		err != nil { panic(err) }

 	app.Run(os.Args)
 }

 func cmdBase(_ *cli.Context) error {
+	readConfig()
+
 	// TODO Graceful shutdown
 	appCtx := context.Background()
 	forceCtx := context.Background()
@@ -107,8 +122,6 @@ func cmdBase(_ *cli.Context) error {
 }

 func cmdCrawler(clic *cli.Context) error {
-	readConfig()
-
 	if clic.NArg() != 1 {
 		cli.ShowCommandHelpAndExit(clic, "crawl", 1)
 	}

model.go

@@ -23,7 +23,6 @@ type TaskResult struct {
 }

 type Job struct {
-	OD     *OD
 	Uri    fasturl.URL
 	UriStr string
 	Fails  int
@@ -57,3 +56,8 @@ func (o *OD) LoadOrStoreKey(k *redblackhash.Key) (exists bool) {
 	o.Scanned.Put(k)
 	return false
 }
+
+type errorString string
+
+func (e errorString) Error() string {
+	return string(e)
+}

queue.go (new file)

@@ -0,0 +1,129 @@
package main
import (
"github.com/beeker1121/goque"
"os"
"sync"
"sync/atomic"
)
type BufferedQueue struct {
dataDir string
q *goque.Queue
buf []Job
m sync.Mutex
}
func OpenQueue(dataDir string) (bq *BufferedQueue, err error) {
bq = new(BufferedQueue)
if config.JobBufferSize < 0 {
return
}
bq.dataDir = dataDir
bq.q, err = goque.OpenQueue(dataDir)
if err != nil { return nil, err }
return
}
func (q *BufferedQueue) Enqueue(job *Job) error {
atomic.AddInt64(&totalQueued, 1)
if q.directEnqueue(job) {
return nil
}
var gob JobGob
gob.ToGob(job)
_, err := q.q.EnqueueObject(gob)
return err
}
func (q *BufferedQueue) Dequeue() (job Job, err error) {
if q.directDequeue(&job) {
atomic.AddInt64(&totalQueued, -1)
return job, nil
}
if config.JobBufferSize < 0 {
err = goque.ErrEmpty
return
}
var item *goque.Item
item, err = q.q.Dequeue()
if err != nil { return }
atomic.AddInt64(&totalQueued, -1)
var gob JobGob
err = item.ToObject(&gob)
if err != nil { return }
gob.FromGob(&job)
return
}
func (q *BufferedQueue) directEnqueue(job *Job) bool {
q.m.Lock()
defer q.m.Unlock()
bs := config.JobBufferSize
if len(q.buf) < bs || bs < 0 {
q.buf = append(q.buf, *job)
return true
} else {
return false
}
}
func (q *BufferedQueue) directDequeue(job *Job) bool {
q.m.Lock()
defer q.m.Unlock()
if len(q.buf) > 0 {
*job = q.buf[0]
q.buf = q.buf[1:]
return true
} else {
return false
}
}
// Always returns nil (But implements io.Closer)
func (q *BufferedQueue) Close() error {
if config.JobBufferSize < 0 {
return nil
}
// Close ignoring errors
q.q.Close()
// Delete files
if err := os.RemoveAll(q.dataDir);
err != nil { panic(err) }
return nil
}
type JobGob struct {
Uri string
Fails int
LastError string
}
func (g *JobGob) ToGob(j *Job) {
g.Uri = j.UriStr
g.Fails = j.Fails
if j.LastError != nil {
g.LastError = j.LastError.Error()
}
}
func (g *JobGob) FromGob(j *Job) {
if err := j.Uri.Parse(g.Uri);
err != nil { panic(err) }
j.UriStr = g.Uri
j.Fails = g.Fails
if g.LastError != "" {
j.LastError = errorString(g.LastError)
}
}
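A hypothetical usage sketch of the new BufferedQueue, assuming the surrounding package (config.JobBufferSize set by readConfig, Job from model.go); the directory and URL are placeholders:

func queueExample() {
	// One queue per crawl task; Close() also removes the on-disk files.
	q, err := OpenQueue("queue/example")
	if err != nil {
		panic(err)
	}
	defer q.Close()

	var j Job
	if err := j.Uri.Parse("http://example.com/"); err != nil {
		panic(err)
	}
	j.UriStr = "http://example.com/"
	if err := q.Enqueue(&j); err != nil {
		panic(err)
	}

	for {
		job, err := q.Dequeue()
		if err == goque.ErrEmpty {
			break // in-memory buffer and disk queue are both drained
		} else if err != nil {
			panic(err)
		}
		_ = job // hand the job to a worker here
	}
}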

release.sh

@@ -18,3 +18,8 @@ name=${appname}-${tag}-mac
 GOOS="darwin" GOARCH="amd64" go build -ldflags="-s -w" -o $name
 gzip -f $name
 echo $name
+
+name=${appname}-${tag}-freebsd
+GOOS="freebsd" GOARCH="amd64" go build -ldflags="-s -w" -o $name
+gzip -f $name
+echo $name

scheduler.go

@@ -16,7 +16,7 @@ import (
 var activeTasksLock sync.Mutex
 var activeTasks = make(map[uint64]bool)
 var numActiveTasks int32
-var totalBuffered int64
+var totalQueued int64

 func Schedule(c context.Context, remotes <-chan *OD) {
 	go Stats(c)
@@ -28,8 +28,21 @@ func Schedule(c context.Context, remotes <-chan *OD) {
 		// Collect results
 		results := make(chan File)

+		remote.WCtx.OD = remote
+
+		// Get queue path
+		queuePath := path.Join("queue", fmt.Sprintf("%d", remote.Task.WebsiteId))
+
+		// Delete existing queue
+		if err := os.RemoveAll(queuePath);
+			err != nil { panic(err) }
+
+		// Start new queue
+		var err error
+		remote.WCtx.Queue, err = OpenQueue(queuePath)
+		if err != nil { panic(err) }
+
 		// Spawn workers
-		remote.WCtx.in, remote.WCtx.out = makeJobBuffer(c)
 		for i := 0; i < config.Workers; i++ {
 			go remote.WCtx.Worker(results)
 		}
@@ -37,7 +50,6 @@ func Schedule(c context.Context, remotes <-chan *OD) {
 		// Enqueue initial job
 		atomic.AddInt32(&numActiveTasks, 1)
 		remote.WCtx.queueJob(Job{
-			OD:     remote,
 			Uri:    remote.BaseUri,
 			UriStr: remote.BaseUri.String(),
 			Fails:  0,
@@ -148,7 +160,11 @@ func (o *OD) handleCollect(results chan File, f *os.File, collectErrC chan error) {
 	// Wait for all jobs on remote to finish
 	o.Wait.Wait()
-	close(o.WCtx.in)
+
+	// Close queue
+	if err := o.WCtx.Queue.Close(); err != nil {
+		panic(err)
+	}

 	atomic.AddInt32(&numActiveTasks, -1)

 	// Log finish
@@ -198,51 +214,3 @@ func (t *Task) collect(results chan File, f *os.File) error {
 	return nil
 }
-
-func makeJobBuffer(c context.Context) (chan<- Job, <-chan Job) {
-	in := make(chan Job)
-	out := make(chan Job)
-	go bufferJobs(c, in, out)
-	return in, out
-}
-
-func bufferJobs(c context.Context, in chan Job, out chan Job) {
-	defer close(out)
-	var inQueue []Job
-	outCh := func() chan Job {
-		if len(inQueue) == 0 {
-			return nil
-		}
-		return out
-	}
-	for len(inQueue) > 0 || in != nil {
-		if len(inQueue) == 0 {
-			select {
-			case v, ok := <-in:
-				if !ok {
-					in = nil
-				} else {
-					atomic.AddInt64(&totalBuffered, 1)
-					inQueue = append(inQueue, v)
-				}
-			case <-c.Done():
-				return
-			}
-		} else {
-			select {
-			case v, ok := <-in:
-				if !ok {
-					in = nil
-				} else {
-					atomic.AddInt64(&totalBuffered, 1)
-					inQueue = append(inQueue, v)
-				}
-			case outCh() <- inQueue[0]:
-				atomic.AddInt64(&totalBuffered, -1)
-				inQueue = inQueue[1:]
-			case <-c.Done():
-				return
-			}
-		}
-	}
-}

server.go

@@ -11,6 +11,7 @@ import (
 	"net/url"
 	"os"
 	"strconv"
+	"time"
 )

 var serverClient = http.Client {
@@ -101,25 +102,37 @@ func uploadChunks(websiteId uint64, f *os.File) error {
 		multi.Close()

-		req, err := http.NewRequest(
-			http.MethodPost,
-			config.ServerUrl + "/task/upload",
-			&b)
-		req.Header.Set("content-type", multi.FormDataContentType())
-		if err != nil { return err }
-
-		res, err := serverClient.Do(req)
-		if err != nil { return err }
-		res.Body.Close()
-
-		if res.StatusCode != http.StatusOK {
-			return fmt.Errorf("failed to upload list part %d: %s",
-				iter, res.Status)
+		for retries := 0; retries < 10; retries++ {
+			if retries > 0 {
+				// Error occurred, retry upload
+				time.Sleep(5 * time.Second)
+			}
+
+			req, err := http.NewRequest(
+				http.MethodPost,
+				config.ServerUrl + "/task/upload",
+				&b)
+			req.Header.Set("content-type", multi.FormDataContentType())
+			if err != nil { continue }
+
+			res, err := serverClient.Do(req)
+			if err != nil { continue }
+			res.Body.Close()
+
+			if res.StatusCode != http.StatusOK {
+				logrus.WithField("status", res.Status).
+					WithField("part", iter).
+					Errorf("Upload failed")
+				continue
+			}
+
+			// Upload successful
+			break
 		}

 		logrus.WithField("id", websiteId).
 			WithField("part", iter).
-			Infof("Uploading files chunk")
+			Infof("Uploaded files chunk")
 	}
 	return nil
 }
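Commits ac8221b109 and d332f06659 turn the single upload attempt into a bounded retry loop: at most ten tries, with a flat five-second pause before each retry. The skeleton of that pattern as a hypothetical helper (the constants are the ones in the diff). One caveat a reviewer might note: the multipart body &b is a bytes.Buffer that the first attempt drains, so retried requests would appear to carry an empty body.

// withRetries runs attempt up to 10 times, sleeping 5 seconds before
// each retry, and returns the last error if every attempt fails.
func withRetries(attempt func() error) (err error) {
	for retries := 0; retries < 10; retries++ {
		if retries > 0 {
			time.Sleep(5 * time.Second) // flat backoff, as in the diff
		}
		if err = attempt(); err == nil {
			return nil
		}
	}
	return err
}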

stats.go

@@ -57,7 +57,7 @@ func Stats(c context.Context) {
 		runtime.ReadMemStats(&mem)
 		logrus.WithFields(logrus.Fields{
-			"queue_count": atomic.LoadInt64(&totalBuffered),
+			"queue_count": atomic.LoadInt64(&totalQueued),
 			"heap":    FormatByteCount(mem.Alloc),
 			"objects": mem.HeapObjects,
 			"num_gc":  mem.NumGC,

util.go

@@ -1,6 +1,9 @@
 package main

-import "fmt"
+import (
+	"fmt"
+	"sync"
+)

 // https://programming.guide/go/formatting-byte-size-to-human-readable-format.html
 func FormatByteCount(b uint64) string {
@@ -16,3 +19,20 @@ func FormatByteCount(b uint64) string {
 		return fmt.Sprintf("%.1f %ciB", float64(b)/float64(div), "KMGTPE"[exp])
 	}
 }
+
+type Hooks struct {
+	m sync.Mutex
+	l []func()
+}
+
+func (h *Hooks) Add(hook func()) {
+	h.m.Lock()
+	h.l = append(h.l, hook)
+	h.m.Unlock()
+}
+
+func (h *Hooks) Execute() {
+	for _, hook := range h.l {
+		hook()
+	}
+}
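Hooks is the registry behind exitHooks in main.go: config.go adds the log-flush callback, and the cli After handler runs them all. A hypothetical usage (the resource and file name are placeholders):

var cleanup Hooks

func hooksExample() {
	f, err := os.Create("example.tmp") // hypothetical resource
	if err != nil {
		panic(err)
	}
	cleanup.Add(func() { f.Close() }) // register teardown work

	// ... normal operation ...

	cleanup.Execute() // main.go runs this from the cli "After" handler
}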

worker.go

@@ -1,6 +1,7 @@
 package main

 import (
+	"github.com/beeker1121/goque"
 	"github.com/sirupsen/logrus"
 	"github.com/valyala/fasthttp"
 	"math"
@@ -14,24 +15,38 @@ import (
 var globalWait sync.WaitGroup

 type WorkerContext struct {
-	in  chan<- Job
-	out <-chan Job
+	OD    *OD
+	Queue *BufferedQueue
 	lastRateLimit time.Time
 	numRateLimits int
 }

-func (w WorkerContext) Worker(results chan<- File) {
-	for job := range w.out {
-		w.step(results, job)
+func (w *WorkerContext) Worker(results chan<- File) {
+	for {
+		job, err := w.Queue.Dequeue()
+		switch err {
+		case goque.ErrEmpty:
+			time.Sleep(500 * time.Millisecond)
+			continue
+
+		case goque.ErrDBClosed:
+			return
+
+		case nil:
+			w.step(results, job)
+
+		default:
+			panic(err)
+		}
 	}
 }

-func (w WorkerContext) step(results chan<- File, job Job) {
+func (w *WorkerContext) step(results chan<- File, job Job) {
 	defer w.finishJob(&job)

 	var f File

-	newJobs, err := DoJob(&job, &f)
+	newJobs, err := w.DoJob(&job, &f)
 	atomic.AddUint64(&totalStarted, 1)
 	if err == ErrKnown {
 		return
@@ -42,15 +57,11 @@ func (w WorkerContext) step(results chan<- File, job Job) {
 	if httpErr, ok := err.(*HttpError); ok {
 		switch httpErr.code {
-		case
-			fasthttp.StatusMovedPermanently,
-			fasthttp.StatusFound,
-			fasthttp.StatusUnauthorized,
-			fasthttp.StatusForbidden,
-			fasthttp.StatusNotFound:
-			return
 		case fasthttp.StatusTooManyRequests:
 			err = ErrRateLimit
+
+		default:
+			// Don't retry HTTP error codes
+			return
 		}
 	}
@@ -79,7 +90,7 @@ func (w WorkerContext) step(results chan<- File, job Job) {
 	}
 }

-func DoJob(job *Job, f *File) (newJobs []Job, err error) {
+func (w *WorkerContext) DoJob(job *Job, f *File) (newJobs []Job, err error) {
 	if len(job.Uri.Path) == 0 { return }
 	if job.Uri.Path[len(job.Uri.Path)-1] == '/' {
 		// Load directory
@@ -97,7 +108,7 @@ func DoJob(job *Job, f *File) (newJobs []Job, err error) {
 		hash := f.HashDir(links)

 		// Skip symlinked dirs
-		if job.OD.LoadOrStoreKey(&hash) {
+		if w.OD.LoadOrStoreKey(&hash) {
 			return nil, ErrKnown
 		}
@@ -118,7 +129,6 @@ func DoJob(job *Job, f *File) (newJobs []Job, err error) {
 			lastLink = uriStr

 			newJobs = append(newJobs, Job{
-				OD:     job.OD,
 				Uri:    link,
 				UriStr: uriStr,
 				Fails:  0,
@@ -143,13 +153,13 @@ func DoJob(job *Job, f *File) (newJobs []Job, err error) {
 			}
 			return nil, err
 		}
-		atomic.AddUint64(&job.OD.Result.FileCount, 1)
+		atomic.AddUint64(&w.OD.Result.FileCount, 1)
 	}
 	return
 }

-func (w WorkerContext) queueJob(job Job) {
-	job.OD.Wait.Add(1)
+func (w *WorkerContext) queueJob(job Job) {
+	w.OD.Wait.Add(1)

 	if w.numRateLimits > 0 {
 		if time.Since(w.lastRateLimit) > 5 * time.Second {
@@ -160,11 +170,13 @@ func (w WorkerContext) queueJob(job Job) {
 		}
 	}

-	w.in <- job
+	if err := w.Queue.Enqueue(&job); err != nil {
+		panic(err)
+	}
 }

-func (w WorkerContext) finishJob(job *Job) {
-	job.OD.Wait.Done()
+func (w *WorkerContext) finishJob(job *Job) {
+	w.OD.Wait.Done()
 }

 func isErrSilent(err error) bool {
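The receiver change from WorkerContext to *WorkerContext matters for queueJob, which mutates lastRateLimit and numRateLimits: with a value receiver each call would update a throwaway copy. A minimal illustration of the difference (hypothetical counter type, not from the repo):

package main

import "fmt"

type counter struct{ n int }

// value receiver: increments a copy, the original stays unchanged
func (c counter) incVal() { c.n++ }

// pointer receiver: increments the shared instance
func (c *counter) incPtr() { c.n++ }

func main() {
	var c counter
	c.incVal()
	fmt.Println(c.n) // 0
	c.incPtr()
	fmt.Println(c.n) // 1
}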