40 Commits

Author SHA1 Message Date
Richard Patel
24d9d1fd42 Make resume work 2019-02-03 16:47:01 +01:00
Richard Patel
f3be76e001 resume len 2019-02-03 16:41:26 +01:00
Richard Patel
4ef4ab13a8 Fix sleeps 2019-02-03 16:34:29 +01:00
Richard Patel
25d0b0042c resume: Fix missing gob register 2019-02-03 16:32:01 +01:00
Richard Patel
ef7d17cad4 Fix too long sleep 2019-02-03 16:28:43 +01:00
Richard Patel
e919323169 Resume tests 2019-02-03 16:24:18 +01:00
Richard Patel
a3aebe4ef2 Pause file version 2019-02-03 16:08:42 +01:00
Richard Patel
acbfd78a5d Save marker 2019-02-03 16:06:52 +01:00
Richard Patel
fe1e7bf261 Save: queue dir if not yet exists 2019-02-03 16:01:15 +01:00
Richard Patel
c6d7fad8e8 Resume state saving 2019-02-03 15:54:02 +01:00
Richard Patel
0b20823ae1 Resume log messages 2019-02-03 15:09:49 +01:00
Richard Patel
8d68bf1bbc Open result files in append-mode 2019-02-03 15:06:52 +01:00
Richard Patel
a83eb0cfd7 Initial resume implementation 2019-02-03 15:02:07 +01:00
Richard Patel
b18b70f798 Fix segfault (thanks Pikami) 2019-02-03 14:00:17 +01:00
Richard Patel
9d5f549774 Better server User-Agent string 2019-02-03 12:23:21 +01:00
Richard Patel
5239af08f7 Bump version to v1.2.1 2019-02-03 03:36:39 +01:00
Richard Patel
46c0e0bd32 Smarter HTTP error handling 2019-02-03 03:35:09 +01:00
Richard Patel
0ca6deede8 Fix --config flag 2019-02-03 03:26:48 +01:00
Richard Patel
120c026983 Bump version to v1.2.0 2019-02-03 02:55:21 +01:00
Richard Patel
527e8895ec Support configuration without config file 2019-02-03 02:54:52 +01:00
Richard Patel
108fff0503 Add Travis CI badge 2019-02-03 02:09:06 +01:00
Richard Patel
e5746baa5b Switch to spf13/cobra
lul
2019-02-03 02:02:23 +01:00
Richard Patel
17ba5583c9 Add .travis.yml 2019-02-02 23:18:03 +01:00
Richard Patel
92a8c07f4a Add go.mod 2019-02-02 23:15:52 +01:00
Richard Patel
43f96c6988 Benchmark: Reference parser 2018-12-18 15:39:41 +01:00
Richard Patel
b244cdae80 Minor cleanup 2018-12-18 15:31:33 +01:00
Richard Patel
4b8275c7bf Add parser tests 2018-12-18 15:31:09 +01:00
Richard Patel
f90bf94a44 Bump version to v1.1.1 2018-11-27 22:11:57 +01:00
Richard Patel
e82768ff80 Wait time control in config 2018-11-27 22:11:57 +01:00
Richard Patel
b1bf59adef Add The Eye DB to README.md 2018-11-27 17:40:12 +01:00
Richard Patel
a2df2972f4 Bump the upload retry interval up to 30s 2018-11-20 04:13:20 +01:00
Richard Patel
3fc8837dd7 Add output files to .gitignore 2018-11-20 03:51:42 +01:00
Richard Patel
f9a0d6bffe Bump to v1.1.0 2018-11-20 03:46:36 +01:00
Richard Patel
4dbe2aef2b Add job buffer size parameter 2018-11-20 03:42:32 +01:00
Richard Patel
86ec78cae1 Add TCP timeout option 2018-11-20 03:29:10 +01:00
Richard Patel
b846498030 Delete URL queues after crawling 2018-11-20 03:05:43 +01:00
Richard Patel
4f3140a39f Fix queue_count in log 2018-11-20 02:49:03 +01:00
Richard Patel
85d2aac9d4 Performance patch 2018-11-20 02:33:50 +01:00
Richard Patel
b6c0a45900 Job queue disk offloading 2018-11-20 02:03:10 +01:00
Richard Patel
d332f06659 Limit retries to 10 2018-11-18 21:05:26 +01:00
24 changed files with 6173 additions and 212 deletions

3
.gitignore vendored
View File

@@ -1,3 +1,6 @@
/.idea/ /.idea/
.DS_Store .DS_Store
/od-database-crawler /od-database-crawler
*.log
/queue/
/crawled/

5
.travis.yml Normal file
View File

@@ -0,0 +1,5 @@
language: go
go:
- "1.11.x"
- master

View File

