27 Commits

Author SHA1 Message Date
simon987
a962c60b82 Don't panic on file upload error 2019-04-06 14:56:22 -04:00
simon987
24f0bd91f7 Remove debug messages & don't use disk queue by default 2019-04-06 12:21:35 -04:00
simon987
84c10e1981 Change default config 2019-04-05 17:53:00 -04:00
simon987
860fa79327 Jenkins setup 2019-04-04 19:22:14 -04:00
simon987
76bc8293d6 minimum viable 2019-03-30 09:02:55 -04:00
simon987
3470be6086 More work on task_tracker integration 2019-03-09 16:38:04 -05:00
Richard Patel
60471a081e Switch to simon987/task_tracker 2019-02-28 23:51:26 +01:00
dependabot[bot]
0b3f0d87fe Upgrade fasthttp to 1.2.0
Bumps [github.com/valyala/fasthttp](https://github.com/valyala/fasthttp) from 1.1.0 to 1.2.0.
- [Release notes](https://github.com/valyala/fasthttp/releases)
- [Commits](https://github.com/valyala/fasthttp/compare/v1.1.0...v1.2.0)

Thanks bot

Signed-off-by: dependabot[bot] <support@dependabot.com>
2019-02-28 22:42:40 +01:00
terorie
da9c75e392 Reduce Docker image size 2019-02-22 21:37:04 +01:00
Pascal
8947e05d0c Fix Dockerfile
Fixes #22
Credit to @pascaldulieu
2019-02-22 21:11:55 +01:00
Richard Patel
8c5f99d616 More descriptive error if /task/get returns invalid JSON 2019-02-22 20:17:59 +01:00
Richard Patel
206ea0e91d Simplify config 2019-02-22 18:50:35 +01:00
Richard Patel
8b9d8bfd17 Fix README.md format 2019-02-22 06:04:10 +01:00
Richard Patel
c9ff102d80 Fix Dockerfile 2019-02-22 06:00:57 +01:00
Richard Patel
88856c1c19 Flag explanation in README.md 2019-02-22 05:59:59 +01:00
Richard Patel
9e9b606250 Merge branch 'stable' 2019-02-22 05:37:52 +01:00
Richard Patel
326e29e5e4 Reset to stable branch 2019-02-22 05:37:45 +01:00
Richard Patel
c2acd5463f Restore .travis.yml
Now handling auto-build over Docker Hub directly
2019-02-22 05:16:25 +01:00
Richard Patel
e4d04e6a5f go.mod: Fix package path
lol
2019-02-22 05:10:43 +01:00
terorie
9f1402e841 New Dockerfile and Travis Config (#23) 2019-02-22 05:07:27 +01:00
terorie
7c8ab50ee4 Merge stable into master 2019-02-13 15:32:40 +01:00
terorie
281d2d17d6 Update config.yml 2019-02-13 15:32:00 +01:00
Richard Patel
45cbd4d535 Disable resume feature 2019-02-05 15:44:59 +01:00
Richard Patel
771d49f2dd Fix WaitGroup deadlock 2019-02-03 17:14:20 +01:00
Richard Patel
dbd787aa81 Fix WaitGroup crash 2019-02-03 17:09:43 +01:00
Richard Patel
cea6c1658b Bugfix: Don't schedule new tasks during shutdown 2019-02-03 17:02:44 +01:00
terorie
885af5bb3b Beta task resuming 2019-02-03 16:50:08 +01:00
13 changed files with 541 additions and 367 deletions

15
Dockerfile Normal file
View File

@@ -0,0 +1,15 @@
FROM golang:alpine as builder
ADD . /go/src/github.com/terorie/od-database-crawler
RUN apk add git \
&& go get -d -v github.com/terorie/od-database-crawler \
&& CGO_ENABLED=0 go install -a \
-installsuffix cgo \
-ldflags="-s -w" \
github.com/terorie/od-database-crawler
FROM scratch
COPY --from=builder /etc/ssl/certs/ca-certificates.crt /etc/ssl/certs/
COPY --from=builder /go/bin/od-database-crawler /bin/
WORKDIR /oddb
VOLUME [ "/oddb" ]
CMD ["/bin/od-database-crawler", "server"]

View File

@@ -9,7 +9,9 @@
https://od-db.the-eye.eu/ https://od-db.the-eye.eu/
#### Usage ## Usage
### Deploys
1. With Config File (if `config.yml` found in working dir) 1. With Config File (if `config.yml` found in working dir)
- Download [default config](https://github.com/terorie/od-database-crawler/blob/master/config.yml) - Download [default config](https://github.com/terorie/od-database-crawler/blob/master/config.yml)
@@ -22,3 +24,31 @@ https://od-db.the-eye.eu/
- Every flag is available as an environment variable: - Every flag is available as an environment variable:
`--server.crawl_stats` ➡️ `OD_SERVER_CRAWL_STATS` `--server.crawl_stats` ➡️ `OD_SERVER_CRAWL_STATS`
- Start with `./od-database-crawler server <flags>` - Start with `./od-database-crawler server <flags>`
3. With Docker
```bash
docker run \
-e OD_SERVER_URL=xxx \
-e OD_SERVER_TOKEN=xxx \
terorie/od-database-crawler
```
### Flag reference
Here are the most important config flags. For more fine control, take a look at `/config.yml`.
| Flag/Environment | Description | Example |
| ------------------------------------------------------- | ------------------------------------------------------------ | ----------------------------------- |
| `server.url`<br />`OD_SERVER_URL` | OD-DB Server URL | `https://od-db.mine.the-eye.eu/api` |
| `server.token`<br />`OD_SERVER_TOKEN` | OD-DB Server Access Token | _Ask Hexa **TM**_ |
| `server.recheck`<br />`OD_SERVER_RECHECK` | Job Fetching Interval | `3s` |
| `output.crawl_stats`<br />`OD_OUTPUT_CRAWL_STATS` | Crawl Stats Logging Interval (0 = disabled) | `500ms` |
| `output.resource_stats`<br />`OD_OUTPUT_RESORUCE_STATS` | Resource Stats Logging Interval (0 = disabled) | `8s` |
| `output.log`<br />`OD_OUTPUT_LOG` | Log File (none = disabled) | `crawler.log` |
| `crawl.tasks`<br />`OD_CRAWL_TASKS` | Max number of sites to crawl concurrently | `500` |
| `crawl.connections`<br />`OD_CRAWL_CONNECTIONS` | HTTP connections per site | `1` |
| `crawl.retries`<br />`OD_CRAWL_RETRIES` | How often to retry after a temporary failure (e.g. `HTTP 429` or timeouts) | `5` |
| `crawl.dial_timeout`<br />`OD_CRAWL_DIAL_TIMEOUT` | TCP Connect timeout | `5s` |
| `crawl.timeout`<br />`OD_CRAWL_TIMEOUT` | HTTP request timeout | `20s` |
| `crawl.user-agent`<br />`OD_CRAWL_USER_AGENT` | HTTP Crawler User-Agent | `googlebot/1.2.3` |
| `crawl.job_buffer`<br />`OD_CRAWL_JOB_BUFFER` | Number of URLs to keep in memory/cache, per job. The rest is offloaded to disk. Decrease this value if the crawler uses too much RAM. (0 = Disable Cache, -1 = Only use Cache) | `5000` |

209
config.go
View File

@@ -4,6 +4,7 @@ import (
"bufio" "bufio"
"fmt" "fmt"
"github.com/sirupsen/logrus" "github.com/sirupsen/logrus"
"github.com/spf13/pflag"
"github.com/spf13/viper" "github.com/spf13/viper"
"io" "io"
"os" "os"
@@ -12,36 +13,44 @@ import (
) )
var config struct { var config struct {
ServerUrl string TrackerUrl string
Token string TrackerProject int
ServerTimeout time.Duration TrackerAlias string
Recheck time.Duration WsBucketScheme string
ChunkSize int64 WsBucketHost string
Retries int ServerTimeout time.Duration
Workers int Recheck time.Duration
UserAgent string ChunkSize int64
Tasks int32 Retries int
Verbose bool Workers int
PrintHTTP bool UserAgent string
JobBufferSize int Tasks int32
Verbose bool
PrintHTTP bool
JobBufferSize int
} }
var onlineMode bool
const ( const (
ConfServerUrl = "server.url" ConfTrackerUrl = "server.url"
ConfToken = "server.token" ConfTrackerProject = "server.project"
ConfServerTimeout = "server.timeout" ConfTrackerAlias = "server.alias"
ConfRecheck = "server.recheck" ConfWsBucketScheme = "server.ws_bucket_scheme"
ConfCooldown = "server.cooldown" ConfWsBucketHost = "server.ws_bucket_host"
ConfChunkSize = "server.upload_chunk" ConfServerTimeout = "server.timeout"
ConfUploadRetries = "server.upload_retries" ConfRecheck = "server.recheck"
ConfCooldown = "server.cooldown"
ConfChunkSize = "server.upload_chunk"
ConfUploadRetries = "server.upload_retries"
ConfUploadRetryInterval = "server.upload_retry_interval" ConfUploadRetryInterval = "server.upload_retry_interval"
ConfTasks = "crawl.tasks" ConfTasks = "crawl.tasks"
ConfRetries = "crawl.retries" ConfRetries = "crawl.retries"
ConfWorkers = "crawl.connections" ConfWorkers = "crawl.connections"
ConfUserAgent = "crawl.user-agent" ConfUserAgent = "crawl.user-agent"
ConfDialTimeout = "crawl.dial_timeout" ConfDialTimeout = "crawl.dial_timeout"
ConfTimeout = "crawl.timeout" ConfTimeout = "crawl.timeout"
ConfJobBufferSize = "crawl.job_buffer" ConfJobBufferSize = "crawl.job_buffer"
ConfCrawlStats = "output.crawl_stats" ConfCrawlStats = "output.crawl_stats"
@@ -54,8 +63,62 @@ const (
func prepareConfig() { func prepareConfig() {
pf := rootCmd.PersistentFlags() pf := rootCmd.PersistentFlags()
bind := func(s string) { pf.SortFlags = false
if err := viper.BindPFlag(s, pf.Lookup(s)); err != nil { pf.StringVar(&configFile, "config", "", "Config file")
configFile = os.Getenv("OD_CONFIG")
pf.String(ConfTrackerUrl, "https://tt.the-eye.eu/api", "task_tracker api URL")
pf.String(ConfTrackerProject, "1", "task_tracker project id")
pf.String(ConfWsBucketScheme, "wss", "ws_bucket scheme")
pf.String(ConfWsBucketHost, "wsb.the-eye.eu", "ws_bucket host")
pf.String(ConfTrackerAlias, "changeme", "task_tracker worker alias")
pf.Duration(ConfServerTimeout, 60*time.Second, "OD-DB request timeout")
pf.Duration(ConfRecheck, 1*time.Second, "OD-DB: Poll interval for new jobs")
pf.Duration(ConfCooldown, 1*time.Minute, "OD-DB: Time to wait after a server-side error")
pf.String(ConfChunkSize, "1 MB", "OD-DB: Result upload chunk size")
pf.Uint(ConfUploadRetries, 10, "OD-DB: Max upload retries")
pf.Duration(ConfUploadRetryInterval, 30*time.Second, "OD-DB: Time to wait between upload retries")
pf.Uint(ConfTasks, 25, "Crawler: Max concurrent tasks")
pf.Uint(ConfWorkers, 1, "Crawler: Connections per server")
pf.Uint(ConfRetries, 5, "Crawler: Request retries")
pf.Duration(ConfDialTimeout, 10*time.Second, "Crawler: Handshake timeout")
pf.Duration(ConfTimeout, 30*time.Second, "Crawler: Request timeout")
pf.String(ConfUserAgent, "Mozilla/5.0 (X11; od-database-crawler) Gecko/20100101 Firefox/52.0", "Crawler: User-Agent")
pf.Int(ConfJobBufferSize, -1, "Crawler: Task queue cache size")
pf.Duration(ConfCrawlStats, 500*time.Second, "Log: Crawl stats interval")
pf.Duration(ConfAllocStats, 500*time.Second, "Log: Resource stats interval")
pf.Bool(ConfVerbose, false, "Log: Print every listed dir")
pf.Bool(ConfPrintHTTP, false, "Log: Print HTTP client errors")
pf.String(ConfLogFile, "crawler.log", "Log file")
// Bind all flags to Viper
pf.VisitAll(func(flag *pflag.Flag) {
s := flag.Name
s = strings.TrimLeft(s, "-")
if err := viper.BindPFlag(s, flag); err != nil {
panic(err) panic(err)
} }
var envKey string var envKey string
@@ -65,71 +128,7 @@ func prepareConfig() {
if err := viper.BindEnv(s, envKey); err != nil { if err := viper.BindEnv(s, envKey); err != nil {
panic(err) panic(err)
} }
} })
pf.SortFlags = false
pf.StringVar(&configFile, "config", "", "Config file")
configFile = os.Getenv("OD_CONFIG")
pf.String(ConfServerUrl, "http://od-db.the-eye.eu/api", "OD-DB server URL")
bind(ConfServerUrl)
pf.String(ConfToken, "", "OD-DB access token (env OD_SERVER_TOKEN)")
bind(ConfToken)
pf.Duration(ConfServerTimeout, 60 * time.Second, "OD-DB request timeout")
bind(ConfServerTimeout)
pf.Duration(ConfRecheck, 1 * time.Second, "OD-DB: Poll interval for new jobs")
bind(ConfRecheck)
pf.Duration(ConfCooldown, 30 * time.Second, "OD-DB: Time to wait after a server-side error")
bind(ConfCooldown)
pf.String(ConfChunkSize, "1 MB", "OD-DB: Result upload chunk size")
bind(ConfChunkSize)
pf.Uint(ConfUploadRetries, 10, "OD-DB: Max upload retries")
bind(ConfUploadRetries)
pf.Duration(ConfUploadRetryInterval, 30 * time.Second, "OD-DB: Time to wait between upload retries")
bind(ConfUploadRetryInterval)
pf.Uint(ConfTasks, 100, "Crawler: Max concurrent tasks")
bind(ConfTasks)
pf.Uint(ConfWorkers, 4, "Crawler: Connections per server")
bind(ConfWorkers)
pf.Uint(ConfRetries, 5, "Crawler: Request retries")
bind(ConfRetries)
pf.Duration(ConfDialTimeout, 10 * time.Second, "Crawler: Handshake timeout")
bind(ConfDialTimeout)
pf.Duration(ConfTimeout, 30 * time.Second, "Crawler: Request timeout")
bind(ConfTimeout)
pf.String(ConfUserAgent, "Mozilla/5.0 (X11; od-database-crawler) Gecko/20100101 Firefox/52.0", "Crawler: User-Agent")
bind(ConfUserAgent)
pf.Uint(ConfJobBufferSize, 5000, "Crawler: Task queue cache size")
bind(ConfJobBufferSize)
pf.Duration(ConfCrawlStats, time.Second, "Log: Crawl stats interval")
bind(ConfCrawlStats)
pf.Duration(ConfAllocStats, 10 * time.Second, "Log: Resource stats interval")
bind(ConfAllocStats)
pf.Bool(ConfVerbose, false, "Log: Print every listed dir")
bind(ConfVerbose)
pf.Bool(ConfPrintHTTP, false, "Log: Print HTTP client errors")
bind(ConfPrintHTTP)
pf.String(ConfLogFile, "crawler.log", "Log file")
bind(ConfLogFile)
} }
func readConfig() { func readConfig() {
@@ -157,16 +156,20 @@ func readConfig() {
} }
} }
config.ServerUrl = viper.GetString(ConfServerUrl) if onlineMode {
if config.ServerUrl == "" { config.TrackerUrl = viper.GetString(ConfTrackerUrl)
configMissing(ConfServerUrl) if config.TrackerUrl == "" {
configMissing(ConfTrackerUrl)
}
config.TrackerUrl = strings.TrimRight(config.TrackerUrl, "/")
} }
config.ServerUrl = strings.TrimRight(config.ServerUrl, "/") config.TrackerProject = viper.GetInt(ConfTrackerProject)
config.Token = viper.GetString(ConfToken) config.TrackerAlias = viper.GetString(ConfTrackerAlias)
if config.Token == "" {
configMissing(ConfToken) config.WsBucketHost = viper.GetString(ConfWsBucketHost)
}
config.WsBucketScheme = viper.GetString(ConfWsBucketScheme)
config.ServerTimeout = viper.GetDuration(ConfServerTimeout) config.ServerTimeout = viper.GetDuration(ConfServerTimeout)
@@ -206,9 +209,11 @@ func readConfig() {
} }
if filePath := viper.GetString(ConfLogFile); filePath != "" { if filePath := viper.GetString(ConfLogFile); filePath != "" {
f, err := os.OpenFile(filePath, os.O_CREATE | os.O_WRONLY | os.O_APPEND, 0644) f, err := os.OpenFile(filePath, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0644)
bufWriter := bufio.NewWriter(f) bufWriter := bufio.NewWriter(f)
if err != nil { panic(err) } if err != nil {
panic(err)
}
exitHooks.Add(func() { exitHooks.Add(func() {
bufWriter.Flush() bufWriter.Flush()
f.Close() f.Close()

View File

@@ -1,10 +1,14 @@
# OD-Database server settings # OD-Database server settings
server: server:
# Connection URL # Connection URL
url: http://od-db.mine.terorie.com/api url: https://tt.the-eye.eu/api
# OD-Database project id (for crawling)
# Server auth token project: 1
token: # Your worker alias
alias: changeme
# Websocket bucket host & scheme (ws/wss)
ws_bucket_host: https://wsb.the-eye.eu
ws_bucket_scheme: wss
# Request timeout # Request timeout
timeout: 60s timeout: 60s
@@ -17,11 +21,7 @@ server:
# Time to wait after receiving an error # Time to wait after receiving an error
# from the server. Doesn't apply to uploads. # from the server. Doesn't apply to uploads.
cooldown: 30s cooldown: 1s
# Upload chunk size
# If the value is too high, the upload fails.
upload_chunk: 1 MB
upload_retries: 10 upload_retries: 10
upload_retry_interval: 30s upload_retry_interval: 30s
@@ -29,10 +29,10 @@ server:
# Log output settings # Log output settings
output: output:
# Crawl statistics # Crawl statistics
crawl_stats: 1s crawl_stats: 1m
# CPU/RAM/Job queue stats # CPU/RAM/Job queue stats
resource_stats: 10s resource_stats: 1m
# More output? (Every listed dir) # More output? (Every listed dir)
verbose: false verbose: false
@@ -47,13 +47,13 @@ output:
# Crawler settings # Crawler settings
crawl: crawl:
# Number of sites that can be processed at once # Number of sites that can be processed at once
tasks: 100 tasks: 25
# Number of connections per site # Number of connections per site
# Please be careful with this setting! # Please be careful with this setting!
# The crawler fires fast and more than # The crawler fires fast and more than
# ten connections can overwhelm a server. # ten connections can overwhelm a server.
connections: 4 connections: 1
# How often to retry getting data # How often to retry getting data
# from the site before giving up # from the site before giving up
@@ -81,4 +81,4 @@ crawl:
# in memory. # in memory.
# A negative value will cause all jobs # A negative value will cause all jobs
# to be stored in memory. (Don't do this) # to be stored in memory. (Don't do this)
job_buffer: 5000 job_buffer: -1

14
go.mod
View File

@@ -1,14 +0,0 @@
module github.com/syndtr/od-database-crawler
require (
github.com/beeker1121/goque v2.0.1+incompatible
github.com/golang/snappy v0.0.0-20180518054509-2e65f85255db // indirect
github.com/sirupsen/logrus v1.3.0
github.com/spf13/cobra v0.0.3
github.com/spf13/viper v1.3.1
github.com/syndtr/goleveldb v0.0.0-20181128100959-b001fa50d6b2 // indirect
github.com/terorie/od-database-crawler v1.1.1
github.com/valyala/fasthttp v1.1.0
golang.org/x/crypto v0.0.0-20190131182504-b8fe1690c613
golang.org/x/net v0.0.0-20180911220305-26e67e76b6c3
)

66
go.sum
View File

@@ -1,66 +0,0 @@
github.com/armon/consul-api v0.0.0-20180202201655-eb2c6b5be1b6/go.mod h1:grANhF5doyWs3UAsr3K4I6qtAmlQcZDesFNEHPZAzj8=
github.com/beeker1121/goque v2.0.1+incompatible h1:5nJHPMqQLxUvGFc8m/NW2QzxKyc0zICmqs/JUsmEjwE=
github.com/beeker1121/goque v2.0.1+incompatible/go.mod h1:L6dOWBhDOnxUVQsb0wkLve0VCnt2xJW/MI8pdRX4ANw=
github.com/coreos/etcd v3.3.10+incompatible/go.mod h1:uF7uidLiAD3TWHmW31ZFd/JWoc32PjwdhPthX9715RE=
github.com/coreos/go-etcd v2.0.0+incompatible/go.mod h1:Jez6KQU2B/sWsbdaef3ED8NzMklzPG4d5KIOhIy30Tk=
github.com/coreos/go-semver v0.2.0/go.mod h1:nnelYz7RCh+5ahJtPPxZlU+153eP4D4r3EedlOD2RNk=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/fsnotify/fsnotify v1.4.7 h1:IXs+QLmnXW2CcXuY+8Mzv/fWEsPGWxqefPtCP5CnV9I=
github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo=
github.com/golang/snappy v0.0.0-20180518054509-2e65f85255db h1:woRePGFeVFfLKN/pOkfl+p/TAqKOfFu+7KPlMVpok/w=
github.com/golang/snappy v0.0.0-20180518054509-2e65f85255db/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
github.com/hashicorp/hcl v1.0.0 h1:0Anlzjpi4vEasTeNFn2mLJgTSwt0+6sfsiTG8qcWGx4=
github.com/hashicorp/hcl v1.0.0/go.mod h1:E5yfLk+7swimpb2L/Alb/PJmXilQ/rhwaUYs4T20WEQ=
github.com/klauspost/compress v1.4.0 h1:8nsMz3tWa9SWWPL60G1V6CUsf4lLjWLTNEtibhe8gh8=
github.com/klauspost/compress v1.4.0/go.mod h1:RyIbtBH6LamlWaDj8nUwkbUhJ87Yi3uG0guNDohfE1A=
github.com/klauspost/cpuid v0.0.0-20180405133222-e7e905edc00e h1:+lIPJOWl+jSiJOc70QXJ07+2eg2Jy2EC7Mi11BWujeM=
github.com/klauspost/cpuid v0.0.0-20180405133222-e7e905edc00e/go.mod h1:Pj4uuM528wm8OyEC2QMXAi2YiTZ96dNQPGgoMS4s3ek=
github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
github.com/magiconair/properties v1.8.0 h1:LLgXmsheXeRoUOBOjtwPQCWIYqM/LU1ayDtDePerRcY=
github.com/magiconair/properties v1.8.0/go.mod h1:PppfXfuXeibc/6YijjN8zIbojt8czPbwD3XqdrwzmxQ=
github.com/mitchellh/mapstructure v1.1.2 h1:fmNYVwqnSfB9mZU6OS2O6GsXM+wcskZDuKQzvN1EDeE=
github.com/mitchellh/mapstructure v1.1.2/go.mod h1:FVVH3fgwuzCH5S8UJGiWEs2h04kUh9fWfEaFds41c1Y=
github.com/pelletier/go-toml v1.2.0 h1:T5zMGML61Wp+FlcbWjRDT7yAxhJNAiPPLOFECq181zc=
github.com/pelletier/go-toml v1.2.0/go.mod h1:5z9KED0ma1S8pY6P1sdut58dfprrGBbd/94hg7ilaic=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/sirupsen/logrus v1.3.0 h1:hI/7Q+DtNZ2kINb6qt/lS+IyXnHQe9e90POfeewL/ME=
github.com/sirupsen/logrus v1.3.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo=
github.com/spf13/afero v1.1.2 h1:m8/z1t7/fwjysjQRYbP0RD+bUIF/8tJwPdEZsI83ACI=
github.com/spf13/afero v1.1.2/go.mod h1:j4pytiNVoe2o6bmDsKpLACNPDBIoEAkihy7loJ1B0CQ=
github.com/spf13/cast v1.3.0 h1:oget//CVOEoFewqQxwr0Ej5yjygnqGkvggSE/gB35Q8=
github.com/spf13/cast v1.3.0/go.mod h1:Qx5cxh0v+4UWYiBimWS+eyWzqEqokIECu5etghLkUJE=
github.com/spf13/cobra v0.0.3 h1:ZlrZ4XsMRm04Fr5pSFxBgfND2EBVa1nLpiy1stUsX/8=
github.com/spf13/cobra v0.0.3/go.mod h1:1l0Ry5zgKvJasoi3XT1TypsSe7PqH0Sj9dhYf7v3XqQ=
github.com/spf13/jwalterweatherman v1.0.0 h1:XHEdyB+EcvlqZamSM4ZOMGlc93t6AcsBEu9Gc1vn7yk=
github.com/spf13/jwalterweatherman v1.0.0/go.mod h1:cQK4TGJAtQXfYWX+Ddv3mKDzgVb68N+wFjFa4jdeBTo=
github.com/spf13/pflag v1.0.3 h1:zPAT6CGy6wXeQ7NtTnaTerfKOsV6V6F8agHXFiazDkg=
github.com/spf13/pflag v1.0.3/go.mod h1:DYY7MBk1bdzusC3SYhjObp+wFpr4gzcvqqNjLnInEg4=
github.com/spf13/viper v1.3.1 h1:5+8j8FTpnFV4nEImW/ofkzEt8VoOiLXxdYIDsB73T38=
github.com/spf13/viper v1.3.1/go.mod h1:ZiWeW+zYFKm7srdB9IoDzzZXaJaI5eL9QjNiN/DMA2s=
github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
github.com/syndtr/goleveldb v0.0.0-20181128100959-b001fa50d6b2 h1:GnOzE5fEFN3b2zDhJJABEofdb51uMRNb8eqIVtdducs=
github.com/syndtr/goleveldb v0.0.0-20181128100959-b001fa50d6b2/go.mod h1:Z4AUp2Km+PwemOoO/VB5AOx9XSsIItzFjoJlOSiYmn0=
github.com/terorie/od-database-crawler v1.1.1 h1:Ca+ZqbZX3rVWBR8SDRzvroyxjBtUs75MQXZ9YG0gqGo=
github.com/terorie/od-database-crawler v1.1.1/go.mod h1:vVJ7pLkudrlUNp9qu24JCzQ8N6mFsrOmX1tPXr155DQ=
github.com/ugorji/go/codec v0.0.0-20181204163529-d75b2dcb6bc8/go.mod h1:VFNgLljTbGfSG7qAOspJ7OScBnGdDN/yBr0sguwnwf0=
github.com/valyala/bytebufferpool v1.0.0 h1:GqA5TC/0021Y/b9FG4Oi9Mr3q7XYx6KllzawFIhcdPw=
github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyCJ6HpOuEn7z0Csc=
github.com/valyala/fasthttp v1.1.0 h1:3BohG7mqwj4lq7PTX//7gLbUlzNvZSPmuHFnloXT0lw=
github.com/valyala/fasthttp v1.1.0/go.mod h1:4vX61m6KN+xDduDNwXrhIAVZaZaZiQ1luJk8LWSxF3s=
github.com/valyala/tcplisten v0.0.0-20161114210144-ceec8f93295a/go.mod h1:v3UYOV9WzVtRmSR+PDvWpU/qWl4Wa5LApYYX4ZtKbio=
github.com/xordataexchange/crypt v0.0.3-0.20170626215501-b2862e3d0a77/go.mod h1:aYKd//L2LvnjZzWKhF00oedf4jCCReLcmhLdhm1A27Q=
golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
golang.org/x/crypto v0.0.0-20181203042331-505ab145d0a9/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
golang.org/x/crypto v0.0.0-20190131182504-b8fe1690c613 h1:MQ/ZZiDsUapFFiMS+vzwXkCTeEKaum+Do5rINYJDmxc=
golang.org/x/crypto v0.0.0-20190131182504-b8fe1690c613/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
golang.org/x/net v0.0.0-20180911220305-26e67e76b6c3 h1:czFLhve3vsQetD6JOJ8NZZvGQIXlnN3/yXxbT6/awxI=
golang.org/x/net v0.0.0-20180911220305-26e67e76b6c3/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20181205085412-a5c9d58dba9a h1:1n5lsVfiQW3yfsRGu98756EH1YthsFqr/5mxHduZW2A=
golang.org/x/sys v0.0.0-20181205085412-a5c9d58dba9a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/text v0.3.0 h1:g61tztE5qeGQ89tm6NTjjM9VPIm088od1l6aSorWRWg=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v2 v2.2.2 h1:ZCJp+EgiOT7lHqUV2J862kp8Qj64Jo6az82+3Td9dZw=
gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=

47
jenkins/Jenkinsfile vendored Normal file
View File

@@ -0,0 +1,47 @@
def remote = [:]
remote.name = 'remote'
remote.host = env.DEPLOY_HOST
remote.user = env.DEPLOY_USER
remote.identityFile = '/var/lib/jenkins/.ssh/id_rsa'
remote.knownHosts = '/var/lib/jenkins/.ssh/known_hosts'
remote.allowAnyHosts = true
remote.retryCount = 3
remote.retryWaitSec = 3
logLevel = 'FINER'
pipeline {
agent none
environment {
GOOS='linux'
CGO_ENABLED='0'
HOME='.'
}
stages {
stage('Build') {
agent {
docker {
image 'golang:latest'
}
}
steps {
sh 'mkdir -p /go/src/github.com/terorie/od-database-crawler'
sh 'cp -r *.go fasturl ds jenkins/build.sh "/go/src/github.com/terorie/od-database-crawler"'
sh 'cd /go/src/github.com/terorie/od-database-crawler && go get ./...'
sh './jenkins/build.sh'
stash includes: 'dist/', name: 'dist'
}
}
stage('Deploy') {
agent none
steps {
node('master') {
unstash 'dist'
sshCommand remote: remote, command: "ls od-database-crawler/"
sshPut remote: remote, from: 'dist/', into: 'od-database-crawler'
}
}
}
}
}

23
jenkins/build.sh Executable file
View File

@@ -0,0 +1,23 @@
#!/usr/bin/env bash
appname="od-database-crawler"
outdir="dist/"
tag="${BUILD_ID}_$(date +%Y-%m-%d)"
rm -rf "./${outdir}"
mkdir build 2> /dev/null
name=${outdir}${appname}-${tag}-linux
GOOS="linux" GOARCH="amd64" go build -ldflags="-s -w" -o ${name}
gzip -f ${name}
echo ${name}
name=${outdir}${appname}-${tag}-mac
GOOS="darwin" GOARCH="amd64" go build -ldflags="-s -w" -o ${name}
gzip -f ${name}
echo ${name}
name=${outdir}${appname}-${tag}-freebsd
GOOS="freebsd" GOARCH="amd64" go build -ldflags="-s -w" -o ${name}
gzip -f ${name}
echo ${name}

64
main.go
View File

@@ -8,6 +8,7 @@ import (
"github.com/spf13/viper" "github.com/spf13/viper"
"github.com/terorie/od-database-crawler/fasturl" "github.com/terorie/od-database-crawler/fasturl"
"os" "os"
"os/signal"
"strings" "strings"
"sync/atomic" "sync/atomic"
"time" "time"
@@ -17,7 +18,7 @@ var configFile string
var rootCmd = cobra.Command { var rootCmd = cobra.Command {
Use: "od-database-crawler", Use: "od-database-crawler",
Version: "1.2.1", Version: "1.2.2",
Short: "OD-Database Go crawler", Short: "OD-Database Go crawler",
Long: helpText, Long: helpText,
PersistentPreRunE: preRun, PersistentPreRunE: preRun,
@@ -61,8 +62,6 @@ func preRun(cmd *cobra.Command, args []string) error {
if err := os.MkdirAll("queue", 0755); if err := os.MkdirAll("queue", 0755);
err != nil { panic(err) } err != nil { panic(err) }
readConfig()
return nil return nil
} }
@@ -75,25 +74,31 @@ func main() {
} }
func cmdBase(_ *cobra.Command, _ []string) { func cmdBase(_ *cobra.Command, _ []string) {
// TODO Graceful shutdown onlineMode = true
appCtx := context.Background() readConfig()
forceCtx := context.Background()
appCtx, soft := context.WithCancel(context.Background())
forceCtx, hard := context.WithCancel(context.Background())
go hardShutdown(forceCtx)
go listenCtrlC(soft, hard)
inRemotes := make(chan *OD) inRemotes := make(chan *OD)
go Schedule(forceCtx, inRemotes) go Schedule(appCtx, inRemotes)
ticker := time.NewTicker(config.Recheck) ticker := time.NewTicker(config.Recheck)
defer ticker.Stop() defer ticker.Stop()
for { for {
select { select {
case <-appCtx.Done(): case <-appCtx.Done():
return goto shutdown
case <-ticker.C: case <-ticker.C:
t, err := FetchTask() t, err := FetchTask()
if err != nil { if err != nil {
logrus.WithError(err). logrus.WithError(err).
Error("Failed to get new task") Error("Failed to get new task")
time.Sleep(viper.GetDuration(ConfCooldown)) if !sleep(viper.GetDuration(ConfCooldown), appCtx) {
goto shutdown
}
continue continue
} }
if t == nil { if t == nil {
@@ -109,13 +114,7 @@ func cmdBase(_ *cobra.Command, _ []string) {
if urlErr, ok := err.(*fasturl.Error); ok && urlErr.Err == fasturl.ErrUnknownScheme { if urlErr, ok := err.(*fasturl.Error); ok && urlErr.Err == fasturl.ErrUnknownScheme {
// Not an error // Not an error
err = nil err = nil
// TODO FTP crawler
// Give back task
//err2 := CancelTask(t.WebsiteId)
//if err2 != nil {
// logrus.Error(err2)
//}
continue continue
} else if err != nil { } else if err != nil {
logrus.WithError(err). logrus.WithError(err).
@@ -126,9 +125,15 @@ func cmdBase(_ *cobra.Command, _ []string) {
ScheduleTask(inRemotes, t, &baseUri) ScheduleTask(inRemotes, t, &baseUri)
} }
} }
shutdown:
globalWait.Wait()
} }
func cmdCrawler(_ *cobra.Command, args []string) error { func cmdCrawler(_ *cobra.Command, args []string) error {
onlineMode = false
readConfig()
arg := args[0] arg := args[0]
// https://github.com/golang/go/issues/19779 // https://github.com/golang/go/issues/19779
if !strings.Contains(arg, "://") { if !strings.Contains(arg, "://") {
@@ -161,3 +166,30 @@ func cmdCrawler(_ *cobra.Command, args []string) error {
return nil return nil
} }
func listenCtrlC(soft, hard context.CancelFunc) {
c := make(chan os.Signal)
signal.Notify(c, os.Interrupt)
<-c
logrus.Info(">>> Shutting down crawler... <<<")
soft()
<-c
logrus.Warning(">>> Force shutdown! <<<")
hard()
}
func hardShutdown(c context.Context) {
<-c.Done()
os.Exit(1)
}
func sleep(d time.Duration, c context.Context) bool {
select {
case <-time.After(d):
return true
case <-c.Done():
return false
}
}

View File

@@ -7,19 +7,29 @@ import (
"time" "time"
) )
type ResultCode int
const (
TR_OK = ResultCode(iota)
TR_FAIL = 1
TR_SKIP = 2
)
type Task struct { type Task struct {
WebsiteId uint64 `json:"website_id"` WebsiteId uint64 `json:"website_id"`
Url string `json:"url"` Url string `json:"url"`
UploadToken string `json:"upload_token"`
TaskId int64
} }
type TaskResult struct { type TaskResult struct {
StatusCode string `json:"status_code"` ResultCode ResultCode `json:"status_code"`
FileCount uint64 `json:"file_count"` FileCount uint64 `json:"file_count"`
ErrorCount uint64 `json:"-"` ErrorCount uint64 `json:"-"`
StartTime time.Time `json:"-"` StartTime time.Time `json:"-"`
StartTimeUnix int64 `json:"start_time"` StartTimeUnix int64 `json:"start_time"`
EndTimeUnix int64 `json:"end_time"` EndTimeUnix int64 `json:"end_time"`
WebsiteId uint64 `json:"website_id"` WebsiteId uint64 `json:"website_id"`
} }
type Job struct { type Job struct {
@@ -51,13 +61,16 @@ func (o *OD) LoadOrStoreKey(k *redblackhash.Key) (exists bool) {
defer o.Scanned.Unlock() defer o.Scanned.Unlock()
exists = o.Scanned.Get(k) exists = o.Scanned.Get(k)
if exists { return true } if exists {
return true
}
o.Scanned.Put(k) o.Scanned.Put(k)
return false return false
} }
type errorString string type errorString string
func (e errorString) Error() string { func (e errorString) Error() string {
return string(e) return string(e)
} }

View File

@@ -34,13 +34,16 @@ func Schedule(c context.Context, remotes <-chan *OD) {
queuePath := path.Join("queue", fmt.Sprintf("%d", remote.Task.WebsiteId)) queuePath := path.Join("queue", fmt.Sprintf("%d", remote.Task.WebsiteId))
// Delete existing queue // Delete existing queue
if err := os.RemoveAll(queuePath); if err := os.RemoveAll(queuePath); err != nil {
err != nil { panic(err) } panic(err)
}
// Start new queue // Start new queue
var err error var err error
remote.WCtx.Queue, err = OpenQueue(queuePath) remote.WCtx.Queue, err = OpenQueue(queuePath)
if err != nil { panic(err) } if err != nil {
panic(err)
}
// Spawn workers // Spawn workers
for i := 0; i < config.Workers; i++ { for i := 0; i < config.Workers; i++ {
@@ -77,12 +80,12 @@ func ScheduleTask(remotes chan<- *OD, t *Task, u *fasturl.URL) {
globalWait.Add(1) globalWait.Add(1)
now := time.Now() now := time.Now()
od := &OD { od := &OD{
Task: *t, Task: *t,
BaseUri: *u, BaseUri: *u,
Result: TaskResult { Result: TaskResult{
WebsiteId: t.WebsiteId, WebsiteId: t.WebsiteId,
StartTime: now, StartTime: now,
StartTimeUnix: now.Unix(), StartTimeUnix: now.Unix(),
}, },
} }
@@ -117,7 +120,7 @@ func (o *OD) Watch(results chan File) {
// Open crawl results file // Open crawl results file
f, err := os.OpenFile( f, err := os.OpenFile(
filePath, filePath,
os.O_CREATE | os.O_RDWR | os.O_TRUNC, os.O_CREATE|os.O_RDWR|os.O_TRUNC,
0644, 0644,
) )
if err != nil { if err != nil {
@@ -145,7 +148,7 @@ func (o *OD) Watch(results chan File) {
} }
// Upload results // Upload results
err = PushResult(&o.Result, f) err = PushResult(&o.Task, f)
if err != nil { if err != nil {
logrus.WithError(err). logrus.WithError(err).
Error("Failed uploading crawl results") Error("Failed uploading crawl results")
@@ -170,24 +173,18 @@ func (o *OD) handleCollect(results chan File, f *os.File, collectErrC chan error
// Log finish // Log finish
logrus.WithFields(logrus.Fields{ logrus.WithFields(logrus.Fields{
"id": o.Task.WebsiteId, "id": o.Task.WebsiteId,
"url": o.BaseUri.String(), "url": o.BaseUri.String(),
"duration": time.Since(o.Result.StartTime), "duration": time.Since(o.Result.StartTime),
}).Info("Crawler finished") }).Info("Crawler finished")
// Set status code // Set status code
now := time.Now() now := time.Now()
o.Result.EndTimeUnix = now.Unix() o.Result.EndTimeUnix = now.Unix()
fileCount := atomic.LoadUint64(&o.Result.FileCount) if atomic.LoadUint64(&o.Result.ErrorCount) != 0 {
if fileCount == 0 { o.Result.ResultCode = TR_FAIL
errorCount := atomic.LoadUint64(&o.Result.ErrorCount)
if errorCount == 0 {
o.Result.StatusCode = "empty"
} else {
o.Result.StatusCode = "directory listing failed"
}
} else { } else {
o.Result.StatusCode = "success" o.Result.ResultCode = TR_OK
} }
} }
@@ -205,11 +202,17 @@ func (t *Task) collect(results chan File, f *os.File) error {
result.Path = fasturl.PathUnescape(result.Path) result.Path = fasturl.PathUnescape(result.Path)
result.Name = fasturl.PathUnescape(result.Name) result.Name = fasturl.PathUnescape(result.Name)
resJson, err := json.Marshal(result) resJson, err := json.Marshal(result)
if err != nil { panic(err) } if err != nil {
panic(err)
}
_, err = f.Write(resJson) _, err = f.Write(resJson)
if err != nil { return err } if err != nil {
return err
}
_, err = f.Write([]byte{'\n'}) _, err = f.Write([]byte{'\n'})
if err != nil { return err } if err != nil {
return err
}
} }
return nil return nil

302
server.go
View File

@@ -2,51 +2,125 @@ package main
import ( import (
"bytes" "bytes"
"encoding/base64"
"encoding/json" "encoding/json"
"fmt" "fmt"
"github.com/fasthttp/websocket"
"github.com/pkg/errors"
"github.com/sirupsen/logrus" "github.com/sirupsen/logrus"
"github.com/spf13/viper" "golang.org/x/time/rate"
"io" "io"
"mime/multipart" "io/ioutil"
"net/http" "net/http"
"net/url" "net/url"
"os" "os"
"strconv" "strconv"
"time"
) )
var serverClient = http.Client { var serverWorker *TrackerWorker
Timeout: config.ServerTimeout,
var serverClient = http.Client{
Timeout: config.ServerTimeout,
Transport: new(ServerTripper), Transport: new(ServerTripper),
} }
var serverUserAgent = "od-database-crawler/" + rootCmd.Version var serverUserAgent = "od-database-crawler/" + rootCmd.Version
func getOrCreateWorker() {
if _, err := os.Stat("worker.json"); os.IsNotExist(err) {
req := CreateTrackerWorkerRequest{
Alias: config.TrackerAlias,
}
body, _ := json.Marshal(&req)
buf := bytes.NewBuffer(body)
resp, _ := serverClient.Post(config.TrackerUrl+"/worker/create", "application/json", buf)
workerResponse := CreateTrackerWorkerResponse{}
respBody, _ := ioutil.ReadAll(resp.Body)
_ = json.Unmarshal(respBody, &workerResponse)
workerJsonData, _ := json.Marshal(&workerResponse.Content.Worker)
fp, _ := os.OpenFile("worker.json", os.O_CREATE|os.O_TRUNC|os.O_WRONLY, 0600)
_, _ = fp.Write(workerJsonData)
//Request ASSIGN permission
serverWorker = &workerResponse.Content.Worker
accessReq, _ := json.Marshal(WorkerAccessRequest{
Project: config.TrackerProject,
Assign: true,
Submit: false,
})
buf = bytes.NewBuffer(accessReq)
res, err := serverClient.Post(config.TrackerUrl+"/project/request_access", "application/json", buf)
if err != nil {
panic(err)
}
logrus.WithFields(logrus.Fields{
"response": res.StatusCode,
}).Info("Requested ASSIGN permission")
} else {
var worker TrackerWorker
fp, _ := os.OpenFile("worker.json", os.O_RDONLY, 0600)
workerJsonData, _ := ioutil.ReadAll(fp)
_ = json.Unmarshal(workerJsonData, &worker)
serverWorker = &worker
}
}
func FetchTask() (t *Task, err error) { func FetchTask() (t *Task, err error) {
res, err := serverClient.PostForm(
config.ServerUrl + "/task/get", if serverWorker == nil {
url.Values{ "token": {config.Token} }) getOrCreateWorker()
if err != nil { return } }
res, err := serverClient.Get(config.TrackerUrl + "/task/get/" + strconv.Itoa(config.TrackerProject))
if err != nil {
return
}
defer res.Body.Close() defer res.Body.Close()
switch res.StatusCode { switch res.StatusCode {
case 200: case 200:
break break
case 404, 500:
return nil, nil
default: default:
return nil, fmt.Errorf("http %s", res.Status) return nil, fmt.Errorf("http %s", res.Status)
} }
t = new(Task) jsonResponse := FetchTaskResponse{}
err = json.NewDecoder(res.Body).Decode(t) err = json.NewDecoder(res.Body).Decode(&jsonResponse)
if err != nil { return } if _, ok := err.(*json.SyntaxError); ok {
return nil, fmt.Errorf("/task/get returned invalid JSON")
} else if err != nil {
return
}
if !jsonResponse.Ok {
if jsonResponse.Message == "No task available" {
return nil, nil
}
return nil, errors.New(jsonResponse.Message)
}
task := Task{}
err = json.Unmarshal([]byte(jsonResponse.Content.Task.Recipe), &task)
if _, ok := err.(*json.SyntaxError); ok {
return nil, fmt.Errorf("/task/get returned invalid JSON")
} else if err != nil {
return
}
t = &task
t.TaskId = jsonResponse.Content.Task.Id
return return
} }
func PushResult(result *TaskResult, f *os.File) (err error) { func PushResult(task *Task, f *os.File) (err error) {
if result.WebsiteId == 0 { if task.WebsiteId == 0 {
// Not a real result, don't push // Not a real result, don't push
return nil return nil
} }
@@ -57,10 +131,10 @@ func PushResult(result *TaskResult, f *os.File) (err error) {
return return
} }
err = uploadChunks(result.WebsiteId, f) err = uploadWebsocket(f, task.UploadToken)
if err != nil { if err != nil {
logrus.Errorf("Failed to upload file list: %s", err) logrus.Errorf("Failed to upload file list: %s", err)
err2 := CancelTask(result.WebsiteId) err2 := releaseTask(task, TR_FAIL)
if err2 != nil { if err2 != nil {
logrus.Error(err2) logrus.Error(err2)
} }
@@ -68,91 +142,62 @@ func PushResult(result *TaskResult, f *os.File) (err error) {
} }
// Upload result ignoring errors // Upload result ignoring errors
uploadResult(result) _ = releaseTask(task, TR_OK)
return return
} }
func uploadChunks(websiteId uint64, f *os.File) error { func uploadWebsocket(f *os.File, token string) (err error) {
eof := false
for iter := 1; !eof; iter++ {
// TODO Stream with io.Pipe?
var b bytes.Buffer
multi := multipart.NewWriter(&b) u := url.URL{Scheme: config.WsBucketScheme, Host: config.WsBucketHost, Path: "/upload"}
// Set upload fields header := http.Header{}
var err error header.Add("X-Upload-Token", token)
err = multi.WriteField("token", config.Token) conn, _, err := websocket.DefaultDialer.Dial(u.String(), header)
if err != nil { return err } if err != nil {
err = multi.WriteField("website_id", fmt.Sprintf("%d", websiteId)) return
if err != nil { return err }
// Copy chunk to file_list
formFile, err := multi.CreateFormFile("file_list", "file_list")
var n int64
n, err = io.CopyN(formFile, f, config.ChunkSize)
if err != io.EOF && err != nil {
return err
}
if n == 0 {
// Don't upload, no content
return nil
} else if n < config.ChunkSize {
err = nil
// Break at end of iteration
eof = true
}
multi.Close()
for retries := 0; retries < viper.GetInt(ConfUploadRetries); retries++ {
if retries > 0 {
// Error occurred, retry upload
time.Sleep(viper.GetDuration(ConfUploadRetryInterval))
}
req, err := http.NewRequest(
http.MethodPost,
config.ServerUrl + "/task/upload",
&b)
req.Header.Set("content-type", multi.FormDataContentType())
if err != nil { continue }
res, err := serverClient.Do(req)
if err != nil { continue }
res.Body.Close()
if res.StatusCode != http.StatusOK {
logrus.WithField("status", res.Status).
WithField("part", iter).
Errorf("Upload failed")
continue
}
// Upload successful
break
}
logrus.WithField("id", websiteId).
WithField("part", iter).
Infof("Uploaded files chunk")
} }
conn.EnableWriteCompression(true) //TODO: Is this necessary?
socketWriter, _ := conn.NextWriter(websocket.BinaryMessage)
_, _ = io.Copy(socketWriter, f)
err = socketWriter.Close()
if err != nil {
logrus.Error("FIXME: couldn't do file upload")
return
}
err = conn.Close()
if err != nil {
return
}
return nil return nil
} }
func uploadResult(result *TaskResult) (err error) { func releaseTask(task *Task, taskResult ResultCode) (err error) {
resultEnc, err := json.Marshal(result)
if err != nil { panic(err) }
res, err := serverClient.PostForm( req := releaseTaskRequest{
config.ServerUrl + "/task/complete", TaskId: task.TaskId,
url.Values { ResultCode: taskResult,
"token": {config.Token}, // TODO Will implement verification in a later ODDB update
"result": {string(resultEnc)}, Verification: 0,
}, }
resultEnc, err := json.Marshal(&req)
if err != nil {
panic(err)
}
body := bytes.NewBuffer(resultEnc)
res, err := serverClient.Post(
config.TrackerUrl+"/task/release",
"application/json",
body,
) )
if err != nil { return } if err != nil {
return
}
res.Body.Close() res.Body.Close()
if res.StatusCode != http.StatusOK { if res.StatusCode != http.StatusOK {
@@ -162,27 +207,66 @@ func uploadResult(result *TaskResult) (err error) {
return return
} }
func CancelTask(websiteId uint64) (err error) {
res, err := serverClient.PostForm(
config.ServerUrl + "/task/cancel",
url.Values{
"token": {config.Token},
"website_id": {strconv.FormatUint(websiteId, 10)},
},
)
if err != nil { return }
res.Body.Close()
if res.StatusCode != http.StatusOK {
return fmt.Errorf("failed to cancel task: %s", res.Status)
}
return
}
type ServerTripper struct{} type ServerTripper struct{}
func (t *ServerTripper) RoundTrip(req *http.Request) (res *http.Response, err error) { func (t *ServerTripper) RoundTrip(req *http.Request) (res *http.Response, err error) {
req.Header.Set("User-Agent", serverUserAgent) req.Header.Set("User-Agent", serverUserAgent)
//TODO: Use task_tracker/client ?
if serverWorker != nil {
req.Header.Add("X-Worker-Id", strconv.Itoa(serverWorker.Id))
req.Header.Add("X-Secret", base64.StdEncoding.EncodeToString(serverWorker.Secret))
}
return http.DefaultTransport.RoundTrip(req) return http.DefaultTransport.RoundTrip(req)
} }
// https://github.com/simon987/task_tracker/blob/master/api/models.go
type releaseTaskRequest struct {
TaskId int64 `json:"task_id"`
ResultCode ResultCode `json:"result"`
Verification int64 `json:"verification"`
}
type WorkerAccessRequest struct {
Assign bool `json:"assign"`
Submit bool `json:"submit"`
Project int `json:"project"`
}
type FetchTaskResponse struct {
Ok bool `json:"ok"`
Message string `json:"message"`
Content struct {
Task struct {
Id int64 `json:"id"`
Priority int64 `json:"priority"`
Project struct {
Id int64 `json:"id"`
Name string `json:"name"`
Version string `json:"version"`
AssignRate rate.Limit `json:"assign_rate"`
SubmitRate rate.Limit `json:"submit_rate"`
} `json:"project"`
Recipe string `json:"recipe"`
} `json:"task"`
} `json:"content"`
}
type TrackerWorker struct {
Alias string `json:"alias"`
Id int `json:"id"`
Secret []byte `json:"secret"`
}
type CreateTrackerWorkerResponse struct {
Ok bool `json:"ok"`
Message string `json:"message"`
Content struct {
Worker TrackerWorker `json:"worker"`
} `json:"content"`
}
type CreateTrackerWorkerRequest struct {
Alias string `json:"alias"`
}

View File

@@ -14,8 +14,8 @@ import (
var globalWait sync.WaitGroup var globalWait sync.WaitGroup
type WorkerContext struct { type WorkerContext struct {
OD *OD OD *OD
Queue *BufferedQueue Queue *BufferedQueue
lastRateLimit time.Time lastRateLimit time.Time
numRateLimits int numRateLimits int
} }
@@ -56,16 +56,16 @@ func (w *WorkerContext) step(results chan<- File, job Job) {
if !shouldRetry(err) { if !shouldRetry(err) {
atomic.AddUint64(&totalAborted, 1) atomic.AddUint64(&totalAborted, 1)
logrus.WithField("url", job.UriStr). //logrus.WithField("url", job.UriStr).
WithError(err). // WithError(err).
Error("Giving up after failure") // Error("Giving up after failure")
return return
} }
if job.Fails > config.Retries { if job.Fails > config.Retries {
atomic.AddUint64(&totalAborted, 1) atomic.AddUint64(&totalAborted, 1)
logrus.WithField("url", job.UriStr). //logrus.WithField("url", job.UriStr).
Errorf("Giving up after %d fails", job.Fails) // Errorf("Giving up after %d fails", job.Fails)
} else { } else {
atomic.AddUint64(&totalRetries, 1) atomic.AddUint64(&totalRetries, 1)
if err == ErrRateLimit { if err == ErrRateLimit {
@@ -88,7 +88,9 @@ func (w *WorkerContext) step(results chan<- File, job Job) {
} }
func (w *WorkerContext) DoJob(job *Job, f *File) (newJobs []Job, err error) { func (w *WorkerContext) DoJob(job *Job, f *File) (newJobs []Job, err error) {
if len(job.Uri.Path) == 0 { return } if len(job.Uri.Path) == 0 {
return
}
if job.Uri.Path[len(job.Uri.Path)-1] == '/' { if job.Uri.Path[len(job.Uri.Path)-1] == '/' {
// Load directory // Load directory
links, err := GetDir(job, f) links, err := GetDir(job, f)
@@ -159,10 +161,10 @@ func (w *WorkerContext) queueJob(job Job) {
w.OD.Wait.Add(1) w.OD.Wait.Add(1)
if w.numRateLimits > 0 { if w.numRateLimits > 0 {
if time.Since(w.lastRateLimit) > 5 * time.Second { if time.Since(w.lastRateLimit) > 5*time.Second {
w.numRateLimits = 0 w.numRateLimits = 0
} else { } else {
time.Sleep(time.Duration(math.Sqrt(float64(50 * w.numRateLimits))) * time.Sleep(time.Duration(math.Sqrt(float64(50*w.numRateLimits))) *
100 * time.Millisecond) 100 * time.Millisecond)
} }
} }