32 Commits

Author SHA1 Message Date
simon987
a962c60b82 Don't panic on file upload error 2019-04-06 14:56:22 -04:00
simon987
24f0bd91f7 Remove debug messages & don't use disk queue by default 2019-04-06 12:21:35 -04:00
simon987
84c10e1981 Change default config 2019-04-05 17:53:00 -04:00
simon987
860fa79327 Jenkins setup 2019-04-04 19:22:14 -04:00
simon987
76bc8293d6 minimum viable 2019-03-30 09:02:55 -04:00
simon987
3470be6086 More work on task_tracker integration 2019-03-09 16:38:04 -05:00
Richard Patel
60471a081e Switch to simon987/task_tracker 2019-02-28 23:51:26 +01:00
dependabot[bot]
0b3f0d87fe Upgrade fasthttp to 1.2.0
Bumps [github.com/valyala/fasthttp](https://github.com/valyala/fasthttp) from 1.1.0 to 1.2.0.
- [Release notes](https://github.com/valyala/fasthttp/releases)
- [Commits](https://github.com/valyala/fasthttp/compare/v1.1.0...v1.2.0)

Thanks bot

Signed-off-by: dependabot[bot] <support@dependabot.com>
2019-02-28 22:42:40 +01:00
terorie
da9c75e392 Reduce Docker image size 2019-02-22 21:37:04 +01:00
Pascal
8947e05d0c Fix Dockerfile
Fixes #22
Credit to @pascaldulieu
2019-02-22 21:11:55 +01:00
Richard Patel
8c5f99d616 More descriptive error if /task/get returns invalid JSON 2019-02-22 20:17:59 +01:00
Richard Patel
206ea0e91d Simplify config 2019-02-22 18:50:35 +01:00
Richard Patel
8b9d8bfd17 Fix README.md format 2019-02-22 06:04:10 +01:00
Richard Patel
c9ff102d80 Fix Dockerfile 2019-02-22 06:00:57 +01:00
Richard Patel
88856c1c19 Flag explanation in README.md 2019-02-22 05:59:59 +01:00
Richard Patel
9e9b606250 Merge branch 'stable' 2019-02-22 05:37:52 +01:00
Richard Patel
326e29e5e4 Reset to stable branch 2019-02-22 05:37:45 +01:00
Richard Patel
c2acd5463f Restore .travis.yml
Now handling auto-build over Docker Hub directly
2019-02-22 05:16:25 +01:00
Richard Patel
e4d04e6a5f go.mod: Fix package path
lol
2019-02-22 05:10:43 +01:00
terorie
9f1402e841 New Dockerfile and Travis Config (#23) 2019-02-22 05:07:27 +01:00
terorie
7c8ab50ee4 Merge stable into master 2019-02-13 15:32:40 +01:00
terorie
281d2d17d6 Update config.yml 2019-02-13 15:32:00 +01:00
Richard Patel
45cbd4d535 Disable resume feature 2019-02-05 15:44:59 +01:00
Richard Patel
771d49f2dd Fix WaitGroup deadlock 2019-02-03 17:14:20 +01:00
Richard Patel
dbd787aa81 Fix WaitGroup crash 2019-02-03 17:09:43 +01:00
Richard Patel
cea6c1658b Bugfix: Don't schedule new tasks during shutdown 2019-02-03 17:02:44 +01:00
terorie
885af5bb3b Beta task resuming 2019-02-03 16:50:08 +01:00
Richard Patel
b18b70f798 Fix segfault (thanks Pikami) 2019-02-03 14:00:17 +01:00
Richard Patel
9d5f549774 Better server User-Agent string 2019-02-03 12:23:21 +01:00
Richard Patel
5239af08f7 Bump version to v1.2.1 2019-02-03 03:36:39 +01:00
Richard Patel
46c0e0bd32 Smarter HTTP error handling 2019-02-03 03:35:09 +01:00
Richard Patel
0ca6deede8 Fix --config flag 2019-02-03 03:26:48 +01:00
14 changed files with 591 additions and 377 deletions

15
Dockerfile Normal file
View File

@@ -0,0 +1,15 @@
FROM golang:alpine as builder
ADD . /go/src/github.com/terorie/od-database-crawler
RUN apk add git \
&& go get -d -v github.com/terorie/od-database-crawler \
&& CGO_ENABLED=0 go install -a \
-installsuffix cgo \
-ldflags="-s -w" \
github.com/terorie/od-database-crawler
FROM scratch
COPY --from=builder /etc/ssl/certs/ca-certificates.crt /etc/ssl/certs/
COPY --from=builder /go/bin/od-database-crawler /bin/
WORKDIR /oddb
VOLUME [ "/oddb" ]
CMD ["/bin/od-database-crawler", "server"]

View File

@@ -9,16 +9,46 @@
https://od-db.the-eye.eu/
#### Usage
## Usage
### Deploys
1. With Config File (if `config.yml` found in working dir)
- Download [default config](https://github.com/terorie/od-database-crawler/blob/master/config.yml)
- Set `server.url` and `server.token`
- Start with `./od-database-crawler server --config <file>`
2. With Flags or env
- Override config file if it exists
- `--help` for list of flags
- Every flag is available as an environment variable:
`--server.crawl_stats` ➡️ `OD_SERVER_CRAWL_STATS`
- Start with `./od-database-crawler server <flags>`
3. With Docker
```bash
docker run \
-e OD_SERVER_URL=xxx \
-e OD_SERVER_TOKEN=xxx \
terorie/od-database-crawler
```
### Flag reference
Here are the most important config flags. For more fine control, take a look at `/config.yml`.
| Flag/Environment | Description | Example |
| ------------------------------------------------------- | ------------------------------------------------------------ | ----------------------------------- |
| `server.url`<br />`OD_SERVER_URL` | OD-DB Server URL | `https://od-db.mine.the-eye.eu/api` |
| `server.token`<br />`OD_SERVER_TOKEN` | OD-DB Server Access Token | _Ask Hexa **TM**_ |
| `server.recheck`<br />`OD_SERVER_RECHECK` | Job Fetching Interval | `3s` |
| `output.crawl_stats`<br />`OD_OUTPUT_CRAWL_STATS` | Crawl Stats Logging Interval (0 = disabled) | `500ms` |
| `output.resource_stats`<br />`OD_OUTPUT_RESORUCE_STATS` | Resource Stats Logging Interval (0 = disabled) | `8s` |
| `output.log`<br />`OD_OUTPUT_LOG` | Log File (none = disabled) | `crawler.log` |
| `crawl.tasks`<br />`OD_CRAWL_TASKS` | Max number of sites to crawl concurrently | `500` |
| `crawl.connections`<br />`OD_CRAWL_CONNECTIONS` | HTTP connections per site | `1` |
| `crawl.retries`<br />`OD_CRAWL_RETRIES` | How often to retry after a temporary failure (e.g. `HTTP 429` or timeouts) | `5` |
| `crawl.dial_timeout`<br />`OD_CRAWL_DIAL_TIMEOUT` | TCP Connect timeout | `5s` |
| `crawl.timeout`<br />`OD_CRAWL_TIMEOUT` | HTTP request timeout | `20s` |
| `crawl.user-agent`<br />`OD_CRAWL_USER_AGENT` | HTTP Crawler User-Agent | `googlebot/1.2.3` |
| `crawl.job_buffer`<br />`OD_CRAWL_JOB_BUFFER` | Number of URLs to keep in memory/cache, per job. The rest is offloaded to disk. Decrease this value if the crawler uses too much RAM. (0 = Disable Cache, -1 = Only use Cache) | `5000` |

230
config.go
View File

@@ -4,45 +4,53 @@ import (
"bufio"
"fmt"
"github.com/sirupsen/logrus"
"github.com/spf13/pflag"
"github.com/spf13/viper"
"io"
"os"
"path/filepath"
"strings"
"time"
)
var config struct {
ServerUrl string
Token string
ServerTimeout time.Duration
Recheck time.Duration
ChunkSize int64
Retries int
Workers int
UserAgent string
Tasks int32
Verbose bool
PrintHTTP bool
JobBufferSize int
TrackerUrl string
TrackerProject int
TrackerAlias string
WsBucketScheme string
WsBucketHost string
ServerTimeout time.Duration
Recheck time.Duration
ChunkSize int64
Retries int
Workers int
UserAgent string
Tasks int32
Verbose bool
PrintHTTP bool
JobBufferSize int
}
var onlineMode bool
const (
ConfServerUrl = "server.url"
ConfToken = "server.token"
ConfServerTimeout = "server.timeout"
ConfRecheck = "server.recheck"
ConfCooldown = "server.cooldown"
ConfChunkSize = "server.upload_chunk"
ConfUploadRetries = "server.upload_retries"
ConfTrackerUrl = "server.url"
ConfTrackerProject = "server.project"
ConfTrackerAlias = "server.alias"
ConfWsBucketScheme = "server.ws_bucket_scheme"
ConfWsBucketHost = "server.ws_bucket_host"
ConfServerTimeout = "server.timeout"
ConfRecheck = "server.recheck"
ConfCooldown = "server.cooldown"
ConfChunkSize = "server.upload_chunk"
ConfUploadRetries = "server.upload_retries"
ConfUploadRetryInterval = "server.upload_retry_interval"
ConfTasks = "crawl.tasks"
ConfRetries = "crawl.retries"
ConfWorkers = "crawl.connections"
ConfUserAgent = "crawl.user-agent"
ConfDialTimeout = "crawl.dial_timeout"
ConfTimeout = "crawl.timeout"
ConfTasks = "crawl.tasks"
ConfRetries = "crawl.retries"
ConfWorkers = "crawl.connections"
ConfUserAgent = "crawl.user-agent"
ConfDialTimeout = "crawl.dial_timeout"
ConfTimeout = "crawl.timeout"
ConfJobBufferSize = "crawl.job_buffer"
ConfCrawlStats = "output.crawl_stats"
@@ -55,8 +63,62 @@ const (
func prepareConfig() {
pf := rootCmd.PersistentFlags()
bind := func(s string) {
if err := viper.BindPFlag(s, pf.Lookup(s)); err != nil {
pf.SortFlags = false
pf.StringVar(&configFile, "config", "", "Config file")
configFile = os.Getenv("OD_CONFIG")
pf.String(ConfTrackerUrl, "https://tt.the-eye.eu/api", "task_tracker api URL")
pf.String(ConfTrackerProject, "1", "task_tracker project id")
pf.String(ConfWsBucketScheme, "wss", "ws_bucket scheme")
pf.String(ConfWsBucketHost, "wsb.the-eye.eu", "ws_bucket host")
pf.String(ConfTrackerAlias, "changeme", "task_tracker worker alias")
pf.Duration(ConfServerTimeout, 60*time.Second, "OD-DB request timeout")
pf.Duration(ConfRecheck, 1*time.Second, "OD-DB: Poll interval for new jobs")
pf.Duration(ConfCooldown, 1*time.Minute, "OD-DB: Time to wait after a server-side error")
pf.String(ConfChunkSize, "1 MB", "OD-DB: Result upload chunk size")
pf.Uint(ConfUploadRetries, 10, "OD-DB: Max upload retries")
pf.Duration(ConfUploadRetryInterval, 30*time.Second, "OD-DB: Time to wait between upload retries")
pf.Uint(ConfTasks, 25, "Crawler: Max concurrent tasks")
pf.Uint(ConfWorkers, 1, "Crawler: Connections per server")
pf.Uint(ConfRetries, 5, "Crawler: Request retries")
pf.Duration(ConfDialTimeout, 10*time.Second, "Crawler: Handshake timeout")
pf.Duration(ConfTimeout, 30*time.Second, "Crawler: Request timeout")
pf.String(ConfUserAgent, "Mozilla/5.0 (X11; od-database-crawler) Gecko/20100101 Firefox/52.0", "Crawler: User-Agent")
pf.Int(ConfJobBufferSize, -1, "Crawler: Task queue cache size")
pf.Duration(ConfCrawlStats, 500*time.Second, "Log: Crawl stats interval")
pf.Duration(ConfAllocStats, 500*time.Second, "Log: Resource stats interval")
pf.Bool(ConfVerbose, false, "Log: Print every listed dir")
pf.Bool(ConfPrintHTTP, false, "Log: Print HTTP client errors")
pf.String(ConfLogFile, "crawler.log", "Log file")
// Bind all flags to Viper
pf.VisitAll(func(flag *pflag.Flag) {
s := flag.Name
s = strings.TrimLeft(s, "-")
if err := viper.BindPFlag(s, flag); err != nil {
panic(err)
}
var envKey string
@@ -66,102 +128,48 @@ func prepareConfig() {
if err := viper.BindEnv(s, envKey); err != nil {
panic(err)
}
}
pf.SortFlags = false
pf.StringVar(&configFile, "config", "", "Config file")
configFile = os.Getenv("OD_CONFIG")
pf.String(ConfServerUrl, "http://od-db.the-eye.eu/api", "OD-DB server URL")
bind(ConfServerUrl)
pf.String(ConfToken, "", "OD-DB access token (env OD_SERVER_TOKEN)")
bind(ConfToken)
pf.Duration(ConfServerTimeout, 60 * time.Second, "OD-DB request timeout")
bind(ConfServerTimeout)
pf.Duration(ConfRecheck, 1 * time.Second, "OD-DB: Poll interval for new jobs")
bind(ConfRecheck)
pf.Duration(ConfCooldown, 30 * time.Second, "OD-DB: Time to wait after a server-side error")
bind(ConfCooldown)
pf.String(ConfChunkSize, "1 MB", "OD-DB: Result upload chunk size")
bind(ConfChunkSize)
pf.Uint(ConfUploadRetries, 10, "OD-DB: Max upload retries")
bind(ConfUploadRetries)
pf.Duration(ConfUploadRetryInterval, 30 * time.Second, "OD-DB: Time to wait between upload retries")
bind(ConfUploadRetryInterval)
pf.Uint(ConfTasks, 100, "Crawler: Max concurrent tasks")
bind(ConfTasks)
pf.Uint(ConfWorkers, 4, "Crawler: Connections per server")
bind(ConfWorkers)
pf.Uint(ConfRetries, 5, "Crawler: Request retries")
bind(ConfRetries)
pf.Duration(ConfDialTimeout, 10 * time.Second, "Crawler: Handshake timeout")
bind(ConfDialTimeout)
pf.Duration(ConfTimeout, 30 * time.Second, "Crawler: Request timeout")
bind(ConfTimeout)
pf.String(ConfUserAgent, "Mozilla/5.0 (X11; od-database-crawler) Gecko/20100101 Firefox/52.0", "Crawler: User-Agent")
bind(ConfUserAgent)
pf.Uint(ConfJobBufferSize, 5000, "Crawler: Task queue cache size")
bind(ConfJobBufferSize)
pf.Duration(ConfCrawlStats, time.Second, "Log: Crawl stats interval")
bind(ConfCrawlStats)
pf.Duration(ConfAllocStats, 10 * time.Second, "Log: Resource stats interval")
bind(ConfAllocStats)
pf.Bool(ConfVerbose, false, "Log: Print every listed dir")
bind(ConfVerbose)
pf.Bool(ConfPrintHTTP, false, "Log: Print HTTP client errors")
bind(ConfPrintHTTP)
pf.String(ConfLogFile, "crawler.log", "Log file")
bind(ConfLogFile)
})
}
func readConfig() {
// If config.yml in working dir, use it
if _, err := os.Stat("config.yml"); err == nil {
configFile = "config.yml"
if configFile == "" {
_, err := os.Stat("config.yml")
if err == nil {
configFile = "config.yml"
}
}
if configFile != "" {
var err error
confPath, err := filepath.Abs(configFile)
if err != nil { panic(err) }
confF, err := os.Open(configFile)
if err != nil {
fmt.Fprintln(os.Stderr, err)
os.Exit(1)
}
defer confF.Close()
viper.SetConfigFile(confPath)
err = viper.ReadInConfig()
viper.SetConfigType("yml")
err = viper.ReadConfig(confF)
if err != nil {
fmt.Fprintln(os.Stderr, err)
os.Exit(1)
}
}
config.ServerUrl = viper.GetString(ConfServerUrl)
if config.ServerUrl == "" {
configMissing(ConfServerUrl)
if onlineMode {
config.TrackerUrl = viper.GetString(ConfTrackerUrl)
if config.TrackerUrl == "" {
configMissing(ConfTrackerUrl)
}
config.TrackerUrl = strings.TrimRight(config.TrackerUrl, "/")
}
config.ServerUrl = strings.TrimRight(config.ServerUrl, "/")
config.TrackerProject = viper.GetInt(ConfTrackerProject)
config.Token = viper.GetString(ConfToken)
if config.Token == "" {
configMissing(ConfToken)
}
config.TrackerAlias = viper.GetString(ConfTrackerAlias)
config.WsBucketHost = viper.GetString(ConfWsBucketHost)
config.WsBucketScheme = viper.GetString(ConfWsBucketScheme)
config.ServerTimeout = viper.GetDuration(ConfServerTimeout)
@@ -201,9 +209,11 @@ func readConfig() {
}
if filePath := viper.GetString(ConfLogFile); filePath != "" {
f, err := os.OpenFile(filePath, os.O_CREATE | os.O_WRONLY | os.O_APPEND, 0644)
f, err := os.OpenFile(filePath, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0644)
bufWriter := bufio.NewWriter(f)
if err != nil { panic(err) }
if err != nil {
panic(err)
}
exitHooks.Add(func() {
bufWriter.Flush()
f.Close()

View File

@@ -1,10 +1,14 @@
# OD-Database server settings
server:
# Connection URL
url: http://od-db.mine.terorie.com/api
# Server auth token
token:
url: https://tt.the-eye.eu/api
# OD-Database project id (for crawling)
project: 1
# Your worker alias
alias: changeme
# Websocket bucket host & scheme (ws/wss)
ws_bucket_host: https://wsb.the-eye.eu
ws_bucket_scheme: wss
# Request timeout
timeout: 60s
@@ -17,11 +21,7 @@ server:
# Time to wait after receiving an error
# from the server. Doesn't apply to uploads.
cooldown: 30s
# Upload chunk size
# If the value is too high, the upload fails.
upload_chunk: 1 MB
cooldown: 1s
upload_retries: 10
upload_retry_interval: 30s
@@ -29,10 +29,10 @@ server:
# Log output settings
output:
# Crawl statistics
crawl_stats: 1s
crawl_stats: 1m
# CPU/RAM/Job queue stats
resource_stats: 10s
resource_stats: 1m
# More output? (Every listed dir)
verbose: false
@@ -47,13 +47,13 @@ output:
# Crawler settings
crawl:
# Number of sites that can be processed at once
tasks: 100
tasks: 25
# Number of connections per site
# Please be careful with this setting!
# The crawler fires fast and more than
# ten connections can overwhelm a server.
connections: 4
connections: 1
# How often to retry getting data
# from the site before giving up
@@ -81,4 +81,4 @@ crawl:
# in memory.
# A negative value will cause all jobs
# to be stored in memory. (Don't do this)
job_buffer: 5000
job_buffer: -1

View File

@@ -3,6 +3,8 @@ package main
import (
"errors"
"fmt"
"github.com/valyala/fasthttp"
"net"
)
var ErrRateLimit = errors.New("too many requests")
@@ -15,3 +17,29 @@ type HttpError struct {
func (e HttpError) Error() string {
return fmt.Sprintf("http status %d", e.code)
}
func shouldRetry(err error) bool {
// HTTP errors
if httpErr, ok := err.(*HttpError); ok {
switch httpErr.code {
case fasthttp.StatusTooManyRequests:
return true
default:
// Don't retry HTTP error codes
return false
}
}
if dnsError, ok := err.(*net.DNSError); ok {
// Don't retry permanent DNS errors
return dnsError.IsTemporary
}
if netErr, ok := err.(*net.OpError); ok {
// Don't retry permanent network errors
return netErr.Temporary()
}
// Retry by default
return true
}

14
go.mod
View File

@@ -1,14 +0,0 @@
module github.com/syndtr/od-database-crawler
require (
github.com/beeker1121/goque v2.0.1+incompatible
github.com/golang/snappy v0.0.0-20180518054509-2e65f85255db // indirect
github.com/sirupsen/logrus v1.3.0
github.com/spf13/cobra v0.0.3
github.com/spf13/viper v1.3.1
github.com/syndtr/goleveldb v0.0.0-20181128100959-b001fa50d6b2 // indirect
github.com/terorie/od-database-crawler v1.1.1
github.com/valyala/fasthttp v1.1.0
golang.org/x/crypto v0.0.0-20190131182504-b8fe1690c613
golang.org/x/net v0.0.0-20180911220305-26e67e76b6c3
)

66
go.sum
View File

@@ -1,66 +0,0 @@
github.com/armon/consul-api v0.0.0-20180202201655-eb2c6b5be1b6/go.mod h1:grANhF5doyWs3UAsr3K4I6qtAmlQcZDesFNEHPZAzj8=
github.com/beeker1121/goque v2.0.1+incompatible h1:5nJHPMqQLxUvGFc8m/NW2QzxKyc0zICmqs/JUsmEjwE=
github.com/beeker1121/goque v2.0.1+incompatible/go.mod h1:L6dOWBhDOnxUVQsb0wkLve0VCnt2xJW/MI8pdRX4ANw=
github.com/coreos/etcd v3.3.10+incompatible/go.mod h1:uF7uidLiAD3TWHmW31ZFd/JWoc32PjwdhPthX9715RE=
github.com/coreos/go-etcd v2.0.0+incompatible/go.mod h1:Jez6KQU2B/sWsbdaef3ED8NzMklzPG4d5KIOhIy30Tk=
github.com/coreos/go-semver v0.2.0/go.mod h1:nnelYz7RCh+5ahJtPPxZlU+153eP4D4r3EedlOD2RNk=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/fsnotify/fsnotify v1.4.7 h1:IXs+QLmnXW2CcXuY+8Mzv/fWEsPGWxqefPtCP5CnV9I=
github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo=
github.com/golang/snappy v0.0.0-20180518054509-2e65f85255db h1:woRePGFeVFfLKN/pOkfl+p/TAqKOfFu+7KPlMVpok/w=
github.com/golang/snappy v0.0.0-20180518054509-2e65f85255db/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
github.com/hashicorp/hcl v1.0.0 h1:0Anlzjpi4vEasTeNFn2mLJgTSwt0+6sfsiTG8qcWGx4=
github.com/hashicorp/hcl v1.0.0/go.mod h1:E5yfLk+7swimpb2L/Alb/PJmXilQ/rhwaUYs4T20WEQ=
github.com/klauspost/compress v1.4.0 h1:8nsMz3tWa9SWWPL60G1V6CUsf4lLjWLTNEtibhe8gh8=
github.com/klauspost/compress v1.4.0/go.mod h1:RyIbtBH6LamlWaDj8nUwkbUhJ87Yi3uG0guNDohfE1A=
github.com/klauspost/cpuid v0.0.0-20180405133222-e7e905edc00e h1:+lIPJOWl+jSiJOc70QXJ07+2eg2Jy2EC7Mi11BWujeM=
github.com/klauspost/cpuid v0.0.0-20180405133222-e7e905edc00e/go.mod h1:Pj4uuM528wm8OyEC2QMXAi2YiTZ96dNQPGgoMS4s3ek=
github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
github.com/magiconair/properties v1.8.0 h1:LLgXmsheXeRoUOBOjtwPQCWIYqM/LU1ayDtDePerRcY=
github.com/magiconair/properties v1.8.0/go.mod h1:PppfXfuXeibc/6YijjN8zIbojt8czPbwD3XqdrwzmxQ=
github.com/mitchellh/mapstructure v1.1.2 h1:fmNYVwqnSfB9mZU6OS2O6GsXM+wcskZDuKQzvN1EDeE=
github.com/mitchellh/mapstructure v1.1.2/go.mod h1:FVVH3fgwuzCH5S8UJGiWEs2h04kUh9fWfEaFds41c1Y=
github.com/pelletier/go-toml v1.2.0 h1:T5zMGML61Wp+FlcbWjRDT7yAxhJNAiPPLOFECq181zc=
github.com/pelletier/go-toml v1.2.0/go.mod h1:5z9KED0ma1S8pY6P1sdut58dfprrGBbd/94hg7ilaic=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/sirupsen/logrus v1.3.0 h1:hI/7Q+DtNZ2kINb6qt/lS+IyXnHQe9e90POfeewL/ME=
github.com/sirupsen/logrus v1.3.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo=
github.com/spf13/afero v1.1.2 h1:m8/z1t7/fwjysjQRYbP0RD+bUIF/8tJwPdEZsI83ACI=
github.com/spf13/afero v1.1.2/go.mod h1:j4pytiNVoe2o6bmDsKpLACNPDBIoEAkihy7loJ1B0CQ=
github.com/spf13/cast v1.3.0 h1:oget//CVOEoFewqQxwr0Ej5yjygnqGkvggSE/gB35Q8=
github.com/spf13/cast v1.3.0/go.mod h1:Qx5cxh0v+4UWYiBimWS+eyWzqEqokIECu5etghLkUJE=
github.com/spf13/cobra v0.0.3 h1:ZlrZ4XsMRm04Fr5pSFxBgfND2EBVa1nLpiy1stUsX/8=
github.com/spf13/cobra v0.0.3/go.mod h1:1l0Ry5zgKvJasoi3XT1TypsSe7PqH0Sj9dhYf7v3XqQ=
github.com/spf13/jwalterweatherman v1.0.0 h1:XHEdyB+EcvlqZamSM4ZOMGlc93t6AcsBEu9Gc1vn7yk=
github.com/spf13/jwalterweatherman v1.0.0/go.mod h1:cQK4TGJAtQXfYWX+Ddv3mKDzgVb68N+wFjFa4jdeBTo=
github.com/spf13/pflag v1.0.3 h1:zPAT6CGy6wXeQ7NtTnaTerfKOsV6V6F8agHXFiazDkg=
github.com/spf13/pflag v1.0.3/go.mod h1:DYY7MBk1bdzusC3SYhjObp+wFpr4gzcvqqNjLnInEg4=
github.com/spf13/viper v1.3.1 h1:5+8j8FTpnFV4nEImW/ofkzEt8VoOiLXxdYIDsB73T38=
github.com/spf13/viper v1.3.1/go.mod h1:ZiWeW+zYFKm7srdB9IoDzzZXaJaI5eL9QjNiN/DMA2s=
github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
github.com/syndtr/goleveldb v0.0.0-20181128100959-b001fa50d6b2 h1:GnOzE5fEFN3b2zDhJJABEofdb51uMRNb8eqIVtdducs=
github.com/syndtr/goleveldb v0.0.0-20181128100959-b001fa50d6b2/go.mod h1:Z4AUp2Km+PwemOoO/VB5AOx9XSsIItzFjoJlOSiYmn0=
github.com/terorie/od-database-crawler v1.1.1 h1:Ca+ZqbZX3rVWBR8SDRzvroyxjBtUs75MQXZ9YG0gqGo=
github.com/terorie/od-database-crawler v1.1.1/go.mod h1:vVJ7pLkudrlUNp9qu24JCzQ8N6mFsrOmX1tPXr155DQ=
github.com/ugorji/go/codec v0.0.0-20181204163529-d75b2dcb6bc8/go.mod h1:VFNgLljTbGfSG7qAOspJ7OScBnGdDN/yBr0sguwnwf0=
github.com/valyala/bytebufferpool v1.0.0 h1:GqA5TC/0021Y/b9FG4Oi9Mr3q7XYx6KllzawFIhcdPw=
github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyCJ6HpOuEn7z0Csc=
github.com/valyala/fasthttp v1.1.0 h1:3BohG7mqwj4lq7PTX//7gLbUlzNvZSPmuHFnloXT0lw=
github.com/valyala/fasthttp v1.1.0/go.mod h1:4vX61m6KN+xDduDNwXrhIAVZaZaZiQ1luJk8LWSxF3s=
github.com/valyala/tcplisten v0.0.0-20161114210144-ceec8f93295a/go.mod h1:v3UYOV9WzVtRmSR+PDvWpU/qWl4Wa5LApYYX4ZtKbio=
github.com/xordataexchange/crypt v0.0.3-0.20170626215501-b2862e3d0a77/go.mod h1:aYKd//L2LvnjZzWKhF00oedf4jCCReLcmhLdhm1A27Q=
golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
golang.org/x/crypto v0.0.0-20181203042331-505ab145d0a9/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
golang.org/x/crypto v0.0.0-20190131182504-b8fe1690c613 h1:MQ/ZZiDsUapFFiMS+vzwXkCTeEKaum+Do5rINYJDmxc=
golang.org/x/crypto v0.0.0-20190131182504-b8fe1690c613/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
golang.org/x/net v0.0.0-20180911220305-26e67e76b6c3 h1:czFLhve3vsQetD6JOJ8NZZvGQIXlnN3/yXxbT6/awxI=
golang.org/x/net v0.0.0-20180911220305-26e67e76b6c3/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20181205085412-a5c9d58dba9a h1:1n5lsVfiQW3yfsRGu98756EH1YthsFqr/5mxHduZW2A=
golang.org/x/sys v0.0.0-20181205085412-a5c9d58dba9a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/text v0.3.0 h1:g61tztE5qeGQ89tm6NTjjM9VPIm088od1l6aSorWRWg=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v2 v2.2.2 h1:ZCJp+EgiOT7lHqUV2J862kp8Qj64Jo6az82+3Td9dZw=
gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=

47
jenkins/Jenkinsfile vendored Normal file
View File

@@ -0,0 +1,47 @@
def remote = [:]
remote.name = 'remote'
remote.host = env.DEPLOY_HOST
remote.user = env.DEPLOY_USER
remote.identityFile = '/var/lib/jenkins/.ssh/id_rsa'
remote.knownHosts = '/var/lib/jenkins/.ssh/known_hosts'
remote.allowAnyHosts = true
remote.retryCount = 3
remote.retryWaitSec = 3
logLevel = 'FINER'
pipeline {
agent none
environment {
GOOS='linux'
CGO_ENABLED='0'
HOME='.'
}
stages {
stage('Build') {
agent {
docker {
image 'golang:latest'
}
}
steps {
sh 'mkdir -p /go/src/github.com/terorie/od-database-crawler'
sh 'cp -r *.go fasturl ds jenkins/build.sh "/go/src/github.com/terorie/od-database-crawler"'
sh 'cd /go/src/github.com/terorie/od-database-crawler && go get ./...'
sh './jenkins/build.sh'
stash includes: 'dist/', name: 'dist'
}
}
stage('Deploy') {
agent none
steps {
node('master') {
unstash 'dist'
sshCommand remote: remote, command: "ls od-database-crawler/"
sshPut remote: remote, from: 'dist/', into: 'od-database-crawler'
}
}
}
}
}

23
jenkins/build.sh Executable file
View File

@@ -0,0 +1,23 @@
#!/usr/bin/env bash
appname="od-database-crawler"
outdir="dist/"
tag="${BUILD_ID}_$(date +%Y-%m-%d)"
rm -rf "./${outdir}"
mkdir build 2> /dev/null
name=${outdir}${appname}-${tag}-linux
GOOS="linux" GOARCH="amd64" go build -ldflags="-s -w" -o ${name}
gzip -f ${name}
echo ${name}
name=${outdir}${appname}-${tag}-mac
GOOS="darwin" GOARCH="amd64" go build -ldflags="-s -w" -o ${name}
gzip -f ${name}
echo ${name}
name=${outdir}${appname}-${tag}-freebsd
GOOS="freebsd" GOARCH="amd64" go build -ldflags="-s -w" -o ${name}
gzip -f ${name}
echo ${name}

64
main.go
View File

@@ -8,6 +8,7 @@ import (
"github.com/spf13/viper"
"github.com/terorie/od-database-crawler/fasturl"
"os"
"os/signal"
"strings"
"sync/atomic"
"time"
@@ -17,7 +18,7 @@ var configFile string
var rootCmd = cobra.Command {
Use: "od-database-crawler",
Version: "1.2.0",
Version: "1.2.2",
Short: "OD-Database Go crawler",
Long: helpText,
PersistentPreRunE: preRun,
@@ -61,8 +62,6 @@ func preRun(cmd *cobra.Command, args []string) error {
if err := os.MkdirAll("queue", 0755);
err != nil { panic(err) }
readConfig()
return nil
}
@@ -75,25 +74,31 @@ func main() {
}
func cmdBase(_ *cobra.Command, _ []string) {
// TODO Graceful shutdown
appCtx := context.Background()
forceCtx := context.Background()
onlineMode = true
readConfig()
appCtx, soft := context.WithCancel(context.Background())
forceCtx, hard := context.WithCancel(context.Background())
go hardShutdown(forceCtx)
go listenCtrlC(soft, hard)
inRemotes := make(chan *OD)
go Schedule(forceCtx, inRemotes)
go Schedule(appCtx, inRemotes)
ticker := time.NewTicker(config.Recheck)
defer ticker.Stop()
for {
select {
case <-appCtx.Done():
return
goto shutdown
case <-ticker.C:
t, err := FetchTask()
if err != nil {
logrus.WithError(err).
Error("Failed to get new task")
time.Sleep(viper.GetDuration(ConfCooldown))
if !sleep(viper.GetDuration(ConfCooldown), appCtx) {
goto shutdown
}
continue
}
if t == nil {
@@ -109,13 +114,7 @@ func cmdBase(_ *cobra.Command, _ []string) {
if urlErr, ok := err.(*fasturl.Error); ok && urlErr.Err == fasturl.ErrUnknownScheme {
// Not an error
err = nil
// Give back task
//err2 := CancelTask(t.WebsiteId)
//if err2 != nil {
// logrus.Error(err2)
//}
// TODO FTP crawler
continue
} else if err != nil {
logrus.WithError(err).
@@ -126,9 +125,15 @@ func cmdBase(_ *cobra.Command, _ []string) {
ScheduleTask(inRemotes, t, &baseUri)
}
}
shutdown:
globalWait.Wait()
}
func cmdCrawler(_ *cobra.Command, args []string) error {
onlineMode = false
readConfig()
arg := args[0]
// https://github.com/golang/go/issues/19779
if !strings.Contains(arg, "://") {
@@ -161,3 +166,30 @@ func cmdCrawler(_ *cobra.Command, args []string) error {
return nil
}
func listenCtrlC(soft, hard context.CancelFunc) {
c := make(chan os.Signal)
signal.Notify(c, os.Interrupt)
<-c
logrus.Info(">>> Shutting down crawler... <<<")
soft()
<-c
logrus.Warning(">>> Force shutdown! <<<")
hard()
}
func hardShutdown(c context.Context) {
<-c.Done()
os.Exit(1)
}
func sleep(d time.Duration, c context.Context) bool {
select {
case <-time.After(d):
return true
case <-c.Done():
return false
}
}

View File

@@ -7,19 +7,29 @@ import (
"time"
)
type ResultCode int
const (
TR_OK = ResultCode(iota)
TR_FAIL = 1
TR_SKIP = 2
)
type Task struct {
WebsiteId uint64 `json:"website_id"`
Url string `json:"url"`
WebsiteId uint64 `json:"website_id"`
Url string `json:"url"`
UploadToken string `json:"upload_token"`
TaskId int64
}
type TaskResult struct {
StatusCode string `json:"status_code"`
FileCount uint64 `json:"file_count"`
ErrorCount uint64 `json:"-"`
StartTime time.Time `json:"-"`
StartTimeUnix int64 `json:"start_time"`
EndTimeUnix int64 `json:"end_time"`
WebsiteId uint64 `json:"website_id"`
ResultCode ResultCode `json:"status_code"`
FileCount uint64 `json:"file_count"`
ErrorCount uint64 `json:"-"`
StartTime time.Time `json:"-"`
StartTimeUnix int64 `json:"start_time"`
EndTimeUnix int64 `json:"end_time"`
WebsiteId uint64 `json:"website_id"`
}
type Job struct {
@@ -51,13 +61,16 @@ func (o *OD) LoadOrStoreKey(k *redblackhash.Key) (exists bool) {
defer o.Scanned.Unlock()
exists = o.Scanned.Get(k)
if exists { return true }
if exists {
return true
}
o.Scanned.Put(k)
return false
}
type errorString string
func (e errorString) Error() string {
return string(e)
}

View File

@@ -34,13 +34,16 @@ func Schedule(c context.Context, remotes <-chan *OD) {
queuePath := path.Join("queue", fmt.Sprintf("%d", remote.Task.WebsiteId))
// Delete existing queue
if err := os.RemoveAll(queuePath);
err != nil { panic(err) }
if err := os.RemoveAll(queuePath); err != nil {
panic(err)
}
// Start new queue
var err error
remote.WCtx.Queue, err = OpenQueue(queuePath)
if err != nil { panic(err) }
if err != nil {
panic(err)
}
// Spawn workers
for i := 0; i < config.Workers; i++ {
@@ -77,12 +80,12 @@ func ScheduleTask(remotes chan<- *OD, t *Task, u *fasturl.URL) {
globalWait.Add(1)
now := time.Now()
od := &OD {
Task: *t,
od := &OD{
Task: *t,
BaseUri: *u,
Result: TaskResult {
WebsiteId: t.WebsiteId,
StartTime: now,
Result: TaskResult{
WebsiteId: t.WebsiteId,
StartTime: now,
StartTimeUnix: now.Unix(),
},
}
@@ -117,7 +120,7 @@ func (o *OD) Watch(results chan File) {
// Open crawl results file
f, err := os.OpenFile(
filePath,
os.O_CREATE | os.O_RDWR | os.O_TRUNC,
os.O_CREATE|os.O_RDWR|os.O_TRUNC,
0644,
)
if err != nil {
@@ -145,7 +148,7 @@ func (o *OD) Watch(results chan File) {
}
// Upload results
err = PushResult(&o.Result, f)
err = PushResult(&o.Task, f)
if err != nil {
logrus.WithError(err).
Error("Failed uploading crawl results")
@@ -170,24 +173,18 @@ func (o *OD) handleCollect(results chan File, f *os.File, collectErrC chan error
// Log finish
logrus.WithFields(logrus.Fields{
"id": o.Task.WebsiteId,
"url": o.BaseUri.String(),
"id": o.Task.WebsiteId,
"url": o.BaseUri.String(),
"duration": time.Since(o.Result.StartTime),
}).Info("Crawler finished")
// Set status code
now := time.Now()
o.Result.EndTimeUnix = now.Unix()
fileCount := atomic.LoadUint64(&o.Result.FileCount)
if fileCount == 0 {
errorCount := atomic.LoadUint64(&o.Result.ErrorCount)
if errorCount == 0 {
o.Result.StatusCode = "empty"
} else {
o.Result.StatusCode = "directory listing failed"
}
if atomic.LoadUint64(&o.Result.ErrorCount) != 0 {
o.Result.ResultCode = TR_FAIL
} else {
o.Result.StatusCode = "success"
o.Result.ResultCode = TR_OK
}
}
@@ -205,11 +202,17 @@ func (t *Task) collect(results chan File, f *os.File) error {
result.Path = fasturl.PathUnescape(result.Path)
result.Name = fasturl.PathUnescape(result.Name)
resJson, err := json.Marshal(result)
if err != nil { panic(err) }
if err != nil {
panic(err)
}
_, err = f.Write(resJson)
if err != nil { return err }
if err != nil {
return err
}
_, err = f.Write([]byte{'\n'})
if err != nil { return err }
if err != nil {
return err
}
}
return nil

304
server.go
View File

@@ -2,48 +2,125 @@ package main
import (
"bytes"
"encoding/base64"
"encoding/json"
"fmt"
"github.com/fasthttp/websocket"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
"github.com/spf13/viper"
"golang.org/x/time/rate"
"io"
"mime/multipart"
"io/ioutil"
"net/http"
"net/url"
"os"
"strconv"
"time"
)
var serverClient = http.Client {
Timeout: config.ServerTimeout,
var serverWorker *TrackerWorker
var serverClient = http.Client{
Timeout: config.ServerTimeout,
Transport: new(ServerTripper),
}
var serverUserAgent = "od-database-crawler/" + rootCmd.Version
func getOrCreateWorker() {
if _, err := os.Stat("worker.json"); os.IsNotExist(err) {
req := CreateTrackerWorkerRequest{
Alias: config.TrackerAlias,
}
body, _ := json.Marshal(&req)
buf := bytes.NewBuffer(body)
resp, _ := serverClient.Post(config.TrackerUrl+"/worker/create", "application/json", buf)
workerResponse := CreateTrackerWorkerResponse{}
respBody, _ := ioutil.ReadAll(resp.Body)
_ = json.Unmarshal(respBody, &workerResponse)
workerJsonData, _ := json.Marshal(&workerResponse.Content.Worker)
fp, _ := os.OpenFile("worker.json", os.O_CREATE|os.O_TRUNC|os.O_WRONLY, 0600)
_, _ = fp.Write(workerJsonData)
//Request ASSIGN permission
serverWorker = &workerResponse.Content.Worker
accessReq, _ := json.Marshal(WorkerAccessRequest{
Project: config.TrackerProject,
Assign: true,
Submit: false,
})
buf = bytes.NewBuffer(accessReq)
res, err := serverClient.Post(config.TrackerUrl+"/project/request_access", "application/json", buf)
if err != nil {
panic(err)
}
logrus.WithFields(logrus.Fields{
"response": res.StatusCode,
}).Info("Requested ASSIGN permission")
} else {
var worker TrackerWorker
fp, _ := os.OpenFile("worker.json", os.O_RDONLY, 0600)
workerJsonData, _ := ioutil.ReadAll(fp)
_ = json.Unmarshal(workerJsonData, &worker)
serverWorker = &worker
}
}
func FetchTask() (t *Task, err error) {
res, err := serverClient.PostForm(
config.ServerUrl + "/task/get",
url.Values{ "token": {config.Token} })
if err != nil { return }
if serverWorker == nil {
getOrCreateWorker()
}
res, err := serverClient.Get(config.TrackerUrl + "/task/get/" + strconv.Itoa(config.TrackerProject))
if err != nil {
return
}
defer res.Body.Close()
switch res.StatusCode {
case 200:
break
case 404, 500:
return nil, nil
default:
return nil, fmt.Errorf("http %s", res.Status)
}
t = new(Task)
err = json.NewDecoder(res.Body).Decode(t)
if err != nil { return }
jsonResponse := FetchTaskResponse{}
err = json.NewDecoder(res.Body).Decode(&jsonResponse)
if _, ok := err.(*json.SyntaxError); ok {
return nil, fmt.Errorf("/task/get returned invalid JSON")
} else if err != nil {
return
}
if !jsonResponse.Ok {
if jsonResponse.Message == "No task available" {
return nil, nil
}
return nil, errors.New(jsonResponse.Message)
}
task := Task{}
err = json.Unmarshal([]byte(jsonResponse.Content.Task.Recipe), &task)
if _, ok := err.(*json.SyntaxError); ok {
return nil, fmt.Errorf("/task/get returned invalid JSON")
} else if err != nil {
return
}
t = &task
t.TaskId = jsonResponse.Content.Task.Id
return
}
func PushResult(result *TaskResult, f *os.File) (err error) {
if result.WebsiteId == 0 {
func PushResult(task *Task, f *os.File) (err error) {
if task.WebsiteId == 0 {
// Not a real result, don't push
return nil
}
@@ -54,10 +131,10 @@ func PushResult(result *TaskResult, f *os.File) (err error) {
return
}
err = uploadChunks(result.WebsiteId, f)
err = uploadWebsocket(f, task.UploadToken)
if err != nil {
logrus.Errorf("Failed to upload file list: %s", err)
err2 := CancelTask(result.WebsiteId)
err2 := releaseTask(task, TR_FAIL)
if err2 != nil {
logrus.Error(err2)
}
@@ -65,91 +142,62 @@ func PushResult(result *TaskResult, f *os.File) (err error) {
}
// Upload result ignoring errors
uploadResult(result)
_ = releaseTask(task, TR_OK)
return
}
func uploadChunks(websiteId uint64, f *os.File) error {
eof := false
for iter := 1; !eof; iter++ {
// TODO Stream with io.Pipe?
var b bytes.Buffer
func uploadWebsocket(f *os.File, token string) (err error) {
multi := multipart.NewWriter(&b)
u := url.URL{Scheme: config.WsBucketScheme, Host: config.WsBucketHost, Path: "/upload"}
// Set upload fields
var err error
err = multi.WriteField("token", config.Token)
if err != nil { return err }
err = multi.WriteField("website_id", fmt.Sprintf("%d", websiteId))
if err != nil { return err }
// Copy chunk to file_list
formFile, err := multi.CreateFormFile("file_list", "file_list")
var n int64
n, err = io.CopyN(formFile, f, config.ChunkSize)
if err != io.EOF && err != nil {
return err
}
if n == 0 {
// Don't upload, no content
return nil
} else if n < config.ChunkSize {
err = nil
// Break at end of iteration
eof = true
}
multi.Close()
for retries := 0; retries < viper.GetInt(ConfUploadRetries); retries++ {
if retries > 0 {
// Error occurred, retry upload
time.Sleep(viper.GetDuration(ConfUploadRetryInterval))
}
req, err := http.NewRequest(
http.MethodPost,
config.ServerUrl + "/task/upload",
&b)
req.Header.Set("content-type", multi.FormDataContentType())
if err != nil { continue }
res, err := serverClient.Do(req)
if err != nil { continue }
res.Body.Close()
if res.StatusCode != http.StatusOK {
logrus.WithField("status", res.Status).
WithField("part", iter).
Errorf("Upload failed")
continue
}
// Upload successful
break
}
logrus.WithField("id", websiteId).
WithField("part", iter).
Infof("Uploaded files chunk")
header := http.Header{}
header.Add("X-Upload-Token", token)
conn, _, err := websocket.DefaultDialer.Dial(u.String(), header)
if err != nil {
return
}
conn.EnableWriteCompression(true) //TODO: Is this necessary?
socketWriter, _ := conn.NextWriter(websocket.BinaryMessage)
_, _ = io.Copy(socketWriter, f)
err = socketWriter.Close()
if err != nil {
logrus.Error("FIXME: couldn't do file upload")
return
}
err = conn.Close()
if err != nil {
return
}
return nil
}
func uploadResult(result *TaskResult) (err error) {
resultEnc, err := json.Marshal(result)
if err != nil { panic(err) }
func releaseTask(task *Task, taskResult ResultCode) (err error) {
res, err := serverClient.PostForm(
config.ServerUrl + "/task/complete",
url.Values {
"token": {config.Token},
"result": {string(resultEnc)},
},
req := releaseTaskRequest{
TaskId: task.TaskId,
ResultCode: taskResult,
// TODO Will implement verification in a later ODDB update
Verification: 0,
}
resultEnc, err := json.Marshal(&req)
if err != nil {
panic(err)
}
body := bytes.NewBuffer(resultEnc)
res, err := serverClient.Post(
config.TrackerUrl+"/task/release",
"application/json",
body,
)
if err != nil { return }
if err != nil {
return
}
res.Body.Close()
if res.StatusCode != http.StatusOK {
@@ -159,20 +207,66 @@ func uploadResult(result *TaskResult) (err error) {
return
}
func CancelTask(websiteId uint64) (err error) {
res, err := serverClient.PostForm(
config.ServerUrl + "/task/cancel",
url.Values{
"token": {config.Token},
"website_id": {strconv.FormatUint(websiteId, 10)},
},
)
if err != nil { return }
res.Body.Close()
type ServerTripper struct{}
if res.StatusCode != http.StatusOK {
return fmt.Errorf("failed to cancel task: %s", res.Status)
func (t *ServerTripper) RoundTrip(req *http.Request) (res *http.Response, err error) {
req.Header.Set("User-Agent", serverUserAgent)
//TODO: Use task_tracker/client ?
if serverWorker != nil {
req.Header.Add("X-Worker-Id", strconv.Itoa(serverWorker.Id))
req.Header.Add("X-Secret", base64.StdEncoding.EncodeToString(serverWorker.Secret))
}
return
return http.DefaultTransport.RoundTrip(req)
}
// https://github.com/simon987/task_tracker/blob/master/api/models.go
type releaseTaskRequest struct {
TaskId int64 `json:"task_id"`
ResultCode ResultCode `json:"result"`
Verification int64 `json:"verification"`
}
type WorkerAccessRequest struct {
Assign bool `json:"assign"`
Submit bool `json:"submit"`
Project int `json:"project"`
}
type FetchTaskResponse struct {
Ok bool `json:"ok"`
Message string `json:"message"`
Content struct {
Task struct {
Id int64 `json:"id"`
Priority int64 `json:"priority"`
Project struct {
Id int64 `json:"id"`
Name string `json:"name"`
Version string `json:"version"`
AssignRate rate.Limit `json:"assign_rate"`
SubmitRate rate.Limit `json:"submit_rate"`
} `json:"project"`
Recipe string `json:"recipe"`
} `json:"task"`
} `json:"content"`
}
type TrackerWorker struct {
Alias string `json:"alias"`
Id int `json:"id"`
Secret []byte `json:"secret"`
}
type CreateTrackerWorkerResponse struct {
Ok bool `json:"ok"`
Message string `json:"message"`
Content struct {
Worker TrackerWorker `json:"worker"`
} `json:"content"`
}
type CreateTrackerWorkerRequest struct {
Alias string `json:"alias"`
}

View File

@@ -3,7 +3,6 @@ package main
import (
"github.com/beeker1121/goque"
"github.com/sirupsen/logrus"
"github.com/valyala/fasthttp"
"math"
"sort"
"strings"
@@ -15,8 +14,8 @@ import (
var globalWait sync.WaitGroup
type WorkerContext struct {
OD *OD
Queue *BufferedQueue
OD *OD
Queue *BufferedQueue
lastRateLimit time.Time
numRateLimits int
}
@@ -55,20 +54,18 @@ func (w *WorkerContext) step(results chan<- File, job Job) {
if err != nil {
job.Fails++
if httpErr, ok := err.(*HttpError); ok {
switch httpErr.code {
case fasthttp.StatusTooManyRequests:
err = ErrRateLimit
default:
// Don't retry HTTP error codes
return
}
if !shouldRetry(err) {
atomic.AddUint64(&totalAborted, 1)
//logrus.WithField("url", job.UriStr).
// WithError(err).
// Error("Giving up after failure")
return
}
if job.Fails > config.Retries {
atomic.AddUint64(&totalAborted, 1)
logrus.WithField("url", job.UriStr).
Errorf("Giving up after %d fails", job.Fails)
//logrus.WithField("url", job.UriStr).
// Errorf("Giving up after %d fails", job.Fails)
} else {
atomic.AddUint64(&totalRetries, 1)
if err == ErrRateLimit {
@@ -91,7 +88,9 @@ func (w *WorkerContext) step(results chan<- File, job Job) {
}
func (w *WorkerContext) DoJob(job *Job, f *File) (newJobs []Job, err error) {
if len(job.Uri.Path) == 0 { return }
if len(job.Uri.Path) == 0 {
return
}
if job.Uri.Path[len(job.Uri.Path)-1] == '/' {
// Load directory
links, err := GetDir(job, f)
@@ -162,10 +161,10 @@ func (w *WorkerContext) queueJob(job Job) {
w.OD.Wait.Add(1)
if w.numRateLimits > 0 {
if time.Since(w.lastRateLimit) > 5 * time.Second {
if time.Since(w.lastRateLimit) > 5*time.Second {
w.numRateLimits = 0
} else {
time.Sleep(time.Duration(math.Sqrt(float64(50 * w.numRateLimits))) *
time.Sleep(time.Duration(math.Sqrt(float64(50*w.numRateLimits))) *
100 * time.Millisecond)
}
}