mirror of
https://github.com/terorie/od-database-crawler.git
synced 2025-12-14 07:39:03 +00:00
Compare commits
18 Commits
rip
...
fasthttpur
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
8f6f8fd17f | ||
|
|
3c39f0d621 | ||
|
|
50952791c5 | ||
|
|
30bf98ad34 | ||
|
|
ccaf758e90 | ||
|
|
f668365edb | ||
|
|
1db8ff43bb | ||
|
|
82234f949e | ||
|
|
084b3a5903 | ||
|
|
ac0b8d2d0b | ||
|
|
ffde1a9e5d | ||
|
|
a268c6dbcf | ||
|
|
4c071171eb | ||
|
|
9c8174dd8d | ||
|
|
93272e1da1 | ||
|
|
0344a120ff | ||
|
|
6e6afd771e | ||
|
|
a8c27b2d21 |
BIN
.github/stress.png
vendored
Normal file
BIN
.github/stress.png
vendored
Normal file
Binary file not shown.
|
After Width: | Height: | Size: 369 KiB |
12
README.md
12
README.md
@@ -1,2 +1,12 @@
|
|||||||
# oddb Go crawler
|
# oddb Go crawler 🚀
|
||||||
> by terorie 2018 :P
|
> by terorie 2018 :P
|
||||||
|
|
||||||
|
* Crawls HTTP open directories (standard Web Server Listings)
|
||||||
|
* Gets name, path, size and modification time of all files
|
||||||
|
* Soon: Will work as a crawler for [OD-Database](https://github.com/simon987/od-database)!
|
||||||
|
|
||||||
|
Stress test crawling [pandoradir](https://github.com/terorie/pandoradir)
|
||||||
|
on an average laptop (~10K requests per second, 4 connections):
|
||||||
|

|
||||||
|
|
||||||
|
Memory usage is being optimized :P
|
||||||
|
|||||||
@@ -13,6 +13,7 @@ var config struct {
|
|||||||
Token string
|
Token string
|
||||||
Retries int
|
Retries int
|
||||||
Workers int
|
Workers int
|
||||||
|
Timeout time.Duration
|
||||||
Tasks int32
|
Tasks int32
|
||||||
CrawlStats time.Duration
|
CrawlStats time.Duration
|
||||||
AllocStats time.Duration
|
AllocStats time.Duration
|
||||||
@@ -25,6 +26,7 @@ const (
|
|||||||
ConfTasks = "crawl.tasks"
|
ConfTasks = "crawl.tasks"
|
||||||
ConfRetries = "crawl.retries"
|
ConfRetries = "crawl.retries"
|
||||||
ConfWorkers = "crawl.connections"
|
ConfWorkers = "crawl.connections"
|
||||||
|
ConfTimeout = "crawl.timeout"
|
||||||
ConfCrawlStats = "output.crawl_stats"
|
ConfCrawlStats = "output.crawl_stats"
|
||||||
ConfAllocStats = "output.resource_stats"
|
ConfAllocStats = "output.resource_stats"
|
||||||
ConfVerbose = "output.verbose"
|
ConfVerbose = "output.verbose"
|
||||||
@@ -34,6 +36,7 @@ func prepareConfig() {
|
|||||||
viper.SetDefault(ConfRetries, 5)
|
viper.SetDefault(ConfRetries, 5)
|
||||||
viper.SetDefault(ConfWorkers, 2)
|
viper.SetDefault(ConfWorkers, 2)
|
||||||
viper.SetDefault(ConfTasks, 3)
|
viper.SetDefault(ConfTasks, 3)
|
||||||
|
viper.SetDefault(ConfTimeout, 10 * time.Second)
|
||||||
viper.SetDefault(ConfCrawlStats, 3 * time.Second)
|
viper.SetDefault(ConfCrawlStats, 3 * time.Second)
|
||||||
viper.SetDefault(ConfAllocStats, 0)
|
viper.SetDefault(ConfAllocStats, 0)
|
||||||
viper.SetDefault(ConfVerbose, false)
|
viper.SetDefault(ConfVerbose, false)
|
||||||
@@ -73,6 +76,8 @@ func readConfig() {
|
|||||||
configOOB(ConfTasks, int(config.Tasks))
|
configOOB(ConfTasks, int(config.Tasks))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
config.Timeout = viper.GetDuration(ConfTimeout)
|
||||||
|
|
||||||
config.CrawlStats = viper.GetDuration(ConfCrawlStats)
|
config.CrawlStats = viper.GetDuration(ConfCrawlStats)
|
||||||
|
|
||||||
config.AllocStats = viper.GetDuration(ConfAllocStats)
|
config.AllocStats = viper.GetDuration(ConfAllocStats)
|
||||||
|
|||||||
@@ -24,3 +24,5 @@ crawl:
|
|||||||
# How often to retry getting data
|
# How often to retry getting data
|
||||||
# from the site before giving up
|
# from the site before giving up
|
||||||
retries: 5
|
retries: 5
|
||||||
|
# Time before discarding a network request
|
||||||
|
timeout: 10s
|
||||||
|
|||||||
208
crawl.go
208
crawl.go
@@ -3,13 +3,10 @@ package main
|
|||||||
import (
|
import (
|
||||||
"bytes"
|
"bytes"
|
||||||
"fmt"
|
"fmt"
|
||||||
"github.com/sirupsen/logrus"
|
|
||||||
"github.com/terorie/oddb-go/ds/redblackhash"
|
"github.com/terorie/oddb-go/ds/redblackhash"
|
||||||
"github.com/terorie/oddb-go/fasturl"
|
|
||||||
"github.com/valyala/fasthttp"
|
"github.com/valyala/fasthttp"
|
||||||
"golang.org/x/crypto/blake2b"
|
"golang.org/x/crypto/blake2b"
|
||||||
"golang.org/x/net/html"
|
"golang.org/x/net/html"
|
||||||
"golang.org/x/net/html/atom"
|
|
||||||
"path"
|
"path"
|
||||||
"strconv"
|
"strconv"
|
||||||
"strings"
|
"strings"
|
||||||
@@ -18,9 +15,9 @@ import (
|
|||||||
|
|
||||||
var client fasthttp.Client
|
var client fasthttp.Client
|
||||||
|
|
||||||
func GetDir(j *Job, f *File) (links []fasturl.URL, err error) {
|
func GetDir(j *Job, f *File) (links []fasthttp.URI, err error) {
|
||||||
f.IsDir = true
|
f.IsDir = true
|
||||||
f.Name = path.Base(j.Uri.Path)
|
f.Name = path.Base(string(j.Uri.Path()))
|
||||||
|
|
||||||
req := fasthttp.AcquireRequest()
|
req := fasthttp.AcquireRequest()
|
||||||
req.SetRequestURI(j.UriStr)
|
req.SetRequestURI(j.UriStr)
|
||||||
@@ -28,13 +25,10 @@ func GetDir(j *Job, f *File) (links []fasturl.URL, err error) {
|
|||||||
res := fasthttp.AcquireResponse()
|
res := fasthttp.AcquireResponse()
|
||||||
defer fasthttp.ReleaseResponse(res)
|
defer fasthttp.ReleaseResponse(res)
|
||||||
|
|
||||||
err = client.Do(req, res)
|
err = client.DoTimeout(req, res, config.Timeout)
|
||||||
fasthttp.ReleaseRequest(req)
|
fasthttp.ReleaseRequest(req)
|
||||||
|
|
||||||
if err != nil {
|
if err != nil { return }
|
||||||
logrus.Error(err)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
err = checkStatusCode(res.StatusCode())
|
err = checkStatusCode(res.StatusCode())
|
||||||
if err != nil { return }
|
if err != nil { return }
|
||||||
@@ -43,65 +37,58 @@ func GetDir(j *Job, f *File) (links []fasturl.URL, err error) {
|
|||||||
doc := html.NewTokenizer(bytes.NewReader(body))
|
doc := html.NewTokenizer(bytes.NewReader(body))
|
||||||
|
|
||||||
var linkHref string
|
var linkHref string
|
||||||
var linkTexts []string
|
|
||||||
for {
|
for {
|
||||||
tokenType := doc.Next()
|
tokenType := doc.Next()
|
||||||
token := doc.Token()
|
|
||||||
if tokenType == html.ErrorToken {
|
if tokenType == html.ErrorToken {
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
|
|
||||||
switch tokenType {
|
switch tokenType {
|
||||||
case html.StartTagToken:
|
case html.StartTagToken:
|
||||||
if token.DataAtom == atom.A {
|
name, hasAttr := doc.TagName()
|
||||||
for _, attr := range token.Attr {
|
if len(name) == 1 && name[0] == 'a' {
|
||||||
if attr.Key == "href" {
|
for hasAttr {
|
||||||
linkHref = attr.Val
|
var ks, vs []byte
|
||||||
|
ks, vs, hasAttr = doc.TagAttr()
|
||||||
|
if bytes.Equal(ks, []byte("href")) {
|
||||||
|
// TODO Check escape
|
||||||
|
linkHref = string(vs)
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
case html.TextToken:
|
|
||||||
if linkHref != "" {
|
|
||||||
linkTexts = append(linkTexts, token.Data)
|
|
||||||
}
|
|
||||||
|
|
||||||
case html.EndTagToken:
|
case html.EndTagToken:
|
||||||
if linkHref != "" && token.DataAtom == atom.A {
|
name, _ := doc.TagName()
|
||||||
|
if len(name) == 1 && name[0] == 'a' {
|
||||||
// Copy params
|
// Copy params
|
||||||
href := linkHref
|
href := linkHref
|
||||||
linkText := strings.Join(linkTexts, " ")
|
|
||||||
|
|
||||||
// Reset params
|
// Reset params
|
||||||
linkHref = ""
|
linkHref = ""
|
||||||
linkTexts = nil
|
|
||||||
|
|
||||||
// TODO Optimized decision tree
|
if strings.LastIndexByte(href, '?') != -1 {
|
||||||
for _, entry := range urlBlackList {
|
goto nextToken
|
||||||
if href == entry {
|
|
||||||
goto nextToken
|
|
||||||
}
|
|
||||||
}
|
|
||||||
for _, entry := range urlPartBlackList {
|
|
||||||
if strings.Contains(href, entry) {
|
|
||||||
goto nextToken
|
|
||||||
}
|
|
||||||
}
|
|
||||||
for _, entry := range fileNameBlackList {
|
|
||||||
if strings.Contains(linkText, entry) {
|
|
||||||
goto nextToken
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
var link fasturl.URL
|
switch href {
|
||||||
err = j.Uri.ParseRel(&link, href)
|
case "", " ", ".", "..", "/":
|
||||||
|
goto nextToken
|
||||||
|
}
|
||||||
|
|
||||||
|
if strings.Contains(href, "../") {
|
||||||
|
goto nextToken
|
||||||
|
}
|
||||||
|
|
||||||
|
var link fasthttp.URI
|
||||||
|
j.Uri.CopyTo(&link)
|
||||||
|
link.Update(href)
|
||||||
if err != nil { continue }
|
if err != nil { continue }
|
||||||
|
|
||||||
if link.Scheme != j.Uri.Scheme ||
|
if !bytes.Equal(link.Scheme(), j.Uri.Scheme()) ||
|
||||||
link.Host != j.Uri.Host ||
|
!bytes.Equal(link.Host(), j.Uri.Host()) ||
|
||||||
link.Path == j.Uri.Path ||
|
bytes.Equal(link.Path(), j.Uri.Path()) ||
|
||||||
!strings.HasPrefix(link.Path, j.Uri.Path) {
|
!bytes.HasPrefix(link.Path(), j.Uri.Path()) {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -115,11 +102,12 @@ func GetDir(j *Job, f *File) (links []fasturl.URL, err error) {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
func GetFile(u fasturl.URL, f *File) (err error) {
|
func GetFile(u fasthttp.URI, f *File) (err error) {
|
||||||
f.IsDir = false
|
f.IsDir = false
|
||||||
u.Path = path.Clean(u.Path)
|
cleanPath := path.Clean(string(u.Path()))
|
||||||
f.Name = path.Base(u.Path)
|
u.SetPath(cleanPath)
|
||||||
f.Path = strings.Trim(u.Path, "/")
|
f.Name = path.Base(cleanPath)
|
||||||
|
f.Path = strings.Trim(cleanPath, "/")
|
||||||
|
|
||||||
req := fasthttp.AcquireRequest()
|
req := fasthttp.AcquireRequest()
|
||||||
req.Header.SetMethod("HEAD")
|
req.Header.SetMethod("HEAD")
|
||||||
@@ -129,7 +117,7 @@ func GetFile(u fasturl.URL, f *File) (err error) {
|
|||||||
res.SkipBody = true
|
res.SkipBody = true
|
||||||
defer fasthttp.ReleaseResponse(res)
|
defer fasthttp.ReleaseResponse(res)
|
||||||
|
|
||||||
err = client.Do(req, res)
|
err = client.DoTimeout(req, res, config.Timeout)
|
||||||
fasthttp.ReleaseRequest(req)
|
fasthttp.ReleaseRequest(req)
|
||||||
|
|
||||||
if err != nil { return }
|
if err != nil { return }
|
||||||
@@ -137,83 +125,41 @@ func GetFile(u fasturl.URL, f *File) (err error) {
|
|||||||
err = checkStatusCode(res.StatusCode())
|
err = checkStatusCode(res.StatusCode())
|
||||||
if err != nil { return }
|
if err != nil { return }
|
||||||
|
|
||||||
// TODO Inefficient af
|
f.applyContentLength(string(res.Header.Peek("content-length")))
|
||||||
header := res.Header.Header()
|
f.applyLastModified(string(res.Header.Peek("last-modified")))
|
||||||
f.ParseHeader(header)
|
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (f *File) HashDir(links []fasturl.URL) (o redblackhash.Key) {
|
func (f *File) HashDir(links []fasthttp.URI) (o redblackhash.Key) {
|
||||||
h, _ := blake2b.New256(nil)
|
h, _ := blake2b.New256(nil)
|
||||||
h.Write([]byte(f.Name))
|
h.Write([]byte(f.Name))
|
||||||
for _, link := range links {
|
for _, link := range links {
|
||||||
fileName := path.Base(link.Path)
|
h.Write(link.Path())
|
||||||
h.Write([]byte(fileName))
|
|
||||||
}
|
}
|
||||||
sum := h.Sum(nil)
|
sum := h.Sum(nil)
|
||||||
copy(o[:redblackhash.KeySize], sum)
|
copy(o[:redblackhash.KeySize], sum)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
func (f *File) ParseHeader(h []byte) {
|
func (f *File) applyContentLength(v string) {
|
||||||
var k1, k2 int
|
if v == "" { return }
|
||||||
var v1, v2 int
|
size, err := strconv.ParseInt(v, 10, 64)
|
||||||
|
if err != nil { return }
|
||||||
// Simple finite state machine
|
if size < 0 { return }
|
||||||
state := 0
|
f.Size = size
|
||||||
for i, b := range h {
|
|
||||||
switch state {
|
|
||||||
case 0:
|
|
||||||
if b == byte(':') {
|
|
||||||
state = 1
|
|
||||||
k2 = i
|
|
||||||
}
|
|
||||||
|
|
||||||
case 1:
|
|
||||||
state = 2
|
|
||||||
|
|
||||||
case 2:
|
|
||||||
state = 3
|
|
||||||
v1 = i
|
|
||||||
|
|
||||||
case 3:
|
|
||||||
if b == byte('\r') {
|
|
||||||
state = 4
|
|
||||||
}
|
|
||||||
|
|
||||||
case 4:
|
|
||||||
state = 0
|
|
||||||
v2 = i - 1
|
|
||||||
|
|
||||||
key := string(h[k1:k2])
|
|
||||||
val := string(h[v1:v2])
|
|
||||||
k1 = i
|
|
||||||
|
|
||||||
f.applyHeader(key, val)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (f *File) applyHeader(k, v string) {
|
func (f *File) applyLastModified(v string) {
|
||||||
switch k {
|
if v == "" { return }
|
||||||
case "content-length":
|
var err error
|
||||||
size, err := strconv.ParseInt(v, 10, 64)
|
f.MTime, err = time.Parse(time.RFC1123, v)
|
||||||
if err != nil { break }
|
if err == nil { return }
|
||||||
if size < 0 { break }
|
f.MTime, err = time.Parse(time.RFC850, v)
|
||||||
f.Size = size
|
if err == nil { return }
|
||||||
|
// TODO Parse asctime
|
||||||
case "last-modified":
|
f.MTime, err = time.Parse("2006-01-02", v[:10])
|
||||||
var err error
|
if err == nil { return }
|
||||||
f.MTime, err = time.Parse(time.RFC1123, v)
|
|
||||||
if err == nil { break }
|
|
||||||
f.MTime, err = time.Parse(time.RFC850, v)
|
|
||||||
if err == nil { break }
|
|
||||||
// TODO Parse asctime
|
|
||||||
f.MTime, err = time.Parse("2006-01-02", v[:10])
|
|
||||||
if err == nil { break }
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func checkStatusCode(status int) error {
|
func checkStatusCode(status int) error {
|
||||||
@@ -232,41 +178,3 @@ func checkStatusCode(status int) error {
|
|||||||
return fmt.Errorf("got HTTP status %d", status)
|
return fmt.Errorf("got HTTP status %d", status)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
var urlBlackList = [...]string {
|
|
||||||
"",
|
|
||||||
" ",
|
|
||||||
".",
|
|
||||||
"..",
|
|
||||||
"/",
|
|
||||||
}
|
|
||||||
|
|
||||||
var urlPartBlackList = [...]string {
|
|
||||||
"?C=N&O=D",
|
|
||||||
"?C=M&O=A",
|
|
||||||
"?C=S&O=A",
|
|
||||||
"?C=D&O=A",
|
|
||||||
"?C=N;O=D",
|
|
||||||
"?C=M;O=A",
|
|
||||||
"?C=M&O=D",
|
|
||||||
"?C=S;O=A",
|
|
||||||
"?C=S&O=D",
|
|
||||||
"?C=D;O=A",
|
|
||||||
"?MA",
|
|
||||||
"?SA",
|
|
||||||
"?DA",
|
|
||||||
"?ND",
|
|
||||||
"?C=N&O=A",
|
|
||||||
"?C=N&O=A",
|
|
||||||
"?M=A",
|
|
||||||
"?N=D",
|
|
||||||
"?S=A",
|
|
||||||
"?D=A",
|
|
||||||
}
|
|
||||||
|
|
||||||
var fileNameBlackList = [...]string {
|
|
||||||
"Parent Directory",
|
|
||||||
" Parent Directory",
|
|
||||||
"../",
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|||||||
1055
fasturl/url.go
1055
fasturl/url.go
File diff suppressed because it is too large
Load Diff
26
main.go
26
main.go
@@ -3,8 +3,8 @@ package main
|
|||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"github.com/sirupsen/logrus"
|
"github.com/sirupsen/logrus"
|
||||||
"github.com/terorie/oddb-go/fasturl"
|
|
||||||
"github.com/urfave/cli"
|
"github.com/urfave/cli"
|
||||||
|
"github.com/valyala/fasthttp"
|
||||||
"log"
|
"log"
|
||||||
"net/http"
|
"net/http"
|
||||||
_ "net/http/pprof"
|
_ "net/http/pprof"
|
||||||
@@ -16,7 +16,7 @@ import (
|
|||||||
var app = cli.App {
|
var app = cli.App {
|
||||||
Name: "oddb-go",
|
Name: "oddb-go",
|
||||||
Usage: "OD-Database Go crawler",
|
Usage: "OD-Database Go crawler",
|
||||||
Version: "0.1",
|
Version: "0.2",
|
||||||
BashComplete: cli.DefaultAppComplete,
|
BashComplete: cli.DefaultAppComplete,
|
||||||
Writer: os.Stdout,
|
Writer: os.Stdout,
|
||||||
Compiled: buildDate,
|
Compiled: buildDate,
|
||||||
@@ -55,13 +55,19 @@ func cmdCrawler(clic *cli.Context) error {
|
|||||||
if !strings.Contains(arg, "://") {
|
if !strings.Contains(arg, "://") {
|
||||||
arg = "http://" + arg
|
arg = "http://" + arg
|
||||||
}
|
}
|
||||||
var u fasturl.URL
|
var u fasthttp.URI
|
||||||
err := u.Parse(arg)
|
u.Parse(nil, []byte(arg))
|
||||||
if !strings.HasSuffix(u.Path, "/") {
|
uPath := string(u.Path())
|
||||||
u.Path += "/"
|
if !strings.HasSuffix(uPath, "/") {
|
||||||
|
u.SetPath(uPath + "/")
|
||||||
|
}
|
||||||
|
remotes[i] = &OD {
|
||||||
|
Task: &Task{
|
||||||
|
WebsiteId: 0,
|
||||||
|
Url: u.String(),
|
||||||
|
},
|
||||||
|
BaseUri: u,
|
||||||
}
|
}
|
||||||
if err != nil { return err }
|
|
||||||
remotes[i] = &OD{ BaseUri: u }
|
|
||||||
}
|
}
|
||||||
|
|
||||||
c := context.Background()
|
c := context.Background()
|
||||||
@@ -83,6 +89,6 @@ func cmdCrawler(clic *cli.Context) error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
var buildDate = time.Date(
|
var buildDate = time.Date(
|
||||||
2018, 10, 28,
|
2018, 11, 15,
|
||||||
17, 10, 0, 0,
|
23, 24, 0, 0,
|
||||||
time.UTC)
|
time.UTC)
|
||||||
|
|||||||
8
model.go
8
model.go
@@ -2,23 +2,23 @@ package main
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"github.com/terorie/oddb-go/ds/redblackhash"
|
"github.com/terorie/oddb-go/ds/redblackhash"
|
||||||
"github.com/terorie/oddb-go/fasturl"
|
"github.com/valyala/fasthttp"
|
||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
)
|
)
|
||||||
|
|
||||||
type Job struct {
|
type Job struct {
|
||||||
OD *OD
|
OD *OD
|
||||||
Uri fasturl.URL
|
Uri fasthttp.URI
|
||||||
UriStr string
|
UriStr string
|
||||||
Fails int
|
Fails int
|
||||||
LastError error
|
LastError error
|
||||||
}
|
}
|
||||||
|
|
||||||
type OD struct {
|
type OD struct {
|
||||||
|
Task *Task
|
||||||
Wait sync.WaitGroup
|
Wait sync.WaitGroup
|
||||||
BaseUri fasturl.URL
|
BaseUri fasthttp.URI
|
||||||
Files []File
|
|
||||||
WCtx WorkerContext
|
WCtx WorkerContext
|
||||||
Scanned redblackhash.Tree
|
Scanned redblackhash.Tree
|
||||||
|
|
||||||
|
|||||||
90
scheduler.go
90
scheduler.go
@@ -2,7 +2,11 @@ package main
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"encoding/json"
|
||||||
|
"fmt"
|
||||||
"github.com/sirupsen/logrus"
|
"github.com/sirupsen/logrus"
|
||||||
|
"os"
|
||||||
|
"path"
|
||||||
"sync/atomic"
|
"sync/atomic"
|
||||||
)
|
)
|
||||||
|
|
||||||
@@ -12,38 +16,36 @@ var totalBuffered int64
|
|||||||
func Schedule(c context.Context, remotes <-chan *OD) {
|
func Schedule(c context.Context, remotes <-chan *OD) {
|
||||||
go Stats(c)
|
go Stats(c)
|
||||||
|
|
||||||
for {
|
for remote := range remotes {
|
||||||
select {
|
logrus.WithField("url", remote.BaseUri.String()).
|
||||||
case <-c.Done():
|
Info("Starting crawler")
|
||||||
return
|
|
||||||
|
|
||||||
case remote := <-remotes:
|
// Collect results
|
||||||
logrus.WithField("url", remote.BaseUri.String()).
|
results := make(chan File)
|
||||||
Info("Starting crawler")
|
|
||||||
|
|
||||||
// Spawn workers
|
// Spawn workers
|
||||||
remote.WCtx.in, remote.WCtx.out = makeJobBuffer(c)
|
remote.WCtx.in, remote.WCtx.out = makeJobBuffer(c)
|
||||||
for i := 0; i < config.Workers; i++ {
|
for i := 0; i < config.Workers; i++ {
|
||||||
go remote.WCtx.Worker()
|
go remote.WCtx.Worker(results)
|
||||||
}
|
|
||||||
|
|
||||||
// Enqueue initial job
|
|
||||||
atomic.AddInt32(&activeTasks, 1)
|
|
||||||
remote.WCtx.queueJob(Job{
|
|
||||||
OD: remote,
|
|
||||||
Uri: remote.BaseUri,
|
|
||||||
UriStr: remote.BaseUri.String(),
|
|
||||||
Fails: 0,
|
|
||||||
})
|
|
||||||
globalWait.Done()
|
|
||||||
|
|
||||||
// Upload result when ready
|
|
||||||
go remote.Watch()
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Enqueue initial job
|
||||||
|
atomic.AddInt32(&activeTasks, 1)
|
||||||
|
remote.WCtx.queueJob(Job{
|
||||||
|
OD: remote,
|
||||||
|
Uri: remote.BaseUri,
|
||||||
|
UriStr: remote.BaseUri.String(),
|
||||||
|
Fails: 0,
|
||||||
|
})
|
||||||
|
|
||||||
|
// Upload result when ready
|
||||||
|
go remote.Watch(results)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *OD) Watch() {
|
func (r *OD) Watch(results chan File) {
|
||||||
|
go r.Task.Collect(results)
|
||||||
|
|
||||||
// Wait for all jobs on remote to finish
|
// Wait for all jobs on remote to finish
|
||||||
r.Wait.Wait()
|
r.Wait.Wait()
|
||||||
close(r.WCtx.in)
|
close(r.WCtx.in)
|
||||||
@@ -51,6 +53,42 @@ func (r *OD) Watch() {
|
|||||||
|
|
||||||
logrus.WithField("url", r.BaseUri.String()).
|
logrus.WithField("url", r.BaseUri.String()).
|
||||||
Info("Crawler finished")
|
Info("Crawler finished")
|
||||||
|
|
||||||
|
globalWait.Done()
|
||||||
|
|
||||||
|
close(results)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t *Task) Collect(results chan File) {
|
||||||
|
err := t.collect(results)
|
||||||
|
if err != nil {
|
||||||
|
logrus.WithError(err).
|
||||||
|
Error("Failed saving crawl results")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t *Task) collect(results chan File) error {
|
||||||
|
err := os.MkdirAll("crawled", 0755)
|
||||||
|
if err != nil { return err }
|
||||||
|
|
||||||
|
f, err := os.OpenFile(
|
||||||
|
path.Join("crawled", fmt.Sprintf("%d.json", t.WebsiteId)),
|
||||||
|
os.O_CREATE | os.O_WRONLY | os.O_TRUNC,
|
||||||
|
0755,
|
||||||
|
)
|
||||||
|
if err != nil { return err }
|
||||||
|
defer f.Close()
|
||||||
|
|
||||||
|
for result := range results {
|
||||||
|
resJson, err := json.Marshal(result)
|
||||||
|
if err != nil { panic(err) }
|
||||||
|
_, err = f.Write(resJson)
|
||||||
|
if err != nil { return err }
|
||||||
|
_, err = f.Write([]byte{'\n'})
|
||||||
|
if err != nil { return err }
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func makeJobBuffer(c context.Context) (chan<- Job, <-chan Job) {
|
func makeJobBuffer(c context.Context) (chan<- Job, <-chan Job) {
|
||||||
|
|||||||
44
worker.go
44
worker.go
@@ -1,8 +1,10 @@
|
|||||||
package main
|
package main
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"bytes"
|
||||||
"github.com/sirupsen/logrus"
|
"github.com/sirupsen/logrus"
|
||||||
"math"
|
"math"
|
||||||
|
"sort"
|
||||||
"sync"
|
"sync"
|
||||||
"sync/atomic"
|
"sync/atomic"
|
||||||
"time"
|
"time"
|
||||||
@@ -17,13 +19,13 @@ type WorkerContext struct {
|
|||||||
numRateLimits int
|
numRateLimits int
|
||||||
}
|
}
|
||||||
|
|
||||||
func (w WorkerContext) Worker() {
|
func (w WorkerContext) Worker(results chan<- File) {
|
||||||
for job := range w.out {
|
for job := range w.out {
|
||||||
w.step(job)
|
w.step(results, job)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (w WorkerContext) step(job Job) {
|
func (w WorkerContext) step(results chan<- File, job Job) {
|
||||||
defer w.finishJob(&job)
|
defer w.finishJob(&job)
|
||||||
|
|
||||||
var f File
|
var f File
|
||||||
@@ -62,12 +64,15 @@ func (w WorkerContext) step(job Job) {
|
|||||||
w.queueJob(job)
|
w.queueJob(job)
|
||||||
}
|
}
|
||||||
|
|
||||||
job.OD.Files = append(job.OD.Files, f)
|
if !f.IsDir {
|
||||||
|
results <- f
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func DoJob(job *Job, f *File) (newJobs []Job, err error) {
|
func DoJob(job *Job, f *File) (newJobs []Job, err error) {
|
||||||
if len(job.Uri.Path) == 0 { return }
|
uriPath := job.Uri.Path()
|
||||||
if job.Uri.Path[len(job.Uri.Path)-1] == '/' {
|
if len(uriPath) == 0 { return }
|
||||||
|
if uriPath[len(uriPath)-1] == '/' {
|
||||||
// Load directory
|
// Load directory
|
||||||
links, err := GetDir(job, f)
|
links, err := GetDir(job, f)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@@ -85,23 +90,36 @@ func DoJob(job *Job, f *File) (newJobs []Job, err error) {
|
|||||||
return nil, ErrKnown
|
return nil, ErrKnown
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Sort by path
|
||||||
|
sort.Slice(links, func(i, j int) bool {
|
||||||
|
return bytes.Compare(links[i].Path(), links[j].Path()) < 0
|
||||||
|
})
|
||||||
|
|
||||||
|
var newJobCount int
|
||||||
|
var lastLink string
|
||||||
for _, link := range links {
|
for _, link := range links {
|
||||||
// Skip already queued links
|
uriStr := link.String()
|
||||||
//if _, old := job.OD.Scanned.LoadOrStore(link, true); old {
|
|
||||||
// continue
|
// Ignore dupes
|
||||||
//}
|
if uriStr == lastLink {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
lastLink = uriStr
|
||||||
|
|
||||||
job.OD.Wait.Add(1)
|
job.OD.Wait.Add(1)
|
||||||
newJobs = append(newJobs, Job{
|
newJobs = append(newJobs, Job{
|
||||||
OD: job.OD,
|
OD: job.OD,
|
||||||
Uri: link,
|
Uri: link,
|
||||||
UriStr: link.String(),
|
UriStr: uriStr,
|
||||||
Fails: 0,
|
Fails: 0,
|
||||||
})
|
})
|
||||||
|
|
||||||
|
newJobCount++
|
||||||
}
|
}
|
||||||
if config.Verbose {
|
if config.Verbose {
|
||||||
logrus.WithFields(logrus.Fields{
|
logrus.WithFields(logrus.Fields{
|
||||||
"url": job.UriStr,
|
"url": job.UriStr,
|
||||||
"files": len(links),
|
"files": newJobCount,
|
||||||
}).Debug("Listed")
|
}).Debug("Listed")
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
@@ -119,7 +137,6 @@ func DoJob(job *Job, f *File) (newJobs []Job, err error) {
|
|||||||
|
|
||||||
func (w WorkerContext) queueJob(job Job) {
|
func (w WorkerContext) queueJob(job Job) {
|
||||||
job.OD.Wait.Add(1)
|
job.OD.Wait.Add(1)
|
||||||
globalWait.Add(1)
|
|
||||||
|
|
||||||
if w.numRateLimits > 0 {
|
if w.numRateLimits > 0 {
|
||||||
if time.Since(w.lastRateLimit) > 5 * time.Second {
|
if time.Since(w.lastRateLimit) > 5 * time.Second {
|
||||||
@@ -136,5 +153,4 @@ func (w WorkerContext) queueJob(job Job) {
|
|||||||
|
|
||||||
func (w WorkerContext) finishJob(job *Job) {
|
func (w WorkerContext) finishJob(job *Job) {
|
||||||
job.OD.Wait.Done()
|
job.OD.Wait.Done()
|
||||||
globalWait.Done()
|
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user