Mirror of https://github.com/terorie/od-database-crawler.git (synced 2025-12-13 23:29:02 +00:00)
Compare commits: rip...fasthttpur (18 commits)
| SHA1 |
|---|
| 8f6f8fd17f |
| 3c39f0d621 |
| 50952791c5 |
| 30bf98ad34 |
| ccaf758e90 |
| f668365edb |
| 1db8ff43bb |
| 82234f949e |
| 084b3a5903 |
| ac0b8d2d0b |
| ffde1a9e5d |
| a268c6dbcf |
| 4c071171eb |
| 9c8174dd8d |
| 93272e1da1 |
| 0344a120ff |
| 6e6afd771e |
| a8c27b2d21 |
BIN .github/stress.png (vendored, new file, 369 KiB; binary file not shown)
README.md (12 changed lines)
@@ -1,2 +1,12 @@
# oddb Go crawler
# oddb Go crawler 🚀
> by terorie 2018 :P

* Crawls HTTP open directories (standard Web Server Listings)
* Gets name, path, size and modification time of all files
* Soon: Will work as a crawler for [OD-Database](https://github.com/simon987/od-database)!

Stress test crawling [pandoradir](https://github.com/terorie/pandoradir)
on an average laptop (~10K requests per second, 4 connections):



Memory usage is being optimized :P
config.go

@@ -13,6 +13,7 @@ var config struct {
Token string
Retries int
Workers int
Timeout time.Duration
Tasks int32
CrawlStats time.Duration
AllocStats time.Duration
@@ -25,6 +26,7 @@ const (
ConfTasks = "crawl.tasks"
ConfRetries = "crawl.retries"
ConfWorkers = "crawl.connections"
ConfTimeout = "crawl.timeout"
ConfCrawlStats = "output.crawl_stats"
ConfAllocStats = "output.resource_stats"
ConfVerbose = "output.verbose"
@@ -34,6 +36,7 @@ func prepareConfig() {
viper.SetDefault(ConfRetries, 5)
viper.SetDefault(ConfWorkers, 2)
viper.SetDefault(ConfTasks, 3)
viper.SetDefault(ConfTimeout, 10 * time.Second)
viper.SetDefault(ConfCrawlStats, 3 * time.Second)
viper.SetDefault(ConfAllocStats, 0)
viper.SetDefault(ConfVerbose, false)
@@ -73,6 +76,8 @@ func readConfig() {
configOOB(ConfTasks, int(config.Tasks))
}

config.Timeout = viper.GetDuration(ConfTimeout)

config.CrawlStats = viper.GetDuration(ConfCrawlStats)

config.AllocStats = viper.GetDuration(ConfAllocStats)
config.yml

@@ -24,3 +24,5 @@ crawl:
# How often to retry getting data
# from the site before giving up
retries: 5
# Time before discarding a network request
timeout: 10s
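The new `crawl.timeout` key exists so a hung server cannot stall a worker forever: the duration read by viper is handed to fasthttp's `DoTimeout` instead of plain `Do`. Below is a minimal sketch of that flow, assuming the `ConfTimeout` constant and default from the config.go hunks above; `loadTimeout` and `fetch` are illustrative helper names, not functions from the repository.

```go
package main

import (
	"time"

	"github.com/spf13/viper"
	"github.com/valyala/fasthttp"
)

// ConfTimeout mirrors the constant added in config.go.
const ConfTimeout = "crawl.timeout"

var (
	timeout time.Duration
	client  fasthttp.Client
)

// loadTimeout (illustrative) reads crawl.timeout the same way readConfig does.
func loadTimeout() {
	viper.SetDefault(ConfTimeout, 10*time.Second) // default from prepareConfig
	timeout = viper.GetDuration(ConfTimeout)      // "10s" in config.yml parses to 10s
}

// fetch (illustrative) shows the Do -> DoTimeout switch from the crawl.go hunks.
func fetch(uri string) error {
	req := fasthttp.AcquireRequest()
	defer fasthttp.ReleaseRequest(req)
	req.SetRequestURI(uri)

	res := fasthttp.AcquireResponse()
	defer fasthttp.ReleaseResponse(res)

	// The request is abandoned once the configured timeout elapses.
	return client.DoTimeout(req, res, timeout)
}

func main() {
	loadTimeout()
	if err := fetch("http://example.com/"); err != nil {
		println("fetch failed:", err.Error())
	}
}
```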
crawl.go (208 changed lines)
@@ -3,13 +3,10 @@ package main
import (
"bytes"
"fmt"
"github.com/sirupsen/logrus"
"github.com/terorie/oddb-go/ds/redblackhash"
"github.com/terorie/oddb-go/fasturl"
"github.com/valyala/fasthttp"
"golang.org/x/crypto/blake2b"
"golang.org/x/net/html"
"golang.org/x/net/html/atom"
"path"
"strconv"
"strings"
@@ -18,9 +15,9 @@ import (

var client fasthttp.Client

func GetDir(j *Job, f *File) (links []fasturl.URL, err error) {
func GetDir(j *Job, f *File) (links []fasthttp.URI, err error) {
f.IsDir = true
f.Name = path.Base(j.Uri.Path)
f.Name = path.Base(string(j.Uri.Path()))

req := fasthttp.AcquireRequest()
req.SetRequestURI(j.UriStr)
@@ -28,13 +25,10 @@ func GetDir(j *Job, f *File) (links []fasturl.URL, err error) {
res := fasthttp.AcquireResponse()
defer fasthttp.ReleaseResponse(res)

err = client.Do(req, res)
err = client.DoTimeout(req, res, config.Timeout)
fasthttp.ReleaseRequest(req)

if err != nil {
logrus.Error(err)
return
}
if err != nil { return }

err = checkStatusCode(res.StatusCode())
if err != nil { return }
@@ -43,65 +37,58 @@ func GetDir(j *Job, f *File) (links []fasturl.URL, err error) {
doc := html.NewTokenizer(bytes.NewReader(body))

var linkHref string
var linkTexts []string
for {
tokenType := doc.Next()
token := doc.Token()
if tokenType == html.ErrorToken {
break
}

switch tokenType {
case html.StartTagToken:
if token.DataAtom == atom.A {
for _, attr := range token.Attr {
if attr.Key == "href" {
linkHref = attr.Val
name, hasAttr := doc.TagName()
if len(name) == 1 && name[0] == 'a' {
for hasAttr {
var ks, vs []byte
ks, vs, hasAttr = doc.TagAttr()
if bytes.Equal(ks, []byte("href")) {
// TODO Check escape
linkHref = string(vs)
break
}
}
}

case html.TextToken:
if linkHref != "" {
linkTexts = append(linkTexts, token.Data)
}

case html.EndTagToken:
if linkHref != "" && token.DataAtom == atom.A {
name, _ := doc.TagName()
if len(name) == 1 && name[0] == 'a' {
// Copy params
href := linkHref
linkText := strings.Join(linkTexts, " ")

// Reset params
linkHref = ""
linkTexts = nil

// TODO Optimized decision tree
for _, entry := range urlBlackList {
if href == entry {
goto nextToken
}
}
for _, entry := range urlPartBlackList {
if strings.Contains(href, entry) {
goto nextToken
}
}
for _, entry := range fileNameBlackList {
if strings.Contains(linkText, entry) {
goto nextToken
}
if strings.LastIndexByte(href, '?') != -1 {
goto nextToken
}

var link fasturl.URL
err = j.Uri.ParseRel(&link, href)
switch href {
case "", " ", ".", "..", "/":
goto nextToken
}

if strings.Contains(href, "../") {
goto nextToken
}

var link fasthttp.URI
j.Uri.CopyTo(&link)
link.Update(href)
if err != nil { continue }

if link.Scheme != j.Uri.Scheme ||
link.Host != j.Uri.Host ||
link.Path == j.Uri.Path ||
!strings.HasPrefix(link.Path, j.Uri.Path) {
if !bytes.Equal(link.Scheme(), j.Uri.Scheme()) ||
!bytes.Equal(link.Host(), j.Uri.Host()) ||
bytes.Equal(link.Path(), j.Uri.Path()) ||
!bytes.HasPrefix(link.Path(), j.Uri.Path()) {
continue
}
@@ -115,11 +102,12 @@ func GetDir(j *Job, f *File) (links []fasturl.URL, err error) {
return
}

func GetFile(u fasturl.URL, f *File) (err error) {
func GetFile(u fasthttp.URI, f *File) (err error) {
f.IsDir = false
u.Path = path.Clean(u.Path)
f.Name = path.Base(u.Path)
f.Path = strings.Trim(u.Path, "/")
cleanPath := path.Clean(string(u.Path()))
u.SetPath(cleanPath)
f.Name = path.Base(cleanPath)
f.Path = strings.Trim(cleanPath, "/")

req := fasthttp.AcquireRequest()
req.Header.SetMethod("HEAD")
@@ -129,7 +117,7 @@ func GetFile(u fasturl.URL, f *File) (err error) {
res.SkipBody = true
defer fasthttp.ReleaseResponse(res)

err = client.Do(req, res)
err = client.DoTimeout(req, res, config.Timeout)
fasthttp.ReleaseRequest(req)

if err != nil { return }
@@ -137,83 +125,41 @@ func GetFile(u fasturl.URL, f *File) (err error) {
err = checkStatusCode(res.StatusCode())
if err != nil { return }

// TODO Inefficient af
header := res.Header.Header()
f.ParseHeader(header)
f.applyContentLength(string(res.Header.Peek("content-length")))
f.applyLastModified(string(res.Header.Peek("last-modified")))

return nil
}

func (f *File) HashDir(links []fasturl.URL) (o redblackhash.Key) {
func (f *File) HashDir(links []fasthttp.URI) (o redblackhash.Key) {
h, _ := blake2b.New256(nil)
h.Write([]byte(f.Name))
for _, link := range links {
fileName := path.Base(link.Path)
h.Write([]byte(fileName))
h.Write(link.Path())
}
sum := h.Sum(nil)
copy(o[:redblackhash.KeySize], sum)
return
}

func (f *File) ParseHeader(h []byte) {
var k1, k2 int
var v1, v2 int

// Simple finite state machine
state := 0
for i, b := range h {
switch state {
case 0:
if b == byte(':') {
state = 1
k2 = i
}

case 1:
state = 2

case 2:
state = 3
v1 = i

case 3:
if b == byte('\r') {
state = 4
}

case 4:
state = 0
v2 = i - 1

key := string(h[k1:k2])
val := string(h[v1:v2])
k1 = i

f.applyHeader(key, val)
}
}

func (f *File) applyContentLength(v string) {
if v == "" { return }
size, err := strconv.ParseInt(v, 10, 64)
if err != nil { return }
if size < 0 { return }
f.Size = size
}

func (f *File) applyHeader(k, v string) {
switch k {
case "content-length":
size, err := strconv.ParseInt(v, 10, 64)
if err != nil { break }
if size < 0 { break }
f.Size = size

case "last-modified":
var err error
f.MTime, err = time.Parse(time.RFC1123, v)
if err == nil { break }
f.MTime, err = time.Parse(time.RFC850, v)
if err == nil { break }
// TODO Parse asctime
f.MTime, err = time.Parse("2006-01-02", v[:10])
if err == nil { break }
}
func (f *File) applyLastModified(v string) {
if v == "" { return }
var err error
f.MTime, err = time.Parse(time.RFC1123, v)
if err == nil { return }
f.MTime, err = time.Parse(time.RFC850, v)
if err == nil { return }
// TODO Parse asctime
f.MTime, err = time.Parse("2006-01-02", v[:10])
if err == nil { return }
}

func checkStatusCode(status int) error {
@@ -232,41 +178,3 @@ func checkStatusCode(status int) error {
return fmt.Errorf("got HTTP status %d", status)
}
}

var urlBlackList = [...]string {
"",
" ",
".",
"..",
"/",
}

var urlPartBlackList = [...]string {
"?C=N&O=D",
"?C=M&O=A",
"?C=S&O=A",
"?C=D&O=A",
"?C=N;O=D",
"?C=M;O=A",
"?C=M&O=D",
"?C=S;O=A",
"?C=S&O=D",
"?C=D;O=A",
"?MA",
"?SA",
"?DA",
"?ND",
"?C=N&O=A",
"?C=N&O=A",
"?M=A",
"?N=D",
"?S=A",
"?D=A",
}

var fileNameBlackList = [...]string {
"Parent Directory",
" Parent Directory",
"../",
}
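The core of this change is swapping the custom fasturl resolver for fasthttp.URI. Below is a minimal sketch of the new relative-link handling, assuming the same base-URI checks as the GetDir hunk above; `resolveChild` is an illustrative name, not a function in the repository.

```go
package main

import (
	"bytes"
	"fmt"

	"github.com/valyala/fasthttp"
)

// resolveChild (illustrative) mirrors the link handling in GetDir: copy the
// base URI, apply the relative href, then keep only links that stay on the
// same scheme and host and point strictly below the base path.
func resolveChild(base *fasthttp.URI, href string, child *fasthttp.URI) bool {
	base.CopyTo(child)
	child.Update(href) // resolves href against the copied base

	if !bytes.Equal(child.Scheme(), base.Scheme()) ||
		!bytes.Equal(child.Host(), base.Host()) ||
		bytes.Equal(child.Path(), base.Path()) || // a listing often links back to itself
		!bytes.HasPrefix(child.Path(), base.Path()) {
		return false
	}
	return true
}

func main() {
	var base, child fasthttp.URI
	base.Parse(nil, []byte("http://example.com/pub/"))

	// "movies/" stays under /pub/ and is accepted; "/etc/passwd" is rejected.
	fmt.Println(resolveChild(&base, "movies/", &child), child.String())
	fmt.Println(resolveChild(&base, "/etc/passwd", &child), child.String())
}
```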
fasturl/url.go (1055 changed lines): file diff suppressed because it is too large
main.go (26 changed lines)
@@ -3,8 +3,8 @@ package main
import (
"context"
"github.com/sirupsen/logrus"
"github.com/terorie/oddb-go/fasturl"
"github.com/urfave/cli"
"github.com/valyala/fasthttp"
"log"
"net/http"
_ "net/http/pprof"
@@ -16,7 +16,7 @@ import (
var app = cli.App {
Name: "oddb-go",
Usage: "OD-Database Go crawler",
Version: "0.1",
Version: "0.2",
BashComplete: cli.DefaultAppComplete,
Writer: os.Stdout,
Compiled: buildDate,
@@ -55,13 +55,19 @@ func cmdCrawler(clic *cli.Context) error {
if !strings.Contains(arg, "://") {
arg = "http://" + arg
}
var u fasturl.URL
err := u.Parse(arg)
if !strings.HasSuffix(u.Path, "/") {
u.Path += "/"
var u fasthttp.URI
u.Parse(nil, []byte(arg))
uPath := string(u.Path())
if !strings.HasSuffix(uPath, "/") {
u.SetPath(uPath + "/")
}
remotes[i] = &OD {
Task: &Task{
WebsiteId: 0,
Url: u.String(),
},
BaseUri: u,
}
if err != nil { return err }
remotes[i] = &OD{ BaseUri: u }
}

c := context.Background()
@@ -83,6 +89,6 @@ func cmdCrawler(clic *cli.Context) error {
}

var buildDate = time.Date(
2018, 10, 28,
17, 10, 0, 0,
2018, 11, 15,
23, 24, 0, 0,
time.UTC)
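On the command line, the new code normalises each argument before it becomes a base URI: a missing scheme defaults to http:// and the path is forced to end in a slash so the prefix checks in crawl.go hold. A small self-contained sketch of that normalisation, assuming fasthttp.URI as above; `parseRemote` is an illustrative name, and note that the diff drops the parse error check, so the URI is used as-is.

```go
package main

import (
	"fmt"
	"strings"

	"github.com/valyala/fasthttp"
)

// parseRemote (illustrative) mirrors the argument handling in cmdCrawler:
// default to http:// and force a trailing slash on the path.
func parseRemote(arg string, u *fasthttp.URI) {
	if !strings.Contains(arg, "://") {
		arg = "http://" + arg
	}

	u.Parse(nil, []byte(arg))

	if p := string(u.Path()); !strings.HasSuffix(p, "/") {
		u.SetPath(p + "/")
	}
}

func main() {
	var u fasthttp.URI
	parseRemote("example.com/pub", &u)
	fmt.Println(u.String()) // http://example.com/pub/
}
```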
model.go (8 changed lines)
@@ -2,23 +2,23 @@ package main

import (
"github.com/terorie/oddb-go/ds/redblackhash"
"github.com/terorie/oddb-go/fasturl"
"github.com/valyala/fasthttp"
"sync"
"time"
)

type Job struct {
OD *OD
Uri fasturl.URL
Uri fasthttp.URI
UriStr string
Fails int
LastError error
}

type OD struct {
Task *Task
Wait sync.WaitGroup
BaseUri fasturl.URL
Files []File
BaseUri fasthttp.URI
WCtx WorkerContext
Scanned redblackhash.Tree
scheduler.go (90 changed lines)
@@ -2,7 +2,11 @@ package main

import (
"context"
"encoding/json"
"fmt"
"github.com/sirupsen/logrus"
"os"
"path"
"sync/atomic"
)

@@ -12,38 +16,36 @@ var totalBuffered int64
func Schedule(c context.Context, remotes <-chan *OD) {
go Stats(c)

for {
select {
case <-c.Done():
return
for remote := range remotes {
logrus.WithField("url", remote.BaseUri.String()).
Info("Starting crawler")

case remote := <-remotes:
logrus.WithField("url", remote.BaseUri.String()).
Info("Starting crawler")
// Collect results
results := make(chan File)

// Spawn workers
remote.WCtx.in, remote.WCtx.out = makeJobBuffer(c)
for i := 0; i < config.Workers; i++ {
go remote.WCtx.Worker()
}

// Enqueue initial job
atomic.AddInt32(&activeTasks, 1)
remote.WCtx.queueJob(Job{
OD: remote,
Uri: remote.BaseUri,
UriStr: remote.BaseUri.String(),
Fails: 0,
})
globalWait.Done()

// Upload result when ready
go remote.Watch()
// Spawn workers
remote.WCtx.in, remote.WCtx.out = makeJobBuffer(c)
for i := 0; i < config.Workers; i++ {
go remote.WCtx.Worker(results)
}

// Enqueue initial job
atomic.AddInt32(&activeTasks, 1)
remote.WCtx.queueJob(Job{
OD: remote,
Uri: remote.BaseUri,
UriStr: remote.BaseUri.String(),
Fails: 0,
})

// Upload result when ready
go remote.Watch(results)
}
}

func (r *OD) Watch() {
func (r *OD) Watch(results chan File) {
go r.Task.Collect(results)

// Wait for all jobs on remote to finish
r.Wait.Wait()
close(r.WCtx.in)
@@ -51,6 +53,42 @@ func (r *OD) Watch() {

logrus.WithField("url", r.BaseUri.String()).
Info("Crawler finished")

globalWait.Done()

close(results)
}

func (t *Task) Collect(results chan File) {
err := t.collect(results)
if err != nil {
logrus.WithError(err).
Error("Failed saving crawl results")
}
}

func (t *Task) collect(results chan File) error {
err := os.MkdirAll("crawled", 0755)
if err != nil { return err }

f, err := os.OpenFile(
path.Join("crawled", fmt.Sprintf("%d.json", t.WebsiteId)),
os.O_CREATE | os.O_WRONLY | os.O_TRUNC,
0755,
)
if err != nil { return err }
defer f.Close()

for result := range results {
resJson, err := json.Marshal(result)
if err != nil { panic(err) }
_, err = f.Write(resJson)
if err != nil { return err }
_, err = f.Write([]byte{'\n'})
if err != nil { return err }
}

return nil
}

func makeJobBuffer(c context.Context) (chan<- Job, <-chan Job) {
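The scheduler hunks replace the old in-memory `Files []File` slice with a streaming pipeline: workers push each file into a `results` channel, `Task.Collect` writes one JSON object per line to `crawled/<WebsiteId>.json`, and `Watch` closes the channel once every job has finished. Here is a toy model of that pipeline, assuming a stand-in `File` type; the JSON field names are illustrative, not the crawler's actual tags.

```go
package main

import (
	"encoding/json"
	"os"
	"sync"
)

// File is a stand-in for the crawler's model.go type; fields and tags are illustrative.
type File struct {
	Name string `json:"name"`
	Size int64  `json:"size"`
}

func main() {
	results := make(chan File)
	var wait sync.WaitGroup

	// "Workers": push results as files are discovered.
	wait.Add(1)
	go func() {
		defer wait.Done()
		results <- File{Name: "a.iso", Size: 1 << 20}
		results <- File{Name: "b.iso", Size: 2 << 20}
	}()

	// "Watch": close the channel once all producers are done,
	// mirroring close(results) at the end of OD.Watch.
	go func() {
		wait.Wait()
		close(results)
	}()

	// "Collect": stream each result as one JSON object per line,
	// as Task.collect does into crawled/<WebsiteId>.json.
	enc := json.NewEncoder(os.Stdout)
	for result := range results {
		_ = enc.Encode(result) // Encode appends the trailing newline itself
	}
}
```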
worker.go (44 changed lines)
@@ -1,8 +1,10 @@
package main

import (
"bytes"
"github.com/sirupsen/logrus"
"math"
"sort"
"sync"
"sync/atomic"
"time"
@@ -17,13 +19,13 @@ type WorkerContext struct {
numRateLimits int
}

func (w WorkerContext) Worker() {
func (w WorkerContext) Worker(results chan<- File) {
for job := range w.out {
w.step(job)
w.step(results, job)
}
}

func (w WorkerContext) step(job Job) {
func (w WorkerContext) step(results chan<- File, job Job) {
defer w.finishJob(&job)

var f File
@@ -62,12 +64,15 @@ func (w WorkerContext) step(job Job) {
w.queueJob(job)
}

job.OD.Files = append(job.OD.Files, f)
if !f.IsDir {
results <- f
}
}

func DoJob(job *Job, f *File) (newJobs []Job, err error) {
if len(job.Uri.Path) == 0 { return }
if job.Uri.Path[len(job.Uri.Path)-1] == '/' {
uriPath := job.Uri.Path()
if len(uriPath) == 0 { return }
if uriPath[len(uriPath)-1] == '/' {
// Load directory
links, err := GetDir(job, f)
if err != nil {
@@ -85,23 +90,36 @@ func DoJob(job *Job, f *File) (newJobs []Job, err error) {
return nil, ErrKnown
}

// Sort by path
sort.Slice(links, func(i, j int) bool {
return bytes.Compare(links[i].Path(), links[j].Path()) < 0
})

var newJobCount int
var lastLink string
for _, link := range links {
// Skip already queued links
//if _, old := job.OD.Scanned.LoadOrStore(link, true); old {
// continue
//}
uriStr := link.String()

// Ignore dupes
if uriStr == lastLink {
continue
}
lastLink = uriStr

job.OD.Wait.Add(1)
newJobs = append(newJobs, Job{
OD: job.OD,
Uri: link,
UriStr: link.String(),
UriStr: uriStr,
Fails: 0,
})

newJobCount++
}
if config.Verbose {
logrus.WithFields(logrus.Fields{
"url": job.UriStr,
"files": len(links),
"files": newJobCount,
}).Debug("Listed")
}
} else {
@@ -119,7 +137,6 @@ func DoJob(job *Job, f *File) (newJobs []Job, err error) {

func (w WorkerContext) queueJob(job Job) {
job.OD.Wait.Add(1)
globalWait.Add(1)

if w.numRateLimits > 0 {
if time.Since(w.lastRateLimit) > 5 * time.Second {
@@ -136,5 +153,4 @@ func (w WorkerContext) queueJob(job Job) {

func (w WorkerContext) finishJob(job *Job) {
job.OD.Wait.Done()
globalWait.Done()
}
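Because directory listings often repeat the same href (index-sort links, parent links, duplicates), DoJob now sorts the extracted links by path and skips consecutive duplicates before queueing jobs. A minimal sketch of that dedupe step, assuming fasthttp.URI links as above; `dedupeLinks` is an illustrative helper, not part of the repository.

```go
package main

import (
	"bytes"
	"fmt"
	"sort"

	"github.com/valyala/fasthttp"
)

// dedupeLinks (illustrative) mirrors the logic in DoJob: sort by path, then
// drop any link whose string form equals the previous one.
func dedupeLinks(links []fasthttp.URI) []string {
	sort.Slice(links, func(i, j int) bool {
		return bytes.Compare(links[i].Path(), links[j].Path()) < 0
	})

	var out []string
	var lastLink string
	for i := range links {
		uriStr := links[i].String()
		if uriStr == lastLink {
			continue // ignore dupes, as in the diff
		}
		lastLink = uriStr
		out = append(out, uriStr)
	}
	return out
}

func main() {
	var a, b, c fasthttp.URI
	a.Parse(nil, []byte("http://example.com/pub/b/"))
	b.Parse(nil, []byte("http://example.com/pub/a/"))
	c.Parse(nil, []byte("http://example.com/pub/a/")) // duplicate of b

	// Two unique links survive, in path order.
	fmt.Println(dedupeLinks([]fasthttp.URI{a, b, c}))
}
```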