Timeout and results saving

This commit is contained in:
Richard Patel 2018-11-15 20:14:31 +01:00
parent a268c6dbcf
commit ffde1a9e5d
No known key found for this signature in database
GPG Key ID: C268B2BBDA2ABECB
7 changed files with 86 additions and 38 deletions

View File

@@ -13,6 +13,7 @@ var config struct {
Token string Token string
Retries int Retries int
Workers int Workers int
Timeout time.Duration
Tasks int32 Tasks int32
CrawlStats time.Duration CrawlStats time.Duration
AllocStats time.Duration AllocStats time.Duration
@@ -25,6 +26,7 @@ const (
ConfTasks = "crawl.tasks" ConfTasks = "crawl.tasks"
ConfRetries = "crawl.retries" ConfRetries = "crawl.retries"
ConfWorkers = "crawl.connections" ConfWorkers = "crawl.connections"
ConfTimeout = "crawl.timeout"
ConfCrawlStats = "output.crawl_stats" ConfCrawlStats = "output.crawl_stats"
ConfAllocStats = "output.resource_stats" ConfAllocStats = "output.resource_stats"
ConfVerbose = "output.verbose" ConfVerbose = "output.verbose"
@@ -34,6 +36,7 @@ func prepareConfig() {
viper.SetDefault(ConfRetries, 5) viper.SetDefault(ConfRetries, 5)
viper.SetDefault(ConfWorkers, 2) viper.SetDefault(ConfWorkers, 2)
viper.SetDefault(ConfTasks, 3) viper.SetDefault(ConfTasks, 3)
viper.SetDefault(ConfTimeout, 10 * time.Second)
viper.SetDefault(ConfCrawlStats, 3 * time.Second) viper.SetDefault(ConfCrawlStats, 3 * time.Second)
viper.SetDefault(ConfAllocStats, 0) viper.SetDefault(ConfAllocStats, 0)
viper.SetDefault(ConfVerbose, false) viper.SetDefault(ConfVerbose, false)
@@ -73,6 +76,8 @@ func readConfig() {
configOOB(ConfTasks, int(config.Tasks)) configOOB(ConfTasks, int(config.Tasks))
} }
config.Timeout = viper.GetDuration(ConfTimeout)
config.CrawlStats = viper.GetDuration(ConfCrawlStats) config.CrawlStats = viper.GetDuration(ConfCrawlStats)
config.AllocStats = viper.GetDuration(ConfAllocStats) config.AllocStats = viper.GetDuration(ConfAllocStats)

View File

@@ -24,3 +24,5 @@ crawl:
# How often to retry getting data # How often to retry getting data
# from the site before giving up # from the site before giving up
retries: 5 retries: 5
# Time before discarding a network request
timeout: 10s

View File

@@ -3,7 +3,6 @@ package main
import ( import (
"bytes" "bytes"
"fmt" "fmt"
"github.com/sirupsen/logrus"
"github.com/terorie/oddb-go/ds/redblackhash" "github.com/terorie/oddb-go/ds/redblackhash"
"github.com/terorie/oddb-go/fasturl" "github.com/terorie/oddb-go/fasturl"
"github.com/valyala/fasthttp" "github.com/valyala/fasthttp"
@@ -28,13 +27,10 @@ func GetDir(j *Job, f *File) (links []fasturl.URL, err error) {
res := fasthttp.AcquireResponse() res := fasthttp.AcquireResponse()
defer fasthttp.ReleaseResponse(res) defer fasthttp.ReleaseResponse(res)
err = client.Do(req, res) err = client.DoTimeout(req, res, config.Timeout)
fasthttp.ReleaseRequest(req) fasthttp.ReleaseRequest(req)
if err != nil { if err != nil { return }
logrus.Error(err)
return
}
err = checkStatusCode(res.StatusCode()) err = checkStatusCode(res.StatusCode())
if err != nil { return } if err != nil { return }
@@ -129,7 +125,7 @@ func GetFile(u fasturl.URL, f *File) (err error) {
res.SkipBody = true res.SkipBody = true
defer fasthttp.ReleaseResponse(res) defer fasthttp.ReleaseResponse(res)
err = client.Do(req, res) err = client.DoTimeout(req, res, config.Timeout)
fasthttp.ReleaseRequest(req) fasthttp.ReleaseRequest(req)
if err != nil { return } if err != nil { return }

View File

@@ -61,7 +61,13 @@ func cmdCrawler(clic *cli.Context) error {
u.Path += "/" u.Path += "/"
} }
if err != nil { return err } if err != nil { return err }
remotes[i] = &OD{ BaseUri: u } remotes[i] = &OD {
Task: &Task{
WebsiteId: 0,
Url: u.String(),
},
BaseUri: u,
}
} }
c := context.Background() c := context.Background()

View File

@@ -16,9 +16,9 @@ type Job struct {
} }
type OD struct { type OD struct {
Task *Task
Wait sync.WaitGroup Wait sync.WaitGroup
BaseUri fasturl.URL BaseUri fasturl.URL
Files []File
WCtx WorkerContext WCtx WorkerContext
Scanned redblackhash.Tree Scanned redblackhash.Tree

View File

@@ -2,7 +2,11 @@ package main
import ( import (
"context" "context"
"encoding/json"
"fmt"
"github.com/sirupsen/logrus" "github.com/sirupsen/logrus"
"os"
"path"
"sync/atomic" "sync/atomic"
) )
@@ -12,37 +16,36 @@ var totalBuffered int64
func Schedule(c context.Context, remotes <-chan *OD) { func Schedule(c context.Context, remotes <-chan *OD) {
go Stats(c) go Stats(c)
for { for remote := range remotes {
select { logrus.WithField("url", remote.BaseUri.String()).
case <-c.Done(): Info("Starting crawler")
return
case remote := <-remotes: // Collect results
logrus.WithField("url", remote.BaseUri.String()). results := make(chan File)
Info("Starting crawler")
// Spawn workers // Spawn workers
remote.WCtx.in, remote.WCtx.out = makeJobBuffer(c) remote.WCtx.in, remote.WCtx.out = makeJobBuffer(c)
for i := 0; i < config.Workers; i++ { for i := 0; i < config.Workers; i++ {
go remote.WCtx.Worker() go remote.WCtx.Worker(results)
}
// Enqueue initial job
atomic.AddInt32(&activeTasks, 1)
remote.WCtx.queueJob(Job{
OD: remote,
Uri: remote.BaseUri,
UriStr: remote.BaseUri.String(),
Fails: 0,
})
// Upload result when ready
go remote.Watch()
} }
// Enqueue initial job
atomic.AddInt32(&activeTasks, 1)
remote.WCtx.queueJob(Job{
OD: remote,
Uri: remote.BaseUri,
UriStr: remote.BaseUri.String(),
Fails: 0,
})
// Upload result when ready
go remote.Watch(results)
} }
} }
func (r *OD) Watch() { func (r *OD) Watch(results chan File) {
go r.Task.Collect(results)
// Wait for all jobs on remote to finish // Wait for all jobs on remote to finish
r.Wait.Wait() r.Wait.Wait()
close(r.WCtx.in) close(r.WCtx.in)
@@ -52,6 +55,40 @@ func (r *OD) Watch() {
Info("Crawler finished") Info("Crawler finished")
globalWait.Done() globalWait.Done()
close(results)
}
func (t *Task) Collect(results chan File) {
err := t.collect(results)
if err != nil {
logrus.WithError(err).
Error("Failed saving crawl results")
}
}
func (t *Task) collect(results chan File) error {
err := os.MkdirAll("crawled", 0755)
if err != nil { return err }
f, err := os.OpenFile(
path.Join("crawled", fmt.Sprintf("%d.json", t.WebsiteId)),
os.O_CREATE | os.O_WRONLY | os.O_TRUNC,
0755,
)
if err != nil { return err }
defer f.Close()
for result := range results {
resJson, err := json.Marshal(result)
if err != nil { panic(err) }
_, err = f.Write(resJson)
if err != nil { return err }
_, err = f.Write([]byte{'\n'})
if err != nil { return err }
}
return nil
} }
func makeJobBuffer(c context.Context) (chan<- Job, <-chan Job) { func makeJobBuffer(c context.Context) (chan<- Job, <-chan Job) {

View File

@@ -19,13 +19,13 @@ type WorkerContext struct {
numRateLimits int numRateLimits int
} }
func (w WorkerContext) Worker() { func (w WorkerContext) Worker(results chan<- File) {
for job := range w.out { for job := range w.out {
w.step(job) w.step(results, job)
} }
} }
func (w WorkerContext) step(job Job) { func (w WorkerContext) step(results chan<- File, job Job) {
defer w.finishJob(&job) defer w.finishJob(&job)
var f File var f File
@@ -64,7 +64,9 @@ func (w WorkerContext) step(job Job) {
w.queueJob(job) w.queueJob(job)
} }
job.OD.Files = append(job.OD.Files, f) if !f.IsDir {
results <- f
}
} }
func DoJob(job *Job, f *File) (newJobs []Job, err error) { func DoJob(job *Job, f *File) (newJobs []Job, err error) {