Mirror of https://github.com/terorie/od-database-crawler.git (synced 2025-12-13 15:19:03 +00:00)

Compare commits: v1.2.1...task_track (29 commits)
Commits in this compare (SHA1):

a962c60b82
24f0bd91f7
84c10e1981
860fa79327
76bc8293d6
3470be6086
60471a081e
0b3f0d87fe
da9c75e392
8947e05d0c
8c5f99d616
206ea0e91d
8b9d8bfd17
c9ff102d80
88856c1c19
9e9b606250
326e29e5e4
c2acd5463f
e4d04e6a5f
9f1402e841
7c8ab50ee4
281d2d17d6
45cbd4d535
771d49f2dd
dbd787aa81
cea6c1658b
885af5bb3b
b18b70f798
9d5f549774
Dockerfile (new file, 15 lines)

@@ -0,0 +1,15 @@
FROM golang:alpine as builder
ADD . /go/src/github.com/terorie/od-database-crawler
RUN apk add git \
&& go get -d -v github.com/terorie/od-database-crawler \
&& CGO_ENABLED=0 go install -a \
-installsuffix cgo \
-ldflags="-s -w" \
github.com/terorie/od-database-crawler

FROM scratch
COPY --from=builder /etc/ssl/certs/ca-certificates.crt /etc/ssl/certs/
COPY --from=builder /go/bin/od-database-crawler /bin/
WORKDIR /oddb
VOLUME [ "/oddb" ]
CMD ["/bin/od-database-crawler", "server"]

README.md (34 lines changed)

@@ -9,16 +9,46 @@

https://od-db.the-eye.eu/

#### Usage
## Usage

### Deploys

1. With Config File (if `config.yml` found in working dir)
   - Download [default config](https://github.com/terorie/od-database-crawler/blob/master/config.yml)
   - Set `server.url` and `server.token`
   - Start with `./od-database-crawler server --config <file>`

2. With Flags or env
   - Overrides config file if it exists
   - `--help` for a list of flags
   - Every flag is available as an environment variable (see the sketch below):
     `--server.crawl_stats` ➡️ `OD_SERVER_CRAWL_STATS`
   - Start with `./od-database-crawler server <flags>`

3. With Docker
   ```bash
   docker run \
       -e OD_SERVER_URL=xxx \
       -e OD_SERVER_TOKEN=xxx \
       terorie/od-database-crawler
   ```

### Flag reference

Here are the most important config flags. For more fine-grained control, take a look at `/config.yml`.

| Flag/Environment | Description | Example |
| --- | --- | --- |
| `server.url`<br />`OD_SERVER_URL` | OD-DB Server URL | `https://od-db.mine.the-eye.eu/api` |
| `server.token`<br />`OD_SERVER_TOKEN` | OD-DB Server Access Token | _Ask Hexa **TM**_ |
| `server.recheck`<br />`OD_SERVER_RECHECK` | Job Fetching Interval | `3s` |
| `output.crawl_stats`<br />`OD_OUTPUT_CRAWL_STATS` | Crawl Stats Logging Interval (0 = disabled) | `500ms` |
| `output.resource_stats`<br />`OD_OUTPUT_RESOURCE_STATS` | Resource Stats Logging Interval (0 = disabled) | `8s` |
| `output.log`<br />`OD_OUTPUT_LOG` | Log File (none = disabled) | `crawler.log` |
| `crawl.tasks`<br />`OD_CRAWL_TASKS` | Max number of sites to crawl concurrently | `500` |
| `crawl.connections`<br />`OD_CRAWL_CONNECTIONS` | HTTP connections per site | `1` |
| `crawl.retries`<br />`OD_CRAWL_RETRIES` | How often to retry after a temporary failure (e.g. `HTTP 429` or timeouts) | `5` |
| `crawl.dial_timeout`<br />`OD_CRAWL_DIAL_TIMEOUT` | TCP Connect timeout | `5s` |
| `crawl.timeout`<br />`OD_CRAWL_TIMEOUT` | HTTP request timeout | `20s` |
| `crawl.user-agent`<br />`OD_CRAWL_USER_AGENT` | HTTP Crawler User-Agent | `googlebot/1.2.3` |
| `crawl.job_buffer`<br />`OD_CRAWL_JOB_BUFFER` | Number of URLs to keep in memory/cache, per job. The rest is offloaded to disk. Decrease this value if the crawler uses too much RAM. (0 = Disable Cache, -1 = Only use Cache) | `5000` |

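Every flag maps onto an environment variable by a fixed rule: the dotted flag name is upper-cased, dots and dashes become underscores, and the result gets an `OD_` prefix. The sketch below is a stand-alone illustration of that convention in Go, not code from this repository; the helper name `envKeyFor` is made up for the example.

```go
package main

import (
	"fmt"
	"strings"
)

// envKeyFor derives the environment variable for a config flag, following
// the convention shown in the README: "crawl.dial_timeout" -> "OD_CRAWL_DIAL_TIMEOUT".
func envKeyFor(flagName string) string {
	key := strings.TrimLeft(flagName, "-") // accept "--crawl.tasks" as well as "crawl.tasks"
	key = strings.NewReplacer(".", "_", "-", "_").Replace(key)
	return "OD_" + strings.ToUpper(key)
}

func main() {
	for _, f := range []string{"server.url", "crawl.user-agent", "output.crawl_stats"} {
		fmt.Printf("%-20s -> %s\n", f, envKeyFor(f))
	}
	// server.url           -> OD_SERVER_URL
	// crawl.user-agent     -> OD_CRAWL_USER_AGENT
	// output.crawl_stats   -> OD_OUTPUT_CRAWL_STATS
}
```

config.go (in the diff below) binds each pflag to Viper and to an environment key along these lines; the exact key construction is cut off by the hunk boundary, so the mapping above is the documented convention rather than a verbatim excerpt.
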
config.go (209 lines changed)

@@ -4,6 +4,7 @@ import (
"bufio"
"fmt"
"github.com/sirupsen/logrus"
"github.com/spf13/pflag"
"github.com/spf13/viper"
"io"
"os"
@@ -12,36 +13,44 @@ import (
)

var config struct {
ServerUrl string
Token string
ServerTimeout time.Duration
Recheck time.Duration
ChunkSize int64
Retries int
Workers int
UserAgent string
Tasks int32
Verbose bool
PrintHTTP bool
JobBufferSize int
TrackerUrl string
TrackerProject int
TrackerAlias string
WsBucketScheme string
WsBucketHost string
ServerTimeout time.Duration
Recheck time.Duration
ChunkSize int64
Retries int
Workers int
UserAgent string
Tasks int32
Verbose bool
PrintHTTP bool
JobBufferSize int
}

var onlineMode bool

const (
ConfServerUrl = "server.url"
ConfToken = "server.token"
ConfServerTimeout = "server.timeout"
ConfRecheck = "server.recheck"
ConfCooldown = "server.cooldown"
ConfChunkSize = "server.upload_chunk"
ConfUploadRetries = "server.upload_retries"
ConfTrackerUrl = "server.url"
ConfTrackerProject = "server.project"
ConfTrackerAlias = "server.alias"
ConfWsBucketScheme = "server.ws_bucket_scheme"
ConfWsBucketHost = "server.ws_bucket_host"
ConfServerTimeout = "server.timeout"
ConfRecheck = "server.recheck"
ConfCooldown = "server.cooldown"
ConfChunkSize = "server.upload_chunk"
ConfUploadRetries = "server.upload_retries"
ConfUploadRetryInterval = "server.upload_retry_interval"

ConfTasks = "crawl.tasks"
ConfRetries = "crawl.retries"
ConfWorkers = "crawl.connections"
ConfUserAgent = "crawl.user-agent"
ConfDialTimeout = "crawl.dial_timeout"
ConfTimeout = "crawl.timeout"
ConfTasks = "crawl.tasks"
ConfRetries = "crawl.retries"
ConfWorkers = "crawl.connections"
ConfUserAgent = "crawl.user-agent"
ConfDialTimeout = "crawl.dial_timeout"
ConfTimeout = "crawl.timeout"
ConfJobBufferSize = "crawl.job_buffer"

ConfCrawlStats = "output.crawl_stats"
@@ -54,8 +63,62 @@ const (
func prepareConfig() {
pf := rootCmd.PersistentFlags()

bind := func(s string) {
if err := viper.BindPFlag(s, pf.Lookup(s)); err != nil {
pf.SortFlags = false
pf.StringVar(&configFile, "config", "", "Config file")
configFile = os.Getenv("OD_CONFIG")

pf.String(ConfTrackerUrl, "https://tt.the-eye.eu/api", "task_tracker api URL")

pf.String(ConfTrackerProject, "1", "task_tracker project id")

pf.String(ConfWsBucketScheme, "wss", "ws_bucket scheme")

pf.String(ConfWsBucketHost, "wsb.the-eye.eu", "ws_bucket host")

pf.String(ConfTrackerAlias, "changeme", "task_tracker worker alias")

pf.Duration(ConfServerTimeout, 60*time.Second, "OD-DB request timeout")

pf.Duration(ConfRecheck, 1*time.Second, "OD-DB: Poll interval for new jobs")

pf.Duration(ConfCooldown, 1*time.Minute, "OD-DB: Time to wait after a server-side error")

pf.String(ConfChunkSize, "1 MB", "OD-DB: Result upload chunk size")

pf.Uint(ConfUploadRetries, 10, "OD-DB: Max upload retries")

pf.Duration(ConfUploadRetryInterval, 30*time.Second, "OD-DB: Time to wait between upload retries")

pf.Uint(ConfTasks, 25, "Crawler: Max concurrent tasks")

pf.Uint(ConfWorkers, 1, "Crawler: Connections per server")

pf.Uint(ConfRetries, 5, "Crawler: Request retries")

pf.Duration(ConfDialTimeout, 10*time.Second, "Crawler: Handshake timeout")

pf.Duration(ConfTimeout, 30*time.Second, "Crawler: Request timeout")

pf.String(ConfUserAgent, "Mozilla/5.0 (X11; od-database-crawler) Gecko/20100101 Firefox/52.0", "Crawler: User-Agent")

pf.Int(ConfJobBufferSize, -1, "Crawler: Task queue cache size")

pf.Duration(ConfCrawlStats, 500*time.Second, "Log: Crawl stats interval")

pf.Duration(ConfAllocStats, 500*time.Second, "Log: Resource stats interval")

pf.Bool(ConfVerbose, false, "Log: Print every listed dir")

pf.Bool(ConfPrintHTTP, false, "Log: Print HTTP client errors")

pf.String(ConfLogFile, "crawler.log", "Log file")

// Bind all flags to Viper
pf.VisitAll(func(flag *pflag.Flag) {
s := flag.Name
s = strings.TrimLeft(s, "-")

if err := viper.BindPFlag(s, flag); err != nil {
panic(err)
}
var envKey string
@@ -65,71 +128,7 @@ func prepareConfig() {
if err := viper.BindEnv(s, envKey); err != nil {
panic(err)
}
}

pf.SortFlags = false
pf.StringVar(&configFile, "config", "", "Config file")
configFile = os.Getenv("OD_CONFIG")

pf.String(ConfServerUrl, "http://od-db.the-eye.eu/api", "OD-DB server URL")
bind(ConfServerUrl)

pf.String(ConfToken, "", "OD-DB access token (env OD_SERVER_TOKEN)")
bind(ConfToken)

pf.Duration(ConfServerTimeout, 60 * time.Second, "OD-DB request timeout")
bind(ConfServerTimeout)

pf.Duration(ConfRecheck, 1 * time.Second, "OD-DB: Poll interval for new jobs")
bind(ConfRecheck)

pf.Duration(ConfCooldown, 30 * time.Second, "OD-DB: Time to wait after a server-side error")
bind(ConfCooldown)

pf.String(ConfChunkSize, "1 MB", "OD-DB: Result upload chunk size")
bind(ConfChunkSize)

pf.Uint(ConfUploadRetries, 10, "OD-DB: Max upload retries")
bind(ConfUploadRetries)

pf.Duration(ConfUploadRetryInterval, 30 * time.Second, "OD-DB: Time to wait between upload retries")
bind(ConfUploadRetryInterval)

pf.Uint(ConfTasks, 100, "Crawler: Max concurrent tasks")
bind(ConfTasks)

pf.Uint(ConfWorkers, 4, "Crawler: Connections per server")
bind(ConfWorkers)

pf.Uint(ConfRetries, 5, "Crawler: Request retries")
bind(ConfRetries)

pf.Duration(ConfDialTimeout, 10 * time.Second, "Crawler: Handshake timeout")
bind(ConfDialTimeout)

pf.Duration(ConfTimeout, 30 * time.Second, "Crawler: Request timeout")
bind(ConfTimeout)

pf.String(ConfUserAgent, "Mozilla/5.0 (X11; od-database-crawler) Gecko/20100101 Firefox/52.0", "Crawler: User-Agent")
bind(ConfUserAgent)

pf.Uint(ConfJobBufferSize, 5000, "Crawler: Task queue cache size")
bind(ConfJobBufferSize)

pf.Duration(ConfCrawlStats, time.Second, "Log: Crawl stats interval")
bind(ConfCrawlStats)

pf.Duration(ConfAllocStats, 10 * time.Second, "Log: Resource stats interval")
bind(ConfAllocStats)

pf.Bool(ConfVerbose, false, "Log: Print every listed dir")
bind(ConfVerbose)

pf.Bool(ConfPrintHTTP, false, "Log: Print HTTP client errors")
bind(ConfPrintHTTP)

pf.String(ConfLogFile, "crawler.log", "Log file")
bind(ConfLogFile)
})
}

func readConfig() {
@@ -157,16 +156,20 @@ func readConfig() {
}
}

config.ServerUrl = viper.GetString(ConfServerUrl)
if config.ServerUrl == "" {
configMissing(ConfServerUrl)
if onlineMode {
config.TrackerUrl = viper.GetString(ConfTrackerUrl)
if config.TrackerUrl == "" {
configMissing(ConfTrackerUrl)
}
config.TrackerUrl = strings.TrimRight(config.TrackerUrl, "/")
}
config.ServerUrl = strings.TrimRight(config.ServerUrl, "/")
config.TrackerProject = viper.GetInt(ConfTrackerProject)

config.Token = viper.GetString(ConfToken)
if config.Token == "" {
configMissing(ConfToken)
}
config.TrackerAlias = viper.GetString(ConfTrackerAlias)

config.WsBucketHost = viper.GetString(ConfWsBucketHost)

config.WsBucketScheme = viper.GetString(ConfWsBucketScheme)

config.ServerTimeout = viper.GetDuration(ConfServerTimeout)

@@ -206,9 +209,11 @@ func readConfig() {
}

if filePath := viper.GetString(ConfLogFile); filePath != "" {
f, err := os.OpenFile(filePath, os.O_CREATE | os.O_WRONLY | os.O_APPEND, 0644)
f, err := os.OpenFile(filePath, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0644)
bufWriter := bufio.NewWriter(f)
if err != nil { panic(err) }
if err != nil {
panic(err)
}
exitHooks.Add(func() {
bufWriter.Flush()
f.Close()

config.yml (28 lines changed)

@@ -1,10 +1,14 @@
# OD-Database server settings
server:
  # Connection URL
  url: http://od-db.mine.terorie.com/api

  # Server auth token
  token:
  url: https://tt.the-eye.eu/api
  # OD-Database project id (for crawling)
  project: 1
  # Your worker alias
  alias: changeme
  # Websocket bucket host & scheme (ws/wss)
  ws_bucket_host: https://wsb.the-eye.eu
  ws_bucket_scheme: wss

  # Request timeout
  timeout: 60s
@@ -17,11 +21,7 @@ server:

  # Time to wait after receiving an error
  # from the server. Doesn't apply to uploads.
  cooldown: 30s

  # Upload chunk size
  # If the value is too high, the upload fails.
  upload_chunk: 1 MB
  cooldown: 1s

  upload_retries: 10
  upload_retry_interval: 30s
@@ -29,10 +29,10 @@ server:
# Log output settings
output:
  # Crawl statistics
  crawl_stats: 1s
  crawl_stats: 1m

  # CPU/RAM/Job queue stats
  resource_stats: 10s
  resource_stats: 1m

  # More output? (Every listed dir)
  verbose: false
@@ -47,13 +47,13 @@ output:
# Crawler settings
crawl:
  # Number of sites that can be processed at once
  tasks: 100
  tasks: 25

  # Number of connections per site
  # Please be careful with this setting!
  # The crawler fires fast and more than
  # ten connections can overwhelm a server.
  connections: 4
  connections: 1

  # How often to retry getting data
  # from the site before giving up
@@ -81,4 +81,4 @@ crawl:
  # in memory.
  # A negative value will cause all jobs
  # to be stored in memory. (Don't do this)
  job_buffer: 5000
  job_buffer: -1

go.mod (deleted, 14 lines)

@@ -1,14 +0,0 @@
module github.com/syndtr/od-database-crawler

require (
github.com/beeker1121/goque v2.0.1+incompatible
github.com/golang/snappy v0.0.0-20180518054509-2e65f85255db // indirect
github.com/sirupsen/logrus v1.3.0
github.com/spf13/cobra v0.0.3
github.com/spf13/viper v1.3.1
github.com/syndtr/goleveldb v0.0.0-20181128100959-b001fa50d6b2 // indirect
github.com/terorie/od-database-crawler v1.1.1
github.com/valyala/fasthttp v1.1.0
golang.org/x/crypto v0.0.0-20190131182504-b8fe1690c613
golang.org/x/net v0.0.0-20180911220305-26e67e76b6c3
)

go.sum (deleted, 66 lines)

@@ -1,66 +0,0 @@
github.com/armon/consul-api v0.0.0-20180202201655-eb2c6b5be1b6/go.mod h1:grANhF5doyWs3UAsr3K4I6qtAmlQcZDesFNEHPZAzj8=
github.com/beeker1121/goque v2.0.1+incompatible h1:5nJHPMqQLxUvGFc8m/NW2QzxKyc0zICmqs/JUsmEjwE=
github.com/beeker1121/goque v2.0.1+incompatible/go.mod h1:L6dOWBhDOnxUVQsb0wkLve0VCnt2xJW/MI8pdRX4ANw=
github.com/coreos/etcd v3.3.10+incompatible/go.mod h1:uF7uidLiAD3TWHmW31ZFd/JWoc32PjwdhPthX9715RE=
github.com/coreos/go-etcd v2.0.0+incompatible/go.mod h1:Jez6KQU2B/sWsbdaef3ED8NzMklzPG4d5KIOhIy30Tk=
github.com/coreos/go-semver v0.2.0/go.mod h1:nnelYz7RCh+5ahJtPPxZlU+153eP4D4r3EedlOD2RNk=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/fsnotify/fsnotify v1.4.7 h1:IXs+QLmnXW2CcXuY+8Mzv/fWEsPGWxqefPtCP5CnV9I=
github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo=
github.com/golang/snappy v0.0.0-20180518054509-2e65f85255db h1:woRePGFeVFfLKN/pOkfl+p/TAqKOfFu+7KPlMVpok/w=
github.com/golang/snappy v0.0.0-20180518054509-2e65f85255db/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
github.com/hashicorp/hcl v1.0.0 h1:0Anlzjpi4vEasTeNFn2mLJgTSwt0+6sfsiTG8qcWGx4=
github.com/hashicorp/hcl v1.0.0/go.mod h1:E5yfLk+7swimpb2L/Alb/PJmXilQ/rhwaUYs4T20WEQ=
github.com/klauspost/compress v1.4.0 h1:8nsMz3tWa9SWWPL60G1V6CUsf4lLjWLTNEtibhe8gh8=
github.com/klauspost/compress v1.4.0/go.mod h1:RyIbtBH6LamlWaDj8nUwkbUhJ87Yi3uG0guNDohfE1A=
github.com/klauspost/cpuid v0.0.0-20180405133222-e7e905edc00e h1:+lIPJOWl+jSiJOc70QXJ07+2eg2Jy2EC7Mi11BWujeM=
github.com/klauspost/cpuid v0.0.0-20180405133222-e7e905edc00e/go.mod h1:Pj4uuM528wm8OyEC2QMXAi2YiTZ96dNQPGgoMS4s3ek=
github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
github.com/magiconair/properties v1.8.0 h1:LLgXmsheXeRoUOBOjtwPQCWIYqM/LU1ayDtDePerRcY=
github.com/magiconair/properties v1.8.0/go.mod h1:PppfXfuXeibc/6YijjN8zIbojt8czPbwD3XqdrwzmxQ=
github.com/mitchellh/mapstructure v1.1.2 h1:fmNYVwqnSfB9mZU6OS2O6GsXM+wcskZDuKQzvN1EDeE=
github.com/mitchellh/mapstructure v1.1.2/go.mod h1:FVVH3fgwuzCH5S8UJGiWEs2h04kUh9fWfEaFds41c1Y=
github.com/pelletier/go-toml v1.2.0 h1:T5zMGML61Wp+FlcbWjRDT7yAxhJNAiPPLOFECq181zc=
github.com/pelletier/go-toml v1.2.0/go.mod h1:5z9KED0ma1S8pY6P1sdut58dfprrGBbd/94hg7ilaic=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/sirupsen/logrus v1.3.0 h1:hI/7Q+DtNZ2kINb6qt/lS+IyXnHQe9e90POfeewL/ME=
github.com/sirupsen/logrus v1.3.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo=
github.com/spf13/afero v1.1.2 h1:m8/z1t7/fwjysjQRYbP0RD+bUIF/8tJwPdEZsI83ACI=
github.com/spf13/afero v1.1.2/go.mod h1:j4pytiNVoe2o6bmDsKpLACNPDBIoEAkihy7loJ1B0CQ=
github.com/spf13/cast v1.3.0 h1:oget//CVOEoFewqQxwr0Ej5yjygnqGkvggSE/gB35Q8=
github.com/spf13/cast v1.3.0/go.mod h1:Qx5cxh0v+4UWYiBimWS+eyWzqEqokIECu5etghLkUJE=
github.com/spf13/cobra v0.0.3 h1:ZlrZ4XsMRm04Fr5pSFxBgfND2EBVa1nLpiy1stUsX/8=
github.com/spf13/cobra v0.0.3/go.mod h1:1l0Ry5zgKvJasoi3XT1TypsSe7PqH0Sj9dhYf7v3XqQ=
github.com/spf13/jwalterweatherman v1.0.0 h1:XHEdyB+EcvlqZamSM4ZOMGlc93t6AcsBEu9Gc1vn7yk=
github.com/spf13/jwalterweatherman v1.0.0/go.mod h1:cQK4TGJAtQXfYWX+Ddv3mKDzgVb68N+wFjFa4jdeBTo=
github.com/spf13/pflag v1.0.3 h1:zPAT6CGy6wXeQ7NtTnaTerfKOsV6V6F8agHXFiazDkg=
github.com/spf13/pflag v1.0.3/go.mod h1:DYY7MBk1bdzusC3SYhjObp+wFpr4gzcvqqNjLnInEg4=
github.com/spf13/viper v1.3.1 h1:5+8j8FTpnFV4nEImW/ofkzEt8VoOiLXxdYIDsB73T38=
github.com/spf13/viper v1.3.1/go.mod h1:ZiWeW+zYFKm7srdB9IoDzzZXaJaI5eL9QjNiN/DMA2s=
github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
github.com/syndtr/goleveldb v0.0.0-20181128100959-b001fa50d6b2 h1:GnOzE5fEFN3b2zDhJJABEofdb51uMRNb8eqIVtdducs=
github.com/syndtr/goleveldb v0.0.0-20181128100959-b001fa50d6b2/go.mod h1:Z4AUp2Km+PwemOoO/VB5AOx9XSsIItzFjoJlOSiYmn0=
github.com/terorie/od-database-crawler v1.1.1 h1:Ca+ZqbZX3rVWBR8SDRzvroyxjBtUs75MQXZ9YG0gqGo=
github.com/terorie/od-database-crawler v1.1.1/go.mod h1:vVJ7pLkudrlUNp9qu24JCzQ8N6mFsrOmX1tPXr155DQ=
github.com/ugorji/go/codec v0.0.0-20181204163529-d75b2dcb6bc8/go.mod h1:VFNgLljTbGfSG7qAOspJ7OScBnGdDN/yBr0sguwnwf0=
github.com/valyala/bytebufferpool v1.0.0 h1:GqA5TC/0021Y/b9FG4Oi9Mr3q7XYx6KllzawFIhcdPw=
github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyCJ6HpOuEn7z0Csc=
github.com/valyala/fasthttp v1.1.0 h1:3BohG7mqwj4lq7PTX//7gLbUlzNvZSPmuHFnloXT0lw=
github.com/valyala/fasthttp v1.1.0/go.mod h1:4vX61m6KN+xDduDNwXrhIAVZaZaZiQ1luJk8LWSxF3s=
github.com/valyala/tcplisten v0.0.0-20161114210144-ceec8f93295a/go.mod h1:v3UYOV9WzVtRmSR+PDvWpU/qWl4Wa5LApYYX4ZtKbio=
github.com/xordataexchange/crypt v0.0.3-0.20170626215501-b2862e3d0a77/go.mod h1:aYKd//L2LvnjZzWKhF00oedf4jCCReLcmhLdhm1A27Q=
golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
golang.org/x/crypto v0.0.0-20181203042331-505ab145d0a9/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
golang.org/x/crypto v0.0.0-20190131182504-b8fe1690c613 h1:MQ/ZZiDsUapFFiMS+vzwXkCTeEKaum+Do5rINYJDmxc=
golang.org/x/crypto v0.0.0-20190131182504-b8fe1690c613/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
golang.org/x/net v0.0.0-20180911220305-26e67e76b6c3 h1:czFLhve3vsQetD6JOJ8NZZvGQIXlnN3/yXxbT6/awxI=
golang.org/x/net v0.0.0-20180911220305-26e67e76b6c3/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20181205085412-a5c9d58dba9a h1:1n5lsVfiQW3yfsRGu98756EH1YthsFqr/5mxHduZW2A=
golang.org/x/sys v0.0.0-20181205085412-a5c9d58dba9a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/text v0.3.0 h1:g61tztE5qeGQ89tm6NTjjM9VPIm088od1l6aSorWRWg=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v2 v2.2.2 h1:ZCJp+EgiOT7lHqUV2J862kp8Qj64Jo6az82+3Td9dZw=
gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=

jenkins/Jenkinsfile (new file, vendored, 47 lines)

@@ -0,0 +1,47 @@
def remote = [:]
remote.name = 'remote'
remote.host = env.DEPLOY_HOST
remote.user = env.DEPLOY_USER
remote.identityFile = '/var/lib/jenkins/.ssh/id_rsa'
remote.knownHosts = '/var/lib/jenkins/.ssh/known_hosts'
remote.allowAnyHosts = true
remote.retryCount = 3
remote.retryWaitSec = 3
logLevel = 'FINER'

pipeline {
agent none
environment {
GOOS='linux'
CGO_ENABLED='0'
HOME='.'
}
stages {
stage('Build') {
agent {
docker {
image 'golang:latest'
}
}
steps {
sh 'mkdir -p /go/src/github.com/terorie/od-database-crawler'
sh 'cp -r *.go fasturl ds jenkins/build.sh "/go/src/github.com/terorie/od-database-crawler"'
sh 'cd /go/src/github.com/terorie/od-database-crawler && go get ./...'
sh './jenkins/build.sh'
stash includes: 'dist/', name: 'dist'
}
}
stage('Deploy') {
agent none
steps {
node('master') {
unstash 'dist'
sshCommand remote: remote, command: "ls od-database-crawler/"
sshPut remote: remote, from: 'dist/', into: 'od-database-crawler'
}
}
}
}
}

jenkins/build.sh (new executable file, 23 lines)

@@ -0,0 +1,23 @@
#!/usr/bin/env bash

appname="od-database-crawler"
outdir="dist/"
tag="${BUILD_ID}_$(date +%Y-%m-%d)"

rm -rf "./${outdir}"
mkdir build 2> /dev/null

name=${outdir}${appname}-${tag}-linux
GOOS="linux" GOARCH="amd64" go build -ldflags="-s -w" -o ${name}
gzip -f ${name}
echo ${name}

name=${outdir}${appname}-${tag}-mac
GOOS="darwin" GOARCH="amd64" go build -ldflags="-s -w" -o ${name}
gzip -f ${name}
echo ${name}

name=${outdir}${appname}-${tag}-freebsd
GOOS="freebsd" GOARCH="amd64" go build -ldflags="-s -w" -o ${name}
gzip -f ${name}
echo ${name}

main.go (64 lines changed)

@@ -8,6 +8,7 @@ import (
"github.com/spf13/viper"
"github.com/terorie/od-database-crawler/fasturl"
"os"
"os/signal"
"strings"
"sync/atomic"
"time"
@@ -17,7 +18,7 @@ var configFile string

var rootCmd = cobra.Command {
Use: "od-database-crawler",
Version: "1.2.1",
Version: "1.2.2",
Short: "OD-Database Go crawler",
Long: helpText,
PersistentPreRunE: preRun,
@@ -61,8 +62,6 @@ func preRun(cmd *cobra.Command, args []string) error {
if err := os.MkdirAll("queue", 0755);
err != nil { panic(err) }

readConfig()

return nil
}

@@ -75,25 +74,31 @@ func main() {
}

func cmdBase(_ *cobra.Command, _ []string) {
// TODO Graceful shutdown
appCtx := context.Background()
forceCtx := context.Background()
onlineMode = true
readConfig()

appCtx, soft := context.WithCancel(context.Background())
forceCtx, hard := context.WithCancel(context.Background())
go hardShutdown(forceCtx)
go listenCtrlC(soft, hard)

inRemotes := make(chan *OD)
go Schedule(forceCtx, inRemotes)
go Schedule(appCtx, inRemotes)

ticker := time.NewTicker(config.Recheck)
defer ticker.Stop()
for {
select {
case <-appCtx.Done():
return
goto shutdown
case <-ticker.C:
t, err := FetchTask()
if err != nil {
logrus.WithError(err).
Error("Failed to get new task")
time.Sleep(viper.GetDuration(ConfCooldown))
if !sleep(viper.GetDuration(ConfCooldown), appCtx) {
goto shutdown
}
continue
}
if t == nil {
@@ -109,13 +114,7 @@ func cmdBase(_ *cobra.Command, _ []string) {
if urlErr, ok := err.(*fasturl.Error); ok && urlErr.Err == fasturl.ErrUnknownScheme {
// Not an error
err = nil

// Give back task
//err2 := CancelTask(t.WebsiteId)
//if err2 != nil {
// logrus.Error(err2)
//}

// TODO FTP crawler
continue
} else if err != nil {
logrus.WithError(err).
@@ -126,9 +125,15 @@ func cmdBase(_ *cobra.Command, _ []string) {
ScheduleTask(inRemotes, t, &baseUri)
}
}

shutdown:
globalWait.Wait()
}

func cmdCrawler(_ *cobra.Command, args []string) error {
onlineMode = false
readConfig()

arg := args[0]
// https://github.com/golang/go/issues/19779
if !strings.Contains(arg, "://") {
@@ -161,3 +166,30 @@ func cmdCrawler(_ *cobra.Command, args []string) error {

return nil
}

func listenCtrlC(soft, hard context.CancelFunc) {
c := make(chan os.Signal)
signal.Notify(c, os.Interrupt)

<-c
logrus.Info(">>> Shutting down crawler... <<<")
soft()

<-c
logrus.Warning(">>> Force shutdown! <<<")
hard()
}

func hardShutdown(c context.Context) {
<-c.Done()
os.Exit(1)
}

func sleep(d time.Duration, c context.Context) bool {
select {
case <-time.After(d):
return true
case <-c.Done():
return false
}
}

model.go (33 lines changed)

@@ -7,19 +7,29 @@ import (
"time"
)

type ResultCode int

const (
TR_OK = ResultCode(iota)
TR_FAIL = 1
TR_SKIP = 2
)

type Task struct {
WebsiteId uint64 `json:"website_id"`
Url string `json:"url"`
WebsiteId uint64 `json:"website_id"`
Url string `json:"url"`
UploadToken string `json:"upload_token"`
TaskId int64
}

type TaskResult struct {
StatusCode string `json:"status_code"`
FileCount uint64 `json:"file_count"`
ErrorCount uint64 `json:"-"`
StartTime time.Time `json:"-"`
StartTimeUnix int64 `json:"start_time"`
EndTimeUnix int64 `json:"end_time"`
WebsiteId uint64 `json:"website_id"`
ResultCode ResultCode `json:"status_code"`
FileCount uint64 `json:"file_count"`
ErrorCount uint64 `json:"-"`
StartTime time.Time `json:"-"`
StartTimeUnix int64 `json:"start_time"`
EndTimeUnix int64 `json:"end_time"`
WebsiteId uint64 `json:"website_id"`
}

type Job struct {
@@ -51,13 +61,16 @@ func (o *OD) LoadOrStoreKey(k *redblackhash.Key) (exists bool) {
defer o.Scanned.Unlock()

exists = o.Scanned.Get(k)
if exists { return true }
if exists {
return true
}

o.Scanned.Put(k)
return false
}

type errorString string

func (e errorString) Error() string {
return string(e)
}

scheduler.go (51 lines changed)

@@ -34,13 +34,16 @@ func Schedule(c context.Context, remotes <-chan *OD) {
queuePath := path.Join("queue", fmt.Sprintf("%d", remote.Task.WebsiteId))

// Delete existing queue
if err := os.RemoveAll(queuePath);
err != nil { panic(err) }
if err := os.RemoveAll(queuePath); err != nil {
panic(err)
}

// Start new queue
var err error
remote.WCtx.Queue, err = OpenQueue(queuePath)
if err != nil { panic(err) }
if err != nil {
panic(err)
}

// Spawn workers
for i := 0; i < config.Workers; i++ {
@@ -77,12 +80,12 @@ func ScheduleTask(remotes chan<- *OD, t *Task, u *fasturl.URL) {

globalWait.Add(1)
now := time.Now()
od := &OD {
Task: *t,
od := &OD{
Task: *t,
BaseUri: *u,
Result: TaskResult {
WebsiteId: t.WebsiteId,
StartTime: now,
Result: TaskResult{
WebsiteId: t.WebsiteId,
StartTime: now,
StartTimeUnix: now.Unix(),
},
}
@@ -117,7 +120,7 @@ func (o *OD) Watch(results chan File) {
// Open crawl results file
f, err := os.OpenFile(
filePath,
os.O_CREATE | os.O_RDWR | os.O_TRUNC,
os.O_CREATE|os.O_RDWR|os.O_TRUNC,
0644,
)
if err != nil {
@@ -145,7 +148,7 @@ func (o *OD) Watch(results chan File) {
}

// Upload results
err = PushResult(&o.Result, f)
err = PushResult(&o.Task, f)
if err != nil {
logrus.WithError(err).
Error("Failed uploading crawl results")
@@ -170,24 +173,18 @@ func (o *OD) handleCollect(results chan File, f *os.File, collectErrC chan error
// Log finish

logrus.WithFields(logrus.Fields{
"id": o.Task.WebsiteId,
"url": o.BaseUri.String(),
"id": o.Task.WebsiteId,
"url": o.BaseUri.String(),
"duration": time.Since(o.Result.StartTime),
}).Info("Crawler finished")

// Set status code
now := time.Now()
o.Result.EndTimeUnix = now.Unix()
fileCount := atomic.LoadUint64(&o.Result.FileCount)
if fileCount == 0 {
errorCount := atomic.LoadUint64(&o.Result.ErrorCount)
if errorCount == 0 {
o.Result.StatusCode = "empty"
} else {
o.Result.StatusCode = "directory listing failed"
}
if atomic.LoadUint64(&o.Result.ErrorCount) != 0 {
o.Result.ResultCode = TR_FAIL
} else {
o.Result.StatusCode = "success"
o.Result.ResultCode = TR_OK
}
}

@@ -205,11 +202,17 @@ func (t *Task) collect(results chan File, f *os.File) error {
result.Path = fasturl.PathUnescape(result.Path)
result.Name = fasturl.PathUnescape(result.Name)
resJson, err := json.Marshal(result)
if err != nil { panic(err) }
if err != nil {
panic(err)
}
_, err = f.Write(resJson)
if err != nil { return err }
if err != nil {
return err
}
_, err = f.Write([]byte{'\n'})
if err != nil { return err }
if err != nil {
return err
}
}

return nil

server.go (304 lines changed)

@@ -2,48 +2,125 @@ package main

import (
"bytes"
"encoding/base64"
"encoding/json"
"fmt"
"github.com/fasthttp/websocket"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
"github.com/spf13/viper"
"golang.org/x/time/rate"
"io"
"mime/multipart"
"io/ioutil"
"net/http"
"net/url"
"os"
"strconv"
"time"
)

var serverClient = http.Client {
Timeout: config.ServerTimeout,
var serverWorker *TrackerWorker

var serverClient = http.Client{
Timeout: config.ServerTimeout,
Transport: new(ServerTripper),
}

var serverUserAgent = "od-database-crawler/" + rootCmd.Version

func getOrCreateWorker() {

if _, err := os.Stat("worker.json"); os.IsNotExist(err) {
req := CreateTrackerWorkerRequest{
Alias: config.TrackerAlias,
}
body, _ := json.Marshal(&req)
buf := bytes.NewBuffer(body)
resp, _ := serverClient.Post(config.TrackerUrl+"/worker/create", "application/json", buf)

workerResponse := CreateTrackerWorkerResponse{}
respBody, _ := ioutil.ReadAll(resp.Body)
_ = json.Unmarshal(respBody, &workerResponse)

workerJsonData, _ := json.Marshal(&workerResponse.Content.Worker)
fp, _ := os.OpenFile("worker.json", os.O_CREATE|os.O_TRUNC|os.O_WRONLY, 0600)
_, _ = fp.Write(workerJsonData)

//Request ASSIGN permission
serverWorker = &workerResponse.Content.Worker
accessReq, _ := json.Marshal(WorkerAccessRequest{
Project: config.TrackerProject,
Assign: true,
Submit: false,
})
buf = bytes.NewBuffer(accessReq)
res, err := serverClient.Post(config.TrackerUrl+"/project/request_access", "application/json", buf)
if err != nil {
panic(err)
}
logrus.WithFields(logrus.Fields{
"response": res.StatusCode,
}).Info("Requested ASSIGN permission")
} else {
var worker TrackerWorker

fp, _ := os.OpenFile("worker.json", os.O_RDONLY, 0600)
workerJsonData, _ := ioutil.ReadAll(fp)
_ = json.Unmarshal(workerJsonData, &worker)

serverWorker = &worker
}
}

func FetchTask() (t *Task, err error) {
res, err := serverClient.PostForm(
config.ServerUrl + "/task/get",
url.Values{ "token": {config.Token} })
if err != nil { return }

if serverWorker == nil {
getOrCreateWorker()
}

res, err := serverClient.Get(config.TrackerUrl + "/task/get/" + strconv.Itoa(config.TrackerProject))

if err != nil {
return
}
defer res.Body.Close()

switch res.StatusCode {
case 200:
break
case 404, 500:
return nil, nil
default:
return nil, fmt.Errorf("http %s", res.Status)
}

t = new(Task)
err = json.NewDecoder(res.Body).Decode(t)
if err != nil { return }
jsonResponse := FetchTaskResponse{}
err = json.NewDecoder(res.Body).Decode(&jsonResponse)
if _, ok := err.(*json.SyntaxError); ok {
return nil, fmt.Errorf("/task/get returned invalid JSON")
} else if err != nil {
return
}

if !jsonResponse.Ok {
if jsonResponse.Message == "No task available" {
return nil, nil
}
return nil, errors.New(jsonResponse.Message)
}

task := Task{}
err = json.Unmarshal([]byte(jsonResponse.Content.Task.Recipe), &task)
if _, ok := err.(*json.SyntaxError); ok {
return nil, fmt.Errorf("/task/get returned invalid JSON")
} else if err != nil {
return
}

t = &task
t.TaskId = jsonResponse.Content.Task.Id

return
}

func PushResult(result *TaskResult, f *os.File) (err error) {
if result.WebsiteId == 0 {
func PushResult(task *Task, f *os.File) (err error) {
if task.WebsiteId == 0 {
// Not a real result, don't push
return nil
}
@@ -54,10 +131,10 @@ func PushResult(result *TaskResult, f *os.File) (err error) {
return
}

err = uploadChunks(result.WebsiteId, f)
err = uploadWebsocket(f, task.UploadToken)
if err != nil {
logrus.Errorf("Failed to upload file list: %s", err)
err2 := CancelTask(result.WebsiteId)
err2 := releaseTask(task, TR_FAIL)
if err2 != nil {
logrus.Error(err2)
}
@@ -65,91 +142,62 @@ func PushResult(result *TaskResult, f *os.File) (err error) {
}

// Upload result ignoring errors
uploadResult(result)
_ = releaseTask(task, TR_OK)

return
}

func uploadChunks(websiteId uint64, f *os.File) error {
eof := false
for iter := 1; !eof; iter++ {
// TODO Stream with io.Pipe?
var b bytes.Buffer
func uploadWebsocket(f *os.File, token string) (err error) {

multi := multipart.NewWriter(&b)
u := url.URL{Scheme: config.WsBucketScheme, Host: config.WsBucketHost, Path: "/upload"}

// Set upload fields
var err error
err = multi.WriteField("token", config.Token)
if err != nil { return err }
err = multi.WriteField("website_id", fmt.Sprintf("%d", websiteId))
if err != nil { return err }

// Copy chunk to file_list
formFile, err := multi.CreateFormFile("file_list", "file_list")
var n int64
n, err = io.CopyN(formFile, f, config.ChunkSize)
if err != io.EOF && err != nil {
return err
}
if n == 0 {
// Don't upload, no content
return nil
} else if n < config.ChunkSize {
err = nil
// Break at end of iteration
eof = true
}

multi.Close()

for retries := 0; retries < viper.GetInt(ConfUploadRetries); retries++ {
if retries > 0 {
// Error occurred, retry upload
time.Sleep(viper.GetDuration(ConfUploadRetryInterval))
}

req, err := http.NewRequest(
http.MethodPost,
config.ServerUrl + "/task/upload",
&b)
req.Header.Set("content-type", multi.FormDataContentType())
if err != nil { continue }

res, err := serverClient.Do(req)
if err != nil { continue }
res.Body.Close()

if res.StatusCode != http.StatusOK {
logrus.WithField("status", res.Status).
WithField("part", iter).
Errorf("Upload failed")
continue
}

// Upload successful
break
}

logrus.WithField("id", websiteId).
WithField("part", iter).
Infof("Uploaded files chunk")
header := http.Header{}
header.Add("X-Upload-Token", token)
conn, _, err := websocket.DefaultDialer.Dial(u.String(), header)
if err != nil {
return
}

conn.EnableWriteCompression(true) //TODO: Is this necessary?

socketWriter, _ := conn.NextWriter(websocket.BinaryMessage)
_, _ = io.Copy(socketWriter, f)
err = socketWriter.Close()
if err != nil {
logrus.Error("FIXME: couldn't do file upload")
return
}
err = conn.Close()
if err != nil {
return
}

return nil
}

func uploadResult(result *TaskResult) (err error) {
resultEnc, err := json.Marshal(result)
if err != nil { panic(err) }
func releaseTask(task *Task, taskResult ResultCode) (err error) {

res, err := serverClient.PostForm(
config.ServerUrl + "/task/complete",
url.Values {
"token": {config.Token},
"result": {string(resultEnc)},
},
req := releaseTaskRequest{
TaskId: task.TaskId,
ResultCode: taskResult,
// TODO Will implement verification in a later ODDB update
Verification: 0,
}

resultEnc, err := json.Marshal(&req)
if err != nil {
panic(err)
}
body := bytes.NewBuffer(resultEnc)

res, err := serverClient.Post(
config.TrackerUrl+"/task/release",
"application/json",
body,
)
if err != nil { return }
if err != nil {
return
}
res.Body.Close()

if res.StatusCode != http.StatusOK {
@@ -159,20 +207,66 @@ func uploadResult(result *TaskResult) (err error) {
return
}

func CancelTask(websiteId uint64) (err error) {
res, err := serverClient.PostForm(
config.ServerUrl + "/task/cancel",
url.Values{
"token": {config.Token},
"website_id": {strconv.FormatUint(websiteId, 10)},
},
)
if err != nil { return }
res.Body.Close()
type ServerTripper struct{}

if res.StatusCode != http.StatusOK {
return fmt.Errorf("failed to cancel task: %s", res.Status)
func (t *ServerTripper) RoundTrip(req *http.Request) (res *http.Response, err error) {
req.Header.Set("User-Agent", serverUserAgent)

//TODO: Use task_tracker/client ?
if serverWorker != nil {
req.Header.Add("X-Worker-Id", strconv.Itoa(serverWorker.Id))
req.Header.Add("X-Secret", base64.StdEncoding.EncodeToString(serverWorker.Secret))
}

return
return http.DefaultTransport.RoundTrip(req)
}

// https://github.com/simon987/task_tracker/blob/master/api/models.go

type releaseTaskRequest struct {
TaskId int64 `json:"task_id"`
ResultCode ResultCode `json:"result"`
Verification int64 `json:"verification"`
}

type WorkerAccessRequest struct {
Assign bool `json:"assign"`
Submit bool `json:"submit"`
Project int `json:"project"`
}

type FetchTaskResponse struct {
Ok bool `json:"ok"`
Message string `json:"message"`
Content struct {
Task struct {
Id int64 `json:"id"`
Priority int64 `json:"priority"`
Project struct {
Id int64 `json:"id"`
Name string `json:"name"`
Version string `json:"version"`
AssignRate rate.Limit `json:"assign_rate"`
SubmitRate rate.Limit `json:"submit_rate"`
} `json:"project"`
Recipe string `json:"recipe"`
} `json:"task"`
} `json:"content"`
}

type TrackerWorker struct {
Alias string `json:"alias"`
Id int `json:"id"`
Secret []byte `json:"secret"`
}

type CreateTrackerWorkerResponse struct {
Ok bool `json:"ok"`
Message string `json:"message"`
Content struct {
Worker TrackerWorker `json:"worker"`
} `json:"content"`
}

type CreateTrackerWorkerRequest struct {
Alias string `json:"alias"`
}

worker.go (22 lines changed)

@@ -14,8 +14,8 @@ import (
var globalWait sync.WaitGroup

type WorkerContext struct {
OD *OD
Queue *BufferedQueue
OD *OD
Queue *BufferedQueue
lastRateLimit time.Time
numRateLimits int
}
@@ -56,16 +56,16 @@ func (w *WorkerContext) step(results chan<- File, job Job) {

if !shouldRetry(err) {
atomic.AddUint64(&totalAborted, 1)
logrus.WithField("url", job.UriStr).
WithError(err).
Error("Giving up after failure")
//logrus.WithField("url", job.UriStr).
// WithError(err).
// Error("Giving up after failure")
return
}

if job.Fails > config.Retries {
atomic.AddUint64(&totalAborted, 1)
logrus.WithField("url", job.UriStr).
Errorf("Giving up after %d fails", job.Fails)
//logrus.WithField("url", job.UriStr).
// Errorf("Giving up after %d fails", job.Fails)
} else {
atomic.AddUint64(&totalRetries, 1)
if err == ErrRateLimit {
@@ -88,7 +88,9 @@ func (w *WorkerContext) step(results chan<- File, job Job) {
}

func (w *WorkerContext) DoJob(job *Job, f *File) (newJobs []Job, err error) {
if len(job.Uri.Path) == 0 { return }
if len(job.Uri.Path) == 0 {
return
}
if job.Uri.Path[len(job.Uri.Path)-1] == '/' {
// Load directory
links, err := GetDir(job, f)
@@ -159,10 +161,10 @@ func (w *WorkerContext) queueJob(job Job) {
w.OD.Wait.Add(1)

if w.numRateLimits > 0 {
if time.Since(w.lastRateLimit) > 5 * time.Second {
if time.Since(w.lastRateLimit) > 5*time.Second {
w.numRateLimits = 0
} else {
time.Sleep(time.Duration(math.Sqrt(float64(50 * w.numRateLimits))) *
time.Sleep(time.Duration(math.Sqrt(float64(50*w.numRateLimits))) *
100 * time.Millisecond)
}
}

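The rate-limit backoff in `queueJob` above sleeps roughly `sqrt(50·n) × 100 ms`, where `n` is the number of `HTTP 429` responses seen within the last 5 seconds. A stand-alone sketch (not part of the crawler) that tabulates the resulting delays:

```go
package main

import (
	"fmt"
	"math"
	"time"
)

func main() {
	// Same formula as worker.go: sqrt(50*n), truncated to an integer,
	// multiplied by 100 ms. The counter resets after 5s without a 429.
	for n := 1; n <= 6; n++ {
		d := time.Duration(math.Sqrt(float64(50*n))) * 100 * time.Millisecond
		fmt.Printf("rate limit #%d -> sleep %v\n", n, d)
	}
	// rate limit #1 -> sleep 700ms
	// rate limit #2 -> sleep 1s
	// rate limit #3 -> sleep 1.2s
	// rate limit #4 -> sleep 1.4s
	// rate limit #5 -> sleep 1.5s
	// rate limit #6 -> sleep 1.7s
}
```

The square-root growth keeps the delay sub-linear, so a burst of 429s slows the worker down without stalling it for minutes.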