29 Commits

Author SHA1 Message Date
simon987
a962c60b82 Don't panic on file upload error 2019-04-06 14:56:22 -04:00
simon987
24f0bd91f7 Remove debug messages & don't use disk queue by default 2019-04-06 12:21:35 -04:00
simon987
84c10e1981 Change default config 2019-04-05 17:53:00 -04:00
simon987
860fa79327 Jenkins setup 2019-04-04 19:22:14 -04:00
simon987
76bc8293d6 minimum viable 2019-03-30 09:02:55 -04:00
simon987
3470be6086 More work on task_tracker integration 2019-03-09 16:38:04 -05:00
Richard Patel
60471a081e Switch to simon987/task_tracker 2019-02-28 23:51:26 +01:00
dependabot[bot]
0b3f0d87fe Upgrade fasthttp to 1.2.0
Bumps [github.com/valyala/fasthttp](https://github.com/valyala/fasthttp) from 1.1.0 to 1.2.0.
- [Release notes](https://github.com/valyala/fasthttp/releases)
- [Commits](https://github.com/valyala/fasthttp/compare/v1.1.0...v1.2.0)

Thanks bot

Signed-off-by: dependabot[bot] <support@dependabot.com>
2019-02-28 22:42:40 +01:00
terorie
da9c75e392 Reduce Docker image size 2019-02-22 21:37:04 +01:00
Pascal
8947e05d0c Fix Dockerfile
Fixes #22
Credit to @pascaldulieu
2019-02-22 21:11:55 +01:00
Richard Patel
8c5f99d616 More descriptive error if /task/get returns invalid JSON 2019-02-22 20:17:59 +01:00
Richard Patel
206ea0e91d Simplify config 2019-02-22 18:50:35 +01:00
Richard Patel
8b9d8bfd17 Fix README.md format 2019-02-22 06:04:10 +01:00
Richard Patel
c9ff102d80 Fix Dockerfile 2019-02-22 06:00:57 +01:00
Richard Patel
88856c1c19 Flag explanation in README.md 2019-02-22 05:59:59 +01:00
Richard Patel
9e9b606250 Merge branch 'stable' 2019-02-22 05:37:52 +01:00
Richard Patel
326e29e5e4 Reset to stable branch 2019-02-22 05:37:45 +01:00
Richard Patel
c2acd5463f Restore .travis.yml
Now handling auto-build over Docker Hub directly
2019-02-22 05:16:25 +01:00
Richard Patel
e4d04e6a5f go.mod: Fix package path
lol
2019-02-22 05:10:43 +01:00
terorie
9f1402e841 New Dockerfile and Travis Config (#23) 2019-02-22 05:07:27 +01:00
terorie
7c8ab50ee4 Merge stable into master 2019-02-13 15:32:40 +01:00
terorie
281d2d17d6 Update config.yml 2019-02-13 15:32:00 +01:00
Richard Patel
45cbd4d535 Disable resume feature 2019-02-05 15:44:59 +01:00
Richard Patel
771d49f2dd Fix WaitGroup deadlock 2019-02-03 17:14:20 +01:00
Richard Patel
dbd787aa81 Fix WaitGroup crash 2019-02-03 17:09:43 +01:00
Richard Patel
cea6c1658b Bugfix: Don't schedule new tasks during shutdown 2019-02-03 17:02:44 +01:00
terorie
885af5bb3b Beta task resuming 2019-02-03 16:50:08 +01:00
Richard Patel
b18b70f798 Fix segfault (thanks Pikami) 2019-02-03 14:00:17 +01:00
Richard Patel
9d5f549774 Better server User-Agent string 2019-02-03 12:23:21 +01:00
13 changed files with 547 additions and 363 deletions

15
Dockerfile Normal file
View File

@@ -0,0 +1,15 @@
FROM golang:alpine as builder
ADD . /go/src/github.com/terorie/od-database-crawler
RUN apk add git \
&& go get -d -v github.com/terorie/od-database-crawler \
&& CGO_ENABLED=0 go install -a \
-installsuffix cgo \
-ldflags="-s -w" \
github.com/terorie/od-database-crawler
FROM scratch
COPY --from=builder /etc/ssl/certs/ca-certificates.crt /etc/ssl/certs/
COPY --from=builder /go/bin/od-database-crawler /bin/
WORKDIR /oddb
VOLUME [ "/oddb" ]
CMD ["/bin/od-database-crawler", "server"]

View File

@@ -9,7 +9,9 @@
https://od-db.the-eye.eu/ https://od-db.the-eye.eu/
#### Usage ## Usage
### Deploys
1. With Config File (if `config.yml` found in working dir) 1. With Config File (if `config.yml` found in working dir)
- Download [default config](https://github.com/terorie/od-database-crawler/blob/master/config.yml) - Download [default config](https://github.com/terorie/od-database-crawler/blob/master/config.yml)
@@ -22,3 +24,31 @@ https://od-db.the-eye.eu/
- Every flag is available as an environment variable: - Every flag is available as an environment variable:
`--server.crawl_stats` ➡️ `OD_SERVER_CRAWL_STATS` `--server.crawl_stats` ➡️ `OD_SERVER_CRAWL_STATS`
- Start with `./od-database-crawler server <flags>` - Start with `./od-database-crawler server <flags>`
3. With Docker
```bash
docker run \
-e OD_SERVER_URL=xxx \
-e OD_SERVER_TOKEN=xxx \
terorie/od-database-crawler
```
### Flag reference
Here are the most important config flags. For more fine control, take a look at `/config.yml`.
| Flag/Environment | Description | Example |
| ------------------------------------------------------- | ------------------------------------------------------------ | ----------------------------------- |
| `server.url`<br />`OD_SERVER_URL` | OD-DB Server URL | `https://od-db.mine.the-eye.eu/api` |
| `server.token`<br />`OD_SERVER_TOKEN` | OD-DB Server Access Token | _Ask Hexa **TM**_ |
| `server.recheck`<br />`OD_SERVER_RECHECK` | Job Fetching Interval | `3s` |
| `output.crawl_stats`<br />`OD_OUTPUT_CRAWL_STATS` | Crawl Stats Logging Interval (0 = disabled) | `500ms` |
| `output.resource_stats`<br />`OD_OUTPUT_RESORUCE_STATS` | Resource Stats Logging Interval (0 = disabled) | `8s` |
| `output.log`<br />`OD_OUTPUT_LOG` | Log File (none = disabled) | `crawler.log` |
| `crawl.tasks`<br />`OD_CRAWL_TASKS` | Max number of sites to crawl concurrently | `500` |
| `crawl.connections`<br />`OD_CRAWL_CONNECTIONS` | HTTP connections per site | `1` |
| `crawl.retries`<br />`OD_CRAWL_RETRIES` | How often to retry after a temporary failure (e.g. `HTTP 429` or timeouts) | `5` |
| `crawl.dial_timeout`<br />`OD_CRAWL_DIAL_TIMEOUT` | TCP Connect timeout | `5s` |
| `crawl.timeout`<br />`OD_CRAWL_TIMEOUT` | HTTP request timeout | `20s` |
| `crawl.user-agent`<br />`OD_CRAWL_USER_AGENT` | HTTP Crawler User-Agent | `googlebot/1.2.3` |
| `crawl.job_buffer`<br />`OD_CRAWL_JOB_BUFFER` | Number of URLs to keep in memory/cache, per job. The rest is offloaded to disk. Decrease this value if the crawler uses too much RAM. (0 = Disable Cache, -1 = Only use Cache) | `5000` |

165
config.go
View File

@@ -4,6 +4,7 @@ import (
"bufio" "bufio"
"fmt" "fmt"
"github.com/sirupsen/logrus" "github.com/sirupsen/logrus"
"github.com/spf13/pflag"
"github.com/spf13/viper" "github.com/spf13/viper"
"io" "io"
"os" "os"
@@ -12,8 +13,11 @@ import (
) )
var config struct { var config struct {
ServerUrl string TrackerUrl string
Token string TrackerProject int
TrackerAlias string
WsBucketScheme string
WsBucketHost string
ServerTimeout time.Duration ServerTimeout time.Duration
Recheck time.Duration Recheck time.Duration
ChunkSize int64 ChunkSize int64
@@ -26,9 +30,14 @@ var config struct {
JobBufferSize int JobBufferSize int
} }
var onlineMode bool
const ( const (
ConfServerUrl = "server.url" ConfTrackerUrl = "server.url"
ConfToken = "server.token" ConfTrackerProject = "server.project"
ConfTrackerAlias = "server.alias"
ConfWsBucketScheme = "server.ws_bucket_scheme"
ConfWsBucketHost = "server.ws_bucket_host"
ConfServerTimeout = "server.timeout" ConfServerTimeout = "server.timeout"
ConfRecheck = "server.recheck" ConfRecheck = "server.recheck"
ConfCooldown = "server.cooldown" ConfCooldown = "server.cooldown"
@@ -54,8 +63,62 @@ const (
func prepareConfig() { func prepareConfig() {
pf := rootCmd.PersistentFlags() pf := rootCmd.PersistentFlags()
bind := func(s string) { pf.SortFlags = false
if err := viper.BindPFlag(s, pf.Lookup(s)); err != nil { pf.StringVar(&configFile, "config", "", "Config file")
configFile = os.Getenv("OD_CONFIG")
pf.String(ConfTrackerUrl, "https://tt.the-eye.eu/api", "task_tracker api URL")
pf.String(ConfTrackerProject, "1", "task_tracker project id")
pf.String(ConfWsBucketScheme, "wss", "ws_bucket scheme")
pf.String(ConfWsBucketHost, "wsb.the-eye.eu", "ws_bucket host")
pf.String(ConfTrackerAlias, "changeme", "task_tracker worker alias")
pf.Duration(ConfServerTimeout, 60*time.Second, "OD-DB request timeout")
pf.Duration(ConfRecheck, 1*time.Second, "OD-DB: Poll interval for new jobs")
pf.Duration(ConfCooldown, 1*time.Minute, "OD-DB: Time to wait after a server-side error")
pf.String(ConfChunkSize, "1 MB", "OD-DB: Result upload chunk size")
pf.Uint(ConfUploadRetries, 10, "OD-DB: Max upload retries")
pf.Duration(ConfUploadRetryInterval, 30*time.Second, "OD-DB: Time to wait between upload retries")
pf.Uint(ConfTasks, 25, "Crawler: Max concurrent tasks")
pf.Uint(ConfWorkers, 1, "Crawler: Connections per server")
pf.Uint(ConfRetries, 5, "Crawler: Request retries")
pf.Duration(ConfDialTimeout, 10*time.Second, "Crawler: Handshake timeout")
pf.Duration(ConfTimeout, 30*time.Second, "Crawler: Request timeout")
pf.String(ConfUserAgent, "Mozilla/5.0 (X11; od-database-crawler) Gecko/20100101 Firefox/52.0", "Crawler: User-Agent")
pf.Int(ConfJobBufferSize, -1, "Crawler: Task queue cache size")
pf.Duration(ConfCrawlStats, 500*time.Second, "Log: Crawl stats interval")
pf.Duration(ConfAllocStats, 500*time.Second, "Log: Resource stats interval")
pf.Bool(ConfVerbose, false, "Log: Print every listed dir")
pf.Bool(ConfPrintHTTP, false, "Log: Print HTTP client errors")
pf.String(ConfLogFile, "crawler.log", "Log file")
// Bind all flags to Viper
pf.VisitAll(func(flag *pflag.Flag) {
s := flag.Name
s = strings.TrimLeft(s, "-")
if err := viper.BindPFlag(s, flag); err != nil {
panic(err) panic(err)
} }
var envKey string var envKey string
@@ -65,71 +128,7 @@ func prepareConfig() {
if err := viper.BindEnv(s, envKey); err != nil { if err := viper.BindEnv(s, envKey); err != nil {
panic(err) panic(err)
} }
} })
pf.SortFlags = false
pf.StringVar(&configFile, "config", "", "Config file")
configFile = os.Getenv("OD_CONFIG")
pf.String(ConfServerUrl, "http://od-db.the-eye.eu/api", "OD-DB server URL")
bind(ConfServerUrl)
pf.String(ConfToken, "", "OD-DB access token (env OD_SERVER_TOKEN)")
bind(ConfToken)
pf.Duration(ConfServerTimeout, 60 * time.Second, "OD-DB request timeout")
bind(ConfServerTimeout)
pf.Duration(ConfRecheck, 1 * time.Second, "OD-DB: Poll interval for new jobs")
bind(ConfRecheck)
pf.Duration(ConfCooldown, 30 * time.Second, "OD-DB: Time to wait after a server-side error")
bind(ConfCooldown)
pf.String(ConfChunkSize, "1 MB", "OD-DB: Result upload chunk size")
bind(ConfChunkSize)
pf.Uint(ConfUploadRetries, 10, "OD-DB: Max upload retries")
bind(ConfUploadRetries)
pf.Duration(ConfUploadRetryInterval, 30 * time.Second, "OD-DB: Time to wait between upload retries")
bind(ConfUploadRetryInterval)
pf.Uint(ConfTasks, 100, "Crawler: Max concurrent tasks")
bind(ConfTasks)
pf.Uint(ConfWorkers, 4, "Crawler: Connections per server")
bind(ConfWorkers)
pf.Uint(ConfRetries, 5, "Crawler: Request retries")
bind(ConfRetries)
pf.Duration(ConfDialTimeout, 10 * time.Second, "Crawler: Handshake timeout")
bind(ConfDialTimeout)
pf.Duration(ConfTimeout, 30 * time.Second, "Crawler: Request timeout")
bind(ConfTimeout)
pf.String(ConfUserAgent, "Mozilla/5.0 (X11; od-database-crawler) Gecko/20100101 Firefox/52.0", "Crawler: User-Agent")
bind(ConfUserAgent)
pf.Uint(ConfJobBufferSize, 5000, "Crawler: Task queue cache size")
bind(ConfJobBufferSize)
pf.Duration(ConfCrawlStats, time.Second, "Log: Crawl stats interval")
bind(ConfCrawlStats)
pf.Duration(ConfAllocStats, 10 * time.Second, "Log: Resource stats interval")
bind(ConfAllocStats)
pf.Bool(ConfVerbose, false, "Log: Print every listed dir")
bind(ConfVerbose)
pf.Bool(ConfPrintHTTP, false, "Log: Print HTTP client errors")
bind(ConfPrintHTTP)
pf.String(ConfLogFile, "crawler.log", "Log file")
bind(ConfLogFile)
} }
func readConfig() { func readConfig() {
@@ -157,16 +156,20 @@ func readConfig() {
} }
} }
config.ServerUrl = viper.GetString(ConfServerUrl) if onlineMode {
if config.ServerUrl == "" { config.TrackerUrl = viper.GetString(ConfTrackerUrl)
configMissing(ConfServerUrl) if config.TrackerUrl == "" {
configMissing(ConfTrackerUrl)
} }
config.ServerUrl = strings.TrimRight(config.ServerUrl, "/") config.TrackerUrl = strings.TrimRight(config.TrackerUrl, "/")
}
config.TrackerProject = viper.GetInt(ConfTrackerProject)
config.Token = viper.GetString(ConfToken) config.TrackerAlias = viper.GetString(ConfTrackerAlias)
if config.Token == "" {
configMissing(ConfToken) config.WsBucketHost = viper.GetString(ConfWsBucketHost)
}
config.WsBucketScheme = viper.GetString(ConfWsBucketScheme)
config.ServerTimeout = viper.GetDuration(ConfServerTimeout) config.ServerTimeout = viper.GetDuration(ConfServerTimeout)
@@ -208,7 +211,9 @@ func readConfig() {
if filePath := viper.GetString(ConfLogFile); filePath != "" { if filePath := viper.GetString(ConfLogFile); filePath != "" {
f, err := os.OpenFile(filePath, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0644) f, err := os.OpenFile(filePath, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0644)
bufWriter := bufio.NewWriter(f) bufWriter := bufio.NewWriter(f)
if err != nil { panic(err) } if err != nil {
panic(err)
}
exitHooks.Add(func() { exitHooks.Add(func() {
bufWriter.Flush() bufWriter.Flush()
f.Close() f.Close()

View File

@@ -1,10 +1,14 @@
# OD-Database server settings # OD-Database server settings
server: server:
# Connection URL # Connection URL
url: http://od-db.mine.terorie.com/api url: https://tt.the-eye.eu/api
# OD-Database project id (for crawling)
# Server auth token project: 1
token: # Your worker alias
alias: changeme
# Websocket bucket host & scheme (ws/wss)
ws_bucket_host: https://wsb.the-eye.eu
ws_bucket_scheme: wss
# Request timeout # Request timeout
timeout: 60s timeout: 60s
@@ -17,11 +21,7 @@ server:
# Time to wait after receiving an error # Time to wait after receiving an error
# from the server. Doesn't apply to uploads. # from the server. Doesn't apply to uploads.
cooldown: 30s cooldown: 1s
# Upload chunk size
# If the value is too high, the upload fails.
upload_chunk: 1 MB
upload_retries: 10 upload_retries: 10
upload_retry_interval: 30s upload_retry_interval: 30s
@@ -29,10 +29,10 @@ server:
# Log output settings # Log output settings
output: output:
# Crawl statistics # Crawl statistics
crawl_stats: 1s crawl_stats: 1m
# CPU/RAM/Job queue stats # CPU/RAM/Job queue stats
resource_stats: 10s resource_stats: 1m
# More output? (Every listed dir) # More output? (Every listed dir)
verbose: false verbose: false
@@ -47,13 +47,13 @@ output:
# Crawler settings # Crawler settings
crawl: crawl:
# Number of sites that can be processed at once # Number of sites that can be processed at once
tasks: 100 tasks: 25
# Number of connections per site # Number of connections per site
# Please be careful with this setting! # Please be careful with this setting!
# The crawler fires fast and more than # The crawler fires fast and more than
# ten connections can overwhelm a server. # ten connections can overwhelm a server.
connections: 4 connections: 1
# How often to retry getting data # How often to retry getting data
# from the site before giving up # from the site before giving up
@@ -81,4 +81,4 @@ crawl:
# in memory. # in memory.
# A negative value will cause all jobs # A negative value will cause all jobs
# to be stored in memory. (Don't do this) # to be stored in memory. (Don't do this)
job_buffer: 5000 job_buffer: -1

14
go.mod
View File

@@ -1,14 +0,0 @@
module github.com/syndtr/od-database-crawler
require (
github.com/beeker1121/goque v2.0.1+incompatible
github.com/golang/snappy v0.0.0-20180518054509-2e65f85255db // indirect
github.com/sirupsen/logrus v1.3.0
github.com/spf13/cobra v0.0.3
github.com/spf13/viper v1.3.1
github.com/syndtr/goleveldb v0.0.0-20181128100959-b001fa50d6b2 // indirect
github.com/terorie/od-database-crawler v1.1.1
github.com/valyala/fasthttp v1.1.0
golang.org/x/crypto v0.0.0-20190131182504-b8fe1690c613
golang.org/x/net v0.0.0-20180911220305-26e67e76b6c3
)

66
go.sum
View File

@@ -1,66 +0,0 @@
github.com/armon/consul-api v0.0.0-20180202201655-eb2c6b5be1b6/go.mod h1:grANhF5doyWs3UAsr3K4I6qtAmlQcZDesFNEHPZAzj8=
github.com/beeker1121/goque v2.0.1+incompatible h1:5nJHPMqQLxUvGFc8m/NW2QzxKyc0zICmqs/JUsmEjwE=
github.com/beeker1121/goque v2.0.1+incompatible/go.mod h1:L6dOWBhDOnxUVQsb0wkLve0VCnt2xJW/MI8pdRX4ANw=
github.com/coreos/etcd v3.3.10+incompatible/go.mod h1:uF7uidLiAD3TWHmW31ZFd/JWoc32PjwdhPthX9715RE=
github.com/coreos/go-etcd v2.0.0+incompatible/go.mod h1:Jez6KQU2B/sWsbdaef3ED8NzMklzPG4d5KIOhIy30Tk=
github.com/coreos/go-semver v0.2.0/go.mod h1:nnelYz7RCh+5ahJtPPxZlU+153eP4D4r3EedlOD2RNk=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/fsnotify/fsnotify v1.4.7 h1:IXs+QLmnXW2CcXuY+8Mzv/fWEsPGWxqefPtCP5CnV9I=
github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo=
github.com/golang/snappy v0.0.0-20180518054509-2e65f85255db h1:woRePGFeVFfLKN/pOkfl+p/TAqKOfFu+7KPlMVpok/w=
github.com/golang/snappy v0.0.0-20180518054509-2e65f85255db/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
github.com/hashicorp/hcl v1.0.0 h1:0Anlzjpi4vEasTeNFn2mLJgTSwt0+6sfsiTG8qcWGx4=
github.com/hashicorp/hcl v1.0.0/go.mod h1:E5yfLk+7swimpb2L/Alb/PJmXilQ/rhwaUYs4T20WEQ=
github.com/klauspost/compress v1.4.0 h1:8nsMz3tWa9SWWPL60G1V6CUsf4lLjWLTNEtibhe8gh8=
github.com/klauspost/compress v1.4.0/go.mod h1:RyIbtBH6LamlWaDj8nUwkbUhJ87Yi3uG0guNDohfE1A=
github.com/klauspost/cpuid v0.0.0-20180405133222-e7e905edc00e h1:+lIPJOWl+jSiJOc70QXJ07+2eg2Jy2EC7Mi11BWujeM=
github.com/klauspost/cpuid v0.0.0-20180405133222-e7e905edc00e/go.mod h1:Pj4uuM528wm8OyEC2QMXAi2YiTZ96dNQPGgoMS4s3ek=
github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
github.com/magiconair/properties v1.8.0 h1:LLgXmsheXeRoUOBOjtwPQCWIYqM/LU1ayDtDePerRcY=
github.com/magiconair/properties v1.8.0/go.mod h1:PppfXfuXeibc/6YijjN8zIbojt8czPbwD3XqdrwzmxQ=
github.com/mitchellh/mapstructure v1.1.2 h1:fmNYVwqnSfB9mZU6OS2O6GsXM+wcskZDuKQzvN1EDeE=
github.com/mitchellh/mapstructure v1.1.2/go.mod h1:FVVH3fgwuzCH5S8UJGiWEs2h04kUh9fWfEaFds41c1Y=
github.com/pelletier/go-toml v1.2.0 h1:T5zMGML61Wp+FlcbWjRDT7yAxhJNAiPPLOFECq181zc=
github.com/pelletier/go-toml v1.2.0/go.mod h1:5z9KED0ma1S8pY6P1sdut58dfprrGBbd/94hg7ilaic=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/sirupsen/logrus v1.3.0 h1:hI/7Q+DtNZ2kINb6qt/lS+IyXnHQe9e90POfeewL/ME=
github.com/sirupsen/logrus v1.3.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo=
github.com/spf13/afero v1.1.2 h1:m8/z1t7/fwjysjQRYbP0RD+bUIF/8tJwPdEZsI83ACI=
github.com/spf13/afero v1.1.2/go.mod h1:j4pytiNVoe2o6bmDsKpLACNPDBIoEAkihy7loJ1B0CQ=
github.com/spf13/cast v1.3.0 h1:oget//CVOEoFewqQxwr0Ej5yjygnqGkvggSE/gB35Q8=
github.com/spf13/cast v1.3.0/go.mod h1:Qx5cxh0v+4UWYiBimWS+eyWzqEqokIECu5etghLkUJE=
github.com/spf13/cobra v0.0.3 h1:ZlrZ4XsMRm04Fr5pSFxBgfND2EBVa1nLpiy1stUsX/8=
github.com/spf13/cobra v0.0.3/go.mod h1:1l0Ry5zgKvJasoi3XT1TypsSe7PqH0Sj9dhYf7v3XqQ=
github.com/spf13/jwalterweatherman v1.0.0 h1:XHEdyB+EcvlqZamSM4ZOMGlc93t6AcsBEu9Gc1vn7yk=
github.com/spf13/jwalterweatherman v1.0.0/go.mod h1:cQK4TGJAtQXfYWX+Ddv3mKDzgVb68N+wFjFa4jdeBTo=
github.com/spf13/pflag v1.0.3 h1:zPAT6CGy6wXeQ7NtTnaTerfKOsV6V6F8agHXFiazDkg=
github.com/spf13/pflag v1.0.3/go.mod h1:DYY7MBk1bdzusC3SYhjObp+wFpr4gzcvqqNjLnInEg4=
github.com/spf13/viper v1.3.1 h1:5+8j8FTpnFV4nEImW/ofkzEt8VoOiLXxdYIDsB73T38=
github.com/spf13/viper v1.3.1/go.mod h1:ZiWeW+zYFKm7srdB9IoDzzZXaJaI5eL9QjNiN/DMA2s=
github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
github.com/syndtr/goleveldb v0.0.0-20181128100959-b001fa50d6b2 h1:GnOzE5fEFN3b2zDhJJABEofdb51uMRNb8eqIVtdducs=
github.com/syndtr/goleveldb v0.0.0-20181128100959-b001fa50d6b2/go.mod h1:Z4AUp2Km+PwemOoO/VB5AOx9XSsIItzFjoJlOSiYmn0=
github.com/terorie/od-database-crawler v1.1.1 h1:Ca+ZqbZX3rVWBR8SDRzvroyxjBtUs75MQXZ9YG0gqGo=
github.com/terorie/od-database-crawler v1.1.1/go.mod h1:vVJ7pLkudrlUNp9qu24JCzQ8N6mFsrOmX1tPXr155DQ=
github.com/ugorji/go/codec v0.0.0-20181204163529-d75b2dcb6bc8/go.mod h1:VFNgLljTbGfSG7qAOspJ7OScBnGdDN/yBr0sguwnwf0=
github.com/valyala/bytebufferpool v1.0.0 h1:GqA5TC/0021Y/b9FG4Oi9Mr3q7XYx6KllzawFIhcdPw=
github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyCJ6HpOuEn7z0Csc=
github.com/valyala/fasthttp v1.1.0 h1:3BohG7mqwj4lq7PTX//7gLbUlzNvZSPmuHFnloXT0lw=
github.com/valyala/fasthttp v1.1.0/go.mod h1:4vX61m6KN+xDduDNwXrhIAVZaZaZiQ1luJk8LWSxF3s=
github.com/valyala/tcplisten v0.0.0-20161114210144-ceec8f93295a/go.mod h1:v3UYOV9WzVtRmSR+PDvWpU/qWl4Wa5LApYYX4ZtKbio=
github.com/xordataexchange/crypt v0.0.3-0.20170626215501-b2862e3d0a77/go.mod h1:aYKd//L2LvnjZzWKhF00oedf4jCCReLcmhLdhm1A27Q=
golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
golang.org/x/crypto v0.0.0-20181203042331-505ab145d0a9/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
golang.org/x/crypto v0.0.0-20190131182504-b8fe1690c613 h1:MQ/ZZiDsUapFFiMS+vzwXkCTeEKaum+Do5rINYJDmxc=
golang.org/x/crypto v0.0.0-20190131182504-b8fe1690c613/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
golang.org/x/net v0.0.0-20180911220305-26e67e76b6c3 h1:czFLhve3vsQetD6JOJ8NZZvGQIXlnN3/yXxbT6/awxI=
golang.org/x/net v0.0.0-20180911220305-26e67e76b6c3/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20181205085412-a5c9d58dba9a h1:1n5lsVfiQW3yfsRGu98756EH1YthsFqr/5mxHduZW2A=
golang.org/x/sys v0.0.0-20181205085412-a5c9d58dba9a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/text v0.3.0 h1:g61tztE5qeGQ89tm6NTjjM9VPIm088od1l6aSorWRWg=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v2 v2.2.2 h1:ZCJp+EgiOT7lHqUV2J862kp8Qj64Jo6az82+3Td9dZw=
gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=

47
jenkins/Jenkinsfile vendored Normal file
View File

@@ -0,0 +1,47 @@
def remote = [:]
remote.name = 'remote'
remote.host = env.DEPLOY_HOST
remote.user = env.DEPLOY_USER
remote.identityFile = '/var/lib/jenkins/.ssh/id_rsa'
remote.knownHosts = '/var/lib/jenkins/.ssh/known_hosts'
remote.allowAnyHosts = true
remote.retryCount = 3
remote.retryWaitSec = 3
logLevel = 'FINER'
pipeline {
agent none
environment {
GOOS='linux'
CGO_ENABLED='0'
HOME='.'
}
stages {
stage('Build') {
agent {
docker {
image 'golang:latest'
}
}
steps {
sh 'mkdir -p /go/src/github.com/terorie/od-database-crawler'
sh 'cp -r *.go fasturl ds jenkins/build.sh "/go/src/github.com/terorie/od-database-crawler"'
sh 'cd /go/src/github.com/terorie/od-database-crawler && go get ./...'
sh './jenkins/build.sh'
stash includes: 'dist/', name: 'dist'
}
}
stage('Deploy') {
agent none
steps {
node('master') {
unstash 'dist'
sshCommand remote: remote, command: "ls od-database-crawler/"
sshPut remote: remote, from: 'dist/', into: 'od-database-crawler'
}
}
}
}
}

23
jenkins/build.sh Executable file
View File

@@ -0,0 +1,23 @@
#!/usr/bin/env bash
appname="od-database-crawler"
outdir="dist/"
tag="${BUILD_ID}_$(date +%Y-%m-%d)"
rm -rf "./${outdir}"
mkdir build 2> /dev/null
name=${outdir}${appname}-${tag}-linux
GOOS="linux" GOARCH="amd64" go build -ldflags="-s -w" -o ${name}
gzip -f ${name}
echo ${name}
name=${outdir}${appname}-${tag}-mac
GOOS="darwin" GOARCH="amd64" go build -ldflags="-s -w" -o ${name}
gzip -f ${name}
echo ${name}
name=${outdir}${appname}-${tag}-freebsd
GOOS="freebsd" GOARCH="amd64" go build -ldflags="-s -w" -o ${name}
gzip -f ${name}
echo ${name}

64
main.go
View File

@@ -8,6 +8,7 @@ import (
"github.com/spf13/viper" "github.com/spf13/viper"
"github.com/terorie/od-database-crawler/fasturl" "github.com/terorie/od-database-crawler/fasturl"
"os" "os"
"os/signal"
"strings" "strings"
"sync/atomic" "sync/atomic"
"time" "time"
@@ -17,7 +18,7 @@ var configFile string
var rootCmd = cobra.Command { var rootCmd = cobra.Command {
Use: "od-database-crawler", Use: "od-database-crawler",
Version: "1.2.1", Version: "1.2.2",
Short: "OD-Database Go crawler", Short: "OD-Database Go crawler",
Long: helpText, Long: helpText,
PersistentPreRunE: preRun, PersistentPreRunE: preRun,
@@ -61,8 +62,6 @@ func preRun(cmd *cobra.Command, args []string) error {
if err := os.MkdirAll("queue", 0755); if err := os.MkdirAll("queue", 0755);
err != nil { panic(err) } err != nil { panic(err) }
readConfig()
return nil return nil
} }
@@ -75,25 +74,31 @@ func main() {
} }
func cmdBase(_ *cobra.Command, _ []string) { func cmdBase(_ *cobra.Command, _ []string) {
// TODO Graceful shutdown onlineMode = true
appCtx := context.Background() readConfig()
forceCtx := context.Background()
appCtx, soft := context.WithCancel(context.Background())
forceCtx, hard := context.WithCancel(context.Background())
go hardShutdown(forceCtx)
go listenCtrlC(soft, hard)
inRemotes := make(chan *OD) inRemotes := make(chan *OD)
go Schedule(forceCtx, inRemotes) go Schedule(appCtx, inRemotes)
ticker := time.NewTicker(config.Recheck) ticker := time.NewTicker(config.Recheck)
defer ticker.Stop() defer ticker.Stop()
for { for {
select { select {
case <-appCtx.Done(): case <-appCtx.Done():
return goto shutdown
case <-ticker.C: case <-ticker.C:
t, err := FetchTask() t, err := FetchTask()
if err != nil { if err != nil {
logrus.WithError(err). logrus.WithError(err).
Error("Failed to get new task") Error("Failed to get new task")
time.Sleep(viper.GetDuration(ConfCooldown)) if !sleep(viper.GetDuration(ConfCooldown), appCtx) {
goto shutdown
}
continue continue
} }
if t == nil { if t == nil {
@@ -109,13 +114,7 @@ func cmdBase(_ *cobra.Command, _ []string) {
if urlErr, ok := err.(*fasturl.Error); ok && urlErr.Err == fasturl.ErrUnknownScheme { if urlErr, ok := err.(*fasturl.Error); ok && urlErr.Err == fasturl.ErrUnknownScheme {
// Not an error // Not an error
err = nil err = nil
// TODO FTP crawler
// Give back task
//err2 := CancelTask(t.WebsiteId)
//if err2 != nil {
// logrus.Error(err2)
//}
continue continue
} else if err != nil { } else if err != nil {
logrus.WithError(err). logrus.WithError(err).
@@ -126,9 +125,15 @@ func cmdBase(_ *cobra.Command, _ []string) {
ScheduleTask(inRemotes, t, &baseUri) ScheduleTask(inRemotes, t, &baseUri)
} }
} }
shutdown:
globalWait.Wait()
} }
func cmdCrawler(_ *cobra.Command, args []string) error { func cmdCrawler(_ *cobra.Command, args []string) error {
onlineMode = false
readConfig()
arg := args[0] arg := args[0]
// https://github.com/golang/go/issues/19779 // https://github.com/golang/go/issues/19779
if !strings.Contains(arg, "://") { if !strings.Contains(arg, "://") {
@@ -161,3 +166,30 @@ func cmdCrawler(_ *cobra.Command, args []string) error {
return nil return nil
} }
func listenCtrlC(soft, hard context.CancelFunc) {
c := make(chan os.Signal)
signal.Notify(c, os.Interrupt)
<-c
logrus.Info(">>> Shutting down crawler... <<<")
soft()
<-c
logrus.Warning(">>> Force shutdown! <<<")
hard()
}
func hardShutdown(c context.Context) {
<-c.Done()
os.Exit(1)
}
func sleep(d time.Duration, c context.Context) bool {
select {
case <-time.After(d):
return true
case <-c.Done():
return false
}
}

View File

@@ -7,13 +7,23 @@ import (
"time" "time"
) )
type ResultCode int
const (
TR_OK = ResultCode(iota)
TR_FAIL = 1
TR_SKIP = 2
)
type Task struct { type Task struct {
WebsiteId uint64 `json:"website_id"` WebsiteId uint64 `json:"website_id"`
Url string `json:"url"` Url string `json:"url"`
UploadToken string `json:"upload_token"`
TaskId int64
} }
type TaskResult struct { type TaskResult struct {
StatusCode string `json:"status_code"` ResultCode ResultCode `json:"status_code"`
FileCount uint64 `json:"file_count"` FileCount uint64 `json:"file_count"`
ErrorCount uint64 `json:"-"` ErrorCount uint64 `json:"-"`
StartTime time.Time `json:"-"` StartTime time.Time `json:"-"`
@@ -51,13 +61,16 @@ func (o *OD) LoadOrStoreKey(k *redblackhash.Key) (exists bool) {
defer o.Scanned.Unlock() defer o.Scanned.Unlock()
exists = o.Scanned.Get(k) exists = o.Scanned.Get(k)
if exists { return true } if exists {
return true
}
o.Scanned.Put(k) o.Scanned.Put(k)
return false return false
} }
type errorString string type errorString string
func (e errorString) Error() string { func (e errorString) Error() string {
return string(e) return string(e)
} }

View File

@@ -34,13 +34,16 @@ func Schedule(c context.Context, remotes <-chan *OD) {
queuePath := path.Join("queue", fmt.Sprintf("%d", remote.Task.WebsiteId)) queuePath := path.Join("queue", fmt.Sprintf("%d", remote.Task.WebsiteId))
// Delete existing queue // Delete existing queue
if err := os.RemoveAll(queuePath); if err := os.RemoveAll(queuePath); err != nil {
err != nil { panic(err) } panic(err)
}
// Start new queue // Start new queue
var err error var err error
remote.WCtx.Queue, err = OpenQueue(queuePath) remote.WCtx.Queue, err = OpenQueue(queuePath)
if err != nil { panic(err) } if err != nil {
panic(err)
}
// Spawn workers // Spawn workers
for i := 0; i < config.Workers; i++ { for i := 0; i < config.Workers; i++ {
@@ -145,7 +148,7 @@ func (o *OD) Watch(results chan File) {
} }
// Upload results // Upload results
err = PushResult(&o.Result, f) err = PushResult(&o.Task, f)
if err != nil { if err != nil {
logrus.WithError(err). logrus.WithError(err).
Error("Failed uploading crawl results") Error("Failed uploading crawl results")
@@ -178,16 +181,10 @@ func (o *OD) handleCollect(results chan File, f *os.File, collectErrC chan error
// Set status code // Set status code
now := time.Now() now := time.Now()
o.Result.EndTimeUnix = now.Unix() o.Result.EndTimeUnix = now.Unix()
fileCount := atomic.LoadUint64(&o.Result.FileCount) if atomic.LoadUint64(&o.Result.ErrorCount) != 0 {
if fileCount == 0 { o.Result.ResultCode = TR_FAIL
errorCount := atomic.LoadUint64(&o.Result.ErrorCount)
if errorCount == 0 {
o.Result.StatusCode = "empty"
} else { } else {
o.Result.StatusCode = "directory listing failed" o.Result.ResultCode = TR_OK
}
} else {
o.Result.StatusCode = "success"
} }
} }
@@ -205,11 +202,17 @@ func (t *Task) collect(results chan File, f *os.File) error {
result.Path = fasturl.PathUnescape(result.Path) result.Path = fasturl.PathUnescape(result.Path)
result.Name = fasturl.PathUnescape(result.Name) result.Name = fasturl.PathUnescape(result.Name)
resJson, err := json.Marshal(result) resJson, err := json.Marshal(result)
if err != nil { panic(err) } if err != nil {
panic(err)
}
_, err = f.Write(resJson) _, err = f.Write(resJson)
if err != nil { return err } if err != nil {
return err
}
_, err = f.Write([]byte{'\n'}) _, err = f.Write([]byte{'\n'})
if err != nil { return err } if err != nil {
return err
}
} }
return nil return nil

290
server.go
View File

@@ -2,48 +2,125 @@ package main
import ( import (
"bytes" "bytes"
"encoding/base64"
"encoding/json" "encoding/json"
"fmt" "fmt"
"github.com/fasthttp/websocket"
"github.com/pkg/errors"
"github.com/sirupsen/logrus" "github.com/sirupsen/logrus"
"github.com/spf13/viper" "golang.org/x/time/rate"
"io" "io"
"mime/multipart" "io/ioutil"
"net/http" "net/http"
"net/url" "net/url"
"os" "os"
"strconv" "strconv"
"time"
) )
var serverWorker *TrackerWorker
var serverClient = http.Client{ var serverClient = http.Client{
Timeout: config.ServerTimeout, Timeout: config.ServerTimeout,
Transport: new(ServerTripper),
}
var serverUserAgent = "od-database-crawler/" + rootCmd.Version
func getOrCreateWorker() {
if _, err := os.Stat("worker.json"); os.IsNotExist(err) {
req := CreateTrackerWorkerRequest{
Alias: config.TrackerAlias,
}
body, _ := json.Marshal(&req)
buf := bytes.NewBuffer(body)
resp, _ := serverClient.Post(config.TrackerUrl+"/worker/create", "application/json", buf)
workerResponse := CreateTrackerWorkerResponse{}
respBody, _ := ioutil.ReadAll(resp.Body)
_ = json.Unmarshal(respBody, &workerResponse)
workerJsonData, _ := json.Marshal(&workerResponse.Content.Worker)
fp, _ := os.OpenFile("worker.json", os.O_CREATE|os.O_TRUNC|os.O_WRONLY, 0600)
_, _ = fp.Write(workerJsonData)
//Request ASSIGN permission
serverWorker = &workerResponse.Content.Worker
accessReq, _ := json.Marshal(WorkerAccessRequest{
Project: config.TrackerProject,
Assign: true,
Submit: false,
})
buf = bytes.NewBuffer(accessReq)
res, err := serverClient.Post(config.TrackerUrl+"/project/request_access", "application/json", buf)
if err != nil {
panic(err)
}
logrus.WithFields(logrus.Fields{
"response": res.StatusCode,
}).Info("Requested ASSIGN permission")
} else {
var worker TrackerWorker
fp, _ := os.OpenFile("worker.json", os.O_RDONLY, 0600)
workerJsonData, _ := ioutil.ReadAll(fp)
_ = json.Unmarshal(workerJsonData, &worker)
serverWorker = &worker
}
} }
func FetchTask() (t *Task, err error) { func FetchTask() (t *Task, err error) {
res, err := serverClient.PostForm(
config.ServerUrl + "/task/get", if serverWorker == nil {
url.Values{ "token": {config.Token} }) getOrCreateWorker()
if err != nil { return } }
res, err := serverClient.Get(config.TrackerUrl + "/task/get/" + strconv.Itoa(config.TrackerProject))
if err != nil {
return
}
defer res.Body.Close() defer res.Body.Close()
switch res.StatusCode { switch res.StatusCode {
case 200: case 200:
break break
case 404, 500:
return nil, nil
default: default:
return nil, fmt.Errorf("http %s", res.Status) return nil, fmt.Errorf("http %s", res.Status)
} }
t = new(Task) jsonResponse := FetchTaskResponse{}
err = json.NewDecoder(res.Body).Decode(t) err = json.NewDecoder(res.Body).Decode(&jsonResponse)
if err != nil { return } if _, ok := err.(*json.SyntaxError); ok {
return nil, fmt.Errorf("/task/get returned invalid JSON")
} else if err != nil {
return
}
if !jsonResponse.Ok {
if jsonResponse.Message == "No task available" {
return nil, nil
}
return nil, errors.New(jsonResponse.Message)
}
task := Task{}
err = json.Unmarshal([]byte(jsonResponse.Content.Task.Recipe), &task)
if _, ok := err.(*json.SyntaxError); ok {
return nil, fmt.Errorf("/task/get returned invalid JSON")
} else if err != nil {
return
}
t = &task
t.TaskId = jsonResponse.Content.Task.Id
return return
} }
func PushResult(result *TaskResult, f *os.File) (err error) { func PushResult(task *Task, f *os.File) (err error) {
if result.WebsiteId == 0 { if task.WebsiteId == 0 {
// Not a real result, don't push // Not a real result, don't push
return nil return nil
} }
@@ -54,10 +131,10 @@ func PushResult(result *TaskResult, f *os.File) (err error) {
return return
} }
err = uploadChunks(result.WebsiteId, f) err = uploadWebsocket(f, task.UploadToken)
if err != nil { if err != nil {
logrus.Errorf("Failed to upload file list: %s", err) logrus.Errorf("Failed to upload file list: %s", err)
err2 := CancelTask(result.WebsiteId) err2 := releaseTask(task, TR_FAIL)
if err2 != nil { if err2 != nil {
logrus.Error(err2) logrus.Error(err2)
} }
@@ -65,91 +142,62 @@ func PushResult(result *TaskResult, f *os.File) (err error) {
} }
// Upload result ignoring errors // Upload result ignoring errors
uploadResult(result) _ = releaseTask(task, TR_OK)
return return
} }
func uploadChunks(websiteId uint64, f *os.File) error { func uploadWebsocket(f *os.File, token string) (err error) {
eof := false
for iter := 1; !eof; iter++ {
// TODO Stream with io.Pipe?
var b bytes.Buffer
multi := multipart.NewWriter(&b) u := url.URL{Scheme: config.WsBucketScheme, Host: config.WsBucketHost, Path: "/upload"}
// Set upload fields header := http.Header{}
var err error header.Add("X-Upload-Token", token)
err = multi.WriteField("token", config.Token) conn, _, err := websocket.DefaultDialer.Dial(u.String(), header)
if err != nil { return err } if err != nil {
err = multi.WriteField("website_id", fmt.Sprintf("%d", websiteId)) return
if err != nil { return err }
// Copy chunk to file_list
formFile, err := multi.CreateFormFile("file_list", "file_list")
var n int64
n, err = io.CopyN(formFile, f, config.ChunkSize)
if err != io.EOF && err != nil {
return err
}
if n == 0 {
// Don't upload, no content
return nil
} else if n < config.ChunkSize {
err = nil
// Break at end of iteration
eof = true
} }
multi.Close() conn.EnableWriteCompression(true) //TODO: Is this necessary?
for retries := 0; retries < viper.GetInt(ConfUploadRetries); retries++ { socketWriter, _ := conn.NextWriter(websocket.BinaryMessage)
if retries > 0 { _, _ = io.Copy(socketWriter, f)
// Error occurred, retry upload err = socketWriter.Close()
time.Sleep(viper.GetDuration(ConfUploadRetryInterval)) if err != nil {
logrus.Error("FIXME: couldn't do file upload")
return
}
err = conn.Close()
if err != nil {
return
} }
req, err := http.NewRequest(
http.MethodPost,
config.ServerUrl + "/task/upload",
&b)
req.Header.Set("content-type", multi.FormDataContentType())
if err != nil { continue }
res, err := serverClient.Do(req)
if err != nil { continue }
res.Body.Close()
if res.StatusCode != http.StatusOK {
logrus.WithField("status", res.Status).
WithField("part", iter).
Errorf("Upload failed")
continue
}
// Upload successful
break
}
logrus.WithField("id", websiteId).
WithField("part", iter).
Infof("Uploaded files chunk")
}
return nil return nil
} }
func uploadResult(result *TaskResult) (err error) { func releaseTask(task *Task, taskResult ResultCode) (err error) {
resultEnc, err := json.Marshal(result)
if err != nil { panic(err) }
res, err := serverClient.PostForm( req := releaseTaskRequest{
config.ServerUrl + "/task/complete", TaskId: task.TaskId,
url.Values { ResultCode: taskResult,
"token": {config.Token}, // TODO Will implement verification in a later ODDB update
"result": {string(resultEnc)}, Verification: 0,
}, }
resultEnc, err := json.Marshal(&req)
if err != nil {
panic(err)
}
body := bytes.NewBuffer(resultEnc)
res, err := serverClient.Post(
config.TrackerUrl+"/task/release",
"application/json",
body,
) )
if err != nil { return } if err != nil {
return
}
res.Body.Close() res.Body.Close()
if res.StatusCode != http.StatusOK { if res.StatusCode != http.StatusOK {
@@ -159,20 +207,66 @@ func uploadResult(result *TaskResult) (err error) {
return return
} }
func CancelTask(websiteId uint64) (err error) { type ServerTripper struct{}
res, err := serverClient.PostForm(
config.ServerUrl + "/task/cancel",
url.Values{
"token": {config.Token},
"website_id": {strconv.FormatUint(websiteId, 10)},
},
)
if err != nil { return }
res.Body.Close()
if res.StatusCode != http.StatusOK { func (t *ServerTripper) RoundTrip(req *http.Request) (res *http.Response, err error) {
return fmt.Errorf("failed to cancel task: %s", res.Status) req.Header.Set("User-Agent", serverUserAgent)
//TODO: Use task_tracker/client ?
if serverWorker != nil {
req.Header.Add("X-Worker-Id", strconv.Itoa(serverWorker.Id))
req.Header.Add("X-Secret", base64.StdEncoding.EncodeToString(serverWorker.Secret))
}
return http.DefaultTransport.RoundTrip(req)
} }
return // https://github.com/simon987/task_tracker/blob/master/api/models.go
type releaseTaskRequest struct {
TaskId int64 `json:"task_id"`
ResultCode ResultCode `json:"result"`
Verification int64 `json:"verification"`
}
type WorkerAccessRequest struct {
Assign bool `json:"assign"`
Submit bool `json:"submit"`
Project int `json:"project"`
}
type FetchTaskResponse struct {
Ok bool `json:"ok"`
Message string `json:"message"`
Content struct {
Task struct {
Id int64 `json:"id"`
Priority int64 `json:"priority"`
Project struct {
Id int64 `json:"id"`
Name string `json:"name"`
Version string `json:"version"`
AssignRate rate.Limit `json:"assign_rate"`
SubmitRate rate.Limit `json:"submit_rate"`
} `json:"project"`
Recipe string `json:"recipe"`
} `json:"task"`
} `json:"content"`
}
type TrackerWorker struct {
Alias string `json:"alias"`
Id int `json:"id"`
Secret []byte `json:"secret"`
}
type CreateTrackerWorkerResponse struct {
Ok bool `json:"ok"`
Message string `json:"message"`
Content struct {
Worker TrackerWorker `json:"worker"`
} `json:"content"`
}
type CreateTrackerWorkerRequest struct {
Alias string `json:"alias"`
} }

View File

@@ -56,16 +56,16 @@ func (w *WorkerContext) step(results chan<- File, job Job) {
if !shouldRetry(err) { if !shouldRetry(err) {
atomic.AddUint64(&totalAborted, 1) atomic.AddUint64(&totalAborted, 1)
logrus.WithField("url", job.UriStr). //logrus.WithField("url", job.UriStr).
WithError(err). // WithError(err).
Error("Giving up after failure") // Error("Giving up after failure")
return return
} }
if job.Fails > config.Retries { if job.Fails > config.Retries {
atomic.AddUint64(&totalAborted, 1) atomic.AddUint64(&totalAborted, 1)
logrus.WithField("url", job.UriStr). //logrus.WithField("url", job.UriStr).
Errorf("Giving up after %d fails", job.Fails) // Errorf("Giving up after %d fails", job.Fails)
} else { } else {
atomic.AddUint64(&totalRetries, 1) atomic.AddUint64(&totalRetries, 1)
if err == ErrRateLimit { if err == ErrRateLimit {
@@ -88,7 +88,9 @@ func (w *WorkerContext) step(results chan<- File, job Job) {
} }
func (w *WorkerContext) DoJob(job *Job, f *File) (newJobs []Job, err error) { func (w *WorkerContext) DoJob(job *Job, f *File) (newJobs []Job, err error) {
if len(job.Uri.Path) == 0 { return } if len(job.Uri.Path) == 0 {
return
}
if job.Uri.Path[len(job.Uri.Path)-1] == '/' { if job.Uri.Path[len(job.Uri.Path)-1] == '/' {
// Load directory // Load directory
links, err := GetDir(job, f) links, err := GetDir(job, f)