@@ -1,7 +1,24 @@
# od-database Go crawler 🚀 # od-database Go crawler 🚀
[![Build Status](https://travis-ci.org/terorie/od-database-crawler.svg?branch=master)](https://travis-ci.org/terorie/od-database-crawler)
> by terorie 2018 :P > by terorie 2018 :P
* Crawler for [__OD-Database__](https://github.com/simon987/od-database) * Crawler for [__OD-Database__](https://github.com/simon987/od-database)
* Crawls HTTP open directories (standard Web Server Listings) * Crawls HTTP open directories (standard Web Server Listings)
* Gets name, path, size and modification time of all files * Gets name, path, size and modification time of all files
* Lightweight and fast: __over 9000 requests per second__ on a standard laptop * Lightweight and fast: __over 9000 requests per second__ on a standard laptop
https://od-db.the-eye.eu/
#### Usage
1. With Config File (if `config.yml` found in working dir)
- Download [default config](https://github.com/terorie/od-database-crawler/blob/master/config.yml)
- Set `server.url` and `server.token`
- Start with `./od-database-crawler server --config <file>`
2. With Flags or env
- Override config file if it exists
- `--help` for list of flags
- Every flag is available as an environment variable:
`--server.crawl_stats` ➡️ `OD_SERVER_CRAWL_STATS`
- Start with `./od-database-crawler server <flags>`

121
config.go
View File

@@ -4,6 +4,7 @@ import (
"bufio" "bufio"
"fmt" "fmt"
"github.com/sirupsen/logrus" "github.com/sirupsen/logrus"
"github.com/spf13/pflag"
"github.com/spf13/viper" "github.com/spf13/viper"
"io" "io"
"os" "os"
@@ -20,25 +21,33 @@ var config struct {
Retries int Retries int
Workers int Workers int
UserAgent string UserAgent string
Timeout time.Duration
Tasks int32 Tasks int32
CrawlStats time.Duration
AllocStats time.Duration
Verbose bool Verbose bool
PrintHTTP bool PrintHTTP bool
JobBufferSize int
} }
var onlineMode bool
const ( const (
ConfServerUrl = "server.url" ConfServerUrl = "server.url"
ConfToken = "server.token" ConfToken = "server.token"
ConfServerTimeout = "server.timeout" ConfServerTimeout = "server.timeout"
ConfRecheck = "server.recheck" ConfRecheck = "server.recheck"
ConfCooldown = "server.cooldown"
ConfChunkSize = "server.upload_chunk" ConfChunkSize = "server.upload_chunk"
ConfUploadRetries = "server.upload_retries"
ConfUploadRetryInterval = "server.upload_retry_interval"
ConfTasks = "crawl.tasks" ConfTasks = "crawl.tasks"
ConfRetries = "crawl.retries" ConfRetries = "crawl.retries"
ConfWorkers = "crawl.connections" ConfWorkers = "crawl.connections"
ConfUserAgent = "crawl.user-agent" ConfUserAgent = "crawl.user-agent"
ConfDialTimeout = "crawl.dial_timeout"
ConfTimeout = "crawl.timeout" ConfTimeout = "crawl.timeout"
ConfJobBufferSize = "crawl.job_buffer"
ConfResume = "crawl.resume"
ConfCrawlStats = "output.crawl_stats" ConfCrawlStats = "output.crawl_stats"
ConfAllocStats = "output.resource_stats" ConfAllocStats = "output.resource_stats"
ConfVerbose = "output.verbose" ConfVerbose = "output.verbose"
@@ -47,29 +56,98 @@ const (
) )
func prepareConfig() { func prepareConfig() {
viper.SetDefault(ConfRetries, 5) pf := rootCmd.PersistentFlags()
viper.SetDefault(ConfWorkers, 2)
viper.SetDefault(ConfTasks, 3) pf.SortFlags = false
viper.SetDefault(ConfUserAgent, "") pf.StringVar(&configFile, "config", "", "Config file")
viper.SetDefault(ConfTimeout, 10 * time.Second) configFile = os.Getenv("OD_CONFIG")
viper.SetDefault(ConfCrawlStats, 3 * time.Second)
viper.SetDefault(ConfAllocStats, 0) pf.String(ConfServerUrl, "http://od-db.the-eye.eu/api", "OD-DB server URL")
viper.SetDefault(ConfVerbose, false)
viper.SetDefault(ConfPrintHTTP, false) pf.String(ConfToken, "", "OD-DB access token (env OD_SERVER_TOKEN)")
viper.SetDefault(ConfLogFile, "")
viper.SetDefault(ConfRecheck, 3 * time.Second) pf.Duration(ConfServerTimeout, 60 * time.Second, "OD-DB request timeout")
viper.SetDefault(ConfChunkSize, "1 MB")
pf.Duration(ConfRecheck, 1 * time.Second, "OD-DB: Poll interval for new jobs")
pf.Duration(ConfCooldown, 30 * time.Second, "OD-DB: Time to wait after a server-side error")
pf.String(ConfChunkSize, "1 MB", "OD-DB: Result upload chunk size")
pf.Uint(ConfUploadRetries, 10, "OD-DB: Max upload retries")
pf.Duration(ConfUploadRetryInterval, 30 * time.Second, "OD-DB: Time to wait between upload retries")
pf.Uint(ConfTasks, 100, "Crawler: Max concurrent tasks")
pf.Uint(ConfWorkers, 4, "Crawler: Connections per server")
pf.Uint(ConfRetries, 5, "Crawler: Request retries")
pf.Duration(ConfDialTimeout, 10 * time.Second, "Crawler: Handshake timeout")
pf.Duration(ConfTimeout, 30 * time.Second, "Crawler: Request timeout")
pf.String(ConfUserAgent, "Mozilla/5.0 (X11; od-database-crawler) Gecko/20100101 Firefox/52.0", "Crawler: User-Agent")
pf.Uint(ConfJobBufferSize, 5000, "Crawler: Task queue cache size")
pf.Duration(ConfResume, 72 * time.Hour, "Crawler: Resume tasks not older than x")
pf.Duration(ConfCrawlStats, time.Second, "Log: Crawl stats interval")
pf.Duration(ConfAllocStats, 10 * time.Second, "Log: Resource stats interval")
pf.Bool(ConfVerbose, false, "Log: Print every listed dir")
pf.Bool(ConfPrintHTTP, false, "Log: Print HTTP client errors")
pf.String(ConfLogFile, "crawler.log", "Log file")
// Bind all flags to Viper
pf.VisitAll(func(flag *pflag.Flag) {
s := flag.Name
s = strings.TrimLeft(s, "-")
if err := viper.BindPFlag(s, flag); err != nil {
panic(err)
}
var envKey string
envKey = strings.Replace(s, ".", "_", -1)
envKey = strings.ToUpper(envKey)
envKey = "OD_" + envKey
if err := viper.BindEnv(s, envKey); err != nil {
panic(err)
}
})
} }
func readConfig() { func readConfig() {
viper.AddConfigPath(".") // If config.yml in working dir, use it
viper.SetConfigName("config") if configFile == "" {
err := viper.ReadInConfig() _, err := os.Stat("config.yml")
if err == nil {
configFile = "config.yml"
}
}
if configFile != "" {
confF, err := os.Open(configFile)
if err != nil { if err != nil {
fmt.Fprintln(os.Stderr, err) fmt.Fprintln(os.Stderr, err)
os.Exit(1) os.Exit(1)
} }
defer confF.Close()
viper.SetConfigType("yml")
err = viper.ReadConfig(confF)
if err != nil {
fmt.Fprintln(os.Stderr, err)
os.Exit(1)
}
}
if onlineMode {
config.ServerUrl = viper.GetString(ConfServerUrl) config.ServerUrl = viper.GetString(ConfServerUrl)
if config.ServerUrl == "" { if config.ServerUrl == "" {
configMissing(ConfServerUrl) configMissing(ConfServerUrl)
@@ -80,6 +158,7 @@ func readConfig() {
if config.Token == "" { if config.Token == "" {
configMissing(ConfToken) configMissing(ConfToken)
} }
}
config.ServerTimeout = viper.GetDuration(ConfServerTimeout) config.ServerTimeout = viper.GetDuration(ConfServerTimeout)
@@ -107,11 +186,11 @@ func readConfig() {
config.UserAgent = viper.GetString(ConfUserAgent) config.UserAgent = viper.GetString(ConfUserAgent)
config.Timeout = viper.GetDuration(ConfTimeout) setDialTimeout(viper.GetDuration(ConfDialTimeout))
config.CrawlStats = viper.GetDuration(ConfCrawlStats) setTimeout(viper.GetDuration(ConfTimeout))
config.AllocStats = viper.GetDuration(ConfAllocStats) config.JobBufferSize = viper.GetInt(ConfJobBufferSize)
config.Verbose = viper.GetBool(ConfVerbose) config.Verbose = viper.GetBool(ConfVerbose)
if config.Verbose { if config.Verbose {

View File

@@ -15,10 +15,17 @@ server:
# between /task/get requests to the server. # between /task/get requests to the server.
recheck: 1s recheck: 1s
# Time to wait after receiving an error
# from the server. Doesn't apply to uploads.
cooldown: 30s
# Upload chunk size # Upload chunk size
# If the value is too high, the upload fails. # If the value is too high, the upload fails.
upload_chunk: 1 MB upload_chunk: 1 MB
upload_retries: 10
upload_retry_interval: 30s
# Log output settings # Log output settings
output: output:
# Crawl statistics # Crawl statistics
@@ -46,15 +53,32 @@ crawl:
# Please be careful with this setting! # Please be careful with this setting!
# The crawler fires fast and more than # The crawler fires fast and more than
# ten connections can overwhelm a server. # ten connections can overwhelm a server.
connections: 10 connections: 4
# How often to retry getting data # How often to retry getting data
# from the site before giving up # from the site before giving up
retries: 5 retries: 5
# Time before discarding a failed connection attempt
dial_timeout: 10s
# Time before discarding a network request # Time before discarding a network request
timeout: 10s timeout: 30s
# Crawler User-Agent # Crawler User-Agent
# If empty, no User-Agent header is sent. # If empty, no User-Agent header is sent.
user-agent: "Mozilla/5.0 (X11; od-database-crawler) Gecko/20100101 Firefox/52.0" user-agent: "Mozilla/5.0 (X11; od-database-crawler) Gecko/20100101 Firefox/52.0"
# Job buffer size (per task)
# Higher values cause less disk writes
# but require more memory.
#
# The job queue contains all URLs
# that should be crawled next.
# As it grows very large over time,
# it's kept mainly on disk.
# This sets how many jobs are kept
# in memory.
# A negative value will cause all jobs
# to be stored in memory. (Don't do this)
job_buffer: 5000

View File

@@ -8,6 +8,7 @@ import (
"github.com/valyala/fasthttp" "github.com/valyala/fasthttp"
"golang.org/x/crypto/blake2b" "golang.org/x/crypto/blake2b"
"golang.org/x/net/html" "golang.org/x/net/html"
"net"
"path" "path"
"strconv" "strconv"
"strings" "strings"
@@ -20,6 +21,17 @@ var client = fasthttp.Client {
}, },
} }
func setDialTimeout(d time.Duration) {
client.Dial = func(addr string) (net.Conn, error) {
return fasthttp.DialTimeout(addr, d)
}
}
func setTimeout(d time.Duration) {
client.ReadTimeout = d
client.WriteTimeout = d / 2
}
func GetDir(j *Job, f *File) (links []fasturl.URL, err error) { func GetDir(j *Job, f *File) (links []fasturl.URL, err error) {
f.IsDir = true f.IsDir = true
f.Name = path.Base(j.Uri.Path) f.Name = path.Base(j.Uri.Path)
@@ -33,7 +45,7 @@ func GetDir(j *Job, f *File) (links []fasturl.URL, err error) {
res := fasthttp.AcquireResponse() res := fasthttp.AcquireResponse()
defer fasthttp.ReleaseResponse(res) defer fasthttp.ReleaseResponse(res)
err = client.DoTimeout(req, res, config.Timeout) err = client.Do(req, res)
fasthttp.ReleaseRequest(req) fasthttp.ReleaseRequest(req)
if err != nil { if err != nil {
@@ -46,6 +58,10 @@ func GetDir(j *Job, f *File) (links []fasturl.URL, err error) {
} }
body := res.Body() body := res.Body()
return ParseDir(body, &j.Uri)
}
func ParseDir(body []byte, baseUri *fasturl.URL) (links []fasturl.URL, err error) {
doc := html.NewTokenizer(bytes.NewReader(body)) doc := html.NewTokenizer(bytes.NewReader(body))
var linkHref string var linkHref string
@@ -95,15 +111,15 @@ func GetDir(j *Job, f *File) (links []fasturl.URL, err error) {
} }
var link fasturl.URL var link fasturl.URL
err = j.Uri.ParseRel(&link, href) err = baseUri.ParseRel(&link, href)
if err != nil { if err != nil {
continue continue
} }
if link.Scheme != j.Uri.Scheme || if link.Scheme != baseUri.Scheme ||
link.Host != j.Uri.Host || link.Host != baseUri.Host ||
link.Path == j.Uri.Path || link.Path == baseUri.Path ||
!strings.HasPrefix(link.Path, j.Uri.Path) { !strings.HasPrefix(link.Path, baseUri.Path) {
continue continue
} }
@@ -132,7 +148,7 @@ func GetFile(u fasturl.URL, f *File) (err error) {
res.SkipBody = true res.SkipBody = true
defer fasthttp.ReleaseResponse(res) defer fasthttp.ReleaseResponse(res)
err = client.DoTimeout(req, res, config.Timeout) err = client.Do(req, res)
fasthttp.ReleaseRequest(req) fasthttp.ReleaseRequest(req)
if err != nil { if err != nil {

4766
crawl_apache2_test.go Normal file

File diff suppressed because it is too large Load Diff

117
crawl_nginx_test.go Normal file
View File

@@ -0,0 +1,117 @@
package main
import (
"github.com/terorie/od-database-crawler/fasturl"
"testing"
)
func TestParseDirNginx(t *testing.T) {
var u fasturl.URL
err := u.Parse("https://the-eye.eu/public/")
if err != nil {
t.Fatal("Failed to parse URL", err)
}
links, err := ParseDir([]byte(nginxListing), &u)
if err != nil {
t.Fatal("Failed to extract links", err)
}
if len(links) != len(nginxLinks) {
t.Fatalf("Expected %d links, got %d",
len(nginxLinks), len(links))
}
for i := 0; i < len(links); i++ {
gotLink := links[i].String()
expLink := nginxLinks[i]
if gotLink != expLink {
t.Errorf(`Expected "%s" got "%s"`,
expLink, gotLink)
}
}
}
var nginxLinks = []string {
"https://the-eye.eu/public/AppleArchive/",
"https://the-eye.eu/public/AudioBooks/",
"https://the-eye.eu/public/Books/",
"https://the-eye.eu/public/Comics/",
"https://the-eye.eu/public/Games/",
"https://the-eye.eu/public/Icons/",
"https://the-eye.eu/public/Images/",
"https://the-eye.eu/public/JFK_Files/",
"https://the-eye.eu/public/MSDN/",
"https://the-eye.eu/public/Music/",
"https://the-eye.eu/public/Operating%20Systems/",
"https://the-eye.eu/public/Posters/",
"https://the-eye.eu/public/Psychedelics/",
"https://the-eye.eu/public/Psychoactives/",
"https://the-eye.eu/public/Radio/",
"https://the-eye.eu/public/Random/",
"https://the-eye.eu/public/Site-Dumps/",
"https://the-eye.eu/public/Software/",
"https://the-eye.eu/public/Strategic%20Intelligence%20Network/",
"https://the-eye.eu/public/WorldTracker.org/",
"https://the-eye.eu/public/concen.org/",
"https://the-eye.eu/public/freenrg.info/",
"https://the-eye.eu/public/murdercube.com/",
"https://the-eye.eu/public/parazite/",
"https://the-eye.eu/public/ripreddit/",
"https://the-eye.eu/public/rom/",
"https://the-eye.eu/public/touhou/",
"https://the-eye.eu/public/vns/",
"https://the-eye.eu/public/xbins/",
"https://the-eye.eu/public/xbins.diodematrix/",
"https://the-eye.eu/public/Rclone_for_Scrubs.pdf",
"https://the-eye.eu/public/Wget_Linux_Guide.pdf",
"https://the-eye.eu/public/Wget_Windows_Guide.pdf",
"https://the-eye.eu/public/rclone_guide.pdf",
"https://the-eye.eu/public/wget-noobs-guide.pdf",
"https://the-eye.eu/public/xbox-scene_Aug2014.7z",
}
const nginxListing =
`<html>
<head><title>Index of /public/</title></head>
<body bgcolor="white">
<h1>Index of /public/</h1><hr><pre><a href="../">../</a>
<a href="AppleArchive/">AppleArchive/</a> 03-Nov-2017 18:13 -
<a href="AudioBooks/">AudioBooks/</a> 29-Sep-2018 19:47 -
<a href="Books/">Books/</a> 27-Nov-2018 17:50 -
<a href="Comics/">Comics/</a> 05-Nov-2018 21:37 -
<a href="Games/">Games/</a> 28-Nov-2018 11:54 -
<a href="Icons/">Icons/</a> 22-May-2018 07:47 -
<a href="Images/">Images/</a> 21-Jan-2018 03:21 -
<a href="JFK_Files/">JFK_Files/</a> 03-Nov-2017 17:03 -
<a href="MSDN/">MSDN/</a> 03-Nov-2017 15:48 -
<a href="Music/">Music/</a> 02-Mar-2018 15:47 -
<a href="Operating%20Systems/">Operating Systems/</a> 25-Apr-2018 07:18 -
<a href="Posters/">Posters/</a> 07-Jul-2018 01:12 -
<a href="Psychedelics/">Psychedelics/</a> 11-Apr-2018 05:45 -
<a href="Psychoactives/">Psychoactives/</a> 18-May-2018 02:58 -
<a href="Radio/">Radio/</a> 09-Jun-2018 15:49 -
<a href="Random/">Random/</a> 04-Dec-2018 12:33 -
<a href="Site-Dumps/">Site-Dumps/</a> 15-Dec-2018 11:04 -
<a href="Software/">Software/</a> 27-Nov-2017 00:22 -
<a href="Strategic%20Intelligence%20Network/">Strategic Intelligence Network/</a> 17-Nov-2017 16:35 -
<a href="WorldTracker.org/">WorldTracker.org/</a> 12-Apr-2018 04:16 -
<a href="concen.org/">concen.org/</a> 08-Oct-2018 14:08 -
<a href="freenrg.info/">freenrg.info/</a> 19-Dec-2017 10:59 -
<a href="murdercube.com/">murdercube.com/</a> 06-Dec-2017 10:45 -
<a href="parazite/">parazite/</a> 20-Nov-2017 21:25 -
<a href="ripreddit/">ripreddit/</a> 04-Aug-2018 14:30 -
<a href="rom/">rom/</a> 28-Nov-2018 14:15 -
<a href="touhou/">touhou/</a> 03-Nov-2017 11:07 -
<a href="vns/">vns/</a> 03-Nov-2017 11:36 -
<a href="xbins/">xbins/</a> 03-Nov-2017 17:23 -
<a href="xbins.diodematrix/">xbins.diodematrix/</a> 21-Sep-2018 22:33 -
<a href="Rclone_for_Scrubs.pdf">Rclone_for_Scrubs.pdf</a> 04-Sep-2018 13:31 315K
<a href="Wget_Linux_Guide.pdf">Wget_Linux_Guide.pdf</a> 21-Dec-2017 20:28 168K
<a href="Wget_Windows_Guide.pdf">Wget_Windows_Guide.pdf</a> 25-Nov-2017 17:59 867K
<a href="rclone_guide.pdf">rclone_guide.pdf</a> 03-Sep-2018 23:37 315K
<a href="wget-noobs-guide.pdf">wget-noobs-guide.pdf</a> 21-Dec-2017 20:29 168K
<a href="xbox-scene_Aug2014.7z">xbox-scene_Aug2014.7z</a> 26-Oct-2017 23:09 1G
</pre><hr></body>
</html>`

59
crawl_test.go Normal file
View File

@@ -0,0 +1,59 @@
package main
import (
"bytes"
"github.com/PuerkitoBio/goquery"
"github.com/terorie/od-database-crawler/fasturl"
"net/url"
"strings"
"testing"
)
func BenchmarkParseDir(b *testing.B) {
for n := 0; n < b.N; n++ {
var u fasturl.URL
err := u.Parse("http://archive.ubuntu.com/ubuntu/indices/")
if err != nil {
b.Fatal("Failed to parse URL", err)
}
_, err = ParseDir([]byte(apache2Listing), &u)
if err != nil {
b.Fatal("Failed to extract links", err)
}
}
}
func BenchmarkParseDirReference(b *testing.B) {
for n := 0; n < b.N; n++ {
u, err := url.Parse("http://archive.ubuntu.com/ubuntu/indices/")
if err != nil {
b.Fatal("Failed to parse URL", err)
}
_, err = referenceParseDir([]byte(apache2Listing), u)
if err != nil {
b.Fatal("Failed to extract links", err)
}
}
}
func referenceParseDir(body []byte, baseUri *url.URL) (links []*url.URL, err error) {
doc, err := goquery.NewDocumentFromReader(bytes.NewReader(body))
if err != nil { return nil, err }
doc.Find("a[href]").Each(func(i int, s *goquery.Selection) {
href, _ := s.Attr("href")
sub, err := baseUri.Parse(href)
if err != nil { return } // continue
if !strings.HasPrefix(sub.String(), baseUri.String()) {
return // continue
}
links = append(links, sub)
})
return
}

View File

@@ -15,7 +15,10 @@ package redblackhash
import ( import (
"bytes" "bytes"
"encoding/binary"
"encoding/hex"
"fmt" "fmt"
"io"
"sync" "sync"
) )
@@ -43,6 +46,13 @@ type Node struct {
Parent *Node Parent *Node
} }
type nodeHeader struct {
Key *Key
Color color
}
var o = binary.BigEndian
func (k *Key) Compare(o *Key) int { func (k *Key) Compare(o *Key) int {
return bytes.Compare(k[:], o[:]) return bytes.Compare(k[:], o[:])
} }
@@ -233,7 +243,7 @@ func (tree *Tree) String() string {
} }
func (node *Node) String() string { func (node *Node) String() string {
return fmt.Sprintf("%v", node.Key) return hex.EncodeToString(node.Key[:16]) + "..."
} }
func output(node *Node, prefix string, isTail bool, str *string) { func output(node *Node, prefix string, isTail bool, str *string) {
@@ -481,6 +491,119 @@ func (tree *Tree) deleteCase6(node *Node) {
} }
} }
func (tree *Tree) Marshal(w io.Writer) (err error) {
tree.Lock()
defer tree.Unlock()
err = binary.Write(w, o, uint64(0x617979797979790A))
if err != nil { return err }
err = marshal(tree.Root, w)
if err != nil { return err }
err = binary.Write(w, o, uint64(0x6C6D616F6F6F6F0A))
if err != nil { return err }
return nil
}
func marshal(n *Node, w io.Writer) (err error) {
if n == nil {
err = binary.Write(w, o, uint64(0x796565656565740A))
return err
}
err = binary.Write(w, o, uint64(0xF09F85B1EFB88F0A))
if err != nil { return err }
_, err = w.Write(n.Key[:])
if err != nil { return err }
var colorI uint64
if n.color {
colorI = 0x7468652D6579657C
} else {
colorI = 0x6865782B7465727C
}
err = binary.Write(w, o, colorI)
if err != nil { return err }
err = marshal(n.Left, w)
if err != nil { return err }
err = marshal(n.Right, w)
if err != nil { return err }
return nil
}
func (tree *Tree) Unmarshal(r io.Reader) (err error) {
tree.Lock()
defer tree.Unlock()
var sof uint64
err = binary.Read(r, o, &sof)
if err != nil { return err }
if sof != 0x617979797979790A {
return fmt.Errorf("redblack: wrong format")
}
tree.Root, tree.size, err = unmarshal(r)
if err != nil { return err }
var eof uint64
err = binary.Read(r, o, &eof)
if err != nil { return err }
if eof != 0x6C6D616F6F6F6F0A {
return fmt.Errorf("redblack: end of file missing")
}
return nil
}
func unmarshal(r io.Reader) (n *Node, size int, err error) {
var head uint64
err = binary.Read(r, o, &head)
if err != nil { return nil, 0, err }
size = 1
switch head {
case 0x796565656565740A:
return nil, 0, nil
case 0xF09F85B1EFB88F0A:
n = new(Node)
_, err = io.ReadFull(r, n.Key[:])
if err != nil { return nil, 0, err }
var colorInt uint64
err = binary.Read(r, o, &colorInt)
if err != nil { return nil, 0, err }
switch colorInt {
case 0x7468652D6579657C:
n.color = true
case 0x6865782B7465727C:
n.color = false
default:
return nil, 0, fmt.Errorf("redblack: corrupt node color")
}
default:
return nil, 0, fmt.Errorf("redblack: corrupt node info")
}
var s2 int
n.Left, s2, err = unmarshal(r)
size += s2
if err != nil { return nil, 0, err }
n.Right, s2, err = unmarshal(r)
size += s2
if err != nil { return nil, 0, err }
return n, size, nil
}
func nodeColor(node *Node) color { func nodeColor(node *Node) color {
if node == nil { if node == nil {
return black return black

View File

@@ -0,0 +1,47 @@
package redblackhash
import (
"bytes"
"math/rand"
"testing"
)
func TestTree_Marshal(t *testing.T) {
var t1, t2 Tree
// Generate 1000 random values to insert
for i := 0; i < 1000; i++ {
var key Key
rand.Read(key[:])
t1.Put(&key)
}
// Marshal tree
var wr bytes.Buffer
err := t1.Marshal(&wr)
if err != nil {
t.Error(err)
t.FailNow()
}
buf := wr.Bytes()
rd := bytes.NewBuffer(buf)
// Unmarshal tree
err = t2.Unmarshal(rd)
if err != nil {
t.Error(err)
t.FailNow()
}
if !compare(t1.Root, t2.Root) {
t.Error("trees are not equal")
t.FailNow()
}
}
func compare(n1, n2 *Node) bool {
return n1.Key.Compare(&n2.Key) == 0 &&
(n1.Left == nil || compare(n1.Left, n2.Left)) &&
(n1.Right == nil || compare(n1.Right, n2.Right))
}

View File

@@ -3,6 +3,8 @@ package main
import ( import (
"errors" "errors"
"fmt" "fmt"
"github.com/valyala/fasthttp"
"net"
) )
var ErrRateLimit = errors.New("too many requests") var ErrRateLimit = errors.New("too many requests")
@@ -15,3 +17,29 @@ type HttpError struct {
func (e HttpError) Error() string { func (e HttpError) Error() string {
return fmt.Sprintf("http status %d", e.code) return fmt.Sprintf("http status %d", e.code)
} }
func shouldRetry(err error) bool {
// HTTP errors
if httpErr, ok := err.(*HttpError); ok {
switch httpErr.code {
case fasthttp.StatusTooManyRequests:
return true
default:
// Don't retry HTTP error codes
return false
}
}
if dnsError, ok := err.(*net.DNSError); ok {
// Don't retry permanent DNS errors
return dnsError.IsTemporary
}
if netErr, ok := err.(*net.OpError); ok {
// Don't retry permanent network errors
return netErr.Temporary()
}
// Retry by default
return true
}

14
go.mod Normal file
View File

@@ -0,0 +1,14 @@
module github.com/syndtr/od-database-crawler
require (
github.com/beeker1121/goque v2.0.1+incompatible
github.com/golang/snappy v0.0.0-20180518054509-2e65f85255db // indirect
github.com/sirupsen/logrus v1.3.0
github.com/spf13/cobra v0.0.3
github.com/spf13/viper v1.3.1
github.com/syndtr/goleveldb v0.0.0-20181128100959-b001fa50d6b2 // indirect
github.com/terorie/od-database-crawler v1.1.1
github.com/valyala/fasthttp v1.1.0
golang.org/x/crypto v0.0.0-20190131182504-b8fe1690c613
golang.org/x/net v0.0.0-20180911220305-26e67e76b6c3
)

66
go.sum Normal file
View File

@@ -0,0 +1,66 @@
github.com/armon/consul-api v0.0.0-20180202201655-eb2c6b5be1b6/go.mod h1:grANhF5doyWs3UAsr3K4I6qtAmlQcZDesFNEHPZAzj8=
github.com/beeker1121/goque v2.0.1+incompatible h1:5nJHPMqQLxUvGFc8m/NW2QzxKyc0zICmqs/JUsmEjwE=
github.com/beeker1121/goque v2.0.1+incompatible/go.mod h1:L6dOWBhDOnxUVQsb0wkLve0VCnt2xJW/MI8pdRX4ANw=
github.com/coreos/etcd v3.3.10+incompatible/go.mod h1:uF7uidLiAD3TWHmW31ZFd/JWoc32PjwdhPthX9715RE=
github.com/coreos/go-etcd v2.0.0+incompatible/go.mod h1:Jez6KQU2B/sWsbdaef3ED8NzMklzPG4d5KIOhIy30Tk=
github.com/coreos/go-semver v0.2.0/go.mod h1:nnelYz7RCh+5ahJtPPxZlU+153eP4D4r3EedlOD2RNk=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/fsnotify/fsnotify v1.4.7 h1:IXs+QLmnXW2CcXuY+8Mzv/fWEsPGWxqefPtCP5CnV9I=
github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo=
github.com/golang/snappy v0.0.0-20180518054509-2e65f85255db h1:woRePGFeVFfLKN/pOkfl+p/TAqKOfFu+7KPlMVpok/w=
github.com/golang/snappy v0.0.0-20180518054509-2e65f85255db/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
github.com/hashicorp/hcl v1.0.0 h1:0Anlzjpi4vEasTeNFn2mLJgTSwt0+6sfsiTG8qcWGx4=
github.com/hashicorp/hcl v1.0.0/go.mod h1:E5yfLk+7swimpb2L/Alb/PJmXilQ/rhwaUYs4T20WEQ=
github.com/klauspost/compress v1.4.0 h1:8nsMz3tWa9SWWPL60G1V6CUsf4lLjWLTNEtibhe8gh8=
github.com/klauspost/compress v1.4.0/go.mod h1:RyIbtBH6LamlWaDj8nUwkbUhJ87Yi3uG0guNDohfE1A=
github.com/klauspost/cpuid v0.0.0-20180405133222-e7e905edc00e h1:+lIPJOWl+jSiJOc70QXJ07+2eg2Jy2EC7Mi11BWujeM=
github.com/klauspost/cpuid v0.0.0-20180405133222-e7e905edc00e/go.mod h1:Pj4uuM528wm8OyEC2QMXAi2YiTZ96dNQPGgoMS4s3ek=
github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
github.com/magiconair/properties v1.8.0 h1:LLgXmsheXeRoUOBOjtwPQCWIYqM/LU1ayDtDePerRcY=
github.com/magiconair/properties v1.8.0/go.mod h1:PppfXfuXeibc/6YijjN8zIbojt8czPbwD3XqdrwzmxQ=
github.com/mitchellh/mapstructure v1.1.2 h1:fmNYVwqnSfB9mZU6OS2O6GsXM+wcskZDuKQzvN1EDeE=
github.com/mitchellh/mapstructure v1.1.2/go.mod h1:FVVH3fgwuzCH5S8UJGiWEs2h04kUh9fWfEaFds41c1Y=
github.com/pelletier/go-toml v1.2.0 h1:T5zMGML61Wp+FlcbWjRDT7yAxhJNAiPPLOFECq181zc=
github.com/pelletier/go-toml v1.2.0/go.mod h1:5z9KED0ma1S8pY6P1sdut58dfprrGBbd/94hg7ilaic=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/sirupsen/logrus v1.3.0 h1:hI/7Q+DtNZ2kINb6qt/lS+IyXnHQe9e90POfeewL/ME=
github.com/sirupsen/logrus v1.3.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo=
github.com/spf13/afero v1.1.2 h1:m8/z1t7/fwjysjQRYbP0RD+bUIF/8tJwPdEZsI83ACI=
github.com/spf13/afero v1.1.2/go.mod h1:j4pytiNVoe2o6bmDsKpLACNPDBIoEAkihy7loJ1B0CQ=
github.com/spf13/cast v1.3.0 h1:oget//CVOEoFewqQxwr0Ej5yjygnqGkvggSE/gB35Q8=
github.com/spf13/cast v1.3.0/go.mod h1:Qx5cxh0v+4UWYiBimWS+eyWzqEqokIECu5etghLkUJE=
github.com/spf13/cobra v0.0.3 h1:ZlrZ4XsMRm04Fr5pSFxBgfND2EBVa1nLpiy1stUsX/8=
github.com/spf13/cobra v0.0.3/go.mod h1:1l0Ry5zgKvJasoi3XT1TypsSe7PqH0Sj9dhYf7v3XqQ=
github.com/spf13/jwalterweatherman v1.0.0 h1:XHEdyB+EcvlqZamSM4ZOMGlc93t6AcsBEu9Gc1vn7yk=
github.com/spf13/jwalterweatherman v1.0.0/go.mod h1:cQK4TGJAtQXfYWX+Ddv3mKDzgVb68N+wFjFa4jdeBTo=
github.com/spf13/pflag v1.0.3 h1:zPAT6CGy6wXeQ7NtTnaTerfKOsV6V6F8agHXFiazDkg=
github.com/spf13/pflag v1.0.3/go.mod h1:DYY7MBk1bdzusC3SYhjObp+wFpr4gzcvqqNjLnInEg4=
github.com/spf13/viper v1.3.1 h1:5+8j8FTpnFV4nEImW/ofkzEt8VoOiLXxdYIDsB73T38=
github.com/spf13/viper v1.3.1/go.mod h1:ZiWeW+zYFKm7srdB9IoDzzZXaJaI5eL9QjNiN/DMA2s=
github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
github.com/syndtr/goleveldb v0.0.0-20181128100959-b001fa50d6b2 h1:GnOzE5fEFN3b2zDhJJABEofdb51uMRNb8eqIVtdducs=
github.com/syndtr/goleveldb v0.0.0-20181128100959-b001fa50d6b2/go.mod h1:Z4AUp2Km+PwemOoO/VB5AOx9XSsIItzFjoJlOSiYmn0=
github.com/terorie/od-database-crawler v1.1.1 h1:Ca+ZqbZX3rVWBR8SDRzvroyxjBtUs75MQXZ9YG0gqGo=
github.com/terorie/od-database-crawler v1.1.1/go.mod h1:vVJ7pLkudrlUNp9qu24JCzQ8N6mFsrOmX1tPXr155DQ=
github.com/ugorji/go/codec v0.0.0-20181204163529-d75b2dcb6bc8/go.mod h1:VFNgLljTbGfSG7qAOspJ7OScBnGdDN/yBr0sguwnwf0=
github.com/valyala/bytebufferpool v1.0.0 h1:GqA5TC/0021Y/b9FG4Oi9Mr3q7XYx6KllzawFIhcdPw=
github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyCJ6HpOuEn7z0Csc=
github.com/valyala/fasthttp v1.1.0 h1:3BohG7mqwj4lq7PTX//7gLbUlzNvZSPmuHFnloXT0lw=
github.com/valyala/fasthttp v1.1.0/go.mod h1:4vX61m6KN+xDduDNwXrhIAVZaZaZiQ1luJk8LWSxF3s=
github.com/valyala/tcplisten v0.0.0-20161114210144-ceec8f93295a/go.mod h1:v3UYOV9WzVtRmSR+PDvWpU/qWl4Wa5LApYYX4ZtKbio=
github.com/xordataexchange/crypt v0.0.3-0.20170626215501-b2862e3d0a77/go.mod h1:aYKd//L2LvnjZzWKhF00oedf4jCCReLcmhLdhm1A27Q=
golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
golang.org/x/crypto v0.0.0-20181203042331-505ab145d0a9/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
golang.org/x/crypto v0.0.0-20190131182504-b8fe1690c613 h1:MQ/ZZiDsUapFFiMS+vzwXkCTeEKaum+Do5rINYJDmxc=
golang.org/x/crypto v0.0.0-20190131182504-b8fe1690c613/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
golang.org/x/net v0.0.0-20180911220305-26e67e76b6c3 h1:czFLhve3vsQetD6JOJ8NZZvGQIXlnN3/yXxbT6/awxI=
golang.org/x/net v0.0.0-20180911220305-26e67e76b6c3/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20181205085412-a5c9d58dba9a h1:1n5lsVfiQW3yfsRGu98756EH1YthsFqr/5mxHduZW2A=
golang.org/x/sys v0.0.0-20181205085412-a5c9d58dba9a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/text v0.3.0 h1:g61tztE5qeGQ89tm6NTjjM9VPIm088od1l6aSorWRWg=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v2 v2.2.2 h1:ZCJp+EgiOT7lHqUV2J862kp8Qj64Jo6az82+3Td9dZw=
gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=

15
help.go Normal file
View File

@@ -0,0 +1,15 @@
package main
const helpText =
`HTTP crawler for the OD-Database
DB >> https://od-db.the-eye.eu <<
Crawler >> https://github.com/terorie/od-database-crawler <<
Server >> https://github.com/simon987/od-database <<
Quick start:
- get config file (config.yml in working dir)
- get OD-DB server ("server.url": Database URL + /api)
- get access token ("server.token": e.g. c010b6dd-20...)
- ./od-database-crawler server
Questions? Discord @terorie#2664 / Telegram @terorie`

137
main.go
View File

@@ -2,72 +2,104 @@ package main
import ( import (
"context" "context"
"fmt"
"github.com/sirupsen/logrus" "github.com/sirupsen/logrus"
"github.com/spf13/cobra"
"github.com/spf13/viper"
"github.com/terorie/od-database-crawler/fasturl" "github.com/terorie/od-database-crawler/fasturl"
"github.com/urfave/cli"
"os" "os"
"os/signal"
"strings" "strings"
"sync/atomic" "sync/atomic"
"time" "time"
) )
var app = cli.App { var configFile string
Name: "od-database-crawler",
Usage: "OD-Database Go crawler", var rootCmd = cobra.Command {
Version: "1.0.2", Use: "od-database-crawler",
BashComplete: cli.DefaultAppComplete, Version: "1.2.1",
Writer: os.Stdout, Short: "OD-Database Go crawler",
Action: cmdBase, Long: helpText,
Commands: []cli.Command{ PersistentPreRunE: preRun,
{ PersistentPostRun: func(cmd *cobra.Command, args []string) {
Name: "crawl",
Usage: "Crawl a list of URLs",
ArgsUsage: "<site>",
Action: cmdCrawler,
},
},
After: func(i *cli.Context) error {
exitHooks.Execute() exitHooks.Execute()
return nil
}, },
} }
var serverCmd = cobra.Command {
Use: "server",
Short: "Start crawl server",
Long: "Connect to the OD-Database and contribute to the database\n" +
"by crawling the web for open directories!",
Run: cmdBase,
}
var crawlCmd = cobra.Command {
Use: "crawl",
Short: "Crawl an URL",
Long: "Crawl the URL specified.\n" +
"Results will not be uploaded to the database,\n" +
"they're saved under crawled/0.json instead.\n" +
"Primarily used for testing and benchmarking.",
RunE: cmdCrawler,
Args: cobra.ExactArgs(1),
}
var exitHooks Hooks var exitHooks Hooks
func init() { func init() {
rootCmd.AddCommand(&crawlCmd)
rootCmd.AddCommand(&serverCmd)
prepareConfig() prepareConfig()
} }
func main() { func preRun(cmd *cobra.Command, args []string) error {
err := os.MkdirAll("crawled", 0755) if err := os.MkdirAll("crawled", 0755);
if err != nil { err != nil { panic(err) }
panic(err)
}
readConfig() if err := os.MkdirAll("queue", 0755);
app.Run(os.Args) err != nil { panic(err) }
return nil
} }
func cmdBase(_ *cli.Context) error { func main() {
// TODO Graceful shutdown err := rootCmd.Execute()
appCtx := context.Background() if err != nil {
forceCtx := context.Background() fmt.Fprintln(os.Stderr, err)
os.Exit(1)
}
}
func cmdBase(_ *cobra.Command, _ []string) {
onlineMode = true
readConfig()
appCtx, soft := context.WithCancel(context.Background())
forceCtx, hard := context.WithCancel(context.Background())
go hardShutdown(forceCtx)
go listenCtrlC(soft, hard)
inRemotes := make(chan *OD) inRemotes := make(chan *OD)
go Schedule(forceCtx, inRemotes) go LoadResumeTasks(inRemotes)
go Schedule(appCtx, inRemotes)
ticker := time.NewTicker(config.Recheck) ticker := time.NewTicker(config.Recheck)
defer ticker.Stop() defer ticker.Stop()
for { for {
select { select {
case <-appCtx.Done(): case <-appCtx.Done():
return nil goto shutdown
case <-ticker.C: case <-ticker.C:
t, err := FetchTask() t, err := FetchTask()
if err != nil { if err != nil {
logrus.WithError(err). logrus.WithError(err).
Error("Failed to get new task") Error("Failed to get new task")
time.Sleep(30 * time.Second) if !sleep(viper.GetDuration(ConfCooldown), appCtx) {
goto shutdown
}
continue continue
} }
if t == nil { if t == nil {
@@ -94,22 +126,22 @@ func cmdBase(_ *cli.Context) error {
} else if err != nil { } else if err != nil {
logrus.WithError(err). logrus.WithError(err).
Error("Failed to get new task") Error("Failed to get new task")
time.Sleep(30 * time.Second) time.Sleep(viper.GetDuration(ConfCooldown))
continue continue
} }
ScheduleTask(inRemotes, t, &baseUri) ScheduleTask(inRemotes, t, &baseUri)
} }
} }
return nil shutdown:
globalWait.Wait()
} }
func cmdCrawler(clic *cli.Context) error { func cmdCrawler(_ *cobra.Command, args []string) error {
if clic.NArg() != 1 { onlineMode = false
cli.ShowCommandHelpAndExit(clic, "crawl", 1) readConfig()
}
arg := clic.Args()[0] arg := args[0]
// https://github.com/golang/go/issues/19779 // https://github.com/golang/go/issues/19779
if !strings.Contains(arg, "://") { if !strings.Contains(arg, "://") {
arg = "http://" + arg arg = "http://" + arg
@@ -141,3 +173,30 @@ func cmdCrawler(clic *cli.Context) error {
return nil return nil
} }
func listenCtrlC(soft, hard context.CancelFunc) {
c := make(chan os.Signal)
signal.Notify(c, os.Interrupt)
<-c
logrus.Info(">>> Shutting down crawler... <<<")
soft()
<-c
logrus.Warning(">>> Force shutdown! <<<")
hard()
}
func hardShutdown(c context.Context) {
<-c.Done()
os.Exit(1)
}
func sleep(d time.Duration, c context.Context) bool {
select {
case <-time.After(d):
return true
case <-c.Done():
return false
}
}

View File

@@ -3,7 +3,6 @@ package main
import ( import (
"github.com/terorie/od-database-crawler/ds/redblackhash" "github.com/terorie/od-database-crawler/ds/redblackhash"
"github.com/terorie/od-database-crawler/fasturl" "github.com/terorie/od-database-crawler/fasturl"
"sync"
"time" "time"
) )
@@ -23,7 +22,6 @@ type TaskResult struct {
} }
type Job struct { type Job struct {
OD *OD
Uri fasturl.URL Uri fasturl.URL
UriStr string UriStr string
Fails int Fails int
@@ -33,12 +31,19 @@ type Job struct {
type OD struct { type OD struct {
Task Task Task Task
Result TaskResult Result TaskResult
Wait sync.WaitGroup InProgress int64
BaseUri fasturl.URL BaseUri fasturl.URL
WCtx WorkerContext WCtx WorkerContext
Scanned redblackhash.Tree Scanned redblackhash.Tree
} }
type PausedOD struct {
Task *Task
Result *TaskResult
BaseUri *fasturl.URL
InProgress int64
}
type File struct { type File struct {
Name string `json:"name"` Name string `json:"name"`
Size int64 `json:"size"` Size int64 `json:"size"`
@@ -57,3 +62,8 @@ func (o *OD) LoadOrStoreKey(k *redblackhash.Key) (exists bool) {
o.Scanned.Put(k) o.Scanned.Put(k)
return false return false
} }
type errorString string
func (e errorString) Error() string {
return string(e)
}

129
queue.go Normal file
View File

@@ -0,0 +1,129 @@
package main
import (
"github.com/beeker1121/goque"
"os"
"sync"
"sync/atomic"
)
type BufferedQueue struct {
dataDir string
q *goque.Queue
buf []Job
m sync.Mutex
}
func OpenQueue(dataDir string) (bq *BufferedQueue, err error) {
bq = new(BufferedQueue)
if config.JobBufferSize < 0 {
return
}
bq.dataDir = dataDir
bq.q, err = goque.OpenQueue(dataDir)
if err != nil { return nil, err }
return
}
func (q *BufferedQueue) Enqueue(job *Job) error {
atomic.AddInt64(&totalQueued, 1)
if q.directEnqueue(job) {
return nil
}
var gob JobGob
gob.ToGob(job)
_, err := q.q.EnqueueObject(gob)
return err
}
func (q *BufferedQueue) Dequeue() (job Job, err error) {
if q.directDequeue(&job) {
atomic.AddInt64(&totalQueued, -1)
return job, nil
}
if config.JobBufferSize < 0 {
err = goque.ErrEmpty
return
}
var item *goque.Item
item, err = q.q.Dequeue()
if err != nil { return }
atomic.AddInt64(&totalQueued, -1)
var gob JobGob
err = item.ToObject(&gob)
if err != nil { return }
gob.FromGob(&job)
return
}
func (q *BufferedQueue) directEnqueue(job *Job) bool {
q.m.Lock()
defer q.m.Unlock()
bs := config.JobBufferSize
if len(q.buf) < bs || bs < 0 {
q.buf = append(q.buf, *job)
return true
} else {
return false
}
}
func (q *BufferedQueue) directDequeue(job *Job) bool {
q.m.Lock()
defer q.m.Unlock()
if len(q.buf) > 0 {
*job = q.buf[0]
q.buf = q.buf[1:]
return true
} else {
return false
}
}
// Always returns nil (But implements io.Closer)
func (q *BufferedQueue) Close() error {
if config.JobBufferSize < 0 {
return nil
}
// Close ignoring errors
q.q.Close()
// Delete files
if err := os.RemoveAll(q.dataDir);
err != nil { panic(err) }
return nil
}
type JobGob struct {
Uri string
Fails int
LastError string
}
func (g *JobGob) ToGob(j *Job) {
g.Uri = j.UriStr
g.Fails = j.Fails
if j.LastError != nil {
g.LastError = j.LastError.Error()
}
}
func (g *JobGob) FromGob(j *Job) {
if err := j.Uri.Parse(g.Uri);
err != nil { panic(err) }
j.UriStr = g.Uri
j.Fails = g.Fails
if g.LastError != "" {
j.LastError = errorString(g.LastError)
}
}

270
resume.go Normal file
View File

@@ -0,0 +1,270 @@
package main
import (
"bytes"
"encoding/binary"
"encoding/gob"
"fmt"
"github.com/beeker1121/goque"
"github.com/sirupsen/logrus"
"github.com/spf13/viper"
"io"
"os"
"path/filepath"
"strconv"
"sync/atomic"
"time"
)
func init() {
gob.Register(&PausedOD{})
}
func LoadResumeTasks(inRemotes chan<- *OD) {
resumed, err := ResumeTasks()
if err != nil {
logrus.WithError(err).
Error("Failed to resume queued tasks. " +
"/queue is probably corrupt")
err = nil
}
for _, remote := range resumed {
inRemotes <- remote
}
}
func ResumeTasks() (tasks []*OD, err error) {
// Get files in /queue
var queueF *os.File
var entries []os.FileInfo
queueF, err = os.Open("queue")
if err != nil { return nil, err }
defer queueF.Close()
entries, err = queueF.Readdir(-1)
if err != nil { return nil, err }
resumeDur := viper.GetDuration(ConfResume)
for _, entry := range entries {
if !entry.IsDir() { continue }
// Check if name is a number
var id uint64
if id, err = strconv.ParseUint(entry.Name(), 10, 64); err != nil {
continue
}
// Too old to be resumed
timeDelta := time.Since(entry.ModTime())
if resumeDur >= 0 && timeDelta > resumeDur {
removeOldQueue(id)
continue
}
// Load queue
var od *OD
if od, err = resumeQueue(id); err != nil {
logrus.WithError(err).
WithField("id", id).
Warning("Failed to load paused task")
continue
} else if od == nil {
removeOldQueue(id)
continue
}
tasks = append(tasks, od)
}
return tasks, nil
}
func SaveTask(od *OD) (err error) {
dir := filepath.Join("queue",
strconv.FormatUint(od.Task.WebsiteId, 10))
fPath := filepath.Join(dir, "PAUSED")
err = os.Mkdir(dir, 0777)
if err != nil { return err }
// Open pause file
pausedF, err := os.OpenFile(fPath, os.O_CREATE | os.O_WRONLY | os.O_TRUNC, 0666)
if err != nil { return err }
defer pausedF.Close()
err = writePauseFile(od, pausedF)
if err != nil { return err }
return nil
}
func resumeQueue(id uint64) (od *OD, err error) {
logrus.WithField("id", id).
Info("Found unfinished")
fPath := filepath.Join("queue", strconv.FormatUint(id, 10))
// Try to find pause file
pausedF, err := os.Open(filepath.Join(fPath, "PAUSED"))
if os.IsNotExist(err) {
// No PAUSED file => not paused
// not paused => no error
return nil, nil
} else if err != nil {
return nil, err
}
defer pausedF.Close()
od = new(OD)
od.WCtx.OD = od
err = readPauseFile(od, pausedF)
if err != nil { return nil, err }
// Open queue
bq, err := OpenQueue(fPath)
if err != nil { return nil, err }
od.WCtx.Queue = bq
logrus.WithField("id", id).
Info("Resuming task")
return od, nil
}
func removeOldQueue(id uint64) {
if id == 0 {
// TODO Make custom crawl less of an ugly hack
return
}
logrus.WithField("id", id).
Warning("Deleting & returning old task")
name := strconv.FormatUint(id, 10)
fPath := filepath.Join("queue", name)
// Acquire old queue
q, err := goque.OpenQueue(fPath)
if err != nil {
// Queue lock exists, don't delete
logrus.WithField("err", err).
WithField("path", fPath).
Error("Failed to acquire old task")
return
}
// Delete old queue from disk
err = q.Drop()
if err != nil {
// Queue lock exists, don't delete
logrus.WithField("err", err).
WithField("path", fPath).
Error("Failed to delete old task")
return
}
// Delete old crawl result from disk
_ = os.Remove(filepath.Join("crawled", name + ".json"))
// Return task to server
if err := CancelTask(id); err != nil {
// Queue lock exists, don't delete
logrus.WithField("err", err).
WithField("id", id).
Warning("Failed to return unfinished task to server")
return
}
}
func writePauseFile(od *OD, w io.Writer) (err error) {
// Write pause file version
_, err = w.Write([]byte("ODPAUSE-"))
if err != nil { return err }
// Create save state
paused := PausedOD {
Task: &od.Task,
Result: &od.Result,
BaseUri: &od.BaseUri,
InProgress: atomic.LoadInt64(&od.InProgress),
}
// Prepare pause settings
var b bytes.Buffer
pauseEnc := gob.NewEncoder(&b)
err = pauseEnc.Encode(&paused)
if err != nil { return err }
// Write length of pause settings
err = binary.Write(w, binary.LittleEndian, uint64(b.Len()))
if err != nil { return err }
// Write pause settings
_, err = w.Write(b.Bytes())
if err != nil { return err }
// Write pause scan state
err = od.Scanned.Marshal(w)
if err != nil { return err }
// Save mark
_, err = w.Write([]byte("--------"))
if err != nil { return err }
return nil
}
func readPauseFile(od *OD, r io.Reader) (err error) {
// Make the paused struct point to OD fields
// So gob loads values into the OD struct
paused := PausedOD {
Task: &od.Task,
Result: &od.Result,
BaseUri: &od.BaseUri,
}
var version [8]byte
_, err = io.ReadFull(r, version[:])
if err != nil { return err }
if !bytes.Equal(version[:], []byte("ODPAUSE-")) {
return fmt.Errorf("unsupported pause file")
}
// Read pause settings len
var pauseSettingsLen uint64
err = binary.Read(r, binary.LittleEndian, &pauseSettingsLen)
// Read pause settings
pauseDec := gob.NewDecoder(io.LimitReader(r, int64(pauseSettingsLen)))
err = pauseDec.Decode(&paused)
if err != nil { return err }
atomic.StoreInt64(&od.InProgress, paused.InProgress)
err = readPauseStateTree(od, r)
if err != nil {
return fmt.Errorf("failed to read state tree: %s", err)
}
return nil
}
func readPauseStateTree(od *OD, r io.Reader) (err error) {
// Read pause scan state
err = od.Scanned.Unmarshal(r)
if err != nil { return err }
// Check mark
var mark [8]byte
_, err = io.ReadFull(r, mark[:])
if err != nil { return err }
if !bytes.Equal(mark[:], []byte("--------")) {
return fmt.Errorf("corrupt pause file")
}
return nil
}

48
resume_test.go Normal file
View File

@@ -0,0 +1,48 @@
package main
import (
"bytes"
"github.com/terorie/od-database-crawler/fasturl"
"testing"
"time"
)
func TestResumeTasks_Empty(t *testing.T) {
start := time.Now().Add(-1 * time.Minute)
od := OD {
Task: Task {
WebsiteId: 213,
Url: "https://the-eye.eu/public/",
},
Result: TaskResult {
StartTime: start,
StartTimeUnix: start.Unix(),
EndTimeUnix: time.Now().Unix(),
WebsiteId: 213,
},
InProgress: 0,
BaseUri: fasturl.URL {
Scheme: fasturl.SchemeHTTPS,
Host: "the-eye.eu",
Path: "/public/",
},
}
od.WCtx.OD = &od
var b bytes.Buffer
var err error
err = writePauseFile(&od, &b)
if err != nil {
t.Fatal(err)
}
buf := b.Bytes()
var od2 OD
b2 := bytes.NewBuffer(buf)
err = readPauseFile(&od2, b2)
if err != nil {
t.Fatal(err)
}
}

View File

@@ -16,28 +16,45 @@ import (
var activeTasksLock sync.Mutex var activeTasksLock sync.Mutex
var activeTasks = make(map[uint64]bool) var activeTasks = make(map[uint64]bool)
var numActiveTasks int32 var numActiveTasks int32
var totalBuffered int64 var totalQueued int64
func Schedule(c context.Context, remotes <-chan *OD) { func Schedule(c context.Context, remotes <-chan *OD) {
go Stats(c) go Stats(c)
for remote := range remotes { for remote := range remotes {
if !scheduleNewTask(c, remote) {
return
}
}
}
func scheduleNewTask(c context.Context, remote *OD) bool {
logrus.WithField("url", remote.BaseUri.String()). logrus.WithField("url", remote.BaseUri.String()).
Info("Starting crawler") Info("Starting crawler")
// Collect results // Collect results
results := make(chan File) results := make(chan File)
remote.WCtx.OD = remote
// Get queue path
queuePath := path.Join("queue", fmt.Sprintf("%d", remote.Task.WebsiteId))
// Delete existing queue
if err := os.RemoveAll(queuePath);
err != nil { panic(err) }
// Start new queue
var err error
remote.WCtx.Queue, err = OpenQueue(queuePath)
if err != nil { panic(err) }
// Spawn workers // Spawn workers
remote.WCtx.in, remote.WCtx.out = makeJobBuffer(c) remote.WCtx.SpawnWorkers(c, results, config.Workers)
for i := 0; i < config.Workers; i++ {
go remote.WCtx.Worker(results)
}
// Enqueue initial job // Enqueue initial job
atomic.AddInt32(&numActiveTasks, 1) atomic.AddInt32(&numActiveTasks, 1)
remote.WCtx.queueJob(Job{ remote.WCtx.queueJob(Job{
OD: remote,
Uri: remote.BaseUri, Uri: remote.BaseUri,
UriStr: remote.BaseUri.String(), UriStr: remote.BaseUri.String(),
Fails: 0, Fails: 0,
@@ -48,14 +65,12 @@ func Schedule(c context.Context, remotes <-chan *OD) {
// Sleep if max number of tasks are active // Sleep if max number of tasks are active
for atomic.LoadInt32(&numActiveTasks) > config.Tasks { for atomic.LoadInt32(&numActiveTasks) > config.Tasks {
select { if !sleep(time.Second, c) {
case <-c.Done(): break
return
case <-time.After(time.Second):
continue
}
} }
} }
return true
} }
func ScheduleTask(remotes chan<- *OD, t *Task, u *fasturl.URL) { func ScheduleTask(remotes chan<- *OD, t *Task, u *fasturl.URL) {
@@ -105,7 +120,7 @@ func (o *OD) Watch(results chan File) {
// Open crawl results file // Open crawl results file
f, err := os.OpenFile( f, err := os.OpenFile(
filePath, filePath,
os.O_CREATE | os.O_RDWR | os.O_TRUNC, os.O_CREATE | os.O_RDWR | os.O_APPEND,
0644, 0644,
) )
if err != nil { if err != nil {
@@ -147,12 +162,33 @@ func (o *OD) handleCollect(results chan File, f *os.File, collectErrC chan error
defer close(results) defer close(results)
// Wait for all jobs on remote to finish // Wait for all jobs on remote to finish
o.Wait.Wait() for {
close(o.WCtx.in) // Natural finish
atomic.AddInt32(&numActiveTasks, -1) if atomic.LoadInt64(&o.InProgress) == 0 {
o.onTaskFinished()
return
}
// Abort
if atomic.LoadInt32(&o.WCtx.aborted) != 0 {
// Wait for all workers to finish
o.WCtx.workers.Wait()
o.onTaskPaused()
return
}
time.Sleep(500 * time.Millisecond)
}
}
func (o *OD) onTaskFinished() {
defer atomic.AddInt32(&numActiveTasks, -1)
// Close queue
if err := o.WCtx.Queue.Close(); err != nil {
panic(err)
}
// Log finish // Log finish
logrus.WithFields(logrus.Fields{ logrus.WithFields(logrus.Fields{
"id": o.Task.WebsiteId, "id": o.Task.WebsiteId,
"url": o.BaseUri.String(), "url": o.BaseUri.String(),
@@ -175,6 +211,37 @@ func (o *OD) handleCollect(results chan File, f *os.File, collectErrC chan error
} }
} }
func (o *OD) onTaskPaused() {
defer atomic.AddInt32(&numActiveTasks, -1)
// Close queue
if err := o.WCtx.Queue.Close(); err != nil {
panic(err)
}
// Set current end time
o.Result.EndTimeUnix = time.Now().Unix()
// Save task metadata
err := SaveTask(o)
if err != nil {
// Log finish
logrus.WithFields(logrus.Fields{
"err": err.Error(),
"id": o.Task.WebsiteId,
"url": o.BaseUri.String(),
}).Error("Failed to save crawler state")
return
}
// Log finish
logrus.WithFields(logrus.Fields{
"id": o.Task.WebsiteId,
"url": o.BaseUri.String(),
"duration": time.Since(o.Result.StartTime),
}).Info("Crawler paused")
}
func (t *Task) Collect(results chan File, f *os.File, errC chan<- error) { func (t *Task) Collect(results chan File, f *os.File, errC chan<- error) {
err := t.collect(results, f) err := t.collect(results, f)
if err != nil { if err != nil {
@@ -198,51 +265,3 @@ func (t *Task) collect(results chan File, f *os.File) error {
return nil return nil
} }
func makeJobBuffer(c context.Context) (chan<- Job, <-chan Job) {
in := make(chan Job)
out := make(chan Job)
go bufferJobs(c, in, out)
return in, out
}
func bufferJobs(c context.Context, in chan Job, out chan Job) {
defer close(out)
var inQueue []Job
outCh := func() chan Job {
if len(inQueue) == 0 {
return nil
}
return out
}
for len(inQueue) > 0 || in != nil {
if len(inQueue) == 0 {
select {
case v, ok := <-in:
if !ok {
in = nil
} else {
atomic.AddInt64(&totalBuffered, 1)
inQueue = append(inQueue, v)
}
case <-c.Done():
return
}
} else {
select {
case v, ok := <-in:
if !ok {
in = nil
} else {
atomic.AddInt64(&totalBuffered, 1)
inQueue = append(inQueue, v)
}
case outCh() <- inQueue[0]:
atomic.AddInt64(&totalBuffered, -1)
inQueue = inQueue[1:]
case <-c.Done():
return
}
}
}
}

View File

@@ -5,6 +5,7 @@ import (
"encoding/json" "encoding/json"
"fmt" "fmt"
"github.com/sirupsen/logrus" "github.com/sirupsen/logrus"
"github.com/spf13/viper"
"io" "io"
"mime/multipart" "mime/multipart"
"net/http" "net/http"
@@ -16,8 +17,11 @@ import (
var serverClient = http.Client { var serverClient = http.Client {
Timeout: config.ServerTimeout, Timeout: config.ServerTimeout,
Transport: new(ServerTripper),
} }
var serverUserAgent = "od-database-crawler/" + rootCmd.Version
func FetchTask() (t *Task, err error) { func FetchTask() (t *Task, err error) {
res, err := serverClient.PostForm( res, err := serverClient.PostForm(
config.ServerUrl + "/task/get", config.ServerUrl + "/task/get",
@@ -102,11 +106,10 @@ func uploadChunks(websiteId uint64, f *os.File) error {
multi.Close() multi.Close()
for retried := false; true; retried = true { for retries := 0; retries < viper.GetInt(ConfUploadRetries); retries++ {
err = nil if retries > 0 {
if retried {
// Error occurred, retry upload // Error occurred, retry upload
time.Sleep(5 * time.Second) time.Sleep(viper.GetDuration(ConfUploadRetryInterval))
} }
req, err := http.NewRequest( req, err := http.NewRequest(
@@ -176,3 +179,10 @@ func CancelTask(websiteId uint64) (err error) {
return return
} }
type ServerTripper struct{}
func (t *ServerTripper) RoundTrip(req *http.Request) (res *http.Response, err error) {
req.Header.Set("User-Agent", serverUserAgent)
return http.DefaultTransport.RoundTrip(req)
}

View File

@@ -3,6 +3,7 @@ package main
import ( import (
"context" "context"
"github.com/sirupsen/logrus" "github.com/sirupsen/logrus"
"github.com/spf13/viper"
"math" "math"
"runtime" "runtime"
"sync/atomic" "sync/atomic"
@@ -19,11 +20,14 @@ func Stats(c context.Context) {
var crawlTicker <-chan time.Time var crawlTicker <-chan time.Time
var allocTicker <-chan time.Time var allocTicker <-chan time.Time
if config.CrawlStats != 0 { crawlInterval := viper.GetDuration(ConfCrawlStats)
crawlTicker = time.NewTicker(config.CrawlStats).C allocInterval := viper.GetDuration(ConfAllocStats)
if crawlInterval != 0 {
crawlTicker = time.Tick(crawlInterval)
} }
if config.AllocStats != 0 { if allocInterval != 0 {
allocTicker = time.NewTicker(config.AllocStats).C allocTicker = time.Tick(allocInterval)
} }
for { for {
@@ -32,7 +36,7 @@ func Stats(c context.Context) {
startedNow := atomic.LoadUint64(&totalStarted) startedNow := atomic.LoadUint64(&totalStarted)
perSecond := float64(startedNow - startedLast) / perSecond := float64(startedNow - startedLast) /
config.CrawlStats.Seconds() crawlInterval.Seconds()
// Round to .5 // Round to .5
perSecond *= 2 perSecond *= 2
@@ -57,7 +61,7 @@ func Stats(c context.Context) {
runtime.ReadMemStats(&mem) runtime.ReadMemStats(&mem)
logrus.WithFields(logrus.Fields{ logrus.WithFields(logrus.Fields{
"queue_count": atomic.LoadInt64(&totalBuffered), "queue_count": atomic.LoadInt64(&totalQueued),
"heap": FormatByteCount(mem.Alloc), "heap": FormatByteCount(mem.Alloc),
"objects": mem.HeapObjects, "objects": mem.HeapObjects,
"num_gc": mem.NumGC, "num_gc": mem.NumGC,

View File

@@ -1,8 +1,9 @@
package main package main
import ( import (
"context"
"github.com/beeker1121/goque"
"github.com/sirupsen/logrus" "github.com/sirupsen/logrus"
"github.com/valyala/fasthttp"
"math" "math"
"sort" "sort"
"strings" "strings"
@@ -14,24 +15,57 @@ import (
var globalWait sync.WaitGroup var globalWait sync.WaitGroup
type WorkerContext struct { type WorkerContext struct {
in chan<- Job OD *OD
out <-chan Job Queue *BufferedQueue
lastRateLimit time.Time lastRateLimit time.Time
numRateLimits int numRateLimits int
workers sync.WaitGroup
aborted int32
} }
func (w WorkerContext) Worker(results chan<- File) { func (w *WorkerContext) SpawnWorkers(c context.Context, results chan<- File, n int) {
for job := range w.out { w.workers.Add(n)
w.step(results, job) for i := 0; i < n; i++ {
go w.Worker(c, results)
} }
} }
func (w WorkerContext) step(results chan<- File, job Job) { func (w *WorkerContext) Worker(c context.Context, results chan<- File) {
defer w.finishJob(&job) defer w.workers.Done()
for {
select {
case <-c.Done():
// Not yet done
atomic.StoreInt32(&w.aborted, 1)
return
default:
}
job, err := w.Queue.Dequeue()
switch err {
case goque.ErrEmpty:
time.Sleep(500 * time.Millisecond)
continue
case goque.ErrDBClosed:
return
case nil:
w.step(results, job)
default:
panic(err)
}
}
}
func (w *WorkerContext) step(results chan<- File, job Job) {
defer w.finishJob()
var f File var f File
newJobs, err := DoJob(&job, &f) newJobs, err := w.DoJob(&job, &f)
atomic.AddUint64(&totalStarted, 1) atomic.AddUint64(&totalStarted, 1)
if err == ErrKnown { if err == ErrKnown {
return return
@@ -40,15 +74,13 @@ func (w WorkerContext) step(results chan<- File, job Job) {
if err != nil { if err != nil {
job.Fails++ job.Fails++
if httpErr, ok := err.(*HttpError); ok { if !shouldRetry(err) {
switch httpErr.code { atomic.AddUint64(&totalAborted, 1)
case fasthttp.StatusTooManyRequests: logrus.WithField("url", job.UriStr).
err = ErrRateLimit WithError(err).
default: Error("Giving up after failure")
// Don't retry HTTP error codes
return return
} }
}
if job.Fails > config.Retries { if job.Fails > config.Retries {
atomic.AddUint64(&totalAborted, 1) atomic.AddUint64(&totalAborted, 1)
@@ -75,7 +107,7 @@ func (w WorkerContext) step(results chan<- File, job Job) {
} }
} }
func DoJob(job *Job, f *File) (newJobs []Job, err error) { func (w *WorkerContext) DoJob(job *Job, f *File) (newJobs []Job, err error) {
if len(job.Uri.Path) == 0 { return } if len(job.Uri.Path) == 0 { return }
if job.Uri.Path[len(job.Uri.Path)-1] == '/' { if job.Uri.Path[len(job.Uri.Path)-1] == '/' {
// Load directory // Load directory
@@ -93,7 +125,7 @@ func DoJob(job *Job, f *File) (newJobs []Job, err error) {
hash := f.HashDir(links) hash := f.HashDir(links)
// Skip symlinked dirs // Skip symlinked dirs
if job.OD.LoadOrStoreKey(&hash) { if w.OD.LoadOrStoreKey(&hash) {
return nil, ErrKnown return nil, ErrKnown
} }
@@ -114,7 +146,6 @@ func DoJob(job *Job, f *File) (newJobs []Job, err error) {
lastLink = uriStr lastLink = uriStr
newJobs = append(newJobs, Job{ newJobs = append(newJobs, Job{
OD: job.OD,
Uri: link, Uri: link,
UriStr: uriStr, UriStr: uriStr,
Fails: 0, Fails: 0,
@@ -139,13 +170,13 @@ func DoJob(job *Job, f *File) (newJobs []Job, err error) {
} }
return nil, err return nil, err
} }
atomic.AddUint64(&job.OD.Result.FileCount, 1) atomic.AddUint64(&w.OD.Result.FileCount, 1)
} }
return return
} }
func (w WorkerContext) queueJob(job Job) { func (w *WorkerContext) queueJob(job Job) {
job.OD.Wait.Add(1) atomic.AddInt64(&w.OD.InProgress, 1)
if w.numRateLimits > 0 { if w.numRateLimits > 0 {
if time.Since(w.lastRateLimit) > 5 * time.Second { if time.Since(w.lastRateLimit) > 5 * time.Second {
@@ -156,11 +187,13 @@ func (w WorkerContext) queueJob(job Job) {
} }
} }
w.in <- job if err := w.Queue.Enqueue(&job); err != nil {
panic(err)
}
} }
func (w WorkerContext) finishJob(job *Job) { func (w *WorkerContext) finishJob() {
job.OD.Wait.Done() atomic.AddInt64(&w.OD.InProgress, -1)
} }
func isErrSilent(err error) bool { func isErrSilent(err error) bool {