16 Commits

Author         SHA1        Message                                    Date
Richard Patel  f90bf94a44  Bump version to v1.1.1                     2018-11-27 22:11:57 +01:00
Richard Patel  e82768ff80  Wait time control in config                2018-11-27 22:11:57 +01:00
Richard Patel  b1bf59adef  Add The Eye DB to README.md                2018-11-27 17:40:12 +01:00
Richard Patel  a2df2972f4  Bump the upload retry interval up to 30s   2018-11-20 04:13:20 +01:00
Richard Patel  3fc8837dd7  Add output files to .gitignore             2018-11-20 03:51:42 +01:00
Richard Patel  f9a0d6bffe  Bump to v1.1.0                             2018-11-20 03:46:36 +01:00
Richard Patel  4dbe2aef2b  Add job buffer size parameter              2018-11-20 03:42:32 +01:00
Richard Patel  86ec78cae1  Add TCP timeout option                     2018-11-20 03:29:10 +01:00
Richard Patel  b846498030  Delete URL queues after crawling           2018-11-20 03:05:43 +01:00
Richard Patel  4f3140a39f  Fix queue_count in log                     2018-11-20 02:49:03 +01:00
Richard Patel  85d2aac9d4  Performance patch                          2018-11-20 02:33:50 +01:00
Richard Patel  b6c0a45900  Job queue disk offloading                  2018-11-20 02:03:10 +01:00
Richard Patel  d332f06659  Limit retries to 10                        2018-11-18 21:05:26 +01:00
Richard Patel  1625d6c888  Bump to v1.0.2                             2018-11-18 18:53:57 +01:00
Richard Patel  03a487f393  Fix crawl loop                             2018-11-18 18:45:06 +01:00
Richard Patel  ac8221b109  Retry /task/upload                         2018-11-18 18:33:26 +01:00
12 changed files with 309 additions and 104 deletions

.gitignore (vendored, 5 changes)

@@ -1,3 +1,6 @@
 /.idea/
 .DS_Store
-/od-database-crawler
+/od-database-crawler
+*.log
+/queue/
+/crawled/

README.md

@@ -5,3 +5,5 @@
 * Crawls HTTP open directories (standard Web Server Listings)
 * Gets name, path, size and modification time of all files
 * Lightweight and fast: __over 9000 requests per second__ on a standard laptop
+
+https://od-db.the-eye.eu/

config.go

@@ -20,12 +20,12 @@ var config struct {
 	Retries    int
 	Workers    int
 	UserAgent  string
-	Timeout    time.Duration
 	Tasks      int32
 	CrawlStats time.Duration
 	AllocStats time.Duration
 	Verbose    bool
 	PrintHTTP  bool
+	JobBufferSize int
 }

 const (
@@ -33,12 +33,19 @@ const (
 	ConfToken               = "server.token"
 	ConfServerTimeout       = "server.timeout"
 	ConfRecheck             = "server.recheck"
+	ConfCooldown            = "server.cooldown"
 	ConfChunkSize           = "server.upload_chunk"
+	ConfUploadRetries       = "server.upload_retries"
+	ConfUploadRetryInterval = "server.upload_retry_interval"
 	ConfTasks               = "crawl.tasks"
 	ConfRetries             = "crawl.retries"
 	ConfWorkers             = "crawl.connections"
 	ConfUserAgent           = "crawl.user-agent"
+	ConfDialTimeout         = "crawl.dial_timeout"
 	ConfTimeout             = "crawl.timeout"
+	ConfJobBufferSize       = "crawl.job_buffer"
 	ConfCrawlStats          = "output.crawl_stats"
 	ConfAllocStats          = "output.resource_stats"
 	ConfVerbose             = "output.verbose"
@@ -51,14 +58,19 @@ func prepareConfig() {
 	viper.SetDefault(ConfWorkers, 2)
 	viper.SetDefault(ConfTasks, 3)
 	viper.SetDefault(ConfUserAgent, "")
-	viper.SetDefault(ConfTimeout, 10 * time.Second)
+	viper.SetDefault(ConfDialTimeout, 10 * time.Second)
+	viper.SetDefault(ConfTimeout, 60 * time.Second)
+	viper.SetDefault(ConfJobBufferSize, 5000)
 	viper.SetDefault(ConfCrawlStats, 3 * time.Second)
 	viper.SetDefault(ConfAllocStats, 0)
 	viper.SetDefault(ConfVerbose, false)
 	viper.SetDefault(ConfPrintHTTP, false)
 	viper.SetDefault(ConfLogFile, "")
 	viper.SetDefault(ConfRecheck, 3 * time.Second)
+	viper.SetDefault(ConfCooldown, 30 * time.Second)
 	viper.SetDefault(ConfChunkSize, "1 MB")
+	viper.SetDefault(ConfUploadRetries, 10)
+	viper.SetDefault(ConfUploadRetryInterval, 30 * time.Second)
 }

 func readConfig() {
@@ -107,7 +119,11 @@ func readConfig() {
 	config.UserAgent = viper.GetString(ConfUserAgent)

-	config.Timeout = viper.GetDuration(ConfTimeout)
+	setDialTimeout(viper.GetDuration(ConfDialTimeout))
+	setTimeout(viper.GetDuration(ConfTimeout))
+	config.JobBufferSize = viper.GetInt(ConfJobBufferSize)

 	config.CrawlStats = viper.GetDuration(ConfCrawlStats)

config.yml

@@ -15,10 +15,17 @@ server:
   # between /task/get requests to the server.
   recheck: 1s
+  # Time to wait after receiving an error
+  # from the server. Doesn't apply to uploads.
+  cooldown: 30s
   # Upload chunk size
   # If the value is too high, the upload fails.
   upload_chunk: 1 MB
+  upload_retries: 10
+  upload_retry_interval: 30s

 # Log output settings
 output:
   # Crawl statistics
@@ -46,15 +53,32 @@ crawl:
   # Please be careful with this setting!
   # The crawler fires fast and more than
   # ten connections can overwhelm a server.
-  connections: 10
+  connections: 4
   # How often to retry getting data
   # from the site before giving up
   retries: 5
+  # Time before discarding a failed connection attempt
+  dial_timeout: 10s
   # Time before discarding a network request
-  timeout: 10s
+  timeout: 30s
   # Crawler User-Agent
   # If empty, no User-Agent header is sent.
   user-agent: "Mozilla/5.0 (X11; od-database-crawler) Gecko/20100101 Firefox/52.0"
+  # Job buffer size (per task)
+  # Higher values cause less disk writes
+  # but require more memory.
+  #
+  # The job queue contains all URLs
+  # that should be crawled next.
+  # As it grows very large over time,
+  # it's kept mainly on disk.
+  # This sets how many jobs are kept
+  # in memory.
+  # A negative value will cause all jobs
+  # to be stored in memory. (Don't do this)
+  job_buffer: 5000
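To make the setting concrete: with job_buffer: 5000, only the first 5000 pending URLs per task sit in RAM; everything beyond that is spilled to the on-disk queue. A minimal sketch of that spill rule (names are illustrative, not the crawler's identifiers; the real logic lives in queue.go below):

package main

import "fmt"

// enqueue keeps up to bufSize jobs in memory and spills the rest to disk.
// A negative bufSize keeps everything in memory (the mode the comment warns against).
func enqueue(buf *[]string, bufSize int, job string, spill func(string)) {
	if len(*buf) < bufSize || bufSize < 0 {
		*buf = append(*buf, job) // fast path: in-memory buffer
		return
	}
	spill(job) // overflow goes to the on-disk queue
}

func main() {
	var buf []string
	spill := func(j string) { fmt.Println("to disk:", j) }
	for _, j := range []string{"a/", "b/", "c/"} {
		enqueue(&buf, 2, j, spill)
	}
	fmt.Println("in memory:", buf) // in memory: [a/ b/]
}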

crawl.go

@@ -8,6 +8,7 @@ import (
 	"github.com/valyala/fasthttp"
 	"golang.org/x/crypto/blake2b"
 	"golang.org/x/net/html"
+	"net"
 	"path"
 	"strconv"
 	"strings"

@@ -20,6 +21,17 @@ var client = fasthttp.Client {
 	},
 }

+func setDialTimeout(d time.Duration) {
+	client.Dial = func(addr string) (net.Conn, error) {
+		return fasthttp.DialTimeout(addr, d)
+	}
+}
+
+func setTimeout(d time.Duration) {
+	client.ReadTimeout = d
+	client.WriteTimeout = d / 2
+}
+
 func GetDir(j *Job, f *File) (links []fasturl.URL, err error) {
 	f.IsDir = true
 	f.Name = path.Base(j.Uri.Path)

@@ -33,7 +45,7 @@ func GetDir(j *Job, f *File) (links []fasturl.URL, err error) {
 	res := fasthttp.AcquireResponse()
 	defer fasthttp.ReleaseResponse(res)

-	err = client.DoTimeout(req, res, config.Timeout)
+	err = client.Do(req, res)
 	fasthttp.ReleaseRequest(req)

 	if err != nil {
@@ -50,6 +62,8 @@ func GetDir(j *Job, f *File) (links []fasturl.URL, err error) {
 	var linkHref string
 	for {
+		err = nil
+
 		tokenType := doc.Next()
 		if tokenType == html.ErrorToken {
 			break

@@ -80,16 +94,16 @@ func GetDir(j *Job, f *File) (links []fasturl.URL, err error) {
 		linkHref = ""

 		if strings.LastIndexByte(href, '?') != -1 {
-			goto nextToken
+			continue
 		}

 		switch href {
 		case "", " ", ".", "..", "/":
-			goto nextToken
+			continue
 		}

 		if strings.Contains(href, "../") {
-			goto nextToken
+			continue
 		}

 		var link fasturl.URL

@@ -108,8 +122,6 @@ func GetDir(j *Job, f *File) (links []fasturl.URL, err error) {
 			links = append(links, link)
 		}
 	}
-
-	nextToken:
 	}

 	return
@@ -132,7 +144,7 @@ func GetFile(u fasturl.URL, f *File) (err error) {
 	res.SkipBody = true
 	defer fasthttp.ReleaseResponse(res)

-	err = client.DoTimeout(req, res, config.Timeout)
+	err = client.Do(req, res)
 	fasthttp.ReleaseRequest(req)

 	if err != nil {
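The switch from DoTimeout to plain Do in both GetDir and GetFile moves deadline handling from each call site into the client itself: the single per-request deadline is replaced by the dial, read, and write limits installed by setDialTimeout and setTimeout above. A standalone sketch of the same client setup (URL and durations are illustrative):

package main

import (
	"fmt"
	"net"
	"time"

	"github.com/valyala/fasthttp"
)

func main() {
	client := &fasthttp.Client{
		// Bound connection establishment separately from request I/O.
		Dial: func(addr string) (net.Conn, error) {
			return fasthttp.DialTimeout(addr, 10*time.Second)
		},
		ReadTimeout:  60 * time.Second,
		WriteTimeout: 30 * time.Second,
	}

	status, body, err := client.Get(nil, "http://example.com/")
	if err != nil {
		fmt.Println("request failed:", err)
		return
	}
	fmt.Println(status, len(body)) // every call inherits the client's limits
}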

main.go (33 changes)

@@ -3,6 +3,7 @@ package main

 import (
 	"context"
 	"github.com/sirupsen/logrus"
+	"github.com/spf13/viper"
 	"github.com/terorie/od-database-crawler/fasturl"
 	"github.com/urfave/cli"
 	"os"

@@ -11,14 +12,16 @@ import (
 	"time"
 )

+var configFile string
+
 var app = cli.App {
 	Name: "od-database-crawler",
 	Usage: "OD-Database Go crawler",
-	Version: "1.0.1",
+	Version: "1.1.1",
 	BashComplete: cli.DefaultAppComplete,
 	Writer: os.Stdout,
 	Action: cmdBase,
-	Commands: []cli.Command{
+	Commands: []cli.Command {
 		{
 			Name: "crawl",
 			Usage: "Crawl a list of URLs",

@@ -26,6 +29,19 @@ var app = cli.App {
 			Action: cmdCrawler,
 		},
 	},
+	Flags: []cli.Flag {
+		cli.StringFlag {
+			Name: "config",
+			EnvVar: "CONFIG",
+			Destination: &configFile,
+		},
+	},
+	Before: func(i *cli.Context) error {
+		if configFile != "" {
+			viper.SetConfigFile(configFile)
+		}
+		return nil
+	},
 	After: func(i *cli.Context) error {
 		exitHooks.Execute()
 		return nil
@@ -39,10 +55,11 @@ func init() {
 }

 func main() {
-	err := os.MkdirAll("crawled", 0755)
-	if err != nil {
-		panic(err)
-	}
+	if err := os.MkdirAll("crawled", 0755);
+		err != nil { panic(err) }
+
+	if err := os.MkdirAll("queue", 0755);
+		err != nil { panic(err) }

 	readConfig()
 	app.Run(os.Args)
@@ -67,7 +84,7 @@ func cmdBase(_ *cli.Context) error {
 		if err != nil {
 			logrus.WithError(err).
 				Error("Failed to get new task")
-			time.Sleep(30 * time.Second)
+			time.Sleep(viper.GetDuration(ConfCooldown))
 			continue
 		}
 		if t == nil {

@@ -94,7 +111,7 @@ func cmdBase(_ *cli.Context) error {
 		} else if err != nil {
 			logrus.WithError(err).
 				Error("Failed to get new task")
-			time.Sleep(30 * time.Second)
+			time.Sleep(viper.GetDuration(ConfCooldown))
 			continue
 		}
 		ScheduleTask(inRemotes, t, &baseUri)
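Two effects of this change: the config file location can now be overridden at startup via the new --config flag or the CONFIG environment variable (when both are empty, the Before hook leaves viper's existing configuration lookup untouched), and the 30-second error backoff in cmdBase is no longer hard-coded but read from the new server.cooldown setting.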

model.go

@@ -23,7 +23,6 @@ type TaskResult struct {
 }

 type Job struct {
-	OD     *OD
 	Uri    fasturl.URL
 	UriStr string
 	Fails  int

@@ -57,3 +56,8 @@ func (o *OD) LoadOrStoreKey(k *redblackhash.Key) (exists bool) {
 	o.Scanned.Put(k)
 	return false
 }
+
+type errorString string
+
+func (e errorString) Error() string {
+	return string(e)
+}
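The errorString type exists so a job's LastError can round-trip through gob serialization in queue.go below: the error is flattened to a plain string on disk and rehydrated as an error value on load. Dropping the OD pointer from Job serves the same goal, since queued jobs must now be serializable; the worker context carries the OD reference instead.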

queue.go (new file, 129 lines)

@@ -0,0 +1,129 @@
package main

import (
	"github.com/beeker1121/goque"
	"os"
	"sync"
	"sync/atomic"
)

type BufferedQueue struct {
	dataDir string
	q       *goque.Queue
	buf     []Job
	m       sync.Mutex
}

func OpenQueue(dataDir string) (bq *BufferedQueue, err error) {
	bq = new(BufferedQueue)
	if config.JobBufferSize < 0 {
		return
	}
	bq.dataDir = dataDir
	bq.q, err = goque.OpenQueue(dataDir)
	if err != nil { return nil, err }
	return
}

func (q *BufferedQueue) Enqueue(job *Job) error {
	atomic.AddInt64(&totalQueued, 1)
	if q.directEnqueue(job) {
		return nil
	}
	var gob JobGob
	gob.ToGob(job)
	_, err := q.q.EnqueueObject(gob)
	return err
}

func (q *BufferedQueue) Dequeue() (job Job, err error) {
	if q.directDequeue(&job) {
		atomic.AddInt64(&totalQueued, -1)
		return job, nil
	}

	if config.JobBufferSize < 0 {
		err = goque.ErrEmpty
		return
	}

	var item *goque.Item
	item, err = q.q.Dequeue()
	if err != nil { return }
	atomic.AddInt64(&totalQueued, -1)

	var gob JobGob
	err = item.ToObject(&gob)
	if err != nil { return }
	gob.FromGob(&job)

	return
}

func (q *BufferedQueue) directEnqueue(job *Job) bool {
	q.m.Lock()
	defer q.m.Unlock()

	bs := config.JobBufferSize
	if len(q.buf) < bs || bs < 0 {
		q.buf = append(q.buf, *job)
		return true
	} else {
		return false
	}
}

func (q *BufferedQueue) directDequeue(job *Job) bool {
	q.m.Lock()
	defer q.m.Unlock()

	if len(q.buf) > 0 {
		*job = q.buf[0]
		q.buf = q.buf[1:]
		return true
	} else {
		return false
	}
}

// Always returns nil (But implements io.Closer)
func (q *BufferedQueue) Close() error {
	if config.JobBufferSize < 0 {
		return nil
	}

	// Close ignoring errors
	q.q.Close()

	// Delete files
	if err := os.RemoveAll(q.dataDir);
		err != nil { panic(err) }

	return nil
}

type JobGob struct {
	Uri       string
	Fails     int
	LastError string
}

func (g *JobGob) ToGob(j *Job) {
	g.Uri = j.UriStr
	g.Fails = j.Fails
	if j.LastError != nil {
		g.LastError = j.LastError.Error()
	}
}

func (g *JobGob) FromGob(j *Job) {
	if err := j.Uri.Parse(g.Uri);
		err != nil { panic(err) }
	j.UriStr = g.Uri
	j.Fails = g.Fails
	if g.LastError != "" {
		j.LastError = errorString(g.LastError)
	}
}
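A hedged usage sketch of this API, illustrative only: it assumes it runs inside this package (where config.JobBufferSize, totalQueued, and Job are in scope), and the path and URL are made up:

// exampleDrain shows the intended call pattern: open a per-task queue,
// enqueue jobs, and poll until both the memory buffer and disk are empty.
func exampleDrain() error {
	q, err := OpenQueue("queue/123")
	if err != nil {
		return err
	}
	defer q.Close() // also deletes queue/123, per Close above

	if err := q.Enqueue(&Job{UriStr: "http://example.com/"}); err != nil {
		return err
	}

	for {
		job, err := q.Dequeue()
		if err == goque.ErrEmpty {
			return nil // nothing left in memory or on disk
		} else if err != nil {
			return err
		}
		_ = job // crawl it
	}
}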

scheduler.go

@@ -16,7 +16,7 @@ import (
 var activeTasksLock sync.Mutex
 var activeTasks = make(map[uint64]bool)
 var numActiveTasks int32
-var totalBuffered int64
+var totalQueued int64

 func Schedule(c context.Context, remotes <-chan *OD) {
 	go Stats(c)

@@ -28,8 +28,21 @@ func Schedule(c context.Context, remotes <-chan *OD) {
 		// Collect results
 		results := make(chan File)

 		remote.WCtx.OD = remote

+		// Get queue path
+		queuePath := path.Join("queue", fmt.Sprintf("%d", remote.Task.WebsiteId))
+
+		// Delete existing queue
+		if err := os.RemoveAll(queuePath);
+			err != nil { panic(err) }
+
+		// Start new queue
+		var err error
+		remote.WCtx.Queue, err = OpenQueue(queuePath)
+		if err != nil { panic(err) }
+
 		// Spawn workers
-		remote.WCtx.in, remote.WCtx.out = makeJobBuffer(c)
 		for i := 0; i < config.Workers; i++ {
 			go remote.WCtx.Worker(results)
 		}
@@ -37,7 +50,6 @@ func Schedule(c context.Context, remotes <-chan *OD) {
 		// Enqueue initial job
 		atomic.AddInt32(&numActiveTasks, 1)
 		remote.WCtx.queueJob(Job{
-			OD:     remote,
 			Uri:    remote.BaseUri,
 			UriStr: remote.BaseUri.String(),
 			Fails:  0,
@@ -148,7 +160,11 @@ func (o *OD) handleCollect(results chan File, f *os.File, collectErrC chan error) {
 	// Wait for all jobs on remote to finish
 	o.Wait.Wait()
-	close(o.WCtx.in)
+
+	// Close queue
+	if err := o.WCtx.Queue.Close(); err != nil {
+		panic(err)
+	}
 	atomic.AddInt32(&numActiveTasks, -1)

 	// Log finish
@@ -198,51 +214,3 @@ func (t *Task) collect(results chan File, f *os.File) error {

 	return nil
 }
-
-func makeJobBuffer(c context.Context) (chan<- Job, <-chan Job) {
-	in := make(chan Job)
-	out := make(chan Job)
-	go bufferJobs(c, in, out)
-	return in, out
-}
-
-func bufferJobs(c context.Context, in chan Job, out chan Job) {
-	defer close(out)
-	var inQueue []Job
-	outCh := func() chan Job {
-		if len(inQueue) == 0 {
-			return nil
-		}
-		return out
-	}
-	for len(inQueue) > 0 || in != nil {
-		if len(inQueue) == 0 {
-			select {
-			case v, ok := <-in:
-				if !ok {
-					in = nil
-				} else {
-					atomic.AddInt64(&totalBuffered, 1)
-					inQueue = append(inQueue, v)
-				}
-			case <-c.Done():
-				return
-			}
-		} else {
-			select {
-			case v, ok := <-in:
-				if !ok {
-					in = nil
-				} else {
-					atomic.AddInt64(&totalBuffered, 1)
-					inQueue = append(inQueue, v)
-				}
-			case outCh() <- inQueue[0]:
-				atomic.AddInt64(&totalBuffered, -1)
-				inQueue = inQueue[1:]
-			case <-c.Done():
-				return
-			}
-		}
-	}
-}
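The deleted makeJobBuffer/bufferJobs pair implemented an unbounded, purely in-memory elastic buffer between queueJob and the workers, so totalBuffered could grow without limit on large sites. It is replaced by the bounded BufferedQueue from queue.go, which keeps at most job_buffer jobs in RAM, spills the rest to a per-task goque (LevelDB-backed) directory under queue/, and deletes that directory when the task's queue is closed.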

server.go

@@ -5,12 +5,14 @@ import (
 	"encoding/json"
 	"fmt"
 	"github.com/sirupsen/logrus"
+	"github.com/spf13/viper"
 	"io"
 	"mime/multipart"
 	"net/http"
 	"net/url"
 	"os"
 	"strconv"
+	"time"
 )

 var serverClient = http.Client {
@@ -101,25 +103,37 @@ func uploadChunks(websiteId uint64, f *os.File) error {
 		multi.Close()

-		req, err := http.NewRequest(
-			http.MethodPost,
-			config.ServerUrl + "/task/upload",
-			&b)
-		req.Header.Set("content-type", multi.FormDataContentType())
-		if err != nil { return err }
-
-		res, err := serverClient.Do(req)
-		if err != nil { return err }
-		res.Body.Close()
-
-		if res.StatusCode != http.StatusOK {
-			return fmt.Errorf("failed to upload list part %d: %s",
-				iter, res.Status)
+		for retries := 0; retries < viper.GetInt(ConfUploadRetries); retries++ {
+			if retries > 0 {
+				// Error occurred, retry upload
+				time.Sleep(viper.GetDuration(ConfUploadRetryInterval))
+			}
+
+			req, err := http.NewRequest(
+				http.MethodPost,
+				config.ServerUrl + "/task/upload",
+				&b)
+			req.Header.Set("content-type", multi.FormDataContentType())
+			if err != nil { continue }
+
+			res, err := serverClient.Do(req)
+			if err != nil { continue }
+			res.Body.Close()
+
+			if res.StatusCode != http.StatusOK {
+				logrus.WithField("status", res.Status).
+					WithField("part", iter).
+					Errorf("Upload failed")
+				continue
+			}
+
+			// Upload successful
+			break
 		}

 		logrus.WithField("id", websiteId).
 			WithField("part", iter).
-			Infof("Uploading files chunk")
+			Infof("Uploaded files chunk")
 	}
 	return nil
 }
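One general subtlety when retrying POST requests: a body reader such as a bytes.Buffer is consumed by the first attempt, so a robust retry loop has to rebuild the request body each time around. A standalone sketch of that pattern (endpoint, payload, and limits are illustrative, not the crawler's values):

package main

import (
	"bytes"
	"fmt"
	"net/http"
	"time"
)

// postWithRetry rebuilds the request body on every attempt, since the
// previous attempt may have drained the reader.
func postWithRetry(url string, payload []byte, retries int, wait time.Duration) error {
	var lastErr error
	for i := 0; i < retries; i++ {
		if i > 0 {
			time.Sleep(wait)
		}
		req, err := http.NewRequest(http.MethodPost, url, bytes.NewReader(payload))
		if err != nil {
			return err // a malformed request won't improve with retries
		}
		res, err := http.DefaultClient.Do(req)
		if err != nil {
			lastErr = err
			continue
		}
		res.Body.Close()
		if res.StatusCode != http.StatusOK {
			lastErr = fmt.Errorf("status %s", res.Status)
			continue
		}
		return nil
	}
	return lastErr
}

func main() {
	err := postWithRetry("http://localhost:8080/task/upload", []byte("data"), 3, time.Second)
	fmt.Println(err)
}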

stats.go

@@ -57,7 +57,7 @@ func Stats(c context.Context) {
 		runtime.ReadMemStats(&mem)
 		logrus.WithFields(logrus.Fields{
-			"queue_count": atomic.LoadInt64(&totalBuffered),
+			"queue_count": atomic.LoadInt64(&totalQueued),
 			"heap":        FormatByteCount(mem.Alloc),
 			"objects":     mem.HeapObjects,
 			"num_gc":      mem.NumGC,

worker.go

@@ -1,6 +1,7 @@
 package main

 import (
+	"github.com/beeker1121/goque"
 	"github.com/sirupsen/logrus"
 	"github.com/valyala/fasthttp"
 	"math"

@@ -14,24 +15,38 @@
 var globalWait sync.WaitGroup

 type WorkerContext struct {
-	in  chan<- Job
-	out <-chan Job
 	OD            *OD
+	Queue         *BufferedQueue
 	lastRateLimit time.Time
 	numRateLimits int
 }

-func (w WorkerContext) Worker(results chan<- File) {
-	for job := range w.out {
-		w.step(results, job)
+func (w *WorkerContext) Worker(results chan<- File) {
+	for {
+		job, err := w.Queue.Dequeue()
+		switch err {
+		case goque.ErrEmpty:
+			time.Sleep(500 * time.Millisecond)
+			continue
+
+		case goque.ErrDBClosed:
+			return
+
+		case nil:
+			w.step(results, job)
+
+		default:
+			panic(err)
+		}
 	}
 }
-func (w WorkerContext) step(results chan<- File, job Job) {
+func (w *WorkerContext) step(results chan<- File, job Job) {
 	defer w.finishJob(&job)

 	var f File

-	newJobs, err := DoJob(&job, &f)
+	newJobs, err := w.DoJob(&job, &f)
 	atomic.AddUint64(&totalStarted, 1)
 	if err == ErrKnown {
 		return
@@ -75,7 +90,7 @@ func (w *WorkerContext) step(results chan<- File, job Job) {
 	}
 }

-func DoJob(job *Job, f *File) (newJobs []Job, err error) {
+func (w *WorkerContext) DoJob(job *Job, f *File) (newJobs []Job, err error) {
 	if len(job.Uri.Path) == 0 { return }
 	if job.Uri.Path[len(job.Uri.Path)-1] == '/' {
 		// Load directory

@@ -93,7 +108,7 @@ func (w *WorkerContext) DoJob(job *Job, f *File) (newJobs []Job, err error) {
 		hash := f.HashDir(links)

 		// Skip symlinked dirs
-		if job.OD.LoadOrStoreKey(&hash) {
+		if w.OD.LoadOrStoreKey(&hash) {
 			return nil, ErrKnown
 		}
@@ -114,7 +129,6 @@ func (w *WorkerContext) DoJob(job *Job, f *File) (newJobs []Job, err error) {
 			lastLink = uriStr
 			newJobs = append(newJobs, Job{
-				OD:     job.OD,
 				Uri:    link,
 				UriStr: uriStr,
 				Fails:  0,
@@ -139,13 +153,13 @@ func (w *WorkerContext) DoJob(job *Job, f *File) (newJobs []Job, err error) {
 			}
 			return nil, err
 		}
-		atomic.AddUint64(&job.OD.Result.FileCount, 1)
+		atomic.AddUint64(&w.OD.Result.FileCount, 1)
 	}
 	return
 }

-func (w WorkerContext) queueJob(job Job) {
-	job.OD.Wait.Add(1)
+func (w *WorkerContext) queueJob(job Job) {
+	w.OD.Wait.Add(1)

 	if w.numRateLimits > 0 {
 		if time.Since(w.lastRateLimit) > 5 * time.Second {

@@ -156,11 +170,13 @@ func (w *WorkerContext) queueJob(job Job) {
 		}
 	}

-	w.in <- job
+	if err := w.Queue.Enqueue(&job); err != nil {
+		panic(err)
+	}
 }

-func (w WorkerContext) finishJob(job *Job) {
-	job.OD.Wait.Done()
+func (w *WorkerContext) finishJob(job *Job) {
+	w.OD.Wait.Done()
 }

 func isErrSilent(err error) bool {
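The receiver change from WorkerContext to *WorkerContext is load-bearing, not cosmetic: with a value receiver every call mutates a copy, so state like numRateLimits and lastRateLimit would never stick between calls. A minimal illustration with a hypothetical type:

package main

import "fmt"

type counter struct{ n int }

func (c counter) incByValue()    { c.n++ } // mutates a copy; change is lost
func (c *counter) incByPointer() { c.n++ } // mutates the shared value

func main() {
	var c counter
	c.incByValue()
	c.incByPointer()
	fmt.Println(c.n) // prints 1, not 2
}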