57 Commits

Author SHA1 Message Date
simon987
a962c60b82 Don't panic on file upload error 2019-04-06 14:56:22 -04:00
simon987
24f0bd91f7 Remove debug messages & don't use disk queue by default 2019-04-06 12:21:35 -04:00
simon987
84c10e1981 Change default config 2019-04-05 17:53:00 -04:00
simon987
860fa79327 Jenkins setup 2019-04-04 19:22:14 -04:00
simon987
76bc8293d6 minimum viable 2019-03-30 09:02:55 -04:00
simon987
3470be6086 More work on task_tracker integration 2019-03-09 16:38:04 -05:00
Richard Patel
60471a081e Switch to simon987/task_tracker 2019-02-28 23:51:26 +01:00
dependabot[bot]
0b3f0d87fe Upgrade fasthttp to 1.2.0
Bumps [github.com/valyala/fasthttp](https://github.com/valyala/fasthttp) from 1.1.0 to 1.2.0.
- [Release notes](https://github.com/valyala/fasthttp/releases)
- [Commits](https://github.com/valyala/fasthttp/compare/v1.1.0...v1.2.0)

Thanks bot

Signed-off-by: dependabot[bot] <support@dependabot.com>
2019-02-28 22:42:40 +01:00
terorie
da9c75e392 Reduce Docker image size 2019-02-22 21:37:04 +01:00
Pascal
8947e05d0c Fix Dockerfile
Fixes #22
Credit to @pascaldulieu
2019-02-22 21:11:55 +01:00
Richard Patel
8c5f99d616 More descriptive error if /task/get returns invalid JSON 2019-02-22 20:17:59 +01:00
Richard Patel
206ea0e91d Simplify config 2019-02-22 18:50:35 +01:00
Richard Patel
8b9d8bfd17 Fix README.md format 2019-02-22 06:04:10 +01:00
Richard Patel
c9ff102d80 Fix Dockerfile 2019-02-22 06:00:57 +01:00
Richard Patel
88856c1c19 Flag explanation in README.md 2019-02-22 05:59:59 +01:00
Richard Patel
9e9b606250 Merge branch 'stable' 2019-02-22 05:37:52 +01:00
Richard Patel
326e29e5e4 Reset to stable branch 2019-02-22 05:37:45 +01:00
Richard Patel
c2acd5463f Restore .travis.yml
Now handling auto-build over Docker Hub directly
2019-02-22 05:16:25 +01:00
Richard Patel
e4d04e6a5f go.mod: Fix package path
lol
2019-02-22 05:10:43 +01:00
terorie
9f1402e841 New Dockerfile and Travis Config (#23) 2019-02-22 05:07:27 +01:00
terorie
7c8ab50ee4 Merge stable into master 2019-02-13 15:32:40 +01:00
terorie
281d2d17d6 Update config.yml 2019-02-13 15:32:00 +01:00
Richard Patel
45cbd4d535 Disable resume feature 2019-02-05 15:44:59 +01:00
Richard Patel
771d49f2dd Fix WaitGroup deadlock 2019-02-03 17:14:20 +01:00
Richard Patel
dbd787aa81 Fix WaitGroup crash 2019-02-03 17:09:43 +01:00
Richard Patel
cea6c1658b Bugfix: Don't schedule new tasks during shutdown 2019-02-03 17:02:44 +01:00
terorie
885af5bb3b Beta task resuming 2019-02-03 16:50:08 +01:00
Richard Patel
b18b70f798 Fix segfault (thanks Pikami) 2019-02-03 14:00:17 +01:00
Richard Patel
9d5f549774 Better server User-Agent string 2019-02-03 12:23:21 +01:00
Richard Patel
5239af08f7 Bump version to v1.2.1 2019-02-03 03:36:39 +01:00
Richard Patel
46c0e0bd32 Smarter HTTP error handling 2019-02-03 03:35:09 +01:00
Richard Patel
0ca6deede8 Fix --config flag 2019-02-03 03:26:48 +01:00
Richard Patel
120c026983 Bump version to v1.2.0 2019-02-03 02:55:21 +01:00
Richard Patel
527e8895ec Support configuration without config file 2019-02-03 02:54:52 +01:00
Richard Patel
108fff0503 Add Travis CI badge 2019-02-03 02:09:06 +01:00
Richard Patel
e5746baa5b Switch to spf13/cobra
lul
2019-02-03 02:02:23 +01:00
Richard Patel
17ba5583c9 Add .travis.yml 2019-02-02 23:18:03 +01:00
Richard Patel
92a8c07f4a Add go.mod 2019-02-02 23:15:52 +01:00
Richard Patel
43f96c6988 Benchmark: Reference parser 2018-12-18 15:39:41 +01:00
Richard Patel
b244cdae80 Minor cleanup 2018-12-18 15:31:33 +01:00
Richard Patel
4b8275c7bf Add parser tests 2018-12-18 15:31:09 +01:00
Richard Patel
f90bf94a44 Bump version to v1.1.1 2018-11-27 22:11:57 +01:00
Richard Patel
e82768ff80 Wait time control in config 2018-11-27 22:11:57 +01:00
Richard Patel
b1bf59adef Add The Eye DB to README.md 2018-11-27 17:40:12 +01:00
Richard Patel
a2df2972f4 Bump the upload retry interval up to 30s 2018-11-20 04:13:20 +01:00
Richard Patel
3fc8837dd7 Add output files to .gitignore 2018-11-20 03:51:42 +01:00
Richard Patel
f9a0d6bffe Bump to v1.1.0 2018-11-20 03:46:36 +01:00
Richard Patel
4dbe2aef2b Add job buffer size parameter 2018-11-20 03:42:32 +01:00
Richard Patel
86ec78cae1 Add TCP timeout option 2018-11-20 03:29:10 +01:00
Richard Patel
b846498030 Delete URL queues after crawling 2018-11-20 03:05:43 +01:00
Richard Patel
4f3140a39f Fix queue_count in log 2018-11-20 02:49:03 +01:00
Richard Patel
85d2aac9d4 Performance patch 2018-11-20 02:33:50 +01:00
Richard Patel
b6c0a45900 Job queue disk offloading 2018-11-20 02:03:10 +01:00
Richard Patel
d332f06659 Limit retries to 10 2018-11-18 21:05:26 +01:00
Richard Patel
1625d6c888 Bump to v1.0.2 2018-11-18 18:53:57 +01:00
Richard Patel
03a487f393 Fix crawl loop 2018-11-18 18:45:06 +01:00
Richard Patel
ac8221b109 Retry /task/upload 2018-11-18 18:33:26 +01:00
21 changed files with 5891 additions and 338 deletions

5
.gitignore vendored
View File

@@ -1,3 +1,6 @@
/.idea/
.DS_Store
/od-database-crawler
/od-database-crawler
*.log
/queue/
/crawled/

5
.travis.yml Normal file
View File

@@ -0,0 +1,5 @@
language: go
go:
- "1.11.x"
- master

15
Dockerfile Normal file
View File

@@ -0,0 +1,15 @@
FROM golang:alpine as builder
ADD . /go/src/github.com/terorie/od-database-crawler
RUN apk add git \
&& go get -d -v github.com/terorie/od-database-crawler \
&& CGO_ENABLED=0 go install -a \
-installsuffix cgo \
-ldflags="-s -w" \
github.com/terorie/od-database-crawler
FROM scratch
COPY --from=builder /etc/ssl/certs/ca-certificates.crt /etc/ssl/certs/
COPY --from=builder /go/bin/od-database-crawler /bin/
WORKDIR /oddb
VOLUME [ "/oddb" ]
CMD ["/bin/od-database-crawler", "server"]

View File

@@ -1,7 +1,54 @@
# od-database Go crawler 🚀
[![Build Status](https://travis-ci.org/terorie/od-database-crawler.svg?branch=master)](https://travis-ci.org/terorie/od-database-crawler)
> by terorie 2018 :P
* Crawler for [__OD-Database__](https://github.com/simon987/od-database)
* Crawls HTTP open directories (standard Web Server Listings)
* Gets name, path, size and modification time of all files
* Lightweight and fast: __over 9000 requests per second__ on a standard laptop
https://od-db.the-eye.eu/
## Usage
### Deploys
1. With Config File (if `config.yml` found in working dir)
- Download [default config](https://github.com/terorie/od-database-crawler/blob/master/config.yml)
- Set `server.url` and `server.token`
- Start with `./od-database-crawler server --config <file>`
2. With Flags or env
- Override config file if it exists
- `--help` for list of flags
- Every flag is available as an environment variable:
`--server.crawl_stats` ➡️ `OD_SERVER_CRAWL_STATS`
- Start with `./od-database-crawler server <flags>`
3. With Docker
```bash
docker run \
-e OD_SERVER_URL=xxx \
-e OD_SERVER_TOKEN=xxx \
terorie/od-database-crawler
```
### Flag reference
Here are the most important config flags. For more fine control, take a look at `/config.yml`.
| Flag/Environment | Description | Example |
| ------------------------------------------------------- | ------------------------------------------------------------ | ----------------------------------- |
| `server.url`<br />`OD_SERVER_URL` | OD-DB Server URL | `https://od-db.mine.the-eye.eu/api` |
| `server.token`<br />`OD_SERVER_TOKEN` | OD-DB Server Access Token | _Ask Hexa **TM**_ |
| `server.recheck`<br />`OD_SERVER_RECHECK` | Job Fetching Interval | `3s` |
| `output.crawl_stats`<br />`OD_OUTPUT_CRAWL_STATS` | Crawl Stats Logging Interval (0 = disabled) | `500ms` |
| `output.resource_stats`<br />`OD_OUTPUT_RESORUCE_STATS` | Resource Stats Logging Interval (0 = disabled) | `8s` |
| `output.log`<br />`OD_OUTPUT_LOG` | Log File (none = disabled) | `crawler.log` |
| `crawl.tasks`<br />`OD_CRAWL_TASKS` | Max number of sites to crawl concurrently | `500` |
| `crawl.connections`<br />`OD_CRAWL_CONNECTIONS` | HTTP connections per site | `1` |
| `crawl.retries`<br />`OD_CRAWL_RETRIES` | How often to retry after a temporary failure (e.g. `HTTP 429` or timeouts) | `5` |
| `crawl.dial_timeout`<br />`OD_CRAWL_DIAL_TIMEOUT` | TCP Connect timeout | `5s` |
| `crawl.timeout`<br />`OD_CRAWL_TIMEOUT` | HTTP request timeout | `20s` |
| `crawl.user-agent`<br />`OD_CRAWL_USER_AGENT` | HTTP Crawler User-Agent | `googlebot/1.2.3` |
| `crawl.job_buffer`<br />`OD_CRAWL_JOB_BUFFER` | Number of URLs to keep in memory/cache, per job. The rest is offloaded to disk. Decrease this value if the crawler uses too much RAM. (0 = Disable Cache, -1 = Only use Cache) | `5000` |

202
config.go
View File

@@ -4,6 +4,7 @@ import (
"bufio"
"fmt"
"github.com/sirupsen/logrus"
"github.com/spf13/pflag"
"github.com/spf13/viper"
"io"
"os"
@@ -12,33 +13,46 @@ import (
)
var config struct {
ServerUrl string
Token string
ServerTimeout time.Duration
Recheck time.Duration
ChunkSize int64
Retries int
Workers int
UserAgent string
Timeout time.Duration
Tasks int32
CrawlStats time.Duration
AllocStats time.Duration
Verbose bool
PrintHTTP bool
TrackerUrl string
TrackerProject int
TrackerAlias string
WsBucketScheme string
WsBucketHost string
ServerTimeout time.Duration
Recheck time.Duration
ChunkSize int64
Retries int
Workers int
UserAgent string
Tasks int32
Verbose bool
PrintHTTP bool
JobBufferSize int
}
var onlineMode bool
const (
ConfServerUrl = "server.url"
ConfToken = "server.token"
ConfServerTimeout = "server.timeout"
ConfRecheck = "server.recheck"
ConfChunkSize = "server.upload_chunk"
ConfTasks = "crawl.tasks"
ConfRetries = "crawl.retries"
ConfWorkers = "crawl.connections"
ConfUserAgent = "crawl.user-agent"
ConfTimeout = "crawl.timeout"
ConfTrackerUrl = "server.url"
ConfTrackerProject = "server.project"
ConfTrackerAlias = "server.alias"
ConfWsBucketScheme = "server.ws_bucket_scheme"
ConfWsBucketHost = "server.ws_bucket_host"
ConfServerTimeout = "server.timeout"
ConfRecheck = "server.recheck"
ConfCooldown = "server.cooldown"
ConfChunkSize = "server.upload_chunk"
ConfUploadRetries = "server.upload_retries"
ConfUploadRetryInterval = "server.upload_retry_interval"
ConfTasks = "crawl.tasks"
ConfRetries = "crawl.retries"
ConfWorkers = "crawl.connections"
ConfUserAgent = "crawl.user-agent"
ConfDialTimeout = "crawl.dial_timeout"
ConfTimeout = "crawl.timeout"
ConfJobBufferSize = "crawl.job_buffer"
ConfCrawlStats = "output.crawl_stats"
ConfAllocStats = "output.resource_stats"
ConfVerbose = "output.verbose"
@@ -47,40 +61,116 @@ const (
)
func prepareConfig() {
viper.SetDefault(ConfRetries, 5)
viper.SetDefault(ConfWorkers, 2)
viper.SetDefault(ConfTasks, 3)
viper.SetDefault(ConfUserAgent, "")
viper.SetDefault(ConfTimeout, 10 * time.Second)
viper.SetDefault(ConfCrawlStats, 3 * time.Second)
viper.SetDefault(ConfAllocStats, 0)
viper.SetDefault(ConfVerbose, false)
viper.SetDefault(ConfPrintHTTP, false)
viper.SetDefault(ConfLogFile, "")
viper.SetDefault(ConfRecheck, 3 * time.Second)
viper.SetDefault(ConfChunkSize, "1 MB")
pf := rootCmd.PersistentFlags()
pf.SortFlags = false
pf.StringVar(&configFile, "config", "", "Config file")
configFile = os.Getenv("OD_CONFIG")
pf.String(ConfTrackerUrl, "https://tt.the-eye.eu/api", "task_tracker api URL")
pf.String(ConfTrackerProject, "1", "task_tracker project id")
pf.String(ConfWsBucketScheme, "wss", "ws_bucket scheme")
pf.String(ConfWsBucketHost, "wsb.the-eye.eu", "ws_bucket host")
pf.String(ConfTrackerAlias, "changeme", "task_tracker worker alias")
pf.Duration(ConfServerTimeout, 60*time.Second, "OD-DB request timeout")
pf.Duration(ConfRecheck, 1*time.Second, "OD-DB: Poll interval for new jobs")
pf.Duration(ConfCooldown, 1*time.Minute, "OD-DB: Time to wait after a server-side error")
pf.String(ConfChunkSize, "1 MB", "OD-DB: Result upload chunk size")
pf.Uint(ConfUploadRetries, 10, "OD-DB: Max upload retries")
pf.Duration(ConfUploadRetryInterval, 30*time.Second, "OD-DB: Time to wait between upload retries")
pf.Uint(ConfTasks, 25, "Crawler: Max concurrent tasks")
pf.Uint(ConfWorkers, 1, "Crawler: Connections per server")
pf.Uint(ConfRetries, 5, "Crawler: Request retries")
pf.Duration(ConfDialTimeout, 10*time.Second, "Crawler: Handshake timeout")
pf.Duration(ConfTimeout, 30*time.Second, "Crawler: Request timeout")
pf.String(ConfUserAgent, "Mozilla/5.0 (X11; od-database-crawler) Gecko/20100101 Firefox/52.0", "Crawler: User-Agent")
pf.Int(ConfJobBufferSize, -1, "Crawler: Task queue cache size")
pf.Duration(ConfCrawlStats, 500*time.Second, "Log: Crawl stats interval")
pf.Duration(ConfAllocStats, 500*time.Second, "Log: Resource stats interval")
pf.Bool(ConfVerbose, false, "Log: Print every listed dir")
pf.Bool(ConfPrintHTTP, false, "Log: Print HTTP client errors")
pf.String(ConfLogFile, "crawler.log", "Log file")
// Bind all flags to Viper
pf.VisitAll(func(flag *pflag.Flag) {
s := flag.Name
s = strings.TrimLeft(s, "-")
if err := viper.BindPFlag(s, flag); err != nil {
panic(err)
}
var envKey string
envKey = strings.Replace(s, ".", "_", -1)
envKey = strings.ToUpper(envKey)
envKey = "OD_" + envKey
if err := viper.BindEnv(s, envKey); err != nil {
panic(err)
}
})
}
func readConfig() {
viper.AddConfigPath(".")
viper.SetConfigName("config")
err := viper.ReadInConfig()
if err != nil {
fmt.Fprintln(os.Stderr, err)
os.Exit(1)
// If config.yml in working dir, use it
if configFile == "" {
_, err := os.Stat("config.yml")
if err == nil {
configFile = "config.yml"
}
}
config.ServerUrl = viper.GetString(ConfServerUrl)
if config.ServerUrl == "" {
configMissing(ConfServerUrl)
}
config.ServerUrl = strings.TrimRight(config.ServerUrl, "/")
if configFile != "" {
confF, err := os.Open(configFile)
if err != nil {
fmt.Fprintln(os.Stderr, err)
os.Exit(1)
}
defer confF.Close()
config.Token = viper.GetString(ConfToken)
if config.Token == "" {
configMissing(ConfToken)
viper.SetConfigType("yml")
err = viper.ReadConfig(confF)
if err != nil {
fmt.Fprintln(os.Stderr, err)
os.Exit(1)
}
}
if onlineMode {
config.TrackerUrl = viper.GetString(ConfTrackerUrl)
if config.TrackerUrl == "" {
configMissing(ConfTrackerUrl)
}
config.TrackerUrl = strings.TrimRight(config.TrackerUrl, "/")
}
config.TrackerProject = viper.GetInt(ConfTrackerProject)
config.TrackerAlias = viper.GetString(ConfTrackerAlias)
config.WsBucketHost = viper.GetString(ConfWsBucketHost)
config.WsBucketScheme = viper.GetString(ConfWsBucketScheme)
config.ServerTimeout = viper.GetDuration(ConfServerTimeout)
config.Recheck = viper.GetDuration(ConfRecheck)
@@ -107,11 +197,11 @@ func readConfig() {
config.UserAgent = viper.GetString(ConfUserAgent)
config.Timeout = viper.GetDuration(ConfTimeout)
setDialTimeout(viper.GetDuration(ConfDialTimeout))
config.CrawlStats = viper.GetDuration(ConfCrawlStats)
setTimeout(viper.GetDuration(ConfTimeout))
config.AllocStats = viper.GetDuration(ConfAllocStats)
config.JobBufferSize = viper.GetInt(ConfJobBufferSize)
config.Verbose = viper.GetBool(ConfVerbose)
if config.Verbose {
@@ -119,9 +209,11 @@ func readConfig() {
}
if filePath := viper.GetString(ConfLogFile); filePath != "" {
f, err := os.OpenFile(filePath, os.O_CREATE | os.O_WRONLY | os.O_APPEND, 0644)
f, err := os.OpenFile(filePath, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0644)
bufWriter := bufio.NewWriter(f)
if err != nil { panic(err) }
if err != nil {
panic(err)
}
exitHooks.Add(func() {
bufWriter.Flush()
f.Close()

View File

@@ -1,10 +1,14 @@
# OD-Database server settings
server:
# Connection URL
url: http://od-db.mine.terorie.com/api
# Server auth token
token:
url: https://tt.the-eye.eu/api
# OD-Database project id (for crawling)
project: 1
# Your worker alias
alias: changeme
# Websocket bucket host & scheme (ws/wss)
ws_bucket_host: https://wsb.the-eye.eu
ws_bucket_scheme: wss
# Request timeout
timeout: 60s
@@ -15,17 +19,20 @@ server:
# between /task/get requests to the server.
recheck: 1s
# Upload chunk size
# If the value is too high, the upload fails.
upload_chunk: 1 MB
# Time to wait after receiving an error
# from the server. Doesn't apply to uploads.
cooldown: 1s
upload_retries: 10
upload_retry_interval: 30s
# Log output settings
output:
# Crawl statistics
crawl_stats: 1s
crawl_stats: 1m
# CPU/RAM/Job queue stats
resource_stats: 10s
resource_stats: 1m
# More output? (Every listed dir)
verbose: false
@@ -40,21 +47,38 @@ output:
# Crawler settings
crawl:
# Number of sites that can be processed at once
tasks: 100
tasks: 25
# Number of connections per site
# Please be careful with this setting!
# The crawler fires fast and more than
# ten connections can overwhelm a server.
connections: 10
connections: 1
# How often to retry getting data
# from the site before giving up
retries: 5
# Time before discarding a failed connection attempt
dial_timeout: 10s
# Time before discarding a network request
timeout: 10s
timeout: 30s
# Crawler User-Agent
# If empty, no User-Agent header is sent.
user-agent: "Mozilla/5.0 (X11; od-database-crawler) Gecko/20100101 Firefox/52.0"
# Job buffer size (per task)
# Higher values cause less disk writes
# but require more memory.
#
# The job queue contains all URLs
# that should be crawled next.
# As it grows very large over time,
# it's kept mainly on disk.
# This sets how many jobs are kept
# in memory.
# A negative value will cause all jobs
# to be stored in memory. (Don't do this)
job_buffer: -1

View File

@@ -8,6 +8,7 @@ import (
"github.com/valyala/fasthttp"
"golang.org/x/crypto/blake2b"
"golang.org/x/net/html"
"net"
"path"
"strconv"
"strings"
@@ -20,6 +21,17 @@ var client = fasthttp.Client {
},
}
func setDialTimeout(d time.Duration) {
client.Dial = func(addr string) (net.Conn, error) {
return fasthttp.DialTimeout(addr, d)
}
}
func setTimeout(d time.Duration) {
client.ReadTimeout = d
client.WriteTimeout = d / 2
}
func GetDir(j *Job, f *File) (links []fasturl.URL, err error) {
f.IsDir = true
f.Name = path.Base(j.Uri.Path)
@@ -33,7 +45,7 @@ func GetDir(j *Job, f *File) (links []fasturl.URL, err error) {
res := fasthttp.AcquireResponse()
defer fasthttp.ReleaseResponse(res)
err = client.DoTimeout(req, res, config.Timeout)
err = client.Do(req, res)
fasthttp.ReleaseRequest(req)
if err != nil {
@@ -46,10 +58,16 @@ func GetDir(j *Job, f *File) (links []fasturl.URL, err error) {
}
body := res.Body()
return ParseDir(body, &j.Uri)
}
func ParseDir(body []byte, baseUri *fasturl.URL) (links []fasturl.URL, err error) {
doc := html.NewTokenizer(bytes.NewReader(body))
var linkHref string
for {
err = nil
tokenType := doc.Next()
if tokenType == html.ErrorToken {
break
@@ -80,36 +98,34 @@ func GetDir(j *Job, f *File) (links []fasturl.URL, err error) {
linkHref = ""
if strings.LastIndexByte(href, '?') != -1 {
goto nextToken
continue
}
switch href {
case "", " ", ".", "..", "/":
goto nextToken
continue
}
if strings.Contains(href, "../") {
goto nextToken
continue
}
var link fasturl.URL
err = j.Uri.ParseRel(&link, href)
err = baseUri.ParseRel(&link, href)
if err != nil {
continue
}
if link.Scheme != j.Uri.Scheme ||
link.Host != j.Uri.Host ||
link.Path == j.Uri.Path ||
!strings.HasPrefix(link.Path, j.Uri.Path) {
if link.Scheme != baseUri.Scheme ||
link.Host != baseUri.Host ||
link.Path == baseUri.Path ||
!strings.HasPrefix(link.Path, baseUri.Path) {
continue
}
links = append(links, link)
}
}
nextToken:
}
return
@@ -132,7 +148,7 @@ func GetFile(u fasturl.URL, f *File) (err error) {
res.SkipBody = true
defer fasthttp.ReleaseResponse(res)
err = client.DoTimeout(req, res, config.Timeout)
err = client.Do(req, res)
fasthttp.ReleaseRequest(req)
if err != nil {

4766
crawl_apache2_test.go Normal file

File diff suppressed because it is too large Load Diff

117
crawl_nginx_test.go Normal file
View File

@@ -0,0 +1,117 @@
package main
import (
"github.com/terorie/od-database-crawler/fasturl"
"testing"
)
func TestParseDirNginx(t *testing.T) {
var u fasturl.URL
err := u.Parse("https://the-eye.eu/public/")
if err != nil {
t.Fatal("Failed to parse URL", err)
}
links, err := ParseDir([]byte(nginxListing), &u)
if err != nil {
t.Fatal("Failed to extract links", err)
}
if len(links) != len(nginxLinks) {
t.Fatalf("Expected %d links, got %d",
len(nginxLinks), len(links))
}
for i := 0; i < len(links); i++ {
gotLink := links[i].String()
expLink := nginxLinks[i]
if gotLink != expLink {
t.Errorf(`Expected "%s" got "%s"`,
expLink, gotLink)
}
}
}
var nginxLinks = []string {
"https://the-eye.eu/public/AppleArchive/",
"https://the-eye.eu/public/AudioBooks/",
"https://the-eye.eu/public/Books/",
"https://the-eye.eu/public/Comics/",
"https://the-eye.eu/public/Games/",
"https://the-eye.eu/public/Icons/",
"https://the-eye.eu/public/Images/",
"https://the-eye.eu/public/JFK_Files/",
"https://the-eye.eu/public/MSDN/",
"https://the-eye.eu/public/Music/",
"https://the-eye.eu/public/Operating%20Systems/",
"https://the-eye.eu/public/Posters/",
"https://the-eye.eu/public/Psychedelics/",
"https://the-eye.eu/public/Psychoactives/",
"https://the-eye.eu/public/Radio/",
"https://the-eye.eu/public/Random/",
"https://the-eye.eu/public/Site-Dumps/",
"https://the-eye.eu/public/Software/",
"https://the-eye.eu/public/Strategic%20Intelligence%20Network/",
"https://the-eye.eu/public/WorldTracker.org/",
"https://the-eye.eu/public/concen.org/",
"https://the-eye.eu/public/freenrg.info/",
"https://the-eye.eu/public/murdercube.com/",
"https://the-eye.eu/public/parazite/",
"https://the-eye.eu/public/ripreddit/",
"https://the-eye.eu/public/rom/",
"https://the-eye.eu/public/touhou/",
"https://the-eye.eu/public/vns/",
"https://the-eye.eu/public/xbins/",
"https://the-eye.eu/public/xbins.diodematrix/",
"https://the-eye.eu/public/Rclone_for_Scrubs.pdf",
"https://the-eye.eu/public/Wget_Linux_Guide.pdf",
"https://the-eye.eu/public/Wget_Windows_Guide.pdf",
"https://the-eye.eu/public/rclone_guide.pdf",
"https://the-eye.eu/public/wget-noobs-guide.pdf",
"https://the-eye.eu/public/xbox-scene_Aug2014.7z",
}
const nginxListing =
`<html>
<head><title>Index of /public/</title></head>
<body bgcolor="white">
<h1>Index of /public/</h1><hr><pre><a href="../">../</a>
<a href="AppleArchive/">AppleArchive/</a> 03-Nov-2017 18:13 -
<a href="AudioBooks/">AudioBooks/</a> 29-Sep-2018 19:47 -
<a href="Books/">Books/</a> 27-Nov-2018 17:50 -
<a href="Comics/">Comics/</a> 05-Nov-2018 21:37 -
<a href="Games/">Games/</a> 28-Nov-2018 11:54 -
<a href="Icons/">Icons/</a> 22-May-2018 07:47 -
<a href="Images/">Images/</a> 21-Jan-2018 03:21 -
<a href="JFK_Files/">JFK_Files/</a> 03-Nov-2017 17:03 -
<a href="MSDN/">MSDN/</a> 03-Nov-2017 15:48 -
<a href="Music/">Music/</a> 02-Mar-2018 15:47 -
<a href="Operating%20Systems/">Operating Systems/</a> 25-Apr-2018 07:18 -
<a href="Posters/">Posters/</a> 07-Jul-2018 01:12 -
<a href="Psychedelics/">Psychedelics/</a> 11-Apr-2018 05:45 -
<a href="Psychoactives/">Psychoactives/</a> 18-May-2018 02:58 -
<a href="Radio/">Radio/</a> 09-Jun-2018 15:49 -
<a href="Random/">Random/</a> 04-Dec-2018 12:33 -
<a href="Site-Dumps/">Site-Dumps/</a> 15-Dec-2018 11:04 -
<a href="Software/">Software/</a> 27-Nov-2017 00:22 -
<a href="Strategic%20Intelligence%20Network/">Strategic Intelligence Network/</a> 17-Nov-2017 16:35 -
<a href="WorldTracker.org/">WorldTracker.org/</a> 12-Apr-2018 04:16 -
<a href="concen.org/">concen.org/</a> 08-Oct-2018 14:08 -
<a href="freenrg.info/">freenrg.info/</a> 19-Dec-2017 10:59 -
<a href="murdercube.com/">murdercube.com/</a> 06-Dec-2017 10:45 -
<a href="parazite/">parazite/</a> 20-Nov-2017 21:25 -
<a href="ripreddit/">ripreddit/</a> 04-Aug-2018 14:30 -
<a href="rom/">rom/</a> 28-Nov-2018 14:15 -
<a href="touhou/">touhou/</a> 03-Nov-2017 11:07 -
<a href="vns/">vns/</a> 03-Nov-2017 11:36 -
<a href="xbins/">xbins/</a> 03-Nov-2017 17:23 -
<a href="xbins.diodematrix/">xbins.diodematrix/</a> 21-Sep-2018 22:33 -
<a href="Rclone_for_Scrubs.pdf">Rclone_for_Scrubs.pdf</a> 04-Sep-2018 13:31 315K
<a href="Wget_Linux_Guide.pdf">Wget_Linux_Guide.pdf</a> 21-Dec-2017 20:28 168K
<a href="Wget_Windows_Guide.pdf">Wget_Windows_Guide.pdf</a> 25-Nov-2017 17:59 867K
<a href="rclone_guide.pdf">rclone_guide.pdf</a> 03-Sep-2018 23:37 315K
<a href="wget-noobs-guide.pdf">wget-noobs-guide.pdf</a> 21-Dec-2017 20:29 168K
<a href="xbox-scene_Aug2014.7z">xbox-scene_Aug2014.7z</a> 26-Oct-2017 23:09 1G
</pre><hr></body>
</html>`

59
crawl_test.go Normal file
View File

@@ -0,0 +1,59 @@
package main
import (
"bytes"
"github.com/PuerkitoBio/goquery"
"github.com/terorie/od-database-crawler/fasturl"
"net/url"
"strings"
"testing"
)
func BenchmarkParseDir(b *testing.B) {
for n := 0; n < b.N; n++ {
var u fasturl.URL
err := u.Parse("http://archive.ubuntu.com/ubuntu/indices/")
if err != nil {
b.Fatal("Failed to parse URL", err)
}
_, err = ParseDir([]byte(apache2Listing), &u)
if err != nil {
b.Fatal("Failed to extract links", err)
}
}
}
func BenchmarkParseDirReference(b *testing.B) {
for n := 0; n < b.N; n++ {
u, err := url.Parse("http://archive.ubuntu.com/ubuntu/indices/")
if err != nil {
b.Fatal("Failed to parse URL", err)
}
_, err = referenceParseDir([]byte(apache2Listing), u)
if err != nil {
b.Fatal("Failed to extract links", err)
}
}
}
func referenceParseDir(body []byte, baseUri *url.URL) (links []*url.URL, err error) {
doc, err := goquery.NewDocumentFromReader(bytes.NewReader(body))
if err != nil { return nil, err }
doc.Find("a[href]").Each(func(i int, s *goquery.Selection) {
href, _ := s.Attr("href")
sub, err := baseUri.Parse(href)
if err != nil { return } // continue
if !strings.HasPrefix(sub.String(), baseUri.String()) {
return // continue
}
links = append(links, sub)
})
return
}

View File

@@ -3,6 +3,8 @@ package main
import (
"errors"
"fmt"
"github.com/valyala/fasthttp"
"net"
)
var ErrRateLimit = errors.New("too many requests")
@@ -15,3 +17,29 @@ type HttpError struct {
func (e HttpError) Error() string {
return fmt.Sprintf("http status %d", e.code)
}
func shouldRetry(err error) bool {
// HTTP errors
if httpErr, ok := err.(*HttpError); ok {
switch httpErr.code {
case fasthttp.StatusTooManyRequests:
return true
default:
// Don't retry HTTP error codes
return false
}
}
if dnsError, ok := err.(*net.DNSError); ok {
// Don't retry permanent DNS errors
return dnsError.IsTemporary
}
if netErr, ok := err.(*net.OpError); ok {
// Don't retry permanent network errors
return netErr.Temporary()
}
// Retry by default
return true
}

15
help.go Normal file
View File

@@ -0,0 +1,15 @@
package main
const helpText =
`HTTP crawler for the OD-Database
DB >> https://od-db.the-eye.eu <<
Crawler >> https://github.com/terorie/od-database-crawler <<
Server >> https://github.com/simon987/od-database <<
Quick start:
- get config file (config.yml in working dir)
- get OD-DB server ("server.url": Database URL + /api)
- get access token ("server.token": e.g. c010b6dd-20...)
- ./od-database-crawler server
Questions? Discord @terorie#2664 / Telegram @terorie`

47
jenkins/Jenkinsfile vendored Normal file
View File

@@ -0,0 +1,47 @@
def remote = [:]
remote.name = 'remote'
remote.host = env.DEPLOY_HOST
remote.user = env.DEPLOY_USER
remote.identityFile = '/var/lib/jenkins/.ssh/id_rsa'
remote.knownHosts = '/var/lib/jenkins/.ssh/known_hosts'
remote.allowAnyHosts = true
remote.retryCount = 3
remote.retryWaitSec = 3
logLevel = 'FINER'
pipeline {
agent none
environment {
GOOS='linux'
CGO_ENABLED='0'
HOME='.'
}
stages {
stage('Build') {
agent {
docker {
image 'golang:latest'
}
}
steps {
sh 'mkdir -p /go/src/github.com/terorie/od-database-crawler'
sh 'cp -r *.go fasturl ds jenkins/build.sh "/go/src/github.com/terorie/od-database-crawler"'
sh 'cd /go/src/github.com/terorie/od-database-crawler && go get ./...'
sh './jenkins/build.sh'
stash includes: 'dist/', name: 'dist'
}
}
stage('Deploy') {
agent none
steps {
node('master') {
unstash 'dist'
sshCommand remote: remote, command: "ls od-database-crawler/"
sshPut remote: remote, from: 'dist/', into: 'od-database-crawler'
}
}
}
}
}

23
jenkins/build.sh Executable file
View File

@@ -0,0 +1,23 @@
#!/usr/bin/env bash
appname="od-database-crawler"
outdir="dist/"
tag="${BUILD_ID}_$(date +%Y-%m-%d)"
rm -rf "./${outdir}"
mkdir build 2> /dev/null
name=${outdir}${appname}-${tag}-linux
GOOS="linux" GOARCH="amd64" go build -ldflags="-s -w" -o ${name}
gzip -f ${name}
echo ${name}
name=${outdir}${appname}-${tag}-mac
GOOS="darwin" GOARCH="amd64" go build -ldflags="-s -w" -o ${name}
gzip -f ${name}
echo ${name}
name=${outdir}${appname}-${tag}-freebsd
GOOS="freebsd" GOARCH="amd64" go build -ldflags="-s -w" -o ${name}
gzip -f ${name}
echo ${name}

144
main.go
View File

@@ -2,72 +2,103 @@ package main
import (
"context"
"fmt"
"github.com/sirupsen/logrus"
"github.com/spf13/cobra"
"github.com/spf13/viper"
"github.com/terorie/od-database-crawler/fasturl"
"github.com/urfave/cli"
"os"
"os/signal"
"strings"
"sync/atomic"
"time"
)
var app = cli.App {
Name: "od-database-crawler",
Usage: "OD-Database Go crawler",
Version: "1.0.1",
BashComplete: cli.DefaultAppComplete,
Writer: os.Stdout,
Action: cmdBase,
Commands: []cli.Command{
{
Name: "crawl",
Usage: "Crawl a list of URLs",
ArgsUsage: "<site>",
Action: cmdCrawler,
},
},
After: func(i *cli.Context) error {
var configFile string
var rootCmd = cobra.Command {
Use: "od-database-crawler",
Version: "1.2.2",
Short: "OD-Database Go crawler",
Long: helpText,
PersistentPreRunE: preRun,
PersistentPostRun: func(cmd *cobra.Command, args []string) {
exitHooks.Execute()
return nil
},
}
var serverCmd = cobra.Command {
Use: "server",
Short: "Start crawl server",
Long: "Connect to the OD-Database and contribute to the database\n" +
"by crawling the web for open directories!",
Run: cmdBase,
}
var crawlCmd = cobra.Command {
Use: "crawl",
Short: "Crawl an URL",
Long: "Crawl the URL specified.\n" +
"Results will not be uploaded to the database,\n" +
"they're saved under crawled/0.json instead.\n" +
"Primarily used for testing and benchmarking.",
RunE: cmdCrawler,
Args: cobra.ExactArgs(1),
}
var exitHooks Hooks
func init() {
rootCmd.AddCommand(&crawlCmd)
rootCmd.AddCommand(&serverCmd)
prepareConfig()
}
func main() {
err := os.MkdirAll("crawled", 0755)
if err != nil {
panic(err)
}
func preRun(cmd *cobra.Command, args []string) error {
if err := os.MkdirAll("crawled", 0755);
err != nil { panic(err) }
readConfig()
app.Run(os.Args)
if err := os.MkdirAll("queue", 0755);
err != nil { panic(err) }
return nil
}
func cmdBase(_ *cli.Context) error {
// TODO Graceful shutdown
appCtx := context.Background()
forceCtx := context.Background()
func main() {
err := rootCmd.Execute()
if err != nil {
fmt.Fprintln(os.Stderr, err)
os.Exit(1)
}
}
func cmdBase(_ *cobra.Command, _ []string) {
onlineMode = true
readConfig()
appCtx, soft := context.WithCancel(context.Background())
forceCtx, hard := context.WithCancel(context.Background())
go hardShutdown(forceCtx)
go listenCtrlC(soft, hard)
inRemotes := make(chan *OD)
go Schedule(forceCtx, inRemotes)
go Schedule(appCtx, inRemotes)
ticker := time.NewTicker(config.Recheck)
defer ticker.Stop()
for {
select {
case <-appCtx.Done():
return nil
goto shutdown
case <-ticker.C:
t, err := FetchTask()
if err != nil {
logrus.WithError(err).
Error("Failed to get new task")
time.Sleep(30 * time.Second)
if !sleep(viper.GetDuration(ConfCooldown), appCtx) {
goto shutdown
}
continue
}
if t == nil {
@@ -83,33 +114,27 @@ func cmdBase(_ *cli.Context) error {
if urlErr, ok := err.(*fasturl.Error); ok && urlErr.Err == fasturl.ErrUnknownScheme {
// Not an error
err = nil
// Give back task
//err2 := CancelTask(t.WebsiteId)
//if err2 != nil {
// logrus.Error(err2)
//}
// TODO FTP crawler
continue
} else if err != nil {
logrus.WithError(err).
Error("Failed to get new task")
time.Sleep(30 * time.Second)
time.Sleep(viper.GetDuration(ConfCooldown))
continue
}
ScheduleTask(inRemotes, t, &baseUri)
}
}
return nil
shutdown:
globalWait.Wait()
}
func cmdCrawler(clic *cli.Context) error {
if clic.NArg() != 1 {
cli.ShowCommandHelpAndExit(clic, "crawl", 1)
}
func cmdCrawler(_ *cobra.Command, args []string) error {
onlineMode = false
readConfig()
arg := clic.Args()[0]
arg := args[0]
// https://github.com/golang/go/issues/19779
if !strings.Contains(arg, "://") {
arg = "http://" + arg
@@ -141,3 +166,30 @@ func cmdCrawler(clic *cli.Context) error {
return nil
}
func listenCtrlC(soft, hard context.CancelFunc) {
c := make(chan os.Signal)
signal.Notify(c, os.Interrupt)
<-c
logrus.Info(">>> Shutting down crawler... <<<")
soft()
<-c
logrus.Warning(">>> Force shutdown! <<<")
hard()
}
func hardShutdown(c context.Context) {
<-c.Done()
os.Exit(1)
}
func sleep(d time.Duration, c context.Context) bool {
select {
case <-time.After(d):
return true
case <-c.Done():
return false
}
}

View File

@@ -7,23 +7,32 @@ import (
"time"
)
type ResultCode int
const (
TR_OK = ResultCode(iota)
TR_FAIL = 1
TR_SKIP = 2
)
type Task struct {
WebsiteId uint64 `json:"website_id"`
Url string `json:"url"`
WebsiteId uint64 `json:"website_id"`
Url string `json:"url"`
UploadToken string `json:"upload_token"`
TaskId int64
}
type TaskResult struct {
StatusCode string `json:"status_code"`
FileCount uint64 `json:"file_count"`
ErrorCount uint64 `json:"-"`
StartTime time.Time `json:"-"`
StartTimeUnix int64 `json:"start_time"`
EndTimeUnix int64 `json:"end_time"`
WebsiteId uint64 `json:"website_id"`
ResultCode ResultCode `json:"status_code"`
FileCount uint64 `json:"file_count"`
ErrorCount uint64 `json:"-"`
StartTime time.Time `json:"-"`
StartTimeUnix int64 `json:"start_time"`
EndTimeUnix int64 `json:"end_time"`
WebsiteId uint64 `json:"website_id"`
}
type Job struct {
OD *OD
Uri fasturl.URL
UriStr string
Fails int
@@ -52,8 +61,16 @@ func (o *OD) LoadOrStoreKey(k *redblackhash.Key) (exists bool) {
defer o.Scanned.Unlock()
exists = o.Scanned.Get(k)
if exists { return true }
if exists {
return true
}
o.Scanned.Put(k)
return false
}
type errorString string
func (e errorString) Error() string {
return string(e)
}

129
queue.go Normal file
View File

@@ -0,0 +1,129 @@
package main
import (
"github.com/beeker1121/goque"
"os"
"sync"
"sync/atomic"
)
type BufferedQueue struct {
dataDir string
q *goque.Queue
buf []Job
m sync.Mutex
}
func OpenQueue(dataDir string) (bq *BufferedQueue, err error) {
bq = new(BufferedQueue)
if config.JobBufferSize < 0 {
return
}
bq.dataDir = dataDir
bq.q, err = goque.OpenQueue(dataDir)
if err != nil { return nil, err }
return
}
func (q *BufferedQueue) Enqueue(job *Job) error {
atomic.AddInt64(&totalQueued, 1)
if q.directEnqueue(job) {
return nil
}
var gob JobGob
gob.ToGob(job)
_, err := q.q.EnqueueObject(gob)
return err
}
func (q *BufferedQueue) Dequeue() (job Job, err error) {
if q.directDequeue(&job) {
atomic.AddInt64(&totalQueued, -1)
return job, nil
}
if config.JobBufferSize < 0 {
err = goque.ErrEmpty
return
}
var item *goque.Item
item, err = q.q.Dequeue()
if err != nil { return }
atomic.AddInt64(&totalQueued, -1)
var gob JobGob
err = item.ToObject(&gob)
if err != nil { return }
gob.FromGob(&job)
return
}
func (q *BufferedQueue) directEnqueue(job *Job) bool {
q.m.Lock()
defer q.m.Unlock()
bs := config.JobBufferSize
if len(q.buf) < bs || bs < 0 {
q.buf = append(q.buf, *job)
return true
} else {
return false
}
}
func (q *BufferedQueue) directDequeue(job *Job) bool {
q.m.Lock()
defer q.m.Unlock()
if len(q.buf) > 0 {
*job = q.buf[0]
q.buf = q.buf[1:]
return true
} else {
return false
}
}
// Always returns nil (But implements io.Closer)
func (q *BufferedQueue) Close() error {
if config.JobBufferSize < 0 {
return nil
}
// Close ignoring errors
q.q.Close()
// Delete files
if err := os.RemoveAll(q.dataDir);
err != nil { panic(err) }
return nil
}
type JobGob struct {
Uri string
Fails int
LastError string
}
func (g *JobGob) ToGob(j *Job) {
g.Uri = j.UriStr
g.Fails = j.Fails
if j.LastError != nil {
g.LastError = j.LastError.Error()
}
}
func (g *JobGob) FromGob(j *Job) {
if err := j.Uri.Parse(g.Uri);
err != nil { panic(err) }
j.UriStr = g.Uri
j.Fails = g.Fails
if g.LastError != "" {
j.LastError = errorString(g.LastError)
}
}

View File

@@ -16,7 +16,7 @@ import (
var activeTasksLock sync.Mutex
var activeTasks = make(map[uint64]bool)
var numActiveTasks int32
var totalBuffered int64
var totalQueued int64
func Schedule(c context.Context, remotes <-chan *OD) {
go Stats(c)
@@ -28,8 +28,24 @@ func Schedule(c context.Context, remotes <-chan *OD) {
// Collect results
results := make(chan File)
remote.WCtx.OD = remote
// Get queue path
queuePath := path.Join("queue", fmt.Sprintf("%d", remote.Task.WebsiteId))
// Delete existing queue
if err := os.RemoveAll(queuePath); err != nil {
panic(err)
}
// Start new queue
var err error
remote.WCtx.Queue, err = OpenQueue(queuePath)
if err != nil {
panic(err)
}
// Spawn workers
remote.WCtx.in, remote.WCtx.out = makeJobBuffer(c)
for i := 0; i < config.Workers; i++ {
go remote.WCtx.Worker(results)
}
@@ -37,7 +53,6 @@ func Schedule(c context.Context, remotes <-chan *OD) {
// Enqueue initial job
atomic.AddInt32(&numActiveTasks, 1)
remote.WCtx.queueJob(Job{
OD: remote,
Uri: remote.BaseUri,
UriStr: remote.BaseUri.String(),
Fails: 0,
@@ -65,12 +80,12 @@ func ScheduleTask(remotes chan<- *OD, t *Task, u *fasturl.URL) {
globalWait.Add(1)
now := time.Now()
od := &OD {
Task: *t,
od := &OD{
Task: *t,
BaseUri: *u,
Result: TaskResult {
WebsiteId: t.WebsiteId,
StartTime: now,
Result: TaskResult{
WebsiteId: t.WebsiteId,
StartTime: now,
StartTimeUnix: now.Unix(),
},
}
@@ -105,7 +120,7 @@ func (o *OD) Watch(results chan File) {
// Open crawl results file
f, err := os.OpenFile(
filePath,
os.O_CREATE | os.O_RDWR | os.O_TRUNC,
os.O_CREATE|os.O_RDWR|os.O_TRUNC,
0644,
)
if err != nil {
@@ -133,7 +148,7 @@ func (o *OD) Watch(results chan File) {
}
// Upload results
err = PushResult(&o.Result, f)
err = PushResult(&o.Task, f)
if err != nil {
logrus.WithError(err).
Error("Failed uploading crawl results")
@@ -148,30 +163,28 @@ func (o *OD) handleCollect(results chan File, f *os.File, collectErrC chan error
// Wait for all jobs on remote to finish
o.Wait.Wait()
close(o.WCtx.in)
// Close queue
if err := o.WCtx.Queue.Close(); err != nil {
panic(err)
}
atomic.AddInt32(&numActiveTasks, -1)
// Log finish
logrus.WithFields(logrus.Fields{
"id": o.Task.WebsiteId,
"url": o.BaseUri.String(),
"id": o.Task.WebsiteId,
"url": o.BaseUri.String(),
"duration": time.Since(o.Result.StartTime),
}).Info("Crawler finished")
// Set status code
now := time.Now()
o.Result.EndTimeUnix = now.Unix()
fileCount := atomic.LoadUint64(&o.Result.FileCount)
if fileCount == 0 {
errorCount := atomic.LoadUint64(&o.Result.ErrorCount)
if errorCount == 0 {
o.Result.StatusCode = "empty"
} else {
o.Result.StatusCode = "directory listing failed"
}
if atomic.LoadUint64(&o.Result.ErrorCount) != 0 {
o.Result.ResultCode = TR_FAIL
} else {
o.Result.StatusCode = "success"
o.Result.ResultCode = TR_OK
}
}
@@ -189,60 +202,18 @@ func (t *Task) collect(results chan File, f *os.File) error {
result.Path = fasturl.PathUnescape(result.Path)
result.Name = fasturl.PathUnescape(result.Name)
resJson, err := json.Marshal(result)
if err != nil { panic(err) }
if err != nil {
panic(err)
}
_, err = f.Write(resJson)
if err != nil { return err }
if err != nil {
return err
}
_, err = f.Write([]byte{'\n'})
if err != nil { return err }
if err != nil {
return err
}
}
return nil
}
func makeJobBuffer(c context.Context) (chan<- Job, <-chan Job) {
in := make(chan Job)
out := make(chan Job)
go bufferJobs(c, in, out)
return in, out
}
func bufferJobs(c context.Context, in chan Job, out chan Job) {
defer close(out)
var inQueue []Job
outCh := func() chan Job {
if len(inQueue) == 0 {
return nil
}
return out
}
for len(inQueue) > 0 || in != nil {
if len(inQueue) == 0 {
select {
case v, ok := <-in:
if !ok {
in = nil
} else {
atomic.AddInt64(&totalBuffered, 1)
inQueue = append(inQueue, v)
}
case <-c.Done():
return
}
} else {
select {
case v, ok := <-in:
if !ok {
in = nil
} else {
atomic.AddInt64(&totalBuffered, 1)
inQueue = append(inQueue, v)
}
case outCh() <- inQueue[0]:
atomic.AddInt64(&totalBuffered, -1)
inQueue = inQueue[1:]
case <-c.Done():
return
}
}
}
}

290
server.go
View File

@@ -2,46 +2,125 @@ package main
import (
"bytes"
"encoding/base64"
"encoding/json"
"fmt"
"github.com/fasthttp/websocket"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
"golang.org/x/time/rate"
"io"
"mime/multipart"
"io/ioutil"
"net/http"
"net/url"
"os"
"strconv"
)
var serverClient = http.Client {
Timeout: config.ServerTimeout,
var serverWorker *TrackerWorker
var serverClient = http.Client{
Timeout: config.ServerTimeout,
Transport: new(ServerTripper),
}
var serverUserAgent = "od-database-crawler/" + rootCmd.Version
func getOrCreateWorker() {
if _, err := os.Stat("worker.json"); os.IsNotExist(err) {
req := CreateTrackerWorkerRequest{
Alias: config.TrackerAlias,
}
body, _ := json.Marshal(&req)
buf := bytes.NewBuffer(body)
resp, _ := serverClient.Post(config.TrackerUrl+"/worker/create", "application/json", buf)
workerResponse := CreateTrackerWorkerResponse{}
respBody, _ := ioutil.ReadAll(resp.Body)
_ = json.Unmarshal(respBody, &workerResponse)
workerJsonData, _ := json.Marshal(&workerResponse.Content.Worker)
fp, _ := os.OpenFile("worker.json", os.O_CREATE|os.O_TRUNC|os.O_WRONLY, 0600)
_, _ = fp.Write(workerJsonData)
//Request ASSIGN permission
serverWorker = &workerResponse.Content.Worker
accessReq, _ := json.Marshal(WorkerAccessRequest{
Project: config.TrackerProject,
Assign: true,
Submit: false,
})
buf = bytes.NewBuffer(accessReq)
res, err := serverClient.Post(config.TrackerUrl+"/project/request_access", "application/json", buf)
if err != nil {
panic(err)
}
logrus.WithFields(logrus.Fields{
"response": res.StatusCode,
}).Info("Requested ASSIGN permission")
} else {
var worker TrackerWorker
fp, _ := os.OpenFile("worker.json", os.O_RDONLY, 0600)
workerJsonData, _ := ioutil.ReadAll(fp)
_ = json.Unmarshal(workerJsonData, &worker)
serverWorker = &worker
}
}
func FetchTask() (t *Task, err error) {
res, err := serverClient.PostForm(
config.ServerUrl + "/task/get",
url.Values{ "token": {config.Token} })
if err != nil { return }
if serverWorker == nil {
getOrCreateWorker()
}
res, err := serverClient.Get(config.TrackerUrl + "/task/get/" + strconv.Itoa(config.TrackerProject))
if err != nil {
return
}
defer res.Body.Close()
switch res.StatusCode {
case 200:
break
case 404, 500:
return nil, nil
default:
return nil, fmt.Errorf("http %s", res.Status)
}
t = new(Task)
err = json.NewDecoder(res.Body).Decode(t)
if err != nil { return }
jsonResponse := FetchTaskResponse{}
err = json.NewDecoder(res.Body).Decode(&jsonResponse)
if _, ok := err.(*json.SyntaxError); ok {
return nil, fmt.Errorf("/task/get returned invalid JSON")
} else if err != nil {
return
}
if !jsonResponse.Ok {
if jsonResponse.Message == "No task available" {
return nil, nil
}
return nil, errors.New(jsonResponse.Message)
}
task := Task{}
err = json.Unmarshal([]byte(jsonResponse.Content.Task.Recipe), &task)
if _, ok := err.(*json.SyntaxError); ok {
return nil, fmt.Errorf("/task/get returned invalid JSON")
} else if err != nil {
return
}
t = &task
t.TaskId = jsonResponse.Content.Task.Id
return
}
func PushResult(result *TaskResult, f *os.File) (err error) {
if result.WebsiteId == 0 {
func PushResult(task *Task, f *os.File) (err error) {
if task.WebsiteId == 0 {
// Not a real result, don't push
return nil
}
@@ -52,10 +131,10 @@ func PushResult(result *TaskResult, f *os.File) (err error) {
return
}
err = uploadChunks(result.WebsiteId, f)
err = uploadWebsocket(f, task.UploadToken)
if err != nil {
logrus.Errorf("Failed to upload file list: %s", err)
err2 := CancelTask(result.WebsiteId)
err2 := releaseTask(task, TR_FAIL)
if err2 != nil {
logrus.Error(err2)
}
@@ -63,79 +142,62 @@ func PushResult(result *TaskResult, f *os.File) (err error) {
}
// Upload result ignoring errors
uploadResult(result)
_ = releaseTask(task, TR_OK)
return
}
func uploadChunks(websiteId uint64, f *os.File) error {
eof := false
for iter := 1; !eof; iter++ {
// TODO Stream with io.Pipe?
var b bytes.Buffer
func uploadWebsocket(f *os.File, token string) (err error) {
multi := multipart.NewWriter(&b)
u := url.URL{Scheme: config.WsBucketScheme, Host: config.WsBucketHost, Path: "/upload"}
// Set upload fields
var err error
err = multi.WriteField("token", config.Token)
if err != nil { return err }
err = multi.WriteField("website_id", fmt.Sprintf("%d", websiteId))
if err != nil { return err }
// Copy chunk to file_list
formFile, err := multi.CreateFormFile("file_list", "file_list")
var n int64
n, err = io.CopyN(formFile, f, config.ChunkSize)
if err != io.EOF && err != nil {
return err
}
if n == 0 {
// Don't upload, no content
return nil
} else if n < config.ChunkSize {
err = nil
// Break at end of iteration
eof = true
}
multi.Close()
req, err := http.NewRequest(
http.MethodPost,
config.ServerUrl + "/task/upload",
&b)
req.Header.Set("content-type", multi.FormDataContentType())
if err != nil { return err }
res, err := serverClient.Do(req)
if err != nil { return err }
res.Body.Close()
if res.StatusCode != http.StatusOK {
return fmt.Errorf("failed to upload list part %d: %s",
iter, res.Status)
}
logrus.WithField("id", websiteId).
WithField("part", iter).
Infof("Uploading files chunk")
header := http.Header{}
header.Add("X-Upload-Token", token)
conn, _, err := websocket.DefaultDialer.Dial(u.String(), header)
if err != nil {
return
}
conn.EnableWriteCompression(true) //TODO: Is this necessary?
socketWriter, _ := conn.NextWriter(websocket.BinaryMessage)
_, _ = io.Copy(socketWriter, f)
err = socketWriter.Close()
if err != nil {
logrus.Error("FIXME: couldn't do file upload")
return
}
err = conn.Close()
if err != nil {
return
}
return nil
}
func uploadResult(result *TaskResult) (err error) {
resultEnc, err := json.Marshal(result)
if err != nil { panic(err) }
func releaseTask(task *Task, taskResult ResultCode) (err error) {
res, err := serverClient.PostForm(
config.ServerUrl + "/task/complete",
url.Values {
"token": {config.Token},
"result": {string(resultEnc)},
},
req := releaseTaskRequest{
TaskId: task.TaskId,
ResultCode: taskResult,
// TODO Will implement verification in a later ODDB update
Verification: 0,
}
resultEnc, err := json.Marshal(&req)
if err != nil {
panic(err)
}
body := bytes.NewBuffer(resultEnc)
res, err := serverClient.Post(
config.TrackerUrl+"/task/release",
"application/json",
body,
)
if err != nil { return }
if err != nil {
return
}
res.Body.Close()
if res.StatusCode != http.StatusOK {
@@ -145,20 +207,66 @@ func uploadResult(result *TaskResult) (err error) {
return
}
func CancelTask(websiteId uint64) (err error) {
res, err := serverClient.PostForm(
config.ServerUrl + "/task/cancel",
url.Values{
"token": {config.Token},
"website_id": {strconv.FormatUint(websiteId, 10)},
},
)
if err != nil { return }
res.Body.Close()
type ServerTripper struct{}
if res.StatusCode != http.StatusOK {
return fmt.Errorf("failed to cancel task: %s", res.Status)
func (t *ServerTripper) RoundTrip(req *http.Request) (res *http.Response, err error) {
req.Header.Set("User-Agent", serverUserAgent)
//TODO: Use task_tracker/client ?
if serverWorker != nil {
req.Header.Add("X-Worker-Id", strconv.Itoa(serverWorker.Id))
req.Header.Add("X-Secret", base64.StdEncoding.EncodeToString(serverWorker.Secret))
}
return
return http.DefaultTransport.RoundTrip(req)
}
// https://github.com/simon987/task_tracker/blob/master/api/models.go
type releaseTaskRequest struct {
TaskId int64 `json:"task_id"`
ResultCode ResultCode `json:"result"`
Verification int64 `json:"verification"`
}
type WorkerAccessRequest struct {
Assign bool `json:"assign"`
Submit bool `json:"submit"`
Project int `json:"project"`
}
type FetchTaskResponse struct {
Ok bool `json:"ok"`
Message string `json:"message"`
Content struct {
Task struct {
Id int64 `json:"id"`
Priority int64 `json:"priority"`
Project struct {
Id int64 `json:"id"`
Name string `json:"name"`
Version string `json:"version"`
AssignRate rate.Limit `json:"assign_rate"`
SubmitRate rate.Limit `json:"submit_rate"`
} `json:"project"`
Recipe string `json:"recipe"`
} `json:"task"`
} `json:"content"`
}
type TrackerWorker struct {
Alias string `json:"alias"`
Id int `json:"id"`
Secret []byte `json:"secret"`
}
type CreateTrackerWorkerResponse struct {
Ok bool `json:"ok"`
Message string `json:"message"`
Content struct {
Worker TrackerWorker `json:"worker"`
} `json:"content"`
}
type CreateTrackerWorkerRequest struct {
Alias string `json:"alias"`
}

View File

@@ -3,6 +3,7 @@ package main
import (
"context"
"github.com/sirupsen/logrus"
"github.com/spf13/viper"
"math"
"runtime"
"sync/atomic"
@@ -19,11 +20,14 @@ func Stats(c context.Context) {
var crawlTicker <-chan time.Time
var allocTicker <-chan time.Time
if config.CrawlStats != 0 {
crawlTicker = time.NewTicker(config.CrawlStats).C
crawlInterval := viper.GetDuration(ConfCrawlStats)
allocInterval := viper.GetDuration(ConfAllocStats)
if crawlInterval != 0 {
crawlTicker = time.Tick(crawlInterval)
}
if config.AllocStats != 0 {
allocTicker = time.NewTicker(config.AllocStats).C
if allocInterval != 0 {
allocTicker = time.Tick(allocInterval)
}
for {
@@ -32,7 +36,7 @@ func Stats(c context.Context) {
startedNow := atomic.LoadUint64(&totalStarted)
perSecond := float64(startedNow - startedLast) /
config.CrawlStats.Seconds()
crawlInterval.Seconds()
// Round to .5
perSecond *= 2
@@ -57,7 +61,7 @@ func Stats(c context.Context) {
runtime.ReadMemStats(&mem)
logrus.WithFields(logrus.Fields{
"queue_count": atomic.LoadInt64(&totalBuffered),
"queue_count": atomic.LoadInt64(&totalQueued),
"heap": FormatByteCount(mem.Alloc),
"objects": mem.HeapObjects,
"num_gc": mem.NumGC,

View File

@@ -1,8 +1,8 @@
package main
import (
"github.com/beeker1121/goque"
"github.com/sirupsen/logrus"
"github.com/valyala/fasthttp"
"math"
"sort"
"strings"
@@ -14,24 +14,38 @@ import (
var globalWait sync.WaitGroup
type WorkerContext struct {
in chan<- Job
out <-chan Job
OD *OD
Queue *BufferedQueue
lastRateLimit time.Time
numRateLimits int
}
func (w WorkerContext) Worker(results chan<- File) {
for job := range w.out {
w.step(results, job)
func (w *WorkerContext) Worker(results chan<- File) {
for {
job, err := w.Queue.Dequeue()
switch err {
case goque.ErrEmpty:
time.Sleep(500 * time.Millisecond)
continue
case goque.ErrDBClosed:
return
case nil:
w.step(results, job)
default:
panic(err)
}
}
}
func (w WorkerContext) step(results chan<- File, job Job) {
defer w.finishJob(&job)
func (w *WorkerContext) step(results chan<- File, job Job) {
defer w.finishJob()
var f File
newJobs, err := DoJob(&job, &f)
newJobs, err := w.DoJob(&job, &f)
atomic.AddUint64(&totalStarted, 1)
if err == ErrKnown {
return
@@ -40,20 +54,18 @@ func (w WorkerContext) step(results chan<- File, job Job) {
if err != nil {
job.Fails++
if httpErr, ok := err.(*HttpError); ok {
switch httpErr.code {
case fasthttp.StatusTooManyRequests:
err = ErrRateLimit
default:
// Don't retry HTTP error codes
return
}
if !shouldRetry(err) {
atomic.AddUint64(&totalAborted, 1)
//logrus.WithField("url", job.UriStr).
// WithError(err).
// Error("Giving up after failure")
return
}
if job.Fails > config.Retries {
atomic.AddUint64(&totalAborted, 1)
logrus.WithField("url", job.UriStr).
Errorf("Giving up after %d fails", job.Fails)
//logrus.WithField("url", job.UriStr).
// Errorf("Giving up after %d fails", job.Fails)
} else {
atomic.AddUint64(&totalRetries, 1)
if err == ErrRateLimit {
@@ -75,8 +87,10 @@ func (w WorkerContext) step(results chan<- File, job Job) {
}
}
func DoJob(job *Job, f *File) (newJobs []Job, err error) {
if len(job.Uri.Path) == 0 { return }
func (w *WorkerContext) DoJob(job *Job, f *File) (newJobs []Job, err error) {
if len(job.Uri.Path) == 0 {
return
}
if job.Uri.Path[len(job.Uri.Path)-1] == '/' {
// Load directory
links, err := GetDir(job, f)
@@ -93,7 +107,7 @@ func DoJob(job *Job, f *File) (newJobs []Job, err error) {
hash := f.HashDir(links)
// Skip symlinked dirs
if job.OD.LoadOrStoreKey(&hash) {
if w.OD.LoadOrStoreKey(&hash) {
return nil, ErrKnown
}
@@ -114,7 +128,6 @@ func DoJob(job *Job, f *File) (newJobs []Job, err error) {
lastLink = uriStr
newJobs = append(newJobs, Job{
OD: job.OD,
Uri: link,
UriStr: uriStr,
Fails: 0,
@@ -139,28 +152,30 @@ func DoJob(job *Job, f *File) (newJobs []Job, err error) {
}
return nil, err
}
atomic.AddUint64(&job.OD.Result.FileCount, 1)
atomic.AddUint64(&w.OD.Result.FileCount, 1)
}
return
}
func (w WorkerContext) queueJob(job Job) {
job.OD.Wait.Add(1)
func (w *WorkerContext) queueJob(job Job) {
w.OD.Wait.Add(1)
if w.numRateLimits > 0 {
if time.Since(w.lastRateLimit) > 5 * time.Second {
if time.Since(w.lastRateLimit) > 5*time.Second {
w.numRateLimits = 0
} else {
time.Sleep(time.Duration(math.Sqrt(float64(50 * w.numRateLimits))) *
time.Sleep(time.Duration(math.Sqrt(float64(50*w.numRateLimits))) *
100 * time.Millisecond)
}
}
w.in <- job
if err := w.Queue.Enqueue(&job); err != nil {
panic(err)
}
}
func (w WorkerContext) finishJob(job *Job) {
job.OD.Wait.Done()
func (w *WorkerContext) finishJob() {
w.OD.Wait.Done()
}
func isErrSilent(err error) bool {